[mpich-discuss] MPI queue management

Robert Kubrick robertkubrick at gmail.com
Tue Jul 15 11:06:18 CDT 2008


I need to write a simple 1-Publisher/Multiple-Subscribers queue to  
distribute messages from a master producer process to N slave  
consumers. Each message must be delivered asynchronously and only  
once to the first available slave. The slave will process the message  
and do some work before returning to accept new messages from the  
master queue.

MPI offers a number of collective functions to distribute messages
across a communicator, but AFAIK none of the API functions will do the
kind of message distribution I described. To be fair, the goal of MPI
is not to offer smart routing or publisher/subscriber
functionality. I am only listing these functions as background on
basic MPI message distribution, to better explain my issue:

- MPI_Bcast sends one or more messages to *all* the processes in the  
communicator. This is different than assigning a single message to  
each slave as all the messages in the MPI_Bcast buffer are copied to  
each slave.

- MPI_Scatter comes closer to the distribution I want. This function
sends each message in the input buffer to a different slave. However,
the assignment is fixed by rank (the i-th block goes to rank i), so
there is no way to route a message to whichever slave happens to be
free. All processes in the communicator simply receive an equal number
of messages. This function is the inverse of MPI_Gather.
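
To make the MPI_Scatter point concrete, here is a small model of how the root's buffer is split. It makes no MPI calls; scatterChunk is a hypothetical helper name that only mimics the layout rule, in which rank r gets the r-th block of count elements.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical model of the MPI_Scatter data layout (no MPI calls):
// rank r receives elements [r*count, (r+1)*count) of the root's buffer.
std::vector<double> scatterChunk(const std::vector<double>& sendbuf,
                                 std::size_t count, std::size_t rank) {
    auto first = sendbuf.begin() + static_cast<std::ptrdiff_t>(rank * count);
    return std::vector<double>(first, first + static_cast<std::ptrdiff_t>(count));
}
```

The block a slave receives depends only on its rank, never on whether it is idle, which is why this does not fit a "first available slave" queue.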

Both MPI_Bcast and MPI_Scatter are collective over the communicator,
which means every process has to enter the call, and the publisher
may be held up in it until the slaves do. This is a problem in my case
because I want the publisher to keep receiving messages from its own
input source and add them to the queue.

So I came up with a solution based on the shared counter example in
Using MPI, section 7.1.5. The idea is to let the slaves send a
request when they're ready to accept a message from the queue. The
publisher posts non-blocking receives with MPI_Irecv and polls them
with MPI_Testsome, so it can read incoming messages from its own
input channel and listen for slave requests at the same time:

Publisher pseudo-code:

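#include <mpi.h>
#include <queue>

#define SLAVEREQ_TAG 1
#define NEWMSG_TAG 2
#define SLAVERES_TAG 3 /* Reply tag used below; the value 3 is arbitrary */

std::queue<double> myQueue;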

MPI_Request reqs[10];
int indices[10];
MPI_Status statuses[10];
int outcount;

double inmsg;
int size;
MPI_Comm_size(comm, &size);

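/* Unused slots must be MPI_REQUEST_NULL so MPI_Testsome ignores them */
for( int i = 0; i < 10; i++ )
   reqs[i] = MPI_REQUEST_NULL;

/* Issue a receive for each slave process.
   Assumes size <= 9: slot 9 is reserved for the input channel. */
for( int i = 0; i < size; i++ )
   MPI_Irecv(MPI_BOTTOM, 0, MPI_INT, i, SLAVEREQ_TAG, comm, &reqs[i]); /* No data, only tag required */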

/* Issue a receive on the message input channel */
MPI_Irecv(&inmsg, 1, MPI_DOUBLE, REMOTE_SENDER, NEWMSG_TAG, othercomm, &reqs[9]);

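std::queue<int> idleSlaves; /* Slaves that asked for work and have not been served yet */

while ( 1 )
{
   MPI_Testsome(10, reqs, &outcount, indices, statuses);

   for( int k = 0; k < outcount; k++ ) {
     if( indices[k] == 9 ) {
       /* New incoming message: queue it and repost the receive on the input channel */
       myQueue.push(inmsg);
       MPI_Irecv(&inmsg, 1, MPI_DOUBLE, REMOTE_SENDER, NEWMSG_TAG, othercomm, &reqs[9]);
     } else {
       /* Slave request: remember the requester so it is not lost if the queue is empty */
       idleSlaves.push(statuses[k].MPI_SOURCE);
     }
   }

   /* Hand queued messages to waiting slaves */
   while( !myQueue.empty() && !idleSlaves.empty() ) {
     int requester = idleSlaves.front();
     idleSlaves.pop();
     double msg = myQueue.front(); /* pop() returns void, so read front() first */
     myQueue.pop();
     MPI_Send(&msg, 1, MPI_DOUBLE, requester, SLAVERES_TAG, comm);

     /* Repost slave request receive (slot index equals the slave's rank) */
     MPI_Irecv(MPI_BOTTOM, 0, MPI_INT, requester, SLAVEREQ_TAG, comm, &reqs[requester]);
   }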

   /* Do some extra work.... */
}
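
One detail worth isolating from the loop above is what happens to a slave request that completes while the queue is empty: it has to be remembered, otherwise that slave never gets a reply. Below is a minimal sketch of that bookkeeping with no MPI calls. The Dispatcher class and its method names are hypothetical, and serving slaves in arrival order is my assumption.

```cpp
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical helper (not part of MPI): pairs queued messages with slaves
// that have announced they are ready, in arrival order on both sides.
class Dispatcher {
public:
    // Called when a message arrives on the input channel.
    void onMessage(double msg) { messages_.push(msg); }

    // Called when a slave's "ready" request has completed.
    void onSlaveReady(int rank) { idle_.push(rank); }

    // Returns the (rank, message) pairs that can be sent now. Each message
    // and each ready request is consumed exactly once.
    std::vector<std::pair<int, double>> drain() {
        std::vector<std::pair<int, double>> out;
        while (!messages_.empty() && !idle_.empty()) {
            out.emplace_back(idle_.front(), messages_.front());
            idle_.pop();
            messages_.pop();
        }
        return out;
    }

    std::size_t pendingMessages() const { return messages_.size(); }
    std::size_t idleSlaves() const { return idle_.size(); }

private:
    std::queue<double> messages_;  // messages not yet assigned
    std::queue<int> idle_;         // ranks waiting for work
};
```

In the publisher loop, each pair returned by drain() would correspond to one MPI_Send followed by reposting that slave's MPI_Irecv.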


Slave pseudo-code:

while ( 1 ) {
   MPI_Sendrecv(..., SLAVEREQ_TAG, ...);
   /* Do some work with received message.... */
}


Is there any other way this could be implemented? I would appreciate  
any comment or reference to work done in this area.

Thanks,
Rob.