[MOAB-dev] r2228 - MOAB/trunk/parallel
kraftche at mcs.anl.gov
Wed Nov 12 21:38:11 CST 2008
Author: kraftche
Date: 2008-11-12 21:38:11 -0600 (Wed, 12 Nov 2008)
New Revision: 2228
Modified:
MOAB/trunk/parallel/MBParallelComm.cpp
MOAB/trunk/parallel/MBParallelComm.hpp
Log:
o Add function to get MBParallelComm ID
o Add function to get all MBParallelComm instances
o Add more precise exchange_tags function
o Add const forms of functions to get interface and part sets
o Add function to get part handle given part id
o Fix incorrect signature for get_part_neighbor_ids
o Add update_shared_mesh function
o Make the PARALLEL_PARTITION_TAG_NAME tag an int rather than 64 handles
o Fix bug in collective_sync_parts
o Fix bug in get_part_neighbor_ids
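(For context, not part of the commit: a minimal sketch of how the new instance-ID and
lookup functions might be used. The 'mb' pointer is assumed to be an existing
MBInterface instance; error handling is abbreviated.)

    #include "MBParallelComm.hpp"
    #include <iostream>
    #include <vector>

    // Sketch: enumerate every MBParallelComm attached to an MBInterface
    // and print the ID used to reference each instance.
    void report_pcomm_ids( MBInterface* mb )
    {
      std::vector<MBParallelComm*> all_pcomm;
      MBErrorCode rval = MBParallelComm::get_all_pcomm( mb, all_pcomm );
      if (MB_SUCCESS != rval)
        return;
      for (size_t i = 0; i < all_pcomm.size(); ++i)
        std::cout << "MBParallelComm ID: " << all_pcomm[i]->get_id() << std::endl;
    }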
Modified: MOAB/trunk/parallel/MBParallelComm.cpp
===================================================================
--- MOAB/trunk/parallel/MBParallelComm.cpp 2008-11-13 02:45:19 UTC (rev 2227)
+++ MOAB/trunk/parallel/MBParallelComm.cpp 2008-11-13 03:38:11 UTC (rev 2228)
@@ -155,9 +155,9 @@
retval = MPI_Init(&argc, &argv);
}
- int myid = add_pcomm(this);
+ pcommID = add_pcomm(this);
if (id)
- *id = myid;
+ *id = pcommID;
}
MBParallelComm::MBParallelComm(MBInterface *impl,
@@ -179,9 +179,9 @@
retval = MPI_Init(&argc, &argv);
}
- int myid = add_pcomm(this);
+ pcommID = add_pcomm(this);
if (id)
- *id = myid;
+ *id = pcommID;
}
MBParallelComm::~MBParallelComm()
@@ -3535,6 +3535,253 @@
return MB_SUCCESS;
}
+MBErrorCode MBParallelComm::exchange_tags( MBTag tag, const MBRange& entities )
+{
+ MBErrorCode result;
+ int success;
+
+ // get all procs interfacing to this proc
+ std::set<unsigned int> exch_procs;
+ result = get_comm_procs(exch_procs);
+
+ // post ghost irecv's for all interface procs
+ // index greqs the same as buffer/sharing procs indices
+ std::vector<MPI_Request> recv_reqs(MAX_SHARING_PROCS, MPI_REQUEST_NULL);
+ std::vector<MPI_Status> gstatus(MAX_SHARING_PROCS);
+ std::set<unsigned int>::iterator sit;
+ for (sit = exch_procs.begin(); sit != exch_procs.end(); sit++) {
+ int ind = get_buffers(*sit);
+ success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(),
+ MPI_UNSIGNED_CHAR, *sit,
+ MB_MESG_ANY, procConfig.proc_comm(),
+ &recv_reqs[ind]);
+ if (success != MPI_SUCCESS) {
+ result = MB_FAILURE;
+ RRA("Failed to post irecv in ghost exchange.");
+ }
+ }
+
+ // group entities by owner
+ std::map<int,MBRange> proc_ents;
+ for (MBRange::const_iterator i = entities.begin(); i != entities.end(); ++i) {
+ int owner;
+ result = get_owner( *i, owner );
+ RRA("Failed to get entity owner.");
+ proc_ents[owner].insert( *i );
+ }
+
+ // pack and send tags from this proc to others
+ // make sendReqs vector to simplify initialization
+ std::fill(sendReqs, sendReqs+MAX_SHARING_PROCS, MPI_REQUEST_NULL);
+ std::map<unsigned int,MBRange>::const_iterator mit;
+
+ for (sit = exch_procs.begin(); sit != exch_procs.end(); sit++) {
+ int ind = get_buffers(*sit);
+
+ MBRange& tag_ents = proc_ents[*sit];
+ std::vector<MBTag> tags(1); tags[0] = tag;
+ std::vector<MBRange> tag_ranges(1); tag_ranges[0].swap(tag_ents);
+
+ // count first
+ int buff_size = 0;
+ unsigned char *buff_ptr = &ownerSBuffs[ind][0];
+ MBRange::iterator rit = tag_ranges[0].begin();
+ result = pack_tags(tag_ranges[0], rit, tag_ranges[0],
+ buff_ptr, buff_size, true, true, *sit,
+ tags, tag_ranges, false);
+ RRA("Failed to count buffer in pack_send_tag.");
+
+ result = pack_tags(tag_ranges[0], rit, tag_ranges[0],
+ buff_ptr, buff_size, false, true, *sit,
+ tags, tag_ranges, false);
+ RRA("Failed to pack buffer in pack_send_tag.");
+
+ // if the message is large, send a first message to tell how large
+ if (INITIAL_BUFF_SIZE < buff_size) {
+ int tmp_buff_size = -buff_size;
+ int success = MPI_Send(&tmp_buff_size, sizeof(int), MPI_UNSIGNED_CHAR,
+ *sit, MB_MESG_SIZE, procConfig.proc_comm());
+ if (success != MPI_SUCCESS) return MB_FAILURE;
+ }
+
+ // send the buffer
+ success = MPI_Isend(&ownerSBuffs[ind][0], buff_size, MPI_UNSIGNED_CHAR, *sit,
+ MB_MESG_TAGS, procConfig.proc_comm(), &sendReqs[ind]);
+ if (success != MPI_SUCCESS) return MB_FAILURE;
+ }
+
+ // receive/unpack tags
+ int num_incoming = exch_procs.size();
+
+ while (num_incoming) {
+ int ind;
+ MPI_Status status;
+ success = MPI_Waitany(MAX_SHARING_PROCS, &recv_reqs[0], &ind, &status);
+ if (MPI_SUCCESS != success) {
+ result = MB_FAILURE;
+ RRA("Failed in waitany in ghost exchange.");
+ }
+
+ // ok, received something; decrement incoming counter
+ num_incoming--;
+
+ int new_size;
+ unsigned char *buff_ptr;
+ MBRange dum_range;
+
+ // branch on message type
+ switch (status.MPI_TAG) {
+ case MB_MESG_SIZE:
+ // incoming message just has size; resize buffer and re-call recv,
+ // then re-increment incoming count
+ assert(ind < MAX_SHARING_PROCS);
+ new_size = *((int*)&ghostRBuffs[ind][0]);
+ assert(0 > new_size);
+ result = recv_size_buff(buffProcs[ind], ghostRBuffs[ind], recv_reqs[ind],
+ MB_MESG_TAGS);
+ RRA("Failed to resize recv buffer.");
+ num_incoming++;
+ break;
+ case MB_MESG_TAGS:
+ // incoming ghost entities; process
+ buff_ptr = &ghostRBuffs[ind][0];
+ result = unpack_tags(buff_ptr, dum_range, true,
+ buffProcs[ind]);
+ RRA("Failed to recv-unpack-tag message.");
+ break;
+ default:
+ result = MB_FAILURE;
+ RRA("Failed to get message of correct type in exch_tags.");
+ break;
+ }
+ }
+
+ // ok, now wait
+ MPI_Status status[MAX_SHARING_PROCS];
+ success = MPI_Waitall(MAX_SHARING_PROCS, &sendReqs[0], status);
+ if (MPI_SUCCESS != success) {
+ result = MB_FAILURE;
+ RRA("Failure in waitall in tag exchange.");
+ }
+
+ return MB_SUCCESS;
+}
+
+MBErrorCode MBParallelComm::update_shared_mesh()
+{
+ MBErrorCode result;
+ int success;
+
+ // get all procs interfacing to this proc
+ std::set<unsigned int> iface_procs;
+ result = get_interface_procs(iface_procs);
+ RRA("Failed to get iface sets, procs");
+
+ // post ghost irecv's for all interface procs
+ // index greqs the same as buffer/sharing procs indices
+ std::vector<MPI_Request> recv_reqs(2*MAX_SHARING_PROCS, MPI_REQUEST_NULL);
+ std::vector<MPI_Status> gstatus(MAX_SHARING_PROCS);
+ std::set<unsigned int>::iterator sit;
+ for (sit = iface_procs.begin(); sit != iface_procs.end(); sit++) {
+ int ind = get_buffers(*sit);
+ success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(),
+ MPI_UNSIGNED_CHAR, *sit,
+ MB_MESG_ANY, procConfig.proc_comm(),
+ &recv_reqs[ind]);
+ if (success != MPI_SUCCESS) {
+ result = MB_FAILURE;
+ RRA("Failed to post irecv in ghost exchange.");
+ }
+ }
+
+ // pack and send vertex coordinates from this proc to others
+ // make sendReqs vector to simplify initialization
+ std::fill(sendReqs, sendReqs+2*MAX_SHARING_PROCS, MPI_REQUEST_NULL);
+ MBRange recd_ents[MAX_SHARING_PROCS];
+
+ for (sit = iface_procs.begin(); sit != iface_procs.end(); sit++) {
+ int ind = get_buffers(*sit);
+
+ MBRange vertices;
+ for (MBRange::iterator rit = interfaceSets.begin(); rit != interfaceSets.end();
+ rit++) {
+ if (!is_iface_proc(*rit, *sit))
+ continue;
+
+ result = mbImpl->get_entities_by_type( *rit, MBVERTEX, vertices );
+ RRA("Bad interface set.");
+ }
+ std::map<unsigned int,MBRange>::iterator ghosted = ghostedEnts.find(*sit);
+ if (ghosted != ghostedEnts.end()) {
+ MBRange::iterator e = ghosted->second.upper_bound(MBVERTEX);
+ vertices.merge( ghosted->second.begin(), e );
+ }
+
+ // pack-send; this also posts receives if store_remote_handles is true
+ MBRange sent;
+ result = pack_send_entities(*sit, vertices, false, false,
+ false, true,
+ ownerSBuffs[ind], ownerRBuffs[MAX_SHARING_PROCS+ind],
+ sendReqs[ind], recv_reqs[MAX_SHARING_PROCS+ind],
+ sent);
+ RRA("Failed to pack-send in mesh update exchange.");
+ }
+
+ // receive/unpack entities
+ // number of incoming messages depends on whether we're getting back
+ // remote handles
+ int num_incoming = iface_procs.size();
+
+ while (num_incoming) {
+ int ind;
+ MPI_Status status;
+ success = MPI_Waitany(2*MAX_SHARING_PROCS, &recv_reqs[0], &ind, &status);
+ if (MPI_SUCCESS != success) {
+ result = MB_FAILURE;
+ RRA("Failed in waitany in ghost exchange.");
+ }
+
+ // ok, received something; decrement incoming counter
+ num_incoming--;
+
+ std::vector<MBEntityHandle> remote_handles_v, sent_ents_tmp;
+ MBRange remote_handles_r;
+ int new_size;
+
+ // branch on message type
+ switch (status.MPI_TAG) {
+ case MB_MESG_SIZE:
+ // incoming message just has size; resize buffer and re-call recv,
+ // then re-increment incoming count
+ assert(ind < MAX_SHARING_PROCS);
+ new_size = *((int*)&ghostRBuffs[ind][0]);
+ assert(0 > new_size);
+ result = recv_size_buff(buffProcs[ind], ghostRBuffs[ind], recv_reqs[ind],
+ MB_MESG_ENTS);
+ RRA("Failed to resize recv buffer.");
+ num_incoming++;
+ break;
+ case MB_MESG_ENTS:
+ // incoming ghost entities; process
+ result = recv_unpack_entities(buffProcs[ind], true,
+ false,
+ ghostRBuffs[ind], ghostSBuffs[ind],
+ sendReqs[ind], recd_ents[ind]);
+ RRA("Failed to recv-unpack message.");
+ break;
+ }
+ }
+
+ // ok, now wait if requested
+ MPI_Status status[2*MAX_SHARING_PROCS];
+ success = MPI_Waitall(2*MAX_SHARING_PROCS, &sendReqs[0], status);
+ if (MPI_SUCCESS != success) {
+ result = MB_FAILURE;
+ RRA("Failure in waitall in ghost exchange.");
+ }
+
+ return MB_SUCCESS;
+}
MBErrorCode MBParallelComm::update_iface_sets(MBRange &sent_ents,
std::vector<MBEntityHandle> &remote_handles,
int from_proc)
@@ -3641,9 +3888,10 @@
{
if (!partitionTag) {
MBErrorCode result = mbImpl->tag_create(PARALLEL_PARTITION_TAG_NAME,
- MAX_SHARING_PROCS*sizeof(MBEntityHandle),
+ sizeof(int),
MB_TAG_SPARSE,
- MB_TYPE_HANDLE, partitionTag,
+ MB_TYPE_INTEGER,
+ partitionTag,
NULL, true);
if (MB_SUCCESS != result && MB_ALREADY_ALLOCATED != result)
return 0;
@@ -3686,6 +3934,25 @@
return pc_array[index];
}
+MBErrorCode MBParallelComm::get_all_pcomm( MBInterface* impl, std::vector<MBParallelComm*>& list )
+{
+ MBTag pc_tag = pcomm_tag(impl, false);
+ if (0 == pc_tag)
+ return MB_TAG_NOT_FOUND;
+
+ MBParallelComm *pc_array[MAX_SHARING_PROCS];
+ MBErrorCode rval = impl->tag_get_data( pc_tag, 0, 0, pc_array );
+ if (MB_SUCCESS != rval)
+ return rval;
+
+ for (int i = 0; i < MAX_SHARING_PROCS; ++i)
+ if (pc_array[i])
+ list.push_back( pc_array[i] );
+
+ return MB_SUCCESS;
+}
+
+
//! get the indexed pcomm object from the interface
MBParallelComm *MBParallelComm::get_pcomm( MBInterface *impl,
MBEntityHandle prtn,
@@ -3861,6 +4128,15 @@
return MB_SUCCESS;
}
+MBErrorCode MBParallelComm::get_part_handle( int id, MBEntityHandle& handle_out ) const
+{
+ // FIXME: assumes only 1 local part
+ if ((unsigned)id != proc_config().proc_rank())
+ return MB_ENTITY_NOT_FOUND;
+ handle_out = partition_sets().front();
+ return MB_SUCCESS;
+}
+
MBErrorCode MBParallelComm::create_part( MBEntityHandle& set_out )
{
// mark as invalid so we know that it needs to be updated
@@ -3908,6 +4184,7 @@
MBErrorCode MBParallelComm::collective_sync_partition()
{
int count = partition_sets().size();
+ globalPartCount = 0;
int err = MPI_Allreduce( &count, &globalPartCount, 1, MPI_INT, MPI_SUM,
proc_config().proc_comm() );
return err ? MB_FAILURE : MB_SUCCESS;
@@ -3915,12 +4192,12 @@
MBErrorCode MBParallelComm::get_part_neighbor_ids( MBEntityHandle part,
int neighbors_out[MAX_SHARING_PROCS],
- int num_neighbors_out )
+ int& num_neighbors_out )
{
MBErrorCode rval;
MBRange iface;
rval = get_interface_sets( part, iface );
- if (MB_SUCCESS != rval);
+ if (MB_SUCCESS != rval)
return rval;
num_neighbors_out = 0;
@@ -4051,7 +4328,6 @@
return mbImpl->tag_get_data( sharedhs_tag(), &entity, 1, remote_handles );
}
-
#ifdef TEST_PARALLELCOMM
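(For reference, not part of the commit: a hedged sketch combining the new ranged
exchange_tags with update_shared_mesh. 'pcomm', 'density_tag', and 'shared_ents' are
placeholder names for an existing MBParallelComm instance, a tag handle, and the
entities whose values should be synchronized.)

    #include "MBParallelComm.hpp"

    // Sketch: push tag values for an explicit entity range from the owning
    // procs to the procs holding copies, then propagate modified vertex
    // coordinates and ghosted entities with update_shared_mesh.
    MBErrorCode sync_shared_data( MBParallelComm& pcomm, MBTag density_tag,
                                  const MBRange& shared_ents )
    {
      MBErrorCode rval = pcomm.exchange_tags( density_tag, shared_ents );
      if (MB_SUCCESS != rval)
        return rval;
      return pcomm.update_shared_mesh();
    }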
Modified: MOAB/trunk/parallel/MBParallelComm.hpp
===================================================================
--- MOAB/trunk/parallel/MBParallelComm.hpp 2008-11-13 02:45:19 UTC (rev 2227)
+++ MOAB/trunk/parallel/MBParallelComm.hpp 2008-11-13 03:38:11 UTC (rev 2228)
@@ -60,6 +60,8 @@
MPI_Comm comm = MPI_COMM_WORLD,
int* pcomm_id_out = 0);
+ //! Get ID used to reference this PCOMM instance
+ int get_id() const { return pcommID; }
//! get the indexed pcomm object from the interface
static MBParallelComm *get_pcomm(MBInterface *impl, const int index);
@@ -71,6 +73,9 @@
MBEntityHandle partitioning,
const MPI_Comm* comm = 0 );
+ static MBErrorCode get_all_pcomm( MBInterface* impl,
+ std::vector<MBParallelComm*>& list );
+
//! destructor
~MBParallelComm();
@@ -171,6 +176,8 @@
* \param tagh Handle of tag to be exchanged
*/
MBErrorCode exchange_tags(MBTag tagh);
+
+ MBErrorCode exchange_tags( MBTag tag, const MBRange& entities );
/** \brief Broadcast all entities resident on from_proc to other processors
* This function assumes remote handles are *not* being stored, since (usually)
@@ -309,7 +316,9 @@
//! return partition, interface set ranges
MBRange &partition_sets() {return partitionSets;}
+ const MBRange &partition_sets() const {return partitionSets;}
MBRange &interface_sets() {return interfaceSets;}
+ const MBRange &interface_sets() const {return interfaceSets;}
//! return sharedp tag
MBTag sharedp_tag();
@@ -366,12 +375,13 @@
MBErrorCode get_global_part_count( int& count_out ) const;
MBErrorCode get_part_owner( int part_id, int& owner_out ) const;
MBErrorCode get_part_id( MBEntityHandle part, int& id_out ) const;
+ MBErrorCode get_part_handle( int id, MBEntityHandle& handle_out ) const;
MBErrorCode create_part( MBEntityHandle& part_out );
MBErrorCode destroy_part( MBEntityHandle part ) ;
MBErrorCode collective_sync_partition();
MBErrorCode get_part_neighbor_ids( MBEntityHandle part,
int neighbors_out[MAX_SHARING_PROCS],
- int num_neighbors_out );
+ int& num_neighbors_out );
MBErrorCode get_interface_sets( MBEntityHandle part,
MBRange& iface_sets_out,
int* adj_part_id = 0 );
@@ -382,6 +392,11 @@
int part_ids_out[MAX_SHARING_PROCS],
int& num_part_ids_out,
MBEntityHandle remote_handles[MAX_SHARING_PROCS] = 0 );
+
+ // Propagate mesh modification amongst shared entities
+ // from the owning processor to any procs with copies.
+ MBErrorCode update_shared_mesh();
+
private:
int num_subranges(const MBRange &this_range);
@@ -486,7 +501,7 @@
/**\brief Serialize entity tag data
*
- * This function operates in two pases. The first phase,
+ * This function operates in two passes. The first phase,
* specified by 'just_count == true' calculates the necessary
* buffer size for the serialized data and, optionally populates
* the vectors of tag handles and entity ranges. The second phase
@@ -726,6 +741,8 @@
int globalPartCount; //!< Cache of global part count
MBEntityHandle partitioningSet; //!< entity set containing all parts
+
+ int pcommID;
};
inline MBErrorCode MBParallelComm::get_shared_proc_tags(MBTag &sharedp,
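(Usage note, not part of the commit: a sketch of get_part_handle together with the
corrected get_part_neighbor_ids signature. It assumes the single-local-part case the
current get_part_handle implementation requires, i.e. the part ID equals the proc rank.)

    #include "MBParallelComm.hpp"

    // Sketch: look up the local part set by ID, then query the IDs of
    // neighboring parts via the fixed (int&) output parameter.
    MBErrorCode report_neighbors( MBParallelComm& pcomm,
                                  int neighbors[MAX_SHARING_PROCS],
                                  int& num_neighbors )
    {
      MBEntityHandle part;
      MBErrorCode rval = pcomm.get_part_handle( (int)pcomm.proc_config().proc_rank(), part );
      if (MB_SUCCESS != rval)
        return rval;
      return pcomm.get_part_neighbor_ids( part, neighbors, num_neighbors );
    }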