[mpich2-commits] r3944 - in mpich2/trunk: . src/include src/mpi/coll

balaji at mcs.anl.gov balaji at mcs.anl.gov
Thu Mar 5 15:27:15 CST 2009


Author: balaji
Date: 2009-03-05 15:27:15 -0600 (Thu, 05 Mar 2009)
New Revision: 3944

Modified:
   mpich2/trunk/configure.in
   mpich2/trunk/src/include/mpiimpl.h
   mpich2/trunk/src/mpi/coll/allgatherv.c
Log:
Committing the pipelined allgatherv algorithm from the Euro PVM/MPI '08 paper. Reviewed by buntinas.

Modified: mpich2/trunk/configure.in
===================================================================
--- mpich2/trunk/configure.in	2009-03-05 21:25:26 UTC (rev 3943)
+++ mpich2/trunk/configure.in	2009-03-05 21:27:15 UTC (rev 3944)
@@ -352,6 +352,16 @@
    AC_DEFINE(USE_SMP_COLLECTIVES,1,[define to enable SMP/multi-core aware collectives])
 fi
 
+dnl Pipelined collectives: on unless --disable-pipecoll (or --enable-pipecoll=no) is given
+AC_ARG_ENABLE(pipecoll,
+	[--enable-pipecoll - Enable support for pipelined collectives],
+	pipecoll=$enableval,
+	pipecoll=yes)
+
+if test "$pipecoll" = "yes" ; then
+   AC_DEFINE(USE_PIPE_COLLECTIVES,1,[define to enable pipelined collectives])
+fi
+
 dnl
 dnl The environment variable MPICH_DEBUGLIBNAME may be used to override the
 dnl default name of the library that the debugger will load to access the
@@ -6405,6 +6415,7 @@
 	  src/util/dbg/getfuncstack \
 	  src/util/osserv/Makefile \
 	  src/util/procmap/Makefile \
+	  src/util/other/Makefile \
           src/include/mpi.h \
 	  test/Makefile test/util/Makefile \
 	  test/basic/Makefile \

Modified: mpich2/trunk/src/include/mpiimpl.h
===================================================================
--- mpich2/trunk/src/include/mpiimpl.h	2009-03-05 21:25:26 UTC (rev 3943)
+++ mpich2/trunk/src/include/mpiimpl.h	2009-03-05 21:27:15 UTC (rev 3944)
@@ -3533,6 +3533,9 @@
 #define MPIR_GATHER_SHORT_MSG         2048  /* for intercommunicator scatter */
 #define MPIR_GATHERV_MIN_PROCS        32
 
+/* For pipelined collectives: byte size the pipelined allgatherv raises its chunk size to when the smallest contribution is below it */
+#define MPIR_ALLGATHERV_PIPELINE_MSGSIZE   32768
+
 /* Tags for point to point operations which implement collective operations */
 #define MPIR_BARRIER_TAG               1
 #define MPIR_BCAST_TAG                 2

Modified: mpich2/trunk/src/mpi/coll/allgatherv.c
===================================================================
--- mpich2/trunk/src/mpi/coll/allgatherv.c	2009-03-05 21:25:26 UTC (rev 3943)
+++ mpich2/trunk/src/mpi/coll/allgatherv.c	2009-03-05 21:27:15 UTC (rev 3944)
@@ -695,6 +695,116 @@
         MPIU_Free((char*)tmp_buf + recvtype_true_lb);
     }
 
+#ifdef USE_PIPE_COLLECTIVES
+    else {
+        /* Long message, or medium-size message with a non-power-of-two
+         * number of processes: ring algorithm, pipelined by moving each
+         * block around the ring in chunks of at most "min" elements. */
+        char *sbuf = NULL, *rbuf = NULL;       /* start of this step's send/recv chunk */
+        int soffset, roffset, sindex, rindex;  /* current block (rank) and offset in it */
+        int torecv, tosend, sendnow, recvnow;  /* elements outstanding / moved this step */
+        int min;                               /* pipeline chunk size, in elements */
+
+        if (sendbuf != MPI_IN_PLACE) {
+            /* First, load the "local" version in the recvbuf. */
+            mpi_errno = MPIR_Localcopy(sendbuf, sendcount, sendtype,
+                                       ((char *)recvbuf + displs[rank]*recvtype_extent),
+                                       recvcounts[rank], recvtype);
+            /* --BEGIN ERROR HANDLING-- */
+            if (mpi_errno) {
+                mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
+                return mpi_errno;
+            }
+            /* --END ERROR HANDLING-- */
+        }
+
+        left  = (comm_size + rank - 1) % comm_size;
+        right = (rank + 1) % comm_size;
+
+        /* Our own block is never received and right's own block is never sent
+         * to it.  NOTE(review): total_count is set outside this hunk. */
+        torecv = total_count - recvcounts[rank];
+        tosend = total_count - recvcounts[right];
+
+        /* Chunk size: the smallest contribution, raised to PIPELINE_MSGSIZE bytes
+         * worth of elements; the extent test avoids dividing by a zero extent. */
+        min = recvcounts[0];
+        for (i = 1; i < comm_size; i++)
+            if (min > recvcounts[i])
+                min = recvcounts[i];
+        if (recvtype_extent > 0 && min * recvtype_extent < MPIR_ALLGATHERV_PIPELINE_MSGSIZE)
+            min = MPIR_ALLGATHERV_PIPELINE_MSGSIZE / recvtype_extent;
+        /* Handle an extent larger than the pipeline size, or a zero minimum count. */
+        if (!min)
+            min = 1;
+
+        sindex = rank;
+        rindex = left;
+        soffset = 0;
+        roffset = 0;
+        while (tosend || torecv) { /* While we have data to send or receive */
+            sendnow = ((recvcounts[sindex] - soffset) > min) ? min : (recvcounts[sindex] - soffset);
+            recvnow = ((recvcounts[rindex] - roffset) > min) ? min : (recvcounts[rindex] - roffset);
+            /* Offset via char *, as the local copy above does, rather than on recvbuf itself. */
+            sbuf = (char *)recvbuf + ((displs[sindex] + soffset) * recvtype_extent);
+            rbuf = (char *)recvbuf + ((displs[rindex] + roffset) * recvtype_extent);
+
+            /* Communicate */
+            if (!sendnow && !recvnow) {
+                /* Nothing to do; possible if two consecutive processes contribute 0 bytes each. */
+            }
+            else if (!sendnow) { /* If there's no data to send, just do a recv call */
+                MPIU_Assert(recvnow > 0);
+                mpi_errno = MPIC_Recv(rbuf, recvnow, recvtype, left, MPIR_ALLGATHERV_TAG, comm, &status);
+                /* --BEGIN ERROR HANDLING-- */
+                if (mpi_errno) {
+                    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
+                    return mpi_errno;
+                }
+                /* --END ERROR HANDLING-- */
+                torecv -= recvnow;
+            }
+            else if (!recvnow) { /* If there's no data to receive, just do a send call */
+                MPIU_Assert(sendnow > 0);
+                mpi_errno = MPIC_Send(sbuf, sendnow, recvtype, right, MPIR_ALLGATHERV_TAG, comm);
+                /* --BEGIN ERROR HANDLING-- */
+                if (mpi_errno) {
+                    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
+                    return mpi_errno;
+                }
+                /* --END ERROR HANDLING-- */
+                tosend -= sendnow;
+            }
+            else { /* There's data to be sent and received */
+                MPIU_Assert(sendnow > 0);
+                MPIU_Assert(recvnow > 0);
+                mpi_errno = MPIC_Sendrecv(sbuf, sendnow, recvtype, right, MPIR_ALLGATHERV_TAG,
+                                          rbuf, recvnow, recvtype, left, MPIR_ALLGATHERV_TAG,
+                                          comm, &status);
+                /* --BEGIN ERROR HANDLING-- */
+                if (mpi_errno) {
+                    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
+                    return mpi_errno;
+                }
+                /* --END ERROR HANDLING-- */
+                tosend -= sendnow;
+                torecv -= recvnow;
+            }
+
+            /* Advance; a completed block is followed by the previous rank's block. */
+            soffset += sendnow;
+            roffset += recvnow;
+            if (soffset == recvcounts[sindex]) {
+                soffset = 0;
+                sindex = (sindex + comm_size - 1) % comm_size;
+            }
+            if (roffset == recvcounts[rindex]) {
+                roffset = 0;
+                rindex = (rindex + comm_size - 1) % comm_size;
+            }
+        }
+    }
+#else /* This case is retained as it's more tested; we should eventually discard it */
     else {  /* long message or medium-size message and non-power-of-two
              * no. of processes. Use ring algorithm. */
 
@@ -735,6 +845,7 @@
             jnext = (comm_size + jnext - 1) % comm_size;
         }
     }
+#endif /* USE_PIPE_COLLECTIVES */
 
   /* check if multiple threads are calling this collective function */
     MPIDU_ERR_CHECK_MULTIPLE_THREADS_EXIT( comm_ptr );



More information about the mpich2-commits mailing list