[MOAB-dev] r3000 - MOAB/trunk/parallel

tautges at mcs.anl.gov
Mon Jul 13 17:40:04 CDT 2009


Author: tautges
Date: 2009-07-13 17:40:04 -0500 (Mon, 13 Jul 2009)
New Revision: 3000

Modified:
   MOAB/trunk/parallel/MBParallelComm.cpp
   MOAB/trunk/parallel/MBParallelComm.hpp
   MOAB/trunk/parallel/mbparallelcomm_test.cpp
Log:
Change ghost-exchange local message passing to implement Jason's idea:
send the total size in the first message, which tells the receiver
whether a 2nd message is needed and how big it will be.

Fix a bug reported by Lukasz Kaczmarczyk regarding ghost exchange with a
tet mesh.

Passes all tests except parallel_hdf5_test, which was already failing
before this change.

Specific changes:

mbparallelcomm_test: changed the default for with_ghosts back to 1, so
ghosts are exchanged by default.

MBParallelComm: 
- changed INITIAL_BUFF_SIZE to unsigned
- added MPE states for logging, #ifdef'd by DEBUG_MPE (see the MPE sketch
below)
- fixed a bug where, during interface entity exchange, new entities were
created when an existing one wasn't found; the proper behavior is to not
create those entities
- changed message sending during ghost exchange to always include the
total message size as the first datum in the message; for large messages
this tells the receiving processor to post a 2nd receive and how big that
receive should be, and lets us use Isend for the first message too; the
2nd message uses a different message tag (see the protocol sketch below)
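
For illustration (not part of the commit): a minimal standalone sketch of
the two-message protocol. send_two_part/recv_two_part are hypothetical
stand-ins for MBParallelComm::send_buffer/recv_buffer. The first Isend
never exceeds INITIAL_BUFF_SIZE bytes and carries the total size in its
leading int; a second Isend under tag+1 carries any remainder.

#include <mpi.h>
#include <algorithm>
#include <cstring>
#include <vector>

const unsigned int INITIAL_BUFF_SIZE = 1024;

// Sender: buff was packed with its first sizeof(int) bytes reserved
// (cf. the INIT_BUFFER macro in the diff); write the total size there,
// send the first part, and send the remainder under tag+1 if needed.
void send_two_part(std::vector<unsigned char> &buff, int to_proc, int tag,
                   MPI_Comm comm, MPI_Request &req1, MPI_Request &req2)
{
  unsigned int size = buff.size();
  std::memcpy(&buff[0], &size, sizeof(int));
  MPI_Isend(&buff[0], std::min(size, INITIAL_BUFF_SIZE),
            MPI_UNSIGNED_CHAR, to_proc, tag, comm, &req1);
  if (size > INITIAL_BUFF_SIZE)
    MPI_Isend(&buff[INITIAL_BUFF_SIZE], size - INITIAL_BUFF_SIZE,
              MPI_UNSIGNED_CHAR, to_proc, tag + 1, comm, &req2);
}

// Receiver: called after a message completes.  If it was a first part
// announcing more data, grow the buffer and post the second Irecv at the
// offset position, reusing the same request; done == true means the whole
// message is in hand (payload starts at &buff[sizeof(int)]).
void recv_two_part(std::vector<unsigned char> &buff,
                   const MPI_Status &status, int tag, MPI_Comm comm,
                   MPI_Request &req, bool &done)
{
  int total;
  std::memcpy(&total, &buff[0], sizeof(int));
  if (status.MPI_TAG == tag && total > (int)INITIAL_BUFF_SIZE) {
    buff.resize(total);
    MPI_Irecv(&buff[INITIAL_BUFF_SIZE], total - INITIAL_BUFF_SIZE,
              MPI_UNSIGNED_CHAR, status.MPI_SOURCE, tag + 1, comm, &req);
    done = false;
  }
  else
    done = true;
}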

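And a hypothetical receive loop over those helpers, mirroring the Waitany
pattern the commit installs in the ghost/tag exchange paths: whenever a
first part announces a second, the incoming count is re-incremented so the
loop also waits for the remainder. (MPI_ANY_SOURCE is a simplification;
the real code posts per-processor receives.)

void receive_all(std::vector<std::vector<unsigned char> > &bufs,
                 int tag, MPI_Comm comm)
{
  std::vector<MPI_Request> reqs(bufs.size(), MPI_REQUEST_NULL);
  for (unsigned int i = 0; i < bufs.size(); i++) {
    bufs[i].resize(INITIAL_BUFF_SIZE);
    MPI_Irecv(&bufs[i][0], INITIAL_BUFF_SIZE, MPI_UNSIGNED_CHAR,
              MPI_ANY_SOURCE, tag, comm, &reqs[i]);
  }

  int num_incoming = bufs.size();
  while (num_incoming) {
    int ind;
    MPI_Status status;
    MPI_Waitany((int)reqs.size(), &reqs[0], &ind, &status);
    num_incoming--;

    bool done;
    recv_two_part(bufs[ind], status, tag, comm, reqs[ind], done);
    if (!done)
      num_incoming++;   // 2nd part still in flight; Waitany will return it
    else {
      // unpack here; payload starts at &bufs[ind][sizeof(int)]
    }
  }
}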

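The MPE instrumentation follows MPE's define-describe-log pattern; a
condensed sketch, assuming MPE is linked in and DEBUG_MPE is defined
(define_mpe_states is a hypothetical stand-in for the new define_mpe()):

#include "mpe.h"

int GHOST_START, GHOST_END;   // event-ID pair for one logging state

void define_mpe_states()
{
    // allocate the pair, then give the state a label and a color so it
    // shows up named in jumpshot
  MPE_Log_get_state_eventIDs(&GHOST_START, &GHOST_END);
  MPE_Describe_state(GHOST_START, GHOST_END, "Exchange ghost ents", "red");
}

void ghost_exchange(int rank)
{
  MPE_Log_event(GHOST_START, rank, "Starting ghost exchange.");
    // ... do the exchange ...
  MPE_Log_event(GHOST_END, rank, "Ending ghost exchange.");
}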

Modified: MOAB/trunk/parallel/MBParallelComm.cpp
===================================================================
--- MOAB/trunk/parallel/MBParallelComm.cpp	2009-07-13 22:37:33 UTC (rev 2999)
+++ MOAB/trunk/parallel/MBParallelComm.cpp	2009-07-13 22:40:04 UTC (rev 3000)
@@ -41,9 +41,17 @@
 #include "mpi.h"
 #endif
 
-const int INITIAL_BUFF_SIZE = 1024;
+const unsigned int INITIAL_BUFF_SIZE = 1024;
 const int MAX_BCAST_SIZE = (1<<28);
 
+#define DEBUG_MPE
+#ifdef DEBUG_MPE
+#include "mpe.h"
+int IFACE_START, IFACE_END;
+int GHOST_START, GHOST_END;
+int SHAREDV_START, SHAREDV_END;
+int RESOLVE_START, RESOLVE_END;
+#endif
 #undef DEBUG_COMM
 #undef DEBUG_PACKING
 #ifdef DEBUG_PACKING
@@ -108,8 +116,11 @@
       if (_new_size > buff_vec.size()) {               \
         buff_vec.resize(1.5*_new_size);            \
         buff_ptr = &buff_vec[_new_size-(addl_space)];} }
-    
 
+#define INIT_BUFFER(_buff_vec, _buff_ptr) \
+    _buff_vec.reserve(INITIAL_BUFF_SIZE); _buff_vec.resize(sizeof(int)); \
+    _buff_ptr = &_buff_vec[sizeof(int)]
+
 #define RANGE_SIZE(rng) (2*sizeof(MBEntityHandle)*num_subranges(rng)+sizeof(int))
 #define RR(a) if (MB_SUCCESS != result) {\
           dynamic_cast<MBCore*>(mbImpl)->get_error_handler()->set_last_error(a);\
@@ -147,11 +158,12 @@
 
 
 enum MBMessageTag {MB_MESG_ANY=MPI_ANY_TAG, 
-                   MB_MESG_SIZE,
                    MB_MESG_ENTS,
+                   MB_MESG_ENTS_SECOND,
                    MB_MESG_REMOTE_HANDLES,
-                   MB_MESG_SHAREDHPS,
-                   MB_MESG_TAGS };
+                   MB_MESG_REMOTE_HANDLES_SECOND,
+                   MB_MESG_TAGS,
+                   MB_MESG_TAGS_SECOND};
     
 MBParallelComm::MBParallelComm(MBInterface *impl, MPI_Comm comm, int* id ) 
         : mbImpl(impl), procConfig(comm),
@@ -346,54 +358,83 @@
 
 MBErrorCode MBParallelComm::send_buffer(const unsigned int to_proc,
                                         const unsigned char *send_buff,
-                                        const unsigned int buff_size,
-                                        const int msg_type,
-                                        MPI_Request &send_req) 
+                                        unsigned int buff_size,
+                                        int mesg_tag,
+                                        MPI_Request &send_req1,
+                                        MPI_Request &send_req2) 
 {
-#ifndef USE_MPI
-  return MB_FAILURE;
-#else
-
   MBErrorCode result = MB_SUCCESS;
   int success;
 
-    // if the message is large, send a first message to tell how large
-  if (INITIAL_BUFF_SIZE < (int)buff_size) {
-#ifdef DEBUG_COMM
-    std::cerr << "Sending MESG_SIZE " << buff_size << " to proc " << to_proc << std::endl;
-#endif    
-    int tmp_buff_size = -buff_size;
-    int success = MPI_Send(&tmp_buff_size, sizeof(int), MPI_UNSIGNED_CHAR, 
-                           to_proc, MB_MESG_SIZE, procConfig.proc_comm());
-    if (success != MPI_SUCCESS) return MB_FAILURE;
-  }
-    
+    // size goes on the front
+  *((int*)send_buff) = (int)buff_size;
+
     // send the buffer
 #ifdef DEBUG_COMM
-  std::cerr << "Isending " << buff_size << " bytes to proc " << to_proc 
-            << " with message type " << msg_type << std::endl;
+  std::cerr << "Isending " <<  std::min(buff_size, INITIAL_BUFF_SIZE)
+            << " bytes to proc " << to_proc 
+            << " with message tag " << mesg_tag << std::endl;
 #endif
-  success = MPI_Isend(const_cast<unsigned char*>(send_buff), buff_size, MPI_UNSIGNED_CHAR, to_proc, 
-                      msg_type, procConfig.proc_comm(), &send_req);
+  success = MPI_Isend(const_cast<unsigned char*>(send_buff), 
+                      std::min(buff_size, INITIAL_BUFF_SIZE),
+                      MPI_UNSIGNED_CHAR, to_proc, 
+                      mesg_tag, procConfig.proc_comm(), &send_req1);
   if (success != MPI_SUCCESS) return MB_FAILURE;
 
+  if (buff_size > INITIAL_BUFF_SIZE) {
+#ifdef DEBUG_COMM
+  std::cerr << "Isending " <<  buff_size-INITIAL_BUFF_SIZE
+            << " bytes to proc " << to_proc 
+            << " with message tag " << mesg_tag+1 << "(second)" << std::endl;
+#endif
+    success = MPI_Isend(const_cast<unsigned char*>(send_buff+INITIAL_BUFF_SIZE), 
+                        buff_size-INITIAL_BUFF_SIZE, 
+                        MPI_UNSIGNED_CHAR, to_proc, 
+                        mesg_tag+1, procConfig.proc_comm(), &send_req2);
+    if (success != MPI_SUCCESS) return MB_FAILURE;
+  }
+
   return result;
-#endif
 }
 
-MBErrorCode MBParallelComm::recv_size_buff(const int from_proc,
-                                           std::vector<unsigned char> &recv_buff,
-                                           MPI_Request &recv_req,
-                                           int mesg_tag) 
+MBErrorCode MBParallelComm::recv_buffer(int mesg_tag_expected,
+                                        const MPI_Status &mpi_status,
+                                        std::vector<unsigned char> &recv_buff,
+                                        MPI_Request &recv_req,
+                                        bool &done) 
 {
-    // use the received size to resize buffer, then post another irecv
-  recv_buff.resize(-(*((int*)&recv_buff[0])));
-  int success = MPI_Irecv(&recv_buff[0], recv_buff.size(), MPI_UNSIGNED_CHAR, from_proc, 
-                          mesg_tag, procConfig.proc_comm(), &recv_req);
-  if (MPI_SUCCESS != success) {
-    MBErrorCode result = MB_FAILURE;
-    RRA("Failed call to Irecv in recv_size_buff.");
+    // process a received message, possibly posting a receive for the 2nd part of
+    // that message
+    //
+    // if the message comes in with mesg_tag_expected, check the first integer for
+    // size; if size is INITIAL_BUFF_SIZE, post a receive for the 2nd message using
+    // the same request object, with an offset buffer; otherwise just signal that we're done
+  int success;
+  if (mpi_status.MPI_TAG == mesg_tag_expected && 
+      ((int*)&recv_buff[0])[0] >= (int)INITIAL_BUFF_SIZE) {
+      // verify that we received exactly the right number of bytes
+#ifndef NDEBUG
+    int this_count;
+    success = MPI_Get_count(const_cast<MPI_Status*>(&mpi_status), MPI_UNSIGNED_CHAR, 
+                            &this_count);
+    assert(MPI_SUCCESS == success && this_count == (int)INITIAL_BUFF_SIZE);
+#endif
+    
+      // need to resize to final size, then resubmit irecv pointing to the
+      // offset buffer position
+    recv_buff.resize(*((int*)&recv_buff[0]));
+    int success = MPI_Irecv(&recv_buff[INITIAL_BUFF_SIZE], recv_buff.size()-INITIAL_BUFF_SIZE, 
+                            MPI_UNSIGNED_CHAR, mpi_status.MPI_SOURCE, 
+                            mesg_tag_expected+1, procConfig.proc_comm(), &recv_req);
+    if (MPI_SUCCESS != success) {
+      MBErrorCode result = MB_FAILURE;
+      RRA("Failed call to Irecv in recv_buffer.");
+    }
+    done = false;
   }
+  else {
+    done = true;
+  }
   
   return MB_SUCCESS;
 }
@@ -1244,9 +1285,8 @@
         //=======================================
         // if we didn't find one, we'll have to create one
         //=======================================
-
       bool created_here = false;
-      if (!new_h) {
+      if (!new_h && !is_iface) {
         
         if (MBVERTEX == this_type) {
             // create a vertex
@@ -1283,11 +1323,14 @@
 
         // need to save entities found in order, for interpretation of
         // later parts of this message
-      if (!is_iface) msg_ents.push_back(new_h);
+      if (!is_iface) {
+        assert(new_h);
+        msg_ents.push_back(new_h);
+      }
 
       if (created_here) new_ents.insert(new_h);
 
-      if (store_remote_handles) {
+      if (new_h && store_remote_handles) {
         
           // update sharing data and pstatus, adjusting order if iface
         result = update_remote_data(new_h, &ps[0], &hs[0], num_ps, 
@@ -2451,6 +2494,12 @@
                                                 int resolve_dim,
                                                 int shared_dim) 
 {
+#ifdef DEBUG_MPE
+  define_mpe();
+
+  MPE_Log_event(RESOLVE_START, procConfig.proc_rank(), "Entering resolve_shared_ents.");
+#endif
+
   MBErrorCode result;
   if (debug) std::cerr << "Resolving shared entities." << std::endl;
 
@@ -2536,6 +2585,10 @@
   std::copy(skin_ents[0].begin(), skin_ents[0].end(), 
             std::back_inserter(handle_vec));
   
+#ifdef DEBUG_MPE
+  MPE_Log_event(SHAREDV_START, procConfig.proc_rank(), "Creating crystal router.");
+#endif
+
     // get a crystal router
   crystal_data *cd = procConfig.crystal_router();
 
@@ -2615,6 +2668,10 @@
                             proc_nranges, proc_verts);
   RRA("Trouble tagging shared verts.");
 
+#ifdef DEBUG_MPE
+  MPE_Log_event(SHAREDV_END, procConfig.proc_rank(), "Finished tag_shared_verts.");
+#endif
+
     // get entities shared by 1 or n procs
   result = tag_shared_ents(resolve_dim, shared_dim, skin_ents,
                            proc_nranges);
@@ -2658,10 +2715,30 @@
 
   gs_data_free(gsd);
 
+#ifdef DEBUG_MPE
+  MPE_Log_event(RESOLVE_END, procConfig.proc_rank(), "Exiting resolve_shared_ents.");
+#endif
+
     // done
   return result;
 }
 
+void MBParallelComm::define_mpe() 
+{
+#ifdef DEBUG_MPE
+    // define mpe states used for logging
+  int success;
+  MPE_Log_get_state_eventIDs( &IFACE_START, &IFACE_END);
+  MPE_Log_get_state_eventIDs( &GHOST_START, &GHOST_END);
+  MPE_Log_get_state_eventIDs( &SHAREDV_START, &SHAREDV_END);
+  MPE_Log_get_state_eventIDs( &RESOLVE_START, &RESOLVE_END);
+  success = MPE_Describe_state(IFACE_START, IFACE_END, "Resolve interface ents", "green");
+  success = MPE_Describe_state(GHOST_START, GHOST_END, "Exchange ghost ents", "red");
+  success = MPE_Describe_state(SHAREDV_START, SHAREDV_END, "Resolve interface vertices", "blue");
+  success = MPE_Describe_state(RESOLVE_START, RESOLVE_END, "Resolve shared ents", "purple");
+#endif
+}
+
 MBErrorCode MBParallelComm::resolve_shared_ents(MBParallelComm **pc, 
                                                 const unsigned int np, 
                                                 const int part_dim) 
@@ -3418,6 +3495,13 @@
                                                  bool store_remote_handles,
                                                  bool wait_all)
 {
+#ifdef DEBUG_MPE
+  if (!num_layers)
+    MPE_Log_event(IFACE_START, procConfig.proc_rank(), "Starting interface exchange.");
+  else
+    MPE_Log_event(GHOST_START, procConfig.proc_rank(), "Starting ghost exchange.");
+#endif
+
     // if we're only finding out about existing ents, we have to be storing
     // remote handles too
   assert(num_layers > 0 || store_remote_handles);
@@ -3437,15 +3521,15 @@
     // post ghost irecv's for ghost entities from all communicating procs
     //===========================================
     // index reqs the same as buffer/sharing procs indices
-  std::vector<MPI_Request> recv_reqs(buffProcs.size(), MPI_REQUEST_NULL),
-      send_reqs(buffProcs.size(), MPI_REQUEST_NULL);
+  std::vector<MPI_Request> recv_reqs(buffProcs.size(), MPI_REQUEST_NULL);
   std::vector<unsigned int>::iterator proc_it;
   int ind;
+  std::fill(sendReqs, sendReqs+2*buffProcs.size(), MPI_REQUEST_NULL);
   for (ind = 0, proc_it = buffProcs.begin(); 
        proc_it != buffProcs.end(); proc_it++, ind++) {
     success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(), 
                         MPI_UNSIGNED_CHAR, buffProcs[ind],
-                        MB_MESG_ANY, procConfig.proc_comm(), 
+                        MB_MESG_ENTS, procConfig.proc_comm(), 
                         &recv_reqs[ind]);
     if (success != MPI_SUCCESS) {
       result = MB_FAILURE;
@@ -3466,12 +3550,11 @@
     //===========================================
     // pack and send ents from this proc to others
     //===========================================
-    // initialize sendReqs
   for (ind = 0, proc_it = buffProcs.begin(); 
        proc_it != buffProcs.end(); proc_it++, ind++) {
 
-      // buff_ptr points to the END (one past last occupied byte) of buffer
-    buff_ptr = &ownerSBuffs[ind][0];
+      // reserve space on front for size and for initial buff size
+    INIT_BUFFER(ownerSBuffs[ind], buff_ptr);
 
       // entities
     result = pack_entities(sent_ents[ind], ownerSBuffs[ind], buff_ptr,
@@ -3479,10 +3562,10 @@
                            &entprocs, &allsent); 
     RRA("Packing entities failed.");
 
-      // now we're ready to send the buffer
+      // send the buffer (buffer size saved in send_buffer)
     result = send_buffer(*proc_it, &ownerSBuffs[ind][0], 
                          buff_ptr-&ownerSBuffs[ind][0], MB_MESG_ENTS,
-                         send_reqs[ind]);
+                         sendReqs[ind], sendReqs[ind+buffProcs.size()]);
     RRA("Failed to Isend in ghost exchange.");
   }
 
@@ -3516,49 +3599,36 @@
       
       std::cerr << "Received from " << status[0].MPI_SOURCE
                 << ": count = " << this_count << ", tag = " << status[0].MPI_TAG;
-      if (MB_MESG_SIZE == status[0].MPI_TAG) std::cerr << "; new_size = " << *((int*)&ghostRBuffs[ind][0]);
+      if (MB_MESG_ENTS+1 == status[0].MPI_TAG) std::cerr << " (second)";
       std::cerr << std::endl;
     }
 #endif    
     
       // ok, received something; decrement incoming counter
     num_incoming--;
+
+    bool done = false;
     
-      // branch on message type
-    if (MB_MESG_SIZE == status[0].MPI_TAG) {
-        // incoming message just has size; resize buffer and re-call recv,
-        // then re-increment incoming count
-      int new_size = *((int*)&ghostRBuffs[ind][0]);
-      if (0 != new_size) {
-          // assert(0 > new_size);
-        if (0 <= new_size) std::cerr << "Bad size received in MESG_SIZE message, size " 
-                                     << new_size << std::endl;
-        result = recv_size_buff(buffProcs[ind], ghostRBuffs[ind], recv_reqs[ind],
-                                MB_MESG_ENTS);
-        RRA("Failed to resize recv buffer.");
-        num_incoming++;
-      }
-    }
-    else if (MB_MESG_ENTS == status[0].MPI_TAG) {
-      
-        // incoming ghost entities; unpack; returns entities received
-        // both from sending proc and from owning proc (which may be different)
-      unsigned char *buff_ptr = &ghostRBuffs[ind][0];
+    result = recv_buffer(MB_MESG_ENTS, status[0],
+                         ghostRBuffs[ind], recv_reqs[ind], done);
+    RRA("Failed to receive entities.");
+    
+    if (done) {
+      unsigned char *buff_ptr = &ghostRBuffs[ind][sizeof(int)];
       result = unpack_entities(buff_ptr,
                                store_remote_handles, ind, is_iface,
                                L1hloc, L1hrem, L1p, L2hloc, L2hrem, L2p, new_ents);
       RRA("Failed to unpack entities.");
     }
     else {
-      assert(false);
-      return MB_FAILURE;
+      num_incoming++;
+      continue;
     }
   }
 
     // add requests for any new addl procs
   if (recv_reqs.size() != buffProcs.size()) {
-    recv_reqs.resize(buffProcs.size());
-    send_reqs.resize(buffProcs.size());
+    recv_reqs.resize(buffProcs.size(), MPI_REQUEST_NULL);
   }
     
   if (is_iface) {
@@ -3568,6 +3638,11 @@
     result = check_all_shared_handles();
     RRA("Failed check on all shared handles.");
 #endif
+
+#ifdef DEBUG_MPE
+      MPE_Log_event(IFACE_END, procConfig.proc_rank(), "Ending interface exchange.");
+#endif
+
     return MB_SUCCESS;
   }
   
@@ -3579,7 +3654,7 @@
       // skip if iface layer and lower-rank proc
     success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(), 
                         MPI_UNSIGNED_CHAR, buffProcs[ind],
-                        MB_MESG_ANY, procConfig.proc_comm(), 
+                        MB_MESG_REMOTE_HANDLES, procConfig.proc_comm(), 
                         &recv_reqs[ind]);
     if (success != MPI_SUCCESS) {
       result = MB_FAILURE;
@@ -3593,14 +3668,17 @@
     //===========================================
   for (ind = 0, proc_it = buffProcs.begin(); 
        proc_it != buffProcs.end(); proc_it++, ind++) {
-      // skip if iface layer and higher-rank proc
-    buff_ptr = &ownerSBuffs[ind][0];
+
+      // reserve space on front for size and for initial buff size
+    INIT_BUFFER(ownerSBuffs[ind], buff_ptr);
+
     result = pack_remote_handles(L1hloc[ind], L1hrem[ind], L1p[ind], *proc_it,
-                                   ownerSBuffs[ind], buff_ptr);
+                                 ownerSBuffs[ind], buff_ptr);
     RRA("Failed to pack remote handles.");
     result = send_buffer(buffProcs[ind], &ownerSBuffs[ind][0], 
                          buff_ptr - &ownerSBuffs[ind][0], 
-                         MB_MESG_REMOTE_HANDLES, send_reqs[ind]);
+                         MB_MESG_REMOTE_HANDLES, 
+                         sendReqs[ind], sendReqs[ind+buffProcs.size()]);
     RRA("Failed to send remote handles.");
   }
   
@@ -3626,31 +3704,34 @@
       
       std::cerr << "Received from " << status[0].MPI_SOURCE
                 << ": count = " << this_count << ", tag = " << status[0].MPI_TAG;
-      if (MB_MESG_SIZE == status[0].MPI_TAG) std::cerr << "; new_size = " << *((int*)&ghostRBuffs[ind][0]);
+      if (MB_MESG_REMOTE_HANDLES_SECOND == status[0].MPI_TAG) 
+        std::cerr << " (second)";
       std::cerr << std::endl;
     }
 #endif    
-
-      // branch on message type
-    if (MB_MESG_SIZE == status[0].MPI_TAG) {
-        // incoming message just has size; resize buffer and re-call recv,
-        // then re-increment incoming count
-      int new_size = *((int*)&ghostRBuffs[ind][0]);
-      assert(0 > new_size);
-      result = recv_size_buff(buffProcs[ind], ghostRBuffs[ind], recv_reqs[ind],
-                              MB_MESG_REMOTE_HANDLES);
-      RRA("Failed to resize recv buffer.");
-      num_incoming++;
-    }
-    else if (MB_MESG_REMOTE_HANDLES == status[0].MPI_TAG) {
+    
+    bool done;
+    result = recv_buffer(MB_MESG_REMOTE_HANDLES, status[0], 
+                         ghostRBuffs[ind], recv_reqs[ind], done);
+    RRA("Failed to resize recv buffer.");
+    if (done) {
         // incoming remote handles
-      result = unpack_remote_handles(buffProcs[ind], &ghostRBuffs[ind][0],
+      buff_ptr = &ghostRBuffs[ind][sizeof(int)];
+      result = unpack_remote_handles(buffProcs[ind], buff_ptr,
                                      L2hloc, L2hrem, L2p);
       RRA("Failed to unpack remote handles.");
     }
-    else assert(false);
+    else {
+      num_incoming++;
+      continue;
+    }
   }
     
+#ifdef DEBUG_MPE
+      MPE_Log_event(GHOST_END, procConfig.proc_rank(), 
+                    "Ending ghost exchange (still doing checks).");
+#endif
+
 #ifndef NDEBUG
   result = check_sent_ents(allsent);
   RRA("Failed check on shared entities.");
@@ -4084,11 +4165,13 @@
   
     // pack and send tags from this proc to others
     // make sendReqs vector to simplify initialization
-  std::fill(sendReqs, sendReqs+MAX_SHARING_PROCS, MPI_REQUEST_NULL);
+  std::fill(sendReqs, sendReqs+2*buffProcs.size(), MPI_REQUEST_NULL);
 
     // take all shared entities if incoming list is empty
   if (entities.empty()) entities = sharedEnts;
   
+  unsigned char *buff_ptr;
+
   for (ind = 0, sit = buffProcs.begin(); sit != buffProcs.end(); sit++, ind++) {
     
     MBRange tag_ents = entities;
@@ -4119,7 +4202,9 @@
     }
     
       // pack the data
-    unsigned char *buff_ptr = &ownerSBuffs[ind][0];
+      // reserve space on front for size and for initial buff size
+    INIT_BUFFER(ownerSBuffs[ind], buff_ptr);
+    
     result = pack_tags(tag_ents,
                        src_tags, dst_tags, tag_ranges, 
                        ownerSBuffs[ind], buff_ptr, true, *sit);
@@ -4128,7 +4213,7 @@
       // now send it
     result = send_buffer(*sit, &ownerSBuffs[ind][0], 
                          buff_ptr-&ownerSBuffs[ind][0], 
-                         MB_MESG_TAGS, sendReqs[ind]);
+                         MB_MESG_TAGS, sendReqs[ind], sendReqs[ind+buffProcs.size()]);
     RRA("Failed to send buffer.");
                          
   }
@@ -4148,40 +4233,26 @@
       // ok, received something; decrement incoming counter
     num_incoming--;
     
-    int new_size;
-    unsigned char *buff_ptr;
+    bool done;
     MBRange dum_range;
     
-      // branch on message type
-    switch (status.MPI_TAG) {
-      case MB_MESG_SIZE:
-          // incoming message just has size; resize buffer and re-call recv,
-          // then re-increment incoming count
-        assert(ind < MAX_SHARING_PROCS);
-        new_size = *((int*)&ghostRBuffs[ind][0]);
-        assert(0 > new_size);
-        result = recv_size_buff(buffProcs[ind], ghostRBuffs[ind], recv_reqs[ind],
-                                MB_MESG_TAGS);
-        RRA("Failed to resize recv buffer.");
-        num_incoming++;
-        break;
-      case MB_MESG_TAGS:
-          // incoming ghost entities; process
-          buff_ptr = &ghostRBuffs[ind][0];
-          result = unpack_tags(buff_ptr, dum_range, true,
-                               buffProcs[ind]);
-        RRA("Failed to recv-unpack-tag message.");
-        break;
-      default:
-        result = MB_FAILURE;
-        RRA("Failed to get message of correct type in exch_tags.");
-        break;
+    result = recv_buffer(MB_MESG_TAGS, status,
+                         ghostRBuffs[ind], recv_reqs[ind], done);
+    RRA("Failed to resize recv buffer.");
+    if (done) {
+      buff_ptr = &ghostRBuffs[ind][sizeof(int)];
+      result = unpack_tags(buff_ptr, dum_range, true, buffProcs[ind]);
+      RRA("Failed to recv-unpack-tag message.");
     }
+    else {
+      num_incoming++;
+      continue;
+    }
   }
   
     // ok, now wait
   MPI_Status status[MAX_SHARING_PROCS];
-  success = MPI_Waitall(MAX_SHARING_PROCS, &sendReqs[0], status);
+  success = MPI_Waitall(2*buffProcs.size(), &sendReqs[0], status);
   if (MPI_SUCCESS != success) {
     result = MB_FAILURE;
     RRA("Failure in waitall in tag exchange.");
@@ -5106,8 +5177,8 @@
   const int tag = 0x4A41534E;
   const MPI_Comm comm = procConfig.proc_comm();
   const int num_proc = buffProcs.size();
-  std::vector<MPI_Request> send_req(num_proc), recv_req(num_proc);
   const std::vector<int> procs( buffProcs.begin(), buffProcs.end() );
+  std::vector<MPI_Request> recv_req(buffProcs.size(), MPI_REQUEST_NULL);
   
     // set up to receive sizes
   std::vector<int> sizes_send(num_proc), sizes_recv(num_proc);
@@ -5120,9 +5191,10 @@
     // send sizes
   assert(num_proc == (int)send_data.size());
   
+  std::fill(sendReqs, sendReqs+buffProcs.size(), MPI_REQUEST_NULL);
   for (int i = 0; i < num_proc; ++i) {
     sizes_send[i] = send_data[i].size();
-    ierr = MPI_Isend( &sizes_send[i], 1, MPI_INT, buffProcs[i], tag, comm, &send_req[i] );
+    ierr = MPI_Isend( &sizes_send[i], 1, MPI_INT, buffProcs[i], tag, comm, &sendReqs[i] );
     if (ierr) 
       return MB_FILE_WRITE_ERROR;
   }
@@ -5134,7 +5206,7 @@
     return MB_FILE_WRITE_ERROR;
   
     // wait until all sizes are sent (clean up pending req's)
-  ierr = MPI_Waitall( num_proc, &send_req[0], &stat[0] );
+  ierr = MPI_Waitall( num_proc, &sendReqs[0], &stat[0] );
   if (ierr)
     return MB_FILE_WRITE_ERROR;
   
@@ -5154,7 +5226,7 @@
     ierr = MPI_Isend( &send_data[i][0], 
                       sizeof(SharedEntityData)*sizes_send[i], 
                       MPI_UNSIGNED_CHAR, 
-                      buffProcs[i], tag, comm, &send_req[i] );
+                      buffProcs[i], tag, comm, &sendReqs[i] );
     if (ierr) 
       return MB_FILE_WRITE_ERROR;
   }
@@ -5165,7 +5237,7 @@
     return MB_FILE_WRITE_ERROR;
   
     // wait until everything is sent to release send buffers
-  ierr = MPI_Waitall( num_proc, &send_req[0], &stat[0] );
+  ierr = MPI_Waitall( num_proc, &sendReqs[0], &stat[0] );
   if (ierr)
     return MB_FILE_WRITE_ERROR;
   

Modified: MOAB/trunk/parallel/MBParallelComm.hpp
===================================================================
--- MOAB/trunk/parallel/MBParallelComm.hpp	2009-07-13 22:37:33 UTC (rev 2999)
+++ MOAB/trunk/parallel/MBParallelComm.hpp	2009-07-13 22:40:04 UTC (rev 3000)
@@ -579,6 +579,8 @@
   
 private:
 
+  void define_mpe();
+
   MBErrorCode get_sent_ents(const bool is_iface,
                             const int bridge_dim, const int ghost_dim,
                             const int num_layers,
@@ -633,14 +635,16 @@
                           const unsigned char *send_buff,
                           const unsigned int buff_size,
                           const int msg_type,
-                          MPI_Request &send_req);
+                          MPI_Request &send_req1,
+                          MPI_Request &send_req2);
   
     //! use integer size in buffer to resize buffer, then post an
     //! Irecv to get message
-  MBErrorCode recv_size_buff(const int from_proc,
-                             std::vector<unsigned char> &recv_buff,
-                             MPI_Request &recv_req,
-                             int mesg_tag);
+  MBErrorCode recv_buffer(int mesg_tag_expected,
+                          const MPI_Status &mpi_status,
+                          std::vector<unsigned char> &recv_buff,
+                          MPI_Request &recv_req,
+                          bool &done);
   
     //! pack a range of entities with equal # verts per entity, along with
     //! the range on the sending proc

Modified: MOAB/trunk/parallel/mbparallelcomm_test.cpp
===================================================================
--- MOAB/trunk/parallel/mbparallelcomm_test.cpp	2009-07-13 22:37:33 UTC (rev 2999)
+++ MOAB/trunk/parallel/mbparallelcomm_test.cpp	2009-07-13 22:40:04 UTC (rev 3000)
@@ -88,7 +88,7 @@
     return 1;
   }
 
-  int npos = 1, tag_val, distrib, with_ghosts = 0, resolve_shared = 1, use_mpio = 0;
+  int npos = 1, tag_val, distrib, with_ghosts = 1, resolve_shared = 1, use_mpio = 0;
   const char *tag_name;
   std::vector<std::string> filenames;
   int parallel_option = 0;


