[Swift-commit] r7638 - trunk/bin
lgadelha at ci.uchicago.edu
lgadelha at ci.uchicago.edu
Fri Mar 7 05:19:00 CST 2014
Author: lgadelha
Date: 2014-03-07 05:18:59 -0600 (Fri, 07 Mar 2014)
New Revision: 7638
Modified:
trunk/bin/swiftlog
Log:
Included PostgreSQL support
Modified: trunk/bin/swiftlog
===================================================================
--- trunk/bin/swiftlog 2014-03-06 22:18:29 UTC (rev 7637)
+++ trunk/bin/swiftlog 2014-03-07 11:18:59 UTC (rev 7638)
@@ -11,7 +11,7 @@
log_directory = os.path.normpath(sys.argv[1])
log_filename = os.path.join(log_directory, os.path.basename(log_directory) + ".log")
log_directory = os.path.normpath(sys.argv[1])
- generateProvDB=False
+ importProv=False
elif len(sys.argv) == 3:
if sys.argv[1] != "-import-provenance":
sys.exit("Usage: %s -import-provenance <run_directory>" % sys.argv[0])
@@ -19,7 +19,7 @@
log_directory = os.path.normpath(sys.argv[2])
log_filename = os.path.join(log_directory, os.path.basename(log_directory) + ".log")
log_directory = os.path.normpath(sys.argv[2])
- generateProvDB=True
+ importProv=True
else:
sys.exit("Usage:\n%s <run_directory>\nor\n%s -import-provenance <run_directory>" % (sys.argv[0], sys.argv[0]))
@@ -46,6 +46,10 @@
thread = ''
workdir = ''
+class Config:
+ key = ''
+ value = ''
+
# Dictionary containing all tasks
tasks = {}
taskCounter = 1
@@ -90,21 +94,43 @@
output = string.replace(entry.strip(), '/./', '/')
return output
-def insertFile(fName, sRunDir, hname, crs, direction, app_exec_id):
+def insertFile(fName, sRunDir, hname, crs, direction, app_exec_id, useSQLite, usePgSQL):
fileLocation=sRunDir + "/" + fName
fileStat = os.stat(fileLocation)
fileHost=hname
fileSize=fileStat.st_size
fileModify=fileStat.st_mtime
fileId=fileHost + ":" + fName + ":" + str(fileSize) + ":" + str(fileModify)
- crs.execute('SELECT COUNT(*) FROM file WHERE file_id=:fileId', {"fileId": fileId})
- if int(crs.fetchone()[0])==0:
- crs.execute('INSERT INTO file VALUES (?, ?, ?, ?, ?)',
- (fileId, fileHost, fName, fileSize, fileModify))
- if direction=='IN':
- crs.execute('INSERT INTO staged_in VALUES (?, ?)', (app_exec_id, fileId))
- else:
- crs.execute('INSERT INTO staged_out VALUES (?, ?)', (app_exec_id, fileId))
+ if useSQLite:
+ crs.execute('SELECT COUNT(*) FROM file WHERE file_id=:fileId', {"fileId": fileId})
+ if int(crs.fetchone()[0])==0:
+ crs.execute('INSERT INTO file VALUES (?, ?, ?, ?, ?)',
+ (fileId, fileHost, fName, fileSize, fileModify))
+ if direction=='IN':
+ crs.execute('INSERT INTO staged_in VALUES (?, ?)', (app_exec_id, fileId))
+ else:
+ crs.execute('INSERT INTO staged_out VALUES (?, ?)', (app_exec_id, fileId))
+ if usePgSQL:
+ pgSQLOutput=os.popen(pgsqlConnString + ' --tuples-only -c "SELECT COUNT(*) FROM file WHERE file_id=\'' + fileId + '\'"').readline().strip()
+ if pgSQLOutput == '0':
+ os.popen(pgsqlConnString + ' -c "INSERT INTO file VALUES (\'' + fileId + '\', ' +
+ '\'' + fileHost + '\', ' +
+ '\'' + fName + '\', ' +
+ str(fileSize) + ', ' +
+ str(fileModify) + ');"')
+ if direction=='IN':
+ os.popen(pgsqlConnString + ' -c "INSERT INTO staged_in VALUES (\'' + app_exec_id + '\', ' +
+ '\'' + fileId + '\');"')
+ else:
+ os.popen(pgsqlConnString + ' -c "INSERT INTO staged_out VALUES (\'' + app_exec_id + '\', ' +
+ '\'' + fileId + '\');"')
+
+def getConfigProperty(line):
+ configEntry=Config()
+ configTokens=line.split("=")
+ configEntry.key=configTokens[0].strip()
+ configEntry.value=configTokens[1].strip()
+ return configEntry
finalState='FAIL'
@@ -186,57 +212,137 @@
runId=scriptFileName.rstrip('.swift') + "-" + runDirName + "-" + logFileTailCksum
-if generateProvDB:
+if importProv:
+
+ #read configuration file
+ enableSQLite=False
+ enablePostgreSQL=False
+ enableGlobusOnlineCatalog=False
+ provConfigFilename = swiftHomeDir + "/etc/provenance/provenance.config"
+ provConfigFile = open(provConfigFilename, "r")
+
+ for line in iter(provConfigFile):
+ if 'database=' in line:
+ config=getConfigProperty(line)
+ if config.key=="database":
+ if config.value=="sqlite":
+ enableSQLite=True
+ if config.value=="postgresql":
+ enablePostgreSQL=True
+ if config.value=="gocatalog":
+ enableGlobusOnlineCatalog=True
+ if 'pg_cmd' in line:
+ config=getConfigProperty(line)
+ if config.key=="pg_cmd":
+ pgsqlConnString=config.value.strip("\"")
+ if 'go_catalog_id' in line:
+ config=getConfigProperty(line)
+ if config.key=="go_catalog_id":
+ globusOnlineId=config.value
+ if 'go_catalog_client' in line:
+ config=getConfigProperty(line)
+ if config.key=="go_catalog_client":
+ globusOnlineClient=config.value.strip("\"")
+
homeDirectory = os.path.expanduser('~')
- provDB = homeDirectory + "/.swift_provenance.db"
+ sqliteDBFile = homeDirectory + "/.swift_provenance.db"
print "Provenance import: checking if database exists"
# Initializes database if it does not yet exist
- if not(os.path.exists(provDB)):
- print "Provenance import: creating database"
- schemaFilename=swiftHomeDir + "/etc/provenance/schema_sqlite.sql"
- schema=open(schemaFilename, "r").read()
- conn = sqlite3.connect(provDB)
+ # Assumes PostgreSQL and Globus Online Catalog already initialized
+ if enableSQLite==True:
+ if not(os.path.exists(sqliteDBFile)):
+ print "Provenance import: creating database"
+ schemaFilename=swiftHomeDir + "/etc/provenance/schema_sqlite.sql"
+ schema=open(schemaFilename, "r").read()
+ conn = sqlite3.connect(sqliteDBFile)
+ c = conn.cursor()
+ c.executescript(schema)
+ conn.commit()
+ conn.close()
+
+ runIsNewSQLite=False
+ runIsNewPgSQL=False
+
+ # Check if run was already imported into SQLite
+ if enableSQLite:
+ conn = sqlite3.connect(sqliteDBFile)
c = conn.cursor()
- c.executescript(schema)
- conn.commit()
- conn.close()
-
- conn = sqlite3.connect(provDB)
- c = conn.cursor()
-
- # Checks if run was already imported
- c.execute('SELECT COUNT(*) FROM script_run WHERE script_run_id=:runId', {"runId": runId})
- if int(c.fetchone()[0])==0:
- print "Provenance import: importing run " + runId
+ c.execute('SELECT COUNT(*) FROM script_run WHERE script_run_id=:runId', {"runId": runId})
+ if int(c.fetchone()[0])==0:
+ runIsNewSQLite=True
+
+ # Check if run was already imported into PostgreSQL
+ if enablePostgreSQL:
+ pgSQLOutput=os.popen(pgsqlConnString + ' --tuples-only -c "SELECT COUNT(*) FROM script_run WHERE script_run_id=\'' + runId + '\'"').readline().strip()
+ if pgSQLOutput == '0':
+ runIsNewPgSQL=True
+
+ if runIsNewSQLite:
+ print "Provenance import: importing run into SQLite " + runId
c.execute('INSERT INTO script_run VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
(runId, scriptFileName, log_filename, hostname, scriptRunDir,
- swiftVersion, cogVersion, finalState, scriptStartTime, scriptDuration))
+ swiftVersion, cogVersion, finalState, scriptStartTime, scriptDuration))
+
+ if runIsNewPgSQL:
+ print "Provenance import: importing run into PostgreSQL " + runId
+ os.popen(pgsqlConnString + ' -c "INSERT INTO script_run VALUES (\'' + runId + '\', ' +
+ '\'' + scriptFileName + '\', ' +
+ '\'' + log_filename + '\', ' +
+ '\'' + hostname + '\', ' +
+ '\'' + scriptRunDir + '\', ' +
+ '\'' + swiftVersion + '\', ' +
+ '\'' + cogVersion + '\', ' +
+ '\'' + finalState + '\', ' +
+ '\'' + scriptStartTime + '\', ' +
+ '\'' + str(scriptDuration) + '\');"')
+
+
+
+ if runIsNewSQLite or runIsNewPgSQL:
+
print "Provenance import: importing application executions"
for t in sorted(tasks.values(), key=operator.attrgetter('taskNumber')):
appExecId=runId + ":" + t.taskId
appExecDuration=computeDuration(t.startTime, t.stopTime)
stageInDuration=computeDuration(t.startStageIn, t.stopStageIn)
stageOutDuration=computeDuration(t.startStageOut, t.stopStageOut)
- c.execute('INSERT INTO app_exec VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
- (appExecId, runId, t.app, t.host, t.startTime, appExecDuration,
- stageInDuration, stageOutDuration, t.workdir))
+ if runIsNewSQLite:
+ c.execute('INSERT INTO app_exec VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
+ (appExecId, runId, t.app, t.host, t.startTime, appExecDuration,
+ stageInDuration, stageOutDuration, t.workdir))
+ if runIsNewPgSQL:
+ os.popen(pgsqlConnString + ' -c "INSERT INTO app_exec VALUES (\'' + appExecId + '\', ' +
+ '\'' + runId + '\', ' +
+ '\'' + t.app + '\', ' +
+ '\'' + t.host + '\', ' +
+ '\'' + t.startTime + '\', ' +
+ '\'' + str(appExecDuration) + '\', ' +
+ '\'' + str(stageInDuration) + '\', ' +
+ '\'' + str(stageOutDuration) + '\', ' +
+ '\'' + t.workdir + '\');"')
+
for f in iter(t.stageIn.split(",")):
direction="IN"
fileName=string.replace(canonicalFileName(f), 'file://localhost/', '')
- insertFile(fileName, scriptRunDir, hostname, c, direction, appExecId)
+ insertFile(fileName, scriptRunDir, hostname, c, direction, appExecId, runIsNewSQLite, runIsNewPgSQL)
for f in iter(t.stageOut.split(" ")):
direction="OUT"
fileName=canonicalFileName(f)
- insertFile(fileName, scriptRunDir, hostname, c, direction, appExecId)
+ insertFile(fileName, scriptRunDir, hostname, c, direction, appExecId, runIsNewSQLite, runIsNewPgSQL)
for i, a in enumerate(t.arguments.split(",")):
position=i+1
- c.execute('INSERT INTO argument VALUES (?, ?, ?)', (appExecId, position, a.strip()))
-
+ if runIsNewSQLite:
+ c.execute('INSERT INTO argument VALUES (?, ?, ?)', (appExecId, position, a.strip()))
+ if runIsNewPgSQL:
+ os.popen(pgsqlConnString + ' -c "INSERT INTO argument VALUES (\'' + appExecId + '\', ' +
+ '\'' + str(position) + '\', ' +
+ '\'' + a.strip() + '\');"')
# Extracts resource usage information from wrapper logs
+ noWrapperLog=False
wrapperLogFileName=runDirName + "/" + scriptFileName.rstrip('.swift') + "-" + runDirName + ".d/" + t.workdir.split('/')[3] + "-info"
if not(os.path.exists(wrapperLogFileName)):
noWrapperLog=True
@@ -247,18 +353,32 @@
for i, pair in enumerate(line.lstrip('APP_RESOURCES=').split(',')):
entry=pair.split(':')
if i == 0:
- c.execute("INSERT INTO resource_usage (app_exec_id, %s) VALUES ('%s',%s)" % (entry[0], appExecId, entry[1].rstrip('%')))
+ if runIsNewSQLite:
+ c.execute("INSERT INTO resource_usage (app_exec_id, %s) VALUES ('%s',%s)" % (entry[0], appExecId, entry[1].rstrip('%')))
+ if runIsNewPgSQL:
+ os.popen(pgsqlConnString + ' -c "INSERT INTO resource_usage (app_exec_id, ' +
+ entry[0] + ') VALUES (\'' + appExecId + '\', ' +
+ '\'' + entry[1].rstrip('%') + '\');"')
else:
- c.execute("UPDATE resource_usage SET %s=%s WHERE app_exec_id='%s'" % (entry[0], entry[1].rstrip('%'), appExecId))
+ if runIsNewSQLite:
+ c.execute("UPDATE resource_usage SET %s=%s WHERE app_exec_id='%s'" % (entry[0], entry[1].rstrip('%'), appExecId))
+ if runIsNewPgSQL:
+ os.popen(pgsqlConnString + ' -c "UPDATE resource_usage SET ' + entry[0] + '=' +
+ entry[1].rstrip('%') + ' WHERE app_exec_id=\'' + appExecId + '\'"')
wrapperLogFile.close()
if noWrapperLog:
- print "Provenance import: wrapper log not found, enable wrapperlog.always.transfer in swift.properties to gather resource consumption information"
- print "Provenance import: finished importing to ~/.swift_provenance.db"
+ print "Provenance import: wrapper log not found, set wrapperlog.always.transfer=true in swift.properties to gather resource consumption information"
+ if runIsNewSQLite:
+ print "Provenance import: finished importing to ~/.swift_provenance.db"
+ if runIsNewPgSQL:
+ print "Provenance import: finished importing to PostgreSQL"
+
else:
print "Provenance import: run " + runId + " is already in the database"
- conn.commit()
- conn.close()
+ if runIsNewSQLite:
+ conn.commit()
+ conn.close()
else:
# Print tasks
for t in sorted(tasks.values(), key=operator.attrgetter('taskNumber')):
More information about the Swift-commit mailing list