[MPICH] threaded test code

Steve Angelovich sangelovich at lgc.com
Tue Jul 25 15:27:47 CDT 2006


I have a case where we need to make MPI calls from multiple threads. The
code sample below is my attempt to test this scenario.

The main thread creates the bcast thread (bcast_thread), which broadcasts a
small amount of information and then gathers some basic statistics from each
node. Once the broadcast thread is running, a gather thread (gather_thread)
is created that moves a large amount of float data around. Once the gather
thread has moved all the data it exits, which then allows the bcast thread to
exit, and finally main exits.

I'm running on Red Hat AW 3.0 (32-bit), using MPICH2-1.0.4-rc1, and testing
on 16 nodes.

I run into inconsistent behavior. Sometimes bcast_thread hangs during the
last iteration, sometimes MPI_Comm_free() hangs, and sometimes the
MPI_Finalize() call hangs. I'm also seeing a very large performance
difference in the gather operations compared with running them in a
single-threaded scenario. I would expect some difference, but the numbers
I'm seeing are orders of magnitude slower.

I would appreciate any feedback. Do I have logic errors, or am I pushing the
threading support beyond what is currently supported?


#include "mpi.h"
#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "mpitest.h"
#include "mpithreadtest.h"

/*
 * Trace output.  DEBUG(stmt) runs stmt and then flushes stdout, so trace lines
 * from several threads/ranks show up promptly.  Comment out the DO_DEBUG
 * definition to compile all trace statements away.
 *
 * The do { } while (0) wrapper makes DEBUG(x); behave as a single statement,
 * which keeps it safe inside an unbraced if/else.
 */
#define DO_DEBUG 1
#ifdef DO_DEBUG
#define DEBUG(stmt) do { stmt; fflush(stdout); } while (0)
#else
#define DEBUG(stmt) do { } while (0)
#endif

/* Test sizing used by gather_thread. */
enum {
    GATHER_MSG_LEN = 4096,     /* floats contributed by each rank per MPI_Gather */
    GATHER_ITERATIONS = 10000  /* number of MPI_Gather calls before the thread exits */
};

/*
 * State shared between threads.  gather_thread writes gather_count before each
 * gather; rank 0's bcast_thread prints it.  main sets bcast_done, which rank 0's
 * bcast_thread reads and broadcasts as the stop signal.
 * NOTE(review): volatile only stops the compiler from caching these values; it
 * is not a thread-synchronization primitive.  C11 atomic_int would make the
 * hand-off well defined if the build can rely on C11.
 */
volatile int gather_count = 0;
volatile int bcast_done = 0;

/* Thread to perform a gather operation.  */
void* gather_thread(void*p) {
    int rank;
    int size ;
    int j=0;
    MPI_Comm *comm_ptr = (MPI_Comm *)p;
    MPI_Comm_rank(*comm_ptr, &rank);
    MPI_Comm_size(*comm_ptr, &size);

    float *pData = malloc(sizeof(float)*GATHER_MSG_LEN);
    float *pResults = malloc(sizeof(float)*GATHER_MSG_LEN*size) ;
    DEBUG(printf( "Gather thread started\n"));
    for(j=0;j<GATHER_ITERATIONS;j++) {
        int i=0 ;
          pData[i] = rank ;
      gather_count = j ;
      MPI_Gather(pData, GATHER_MSG_LEN, MPI_FLOAT, pResults, 
GATHER_MSG_LEN, MPI_FLOAT, 0, *comm_ptr) ;
    free(pData) ;
    free(pResults) ;
    DEBUG(printf( "Gather thread completed\n"));

/* Thread to perform a bcast and then gather */
void* bcast_thread(void*p) {
    int rank;
    int size ;
    int j=0;
    int buf[1] ;
    buf[0] = -1 ;
    MPI_Comm *comm_ptr = (MPI_Comm *)p;
    MPI_Comm_rank(*comm_ptr, &rank);
    MPI_Comm_size(*comm_ptr, &size);

    float *pData = malloc(sizeof(int)*5);
    float *pResults = malloc(sizeof(int)*5*size) ;
    DEBUG(printf( "bcast thread started\n"));
    while(1) {
        j++ ;
      if(rank == 0) {
        sleep(2) ;
        DEBUG(printf("bcast iteration: %d gather_count: %d bcast_done: 
%d\n", j, gather_count, bcast_done)) ;
        buf[0] = bcast_done ;
       MPI_Bcast(&buf, 1, MPI_INT, 0, *comm_ptr) ;
       //MPI_Gather(pData, 5, MPI_INT, pResults, 5, MPI_INT, 0, *comm_ptr) ;
       sched_yield() ;
       if(buf[0] != 0) {
            DEBUG(printf("breaking\n")) ;
            sleep(2) ; //let main get to the join before exiting the thread
         break ;
    free(pData) ;
    free(pResults) ;
    DEBUG(printf( "bcast thread completed\n"));

/*
 * Test driver.  Requires MPI_THREAD_MULTIPLE.  Starts bcast_thread and then
 * gather_thread, each on its own duplicate of MPI_COMM_WORLD so that their
 * concurrent collectives cannot match each other.  Waits for the gather
 * thread, sets bcast_done so the bcast thread stops, then frees the
 * communicators and finalizes MPI.  Rank 0 prints " No Errors" only after
 * MPI_Finalize has returned.
 */
int main(int argc, char* argv[]) {
    int rank, provided;
    pthread_t thr_g, thr_b;
    MPI_Comm comm_g, comm_b;

    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    if (provided != MPI_THREAD_MULTIPLE) {
        printf( "This test requires MPI_THREAD_MULTIPLE\n" );
        MPI_Abort( MPI_COMM_WORLD, 1 );
    }

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_dup( MPI_COMM_WORLD, &comm_g );
    MPI_Comm_dup( MPI_COMM_WORLD, &comm_b );

    /* Threads are joinable by default, so no attribute object is needed
     * (an attribute object must be pthread_attr_init'ed before any use). */
    if (pthread_create(&thr_b, NULL, bcast_thread, &comm_b) != 0) {
        fprintf(stderr, "Unable to create bcast thread\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    /* Let the broadcast thread get started before starting the gather thread. */
    sleep(5);
    if (pthread_create(&thr_g, NULL, gather_thread, &comm_g) != 0) {
        fprintf(stderr, "Unable to create gather thread\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    DEBUG(printf( "waiting on gather to complete\n"));
    pthread_join(thr_g, NULL);
    MPI_Barrier(MPI_COMM_WORLD);

    /* Rank 0's bcast thread picks this up and broadcasts it to all ranks. */
    DEBUG(printf( "waiting on bcast to complete\n"));
    bcast_done = 1;
    pthread_join(thr_b, NULL);
    MPI_Barrier(MPI_COMM_WORLD);

    DEBUG(printf( "MPI_Comm_free(comm_g)\n"));
    MPI_Comm_free(&comm_g);
    DEBUG(printf( "MPI_Comm_free(comm_b)\n"));
    MPI_Comm_free(&comm_b);
    DEBUG(printf( "MPI_Finalize\n"));
    MPI_Finalize();

    /* This program works if it gets here. */
    if (rank == 0) {
        printf( " No Errors\n" );
    }
    return 0;
}
