[MPICH] threaded test code
Rajeev Thakur
thakur at mcs.anl.gov
Tue Jul 25 17:14:30 CDT 2006
Steve,
Your code is probably running into a known thread-related bug in
1.0.4-rc1, which has since been fixed. Your program runs fine with our
latest code -- fine in the sense that it doesn't hang. I don't know about
performance. I ran it with as many as 17 processes on a single machine
(after reducing GATHER_ITERATIONS to 100).
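For reference, a run along those lines would look something like this
(the binary name is just a placeholder for however you build the test):

    mpiexec -n 17 ./mpithreadtest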
Rajeev
> -----Original Message-----
> From: owner-mpich-discuss at mcs.anl.gov
> [mailto:owner-mpich-discuss at mcs.anl.gov] On Behalf Of Steve Angelovich
> Sent: Tuesday, July 25, 2006 3:28 PM
> To: mpich-discuss at mcs.anl.gov
> Subject: [MPICH] threaded test code
>
> All,
>
> I have a case where we need to make MPI calls on multiple threads.
> The code sample below was my attempt to test the scenario.
>
> The main thread creates the bcast_thread, which broadcasts a small
> amount of information and then gathers some basic statistics from
> each node. Once the broadcast thread is running, a gather_thread is
> created that moves a large amount of float data around. Once the
> gather thread has moved all the data it exits, which then allows the
> bcast_thread to exit, after which main exits.
>
> I'm running on Redhat AW 3.0 (32 bit), using MPICH2-1.0.4-rc1, and
> testing on 16 nodes.
>
> I run into inconsistent behavior. Sometimes the bcast_thread hangs
> during the last iteration, sometimes the MPI_Comm_free() call hangs,
> and sometimes the MPI_Finalize() call hangs. I'm also seeing a very
> large performance difference in the gather operations compared to
> running them single-threaded. I would expect some difference, but
> the numbers I'm seeing are orders of magnitude slower.
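>
> (To put a rough number on that, here is a minimal timing sketch -- it
> is not part of the program below and reuses the variables from
> gather_thread -- wrapping the gather loop in MPI_Wtime:
>
>     double t0 = MPI_Wtime();
>     for (j = 0; j < GATHER_ITERATIONS; j++)
>         MPI_Gather(pData, GATHER_MSG_LEN, MPI_FLOAT, pResults,
>                    GATHER_MSG_LEN, MPI_FLOAT, 0, *comm_ptr);
>     double t1 = MPI_Wtime();
>     if (rank == 0)
>         printf("avg gather: %g s\n", (t1 - t0) / GATHER_ITERATIONS);
>
> Comparing the average with and without the bcast thread running would
> isolate the threading overhead.)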
>
> I would appreciate any feedback. Do I have logic errors, or am I
> pushing the threading support beyond what is currently supported?
>
> Thanks,
> Steve
>
> #include "mpi.h"
> #include <stdio.h>
> #include <stdlib.h>
> #include "mpitest.h"
> #include "mpithreadtest.h"
> #include <sched.h>
>
> #define DO_DEBUG 1
> #ifdef DO_DEBUG
> #define DEBUG(_a) { _a; fflush(stdout); }
> #else
> #define DEBUG(_a)
> #endif
>
> const int GATHER_MSG_LEN = 4096 ;
> const int GATHER_ITERATIONS = 10000;
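> /* With 16 ranks, each gather delivers GATHER_MSG_LEN * size floats
>  * to the root: 4096 floats * 4 B * 16 ranks = 256 KB per iteration. */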
>
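> /* Shared between main and the threads. volatile keeps the polling
>  * reads from being optimized away, though it is not a full memory
>  * barrier. */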
> volatile int gather_count = 0 ;
> volatile int bcast_done = 0 ;
>
> /* Thread to perform a gather operation. */
> void *gather_thread(void *p) {
> int rank;
> int size ;
> int j=0;
> MPI_Comm *comm_ptr = (MPI_Comm *)p;
> MPI_Comm_rank(*comm_ptr, &rank);
> MPI_Comm_size(*comm_ptr, &size);
>
> float *pData = malloc(sizeof(float)*GATHER_MSG_LEN);
> float *pResults = malloc(sizeof(float)*GATHER_MSG_LEN*size) ;
>
> DEBUG(printf( "Gather thread started\n"));
> for(j=0;j<GATHER_ITERATIONS;j++) {
> int i=0 ;
> for(i=0;i<GATHER_MSG_LEN;i++)
> pData[i] = rank ;
> gather_count = j ;
> MPI_Gather(pData, GATHER_MSG_LEN, MPI_FLOAT, pResults,
>            GATHER_MSG_LEN, MPI_FLOAT, 0, *comm_ptr);
> }
> free(pData) ;
> free(pResults) ;
> DEBUG(printf( "Gather thread completed\n"));
> return NULL;
> }
>
> /* Thread to perform a bcast and then gather */
> void *bcast_thread(void *p) {
> int rank;
> int size ;
> int j=0;
> int buf[1] ;
>
> buf[0] = -1 ;
> MPI_Comm *comm_ptr = (MPI_Comm *)p;
> MPI_Comm_rank(*comm_ptr, &rank);
> MPI_Comm_size(*comm_ptr, &size);
>
> int *pData = malloc(sizeof(int)*5);   /* for the commented-out gather below */
> int *pResults = malloc(sizeof(int)*5*size);
> DEBUG(printf( "bcast thread started\n"));
> while(1) {
> j++ ;
> if(rank == 0) {
> sleep(2) ;
> DEBUG(printf("bcast iteration: %d gather_count: %d
> bcast_done:
> %d\n", j, gather_count, bcast_done)) ;
> buf[0] = bcast_done ;
> }
> MPI_Bcast(&buf, 1, MPI_INT, 0, *comm_ptr) ;
> //MPI_Gather(pData, 5, MPI_INT, pResults, 5, MPI_INT, 0, *comm_ptr);
> sched_yield() ;
> if(buf[0] != 0) {
> DEBUG(printf("breaking\n")) ;
> sleep(2); /* let main get to the join before exiting the thread */
> break ;
> }
> }
> free(pData) ;
> free(pResults) ;
> DEBUG(printf( "bcast thread completed\n"));
> return NULL;
> }
>
> int main(int argc, char* argv[]) {
> int rank, size, provided;
> pthread_t thr_g, thr_b;
> pthread_attr_t attr_g, attr_b;
>
> MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
>
> if (provided != MPI_THREAD_MULTIPLE) {
> printf( "This test requires MPI_THREAD_MULTIPLE\n" );
> MPI_Abort( MPI_COMM_WORLD, 1 );
> }
>
> //get the size and rank
> MPI_Comm_rank(MPI_COMM_WORLD, &rank);
> MPI_Comm_size(MPI_COMM_WORLD, &size);
>
> MPI_Comm comm_g, comm_b ;
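> /* Give each thread its own duplicated communicator: two collectives
>  * may not run concurrently on the same communicator, so the bcast
>  * and gather threads must not share one. */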
> MPI_Comm_dup( MPI_COMM_WORLD, &comm_g );
> MPI_Comm_dup( MPI_COMM_WORLD, &comm_b );
>
> /* create bcast thread */
> pthread_attr_init(&attr_b);
> pthread_attr_setdetachstate(&attr_b, PTHREAD_CREATE_JOINABLE);
> pthread_create(&thr_b, &attr_b, bcast_thread,(void *)&comm_b);
> pthread_attr_destroy(&attr_b);
>
> // Let the broadcast thread get started before starting the gather thread
> sleep(5) ;
> /* create gather thread */
> pthread_attr_init(&attr_g);
> pthread_attr_setdetachstate(&attr_g, PTHREAD_CREATE_JOINABLE);
> pthread_create(&thr_g, &attr_g, gather_thread,(void *)&comm_g);
> pthread_attr_destroy(&attr_g);
>
> //wait till the gather completes.
> DEBUG(printf( "waiting on gather to complete\n"));
> pthread_join(thr_g, 0);
> MPI_Barrier(MPI_COMM_WORLD) ;
>
> // Let the bcast thread know it is time to exit; rank 0 will read
> // the flag and broadcast it to the other ranks.
> bcast_done = 1 ;
> DEBUG(printf( "waiting on bcast to complete\n"));
> pthread_join(thr_b, 0);
> MPI_Barrier(MPI_COMM_WORLD) ;
>
> DEBUG(printf( "MPI_Comm_free(comm_g)\n"));
> MPI_Comm_free(&comm_g) ;
> DEBUG(printf( "MPI_Comm_free(comm_b)\n"));
> MPI_Comm_free(&comm_b) ;
> DEBUG(printf( "MPI_Finalize\n"));
> MPI_Finalize();
>
> /* This program works if it gets here */
> if (rank == 0) {
> printf( " No Errors\n" );
> }
>
> return 0;
> }