summaryrefslogtreecommitdiffstats
path: root/deluge/tests
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/tests')
-rw-r--r--deluge/tests/__init__.py2
-rw-r--r--deluge/tests/basetest.py59
-rw-r--r--deluge/tests/common.py70
-rw-r--r--deluge/tests/common_web.py22
-rw-r--r--deluge/tests/daemon_base.py19
-rw-r--r--deluge/tests/data/seo.icobin1150 -> 0 bytes
-rw-r--r--deluge/tests/data/seo.svg1
-rw-r--r--deluge/tests/test_alertmanager.py12
-rw-r--r--deluge/tests/test_authmanager.py10
-rw-r--r--deluge/tests/test_bencode.py18
-rw-r--r--deluge/tests/test_client.py66
-rw-r--r--deluge/tests/test_common.py212
-rw-r--r--deluge/tests/test_component.py122
-rw-r--r--deluge/tests/test_config.py159
-rw-r--r--deluge/tests/test_core.py242
-rw-r--r--deluge/tests/test_decorators.py20
-rw-r--r--deluge/tests/test_error.py33
-rw-r--r--deluge/tests/test_files_tab.py66
-rw-r--r--deluge/tests/test_httpdownloader.py198
-rw-r--r--deluge/tests/test_json_api.py179
-rw-r--r--deluge/tests/test_log.py10
-rw-r--r--deluge/tests/test_maketorrent.py33
-rw-r--r--deluge/tests/test_maybe_coroutine.py213
-rw-r--r--deluge/tests/test_metafile.py27
-rw-r--r--deluge/tests/test_plugin_metadata.py36
-rw-r--r--deluge/tests/test_rpcserver.py46
-rw-r--r--deluge/tests/test_security.py75
-rw-r--r--deluge/tests/test_sessionproxy.py56
-rw-r--r--deluge/tests/test_torrent.py111
-rw-r--r--deluge/tests/test_torrentmanager.py116
-rw-r--r--deluge/tests/test_torrentview.py170
-rw-r--r--deluge/tests/test_tracker_icons.py80
-rw-r--r--deluge/tests/test_transfer.py49
-rw-r--r--deluge/tests/test_ui_common.py42
-rw-r--r--deluge/tests/test_ui_console.py71
-rw-r--r--deluge/tests/test_ui_entry.py324
-rw-r--r--deluge/tests/test_ui_gtk3.py22
-rw-r--r--deluge/tests/test_web_api.py115
-rw-r--r--deluge/tests/test_web_auth.py11
-rw-r--r--deluge/tests/test_webserver.py23
-rw-r--r--deluge/tests/twisted/plugins/delugereporter.py50
41 files changed, 1520 insertions, 1670 deletions
diff --git a/deluge/tests/__init__.py b/deluge/tests/__init__.py
index d3bf10def..7b6afa194 100644
--- a/deluge/tests/__init__.py
+++ b/deluge/tests/__init__.py
@@ -1,7 +1,5 @@
# Increase open file descriptor limit to allow tests to run
# without getting error: what(): epoll: Too many open files
-from __future__ import print_function, unicode_literals
-
from deluge.i18n import setup_translation
try:
diff --git a/deluge/tests/basetest.py b/deluge/tests/basetest.py
deleted file mode 100644
index 11ca18e53..000000000
--- a/deluge/tests/basetest.py
+++ /dev/null
@@ -1,59 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
-# the additional special exception to link portions of this program with the OpenSSL library.
-# See LICENSE for more details.
-#
-
-from __future__ import unicode_literals
-
-import warnings
-
-from twisted.internet.defer import maybeDeferred
-from twisted.trial import unittest
-
-import deluge.component as component
-
-
-class BaseTestCase(unittest.TestCase):
- """This is the base class that should be used for all test classes
- that create classes that inherit from deluge.component.Component. It
- ensures that the component registry has been cleaned up when tests
- have finished.
-
- """
-
- def setUp(self): # NOQA: N803
-
- if len(component._ComponentRegistry.components) != 0:
- warnings.warn(
- 'The component._ComponentRegistry.components is not empty on test setup.\n'
- 'This is probably caused by another test that did not clean up after finishing!: %s'
- % component._ComponentRegistry.components
- )
- d = maybeDeferred(self.set_up)
-
- def on_setup_error(error):
- warnings.warn('Error caught in test setup!\n%s' % error.getTraceback())
- self.fail()
-
- return d.addErrback(on_setup_error)
-
- def tearDown(self): # NOQA: N803
- d = maybeDeferred(self.tear_down)
-
- def on_teardown_failed(error):
- warnings.warn('Error caught in test teardown!\n%s' % error.getTraceback())
- self.fail()
-
- def on_teardown_complete(result):
- component._ComponentRegistry.components.clear()
- component._ComponentRegistry.dependents.clear()
-
- return d.addCallbacks(on_teardown_complete, on_teardown_failed)
-
- def set_up(self):
- pass
-
- def tear_down(self):
- pass
diff --git a/deluge/tests/common.py b/deluge/tests/common.py
index be33f8c58..b5941568d 100644
--- a/deluge/tests/common.py
+++ b/deluge/tests/common.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
#
@@ -7,22 +6,21 @@
# See LICENSE for more details.
#
-from __future__ import print_function, unicode_literals
-
import os
import sys
-import tempfile
import traceback
+import pytest
from twisted.internet import defer, protocol, reactor
from twisted.internet.defer import Deferred
from twisted.internet.error import CannotListenError
-from twisted.trial import unittest
import deluge.configmanager
import deluge.core.preferencesmanager
import deluge.log
+from deluge.common import get_localhost_auth
from deluge.error import DelugeError
+from deluge.ui.client import Client
# This sets log level to critical, so use log.critical() to debug while running unit tests
deluge.log.setup_logger('none')
@@ -32,12 +30,6 @@ def disable_new_release_check():
deluge.core.preferencesmanager.DEFAULT_PREFS['new_release_check'] = False
-def set_tmp_config_dir():
- config_directory = tempfile.mkdtemp()
- deluge.configmanager.set_config_dir(config_directory)
- return config_directory
-
-
def setup_test_logger(level='info', prefix='deluge'):
deluge.log.setup_logger(level, filename='%s.log' % prefix, twisted_observer=False)
@@ -55,7 +47,7 @@ def todo_test(caller):
filename = os.path.basename(traceback.extract_stack(None, 2)[0][0])
funcname = traceback.extract_stack(None, 2)[0][2]
- raise unittest.SkipTest('TODO: %s:%s' % (filename, funcname))
+ pytest.skip(f'TODO: {filename}:{funcname}')
def add_watchdog(deferred, timeout=0.05, message=None):
@@ -73,7 +65,7 @@ def add_watchdog(deferred, timeout=0.05, message=None):
return watchdog
-class ReactorOverride(object):
+class ReactorOverride:
"""Class used to patch reactor while running unit tests
to avoid starting and stopping the twisted reactor
"""
@@ -97,12 +89,19 @@ class ReactorOverride(object):
class ProcessOutputHandler(protocol.ProcessProtocol):
def __init__(
- self, script, callbacks, logfile=None, print_stdout=True, print_stderr=True
+ self,
+ script,
+ shutdown_func,
+ callbacks,
+ logfile=None,
+ print_stdout=True,
+ print_stderr=True,
):
"""Executes a script and handle the process' output to stdout and stderr.
Args:
script (str): The script to execute.
+ shutdown_func (func): A function which will gracefully stop the called script.
callbacks (list): Callbacks to trigger if the expected output if found.
logfile (str, optional): Filename to wrote the process' output.
print_stderr (bool): Print the process' stderr output to stdout.
@@ -111,6 +110,7 @@ class ProcessOutputHandler(protocol.ProcessProtocol):
"""
self.callbacks = callbacks
self.script = script
+ self.shutdown_func = shutdown_func
self.log_output = ''
self.stderr_out = ''
self.logfile = logfile
@@ -130,6 +130,7 @@ class ProcessOutputHandler(protocol.ProcessProtocol):
with open(self.logfile, 'w') as f:
f.write(self.log_output)
+ @defer.inlineCallbacks
def kill(self):
"""Kill the running process.
@@ -142,11 +143,17 @@ class ProcessOutputHandler(protocol.ProcessProtocol):
self.killed = True
self._kill_watchdogs()
self.quit_d = Deferred()
- self.transport.signalProcess('INT')
- return self.quit_d
+ shutdown = self.shutdown_func()
+ shutdown.addTimeout(5, reactor)
+ try:
+ yield shutdown
+ except Exception:
+ self.transport.signalProcess('TERM')
+ result = yield self.quit_d
+ return result
def _kill_watchdogs(self):
- """"Cancel all watchdogs"""
+ """Cancel all watchdogs"""
for w in self.watchdogs:
if not w.called and not w.cancelled:
w.cancel()
@@ -205,7 +212,7 @@ class ProcessOutputHandler(protocol.ProcessProtocol):
def start_core(
- listen_port=58846,
+ listen_port=58900,
logfile=None,
timeout=10,
timeout_msg=None,
@@ -213,13 +220,14 @@ def start_core(
print_stdout=True,
print_stderr=True,
extra_callbacks=None,
+ config_directory='',
):
"""Start the deluge core as a daemon.
Args:
listen_port (int, optional): The port the daemon listens for client connections.
logfile (str, optional): Logfile name to write the output from the process.
- timeout (int): If none of the callbacks have been triggered before the imeout, the process is killed.
+ timeout (int): If none of the callbacks have been triggered before the timeout, the process is killed.
timeout_msg (str): The message to print when the timeout expires.
custom_script (str): Extra python code to insert into the daemon process script.
print_stderr (bool): If the output from the process' stderr should be printed to stdout.
@@ -234,7 +242,6 @@ def start_core(
or upon timeout expiry. The ProcessOutputHandler is the handler for the deluged process.
"""
- config_directory = set_tmp_config_dir()
daemon_script = """
import sys
import deluge.core.daemon_entry
@@ -254,7 +261,7 @@ except Exception:
import traceback
sys.stderr.write('Exception raised:\\n %%s' %% traceback.format_exc())
""" % {
- 'dir': config_directory,
+ 'dir': config_directory.as_posix(),
'port': listen_port,
'script': custom_script,
}
@@ -289,20 +296,30 @@ except Exception:
if extra_callbacks:
callbacks.extend(extra_callbacks)
+ @defer.inlineCallbacks
+ def shutdown_daemon():
+ username, password = get_localhost_auth()
+ client = Client()
+ yield client.connect(
+ 'localhost', listen_port, username=username, password=password
+ )
+ yield client.daemon.shutdown()
+
process_protocol = start_process(
- daemon_script, callbacks, logfile, print_stdout, print_stderr
+ daemon_script, shutdown_daemon, callbacks, logfile, print_stdout, print_stderr
)
return default_core_cb['deferred'], process_protocol
def start_process(
- script, callbacks, logfile=None, print_stdout=True, print_stderr=True
+ script, shutdown_func, callbacks, logfile=None, print_stdout=True, print_stderr=True
):
"""
Starts an external python process which executes the given script.
Args:
script (str): The content of the script to execute.
+ shutdown_func (func): A function which will gracefully end the called script.
callbacks (list): list of dictionaries specifying callbacks.
logfile (str, optional): Logfile name to write the output from the process.
print_stderr (bool): If the output from the process' stderr should be printed to stdout.
@@ -324,7 +341,12 @@ def start_process(
"""
cwd = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
process_protocol = ProcessOutputHandler(
- script.encode('utf8'), callbacks, logfile, print_stdout, print_stderr
+ script.encode('utf8'),
+ shutdown_func,
+ callbacks,
+ logfile,
+ print_stdout,
+ print_stderr,
)
# Add timeouts to deferreds
diff --git a/deluge/tests/common_web.py b/deluge/tests/common_web.py
index 706eb8d72..8db49d243 100644
--- a/deluge/tests/common_web.py
+++ b/deluge/tests/common_web.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
#
@@ -7,21 +6,20 @@
# See LICENSE for more details.
#
-from __future__ import unicode_literals
+import pytest
import deluge.common
-import deluge.component as component
import deluge.ui.web.auth
import deluge.ui.web.server
from deluge import configmanager
+from deluge.conftest import BaseTestCase
from deluge.ui.web.server import DelugeWeb
-from .basetest import BaseTestCase
from .common import ReactorOverride
-from .daemon_base import DaemonBase
-class WebServerTestBase(BaseTestCase, DaemonBase):
+@pytest.mark.usefixtures('daemon', 'component')
+class WebServerTestBase(BaseTestCase):
"""
Base class for tests that need a running webapi
@@ -30,10 +28,7 @@ class WebServerTestBase(BaseTestCase, DaemonBase):
def set_up(self):
self.host_id = None
deluge.ui.web.server.reactor = ReactorOverride()
- d = self.common_set_up()
- d.addCallback(self.start_core)
- d.addCallback(self.start_webapi)
- return d
+ return self.start_webapi(None)
def start_webapi(self, arg):
self.webserver_listen_port = 8999
@@ -50,13 +45,8 @@ class WebServerTestBase(BaseTestCase, DaemonBase):
self.host_id = host[0]
self.deluge_web.start()
- def tear_down(self):
- d = component.shutdown()
- d.addCallback(self.terminate_core)
- return d
-
-class WebServerMockBase(object):
+class WebServerMockBase:
"""
Class with utility functions for mocking with tests using the webserver
diff --git a/deluge/tests/daemon_base.py b/deluge/tests/daemon_base.py
index 7352e0d32..3ae86c4ca 100644
--- a/deluge/tests/daemon_base.py
+++ b/deluge/tests/daemon_base.py
@@ -1,12 +1,9 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import print_function, unicode_literals
-
import os.path
import pytest
@@ -14,24 +11,13 @@ from twisted.internet import defer
from twisted.internet.error import CannotListenError
import deluge.component as component
-from deluge.common import windows_check
from . import common
-@pytest.mark.usefixtures('get_pytest_basetemp')
-class DaemonBase(object):
- basetemp = None
-
- if windows_check():
- skip = 'windows cant start_core not enough arguments for format string'
-
- @pytest.fixture
- def get_pytest_basetemp(self, request):
- self.basetemp = request.config.option.basetemp
-
+@pytest.mark.usefixtures('config_dir')
+class DaemonBase:
def common_set_up(self):
- common.set_tmp_config_dir()
self.listen_port = 58900
self.core = None
return component.start()
@@ -78,6 +64,7 @@ class DaemonBase(object):
print_stdout=print_stdout,
print_stderr=print_stderr,
extra_callbacks=extra_callbacks,
+ config_directory=self.config_dir,
)
yield d
except CannotListenError as ex:
diff --git a/deluge/tests/data/seo.ico b/deluge/tests/data/seo.ico
deleted file mode 100644
index 841e52871..000000000
--- a/deluge/tests/data/seo.ico
+++ /dev/null
Binary files differ
diff --git a/deluge/tests/data/seo.svg b/deluge/tests/data/seo.svg
new file mode 100644
index 000000000..fc96f74d4
--- /dev/null
+++ b/deluge/tests/data/seo.svg
@@ -0,0 +1 @@
+<?xml version="1.0" encoding="UTF-8"?> <svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 95.63 110.42" width="10cm" height="10cm"><defs><style>.cls-1{fill:#ec5728;}.cls-2{fill:#fff;}</style></defs><title>seocom-target</title><g id="Layer_2" data-name="Layer 2"><g id="Layer_1-2" data-name="Layer 1"><polygon class="cls-1" points="95.63 82.81 47.81 110.42 0 82.81 0 27.61 47.81 0 95.63 27.61 95.63 82.81"></polygon><path class="cls-2" d="M47.81,18.64A36.57,36.57,0,1,0,84.38,55.21,36.57,36.57,0,0,0,47.81,18.64Zm0,63.92A27.35,27.35,0,1,1,75.16,55.21,27.35,27.35,0,0,1,47.81,82.56Z"></path><path class="cls-2" d="M47.81,39.46A15.75,15.75,0,1,0,63.56,55.21,15.75,15.75,0,0,0,47.81,39.46Zm0,24.25a8.5,8.5,0,1,1,8.5-8.5A8.51,8.51,0,0,1,47.81,63.71Z"></path></g></g></svg> \ No newline at end of file
diff --git a/deluge/tests/test_alertmanager.py b/deluge/tests/test_alertmanager.py
index f197882cd..5e63864e8 100644
--- a/deluge/tests/test_alertmanager.py
+++ b/deluge/tests/test_alertmanager.py
@@ -1,19 +1,15 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import deluge.component as component
+from deluge.conftest import BaseTestCase
from deluge.core.core import Core
-from .basetest import BaseTestCase
-
-class AlertManagerTestCase(BaseTestCase):
+class TestAlertManager(BaseTestCase):
def set_up(self):
self.core = Core()
self.core.config.config['lsd'] = False
@@ -28,7 +24,7 @@ class AlertManagerTestCase(BaseTestCase):
return
self.am.register_handler('dummy_alert', handler)
- self.assertEqual(self.am.handlers['dummy_alert'], [handler])
+ assert self.am.handlers['dummy_alert'] == [handler]
def test_deregister_handler(self):
def handler(alert):
@@ -36,4 +32,4 @@ class AlertManagerTestCase(BaseTestCase):
self.am.register_handler('dummy_alert', handler)
self.am.deregister_handler(handler)
- self.assertEqual(self.am.handlers['dummy_alert'], [])
+ assert self.am.handlers['dummy_alert'] == []
diff --git a/deluge/tests/test_authmanager.py b/deluge/tests/test_authmanager.py
index 91e122f73..aa86fdbac 100644
--- a/deluge/tests/test_authmanager.py
+++ b/deluge/tests/test_authmanager.py
@@ -1,20 +1,16 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import deluge.component as component
from deluge.common import get_localhost_auth
+from deluge.conftest import BaseTestCase
from deluge.core.authmanager import AUTH_LEVEL_ADMIN, AuthManager
-from .basetest import BaseTestCase
-
-class AuthManagerTestCase(BaseTestCase):
+class TestAuthManager(BaseTestCase):
def set_up(self):
self.auth = AuthManager()
self.auth.start()
@@ -24,4 +20,4 @@ class AuthManagerTestCase(BaseTestCase):
return component.shutdown()
def test_authorize(self):
- self.assertEqual(self.auth.authorize(*get_localhost_auth()), AUTH_LEVEL_ADMIN)
+ assert self.auth.authorize(*get_localhost_auth()) == AUTH_LEVEL_ADMIN
diff --git a/deluge/tests/test_bencode.py b/deluge/tests/test_bencode.py
index b49c21f83..a4a76818f 100644
--- a/deluge/tests/test_bencode.py
+++ b/deluge/tests/test_bencode.py
@@ -1,19 +1,17 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-from twisted.trial import unittest
+import pytest
from deluge import bencode
from . import common
-class BencodeTestCase(unittest.TestCase):
+class TestBencode:
def test_bencode_unicode_metainfo(self):
filename = common.get_test_data_file('test.torrent')
with open(filename, 'rb') as _file:
@@ -21,14 +19,14 @@ class BencodeTestCase(unittest.TestCase):
bencode.bencode({b'info': metainfo})
def test_bencode_unicode_value(self):
- self.assertEqual(bencode.bencode(b'abc'), b'3:abc')
- self.assertEqual(bencode.bencode('abc'), b'3:abc')
+ assert bencode.bencode(b'abc') == b'3:abc'
+ assert bencode.bencode('abc') == b'3:abc'
def test_bdecode(self):
- self.assertEqual(bencode.bdecode(b'3:dEf'), b'dEf')
- with self.assertRaises(bencode.BTFailure):
+ assert bencode.bdecode(b'3:dEf') == b'dEf'
+ with pytest.raises(bencode.BTFailure):
bencode.bdecode('dEf')
- with self.assertRaises(bencode.BTFailure):
+ with pytest.raises(bencode.BTFailure):
bencode.bdecode(b'dEf')
- with self.assertRaises(bencode.BTFailure):
+ with pytest.raises(bencode.BTFailure):
bencode.bdecode({'dEf': 123})
diff --git a/deluge/tests/test_client.py b/deluge/tests/test_client.py
index ae1e95a71..5a6727907 100644
--- a/deluge/tests/test_client.py
+++ b/deluge/tests/test_client.py
@@ -1,23 +1,17 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-
-from __future__ import unicode_literals
-
+import pytest
+import pytest_twisted
from twisted.internet import defer
-import deluge.component as component
from deluge import error
from deluge.common import AUTH_LEVEL_NORMAL, get_localhost_auth
from deluge.core.authmanager import AUTH_LEVEL_ADMIN
from deluge.ui.client import Client, DaemonSSLProxy, client
-from .basetest import BaseTestCase
-from .daemon_base import DaemonBase
-
class NoVersionSendingDaemonSSLProxy(DaemonSSLProxy):
def authenticate(self, username, password):
@@ -78,24 +72,13 @@ class NoVersionSendingClient(Client):
self.disconnect_callback()
-class ClientTestCase(BaseTestCase, DaemonBase):
- def set_up(self):
- d = self.common_set_up()
- d.addCallback(self.start_core)
- d.addErrback(self.terminate_core)
- return d
-
- def tear_down(self):
- d = component.shutdown()
- d.addCallback(self.terminate_core)
- return d
-
+@pytest.mark.usefixtures('daemon', 'client')
+class TestClient:
def test_connect_no_credentials(self):
d = client.connect('localhost', self.listen_port, username='', password='')
def on_connect(result):
- self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN)
- self.addCleanup(client.disconnect)
+ assert client.get_auth_level() == AUTH_LEVEL_ADMIN
return result
d.addCallbacks(on_connect, self.fail)
@@ -108,8 +91,7 @@ class ClientTestCase(BaseTestCase, DaemonBase):
)
def on_connect(result):
- self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN)
- self.addCleanup(client.disconnect)
+ assert client.get_auth_level() == AUTH_LEVEL_ADMIN
return result
d.addCallbacks(on_connect, self.fail)
@@ -122,21 +104,18 @@ class ClientTestCase(BaseTestCase, DaemonBase):
)
def on_failure(failure):
- self.assertEqual(failure.trap(error.BadLoginError), error.BadLoginError)
- self.assertEqual(failure.value.message, 'Password does not match')
- self.addCleanup(client.disconnect)
+ assert failure.trap(error.BadLoginError) == error.BadLoginError
+ assert failure.value.message == 'Password does not match'
d.addCallbacks(self.fail, on_failure)
return d
def test_connect_invalid_user(self):
- username, password = get_localhost_auth()
d = client.connect('localhost', self.listen_port, username='invalid-user')
def on_failure(failure):
- self.assertEqual(failure.trap(error.BadLoginError), error.BadLoginError)
- self.assertEqual(failure.value.message, 'Username does not exist')
- self.addCleanup(client.disconnect)
+ assert failure.trap(error.BadLoginError) == error.BadLoginError
+ assert failure.value.message == 'Username does not exist'
d.addCallbacks(self.fail, on_failure)
return d
@@ -146,16 +125,16 @@ class ClientTestCase(BaseTestCase, DaemonBase):
d = client.connect('localhost', self.listen_port, username=username)
def on_failure(failure):
- self.assertEqual(
- failure.trap(error.AuthenticationRequired), error.AuthenticationRequired
+ assert (
+ failure.trap(error.AuthenticationRequired)
+ == error.AuthenticationRequired
)
- self.assertEqual(failure.value.username, username)
- self.addCleanup(client.disconnect)
+ assert failure.value.username == username
d.addCallbacks(self.fail, on_failure)
return d
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_connect_with_password(self):
username, password = get_localhost_auth()
yield client.connect(
@@ -166,19 +145,15 @@ class ClientTestCase(BaseTestCase, DaemonBase):
ret = yield client.connect(
'localhost', self.listen_port, username='testuser', password='testpw'
)
- self.assertEqual(ret, AUTH_LEVEL_NORMAL)
- yield
+ assert ret == AUTH_LEVEL_NORMAL
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_invalid_rpc_method_call(self):
yield client.connect('localhost', self.listen_port, username='', password='')
d = client.core.invalid_method()
def on_failure(failure):
- self.assertEqual(
- failure.trap(error.WrappedException), error.WrappedException
- )
- self.addCleanup(client.disconnect)
+ assert failure.trap(error.WrappedException) == error.WrappedException
d.addCallbacks(self.fail, on_failure)
yield d
@@ -191,10 +166,7 @@ class ClientTestCase(BaseTestCase, DaemonBase):
)
def on_failure(failure):
- self.assertEqual(
- failure.trap(error.IncompatibleClient), error.IncompatibleClient
- )
- self.addCleanup(no_version_sending_client.disconnect)
+ assert failure.trap(error.IncompatibleClient) == error.IncompatibleClient
d.addCallbacks(self.fail, on_failure)
return d
diff --git a/deluge/tests/test_common.py b/deluge/tests/test_common.py
index 4f6aa2fd4..780d368ef 100644
--- a/deluge/tests/test_common.py
+++ b/deluge/tests/test_common.py
@@ -1,17 +1,15 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import os
import sys
import tarfile
+from urllib.parse import quote_plus
-from twisted.trial import unittest
+import pytest
from deluge.common import (
VersionSplit,
@@ -22,8 +20,11 @@ from deluge.common import (
fsize,
fspeed,
ftime,
+ get_magnet_info,
get_path_size,
is_infohash,
+ is_interface,
+ is_interface_name,
is_ip,
is_ipv4,
is_ipv6,
@@ -31,113 +32,123 @@ from deluge.common import (
is_url,
windows_check,
)
-from deluge.i18n import setup_translation
-
-from .common import get_test_data_file, set_tmp_config_dir
-
-class CommonTestCase(unittest.TestCase):
- def setUp(self): # NOQA
- self.config_dir = set_tmp_config_dir()
- setup_translation()
+from .common import get_test_data_file
- def tearDown(self): # NOQA
- pass
+class TestCommon:
def test_fsize(self):
- self.assertEqual(fsize(0), '0 B')
- self.assertEqual(fsize(100), '100 B')
- self.assertEqual(fsize(1023), '1023 B')
- self.assertEqual(fsize(1024), '1.0 KiB')
- self.assertEqual(fsize(1048575), '1024.0 KiB')
- self.assertEqual(fsize(1048576), '1.0 MiB')
- self.assertEqual(fsize(1073741823), '1024.0 MiB')
- self.assertEqual(fsize(1073741824), '1.0 GiB')
- self.assertEqual(fsize(112245), '109.6 KiB')
- self.assertEqual(fsize(110723441824), '103.1 GiB')
- self.assertEqual(fsize(1099511627775), '1024.0 GiB')
- self.assertEqual(fsize(1099511627777), '1.0 TiB')
- self.assertEqual(fsize(766148267453245), '696.8 TiB')
+ assert fsize(0) == '0 B'
+ assert fsize(100) == '100 B'
+ assert fsize(1023) == '1023 B'
+ assert fsize(1024) == '1.0 KiB'
+ assert fsize(1048575) == '1024.0 KiB'
+ assert fsize(1048576) == '1.0 MiB'
+ assert fsize(1073741823) == '1024.0 MiB'
+ assert fsize(1073741824) == '1.0 GiB'
+ assert fsize(112245) == '109.6 KiB'
+ assert fsize(110723441824) == '103.1 GiB'
+ assert fsize(1099511627775) == '1024.0 GiB'
+ assert fsize(1099511627777) == '1.0 TiB'
+ assert fsize(766148267453245) == '696.8 TiB'
def test_fpcnt(self):
- self.assertTrue(fpcnt(0.9311) == '93.11%')
+ assert fpcnt(0.9311) == '93.11%'
def test_fspeed(self):
- self.assertTrue(fspeed(43134) == '42.1 KiB/s')
+ assert fspeed(43134) == '42.1 KiB/s'
def test_fpeer(self):
- self.assertTrue(fpeer(10, 20) == '10 (20)')
- self.assertTrue(fpeer(10, -1) == '10')
+ assert fpeer(10, 20) == '10 (20)'
+ assert fpeer(10, -1) == '10'
def test_ftime(self):
- self.assertEqual(ftime(0), '')
- self.assertEqual(ftime(5), '5s')
- self.assertEqual(ftime(100), '1m 40s')
- self.assertEqual(ftime(3789), '1h 3m')
- self.assertEqual(ftime(23011), '6h 23m')
- self.assertEqual(ftime(391187), '4d 12h')
- self.assertEqual(ftime(604800), '1w 0d')
- self.assertEqual(ftime(13893086), '22w 6d')
- self.assertEqual(ftime(59740269), '1y 46w')
- self.assertEqual(ftime(61.25), '1m 1s')
- self.assertEqual(ftime(119.9), '1m 59s')
+ assert ftime(0) == ''
+ assert ftime(5) == '5s'
+ assert ftime(100) == '1m 40s'
+ assert ftime(3789) == '1h 3m'
+ assert ftime(23011) == '6h 23m'
+ assert ftime(391187) == '4d 12h'
+ assert ftime(604800) == '1w 0d'
+ assert ftime(13893086) == '22w 6d'
+ assert ftime(59740269) == '1y 46w'
+ assert ftime(61.25) == '1m 1s'
+ assert ftime(119.9) == '1m 59s'
def test_fdate(self):
- self.assertTrue(fdate(-1) == '')
+ assert fdate(-1) == ''
def test_is_url(self):
- self.assertTrue(is_url('http://deluge-torrent.org'))
- self.assertFalse(is_url('file://test.torrent'))
+ assert is_url('http://deluge-torrent.org')
+ assert not is_url('file://test.torrent')
def test_is_magnet(self):
- self.assertTrue(
- is_magnet('magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN')
- )
- self.assertFalse(is_magnet(None))
+ assert is_magnet('magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN')
+ assert not is_magnet(None)
def test_is_infohash(self):
- self.assertTrue(is_infohash('2dc5d0e71a66fe69649a640d39cb00a259704973'))
+ assert is_infohash('2dc5d0e71a66fe69649a640d39cb00a259704973')
def test_get_path_size(self):
if windows_check() and sys.version_info < (3, 8):
# https://bugs.python.org/issue1311
- raise unittest.SkipTest('os.devnull returns False on Windows')
- self.assertTrue(get_path_size(os.devnull) == 0)
- self.assertTrue(get_path_size('non-existant.file') == -1)
+ pytest.skip('os.devnull returns False on Windows')
+ assert get_path_size(os.devnull) == 0
+ assert get_path_size('non-existant.file') == -1
def test_is_ip(self):
- self.assertTrue(is_ip('192.0.2.0'))
- self.assertFalse(is_ip('192..0.0'))
- self.assertTrue(is_ip('2001:db8::'))
- self.assertFalse(is_ip('2001:db8:'))
+ assert is_ip('192.0.2.0')
+ assert not is_ip('192..0.0')
+ assert is_ip('2001:db8::')
+ assert not is_ip('2001:db8:')
def test_is_ipv4(self):
- self.assertTrue(is_ipv4('192.0.2.0'))
- self.assertFalse(is_ipv4('192..0.0'))
+ assert is_ipv4('192.0.2.0')
+ assert not is_ipv4('192..0.0')
def test_is_ipv6(self):
- self.assertTrue(is_ipv6('2001:db8::'))
- self.assertFalse(is_ipv6('2001:db8:'))
+ assert is_ipv6('2001:db8::')
+ assert not is_ipv6('2001:db8:')
+
+ def test_is_interface_name(self):
+ if windows_check():
+ assert not is_interface_name('2001:db8:')
+ assert not is_interface_name('{THIS0000-IS00-ONLY-FOR0-TESTING00000}')
+ else:
+ assert is_interface_name('lo')
+ assert not is_interface_name('127.0.0.1')
+ assert not is_interface_name('eth01101')
+
+ def test_is_interface(self):
+ if windows_check():
+ assert is_interface('127.0.0.1')
+ assert not is_interface('127')
+ assert not is_interface('{THIS0000-IS00-ONLY-FOR0-TESTING00000}')
+ else:
+ assert is_interface('lo')
+ assert is_interface('127.0.0.1')
+ assert not is_interface('127.')
+ assert not is_interface('eth01101')
def test_version_split(self):
- self.assertTrue(VersionSplit('1.2.2') == VersionSplit('1.2.2'))
- self.assertTrue(VersionSplit('1.2.1') < VersionSplit('1.2.2'))
- self.assertTrue(VersionSplit('1.1.9') < VersionSplit('1.2.2'))
- self.assertTrue(VersionSplit('1.2.2') > VersionSplit('1.2.1'))
- self.assertTrue(VersionSplit('1.2.2') > VersionSplit('1.2.2-dev0'))
- self.assertTrue(VersionSplit('1.2.2-dev') < VersionSplit('1.3.0-rc2'))
- self.assertTrue(VersionSplit('1.2.2') > VersionSplit('1.2.2-rc2'))
- self.assertTrue(VersionSplit('1.2.2-rc2-dev') < VersionSplit('1.2.2-rc2'))
- self.assertTrue(VersionSplit('1.2.2-rc3') > VersionSplit('1.2.2-rc2'))
- self.assertTrue(VersionSplit('0.14.9') == VersionSplit('0.14.9'))
- self.assertTrue(VersionSplit('0.14.9') > VersionSplit('0.14.5'))
- self.assertTrue(VersionSplit('0.14.10') >= VersionSplit('0.14.9'))
- self.assertTrue(VersionSplit('1.4.0') > VersionSplit('1.3.900.dev123'))
- self.assertTrue(VersionSplit('1.3.2rc2.dev1') < VersionSplit('1.3.2-rc2'))
- self.assertTrue(VersionSplit('1.3.900.dev888') > VersionSplit('1.3.900.dev123'))
- self.assertTrue(VersionSplit('1.4.0') > VersionSplit('1.4.0.dev123'))
- self.assertTrue(VersionSplit('1.4.0.dev1') < VersionSplit('1.4.0'))
- self.assertTrue(VersionSplit('1.4.0a1') < VersionSplit('1.4.0'))
+ assert VersionSplit('1.2.2') == VersionSplit('1.2.2')
+ assert VersionSplit('1.2.1') < VersionSplit('1.2.2')
+ assert VersionSplit('1.1.9') < VersionSplit('1.2.2')
+ assert VersionSplit('1.2.2') > VersionSplit('1.2.1')
+ assert VersionSplit('1.2.2') > VersionSplit('1.2.2-dev0')
+ assert VersionSplit('1.2.2-dev') < VersionSplit('1.3.0-rc2')
+ assert VersionSplit('1.2.2') > VersionSplit('1.2.2-rc2')
+ assert VersionSplit('1.2.2-rc2-dev') < VersionSplit('1.2.2-rc2')
+ assert VersionSplit('1.2.2-rc3') > VersionSplit('1.2.2-rc2')
+ assert VersionSplit('0.14.9') == VersionSplit('0.14.9')
+ assert VersionSplit('0.14.9') > VersionSplit('0.14.5')
+ assert VersionSplit('0.14.10') >= VersionSplit('0.14.9')
+ assert VersionSplit('1.4.0') > VersionSplit('1.3.900.dev123')
+ assert VersionSplit('1.3.2rc2.dev1') < VersionSplit('1.3.2-rc2')
+ assert VersionSplit('1.3.900.dev888') > VersionSplit('1.3.900.dev123')
+ assert VersionSplit('1.4.0') > VersionSplit('1.4.0.dev123')
+ assert VersionSplit('1.4.0.dev1') < VersionSplit('1.4.0')
+ assert VersionSplit('1.4.0a1') < VersionSplit('1.4.0')
def test_parse_human_size(self):
from deluge.common import parse_human_size
@@ -150,17 +161,15 @@ class CommonTestCase(unittest.TestCase):
('1 MiB', 2 ** (10 * 2)),
('1 GiB', 2 ** (10 * 3)),
('1 GiB', 2 ** (10 * 3)),
- ('1M', 10 ** 6),
- ('1MB', 10 ** 6),
- ('1 GB', 10 ** 9),
- ('1 TB', 10 ** 12),
+ ('1M', 10**6),
+ ('1MB', 10**6),
+ ('1 GB', 10**9),
+ ('1 TB', 10**12),
]
for human_size, byte_size in sizes:
parsed = parse_human_size(human_size)
- self.assertEqual(
- parsed, byte_size, 'Mismatch when converting: %s' % human_size
- )
+ assert parsed == byte_size, 'Mismatch when converting: %s' % human_size
def test_archive_files(self):
arc_filelist = [
@@ -171,10 +180,10 @@ class CommonTestCase(unittest.TestCase):
with tarfile.open(arc_filepath, 'r') as tar:
for tar_info in tar:
- self.assertTrue(tar_info.isfile())
- self.assertTrue(
- tar_info.name in [os.path.basename(arcf) for arcf in arc_filelist]
- )
+ assert tar_info.isfile()
+ assert tar_info.name in [
+ os.path.basename(arcf) for arcf in arc_filelist
+ ]
def test_archive_files_missing(self):
"""Archive exists even with file not found."""
@@ -185,8 +194,8 @@ class CommonTestCase(unittest.TestCase):
filelist.remove('missing.file')
with tarfile.open(arc_filepath, 'r') as tar:
- self.assertEqual(tar.getnames(), filelist)
- self.assertTrue(all(tarinfo.isfile() for tarinfo in tar))
+ assert tar.getnames() == filelist
+ assert all(tarinfo.isfile() for tarinfo in tar)
def test_archive_files_message(self):
filelist = ['test.torrent', 'deluge.png']
@@ -196,9 +205,22 @@ class CommonTestCase(unittest.TestCase):
result_files = filelist + ['archive_message.txt']
with tarfile.open(arc_filepath, 'r') as tar:
- self.assertEqual(tar.getnames(), result_files)
+ assert tar.getnames() == result_files
for tar_info in tar:
- self.assertTrue(tar_info.isfile())
+ assert tar_info.isfile()
if tar_info.name == 'archive_message.txt':
result = tar.extractfile(tar_info).read().decode()
- self.assertEqual(result, 'test')
+ assert result == 'test'
+
+ def test_get_magnet_info_tiers(self):
+ tracker1 = 'udp://tracker1.example.com'
+ tracker2 = 'udp://tracker2.example.com'
+ magnet = (
+ 'magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN'
+ f'&tr.1={quote_plus(tracker1)}'
+ f'&tr.2={quote_plus(tracker2)}'
+ )
+ result = get_magnet_info(magnet)
+ assert result['info_hash'] == '953bad769164e8482c7785a21d12166f94b9e14d'
+ assert result['trackers'][tracker1] == 1
+ assert result['trackers'][tracker2] == 2
diff --git a/deluge/tests/test_component.py b/deluge/tests/test_component.py
index 26f24ad00..0345e24d3 100644
--- a/deluge/tests/test_component.py
+++ b/deluge/tests/test_component.py
@@ -1,19 +1,15 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
+import pytest
+import pytest_twisted
from twisted.internet import defer, threads
-from twisted.trial.unittest import SkipTest
import deluge.component as component
-from .basetest import BaseTestCase
-
class ComponentTester(component.Component):
def __init__(self, name, depend=None):
@@ -70,14 +66,15 @@ class ComponentTesterShutdown(component.Component):
self.stop_count += 1
-class ComponentTestClass(BaseTestCase):
+@pytest.mark.usefixtures('component')
+class TestComponent:
def tear_down(self):
return component.shutdown()
def test_start_component(self):
def on_start(result, c):
- self.assertEqual(c._component_state, 'Started')
- self.assertEqual(c.start_count, 1)
+ assert c._component_state == 'Started'
+ assert c.start_count == 1
c = ComponentTester('test_start_c1')
d = component.start(['test_start_c1'])
@@ -86,16 +83,16 @@ class ComponentTestClass(BaseTestCase):
def test_start_stop_depends(self):
def on_stop(result, c1, c2):
- self.assertEqual(c1._component_state, 'Stopped')
- self.assertEqual(c2._component_state, 'Stopped')
- self.assertEqual(c1.stop_count, 1)
- self.assertEqual(c2.stop_count, 1)
+ assert c1._component_state == 'Stopped'
+ assert c2._component_state == 'Stopped'
+ assert c1.stop_count == 1
+ assert c2.stop_count == 1
def on_start(result, c1, c2):
- self.assertEqual(c1._component_state, 'Started')
- self.assertEqual(c2._component_state, 'Started')
- self.assertEqual(c1.start_count, 1)
- self.assertEqual(c2.start_count, 1)
+ assert c1._component_state == 'Started'
+ assert c2._component_state == 'Started'
+ assert c1.start_count == 1
+ assert c2.start_count == 1
return component.stop(['test_start_depends_c1']).addCallback(
on_stop, c1, c2
)
@@ -126,8 +123,8 @@ class ComponentTestClass(BaseTestCase):
def test_start_all(self):
def on_start(*args):
for c in args[1:]:
- self.assertEqual(c._component_state, 'Started')
- self.assertEqual(c.start_count, 1)
+ assert c._component_state == 'Started'
+ assert c.start_count == 1
ret = self.start_with_depends()
ret[0].addCallback(on_start, *ret[1:])
@@ -136,20 +133,19 @@ class ComponentTestClass(BaseTestCase):
def test_register_exception(self):
ComponentTester('test_register_exception_c1')
- self.assertRaises(
- component.ComponentAlreadyRegistered,
- ComponentTester,
- 'test_register_exception_c1',
- )
+ with pytest.raises(component.ComponentAlreadyRegistered):
+ ComponentTester(
+ 'test_register_exception_c1',
+ )
def test_stop_component(self):
def on_stop(result, c):
- self.assertEqual(c._component_state, 'Stopped')
- self.assertFalse(c._component_timer.running)
- self.assertEqual(c.stop_count, 1)
+ assert c._component_state == 'Stopped'
+ assert not c._component_timer.running
+ assert c.stop_count == 1
def on_start(result, c):
- self.assertEqual(c._component_state, 'Started')
+ assert c._component_state == 'Started'
return component.stop(['test_stop_component_c1']).addCallback(on_stop, c)
c = ComponentTesterUpdate('test_stop_component_c1')
@@ -160,12 +156,12 @@ class ComponentTestClass(BaseTestCase):
def test_stop_all(self):
def on_stop(result, *args):
for c in args:
- self.assertEqual(c._component_state, 'Stopped')
- self.assertEqual(c.stop_count, 1)
+ assert c._component_state == 'Stopped'
+ assert c.stop_count == 1
def on_start(result, *args):
for c in args:
- self.assertEqual(c._component_state, 'Started')
+ assert c._component_state == 'Started'
return component.stop().addCallback(on_stop, *args)
ret = self.start_with_depends()
@@ -175,9 +171,9 @@ class ComponentTestClass(BaseTestCase):
def test_update(self):
def on_start(result, c1, counter):
- self.assertTrue(c1._component_timer)
- self.assertTrue(c1._component_timer.running)
- self.assertNotEqual(c1.counter, counter)
+ assert c1._component_timer
+ assert c1._component_timer.running
+ assert c1.counter != counter
return component.stop()
c1 = ComponentTesterUpdate('test_update_c1')
@@ -189,13 +185,13 @@ class ComponentTestClass(BaseTestCase):
def test_pause(self):
def on_pause(result, c1, counter):
- self.assertEqual(c1._component_state, 'Paused')
- self.assertNotEqual(c1.counter, counter)
- self.assertFalse(c1._component_timer.running)
+ assert c1._component_state == 'Paused'
+ assert c1.counter != counter
+ assert not c1._component_timer.running
def on_start(result, c1, counter):
- self.assertTrue(c1._component_timer)
- self.assertNotEqual(c1.counter, counter)
+ assert c1._component_timer
+ assert c1.counter != counter
d = component.pause(['test_pause_c1'])
d.addCallback(on_pause, c1, counter)
return d
@@ -207,23 +203,16 @@ class ComponentTestClass(BaseTestCase):
d.addCallback(on_start, c1, cnt)
return d
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_component_start_error(self):
ComponentTesterUpdate('test_pause_c1')
yield component.start(['test_pause_c1'])
yield component.pause(['test_pause_c1'])
test_comp = component.get('test_pause_c1')
- try:
- result = self.failureResultOf(test_comp._component_start())
- except AttributeError:
- raise SkipTest(
- 'This test requires trial failureResultOf() in Twisted version >= 13'
- )
- self.assertEqual(
- result.check(component.ComponentException), component.ComponentException
- )
+ with pytest.raises(component.ComponentException, match='Current state: Paused'):
+ yield test_comp._component_start()
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_start_paused_error(self):
ComponentTesterUpdate('test_pause_c1')
yield component.start(['test_pause_c1'])
@@ -232,29 +221,26 @@ class ComponentTestClass(BaseTestCase):
# Deferreds that fail in component have to error handler which results in
# twisted doing a log.err call which causes the test to fail.
# Prevent failure by ignoring the exception
- self._observer._ignoreErrors(component.ComponentException)
+ # self._observer._ignoreErrors(component.ComponentException)
result = yield component.start()
- self.assertEqual(
- [(result[0][0], result[0][1].value)],
- [
- (
- defer.FAILURE,
- component.ComponentException(
- 'Trying to start component "%s" but it is '
- 'not in a stopped state. Current state: %s'
- % ('test_pause_c1', 'Paused'),
- '',
- ),
- )
- ],
- )
+ assert [(result[0][0], result[0][1].value)] == [
+ (
+ defer.FAILURE,
+ component.ComponentException(
+ 'Trying to start component "%s" but it is '
+ 'not in a stopped state. Current state: %s'
+ % ('test_pause_c1', 'Paused'),
+ '',
+ ),
+ )
+ ]
def test_shutdown(self):
def on_shutdown(result, c1):
- self.assertTrue(c1.shutdowned)
- self.assertEqual(c1._component_state, 'Stopped')
- self.assertEqual(c1.stop_count, 1)
+ assert c1.shutdowned
+ assert c1._component_state == 'Stopped'
+ assert c1.stop_count == 1
def on_start(result, c1):
d = component.shutdown()
diff --git a/deluge/tests/test_config.py b/deluge/tests/test_config.py
index 0f6df3bb5..2840dbf5b 100644
--- a/deluge/tests/test_config.py
+++ b/deluge/tests/test_config.py
@@ -1,23 +1,21 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
+import json
+import logging
import os
from codecs import getwriter
+import pytest
+import pytest_twisted
from twisted.internet import task
-from twisted.trial import unittest
-import deluge.config
from deluge.common import JSON_FORMAT
from deluge.config import Config
-
-from .common import set_tmp_config_dir
+from deluge.ui.hostlist import mask_hosts_password
DEFAULTS = {
'string': 'foobar',
@@ -26,37 +24,42 @@ DEFAULTS = {
'bool': True,
'unicode': 'foobar',
'password': 'abc123*\\[!]?/<>#{@}=|"+$%(^)~',
+ 'hosts': [
+ ('host1', 'port', '', 'password1234'),
+ ('host2', 'port', '', 'password5678'),
+ ],
}
-class ConfigTestCase(unittest.TestCase):
- def setUp(self): # NOQA: N803
- self.config_dir = set_tmp_config_dir()
+LOGGER = logging.getLogger(__name__)
+
+class TestConfig:
def test_init(self):
config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
- self.assertEqual(DEFAULTS, config.config)
+ assert DEFAULTS == config.config
config = Config('test.conf', config_dir=self.config_dir)
- self.assertEqual({}, config.config)
+ assert {} == config.config
def test_set_get_item(self):
config = Config('test.conf', config_dir=self.config_dir)
config['foo'] = 1
- self.assertEqual(config['foo'], 1)
- self.assertRaises(ValueError, config.set_item, 'foo', 'bar')
+ assert config['foo'] == 1
+ with pytest.raises(ValueError):
+ config.set_item('foo', 'bar')
config['foo'] = 2
- self.assertEqual(config.get_item('foo'), 2)
+ assert config.get_item('foo') == 2
config['foo'] = '3'
- self.assertEqual(config.get_item('foo'), 3)
+ assert config.get_item('foo') == 3
config['unicode'] = 'ВИДЕОФИЛЬМЫ'
- self.assertEqual(config['unicode'], 'ВИДЕОФИЛЬМЫ')
+ assert config['unicode'] == 'ВИДЕОФИЛЬМЫ'
config['unicode'] = b'foostring'
- self.assertFalse(isinstance(config.get_item('unicode'), bytes))
+ assert not isinstance(config.get_item('unicode'), bytes)
config._save_timer.cancel()
@@ -64,43 +67,103 @@ class ConfigTestCase(unittest.TestCase):
config = Config('test.conf', config_dir=self.config_dir)
config['foo'] = None
- self.assertIsNone(config['foo'])
- self.assertIsInstance(config['foo'], type(None))
+ assert config['foo'] is None
+ assert isinstance(config['foo'], type(None))
config['foo'] = 1
- self.assertEqual(config.get('foo'), 1)
+ assert config.get('foo') == 1
config['foo'] = None
- self.assertIsNone(config['foo'])
+ assert config['foo'] is None
config['bar'] = None
- self.assertIsNone(config['bar'])
+ assert config['bar'] is None
config['bar'] = None
- self.assertIsNone(config['bar'])
+ assert config['bar'] is None
config._save_timer.cancel()
+ @pytest_twisted.ensureDeferred
+ async def test_on_changed_callback(self, mock_callback):
+ config = Config('test.conf', config_dir=self.config_dir)
+ config.register_change_callback(mock_callback)
+ config['foo'] = 1
+ assert config['foo'] == 1
+ await mock_callback.deferred
+ mock_callback.assert_called_once_with('foo', 1)
+
+ @pytest_twisted.ensureDeferred
+ async def test_key_function_callback(self, mock_callback):
+ config = Config(
+ 'test.conf', defaults={'foo': 1, 'bar': 1}, config_dir=self.config_dir
+ )
+
+ assert config['foo'] == 1
+ config.register_set_function('foo', mock_callback)
+ await mock_callback.deferred
+ mock_callback.assert_called_once_with('foo', 1)
+
+ mock_callback.reset_mock()
+ config.register_set_function('bar', mock_callback, apply_now=False)
+ mock_callback.assert_not_called()
+ config['bar'] = 2
+ await mock_callback.deferred
+ mock_callback.assert_called_once_with('bar', 2)
+
def test_get(self):
config = Config('test.conf', config_dir=self.config_dir)
config['foo'] = 1
- self.assertEqual(config.get('foo'), 1)
- self.assertEqual(config.get('foobar'), None)
- self.assertEqual(config.get('foobar', 2), 2)
+ assert config.get('foo') == 1
+ assert config.get('foobar') is None
+ assert config.get('foobar', 2) == 2
config['foobar'] = 5
- self.assertEqual(config.get('foobar', 2), 5)
+ assert config.get('foobar', 2) == 5
+
+ def test_set_log_mask_funcs(self, caplog):
+ """Test mask func masks key in log"""
+ caplog.set_level(logging.DEBUG)
+ config = Config(
+ 'test.conf',
+ config_dir=self.config_dir,
+ log_mask_funcs={'hosts': mask_hosts_password},
+ )
+ config['hosts'] = DEFAULTS['hosts']
+ assert isinstance(config['hosts'], list)
+ assert 'host1' in caplog.text
+ assert 'host2' in caplog.text
+ assert 'password1234' not in caplog.text
+ assert 'password5678' not in caplog.text
+ assert '*' * 10 in caplog.text
+
+ def test_load_log_mask_funcs(self, caplog):
+ """Test mask func masks key in log"""
+ with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file:
+ json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT)
+
+ config = Config(
+ 'test.conf',
+ config_dir=self.config_dir,
+ log_mask_funcs={'hosts': mask_hosts_password},
+ )
+ with caplog.at_level(logging.DEBUG):
+ config.load(os.path.join(self.config_dir, 'test.conf'))
+ assert 'host1' in caplog.text
+ assert 'host2' in caplog.text
+ assert 'foobar' in caplog.text
+ assert 'password1234' not in caplog.text
+ assert 'password5678' not in caplog.text
+ assert '*' * 10 in caplog.text
def test_load(self):
def check_config():
config = Config('test.conf', config_dir=self.config_dir)
- self.assertEqual(config['string'], 'foobar')
- self.assertEqual(config['float'], 0.435)
- self.assertEqual(config['password'], 'abc123*\\[!]?/<>#{@}=|"+$%(^)~')
+ assert config['string'] == 'foobar'
+ assert config['float'] == 0.435
+ assert config['password'] == 'abc123*\\[!]?/<>#{@}=|"+$%(^)~'
# Test opening a previous 1.2 config file of just a json object
- import json
-
with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file:
json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT)
@@ -128,38 +191,38 @@ class ConfigTestCase(unittest.TestCase):
# We do this twice because the first time we need to save the file to disk
# and the second time we do a compare and we should not write
ret = config.save()
- self.assertTrue(ret)
+ assert ret
ret = config.save()
- self.assertTrue(ret)
+ assert ret
config['string'] = 'baz'
config['int'] = 2
ret = config.save()
- self.assertTrue(ret)
+ assert ret
del config
config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
- self.assertEqual(config['string'], 'baz')
- self.assertEqual(config['int'], 2)
+ assert config['string'] == 'baz'
+ assert config['int'] == 2
def test_save_timer(self):
- self.clock = task.Clock()
- deluge.config.callLater = self.clock.callLater
+ clock = task.Clock()
config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
+ config.callLater = clock.callLater
config['string'] = 'baz'
config['int'] = 2
- self.assertTrue(config._save_timer.active())
+ assert config._save_timer.active()
# Timeout set for 5 seconds in config, so lets move clock by 5 seconds
- self.clock.advance(5)
+ clock.advance(5)
def check_config(config):
- self.assertTrue(not config._save_timer.active())
+ assert not config._save_timer.active()
del config
config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir)
- self.assertEqual(config['string'], 'baz')
- self.assertEqual(config['int'], 2)
+ assert config['string'] == 'baz'
+ assert config['int'] == 2
check_config(config)
@@ -176,7 +239,7 @@ class ConfigTestCase(unittest.TestCase):
from deluge.config import find_json_objects
objects = find_json_objects(s)
- self.assertEqual(len(objects), 2)
+ assert len(objects) == 2
def test_find_json_objects_curly_brace(self):
"""Test with string containing curly brace"""
@@ -193,7 +256,7 @@ class ConfigTestCase(unittest.TestCase):
from deluge.config import find_json_objects
objects = find_json_objects(s)
- self.assertEqual(len(objects), 2)
+ assert len(objects) == 2
def test_find_json_objects_double_quote(self):
"""Test with string containing double quote"""
@@ -211,4 +274,4 @@ class ConfigTestCase(unittest.TestCase):
from deluge.config import find_json_objects
objects = find_json_objects(s)
- self.assertEqual(len(objects), 2)
+ assert len(objects) == 2
diff --git a/deluge/tests/test_core.py b/deluge/tests/test_core.py
index 15fbc1bcf..6a3fb9506 100644
--- a/deluge/tests/test_core.py
+++ b/deluge/tests/test_core.py
@@ -1,20 +1,17 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
+import os
from base64 import b64encode
from hashlib import sha1 as sha
import pytest
-from six import integer_types
+import pytest_twisted
from twisted.internet import defer, reactor, task
from twisted.internet.error import CannotListenError
-from twisted.python.failure import Failure
from twisted.web.http import FORBIDDEN
from twisted.web.resource import EncodingResourceWrapper, Resource
from twisted.web.server import GzipEncoderFactory, Site
@@ -24,12 +21,12 @@ import deluge.common
import deluge.component as component
import deluge.core.torrent
from deluge._libtorrent import lt
+from deluge.conftest import BaseTestCase
from deluge.core.core import Core
from deluge.core.rpcserver import RPCServer
from deluge.error import AddTorrentError, InvalidTorrentError
from . import common
-from .basetest import BaseTestCase
common.disable_new_release_check()
@@ -80,14 +77,13 @@ class TopLevelResource(Resource):
)
-class CoreTestCase(BaseTestCase):
+class TestCore(BaseTestCase):
def set_up(self):
- common.set_tmp_config_dir()
self.rpcserver = RPCServer(listen=False)
- self.core = Core()
+ self.core: Core = Core()
self.core.config.config['lsd'] = False
self.clock = task.Clock()
- self.core.torrentmanager.callLater = self.clock.callLater
+ self.core.torrentmanager.clock = self.clock
self.listen_port = 51242
return component.start().addCallback(self.start_web_server)
@@ -131,7 +127,7 @@ class CoreTestCase(BaseTestCase):
torrent_id = self.core.add_torrent_file(filename, filedump, options)
return torrent_id
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_add_torrent_files(self):
options = {}
filenames = ['test.torrent', 'test_torrent.file.torrent']
@@ -142,9 +138,9 @@ class CoreTestCase(BaseTestCase):
filedump = b64encode(_file.read())
files_to_add.append((filename, filedump, options))
errors = yield self.core.add_torrent_files(files_to_add)
- self.assertEqual(len(errors), 0)
+ assert len(errors) == 0
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_add_torrent_files_error_duplicate(self):
options = {}
filenames = ['test.torrent', 'test.torrent']
@@ -155,10 +151,10 @@ class CoreTestCase(BaseTestCase):
filedump = b64encode(_file.read())
files_to_add.append((filename, filedump, options))
errors = yield self.core.add_torrent_files(files_to_add)
- self.assertEqual(len(errors), 1)
- self.assertTrue(str(errors[0]).startswith('Torrent already in session'))
+ assert len(errors) == 1
+ assert str(errors[0]).startswith('Torrent already in session')
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_add_torrent_file(self):
options = {}
filename = common.get_test_data_file('test.torrent')
@@ -171,17 +167,16 @@ class CoreTestCase(BaseTestCase):
with open(filename, 'rb') as _file:
info_hash = sha(bencode(bdecode(_file.read())[b'info'])).hexdigest()
- self.assertEqual(torrent_id, info_hash)
+ assert torrent_id == info_hash
def test_add_torrent_file_invalid_filedump(self):
options = {}
filename = common.get_test_data_file('test.torrent')
- self.assertRaises(
- AddTorrentError, self.core.add_torrent_file, filename, False, options
- )
+ with pytest.raises(AddTorrentError):
+ self.core.add_torrent_file(filename, False, options)
- @defer.inlineCallbacks
- def test_add_torrent_url(self):
+ @pytest_twisted.inlineCallbacks
+ def test_add_torrent_url(self, mock_mkstemp):
url = (
'http://localhost:%d/ubuntu-9.04-desktop-i386.iso.torrent'
% self.listen_port
@@ -190,78 +185,83 @@ class CoreTestCase(BaseTestCase):
info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
torrent_id = yield self.core.add_torrent_url(url, options)
- self.assertEqual(torrent_id, info_hash)
+ assert torrent_id == info_hash
+ assert not os.path.isfile(mock_mkstemp[1])
- def test_add_torrent_url_with_cookie(self):
+ @pytest_twisted.ensureDeferred
+ async def test_add_torrent_url_with_cookie(self):
url = 'http://localhost:%d/cookie' % self.listen_port
options = {}
headers = {'Cookie': 'password=deluge'}
info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
- d = self.core.add_torrent_url(url, options)
- d.addCallbacks(self.fail, self.assertIsInstance, errbackArgs=(Failure,))
+ with pytest.raises(Exception):
+ await self.core.add_torrent_url(url, options)
- d = self.core.add_torrent_url(url, options, headers)
- d.addCallbacks(self.assertEqual, self.fail, callbackArgs=(info_hash,))
-
- return d
+ result = await self.core.add_torrent_url(url, options, headers)
+ assert result == info_hash
- def test_add_torrent_url_with_redirect(self):
+ @pytest_twisted.ensureDeferred
+ async def test_add_torrent_url_with_redirect(self):
url = 'http://localhost:%d/redirect' % self.listen_port
options = {}
info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
- d = self.core.add_torrent_url(url, options)
- d.addCallback(self.assertEqual, info_hash)
- return d
+ result = await self.core.add_torrent_url(url, options)
+ assert result == info_hash
- def test_add_torrent_url_with_partial_download(self):
+ @pytest_twisted.ensureDeferred
+ async def test_add_torrent_url_with_partial_download(self):
url = 'http://localhost:%d/partial' % self.listen_port
options = {}
info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
- d = self.core.add_torrent_url(url, options)
- d.addCallback(self.assertEqual, info_hash)
- return d
+ result = await self.core.add_torrent_url(url, options)
+ assert result == info_hash
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_add_torrent_magnet(self):
info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00'
- uri = deluge.common.create_magnet_uri(info_hash)
+ tracker = 'udp://tracker.example.com'
+ name = 'test magnet'
+ uri = deluge.common.create_magnet_uri(info_hash, name=name, trackers=[tracker])
options = {}
torrent_id = yield self.core.add_torrent_magnet(uri, options)
- self.assertEqual(torrent_id, info_hash)
+ assert torrent_id == info_hash
+ torrent_status = self.core.get_torrent_status(torrent_id, ['name', 'trackers'])
+ assert torrent_status['trackers'][0]['url'] == tracker
+ assert torrent_status['name'] == name
def test_resume_torrent(self):
tid1 = self.add_torrent('test.torrent', paused=True)
tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
# Assert paused
r1 = self.core.get_torrent_status(tid1, ['paused'])
- self.assertTrue(r1['paused'])
+ assert r1['paused']
r2 = self.core.get_torrent_status(tid2, ['paused'])
- self.assertTrue(r2['paused'])
+ assert r2['paused']
self.core.resume_torrent(tid2)
r1 = self.core.get_torrent_status(tid1, ['paused'])
- self.assertTrue(r1['paused'])
+ assert r1['paused']
r2 = self.core.get_torrent_status(tid2, ['paused'])
- self.assertFalse(r2['paused'])
+ assert not r2['paused']
def test_resume_torrent_list(self):
"""Backward compatibility for list of torrent_ids."""
torrent_id = self.add_torrent('test.torrent', paused=True)
self.core.resume_torrent([torrent_id])
result = self.core.get_torrent_status(torrent_id, ['paused'])
- self.assertFalse(result['paused'])
+ assert not result['paused']
def test_resume_torrents(self):
tid1 = self.add_torrent('test.torrent', paused=True)
tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
self.core.resume_torrents([tid1, tid2])
r1 = self.core.get_torrent_status(tid1, ['paused'])
- self.assertFalse(r1['paused'])
+ assert not r1['paused']
r2 = self.core.get_torrent_status(tid2, ['paused'])
- self.assertFalse(r2['paused'])
+ assert not r2['paused']
def test_resume_torrents_all(self):
"""With no torrent_ids param, resume all torrents"""
@@ -269,33 +269,33 @@ class CoreTestCase(BaseTestCase):
tid2 = self.add_torrent('test_torrent.file.torrent', paused=True)
self.core.resume_torrents()
r1 = self.core.get_torrent_status(tid1, ['paused'])
- self.assertFalse(r1['paused'])
+ assert not r1['paused']
r2 = self.core.get_torrent_status(tid2, ['paused'])
- self.assertFalse(r2['paused'])
+ assert not r2['paused']
def test_pause_torrent(self):
tid1 = self.add_torrent('test.torrent')
tid2 = self.add_torrent('test_torrent.file.torrent')
# Assert not paused
r1 = self.core.get_torrent_status(tid1, ['paused'])
- self.assertFalse(r1['paused'])
+ assert not r1['paused']
r2 = self.core.get_torrent_status(tid2, ['paused'])
- self.assertFalse(r2['paused'])
+ assert not r2['paused']
self.core.pause_torrent(tid2)
r1 = self.core.get_torrent_status(tid1, ['paused'])
- self.assertFalse(r1['paused'])
+ assert not r1['paused']
r2 = self.core.get_torrent_status(tid2, ['paused'])
- self.assertTrue(r2['paused'])
+ assert r2['paused']
def test_pause_torrent_list(self):
"""Backward compatibility for list of torrent_ids."""
torrent_id = self.add_torrent('test.torrent')
result = self.core.get_torrent_status(torrent_id, ['paused'])
- self.assertFalse(result['paused'])
+ assert not result['paused']
self.core.pause_torrent([torrent_id])
result = self.core.get_torrent_status(torrent_id, ['paused'])
- self.assertTrue(result['paused'])
+ assert result['paused']
def test_pause_torrents(self):
tid1 = self.add_torrent('test.torrent')
@@ -303,9 +303,9 @@ class CoreTestCase(BaseTestCase):
self.core.pause_torrents([tid1, tid2])
r1 = self.core.get_torrent_status(tid1, ['paused'])
- self.assertTrue(r1['paused'])
+ assert r1['paused']
r2 = self.core.get_torrent_status(tid2, ['paused'])
- self.assertTrue(r2['paused'])
+ assert r2['paused']
def test_pause_torrents_all(self):
"""With no torrent_ids param, pause all torrents"""
@@ -314,26 +314,24 @@ class CoreTestCase(BaseTestCase):
self.core.pause_torrents()
r1 = self.core.get_torrent_status(tid1, ['paused'])
- self.assertTrue(r1['paused'])
+ assert r1['paused']
r2 = self.core.get_torrent_status(tid2, ['paused'])
- self.assertTrue(r2['paused'])
+ assert r2['paused']
+ @pytest_twisted.inlineCallbacks
def test_prefetch_metadata_existing(self):
"""Check another call with same magnet returns existing deferred."""
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
- expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', None)
-
- def on_result(result):
- self.assertEqual(result, expected)
+ expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'')
- d = self.core.prefetch_magnet_metadata(magnet)
- d.addCallback(on_result)
+ d1 = self.core.prefetch_magnet_metadata(magnet)
d2 = self.core.prefetch_magnet_metadata(magnet)
- d2.addCallback(on_result)
+ dg = defer.gatherResults([d1, d2], consumeErrors=True)
self.clock.advance(30)
- return defer.DeferredList([d, d2])
+ result = yield dg
+ assert result == [expected] * 2
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_remove_torrent(self):
options = {}
filename = common.get_test_data_file('test.torrent')
@@ -341,18 +339,17 @@ class CoreTestCase(BaseTestCase):
filedump = b64encode(_file.read())
torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options)
removed = self.core.remove_torrent(torrent_id, True)
- self.assertTrue(removed)
- self.assertEqual(len(self.core.get_session_state()), 0)
+ assert removed
+ assert len(self.core.get_session_state()) == 0
def test_remove_torrent_invalid(self):
- self.assertRaises(
- InvalidTorrentError,
- self.core.remove_torrent,
- 'torrentidthatdoesntexist',
- True,
- )
+ with pytest.raises(InvalidTorrentError):
+ self.core.remove_torrent(
+ 'torrentidthatdoesntexist',
+ True,
+ )
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_remove_torrents(self):
options = {}
filename = common.get_test_data_file('test.torrent')
@@ -369,17 +366,17 @@ class CoreTestCase(BaseTestCase):
d = self.core.remove_torrents([torrent_id, torrent_id2], True)
def test_ret(val):
- self.assertTrue(val == [])
+ assert val == []
d.addCallback(test_ret)
def test_session_state(val):
- self.assertEqual(len(self.core.get_session_state()), 0)
+ assert len(self.core.get_session_state()) == 0
d.addCallback(test_session_state)
yield d
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_remove_torrents_invalid(self):
options = {}
filename = common.get_test_data_file('test.torrent')
@@ -391,58 +388,53 @@ class CoreTestCase(BaseTestCase):
val = yield self.core.remove_torrents(
['invalidid1', 'invalidid2', torrent_id], False
)
- self.assertEqual(len(val), 2)
- self.assertEqual(
- val[0], ('invalidid1', 'torrent_id invalidid1 not in session.')
- )
- self.assertEqual(
- val[1], ('invalidid2', 'torrent_id invalidid2 not in session.')
- )
+ assert len(val) == 2
+ assert val[0] == ('invalidid1', 'torrent_id invalidid1 not in session.')
+ assert val[1] == ('invalidid2', 'torrent_id invalidid2 not in session.')
def test_get_session_status(self):
status = self.core.get_session_status(
['net.recv_tracker_bytes', 'net.sent_tracker_bytes']
)
- self.assertIsInstance(status, dict)
- self.assertEqual(status['net.recv_tracker_bytes'], 0)
- self.assertEqual(status['net.sent_tracker_bytes'], 0)
+ assert isinstance(status, dict)
+ assert status['net.recv_tracker_bytes'] == 0
+ assert status['net.sent_tracker_bytes'] == 0
def test_get_session_status_all(self):
status = self.core.get_session_status([])
- self.assertIsInstance(status, dict)
- self.assertIn('upload_rate', status)
- self.assertIn('net.recv_bytes', status)
+ assert isinstance(status, dict)
+ assert 'upload_rate' in status
+ assert 'net.recv_bytes' in status
def test_get_session_status_depr(self):
status = self.core.get_session_status(['num_peers', 'num_unchoked'])
- self.assertIsInstance(status, dict)
- self.assertEqual(status['num_peers'], 0)
- self.assertEqual(status['num_unchoked'], 0)
+ assert isinstance(status, dict)
+ assert status['num_peers'] == 0
+ assert status['num_unchoked'] == 0
def test_get_session_status_rates(self):
status = self.core.get_session_status(['upload_rate', 'download_rate'])
- self.assertIsInstance(status, dict)
- self.assertEqual(status['upload_rate'], 0)
+ assert isinstance(status, dict)
+ assert status['upload_rate'] == 0
def test_get_session_status_ratio(self):
status = self.core.get_session_status(['write_hit_ratio', 'read_hit_ratio'])
- self.assertIsInstance(status, dict)
- self.assertEqual(status['write_hit_ratio'], 0.0)
- self.assertEqual(status['read_hit_ratio'], 0.0)
+ assert isinstance(status, dict)
+ assert status['write_hit_ratio'] == 0.0
+ assert status['read_hit_ratio'] == 0.0
def test_get_free_space(self):
space = self.core.get_free_space('.')
- # get_free_space returns long on Python 2 (32-bit).
- self.assertTrue(isinstance(space, integer_types))
- self.assertTrue(space >= 0)
- self.assertEqual(self.core.get_free_space('/someinvalidpath'), -1)
+ assert isinstance(space, int)
+ assert space >= 0
+ assert self.core.get_free_space('/someinvalidpath') == -1
@pytest.mark.slow
def test_test_listen_port(self):
d = self.core.test_listen_port()
def result(r):
- self.assertTrue(r in (True, False))
+ assert r in (True, False)
d.addCallback(result)
return d
@@ -460,24 +452,22 @@ class CoreTestCase(BaseTestCase):
}
for key in pathlist:
- self.assertEqual(
- deluge.core.torrent.sanitize_filepath(key, folder=False), pathlist[key]
+ assert (
+ deluge.core.torrent.sanitize_filepath(key, folder=False)
+ == pathlist[key]
)
- self.assertEqual(
- deluge.core.torrent.sanitize_filepath(key, folder=True),
- pathlist[key] + '/',
+
+ assert (
+ deluge.core.torrent.sanitize_filepath(key, folder=True)
+ == pathlist[key] + '/'
)
def test_get_set_config_values(self):
- self.assertEqual(
- self.core.get_config_values(['abc', 'foo']), {'foo': None, 'abc': None}
- )
- self.assertEqual(self.core.get_config_value('foobar'), None)
+ assert self.core.get_config_values(['abc', 'foo']) == {'foo': None, 'abc': None}
+ assert self.core.get_config_value('foobar') is None
self.core.set_config({'abc': 'def', 'foo': 10, 'foobar': 'barfoo'})
- self.assertEqual(
- self.core.get_config_values(['foo', 'abc']), {'foo': 10, 'abc': 'def'}
- )
- self.assertEqual(self.core.get_config_value('foobar'), 'barfoo')
+ assert self.core.get_config_values(['foo', 'abc']) == {'foo': 10, 'abc': 'def'}
+ assert self.core.get_config_value('foobar') == 'barfoo'
def test_read_only_config_keys(self):
key = 'max_upload_speed'
@@ -486,13 +476,13 @@ class CoreTestCase(BaseTestCase):
old_value = self.core.get_config_value(key)
self.core.set_config({key: old_value + 10})
new_value = self.core.get_config_value(key)
- self.assertEqual(old_value, new_value)
+ assert old_value == new_value
self.core.read_only_config_keys = None
def test__create_peer_id(self):
- self.assertEqual(self.core._create_peer_id('2.0.0'), '-DE200s-')
- self.assertEqual(self.core._create_peer_id('2.0.0.dev15'), '-DE200D-')
- self.assertEqual(self.core._create_peer_id('2.0.1rc1'), '-DE201r-')
- self.assertEqual(self.core._create_peer_id('2.11.0b2'), '-DE2B0b-')
- self.assertEqual(self.core._create_peer_id('2.4.12b2.dev3'), '-DE24CD-')
+ assert self.core._create_peer_id('2.0.0') == '-DE200s-'
+ assert self.core._create_peer_id('2.0.0.dev15') == '-DE200D-'
+ assert self.core._create_peer_id('2.0.1rc1') == '-DE201r-'
+ assert self.core._create_peer_id('2.11.0b2') == '-DE2B0b-'
+ assert self.core._create_peer_id('2.4.12b2.dev3') == '-DE24CD-'
diff --git a/deluge/tests/test_decorators.py b/deluge/tests/test_decorators.py
index 7d4bd98c8..d2ecd1a2b 100644
--- a/deluge/tests/test_decorators.py
+++ b/deluge/tests/test_decorators.py
@@ -1,18 +1,14 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
-from twisted.trial import unittest
from deluge.decorators import proxy
-class DecoratorsTestCase(unittest.TestCase):
+class TestDecorators:
def test_proxy_with_simple_functions(self):
def negate(func, *args, **kwargs):
return not func(*args, **kwargs)
@@ -26,16 +22,16 @@ class DecoratorsTestCase(unittest.TestCase):
def double_nothing(_bool):
return _bool
- self.assertTrue(something(False))
- self.assertFalse(something(True))
- self.assertTrue(double_nothing(True))
- self.assertFalse(double_nothing(False))
+ assert something(False)
+ assert not something(True)
+ assert double_nothing(True)
+ assert not double_nothing(False)
def test_proxy_with_class_method(self):
def negate(func, *args, **kwargs):
return -func(*args, **kwargs)
- class Test(object):
+ class Test:
def __init__(self, number):
self.number = number
@@ -48,5 +44,5 @@ class DecoratorsTestCase(unittest.TestCase):
return self.diff(number)
t = Test(5)
- self.assertEqual(t.diff(1), -4)
- self.assertEqual(t.no_diff(1), 4)
+ assert t.diff(1) == -4
+ assert t.no_diff(1) == 4
diff --git a/deluge/tests/test_error.py b/deluge/tests/test_error.py
index c552e9422..a87d6a2d8 100644
--- a/deluge/tests/test_error.py
+++ b/deluge/tests/test_error.py
@@ -1,54 +1,39 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
-from twisted.trial import unittest
-
import deluge.error
-class ErrorTestCase(unittest.TestCase):
- def setUp(self): # NOQA: N803
- pass
-
- def tearDown(self): # NOQA: N803
- pass
-
+class TestError:
def test_deluge_error(self):
msg = 'Some message'
e = deluge.error.DelugeError(msg)
- self.assertEqual(str(e), msg)
+ assert str(e) == msg
from twisted.internet.defer import DebugInfo
del DebugInfo.__del__ # Hides all errors
- self.assertEqual(e._args, (msg,))
- self.assertEqual(e._kwargs, {})
+ assert e._args == (msg,)
+ assert e._kwargs == {}
def test_incompatible_client(self):
version = '1.3.6'
e = deluge.error.IncompatibleClient(version)
- self.assertEqual(
- str(e),
- 'Your deluge client is not compatible with the daemon. \
-Please upgrade your client to %s'
- % version,
+ assert (
+ str(e) == 'Your deluge client is not compatible with the daemon. '
+ 'Please upgrade your client to %s' % version
)
def test_not_authorized_error(self):
current_level = 5
required_level = 10
e = deluge.error.NotAuthorizedError(current_level, required_level)
- self.assertEqual(
- str(e), 'Auth level too low: %d < %d' % (current_level, required_level)
- )
+ assert str(e) == 'Auth level too low: %d < %d' % (current_level, required_level)
def test_bad_login_error(self):
message = 'Login failed'
username = 'deluge'
e = deluge.error.BadLoginError(message, username)
- self.assertEqual(str(e), message)
+ assert str(e) == message
diff --git a/deluge/tests/test_files_tab.py b/deluge/tests/test_files_tab.py
index 1ec8e18de..1e97cbbc3 100644
--- a/deluge/tests/test_files_tab.py
+++ b/deluge/tests/test_files_tab.py
@@ -1,23 +1,16 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import print_function, unicode_literals
-
import pytest
-from twisted.trial import unittest
import deluge.component as component
-from deluge.common import windows_check
from deluge.configmanager import ConfigManager
+from deluge.conftest import BaseTestCase
from deluge.i18n import setup_translation
-from . import common
-from .basetest import BaseTestCase
-
libs_available = True
# Allow running other tests without GTKUI dependencies available
try:
@@ -32,12 +25,11 @@ setup_translation()
@pytest.mark.gtkui
-class FilesTabTestCase(BaseTestCase):
+class TestFilesTab(BaseTestCase):
def set_up(self):
if libs_available is False:
- raise unittest.SkipTest('GTKUI dependencies not available')
+ pytest.skip('GTKUI dependencies not available')
- common.set_tmp_config_dir()
ConfigManager('gtk3ui.conf', defaults=DEFAULT_PREFS)
self.mainwindow = MainWindow()
self.filestab = FilesTab()
@@ -52,8 +44,8 @@ class FilesTabTestCase(BaseTestCase):
root = treestore.get_iter_first()
level = 1
- def p_level(s, l):
- print('%s%s' % (' ' * l, s))
+ def p_level(s, lvl):
+ print('{}{}'.format(' ' * lvl, s))
def _print_treestore_children(i, lvl):
while i:
@@ -98,80 +90,74 @@ class FilesTabTestCase(BaseTestCase):
)
if not ret:
self.print_treestore('Treestore not expected:', self.filestab.treestore)
- self.assertTrue(ret)
+ assert ret
def test_files_tab2(self):
- if windows_check():
- raise unittest.SkipTest('on windows \\ != / for path names')
self.filestab.files_list[self.t_id] = (
- {'index': 0, 'path': '1/1/test_10.txt', 'offset': 0, 'size': 13},
- {'index': 1, 'path': 'test_100.txt', 'offset': 13, 'size': 14},
+ {'index': 0, 'path': '1/1/test_100.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': 'test_101.txt', 'offset': 13, 'size': 14},
)
self.filestab.update_files()
self.filestab._on_torrentfilerenamed_event(
- self.t_id, self.index, '1/1/test_100.txt'
+ self.t_id, self.index, '1/1/test_101.txt'
)
ret = self.verify_treestore(
self.filestab.treestore,
- [['1/', [['1/', [['test_100.txt'], ['test_10.txt']]]]]],
+ [['1/', [['1/', [['test_100.txt'], ['test_101.txt']]]]]],
)
if not ret:
self.print_treestore('Treestore not expected:', self.filestab.treestore)
- self.assertTrue(ret)
+ assert ret
def test_files_tab3(self):
- if windows_check():
- raise unittest.SkipTest('on windows \\ != / for path names')
self.filestab.files_list[self.t_id] = (
- {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13},
- {'index': 1, 'path': 'test_100.txt', 'offset': 13, 'size': 14},
+ {'index': 0, 'path': '1/test_100.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': 'test_101.txt', 'offset': 13, 'size': 14},
)
self.filestab.update_files()
self.filestab._on_torrentfilerenamed_event(
- self.t_id, self.index, '1/test_100.txt'
+ self.t_id, self.index, '1/test_101.txt'
)
ret = self.verify_treestore(
- self.filestab.treestore, [['1/', [['test_100.txt'], ['test_10.txt']]]]
+ self.filestab.treestore, [['1/', [['test_100.txt'], ['test_101.txt']]]]
)
if not ret:
self.print_treestore('Treestore not expected:', self.filestab.treestore)
- self.assertTrue(ret)
+ assert ret
def test_files_tab4(self):
self.filestab.files_list[self.t_id] = (
- {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13},
- {'index': 1, 'path': '1/test_100.txt', 'offset': 13, 'size': 14},
+ {'index': 0, 'path': '1/test_100.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': '1/test_101.txt', 'offset': 13, 'size': 14},
)
self.filestab.update_files()
self.filestab._on_torrentfilerenamed_event(
- self.t_id, self.index, '1/2/test_100.txt'
+ self.t_id, self.index, '1/2/test_101.txt'
)
ret = self.verify_treestore(
self.filestab.treestore,
- [['1/', [['2/', [['test_100.txt']]], ['test_10.txt']]]],
+ [['1/', [['2/', [['test_101.txt']]], ['test_100.txt']]]],
)
if not ret:
self.print_treestore('Treestore not expected:', self.filestab.treestore)
- self.assertTrue(ret)
+ assert ret
def test_files_tab5(self):
- if windows_check():
- raise unittest.SkipTest('on windows \\ != / for path names')
self.filestab.files_list[self.t_id] = (
- {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13},
- {'index': 1, 'path': '2/test_100.txt', 'offset': 13, 'size': 14},
+ {'index': 0, 'path': '1/test_100.txt', 'offset': 0, 'size': 13},
+ {'index': 1, 'path': '2/test_101.txt', 'offset': 13, 'size': 14},
)
self.filestab.update_files()
self.filestab._on_torrentfilerenamed_event(
- self.t_id, self.index, '1/test_100.txt'
+ self.t_id, self.index, '1/test_101.txt'
)
ret = self.verify_treestore(
- self.filestab.treestore, [['1/', [['test_100.txt'], ['test_10.txt']]]]
+ self.filestab.treestore, [['1/', [['test_100.txt'], ['test_101.txt']]]]
)
if not ret:
self.print_treestore('Treestore not expected:', self.filestab.treestore)
- self.assertTrue(ret)
+ assert ret
diff --git a/deluge/tests/test_httpdownloader.py b/deluge/tests/test_httpdownloader.py
index ad947a422..8c491b68a 100644
--- a/deluge/tests/test_httpdownloader.py
+++ b/deluge/tests/test_httpdownloader.py
@@ -1,22 +1,18 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import os
import tempfile
from email.utils import formatdate
-from io import open
+import pytest
+import pytest_twisted
from twisted.internet import reactor
from twisted.internet.error import CannotListenError
-from twisted.python.failure import Failure
-from twisted.trial import unittest
-from twisted.web.error import PageRedirect
+from twisted.web.error import Error, PageRedirect
from twisted.web.http import NOT_MODIFIED
from twisted.web.resource import EncodingResourceWrapper, Resource
from twisted.web.server import GzipEncoderFactory, Site
@@ -71,7 +67,7 @@ class TorrentResource(Resource):
content_type += b'; charset=' + charset
request.setHeader(b'Content-Type', content_type)
request.setHeader(b'Content-Disposition', b'attachment; filename=test.torrent')
- return 'Binary attachment ignore charset 世丕且\n'.encode('utf8')
+ return 'Binary attachment ignore charset 世丕且\n'.encode()
class CookieResource(Resource):
@@ -138,11 +134,13 @@ class TopLevelResource(Resource):
return b'<h1>Deluge HTTP Downloader tests webserver here</h1>'
-class DownloadFileTestCase(unittest.TestCase):
+class TestDownloadFile:
def get_url(self, path=''):
return 'http://localhost:%d/%s' % (self.listen_port, path)
- def setUp(self): # NOQA
+ @pytest_twisted.async_yield_fixture(autouse=True)
+ async def setUp(self, request): # NOQA
+ self = request.instance
setup_logger('warning', fname('log_file'))
self.website = Site(TopLevelResource())
self.listen_port = 51242
@@ -158,140 +156,136 @@ class DownloadFileTestCase(unittest.TestCase):
else:
raise error
- def tearDown(self): # NOQA
- return self.webserver.stopListening()
+ yield
+
+ await self.webserver.stopListening()
- def assertContains(self, filename, contents): # NOQA
- with open(filename, 'r', encoding='utf8') as _file:
+ def assert_contains(self, filename, contents):
+ with open(filename, encoding='utf8') as _file:
try:
- self.assertEqual(_file.read(), contents)
+ assert _file.read() == contents
except Exception as ex:
- self.fail(ex)
+ pytest.fail(ex)
return filename
- def assertNotContains(self, filename, contents, file_mode=''): # NOQA
- with open(filename, 'r', encoding='utf8') as _file:
+ def assert_not_contains(self, filename, contents, file_mode=''):
+ with open(filename, encoding='utf8') as _file:
try:
- self.assertNotEqual(_file.read(), contents)
+ assert _file.read() != contents
except Exception as ex:
- self.fail(ex)
+ pytest.fail(ex)
return filename
- def test_download(self):
- d = download_file(self.get_url(), fname('index.html'))
- d.addCallback(self.assertEqual, fname('index.html'))
- return d
+ @pytest_twisted.ensureDeferred
+ async def test_download(self):
+ filename = await download_file(self.get_url(), fname('index.html'))
+ assert filename == fname('index.html')
- def test_download_without_required_cookies(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_without_required_cookies(self):
url = self.get_url('cookie')
- d = download_file(url, fname('none'))
- d.addCallback(self.fail)
- d.addErrback(self.assertIsInstance, Failure)
- return d
+ filename = await download_file(url, fname('none'))
+ self.assert_contains(filename, 'Password cookie not set!')
- def test_download_with_required_cookies(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_required_cookies(self):
url = self.get_url('cookie')
cookie = {'cookie': 'password=deluge'}
- d = download_file(url, fname('monster'), headers=cookie)
- d.addCallback(self.assertEqual, fname('monster'))
- d.addCallback(self.assertContains, 'COOKIE MONSTER!')
- return d
+ filename = await download_file(url, fname('monster'), headers=cookie)
+ assert filename == fname('monster')
+ self.assert_contains(filename, 'COOKIE MONSTER!')
- def test_download_with_rename(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_rename(self):
url = self.get_url('rename?filename=renamed')
- d = download_file(url, fname('original'))
- d.addCallback(self.assertEqual, fname('renamed'))
- d.addCallback(self.assertContains, 'This file should be called renamed')
- return d
+ filename = await download_file(url, fname('original'))
+ assert filename == fname('renamed')
+ self.assert_contains(filename, 'This file should be called renamed')
- def test_download_with_rename_exists(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_rename_exists(self):
open(fname('renamed'), 'w').close()
url = self.get_url('rename?filename=renamed')
- d = download_file(url, fname('original'))
- d.addCallback(self.assertEqual, fname('renamed-1'))
- d.addCallback(self.assertContains, 'This file should be called renamed')
- return d
+ filename = await download_file(url, fname('original'))
+ assert filename == fname('renamed-1')
+ self.assert_contains(filename, 'This file should be called renamed')
- def test_download_with_rename_sanitised(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_rename_sanitised(self):
url = self.get_url('rename?filename=/etc/passwd')
- d = download_file(url, fname('original'))
- d.addCallback(self.assertEqual, fname('passwd'))
- d.addCallback(self.assertContains, 'This file should be called /etc/passwd')
- return d
+ filename = await download_file(url, fname('original'))
+ assert filename == fname('passwd')
+ self.assert_contains(filename, 'This file should be called /etc/passwd')
- def test_download_with_attachment_no_filename(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_attachment_no_filename(self):
url = self.get_url('attachment')
- d = download_file(url, fname('original'))
- d.addCallback(self.assertEqual, fname('original'))
- d.addCallback(self.assertContains, 'Attachment with no filename set')
- return d
+ filename = await download_file(url, fname('original'))
+ assert filename == fname('original')
+ self.assert_contains(filename, 'Attachment with no filename set')
- def test_download_with_rename_prevented(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_rename_prevented(self):
url = self.get_url('rename?filename=spam')
- d = download_file(url, fname('forced'), force_filename=True)
- d.addCallback(self.assertEqual, fname('forced'))
- d.addCallback(self.assertContains, 'This file should be called spam')
- return d
+ filename = await download_file(url, fname('forced'), force_filename=True)
+ assert filename == fname('forced')
+ self.assert_contains(filename, 'This file should be called spam')
- def test_download_with_gzip_encoding(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_gzip_encoding(self):
url = self.get_url('gzip?msg=success')
- d = download_file(url, fname('gzip_encoded'))
- d.addCallback(self.assertContains, 'success')
- return d
+ filename = await download_file(url, fname('gzip_encoded'))
+ self.assert_contains(filename, 'success')
- def test_download_with_gzip_encoding_disabled(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_with_gzip_encoding_disabled(self):
url = self.get_url('gzip?msg=unzip')
- d = download_file(url, fname('gzip_encoded'), allow_compression=False)
- d.addCallback(self.assertContains, 'unzip')
- return d
+ filename = await download_file(
+ url, fname('gzip_encoded'), allow_compression=False
+ )
+ self.assert_contains(filename, 'unzip')
- def test_page_redirect_unhandled(self):
+ @pytest_twisted.ensureDeferred
+ async def test_page_redirect_unhandled(self):
url = self.get_url('redirect')
- d = download_file(url, fname('none'))
- d.addCallback(self.fail)
+ with pytest.raises(PageRedirect):
+ await download_file(url, fname('none'), handle_redirects=False)
- def on_redirect(failure):
- self.assertTrue(type(failure), PageRedirect)
+ @pytest_twisted.ensureDeferred
+ async def test_page_redirect(self):
+ url = self.get_url('redirect')
+ filename = await download_file(url, fname('none'), handle_redirects=True)
+ assert filename == fname('none')
- d.addErrback(on_redirect)
- return d
+ @pytest_twisted.ensureDeferred
+ async def test_page_not_found(self):
+ with pytest.raises(Error):
+ await download_file(self.get_url('page/not/found'), fname('none'))
- def test_page_redirect(self):
- url = self.get_url('redirect')
- d = download_file(url, fname('none'), handle_redirects=True)
- d.addCallback(self.assertEqual, fname('none'))
- d.addErrback(self.fail)
- return d
-
- def test_page_not_found(self):
- d = download_file(self.get_url('page/not/found'), fname('none'))
- d.addCallback(self.fail)
- d.addErrback(self.assertIsInstance, Failure)
- return d
-
- def test_page_not_modified(self):
+ @pytest.mark.xfail(reason="Doesn't seem like httpdownloader ever implemented this.")
+ @pytest_twisted.ensureDeferred
+ async def test_page_not_modified(self):
headers = {'If-Modified-Since': formatdate(usegmt=True)}
- d = download_file(self.get_url(), fname('index.html'), headers=headers)
- d.addCallback(self.fail)
- d.addErrback(self.assertIsInstance, Failure)
- return d
+ with pytest.raises(Error) as exc_info:
+ await download_file(self.get_url(), fname('index.html'), headers=headers)
+ assert exc_info.value.status == NOT_MODIFIED
- def test_download_text_reencode_charset(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_text_reencode_charset(self):
"""Re-encode as UTF-8 specified charset for text content-type header"""
url = self.get_url('attachment')
filepath = fname('test.txt')
headers = {'content-charset': 'Windows-1251', 'content-append': 'бвгде'}
- d = download_file(url, filepath, headers=headers)
- d.addCallback(self.assertEqual, filepath)
- d.addCallback(self.assertContains, 'Attachment with no filename setбвгде')
- return d
+ filename = await download_file(url, filepath, headers=headers)
+ assert filename == filepath
+ self.assert_contains(filename, 'Attachment with no filename setбвгде')
- def test_download_binary_ignore_charset(self):
+ @pytest_twisted.ensureDeferred
+ async def test_download_binary_ignore_charset(self):
"""Ignore charset for binary content-type header e.g. torrent files"""
url = self.get_url('torrent')
headers = {'content-charset': 'Windows-1251'}
filepath = fname('test.torrent')
- d = download_file(url, fname('test.torrent'), headers=headers)
- d.addCallback(self.assertEqual, filepath)
- d.addCallback(self.assertContains, 'Binary attachment ignore charset 世丕且\n')
- return d
+ filename = await download_file(url, fname('test.torrent'), headers=headers)
+ assert filename == filepath
+ self.assert_contains(filename, 'Binary attachment ignore charset 世丕且\n')
diff --git a/deluge/tests/test_json_api.py b/deluge/tests/test_json_api.py
index 1da64bf97..41efb0206 100644
--- a/deluge/tests/test_json_api.py
+++ b/deluge/tests/test_json_api.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
#
@@ -7,64 +6,35 @@
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import json as json_lib
+from unittest.mock import MagicMock
-from mock import MagicMock
-from twisted.internet import defer
+import pytest
+import pytest_twisted
from twisted.web import server
from twisted.web.http import Request
import deluge.common
-import deluge.component as component
import deluge.ui.web.auth
import deluge.ui.web.json_api
from deluge.error import DelugeError
-from deluge.ui.client import client
from deluge.ui.web.auth import Auth
from deluge.ui.web.json_api import JSON, JSONException
from . import common
-from .basetest import BaseTestCase
from .common_web import WebServerMockBase
-from .daemon_base import DaemonBase
common.disable_new_release_check()
-class JSONBase(BaseTestCase, DaemonBase):
- def connect_client(self, *args, **kwargs):
- return client.connect(
- 'localhost',
- self.listen_port,
- username=kwargs.get('user', ''),
- password=kwargs.get('password', ''),
- )
-
- def disconnect_client(self, *args):
- return client.disconnect()
-
- def tear_down(self):
- d = component.shutdown()
- d.addCallback(self.disconnect_client)
- d.addCallback(self.terminate_core)
- return d
-
-
-class JSONTestCase(JSONBase):
- def set_up(self):
- d = self.common_set_up()
- d.addCallback(self.start_core)
- d.addCallbacks(self.connect_client, self.terminate_core)
- return d
-
- @defer.inlineCallbacks
- def test_get_remote_methods(self):
+@pytest.mark.usefixtures('daemon', 'client', 'component')
+class TestJSON:
+ @pytest_twisted.ensureDeferred
+ async def test_get_remote_methods(self):
json = JSON()
- methods = yield json.get_remote_methods()
- self.assertEqual(type(methods), tuple)
- self.assertTrue(len(methods) > 0)
+ methods = await json.get_remote_methods()
+ assert type(methods) == tuple
+ assert len(methods) > 0
def test_render_fail_disconnected(self):
json = JSON()
@@ -72,7 +42,7 @@ class JSONTestCase(JSONBase):
request.method = b'POST'
request._disconnected = True
# When disconnected, returns empty string
- self.assertEqual(json.render(request), '')
+ assert json.render(request) == ''
def test_render_fail(self):
json = JSON()
@@ -82,19 +52,17 @@ class JSONTestCase(JSONBase):
def write(response_str):
request.write_was_called = True
response = json_lib.loads(response_str.decode())
- self.assertEqual(response['result'], None)
- self.assertEqual(response['id'], None)
- self.assertEqual(
- response['error']['message'], 'JSONException: JSON not decodable'
- )
- self.assertEqual(response['error']['code'], 5)
+ assert response['result'] is None
+ assert response['id'] is None
+ assert response['error']['message'] == 'JSONException: JSON not decodable'
+ assert response['error']['code'] == 5
request.write = write
request.write_was_called = False
request._disconnected = False
request.getHeader.return_value = b'application/json'
- self.assertEqual(json.render(request), server.NOT_DONE_YET)
- self.assertTrue(request.write_was_called)
+ assert json.render(request) == server.NOT_DONE_YET
+ assert request.write_was_called
def test_handle_request_invalid_method(self):
json = JSON()
@@ -102,20 +70,23 @@ class JSONTestCase(JSONBase):
json_data = {'method': 'no-existing-module.test', 'id': 0, 'params': []}
request.json = json_lib.dumps(json_data).encode()
request_id, result, error = json._handle_request(request)
- self.assertEqual(error, {'message': 'Unknown method', 'code': 2})
+ assert error == {'message': 'Unknown method', 'code': 2}
def test_handle_request_invalid_json_request(self):
json = JSON()
request = MagicMock()
json_data = {'id': 0, 'params': []}
request.json = json_lib.dumps(json_data).encode()
- self.assertRaises(JSONException, json._handle_request, request)
+ with pytest.raises(JSONException):
+ json._handle_request(request)
json_data = {'method': 'some.method', 'params': []}
request.json = json_lib.dumps(json_data).encode()
- self.assertRaises(JSONException, json._handle_request, request)
+ with pytest.raises(JSONException):
+ json._handle_request(request)
json_data = {'method': 'some.method', 'id': 0}
request.json = json_lib.dumps(json_data).encode()
- self.assertRaises(JSONException, json._handle_request, request)
+ with pytest.raises(JSONException):
+ json._handle_request(request)
def test_on_json_request_invalid_content_type(self):
"""Test for exception with content type not application/json"""
@@ -124,18 +95,32 @@ class JSONTestCase(JSONBase):
request.getHeader.return_value = b'text/plain'
json_data = {'method': 'some.method', 'id': 0, 'params': []}
request.json = json_lib.dumps(json_data).encode()
- self.assertRaises(JSONException, json._on_json_request, request)
+ with pytest.raises(JSONException):
+ json._on_json_request(request)
+ def test_on_json_request_valid_content_type(self):
+ """Ensure content-type application/json is accepted"""
+ json = JSON()
+ request = MagicMock()
+ request.getHeader.return_value = b'application/json'
+ json_data = {'method': 'some.method', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ json._on_json_request(request)
-class JSONCustomUserTestCase(JSONBase):
- def set_up(self):
- d = self.common_set_up()
- d.addCallback(self.start_core)
- return d
+ def test_on_json_request_valid_content_type_with_charset(self):
+ """Ensure content-type parameters such as charset are ignored"""
+ json = JSON()
+ request = MagicMock()
+ request.getHeader.return_value = b'application/json;charset=utf-8'
+ json_data = {'method': 'some.method', 'id': 0, 'params': []}
+ request.json = json_lib.dumps(json_data).encode()
+ json._on_json_request(request)
- @defer.inlineCallbacks
+
+@pytest.mark.usefixtures('daemon', 'client', 'component')
+class TestJSONCustomUserTestCase:
+ @pytest_twisted.inlineCallbacks
def test_handle_request_auth_error(self):
- yield self.connect_client()
json = JSON()
auth_conf = {'session_timeout': 10, 'sessions': {}}
Auth(auth_conf) # Must create the component
@@ -148,13 +133,12 @@ class JSONCustomUserTestCase(JSONBase):
json_data = {'method': 'core.get_libtorrent_version', 'id': 0, 'params': []}
request.json = json_lib.dumps(json_data).encode()
request_id, result, error = json._handle_request(request)
- self.assertEqual(error, {'message': 'Not authenticated', 'code': 1})
+ assert error == {'message': 'Not authenticated', 'code': 1}
-class RPCRaiseDelugeErrorJSONTestCase(JSONBase):
- def set_up(self):
- d = self.common_set_up()
- custom_script = """
+@pytest.mark.usefixtures('daemon', 'client', 'component')
+class TestRPCRaiseDelugeErrorJSON:
+ daemon_custom_script = """
from deluge.error import DelugeError
from deluge.core.rpcserver import export
class TestClass(object):
@@ -165,12 +149,9 @@ class RPCRaiseDelugeErrorJSONTestCase(JSONBase):
test = TestClass()
daemon.rpcserver.register_object(test)
"""
- d.addCallback(self.start_core, custom_script=custom_script)
- d.addCallbacks(self.connect_client, self.terminate_core)
- return d
- @defer.inlineCallbacks
- def test_handle_request_method_raise_delugeerror(self):
+ @pytest_twisted.ensureDeferred
+ async def test_handle_request_method_raise_delugeerror(self):
json = JSON()
def get_session_id(s_id):
@@ -182,9 +163,9 @@ class RPCRaiseDelugeErrorJSONTestCase(JSONBase):
request = Request(MagicMock(), False)
request.base = b''
auth._create_session(request)
- methods = yield json.get_remote_methods()
+ methods = await json.get_remote_methods()
# Verify the function has been registered
- self.assertTrue('testclass.test' in methods)
+ assert 'testclass.test' in methods
request = MagicMock()
session_id = list(auth.config['sessions'])[0]
@@ -192,18 +173,13 @@ class RPCRaiseDelugeErrorJSONTestCase(JSONBase):
json_data = {'method': 'testclass.test', 'id': 0, 'params': []}
request.json = json_lib.dumps(json_data).encode()
request_id, result, error = json._handle_request(request)
- result.addCallback(self.fail)
-
- def on_error(error):
- self.assertEqual(error.type, DelugeError)
-
- result.addErrback(on_error)
- yield result
+ with pytest.raises(DelugeError):
+ await result
-class JSONRequestFailedTestCase(JSONBase, WebServerMockBase):
- def set_up(self):
- d = self.common_set_up()
+class TestJSONRequestFailed(WebServerMockBase):
+ @pytest_twisted.async_yield_fixture(autouse=True)
+ async def set_up(self, config_dir):
custom_script = """
from deluge.error import DelugeError
from deluge.core.rpcserver import export
@@ -234,28 +210,29 @@ class JSONRequestFailedTestCase(JSONBase, WebServerMockBase):
}
def on_test_raise(*args):
- self.assertTrue('Unhandled error in Deferred:' in self.core.stderr_out)
- self.assertTrue('in test_raise_error' in self.core.stderr_out)
+ assert 'Unhandled error in Deferred:' in self.core.stderr_out
+ assert 'in test_raise_error' in self.core.stderr_out
extra_callback['deferred'].addCallback(on_test_raise)
- d.addCallback(
- self.start_core,
+ d, daemon = common.start_core(
custom_script=custom_script,
print_stdout=False,
print_stderr=False,
timeout=5,
extra_callbacks=[extra_callback],
+ config_directory=config_dir,
)
- d.addCallbacks(self.connect_client, self.terminate_core)
- return d
+ await d
+ yield
+ await daemon.kill()
- @defer.inlineCallbacks
- def test_render_on_rpc_request_failed(self):
+ @pytest_twisted.inlineCallbacks
+ def test_render_on_rpc_request_failed(self, component, client):
json = JSON()
methods = yield json.get_remote_methods()
# Verify the function has been registered
- self.assertTrue('testclass.test' in methods)
+ assert 'testclass.test' in methods
request = MagicMock()
@@ -266,14 +243,14 @@ class JSONRequestFailedTestCase(JSONBase, WebServerMockBase):
def write(response_str):
request.write_was_called = True
response = json_lib.loads(response_str.decode())
- self.assertEqual(response['result'], None, 'BAD RESULT')
- self.assertEqual(response['id'], 0)
- self.assertEqual(
- response['error']['message'],
- 'Failure: [Failure instance: Traceback (failure with no frames):'
- " <class 'deluge.error.DelugeError'>: DelugeERROR\n]",
+ assert response['result'] is None, 'BAD RESULT'
+ assert response['id'] == 0
+ assert (
+ response['error']['message']
+ == 'Failure: [Failure instance: Traceback (failure with no frames):'
+ " <class 'deluge.error.DelugeError'>: DelugeERROR\n]"
)
- self.assertEqual(response['error']['code'], 4)
+ assert response['error']['code'] == 4
request.write = write
request.write_was_called = False
@@ -284,8 +261,8 @@ class JSONRequestFailedTestCase(JSONBase, WebServerMockBase):
d = json._on_json_request(request)
def on_success(arg):
- self.assertEqual(arg, server.NOT_DONE_YET)
+ assert arg == server.NOT_DONE_YET
return True
- d.addCallbacks(on_success, self.fail)
+ d.addCallbacks(on_success, pytest.fail)
yield d
diff --git a/deluge/tests/test_log.py b/deluge/tests/test_log.py
index 572693b7c..f0dcbee86 100644
--- a/deluge/tests/test_log.py
+++ b/deluge/tests/test_log.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 Calum Lind <calumlind@gmail.com>
# Copyright (C) 2010 Pedro Algarvio <ufs@ufsoft.org>
@@ -8,17 +7,14 @@
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import logging
import warnings
+from deluge.conftest import BaseTestCase
from deluge.log import setup_logger
-from .basetest import BaseTestCase
-
-class LogTestCase(BaseTestCase):
+class TestLog(BaseTestCase):
def set_up(self):
setup_logger(logging.DEBUG)
@@ -32,7 +28,7 @@ class LogTestCase(BaseTestCase):
# Cause all warnings to always be triggered.
warnings.simplefilter('always')
LOG.debug('foo')
- self.assertEqual(w[-1].category, DeprecationWarning)
+ assert w[-1].category == DeprecationWarning
# def test_twisted_error_log(self):
# from twisted.internet import defer
diff --git a/deluge/tests/test_maketorrent.py b/deluge/tests/test_maketorrent.py
index 4e0099653..a2e473f00 100644
--- a/deluge/tests/test_maketorrent.py
+++ b/deluge/tests/test_maketorrent.py
@@ -1,19 +1,13 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import os
import tempfile
-from twisted.trial import unittest
-
from deluge import maketorrent
-from deluge.common import windows_check
def check_torrent(filename):
@@ -28,7 +22,7 @@ def check_torrent(filename):
TorrentInfo(filename)
-class MakeTorrentTestCase(unittest.TestCase):
+class TestMakeTorrent:
def test_save_multifile(self):
# Create a temporary folder for torrent creation
tmp_path = tempfile.mkdtemp()
@@ -54,21 +48,16 @@ class MakeTorrentTestCase(unittest.TestCase):
os.remove(tmp_file)
def test_save_singlefile(self):
- if windows_check():
- raise unittest.SkipTest('on windows file not released')
- tmp_data = tempfile.mkstemp('testdata')[1]
- with open(tmp_data, 'wb') as _file:
- _file.write(b'a' * (2314 * 1024))
- t = maketorrent.TorrentMetadata()
- t.data_path = tmp_data
- tmp_fd, tmp_file = tempfile.mkstemp('.torrent')
- t.save(tmp_file)
-
- check_torrent(tmp_file)
-
- os.remove(tmp_data)
- os.close(tmp_fd)
- os.remove(tmp_file)
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ tmp_data = tmp_dir + '/data'
+ with open(tmp_data, 'wb') as _file:
+ _file.write(b'a' * (2314 * 1024))
+ t = maketorrent.TorrentMetadata()
+ t.data_path = tmp_data
+ tmp_file = tmp_dir + '/.torrent'
+ t.save(tmp_file)
+
+ check_torrent(tmp_file)
def test_save_multifile_padded(self):
# Create a temporary folder for torrent creation
diff --git a/deluge/tests/test_maybe_coroutine.py b/deluge/tests/test_maybe_coroutine.py
new file mode 100644
index 000000000..2717e78bb
--- /dev/null
+++ b/deluge/tests/test_maybe_coroutine.py
@@ -0,0 +1,213 @@
+#
+# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
+# the additional special exception to link portions of this program with the OpenSSL library.
+# See LICENSE for more details.
+#
+import pytest
+import pytest_twisted
+import twisted.python.failure
+from twisted.internet import defer, reactor, task
+from twisted.internet.defer import maybeDeferred
+
+from deluge.decorators import maybe_coroutine
+
+
+@defer.inlineCallbacks
+def inline_func():
+ result = yield task.deferLater(reactor, 0, lambda: 'function_result')
+ return result
+
+
+@defer.inlineCallbacks
+def inline_error():
+ raise Exception('function_error')
+ yield
+
+
+@maybe_coroutine
+async def coro_func():
+ result = await task.deferLater(reactor, 0, lambda: 'function_result')
+ return result
+
+
+@maybe_coroutine
+async def coro_error():
+ raise Exception('function_error')
+
+
+@defer.inlineCallbacks
+def coro_func_from_inline():
+ result = yield coro_func()
+ return result
+
+
+@defer.inlineCallbacks
+def coro_error_from_inline():
+ result = yield coro_error()
+ return result
+
+
+@maybe_coroutine
+async def coro_func_from_coro():
+ return await coro_func()
+
+
+@maybe_coroutine
+async def coro_error_from_coro():
+ return await coro_error()
+
+
+@maybe_coroutine
+async def inline_func_from_coro():
+ return await inline_func()
+
+
+@maybe_coroutine
+async def inline_error_from_coro():
+ return await inline_error()
+
+
+@pytest_twisted.inlineCallbacks
+def test_standard_twisted():
+ """Sanity check that twisted tests work how we expect.
+
+ Not really testing deluge code at all.
+ """
+ result = yield inline_func()
+ assert result == 'function_result'
+
+ with pytest.raises(Exception, match='function_error'):
+ yield inline_error()
+
+
+@pytest.mark.parametrize(
+ 'function',
+ [
+ inline_func,
+ coro_func,
+ coro_func_from_coro,
+ coro_func_from_inline,
+ inline_func_from_coro,
+ ],
+)
+@pytest_twisted.inlineCallbacks
+def test_from_inline(function):
+ """Test our coroutines wrapped with maybe_coroutine as if they returned plain twisted deferreds."""
+ result = yield function()
+ assert result == 'function_result'
+
+ def cb(result):
+ assert result == 'function_result'
+
+ d = function()
+ d.addCallback(cb)
+ yield d
+
+
+@pytest.mark.parametrize(
+ 'function',
+ [
+ inline_error,
+ coro_error,
+ coro_error_from_coro,
+ coro_error_from_inline,
+ inline_error_from_coro,
+ ],
+)
+@pytest_twisted.inlineCallbacks
+def test_error_from_inline(function):
+ """Test our coroutines wrapped with maybe_coroutine as if they returned plain twisted deferreds that raise."""
+ with pytest.raises(Exception, match='function_error'):
+ yield function()
+
+ def eb(result):
+ assert isinstance(result, twisted.python.failure.Failure)
+ assert result.getErrorMessage() == 'function_error'
+
+ d = function()
+ d.addErrback(eb)
+ yield d
+
+
+@pytest.mark.parametrize(
+ 'function',
+ [
+ inline_func,
+ coro_func,
+ coro_func_from_coro,
+ coro_func_from_inline,
+ inline_func_from_coro,
+ ],
+)
+@pytest_twisted.ensureDeferred
+async def test_from_coro(function):
+ """Test our coroutines wrapped with maybe_coroutine work from another coroutine."""
+ result = await function()
+ assert result == 'function_result'
+
+
+@pytest.mark.parametrize(
+ 'function',
+ [
+ inline_error,
+ coro_error,
+ coro_error_from_coro,
+ coro_error_from_inline,
+ inline_error_from_coro,
+ ],
+)
+@pytest_twisted.ensureDeferred
+async def test_error_from_coro(function):
+ """Test our coroutines wrapped with maybe_coroutine work from another coroutine with errors."""
+ with pytest.raises(Exception, match='function_error'):
+ await function()
+
+
+@pytest_twisted.ensureDeferred
+async def test_tracebacks_preserved():
+ with pytest.raises(Exception) as exc:
+ await coro_error_from_coro()
+ traceback_lines = [
+ 'await coro_error_from_coro()',
+ 'return await coro_error()',
+ "raise Exception('function_error')",
+ ]
+ # If each coroutine got wrapped with ensureDeferred, the traceback will be mangled
+ # verify the coroutines passed through by checking the traceback.
+ for expected, actual in zip(traceback_lines, exc.traceback):
+ assert expected in str(actual)
+
+
+@pytest_twisted.ensureDeferred
+async def test_maybe_deferred_coroutine():
+ result = await maybeDeferred(coro_func)
+ assert result == 'function_result'
+
+
+@pytest_twisted.ensureDeferred
+async def test_callback_before_await():
+ def cb(res):
+ assert res == 'function_result'
+ return res
+
+ d = coro_func()
+ d.addCallback(cb)
+ result = await d
+ assert result == 'function_result'
+
+
+@pytest_twisted.ensureDeferred
+async def test_callback_after_await():
+ """If it has already been used as a coroutine, can't be retroactively turned into a Deferred.
+ This limitation could be fixed, but the extra complication doesn't feel worth it.
+ """
+
+ def cb(res):
+ pass
+
+ d = coro_func()
+ await d
+ with pytest.raises(
+ Exception, match='Cannot add callbacks to an already awaited coroutine'
+ ):
+ d.addCallback(cb)
diff --git a/deluge/tests/test_metafile.py b/deluge/tests/test_metafile.py
index fc6507cb8..fda1cb73e 100644
--- a/deluge/tests/test_metafile.py
+++ b/deluge/tests/test_metafile.py
@@ -1,19 +1,13 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import os
import tempfile
-from twisted.trial import unittest
-
from deluge import metafile
-from deluge.common import windows_check
def check_torrent(filename):
@@ -28,7 +22,7 @@ def check_torrent(filename):
TorrentInfo(filename)
-class MetafileTestCase(unittest.TestCase):
+class TestMetafile:
def test_save_multifile(self):
# Create a temporary folder for torrent creation
tmp_path = tempfile.mkdtemp()
@@ -52,17 +46,12 @@ class MetafileTestCase(unittest.TestCase):
os.remove(tmp_file)
def test_save_singlefile(self):
- if windows_check():
- raise unittest.SkipTest('on windows \\ != / for path names')
- tmp_path = tempfile.mkstemp('testdata')[1]
- with open(tmp_path, 'wb') as tmp_file:
- tmp_file.write(b'a' * (2314 * 1024))
-
- tmp_fd, tmp_file = tempfile.mkstemp('.torrent')
- metafile.make_meta_file(tmp_path, '', 32768, target=tmp_file)
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ tmp_data = tmp_dir + '/testdata'
+ with open(tmp_data, 'wb') as tmp_file:
+ tmp_file.write(b'a' * (2314 * 1024))
- check_torrent(tmp_file)
+ tmp_torrent = tmp_dir + '/.torrent'
+ metafile.make_meta_file(tmp_data, '', 32768, target=tmp_torrent)
- os.remove(tmp_path)
- os.close(tmp_fd)
- os.remove(tmp_file)
+ check_torrent(tmp_torrent)
diff --git a/deluge/tests/test_plugin_metadata.py b/deluge/tests/test_plugin_metadata.py
index 436fc2c50..adf115d1b 100644
--- a/deluge/tests/test_plugin_metadata.py
+++ b/deluge/tests/test_plugin_metadata.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2015 Calum Lind <calumlind@gmail.com>
#
@@ -7,25 +6,38 @@
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
from deluge.pluginmanagerbase import PluginManagerBase
-from . import common
-from .basetest import BaseTestCase
-
-
-class PluginManagerBaseTestCase(BaseTestCase):
- def set_up(self):
- common.set_tmp_config_dir()
+class TestPluginManagerBase:
def test_get_plugin_info(self):
pm = PluginManagerBase('core.conf', 'deluge.plugin.core')
for p in pm.get_available_plugins():
for key, value in pm.get_plugin_info(p).items():
- self.assertTrue(isinstance('%s: %s' % (key, value), ''.__class__))
+ assert isinstance(key, str)
+ assert isinstance(value, str)
def test_get_plugin_info_invalid_name(self):
pm = PluginManagerBase('core.conf', 'deluge.plugin.core')
for key, value in pm.get_plugin_info('random').items():
- self.assertEqual(value, 'not available')
+ result = 'not available' if key in ('Name', 'Version') else ''
+ assert value == result
+
+ def test_parse_pkg_info_metadata_2_1(self):
+ pkg_info = """Metadata-Version: 2.1
+Name: AutoAdd
+Version: 1.8
+Summary: Monitors folders for .torrent files.
+Home-page: http://dev.deluge-torrent.org/wiki/Plugins/AutoAdd
+Author: Chase Sterling, Pedro Algarvio
+Author-email: chase.sterling@gmail.com, pedro@algarvio.me
+License: GPLv3
+Platform: UNKNOWN
+
+Monitors folders for .torrent files.
+ """
+ plugin_info = PluginManagerBase.parse_pkg_info(pkg_info)
+ for value in plugin_info.values():
+ assert value != ''
+ result = 'Monitors folders for .torrent files.'
+ assert plugin_info['Description'] == result
diff --git a/deluge/tests/test_rpcserver.py b/deluge/tests/test_rpcserver.py
index 02f9af023..982d1d5f1 100644
--- a/deluge/tests/test_rpcserver.py
+++ b/deluge/tests/test_rpcserver.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2013 Bro <bro.development@gmail.com>
#
@@ -7,18 +6,15 @@
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import deluge.component as component
import deluge.error
from deluge.common import get_localhost_auth
+from deluge.conftest import BaseTestCase
from deluge.core import rpcserver
from deluge.core.authmanager import AuthManager
from deluge.core.rpcserver import DelugeRPCProtocol, RPCServer
from deluge.log import setup_logger
-from .basetest import BaseTestCase
-
setup_logger('none')
@@ -30,7 +26,7 @@ class DelugeRPCProtocolTester(DelugeRPCProtocol):
self.messages.append(data)
-class RPCServerTestCase(BaseTestCase):
+class TestRPCServer(BaseTestCase):
def set_up(self):
self.rpcserver = RPCServer(listen=False)
self.rpcserver.factory.protocol = DelugeRPCProtocolTester
@@ -60,15 +56,15 @@ class RPCServerTestCase(BaseTestCase):
e = TorrentFolderRenamedEvent(*data)
self.rpcserver.emit_event_for_session_id(self.session_id, e)
msg = self.protocol.messages.pop()
- self.assertEqual(msg[0], rpcserver.RPC_EVENT, str(msg))
- self.assertEqual(msg[1], 'TorrentFolderRenamedEvent', str(msg))
- self.assertEqual(msg[2], data, str(msg))
+ assert msg[0] == rpcserver.RPC_EVENT, str(msg)
+ assert msg[1] == 'TorrentFolderRenamedEvent', str(msg)
+ assert msg[2] == data, str(msg)
def test_invalid_client_login(self):
self.protocol.dispatch(self.request_id, 'daemon.login', [1], {})
msg = self.protocol.messages.pop()
- self.assertEqual(msg[0], rpcserver.RPC_ERROR)
- self.assertEqual(msg[1], self.request_id)
+ assert msg[0] == rpcserver.RPC_ERROR
+ assert msg[1] == self.request_id
def test_valid_client_login(self):
self.authmanager = AuthManager()
@@ -77,9 +73,9 @@ class RPCServerTestCase(BaseTestCase):
self.request_id, 'daemon.login', auth, {'client_version': 'Test'}
)
msg = self.protocol.messages.pop()
- self.assertEqual(msg[0], rpcserver.RPC_RESPONSE, str(msg))
- self.assertEqual(msg[1], self.request_id, str(msg))
- self.assertEqual(msg[2], rpcserver.AUTH_LEVEL_ADMIN, str(msg))
+ assert msg[0] == rpcserver.RPC_RESPONSE, str(msg)
+ assert msg[1] == self.request_id, str(msg)
+ assert msg[2] == rpcserver.AUTH_LEVEL_ADMIN, str(msg)
def test_client_login_error(self):
# This test causes error log prints while running the test...
@@ -90,24 +86,24 @@ class RPCServerTestCase(BaseTestCase):
self.request_id, 'daemon.login', auth, {'client_version': 'Test'}
)
msg = self.protocol.messages.pop()
- self.assertEqual(msg[0], rpcserver.RPC_ERROR)
- self.assertEqual(msg[1], self.request_id)
- self.assertEqual(msg[2], 'WrappedException')
- self.assertEqual(msg[3][1], 'AttributeError')
+ assert msg[0] == rpcserver.RPC_ERROR
+ assert msg[1] == self.request_id
+ assert msg[2] == 'WrappedException'
+ assert msg[3][1] == 'AttributeError'
def test_client_invalid_method_call(self):
self.authmanager = AuthManager()
auth = get_localhost_auth()
self.protocol.dispatch(self.request_id, 'invalid_function', auth, {})
msg = self.protocol.messages.pop()
- self.assertEqual(msg[0], rpcserver.RPC_ERROR)
- self.assertEqual(msg[1], self.request_id)
- self.assertEqual(msg[2], 'WrappedException')
- self.assertEqual(msg[3][1], 'AttributeError')
+ assert msg[0] == rpcserver.RPC_ERROR
+ assert msg[1] == self.request_id
+ assert msg[2] == 'WrappedException'
+ assert msg[3][1] == 'AttributeError'
def test_daemon_info(self):
self.protocol.dispatch(self.request_id, 'daemon.info', [], {})
msg = self.protocol.messages.pop()
- self.assertEqual(msg[0], rpcserver.RPC_RESPONSE, str(msg))
- self.assertEqual(msg[1], self.request_id, str(msg))
- self.assertEqual(msg[2], deluge.common.get_version(), str(msg))
+ assert msg[0] == rpcserver.RPC_RESPONSE, str(msg)
+ assert msg[1] == self.request_id, str(msg)
+ assert msg[2] == deluge.common.get_version(), str(msg)
diff --git a/deluge/tests/test_security.py b/deluge/tests/test_security.py
index 700fc9967..e3e434433 100644
--- a/deluge/tests/test_security.py
+++ b/deluge/tests/test_security.py
@@ -1,12 +1,9 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import print_function, unicode_literals
-
import os
import pytest
@@ -16,9 +13,9 @@ import deluge.component as component
import deluge.ui.web.server
from deluge import configmanager
from deluge.common import windows_check
+from deluge.conftest import BaseTestCase
from deluge.ui.web.server import DelugeWeb
-from .basetest import BaseTestCase
from .common import get_test_data_file
from .common_web import WebServerTestBase
from .daemon_base import DaemonBase
@@ -26,15 +23,10 @@ from .daemon_base import DaemonBase
SECURITY_TESTS = bool(os.getenv('SECURITY_TESTS', False))
-class SecurityBaseTestCase(object):
- if windows_check():
- skip = 'windows cannot run .sh files'
- elif not SECURITY_TESTS:
- skip = 'Skipping security tests'
-
- http_err = 'cannot run http tests on daemon'
-
- def __init__(self):
+# TODO: This whole module has not been tested since migrating tests fully to pytest
+class SecurityBaseTestCase:
+ @pytest.fixture(autouse=True)
+ def setvars(self):
self.home_dir = os.path.expanduser('~')
self.port = 8112
@@ -45,6 +37,7 @@ class SecurityBaseTestCase(object):
get_test_data_file('testssl.sh'),
'--quiet',
'--nodns',
+ 'none',
'--color',
'0',
test,
@@ -55,11 +48,11 @@ class SecurityBaseTestCase(object):
def on_result(results):
if test == '-e':
- results = results[0].split('\n')[7:-6]
- self.assertTrue(len(results) > 3)
+ results = results[0].split(b'\n')[7:-6]
+ assert len(results) > 3
else:
- self.assertIn('OK', results[0])
- self.assertNotIn('NOT ok', results[0])
+ assert b'OK' in results[0]
+ assert b'NOT ok' not in results[0]
d.addCallback(on_result)
return d
@@ -76,18 +69,12 @@ class SecurityBaseTestCase(object):
def test_secured_webserver_css_injection_vulnerability(self):
return self._run_test('-I')
- def test_secured_webserver_ticketbleed_vulnerability(self):
- return self._run_test('-T')
-
def test_secured_webserver_renegotiation_vulnerabilities(self):
return self._run_test('-R')
def test_secured_webserver_crime_vulnerability(self):
return self._run_test('-C')
- def test_secured_webserver_breach_vulnerability(self):
- return self._run_test('-B')
-
def test_secured_webserver_poodle_vulnerability(self):
return self._run_test('-O')
@@ -121,33 +108,14 @@ class SecurityBaseTestCase(object):
def test_secured_webserver_preference(self):
return self._run_test('-P')
- def test_secured_webserver_headers(self):
- return self._run_test('-h')
-
def test_secured_webserver_ciphers(self):
return self._run_test('-e')
+@pytest.mark.skipif(windows_check(), reason='windows cannot run .sh files')
+@pytest.mark.skipif(not SECURITY_TESTS, reason='skipping security tests')
@pytest.mark.security
-class DaemonSecurityTestCase(BaseTestCase, DaemonBase, SecurityBaseTestCase):
-
- if windows_check():
- skip = 'windows cannot start_core not enough arguments for format string'
-
- def __init__(self, testname):
- super(DaemonSecurityTestCase, self).__init__(testname)
- DaemonBase.__init__(self)
- SecurityBaseTestCase.__init__(self)
-
- def setUp(self):
- skip = False
- for not_http_test in ('breach', 'headers', 'ticketbleed'):
- if not_http_test in self.id().split('.')[-1]:
- self.skipTest(SecurityBaseTestCase.http_err)
- skip = True
- if not skip:
- super(DaemonSecurityTestCase, self).setUp()
-
+class TestDaemonSecurity(BaseTestCase, DaemonBase, SecurityBaseTestCase):
def set_up(self):
d = self.common_set_up()
self.port = self.listen_port
@@ -161,12 +129,10 @@ class DaemonSecurityTestCase(BaseTestCase, DaemonBase, SecurityBaseTestCase):
return d
+@pytest.mark.skipif(windows_check(), reason='windows cannot run .sh files')
+@pytest.mark.skipif(not SECURITY_TESTS, reason='skipping security tests')
@pytest.mark.security
-class WebUISecurityTestBase(WebServerTestBase, SecurityBaseTestCase):
- def __init__(self, testname):
- super(WebUISecurityTestBase, self).__init__(testname)
- SecurityBaseTestCase.__init__(self)
-
+class TestWebUISecurity(WebServerTestBase, SecurityBaseTestCase):
def start_webapi(self, arg):
self.port = self.webserver_listen_port = 8999
@@ -182,3 +148,12 @@ class WebUISecurityTestBase(WebServerTestBase, SecurityBaseTestCase):
self.deluge_web.web_api.hostlist.config['hosts'][0] = tuple(host)
self.host_id = host[0]
self.deluge_web.start()
+
+ def test_secured_webserver_headers(self):
+ return self._run_test('-h')
+
+ def test_secured_webserver_breach_vulnerability(self):
+ return self._run_test('-B')
+
+ def test_secured_webserver_ticketbleed_vulnerability(self):
+ return self._run_test('-T')
diff --git a/deluge/tests/test_sessionproxy.py b/deluge/tests/test_sessionproxy.py
index 03f3cc27e..6fbbb248b 100644
--- a/deluge/tests/test_sessionproxy.py
+++ b/deluge/tests/test_sessionproxy.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
#
@@ -6,19 +5,16 @@
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-
-from __future__ import unicode_literals
-
+import pytest_twisted
from twisted.internet.defer import maybeDeferred, succeed
from twisted.internet.task import Clock
import deluge.component as component
import deluge.ui.sessionproxy
+from deluge.conftest import BaseTestCase
-from .basetest import BaseTestCase
-
-class Core(object):
+class Core:
def __init__(self):
self.reset()
@@ -91,7 +87,7 @@ class Core(object):
return succeed(ret)
-class Client(object):
+class Client:
def __init__(self):
self.core = Core()
@@ -105,7 +101,7 @@ class Client(object):
client = Client()
-class SessionProxyTestCase(BaseTestCase):
+class TestSessionProxy(BaseTestCase):
def set_up(self):
self.clock = Clock()
self.patch(deluge.ui.sessionproxy, 'time', self.clock.seconds)
@@ -127,38 +123,38 @@ class SessionProxyTestCase(BaseTestCase):
return component.deregister(self.sp)
def test_startup(self):
- self.assertEqual(client.core.torrents['a'], self.sp.torrents['a'][1])
+ assert client.core.torrents['a'] == self.sp.torrents['a'][1]
- def test_get_torrent_status_no_change(self):
- d = self.sp.get_torrent_status('a', [])
- d.addCallback(self.assertEqual, client.core.torrents['a'])
- return d
+ @pytest_twisted.ensureDeferred
+ async def test_get_torrent_status_no_change(self):
+ result = await self.sp.get_torrent_status('a', [])
+ assert result == client.core.torrents['a']
- def test_get_torrent_status_change_with_cache(self):
+ @pytest_twisted.ensureDeferred
+ async def test_get_torrent_status_change_with_cache(self):
client.core.torrents['a']['key1'] = 2
- d = self.sp.get_torrent_status('a', ['key1'])
- d.addCallback(self.assertEqual, {'key1': 1})
- return d
+ result = await self.sp.get_torrent_status('a', ['key1'])
+ assert result == {'key1': 1}
- def test_get_torrent_status_change_without_cache(self):
+ @pytest_twisted.ensureDeferred
+ async def test_get_torrent_status_change_without_cache(self):
client.core.torrents['a']['key1'] = 2
self.clock.advance(self.sp.cache_time + 0.1)
- d = self.sp.get_torrent_status('a', [])
- d.addCallback(self.assertEqual, client.core.torrents['a'])
- return d
+ result = await self.sp.get_torrent_status('a', [])
+ assert result == client.core.torrents['a']
- def test_get_torrent_status_key_not_updated(self):
+ @pytest_twisted.ensureDeferred
+ async def test_get_torrent_status_key_not_updated(self):
self.clock.advance(self.sp.cache_time + 0.1)
self.sp.get_torrent_status('a', ['key1'])
client.core.torrents['a']['key2'] = 99
- d = self.sp.get_torrent_status('a', ['key2'])
- d.addCallback(self.assertEqual, {'key2': 99})
- return d
+ result = await self.sp.get_torrent_status('a', ['key2'])
+ assert result == {'key2': 99}
- def test_get_torrents_status_key_not_updated(self):
+ @pytest_twisted.ensureDeferred
+ async def test_get_torrents_status_key_not_updated(self):
self.clock.advance(self.sp.cache_time + 0.1)
self.sp.get_torrents_status({'id': ['a']}, ['key1'])
client.core.torrents['a']['key2'] = 99
- d = self.sp.get_torrents_status({'id': ['a']}, ['key2'])
- d.addCallback(self.assertEqual, {'a': {'key2': 99}})
- return d
+ result = await self.sp.get_torrents_status({'id': ['a']}, ['key2'])
+ assert result == {'a': {'key2': 99}}
diff --git a/deluge/tests/test_torrent.py b/deluge/tests/test_torrent.py
index 5da817924..36adc0fde 100644
--- a/deluge/tests/test_torrent.py
+++ b/deluge/tests/test_torrent.py
@@ -1,41 +1,37 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-
-from __future__ import print_function, unicode_literals
-
+import itertools
import os
import time
from base64 import b64encode
+from unittest import mock
-import mock
+import pytest
+import pytest_twisted
from twisted.internet import defer, reactor
from twisted.internet.task import deferLater
-from twisted.trial import unittest
import deluge.component as component
import deluge.core.torrent
import deluge.tests.common as common
from deluge._libtorrent import lt
-from deluge.common import VersionSplit, utf8_encode_structure, windows_check
+from deluge.common import VersionSplit, utf8_encode_structure
+from deluge.conftest import BaseTestCase
from deluge.core.core import Core
from deluge.core.rpcserver import RPCServer
from deluge.core.torrent import Torrent
from deluge.core.torrentmanager import TorrentManager, TorrentState
-from .basetest import BaseTestCase
-
-class TorrentTestCase(BaseTestCase):
+class TestTorrent(BaseTestCase):
def setup_config(self):
- config_dir = common.set_tmp_config_dir()
core_config = deluge.config.Config(
'core.conf',
defaults=deluge.core.preferencesmanager.DEFAULT_PREFS,
- config_dir=config_dir,
+ config_dir=self.config_dir,
)
core_config.save()
@@ -67,7 +63,7 @@ class TorrentTestCase(BaseTestCase):
def assert_state(self, torrent, state):
torrent.update_state()
- self.assertEqual(torrent.state, state)
+ assert torrent.state == state
def get_torrent_atp(self, filename):
filename = common.get_test_data_file(filename)
@@ -85,25 +81,37 @@ class TorrentTestCase(BaseTestCase):
}
return atp
- def test_set_file_priorities(self):
+ @pytest_twisted.ensureDeferred
+ async def test_set_file_priorities(self):
+ if getattr(lt, 'file_prio_alert', None):
+ # Libtorrent 2.0.3 and later has a file_prio_alert
+ prios_set = defer.Deferred()
+ prios_set.addTimeout(1.5, reactor)
+ component.get('AlertManager').register_handler(
+ 'file_prio_alert', lambda a: prios_set.callback(True)
+ )
+ else:
+ # On older libtorrent, we just wait a while
+ prios_set = deferLater(reactor, 0.8)
+
atp = self.get_torrent_atp('dir_with_6_files.torrent')
handle = self.session.add_torrent(atp)
torrent = Torrent(handle, {})
result = torrent.get_file_priorities()
- self.assertTrue(all(x == 4 for x in result))
+ assert all(x == 4 for x in result)
new_priorities = [3, 1, 2, 0, 5, 6, 7]
torrent.set_file_priorities(new_priorities)
- self.assertEqual(torrent.get_file_priorities(), new_priorities)
+ assert torrent.get_file_priorities() == new_priorities
# Test with handle.piece_priorities as handle.file_priorities async
# updates and will return old value. Also need to remove a priority
# value as one file is much smaller than piece size so doesn't show.
- time.sleep(0.6) # Delay to wait for alert from lt
- piece_prio = handle.piece_priorities()
+ await prios_set # Delay to wait for alert from lt
+ piece_prio = handle.get_piece_priorities()
result = all(p in piece_prio for p in [3, 2, 0, 5, 6, 7])
- self.assertTrue(result)
+ assert result
def test_set_prioritize_first_last_pieces(self):
piece_indexes = [
@@ -143,19 +151,19 @@ class TorrentTestCase(BaseTestCase):
handle = self.session.add_torrent(atp)
self.torrent = Torrent(handle, {})
- priorities_original = handle.piece_priorities()
+ priorities_original = handle.get_piece_priorities()
self.torrent.set_prioritize_first_last_pieces(True)
- priorities = handle.piece_priorities()
+ priorities = handle.get_piece_priorities()
# The length of the list of new priorites is the same as the original
- self.assertEqual(len(priorities_original), len(priorities))
+ assert len(priorities_original) == len(priorities)
# Test the priority of all the pieces against the calculated indexes.
for idx, priority in enumerate(priorities):
if idx in prioritized_piece_indexes:
- self.assertEqual(priorities[idx], 7)
+ assert priorities[idx] == 7
else:
- self.assertEqual(priorities[idx], 4)
+ assert priorities[idx] == 4
# self.print_priority_list(priorities)
@@ -167,17 +175,15 @@ class TorrentTestCase(BaseTestCase):
self.torrent.set_prioritize_first_last_pieces(True)
# Reset pirorities
self.torrent.set_prioritize_first_last_pieces(False)
- priorities = handle.piece_priorities()
+ priorities = handle.get_piece_priorities()
# Test the priority of the prioritized pieces
for i in priorities:
- self.assertEqual(priorities[i], 4)
+ assert priorities[i] == 4
# self.print_priority_list(priorities)
def test_torrent_error_data_missing(self):
- if windows_check():
- raise unittest.SkipTest('unexpected end of file in bencoded string')
options = {'seed_mode': True}
filename = common.get_test_data_file('test_torrent.file.torrent')
with open(filename, 'rb') as _file:
@@ -194,8 +200,6 @@ class TorrentTestCase(BaseTestCase):
self.assert_state(torrent, 'Error')
def test_torrent_error_resume_original_state(self):
- if windows_check():
- raise unittest.SkipTest('unexpected end of file in bencoded string')
options = {'seed_mode': True, 'add_paused': True}
filename = common.get_test_data_file('test_torrent.file.torrent')
with open(filename, 'rb') as _file:
@@ -215,10 +219,8 @@ class TorrentTestCase(BaseTestCase):
torrent.force_recheck()
def test_torrent_error_resume_data_unaltered(self):
- if windows_check():
- raise unittest.SkipTest('unexpected end of file in bencoded string')
if VersionSplit(lt.__version__) >= VersionSplit('1.2.0.0'):
- raise unittest.SkipTest('Test not working as expected on lt 1.2 or greater')
+ pytest.skip('Test not working as expected on lt 1.2 or greater')
resume_data = {
'active_time': 13399,
@@ -286,7 +288,7 @@ class TorrentTestCase(BaseTestCase):
tm_resume_data = lt.bdecode(
self.core.torrentmanager.resume_data[torrent.torrent_id]
)
- self.assertEqual(tm_resume_data, resume_data)
+ assert tm_resume_data == resume_data
return deferLater(reactor, 0.5, assert_resume_data)
@@ -294,7 +296,7 @@ class TorrentTestCase(BaseTestCase):
atp = self.get_torrent_atp('test_torrent.file.torrent')
handle = self.session.add_torrent(atp)
self.torrent = Torrent(handle, {})
- self.assertEqual(self.torrent.get_eta(), 0)
+ assert self.torrent.get_eta() == 0
self.torrent.status = mock.MagicMock()
self.torrent.status.upload_payload_rate = 5000
@@ -304,18 +306,18 @@ class TorrentTestCase(BaseTestCase):
self.torrent.is_finished = True
self.torrent.options = {'stop_at_ratio': False}
# Test finished and uploading but no stop_at_ratio set.
- self.assertEqual(self.torrent.get_eta(), 0)
+ assert self.torrent.get_eta() == 0
self.torrent.options = {'stop_at_ratio': True, 'stop_ratio': 1.5}
result = self.torrent.get_eta()
- self.assertEqual(result, 2)
- self.assertIsInstance(result, int)
+ assert result == 2
+ assert isinstance(result, int)
def test_get_eta_downloading(self):
atp = self.get_torrent_atp('test_torrent.file.torrent')
handle = self.session.add_torrent(atp)
self.torrent = Torrent(handle, {})
- self.assertEqual(self.torrent.get_eta(), 0)
+ assert self.torrent.get_eta() == 0
self.torrent.status = mock.MagicMock()
self.torrent.status.download_payload_rate = 50
@@ -323,15 +325,15 @@ class TorrentTestCase(BaseTestCase):
self.torrent.status.total_wanted_done = 5000
result = self.torrent.get_eta()
- self.assertEqual(result, 100)
- self.assertIsInstance(result, int)
+ assert result == 100
+ assert isinstance(result, int)
def test_get_name_unicode(self):
"""Test retrieving a unicode torrent name from libtorrent."""
atp = self.get_torrent_atp('unicode_file.torrent')
handle = self.session.add_torrent(atp)
self.torrent = Torrent(handle, {})
- self.assertEqual(self.torrent.get_name(), 'সুকুমার রায়.txt')
+ assert self.torrent.get_name() == 'সুকুমার রায়.txt'
def test_rename_unicode(self):
"""Test renaming file/folders with unicode filenames."""
@@ -342,15 +344,32 @@ class TorrentTestCase(BaseTestCase):
TorrentManager.save_resume_data = mock.MagicMock
result = self.torrent.rename_folder('unicode_filenames', 'Горбачёв')
- self.assertIsInstance(result, defer.DeferredList)
+ assert isinstance(result, defer.DeferredList)
result = self.torrent.rename_files([[0, 'new_рбачёв']])
- self.assertIsNone(result)
+ assert result is None
def test_connect_peer_port(self):
"""Test to ensure port is int for libtorrent"""
atp = self.get_torrent_atp('test_torrent.file.torrent')
handle = self.session.add_torrent(atp)
self.torrent = Torrent(handle, {})
- self.assertFalse(self.torrent.connect_peer('127.0.0.1', 'text'))
- self.assertTrue(self.torrent.connect_peer('127.0.0.1', '1234'))
+ assert not self.torrent.connect_peer('127.0.0.1', 'text')
+ assert self.torrent.connect_peer('127.0.0.1', '1234')
+
+ def test_status_cache(self):
+ atp = self.get_torrent_atp('test_torrent.file.torrent')
+ handle = self.session.add_torrent(atp)
+ mock_time = mock.Mock(return_value=time.time())
+ with mock.patch('time.time', mock_time):
+ torrent = Torrent(handle, {})
+ counter = itertools.count()
+ handle.status = mock.Mock(side_effect=counter.__next__)
+ first_status = torrent.get_lt_status()
+ assert first_status == 0, 'sanity check'
+ assert first_status == torrent.status, 'cached status should be used'
+ assert torrent.get_lt_status() == 1, 'status should update'
+ assert torrent.status == 1
+ # Advance time and verify cache expires and updates
+ mock_time.return_value += 10
+ assert torrent.status == 2
diff --git a/deluge/tests/test_torrentmanager.py b/deluge/tests/test_torrentmanager.py
index e0ff09efc..0ead27230 100644
--- a/deluge/tests/test_torrentmanager.py
+++ b/deluge/tests/test_torrentmanager.py
@@ -1,38 +1,34 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import os
import shutil
import warnings
from base64 import b64encode
+from unittest import mock
-import mock
import pytest
-from twisted.internet import defer, task
-from twisted.trial import unittest
+import pytest_twisted
+from twisted.internet import reactor, task
from deluge import component
-from deluge.common import windows_check
+from deluge.bencode import bencode
+from deluge.conftest import BaseTestCase
from deluge.core.core import Core
from deluge.core.rpcserver import RPCServer
from deluge.error import InvalidTorrentError
from . import common
-from .basetest import BaseTestCase
warnings.filterwarnings('ignore', category=RuntimeWarning)
warnings.resetwarnings()
-class TorrentmanagerTestCase(BaseTestCase):
+class TestTorrentmanager(BaseTestCase):
def set_up(self):
- self.config_dir = common.set_tmp_config_dir()
self.rpcserver = RPCServer(listen=False)
self.core = Core()
self.core.config.config['lsd'] = False
@@ -48,7 +44,7 @@ class TorrentmanagerTestCase(BaseTestCase):
return component.shutdown().addCallback(on_shutdown)
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_remove_torrent(self):
filename = common.get_test_data_file('test.torrent')
with open(filename, 'rb') as _file:
@@ -56,9 +52,9 @@ class TorrentmanagerTestCase(BaseTestCase):
torrent_id = yield self.core.add_torrent_file_async(
filename, b64encode(filedump), {}
)
- self.assertTrue(self.tm.remove(torrent_id, False))
+ assert self.tm.remove(torrent_id, False)
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_remove_magnet(self):
"""Test remove magnet before received metadata and delete_copies is True"""
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
@@ -66,9 +62,10 @@ class TorrentmanagerTestCase(BaseTestCase):
self.core.config.config['copy_torrent_file'] = True
self.core.config.config['del_copy_torrent_file'] = True
torrent_id = yield self.core.add_torrent_magnet(magnet, options)
- self.assertTrue(self.tm.remove(torrent_id, False))
+ assert self.tm.remove(torrent_id, False)
- def test_prefetch_metadata(self):
+ @pytest_twisted.ensureDeferred
+ async def test_prefetch_metadata(self):
from deluge._libtorrent import lt
with open(common.get_test_data_file('test.torrent'), 'rb') as _file:
@@ -81,47 +78,55 @@ class TorrentmanagerTestCase(BaseTestCase):
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
d = self.tm.prefetch_metadata(magnet, 30)
- self.tm.on_alert_metadata_received(mock_alert)
+ # Make sure to use calllater, because the above prefetch call won't
+ # actually start running until we await it.
+ reactor.callLater(0, self.tm.on_alert_metadata_received, mock_alert)
expected = (
'ab570cdd5a17ea1b61e970bb72047de141bce173',
- {
- b'piece length': 32768,
- b'sha1': (
- b'2\xce\xb6\xa8"\xd7\xf0\xd4\xbf\xdc^K\xba\x1bh'
- b'\x9d\xc5\xb7\xac\xdd'
- ),
- b'name': b'azcvsupdater_2.6.2.jar',
- b'private': 0,
- b'pieces': (
- b"\xdb\x04B\x05\xc3'\xdab\xb8su97\xa9u"
- b'\xca<w\\\x1ef\xd4\x9b\x16\xa9}\xc0\x9f:\xfd'
- b'\x97qv\x83\xa2"\xef\x9d7\x0by!\rl\xe5v\xb7'
- b'\x18{\xf7/"P\xe9\x8d\x01D\x9e8\xbd\x16\xe3'
- b'\xfb-\x9d\xaa\xbcM\x11\xba\x92\xfc\x13F\xf0'
- b'\x1c\x86x+\xc8\xd0S\xa9\x90`\xa1\xe4\x82\xe8'
- b'\xfc\x08\xf7\xe3\xe5\xf6\x85\x1c%\xe7%\n\xed'
- b'\xc0\x1f\xa1;\x9a\xea\xcf\x90\x0c/F>\xdf\xdagA'
- b'\xc42|\xda\x82\xf5\xa6b\xa1\xb8#\x80wI\xd8f'
- b'\xf8\xbd\xacW\xab\xc3s\xe0\xbbw\xf2K\xbe\xee'
- b'\xa8rG\xe1W\xe8\xb7\xc2i\xf3\xd8\xaf\x9d\xdc'
- b'\xd0#\xf4\xc1\x12u\xcd\x0bE?:\xe8\x9c\x1cu'
- b'\xabb(oj\r^\xd5\xd5A\x83\x88\x9a\xa1J\x1c?'
- b'\xa1\xd6\x8c\x83\x9e&'
- ),
- b'length': 307949,
- b'name.utf-8': b'azcvsupdater_2.6.2.jar',
- b'ed2k': b'>p\xefl\xfa]\x95K\x1b^\xc2\\;;e\xb7',
- },
+ b64encode(
+ bencode(
+ {
+ b'piece length': 32768,
+ b'sha1': (
+ b'2\xce\xb6\xa8"\xd7\xf0\xd4\xbf\xdc^K\xba\x1bh'
+ b'\x9d\xc5\xb7\xac\xdd'
+ ),
+ b'name': b'azcvsupdater_2.6.2.jar',
+ b'private': 0,
+ b'pieces': (
+ b"\xdb\x04B\x05\xc3'\xdab\xb8su97\xa9u"
+ b'\xca<w\\\x1ef\xd4\x9b\x16\xa9}\xc0\x9f:\xfd'
+ b'\x97qv\x83\xa2"\xef\x9d7\x0by!\rl\xe5v\xb7'
+ b'\x18{\xf7/"P\xe9\x8d\x01D\x9e8\xbd\x16\xe3'
+ b'\xfb-\x9d\xaa\xbcM\x11\xba\x92\xfc\x13F\xf0'
+ b'\x1c\x86x+\xc8\xd0S\xa9\x90`\xa1\xe4\x82\xe8'
+ b'\xfc\x08\xf7\xe3\xe5\xf6\x85\x1c%\xe7%\n\xed'
+ b'\xc0\x1f\xa1;\x9a\xea\xcf\x90\x0c/F>\xdf\xdagA'
+ b'\xc42|\xda\x82\xf5\xa6b\xa1\xb8#\x80wI\xd8f'
+ b'\xf8\xbd\xacW\xab\xc3s\xe0\xbbw\xf2K\xbe\xee'
+ b'\xa8rG\xe1W\xe8\xb7\xc2i\xf3\xd8\xaf\x9d\xdc'
+ b'\xd0#\xf4\xc1\x12u\xcd\x0bE?:\xe8\x9c\x1cu'
+ b'\xabb(oj\r^\xd5\xd5A\x83\x88\x9a\xa1J\x1c?'
+ b'\xa1\xd6\x8c\x83\x9e&'
+ ),
+ b'length': 307949,
+ b'name.utf-8': b'azcvsupdater_2.6.2.jar',
+ b'ed2k': b'>p\xefl\xfa]\x95K\x1b^\xc2\\;;e\xb7',
+ }
+ )
+ ),
)
- self.assertEqual(expected, self.successResultOf(d))
+ assert expected == await d
- def test_prefetch_metadata_timeout(self):
+ @pytest_twisted.ensureDeferred
+ async def test_prefetch_metadata_timeout(self):
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
d = self.tm.prefetch_metadata(magnet, 30)
self.clock.advance(30)
- expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', None)
- return d.addCallback(self.assertEqual, expected)
+ result = await d
+ expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'')
+ assert result == expected
@pytest.mark.todo
def test_remove_torrent_false(self):
@@ -129,20 +134,15 @@ class TorrentmanagerTestCase(BaseTestCase):
common.todo_test(self)
def test_remove_invalid_torrent(self):
- self.assertRaises(
- InvalidTorrentError, self.tm.remove, 'torrentidthatdoesntexist'
- )
+ with pytest.raises(InvalidTorrentError):
+ self.tm.remove('torrentidthatdoesntexist')
- def test_open_state_from_python2(self):
- """Open a Python2 state with a UTF-8 encoded torrent filename."""
+ def test_open_state(self):
+ """Open a state with a UTF-8 encoded torrent filename."""
shutil.copy(
common.get_test_data_file('utf8_filename_torrents.state'),
os.path.join(self.config_dir, 'state', 'torrents.state'),
)
- if windows_check():
- raise unittest.SkipTest(
- 'Windows ModuleNotFoundError due to Linux line ending'
- )
state = self.tm.open_state()
- self.assertEqual(len(state.torrents), 1)
+ assert len(state.torrents) == 1
diff --git a/deluge/tests/test_torrentview.py b/deluge/tests/test_torrentview.py
index 590760d1e..8d0568866 100644
--- a/deluge/tests/test_torrentview.py
+++ b/deluge/tests/test_torrentview.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2014 Bro <bro.development@gmail.com>
# Copyright (C) 2014 Calum Lind <calumlind@gmail.com>
@@ -8,18 +7,13 @@
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import pytest
-from twisted.trial import unittest
import deluge.component as component
from deluge.configmanager import ConfigManager
+from deluge.conftest import BaseTestCase
from deluge.i18n import setup_translation
-from . import common
-from .basetest import BaseTestCase
-
# Allow running other tests without GTKUI dependencies available
try:
# pylint: disable=ungrouped-imports
@@ -40,7 +34,7 @@ setup_translation()
@pytest.mark.gtkui
-class TorrentviewTestCase(BaseTestCase):
+class TestTorrentview(BaseTestCase):
default_column_index = [
'filter',
@@ -66,6 +60,7 @@ class TorrentviewTestCase(BaseTestCase):
'Added',
'Completed',
'Complete Seen',
+ 'Last Transfer',
'Tracker',
'Download Folder',
'Owner',
@@ -99,6 +94,7 @@ class TorrentviewTestCase(BaseTestCase):
int,
int,
int,
+ int,
str,
str, # Tracker
str,
@@ -108,9 +104,8 @@ class TorrentviewTestCase(BaseTestCase):
def set_up(self):
if libs_available is False:
- raise unittest.SkipTest('GTKUI dependencies not available')
+ pytest.skip('GTKUI dependencies not available')
- common.set_tmp_config_dir()
# MainWindow loads this config file, so lets make sure it contains the defaults
ConfigManager('gtk3ui.conf', defaults=DEFAULT_PREFS)
self.mainwindow = MainWindow()
@@ -122,36 +117,23 @@ class TorrentviewTestCase(BaseTestCase):
return component.shutdown()
def test_torrentview_columns(self):
-
- self.assertEqual(
- self.torrentview.column_index, TorrentviewTestCase.default_column_index
- )
- self.assertEqual(
- self.torrentview.liststore_columns,
- TorrentviewTestCase.default_liststore_columns,
- )
- self.assertEqual(
- self.torrentview.columns['Download Folder'].column_indices, [29]
- )
+ assert self.torrentview.column_index == self.default_column_index
+ assert self.torrentview.liststore_columns == self.default_liststore_columns
+ assert self.torrentview.columns['Download Folder'].column_indices == [30]
def test_add_column(self):
-
# Add a text column
test_col = 'Test column'
self.torrentview.add_text_column(test_col, status_field=['label'])
- self.assertEqual(
- len(self.torrentview.liststore_columns),
- len(TorrentviewTestCase.default_liststore_columns) + 1,
- )
- self.assertEqual(
- len(self.torrentview.column_index),
- len(TorrentviewTestCase.default_column_index) + 1,
+ assert (
+ len(self.torrentview.liststore_columns)
+ == len(self.default_liststore_columns) + 1
)
- self.assertEqual(self.torrentview.column_index[-1], test_col)
- self.assertEqual(self.torrentview.columns[test_col].column_indices, [32])
+ assert len(self.torrentview.column_index) == len(self.default_column_index) + 1
+ assert self.torrentview.column_index[-1] == test_col
+ assert self.torrentview.columns[test_col].column_indices == [33]
def test_add_columns(self):
-
# Add a text column
test_col = 'Test column'
self.torrentview.add_text_column(test_col, status_field=['label'])
@@ -160,50 +142,35 @@ class TorrentviewTestCase(BaseTestCase):
test_col2 = 'Test column2'
self.torrentview.add_text_column(test_col2, status_field=['label2'])
- self.assertEqual(
- len(self.torrentview.liststore_columns),
- len(TorrentviewTestCase.default_liststore_columns) + 2,
- )
- self.assertEqual(
- len(self.torrentview.column_index),
- len(TorrentviewTestCase.default_column_index) + 2,
+ assert (
+ len(self.torrentview.liststore_columns)
+ == len(self.default_liststore_columns) + 2
)
+ assert len(self.torrentview.column_index) == len(self.default_column_index) + 2
# test_col
- self.assertEqual(self.torrentview.column_index[-2], test_col)
- self.assertEqual(self.torrentview.columns[test_col].column_indices, [32])
+ assert self.torrentview.column_index[-2] == test_col
+ assert self.torrentview.columns[test_col].column_indices == [33]
# test_col2
- self.assertEqual(self.torrentview.column_index[-1], test_col2)
- self.assertEqual(self.torrentview.columns[test_col2].column_indices, [33])
+ assert self.torrentview.column_index[-1] == test_col2
+ assert self.torrentview.columns[test_col2].column_indices == [34]
def test_remove_column(self):
-
# Add and remove text column
test_col = 'Test column'
self.torrentview.add_text_column(test_col, status_field=['label'])
self.torrentview.remove_column(test_col)
- self.assertEqual(
- len(self.torrentview.liststore_columns),
- len(TorrentviewTestCase.default_liststore_columns),
- )
- self.assertEqual(
- len(self.torrentview.column_index),
- len(TorrentviewTestCase.default_column_index),
- )
- self.assertEqual(
- self.torrentview.column_index[-1],
- TorrentviewTestCase.default_column_index[-1],
- )
- self.assertEqual(
- self.torrentview.columns[
- TorrentviewTestCase.default_column_index[-1]
- ].column_indices,
- [31],
+ assert len(self.torrentview.liststore_columns) == len(
+ self.default_liststore_columns
)
+ assert len(self.torrentview.column_index) == len(self.default_column_index)
+ assert self.torrentview.column_index[-1] == self.default_column_index[-1]
+ assert self.torrentview.columns[
+ self.default_column_index[-1]
+ ].column_indices == [32]
def test_remove_columns(self):
-
# Add two columns
test_col = 'Test column'
self.torrentview.add_text_column(test_col, status_field=['label'])
@@ -212,74 +179,47 @@ class TorrentviewTestCase(BaseTestCase):
# Remove test_col
self.torrentview.remove_column(test_col)
- self.assertEqual(
- len(self.torrentview.liststore_columns),
- len(TorrentviewTestCase.default_liststore_columns) + 1,
+ assert (
+ len(self.torrentview.liststore_columns)
+ == len(self.default_liststore_columns) + 1
)
- self.assertEqual(
- len(self.torrentview.column_index),
- len(TorrentviewTestCase.default_column_index) + 1,
- )
- self.assertEqual(self.torrentview.column_index[-1], test_col2)
- self.assertEqual(self.torrentview.columns[test_col2].column_indices, [32])
+ assert len(self.torrentview.column_index) == len(self.default_column_index) + 1
+ assert self.torrentview.column_index[-1] == test_col2
+ assert self.torrentview.columns[test_col2].column_indices == [33]
# Remove test_col2
self.torrentview.remove_column(test_col2)
- self.assertEqual(
- len(self.torrentview.liststore_columns),
- len(TorrentviewTestCase.default_liststore_columns),
- )
- self.assertEqual(
- len(self.torrentview.column_index),
- len(TorrentviewTestCase.default_column_index),
- )
- self.assertEqual(
- self.torrentview.column_index[-1],
- TorrentviewTestCase.default_column_index[-1],
- )
- self.assertEqual(
- self.torrentview.columns[
- TorrentviewTestCase.default_column_index[-1]
- ].column_indices,
- [31],
+ assert len(self.torrentview.liststore_columns) == len(
+ self.default_liststore_columns
)
+ assert len(self.torrentview.column_index) == len(self.default_column_index)
+ assert self.torrentview.column_index[-1] == self.default_column_index[-1]
+ assert self.torrentview.columns[
+ self.default_column_index[-1]
+ ].column_indices == [32]
def test_add_remove_column_multiple_types(self):
-
# Add a column with multiple column types
test_col3 = 'Test column3'
self.torrentview.add_progress_column(
test_col3, status_field=['progress', 'label3'], col_types=[float, str]
)
- self.assertEqual(
- len(self.torrentview.liststore_columns),
- len(TorrentviewTestCase.default_liststore_columns) + 2,
- )
- self.assertEqual(
- len(self.torrentview.column_index),
- len(TorrentviewTestCase.default_column_index) + 1,
+ assert (
+ len(self.torrentview.liststore_columns)
+ == len(self.default_liststore_columns) + 2
)
- self.assertEqual(self.torrentview.column_index[-1], test_col3)
- self.assertEqual(self.torrentview.columns[test_col3].column_indices, [32, 33])
+ assert len(self.torrentview.column_index) == len(self.default_column_index) + 1
+ assert self.torrentview.column_index[-1] == test_col3
+ assert self.torrentview.columns[test_col3].column_indices == [33, 34]
# Remove multiple column-types column
self.torrentview.remove_column(test_col3)
- self.assertEqual(
- len(self.torrentview.liststore_columns),
- len(TorrentviewTestCase.default_liststore_columns),
- )
- self.assertEqual(
- len(self.torrentview.column_index),
- len(TorrentviewTestCase.default_column_index),
- )
- self.assertEqual(
- self.torrentview.column_index[-1],
- TorrentviewTestCase.default_column_index[-1],
- )
- self.assertEqual(
- self.torrentview.columns[
- TorrentviewTestCase.default_column_index[-1]
- ].column_indices,
- [31],
+ assert len(self.torrentview.liststore_columns) == len(
+ self.default_liststore_columns
)
+ assert len(self.torrentview.column_index) == len(self.default_column_index)
+ assert self.torrentview.column_index[-1] == self.default_column_index[-1]
+ assert self.torrentview.columns[
+ self.default_column_index[-1]
+ ].column_indices == [32]
diff --git a/deluge/tests/test_tracker_icons.py b/deluge/tests/test_tracker_icons.py
index e18d33987..2f793d12e 100644
--- a/deluge/tests/test_tracker_icons.py
+++ b/deluge/tests/test_tracker_icons.py
@@ -1,34 +1,25 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-
-from __future__ import unicode_literals
+import os.path
import pytest
-from twisted.trial.unittest import SkipTest
+import pytest_twisted
import deluge.component as component
import deluge.ui.tracker_icons
-from deluge.common import windows_check
+from deluge.conftest import BaseTestCase
from deluge.ui.tracker_icons import TrackerIcon, TrackerIcons
from . import common
-from .basetest import BaseTestCase
-common.set_tmp_config_dir()
-deluge.ui.tracker_icons.PIL_INSTALLED = False
common.disable_new_release_check()
@pytest.mark.internet
-class TrackerIconsTestCase(BaseTestCase):
-
- if windows_check():
- skip = 'cannot use os.path.samefile to compair on windows(unix only)'
-
+class TestTrackerIcons(BaseTestCase):
def set_up(self):
# Disable resizing with Pillow for consistency.
self.patch(deluge.ui.tracker_icons, 'Image', None)
@@ -37,41 +28,52 @@ class TrackerIconsTestCase(BaseTestCase):
def tear_down(self):
return component.shutdown()
- def test_get_deluge_png(self):
+ @pytest_twisted.ensureDeferred
+ async def test_get_deluge_png(self, mock_mkstemp):
# Deluge has a png favicon link
icon = TrackerIcon(common.get_test_data_file('deluge.png'))
- d = self.icons.fetch('deluge-torrent.org')
- d.addCallback(self.assertNotIdentical, None)
- d.addCallback(self.assertEqual, icon)
- return d
+ result = await self.icons.fetch('deluge-torrent.org')
+ assert result == icon
+ assert not os.path.isfile(mock_mkstemp[1])
- def test_get_google_ico(self):
+ @pytest_twisted.ensureDeferred
+ async def test_get_google_ico(self):
# Google doesn't have any icon links
# So instead we'll grab its favicon.ico
icon = TrackerIcon(common.get_test_data_file('google.ico'))
- d = self.icons.fetch('www.google.com')
- d.addCallback(self.assertNotIdentical, None)
- d.addCallback(self.assertEqual, icon)
- return d
+ result = await self.icons.fetch('www.google.com')
+ assert result == icon
- def test_get_google_ico_with_redirect(self):
+ @pytest_twisted.ensureDeferred
+ async def test_get_google_ico_hebrew(self):
+ """Test that Google.co.il page is read as UTF-8"""
+ icon = TrackerIcon(common.get_test_data_file('google.ico'))
+ result = await self.icons.fetch('www.google.co.il')
+ assert result == icon
+
+ @pytest_twisted.ensureDeferred
+ async def test_get_google_ico_with_redirect(self):
# google.com redirects to www.google.com
icon = TrackerIcon(common.get_test_data_file('google.ico'))
- d = self.icons.fetch('google.com')
- d.addCallback(self.assertNotIdentical, None)
- d.addCallback(self.assertEqual, icon)
- return d
+ result = await self.icons.fetch('google.com')
+ assert result == icon
- def test_get_seo_ico_with_sni(self):
+ @pytest.mark.skip(reason='Site removed favicon, new SNI test will be needed')
+ @pytest_twisted.ensureDeferred
+ async def test_get_seo_svg_with_sni(self):
# seo using certificates with SNI support only
- raise SkipTest('Site certificate expired')
- icon = TrackerIcon(common.get_test_data_file('seo.ico'))
- d = self.icons.fetch('www.seo.com')
- d.addCallback(self.assertNotIdentical, None)
- d.addCallback(self.assertEqual, icon)
- return d
+ icon = TrackerIcon(common.get_test_data_file('seo.svg'))
+ result = await self.icons.fetch('www.seo.com')
+ assert result == icon
+
+ @pytest_twisted.ensureDeferred
+ async def test_get_empty_string_tracker(self):
+ result = await self.icons.fetch('')
+ assert result is None
- def test_get_empty_string_tracker(self):
- d = self.icons.fetch('')
- d.addCallback(self.assertIdentical, None)
- return d
+ @pytest_twisted.ensureDeferred
+ async def test_invalid_host(self, mock_mkstemp):
+ """Test that TrackerIcon can handle invalid hostname"""
+ result = await self.icons.fetch('deluge.example.com')
+ assert not result
+ assert not os.path.isfile(mock_mkstemp[1])
diff --git a/deluge/tests/test_transfer.py b/deluge/tests/test_transfer.py
index a04830325..92e349b5d 100644
--- a/deluge/tests/test_transfer.py
+++ b/deluge/tests/test_transfer.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2012 Bro <bro.development@gmail.com>
#
@@ -7,12 +6,10 @@
# See LICENSE for more details.
#
-from __future__ import print_function, unicode_literals
-
import base64
+import pytest
import rencode
-from twisted.trial import unittest
import deluge.log
from deluge.transfer import DelugeTransferProtocol
@@ -112,8 +109,9 @@ class TransferTestClass(DelugeTransferProtocol):
self.message_received(request)
-class DelugeTransferProtocolTestCase(unittest.TestCase):
- def setUp(self): # NOQA: N803
+class TestDelugeTransferProtocol:
+ @pytest.fixture(autouse=True)
+ def set_up(self):
"""
The expected messages corresponds to the test messages (msg1, msg2) after they've been processed
by DelugeTransferProtocol.send, which means that they've first been encoded with rencode,
@@ -160,7 +158,7 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
# Get the data as sent by DelugeTransferProtocol
messages = self.transfer.get_messages_out_joined()
base64_encoded = base64.b64encode(messages)
- self.assertEqual(base64_encoded, self.msg1_expected_compressed_base64)
+ assert base64_encoded == self.msg1_expected_compressed_base64
def test_receive_one_message(self):
"""
@@ -173,7 +171,7 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
)
# Get the data as sent by DelugeTransferProtocol
messages = self.transfer.get_messages_in().pop(0)
- self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(messages))
+ assert rencode.dumps(self.msg1) == rencode.dumps(messages)
def test_receive_old_message(self):
"""
@@ -181,9 +179,9 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
"""
self.transfer.dataReceived(rencode.dumps(self.msg1))
- self.assertEqual(len(self.transfer.get_messages_in()), 0)
- self.assertEqual(self.transfer._message_length, 0)
- self.assertEqual(len(self.transfer._buffer), 0)
+ assert len(self.transfer.get_messages_in()) == 0
+ assert self.transfer._message_length == 0
+ assert len(self.transfer._buffer) == 0
def test_receive_two_concatenated_messages(self):
"""
@@ -198,9 +196,9 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
# Get the data as sent by DelugeTransferProtocol
message1 = self.transfer.get_messages_in().pop(0)
- self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1))
+ assert rencode.dumps(self.msg1) == rencode.dumps(message1)
message2 = self.transfer.get_messages_in().pop(0)
- self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2))
+ assert rencode.dumps(self.msg2) == rencode.dumps(message2)
def test_receive_three_messages_in_parts(self):
"""
@@ -237,20 +235,17 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
else:
expected_msgs_received_count = 0
# Verify that the expected number of complete messages has arrived
- self.assertEqual(
- expected_msgs_received_count, len(self.transfer.get_messages_in())
- )
+ assert expected_msgs_received_count == len(self.transfer.get_messages_in())
# Get the data as received by DelugeTransferProtocol
message1 = self.transfer.get_messages_in().pop(0)
- self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1))
+ assert rencode.dumps(self.msg1) == rencode.dumps(message1)
message2 = self.transfer.get_messages_in().pop(0)
- self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2))
+ assert rencode.dumps(self.msg2) == rencode.dumps(message2)
message3 = self.transfer.get_messages_in().pop(0)
- self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message3))
+ assert rencode.dumps(self.msg1) == rencode.dumps(message3)
# Remove underscore to enable test, or run the test directly:
- # tests $ trial test_transfer.DelugeTransferProtocolTestCase._test_rencode_fail_protocol
def _test_rencode_fail_protocol(self):
"""
This test tries to test the protocol that relies on errors from rencode.
@@ -317,11 +312,11 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
# Get the data as received by DelugeTransferProtocol
message1 = self.transfer.get_messages_in().pop(0)
- self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1))
+ assert rencode.dumps(self.msg1) == rencode.dumps(message1)
message2 = self.transfer.get_messages_in().pop(0)
- self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2))
+ assert rencode.dumps(self.msg2) == rencode.dumps(message2)
message3 = self.transfer.get_messages_in().pop(0)
- self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message3))
+ assert rencode.dumps(self.msg1) == rencode.dumps(message3)
def test_receive_middle_of_header(self):
"""
@@ -344,19 +339,19 @@ class DelugeTransferProtocolTestCase(unittest.TestCase):
self.transfer.dataReceived(two_concatenated[: first_len + 2])
# Should be 1 message in the list
- self.assertEqual(1, len(self.transfer.get_messages_in()))
+ assert 1 == len(self.transfer.get_messages_in())
# Send the rest
self.transfer.dataReceived(two_concatenated[first_len + 2 :])
# Should be 2 messages in the list
- self.assertEqual(2, len(self.transfer.get_messages_in()))
+ assert 2 == len(self.transfer.get_messages_in())
# Get the data as sent by DelugeTransferProtocol
message1 = self.transfer.get_messages_in().pop(0)
- self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1))
+ assert rencode.dumps(self.msg1) == rencode.dumps(message1)
message2 = self.transfer.get_messages_in().pop(0)
- self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2))
+ assert rencode.dumps(self.msg2) == rencode.dumps(message2)
# Needs file containing big data structure e.g. like thetorrent list as it is transfered by the daemon
# def test_simulate_big_transfer(self):
diff --git a/deluge/tests/test_ui_common.py b/deluge/tests/test_ui_common.py
index b0c311183..ee97259de 100644
--- a/deluge/tests/test_ui_common.py
+++ b/deluge/tests/test_ui_common.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
#
@@ -6,30 +5,19 @@
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-from six import assertCountEqual
-from twisted.trial import unittest
-
-from deluge.common import windows_check
from deluge.ui.common import TorrentInfo
from . import common
-class UICommonTestCase(unittest.TestCase):
- def setUp(self): # NOQA: N803
- pass
-
- def tearDown(self): # NOQA: N803
- pass
-
+class TestUICommon:
def test_hash_optional_single_file(self):
"""Ensure single file with `ed2k` and `sha1` keys are not in filetree output."""
filename = common.get_test_data_file('test.torrent')
files_tree = {'azcvsupdater_2.6.2.jar': (0, 307949, True)}
ti = TorrentInfo(filename, filetree=1)
- self.assertEqual(ti.files_tree, files_tree)
+ assert ti.files_tree == files_tree
files_tree2 = {
'contents': {
@@ -42,7 +30,7 @@ class UICommonTestCase(unittest.TestCase):
}
}
ti = TorrentInfo(filename, filetree=2)
- self.assertEqual(ti.files_tree, files_tree2)
+ assert ti.files_tree == files_tree2
def test_hash_optional_multi_file(self):
"""Ensure multi-file with `filehash` and `ed2k` are keys not in filetree output."""
@@ -54,7 +42,7 @@ class UICommonTestCase(unittest.TestCase):
}
}
ti = TorrentInfo(filename, filetree=1)
- self.assertEqual(ti.files_tree, files_tree)
+ assert ti.files_tree == files_tree
filestree2 = {
'contents': {
@@ -83,14 +71,14 @@ class UICommonTestCase(unittest.TestCase):
'type': 'dir',
}
ti = TorrentInfo(filename, filetree=2)
- self.assertEqual(ti.files_tree, filestree2)
+ assert ti.files_tree == filestree2
def test_hash_optional_md5sum(self):
# Ensure `md5sum` key is not included in filetree output
filename = common.get_test_data_file('md5sum.torrent')
files_tree = {'test': {'lol': (0, 4, True), 'rofl': (1, 5, True)}}
ti = TorrentInfo(filename, filetree=1)
- self.assertEqual(ti.files_tree, files_tree)
+ assert ti.files_tree == files_tree
ti = TorrentInfo(filename, filetree=2)
files_tree2 = {
'contents': {
@@ -118,16 +106,14 @@ class UICommonTestCase(unittest.TestCase):
},
'type': 'dir',
}
- self.assertEqual(ti.files_tree, files_tree2)
+ assert ti.files_tree == files_tree2
def test_utf8_encoded_paths(self):
filename = common.get_test_data_file('test.torrent')
ti = TorrentInfo(filename)
- self.assertTrue('azcvsupdater_2.6.2.jar' in ti.files_tree)
+ assert 'azcvsupdater_2.6.2.jar' in ti.files_tree
def test_utf8_encoded_paths2(self):
- if windows_check():
- raise unittest.SkipTest('on windows KeyError: unicode_filenames')
filename = common.get_test_data_file('unicode_filenames.torrent')
filepath1 = '\u30c6\u30af\u30b9\u30fb\u30c6\u30af\u30b5\u30f3.mkv'
filepath2 = (
@@ -140,11 +126,11 @@ class UICommonTestCase(unittest.TestCase):
ti = TorrentInfo(filename)
files_tree = ti.files_tree['unicode_filenames']
- self.assertIn(filepath1, files_tree)
- self.assertIn(filepath2, files_tree)
- self.assertIn(filepath3, files_tree)
- self.assertIn(filepath4, files_tree)
- self.assertIn(filepath5, files_tree)
+ assert filepath1 in files_tree
+ assert filepath2 in files_tree
+ assert filepath3 in files_tree
+ assert filepath4 in files_tree
+ assert filepath5 in files_tree
result_files = [
{
@@ -170,4 +156,4 @@ class UICommonTestCase(unittest.TestCase):
{'download': True, 'path': 'unicode_filenames/' + filepath1, 'size': 1771},
]
- assertCountEqual(self, ti.files, result_files)
+ assert len(ti.files) == len(result_files)
diff --git a/deluge/tests/test_ui_console.py b/deluge/tests/test_ui_console.py
index 3667c608e..34398ee19 100644
--- a/deluge/tests/test_ui_console.py
+++ b/deluge/tests/test_ui_console.py
@@ -1,35 +1,30 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import argparse
+import pytest
+
from deluge.ui.console.cmdline.commands.add import Command
from deluge.ui.console.cmdline.commands.config import json_eval
from deluge.ui.console.widgets.fields import TextInput
-from .basetest import BaseTestCase
-
-class MockParent(object):
+class MockParent:
def __init__(self):
self.border_off_x = 1
self.pane_width = 20
self.encoding = 'utf8'
-class UIConsoleFieldTestCase(BaseTestCase):
- def setUp(self): # NOQA: N803
+class TestUIConsoleField:
+ @pytest.fixture(autouse=True)
+ def set_up(self):
self.parent = MockParent()
- def tearDown(self): # NOQA: N803
- pass
-
def test_text_input(self):
def move_func(self, r, c):
self._cursor_row = r
@@ -44,48 +39,42 @@ class UIConsoleFieldTestCase(BaseTestCase):
'/text/field/file/path',
complete=False,
)
- self.assertTrue(t)
- self.assertTrue(t.handle_read(33))
-
-
-class UIConsoleCommandsTestCase(BaseTestCase):
- def setUp(self):
- pass
+ assert t
+ assert t.handle_read(33)
- def tearDown(self):
- pass
+class TestUIConsoleCommands:
def test_add_move_completed(self):
completed_path = 'completed_path'
parser = argparse.ArgumentParser()
cmd = Command()
cmd.add_arguments(parser)
args = parser.parse_args(['torrent', '-m', completed_path])
- self.assertEqual(args.move_completed_path, completed_path)
+ assert args.move_completed_path == completed_path
args = parser.parse_args(['torrent', '--move-path', completed_path])
- self.assertEqual(args.move_completed_path, completed_path)
+ assert args.move_completed_path == completed_path
def test_config_json_eval(self):
- self.assertEqual(json_eval('/downloads'), '/downloads')
- self.assertEqual(json_eval('/dir/with space'), '/dir/with space')
- self.assertEqual(json_eval('c:\\\\downloads'), 'c:\\\\downloads')
- self.assertEqual(json_eval('c:/downloads'), 'c:/downloads')
+ assert json_eval('/downloads') == '/downloads'
+ assert json_eval('/dir/with space') == '/dir/with space'
+ assert json_eval('c:\\\\downloads') == 'c:\\\\downloads'
+ assert json_eval('c:/downloads') == 'c:/downloads'
# Ensure newlines are split and only first setting is used.
- self.assertEqual(json_eval('setting\nwithneline'), 'setting')
+ assert json_eval('setting\nwithneline') == 'setting'
# Allow both parentheses and square brackets.
- self.assertEqual(json_eval('(8000, 8001)'), [8000, 8001])
- self.assertEqual(json_eval('[8000, 8001]'), [8000, 8001])
- self.assertEqual(json_eval('["abc", "def"]'), ['abc', 'def'])
- self.assertEqual(json_eval('{"foo": "bar"}'), {'foo': 'bar'})
- self.assertEqual(json_eval('{"number": 1234}'), {'number': 1234})
+ assert json_eval('(8000, 8001)') == [8000, 8001]
+ assert json_eval('[8000, 8001]') == [8000, 8001]
+ assert json_eval('["abc", "def"]') == ['abc', 'def']
+ assert json_eval('{"foo": "bar"}') == {'foo': 'bar'}
+ assert json_eval('{"number": 1234}') == {'number': 1234}
# Hex string for peer_tos.
- self.assertEqual(json_eval('0x00'), '0x00')
- self.assertEqual(json_eval('1000'), 1000)
- self.assertEqual(json_eval('-6'), -6)
- self.assertEqual(json_eval('10.5'), 10.5)
- self.assertEqual(json_eval('True'), True)
- self.assertEqual(json_eval('false'), False)
- self.assertEqual(json_eval('none'), None)
+ assert json_eval('0x00') == '0x00'
+ assert json_eval('1000') == 1000
+ assert json_eval('-6') == -6
+ assert json_eval('10.5') == 10.5
+ assert json_eval('True')
+ assert not json_eval('false')
+ assert json_eval('none') is None
# Empty values to clear config key.
- self.assertEqual(json_eval('[]'), [])
- self.assertEqual(json_eval(''), '')
+ assert json_eval('[]') == []
+ assert json_eval('') == ''
diff --git a/deluge/tests/test_ui_entry.py b/deluge/tests/test_ui_entry.py
index f85bc7d7d..0546ad7b8 100644
--- a/deluge/tests/test_ui_entry.py
+++ b/deluge/tests/test_ui_entry.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
#
@@ -7,14 +6,13 @@
# See LICENSE for more details.
#
-from __future__ import print_function, unicode_literals
-
import argparse
import sys
from io import StringIO
+from unittest import mock
-import mock
import pytest
+import pytest_twisted
from twisted.internet import defer
import deluge
@@ -23,12 +21,12 @@ import deluge.ui.console
import deluge.ui.console.cmdline.commands.quit
import deluge.ui.console.main
import deluge.ui.web.server
-from deluge.common import PY2, get_localhost_auth, windows_check
+from deluge.common import get_localhost_auth, windows_check
+from deluge.conftest import BaseTestCase
from deluge.ui import ui_entry
from deluge.ui.web.server import DelugeWeb
from . import common
-from .basetest import BaseTestCase
from .daemon_base import DaemonBase
DEBUG_COMMAND = False
@@ -41,7 +39,7 @@ sys_stdout = sys.stdout
# To print to terminal from the tests, use: print('Message...', file=sys_stdout)
-class StringFileDescriptor(object):
+class StringFileDescriptor:
"""File descriptor that writes to string buffer"""
def __init__(self, fd):
@@ -51,22 +49,15 @@ class StringFileDescriptor(object):
setattr(self, a, getattr(sys_stdout, a))
def write(self, *data, **kwargs):
- # io.StringIO requires unicode strings.
data_string = str(*data)
- if PY2:
- data_string = data_string.decode()
print(data_string, file=self.out, end='')
def flush(self):
self.out.flush()
-class UIBaseTestCase(object):
- def __init__(self):
- self.var = {}
-
+class UIBaseTestCase:
def set_up(self):
- common.set_tmp_config_dir()
common.setup_test_logger(level='info', prefix=self.id())
return component.start()
@@ -82,28 +73,14 @@ class UIBaseTestCase(object):
class UIWithDaemonBaseTestCase(UIBaseTestCase, DaemonBase):
"""Subclass for test that require a deluged daemon"""
- def __init__(self):
- UIBaseTestCase.__init__(self)
-
def set_up(self):
d = self.common_set_up()
common.setup_test_logger(level='info', prefix=self.id())
- d.addCallback(self.start_core)
- return d
-
- def tear_down(self):
- d = UIBaseTestCase.tear_down(self)
- d.addCallback(self.terminate_core)
return d
-class DelugeEntryTestCase(BaseTestCase):
-
- if windows_check():
- skip = 'Console ui test on Windows broken due to sys args issue'
-
+class TestDelugeEntry(BaseTestCase):
def set_up(self):
- common.set_tmp_config_dir()
return component.start()
def tear_down(self):
@@ -119,10 +96,11 @@ class DelugeEntryTestCase(BaseTestCase):
self.patch(argparse._sys, 'stdout', fd)
with mock.patch('deluge.ui.console.main.ConsoleUI'):
- self.assertRaises(SystemExit, ui_entry.start_ui)
- self.assertTrue('usage: deluge' in fd.out.getvalue())
- self.assertTrue('UI Options:' in fd.out.getvalue())
- self.assertTrue('* console' in fd.out.getvalue())
+ with pytest.raises(SystemExit):
+ ui_entry.start_ui()
+ assert 'usage: deluge' in fd.out.getvalue()
+ assert 'UI Options:' in fd.out.getvalue()
+ assert '* console' in fd.out.getvalue()
def test_start_default(self):
self.patch(sys, 'argv', ['./deluge'])
@@ -157,7 +135,7 @@ class DelugeEntryTestCase(BaseTestCase):
# Just test that no exception is raised
ui_entry.start_ui()
- self.assertEqual(_level[0], 'info')
+ assert _level[0] == 'info'
class GtkUIBaseTestCase(UIBaseTestCase):
@@ -173,38 +151,27 @@ class GtkUIBaseTestCase(UIBaseTestCase):
@pytest.mark.gtkui
-class GtkUIDelugeScriptEntryTestCase(BaseTestCase, GtkUIBaseTestCase):
- def __init__(self, testname):
- super(GtkUIDelugeScriptEntryTestCase, self).__init__(testname)
- GtkUIBaseTestCase.__init__(self)
-
- self.var['cmd_name'] = 'deluge gtk'
- self.var['start_cmd'] = ui_entry.start_ui
- self.var['sys_arg_cmd'] = ['./deluge', 'gtk']
-
- def set_up(self):
- return GtkUIBaseTestCase.set_up(self)
-
- def tear_down(self):
- return GtkUIBaseTestCase.tear_down(self)
+class TestGtkUIDelugeScriptEntry(BaseTestCase, GtkUIBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ request.cls.var = {
+ 'cmd_name': 'deluge gtk',
+ 'start_cmd': ui_entry.start_ui,
+ 'sys_arg_cmd': ['./deluge', 'gtk'],
+ }
@pytest.mark.gtkui
-class GtkUIScriptEntryTestCase(BaseTestCase, GtkUIBaseTestCase):
- def __init__(self, testname):
- super(GtkUIScriptEntryTestCase, self).__init__(testname)
- GtkUIBaseTestCase.__init__(self)
+class TestGtkUIScriptEntry(BaseTestCase, GtkUIBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
from deluge.ui import gtk3
- self.var['cmd_name'] = 'deluge-gtk'
- self.var['start_cmd'] = gtk3.start
- self.var['sys_arg_cmd'] = ['./deluge-gtk']
-
- def set_up(self):
- return GtkUIBaseTestCase.set_up(self)
-
- def tear_down(self):
- return GtkUIBaseTestCase.tear_down(self)
+ request.cls.var = {
+ 'cmd_name': 'deluge-gtk',
+ 'start_cmd': gtk3.start,
+ 'sys_arg_cmd': ['./deluge-gtk'],
+ }
class DelugeWebMock(DelugeWeb):
@@ -242,45 +209,31 @@ class WebUIBaseTestCase(UIBaseTestCase):
self.patch(deluge.ui.web.server, 'DelugeWeb', DelugeWebMock)
self.exec_command()
- self.assertEqual(_level[0], 'info')
-
-
-class WebUIScriptEntryTestCase(BaseTestCase, WebUIBaseTestCase):
-
- if windows_check():
- skip = 'Console ui test on Windows broken due to sys args issue'
-
- def __init__(self, testname):
- super(WebUIScriptEntryTestCase, self).__init__(testname)
- WebUIBaseTestCase.__init__(self)
- self.var['cmd_name'] = 'deluge-web'
- self.var['start_cmd'] = deluge.ui.web.start
- self.var['sys_arg_cmd'] = ['./deluge-web', '--do-not-daemonize']
-
- def set_up(self):
- return WebUIBaseTestCase.set_up(self)
-
- def tear_down(self):
- return WebUIBaseTestCase.tear_down(self)
-
+ assert _level[0] == 'info'
-class WebUIDelugeScriptEntryTestCase(BaseTestCase, WebUIBaseTestCase):
- if windows_check():
- skip = 'Console ui test on Windows broken due to sys args issue'
+class TestWebUIScriptEntry(BaseTestCase, WebUIBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ request.cls.var = {
+ 'cmd_name': 'deluge-web',
+ 'start_cmd': deluge.ui.web.start,
+ 'sys_arg_cmd': ['./deluge-web'],
+ }
+ if not windows_check():
+ request.cls.var['sys_arg_cmd'].append('--do-not-daemonize')
- def __init__(self, testname):
- super(WebUIDelugeScriptEntryTestCase, self).__init__(testname)
- WebUIBaseTestCase.__init__(self)
- self.var['cmd_name'] = 'deluge web'
- self.var['start_cmd'] = ui_entry.start_ui
- self.var['sys_arg_cmd'] = ['./deluge', 'web', '--do-not-daemonize']
-
- def set_up(self):
- return WebUIBaseTestCase.set_up(self)
- def tear_down(self):
- return WebUIBaseTestCase.tear_down(self)
+class TestWebUIDelugeScriptEntry(BaseTestCase, WebUIBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ request.cls.var = {
+ 'cmd_name': 'deluge web',
+ 'start_cmd': ui_entry.start_ui,
+ 'sys_arg_cmd': ['./deluge', 'web'],
+ }
+ if not windows_check():
+ request.cls.var['sys_arg_cmd'].append('--do-not-daemonize')
class ConsoleUIBaseTestCase(UIBaseTestCase):
@@ -291,7 +244,7 @@ class ConsoleUIBaseTestCase(UIBaseTestCase):
with mock.patch('deluge.ui.console.main.ConsoleUI'):
self.exec_command()
- def test_start_console_with_log_level(self):
+ def test_start_console_with_log_level(self, request):
_level = []
def setup_logger(
@@ -314,7 +267,7 @@ class ConsoleUIBaseTestCase(UIBaseTestCase):
# Just test that no exception is raised
self.exec_command()
- self.assertEqual(_level[0], 'info')
+ assert _level[0] == 'info'
def test_console_help(self):
self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-h'])
@@ -322,18 +275,19 @@ class ConsoleUIBaseTestCase(UIBaseTestCase):
self.patch(argparse._sys, 'stdout', fd)
with mock.patch('deluge.ui.console.main.ConsoleUI'):
- self.assertRaises(SystemExit, self.exec_command)
+ with pytest.raises(SystemExit):
+ self.exec_command()
std_output = fd.out.getvalue()
- self.assertTrue(
- ('usage: %s' % self.var['cmd_name']) in std_output
- ) # Check command name
- self.assertTrue('Common Options:' in std_output)
- self.assertTrue('Console Options:' in std_output)
- self.assertIn(
- 'Console Commands:\n The following console commands are available:',
- std_output,
+ assert (
+ 'usage: %s' % self.var['cmd_name']
+ ) in std_output # Check command name
+ assert 'Common Options:' in std_output
+ assert 'Console Options:' in std_output
+ assert (
+ 'Console Commands:\n The following console commands are available:'
+ in std_output
)
- self.assertIn('The following console commands are available:', std_output)
+ assert 'The following console commands are available:' in std_output
def test_console_command_info(self):
self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['info'])
@@ -349,10 +303,11 @@ class ConsoleUIBaseTestCase(UIBaseTestCase):
self.patch(argparse._sys, 'stdout', fd)
with mock.patch('deluge.ui.console.main.ConsoleUI'):
- self.assertRaises(SystemExit, self.exec_command)
+ with pytest.raises(SystemExit):
+ self.exec_command()
std_output = fd.out.getvalue()
- self.assertIn('usage: info', std_output)
- self.assertIn('Show information about the torrents', std_output)
+ assert 'usage: info' in std_output
+ assert 'Show information about the torrents' in std_output
def test_console_unrecognized_arguments(self):
self.patch(
@@ -361,8 +316,9 @@ class ConsoleUIBaseTestCase(UIBaseTestCase):
fd = StringFileDescriptor(sys.stdout)
self.patch(argparse._sys, 'stderr', fd)
with mock.patch('deluge.ui.console.main.ConsoleUI'):
- self.assertRaises(SystemExit, self.exec_command)
- self.assertIn('unrecognized arguments: --ui', fd.out.getvalue())
+ with pytest.raises(SystemExit):
+ self.exec_command()
+ assert 'unrecognized arguments: --ui' in fd.out.getvalue()
class ConsoleUIWithDaemonBaseTestCase(UIWithDaemonBaseTestCase):
@@ -390,26 +346,28 @@ class ConsoleUIWithDaemonBaseTestCase(UIWithDaemonBaseTestCase):
+ command,
)
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_console_command_add(self):
filename = common.get_test_data_file('test.torrent')
- self.patch_arg_command(['add ' + filename])
+ self.patch_arg_command([f'add "{filename}"'])
fd = StringFileDescriptor(sys.stdout)
self.patch(sys, 'stdout', fd)
yield self.exec_command()
std_output = fd.out.getvalue()
- self.assertEqual(
- std_output, 'Attempting to add torrent: ' + filename + '\nTorrent added!\n'
+ assert (
+ std_output
+ == 'Attempting to add torrent: ' + filename + '\nTorrent added!\n'
)
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_console_command_add_move_completed(self):
filename = common.get_test_data_file('test.torrent')
+ tmp_path = 'c:\\tmp' if windows_check() else '/tmp'
self.patch_arg_command(
[
- 'add --move-path /tmp ' + filename + ' ; status'
+ f'add --move-path "{tmp_path}" "{filename}" ; status'
' ; manage'
' ab570cdd5a17ea1b61e970bb72047de141bce173'
' move_completed'
@@ -422,22 +380,23 @@ class ConsoleUIWithDaemonBaseTestCase(UIWithDaemonBaseTestCase):
yield self.exec_command()
std_output = fd.out.getvalue()
- self.assertTrue(
- std_output.endswith('move_completed: True\nmove_completed_path: /tmp\n')
- or std_output.endswith('move_completed_path: /tmp\nmove_completed: True\n')
+ assert std_output.endswith(
+ f'move_completed: True\nmove_completed_path: {tmp_path}\n'
+ ) or std_output.endswith(
+ f'move_completed_path: {tmp_path}\nmove_completed: True\n'
)
- @defer.inlineCallbacks
- def test_console_command_status(self):
+ @pytest_twisted.ensureDeferred
+ async def test_console_command_status(self):
fd = StringFileDescriptor(sys.stdout)
self.patch_arg_command(['status'])
self.patch(sys, 'stdout', fd)
- yield self.exec_command()
+ await self.exec_command()
std_output = fd.out.getvalue()
- self.assertTrue(std_output.startswith('Total upload: '))
- self.assertTrue(std_output.endswith(' Moving: 0\n'))
+ assert std_output.startswith('Total upload: ')
+ assert std_output.endswith(' Moving: 0\n')
@defer.inlineCallbacks
def test_console_command_config_set_download_location(self):
@@ -447,79 +406,36 @@ class ConsoleUIWithDaemonBaseTestCase(UIWithDaemonBaseTestCase):
yield self.exec_command()
std_output = fd.out.getvalue()
- self.assertTrue(
- std_output.startswith(
- 'Setting "download_location" to: {}\'/downloads\''.format(
- 'u' if PY2 else ''
- )
- )
- )
- self.assertTrue(
- std_output.endswith('Configuration value successfully updated.\n')
- )
-
-
-class ConsoleScriptEntryWithDaemonTestCase(
- BaseTestCase, ConsoleUIWithDaemonBaseTestCase
-):
-
- if windows_check():
- skip = 'Console ui test on Windows broken due to sys args issue'
-
- def __init__(self, testname):
- super(ConsoleScriptEntryWithDaemonTestCase, self).__init__(testname)
- ConsoleUIWithDaemonBaseTestCase.__init__(self)
- self.var['cmd_name'] = 'deluge-console'
- self.var['sys_arg_cmd'] = ['./deluge-console']
-
- def set_up(self):
- from deluge.ui.console.console import Console
-
- def start_console():
- return Console().start()
-
- self.patch(deluge.ui.console, 'start', start_console)
- self.var['start_cmd'] = deluge.ui.console.start
-
- return ConsoleUIWithDaemonBaseTestCase.set_up(self)
-
- def tear_down(self):
- return ConsoleUIWithDaemonBaseTestCase.tear_down(self)
-
-
-class ConsoleScriptEntryTestCase(BaseTestCase, ConsoleUIBaseTestCase):
-
- if windows_check():
- skip = 'Console ui test on Windows broken due to sys args issue'
-
- def __init__(self, testname):
- super(ConsoleScriptEntryTestCase, self).__init__(testname)
- ConsoleUIBaseTestCase.__init__(self)
- self.var['cmd_name'] = 'deluge-console'
- self.var['start_cmd'] = deluge.ui.console.start
- self.var['sys_arg_cmd'] = ['./deluge-console']
-
- def set_up(self):
- return ConsoleUIBaseTestCase.set_up(self)
-
- def tear_down(self):
- return ConsoleUIBaseTestCase.tear_down(self)
-
-
-class ConsoleDelugeScriptEntryTestCase(BaseTestCase, ConsoleUIBaseTestCase):
-
- if windows_check():
- skip = 'cannot test console ui on windows'
-
- def __init__(self, testname):
- super(ConsoleDelugeScriptEntryTestCase, self).__init__(testname)
- ConsoleUIBaseTestCase.__init__(self)
- self.var['cmd_name'] = 'deluge console'
- self.var['start_cmd'] = ui_entry.start_ui
- self.var['sys_arg_cmd'] = ['./deluge', 'console']
-
- def set_up(self):
- return ConsoleUIBaseTestCase.set_up(self)
-
- def tear_down(self):
- return ConsoleUIBaseTestCase.tear_down(self)
+ assert std_output.startswith('Setting "download_location" to: \'/downloads\'')
+ assert std_output.endswith('Configuration value successfully updated.\n')
+
+
+@pytest.mark.usefixtures('daemon', 'client')
+class TestConsoleScriptEntryWithDaemon(BaseTestCase, ConsoleUIWithDaemonBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ request.cls.var = {
+ 'cmd_name': 'deluge-console',
+ 'start_cmd': deluge.ui.console.start,
+ 'sys_arg_cmd': ['./deluge-console'],
+ }
+
+
+class TestConsoleScriptEntry(BaseTestCase, ConsoleUIBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ request.cls.var = {
+ 'cmd_name': 'deluge-console',
+ 'start_cmd': deluge.ui.console.start,
+ 'sys_arg_cmd': ['./deluge-console'],
+ }
+
+
+class TestConsoleDelugeScriptEntry(BaseTestCase, ConsoleUIBaseTestCase):
+ @pytest.fixture(autouse=True)
+ def set_var(self, request):
+ request.cls.var = {
+ 'cmd_name': 'deluge console',
+ 'start_cmd': ui_entry.start_ui,
+ 'sys_arg_cmd': ['./deluge', 'console'],
+ }
diff --git a/deluge/tests/test_ui_gtk3.py b/deluge/tests/test_ui_gtk3.py
index a208bb494..e6d025c7c 100644
--- a/deluge/tests/test_ui_gtk3.py
+++ b/deluge/tests/test_ui_gtk3.py
@@ -1,21 +1,17 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import sys
+from unittest import mock
-import mock
import pytest
-from twisted.trial import unittest
@pytest.mark.gtkui
-class GTK3CommonTestCase(unittest.TestCase):
+class TestGTK3Common:
def setUp(self):
sys.modules['gi.repository'] = mock.MagicMock()
@@ -25,10 +21,10 @@ class GTK3CommonTestCase(unittest.TestCase):
def test_cmp(self):
from deluge.ui.gtk3.common import cmp
- self.assertEqual(cmp(None, None), 0)
- self.assertEqual(cmp(1, None), 1)
- self.assertEqual(cmp(0, None), 1)
- self.assertEqual(cmp(None, 7), -1)
- self.assertEqual(cmp(None, 'bar'), -1)
- self.assertEqual(cmp('foo', None), 1)
- self.assertEqual(cmp('', None), 1)
+ assert cmp(None, None) == 0
+ assert cmp(1, None) == 1
+ assert cmp(0, None) == 1
+ assert cmp(None, 7) == -1
+ assert cmp(None, 'bar') == -1
+ assert cmp('foo', None) == 1
+ assert cmp('', None) == 1
diff --git a/deluge/tests/test_web_api.py b/deluge/tests/test_web_api.py
index 0180e0bda..56f86aa56 100644
--- a/deluge/tests/test_web_api.py
+++ b/deluge/tests/test_web_api.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
#
@@ -7,19 +6,17 @@
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import json
from io import BytesIO
+import pytest
+import pytest_twisted
from twisted.internet import defer, reactor
-from twisted.python.failure import Failure
from twisted.web.client import Agent, FileBodyProducer
from twisted.web.http_headers import Headers
from twisted.web.static import File
import deluge.component as component
-from deluge.ui.client import client
from . import common
from .common_web import WebServerTestBase
@@ -27,20 +24,19 @@ from .common_web import WebServerTestBase
common.disable_new_release_check()
-class WebAPITestCase(WebServerTestBase):
- def test_connect_invalid_host(self):
- d = self.deluge_web.web_api.connect('id')
- d.addCallback(self.fail)
- d.addErrback(self.assertIsInstance, Failure)
- return d
+class TestWebAPI(WebServerTestBase):
+ @pytest.mark.xfail(reason='This just logs an error at the moment.')
+ @pytest_twisted.ensureDeferred
+ async def test_connect_invalid_host(self):
+ with pytest.raises(Exception):
+ await self.deluge_web.web_api.connect('id')
- def test_connect(self):
+ def test_connect(self, client):
d = self.deluge_web.web_api.connect(self.host_id)
def on_connect(result):
- self.assertEqual(type(result), tuple)
- self.assertTrue(len(result) > 0)
- self.addCleanup(client.disconnect)
+ assert type(result) == tuple
+ assert len(result) > 0
return result
d.addCallback(on_connect)
@@ -52,9 +48,9 @@ class WebAPITestCase(WebServerTestBase):
@defer.inlineCallbacks
def on_connect(result):
- self.assertTrue(self.deluge_web.web_api.connected())
+ assert self.deluge_web.web_api.connected()
yield self.deluge_web.web_api.disconnect()
- self.assertFalse(self.deluge_web.web_api.connected())
+ assert not self.deluge_web.web_api.connected()
d.addCallback(on_connect)
d.addErrback(self.fail)
@@ -62,7 +58,7 @@ class WebAPITestCase(WebServerTestBase):
def test_get_config(self):
config = self.deluge_web.web_api.get_config()
- self.assertEqual(self.webserver_listen_port, config['port'])
+ assert self.webserver_listen_port == config['port']
def test_set_config(self):
config = self.deluge_web.web_api.get_config()
@@ -77,9 +73,9 @@ class WebAPITestCase(WebServerTestBase):
}
self.deluge_web.web_api.set_config(config)
web_config = component.get('DelugeWeb').config.config
- self.assertNotEquals(config['pwd_salt'], web_config['pwd_salt'])
- self.assertNotEquals(config['pwd_sha1'], web_config['pwd_sha1'])
- self.assertNotEquals(config['sessions'], web_config['sessions'])
+ assert config['pwd_salt'] != web_config['pwd_salt']
+ assert config['pwd_sha1'] != web_config['pwd_sha1']
+ assert config['sessions'] != web_config['sessions']
@defer.inlineCallbacks
def get_host_status(self):
@@ -87,49 +83,49 @@ class WebAPITestCase(WebServerTestBase):
host[3] = 'Online'
host[4] = '2.0.0.dev562'
status = yield self.deluge_web.web_api.get_host_status(self.host_id)
- self.assertEqual(status, tuple(status))
+ assert status == tuple(status)
def test_get_host(self):
- self.assertFalse(self.deluge_web.web_api._get_host('invalid_id'))
+ assert not self.deluge_web.web_api._get_host('invalid_id')
conn = list(self.deluge_web.web_api.hostlist.get_hosts_info()[0])
- self.assertEqual(self.deluge_web.web_api._get_host(conn[0]), conn[0:4])
+ assert self.deluge_web.web_api._get_host(conn[0]) == conn[0:4]
def test_add_host(self):
conn = ['abcdef', '10.0.0.1', 0, 'user123', 'pass123']
- self.assertFalse(self.deluge_web.web_api._get_host(conn[0]))
+ assert not self.deluge_web.web_api._get_host(conn[0])
# Add valid host
result, host_id = self.deluge_web.web_api.add_host(
conn[1], conn[2], conn[3], conn[4]
)
- self.assertEqual(result, True)
+ assert result
conn[0] = host_id
- self.assertEqual(self.deluge_web.web_api._get_host(conn[0]), conn[0:4])
+ assert self.deluge_web.web_api._get_host(conn[0]) == conn[0:4]
# Add already existing host
ret = self.deluge_web.web_api.add_host(conn[1], conn[2], conn[3], conn[4])
- self.assertEqual(ret, (False, 'Host details already in hostlist'))
+ assert ret == (False, 'Host details already in hostlist')
# Add invalid port
conn[2] = 'bad port'
ret = self.deluge_web.web_api.add_host(conn[1], conn[2], conn[3], conn[4])
- self.assertEqual(ret, (False, 'Invalid port. Must be an integer'))
+ assert ret == (False, 'Invalid port. Must be an integer')
def test_remove_host(self):
conn = ['connection_id', '', 0, '', '']
self.deluge_web.web_api.hostlist.config['hosts'].append(conn)
- self.assertEqual(self.deluge_web.web_api._get_host(conn[0]), conn[0:4])
+ assert self.deluge_web.web_api._get_host(conn[0]) == conn[0:4]
# Remove valid host
- self.assertTrue(self.deluge_web.web_api.remove_host(conn[0]))
- self.assertFalse(self.deluge_web.web_api._get_host(conn[0]))
+ assert self.deluge_web.web_api.remove_host(conn[0])
+ assert not self.deluge_web.web_api._get_host(conn[0])
# Remove non-existing host
- self.assertFalse(self.deluge_web.web_api.remove_host(conn[0]))
+ assert not self.deluge_web.web_api.remove_host(conn[0])
def test_get_torrent_info(self):
filename = common.get_test_data_file('test.torrent')
ret = self.deluge_web.web_api.get_torrent_info(filename)
- self.assertEqual(ret['name'], 'azcvsupdater_2.6.2.jar')
- self.assertEqual(ret['info_hash'], 'ab570cdd5a17ea1b61e970bb72047de141bce173')
- self.assertTrue('files_tree' in ret)
+ assert ret['name'] == 'azcvsupdater_2.6.2.jar'
+ assert ret['info_hash'] == 'ab570cdd5a17ea1b61e970bb72047de141bce173'
+ assert 'files_tree' in ret
def test_get_torrent_info_with_md5(self):
filename = common.get_test_data_file('md5sum.torrent')
@@ -137,19 +133,19 @@ class WebAPITestCase(WebServerTestBase):
# JSON dumping happens during response creation in normal usage
# JSON serialization may fail if any of the dictionary items are byte arrays rather than strings
ret = json.loads(json.dumps(ret))
- self.assertEqual(ret['name'], 'test')
- self.assertEqual(ret['info_hash'], 'f6408ba9944cf9fe01b547b28f336b3ee6ec32c5')
- self.assertTrue('files_tree' in ret)
+ assert ret['name'] == 'test'
+ assert ret['info_hash'] == 'f6408ba9944cf9fe01b547b28f336b3ee6ec32c5'
+ assert 'files_tree' in ret
def test_get_magnet_info(self):
ret = self.deluge_web.web_api.get_magnet_info(
'magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN'
)
- self.assertEqual(ret['name'], '953bad769164e8482c7785a21d12166f94b9e14d')
- self.assertEqual(ret['info_hash'], '953bad769164e8482c7785a21d12166f94b9e14d')
- self.assertTrue('files_tree' in ret)
+ assert ret['name'] == '953bad769164e8482c7785a21d12166f94b9e14d'
+ assert ret['info_hash'] == '953bad769164e8482c7785a21d12166f94b9e14d'
+ assert 'files_tree' in ret
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_get_torrent_files(self):
yield self.deluge_web.web_api.connect(self.host_id)
filename = common.get_test_data_file('test.torrent')
@@ -160,23 +156,20 @@ class WebAPITestCase(WebServerTestBase):
ret = yield self.deluge_web.web_api.get_torrent_files(
'ab570cdd5a17ea1b61e970bb72047de141bce173'
)
- self.assertEqual(ret['type'], 'dir')
- self.assertEqual(
- ret['contents'],
- {
- 'azcvsupdater_2.6.2.jar': {
- 'priority': 4,
- 'index': 0,
- 'offset': 0,
- 'progress': 0.0,
- 'path': 'azcvsupdater_2.6.2.jar',
- 'type': 'file',
- 'size': 307949,
- }
- },
- )
+ assert ret['type'] == 'dir'
+ assert ret['contents'] == {
+ 'azcvsupdater_2.6.2.jar': {
+ 'priority': 4,
+ 'index': 0,
+ 'offset': 0,
+ 'progress': 0.0,
+ 'path': 'azcvsupdater_2.6.2.jar',
+ 'type': 'file',
+ 'size': 307949,
+ }
+ }
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_download_torrent_from_url(self):
filename = 'ubuntu-9.04-desktop-i386.iso.torrent'
self.deluge_web.top_level.putChild(
@@ -184,9 +177,9 @@ class WebAPITestCase(WebServerTestBase):
)
url = 'http://localhost:%d/%s' % (self.webserver_listen_port, filename)
res = yield self.deluge_web.web_api.download_torrent_from_url(url)
- self.assertTrue(res.endswith(filename))
+ assert res.endswith(filename)
- @defer.inlineCallbacks
+ @pytest_twisted.inlineCallbacks
def test_invalid_json(self):
"""
If json_api._send_response does not return server.NOT_DONE_YET
diff --git a/deluge/tests/test_web_auth.py b/deluge/tests/test_web_auth.py
index a5185737c..39d66c1c1 100644
--- a/deluge/tests/test_web_auth.py
+++ b/deluge/tests/test_web_auth.py
@@ -1,18 +1,15 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-from mock import patch
-from twisted.trial import unittest
+from unittest.mock import patch
from deluge.ui.web import auth
-class MockConfig(object):
+class MockConfig:
def __init__(self, config):
self.config = config
@@ -23,7 +20,7 @@ class MockConfig(object):
self.config[key] = value
-class WebAuthTestCase(unittest.TestCase):
+class TestWebAuth:
@patch('deluge.ui.web.auth.JSONComponent.__init__', return_value=None)
def test_change_password(self, mock_json):
config = MockConfig(
@@ -33,4 +30,4 @@ class WebAuthTestCase(unittest.TestCase):
}
)
webauth = auth.Auth(config)
- self.assertTrue(webauth.change_password('deluge', 'deluge_new'))
+ assert webauth.change_password('deluge', 'deluge_new')
diff --git a/deluge/tests/test_webserver.py b/deluge/tests/test_webserver.py
index d9684bacd..e1588fdf3 100644
--- a/deluge/tests/test_webserver.py
+++ b/deluge/tests/test_webserver.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com>
#
@@ -7,13 +6,12 @@
# See LICENSE for more details.
#
-from __future__ import unicode_literals
-
import json as json_lib
from io import BytesIO
+import pytest_twisted
import twisted.web.client
-from twisted.internet import defer, reactor
+from twisted.internet import reactor
from twisted.web.client import Agent, FileBodyProducer
from twisted.web.http_headers import Headers
@@ -24,8 +22,8 @@ from .common_web import WebServerMockBase, WebServerTestBase
common.disable_new_release_check()
-class WebServerTestCase(WebServerTestBase, WebServerMockBase):
- @defer.inlineCallbacks
+class TestWebServer(WebServerTestBase, WebServerMockBase):
+ @pytest_twisted.inlineCallbacks
def test_get_torrent_info(self):
agent = Agent(reactor)
@@ -37,7 +35,8 @@ class WebServerTestCase(WebServerTestBase, WebServerMockBase):
# UnicodeDecodeError: 'utf8' codec can't decode byte 0xe5 in position 0: invalid continuation byte
filename = get_test_data_file('filehash_field.torrent')
input_file = (
- '{"params": ["%s"], "method": "web.get_torrent_info", "id": 22}' % filename
+ '{"params": ["%s"], "method": "web.get_torrent_info", "id": 22}'
+ % filename.replace('\\', '\\\\')
)
headers = {
b'User-Agent': ['Twisted Web Client Example'],
@@ -51,9 +50,11 @@ class WebServerTestCase(WebServerTestBase, WebServerMockBase):
Headers(headers),
FileBodyProducer(BytesIO(input_file.encode('utf-8'))),
)
-
body = yield twisted.web.client.readBody(d)
- json = json_lib.loads(body.decode())
- self.assertEqual(None, json['error'])
- self.assertEqual('torrent_filehash', json['result']['name'])
+ try:
+ json = json_lib.loads(body.decode())
+ except Exception:
+ print('aoeu')
+ assert json['error'] is None
+ assert 'torrent_filehash' == json['result']['name']
diff --git a/deluge/tests/twisted/plugins/delugereporter.py b/deluge/tests/twisted/plugins/delugereporter.py
deleted file mode 100644
index c2a7b52b5..000000000
--- a/deluge/tests/twisted/plugins/delugereporter.py
+++ /dev/null
@@ -1,50 +0,0 @@
-#! /usr/bin/env python
-# -*- coding: utf-8 -*-
-#
-# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
-# the additional special exception to link portions of this program with the OpenSSL library.
-# See LICENSE for more details.
-#
-
-from __future__ import unicode_literals
-
-import os
-
-from twisted.plugin import IPlugin
-from twisted.trial.itrial import IReporter
-from twisted.trial.reporter import TreeReporter
-from zope.interface import implements
-
-
-class _Reporter(object):
- implements(IPlugin, IReporter)
-
- def __init__(
- self, name, module, description, longOpt, shortOpt, klass # noqa: N803
- ):
- self.name = name
- self.module = module
- self.description = description
- self.longOpt = longOpt
- self.shortOpt = shortOpt
- self.klass = klass
-
-
-deluge = _Reporter(
- 'Deluge reporter that suppresses Stacktrace from TODO tests',
- 'twisted.plugins.delugereporter',
- description='Deluge Reporter',
- longOpt='deluge-reporter',
- shortOpt=None,
- klass='DelugeReporter',
-)
-
-
-class DelugeReporter(TreeReporter):
- def __init__(self, *args, **kwargs):
- os.environ['DELUGE_REPORTER'] = 'true'
- TreeReporter.__init__(self, *args, **kwargs)
-
- def addExpectedFailure(self, *args): # NOQA: N802
- # super(TreeReporter, self).addExpectedFailure(*args)
- self.endLine('[TODO]', self.TODO)