[mpich-discuss] MPI_Recv: MPI + Pthreads

Olya Krachina okrachin at purdue.edu
Tue Apr 22 23:16:06 CDT 2008


> Can you send us a sample code that fails? I'm assuming you are using 
> mpich2-1.0.7 (the latest version).
Yes, I am using the latest version. Here's a portion of the code:

#define NUMTHREADS 8
pthread_t threads[NUMTHREADS];

#define TAG_MATRIX_PARTITION	0x1234

typedef struct
{  unsigned int   m, n;  // Rows, cols
   int        *data;  // Data, ordered by row, then by col
   int       **rows;  // Pointers to rows in data
} TMatrix;

// Globals
int processor_rank  = 0;
int processor_count = 1;

typedef struct
{
        TMatrix A;
        TMatrix B;
        TMatrix C;
        int thread_id;

} thread_args; //pass this to the threads 
thread_args thread_data[NUMTHREADS];

int main (int argc, char *argv[])
{  MPI_Status   status;
   TMatrix      A,B,C,D;
   
      MPI_Init(&argc, &argv);
      MPI_Comm_size (MPI_COMM_WORLD, &processor_count);
      MPI_Comm_rank (MPI_COMM_WORLD, &processor_rank );

      if (processor_rank == 0)
      {  
      	printf ("Enter matrix dimension n : "); scanf("%u", &n);
         // Allocate memory for matrices
	 // Initialize matrices
         
         // Broadcast (send) size of matrix
         MPI_Bcast((void *)&n, 1, MPI_INT, 0, MPI_COMM_WORLD); 
         m = n / processor_count;
     
         // Broadcast (send) B matrix
         MPI_Bcast((void *)B.data, n*n, MPI_INT, 0, MPI_COMM_WORLD);

         // Send each process its own part of A
	 offset = n - m * (processor_count - 1);
	 for (i = 1; i < processor_count; i++)
	 {
	    MPI_Send((void *)A.rows[offset], m*n, MPI_INT,
	             i, TAG_MATRIX_PARTITION, MPI_COMM_WORLD);
	    offset += m; // update the offset
	}
	 A.m = n - m * (processor_count -1);
	
	 for (x = 0; x < NUMTHREADS; x++)
                {
                        //pass in the structure A, B, C and thread_id;
                        thread_data[x].A = A;
                        thread_data[x].B = B;
                        thread_data[x].C = C;
                        thread_data[x].thread_id = x;
                        pthread_create(&threads[x], NULL, doMyWork,
                           (void*)&thread_data[x]);
		
                }
                for (x = 0; x < NUMTHREADS; x++)
                        pthread_join(threads[x], NULL);
			
	 	A.m = n; //reset A's dimension
	 // Receive part of C matrix from each process
	 offset = n - m * (processor_count - 1);
	 
	 for (i = 1; i < processor_count; i++)
	 {  
	 	//something is WROOOONG here!
	 	MPI_Recv((void *)C.rows[offset], m*n, MPI_INT,
	             i, TAG_MATRIX_PARTITION, MPI_COMM_WORLD, &status);
	        offset += m;
	 }
         
      }
      else
      {
         // Broadcast (receive) size of matrix
         MPI_Bcast((void *)&n, 1, MPI_INT, 0, MPI_COMM_WORLD); 

         // Allocate memory for matrices
         m = n / processor_count;
	 A = createMatrix(m, n); //A is of a different size
	 B = createMatrix(n ,n);
     
         // Broadcast (receive) B matrix, everyone has to do this
         MPI_Bcast((void *)B.data, n*n, MPI_INT, 0, MPI_COMM_WORLD);
	 MPI_Recv((void *)A.data, m*n, MPI_INT, 0, TAG_MATRIX_PARTITION,
                 MPI_COMM_WORLD, &status);

	        for (x = 0; x < NUMTHREADS; x++)
                {
                        thread_data[x].A = A;
                        thread_data[x].B = B;
                        thread_data[x].C = C;
                        thread_data[x].thread_id = x;
                        pthread_create(&threads[x], NULL, doMyWork, 
                                           (void*)&thread_data[x]);

                }
                for (x = 0; x < NUMTHREADS; x++)
                        pthread_join(threads[x], NULL);

	  MPI_Send((void *)C.data, m*n, MPI_INT, 0, 
                TAG_MATRIX_PARTITION,MPI_COMM_WORLD);

      }
   MPI_Barrier(MPI_COMM_WORLD);
   MPI_Finalize();
   pthread_exit(NULL);

   return 0;
}
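
For reference, the row partitioning that main() uses can be isolated into a small helper. This is only a sketch: row_block and RowBlock are hypothetical names that do not appear in the code above. It assumes, as the code does, that every non-root process gets m = n / processor_count rows and that rank 0 keeps the remainder.

```c
/* Hypothetical helper (not part of the posted code): describes which rows
   of A a given rank owns under the partitioning used in main(). */
typedef struct {
    int first_row;  /* index of the first row owned by the rank */
    int row_count;  /* number of rows owned by the rank */
} RowBlock;

RowBlock row_block(int n, int p, int rank)
{
    int m = n / p;                    /* rows for each non-root process */
    int root_rows = n - m * (p - 1);  /* rank 0 keeps m plus the remainder */
    RowBlock b;

    if (rank == 0) {
        b.first_row = 0;
        b.row_count = root_rows;
    } else {
        /* same value as the running `offset` in the send/receive loops */
        b.first_row = root_rows + (rank - 1) * m;
        b.row_count = m;
    }
    return b;
}
```

With n = 10 and 4 processes, rank 0 owns rows 0..3 and ranks 1 to 3 own two rows each, starting at rows 4, 6 and 8.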

Each thread does the following: it takes the portion of A assigned to its MPI
process, splits it into narrower strips of rows, multiplies its strip by B, and
writes the result into the corresponding rows of C.

void *doMyWork(void *thread_datas)
{
        int i, j, k, r, N;
        thread_args *my_data_ptr = NULL;
        my_data_ptr = (thread_args *)thread_datas;
        int Id = my_data_ptr->thread_id; //thread_id
        int A_rows = 0;

        TMatrix myA, myB, myC;
        myA = my_data_ptr->A;
        myB = my_data_ptr->B;
        myC = my_data_ptr->C;
        N = my_data_ptr->B.n;
        A_rows = my_data_ptr->A.m;

        // ">> 3" divides by 8, i.e. by NUMTHREADS
        int from = (Id*A_rows) >> 3;
        int to = ((Id+1)*A_rows) >> 3;
	
        for (i = from;  i < to; i++)
        {
          for (j = 0; j < N; j++)
          {
             r = 0;
             for (k = 0; k < N; k++)
             {
                r = r +  myA.rows[i][k]*myB.rows[k][j];
             }
                myC.rows[i][j] = myC.rows[i][j] + r;
          }
        }
        return NULL; // a pthread start routine must return a void *
}
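
The strip computation in doMyWork can be written so that it does not depend on the thread count being 8. This is a sketch only; thread_rows is a hypothetical name, and it assumes non-negative arguments, for which dividing by 8 gives the same result as the shift by 3.

```c
/* Hypothetical helper (not part of the posted code): computes the half-open
   row range [from, to) that thread `id` handles out of `rows` local rows. */
void thread_rows(int id, int nthreads, int rows, int *from, int *to)
{
    *from = (id * rows) / nthreads;
    *to   = ((id + 1) * rows) / nthreads;
}
```

Consecutive threads get adjacent ranges, so every row is covered exactly once, and a thread gets an empty range when there are fewer rows than threads.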

> Btw, the most common error people make while using threads (assuming all 
> threads perform MPI calls) is to use MPI_Init() -- you should use 
> MPI_Init_thread().
I will try that. I am not sure what you mean, though: I am using threads within
each MPI process, i.e. each MPI process is an 8-pthread application.

Thanks again.



