summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--deluge/core/rpcserver.py62
-rw-r--r--deluge/transfer.py170
-rw-r--r--deluge/ui/client.py191
-rw-r--r--tests/test_transfer.py368
4 files changed, 641 insertions, 150 deletions
diff --git a/deluge/core/rpcserver.py b/deluge/core/rpcserver.py
index b9cfba378..3ecbd39e8 100644
--- a/deluge/core/rpcserver.py
+++ b/deluge/core/rpcserver.py
@@ -60,6 +60,8 @@ from deluge.core.authmanager import (AUTH_LEVEL_NONE, AUTH_LEVEL_DEFAULT,
from deluge.error import (DelugeError, NotAuthorizedError, WrappedException,
_ClientSideRecreateError, IncompatibleClient)
+from deluge.transfer import DelugeTransferProtocol
+
RPC_RESPONSE = 1
RPC_ERROR = 2
RPC_EVENT = 3
@@ -134,54 +136,34 @@ class ServerContextFactory(object):
ctx.use_privatekey_file(os.path.join(ssl_dir, "daemon.pkey"))
return ctx
-class DelugeRPCProtocol(Protocol):
- __buffer = None
+class DelugeRPCProtocol(DelugeTransferProtocol):
- def dataReceived(self, data):
+ def message_received(self, request):
"""
- This method is called whenever data is received from a client. The
+ This method is called whenever a message is received from a client. The
only message that a client sends to the server is a RPC Request message.
If the RPC Request message is valid, then the method is called in
:meth:`dispatch`.
-
- :param data: the data from the client. It should be a zlib compressed
- rencoded string.
- :type data: str
+
+ :param request: the request from the client.
+ :type data: tuple
"""
- if self.__buffer:
- # We have some data from the last dataReceived() so lets prepend it
- data = self.__buffer + data
- self.__buffer = None
-
- while data:
- dobj = zlib.decompressobj()
- try:
- request = rencode.loads(dobj.decompress(data))
- except Exception, e:
- #log.debug("Received possible invalid message (%r): %s", data, e)
- # This could be cut-off data, so we'll save this in the buffer
- # and try to prepend it on the next dataReceived()
- self.__buffer = data
- return
- else:
- data = dobj.unused_data
-
- if type(request) is not tuple:
- log.debug("Received invalid message: type is not tuple")
- return
+ if type(request) is not tuple:
+ log.debug("Received invalid message: type is not tuple")
+ return
- if len(request) < 1:
- log.debug("Received invalid message: there are no items")
- return
+ if len(request) < 1:
+ log.debug("Received invalid message: there are no items")
+ return
- for call in request:
- if len(call) != 4:
- log.debug("Received invalid rpc request: number of items "
- "in request is %s", len(call))
- continue
- #log.debug("RPCRequest: %s", format_request(call))
- reactor.callLater(0, self.dispatch, *call)
+ for call in request:
+ if len(call) != 4:
+ log.debug("Received invalid rpc request: number of items "
+ "in request is %s", len(call))
+ continue
+ #log.debug("RPCRequest: %s", format_request(call))
+ reactor.callLater(0, self.dispatch, *call)
def sendData(self, data):
"""
@@ -192,7 +174,7 @@ class DelugeRPCProtocol(Protocol):
:type data: object
"""
- self.transport.write(zlib.compress(rencode.dumps(data)))
+ self.transfer_message(data)
def connectionMade(self):
"""
diff --git a/deluge/transfer.py b/deluge/transfer.py
new file mode 100644
index 000000000..c89f89e4f
--- /dev/null
+++ b/deluge/transfer.py
@@ -0,0 +1,170 @@
+# -*- coding: utf-8 -*-
+#
+# transfer.py
+#
+# Copyright (C) 2012 Bro <bro.development@gmail.com>
+#
+# Deluge is free software.
+#
+# You may redistribute it and/or modify it under the terms of the
+# GNU General Public License, as published by the Free Software
+# Foundation; either version 3 of the License, or (at your option)
+# any later version.
+#
+# deluge is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+# See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with deluge. If not, write to:
+# The Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor
+# Boston, MA 02110-1301, USA.
+#
+# In addition, as a special exception, the copyright holders give
+# permission to link the code of portions of this program with the OpenSSL
+# library.
+# You must obey the GNU General Public License in all respects for all of
+# the code used other than OpenSSL. If you modify file(s) with this
+# exception, you may extend this exception to your version of the file(s),
+# but you are not obligated to do so. If you do not wish to do so, delete
+# this exception statement from your version. If you delete this exception
+# statement from all source files in the program, then also delete it here.
+#
+#
+
+try:
+ import rencode
+except ImportError:
+ import deluge.rencode as rencode
+
+import zlib
+import struct
+
+from twisted.internet.protocol import Protocol
+
+from deluge.log import LOG as log
+
+MESSAGE_HEADER_SIZE = 5
+
+class DelugeTransferProtocol(Protocol):
+ """
+ Data messages are transfered using very a simple protocol.
+ Data messages are transfered with a header containing
+ the length of the data to be transfered (payload).
+
+ """
+ def __init__(self):
+ self._buffer = ""
+ self._message_length = 0
+ self._bytes_received = 0
+ self._bytes_sent = 0
+
+ def transfer_message(self, data):
+ """
+ Transfer the data.
+
+ The data will be serialized and compressed before being sent.
+ First a header is sent - containing the length of the compressed payload
+ to come as a signed integer. After the header, the payload is transfered.
+
+ :param data: data to be transfered in a data structure serializable by rencode.
+
+ """
+ compressed = zlib.compress(rencode.dumps(data))
+ size_data = len(compressed)
+ # Store length as a signed integer (using 4 bytes). "!" denotes network byte order.
+ payload_len = struct.pack("!i", size_data)
+ header = "D" + payload_len
+ self._bytes_sent += len(header) + len(compressed)
+ self.transport.write(header)
+ self.transport.write(compressed)
+
+ def dataReceived(self, data):
+ """
+ This method is called whenever data is received.
+
+ :param data: a message as transfered by transfer_message, or a part of such
+ a messsage.
+
+ Global variables:
+ _buffer - contains the data received
+ _message_length - the length of the payload of the current message.
+
+ """
+ self._buffer += data
+ self._bytes_received += len(data)
+
+ while len(self._buffer) >= MESSAGE_HEADER_SIZE:
+ if self._message_length == 0:
+ self._handle_new_message()
+ # We have a complete packet
+ if len(self._buffer) >= self._message_length:
+ self._handle_complete_message(self._buffer[:self._message_length])
+ # Remove message data from buffer
+ self._buffer = self._buffer[self._message_length:]
+ self._message_length = 0
+ else:
+ break
+
+ def _handle_new_message(self):
+ """
+ Handle the start of a new message. This method is called only when the
+ beginning of the buffer contains data from a new message (i.e. the header).
+
+ """
+ try:
+ # Read the first bytes of the message (MESSAGE_HEADER_SIZE bytes)
+ header = self._buffer[:MESSAGE_HEADER_SIZE]
+ payload_len = header[1:MESSAGE_HEADER_SIZE]
+ if header[0] != 'D':
+ raise Exception("Invalid header format. First byte is %d" % ord(header[0]))
+ # Extract the length stored as a signed integer (using 4 bytes)
+ self._message_length = struct.unpack("!i", payload_len)[0]
+ if self._message_length < 0:
+ raise Exception("Message length is negative: %d" % self._message_length)
+ # Remove the header from the buffer
+ self._buffer = self._buffer[MESSAGE_HEADER_SIZE:]
+ except Exception, e:
+ log.warn("Error occured when parsing message header: %s." % str(e))
+ log.warn("This version of Deluge cannot communicate with the sender of this data.")
+ self._message_length = 0
+ self._buffer = ""
+
+ def _handle_complete_message(self, data):
+ """
+ Handles a complete message as it is transfered on the network.
+
+ :param data: a zlib compressed string encoded with rencode.
+
+ """
+ try:
+ self.message_received(rencode.loads(zlib.decompress(data)))
+ except Exception, e:
+ log.warn("Failed to decompress (%d bytes) and load serialized data "\
+ "with rencode: %s" % (len(data), str(e)))
+
+ def get_bytes_recv(self):
+ """
+ Returns the number of bytes received.
+
+ :returns: the number of bytes received
+ :rtype: int
+
+ """
+ return self._bytes_received
+
+ def get_bytes_sent(self):
+ """
+ Returns the number of bytes sent.
+
+ :returns: the number of bytes sent
+ :rtype: int
+
+ """
+ return self._bytes_sent
+
+ def message_received(self, message):
+ """Override this method to receive the complete message"""
+ pass
diff --git a/deluge/ui/client.py b/deluge/ui/client.py
index 8be7aaed5..479b7367d 100644
--- a/deluge/ui/client.py
+++ b/deluge/ui/client.py
@@ -35,18 +35,13 @@
#
import logging
-from twisted.internet.protocol import Protocol, ClientFactory
+from twisted.internet.protocol import ClientFactory
from twisted.internet import reactor, ssl, defer
-try:
- import rencode
-except ImportError:
- import deluge.rencode as rencode
-
-import zlib
import deluge.common
from deluge import error
from deluge.event import known_events
+from deluge.transfer import DelugeTransferProtocol
if deluge.common.windows_check():
import win32api
@@ -104,11 +99,11 @@ class DelugeRPCRequest(object):
return (self.request_id, self.method, self.args, self.kwargs)
-class DelugeRPCProtocol(Protocol):
+
+class DelugeRPCProtocol(DelugeTransferProtocol):
def connectionMade(self):
self.__rpc_requests = {}
- self.__buffer = None
# Set the protocol in the daemon so it can send data
self.factory.daemon.protocol = self
# Get the address of the daemon that we've connected to
@@ -116,102 +111,80 @@ class DelugeRPCProtocol(Protocol):
self.factory.daemon.host = peer.host
self.factory.daemon.port = peer.port
self.factory.daemon.connected = True
-
log.info("Connected to daemon at %s:%s..", peer.host, peer.port)
self.factory.daemon.connect_deferred.callback((peer.host, peer.port))
- def dataReceived(self, data):
+ def message_received(self, request):
"""
- This method is called whenever we receive data from the daemon.
+ This method is called whenever we receive a message from the daemon.
+
+ :param request: a tuple that should be either a RPCResponse, RCPError or RPCSignal
- :param data: a zlib compressed and rencoded string that should be either
- a RPCResponse, RCPError or RPCSignal
"""
- if self.__buffer:
- # We have some data from the last dataReceived() so lets prepend it
- data = self.__buffer + data
- self.__buffer = None
+ if type(request) is not tuple:
+ log.debug("Received invalid message: type is not tuple")
+ return
+ if len(request) < 3:
+ log.debug("Received invalid message: number of items in "
+ "response is %s", len(request))
+ return
+
+ message_type = request[0]
- while data:
- # Increase the byte counter
- self.factory.bytes_recv += len(data)
+ if message_type == RPC_EVENT:
+ event = request[1]
+ #log.debug("Received RPCEvent: %s", event)
+ # A RPCEvent was received from the daemon so run any handlers
+ # associated with it.
+ if event in self.factory.event_handlers:
+ for handler in self.factory.event_handlers[event]:
+ reactor.callLater(0, handler, *request[2])
+ return
+
+ request_id = request[1]
+
+ # We get the Deferred object for this request_id to either run the
+ # callbacks or the errbacks dependent on the response from the daemon.
+ d = self.factory.daemon.pop_deferred(request_id)
+
+ if message_type == RPC_RESPONSE:
+ # Run the callbacks registered with this Deferred object
+ d.callback(request[2])
+ elif message_type == RPC_ERROR:
+ # Recreate exception and errback'it
+ exception_cls = getattr(error, request[2])
+ exception = exception_cls(*request[3], **request[4])
+
+ # Ideally we would chain the deferreds instead of instance
+ # checking just to log them. But, that would mean that any
+ # errback on the fist deferred should returns it's failure
+ # so it could pass back to the 2nd deferred on the chain. But,
+ # that does not always happen.
+ # So, just do some instance checking and just log rpc error at
+ # diferent levels.
+ r = self.__rpc_requests[request_id]
+ msg = "RPCError Message Received!"
+ msg += "\n" + "-" * 80
+ msg += "\n" + "RPCRequest: " + r.__repr__()
+ msg += "\n" + "-" * 80
+ if isinstance(exception, error.WrappedException):
+ msg += "\n" + exception.type + "\n" + exception.message + ": "
+ msg += exception.traceback
+ else:
+ msg += "\n" + request[5] + "\n" + request[2] + ": "
+ msg += str(exception)
+ msg += "\n" + "-" * 80
- dobj = zlib.decompressobj()
- try:
- request = rencode.loads(dobj.decompress(data))
- except Exception, e:
- #log.debug("Received possible invalid message (%r): %s", data, e)
- # This could be cut-off data, so we'll save this in the buffer
- # and try to prepend it on the next dataReceived()
- self.__buffer = data
- return
+ if not isinstance(exception, error._ClientSideRecreateError):
+ # Let's log these as errors
+ log.error(msg)
else:
- data = dobj.unused_data
-
- if type(request) is not tuple:
- log.debug("Received invalid message: type is not tuple")
- return
- if len(request) < 3:
- log.debug("Received invalid message: number of items in "
- "response is %s", len(3))
- return
-
- message_type = request[0]
-
- if message_type == RPC_EVENT:
- event = request[1]
- #log.debug("Received RPCEvent: %s", event)
- # A RPCEvent was received from the daemon so run any handlers
- # associated with it.
- if event in self.factory.event_handlers:
- for handler in self.factory.event_handlers[event]:
- reactor.callLater(0, handler, *request[2])
- continue
-
- request_id = request[1]
-
- # We get the Deferred object for this request_id to either run the
- # callbacks or the errbacks dependent on the response from the daemon.
- d = self.factory.daemon.pop_deferred(request_id)
-
- if message_type == RPC_RESPONSE:
- # Run the callbacks registered with this Deferred object
- d.callback(request[2])
- elif message_type == RPC_ERROR:
- # Recreate exception and errback'it
- exception_cls = getattr(error, request[2])
- exception = exception_cls(*request[3], **request[4])
-
- # Ideally we would chain the deferreds instead of instance
- # checking just to log them. But, that would mean that any
- # errback on the fist deferred should returns it's failure
- # so it could pass back to the 2nd deferred on the chain. But,
- # that does not always happen.
- # So, just do some instance checking and just log rpc error at
- # diferent levels.
- r = self.__rpc_requests[request_id]
- msg = "RPCError Message Received!"
- msg += "\n" + "-" * 80
- msg += "\n" + "RPCRequest: " + r.__repr__()
- msg += "\n" + "-" * 80
- if isinstance(exception, error.WrappedException):
- msg += "\n" + exception.type + "\n" + exception.message + ": "
- msg += exception.traceback
- else:
- msg += "\n" + request[5] + "\n" + request[2] + ": "
- msg += str(exception)
- msg += "\n" + "-" * 80
-
- if not isinstance(exception, error._ClientSideRecreateError):
- # Let's log these as errors
- log.error(msg)
- else:
- # The rest just get's logged in debug level, just to log
- # what's happening
- log.debug(msg)
-
- d.errback(exception)
- del self.__rpc_requests[request_id]
+ # The rest just get's logged in debug level, just to log
+ # what's happening
+ log.debug(msg)
+
+ d.errback(exception)
+ del self.__rpc_requests[request_id]
def send_request(self, request):
"""
@@ -220,15 +193,16 @@ class DelugeRPCProtocol(Protocol):
:param request: RPCRequest
"""
- # Store the DelugeRPCRequest object just in case a RPCError is sent in
- # response to this request. We use the extra information when printing
- # out the error for debugging purposes.
- self.__rpc_requests[request.request_id] = request
- #log.debug("Sending RPCRequest %s: %s", request.request_id, request)
- # Send the request in a tuple because multiple requests can be sent at once
- data = zlib.compress(rencode.dumps((request.format_message(),)))
- self.factory.bytes_sent += len(data)
- self.transport.write(data)
+ try:
+ # Store the DelugeRPCRequest object just in case a RPCError is sent in
+ # response to this request. We use the extra information when printing
+ # out the error for debugging purposes.
+ self.__rpc_requests[request.request_id] = request
+ #log.debug("Sending RPCRequest %s: %s", request.request_id, request)
+ # Send the request in a tuple because multiple requests can be sent at once
+ self.transfer_message((request.format_message(),))
+ except Exception, e:
+ log.warn("Error occured when sending message:" + str(e))
class DelugeRPCClientFactory(ClientFactory):
protocol = DelugeRPCProtocol
@@ -237,9 +211,6 @@ class DelugeRPCClientFactory(ClientFactory):
self.daemon = daemon
self.event_handlers = event_handlers
- self.bytes_recv = 0
- self.bytes_sent = 0
-
def startedConnecting(self, connector):
log.info("Connecting to daemon at \"%s:%s\"...",
connector.host, connector.port)
@@ -462,10 +433,10 @@ class DaemonSSLProxy(DaemonProxy):
self.disconnect_callback = cb
def get_bytes_recv(self):
- return self.__factory.bytes_recv
+ return self.protocol.get_bytes_recv()
def get_bytes_sent(self):
- return self.__factory.bytes_sent
+ return self.protocol.get_bytes_sent()
class DaemonClassicProxy(DaemonProxy):
def __init__(self, event_handlers=None):
diff --git a/tests/test_transfer.py b/tests/test_transfer.py
new file mode 100644
index 000000000..cd9a271d5
--- /dev/null
+++ b/tests/test_transfer.py
@@ -0,0 +1,368 @@
+# -*- coding: utf-8 -*-
+#
+# test_transfer.py
+#
+# Copyright (C) 2012 Bro <bro.development@gmail.com>
+#
+# Deluge is free software.
+#
+# You may redistribute it and/or modify it under the terms of the
+# GNU General Public License, as published by the Free Software
+# Foundation; either version 3 of the License, or (at your option)
+# any later version.
+#
+# deluge is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+# See the GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with deluge. If not, write to:
+# The Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor
+# Boston, MA 02110-1301, USA.
+#
+# In addition, as a special exception, the copyright holders give
+# permission to link the code of portions of this program with the OpenSSL
+# library.
+# You must obey the GNU General Public License in all respects for all of
+# the code used other than OpenSSL. If you modify file(s) with this
+# exception, you may extend this exception to your version of the file(s),
+# but you are not obligated to do so. If you do not wish to do so, delete
+# this exception statement from your version. If you delete this exception
+# statement from all source files in the program, then also delete it here.
+#
+
+from twisted.trial import unittest
+
+from deluge.transfer import DelugeTransferProtocol
+
+import base64
+
+import deluge.rencode as rencode
+
+class TransferTestClass(DelugeTransferProtocol):
+
+ def __init__(self):
+ DelugeTransferProtocol.__init__(self)
+ self.transport = self
+ self.messages_out = []
+ self.messages_in = []
+ self.packet_count = 0
+
+ def write(self, message):
+ """
+ Called by DelugeTransferProtocol class
+ This simulates the write method of the self.transport in DelugeTransferProtocol.
+ """
+ self.messages_out.append(message)
+
+ def message_received(self, message):
+ """
+ This method overrides message_received is DelugeTransferProtocol and is
+ called with the complete message as it was sent by DelugeRPCProtocol
+ """
+ self.messages_in.append(message)
+
+ def get_messages_out_joined(self):
+ return b"".join(self.messages_out)
+
+ def get_messages_in(self):
+ return self.messages_in
+
+ def dataReceived_old_protocol(self, data):
+ """
+ This is the original method logic (as close as possible) for handling data receival on the client
+
+ :param data: a zlib compressed string encoded with rencode.
+
+ """
+ from datetime import timedelta
+ import zlib
+ print "\n=== New Data Received ===\nBytes received:", len(data)
+
+ if self._buffer:
+ # We have some data from the last dataReceived() so lets prepend it
+ print "Current buffer:", len(self._buffer) if self._buffer else "0"
+ data = self._buffer + data
+ self._buffer = None
+
+ self.packet_count += 1
+ self._bytes_received += len(data)
+
+ while data:
+ print "\n-- Handle packet data --"
+
+ print "Bytes received:", self._bytes_received
+ print "Current data:", len(data)
+
+ if self._message_length == 0:
+ # handle_new_message uses _buffer so set data to _buffer.
+ self._buffer = data
+ self._handle_new_message()
+ data = self._buffer
+ self._buffer = None
+ self.packet_count = 1
+ print "New message of length:", self._message_length
+
+ dobj = zlib.decompressobj()
+ try:
+ request = rencode.loads(dobj.decompress(data))
+ print "Successfully loaded message",
+ print " - Buffer length: %d, data length: %d, unused length: %d" % (len(data), \
+ len(data) - len(dobj.unused_data), len(dobj.unused_data))
+ print "Packet count:", self.packet_count
+ except Exception, e:
+ #log.debug("Received possible invalid message (%r): %s", data, e)
+ # This could be cut-off data, so we'll save this in the buffer
+ # and try to prepend it on the next dataReceived()
+ self._buffer = data
+ print "Failed to load buffer (size %d): %s" % (len(self._buffer), str(e))
+ return
+ else:
+ data = dobj.unused_data
+ self._message_length = 0
+
+ self.message_received(request)
+
+class DelugeTransferProtocolTestCase(unittest.TestCase):
+
+ def setUp(self):
+ """
+ The expected messages corresponds to the test messages (msg1, msg2) after they've been processed
+ by DelugeTransferProtocol.send, which means that they've first been encoded with pickle,
+ and then compressed with zlib.
+ The expected messages are encoded in base64 to easily including it here in the source.
+ So before comparing the results with the expected messages, the expected messages must be decoded,
+ or the result message be encoded in base64.
+
+ """
+ self.transfer = TransferTestClass()
+ self.msg1 = (0, 1, {"key_int": 1242429423}, {"key_str": "some string"}, {"key_bool": True})
+ self.msg2 = (2, 3, {"key_float": 12424.29423},
+ {"key_unicode": u"some string"},
+ {"key_dict_with_tuple": {"key_tuple": (1, 2, 3)}},
+ {"keylist": [4, "5", 6.7]})
+
+ self.msg1_expected_compressed_base64 = "RAAAADF4nDvKwJjenp1aGZ+ZV+Lgxfv9PYRXXFLU"\
+ "XZyfm6oAZGTmpad3gAST8vNznAEAJhSQ"
+
+ self.msg2_expected_compressed_base64 = "RAAAAF14nDvGxJzemZ1aGZ+Wk59Y4uTmpKib3g3i"\
+ "l+ZlJuenpHYX5+emKhSXFGXmpadPBkmkZCaXxJdn"\
+ "lmTEl5QW5KRCdIOZhxmBhrUDuTmZxSWHWRpNnRyu"\
+ "paUBAHYlJxI="
+
+ def test_send_one_message(self):
+ """
+ Send one message and test that it has been sent correctoly to the
+ method 'write' in self.transport.
+
+ """
+ self.transfer.transfer_message(self.msg1)
+ # Get the data as sent by DelugeTransferProtocol
+ messages = self.transfer.get_messages_out_joined()
+ base64_encoded = base64.b64encode(messages)
+ self.assertEquals(base64_encoded, self.msg1_expected_compressed_base64)
+
+ def test_receive_one_message(self):
+ """
+ Receive one message and test that it has been sent to the
+ method 'message_received'.
+
+ """
+ self.transfer.dataReceived(base64.b64decode(self.msg1_expected_compressed_base64))
+ # Get the data as sent by DelugeTransferProtocol
+ messages = self.transfer.get_messages_in().pop(0)
+ self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(messages))
+
+ def test_receive_old_message(self):
+ """
+ Receive an old message (with no header) and verify that the data is discarded.
+
+ """
+ self.transfer.dataReceived(rencode.dumps(self.msg1))
+ self.assertEquals(len(self.transfer.get_messages_in()), 0)
+ self.assertEquals(self.transfer._message_length, 0)
+ self.assertEquals(len(self.transfer._buffer), 0)
+
+ def test_receive_two_concatenated_messages(self):
+ """
+ This test simply concatenates two messsages (as they're sent over the network),
+ and lets DelugeTransferProtocol receive the data as one string.
+
+ """
+ two_concatenated = base64.b64decode(self.msg1_expected_compressed_base64) + base64.b64decode(self.msg2_expected_compressed_base64)
+ self.transfer.dataReceived(two_concatenated)
+
+ # Get the data as sent by DelugeTransferProtocol
+ message1 = self.transfer.get_messages_in().pop(0)
+ self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message1))
+ message2 = self.transfer.get_messages_in().pop(0)
+ self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2))
+
+ def test_receive_three_messages_in_parts(self):
+ """
+ This test concatenates three messsages (as they're sent over the network),
+ and lets DelugeTransferProtocol receive the data in multiple parts.
+
+ """
+ msg_bytes = base64.b64decode(self.msg1_expected_compressed_base64) + \
+ base64.b64decode(self.msg2_expected_compressed_base64) + \
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ packet_size = 40
+
+ one_message_byte_count = len(base64.b64decode(self.msg1_expected_compressed_base64))
+ two_messages_byte_count = one_message_byte_count + len(base64.b64decode(self.msg2_expected_compressed_base64))
+ three_messages_byte_count = two_messages_byte_count + len(base64.b64decode(self.msg1_expected_compressed_base64))
+
+ for d in self.receive_parts_helper(msg_bytes, packet_size):
+ bytes_received = self.transfer.get_bytes_recv()
+
+ if bytes_received >= three_messages_byte_count:
+ expected_msgs_received_count = 3
+ elif bytes_received >= two_messages_byte_count:
+ expected_msgs_received_count = 2
+ elif bytes_received >= one_message_byte_count:
+ expected_msgs_received_count = 1
+ else:
+ expected_msgs_received_count = 0
+ # Verify that the expected number of complete messages has arrived
+ self.assertEquals(expected_msgs_received_count, len(self.transfer.get_messages_in()))
+
+ # Get the data as received by DelugeTransferProtocol
+ message1 = self.transfer.get_messages_in().pop(0)
+ self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message1))
+ message2 = self.transfer.get_messages_in().pop(0)
+ self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2))
+ message3 = self.transfer.get_messages_in().pop(0)
+ self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message3))
+
+
+ # Remove underscore to enable test, or run the test directly:
+ # tests $ trial test_transfer.DelugeTransferProtocolTestCase._test_rencode_fail_protocol
+ def _test_rencode_fail_protocol(self):
+ """
+ This test tries to test the protocol that relies on errors from rencode.
+
+ """
+ msg_bytes = base64.b64decode(self.msg1_expected_compressed_base64) + \
+ base64.b64decode(self.msg2_expected_compressed_base64) + \
+ base64.b64decode(self.msg1_expected_compressed_base64)
+ packet_size = 149
+
+ one_message_byte_count = len(base64.b64decode(self.msg1_expected_compressed_base64))
+ two_messages_byte_count = one_message_byte_count + len(base64.b64decode(self.msg2_expected_compressed_base64))
+ three_messages_byte_count = two_messages_byte_count + len(base64.b64decode(self.msg1_expected_compressed_base64))
+
+ print
+
+ print "Msg1 size:", len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4
+ print "Msg2 size:", len(base64.b64decode(self.msg2_expected_compressed_base64)) - 4
+ print "Msg3 size:", len(base64.b64decode(self.msg1_expected_compressed_base64)) - 4
+
+ print "one_message_byte_count:", one_message_byte_count
+ print "two_messages_byte_count:", two_messages_byte_count
+ print "three_messages_byte_count:", three_messages_byte_count
+
+ for d in self.receive_parts_helper(msg_bytes, packet_size, self.transfer.dataReceived_old_protocol):
+ bytes_received = self.transfer.get_bytes_recv()
+
+ if bytes_received >= three_messages_byte_count:
+ expected_msgs_received_count = 3
+ elif bytes_received >= two_messages_byte_count:
+ expected_msgs_received_count = 2
+ elif bytes_received >= one_message_byte_count:
+ expected_msgs_received_count = 1
+ else:
+ expected_msgs_received_count = 0
+ # Verify that the expected number of complete messages has arrived
+ if expected_msgs_received_count != len(self.transfer.get_messages_in()):
+ print "Expected number of messages received is %d, but %d have been received."\
+ % (expected_msgs_received_count, len(self.transfer.get_messages_in()))
+
+ # Get the data as received by DelugeTransferProtocol
+ message1 = self.transfer.get_messages_in().pop(0)
+ self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message1))
+ message2 = self.transfer.get_messages_in().pop(0)
+ self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2))
+ message3 = self.transfer.get_messages_in().pop(0)
+ self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message3))
+
+
+ def test_receive_middle_of_header(self):
+ """
+ This test concatenates two messsages (as they're sent over the network),
+ and lets DelugeTransferProtocol receive the data in two parts.
+ The first part contains the first message, plus two bytes of the next message.
+ The next part contains the rest of the message.
+
+ This is a special case, as DelugeTransferProtocol can't start parsing
+ a message until it has at least 4 bytes (the size of the header) to be able
+ to read and parse the size of the payload.
+
+ """
+ two_concatenated = base64.b64decode(self.msg1_expected_compressed_base64) + base64.b64decode(self.msg2_expected_compressed_base64)
+ first_len = len(base64.b64decode(self.msg1_expected_compressed_base64))
+
+ # Now found the entire first message, and half the header of the next message (2 bytes into the header)
+ self.transfer.dataReceived(two_concatenated[:first_len+2])
+
+ # Should be 1 message in the list
+ self.assertEquals(1, len(self.transfer.get_messages_in()))
+
+ # Send the rest
+ self.transfer.dataReceived(two_concatenated[first_len+2:])
+
+ # Should be 2 messages in the list
+ self.assertEquals(2, len(self.transfer.get_messages_in()))
+
+ # Get the data as sent by DelugeTransferProtocol
+ message1 = self.transfer.get_messages_in().pop(0)
+ self.assertEquals(rencode.dumps(self.msg1), rencode.dumps(message1))
+ message2 = self.transfer.get_messages_in().pop(0)
+ self.assertEquals(rencode.dumps(self.msg2), rencode.dumps(message2))
+
+
+ # Needs file containing big data structure e.g. like thetorrent list as it is transfered by the daemon
+ #def test_simulate_big_transfer(self):
+ # filename = "../deluge.torrentlist"
+ #
+ # f = open(filename, "r")
+ # data = f.read()
+ # message_to_send = eval(data)
+ # self.transfer.transfer_message(message_to_send)
+ #
+ # # Get the data as sent to the network by DelugeTransferProtocol
+ # compressed_data = self.transfer.get_messages_out_joined()
+ # packet_size = 16000 # Or something smaller...
+ #
+ # for d in self.receive_parts_helper(compressed_data, packet_size):
+ # bytes_recv = self.transfer.get_bytes_recv()
+ # if bytes_recv < len(compressed_data):
+ # self.assertEquals(len(self.transfer.get_messages_in()), 0)
+ # else:
+ # self.assertEquals(len(self.transfer.get_messages_in()), 1)
+ # # Get the data as received by DelugeTransferProtocol
+ # transfered_message = self.transfer.get_messages_in().pop(0)
+ # # Test that the data structures are equal
+ # #self.assertEquals(transfered_message, message_to_send)
+ # #self.assertTrue(transfered_message == message_to_send)
+ #
+ # #f.close()
+ # #f = open("rencode.torrentlist", "w")
+ # #f.write(str(transfered_message))
+ # #f.close()
+
+ def receive_parts_helper(self, data, packet_size, receive_func=None):
+ byte_count = len(data)
+ sent_bytes = 0
+ while byte_count > 0:
+ to_receive = packet_size if byte_count > packet_size else byte_count
+ sent_bytes += to_receive
+ byte_count -= to_receive
+ if receive_func:
+ receive_func(data[:to_receive])
+ else:
+ self.transfer.dataReceived(data[:to_receive])
+ data = data[to_receive:]
+ yield