[mpich2-commits] r7791 - in mpich2/trunk: . src/include src/mpi/coll src/mpi/comm

goodell at mcs.anl.gov goodell at mcs.anl.gov
Thu Jan 20 16:09:17 CST 2011


Author: goodell
Date: 2011-01-20 16:09:17 -0600 (Thu, 20 Jan 2011)
New Revision: 7791

Modified:
   mpich2/trunk/CHANGES
   mpich2/trunk/src/include/mpiimpl.h
   mpich2/trunk/src/mpi/coll/ireduce.c
   mpich2/trunk/src/mpi/comm/commutil.c
Log:
default implementation of MPIX_Ireduce

This version only provides a binomial reduce algorithm that was lifted
from the existing MPI_Reduce code.

No reviewer.

Modified: mpich2/trunk/CHANGES
===================================================================
--- mpich2/trunk/CHANGES	2011-01-20 22:09:14 UTC (rev 7790)
+++ mpich2/trunk/CHANGES	2011-01-20 22:09:17 UTC (rev 7791)
@@ -6,7 +6,7 @@
    "MPIX_" functions (e.g. "MPIX_Ibcast").  All functions are provided and are
    available for device-level overrides, but only a subset of the functions have
    device-independent implementations at this time.  The current list of working
-   functions is: MPIX_Ibcast, MPIX_Ibarrier.
+   functions is: MPIX_Ibcast, MPIX_Ibarrier, MPIX_Ireduce.
 
 
 ===============================================================================

Modified: mpich2/trunk/src/include/mpiimpl.h
===================================================================
--- mpich2/trunk/src/include/mpiimpl.h	2011-01-20 22:09:14 UTC (rev 7790)
+++ mpich2/trunk/src/include/mpiimpl.h	2011-01-20 22:09:17 UTC (rev 7791)
@@ -3525,6 +3525,9 @@
 int MPIR_Ibcast_SMP(void *buffer, int count, MPI_Datatype datatype, int root, MPID_Comm *comm_ptr, MPID_Sched_t s);
 int MPIR_Ibarrier_intra(MPID_Comm *comm_ptr, MPID_Sched_t s);
 int MPIR_Ibarrier_inter(MPID_Comm *comm_ptr, MPID_Sched_t s);
+int MPIR_Ireduce_intra(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPID_Comm *comm_ptr, MPID_Sched_t s);
+int MPIR_Ireduce_inter(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPID_Comm *comm_ptr, MPID_Sched_t s);
+int MPIR_Ireduce_binomial(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPID_Comm *comm_ptr, MPID_Sched_t s);
 
 
 /* random initializers */

Modified: mpich2/trunk/src/mpi/coll/ireduce.c
===================================================================
--- mpich2/trunk/src/mpi/coll/ireduce.c	2011-01-20 22:09:14 UTC (rev 7790)
+++ mpich2/trunk/src/mpi/coll/ireduce.c	2011-01-20 22:09:17 UTC (rev 7791)
@@ -5,6 +5,7 @@
  */
 
 #include "mpiimpl.h"
+#include "collutil.h"
 
 /* -- Begin Profiling Symbol Block for routine MPIX_Ireduce */
 #if defined(HAVE_PRAGMA_WEAK)
@@ -25,6 +26,289 @@
 /* any non-MPI functions go here, especially non-static ones */
 
 #undef FUNCNAME
+#define FUNCNAME MPIR_Ireduce_binomial
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPIR_Ireduce_binomial(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPID_Comm *comm_ptr, MPID_Sched_t s)
+{
+    int mpi_errno = MPI_SUCCESS;
+    int comm_size, rank, is_commutative;
+    int mask, relrank, source, lroot;
+    MPI_Aint true_lb, true_extent, extent;
+    void *tmp_buf;
+    void *real_tmp_buf = NULL;
+    void *real_recvbuf = NULL;
+    MPIU_CHKPMEM_DECL(2);
+    MPIU_THREADPRIV_DECL;
+
+    MPIU_Assert(comm_ptr->comm_kind == MPID_INTRACOMM);
+
+    if (count == 0) return MPI_SUCCESS; /* nothing to schedule */
+
+    comm_size = comm_ptr->local_size;
+    rank = comm_ptr->rank;
+
+    /* set op_errno to 0 (per-thread field); NOTE(review): never read back here */
+    MPIU_THREADPRIV_GET;
+    MPIU_THREADPRIV_FIELD(op_errno) = 0;
+
+    /* Create a temporary buffer to receive each child's contribution */
+
+    MPIR_Type_get_true_extent_impl(datatype, &true_lb, &true_extent);
+    MPID_Datatype_get_extent_macro(datatype, extent);
+
+    is_commutative = MPIR_Op_is_commutative(op);
+
+    /* count*max(extent,true_extent) is the size of each buffer allocated
+     * below, so checking it once here covers both allocations */
+    /* NOTE(review): should this also account for the buffer base (buf+size)? */
+    MPID_Ensure_Aint_fits_in_pointer(count * MPIR_MAX(extent, true_extent));
+
+    MPIU_CHKPMEM_MALLOC(tmp_buf, void *, count*(MPIR_MAX(extent,true_extent)),
+                        mpi_errno, "temporary buffer");
+    /* adjust for potential negative lower bound in datatype */
+    real_tmp_buf = tmp_buf;
+    tmp_buf = (void *)((char*)tmp_buf - true_lb);
+
+    /* If I'm not the root, then my recvbuf may not be valid, therefore
+       I have to allocate a temporary one */
+    if (rank != root) {
+        MPIU_CHKPMEM_MALLOC(recvbuf, void *,
+                            count*(MPIR_MAX(extent,true_extent)),
+                            mpi_errno, "receive buffer");
+        real_recvbuf = recvbuf;
+        recvbuf = (void *)((char*)recvbuf - true_lb);
+    }
+
+    if ((rank != root) || (sendbuf != MPI_IN_PLACE)) {
+        /* could do this up front as an MPIR_Localcopy instead, but we'll defer
+         * it to the progress engine */
+        mpi_errno = MPID_Sched_copy(sendbuf, count, datatype, recvbuf, count, datatype, s);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+        mpi_errno = MPID_Sched_barrier(s);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    }
+
+    /* Binomial-tree reduce schedule; this algorithm is from MPICH-1. */
+
+    /* Here's the algorithm.  Relative to the root, look at the bit pattern in
+       my rank.  Starting from the right (lsb), if the bit is 1, send to
+       the node with that bit zero and exit; if the bit is 0, receive from the
+       node with that bit set and combine (as long as that node is within the
+       group)
+
+       Note that by receiving with source selection, we guarantee that we get
+       the same bits with the same input.  If we allowed the parent to receive
+       the children in any order, then timing differences could cause different
+       results (roundoff error, over/underflows in some cases, etc).
+
+       Because of the way these are ordered, if root is 0, then this is correct
+       for both commutative and non-commutative operations.  If root is not
+       0, then for non-commutative, we use a root of zero and then send
+       the result to the root.  To see this, note that the ordering is
+       mask = 1: (ab)(cd)(ef)(gh)            (odds send to evens)
+       mask = 2: ((ab)(cd))((ef)(gh))        (2,6 send to 0,4)
+       mask = 4: (((ab)(cd))((ef)(gh)))      (4 sends to 0)
+
+       Comments on buffering (NOTE(review): this function does no packing itself).
+       If the datatype is not contiguous, we still need to pass contiguous
+       data to the user routine.
+       In this case, we should make a copy of the data in some format,
+       and send/operate on that.
+
+       In general, we can't use MPI_PACK, because the alignment of that
+       is rather vague, and the data may not be re-usable.  What we actually
+       need is a "squeeze" operation that removes the skips.
+    */
+    mask = 0x1;
+    if (is_commutative)
+        lroot = root;
+    else
+        lroot = 0;  /* non-commutative: reduce to 0, forwarded to root below */
+    relrank = (rank - lroot + comm_size) % comm_size;  /* rank relative to lroot */
+
+    while (mask < comm_size) {
+        /* Receive */
+        if ((mask & relrank) == 0) {
+            source = (relrank | mask);
+            if (source < comm_size) {
+                source = (source + lroot) % comm_size;
+                mpi_errno = MPID_Sched_recv(tmp_buf, count, datatype, source, comm_ptr, s);
+                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+                mpi_errno = MPID_Sched_barrier(s);
+                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+                /* The sender is above us, so the received buffer must be
+                   the second argument (in the noncommutative case). */
+                if (is_commutative) {
+                    mpi_errno = MPID_Sched_reduce(tmp_buf, recvbuf, count, datatype, op, s);
+                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+                }
+                else {
+                    mpi_errno = MPID_Sched_reduce(recvbuf, tmp_buf, count, datatype, op, s);
+                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+                    mpi_errno = MPID_Sched_barrier(s);
+                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+                    mpi_errno = MPID_Sched_copy(tmp_buf, count, datatype, recvbuf, count, datatype, s); /* move combined value back into recvbuf */
+                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+                }
+                mpi_errno = MPID_Sched_barrier(s);
+                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+            }
+        }
+        else {
+            /* I've received all that I'm going to.  Send my result to
+               my parent */
+            source = ((relrank & (~ mask)) + lroot) % comm_size;
+            mpi_errno = MPID_Sched_send(recvbuf, count, datatype, source, comm_ptr, s);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+            mpi_errno = MPID_Sched_barrier(s);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+            break;
+        }
+        mask <<= 1;
+    }
+
+    if (!is_commutative && (root != 0))  /* forward rank 0's result to the real root */
+    {
+        if (rank == 0)
+        {
+            mpi_errno = MPID_Sched_send(recvbuf, count, datatype, root, comm_ptr, s);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+            mpi_errno = MPID_Sched_barrier(s);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+        }
+        else if (rank == root)
+        {
+            mpi_errno = MPID_Sched_recv(recvbuf, count, datatype, 0, comm_ptr, s);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+            mpi_errno = MPID_Sched_barrier(s);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+        }
+    }
+
+    if (real_tmp_buf) {  /* schedule MPIR_Sched_cb_free_buf on the temporary buffers */
+        mpi_errno = MPID_Sched_cb(&MPIR_Sched_cb_free_buf, real_tmp_buf, s);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    }
+    if (rank != root && real_recvbuf) {
+        mpi_errno = MPID_Sched_cb(&MPIR_Sched_cb_free_buf, real_recvbuf, s);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    }
+
+    MPIU_CHKPMEM_COMMIT();  /* NOTE(review): presumably the scheduled callbacks now free the buffers */
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    MPIU_CHKPMEM_REAP();
+    goto fn_exit;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPIR_Ireduce_intra
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPIR_Ireduce_intra(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPID_Comm *comm_ptr, MPID_Sched_t s)
+{
+    int mpi_errno = MPI_SUCCESS;
+
+    MPIU_Assert(comm_ptr->comm_kind == MPID_INTRACOMM);
+
+    /* TODO: add more algorithms; binomial is the only one used for now */
+    mpi_errno = MPIR_Ireduce_binomial(sendbuf, recvbuf, count, datatype, op, root, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPIR_Ireduce_inter
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPIR_Ireduce_inter(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPID_Comm *comm_ptr, MPID_Sched_t s)
+{
+    int mpi_errno = MPI_SUCCESS;
+    int rank;
+    MPI_Aint true_lb, true_extent, extent;
+    void *tmp_buf = NULL;
+    void *real_tmp_buf = NULL;
+    MPIU_CHKPMEM_DECL(1);
+
+    MPIU_Assert(comm_ptr->comm_kind == MPID_INTERCOMM);
+
+/*  Intercommunicator reduce (schedule-based).
+    Remote group does a local intracommunicator
+    reduce to rank 0. Rank 0 then sends data to root.
+*/
+
+    if (root == MPI_PROC_NULL) {
+        /* local processes other than root do nothing */
+        return MPI_SUCCESS;
+    }
+
+    if (root == MPI_ROOT) {
+        /* root receives data from rank 0 on remote group */
+        mpi_errno = MPID_Sched_recv(recvbuf, count, datatype, 0, comm_ptr, s);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+        mpi_errno = MPID_Sched_barrier(s);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    }
+    else {
+        /* remote group. Rank 0 allocates temporary buffer, does local
+           intracommunicator reduce (other ranks pass tmp_buf == NULL),
+           and then sends the data to root. */
+        rank = comm_ptr->rank;
+
+        if (rank == 0) {
+            MPIR_Type_get_true_extent_impl(datatype, &true_lb, &true_extent);
+
+            MPID_Datatype_get_extent_macro(datatype, extent);
+            /* check once that the allocation size used just below fits in
+             * a pointer-sized value */
+            /* Should MPIU_CHKPMEM_MALLOC do this? */
+            MPID_Ensure_Aint_fits_in_pointer(count * MPIR_MAX(extent, true_extent));
+            MPIU_CHKPMEM_MALLOC(tmp_buf, void *, count*(MPIR_MAX(extent,true_extent)), mpi_errno, "temporary buffer");
+            /* adjust for potential negative lower bound in datatype */
+            real_tmp_buf = tmp_buf;
+            tmp_buf = (void *)((char*)tmp_buf - true_lb);
+        }
+
+        if (!comm_ptr->local_comm) {  /* create the local intracomm on first use */
+            mpi_errno = MPIR_Setup_intercomm_localcomm(comm_ptr);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+        }
+
+        mpi_errno = MPIR_Ireduce_intra(sendbuf, tmp_buf, count, datatype, op, 0, comm_ptr->local_comm, s);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+        mpi_errno = MPID_Sched_barrier(s);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+        if (rank == 0) {
+            mpi_errno = MPID_Sched_send(tmp_buf, count, datatype, root, comm_ptr, s);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+            mpi_errno = MPID_Sched_barrier(s);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+            if (real_tmp_buf) {  /* schedule MPIR_Sched_cb_free_buf on the temporary buffer */
+                mpi_errno = MPID_Sched_cb(&MPIR_Sched_cb_free_buf, real_tmp_buf, s);
+                if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+            }
+        }
+    }
+
+    MPIU_CHKPMEM_COMMIT();
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    MPIU_CHKPMEM_REAP();
+    goto fn_exit;
+}
+
+#undef FUNCNAME
 #define FUNCNAME MPIR_Ireduce_impl
 #undef FCNAME
 #define FCNAME MPIU_QUOTE(FUNCNAME)

Modified: mpich2/trunk/src/mpi/comm/commutil.c
===================================================================
--- mpich2/trunk/src/mpi/comm/commutil.c	2011-01-20 22:09:14 UTC (rev 7790)
+++ mpich2/trunk/src/mpi/comm/commutil.c	2011-01-20 22:09:17 UTC (rev 7791)
@@ -243,6 +243,7 @@
         /* intracomm default defaults... */
         ops->Ibcast = &MPIR_Ibcast_intra;
         ops->Ibarrier = &MPIR_Ibarrier_intra;
+        ops->Ireduce = &MPIR_Ireduce_intra;
         /* TODO add other fns here as they are added */
 
         /* override defaults, such as for SMP */
@@ -272,6 +273,7 @@
         /* intracomm defaults */
         ops->Ibcast = &MPIR_Ibcast_inter;
         ops->Ibarrier = &MPIR_Ibarrier_inter;
+        ops->Ireduce = &MPIR_Ireduce_inter;
 
         ic_default_collops = ops;
     }



More information about the mpich2-commits mailing list