#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <string>
#include <vector>
#include <algorithm>

#define PROCESS_FAILED 1000

bool GotSignal = false;
int nSigUsr1 = 0;

using namespace std;

int nIters = 10;
int Scale = 100000;
int failNodeRank = 2;
int failIteration = 5;
int SleepSecs = 2;
const int EndOfSession = 0;
char OutFilePrefix[256];
bool bExeSlaveBcast = false;
int BcastNum = -1;

void ReadParameters(int argc, char* argv[])
{
    int i = 1;
    sscanf(argv[i++], "%d", &nIters);
    sscanf(argv[i++], "%d", &Scale);
    sscanf(argv[i++], "%d", &failNodeRank);
    sscanf(argv[i++], "%d", &failIteration);
    strncpy(OutFilePrefix, argv[i], sizeof(OutFilePrefix) - 1); // keep room for the terminator
    OutFilePrefix[sizeof(OutFilePrefix) - 1] = '\0';
}

void PrintUsage(const char* ProgName)
{
    printf("Usage example: %s 10 100000 2 5 1 out\n", ProgName);
    printf("Where: \n");
    printf("%s - program exe file \n", ProgName);
    printf("10     - number of interactions between master & each one of the slaves (= loop iterations in master & slaves)\n");
    printf("100000 - scale factor for the integer sent by a slave; slave sends 100000*rank + iteration number\n");
    printf("2      - rank of the failing node \n");
    printf("5      - fail iteration. On this iteration slave 2 fails & the remaining slaves sleep 1 sec (see next parameter)\n");
    printf("1      - all slaves except the failed one sleep 1 second on the failure iteration, to ensure they finish after the failure.\n");
    printf("         This mechanism makes the slave fail in the middle of the test, not after all slaves have finished\n");
    printf("out    - prefix for output files; out_r<rank>.log files will be created.\n\n");
}

int FailFunction(int rank, int iter, int testSndNum, FILE* fpLog)
{
    fflush(fpLog);
    if(iter == failIteration) {
        if(rank == failNodeRank) {
            fprintf(fpLog, "Last Sent number = %d\n", testSndNum);
            fprintf(fpLog, "Exit condition is true\n");
            // exit(1);
            // abort();
            int A = 5;
            int B = 8 - 3 - 5;
            int C = A / B;          // divide by zero exception crashes this rank
            printf("C = %d\n", C);  // dummy, never reaches this command
        }
        else
            usleep(1); // sleep(SleepSecs);
        return 1;
    }
    else {
        return 0;
    }
}
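// Sketch of an alternative failure injection (not part of the original test;
// FailBySignal is a hypothetical helper): instead of crashing via the
// divide-by-zero above, the failing rank can kill itself with an uncatchable
// signal, which the rest of the MPI job observes as a lost process in the
// same way.
/*
void FailBySignal()
{
    raise(SIGKILL); // never returns; this rank disappears from the job
}
*/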
void handleMPIerror(FILE* fpLog, const string str, int retErr, const MPI_Status* Status = NULL)
{
    if(retErr == MPI_SUCCESS)
        return;

    // char errString[MPI_MAX_ERROR_STRING];
    // int errStrLen;
    // MPI_Error_string(retErr, errString, &errStrLen);
    fprintf(fpLog, "\n\n==========================================\n");
    fprintf(fpLog, "%s: \n", str.c_str());
    fprintf(fpLog, "------------------------------------------\n");
    fprintf(fpLog, "ErrorCode: 0x%X \n", retErr);
    // fprintf(fpLog, "ErrorString: %s \n", errString);
    if(Status != NULL) {
        // MPI_Error_string(Status->MPI_ERROR, errString, &errStrLen);
        fprintf(fpLog, "Status.MPI_ERROR(code) = %d \n", Status->MPI_ERROR);
        // fprintf(fpLog, "Status.MPI_ERROR(str)  = %s \n", errString);
        fprintf(fpLog, "Status.MPI_SOURCE = %d \n", Status->MPI_SOURCE);
        fprintf(fpLog, "Status.MPI_TAG    = %d \n", Status->MPI_TAG);
        fprintf(fpLog, "Status.count      = %d \n", Status->count);     // MPICH2-specific status field
        fprintf(fpLog, "Status.cancelled  = %d \n", Status->cancelled); // MPICH2-specific status field
    }
    fprintf(fpLog, "==========================================\n\n");
    fflush(fpLog);
}

/*
void catch_SIGUSR1(int sig_num)
{
    // int dummyVar = PROCESS_FAILED;
    // MPI_Errhandler mpiErrhandler;
    // MPI_Errhandler_get(MPI_COMM_WORLD, &mpiErrhandler);
    // throw MPI::Exception(MPI_ERR_OTHER);
    // MPI_Comm_call_errhandler(MPI_COMM_WORLD, MPI_ERR_OTHER);
    // printf("SIGUSR1 ");
    // GotSignal = true;
    ++nSigUsr1;
    // re-set the signal handler to catch_SIGUSR1 again, for next time
    signal(SIGUSR1, catch_SIGUSR1);
}
*/

/*
void CreateOwnSignalHandler()
{
    signal(SIGUSR1, catch_SIGUSR1);
}
*/

int mainInit(int argc, char* argv[], int* clusterSize, int* procRank, FILE** fppLog)
{
    if(argc < 2) {
        // PrintUsage(ProgName);
        PrintUsage(argv[0]);
        return -1;
    }
    // CreateOwnSignalHandler();
    ReadParameters(argc, argv);

    MPI::Init(argc, argv);
    /*
    int provided;
    int retErr = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
    printf("Provided %d threads\n", provided);
    if(retErr != MPI_SUCCESS) {
        printf("Error in calling: MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided)");
        exit(-1);
    }
    */

    // Return an error code instead of the default fatal error,
    // which would finish execution of all processes.
    MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN);

    int size, rank;
    char hostname[256];
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    char filename[256];
    sprintf(filename, "./%s_r%d.log", OutFilePrefix, rank);
    // FILE* fpLog = fopen(filename, "at");
    *fppLog = fopen(filename, "wt");
    fprintf(*fppLog, "rank = %d\n", rank);
    gethostname(hostname, 256);
    fprintf(*fppLog, "hostname = %s\n", hostname);

    *clusterSize = size;
    *procRank = rank;
    return 0;
}

void MainFinalize(FILE* fpLog, bool isAbort)
{
    fclose(fpLog);
    /*
    int retErr = MPI_SUCCESS;
    if(!isAbort) {
        fprintf(fpLog, "Start MPI_Barrier \n");
        fflush(fpLog);
        retErr = MPI_Barrier(MPI_COMM_WORLD);
        handleMPIerror(fpLog, "Error in MPI_Barrier: ", retErr);
        fprintf(fpLog, "Start finalization \n");
        fflush(fpLog);
        retErr = MPI_Finalize();
        handleMPIerror(fpLog, "Error in MPI_Finalize: ", retErr);
        fprintf(fpLog, "End finalization \n");
        fprintf(fpLog, "\n\n");
        fclose(fpLog);
    }
    else {
        fprintf(fpLog, "Sleep 2 secs ... \n");
        sleep(2);
        fprintf(fpLog, "Start abort \n");
        fflush(fpLog);
        retErr = MPI_Abort(MPI_COMM_WORLD, 0);
        handleMPIerror(fpLog, "Error in abort: ", retErr);
        fprintf(fpLog, "End abort \n");
        fprintf(fpLog, "\n\n");
        fclose(fpLog);
    }
    */
}
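// Typical launch, assuming an MPICH2 build with the failed-process
// extensions and "ft_test" as a placeholder binary name: 1 master + 3
// slaves, slave 2 fails on iteration 5, the others sleep 1 sec there:
//
//     mpiexec -n 4 ./ft_test 10 100000 2 5 1 out
//
// Each rank then writes its own log to ./out_r<rank>.log (see mainInit).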
void MainFinalize2(FILE* fpLog, bool isAbort, bool isMaster)
{
    int retErr = MPI_SUCCESS;
    fclose(fpLog);
    if(isMaster) {
        if(isAbort)
            retErr = MPI_Abort(MPI_COMM_WORLD, 0);
        else
            retErr = MPI_Finalize();
    }
    else {
        retErr = MPI_Finalize();
    }
    (void)retErr; // result intentionally ignored
}

class MasterBase
{
public:
    MasterBase(int slaves, int iters, FILE* fpLog);
    virtual ~MasterBase() {}

    bool IsAllSlavesLive()
    {
        unsigned int Sum = 0;
        for(int i = 0; i < mSlaves; ++i)
            Sum += mIsSlaveLives[i];
        return (Sum == (unsigned int)mSlaves);
    }

    void markFailedRanks(const char* prefix);
    void markFailedRanks_MPICH2_communicator_get_attr(const char* prefix);
    void SendEndOfCommunication(MPI::Intracomm* comm);
    void RecvEndOfCommunication(MPI::Intracomm* comm);

protected:
    // Hook for subclasses; called after failed ranks have been marked.
    virtual void updateOnFail() {}

    int mSlaves;
    int mIters;
    FILE* mFpLog;
    vector<int> mSlavesRcvIters;
    vector<int> mRcvNumsBuf;
    vector<int> mSndNumsBuf;
    int mSlavesFinished;
    vector<int> mIsSlaveLives; // 1 = slave alive, 0 = failed or finished
};

MasterBase::MasterBase(int slaves, int iters, FILE* fpLog)
    : mSlaves(slaves), mIters(iters), mFpLog(fpLog), mSlavesFinished(0)
{
    mSlavesRcvIters.resize(mSlaves);
    mRcvNumsBuf.resize(mSlaves);
    mSndNumsBuf.resize(mSlaves);
    mIsSlaveLives.resize(mSlaves);
    fill(mIsSlaveLives.begin(), mIsSlaveLives.end(), 1);
}

void MasterBase::markFailedRanks(const char* prefix)
{
    markFailedRanks_MPICH2_communicator_get_attr(prefix);
}

extern int MPICH_ATTR_FAILED_PROCESSES; // attribute keyval exported by fault-tolerant MPICH2 builds

void MasterBase::markFailedRanks_MPICH2_communicator_get_attr(const char* prefix)
{
    fprintf(mFpLog, "\nmarkFailedRanks_MPICH2_communicator_get_attr on %s - start \n", prefix);
    int* failedRanks; // = new int [MAX_RANKS];
    int flag;
    int retErr = MPI_Comm_get_attr(MPI_COMM_WORLD, MPICH_ATTR_FAILED_PROCESSES, &failedRanks, &flag);
    if( (!flag) || (retErr != MPI_SUCCESS) ) {
        fprintf(mFpLog, "\n\nMPI_Comm_get_attr error \n\n");
        return;
    }
    // The attribute is an MPI_PROC_NULL-terminated array of failed ranks.
    for(int* nextRank = failedRanks; *nextRank != MPI_PROC_NULL; ++nextRank) {
        fprintf(mFpLog, "%s - failedRank = %d\n", prefix, *nextRank);
        fflush(mFpLog);
        int slaveIdx = *nextRank - 1; // slave i runs on rank i+1; rank 0 is the master
        if(mIsSlaveLives[slaveIdx]) {
            mIsSlaveLives[slaveIdx] = 0;
            ++mSlavesFinished;
        }
    }
    updateOnFail();
    fprintf(mFpLog, "\nmarkFailedRanks_MPICH2_communicator_get_attr on %s - end \n", prefix);
}

void MasterBase::SendEndOfCommunication(MPI::Intracomm* comm)
{
    int retErr;
    // Send the EndOfSession marker to every slave that is still alive
    // (tag 0 assumed for the session-control message).
    for(int i = 0; i < mSlaves; ++i) {
        if(!mIsSlaveLives[i])
            continue;
        retErr = MPI_Send((void*)&EndOfSession, 1, MPI_INT, i + 1, 0, *comm);
        handleMPIerror(mFpLog, "Error in MPI_Send(EndOfSession)", retErr);
    }
    if(nSigUsr1 > 0) {
        fprintf(mFpLog, "Got %d SIGUSR1\n", nSigUsr1);
        printf("Got %d SIGUSR1\n", nSigUsr1);
    }
    usleep(500000 * mSlaves);
    fflush(mFpLog);
}

void MasterBase::RecvEndOfCommunication(MPI::Intracomm* comm)
{
    int retErr;
    // Collect the closing message from every slave that is still alive.
    for(int i = 0; i < mSlaves; ++i) {
        if(!mIsSlaveLives[i])
            continue;
        int rcvNum = 0;
        MPI_Status status;
        retErr = MPI_Recv(&rcvNum, 1, MPI_INT, i + 1, MPI_ANY_TAG, *comm, &status);
        handleMPIerror(mFpLog, "Error in MPI_Recv(EndOfSession)", retErr, &status);
    }
    if(nSigUsr1 > 0) {
        fprintf(mFpLog, "Got %d SIGUSR1\n", nSigUsr1);
        printf("Got %d SIGUSR1\n", nSigUsr1);
    }
    usleep(500000 * mSlaves);
    fflush(mFpLog);
}
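// Minimal driver sketch. The concrete Master subclass and the slave-side
// loop are outside this section, so everything below is an illustrative
// assumption about how the pieces fit together, not the original main().
/*
int main(int argc, char* argv[])
{
    int clusterSize, procRank;
    FILE* fpLog;
    if(mainInit(argc, argv, &clusterSize, &procRank, &fpLog) != 0)
        return -1;

    bool isMaster = (procRank == 0);
    if(isMaster) {
        MasterBase master(clusterSize - 1, nIters, fpLog); // slaves run on ranks 1..size-1
        // ... per-iteration exchange with the slaves; on an MPI error call
        //     master.markFailedRanks("main") and continue with live slaves ...
        master.SendEndOfCommunication(&MPI::COMM_WORLD);
        master.RecvEndOfCommunication(&MPI::COMM_WORLD);
    }
    else {
        // ... slave loop sending Scale*rank + iter and calling
        //     FailFunction(procRank, iter, sndNum, fpLog) each iteration ...
    }
    MainFinalize2(fpLog, false, isMaster);
    return 0;
}
*/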