[MOAB-dev] r1983 - in MOAB/trunk: . parallel
tautges at mcs.anl.gov
tautges at mcs.anl.gov
Mon Jul 7 11:07:38 CDT 2008
Author: tautges
Date: 2008-07-07 11:07:38 -0500 (Mon, 07 Jul 2008)
New Revision: 1983
Modified:
MOAB/trunk/MBParallelConventions.h
MOAB/trunk/parallel/MBParallelComm.cpp
MOAB/trunk/parallel/MBParallelComm.hpp
MOAB/trunk/parallel/WriteHDF5Parallel.cpp
MOAB/trunk/parallel/mbparallelcomm_test.cpp
Log:
Implementing the exchange_tags function, which exchanges specified tags for
all shared entities for a given MBParallelComm.
NOTE: I'm changing the tag names for the shared proc/handle tags,
adding two underscores. Any tag names beginning with two underscores
are not exchanged. This is consistent with past practice, those tags
aren't stored to files either.
MBParallelConventions.h: adding double underscore to tag names
parallel/mbparallelcomm_test.cpp: call the tag exchange function for
GLOBAL_ID tag after reading file.
parallel/MBParallelComm.cpp:
- adding message type MB_MESG_TAGS
- adding exchange_tags functionality
- changing some std::vector's to std::set's for proc rank lists
parallel/WriteHDF5Parallel.cpp: changed a proc_size() to proc_rank()
on line 332 (paste-o, I think).
Passes make check.
Modified: MOAB/trunk/MBParallelConventions.h
===================================================================
--- MOAB/trunk/MBParallelConventions.h 2008-07-03 21:53:52 UTC (rev 1982)
+++ MOAB/trunk/MBParallelConventions.h 2008-07-07 16:07:38 UTC (rev 1983)
@@ -27,7 +27,7 @@
*
* This single-valued tag implies an entity is shared with one other proc
*/
-#define PARALLEL_SHARED_PROC_TAG_NAME "PARALLEL_SHARED_PROC"
+#define PARALLEL_SHARED_PROC_TAG_NAME "__PARALLEL_SHARED_PROC"
/** \brief Tag storing which other processorS a given entity is shared with
*
@@ -35,13 +35,13 @@
* other processors. Length of tag is application-dependent, and depends on
* what the maximum number of processors is which share an entity
*/
-#define PARALLEL_SHARED_PROCS_TAG_NAME "PARALLEL_SHARED_PROCS"
+#define PARALLEL_SHARED_PROCS_TAG_NAME "__PARALLEL_SHARED_PROCS"
/** \brief Tag storing the handle of a shared entity on the other proc
*
* This single-valued tag implies an entity is shared with one other proc
*/
-#define PARALLEL_SHARED_HANDLE_TAG_NAME "PARALLEL_SHARED_HANDLE"
+#define PARALLEL_SHARED_HANDLE_TAG_NAME "__PARALLEL_SHARED_HANDLE"
/** \brief Tag storing handles of a shared entity on other processors
*
@@ -49,7 +49,7 @@
* other processors. Length of tag is application-dependent, and depends on
* what the maximum number of processors is which share an entity
*/
-#define PARALLEL_SHARED_HANDLES_TAG_NAME "PARALLEL_SHARED_HANDLES"
+#define PARALLEL_SHARED_HANDLES_TAG_NAME "__PARALLEL_SHARED_HANDLES"
/** \brief Tag storing parallel status (as bits in this tag)
*
@@ -62,7 +62,7 @@
* bit 2: interface (0=not interface, 1=interface)
* bit 3: ghost (0=not ghost, 1=ghost)
*/
-#define PARALLEL_STATUS_TAG_NAME "PARALLEL_STATUS"
+#define PARALLEL_STATUS_TAG_NAME "__PARALLEL_STATUS"
#define PSTATUS_NOT_OWNED 0x1
#define PSTATUS_SHARED 0x2
@@ -82,6 +82,6 @@
* Tag type: opaque
* Tag size: MAX_SHARING_PROCS*sizeof(MBParallelComm*)
*/
-#define PARALLEL_COMM_TAG_NAME "PARALLEL_COMM"
+#define PARALLEL_COMM_TAG_NAME "__PARALLEL_COMM"
#endif
Modified: MOAB/trunk/parallel/MBParallelComm.cpp
===================================================================
--- MOAB/trunk/parallel/MBParallelComm.cpp 2008-07-03 21:53:52 UTC (rev 1982)
+++ MOAB/trunk/parallel/MBParallelComm.cpp 2008-07-07 16:07:38 UTC (rev 1983)
@@ -113,9 +113,9 @@
MB_MESG_SIZE,
MB_MESG_ENTS,
MB_MESG_REMOTE_HANDLES_RANGE,
- MB_MESG_REMOTE_HANDLES_VECTOR
-};
-
+ MB_MESG_REMOTE_HANDLES_VECTOR,
+ MB_MESG_TAGS,};
+
MBParallelComm::MBParallelComm(MBInterface *impl, MPI_Comm comm)
: mbImpl(impl), procConfig(comm), sharedpTag(0), sharedpsTag(0),
sharedhTag(0), sharedhsTag(0), pstatusTag(0), ifaceSetsTag(0),
@@ -347,7 +347,7 @@
if (MB_MESG_SIZE == status.MPI_TAG) {
result = recv_size_buff(from_proc,
ghostRBuffs[index],
- req);
+ req, MB_MESG_ANY);
RRA("Failed to resize recv buffer.");
MPI_Wait(&req, &status);
}
@@ -454,12 +454,13 @@
MBErrorCode MBParallelComm::recv_size_buff(const int from_proc,
std::vector<unsigned char> &recv_buff,
- MPI_Request &recv_req)
+ MPI_Request &recv_req,
+ int mesg_tag)
{
// use the received size to resize buffer, then post another irecv
recv_buff.resize(-(*((int*)&recv_buff[0])));
int success = MPI_Irecv(&recv_buff[0], recv_buff.size(), MPI_UNSIGNED_CHAR, from_proc,
- MB_MESG_ENTS, procConfig.proc_comm(), &recv_req);
+ mesg_tag, procConfig.proc_comm(), &recv_req);
if (MPI_SUCCESS != success) {
MBErrorCode result = MB_FAILURE;
RRA("Failed call to Irecv in recv_size_buff.");
@@ -1986,7 +1987,8 @@
const bool store_remote_handles,
const int to_proc,
std::vector<MBTag> &all_tags,
- std::vector<MBRange> &tag_ranges)
+ std::vector<MBRange> &tag_ranges,
+ const bool all_possible_tags)
{
// tags
// get all the tags
@@ -2000,23 +2002,37 @@
if (just_count) {
- std::vector<MBTag> tmp_tags;
- result = tagServer->get_tags(tmp_tags);
- RRA("Failed to get tags in pack_tags.");
+ if (all_possible_tags) {
+ std::vector<MBTag> tmp_tags;
+
+ result = tagServer->get_tags(tmp_tags);
+ RRA("Failed to get tags in pack_tags.");
- for (std::vector<MBTag>::iterator tag_it = tmp_tags.begin(); tag_it != tmp_tags.end(); tag_it++) {
- const TagInfo *tinfo = tagServer->get_tag_info(*tag_it);
- MBRange tmp_range;
- result = tagServer->get_entities(*tag_it, tmp_range);
- RRA("Failed to get entities for tag in pack_tags.");
- tmp_range = tmp_range.intersect(whole_range);
-
- if (tmp_range.empty())
- continue;
+ for (std::vector<MBTag>::iterator tag_it = tmp_tags.begin(); tag_it != tmp_tags.end(); tag_it++) {
+ std::string tag_name;
+ result = mbImpl->tag_get_name(*tag_it, tag_name);
+ if (tag_name.c_str()[0] == '_' && tag_name.c_str()[1] == '_')
+ continue;
- // ok, we'll be sending this tag
- all_tags.push_back(*tag_it);
+ MBRange tmp_range;
+ result = tagServer->get_entities(*tag_it, tmp_range);
+ RRA("Failed to get entities for tag in pack_tags.");
+ tmp_range = tmp_range.intersect(whole_range);
+ if (tmp_range.empty()) continue;
+
+ // ok, we'll be sending this tag
+ all_tags.push_back(*tag_it);
+ tag_ranges.push_back(tmp_range);
+ }
+ }
+
+ std::vector<MBTag>::iterator tag_it;
+ std::vector<MBRange>::iterator rit;
+ for (tag_it = all_tags.begin(), rit = tag_ranges.begin();
+ tag_it != all_tags.end(); tag_it++, rit++) {
+
+ const TagInfo *tinfo = tagServer->get_tag_info(*tag_it);
// default value
count += sizeof(int);
if (NULL != tinfo->default_value())
@@ -2029,29 +2045,25 @@
count += sizeof(int);
count += tinfo->get_name().size();
- if (!tmp_range.empty()) {
- tag_ranges.push_back(tmp_range);
-
// range of tag
- count += sizeof(int) + tmp_range.size() * sizeof(MBEntityHandle);
+ count += sizeof(int) + rit->size() * sizeof(MBEntityHandle);
- if (tinfo->get_size() == MB_VARIABLE_LENGTH) {
- const int num_ent = tmp_range.size();
- // send a tag size for each entity
- count += num_ent * sizeof(int);
- // send tag data for each entity
- var_len_sizes.resize( num_ent );
- var_len_values.resize( num_ent );
- result = tagServer->get_data( *tag_it, tmp_range, &var_len_values[0],
- &var_len_sizes[0] );
- RRA("Failed to get lenghts of variable-length tag values.");
- count += std::accumulate( var_len_sizes.begin(), var_len_sizes.end(), 0 );
- }
- else {
- // tag data values for range or vector
- count += tmp_range.size() * tinfo->get_size();
- }
+ if (tinfo->get_size() == MB_VARIABLE_LENGTH) {
+ const int num_ent = rit->size();
+ // send a tag size for each entity
+ count += num_ent * sizeof(int);
+ // send tag data for each entity
+ var_len_sizes.resize( num_ent );
+ var_len_values.resize( num_ent );
+ result = tagServer->get_data( *tag_it, *rit, &var_len_values[0],
+ &var_len_sizes[0] );
+ RRA("Failed to get lenghts of variable-length tag values.");
+ count += std::accumulate( var_len_sizes.begin(), var_len_sizes.end(), 0 );
}
+ else {
+ // tag data values for range or vector
+ count += rit->size() * tinfo->get_size();
+ }
}
// number of tags
@@ -2059,7 +2071,6 @@
}
else {
- std::vector<unsigned char> tag_data;
std::vector<MBRange>::const_iterator tr_it = tag_ranges.begin();
PACK_INT(buff_ptr, all_tags.size());
@@ -2869,9 +2880,7 @@
MBRange &iface_ents)
{
MBRange iface_sets;
- std::vector<int> iface_procs;
- MBErrorCode result = get_interface_procs(iface_procs);
- RRA("Failed to get iface sets/procs.");
+ MBErrorCode result = MB_SUCCESS;
for (MBRange::iterator rit = interfaceSets.begin(); rit != interfaceSets.end(); rit++) {
if (-1 != other_proc && !is_iface_proc(*rit, other_proc)) continue;
@@ -2885,10 +2894,10 @@
}
//! get processors with which this processor communicates; sets are sorted by processor
-MBErrorCode MBParallelComm::get_interface_procs(std::vector<int> &iface_procs)
+MBErrorCode MBParallelComm::get_interface_procs(std::set<unsigned int> &procs_set)
{
// make sure the sharing procs vector is empty
- iface_procs.clear();
+ procs_set.clear();
// pre-load vector of single-proc tag values
unsigned int i, j;
@@ -2899,18 +2908,17 @@
// get sharing procs either from single-proc vector or by getting
// multi-proc tag value
int tmp_iface_procs[MAX_SHARING_PROCS];
- std::set<int> procs_set;
std::fill(tmp_iface_procs, tmp_iface_procs+MAX_SHARING_PROCS, -1);
MBRange::iterator rit;
for (rit = interfaceSets.begin(), i = 0; rit != interfaceSets.end(); rit++, i++) {
- if (-1 != iface_proc[i]) procs_set.insert(iface_proc[i]);
+ if (-1 != iface_proc[i]) procs_set.insert((unsigned int) iface_proc[i]);
else {
// get the sharing_procs tag
result = mbImpl->tag_get_data(sharedps_tag(), &(*rit), 1,
tmp_iface_procs);
RRA("Failed to get iface_procs for iface set.");
for (j = 0; j < MAX_SHARING_PROCS; j++) {
- if (-1 != tmp_iface_procs[j]) procs_set.insert(tmp_iface_procs[j]);
+ if (-1 != tmp_iface_procs[j]) procs_set.insert((unsigned int) tmp_iface_procs[j]);
else {
std::fill(tmp_iface_procs, tmp_iface_procs+j, -1);
break;
@@ -2918,9 +2926,6 @@
}
}
}
-
- // now put the set contents into the vector
- std::copy(procs_set.begin(), procs_set.end(), std::back_inserter(iface_procs));
return MB_SUCCESS;
}
@@ -3130,7 +3135,7 @@
int success;
// get all procs interfacing to this proc
- std::vector<int> iface_procs;
+ std::set<unsigned int> iface_procs;
result = get_interface_procs(iface_procs);
RRA("Failed to get iface sets, procs");
@@ -3138,11 +3143,11 @@
// index greqs the same as buffer/sharing procs indices
std::vector<MPI_Request> recv_reqs(2*MAX_SHARING_PROCS, MPI_REQUEST_NULL);
std::vector<MPI_Status> gstatus(MAX_SHARING_PROCS);
- std::vector<int>::iterator vit;
- for (vit = iface_procs.begin(); vit != iface_procs.end(); vit++) {
- int ind = get_buffers(*vit);
+ std::set<unsigned int>::iterator sit;
+ for (sit = iface_procs.begin(); sit != iface_procs.end(); sit++) {
+ int ind = get_buffers(*sit);
success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(),
- MPI_UNSIGNED_CHAR, *vit,
+ MPI_UNSIGNED_CHAR, *sit,
MB_MESG_ANY, procConfig.proc_comm(),
&recv_reqs[ind]);
if (success != MPI_SUCCESS) {
@@ -3159,15 +3164,15 @@
// keep track of new ghosted and ghost ents so we can tag them later
MBRange new_ghosted, new_ghosts;
- for (vit = iface_procs.begin(); vit != iface_procs.end(); vit++) {
- int ind = get_buffers(*vit);
+ for (sit = iface_procs.begin(); sit != iface_procs.end(); sit++) {
+ int ind = get_buffers(*sit);
MBRange bridge_ents;
// get bridge ents on interface(s)
for (MBRange::iterator rit = interfaceSets.begin(); rit != interfaceSets.end();
rit++) {
- if (!is_iface_proc(*rit, *vit)) continue;
+ if (!is_iface_proc(*rit, *sit)) continue;
result = get_ghost_layers(*rit, ghost_dim, bridge_dim, num_layers, bridge_ents);
RRA("Failed to get ghost layers.");
@@ -3182,7 +3187,7 @@
}
// pack-send; this also posts receives if store_remote_handles is true
- result = pack_send_entities(*vit, bridge_ents, false, true,
+ result = pack_send_entities(*sit, bridge_ents, false, true,
store_remote_handles, (0 == num_layers),
ownerSBuffs[ind], ownerRBuffs[MAX_SHARING_PROCS+ind],
sendReqs[ind], recv_reqs[MAX_SHARING_PROCS+ind],
@@ -3222,7 +3227,8 @@
assert(ind < MAX_SHARING_PROCS);
new_size = *((int*)&ghostRBuffs[ind][0]);
assert(0 > new_size);
- result = recv_size_buff(buffProcs[ind], ghostRBuffs[ind], recv_reqs[ind]);
+ result = recv_size_buff(buffProcs[ind], ghostRBuffs[ind], recv_reqs[ind],
+ MB_MESG_ENTS);
RRA("Failed to resize recv buffer.");
num_incoming++;
break;
@@ -3233,7 +3239,10 @@
ghostRBuffs[ind], ghostSBuffs[ind],
sendReqs[ind], recd_ents[ind]);
RRA("Failed to recv-unpack message.");
- if (0 != num_layers) new_ghosts.merge(recd_ents[ind]);
+ if (0 != num_layers) {
+ new_ghosts.merge(recd_ents[ind]);
+ ghostedEnts[buffProcs[ind]].merge(recd_ents[ind]);
+ }
break;
case MB_MESG_REMOTE_HANDLES_VECTOR:
// incoming remote handles; use to set remote handles
@@ -3296,6 +3305,145 @@
return MB_SUCCESS;
}
+MBErrorCode MBParallelComm::exchange_tags(std::vector<MBTag> &tags)
+{
+ MBErrorCode result;
+ int success;
+
+ // get all procs interfacing to this proc
+ std::set<unsigned int> exch_procs;
+ result = get_comm_procs(exch_procs);
+
+ // post ghost irecv's for all interface procs
+ // index greqs the same as buffer/sharing procs indices
+ std::vector<MPI_Request> recv_reqs(MAX_SHARING_PROCS, MPI_REQUEST_NULL);
+ std::vector<MPI_Status> gstatus(MAX_SHARING_PROCS);
+ std::set<unsigned int>::iterator sit;
+ for (sit = exch_procs.begin(); sit != exch_procs.end(); sit++) {
+ int ind = get_buffers(*sit);
+ success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(),
+ MPI_UNSIGNED_CHAR, *sit,
+ MB_MESG_ANY, procConfig.proc_comm(),
+ &recv_reqs[ind]);
+ if (success != MPI_SUCCESS) {
+ result = MB_FAILURE;
+ RRA("Failed to post irecv in ghost exchange.");
+ }
+ }
+
+ // pack and send tags from this proc to others
+ // make sendReqs vector to simplify initialization
+ std::fill(sendReqs, sendReqs+MAX_SHARING_PROCS, MPI_REQUEST_NULL);
+ std::map<unsigned int,MBRange>::const_iterator mit;
+
+ for (sit = exch_procs.begin(); sit != exch_procs.end(); sit++) {
+ int ind = get_buffers(*sit);
+
+ MBRange tag_ents;
+
+ // get bridge ents on interface(s)
+ for (MBRange::iterator rit = interfaceSets.begin(); rit != interfaceSets.end();
+ rit++) {
+ if (!is_iface_proc(*rit, *sit)) continue;
+
+ result = get_ghost_layers(*rit, -1, 0, 0, tag_ents);
+ RRA("Failed to get tag ents for exchange.");
+ }
+
+ // also get ghosted entities for this proc
+ if ((mit = ghostedEnts.find(*sit)) != ghostedEnts.end())
+ tag_ents.merge((*mit).second);
+
+ // pack-send; this also posts receives if store_remote_handles is true
+ int buff_size = 0;
+ std::vector<MBRange> tag_ranges;
+ for (std::vector<MBTag>::iterator vit = tags.begin(); vit != tags.end(); vit++)
+ tag_ranges.push_back(tag_ents);
+
+ // count first
+ unsigned char *buff_ptr = &ownerSBuffs[ind][0];
+ MBRange::iterator rit = tag_ents.begin();
+ result = pack_tags(tag_ents, rit, tag_ents,
+ buff_ptr, buff_size, true, true, *sit,
+ tags, tag_ranges, false);
+ RRA("Failed to count buffer in pack_send_tag.");
+
+ result = pack_tags(tag_ents, rit, tag_ents,
+ buff_ptr, buff_size, false, true, *sit,
+ tags, tag_ranges, false);
+ RRA("Failed to pack buffer in pack_send_tag.");
+
+ // if the message is large, send a first message to tell how large
+ if (INITIAL_BUFF_SIZE < buff_size) {
+ int tmp_buff_size = -buff_size;
+ int success = MPI_Send(&tmp_buff_size, sizeof(int), MPI_UNSIGNED_CHAR,
+ *sit, MB_MESG_SIZE, procConfig.proc_comm());
+ if (success != MPI_SUCCESS) return MB_FAILURE;
+ }
+
+ // send the buffer
+ success = MPI_Isend(&ownerSBuffs[ind][0], buff_size, MPI_UNSIGNED_CHAR, *sit,
+ MB_MESG_TAGS, procConfig.proc_comm(), &sendReqs[ind]);
+ if (success != MPI_SUCCESS) return MB_FAILURE;
+ }
+
+ // receive/unpack tags
+ int num_incoming = exch_procs.size();
+
+ while (num_incoming) {
+ int ind;
+ MPI_Status status;
+ success = MPI_Waitany(MAX_SHARING_PROCS, &recv_reqs[0], &ind, &status);
+ if (MPI_SUCCESS != success) {
+ result = MB_FAILURE;
+ RRA("Failed in waitany in ghost exchange.");
+ }
+
+ // ok, received something; decrement incoming counter
+ num_incoming--;
+
+ int new_size;
+ unsigned char *buff_ptr;
+ MBRange dum_range;
+
+ // branch on message type
+ switch (status.MPI_TAG) {
+ case MB_MESG_SIZE:
+ // incoming message just has size; resize buffer and re-call recv,
+ // then re-increment incoming count
+ assert(ind < MAX_SHARING_PROCS);
+ new_size = *((int*)&ghostRBuffs[ind][0]);
+ assert(0 > new_size);
+ result = recv_size_buff(buffProcs[ind], ghostRBuffs[ind], recv_reqs[ind],
+ MB_MESG_TAGS);
+ RRA("Failed to resize recv buffer.");
+ num_incoming++;
+ break;
+ case MB_MESG_TAGS:
+ // incoming ghost entities; process
+ buff_ptr = &ghostRBuffs[ind][0];
+ result = unpack_tags(buff_ptr, dum_range, true,
+ buffProcs[ind]);
+ RRA("Failed to recv-unpack-tag message.");
+ break;
+ default:
+ result = MB_FAILURE;
+ RRA("Failed to get message of correct type in exch_tags.");
+ break;
+ }
+ }
+
+ // ok, now wait
+ MPI_Status status[MAX_SHARING_PROCS];
+ success = MPI_Waitall(MAX_SHARING_PROCS, &sendReqs[0], status);
+ if (MPI_SUCCESS != success) {
+ result = MB_FAILURE;
+ RRA("Failure in waitall in tag exchange.");
+ }
+
+ return MB_SUCCESS;
+}
+
MBErrorCode MBParallelComm::update_iface_sets(MBRange &sent_ents,
std::vector<MBEntityHandle> &remote_handles,
int from_proc)
Modified: MOAB/trunk/parallel/MBParallelComm.hpp
===================================================================
--- MOAB/trunk/parallel/MBParallelComm.hpp 2008-07-03 21:53:52 UTC (rev 1982)
+++ MOAB/trunk/parallel/MBParallelComm.hpp 2008-07-07 16:07:38 UTC (rev 1983)
@@ -30,6 +30,7 @@
#include "MBRange.hpp"
#include "MBProcConfig.hpp"
#include <map>
+#include <set>
#include "math.h"
#include "mpi.h"
@@ -138,6 +139,26 @@
bool store_remote_handles,
bool wait_all = true);
+ /** \brief Exchange tags for all shared and ghosted entities
+ * This function should be called collectively over the communicator for this MBParallelComm.
+ * If this version is called, all ghosted/shared entities should have a value for this
+ * tag (or the tag should have a default value).
+ * \param tags Vector of tag handles to be exchanged
+ */
+ MBErrorCode exchange_tags(std::vector<MBTag> &tags);
+
+ /** \brief Exchange tags for all shared and ghosted entities
+ * This function should be called collectively over the communicator for this MBParallelComm
+ * \param tag_name Name of tag to be exchanged
+ */
+ MBErrorCode exchange_tags(const char *tag_name);
+
+ /** \brief Exchange tags for all shared and ghosted entities
+ * This function should be called collectively over the communicator for this MBParallelComm
+ * \param tagh Handle of tag to be exchanged
+ */
+ MBErrorCode exchange_tags(MBTag tagh);
+
/** \brief Broadcast all entities resident on from_proc to other processors
* This function assumes remote handles are *not* being stored, since (usually)
* every processor will know about the whole mesh.
@@ -236,8 +257,11 @@
MBRange &part_sets,
const char *tag_name = NULL);
- //! get processors with which this processor communicates; sets are sorted by processor
- MBErrorCode get_interface_procs(std::vector<int> &iface_procs);
+ //! get processors with which this processor shares an interface
+ MBErrorCode get_interface_procs(std::set<unsigned int> &iface_procs);
+
+ //! get processors with which this processor communicates
+ MBErrorCode get_comm_procs(std::set<unsigned int> &procs);
//! pack the buffer with ALL data for orig_ents; return entities actually
//! packed (including reference sub-entities) in final_ents
@@ -356,7 +380,8 @@
//! Irecv to get message
MBErrorCode recv_size_buff(const int from_proc,
std::vector<unsigned char> &recv_buff,
- MPI_Request &recv_req);
+ MPI_Request &recv_req,
+ int mesg_tag);
//! process contents of receive buffer to get new entities; if store_remote_handles
//! is true, also Isend (using send_buff) handles for these entities back to
@@ -436,7 +461,8 @@
const bool store_handles,
const int to_proc,
std::vector<MBTag> &all_tags,
- std::vector<MBRange> &tag_ranges);
+ std::vector<MBRange> &tag_ranges,
+ const bool all_possible_tags = true);
MBErrorCode unpack_tags(unsigned char *&buff_ptr,
MBRange &entities,
@@ -587,11 +613,15 @@
//! request objects, may be used if store_remote_handles is used
MPI_Request sendReqs[2*MAX_SHARING_PROCS];
+ //! processor rank for each buffer index
std::vector<int> buffProcs;
//! the partition, interface sets for this comm'n instance
MBRange partitionSets, interfaceSets;
+ //! local entities ghosted to other procs
+ std::map<unsigned int, MBRange> ghostedEnts;
+
//! tags used to save sharing procs and handles
MBTag sharedpTag, sharedpsTag, sharedhTag, sharedhsTag, pstatusTag,
ifaceSetsTag, partitionTag;
@@ -612,4 +642,37 @@
return MB_SUCCESS;
}
+inline MBErrorCode MBParallelComm::exchange_tags(const char *tag_name)
+{
+ // get the tag handle
+ std::vector<MBTag> tags(1);
+ MBErrorCode result = mbImpl->tag_get_handle(tag_name, tags[0]);
+ if (MB_SUCCESS != result) return result;
+ else if (!tags[0]) return MB_TAG_NOT_FOUND;
+
+ return exchange_tags(tags);
+}
+
+inline MBErrorCode MBParallelComm::exchange_tags(MBTag tagh)
+{
+ // get the tag handle
+ std::vector<MBTag> tags;
+ tags.push_back(tagh);
+
+ return exchange_tags(tags);
+}
+
+inline MBErrorCode MBParallelComm::get_comm_procs(std::set<unsigned int> &procs)
+{
+ MBErrorCode result = get_interface_procs(procs);
+ if (MB_SUCCESS != result) return result;
+
+ // add any procs sharing ghosts but not already in exch_procs
+ for (std::map<unsigned int, MBRange>::iterator mit = ghostedEnts.begin();
+ mit != ghostedEnts.end(); mit++)
+ procs.insert((*mit).first);
+
+ return MB_SUCCESS;
+}
+
#endif
Modified: MOAB/trunk/parallel/WriteHDF5Parallel.cpp
===================================================================
--- MOAB/trunk/parallel/WriteHDF5Parallel.cpp 2008-07-03 21:53:52 UTC (rev 1982)
+++ MOAB/trunk/parallel/WriteHDF5Parallel.cpp 2008-07-07 16:07:38 UTC (rev 1983)
@@ -329,7 +329,7 @@
// For the 'remoteMesh' list for this processor, just remove
// entities we aren't writing.
- MBRange& my_remote_mesh = remoteMesh[myPcomm->proc_config().proc_size()];
+ MBRange& my_remote_mesh = remoteMesh[myPcomm->proc_config().proc_rank()];
tmpset = my_remote_mesh.subtract( nodeSet.range );
if (!tmpset.empty())
my_remote_mesh = my_remote_mesh.subtract( tmpset );
Modified: MOAB/trunk/parallel/mbparallelcomm_test.cpp
===================================================================
--- MOAB/trunk/parallel/mbparallelcomm_test.cpp 2008-07-03 21:53:52 UTC (rev 1982)
+++ MOAB/trunk/parallel/mbparallelcomm_test.cpp 2008-07-07 16:07:38 UTC (rev 1983)
@@ -104,7 +104,7 @@
std::vector<std::string> filenames;
int parallel_option = 0;
- while (npos < argc) {
+ while (npos != argc) {
MBErrorCode tmp_result;
int nshared = -1;
int this_opt = strtol(argv[npos++], NULL, 0);
@@ -113,158 +113,85 @@
case -1:
case -2:
case -3:
- parallel_option = this_opt;
- continue;
+ parallel_option = this_opt;
+ continue;
case 3:
- // read a file in parallel from the filename on the command line
- tag_name = "MATERIAL_SET";
- tag_val = -1;
- filenames.push_back(std::string(argv[npos++]));
- if (npos < argc) tag_name = argv[npos++];
- if (npos < argc) tag_val = strtol(argv[npos++], NULL, 0);
- if (npos < argc) distrib = strtol(argv[npos++], NULL, 0);
- else distrib = 1;
- if (npos < argc) resolve_shared = strtol(argv[npos++], NULL, 0);
- if (npos < argc) with_ghosts = strtol(argv[npos++], NULL, 0);
+ // read a file in parallel from the filename on the command line
+ tag_name = "MATERIAL_SET";
+ tag_val = -1;
+ filenames.push_back(std::string(argv[npos++]));
+ if (npos < argc) tag_name = argv[npos++];
+ if (npos < argc) tag_val = strtol(argv[npos++], NULL, 0);
+ if (npos < argc) distrib = strtol(argv[npos++], NULL, 0);
+ else distrib = 1;
+ if (npos < argc) resolve_shared = strtol(argv[npos++], NULL, 0);
+ if (npos < argc) with_ghosts = strtol(argv[npos++], NULL, 0);
- tmp_result = read_file(mbImpl, filenames, tag_name, tag_val,
- distrib, parallel_option,
- resolve_shared, with_ghosts);
- if (MB_SUCCESS != tmp_result) {
- result = tmp_result;
- std::cerr << "Couldn't read mesh; error message:" << std::endl;
- PRINT_LAST_ERROR;
- MPI_Abort(MPI_COMM_WORLD, result);
- }
- nshared = -1;
- break;
+ tmp_result = read_file(mbImpl, filenames, tag_name, tag_val,
+ distrib, parallel_option,
+ resolve_shared, with_ghosts);
+ if (MB_SUCCESS != tmp_result) {
+ result = tmp_result;
+ std::cerr << "Couldn't read mesh; error message:" << std::endl;
+ PRINT_LAST_ERROR;
+ MPI_Abort(MPI_COMM_WORLD, result);
+ }
+ nshared = -1;
+ break;
case 4:
- filenames.push_back(argv[npos++]);
- tmp_result = test_packing(mbImpl, filenames[0].c_str());
- if (MB_SUCCESS != tmp_result) {
- result = tmp_result;
- std::cerr << "Packing test failed; error message:" << std::endl;
- PRINT_LAST_ERROR
- }
- break;
+ filenames.push_back(argv[npos++]);
+ tmp_result = test_packing(mbImpl, filenames[0].c_str());
+ if (MB_SUCCESS != tmp_result) {
+ result = tmp_result;
+ std::cerr << "Packing test failed; error message:" << std::endl;
+ PRINT_LAST_ERROR
+ }
+ break;
case 5:
- // read a file in parallel from the filename on the command line
- tag_name = "MATERIAL_SET";
- distrib = 1;
- tag_val = -1;
- with_ghosts = 0;
- resolve_shared = 1;
- while (npos < argc)
- filenames.push_back(std::string(argv[npos++]));
- tmp_result = read_file(mbImpl, filenames, tag_name, tag_val,
- distrib, parallel_option, resolve_shared,
- with_ghosts);
- if (MB_SUCCESS != tmp_result) {
- result = tmp_result;
- std::cerr << "Couldn't read mesh; error message:" << std::endl;
- PRINT_LAST_ERROR;
- MPI_Abort(MPI_COMM_WORLD, result);
- }
- nshared = -1;
- break;
+ // read a file in parallel from the filename on the command line
+ tag_name = "MATERIAL_SET";
+ distrib = 1;
+ tag_val = -1;
+ with_ghosts = 0;
+ resolve_shared = 1;
+ while (npos < argc)
+ filenames.push_back(std::string(argv[npos++]));
+ tmp_result = read_file(mbImpl, filenames, tag_name, tag_val,
+ distrib, parallel_option, resolve_shared,
+ with_ghosts);
+ if (MB_SUCCESS != tmp_result) {
+ result = tmp_result;
+ std::cerr << "Couldn't read mesh; error message:" << std::endl;
+ PRINT_LAST_ERROR;
+ MPI_Abort(MPI_COMM_WORLD, result);
+ }
+ nshared = -1;
+ break;
default:
- std::cerr << "Unrecognized option \"" << this_opt
- << "\"; skipping." << std::endl;
- tmp_result = MB_FAILURE;
+ std::cerr << "Unrecognized option \"" << this_opt
+ << "\"; skipping." << std::endl;
+ tmp_result = MB_FAILURE;
}
if (0 == rank) rtime = MPI_Wtime();
- if (MB_SUCCESS == tmp_result && 4 != this_opt && false) {
- // now figure out which vertices are shared
- MBParallelComm *pcomm = MBParallelComm::get_pcomm(mbImpl, 0);
- assert(pcomm);
-
- MBRange iface_ents[7];
- for (int i = 0; i < 4; i++) {
- tmp_result = pcomm->get_iface_entities(-1, i, iface_ents[i]);
-
- if (MB_SUCCESS != tmp_result) {
- std::cerr << "get_iface_entities returned error on proc "
- << rank << "; message: " << std::endl;
- PRINT_LAST_ERROR;
- result = tmp_result;
- }
- if (0 != i) iface_ents[4].merge(iface_ents[i]);
- }
- result = pcomm->get_part_entities(iface_ents[6], -1);
- PRINT_LAST_ERROR;
-
- std::cerr << "Proc " << rank << " partition entities:" << std::endl;
- iface_ents[6].print(" ");
-
- if (0 == rank) setime = MPI_Wtime();
-
- // check # iface entities
- if (0 <= nshared && nshared != (int) iface_ents[0].size()) {
- std::cerr << "Didn't get correct number of iface vertices on "
- << "processor " << rank << std::endl;
- result = MB_FAILURE;
- }
-
- else
- std::cerr << "Proc " << rank << " option " << this_opt
- << " succeeded." << std::endl;
-
- if (-1 == nshared) {
- result = mbImpl->get_adjacencies(iface_ents[4], 0, false, iface_ents[5],
- MBInterface::UNION);
-
- std::cerr << "Proc " << rank << " iface entities: " << std::endl;
- for (int i = 0; i < 4; i++)
- std::cerr << " " << iface_ents[i].size() << " "
- << i << "d iface entities." << std::endl;
- std::cerr << " (" << iface_ents[5].size()
- << " verts adj to other iface ents)" << std::endl;
- }
-
- if (debug && false) {
-// if (debug && 2 == nprocs) {
- // if I'm root, get and print handles on other procs
- std::vector<MBEntityHandle> sharedh_tags(iface_ents[0].size());
- std::fill(sharedh_tags.begin(), sharedh_tags.end(), 0);
- MBTag dumt, sharedh_tag;
- result = pcomm->get_shared_proc_tags(dumt, dumt, sharedh_tag, dumt, dumt);
- result = mbImpl->tag_get_data(sharedh_tag, iface_ents[0], &sharedh_tags[0]);
- if (MB_SUCCESS != result) {
- std::cerr << "Couldn't get shared handle tag." << std::endl;
- }
- else {
- MBRange dum_range;
- std::copy(sharedh_tags.begin(), sharedh_tags.end(), mb_range_inserter(dum_range));
- std::cerr << "Shared handles: " << std::endl;
- dum_range.print();
- }
-
- result = report_nsets(mbImpl);
- }
-
- if (0 == rank) ltime = MPI_Wtime();
-
- delete pcomm;
- tmp_result = mbImpl->delete_mesh();
- if (MB_SUCCESS != tmp_result) {
- result = tmp_result;
- std::cerr << "Couldn't delete mesh on rank " << rank
- << "; error message: " << std::endl;
- PRINT_LAST_ERROR
- }
- }
}
if (0 == rank) dtime = MPI_Wtime();
err = MPI_Finalize();
+ result = mbImpl->delete_mesh();
+ if (MB_SUCCESS != result) {
+ std::cerr << "Couldn't delete mesh on rank " << rank
+ << "; error message: " << std::endl;
+ PRINT_LAST_ERROR;
+ }
+
if (MB_SUCCESS == result)
std::cerr << "Proc " << rank << ": Success." << std::endl;
@@ -400,6 +327,14 @@
MPI_Abort(MPI_COMM_WORLD, result);
break;
}
+
+ // exchange tag
+ result = pcs[i]->exchange_tags("GLOBAL_ID");
+ if (MB_SUCCESS != result) {
+ std::cerr << "Tag exchange didn't work." << std::endl;
+ break;
+ }
+
}
if (MB_SUCCESS == result) report_iface_ents(mbImpl, pcs);
More information about the moab-dev
mailing list