Is this the proper way to do parallel reads? and if so, is there a bug?

Wei-keng Liao wkliao at ece.northwestern.edu
Thu Jan 10 17:03:16 CST 2013


Hi, Brian,

The code fragment I sent earlier has a bug. Please see the revised version below.


	start[0] = start[1] = 0;  /* say the variable is 2D */
	if (rank == 0) {
		/* rank 0 reads all of VAR1 and nothing from VAR2 */
		count2[0] = count2[1] = 0;
		count1[0] = full length of VAR1 in dimension Y
		count1[1] = full length of VAR1 in dimension X
	}
	else if (rank == 1) {
		/* rank 1 reads all of VAR2 and nothing from VAR1 */
		count1[0] = count1[1] = 0;
		count2[0] = full length of VAR2 in dimension Y
		count2[1] = full length of VAR2 in dimension X
	}
	/* both ranks make both collective calls */
	ncmpi_get_vara_float_all(infh, VAR1.varid, start, count1, buf);
	ncmpi_get_vara_float_all(infh, VAR2.varid, start, count2, buf);
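
The start/count setup can also be factored into a small helper so that
every rank makes the same sequence of collective calls and only the counts
differ. The following is a minimal sketch in plain C, not a definitive
implementation. The names set_read_region() and region_elems() are
hypothetical helpers, not PnetCDF functions, and "long long" stands in for
MPI_Offset so the fragment compiles without MPI headers.

```c
#include <assert.h>

/* Hypothetical helper (not part of PnetCDF): fill start[] and count[] for
 * one collective read. A rank that owns the variable requests all of it;
 * any other rank requests zero elements but still makes the call.
 * Assumption: long long stands in for MPI_Offset. */
void set_read_region(int owns_var, int ndims, const long long dimlen[],
                     long long start[], long long count[])
{
    assert(ndims > 0);
    for (int d = 0; d < ndims; d++) {
        start[d] = 0;
        count[d] = owns_var ? dimlen[d] : 0;
    }
}

/* Hypothetical helper: number of elements a rank will receive, i.e. the
 * product of count[]. It is zero on a rank that only participates. */
long long region_elems(int ndims, const long long count[])
{
    assert(ndims > 0);
    long long n = 1;
    for (int d = 0; d < ndims; d++)
        n *= count[d];
    return n;
}
```

With helpers like these, each rank would call set_read_region(rank == 0, ...)
to fill count1 and set_read_region(rank == 1, ...) to fill count2, then pass
start and the two counts to the two ncmpi_get_vara_float_all() calls above.
A rank whose region_elems() is zero receives no data from that call.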

Wei-keng

On Jan 10, 2013, at 11:49 AM, Wei-keng Liao wrote:

> Hi, Brian,
> 
> The communicators you used look fine to me. Rob's explanation of the collective
> read in your step 3 indeed points to the cause of the error.
> 
> Since the call you make reads an entire variable, both processes will have
> to read the whole variable, whether or not a given process needs that
> variable's data.
> 
> Say the 2 processes of world rank 0 and 1, opening file 0151-01.nc, are
> reading variables VAR1 and VAR2, respectively. Collective I/O requires
> both processes to participate in the calls to ncmpi_get_var_float_all().
> 
> 	ncmpi_get_var_float_all(infh, VAR1.varid, buf1);
> 	ncmpi_get_var_float_all(infh, VAR2.varid, buf2);
> 
> 
> An alternative is to call ncmpi_get_vara_float_all(), the one with arguments
> of start[] and count[]. Set start[] to all zeros and count[] to full variable
> dimension sizes on the process that needs to read the variable data, and
> set count[] to all zeros on the process that does not need the variable.
> For example, for the 2 processes in the same node,
> 
> 	start[0] = start[1] = 0;  /* say the variable is 2D */
> 	count[0] = count[1] = 0;
> 	if (rank == 0) {
> 		count[0] = full length of VAR1 in dimension Y 
> 		count[1] = full length of VAR1 in dimension X 
>        }
> 	else if (rank == 1) {
> 		count[0] = full length of VAR2 in dimension Y 
> 		count[1] = full length of VAR2 in dimension X 
>        }
> 	ncmpi_get_vara_float_all(infh, VAR1.varid, start, count, buf);
> 	ncmpi_get_vara_float_all(infh, VAR2.varid, start, count, buf);
> 
> In this case, rank 0 reads all of VAR1 and none of VAR2. Similarly,
> rank 1 reads all of VAR2 and none of VAR1.
> 
> As Rob suggested, using the nonblocking APIs will avoid the above and give you
> better performance.
> 
> Wei-keng
> 
> On Jan 10, 2013, at 10:18 AM, Smith, Brian E. wrote:
> 
>> Is reading collectively across the communicator that opened the file enough? 
>> 
>> I have 4 MPI tasks. I create 2 "node" communicators via MPI_Comm_split, then 2 "core" communicators via MPI_Comm_split:
>> 
>> I am world rank 0. IO Rank 0. core rank 0. I will be processing file 0151-01.nc (fh 1) and var VAR1.fcount 0 vcount 0, nc_type 5
>> I am world rank 2. IO Rank 1. core rank 0. I will be processing file 0158-07.nc (fh 1) and var VAR1. fcount 0 vcount 0, nc_type 5
>> I am world rank 1. IO Rank 0. core rank 1. I will be processing file 0151-01.nc (fh 1) and var VAR2. fcount 0 vcount 0, nc_type 5
>> I am world rank 3. IO Rank 1. core rank 1. I will be processing file 0158-07.nc (fh 1) and var VAR2. fcount 0 vcount 0, nc_type 5
>> 
>> So in this case, world rank 0 and world rank 1 are reading different variables out of the same file. They are on the same "node" communicator, so the file is ncmpi_open()ed with that communicator. 
>> 
>> Here are the comm splits too. Rankspernode is passed in as a parameter in this case. I can figure it out on Cray and BlueGene systems, but there isn't a portable way to do that (in MPI 2.x), and I'm just testing on my laptop anyway.
>> 
>> 
>>  numnodes = world_size / rankspernode;
>> 
>>  mynode = rank % numnodes; /* generic for laptop testing */
>> 
>>  MPI_Comm_split(MPI_COMM_WORLD, mynode, rank, &iocomm);
>> 
>>  MPI_Comm_rank(iocomm, &iorank);
>>  MPI_Comm_split(MPI_COMM_WORLD, iorank, rank, &corecomm);
>>  MPI_Comm_size(corecomm, &numcores);
>>  MPI_Comm_rank(corecomm, &corerank);
>> 
>> then:
>> 	files_pernode = numfiles / numnodes;
>> 	vars_percore = numvars / numcores;
>> 
>> 	for(i = mynode * files_pernode; i < (mynode + 1 ) * files_pernode; i++)
>> 	 {
>> 		ncmpi_open(iocomm, fname[i], NC_NOWRITE, MPI_INFO_NULL, &infh);
>> 		for(j = corerank * vars_percore; j < (corerank+1)*vars_percore; j++)
>> 		{
>> 			ncmpi_get_var_float_all(infh, vars[j], data);
>> 		}
>> 	}
>> 
>> I'm fairly sure from my printfs (see above) that the communicators are split appropriately for the IO I want to do. However, I'll double check by printing out the communicator pointers too.
>> 
>> Does that make more sense? 
>> 
>> 
>> Thanks.
>> 
>> 
>> 
>> On Jan 10, 2013, at 10:43 AM, Rob Latham wrote:
>> 
>>> On Thu, Jan 10, 2013 at 06:53:45AM -0500, Smith, Brian E. wrote:
>>>> Hi all,
>>>> 
>>>> I am fairly new to (p)netCDF, but have plenty of MPI experience. I am working on improving IO performance of a fairly simple code I have.
>>>> 
>>>> The basic premise is:
>>>> 1) create communicators where the members of the communicator are on the same "node".  
>>>> 2) each "node" opens their subset of the total number of files (say 50 files out of 200 if we have 4 "nodes")
>>>> 3) each "core" on a given node reads their subset of variables from their node's file. (say 25 out of 100 variables if we have 4 "cores" per "node")
>>>> 4) each "core" does some local work (right now, just a simple sum)
>>>> 5) each "node" combines the local work (basically combine local sums for a given var on multiple nodes to a global sum to calculate an average)
>>>> 6) write results.
>>> 
>>> Ah! I think I see a problem in step 3. If there are 100 variables, and
>>> you want to read those variables collectively, you're going to have to
>>> get everyone participating, even if the 'count' array is zero for the
>>> do-nothing processors. 
>>> 
>>> Maybe you are already doing that...
>>> 
>>> If you use the pnetcdf non-blocking interface, you have a bit more
>>> flexibility: the collective requirement is deferred until the
>>> ncmpi_wait_all() call  (because in our implementation, all I/O is
>>> deferred until the ncmpi_wait_all() call). 
>>> 
>>> You can put yourself into independent mode (must call
>>> ncmpi_begin_indep_data to flush record variable caches and reset file
>>> views), but performance is pretty poor. We don't spend any time
>>> optimizing independent I/O.
>>> 
>>>> Depending on how I tweak ncmpi_get_var_*_all vs ncmpi_get_var and indep mode vs collective mode, I get different errors (in all 4 cases though). I think this snippet is the "right" way, so it is what I'd like to focus on.
>>>> 
>>>> Should this work? I am using pnetcdf 1.2. I will probably try upgrading today to see if that changes the behavior, but I don't see much in the change logs that would suggest that is necessary. This is also linked against MPICH2 1.4.1p1.
>>>> 
>>>> 
>>>> here's the rough snippet:
>>>> 
>>>>   fcount = 0;
>>>>   /* each node handles files, so we need rank in IOcomm */
>>>>   for(i = IORank * files_pernode; i < (IORank + 1 ) * files_pernode; i++)
>>>>   {
>>>>      fdata[fcount] = (float **)Malloc(sizeof(float *) * vars_percore);
>>>>      SAFE(ncmpi_open(iocomm, fname[i], NC_NOWRITE, MPI_INFO_NULL, &infh));
>>>> 
>>>>      vcount = 0;
>>>>      for(j = coreRank * vars_percore; j < (coreRank + 1) * vars_percore; j++)
>>>>      {
>>>>         fdata[fcount][vcount] = (float *)Malloc(sizeof(float) * lat * lon);
>>>>         SAFE(ncmpi_get_var_float_all(infh, var1[j].varid, fdata[fcount][vcount]));
>>>>         finish_doing_some_local_work();
>>>>         vcount++;
>>>>      }
>>>>      do_some_collective_work();
>>>>      fcount++;
>>>>   }
>>>> 
>>>> 
>>>> I get errors deep inside MPI doing this:
>>>> #0  0x00007fff8c83bd57 in memmove$VARIANT$sse42 ()
>>>> #1  0x0000000100052389 in MPIUI_Memcpy ()
>>>> #2  0x0000000100051ffb in MPIDI_CH3U_Buffer_copy ()
>>>> #3  0x000000010006792f in MPIDI_Isend_self ()
>>>> #4  0x0000000100063e72 in MPID_Isend ()
>>>> #5  0x00000001000a34b5 in PMPI_Isend ()
>>>> #6  0x00000001000ac8c5 in ADIOI_R_Exchange_data ()
>>>> #7  0x00000001000aba71 in ADIOI_GEN_ReadStridedColl ()
>>>> #8  0x00000001000c6545 in MPIOI_File_read_all ()
>>>> #9  0x00000001000a779f in MPI_File_read_all ()
>>>> #10 0x000000010001d0c9 in ncmpii_getput_vars (ncp=0x7fff5fbff600, varp=0x100282890, start=0x7fff5fbff600, count=0x7fff5fbff600, stride=0x7fff5fbff600, buf=0x7fff5fbff600, bufcount=3888000, datatype=1275069450, rw_flag=1, io_method=1) at getput_vars.c:741
>>>> #11 0x0000000100018fc9 in ncmpi_get_var_float_all (ncid=2440416, varid=238391296, ip=0x7fff5fbff670) at getput_var.c:500
>>>> #12 0x0000000100006e50 in do_fn (fn_params=0x7fff5fbff7b0) at average.c:103
>>>> #13 0x0000000100000c00 in main (argc=12, argv=0x7fff5fbff9b8) at main.c:47
>>>> 
>>>> with this failing assert:
>>>> Assertion failed in file ch3u_buffer.c at line 77: FALSE
>>>> memcpy argument memory ranges overlap, dst_=0x1048bf000 src_=0x1049bd000 len_=15552000
>>>> 
>>>> I am testing with 4 processes on my laptop (mpirun -n 4 ./a.out --various-parameters). The communicators are set up as I expect, at least as far as being properly disjoint:
>>>> I am world rank 0. IO Rank 0. core rank 0. I will be processing file 0151-01.nc (fh 1) and var VAR1.fcount 0 vcount 0, nc_type 5
>>>> I am world rank 2. IO Rank 1. core rank 0. I will be processing file 0158-07.nc (fh 1) and var VAR1. fcount 0 vcount 0, nc_type 5
>>>> I am world rank 1. IO Rank 0. core rank 1. I will be processing file 0151-01.nc (fh 1) and var VAR2. fcount 0 vcount 0, nc_type 5
>>>> I am world rank 3. IO Rank 1. core rank 1. I will be processing file 0158-07.nc (fh 1) and var VAR2. fcount 0 vcount 0, nc_type 5
>>>> 
>>>> What am I doing wrong?
>>>> 
>>>> Thanks.
>>>> 
>>> 
>>> -- 
>>> Rob Latham
>>> Mathematics and Computer Science Division
>>> Argonne National Lab, IL USA
>> 
> 


