[Swift-commit] r7638 - trunk/bin

lgadelha at ci.uchicago.edu lgadelha at ci.uchicago.edu
Fri Mar 7 05:19:00 CST 2014


Author: lgadelha
Date: 2014-03-07 05:18:59 -0600 (Fri, 07 Mar 2014)
New Revision: 7638

Modified:
   trunk/bin/swiftlog
Log:
Included PostgreSQL support


Modified: trunk/bin/swiftlog
===================================================================
--- trunk/bin/swiftlog	2014-03-06 22:18:29 UTC (rev 7637)
+++ trunk/bin/swiftlog	2014-03-07 11:18:59 UTC (rev 7638)
@@ -11,7 +11,7 @@
     log_directory = os.path.normpath(sys.argv[1])
     log_filename = os.path.join(log_directory, os.path.basename(log_directory) + ".log")
     log_directory = os.path.normpath(sys.argv[1])
-    generateProvDB=False
+    importProv=False
 elif len(sys.argv) == 3:
     if sys.argv[1] != "-import-provenance":
         sys.exit("Usage: %s -import-provenance <run_directory>" % sys.argv[0])
@@ -19,7 +19,7 @@
         log_directory = os.path.normpath(sys.argv[2])
         log_filename = os.path.join(log_directory, os.path.basename(log_directory) + ".log")
         log_directory = os.path.normpath(sys.argv[2])
-        generateProvDB=True
+        importProv=True
 else:
     sys.exit("Usage:\n%s <run_directory>\nor\n%s -import-provenance <run_directory>" % (sys.argv[0], sys.argv[0]))
 
@@ -46,6 +46,10 @@
     thread     = ''
     workdir    = ''
 
+class Config:
+    key = ''
+    value = ''
+
 # Dictionary containing all tasks
 tasks = {}
 taskCounter = 1
@@ -90,21 +94,43 @@
     output = string.replace(entry.strip(), '/./', '/')
     return output
 
-def insertFile(fName, sRunDir, hname, crs, direction, app_exec_id):
+def insertFile(fName, sRunDir, hname, crs, direction, app_exec_id, useSQLite, usePgSQL):
     fileLocation=sRunDir + "/" + fName
     fileStat = os.stat(fileLocation)
     fileHost=hname
     fileSize=fileStat.st_size
     fileModify=fileStat.st_mtime
     fileId=fileHost + ":" + fName + ":" + str(fileSize) + ":" + str(fileModify)
-    crs.execute('SELECT COUNT(*) FROM file WHERE file_id=:fileId', {"fileId": fileId})
-    if int(crs.fetchone()[0])==0:
-        crs.execute('INSERT INTO file VALUES (?, ?, ?, ?, ?)',
-                  (fileId, fileHost, fName, fileSize, fileModify))
-    if direction=='IN':
-        crs.execute('INSERT INTO staged_in VALUES (?, ?)', (app_exec_id, fileId))
-    else:
-        crs.execute('INSERT INTO staged_out VALUES (?, ?)', (app_exec_id, fileId))
+    if useSQLite:
+        crs.execute('SELECT COUNT(*) FROM file WHERE file_id=:fileId', {"fileId": fileId})
+        if int(crs.fetchone()[0])==0:
+            crs.execute('INSERT INTO file VALUES (?, ?, ?, ?, ?)',
+                      (fileId, fileHost, fName, fileSize, fileModify))
+        if direction=='IN':
+            crs.execute('INSERT INTO staged_in VALUES (?, ?)', (app_exec_id, fileId))
+        else:
+            crs.execute('INSERT INTO staged_out VALUES (?, ?)', (app_exec_id, fileId))
+    if usePgSQL:
+        pgSQLOutput=os.popen(pgsqlConnString + ' --tuples-only -c "SELECT COUNT(*) FROM file WHERE file_id=\'' + fileId + '\'"').readline().strip()
+        if pgSQLOutput == '0':
+            os.popen(pgsqlConnString + ' -c "INSERT INTO file VALUES (\'' + fileId + '\', ' +
+                                                                         '\'' + fileHost + '\', ' +
+                                                                         '\'' + fName + '\', ' +
+                                                                         str(fileSize) + ', ' +
+                                                                         str(fileModify) + ');"')
+        if direction=='IN':
+            os.popen(pgsqlConnString + ' -c "INSERT INTO staged_in VALUES (\'' + app_exec_id + '\', ' +
+                                                                         '\'' + fileId + '\');"')
+        else:
+            os.popen(pgsqlConnString + ' -c "INSERT INTO staged_out VALUES (\'' + app_exec_id + '\', ' +
+                                                                         '\'' + fileId + '\');"')
+        
+def getConfigProperty(line):
+    configEntry=Config()
+    configTokens=line.split("=")
+    configEntry.key=configTokens[0].strip()
+    configEntry.value=configTokens[1].strip()
+    return configEntry
 
 finalState='FAIL'
 
@@ -186,57 +212,137 @@
 runId=scriptFileName.rstrip('.swift') + "-" + runDirName + "-" + logFileTailCksum
 
 
-if generateProvDB:
+if importProv:
+    
+    #read configuration file
+    enableSQLite=False
+    enablePostgreSQL=False
+    enableGlobusOnlineCatalog=False
+    provConfigFilename = swiftHomeDir + "/etc/provenance/provenance.config"
+    provConfigFile = open(provConfigFilename, "r")
+    
+    for line in iter(provConfigFile):
+        if 'database=' in line:
+            config=getConfigProperty(line)
+            if config.key=="database":
+                if config.value=="sqlite":
+                    enableSQLite=True
+                if config.value=="postgresql":
+                    enablePostgreSQL=True
+                if config.value=="gocatalog":
+                    enableGlobusOnlineCatalog=True
+        if 'pg_cmd' in line:
+            config=getConfigProperty(line)
+            if config.key=="pg_cmd":
+                pgsqlConnString=config.value.strip("\"")
+        if 'go_catalog_id' in line:
+            config=getConfigProperty(line)
+            if config.key=="go_catalog_id":
+                globusOnlineId=config.value
+        if 'go_catalog_client' in line:
+            config=getConfigProperty(line)
+            if config.key=="go_catalog_client":
+                globusOnlineClient=config.value.strip("\"")
+                    
     homeDirectory = os.path.expanduser('~')
-    provDB = homeDirectory + "/.swift_provenance.db"
+    sqliteDBFile = homeDirectory + "/.swift_provenance.db"
     
     print "Provenance import: checking if database exists" 
     
     # Initializes database if it not yet exists
-    if not(os.path.exists(provDB)):
-        print "Provenance import: creating database"
-        schemaFilename=swiftHomeDir + "/etc/provenance/schema_sqlite.sql"
-        schema=open(schemaFilename, "r").read()
-        conn = sqlite3.connect(provDB)
+    # Assumes PostgreSQL and Globus Online Catalog already initialized
+    if enableSQLite==True:
+        if not(os.path.exists(sqliteDBFile)):
+            print "Provenance import: creating database"
+            schemaFilename=swiftHomeDir + "/etc/provenance/schema_sqlite.sql"
+            schema=open(schemaFilename, "r").read()
+            conn = sqlite3.connect(sqliteDBFile)
+            c = conn.cursor()
+            c.executescript(schema)
+            conn.commit()
+            conn.close()
+    
+    runIsNewSQLite=False
+    runIsNewPgSQL=False
+    
+    # Check if run was already imported into SQLite
+    if enableSQLite:    
+        conn = sqlite3.connect(sqliteDBFile)
         c = conn.cursor()
-        c.executescript(schema)
-        conn.commit()
-        conn.close()
-        
-    conn = sqlite3.connect(provDB)
-    c = conn.cursor()
-        
-    # Checks if run was already imported
-    c.execute('SELECT COUNT(*) FROM script_run WHERE script_run_id=:runId', {"runId": runId})
-    if int(c.fetchone()[0])==0:
-        print "Provenance import: importing run " + runId
+        c.execute('SELECT COUNT(*) FROM script_run WHERE script_run_id=:runId', {"runId": runId})
+        if int(c.fetchone()[0])==0:
+            runIsNewSQLite=True
+
+    # Check if run was already imported into PostgreSQL
+    if enablePostgreSQL:
+        pgSQLOutput=os.popen(pgsqlConnString + ' --tuples-only -c "SELECT COUNT(*) FROM script_run WHERE script_run_id=\'' + runId + '\'"').readline().strip()
+        if pgSQLOutput == '0':
+            runIsNewPgSQL=True
+    
+    if runIsNewSQLite:        
+        print "Provenance import: importing run into SQLite " + runId
         c.execute('INSERT INTO script_run VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', 
                   (runId, scriptFileName, log_filename, hostname, scriptRunDir, 
-                   swiftVersion, cogVersion, finalState, scriptStartTime, scriptDuration))
+                  swiftVersion, cogVersion, finalState, scriptStartTime, scriptDuration))
+
+    if runIsNewPgSQL:
+        print "Provenance import: importing run into PostgreSQL " + runId
+        os.popen(pgsqlConnString + ' -c "INSERT INTO script_run VALUES (\'' + runId + '\', ' +
+                                                                     '\'' + scriptFileName + '\', ' +
+                                                                     '\'' + log_filename + '\', ' +
+                                                                     '\'' + hostname + '\', ' +
+                                                                     '\'' + scriptRunDir + '\', ' +
+                                                                     '\'' + swiftVersion + '\', ' +
+                                                                     '\'' + cogVersion + '\', ' +
+                                                                     '\'' + finalState + '\', ' +
+                                                                     '\'' + scriptStartTime + '\', ' +
+                                                                     '\'' + str(scriptDuration) + '\');"')
+                                                                                                                                       
+
+                                                                    
+    if runIsNewSQLite or runIsNewPgSQL:
+        
         print "Provenance import: importing application executions"
         for t in sorted(tasks.values(), key=operator.attrgetter('taskNumber')):
             appExecId=runId + ":" + t.taskId
             appExecDuration=computeDuration(t.startTime, t.stopTime)
             stageInDuration=computeDuration(t.startStageIn, t.stopStageIn)
             stageOutDuration=computeDuration(t.startStageOut, t.stopStageOut)
-            c.execute('INSERT INTO app_exec VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
-                      (appExecId, runId, t.app, t.host, t.startTime, appExecDuration, 
-                       stageInDuration, stageOutDuration, t.workdir))
+            if runIsNewSQLite:
+                c.execute('INSERT INTO app_exec VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
+                          (appExecId, runId, t.app, t.host, t.startTime, appExecDuration, 
+                           stageInDuration, stageOutDuration, t.workdir))
+            if runIsNewPgSQL:
+                os.popen(pgsqlConnString + ' -c "INSERT INTO app_exec VALUES (\'' + appExecId + '\', ' +
+                                                                             '\'' + runId + '\', ' +
+                                                                             '\'' + t.app + '\', ' +
+                                                                             '\'' + t.host + '\', ' +
+                                                                             '\'' + t.startTime + '\', ' +
+                                                                             '\'' + str(appExecDuration) + '\', ' +
+                                                                             '\'' + str(stageInDuration) + '\', ' +
+                                                                             '\'' + str(stageOutDuration) + '\', ' +
+                                                                             '\'' + t.workdir + '\');"')
+                
             for f in iter(t.stageIn.split(",")):
                 direction="IN"
                 fileName=string.replace(canonicalFileName(f), 'file://localhost/', '')
-                insertFile(fileName, scriptRunDir, hostname, c, direction, appExecId)
+                insertFile(fileName, scriptRunDir, hostname, c, direction, appExecId, runIsNewSQLite, runIsNewPgSQL)
                 
             for f in iter(t.stageOut.split(" ")):
                 direction="OUT"
                 fileName=canonicalFileName(f)
-                insertFile(fileName, scriptRunDir, hostname, c, direction, appExecId)
+                insertFile(fileName, scriptRunDir, hostname, c, direction, appExecId, runIsNewSQLite, runIsNewPgSQL)
 
             for i, a in enumerate(t.arguments.split(",")):
                 position=i+1
-                c.execute('INSERT INTO argument VALUES (?, ?, ?)', (appExecId, position, a.strip()))
-        
+                if runIsNewSQLite:
+                    c.execute('INSERT INTO argument VALUES (?, ?, ?)', (appExecId, position, a.strip()))
+                if runIsNewPgSQL:
+                    os.popen(pgsqlConnString + ' -c "INSERT INTO argument VALUES (\'' + appExecId + '\', ' +
+                                                                              '\'' + str(position) + '\', ' +
+                                                                              '\'' + a.strip() + '\');"')
             # Extracts resource usage information from wrapper logs
+            noWrapperLog=False
             wrapperLogFileName=runDirName + "/" + scriptFileName.rstrip('.swift') + "-" + runDirName + ".d/" + t.workdir.split('/')[3] + "-info"
             if not(os.path.exists(wrapperLogFileName)):
                 noWrapperLog=True
@@ -247,18 +353,32 @@
                         for i, pair in enumerate(line.lstrip('APP_RESOURCES=').split(',')):
                             entry=pair.split(':')
                             if i == 0:
-                                c.execute("INSERT INTO resource_usage (app_exec_id, %s) VALUES ('%s',%s)" % (entry[0], appExecId, entry[1].rstrip('%')))
+                                if runIsNewSQLite:
+                                    c.execute("INSERT INTO resource_usage (app_exec_id, %s) VALUES ('%s',%s)" % (entry[0], appExecId, entry[1].rstrip('%')))
+                                if runIsNewPgSQL:
+                                    os.popen(pgsqlConnString + ' -c "INSERT INTO resource_usage (app_exec_id, ' + 
+                                            entry[0] + ') VALUES (\'' + appExecId + '\', ' +
+                                            '\'' + entry[1].rstrip('%') + '\');"')
                             else:
-                                c.execute("UPDATE resource_usage SET %s=%s WHERE app_exec_id='%s'" % (entry[0], entry[1].rstrip('%'), appExecId))
+                                if runIsNewSQLite:
+                                    c.execute("UPDATE resource_usage SET %s=%s WHERE app_exec_id='%s'" % (entry[0], entry[1].rstrip('%'), appExecId))
+                                if runIsNewPgSQL:
+                                    os.popen(pgsqlConnString + ' -c "UPDATE resource_usage SET ' + entry[0] + '=' + 
+                                            entry[1].rstrip('%') + ' WHERE app_exec_id=\'' + appExecId + '\'"')
                 wrapperLogFile.close()
         if noWrapperLog: 
-            print "Provenance import: wrapper log not found, enable wrapperlog.always.transfer in swift.properties to gather resource consumption information"
-        print "Provenance import: finished importing to ~/.swift_provenance.db"
+            print "Provenance import: wrapper log not found, set wrapperlog.always.transfer=true in swift.properties to gather resource consumption information"
+        if runIsNewSQLite:
+            print "Provenance import: finished importing to ~/.swift_provenance.db"
+        if runIsNewPgSQL:
+            print "Provenance import: finished importing to PostgreSQL"
+            
     else:
         print "Provenance import: run " + runId + " is already in the database"
         
-    conn.commit()
-    conn.close()
+    if runIsNewSQLite:
+        conn.commit()
+        conn.close()
 else:
     # Print tasks
     for t in sorted(tasks.values(), key=operator.attrgetter('taskNumber')):




More information about the Swift-commit mailing list