#include <mpi.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

using namespace std;

#define gComm MPI::COMM_WORLD

struct ThreadInfo {
    MPI::Request *req;
    int num_requests;
};

volatile bool gSomeArrived = false; //currently unused

extern "C" void* ListenerFunction(void* data)
{
    ThreadInfo* info = (ThreadInfo*)data;
    printf("Waiting for %d requests to complete.\n", info->num_requests);

    //wait for one request to complete
    MPI::Status status;
    int completed = MPI::Request::Waitany(info->num_requests, info->req, status);

    //re-copy the remaining requests so they are contiguous
    //(variable-length arrays are a compiler extension, not standard C++)
    MPI::Request new_requests[info->num_requests - 1];
    int index = 0;
    for (int i = 0; i < info->num_requests; i++) {
        if (i == completed) continue;
        new_requests[index++] = info->req[i];
    }
    printf("request %d completed.\n", completed);

    //now wait for the rest while the main thread is cancelling them
    MPI::Request::Waitall(info->num_requests - 1, new_requests);
    printf("finished waiting for %d requests\n", info->num_requests);
    return NULL;
}

int main(int argc, char* argv[])
{
    //both the main thread and the listener make MPI calls concurrently,
    //so full thread support is required; bail out if it is unavailable
    int provided = MPI::Init_thread(MPI::THREAD_MULTIPLE);
    if (provided < MPI::THREAD_MULTIPLE) {
        printf("MPI_THREAD_MULTIPLE not available (got level %d).\n", provided);
        gComm.Abort(1);
    }

    //initialize thread attributes
    ::pthread_attr_t thread_attr;
    ::sched_param scheduler_param;
    ::pthread_attr_init(&thread_attr);
    ::pthread_attr_getschedparam(&thread_attr, &scheduler_param);
    ::pthread_attr_setscope(&thread_attr, PTHREAD_SCOPE_SYSTEM);
    int OldType;
    ::pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &OldType);

    unsigned int sleep_time; //parsed from argv[2] but not used below
    int num_requests;
    if (argc > 1) { num_requests = atoi(argv[1]); } else { num_requests = 10; }
    if (argc > 2) { sleep_time = atoi(argv[2]); } else { sleep_time = 100; }
    (void)sleep_time;

    int rank = gComm.Get_rank();
    int ttl_requests = num_requests * (gComm.Get_size() - 1);
    int answers[ttl_requests];
    //printf("ttl_requests=%d num_requests=%d\n", ttl_requests, num_requests);

    if (rank == 0) {
        //on the master, post all of the listening (receive) requests
        MPI::Request requests[ttl_requests];
        pthread_t listener_id;
        ThreadInfo listener_info;
        listener_info.req = requests;
        listener_info.num_requests = ttl_requests;

        //create the requests
        for (int i = 0; i < ttl_requests; i++) {
            answers[i] = 0; //clear
            requests[i] = gComm.Irecv(&answers[i], 1, MPI::INT,
                                      MPI::ANY_SOURCE, MPI::ANY_TAG);
        }

        //start listening for replies on a separate thread
        ::pthread_create(&listener_id, &thread_attr, ListenerFunction,
                         &listener_info);

        //switch to ERRORS_RETURN around the cancels so that cancelling a
        //request that has already completed does not abort the program
        gComm.Set_errhandler(MPI::ERRORS_RETURN);
        for (int i = 0; i < ttl_requests; i++) {
            requests[i].Cancel();
        }
        //restore the default fatal-error behavior
        gComm.Set_errhandler(MPI::ERRORS_ARE_FATAL);

        ::pthread_join(listener_id, NULL);
        printf("all requests complete\n");
    } else {
        //slaves just send the requested number of messages to the master
        int offset = num_requests * (rank - 1);
        int answers[num_requests];
        MPI::Request requests[num_requests];
        //gComm.Barrier();
        for (int i = 0; i < num_requests; i++) {
            answers[i] = i + offset;
            requests[i] = gComm.Isend(&answers[i], 1, MPI::INT, 0, 0);
        }
        printf("rank %d waiting on %d sends to complete.\n", rank, num_requests);
        MPI::Request::Waitall(num_requests, requests);
        printf("sent answers %d through %d.\n", offset, offset + num_requests - 1);
    }

    gComm.Barrier();
    MPI::Finalize();
    // cout << "finalized MPI" << endl;
    return 0;
}
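// A minimal build-and-run sketch. It assumes Open MPI with the MPI-2 C++
// bindings enabled (they were deprecated in MPI-2.2 and removed in MPI-3);
// the wrapper name, flags, and file name are assumptions, not taken from
// the original listing:
//
//   mpic++ -pthread cancel_listener.cpp -o cancel_listener
//   mpirun -np 4 ./cancel_listener 10 100
//
// With 4 ranks and 10 messages per slave, rank 0 posts 30 receives; the
// listener thread blocks in Waitany/Waitall while the main thread cancels
// every receive that has not yet been matched by a slave's send.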