[MOAB-dev] commit/MOAB: iulian07: fix reduce_tags and settle_intersection_points

commits-noreply at bitbucket.org commits-noreply at bitbucket.org
Thu May 29 18:15:39 CDT 2014


1 new commit in MOAB:

https://bitbucket.org/fathomteam/moab/commits/50481409f301/
Changeset:   50481409f301
Branch:      master
User:        iulian07
Date:        2014-05-30 01:13:30
Summary:     fix reduce_tags and settle_intersection_points

These functions now use a total of 6 MPI requests for each communicating
processor (3 for isend, 3 for irecv), in the same way that exchange_tags
was corrected.

Affected #:  1 file

diff --git a/src/parallel/ParallelComm.cpp b/src/parallel/ParallelComm.cpp
index 769bace..10ce709 100644
--- a/src/parallel/ParallelComm.cpp
+++ b/src/parallel/ParallelComm.cpp
@@ -7146,7 +7146,8 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
                            recv_tag_reqs[3*ind + 1], // this is for receiving the second message
                            recv_tag_reqs[3*ind + 2], // this would be for ack, but it is not used; consider removing it
                            incoming,
-                           localOwnedBuffs[ind], sendReqs[3*ind+1], // send reg for sendig the second message
+                           localOwnedBuffs[ind],
+                           sendReqs[3*ind+1], // send request for sending the second message
                            sendReqs[3*ind+2], // this is for sending the ack
                            done);
       RRA("Failed to resize recv buffer.");
@@ -7284,8 +7285,8 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
 
     // post ghost irecv's for all interface procs
     // index greqs the same as buffer/sharing procs indices
-    std::vector<MPI_Request> recv_tag_reqs(2*buffProcs.size(), MPI_REQUEST_NULL),
-      sent_ack_reqs(buffProcs.size(), MPI_REQUEST_NULL);
+    std::vector<MPI_Request> recv_tag_reqs(3*buffProcs.size(), MPI_REQUEST_NULL);
+
     std::vector<unsigned int>::iterator sit;
     int ind;
 
@@ -7300,7 +7301,7 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
       success = MPI_Irecv(remoteOwnedBuffs[ind]->mem_ptr, INITIAL_BUFF_SIZE,
                           MPI_UNSIGNED_CHAR, *sit,
                           MB_MESG_TAGS_SIZE, procConfig.proc_comm(), 
-                          &recv_tag_reqs[2*ind]);
+                          &recv_tag_reqs[3*ind]);
       if (success != MPI_SUCCESS) {
         result = MB_FAILURE;
         RRA("Failed to post irecv in ghost exchange.");
@@ -7310,7 +7311,7 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
   
     // pack and send tags from this proc to others
     // make sendReqs vector to simplify initialization
-    sendReqs.resize(2*buffProcs.size(), MPI_REQUEST_NULL);
+    sendReqs.resize(3*buffProcs.size(), MPI_REQUEST_NULL);
   
     // take all shared entities if incoming list is empty
     Range entities;
@@ -7367,8 +7368,8 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
       RRA("Failed to count buffer in pack_send_tag.");
 
       // now send it
-      result = send_buffer(*sit, localOwnedBuffs[ind], MB_MESG_TAGS_SIZE, sendReqs[2*ind],
-                           recv_tag_reqs[2*ind+1], &dum_ack_buff, incoming);
+      result = send_buffer(*sit, localOwnedBuffs[ind], MB_MESG_TAGS_SIZE, sendReqs[3*ind],
+                           recv_tag_reqs[3*ind+2], &dum_ack_buff, incoming);
       RRA("Failed to send buffer.");
                          
     }
@@ -7376,12 +7377,14 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
     // receive/unpack tags
     while (incoming) {
       MPI_Status status;
+      int index_in_recv_requests;
       PRINT_DEBUG_WAITANY(recv_tag_reqs, MB_MESG_TAGS_SIZE, procConfig.proc_rank());
-      success = MPI_Waitany(2*buffProcs.size(), &recv_tag_reqs[0], &ind, &status);
+      success = MPI_Waitany(3*buffProcs.size(), &recv_tag_reqs[0], &index_in_recv_requests, &status);
       if (MPI_SUCCESS != success) {
         result = MB_FAILURE;
         RRA("Failed in waitany in ghost exchange.");
       }
+      ind = index_in_recv_requests/3;
     
       PRINT_DEBUG_RECD(status);
 
@@ -7390,15 +7393,19 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
     
       bool done = false;
       std::vector<EntityHandle> dum_vec;
-      result = recv_buffer(MB_MESG_TAGS_SIZE, status, remoteOwnedBuffs[ind/2],
-                           recv_tag_reqs[ind/2 * 2], recv_tag_reqs[ind/2 * 2 + 1],
-                           incoming, localOwnedBuffs[ind/2], sendReqs[ind/2*2], sendReqs[ind/2*2+1], 
-                           done);
+      result = recv_buffer(MB_MESG_TAGS_SIZE, status,
+                        remoteOwnedBuffs[ind],
+                        recv_tag_reqs[3*ind+1], // this is for receiving the second message
+                        recv_tag_reqs[3*ind+2], // this would be for ack, but it is not used; consider removing it
+                        incoming, localOwnedBuffs[ind],
+                        sendReqs[3*ind+1],// send request for sending the second message
+                        sendReqs[3*ind+2], // this is for sending the ack
+                        done);
       RRA("Failed to resize recv buffer.");
       if (done) {
-        remoteOwnedBuffs[ind/2]->reset_ptr(sizeof(int));
-        result = unpack_tags(remoteOwnedBuffs[ind/2]->buff_ptr,
-                               dum_vec, true, buffProcs[ind/2], &mpi_op);
+        remoteOwnedBuffs[ind]->reset_ptr(sizeof(int));
+        result = unpack_tags(remoteOwnedBuffs[ind]->buff_ptr,
+                               dum_vec, true, buffProcs[ind], &mpi_op);
         RRA("Failed to recv-unpack-tag message.");
       }
     }
@@ -7408,8 +7415,8 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
       success = MPI_Barrier(procConfig.proc_comm());
     }
     else {
-      MPI_Status status[2*MAX_SHARING_PROCS];
-      success = MPI_Waitall(2*buffProcs.size(), &sendReqs[0], status);
+      MPI_Status status[3*MAX_SHARING_PROCS];
+      success = MPI_Waitall(3*buffProcs.size(), &sendReqs[0], status);
     }
     if (MPI_SUCCESS != success) {
       result = MB_FAILURE;
@@ -7421,158 +7428,6 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
     return MB_SUCCESS;
   }
 
-  /*
-    ErrorCode ParallelComm::exchange_tags( Tag src_tag, 
-    Tag dst_tag, 
-    const Range& entities )
-    {
-    ErrorCode result;
-    int success;
-
-    // get all procs interfacing to this proc
-    std::set<unsigned int> exch_procs;
-    result = get_comm_procs(exch_procs);  
-
-    // post ghost irecv's for all interface procs
-    // index greqs the same as buffer/sharing procs indices
-    std::vector<MPI_Request> recv_reqs(MAX_SHARING_PROCS, MPI_REQUEST_NULL);
-    std::vector<MPI_Status> gstatus(MAX_SHARING_PROCS);
-    std::vector<unsigned int>::iterator sit;
-    int ind;
-    for (ind = 0, sit = buffProcs.begin(); sit != buffProcs.end(); sit++, ind++) {
-    success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(), 
-    MPI_UNSIGNED_CHAR, *sit,
-    MB_MESG_ANY, procConfig.proc_comm(), 
-    &recv_reqs[ind]);
-    if (success != MPI_SUCCESS) {
-    result = MB_FAILURE;
-    RRA("Failed to post irecv in ghost exchange.");
-    }
-    }
-  
-    // figure out which entities are shared with which processors
-    std::map<int,Range> proc_ents;
-    int other_procs[MAX_SHARING_PROCS], num_sharing;
-    for (Range::const_iterator i = entities.begin(); i != entities.end(); ++i) {
-    int owner;
-    result = get_owner( *i, owner );
-    RRA("Failed to get entity owner.");
-
-    // only send entities that this proc owns
-    if ((unsigned)owner != proc_config().proc_rank()) 
-    continue;
-    
-    result = get_sharing_parts( *i, other_procs, num_sharing );
-    RRA("Failed to get procs sharing entity.");
-    if (num_sharing == 0) // keep track of non-shared entities for later
-    proc_ents[proc_config().proc_rank()].insert( *i );
-    for (int j = 0; j < num_sharing; ++j)
-    proc_ents[other_procs[j]].insert( *i );
-    }
-  
-    // pack and send tags from this proc to others
-    // make sendReqs vector to simplify initialization
-    std::fill(sendReqs, sendReqs+MAX_SHARING_PROCS, MPI_REQUEST_NULL);
-    std::map<unsigned int,Range>::const_iterator mit;
-  
-    for (ind = 0, sit = buffProcs.begin(); sit != buffProcs.end(); sit++, ind++) {
-    
-    // count first
-    // buffer needs to begin with the number of tags (one)
-    int buff_size = sizeof(int);
-    result = packed_tag_size( src_tag, proc_ents[*sit], buff_size );
-    RRA("Failed to count buffer in pack_send_tag.");
-
-    unsigned char *buff_ptr = &ownerSBuffs[ind][0];
-    buff->check_space(ownerSBuffs[ind], buff_ptr, buff_size);
-    PACK_INT( buff_ptr, 1 ); // number of tags
-    result = pack_tag( src_tag, dst_tag, proc_ents[*sit], proc_ents[*sit],
-    ownerSBuffs[ind], buff_ptr, true, *sit );
-    RRA("Failed to pack buffer in pack_send_tag.");
-
-    // if the message is large, send a first message to tell how large
-    if (INITIAL_BUFF_SIZE < buff_size) {
-    int tmp_buff_size = -buff_size;
-    int success = MPI_Send(&tmp_buff_size, sizeof(int), MPI_UNSIGNED_CHAR, 
-    *sit, MB_MESG_SIZE, procConfig.proc_comm());
-    if (success != MPI_SUCCESS) return MB_FAILURE;
-    }
-    
-    // send the buffer
-    success = MPI_Isend(&ownerSBuffs[ind][0], buff_size, MPI_UNSIGNED_CHAR, *sit, 
-    MB_MESG_TAGS, procConfig.proc_comm(), &sendReqs[ind]);
-    if (success != MPI_SUCCESS) return MB_FAILURE;
-    }
-  
-    // receive/unpack tags
-    int num_incoming = exch_procs.size();
-  
-    while (num_incoming) {
-    int ind;
-    MPI_Status status;
-    success = MPI_Waitany(MAX_SHARING_PROCS, &recv_reqs[0], &ind, &status);
-    if (MPI_SUCCESS != success) {
-    result = MB_FAILURE;
-    RRA("Failed in waitany in ghost exchange.");
-    }
-    
-    // ok, received something; decrement incoming counter
-    num_incoming--;
-    
-    int new_size;
-    unsigned char *buff_ptr;
-    Range dum_range;
-    
-    // branch on message type
-    switch (status.MPI_TAG) {
-    case MB_MESG_SIZE:
-    // incoming message just has size; resize buffer and re-call recv,
-    // then re-increment incoming count
-    assert(ind < MAX_SHARING_PROCS);
-    new_size = *((int*)&ghostRBuffs[ind][0]);
-    assert(0 > new_size);
-    result = recv_size_buff(buffProcs[ind], ghostRBuffs[ind], recv_reqs[ind],
-    MB_MESG_TAGS);
-    RRA("Failed to resize recv buffer.");
-    num_incoming++;
-    break;
-    case MB_MESG_TAGS:
-    // incoming ghost entities; process
-    buff_ptr = &ghostRBuffs[ind][0];
-    result = unpack_tags(buff_ptr, dum_range, true,
-    buffProcs[ind]);
-    RRA("Failed to recv-unpack-tag message.");
-    break;
-    default:
-    result = MB_FAILURE;
-    RRA("Failed to get message of correct type in exch_tags.");
-    break;
-    }
-    }
-  
-    // ok, now wait
-    MPI_Status status[MAX_SHARING_PROCS];
-    success = MPI_Waitall(MAX_SHARING_PROCS, &sendReqs[0], status);
-    if (MPI_SUCCESS != success) {
-    result = MB_FAILURE;
-    RRA("Failure in waitall in tag exchange.");
-    }
-  
-    // if src and destination tags aren't the same, need to copy 
-    // values for local entities
-    if (src_tag != dst_tag) {
-    const Range& myents = proc_ents[proc_config().proc_rank()];
-    std::vector<const void*> data_ptrs(myents.size());
-    std::vector<int> data_sizes(myents.size());
-    result = get_moab()->tag_get_data( src_tag, myents, &data_ptrs[0], &data_sizes[0] );
-    RRA("Failure to get pointers to local data.");
-    result = get_moab()->tag_set_data( dst_tag, myents, &data_ptrs[0], &data_sizes[0] );
-    RRA("Failure to get pointers to local data.");
-    }  
-  
-    return MB_SUCCESS;
-    }
-  */
 
   //! return sharedp tag
   Tag ParallelComm::sharedp_tag()
@@ -8719,8 +8574,7 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
 
   // post ghost irecv's for all interface procs
   // index requests the same as buffer/sharing procs indices
-  std::vector<MPI_Request>  recv_intx_reqs(2 * buffProcs.size(), MPI_REQUEST_NULL),
-      sent_ack_reqs(buffProcs.size(), MPI_REQUEST_NULL);
+  std::vector<MPI_Request>  recv_intx_reqs(3 * buffProcs.size(), MPI_REQUEST_NULL);
   std::vector<unsigned int>::iterator sit;
   int ind;
 
@@ -8734,7 +8588,7 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
 
     success = MPI_Irecv(remoteOwnedBuffs[ind]->mem_ptr, INITIAL_BUFF_SIZE,
         MPI_UNSIGNED_CHAR, *sit, MB_MESG_TAGS_SIZE, procConfig.proc_comm(),
-        &recv_intx_reqs[2 * ind]);
+        &recv_intx_reqs[3 * ind]);
     if (success != MPI_SUCCESS) {
       result = MB_FAILURE;
       RRA("Failed to post irecv in settle intersection point.");
@@ -8744,7 +8598,7 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
 
   // pack and send intersection points from this proc to others
   // make sendReqs vector to simplify initialization
-  sendReqs.resize(2 * buffProcs.size(), MPI_REQUEST_NULL);
+  sendReqs.resize(3 * buffProcs.size(), MPI_REQUEST_NULL);
 
   // take all shared entities if incoming list is empty
   Range & entities = shared_edges_owned;
@@ -8820,7 +8674,7 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
 
     // now send it
     result = send_buffer(*sit, localOwnedBuffs[ind], MB_MESG_TAGS_SIZE,
-        sendReqs[2 * ind], recv_intx_reqs[2 * ind + 1], &dum_ack_buff, incoming);
+        sendReqs[3 * ind], recv_intx_reqs[3 * ind + 2], &dum_ack_buff, incoming);
     RRA("Failed to send buffer.");
 
   }
@@ -8828,13 +8682,16 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
   // receive/unpack intx points
   while (incoming) {
     MPI_Status status;
+    int index_in_recv_requests;
     PRINT_DEBUG_WAITANY(recv_intx_reqs, MB_MESG_TAGS_SIZE, procConfig.proc_rank());
-    success = MPI_Waitany(2 * buffProcs.size(), &recv_intx_reqs[0], &ind,
-        &status);
+    success = MPI_Waitany(3 * buffProcs.size(), &recv_intx_reqs[0],
+        &index_in_recv_requests, &status);
     if (MPI_SUCCESS != success) {
       result = MB_FAILURE;
       RRA("Failed in waitany in ghost exchange.");
     }
+    // processor index in the list is divided by 3
+    ind = index_in_recv_requests/3;
 
     PRINT_DEBUG_RECD(status);
 
@@ -8843,13 +8700,18 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
 
     bool done = false;
     std::vector<EntityHandle> dum_vec;
-    result = recv_buffer(MB_MESG_TAGS_SIZE, status, remoteOwnedBuffs[ind / 2],
-        recv_intx_reqs[ind / 2 * 2], recv_intx_reqs[ind / 2 * 2 + 1], incoming,
-        localOwnedBuffs[ind / 2], sendReqs[ind / 2 * 2],
-        sendReqs[ind / 2 * 2 + 1], done);
+    result = recv_buffer(MB_MESG_TAGS_SIZE, status,
+        remoteOwnedBuffs[ind],
+        recv_intx_reqs[3*ind+1], // this is for receiving the second message
+        recv_intx_reqs[3*ind+2], // this would be for ack, but it is not used; consider removing it
+        incoming,
+        localOwnedBuffs[ind],
+        sendReqs[3*ind+1], // send request for sending the second message
+        sendReqs[3*ind+2], // this is for sending the ack
+        done);
     RRA("Failed to resize recv buffer.");
     if (done) {
-      Buffer * buff = remoteOwnedBuffs[ind / 2];
+      Buffer * buff = remoteOwnedBuffs[ind];
       buff->reset_ptr(sizeof(int));
       /*result = unpack_tags(remoteOwnedBuffs[ind / 2]->buff_ptr, dum_vec, true,
           buffProcs[ind / 2]);*/
@@ -8915,8 +8777,8 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
   if (myDebug->get_verbosity() == 5) {
     success = MPI_Barrier(procConfig.proc_comm());
   } else {
-    MPI_Status status[2 * MAX_SHARING_PROCS];
-    success = MPI_Waitall(2 * buffProcs.size(), &sendReqs[0], status);
+    MPI_Status status[3 * MAX_SHARING_PROCS];
+    success = MPI_Waitall(3 * buffProcs.size(), &sendReqs[0], status);
   }
   if (MPI_SUCCESS != success) {
     result = MB_FAILURE;

Repository URL: https://bitbucket.org/fathomteam/moab/

--

This is a commit notification from bitbucket.org. You are receiving
this because you have the service enabled, addressing the recipient of
this email.


More information about the moab-dev mailing list