summaryrefslogtreecommitdiffstats
path: root/deluge/transfer.py
blob: 6b26549e2ea9ddae4749f3dd1ea8617de93596ca (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# -*- coding: utf-8 -*-
#
# Copyright (C) 2012 Bro <bro.development@gmail.com>
# Copyright (C) 2018 Andrew Resch <andrewresch@gmail.com>
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#

from __future__ import unicode_literals

import logging
import struct
import zlib

import rencode
from twisted.internet.protocol import Protocol

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Version byte written at the start of every message header; incoming
# headers carrying a different value are rejected.
PROTOCOL_VERSION = 1
# struct layout of the header: '!' = network byte order with standard sizes,
# 'B' = unsigned byte (version), 'I' = unsigned 32-bit integer (body length).
MESSAGE_HEADER_FORMAT = '!BI'
# Size in bytes of a packed header (5 for the '!BI' layout above).
MESSAGE_HEADER_SIZE = struct.calcsize(MESSAGE_HEADER_FORMAT)


class DelugeTransferProtocol(Protocol, object):
    """
    Deluge RPC wire protocol.

    Data messages are transferred with a header containing a protocol version
    and the length of the data to be transferred (payload).

    The format is::

            ubyte    uint4     bytestring
        |.version.|..size..|.....body.....|

    The version is an unsigned byte that indicates the protocol version.
    The size is an unsigned 32-bit integer that is equal to the length of the body bytestring.
    The body is the compressed rencoded byte string of the data object.
    """

    def __init__(self):
        # Bytes received so far that have not yet been consumed as a header
        # or dispatched as a message body.
        self._buffer = b''  # TODO: Look into using bytearray instead of byte string.
        # Body length announced by the header of the message currently being
        # received. 0 means the buffer is expected to start with a header.
        self._message_length = 0
        # Running totals of raw bytes seen on the wire (headers included).
        self._bytes_received = 0
        self._bytes_sent = 0

    def transfer_message(self, data):
        """
        Transfer the data.

        The data is rencoded, zlib-compressed, prefixed with the message
        header and written to the transport.

        :param data: data to be transferred in a data structure serializable by rencode.
        """
        body = zlib.compress(rencode.dumps(data))
        body_len = len(body)
        # Header fields followed by the body as a fixed-size byte string.
        message = struct.pack(
            '{}{}s'.format(MESSAGE_HEADER_FORMAT, body_len),
            PROTOCOL_VERSION,
            body_len,
            body,
        )
        self._bytes_sent += len(message)
        self.transport.write(message)

    def dataReceived(self, data):  # NOQA: N802
        """
        This method is called whenever data is received.

        The data is appended to the internal buffer and every complete
        message found in the buffer is dispatched, in order.

        :param data: a message as transferred by transfer_message, or a part of such
                     a message.

        Instance attributes used:
            _buffer         - contains the data received
            _message_length - the length of the payload of the current message.

        """
        self._buffer += data
        self._bytes_received += len(data)

        while True:
            if self._message_length == 0:
                # Between messages: a complete header is required before
                # anything else can be done. This check is deliberately NOT
                # applied while a body is pending, so that a body shorter
                # than a header is still dispatched as soon as it is complete.
                if len(self._buffer) < MESSAGE_HEADER_SIZE:
                    break
                # An explicit False means the header was rejected and the
                # buffer discarded; there is nothing to dispatch. Overrides
                # that return None keep the previous behaviour.
                if self._handle_new_message() is False:
                    break
            # Wait for more data until the whole body has arrived.
            if len(self._buffer) < self._message_length:
                break
            # We have a complete packet
            self._handle_complete_message(self._buffer[: self._message_length])
            # Remove message data from buffer
            self._buffer = self._buffer[self._message_length :]
            self._message_length = 0

    def _handle_new_message(self):
        """
        Handle the start of a new message. This method is called only when the
        beginning of the buffer contains data from a new message (i.e. the header).

        On success the header is removed from the buffer and _message_length
        holds the announced body length. On failure a warning is logged and
        both the buffer and _message_length are reset.

        :returns: True if the header was parsed and accepted, False otherwise
        :rtype: bool

        """
        try:
            # Read the first bytes of the message (MESSAGE_HEADER_SIZE bytes)
            header = self._buffer[:MESSAGE_HEADER_SIZE]
            # Extract the length stored as an unsigned 32-bit integer
            version, self._message_length = struct.unpack(MESSAGE_HEADER_FORMAT, header)
            if version != PROTOCOL_VERSION:
                raise Exception(
                    'Received invalid protocol version: {}. PROTOCOL_VERSION is {}.'.format(
                        version, PROTOCOL_VERSION
                    )
                )
            # Remove the header from the buffer
            self._buffer = self._buffer[MESSAGE_HEADER_SIZE:]
        except Exception as ex:
            log.warning('Error occurred when parsing message header: %s.', ex)
            log.warning(
                'This version of Deluge cannot communicate with the sender of this data.'
            )
            # Message boundaries are unknown after a bad header, so all
            # buffered data is dropped.
            self._message_length = 0
            self._buffer = b''
            return False
        return True

    def _handle_complete_message(self, data):
        """
        Handles a complete message as it is transferred on the network.

        The body is decompressed, decoded with rencode and passed to
        message_received. Any failure is logged as a warning and the
        message is dropped.

        :param data: a zlib compressed string encoded with rencode.

        """
        try:
            # NOTE: message_received is called inside the try block, so an
            # exception raised by the handler is also caught and reported
            # by the warning below.
            self.message_received(
                rencode.loads(zlib.decompress(data), decode_utf8=True)
            )
        except Exception as ex:
            log.warning(
                'Failed to decompress (%d bytes) and load serialized data with rencode: %s',
                len(data),
                ex,
            )

    def get_bytes_recv(self):
        """
        Returns the number of bytes received.

        :returns: the number of bytes received
        :rtype: int

        """
        return self._bytes_received

    def get_bytes_sent(self):
        """
        Returns the number of bytes sent.

        :returns: the number of bytes sent
        :rtype: int

        """
        return self._bytes_sent

    def message_received(self, message):
        """Override this method to receive the complete message"""
        pass