[MOAB-dev] r2228 - MOAB/trunk/parallel

kraftche at mcs.anl.gov
Wed Nov 12 21:38:11 CST 2008


Author: kraftche
Date: 2008-11-12 21:38:11 -0600 (Wed, 12 Nov 2008)
New Revision: 2228

Modified:
   MOAB/trunk/parallel/MBParallelComm.cpp
   MOAB/trunk/parallel/MBParallelComm.hpp
Log:
o Add function to get MBParallelComm ID
o Add function to get all MBParallelComm instances
o Add more precise exchange_tags function
o Add const forms of functions to get interface and part sets
o Add function to get part handle given part id
o Fix incorrect signature for get_part_neighbor_ids
o Add update_shared_mesh function
o Make the PARALLEL_PARTITION_TAG_NAME tag an int rather than 64 handles
o Fix bug in collective_sync_parts
o Fix bug in get_part_neighbor_ids
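
For orientation, here is a minimal usage sketch of the API summarized in the
log above.  It is not part of the commit: the member signatures follow the
header diff further down, but MBCore as the concrete MBInterface, the include
names, the tag setup, and the public proc_config() accessor are assumptions.

#include "MBCore.hpp"
#include "MBParallelComm.hpp"
#include <mpi.h>
#include <iostream>
#include <vector>

int main( int argc, char* argv[] )
{
  MPI_Init( &argc, &argv );
  {
    MBCore moab;
    MBParallelComm pcomm( &moab );

      // new in r2228: the ID this instance was registered under
    std::cout << "pcomm id: " << pcomm.get_id() << std::endl;

      // new in r2228: enumerate every MBParallelComm attached to the interface
    std::vector<MBParallelComm*> all_pcomms;
    MBErrorCode rval = MBParallelComm::get_all_pcomm( &moab, all_pcomms );
    if (MB_SUCCESS == rval)
      std::cout << all_pcomms.size() << " pcomm instance(s)" << std::endl;

      // create a local part, then map its id back to the set handle
      // (new get_part_handle; assumes the one-part-per-proc convention)
    MBEntityHandle part_set, same_set;
    rval = pcomm.create_part( part_set );
    rval = pcomm.get_part_handle( (int)pcomm.proc_config().proc_rank(), same_set );

      // new in r2228: exchange one tag on an explicit set of entities only
    MBTag dens_tag;
    double def_val = 0.0;
    rval = moab.tag_create( "density", sizeof(double), MB_TAG_DENSE,
                            MB_TYPE_DOUBLE, dens_tag, &def_val, false );
    MBRange shared_ents;   // would normally hold the shared entities of interest
    rval = pcomm.exchange_tags( dens_tag, shared_ents );

      // new in r2228: push modified coordinates of shared/ghosted vertices
    rval = pcomm.update_shared_mesh();
  }
  MPI_Finalize();
  return 0;
}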


Modified: MOAB/trunk/parallel/MBParallelComm.cpp
===================================================================
--- MOAB/trunk/parallel/MBParallelComm.cpp	2008-11-13 02:45:19 UTC (rev 2227)
+++ MOAB/trunk/parallel/MBParallelComm.cpp	2008-11-13 03:38:11 UTC (rev 2228)
@@ -155,9 +155,9 @@
     retval = MPI_Init(&argc, &argv);
   }
 
-  int myid = add_pcomm(this);
+  pcommID = add_pcomm(this);
   if (id)
-    *id = myid;
+    *id = pcommID;
 }
 
 MBParallelComm::MBParallelComm(MBInterface *impl,
@@ -179,9 +179,9 @@
     retval = MPI_Init(&argc, &argv);
   }
 
-  int myid = add_pcomm(this);
+  pcommID = add_pcomm(this);
   if (id)
-    *id = myid;
+    *id = pcommID;
 }
 
 MBParallelComm::~MBParallelComm() 
@@ -3535,6 +3535,253 @@
   return MB_SUCCESS;
 }
 
+MBErrorCode MBParallelComm::exchange_tags( MBTag tag, const MBRange& entities )
+{
+  MBErrorCode result;
+  int success;
+
+    // get all procs interfacing to this proc
+  std::set<unsigned int> exch_procs;
+  result = get_comm_procs(exch_procs);  
+
+    // post ghost irecv's for all interface procs
+    // index greqs the same as buffer/sharing procs indices
+  std::vector<MPI_Request> recv_reqs(MAX_SHARING_PROCS, MPI_REQUEST_NULL);
+  std::vector<MPI_Status> gstatus(MAX_SHARING_PROCS);
+  std::set<unsigned int>::iterator sit;
+  for (sit = exch_procs.begin(); sit != exch_procs.end(); sit++) {
+    int ind = get_buffers(*sit);
+    success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(), 
+                        MPI_UNSIGNED_CHAR, *sit,
+                        MB_MESG_ANY, procConfig.proc_comm(), 
+                        &recv_reqs[ind]);
+    if (success != MPI_SUCCESS) {
+      result = MB_FAILURE;
+      RRA("Failed to post irecv in ghost exchange.");
+    }
+  }
+  
+    // group entities by owner
+  std::map<int,MBRange> proc_ents;
+  for (MBRange::const_iterator i = entities.begin(); i != entities.end(); ++i) {
+    int owner;
+    result = get_owner( *i, owner );
+    RRA("Failed to get entity owner.");
+    proc_ents[owner].insert( *i );
+  }
+  
+    // pack and send tags from this proc to others
+    // make sendReqs vector to simplify initialization
+  std::fill(sendReqs, sendReqs+MAX_SHARING_PROCS, MPI_REQUEST_NULL);
+  std::map<unsigned int,MBRange>::const_iterator mit;
+  
+  for (sit = exch_procs.begin(); sit != exch_procs.end(); sit++) {
+    int ind = get_buffers(*sit);
+    
+    MBRange& tag_ents = proc_ents[*sit];
+    std::vector<MBTag> tags(1); tags[0] = tag;
+    std::vector<MBRange> tag_ranges(1); tag_ranges[0].swap(tag_ents);
+    
+      // count first
+    int buff_size = 0;
+    unsigned char *buff_ptr = &ownerSBuffs[ind][0];
+    MBRange::iterator rit = tag_ranges[0].begin();
+    result = pack_tags(tag_ranges[0], rit, tag_ranges[0],
+                       buff_ptr, buff_size, true, true, *sit,
+                       tags, tag_ranges, false);
+    RRA("Failed to count buffer in pack_send_tag.");
+
+    result = pack_tags(tag_ranges[0], rit, tag_ranges[0],
+                       buff_ptr, buff_size, false, true, *sit,
+                       tags, tag_ranges, false);
+    RRA("Failed to pack buffer in pack_send_tag.");
+
+      // if the message is large, send a first message to tell how large
+    if (INITIAL_BUFF_SIZE < buff_size) {
+      int tmp_buff_size = -buff_size;
+      int success = MPI_Send(&tmp_buff_size, sizeof(int), MPI_UNSIGNED_CHAR, 
+                             *sit, MB_MESG_SIZE, procConfig.proc_comm());
+      if (success != MPI_SUCCESS) return MB_FAILURE;
+    }
+    
+      // send the buffer
+    success = MPI_Isend(&ownerSBuffs[ind][0], buff_size, MPI_UNSIGNED_CHAR, *sit, 
+                        MB_MESG_TAGS, procConfig.proc_comm(), &sendReqs[ind]);
+    if (success != MPI_SUCCESS) return MB_FAILURE;
+  }
+  
+    // receive/unpack tags
+  int num_incoming = exch_procs.size();
+  
+  while (num_incoming) {
+    int ind;
+    MPI_Status status;
+    success = MPI_Waitany(MAX_SHARING_PROCS, &recv_reqs[0], &ind, &status);
+    if (MPI_SUCCESS != success) {
+      result = MB_FAILURE;
+      RRA("Failed in waitany in ghost exchange.");
+    }
+    
+      // ok, received something; decrement incoming counter
+    num_incoming--;
+    
+    int new_size;
+    unsigned char *buff_ptr;
+    MBRange dum_range;
+    
+      // branch on message type
+    switch (status.MPI_TAG) {
+      case MB_MESG_SIZE:
+          // incoming message just has size; resize buffer and re-call recv,
+          // then re-increment incoming count
+        assert(ind < MAX_SHARING_PROCS);
+        new_size = *((int*)&ghostRBuffs[ind][0]);
+        assert(0 > new_size);
+        result = recv_size_buff(buffProcs[ind], ghostRBuffs[ind], recv_reqs[ind],
+                                MB_MESG_TAGS);
+        RRA("Failed to resize recv buffer.");
+        num_incoming++;
+        break;
+      case MB_MESG_TAGS:
+          // incoming ghost entities; process
+          buff_ptr = &ghostRBuffs[ind][0];
+          result = unpack_tags(buff_ptr, dum_range, true,
+                               buffProcs[ind]);
+        RRA("Failed to recv-unpack-tag message.");
+        break;
+      default:
+        result = MB_FAILURE;
+        RRA("Failed to get message of correct type in exch_tags.");
+        break;
+    }
+  }
+  
+    // ok, now wait
+  MPI_Status status[MAX_SHARING_PROCS];
+  success = MPI_Waitall(MAX_SHARING_PROCS, &sendReqs[0], status);
+  if (MPI_SUCCESS != success) {
+    result = MB_FAILURE;
+    RRA("Failure in waitall in tag exchange.");
+  }
+  
+  return MB_SUCCESS;
+}
+
+MBErrorCode MBParallelComm::update_shared_mesh()
+{
+  MBErrorCode result;
+  int success;
+
+    // get all procs interfacing to this proc
+  std::set<unsigned int> iface_procs;
+  result = get_interface_procs(iface_procs);
+  RRA("Failed to get iface sets, procs");
+
+    // post ghost irecv's for all interface procs
+    // index greqs the same as buffer/sharing procs indices
+  std::vector<MPI_Request> recv_reqs(2*MAX_SHARING_PROCS, MPI_REQUEST_NULL);
+  std::vector<MPI_Status> gstatus(MAX_SHARING_PROCS);
+  std::set<unsigned int>::iterator sit;
+  for (sit = iface_procs.begin(); sit != iface_procs.end(); sit++) {
+    int ind = get_buffers(*sit);
+    success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(), 
+                        MPI_UNSIGNED_CHAR, *sit,
+                        MB_MESG_ANY, procConfig.proc_comm(), 
+                        &recv_reqs[ind]);
+    if (success != MPI_SUCCESS) {
+      result = MB_FAILURE;
+      RRA("Failed to post irecv in ghost exchange.");
+    }
+  }
+  
+    // pack and send vertex coordinates from this proc to others
+    // make sendReqs vector to simplify initialization
+  std::fill(sendReqs, sendReqs+2*MAX_SHARING_PROCS, MPI_REQUEST_NULL);
+  MBRange recd_ents[MAX_SHARING_PROCS];
+  
+  for (sit = iface_procs.begin(); sit != iface_procs.end(); sit++) {
+    int ind = get_buffers(*sit);
+    
+    MBRange vertices;
+    for (MBRange::iterator rit = interfaceSets.begin(); rit != interfaceSets.end();
+         rit++) {
+      if (!is_iface_proc(*rit, *sit)) 
+        continue;
+      
+      result = mbImpl->get_entities_by_type( *rit, MBVERTEX, vertices );
+      RRA("Bad interface set.");
+    }
+    std::map<unsigned int,MBRange>::iterator ghosted = ghostedEnts.find(*sit);
+    if (ghosted != ghostedEnts.end()) {
+      MBRange::iterator e = ghosted->second.upper_bound(MBVERTEX);
+      vertices.merge( ghosted->second.begin(), e );
+    }
+
+      // pack-send; this also posts receives if store_remote_handles is true
+    MBRange sent;
+    result = pack_send_entities(*sit, vertices, false, false, 
+                                false, true,
+                                ownerSBuffs[ind], ownerRBuffs[MAX_SHARING_PROCS+ind], 
+                                sendReqs[ind], recv_reqs[MAX_SHARING_PROCS+ind], 
+                                sent);
+    RRA("Failed to pack-send in mesh update exchange.");
+  }
+  
+    // receive/unpack entities
+    // number of incoming messages depends on whether we're getting back
+    // remote handles
+  int num_incoming = iface_procs.size();
+  
+  while (num_incoming) {
+    int ind;
+    MPI_Status status;
+    success = MPI_Waitany(2*MAX_SHARING_PROCS, &recv_reqs[0], &ind, &status);
+    if (MPI_SUCCESS != success) {
+      result = MB_FAILURE;
+      RRA("Failed in waitany in ghost exchange.");
+    }
+    
+      // ok, received something; decrement incoming counter
+    num_incoming--;
+    
+    std::vector<MBEntityHandle> remote_handles_v, sent_ents_tmp;
+    MBRange remote_handles_r;
+    int new_size;
+    
+      // branch on message type
+    switch (status.MPI_TAG) {
+      case MB_MESG_SIZE:
+          // incoming message just has size; resize buffer and re-call recv,
+          // then re-increment incoming count
+        assert(ind < MAX_SHARING_PROCS);
+        new_size = *((int*)&ghostRBuffs[ind][0]);
+        assert(0 > new_size);
+        result = recv_size_buff(buffProcs[ind], ghostRBuffs[ind], recv_reqs[ind],
+                                MB_MESG_ENTS);
+        RRA("Failed to resize recv buffer.");
+        num_incoming++;
+        break;
+      case MB_MESG_ENTS:
+          // incoming ghost entities; process
+        result = recv_unpack_entities(buffProcs[ind], true,
+                                      false, 
+                                      ghostRBuffs[ind], ghostSBuffs[ind], 
+                                      sendReqs[ind], recd_ents[ind]);
+        RRA("Failed to recv-unpack message.");
+        break;
+    }
+  }
+  
+    // ok, now wait if requested
+  MPI_Status status[2*MAX_SHARING_PROCS];
+  success = MPI_Waitall(2*MAX_SHARING_PROCS, &sendReqs[0], status);
+  if (MPI_SUCCESS != success) {
+    result = MB_FAILURE;
+    RRA("Failure in waitall in ghost exchange.");
+  }
+  
+  return MB_SUCCESS;
+}
 MBErrorCode MBParallelComm::update_iface_sets(MBRange &sent_ents,
                                               std::vector<MBEntityHandle> &remote_handles, 
                                               int from_proc) 
@@ -3641,9 +3888,10 @@
 {  
   if (!partitionTag) {
     MBErrorCode result = mbImpl->tag_create(PARALLEL_PARTITION_TAG_NAME, 
-                                            MAX_SHARING_PROCS*sizeof(MBEntityHandle),
+                                            sizeof(int),
                                             MB_TAG_SPARSE,
-                                            MB_TYPE_HANDLE, partitionTag, 
+                                            MB_TYPE_INTEGER, 
+                                            partitionTag, 
                                             NULL, true);
     if (MB_SUCCESS != result && MB_ALREADY_ALLOCATED != result)
       return 0;
@@ -3686,6 +3934,25 @@
   return pc_array[index];
 }
 
+MBErrorCode MBParallelComm::get_all_pcomm( MBInterface* impl, std::vector<MBParallelComm*>& list )
+{
+  MBTag pc_tag = pcomm_tag(impl, false);
+  if (0 == pc_tag)
+    return MB_TAG_NOT_FOUND;
+  
+  MBParallelComm *pc_array[MAX_SHARING_PROCS];
+  MBErrorCode rval = impl->tag_get_data( pc_tag, 0, 0, pc_array );
+  if (MB_SUCCESS != rval)
+    return rval;
+  
+  for (int i = 0; i < MAX_SHARING_PROCS; ++i)
+    if (pc_array[i])
+      list.push_back( pc_array[i] );
+  
+  return MB_SUCCESS;
+}
+  
+
     //! get the indexed pcomm object from the interface
 MBParallelComm *MBParallelComm::get_pcomm( MBInterface *impl, 
                                            MBEntityHandle prtn,
@@ -3861,6 +4128,15 @@
   return MB_SUCCESS;
 }
 
+MBErrorCode MBParallelComm::get_part_handle( int id, MBEntityHandle& handle_out ) const
+{
+  // FIXME: assumes only 1 local part
+  if ((unsigned)id != proc_config().proc_rank())
+    return MB_ENTITY_NOT_FOUND;
+  handle_out = partition_sets().front();
+  return MB_SUCCESS;
+}
+
 MBErrorCode MBParallelComm::create_part( MBEntityHandle& set_out )
 {
     // mark as invalid so we know that it needs to be updated
@@ -3908,6 +4184,7 @@
 MBErrorCode MBParallelComm::collective_sync_partition()
 {
   int count = partition_sets().size();
+  globalPartCount = 0;
   int err = MPI_Allreduce( &count, &globalPartCount, 1, MPI_INT, MPI_SUM, 
                            proc_config().proc_comm() );
   return err ? MB_FAILURE : MB_SUCCESS;
@@ -3915,12 +4192,12 @@
 
 MBErrorCode MBParallelComm::get_part_neighbor_ids( MBEntityHandle part,
                                                    int neighbors_out[MAX_SHARING_PROCS],
-                                                   int num_neighbors_out )
+                                                   int& num_neighbors_out )
 {
   MBErrorCode rval;
   MBRange iface;
   rval = get_interface_sets( part, iface );
-  if (MB_SUCCESS != rval);
+  if (MB_SUCCESS != rval)
     return rval;
   
   num_neighbors_out = 0;
@@ -4051,7 +4328,6 @@
   return mbImpl->tag_get_data( sharedhs_tag(), &entity, 1, remote_handles );
 }
 
-  
 
 #ifdef TEST_PARALLELCOMM
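
Both of the new functions above (exchange_tags with an explicit entity range,
and update_shared_mesh) follow the same size-then-payload handshake: receives
are pre-posted at INITIAL_BUFF_SIZE with MB_MESG_ANY, and a sender whose packed
buffer is larger first sends the negated byte count under MB_MESG_SIZE so the
receiver can resize and re-post before the real message arrives.  Below is a
generic, MOAB-independent MPI sketch of that pattern; the tag values and helper
names are illustrative, not MOAB identifiers.  Run with at least two ranks.

#include <mpi.h>
#include <cstring>
#include <vector>

enum { MESG_SIZE_TAG = 1, MESG_DATA_TAG = 2 };   // stand-ins for MB_MESG_SIZE / MB_MESG_TAGS
const int INITIAL_BUFF_SIZE = 1024;              // size of every pre-posted receive

// Sender side: if the packed buffer exceeds the pre-posted receive size,
// announce the (negated) byte count first so the receiver can grow its buffer.
void send_buffer( const std::vector<unsigned char>& buff, int dest,
                  MPI_Comm comm, MPI_Request& req )
{
  int buff_size = (int)buff.size();
  if (buff_size > INITIAL_BUFF_SIZE) {
    int neg_size = -buff_size;
    MPI_Send( &neg_size, sizeof(int), MPI_UNSIGNED_CHAR, dest, MESG_SIZE_TAG, comm );
  }
  MPI_Isend( (void*)&buff[0], buff_size, MPI_UNSIGNED_CHAR, dest,
             MESG_DATA_TAG, comm, &req );
}

// Receiver side: a receive is already posted with MPI_ANY_TAG into a buffer of
// INITIAL_BUFF_SIZE bytes.  On a size message, resize the buffer and re-post.
void handle_message( MPI_Status& status, std::vector<unsigned char>& buff,
                     MPI_Request& req, MPI_Comm comm, bool& got_payload )
{
  got_payload = false;
  if (status.MPI_TAG == MESG_SIZE_TAG) {
      // the pre-posted buffer holds only the negated size: grow and re-post
    int new_size;
    std::memcpy( &new_size, &buff[0], sizeof(int) );
    buff.resize( (size_t)(-new_size) );
    MPI_Irecv( &buff[0], (int)buff.size(), MPI_UNSIGNED_CHAR,
               status.MPI_SOURCE, MESG_DATA_TAG, comm, &req );
  }
  else {
      // MESG_DATA_TAG: buff now holds the full packed payload; unpack it here
    got_payload = true;
  }
}

int main( int argc, char* argv[] )
{
  MPI_Init( &argc, &argv );
  int rank;
  MPI_Comm_rank( MPI_COMM_WORLD, &rank );

  if (0 == rank) {
    std::vector<unsigned char> sbuff( 4 * INITIAL_BUFF_SIZE, 7 );  // too big for one shot
    MPI_Request sreq;
    send_buffer( sbuff, 1, MPI_COMM_WORLD, sreq );
    MPI_Wait( &sreq, MPI_STATUS_IGNORE );
  }
  else if (1 == rank) {
    std::vector<unsigned char> rbuff( INITIAL_BUFF_SIZE );
    MPI_Request rreq;
    MPI_Irecv( &rbuff[0], INITIAL_BUFF_SIZE, MPI_UNSIGNED_CHAR, 0,
               MPI_ANY_TAG, MPI_COMM_WORLD, &rreq );
    bool got_payload = false;
    while (!got_payload) {
      MPI_Status status;
      MPI_Wait( &rreq, &status );
      handle_message( status, rbuff, rreq, MPI_COMM_WORLD, got_payload );
    }
  }

  MPI_Finalize();
  return 0;
}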
 

Modified: MOAB/trunk/parallel/MBParallelComm.hpp
===================================================================
--- MOAB/trunk/parallel/MBParallelComm.hpp	2008-11-13 02:45:19 UTC (rev 2227)
+++ MOAB/trunk/parallel/MBParallelComm.hpp	2008-11-13 03:38:11 UTC (rev 2228)
@@ -60,6 +60,8 @@
                  MPI_Comm comm = MPI_COMM_WORLD,
                  int* pcomm_id_out = 0);
 
+    //! Get ID used to reference this PCOMM instance
+  int get_id() const { return pcommID; }
 
     //! get the indexed pcomm object from the interface
   static MBParallelComm *get_pcomm(MBInterface *impl, const int index);
@@ -71,6 +73,9 @@
                                     MBEntityHandle partitioning,
                                     const MPI_Comm* comm = 0 );
 
+  static MBErrorCode get_all_pcomm( MBInterface* impl,
+                                    std::vector<MBParallelComm*>& list );
+
     //! destructor
   ~MBParallelComm();
   
@@ -171,6 +176,8 @@
      * \param tagh Handle of tag to be exchanged
      */
   MBErrorCode exchange_tags(MBTag tagh);
+  
+  MBErrorCode exchange_tags( MBTag tag, const MBRange& entities );
 
     /** \brief Broadcast all entities resident on from_proc to other processors
      * This function assumes remote handles are *not* being stored, since (usually)
@@ -309,7 +316,9 @@
 
     //! return partition, interface set ranges
   MBRange &partition_sets() {return partitionSets;}
+  const MBRange &partition_sets() const {return partitionSets;}
   MBRange &interface_sets() {return interfaceSets;}
+  const MBRange &interface_sets() const {return interfaceSets;}
       
     //! return sharedp tag
   MBTag sharedp_tag();
@@ -366,12 +375,13 @@
   MBErrorCode get_global_part_count( int& count_out ) const;
   MBErrorCode get_part_owner( int part_id, int& owner_out ) const;
   MBErrorCode get_part_id( MBEntityHandle part, int& id_out ) const;
+  MBErrorCode get_part_handle( int id, MBEntityHandle& handle_out ) const;
   MBErrorCode create_part( MBEntityHandle& part_out );
   MBErrorCode destroy_part( MBEntityHandle part ) ;
   MBErrorCode collective_sync_partition();
   MBErrorCode get_part_neighbor_ids( MBEntityHandle part, 
                                      int neighbors_out[MAX_SHARING_PROCS],
-                                     int num_neighbors_out );
+                                     int& num_neighbors_out );
   MBErrorCode get_interface_sets( MBEntityHandle part, 
                                   MBRange& iface_sets_out,
                                   int* adj_part_id = 0 );
@@ -382,6 +392,11 @@
                                  int part_ids_out[MAX_SHARING_PROCS],
                                  int& num_part_ids_out,
                                  MBEntityHandle remote_handles[MAX_SHARING_PROCS] = 0 );
+
+    // Propagate mesh modification amongst shared entities
+    // from the owning processor to any procs with copies.
+  MBErrorCode update_shared_mesh();
+
 private:
 
   int num_subranges(const MBRange &this_range);
@@ -486,7 +501,7 @@
   
   /**\brief Serialize entity tag data
    *
-   * This function operates in two pases.  The first phase,
+   * This function operates in two passes.  The first phase,
    * specified by 'just_count == true' calculates the necesary
    * buffer size for the serialized data and, optionally populates
    * the vectors of tag handles and entity ranges.  The second phase
@@ -726,6 +741,8 @@
   int globalPartCount; //!< Cache of global part count
   
   MBEntityHandle partitioningSet; //!< entity set containing all parts
+  
+  int pcommID;
 };
 
 inline MBErrorCode MBParallelComm::get_shared_proc_tags(MBTag &sharedp,
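
One of the log items above is the corrected get_part_neighbor_ids signature:
the neighbor count is now returned through a reference, where the old by-value
parameter silently discarded it.  A hypothetical caller (print_part_neighbors
is not a MOAB function) would look like this:

#include "MBParallelComm.hpp"
#include <iostream>

// Hypothetical helper: list the part ids adjacent to a given part.
MBErrorCode print_part_neighbors( MBParallelComm& pcomm, MBEntityHandle part )
{
  int neighbors[MAX_SHARING_PROCS];
  int num_neighbors = 0;   // filled through the new by-reference parameter
  MBErrorCode rval = pcomm.get_part_neighbor_ids( part, neighbors, num_neighbors );
  if (MB_SUCCESS != rval)
    return rval;
  for (int i = 0; i < num_neighbors; ++i)
    std::cout << "neighbor part: " << neighbors[i] << std::endl;
  return MB_SUCCESS;
}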



