/* MPI fault-injection test: the master exchanges numbers with each slave for nIters iterations while rank failNodeRank deliberately crashes on iteration failIteration (see PrintUsage). NOTE(review): this copy looks damaged by text extraction - the #include directives lost their header names, text between angle brackets (template arguments such as the vector element types) was stripped, and the line breaks were collapsed, so as written this whole line is one malformed #include directive and each line comment swallows the rest of its line. Restore it from the original source before building; the headers are presumably mpi.h, cstdio, cstring, csignal, unistd.h, string, vector and algorithm - TODO confirm. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define PROCESS_FAILED 1000 /* Run parameters; the defaults below are overwritten by ReadParameters. NOTE(review): GotSignal is written from the catch_SIGUSR1 signal handler, where only volatile sig_atomic_t (or a lock-free atomic) is guaranteed safe to modify; a plain bool is not. */ bool GotSignal = false; using namespace std; int nIters = 10; int Scale = 100000; int failNodeRank = 2; int failIteration = 5; int SleepSecs = 2; const int EndOfSession = 0; char OutFilePrefix[256]; bool bExeSlaveBcast = false; int BcastNum = -1; /* Reads argv[1]..argv[6], in order, into nIters, Scale, failNodeRank, failIteration, SleepSecs (integers via sscanf) and OutFilePrefix (via strncpy). argc is not used. NOTE(review): mainInit only rejects argc below 2, so a command line with fewer than six arguments hands NULL or out-of-range argv entries to sscanf and strncpy; the guard should require at least six arguments. strncpy also leaves OutFilePrefix unterminated when argv[6] is 256 or more characters long. */ void ReadParameters(int argc, char* argv[]) { int i = 1; sscanf(argv[i++], "%d", &nIters); sscanf(argv[i++], "%d", &Scale); sscanf(argv[i++], "%d", &failNodeRank); sscanf(argv[i++], "%d", &failIteration); sscanf(argv[i++], "%d", &SleepSecs); strncpy(OutFilePrefix, argv[i++], 256); } /* Prints an example command line and a description of each positional argument. NOTE(review): two statements in this text do not match the code - it says out_rank.txt files are created, but mainInit creates ./PREFIX_rRANK.log, and it says the surviving slaves sleep on the failure iteration, but FailFunction only calls usleep(1) there because its sleep(SleepSecs) call is commented out. */ void PrintUsage(const char* ProgName) { printf("Usage example: %s 10 100000 2 5 1 out\n", ProgName); printf("Where: \n"); printf("%s - program exe file \n", ProgName); printf("10 - number of interactions between master & each one of slaves (= loop iterations in master & slaves)\n"); printf("100000 - scale factor for integer sent by slave, slave sends 100000*rank + iteration number\n"); printf("2 - rank of fail node \n"); printf("5 - Fail iteration. On this iteration, slave 2 will fail & the rest slaves will sleep 1 sec (see next parameter)\n"); printf("1 - all slaves except failed one, will sleep 1 second on iteration of failure, to ensure that they finish after failure.\n"); printf(" This mechanism achieves slave failure in the midle of test, but not after all slaves finished\n"); printf("out - prefix for output files. out_rank.txt files will be created.\n\n"); } /* Failure-injection hook. Always flushes fpLog. On iteration failIteration, rank failNodeRank logs testSndNum and then divides by zero to crash the process; every other rank calls usleep(1) (one microsecond) on that iteration. Other iterations do nothing further. NOTE(review): integer division by zero is undefined behaviour rather than a guaranteed crash, and B is a compile-time zero the optimizer can see; raise(SIGFPE) or abort() would make the injected failure deterministic. */ void FailFunction(int rank, int iter, int testSndNum, FILE* fpLog) { fflush(fpLog); if(iter == failIteration) { if( rank == failNodeRank){ fprintf(fpLog, "Last Sent number = %d\n", testSndNum); fprintf(fpLog, "Exit condition is true\n"); // exit(1); // abort(); int A = 5; int B = 8-3-5; int C = A / B; // divide by zero exception. printf("C = %d\n", C); // dummy, never will reach this command. 
} else usleep(1); // sleep(SleepSecs); } } /* Does nothing when retErr is MPI_SUCCESS. Otherwise writes a framed report to fpLog - the caller context str, retErr in hex and its MPI_Error_string text, plus the fields of Status when one is passed - and flushes fpLog. NOTE(review): only MPI_SOURCE, MPI_TAG and MPI_ERROR are standard public MPI_Status members; count and cancelled are implementation-specific names and may not compile against the MPI library in use (MPI_Get_count and MPI_Test_cancelled are the portable queries) - TODO confirm for the target MPI. */ void handleMPIerror(FILE* fpLog, const string str, int retErr, const MPI_Status* Status = NULL) { if (retErr == MPI_SUCCESS) return; char errString[MPI_MAX_ERROR_STRING]; int errStrLen; MPI_Error_string(retErr, errString, &errStrLen); fprintf(fpLog, "\n\n==========================================\n"); fprintf(fpLog, "%s: \n", str.c_str()); fprintf(fpLog, "------------------------------------------\n"); fprintf(fpLog, "ErrorCode: 0x%X \n", retErr); fprintf(fpLog, "ErrorString: %s \n" , errString); if(Status != NULL) { MPI_Error_string(Status->MPI_ERROR, errString, &errStrLen); fprintf(fpLog, "Status.MPI_ERROR(code) = %d \n" , Status->MPI_ERROR); fprintf(fpLog, "Status.MPI_ERROR(str) = %s \n" , errString); fprintf(fpLog, "Status.MPI_SOURCE = %d \n" , Status->MPI_SOURCE); fprintf(fpLog, "Status.MPI_TAG = %d \n" , Status->MPI_TAG); fprintf(fpLog, "Status.count = %d \n" , Status->count); fprintf(fpLog, "Status.cancelled = %d \n" , Status->cancelled); } fprintf(fpLog, "==========================================\n\n"); fflush(fpLog); } /* SIGUSR1 handler: prints a notice, sets GotSignal and re-installs itself for the next delivery. sig_num is unused. NOTE(review): printf is not async-signal-safe under POSIX, so calling it from a handler can deadlock or corrupt stdio state; write(2) is the safe alternative. */ void catch_SIGUSR1(int sig_num) { printf("\n\nGot SIGUSR1\n\n"); // int dummyVar = PROCESS_FAILED; // MPI_Errhandler mpiErrhandler; // MPI_Errhandler_get(MPI_COMM_WORLD, &mpiErrhandler); // throw MPI::Exception(MPI_ERR_OTHER); // MPI_Comm_call_errhandler(MPI_COMM_WORLD, MPI_ERR_OTHER); GotSignal = true; /* re-set the signal handler again to catch_int, for next time */ signal(SIGUSR1, catch_SIGUSR1); } /* Installs catch_SIGUSR1 as the SIGUSR1 handler via signal(). */ void CreateOwnSignalHandler() { signal(SIGUSR1, catch_SIGUSR1); } /* Shared start-up for every rank. If argc is below 2, prints usage and returns -1 before MPI is initialized. Otherwise installs the SIGUSR1 handler, parses the arguments, calls MPI::Init, switches MPI_COMM_WORLD to MPI_ERRORS_RETURN so failing MPI calls return error codes instead of aborting the job, opens the per-rank log ./PREFIX_rRANK.log for writing, logs the rank and hostname, stores the communicator size and rank through clusterSize and procRank, and returns 0. NOTE(review): retErr is unused; the fopen result is not checked before use; sprintf into filename[256] can overflow because OutFilePrefix alone may fill 256 bytes (snprintf would bound it); the MPI:: C++ bindings and MPI_Errhandler_set were removed in MPI-3.0 (MPI_Init and MPI_Comm_set_errhandler replace them). */ int mainInit(int argc, char* argv[], int* clusterSize, int* procRank, FILE** fppLog) { if(argc < 2) { // PrintUsage(ProgName); PrintUsage(argv[0]); return -1; } CreateOwnSignalHandler(); ReadParameters(argc, argv); int retErr; MPI::Init(argc, argv); MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN); // return an exception instead of Fatal Error, which finishes execution of all processes. 
int size, rank; char hostname[256]; MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Comm_rank(MPI_COMM_WORLD, &rank); char filename[256]; sprintf(filename, "./%s_r%d.log", OutFilePrefix, rank); // FILE* fpLog = fopen(filename, "at"); *fppLog = fopen(filename, "wt"); fprintf(*fppLog, "rank = %d\n", rank); gethostname(hostname, 256); fprintf(*fppLog, "hostname = %s\n", hostname); *clusterSize = size; *procRank = rank; return 0; } /* Shuts down this rank and closes fpLog. Normal path (isAbort false): MPI_Barrier on MPI_COMM_WORLD, then MPI_Finalize, logging progress and any error via handleMPIerror. Abort path: sleeps 2 seconds, then calls MPI_Abort(MPI_COMM_WORLD, 0); the error check, log lines and fclose after it run only if MPI_Abort returns. NOTE(review): the abort path passes error code 0, which the launcher may report as a successful exit - confirm that is intended. */ void MainFinalize(FILE* fpLog, bool isAbort) { int retErr = MPI_SUCCESS; if(!isAbort) { fprintf(fpLog, "Start MPI_Barrier \n"); fflush(fpLog); retErr = MPI_Barrier(MPI_COMM_WORLD); handleMPIerror(fpLog, "Error in MPI_Barrier: ", retErr); fprintf(fpLog, "Start finalization \n"); fflush(fpLog); retErr = MPI_Finalize(); handleMPIerror(fpLog, "Error in MPI_Finilize: ", retErr); fprintf(fpLog, "End finalization \n"); fprintf(fpLog, "\n\n"); fclose(fpLog); } else { fprintf(fpLog, "Sleep 2 secs ... \n"); sleep(2); fprintf(fpLog, "Start abort \n"); fflush(fpLog); retErr = MPI_Abort(MPI_COMM_WORLD, 0); handleMPIerror(fpLog, "Error in abort: ", retErr); fprintf(fpLog, "End abort \n"); fprintf(fpLog, "\n\n"); fclose(fpLog); } } /* NOTE(review): this class definition is damaged in this copy - the text from the loop condition in IsAllSlavesLive up to the element type of mSlavesRcvIters was lost. That removed the rest of that method, the declaration of SendEndOfCommunication (defined below) and possibly other members, and the members mSlaves, mIters and mFpLog that the constructor initializes. Restore it from the original source rather than reconstructing it by guesswork. */ class MasterBase { public: MasterBase(int slaves, int iters, FILE* fpLog); virtual ~MasterBase() {} bool IsAllSlavesLive() { unsigned int Sum = 0; for(int i=0; i mSlavesRcvIters; vector mRcvNumsBuf; vector mSndNumsBuf; int mSlavesFinished; vector mIsSlaveLives; }; /* Stores the slave count, iteration count and log file, zeroes mSlavesFinished, sizes the four per-slave vectors to mSlaves entries and marks every slave as alive by filling mIsSlaveLives with 1. */ MasterBase::MasterBase(int slaves, int iters, FILE* fpLog) : mSlaves(slaves), mIters(iters), mFpLog(fpLog), mSlavesFinished(0) { mSlavesRcvIters.resize(mSlaves); mRcvNumsBuf.resize(mSlaves); mSndNumsBuf.resize(mSlaves); mIsSlaveLives.resize(mSlaves); fill(mIsSlaveLives.begin(), mIsSlaveLives.end(), 1); } /* NOTE(review): this definition is cut off at the end of the visible text. */ void MasterBase::SendEndOfCommunication(MPI::Intracomm* comm) { int retErr; for(int i=0; i