[MOAB-dev] r3389 - in MOAB/trunk: . parallel tools/iMesh
tautges at mcs.anl.gov
tautges at mcs.anl.gov
Mon Nov 30 17:06:35 CST 2009
Author: tautges
Date: 2009-11-30 17:06:35 -0600 (Mon, 30 Nov 2009)
New Revision: 3389
Modified:
MOAB/trunk/MBParallelConventions.h
MOAB/trunk/MBSkinner.cpp
MOAB/trunk/parallel/MBParallelComm.cpp
MOAB/trunk/parallel/MBParallelComm.hpp
MOAB/trunk/parallel/mbparallelcomm_test.cpp
MOAB/trunk/parallel/pcomm_serial.cpp
MOAB/trunk/parallel/pcomm_unit.cpp
MOAB/trunk/tools/iMesh/iMeshP_unit_tests.cpp
Log:
Fairly major changes to parallel ghosting. First, implemented a handshaking approach for
exchanging large messages; this guarantees that a buffer will be allocated and ready
when a large message arrives. Second, implementing an abstract Buffer object (in this case,
a class in MBParallelComm) for buffering. This provides more explicit control over memory
management in buffers, and makes it easier to debug memory access problems. One shortcoming
of the current code is that there's an MPI_Barrier call between sending/receiving entities and
sending/receiving remote handles. I'll try and eliminate this after parallel testing. I'm also
sure there are plenty of places where this code can be cleaned up and made more bulletproof,
and where comments need to be added/improved.
With this commit, parallel ghost exchange now works for all the serial tests I've run so far.
Next up is to start running parallel and timing tests. A couple of the iMeshP tests still fail also.
Specific changes:
MBSkinner: fix problem causing scdtest to die
MBParallelConventions.h: inserted clarifying comment
pcomm_serial: requesting ghosting using faces as bridge entities
pcomm_unit, mbparallelcomm_test: updated to use new buffering scheme
MBParallelComm: many changes summarized in first part of message
iMeshP_unit_tests.cpp: adding PARTITION option
Modified: MOAB/trunk/MBParallelConventions.h
===================================================================
--- MOAB/trunk/MBParallelConventions.h 2009-11-25 21:22:39 UTC (rev 3388)
+++ MOAB/trunk/MBParallelConventions.h 2009-11-30 23:06:35 UTC (rev 3389)
@@ -83,6 +83,7 @@
#define PSTATUS_SHARED 0x2
#define PSTATUS_MULTISHARED 0x4
#define PSTATUS_INTERFACE 0x8
+// note, these numbers are in hex, so 0x10 is the 4th bit, or 2^4.
#define PSTATUS_GHOST 0x10
#define PSTATUS_AND 0x1
Modified: MOAB/trunk/MBSkinner.cpp
===================================================================
--- MOAB/trunk/MBSkinner.cpp 2009-11-25 21:22:39 UTC (rev 3388)
+++ MOAB/trunk/MBSkinner.cpp 2009-11-30 23:06:35 UTC (rev 3389)
@@ -1261,8 +1261,9 @@
MBEntityType type = TYPE_FROM_HANDLE(elem), tmp_type;
const int ncorner = MBCN::VerticesPerEntity( side_type );
const int d = MBCN::Dimension(side_type);
+ std::vector<MBEntityHandle> storage;
- rval = thisMB->get_connectivity( elem, conn, len, false );
+ rval = thisMB->get_connectivity( elem, conn, len, false, &storage );
if (MB_SUCCESS != rval) return rval;
MBCN::SideNumber( type, conn, side_conn, ncorner, d, side, sense, offset );
Modified: MOAB/trunk/parallel/MBParallelComm.cpp
===================================================================
--- MOAB/trunk/parallel/MBParallelComm.cpp 2009-11-25 21:22:39 UTC (rev 3388)
+++ MOAB/trunk/parallel/MBParallelComm.cpp 2009-11-30 23:06:35 UTC (rev 3389)
@@ -43,7 +43,8 @@
#include "MBmpi.h"
#endif
-const unsigned int INITIAL_BUFF_SIZE = 1024;
+const unsigned int MBParallelComm::INITIAL_BUFF_SIZE = 1024;
+
const int MAX_BCAST_SIZE = (1<<28);
#undef DEBUG_MPE
@@ -62,6 +63,12 @@
//#define DEBUG_COMM 1
#undef DEBUG_PACKING
#undef DEBUG_MSGS
+//#define DEBUG_MSGS 1
+#undef DEBUG_BARRIER
+#define DEBUG_BARRIER 1
+#ifdef DEBUG_MSGS
+std::vector<MBParallelComm::Buffer*> msgs;
+#endif
#ifdef DEBUG_PACKING
unsigned int __PACK_num = 0, __UNPACK_num = 0, __PACK_count = 0, __UNPACK_count = 0;
std::string __PACK_string, __UNPACK_string;
@@ -199,31 +206,67 @@
}
}
-template <typename T> static inline
-void CHECK_BUFF_SPACE( std::vector<T>& buff_vec,
- T*& buff_ptr,
- size_t addl_space )
+enum MBMessageTag {MB_MESG_ANY=MPI_ANY_TAG,
+ MB_MESG_ENTS_ACK,
+ MB_MESG_ENTS_SIZE,
+ MB_MESG_ENTS_LARGE,
+ MB_MESG_REMOTEH_ACK,
+ MB_MESG_REMOTEH_SIZE,
+ MB_MESG_REMOTEH_LARGE,
+ MB_MESG_TAGS_ACK,
+ MB_MESG_TAGS_SIZE,
+ MB_MESG_TAGS_LARGE};
+
+static inline size_t RANGE_SIZE(const MBRange& rng)
+ { return 2*sizeof(MBEntityHandle)*rng.psize()+sizeof(int); }
+
+static inline void PRINT_DEBUG_ISEND(int from, int to, unsigned char *buff,
+ int tag, int size)
{
- size_t old_size = buff_ptr - &buff_vec[0];
- size_t new_size = old_size + addl_space;
- if (new_size > buff_vec.size()) {
- buff_vec.resize(1.5*new_size);
- buff_ptr = &buff_vec[old_size];
- }
+#ifdef DEBUG_COMM
+ std::cerr << "Isend, " << from << "->" << to
+ << ", buffer ptr = " << (void*)buff << ", tag=" << tag
+ << ", size=" << size << std::endl; std::cerr.flush();
+#endif
}
-template<typename T> static inline
-void INIT_BUFFER(std::vector<T>& buff_vec, T*& buff_ptr)
+static inline void PRINT_DEBUG_IRECV(int to, int from, unsigned char *buff, int size,
+ int tag, int incoming)
{
- buff_vec.reserve(INITIAL_BUFF_SIZE);
- buff_vec.resize(sizeof(int));
- buff_ptr = &buff_vec[sizeof(int)];
+#ifdef DEBUG_COMM
+ std::cerr << "Irecv, " << to << "<-" << from << ", buffer ptr=" << (void*)buff
+ << ", size=" << size << ", tag=" << tag
+ << (tag < MB_MESG_REMOTEH_ACK ? ", incoming1=" :
+ (tag < MB_MESG_TAGS_ACK ? ", incoming2=" : ", incoming="))
+ << incoming << std::endl; std::cerr.flush();
+#endif
}
+static inline void PRINT_DEBUG_RECD(MPI_Status status)
+{
+#ifdef DEBUG_COMM
+ int this_count;
+ int success = MPI_Get_count(&status, MPI_UNSIGNED_CHAR, &this_count);
+ if (MPI_SUCCESS != success) this_count = -1;
+ std::cerr << "Received from " << status.MPI_SOURCE
+ << ", count = " << this_count
+ << ", tag = " << status.MPI_TAG
+ << std::endl; std::cerr.flush();
+#endif
+}
-static inline size_t RANGE_SIZE(const MBRange& rng)
- { return 2*sizeof(MBEntityHandle)*rng.psize()+sizeof(int); }
+static inline void PRINT_DEBUG_WAITANY(std::vector<MPI_Request> &reqs, int tag, int proc)
+{
+#ifdef DEBUG_COMM
+ std::cerr << "Waitany, p=" << proc
+ << (tag < MB_MESG_REMOTEH_ACK ? ", recv_ent_reqs = " :
+ (tag < MB_MESG_TAGS_ACK ? ", recv_remoteh_reqs = " : ", recv_tag_reqs = "));
+ for (unsigned int i = 0; i < reqs.size(); i++) std::cerr << " " << reqs[i];
+ std::cerr << std::endl; std::cerr.flush();
+#endif
+}
+
#define RR(a) if (MB_SUCCESS != result) {\
dynamic_cast<MBCore*>(mbImpl)->get_error_handler()->set_last_error(a);\
return result;}
@@ -259,14 +302,6 @@
#define PARALLEL_COMM_TAG_NAME "__PARALLEL_COMM"
-enum MBMessageTag {MB_MESG_ANY=MPI_ANY_TAG,
- MB_MESG_ENTS,
- MB_MESG_ENTS_SECOND,
- MB_MESG_REMOTE_HANDLES,
- MB_MESG_REMOTE_HANDLES_SECOND,
- MB_MESG_TAGS,
- MB_MESG_TAGS_SECOND};
-
MBParallelComm::MBParallelComm(MBInterface *impl, MPI_Comm comm, int* id )
: mbImpl(impl), procConfig(comm),
sharedpTag(0), sharedpsTag(0),
@@ -297,6 +332,7 @@
MBParallelComm::~MBParallelComm()
{
remove_pcomm(this);
+ delete_all_buffers();
}
void MBParallelComm::initialize()
@@ -317,8 +353,8 @@
// reserve space for vectors
buffProcs.reserve(MAX_SHARING_PROCS);
- ownerSBuffs.reserve(MAX_SHARING_PROCS);
- ghostRBuffs.reserve(MAX_SHARING_PROCS);
+ localOwnedBuffs.reserve(MAX_SHARING_PROCS);
+ remoteOwnedBuffs.reserve(MAX_SHARING_PROCS);
pcommID = add_pcomm(this);
}
@@ -446,8 +482,8 @@
if (vit == buffProcs.end()) {
ind = buffProcs.size();
buffProcs.push_back((unsigned int)to_proc);
- ownerSBuffs.push_back(std::vector<unsigned char>());
- ghostRBuffs.push_back(std::vector<unsigned char>(INITIAL_BUFF_SIZE));
+ localOwnedBuffs.push_back(new Buffer(INITIAL_BUFF_SIZE));
+ remoteOwnedBuffs.push_back(new Buffer(INITIAL_BUFF_SIZE));
if (is_new) *is_new = true;
}
else {
@@ -458,96 +494,6 @@
return ind;
}
-MBErrorCode MBParallelComm::send_buffer(const unsigned int to_proc,
- const unsigned char *send_buff,
- unsigned int buff_size,
- int mesg_tag,
- MPI_Request &send_req1,
- MPI_Request &send_req2)
-{
- MBErrorCode result = MB_SUCCESS;
- int success;
-
- // size goes on the front
- *((int*)send_buff) = (int)buff_size;
-
- // send the buffer
-#ifdef DEBUG_COMM
- std::cerr << "Isending " << std::min(buff_size, INITIAL_BUFF_SIZE)
- << " bytes to proc " << to_proc
- << " with message tag " << mesg_tag << std::endl;
-#endif
- success = MPI_Isend(const_cast<unsigned char*>(send_buff),
- std::min(buff_size, INITIAL_BUFF_SIZE),
- MPI_UNSIGNED_CHAR, to_proc,
- mesg_tag, procConfig.proc_comm(), &send_req1);
- if (success != MPI_SUCCESS) return MB_FAILURE;
-
- if (buff_size > INITIAL_BUFF_SIZE) {
-#ifdef DEBUG_COMM
- std::cerr << "Isending " << buff_size-INITIAL_BUFF_SIZE
- << " bytes to proc " << to_proc
- << " with message tag " << mesg_tag+1 << "(second)" << std::endl;
-#endif
- success = MPI_Isend(const_cast<unsigned char*>(send_buff+INITIAL_BUFF_SIZE),
- buff_size-INITIAL_BUFF_SIZE,
- MPI_UNSIGNED_CHAR, to_proc,
- mesg_tag+1, procConfig.proc_comm(), &send_req2);
- // put this inside so we can stop on completion in the debugger
- if (success != MPI_SUCCESS) return MB_FAILURE;
- }
-
- return result;
-}
-
-MBErrorCode MBParallelComm::recv_buffer(int mesg_tag_expected,
- const MPI_Status &mpi_status,
- std::vector<unsigned char> &recv_buff,
- MPI_Request &recv_req,
- bool &done)
-{
- // process a received message, possibly posting a receive for the 2nd part of
- // that message
- //
- // if the message comes in with mesg_tag_expected, check the first integer for
- // size; if size is INITIAL_BUFF_SIZE, post a receive for the 2nd message using
- // the same request object, with an offset buffer; otherwise just signal that we're done
- int success;
- if (mpi_status.MPI_TAG == mesg_tag_expected &&
- ((int*)&recv_buff[0])[0] >= (int)INITIAL_BUFF_SIZE) {
- // verify that we received exactly the right number of bytes
-#ifndef NDEBUG
- int this_count;
- success = MPI_Get_count(const_cast<MPI_Status*>(&mpi_status), MPI_UNSIGNED_CHAR,
- &this_count);
- assert(MPI_SUCCESS == success && this_count == (int)INITIAL_BUFF_SIZE);
-#endif
-
- // need to resize to final size, then resubmit irecv pointing to the
- // offset buffer position
- recv_buff.resize(*((int*)&recv_buff[0]));
-
-#ifdef DEBUG_COMM
- std::cout << "Posting Irecv from " << mpi_status.MPI_SOURCE
- << " for 2nd message." << std::endl;
-#endif
-
- int success = MPI_Irecv(&recv_buff[INITIAL_BUFF_SIZE], recv_buff.size()-INITIAL_BUFF_SIZE,
- MPI_UNSIGNED_CHAR, mpi_status.MPI_SOURCE,
- mesg_tag_expected+1, procConfig.proc_comm(), &recv_req);
- if (MPI_SUCCESS != success) {
- MBErrorCode result = MB_FAILURE;
- RRA("Failed call to Irecv in recv_buffer.");
- }
- done = false;
- }
- else {
- done = true;
- }
-
- return MB_SUCCESS;
-}
-
MBErrorCode MBParallelComm::broadcast_entities( const int from_proc,
MBRange &entities,
const bool adjacencies,
@@ -560,16 +506,19 @@
MBErrorCode result = MB_SUCCESS;
int success;
int buff_size;
-
- std::vector<unsigned char> buff;
- std::vector<int> addl_procs;
+
+ Buffer buff(INITIAL_BUFF_SIZE);
+ buff.reset_ptr(sizeof(int));
if ((int)procConfig.proc_rank() == from_proc) {
result = add_verts(entities);
RRA("Failed to add adj vertices.");
+ buff.reset_ptr(sizeof(int));
result = pack_buffer( entities, adjacencies, tags,
- false, -1, buff, buff_size);
+ false, -1, &buff);
RRA("Failed to compute buffer size in broadcast_entities.");
+ buff.set_stored_size();
+ buff_size = buff.buff_ptr - buff.mem_ptr;
}
success = MPI_Bcast( &buff_size, 1, MPI_INT, from_proc, procConfig.proc_comm() );
@@ -582,12 +531,12 @@
return MB_SUCCESS;
if ((int)procConfig.proc_rank() != from_proc)
- buff.resize(buff_size);
+ buff.reserve(buff_size);
size_t offset = 0;
while (buff_size) {
int size = std::min( buff_size, MAX_BCAST_SIZE );
- success = MPI_Bcast( &buff[offset], size, MPI_UNSIGNED_CHAR, from_proc, procConfig.proc_comm() );
+ success = MPI_Bcast(buff.mem_ptr+offset, size, MPI_UNSIGNED_CHAR, from_proc, procConfig.proc_comm() );
if (MPI_SUCCESS != success) {
result = MB_FAILURE;
RRA("MPI_Bcast of buffer failed.");
@@ -602,7 +551,8 @@
std::vector<std::vector<int> > dum1p;
std::vector<MBEntityHandle> dum2;
std::vector<unsigned int> dum3;
- result = unpack_buffer(&buff[0], false, from_proc, -1,
+ buff.reset_ptr(sizeof(int));
+ result = unpack_buffer(buff.buff_ptr, false, from_proc, -1,
dum1a, dum1b, dum1p, dum2, dum2, dum3, entities);
RRA("Failed to unpack buffer in broadcast_entities.");
}
@@ -616,8 +566,7 @@
const bool tags,
const bool store_remote_handles,
const int to_proc,
- std::vector<unsigned char> &buff,
- int &buff_size)
+ Buffer *buff)
{
// pack the buffer with the entity ranges, adjacencies, and tags sections
//
@@ -635,17 +584,13 @@
MBRange::const_iterator rit;
- // get initial buffer ptr
- buff.resize(1);
- unsigned char *buff_ptr = &buff[0];
-
// entities
- result = pack_entities(orig_ents, buff, buff_ptr,
+ result = pack_entities(orig_ents, buff,
store_remote_handles, to_proc, false);
RRA("Packing entities failed.");
// sets
- result = pack_sets(orig_ents, buff, buff_ptr,
+ result = pack_sets(orig_ents, buff,
store_remote_handles, to_proc);
RRA("Packing sets (count) failed.");
@@ -655,11 +600,10 @@
result = get_tag_send_list(orig_ents, all_tags, tag_ranges );
RRA("Failed to get tagged entities.");
result = pack_tags(orig_ents, all_tags, all_tags, tag_ranges,
- buff, buff_ptr, store_remote_handles, to_proc);
+ buff, store_remote_handles, to_proc);
RRA("Packing tags (count) failed.");
}
- buff_size = buff_ptr - &buff[0];
return result;
}
@@ -803,8 +747,7 @@
}
MBErrorCode MBParallelComm::pack_entities(MBRange &entities,
- std::vector<unsigned char> &buff,
- unsigned char *&buff_ptr,
+ Buffer *buff,
const bool store_remote_handles,
const int to_proc,
const bool is_iface,
@@ -822,7 +765,7 @@
// get an estimate of the buffer size & pre-allocate buffer size
unsigned int buff_size = estimate_ents_buffer_size(entities,
store_remote_handles);
- CHECK_BUFF_SPACE(buff, buff_ptr, buff_size);
+ buff->check_space(buff_size);
MBWriteUtilIface *wu;
MBErrorCode result = mbImpl->query_interface(std::string("MBWriteUtilIface"),
@@ -837,11 +780,10 @@
// buff space is at least proc+handle for each entity; use avg of 4 other procs
// to estimate buff size, but check later
- CHECK_BUFF_SPACE(buff, buff_ptr,
- sizeof(int) + (5*sizeof(int) + sizeof(MBEntityHandle))*entities.size());
+ buff->check_space(sizeof(int) + (5*sizeof(int) + sizeof(MBEntityHandle))*entities.size());
// 1. # entities = E
- PACK_INT(buff_ptr, entities.size());
+ PACK_INT(buff->buff_ptr, entities.size());
MBRange::iterator rit;
@@ -868,11 +810,11 @@
RRA("Failed to build sharedhps.");
// now pack them
- CHECK_BUFF_SPACE(buff, buff_ptr, (num_ents+1)*sizeof(int) +
+ buff->check_space((num_ents+1)*sizeof(int) +
num_ents*sizeof(MBEntityHandle));
- PACK_INT(buff_ptr, num_ents);
- PACK_INTS(buff_ptr, tmp_procs, num_ents);
- PACK_EH(buff_ptr, tmp_handles, num_ents);
+ PACK_INT(buff->buff_ptr, num_ents);
+ PACK_INTS(buff->buff_ptr, tmp_procs, num_ents);
+ PACK_EH(buff->buff_ptr, tmp_handles, num_ents);
#ifndef NDEBUG
// check for duplicates in proc list
@@ -891,17 +833,17 @@
if (num_ents) {
buff_size = 2*sizeof(int) + 3*num_ents*sizeof(double);
- CHECK_BUFF_SPACE(buff, buff_ptr, buff_size);
+ buff->check_space(buff_size);
// type, # ents
- PACK_INT(buff_ptr, ((int) MBVERTEX));
- PACK_INT(buff_ptr, ((int) num_ents));
+ PACK_INT(buff->buff_ptr, ((int) MBVERTEX));
+ PACK_INT(buff->buff_ptr, ((int) num_ents));
- result = mbImpl->get_coords(these_ents, (double*)buff_ptr);
+ result = mbImpl->get_coords(these_ents, (double*)buff->buff_ptr);
RRA("Couldn't get vertex coordinates.");
PC(3*num_ents, " doubles");
- buff_ptr += 3 * num_ents * sizeof(double);
+ buff->buff_ptr += 3 * num_ents * sizeof(double);
#ifdef DEBUG_PACKING
std::cerr << "Packed " << these_ents.size() << " ents of type "
@@ -940,7 +882,7 @@
(!eseq || eseq->type() != last_type ||
last_nodes != (int) eseq->nodes_per_element())) {
result = pack_entity_seq(last_nodes, store_remote_handles,
- to_proc, these_ents, entities, buff, buff_ptr);
+ to_proc, these_ents, entities, buff);
RRA("Failed to pack entities from a sequence.");
these_ents.clear();
}
@@ -964,9 +906,10 @@
}
// pack MBMAXTYPE to indicate end of ranges
- CHECK_BUFF_SPACE(buff, buff_ptr, sizeof(int));
- PACK_INT(buff_ptr, ((int)MBMAXTYPE));
+ buff->check_space(sizeof(int));
+ PACK_INT(buff->buff_ptr, ((int)MBMAXTYPE));
+ buff->set_stored_size();
return MB_SUCCESS;
}
@@ -1044,33 +987,32 @@
const int to_proc,
MBRange &these_ents,
MBRange &entities,
- std::vector<unsigned char> &buff,
- unsigned char *&buff_ptr)
+ Buffer *buff)
{
int tmp_space = 3*sizeof(int) + nodes_per_entity*these_ents.size()*sizeof(MBEntityHandle);
- CHECK_BUFF_SPACE(buff, buff_ptr, tmp_space);
+ buff->check_space(tmp_space);
// pack the entity type
- PACK_INT(buff_ptr, ((int)TYPE_FROM_HANDLE(*these_ents.begin())));
+ PACK_INT(buff->buff_ptr, ((int)TYPE_FROM_HANDLE(*these_ents.begin())));
// pack # ents
- PACK_INT(buff_ptr, these_ents.size());
+ PACK_INT(buff->buff_ptr, these_ents.size());
// pack the nodes per entity
- PACK_INT(buff_ptr, nodes_per_entity);
+ PACK_INT(buff->buff_ptr, nodes_per_entity);
// pack the connectivity
const MBEntityHandle *connect;
int num_connect;
std::vector<MBEntityHandle> dum_connect;
- MBEntityHandle *start_vec = (MBEntityHandle*)buff_ptr;
+ MBEntityHandle *start_vec = (MBEntityHandle*)buff->buff_ptr;
MBErrorCode result = MB_SUCCESS;
for (MBRange::const_iterator rit = these_ents.begin(); rit != these_ents.end(); rit++) {
result = mbImpl->get_connectivity(*rit, connect, num_connect, false,
&dum_connect);
RRA("Failed to get connectivity.");
assert(num_connect == nodes_per_entity);
- PACK_EH(buff_ptr, connect, num_connect);
+ PACK_EH(buff->buff_ptr, connect, num_connect);
}
// substitute destination handles
@@ -1539,27 +1481,26 @@
int mesg_tag,
int from_proc, bool sent)
{
- std::cout << procConfig.proc_rank();
- if (sent) std::cout << " sent";
- else std::cout << " received";
- std::cout << " message type " << mesg_tag
+ std::cerr << procConfig.proc_rank();
+ if (sent) std::cerr << " sent";
+ else std::cerr << " received";
+ std::cerr << " message type " << mesg_tag
<< " to/from proc " << from_proc << "; contents:" << std::endl;
- int msg_length;
+ int msg_length, num_ents;
unsigned char *orig_ptr = buff_ptr;
UNPACK_INT(buff_ptr, msg_length);
- std::cout << msg_length << " bytes..." << std::endl;
+ std::cerr << msg_length << " bytes..." << std::endl;
- if (MB_MESG_ENTS == mesg_tag) {
+ if (MB_MESG_ENTS_SIZE == mesg_tag || MB_MESG_ENTS_LARGE == mesg_tag) {
// 1. # entities = E
- int num_ents;
int i, j, k;
std::vector<int> ps;
std::vector<MBEntityHandle> hs;
UNPACK_INT(buff_ptr, num_ents);
- std::cout << num_ents << " entities..." << std::endl;
+ std::cerr << num_ents << " entities..." << std::endl;
// save place where remote handle info starts, then scan forward to ents
for (i = 0; i < num_ents; i++) {
@@ -1567,18 +1508,19 @@
if (0 > j) return MB_FAILURE;
ps.resize(j);
hs.resize(j);
- std::cout << "Entity " << i << ", # procs = " << j << std::endl;
+ std::cerr << "Entity " << i << ", # procs = " << j << std::endl;
UNPACK_INTS(buff_ptr, &ps[0], j);
UNPACK_EH(buff_ptr, &hs[0], j);
- std::cout << " Procs: ";
- for (k = 0; k < j; k++) std::cout << ps[k] << " ";
- std::cout << std::endl;
- std::cout << " Handles: ";
- for (k = 0; k < j; k++) std::cout << hs[k] << " ";
- std::cout << std::endl;
+ std::cerr << " Procs: ";
+ for (k = 0; k < j; k++) std::cerr << ps[k] << " ";
+ std::cerr << std::endl;
+ std::cerr << " Handles: ";
+ for (k = 0; k < j; k++) std::cerr << hs[k] << " ";
+ std::cerr << std::endl;
if (buff_ptr-orig_ptr > msg_length) {
- std::cout << "End of buffer..." << std::endl;
+ std::cerr << "End of buffer..." << std::endl;
+ std::cerr.flush();
return MB_FAILURE;
}
}
@@ -1600,10 +1542,14 @@
UNPACK_INT(buff_ptr, verts_per_entity);
}
- std::cout << "Type: " << MBCN::EntityTypeName(this_type)
+ std::cerr << "Type: " << MBCN::EntityTypeName(this_type)
<< "; num_ents = " << num_ents2;
- if (MBVERTEX != this_type) std::cout << "; verts_per_ent = " << verts_per_entity;
- std::cout << std::endl;
+ if (MBVERTEX != this_type) std::cerr << "; verts_per_ent = " << verts_per_entity;
+ std::cerr << std::endl;
+ if (num_ents2 < 0 || num_ents2 > msg_length) {
+ std::cerr << "Wrong number of entities, returning." << std::endl;
+ return MB_FAILURE;
+ }
for (int e = 0; e < num_ents2; e++) {
// check for existing entity, otherwise make new one
@@ -1613,7 +1559,7 @@
if (MBVERTEX == this_type) {
coords = (double*) buff_ptr;
buff_ptr += 3*sizeof(double);
- std::cout << "xyz = " << coords[0] << ", " << coords[1] << ", "
+ std::cerr << "xyz = " << coords[0] << ", " << coords[1] << ", "
<< coords[2] << std::endl;
}
else {
@@ -1621,53 +1567,59 @@
buff_ptr += verts_per_entity * sizeof(MBEntityHandle);
// update connectivity to local handles
- std::cout << "Connectivity: ";
- for (k = 0; k < verts_per_entity; k++) std::cout << connect[k] << " ";
- std::cout << std::endl;
+ std::cerr << "Connectivity: ";
+ for (k = 0; k < verts_per_entity; k++) std::cerr << connect[k] << " ";
+ std::cerr << std::endl;
}
if (buff_ptr-orig_ptr > msg_length) {
- std::cout << "End of buffer..." << std::endl;
+ std::cerr << "End of buffer..." << std::endl;
+ std::cerr.flush();
return MB_FAILURE;
}
}
}
}
- else if (MB_MESG_REMOTE_HANDLES) {
- int num_bytes;
- UNPACK_INT(buff_ptr, num_bytes);
- std::cout << num_bytes << " bytes..." << std::endl;
- int num_ents;
+ else if (MB_MESG_REMOTEH_SIZE == mesg_tag || MB_MESG_REMOTEH_LARGE == mesg_tag) {
UNPACK_INT(buff_ptr, num_ents);
+ std::cerr << num_ents << " entities..." << std::endl;
+ if (0 > num_ents || num_ents > msg_length) {
+ std::cerr << "Wrong number of entities, returning." << std::endl;
+ return MB_FAILURE;
+ }
std::vector<MBEntityHandle> L1hloc(num_ents), L1hrem(num_ents);
std::vector<int> L1p(num_ents);
UNPACK_INTS(buff_ptr, &L1p[0], num_ents);
UNPACK_EH(buff_ptr, &L1hrem[0], num_ents);
UNPACK_EH(buff_ptr, &L1hloc[0], num_ents);
- std::cout << num_ents << " Entity pairs; hremote/hlocal/proc: " << std::endl;
+ std::cerr << num_ents << " Entity pairs; hremote/hlocal/proc: " << std::endl;
for (int i = 0; i < num_ents; i++) {
MBEntityType etype = TYPE_FROM_HANDLE(L1hloc[i]);
- std::cout << MBCN::EntityTypeName(etype) << ID_FROM_HANDLE(L1hrem[i]) << ", "
+ std::cerr << MBCN::EntityTypeName(etype) << ID_FROM_HANDLE(L1hrem[i]) << ", "
<< MBCN::EntityTypeName(etype) << ID_FROM_HANDLE(L1hloc[i]) << ", "
<< L1p[i] << std::endl;
}
if (buff_ptr-orig_ptr > msg_length) {
- std::cout << "End of buffer..." << std::endl;
+ std::cerr << "End of buffer..." << std::endl;
+ std::cerr.flush();
return MB_FAILURE;
}
}
- else if (MB_MESG_TAGS) {
- assert(false);
- return MB_FAILURE;
+ else if (mesg_tag == MB_MESG_TAGS_SIZE || mesg_tag == MB_MESG_TAGS_LARGE) {
+ std::cerr << "Printed as ints: " << std::endl;
+ for (int i = 0; i < msg_length; i+= sizeof(int))
+ std::cerr << *((int*)buff_ptr[i]) << std::endl;
}
else {
assert(false);
return MB_FAILURE;
}
+ std::cerr.flush();
+
return MB_SUCCESS;
}
@@ -2096,8 +2048,7 @@
}
MBErrorCode MBParallelComm::pack_sets(MBRange &entities,
- std::vector<unsigned char> &buff,
- unsigned char *&buff_ptr,
+ Buffer *buff,
const bool store_remote_handles,
const int to_proc)
{
@@ -2119,10 +2070,10 @@
MBRange all_sets = entities.subset_by_type(MBENTITYSET);
int buff_size = estimate_sets_buffer_size(all_sets, store_remote_handles);
- CHECK_BUFF_SPACE(buff, buff_ptr, buff_size);
+ buff->check_space(buff_size);
// number of sets
- PACK_INT(buff_ptr, all_sets.size());
+ PACK_INT(buff->buff_ptr, all_sets.size());
// options for all sets
std::vector<unsigned int> options(all_sets.size());
@@ -2133,8 +2084,8 @@
result = mbImpl->get_meshset_options(*rit, options[i]);
RRA("Failed to get meshset options.");
}
- CHECK_BUFF_SPACE(buff, buff_ptr, all_sets.size()*sizeof(unsigned int));
- PACK_VOID(buff_ptr, &options[0], all_sets.size()*sizeof(unsigned int));
+ buff->check_space(all_sets.size()*sizeof(unsigned int));
+ PACK_VOID(buff->buff_ptr, &options[0], all_sets.size()*sizeof(unsigned int));
// vectors/ranges
for (rit = all_sets.begin(), i = 0; rit != all_sets.end(); rit++, i++) {
@@ -2145,33 +2096,32 @@
RRA("Failed to get set entities.");
buff_size = RANGE_SIZE(set_range);
- CHECK_BUFF_SPACE(buff, buff_ptr, buff_size);
- PACK_RANGE(buff_ptr, set_range);
+ buff->check_space(buff_size);
+ PACK_RANGE(buff->buff_ptr, set_range);
}
else if (options[i] & MESHSET_ORDERED) {
members.clear();
result = mbImpl->get_entities_by_handle(*rit, members);
RRA("Failed to get entities in ordered set.");
- CHECK_BUFF_SPACE(buff, buff_ptr,
- members.size()*sizeof(MBEntityHandle)+sizeof(int));
- PACK_INT(buff_ptr, members.size());
- PACK_EH(buff_ptr, &members[0], members.size());
+ buff->check_space(members.size()*sizeof(MBEntityHandle)+sizeof(int));
+ PACK_INT(buff->buff_ptr, members.size());
+ PACK_EH(buff->buff_ptr, &members[0], members.size());
}
}
// pack numbers of parents/children
unsigned int tot_pch = 0;
int num_pch;
- CHECK_BUFF_SPACE(buff, buff_ptr, 2*all_sets.size()*sizeof(int));
+ buff->check_space(2*all_sets.size()*sizeof(int));
for (rit = all_sets.begin(), i = 0; rit != all_sets.end(); rit++, i++) {
// pack parents
result = mbImpl->num_parent_meshsets(*rit, &num_pch);
RRA("Failed to get num parents.");
- PACK_INT(buff_ptr, num_pch);
+ PACK_INT(buff->buff_ptr, num_pch);
tot_pch += num_pch;
result = mbImpl->num_child_meshsets(*rit, &num_pch);
RRA("Failed to get num children.");
- PACK_INT(buff_ptr, num_pch);
+ PACK_INT(buff->buff_ptr, num_pch);
tot_pch += num_pch;
}
@@ -2203,21 +2153,23 @@
ID_FROM_HANDLE(members[__j]) < (int)entities.size()) ||
TYPE_FROM_HANDLE(members[__j]) == MBENTITYSET);
#endif
- CHECK_BUFF_SPACE(buff, buff_ptr, members.size()*sizeof(MBEntityHandle));
- PACK_EH(buff_ptr, &members[0], members.size());
+ buff->check_space(members.size()*sizeof(MBEntityHandle));
+ PACK_EH(buff->buff_ptr, &members[0], members.size());
}
// pack the handles
if (store_remote_handles && !all_sets.empty()) {
buff_size = RANGE_SIZE(all_sets);
- CHECK_BUFF_SPACE(buff, buff_ptr, buff_size);
- PACK_RANGE(buff_ptr, all_sets);
+ buff->check_space(buff_size);
+ PACK_RANGE(buff->buff_ptr, all_sets);
}
#ifdef DEBUG_PACKING
std::cerr << std::endl << "Done packing sets." << std::endl;
#endif
+ buff->set_stored_size();
+
return MB_SUCCESS;
}
@@ -2356,8 +2308,7 @@
const std::vector<MBTag> &src_tags,
const std::vector<MBTag> &dst_tags,
const std::vector<MBRange> &tag_ranges,
- std::vector<unsigned char> &buff,
- unsigned char *&buff_ptr,
+ Buffer *buff,
const bool store_remote_handles,
const int to_proc)
{
@@ -2379,14 +2330,14 @@
// number of tags
count += sizeof(int);
- CHECK_BUFF_SPACE(buff, buff_ptr, count);
+ buff->check_space(count);
- PACK_INT(buff_ptr, src_tags.size());
+ PACK_INT(buff->buff_ptr, src_tags.size());
for (tag_it = src_tags.begin(), dst_it = dst_tags.begin(), rit = tag_ranges.begin();
tag_it != src_tags.end(); tag_it++, dst_it++, rit++) {
- result = pack_tag( *tag_it, *dst_it, *rit, entities, buff, buff_ptr,
+ result = pack_tag( *tag_it, *dst_it, *rit, entities, buff,
store_remote_handles, to_proc );
if (MB_SUCCESS != result)
return result;
@@ -2396,6 +2347,8 @@
std::cerr << std::endl << "Done packing tags." << std::endl;
#endif
+ buff->set_stored_size();
+
return MB_SUCCESS;
}
@@ -2453,8 +2406,7 @@
MBTag dst_tag,
const MBRange &tagged_entities,
const MBRange &whole_range,
- std::vector<unsigned char> &buff,
- unsigned char *&buff_ptr,
+ Buffer *buff,
const bool store_remote_handles,
const int to_proc )
{
@@ -2483,26 +2435,26 @@
}
// size, type, data type
- CHECK_BUFF_SPACE(buff, buff_ptr, 3*sizeof(int));
- PACK_INT(buff_ptr, tinfo->get_size());
+ buff->check_space(3*sizeof(int));
+ PACK_INT(buff->buff_ptr, tinfo->get_size());
MBTagType this_type;
result = mbImpl->tag_get_type(dst_tag, this_type);
- PACK_INT(buff_ptr, (int)this_type);
- PACK_INT(buff_ptr, (int)(tinfo->get_data_type()));
+ PACK_INT(buff->buff_ptr, (int)this_type);
+ PACK_INT(buff->buff_ptr, (int)(tinfo->get_data_type()));
// default value
if (NULL == tinfo->default_value()) {
- CHECK_BUFF_SPACE(buff, buff_ptr, sizeof(int));
- PACK_INT(buff_ptr, 0);
+ buff->check_space(sizeof(int));
+ PACK_INT(buff->buff_ptr, 0);
}
else {
- CHECK_BUFF_SPACE(buff, buff_ptr, tinfo->default_value_size());
- PACK_BYTES(buff_ptr, tinfo->default_value(), tinfo->default_value_size());
+ buff->check_space(tinfo->default_value_size());
+ PACK_BYTES(buff->buff_ptr, tinfo->default_value(), tinfo->default_value_size());
}
// name
- CHECK_BUFF_SPACE(buff, buff_ptr, tinfo->get_name().size());
- PACK_BYTES(buff_ptr, dst_tinfo->get_name().c_str(), dst_tinfo->get_name().size());
+ buff->check_space(tinfo->get_name().size());
+ PACK_BYTES(buff->buff_ptr, dst_tinfo->get_name().c_str(), dst_tinfo->get_name().size());
#ifdef DEBUG_PACKING
std::cerr << "Packing tag \"" << tinfo->get_name() << "\"";
@@ -2511,10 +2463,10 @@
std::cerr << std::endl;
#endif
// pack entities
- CHECK_BUFF_SPACE(buff, buff_ptr, tagged_entities.size()*sizeof(MBEntityHandle)+sizeof(int));
- PACK_INT(buff_ptr, tagged_entities.size());
+ buff->check_space(tagged_entities.size()*sizeof(MBEntityHandle)+sizeof(int));
+ PACK_INT(buff->buff_ptr, tagged_entities.size());
result = get_remote_handles(store_remote_handles,
- tagged_entities, (MBEntityHandle*)buff_ptr, to_proc,
+ tagged_entities, (MBEntityHandle*)buff->buff_ptr, to_proc,
whole_range);
#ifdef DEBUG_PACKING
if (MB_SUCCESS != result) {
@@ -2525,7 +2477,7 @@
RRA("Trouble getting remote handles for tagged entities.");
#endif
- buff_ptr += tagged_entities.size() * sizeof(MBEntityHandle);
+ buff->buff_ptr += tagged_entities.size() * sizeof(MBEntityHandle);
const size_t num_ent = tagged_entities.size();
if (tinfo->get_size() == MB_VARIABLE_LENGTH) {
@@ -2534,18 +2486,18 @@
result = mbImpl->tag_get_data(src_tag, tagged_entities, &var_len_values[0],
&var_len_sizes[0] );
RRA("Failed to get variable-length tag data in pack_tags.");
- CHECK_BUFF_SPACE(buff, buff_ptr, num_ent*sizeof(int));
- PACK_INTS(buff_ptr, &var_len_sizes[0], num_ent);
+ buff->check_space(num_ent*sizeof(int));
+ PACK_INTS(buff->buff_ptr, &var_len_sizes[0], num_ent);
for (unsigned int i = 0; i < num_ent; ++i) {
- CHECK_BUFF_SPACE(buff, buff_ptr, var_len_sizes[i]);
- PACK_VOID(buff_ptr, var_len_values[i], var_len_sizes[i]);
+ buff->check_space(var_len_sizes[i]);
+ PACK_VOID(buff->buff_ptr, var_len_values[i], var_len_sizes[i]);
}
}
else {
- CHECK_BUFF_SPACE(buff, buff_ptr, num_ent * tinfo->get_size());
- result = mbImpl->tag_get_data(src_tag, tagged_entities, buff_ptr);
+ buff->check_space(num_ent * tinfo->get_size());
+ result = mbImpl->tag_get_data(src_tag, tagged_entities, buff->buff_ptr);
RRA("Failed to get tag data in pack_tags.");
- buff_ptr += num_ent * tinfo->get_size();
+ buff->buff_ptr += num_ent * tinfo->get_size();
PC(num_ent*tinfo->get_size(), " void");
}
@@ -3764,22 +3716,31 @@
#endif
#ifdef DEBUG_COMM
- std::cout << "Entering exchange_ghost_cells with num_layers = "
- << num_layers << std::endl;
+// std::ostringstream pfile("p");
+// pfile << "p" << procConfig.proc_rank() << ".txt";
+// std::cerr.open(pfile.str().c_str(), std::ios_base::trunc);
+ std::cerr << "Entering exchange_ghost_cells with num_layers = "
+ << num_layers << std::endl; std::cerr.flush();
#endif
+#ifdef DEBUG_MSGS
+ msgs.clear();
+ msgs.reserve(MAX_SHARING_PROCS);
+#endif
// if we're only finding out about existing ents, we have to be storing
// remote handles too
assert(num_layers > 0 || store_remote_handles);
const bool is_iface = !num_layers;
-
+
// get the b-dimensional interface(s) with with_proc, where b = bridge_dim
int success;
- unsigned char *buff_ptr;
MBErrorCode result = MB_SUCCESS;
+ int incoming1 = 0, incoming2 = 0;
+ reset_all_buffers();
+
// when this function is called, buffProcs should already have any
// communicating procs
@@ -3787,23 +3748,24 @@
// post ghost irecv's for ghost entities from all communicating procs
//===========================================
#ifdef DEBUG_MPE
- MPE_Log_event(ENTITIES_START, procConfig.proc_rank(), "Starting entity exchange.");
+ MPE_Log_event(ENTITIES_START, procConfig.proc_rank(), "Starting entity exchange.");
#endif
// index reqs the same as buffer/sharing procs indices
- std::vector<MPI_Request> recv_reqs(buffProcs.size(), MPI_REQUEST_NULL);
+ std::vector<MPI_Request> recv_ent_reqs(2*buffProcs.size(), MPI_REQUEST_NULL),
+ recv_remoteh_reqs(2*buffProcs.size(), MPI_REQUEST_NULL);
std::vector<unsigned int>::iterator proc_it;
- int ind;
- std::fill(sendReqs, sendReqs+2*buffProcs.size(), MPI_REQUEST_NULL);
+ int ind, p;
+ sendReqs.resize(2*buffProcs.size(), MPI_REQUEST_NULL);
for (ind = 0, proc_it = buffProcs.begin();
proc_it != buffProcs.end(); proc_it++, ind++) {
-#ifdef DEBUG_COMM
- std::cout << "Posting Irecv from " << buffProcs[ind]
- << " for ghost entities." << std::endl;
-#endif
- success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(),
+ incoming1++;
+ PRINT_DEBUG_IRECV(procConfig.proc_rank(), buffProcs[ind],
+ remoteOwnedBuffs[ind]->mem_ptr, INITIAL_BUFF_SIZE,
+ MB_MESG_ENTS_SIZE, incoming1);
+ success = MPI_Irecv(remoteOwnedBuffs[ind]->mem_ptr, INITIAL_BUFF_SIZE,
MPI_UNSIGNED_CHAR, buffProcs[ind],
- MB_MESG_ENTS, procConfig.proc_comm(),
- &recv_reqs[ind]);
+ MB_MESG_ENTS_SIZE, procConfig.proc_comm(),
+ &recv_ent_reqs[2*ind]);
if (success != MPI_SUCCESS) {
result = MB_FAILURE;
RRA("Failed to post irecv in ghost exchange.");
@@ -3816,6 +3778,7 @@
MBRange sent_ents[MAX_SHARING_PROCS], allsent, tmp_range;
std::vector<std::set<unsigned int> > entprocs(allsent.size());
+ int dum_ack_buff;
result = get_sent_ents(is_iface, bridge_dim, ghost_dim, num_layers,
sent_ents, allsent, entprocs);
RRA("get_sent_ents failed.");
@@ -3823,107 +3786,135 @@
//===========================================
// pack and send ents from this proc to others
//===========================================
- for (ind = 0, proc_it = buffProcs.begin();
- proc_it != buffProcs.end(); proc_it++, ind++) {
+ for (p = 0, proc_it = buffProcs.begin();
+ proc_it != buffProcs.end(); proc_it++, p++) {
// reserve space on front for size and for initial buff size
- INIT_BUFFER(ownerSBuffs[ind], buff_ptr);
+ localOwnedBuffs[p]->reset_buffer(sizeof(int));
// entities
- result = pack_entities(sent_ents[ind], ownerSBuffs[ind], buff_ptr,
- store_remote_handles, buffProcs[ind], is_iface,
+ result = pack_entities(sent_ents[p], localOwnedBuffs[p],
+ store_remote_handles, buffProcs[p], is_iface,
&entprocs, &allsent);
RRA("Packing entities failed.");
#ifdef DEBUG_MSGS
- result = print_buffer(&ownerSBuffs[ind][0], MB_MESG_ENTS, *proc_it, true);
+ msgs.resize(msgs.size()+1);
+ msgs.back() = new Buffer(*localOwnedBuffs[p]);
+ //result = print_buffer(&ownerSBuffs[ind][0], MB_MESG_ENTS_SIZE, *proc_it, true);
#endif
-
- // send the buffer (buffer size saved in send_buffer)
- result = send_buffer(*proc_it, &ownerSBuffs[ind][0],
- buff_ptr-&ownerSBuffs[ind][0], MB_MESG_ENTS,
- sendReqs[ind], sendReqs[ind+buffProcs.size()]);
+
+ // send the buffer (size stored in front in send_buffer)
+ result = send_buffer(*proc_it, localOwnedBuffs[p],
+ MB_MESG_ENTS_SIZE, sendReqs[2*p],
+ recv_ent_reqs[2*p+1], &dum_ack_buff,
+ incoming1,
+ MB_MESG_REMOTEH_SIZE,
+ (!is_iface && store_remote_handles ?
+ localOwnedBuffs[p] : NULL),
+ &recv_remoteh_reqs[2*p], &incoming2);
RRA("Failed to Isend in ghost exchange.");
+ }
-// if (1 == num_layers)
-// print_buffer(&ownerSBuffs[ind][0], MB_MESG_ENTS, *proc_it, true);
- }
//===========================================
// receive/unpack new entities
//===========================================
// number of incoming messages for ghosts is the number of procs we
// communicate with; for iface, it's the number of those with lower rank
- int num_incoming = buffProcs.size();
- std::vector<MPI_Status> status(MAX_SHARING_PROCS);
- std::vector<std::vector<MBEntityHandle> > recd_ents(num_incoming);
+ MPI_Status status;
+ std::vector<std::vector<MBEntityHandle> > recd_ents(buffProcs.size());
std::vector<std::vector<MBEntityHandle> > L1hloc(buffProcs.size()), L1hrem(buffProcs.size());
std::vector<std::vector<int> > L1p(buffProcs.size());
std::vector<MBEntityHandle> L2hloc, L2hrem;
std::vector<unsigned int> L2p;
MBRange new_ents;
- while (num_incoming) {
- // wait for all recvs of ghost ents before proceeding,
+ while (incoming1) {
+ // wait for all recvs of ghost ents before proceeding to sending remote handles,
// b/c some procs may have sent to a 3rd proc ents owned by me;
- success = MPI_Waitany(buffProcs.size(), &recv_reqs[0], &ind, &status[0]);
+ PRINT_DEBUG_WAITANY(recv_ent_reqs, MB_MESG_ENTS_SIZE, procConfig.proc_rank());
+
+ success = MPI_Waitany(2*buffProcs.size(), &recv_ent_reqs[0], &ind, &status);
if (MPI_SUCCESS != success) {
result = MB_FAILURE;
RRA("Failed in waitany in ghost exchange.");
}
-#ifdef DEBUG_COMM
- {
- int this_count;
- success = MPI_Get_count(&status[0], MPI_UNSIGNED_CHAR, &this_count);
- if (MPI_SUCCESS != success) this_count = -1;
-
- std::cerr << "Received from " << status[0].MPI_SOURCE
- << ", count = " << this_count << ", tag = " << status[0].MPI_TAG;
- if (MB_MESG_ENTS+1 == status[0].MPI_TAG) std::cerr << " (second)";
- std::cerr << std::endl;
- }
-#endif
+
+ PRINT_DEBUG_RECD(status);
// ok, received something; decrement incoming counter
- num_incoming--;
+ incoming1--;
+ bool done = false;
- bool done = false;
-
- result = recv_buffer(MB_MESG_ENTS, status[0],
- ghostRBuffs[ind], recv_reqs[ind], done);
- RRA("Failed to receive entities.");
-
+ // In case ind is for ack, we need index of one before it
+ unsigned int base_ind = 2*(ind/2);
+ result = recv_buffer(MB_MESG_ENTS_SIZE,
+ status,
+ remoteOwnedBuffs[ind/2],
+ recv_ent_reqs[ind], recv_ent_reqs[ind+1],
+ incoming1,
+ localOwnedBuffs[ind/2], sendReqs[base_ind], sendReqs[base_ind+1],
+ done,
+ (!is_iface && store_remote_handles ?
+ localOwnedBuffs[ind/2] : NULL),
+ MB_MESG_REMOTEH_SIZE,
+ &recv_remoteh_reqs[base_ind], &incoming2);
+ RRA("Failed to receive buffer.");
+
if (done) {
#ifdef DEBUG_MSGS
- print_buffer(&ghostRBuffs[ind][0], MB_MESG_ENTS, buffProcs[ind], false);
-#endif
- unsigned char *buff_ptr = &ghostRBuffs[ind][sizeof(int)];
- result = unpack_entities(buff_ptr,
- store_remote_handles, ind, is_iface,
- L1hloc, L1hrem, L1p, L2hloc, L2hrem, L2p, new_ents);
+ msgs.resize(msgs.size()+1);
+ msgs.back() = new Buffer(*remoteOwnedBuffs[ind/2]);
+ //print_buffer(&ghostRBuffs[ind/2][0], MB_MESG_ENTS_SIZE, buffProcs[ind/2], false);
+#endif
+
+ // message completely received - process buffer that was sent
+ remoteOwnedBuffs[ind/2]->reset_ptr(sizeof(int));
+ result = unpack_entities(remoteOwnedBuffs[ind/2]->buff_ptr,
+ store_remote_handles, ind/2, is_iface,
+ L1hloc, L1hrem, L1p, L2hloc, L2hrem, L2p, new_ents);
if (MB_SUCCESS != result) {
std::cout << "Failed to unpack entities. Buffer contents:" << std::endl;
- print_buffer(&ghostRBuffs[ind][0], MB_MESG_ENTS, buffProcs[ind], false);
+ print_buffer(remoteOwnedBuffs[ind/2]->mem_ptr, MB_MESG_ENTS_SIZE, buffProcs[ind/2], false);
return result;
}
- if (recv_reqs.size() != buffProcs.size()) {
- recv_reqs.resize(buffProcs.size(), MPI_REQUEST_NULL);
+ if (recv_ent_reqs.size() != 2*buffProcs.size()) {
+ // post irecv's for remote handles from new proc; shouldn't be iface,
+ // since we know about all procs we share with
+ assert(!is_iface);
+ recv_remoteh_reqs.resize(2*buffProcs.size(), MPI_REQUEST_NULL);
+ for (unsigned int i = recv_ent_reqs.size(); i < 2*buffProcs.size(); i+=2) {
+ localOwnedBuffs[i/2]->reset_buffer();
+ incoming2++;
+ PRINT_DEBUG_IRECV(procConfig.proc_rank(), buffProcs[i/2],
+ localOwnedBuffs[i/2]->mem_ptr, INITIAL_BUFF_SIZE,
+ MB_MESG_REMOTEH_SIZE, incoming2);
+ success = MPI_Irecv(localOwnedBuffs[i/2]->mem_ptr, INITIAL_BUFF_SIZE,
+ MPI_UNSIGNED_CHAR, buffProcs[i/2],
+ MB_MESG_REMOTEH_SIZE, procConfig.proc_comm(),
+ &recv_remoteh_reqs[i]);
+ if (success != MPI_SUCCESS) {
+ result = MB_FAILURE;
+ RRA("Failed to post irecv for remote handles in ghost exchange.");
+ }
+ }
+ recv_ent_reqs.resize(2*buffProcs.size(), MPI_REQUEST_NULL);
+ sendReqs.resize(2*buffProcs.size(), MPI_REQUEST_NULL);
}
}
- else {
- num_incoming++;
- continue;
- }
}
-
+
// add requests for any new addl procs
- if (recv_reqs.size() != buffProcs.size()) {
- recv_reqs.resize(buffProcs.size(), MPI_REQUEST_NULL);
+ if (recv_ent_reqs.size() != 2*buffProcs.size()) {
+ // shouldn't get here...
+ result = MB_FAILURE;
+ RRA("Requests length doesn't match proc count in ghost exchange.");
}
#ifdef DEBUG_MPE
- MPE_Log_event(ENTITIES_END, procConfig.proc_rank(), "Ending entity exchange.");
+ MPE_Log_event(ENTITIES_END, procConfig.proc_rank(), "Ending entity exchange.");
#endif
if (is_iface) {
@@ -3948,17 +3939,18 @@
MPE_Log_event(IFACE_END, procConfig.proc_rank(), "Ending interface exchange.");
#endif
#ifdef DEBUG_COMM
- std::cout << "Exiting exchange_ghost_cells" << std::endl;
+ std::cerr << "Exiting exchange_ghost_cells" << std::endl; std::cerr.flush();
#endif
- //===========================================
- // wait if requested
- //===========================================
+ //===========================================
+ // wait if requested
+ //===========================================
if (wait_all) {
-#ifdef DEBUG_COMM
+#ifdef DEBUG_BARRIER
success = MPI_Barrier(procConfig.proc_comm());
#else
- success = MPI_Waitall(buffProcs.size(), &recv_reqs[0], &status[0]);
+ success = MPI_Waitall(2*buffProcs.size(), &recv_ent_reqs[0], &status);
+ success = MPI_Waitall(2*buffProcs.size(), &sendReqs[0], &status);
#endif
if (MPI_SUCCESS != success) {
result = MB_FAILURE;
@@ -3967,128 +3959,91 @@
}
return MB_SUCCESS;
}
-
+
//===========================================
- // post recvs for remote handles of my sent ents
- //===========================================
-#ifdef DEBUG_MPE
- MPE_Log_event(RHANDLES_START, procConfig.proc_rank(), "Starting remote handles.");
-#endif
- for (ind = 0, proc_it = buffProcs.begin();
- proc_it != buffProcs.end(); proc_it++, ind++) {
- // skip if iface layer and lower-rank proc
-#ifdef DEBUG_COMM
- std::cout << "Posting Irecv from " << buffProcs[ind]
- << " for remote handles." << std::endl;
-#endif
- success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(),
- MPI_UNSIGNED_CHAR, buffProcs[ind],
- MB_MESG_REMOTE_HANDLES, procConfig.proc_comm(),
- &recv_reqs[ind]);
- if (success != MPI_SUCCESS) {
- result = MB_FAILURE;
- RRA("Failed to post irecv in ghost exchange.");
- }
- }
-
- //===========================================
// send local handles for new ghosts to owner, then add
// those to ghost list for that owner
//===========================================
- for (ind = 0, proc_it = buffProcs.begin();
- proc_it != buffProcs.end(); proc_it++, ind++) {
+ for (p = 0, proc_it = buffProcs.begin();
+ proc_it != buffProcs.end(); proc_it++, p++) {
// reserve space on front for size and for initial buff size
- INIT_BUFFER(ownerSBuffs[ind], buff_ptr);
+ remoteOwnedBuffs[p]->reset_buffer(sizeof(int));
- result = pack_remote_handles(L1hloc[ind], L1hrem[ind], L1p[ind], *proc_it,
- ownerSBuffs[ind], buff_ptr);
+ result = pack_remote_handles(L1hloc[p], L1hrem[p], L1p[p], *proc_it,
+ remoteOwnedBuffs[p]);
RRA("Failed to pack remote handles.");
-
+ remoteOwnedBuffs[p]->set_stored_size();
+
#ifdef DEBUG_MSGS
- print_buffer(&ownerSBuffs[ind][0], MB_MESG_REMOTE_HANDLES, buffProcs[ind], true);
+ msgs.resize(msgs.size()+1);
+ msgs.back() = new Buffer(*remoteOwnedBuffs[p]);
+ //print_buffer(&ownerSBuffs[ind][0], MB_MESG_REMOTEH_SIZE, buffProcs[ind], true);
#endif
- result = send_buffer(buffProcs[ind], &ownerSBuffs[ind][0],
- buff_ptr - &ownerSBuffs[ind][0],
- MB_MESG_REMOTE_HANDLES,
- sendReqs[ind], sendReqs[ind+buffProcs.size()]);
+ result = send_buffer(buffProcs[p], remoteOwnedBuffs[p],
+ MB_MESG_REMOTEH_SIZE,
+ sendReqs[2*p], recv_remoteh_reqs[2*p+1],
+ &dum_ack_buff, incoming2);
RRA("Failed to send remote handles.");
}
//===========================================
// process remote handles of my ghosteds
//===========================================
- num_incoming = buffProcs.size();
- while (num_incoming) {
- success = MPI_Waitany(buffProcs.size(), &recv_reqs[0], &ind, &status[0]);
+ while (incoming2) {
+ PRINT_DEBUG_WAITANY(recv_remoteh_reqs, MB_MESG_REMOTEH_SIZE, procConfig.proc_rank());
+ success = MPI_Waitany(2*buffProcs.size(), &recv_remoteh_reqs[0], &ind, &status);
if (MPI_SUCCESS != success) {
result = MB_FAILURE;
RRA("Failed in waitany in ghost exchange.");
}
// ok, received something; decrement incoming counter
- num_incoming--;
+ incoming2--;
+
+ PRINT_DEBUG_RECD(status);
-#ifdef DEBUG_COMM
- {
- int this_count;
- success = MPI_Get_count(&status[0], MPI_UNSIGNED_CHAR, &this_count);
- if (MPI_SUCCESS != success) this_count = -1;
-
- std::cerr << "Received from " << status[0].MPI_SOURCE
- << ", count = " << this_count << ", tag = " << status[0].MPI_TAG;
- if (MB_MESG_REMOTE_HANDLES_SECOND == status[0].MPI_TAG)
- std::cerr << " (second)";
- std::cerr << std::endl;
- }
-#endif
-
- bool done;
- result = recv_buffer(MB_MESG_REMOTE_HANDLES, status[0],
- ghostRBuffs[ind], recv_reqs[ind], done);
- RRA("Failed to resize recv buffer.");
+ bool done = false;
+ unsigned int base_ind = 2*(ind/2);
+ result = recv_buffer(MB_MESG_REMOTEH_SIZE, status,
+ localOwnedBuffs[ind/2],
+ recv_remoteh_reqs[ind], recv_remoteh_reqs[ind+1], incoming2,
+ remoteOwnedBuffs[ind/2],
+ sendReqs[base_ind], sendReqs[base_ind+1],
+ done);
+ RRA("Failed to receive remote handles.");
if (done) {
// incoming remote handles
#ifdef DEBUG_MSGS
- print_buffer(&ghostRBuffs[ind][0], MB_MESG_REMOTE_HANDLES, buffProcs[ind], false);
+ msgs.resize(msgs.size()+1);
+ msgs.back() = new Buffer(*localOwnedBuffs[ind]);
+ //print_buffer(&remotehRBuffs[ind/2][0], MB_MESG_REMOTEH_SIZE, buffProcs[ind/2], false);
#endif
- buff_ptr = &ghostRBuffs[ind][sizeof(int)];
- result = unpack_remote_handles(buffProcs[ind], buff_ptr,
- L2hloc, L2hrem, L2p);
+ localOwnedBuffs[ind/2]->reset_ptr(sizeof(int));
+ result = unpack_remote_handles(buffProcs[ind/2],
+ localOwnedBuffs[ind/2]->buff_ptr,
+ L2hloc, L2hrem, L2p);
RRA("Failed to unpack remote handles.");
}
- else {
- num_incoming++;
- continue;
- }
}
#ifdef DEBUG_MPE
- MPE_Log_event(RHANDLES_END, procConfig.proc_rank(), "Ending remote handles.");
+ MPE_Log_event(RHANDLES_END, procConfig.proc_rank(), "Ending remote handles.");
#endif
#ifdef DEBUG_MPE
- MPE_Log_event(GHOST_END, procConfig.proc_rank(),
- "Ending ghost exchange (still doing checks).");
+ MPE_Log_event(GHOST_END, procConfig.proc_rank(),
+ "Ending ghost exchange (still doing checks).");
#endif
-#ifndef NDEBUG
- result = check_sent_ents(allsent);
- RRA("Failed check on shared entities.");
- result = check_all_shared_handles();
- RRA("Failed check on all shared handles.");
-#endif
-#ifdef DEBUG_COMM
- std::cout << "Exiting exchange_ghost_cells" << std::endl;
-#endif
-
//===========================================
// wait if requested
//===========================================
if (wait_all) {
-#ifdef DEBUG_COMM
+#ifdef DEBUG_BARRIER
success = MPI_Barrier(procConfig.proc_comm());
#else
- success = MPI_Waitall(buffProcs.size(), &recv_reqs[0], &status[0]);
+ success = MPI_Waitall(2*buffProcs.size(), &recv_remoteh_reqs[0], &status);
+ success = MPI_Waitall(2*buffProcs.size(), &sendReqs[0], &status);
#endif
if (MPI_SUCCESS != success) {
result = MB_FAILURE;
@@ -4096,9 +4051,193 @@
}
}
+#ifndef NDEBUG
+ result = check_sent_ents(allsent);
+ RRA("Failed check on shared entities.");
+ result = check_all_shared_handles();
+ RRA("Failed check on all shared handles.");
+#endif
+#ifdef DEBUG_COMM
+ std::cerr << "Exiting exchange_ghost_cells" << std::endl; std::cerr.flush();
+#endif
+
return MB_SUCCESS;
}
+MBErrorCode MBParallelComm::send_buffer(const unsigned int to_proc,
+ Buffer *send_buff,
+ int mesg_tag,
+ MPI_Request &send_req,
+ MPI_Request &ack_req,
+ int *ack_buff,
+ int &this_incoming,
+ int next_mesg_tag,
+ Buffer *next_recv_buff,
+ MPI_Request *next_recv_req,
+ int *next_incoming)
+{
+ MBErrorCode result = MB_SUCCESS;
+ int success;
+
+ // if small message, post recv for remote handle message
+ if (send_buff->get_stored_size() <= (int)INITIAL_BUFF_SIZE && next_recv_buff) {
+ (*next_incoming)++;
+ PRINT_DEBUG_IRECV(procConfig.proc_rank(), to_proc, next_recv_buff->mem_ptr,
+ INITIAL_BUFF_SIZE, next_mesg_tag, *next_incoming);
+ success = MPI_Irecv(next_recv_buff->mem_ptr, INITIAL_BUFF_SIZE,
+ MPI_UNSIGNED_CHAR, to_proc,
+ next_mesg_tag, procConfig.proc_comm(),
+ next_recv_req);
+ if (success != MPI_SUCCESS) {
+ result = MB_FAILURE;
+ RRA("Failed to post irecv for next message in ghost exchange.");
+ }
+ }
+ // if large, we'll need an ack before sending the rest
+ else if (send_buff->get_stored_size() > (int)INITIAL_BUFF_SIZE) {
+ this_incoming++;
+ PRINT_DEBUG_IRECV(procConfig.proc_rank(), to_proc, (unsigned char*)ack_buff,
+ sizeof(int), mesg_tag-1, this_incoming);
+ success = MPI_Irecv(ack_buff, sizeof(int),
+ MPI_UNSIGNED_CHAR, to_proc,
+ mesg_tag-1, procConfig.proc_comm(),
+ &ack_req);
+ if (success != MPI_SUCCESS) {
+ result = MB_FAILURE;
+ RRA("Failed to post irecv for entity ack in ghost exchange.");
+ }
+ }
+
+ // send the buffer
+ PRINT_DEBUG_ISEND(procConfig.proc_rank(), to_proc, send_buff->mem_ptr, mesg_tag,
+ std::min(send_buff->get_stored_size(), (int)INITIAL_BUFF_SIZE));
+ assert(0 <= send_buff->get_stored_size() &&
+ send_buff->get_stored_size() <= (int)send_buff->alloc_size);
+ success = MPI_Isend(send_buff->mem_ptr,
+ std::min(send_buff->get_stored_size(),
+ (int)INITIAL_BUFF_SIZE),
+ MPI_UNSIGNED_CHAR, to_proc,
+ mesg_tag, procConfig.proc_comm(), &send_req);
+ if (success != MPI_SUCCESS) return MB_FAILURE;
+
+ return result;
+}
+
+MBErrorCode MBParallelComm::recv_buffer(int mesg_tag_expected,
+ const MPI_Status &mpi_status,
+ Buffer *recv_buff,
+ MPI_Request &recv_req,
+ MPI_Request &ack_recvd_req,
+ int &this_incoming,
+ Buffer *send_buff,
+ MPI_Request &send_req,
+ MPI_Request &sent_ack_req,
+ bool &done,
+ Buffer *next_buff,
+ int next_tag,
+ MPI_Request *next_req,
+ int *next_incoming)
+{
+ // process a received message; if there will be more coming,
+ // post a receive for 2nd part then send an ack message
+ //
+ int from_proc = mpi_status.MPI_SOURCE;
+ int success;
+ MBErrorCode result = MB_SUCCESS;
+
+ // set the buff_ptr on the recv_buffer; needs to point beyond any
+ // valid data already in the buffer
+ recv_buff->reset_ptr(std::min(recv_buff->get_stored_size(),
+ (int)recv_buff->alloc_size));
+
+ if (mpi_status.MPI_TAG == mesg_tag_expected &&
+ recv_buff->get_stored_size() > (int)INITIAL_BUFF_SIZE) {
+ // 1st message & large - allocate buffer, post irecv for 2nd message,
+ // then send ack
+ recv_buff->reserve(recv_buff->get_stored_size());
+ assert(recv_buff->alloc_size > INITIAL_BUFF_SIZE);
+
+ // will expect a 2nd message
+ this_incoming++;
+
+ PRINT_DEBUG_IRECV(procConfig.proc_rank(), from_proc,
+ recv_buff->mem_ptr+INITIAL_BUFF_SIZE,
+ recv_buff->get_stored_size() - INITIAL_BUFF_SIZE,
+ mesg_tag_expected+1, this_incoming);
+ success = MPI_Irecv(recv_buff->mem_ptr+INITIAL_BUFF_SIZE,
+ recv_buff->get_stored_size() - INITIAL_BUFF_SIZE,
+ MPI_UNSIGNED_CHAR, from_proc,
+ mesg_tag_expected+1, procConfig.proc_comm(),
+ &recv_req);
+ if (success != MPI_SUCCESS) {
+ result = MB_FAILURE;
+ RRA("Failed to post 2nd iRecv in ghost exchange.");
+ }
+
+ // send ack, doesn't matter what data actually is
+ PRINT_DEBUG_ISEND(procConfig.proc_rank(), from_proc, recv_buff->mem_ptr,
+ mesg_tag_expected-1, sizeof(int));
+ success = MPI_Isend(recv_buff->mem_ptr, sizeof(int),
+ MPI_UNSIGNED_CHAR, from_proc,
+ mesg_tag_expected-1, procConfig.proc_comm(), &sent_ack_req);
+ if (success != MPI_SUCCESS) {
+ result = MB_FAILURE;
+ RRA("Failed to send ack in ghost exchange.");
+ }
+ }
+
+ else if (mpi_status.MPI_TAG == mesg_tag_expected-1) {
+ // got an ack back, send the 2nd half of message
+
+ // should be a large message if we got this
+ assert(*((size_t*)send_buff->mem_ptr) > INITIAL_BUFF_SIZE);
+
+ // post irecv for next message, then send 2nd message
+ if (next_buff) {
+ // we'll expect a return message
+ (*next_incoming)++;
+ PRINT_DEBUG_IRECV(procConfig.proc_rank(), from_proc, next_buff->mem_ptr,
+ INITIAL_BUFF_SIZE, next_tag, *next_incoming);
+
+ success = MPI_Irecv(next_buff->mem_ptr,
+ INITIAL_BUFF_SIZE,
+ MPI_UNSIGNED_CHAR, from_proc,
+ next_tag, procConfig.proc_comm(),
+ next_req);
+ if (success != MPI_SUCCESS) {
+ result = MB_FAILURE;
+ RRA("Failed to post next irecv in ghost exchange.");
+ }
+
+ }
+
+ // send 2nd message
+ PRINT_DEBUG_ISEND(procConfig.proc_rank(), from_proc,
+ send_buff->mem_ptr+INITIAL_BUFF_SIZE,
+ mesg_tag_expected+1,
+ send_buff->get_stored_size() - INITIAL_BUFF_SIZE);
+
+ assert(send_buff->get_stored_size()-INITIAL_BUFF_SIZE < send_buff->alloc_size &&
+ 0 <= send_buff->get_stored_size());
+ success = MPI_Isend(send_buff->mem_ptr+INITIAL_BUFF_SIZE,
+ send_buff->get_stored_size() - INITIAL_BUFF_SIZE,
+ MPI_UNSIGNED_CHAR, from_proc, mesg_tag_expected+1,
+ procConfig.proc_comm(), &send_req);
+ if (success != MPI_SUCCESS) {
+ result = MB_FAILURE;
+ RRA("Failed to send 2nd message in ghost exchange.");
+ }
+ }
+ else if ((mpi_status.MPI_TAG == mesg_tag_expected &&
+ recv_buff->get_stored_size() <= (int)INITIAL_BUFF_SIZE) ||
+ mpi_status.MPI_TAG == mesg_tag_expected+1) {
+ // message completely received - signal that we're done
+ done = true;
+ }
+
+ return MB_SUCCESS;
+}
+
MBErrorCode MBParallelComm::check_clean_iface(MBRange &allsent)
{
// allsent is all entities I think are on interface; go over them, looking
@@ -4124,6 +4263,10 @@
}
}
if (numz) {
+ for (int i = numz; i > 0; i--) {
+ sharedp[nump-i] = -1;
+ sharedh[nump-i] = 0;
+ }
result = set_sharing_data(*rit, pstatus, nump, nump-numz, sharedp, sharedh);
RRA("");
}
@@ -4163,6 +4306,16 @@
RRA("");
result = mbImpl->tag_set_data(sharedhs_tag(), &ent, 1, hs);
RRA("");
+#ifndef NDEBUG
+ {
+ // check for duplicates in proc list
+ std::set<unsigned int> dumprocs;
+ int dp = 0;
+ for (; dp < old_nump && -1 != ps[dp]; dp++)
+ dumprocs.insert(ps[dp]);
+ assert(dp == (int)dumprocs.size());
+ }
+#endif
}
else {
unsigned int j = (ps[0] == (int)procConfig.proc_rank() ? 1 : 0);
@@ -4250,7 +4403,6 @@
const bool is_iface = !num_layers;
unsigned int ind;
- unsigned char *buff_ptr;
MBParallelComm *pc;
MBErrorCode result = MB_SUCCESS;
@@ -4282,12 +4434,9 @@
//===========================================
for (ind = 0; ind < pc->buffProcs.size(); ind++) {
-
- // buff_ptr points to the END (one past last occupied byte) of buffer
- buff_ptr = &pc->ownerSBuffs[ind][0];
-
// entities
- result = pc->pack_entities(sent_ents[p][ind], pc->ownerSBuffs[ind], buff_ptr,
+ pc->localOwnedBuffs[ind]->reset_ptr(sizeof(int));
+ result = pc->pack_entities(sent_ents[p][ind], pc->localOwnedBuffs[ind],
store_remote_handles, pc->buffProcs[ind], is_iface,
&entprocs[p], &allsent[p]);
RRAI(pc->get_moab(), "Packing entities failed.");
@@ -4322,11 +4471,11 @@
// buffer could be empty, which means there isn't any message to
// unpack (due to this comm proc getting added as a result of indirect
// communication); just skip this unpack
- if (pc->ownerSBuffs[ind].empty()) continue;
+ if (pc->localOwnedBuffs[ind]->get_stored_size() == 0) continue;
unsigned int to_p = pc->buffProcs[ind];
- unsigned char *buff_ptr = &pc->ownerSBuffs[ind][0];
- result = pcs[to_p]->unpack_entities(buff_ptr,
+ pc->localOwnedBuffs[ind]->reset_ptr(sizeof(int));
+ result = pcs[to_p]->unpack_entities(pc->localOwnedBuffs[ind]->buff_ptr,
store_remote_handles, ind, is_iface,
L1hloc[to_p], L1hrem[to_p], L1p[to_p], L2hloc[to_p],
L2hrem[to_p], L2p[to_p], new_ents[to_p]);
@@ -4365,9 +4514,9 @@
for (ind = 0, proc_it = pc->buffProcs.begin();
proc_it != pc->buffProcs.end(); proc_it++, ind++) {
// skip if iface layer and higher-rank proc
- unsigned char *buff_ptr = &pc->ownerSBuffs[ind][0];
+ pc->localOwnedBuffs[ind]->reset_ptr(sizeof(int));
result = pc->pack_remote_handles(L1hloc[p][ind], L1hrem[p][ind], L1p[p][ind], *proc_it,
- pc->ownerSBuffs[ind], buff_ptr);
+ pc->localOwnedBuffs[ind]);
RRAI(pc->get_moab(), "Failed to pack remote handles.");
}
}
@@ -4382,7 +4531,9 @@
proc_it != pc->buffProcs.end(); proc_it++, ind++) {
// incoming remote handles
unsigned int to_p = pc->buffProcs[ind];
- result = pcs[to_p]->unpack_remote_handles(p, &pc->ownerSBuffs[ind][0],
+ pc->localOwnedBuffs[ind]->reset_ptr(sizeof(int));
+ result = pcs[to_p]->unpack_remote_handles(p,
+ pc->localOwnedBuffs[ind]->buff_ptr,
L2hloc[to_p], L2hrem[to_p], L2p[to_p]);
RRAI(pc->get_moab(), "Failed to unpack remote handles.");
}
@@ -4469,20 +4620,21 @@
std::vector<MBEntityHandle> &L1hrem,
std::vector<int> &L1p,
unsigned int to_proc,
- std::vector<unsigned char> &buff,
- unsigned char *&buff_ptr)
+ Buffer *buff)
{
// 2 vectors of handles plus ints
- CHECK_BUFF_SPACE(buff, buff_ptr, ((L1p.size()+1)*sizeof(int) +
- (L1hloc.size()+1)*sizeof(MBEntityHandle) +
- (L1hrem.size()+1)*sizeof(MBEntityHandle)));
+ buff->check_space(((L1p.size()+1)*sizeof(int) +
+ (L1hloc.size()+1)*sizeof(MBEntityHandle) +
+ (L1hrem.size()+1)*sizeof(MBEntityHandle)));
// should be in pairs of handles
- PACK_INT(buff_ptr, L1hloc.size());
- PACK_INTS(buff_ptr, &L1p[0], L1p.size());
- PACK_EH(buff_ptr, &L1hrem[0], L1hrem.size());
- PACK_EH(buff_ptr, &L1hloc[0], L1hloc.size());
+ PACK_INT(buff->buff_ptr, L1hloc.size());
+ PACK_INTS(buff->buff_ptr, &L1p[0], L1p.size());
+ PACK_EH(buff->buff_ptr, &L1hrem[0], L1hrem.size());
+ PACK_EH(buff->buff_ptr, &L1hloc[0], L1hloc.size());
+ buff->set_stored_size();
+
return MB_SUCCESS;
}
@@ -4546,6 +4698,7 @@
RRA("Couldn't get bridge ents in the set.");
// need to get layers of bridge-adj entities
+ if (from_ents.empty()) continue;
result = MeshTopoUtil(mbImpl).get_bridge_adjacencies(from_ents, bridge_dim,
ghost_dim, ghosted_ents,
num_layers);
@@ -4596,29 +4749,38 @@
// post ghost irecv's for all interface procs
// index greqs the same as buffer/sharing procs indices
- std::vector<MPI_Request> recv_reqs(MAX_SHARING_PROCS, MPI_REQUEST_NULL);
- std::vector<MPI_Status> gstatus(MAX_SHARING_PROCS);
+ std::vector<MPI_Request> recv_tag_reqs(2*buffProcs.size(), MPI_REQUEST_NULL),
+ sent_ack_reqs(buffProcs.size(), MPI_REQUEST_NULL);
std::vector<unsigned int>::iterator sit;
int ind;
+
+ reset_all_buffers();
+ int incoming = 0;
+
for (ind = 0, sit = buffProcs.begin(); sit != buffProcs.end(); sit++, ind++) {
- success = MPI_Irecv(&ghostRBuffs[ind][0], ghostRBuffs[ind].size(),
+ incoming++;
+ PRINT_DEBUG_IRECV(*sit, procConfig.proc_rank(), remoteOwnedBuffs[ind]->mem_ptr,
+ INITIAL_BUFF_SIZE, MB_MESG_TAGS_SIZE, incoming);
+
+ success = MPI_Irecv(remoteOwnedBuffs[ind]->mem_ptr, INITIAL_BUFF_SIZE,
MPI_UNSIGNED_CHAR, *sit,
- MB_MESG_ANY, procConfig.proc_comm(),
- &recv_reqs[ind]);
+ MB_MESG_TAGS_SIZE, procConfig.proc_comm(),
+ &recv_tag_reqs[ind]);
if (success != MPI_SUCCESS) {
result = MB_FAILURE;
RRA("Failed to post irecv in ghost exchange.");
}
+
}
// pack and send tags from this proc to others
// make sendReqs vector to simplify initialization
- std::fill(sendReqs, sendReqs+2*buffProcs.size(), MPI_REQUEST_NULL);
-
+ sendReqs.resize(2*buffProcs.size(), MPI_REQUEST_NULL);
+
// take all shared entities if incoming list is empty
if (entities.empty()) entities = sharedEnts;
- unsigned char *buff_ptr;
+ int dum_ack_buff;
for (ind = 0, sit = buffProcs.begin(); sit != buffProcs.end(); sit++, ind++) {
@@ -4651,57 +4813,62 @@
// pack the data
// reserve space on front for size and for initial buff size
- INIT_BUFFER(ownerSBuffs[ind], buff_ptr);
+ localOwnedBuffs[ind]->reset_ptr(sizeof(int));
result = pack_tags(tag_ents,
src_tags, dst_tags, tag_ranges,
- ownerSBuffs[ind], buff_ptr, true, *sit);
+ localOwnedBuffs[ind], true, *sit);
RRA("Failed to count buffer in pack_send_tag.");
// now send it
- result = send_buffer(*sit, &ownerSBuffs[ind][0],
- buff_ptr-&ownerSBuffs[ind][0],
- MB_MESG_TAGS, sendReqs[ind], sendReqs[ind+buffProcs.size()]);
+ result = send_buffer(*sit, localOwnedBuffs[ind], MB_MESG_TAGS_SIZE, sendReqs[2*ind],
+ recv_tag_reqs[2*ind+1], &dum_ack_buff, incoming);
RRA("Failed to send buffer.");
}
// receive/unpack tags
- int num_incoming = exch_procs.size();
-
- while (num_incoming) {
+ while (incoming) {
int ind;
MPI_Status status;
- success = MPI_Waitany(MAX_SHARING_PROCS, &recv_reqs[0], &ind, &status);
+ PRINT_DEBUG_WAITANY(recv_tag_reqs, MB_MESG_TAGS_SIZE, procConfig.proc_rank());
+ success = MPI_Waitany(2*buffProcs.size(), &recv_tag_reqs[0], &ind, &status);
if (MPI_SUCCESS != success) {
result = MB_FAILURE;
RRA("Failed in waitany in ghost exchange.");
}
+ PRINT_DEBUG_RECD(status);
+
// ok, received something; decrement incoming counter
- num_incoming--;
+ incoming--;
- bool done;
+ bool done = false;
MBRange dum_range;
- result = recv_buffer(MB_MESG_TAGS, status,
- ghostRBuffs[ind], recv_reqs[ind], done);
+ result = recv_buffer(MB_MESG_TAGS_SIZE,
+ status,
+ remoteOwnedBuffs[ind/2],
+ recv_tag_reqs[ind], recv_tag_reqs[ind+1],
+ incoming,
+ localOwnedBuffs[ind/2], sendReqs[ind], sendReqs[ind+1],
+ done);
RRA("Failed to resize recv buffer.");
if (done) {
- buff_ptr = &ghostRBuffs[ind][sizeof(int)];
- result = unpack_tags(buff_ptr, dum_range, true, buffProcs[ind]);
+ remoteOwnedBuffs[ind]->reset_ptr(sizeof(int));
+ result = unpack_tags(remoteOwnedBuffs[ind]->buff_ptr,
+ dum_range, true, buffProcs[ind]);
RRA("Failed to recv-unpack-tag message.");
}
- else {
- num_incoming++;
- continue;
- }
}
// ok, now wait
-// MPI_Status status[MAX_SHARING_PROCS];
+#ifdef DEBUG_BARRIER
success = MPI_Barrier(procConfig.proc_comm());
-// success = MPI_Waitall(2*buffProcs.size(), &sendReqs[0], status);
+#else
+ MPI_Status status;
+ success = MPI_Waitall(2*buffProcs.size(), &sendReqs[0], status);
+#endif
if (MPI_SUCCESS != success) {
result = MB_FAILURE;
RRA("Failure in waitall in tag exchange.");
@@ -4773,7 +4940,7 @@
RRA("Failed to count buffer in pack_send_tag.");
unsigned char *buff_ptr = &ownerSBuffs[ind][0];
- CHECK_BUFF_SPACE(ownerSBuffs[ind], buff_ptr, buff_size);
+ buff->check_space(ownerSBuffs[ind], buff_ptr, buff_size);
PACK_INT( buff_ptr, 1 ); // number of tags
result = pack_tag( src_tag, dst_tag, proc_ents[*sit], proc_ents[*sit],
ownerSBuffs[ind], buff_ptr, true, *sit );
@@ -5642,7 +5809,7 @@
// send sizes
assert(num_proc == (int)send_data.size());
- std::fill(sendReqs, sendReqs+buffProcs.size(), MPI_REQUEST_NULL);
+ sendReqs.resize(buffProcs.size(), MPI_REQUEST_NULL);
for (int i = 0; i < num_proc; ++i) {
sizes_send[i] = send_data[i].size();
ierr = MPI_Isend( &sizes_send[i], 1, MPI_INT, buffProcs[i], tag, comm, &sendReqs[i] );
Modified: MOAB/trunk/parallel/MBParallelComm.hpp
===================================================================
--- MOAB/trunk/parallel/MBParallelComm.hpp 2009-11-25 21:22:39 UTC (rev 3388)
+++ MOAB/trunk/parallel/MBParallelComm.hpp 2009-11-30 23:06:35 UTC (rev 3389)
@@ -32,6 +32,9 @@
#include <map>
#include <set>
#include <vector>
+#include <iostream>
+#include <fstream>
+#include <assert.h>
#include "math.h"
#include "MBmpi.h"
@@ -484,14 +487,31 @@
MBInterface* get_moab() const { return mbImpl; }
+ class Buffer {
+ public:
+ unsigned char *mem_ptr;
+ unsigned char *buff_ptr;
+ unsigned int alloc_size;
+
+ Buffer(unsigned int sz = 0);
+ Buffer(const Buffer &);
+ ~Buffer();
+ void reset_buffer(size_t buff_pos = 0) {reset_ptr(buff_pos); reserve(INITIAL_BUFF_SIZE);}
+ void reset_ptr(size_t buff_pos = 0) {assert((!mem_ptr && !buff_pos)|| (alloc_size >= buff_pos)); buff_ptr = mem_ptr + buff_pos;}
+ void reserve(unsigned int new_size);
+ void set_stored_size() {*((int*)mem_ptr) = (int)(buff_ptr - mem_ptr);}
+ int get_stored_size() {return *((int*)mem_ptr);}
+
+ void check_space(unsigned int addl_space);
+ };
+
//! public 'cuz we want to unit test these externally
MBErrorCode pack_buffer(MBRange &orig_ents,
const bool adjacencies,
const bool tags,
const bool store_remote_handles,
const int to_proc,
- std::vector<unsigned char> &buff,
- int &buff_size);
+ Buffer *buff);
MBErrorCode unpack_buffer(unsigned char *buff_ptr,
const bool store_remote_handles,
@@ -506,8 +526,7 @@
MBRange &new_ents);
MBErrorCode pack_entities(MBRange &entities,
- std::vector<unsigned char> &buff,
- unsigned char *&buff_ptr,
+ Buffer *buff,
const bool store_remote_handles,
const int to_proc,
const bool is_iface,
@@ -580,13 +599,14 @@
std::vector<MBEntityHandle> &L1hrem,
std::vector<int> &procs,
unsigned int to_proc,
- std::vector<unsigned char> &buff,
- unsigned char *&buff_ptr);
+ Buffer *buff);
MBErrorCode list_entities(const MBEntityHandle *ents, int num_ents);
MBErrorCode list_entities(const MBRange &ents);
+ static const unsigned int INITIAL_BUFF_SIZE;
+
private:
// common initialization code, called from various constructors
@@ -651,19 +671,34 @@
//! send the indicated buffer, possibly sending size first
MBErrorCode send_buffer(const unsigned int to_proc,
- const unsigned char *send_buff,
- const unsigned int buff_size,
- const int msg_type,
- MPI_Request &send_req1,
- MPI_Request &send_req2);
+ Buffer *send_buff,
+ const int msg_tag,
+ MPI_Request &send_req,
+ MPI_Request &ack_recv_req,
+ int *ack_buff,
+ int &this_incoming,
+ int next_mesg_tag = -1,
+ Buffer *next_recv_buff = NULL,
+ MPI_Request *next_recv_req = NULL,
+ int *next_incoming = NULL);
- //! use integer size in buffer to resize buffer, then post an
- //! Irecv to get message
+ //! process incoming message; if longer than the initial size, post
+ //! recv for next part then send ack; if ack, send second part; else
+ //! indicate that we're done and buffer is ready for processing
MBErrorCode recv_buffer(int mesg_tag_expected,
const MPI_Status &mpi_status,
- std::vector<unsigned char> &recv_buff,
- MPI_Request &recv_req,
- bool &done);
+ Buffer *recv_buff,
+ MPI_Request &recv_2nd_req,
+ MPI_Request &ack_req,
+ int &this_incoming,
+ Buffer *send_buff,
+ MPI_Request &send_req,
+ MPI_Request &sent_ack_req,
+ bool &done,
+ Buffer *next_buff = NULL,
+ int next_tag = -1,
+ MPI_Request *next_req = NULL,
+ int *next_incoming = NULL);
//! pack a range of entities with equal # verts per entity, along with
//! the range on the sending proc
@@ -672,8 +707,7 @@
const int to_proc,
MBRange &these_ents,
MBRange &entities,
- std::vector<unsigned char> &buff,
- unsigned char *&buff_ptr);
+ Buffer *buff);
MBErrorCode print_buffer(unsigned char *buff_ptr, int mesg_type, int from_proc,
bool sent);
@@ -686,8 +720,7 @@
std::vector<MBEntityHandle> &recd_ents);
MBErrorCode pack_sets(MBRange &entities,
- std::vector<unsigned char> &buff,
- unsigned char *&buff_ptr,
+ Buffer *buff,
const bool store_handles,
const int to_proc);
@@ -806,8 +839,7 @@
const std::vector<MBTag> &src_tags,
const std::vector<MBTag> &dst_tags,
const std::vector<MBRange> &tag_ranges,
- std::vector<unsigned char> &buff,
- unsigned char *&buff_ptr,
+ Buffer *buff,
const bool store_handles,
const int to_proc);
@@ -849,8 +881,7 @@
MBTag destination_tag,
const MBRange &entities,
const MBRange &whole_range,
- std::vector<unsigned char> &buff,
- unsigned char *&buff_ptr,
+ Buffer *buff,
const bool store_remote_handles,
const int to_proc );
@@ -999,10 +1030,16 @@
SequenceManager *sequenceManager;
//! more data buffers, proc-specific
- std::vector<std::vector<unsigned char> > ownerSBuffs, ghostRBuffs;
+ std::vector<Buffer*> localOwnedBuffs, remoteOwnedBuffs;
+ //! reset message buffers to their initial state
+ void reset_all_buffers();
+
+ //! delete all buffers, freeing up any memory held by them
+ void delete_all_buffers();
+
//! request objects, may be used if store_remote_handles is used
- MPI_Request sendReqs[2*MAX_SHARING_PROCS];
+ std::vector<MPI_Request> sendReqs;
//! processor rank for each buffer index
std::vector<unsigned int> buffProcs;
@@ -1020,10 +1057,95 @@
int globalPartCount; //!< Cache of global part count
MBEntityHandle partitioningSet; //!< entity set containing all parts
+
+ std::ofstream myFile;
int pcommID;
+
};
+inline MBParallelComm::Buffer::Buffer(const Buffer &other_buff)
+{
+ alloc_size = other_buff.alloc_size;
+ mem_ptr = (unsigned char *)malloc(alloc_size);
+ memcpy(mem_ptr, other_buff.mem_ptr, alloc_size);
+ buff_ptr = mem_ptr + (other_buff.buff_ptr - other_buff.mem_ptr);
+}
+
+inline MBParallelComm::Buffer::Buffer(unsigned int new_size)
+ : mem_ptr(NULL), buff_ptr(NULL), alloc_size(0)
+{
+ if (new_size) this->reserve(new_size);
+}
+
+inline MBParallelComm::Buffer::~Buffer()
+{
+ if (mem_ptr) {
+ free(mem_ptr);
+ mem_ptr = NULL;
+ }
+}
+
+#define DEBUG_BUFFER 1
+
+inline void MBParallelComm::Buffer::reserve(unsigned int new_size) {
+
+#ifdef DEBUG_BUFFER
+ int tmp_pos = 0;
+ if (mem_ptr) {
+ tmp_pos = buff_ptr - mem_ptr;
+ }
+ buff_ptr = (unsigned char *)malloc(new_size);
+ assert(0 <= tmp_pos && tmp_pos <= (int)alloc_size);
+ if (tmp_pos) memcpy(buff_ptr, mem_ptr, tmp_pos);
+ if (mem_ptr) free(mem_ptr);
+ mem_ptr = buff_ptr;
+ alloc_size = new_size;
+ buff_ptr = mem_ptr + tmp_pos;
+#else
+ if (mem_ptr && alloc_size < new_size) {
+ size_t tmp_pos = mem_ptr ? buff_ptr - mem_ptr : 0;
+ mem_ptr = (unsigned char *)realloc(mem_ptr, new_size);
+ alloc_size = new_size;
+ buff_ptr = mem_ptr + tmp_pos;
+ }
+ else if (!mem_ptr) {
+ mem_ptr = (unsigned char *)malloc(new_size);
+ alloc_size = new_size;
+ buff_ptr = mem_ptr;
+ }
+#endif
+}
+
+inline void MBParallelComm::Buffer::check_space(unsigned int addl_space )
+{
+ assert(buff_ptr >= mem_ptr && buff_ptr <= mem_ptr+alloc_size);
+ unsigned int new_size = buff_ptr - mem_ptr + addl_space;
+ if (new_size > alloc_size)
+ reserve(1.5*new_size);
+}
+
+inline void MBParallelComm::reset_all_buffers()
+{
+ std::vector<Buffer*>::iterator vit;
+ for (vit = localOwnedBuffs.begin(); vit != localOwnedBuffs.end(); vit++)
+ (*vit)->reset_buffer();
+ for (vit = remoteOwnedBuffs.begin(); vit != remoteOwnedBuffs.end(); vit++)
+ (*vit)->reset_buffer();
+}
+
+inline void MBParallelComm::delete_all_buffers()
+{
+ std::vector<Buffer*>::iterator vit;
+ for (vit = localOwnedBuffs.begin(); vit != localOwnedBuffs.end(); vit++)
+ delete (*vit);
+ localOwnedBuffs.clear();
+
+ for (vit = remoteOwnedBuffs.begin(); vit != remoteOwnedBuffs.end(); vit++)
+ delete (*vit);
+ remoteOwnedBuffs.clear();
+}
+
inline std::vector<unsigned int> &MBParallelComm::buff_procs()
{
return buffProcs;
Modified: MOAB/trunk/parallel/mbparallelcomm_test.cpp
===================================================================
--- MOAB/trunk/parallel/mbparallelcomm_test.cpp 2009-11-25 21:22:39 UTC (rev 3388)
+++ MOAB/trunk/parallel/mbparallelcomm_test.cpp 2009-11-30 23:06:35 UTC (rev 3389)
@@ -401,10 +401,8 @@
MBParallelComm *pcomm = new MBParallelComm(mbImpl);
- std::vector<unsigned char> buff(1024);
- int buff_size;
- result = pcomm->pack_buffer(ents, false, true, false, -1,
- buff, buff_size);
+ MBParallelComm::Buffer buff;
+ result = pcomm->pack_buffer(ents, false, true, false, -1, &buff);
RRA("Packing buffer count (non-stored handles) failed.");
std::vector<std::vector<MBEntityHandle> > L1hloc, L1hrem;
@@ -412,7 +410,8 @@
std::vector<MBEntityHandle> L2hloc, L2hrem;
std::vector<unsigned int> L2p;
- result = pcomm->unpack_buffer(&buff[0], false, -1, -1, L1hloc, L1hrem, L1p, L2hloc,
+ buff.reset_ptr();
+ result = pcomm->unpack_buffer(buff.buff_ptr, false, -1, -1, L1hloc, L1hrem, L1p, L2hloc,
L2hrem, L2p, new_ents);
RRA("Unpacking buffer (non-stored handles) failed.");
Modified: MOAB/trunk/parallel/pcomm_serial.cpp
===================================================================
--- MOAB/trunk/parallel/pcomm_serial.cpp 2009-11-25 21:22:39 UTC (rev 3388)
+++ MOAB/trunk/parallel/pcomm_serial.cpp 2009-11-30 23:06:35 UTC (rev 3389)
@@ -7,6 +7,11 @@
#include "TestUtil.hpp"
#include <vector>
+void print_usage(char *argv)
+{
+ std::cout << "Usage: " << argv << " nprocs filename" << std::endl;
+}
+
int main( int argc, char* argv[] )
{
#ifdef USE_MPI
@@ -14,9 +19,10 @@
#endif
if (1 < argc && !strcmp(argv[1], "-h")) {
- std::cout << "Usage: " << argv[0] << " nprocs filename" << std::endl;
+ print_usage(argv[0]);
return 0;
}
+
int nprocs = 2;
std::string ptag_name("GEOM_DIMENSION");
std::vector<int> partition_tag_vals;
@@ -34,6 +40,11 @@
if (argc > 4) partition_tag_vals.push_back(atoi(argv[4]));
}
else partition_tag_vals.push_back(3);
+
+ if (0 == nprocs) {
+ print_usage(argv[0]);
+ return 1;
+ }
MBErrorCode rval;
MBCore *moab = new MBCore[nprocs]();
@@ -74,7 +85,7 @@
CHECK_ERR(rval);
// now 1 layer of hex ghosts
- rval = MBParallelComm::exchange_ghost_cells(&pc[0], nprocs, 3, 0, 1, true);
+ rval = MBParallelComm::exchange_ghost_cells(&pc[0], nprocs, 3, 2, 1, true);
CHECK_ERR(rval);
for (int i = 0; i < nprocs; i++)
Modified: MOAB/trunk/parallel/pcomm_unit.cpp
===================================================================
--- MOAB/trunk/parallel/pcomm_unit.cpp 2009-11-25 21:22:39 UTC (rev 3388)
+++ MOAB/trunk/parallel/pcomm_unit.cpp 2009-11-30 23:06:35 UTC (rev 3389)
@@ -92,8 +92,6 @@
}
MBParallelComm *pcomm = new MBParallelComm( &moab );
- int size = 0;
- std::vector<unsigned char> buff;
std::vector<int> addl_procs;
// get the necessary vertices too
@@ -103,9 +101,12 @@
CHECK_ERR(rval);
entities.merge(tmp_range);
+ MBParallelComm::Buffer buff(MBParallelComm::INITIAL_BUFF_SIZE);
+ buff.reset_ptr(sizeof(int));
rval = pcomm->pack_buffer( entities, false, true, false,
- -1, buff, size);
+ -1, &buff);
CHECK_ERR(rval);
+ buff.set_stored_size();
delete pcomm;
moab.~MBCore();
@@ -118,8 +119,9 @@
std::vector<std::vector<int> > L1p;
std::vector<MBEntityHandle> L2hloc, L2hrem;
std::vector<unsigned int> L2p;
- rval = pcomm->unpack_buffer( &buff[0], false, -1, -1, L1hloc, L1hrem, L1p, L2hloc,
- L2hrem, L2p, entities);
+ buff.reset_ptr(sizeof(int));
+ rval = pcomm->unpack_buffer(buff.buff_ptr, false, -1, -1, L1hloc, L1hrem, L1p, L2hloc,
+ L2hrem, L2p, entities);
CHECK_ERR(rval);
delete pcomm;
Modified: MOAB/trunk/tools/iMesh/iMeshP_unit_tests.cpp
===================================================================
--- MOAB/trunk/tools/iMesh/iMeshP_unit_tests.cpp 2009-11-25 21:22:39 UTC (rev 3388)
+++ MOAB/trunk/tools/iMesh/iMeshP_unit_tests.cpp 2009-11-30 23:06:35 UTC (rev 3389)
@@ -684,7 +684,8 @@
iBase_EntitySetHandle root_set;
iMesh_getRootSet( imesh, &root_set, &ierr );
- iMeshP_loadAll( imesh, prtn, root_set, FILENAME, 0, &ierr, strlen(FILENAME), 0 );
+ const char *opt = ";;PARTITION=PARALLEL_PARTITION";
+ iMeshP_loadAll( imesh, prtn, root_set, FILENAME, opt, &ierr, strlen(FILENAME), strlen(opt) );
PCHECK;
More information about the moab-dev
mailing list