[Swift-commit] r7572 - in trunk: . bin libexec/log-processing provenance

lgadelha at ci.uchicago.edu lgadelha at ci.uchicago.edu
Wed Feb 5 09:05:20 CST 2014


Author: lgadelha
Date: 2014-02-05 09:05:19 -0600 (Wed, 05 Feb 2014)
New Revision: 7572

Added:
   trunk/provenance/
   trunk/provenance/schema_sqlite.sql
Modified:
   trunk/bin/swiftlog
   trunk/libexec/log-processing/log-to-execute2-transitions
Log:
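Modified swiftlog to import application-execution and file provenance into an SQLite database
(new -import-provenance option; schema in provenance/schema_sqlite.sql; database at ~/.swift_provenance.db).
Also relaxed the THREAD_ASSOCIATION thread-ID pattern in log-to-execute2-transitions so IDs need not start with "R".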


Modified: trunk/bin/swiftlog
===================================================================
--- trunk/bin/swiftlog	2014-02-05 04:30:23 UTC (rev 7571)
+++ trunk/bin/swiftlog	2014-02-05 15:05:19 UTC (rev 7572)
@@ -3,22 +3,43 @@
 import sys
 import os
 import operator
+import sqlite3
+import string
+import datetime
 
-if len(sys.argv) != 2:
-    sys.exit("Usage: %s <run_directory>" % sys.argv[0])
+if len(sys.argv) == 2:
+    log_directory = os.path.normpath(sys.argv[1])
+    log_filename = os.path.join(log_directory, os.path.basename(log_directory) + ".log")
+    log_directory = os.path.normpath(sys.argv[1])
+    generateProvDB=False
+elif len(sys.argv) == 3:
+    if sys.argv[1] != "-import-provenance":
+        sys.exit("Usage: %s -import-provenance <run_directory>" % sys.argv[0])
+    else:
+        log_directory = os.path.normpath(sys.argv[2])
+        log_filename = os.path.join(log_directory, os.path.basename(log_directory) + ".log")
+        log_directory = os.path.normpath(sys.argv[2])
+        generateProvDB=True
+else:
+    sys.exit("Usage:\n%s <run_directory>\nor\n%s -import-provenance <run_directory>" % (sys.argv[0], sys.argv[0]))
 
 # Open log file
-log_directory = os.path.normpath(sys.argv[1])
 log_filename = os.path.join(log_directory, os.path.basename(log_directory) + ".log")
 log_file = open(log_filename, "r")
+swiftHomeDir = os.path.split(os.path.dirname(sys.argv[0]))[0]
 
 # Class definition for a single Task
 class Task:
+    taskId     = ''
     app        = ''
     arguments  = ''
     host       = ''
     stageIn    = ''
     stageOut   = ''
+    startStageIn = ''
+    stopStageIn = ''
+    startStageOut = ''
+    stopStageOut = ''
     startTime  = ''
     stopTime   = ''
     taskNumber = ''
@@ -51,18 +72,71 @@
    timestamp = entry.split()[1]
    return timestamp.split(',')[0]
 
+def getISOTime(entry):
+    entryParts=entry.split()
+    return (entryParts[0] + " " + entryParts[1]).replace(',','.')
+
+def getUnixEpoch(entry):
+    return os.popen("echo \"" + entry + "\"" + " | " + swiftHomeDir + "/libexec/log-processing/iso-to-secs").read()
+
+def computeDuration(startISOTime, endISOTime):
+    return float(getUnixEpoch(endISOTime)) - float(getUnixEpoch(startISOTime))
+
 # Get all text between [ and ]
 def getBracketedText(entry):
-   return entry.partition('[')[-1].rpartition(']')[0]
+    return entry.partition('[')[-1].rpartition(']')[0]
 
+def canonicalFileName(entry):
+    output = string.replace(entry.strip(), '/./', '/')
+    return output
+
+def insertFile(fName, sRunDir, hname, crs, direction, app_exec_id):
+    fileLocation=sRunDir + "/" + fName
+    fileStat = os.stat(fileLocation)
+    fileHost=hname
+    fileSize=fileStat.st_size
+    fileModify=fileStat.st_mtime
+    fileId=fileHost + ":" + fName + ":" + str(fileSize) + ":" + str(fileModify)
+    crs.execute('SELECT COUNT(*) FROM file WHERE file_id=:fileId', {"fileId": fileId})
+    if int(crs.fetchone()[0])==0:
+        crs.execute('INSERT INTO file VALUES (?, ?, ?, ?, ?)',
+                  (fileId, fileHost, fName, fileSize, fileModify))
+    if direction=='IN':
+        crs.execute('INSERT INTO staged_in VALUES (?, ?)', (app_exec_id, fileId))
+    else:
+        crs.execute('INSERT INTO staged_out VALUES (?, ?)', (app_exec_id, fileId))
+
+finalState='FAIL'
+
 # Parse log
 for line in iter(log_file):
 
+    if 'Loader Swift finished with no errors' in line:
+        finalState='SUCCESS'
+        endTime=getISOTime(line)
+
+    if 'GLOBUS_HOSTNAME' in line:
+        hostname=line.split()[5]
+
+    if 'source file' in line:
+        scriptFileName=line.split()[4].strip(':')
+  
+    if 'CWD' in line:
+        scriptRunDir=line.split()[5].rstrip('/.')
+
+    if 'VERSION' in line:
+        versionLine=line.split()
+        swiftVersion=versionLine[7].strip('swift-r')
+        for s in iter(versionLine):
+            if 'cog-r' in s:
+                cogVersion=s.strip('cog-r')
+                
     if 'JOB_START' in line:
         taskid          = getValue(line, "jobid")
         task            = getTask(taskid)
         task.app        = getValue(line, "tr")
-        task.startTime  = getTime(line)
+        task.startTime  = getISOTime(line)
+        task.taskId     = getValue(line, "jobid")
         task.workdir    = getValue(line, "tmpdir")
         task.host       = getValue(line, "host")
         task.arguments  = getBracketedText(line)
@@ -72,12 +146,29 @@
     elif 'JOB_END' in line:
         taskid        = getValue(line, "jobid")
         task          = getTask(taskid)
-        task.stopTime = getTime(line)
+        task.stopTime = getISOTime(line)
 
-    elif "Staging in files" in line:
-        taskid       = getValue(line, "jobid")
-        task         = getTask(taskid)
-        task.stageIn = getBracketedText(line)
+    elif "Staging in" in line:
+        if "START" in line:
+            taskid       = getValue(line, "jobid")
+            task         = getTask(taskid)
+            task.stageIn = getBracketedText(line)
+            task.startStageIn = getISOTime(line)
+        if "END" in line:
+            taskid       = getValue(line, "jobid")
+            task         = getTask(taskid)           
+            task.stopStageIn = getISOTime(line)
+            
+    elif "Staging out" in line:
+        if "START" in line:
+            taskid       = getValue(line, "jobid")
+            task         = getTask(taskid)
+            task.startStageOut = getISOTime(line)
+        if "END" in line:
+            taskid       = getValue(line, "jobid")
+            task         = getTask(taskid)
+            task.stopStageOut = getISOTime(line)
+    
 
     elif "FILE_STAGE_OUT_START" in line:
         taskid         = getValue(line, "jobid")
@@ -85,14 +176,94 @@
         file_out       = getValue(line, "srcname")
         task.stageOut += file_out + " "
 
-# Print tasks
-for t in sorted(tasks.values(), key=operator.attrgetter('taskNumber')):
-    print "Task: %s" % t.taskNumber
-    print "\tApp name: %s" % t.app
-    print "\tCommand line arguments: %s" % t.arguments
-    print "\tHost: %s" % t.host
-    print "\tStart time: %s" % t.startTime
-    print "\tStop time: %s" % t.stopTime
-    print "\tWork directory: %s" % t.workdir
-    print "\tStaged in: %s" % t.stageIn
-    print "\tStaged out: %s\n" % t.stageOut
+log_file.seek(0,0)
+scriptStartTime=getISOTime(log_file.readline())
+log_file.close()
+scriptEndTime=getISOTime(os.popen("tail -1 " + log_filename).read())
+logFileTailCksum=(os.popen("tail -1000 " + log_filename +" | cksum ").read()).split()[0]
+scriptDuration=computeDuration(scriptStartTime, scriptEndTime) 
+runDirName=log_filename.split('/')[0]
+runId=scriptFileName.rstrip('.swift') + "-" + runDirName + "-" + logFileTailCksum
+
+
+if generateProvDB:
+    homeDirectory = os.path.expanduser('~')
+    provDB = homeDirectory + "/.swift_provenance.db"
+    
+    print "Provenance import: checking if database exists" 
+    
+    # Initializes database if it not yet exists
+    if not(os.path.exists(provDB)):
+        print "Provenance import: creating database"
+        schemaFilename=swiftHomeDir + "/provenance/schema_sqlite.sql"
+        schema=open(schemaFilename, "r").read()
+        conn = sqlite3.connect(provDB)
+        c = conn.cursor()
+        c.executescript(schema)
+        conn.commit()
+        conn.close()
+        
+    conn = sqlite3.connect(provDB)
+    c = conn.cursor()
+        
+    # Checks if run was already imported
+    c.execute('SELECT COUNT(*) FROM script_run WHERE script_run_id=:runId', {"runId": runId})
+    if int(c.fetchone()[0])==0:
+        print "Provenance import: importing run " + runId
+        c.execute('INSERT INTO script_run VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', 
+                  (runId, scriptFileName, log_filename, hostname, scriptRunDir, 
+                   swiftVersion, cogVersion, finalState, scriptStartTime, scriptDuration))
+        print "Provenance import: importing application executions"
+        for t in sorted(tasks.values(), key=operator.attrgetter('taskNumber')):
+            appExecId=runId + ":" + t.taskId
+            appExecDuration=computeDuration(t.startTime, t.stopTime)
+            stageInDuration=computeDuration(t.startStageIn, t.stopStageIn)
+            stageOutDuration=computeDuration(t.startStageOut, t.stopStageOut)
+            c.execute('INSERT INTO app_exec VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
+                      (appExecId, runId, t.app, t.host, t.startTime, appExecDuration, 
+                       stageInDuration, stageOutDuration, t.workdir))
+            for f in iter(t.stageIn.split(",")):
+                direction="IN"
+                fileName=string.replace(canonicalFileName(f), 'file://localhost/', '')
+                insertFile(fileName, scriptRunDir, hostname, c, direction, appExecId)
+                
+            for f in iter(t.stageOut.split(",")):
+                direction="OUT"
+                fileName=canonicalFileName(f)
+                insertFile(fileName, scriptRunDir, hostname, c, direction, appExecId)
+
+            for i, a in enumerate(t.arguments.split(",")):
+                position=i+1
+                c.execute('INSERT INTO argument VALUES (?, ?, ?)', (appExecId, position, a.strip()))
+        
+            # Extracts resource usage information from wrapper logs
+            wrapperLogFileName=runDirName + "/" + scriptFileName.rstrip('.swift') + "-" + runDirName + ".d/" + t.workdir.split('/')[3] + "-info"
+            wrapperLogFile = open(wrapperLogFileName, "r")
+            for line in iter(wrapperLogFile):
+                if 'APP_RESOURCES' in line:
+                    for i, pair in enumerate(line.lstrip('APP_RESOURCES=').split(',')):
+                        entry=pair.split(':')
+                        if i == 1:
+                            c.execute("INSERT INTO resource_usage (app_exec_id, %s) VALUES ('%s',%s)" % (entry[0], appExecId, entry[1].rstrip('%')))
+                        else:
+                            c.execute("UPDATE resource_usage SET %s=%s WHERE app_exec_id='%s'" % (entry[0], entry[1].rstrip('%'), appExecId))
+            wrapperLogFile.close()
+        print "Provenance import: finished importing to ~/.swift_provenance.db"
+    else:
+        print "Provenance import: run " + runId + " is already in the database"
+        
+    conn.commit()
+    conn.close()
+else:
+    # Print tasks
+    for t in sorted(tasks.values(), key=operator.attrgetter('taskNumber')):
+        print "Task: %s" % t.taskNumber
+        print "\tApp name: %s" % t.app
+        print "\tCommand line arguments: %s" % t.arguments
+        print "\tHost: %s" % t.host
+        print "\tStart time: %s" % t.startTime
+        print "\tStop time: %s" % t.stopTime
+        print "\tWork directory: %s" % t.workdir
+        print "\tStaged in: %s" % t.stageIn
+        print "\tStaged out: %s\n" % t.stageOut
+

Modified: trunk/libexec/log-processing/log-to-execute2-transitions
===================================================================
--- trunk/libexec/log-processing/log-to-execute2-transitions	2014-02-05 04:30:23 UTC (rev 7571)
+++ trunk/libexec/log-processing/log-to-execute2-transitions	2014-02-05 15:05:19 UTC (rev 7572)
@@ -20,7 +20,7 @@
 
 grep ' swift ' | iso-to-secs | \
 grep -E '^[^ ]+ +[^ ]+ +swift ' | \
-sed 's/^\(.*\) DEBUG swift THREAD_ASSOCIATION jobid=\([^ ]*\) thread=\(R[0-9x\-]*\) host=\([^ ]*\) replicationGroup=\([^ ]*\).*$/\1 \2 ASSOCIATED \3 \4 \5/'  | \
+sed 's/^\(.*\) DEBUG swift THREAD_ASSOCIATION jobid=\([^ ]*\) thread=\([0-9Rx\-]*\) host=\([^ ]*\) replicationGroup=\([^ ]*\).*$/\1 \2 ASSOCIATED \3 \4 \5/'  | \
 \
 sed 's/^\([^ ]*\) DEBUG swift \([^ ]*\) jobid=\([^ ]*\).*/\1 \3 \2/' | grep -v DEBUG | grep -v INFO
 

Added: trunk/provenance/schema_sqlite.sql
===================================================================
--- trunk/provenance/schema_sqlite.sql	                        (rev 0)
+++ trunk/provenance/schema_sqlite.sql	2014-02-05 15:05:19 UTC (rev 7572)
@@ -0,0 +1,74 @@
+create table script_run (
+	script_run_id	   text primary key,
+        script_filename    text,
+	log_filename       text,
+	hostname	   text,
+	script_run_dir	   text,
+        swift_version      text,
+        cog_version        text,
+        final_state        text,
+        start_time         text,
+        duration           real
+);
+
+create table app_exec (
+	app_exec_id			text primary key,
+        script_run_id   		references script_run(script_run_id),
+	app__name			text,
+	execution_site			text,
+	start_time			text,
+	duration			real,
+	staging_in_duration		real,
+	staging_out_duration		real,
+	work_directory			text
+);
+
+create table argument (
+	app_exec_id			text references app_exec (app_exec_id),
+	arg_position			integer,
+	app_exec_arg			text
+);	
+
+create table resource_usage (
+       app_exec_id	    		text primary key references app_exec (app_exec_id),
+       real_secs	       		real,
+       kernel_secs             		real,
+       user_secs	       		real,
+       percent_cpu             		integer,
+       max_rss	       	       		integer,
+       avg_rss	       			integer,
+       avg_tot_vm	       		integer,
+       avg_priv_data     		integer,
+       avg_priv_stack    		integer,
+       avg_shared_text   		integer,
+       page_size	       		integer,
+       major_pgfaults    		integer,
+       minor_pgfaults    		integer,
+       swaps	       			integer,
+       invol_context_switches		integer,
+       vol_waits			integer,
+       fs_reads				integer,
+       fs_writes			integer,
+       sock_recv			integer,
+       sock_send			integer,
+       signals				integer,
+       exit_status			integer
+);
+
+create table file (
+       file_id		text primary key,
+       host		text, 
+       name		text, 
+       size		integer,
+       modify		integer
+);	
+
+create table staged_in (
+       app_exec_id			text references app_exec (app_exec_id),
+       file_id 				text references file (file_id)
+);
+
+create table staged_out (
+       app_exec_id			text references app_exec (app_exec_id),
+       file_id				text references file (file_id)
+);
\ No newline at end of file




More information about the Swift-commit mailing list