pyGlobus io.py for current AG cvs
Eric Olson
eolson at mcs.anl.gov
Wed Mar 10 17:23:11 CST 2004
If you are getting an "undefined name" error for ThreadingGSITCPSocketServer
with the current AG CVS code, the attached io.py defines an empty one.
-------------- next part --------------
############################################################################
# Keith R. Jackson, LBNL
# See Copyright for copyright notice!
###########################################################################
"""
The main interface to the Globus io module.
"""
from pyGlobus import utilc
from pyGlobus.util import GlobusException, Buffer
import ioc
import SocketServer, socket, os, httplib
from pyGlobus.security import GSSContext
from pyGlobus.security import GSSCred
from pyGlobus import gsic
##########################################################################
# Module Methods
#########################################################################
def server_auth_callback(server, g_handle, remote_user, context):
    """
    Authorization callback registered by GSITCPServer.

    Records the identity reported for the remote peer on the object that was
    supplied as the callback argument.

    **Arguments:**
    - *server* The user argument given to AuthData.set_callback (the
      GSITCPServer instance); it receives a remote_user attribute.
    - *g_handle* Not used here.
    - *remote_user* The identity reported for the remote peer.
    - *context* Not used here.

    **Returns:**
    Always 1 (presumably "authorized" -- confirm against the ioc wrapper).
    """
    setattr(server, "remote_user", remote_user)
    return 1
def client_auth_callback(server, g_handle, remote_user, context):
    """
    Authorization callback registered by GSIHTTPConnection.

    None of the arguments are inspected.

    **Returns:**
    Always 1 (presumably "authorized" -- confirm against the ioc wrapper).
    """
    verdict = 1
    return verdict
###########################################################################
# Module Classes
###########################################################################
class AuthDataException(GlobusException):
    """Raised by AuthData when a module or authorization-data call fails."""
class IOBaseException(GlobusException):
    """Raised by IOBase when a handle is missing or an io call fails."""
class FileIOAttrException(GlobusException):
    """Raised by FileIOAttr when the file attribute cannot be set up or torn down."""
class NetIOAttrException(GlobusException):
    """Raised by NetIOAttr when a module or socket-attribute call fails."""
class TCPIOAttrException(NetIOAttrException):
    """Raised by TCPIOAttr; a NetIOAttrException specialised for TCP attributes."""
class AuthData:
    """
    Wrapper around a Globus IO authorization-data handle.

    Construction activates the common and io modules; destruction frees every
    callback structure still registered, destroys the handle and deactivates
    the modules again.
    """

    def __init__(self, handle=None):
        """
        Activate the modules and adopt or create the authorization data.

        **Arguments:**
        - *handle* An optional existing authorization-data handle.  When
          omitted, a new one is created with ioc.authorization_data_init().

        **Raises:**
        An AuthDataException if a module cannot be activated or the handle
        cannot be created.
        """
        self._cbList = []
        ret = utilc.globus_module_activate(utilc.get_module())
        if ret != 0:
            raise AuthDataException("Unable to initialize the common module")
        ret = utilc.globus_module_activate(ioc.get_module())
        if ret != 0:
            raise AuthDataException("Unable to initialize the module")
        if handle is None:
            # On failure the second element carries the error, not a handle.
            ret, handle = ioc.authorization_data_init()
            if ret != 0:
                raise AuthDataException(handle)
        self._handle = handle

    def __del__(self):
        """
        Free callbacks, destroy the handle and deactivate the modules.

        **Raises:**
        An AuthDataException if the destroy or a deactivation fails.
        """
        # free_callback() deletes from _cbList, so walk a snapshot; iterating
        # the live list would skip every other entry and leak it.
        for cb in self._cbList[:]:
            self.free_callback(cb)
        ret, err = ioc.authorization_data_destroy(self._handle)
        if ret != 0:
            raise AuthDataException(err)
        ret = utilc.globus_module_deactivate(ioc.get_module())
        if ret != 0:
            raise AuthDataException("unable to deactivate the module")
        ret = utilc.globus_module_deactivate(utilc.get_module())
        if ret != 0:
            raise AuthDataException("unable to deactivate the module")

    def set_identity(self, identity):
        """
        Store *identity* in the authorization data.

        **Raises:**
        An AuthDataException if the underlying call fails.
        """
        ret, err = ioc.authorization_data_set_identity(self._handle, identity)
        if ret != 0:
            raise AuthDataException(err)

    def get_identity(self):
        """
        Return the identity stored in the authorization data.

        **Raises:**
        An AuthDataException if the underlying call fails.
        """
        result = ioc.authorization_data_get_identity(self._handle)
        if result[0] != 0:
            raise AuthDataException(result[1])
        return result[2]

    def set_callback(self, callback, arg):
        """
        Register an authorization callback.

        **Arguments:**
        - *callback* The function to register.
        - *arg* A user argument handed to the underlying registration call.

        **Raises:**
        An AuthDataException if the registration fails.

        **Returns:**
        The callback structure.  It can be released with free_callback;
        anything still registered is released when the instance is destroyed.
        """
        ret, cbStruct = ioc.authorization_data_set_callback(self._handle,
                                                            callback, arg)
        if ret != 0:
            raise AuthDataException(cbStruct)
        self._cbList.append(cbStruct)
        return cbStruct

    #########################################################################
    # Free's the underlying memory allocated for the callback structure.
    #########################################################################
    def free_callback(self, cbHandle):
        """
        Free the callback structure *cbHandle* and stop tracking it.

        **Raises:**
        A ValueError if *cbHandle* was not returned by set_callback on this
        instance (the structure has already been freed at that point).
        """
        ioc.free_callback(cbHandle)
        self._cbList.remove(cbHandle)

    ########################################################################
    # XXX This handle is only valid as long as this object exists.  When
    # the object is destroyed, the memory this handle points to is freed.
    #######################################################################
    def get_handle(self):
        """Return the underlying authorization-data handle."""
        return self._handle
class IOBase:
    """
    Base wrapper around a Globus IO handle.

    Construction activates the common and io modules.  Destruction frees the
    callback handles still registered, closes and frees the io handle (if
    any) and deactivates the modules again.  Every I/O method raises an
    IOBaseException when no handle is set or when the underlying ioc call
    reports a non-zero status.
    """

    def __init__(self, handle):
        """
        Adopt *handle* (may be None) and activate the Globus modules.

        **Raises:**
        An IOBaseException if a module cannot be activated.
        """
        self._handle = handle
        self._cbList = []
        ret = utilc.globus_module_activate(utilc.get_module())
        if ret != 0:
            raise IOBaseException("Unable to initialize the common module")
        ret = utilc.globus_module_activate(ioc.get_module())
        if ret != 0:
            raise IOBaseException("Unable to initialize the io module")

    def __del__(self):
        """
        Release callbacks and the handle, then deactivate the modules.

        **Raises:**
        An IOBaseException if a module cannot be deactivated.
        """
        # free_callback() deletes from _cbList, so walk a snapshot; iterating
        # the live list would skip every other entry and leak it.
        for cb in self._cbList[:]:
            self.free_callback(cb)
        if self._handle is not None:
            try:
                self.close()
            except IOBaseException:
                # Best effort: the handle may already have been closed.
                pass
            ioc.free_handle(self._handle)
        ret = utilc.globus_module_deactivate(ioc.get_module())
        if ret != 0:
            raise IOBaseException("Unable to deactivate the io module")
        ret = utilc.globus_module_deactivate(utilc.get_module())
        if ret != 0:
            raise IOBaseException("Unable to deactivate the common module")

    def _require_handle(self, message):
        """Raise IOBaseException(message) unless a handle has been set."""
        if self._handle is None:
            raise IOBaseException(message)

    def register_close(self, callback, arg):
        """
        Asynchronously close the handle.

        **Arguments:**
        - *callback* The function passed to ioc.io_register_close.
        - *arg* A user argument passed along with the callback.

        **Returns:**
        A callback handle; it is tracked and freed on destruction unless
        released earlier with free_callback.
        """
        self._require_handle("Unable to close uninitialized handle")
        result = ioc.io_register_close(self._handle, callback, arg)
        if result[0] != 0:
            raise IOBaseException(result[1])
        self._cbList.append(result[1])
        return result[1]

    def close(self):
        """Close the handle (blocking)."""
        self._require_handle("Unable to close uninitialized handle")
        ret, err = ioc.io_close(self._handle)
        if ret != 0:
            raise IOBaseException(err)

    def cancel(self, doCallbacks):
        """
        Cancel outstanding operations on the handle.

        **Arguments:**
        - *doCallbacks* Passed straight to ioc.io_cancel.
        """
        self._require_handle("Unable to cancel uninitialized handle")
        ret, err = ioc.io_cancel(self._handle, doCallbacks)
        if ret != 0:
            raise IOBaseException(err)

    def register_read(self, buffer, maxBytes, waitForBytes, callback, arg):
        """
        Start an asynchronous read.

        **Arguments:**
        - *buffer* An object exposing get_handle() (e.g. pyGlobus.util.Buffer)
          that receives the data.
        - *maxBytes*, *waitForBytes* Passed straight to ioc.io_register_read.
        - *callback*, *arg* The completion callback and its user argument.

        **Returns:**
        A callback handle; tracked until freed.
        """
        self._require_handle("Cannot read from uninitialized handle")
        ret, cbHandle = ioc.io_register_read(self._handle, buffer.get_handle(),
                                             maxBytes, waitForBytes,
                                             callback, arg)
        if ret != 0:
            raise IOBaseException(cbHandle)
        self._cbList.append(cbHandle)
        return cbHandle

    def try_read(self, buffer, maxBytes):
        """
        Read through ioc.io_try_read.

        **Returns:**
        The second element of the wrapper's result (the byte count on
        success).
        """
        self._require_handle("Cannot read from uninitialized handle")
        ret, bytesRead = ioc.io_try_read(self._handle,
                                         buffer.get_handle(), maxBytes)
        if ret != 0:
            raise IOBaseException(bytesRead)
        return bytesRead

    def read(self, buffer, maxBytes, waitForBytes):
        """
        Read through ioc.io_read into *buffer*.

        **Returns:**
        The second element of the wrapper's result (the byte count on
        success).
        """
        self._require_handle("Cannot read from uninitialized handle")
        ret, bytesRead = ioc.io_read(self._handle, buffer.get_handle(),
                                     maxBytes, waitForBytes)
        if ret != 0:
            raise IOBaseException(bytesRead)
        return bytesRead

    def register_write(self, data, dataLen, callback, arg):
        """
        Start an asynchronous write of *dataLen* bytes of *data*.

        **Returns:**
        A callback handle; tracked until freed.
        """
        self._require_handle("Cannot write to an uninitialized handle")
        ret, cbHandle = ioc.io_register_write(self._handle, data, dataLen,
                                              callback, arg)
        if ret != 0:
            raise IOBaseException(cbHandle)
        self._cbList.append(cbHandle)
        return cbHandle

    def try_write(self, data, maxLen):
        """
        Write through ioc.io_try_write.

        **Returns:**
        The second element of the wrapper's result (the byte count on
        success).
        """
        self._require_handle("Cannot write to an uninitialized handle")
        ret, bytesWritten = ioc.io_try_write(self._handle, data, maxLen)
        if ret != 0:
            raise IOBaseException(bytesWritten)
        return bytesWritten

    def write(self, data, dataLen):
        """
        Write through ioc.io_write.

        **Returns:**
        The second element of the wrapper's result (the byte count on
        success).
        """
        self._require_handle("Cannot write to an uninitialized handle")
        ret, bytesWritten = ioc.io_write(self._handle, data, dataLen)
        if ret != 0:
            raise IOBaseException(bytesWritten)
        return bytesWritten

    #########################################################################
    # Free's the underlying memory allocated for the callback structure.
    #########################################################################
    def free_callback(self, cbHandle):
        """
        Free the callback structure *cbHandle* and stop tracking it.

        **Raises:**
        A ValueError if *cbHandle* is not tracked by this instance (the
        structure has already been freed at that point).
        """
        ioc.free_callback(cbHandle)
        self._cbList.remove(cbHandle)

    ########################################################################
    # XXX This handle is only valid as long as this object exists.  When
    # the object is destroyed, the memory this handle points to is freed.
    #######################################################################
    def get_handle(self):
        """Return the underlying io handle (None when not yet opened)."""
        return self._handle
class FileIOAttr:
    """
    Wrapper around a Globus IO file attribute.

    Activates the io module on construction; destroys the attribute and
    deactivates the module on destruction.
    """

    def __init__(self, handle=None):
        """
        Activate the io module and adopt or create the attribute.

        **Arguments:**
        - *handle* An optional existing attribute handle.  When omitted a new
          one is created with ioc.fileattr_init().

        **Raises:**
        A FileIOAttrException if the module or the attribute cannot be
        initialized.
        """
        # Both fields are defined first so __del__ can tell how far
        # construction got if one of the calls below raises.
        self._activated = 0
        self._handle = None
        ret = utilc.globus_module_activate(ioc.get_module())
        if ret != 0:
            raise FileIOAttrException("Unable to initialize the module")
        self._activated = 1
        if handle is None:
            ret, handle = ioc.fileattr_init()
            if ret != 0:
                raise FileIOAttrException("Unable to initialize the attr")
        self._handle = handle

    def __del__(self):
        """
        Destroy the attribute and deactivate the io module.

        Only undoes what __init__ actually completed, so a partially
        constructed object neither destroys a missing handle nor deactivates
        a module it never activated.

        **Raises:**
        A FileIOAttrException if the destroy or the deactivation fails.
        """
        if self._handle is not None:
            # NOTE(review): TCPIOAttr unpacks tcpattr_destroy() as (ret, err);
            # confirm fileattr_destroy() really returns a bare status.
            ret = ioc.fileattr_destroy(self._handle)
            if ret != 0:
                raise FileIOAttrException("unable to destroy the attr")
        if self._activated:
            ret = utilc.globus_module_deactivate(ioc.get_module())
            if ret != 0:
                raise FileIOAttrException("Unable to deactivate the module")

    ########################################################################
    # XXX This handle is only valid as long as this object exists.  When
    # the object is destroyed, the memory this handle points to is freed.
    #######################################################################
    def get_handle(self):
        """Return the underlying attribute handle."""
        return self._handle
class GSIFile:
    """
    This is a file-like class that wraps a GSITCPSocket.

    Each GSIFile adds one to the socket's refcount on creation and calls the
    socket's close() exactly once, either from close() or on destruction.
    """

    def __init__(self, sock, mode):
        """
        Initialize a GSIFile class.

        **Arguments:**
        - *sock* Should be a GSITCPSocket.
        - *mode* Interpreted as for a file object, except that only 'r' and
          'w' are meaningful.
        """
        self.readok = 0
        self.writeok = 0
        # Stays 0 until construction finishes, so __del__ never tries to
        # close a socket that was not attached.
        self.open = 0
        for char in mode:
            if char == "r":
                self.readok = 1
                self.rbuf = Buffer(8192)
            if char == "w":
                self.writeok = 1
        self.sock = sock
        self.sock.refcount = self.sock.refcount + 1
        self.open = 1

    def __del__(self):
        """Close the file if it is still open."""
        if self.open:
            self.close()

    def read(self, size=0, waitForBytes=1):
        """
        Read size bytes, returning a string.

        **Arguments:**
        - *size* Maximum number of bytes to read; 0 means "as many as the
          current read buffer holds".
        - *waitForBytes* Passed to the socket's read(); it must not exceed
          *size*.

        **Raises:**
        An IOError if the file is not open for reading.  A RuntimeError if
        *waitForBytes* is larger than the number of bytes requested.

        **Returns:**
        The data read, or "" when the socket reports fewer than 1 byte.
        """
        if self.readok == 0:
            raise IOError("File not open for reading")
        if size == 0:
            size = self.rbuf.get_size()
        if waitForBytes > size:
            # The original raised the undefined name RuntimeException here.
            raise RuntimeError("waitForBytes is larger than buffer_size")
        if self.rbuf.get_size() < size:
            # Release the old buffer before allocating the larger one.
            del self.rbuf
            self.rbuf = Buffer(size)
        nbytes = self.sock.read(self.rbuf, size, waitForBytes)
        if nbytes < 1:
            return ""
        return self.rbuf.as_string(nbytes)

    def readline(self):
        """
        Read a line, and return it.

        Reads one byte at a time until a newline or end of data; the newline,
        if seen, is included in the result.
        """
        # There's probably a much more efficient way to implement this...
        pieces = []
        while 1:
            s = self.read(1)
            if s == "":
                break
            pieces.append(s[0])
            if s[0] == '\n':
                break
        return "".join(pieces)

    def write(self, str):
        """
        Write a string to the socket.

        **Raises:**
        An IOError if the file is not open for writing.
        """
        if self.writeok == 0:
            raise IOError("File not open for writing")
        self.sock.write(str, len(str))

    def flush(self):
        """
        In theory, flush any pending output.  In practice, this does nothing.

        **Raises:**
        An IOError if the file is not open for writing.
        """
        if self.writeok == 0:
            raise IOError("File not open for writing")

    def close(self):
        """
        Close the associated socket.

        Calling close() again is a no-op, so the socket's refcount is
        decremented only once per GSIFile.
        """
        if not self.open:
            return
        self.open = 0
        self.sock.close()

    def shutdown(self, n):
        """
        Shutdown the associated socket.
        """
        self.sock.shutdown(n)
class GSIInfile(GSIFile):
    """A GSIFile opened read-only ("r")."""

    def __init__(self, g_handle):
        """Wrap *g_handle*, which GSIFile expects to be a GSITCPSocket."""
        read_mode = "r"
        GSIFile.__init__(self, g_handle, read_mode)
class GSIOutfile(GSIFile):
    """A GSIFile opened write-only ("w")."""

    def __init__(self, g_handle):
        """Wrap *g_handle*, which GSIFile expects to be a GSITCPSocket."""
        write_mode = "w"
        GSIFile.__init__(self, g_handle, write_mode)
class GSITCPServer(SocketServer.BaseServer):
    """SocketServer-style server that listens on a GSITCPSocket.

    Methods for the caller:
    - __init__(server_address, RequestHandlerClass, ...)
    - serve_forever()
    - handle_request()  # if you don't use serve_forever()
    - fileno() -> int   # for select()
    - get_delegated_credential()

    Methods that may be overridden:
    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - verify_request(request, client_address)
    - process_request(request, client_address)
    - close_request(request)
    - handle_error()

    Methods for derived classes:
    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:
    - address_family
    - socket_type
    - request_queue_size (only for stream sockets)
    - allow_reuse_address

    Instance variables:
    - server_address
    - RequestHandlerClass
    - socket          the listening GSITCPSocket
    - attr            the TCPIOAttr used for listening and accepting
    - port            the port returned by create_listener
    - delegated_cred  credential delegated on the most recent accept
    - remote_user     set by server_auth_callback when the default
                      attributes are used
    """

    address_family = socket.AF_INET
    socket_type = socket.SOCK_STREAM
    request_queue_size = 5
    allow_reuse_address = 0

    def __init__(self, server_address, RequestHandlerClass,
                 channel_mode=ioc.GLOBUS_IO_SECURE_CHANNEL_MODE_SSL_WRAP,
                 delegation_mode=ioc.GLOBUS_IO_SECURE_DELEGATION_MODE_FULL_PROXY,
                 tcpAttr=None):
        """Constructor.  May be extended, do not override.

        When *tcpAttr* is None a TCPIOAttr is built with GSSAPI
        authentication, callback authorization (server_auth_callback), the
        given *delegation_mode* and *channel_mode*, unrestricted port,
        address reuse on and nodelay off.  Otherwise *tcpAttr* is used as-is
        and *channel_mode* / *delegation_mode* are ignored.
        """
        SocketServer.BaseServer.__init__(self, server_address,
                                         RequestHandlerClass)
        if tcpAttr is None:
            self.attr = TCPIOAttr()
            self.attr.set_authentication_mode(
                ioc.GLOBUS_IO_SECURE_AUTHENTICATION_MODE_GSSAPI)
            # Kept on self so the authorization data outlives this method.
            self.authdata = AuthData()
            self.authdata.set_callback(server_auth_callback, self)
            self.attr.set_authorization_mode(
                ioc.GLOBUS_IO_SECURE_AUTHORIZATION_MODE_CALLBACK,
                self.authdata)
            self.attr.set_delegation_mode(delegation_mode)
            self.attr.set_restrict_port(0)
            self.attr.set_reuseaddr(1)
            self.attr.set_nodelay(0)
            self.attr.set_channel_mode(channel_mode)
        else:
            self.attr = tcpAttr
        self.delegated_cred = None
        self.socket = GSITCPSocket()
        self.server_bind()
        self.server_activate()

    def get_delegated_credential(self):
        """Return the credential stored by the most recent get_request()."""
        return self.delegated_cred

    def server_bind(self):
        """Called by constructor to bind the socket.

        Creates the listener on the port given in server_address and stores
        the port actually returned in self.port.

        May be overridden.
        """
        self.port = self.socket.create_listener(self.attr,
                                                self.server_address[1])

    def server_activate(self):
        """Called by constructor to activate the server.

        Does nothing here.

        May be overridden.
        """
        pass

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.
        """
        self.socket.close()

    def fileno(self):
        """Return socket file number.

        Interface required by select().
        """
        return self.socket.fileno()

    def get_request(self):
        """Get the request and client address from the socket.

        Blocks in listen(), accepts the connection and records the delegated
        credential of the new connection in self.delegated_cred.

        **Raises:**
        socket.error, carrying the arguments of the GSITCPSocketException
        raised by accept() or get_delegated_credential().

        May be overridden.
        """
        self.socket.listen()
        try:
            request, client_address = self.socket.accept(self.attr)
            self.delegated_cred = request.get_delegated_credential()
        except GSITCPSocketException:
            # sys.exc_info() keeps this valid on every Python release,
            # unlike the "except X, ex" / "except X as ex" spellings.
            import sys
            raise socket.error(*sys.exc_info()[1].args)
        return request, client_address

    def close_request(self, request):
        """Called to clean up an individual request."""
        request.close()
class GSITCPSocketException(IOBaseException):
    """
    Error type raised by the GSITCPSocket class.

    Being a sub-class of IOBaseException, it is also caught by handlers
    written for IOBaseException.

    **Extends:**
    *pyGlobus.io.IOBase.IOBaseException*
    """
class GSITCPSocket(IOBase):
    """
    A simple socket interface to secure Globus IO.

    Provides the ability to make strongly authenticated connections using
    GSI.  It supports several modes of authorization and provides optional
    integrity/privacy to the data.

    The refcount attribute starts at 1 and is incremented by every GSIFile
    created for the socket; close() decrements it and only closes the handle
    once it drops below 1.

    **Extends:**
    *pyGlobus.io.IOBase.IOBase*
    """

    def __init__(self, handle=None):
        """
        Constructs an instance.

        Each instance wraps a globus_io_handle_t object.  An instance of this
        class may only have a single open connection at a time.  To have more
        than one open connection, use multiple instances.

        **Arguments:**
        - *handle* An optional argument that should contain a SWIG'ized
          pointer to a globus_io_handle_t object.

        **Raises:**
        An IOBaseException is thrown if unable to initialize the globus
        modules.
        """
        IOBase.__init__(self, handle)
        self.refcount = 1

    def __del__(self):
        """
        Destroys an instance.

        **Raises:**
        An IOBaseException is thrown if unable to deactivate the globus
        modules.
        """
        IOBase.__del__(self)

    def register_connect(self, host, port, attr, callback, arg):
        """
        Asynchronous TCP connection establishment.

        Connect a TCP socket on the specified host/port pair.  The connection
        will be started by this function, and a callback will be invoked when
        the connection is established.

        **Arguments:**
        - *host* A string containing the host to connect to.
        - *port* An int containing the port to connect to.
        - *attr* A TCPIOAttr object with the appropriate attributes set.
        - *callback* Function to be called when the security handshake is
          complete.  The function has the following prototype:
          'function(arg, handle, result)'
          arg is the user argument passed when setting the callback,
          handle is a SWIG'ized pointer to a globus_io_handle_t, and
          result is a SWIG'ized pointer to a globus_result_t object.
        - *arg* A user argument to be returned when the callback is executed.

        **Raises:**
        A GSITCPSocketException is thrown if the handle is currently in use,
        or an error occurs while making the connection.

        **Returns:**
        A SWIG'ized pointer to a callback handle.  This handle can be free'd
        with a call to free_callback, or it will be free'd when the instance
        is destroyed.
        """
        if self._handle is not None:
            raise GSITCPSocketException("Handle is already active")
        ret, cbHandle, handle = ioc.tcp_register_connect(host, port,
                                                         attr.get_handle(),
                                                         callback, arg)
        if ret != 0:
            raise GSITCPSocketException(cbHandle)
        self._cbList.append(cbHandle)
        self._handle = handle
        return cbHandle

    def connect(self, host, port, attr):
        """
        Blocking TCP connection establishment.

        Connect a TCP socket on the specified host/port pair.

        **Arguments:**
        - *host* A string containing the host to connect to.
        - *port* An int containing the port to connect to.
        - *attr* A TCPIOAttr object with the appropriate attributes set.

        **Raises:**
        A GSITCPSocketException is thrown if the handle is currently in use,
        or an error occurs while making the connection.
        """
        if self._handle is not None:
            # The original referenced the undefined name GSITCPException.
            raise GSITCPSocketException("Handle is already active")
        ret, handle = ioc.tcp_connect(host, port, attr.get_handle())
        if ret != 0:
            raise GSITCPSocketException(handle)
        self._handle = handle

    def create_listener(self, attr, port=0, backlog=-1):
        """
        Create a TCP server socket.

        Creates a socket handle capable of accepting new TCP connections
        from other hosts or processes.  In order to actually listen for
        connections, you must call either listen or register_listen on this
        instance.

        **Arguments:**
        - *attr* A TCPIOAttr containing the attributes for this server.
        - *port* The TCP port that the socket will listen for connections
          on.  If the port number is 0, then an arbitrary TCP port will be
          selected.  If this is true, and the restrict_port attribute is set
          to TRUE (the default) and the GLOBUS_TCP_PORT_RANGE environment
          variable was set when Globus I/O was initialized, then the port
          will be selected from that range.  Otherwise, any port number may
          be chosen.
        - *backlog* The backlog parameter indicates the maximum length of the
          system's queue of pending connections.  Any connection attempts
          when the queue is full will fail.  If backlog is equal to -1, then
          the system-specific maximum queue length will be used.

        **Raises:**
        A GSITCPSocketException is thrown if the handle is currently in use,
        or an error occurs while creating the listener.

        **Returns:**
        An int containing the port the server will listen on.
        """
        if self._handle is not None:
            raise GSITCPSocketException("Handle is already active")
        ret, port, handle = ioc.tcp_create_listener(port, backlog,
                                                    attr.get_handle())
        if ret != 0:
            raise GSITCPSocketException(port)
        self._handle = handle
        return port

    def register_accept(self, attr, callback, arg):
        """
        Asynchronous server-side TCP connection establishment.

        Once the connection has been accepted, the callback function will be
        called, with the arg and a newly created, connected, handle argument
        passed to the callback.

        **Arguments:**
        - *attr* A TCPIOAttr object containing the attributes for the new
          connection.
        - *callback* Function to be called when the security handshake is
          complete.  The function has the following prototype:
          'function(arg, handle, result)'
          arg is the user argument passed when setting the callback,
          handle is a SWIG'ized pointer to a globus_io_handle_t, and
          result is a SWIG'ized pointer to a globus_result_t object.
        - *arg* A user argument to be returned when the callback is executed.

        **Raises:**
        A GSITCPSocketException is thrown if a listener was not already
        created, or an error occurs while doing the underlying
        register_accept.

        **Returns:**
        A tuple containing:
        - A new instance of GSITCPSocket for the new connection; this must
          not be used until the callback occurs.
        - A callback handle.  This can either be free'd with a call to
          free_callback, or all outstanding handles will be free'd when the
          instance is destroyed.
        """
        if self._handle is None:
            raise GSITCPSocketException(
                "Unable to accept before creating a listener")
        ret, cbHandle, childHandle = ioc.tcp_register_accept(
            self._handle, attr.get_handle(), callback, arg)
        if ret != 0:
            raise GSITCPSocketException(cbHandle)
        self._cbList.append(cbHandle)
        return GSITCPSocket(childHandle), cbHandle

    def accept(self, attr):
        """
        Blocking server-side TCP connection establishment.

        **Arguments:**
        - *attr* A TCPIOAttr object containing the attributes for the new
          connection.

        **Raises:**
        A GSITCPSocketException is thrown if a listener was not already
        created, or an error occurs while doing the underlying accept.

        **Returns:**
        A tuple containing:
        - A new GSITCPSocket object for the new connection.
        - An (address, port) tuple for the remote end.
        """
        if self._handle is None:
            raise GSITCPSocketException(
                "Unable to accept before creating a listener")
        ret, childHandle = ioc.tcp_accept(self._handle, attr.get_handle())
        if ret != 0:
            raise GSITCPSocketException(childHandle)
        soc = GSITCPSocket(childHandle)
        # NOTE(review): the status of this lookup is not checked, so a failed
        # lookup is returned to the caller as if it were an address.
        ret, addr, port = ioc.tcp_get_remote_address(childHandle)
        return soc, (addr, port)

    def register_listen(self, callback, arg):
        """
        Asynchronous wait until a client connection is pending.

        The method will issue a callback when a connection is pending.

        **Arguments:**
        - *callback* The function to be called when a new connection is
          pending.  The function has the following prototype:
          'function(arg, handle, result)'
          arg is the user argument passed when setting the callback,
          handle is a SWIG'ized pointer to a globus_io_handle_t, and
          result is a SWIG'ized pointer to a globus_result_t object.
        - *arg* A user argument to be returned when the callback is executed.

        **Raises:**
        A GSITCPSocketException is thrown if a listener was not already
        created, or an error occurs while doing the underlying
        register_listen.

        **Returns:**
        A callback handle.  The callback handle can either be free'd with a
        call to free_callback, or all outstanding handles will be free'd when
        the instance is destroyed.
        """
        if self._handle is None:
            raise GSITCPSocketException(
                "Unable to listen before creating a listener")
        ret, cbHandle = ioc.tcp_register_listen(self._handle, callback, arg)
        if ret != 0:
            raise GSITCPSocketException(cbHandle)
        self._cbList.append(cbHandle)
        return cbHandle

    def listen(self):
        """
        Block until a client connection is pending.

        Once this method returns, accept may be called to return a new
        GSITCPSocket object.

        **Raises:**
        A GSITCPSocketException is thrown if a listener was not already
        created, or an error occurs while doing the underlying listen.
        """
        if self._handle is None:
            raise GSITCPSocketException(
                "Unable to listen before creating a listener")
        ret, err = ioc.tcp_listen(self._handle)
        if ret != 0:
            raise GSITCPSocketException(err)

    def get_local_address(self):
        """
        Return the local IP and port.

        **Raises:**
        A GSITCPSocketException is thrown if the handle is not initialized or
        the lookup fails.

        **Returns:**
        A tuple containing:
        - The IP address of the local socket.
        - The port number.
        """
        if self._handle is None:
            raise GSITCPSocketException(
                "Unable to get address from uninitialized handle")
        result = ioc.tcp_get_local_address(self._handle)
        if result[0] != 0:
            raise GSITCPSocketException(result[1])
        return (result[1], result[2])

    def get_remote_address(self):
        """
        Return the remote IP and port.

        **Raises:**
        A GSITCPSocketException is thrown if the handle is not initialized or
        the lookup fails.

        **Returns:**
        A tuple containing:
        - The IP address of the remote socket.
        - The port number.
        """
        if self._handle is None:
            raise GSITCPSocketException(
                "Unable to get address from uninitialized handle")
        result = ioc.tcp_get_remote_address(self._handle)
        if result[0] != 0:
            raise GSITCPSocketException(result[1])
        return (result[1], result[2])

    def get_security_context(self):
        """
        Return the GSS security context.

        **Raises:**
        A GSITCPSocketException is thrown if the handle is not initialized or
        the context cannot be retrieved.

        **Returns:**
        A GSSContext object.
        """
        if self._handle is None:
            raise GSITCPSocketException(
                "Unable to get context from uninitialized handle")
        ret, context = ioc.tcp_get_security_context(self._handle)
        if ret != 0:
            raise GSITCPSocketException(context)
        return GSSContext(context, 0)

    # Experimental GSS extensions
    def init_delegation(self, cred, oids, buffers, time_req):
        """
        Delegate a credential to a remote entity.

        **Arguments:**
        - *cred* The credential to delegate; None selects the constant
          returned by gsic.get_no_cred_const_ptr().
        - *oids* The set of extension oids to add to the credential.
        - *buffers* The set of extension values (must correspond to oids).
        - *time_req* The requested credential lifetime, in seconds.

        **Raises:**
        A GSITCPSocketException is thrown if the handle is not initialized.
        """
        if self._handle is None:
            raise GSITCPSocketException(
                "unable to init delegation from unititialized handle")
        if cred is None:
            cred_handle = gsic.get_no_cred_const_ptr()
        else:
            cred_handle = cred.get_handle()
        gsic.init_delegation(self.get_security_context().get_handle(),
                             cred_handle, oids, buffers, time_req,
                             self._handle)

    def accept_delegation(self, oids, buffers, time_req):
        """
        Accept a credential delegated by the remote entity.

        The arguments mirror those of init_delegation.

        **Raises:**
        A GSITCPSocketException is thrown if the handle is not initialized.

        **Returns:**
        A GSSCred object wrapping the accepted credential.
        """
        if self._handle is None:
            raise GSITCPSocketException(
                "unable to accept delegation from unititialized handle")
        cred = gsic.accept_delegation(
            self.get_security_context().get_handle(),
            oids, buffers, time_req, self._handle)
        return GSSCred(cred, 0)

    def get_delegated_credential(self):
        """
        Return the delegated credential.

        **Raises:**
        A GSITCPSocketException is thrown if the handle is not initialized or
        the credential cannot be retrieved.

        **Returns:**
        A GSSCred object.
        """
        if self._handle is None:
            raise GSITCPSocketException(
                "Unable to get credential from uninitialized handle")
        ret, cred = ioc.tcp_get_delegated_credential(self._handle)
        if ret != 0:
            raise GSITCPSocketException(cred)
        return GSSCred(cred, 0)

    def makefile(self, mode="r", bufsize=0):
        """
        Create a GSIFile object from the socket.  Note: unlike
        socket.makefile, this routine does NOT dup the socket.

        *bufsize* is accepted for compatibility and ignored.
        """
        return GSIFile(self, mode)

    def shutdown(self, how):
        """
        This should emulate socket.shutdown -- currently, this routine does
        nothing (it's simply here as a placeholder).
        """
        # should really do something here ...
        pass

    def sendall(self, str):
        """
        Emulates socket.sendall -- keeps writing until the whole string has
        gone out over the socket.
        """
        total = len(str)
        # Always issue the first write (even for an empty string) so an
        # uninitialized handle is still reported.
        sent = self.write(str, total)
        while sent < total:
            # Track the cumulative offset; the original re-used the size of
            # the last write as both offset and progress counter, which
            # resent or dropped data after a second partial write.
            sent = sent + self.write(str[sent:], total - sent)

    def send(self, str):
        """
        Emulates socket.send -- writes a string out over the socket and
        returns what write() reports as written.
        """
        return self.write(str, len(str))

    def close(self):
        """
        Close this socket, if there are no open GSIFile objects using it.
        """
        self.refcount = self.refcount - 1
        if self.refcount < 1:
            IOBase.close(self)
class GSITCPSocketServer(GSITCPServer):
    """GSITCPServer variant whose default channel mode is GSI_WRAP.

    Allows applications using the default SocketServer class to use the
    GSI context supplied by Globus instead.

    If the application needs to serve both secure and non-secure
    connections, a separate instance of GSITCPSocketServer can be created
    and run alongside the unsecure SocketServer.

    *Be sure to use different ports for both!*
    """

    def __init__(self, addr, RequestHandlerClass,
                 channel_mode=ioc.GLOBUS_IO_SECURE_CHANNEL_MODE_GSI_WRAP,
                 delegation_mode=ioc.GLOBUS_IO_SECURE_DELEGATION_MODE_FULL_PROXY,
                 tcpAttr=None):
        """Build the server; see GSITCPServer.__init__ for the arguments."""
        GSITCPServer.__init__(self, addr, RequestHandlerClass,
                              channel_mode=channel_mode,
                              delegation_mode=delegation_mode,
                              tcpAttr=tcpAttr)
        # NOTE(review): this flag lands on the GSITCPSocket object, not on
        # the server -- confirm that is what was intended.
        listener = self.socket
        listener.allow_reuse_address = 1

    # NOTE: This doesn't always free up the addr
    def server_close(self):
        """Close the listening socket."""
        listener = self.socket
        listener.close()
class GSIRequestHandler(SocketServer.StreamRequestHandler):
    """Request handler that records the delegated credential and drains input."""

    def handle(self):
        """
        Store the server's delegated credential back on the server, then
        read lines from rfile until readline() returns an empty result.
        """
        server = self.server
        server.delegated_cred = server.get_delegated_credential()
        # Consume the request line by line; the content is discarded.
        while self.rfile.readline():
            pass
# Delegation mode applied by GSIHTTPConnection when it builds its own TCPIOAttr.
DELEGATION_MODE = ioc.GLOBUS_IO_SECURE_DELEGATION_MODE_FULL_PROXY
# Channel mode applied by GSIHTTPConnection when it builds its own TCPIOAttr.
CHANNEL_MODE = ioc.GLOBUS_IO_SECURE_CHANNEL_MODE_GSI_WRAP
class ThreadingGSITCPSocketServer(SocketServer.ThreadingMixIn, GSITCPSocketServer):
    """
    Threaded GSITCPSocketServer.

    Combines SocketServer.ThreadingMixIn with GSITCPSocketServer and adds
    nothing of its own.
    """
class GSIHTTPConnection(httplib.HTTPConnection):
    """An httplib connection whose transport is a GSITCPSocket."""

    def __init__(self, host, port=None, tcpAttr=None, **x509):
        """
        Prepare (but do not open) a connection to *host*.

        **Arguments:**
        - *host* Passed to httplib.HTTPConnection.
        - *port* When truthy, overrides the port httplib derived from *host*.
        - *tcpAttr* A TCPIOAttr to connect with.  When None one is built with
          GSSAPI authentication, callback authorization
          (client_auth_callback), DELEGATION_MODE, CHANNEL_MODE and nodelay
          off.
        - *x509* Accepted and ignored.
        """
        # The base class is deliberately given port=None; an explicit port is
        # applied below, after httplib has parsed the host string.
        httplib.HTTPConnection.__init__(self, host, port=None)
        if tcpAttr is None:
            self.io_attr = TCPIOAttr()
            self.io_attr.set_authentication_mode(
                ioc.GLOBUS_IO_SECURE_AUTHENTICATION_MODE_GSSAPI)
            authdata = AuthData()
            # Kept on self so the authorization data outlives this method.
            self.authdata = authdata
            authdata.set_callback(client_auth_callback, None)
            self.io_attr.set_authorization_mode(
                ioc.GLOBUS_IO_SECURE_AUTHORIZATION_MODE_CALLBACK, authdata)
            self.io_attr.set_delegation_mode(DELEGATION_MODE)
            self.io_attr.set_nodelay(0)
            self.io_attr.set_channel_mode(CHANNEL_MODE)
        else:
            self.io_attr = tcpAttr
        self.response_class = httplib.HTTPResponse
        if port:
            self.port = port
        if self.debuglevel > 0:
            print("GSIHTTPconnection.__init__")
            # %s rather than %d: port is None when the caller omitted it,
            # which made the original format raise TypeError.
            print("GSIHTTPConnection host=%s port=%s shost=%s sport=%s"
                  % (host, port, self.host, self.port))

    def connect(self):
        """
        Open a new GSITCPSocket to (self.host, self.port) using io_attr.

        **Raises:**
        A GSITCPSocketException if the connection cannot be made.
        """
        self.sock = GSITCPSocket()
        self.sock.connect(self.host, self.port, self.io_attr)
        if self.debuglevel > 0:
            print("connect: (%s, %s)" % (self.host, self.port))

    def close(self):
        """
        Override of httplib's close() that only emits a debug message.

        NOTE(review): the socket is not closed here, so it stays open until
        the GSITCPSocket object itself is destroyed -- confirm this is
        intentional.
        """
        if self.debuglevel > 0:
            print("close: (%s, %s)" % (self.host, self.port))
class GSIHTTP(httplib.HTTP):
    """httplib.HTTP front end whose connections are GSIHTTPConnection objects."""

    def __init__(self, host='', port=None, tcpAttr=None, **x509):
        """
        Install a connection factory carrying *tcpAttr*, then run the
        httplib.HTTP constructor.  *x509* is accepted and ignored.
        """
        # httplib uses the attribute as
        #     self._setup(self._connection_class(host, port))
        # so a callable object standing in for the class lets the TCP
        # attributes travel along with it.
        class ConnFactory:
            """Callable that builds GSIHTTPConnection objects with a fixed attr."""

            def __init__(self, attr):
                self.attr = attr

            def __call__(self, host, port, strict = None):
                # strict is accepted for call compatibility and dropped.
                connection = GSIHTTPConnection(host, port, tcpAttr = self.attr)
                return connection

        factory = ConnFactory(tcpAttr)
        self._connection_class = factory
        httplib.HTTP.__init__(self, host, port)
class NetIOAttr:
def __init__(self):
self._activated = 0
self._handle = None
self._authData = None
ret = utilc.globus_module_activate(utilc.get_module())
if not ret == 0:
ex = NetIOAttrException("Unable to initialize the common module")
raise ex
ret = utilc.globus_module_activate(ioc.get_module())
if not ret == 0:
ex = NetIOAttrException("Unable to initialize the io module")
raise ex
self._activated = 1
return
def __del__(self):
ret = utilc.globus_module_deactivate(ioc.get_module())
if not ret == 0:
ex = NetIOAttrException("Unable to deactivate the io module")
raise ex
ret = utilc.globus_module_deactivate(utilc.get_module())
if not ret == 0:
ex = NetIOAttrException("Unable to deactivate the common module")
raise ex
def set_reuseaddr(self, reuse):
ret, err = ioc.attr_set_socket_reuseaddr(self._handle, reuse)
if not ret == 0:
ex = NetIOAttrException(err)
raise ex
return
def get_reuseaddr(self):
list = ioc.attr_get_socket_reuseaddr(self._handle)
if not list[0] == 0:
ex = NetIOAttrException(list[1])
raise ex
return list[2]
def set_keepalive(self, keepAlive):
ret, err = ioc.attr_set_socket_keepalive(self._handle, keepAlive)
if not ret == 0:
ex = NetIOAttrException(err)
raise ex
return
def get_keepalive(self):
list = ioc.attr_get_socket_keepalive(self._handle)
if not list[0] == 0:
ex = NetIOAttrException(list[1])
raise ex
return list[2]
def set_linger(self, linger, lingerTime):
ret, err = ioc.attr_set_socket_linger(self._handle, linger, lingerTime)
if not ret == 0:
ex = NetIOAttrException(err)
raise ex
return
def get_linger(self):
list = ioc.attr_get_socket_linger(self._handle)
if not list[0] == 0:
ex = NetIOAttrException(list[1])
raise ex
return list[2], list[3]
def set_oobinline(self, inline):
ret, err = ioc.attr_set_socket_oobinline(self._handle, inline)
if not ret == 0:
ex = NetIOAttrException(err)
raise ex
return
def get_oobinline(self):
list = ioc.attr_get_socket_oobinline(self._handle)
if not list[0] == 0:
ex = NetIOAttrException(list[1])
raise ex
return list[2]
def set_sndbuf(self, sndbuf):
ret, err = ioc.attr_set_socket_sndbuf(self._handle, sndbuf)
if not ret == 0:
ex = NetIOAttrException(err)
raise ex
return
def get_sndbuf(self):
    """
    Read back the socket sndbuf setting of this attr.

    Returns element 2 of the sequence produced by
    ioc.attr_get_socket_sndbuf.  Element 0 is the status; when it is
    non-zero, NetIOAttrException is raised with element 1.
    """
    result = ioc.attr_get_socket_sndbuf(self._handle)
    status = result[0]
    if status != 0:
        raise NetIOAttrException(result[1])
    return result[2]
def set_rcvbuf(self, rcvbuf):
    """
    Store the socket rcvbuf setting on this attr.

    rcvbuf is passed through to ioc.attr_set_socket_rcvbuf along with the
    wrapped handle.

    Raises NetIOAttrException, built from the error value returned by
    ioc, when the call reports a non-zero status.
    """
    status, error = ioc.attr_set_socket_rcvbuf(self._handle, rcvbuf)
    if status != 0:
        raise NetIOAttrException(error)
def get_rcvbuf(self):
    """
    Read back the socket rcvbuf setting of this attr.

    Returns element 2 of the sequence produced by
    ioc.attr_get_socket_rcvbuf.  Element 0 is the status; when it is
    non-zero, NetIOAttrException is raised with element 1.
    """
    result = ioc.attr_get_socket_rcvbuf(self._handle)
    status = result[0]
    if status != 0:
        raise NetIOAttrException(result[1])
    return result[2]
########################################################################
# NOTE: the handle returned here is only valid while this object exists.
# Once the object is destroyed, the memory the handle points to is freed.
########################################################################
def get_handle(self):
    """Return the underlying attr handle stored in self._handle."""
    handle = self._handle
    return handle
class TCPIOAttr(NetIOAttr):
    """
    Attribute set for TCP connections, wrapping an ioc tcpattr handle.

    Every setter forwards its arguments to the matching ioc.attr_set_*
    function and every getter reads from the matching ioc.attr_get_*
    function.  A non-zero status from ioc is reported by raising
    TCPIOAttrException carrying the error value ioc returned.
    """

    def __init__(self, handle = None):
        """
        Wrap an existing tcpattr handle, or create a new one.

        handle -- an existing attr handle; when None a fresh one is
                  obtained from ioc.tcpattr_init().

        Raises TCPIOAttrException when ioc.tcpattr_init() reports a
        non-zero status.
        """
        # Defined before anything can raise, so that __del__ and
        # get_authorization_mode always find these attributes.
        self._handle = None
        self._authData = None
        NetIOAttr.__init__(self)
        if handle is None:
            ret, handle = ioc.tcpattr_init()
            if ret != 0:
                raise TCPIOAttrException("Unable to initialize the attr")
        self._handle = handle

    def __del__(self):
        """
        Destroy the wrapped handle, then run NetIOAttr.__del__.

        The base-class cleanup runs even when destroying the handle
        fails, and the destroy call is skipped when no handle was ever
        stored (for example because __init__ raised).

        Raises TCPIOAttrException when ioc.tcpattr_destroy reports a
        non-zero status.
        """
        handle = getattr(self, "_handle", None)
        self._handle = None
        try:
            if handle is not None:
                ret, err = ioc.tcpattr_destroy(handle)
                if ret != 0:
                    raise TCPIOAttrException(err)
        finally:
            NetIOAttr.__del__(self)

    def set_restrict_port(self, restrict):
        """Set the tcp restrict_port attribute; raises TCPIOAttrException on failure."""
        ret, err = ioc.attr_set_tcp_restrict_port(self._handle, restrict)
        if ret != 0:
            raise TCPIOAttrException(err)

    def get_restrict_port(self):
        """Return the tcp restrict_port attribute; raises TCPIOAttrException on failure."""
        # result is (status, error, value).
        result = ioc.attr_get_tcp_restrict_port(self._handle)
        if result[0] != 0:
            raise TCPIOAttrException(result[1])
        return result[2]

    def set_nodelay(self, nodelay):
        """Set the tcp nodelay attribute; raises TCPIOAttrException on failure."""
        ret, err = ioc.attr_set_tcp_nodelay(self._handle, nodelay)
        if ret != 0:
            raise TCPIOAttrException(err)

    def get_nodelay(self):
        """Return the tcp nodelay attribute; raises TCPIOAttrException on failure."""
        # result is (status, error, value).
        result = ioc.attr_get_tcp_nodelay(self._handle)
        if result[0] != 0:
            raise TCPIOAttrException(result[1])
        return result[2]

    def set_interface(self, interface):
        """Set the tcp interface attribute; raises TCPIOAttrException on failure."""
        # This used to call attr_set_tcp_nodelay (a copy-paste slip), which
        # changed the nodelay attribute and never touched the interface.
        ret, err = ioc.attr_set_tcp_interface(self._handle, interface)
        if ret != 0:
            raise TCPIOAttrException(err)

    def get_interface(self):
        """Return the tcp interface attribute; raises TCPIOAttrException on failure."""
        # result is (status, error, value).
        result = ioc.attr_get_tcp_interface(self._handle)
        if result[0] != 0:
            raise TCPIOAttrException(result[1])
        return result[2]

    def set_authentication_mode(self, mode, cred = None):
        """
        Set the secure authentication mode.

        mode -- the authentication mode, passed through to ioc.
        cred -- the credential to use; when None, the value returned by
                gsic.get_no_cred_const() is used instead.

        Raises TCPIOAttrException on failure.
        """
        if cred is None:
            cred = gsic.get_no_cred_const()
        ret, err = ioc.attr_set_secure_authentication_mode(self._handle,
                                                           mode, cred)
        if ret != 0:
            raise TCPIOAttrException(err)

    def get_authentication_mode(self):
        """
        Return the (mode, cred) pair of the secure authentication mode.

        Raises TCPIOAttrException on failure.
        """
        ret, mode, cred = ioc.attr_get_secure_authentication_mode(self._handle)
        if ret != 0:
            # On failure the second element holds the error value.
            raise TCPIOAttrException(mode)
        return mode, cred

    def set_authorization_mode(self, mode, authData):
        """
        Set the secure authorization mode.

        mode     -- the authorization mode, passed through to ioc.
        authData -- object whose get_handle() supplies the auth data
                    handle.  A reference is kept on this attr so that
                    get_authorization_mode can use it later.

        Raises TCPIOAttrException on failure.
        """
        ret, err = ioc.attr_set_secure_authorization_mode(self._handle,
                                                          mode,
                                                          authData.get_handle())
        if ret != 0:
            raise TCPIOAttrException(err)
        self._authData = authData

    def get_authorization_mode(self):
        """
        Return the secure authorization mode.

        Raises TCPIOAttrException when no authorization mode has been set
        through set_authorization_mode, or when ioc reports a failure.
        """
        # The attribute is _authData; the old check read self.authData,
        # which never exists and so always raised AttributeError.
        if self._authData is None:
            raise TCPIOAttrException("Auth Mode is not set")
        ret, mode = ioc.attr_get_secure_authorization_mode(
            self._handle, self._authData.get_handle())
        if ret != 0:
            # On failure the second element holds the error value.
            raise TCPIOAttrException(mode)
        return mode

    def set_channel_mode(self, mode):
        """Set the secure channel mode; raises TCPIOAttrException on failure."""
        ret, err = ioc.attr_set_secure_channel_mode(self._handle, mode)
        if ret != 0:
            raise TCPIOAttrException(err)

    def get_channel_mode(self):
        """Return the secure channel mode; raises TCPIOAttrException on failure."""
        ret, mode = ioc.attr_get_secure_channel_mode(self._handle)
        if ret != 0:
            # On failure the second element holds the error value.
            raise TCPIOAttrException(mode)
        return mode

    def set_protection_mode(self, mode):
        """Set the secure protection mode; raises TCPIOAttrException on failure."""
        ret, err = ioc.attr_set_secure_protection_mode(self._handle, mode)
        if ret != 0:
            raise TCPIOAttrException(err)

    def get_protection_mode(self):
        """Return the secure protection mode; raises TCPIOAttrException on failure."""
        ret, mode = ioc.attr_get_secure_protection_mode(self._handle)
        if ret != 0:
            # On failure the second element holds the error value.
            raise TCPIOAttrException(mode)
        return mode

    def set_delegation_mode(self, mode):
        """Set the secure delegation mode; raises TCPIOAttrException on failure."""
        ret, err = ioc.attr_set_secure_delegation_mode(self._handle, mode)
        if ret != 0:
            raise TCPIOAttrException(err)

    def get_delegation_mode(self):
        """Return the secure delegation mode; raises TCPIOAttrException on failure."""
        ret, mode = ioc.attr_get_secure_delegation_mode(self._handle)
        if ret != 0:
            # On failure the second element holds the error value.
            raise TCPIOAttrException(mode)
        return mode
More information about the ag-dev
mailing list