[mpich-discuss] How to implement this case
Xiao Li
shinelee.thewise at gmail.com
Tue Jan 4 16:36:30 CST 2011
Hi Eric,
I tried to mix MPI_Waitany, MPI_Isend and MPI_Irecv in my code. But I still
get error like
Fatal error in PMPI_Wait: Other MPI error, error stack:
PMPI_Wait(179).......................: MPI_Wait(request=0012FF24,
status00000001) failed
MPIR_Wait_impl(69)...................:
MPIDI_CH3I_Progress(335).............:
MPID_nem_mpich2_blocking_recv(906)...:
MPID_nem_newtcp_module_poll(37)......:
MPID_nem_newtcp_module_connpoll(2655):
gen_write_fail_handler(1194).........: write to socket failed - The
specified network name is no longer available.
Fatal error in MPI_Finalize: Other MPI error, error stack:
MPI_Finalize(281)....................: MPI_Finalize failed
MPI_Finalize(209)....................:
MPID_Finalize(118)...................:
MPIDI_CH3U_VC_WaitForClose(358)......: an error occurred while the device
was waiting for all open connections to close
MPIDI_CH3I_Progress(335).............:
MPID_nem_mpich2_blocking_recv(906)...:
MPID_nem_newtcp_module_poll(37)......:
MPID_nem_newtcp_module_connpoll(2655):
gen_read_fail_handler(1145)..........: read from socket failed - The
specified network name is no longer available.
Below is the sampling code making these error.
int test_mpi_wait_2(int argc, char* argv[])
{
int rank;
int numprocs;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
int trunk_size = 7;
int buf_size = 10000;
if(rank == 0)
{
int** b = new int* [numprocs];
for(int i=0;i<numprocs;i++)
b[i] = new int [buf_size];
MPI_Request* requests = new MPI_Request[numprocs];
for(int i=1;i<numprocs;i++)
MPI_Irecv(b[i], buf_size, MPI_INT, i, 0, MPI_COMM_WORLD, &requests[i]);
vector<int> counter(numprocs);
MPI_Status status;
for(int i=0;i<(numprocs-1)*trunk_size;i++)
{
int active_index;
MPI_Waitany(numprocs-1, requests+1, &active_index, &status);
int request_index = active_index + 1;
int procs_index = active_index + 1;
cout<<"get " <<b[procs_index][0]<<b[procs_index][1]<<" from
"<<procs_index<<endl;
if(++counter[procs_index] != trunk_size)
{
cout<<"begin receive next trunk from "<<procs_index<<" remained as
"<<trunk_size - counter[procs_index]<<endl;
MPI_Irecv(b[procs_index], buf_size, MPI_INT, procs_index, 0, MPI_COMM_WORLD,
&requests[request_index]);
}
else
cout<<"finished at "<<procs_index<<" remained as "<<trunk_size -
counter[procs_index]<<endl;
}
cout<<"receiving count"<<endl;
for(int i=1;i<numprocs;i++)
cout<<counter[i]<<endl;
for(int i=0;i<numprocs;i++)
delete [] b[i];
delete [] b;
delete [] requests;
cout<<rank<<" done"<<endl;
}
else
{
int* a[2];
a[0] = new int [buf_size];
a[1] = new int [buf_size];
MPI_Request requests[2];
//file A bufer
for(int i=0;i<buf_size;i++)
a[0][i] = 0;
MPI_Isend(a[0], buf_size, MPI_INT, 0, 0, MPI_COMM_WORLD, &requests[0]);
if(trunk_size > 1)
{
//file B bufer
for(int i=0;i<buf_size;i++)
a[1][i] = i;
MPI_Isend(a[1], buf_size, MPI_INT, 0, 0, MPI_COMM_WORLD, &requests[1]);
}
for(int i=2;i<trunk_size;i+=2)
{
cout<<rank<<" wait A for trunk "<<i-2<<endl;
MPI_Wait(&requests[0], MPI_STATUS_IGNORE);
for(int j=0;j<buf_size;j++)
a[0][j] = j * i;
MPI_Isend(a[0], buf_size, MPI_INT, 0, 0, MPI_COMM_WORLD, &requests[0]);
if(i+ 1 < trunk_size)
{
cout<<rank<<" wait B for trunk "<<i-1<<endl;
MPI_Wait(&requests[1], MPI_STATUS_IGNORE);
for(int j=0;j<buf_size;j++)
a[1][j] = j * (i + 1);
MPI_Isend(a[1], buf_size, MPI_INT, 0, 0, MPI_COMM_WORLD, &requests[1]);
}
}
if(trunk_size == 1)
{
cout<<rank<<" wait A for trunk 0"<<endl;
MPI_Wait(&requests[0], MPI_STATUS_IGNORE);
}
else
{
if(trunk_size % 2)
{
cout<<rank<<" wait A for trunk "<<trunk_size-1<<endl;
MPI_Wait(&requests[0], MPI_STATUS_IGNORE);
cout<<rank<<" wait B for trunk "<<trunk_size-2<<endl;
MPI_Wait(&requests[1], MPI_STATUS_IGNORE);
}
else
{
cout<<rank<<" wait A for trunk "<<trunk_size-2<<endl;
MPI_Wait(&requests[0], MPI_STATUS_IGNORE);
cout<<rank<<" wait B for trunk "<<trunk_size-1<<endl;
MPI_Wait(&requests[1], MPI_STATUS_IGNORE);
}
}
delete [] a[0];
delete [] a[1];
cout<<rank<<" done"<<endl;
}
MPI_Finalize();
return 0;
}
On Tue, Jan 4, 2011 at 1:18 PM, Eric A. Borisch <eborisch at ieee.org> wrote:
> That shouldn't be the case; the request at index should get set to
> MPI_REQUEST_NULL and not hurt anything:
>
> **** Test program; Only uses two nodes as written!****
> #include <iostream>
>
> #include <mpi.h>
>
> int main(int argc, char * argv[])
> {
> MPI_Init(&argc, &argv);
> int rank;
> MPI_Comm_rank(MPI_COMM_WORLD, &rank);
>
> #define BUFSIZE (0x1 << 20) // 1M-element
> #define COUNT 4
> if (rank == 1)
> {
> int * buffer = (int *) malloc(BUFSIZE * sizeof(int));
> for (int i=0; i < COUNT; i++)
> MPI_Send(buffer, BUFSIZE, MPI_INT, 0, (i*3)%COUNT, MPI_COMM_WORLD);
> }
> else if (!rank)
> {
> int * buffers[COUNT];
> MPI_Request requests[COUNT];
> for (int i=0; i < COUNT; i++)
> {
> buffers[i] = (int *) malloc(BUFSIZE * sizeof(int));
> MPI_Irecv(buffers[i], BUFSIZE, MPI_INT, 1, i, MPI_COMM_WORLD,
> requests + i);
> }
> int rec = 0;
> while (rec != MPI_UNDEFINED)
> {
> std::cout << "Waiting... ";
> if(MPI_Waitany(COUNT, requests, &rec, MPI_STATUS_IGNORE))
> {
> std::cout << "Error in wait?" << std::endl;
> break;
> }
> else if (rec != MPI_UNDEFINED)
> std::cout << "Received message " << rec;
> else
> std::cout << "None left! ";
>
> std::cout << " Requests: ";
> for (int i=0; i < COUNT; i++)
> if (requests[i] == MPI_REQUEST_NULL)
> std::cout << "N"; // NULL
> else
> std::cout << "V"; // VALID
> std::cout << std::endl;
> }
> }
>
> MPI_Finalize();
> return 0;
> }
> *** Execution output
>
> Waiting... Received message 0 Requests: NVVV
> Waiting... Received message 3 Requests: NVVN
> Waiting... Received message 2 Requests: NVNN
> Waiting... Received message 1 Requests: NNNN
> Waiting... None left! Requests: NNNN
>
> -Eric
>
> On Mon, Jan 3, 2011 at 10:19 PM, Xiao Li <shinelee.thewise at gmail.com>wrote:
>
>> Hi Eric,
>>
>> The if statement may make some error, I think it should be altered as .
>>
>> if ++trunk_sent[index] != M do
>> MPI_Irecv(buffer[index], index, requests[index])
>> else
>> //remove the MPIRequest object of finished process, or else it
>> might be halt forever
>> remove requests[index]
>> end
>>
>> cheers
>> Xiao
>>
>> On Mon, Jan 3, 2011 at 4:24 PM, Xiao Li <shinelee.thewise at gmail.com>wrote:
>>
>>> Hi Eric,
>>>
>>> You are right. An extra MPI_Irecv will be executed at end. Thanks for
>>> your comment.
>>>
>>> cheers
>>> Xiao
>>>
>>>
>>> On Mon, Jan 3, 2011 at 4:16 PM, Eric A. Borisch <eborisch at ieee.org>wrote:
>>>
>>>> Looks about right... I'm assuming there is a <do actual work here> to be
>>>> inserted between the MPI_Waitany and MPI_Irecv within the N*M-sized loop....
>>>> and I count from 0 rather than 1 by force of habit... :)
>>>>
>>>> I think the logic below will end with one extra attempted MPI_Irecv than
>>>> desired; perhaps change
>>>>
>>>> if trunk_sent[index] != M do
>>>> MPI_Irecv(buffer[index], index, requests[index])
>>>> trunk_sent[index]++
>>>> end
>>>>
>>>> to
>>>>
>>>> if ++trunk_sent[index] != M do
>>>> MPI_Irecv(buffer[index], index, requests[index])
>>>> end
>>>>
>>>> -Eric
>>>>
>>>>
>>>> On Mon, Jan 3, 2011 at 3:00 PM, Xiao Li <shinelee.thewise at gmail.com>wrote:
>>>>
>>>>> Hi Eric,
>>>>>
>>>>> Thanks for your detailed suggestion. After read MPI documents, I
>>>>> propose the following algorithm,
>>>>>
>>>>> //begin collecting data for the first trunk
>>>>> for i=1 to N do
>>>>> MPI_Irecv(buffer[i], i, requests[i])
>>>>> end
>>>>> //set data sending counter
>>>>> for i=1 to N do
>>>>> trunk_sent[i] = 0
>>>>> end
>>>>> //begin collecting data
>>>>> for i=1 to N*M do
>>>>> MPI_Waitany(N, requests, &index, &status)
>>>>> if trunk_sent[index] != M do
>>>>> MPI_Irecv(buffer[index], index, requests[index])
>>>>> trunk_sent[index]++
>>>>> end
>>>>> end
>>>>>
>>>>> May I know what is your opinion of this algorithm?
>>>>>
>>>>> cheers
>>>>> Xiao
>>>>>
>>>>>
>>>>> On Mon, Jan 3, 2011 at 3:31 PM, Eric A. Borisch <eborisch at ieee.org>wrote:
>>>>>
>>>>>> Xiao,
>>>>>>
>>>>>> You should be able to get by with just N buffers, one for each client.
>>>>>> After you have processed the i-th iteration for client n, re-issue an
>>>>>> MPI_Irecv with the same buffer. This will match up with the next MPI_Send
>>>>>> from client n. You don't have to worry about synchronizing -- the MPI_Irecv
>>>>>> does not need to be posted before the MPI_Send. (But the MPI_Send won't
>>>>>> complete until it has been, of course...)
>>>>>>
>>>>>> You could always roll your own sockets, but MPI does a nice job of
>>>>>> managing connections and messages for you. In addition, MPI can be used
>>>>>> fairly efficiently on a wide range of interconnects, from shared memory to
>>>>>> Infiniband with little to no change on the user's part.
>>>>>>
>>>>>> In addition, you could likely improve performance in MPI by having two
>>>>>> sets (call them A and B) of buffers to send from on each worker; one is in
>>>>>> the "send" state (let's call this one A, started with an MPI_Isend after it
>>>>>> was initially filled) while you're filling B. After B is filled, initiate a
>>>>>> new MPI_Isend (very quick) on B and then wait for A's first send (MPI_Wait)
>>>>>> to complete. Once the first send on A is completed, you can start populating
>>>>>> A with the next iteration's output, initiate A's send, wait for B's send to
>>>>>> complete, and the cycle begins again.
>>>>>>
>>>>>> This approach allows you to overlay communication and computation
>>>>>> times, and still works with the MPI_Waitany() approach to harvesting
>>>>>> completed jobs in first-completed order on the master. This is an almost
>>>>>> trivial thing to implement in MPI, but achieving it with sockets requires
>>>>>> (IMHO) much more programmer overhead...
>>>>>>
>>>>>> Just my 2c.
>>>>>>
>>>>>> Eric
>>>>>>
>>>>>>
>>>>>> On Mon, Jan 3, 2011 at 1:24 PM, Xiao Li <shinelee.thewise at gmail.com>wrote:
>>>>>>
>>>>>>> Hi Eric,
>>>>>>>
>>>>>>> Assume I have N workers and M trunks of sending data for each worker
>>>>>>> respectively, then I have to create N*M data buffer for MPI_Irecv usage. Is
>>>>>>> this method too costly?
>>>>>>>
>>>>>>> Or If I write raw socket programming, is that better? Just like
>>>>>>> traditional client/server socket programming? Master listens on port
>>>>>>> and spawn a new thread to accept worker's data storage request?
>>>>>>>
>>>>>>> cheers
>>>>>>> Xiao
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jan 3, 2011 at 2:13 PM, Eric A. Borisch <eborisch at ieee.org>wrote:
>>>>>>>
>>>>>>>> Look at the documentation for MPI_Irecv and MPI_Testany ... these
>>>>>>>> should help you do what you want.
>>>>>>>>
>>>>>>>> Eric
>>>>>>>>
>>>>>>>> On Mon, Jan 3, 2011 at 12:45 PM, Xiao Li <
>>>>>>>> shinelee.thewise at gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi MPICH2 people,
>>>>>>>>>
>>>>>>>>> Now, I have a application that composed of single master and many
>>>>>>>>> workers. The application requirement is very simple: workers finish some
>>>>>>>>> jobs and send data to master and master store these data into files
>>>>>>>>> separately. I can simply use MPI_Send on worker side to send data to master.
>>>>>>>>> But master does not know the data sending sequence. Some worker go fast
>>>>>>>>> while some are slow. More specifically, suppose there are 5 workers, then
>>>>>>>>> the data send sequence may be 1,3,4,5,2 or 2,5,4,1,3. If I just write a for
>>>>>>>>> loop for(i=1 to 5) on master side with MPI_Recv to get data, the master and
>>>>>>>>> some faster worker have to wait for a long time. I know MPI_Gather can
>>>>>>>>> implement this. But I am not sure is MPI_Gather works parallelly or just a
>>>>>>>>> sequential MPI_Recv? Another issue is my data is extremely large, more than
>>>>>>>>> 1GB data needed to be sent to master. If I divide the data into pieces, I do
>>>>>>>>> not think MPI_Gather can work. I also tried to think about raw socket
>>>>>>>>> programming, but I do not think it is a good practice. Would you give me
>>>>>>>>> some suggestion please?
>>>>>>>>>
>>>>>>>>> cheers
>>>>>>>>> Xiao
>>>>>>>>>
>>>>>>>>> _______________________________________________
>>>>>>>>> mpich-discuss mailing list
>>>>>>>>> mpich-discuss at mcs.anl.gov
>>>>>>>>> https://lists.mcs.anl.gov/mailman/listinfo/mpich-discuss
>>>>>>>>>
>>>>>>>>>
>>>>>>>> _______________________________________________
>>>>>>>> mpich-discuss mailing list
>>>>>>>> mpich-discuss at mcs.anl.gov
>>>>>>>> https://lists.mcs.anl.gov/mailman/listinfo/mpich-discuss
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> _______________________________________________
>>>>>>> mpich-discuss mailing list
>>>>>>> mpich-discuss at mcs.anl.gov
>>>>>>> https://lists.mcs.anl.gov/mailman/listinfo/mpich-discuss
>>>>>>>
>>>>>>>
>>>>>> _______________________________________________
>>>>>> mpich-discuss mailing list
>>>>>> mpich-discuss at mcs.anl.gov
>>>>>> https://lists.mcs.anl.gov/mailman/listinfo/mpich-discuss
>>>>>>
>>>>>>
>>>>>
>>>>> _______________________________________________
>>>>> mpich-discuss mailing list
>>>>> mpich-discuss at mcs.anl.gov
>>>>> https://lists.mcs.anl.gov/mailman/listinfo/mpich-discuss
>>>>>
>>>>>
>>>>
>>>> _______________________________________________
>>>> mpich-discuss mailing list
>>>> mpich-discuss at mcs.anl.gov
>>>> https://lists.mcs.anl.gov/mailman/listinfo/mpich-discuss
>>>>
>>>>
>>>
>>
>> _______________________________________________
>> mpich-discuss mailing list
>> mpich-discuss at mcs.anl.gov
>> https://lists.mcs.anl.gov/mailman/listinfo/mpich-discuss
>>
>>
>
> _______________________________________________
> mpich-discuss mailing list
> mpich-discuss at mcs.anl.gov
> https://lists.mcs.anl.gov/mailman/listinfo/mpich-discuss
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.mcs.anl.gov/pipermail/mpich-discuss/attachments/20110104/bbfa5279/attachment-0001.htm>
More information about the mpich-discuss
mailing list