diff options
Diffstat (limited to 'deluge/ui/client.py')
-rw-r--r-- | deluge/ui/client.py | 191 |
1 files changed, 81 insertions, 110 deletions
diff --git a/deluge/ui/client.py b/deluge/ui/client.py index 8be7aaed5..479b7367d 100644 --- a/deluge/ui/client.py +++ b/deluge/ui/client.py @@ -35,18 +35,13 @@ # import logging -from twisted.internet.protocol import Protocol, ClientFactory +from twisted.internet.protocol import ClientFactory from twisted.internet import reactor, ssl, defer -try: - import rencode -except ImportError: - import deluge.rencode as rencode - -import zlib import deluge.common from deluge import error from deluge.event import known_events +from deluge.transfer import DelugeTransferProtocol if deluge.common.windows_check(): import win32api @@ -104,11 +99,11 @@ class DelugeRPCRequest(object): return (self.request_id, self.method, self.args, self.kwargs) -class DelugeRPCProtocol(Protocol): + +class DelugeRPCProtocol(DelugeTransferProtocol): def connectionMade(self): self.__rpc_requests = {} - self.__buffer = None # Set the protocol in the daemon so it can send data self.factory.daemon.protocol = self # Get the address of the daemon that we've connected to @@ -116,102 +111,80 @@ class DelugeRPCProtocol(Protocol): self.factory.daemon.host = peer.host self.factory.daemon.port = peer.port self.factory.daemon.connected = True - log.info("Connected to daemon at %s:%s..", peer.host, peer.port) self.factory.daemon.connect_deferred.callback((peer.host, peer.port)) - def dataReceived(self, data): + def message_received(self, request): """ - This method is called whenever we receive data from the daemon. + This method is called whenever we receive a message from the daemon. + + :param request: a tuple that should be either a RPCResponse, RCPError or RPCSignal - :param data: a zlib compressed and rencoded string that should be either - a RPCResponse, RCPError or RPCSignal """ - if self.__buffer: - # We have some data from the last dataReceived() so lets prepend it - data = self.__buffer + data - self.__buffer = None + if type(request) is not tuple: + log.debug("Received invalid message: type is not tuple") + return + if len(request) < 3: + log.debug("Received invalid message: number of items in " + "response is %s", len(request)) + return + + message_type = request[0] - while data: - # Increase the byte counter - self.factory.bytes_recv += len(data) + if message_type == RPC_EVENT: + event = request[1] + #log.debug("Received RPCEvent: %s", event) + # A RPCEvent was received from the daemon so run any handlers + # associated with it. + if event in self.factory.event_handlers: + for handler in self.factory.event_handlers[event]: + reactor.callLater(0, handler, *request[2]) + return + + request_id = request[1] + + # We get the Deferred object for this request_id to either run the + # callbacks or the errbacks dependent on the response from the daemon. + d = self.factory.daemon.pop_deferred(request_id) + + if message_type == RPC_RESPONSE: + # Run the callbacks registered with this Deferred object + d.callback(request[2]) + elif message_type == RPC_ERROR: + # Recreate exception and errback'it + exception_cls = getattr(error, request[2]) + exception = exception_cls(*request[3], **request[4]) + + # Ideally we would chain the deferreds instead of instance + # checking just to log them. But, that would mean that any + # errback on the fist deferred should returns it's failure + # so it could pass back to the 2nd deferred on the chain. But, + # that does not always happen. + # So, just do some instance checking and just log rpc error at + # diferent levels. + r = self.__rpc_requests[request_id] + msg = "RPCError Message Received!" + msg += "\n" + "-" * 80 + msg += "\n" + "RPCRequest: " + r.__repr__() + msg += "\n" + "-" * 80 + if isinstance(exception, error.WrappedException): + msg += "\n" + exception.type + "\n" + exception.message + ": " + msg += exception.traceback + else: + msg += "\n" + request[5] + "\n" + request[2] + ": " + msg += str(exception) + msg += "\n" + "-" * 80 - dobj = zlib.decompressobj() - try: - request = rencode.loads(dobj.decompress(data)) - except Exception, e: - #log.debug("Received possible invalid message (%r): %s", data, e) - # This could be cut-off data, so we'll save this in the buffer - # and try to prepend it on the next dataReceived() - self.__buffer = data - return + if not isinstance(exception, error._ClientSideRecreateError): + # Let's log these as errors + log.error(msg) else: - data = dobj.unused_data - - if type(request) is not tuple: - log.debug("Received invalid message: type is not tuple") - return - if len(request) < 3: - log.debug("Received invalid message: number of items in " - "response is %s", len(3)) - return - - message_type = request[0] - - if message_type == RPC_EVENT: - event = request[1] - #log.debug("Received RPCEvent: %s", event) - # A RPCEvent was received from the daemon so run any handlers - # associated with it. - if event in self.factory.event_handlers: - for handler in self.factory.event_handlers[event]: - reactor.callLater(0, handler, *request[2]) - continue - - request_id = request[1] - - # We get the Deferred object for this request_id to either run the - # callbacks or the errbacks dependent on the response from the daemon. - d = self.factory.daemon.pop_deferred(request_id) - - if message_type == RPC_RESPONSE: - # Run the callbacks registered with this Deferred object - d.callback(request[2]) - elif message_type == RPC_ERROR: - # Recreate exception and errback'it - exception_cls = getattr(error, request[2]) - exception = exception_cls(*request[3], **request[4]) - - # Ideally we would chain the deferreds instead of instance - # checking just to log them. But, that would mean that any - # errback on the fist deferred should returns it's failure - # so it could pass back to the 2nd deferred on the chain. But, - # that does not always happen. - # So, just do some instance checking and just log rpc error at - # diferent levels. - r = self.__rpc_requests[request_id] - msg = "RPCError Message Received!" - msg += "\n" + "-" * 80 - msg += "\n" + "RPCRequest: " + r.__repr__() - msg += "\n" + "-" * 80 - if isinstance(exception, error.WrappedException): - msg += "\n" + exception.type + "\n" + exception.message + ": " - msg += exception.traceback - else: - msg += "\n" + request[5] + "\n" + request[2] + ": " - msg += str(exception) - msg += "\n" + "-" * 80 - - if not isinstance(exception, error._ClientSideRecreateError): - # Let's log these as errors - log.error(msg) - else: - # The rest just get's logged in debug level, just to log - # what's happening - log.debug(msg) - - d.errback(exception) - del self.__rpc_requests[request_id] + # The rest just get's logged in debug level, just to log + # what's happening + log.debug(msg) + + d.errback(exception) + del self.__rpc_requests[request_id] def send_request(self, request): """ @@ -220,15 +193,16 @@ class DelugeRPCProtocol(Protocol): :param request: RPCRequest """ - # Store the DelugeRPCRequest object just in case a RPCError is sent in - # response to this request. We use the extra information when printing - # out the error for debugging purposes. - self.__rpc_requests[request.request_id] = request - #log.debug("Sending RPCRequest %s: %s", request.request_id, request) - # Send the request in a tuple because multiple requests can be sent at once - data = zlib.compress(rencode.dumps((request.format_message(),))) - self.factory.bytes_sent += len(data) - self.transport.write(data) + try: + # Store the DelugeRPCRequest object just in case a RPCError is sent in + # response to this request. We use the extra information when printing + # out the error for debugging purposes. + self.__rpc_requests[request.request_id] = request + #log.debug("Sending RPCRequest %s: %s", request.request_id, request) + # Send the request in a tuple because multiple requests can be sent at once + self.transfer_message((request.format_message(),)) + except Exception, e: + log.warn("Error occured when sending message:" + str(e)) class DelugeRPCClientFactory(ClientFactory): protocol = DelugeRPCProtocol @@ -237,9 +211,6 @@ class DelugeRPCClientFactory(ClientFactory): self.daemon = daemon self.event_handlers = event_handlers - self.bytes_recv = 0 - self.bytes_sent = 0 - def startedConnecting(self, connector): log.info("Connecting to daemon at \"%s:%s\"...", connector.host, connector.port) @@ -462,10 +433,10 @@ class DaemonSSLProxy(DaemonProxy): self.disconnect_callback = cb def get_bytes_recv(self): - return self.__factory.bytes_recv + return self.protocol.get_bytes_recv() def get_bytes_sent(self): - return self.__factory.bytes_sent + return self.protocol.get_bytes_sent() class DaemonClassicProxy(DaemonProxy): def __init__(self, event_handlers=None): |