[mpich2-commits] r8031 - in mpich2/trunk/src/mpid/ch3/channels/nemesis: include nemesis/include nemesis/netmod/tcp nemesis/src src
buntinas at mcs.anl.gov
buntinas at mcs.anl.gov
Wed Feb 23 15:10:27 CST 2011
Author: buntinas
Date: 2011-02-23 15:10:27 -0600 (Wed, 23 Feb 2011)
New Revision: 8031
Modified:
mpich2/trunk/src/mpid/ch3/channels/nemesis/include/mpidi_ch3_impl.h
mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_generic_queue.h
mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_impl.h
mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_inline.h
mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/tcp_ckpt.c
mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/tcp_impl.h
mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/tcp_send.c
mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_debug.c
mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_isend.c
mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_isendv.c
mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_istartmsg.c
mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_istartmsgv.c
mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_progress.c
mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3i_eagernoncontig.c
Log:
Unified send queue implementations to use the same macros. This also fixes a bug that accessed the dev.next pointer in the request after it had been freed.
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/include/mpidi_ch3_impl.h
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/include/mpidi_ch3_impl.h 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/include/mpidi_ch3_impl.h 2011-02-23 21:10:27 UTC (rev 8031)
@@ -10,6 +10,7 @@
#include "mpidi_ch3_conf.h"
#include "mpidimpl.h"
#include "mpiu_os_wrappers.h"
+#include "mpid_nem_generic_queue.h"
#if defined(HAVE_ASSERT_H)
#include <assert.h>
@@ -18,67 +19,34 @@
extern void *MPIDI_CH3_packet_buffer;
extern int MPIDI_CH3I_my_rank;
-#define CH3_NORMAL_QUEUE 0
-#define CH3_RNDV_QUEUE 1
-#define CH3_NUM_QUEUES 2
+typedef GENERIC_Q_DECL(struct MPID_Request) MPIDI_CH3I_shm_sendq_t;
+extern MPIDI_CH3I_shm_sendq_t MPIDI_CH3I_shm_sendq;
+extern struct MPID_Request *MPIDI_CH3I_shm_active_send;
-extern struct MPID_Request *MPIDI_CH3I_sendq_head[CH3_NUM_QUEUES];
-extern struct MPID_Request *MPIDI_CH3I_sendq_tail[CH3_NUM_QUEUES];
-extern struct MPID_Request *MPIDI_CH3I_active_send[CH3_NUM_QUEUES];
+/* Send queue macros */
+/* MT - not thread safe! */
+#define MPIDI_CH3I_Sendq_empty(q) GENERIC_Q_EMPTY (q)
+#define MPIDI_CH3I_Sendq_head(q) GENERIC_Q_HEAD (q)
+#define MPIDI_CH3I_Sendq_enqueue(qp, ep) do { \
+ /* add refcount so req doesn't get freed before it's dequeued */ \
+ MPIR_Request_add_ref(ep); \
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
+ "MPIDI_CH3I_Sendq_enqueue req=%p (handle=%#x), queue=%p", \
+ ep, (ep)->handle, qp)); \
+ GENERIC_Q_ENQUEUE (qp, ep, dev.next); \
+ } while (0)
+#define MPIDI_CH3I_Sendq_dequeue(qp, ep) do { \
+ GENERIC_Q_DEQUEUE (qp, ep, dev.next); \
+ MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
+ "MPIDI_CH3I_Sendq_dequeuereq=%p (handle=%#x), queue=%p", \
+ *(ep), *(ep) ? (*(ep))->handle : -1, qp)); \
+ MPID_Request_release(*(ep)); \
+ } while (0)
+#define MPIDI_CH3I_Sendq_enqueue_multiple_no_refcount(qp, ep0, ep1) \
+ /* used to move reqs from one queue to another, so we don't update */ \
+ /* the refcounts */ \
+ GENERIC_Q_ENQUEUE_MULTIPLE(qp, ep0, ep1, dev.next)
-#define MPIDI_CH3I_SendQ_enqueue(req, queue) \
-do { \
- MPIU_Assert(req != NULL); \
- /* MT - not thread safe! */ \
- MPIDI_DBG_PRINTF((50, FCNAME, "SendQ_enqueue req=0x%08x", req->handle)); \
- MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
- "MPIDI_CH3I_SendQ_enqueue(req=%p (handle=0x%x), queue=%s (%d))", \
- (req), \
- (req)->handle, \
- #queue, queue)); \
- /* because an OnDataAvail function might complete this request and cause */ \
- /* it to be freed before it is dequeued, we have to add a reference */ \
- /* whenever a req is added to a queue */ \
- MPIR_Request_add_ref(req); \
- req->dev.next = NULL; \
- if (MPIDI_CH3I_sendq_tail[queue] != NULL) \
- { \
- MPIDI_CH3I_sendq_tail[queue]->dev.next = req; \
- } \
- else \
- { \
- MPIDI_CH3I_sendq_head[queue] = req; \
- } \
- MPIDI_CH3I_sendq_tail[queue] = req; \
-} while (0)
-
-/* NOTE: this macro may result in the dequeued request being freed (via
- * MPID_Request_release) */
-#define MPIDI_CH3I_SendQ_dequeue(queue) \
-do { \
- MPID_Request *req_; \
- /* MT - not thread safe! */ \
- MPIDI_DBG_PRINTF((50, FCNAME, "SendQ_dequeue req=0x%08x", \
- MPIDI_CH3I_sendq_head[queue]->handle)); \
- MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
- "MPIDI_CH3I_SendQ_dequeue(queue=%s (%d)), head req=%p (handle=0x%x)", \
- #queue, queue, \
- MPIDI_CH3I_sendq_head[queue], \
- ((MPIDI_CH3I_sendq_head[queue]) ? MPIDI_CH3I_sendq_head[queue]->handle : -1))); \
- /* see the comment in _enqueue above about refcounts */ \
- req_ = MPIDI_CH3I_sendq_head[queue]; \
- MPIDI_CH3I_sendq_head[queue] = MPIDI_CH3I_sendq_head[queue]->dev.next; \
- MPID_Request_release(req_); \
- if (MPIDI_CH3I_sendq_head[queue] == NULL) \
- { \
- MPIDI_CH3I_sendq_tail[queue] = NULL; \
- } \
-} while (0)
-
-#define MPIDI_CH3I_SendQ_head(queue) (MPIDI_CH3I_sendq_head[queue])
-
-#define MPIDI_CH3I_SendQ_empty(queue) (MPIDI_CH3I_sendq_head[queue] == NULL)
-
int MPIDI_CH3I_Progress_init(void);
int MPIDI_CH3I_Progress_finalize(void);
int MPIDI_CH3I_Shm_send_progress(void);
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_generic_queue.h
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_generic_queue.h 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_generic_queue.h 2011-02-23 21:10:27 UTC (rev 8031)
@@ -11,11 +11,12 @@
the next pointer field in the element (e.g., "ch.tcp_sendq_next") */
#define PRINT_QUEUE(qp, next_field) do { \
- } while(0)
-#define PRINTM_QUEUE(qp, next_field_macro, next_field) do { \
- } while(0)
-
+ } while(0)
+#define PRINTM_QUEUE(qp, next_field_macro, next_field) do { \
+ } while(0)
+#define GENERIC_Q_DECL(type) struct { type *head, *tail; }
+
#define GENERIC_Q_EMPTY(q) ((q).head == NULL)
#define GENERIC_Q_HEAD(q) ((q).head)
@@ -78,6 +79,7 @@
(qp)->head = (*(epp))->next_field; \
if ((qp)->head == NULL) \
(qp)->tail = NULL; \
+ PRINT_QUEUE (qp, next_field); \
} while (0)
/* remove the elements from the top of the queue starting with ep0 through ep1 */
@@ -147,6 +149,8 @@
/* queue macros that use another macro to find the 'next' field, e.g.,
when the next field is in the channel private area of a request.
The macro is of the form "macro_name(element_ptr, next_field)"*/
+#define GENERICM_Q_DECL(type, q_name) typedef struct { type *head, *tail; } q_name;
+
#define GENERICM_Q_EMPTY(q) ((q).head == NULL)
#define GENERICM_Q_HEAD(q) ((q).head)
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_impl.h
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_impl.h 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_impl.h 2011-02-23 21:10:27 UTC (rev 8031)
@@ -16,11 +16,7 @@
#include "mpid_nem_fbox.h"
#include "mpid_nem_nets.h"
#include "mpid_nem_queue.h"
-#include "mpid_nem_generic_queue.h"
-#include "mpiu_os_wrappers.h"
-
-
#define MPID_NEM__BYPASS_Q_MAX_VAL ((MPID_NEM_MPICH2_DATA_LEN) - (sizeof(MPIDI_CH3_Pkt_t)))
int MPIDI_CH3I_Seg_alloc(size_t len, void **ptr_p);
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_inline.h
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_inline.h 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_inline.h 2011-02-23 21:10:27 UTC (rev 8031)
@@ -34,10 +34,10 @@
/* evaluates to TRUE if it is safe to block on recv operations in the progress
* loop, FALSE otherwise */
-#define MPID_nem_safe_to_block_recv() \
- (!MPID_nem_local_lmt_pending && \
- !MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] && \
- !MPIDI_CH3I_SendQ_head(CH3_NORMAL_QUEUE) && \
+#define MPID_nem_safe_to_block_recv() \
+ (!MPID_nem_local_lmt_pending && \
+ !MPIDI_CH3I_shm_active_send && \
+ !MPIDI_CH3I_Sendq_head(MPIDI_CH3I_shm_sendq) && \
!MPIDU_Sched_are_pending())
#undef FUNCNAME
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/tcp_ckpt.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/tcp_ckpt.c 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/tcp_ckpt.c 2011-02-23 21:10:27 UTC (rev 8031)
@@ -11,12 +11,6 @@
#ifdef ENABLE_CHECKPOINTING
-#define SENDQ_EMPTY(q) GENERIC_Q_EMPTY (q)
-#define SENDQ_HEAD(q) GENERIC_Q_HEAD (q)
-#define SENDQ_ENQUEUE(qp, ep) GENERIC_Q_ENQUEUE (qp, ep, dev.next)
-#define SENDQ_DEQUEUE(qp, ep) GENERIC_Q_DEQUEUE (qp, ep, dev.next)
-#define SENDQ_ENQUEUE_MULTIPLE(qp, ep0, ep1) GENERIC_Q_ENQUEUE_MULTIPLE(qp, ep0, ep1, dev.next)
-
#undef FUNCNAME
#define FUNCNAME MPID_nem_tcp_ckpt_pause_send_vc
#undef FCNAME
@@ -54,13 +48,13 @@
vc_tcp->send_paused = FALSE;
/* There may be a unpause message in the send queue. If so, just enqueue everything on the send queue. */
- if (SENDQ_EMPTY(vc_tcp->send_queue))
+ if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->paused_send_queue);
/* if anything is left on the paused queue, put it on the send queue and wait for the reconnect */
- if (!SENDQ_EMPTY(vc_tcp->paused_send_queue)) {
+ if (!MPIDI_CH3I_Sendq_empty(vc_tcp->paused_send_queue)) {
- SENDQ_ENQUEUE_MULTIPLE(&vc_tcp->send_queue, vc_tcp->paused_send_queue.head, vc_tcp->paused_send_queue.tail);
+ MPIDI_CH3I_Sendq_enqueue_multiple_no_refcount(&vc_tcp->send_queue, vc_tcp->paused_send_queue.head, vc_tcp->paused_send_queue.tail);
vc_tcp->paused_send_queue.head = vc_tcp->paused_send_queue.tail = NULL;
}
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/tcp_impl.h
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/tcp_impl.h 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/tcp_impl.h 2011-02-23 21:10:27 UTC (rev 8031)
@@ -28,11 +28,7 @@
#define MPIDI_NEM_TCP_MAX_CONNECT_RETRIES 100
-typedef struct MPIDI_nem_tcp_request_queue
-{
- struct MPID_Request *head;
- struct MPID_Request *tail;
-} MPIDI_nem_tcp_request_queue_t;
+typedef GENERIC_Q_DECL(struct MPID_Request) MPIDI_nem_tcp_request_queue_t;
/* The vc provides a generic buffer in which network modules can store
private fields This removes all dependencies from the VC struction
@@ -147,14 +143,7 @@
#define Q_DEQUEUE(qp, ep) GENERIC_Q_DEQUEUE (qp, ep, next)
#define Q_REMOVE_ELEMENTS(qp, ep0, ep1) GENERIC_Q_REMOVE_ELEMENTS (qp, ep0, ep1, next)
-/* Send queue macros */
-#define SENDQ_EMPTY(q) GENERIC_Q_EMPTY (q)
-#define SENDQ_HEAD(q) GENERIC_Q_HEAD (q)
-#define SENDQ_ENQUEUE(qp, ep) GENERIC_Q_ENQUEUE (qp, ep, dev.next)
-#define SENDQ_DEQUEUE(qp, ep) GENERIC_Q_DEQUEUE (qp, ep, dev.next)
-#define SENDQ_ENQUEUE_MULTIPLE(qp, ep0, ep1) GENERIC_Q_ENQUEUE_MULTIPLE(qp, ep0, ep1, dev.next)
-
/* VC list macros */
#define VC_L_EMPTY(q) GENERIC_L_EMPTY (q)
#define VC_L_HEAD(q) GENERIC_L_HEAD (q)
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/tcp_send.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/tcp_send.c 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/netmod/tcp/tcp_send.c 2011-02-23 21:10:27 UTC (rev 8031)
@@ -9,12 +9,6 @@
#define NUM_PREALLOC_SENDQ 10
#define MAX_SEND_IOV 10
-#define SENDQ_EMPTY(q) GENERIC_Q_EMPTY (q)
-#define SENDQ_HEAD(q) GENERIC_Q_HEAD (q)
-#define SENDQ_ENQUEUE(qp, ep) GENERIC_Q_ENQUEUE (qp, ep, dev.next)
-#define SENDQ_DEQUEUE(qp, ep) GENERIC_Q_DEQUEUE (qp, ep, dev.next)
-
-
typedef struct MPID_nem_tcp_send_q_element
{
struct MPID_nem_tcp_send_q_element *next;
@@ -87,13 +81,14 @@
MPIU_Assert(vc != NULL);
- if (SENDQ_EMPTY(*send_queue))
+ if (MPIDI_CH3I_Sendq_empty(*send_queue))
goto fn_exit;
- while (!SENDQ_EMPTY(*send_queue))
+ while (!MPIDI_CH3I_Sendq_empty(*send_queue))
{
- sreq = SENDQ_HEAD(*send_queue);
-
+ sreq = MPIDI_CH3I_Sendq_head(*send_queue);
+ MPIU_DBG_MSG_P(CH3_CHANNEL, VERBOSE, "Sending %p", sreq);
+
iov = &sreq->dev.iov[sreq->dev.iov_offset];
CHECK_EINTR(offset, writev(vc_tcp->sc->fd, iov, sreq->dev.iov_count));
@@ -155,7 +150,7 @@
MPIU_Assert(MPIDI_Request_get_type(sreq) != MPIDI_REQUEST_TYPE_GET_RESP);
MPIDI_CH3U_Request_complete(sreq);
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
- SENDQ_DEQUEUE(send_queue, &sreq);
+ MPIDI_CH3I_Sendq_dequeue(send_queue, &sreq);
continue;
}
@@ -166,14 +161,14 @@
if (complete)
{
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
- SENDQ_DEQUEUE(send_queue, &sreq);
+ MPIDI_CH3I_Sendq_dequeue(send_queue, &sreq);
continue;
}
sreq->dev.iov_offset = 0;
}
}
- if (SENDQ_EMPTY(*send_queue))
+ if (MPIDI_CH3I_Sendq_empty(*send_queue))
UNSET_PLFD(vc_tcp);
fn_exit:
@@ -216,7 +211,7 @@
MPIDI_CHANGE_VC_STATE(vc, ACTIVE);
- if (!SENDQ_EMPTY (vc_tcp->send_queue))
+ if (!MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
{
SET_PLFD(vc_tcp);
mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
@@ -252,7 +247,7 @@
if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
if (MPID_nem_tcp_vc_is_connected(vc_tcp))
{
- if (SENDQ_EMPTY(vc_tcp->send_queue))
+ if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
{
MPID_IOV iov[2];
@@ -340,21 +335,21 @@
MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPID_IOV_LEN > 0);
if (MPID_nem_tcp_vc_send_paused(vc_tcp)) {
- SENDQ_ENQUEUE(&vc_tcp->paused_send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->paused_send_queue, sreq);
} else {
if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
- if (SENDQ_EMPTY(vc_tcp->send_queue)) {
+ if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
/* this will be the first send on the queue: queue it and set the write flag on the pollfd */
- SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
SET_PLFD(vc_tcp);
} else {
/* there are other sends in the queue before this one: try to send from the queue */
- SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
} else {
- SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
}
}
@@ -391,7 +386,7 @@
if (MPID_nem_tcp_vc_is_connected(vc_tcp))
{
- if (SENDQ_EMPTY(vc_tcp->send_queue))
+ if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
{
MPID_IOV iov[2];
@@ -479,18 +474,18 @@
MPIU_Assert(sreq->dev.iov_count >= 1 && sreq->dev.iov[0].MPID_IOV_LEN > 0);
if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
- if (SENDQ_EMPTY(vc_tcp->send_queue)) {
+ if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
/* this will be the first send on the queue: queue it and set the write flag on the pollfd */
- SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
SET_PLFD(vc_tcp);
} else {
/* there are other sends in the queue before this one: try to send from the queue */
- SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
} else {
- SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
}
*sreq_ptr = sreq;
@@ -526,7 +521,7 @@
if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
if (MPID_nem_tcp_vc_is_connected(vc_tcp))
{
- if (SENDQ_EMPTY(vc_tcp->send_queue))
+ if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
{
MPID_IOV iov[2];
@@ -633,21 +628,21 @@
sreq->dev.iov_offset = 0;
if (MPID_nem_tcp_vc_send_paused(vc_tcp)) {
- SENDQ_ENQUEUE(&vc_tcp->paused_send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->paused_send_queue, sreq);
} else {
if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
- if (SENDQ_EMPTY(vc_tcp->send_queue)) {
+ if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
/* this will be the first send on the queue: queue it and set the write flag on the pollfd */
- SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
SET_PLFD(vc_tcp);
} else {
/* there are other sends in the queue before this one: try to send from the queue */
- SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
} else {
- SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
}
}
@@ -690,7 +685,7 @@
if (!MPID_nem_tcp_vc_send_paused(vc_tcp)) {
if (MPID_nem_tcp_vc_is_connected(vc_tcp))
{
- if (SENDQ_EMPTY(vc_tcp->send_queue))
+ if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue))
{
CHECK_EINTR(offset, writev(vc_tcp->sc->fd, iov, iov_n));
if (offset == 0) {
@@ -785,21 +780,21 @@
sreq->dev.iov_offset = 0;
if (MPID_nem_tcp_vc_send_paused(vc_tcp)) {
- SENDQ_ENQUEUE(&vc_tcp->paused_send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->paused_send_queue, sreq);
} else {
if (MPID_nem_tcp_vc_is_connected(vc_tcp)) {
- if (SENDQ_EMPTY(vc_tcp->send_queue)) {
+ if (MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
/* this will be the first send on the queue: queue it and set the write flag on the pollfd */
- SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
SET_PLFD(vc_tcp);
} else {
/* there are other sends in the queue before this one: try to send from the queue */
- SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
mpi_errno = MPID_nem_tcp_send_queued(vc, &vc_tcp->send_queue);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
} else {
- SENDQ_ENQUEUE(&vc_tcp->send_queue, sreq);
+ MPIDI_CH3I_Sendq_enqueue(&vc_tcp->send_queue, sreq);
}
}
@@ -828,16 +823,16 @@
an error condition and we just want to mark them as complete */
/* send queue */
- while (!SENDQ_EMPTY(vc_tcp->send_queue)) {
- SENDQ_DEQUEUE(&vc_tcp->send_queue, &req);
+ while (!MPIDI_CH3I_Sendq_empty(vc_tcp->send_queue)) {
+ MPIDI_CH3I_Sendq_dequeue(&vc_tcp->send_queue, &req);
req->status.MPI_ERROR = req_errno;
MPIDI_CH3U_Request_complete(req);
}
/* paused send queue */
- while (!SENDQ_EMPTY(vc_tcp->paused_send_queue)) {
- SENDQ_DEQUEUE(&vc_tcp->paused_send_queue, &req);
+ while (!MPIDI_CH3I_Sendq_empty(vc_tcp->paused_send_queue)) {
+ MPIDI_CH3I_Sendq_dequeue(&vc_tcp->paused_send_queue, &req);
req->status.MPI_ERROR = req_errno;
MPIDI_CH3U_Request_complete(req);
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_debug.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_debug.c 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_debug.c 2011-02-23 21:10:27 UTC (rev 8031)
@@ -65,8 +65,8 @@
fprintf(stream, "..VC ptr=%p pg_rank=%d state=%s:\n", vc, vc->pg_rank, vc_state_to_str(vc->state));
if (vc_ch->is_local) {
- fprintf(stream, "....CH3_NORMAL_QUEUE active_send\n");
- sreq = MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE];
+ fprintf(stream, "....shm_active_send\n");
+ sreq = MPIDI_CH3I_shm_active_send;
if (sreq) {
fprintf(stream, ".... sreq=%p ctx=%#x rank=%d tag=%d\n", sreq,
sreq->dev.match.parts.context_id,
@@ -74,8 +74,8 @@
sreq->dev.match.parts.tag);
}
- fprintf(stream, "....CH3_NORMAL_QUEUE queue (head-to-tail)\n");
- sreq = MPIDI_CH3I_SendQ_head(CH3_NORMAL_QUEUE);
+ fprintf(stream, "....shm send queue (head-to-tail)\n");
+ sreq = MPIDI_CH3I_Sendq_head(MPIDI_CH3I_shm_sendq);
i = 0;
while (sreq != NULL) {
fprintf(stream, "....[%d] sreq=%p ctx=%#x rank=%d tag=%d\n", i, sreq,
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_isend.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_isend.c 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_isend.c 2011-02-23 21:10:27 UTC (rev 8031)
@@ -49,7 +49,7 @@
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
in_cs = TRUE;
- if (MPIDI_CH3I_SendQ_empty (CH3_NORMAL_QUEUE))
+ if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq))
{
MPIU_DBG_MSG_D (CH3_CHANNEL, VERBOSE, "iSend %d", (int) hdr_sz);
mpi_errno = MPID_nem_mpich2_send_header (hdr, hdr_sz, vc, &again);
@@ -103,12 +103,12 @@
sreq->ch.noncontig = FALSE;
sreq->ch.vc = vc;
- if (MPIDI_CH3I_SendQ_empty(CH3_NORMAL_QUEUE)) {
- MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
+ if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq)) {
+ MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
} else {
/* this is not the first send on the queue, enqueue it then
check to see if we can send any now */
- MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
+ MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
mpi_errno = MPIDI_CH3I_Shm_send_progress();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_isendv.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_isendv.c 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_isendv.c 2011-02-23 21:10:27 UTC (rev 8031)
@@ -65,7 +65,7 @@
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
in_cs = TRUE;
- if (MPIDI_CH3I_SendQ_empty (CH3_NORMAL_QUEUE))
+ if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq))
{
MPID_IOV *remaining_iov = iov;
int remaining_n_iov = n_iov;
@@ -105,9 +105,9 @@
sreq->dev.iov_count = remaining_n_iov;
sreq->ch.noncontig = FALSE;
sreq->ch.vc = vc;
- MPIDI_CH3I_SendQ_enqueue (sreq, CH3_NORMAL_QUEUE);
- MPIU_Assert (MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] == NULL);
- MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq;
+ MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
+ MPIU_Assert (MPIDI_CH3I_shm_active_send == NULL);
+ MPIDI_CH3I_shm_active_send = sreq;
MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, " enqueued");
}
else
@@ -133,9 +133,9 @@
sreq->dev.iov_offset = 0;
sreq->ch.noncontig = FALSE;
sreq->ch.vc = vc;
- MPIDI_CH3I_SendQ_enqueue (sreq, CH3_NORMAL_QUEUE);
- MPIU_Assert (MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] == NULL);
- MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq;
+ MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
+ MPIU_Assert (MPIDI_CH3I_shm_active_send == NULL);
+ MPIDI_CH3I_shm_active_send = sreq;
MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, ".... reloaded and enqueued");
}
else
@@ -167,7 +167,7 @@
/* this is not the first send on the queue, enqueue it then
check to see if we can send any now */
- MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
+ MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
mpi_errno = MPIDI_CH3I_Shm_send_progress();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_istartmsg.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_istartmsg.c 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_istartmsg.c 2011-02-23 21:10:27 UTC (rev 8031)
@@ -52,7 +52,7 @@
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
in_cs = 1;
- if (MPIDI_CH3I_SendQ_empty (CH3_NORMAL_QUEUE))
+ if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq))
/* MT */
{
MPIU_DBG_MSG_D (CH3_CHANNEL, VERBOSE, "iStartMsg %d", (int) hdr_sz);
@@ -102,12 +102,12 @@
sreq->ch.vc = vc;
sreq->dev.OnDataAvail = 0;
- if (MPIDI_CH3I_SendQ_empty(CH3_NORMAL_QUEUE)) {
- MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
+ if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq)) {
+ MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
} else {
/* this is not the first send on the queue, enqueue it then
check to see if we can send any now */
- MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
+ MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
/* FIXME we are sometimes called from within the progress engine, we
* shouldn't be calling the progress engine again */
mpi_errno = MPIDI_CH3I_Shm_send_progress();
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_istartmsgv.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_istartmsgv.c 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_istartmsgv.c 2011-02-23 21:10:27 UTC (rev 8031)
@@ -74,7 +74,7 @@
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
in_cs = TRUE;
- if (MPIDI_CH3I_SendQ_empty (CH3_NORMAL_QUEUE))
+ if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq))
/* MT */
{
MPID_IOV *remaining_iov = iov;
@@ -136,9 +136,9 @@
sreq->dev.iov[0].MPID_IOV_BUF = (char *) &sreq->dev.pending_pkt;
sreq->dev.iov[0].MPID_IOV_LEN = iov[0].MPID_IOV_LEN;
}
- MPIDI_CH3I_SendQ_enqueue (sreq, CH3_NORMAL_QUEUE);
- MPIU_Assert (MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] == NULL);
- MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq;
+ MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
+ MPIU_Assert (MPIDI_CH3I_shm_active_send == NULL);
+ MPIDI_CH3I_shm_active_send = sreq;
}
}
else
@@ -168,12 +168,12 @@
sreq->ch.noncontig = FALSE;
sreq->ch.vc = vc;
- if (MPIDI_CH3I_SendQ_empty(CH3_NORMAL_QUEUE)) {
- MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
+ if (MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq)) {
+ MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
} else {
/* this is not the first send on the queue, enqueue it then
check to see if we can send any now */
- MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
+ MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
mpi_errno = MPIDI_CH3I_Shm_send_progress();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_progress.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_progress.c 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_progress.c 2011-02-23 21:10:27 UTC (rev 8031)
@@ -51,9 +51,8 @@
static volatile int sigusr1_count = 0;
static int my_sigusr1_count = 0;
-struct MPID_Request *MPIDI_CH3I_sendq_head[CH3_NUM_QUEUES] = {0};
-struct MPID_Request *MPIDI_CH3I_sendq_tail[CH3_NUM_QUEUES] = {0};
-struct MPID_Request *MPIDI_CH3I_active_send[CH3_NUM_QUEUES] = {0};
+MPIDI_CH3I_shm_sendq_t MPIDI_CH3I_shm_sendq = {NULL, NULL};
+struct MPID_Request *MPIDI_CH3I_shm_active_send = NULL;
static int pkt_NETMOD_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t *buflen, MPID_Request **rreqp);
@@ -97,7 +96,7 @@
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_SEND_PROGRESS);
- sreq = MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE];
+ sreq = MPIDI_CH3I_shm_active_send;
MPIU_DBG_STMT(CH3_CHANNEL, VERBOSE, {if (sreq) MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "Send: cont sreq");});
if (sreq)
{
@@ -139,7 +138,7 @@
}
else
{
- sreq = MPIDI_CH3I_SendQ_head (CH3_NORMAL_QUEUE);
+ sreq = MPIDI_CH3I_Sendq_head(MPIDI_CH3I_shm_sendq);
MPIU_DBG_STMT (CH3_CHANNEL, VERBOSE, {if (sreq) MPIU_DBG_MSG (CH3_CHANNEL, VERBOSE, "Send: new sreq ");});
if (!sreq->ch.noncontig)
@@ -153,7 +152,7 @@
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
if (!again)
{
- MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq;
+ MPIDI_CH3I_shm_active_send = sreq;
while (!again && n_iov > 0)
{
mpi_errno = MPID_nem_mpich2_sendv(&iov, &n_iov, sreq->ch.vc, &again);
@@ -176,7 +175,7 @@
&sreq->dev.pending_pkt, sreq->ch.header_sz, sreq->ch.vc, &again);
if (!again)
{
- MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq;
+ MPIDI_CH3I_shm_active_send = sreq;
while (!again && sreq->dev.segment_first < sreq->dev.segment_size)
{
MPID_nem_mpich2_send_seg(sreq->dev.segment_ptr, &sreq->dev.segment_first, sreq->dev.segment_size,
@@ -203,8 +202,8 @@
MPIDI_CH3U_Request_complete(sreq);
/* MT - clear the current active send before dequeuing/destroying the current request */
- MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = NULL;
- MPIDI_CH3I_SendQ_dequeue(CH3_NORMAL_QUEUE);
+ MPIDI_CH3I_shm_active_send = NULL;
+ MPIDI_CH3I_Sendq_dequeue(&MPIDI_CH3I_shm_sendq, &sreq);
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
}
else
@@ -215,8 +214,8 @@
if (complete)
{
- MPIDI_CH3I_SendQ_dequeue(CH3_NORMAL_QUEUE);
- MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = NULL;
+ MPIDI_CH3I_Sendq_dequeue(&MPIDI_CH3I_shm_sendq, &sreq);
+ MPIDI_CH3I_shm_active_send = NULL;
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, ".... complete");
}
}
@@ -420,7 +419,7 @@
/* make progress sending */
- if (MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] || MPIDI_CH3I_SendQ_head(CH3_NORMAL_QUEUE)) {
+ if (MPIDI_CH3I_shm_active_send || MPIDI_CH3I_Sendq_head(MPIDI_CH3I_shm_sendq)) {
mpi_errno = MPIDI_CH3I_Shm_send_progress();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
} else {
@@ -781,12 +780,10 @@
# endif
MPIU_THREAD_CHECK_END
- for (i = 0; i < CH3_NUM_QUEUES; ++i)
- {
- MPIDI_CH3I_sendq_head[i] = NULL;
- MPIDI_CH3I_sendq_tail[i] = NULL;
- }
-
+ MPIDI_CH3I_shm_sendq.head = NULL;
+ MPIDI_CH3I_shm_sendq.tail = NULL;
+ MPIDI_CH3I_shm_active_send = NULL;
+
/* Initialize the code to handle incoming packets */
if (PKTARRAY_SIZE <= MPIDI_NEM_PKT_END) {
MPIU_ERR_SETFATALANDJUMP(mpi_errno, MPI_ERR_INTERN, "**ch3|pktarraytoosmall");
@@ -895,7 +892,7 @@
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_COMPLETE_SENDQ_WITH_ERROR);
- req = MPIDI_CH3I_sendq_head[CH3_NORMAL_QUEUE];
+ req = MPIDI_CH3I_shm_sendq.head;
prev = NULL;
while (req) {
if (req->ch.vc == vc) {
@@ -903,9 +900,9 @@
if (prev)
prev->dev.next = next;
else
- MPIDI_CH3I_sendq_head[CH3_NORMAL_QUEUE] = next;
- if (MPIDI_CH3I_sendq_tail[CH3_NORMAL_QUEUE] == req)
- MPIDI_CH3I_sendq_tail[CH3_NORMAL_QUEUE] = prev;
+ MPIDI_CH3I_shm_sendq.head = next;
+ if (MPIDI_CH3I_shm_sendq.tail == req)
+ MPIDI_CH3I_shm_sendq.tail = prev;
req->status.MPI_ERROR = MPI_SUCCESS;
MPIU_ERR_SET1(req->status.MPI_ERROR, MPI_ERR_OTHER, "**comm_fail", "**comm_fail %d", vc->pg_rank);
Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3i_eagernoncontig.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3i_eagernoncontig.c 2011-02-23 20:38:55 UTC (rev 8030)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3i_eagernoncontig.c 2011-02-23 21:10:27 UTC (rev 8031)
@@ -30,7 +30,7 @@
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
- if (!MPIDI_CH3I_SendQ_empty(CH3_NORMAL_QUEUE)) /* MT */
+ if (!MPIDI_CH3I_Sendq_empty(MPIDI_CH3I_shm_sendq)) /* MT */
{
/* send queue is not empty, enqueue the request then check to
see if we can send any now */
@@ -42,7 +42,7 @@
sreq->ch.header_sz = hdr_sz;
sreq->ch.vc = vc;
- MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
+ MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
mpi_errno = MPIDI_CH3I_Shm_send_progress();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
goto fn_exit;
@@ -66,10 +66,10 @@
else
{
/* part of message was sent, make this req an active send */
- MPIU_Assert(MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] == NULL);
- MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] = sreq;
+ MPIU_Assert(MPIDI_CH3I_shm_active_send == NULL);
+ MPIDI_CH3I_shm_active_send = sreq;
}
- MPIDI_CH3I_SendQ_enqueue(sreq, CH3_NORMAL_QUEUE);
+ MPIDI_CH3I_Sendq_enqueue(&MPIDI_CH3I_shm_sendq, sreq);
goto fn_exit;
}
More information about the mpich2-commits mailing list