[mpich-discuss] Memory leaks using ROMIO
Jayesh Krishna
jayesh at mcs.anl.gov
Thu Jul 12 15:23:13 CDT 2012
Hi,
Windows is known to delay writes even after the user is done with the file (I have seen similar issues before. Your problem could be different though.).
# Does the system memory decrease after a period of time?
# Does the memory increase correspond to the amount of data written?
# How much does the memory increase?
# Do you see similar behavior with a simple Windows program (CreateFileMapping() + WriteFile() + CloseHandle())?
Regards,
Jayesh
----- Original Message -----
From: "Rob Latham" <robl at mcs.anl.gov>
To: mpich-discuss at mcs.anl.gov
Sent: Thursday, July 12, 2012 11:09:50 AM
Subject: Re: [mpich-discuss] Memory leaks using ROMIO
On Thu, Jul 12, 2012 at 01:29:38PM +0200, Akram Smaali wrote:
> Hello,
>
> i'm using mpich2 in a C program that reads several buffers from a data text
> files.
> I noticed that even when i free the buffer at the end, the global memory
> usage still increase but the memory usage per process still the same.
> I changed the MPI_File_read_at() with fseek/fread and the problem has
> dissapeared.
> I'm wondering if that i should do some extra instriuctions to avoid that
> because it seems that mpi_read allocate an intermediate buffer for reading
> (sieving optimization maybe) from the user space memory and the OS (
> windosws 7) don't get it back.
> i'm using MPICH2 1.4 on w windows 7 with NTFS file system.
It might be an NTFS-specific leak. I don't see any leaks on my linux
system (ad_ufs).
> Thank you for your help.
>
> here is my code :
It would be great if you could make this case more self contained.
Whatever offsets are in your Dictionary file, put those in an array
and broadcast that.
> #include <stdio.h>
> #include <stdlib.h>
> #include <mpi.h>
> #include <omp.h>
> #include <string.h>
> #include <time.h>
> #include "queue.h"
> #include "KhiopsNativeInterface.h"
> #include <windows.h>
>
> #define maxLength 100000
>
> double CPUtime(){ return ((double) clock())/CLOCKS_PER_SEC;}
>
>
> int main(int argc, char* argv []){
>
> if(argc != 5) {
> printf("\t[Dictionary file] [Dictionary] [Input file] [Buffer size]\n");
> exit(0);
> }
>
>
>
> char* sDictionaryFileName = argv[1];
> char* sDictionaryName = argv[2];
> char* filename = argv[3];
> int Mbuffer = atoi(argv[4]);
>
> int maxBuffer = Mbuffer*1024*1024;
> int over = 10000;
>
> int rank,numprocess;
> long int offset;
>
> char* buffer;
> char* opbuffer;
>
> double tstart=CPUtime();
>
> MPI_Init( &argc, &argv );
> MPI_Comm_rank( MPI_COMM_WORLD, &rank );
>
> /* mpi version */
> /* open the file*/
> MPI_File fh;
> int err;
> err = MPI_File_open(MPI_COMM_WORLD, filename, MPI_MODE_RDONLY,
> MPI_INFO_NULL, &fh);
> if (err != MPI_SUCCESS) {
> char errstr[MPI_MAX_ERROR_STRING];
> int errlen;
> MPI_Error_string(err, errstr, &errlen);
> printf("Error at opening file %s (%s)\n",filename,errstr);
> MPI_Finalize();
> exit(1);
> }
> // get offsets and buffer size
> MPI_Offset sfile;
> MPI_File_get_size(fh,&sfile);
> MPI_Status status;
>
>
> /* C version */
> /*FILE* fh;
> long int sfile;
> fh =fopen( filename,"rb");
> if (fh==NULL) {
> printf("Error at opening file %s\n",filename);
> exit(1);
> }
> // get offsets and buffer size
> fseek(fh, 0L, SEEK_END);
> sfile = ftell(fh);
> fseek(fh, 0L, SEEK_SET);*/
>
>
> MPI_Comm_size( MPI_COMM_WORLD, &numprocess );
>
> /* number of iterations */
> long int data_size = (long int)(sfile/(numprocess));
> int nbIter = data_size/maxBuffer;
> if(nbIter<=1){
> nbIter = 1;
> maxBuffer = data_size;
> }
>
> /* offsets */
> offset = data_size*(rank);
> long int cursor = offset;
> char* header;
> if(rank==0){
> FILE* fh;
> fh =fopen( filename,"rb");
> if (fh==NULL) {
> printf("Error at opening file %s\n",filename);
> exit(1);
> }
> /* read the header and broadcast it */
> header = malloc(sizeof(char)*1000);
> fgets(header,1000,fh);
> fclose(fh);
>
> //broadcast header
> int sndHeader = strlen(header);
> //cursor+=sndHeader;
> int process_counter;
> for(process_counter=1;process_counter<numprocess;process_counter++){
> int ierr = MPI_Send(&sndHeader,sizeof(int), MPI_INT, process_counter,
> 42,MPI_COMM_WORLD);
> if (ierr != MPI_SUCCESS) {
> int errclass,resultlen;
> char err_buffer[MPI_MAX_ERROR_STRING];
> MPI_Error_class(ierr,&errclass);
> if (errclass== MPI_ERR_RANK) {
> fprintf(stderr,"Invalid rank used in MPI send call\n");
> MPI_Error_string(ierr,err_buffer,&resultlen);
> fprintf(stderr,err_buffer);
> MPI_Finalize();
> }
> }
> MPI_Send(header, sndHeader, MPI_CHAR, process_counter, 43, MPI_COMM_WORLD);
> }
> }
> else{
> /* receive the header */
> int sizeofHeader;
> MPI_Status s ;
> MPI_Recv(&sizeofHeader,sizeof(int),MPI_INT,0,42,MPI_COMM_WORLD,&s);
> header = malloc (sizeof(char)*sizeofHeader+1);
> MPI_Recv(header,sizeofHeader,MPI_CHAR,0,43,MPI_COMM_WORLD,&s);
> }
>
>
>
> /* Synchronization barrier */
> MPI_Barrier(MPI_COMM_WORLD);
>
> int count;
>
> opbuffer = malloc(sizeof(char)*maxBuffer);
>
> /* C version */
> //fseek(fh,cursor,SEEK_SET);
>
> for(count=0;count<nbIter;count++){
>
> if(count==0 && rank==numprocess-1){ //init ring
> //send the token to p0
> int token=1;
> MPI_Send(&token,sizeof(int),MPI_INT,0,55,MPI_COMM_WORLD);
> }
>
> //recv
> int token;
> int sender;
> if(rank==0)
> sender = numprocess-1;
> else
> sender=rank-1;
>
> MPI_Status s;
> MPI_Recv(&token,sizeof(int),MPI_INT,sender,55,MPI_COMM_WORLD,&s);
> fflush(stdout);printf("P%d got the token at %G\n",rank,CPUtime());
> //read
> double start=CPUtime();
> /*double readtime;
> double sread=CPUtime();//read time*/
>
> //read
> if(token==1){
> /* MPI version */
> int err=MPI_File_read_at(fh, cursor,opbuffer, sizeof(char)*maxBuffer,
> MPI_CHAR, &status);
> if(err!=MPI_SUCCESS){
> char errstr[MPI_MAX_ERROR_STRING];
> int errlen;
> MPI_Error_string(err, errstr, &errlen);
> printf("Error reading file %s (%s)\n",filename,errstr);
> MPI_Finalize();
> exit(0);
> }
>
> /* C version of read */
> /*int k=fread(opbuffer,sizeof(char),maxBuffer,fh);
> if(k==0)
> perror("fread");*/
>
> cursor+=maxBuffer;
> buffer=opbuffer;
>
> }
> else{
> printf("Error token!\n");
> token=1;
> }
> //printf("P%d readtime=%G\n",rank,CPUtime()-sread);
> //Isend
> int next = (rank+1)%numprocess;
> MPI_Send(&token,sizeof(int),MPI_INT,next,55,MPI_COMM_WORLD);
>
> //i perform processing buffer here!
>
> }
> free(opbuffer);
>
> /* mpi version */
> int er=MPI_File_close(&fh);
> if(er!=MPI_SUCCESS){
> printf("Error closing file\n");
> MPI_Finalize();
> exit(1);
> }
>
> /* c version */
> //fclose(fh);
>
>
> MPI_Finalize();
>
> printf("Global time : %G\n",CPUtime()-tstart);
> return 0;
> }
>
>
> Best Regards
> _______________________________________________
> mpich-discuss mailing list mpich-discuss at mcs.anl.gov
> To manage subscription options or unsubscribe:
> https://lists.mcs.anl.gov/mailman/listinfo/mpich-discuss
--
Rob Latham
Mathematics and Computer Science Division
Argonne National Lab, IL USA
_______________________________________________
mpich-discuss mailing list mpich-discuss at mcs.anl.gov
To manage subscription options or unsubscribe:
https://lists.mcs.anl.gov/mailman/listinfo/mpich-discuss
More information about the mpich-discuss
mailing list