From 8e7432e71c80a47c96bf48c07535ec1991efe1d9 Mon Sep 17 00:00:00 2001 From: bendikro Date: Mon, 18 Jun 2012 20:28:32 +0200 Subject: Added a protocol for the network traffic between client and daemon. Implemented a protocol layer above twisted.internet.protocol.Protocol which guarantees correct transfer of RPC messages. The network messages are transfered with a header containing the length of the message. --- deluge/core/rpcserver.py | 62 +++----- deluge/transfer.py | 170 ++++++++++++++++++++++ deluge/ui/client.py | 191 +++++++++++------------- tests/test_transfer.py | 368 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 641 insertions(+), 150 deletions(-) create mode 100644 deluge/transfer.py create mode 100644 tests/test_transfer.py diff --git a/deluge/core/rpcserver.py b/deluge/core/rpcserver.py index b9cfba378..3ecbd39e8 100644 --- a/deluge/core/rpcserver.py +++ b/deluge/core/rpcserver.py @@ -60,6 +60,8 @@ from deluge.core.authmanager import (AUTH_LEVEL_NONE, AUTH_LEVEL_DEFAULT, from deluge.error import (DelugeError, NotAuthorizedError, WrappedException, _ClientSideRecreateError, IncompatibleClient) +from deluge.transfer import DelugeTransferProtocol + RPC_RESPONSE = 1 RPC_ERROR = 2 RPC_EVENT = 3 @@ -134,54 +136,34 @@ class ServerContextFactory(object): ctx.use_privatekey_file(os.path.join(ssl_dir, "daemon.pkey")) return ctx -class DelugeRPCProtocol(Protocol): - __buffer = None +class DelugeRPCProtocol(DelugeTransferProtocol): - def dataReceived(self, data): + def message_received(self, request): """ - This method is called whenever data is received from a client. The + This method is called whenever a message is received from a client. The only message that a client sends to the server is a RPC Request message. If the RPC Request message is valid, then the method is called in :meth:`dispatch`. - - :param data: the data from the client. It should be a zlib compressed - rencoded string. - :type data: str + + :param request: the request from the client. + :type data: tuple """ - if self.__buffer: - # We have some data from the last dataReceived() so lets prepend it - data = self.__buffer + data - self.__buffer = None - - while data: - dobj = zlib.decompressobj() - try: - request = rencode.loads(dobj.decompress(data)) - except Exception, e: - #log.debug("Received possible invalid message (%r): %s", data, e) - # This could be cut-off data, so we'll save this in the buffer - # and try to prepend it on the next dataReceived() - self.__buffer = data - return - else: - data = dobj.unused_data - - if type(request) is not tuple: - log.debug("Received invalid message: type is not tuple") - return + if type(request) is not tuple: + log.debug("Received invalid message: type is not tuple") + return - if len(request) < 1: - log.debug("Received invalid message: there are no items") - return + if len(request) < 1: + log.debug("Received invalid message: there are no items") + return - for call in request: - if len(call) != 4: - log.debug("Received invalid rpc request: number of items " - "in request is %s", len(call)) - continue - #log.debug("RPCRequest: %s", format_request(call)) - reactor.callLater(0, self.dispatch, *call) + for call in request: + if len(call) != 4: + log.debug("Received invalid rpc request: number of items " + "in request is %s", len(call)) + continue + #log.debug("RPCRequest: %s", format_request(call)) + reactor.callLater(0, self.dispatch, *call) def sendData(self, data): """ @@ -192,7 +174,7 @@ class DelugeRPCProtocol(Protocol): :type data: object """ - self.transport.write(zlib.compress(rencode.dumps(data))) + self.transfer_message(data) def connectionMade(self): """ diff --git a/deluge/transfer.py b/deluge/transfer.py new file mode 100644 index 000000000..c89f89e4f --- /dev/null +++ b/deluge/transfer.py @@ -0,0 +1,170 @@ +# -*- coding: utf-8 -*- +# +# transfer.py +# +# Copyright (C) 2012 Bro +# +# Deluge is free software. +# +# You may redistribute it and/or modify it under the terms of the +# GNU General Public License, as published by the Free Software +# Foundation; either version 3 of the License, or (at your option) +# any later version. +# +# deluge is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with deluge. If not, write to: +# The Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor +# Boston, MA 02110-1301, USA. +# +# In addition, as a special exception, the copyright holders give +# permission to link the code of portions of this program with the OpenSSL +# library. +# You must obey the GNU General Public License in all respects for all of +# the code used other than OpenSSL. If you modify file(s) with this +# exception, you may extend this exception to your version of the file(s), +# but you are not obligated to do so. If you do not wish to do so, delete +# this exception statement from your version. If you delete this exception +# statement from all source files in the program, then also delete it here. +# +# + +try: + import rencode +except ImportError: + import deluge.rencode as rencode + +import zlib +import struct + +from twisted.internet.protocol import Protocol + +from deluge.log import LOG as log + +MESSAGE_HEADER_SIZE = 5 + +class DelugeTransferProtocol(Protocol): + """ + Data messages are transfered using very a simple protocol. + Data messages are transfered with a header containing + the length of the data to be transfered (payload). + + """ + def __init__(self): + self._buffer = "" + self._message_length = 0 + self._bytes_received = 0 + self._bytes_sent = 0 + + def transfer_message(self, data): + """ + Transfer the data. + + The data will be serialized and compressed before being sent. + First a header is sent - containing the length of the compressed payload + to come as a signed integer. After the header, the payload is transfered. + + :param data: data to be transfered in a data structure serializable by rencode. + + """ + compressed = zlib.compress(rencode.dumps(data)) + size_data = len(compressed) + # Store length as a signed integer (using 4 bytes). "!" denotes network byte order. + payload_len = struct.pack("!i", size_data) + header = "D" + payload_len + self._bytes_sent += len(header) + len(compressed) + self.transport.write(header) + self.transport.write(compressed) + + def dataReceived(self, data): + """ + This method is called whenever data is received. + + :param data: a message as transfered by transfer_message, or a part of such + a messsage. + + Global variables: + _buffer - contains the data received + _message_length - the length of the payload of the current message. + + """ + self._buffer += data + self._bytes_received += len(data) + + while len(self._buffer) >= MESSAGE_HEADER_SIZE: + if self._message_length == 0: + self._handle_new_message() + # We have a complete packet + if len(self._buffer) >= self._message_length: + self._handle_complete_message(self._buffer[:self._message_length]) + # Remove message data from buffer + self._buffer = self._buffer[self._message_length:] + self._message_length = 0 + else: + break + + def _handle_new_message(self): + """ + Handle the start of a new message. This method is called only when the + beginning of the buffer contains data from a new message (i.e. the header). + + """ + try: + # Read the first bytes of the message (MESSAGE_HEADER_SIZE bytes) + header = self._buffer[:MESSAGE_HEADER_SIZE] + payload_len = header[1:MESSAGE_HEADER_SIZE] + if header[0] != 'D': + raise Exception("Invalid header format. First byte is %d" % ord(header[0])) + # Extract the length stored as a signed integer (using 4 bytes) + self._message_length = struct.unpack("!i", payload_len)[0] + if self._message_length < 0: + raise Exception("Message length is negative: %d" % self._message_length) + # Remove the header from the buffer + self._buffer = self._buffer[MESSAGE_HEADER_SIZE:] + except Exception, e: + log.warn("Error occured when parsing message header: %s." % str(e)) + log.warn("This version of Deluge cannot communicate with the sender of this data.") + self._message_length = 0 + self._buffer = "" + + def _handle_complete_message(self, data): + """ + Handles a complete message as it is transfered on the network. + + :param data: a zlib compressed string encoded with rencode. + + """ + try: + self.message_received(rencode.loads(zlib.decompress(data))) + except Exception, e: + log.warn("Failed to decompress (%d bytes) and load serialized data "\ + "with rencode: %s" % (len(data), str(e))) + + def get_bytes_recv(self): + """ + Returns the number of bytes received. + + :returns: the number of bytes received + :rtype: int + + """ + return self._bytes_received + + def get_bytes_sent(self): + """ + Returns the number of bytes sent. + + :returns: the number of bytes sent + :rtype: int + + """ + return self._bytes_sent + + def message_received(self, message): + """Override this method to receive the complete message""" + pass diff --git a/deluge/ui/client.py b/deluge/ui/client.py index 8be7aaed5..479b7367d 100644 --- a/deluge/ui/client.py +++ b/deluge/ui/client.py @@ -35,18 +35,13 @@ # import logging -from twisted.internet.protocol import Protocol, ClientFactory +from twisted.internet.protocol import ClientFactory from twisted.internet import reactor, ssl, defer -try: - import rencode -except ImportError: - import deluge.rencode as rencode - -import zlib import deluge.common from deluge import error from deluge.event import known_events +from deluge.transfer import DelugeTransferProtocol if deluge.common.windows_check(): import win32api @@ -104,11 +99,11 @@ class DelugeRPCRequest(object): return (self.request_id, self.method, self.args, self.kwargs) -class DelugeRPCProtocol(Protocol): + +class DelugeRPCProtocol(DelugeTransferProtocol): def connectionMade(self): self.__rpc_requests = {} - self.__buffer = None # Set the protocol in the daemon so it can send data self.factory.daemon.protocol = self # Get the address of the daemon that we've connected to @@ -116,102 +111,80 @@ class DelugeRPCProtocol(Protocol): self.factory.daemon.host = peer.host self.factory.daemon.port = peer.port self.factory.daemon.connected = True - log.info("Connected to daemon at %s:%s..", peer.host, peer.port) self.factory.daemon.connect_deferred.callback((peer.host, peer.port)) - def dataReceived(self, data): + def message_received(self, request): """ - This method is called whenever we receive data from the daemon. + This method is called whenever we receive a message from the daemon. + + :param request: a tuple that should be either a RPCResponse, RCPError or RPCSignal - :param data: a zlib compressed and rencoded string that should be either - a RPCResponse, RCPError or RPCSignal """ - if self.__buffer: - # We have some data from the last dataReceived() so lets prepend it - data = self.__buffer + data - self.__buffer = None + if type(request) is not tuple: + log.debug("Received invalid message: type is not tuple") + return + if len(request) < 3: + log.debug("Received invalid message: number of items in " + "response is %s", len(request)) + return + + message_type = request[0] - while data: - # Increase the byte counter - self.factory.bytes_recv += len(data) + if message_type == RPC_EVENT: + event = request[1] + #log.debug("Received RPCEvent: %s", event) + # A RPCEvent was received from the daemon so run any handlers + # associated with it. + if event in self.factory.event_handlers: + for handler in self.factory.event_handlers[event]: + reactor.callLater(0, handler, *request[2]) + return + + request_id = request[1] + + # We get the Deferred object for this request_id to either run the + # callbacks or the errbacks dependent on the response from the daemon. + d = self.factory.daemon.pop_deferred(request_id) + + if message_type == RPC_RESPONSE: + # Run the callbacks registered with this Deferred object + d.callback(request[2]) + elif message_type == RPC_ERROR: + # Recreate exception and errback'it + exception_cls = getattr(error, request[2]) + exception = exception_cls(*request[3], **request[4]) + + # Ideally we would chain the deferreds instead of instance + # checking just to log them. But, that would mean that any + # errback on the fist deferred should returns it's failure + # so it could pass back to the 2nd deferred on the chain. But, + # that does not always happen. + # So, just do some instance checking and just log rpc error at + # diferent levels. + r = self.__rpc_requests[request_id] + msg = "RPCError Message Received!" + msg += "\n" + "-" * 80 + msg += "\n" + "RPCRequest: " + r.__repr__() + msg += "\n" + "-" * 80 + if isinstance(exception, error.WrappedException): + msg += "\n" + exception.type + "\n" + exception.message + ": " + msg += exception.traceback + else: + msg += "\n" + request[5] + "\n" + request[2] + ": " + msg += str(exception) + msg += "\n" + "-" * 80 - dobj = zlib.decompressobj() - try: - request = rencode.loads(dobj.decompress(data)) - except Exception, e: - #log.debug("Received possible invalid message (%r): %s", data, e) - # This could be cut-off data, so we'll save this in the buffer - # and try to prepend it on the next dataReceived() - self.__buffer = data - return + if not isinstance(exception, error._ClientSideRecreateError): + # Let's log these as errors + log.error(msg) else: - data = dobj.unused_data - - if type(request) is not tuple: - log.debug("Received invalid message: type is not tuple") - return - if len(request) < 3: - log.debug("Received invalid message: number of items in " - "response is %s", len(3)) - return - - message_type = request[0] - - if message_type == RPC_EVENT: - event = request[1] - #log.debug("Received RPCEvent: %s", event) - # A RPCEvent was received from the daemon so run any handlers - # associated with it. - if event in self.factory.event_handlers: - for handler in self.factory.event_handlers[event]: - reactor.callLater(0, handler, *request[2]) - continue - - request_id = request[1] - - # We get the Deferred object for this request_id to either run the - # callbacks or the errbacks dependent on the response from the daemon. - d = self.factory.daemon.pop_deferred(request_id) - - if message_type == RPC_RESPONSE: - # Run the callbacks registered with this Deferred object - d.callback(request[2]) - elif message_type == RPC_ERROR: - # Recreate exception and errback'it - exception_cls = getattr(error, request[2]) - exception = exception_cls(*request[3], **request[4]) - - # Ideally we would chain the deferreds instead of instance - # checking just to log them. But, that would mean that any - # errback on the fist deferred should returns it's failure - # so it could pass back to the 2nd deferred on the chain. But, - # that does not always happen. - # So, just do some instance checking and just log rpc error at - # diferent levels. - r = self.__rpc_requests[request_id] - msg = "RPCError Message Received!" - msg += "\n" + "-" * 80 - msg += "\n" + "RPCRequest: " + r.__repr__() - msg += "\n" + "-" * 80 - if isinstance(exception, error.WrappedException): - msg += "\n" + exception.type + "\n" + exception.message + ": " - msg += exception.traceback - else: - msg += "\n" + request[5] + "\n" + request[2] + ": " - msg += str(exception) - msg += "\n" + "-" * 80 - - if not isinstance(exception, error._ClientSideRecreateError): - # Let's log these as errors - log.error(msg) - else: - # The rest just get's logged in debug level, just to log - # what's happening - log.debug(msg) - - d.errback(exception) - del self.__rpc_requests[request_id] + # The rest just get's logged in debug level, just to log + # what's happening + log.debug(msg) + + d.errback(exception) + del self.__rpc_requests[request_id] def send_request(self, request): """ @@ -220,15 +193,16 @@ class DelugeRPCProtocol(Protocol): :param request: RPCRequest """ - # Store the DelugeRPCRequest object just in case a RPCError is sent in - # response to this request. We use the extra information when printing - # out the error for debugging purposes. - self.__rpc_requests[request.request_id] = request - #log.debug("Sending RPCRequest %s: %s", request.request_id, request) - # Send the request in a tuple because multiple requests can be sent at once - data = zlib.compress(rencode.dumps((request.format_message(),))) - self.factory.bytes_sent += len(data) - self.transport.write(data) + try: + # Store the DelugeRPCRequest object just in case a RPCError is sent in + # response to this request. We use the extra information when printing + # out the error for debugging purposes. + self.__rpc_requests[request.request_id] = request + #log.debug("Sending RPCRequest %s: %s", request.request_id, request) + # Send the request in a tuple because multiple requests can be sent at once + self.transfer_message((request.format_message(),)) + except Exception, e: + log.warn("Error occured when sending message:" + str(e)) class DelugeRPCClientFactory(ClientFactory): protocol = DelugeRPCProtocol @@ -237,9 +211,6 @@ class DelugeRPCClientFactory(ClientFactory): self.daemon = daemon self.event_handlers = event_handlers - self.bytes_recv = 0 - self.bytes_sent = 0 - def startedConnecting(self, connector): log.info("Connecting to daemon at \"%s:%s\"...", connector.host, connector.port) @@ -462,10 +433,10 @@ class DaemonSSLProxy(DaemonProxy): self.disconnect_callback = cb def get_bytes_recv(self): - return self.__factory.bytes_recv + return self.protocol.get_bytes_recv() def get_bytes_sent(self): - return self.__factory.bytes_sent + return self.protocol.get_bytes_sent() class DaemonClassicProxy(DaemonProxy): def __init__(self, event_handlers=None): diff --git a/tests/test_transfer.py b/tests/test_transfer.py new file mode 100644 index 000000000..cd9a271d5 --- /dev/null +++ b/tests/test_transfer.py @@ -0,0 +1,368 @@ +# -*- coding: utf-8 -*- +# +# test_transfer.py +# +# Copyright (C) 2012 Bro +# +# Deluge is free software. +# +# You may redistribute it and/or modify it under the terms of the +# GNU General Public License, as published by the Free Software +# Foundation; either version 3 of the License, or (at your option) +# any later version. +# +# deluge is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. +# See the GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with deluge. If not, write to: +# The Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor +# Boston, MA 02110-1301, USA. +# +# In addition, as a special exception, the copyright holders give +# permission to link the code of portions of this program with the OpenSSL +# library. +# You must obey the GNU General Public License in all respects for all of +# the code used other than OpenSSL. If you modify file(s) with this +# exception, you may extend this exception to your version of the file(s), +# but you are not obligated to do so. If you do not wish to do so, delete +# this exception statement from your version. If you delete this exception +# statement from all source files in the program, then also delete it here. +# + +from twisted.trial import unittest + +from deluge.transfer import DelugeTransferProtocol + +import base64 + +import deluge.rencode as rencode + +class TransferTestClass(DelugeTransferProtocol): + + def __init__(self): + DelugeTransferProtocol.__init__(self) + self.transport = self + self.messages_out = [] + self.messages_in = [] + self.packet_count = 0 + + def write(self, message): + """ + Called by DelugeTransferProtocol class + This simulates the write method of the self.transport in DelugeTransferProtocol. + """ + self.messages_out.append(message) + + def message_received(self, message): + """ + This method overrides message_received is DelugeTransferProtocol and is + called with the complete message as it was sent by DelugeRPCProtocol + """ + self.messages_in.append(message) + + def get_messages_out_joined(self): + return b"".join(self.messages_out) + + def get_messages_in(self): + return self.messages_in + + def dataReceived_old_protocol(self, data): + """ + This is the original method logic (as close as possible) for handling data receival on the client + + :param data: a zlib compressed string encoded with rencode. + + """ + from datetime import timedelta + import zlib + print "\n=== New Data Received ===\nBytes received:", len(data) + + if self._buffer: + # We have some data from the last dataReceived() so lets prepend it + print "Current buffer:", len(self._buffer) if self._buffer else "0" + data = self._buffer + data + self._buffer = None + + self.packet_count += 1 + self._bytes_received += len(data) + + while data: + print "\n-- Handle packet data --" + + print "Bytes received:", self._bytes_received + print "Current data:", len(data) + + if self._message_length == 0: + # handle_new_message uses _buffer so set data to _buffer. + self._buffer = data + self._handle_new_message() + data = self._buffer + self._buffer = None + self.packet_count = 1 + print "New message of length:", self._message_length + + dobj = zlib.decompressobj() + try: + request = rencode.loads(dobj.decompress(data)) + print "Successfully loaded message", + print " - Buffer length: %d, data length: %d, unused length: %d" % (len(data), \ + len(data) - len(dobj.unused_data), len(dobj.unused_data)) + print "Packet count:", self.packet_count + except Exception, e: + #log.debug("Received possible invalid message (%r): %s", data, e) + # This could be cut-off data, so we'll save this in the buffer + # and try to prepend it on the next dataReceived() + self._buffer = data + print "Failed to load buffer (size %d): %s" % (len(self._buffer), str(e)) + return + else: + data = dobj.unused_data + self._message_length = 0 + + self.message_received(request) + +class DelugeTransferProtocolTestCase(unittest.TestCase): + + def setUp(self): + """ + The expected messages corresponds to the test messages (msg1, msg2) after they've been processed + by DelugeTransferProtocol.send, which means that they've first been encoded with pickle, + and then compressed with zlib. + The expected messages are encoded in base64 to easily including it here in the source. + So before comparing the results with the expected messages, the expected messages must be decoded, + or the result message be encoded in base64. + + """ + self.transfer = TransferTestClass() + self.msg1 = (0, 1, {"key_int": 1242429423}, {"key_str": "some string"}, {"key_bool": True}) + self.msg2 = (2, 3, {"key_float": 12424.29423}, + {"key_unicode": u"some string"}, + {"key_dict_with_tuple": {"key_tuple": (1, 2, 3)}}, + {"keylist": [4, "5", 6.7]}) + + self.msg1_expected_compressed_base64 = "RAAAADF4nDvKwJjenp1aGZ+ZV+Lgxfv9PYRXXFLU"\ + "XZyfm6oAZGTmpad3gAST8vNznAEAJhSQ" + + self.msg2_expected_compressed_base64 = "RAAAAF14nDvGxJzemZ1aGZ+Wk59Y4uTmpKib3g3i"\ + "l+ZlJuenpHYX5+emKhSXFGXmpadPBkmkZCaXxJdn"\ + "lmTEl5QW5KRCdIOZhxmBhrUDuTmZxSWHWRpNnRyu"\ + "paUBAHYlJxI=" + + def test_send_one_message(self): + """ + Send one message and test that it has been sent correctoly to the + method 'write' in self.transport. + + """ + self.transfer.transfer_message(self.msg1) + # Get the data as sent by DelugeTransferProtocol + messages = self.transfer.get_messages_out_joined() + base64_encoded = base64.b64encode(messages) + self.assertEquals(base64_encoded, self.msg1_expected_compressed_base64) + + def test_receive_one_message(self): + """ + Receive one message and test that it has been sent to the + method 'message_received'. + + """ + self.transfer.dataReceived(base64.b64decode(self.msg1_expected_compressed_base64)) + # Get the data as sent by DelugeTransferProtocol + messages = self.transfer.get_messages_in().pop(0) + self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(messages)) + + def test_receive_old_message(self): + """ + Receive an old message (with no header) and verify that the data is discarded. + + """ + self.transfer.dataReceived(rencode.dumps(self.msg1)) + self.assertEquals(len(self.transfer.get_messages_in()), 0) + self.assertEquals(self.transfer._message_length, 0) + self.assertEquals(len(self.transfer._buffer), 0) + + def test_receive_two_concatenated_messages(self): + """ + This test simply concatenates two messsages (as they're sent over the network), + and lets DelugeTransferProtocol receive the data as one string. + + """ + two_concatenated = base64.b64decode(self.msg1_expected_compressed_base64) + base64.b64decode(self.msg2_expected_compressed_base64) + self.transfer.dataReceived(two_concatenated) + + # Get the data as sent by DelugeTransferProtocol + message1 = self.transfer.get_messages_in().pop(0) + self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message1)) + message2 = self.transfer.get_messages_in().pop(0) + self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2)) + + def test_receive_three_messages_in_parts(self): + """ + This test concatenates three messsages (as they're sent over the network), + and lets DelugeTransferProtocol receive the data in multiple parts. + + """ + msg_bytes = base64.b64decode(self.msg1_expected_compressed_base64) + \ + base64.b64decode(self.msg2_expected_compressed_base64) + \ + base64.b64decode(self.msg1_expected_compressed_base64) + packet_size = 40 + + one_message_byte_count = len(base64.b64decode(self.msg1_expected_compressed_base64)) + two_messages_byte_count = one_message_byte_count + len(base64.b64decode(self.msg2_expected_compressed_base64)) + three_messages_byte_count = two_messages_byte_count + len(base64.b64decode(self.msg1_expected_compressed_base64)) + + for d in self.receive_parts_helper(msg_bytes, packet_size): + bytes_received = self.transfer.get_bytes_recv() + + if bytes_received >= three_messages_byte_count: + expected_msgs_received_count = 3 + elif bytes_received >= two_messages_byte_count: + expected_msgs_received_count = 2 + elif bytes_received >= one_message_byte_count: + expected_msgs_received_count = 1 + else: + expected_msgs_received_count = 0 + # Verify that the expected number of complete messages has arrived + self.assertEquals(expected_msgs_received_count, len(self.transfer.get_messages_in())) + + # Get the data as received by DelugeTransferProtocol + message1 = self.transfer.get_messages_in().pop(0) + self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message1)) + message2 = self.transfer.get_messages_in().pop(0) + self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2)) + message3 = self.transfer.get_messages_in().pop(0) + self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message3)) + + + # Remove underscore to enable test, or run the test directly: + # tests $ trial test_transfer.DelugeTransferProtocolTestCase._test_rencode_fail_protocol + def _test_rencode_fail_protocol(self): + """ + This test tries to test the protocol that relies on errors from rencode. + + """ + msg_bytes = base64.b64decode(self.msg1_expected_compressed_base64) + \ + base64.b64decode(self.msg2_expected_compressed_base64) + \ + base64.b64decode(self.msg1_expected_compressed_base64) + packet_size = 149 + + one_message_byte_count = len(base64.b64decode(self.msg1_expected_compressed_base64)) + two_messages_byte_count = one_message_byte_count + len(base64.b64decode(self.msg2_expected_compressed_base64)) + three_messages_byte_count = two_messages_byte_count + len(base64.b64decode(self.msg1_expected_compressed_base64)) + + print + + print "Msg1 size:", len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4 + print "Msg2 size:", len(base64.b64decode(self.msg2_expected_compressed_base64)) - 4 + print "Msg3 size:", len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4 + + print "one_message_byte_count:", one_message_byte_count + print "two_messages_byte_count:", two_messages_byte_count + print "three_messages_byte_count:", three_messages_byte_count + + for d in self.receive_parts_helper(msg_bytes, packet_size, self.transfer.dataReceived_old_protocol): + bytes_received = self.transfer.get_bytes_recv() + + if bytes_received >= three_messages_byte_count: + expected_msgs_received_count = 3 + elif bytes_received >= two_messages_byte_count: + expected_msgs_received_count = 2 + elif bytes_received >= one_message_byte_count: + expected_msgs_received_count = 1 + else: + expected_msgs_received_count = 0 + # Verify that the expected number of complete messages has arrived + if expected_msgs_received_count != len(self.transfer.get_messages_in()): + print "Expected number of messages received is %d, but %d have been received."\ + % (expected_msgs_received_count, len(self.transfer.get_messages_in())) + + # Get the data as received by DelugeTransferProtocol + message1 = self.transfer.get_messages_in().pop(0) + self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message1)) + message2 = self.transfer.get_messages_in().pop(0) + self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2)) + message3 = self.transfer.get_messages_in().pop(0) + self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message3)) + + + def test_receive_middle_of_header(self): + """ + This test concatenates two messsages (as they're sent over the network), + and lets DelugeTransferProtocol receive the data in two parts. + The first part contains the first message, plus two bytes of the next message. + The next part contains the rest of the message. + + This is a special case, as DelugeTransferProtocol can't start parsing + a message until it has at least 4 bytes (the size of the header) to be able + to read and parse the size of the payload. + + """ + two_concatenated = base64.b64decode(self.msg1_expected_compressed_base64) + base64.b64decode(self.msg2_expected_compressed_base64) + first_len = len(base64.b64decode(self.msg1_expected_compressed_base64)) + + # Now found the entire first message, and half the header of the next message (2 bytes into the header) + self.transfer.dataReceived(two_concatenated[:first_len+2]) + + # Should be 1 message in the list + self.assertEquals(1, len(self.transfer.get_messages_in())) + + # Send the rest + self.transfer.dataReceived(two_concatenated[first_len+2:]) + + # Should be 2 messages in the list + self.assertEquals(2, len(self.transfer.get_messages_in())) + + # Get the data as sent by DelugeTransferProtocol + message1 = self.transfer.get_messages_in().pop(0) + self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message1)) + message2 = self.transfer.get_messages_in().pop(0) + self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2)) + + + # Needs file containing big data structure e.g. like thetorrent list as it is transfered by the daemon + #def test_simulate_big_transfer(self): + # filename = "../deluge.torrentlist" + # + # f = open(filename, "r") + # data = f.read() + # message_to_send = eval(data) + # self.transfer.transfer_message(message_to_send) + # + # # Get the data as sent to the network by DelugeTransferProtocol + # compressed_data = self.transfer.get_messages_out_joined() + # packet_size = 16000 # Or something smaller... + # + # for d in self.receive_parts_helper(compressed_data, packet_size): + # bytes_recv = self.transfer.get_bytes_recv() + # if bytes_recv < len(compressed_data): + # self.assertEquals(len(self.transfer.get_messages_in()), 0) + # else: + # self.assertEquals(len(self.transfer.get_messages_in()), 1) + # # Get the data as received by DelugeTransferProtocol + # transfered_message = self.transfer.get_messages_in().pop(0) + # # Test that the data structures are equal + # #self.assertEquals(transfered_message, message_to_send) + # #self.assertTrue(transfered_message == message_to_send) + # + # #f.close() + # #f = open("rencode.torrentlist", "w") + # #f.write(str(transfered_message)) + # #f.close() + + def receive_parts_helper(self, data, packet_size, receive_func=None): + byte_count = len(data) + sent_bytes = 0 + while byte_count > 0: + to_receive = packet_size if byte_count > packet_size else byte_count + sent_bytes += to_receive + byte_count -= to_receive + if receive_func: + receive_func(data[:to_receive]) + else: + self.transfer.dataReceived(data[:to_receive]) + data = data[to_receive:] + yield -- cgit