[MPICH] threaded test code

Steve Angelovich sangelovich at lgc.com
Tue Jul 25 15:27:47 CDT 2006


All,

I have a case where we need to make MPI call on multiple threads.  The 
code sample below was my attempt to try to
test the scenario.

The main thread creates the bcast_gather thread which broadcasts a small 
amount of information and then gathers
some basic statistics from each node.  Once the broadcast thread is 
running a gather_thread is created that then
moves a bunch of float data around.  Once the gather thread has moved 
all the data it will exit, which then
allows the bcast_thread to exit and then the main exits.

I'm running on Redhat AW 3.0 (32 bit) and am using MPICH2-1.0.4-rc1 and 
testing on 16 nodes.

I run into inconsistent behavior.  Sometimes the bcast_thread hangs 
during the last iteration,
sometimes the MPI_Comm_free() hangs and sometimes the MPI_Finalize() 
call hangs.  I'm also seeing
a very large performance difference with the gather operations as 
compared to running it in a single
threaded scenario.  I would expect to see some differences but the 
numbers I'm seeing are orders of
magnitude slower.

I would appreciate any feedback.  Do I have logic errors or am I pushing 
the threading support beyond
what is currently supported.

Thanks,
Steve






#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include "mpitest.h"
#include "mpithreadtest.h"
#include <sched.h>



#define DO_DEBUG = 1
#ifdef DO_DEBUG
#define DEBUG(_a){ _a ;fflush(stdout);}
#else
#define DEBUG(_a)
#endif

const int GATHER_MSG_LEN = 4096 ;
const int GATHER_ITERATIONS = 10000;

volatile int gather_count = 0 ;
volatile int bcast_done = 0 ;

/* Thread to perform a gather operation.  */
void* gather_thread(void*p) {
    int rank;
    int size ;
    int j=0;
    MPI_Comm *comm_ptr = (MPI_Comm *)p;
    MPI_Comm_rank(*comm_ptr, &rank);
    MPI_Comm_size(*comm_ptr, &size);

    float *pData = malloc(sizeof(float)*GATHER_MSG_LEN);
    float *pResults = malloc(sizeof(float)*GATHER_MSG_LEN*size) ;
   
    DEBUG(printf( "Gather thread started\n"));
    for(j=0;j<GATHER_ITERATIONS;j++) {
        int i=0 ;
        for(i=0;i<GATHER_MSG_LEN;i++)
          pData[i] = rank ;
      gather_count = j ;
      MPI_Gather(pData, GATHER_MSG_LEN, MPI_FLOAT, pResults, 
GATHER_MSG_LEN, MPI_FLOAT, 0, *comm_ptr) ;
    }
    free(pData) ;
    free(pResults) ;
    DEBUG(printf( "Gather thread completed\n"));
}

/* Thread to perform a bcast and then gather */
void* bcast_thread(void*p) {
    int rank;
    int size ;
    int j=0;
    int buf[1] ;
   
    buf[0] = -1 ;
    MPI_Comm *comm_ptr = (MPI_Comm *)p;
    MPI_Comm_rank(*comm_ptr, &rank);
    MPI_Comm_size(*comm_ptr, &size);

    float *pData = malloc(sizeof(int)*5);
    float *pResults = malloc(sizeof(int)*5*size) ;
    DEBUG(printf( "bcast thread started\n"));
    while(1) {
        j++ ;
      if(rank == 0) {
        sleep(2) ;
        DEBUG(printf("bcast iteration: %d gather_count: %d bcast_done: 
%d\n", j, gather_count, bcast_done)) ;
        buf[0] = bcast_done ;
       }
       MPI_Bcast(&buf, 1, MPI_INT, 0, *comm_ptr) ;
       //MPI_Gather(pData, 5, MPI_INT, pResults, 5, MPI_INT, 0, *comm_ptr) ;
       sched_yield() ;
       if(buf[0] != 0) {
            DEBUG(printf("breaking\n")) ;
            sleep(2) ; //let main get to the join before exiting the thread
         break ;
       }
    }
    free(pData) ;
    free(pResults) ;
    DEBUG(printf( "bcast thread completed\n"));
}

int main(int argc, char* argv[]) {
    int rank, size, provided;
    pthread_t thr_g, thr_b;
    pthread_attr_t attr_g, attr_b;
   
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);

    if (provided != MPI_THREAD_MULTIPLE) {
      printf( "This test requires MPI_THREAD_MULTIPLE\n" );
      MPI_Abort( MPI_COMM_WORLD, 1 );
    }

    //get the size and rank
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
   
    MPI_Comm comm_g, comm_b ;
    MPI_Comm_dup( MPI_COMM_WORLD, &comm_g );
    MPI_Comm_dup( MPI_COMM_WORLD, &comm_b );
   
    /* create bcast thread */
    pthread_attr_init(&attr_b);
    pthread_attr_setdetachstate(&attr_b, PTHREAD_CREATE_JOINABLE);
    pthread_create(&thr_b, &attr_b, bcast_thread,(void *)&comm_b);
    pthread_attr_destroy(&attr_b);
  
    //Let the broadcast thread get started before starting the gather thread
    sleep(5) ;
    /* create gather thread */
    pthread_attr_init(&attr_g);
    pthread_attr_setdetachstate(&attr_g, PTHREAD_CREATE_JOINABLE);
    pthread_create(&thr_g, &attr_g, gather_thread,(void *)&comm_g);
    pthread_attr_destroy(&attr_g);

    //wait till the gather completes.
    DEBUG(printf( "waiting on gather to complete\n"));
    pthread_join(thr_g, 0);
    MPI_Barrier(MPI_COMM_WORLD) ;
   
    //let the bcast thread know it is time to exit
    DEBUG(printf( "waiting on bcast to complete\n"));
    bcast_done = 1 ;
    pthread_join(thr_b, 0);
    MPI_Barrier(MPI_COMM_WORLD) ;
  
    DEBUG(printf( "MPI_Comm_free(comm_g)\n"));
    MPI_Comm_free(&comm_g) ;
    DEBUG(printf( "MPI_Comm_free(comm_b)\n"));
    MPI_Comm_free(&comm_b) ;
    DEBUG(printf( "MPI_Finalize\n"));
    MPI_Finalize();

    /* This program works if it gets here */
    if (rank == 0) {
      printf( " No Errors\n" );
    }

    return 0;
}
----------------------------------------------------------------------
This e-mail, including any attached files, may contain confidential and privileged information for the sole use of the intended recipient.  Any review, use, distribution, or disclosure by others is strictly prohibited.  If you are not the intended recipient (or authorized to receive information for the intended recipient), please contact the sender by reply e-mail and delete all copies of this message.




More information about the mpich-discuss mailing list