[mpich2-commits] r3944 - in mpich2/trunk: . src/include src/mpi/coll
balaji at mcs.anl.gov
balaji at mcs.anl.gov
Thu Mar 5 15:27:15 CST 2009
Author: balaji
Date: 2009-03-05 15:27:15 -0600 (Thu, 05 Mar 2009)
New Revision: 3944
Modified:
mpich2/trunk/configure.in
mpich2/trunk/src/include/mpiimpl.h
mpich2/trunk/src/mpi/coll/allgatherv.c
Log:
Committing the pipelined allgatherv algorithm from the Euro PVM/MPI '08 paper. Reviewed by buntinas.
Modified: mpich2/trunk/configure.in
===================================================================
--- mpich2/trunk/configure.in 2009-03-05 21:25:26 UTC (rev 3943)
+++ mpich2/trunk/configure.in 2009-03-05 21:27:15 UTC (rev 3944)
@@ -352,6 +352,16 @@
AC_DEFINE(USE_SMP_COLLECTIVES,1,[define to enable SMP/multi-core aware collectives])
fi
+dnl Check for pipelined collectives
+AC_ARG_ENABLE(pipecoll,
+ [--enable-pipecoll - Enable support for pipelined collectives],
+ pipecoll=$enableval,
+ pipecoll=yes)
+
+if test $pipecoll = "yes" ; then
+ AC_DEFINE(USE_PIPE_COLLECTIVES,1,[define to enable pipelined collectives])
+fi
+
dnl
dnl The environment variable MPICH_DEBUGLIBNAME may be used to override the
dnl default name of the library that the debugger will load to access the
@@ -6405,6 +6415,7 @@
src/util/dbg/getfuncstack \
src/util/osserv/Makefile \
src/util/procmap/Makefile \
+ src/util/other/Makefile \
src/include/mpi.h \
test/Makefile test/util/Makefile \
test/basic/Makefile \
Modified: mpich2/trunk/src/include/mpiimpl.h
===================================================================
--- mpich2/trunk/src/include/mpiimpl.h 2009-03-05 21:25:26 UTC (rev 3943)
+++ mpich2/trunk/src/include/mpiimpl.h 2009-03-05 21:27:15 UTC (rev 3944)
@@ -3533,6 +3533,9 @@
#define MPIR_GATHER_SHORT_MSG 2048 /* for intercommunicator scatter */
#define MPIR_GATHERV_MIN_PROCS 32
+/* For pipelined collectives */
+#define MPIR_ALLGATHERV_PIPELINE_MSGSIZE 32768
+
/* Tags for point to point operations which implement collective operations */
#define MPIR_BARRIER_TAG 1
#define MPIR_BCAST_TAG 2
Modified: mpich2/trunk/src/mpi/coll/allgatherv.c
===================================================================
--- mpich2/trunk/src/mpi/coll/allgatherv.c 2009-03-05 21:25:26 UTC (rev 3943)
+++ mpich2/trunk/src/mpi/coll/allgatherv.c 2009-03-05 21:27:15 UTC (rev 3944)
@@ -695,6 +695,116 @@
MPIU_Free((char*)tmp_buf + recvtype_true_lb);
}
+#ifdef USE_PIPE_COLLECTIVES
+ else {
+ /* long message or medium-size message and non-power-of-two
+ * no. of processes. Use ring algorithm. */
+ char * sbuf = NULL, * rbuf = NULL, * smsg, * rmsg;
+ int soffset, roffset;
+ int torecv, tosend, min;
+ int sendnow, recvnow;
+ int smsg_len, rmsg_len, sindex, rindex;
+
+ if (sendbuf != MPI_IN_PLACE) {
+ /* First, load the "local" version in the recvbuf. */
+ mpi_errno = MPIR_Localcopy(sendbuf, sendcount, sendtype,
+ ((char *)recvbuf + displs[rank]*recvtype_extent),
+ recvcounts[rank], recvtype);
+ /* --BEGIN ERROR HANDLING-- */
+ if (mpi_errno)
+ {
+ mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
+ return mpi_errno;
+ }
+ /* --END ERROR HANDLING-- */
+ }
+
+ left = (comm_size + rank - 1) % comm_size;
+ right = (rank + 1) % comm_size;
+
+ torecv = total_count - recvcounts[rank];
+ tosend = total_count - recvcounts[right];
+
+ min = recvcounts[0];
+ for (i = 1; i < comm_size; i++)
+ if (min > recvcounts[i])
+ min = recvcounts[i];
+ if (min * recvtype_extent < MPIR_ALLGATHERV_PIPELINE_MSGSIZE)
+ min = MPIR_ALLGATHERV_PIPELINE_MSGSIZE / recvtype_extent;
+ /* Handle the case where the datatype extent is larger than
+ * the pipeline size. */
+ if (!min)
+ min = 1;
+
+ sindex = rank;
+ rindex = left;
+ soffset = 0;
+ roffset = 0;
+ while (tosend || torecv) { /* While we have data to send or receive */
+ sendnow = ((recvcounts[sindex] - soffset) > min) ? min : (recvcounts[sindex] - soffset);
+ recvnow = ((recvcounts[rindex] - roffset) > min) ? min : (recvcounts[rindex] - roffset);
+ sbuf = recvbuf + ((displs[sindex] + soffset) * recvtype_extent);
+ rbuf = recvbuf + ((displs[rindex] + roffset) * recvtype_extent);
+
+ /* Communicate */
+ if (!sendnow && !recvnow) {
+ /* Don't do anything. This case is possible if two
+ * consecutive processes contribute 0 bytes each. */
+ }
+ else if (!sendnow) { /* If there's no data to send, just do a recv call */
+ MPIU_Assert(recvnow > 0);
+ mpi_errno = MPIC_Recv(rbuf, recvnow, recvtype, left, MPIR_ALLGATHERV_TAG, comm, &status);
+ /* --BEGIN ERROR HANDLING-- */
+ if (mpi_errno)
+ {
+ mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
+ return mpi_errno;
+ }
+ /* --END ERROR HANDLING-- */
+ torecv -= recvnow;
+ }
+ else if (!recvnow) { /* If there's no data to receive, just do a send call */
+ MPIU_Assert(sendnow > 0);
+ mpi_errno = MPIC_Send(sbuf, sendnow, recvtype, right, MPIR_ALLGATHERV_TAG, comm);
+ /* --BEGIN ERROR HANDLING-- */
+ if (mpi_errno)
+ {
+ mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
+ return mpi_errno;
+ }
+ /* --END ERROR HANDLING-- */
+ tosend -= sendnow;
+ }
+ else { /* There's data to be sent and received */
+ MPIU_Assert(sendnow > 0);
+ MPIU_Assert(recvnow > 0);
+ mpi_errno = MPIC_Sendrecv(sbuf, sendnow, recvtype, right, MPIR_ALLGATHERV_TAG,
+ rbuf, recvnow, recvtype, left, MPIR_ALLGATHERV_TAG,
+ comm, &status);
+ /* --BEGIN ERROR HANDLING-- */
+ if (mpi_errno)
+ {
+ mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
+ return mpi_errno;
+ }
+ /* --END ERROR HANDLING-- */
+ tosend -= sendnow;
+ torecv -= recvnow;
+ }
+
+ soffset += sendnow;
+ roffset += recvnow;
+ if (soffset == recvcounts[sindex]) {
+ soffset = 0;
+ sindex = (sindex + comm_size - 1) % comm_size;
+ }
+ if (roffset == recvcounts[rindex]) {
+ roffset = 0;
+ rindex = (rindex + comm_size - 1) % comm_size;
+ }
+ }
+ }
+#else /* This case is retained as it is better tested; we should eventually discard it */
else { /* long message or medium-size message and non-power-of-two
* no. of processes. Use ring algorithm. */
@@ -735,6 +845,7 @@
jnext = (comm_size + jnext - 1) % comm_size;
}
}
+#endif /* USE_PIPE_COLLECTIVES */
/* check if multiple threads are calling this collective function */
MPIDU_ERR_CHECK_MULTIPLE_THREADS_EXIT( comm_ptr );
More information about the mpich2-commits
mailing list