[mpich2-commits] r7842 - in mpich2/trunk/src/mpid/ch3/channels/nemesis: include nemesis/src src

buntinas at mcs.anl.gov buntinas at mcs.anl.gov
Wed Jan 26 12:49:39 CST 2011


Author: buntinas
Date: 2011-01-26 12:49:39 -0600 (Wed, 26 Jan 2011)
New Revision: 7842

Modified:
   mpich2/trunk/src/mpid/ch3/channels/nemesis/include/mpidi_ch3_impl.h
   mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_finalize.c
   mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_init.c
   mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_dma.c
   mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_shm.c
   mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_vmsplice.c
   mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_progress.c
Log:
shm fixes to handle failed processes.  We no longer poll forever in LMT waiting for other side to finish copying a buffer.  We now clean up LMT operations when a process fails.  We now complete requests in shm send queue when a VC fails.

Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/include/mpidi_ch3_impl.h
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/include/mpidi_ch3_impl.h	2011-01-26 18:41:03 UTC (rev 7841)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/include/mpidi_ch3_impl.h	2011-01-26 18:49:39 UTC (rev 7842)
@@ -82,6 +82,7 @@
 int MPIDI_CH3I_Progress_init(void);
 int MPIDI_CH3I_Progress_finalize(void);
 int MPIDI_CH3I_Shm_send_progress(void);
+int MPIDI_CH3I_Complete_sendq_with_error(MPIDI_VC_t * vc);
 
 int MPIDI_CH3I_SendNoncontig( MPIDI_VC_t *vc, MPID_Request *sreq, void *header, MPIDI_msg_sz_t hdr_sz );
 
@@ -91,6 +92,7 @@
 int MPID_nem_lmt_shm_handle_cookie(MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV cookie);
 int MPID_nem_lmt_shm_done_send(MPIDI_VC_t *vc, MPID_Request *req);
 int MPID_nem_lmt_shm_done_recv(MPIDI_VC_t *vc, MPID_Request *req);
+int MPID_nem_lmt_shm_vc_terminated(MPIDI_VC_t *vc);
 
 int MPID_nem_lmt_dma_initiate_lmt(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *rts_pkt, MPID_Request *req);
 int MPID_nem_lmt_dma_start_recv(MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV s_cookie);
@@ -98,6 +100,7 @@
 int MPID_nem_lmt_dma_handle_cookie(MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV cookie);
 int MPID_nem_lmt_dma_done_send(MPIDI_VC_t *vc, MPID_Request *req);
 int MPID_nem_lmt_dma_done_recv(MPIDI_VC_t *vc, MPID_Request *req);
+int MPID_nem_lmt_dma_vc_terminated(MPIDI_VC_t *vc);
 
 int MPID_nem_lmt_vmsplice_initiate_lmt(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *rts_pkt, MPID_Request *req);
 int MPID_nem_lmt_vmsplice_start_recv(MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV s_cookie);
@@ -105,6 +108,7 @@
 int MPID_nem_lmt_vmsplice_handle_cookie(MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV cookie);
 int MPID_nem_lmt_vmsplice_done_send(MPIDI_VC_t *vc, MPID_Request *req);
 int MPID_nem_lmt_vmsplice_done_recv(MPIDI_VC_t *vc, MPID_Request *req);
+int MPID_nem_lmt_vmsplice_vc_terminated(MPIDI_VC_t *vc);
 
 int MPID_nem_handle_pkt(MPIDI_VC_t *vc, char *buf, MPIDI_msg_sz_t buflen);
 
@@ -167,6 +171,7 @@
     int (* lmt_handle_cookie)(struct MPIDI_VC *vc, struct MPID_Request *req, MPID_IOV cookie);
     int (* lmt_done_send)(struct MPIDI_VC *vc, struct MPID_Request *req);
     int (* lmt_done_recv)(struct MPIDI_VC *vc, struct MPID_Request *req);
+    int (* lmt_vc_terminated)(struct MPIDI_VC *vc);
 
     /* LMT shared memory copy-buffer ptr */
     struct MPID_nem_copy_buf *lmt_copy_buf;

Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_finalize.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_finalize.c	2011-01-26 18:41:03 UTC (rev 7841)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_finalize.c	2011-01-26 18:49:39 UTC (rev 7842)
@@ -72,6 +72,13 @@
 
     MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_VC_TERMINATE);
 
+    /* NOTE(review): lmt_vc_terminated may be NULL (LMT_NONE build, non-local VCs) -- guard the call */
+    if (((MPIDI_CH3I_VC *)vc->channel_private)->lmt_vc_terminated) {
+        mpi_errno = ((MPIDI_CH3I_VC *)vc->channel_private)->lmt_vc_terminated(vc);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    }
+    mpi_errno = MPIDI_CH3I_Complete_sendq_with_error(vc);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
     mpi_errno = MPIU_SHMW_Hnd_finalize(&(((MPIDI_CH3I_VC *)vc->channel_private)->lmt_copy_buf_handle));
     if(mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
     mpi_errno = MPIU_SHMW_Hnd_finalize(&(((MPIDI_CH3I_VC *)vc->channel_private)->lmt_recv_copy_buf_handle));

Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_init.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_init.c	2011-01-26 18:41:03 UTC (rev 7841)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_init.c	2011-01-26 18:49:39 UTC (rev 7842)
@@ -413,6 +413,7 @@
         vc_ch->lmt_handle_cookie = MPID_nem_lmt_shm_handle_cookie;
         vc_ch->lmt_done_send     = MPID_nem_lmt_shm_done_send;
         vc_ch->lmt_done_recv     = MPID_nem_lmt_shm_done_recv;
+        vc_ch->lmt_vc_terminated = MPID_nem_lmt_shm_vc_terminated;
 #elif MPID_NEM_LOCAL_LMT_IMPL == MPID_NEM_LOCAL_LMT_DMA
         vc_ch->lmt_initiate_lmt  = MPID_nem_lmt_dma_initiate_lmt;
         vc_ch->lmt_start_recv    = MPID_nem_lmt_dma_start_recv;
@@ -420,6 +421,7 @@
         vc_ch->lmt_handle_cookie = MPID_nem_lmt_dma_handle_cookie;
         vc_ch->lmt_done_send     = MPID_nem_lmt_dma_done_send;
         vc_ch->lmt_done_recv     = MPID_nem_lmt_dma_done_recv;
+        vc_ch->lmt_vc_terminated = MPID_nem_lmt_dma_vc_terminated;
 #elif MPID_NEM_LOCAL_LMT_IMPL == MPID_NEM_LOCAL_LMT_VMSPLICE
         vc_ch->lmt_initiate_lmt  = MPID_nem_lmt_vmsplice_initiate_lmt;
         vc_ch->lmt_start_recv    = MPID_nem_lmt_vmsplice_start_recv;
@@ -427,6 +429,7 @@
         vc_ch->lmt_handle_cookie = MPID_nem_lmt_vmsplice_handle_cookie;
         vc_ch->lmt_done_send     = MPID_nem_lmt_vmsplice_done_send;
         vc_ch->lmt_done_recv     = MPID_nem_lmt_vmsplice_done_recv;
+        vc_ch->lmt_vc_terminated = MPID_nem_lmt_vmsplice_vc_terminated;
 #elif MPID_NEM_LOCAL_LMT_IMPL == MPID_NEM_LOCAL_LMT_NONE
         vc_ch->lmt_initiate_lmt  = NULL;
         vc_ch->lmt_start_recv    = NULL;
@@ -434,6 +437,7 @@
         vc_ch->lmt_handle_cookie = NULL;
         vc_ch->lmt_done_send     = NULL;
         vc_ch->lmt_done_recv     = NULL;
+        vc_ch->lmt_vc_terminated = NULL;
 #else
 #  error Must select a valid local LMT implementation!
 #endif
@@ -464,6 +468,7 @@
         vc_ch->lmt_handle_cookie = NULL;
         vc_ch->lmt_done_send     = NULL;
         vc_ch->lmt_done_recv     = NULL;
+        vc_ch->lmt_vc_terminated = NULL;
 
         /* FIXME: DARIUS set these to default for now */
         vc_ch->iStartContigMsg = NULL;

Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_dma.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_dma.c	2011-01-26 18:41:03 UTC (rev 7841)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_dma.c	2011-01-26 18:49:39 UTC (rev 7842)
@@ -584,10 +584,28 @@
                 }
                 break;
             case KNEM_STATUS_FAILED:
-                /* return an error for now, there might be something that's
-                   less heavy-handed to do later */
-                MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**recv_status",
-                                     "**recv_status %d", *cur->status_p);
+                /* set the error status for the request, complete it then dequeue the entry */
+                cur->req->status.MPI_ERROR = MPI_SUCCESS;
+                MPIU_ERR_SET1(cur->req->status.MPI_ERROR, MPI_ERR_OTHER, "**recv_status", "**recv_status %d", *cur->status_p);
+
+                MPIDI_CH3U_Request_complete(cur->req);
+
+                if (cur == outstanding_head) {
+                    outstanding_head = cur->next;
+                    prev = NULL;
+                    free_me = cur;
+                    cur = cur->next;
+                }
+                else {
+                    prev->next = cur->next;
+                    free_me = cur;
+                    cur = cur->next;
+                }
+
+                if (free_me) MPIU_Free(free_me);
+                --MPID_nem_local_lmt_pending;
+                continue;
+                
                 break;
             case KNEM_STATUS_PENDING:
                 /* nothing to do here */
@@ -609,6 +627,26 @@
     goto fn_exit;
 }
 
+#undef FUNCNAME
+#define FUNCNAME MPID_nem_lmt_dma_vc_terminated
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_nem_lmt_dma_vc_terminated(MPIDI_VC_t *vc)
+{
+    int mpi_errno = MPI_SUCCESS;
+    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_DMA_VC_TERMINATED);
+
+    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_DMA_VC_TERMINATED);
+
+    /* Do nothing.  KNEM should abort any ops with dead processes. */
+
+ fn_exit:
+    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_DMA_VC_TERMINATED);
+    return mpi_errno;
+ fn_fail:
+    goto fn_exit;
+}
+
 /* --------------------------------------------------------------------------
    The functions below are nops, stubs that might be used in later versions of
    this code.

Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_shm.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_shm.c	2011-01-26 18:41:03 UTC (rev 7841)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_shm.c	2011-01-26 18:49:39 UTC (rev 7842)
@@ -120,6 +120,8 @@
 
 /* number of iterations to wait for the other side to process a buffer */
 #define LMT_POLLS_BEFORE_YIELD 1000
+/* how many times we'll call yield before we give up waiting */
+#define LMT_YIELDS_BEFORE_GIVING_UP 1000
 
 #undef FUNCNAME
 #define FUNCNAME MPID_nem_lmt_shm_initiate_lmt
@@ -379,7 +381,7 @@
     {
         /* copy buf is owned by the remote side */
         /* remote side chooses next transfer */
-        int i = 0;
+        int p = 0, y = 0;
 
         OPA_read_barrier();
         
@@ -388,12 +390,17 @@
             
         while (copy_buf->owner_info.val.remote_req_id == MPI_REQUEST_NULL)
         {
-            if (i == LMT_POLLS_BEFORE_YIELD)
+            if (p == LMT_POLLS_BEFORE_YIELD)
             {
-                COND_Yield();
-                i = 0;
+                if (y < LMT_YIELDS_BEFORE_GIVING_UP) {
+                    COND_Yield();
+                    p = 0;
+                    ++y;
+                } else {
+                    goto fn_exit;
+                }
             }
-            ++i;
+            ++p;
         }
 
         OPA_read_barrier();
@@ -473,19 +480,20 @@
 
     do
     {
-        int i;
+        int p, y;
         /* If the buffer is full, wait.  If the receiver is actively
            working on this transfer, yield the processor and keep
            waiting, otherwise wait for a bounded amount of time. */
-        i = 0;
+        p = y = 0;
         while (copy_buf->len[buf_num].val != 0)
         {
-            if (i == LMT_POLLS_BEFORE_YIELD)
+            if (p == LMT_POLLS_BEFORE_YIELD)
             {
-                if (copy_buf->receiver_present.val)
+                if (copy_buf->receiver_present.val && y < LMT_YIELDS_BEFORE_GIVING_UP)
                 {
                     COND_Yield();
-                    i = 0;
+                    p = 0;
+                    ++y;
                 }
                 else
                 {
@@ -498,7 +506,7 @@
                 }
             }
 
-            ++i;
+            ++p;
         }
 
         OPA_read_write_barrier();
@@ -572,18 +580,20 @@
 
     do
     {
+        int p, y;
         /* If the buffer is empty, wait.  If the sender is actively
            working on this transfer, yield the processor and keep
            waiting, otherwise wait for a bounded amount of time. */
-        i = 0;
+        p = y = 0;
         while ((len = copy_buf->len[buf_num].val) == 0)
         {
-            if (i == LMT_POLLS_BEFORE_YIELD)
+            if (p == LMT_POLLS_BEFORE_YIELD)
             {
-                if (copy_buf->sender_present.val)
+                if (copy_buf->sender_present.val && y < LMT_YIELDS_BEFORE_GIVING_UP)
                 {
                     COND_Yield();
-                    i = 0;
+                    p = 0;
+                    ++y;
                 }
                 else
                 {
@@ -597,7 +607,7 @@
                 }
             }
 
-            ++i;
+            ++p;
         }
 
         OPA_read_barrier();
@@ -621,7 +631,7 @@
             MPIU_Assert(last - first > surfeit);
 
             MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "freed previous buffer");
-       }
+        }
 
         if (last < expected_last)
         {
@@ -744,6 +754,10 @@
         if (vc_ch->lmt_active_lmt == NULL)
         {
             /* couldn't find an appropriate request, try again later */
+            
+            if (LMT_SHM_Q_EMPTY(vc_ch->lmt_queue))
+                *done = TRUE; /* There's nothing in the queue.  VC
+                                 must have terminated */
             goto fn_exit;
         }
     }
@@ -817,6 +831,43 @@
 }
 
 #undef FUNCNAME
+#define FUNCNAME MPID_nem_lmt_shm_vc_terminated
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_nem_lmt_shm_vc_terminated(MPIDI_VC_t *vc)
+{
+    int mpi_errno = MPI_SUCCESS;
+    MPIDI_CH3I_VC *vc_ch = (MPIDI_CH3I_VC *)vc->channel_private;
+    MPID_nem_lmt_shm_wait_element_t *we;
+    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_SHM_VC_TERMINATED);
+
+    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_SHM_VC_TERMINATED);
+
+    /* We empty the vc queue, but don't remove the vc from the global
+       list.  That will eventually happen when lmt_shm_progress()
+       calls lmt_shm_progress_vc() and it finds an empty queue. */
+
+    if (vc_ch->lmt_active_lmt) {
+        MPIDI_CH3U_Request_complete(vc_ch->lmt_active_lmt->req);
+        MPIU_Free(vc_ch->lmt_active_lmt);
+        vc_ch->lmt_active_lmt = NULL;
+    }
+
+    while (!LMT_SHM_Q_EMPTY(vc_ch->lmt_queue)) {
+        LMT_SHM_Q_DEQUEUE(&vc_ch->lmt_queue, &we);
+        MPIDI_CH3U_Request_complete(we->req);
+        MPIU_Free(we);
+    }
+
+ fn_exit:
+    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_SHM_VC_TERMINATED);
+    return mpi_errno;
+ fn_fail:
+    goto fn_exit;
+}
+
+
+#undef FUNCNAME
 #define FUNCNAME MPID_nem_allocate_shm_region
 #undef FCNAME
 #define FCNAME MPIDI_QUOTE(FUNCNAME)

Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_vmsplice.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_vmsplice.c	2011-01-26 18:41:03 UTC (rev 7841)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/src/mpid_nem_lmt_vmsplice.c	2011-01-26 18:49:39 UTC (rev 7842)
@@ -316,12 +316,14 @@
                 mpi_errno = do_readv(cur->req, cur->pipe_fd, cur->req->dev.iov,
                                      &cur->req->dev.iov_offset,
                                      &cur->req->dev.iov_count, &complete);
+                /* FIXME: set the error status of the req and complete it, rather than POP */
                 if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                 break;
             case MPIDI_REQUEST_TYPE_SEND:
                 mpi_errno = do_vmsplice(cur->req, cur->pipe_fd, cur->req->dev.iov,
                                         &cur->req->dev.iov_offset,
                                         &cur->req->dev.iov_count, &complete);
+                /* FIXME: set the error status of the req and complete it, rather than POP */
                 if (mpi_errno) MPIU_ERR_POP(mpi_errno);
                 break;
             default:
@@ -411,6 +413,30 @@
     MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_START_SEND);
     return mpi_errno;
 }
+
+#undef FUNCNAME
+#define FUNCNAME MPID_nem_lmt_vmsplice_vc_terminated
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_nem_lmt_vmsplice_vc_terminated(MPIDI_VC_t *vc)
+{
+    int mpi_errno = MPI_SUCCESS;
+    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_VMSPLICE_VC_TERMINATED);
+
+    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_VMSPLICE_VC_TERMINATED);
+
+    /* FIXME: need to handle the case where a VC is terminated due to
+       a process failure.  We need to remove any outstanding LMT ops
+       for this VC. */
+
+ fn_exit:
+    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_LMT_VMSPLICE_VC_TERMINATED);
+    return mpi_errno;
+ fn_fail:
+    goto fn_exit;
+}
+
+
 /* --------------------------------------------------------------------------
    The functions below are nops, stubs that might be used in later versions of
    this code.

Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_progress.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_progress.c	2011-01-26 18:41:03 UTC (rev 7841)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_progress.c	2011-01-26 18:49:39 UTC (rev 7842)
@@ -893,7 +893,51 @@
 }
 /* end MPIDI_CH3_Connection_terminate() */
 
+#undef FUNCNAME
+#define FUNCNAME MPIDI_CH3I_Complete_sendq_with_error
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPIDI_CH3I_Complete_sendq_with_error(MPIDI_VC_t * vc)
+{
+    int mpi_errno = MPI_SUCCESS;
+    MPID_Request *req, *prev;
+    MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_COMPLETE_SENDQ_WITH_ERROR);
 
+    MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_COMPLETE_SENDQ_WITH_ERROR);
+
+    req = MPIDI_CH3I_sendq_head[CH3_NORMAL_QUEUE];
+    prev = NULL;
+    while (req) {
+        if (req->ch.vc == vc) {
+            MPID_Request *next = req->dev.next;
+            if (prev)
+                prev->dev.next = next;
+            else
+                MPIDI_CH3I_sendq_head[CH3_NORMAL_QUEUE] = next;
+            if (MPIDI_CH3I_sendq_tail[CH3_NORMAL_QUEUE] == req)
+                MPIDI_CH3I_sendq_tail[CH3_NORMAL_QUEUE] = prev;
+
+            req->status.MPI_ERROR = MPI_SUCCESS;
+            MPIU_ERR_SET1(req->status.MPI_ERROR, MPI_ERR_OTHER, "**comm_fail", "**comm_fail %d", vc->pg_rank);
+            
+            MPID_Request_release(req); /* ref count was incremented when added to queue */
+            MPIDI_CH3U_Request_complete(req);
+            req = next;
+        } else {
+            prev = req;
+            req = req->dev.next;
+        }
+    }
+
+ fn_exit:
+    MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_COMPLETE_SENDQ_WITH_ERROR);
+    return mpi_errno;
+ fn_fail:
+    goto fn_exit;
+}
+
+
+
 #undef FUNCNAME
 #define FUNCNAME pkt_NETMOD_handler
 #undef FCNAME



More information about the mpich2-commits mailing list