[Swift-user] sort on large data

Jiada Tu jtu3 at hawk.iit.edu
Sun Oct 19 04:00:42 CDT 2014


Hi Yadu,

Thanks for your answer! That's really helpful.



I forgot to forward my last question and your answer to swift-user group,
so if anybody have the same confusions or have interest on it, please check
below :

On Sat, Oct 18, 2014 at 11:45 PM, Yadu Nand Babuji <yadunand at uchicago.edu>
wrote:

>  Hi Jiada,
>
> Please find replies inline :
>
> On 10/18/2014 09:30 PM, Jiada Tu wrote:
>
> Thanks for your answer, Yadu. I have some more questions:
> 1) If I didn't misunderstand, the (outputs) will be staging back to the
> head node, right?
>
> Yes, in the current modes that you are using.
>
>  2)
> ---------------------------
>  type file;
> app (file outs[]) make_outputs (file script)
> {
>     bash @script;
> }
>
> file outputs[] <filesys_mapper; prefix="outputs">;
> file script       <"make_outputs.sh">; # This script creates a few files
> with outputs as prefix
> (outputs) = make_outputs(script);
> ---------------------------
>
>  If I have some later app function that takes the "outputs" files as
> input, will that app function wait until all possible
>
>   outputs generated?
>
> Yes! Swift is implicitly parallel, and the order of execution is based on
> the availability of dependent data items.
>
>   For example:
>
>  app (file outs[]) final_ouputs (file script, file input[])
>  {
>     bash @script @filenames(input)
> }
> foreach i in [0:100]
> {
>     file outputs[] <filesys_mapper; prefix="outputs-"+ at tostring(i)+"-">;
>     #I know ""outputs-"+ at tostring(i)+"-"" may not work, please think it
> as pesudo-code
>     (outputs) = make_outputs(script);
> }
>
>  file script2 <"final_output"> #take multiple input and merge them into a
> file
> file inputs[] <filesys_mapper; prefix="outputs", suffix="000">
> file finoutput<filesys_mapper; prefix="finalouputs-">
> (finoutput) = final_ouputs(script2, inputs)
>
>  final_outputs needs to take some output files from "every" single loops
> as its input (first loop may generate "outputs-0-000", second loop may
> generate "outputs-1-000", etc).
>
>  So, will final_outputs() task be "block" until all make_outputs() task
> finish processing?
>
> Yes, final_outputs will block till the array that it depends on is closed.
>
>   3) Actually, wordCount is our first program, and sort is our second
> program which gives "extra credits". You gives a great answer to another
> question which I also confused in.
>
>   I hope I did not give too much away :)
>
>  The output of sort program will be 10GB, so it will not fit in memory.
> That why I want to split the intermediate files and send them to several
> merge task. Each merge task will generate, say, a 100MB file. So the result
> of my sort program will have 10GB/100MB=100 files, with file1 have the
> smallest words and file100 have the largest words.
>
>  From your answer, I believe this can be deal with by using s3fs? so:
> 4) Yes, I want some help to use s3fs.
>
> Since this is something that would be of general interest. I will update
> the github readme page with directions for how to run swift
> over a s3fs acting as a shared-filesystem.
>
>  But, are there any other general ways to deal with big-file-sorting in
> swift? Can you give me a hit about what they would be? Like, how you
> generate deal with this sorting problem?
>
> The simplest strategy I can think of is to split each chunk into say 100
> buckets, and have the corresponding buckets from every chunk merge-sorted.
>
>  Thanks,
> Jiada Tu
>
> On Sat, Oct 18, 2014 at 6:13 PM, Yadu Nand Babuji <yadunand at uchicago.edu>
> wrote:
>
>>  Hi Jiada Tu,
>>
>> 1) Here's an example for returning an array of files :
>>
>> type file;
>> app (file outs[]) make_outputs (file script)
>> {
>>     bash @script;
>> }
>>
>> file outputs[] <filesys_mapper; prefix="outputs">;
>> file script       <"make_outputs.sh">; # This script creates a few files
>> with outputs as prefix
>> (outputs) = make_outputs(script);
>>
>> 2) The products of a successful task execution, must be visible to the
>> headnode (where swift runs) either through a
>> - shared filesystem (NFS, S3 mounted over s3fs etc)  or
>> - must be brought back over the network.
>> But, we can reduce the overhead in moving the results to the headnode and
>> then to the workers for the reduce stage.
>>
>> I understand that this is part of your assignment, so I will try to
>> answer without getting too specific, at the same time,
>> concepts from hadoop do not necessarily work directly in this context. So
>> here are some things to consider to get
>> the best performance possible:
>>
>> - Assuming that the texts contain 10K unique words, your sort program
>> will generate a file containing atmost 10K lines
>>  (which would be definitely under an MB). Is there any advantage into
>> splitting this into smaller files ?
>>
>> - Since the final merge involves tiny files, you could very well do the
>> reduce stage on the headnode and be quite efficient
>>   (you can define the reduce app only for site:local)
>>
>>   sites : [local, cloud-static]
>>   site.local {
>>                 ....
>>                 app.reduce {
>>                         executable : ${env.PWD}/reduce.py
>>                 }
>>   }
>>
>>   site.cloud-static {
>>                 ....
>>                 app.python {
>>                         executable : /usr/bin/python
>>                 }
>>
>>  }
>>
>>  This assumes that you are going to define your sorting app like this :
>>
>>   app (file freqs) sort (file sorting_script, file input ) {
>>        python @sorting_script @input;
>>  }
>>
>>
>> - The real cost is in having the original text reach the workers, this
>> can be made faster by :
>>     - A better headnode with better network/disk IO (I've measured
>> 140Mbit/s between m1.medium nodes, c3.8xlarge comes with 975Mbits/s)
>>     - Use S3 with S3fs and have swift-workers pull data from S3 which is
>> pretty scalable, and remove the IO load from the headnode.
>>
>> - Identify the optimal size for data chunks for your specific problem.
>> Each chunk of data in this case comes with the overhead of starting
>>   a new remote task, sending the data and bringing results back. Note
>> that the result of a wordcount on a file whether it is 1Mb or 10Gb
>>   is still the atmost 1Mb (with earlier assumptions)
>>
>> - Ensure that the data with the same datacenter, for cost as well as
>> performance. By limiting the cluster to US-Oregon we already do this.
>>
>> If you would like to attempt this using S3FS, let me know, I'll be happy
>> to explain that in detail.
>>
>> Thanks,
>> Yadu
>>
>>
>>
>> On 10/18/2014 04:18 PM, Jiada Tu wrote:
>>
>>  I am doing an assignment with swift to sort large data. The data
>> contains one record (string) each line. We need to sort the records base on
>> ascii code. The data is too large to fit in the memory.
>>
>>  The large data file is in head node, and I run the swift script
>> directly on head node.
>>
>>  Here's what I plan to do:
>>
>>  1) split the big file into 64MB files
>> 2) let each worker task sort one 64MB files. Say, each task will call a
>> "sort.py" (written by me). sort.py will output a list of files,
>> say:"sorted-worker1-001; sorted-worker1-002; ......". The first file
>> contains the records started with 'a', the second started with 'b', etc.
>> 3) now we will have all records started with 'a' in
>> (sorted-worker1-001;sorted-worker2-001;...); 'b' in
>>  (sorted-worker1-002;sorted-worker2-002; ......); ...... Then I send all
>> the files contains records 'a' to a "reduce" worker task and let it merge
>> these files into one single file. Same to 'b', 'c', etc.
>> 4) now we get 26 files (a-z) with each sorted inside.
>>
>>  Basically what I am doing is simulate Map-reduce. step 2 is map and
>> step 3 is reduce
>>
>>  Here comes some problems:
>> 1) for step 2, sort.py need to output a list of files. How can swift app
>> function handles list of outputs?
>>
>>     app (file[] outfiles) sort (file[] infiles) {
>>           sort.py // how to put out files here?
>>     }
>>
>>  2) As I know (may be wrong), swift will stage all the output file back
>> to the local disk (here is the head node since I run the swift script
>> directly on headnode). So the output files in step 2 will be staged back to
>> head node first, then stage from head node to the worker nodes to do the
>> step 3, then stage the 26 files in step 4 back to head node. I don't want
>> it because the network will be a huge bottleneck. Is there any way to tell
>> the "reduce" worker to get data directly from "map" worker? Maybe a shared
>> file system will help, but is there any way that user can control the data
>> staging between workers without using the shared file system?
>>
>>  Since I am new to the swift, I may be totally wrong and
>> misunderstanding what swift do. If so, please correct me.
>>
>>
>>
>>
>>  _______________________________________________
>> Swift-user mailing listSwift-user at ci.uchicago.eduhttps://lists.ci.uchicago.edu/cgi-bin/mailman/listinfo/swift-user
>>
>>
>>
>> _______________________________________________
>> Swift-user mailing list
>> Swift-user at ci.uchicago.edu
>> https://lists.ci.uchicago.edu/cgi-bin/mailman/listinfo/swift-user
>>
>
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.mcs.anl.gov/pipermail/swift-user/attachments/20141019/2152d3c4/attachment.html>


More information about the Swift-user mailing list