diff options
Diffstat (limited to 'deluge/tests')
41 files changed, 1520 insertions, 1670 deletions
diff --git a/deluge/tests/__init__.py b/deluge/tests/__init__.py index d3bf10def..7b6afa194 100644 --- a/deluge/tests/__init__.py +++ b/deluge/tests/__init__.py @@ -1,7 +1,5 @@ # Increase open file descriptor limit to allow tests to run # without getting error: what(): epoll: Too many open files -from __future__ import print_function, unicode_literals - from deluge.i18n import setup_translation try: diff --git a/deluge/tests/basetest.py b/deluge/tests/basetest.py deleted file mode 100644 index 11ca18e53..000000000 --- a/deluge/tests/basetest.py +++ /dev/null @@ -1,59 +0,0 @@ -# -*- coding: utf-8 -*- -# -# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with -# the additional special exception to link portions of this program with the OpenSSL library. -# See LICENSE for more details. -# - -from __future__ import unicode_literals - -import warnings - -from twisted.internet.defer import maybeDeferred -from twisted.trial import unittest - -import deluge.component as component - - -class BaseTestCase(unittest.TestCase): - """This is the base class that should be used for all test classes - that create classes that inherit from deluge.component.Component. It - ensures that the component registry has been cleaned up when tests - have finished. - - """ - - def setUp(self): # NOQA: N803 - - if len(component._ComponentRegistry.components) != 0: - warnings.warn( - 'The component._ComponentRegistry.components is not empty on test setup.\n' - 'This is probably caused by another test that did not clean up after finishing!: %s' - % component._ComponentRegistry.components - ) - d = maybeDeferred(self.set_up) - - def on_setup_error(error): - warnings.warn('Error caught in test setup!\n%s' % error.getTraceback()) - self.fail() - - return d.addErrback(on_setup_error) - - def tearDown(self): # NOQA: N803 - d = maybeDeferred(self.tear_down) - - def on_teardown_failed(error): - warnings.warn('Error caught in test teardown!\n%s' % error.getTraceback()) - self.fail() - - def on_teardown_complete(result): - component._ComponentRegistry.components.clear() - component._ComponentRegistry.dependents.clear() - - return d.addCallbacks(on_teardown_complete, on_teardown_failed) - - def set_up(self): - pass - - def tear_down(self): - pass diff --git a/deluge/tests/common.py b/deluge/tests/common.py index be33f8c58..b5941568d 100644 --- a/deluge/tests/common.py +++ b/deluge/tests/common.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> # @@ -7,22 +6,21 @@ # See LICENSE for more details. # -from __future__ import print_function, unicode_literals - import os import sys -import tempfile import traceback +import pytest from twisted.internet import defer, protocol, reactor from twisted.internet.defer import Deferred from twisted.internet.error import CannotListenError -from twisted.trial import unittest import deluge.configmanager import deluge.core.preferencesmanager import deluge.log +from deluge.common import get_localhost_auth from deluge.error import DelugeError +from deluge.ui.client import Client # This sets log level to critical, so use log.critical() to debug while running unit tests deluge.log.setup_logger('none') @@ -32,12 +30,6 @@ def disable_new_release_check(): deluge.core.preferencesmanager.DEFAULT_PREFS['new_release_check'] = False -def set_tmp_config_dir(): - config_directory = tempfile.mkdtemp() - deluge.configmanager.set_config_dir(config_directory) - return config_directory - - def setup_test_logger(level='info', prefix='deluge'): deluge.log.setup_logger(level, filename='%s.log' % prefix, twisted_observer=False) @@ -55,7 +47,7 @@ def todo_test(caller): filename = os.path.basename(traceback.extract_stack(None, 2)[0][0]) funcname = traceback.extract_stack(None, 2)[0][2] - raise unittest.SkipTest('TODO: %s:%s' % (filename, funcname)) + pytest.skip(f'TODO: {filename}:{funcname}') def add_watchdog(deferred, timeout=0.05, message=None): @@ -73,7 +65,7 @@ def add_watchdog(deferred, timeout=0.05, message=None): return watchdog -class ReactorOverride(object): +class ReactorOverride: """Class used to patch reactor while running unit tests to avoid starting and stopping the twisted reactor """ @@ -97,12 +89,19 @@ class ReactorOverride(object): class ProcessOutputHandler(protocol.ProcessProtocol): def __init__( - self, script, callbacks, logfile=None, print_stdout=True, print_stderr=True + self, + script, + shutdown_func, + callbacks, + logfile=None, + print_stdout=True, + print_stderr=True, ): """Executes a script and handle the process' output to stdout and stderr. Args: script (str): The script to execute. + shutdown_func (func): A function which will gracefully stop the called script. callbacks (list): Callbacks to trigger if the expected output if found. logfile (str, optional): Filename to wrote the process' output. print_stderr (bool): Print the process' stderr output to stdout. @@ -111,6 +110,7 @@ class ProcessOutputHandler(protocol.ProcessProtocol): """ self.callbacks = callbacks self.script = script + self.shutdown_func = shutdown_func self.log_output = '' self.stderr_out = '' self.logfile = logfile @@ -130,6 +130,7 @@ class ProcessOutputHandler(protocol.ProcessProtocol): with open(self.logfile, 'w') as f: f.write(self.log_output) + @defer.inlineCallbacks def kill(self): """Kill the running process. @@ -142,11 +143,17 @@ class ProcessOutputHandler(protocol.ProcessProtocol): self.killed = True self._kill_watchdogs() self.quit_d = Deferred() - self.transport.signalProcess('INT') - return self.quit_d + shutdown = self.shutdown_func() + shutdown.addTimeout(5, reactor) + try: + yield shutdown + except Exception: + self.transport.signalProcess('TERM') + result = yield self.quit_d + return result def _kill_watchdogs(self): - """"Cancel all watchdogs""" + """Cancel all watchdogs""" for w in self.watchdogs: if not w.called and not w.cancelled: w.cancel() @@ -205,7 +212,7 @@ class ProcessOutputHandler(protocol.ProcessProtocol): def start_core( - listen_port=58846, + listen_port=58900, logfile=None, timeout=10, timeout_msg=None, @@ -213,13 +220,14 @@ def start_core( print_stdout=True, print_stderr=True, extra_callbacks=None, + config_directory='', ): """Start the deluge core as a daemon. Args: listen_port (int, optional): The port the daemon listens for client connections. logfile (str, optional): Logfile name to write the output from the process. - timeout (int): If none of the callbacks have been triggered before the imeout, the process is killed. + timeout (int): If none of the callbacks have been triggered before the timeout, the process is killed. timeout_msg (str): The message to print when the timeout expires. custom_script (str): Extra python code to insert into the daemon process script. print_stderr (bool): If the output from the process' stderr should be printed to stdout. @@ -234,7 +242,6 @@ def start_core( or upon timeout expiry. The ProcessOutputHandler is the handler for the deluged process. """ - config_directory = set_tmp_config_dir() daemon_script = """ import sys import deluge.core.daemon_entry @@ -254,7 +261,7 @@ except Exception: import traceback sys.stderr.write('Exception raised:\\n %%s' %% traceback.format_exc()) """ % { - 'dir': config_directory, + 'dir': config_directory.as_posix(), 'port': listen_port, 'script': custom_script, } @@ -289,20 +296,30 @@ except Exception: if extra_callbacks: callbacks.extend(extra_callbacks) + @defer.inlineCallbacks + def shutdown_daemon(): + username, password = get_localhost_auth() + client = Client() + yield client.connect( + 'localhost', listen_port, username=username, password=password + ) + yield client.daemon.shutdown() + process_protocol = start_process( - daemon_script, callbacks, logfile, print_stdout, print_stderr + daemon_script, shutdown_daemon, callbacks, logfile, print_stdout, print_stderr ) return default_core_cb['deferred'], process_protocol def start_process( - script, callbacks, logfile=None, print_stdout=True, print_stderr=True + script, shutdown_func, callbacks, logfile=None, print_stdout=True, print_stderr=True ): """ Starts an external python process which executes the given script. Args: script (str): The content of the script to execute. + shutdown_func (func): A function which will gracefully end the called script. callbacks (list): list of dictionaries specifying callbacks. logfile (str, optional): Logfile name to write the output from the process. print_stderr (bool): If the output from the process' stderr should be printed to stdout. @@ -324,7 +341,12 @@ def start_process( """ cwd = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) process_protocol = ProcessOutputHandler( - script.encode('utf8'), callbacks, logfile, print_stdout, print_stderr + script.encode('utf8'), + shutdown_func, + callbacks, + logfile, + print_stdout, + print_stderr, ) # Add timeouts to deferreds diff --git a/deluge/tests/common_web.py b/deluge/tests/common_web.py index 706eb8d72..8db49d243 100644 --- a/deluge/tests/common_web.py +++ b/deluge/tests/common_web.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> # @@ -7,21 +6,20 @@ # See LICENSE for more details. # -from __future__ import unicode_literals +import pytest import deluge.common -import deluge.component as component import deluge.ui.web.auth import deluge.ui.web.server from deluge import configmanager +from deluge.conftest import BaseTestCase from deluge.ui.web.server import DelugeWeb -from .basetest import BaseTestCase from .common import ReactorOverride -from .daemon_base import DaemonBase -class WebServerTestBase(BaseTestCase, DaemonBase): +@pytest.mark.usefixtures('daemon', 'component') +class WebServerTestBase(BaseTestCase): """ Base class for tests that need a running webapi @@ -30,10 +28,7 @@ class WebServerTestBase(BaseTestCase, DaemonBase): def set_up(self): self.host_id = None deluge.ui.web.server.reactor = ReactorOverride() - d = self.common_set_up() - d.addCallback(self.start_core) - d.addCallback(self.start_webapi) - return d + return self.start_webapi(None) def start_webapi(self, arg): self.webserver_listen_port = 8999 @@ -50,13 +45,8 @@ class WebServerTestBase(BaseTestCase, DaemonBase): self.host_id = host[0] self.deluge_web.start() - def tear_down(self): - d = component.shutdown() - d.addCallback(self.terminate_core) - return d - -class WebServerMockBase(object): +class WebServerMockBase: """ Class with utility functions for mocking with tests using the webserver diff --git a/deluge/tests/daemon_base.py b/deluge/tests/daemon_base.py index 7352e0d32..3ae86c4ca 100644 --- a/deluge/tests/daemon_base.py +++ b/deluge/tests/daemon_base.py @@ -1,12 +1,9 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import print_function, unicode_literals - import os.path import pytest @@ -14,24 +11,13 @@ from twisted.internet import defer from twisted.internet.error import CannotListenError import deluge.component as component -from deluge.common import windows_check from . import common -@pytest.mark.usefixtures('get_pytest_basetemp') -class DaemonBase(object): - basetemp = None - - if windows_check(): - skip = 'windows cant start_core not enough arguments for format string' - - @pytest.fixture - def get_pytest_basetemp(self, request): - self.basetemp = request.config.option.basetemp - +@pytest.mark.usefixtures('config_dir') +class DaemonBase: def common_set_up(self): - common.set_tmp_config_dir() self.listen_port = 58900 self.core = None return component.start() @@ -78,6 +64,7 @@ class DaemonBase(object): print_stdout=print_stdout, print_stderr=print_stderr, extra_callbacks=extra_callbacks, + config_directory=self.config_dir, ) yield d except CannotListenError as ex: diff --git a/deluge/tests/data/seo.ico b/deluge/tests/data/seo.ico Binary files differdeleted file mode 100644 index 841e52871..000000000 --- a/deluge/tests/data/seo.ico +++ /dev/null diff --git a/deluge/tests/data/seo.svg b/deluge/tests/data/seo.svg new file mode 100644 index 000000000..fc96f74d4 --- /dev/null +++ b/deluge/tests/data/seo.svg @@ -0,0 +1 @@ +<?xml version="1.0" encoding="UTF-8"?> <svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 95.63 110.42" width="10cm" height="10cm"><defs><style>.cls-1{fill:#ec5728;}.cls-2{fill:#fff;}</style></defs><title>seocom-target</title><g id="Layer_2" data-name="Layer 2"><g id="Layer_1-2" data-name="Layer 1"><polygon class="cls-1" points="95.63 82.81 47.81 110.42 0 82.81 0 27.61 47.81 0 95.63 27.61 95.63 82.81"></polygon><path class="cls-2" d="M47.81,18.64A36.57,36.57,0,1,0,84.38,55.21,36.57,36.57,0,0,0,47.81,18.64Zm0,63.92A27.35,27.35,0,1,1,75.16,55.21,27.35,27.35,0,0,1,47.81,82.56Z"></path><path class="cls-2" d="M47.81,39.46A15.75,15.75,0,1,0,63.56,55.21,15.75,15.75,0,0,0,47.81,39.46Zm0,24.25a8.5,8.5,0,1,1,8.5-8.5A8.51,8.51,0,0,1,47.81,63.71Z"></path></g></g></svg>
\ No newline at end of file diff --git a/deluge/tests/test_alertmanager.py b/deluge/tests/test_alertmanager.py index f197882cd..5e63864e8 100644 --- a/deluge/tests/test_alertmanager.py +++ b/deluge/tests/test_alertmanager.py @@ -1,19 +1,15 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - import deluge.component as component +from deluge.conftest import BaseTestCase from deluge.core.core import Core -from .basetest import BaseTestCase - -class AlertManagerTestCase(BaseTestCase): +class TestAlertManager(BaseTestCase): def set_up(self): self.core = Core() self.core.config.config['lsd'] = False @@ -28,7 +24,7 @@ class AlertManagerTestCase(BaseTestCase): return self.am.register_handler('dummy_alert', handler) - self.assertEqual(self.am.handlers['dummy_alert'], [handler]) + assert self.am.handlers['dummy_alert'] == [handler] def test_deregister_handler(self): def handler(alert): @@ -36,4 +32,4 @@ class AlertManagerTestCase(BaseTestCase): self.am.register_handler('dummy_alert', handler) self.am.deregister_handler(handler) - self.assertEqual(self.am.handlers['dummy_alert'], []) + assert self.am.handlers['dummy_alert'] == [] diff --git a/deluge/tests/test_authmanager.py b/deluge/tests/test_authmanager.py index 91e122f73..aa86fdbac 100644 --- a/deluge/tests/test_authmanager.py +++ b/deluge/tests/test_authmanager.py @@ -1,20 +1,16 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - import deluge.component as component from deluge.common import get_localhost_auth +from deluge.conftest import BaseTestCase from deluge.core.authmanager import AUTH_LEVEL_ADMIN, AuthManager -from .basetest import BaseTestCase - -class AuthManagerTestCase(BaseTestCase): +class TestAuthManager(BaseTestCase): def set_up(self): self.auth = AuthManager() self.auth.start() @@ -24,4 +20,4 @@ class AuthManagerTestCase(BaseTestCase): return component.shutdown() def test_authorize(self): - self.assertEqual(self.auth.authorize(*get_localhost_auth()), AUTH_LEVEL_ADMIN) + assert self.auth.authorize(*get_localhost_auth()) == AUTH_LEVEL_ADMIN diff --git a/deluge/tests/test_bencode.py b/deluge/tests/test_bencode.py index b49c21f83..a4a76818f 100644 --- a/deluge/tests/test_bencode.py +++ b/deluge/tests/test_bencode.py @@ -1,19 +1,17 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals -from twisted.trial import unittest +import pytest from deluge import bencode from . import common -class BencodeTestCase(unittest.TestCase): +class TestBencode: def test_bencode_unicode_metainfo(self): filename = common.get_test_data_file('test.torrent') with open(filename, 'rb') as _file: @@ -21,14 +19,14 @@ class BencodeTestCase(unittest.TestCase): bencode.bencode({b'info': metainfo}) def test_bencode_unicode_value(self): - self.assertEqual(bencode.bencode(b'abc'), b'3:abc') - self.assertEqual(bencode.bencode('abc'), b'3:abc') + assert bencode.bencode(b'abc') == b'3:abc' + assert bencode.bencode('abc') == b'3:abc' def test_bdecode(self): - self.assertEqual(bencode.bdecode(b'3:dEf'), b'dEf') - with self.assertRaises(bencode.BTFailure): + assert bencode.bdecode(b'3:dEf') == b'dEf' + with pytest.raises(bencode.BTFailure): bencode.bdecode('dEf') - with self.assertRaises(bencode.BTFailure): + with pytest.raises(bencode.BTFailure): bencode.bdecode(b'dEf') - with self.assertRaises(bencode.BTFailure): + with pytest.raises(bencode.BTFailure): bencode.bdecode({'dEf': 123}) diff --git a/deluge/tests/test_client.py b/deluge/tests/test_client.py index ae1e95a71..5a6727907 100644 --- a/deluge/tests/test_client.py +++ b/deluge/tests/test_client.py @@ -1,23 +1,17 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # - -from __future__ import unicode_literals - +import pytest +import pytest_twisted from twisted.internet import defer -import deluge.component as component from deluge import error from deluge.common import AUTH_LEVEL_NORMAL, get_localhost_auth from deluge.core.authmanager import AUTH_LEVEL_ADMIN from deluge.ui.client import Client, DaemonSSLProxy, client -from .basetest import BaseTestCase -from .daemon_base import DaemonBase - class NoVersionSendingDaemonSSLProxy(DaemonSSLProxy): def authenticate(self, username, password): @@ -78,24 +72,13 @@ class NoVersionSendingClient(Client): self.disconnect_callback() -class ClientTestCase(BaseTestCase, DaemonBase): - def set_up(self): - d = self.common_set_up() - d.addCallback(self.start_core) - d.addErrback(self.terminate_core) - return d - - def tear_down(self): - d = component.shutdown() - d.addCallback(self.terminate_core) - return d - +@pytest.mark.usefixtures('daemon', 'client') +class TestClient: def test_connect_no_credentials(self): d = client.connect('localhost', self.listen_port, username='', password='') def on_connect(result): - self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN) - self.addCleanup(client.disconnect) + assert client.get_auth_level() == AUTH_LEVEL_ADMIN return result d.addCallbacks(on_connect, self.fail) @@ -108,8 +91,7 @@ class ClientTestCase(BaseTestCase, DaemonBase): ) def on_connect(result): - self.assertEqual(client.get_auth_level(), AUTH_LEVEL_ADMIN) - self.addCleanup(client.disconnect) + assert client.get_auth_level() == AUTH_LEVEL_ADMIN return result d.addCallbacks(on_connect, self.fail) @@ -122,21 +104,18 @@ class ClientTestCase(BaseTestCase, DaemonBase): ) def on_failure(failure): - self.assertEqual(failure.trap(error.BadLoginError), error.BadLoginError) - self.assertEqual(failure.value.message, 'Password does not match') - self.addCleanup(client.disconnect) + assert failure.trap(error.BadLoginError) == error.BadLoginError + assert failure.value.message == 'Password does not match' d.addCallbacks(self.fail, on_failure) return d def test_connect_invalid_user(self): - username, password = get_localhost_auth() d = client.connect('localhost', self.listen_port, username='invalid-user') def on_failure(failure): - self.assertEqual(failure.trap(error.BadLoginError), error.BadLoginError) - self.assertEqual(failure.value.message, 'Username does not exist') - self.addCleanup(client.disconnect) + assert failure.trap(error.BadLoginError) == error.BadLoginError + assert failure.value.message == 'Username does not exist' d.addCallbacks(self.fail, on_failure) return d @@ -146,16 +125,16 @@ class ClientTestCase(BaseTestCase, DaemonBase): d = client.connect('localhost', self.listen_port, username=username) def on_failure(failure): - self.assertEqual( - failure.trap(error.AuthenticationRequired), error.AuthenticationRequired + assert ( + failure.trap(error.AuthenticationRequired) + == error.AuthenticationRequired ) - self.assertEqual(failure.value.username, username) - self.addCleanup(client.disconnect) + assert failure.value.username == username d.addCallbacks(self.fail, on_failure) return d - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_connect_with_password(self): username, password = get_localhost_auth() yield client.connect( @@ -166,19 +145,15 @@ class ClientTestCase(BaseTestCase, DaemonBase): ret = yield client.connect( 'localhost', self.listen_port, username='testuser', password='testpw' ) - self.assertEqual(ret, AUTH_LEVEL_NORMAL) - yield + assert ret == AUTH_LEVEL_NORMAL - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_invalid_rpc_method_call(self): yield client.connect('localhost', self.listen_port, username='', password='') d = client.core.invalid_method() def on_failure(failure): - self.assertEqual( - failure.trap(error.WrappedException), error.WrappedException - ) - self.addCleanup(client.disconnect) + assert failure.trap(error.WrappedException) == error.WrappedException d.addCallbacks(self.fail, on_failure) yield d @@ -191,10 +166,7 @@ class ClientTestCase(BaseTestCase, DaemonBase): ) def on_failure(failure): - self.assertEqual( - failure.trap(error.IncompatibleClient), error.IncompatibleClient - ) - self.addCleanup(no_version_sending_client.disconnect) + assert failure.trap(error.IncompatibleClient) == error.IncompatibleClient d.addCallbacks(self.fail, on_failure) return d diff --git a/deluge/tests/test_common.py b/deluge/tests/test_common.py index 4f6aa2fd4..780d368ef 100644 --- a/deluge/tests/test_common.py +++ b/deluge/tests/test_common.py @@ -1,17 +1,15 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - import os import sys import tarfile +from urllib.parse import quote_plus -from twisted.trial import unittest +import pytest from deluge.common import ( VersionSplit, @@ -22,8 +20,11 @@ from deluge.common import ( fsize, fspeed, ftime, + get_magnet_info, get_path_size, is_infohash, + is_interface, + is_interface_name, is_ip, is_ipv4, is_ipv6, @@ -31,113 +32,123 @@ from deluge.common import ( is_url, windows_check, ) -from deluge.i18n import setup_translation - -from .common import get_test_data_file, set_tmp_config_dir - -class CommonTestCase(unittest.TestCase): - def setUp(self): # NOQA - self.config_dir = set_tmp_config_dir() - setup_translation() +from .common import get_test_data_file - def tearDown(self): # NOQA - pass +class TestCommon: def test_fsize(self): - self.assertEqual(fsize(0), '0 B') - self.assertEqual(fsize(100), '100 B') - self.assertEqual(fsize(1023), '1023 B') - self.assertEqual(fsize(1024), '1.0 KiB') - self.assertEqual(fsize(1048575), '1024.0 KiB') - self.assertEqual(fsize(1048576), '1.0 MiB') - self.assertEqual(fsize(1073741823), '1024.0 MiB') - self.assertEqual(fsize(1073741824), '1.0 GiB') - self.assertEqual(fsize(112245), '109.6 KiB') - self.assertEqual(fsize(110723441824), '103.1 GiB') - self.assertEqual(fsize(1099511627775), '1024.0 GiB') - self.assertEqual(fsize(1099511627777), '1.0 TiB') - self.assertEqual(fsize(766148267453245), '696.8 TiB') + assert fsize(0) == '0 B' + assert fsize(100) == '100 B' + assert fsize(1023) == '1023 B' + assert fsize(1024) == '1.0 KiB' + assert fsize(1048575) == '1024.0 KiB' + assert fsize(1048576) == '1.0 MiB' + assert fsize(1073741823) == '1024.0 MiB' + assert fsize(1073741824) == '1.0 GiB' + assert fsize(112245) == '109.6 KiB' + assert fsize(110723441824) == '103.1 GiB' + assert fsize(1099511627775) == '1024.0 GiB' + assert fsize(1099511627777) == '1.0 TiB' + assert fsize(766148267453245) == '696.8 TiB' def test_fpcnt(self): - self.assertTrue(fpcnt(0.9311) == '93.11%') + assert fpcnt(0.9311) == '93.11%' def test_fspeed(self): - self.assertTrue(fspeed(43134) == '42.1 KiB/s') + assert fspeed(43134) == '42.1 KiB/s' def test_fpeer(self): - self.assertTrue(fpeer(10, 20) == '10 (20)') - self.assertTrue(fpeer(10, -1) == '10') + assert fpeer(10, 20) == '10 (20)' + assert fpeer(10, -1) == '10' def test_ftime(self): - self.assertEqual(ftime(0), '') - self.assertEqual(ftime(5), '5s') - self.assertEqual(ftime(100), '1m 40s') - self.assertEqual(ftime(3789), '1h 3m') - self.assertEqual(ftime(23011), '6h 23m') - self.assertEqual(ftime(391187), '4d 12h') - self.assertEqual(ftime(604800), '1w 0d') - self.assertEqual(ftime(13893086), '22w 6d') - self.assertEqual(ftime(59740269), '1y 46w') - self.assertEqual(ftime(61.25), '1m 1s') - self.assertEqual(ftime(119.9), '1m 59s') + assert ftime(0) == '' + assert ftime(5) == '5s' + assert ftime(100) == '1m 40s' + assert ftime(3789) == '1h 3m' + assert ftime(23011) == '6h 23m' + assert ftime(391187) == '4d 12h' + assert ftime(604800) == '1w 0d' + assert ftime(13893086) == '22w 6d' + assert ftime(59740269) == '1y 46w' + assert ftime(61.25) == '1m 1s' + assert ftime(119.9) == '1m 59s' def test_fdate(self): - self.assertTrue(fdate(-1) == '') + assert fdate(-1) == '' def test_is_url(self): - self.assertTrue(is_url('http://deluge-torrent.org')) - self.assertFalse(is_url('file://test.torrent')) + assert is_url('http://deluge-torrent.org') + assert not is_url('file://test.torrent') def test_is_magnet(self): - self.assertTrue( - is_magnet('magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN') - ) - self.assertFalse(is_magnet(None)) + assert is_magnet('magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN') + assert not is_magnet(None) def test_is_infohash(self): - self.assertTrue(is_infohash('2dc5d0e71a66fe69649a640d39cb00a259704973')) + assert is_infohash('2dc5d0e71a66fe69649a640d39cb00a259704973') def test_get_path_size(self): if windows_check() and sys.version_info < (3, 8): # https://bugs.python.org/issue1311 - raise unittest.SkipTest('os.devnull returns False on Windows') - self.assertTrue(get_path_size(os.devnull) == 0) - self.assertTrue(get_path_size('non-existant.file') == -1) + pytest.skip('os.devnull returns False on Windows') + assert get_path_size(os.devnull) == 0 + assert get_path_size('non-existant.file') == -1 def test_is_ip(self): - self.assertTrue(is_ip('192.0.2.0')) - self.assertFalse(is_ip('192..0.0')) - self.assertTrue(is_ip('2001:db8::')) - self.assertFalse(is_ip('2001:db8:')) + assert is_ip('192.0.2.0') + assert not is_ip('192..0.0') + assert is_ip('2001:db8::') + assert not is_ip('2001:db8:') def test_is_ipv4(self): - self.assertTrue(is_ipv4('192.0.2.0')) - self.assertFalse(is_ipv4('192..0.0')) + assert is_ipv4('192.0.2.0') + assert not is_ipv4('192..0.0') def test_is_ipv6(self): - self.assertTrue(is_ipv6('2001:db8::')) - self.assertFalse(is_ipv6('2001:db8:')) + assert is_ipv6('2001:db8::') + assert not is_ipv6('2001:db8:') + + def test_is_interface_name(self): + if windows_check(): + assert not is_interface_name('2001:db8:') + assert not is_interface_name('{THIS0000-IS00-ONLY-FOR0-TESTING00000}') + else: + assert is_interface_name('lo') + assert not is_interface_name('127.0.0.1') + assert not is_interface_name('eth01101') + + def test_is_interface(self): + if windows_check(): + assert is_interface('127.0.0.1') + assert not is_interface('127') + assert not is_interface('{THIS0000-IS00-ONLY-FOR0-TESTING00000}') + else: + assert is_interface('lo') + assert is_interface('127.0.0.1') + assert not is_interface('127.') + assert not is_interface('eth01101') def test_version_split(self): - self.assertTrue(VersionSplit('1.2.2') == VersionSplit('1.2.2')) - self.assertTrue(VersionSplit('1.2.1') < VersionSplit('1.2.2')) - self.assertTrue(VersionSplit('1.1.9') < VersionSplit('1.2.2')) - self.assertTrue(VersionSplit('1.2.2') > VersionSplit('1.2.1')) - self.assertTrue(VersionSplit('1.2.2') > VersionSplit('1.2.2-dev0')) - self.assertTrue(VersionSplit('1.2.2-dev') < VersionSplit('1.3.0-rc2')) - self.assertTrue(VersionSplit('1.2.2') > VersionSplit('1.2.2-rc2')) - self.assertTrue(VersionSplit('1.2.2-rc2-dev') < VersionSplit('1.2.2-rc2')) - self.assertTrue(VersionSplit('1.2.2-rc3') > VersionSplit('1.2.2-rc2')) - self.assertTrue(VersionSplit('0.14.9') == VersionSplit('0.14.9')) - self.assertTrue(VersionSplit('0.14.9') > VersionSplit('0.14.5')) - self.assertTrue(VersionSplit('0.14.10') >= VersionSplit('0.14.9')) - self.assertTrue(VersionSplit('1.4.0') > VersionSplit('1.3.900.dev123')) - self.assertTrue(VersionSplit('1.3.2rc2.dev1') < VersionSplit('1.3.2-rc2')) - self.assertTrue(VersionSplit('1.3.900.dev888') > VersionSplit('1.3.900.dev123')) - self.assertTrue(VersionSplit('1.4.0') > VersionSplit('1.4.0.dev123')) - self.assertTrue(VersionSplit('1.4.0.dev1') < VersionSplit('1.4.0')) - self.assertTrue(VersionSplit('1.4.0a1') < VersionSplit('1.4.0')) + assert VersionSplit('1.2.2') == VersionSplit('1.2.2') + assert VersionSplit('1.2.1') < VersionSplit('1.2.2') + assert VersionSplit('1.1.9') < VersionSplit('1.2.2') + assert VersionSplit('1.2.2') > VersionSplit('1.2.1') + assert VersionSplit('1.2.2') > VersionSplit('1.2.2-dev0') + assert VersionSplit('1.2.2-dev') < VersionSplit('1.3.0-rc2') + assert VersionSplit('1.2.2') > VersionSplit('1.2.2-rc2') + assert VersionSplit('1.2.2-rc2-dev') < VersionSplit('1.2.2-rc2') + assert VersionSplit('1.2.2-rc3') > VersionSplit('1.2.2-rc2') + assert VersionSplit('0.14.9') == VersionSplit('0.14.9') + assert VersionSplit('0.14.9') > VersionSplit('0.14.5') + assert VersionSplit('0.14.10') >= VersionSplit('0.14.9') + assert VersionSplit('1.4.0') > VersionSplit('1.3.900.dev123') + assert VersionSplit('1.3.2rc2.dev1') < VersionSplit('1.3.2-rc2') + assert VersionSplit('1.3.900.dev888') > VersionSplit('1.3.900.dev123') + assert VersionSplit('1.4.0') > VersionSplit('1.4.0.dev123') + assert VersionSplit('1.4.0.dev1') < VersionSplit('1.4.0') + assert VersionSplit('1.4.0a1') < VersionSplit('1.4.0') def test_parse_human_size(self): from deluge.common import parse_human_size @@ -150,17 +161,15 @@ class CommonTestCase(unittest.TestCase): ('1 MiB', 2 ** (10 * 2)), ('1 GiB', 2 ** (10 * 3)), ('1 GiB', 2 ** (10 * 3)), - ('1M', 10 ** 6), - ('1MB', 10 ** 6), - ('1 GB', 10 ** 9), - ('1 TB', 10 ** 12), + ('1M', 10**6), + ('1MB', 10**6), + ('1 GB', 10**9), + ('1 TB', 10**12), ] for human_size, byte_size in sizes: parsed = parse_human_size(human_size) - self.assertEqual( - parsed, byte_size, 'Mismatch when converting: %s' % human_size - ) + assert parsed == byte_size, 'Mismatch when converting: %s' % human_size def test_archive_files(self): arc_filelist = [ @@ -171,10 +180,10 @@ class CommonTestCase(unittest.TestCase): with tarfile.open(arc_filepath, 'r') as tar: for tar_info in tar: - self.assertTrue(tar_info.isfile()) - self.assertTrue( - tar_info.name in [os.path.basename(arcf) for arcf in arc_filelist] - ) + assert tar_info.isfile() + assert tar_info.name in [ + os.path.basename(arcf) for arcf in arc_filelist + ] def test_archive_files_missing(self): """Archive exists even with file not found.""" @@ -185,8 +194,8 @@ class CommonTestCase(unittest.TestCase): filelist.remove('missing.file') with tarfile.open(arc_filepath, 'r') as tar: - self.assertEqual(tar.getnames(), filelist) - self.assertTrue(all(tarinfo.isfile() for tarinfo in tar)) + assert tar.getnames() == filelist + assert all(tarinfo.isfile() for tarinfo in tar) def test_archive_files_message(self): filelist = ['test.torrent', 'deluge.png'] @@ -196,9 +205,22 @@ class CommonTestCase(unittest.TestCase): result_files = filelist + ['archive_message.txt'] with tarfile.open(arc_filepath, 'r') as tar: - self.assertEqual(tar.getnames(), result_files) + assert tar.getnames() == result_files for tar_info in tar: - self.assertTrue(tar_info.isfile()) + assert tar_info.isfile() if tar_info.name == 'archive_message.txt': result = tar.extractfile(tar_info).read().decode() - self.assertEqual(result, 'test') + assert result == 'test' + + def test_get_magnet_info_tiers(self): + tracker1 = 'udp://tracker1.example.com' + tracker2 = 'udp://tracker2.example.com' + magnet = ( + 'magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN' + f'&tr.1={quote_plus(tracker1)}' + f'&tr.2={quote_plus(tracker2)}' + ) + result = get_magnet_info(magnet) + assert result['info_hash'] == '953bad769164e8482c7785a21d12166f94b9e14d' + assert result['trackers'][tracker1] == 1 + assert result['trackers'][tracker2] == 2 diff --git a/deluge/tests/test_component.py b/deluge/tests/test_component.py index 26f24ad00..0345e24d3 100644 --- a/deluge/tests/test_component.py +++ b/deluge/tests/test_component.py @@ -1,19 +1,15 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - +import pytest +import pytest_twisted from twisted.internet import defer, threads -from twisted.trial.unittest import SkipTest import deluge.component as component -from .basetest import BaseTestCase - class ComponentTester(component.Component): def __init__(self, name, depend=None): @@ -70,14 +66,15 @@ class ComponentTesterShutdown(component.Component): self.stop_count += 1 -class ComponentTestClass(BaseTestCase): +@pytest.mark.usefixtures('component') +class TestComponent: def tear_down(self): return component.shutdown() def test_start_component(self): def on_start(result, c): - self.assertEqual(c._component_state, 'Started') - self.assertEqual(c.start_count, 1) + assert c._component_state == 'Started' + assert c.start_count == 1 c = ComponentTester('test_start_c1') d = component.start(['test_start_c1']) @@ -86,16 +83,16 @@ class ComponentTestClass(BaseTestCase): def test_start_stop_depends(self): def on_stop(result, c1, c2): - self.assertEqual(c1._component_state, 'Stopped') - self.assertEqual(c2._component_state, 'Stopped') - self.assertEqual(c1.stop_count, 1) - self.assertEqual(c2.stop_count, 1) + assert c1._component_state == 'Stopped' + assert c2._component_state == 'Stopped' + assert c1.stop_count == 1 + assert c2.stop_count == 1 def on_start(result, c1, c2): - self.assertEqual(c1._component_state, 'Started') - self.assertEqual(c2._component_state, 'Started') - self.assertEqual(c1.start_count, 1) - self.assertEqual(c2.start_count, 1) + assert c1._component_state == 'Started' + assert c2._component_state == 'Started' + assert c1.start_count == 1 + assert c2.start_count == 1 return component.stop(['test_start_depends_c1']).addCallback( on_stop, c1, c2 ) @@ -126,8 +123,8 @@ class ComponentTestClass(BaseTestCase): def test_start_all(self): def on_start(*args): for c in args[1:]: - self.assertEqual(c._component_state, 'Started') - self.assertEqual(c.start_count, 1) + assert c._component_state == 'Started' + assert c.start_count == 1 ret = self.start_with_depends() ret[0].addCallback(on_start, *ret[1:]) @@ -136,20 +133,19 @@ class ComponentTestClass(BaseTestCase): def test_register_exception(self): ComponentTester('test_register_exception_c1') - self.assertRaises( - component.ComponentAlreadyRegistered, - ComponentTester, - 'test_register_exception_c1', - ) + with pytest.raises(component.ComponentAlreadyRegistered): + ComponentTester( + 'test_register_exception_c1', + ) def test_stop_component(self): def on_stop(result, c): - self.assertEqual(c._component_state, 'Stopped') - self.assertFalse(c._component_timer.running) - self.assertEqual(c.stop_count, 1) + assert c._component_state == 'Stopped' + assert not c._component_timer.running + assert c.stop_count == 1 def on_start(result, c): - self.assertEqual(c._component_state, 'Started') + assert c._component_state == 'Started' return component.stop(['test_stop_component_c1']).addCallback(on_stop, c) c = ComponentTesterUpdate('test_stop_component_c1') @@ -160,12 +156,12 @@ class ComponentTestClass(BaseTestCase): def test_stop_all(self): def on_stop(result, *args): for c in args: - self.assertEqual(c._component_state, 'Stopped') - self.assertEqual(c.stop_count, 1) + assert c._component_state == 'Stopped' + assert c.stop_count == 1 def on_start(result, *args): for c in args: - self.assertEqual(c._component_state, 'Started') + assert c._component_state == 'Started' return component.stop().addCallback(on_stop, *args) ret = self.start_with_depends() @@ -175,9 +171,9 @@ class ComponentTestClass(BaseTestCase): def test_update(self): def on_start(result, c1, counter): - self.assertTrue(c1._component_timer) - self.assertTrue(c1._component_timer.running) - self.assertNotEqual(c1.counter, counter) + assert c1._component_timer + assert c1._component_timer.running + assert c1.counter != counter return component.stop() c1 = ComponentTesterUpdate('test_update_c1') @@ -189,13 +185,13 @@ class ComponentTestClass(BaseTestCase): def test_pause(self): def on_pause(result, c1, counter): - self.assertEqual(c1._component_state, 'Paused') - self.assertNotEqual(c1.counter, counter) - self.assertFalse(c1._component_timer.running) + assert c1._component_state == 'Paused' + assert c1.counter != counter + assert not c1._component_timer.running def on_start(result, c1, counter): - self.assertTrue(c1._component_timer) - self.assertNotEqual(c1.counter, counter) + assert c1._component_timer + assert c1.counter != counter d = component.pause(['test_pause_c1']) d.addCallback(on_pause, c1, counter) return d @@ -207,23 +203,16 @@ class ComponentTestClass(BaseTestCase): d.addCallback(on_start, c1, cnt) return d - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_component_start_error(self): ComponentTesterUpdate('test_pause_c1') yield component.start(['test_pause_c1']) yield component.pause(['test_pause_c1']) test_comp = component.get('test_pause_c1') - try: - result = self.failureResultOf(test_comp._component_start()) - except AttributeError: - raise SkipTest( - 'This test requires trial failureResultOf() in Twisted version >= 13' - ) - self.assertEqual( - result.check(component.ComponentException), component.ComponentException - ) + with pytest.raises(component.ComponentException, match='Current state: Paused'): + yield test_comp._component_start() - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_start_paused_error(self): ComponentTesterUpdate('test_pause_c1') yield component.start(['test_pause_c1']) @@ -232,29 +221,26 @@ class ComponentTestClass(BaseTestCase): # Deferreds that fail in component have to error handler which results in # twisted doing a log.err call which causes the test to fail. # Prevent failure by ignoring the exception - self._observer._ignoreErrors(component.ComponentException) + # self._observer._ignoreErrors(component.ComponentException) result = yield component.start() - self.assertEqual( - [(result[0][0], result[0][1].value)], - [ - ( - defer.FAILURE, - component.ComponentException( - 'Trying to start component "%s" but it is ' - 'not in a stopped state. Current state: %s' - % ('test_pause_c1', 'Paused'), - '', - ), - ) - ], - ) + assert [(result[0][0], result[0][1].value)] == [ + ( + defer.FAILURE, + component.ComponentException( + 'Trying to start component "%s" but it is ' + 'not in a stopped state. Current state: %s' + % ('test_pause_c1', 'Paused'), + '', + ), + ) + ] def test_shutdown(self): def on_shutdown(result, c1): - self.assertTrue(c1.shutdowned) - self.assertEqual(c1._component_state, 'Stopped') - self.assertEqual(c1.stop_count, 1) + assert c1.shutdowned + assert c1._component_state == 'Stopped' + assert c1.stop_count == 1 def on_start(result, c1): d = component.shutdown() diff --git a/deluge/tests/test_config.py b/deluge/tests/test_config.py index 0f6df3bb5..2840dbf5b 100644 --- a/deluge/tests/test_config.py +++ b/deluge/tests/test_config.py @@ -1,23 +1,21 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - +import json +import logging import os from codecs import getwriter +import pytest +import pytest_twisted from twisted.internet import task -from twisted.trial import unittest -import deluge.config from deluge.common import JSON_FORMAT from deluge.config import Config - -from .common import set_tmp_config_dir +from deluge.ui.hostlist import mask_hosts_password DEFAULTS = { 'string': 'foobar', @@ -26,37 +24,42 @@ DEFAULTS = { 'bool': True, 'unicode': 'foobar', 'password': 'abc123*\\[!]?/<>#{@}=|"+$%(^)~', + 'hosts': [ + ('host1', 'port', '', 'password1234'), + ('host2', 'port', '', 'password5678'), + ], } -class ConfigTestCase(unittest.TestCase): - def setUp(self): # NOQA: N803 - self.config_dir = set_tmp_config_dir() +LOGGER = logging.getLogger(__name__) + +class TestConfig: def test_init(self): config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) - self.assertEqual(DEFAULTS, config.config) + assert DEFAULTS == config.config config = Config('test.conf', config_dir=self.config_dir) - self.assertEqual({}, config.config) + assert {} == config.config def test_set_get_item(self): config = Config('test.conf', config_dir=self.config_dir) config['foo'] = 1 - self.assertEqual(config['foo'], 1) - self.assertRaises(ValueError, config.set_item, 'foo', 'bar') + assert config['foo'] == 1 + with pytest.raises(ValueError): + config.set_item('foo', 'bar') config['foo'] = 2 - self.assertEqual(config.get_item('foo'), 2) + assert config.get_item('foo') == 2 config['foo'] = '3' - self.assertEqual(config.get_item('foo'), 3) + assert config.get_item('foo') == 3 config['unicode'] = 'ВИДЕОФИЛЬМЫ' - self.assertEqual(config['unicode'], 'ВИДЕОФИЛЬМЫ') + assert config['unicode'] == 'ВИДЕОФИЛЬМЫ' config['unicode'] = b'foostring' - self.assertFalse(isinstance(config.get_item('unicode'), bytes)) + assert not isinstance(config.get_item('unicode'), bytes) config._save_timer.cancel() @@ -64,43 +67,103 @@ class ConfigTestCase(unittest.TestCase): config = Config('test.conf', config_dir=self.config_dir) config['foo'] = None - self.assertIsNone(config['foo']) - self.assertIsInstance(config['foo'], type(None)) + assert config['foo'] is None + assert isinstance(config['foo'], type(None)) config['foo'] = 1 - self.assertEqual(config.get('foo'), 1) + assert config.get('foo') == 1 config['foo'] = None - self.assertIsNone(config['foo']) + assert config['foo'] is None config['bar'] = None - self.assertIsNone(config['bar']) + assert config['bar'] is None config['bar'] = None - self.assertIsNone(config['bar']) + assert config['bar'] is None config._save_timer.cancel() + @pytest_twisted.ensureDeferred + async def test_on_changed_callback(self, mock_callback): + config = Config('test.conf', config_dir=self.config_dir) + config.register_change_callback(mock_callback) + config['foo'] = 1 + assert config['foo'] == 1 + await mock_callback.deferred + mock_callback.assert_called_once_with('foo', 1) + + @pytest_twisted.ensureDeferred + async def test_key_function_callback(self, mock_callback): + config = Config( + 'test.conf', defaults={'foo': 1, 'bar': 1}, config_dir=self.config_dir + ) + + assert config['foo'] == 1 + config.register_set_function('foo', mock_callback) + await mock_callback.deferred + mock_callback.assert_called_once_with('foo', 1) + + mock_callback.reset_mock() + config.register_set_function('bar', mock_callback, apply_now=False) + mock_callback.assert_not_called() + config['bar'] = 2 + await mock_callback.deferred + mock_callback.assert_called_once_with('bar', 2) + def test_get(self): config = Config('test.conf', config_dir=self.config_dir) config['foo'] = 1 - self.assertEqual(config.get('foo'), 1) - self.assertEqual(config.get('foobar'), None) - self.assertEqual(config.get('foobar', 2), 2) + assert config.get('foo') == 1 + assert config.get('foobar') is None + assert config.get('foobar', 2) == 2 config['foobar'] = 5 - self.assertEqual(config.get('foobar', 2), 5) + assert config.get('foobar', 2) == 5 + + def test_set_log_mask_funcs(self, caplog): + """Test mask func masks key in log""" + caplog.set_level(logging.DEBUG) + config = Config( + 'test.conf', + config_dir=self.config_dir, + log_mask_funcs={'hosts': mask_hosts_password}, + ) + config['hosts'] = DEFAULTS['hosts'] + assert isinstance(config['hosts'], list) + assert 'host1' in caplog.text + assert 'host2' in caplog.text + assert 'password1234' not in caplog.text + assert 'password5678' not in caplog.text + assert '*' * 10 in caplog.text + + def test_load_log_mask_funcs(self, caplog): + """Test mask func masks key in log""" + with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file: + json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT) + + config = Config( + 'test.conf', + config_dir=self.config_dir, + log_mask_funcs={'hosts': mask_hosts_password}, + ) + with caplog.at_level(logging.DEBUG): + config.load(os.path.join(self.config_dir, 'test.conf')) + assert 'host1' in caplog.text + assert 'host2' in caplog.text + assert 'foobar' in caplog.text + assert 'password1234' not in caplog.text + assert 'password5678' not in caplog.text + assert '*' * 10 in caplog.text def test_load(self): def check_config(): config = Config('test.conf', config_dir=self.config_dir) - self.assertEqual(config['string'], 'foobar') - self.assertEqual(config['float'], 0.435) - self.assertEqual(config['password'], 'abc123*\\[!]?/<>#{@}=|"+$%(^)~') + assert config['string'] == 'foobar' + assert config['float'] == 0.435 + assert config['password'] == 'abc123*\\[!]?/<>#{@}=|"+$%(^)~' # Test opening a previous 1.2 config file of just a json object - import json - with open(os.path.join(self.config_dir, 'test.conf'), 'wb') as _file: json.dump(DEFAULTS, getwriter('utf8')(_file), **JSON_FORMAT) @@ -128,38 +191,38 @@ class ConfigTestCase(unittest.TestCase): # We do this twice because the first time we need to save the file to disk # and the second time we do a compare and we should not write ret = config.save() - self.assertTrue(ret) + assert ret ret = config.save() - self.assertTrue(ret) + assert ret config['string'] = 'baz' config['int'] = 2 ret = config.save() - self.assertTrue(ret) + assert ret del config config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) - self.assertEqual(config['string'], 'baz') - self.assertEqual(config['int'], 2) + assert config['string'] == 'baz' + assert config['int'] == 2 def test_save_timer(self): - self.clock = task.Clock() - deluge.config.callLater = self.clock.callLater + clock = task.Clock() config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) + config.callLater = clock.callLater config['string'] = 'baz' config['int'] = 2 - self.assertTrue(config._save_timer.active()) + assert config._save_timer.active() # Timeout set for 5 seconds in config, so lets move clock by 5 seconds - self.clock.advance(5) + clock.advance(5) def check_config(config): - self.assertTrue(not config._save_timer.active()) + assert not config._save_timer.active() del config config = Config('test.conf', defaults=DEFAULTS, config_dir=self.config_dir) - self.assertEqual(config['string'], 'baz') - self.assertEqual(config['int'], 2) + assert config['string'] == 'baz' + assert config['int'] == 2 check_config(config) @@ -176,7 +239,7 @@ class ConfigTestCase(unittest.TestCase): from deluge.config import find_json_objects objects = find_json_objects(s) - self.assertEqual(len(objects), 2) + assert len(objects) == 2 def test_find_json_objects_curly_brace(self): """Test with string containing curly brace""" @@ -193,7 +256,7 @@ class ConfigTestCase(unittest.TestCase): from deluge.config import find_json_objects objects = find_json_objects(s) - self.assertEqual(len(objects), 2) + assert len(objects) == 2 def test_find_json_objects_double_quote(self): """Test with string containing double quote""" @@ -211,4 +274,4 @@ class ConfigTestCase(unittest.TestCase): from deluge.config import find_json_objects objects = find_json_objects(s) - self.assertEqual(len(objects), 2) + assert len(objects) == 2 diff --git a/deluge/tests/test_core.py b/deluge/tests/test_core.py index 15fbc1bcf..6a3fb9506 100644 --- a/deluge/tests/test_core.py +++ b/deluge/tests/test_core.py @@ -1,20 +1,17 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - +import os from base64 import b64encode from hashlib import sha1 as sha import pytest -from six import integer_types +import pytest_twisted from twisted.internet import defer, reactor, task from twisted.internet.error import CannotListenError -from twisted.python.failure import Failure from twisted.web.http import FORBIDDEN from twisted.web.resource import EncodingResourceWrapper, Resource from twisted.web.server import GzipEncoderFactory, Site @@ -24,12 +21,12 @@ import deluge.common import deluge.component as component import deluge.core.torrent from deluge._libtorrent import lt +from deluge.conftest import BaseTestCase from deluge.core.core import Core from deluge.core.rpcserver import RPCServer from deluge.error import AddTorrentError, InvalidTorrentError from . import common -from .basetest import BaseTestCase common.disable_new_release_check() @@ -80,14 +77,13 @@ class TopLevelResource(Resource): ) -class CoreTestCase(BaseTestCase): +class TestCore(BaseTestCase): def set_up(self): - common.set_tmp_config_dir() self.rpcserver = RPCServer(listen=False) - self.core = Core() + self.core: Core = Core() self.core.config.config['lsd'] = False self.clock = task.Clock() - self.core.torrentmanager.callLater = self.clock.callLater + self.core.torrentmanager.clock = self.clock self.listen_port = 51242 return component.start().addCallback(self.start_web_server) @@ -131,7 +127,7 @@ class CoreTestCase(BaseTestCase): torrent_id = self.core.add_torrent_file(filename, filedump, options) return torrent_id - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_add_torrent_files(self): options = {} filenames = ['test.torrent', 'test_torrent.file.torrent'] @@ -142,9 +138,9 @@ class CoreTestCase(BaseTestCase): filedump = b64encode(_file.read()) files_to_add.append((filename, filedump, options)) errors = yield self.core.add_torrent_files(files_to_add) - self.assertEqual(len(errors), 0) + assert len(errors) == 0 - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_add_torrent_files_error_duplicate(self): options = {} filenames = ['test.torrent', 'test.torrent'] @@ -155,10 +151,10 @@ class CoreTestCase(BaseTestCase): filedump = b64encode(_file.read()) files_to_add.append((filename, filedump, options)) errors = yield self.core.add_torrent_files(files_to_add) - self.assertEqual(len(errors), 1) - self.assertTrue(str(errors[0]).startswith('Torrent already in session')) + assert len(errors) == 1 + assert str(errors[0]).startswith('Torrent already in session') - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_add_torrent_file(self): options = {} filename = common.get_test_data_file('test.torrent') @@ -171,17 +167,16 @@ class CoreTestCase(BaseTestCase): with open(filename, 'rb') as _file: info_hash = sha(bencode(bdecode(_file.read())[b'info'])).hexdigest() - self.assertEqual(torrent_id, info_hash) + assert torrent_id == info_hash def test_add_torrent_file_invalid_filedump(self): options = {} filename = common.get_test_data_file('test.torrent') - self.assertRaises( - AddTorrentError, self.core.add_torrent_file, filename, False, options - ) + with pytest.raises(AddTorrentError): + self.core.add_torrent_file(filename, False, options) - @defer.inlineCallbacks - def test_add_torrent_url(self): + @pytest_twisted.inlineCallbacks + def test_add_torrent_url(self, mock_mkstemp): url = ( 'http://localhost:%d/ubuntu-9.04-desktop-i386.iso.torrent' % self.listen_port @@ -190,78 +185,83 @@ class CoreTestCase(BaseTestCase): info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' torrent_id = yield self.core.add_torrent_url(url, options) - self.assertEqual(torrent_id, info_hash) + assert torrent_id == info_hash + assert not os.path.isfile(mock_mkstemp[1]) - def test_add_torrent_url_with_cookie(self): + @pytest_twisted.ensureDeferred + async def test_add_torrent_url_with_cookie(self): url = 'http://localhost:%d/cookie' % self.listen_port options = {} headers = {'Cookie': 'password=deluge'} info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' - d = self.core.add_torrent_url(url, options) - d.addCallbacks(self.fail, self.assertIsInstance, errbackArgs=(Failure,)) + with pytest.raises(Exception): + await self.core.add_torrent_url(url, options) - d = self.core.add_torrent_url(url, options, headers) - d.addCallbacks(self.assertEqual, self.fail, callbackArgs=(info_hash,)) - - return d + result = await self.core.add_torrent_url(url, options, headers) + assert result == info_hash - def test_add_torrent_url_with_redirect(self): + @pytest_twisted.ensureDeferred + async def test_add_torrent_url_with_redirect(self): url = 'http://localhost:%d/redirect' % self.listen_port options = {} info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' - d = self.core.add_torrent_url(url, options) - d.addCallback(self.assertEqual, info_hash) - return d + result = await self.core.add_torrent_url(url, options) + assert result == info_hash - def test_add_torrent_url_with_partial_download(self): + @pytest_twisted.ensureDeferred + async def test_add_torrent_url_with_partial_download(self): url = 'http://localhost:%d/partial' % self.listen_port options = {} info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' - d = self.core.add_torrent_url(url, options) - d.addCallback(self.assertEqual, info_hash) - return d + result = await self.core.add_torrent_url(url, options) + assert result == info_hash - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_add_torrent_magnet(self): info_hash = '60d5d82328b4547511fdeac9bf4d0112daa0ce00' - uri = deluge.common.create_magnet_uri(info_hash) + tracker = 'udp://tracker.example.com' + name = 'test magnet' + uri = deluge.common.create_magnet_uri(info_hash, name=name, trackers=[tracker]) options = {} torrent_id = yield self.core.add_torrent_magnet(uri, options) - self.assertEqual(torrent_id, info_hash) + assert torrent_id == info_hash + torrent_status = self.core.get_torrent_status(torrent_id, ['name', 'trackers']) + assert torrent_status['trackers'][0]['url'] == tracker + assert torrent_status['name'] == name def test_resume_torrent(self): tid1 = self.add_torrent('test.torrent', paused=True) tid2 = self.add_torrent('test_torrent.file.torrent', paused=True) # Assert paused r1 = self.core.get_torrent_status(tid1, ['paused']) - self.assertTrue(r1['paused']) + assert r1['paused'] r2 = self.core.get_torrent_status(tid2, ['paused']) - self.assertTrue(r2['paused']) + assert r2['paused'] self.core.resume_torrent(tid2) r1 = self.core.get_torrent_status(tid1, ['paused']) - self.assertTrue(r1['paused']) + assert r1['paused'] r2 = self.core.get_torrent_status(tid2, ['paused']) - self.assertFalse(r2['paused']) + assert not r2['paused'] def test_resume_torrent_list(self): """Backward compatibility for list of torrent_ids.""" torrent_id = self.add_torrent('test.torrent', paused=True) self.core.resume_torrent([torrent_id]) result = self.core.get_torrent_status(torrent_id, ['paused']) - self.assertFalse(result['paused']) + assert not result['paused'] def test_resume_torrents(self): tid1 = self.add_torrent('test.torrent', paused=True) tid2 = self.add_torrent('test_torrent.file.torrent', paused=True) self.core.resume_torrents([tid1, tid2]) r1 = self.core.get_torrent_status(tid1, ['paused']) - self.assertFalse(r1['paused']) + assert not r1['paused'] r2 = self.core.get_torrent_status(tid2, ['paused']) - self.assertFalse(r2['paused']) + assert not r2['paused'] def test_resume_torrents_all(self): """With no torrent_ids param, resume all torrents""" @@ -269,33 +269,33 @@ class CoreTestCase(BaseTestCase): tid2 = self.add_torrent('test_torrent.file.torrent', paused=True) self.core.resume_torrents() r1 = self.core.get_torrent_status(tid1, ['paused']) - self.assertFalse(r1['paused']) + assert not r1['paused'] r2 = self.core.get_torrent_status(tid2, ['paused']) - self.assertFalse(r2['paused']) + assert not r2['paused'] def test_pause_torrent(self): tid1 = self.add_torrent('test.torrent') tid2 = self.add_torrent('test_torrent.file.torrent') # Assert not paused r1 = self.core.get_torrent_status(tid1, ['paused']) - self.assertFalse(r1['paused']) + assert not r1['paused'] r2 = self.core.get_torrent_status(tid2, ['paused']) - self.assertFalse(r2['paused']) + assert not r2['paused'] self.core.pause_torrent(tid2) r1 = self.core.get_torrent_status(tid1, ['paused']) - self.assertFalse(r1['paused']) + assert not r1['paused'] r2 = self.core.get_torrent_status(tid2, ['paused']) - self.assertTrue(r2['paused']) + assert r2['paused'] def test_pause_torrent_list(self): """Backward compatibility for list of torrent_ids.""" torrent_id = self.add_torrent('test.torrent') result = self.core.get_torrent_status(torrent_id, ['paused']) - self.assertFalse(result['paused']) + assert not result['paused'] self.core.pause_torrent([torrent_id]) result = self.core.get_torrent_status(torrent_id, ['paused']) - self.assertTrue(result['paused']) + assert result['paused'] def test_pause_torrents(self): tid1 = self.add_torrent('test.torrent') @@ -303,9 +303,9 @@ class CoreTestCase(BaseTestCase): self.core.pause_torrents([tid1, tid2]) r1 = self.core.get_torrent_status(tid1, ['paused']) - self.assertTrue(r1['paused']) + assert r1['paused'] r2 = self.core.get_torrent_status(tid2, ['paused']) - self.assertTrue(r2['paused']) + assert r2['paused'] def test_pause_torrents_all(self): """With no torrent_ids param, pause all torrents""" @@ -314,26 +314,24 @@ class CoreTestCase(BaseTestCase): self.core.pause_torrents() r1 = self.core.get_torrent_status(tid1, ['paused']) - self.assertTrue(r1['paused']) + assert r1['paused'] r2 = self.core.get_torrent_status(tid2, ['paused']) - self.assertTrue(r2['paused']) + assert r2['paused'] + @pytest_twisted.inlineCallbacks def test_prefetch_metadata_existing(self): """Check another call with same magnet returns existing deferred.""" magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' - expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', None) - - def on_result(result): - self.assertEqual(result, expected) + expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'') - d = self.core.prefetch_magnet_metadata(magnet) - d.addCallback(on_result) + d1 = self.core.prefetch_magnet_metadata(magnet) d2 = self.core.prefetch_magnet_metadata(magnet) - d2.addCallback(on_result) + dg = defer.gatherResults([d1, d2], consumeErrors=True) self.clock.advance(30) - return defer.DeferredList([d, d2]) + result = yield dg + assert result == [expected] * 2 - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_remove_torrent(self): options = {} filename = common.get_test_data_file('test.torrent') @@ -341,18 +339,17 @@ class CoreTestCase(BaseTestCase): filedump = b64encode(_file.read()) torrent_id = yield self.core.add_torrent_file_async(filename, filedump, options) removed = self.core.remove_torrent(torrent_id, True) - self.assertTrue(removed) - self.assertEqual(len(self.core.get_session_state()), 0) + assert removed + assert len(self.core.get_session_state()) == 0 def test_remove_torrent_invalid(self): - self.assertRaises( - InvalidTorrentError, - self.core.remove_torrent, - 'torrentidthatdoesntexist', - True, - ) + with pytest.raises(InvalidTorrentError): + self.core.remove_torrent( + 'torrentidthatdoesntexist', + True, + ) - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_remove_torrents(self): options = {} filename = common.get_test_data_file('test.torrent') @@ -369,17 +366,17 @@ class CoreTestCase(BaseTestCase): d = self.core.remove_torrents([torrent_id, torrent_id2], True) def test_ret(val): - self.assertTrue(val == []) + assert val == [] d.addCallback(test_ret) def test_session_state(val): - self.assertEqual(len(self.core.get_session_state()), 0) + assert len(self.core.get_session_state()) == 0 d.addCallback(test_session_state) yield d - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_remove_torrents_invalid(self): options = {} filename = common.get_test_data_file('test.torrent') @@ -391,58 +388,53 @@ class CoreTestCase(BaseTestCase): val = yield self.core.remove_torrents( ['invalidid1', 'invalidid2', torrent_id], False ) - self.assertEqual(len(val), 2) - self.assertEqual( - val[0], ('invalidid1', 'torrent_id invalidid1 not in session.') - ) - self.assertEqual( - val[1], ('invalidid2', 'torrent_id invalidid2 not in session.') - ) + assert len(val) == 2 + assert val[0] == ('invalidid1', 'torrent_id invalidid1 not in session.') + assert val[1] == ('invalidid2', 'torrent_id invalidid2 not in session.') def test_get_session_status(self): status = self.core.get_session_status( ['net.recv_tracker_bytes', 'net.sent_tracker_bytes'] ) - self.assertIsInstance(status, dict) - self.assertEqual(status['net.recv_tracker_bytes'], 0) - self.assertEqual(status['net.sent_tracker_bytes'], 0) + assert isinstance(status, dict) + assert status['net.recv_tracker_bytes'] == 0 + assert status['net.sent_tracker_bytes'] == 0 def test_get_session_status_all(self): status = self.core.get_session_status([]) - self.assertIsInstance(status, dict) - self.assertIn('upload_rate', status) - self.assertIn('net.recv_bytes', status) + assert isinstance(status, dict) + assert 'upload_rate' in status + assert 'net.recv_bytes' in status def test_get_session_status_depr(self): status = self.core.get_session_status(['num_peers', 'num_unchoked']) - self.assertIsInstance(status, dict) - self.assertEqual(status['num_peers'], 0) - self.assertEqual(status['num_unchoked'], 0) + assert isinstance(status, dict) + assert status['num_peers'] == 0 + assert status['num_unchoked'] == 0 def test_get_session_status_rates(self): status = self.core.get_session_status(['upload_rate', 'download_rate']) - self.assertIsInstance(status, dict) - self.assertEqual(status['upload_rate'], 0) + assert isinstance(status, dict) + assert status['upload_rate'] == 0 def test_get_session_status_ratio(self): status = self.core.get_session_status(['write_hit_ratio', 'read_hit_ratio']) - self.assertIsInstance(status, dict) - self.assertEqual(status['write_hit_ratio'], 0.0) - self.assertEqual(status['read_hit_ratio'], 0.0) + assert isinstance(status, dict) + assert status['write_hit_ratio'] == 0.0 + assert status['read_hit_ratio'] == 0.0 def test_get_free_space(self): space = self.core.get_free_space('.') - # get_free_space returns long on Python 2 (32-bit). - self.assertTrue(isinstance(space, integer_types)) - self.assertTrue(space >= 0) - self.assertEqual(self.core.get_free_space('/someinvalidpath'), -1) + assert isinstance(space, int) + assert space >= 0 + assert self.core.get_free_space('/someinvalidpath') == -1 @pytest.mark.slow def test_test_listen_port(self): d = self.core.test_listen_port() def result(r): - self.assertTrue(r in (True, False)) + assert r in (True, False) d.addCallback(result) return d @@ -460,24 +452,22 @@ class CoreTestCase(BaseTestCase): } for key in pathlist: - self.assertEqual( - deluge.core.torrent.sanitize_filepath(key, folder=False), pathlist[key] + assert ( + deluge.core.torrent.sanitize_filepath(key, folder=False) + == pathlist[key] ) - self.assertEqual( - deluge.core.torrent.sanitize_filepath(key, folder=True), - pathlist[key] + '/', + + assert ( + deluge.core.torrent.sanitize_filepath(key, folder=True) + == pathlist[key] + '/' ) def test_get_set_config_values(self): - self.assertEqual( - self.core.get_config_values(['abc', 'foo']), {'foo': None, 'abc': None} - ) - self.assertEqual(self.core.get_config_value('foobar'), None) + assert self.core.get_config_values(['abc', 'foo']) == {'foo': None, 'abc': None} + assert self.core.get_config_value('foobar') is None self.core.set_config({'abc': 'def', 'foo': 10, 'foobar': 'barfoo'}) - self.assertEqual( - self.core.get_config_values(['foo', 'abc']), {'foo': 10, 'abc': 'def'} - ) - self.assertEqual(self.core.get_config_value('foobar'), 'barfoo') + assert self.core.get_config_values(['foo', 'abc']) == {'foo': 10, 'abc': 'def'} + assert self.core.get_config_value('foobar') == 'barfoo' def test_read_only_config_keys(self): key = 'max_upload_speed' @@ -486,13 +476,13 @@ class CoreTestCase(BaseTestCase): old_value = self.core.get_config_value(key) self.core.set_config({key: old_value + 10}) new_value = self.core.get_config_value(key) - self.assertEqual(old_value, new_value) + assert old_value == new_value self.core.read_only_config_keys = None def test__create_peer_id(self): - self.assertEqual(self.core._create_peer_id('2.0.0'), '-DE200s-') - self.assertEqual(self.core._create_peer_id('2.0.0.dev15'), '-DE200D-') - self.assertEqual(self.core._create_peer_id('2.0.1rc1'), '-DE201r-') - self.assertEqual(self.core._create_peer_id('2.11.0b2'), '-DE2B0b-') - self.assertEqual(self.core._create_peer_id('2.4.12b2.dev3'), '-DE24CD-') + assert self.core._create_peer_id('2.0.0') == '-DE200s-' + assert self.core._create_peer_id('2.0.0.dev15') == '-DE200D-' + assert self.core._create_peer_id('2.0.1rc1') == '-DE201r-' + assert self.core._create_peer_id('2.11.0b2') == '-DE2B0b-' + assert self.core._create_peer_id('2.4.12b2.dev3') == '-DE24CD-' diff --git a/deluge/tests/test_decorators.py b/deluge/tests/test_decorators.py index 7d4bd98c8..d2ecd1a2b 100644 --- a/deluge/tests/test_decorators.py +++ b/deluge/tests/test_decorators.py @@ -1,18 +1,14 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - -from twisted.trial import unittest from deluge.decorators import proxy -class DecoratorsTestCase(unittest.TestCase): +class TestDecorators: def test_proxy_with_simple_functions(self): def negate(func, *args, **kwargs): return not func(*args, **kwargs) @@ -26,16 +22,16 @@ class DecoratorsTestCase(unittest.TestCase): def double_nothing(_bool): return _bool - self.assertTrue(something(False)) - self.assertFalse(something(True)) - self.assertTrue(double_nothing(True)) - self.assertFalse(double_nothing(False)) + assert something(False) + assert not something(True) + assert double_nothing(True) + assert not double_nothing(False) def test_proxy_with_class_method(self): def negate(func, *args, **kwargs): return -func(*args, **kwargs) - class Test(object): + class Test: def __init__(self, number): self.number = number @@ -48,5 +44,5 @@ class DecoratorsTestCase(unittest.TestCase): return self.diff(number) t = Test(5) - self.assertEqual(t.diff(1), -4) - self.assertEqual(t.no_diff(1), 4) + assert t.diff(1) == -4 + assert t.no_diff(1) == 4 diff --git a/deluge/tests/test_error.py b/deluge/tests/test_error.py index c552e9422..a87d6a2d8 100644 --- a/deluge/tests/test_error.py +++ b/deluge/tests/test_error.py @@ -1,54 +1,39 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - -from twisted.trial import unittest - import deluge.error -class ErrorTestCase(unittest.TestCase): - def setUp(self): # NOQA: N803 - pass - - def tearDown(self): # NOQA: N803 - pass - +class TestError: def test_deluge_error(self): msg = 'Some message' e = deluge.error.DelugeError(msg) - self.assertEqual(str(e), msg) + assert str(e) == msg from twisted.internet.defer import DebugInfo del DebugInfo.__del__ # Hides all errors - self.assertEqual(e._args, (msg,)) - self.assertEqual(e._kwargs, {}) + assert e._args == (msg,) + assert e._kwargs == {} def test_incompatible_client(self): version = '1.3.6' e = deluge.error.IncompatibleClient(version) - self.assertEqual( - str(e), - 'Your deluge client is not compatible with the daemon. \ -Please upgrade your client to %s' - % version, + assert ( + str(e) == 'Your deluge client is not compatible with the daemon. ' + 'Please upgrade your client to %s' % version ) def test_not_authorized_error(self): current_level = 5 required_level = 10 e = deluge.error.NotAuthorizedError(current_level, required_level) - self.assertEqual( - str(e), 'Auth level too low: %d < %d' % (current_level, required_level) - ) + assert str(e) == 'Auth level too low: %d < %d' % (current_level, required_level) def test_bad_login_error(self): message = 'Login failed' username = 'deluge' e = deluge.error.BadLoginError(message, username) - self.assertEqual(str(e), message) + assert str(e) == message diff --git a/deluge/tests/test_files_tab.py b/deluge/tests/test_files_tab.py index 1ec8e18de..1e97cbbc3 100644 --- a/deluge/tests/test_files_tab.py +++ b/deluge/tests/test_files_tab.py @@ -1,23 +1,16 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import print_function, unicode_literals - import pytest -from twisted.trial import unittest import deluge.component as component -from deluge.common import windows_check from deluge.configmanager import ConfigManager +from deluge.conftest import BaseTestCase from deluge.i18n import setup_translation -from . import common -from .basetest import BaseTestCase - libs_available = True # Allow running other tests without GTKUI dependencies available try: @@ -32,12 +25,11 @@ setup_translation() @pytest.mark.gtkui -class FilesTabTestCase(BaseTestCase): +class TestFilesTab(BaseTestCase): def set_up(self): if libs_available is False: - raise unittest.SkipTest('GTKUI dependencies not available') + pytest.skip('GTKUI dependencies not available') - common.set_tmp_config_dir() ConfigManager('gtk3ui.conf', defaults=DEFAULT_PREFS) self.mainwindow = MainWindow() self.filestab = FilesTab() @@ -52,8 +44,8 @@ class FilesTabTestCase(BaseTestCase): root = treestore.get_iter_first() level = 1 - def p_level(s, l): - print('%s%s' % (' ' * l, s)) + def p_level(s, lvl): + print('{}{}'.format(' ' * lvl, s)) def _print_treestore_children(i, lvl): while i: @@ -98,80 +90,74 @@ class FilesTabTestCase(BaseTestCase): ) if not ret: self.print_treestore('Treestore not expected:', self.filestab.treestore) - self.assertTrue(ret) + assert ret def test_files_tab2(self): - if windows_check(): - raise unittest.SkipTest('on windows \\ != / for path names') self.filestab.files_list[self.t_id] = ( - {'index': 0, 'path': '1/1/test_10.txt', 'offset': 0, 'size': 13}, - {'index': 1, 'path': 'test_100.txt', 'offset': 13, 'size': 14}, + {'index': 0, 'path': '1/1/test_100.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': 'test_101.txt', 'offset': 13, 'size': 14}, ) self.filestab.update_files() self.filestab._on_torrentfilerenamed_event( - self.t_id, self.index, '1/1/test_100.txt' + self.t_id, self.index, '1/1/test_101.txt' ) ret = self.verify_treestore( self.filestab.treestore, - [['1/', [['1/', [['test_100.txt'], ['test_10.txt']]]]]], + [['1/', [['1/', [['test_100.txt'], ['test_101.txt']]]]]], ) if not ret: self.print_treestore('Treestore not expected:', self.filestab.treestore) - self.assertTrue(ret) + assert ret def test_files_tab3(self): - if windows_check(): - raise unittest.SkipTest('on windows \\ != / for path names') self.filestab.files_list[self.t_id] = ( - {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13}, - {'index': 1, 'path': 'test_100.txt', 'offset': 13, 'size': 14}, + {'index': 0, 'path': '1/test_100.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': 'test_101.txt', 'offset': 13, 'size': 14}, ) self.filestab.update_files() self.filestab._on_torrentfilerenamed_event( - self.t_id, self.index, '1/test_100.txt' + self.t_id, self.index, '1/test_101.txt' ) ret = self.verify_treestore( - self.filestab.treestore, [['1/', [['test_100.txt'], ['test_10.txt']]]] + self.filestab.treestore, [['1/', [['test_100.txt'], ['test_101.txt']]]] ) if not ret: self.print_treestore('Treestore not expected:', self.filestab.treestore) - self.assertTrue(ret) + assert ret def test_files_tab4(self): self.filestab.files_list[self.t_id] = ( - {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13}, - {'index': 1, 'path': '1/test_100.txt', 'offset': 13, 'size': 14}, + {'index': 0, 'path': '1/test_100.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': '1/test_101.txt', 'offset': 13, 'size': 14}, ) self.filestab.update_files() self.filestab._on_torrentfilerenamed_event( - self.t_id, self.index, '1/2/test_100.txt' + self.t_id, self.index, '1/2/test_101.txt' ) ret = self.verify_treestore( self.filestab.treestore, - [['1/', [['2/', [['test_100.txt']]], ['test_10.txt']]]], + [['1/', [['2/', [['test_101.txt']]], ['test_100.txt']]]], ) if not ret: self.print_treestore('Treestore not expected:', self.filestab.treestore) - self.assertTrue(ret) + assert ret def test_files_tab5(self): - if windows_check(): - raise unittest.SkipTest('on windows \\ != / for path names') self.filestab.files_list[self.t_id] = ( - {'index': 0, 'path': '1/test_10.txt', 'offset': 0, 'size': 13}, - {'index': 1, 'path': '2/test_100.txt', 'offset': 13, 'size': 14}, + {'index': 0, 'path': '1/test_100.txt', 'offset': 0, 'size': 13}, + {'index': 1, 'path': '2/test_101.txt', 'offset': 13, 'size': 14}, ) self.filestab.update_files() self.filestab._on_torrentfilerenamed_event( - self.t_id, self.index, '1/test_100.txt' + self.t_id, self.index, '1/test_101.txt' ) ret = self.verify_treestore( - self.filestab.treestore, [['1/', [['test_100.txt'], ['test_10.txt']]]] + self.filestab.treestore, [['1/', [['test_100.txt'], ['test_101.txt']]]] ) if not ret: self.print_treestore('Treestore not expected:', self.filestab.treestore) - self.assertTrue(ret) + assert ret diff --git a/deluge/tests/test_httpdownloader.py b/deluge/tests/test_httpdownloader.py index ad947a422..8c491b68a 100644 --- a/deluge/tests/test_httpdownloader.py +++ b/deluge/tests/test_httpdownloader.py @@ -1,22 +1,18 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - import os import tempfile from email.utils import formatdate -from io import open +import pytest +import pytest_twisted from twisted.internet import reactor from twisted.internet.error import CannotListenError -from twisted.python.failure import Failure -from twisted.trial import unittest -from twisted.web.error import PageRedirect +from twisted.web.error import Error, PageRedirect from twisted.web.http import NOT_MODIFIED from twisted.web.resource import EncodingResourceWrapper, Resource from twisted.web.server import GzipEncoderFactory, Site @@ -71,7 +67,7 @@ class TorrentResource(Resource): content_type += b'; charset=' + charset request.setHeader(b'Content-Type', content_type) request.setHeader(b'Content-Disposition', b'attachment; filename=test.torrent') - return 'Binary attachment ignore charset 世丕且\n'.encode('utf8') + return 'Binary attachment ignore charset 世丕且\n'.encode() class CookieResource(Resource): @@ -138,11 +134,13 @@ class TopLevelResource(Resource): return b'<h1>Deluge HTTP Downloader tests webserver here</h1>' -class DownloadFileTestCase(unittest.TestCase): +class TestDownloadFile: def get_url(self, path=''): return 'http://localhost:%d/%s' % (self.listen_port, path) - def setUp(self): # NOQA + @pytest_twisted.async_yield_fixture(autouse=True) + async def setUp(self, request): # NOQA + self = request.instance setup_logger('warning', fname('log_file')) self.website = Site(TopLevelResource()) self.listen_port = 51242 @@ -158,140 +156,136 @@ class DownloadFileTestCase(unittest.TestCase): else: raise error - def tearDown(self): # NOQA - return self.webserver.stopListening() + yield + + await self.webserver.stopListening() - def assertContains(self, filename, contents): # NOQA - with open(filename, 'r', encoding='utf8') as _file: + def assert_contains(self, filename, contents): + with open(filename, encoding='utf8') as _file: try: - self.assertEqual(_file.read(), contents) + assert _file.read() == contents except Exception as ex: - self.fail(ex) + pytest.fail(ex) return filename - def assertNotContains(self, filename, contents, file_mode=''): # NOQA - with open(filename, 'r', encoding='utf8') as _file: + def assert_not_contains(self, filename, contents, file_mode=''): + with open(filename, encoding='utf8') as _file: try: - self.assertNotEqual(_file.read(), contents) + assert _file.read() != contents except Exception as ex: - self.fail(ex) + pytest.fail(ex) return filename - def test_download(self): - d = download_file(self.get_url(), fname('index.html')) - d.addCallback(self.assertEqual, fname('index.html')) - return d + @pytest_twisted.ensureDeferred + async def test_download(self): + filename = await download_file(self.get_url(), fname('index.html')) + assert filename == fname('index.html') - def test_download_without_required_cookies(self): + @pytest_twisted.ensureDeferred + async def test_download_without_required_cookies(self): url = self.get_url('cookie') - d = download_file(url, fname('none')) - d.addCallback(self.fail) - d.addErrback(self.assertIsInstance, Failure) - return d + filename = await download_file(url, fname('none')) + self.assert_contains(filename, 'Password cookie not set!') - def test_download_with_required_cookies(self): + @pytest_twisted.ensureDeferred + async def test_download_with_required_cookies(self): url = self.get_url('cookie') cookie = {'cookie': 'password=deluge'} - d = download_file(url, fname('monster'), headers=cookie) - d.addCallback(self.assertEqual, fname('monster')) - d.addCallback(self.assertContains, 'COOKIE MONSTER!') - return d + filename = await download_file(url, fname('monster'), headers=cookie) + assert filename == fname('monster') + self.assert_contains(filename, 'COOKIE MONSTER!') - def test_download_with_rename(self): + @pytest_twisted.ensureDeferred + async def test_download_with_rename(self): url = self.get_url('rename?filename=renamed') - d = download_file(url, fname('original')) - d.addCallback(self.assertEqual, fname('renamed')) - d.addCallback(self.assertContains, 'This file should be called renamed') - return d + filename = await download_file(url, fname('original')) + assert filename == fname('renamed') + self.assert_contains(filename, 'This file should be called renamed') - def test_download_with_rename_exists(self): + @pytest_twisted.ensureDeferred + async def test_download_with_rename_exists(self): open(fname('renamed'), 'w').close() url = self.get_url('rename?filename=renamed') - d = download_file(url, fname('original')) - d.addCallback(self.assertEqual, fname('renamed-1')) - d.addCallback(self.assertContains, 'This file should be called renamed') - return d + filename = await download_file(url, fname('original')) + assert filename == fname('renamed-1') + self.assert_contains(filename, 'This file should be called renamed') - def test_download_with_rename_sanitised(self): + @pytest_twisted.ensureDeferred + async def test_download_with_rename_sanitised(self): url = self.get_url('rename?filename=/etc/passwd') - d = download_file(url, fname('original')) - d.addCallback(self.assertEqual, fname('passwd')) - d.addCallback(self.assertContains, 'This file should be called /etc/passwd') - return d + filename = await download_file(url, fname('original')) + assert filename == fname('passwd') + self.assert_contains(filename, 'This file should be called /etc/passwd') - def test_download_with_attachment_no_filename(self): + @pytest_twisted.ensureDeferred + async def test_download_with_attachment_no_filename(self): url = self.get_url('attachment') - d = download_file(url, fname('original')) - d.addCallback(self.assertEqual, fname('original')) - d.addCallback(self.assertContains, 'Attachment with no filename set') - return d + filename = await download_file(url, fname('original')) + assert filename == fname('original') + self.assert_contains(filename, 'Attachment with no filename set') - def test_download_with_rename_prevented(self): + @pytest_twisted.ensureDeferred + async def test_download_with_rename_prevented(self): url = self.get_url('rename?filename=spam') - d = download_file(url, fname('forced'), force_filename=True) - d.addCallback(self.assertEqual, fname('forced')) - d.addCallback(self.assertContains, 'This file should be called spam') - return d + filename = await download_file(url, fname('forced'), force_filename=True) + assert filename == fname('forced') + self.assert_contains(filename, 'This file should be called spam') - def test_download_with_gzip_encoding(self): + @pytest_twisted.ensureDeferred + async def test_download_with_gzip_encoding(self): url = self.get_url('gzip?msg=success') - d = download_file(url, fname('gzip_encoded')) - d.addCallback(self.assertContains, 'success') - return d + filename = await download_file(url, fname('gzip_encoded')) + self.assert_contains(filename, 'success') - def test_download_with_gzip_encoding_disabled(self): + @pytest_twisted.ensureDeferred + async def test_download_with_gzip_encoding_disabled(self): url = self.get_url('gzip?msg=unzip') - d = download_file(url, fname('gzip_encoded'), allow_compression=False) - d.addCallback(self.assertContains, 'unzip') - return d + filename = await download_file( + url, fname('gzip_encoded'), allow_compression=False + ) + self.assert_contains(filename, 'unzip') - def test_page_redirect_unhandled(self): + @pytest_twisted.ensureDeferred + async def test_page_redirect_unhandled(self): url = self.get_url('redirect') - d = download_file(url, fname('none')) - d.addCallback(self.fail) + with pytest.raises(PageRedirect): + await download_file(url, fname('none'), handle_redirects=False) - def on_redirect(failure): - self.assertTrue(type(failure), PageRedirect) + @pytest_twisted.ensureDeferred + async def test_page_redirect(self): + url = self.get_url('redirect') + filename = await download_file(url, fname('none'), handle_redirects=True) + assert filename == fname('none') - d.addErrback(on_redirect) - return d + @pytest_twisted.ensureDeferred + async def test_page_not_found(self): + with pytest.raises(Error): + await download_file(self.get_url('page/not/found'), fname('none')) - def test_page_redirect(self): - url = self.get_url('redirect') - d = download_file(url, fname('none'), handle_redirects=True) - d.addCallback(self.assertEqual, fname('none')) - d.addErrback(self.fail) - return d - - def test_page_not_found(self): - d = download_file(self.get_url('page/not/found'), fname('none')) - d.addCallback(self.fail) - d.addErrback(self.assertIsInstance, Failure) - return d - - def test_page_not_modified(self): + @pytest.mark.xfail(reason="Doesn't seem like httpdownloader ever implemented this.") + @pytest_twisted.ensureDeferred + async def test_page_not_modified(self): headers = {'If-Modified-Since': formatdate(usegmt=True)} - d = download_file(self.get_url(), fname('index.html'), headers=headers) - d.addCallback(self.fail) - d.addErrback(self.assertIsInstance, Failure) - return d + with pytest.raises(Error) as exc_info: + await download_file(self.get_url(), fname('index.html'), headers=headers) + assert exc_info.value.status == NOT_MODIFIED - def test_download_text_reencode_charset(self): + @pytest_twisted.ensureDeferred + async def test_download_text_reencode_charset(self): """Re-encode as UTF-8 specified charset for text content-type header""" url = self.get_url('attachment') filepath = fname('test.txt') headers = {'content-charset': 'Windows-1251', 'content-append': 'бвгде'} - d = download_file(url, filepath, headers=headers) - d.addCallback(self.assertEqual, filepath) - d.addCallback(self.assertContains, 'Attachment with no filename setбвгде') - return d + filename = await download_file(url, filepath, headers=headers) + assert filename == filepath + self.assert_contains(filename, 'Attachment with no filename setбвгде') - def test_download_binary_ignore_charset(self): + @pytest_twisted.ensureDeferred + async def test_download_binary_ignore_charset(self): """Ignore charset for binary content-type header e.g. torrent files""" url = self.get_url('torrent') headers = {'content-charset': 'Windows-1251'} filepath = fname('test.torrent') - d = download_file(url, fname('test.torrent'), headers=headers) - d.addCallback(self.assertEqual, filepath) - d.addCallback(self.assertContains, 'Binary attachment ignore charset 世丕且\n') - return d + filename = await download_file(url, fname('test.torrent'), headers=headers) + assert filename == filepath + self.assert_contains(filename, 'Binary attachment ignore charset 世丕且\n') diff --git a/deluge/tests/test_json_api.py b/deluge/tests/test_json_api.py index 1da64bf97..41efb0206 100644 --- a/deluge/tests/test_json_api.py +++ b/deluge/tests/test_json_api.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> # @@ -7,64 +6,35 @@ # See LICENSE for more details. # -from __future__ import unicode_literals - import json as json_lib +from unittest.mock import MagicMock -from mock import MagicMock -from twisted.internet import defer +import pytest +import pytest_twisted from twisted.web import server from twisted.web.http import Request import deluge.common -import deluge.component as component import deluge.ui.web.auth import deluge.ui.web.json_api from deluge.error import DelugeError -from deluge.ui.client import client from deluge.ui.web.auth import Auth from deluge.ui.web.json_api import JSON, JSONException from . import common -from .basetest import BaseTestCase from .common_web import WebServerMockBase -from .daemon_base import DaemonBase common.disable_new_release_check() -class JSONBase(BaseTestCase, DaemonBase): - def connect_client(self, *args, **kwargs): - return client.connect( - 'localhost', - self.listen_port, - username=kwargs.get('user', ''), - password=kwargs.get('password', ''), - ) - - def disconnect_client(self, *args): - return client.disconnect() - - def tear_down(self): - d = component.shutdown() - d.addCallback(self.disconnect_client) - d.addCallback(self.terminate_core) - return d - - -class JSONTestCase(JSONBase): - def set_up(self): - d = self.common_set_up() - d.addCallback(self.start_core) - d.addCallbacks(self.connect_client, self.terminate_core) - return d - - @defer.inlineCallbacks - def test_get_remote_methods(self): +@pytest.mark.usefixtures('daemon', 'client', 'component') +class TestJSON: + @pytest_twisted.ensureDeferred + async def test_get_remote_methods(self): json = JSON() - methods = yield json.get_remote_methods() - self.assertEqual(type(methods), tuple) - self.assertTrue(len(methods) > 0) + methods = await json.get_remote_methods() + assert type(methods) == tuple + assert len(methods) > 0 def test_render_fail_disconnected(self): json = JSON() @@ -72,7 +42,7 @@ class JSONTestCase(JSONBase): request.method = b'POST' request._disconnected = True # When disconnected, returns empty string - self.assertEqual(json.render(request), '') + assert json.render(request) == '' def test_render_fail(self): json = JSON() @@ -82,19 +52,17 @@ class JSONTestCase(JSONBase): def write(response_str): request.write_was_called = True response = json_lib.loads(response_str.decode()) - self.assertEqual(response['result'], None) - self.assertEqual(response['id'], None) - self.assertEqual( - response['error']['message'], 'JSONException: JSON not decodable' - ) - self.assertEqual(response['error']['code'], 5) + assert response['result'] is None + assert response['id'] is None + assert response['error']['message'] == 'JSONException: JSON not decodable' + assert response['error']['code'] == 5 request.write = write request.write_was_called = False request._disconnected = False request.getHeader.return_value = b'application/json' - self.assertEqual(json.render(request), server.NOT_DONE_YET) - self.assertTrue(request.write_was_called) + assert json.render(request) == server.NOT_DONE_YET + assert request.write_was_called def test_handle_request_invalid_method(self): json = JSON() @@ -102,20 +70,23 @@ class JSONTestCase(JSONBase): json_data = {'method': 'no-existing-module.test', 'id': 0, 'params': []} request.json = json_lib.dumps(json_data).encode() request_id, result, error = json._handle_request(request) - self.assertEqual(error, {'message': 'Unknown method', 'code': 2}) + assert error == {'message': 'Unknown method', 'code': 2} def test_handle_request_invalid_json_request(self): json = JSON() request = MagicMock() json_data = {'id': 0, 'params': []} request.json = json_lib.dumps(json_data).encode() - self.assertRaises(JSONException, json._handle_request, request) + with pytest.raises(JSONException): + json._handle_request(request) json_data = {'method': 'some.method', 'params': []} request.json = json_lib.dumps(json_data).encode() - self.assertRaises(JSONException, json._handle_request, request) + with pytest.raises(JSONException): + json._handle_request(request) json_data = {'method': 'some.method', 'id': 0} request.json = json_lib.dumps(json_data).encode() - self.assertRaises(JSONException, json._handle_request, request) + with pytest.raises(JSONException): + json._handle_request(request) def test_on_json_request_invalid_content_type(self): """Test for exception with content type not application/json""" @@ -124,18 +95,32 @@ class JSONTestCase(JSONBase): request.getHeader.return_value = b'text/plain' json_data = {'method': 'some.method', 'id': 0, 'params': []} request.json = json_lib.dumps(json_data).encode() - self.assertRaises(JSONException, json._on_json_request, request) + with pytest.raises(JSONException): + json._on_json_request(request) + def test_on_json_request_valid_content_type(self): + """Ensure content-type application/json is accepted""" + json = JSON() + request = MagicMock() + request.getHeader.return_value = b'application/json' + json_data = {'method': 'some.method', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + json._on_json_request(request) -class JSONCustomUserTestCase(JSONBase): - def set_up(self): - d = self.common_set_up() - d.addCallback(self.start_core) - return d + def test_on_json_request_valid_content_type_with_charset(self): + """Ensure content-type parameters such as charset are ignored""" + json = JSON() + request = MagicMock() + request.getHeader.return_value = b'application/json;charset=utf-8' + json_data = {'method': 'some.method', 'id': 0, 'params': []} + request.json = json_lib.dumps(json_data).encode() + json._on_json_request(request) - @defer.inlineCallbacks + +@pytest.mark.usefixtures('daemon', 'client', 'component') +class TestJSONCustomUserTestCase: + @pytest_twisted.inlineCallbacks def test_handle_request_auth_error(self): - yield self.connect_client() json = JSON() auth_conf = {'session_timeout': 10, 'sessions': {}} Auth(auth_conf) # Must create the component @@ -148,13 +133,12 @@ class JSONCustomUserTestCase(JSONBase): json_data = {'method': 'core.get_libtorrent_version', 'id': 0, 'params': []} request.json = json_lib.dumps(json_data).encode() request_id, result, error = json._handle_request(request) - self.assertEqual(error, {'message': 'Not authenticated', 'code': 1}) + assert error == {'message': 'Not authenticated', 'code': 1} -class RPCRaiseDelugeErrorJSONTestCase(JSONBase): - def set_up(self): - d = self.common_set_up() - custom_script = """ +@pytest.mark.usefixtures('daemon', 'client', 'component') +class TestRPCRaiseDelugeErrorJSON: + daemon_custom_script = """ from deluge.error import DelugeError from deluge.core.rpcserver import export class TestClass(object): @@ -165,12 +149,9 @@ class RPCRaiseDelugeErrorJSONTestCase(JSONBase): test = TestClass() daemon.rpcserver.register_object(test) """ - d.addCallback(self.start_core, custom_script=custom_script) - d.addCallbacks(self.connect_client, self.terminate_core) - return d - @defer.inlineCallbacks - def test_handle_request_method_raise_delugeerror(self): + @pytest_twisted.ensureDeferred + async def test_handle_request_method_raise_delugeerror(self): json = JSON() def get_session_id(s_id): @@ -182,9 +163,9 @@ class RPCRaiseDelugeErrorJSONTestCase(JSONBase): request = Request(MagicMock(), False) request.base = b'' auth._create_session(request) - methods = yield json.get_remote_methods() + methods = await json.get_remote_methods() # Verify the function has been registered - self.assertTrue('testclass.test' in methods) + assert 'testclass.test' in methods request = MagicMock() session_id = list(auth.config['sessions'])[0] @@ -192,18 +173,13 @@ class RPCRaiseDelugeErrorJSONTestCase(JSONBase): json_data = {'method': 'testclass.test', 'id': 0, 'params': []} request.json = json_lib.dumps(json_data).encode() request_id, result, error = json._handle_request(request) - result.addCallback(self.fail) - - def on_error(error): - self.assertEqual(error.type, DelugeError) - - result.addErrback(on_error) - yield result + with pytest.raises(DelugeError): + await result -class JSONRequestFailedTestCase(JSONBase, WebServerMockBase): - def set_up(self): - d = self.common_set_up() +class TestJSONRequestFailed(WebServerMockBase): + @pytest_twisted.async_yield_fixture(autouse=True) + async def set_up(self, config_dir): custom_script = """ from deluge.error import DelugeError from deluge.core.rpcserver import export @@ -234,28 +210,29 @@ class JSONRequestFailedTestCase(JSONBase, WebServerMockBase): } def on_test_raise(*args): - self.assertTrue('Unhandled error in Deferred:' in self.core.stderr_out) - self.assertTrue('in test_raise_error' in self.core.stderr_out) + assert 'Unhandled error in Deferred:' in self.core.stderr_out + assert 'in test_raise_error' in self.core.stderr_out extra_callback['deferred'].addCallback(on_test_raise) - d.addCallback( - self.start_core, + d, daemon = common.start_core( custom_script=custom_script, print_stdout=False, print_stderr=False, timeout=5, extra_callbacks=[extra_callback], + config_directory=config_dir, ) - d.addCallbacks(self.connect_client, self.terminate_core) - return d + await d + yield + await daemon.kill() - @defer.inlineCallbacks - def test_render_on_rpc_request_failed(self): + @pytest_twisted.inlineCallbacks + def test_render_on_rpc_request_failed(self, component, client): json = JSON() methods = yield json.get_remote_methods() # Verify the function has been registered - self.assertTrue('testclass.test' in methods) + assert 'testclass.test' in methods request = MagicMock() @@ -266,14 +243,14 @@ class JSONRequestFailedTestCase(JSONBase, WebServerMockBase): def write(response_str): request.write_was_called = True response = json_lib.loads(response_str.decode()) - self.assertEqual(response['result'], None, 'BAD RESULT') - self.assertEqual(response['id'], 0) - self.assertEqual( - response['error']['message'], - 'Failure: [Failure instance: Traceback (failure with no frames):' - " <class 'deluge.error.DelugeError'>: DelugeERROR\n]", + assert response['result'] is None, 'BAD RESULT' + assert response['id'] == 0 + assert ( + response['error']['message'] + == 'Failure: [Failure instance: Traceback (failure with no frames):' + " <class 'deluge.error.DelugeError'>: DelugeERROR\n]" ) - self.assertEqual(response['error']['code'], 4) + assert response['error']['code'] == 4 request.write = write request.write_was_called = False @@ -284,8 +261,8 @@ class JSONRequestFailedTestCase(JSONBase, WebServerMockBase): d = json._on_json_request(request) def on_success(arg): - self.assertEqual(arg, server.NOT_DONE_YET) + assert arg == server.NOT_DONE_YET return True - d.addCallbacks(on_success, self.fail) + d.addCallbacks(on_success, pytest.fail) yield d diff --git a/deluge/tests/test_log.py b/deluge/tests/test_log.py index 572693b7c..f0dcbee86 100644 --- a/deluge/tests/test_log.py +++ b/deluge/tests/test_log.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2015 Calum Lind <calumlind@gmail.com> # Copyright (C) 2010 Pedro Algarvio <ufs@ufsoft.org> @@ -8,17 +7,14 @@ # See LICENSE for more details. # -from __future__ import unicode_literals - import logging import warnings +from deluge.conftest import BaseTestCase from deluge.log import setup_logger -from .basetest import BaseTestCase - -class LogTestCase(BaseTestCase): +class TestLog(BaseTestCase): def set_up(self): setup_logger(logging.DEBUG) @@ -32,7 +28,7 @@ class LogTestCase(BaseTestCase): # Cause all warnings to always be triggered. warnings.simplefilter('always') LOG.debug('foo') - self.assertEqual(w[-1].category, DeprecationWarning) + assert w[-1].category == DeprecationWarning # def test_twisted_error_log(self): # from twisted.internet import defer diff --git a/deluge/tests/test_maketorrent.py b/deluge/tests/test_maketorrent.py index 4e0099653..a2e473f00 100644 --- a/deluge/tests/test_maketorrent.py +++ b/deluge/tests/test_maketorrent.py @@ -1,19 +1,13 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - import os import tempfile -from twisted.trial import unittest - from deluge import maketorrent -from deluge.common import windows_check def check_torrent(filename): @@ -28,7 +22,7 @@ def check_torrent(filename): TorrentInfo(filename) -class MakeTorrentTestCase(unittest.TestCase): +class TestMakeTorrent: def test_save_multifile(self): # Create a temporary folder for torrent creation tmp_path = tempfile.mkdtemp() @@ -54,21 +48,16 @@ class MakeTorrentTestCase(unittest.TestCase): os.remove(tmp_file) def test_save_singlefile(self): - if windows_check(): - raise unittest.SkipTest('on windows file not released') - tmp_data = tempfile.mkstemp('testdata')[1] - with open(tmp_data, 'wb') as _file: - _file.write(b'a' * (2314 * 1024)) - t = maketorrent.TorrentMetadata() - t.data_path = tmp_data - tmp_fd, tmp_file = tempfile.mkstemp('.torrent') - t.save(tmp_file) - - check_torrent(tmp_file) - - os.remove(tmp_data) - os.close(tmp_fd) - os.remove(tmp_file) + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_data = tmp_dir + '/data' + with open(tmp_data, 'wb') as _file: + _file.write(b'a' * (2314 * 1024)) + t = maketorrent.TorrentMetadata() + t.data_path = tmp_data + tmp_file = tmp_dir + '/.torrent' + t.save(tmp_file) + + check_torrent(tmp_file) def test_save_multifile_padded(self): # Create a temporary folder for torrent creation diff --git a/deluge/tests/test_maybe_coroutine.py b/deluge/tests/test_maybe_coroutine.py new file mode 100644 index 000000000..2717e78bb --- /dev/null +++ b/deluge/tests/test_maybe_coroutine.py @@ -0,0 +1,213 @@ +# +# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with +# the additional special exception to link portions of this program with the OpenSSL library. +# See LICENSE for more details. +# +import pytest +import pytest_twisted +import twisted.python.failure +from twisted.internet import defer, reactor, task +from twisted.internet.defer import maybeDeferred + +from deluge.decorators import maybe_coroutine + + +@defer.inlineCallbacks +def inline_func(): + result = yield task.deferLater(reactor, 0, lambda: 'function_result') + return result + + +@defer.inlineCallbacks +def inline_error(): + raise Exception('function_error') + yield + + +@maybe_coroutine +async def coro_func(): + result = await task.deferLater(reactor, 0, lambda: 'function_result') + return result + + +@maybe_coroutine +async def coro_error(): + raise Exception('function_error') + + +@defer.inlineCallbacks +def coro_func_from_inline(): + result = yield coro_func() + return result + + +@defer.inlineCallbacks +def coro_error_from_inline(): + result = yield coro_error() + return result + + +@maybe_coroutine +async def coro_func_from_coro(): + return await coro_func() + + +@maybe_coroutine +async def coro_error_from_coro(): + return await coro_error() + + +@maybe_coroutine +async def inline_func_from_coro(): + return await inline_func() + + +@maybe_coroutine +async def inline_error_from_coro(): + return await inline_error() + + +@pytest_twisted.inlineCallbacks +def test_standard_twisted(): + """Sanity check that twisted tests work how we expect. + + Not really testing deluge code at all. + """ + result = yield inline_func() + assert result == 'function_result' + + with pytest.raises(Exception, match='function_error'): + yield inline_error() + + +@pytest.mark.parametrize( + 'function', + [ + inline_func, + coro_func, + coro_func_from_coro, + coro_func_from_inline, + inline_func_from_coro, + ], +) +@pytest_twisted.inlineCallbacks +def test_from_inline(function): + """Test our coroutines wrapped with maybe_coroutine as if they returned plain twisted deferreds.""" + result = yield function() + assert result == 'function_result' + + def cb(result): + assert result == 'function_result' + + d = function() + d.addCallback(cb) + yield d + + +@pytest.mark.parametrize( + 'function', + [ + inline_error, + coro_error, + coro_error_from_coro, + coro_error_from_inline, + inline_error_from_coro, + ], +) +@pytest_twisted.inlineCallbacks +def test_error_from_inline(function): + """Test our coroutines wrapped with maybe_coroutine as if they returned plain twisted deferreds that raise.""" + with pytest.raises(Exception, match='function_error'): + yield function() + + def eb(result): + assert isinstance(result, twisted.python.failure.Failure) + assert result.getErrorMessage() == 'function_error' + + d = function() + d.addErrback(eb) + yield d + + +@pytest.mark.parametrize( + 'function', + [ + inline_func, + coro_func, + coro_func_from_coro, + coro_func_from_inline, + inline_func_from_coro, + ], +) +@pytest_twisted.ensureDeferred +async def test_from_coro(function): + """Test our coroutines wrapped with maybe_coroutine work from another coroutine.""" + result = await function() + assert result == 'function_result' + + +@pytest.mark.parametrize( + 'function', + [ + inline_error, + coro_error, + coro_error_from_coro, + coro_error_from_inline, + inline_error_from_coro, + ], +) +@pytest_twisted.ensureDeferred +async def test_error_from_coro(function): + """Test our coroutines wrapped with maybe_coroutine work from another coroutine with errors.""" + with pytest.raises(Exception, match='function_error'): + await function() + + +@pytest_twisted.ensureDeferred +async def test_tracebacks_preserved(): + with pytest.raises(Exception) as exc: + await coro_error_from_coro() + traceback_lines = [ + 'await coro_error_from_coro()', + 'return await coro_error()', + "raise Exception('function_error')", + ] + # If each coroutine got wrapped with ensureDeferred, the traceback will be mangled + # verify the coroutines passed through by checking the traceback. + for expected, actual in zip(traceback_lines, exc.traceback): + assert expected in str(actual) + + +@pytest_twisted.ensureDeferred +async def test_maybe_deferred_coroutine(): + result = await maybeDeferred(coro_func) + assert result == 'function_result' + + +@pytest_twisted.ensureDeferred +async def test_callback_before_await(): + def cb(res): + assert res == 'function_result' + return res + + d = coro_func() + d.addCallback(cb) + result = await d + assert result == 'function_result' + + +@pytest_twisted.ensureDeferred +async def test_callback_after_await(): + """If it has already been used as a coroutine, can't be retroactively turned into a Deferred. + This limitation could be fixed, but the extra complication doesn't feel worth it. + """ + + def cb(res): + pass + + d = coro_func() + await d + with pytest.raises( + Exception, match='Cannot add callbacks to an already awaited coroutine' + ): + d.addCallback(cb) diff --git a/deluge/tests/test_metafile.py b/deluge/tests/test_metafile.py index fc6507cb8..fda1cb73e 100644 --- a/deluge/tests/test_metafile.py +++ b/deluge/tests/test_metafile.py @@ -1,19 +1,13 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - import os import tempfile -from twisted.trial import unittest - from deluge import metafile -from deluge.common import windows_check def check_torrent(filename): @@ -28,7 +22,7 @@ def check_torrent(filename): TorrentInfo(filename) -class MetafileTestCase(unittest.TestCase): +class TestMetafile: def test_save_multifile(self): # Create a temporary folder for torrent creation tmp_path = tempfile.mkdtemp() @@ -52,17 +46,12 @@ class MetafileTestCase(unittest.TestCase): os.remove(tmp_file) def test_save_singlefile(self): - if windows_check(): - raise unittest.SkipTest('on windows \\ != / for path names') - tmp_path = tempfile.mkstemp('testdata')[1] - with open(tmp_path, 'wb') as tmp_file: - tmp_file.write(b'a' * (2314 * 1024)) - - tmp_fd, tmp_file = tempfile.mkstemp('.torrent') - metafile.make_meta_file(tmp_path, '', 32768, target=tmp_file) + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_data = tmp_dir + '/testdata' + with open(tmp_data, 'wb') as tmp_file: + tmp_file.write(b'a' * (2314 * 1024)) - check_torrent(tmp_file) + tmp_torrent = tmp_dir + '/.torrent' + metafile.make_meta_file(tmp_data, '', 32768, target=tmp_torrent) - os.remove(tmp_path) - os.close(tmp_fd) - os.remove(tmp_file) + check_torrent(tmp_torrent) diff --git a/deluge/tests/test_plugin_metadata.py b/deluge/tests/test_plugin_metadata.py index 436fc2c50..adf115d1b 100644 --- a/deluge/tests/test_plugin_metadata.py +++ b/deluge/tests/test_plugin_metadata.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2015 Calum Lind <calumlind@gmail.com> # @@ -7,25 +6,38 @@ # See LICENSE for more details. # -from __future__ import unicode_literals - from deluge.pluginmanagerbase import PluginManagerBase -from . import common -from .basetest import BaseTestCase - - -class PluginManagerBaseTestCase(BaseTestCase): - def set_up(self): - common.set_tmp_config_dir() +class TestPluginManagerBase: def test_get_plugin_info(self): pm = PluginManagerBase('core.conf', 'deluge.plugin.core') for p in pm.get_available_plugins(): for key, value in pm.get_plugin_info(p).items(): - self.assertTrue(isinstance('%s: %s' % (key, value), ''.__class__)) + assert isinstance(key, str) + assert isinstance(value, str) def test_get_plugin_info_invalid_name(self): pm = PluginManagerBase('core.conf', 'deluge.plugin.core') for key, value in pm.get_plugin_info('random').items(): - self.assertEqual(value, 'not available') + result = 'not available' if key in ('Name', 'Version') else '' + assert value == result + + def test_parse_pkg_info_metadata_2_1(self): + pkg_info = """Metadata-Version: 2.1 +Name: AutoAdd +Version: 1.8 +Summary: Monitors folders for .torrent files. +Home-page: http://dev.deluge-torrent.org/wiki/Plugins/AutoAdd +Author: Chase Sterling, Pedro Algarvio +Author-email: chase.sterling@gmail.com, pedro@algarvio.me +License: GPLv3 +Platform: UNKNOWN + +Monitors folders for .torrent files. + """ + plugin_info = PluginManagerBase.parse_pkg_info(pkg_info) + for value in plugin_info.values(): + assert value != '' + result = 'Monitors folders for .torrent files.' + assert plugin_info['Description'] == result diff --git a/deluge/tests/test_rpcserver.py b/deluge/tests/test_rpcserver.py index 02f9af023..982d1d5f1 100644 --- a/deluge/tests/test_rpcserver.py +++ b/deluge/tests/test_rpcserver.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2013 Bro <bro.development@gmail.com> # @@ -7,18 +6,15 @@ # See LICENSE for more details. # -from __future__ import unicode_literals - import deluge.component as component import deluge.error from deluge.common import get_localhost_auth +from deluge.conftest import BaseTestCase from deluge.core import rpcserver from deluge.core.authmanager import AuthManager from deluge.core.rpcserver import DelugeRPCProtocol, RPCServer from deluge.log import setup_logger -from .basetest import BaseTestCase - setup_logger('none') @@ -30,7 +26,7 @@ class DelugeRPCProtocolTester(DelugeRPCProtocol): self.messages.append(data) -class RPCServerTestCase(BaseTestCase): +class TestRPCServer(BaseTestCase): def set_up(self): self.rpcserver = RPCServer(listen=False) self.rpcserver.factory.protocol = DelugeRPCProtocolTester @@ -60,15 +56,15 @@ class RPCServerTestCase(BaseTestCase): e = TorrentFolderRenamedEvent(*data) self.rpcserver.emit_event_for_session_id(self.session_id, e) msg = self.protocol.messages.pop() - self.assertEqual(msg[0], rpcserver.RPC_EVENT, str(msg)) - self.assertEqual(msg[1], 'TorrentFolderRenamedEvent', str(msg)) - self.assertEqual(msg[2], data, str(msg)) + assert msg[0] == rpcserver.RPC_EVENT, str(msg) + assert msg[1] == 'TorrentFolderRenamedEvent', str(msg) + assert msg[2] == data, str(msg) def test_invalid_client_login(self): self.protocol.dispatch(self.request_id, 'daemon.login', [1], {}) msg = self.protocol.messages.pop() - self.assertEqual(msg[0], rpcserver.RPC_ERROR) - self.assertEqual(msg[1], self.request_id) + assert msg[0] == rpcserver.RPC_ERROR + assert msg[1] == self.request_id def test_valid_client_login(self): self.authmanager = AuthManager() @@ -77,9 +73,9 @@ class RPCServerTestCase(BaseTestCase): self.request_id, 'daemon.login', auth, {'client_version': 'Test'} ) msg = self.protocol.messages.pop() - self.assertEqual(msg[0], rpcserver.RPC_RESPONSE, str(msg)) - self.assertEqual(msg[1], self.request_id, str(msg)) - self.assertEqual(msg[2], rpcserver.AUTH_LEVEL_ADMIN, str(msg)) + assert msg[0] == rpcserver.RPC_RESPONSE, str(msg) + assert msg[1] == self.request_id, str(msg) + assert msg[2] == rpcserver.AUTH_LEVEL_ADMIN, str(msg) def test_client_login_error(self): # This test causes error log prints while running the test... @@ -90,24 +86,24 @@ class RPCServerTestCase(BaseTestCase): self.request_id, 'daemon.login', auth, {'client_version': 'Test'} ) msg = self.protocol.messages.pop() - self.assertEqual(msg[0], rpcserver.RPC_ERROR) - self.assertEqual(msg[1], self.request_id) - self.assertEqual(msg[2], 'WrappedException') - self.assertEqual(msg[3][1], 'AttributeError') + assert msg[0] == rpcserver.RPC_ERROR + assert msg[1] == self.request_id + assert msg[2] == 'WrappedException' + assert msg[3][1] == 'AttributeError' def test_client_invalid_method_call(self): self.authmanager = AuthManager() auth = get_localhost_auth() self.protocol.dispatch(self.request_id, 'invalid_function', auth, {}) msg = self.protocol.messages.pop() - self.assertEqual(msg[0], rpcserver.RPC_ERROR) - self.assertEqual(msg[1], self.request_id) - self.assertEqual(msg[2], 'WrappedException') - self.assertEqual(msg[3][1], 'AttributeError') + assert msg[0] == rpcserver.RPC_ERROR + assert msg[1] == self.request_id + assert msg[2] == 'WrappedException' + assert msg[3][1] == 'AttributeError' def test_daemon_info(self): self.protocol.dispatch(self.request_id, 'daemon.info', [], {}) msg = self.protocol.messages.pop() - self.assertEqual(msg[0], rpcserver.RPC_RESPONSE, str(msg)) - self.assertEqual(msg[1], self.request_id, str(msg)) - self.assertEqual(msg[2], deluge.common.get_version(), str(msg)) + assert msg[0] == rpcserver.RPC_RESPONSE, str(msg) + assert msg[1] == self.request_id, str(msg) + assert msg[2] == deluge.common.get_version(), str(msg) diff --git a/deluge/tests/test_security.py b/deluge/tests/test_security.py index 700fc9967..e3e434433 100644 --- a/deluge/tests/test_security.py +++ b/deluge/tests/test_security.py @@ -1,12 +1,9 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import print_function, unicode_literals - import os import pytest @@ -16,9 +13,9 @@ import deluge.component as component import deluge.ui.web.server from deluge import configmanager from deluge.common import windows_check +from deluge.conftest import BaseTestCase from deluge.ui.web.server import DelugeWeb -from .basetest import BaseTestCase from .common import get_test_data_file from .common_web import WebServerTestBase from .daemon_base import DaemonBase @@ -26,15 +23,10 @@ from .daemon_base import DaemonBase SECURITY_TESTS = bool(os.getenv('SECURITY_TESTS', False)) -class SecurityBaseTestCase(object): - if windows_check(): - skip = 'windows cannot run .sh files' - elif not SECURITY_TESTS: - skip = 'Skipping security tests' - - http_err = 'cannot run http tests on daemon' - - def __init__(self): +# TODO: This whole module has not been tested since migrating tests fully to pytest +class SecurityBaseTestCase: + @pytest.fixture(autouse=True) + def setvars(self): self.home_dir = os.path.expanduser('~') self.port = 8112 @@ -45,6 +37,7 @@ class SecurityBaseTestCase(object): get_test_data_file('testssl.sh'), '--quiet', '--nodns', + 'none', '--color', '0', test, @@ -55,11 +48,11 @@ class SecurityBaseTestCase(object): def on_result(results): if test == '-e': - results = results[0].split('\n')[7:-6] - self.assertTrue(len(results) > 3) + results = results[0].split(b'\n')[7:-6] + assert len(results) > 3 else: - self.assertIn('OK', results[0]) - self.assertNotIn('NOT ok', results[0]) + assert b'OK' in results[0] + assert b'NOT ok' not in results[0] d.addCallback(on_result) return d @@ -76,18 +69,12 @@ class SecurityBaseTestCase(object): def test_secured_webserver_css_injection_vulnerability(self): return self._run_test('-I') - def test_secured_webserver_ticketbleed_vulnerability(self): - return self._run_test('-T') - def test_secured_webserver_renegotiation_vulnerabilities(self): return self._run_test('-R') def test_secured_webserver_crime_vulnerability(self): return self._run_test('-C') - def test_secured_webserver_breach_vulnerability(self): - return self._run_test('-B') - def test_secured_webserver_poodle_vulnerability(self): return self._run_test('-O') @@ -121,33 +108,14 @@ class SecurityBaseTestCase(object): def test_secured_webserver_preference(self): return self._run_test('-P') - def test_secured_webserver_headers(self): - return self._run_test('-h') - def test_secured_webserver_ciphers(self): return self._run_test('-e') +@pytest.mark.skipif(windows_check(), reason='windows cannot run .sh files') +@pytest.mark.skipif(not SECURITY_TESTS, reason='skipping security tests') @pytest.mark.security -class DaemonSecurityTestCase(BaseTestCase, DaemonBase, SecurityBaseTestCase): - - if windows_check(): - skip = 'windows cannot start_core not enough arguments for format string' - - def __init__(self, testname): - super(DaemonSecurityTestCase, self).__init__(testname) - DaemonBase.__init__(self) - SecurityBaseTestCase.__init__(self) - - def setUp(self): - skip = False - for not_http_test in ('breach', 'headers', 'ticketbleed'): - if not_http_test in self.id().split('.')[-1]: - self.skipTest(SecurityBaseTestCase.http_err) - skip = True - if not skip: - super(DaemonSecurityTestCase, self).setUp() - +class TestDaemonSecurity(BaseTestCase, DaemonBase, SecurityBaseTestCase): def set_up(self): d = self.common_set_up() self.port = self.listen_port @@ -161,12 +129,10 @@ class DaemonSecurityTestCase(BaseTestCase, DaemonBase, SecurityBaseTestCase): return d +@pytest.mark.skipif(windows_check(), reason='windows cannot run .sh files') +@pytest.mark.skipif(not SECURITY_TESTS, reason='skipping security tests') @pytest.mark.security -class WebUISecurityTestBase(WebServerTestBase, SecurityBaseTestCase): - def __init__(self, testname): - super(WebUISecurityTestBase, self).__init__(testname) - SecurityBaseTestCase.__init__(self) - +class TestWebUISecurity(WebServerTestBase, SecurityBaseTestCase): def start_webapi(self, arg): self.port = self.webserver_listen_port = 8999 @@ -182,3 +148,12 @@ class WebUISecurityTestBase(WebServerTestBase, SecurityBaseTestCase): self.deluge_web.web_api.hostlist.config['hosts'][0] = tuple(host) self.host_id = host[0] self.deluge_web.start() + + def test_secured_webserver_headers(self): + return self._run_test('-h') + + def test_secured_webserver_breach_vulnerability(self): + return self._run_test('-B') + + def test_secured_webserver_ticketbleed_vulnerability(self): + return self._run_test('-T') diff --git a/deluge/tests/test_sessionproxy.py b/deluge/tests/test_sessionproxy.py index 03f3cc27e..6fbbb248b 100644 --- a/deluge/tests/test_sessionproxy.py +++ b/deluge/tests/test_sessionproxy.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> # @@ -6,19 +5,16 @@ # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # - -from __future__ import unicode_literals - +import pytest_twisted from twisted.internet.defer import maybeDeferred, succeed from twisted.internet.task import Clock import deluge.component as component import deluge.ui.sessionproxy +from deluge.conftest import BaseTestCase -from .basetest import BaseTestCase - -class Core(object): +class Core: def __init__(self): self.reset() @@ -91,7 +87,7 @@ class Core(object): return succeed(ret) -class Client(object): +class Client: def __init__(self): self.core = Core() @@ -105,7 +101,7 @@ class Client(object): client = Client() -class SessionProxyTestCase(BaseTestCase): +class TestSessionProxy(BaseTestCase): def set_up(self): self.clock = Clock() self.patch(deluge.ui.sessionproxy, 'time', self.clock.seconds) @@ -127,38 +123,38 @@ class SessionProxyTestCase(BaseTestCase): return component.deregister(self.sp) def test_startup(self): - self.assertEqual(client.core.torrents['a'], self.sp.torrents['a'][1]) + assert client.core.torrents['a'] == self.sp.torrents['a'][1] - def test_get_torrent_status_no_change(self): - d = self.sp.get_torrent_status('a', []) - d.addCallback(self.assertEqual, client.core.torrents['a']) - return d + @pytest_twisted.ensureDeferred + async def test_get_torrent_status_no_change(self): + result = await self.sp.get_torrent_status('a', []) + assert result == client.core.torrents['a'] - def test_get_torrent_status_change_with_cache(self): + @pytest_twisted.ensureDeferred + async def test_get_torrent_status_change_with_cache(self): client.core.torrents['a']['key1'] = 2 - d = self.sp.get_torrent_status('a', ['key1']) - d.addCallback(self.assertEqual, {'key1': 1}) - return d + result = await self.sp.get_torrent_status('a', ['key1']) + assert result == {'key1': 1} - def test_get_torrent_status_change_without_cache(self): + @pytest_twisted.ensureDeferred + async def test_get_torrent_status_change_without_cache(self): client.core.torrents['a']['key1'] = 2 self.clock.advance(self.sp.cache_time + 0.1) - d = self.sp.get_torrent_status('a', []) - d.addCallback(self.assertEqual, client.core.torrents['a']) - return d + result = await self.sp.get_torrent_status('a', []) + assert result == client.core.torrents['a'] - def test_get_torrent_status_key_not_updated(self): + @pytest_twisted.ensureDeferred + async def test_get_torrent_status_key_not_updated(self): self.clock.advance(self.sp.cache_time + 0.1) self.sp.get_torrent_status('a', ['key1']) client.core.torrents['a']['key2'] = 99 - d = self.sp.get_torrent_status('a', ['key2']) - d.addCallback(self.assertEqual, {'key2': 99}) - return d + result = await self.sp.get_torrent_status('a', ['key2']) + assert result == {'key2': 99} - def test_get_torrents_status_key_not_updated(self): + @pytest_twisted.ensureDeferred + async def test_get_torrents_status_key_not_updated(self): self.clock.advance(self.sp.cache_time + 0.1) self.sp.get_torrents_status({'id': ['a']}, ['key1']) client.core.torrents['a']['key2'] = 99 - d = self.sp.get_torrents_status({'id': ['a']}, ['key2']) - d.addCallback(self.assertEqual, {'a': {'key2': 99}}) - return d + result = await self.sp.get_torrents_status({'id': ['a']}, ['key2']) + assert result == {'a': {'key2': 99}} diff --git a/deluge/tests/test_torrent.py b/deluge/tests/test_torrent.py index 5da817924..36adc0fde 100644 --- a/deluge/tests/test_torrent.py +++ b/deluge/tests/test_torrent.py @@ -1,41 +1,37 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # - -from __future__ import print_function, unicode_literals - +import itertools import os import time from base64 import b64encode +from unittest import mock -import mock +import pytest +import pytest_twisted from twisted.internet import defer, reactor from twisted.internet.task import deferLater -from twisted.trial import unittest import deluge.component as component import deluge.core.torrent import deluge.tests.common as common from deluge._libtorrent import lt -from deluge.common import VersionSplit, utf8_encode_structure, windows_check +from deluge.common import VersionSplit, utf8_encode_structure +from deluge.conftest import BaseTestCase from deluge.core.core import Core from deluge.core.rpcserver import RPCServer from deluge.core.torrent import Torrent from deluge.core.torrentmanager import TorrentManager, TorrentState -from .basetest import BaseTestCase - -class TorrentTestCase(BaseTestCase): +class TestTorrent(BaseTestCase): def setup_config(self): - config_dir = common.set_tmp_config_dir() core_config = deluge.config.Config( 'core.conf', defaults=deluge.core.preferencesmanager.DEFAULT_PREFS, - config_dir=config_dir, + config_dir=self.config_dir, ) core_config.save() @@ -67,7 +63,7 @@ class TorrentTestCase(BaseTestCase): def assert_state(self, torrent, state): torrent.update_state() - self.assertEqual(torrent.state, state) + assert torrent.state == state def get_torrent_atp(self, filename): filename = common.get_test_data_file(filename) @@ -85,25 +81,37 @@ class TorrentTestCase(BaseTestCase): } return atp - def test_set_file_priorities(self): + @pytest_twisted.ensureDeferred + async def test_set_file_priorities(self): + if getattr(lt, 'file_prio_alert', None): + # Libtorrent 2.0.3 and later has a file_prio_alert + prios_set = defer.Deferred() + prios_set.addTimeout(1.5, reactor) + component.get('AlertManager').register_handler( + 'file_prio_alert', lambda a: prios_set.callback(True) + ) + else: + # On older libtorrent, we just wait a while + prios_set = deferLater(reactor, 0.8) + atp = self.get_torrent_atp('dir_with_6_files.torrent') handle = self.session.add_torrent(atp) torrent = Torrent(handle, {}) result = torrent.get_file_priorities() - self.assertTrue(all(x == 4 for x in result)) + assert all(x == 4 for x in result) new_priorities = [3, 1, 2, 0, 5, 6, 7] torrent.set_file_priorities(new_priorities) - self.assertEqual(torrent.get_file_priorities(), new_priorities) + assert torrent.get_file_priorities() == new_priorities # Test with handle.piece_priorities as handle.file_priorities async # updates and will return old value. Also need to remove a priority # value as one file is much smaller than piece size so doesn't show. - time.sleep(0.6) # Delay to wait for alert from lt - piece_prio = handle.piece_priorities() + await prios_set # Delay to wait for alert from lt + piece_prio = handle.get_piece_priorities() result = all(p in piece_prio for p in [3, 2, 0, 5, 6, 7]) - self.assertTrue(result) + assert result def test_set_prioritize_first_last_pieces(self): piece_indexes = [ @@ -143,19 +151,19 @@ class TorrentTestCase(BaseTestCase): handle = self.session.add_torrent(atp) self.torrent = Torrent(handle, {}) - priorities_original = handle.piece_priorities() + priorities_original = handle.get_piece_priorities() self.torrent.set_prioritize_first_last_pieces(True) - priorities = handle.piece_priorities() + priorities = handle.get_piece_priorities() # The length of the list of new priorites is the same as the original - self.assertEqual(len(priorities_original), len(priorities)) + assert len(priorities_original) == len(priorities) # Test the priority of all the pieces against the calculated indexes. for idx, priority in enumerate(priorities): if idx in prioritized_piece_indexes: - self.assertEqual(priorities[idx], 7) + assert priorities[idx] == 7 else: - self.assertEqual(priorities[idx], 4) + assert priorities[idx] == 4 # self.print_priority_list(priorities) @@ -167,17 +175,15 @@ class TorrentTestCase(BaseTestCase): self.torrent.set_prioritize_first_last_pieces(True) # Reset pirorities self.torrent.set_prioritize_first_last_pieces(False) - priorities = handle.piece_priorities() + priorities = handle.get_piece_priorities() # Test the priority of the prioritized pieces for i in priorities: - self.assertEqual(priorities[i], 4) + assert priorities[i] == 4 # self.print_priority_list(priorities) def test_torrent_error_data_missing(self): - if windows_check(): - raise unittest.SkipTest('unexpected end of file in bencoded string') options = {'seed_mode': True} filename = common.get_test_data_file('test_torrent.file.torrent') with open(filename, 'rb') as _file: @@ -194,8 +200,6 @@ class TorrentTestCase(BaseTestCase): self.assert_state(torrent, 'Error') def test_torrent_error_resume_original_state(self): - if windows_check(): - raise unittest.SkipTest('unexpected end of file in bencoded string') options = {'seed_mode': True, 'add_paused': True} filename = common.get_test_data_file('test_torrent.file.torrent') with open(filename, 'rb') as _file: @@ -215,10 +219,8 @@ class TorrentTestCase(BaseTestCase): torrent.force_recheck() def test_torrent_error_resume_data_unaltered(self): - if windows_check(): - raise unittest.SkipTest('unexpected end of file in bencoded string') if VersionSplit(lt.__version__) >= VersionSplit('1.2.0.0'): - raise unittest.SkipTest('Test not working as expected on lt 1.2 or greater') + pytest.skip('Test not working as expected on lt 1.2 or greater') resume_data = { 'active_time': 13399, @@ -286,7 +288,7 @@ class TorrentTestCase(BaseTestCase): tm_resume_data = lt.bdecode( self.core.torrentmanager.resume_data[torrent.torrent_id] ) - self.assertEqual(tm_resume_data, resume_data) + assert tm_resume_data == resume_data return deferLater(reactor, 0.5, assert_resume_data) @@ -294,7 +296,7 @@ class TorrentTestCase(BaseTestCase): atp = self.get_torrent_atp('test_torrent.file.torrent') handle = self.session.add_torrent(atp) self.torrent = Torrent(handle, {}) - self.assertEqual(self.torrent.get_eta(), 0) + assert self.torrent.get_eta() == 0 self.torrent.status = mock.MagicMock() self.torrent.status.upload_payload_rate = 5000 @@ -304,18 +306,18 @@ class TorrentTestCase(BaseTestCase): self.torrent.is_finished = True self.torrent.options = {'stop_at_ratio': False} # Test finished and uploading but no stop_at_ratio set. - self.assertEqual(self.torrent.get_eta(), 0) + assert self.torrent.get_eta() == 0 self.torrent.options = {'stop_at_ratio': True, 'stop_ratio': 1.5} result = self.torrent.get_eta() - self.assertEqual(result, 2) - self.assertIsInstance(result, int) + assert result == 2 + assert isinstance(result, int) def test_get_eta_downloading(self): atp = self.get_torrent_atp('test_torrent.file.torrent') handle = self.session.add_torrent(atp) self.torrent = Torrent(handle, {}) - self.assertEqual(self.torrent.get_eta(), 0) + assert self.torrent.get_eta() == 0 self.torrent.status = mock.MagicMock() self.torrent.status.download_payload_rate = 50 @@ -323,15 +325,15 @@ class TorrentTestCase(BaseTestCase): self.torrent.status.total_wanted_done = 5000 result = self.torrent.get_eta() - self.assertEqual(result, 100) - self.assertIsInstance(result, int) + assert result == 100 + assert isinstance(result, int) def test_get_name_unicode(self): """Test retrieving a unicode torrent name from libtorrent.""" atp = self.get_torrent_atp('unicode_file.torrent') handle = self.session.add_torrent(atp) self.torrent = Torrent(handle, {}) - self.assertEqual(self.torrent.get_name(), 'সুকুমার রায়.txt') + assert self.torrent.get_name() == 'সুকুমার রায়.txt' def test_rename_unicode(self): """Test renaming file/folders with unicode filenames.""" @@ -342,15 +344,32 @@ class TorrentTestCase(BaseTestCase): TorrentManager.save_resume_data = mock.MagicMock result = self.torrent.rename_folder('unicode_filenames', 'Горбачёв') - self.assertIsInstance(result, defer.DeferredList) + assert isinstance(result, defer.DeferredList) result = self.torrent.rename_files([[0, 'new_рбачёв']]) - self.assertIsNone(result) + assert result is None def test_connect_peer_port(self): """Test to ensure port is int for libtorrent""" atp = self.get_torrent_atp('test_torrent.file.torrent') handle = self.session.add_torrent(atp) self.torrent = Torrent(handle, {}) - self.assertFalse(self.torrent.connect_peer('127.0.0.1', 'text')) - self.assertTrue(self.torrent.connect_peer('127.0.0.1', '1234')) + assert not self.torrent.connect_peer('127.0.0.1', 'text') + assert self.torrent.connect_peer('127.0.0.1', '1234') + + def test_status_cache(self): + atp = self.get_torrent_atp('test_torrent.file.torrent') + handle = self.session.add_torrent(atp) + mock_time = mock.Mock(return_value=time.time()) + with mock.patch('time.time', mock_time): + torrent = Torrent(handle, {}) + counter = itertools.count() + handle.status = mock.Mock(side_effect=counter.__next__) + first_status = torrent.get_lt_status() + assert first_status == 0, 'sanity check' + assert first_status == torrent.status, 'cached status should be used' + assert torrent.get_lt_status() == 1, 'status should update' + assert torrent.status == 1 + # Advance time and verify cache expires and updates + mock_time.return_value += 10 + assert torrent.status == 2 diff --git a/deluge/tests/test_torrentmanager.py b/deluge/tests/test_torrentmanager.py index e0ff09efc..0ead27230 100644 --- a/deluge/tests/test_torrentmanager.py +++ b/deluge/tests/test_torrentmanager.py @@ -1,38 +1,34 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - import os import shutil import warnings from base64 import b64encode +from unittest import mock -import mock import pytest -from twisted.internet import defer, task -from twisted.trial import unittest +import pytest_twisted +from twisted.internet import reactor, task from deluge import component -from deluge.common import windows_check +from deluge.bencode import bencode +from deluge.conftest import BaseTestCase from deluge.core.core import Core from deluge.core.rpcserver import RPCServer from deluge.error import InvalidTorrentError from . import common -from .basetest import BaseTestCase warnings.filterwarnings('ignore', category=RuntimeWarning) warnings.resetwarnings() -class TorrentmanagerTestCase(BaseTestCase): +class TestTorrentmanager(BaseTestCase): def set_up(self): - self.config_dir = common.set_tmp_config_dir() self.rpcserver = RPCServer(listen=False) self.core = Core() self.core.config.config['lsd'] = False @@ -48,7 +44,7 @@ class TorrentmanagerTestCase(BaseTestCase): return component.shutdown().addCallback(on_shutdown) - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_remove_torrent(self): filename = common.get_test_data_file('test.torrent') with open(filename, 'rb') as _file: @@ -56,9 +52,9 @@ class TorrentmanagerTestCase(BaseTestCase): torrent_id = yield self.core.add_torrent_file_async( filename, b64encode(filedump), {} ) - self.assertTrue(self.tm.remove(torrent_id, False)) + assert self.tm.remove(torrent_id, False) - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_remove_magnet(self): """Test remove magnet before received metadata and delete_copies is True""" magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' @@ -66,9 +62,10 @@ class TorrentmanagerTestCase(BaseTestCase): self.core.config.config['copy_torrent_file'] = True self.core.config.config['del_copy_torrent_file'] = True torrent_id = yield self.core.add_torrent_magnet(magnet, options) - self.assertTrue(self.tm.remove(torrent_id, False)) + assert self.tm.remove(torrent_id, False) - def test_prefetch_metadata(self): + @pytest_twisted.ensureDeferred + async def test_prefetch_metadata(self): from deluge._libtorrent import lt with open(common.get_test_data_file('test.torrent'), 'rb') as _file: @@ -81,47 +78,55 @@ class TorrentmanagerTestCase(BaseTestCase): magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' d = self.tm.prefetch_metadata(magnet, 30) - self.tm.on_alert_metadata_received(mock_alert) + # Make sure to use calllater, because the above prefetch call won't + # actually start running until we await it. + reactor.callLater(0, self.tm.on_alert_metadata_received, mock_alert) expected = ( 'ab570cdd5a17ea1b61e970bb72047de141bce173', - { - b'piece length': 32768, - b'sha1': ( - b'2\xce\xb6\xa8"\xd7\xf0\xd4\xbf\xdc^K\xba\x1bh' - b'\x9d\xc5\xb7\xac\xdd' - ), - b'name': b'azcvsupdater_2.6.2.jar', - b'private': 0, - b'pieces': ( - b"\xdb\x04B\x05\xc3'\xdab\xb8su97\xa9u" - b'\xca<w\\\x1ef\xd4\x9b\x16\xa9}\xc0\x9f:\xfd' - b'\x97qv\x83\xa2"\xef\x9d7\x0by!\rl\xe5v\xb7' - b'\x18{\xf7/"P\xe9\x8d\x01D\x9e8\xbd\x16\xe3' - b'\xfb-\x9d\xaa\xbcM\x11\xba\x92\xfc\x13F\xf0' - b'\x1c\x86x+\xc8\xd0S\xa9\x90`\xa1\xe4\x82\xe8' - b'\xfc\x08\xf7\xe3\xe5\xf6\x85\x1c%\xe7%\n\xed' - b'\xc0\x1f\xa1;\x9a\xea\xcf\x90\x0c/F>\xdf\xdagA' - b'\xc42|\xda\x82\xf5\xa6b\xa1\xb8#\x80wI\xd8f' - b'\xf8\xbd\xacW\xab\xc3s\xe0\xbbw\xf2K\xbe\xee' - b'\xa8rG\xe1W\xe8\xb7\xc2i\xf3\xd8\xaf\x9d\xdc' - b'\xd0#\xf4\xc1\x12u\xcd\x0bE?:\xe8\x9c\x1cu' - b'\xabb(oj\r^\xd5\xd5A\x83\x88\x9a\xa1J\x1c?' - b'\xa1\xd6\x8c\x83\x9e&' - ), - b'length': 307949, - b'name.utf-8': b'azcvsupdater_2.6.2.jar', - b'ed2k': b'>p\xefl\xfa]\x95K\x1b^\xc2\\;;e\xb7', - }, + b64encode( + bencode( + { + b'piece length': 32768, + b'sha1': ( + b'2\xce\xb6\xa8"\xd7\xf0\xd4\xbf\xdc^K\xba\x1bh' + b'\x9d\xc5\xb7\xac\xdd' + ), + b'name': b'azcvsupdater_2.6.2.jar', + b'private': 0, + b'pieces': ( + b"\xdb\x04B\x05\xc3'\xdab\xb8su97\xa9u" + b'\xca<w\\\x1ef\xd4\x9b\x16\xa9}\xc0\x9f:\xfd' + b'\x97qv\x83\xa2"\xef\x9d7\x0by!\rl\xe5v\xb7' + b'\x18{\xf7/"P\xe9\x8d\x01D\x9e8\xbd\x16\xe3' + b'\xfb-\x9d\xaa\xbcM\x11\xba\x92\xfc\x13F\xf0' + b'\x1c\x86x+\xc8\xd0S\xa9\x90`\xa1\xe4\x82\xe8' + b'\xfc\x08\xf7\xe3\xe5\xf6\x85\x1c%\xe7%\n\xed' + b'\xc0\x1f\xa1;\x9a\xea\xcf\x90\x0c/F>\xdf\xdagA' + b'\xc42|\xda\x82\xf5\xa6b\xa1\xb8#\x80wI\xd8f' + b'\xf8\xbd\xacW\xab\xc3s\xe0\xbbw\xf2K\xbe\xee' + b'\xa8rG\xe1W\xe8\xb7\xc2i\xf3\xd8\xaf\x9d\xdc' + b'\xd0#\xf4\xc1\x12u\xcd\x0bE?:\xe8\x9c\x1cu' + b'\xabb(oj\r^\xd5\xd5A\x83\x88\x9a\xa1J\x1c?' + b'\xa1\xd6\x8c\x83\x9e&' + ), + b'length': 307949, + b'name.utf-8': b'azcvsupdater_2.6.2.jar', + b'ed2k': b'>p\xefl\xfa]\x95K\x1b^\xc2\\;;e\xb7', + } + ) + ), ) - self.assertEqual(expected, self.successResultOf(d)) + assert expected == await d - def test_prefetch_metadata_timeout(self): + @pytest_twisted.ensureDeferred + async def test_prefetch_metadata_timeout(self): magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' d = self.tm.prefetch_metadata(magnet, 30) self.clock.advance(30) - expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', None) - return d.addCallback(self.assertEqual, expected) + result = await d + expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'') + assert result == expected @pytest.mark.todo def test_remove_torrent_false(self): @@ -129,20 +134,15 @@ class TorrentmanagerTestCase(BaseTestCase): common.todo_test(self) def test_remove_invalid_torrent(self): - self.assertRaises( - InvalidTorrentError, self.tm.remove, 'torrentidthatdoesntexist' - ) + with pytest.raises(InvalidTorrentError): + self.tm.remove('torrentidthatdoesntexist') - def test_open_state_from_python2(self): - """Open a Python2 state with a UTF-8 encoded torrent filename.""" + def test_open_state(self): + """Open a state with a UTF-8 encoded torrent filename.""" shutil.copy( common.get_test_data_file('utf8_filename_torrents.state'), os.path.join(self.config_dir, 'state', 'torrents.state'), ) - if windows_check(): - raise unittest.SkipTest( - 'Windows ModuleNotFoundError due to Linux line ending' - ) state = self.tm.open_state() - self.assertEqual(len(state.torrents), 1) + assert len(state.torrents) == 1 diff --git a/deluge/tests/test_torrentview.py b/deluge/tests/test_torrentview.py index 590760d1e..8d0568866 100644 --- a/deluge/tests/test_torrentview.py +++ b/deluge/tests/test_torrentview.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2014 Bro <bro.development@gmail.com> # Copyright (C) 2014 Calum Lind <calumlind@gmail.com> @@ -8,18 +7,13 @@ # See LICENSE for more details. # -from __future__ import unicode_literals - import pytest -from twisted.trial import unittest import deluge.component as component from deluge.configmanager import ConfigManager +from deluge.conftest import BaseTestCase from deluge.i18n import setup_translation -from . import common -from .basetest import BaseTestCase - # Allow running other tests without GTKUI dependencies available try: # pylint: disable=ungrouped-imports @@ -40,7 +34,7 @@ setup_translation() @pytest.mark.gtkui -class TorrentviewTestCase(BaseTestCase): +class TestTorrentview(BaseTestCase): default_column_index = [ 'filter', @@ -66,6 +60,7 @@ class TorrentviewTestCase(BaseTestCase): 'Added', 'Completed', 'Complete Seen', + 'Last Transfer', 'Tracker', 'Download Folder', 'Owner', @@ -99,6 +94,7 @@ class TorrentviewTestCase(BaseTestCase): int, int, int, + int, str, str, # Tracker str, @@ -108,9 +104,8 @@ class TorrentviewTestCase(BaseTestCase): def set_up(self): if libs_available is False: - raise unittest.SkipTest('GTKUI dependencies not available') + pytest.skip('GTKUI dependencies not available') - common.set_tmp_config_dir() # MainWindow loads this config file, so lets make sure it contains the defaults ConfigManager('gtk3ui.conf', defaults=DEFAULT_PREFS) self.mainwindow = MainWindow() @@ -122,36 +117,23 @@ class TorrentviewTestCase(BaseTestCase): return component.shutdown() def test_torrentview_columns(self): - - self.assertEqual( - self.torrentview.column_index, TorrentviewTestCase.default_column_index - ) - self.assertEqual( - self.torrentview.liststore_columns, - TorrentviewTestCase.default_liststore_columns, - ) - self.assertEqual( - self.torrentview.columns['Download Folder'].column_indices, [29] - ) + assert self.torrentview.column_index == self.default_column_index + assert self.torrentview.liststore_columns == self.default_liststore_columns + assert self.torrentview.columns['Download Folder'].column_indices == [30] def test_add_column(self): - # Add a text column test_col = 'Test column' self.torrentview.add_text_column(test_col, status_field=['label']) - self.assertEqual( - len(self.torrentview.liststore_columns), - len(TorrentviewTestCase.default_liststore_columns) + 1, - ) - self.assertEqual( - len(self.torrentview.column_index), - len(TorrentviewTestCase.default_column_index) + 1, + assert ( + len(self.torrentview.liststore_columns) + == len(self.default_liststore_columns) + 1 ) - self.assertEqual(self.torrentview.column_index[-1], test_col) - self.assertEqual(self.torrentview.columns[test_col].column_indices, [32]) + assert len(self.torrentview.column_index) == len(self.default_column_index) + 1 + assert self.torrentview.column_index[-1] == test_col + assert self.torrentview.columns[test_col].column_indices == [33] def test_add_columns(self): - # Add a text column test_col = 'Test column' self.torrentview.add_text_column(test_col, status_field=['label']) @@ -160,50 +142,35 @@ class TorrentviewTestCase(BaseTestCase): test_col2 = 'Test column2' self.torrentview.add_text_column(test_col2, status_field=['label2']) - self.assertEqual( - len(self.torrentview.liststore_columns), - len(TorrentviewTestCase.default_liststore_columns) + 2, - ) - self.assertEqual( - len(self.torrentview.column_index), - len(TorrentviewTestCase.default_column_index) + 2, + assert ( + len(self.torrentview.liststore_columns) + == len(self.default_liststore_columns) + 2 ) + assert len(self.torrentview.column_index) == len(self.default_column_index) + 2 # test_col - self.assertEqual(self.torrentview.column_index[-2], test_col) - self.assertEqual(self.torrentview.columns[test_col].column_indices, [32]) + assert self.torrentview.column_index[-2] == test_col + assert self.torrentview.columns[test_col].column_indices == [33] # test_col2 - self.assertEqual(self.torrentview.column_index[-1], test_col2) - self.assertEqual(self.torrentview.columns[test_col2].column_indices, [33]) + assert self.torrentview.column_index[-1] == test_col2 + assert self.torrentview.columns[test_col2].column_indices == [34] def test_remove_column(self): - # Add and remove text column test_col = 'Test column' self.torrentview.add_text_column(test_col, status_field=['label']) self.torrentview.remove_column(test_col) - self.assertEqual( - len(self.torrentview.liststore_columns), - len(TorrentviewTestCase.default_liststore_columns), - ) - self.assertEqual( - len(self.torrentview.column_index), - len(TorrentviewTestCase.default_column_index), - ) - self.assertEqual( - self.torrentview.column_index[-1], - TorrentviewTestCase.default_column_index[-1], - ) - self.assertEqual( - self.torrentview.columns[ - TorrentviewTestCase.default_column_index[-1] - ].column_indices, - [31], + assert len(self.torrentview.liststore_columns) == len( + self.default_liststore_columns ) + assert len(self.torrentview.column_index) == len(self.default_column_index) + assert self.torrentview.column_index[-1] == self.default_column_index[-1] + assert self.torrentview.columns[ + self.default_column_index[-1] + ].column_indices == [32] def test_remove_columns(self): - # Add two columns test_col = 'Test column' self.torrentview.add_text_column(test_col, status_field=['label']) @@ -212,74 +179,47 @@ class TorrentviewTestCase(BaseTestCase): # Remove test_col self.torrentview.remove_column(test_col) - self.assertEqual( - len(self.torrentview.liststore_columns), - len(TorrentviewTestCase.default_liststore_columns) + 1, + assert ( + len(self.torrentview.liststore_columns) + == len(self.default_liststore_columns) + 1 ) - self.assertEqual( - len(self.torrentview.column_index), - len(TorrentviewTestCase.default_column_index) + 1, - ) - self.assertEqual(self.torrentview.column_index[-1], test_col2) - self.assertEqual(self.torrentview.columns[test_col2].column_indices, [32]) + assert len(self.torrentview.column_index) == len(self.default_column_index) + 1 + assert self.torrentview.column_index[-1] == test_col2 + assert self.torrentview.columns[test_col2].column_indices == [33] # Remove test_col2 self.torrentview.remove_column(test_col2) - self.assertEqual( - len(self.torrentview.liststore_columns), - len(TorrentviewTestCase.default_liststore_columns), - ) - self.assertEqual( - len(self.torrentview.column_index), - len(TorrentviewTestCase.default_column_index), - ) - self.assertEqual( - self.torrentview.column_index[-1], - TorrentviewTestCase.default_column_index[-1], - ) - self.assertEqual( - self.torrentview.columns[ - TorrentviewTestCase.default_column_index[-1] - ].column_indices, - [31], + assert len(self.torrentview.liststore_columns) == len( + self.default_liststore_columns ) + assert len(self.torrentview.column_index) == len(self.default_column_index) + assert self.torrentview.column_index[-1] == self.default_column_index[-1] + assert self.torrentview.columns[ + self.default_column_index[-1] + ].column_indices == [32] def test_add_remove_column_multiple_types(self): - # Add a column with multiple column types test_col3 = 'Test column3' self.torrentview.add_progress_column( test_col3, status_field=['progress', 'label3'], col_types=[float, str] ) - self.assertEqual( - len(self.torrentview.liststore_columns), - len(TorrentviewTestCase.default_liststore_columns) + 2, - ) - self.assertEqual( - len(self.torrentview.column_index), - len(TorrentviewTestCase.default_column_index) + 1, + assert ( + len(self.torrentview.liststore_columns) + == len(self.default_liststore_columns) + 2 ) - self.assertEqual(self.torrentview.column_index[-1], test_col3) - self.assertEqual(self.torrentview.columns[test_col3].column_indices, [32, 33]) + assert len(self.torrentview.column_index) == len(self.default_column_index) + 1 + assert self.torrentview.column_index[-1] == test_col3 + assert self.torrentview.columns[test_col3].column_indices == [33, 34] # Remove multiple column-types column self.torrentview.remove_column(test_col3) - self.assertEqual( - len(self.torrentview.liststore_columns), - len(TorrentviewTestCase.default_liststore_columns), - ) - self.assertEqual( - len(self.torrentview.column_index), - len(TorrentviewTestCase.default_column_index), - ) - self.assertEqual( - self.torrentview.column_index[-1], - TorrentviewTestCase.default_column_index[-1], - ) - self.assertEqual( - self.torrentview.columns[ - TorrentviewTestCase.default_column_index[-1] - ].column_indices, - [31], + assert len(self.torrentview.liststore_columns) == len( + self.default_liststore_columns ) + assert len(self.torrentview.column_index) == len(self.default_column_index) + assert self.torrentview.column_index[-1] == self.default_column_index[-1] + assert self.torrentview.columns[ + self.default_column_index[-1] + ].column_indices == [32] diff --git a/deluge/tests/test_tracker_icons.py b/deluge/tests/test_tracker_icons.py index e18d33987..2f793d12e 100644 --- a/deluge/tests/test_tracker_icons.py +++ b/deluge/tests/test_tracker_icons.py @@ -1,34 +1,25 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # - -from __future__ import unicode_literals +import os.path import pytest -from twisted.trial.unittest import SkipTest +import pytest_twisted import deluge.component as component import deluge.ui.tracker_icons -from deluge.common import windows_check +from deluge.conftest import BaseTestCase from deluge.ui.tracker_icons import TrackerIcon, TrackerIcons from . import common -from .basetest import BaseTestCase -common.set_tmp_config_dir() -deluge.ui.tracker_icons.PIL_INSTALLED = False common.disable_new_release_check() @pytest.mark.internet -class TrackerIconsTestCase(BaseTestCase): - - if windows_check(): - skip = 'cannot use os.path.samefile to compair on windows(unix only)' - +class TestTrackerIcons(BaseTestCase): def set_up(self): # Disable resizing with Pillow for consistency. self.patch(deluge.ui.tracker_icons, 'Image', None) @@ -37,41 +28,52 @@ class TrackerIconsTestCase(BaseTestCase): def tear_down(self): return component.shutdown() - def test_get_deluge_png(self): + @pytest_twisted.ensureDeferred + async def test_get_deluge_png(self, mock_mkstemp): # Deluge has a png favicon link icon = TrackerIcon(common.get_test_data_file('deluge.png')) - d = self.icons.fetch('deluge-torrent.org') - d.addCallback(self.assertNotIdentical, None) - d.addCallback(self.assertEqual, icon) - return d + result = await self.icons.fetch('deluge-torrent.org') + assert result == icon + assert not os.path.isfile(mock_mkstemp[1]) - def test_get_google_ico(self): + @pytest_twisted.ensureDeferred + async def test_get_google_ico(self): # Google doesn't have any icon links # So instead we'll grab its favicon.ico icon = TrackerIcon(common.get_test_data_file('google.ico')) - d = self.icons.fetch('www.google.com') - d.addCallback(self.assertNotIdentical, None) - d.addCallback(self.assertEqual, icon) - return d + result = await self.icons.fetch('www.google.com') + assert result == icon - def test_get_google_ico_with_redirect(self): + @pytest_twisted.ensureDeferred + async def test_get_google_ico_hebrew(self): + """Test that Google.co.il page is read as UTF-8""" + icon = TrackerIcon(common.get_test_data_file('google.ico')) + result = await self.icons.fetch('www.google.co.il') + assert result == icon + + @pytest_twisted.ensureDeferred + async def test_get_google_ico_with_redirect(self): # google.com redirects to www.google.com icon = TrackerIcon(common.get_test_data_file('google.ico')) - d = self.icons.fetch('google.com') - d.addCallback(self.assertNotIdentical, None) - d.addCallback(self.assertEqual, icon) - return d + result = await self.icons.fetch('google.com') + assert result == icon - def test_get_seo_ico_with_sni(self): + @pytest.mark.skip(reason='Site removed favicon, new SNI test will be needed') + @pytest_twisted.ensureDeferred + async def test_get_seo_svg_with_sni(self): # seo using certificates with SNI support only - raise SkipTest('Site certificate expired') - icon = TrackerIcon(common.get_test_data_file('seo.ico')) - d = self.icons.fetch('www.seo.com') - d.addCallback(self.assertNotIdentical, None) - d.addCallback(self.assertEqual, icon) - return d + icon = TrackerIcon(common.get_test_data_file('seo.svg')) + result = await self.icons.fetch('www.seo.com') + assert result == icon + + @pytest_twisted.ensureDeferred + async def test_get_empty_string_tracker(self): + result = await self.icons.fetch('') + assert result is None - def test_get_empty_string_tracker(self): - d = self.icons.fetch('') - d.addCallback(self.assertIdentical, None) - return d + @pytest_twisted.ensureDeferred + async def test_invalid_host(self, mock_mkstemp): + """Test that TrackerIcon can handle invalid hostname""" + result = await self.icons.fetch('deluge.example.com') + assert not result + assert not os.path.isfile(mock_mkstemp[1]) diff --git a/deluge/tests/test_transfer.py b/deluge/tests/test_transfer.py index a04830325..92e349b5d 100644 --- a/deluge/tests/test_transfer.py +++ b/deluge/tests/test_transfer.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2012 Bro <bro.development@gmail.com> # @@ -7,12 +6,10 @@ # See LICENSE for more details. # -from __future__ import print_function, unicode_literals - import base64 +import pytest import rencode -from twisted.trial import unittest import deluge.log from deluge.transfer import DelugeTransferProtocol @@ -112,8 +109,9 @@ class TransferTestClass(DelugeTransferProtocol): self.message_received(request) -class DelugeTransferProtocolTestCase(unittest.TestCase): - def setUp(self): # NOQA: N803 +class TestDelugeTransferProtocol: + @pytest.fixture(autouse=True) + def set_up(self): """ The expected messages corresponds to the test messages (msg1, msg2) after they've been processed by DelugeTransferProtocol.send, which means that they've first been encoded with rencode, @@ -160,7 +158,7 @@ class DelugeTransferProtocolTestCase(unittest.TestCase): # Get the data as sent by DelugeTransferProtocol messages = self.transfer.get_messages_out_joined() base64_encoded = base64.b64encode(messages) - self.assertEqual(base64_encoded, self.msg1_expected_compressed_base64) + assert base64_encoded == self.msg1_expected_compressed_base64 def test_receive_one_message(self): """ @@ -173,7 +171,7 @@ class DelugeTransferProtocolTestCase(unittest.TestCase): ) # Get the data as sent by DelugeTransferProtocol messages = self.transfer.get_messages_in().pop(0) - self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(messages)) + assert rencode.dumps(self.msg1) == rencode.dumps(messages) def test_receive_old_message(self): """ @@ -181,9 +179,9 @@ class DelugeTransferProtocolTestCase(unittest.TestCase): """ self.transfer.dataReceived(rencode.dumps(self.msg1)) - self.assertEqual(len(self.transfer.get_messages_in()), 0) - self.assertEqual(self.transfer._message_length, 0) - self.assertEqual(len(self.transfer._buffer), 0) + assert len(self.transfer.get_messages_in()) == 0 + assert self.transfer._message_length == 0 + assert len(self.transfer._buffer) == 0 def test_receive_two_concatenated_messages(self): """ @@ -198,9 +196,9 @@ class DelugeTransferProtocolTestCase(unittest.TestCase): # Get the data as sent by DelugeTransferProtocol message1 = self.transfer.get_messages_in().pop(0) - self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1)) + assert rencode.dumps(self.msg1) == rencode.dumps(message1) message2 = self.transfer.get_messages_in().pop(0) - self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2)) + assert rencode.dumps(self.msg2) == rencode.dumps(message2) def test_receive_three_messages_in_parts(self): """ @@ -237,20 +235,17 @@ class DelugeTransferProtocolTestCase(unittest.TestCase): else: expected_msgs_received_count = 0 # Verify that the expected number of complete messages has arrived - self.assertEqual( - expected_msgs_received_count, len(self.transfer.get_messages_in()) - ) + assert expected_msgs_received_count == len(self.transfer.get_messages_in()) # Get the data as received by DelugeTransferProtocol message1 = self.transfer.get_messages_in().pop(0) - self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1)) + assert rencode.dumps(self.msg1) == rencode.dumps(message1) message2 = self.transfer.get_messages_in().pop(0) - self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2)) + assert rencode.dumps(self.msg2) == rencode.dumps(message2) message3 = self.transfer.get_messages_in().pop(0) - self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message3)) + assert rencode.dumps(self.msg1) == rencode.dumps(message3) # Remove underscore to enable test, or run the test directly: - # tests $ trial test_transfer.DelugeTransferProtocolTestCase._test_rencode_fail_protocol def _test_rencode_fail_protocol(self): """ This test tries to test the protocol that relies on errors from rencode. @@ -317,11 +312,11 @@ class DelugeTransferProtocolTestCase(unittest.TestCase): # Get the data as received by DelugeTransferProtocol message1 = self.transfer.get_messages_in().pop(0) - self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1)) + assert rencode.dumps(self.msg1) == rencode.dumps(message1) message2 = self.transfer.get_messages_in().pop(0) - self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2)) + assert rencode.dumps(self.msg2) == rencode.dumps(message2) message3 = self.transfer.get_messages_in().pop(0) - self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message3)) + assert rencode.dumps(self.msg1) == rencode.dumps(message3) def test_receive_middle_of_header(self): """ @@ -344,19 +339,19 @@ class DelugeTransferProtocolTestCase(unittest.TestCase): self.transfer.dataReceived(two_concatenated[: first_len + 2]) # Should be 1 message in the list - self.assertEqual(1, len(self.transfer.get_messages_in())) + assert 1 == len(self.transfer.get_messages_in()) # Send the rest self.transfer.dataReceived(two_concatenated[first_len + 2 :]) # Should be 2 messages in the list - self.assertEqual(2, len(self.transfer.get_messages_in())) + assert 2 == len(self.transfer.get_messages_in()) # Get the data as sent by DelugeTransferProtocol message1 = self.transfer.get_messages_in().pop(0) - self.assertEqual(rencode.dumps(self.msg1), rencode.dumps(message1)) + assert rencode.dumps(self.msg1) == rencode.dumps(message1) message2 = self.transfer.get_messages_in().pop(0) - self.assertEqual(rencode.dumps(self.msg2), rencode.dumps(message2)) + assert rencode.dumps(self.msg2) == rencode.dumps(message2) # Needs file containing big data structure e.g. like thetorrent list as it is transfered by the daemon # def test_simulate_big_transfer(self): diff --git a/deluge/tests/test_ui_common.py b/deluge/tests/test_ui_common.py index b0c311183..ee97259de 100644 --- a/deluge/tests/test_ui_common.py +++ b/deluge/tests/test_ui_common.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> # @@ -6,30 +5,19 @@ # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals -from six import assertCountEqual -from twisted.trial import unittest - -from deluge.common import windows_check from deluge.ui.common import TorrentInfo from . import common -class UICommonTestCase(unittest.TestCase): - def setUp(self): # NOQA: N803 - pass - - def tearDown(self): # NOQA: N803 - pass - +class TestUICommon: def test_hash_optional_single_file(self): """Ensure single file with `ed2k` and `sha1` keys are not in filetree output.""" filename = common.get_test_data_file('test.torrent') files_tree = {'azcvsupdater_2.6.2.jar': (0, 307949, True)} ti = TorrentInfo(filename, filetree=1) - self.assertEqual(ti.files_tree, files_tree) + assert ti.files_tree == files_tree files_tree2 = { 'contents': { @@ -42,7 +30,7 @@ class UICommonTestCase(unittest.TestCase): } } ti = TorrentInfo(filename, filetree=2) - self.assertEqual(ti.files_tree, files_tree2) + assert ti.files_tree == files_tree2 def test_hash_optional_multi_file(self): """Ensure multi-file with `filehash` and `ed2k` are keys not in filetree output.""" @@ -54,7 +42,7 @@ class UICommonTestCase(unittest.TestCase): } } ti = TorrentInfo(filename, filetree=1) - self.assertEqual(ti.files_tree, files_tree) + assert ti.files_tree == files_tree filestree2 = { 'contents': { @@ -83,14 +71,14 @@ class UICommonTestCase(unittest.TestCase): 'type': 'dir', } ti = TorrentInfo(filename, filetree=2) - self.assertEqual(ti.files_tree, filestree2) + assert ti.files_tree == filestree2 def test_hash_optional_md5sum(self): # Ensure `md5sum` key is not included in filetree output filename = common.get_test_data_file('md5sum.torrent') files_tree = {'test': {'lol': (0, 4, True), 'rofl': (1, 5, True)}} ti = TorrentInfo(filename, filetree=1) - self.assertEqual(ti.files_tree, files_tree) + assert ti.files_tree == files_tree ti = TorrentInfo(filename, filetree=2) files_tree2 = { 'contents': { @@ -118,16 +106,14 @@ class UICommonTestCase(unittest.TestCase): }, 'type': 'dir', } - self.assertEqual(ti.files_tree, files_tree2) + assert ti.files_tree == files_tree2 def test_utf8_encoded_paths(self): filename = common.get_test_data_file('test.torrent') ti = TorrentInfo(filename) - self.assertTrue('azcvsupdater_2.6.2.jar' in ti.files_tree) + assert 'azcvsupdater_2.6.2.jar' in ti.files_tree def test_utf8_encoded_paths2(self): - if windows_check(): - raise unittest.SkipTest('on windows KeyError: unicode_filenames') filename = common.get_test_data_file('unicode_filenames.torrent') filepath1 = '\u30c6\u30af\u30b9\u30fb\u30c6\u30af\u30b5\u30f3.mkv' filepath2 = ( @@ -140,11 +126,11 @@ class UICommonTestCase(unittest.TestCase): ti = TorrentInfo(filename) files_tree = ti.files_tree['unicode_filenames'] - self.assertIn(filepath1, files_tree) - self.assertIn(filepath2, files_tree) - self.assertIn(filepath3, files_tree) - self.assertIn(filepath4, files_tree) - self.assertIn(filepath5, files_tree) + assert filepath1 in files_tree + assert filepath2 in files_tree + assert filepath3 in files_tree + assert filepath4 in files_tree + assert filepath5 in files_tree result_files = [ { @@ -170,4 +156,4 @@ class UICommonTestCase(unittest.TestCase): {'download': True, 'path': 'unicode_filenames/' + filepath1, 'size': 1771}, ] - assertCountEqual(self, ti.files, result_files) + assert len(ti.files) == len(result_files) diff --git a/deluge/tests/test_ui_console.py b/deluge/tests/test_ui_console.py index 3667c608e..34398ee19 100644 --- a/deluge/tests/test_ui_console.py +++ b/deluge/tests/test_ui_console.py @@ -1,35 +1,30 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - import argparse +import pytest + from deluge.ui.console.cmdline.commands.add import Command from deluge.ui.console.cmdline.commands.config import json_eval from deluge.ui.console.widgets.fields import TextInput -from .basetest import BaseTestCase - -class MockParent(object): +class MockParent: def __init__(self): self.border_off_x = 1 self.pane_width = 20 self.encoding = 'utf8' -class UIConsoleFieldTestCase(BaseTestCase): - def setUp(self): # NOQA: N803 +class TestUIConsoleField: + @pytest.fixture(autouse=True) + def set_up(self): self.parent = MockParent() - def tearDown(self): # NOQA: N803 - pass - def test_text_input(self): def move_func(self, r, c): self._cursor_row = r @@ -44,48 +39,42 @@ class UIConsoleFieldTestCase(BaseTestCase): '/text/field/file/path', complete=False, ) - self.assertTrue(t) - self.assertTrue(t.handle_read(33)) - - -class UIConsoleCommandsTestCase(BaseTestCase): - def setUp(self): - pass + assert t + assert t.handle_read(33) - def tearDown(self): - pass +class TestUIConsoleCommands: def test_add_move_completed(self): completed_path = 'completed_path' parser = argparse.ArgumentParser() cmd = Command() cmd.add_arguments(parser) args = parser.parse_args(['torrent', '-m', completed_path]) - self.assertEqual(args.move_completed_path, completed_path) + assert args.move_completed_path == completed_path args = parser.parse_args(['torrent', '--move-path', completed_path]) - self.assertEqual(args.move_completed_path, completed_path) + assert args.move_completed_path == completed_path def test_config_json_eval(self): - self.assertEqual(json_eval('/downloads'), '/downloads') - self.assertEqual(json_eval('/dir/with space'), '/dir/with space') - self.assertEqual(json_eval('c:\\\\downloads'), 'c:\\\\downloads') - self.assertEqual(json_eval('c:/downloads'), 'c:/downloads') + assert json_eval('/downloads') == '/downloads' + assert json_eval('/dir/with space') == '/dir/with space' + assert json_eval('c:\\\\downloads') == 'c:\\\\downloads' + assert json_eval('c:/downloads') == 'c:/downloads' # Ensure newlines are split and only first setting is used. - self.assertEqual(json_eval('setting\nwithneline'), 'setting') + assert json_eval('setting\nwithneline') == 'setting' # Allow both parentheses and square brackets. - self.assertEqual(json_eval('(8000, 8001)'), [8000, 8001]) - self.assertEqual(json_eval('[8000, 8001]'), [8000, 8001]) - self.assertEqual(json_eval('["abc", "def"]'), ['abc', 'def']) - self.assertEqual(json_eval('{"foo": "bar"}'), {'foo': 'bar'}) - self.assertEqual(json_eval('{"number": 1234}'), {'number': 1234}) + assert json_eval('(8000, 8001)') == [8000, 8001] + assert json_eval('[8000, 8001]') == [8000, 8001] + assert json_eval('["abc", "def"]') == ['abc', 'def'] + assert json_eval('{"foo": "bar"}') == {'foo': 'bar'} + assert json_eval('{"number": 1234}') == {'number': 1234} # Hex string for peer_tos. - self.assertEqual(json_eval('0x00'), '0x00') - self.assertEqual(json_eval('1000'), 1000) - self.assertEqual(json_eval('-6'), -6) - self.assertEqual(json_eval('10.5'), 10.5) - self.assertEqual(json_eval('True'), True) - self.assertEqual(json_eval('false'), False) - self.assertEqual(json_eval('none'), None) + assert json_eval('0x00') == '0x00' + assert json_eval('1000') == 1000 + assert json_eval('-6') == -6 + assert json_eval('10.5') == 10.5 + assert json_eval('True') + assert not json_eval('false') + assert json_eval('none') is None # Empty values to clear config key. - self.assertEqual(json_eval('[]'), []) - self.assertEqual(json_eval(''), '') + assert json_eval('[]') == [] + assert json_eval('') == '' diff --git a/deluge/tests/test_ui_entry.py b/deluge/tests/test_ui_entry.py index f85bc7d7d..0546ad7b8 100644 --- a/deluge/tests/test_ui_entry.py +++ b/deluge/tests/test_ui_entry.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> # @@ -7,14 +6,13 @@ # See LICENSE for more details. # -from __future__ import print_function, unicode_literals - import argparse import sys from io import StringIO +from unittest import mock -import mock import pytest +import pytest_twisted from twisted.internet import defer import deluge @@ -23,12 +21,12 @@ import deluge.ui.console import deluge.ui.console.cmdline.commands.quit import deluge.ui.console.main import deluge.ui.web.server -from deluge.common import PY2, get_localhost_auth, windows_check +from deluge.common import get_localhost_auth, windows_check +from deluge.conftest import BaseTestCase from deluge.ui import ui_entry from deluge.ui.web.server import DelugeWeb from . import common -from .basetest import BaseTestCase from .daemon_base import DaemonBase DEBUG_COMMAND = False @@ -41,7 +39,7 @@ sys_stdout = sys.stdout # To print to terminal from the tests, use: print('Message...', file=sys_stdout) -class StringFileDescriptor(object): +class StringFileDescriptor: """File descriptor that writes to string buffer""" def __init__(self, fd): @@ -51,22 +49,15 @@ class StringFileDescriptor(object): setattr(self, a, getattr(sys_stdout, a)) def write(self, *data, **kwargs): - # io.StringIO requires unicode strings. data_string = str(*data) - if PY2: - data_string = data_string.decode() print(data_string, file=self.out, end='') def flush(self): self.out.flush() -class UIBaseTestCase(object): - def __init__(self): - self.var = {} - +class UIBaseTestCase: def set_up(self): - common.set_tmp_config_dir() common.setup_test_logger(level='info', prefix=self.id()) return component.start() @@ -82,28 +73,14 @@ class UIBaseTestCase(object): class UIWithDaemonBaseTestCase(UIBaseTestCase, DaemonBase): """Subclass for test that require a deluged daemon""" - def __init__(self): - UIBaseTestCase.__init__(self) - def set_up(self): d = self.common_set_up() common.setup_test_logger(level='info', prefix=self.id()) - d.addCallback(self.start_core) - return d - - def tear_down(self): - d = UIBaseTestCase.tear_down(self) - d.addCallback(self.terminate_core) return d -class DelugeEntryTestCase(BaseTestCase): - - if windows_check(): - skip = 'Console ui test on Windows broken due to sys args issue' - +class TestDelugeEntry(BaseTestCase): def set_up(self): - common.set_tmp_config_dir() return component.start() def tear_down(self): @@ -119,10 +96,11 @@ class DelugeEntryTestCase(BaseTestCase): self.patch(argparse._sys, 'stdout', fd) with mock.patch('deluge.ui.console.main.ConsoleUI'): - self.assertRaises(SystemExit, ui_entry.start_ui) - self.assertTrue('usage: deluge' in fd.out.getvalue()) - self.assertTrue('UI Options:' in fd.out.getvalue()) - self.assertTrue('* console' in fd.out.getvalue()) + with pytest.raises(SystemExit): + ui_entry.start_ui() + assert 'usage: deluge' in fd.out.getvalue() + assert 'UI Options:' in fd.out.getvalue() + assert '* console' in fd.out.getvalue() def test_start_default(self): self.patch(sys, 'argv', ['./deluge']) @@ -157,7 +135,7 @@ class DelugeEntryTestCase(BaseTestCase): # Just test that no exception is raised ui_entry.start_ui() - self.assertEqual(_level[0], 'info') + assert _level[0] == 'info' class GtkUIBaseTestCase(UIBaseTestCase): @@ -173,38 +151,27 @@ class GtkUIBaseTestCase(UIBaseTestCase): @pytest.mark.gtkui -class GtkUIDelugeScriptEntryTestCase(BaseTestCase, GtkUIBaseTestCase): - def __init__(self, testname): - super(GtkUIDelugeScriptEntryTestCase, self).__init__(testname) - GtkUIBaseTestCase.__init__(self) - - self.var['cmd_name'] = 'deluge gtk' - self.var['start_cmd'] = ui_entry.start_ui - self.var['sys_arg_cmd'] = ['./deluge', 'gtk'] - - def set_up(self): - return GtkUIBaseTestCase.set_up(self) - - def tear_down(self): - return GtkUIBaseTestCase.tear_down(self) +class TestGtkUIDelugeScriptEntry(BaseTestCase, GtkUIBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + request.cls.var = { + 'cmd_name': 'deluge gtk', + 'start_cmd': ui_entry.start_ui, + 'sys_arg_cmd': ['./deluge', 'gtk'], + } @pytest.mark.gtkui -class GtkUIScriptEntryTestCase(BaseTestCase, GtkUIBaseTestCase): - def __init__(self, testname): - super(GtkUIScriptEntryTestCase, self).__init__(testname) - GtkUIBaseTestCase.__init__(self) +class TestGtkUIScriptEntry(BaseTestCase, GtkUIBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): from deluge.ui import gtk3 - self.var['cmd_name'] = 'deluge-gtk' - self.var['start_cmd'] = gtk3.start - self.var['sys_arg_cmd'] = ['./deluge-gtk'] - - def set_up(self): - return GtkUIBaseTestCase.set_up(self) - - def tear_down(self): - return GtkUIBaseTestCase.tear_down(self) + request.cls.var = { + 'cmd_name': 'deluge-gtk', + 'start_cmd': gtk3.start, + 'sys_arg_cmd': ['./deluge-gtk'], + } class DelugeWebMock(DelugeWeb): @@ -242,45 +209,31 @@ class WebUIBaseTestCase(UIBaseTestCase): self.patch(deluge.ui.web.server, 'DelugeWeb', DelugeWebMock) self.exec_command() - self.assertEqual(_level[0], 'info') - - -class WebUIScriptEntryTestCase(BaseTestCase, WebUIBaseTestCase): - - if windows_check(): - skip = 'Console ui test on Windows broken due to sys args issue' - - def __init__(self, testname): - super(WebUIScriptEntryTestCase, self).__init__(testname) - WebUIBaseTestCase.__init__(self) - self.var['cmd_name'] = 'deluge-web' - self.var['start_cmd'] = deluge.ui.web.start - self.var['sys_arg_cmd'] = ['./deluge-web', '--do-not-daemonize'] - - def set_up(self): - return WebUIBaseTestCase.set_up(self) - - def tear_down(self): - return WebUIBaseTestCase.tear_down(self) - + assert _level[0] == 'info' -class WebUIDelugeScriptEntryTestCase(BaseTestCase, WebUIBaseTestCase): - if windows_check(): - skip = 'Console ui test on Windows broken due to sys args issue' +class TestWebUIScriptEntry(BaseTestCase, WebUIBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + request.cls.var = { + 'cmd_name': 'deluge-web', + 'start_cmd': deluge.ui.web.start, + 'sys_arg_cmd': ['./deluge-web'], + } + if not windows_check(): + request.cls.var['sys_arg_cmd'].append('--do-not-daemonize') - def __init__(self, testname): - super(WebUIDelugeScriptEntryTestCase, self).__init__(testname) - WebUIBaseTestCase.__init__(self) - self.var['cmd_name'] = 'deluge web' - self.var['start_cmd'] = ui_entry.start_ui - self.var['sys_arg_cmd'] = ['./deluge', 'web', '--do-not-daemonize'] - - def set_up(self): - return WebUIBaseTestCase.set_up(self) - def tear_down(self): - return WebUIBaseTestCase.tear_down(self) +class TestWebUIDelugeScriptEntry(BaseTestCase, WebUIBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + request.cls.var = { + 'cmd_name': 'deluge web', + 'start_cmd': ui_entry.start_ui, + 'sys_arg_cmd': ['./deluge', 'web'], + } + if not windows_check(): + request.cls.var['sys_arg_cmd'].append('--do-not-daemonize') class ConsoleUIBaseTestCase(UIBaseTestCase): @@ -291,7 +244,7 @@ class ConsoleUIBaseTestCase(UIBaseTestCase): with mock.patch('deluge.ui.console.main.ConsoleUI'): self.exec_command() - def test_start_console_with_log_level(self): + def test_start_console_with_log_level(self, request): _level = [] def setup_logger( @@ -314,7 +267,7 @@ class ConsoleUIBaseTestCase(UIBaseTestCase): # Just test that no exception is raised self.exec_command() - self.assertEqual(_level[0], 'info') + assert _level[0] == 'info' def test_console_help(self): self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['-h']) @@ -322,18 +275,19 @@ class ConsoleUIBaseTestCase(UIBaseTestCase): self.patch(argparse._sys, 'stdout', fd) with mock.patch('deluge.ui.console.main.ConsoleUI'): - self.assertRaises(SystemExit, self.exec_command) + with pytest.raises(SystemExit): + self.exec_command() std_output = fd.out.getvalue() - self.assertTrue( - ('usage: %s' % self.var['cmd_name']) in std_output - ) # Check command name - self.assertTrue('Common Options:' in std_output) - self.assertTrue('Console Options:' in std_output) - self.assertIn( - 'Console Commands:\n The following console commands are available:', - std_output, + assert ( + 'usage: %s' % self.var['cmd_name'] + ) in std_output # Check command name + assert 'Common Options:' in std_output + assert 'Console Options:' in std_output + assert ( + 'Console Commands:\n The following console commands are available:' + in std_output ) - self.assertIn('The following console commands are available:', std_output) + assert 'The following console commands are available:' in std_output def test_console_command_info(self): self.patch(sys, 'argv', self.var['sys_arg_cmd'] + ['info']) @@ -349,10 +303,11 @@ class ConsoleUIBaseTestCase(UIBaseTestCase): self.patch(argparse._sys, 'stdout', fd) with mock.patch('deluge.ui.console.main.ConsoleUI'): - self.assertRaises(SystemExit, self.exec_command) + with pytest.raises(SystemExit): + self.exec_command() std_output = fd.out.getvalue() - self.assertIn('usage: info', std_output) - self.assertIn('Show information about the torrents', std_output) + assert 'usage: info' in std_output + assert 'Show information about the torrents' in std_output def test_console_unrecognized_arguments(self): self.patch( @@ -361,8 +316,9 @@ class ConsoleUIBaseTestCase(UIBaseTestCase): fd = StringFileDescriptor(sys.stdout) self.patch(argparse._sys, 'stderr', fd) with mock.patch('deluge.ui.console.main.ConsoleUI'): - self.assertRaises(SystemExit, self.exec_command) - self.assertIn('unrecognized arguments: --ui', fd.out.getvalue()) + with pytest.raises(SystemExit): + self.exec_command() + assert 'unrecognized arguments: --ui' in fd.out.getvalue() class ConsoleUIWithDaemonBaseTestCase(UIWithDaemonBaseTestCase): @@ -390,26 +346,28 @@ class ConsoleUIWithDaemonBaseTestCase(UIWithDaemonBaseTestCase): + command, ) - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_console_command_add(self): filename = common.get_test_data_file('test.torrent') - self.patch_arg_command(['add ' + filename]) + self.patch_arg_command([f'add "{filename}"']) fd = StringFileDescriptor(sys.stdout) self.patch(sys, 'stdout', fd) yield self.exec_command() std_output = fd.out.getvalue() - self.assertEqual( - std_output, 'Attempting to add torrent: ' + filename + '\nTorrent added!\n' + assert ( + std_output + == 'Attempting to add torrent: ' + filename + '\nTorrent added!\n' ) - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_console_command_add_move_completed(self): filename = common.get_test_data_file('test.torrent') + tmp_path = 'c:\\tmp' if windows_check() else '/tmp' self.patch_arg_command( [ - 'add --move-path /tmp ' + filename + ' ; status' + f'add --move-path "{tmp_path}" "{filename}" ; status' ' ; manage' ' ab570cdd5a17ea1b61e970bb72047de141bce173' ' move_completed' @@ -422,22 +380,23 @@ class ConsoleUIWithDaemonBaseTestCase(UIWithDaemonBaseTestCase): yield self.exec_command() std_output = fd.out.getvalue() - self.assertTrue( - std_output.endswith('move_completed: True\nmove_completed_path: /tmp\n') - or std_output.endswith('move_completed_path: /tmp\nmove_completed: True\n') + assert std_output.endswith( + f'move_completed: True\nmove_completed_path: {tmp_path}\n' + ) or std_output.endswith( + f'move_completed_path: {tmp_path}\nmove_completed: True\n' ) - @defer.inlineCallbacks - def test_console_command_status(self): + @pytest_twisted.ensureDeferred + async def test_console_command_status(self): fd = StringFileDescriptor(sys.stdout) self.patch_arg_command(['status']) self.patch(sys, 'stdout', fd) - yield self.exec_command() + await self.exec_command() std_output = fd.out.getvalue() - self.assertTrue(std_output.startswith('Total upload: ')) - self.assertTrue(std_output.endswith(' Moving: 0\n')) + assert std_output.startswith('Total upload: ') + assert std_output.endswith(' Moving: 0\n') @defer.inlineCallbacks def test_console_command_config_set_download_location(self): @@ -447,79 +406,36 @@ class ConsoleUIWithDaemonBaseTestCase(UIWithDaemonBaseTestCase): yield self.exec_command() std_output = fd.out.getvalue() - self.assertTrue( - std_output.startswith( - 'Setting "download_location" to: {}\'/downloads\''.format( - 'u' if PY2 else '' - ) - ) - ) - self.assertTrue( - std_output.endswith('Configuration value successfully updated.\n') - ) - - -class ConsoleScriptEntryWithDaemonTestCase( - BaseTestCase, ConsoleUIWithDaemonBaseTestCase -): - - if windows_check(): - skip = 'Console ui test on Windows broken due to sys args issue' - - def __init__(self, testname): - super(ConsoleScriptEntryWithDaemonTestCase, self).__init__(testname) - ConsoleUIWithDaemonBaseTestCase.__init__(self) - self.var['cmd_name'] = 'deluge-console' - self.var['sys_arg_cmd'] = ['./deluge-console'] - - def set_up(self): - from deluge.ui.console.console import Console - - def start_console(): - return Console().start() - - self.patch(deluge.ui.console, 'start', start_console) - self.var['start_cmd'] = deluge.ui.console.start - - return ConsoleUIWithDaemonBaseTestCase.set_up(self) - - def tear_down(self): - return ConsoleUIWithDaemonBaseTestCase.tear_down(self) - - -class ConsoleScriptEntryTestCase(BaseTestCase, ConsoleUIBaseTestCase): - - if windows_check(): - skip = 'Console ui test on Windows broken due to sys args issue' - - def __init__(self, testname): - super(ConsoleScriptEntryTestCase, self).__init__(testname) - ConsoleUIBaseTestCase.__init__(self) - self.var['cmd_name'] = 'deluge-console' - self.var['start_cmd'] = deluge.ui.console.start - self.var['sys_arg_cmd'] = ['./deluge-console'] - - def set_up(self): - return ConsoleUIBaseTestCase.set_up(self) - - def tear_down(self): - return ConsoleUIBaseTestCase.tear_down(self) - - -class ConsoleDelugeScriptEntryTestCase(BaseTestCase, ConsoleUIBaseTestCase): - - if windows_check(): - skip = 'cannot test console ui on windows' - - def __init__(self, testname): - super(ConsoleDelugeScriptEntryTestCase, self).__init__(testname) - ConsoleUIBaseTestCase.__init__(self) - self.var['cmd_name'] = 'deluge console' - self.var['start_cmd'] = ui_entry.start_ui - self.var['sys_arg_cmd'] = ['./deluge', 'console'] - - def set_up(self): - return ConsoleUIBaseTestCase.set_up(self) - - def tear_down(self): - return ConsoleUIBaseTestCase.tear_down(self) + assert std_output.startswith('Setting "download_location" to: \'/downloads\'') + assert std_output.endswith('Configuration value successfully updated.\n') + + +@pytest.mark.usefixtures('daemon', 'client') +class TestConsoleScriptEntryWithDaemon(BaseTestCase, ConsoleUIWithDaemonBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + request.cls.var = { + 'cmd_name': 'deluge-console', + 'start_cmd': deluge.ui.console.start, + 'sys_arg_cmd': ['./deluge-console'], + } + + +class TestConsoleScriptEntry(BaseTestCase, ConsoleUIBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + request.cls.var = { + 'cmd_name': 'deluge-console', + 'start_cmd': deluge.ui.console.start, + 'sys_arg_cmd': ['./deluge-console'], + } + + +class TestConsoleDelugeScriptEntry(BaseTestCase, ConsoleUIBaseTestCase): + @pytest.fixture(autouse=True) + def set_var(self, request): + request.cls.var = { + 'cmd_name': 'deluge console', + 'start_cmd': ui_entry.start_ui, + 'sys_arg_cmd': ['./deluge', 'console'], + } diff --git a/deluge/tests/test_ui_gtk3.py b/deluge/tests/test_ui_gtk3.py index a208bb494..e6d025c7c 100644 --- a/deluge/tests/test_ui_gtk3.py +++ b/deluge/tests/test_ui_gtk3.py @@ -1,21 +1,17 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals - import sys +from unittest import mock -import mock import pytest -from twisted.trial import unittest @pytest.mark.gtkui -class GTK3CommonTestCase(unittest.TestCase): +class TestGTK3Common: def setUp(self): sys.modules['gi.repository'] = mock.MagicMock() @@ -25,10 +21,10 @@ class GTK3CommonTestCase(unittest.TestCase): def test_cmp(self): from deluge.ui.gtk3.common import cmp - self.assertEqual(cmp(None, None), 0) - self.assertEqual(cmp(1, None), 1) - self.assertEqual(cmp(0, None), 1) - self.assertEqual(cmp(None, 7), -1) - self.assertEqual(cmp(None, 'bar'), -1) - self.assertEqual(cmp('foo', None), 1) - self.assertEqual(cmp('', None), 1) + assert cmp(None, None) == 0 + assert cmp(1, None) == 1 + assert cmp(0, None) == 1 + assert cmp(None, 7) == -1 + assert cmp(None, 'bar') == -1 + assert cmp('foo', None) == 1 + assert cmp('', None) == 1 diff --git a/deluge/tests/test_web_api.py b/deluge/tests/test_web_api.py index 0180e0bda..56f86aa56 100644 --- a/deluge/tests/test_web_api.py +++ b/deluge/tests/test_web_api.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> # @@ -7,19 +6,17 @@ # See LICENSE for more details. # -from __future__ import unicode_literals - import json from io import BytesIO +import pytest +import pytest_twisted from twisted.internet import defer, reactor -from twisted.python.failure import Failure from twisted.web.client import Agent, FileBodyProducer from twisted.web.http_headers import Headers from twisted.web.static import File import deluge.component as component -from deluge.ui.client import client from . import common from .common_web import WebServerTestBase @@ -27,20 +24,19 @@ from .common_web import WebServerTestBase common.disable_new_release_check() -class WebAPITestCase(WebServerTestBase): - def test_connect_invalid_host(self): - d = self.deluge_web.web_api.connect('id') - d.addCallback(self.fail) - d.addErrback(self.assertIsInstance, Failure) - return d +class TestWebAPI(WebServerTestBase): + @pytest.mark.xfail(reason='This just logs an error at the moment.') + @pytest_twisted.ensureDeferred + async def test_connect_invalid_host(self): + with pytest.raises(Exception): + await self.deluge_web.web_api.connect('id') - def test_connect(self): + def test_connect(self, client): d = self.deluge_web.web_api.connect(self.host_id) def on_connect(result): - self.assertEqual(type(result), tuple) - self.assertTrue(len(result) > 0) - self.addCleanup(client.disconnect) + assert type(result) == tuple + assert len(result) > 0 return result d.addCallback(on_connect) @@ -52,9 +48,9 @@ class WebAPITestCase(WebServerTestBase): @defer.inlineCallbacks def on_connect(result): - self.assertTrue(self.deluge_web.web_api.connected()) + assert self.deluge_web.web_api.connected() yield self.deluge_web.web_api.disconnect() - self.assertFalse(self.deluge_web.web_api.connected()) + assert not self.deluge_web.web_api.connected() d.addCallback(on_connect) d.addErrback(self.fail) @@ -62,7 +58,7 @@ class WebAPITestCase(WebServerTestBase): def test_get_config(self): config = self.deluge_web.web_api.get_config() - self.assertEqual(self.webserver_listen_port, config['port']) + assert self.webserver_listen_port == config['port'] def test_set_config(self): config = self.deluge_web.web_api.get_config() @@ -77,9 +73,9 @@ class WebAPITestCase(WebServerTestBase): } self.deluge_web.web_api.set_config(config) web_config = component.get('DelugeWeb').config.config - self.assertNotEquals(config['pwd_salt'], web_config['pwd_salt']) - self.assertNotEquals(config['pwd_sha1'], web_config['pwd_sha1']) - self.assertNotEquals(config['sessions'], web_config['sessions']) + assert config['pwd_salt'] != web_config['pwd_salt'] + assert config['pwd_sha1'] != web_config['pwd_sha1'] + assert config['sessions'] != web_config['sessions'] @defer.inlineCallbacks def get_host_status(self): @@ -87,49 +83,49 @@ class WebAPITestCase(WebServerTestBase): host[3] = 'Online' host[4] = '2.0.0.dev562' status = yield self.deluge_web.web_api.get_host_status(self.host_id) - self.assertEqual(status, tuple(status)) + assert status == tuple(status) def test_get_host(self): - self.assertFalse(self.deluge_web.web_api._get_host('invalid_id')) + assert not self.deluge_web.web_api._get_host('invalid_id') conn = list(self.deluge_web.web_api.hostlist.get_hosts_info()[0]) - self.assertEqual(self.deluge_web.web_api._get_host(conn[0]), conn[0:4]) + assert self.deluge_web.web_api._get_host(conn[0]) == conn[0:4] def test_add_host(self): conn = ['abcdef', '10.0.0.1', 0, 'user123', 'pass123'] - self.assertFalse(self.deluge_web.web_api._get_host(conn[0])) + assert not self.deluge_web.web_api._get_host(conn[0]) # Add valid host result, host_id = self.deluge_web.web_api.add_host( conn[1], conn[2], conn[3], conn[4] ) - self.assertEqual(result, True) + assert result conn[0] = host_id - self.assertEqual(self.deluge_web.web_api._get_host(conn[0]), conn[0:4]) + assert self.deluge_web.web_api._get_host(conn[0]) == conn[0:4] # Add already existing host ret = self.deluge_web.web_api.add_host(conn[1], conn[2], conn[3], conn[4]) - self.assertEqual(ret, (False, 'Host details already in hostlist')) + assert ret == (False, 'Host details already in hostlist') # Add invalid port conn[2] = 'bad port' ret = self.deluge_web.web_api.add_host(conn[1], conn[2], conn[3], conn[4]) - self.assertEqual(ret, (False, 'Invalid port. Must be an integer')) + assert ret == (False, 'Invalid port. Must be an integer') def test_remove_host(self): conn = ['connection_id', '', 0, '', ''] self.deluge_web.web_api.hostlist.config['hosts'].append(conn) - self.assertEqual(self.deluge_web.web_api._get_host(conn[0]), conn[0:4]) + assert self.deluge_web.web_api._get_host(conn[0]) == conn[0:4] # Remove valid host - self.assertTrue(self.deluge_web.web_api.remove_host(conn[0])) - self.assertFalse(self.deluge_web.web_api._get_host(conn[0])) + assert self.deluge_web.web_api.remove_host(conn[0]) + assert not self.deluge_web.web_api._get_host(conn[0]) # Remove non-existing host - self.assertFalse(self.deluge_web.web_api.remove_host(conn[0])) + assert not self.deluge_web.web_api.remove_host(conn[0]) def test_get_torrent_info(self): filename = common.get_test_data_file('test.torrent') ret = self.deluge_web.web_api.get_torrent_info(filename) - self.assertEqual(ret['name'], 'azcvsupdater_2.6.2.jar') - self.assertEqual(ret['info_hash'], 'ab570cdd5a17ea1b61e970bb72047de141bce173') - self.assertTrue('files_tree' in ret) + assert ret['name'] == 'azcvsupdater_2.6.2.jar' + assert ret['info_hash'] == 'ab570cdd5a17ea1b61e970bb72047de141bce173' + assert 'files_tree' in ret def test_get_torrent_info_with_md5(self): filename = common.get_test_data_file('md5sum.torrent') @@ -137,19 +133,19 @@ class WebAPITestCase(WebServerTestBase): # JSON dumping happens during response creation in normal usage # JSON serialization may fail if any of the dictionary items are byte arrays rather than strings ret = json.loads(json.dumps(ret)) - self.assertEqual(ret['name'], 'test') - self.assertEqual(ret['info_hash'], 'f6408ba9944cf9fe01b547b28f336b3ee6ec32c5') - self.assertTrue('files_tree' in ret) + assert ret['name'] == 'test' + assert ret['info_hash'] == 'f6408ba9944cf9fe01b547b28f336b3ee6ec32c5' + assert 'files_tree' in ret def test_get_magnet_info(self): ret = self.deluge_web.web_api.get_magnet_info( 'magnet:?xt=urn:btih:SU5225URMTUEQLDXQWRB2EQWN6KLTYKN' ) - self.assertEqual(ret['name'], '953bad769164e8482c7785a21d12166f94b9e14d') - self.assertEqual(ret['info_hash'], '953bad769164e8482c7785a21d12166f94b9e14d') - self.assertTrue('files_tree' in ret) + assert ret['name'] == '953bad769164e8482c7785a21d12166f94b9e14d' + assert ret['info_hash'] == '953bad769164e8482c7785a21d12166f94b9e14d' + assert 'files_tree' in ret - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_get_torrent_files(self): yield self.deluge_web.web_api.connect(self.host_id) filename = common.get_test_data_file('test.torrent') @@ -160,23 +156,20 @@ class WebAPITestCase(WebServerTestBase): ret = yield self.deluge_web.web_api.get_torrent_files( 'ab570cdd5a17ea1b61e970bb72047de141bce173' ) - self.assertEqual(ret['type'], 'dir') - self.assertEqual( - ret['contents'], - { - 'azcvsupdater_2.6.2.jar': { - 'priority': 4, - 'index': 0, - 'offset': 0, - 'progress': 0.0, - 'path': 'azcvsupdater_2.6.2.jar', - 'type': 'file', - 'size': 307949, - } - }, - ) + assert ret['type'] == 'dir' + assert ret['contents'] == { + 'azcvsupdater_2.6.2.jar': { + 'priority': 4, + 'index': 0, + 'offset': 0, + 'progress': 0.0, + 'path': 'azcvsupdater_2.6.2.jar', + 'type': 'file', + 'size': 307949, + } + } - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_download_torrent_from_url(self): filename = 'ubuntu-9.04-desktop-i386.iso.torrent' self.deluge_web.top_level.putChild( @@ -184,9 +177,9 @@ class WebAPITestCase(WebServerTestBase): ) url = 'http://localhost:%d/%s' % (self.webserver_listen_port, filename) res = yield self.deluge_web.web_api.download_torrent_from_url(url) - self.assertTrue(res.endswith(filename)) + assert res.endswith(filename) - @defer.inlineCallbacks + @pytest_twisted.inlineCallbacks def test_invalid_json(self): """ If json_api._send_response does not return server.NOT_DONE_YET diff --git a/deluge/tests/test_web_auth.py b/deluge/tests/test_web_auth.py index a5185737c..39d66c1c1 100644 --- a/deluge/tests/test_web_auth.py +++ b/deluge/tests/test_web_auth.py @@ -1,18 +1,15 @@ -# -*- coding: utf-8 -*- # # This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with # the additional special exception to link portions of this program with the OpenSSL library. # See LICENSE for more details. # -from __future__ import unicode_literals -from mock import patch -from twisted.trial import unittest +from unittest.mock import patch from deluge.ui.web import auth -class MockConfig(object): +class MockConfig: def __init__(self, config): self.config = config @@ -23,7 +20,7 @@ class MockConfig(object): self.config[key] = value -class WebAuthTestCase(unittest.TestCase): +class TestWebAuth: @patch('deluge.ui.web.auth.JSONComponent.__init__', return_value=None) def test_change_password(self, mock_json): config = MockConfig( @@ -33,4 +30,4 @@ class WebAuthTestCase(unittest.TestCase): } ) webauth = auth.Auth(config) - self.assertTrue(webauth.change_password('deluge', 'deluge_new')) + assert webauth.change_password('deluge', 'deluge_new') diff --git a/deluge/tests/test_webserver.py b/deluge/tests/test_webserver.py index d9684bacd..e1588fdf3 100644 --- a/deluge/tests/test_webserver.py +++ b/deluge/tests/test_webserver.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2016 bendikro <bro.devel+deluge@gmail.com> # @@ -7,13 +6,12 @@ # See LICENSE for more details. # -from __future__ import unicode_literals - import json as json_lib from io import BytesIO +import pytest_twisted import twisted.web.client -from twisted.internet import defer, reactor +from twisted.internet import reactor from twisted.web.client import Agent, FileBodyProducer from twisted.web.http_headers import Headers @@ -24,8 +22,8 @@ from .common_web import WebServerMockBase, WebServerTestBase common.disable_new_release_check() -class WebServerTestCase(WebServerTestBase, WebServerMockBase): - @defer.inlineCallbacks +class TestWebServer(WebServerTestBase, WebServerMockBase): + @pytest_twisted.inlineCallbacks def test_get_torrent_info(self): agent = Agent(reactor) @@ -37,7 +35,8 @@ class WebServerTestCase(WebServerTestBase, WebServerMockBase): # UnicodeDecodeError: 'utf8' codec can't decode byte 0xe5 in position 0: invalid continuation byte filename = get_test_data_file('filehash_field.torrent') input_file = ( - '{"params": ["%s"], "method": "web.get_torrent_info", "id": 22}' % filename + '{"params": ["%s"], "method": "web.get_torrent_info", "id": 22}' + % filename.replace('\\', '\\\\') ) headers = { b'User-Agent': ['Twisted Web Client Example'], @@ -51,9 +50,11 @@ class WebServerTestCase(WebServerTestBase, WebServerMockBase): Headers(headers), FileBodyProducer(BytesIO(input_file.encode('utf-8'))), ) - body = yield twisted.web.client.readBody(d) - json = json_lib.loads(body.decode()) - self.assertEqual(None, json['error']) - self.assertEqual('torrent_filehash', json['result']['name']) + try: + json = json_lib.loads(body.decode()) + except Exception: + print('aoeu') + assert json['error'] is None + assert 'torrent_filehash' == json['result']['name'] diff --git a/deluge/tests/twisted/plugins/delugereporter.py b/deluge/tests/twisted/plugins/delugereporter.py deleted file mode 100644 index c2a7b52b5..000000000 --- a/deluge/tests/twisted/plugins/delugereporter.py +++ /dev/null @@ -1,50 +0,0 @@ -#! /usr/bin/env python -# -*- coding: utf-8 -*- -# -# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with -# the additional special exception to link portions of this program with the OpenSSL library. -# See LICENSE for more details. -# - -from __future__ import unicode_literals - -import os - -from twisted.plugin import IPlugin -from twisted.trial.itrial import IReporter -from twisted.trial.reporter import TreeReporter -from zope.interface import implements - - -class _Reporter(object): - implements(IPlugin, IReporter) - - def __init__( - self, name, module, description, longOpt, shortOpt, klass # noqa: N803 - ): - self.name = name - self.module = module - self.description = description - self.longOpt = longOpt - self.shortOpt = shortOpt - self.klass = klass - - -deluge = _Reporter( - 'Deluge reporter that suppresses Stacktrace from TODO tests', - 'twisted.plugins.delugereporter', - description='Deluge Reporter', - longOpt='deluge-reporter', - shortOpt=None, - klass='DelugeReporter', -) - - -class DelugeReporter(TreeReporter): - def __init__(self, *args, **kwargs): - os.environ['DELUGE_REPORTER'] = 'true' - TreeReporter.__init__(self, *args, **kwargs) - - def addExpectedFailure(self, *args): # NOQA: N802 - # super(TreeReporter, self).addExpectedFailure(*args) - self.endLine('[TODO]', self.TODO) |