[mpich-discuss] MPI queue management
Robert Kubrick
robertkubrick at gmail.com
Tue Jul 15 11:06:18 CDT 2008
I need to write a simple 1-Publisher/Multiple-Subscribers queue to
distribute messages from a master producer process to N slave
consumers. Each message must be delivered asynchronously and only
once to the first available slave. The slave will process the message
and do some work before returning to accept new messages from the
master queue.
MPI offers a number of collective functions to distribute messages
across a communicator, but AFAIK none of the API functions will do the
kind of message distribution I described. To be fair, the goal of MPI
is not to offer smart routing or publisher/subscriber
functionality. I am only listing these functions as background on
basic MPI message distribution, to make my issue easier to
understand:
- MPI_Bcast sends the same buffer to *all* the processes in the
communicator. This is different from assigning a single message to
each slave, as all the messages in the MPI_Bcast buffer are copied to
each slave.
- MPI_Scatter comes closer to the distribution I want. This function
sends each message in the input buffer to a different slave. However,
the assignment is fixed by rank (the i-th block always goes to rank i),
so there is no way to route a message to whichever slave is free.
All processes in the communicator simply receive an
equal number of messages. This function is the inverse of MPI_Gather.
Both MPI_Bcast and MPI_Scatter are collective over the communicator,
which means every process has to take part in each call and the
publisher may be held inside the call until the slaves have joined it.
This is a problem in my case
because I want the publisher to keep receiving messages from its own
input source and add them to the queue.
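To make the difference concrete, here is a small stand-alone sketch in plain C++ (no MPI calls) that compares a fixed, rank-based split like MPI_Scatter's with handing each message to whichever slave frees up first. The function names, the per-message cost model, and the assumption that all messages are available at time zero are illustrative only and not part of MPI.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Finish time when the messages are cut into equal contiguous blocks and
// block s is given to slave s, mimicking MPI_Scatter's rank-based layout.
// cost[i] is the (hypothetical) processing time of message i.
double scatterStyleFinish(const std::vector<double>& cost, std::size_t n) {
    assert(n > 0 && cost.size() % n == 0);
    const std::size_t perSlave = cost.size() / n;
    double worst = 0.0;
    for (std::size_t s = 0; s < n; ++s) {
        double total = 0.0;
        for (std::size_t i = 0; i < perSlave; ++i)
            total += cost[s * perSlave + i];
        worst = std::max(worst, total);
    }
    return worst;
}

// Finish time when each message, in order, goes to the slave that becomes
// free first (ties go to the lowest-numbered slave).
double onDemandFinish(const std::vector<double>& cost, std::size_t n) {
    assert(n > 0);
    std::vector<double> busyUntil(n, 0.0);
    for (double c : cost) {
        auto firstFree = std::min_element(busyUntil.begin(), busyUntil.end());
        *firstFree += c;
    }
    return *std::max_element(busyUntil.begin(), busyUntil.end());
}
```

With costs {8, 8, 1, 1} and two slaves, the scatter-style split gives both expensive messages to the first slave, while the on-demand policy spreads them out.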
So I came up with a solution based on the shared counter example in
Using MPI, section 7.1.5. The idea is to let the slaves send a
request when they're ready to accept a message from the queue. The
publisher posts non-blocking receives with MPI_Irecv and polls them
with MPI_Testsome, so it can read incoming messages from its own input
channel and listen to slave requests at the same time:
Publisher pseudo-code:
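#include <mpi.h>
#include <queue>
#include <vector>

#define SLAVEREQ_TAG 1  /* slave -> publisher: ready for work */
#define NEWMSG_TAG   2  /* input source -> publisher: new message */
#define SLAVERES_TAG 3  /* publisher -> slave: message to process */

/* comm, othercomm and REMOTE_SENDER are assumed to be set up elsewhere;
   the publisher is assumed to be rank 0 in comm. */
std::queue<double> myQueue;      /* messages waiting for a slave */
std::queue<int>    readySlaves;  /* slaves waiting for a message */
int size, outcount;
double inmsg;

MPI_Comm_size(comm, &size);
/* Slot 0 is the input channel, slot r is the request from slave r */
std::vector<MPI_Request> reqs(size);
std::vector<int>         indices(size);
std::vector<MPI_Status>  statuses(size);

/* Issue a receive on the message input channel */
MPI_Irecv(&inmsg, 1, MPI_DOUBLE, REMOTE_SENDER, NEWMSG_TAG,
          othercomm, &reqs[0]);
/* Issue a receive for each slave process (no data, only tag required) */
for (int r = 1; r < size; r++)
    MPI_Irecv(MPI_BOTTOM, 0, MPI_INT, r, SLAVEREQ_TAG, comm, &reqs[r]);

while (1)
{
    MPI_Testsome(size, reqs.data(), &outcount, indices.data(),
                 statuses.data());
    if (outcount == 0 || outcount == MPI_UNDEFINED)
        continue;
    for (int k = 0; k < outcount; k++) {
        int idx = indices[k];
        if (idx == 0) {
            /* New incoming message: queue it, repost the receive */
            myQueue.push(inmsg);
            MPI_Irecv(&inmsg, 1, MPI_DOUBLE, REMOTE_SENDER, NEWMSG_TAG,
                      othercomm, &reqs[0]);
        } else {
            /* Slave idx is ready: remember it, repost its request receive */
            readySlaves.push(idx);
            MPI_Irecv(MPI_BOTTOM, 0, MPI_INT, idx, SLAVEREQ_TAG, comm,
                      &reqs[idx]);
        }
    }
    /* New messages are queued above before anything is sent.
       Hand queued messages to waiting slaves, first come first served. */
    while (!myQueue.empty() && !readySlaves.empty()) {
        double msg = myQueue.front();
        myQueue.pop();
        int requester = readySlaves.front();
        readySlaves.pop();
        MPI_Send(&msg, 1, MPI_DOUBLE, requester, SLAVERES_TAG, comm);
    }
    /* Do some extra work.... */
}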
Slave pseudo-code:
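double msg;
while ( 1 ) {
    /* Send an empty request, then wait for the next message.
       PUBLISHER_RANK is assumed to be the publisher's rank in comm. */
    MPI_Sendrecv(MPI_BOTTOM, 0, MPI_INT, PUBLISHER_RANK, SLAVEREQ_TAG,
                 &msg, 1, MPI_DOUBLE, PUBLISHER_RANK, SLAVERES_TAG,
                 comm, MPI_STATUS_IGNORE);
    /* Do some work with received message.... */
}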
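The matching step at the heart of this scheme can be checked without MPI. The sketch below is a hypothetical helper (the Dispatcher class and its method names are not part of MPI or of the pseudo-code above) that pairs queued messages with slaves that have announced they are ready, both in arrival order. It assumes rank 0 is the publisher. It shows that each message is handed out exactly once, and that a slave request arriving while the queue is empty is remembered instead of being dropped.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical helper, not part of MPI: keeps one FIFO of pending messages
// and one FIFO of ready slaves, and pairs them up whenever both are non-empty.
class Dispatcher {
public:
    using Assignment = std::pair<int, double>;  // (slave rank, message)

    // A new message arrived on the input channel.
    std::vector<Assignment> onMessage(double msg) {
        messages_.push(msg);
        return match();
    }

    // A slave sent its "ready" request (SLAVEREQ_TAG in the pseudo-code).
    std::vector<Assignment> onSlaveReady(int rank) {
        assert(rank > 0 && "rank 0 is assumed to be the publisher");
        readySlaves_.push(rank);
        return match();
    }

    std::size_t pendingMessages() const { return messages_.size(); }
    std::size_t waitingSlaves() const { return readySlaves_.size(); }

private:
    // Hand out messages while both a message and a ready slave exist.
    std::vector<Assignment> match() {
        std::vector<Assignment> out;
        while (!messages_.empty() && !readySlaves_.empty()) {
            out.emplace_back(readySlaves_.front(), messages_.front());
            readySlaves_.pop();
            messages_.pop();
        }
        return out;
    }

    std::queue<double> messages_;
    std::queue<int> readySlaves_;
};
```

In a real publisher loop, each returned assignment would correspond to one MPI_Send to the given rank.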
Is there any other way this could be implemented? I would appreciate
any comment or reference to work done in this area.
Thanks,
Rob.