#-----------------------------------------------------------------------------
# Name:        AGFile.py
# Purpose:     Utilities for process management.
#
# Author:      Hiroyuki Komatsu
#
# Created:     2003/08/07
# RCS-ID:      $Id: $
# Copyright:   (c) 2003
# Licence:     See COPYING.TXT
#-----------------------------------------------------------------------------

import os, sys, string, re
import logging, logging.handlers

### FIXME: Please correct the location of DDEConnection.py.
### FIXME: (2003-08-11)
#from Accessgrid.lib.DDEConnection import DDEExecute
from DDEConnection import DDEExecute

from wxPython.wx import *
from wxPython.wx import wxTheMimeTypesManager as mtm

log = logging.getLogger("AG.AGFile")


def GetMimeCommands(filename = None, type = None, ext = None):
    """
    GetMimeCommands(filename[String], type[String], ext[String])
      -> commandDict[Dict] or None

    This function returns whatever the local MIME type database knows
    about the given file: it builds an AGFile from the arguments and
    returns the result of its GetCommands() method.
    None is returned when no filename is given.
    """
    ### FIXME: This function is kept only for backward compatibility.
    ### FIXME: (2003-08-07)
    if filename is None:
        return None
    return AGFile(filename, type, ext).GetCommands()


class AGFile:
    """
    This class represents a file in a Virtual Venue.

    An AGFile is identified by its URI.  The extension, the MIME type,
    the wxPython file type and the command dictionary are looked up
    lazily and cached on the instance.
    """

    # Both patterns are compiled once for the whole class instead of on
    # every instantiation.
    # Matches a doubled URI scheme ('file://http://') and captures the
    # second scheme so that the first one can be dropped.
    _regexp_duplicatedURIHeader = re.compile(r"\s*\w+://(\w+://)")
    # Matches a URI scheme at the start of the string, optionally
    # preceded by a double quote.
    _regexp_htmlPrefix = re.compile(r"\"?\s*\w+://")

    def __init__(self, uri, mimeType = None, extension = None):
        """
        AGFile(uri[String], mimeType[String] = None,
               extension[String] = None)

        mimeType and extension are optional; when omitted they are
        guessed on demand by GetMimeType() and GetExtension().
        """
        self._initializeStatus(uri, mimeType, extension)

    def _initializeStatus(self, uri, mimeType, extension):
        """
        _initializeStatus(self, uri, mimeType, extension) -> [Void]

        This method stores the given values and clears the cached file
        type and command dictionary.
        """
        self._uri = uri
        self._extension = extension
        self._mimeType = mimeType
        self._fileType = None
        self._commands = None

    ####
    #### Methods for Initialization.
    ####
    def GetURI(self):
        """
        GetURI(self) -> URI[String]

        This method returns the URI of the AGFile.
        """
        return self._uri

    def GetExtension(self):
        """
        GetExtension(self) -> extension[String]

        This method returns the extension of the AGFile, i.e. the text
        after the last '.' of the base name of the URI.
        (e.g. 'file.txt' => 'txt', 'image.jpg' => 'jpg',
              'README' => 'README', '.emacs' => 'emacs')

        NOTE: a name without any '.' yields the whole name, not None.
        """
        if self._extension:
            return self._extension

        extension = os.path.basename(self._uri).split(".")[-1]
        # Drop a trailing double quote (presumably the URI may arrive
        # wrapped in quotes -- confirm against callers).
        if extension.endswith('"'):
            extension = extension[:-1]
        self._extension = extension
        return self._extension

    def GetMimeType(self):
        """
        GetMimeType(self) -> mimeType[String] or None

        This method guesses and returns the MIME type of the AGFile.
        (e.g. 'image.gif' => 'image/gif')

        The type is taken from the file type registered for the
        extension.  When there is none and the URI starts with a scheme
        ('xxx://'), 'text/html' is assumed.
        """
        if self._mimeType:
            return self._mimeType

        ### FIXME: self.GetFileType() should not be used here.
        ### FIXME: (2003-08-07)
        fileType = self.GetFileTypeFromExtension()
        if fileType:
            self._mimeType = fileType.GetMimeType()
        elif AGFile._regexp_htmlPrefix.match(self.GetURI()):
            self._mimeType = 'text/html'
        return self._mimeType

    def SetMimeType(self, mimeType):
        """
        SetMimeType(self, mimeType[String]) -> [Void]

        This method sets the MIME type of the AGFile.  When the value
        actually changes, the cached file type and commands are flushed.
        """
        if self._mimeType != mimeType:
            self._mimeType = mimeType
            self._initializeStatus(self._uri, self._mimeType,
                                   self._extension)

    def GetFileType(self):
        """
        GetFileType(self) -> fileType[fileType@wxPython] or None

        This method returns the file type of the AGFile.  The MIME type
        is tried first, then the extension.
        """
        if self._fileType:
            return self._fileType

        self._fileType = (self.GetFileTypeFromMimeType() or
                          self.GetFileTypeFromExtension())
        return self._fileType

    def GetFileTypeFromMimeType(self, mimeType = None):
        """
        GetFileTypeFromMimeType(self, mimeType[String] = None)
          -> fileType[fileType@wxPython] or None

        This method looks up the file type for mimeType (default: the
        MIME type of the AGFile).

        NOTE: the result is also stored as the cached file type of the
        AGFile.  When no MIME type is available, the currently cached
        file type is returned unchanged.
        """
        mimeType = mimeType or self.GetMimeType()
        if mimeType is not None:
            self._fileType = mtm.GetFileTypeFromMimeType(mimeType)
        return self._fileType

    def GetFileTypeFromExtension(self, extension = None):
        """
        GetFileTypeFromExtension(self, extension[String] = None)
          -> fileType[fileType@wxPython] or None

        This method looks up the file type for extension (default: the
        extension of the AGFile).

        NOTE: the result is also stored as the cached file type of the
        AGFile.  When no extension is available, the currently cached
        file type is returned unchanged.
        """
        extension = extension or self.GetExtension()
        if extension is not None:
            self._fileType = mtm.GetFileTypeFromExtension(extension)
        return self._fileType

    ####
    #### Methods for Execution
    ####
    def GetCommands(self):
        """
        GetCommands(self) -> commandDict[Dict] or None

        This method returns the dictionary of commands (lower-cased
        verb => command line) available for the AGFile.
        None is returned when no file type is known for the AGFile.
        """
        if self._commands:
            return self._commands

        fileType = self.GetFileType()
        if fileType is not None:
            commands = fileType.GetAllCommands(self._uri)
            self._commands = self._ConvMimeCommandsToDict(commands)
        return self._commands

    def _ConvMimeCommandsToDict(self, mimeCommands):
        """
        _ConvMimeCommandsToDict(self, mimeCommands[mimeCommands@wxPython])
          -> dict[Dict]

        This method converts mimeCommands, a (verbs, commandLines) pair
        of parallel sequences, to a dictionary keyed by the lower-cased
        verb.  An empty dictionary is returned for None.
        """
        commandDict = dict()
        if mimeCommands is not None:
            verbs, commandLines = mimeCommands
            for verb, commandLine in zip(verbs, commandLines):
                commandDict[verb.lower()] = commandLine
        return commandDict

    def Open(self):
        """
        Open(self) -> ProcessID[Integer] or None

        This method opens the AGFile and returns the PID of the process
        which opens it.  If the command is not available, None is
        returned.
        """
        return self.Run('open')

    def Run(self, command = 'open'):
        """
        Run(self, command[String] = 'open') -> ProcessID[Integer] or None

        This method executes the process registered for 'command' and
        returns its PID.  If the command is not available, None is
        returned.
        """
        commandLine = self.GetCommand(command)
        return self._RunInternal(commandLine)

    def GetCommand(self, command = 'open'):
        """
        GetCommand(self, command[String] = 'open')
          -> commandLine[String] or None

        This method returns the command line corresponding to
        'command'.  None is returned when the command is not registered
        or when no commands are known for the AGFile at all.
        """
        commands = self.GetCommands()
        # GetCommands() returns None for an unknown file type; treat it
        # like an empty dictionary instead of failing on the lookup.
        if not commands:
            return None
        return commands.get(command)

    def _RunInternal(self, commandLine):
        """
        _RunInternal(self, commandLine[String]) -> ProcessID or None

        This method executes commandLine, through DDE when it is a DDE
        command and through wxExecute otherwise.  None is returned when
        commandLine is None.
        """
        if commandLine is None:
            return None

        log.debug("executing command: %s", commandLine)
        if self._IsDDECommand(commandLine):
            ### FIXME: The reason to execute this ad hoc method is a
            ### FIXME: mismatch between the 'text/html' MIME type and URL.
            ### FIXME: For URL, 'InternetShortcut' should be used instead
            ### FIXME: of 'text/html' (2003-08-12)
            commandLine = self._RunInternal_AdHoc_Replace(commandLine)
            ### FIXME: The return value of DDEExecute is not ProcessID.
            ### FIXME: (2003-08-11)
            return DDEExecute(commandLine)
        return wxExecute(commandLine)

    def _IsDDECommand(self, commandLine):
        """
        _IsDDECommand(self, commandLine[String]) -> [Boolean]

        This method tells whether commandLine starts with the 'WX_DDE#'
        marker.
        """
        return commandLine.startswith("WX_DDE#")

    def _RunInternal_AdHoc_Replace(self, commandLine):
        """
        _RunInternal_AdHoc_Replace(self, commandLine[String])
          -> choppedCommandLine[String]

        [example]
        _RunInternal_AdHoc_Replace('file://http://www.accessgrid.org/')
          -> 'http://www.accessgrid.org/'

        This method chops a duplicated URI header to deal with the
        mismatch between the 'text/html' MIME type and URL.
        """
        ### FIXME: The reason to execute this ad hoc method is a
        ### FIXME: mismatch between the 'text/html' MIME type and URL.
        ### FIXME: For URL, 'InternetShortcut' should be used instead
        ### FIXME: of 'text/html' (2003-08-12)
        return AGFile._regexp_duplicatedURIHeader.sub(r"\1", commandLine)

# ------------------------------------------------------------

if __name__ == '__main__':
    def ReadLine(prompt = '', default = '',
                 format = ('%s: ', '%s (%s): ')):
        """
        ReadLine(prompt = '', default = '',
                 format = ('%s: ', '%s (%s): ')) -> inputString[String]

        This function prompts on stdout, reads one line from stdin and
        returns it stripped.  If the user entered nothing, 'default' is
        returned.  format[0] is the prompt layout without a default
        value, format[1] the layout with one.
        """
        if default:
            sys.stdout.write(format[1] % (prompt, default))
        else:
            sys.stdout.write(format[0] % prompt)
        line = sys.stdin.readline().strip()
        return line or default

    try:
        from optik import Option, OptionParser
    except ImportError:
        # optparse is the standard-library successor of optik.
        from optparse import Option, OptionParser

    class AGFileCommand:
        """
        Small command line front end for AGFile.

        With --filename the commands registered for that file are
        printed; without it the user is asked interactively for files
        and for commands to run.
        """
        version = '0.0.1'
        filename = ''
        mimetype = ''
        interactive = False
        agFile = None

        def __init__(self, args = None):
            self._initArgs(args)

        def _initArgs(self, rawArgs = None):
            """
            Parse rawArgs (default: the process arguments) and store
            the result in filename, mimetype and interactive.
            """
            optionList = [
                Option("--filename", "--file", "-f", metavar="FILENAME",
                       help="Filename to open."),
                Option("--mimetype", "--type", "-t", metavar="MIMETYPE",
                       help="Mime type of the file."),
                ]
            parser = OptionParser(option_list=optionList,
                                  version="%%prog: %s" % self.version)
            (options, args) = parser.parse_args(rawArgs)

            self.filename = options.filename
            self.mimetype = options.mimetype
            # 'type' is the attribute name used so far; keep it as an
            # alias of 'mimetype'.
            self.type = self.mimetype
            # main() only asks questions when no file name was given.
            self.interactive = not self.filename

            if args:
                parser.error("too many arguments")

        def printCommands(self, agfile):
            """
            Print the table of commands available for agfile, or
            'Unknown filetype.' when there are none.
            """
            commands = agfile.GetCommands()
            if commands:
                print(" %-14s %s" % ("key", "command"))
                print("-" * 16 + "-+-" + "-" * 24)
                for key in commands:
                    print("%16s %s" % (key, commands[key]))
            else:
                print("Unknown filetype.")
            print("")

        def runCommandInteractive(self, agfile, doNothing = "Do nothing"):
            """
            Ask for a command and run it on agfile.  Returns the
            command, or None when the user chose doNothing.
            """
            command = ReadLine("Command", doNothing)
            if command == doNothing:
                return None
            agfile.Run(command)
            return command

        def setAGFileInteractive(self):
            """
            Ask for a file name and a MIME type (the guessed type is
            offered as the default) and return the resulting AGFile.
            """
            filename = ReadLine("File Name")
            agfile = AGFile(filename)
            mimeTypeDefault = agfile.GetMimeType()
            mimeType = ReadLine("Mime Type", mimeTypeDefault)
            agfile.SetMimeType(mimeType)
            return agfile

        def main(self):
            """
            Print the commands of the given file, or loop forever
            asking for files and commands when none was given.
            """
            if self.filename:
                self.printCommands(AGFile(self.filename, self.type))
            else:
                while True:
                    agfile = self.setAGFileInteractive()
                    self.printCommands(agfile)
                    # Keep asking until the user picks "Do nothing".
                    while self.runCommandInteractive(agfile):
                        pass
                    print("")

    # ------------------------------------------------------------
    agfc = AGFileCommand()
    agfc.main()