summaryrefslogtreecommitdiffstats
path: root/deluge/tests/test_client.py
blob: c89ad53096e9d601e9f62a35bc082238cc6380f2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#

from __future__ import unicode_literals

from twisted.internet import defer

import deluge.component as component
from deluge import error
from deluge.common import AUTH_LEVEL_NORMAL, get_localhost_auth, windows_check
from deluge.core.authmanager import AUTH_LEVEL_ADMIN
from deluge.ui.client import Client, DaemonSSLProxy, client

from .basetest import BaseTestCase
from .daemon_base import DaemonBase


class NoVersionSendingDaemonSSLProxy(DaemonSSLProxy):
    def authenticate(self, username, password):
        self.login_deferred = defer.Deferred()
        d = self.call('daemon.login', username, password)
        d.addCallback(self.__on_login, username)
        d.addErrback(self.__on_login_fail)
        return self.login_deferred

    def __on_login(self, result, username):
        self.login_deferred.callback(result)

    def __on_login_fail(self, result):
        self.login_deferred.errback(result)


class NoVersionSendingClient(Client):
    def connect(
        self,
        host='127.0.0.1',
        port=58846,
        username='',
        password='',
        skip_authentication=False,
    ):
        self._daemon_proxy = NoVersionSendingDaemonSSLProxy()
        self._daemon_proxy.set_disconnect_callback(self.__on_disconnect)

        d = self._daemon_proxy.connect(host, port)

        def on_connect_fail(reason):
            self.disconnect()
            return reason

        def on_authenticate(result, daemon_info):
            return result

        def on_authenticate_fail(reason):
            return reason

        def on_connected(daemon_version):
            return daemon_version

        def authenticate(daemon_version, username, password):
            d = self._daemon_proxy.authenticate(username, password)
            d.addCallback(on_authenticate, daemon_version)
            d.addErrback(on_authenticate_fail)
            return d

        d.addCallback(on_connected)
        d.addErrback(on_connect_fail)
        if not skip_authentication:
            d.addCallback(authenticate, username, password)
        return d

    def __on_disconnect(self):
        if self.disconnect_callback:
            self.disconnect_callback()


class ClientTestCase(BaseTestCase, DaemonBase):

    if windows_check():
        skip = 'windows cant start_core not enough arguments for format string'

    def set_up(self):
        d = self.common_set_up()
        d.addCallback(self.start_core)
        d.addErrback(self.terminate_core)
        return d

    def tear_down(self):
        d = component.shutdown()
        d.addCallback(self.terminate_core)
        return d

    def test_connect_no_credentials(self):
        d = client.connect('localhost', self.listen_port, username='', password='')

        def on_connect(result):
            self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN)
            self.addCleanup(client.disconnect)
            return result

        d.addCallbacks(on_connect, self.fail)
        return d

    def test_connect_localclient(self):
        username, password = get_localhost_auth()
        d = client.connect(
            'localhost', self.listen_port, username=username, password=password
        )

        def on_connect(result):
            self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN)
            self.addCleanup(client.disconnect)
            return result

        d.addCallbacks(on_connect, self.fail)
        return d

    def test_connect_bad_password(self):
        username, password = get_localhost_auth()
        d = client.connect(
            'localhost', self.listen_port, username=username, password=password + '1'
        )

        def on_failure(failure):
            self.assertEqual(failure.trap(error.BadLoginError), error.BadLoginError)
            self.assertEqual(failure.value.message, 'Password does not match')
            self.addCleanup(client.disconnect)

        d.addCallbacks(self.fail, on_failure)
        return d

    def test_connect_invalid_user(self):
        username, password = get_localhost_auth()
        d = client.connect('localhost', self.listen_port, username='invalid-user')

        def on_failure(failure):
            self.assertEqual(failure.trap(error.BadLoginError), error.BadLoginError)
            self.assertEqual(failure.value.message, 'Username does not exist')
            self.addCleanup(client.disconnect)

        d.addCallbacks(self.fail, on_failure)
        return d

    def test_connect_without_password(self):
        username, password = get_localhost_auth()
        d = client.connect('localhost', self.listen_port, username=username)

        def on_failure(failure):
            self.assertEqual(
                failure.trap(error.AuthenticationRequired), error.AuthenticationRequired
            )
            self.assertEqual(failure.value.username, username)
            self.addCleanup(client.disconnect)

        d.addCallbacks(self.fail, on_failure)
        return d

    @defer.inlineCallbacks
    def test_connect_with_password(self):
        username, password = get_localhost_auth()
        yield client.connect(
            'localhost', self.listen_port, username=username, password=password
        )
        yield client.core.create_account('testuser', 'testpw', 'DEFAULT')
        yield client.disconnect()
        ret = yield client.connect(
            'localhost', self.listen_port, username='testuser', password='testpw'
        )
        self.assertEqual(ret, AUTH_LEVEL_NORMAL)
        yield

    @defer.inlineCallbacks
    def test_invalid_rpc_method_call(self):
        yield client.connect('localhost', self.listen_port, username='', password='')
        d = client.core.invalid_method()

        def on_failure(failure):
            self.assertEqual(
                failure.trap(error.WrappedException), error.WrappedException
            )
            self.addCleanup(client.disconnect)

        d.addCallbacks(self.fail, on_failure)
        yield d

    def test_connect_without_sending_client_version_fails(self):
        username, password = get_localhost_auth()
        no_version_sending_client = NoVersionSendingClient()
        d = no_version_sending_client.connect(
            'localhost', self.listen_port, username=username, password=password
        )

        def on_failure(failure):
            self.assertEqual(
                failure.trap(error.IncompatibleClient), error.IncompatibleClient
            )
            self.addCleanup(no_version_sending_client.disconnect)

        d.addCallbacks(self.fail, on_failure)
        return d