[mpich2-commits] r7784 - in mpich2/trunk: . src/include src/mpi/coll src/mpi/pt2pt src/mpid/ch3 src/mpid/ch3/channels/nemesis/nemesis/include src/mpid/ch3/channels/nemesis/src src/mpid/ch3/channels/sock/src src/mpid/ch3/include src/mpid/common src/mpid/common/sched

goodell at mcs.anl.gov goodell at mcs.anl.gov
Thu Jan 20 16:08:58 CST 2011


Author: goodell
Date: 2011-01-20 16:08:58 -0600 (Thu, 20 Jan 2011)
New Revision: 7784

Added:
   mpich2/trunk/src/include/mpir_nbc.h
   mpich2/trunk/src/mpi/coll/nbcutil.c
   mpich2/trunk/src/mpid/common/sched/
   mpich2/trunk/src/mpid/common/sched/Makefile.sm
   mpich2/trunk/src/mpid/common/sched/mpid_sched.c
   mpich2/trunk/src/mpid/common/sched/mpid_sched.h
   mpich2/trunk/src/mpid/common/sched/mpid_sched_pre.h
Modified:
   mpich2/trunk/configure.in
   mpich2/trunk/src/include/mpiimpl.h
   mpich2/trunk/src/mpi/coll/Makefile.sm
   mpich2/trunk/src/mpi/coll/iallgather.c
   mpich2/trunk/src/mpi/coll/iallgatherv.c
   mpich2/trunk/src/mpi/coll/iallreduce.c
   mpich2/trunk/src/mpi/coll/ialltoall.c
   mpich2/trunk/src/mpi/coll/ialltoallv.c
   mpich2/trunk/src/mpi/coll/ialltoallw.c
   mpich2/trunk/src/mpi/coll/ibarrier.c
   mpich2/trunk/src/mpi/coll/ibcast.c
   mpich2/trunk/src/mpi/coll/iexscan.c
   mpich2/trunk/src/mpi/coll/igather.c
   mpich2/trunk/src/mpi/coll/igatherv.c
   mpich2/trunk/src/mpi/coll/ired_scat.c
   mpich2/trunk/src/mpi/coll/ired_scat_block.c
   mpich2/trunk/src/mpi/coll/ireduce.c
   mpich2/trunk/src/mpi/coll/iscan.c
   mpich2/trunk/src/mpi/coll/iscatter.c
   mpich2/trunk/src/mpi/coll/iscatterv.c
   mpich2/trunk/src/mpi/pt2pt/mpir_request.c
   mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_inline.h
   mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_progress.c
   mpich2/trunk/src/mpid/ch3/channels/sock/src/ch3_progress.c
   mpich2/trunk/src/mpid/ch3/include/mpidimpl.h
   mpich2/trunk/src/mpid/ch3/include/mpidpre.h
   mpich2/trunk/src/mpid/ch3/setup_device
   mpich2/trunk/src/mpid/common/Makefile.sm
Log:
new NBC schedule mechanism, including progress, but no working collectives

This change consists of four main parts:

 1. Updating all of the MPIR_Ifoo_impl routines to assume that all
    nonblocking collective functions are set in the coll_fns field.

 2. Defining the MPID_Sched_ portion of the ADI in mpir_nbc.h.

 3. Implementing that interface in src/mpid/common/sched

 4. Modifications to ch3:nemesis and ch3:sock in order to call the
    progress engine hook for the scheduling mechanism, as well as
    correctly including the various scheduling headers in CH3 headers.

Reviewed by balaji.

Modified: mpich2/trunk/configure.in
===================================================================
--- mpich2/trunk/configure.in	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/configure.in	2011-01-20 22:08:58 UTC (rev 7784)
@@ -6364,6 +6364,7 @@
           src/mpid/common/Makefile \
           src/mpid/common/datatype/Makefile \
           src/mpid/common/datatype/dataloop/Makefile \
+          src/mpid/common/sched/Makefile \
           src/pm/Makefile \
           src/pmi/Makefile \
           src/pmi/${pmi_name}/Makefile \

Modified: mpich2/trunk/src/include/mpiimpl.h
===================================================================
--- mpich2/trunk/src/include/mpiimpl.h	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/include/mpiimpl.h	2011-01-20 22:08:58 UTC (rev 7784)
@@ -1305,6 +1305,7 @@
     MPID_PREQUEST_SEND,
     MPID_PREQUEST_RECV,
     MPID_UREQUEST,
+    MPID_COLL_REQUEST,
     MPID_LAST_REQUEST_KIND
 #ifdef MPID_DEV_REQUEST_KIND_DECL
     , MPID_DEV_REQUEST_KIND_DECL
@@ -1769,48 +1770,48 @@
                            MPID_Comm *);
 
     /* MPI-3 nonblocking collectives */
-    int (*Ibarrier)(MPID_Comm *comm_ptr, MPI_Request *request);
-    int (*Ibcast)(void *buffer, int count, MPI_Datatype datatype, MPID_Comm *comm_ptr,
-                  MPI_Request *request);
+    int (*Ibarrier)(MPID_Comm *comm_ptr, MPID_Sched_t s);
+    int (*Ibcast)(void *buffer, int count, MPI_Datatype datatype, int root,
+                  MPID_Comm *comm_ptr, MPID_Sched_t s);
     int (*Igather)(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
                    int recvcount, MPI_Datatype recvtype, int root, MPID_Comm *comm_ptr,
-                   MPI_Request *request);
+                   MPID_Sched_t s);
     int (*Igatherv)(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
                     int *recvcounts, int *displs, MPI_Datatype recvtype, int root,
-                    MPID_Comm *comm_ptr, MPI_Request *request);
+                    MPID_Comm *comm_ptr, MPID_Sched_t s);
     int (*Iscatter)(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
                     int recvcount, MPI_Datatype recvtype, int root, MPID_Comm *comm_ptr,
-                    MPI_Request *request);
+                    MPID_Sched_t s);
     int (*Iscatterv)(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype,
                      void *recvbuf, int recvcount, MPI_Datatype recvtype, int root,
-                     MPID_Comm *comm_ptr, MPI_Request *request);
+                     MPID_Comm *comm_ptr, MPID_Sched_t s);
     int (*Iallgather)(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
                       int recvcount, MPI_Datatype recvtype, MPID_Comm *comm_ptr,
-                      MPI_Request *request);
+                      MPID_Sched_t s);
     int (*Iallgatherv)(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
                        int *recvcounts, int *displs, MPI_Datatype recvtype, MPID_Comm *comm_ptr,
-                       MPI_Request *request);
+                       MPID_Sched_t s);
     int (*Ialltoall)(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf,
                      int recvcount, MPI_Datatype recvtype, MPID_Comm *comm_ptr,
-                     MPI_Request *request);
+                     MPID_Sched_t s);
     int (*Ialltoallv)(void *sendbuf, int *sendcounts, int *sdispls, MPI_Datatype sendtype,
                       void *recvbuf, int *recvcounts, int *rdispls, MPI_Datatype recvtype,
-                      MPID_Comm *comm_ptr, MPI_Request *request);
+                      MPID_Comm *comm_ptr, MPID_Sched_t s);
     int (*Ialltoallw)(void *sendbuf, int *sendcounts, int *sdispls, MPI_Datatype *sendtypes,
                       void *recvbuf, int *recvcounts, int *rdispls, MPI_Datatype *recvtypes,
-                      MPID_Comm *comm_ptr, MPI_Request *request);
+                      MPID_Comm *comm_ptr, MPID_Sched_t s);
     int (*Ireduce)(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op,
-                   int root, MPID_Comm *comm_ptr, MPI_Request *request);
+                   int root, MPID_Comm *comm_ptr, MPID_Sched_t s);
     int (*Iallreduce)(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op,
-                      MPID_Comm *comm_ptr, MPI_Request *request);
+                      MPID_Comm *comm_ptr, MPID_Sched_t s);
     int (*Ireduce_scatter)(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype,
-                           MPI_Op op, MPID_Comm *comm_ptr, MPI_Request *request);
+                           MPI_Op op, MPID_Comm *comm_ptr, MPID_Sched_t s);
     int (*Ireduce_scatter_block)(void *sendbuf, void *recvbuf, int recvcount, MPI_Datatype datatype,
-                                 MPI_Op op, MPID_Comm *comm_ptr, MPI_Request *request);
+                                 MPI_Op op, MPID_Comm *comm_ptr, MPID_Sched_t s);
     int (*Iscan)(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op,
-                 MPID_Comm *comm_ptr, MPI_Request *request);
+                 MPID_Comm *comm_ptr, MPID_Sched_t s);
     int (*Iexscan)(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op,
-                   MPID_Comm *comm_ptr, MPI_Request *request);
+                   MPID_Comm *comm_ptr, MPID_Sched_t s);
 } MPID_Collops;
 
 #define MPIR_BARRIER_TAG 1
@@ -3191,6 +3192,10 @@
   @*/
 int MPID_VCR_Get_lpid(MPID_VCR vcr, int * lpid_ptr);
 
+/* prototypes and declarations for the MPID_Sched interface for nonblocking
+ * collectives */
+#include "mpir_nbc.h"
+
 /* ------------------------------------------------------------------------- */
 /* Define a macro to allow us to select between statically selected functions
  * and dynamically loaded ones.  If USE_DYNAMIC_LIBRARIES is defined,

Added: mpich2/trunk/src/include/mpir_nbc.h
===================================================================
--- mpich2/trunk/src/include/mpir_nbc.h	                        (rev 0)
+++ mpich2/trunk/src/include/mpir_nbc.h	2011-01-20 22:08:58 UTC (rev 7784)
@@ -0,0 +1,87 @@
+/* -*- Mode: c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ *  (C) 2010 by Argonne National Laboratory.
+ *      See COPYRIGHT in top-level directory.
+ */
+
+#ifndef MPIR_NBC_H_INCLUDED
+#define MPIR_NBC_H_INCLUDED
+
+/* This specifies the interface that must be exposed by the ADI in order to
+ * support MPI-3 non-blocking collectives.  MPID_Sched_ routines are all
+ * permitted to be inlines.  They are not permitted to be macros.
+ *
+ * Most (currently all) devices will just use the default implementation that
+ * lives in "src/mpid/common/sched" */
+
+/* The device must supply a typedef for MPID_Sched_t.  MPID_Sched_t is a handle
+ * to the schedule (often a pointer under the hood), not the actual schedule.
+ * This makes it easy to cheaply pass the schedule between functions.
+ *
+ * The device must also define a constant (possibly a macro) for an invalid
+ * schedule: MPID_SCHED_NULL */
+
+/* Context/tag strategy for send/recv ops:
+ * -------------------------------
+ *
+ * Blocking collectives were able to more or less safely separate all
+ * communication between different collectives by using a fixed tag per
+ * operation.  This prevents some potentially very surprising message matching
+ * patterns when two different collectives are posted on the same communicator
+ * in rapid succession.  But this strategy probably won't work for NBC because
+ * multiple operations of any combination of types can be outstanding at the
+ * same time.
+ *
+ * The MPI-3 draft standard says that all collective ops must be collectively
+ * posted in a consistent order w.r.t. other collective operations, including
+ * nonblocking collectives.  This means that we can assign tag values from a
+ * counter that is incremented at each collective start.  We can jump through
+ * some hoops to make sure that the blocking collective code is left
+ * undisturbed, but it's cleaner to just update them to use the new counter
+ * mechanism as well.
+ */
+
+int MPID_Sched_next_tag(int *next_tag);
+
+/* the device must provide a typedef for MPID_Sched_t in mpidpre.h */
+
+/* creates a new opaque schedule object and returns a handle to it in (*sp) */
+int MPID_Sched_create(MPID_Sched_t *sp);
+/* clones orig and returns a handle to the new schedule in (*cloned) */
+int MPID_Sched_clone(MPID_Sched_t orig, MPID_Sched_t *cloned);
+/* sets (*sp) to MPID_SCHED_NULL and gives you back a request pointer in (*req).
+ * The caller is giving up ownership of the opaque schedule object.
+ *
+ * comm should be the primary (user) communicator with which this collective is
+ * associated, even if other hidden communicators are used for a subset of the
+ * operations.  It will be used for error handling and similar operations. */
+int MPID_Sched_start(MPID_Sched_t *sp, MPID_Comm *comm, int tag, MPID_Request **req);
+
+/* send and recv take a comm ptr to enable hierarchical collectives */
+int MPID_Sched_send(void *buf, int count, MPI_Datatype datatype, int dest, MPID_Comm *comm, MPID_Sched_t s);
+int MPID_Sched_recv(void *buf, int count, MPI_Datatype datatype, int src, MPID_Comm *comm, MPID_Sched_t s);
+
+int MPID_Sched_reduce(void *inbuf, void *inoutbuf, int count, MPI_Datatype datatype, MPI_Op op, MPID_Sched_t s);
+/* packing/unpacking can be accomplished by passing MPI_PACKED as either intype
+ * or outtype */
+int MPID_Sched_copy(void *inbuf,  int incount,  MPI_Datatype intype,
+                    void *outbuf, int outcount, MPI_Datatype outtype, MPID_Sched_t s);
+/* require that all previously added ops are complete before subsequent ops
+ * may begin to execute */
+int MPID_Sched_barrier(MPID_Sched_t s);
+
+/* Sched_cb_t functions take a comm parameter, the value of which will be the
+ * comm passed to Sched_start */
+/* callback entries must be used judiciously, otherwise they will prevent
+ * caching opportunities */
+typedef int (MPID_Sched_cb_t)(MPID_Comm *comm, int tag, void *state);
+/* buffer management, fancy reductions, etc */
+int MPID_Sched_cb(MPID_Sched_cb_t *cb_p, void *cb_state, MPID_Sched_t s);
+
+/* TODO: develop a caching infrastructure for use by the upper level as well,
+ * hopefully s.t. uthash can be used somehow */
+
+/* common callback utility functions */
+int MPIR_Sched_cb_free_buf(MPID_Comm *comm, int tag, void *state);
+
+#endif /* !defined(MPIR_NBC_H_INCLUDED) */

Modified: mpich2/trunk/src/mpi/coll/Makefile.sm
===================================================================
--- mpich2/trunk/src/mpi/coll/Makefile.sm	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/Makefile.sm	2011-01-20 22:08:58 UTC (rev 7784)
@@ -11,7 +11,8 @@
 
 lib${MPILIBNAME}_a_SOURCES = $(mpi_sources) \
  helper_fns.c opsum.c opmax.c opmin.c opband.c opbor.c opbxor.c \
- opland.c oplor.c oplxor.c opprod.c opminloc.c opmaxloc.c
+ opland.c oplor.c oplxor.c opprod.c opminloc.c opmaxloc.c \
+ nbcutil.c
 
 INCLUDES = -I../../include -I${top_srcdir}/src/include -I${top_srcdir}/src/mpi/datatype
 profilelib_${MPILIBNAME} = ${PMPILIBNAME}

Modified: mpich2/trunk/src/mpi/coll/iallgather.c
===================================================================
--- mpich2/trunk/src/mpi/coll/iallgather.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/iallgather.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Iallgather_impl(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Iallgather != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Iallgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Iallgather not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Iallgather_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Iallgather_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Iallgather != NULL);
+    mpi_errno = comm_ptr->coll_fns->Iallgather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/iallgatherv.c
===================================================================
--- mpich2/trunk/src/mpi/coll/iallgatherv.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/iallgatherv.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Iallgatherv_impl(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Iallgatherv != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Iallgatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Iallgatherv not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Iallgatherv_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Iallgatherv_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Iallgatherv != NULL);
+    mpi_errno = comm_ptr->coll_fns->Iallgatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/iallreduce.c
===================================================================
--- mpich2/trunk/src/mpi/coll/iallreduce.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/iallreduce.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Iallreduce_impl(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Iallreduce != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Iallreduce(sendbuf, recvbuf, count, datatype, op, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Iallreduce not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Iallreduce_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Iallreduce_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Iallreduce != NULL);
+    mpi_errno = comm_ptr->coll_fns->Iallreduce(sendbuf, recvbuf, count, datatype, op, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/ialltoall.c
===================================================================
--- mpich2/trunk/src/mpi/coll/ialltoall.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/ialltoall.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Ialltoall_impl(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Ialltoall != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Ialltoall(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Ialltoall not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Ialltoall_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Ialltoall_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Ialltoall != NULL);
+    mpi_errno = comm_ptr->coll_fns->Ialltoall(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/ialltoallv.c
===================================================================
--- mpich2/trunk/src/mpi/coll/ialltoallv.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/ialltoallv.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Ialltoallv_impl(void *sendbuf, int *sendcounts, int *sdispls, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *rdispls, MPI_Datatype recvtype, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Ialltoallv != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Ialltoallv(sendbuf, sendcounts, sdispls, sendtype, recvbuf, recvcounts, rdispls, recvtype, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Ialltoallv not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Ialltoallv_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Ialltoallv_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Ialltoallv != NULL);
+    mpi_errno = comm_ptr->coll_fns->Ialltoallv(sendbuf, sendcounts, sdispls, sendtype, recvbuf, recvcounts, rdispls, recvtype, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/ialltoallw.c
===================================================================
--- mpich2/trunk/src/mpi/coll/ialltoallw.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/ialltoallw.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Ialltoallw_impl(void *sendbuf, int *sendcounts, int *sdispls, MPI_Datatype *sendtypes, void *recvbuf, int *recvcounts, int *rdispls, MPI_Datatype *recvtypes, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Ialltoallw != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Ialltoallw(sendbuf, sendcounts, sdispls, sendtypes, recvbuf, recvcounts, rdispls, recvtypes, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Ialltoallw not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Ialltoallw_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Ialltoallw_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Ialltoallw != NULL);
+    mpi_errno = comm_ptr->coll_fns->Ialltoallw(sendbuf, sendcounts, sdispls, sendtypes, recvbuf, recvcounts, rdispls, recvtypes, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/ibarrier.c
===================================================================
--- mpich2/trunk/src/mpi/coll/ibarrier.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/ibarrier.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Ibarrier_impl(MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Ibarrier != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Ibarrier(comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Ibarrier not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Ibarrier_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Ibarrier_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Ibarrier != NULL);
+    mpi_errno = comm_ptr->coll_fns->Ibarrier(comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/ibcast.c
===================================================================
--- mpich2/trunk/src/mpi/coll/ibcast.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/ibcast.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Ibcast_impl(void *buffer, int count, MPI_Datatype datatype, int root, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Ibcast != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Ibcast(buffer, count, datatype, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Ibcast not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Ibcast_intra(buffer, count, datatype, root, comm_ptr, request);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Ibcast_inter(buffer, count, datatype, root, comm_ptr, request);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Ibcast != NULL);
+    mpi_errno = comm_ptr->coll_fns->Ibcast(buffer, count, datatype, root, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/iexscan.c
===================================================================
--- mpich2/trunk/src/mpi/coll/iexscan.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/iexscan.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Iexscan_impl(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Iexscan != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Iexscan(sendbuf, recvbuf, count, datatype, op, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Iexscan not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Iexscan_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Iexscan_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Iexscan != NULL);
+    mpi_errno = comm_ptr->coll_fns->Iexscan(sendbuf, recvbuf, count, datatype, op, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/igather.c
===================================================================
--- mpich2/trunk/src/mpi/coll/igather.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/igather.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Igather_impl(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Igather != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Igather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Igather not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Igather_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Igather_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Igather != NULL);
+    mpi_errno = comm_ptr->coll_fns->Igather(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/igatherv.c
===================================================================
--- mpich2/trunk/src/mpi/coll/igatherv.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/igatherv.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Igatherv_impl(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, int root, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Igatherv != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Igatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Igatherv not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Igatherv_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Igatherv_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Igatherv != NULL);
+    mpi_errno = comm_ptr->coll_fns->Igatherv(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, root, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/ired_scat.c
===================================================================
--- mpich2/trunk/src/mpi/coll/ired_scat.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/ired_scat.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Ireduce_scatter_impl(void *sendbuf, void *recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Ireduce_scatter != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Ireduce_scatter(sendbuf, recvbuf, recvcounts, datatype, op, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Ireduce_scatter not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Ireduce_scatter_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Ireduce_scatter_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Ireduce_scatter != NULL);
+    mpi_errno = comm_ptr->coll_fns->Ireduce_scatter(sendbuf, recvbuf, recvcounts, datatype, op, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/ired_scat_block.c
===================================================================
--- mpich2/trunk/src/mpi/coll/ired_scat_block.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/ired_scat_block.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Ireduce_scatter_block_impl(void *sendbuf, void *recvbuf, int recvcount, MPI_Datatype datatype, MPI_Op op, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Ireduce_scatter_block != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Ireduce_scatter_block(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Ireduce_scatter_block not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Ireduce_scatter_block_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Ireduce_scatter_block_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Ireduce_scatter_block != NULL);
+    mpi_errno = comm_ptr->coll_fns->Ireduce_scatter_block(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/ireduce.c
===================================================================
--- mpich2/trunk/src/mpi/coll/ireduce.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/ireduce.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Ireduce_impl(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Ireduce != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Ireduce(sendbuf, recvbuf, count, datatype, op, root, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Ireduce not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Ireduce_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Ireduce_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Ireduce != NULL);
+    mpi_errno = comm_ptr->coll_fns->Ireduce(sendbuf, recvbuf, count, datatype, op, root, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/iscan.c
===================================================================
--- mpich2/trunk/src/mpi/coll/iscan.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/iscan.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Iscan_impl(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Iscan != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Iscan(sendbuf, recvbuf, count, datatype, op, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Iscan not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Iscan_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Iscan_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Iscan != NULL);
+    mpi_errno = comm_ptr->coll_fns->Iscan(sendbuf, recvbuf, count, datatype, op, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/iscatter.c
===================================================================
--- mpich2/trunk/src/mpi/coll/iscatter.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/iscatter.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Iscatter_impl(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Iscatter != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Iscatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Iscatter not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Iscatter_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Iscatter_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Iscatter != NULL);
+    mpi_errno = comm_ptr->coll_fns->Iscatter(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, root, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Modified: mpich2/trunk/src/mpi/coll/iscatterv.c
===================================================================
--- mpich2/trunk/src/mpi/coll/iscatterv.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/coll/iscatterv.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -31,29 +31,27 @@
 int MPIR_Iscatterv_impl(void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPID_Comm *comm_ptr, MPI_Request *request)
 {
     int mpi_errno = MPI_SUCCESS;
+    int tag = -1;
+    MPID_Request *reqp = NULL;
+    MPID_Sched_t s = MPID_SCHED_NULL;
 
-    if (comm_ptr->coll_fns != NULL && comm_ptr->coll_fns->Iscatterv != NULL) {
-        mpi_errno = comm_ptr->coll_fns->Iscatterv(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcount, recvtype, root, comm_ptr, request);
-        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-    }
-    else {
-        MPID_Abort(comm_ptr, MPI_ERR_OTHER, 1, "default version of MPIX_Iscatterv not yet implemented");
-        MPIU_Assertp(0); /* never get here */
-        /* TODO implement the following functions and uncomment this code: */
-#if 0
-        if (comm_ptr->comm_kind == MPID_INTRACOMM) {
-            /* intracommunicator */
-            mpi_errno = MPIR_Iscatterv_intra(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-        else {
-            /* intercommunicator */
-            mpi_errno = MPIR_Iscatterv_inter(sendbuf, recvbuf, recvcount, datatype, op, comm_ptr);
-            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
-        }
-#endif
-    }
+    *request = MPI_REQUEST_NULL;
 
+    mpi_errno = MPID_Sched_next_tag(&tag);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+    mpi_errno = MPID_Sched_create(&s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    MPIU_Assert(comm_ptr->coll_fns != NULL);
+    MPIU_Assert(comm_ptr->coll_fns->Iscatterv != NULL);
+    mpi_errno = comm_ptr->coll_fns->Iscatterv(sendbuf, sendcounts, displs, sendtype, recvbuf, recvcount, recvtype, root, comm_ptr, s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    mpi_errno = MPID_Sched_start(&s, comm_ptr, tag, &reqp);
+    if (reqp)
+        *request = reqp->handle;
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
 fn_exit:
     return mpi_errno;
 fn_fail:

Added: mpich2/trunk/src/mpi/coll/nbcutil.c
===================================================================
--- mpich2/trunk/src/mpi/coll/nbcutil.c	                        (rev 0)
+++ mpich2/trunk/src/mpi/coll/nbcutil.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -0,0 +1,18 @@
+/* -*- Mode: c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ *  (C) 2010 by Argonne National Laboratory.
+ *      See COPYRIGHT in top-level directory.
+ */
+
+#include "mpiimpl.h"
+
+#undef FUNCNAME
+#define FUNCNAME MPIR_Sched_cb_free_buf
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPIR_Sched_cb_free_buf(MPID_Comm *comm, int tag, void *state) /* frees "state"; "comm" and "tag" are not used by this body */
+{
+    MPIU_Free(state); /* presumably "state" is a buffer obtained from the MPIU allocator -- confirm at the call sites */
+    return MPI_SUCCESS; /* no failure path: always reports success */
+}
+

Modified: mpich2/trunk/src/mpi/pt2pt/mpir_request.c
===================================================================
--- mpich2/trunk/src/mpi/pt2pt/mpir_request.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpi/pt2pt/mpir_request.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -218,6 +218,14 @@
 	    
 	    break;
 	}
+
+        case MPID_COLL_REQUEST:
+        {
+            MPIR_Request_extract_status(request_ptr, status);
+            MPID_Request_release(request_ptr);
+            *request = MPI_REQUEST_NULL;
+            break;
+        }
 	
 	default:
 	{

Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_inline.h
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_inline.h	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/nemesis/include/mpid_nem_inline.h	2011-01-20 22:08:58 UTC (rev 7784)
@@ -34,10 +34,11 @@
 
 /* evaluates to TRUE if it is safe to block on recv operations in the progress
  * loop, FALSE otherwise */
-#define MPID_nem_safe_to_block_recv()         \
+#define MPID_nem_safe_to_block_recv()             \
     (!MPID_nem_local_lmt_pending               && \
      !MPIDI_CH3I_active_send[CH3_NORMAL_QUEUE] && \
-     !MPIDI_CH3I_SendQ_head(CH3_NORMAL_QUEUE))
+     !MPIDI_CH3I_SendQ_head(CH3_NORMAL_QUEUE)  && \
+     !MPIDU_Sched_are_pending())
 
 #undef FUNCNAME
 #define FUNCNAME MPID_nem_mpich2_send_header

Modified: mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_progress.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_progress.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpid/ch3/channels/nemesis/src/ch3_progress.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -239,6 +239,7 @@
 #if !defined(ENABLE_NO_YIELD) || defined(MPICH_IS_THREADED)
     int pollcount = 0;
 #endif
+    int made_progress = FALSE;
     MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS);
 
     MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS);
@@ -443,6 +444,13 @@
             if (mpi_errno) MPIU_ERR_POP(mpi_errno);
         }
 
+        /* make progress on NBC schedules */
+        mpi_errno = MPIDU_Sched_progress(&made_progress);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+        if (made_progress) {
+            MPIDI_CH3_Progress_signal_completion();
+        }
+
         /* in the case of progress_wait, bail out if anything completed (CC-1) */
         if (is_blocking) {
             int completion_count = OPA_load_int(&MPIDI_CH3I_progress_completion_count);

Modified: mpich2/trunk/src/mpid/ch3/channels/sock/src/ch3_progress.c
===================================================================
--- mpich2/trunk/src/mpid/ch3/channels/sock/src/ch3_progress.c	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpid/ch3/channels/sock/src/ch3_progress.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -169,6 +169,16 @@
     
     do
     {
+        int made_progress = FALSE;
+
+        /* make progress on NBC schedules, must come before we block on sock_wait */
+        mpi_errno = MPIDU_Sched_progress(&made_progress);
+        if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+        if (made_progress) {
+            MPIDI_CH3_Progress_signal_completion();
+            break;
+        }
+
 #       ifdef MPICH_IS_THREADED
 
 	/* The logic for this case is just complicated enough that

Modified: mpich2/trunk/src/mpid/ch3/include/mpidimpl.h
===================================================================
--- mpich2/trunk/src/mpid/ch3/include/mpidimpl.h	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpid/ch3/include/mpidimpl.h	2011-01-20 22:08:58 UTC (rev 7784)
@@ -30,6 +30,8 @@
 /* Add the ch3 packet definitions */
 #include "mpidpkt.h"
 
+#include "mpid_sched.h"
+
 /* We need to match the size of MPIR_Pint to the relevant Format control
  */
 #define MPIDI_MSG_SZ_FMT MPIR_PINT_FMT_DEC_SPEC

Modified: mpich2/trunk/src/mpid/ch3/include/mpidpre.h
===================================================================
--- mpich2/trunk/src/mpid/ch3/include/mpidpre.h	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpid/ch3/include/mpidpre.h	2011-01-20 22:08:58 UTC (rev 7784)
@@ -59,6 +59,10 @@
 typedef MPIR_Rank_t MPID_Node_id_t;
 
 
+/* provides "pre" typedefs and such for NBC scheduling mechanism */
+#include "mpid_sched_pre.h"
+
+
 /* For the typical communication system for which the ch3 channel is
    appropriate, 16 bits is sufficient for the rank.  By also using 16
    bits for the context, we can reduce the size of the match

Modified: mpich2/trunk/src/mpid/ch3/setup_device
===================================================================
--- mpich2/trunk/src/mpid/ch3/setup_device	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpid/ch3/setup_device	2011-01-20 22:08:58 UTC (rev 7784)
@@ -27,6 +27,7 @@
 pathlist=""
 pathlist="$pathlist src/mpid/${device_name}/include"
 pathlist="$pathlist src/mpid/common/datatype"
+pathlist="$pathlist src/mpid/common/sched"
 pathlist="$pathlist src/mpid/common/locks"
 for path in $pathlist ; do
     CPPFLAGS="$CPPFLAGS -I${master_top_builddir}/${path} -I${master_top_srcdir}/${path}"

Modified: mpich2/trunk/src/mpid/common/Makefile.sm
===================================================================
--- mpich2/trunk/src/mpid/common/Makefile.sm	2011-01-20 22:08:51 UTC (rev 7783)
+++ mpich2/trunk/src/mpid/common/Makefile.sm	2011-01-20 22:08:58 UTC (rev 7784)
@@ -1,4 +1,9 @@
 # Other_dirs have Makefile.in built
 OTHER_DIRS = sock
+
+# "sched" is in SUBDIRS for now, because nobody is building it conditionally at
+# this point.  If someone wants to use their own MPID_Sched_ implementation, we
+# can move it up to OTHER_DIRS
+#
 # Subdirs are *always* built
-SUBDIRS = datatype .
+SUBDIRS = datatype sched .

Added: mpich2/trunk/src/mpid/common/sched/Makefile.sm
===================================================================
--- mpich2/trunk/src/mpid/common/sched/Makefile.sm	                        (rev 0)
+++ mpich2/trunk/src/mpid/common/sched/Makefile.sm	2011-01-20 22:08:58 UTC (rev 7784)
@@ -0,0 +1,7 @@
+lib${MPILIBNAME}_a_SOURCES = \
+    mpid_sched.c
+
+HEADERS = 
+# is this INCLUDES necessary?
+INCLUDES = -I${srcdir} -I${top_builddir}/src/include -I${master_top_srcdir}/src/include
+SUBDIRS = .

Added: mpich2/trunk/src/mpid/common/sched/mpid_sched.c
===================================================================
--- mpich2/trunk/src/mpid/common/sched/mpid_sched.c	                        (rev 0)
+++ mpich2/trunk/src/mpid/common/sched/mpid_sched.c	2011-01-20 22:08:58 UTC (rev 7784)
@@ -0,0 +1,655 @@
+/* -*- Mode: c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ *  (C) 2010 by Argonne National Laboratory.
+ *      See COPYRIGHT in top-level directory.
+ */
+
+#include "mpidimpl.h"
+#include "mpl_utlist.h"
+
+/* A random guess at an appropriate value, we can tune it later.  It could also
+ * be a real tunable parameter. */
+#define MPIDU_SCHED_INITIAL_ENTRIES (16)
+
+#if 0
+#define dprintf fprintf
+#else
+/* FIXME this requires VA_ARGS macros */
+#define dprintf(...) do{}while(0)
+#endif
+
+/* TODO move to a header somewhere? */
+void MPIDU_Sched_dump(struct MPIDU_Sched *s);
+void MPIDU_Sched_dump_fh(struct MPIDU_Sched *s, FILE *fh);
+
+struct MPIDU_Sched_state {
+    struct MPIDU_Sched *head; /* first schedule in the utlist doubly-linked list; NULL when the list is empty */
+    /* no need for a tail with utlist */
+};
+
+/* holds on to all incomplete schedules on which progress should be made; appended to by MPID_Sched_start */
+struct MPIDU_Sched_state all_schedules = {NULL};
+
+/* next tag value MPID_Sched_next_tag will hand out.  FIXME MT needs locking or atomic access for fine-grained threading */
+static int next_sched_tag = 0;
+
+/* Reports whether the global list of incomplete schedules is non-empty:
+ * nonzero while at least one schedule is still queued, zero otherwise */
+#undef FUNCNAME
+#define FUNCNAME MPIDU_Sched_are_pending
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPIDU_Sched_are_pending(void)
+{
+    return all_schedules.head ? 1 : 0;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPID_Sched_next_tag
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_Sched_next_tag(int *tag)
+{
+    const int handed_out = next_sched_tag; /* value returned to the caller through (*tag) */
+    /* TODO there should be an internal accessor/utility macro for getting the
+     * TAG_UB value that doesn't require using the attribute interface */
+    next_sched_tag = (handed_out + 1) % MPIR_Process.attrs.tag_ub; /* wrap so tags stay in [0, tag_ub) */
+    *tag = handed_out;
+    return MPI_SUCCESS;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPIDU_Sched_start_entry
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+/* Initiates the schedule entry "e", which sits at "index" in "s": posts the nonblocking send/recv, or runs the
+ * reduction, copy, or callback right away.  On failure the entry is marked FAILED and, if s->req is non-NULL,
+ * the error code is also stored in its status.  NOTE(review): "index" is not read anywhere in the body. */
+static int MPIDU_Sched_start_entry(struct MPIDU_Sched *s, int index, struct MPIDU_Sched_entry *e)
+{
+    int mpi_errno = MPI_SUCCESS;
+    int context_offset;
+    MPID_Request *r = s->req;
+
+    MPIU_Assert(e->status == MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED);
+
+    switch (e->type) {
+        case MPIDU_SCHED_ENTRY_SEND:
+            context_offset = (e->u.send.comm->comm_kind == MPID_INTRACOMM) ?
+                MPID_CONTEXT_INTRA_COLL : MPID_CONTEXT_INTER_COLL;
+            mpi_errno = MPID_Isend(e->u.send.buf, e->u.send.count, e->u.send.datatype,
+                                   e->u.send.dest, s->tag, r->comm, context_offset,
+                                   &e->u.send.sreq); /* NOTE(review): posted on r->comm, but context_offset came from e->u.send.comm */
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+            e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED; /* completion is detected later by the progress routine */
+            break;
+        case MPIDU_SCHED_ENTRY_RECV:
+            context_offset = (e->u.recv.comm->comm_kind == MPID_INTRACOMM) ?
+                MPID_CONTEXT_INTRA_COLL : MPID_CONTEXT_INTER_COLL;
+            mpi_errno = MPID_Irecv(e->u.recv.buf, e->u.recv.count, e->u.recv.datatype,
+                                   e->u.recv.src, s->tag, r->comm, context_offset,
+                                   &e->u.recv.rreq); /* NOTE(review): posted on r->comm, but context_offset came from e->u.recv.comm */
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+            e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED; /* completion is detected later by the progress routine */
+            break;
+        case MPIDU_SCHED_ENTRY_REDUCE:
+            mpi_errno = MPIR_Reduce_local_impl(e->u.reduce.inbuf, e->u.reduce.inoutbuf, e->u.reduce.count,
+                                               e->u.reduce.datatype, e->u.reduce.op);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+            e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE; /* local operation, finished synchronously */
+            break;
+        case MPIDU_SCHED_ENTRY_COPY:
+            mpi_errno = MPIR_Localcopy(e->u.copy.inbuf, e->u.copy.incount, e->u.copy.intype,
+                                       e->u.copy.outbuf, e->u.copy.outcount, e->u.copy.outtype);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+            e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE; /* local operation, finished synchronously */
+            break;
+        case MPIDU_SCHED_ENTRY_NOP:
+            /* nothing to be done.  NOTE(review): status is not advanced to COMPLETE here, so a NOP entry would never be retired -- confirm none are ever added */
+            break;
+        case MPIDU_SCHED_ENTRY_CB:
+            mpi_errno = e->u.cb.cb_p(r->comm, s->tag, e->u.cb.cb_state);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+            e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE; /* the callback ran to completion inline */
+            break;
+        default:
+            MPIU_Assert_fmt_msg(FALSE, ("unknown entry type, e->type=%d", e->type));
+            break;
+    }
+
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED;
+    if (r)
+        r->status.MPI_ERROR = mpi_errno;
+    goto fn_exit;
+}
+
+/* Walks the schedule from s->idx and starts every NOT_STARTED entry it reaches, advancing s->idx past
+ * leading entries that are already COMPLETE.  The walk stops at the first barrier entry that is not yet
+ * complete or that still has unfinished entries before it, so schedule barriers are respected. */
+#undef FUNCNAME
+#define FUNCNAME MPIDU_Sched_continue
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static int MPIDU_Sched_continue(struct MPIDU_Sched *s)
+{
+    int mpi_errno = MPI_SUCCESS;
+    int i;
+
+    for (i = s->idx; i < s->num_entries; ++i) {
+        struct MPIDU_Sched_entry *e = &s->entries[i];
+
+        if (e->status == MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED) {
+            mpi_errno = MPIDU_Sched_start_entry(s, i, e);
+            if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+        }
+
+        /* _start_entry may have completed the operation, but won't update s->idx */
+        if (i == s->idx && e->status == MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
+            ++s->idx; /* this is valid even for barrier entries */
+        }
+
+        /* watch the indexing, s->idx might have been incremented above, so
+         * ||-short-circuit matters here */
+        if (e->is_barrier && (e->status != MPIDU_SCHED_ENTRY_STATUS_COMPLETE || (s->idx != i+1))) {
+            /* we've hit a barrier but outstanding operations before this
+             * barrier remain, so we cannot proceed past the barrier */
+            break;
+        }
+    }
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPID_Sched_create
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+/* allocates an empty schedule with room for MPIDU_SCHED_INITIAL_ENTRIES entries and returns a handle to it in (*sp); (*sp) stays NULL on failure */
+int MPID_Sched_create(MPID_Sched_t *sp)
+{
+    int mpi_errno = MPI_SUCCESS;
+    struct MPIDU_Sched *sched;
+    MPIU_CHKPMEM_DECL(2);
+
+    *sp = NULL;
+
+    /* ownership passes to the progress engine, which frees this once the request is completed */
+    MPIU_CHKPMEM_MALLOC(sched, struct MPIDU_Sched *, sizeof(*sched), mpi_errno, "schedule object");
+
+    sched->prev = NULL; /* only needed for sanity checks */
+    sched->next = NULL; /* only needed for sanity checks */
+    sched->entries = NULL;
+    sched->req = NULL;
+    sched->tag = -1;
+    sched->num_entries = 0;
+    sched->idx = 0;
+    sched->size = MPIDU_SCHED_INITIAL_ENTRIES;
+
+    /* the entries vector is likewise freed by the progress engine on completion */
+    MPIU_CHKPMEM_MALLOC(sched->entries, struct MPIDU_Sched_entry *, sched->size * sizeof(*sched->entries), mpi_errno, "schedule entries vector");
+
+    /* TODO in a debug build, defensively mark all entries as status=INVALID */
+
+    MPIU_CHKPMEM_COMMIT();
+    *sp = sched;
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    MPIU_CHKPMEM_REAP();
+    goto fn_exit;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPID_Sched_clone
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+/* intended to clone orig and return a handle to the new schedule in (*cloned); currently an unimplemented stub that only asserts */
+int MPID_Sched_clone(MPID_Sched_t orig, MPID_Sched_t *cloned)
+{
+    int mpi_errno = MPI_SUCCESS;
+    /* TODO implement this function for real; neither orig nor (*cloned) is touched yet */
+    MPIU_Assert_fmt_msg(FALSE, ("clone not yet implemented"));
+    MPIU_Assertp(FALSE);
+    return mpi_errno;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPID_Sched_start
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+/* sets (*sp) to MPID_SCHED_NULL and gives you back a request pointer in (*req) ((*req) is NULL on failure).
+ * The caller is giving up ownership of the opaque schedule object. */
+int MPID_Sched_start(MPID_Sched_t *sp, MPID_Comm *comm, int tag, MPID_Request **req)
+{
+    int mpi_errno = MPI_SUCCESS;
+    MPID_Request *r;
+    struct MPIDU_Sched *s = *sp;
+
+    *req = NULL;
+    *sp = MPID_SCHED_NULL;
+
+    /* sanity check the schedule */
+    MPIU_Assert(s->num_entries <= s->size);
+    MPIU_Assert(s->num_entries == 0 || s->idx < s->num_entries);
+    MPIU_Assert(s->req == NULL);
+    MPIU_Assert(s->next == NULL);
+    MPIU_Assert(s->prev == NULL);
+    MPIU_Assert(s->entries != NULL);
+
+    /* now create and populate the request */
+    r = MPID_Request_create();
+    if (!r) MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem");
+    r->kind = MPID_COLL_REQUEST;
+    /* FIXME is this right when comm/datatype GC is used? */
+    MPIR_Comm_add_ref(comm);
+    r->comm = comm;
+    /* req refcount is currently 1, for the user's request.  Increment for the
+     * schedule's reference */
+    MPIR_Request_add_ref(r);
+    s->req = r;
+    *req = r;
+    /* cc is 1, which is fine b/c we only use it as a signal, rather than
+     * incr/decr on every constituent operation */
+    s->tag = tag;
+
+    /* Now kick off any initial operations.  Do this before we tell the progress
+     * engine about this req+sched, otherwise we have more MT issues to worry
+     * about.  Skipping this step will increase latency. */
+    mpi_errno = MPIDU_Sched_continue(s);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    /* finally, enqueue in the list of all pending schedules so that the
+     * progress engine can make progress on it */
+    DL_APPEND(all_schedules.head, s);
+
+    dprintf(stderr, "started schedule s=%p\n", s); /* XXX DJG */
+    MPIDU_Sched_dump(s);
+
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    if (*req)
+        *req = NULL;
+    if (r) {
+        /* overly complicated, but we can't just destroy the req directly
+         * because we aren't truly inside the device and don't have access to
+         * MPIDI_CH3_Request_destroy (we may not even be CH3) */
+        int inuse = TRUE;
+        MPIR_Request_release_ref(r, &inuse); /* the schedule's ref */
+        MPIU_Assert(inuse);
+        MPID_Request_release(r); /* the user's ref */
+    }
+    /* NOTE(review): s is neither enqueued nor freed on this path and (*sp) was already cleared -- looks like a leak, confirm */
+    goto fn_exit;
+}
+
+
+#undef FUNCNAME
+#define FUNCNAME MPIDU_Sched_add_entry
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+/* Reserves the next free slot in s->entries, doubling the array's capacity when it is full.  The slot is handed
+ * back uninitialized via (*index) and/or (*e); index and e are permitted to be NULL */
+static int MPIDU_Sched_add_entry(struct MPIDU_Sched *s, int *index, struct MPIDU_Sched_entry **e)
+{
+    int mpi_errno = MPI_SUCCESS;
+    int i;
+
+    MPIU_Assert(s->entries != NULL);
+    MPIU_Assert(s->size > 0);
+
+    if (s->num_entries == s->size) {
+        /* s->size counts entries but realloc wants bytes; go through a temporary so the old array survives a failed realloc */
+        struct MPIDU_Sched_entry *grown = MPIU_Realloc(s->entries, 2 * s->size * sizeof(struct MPIDU_Sched_entry));
+        if (grown == NULL) MPIU_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem");
+        s->entries = grown;
+        s->size *= 2;
+    }
+
+    i = s->num_entries++;
+
+    if (index != NULL)
+        *index = i;
+    if (e != NULL)
+        *e = &s->entries[i];
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPID_Sched_send
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+/* appends a SEND entry to s; the send itself is posted later by _start_entry.  TODO: do these ops need an entry handle returned? */
+int MPID_Sched_send(void *buf, int count, MPI_Datatype datatype, int dest, MPID_Comm *comm, MPID_Sched_t s)
+{
+    int mpi_errno = MPI_SUCCESS;
+    struct MPIDU_Sched_entry *entry = NULL;
+    struct MPIDU_Sched_send *args = NULL;
+
+    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &entry);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    args = &entry->u.send;
+    args->sreq = NULL; /* will be populated by _start_entry */
+    args->comm = comm;
+    args->dest = dest;
+    args->datatype = datatype;
+    args->count = count;
+    args->buf = buf;
+    entry->is_barrier = FALSE;
+    entry->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
+    entry->type = MPIDU_SCHED_ENTRY_SEND;
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPID_Sched_recv
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_Sched_recv(void *buf, int count, MPI_Datatype datatype, int src, MPID_Comm *comm, MPID_Sched_t s)
+{
+    int mpi_errno = MPI_SUCCESS;
+    struct MPIDU_Sched_entry *entry = NULL;
+    struct MPIDU_Sched_recv *args = NULL;
+
+    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &entry);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    args = &entry->u.recv;
+    args->rreq = NULL; /* will be populated by _start_entry */
+    args->comm = comm;
+    args->src = src;
+    args->datatype = datatype;
+    args->count = count;
+    args->buf = buf;
+    entry->is_barrier = FALSE;
+    entry->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
+    entry->type = MPIDU_SCHED_ENTRY_RECV;
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPID_Sched_reduce
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPID_Sched_reduce(void *inbuf, void *inoutbuf, int count, MPI_Datatype datatype, MPI_Op op, MPID_Sched_t s)
+{
+    int mpi_errno = MPI_SUCCESS;
+    struct MPIDU_Sched_entry *entry = NULL;
+
+    /* claim a fresh slot at the end of the schedule */
+    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &entry);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    /* only record the arguments; the reduction runs when _start_entry reaches this entry */
+    entry->u.reduce.op = op;
+    entry->u.reduce.datatype = datatype;
+    entry->u.reduce.count = count;
+    entry->u.reduce.inoutbuf = inoutbuf;
+    entry->u.reduce.inbuf = inbuf;
+
+    entry->is_barrier = FALSE;
+    entry->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
+    entry->type = MPIDU_SCHED_ENTRY_REDUCE;
+
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPID_Sched_copy
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+/* appends a local COPY entry to s; packing/unpacking can be accomplished by
+ * passing MPI_PACKED as either intype or outtype */
+int MPID_Sched_copy(void *inbuf,  int incount,  MPI_Datatype intype,
+                    void *outbuf, int outcount, MPI_Datatype outtype, MPID_Sched_t s)
+{
+    int mpi_errno = MPI_SUCCESS;
+    struct MPIDU_Sched_entry *entry = NULL;
+
+    /* claim a fresh slot at the end of the schedule */
+    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &entry);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    /* only record the arguments; the copy runs when _start_entry reaches this entry */
+    entry->u.copy.outtype = outtype;
+    entry->u.copy.outcount = outcount;
+    entry->u.copy.outbuf = outbuf;
+    entry->u.copy.intype = intype;
+    entry->u.copy.incount = incount;
+    entry->u.copy.inbuf = inbuf;
+
+    entry->is_barrier = FALSE;
+    entry->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
+    entry->type = MPIDU_SCHED_ENTRY_COPY;
+
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPID_Sched_barrier
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+/* orders the schedule: every op added before this call must be complete
+ * before any op added after it may begin to execute */
+int MPID_Sched_barrier(MPID_Sched_t s)
+{
+    int mpi_errno = MPI_SUCCESS;
+    const int last = s->num_entries - 1;
+
+    /* the barrier is recorded by flagging the most recently added entry; with
+     * no entries yet there is nothing to order, so the call does nothing */
+    if (last >= 0)
+        s->entries[last].is_barrier = TRUE;
+
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
+#undef FUNCNAME
+#define FUNCNAME MPID_Sched_cb
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+/* appends a callback entry to s, for buffer management, fancy reductions, etc */
+int MPID_Sched_cb(MPID_Sched_cb_t *cb_p, void *cb_state, MPID_Sched_t s)
+{
+    int mpi_errno = MPI_SUCCESS;
+    struct MPIDU_Sched_entry *entry = NULL;
+
+    /* claim a fresh slot at the end of the schedule */
+    mpi_errno = MPIDU_Sched_add_entry(s, NULL, &entry);
+    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+
+    /* cb_p is invoked with cb_state when _start_entry reaches this entry */
+    entry->u.cb.cb_state = cb_state;
+    entry->u.cb.cb_p = cb_p;
+
+    entry->is_barrier = FALSE;
+    entry->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED;
+    entry->type = MPIDU_SCHED_ENTRY_CB;
+
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
+
+/* polls every schedule queued in "state"; (*made_progress), if non-NULL, is set to TRUE only when at least one whole schedule completed during this call */
+#undef FUNCNAME
+#define FUNCNAME MPIDU_Sched_progress_state
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+static int MPIDU_Sched_progress_state(struct MPIDU_Sched_state *state, int *made_progress)
+{
+    int mpi_errno = MPI_SUCCESS;
+    int i;
+    struct MPIDU_Sched *s;
+    struct MPIDU_Sched *tmp;
+
+    if (made_progress)
+        *made_progress = FALSE;
+
+    DL_FOREACH_SAFE(state->head, s, tmp) { /* _SAFE variant because a completed s is deleted and freed inside the loop */
+        dprintf(stderr, "making progress on s=%p\n", s);
+        /*MPIDU_Sched_dump(s);*/
+
+        for (i = s->idx; i < s->num_entries; ++i) {
+            struct MPIDU_Sched_entry *e = &s->entries[i];
+
+            switch (e->type) {
+                case MPIDU_SCHED_ENTRY_SEND:
+                    if (e->u.send.sreq != NULL && MPID_Request_is_complete(e->u.send.sreq)) {
+                        dprintf(stderr, "completed SEND entry %d, sreq=%p\n", i, e->u.send.sreq);
+                        e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
+                        MPID_Request_release(e->u.send.sreq);
+                        e->u.send.sreq = NULL; /* NULL marks the sub-request as already reaped on later passes */
+                    }
+                    break;
+                case MPIDU_SCHED_ENTRY_RECV:
+                    if (e->u.recv.rreq != NULL && MPID_Request_is_complete(e->u.recv.rreq)) {
+                        dprintf(stderr, "completed RECV entry %d, rreq=%p\n", i, e->u.recv.rreq);
+                        e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE;
+                        MPID_Request_release(e->u.recv.rreq);
+                        e->u.recv.rreq = NULL; /* NULL marks the sub-request as already reaped on later passes */
+                    }
+                    break;
+                default:
+                    /* all other entry types don't have any sub-requests that
+                     * need to be checked */
+                    break;
+            }
+
+            if (i == s->idx && e->status == MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
+                ++s->idx; /* s->idx only advances over a contiguous prefix of completed entries */
+                if (e->is_barrier) {
+                    dprintf(stderr, "completed barrier in entry %d\n", i);
+                    /* post/perform the next round of operations */
+                    mpi_errno = MPIDU_Sched_continue(s);
+                    if (mpi_errno) MPIU_ERR_POP(mpi_errno);
+                }
+            }
+            else if (e->is_barrier && e->status != MPIDU_SCHED_ENTRY_STATUS_COMPLETE) {
+                /* don't process anything after this barrier entry */
+                break;
+            }
+        }
+
+        if (s->idx == s->num_entries) {
+            dprintf(stderr, "completing and dequeuing s=%p r=%p\n", s, s->req);
+
+            /* dequeue this schedule from the state, it's complete */
+            DL_DELETE(state->head, s);
+
+            /* TODO refactor into a sched_complete routine? */
+            MPID_REQUEST_SET_COMPLETED(s->req);
+            MPID_Request_release(s->req); /* drops the schedule's reference taken in MPID_Sched_start */
+            s->req = NULL;
+            MPIU_Free(s->entries);
+            MPIU_Free(s);
+
+            if (made_progress)
+                *made_progress = TRUE;
+        }
+    }
+
+fn_exit:
+    return mpi_errno;
+fn_fail:
+    goto fn_exit;
+}
+
+/* runs the progress routine over the global all_schedules list; returns TRUE in (*made_progress) if any of the outstanding schedules completed */
+#undef FUNCNAME
+#define FUNCNAME MPIDU_Sched_progress
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+int MPIDU_Sched_progress(int *made_progress)
+{
+    return MPIDU_Sched_progress_state(&all_schedules, made_progress);
+}
+
+static const char *entry_to_str(enum MPIDU_Sched_entry_type type) ATTRIBUTE((unused,used));
+static const char *entry_to_str(enum MPIDU_Sched_entry_type type)
+{
+    /* debugging aid: maps an entry type to a printable name; anything unrecognized keeps the fallback */
+    const char *name = "(out of range)";
+    if (type == MPIDU_SCHED_ENTRY_SEND)
+        name = "SEND";
+    else if (type == MPIDU_SCHED_ENTRY_RECV)
+        name = "RECV";
+    else if (type == MPIDU_SCHED_ENTRY_REDUCE)
+        name = "REDUCE";
+    else if (type == MPIDU_SCHED_ENTRY_COPY)
+        name = "COPY";
+    else if (type == MPIDU_SCHED_ENTRY_NOP)
+        name = "NOP";
+    else if (type == MPIDU_SCHED_ENTRY_CB)
+        name = "CB";
+
+    return name;
+}
+
+/* utility function for debugging, dumps the given schedule object (which may be NULL) to fh; prints nothing unless dprintf is mapped to fprintf */
+#undef FUNCNAME
+#define FUNCNAME MPIDU_Sched_dump_fh
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+void MPIDU_Sched_dump_fh(struct MPIDU_Sched *s, FILE *fh)
+{
+    int i;
+    dprintf(fh, "--------------------------------\n");
+    dprintf(fh, "s=%p\n", (void *)s); /* %p requires a void pointer argument */
+    if (s) {
+        dprintf(fh, "s->size=%zu\n", s->size); /* size and idx are size_t, hence %zu rather than %zd */
+        dprintf(fh, "s->idx=%zu\n", s->idx);
+        dprintf(fh, "s->num_entries=%d\n", s->num_entries);
+        dprintf(fh, "s->tag=%d\n", s->tag);
+        dprintf(fh, "s->req=%p\n", (void *)s->req);
+        dprintf(fh, "s->entries=%p\n", (void *)s->entries);
+        for (i = 0; i < s->num_entries; ++i) {
+            dprintf(fh, "&s->entries[%d]=%p\n", i, (void *)&s->entries[i]);
+            dprintf(fh, "s->entries[%d].type=%s\n", i, entry_to_str(s->entries[i].type));
+            dprintf(fh, "s->entries[%d].status=%d\n", i, s->entries[i].status);
+            dprintf(fh, "s->entries[%d].is_barrier=%s\n", i, (s->entries[i].is_barrier ? "TRUE" : "FALSE"));
+        }
+    }
+    dprintf(fh, "--------------------------------\n");
+    /*
+    dprintf(fh, "s->next=%p\n", s->next);
+    dprintf(fh, "s->prev=%p\n", s->prev);
+    */
+}
+
+/* utility function for debugging, dumps the given schedule object to stderr by forwarding to MPIDU_Sched_dump_fh */
+#undef FUNCNAME
+#define FUNCNAME MPIDU_Sched_dump
+#undef FCNAME
+#define FCNAME MPIU_QUOTE(FUNCNAME)
+void MPIDU_Sched_dump(struct MPIDU_Sched *s)
+{
+    MPIDU_Sched_dump_fh(s, stderr);
+}
+

Added: mpich2/trunk/src/mpid/common/sched/mpid_sched.h
===================================================================
--- mpich2/trunk/src/mpid/common/sched/mpid_sched.h	                        (rev 0)
+++ mpich2/trunk/src/mpid/common/sched/mpid_sched.h	2011-01-20 22:08:58 UTC (rev 7784)
@@ -0,0 +1,114 @@
+/* -*- Mode: c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ *  (C) 2010 by Argonne National Laboratory.
+ *      See COPYRIGHT in top-level directory.
+ */
+
+/* this file should be included by the using device's mpidimpl.h */
+
+#ifndef MPID_SCHED_H_INCLUDED
+#define MPID_SCHED_H_INCLUDED
+
+/* FIXME open questions:
+ * - Should the schedule hold a pointer to the nbc request and the nbc request
+ *   hold a pointer to the schedule?  This could cause MT issues.
+ */
+
+
+enum MPIDU_Sched_entry_type {
+    MPIDU_SCHED_ENTRY_INVALID_LB = 0, /* presumably a lower-bound sentinel rather than a real entry type -- TODO confirm */
+    MPIDU_SCHED_ENTRY_SEND,   /* nonblocking send, args in struct MPIDU_Sched_send */
+    MPIDU_SCHED_ENTRY_RECV,   /* nonblocking receive, args in struct MPIDU_Sched_recv */
+    MPIDU_SCHED_ENTRY_REDUCE, /* local reduction, args in struct MPIDU_Sched_reduce */
+    MPIDU_SCHED_ENTRY_COPY,   /* local copy, args in struct MPIDU_Sched_copy */
+    MPIDU_SCHED_ENTRY_NOP,    /* no operation, carries no args */
+    MPIDU_SCHED_ENTRY_CB,     /* user callback, args in struct MPIDU_Sched_cb */
+    /* TODO remove these if they aren't needed
+    MPIDU_SCHED_ENTRY_TRANSPACK,
+    */
+    MPIDU_SCHED_ENTRY_INVALID_UB /* presumably an upper-bound sentinel rather than a real entry type -- TODO confirm */
+};
+
+struct MPIDU_Sched_send { /* arguments recorded by MPID_Sched_send for a deferred nonblocking send */
+    void *buf;
+    int count;
+    MPI_Datatype datatype;
+    int dest;
+    MPID_Comm *comm; /* its comm_kind selects the intra- vs inter-communicator collective context offset */
+    MPID_Request *sreq; /* NULL until the send is posted; released and reset to NULL once it completes */
+};
+
+struct MPIDU_Sched_recv { /* arguments recorded by MPID_Sched_recv for a deferred nonblocking receive */
+    void *buf;
+    int count;
+    MPI_Datatype datatype;
+    int src;
+    MPID_Comm *comm; /* its comm_kind selects the intra- vs inter-communicator collective context offset */
+    MPID_Request *rreq; /* NULL until the receive is posted; released and reset to NULL once it completes */
+};
+
+struct MPIDU_Sched_reduce { /* arguments forwarded, in this order, to MPIR_Reduce_local_impl when the entry runs */
+    void *inbuf;
+    void *inoutbuf;
+    int count;
+    MPI_Datatype datatype;
+    MPI_Op op;
+};
+
+struct MPIDU_Sched_copy { /* arguments forwarded, in this order, to MPIR_Localcopy when the entry runs */
+    void *inbuf;
+    int incount;
+    MPI_Datatype intype;
+    void *outbuf;
+    int outcount;
+    MPI_Datatype outtype;
+};
+
+/* nop entries have no args, so no structure is needed */
+
+struct MPIDU_Sched_cb {
+    MPID_Sched_cb_t *cb_p; /* invoked as cb_p(comm, tag, cb_state) when the entry runs */
+    void *cb_state;        /* opaque pointer passed through to cb_p unchanged */
+};
+
+enum MPIDU_Sched_entry_status {
+    MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED = 0, /* state every entry is given when it is added to a schedule */
+    MPIDU_SCHED_ENTRY_STATUS_STARTED, /* a send/recv has been posted but is not yet known to be complete */
+    MPIDU_SCHED_ENTRY_STATUS_COMPLETE, /* the operation finished; s->idx may advance past this entry */
+    MPIDU_SCHED_ENTRY_STATUS_FAILED, /* indicates a failure occurred while executing the entry */
+    MPIDU_SCHED_ENTRY_STATUS_INVALID /* indicates an invalid entry, or invalid status value */
+};
+
+/* Use a tagged union for schedule entries.  Not always space optimal, but saves
+ * lots of error-prone pointer arithmetic and makes scanning the schedule easy. */
+struct MPIDU_Sched_entry {
+    enum MPIDU_Sched_entry_type type; /* tag selecting which member of "u" is valid */
+    enum MPIDU_Sched_entry_status status;
+    int is_barrier; /* TRUE if entries after this one must wait until it and all earlier entries are complete */
+    union {
+        struct MPIDU_Sched_send send;
+        struct MPIDU_Sched_recv recv;
+        struct MPIDU_Sched_reduce reduce;
+        struct MPIDU_Sched_copy copy;
+        /* nop entries have no args */
+        struct MPIDU_Sched_cb cb;
+    } u;
+};
+
+struct MPIDU_Sched {
+    size_t size; /* capacity (in entries) of the entries array */
+    size_t idx;  /* index into entries array of first yet-outstanding entry */
+    int num_entries; /* number of populated entries, num_entries <= size */
+    int tag; /* tag used for every send/recv in this schedule; -1 until MPID_Sched_start assigns one */
+    MPID_Request *req; /* really needed? could cause MT problems... */
+    struct MPIDU_Sched_entry *entries;
+
+    struct MPIDU_Sched *next; /* linked-list next pointer */
+    struct MPIDU_Sched *prev; /* linked-list prev pointer */
+};
+
+/* prototypes */
+int MPIDU_Sched_progress(int *made_progress);
+int MPIDU_Sched_are_pending(void);
+
+#endif /* !defined(MPID_SCHED_H_INCLUDED) */

Added: mpich2/trunk/src/mpid/common/sched/mpid_sched_pre.h
===================================================================
--- mpich2/trunk/src/mpid/common/sched/mpid_sched_pre.h	                        (rev 0)
+++ mpich2/trunk/src/mpid/common/sched/mpid_sched_pre.h	2011-01-20 22:08:58 UTC (rev 7784)
@@ -0,0 +1,16 @@
+/* -*- Mode: c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
+/*
+ *  (C) 2010 by Argonne National Laboratory.
+ *      See COPYRIGHT in top-level directory.
+ */
+
+/* this file should be included by the using device's mpidpre.h */
+
+#ifndef MPID_SCHED_PRE_H_INCLUDED
+#define MPID_SCHED_PRE_H_INCLUDED
+
+#define MPID_SCHED_NULL (NULL)
+struct MPIDU_Sched; /* forward decl, the full definition is in mpid_sched.h (meant to be included by the device's mpidimpl.h) */
+typedef struct MPIDU_Sched * MPID_Sched_t;
+
+#endif /* !defined(MPID_SCHED_PRE_H_INCLUDED) */



More information about the mpich2-commits mailing list