[mpich2-commits] r4055 - in mpich2/trunk/src/mpi/romio: . adio/ad_lustre adio/common adio/include

robl at mcs.anl.gov robl at mcs.anl.gov
Fri Mar 13 10:30:30 CDT 2009


Author: robl
Date: 2009-03-13 10:30:30 -0500 (Fri, 13 Mar 2009)
New Revision: 4055

Added:
   mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_aggregate.c
   mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_wrcoll.c
   mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_wrstr.c
Modified:
   mpich2/trunk/src/mpi/romio/adio/ad_lustre/Makefile.in
   mpich2/trunk/src/mpi/romio/adio/ad_lustre/README
   mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre.c
   mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre.h
   mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_hints.c
   mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_open.c
   mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_rwcontig.c
   mpich2/trunk/src/mpi/romio/adio/common/ad_write_coll.c
   mpich2/trunk/src/mpi/romio/adio/include/adioi.h
   mpich2/trunk/src/mpi/romio/configure.in
Log:
The improved Lustre driver, contributed by Sun with help from Weikuan Yu.


Modified: mpich2/trunk/src/mpi/romio/adio/ad_lustre/Makefile.in
===================================================================
--- mpich2/trunk/src/mpi/romio/adio/ad_lustre/Makefile.in	2009-03-13 11:28:26 UTC (rev 4054)
+++ mpich2/trunk/src/mpi/romio/adio/ad_lustre/Makefile.in	2009-03-13 15:30:30 UTC (rev 4055)
@@ -16,8 +16,10 @@
 @VPATH@
 
 AD_LUSTRE_OBJECTS = ad_lustre.o ad_lustre_open.o \
-      ad_lustre_rwcontig.o ad_lustre_hints.o 
+      ad_lustre_rwcontig.o ad_lustre_wrcoll.o ad_lustre_wrstr.o  \
+      ad_lustre_hints.o ad_lustre_aggregate.o
 
+
 default: $(LIBNAME)
 	@if [ "@ENABLE_SHLIB@" != "none" ] ; then \
 	    $(MAKE) $(SHLIBNAME).la ;\

Modified: mpich2/trunk/src/mpi/romio/adio/ad_lustre/README
===================================================================
--- mpich2/trunk/src/mpi/romio/adio/ad_lustre/README	2009-03-13 11:28:26 UTC (rev 4054)
+++ mpich2/trunk/src/mpi/romio/adio/ad_lustre/README	2009-03-13 15:30:30 UTC (rev 4055)
@@ -5,6 +5,23 @@
   o To post the code for ParColl (Partitioned collective IO)
  
 -----------------------------------------------------
+V05: 
+-----------------------------------------------------
+Improved data redistribution
+  o Improve I/O pattern identification. Besides checking interleaving,
+    if the request I/O size is small, collective I/O will be performed.
+    The hint bigsize can be used to define the req size value.
+  o Provide hint CO for load balancing to control the number of
+    IO clients for each OST
+  o Produce stripe-contiguous I/O pattern that Lustre prefers
+  o Reduce the collective overhead by hints contig_data and
+    samesize to remove unnecessary MPI_Alltoall()
+  o Control read-modify-write in data sieving in collective IO
+    by hint ds_in_coll.
+  o Reduce extent lock conflicts by making each OST accessed by one
+    or more fixed clients.
+
+-----------------------------------------------------
 V04: 
 -----------------------------------------------------
   o Direct IO and Lockless IO support

Modified: mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre.c
===================================================================
--- mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre.c	2009-03-13 11:28:26 UTC (rev 4054)
+++ mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre.c	2009-03-13 15:30:30 UTC (rev 4055)
@@ -1,9 +1,11 @@
 /* -*- Mode: C; c-basic-offset:4 ; -*- */
-/* 
- *   Copyright (C) 2001 University of Chicago. 
+/*
+ *   Copyright (C) 2001 University of Chicago.
  *   See COPYRIGHT notice in top-level directory.
  *
  *   Copyright (C) 2007 Oak Ridge National Laboratory
+ *
+ *   Copyright (C) 2008 Sun Microsystems, Lustre group
  */
 
 #include "ad_lustre.h"
@@ -13,12 +15,12 @@
     ADIOI_LUSTRE_ReadContig, /* ReadContig */
     ADIOI_LUSTRE_WriteContig, /* WriteContig */
     ADIOI_GEN_ReadStridedColl, /* ReadStridedColl */
-    ADIOI_GEN_WriteStridedColl, /* WriteStridedColl */
+    ADIOI_LUSTRE_WriteStridedColl, /* WriteStridedColl */
     ADIOI_GEN_SeekIndividual, /* SeekIndividual */
     ADIOI_GEN_Fcntl, /* Fcntl */
     ADIOI_LUSTRE_SetInfo, /* SetInfo */
     ADIOI_GEN_ReadStrided, /* ReadStrided */
-    ADIOI_GEN_WriteStrided, /* WriteStrided */
+    ADIOI_LUSTRE_WriteStrided, /* WriteStrided */
     ADIOI_GEN_Close, /* Close */
 #if defined(ROMIO_HAVE_WORKING_AIO) && !defined(CRAY_XT_LUSTRE)
     ADIOI_GEN_IreadContig, /* IreadContig */

Modified: mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre.h
===================================================================
--- mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre.h	2009-03-13 11:28:26 UTC (rev 4054)
+++ mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre.h	2009-03-13 15:30:30 UTC (rev 4055)
@@ -1,9 +1,11 @@
 /* -*- Mode: C; c-basic-offset:4 ; -*- */
-/* 
- *   Copyright (C) 1997 University of Chicago. 
+/*
+ *   Copyright (C) 1997 University of Chicago.
  *   See COPYRIGHT notice in top-level directory.
  *
  *   Copyright (C) 2007 Oak Ridge National Laboratory
+ *
+ *   Copyright (C) 2008 Sun Microsystems, Lustre group
  */
 
 #ifndef AD_UNIX_INCLUDE
@@ -24,7 +26,7 @@
 
 /*#include <fcntl.h>*/
 #include <sys/ioctl.h>
-#include "lustre/lustre_user.h"
+#include <lustre/lustre_user.h>
 #include "adio.h"
 /*#include "adioi.h"*/
 
@@ -41,24 +43,31 @@
 
 void ADIOI_LUSTRE_Open(ADIO_File fd, int *error_code);
 void ADIOI_LUSTRE_Close(ADIO_File fd, int *error_code);
-void ADIOI_LUSTRE_ReadContig(ADIO_File fd, void *buf, int count, 
-                      MPI_Datatype datatype, int file_ptr_type,
-                     ADIO_Offset offset, ADIO_Status *status, int
-		     *error_code);
-void ADIOI_LUSTRE_WriteContig(ADIO_File fd, void *buf, int count, 
-                      MPI_Datatype datatype, int file_ptr_type,
-                      ADIO_Offset offset, ADIO_Status *status, int
-		      *error_code);   
+void ADIOI_LUSTRE_ReadContig(ADIO_File fd, void *buf, int count,
+                             MPI_Datatype datatype, int file_ptr_type,
+                             ADIO_Offset offset, ADIO_Status *status,
+                             int *error_code);
+void ADIOI_LUSTRE_WriteContig(ADIO_File fd, void *buf, int count,
+                              MPI_Datatype datatype, int file_ptr_type,
+                              ADIO_Offset offset, ADIO_Status *status,
+                              int *error_code);
+void ADIOI_LUSTRE_WriteStrided(ADIO_File fd, void *buf, int count,
+			       MPI_Datatype datatype, int file_ptr_type,
+			       ADIO_Offset offset, ADIO_Status *status,
+			       int *error_code);
 void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, void *buf, int count,
-		       MPI_Datatype datatype, int file_ptr_type,
-		       ADIO_Offset offset, ADIO_Status *status, int
-		       *error_code);
+		                   MPI_Datatype datatype, int file_ptr_type,
+		                   ADIO_Offset offset, ADIO_Status *status,
+                                   int *error_code);
 void ADIOI_LUSTRE_ReadStridedColl(ADIO_File fd, void *buf, int count,
-		       MPI_Datatype datatype, int file_ptr_type,
-		       ADIO_Offset offset, ADIO_Status *status, int
-		       *error_code);
+		                  MPI_Datatype datatype, int file_ptr_type,
+		                  ADIO_Offset offset, ADIO_Status *status,
+                                  int *error_code);
+void ADIOI_LUSTRE_ReadStrided(ADIO_File fd, void *buf, int count,
+			      MPI_Datatype datatype, int file_ptr_type,
+			      ADIO_Offset offset, ADIO_Status *status,
+                              int *error_code);
 void ADIOI_LUSTRE_Fcntl(ADIO_File fd, int flag, ADIO_Fcntl_t *fcntl_struct,
 	               int *error_code);
 void ADIOI_LUSTRE_SetInfo(ADIO_File fd, MPI_Info users_info, int *error_code);
-
 #endif /* End of AD_UNIX_INCLUDE */

Added: mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_aggregate.c
===================================================================
--- mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_aggregate.c	                        (rev 0)
+++ mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_aggregate.c	2009-03-13 15:30:30 UTC (rev 4055)
@@ -0,0 +1,511 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ *   Copyright (C) 1997 University of Chicago.
+ *   See COPYRIGHT notice in top-level directory.
+ *
+ *   Copyright (C) 2007 Oak Ridge National Laboratory
+ *
+ *   Copyright (C) 2008 Sun Microsystems, Lustre group
+ */
+
+#include "ad_lustre.h"
+#include "adio_extern.h"
+
+#undef AGG_DEBUG
+
+void ADIOI_LUSTRE_Get_striping_info(ADIO_File fd, int ** striping_info_ptr,
+				    int mode)
+{
+    int *striping_info = NULL;
+    /* get striping information:
+     *  striping_info[0]: stripe_size
+     *  striping_info[1]: stripe_count
+     *  striping_info[2]: avail_cb_nodes
+     */
+    int stripe_size, stripe_count, CO = 1, CO_max = 1, CO_nodes, lflag;
+    int avail_cb_nodes, divisor, nprocs_for_coll = fd->hints->cb_nodes;
+    char *value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL + 1) * sizeof(char));
+
+    /* Get hints value */
+    /* stripe size */
+    stripe_size = fd->hints->striping_unit;
+    /* stripe count */
+    /* stripe_size and stripe_count have been validated in ADIOI_LUSTRE_Open() */
+    stripe_count = fd->hints->striping_factor;
+
+    /* Calculate the available number of I/O clients, that is
+     *  avail_cb_nodes=min(cb_nodes, stripe_count*CO), where
+     *  CO=1 by default
+     */
+    if (!mode) {
+        /* for collective read,
+	 * if "CO" clients access the same OST simultaneously,
+	 * the OST disk seek time would be significant. So, to avoid this,
+	 * it might be better if 1 client only accesses 1 OST.
+	 * So, we set CO = 1 to meet the above requirement.
+	 */
+	CO = 1;
+	/*XXX: maybe there is a better way for collective read */
+    } else {
+        /* CO_max: the largest number of IO clients for each ost group */
+        CO_max = (nprocs_for_coll - 1)/ stripe_count + 1;
+        /* CO also has been validated in ADIOI_LUSTRE_Open(), >0 */
+	CO = fd->hints->fs_hints.lustre.CO;
+	CO = ADIOI_MIN(CO_max, CO);
+    }
+    /* Calculate how many IO clients we need */
+    /* To avoid extent lock conflicts,
+     * avail_cb_nodes should divide (stripe_count*CO) exactly,
+     * so that each OST is accessed by only one or more constant clients. */
+    avail_cb_nodes = ADIOI_MIN(nprocs_for_coll, stripe_count * CO);
+    if (avail_cb_nodes == nprocs_for_coll) {
+        CO_nodes = stripe_count * CO;
+        do {
+            /* find the divisor of CO_nodes */
+            divisor = 1;
+            do {
+                divisor ++;
+            } while (CO_nodes % divisor);
+            CO_nodes = CO_nodes / divisor;
+            /* if stripe_count*CO is a prime number, change nothing */
+            if ((CO_nodes <= avail_cb_nodes) && (CO_nodes != 1)) {
+                avail_cb_nodes = CO_nodes;
+                break;
+            }
+        } while (CO_nodes != 1);
+    }
+
+    *striping_info_ptr = (int *) ADIOI_Malloc(3 * sizeof(int));
+    striping_info = *striping_info_ptr;
+    striping_info[0] = stripe_size;
+    striping_info[1] = stripe_count;
+    striping_info[2] = avail_cb_nodes;
+
+    ADIOI_Free(value);
+}
+
+int ADIOI_LUSTRE_Calc_aggregator(ADIO_File fd, ADIO_Offset off,
+                                 ADIO_Offset *len, int *striping_info)
+{
+    int rank_index, rank;
+    ADIO_Offset avail_bytes;
+    int stripe_size = striping_info[0];
+    int avail_cb_nodes = striping_info[2];
+
+    /* Produce the stripe-contiguous pattern for Lustre */
+    rank_index = (int)((off / stripe_size) % avail_cb_nodes);
+
+    /* we index into fd_end with rank_index, and fd_end was allocated to be no
+     * bigger than fd->hints->cb_nodes.   If we ever violate that, we're
+     * overrunning arrays.  Obviously, we should never ever hit this abort
+     */
+    if (rank_index >= fd->hints->cb_nodes)
+	    MPI_Abort(MPI_COMM_WORLD, 1);
+
+    avail_bytes = (off / (ADIO_Offset)stripe_size + 1) *
+                  (ADIO_Offset)stripe_size - off;
+    if (avail_bytes < *len) {
+	/* this proc only has part of the requested contig. region */
+	*len = avail_bytes;
+    }
+    /* map our index to a rank */
+    /* NOTE: FOR NOW WE DON'T HAVE A MAPPING...JUST DO 0..NPROCS_FOR_COLL */
+    rank = fd->hints->ranklist[rank_index];
+
+    return rank;
+}
+
+/* ADIOI_LUSTRE_Calc_my_req() - calculate what portions of the access requests
+ * of this process are located in the file domains of various processes
+ * (including this one)
+ */
+void ADIOI_LUSTRE_Calc_my_req(ADIO_File fd, ADIO_Offset *offset_list,
+			      int *len_list, int contig_access_count,
+			      int *striping_info, int nprocs,
+                              int *count_my_req_procs_ptr,
+			      int **count_my_req_per_proc_ptr,
+			      ADIOI_Access ** my_req_ptr,
+			      int **buf_idx_ptr)
+{
+    /* Nothing different from ADIOI_Calc_my_req(), except calling
+     * ADIOI_Lustre_Calc_aggregator() instead of the old one */
+    int *count_my_req_per_proc, count_my_req_procs, *buf_idx;
+    int i, l, proc;
+    ADIO_Offset avail_len, rem_len, curr_idx, off;
+    ADIOI_Access *my_req;
+
+    *count_my_req_per_proc_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
+    count_my_req_per_proc = *count_my_req_per_proc_ptr;
+    /* count_my_req_per_proc[i] gives the no. of contig. requests of this
+     * process in process i's file domain. calloc initializes to zero.
+     * I'm allocating memory of size nprocs, so that I can do an
+     * MPI_Alltoall later on.
+     */
+
+    buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
+    /* buf_idx is relevant only if buftype_is_contig.
+     * buf_idx[i] gives the index into user_buf where data received
+     * from proc. i should be placed. This allows receives to be done
+     * without extra buffer. This can't be done if buftype is not contig.
+     */
+
+    /* initialize buf_idx to -1 */
+    for (i = 0; i < nprocs; i++)
+	buf_idx[i] = -1;
+
+    /* one pass just to calculate how much space to allocate for my_req;
+     * contig_access_count was calculated way back in ADIOI_Calc_my_off_len()
+     */
+    for (i = 0; i < contig_access_count; i++) {
+	/* short circuit offset/len processing if len == 0
+	 * (zero-byte  read/write
+	 */
+	if (len_list[i] == 0)
+	    continue;
+	off = offset_list[i];
+	avail_len = len_list[i];
+	/* note: we set avail_len to be the total size of the access.
+	 * then ADIOI_LUSTRE_Calc_aggregator() will modify the value to return
+	 * the amount that was available.
+	 */
+	proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
+	count_my_req_per_proc[proc]++;
+
+	/* figure out how much data is remaining in the access
+	 * we'll take care of this data (if there is any)
+	 * in the while loop below.
+	 */
+	rem_len = len_list[i] - avail_len;
+
+	while (rem_len != 0) {
+	    off += avail_len;	/* point to first remaining byte */
+	    avail_len = rem_len;	/* save remaining size, pass to calc */
+	    proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
+	    count_my_req_per_proc[proc]++;
+	    rem_len -= avail_len;	/* reduce remaining length by amount from fd */
+	}
+    }
+
+    /* now allocate space for my_req, offset, and len */
+    *my_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs * sizeof(ADIOI_Access));
+    my_req = *my_req_ptr;
+
+    count_my_req_procs = 0;
+    for (i = 0; i < nprocs; i++) {
+	if (count_my_req_per_proc[i]) {
+	    my_req[i].offsets = (ADIO_Offset *)
+		                ADIOI_Malloc(count_my_req_per_proc[i] *
+                                             sizeof(ADIO_Offset));
+	    my_req[i].lens = (int *) ADIOI_Malloc(count_my_req_per_proc[i] *
+				                  sizeof(int));
+	    count_my_req_procs++;
+	}
+	my_req[i].count = 0;	/* will be incremented where needed later */
+    }
+
+    /* now fill in my_req */
+    curr_idx = 0;
+    for (i = 0; i < contig_access_count; i++) {
+	/* short circuit offset/len processing if len == 0
+	 *	(zero-byte  read/write */
+	if (len_list[i] == 0)
+	    continue;
+	off = offset_list[i];
+	avail_len = len_list[i];
+	proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len, striping_info);
+
+	/* for each separate contiguous access from this process */
+	if (buf_idx[proc] == -1)
+	    buf_idx[proc] = (int) curr_idx;
+
+	l = my_req[proc].count;
+	curr_idx += (int) avail_len;	/* NOTE: Why is curr_idx an int?  Fix? */
+
+	rem_len = len_list[i] - avail_len;
+
+	/* store the proc, offset, and len information in an array
+	 * of structures, my_req. Each structure contains the
+	 * offsets and lengths located in that process's FD,
+	 * and the associated count.
+	 */
+	my_req[proc].offsets[l] = off;
+	my_req[proc].lens[l] = (int) avail_len;
+	my_req[proc].count++;
+
+	while (rem_len != 0) {
+	    off += avail_len;
+	    avail_len = rem_len;
+	    proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
+                                                striping_info);
+	    if (buf_idx[proc] == -1)
+		buf_idx[proc] = (int) curr_idx;
+
+	    l = my_req[proc].count;
+	    curr_idx += avail_len;
+	    rem_len -= avail_len;
+
+	    my_req[proc].offsets[l] = off;
+	    my_req[proc].lens[l] = (int) avail_len;
+	    my_req[proc].count++;
+	}
+    }
+
+#ifdef AGG_DEBUG
+    for (i = 0; i < nprocs; i++) {
+	if (count_my_req_per_proc[i] > 0) {
+	    FPRINTF(stdout, "data needed from %d (count = %d):\n",
+		            i, my_req[i].count);
+	    for (l = 0; l < my_req[i].count; l++) {
+		FPRINTF(stdout, "   off[%d] = %lld, len[%d] = %d\n",
+			        l, my_req[i].offsets[l], l, my_req[i].lens[l]);
+	    }
+	}
+    }
+#if 0
+    for (i = 0; i < nprocs; i++) {
+	FPRINTF(stdout, "buf_idx[%d] = 0x%x\n", i, buf_idx[i]);
+    }
+#endif
+#endif
+
+    *count_my_req_procs_ptr = count_my_req_procs;
+    *buf_idx_ptr = buf_idx;
+}
+
+int ADIOI_LUSTRE_Docollect(ADIO_File fd, int contig_access_count,
+			   int *len_list, int nprocs)
+{
+    /* If the processes are non-interleaved, we will check the req_size.
+     *   if (avg_req_size > big_req_size) {
+     *       docollect = 0;
+     *   }
+     */
+
+    int i, docollect = 1, lflag, big_req_size = 0;
+    ADIO_Offset req_size = 0, total_req_size;
+    int avg_req_size, total_access_count;
+
+    /* calculate total_req_size and total_access_count */
+    for (i = 0; i < contig_access_count; i++)
+        req_size += len_list[i];
+    MPI_Allreduce(&req_size, &total_req_size, 1, MPI_LONG_LONG_INT, MPI_SUM,
+               fd->comm);
+    MPI_Allreduce(&contig_access_count, &total_access_count, 1, MPI_INT, MPI_SUM,
+               fd->comm);
+    /* estimate average req_size */
+    avg_req_size = (int)(total_req_size / total_access_count);
+    /* get hint of big_req_size */
+    big_req_size = fd->hints->fs_hints.lustre.bigsize;
+    /* Don't perform collective I/O if there are big requests */
+    if ((big_req_size > 0) && (avg_req_size > big_req_size))
+        docollect = 0;
+
+    return docollect;
+}
+
+void ADIOI_LUSTRE_Calc_others_req(ADIO_File fd, int count_my_req_procs,
+				  int *count_my_req_per_proc,
+				  ADIOI_Access * my_req,
+				  int nprocs, int myrank,
+                                  ADIO_Offset start_offset,
+                                  ADIO_Offset end_offset,
+                                  int *striping_info,
+				  int *count_others_req_procs_ptr,
+				  ADIOI_Access ** others_req_ptr)
+{
+    /* what requests of other processes will be accessed by this process */
+
+    /* count_others_req_procs = number of processes whose requests (including
+     * this process itself) will be accessed by this process
+     * count_others_req_per_proc[i] indicates how many separate contiguous
+     * requests of proc. i will be accessed by this process.
+     */
+
+    int *count_others_req_per_proc, count_others_req_procs, proc;
+    int i, j, samesize = 0, contiguous = 0;
+    int avail_cb_nodes = striping_info[2];
+    MPI_Request *send_requests, *recv_requests;
+    MPI_Status *statuses;
+    ADIOI_Access *others_req;
+    ADIO_Offset min_st_offset, off, req_len, avail_len, rem_len, *all_lens;
+
+    /* There are two hints, which could reduce some MPI communication overhead,
+     * if the users knows the I/O pattern and set them correctly. */
+    /* They are
+     * contig_data: if the data are contiguous,
+     *              we don't need to do MPI_Alltoall().
+     * samesize: if the data req size is same,
+     *           we can calculate the offset directly
+     */
+    /* hint of contiguous data */
+    contiguous = fd->hints->fs_hints.lustre.contig_data;
+    /* hint of same io size */
+    samesize = fd->hints->fs_hints.lustre.samesize;
+
+    *others_req_ptr = (ADIOI_Access *) ADIOI_Malloc(nprocs *
+                                                    sizeof(ADIOI_Access));
+    others_req = *others_req_ptr;
+
+    /* if the data are contiguous, we can calculate the offset and length
+     * of the other requests simply, instead of MPI_Alltoall() */
+    if (contiguous) {
+        for (i = 0; i < nprocs; i++) {
+            others_req[i].count = 0;
+        }
+        req_len = end_offset - start_offset + 1;
+        all_lens = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
+
+        /* same req size ? */
+        if (samesize == 0) {
+            /* calculate the min_st_offset */
+            MPI_Allreduce(&start_offset, &min_st_offset, 1, MPI_LONG_LONG,
+                          MPI_MIN, fd->comm);
+            /* exchange request length */
+            MPI_Allgather(&req_len, 1, ADIO_OFFSET, all_lens, 1, ADIO_OFFSET,
+                          fd->comm);
+        } else { /* same request size */
+            /* calculate the 1st request's offset */
+            min_st_offset = start_offset - myrank * req_len;
+            /* assign request length to all_lens[] */
+            for (i = 0; i < nprocs; i ++)
+               all_lens[i] = req_len;
+        }
+        if (myrank < avail_cb_nodes) {
+            /* This is a IO client and it will receive data from others */
+            off = min_st_offset;
+            /* calculate others_req[i].count */
+            for (i = 0; i < nprocs; i++) {
+                avail_len = all_lens[i];
+                rem_len = avail_len;
+                while (rem_len > 0) {
+	            proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
+                                                        striping_info);
+                    if (proc == myrank) {
+                        others_req[i].count ++;
+                    }
+                    off += avail_len;
+                    rem_len -= avail_len;
+                    avail_len = rem_len;
+                }
+            }
+            /* calculate offset and len for each request */
+            off = min_st_offset;
+            for (i = 0; i < nprocs; i++) {
+                if (others_req[i].count) {
+	            others_req[i].offsets = (ADIO_Offset *)
+                                            ADIOI_Malloc(others_req[i].count *
+			                                 sizeof(ADIO_Offset));
+	            others_req[i].lens = (int *)
+                                         ADIOI_Malloc(others_req[i].count *
+                                                      sizeof(int));
+                    others_req[i].mem_ptrs = (MPI_Aint *)
+                                             ADIOI_Malloc(others_req[i].count *
+			                                  sizeof(MPI_Aint));
+                }
+                j = 0;
+                avail_len = all_lens[i];
+                rem_len = avail_len;
+                while (rem_len > 0) {
+	            proc = ADIOI_LUSTRE_Calc_aggregator(fd, off, &avail_len,
+                                                        striping_info);
+                    if (proc == myrank) {
+                        others_req[i].offsets[j] = off;
+                        others_req[i].lens[j] = (int)avail_len;
+                        j ++;
+                    }
+                    off += avail_len;
+                    rem_len -= avail_len;
+                    avail_len = rem_len;
+                }
+            }
+        }
+        ADIOI_Free(all_lens);
+    } else {
+        /* multiple non-contiguous requests */
+
+        /*
+         * count_others_req_procs:
+         *    number of processes whose requests will be written by
+         *    this process (including this process itself)
+         * count_others_req_per_proc[i]:
+         *    how many separate contiguous requests of proc[i] will be
+         *    written by this process.
+         */
+
+        /* first find out how much to send/recv and from/to whom */
+        count_others_req_per_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
+
+        MPI_Alltoall(count_my_req_per_proc, 1, MPI_INT,
+	             count_others_req_per_proc, 1, MPI_INT, fd->comm);
+
+        count_others_req_procs = 0;
+        for (i = 0; i < nprocs; i++) {
+	    if (count_others_req_per_proc[i]) {
+	        others_req[i].count = count_others_req_per_proc[i];
+	        others_req[i].offsets = (ADIO_Offset *)
+                                        ADIOI_Malloc(others_req[i].count *
+			                         sizeof(ADIO_Offset));
+	        others_req[i].lens = (int *)
+		                     ADIOI_Malloc(others_req[i].count *
+                                                  sizeof(int));
+	        others_req[i].mem_ptrs = (MPI_Aint *)
+		                         ADIOI_Malloc(others_req[i].count *
+			                              sizeof(MPI_Aint));
+	        count_others_req_procs++;
+	    } else
+	        others_req[i].count = 0;
+        }
+
+        /* now send the calculated offsets and lengths to respective processes */
+
+        send_requests = (MPI_Request *) ADIOI_Malloc(2 * (count_my_req_procs + 1) *
+                                                     sizeof(MPI_Request));
+        recv_requests = (MPI_Request *) ADIOI_Malloc(2 * (count_others_req_procs+1)*
+	                                             sizeof(MPI_Request));
+        /* +1 to avoid a 0-size malloc */
+
+        j = 0;
+        for (i = 0; i < nprocs; i++) {
+	    if (others_req[i].count) {
+	        MPI_Irecv(others_req[i].offsets, others_req[i].count,
+		          ADIO_OFFSET, i, i + myrank, fd->comm,
+		          &recv_requests[j]);
+	        j++;
+	        MPI_Irecv(others_req[i].lens, others_req[i].count,
+		          MPI_INT, i, i + myrank + 1, fd->comm,
+		          &recv_requests[j]);
+	        j++;
+	    }
+        }
+
+        j = 0;
+        for (i = 0; i < nprocs; i++) {
+	    if (my_req[i].count) {
+	        MPI_Isend(my_req[i].offsets, my_req[i].count,
+		          ADIO_OFFSET, i, i + myrank, fd->comm,
+		          &send_requests[j]);
+	        j++;
+	        MPI_Isend(my_req[i].lens, my_req[i].count,
+		          MPI_INT, i, i + myrank + 1, fd->comm,
+		          &send_requests[j]);
+	        j++;
+	    }
+        }
+
+        statuses = (MPI_Status *)
+                   ADIOI_Malloc((1 + 2 * ADIOI_MAX(count_my_req_procs,
+						   count_others_req_procs)) *
+                                         sizeof(MPI_Status));
+        /* +1 to avoid a 0-size malloc */
+
+        MPI_Waitall(2 * count_my_req_procs, send_requests, statuses);
+        MPI_Waitall(2 * count_others_req_procs, recv_requests, statuses);
+
+        ADIOI_Free(send_requests);
+        ADIOI_Free(recv_requests);
+        ADIOI_Free(statuses);
+        ADIOI_Free(count_others_req_per_proc);
+
+        *count_others_req_procs_ptr = count_others_req_procs;
+    }
+}

Modified: mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_hints.c
===================================================================
--- mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_hints.c	2009-03-13 11:28:26 UTC (rev 4054)
+++ mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_hints.c	2009-03-13 15:30:30 UTC (rev 4055)
@@ -1,9 +1,11 @@
 /* -*- Mode: C; c-basic-offset:4 ; -*- */
-/* 
- *   Copyright (C) 1997 University of Chicago. 
+/*
+ *   Copyright (C) 1997 University of Chicago.
  *   See COPYRIGHT notice in top-level directory.
  *
  *   Copyright (C) 2007 Oak Ridge National Laboratory
+ *
+ *   Copyright (C) 2008 Sun Microsystems, Lustre group
  */
 
 #include "ad_lustre.h"
@@ -12,46 +14,60 @@
 void ADIOI_LUSTRE_SetInfo(ADIO_File fd, MPI_Info users_info, int *error_code)
 {
     char *value, *value_in_fd;
-    int flag, tmp_val[3], str_factor=-1, str_unit=0, start_iodev=-1;
+    int flag, stripe_val[3], str_factor = -1, str_unit=0, start_iodev=-1;
     struct lov_user_md lum = { 0 };
     int err, myrank, fd_sys, perm, amode, old_mask;
+    int int_val, tmp_val;
+    static char myname[] = "ADIOI_LUSTRE_SETINFO";
 
     value = (char *) ADIOI_Malloc((MPI_MAX_INFO_VAL+1)*sizeof(char));
     if ( (fd->info) == MPI_INFO_NULL) {
-	/* This must be part of the open call. can set striping parameters 
-           if necessary. */ 
+	/* This must be part of the open call. can set striping parameters
+           if necessary. */
 	MPI_Info_create(&(fd->info));
 
 	MPI_Info_set(fd->info, "direct_read", "false");
 	MPI_Info_set(fd->info, "direct_write", "false");
 	fd->direct_read = fd->direct_write = 0;
-	
-	/* has user specified striping or server buffering parameters 
+        /* initialize lustre hints */
+	MPI_Info_set(fd->info, "romio_lustre_CO", "1");
+        fd->hints->fs_hints.lustre.CO = 1;
+	MPI_Info_set(fd->info, "romio_lustre_bigsize", "0");
+        fd->hints->fs_hints.lustre.bigsize = 0;
+	MPI_Info_set(fd->info, "romio_lustre_ds_in_coll", "enable");
+        fd->hints->fs_hints.lustre.ds_in_coll = ADIOI_HINT_ENABLE;
+	MPI_Info_set(fd->info, "romio_lustre_contig_data", "0");
+        fd->hints->fs_hints.lustre.contig_data = 0;
+	MPI_Info_set(fd->info, "romio_lustre_samesize", "0");
+        fd->hints->fs_hints.lustre.samesize = 0;
+
+	/* has user specified striping or server buffering parameters
            and do they have the same value on all processes? */
 	if (users_info != MPI_INFO_NULL) {
-	    MPI_Info_get(users_info, "striping_unit", MPI_MAX_INFO_VAL, 
+            /* striping information */
+	    MPI_Info_get(users_info, "striping_unit", MPI_MAX_INFO_VAL,
 			 value, &flag);
-	    if (flag) 
+	    if (flag)
 		str_unit=atoi(value);
 
-	    MPI_Info_get(users_info, "striping_factor", MPI_MAX_INFO_VAL, 
+	    MPI_Info_get(users_info, "striping_factor", MPI_MAX_INFO_VAL,
 			 value, &flag);
-	    if (flag) 
+	    if (flag)
 		str_factor=atoi(value);
 
-	    MPI_Info_get(users_info, "start_iodevice", MPI_MAX_INFO_VAL, 
-			 value, &flag);
-	    if (flag) 
+	    MPI_Info_get(users_info, "romio_lustre_start_iodevice",
+                         MPI_MAX_INFO_VAL, value, &flag);
+	    if (flag)
 		start_iodev=atoi(value);
 
-	    MPI_Info_get(users_info, "direct_read", MPI_MAX_INFO_VAL, 
-			     value, &flag);
+            /* direct read and write */
+	    MPI_Info_get(users_info, "direct_read", MPI_MAX_INFO_VAL,
+			 value, &flag);
 	    if (flag && (!strcmp(value, "true") || !strcmp(value, "TRUE"))) {
 		MPI_Info_set(fd->info, "direct_read", "true");
 		fd->direct_read = 1;
 	    }
-
-	    MPI_Info_get(users_info, "direct_write", MPI_MAX_INFO_VAL, 
+	    MPI_Info_get(users_info, "direct_write", MPI_MAX_INFO_VAL,
 			     value, &flag);
 	    if (flag && (!strcmp(value, "true") || !strcmp(value, "TRUE"))) {
 		MPI_Info_set(fd->info, "direct_write", "true");
@@ -59,22 +75,23 @@
 	    }
 	}
 
+        /* set striping information with ioctl */
 	MPI_Comm_rank(fd->comm, &myrank);
 	if (myrank == 0) {
-	    tmp_val[0] = str_factor;
-	    tmp_val[1] = str_unit;
-	    tmp_val[2] = start_iodev;
+	    stripe_val[0] = str_factor;
+	    stripe_val[1] = str_unit;
+	    stripe_val[2] = start_iodev;
 	}
-	MPI_Bcast(tmp_val, 3, MPI_INT, 0, fd->comm);
+	MPI_Bcast(stripe_val, 3, MPI_INT, 0, fd->comm);
 
-	if (tmp_val[0] != str_factor 
-		|| tmp_val[1] != str_unit 
-		|| tmp_val[2] != start_iodev) {
+	if (stripe_val[0] != str_factor
+		|| stripe_val[1] != str_unit
+		|| stripe_val[2] != start_iodev) {
 	    FPRINTF(stderr, "ADIOI_LUSTRE_SetInfo: All keys"
 		    "-striping_factor:striping_unit:start_iodevice "
 		    "need to be identical across all processes\n");
 	    MPI_Abort(MPI_COMM_WORLD, 1);
-       	} else if ((str_factor > 0) || (str_unit > 0) || (start_iodev >= 0)) {
+	} else if ((str_factor > 0) || (str_unit > 0) || (start_iodev >= 0)) {
 	     /* if user has specified striping info, process 0 tries to set it */
 	    if (!myrank) {
 		if (fd->perm == ADIO_PERM_NULL) {
@@ -100,9 +117,9 @@
 		amode = amode | O_LOV_DELAY_CREATE | O_CREAT;
 
 		fd_sys = open(fd->filename, amode, perm);
-		if (fd_sys == -1) { 
-		    if (errno != EEXIST) 
-			fprintf(stderr, 
+		if (fd_sys == -1) {
+		    if (errno != EEXIST)
+			fprintf(stderr,
 				"Failure to open file %s %d %d\n",strerror(errno), amode, perm);
 		} else {
 		    lum.lmm_magic = LOV_USER_MAGIC;
@@ -112,25 +129,108 @@
 		    lum.lmm_stripe_offset = start_iodev;
 
 		    err = ioctl(fd_sys, LL_IOC_LOV_SETSTRIPE, &lum);
-		    if (err == -1 && errno != EEXIST) { 
+		    if (err == -1 && errno != EEXIST) {
 			fprintf(stderr, "Failure to set stripe info %s \n", strerror(errno));
 		    }
 		    close(fd_sys);
 	       }
 	    } /* End of striping parameters validation */
 	}
-	
 	MPI_Barrier(fd->comm);
-	/* set the values for collective I/O and data sieving parameters */
-	ADIOI_GEN_SetInfo(fd, users_info, error_code);
-    } else {
-	/* The file has been opened previously and fd->fd_sys is a valid
-           file descriptor. cannot set striping parameters now. */
-	
-	/* set the values for collective I/O and data sieving parameters */
-	ADIOI_GEN_SetInfo(fd, users_info, error_code);
     }
- 
+    /* get the other Lustre-specific hints */
+    if (users_info != MPI_INFO_NULL) {
+        /* CO: IO Clients/OST,
+         * to keep the load balancing between clients and OSTs */
+        MPI_Info_get(users_info, "romio_lustre_CO", MPI_MAX_INFO_VAL, value,
+                     &flag);
+	if (flag && (int_val = atoi(value)) > 0) {
+            tmp_val = int_val;
+	    MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm);
+	    if (tmp_val != int_val) {
+                MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname,
+                                                   "romio_lustre_CO",
+                                                   error_code);
+                ADIOI_Free(value);
+		return;
+	    }
+	    MPI_Info_set(fd->info, "romio_lustre_CO", value);
+            fd->hints->fs_hints.lustre.CO = atoi(value);
+	}
+        /* big_req_size:
+         * if the req size is bigger than this,
+         * collective IO may not be performed.
+         */
+	MPI_Info_get(users_info, "romio_lustre_bigsize", MPI_MAX_INFO_VAL, value,
+                     &flag);
+	if (flag && (int_val = atoi(value)) > 0) {
+            tmp_val = int_val;
+	    MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm);
+	    if (tmp_val != int_val) {
+	        MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname,
+		                                   "romio_lustre_bigsize",
+	                                           error_code);
+                ADIOI_Free(value);
+	        return;
+	    }
+	    MPI_Info_set(fd->info, "romio_lustre_bigsize", value);
+            fd->hints->fs_hints.lustre.bigsize = atoi(value);
+        }
+        /* ds_in_coll: disable data sieving in collective IO */
+	MPI_Info_get(users_info, "romio_lustre_ds_in_coll", MPI_MAX_INFO_VAL,
+	             value, &flag);
+	if (flag && (!strcmp(value, "disable") ||
+                     !strcmp(value, "DISABLE"))) {
+            tmp_val = int_val = 2;
+	    MPI_Bcast(&tmp_val, 2, MPI_INT, 0, fd->comm);
+	    if (tmp_val != int_val) {
+	        MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname,
+		                                   "romio_lustre_ds_in_coll",
+						   error_code);
+                ADIOI_Free(value);
+                return;
+	    }
+	    MPI_Info_set(fd->info, "romio_lustre_ds_in_coll", "disable");
+            fd->hints->fs_hints.lustre.ds_in_coll = ADIOI_HINT_DISABLE;
+	}
+        /* contig_data: whether the request data are contiguous */
+	MPI_Info_get(users_info, "romio_lustre_contig_data", MPI_MAX_INFO_VAL,
+	             value, &flag);
+        if (flag && (!strcmp(value, "yes") ||
+                     !strcmp(value, "YES"))) {
+            tmp_val = int_val = 1;
+	    MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm);
+	    if (tmp_val != int_val) {
+	        MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname,
+		                                   "romio_lustre_contig_data",
+						   error_code);
+                ADIOI_Free(value);
+                return;
+	    }
+	    MPI_Info_set(fd->info, "romio_lustre_contig_data", "yes");
+            fd->hints->fs_hints.lustre.contig_data = 1;
+	}
+        /* same_io_size: whether the req size is same */
+	MPI_Info_get(users_info, "romio_lustre_samesize", MPI_MAX_INFO_VAL,
+	             value, &flag);
+        if (flag && (!strcmp(value, "yes") ||
+                     !strcmp(value, "YES"))) {
+            tmp_val = int_val = 1;
+	    MPI_Bcast(&tmp_val, 1, MPI_INT, 0, fd->comm);
+	    if (tmp_val != int_val) {
+	        MPIO_ERR_CREATE_CODE_INFO_NOT_SAME(myname,
+		                                   "romio_lustre_samesize",
+						   error_code);
+                ADIOI_Free(value);
+                return;
+	    }
+	    MPI_Info_set(fd->info, "romio_lustre_samesize", "yes");
+            fd->hints->fs_hints.lustre.samesize = 1;
+	}
+    }
+    /* set the values for collective I/O and data sieving parameters */
+    ADIOI_GEN_SetInfo(fd, users_info, error_code);
+
     if (ADIOI_Direct_read) fd->direct_read = 1;
     if (ADIOI_Direct_write) fd->direct_write = 1;
 

Modified: mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_open.c
===================================================================
--- mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_open.c	2009-03-13 11:28:26 UTC (rev 4054)
+++ mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_open.c	2009-03-13 15:30:30 UTC (rev 4055)
@@ -1,9 +1,11 @@
 /* -*- Mode: C; c-basic-offset:4 ; -*- */
-/* 
- *   Copyright (C) 1997 University of Chicago. 
+/*
+ *   Copyright (C) 1997 University of Chicago.
  *   See COPYRIGHT notice in top-level directory.
  *
  *   Copyright (C) 2007 Oak Ridge National Laboratory
+ *
+ *   Copyright (C) 2008 Sun Microsystems, Lustre group
  */
 
 #include "ad_lustre.h"
@@ -51,14 +53,17 @@
         err = ioctl(fd->fd_sys, LL_IOC_LOV_GETSTRIPE, (void *) &lum);
 
         if (!err) {
+            fd->hints->striping_unit = lum.lmm_stripe_size;
             sprintf(value, "%d", lum.lmm_stripe_size);
             MPI_Info_set(fd->info, "striping_unit", value);
 
+            fd->hints->striping_factor = lum.lmm_stripe_count;
             sprintf(value, "%d", lum.lmm_stripe_count);
             MPI_Info_set(fd->info, "striping_factor", value);
 
+            fd->hints->fs_hints.lustre.start_iodevice = lum.lmm_stripe_offset;
             sprintf(value, "%d", lum.lmm_stripe_offset);
-            MPI_Info_set(fd->info, "start_iodevice", value);
+            MPI_Info_set(fd->info, "romio_lustre_start_iodevice", value);
         }
         ADIOI_Free(value);
 

Modified: mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_rwcontig.c
===================================================================
--- mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_rwcontig.c	2009-03-13 11:28:26 UTC (rev 4054)
+++ mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_rwcontig.c	2009-03-13 15:30:30 UTC (rev 4055)
@@ -1,9 +1,11 @@
 /* -*- Mode: C; c-basic-offset:4 ; -*- */
-/* 
- *   Copyright (C) 1997 University of Chicago. 
+/*
+ *   Copyright (C) 1997 University of Chicago.
  *   See COPYRIGHT notice in top-level directory.
  *
  *   Copyright (C) 2007 Oak Ridge National Laboratory
+ *
+ *   Copyright (C) 2008 Sun Microsystems, Lustre group
  */
 
 #define _XOPEN_SOURCE 600
@@ -136,10 +138,23 @@
 	    if (err == -1) goto ioerr;
 	}
 	
-	if (io_mode)
+	if (io_mode) {
+#ifdef ADIOI_MPE_LOGGING
+        MPE_Log_event(ADIOI_MPE_write_a, 0, NULL);
+#endif
 	    err = write(fd->fd_sys, buf, len);
-	else 
+#ifdef ADIOI_MPE_LOGGING
+        MPE_Log_event(ADIOI_MPE_write_b, 0, NULL);
+#endif
+        } else {
+#ifdef ADIOI_MPE_LOGGING
+        MPE_Log_event(ADIOI_MPE_read_a, 0, NULL);
+#endif
 	    err = read(fd->fd_sys, buf, len);
+#ifdef ADIOI_MPE_LOGGING
+        MPE_Log_event(ADIOI_MPE_read_b, 0, NULL);
+#endif
+        }
     } else {
 	err = ADIOI_LUSTRE_Directio(fd, buf, len, offset, io_mode);
     }

Added: mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_wrcoll.c
===================================================================
--- mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_wrcoll.c	                        (rev 0)
+++ mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_wrcoll.c	2009-03-13 15:30:30 UTC (rev 4055)
@@ -0,0 +1,936 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ *   Copyright (C) 1997 University of Chicago.
+ *   See COPYRIGHT notice in top-level directory.
+ *
+ *   Copyright (C) 2007 Oak Ridge National Laboratory
+ *
+ *   Copyright (C) 2008 Sun Microsystems, Lustre group
+ */
+
+#include "ad_lustre.h"
+#include "adio_extern.h"
+
+/* prototypes of functions used for collective writes only. */
+static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf,
+					MPI_Datatype datatype, int nprocs,
+					int myrank,
+					ADIOI_Access *others_req,
+					ADIOI_Access *my_req,
+					ADIO_Offset *offset_list,
+					int *len_list,
+					int contig_access_count,
+					int * striping_info,
+					int *buf_idx, int *error_code);
+static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
+					  ADIOI_Flatlist_node * flat_buf,
+					  char **send_buf,
+					  ADIO_Offset * offset_list,
+					  int *len_list, int *send_size,
+					  MPI_Request * requests,
+					  int *sent_to_proc, int nprocs,
+					  int myrank, int contig_access_count,
+					  int * striping_info,
+					  int *send_buf_idx,
+                                          int *curr_to_proc,
+					  int *done_to_proc, int iter,
+					  MPI_Aint buftype_extent);
+static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf,
+					 char *write_buf,
+					 ADIOI_Flatlist_node * flat_buf,
+					 ADIO_Offset * offset_list,
+					 int *len_list, int *send_size,
+					 int *recv_size, ADIO_Offset off,
+					 int size, int *count,
+					 int *start_pos, int *partial_recv,
+					 int *sent_to_proc, int nprocs,
+					 int myrank, int buftype_is_contig,
+					 int contig_access_count,
+					 int * striping_info,
+					 ADIOI_Access * others_req,
+					 int *send_buf_idx,
+					 int *curr_to_proc,
+					 int *done_to_proc, int *hole,
+					 int iter, MPI_Aint buftype_extent,
+					 int *buf_idx, int *error_code);
+void ADIOI_Heap_merge(ADIOI_Access * others_req, int *count,
+                      ADIO_Offset * srt_off, int *srt_len, int *start_pos,
+                      int nprocs, int nprocs_recv, int total_elements);
+
+void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, void *buf, int count,
+				   MPI_Datatype datatype,
+				   int file_ptr_type, ADIO_Offset offset,
+				   ADIO_Status * status, int *error_code)
+{
+    /* Uses a generalized version of the extended two-phase method described
+     * in "An Extended Two-Phase Method for Accessing Sections of
+     * Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
+     * Scientific Programming, (5)4:301--317, Winter 1996.
+     * http://www.mcs.anl.gov/home/thakur/ext2ph.ps
+     */
+
+    ADIOI_Access *my_req;
+    /* array of nprocs access structures, one for each other process has
+       this process's request */
+
+    ADIOI_Access *others_req;
+    /* array of nprocs access structures, one for each other process
+       whose request is written by this process. */
+
+    int i, filetype_is_contig, nprocs, myrank, do_collect = 0;
+    int contig_access_count = 0, buftype_is_contig, interleave_count = 0;
+    int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
+    ADIO_Offset orig_fp, start_offset, end_offset, off;
+    ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL;
+    int *buf_idx = NULL, *len_list = NULL, *striping_info = NULL;
+    int old_error, tmp_error;
+
+    MPI_Comm_size(fd->comm, &nprocs);
+    MPI_Comm_rank(fd->comm, &myrank);
+
+    orig_fp = fd->fp_ind;
+
+    /* I/O pattern identification if cb_write isn't disabled */
+    if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
+	/* For this process's request, calculate the list of offsets and
+	   lengths in the file and determine the start and end offsets. */
+
+	/* Note: end_offset points to the last byte-offset that will be accessed.
+         * e.g., if start_offset=0 and 100 bytes to be read, end_offset=99
+         */
+
+	ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
+	                      &offset_list, &len_list, &start_offset,
+	                      &end_offset, &contig_access_count);
+
+	/* each process communicates its start and end offsets to other
+         * processes. The result is an array each of start and end offsets
+         * stored in order of process rank.
+         */
+	st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
+	end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
+	MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
+		      ADIO_OFFSET, fd->comm);
+	MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
+		      ADIO_OFFSET, fd->comm);
+	/* are the accesses of different processes interleaved? */
+	for (i = 1; i < nprocs; i++)
+	    if ((st_offsets[i] < end_offsets[i-1]) &&
+                (st_offsets[i] <= end_offsets[i]))
+                interleave_count++;
+	/* This is a rudimentary check for interleaving, but should suffice
+	   for the moment. */
+
+	/* Two typical access patterns can benefit from collective write.
+         *   1) the processes are interleaved, and
+         *   2) the req size is small.
+         */
+        if (interleave_count > 0) {
+	    do_collect = 1;
+        } else {
+            do_collect = ADIOI_LUSTRE_Docollect(fd, contig_access_count,
+			                        len_list, nprocs);
+        }
+    }
+    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
+
+    /* Decide if collective I/O should be done */
+    if ((!do_collect && fd->hints->cb_write == ADIOI_HINT_AUTO) ||
+        fd->hints->cb_write == ADIOI_HINT_DISABLE) {
+
+	int filerange_is_contig = 0;
+
+	/* use independent accesses */
+	if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
+	    ADIOI_Free(offset_list);
+	    ADIOI_Free(len_list);
+            ADIOI_Free(st_offsets);
+            ADIOI_Free(end_offsets);
+	}
+
+	fd->fp_ind = orig_fp;
+	ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
+	if (buftype_is_contig && filetype_is_contig) {
+	    if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
+		off = fd->disp + (fd->etype_size) * offset;
+		ADIO_WriteContig(fd, buf, count, datatype,
+				 ADIO_EXPLICIT_OFFSET,
+				 off, status, error_code);
+	    } else
+		ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
+				 0, status, error_code);
+	} else {
+	    ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
+			      offset, status, error_code);
+	}
+	return;
+    }
+
+    /* Get Lustre hints information */
+    ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1);
+
+    /* calculate what portions of the access requests of this process are
+     * located in which process
+     */
+    ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count,
+                             striping_info, nprocs, &count_my_req_procs,
+                             &count_my_req_per_proc, &my_req, &buf_idx);
+
+    /* based on everyone's my_req, calculate what requests of other processes
+     * will be accessed by this process.
+     * count_others_req_procs = number of processes whose requests (including
+     * this process itself) will be accessed by this process
+     * count_others_req_per_proc[i] indicates how many separate contiguous
+     * requests of proc. i will be accessed by this process.
+     */
+
+    ADIOI_LUSTRE_Calc_others_req(fd, count_my_req_procs,
+                                 count_my_req_per_proc,
+			         my_req, nprocs, myrank,
+                                 start_offset, end_offset, striping_info,
+                                 &count_others_req_procs, &others_req);
+    ADIOI_Free(count_my_req_per_proc);
+
+    /* exchange data and write in sizes of no more than stripe_size. */
+    ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank,
+                                others_req, my_req,
+                                offset_list, len_list, contig_access_count,
+				striping_info, buf_idx, error_code);
+
+    /* If this collective write is followed by an independent write,
+     * it's possible to have those subsequent writes on other processes
+     * race ahead and sneak in before the read-modify-write completes.
+     * We carry out a collective communication at the end here so no one
+     * can start independent i/o before collective I/O completes.
+     *
+     * need to do some gymnastics with the error codes so that if something
+     * went wrong, all processes report error, but if a process has a more
+     * specific error code, we can still have that process report the
+     * additional information */
+
+    old_error = *error_code;
+    if (*error_code != MPI_SUCCESS)
+	*error_code = MPI_ERR_IO;
+
+    /* optimization: if only one process performing i/o, we can perform
+     * a less-expensive Bcast  */
+#ifdef ADIOI_MPE_LOGGING
+    MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL);
+#endif
+    if (fd->hints->cb_nodes == 1)
+	MPI_Bcast(error_code, 1, MPI_INT,
+		  fd->hints->ranklist[0], fd->comm);
+    else {
+	tmp_error = *error_code;
+	MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
+		      MPI_MAX, fd->comm);
+    }
+#ifdef ADIOI_MPE_LOGGING
+    MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL);
+#endif
+
+    if ((old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO))
+	*error_code = old_error;
+
+
+    if (!buftype_is_contig)
+	ADIOI_Delete_flattened(datatype);
+
+    /* free all memory allocated for collective I/O */
+    /* free others_req */
+    for (i = 0; i < nprocs; i++) {
+	if (others_req[i].count) {
+	    ADIOI_Free(others_req[i].offsets);
+	    ADIOI_Free(others_req[i].lens);
+	    ADIOI_Free(others_req[i].mem_ptrs);
+	}
+    }
+    ADIOI_Free(others_req);
+    /* free my_req here */
+    for (i = 0; i < nprocs; i++) {
+	if (my_req[i].count) {
+	    ADIOI_Free(my_req[i].offsets);
+	    ADIOI_Free(my_req[i].lens);
+	}
+    }
+    ADIOI_Free(my_req);
+    ADIOI_Free(buf_idx);
+    ADIOI_Free(offset_list);
+    ADIOI_Free(len_list);
+    ADIOI_Free(st_offsets);
+    ADIOI_Free(end_offsets);
+    ADIOI_Free(striping_info);
+
+#ifdef HAVE_STATUS_SET_BYTES
+    if (status) {
+	int bufsize, size;
+	/* Don't set status if it isn't needed */
+	MPI_Type_size(datatype, &size);
+	bufsize = size * count;
+	MPIR_Status_set_bytes(status, datatype, bufsize);
+    }
+    /* This is a temporary way of filling in status. The right way is to
+     * keep track of how much data was actually written during collective I/O.
+     */
+#endif
+
+    fd->fp_sys_posn = -1;	/* set it to null. */
+}
+
+/* If successful, error_code is set to MPI_SUCCESS.  Otherwise an error
+ * code is created and returned in error_code.
+ */
+static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, void *buf,
+					MPI_Datatype datatype, int nprocs,
+					int myrank, ADIOI_Access *others_req,
+                                        ADIOI_Access *my_req,
+					ADIO_Offset *offset_list,
+                                        int *len_list, int contig_access_count,
+					int *striping_info, int *buf_idx,
+                                        int *error_code)
+{
+    /* Send data to appropriate processes and write in sizes of no more
+     * than lustre stripe_size.
+     * The idea is to reduce the amount of extra memory required for
+     * collective I/O. If all data were written all at once, which is much
+     * easier, it would require temp space more than the size of user_buf,
+     * which is often unacceptable. For example, to write a distributed
+     * array to a file, where each local array is 8Mbytes, requiring
+     * at least another 8Mbytes of temp space is unacceptable.
+     */
+
+    int hole, i, j, m, flag, ntimes = 1 , max_ntimes, buftype_is_contig;
+    ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc;
+    ADIO_Offset off, req_off, send_off, iter_st_off, *off_list;
+    ADIO_Offset max_size, step_size = 0;
+    int real_size, req_len, send_len;
+    int *recv_curr_offlen_ptr, *recv_count, *recv_size;
+    int *send_curr_offlen_ptr, *send_size;
+    int *partial_recv, *sent_to_proc, *recv_start_pos;
+    int *send_buf_idx, *curr_to_proc, *done_to_proc;
+    char *write_buf = NULL;
+    MPI_Status status;
+    ADIOI_Flatlist_node *flat_buf = NULL;
+    MPI_Aint buftype_extent;
+    int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2];
+    int data_sieving = 0;
+
+    *error_code = MPI_SUCCESS;	/* changed below if error */
+    /* only I/O errors are currently reported */
+
+    /* calculate the number of writes of stripe size to be done.
+     * That gives the no. of communication phases as well.
+     * Note:
+     *   Because we redistribute data in stripe-contiguous pattern for Lustre,
+     *   each process has the same no. of communication phases.
+     */
+
+    for (i = 0; i < nprocs; i++) {
+	if (others_req[i].count) {
+	    st_loc = others_req[i].offsets[0];
+	    end_loc = others_req[i].offsets[0];
+	    break;
+	}
+    }
+    for (i = 0; i < nprocs; i++) {
+	for (j = 0; j < others_req[i].count; j++) {
+	    st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
+	    end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] +
+                                          others_req[i].lens[j] - 1));
+	}
+    }
+    /* this process does no writing. */
+    if ((st_loc == -1) && (end_loc == -1))
+	ntimes = 0;
+    MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm);
+    /* avoid min_st_loc being -1 */
+    if (st_loc == -1)
+        st_loc = max_end_loc;
+    MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm);
+    /* align downward */
+    min_st_loc -= min_st_loc % (ADIO_Offset)stripe_size;
+
+    /* Each time, only avail_cb_nodes number of IO clients perform IO,
+     * so, step_size=avail_cb_nodes*stripe_size IO will be performed at most,
+     * and ntimes=whole_file_portion/step_size
+     */
+    step_size = (ADIO_Offset) avail_cb_nodes * stripe_size;
+    max_ntimes = (int)((max_end_loc - min_st_loc) / step_size + 1);
+    if (ntimes)
+	write_buf = (char *) ADIOI_Malloc(stripe_size);
+
+    /* calculate the start offset for each iteration */
+    off_list = (ADIO_Offset *) ADIOI_Malloc(max_ntimes * sizeof(ADIO_Offset));
+    for (m = 0; m < max_ntimes; m ++)
+        off_list[m] = max_end_loc;
+    for (i = 0; i < nprocs; i++) {
+        for (j = 0; j < others_req[i].count; j ++) {
+            req_off = others_req[i].offsets[j];
+            m = (int)((req_off - min_st_loc) / step_size);
+            off_list[m] = ADIOI_MIN(off_list[m], req_off);
+        }
+    }
+
+    recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
+    send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
+    /* their use is explained below. calloc initializes to 0. */
+
+    recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
+    /* to store count of how many off-len pairs per proc are satisfied
+       in an iteration. */
+
+    send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
+    /* total size of data to be sent to each proc. in an iteration.
+       Of size nprocs so that I can use MPI_Alltoall later. */
+
+    recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
+    /* total size of data to be recd. from each proc. in an iteration. */
+
+    sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
+    /* amount of data sent to each proc so far. Used in
+       ADIOI_Fill_send_buffer. initialized to 0 here. */
+
+    send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
+    curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
+    done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
+    /* Above three are used in ADIOI_Fill_send_buffer */
+
+    recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int));
+    /* used to store the starting value of recv_curr_offlen_ptr[i] in
+       this iteration */
+
+    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
+    if (!buftype_is_contig) {
+	ADIOI_Flatten_datatype(datatype);
+	flat_buf = ADIOI_Flatlist;
+	while (flat_buf->type != datatype)
+	    flat_buf = flat_buf->next;
+    }
+    MPI_Type_extent(datatype, &buftype_extent);
+    /* I need to check if there are any outstanding nonblocking writes to
+     * the file, which could potentially interfere with the writes taking
+     * place in this collective write call. Since this is not likely to be
+     * common, let me do the simplest thing possible here: Each process
+     * completes all pending nonblocking operations before completing.
+     */
+    /*ADIOI_Complete_async(error_code);
+    if (*error_code != MPI_SUCCESS) return;
+    MPI_Barrier(fd->comm);
+    */
+
+    iter_st_off = min_st_loc;
+
+    /* Although we have recognized the data according to OST index,
+     * a read-modify-write will be done if there is a hole between the data.
+     * For example: if blocksize=60, xfersize=30 and stripe_size=100,
+     * then rank0 will collect data [0, 30] and [60, 90] then write. There
+     * is a hole in [30, 60], which will cause a read-modify-write in [0, 90].
+     *
+     * To reduce its impact on the performance, we can disable data sieving
+     * by hint "ds_in_coll".
+     */
+    /* check the hint for data sieving */
+    data_sieving = fd->hints->fs_hints.lustre.ds_in_coll;
+
+    for (m = 0; m < max_ntimes; m++) {
+	/* go through all others_req and my_req to check which will be received
+         * and sent in this iteration.
+         */
+
+	/* Note that MPI guarantees that displacements in filetypes are in
+	   monotonically nondecreasing order and that, for writes, the
+	   filetypes cannot specify overlapping regions in the file. This
+	   simplifies implementation a bit compared to reads. */
+
+	/*
+           off         = start offset in the file for the data to be written in
+                         this iteration
+           iter_st_off = start offset of this iteration
+           real_size   = size of data written (bytes) corresponding to off
+           max_size    = possible maximum size of data written in this iteration
+           req_off     = offset in the file for a particular contiguous request minus
+                         what was satisfied in previous iteration
+           send_off    = offset the request needed by other processes in this iteration
+           req_len     = size corresponding to req_off
+           send_len    = size corresponding to send_off
+         */
+
+	/* first calculate what should be communicated */
+	for (i = 0; i < nprocs; i++)
+	    recv_count[i] = recv_size[i] = send_size[i] = 0;
+
+        off = off_list[m];
+        max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1);
+        real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size - off,
+                                    end_loc - off + 1);
+
+	for (i = 0; i < nprocs; i++) {
+            if (my_req[i].count) {
+                for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) {
+                    send_off = my_req[i].offsets[j];
+                    send_len = my_req[i].lens[j];
+                    if (send_off < iter_st_off + max_size) {
+                        send_size[i] += send_len;
+                    } else {
+                        break;
+                    }
+                }
+                send_curr_offlen_ptr[i] = j;
+            }
+	    if (others_req[i].count) {
+		recv_start_pos[i] = recv_curr_offlen_ptr[i];
+		for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) {
+                    req_off = others_req[i].offsets[j];
+                    req_len = others_req[i].lens[j];
+		    if (req_off < iter_st_off + max_size) {
+			recv_count[i]++;
+			MPI_Address(write_buf + req_off - off,
+				    &(others_req[i].mem_ptrs[j]));
+                        recv_size[i] += req_len;
+		    } else {
+			break;
+                    }
+		}
+		recv_curr_offlen_ptr[i] = j;
+	    }
+	}
+        /* use variable "hole" to pass data_sieving flag into W_Exchange_data */
+        hole = data_sieving;
+	ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
+                                     len_list, send_size, recv_size, off, real_size,
+                                     recv_count, recv_start_pos, partial_recv,
+                                     sent_to_proc, nprocs, myrank,
+                                     buftype_is_contig, contig_access_count,
+                                     striping_info, others_req, send_buf_idx,
+                                     curr_to_proc, done_to_proc, &hole, m,
+                                     buftype_extent, buf_idx, error_code);
+	if (*error_code != MPI_SUCCESS)
+            goto over;
+
+	flag = 0;
+	for (i = 0; i < nprocs; i++)
+	    if (recv_count[i]) {
+		flag = 1;
+		break;
+	    }
+	if (flag) {
+            /* check whether to do data sieving */
+            if(data_sieving == ADIOI_HINT_ENABLE) {
+	        ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
+			         ADIO_EXPLICIT_OFFSET, off, &status,
+			         error_code);
+            } else {
+                /* if there is no hole, write data in one time;
+                 * otherwise, write data in several times */
+                if (!hole) {
+                    ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
+                                     ADIO_EXPLICIT_OFFSET, off, &status,
+                                     error_code);
+                } else {
+                    for (i = 0; i < nprocs; i++) {
+                        if (others_req[i].count) {
+                            for (j = 0; j < others_req[i].count; j++) {
+                                if (others_req[i].offsets[j] < off + real_size &&
+                                    others_req[i].offsets[j] >= off) {
+                                    ADIO_WriteContig(fd,
+                                                     write_buf + others_req[i].offsets[j] - off,
+                                                     others_req[i].lens[j],
+                                                     MPI_BYTE, ADIO_EXPLICIT_OFFSET,
+                                                     others_req[i].offsets[j], &status,
+                                                     error_code);
+	                            if (*error_code != MPI_SUCCESS)
+		                        goto over;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+	    if (*error_code != MPI_SUCCESS)
+		goto over;
+	}
+        iter_st_off += max_size;
+    }
+over:
+    if (ntimes)
+	ADIOI_Free(write_buf);
+    ADIOI_Free(recv_curr_offlen_ptr);
+    ADIOI_Free(send_curr_offlen_ptr);
+    ADIOI_Free(recv_count);
+    ADIOI_Free(send_size);
+    ADIOI_Free(recv_size);
+    ADIOI_Free(sent_to_proc);
+    ADIOI_Free(recv_start_pos);
+    ADIOI_Free(send_buf_idx);
+    ADIOI_Free(curr_to_proc);
+    ADIOI_Free(done_to_proc);
+    ADIOI_Free(off_list);
+}
+
+/* ADIOI_LUSTRE_W_Exchange_data: perform one round of the two-phase
+ * collective-write exchange.  Each aggregator builds hindexed receive
+ * datatypes describing where incoming pieces land in its collective
+ * buffer (write_buf), posts the receives (blocking in atomic mode),
+ * and all ranks send their contributions for this round.
+ *
+ * NOTE(review): *hole is an IN/OUT argument.  On entry it carries the
+ * data-sieving hint (saved into data_sieving below); on exit it is 1
+ * iff the merged incoming requests leave gaps inside [off, off+size)
+ * or more data would be written than was received this round.
+ *
+ * Sets error_code to MPI_SUCCESS if successful, or creates an error code
+ * in the case of error.
+ */
+static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, void *buf,
+					 char *write_buf,
+					 ADIOI_Flatlist_node * flat_buf,
+					 ADIO_Offset * offset_list,
+					 int *len_list, int *send_size,
+					 int *recv_size, ADIO_Offset off,
+					 int size, int *count,
+					 int *start_pos, int *partial_recv,
+					 int *sent_to_proc, int nprocs,
+					 int myrank, int buftype_is_contig,
+					 int contig_access_count,
+					 int * striping_info,
+					 ADIOI_Access * others_req,
+					 int *send_buf_idx,
+					 int *curr_to_proc, int *done_to_proc,
+                                         int *hole, int iter,
+                                         MPI_Aint buftype_extent,
+					 int *buf_idx, int *error_code)
+{
+    int i, j, nprocs_recv, nprocs_send, err;
+    char **send_buf = NULL;
+    MPI_Request *requests, *send_req;
+    MPI_Datatype *recv_types;
+    MPI_Status *statuses, status;
+    int *srt_len, sum, sum_recv;
+    ADIO_Offset *srt_off;
+    /* save the incoming data-sieving hint before *hole is reused below */
+    int data_sieving = *hole;
+    static char myname[] = "ADIOI_W_EXCHANGE_DATA";
+
+    /* create derived datatypes for recv */
+    nprocs_recv = 0;
+    for (i = 0; i < nprocs; i++)
+	if (recv_size[i])
+	    nprocs_recv++;
+
+    recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) *
+					       sizeof(MPI_Datatype));
+    /* +1 to avoid a 0-size malloc */
+
+    j = 0;
+    for (i = 0; i < nprocs; i++) {
+	if (recv_size[i]) {
+	    MPI_Type_hindexed(count[i],
+			      &(others_req[i].lens[start_pos[i]]),
+			      &(others_req[i].mem_ptrs[start_pos[i]]),
+			      MPI_BYTE, recv_types + j);
+	    /* absolute displacements; use MPI_BOTTOM in recv */
+	    MPI_Type_commit(recv_types + j);
+	    j++;
+	}
+    }
+
+    /* To avoid a read-modify-write,
+     * check if there are holes in the data to be written.
+     * For this, merge the (sorted) offset lists others_req using a heap-merge.
+     */
+
+    sum = 0;
+    for (i = 0; i < nprocs; i++)
+	sum += count[i];
+    srt_off = (ADIO_Offset *) ADIOI_Malloc((sum + 1) * sizeof(ADIO_Offset));
+    srt_len = (int *) ADIOI_Malloc((sum + 1) * sizeof(int));
+    /* +1 to avoid a 0-size malloc */
+
+    ADIOI_Heap_merge(others_req, count, srt_off, srt_len, start_pos,
+		     nprocs, nprocs_recv, sum);
+
+    /* check if there are any holes */
+    *hole = 0;
+    for (i = 0; i < sum - 1; i++) {
+        if (srt_off[i] + srt_len[i] < srt_off[i + 1]) {
+            *hole = 1;
+	    break;
+	}
+    }
+    /* In some cases (see John Bent ROMIO REQ # 835), an odd interaction
+     * between aggregation, nominally contiguous regions, and cb_buffer_size
+     * should be handled with a read-modify-write (otherwise we will write out
+     * more data than we receive from everyone else (inclusive), so override
+     * hole detection
+     */
+    if (*hole == 0) {
+        sum_recv = 0;
+        for (i = 0; i < nprocs; i++)
+            sum_recv += recv_size[i];
+	if (size > sum_recv)
+	    *hole = 1;
+    }
+    /* check the hint for data sieving: pre-read the collective buffer so
+     * the holes keep their on-disk contents when write_buf is flushed */
+    if (data_sieving == ADIOI_HINT_ENABLE && nprocs_recv && *hole) {
+        ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
+                        ADIO_EXPLICIT_OFFSET, off, &status, &err);
+        /* --BEGIN ERROR HANDLING-- */
+        if (err != MPI_SUCCESS) {
+            *error_code = MPIO_Err_create_code(err,
+                                               MPIR_ERR_RECOVERABLE,
+                                               myname, __LINE__,
+                                               MPI_ERR_IO,
+                                               "**ioRMWrdwr", 0);
+            ADIOI_Free(recv_types);
+            ADIOI_Free(srt_off);
+            ADIOI_Free(srt_len);
+            return;
+        }
+        /* --END ERROR HANDLING-- */
+    }
+    ADIOI_Free(srt_off);
+    ADIOI_Free(srt_len);
+
+    nprocs_send = 0;
+    for (i = 0; i < nprocs; i++)
+	if (send_size[i])
+	    nprocs_send++;
+
+    if (fd->atomicity) {
+	/* bug fix from Wei-keng Liao and Kenin Coloma */
+	requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) *
+                                                sizeof(MPI_Request));
+	send_req = requests;
+    } else {
+	requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1)*
+                                                sizeof(MPI_Request));
+	/* +1 to avoid a 0-size malloc */
+
+	/* post receives */
+	j = 0;
+	for (i = 0; i < nprocs; i++) {
+	    if (recv_size[i]) {
+		MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i,
+			  myrank + i + 100 * iter, fd->comm, requests + j);
+		j++;
+	    }
+	}
+	send_req = requests + nprocs_recv;
+    }
+
+    /* post sends.
+     * if buftype_is_contig, data can be directly sent from
+     * user buf at location given by buf_idx. else use send_buf.
+     * tag = sender_rank + receiver_rank + 100*iter, matching the
+     * receive side above.
+     */
+    if (buftype_is_contig) {
+	j = 0;
+	for (i = 0; i < nprocs; i++)
+	    if (send_size[i]) {
+		MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
+			  MPI_BYTE, i, myrank + i + 100 * iter, fd->comm,
+			  send_req + j);
+		j++;
+		buf_idx[i] += send_size[i];
+	    }
+    } else if (nprocs_send) {
+	/* buftype is not contig */
+	send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
+	for (i = 0; i < nprocs; i++)
+	    if (send_size[i])
+		send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
+
+	ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list,
+                                      len_list, send_size, send_req,
+                                      sent_to_proc, nprocs, myrank,
+                                      contig_access_count, striping_info,
+                                      send_buf_idx, curr_to_proc, done_to_proc,
+                                      iter, buftype_extent);
+	/* the send is done in ADIOI_Fill_send_buffer */
+    }
+
+	/* bug fix from Wei-keng Liao and Kenin Coloma */
+    if (fd->atomicity) {
+	j = 0;
+	for (i = 0; i < nprocs; i++) {
+	    MPI_Status wkl_status;
+	    if (recv_size[i]) {
+		MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i,
+			 myrank + i + 100 * iter, fd->comm, &wkl_status);
+		j++;
+	    }
+	}
+    }
+
+    /* MPI defers actually destroying a datatype until any communication
+     * using it completes, so freeing here is safe even with receives
+     * still pending in the non-atomic case */
+    for (i = 0; i < nprocs_recv; i++)
+	MPI_Type_free(recv_types + i);
+    ADIOI_Free(recv_types);
+
+	/* bug fix from Wei-keng Liao and Kenin Coloma */
+	/* +1 to avoid a 0-size malloc */
+    if (fd->atomicity) {
+	statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) *
+					       sizeof(MPI_Status));
+    } else {
+	statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) *
+					       sizeof(MPI_Status));
+    }
+
+#ifdef NEEDS_MPI_TEST
+    i = 0;
+    if (fd->atomicity) {
+	/* bug fix from Wei-keng Liao and Kenin Coloma */
+	while (!i)
+	    MPI_Testall(nprocs_send, send_req, &i, statuses);
+    } else {
+	while (!i)
+	    MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses);
+    }
+#else
+	/* bug fix from Wei-keng Liao and Kenin Coloma */
+    if (fd->atomicity)
+	MPI_Waitall(nprocs_send, send_req, statuses);
+    else
+	MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses);
+#endif
+    ADIOI_Free(statuses);
+    ADIOI_Free(requests);
+    if (!buftype_is_contig && nprocs_send) {
+	for (i = 0; i < nprocs; i++)
+	    if (send_size[i])
+		ADIOI_Free(send_buf[i]);
+	ADIOI_Free(send_buf);
+    }
+}
+
+/* ADIOI_BUF_INCR: advance the cursor into the flattened user buffer by
+ * buf_incr bytes without copying anything.  Walks across contiguous
+ * components of flat_buf, wrapping to the next buftype instance
+ * (n_buftypes) when the last component is consumed.  Updates
+ * user_buf_idx, flat_buf_idx, flat_buf_sz, n_buftypes in the caller's
+ * scope; consumes buf_incr down to 0. */
+#define ADIOI_BUF_INCR \
+{ \
+    while (buf_incr) { \
+        size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
+        user_buf_idx += size_in_buf; \
+        flat_buf_sz -= size_in_buf; \
+        if (!flat_buf_sz) { \
+            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
+            else { \
+                flat_buf_idx = 0; \
+                n_buftypes++; \
+            } \
+            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
+                              n_buftypes*buftype_extent; \
+            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
+        } \
+        buf_incr -= size_in_buf; \
+    } \
+}
+
+
+/* ADIOI_BUF_COPY: copy `size` bytes from the (noncontiguous) user buffer
+ * into send_buf[p], advancing the same flattened-buffer cursor as
+ * ADIOI_BUF_INCR plus send_buf_idx[p].  Afterwards ADIOI_BUF_INCR runs
+ * to consume any remainder of buf_incr beyond the bytes just copied. */
+#define ADIOI_BUF_COPY \
+{ \
+    while (size) { \
+        size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
+        memcpy(&(send_buf[p][send_buf_idx[p]]), \
+               ((char *) buf) + user_buf_idx, size_in_buf); \
+        send_buf_idx[p] += size_in_buf; \
+        user_buf_idx += size_in_buf; \
+        flat_buf_sz -= size_in_buf; \
+        if (!flat_buf_sz) { \
+            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
+            else { \
+                flat_buf_idx = 0; \
+                n_buftypes++; \
+            } \
+            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
+                              n_buftypes*buftype_extent; \
+            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
+        } \
+        size -= size_in_buf; \
+        buf_incr -= size_in_buf; \
+    } \
+    ADIOI_BUF_INCR \
+}
+
+/* ADIOI_LUSTRE_Fill_send_buffer: pack data from a noncontiguous user
+ * buffer into per-aggregator staging buffers (send_buf[p]) and issue
+ * the MPI_Isend for each aggregator as soon as its buffer fills.
+ * Only called when buftype is not contiguous; the Isends posted here
+ * are completed by the Waitall in ADIOI_LUSTRE_W_Exchange_data.
+ *
+ * NOTE(review): len_list is int* here; lengths larger than INT_MAX
+ * would overflow — presumably bounded upstream, but worth confirming.
+ */
+static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, void *buf,
+					  ADIOI_Flatlist_node * flat_buf,
+					  char **send_buf,
+					  ADIO_Offset * offset_list,
+					  int *len_list, int *send_size,
+					  MPI_Request * requests,
+					  int *sent_to_proc, int nprocs,
+					  int myrank,
+					  int contig_access_count,
+					  int * striping_info,
+					  int *send_buf_idx,
+					  int *curr_to_proc,
+					  int *done_to_proc, int iter,
+					  MPI_Aint buftype_extent)
+{
+    /* this function is only called if buftype is not contig */
+    int i, p, flat_buf_idx, size;
+    int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
+    ADIO_Offset off, len, rem_len, user_buf_idx;
+
+    /* curr_to_proc[p] = amount of data sent to proc. p that has already
+     * been accounted for so far
+     * done_to_proc[p] = amount of data already sent to proc. p in
+     * previous iterations
+     * user_buf_idx = current location in user buffer
+     * send_buf_idx[p] = current location in send_buf of proc. p
+     */
+
+    for (i = 0; i < nprocs; i++) {
+	send_buf_idx[i] = curr_to_proc[i] = 0;
+	done_to_proc[i] = sent_to_proc[i];
+    }
+    jj = 0;		/* index of the next request slot to use */
+
+    user_buf_idx = flat_buf->indices[0];
+    flat_buf_idx = 0;
+    n_buftypes = 0;
+    flat_buf_sz = flat_buf->blocklens[0];
+
+    /* flat_buf_idx = current index into flattened buftype
+     * flat_buf_sz = size of current contiguous component in flattened buf
+     */
+    for (i = 0; i < contig_access_count; i++) {
+	off = offset_list[i];
+	rem_len = (ADIO_Offset) len_list[i];
+
+	/*this request may span to more than one process */
+	while (rem_len != 0) {
+	    len = rem_len;
+	    /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no
+	     * longer than the single region that processor "p" is responsible
+	     * for.
+	     */
+	    p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info);
+
+	    if (send_buf_idx[p] < send_size[p]) {
+		if (curr_to_proc[p] + len > done_to_proc[p]) {
+		    if (done_to_proc[p] > curr_to_proc[p]) {
+			/* part of this region was already sent in an
+			 * earlier iteration: skip that prefix, copy the
+			 * rest */
+			size = (int) ADIOI_MIN(curr_to_proc[p] + len -
+					       done_to_proc[p],
+					       send_size[p] -
+					       send_buf_idx[p]);
+			buf_incr = done_to_proc[p] - curr_to_proc[p];
+			ADIOI_BUF_INCR
+			    buf_incr = (int) (curr_to_proc[p] + len -
+					      done_to_proc[p]);
+			curr_to_proc[p] = done_to_proc[p] + size;
+		        ADIOI_BUF_COPY
+                    } else {
+			size = (int) ADIOI_MIN(len, send_size[p] -
+					       send_buf_idx[p]);
+			buf_incr = (int) len;
+			curr_to_proc[p] += size;
+		        ADIOI_BUF_COPY
+                    }
+		    /* staging buffer for p is full: fire off the send */
+		    if (send_buf_idx[p] == send_size[p]) {
+			MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
+				  myrank + p + 100 * iter, fd->comm,
+				  requests + jj);
+			jj++;
+		    }
+		} else {
+		    /* already sent in a previous iteration: just advance */
+		    curr_to_proc[p] += (int) len;
+		    buf_incr = (int) len;
+		    ADIOI_BUF_INCR
+                }
+	    } else {
+		/* nothing (more) goes to p this round: just advance */
+		buf_incr = (int) len;
+	        ADIOI_BUF_INCR
+            }
+	    off += len;
+	    rem_len -= len;
+	}
+    }
+    for (i = 0; i < nprocs; i++)
+	if (send_size[i])
+	    sent_to_proc[i] = curr_to_proc[i];
+}

Added: mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_wrstr.c
===================================================================
--- mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_wrstr.c	                        (rev 0)
+++ mpich2/trunk/src/mpi/romio/adio/ad_lustre/ad_lustre_wrstr.c	2009-03-13 15:30:30 UTC (rev 4055)
@@ -0,0 +1,467 @@
+/* -*- Mode: C; c-basic-offset:4 ; -*- */
+/*
+ *   Copyright (C) 1997 University of Chicago.
+ *   See COPYRIGHT notice in top-level directory.
+ *
+ *   Copyright (C) 2007 Oak Ridge National Laboratory
+ *
+ *   Copyright (C) 2008 Sun Microsystems, Lustre group
+ */
+
+#include "ad_lustre.h"
+#include "adio_extern.h"
+
+/* ADIOI_BUFFERED_WRITE: data-sieving write of [req_off, req_off+req_len)
+ * from (char*)buf + userbuf_off through the stripe-aligned writebuf.
+ * When the request falls outside the current buffer window, the buffer
+ * is flushed and re-seeded with a read (read-modify-write), locking the
+ * window unless atomic mode already holds a whole-range lock.
+ * writebuf_len is always clipped so the window never crosses a Lustre
+ * stripe boundary.  On error: sets *error_code, frees writebuf, returns. */
+#define ADIOI_BUFFERED_WRITE \
+{ \
+    if (req_off >= writebuf_off + writebuf_len) { \
+        if (writebuf_len) { \
+           ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
+                  ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
+           if (!(fd->atomicity)) \
+                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
+           if (*error_code != MPI_SUCCESS) { \
+               *error_code = MPIO_Err_create_code(*error_code, \
+                                                  MPIR_ERR_RECOVERABLE, myname, \
+                                                  __LINE__, MPI_ERR_IO, \
+                                                  "**iowswc", 0); \
+               ADIOI_Free(writebuf); \
+               return; \
+           } \
+        } \
+	writebuf_off = req_off; \
+        /* stripe_size alignment */ \
+        writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
+                                       (writebuf_off / stripe_size + 1) * \
+                                       stripe_size - writebuf_off);\
+	if (!(fd->atomicity)) \
+            ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
+	ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, ADIO_EXPLICIT_OFFSET,\
+                        writebuf_off, &status1, error_code); \
+	if (*error_code != MPI_SUCCESS) { \
+	    *error_code = MPIO_Err_create_code(*error_code, \
+					       MPIR_ERR_RECOVERABLE, myname, \
+					       __LINE__, MPI_ERR_IO, \
+					       "**iowsrc", 0); \
+            ADIOI_Free(writebuf); \
+	    return; \
+	} \
+    } \
+    write_sz = (int) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \
+    memcpy(writebuf + req_off - writebuf_off, (char *)buf + userbuf_off, write_sz);\
+    while (write_sz != req_len) {\
+        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
+                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
+        if (!(fd->atomicity)) \
+            ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
+        if (*error_code != MPI_SUCCESS) { \
+            *error_code = MPIO_Err_create_code(*error_code, \
+                                               MPIR_ERR_RECOVERABLE, myname, \
+                                               __LINE__, MPI_ERR_IO, \
+                                               "**iowswc", 0); \
+            ADIOI_Free(writebuf); \
+            return; \
+        } \
+        req_len -= write_sz; \
+        userbuf_off += write_sz; \
+        writebuf_off += writebuf_len; \
+        /* stripe_size alignment */ \
+        writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
+                                       (writebuf_off / stripe_size + 1) * \
+                                       stripe_size - writebuf_off);\
+	if (!(fd->atomicity)) \
+            ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
+        ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, ADIO_EXPLICIT_OFFSET,\
+                        writebuf_off, &status1, error_code); \
+	if (*error_code != MPI_SUCCESS) { \
+	    *error_code = MPIO_Err_create_code(*error_code, \
+	                                       MPIR_ERR_RECOVERABLE, myname, \
+		                               __LINE__, MPI_ERR_IO, \
+				               "**iowsrc", 0); \
+            ADIOI_Free(writebuf); \
+	    return; \
+	} \
+        write_sz = ADIOI_MIN(req_len, writebuf_len); \
+        memcpy(writebuf, (char *)buf + userbuf_off, write_sz);\
+    } \
+}
+
+
+/* this macro is used when filetype is contig and buftype is not contig.
+   it does not do a read-modify-write and does not lock.
+   Same window/flush logic as ADIOI_BUFFERED_WRITE, minus the pre-read
+   and minus locking: the destination range is fully overwritten, so no
+   on-disk bytes inside the window need to be preserved. */
+#define ADIOI_BUFFERED_WRITE_WITHOUT_READ \
+{ \
+    if (req_off >= writebuf_off + writebuf_len) { \
+	writebuf_off = req_off; \
+        /* stripe_size alignment */ \
+        writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
+                                       (writebuf_off / stripe_size + 1) * \
+                                       stripe_size - writebuf_off);\
+    } \
+    write_sz = (int) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \
+    memcpy(writebuf + req_off - writebuf_off, (char *)buf + userbuf_off, write_sz);\
+    while (req_len) { \
+        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
+                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
+        if (*error_code != MPI_SUCCESS) { \
+            *error_code = MPIO_Err_create_code(*error_code, \
+                                               MPIR_ERR_RECOVERABLE, myname, \
+                                               __LINE__, MPI_ERR_IO, \
+                                               "**iowswc", 0); \
+            ADIOI_Free(writebuf); \
+            return; \
+        } \
+        req_len -= write_sz; \
+        userbuf_off += write_sz; \
+        writebuf_off += writebuf_len; \
+        /* stripe_size alignment */ \
+        writebuf_len = (int) ADIOI_MIN(end_offset - writebuf_off + 1, \
+                                       (writebuf_off / stripe_size + 1) * \
+                                       stripe_size - writebuf_off);\
+        write_sz = ADIOI_MIN(req_len, writebuf_len); \
+        memcpy(writebuf, (char *)buf + userbuf_off, write_sz);\
+    } \
+}
+
+/* ADIOI_LUSTRE_WriteStrided: independent (noncollective) strided write
+ * with data sieving through a write buffer whose flush windows are
+ * clipped at Lustre stripe boundaries.  Falls back to
+ * ADIOI_GEN_WriteStrided_naive when the ds_write hint is disabled.
+ * Handles the three noncontiguous combinations of (buftype, filetype);
+ * the fully-contiguous case is handled elsewhere by WriteContig. */
+void ADIOI_LUSTRE_WriteStrided(ADIO_File fd, void *buf, int count,
+			       MPI_Datatype datatype, int file_ptr_type,
+			       ADIO_Offset offset, ADIO_Status * status,
+			       int *error_code)
+{
+    /* offset is in units of etype relative to the filetype. */
+    ADIOI_Flatlist_node *flat_buf, *flat_file;
+    int i, j, k, bwr_size, fwr_size = 0, st_index = 0;
+    int bufsize, num, size, sum, n_etypes_in_filetype, size_in_filetype;
+    int n_filetypes, etype_in_filetype;
+    ADIO_Offset abs_off_in_filetype = 0;
+    int filetype_size, etype_size, buftype_size, req_len;
+    MPI_Aint filetype_extent, buftype_extent;
+    int buf_count, buftype_is_contig, filetype_is_contig;
+    ADIO_Offset userbuf_off;
+    ADIO_Offset off, req_off, disp, end_offset = 0, writebuf_off, start_off;
+    char *writebuf;
+    int flag, st_fwr_size, st_n_filetypes, writebuf_len, write_sz;
+    ADIO_Status status1;
+    int new_bwr_size, new_fwr_size;
+    int stripe_size;
+    static char myname[] = "ADIOI_LUSTRE_WriteStrided";
+    int myrank;
+    MPI_Comm_rank(fd->comm, &myrank);
+
+    if (fd->hints->ds_write == ADIOI_HINT_DISABLE) {
+	/* if user has disabled data sieving on writes, use naive
+	 * approach instead.
+	 */
+	ADIOI_GEN_WriteStrided_naive(fd,
+				     buf,
+				     count,
+				     datatype,
+				     file_ptr_type,
+				     offset, status, error_code);
+	return;
+    }
+
+    *error_code = MPI_SUCCESS;	/* changed below if error */
+
+    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
+    ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
+
+    MPI_Type_size(fd->filetype, &filetype_size);
+    if (!filetype_size) {
+	*error_code = MPI_SUCCESS;
+	return;
+    }
+
+    MPI_Type_extent(fd->filetype, &filetype_extent);
+    MPI_Type_size(datatype, &buftype_size);
+    MPI_Type_extent(datatype, &buftype_extent);
+    etype_size = fd->etype_size;
+
+    bufsize = buftype_size * count;
+
+    /* get striping info */
+    stripe_size = fd->hints->striping_unit;
+
+    /* Different buftype to different filetype */
+    if (!buftype_is_contig && filetype_is_contig) {
+        /* noncontiguous in memory, contiguous in file. */
+	ADIOI_Flatten_datatype(datatype);
+	flat_buf = ADIOI_Flatlist;
+	while (flat_buf->type != datatype)
+	    flat_buf = flat_buf->next;
+
+	off = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
+	    fd->disp + etype_size * offset;
+
+	start_off = off;
+	end_offset = start_off + bufsize - 1;
+	writebuf_off = start_off;
+        /* write stripe size buffer each time */
+	writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size));
+        writebuf_len = (int) ADIOI_MIN(bufsize,
+                                       (writebuf_off / stripe_size + 1) *
+                                       stripe_size - writebuf_off);
+
+        /* if atomicity is true, lock the region to be accessed */
+	if (fd->atomicity)
+	    ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, bufsize);
+
+	for (j = 0; j < count; j++) {
+	    for (i = 0; i < flat_buf->count; i++) {
+		userbuf_off = j * buftype_extent + flat_buf->indices[i];
+		req_off = off;
+		req_len = flat_buf->blocklens[i];
+		ADIOI_BUFFERED_WRITE_WITHOUT_READ
+		off += flat_buf->blocklens[i];
+	    }
+        }
+
+	/* write the buffer out finally */
+	ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
+			 ADIO_EXPLICIT_OFFSET, writebuf_off, &status1,
+			 error_code);
+
+	if (fd->atomicity)
+	    ADIOI_UNLOCK(fd, start_off, SEEK_SET, bufsize);
+	if (*error_code != MPI_SUCCESS) {
+            ADIOI_Free(writebuf);
+	    return;
+        }
+	ADIOI_Free(writebuf);
+	if (file_ptr_type == ADIO_INDIVIDUAL)
+	    fd->fp_ind = off;
+    } else {
+        /* noncontiguous in file */
+        /* filetype already flattened in ADIO_Open */
+	flat_file = ADIOI_Flatlist;
+	while (flat_file->type != fd->filetype)
+	    flat_file = flat_file->next;
+	disp = fd->disp;
+
+	if (file_ptr_type == ADIO_INDIVIDUAL) {
+	    /* locate the filetype component containing fp_ind:
+	     * st_index = index of that component, fwr_size = bytes
+	     * remaining in it from the starting offset */
+	    offset = fd->fp_ind;	/* in bytes */
+	    n_filetypes = -1;
+	    flag = 0;
+	    while (!flag) {
+		n_filetypes++;
+		for (i = 0; i < flat_file->count; i++) {
+		    if (disp + flat_file->indices[i] +
+			(ADIO_Offset) n_filetypes * filetype_extent +
+			flat_file->blocklens[i]	>= offset) {
+			st_index = i;
+			fwr_size = (int) (disp + flat_file->indices[i] +
+					  (ADIO_Offset) n_filetypes *
+					  filetype_extent +
+					  flat_file->blocklens[i] -
+					  offset);
+			flag = 1;
+			break;
+		    }
+		}
+	    }
+	} else {
+	    /* explicit offset given in etypes: convert to an absolute
+	     * byte offset within the flattened filetype */
+	    n_etypes_in_filetype = filetype_size / etype_size;
+	    n_filetypes = (int) (offset / n_etypes_in_filetype);
+	    etype_in_filetype = (int) (offset % n_etypes_in_filetype);
+	    size_in_filetype = etype_in_filetype * etype_size;
+
+	    sum = 0;
+	    for (i = 0; i < flat_file->count; i++) {
+		sum += flat_file->blocklens[i];
+		if (sum > size_in_filetype) {
+		    st_index = i;
+		    fwr_size = sum - size_in_filetype;
+		    abs_off_in_filetype = flat_file->indices[i] +
+			size_in_filetype - (sum - flat_file->blocklens[i]);
+		    break;
+		}
+	    }
+
+	    /* abs. offset in bytes in the file */
+	    offset = disp + (ADIO_Offset) n_filetypes *filetype_extent +
+		     abs_off_in_filetype;
+	}
+
+	start_off = offset;
+
+	/* If the file bytes is actually contiguous, we do not need data sieve at all */
+	if (bufsize <= fwr_size) {
+            req_off = start_off;
+            req_len = bufsize;
+            end_offset = start_off + bufsize - 1;
+	    writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size));
+	    memset(writebuf, -1, ADIOI_MIN(bufsize, stripe_size));
+            writebuf_off = 0;
+            writebuf_len = 0;
+            userbuf_off = 0;
+            ADIOI_BUFFERED_WRITE_WITHOUT_READ
+	} else {
+	    /* Calculate end_offset, the last byte-offset that will be accessed.
+	       e.g., if start_offset=0 and 100 bytes to be write, end_offset=99 */
+	    st_fwr_size = fwr_size;
+	    st_n_filetypes = n_filetypes;
+	    i = 0;
+	    j = st_index;
+	    off = offset;
+	    fwr_size = ADIOI_MIN(st_fwr_size, bufsize);
+	    while (i < bufsize) {
+		i += fwr_size;
+		end_offset = off + fwr_size - 1;
+
+		if (j < (flat_file->count - 1))
+		    j++;
+		else {
+		    j = 0;
+		    n_filetypes++;
+		}
+
+		off = disp + flat_file->indices[j] +
+		      (ADIO_Offset) n_filetypes * filetype_extent;
+		fwr_size = ADIOI_MIN(flat_file->blocklens[j], bufsize - i);
+	    }
+
+	    writebuf_off = 0;
+	    writebuf_len = 0;
+	    writebuf = (char *) ADIOI_Malloc(stripe_size);
+	    memset(writebuf, -1, stripe_size);
+	    /* if atomicity is true, lock the region to be accessed */
+	    if (fd->atomicity)
+		ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, bufsize);
+
+	    if (buftype_is_contig && !filetype_is_contig) {
+		/* contiguous in memory, noncontiguous in file. should be the most
+		   common case. */
+		i = 0;
+		j = st_index;
+		off = offset;
+		n_filetypes = st_n_filetypes;
+		fwr_size = ADIOI_MIN(st_fwr_size, bufsize);
+		while (i < bufsize) {
+		    if (fwr_size) {
+			/* TYPE_UB and TYPE_LB can result in
+			   fwr_size = 0. save system call in such cases */
+			/*
+                        lseek(fd->fd_sys, off, SEEK_SET);
+			err = write(fd->fd_sys, ((char *) buf) + i, fwr_size);
+                        */
+			req_off = off;
+			req_len = fwr_size;
+			userbuf_off = i;
+			ADIOI_BUFFERED_WRITE
+                    }
+		    i += fwr_size;
+
+		    if (off + fwr_size < disp + flat_file->indices[j] +
+		                         flat_file->blocklens[j] +
+			    (ADIO_Offset) n_filetypes * filetype_extent)
+		        off += fwr_size;
+		    /* did not reach end of contiguous block in filetype.
+		    no more I/O needed. off is incremented by fwr_size. */
+		    else {
+		        if (j < (flat_file->count - 1))
+			    j++;
+			else {
+			    j = 0;
+			    n_filetypes++;
+			}
+			off = disp + flat_file->indices[j] +
+		              (ADIO_Offset) n_filetypes * filetype_extent;
+			fwr_size = ADIOI_MIN(flat_file->blocklens[j],
+                                             bufsize - i);
+		    }
+		}
+	    } else {
+		    /* noncontiguous in memory as well as in file */
+	        ADIOI_Flatten_datatype(datatype);
+	        flat_buf = ADIOI_Flatlist;
+	        while (flat_buf->type != datatype)
+		    flat_buf = flat_buf->next;
+
+		k = num = buf_count = 0;
+		i = (int) (flat_buf->indices[0]);
+		j = st_index;
+		off = offset;
+		n_filetypes = st_n_filetypes;
+		fwr_size = st_fwr_size;
+		bwr_size = flat_buf->blocklens[0];
+
+		/* walk both flattened lists in lockstep; each step writes
+		 * min(remaining file block, remaining memory block) bytes */
+		while (num < bufsize) {
+		    size = ADIOI_MIN(fwr_size, bwr_size);
+		    if (size) {
+		        /*
+                        lseek(fd->fd_sys, off, SEEK_SET);
+		         err = write(fd->fd_sys, ((char *) buf) + i, size);
+                        */
+		        req_off = off;
+		        req_len = size;
+		        userbuf_off = i;
+		        ADIOI_BUFFERED_WRITE
+                    }
+
+	            new_fwr_size = fwr_size;
+		    new_bwr_size = bwr_size;
+
+		    if (size == fwr_size) {
+		        /* reached end of contiguous block in file */
+		        if (j < (flat_file->count - 1)) {
+			    j++;
+                        } else {
+			    j = 0;
+			    n_filetypes++;
+			}
+			off = disp + flat_file->indices[j] +
+			      (ADIO_Offset) n_filetypes * filetype_extent;
+                        new_fwr_size = flat_file->blocklens[j];
+			if (size != bwr_size) {
+			    i += size;
+			    new_bwr_size -= size;
+			}
+		    }
+		    if (size == bwr_size) {
+		        /* reached end of contiguous block in memory */
+		        k = (k + 1) % flat_buf->count;
+		        buf_count++;
+		        i = (int) (buftype_extent *
+                                  (buf_count / flat_buf->count) +
+				  flat_buf->indices[k]);
+			new_bwr_size = flat_buf->blocklens[k];
+			if (size != fwr_size) {
+			    off += size;
+			    new_fwr_size -= size;
+			}
+		    }
+		    num += size;
+		    fwr_size = new_fwr_size;
+		    bwr_size = new_bwr_size;
+		}
+            }
+
+	    /* write the buffer out finally */
+	    if (writebuf_len) {
+	        ADIO_WriteContig(fd, writebuf, writebuf_len,
+	                         MPI_BYTE, ADIO_EXPLICIT_OFFSET,
+	                         writebuf_off, &status1, error_code);
+		if (!(fd->atomicity))
+		    ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len);
+		if (*error_code != MPI_SUCCESS) {
+                    ADIOI_Free(writebuf);
+		    return;
+                }
+	    }
+	    if (fd->atomicity)
+	        ADIOI_UNLOCK(fd, start_off, SEEK_SET, bufsize);
+	}
+        ADIOI_Free(writebuf);
+	if (file_ptr_type == ADIO_INDIVIDUAL)
+	    fd->fp_ind = off;
+    }
+    fd->fp_sys_posn = -1;	/* set it to null. */
+
+#ifdef HAVE_STATUS_SET_BYTES
+    MPIR_Status_set_bytes(status, datatype, bufsize);
+    /* This is a temporary way of filling in status. The right way is to
+    keep track of how much data was actually written by ADIOI_BUFFERED_WRITE. */
+#endif
+
+    if (!buftype_is_contig)
+        ADIOI_Delete_flattened(datatype);
+}

Modified: mpich2/trunk/src/mpi/romio/adio/common/ad_write_coll.c
===================================================================
--- mpich2/trunk/src/mpi/romio/adio/common/ad_write_coll.c	2009-03-13 11:28:26 UTC (rev 4054)
+++ mpich2/trunk/src/mpi/romio/adio/common/ad_write_coll.c	2009-03-13 15:30:30 UTC (rev 4055)
@@ -46,7 +46,7 @@
                            int *send_buf_idx, int *curr_to_proc, 
                            int *done_to_proc, int iter, 
                            MPI_Aint buftype_extent);
-static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, 
+void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, 
                       ADIO_Offset *srt_off, int *srt_len, int *start_pos,
                       int nprocs, int nprocs_recv, int total_elements);
 
@@ -958,7 +958,7 @@
 
 
 
-static void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, 
+void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count, 
 		      ADIO_Offset *srt_off, int *srt_len, int *start_pos,
 		      int nprocs, int nprocs_recv, int total_elements)
 {

Modified: mpich2/trunk/src/mpi/romio/adio/include/adioi.h
===================================================================
--- mpich2/trunk/src/mpi/romio/adio/include/adioi.h	2009-03-13 11:28:26 UTC (rev 4054)
+++ mpich2/trunk/src/mpi/romio/adio/include/adioi.h	2009-03-13 15:30:30 UTC (rev 4055)
@@ -64,6 +64,14 @@
 		    int dtype_read;
 		    int dtype_write;
 	    } pvfs2;
+            struct {
+                    int start_iodevice;
+                    int CO;
+                    int bigsize;
+                    int contig_data;
+                    int samesize;
+                    int ds_in_coll;
+            } lustre;
     } fs_hints;
 
 };

Modified: mpich2/trunk/src/mpi/romio/configure.in
===================================================================
--- mpich2/trunk/src/mpi/romio/configure.in	2009-03-13 11:28:26 UTC (rev 4054)
+++ mpich2/trunk/src/mpi/romio/configure.in	2009-03-13 15:30:30 UTC (rev 4055)
@@ -1123,8 +1123,14 @@
 if test -n "$file_system_testfs"; then
     AC_DEFINE(ROMIO_TESTFS,1,[Define for ROMIO with TESTFS])
 fi
+#
+# Verify presence of lustre/lustre_user.h
+#
 if test -n "$file_system_lustre"; then
-    AC_DEFINE(ROMIO_LUSTRE,1,[Define for ROMIO with LUSTRE])
+    AC_CHECK_HEADERS(lustre/lustre_user.h,
+        AC_DEFINE(ROMIO_LUSTRE,1,[Define for ROMIO with LUSTRE]),
+        AC_MSG_ERROR([LUSTRE support requested but cannot find lustre/lustre_user.h header file])                                        
+    )
 fi
 
 if test -n "$file_system_xfs"; then



More information about the mpich2-commits mailing list