[Swift-devel] hang checker fun

Allan Espinosa aespinosa at cs.uchicago.edu
Sun Mar 27 17:56:23 CDT 2011


oops. trimmed the first part.

Thanks

type SgtDim;
type Variation;
type Seismogram;
type PeakValue;

type Station {
  string name;
  float lat;
  float lon;
  int erf;
  int variation_scenario;
}

type Sgt {
  SgtDim x;
  SgtDim y;
}

type Rupture {
  int source;
  int index;
  int size;
}

/* some constants used by the apps*/
global int num_time_steps = 3000;
global string spectra_period1 = "all";
global float filter_highhz = 5.0;
global float simulation_timeskip = 0.1;

app (Sgt _ext) extract(Sgt _sgt, Station _stat, Variation _var) {
  extract @strcat("stat=", _stat.name) "extract_sgt=1"
      @strcat("slon=", _stat.lon) @strcat("slat=", _stat.lat)

      @strcat("rupmodfile=", @filename(_var))
      @strcat("sgt_xfile=", @filename(_sgt.x))
      @strcat("sgt_yfile=", @filename(_sgt.y))
      @strcat("extract_sgt_xfile=", @filename(_ext.x))
      @strcat("extract_sgt_yfile=", @filename(_ext.y));
}

app (Seismogram _seis, PeakValue _peak)
    seispeak(Sgt _sgt, Variation _var, Station _stat) {
  seispeak
      /* Args of seismogram synthesis     */
      @strcat("stat=", _stat.name) "extract_sgt=0"
      @strcat("slon=", _stat.lon) @strcat("slat=", _stat.lat)
      "outputBinary=1" "mergeOutput=1" @strcat("ntout=", num_time_steps)

      @strcat("rupmodfile=", @filename(_var))
      @strcat("sgt_xfile=", @filename(_sgt.x))
      @strcat("sgt_yfile=", @filename(_sgt.y))
      @strcat("seis_file=", @filename(_seis))

      /* Args of peak ground acceleration */
      "simulation_out_pointsX=2" "simulation_out_pointsY=1"
      "surfseis_rspectra_seismogram_units=cmpersec"
      "surfseis_rspectra_output_units=cmpersec2"
      "surfseis_rspectra_output_type=aa"
      "surfseis_rspectra_apply_byteswap=no"

      @strcat("simulation_out_timesamples=", num_time_steps)
      @strcat("simulation_out_timeskip=", simulation_timeskip)
      @strcat("surfseis_rspectra_period=", spectra_period1)
      @strcat(" surfseis_rspectra_apply_filter_highHZ=", filter_highhz)
      @strcat("in=", @filename(_seis))
      @strcat("out=", @filename(_peak));
}

app (Seismogram _seis, PeakValue _peak)
    seispeak_local(Sgt _sgt, Variation _var, Station _stat) {
  seispeak_local
      /* Args of seismogram synthesis     */
      @strcat("stat=", _stat.name) "extract_sgt=0"
      @strcat("slon=", _stat.lon) @strcat("slat=", _stat.lat)
      "outputBinary=1" "mergeOutput=1" @strcat("ntout=", num_time_steps)

      @strcat("rupmodfile=", @filename(_var))
      @strcat("sgt_xfile=", @filename(_sgt.x))
      @strcat("sgt_yfile=", @filename(_sgt.y))
      @strcat("seis_file=", @filename(_seis))

      /* Args of peak ground acceleration */
      "simulation_out_pointsX=2" "simulation_out_pointsY=1"
      "surfseis_rspectra_seismogram_units=cmpersec"
      "surfseis_rspectra_output_units=cmpersec2"
      "surfseis_rspectra_output_type=aa"
      "surfseis_rspectra_apply_byteswap=no"

      @strcat("simulation_out_timesamples=", num_time_steps)
      @strcat("simulation_out_timeskip=", simulation_timeskip)
      @strcat("surfseis_rspectra_period=", spectra_period1)
      @strcat(" surfseis_rspectra_apply_filter_highHZ=", filter_highhz)
      @strcat("in=", @filename(_seis))
      @strcat("out=", @filename(_peak));
}

app (Seismogram _seis[], PeakValue _peak[])
    seispeak_agg(Sgt _sgt, Variation _var[], Station _stat, int n) {
  seispeak_agg
      /* System args */
      _stat.name _stat.lon _stat.lat num_time_steps
      num_time_steps simulation_timeskip spectra_period1 filter_highhz

      @filename(_sgt.x) @filename(_sgt.y)

      n @filenames(_var) @filenames(_seis) @filenames(_peak);
}

// Auxillary functions for the mappers
type StationFile;
app (StationFile _stat) getsite_file(int _run_id) {
  getsite _run_id stdout=@filename(_stat);
}
(Station _stat) get_site(int _run_id) {
  StationFile file<"/var/tmp/site_tmp">;
  /*file = getsite_file(_run_id);*/
  _stat = readData(file);
}

type RuptureFile;
app (RuptureFile _rup) getrupture_file(int _run_id) {
  getrupture _run_id stdout=@filename(_rup);
}
(Rupture _rup[]) get_ruptures(int _run_id, Station _site) {
  /*RuptureFile file<single_file_mapper; file=@strcat(_site.name,
"/rup_tmp")>;*/
  RuptureFile file<"LGU/rup_tmp">;
  /*file = getrupture_file(_run_id);*/
  _rup = readData(file);
}

type VariationFile;
app (VariationFile _var) getvariation_file(Station _site, Rupture _rup,
    string _loc) {
  variation_mapper "-e" _site.erf "-v" _site.variation_scenario
      "-l" _loc "-s" _rup.source "-r" _rup.index stdout=@_var;
}
(string _vars[]) get_variations(Station _site, Rupture _rup, string _loc){
  string fname = @strcat(_rup.source, "_", _rup.index);
  VariationFile file<single_file_mapper;
      file=@strcat(_site.name, "/varlist/", _rup.source, "/", fname, ".txt")>;
  /*file = getvariation_file(_site, _rup, _loc);*/
  _vars = readData(file);
}

type offset {
  int off;
  int size;
}
type offset_file;
(offset _off[]) mkoffset(int _size, int _group_size) {
   offset_file file <single_file_mapper; file=@strcat("LGU/offset-",_size)>;
   file = mkoffset_file(_size, _group_size);
   _off = readData(file);
}
app (offset_file _off) mkoffset_file(int _size, int _group_size) {
  mkoffset _size _group_size;
}

/* TODO: data management zip jobs */

/* Main program */
int run_id = 664;
int agg_size = 80;
int loc_size = 20;
string datadir =
  "gsiftp://gridftp.pads.ci.uchicago.edu//gpfs/pads/swift/aespinosa/science/cybershake/Results";

Station site = get_site(run_id);

Sgt sgt_var <ext; exec="getsgtvar.rb", r=run_id, s=site.name,
  l="gsiftp://gridftp.pads.ci.uchicago.edu//gpfs/pads/swift/aespinosa/science/cybershake/SgtFiles">;
Rupture rups[] = get_ruptures(run_id, site);

foreach rup in rups {
  string loc_sub = @strcat(datadir, "/", site.name, "/", rup.source,
"/", rup.index);
  Sgt sub <ext; exec="getsub.rb", l=loc_sub, n=site.name, s=rup.source,
      r=rup.index>;
  string var_str[] = get_variations( site, rup,
      "gsiftp://gridftp.pads.ci.uchicago.edu//gpfs/pads/swift/aespinosa/science/cybershake/RuptureVariations"
);
  Variation vars[] <array_mapper; files=var_str>;

  sub = extract(sgt_var,  site, vars[rup.size-1]);

  string seis_str[];
  string peak_str[];

  foreach var,i in vars {
    seis_str[i] = @strcat(loc_sub, "/Seismogram_", site.name, "_", rup.source,
                          "_", rup.index, "_", i, ".grm");
    peak_str[i] = @strcat(loc_sub, "/PeakVals_", site.name, "_", rup.source,
                          "_", rup.index, "_", i, ".bsa");
  }

  Seismogram seis[] <array_mapper; files=seis_str>;
  PeakValue  peak[] <array_mapper; files=peak_str>;

  if(rup.size <= loc_size) {
    /*
     * Not worth to transfer the data. Execute on TeraGrid instead.
     * Also execute on localhost.
     */
    foreach var,i in vars {
      (seis[i], peak[i]) = seispeak_local(sub, var, site);
    }
  } else {if(rup.size <= agg_size) {
    /* Execute on a single resource */
    (seis, peak) = seispeak_agg(sub, vars, site, rup.size);
  } else {
    /*offset offs[] = mkoffset(rup.size, agg_size);*/
    /*for i in offs {*/
      /*(seis, peak) = seispeak_agg(sub, vars[i.off:i.off+off.size],*/
                                  /*off.size);*/
    /*}*/
  }}
}


2011/3/27 Mihael Hategan <hategan at mcs.anl.gov>:
> I don't believe you. There is no SgtDim data type in that script.
>
> Mihael
>
> On Sun, 2011-03-27 at 16:55 -0500, Allan Espinosa wrote:
>> Here it is.  The get_app() calls are simple wrappers to readData()
>>
>> type offset {
>>   int off;
>>   int size;
>> }
>> type offset_file;
>> (offset _off[]) mkoffset(int _size, int _group_size) {
>>    offset_file file <single_file_mapper; file=@strcat("LGU/offset-",_size)>;
>>    file = mkoffset_file(_size, _group_size);
>>    _off = readData(file);
>> }
>> app (offset_file _off) mkoffset_file(int _size, int _group_size) {
>>   mkoffset _size _group_size;
>> }
>>
>> /* TODO: data management zip jobs */
>>
>> /* Main program */
>> int run_id = 664;
>> int agg_size = 80;
>> int loc_size = 20;
>> string datadir =
>>   "gsiftp://gridftp.pads.ci.uchicago.edu//gpfs/pads/swift/aespinosa/science/cybershake/Results";
>>
>> Station site = get_site(run_id);
>>
>> Sgt sgt_var <ext; exec="getsgtvar.rb", r=run_id, s=site.name,
>>   l="gsiftp://gridftp.pads.ci.uchicago.edu//gpfs/pads/swift/aespinosa/science/cybershake/SgtFiles">;
>> Rupture rups[] = get_ruptures(run_id, site);
>>
>> foreach rup in rups {
>>   string loc_sub = @strcat(datadir, "/", site.name, "/", rup.source,
>> "/", rup.index);
>>   Sgt sub <ext; exec="getsub.rb", l=loc_sub, n=site.name, s=rup.source,
>>       r=rup.index>;
>>   string var_str[] = get_variations( site, rup,
>>       "gsiftp://gridftp.pads.ci.uchicago.edu//gpfs/pads/swift/aespinosa/science/cybershake/RuptureVariations"
>> );
>>   Variation vars[] <array_mapper; files=var_str>;
>>
>>   sub = extract(sgt_var,  site, vars[rup.size-1]);
>>
>>   string seis_str[];
>>   string peak_str[];
>>
>>   foreach var,i in vars {
>>     seis_str[i] = @strcat(loc_sub, "/Seismogram_", site.name, "_", rup.source,
>>                           "_", rup.index, "_", i, ".grm");
>>     peak_str[i] = @strcat(loc_sub, "/PeakVals_", site.name, "_", rup.source,
>>                           "_", rup.index, "_", i, ".bsa");
>>   }
>>
>>   Seismogram seis[] <array_mapper; files=seis_str>;
>>   PeakValue  peak[] <array_mapper; files=peak_str>;
>>
>>   if(rup.size <= loc_size) {
>>     /*
>>      * Not worth to transfer the data. Execute on TeraGrid instead.
>>      * Also execute on localhost.
>>      */
>>     foreach var,i in vars {
>>       (seis[i], peak[i]) = seispeak_local(sub, var, site);
>>     }
>>   } else {if(rup.size <= agg_size) {
>>     /* Execute on a single resource */
>>     (seis, peak) = seispeak_agg(sub, vars, site, rup.size);
>>   } else {
>>     /*offset offs[] = mkoffset(rup.size, agg_size);*/
>>     /*for i in offs {*/
>>       /*(seis, peak) = seispeak_agg(sub, vars[i.off:i.off+off.size],*/
>>                                   /*off.size);*/
>>     /*}*/
>>   }}
>> }
>>
>>
>> 2011/3/27 Mihael Hategan <hategan at mcs.anl.gov>:
>> > May I see the script?
>> >
>> > On Fri, 2011-03-25 at 19:42 -0500, Allan Espinosa wrote:
>> >> this has been occurring for 70 times already.  What i expect is for
>> >> the app with SgtDim sub to run and close the future.
>> >>
>> >> 2011-03-25 19:40:12,217-0500 WARN  HangChecker No events in 10s.
>> >> 2011-03-25 19:40:12,217-0500 WARN  HangChecker
>> >> Registered futures:
>> >> Rupture[] rups  Closed, 1 elements, 0 listeners
>> >> Variation vars - Closed, no listeners
>> >> SgtDim sub - Open, 1 listeners
>> >> string site  Closed, no listeners
>> >> Variation[] vars  Closed, 72 elements, 0 listeners
>> >> ----
>> >>
>> >> Waiting threads:
>> >> 0-13
>> >> 0-13-0-7
>> >> 0-13-0-8-1-1
>> >> ----
>> >>
>> >>
>> >
>> >
>> >
>> >
>>



More information about the Swift-devel mailing list