[Swift-commit] r7572 - in trunk: . bin libexec/log-processing provenance
lgadelha at ci.uchicago.edu
lgadelha at ci.uchicago.edu
Wed Feb 5 09:05:20 CST 2014
Author: lgadelha
Date: 2014-02-05 09:05:19 -0600 (Wed, 05 Feb 2014)
New Revision: 7572
Added:
trunk/provenance/
trunk/provenance/schema_sqlite.sql
Modified:
trunk/bin/swiftlog
trunk/libexec/log-processing/log-to-execute2-transitions
Log:
swiftlog modified to import app/file provenance into a sqlite database
Modified: trunk/bin/swiftlog
===================================================================
--- trunk/bin/swiftlog 2014-02-05 04:30:23 UTC (rev 7571)
+++ trunk/bin/swiftlog 2014-02-05 15:05:19 UTC (rev 7572)
@@ -3,22 +3,43 @@
import sys
import os
import operator
+import sqlite3
+import string
+import datetime
-if len(sys.argv) != 2:
- sys.exit("Usage: %s <run_directory>" % sys.argv[0])
+if len(sys.argv) == 2:
+ log_directory = os.path.normpath(sys.argv[1])
+ log_filename = os.path.join(log_directory, os.path.basename(log_directory) + ".log")
+ log_directory = os.path.normpath(sys.argv[1])
+ generateProvDB=False
+elif len(sys.argv) == 3:
+ if sys.argv[1] != "-import-provenance":
+ sys.exit("Usage: %s -import-provenance <run_directory>" % sys.argv[0])
+ else:
+ log_directory = os.path.normpath(sys.argv[2])
+ log_filename = os.path.join(log_directory, os.path.basename(log_directory) + ".log")
+ log_directory = os.path.normpath(sys.argv[2])
+ generateProvDB=True
+else:
+ sys.exit("Usage:\n%s <run_directory>\nor\n%s -import-provenance <run_directory>" % (sys.argv[0], sys.argv[0]))
# Open log file
-log_directory = os.path.normpath(sys.argv[1])
log_filename = os.path.join(log_directory, os.path.basename(log_directory) + ".log")
log_file = open(log_filename, "r")
+swiftHomeDir = os.path.split(os.path.dirname(sys.argv[0]))[0]
# Class definition for a single Task
class Task:
+ taskId = ''
app = ''
arguments = ''
host = ''
stageIn = ''
stageOut = ''
+ startStageIn = ''
+ stopStageIn = ''
+ startStageOut = ''
+ stopStageOut = ''
startTime = ''
stopTime = ''
taskNumber = ''
@@ -51,18 +72,71 @@
timestamp = entry.split()[1]
return timestamp.split(',')[0]
+def getISOTime(entry):
+ entryParts=entry.split()
+ return (entryParts[0] + " " + entryParts[1]).replace(',','.')
+
+def getUnixEpoch(entry):
+ return os.popen("echo \"" + entry + "\"" + " | " + swiftHomeDir + "/libexec/log-processing/iso-to-secs").read()
+
+def computeDuration(startISOTime, endISOTime):
+ return float(getUnixEpoch(endISOTime)) - float(getUnixEpoch(startISOTime))
+
# Get all text between [ and ]
def getBracketedText(entry):
- return entry.partition('[')[-1].rpartition(']')[0]
+ return entry.partition('[')[-1].rpartition(']')[0]
+def canonicalFileName(entry):
+ output = string.replace(entry.strip(), '/./', '/')
+ return output
+
+def insertFile(fName, sRunDir, hname, crs, direction, app_exec_id):
+ fileLocation=sRunDir + "/" + fName
+ fileStat = os.stat(fileLocation)
+ fileHost=hname
+ fileSize=fileStat.st_size
+ fileModify=fileStat.st_mtime
+ fileId=fileHost + ":" + fName + ":" + str(fileSize) + ":" + str(fileModify)
+ crs.execute('SELECT COUNT(*) FROM file WHERE file_id=:fileId', {"fileId": fileId})
+ if int(crs.fetchone()[0])==0:
+ crs.execute('INSERT INTO file VALUES (?, ?, ?, ?, ?)',
+ (fileId, fileHost, fName, fileSize, fileModify))
+ if direction=='IN':
+ crs.execute('INSERT INTO staged_in VALUES (?, ?)', (app_exec_id, fileId))
+ else:
+ crs.execute('INSERT INTO staged_out VALUES (?, ?)', (app_exec_id, fileId))
+
+finalState='FAIL'
+
# Parse log
for line in iter(log_file):
+ if 'Loader Swift finished with no errors' in line:
+ finalState='SUCCESS'
+ endTime=getISOTime(line)
+
+ if 'GLOBUS_HOSTNAME' in line:
+ hostname=line.split()[5]
+
+ if 'source file' in line:
+ scriptFileName=line.split()[4].strip(':')
+
+ if 'CWD' in line:
+ scriptRunDir=line.split()[5].rstrip('/.')
+
+ if 'VERSION' in line:
+ versionLine=line.split()
+ swiftVersion=versionLine[7].strip('swift-r')
+ for s in iter(versionLine):
+ if 'cog-r' in s:
+ cogVersion=s.strip('cog-r')
+
if 'JOB_START' in line:
taskid = getValue(line, "jobid")
task = getTask(taskid)
task.app = getValue(line, "tr")
- task.startTime = getTime(line)
+ task.startTime = getISOTime(line)
+ task.taskId = getValue(line, "jobid")
task.workdir = getValue(line, "tmpdir")
task.host = getValue(line, "host")
task.arguments = getBracketedText(line)
@@ -72,12 +146,29 @@
elif 'JOB_END' in line:
taskid = getValue(line, "jobid")
task = getTask(taskid)
- task.stopTime = getTime(line)
+ task.stopTime = getISOTime(line)
- elif "Staging in files" in line:
- taskid = getValue(line, "jobid")
- task = getTask(taskid)
- task.stageIn = getBracketedText(line)
+ elif "Staging in" in line:
+ if "START" in line:
+ taskid = getValue(line, "jobid")
+ task = getTask(taskid)
+ task.stageIn = getBracketedText(line)
+ task.startStageIn = getISOTime(line)
+ if "END" in line:
+ taskid = getValue(line, "jobid")
+ task = getTask(taskid)
+ task.stopStageIn = getISOTime(line)
+
+ elif "Staging out" in line:
+ if "START" in line:
+ taskid = getValue(line, "jobid")
+ task = getTask(taskid)
+ task.startStageOut = getISOTime(line)
+ if "END" in line:
+ taskid = getValue(line, "jobid")
+ task = getTask(taskid)
+ task.stopStageOut = getISOTime(line)
+
elif "FILE_STAGE_OUT_START" in line:
taskid = getValue(line, "jobid")
@@ -85,14 +176,94 @@
file_out = getValue(line, "srcname")
task.stageOut += file_out + " "
-# Print tasks
-for t in sorted(tasks.values(), key=operator.attrgetter('taskNumber')):
- print "Task: %s" % t.taskNumber
- print "\tApp name: %s" % t.app
- print "\tCommand line arguments: %s" % t.arguments
- print "\tHost: %s" % t.host
- print "\tStart time: %s" % t.startTime
- print "\tStop time: %s" % t.stopTime
- print "\tWork directory: %s" % t.workdir
- print "\tStaged in: %s" % t.stageIn
- print "\tStaged out: %s\n" % t.stageOut
+log_file.seek(0,0)
+scriptStartTime=getISOTime(log_file.readline())
+log_file.close()
+scriptEndTime=getISOTime(os.popen("tail -1 " + log_filename).read())
+logFileTailCksum=(os.popen("tail -1000 " + log_filename +" | cksum ").read()).split()[0]
+scriptDuration=computeDuration(scriptStartTime, scriptEndTime)
+runDirName=log_filename.split('/')[0]
+runId=scriptFileName.rstrip('.swift') + "-" + runDirName + "-" + logFileTailCksum
+
+
+if generateProvDB:
+ homeDirectory = os.path.expanduser('~')
+ provDB = homeDirectory + "/.swift_provenance.db"
+
+ print "Provenance import: checking if database exists"
+
+ # Initializes the database if it does not exist yet
+ if not(os.path.exists(provDB)):
+ print "Provenance import: creating database"
+ schemaFilename=swiftHomeDir + "/provenance/schema_sqlite.sql"
+ schema=open(schemaFilename, "r").read()
+ conn = sqlite3.connect(provDB)
+ c = conn.cursor()
+ c.executescript(schema)
+ conn.commit()
+ conn.close()
+
+ conn = sqlite3.connect(provDB)
+ c = conn.cursor()
+
+ # Checks if run was already imported
+ c.execute('SELECT COUNT(*) FROM script_run WHERE script_run_id=:runId', {"runId": runId})
+ if int(c.fetchone()[0])==0:
+ print "Provenance import: importing run " + runId
+ c.execute('INSERT INTO script_run VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
+ (runId, scriptFileName, log_filename, hostname, scriptRunDir,
+ swiftVersion, cogVersion, finalState, scriptStartTime, scriptDuration))
+ print "Provenance import: importing application executions"
+ for t in sorted(tasks.values(), key=operator.attrgetter('taskNumber')):
+ appExecId=runId + ":" + t.taskId
+ appExecDuration=computeDuration(t.startTime, t.stopTime)
+ stageInDuration=computeDuration(t.startStageIn, t.stopStageIn)
+ stageOutDuration=computeDuration(t.startStageOut, t.stopStageOut)
+ c.execute('INSERT INTO app_exec VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
+ (appExecId, runId, t.app, t.host, t.startTime, appExecDuration,
+ stageInDuration, stageOutDuration, t.workdir))
+ for f in iter(t.stageIn.split(",")):
+ direction="IN"
+ fileName=string.replace(canonicalFileName(f), 'file://localhost/', '')
+ insertFile(fileName, scriptRunDir, hostname, c, direction, appExecId)
+
+ for f in iter(t.stageOut.split(",")):
+ direction="OUT"
+ fileName=canonicalFileName(f)
+ insertFile(fileName, scriptRunDir, hostname, c, direction, appExecId)
+
+ for i, a in enumerate(t.arguments.split(",")):
+ position=i+1
+ c.execute('INSERT INTO argument VALUES (?, ?, ?)', (appExecId, position, a.strip()))
+
+ # Extracts resource usage information from wrapper logs
+ wrapperLogFileName=runDirName + "/" + scriptFileName.rstrip('.swift') + "-" + runDirName + ".d/" + t.workdir.split('/')[3] + "-info"
+ wrapperLogFile = open(wrapperLogFileName, "r")
+ for line in iter(wrapperLogFile):
+ if 'APP_RESOURCES' in line:
+ for i, pair in enumerate(line.lstrip('APP_RESOURCES=').split(',')):
+ entry=pair.split(':')
+ if i == 1:
+ c.execute("INSERT INTO resource_usage (app_exec_id, %s) VALUES ('%s',%s)" % (entry[0], appExecId, entry[1].rstrip('%')))
+ else:
+ c.execute("UPDATE resource_usage SET %s=%s WHERE app_exec_id='%s'" % (entry[0], entry[1].rstrip('%'), appExecId))
+ wrapperLogFile.close()
+ print "Provenance import: finished importing to ~/.swift_provenance.db"
+ else:
+ print "Provenance import: run " + runId + " is already in the database"
+
+ conn.commit()
+ conn.close()
+else:
+ # Print tasks
+ for t in sorted(tasks.values(), key=operator.attrgetter('taskNumber')):
+ print "Task: %s" % t.taskNumber
+ print "\tApp name: %s" % t.app
+ print "\tCommand line arguments: %s" % t.arguments
+ print "\tHost: %s" % t.host
+ print "\tStart time: %s" % t.startTime
+ print "\tStop time: %s" % t.stopTime
+ print "\tWork directory: %s" % t.workdir
+ print "\tStaged in: %s" % t.stageIn
+ print "\tStaged out: %s\n" % t.stageOut
+
Modified: trunk/libexec/log-processing/log-to-execute2-transitions
===================================================================
--- trunk/libexec/log-processing/log-to-execute2-transitions 2014-02-05 04:30:23 UTC (rev 7571)
+++ trunk/libexec/log-processing/log-to-execute2-transitions 2014-02-05 15:05:19 UTC (rev 7572)
@@ -20,7 +20,7 @@
grep ' swift ' | iso-to-secs | \
grep -E '^[^ ]+ +[^ ]+ +swift ' | \
-sed 's/^\(.*\) DEBUG swift THREAD_ASSOCIATION jobid=\([^ ]*\) thread=\(R[0-9x\-]*\) host=\([^ ]*\) replicationGroup=\([^ ]*\).*$/\1 \2 ASSOCIATED \3 \4 \5/' | \
+sed 's/^\(.*\) DEBUG swift THREAD_ASSOCIATION jobid=\([^ ]*\) thread=\([0-9Rx\-]*\) host=\([^ ]*\) replicationGroup=\([^ ]*\).*$/\1 \2 ASSOCIATED \3 \4 \5/' | \
\
sed 's/^\([^ ]*\) DEBUG swift \([^ ]*\) jobid=\([^ ]*\).*/\1 \3 \2/' | grep -v DEBUG | grep -v INFO
Added: trunk/provenance/schema_sqlite.sql
===================================================================
--- trunk/provenance/schema_sqlite.sql (rev 0)
+++ trunk/provenance/schema_sqlite.sql 2014-02-05 15:05:19 UTC (rev 7572)
@@ -0,0 +1,74 @@
+create table script_run (
+ script_run_id text primary key,
+ script_filename text,
+ log_filename text,
+ hostname text,
+ script_run_dir text,
+ swift_version text,
+ cog_version text,
+ final_state text,
+ start_time text,
+ duration real
+);
+
+create table app_exec (
+ app_exec_id text primary key,
+ script_run_id references script_run(script_run_id),
+ app__name text,
+ execution_site text,
+ start_time text,
+ duration real,
+ staging_in_duration real,
+ staging_out_duration real,
+ work_directory text
+);
+
+create table argument (
+ app_exec_id text references app_exec (app_exec_id),
+ arg_position integer,
+ app_exec_arg text
+);
+
+create table resource_usage (
+ app_exec_id text primary key references app_exec (app_exec_id),
+ real_secs real,
+ kernel_secs real,
+ user_secs real,
+ percent_cpu integer,
+ max_rss integer,
+ avg_rss integer,
+ avg_tot_vm integer,
+ avg_priv_data integer,
+ avg_priv_stack integer,
+ avg_shared_text integer,
+ page_size integer,
+ major_pgfaults integer,
+ minor_pgfaults integer,
+ swaps integer,
+ invol_context_switches integer,
+ vol_waits integer,
+ fs_reads integer,
+ fs_writes integer,
+ sock_recv integer,
+ sock_send integer,
+ signals integer,
+ exit_status integer
+);
+
+create table file (
+ file_id text primary key,
+ host text,
+ name text,
+ size integer,
+ modify integer
+);
+
+create table staged_in (
+ app_exec_id text references app_exec (app_exec_id),
+ file_id text references file (file_id)
+);
+
+create table staged_out (
+ app_exec_id text references app_exec (app_exec_id),
+ file_id text references file (file_id)
+);
\ No newline at end of file
More information about the Swift-commit mailing list