[MOAB-dev] commit/MOAB: iulian07: fix reduce_tags and settle_intersection_points
commits-noreply at bitbucket.org
Thu May 29 18:15:39 CDT 2014
1 new commit in MOAB:
https://bitbucket.org/fathomteam/moab/commits/50481409f301/
Changeset: 50481409f301
Branch: master
User: iulian07
Date: 2014-05-30 01:13:30
Summary: fix reduce_tags and settle_intersection_points
These now use a total of six MPI_Request objects per communicating
processor (three for isend, three for irecv), the same way exchange_tags
was corrected.
Affected #: 1 file
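For readers skimming the diff below, here is a minimal, standalone MPI sketch (not MOAB code) of the request layout this commit adopts: three MPI_Request slots per communicating processor, with slot 3*ind used for the first irecv, slot 3*ind+1 for the second message, and slot 3*ind+2 reserved for the ack, and the index returned by MPI_Waitany divided by 3 to recover the processor index. The names peers, recv_reqs, and send_reqs are illustrative stand-ins for MOAB's buffProcs, recv_tag_reqs/recv_intx_reqs, and sendReqs.

// Minimal standalone sketch of the 3-requests-per-processor convention.
// Only slot 3*ind is actually posted here; slots 3*ind+1 and 3*ind+2 stay
// MPI_REQUEST_NULL, which MPI_Waitany and MPI_Waitall ignore, just as the
// unused ack slot does in the patched code.
#include <mpi.h>
#include <vector>
#include <cstdio>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // Every other rank is a "communicating processor" in this toy example.
  std::vector<int> peers;
  for (int p = 0; p < size; ++p)
    if (p != rank) peers.push_back(p);

  // Three requests per peer, mirroring recv_tag_reqs/recv_intx_reqs and
  // sendReqs being resized to 3*buffProcs.size() in the commit.
  std::vector<MPI_Request> recv_reqs(3 * peers.size(), MPI_REQUEST_NULL);
  std::vector<MPI_Request> send_reqs(3 * peers.size(), MPI_REQUEST_NULL);
  std::vector<int> recv_buf(peers.size()), send_buf(peers.size(), rank);

  // Post the first irecv for each peer in slot 3*ind.
  for (size_t ind = 0; ind < peers.size(); ++ind)
    MPI_Irecv(&recv_buf[ind], 1, MPI_INT, peers[ind], 0, MPI_COMM_WORLD,
              &recv_reqs[3 * ind]);

  // Send the first message from slot 3*ind of the send requests.
  for (size_t ind = 0; ind < peers.size(); ++ind)
    MPI_Isend(&send_buf[ind], 1, MPI_INT, peers[ind], 0, MPI_COMM_WORLD,
              &send_reqs[3 * ind]);

  // Drain the receives: MPI_Waitany returns an index into the request
  // array, so the peer index is recovered by dividing by 3, as the commit
  // does with ind = index_in_recv_requests / 3.
  int incoming = (int)peers.size();
  while (incoming) {
    int index_in_recv_requests;
    MPI_Status status;
    MPI_Waitany((int)recv_reqs.size(), recv_reqs.data(),
                &index_in_recv_requests, &status);
    int ind = index_in_recv_requests / 3; // processor index in the peer list
    std::printf("rank %d got %d from peer %d (request slot %d)\n",
                rank, recv_buf[ind], peers[ind], index_in_recv_requests);
    --incoming;
  }

  MPI_Waitall((int)send_reqs.size(), send_reqs.data(), MPI_STATUSES_IGNORE);
  MPI_Finalize();
  return 0;
}

Keeping a fixed stride of three per processor lets the same array sizing and the ind = index_in_recv_requests/3 recovery be used in reduce_tags and settle_intersection_points, matching the earlier exchange_tags fix.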
diff --git a/src/parallel/ParallelComm.cpp b/src/parallel/ParallelComm.cpp
index 769bace..10ce709 100644
--- a/src/parallel/ParallelComm.cpp
+++ b/src/parallel/ParallelComm.cpp
@@ -7146,7 +7146,8 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
recv_tag_reqs[3*ind + 1], // this is for receiving the second message
recv_tag_reqs[3*ind + 2], // this would be for ack, but it is not used; consider removing it
incoming,
- localOwnedBuffs[ind], sendReqs[3*ind+1], // send reg for sendig the second message
+ localOwnedBuffs[ind],
+ sendReqs[3*ind+1], // send request for sending the second message
sendReqs[3*ind+2], // this is for sending the ack
done);
RRA("Failed to resize recv buffer.");
@@ -7284,8 +7285,8 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
// post ghost irecv's for all interface procs
// index greqs the same as buffer/sharing procs indices
- std::vector<MPI_Request> recv_tag_reqs(2*buffProcs.size(), MPI_REQUEST_NULL),
- sent_ack_reqs(buffProcs.size(), MPI_REQUEST_NULL);
+ std::vector<MPI_Request> recv_tag_reqs(3*buffProcs.size(), MPI_REQUEST_NULL);
+
std::vector<unsigned int>::iterator sit;
int ind;
@@ -7300,7 +7301,7 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
success = MPI_Irecv(remoteOwnedBuffs[ind]->mem_ptr, INITIAL_BUFF_SIZE,
MPI_UNSIGNED_CHAR, *sit,
MB_MESG_TAGS_SIZE, procConfig.proc_comm(),
- &recv_tag_reqs[2*ind]);
+ &recv_tag_reqs[3*ind]);
if (success != MPI_SUCCESS) {
result = MB_FAILURE;
RRA("Failed to post irecv in ghost exchange.");
@@ -7310,7 +7311,7 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
// pack and send tags from this proc to others
// make sendReqs vector to simplify initialization
- sendReqs.resize(2*buffProcs.size(), MPI_REQUEST_NULL);
+ sendReqs.resize(3*buffProcs.size(), MPI_REQUEST_NULL);
// take all shared entities if incoming list is empty
Range entities;
@@ -7367,8 +7368,8 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
RRA("Failed to count buffer in pack_send_tag.");
// now send it
- result = send_buffer(*sit, localOwnedBuffs[ind], MB_MESG_TAGS_SIZE, sendReqs[2*ind],
- recv_tag_reqs[2*ind+1], &dum_ack_buff, incoming);
+ result = send_buffer(*sit, localOwnedBuffs[ind], MB_MESG_TAGS_SIZE, sendReqs[3*ind],
+ recv_tag_reqs[3*ind+2], &dum_ack_buff, incoming);
RRA("Failed to send buffer.");
}
@@ -7376,12 +7377,14 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
// receive/unpack tags
while (incoming) {
MPI_Status status;
+ int index_in_recv_requests;
PRINT_DEBUG_WAITANY(recv_tag_reqs, MB_MESG_TAGS_SIZE, procConfig.proc_rank());
- success = MPI_Waitany(2*buffProcs.size(), &recv_tag_reqs[0], &ind, &status);
+ success = MPI_Waitany(3*buffProcs.size(), &recv_tag_reqs[0], &index_in_recv_requests, &status);
if (MPI_SUCCESS != success) {
result = MB_FAILURE;
RRA("Failed in waitany in ghost exchange.");
}
+ ind = index_in_recv_requests/3;
PRINT_DEBUG_RECD(status);
@@ -7390,15 +7393,19 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
bool done = false;
std::vector<EntityHandle> dum_vec;
- result = recv_buffer(MB_MESG_TAGS_SIZE, status, remoteOwnedBuffs[ind/2],
- recv_tag_reqs[ind/2 * 2], recv_tag_reqs[ind/2 * 2 + 1],
- incoming, localOwnedBuffs[ind/2], sendReqs[ind/2*2], sendReqs[ind/2*2+1],
- done);
+ result = recv_buffer(MB_MESG_TAGS_SIZE, status,
+ remoteOwnedBuffs[ind],
+ recv_tag_reqs[3*ind+1], // this is for receiving the second message
+ recv_tag_reqs[3*ind+2], // this would be for ack, but it is not used; consider removing it
+ incoming, localOwnedBuffs[ind],
+ sendReqs[3*ind+1],// send request for sending the second message
+ sendReqs[3*ind+2], // this is for sending the ack
+ done);
RRA("Failed to resize recv buffer.");
if (done) {
- remoteOwnedBuffs[ind/2]->reset_ptr(sizeof(int));
- result = unpack_tags(remoteOwnedBuffs[ind/2]->buff_ptr,
- dum_vec, true, buffProcs[ind/2], &mpi_op);
+ remoteOwnedBuffs[ind]->reset_ptr(sizeof(int));
+ result = unpack_tags(remoteOwnedBuffs[ind]->buff_ptr,
+ dum_vec, true, buffProcs[ind], &mpi_op);
RRA("Failed to recv-unpack-tag message.");
}
}
@@ -7408,8 +7415,8 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
success = MPI_Barrier(procConfig.proc_comm());
}
else {
- MPI_Status status[2*MAX_SHARING_PROCS];
- success = MPI_Waitall(2*buffProcs.size(), &sendReqs[0], status);
+ MPI_Status status[3*MAX_SHARING_PROCS];
+ success = MPI_Waitall(3*buffProcs.size(), &sendReqs[0], status);
}
if (MPI_SUCCESS != success) {
result = MB_FAILURE;
@@ -7421,158 +7428,6 @@ ErrorCode ParallelComm::post_irecv(std::vector<unsigned int>& shared_procs,
return MB_SUCCESS;
}
- /*
- ErrorCode ParallelComm::exchange_tags( Tag src_tag,
- Tag dst_tag,
- const Range& entities )
- {
- ErrorCode result;
- int success;
-
- // get all procs interfacing to this proc
- std::set<unsigned int> exch_procs;
- result = get_comm_procs(exch_procs);
-
- // post ghost irecv's for all interface procs
- // index greqs the same as buffer/sharing procs indices
- std::vector<MPI_Request> recv_reqs(MAX_SHARING_PROCS, MPI_REQUEST_NULL);
- std::vector<MPI_Status> gstatus(MAX_SHARING_PROCS);
- std::vector<unsigned int>::iterator sit;
- int ind;
- for (ind = 0, sit = buffProcs.begin(); sit != buffProcs.end(); sit++, ind++) {
- success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(),
- MPI_UNSIGNED_CHAR, *sit,
- MB_MESG_ANY, procConfig.proc_comm(),
- &recv_reqs[ind]);
- if (success != MPI_SUCCESS) {
- result = MB_FAILURE;
- RRA("Failed to post irecv in ghost exchange.");
- }
- }
-
- // figure out which entities are shared with which processors
- std::map<int,Range> proc_ents;
- int other_procs[MAX_SHARING_PROCS], num_sharing;
- for (Range::const_iterator i = entities.begin(); i != entities.end(); ++i) {
- int owner;
- result = get_owner( *i, owner );
- RRA("Failed to get entity owner.");
-
- // only send entities that this proc owns
- if ((unsigned)owner != proc_config().proc_rank())
- continue;
-
- result = get_sharing_parts( *i, other_procs, num_sharing );
- RRA("Failed to get procs sharing entity.");
- if (num_sharing == 0) // keep track of non-shared entities for later
- proc_ents[proc_config().proc_rank()].insert( *i );
- for (int j = 0; j < num_sharing; ++j)
- proc_ents[other_procs[j]].insert( *i );
- }
-
- // pack and send tags from this proc to others
- // make sendReqs vector to simplify initialization
- std::fill(sendReqs, sendReqs+MAX_SHARING_PROCS, MPI_REQUEST_NULL);
- std::map<unsigned int,Range>::const_iterator mit;
-
- for (ind = 0, sit = buffProcs.begin(); sit != buffProcs.end(); sit++, ind++) {
-
- // count first
- // buffer needs to begin with the number of tags (one)
- int buff_size = sizeof(int);
- result = packed_tag_size( src_tag, proc_ents[*sit], buff_size );
- RRA("Failed to count buffer in pack_send_tag.");
-
- unsigned char *buff_ptr = &ownerSBuffs[ind][0];
- buff->check_space(ownerSBuffs[ind], buff_ptr, buff_size);
- PACK_INT( buff_ptr, 1 ); // number of tags
- result = pack_tag( src_tag, dst_tag, proc_ents[*sit], proc_ents[*sit],
- ownerSBuffs[ind], buff_ptr, true, *sit );
- RRA("Failed to pack buffer in pack_send_tag.");
-
- // if the message is large, send a first message to tell how large
- if (INITIAL_BUFF_SIZE < buff_size) {
- int tmp_buff_size = -buff_size;
- int success = MPI_Send(&tmp_buff_size, sizeof(int), MPI_UNSIGNED_CHAR,
- *sit, MB_MESG_SIZE, procConfig.proc_comm());
- if (success != MPI_SUCCESS) return MB_FAILURE;
- }
-
- // send the buffer
- success = MPI_Isend(&ownerSBuffs[ind][0], buff_size, MPI_UNSIGNED_CHAR, *sit,
- MB_MESG_TAGS, procConfig.proc_comm(), &sendReqs[ind]);
- if (success != MPI_SUCCESS) return MB_FAILURE;
- }
-
- // receive/unpack tags
- int num_incoming = exch_procs.size();
-
- while (num_incoming) {
- int ind;
- MPI_Status status;
- success = MPI_Waitany(MAX_SHARING_PROCS, &recv_reqs[0], &ind, &status);
- if (MPI_SUCCESS != success) {
- result = MB_FAILURE;
- RRA("Failed in waitany in ghost exchange.");
- }
-
- // ok, received something; decrement incoming counter
- num_incoming--;
-
- int new_size;
- unsigned char *buff_ptr;
- Range dum_range;
-
- // branch on message type
- switch (status.MPI_TAG) {
- case MB_MESG_SIZE:
- // incoming message just has size; resize buffer and re-call recv,
- // then re-increment incoming count
- assert(ind < MAX_SHARING_PROCS);
- new_size = *((int*)&ghostRBuffs[ind][0]);
- assert(0 > new_size);
- result = recv_size_buff(buffProcs[ind], ghostRBuffs[ind], recv_reqs[ind],
- MB_MESG_TAGS);
- RRA("Failed to resize recv buffer.");
- num_incoming++;
- break;
- case MB_MESG_TAGS:
- // incoming ghost entities; process
- buff_ptr = &ghostRBuffs[ind][0];
- result = unpack_tags(buff_ptr, dum_range, true,
- buffProcs[ind]);
- RRA("Failed to recv-unpack-tag message.");
- break;
- default:
- result = MB_FAILURE;
- RRA("Failed to get message of correct type in exch_tags.");
- break;
- }
- }
-
- // ok, now wait
- MPI_Status status[MAX_SHARING_PROCS];
- success = MPI_Waitall(MAX_SHARING_PROCS, &sendReqs[0], status);
- if (MPI_SUCCESS != success) {
- result = MB_FAILURE;
- RRA("Failure in waitall in tag exchange.");
- }
-
- // if src and destination tags aren't the same, need to copy
- // values for local entities
- if (src_tag != dst_tag) {
- const Range& myents = proc_ents[proc_config().proc_rank()];
- std::vector<const void*> data_ptrs(myents.size());
- std::vector<int> data_sizes(myents.size());
- result = get_moab()->tag_get_data( src_tag, myents, &data_ptrs[0], &data_sizes[0] );
- RRA("Failure to get pointers to local data.");
- result = get_moab()->tag_set_data( dst_tag, myents, &data_ptrs[0], &data_sizes[0] );
- RRA("Failure to get pointers to local data.");
- }
-
- return MB_SUCCESS;
- }
- */
//! return sharedp tag
Tag ParallelComm::sharedp_tag()
@@ -8719,8 +8574,7 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
// post ghost irecv's for all interface procs
// index requests the same as buffer/sharing procs indices
- std::vector<MPI_Request> recv_intx_reqs(2 * buffProcs.size(), MPI_REQUEST_NULL),
- sent_ack_reqs(buffProcs.size(), MPI_REQUEST_NULL);
+ std::vector<MPI_Request> recv_intx_reqs(3 * buffProcs.size(), MPI_REQUEST_NULL);
std::vector<unsigned int>::iterator sit;
int ind;
@@ -8734,7 +8588,7 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
success = MPI_Irecv(remoteOwnedBuffs[ind]->mem_ptr, INITIAL_BUFF_SIZE,
MPI_UNSIGNED_CHAR, *sit, MB_MESG_TAGS_SIZE, procConfig.proc_comm(),
- &recv_intx_reqs[2 * ind]);
+ &recv_intx_reqs[3 * ind]);
if (success != MPI_SUCCESS) {
result = MB_FAILURE;
RRA("Failed to post irecv in settle intersection point.");
@@ -8744,7 +8598,7 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
// pack and send intersection points from this proc to others
// make sendReqs vector to simplify initialization
- sendReqs.resize(2 * buffProcs.size(), MPI_REQUEST_NULL);
+ sendReqs.resize(3 * buffProcs.size(), MPI_REQUEST_NULL);
// take all shared entities if incoming list is empty
Range & entities = shared_edges_owned;
@@ -8820,7 +8674,7 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
// now send it
result = send_buffer(*sit, localOwnedBuffs[ind], MB_MESG_TAGS_SIZE,
- sendReqs[2 * ind], recv_intx_reqs[2 * ind + 1], &dum_ack_buff, incoming);
+ sendReqs[3 * ind], recv_intx_reqs[3 * ind + 2], &dum_ack_buff, incoming);
RRA("Failed to send buffer.");
}
@@ -8828,13 +8682,16 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
// receive/unpack intx points
while (incoming) {
MPI_Status status;
+ int index_in_recv_requests;
PRINT_DEBUG_WAITANY(recv_intx_reqs, MB_MESG_TAGS_SIZE, procConfig.proc_rank());
- success = MPI_Waitany(2 * buffProcs.size(), &recv_intx_reqs[0], &ind,
- &status);
+ success = MPI_Waitany(3 * buffProcs.size(), &recv_intx_reqs[0],
+ &index_in_recv_requests, &status);
if (MPI_SUCCESS != success) {
result = MB_FAILURE;
RRA("Failed in waitany in ghost exchange.");
}
+ // processor index in the list is divided by 3
+ ind = index_in_recv_requests/3;
PRINT_DEBUG_RECD(status);
@@ -8843,13 +8700,18 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
bool done = false;
std::vector<EntityHandle> dum_vec;
- result = recv_buffer(MB_MESG_TAGS_SIZE, status, remoteOwnedBuffs[ind / 2],
- recv_intx_reqs[ind / 2 * 2], recv_intx_reqs[ind / 2 * 2 + 1], incoming,
- localOwnedBuffs[ind / 2], sendReqs[ind / 2 * 2],
- sendReqs[ind / 2 * 2 + 1], done);
+ result = recv_buffer(MB_MESG_TAGS_SIZE, status,
+ remoteOwnedBuffs[ind],
+ recv_intx_reqs[3*ind+1], // this is for receiving the second message
+ recv_intx_reqs[3*ind+2], // this would be for ack, but it is not used; consider removing it
+ incoming,
+ localOwnedBuffs[ind],
+ sendReqs[3*ind+1], // send request for sending the second message
+ sendReqs[3*ind+2], // this is for sending the ack
+ done);
RRA("Failed to resize recv buffer.");
if (done) {
- Buffer * buff = remoteOwnedBuffs[ind / 2];
+ Buffer * buff = remoteOwnedBuffs[ind];
buff->reset_ptr(sizeof(int));
/*result = unpack_tags(remoteOwnedBuffs[ind / 2]->buff_ptr, dum_vec, true,
buffProcs[ind / 2]);*/
@@ -8915,8 +8777,8 @@ ErrorCode ParallelComm::settle_intersection_points(Range & edges, Range & shared
if (myDebug->get_verbosity() == 5) {
success = MPI_Barrier(procConfig.proc_comm());
} else {
- MPI_Status status[2 * MAX_SHARING_PROCS];
- success = MPI_Waitall(2 * buffProcs.size(), &sendReqs[0], status);
+ MPI_Status status[3 * MAX_SHARING_PROCS];
+ success = MPI_Waitall(3 * buffProcs.size(), &sendReqs[0], status);
}
if (MPI_SUCCESS != success) {
result = MB_FAILURE;
Repository URL: https://bitbucket.org/fathomteam/moab/