summaryrefslogtreecommitdiffstats
path: root/deluge/tests/test_torrent.py
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/tests/test_torrent.py')
-rw-r--r--deluge/tests/test_torrent.py111
1 files changed, 65 insertions, 46 deletions
diff --git a/deluge/tests/test_torrent.py b/deluge/tests/test_torrent.py
index 5da817924..36adc0fde 100644
--- a/deluge/tests/test_torrent.py
+++ b/deluge/tests/test_torrent.py
@@ -1,41 +1,37 @@
-# -*- coding: utf-8 -*-
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
-
-from __future__ import print_function, unicode_literals
-
+import itertools
import os
import time
from base64 import b64encode
+from unittest import mock
-import mock
+import pytest
+import pytest_twisted
from twisted.internet import defer, reactor
from twisted.internet.task import deferLater
-from twisted.trial import unittest
import deluge.component as component
import deluge.core.torrent
import deluge.tests.common as common
from deluge._libtorrent import lt
-from deluge.common import VersionSplit, utf8_encode_structure, windows_check
+from deluge.common import VersionSplit, utf8_encode_structure
+from deluge.conftest import BaseTestCase
from deluge.core.core import Core
from deluge.core.rpcserver import RPCServer
from deluge.core.torrent import Torrent
from deluge.core.torrentmanager import TorrentManager, TorrentState
-from .basetest import BaseTestCase
-
-class TorrentTestCase(BaseTestCase):
+class TestTorrent(BaseTestCase):
def setup_config(self):
- config_dir = common.set_tmp_config_dir()
core_config = deluge.config.Config(
'core.conf',
defaults=deluge.core.preferencesmanager.DEFAULT_PREFS,
- config_dir=config_dir,
+ config_dir=self.config_dir,
)
core_config.save()
@@ -67,7 +63,7 @@ class TorrentTestCase(BaseTestCase):
def assert_state(self, torrent, state):
torrent.update_state()
- self.assertEqual(torrent.state, state)
+ assert torrent.state == state
def get_torrent_atp(self, filename):
filename = common.get_test_data_file(filename)
@@ -85,25 +81,37 @@ class TorrentTestCase(BaseTestCase):
}
return atp
- def test_set_file_priorities(self):
+ @pytest_twisted.ensureDeferred
+ async def test_set_file_priorities(self):
+ if getattr(lt, 'file_prio_alert', None):
+ # Libtorrent 2.0.3 and later has a file_prio_alert
+ prios_set = defer.Deferred()
+ prios_set.addTimeout(1.5, reactor)
+ component.get('AlertManager').register_handler(
+ 'file_prio_alert', lambda a: prios_set.callback(True)
+ )
+ else:
+ # On older libtorrent, we just wait a while
+ prios_set = deferLater(reactor, 0.8)
+
atp = self.get_torrent_atp('dir_with_6_files.torrent')
handle = self.session.add_torrent(atp)
torrent = Torrent(handle, {})
result = torrent.get_file_priorities()
- self.assertTrue(all(x == 4 for x in result))
+ assert all(x == 4 for x in result)
new_priorities = [3, 1, 2, 0, 5, 6, 7]
torrent.set_file_priorities(new_priorities)
- self.assertEqual(torrent.get_file_priorities(), new_priorities)
+ assert torrent.get_file_priorities() == new_priorities
# Test with handle.piece_priorities as handle.file_priorities async
# updates and will return old value. Also need to remove a priority
# value as one file is much smaller than piece size so doesn't show.
- time.sleep(0.6) # Delay to wait for alert from lt
- piece_prio = handle.piece_priorities()
+ await prios_set # Delay to wait for alert from lt
+ piece_prio = handle.get_piece_priorities()
result = all(p in piece_prio for p in [3, 2, 0, 5, 6, 7])
- self.assertTrue(result)
+ assert result
def test_set_prioritize_first_last_pieces(self):
piece_indexes = [
@@ -143,19 +151,19 @@ class TorrentTestCase(BaseTestCase):
handle = self.session.add_torrent(atp)
self.torrent = Torrent(handle, {})
- priorities_original = handle.piece_priorities()
+ priorities_original = handle.get_piece_priorities()
self.torrent.set_prioritize_first_last_pieces(True)
- priorities = handle.piece_priorities()
+ priorities = handle.get_piece_priorities()
# The length of the list of new priorites is the same as the original
- self.assertEqual(len(priorities_original), len(priorities))
+ assert len(priorities_original) == len(priorities)
# Test the priority of all the pieces against the calculated indexes.
for idx, priority in enumerate(priorities):
if idx in prioritized_piece_indexes:
- self.assertEqual(priorities[idx], 7)
+ assert priorities[idx] == 7
else:
- self.assertEqual(priorities[idx], 4)
+ assert priorities[idx] == 4
# self.print_priority_list(priorities)
@@ -167,17 +175,15 @@ class TorrentTestCase(BaseTestCase):
self.torrent.set_prioritize_first_last_pieces(True)
# Reset pirorities
self.torrent.set_prioritize_first_last_pieces(False)
- priorities = handle.piece_priorities()
+ priorities = handle.get_piece_priorities()
# Test the priority of the prioritized pieces
for i in priorities:
- self.assertEqual(priorities[i], 4)
+ assert priorities[i] == 4
# self.print_priority_list(priorities)
def test_torrent_error_data_missing(self):
- if windows_check():
- raise unittest.SkipTest('unexpected end of file in bencoded string')
options = {'seed_mode': True}
filename = common.get_test_data_file('test_torrent.file.torrent')
with open(filename, 'rb') as _file:
@@ -194,8 +200,6 @@ class TorrentTestCase(BaseTestCase):
self.assert_state(torrent, 'Error')
def test_torrent_error_resume_original_state(self):
- if windows_check():
- raise unittest.SkipTest('unexpected end of file in bencoded string')
options = {'seed_mode': True, 'add_paused': True}
filename = common.get_test_data_file('test_torrent.file.torrent')
with open(filename, 'rb') as _file:
@@ -215,10 +219,8 @@ class TorrentTestCase(BaseTestCase):
torrent.force_recheck()
def test_torrent_error_resume_data_unaltered(self):
- if windows_check():
- raise unittest.SkipTest('unexpected end of file in bencoded string')
if VersionSplit(lt.__version__) >= VersionSplit('1.2.0.0'):
- raise unittest.SkipTest('Test not working as expected on lt 1.2 or greater')
+ pytest.skip('Test not working as expected on lt 1.2 or greater')
resume_data = {
'active_time': 13399,
@@ -286,7 +288,7 @@ class TorrentTestCase(BaseTestCase):
tm_resume_data = lt.bdecode(
self.core.torrentmanager.resume_data[torrent.torrent_id]
)
- self.assertEqual(tm_resume_data, resume_data)
+ assert tm_resume_data == resume_data
return deferLater(reactor, 0.5, assert_resume_data)
@@ -294,7 +296,7 @@ class TorrentTestCase(BaseTestCase):
atp = self.get_torrent_atp('test_torrent.file.torrent')
handle = self.session.add_torrent(atp)
self.torrent = Torrent(handle, {})
- self.assertEqual(self.torrent.get_eta(), 0)
+ assert self.torrent.get_eta() == 0
self.torrent.status = mock.MagicMock()
self.torrent.status.upload_payload_rate = 5000
@@ -304,18 +306,18 @@ class TorrentTestCase(BaseTestCase):
self.torrent.is_finished = True
self.torrent.options = {'stop_at_ratio': False}
# Test finished and uploading but no stop_at_ratio set.
- self.assertEqual(self.torrent.get_eta(), 0)
+ assert self.torrent.get_eta() == 0
self.torrent.options = {'stop_at_ratio': True, 'stop_ratio': 1.5}
result = self.torrent.get_eta()
- self.assertEqual(result, 2)
- self.assertIsInstance(result, int)
+ assert result == 2
+ assert isinstance(result, int)
def test_get_eta_downloading(self):
atp = self.get_torrent_atp('test_torrent.file.torrent')
handle = self.session.add_torrent(atp)
self.torrent = Torrent(handle, {})
- self.assertEqual(self.torrent.get_eta(), 0)
+ assert self.torrent.get_eta() == 0
self.torrent.status = mock.MagicMock()
self.torrent.status.download_payload_rate = 50
@@ -323,15 +325,15 @@ class TorrentTestCase(BaseTestCase):
self.torrent.status.total_wanted_done = 5000
result = self.torrent.get_eta()
- self.assertEqual(result, 100)
- self.assertIsInstance(result, int)
+ assert result == 100
+ assert isinstance(result, int)
def test_get_name_unicode(self):
"""Test retrieving a unicode torrent name from libtorrent."""
atp = self.get_torrent_atp('unicode_file.torrent')
handle = self.session.add_torrent(atp)
self.torrent = Torrent(handle, {})
- self.assertEqual(self.torrent.get_name(), 'সুকুমার রায়.txt')
+ assert self.torrent.get_name() == 'সুকুমার রায়.txt'
def test_rename_unicode(self):
"""Test renaming file/folders with unicode filenames."""
@@ -342,15 +344,32 @@ class TorrentTestCase(BaseTestCase):
TorrentManager.save_resume_data = mock.MagicMock
result = self.torrent.rename_folder('unicode_filenames', 'Горбачёв')
- self.assertIsInstance(result, defer.DeferredList)
+ assert isinstance(result, defer.DeferredList)
result = self.torrent.rename_files([[0, 'new_рбачёв']])
- self.assertIsNone(result)
+ assert result is None
def test_connect_peer_port(self):
"""Test to ensure port is int for libtorrent"""
atp = self.get_torrent_atp('test_torrent.file.torrent')
handle = self.session.add_torrent(atp)
self.torrent = Torrent(handle, {})
- self.assertFalse(self.torrent.connect_peer('127.0.0.1', 'text'))
- self.assertTrue(self.torrent.connect_peer('127.0.0.1', '1234'))
+ assert not self.torrent.connect_peer('127.0.0.1', 'text')
+ assert self.torrent.connect_peer('127.0.0.1', '1234')
+
+ def test_status_cache(self):
+ atp = self.get_torrent_atp('test_torrent.file.torrent')
+ handle = self.session.add_torrent(atp)
+ mock_time = mock.Mock(return_value=time.time())
+ with mock.patch('time.time', mock_time):
+ torrent = Torrent(handle, {})
+ counter = itertools.count()
+ handle.status = mock.Mock(side_effect=counter.__next__)
+ first_status = torrent.get_lt_status()
+ assert first_status == 0, 'sanity check'
+ assert first_status == torrent.status, 'cached status should be used'
+ assert torrent.get_lt_status() == 1, 'status should update'
+ assert torrent.status == 1
+ # Advance time and verify cache expires and updates
+ mock_time.return_value += 10
+ assert torrent.status == 2