diff options
-rw-r--r-- | deluge/core/core.py | 20 | ||||
-rw-r--r-- | deluge/core/rpcserver.py | 13 | ||||
-rw-r--r-- | deluge/core/torrentmanager.py | 66 | ||||
-rw-r--r-- | deluge/tests/test_core.py | 16 | ||||
-rw-r--r-- | deluge/tests/test_torrentmanager.py | 6 |
5 files changed, 64 insertions, 57 deletions
diff --git a/deluge/core/core.py b/deluge/core/core.py index 18fda68b2..35cf0194f 100644 --- a/deluge/core/core.py +++ b/deluge/core/core.py @@ -432,9 +432,10 @@ class Core(component.Component): return d @export - def prefetch_magnet_metadata( + @maybe_coroutine + async def prefetch_magnet_metadata( self, magnet: str, timeout: int = 30 - ) -> 'defer.Deferred[Tuple[str, bytes]]': + ) -> Tuple[str, bytes]: """Download magnet metadata without adding to Deluge session. Used by UIs to get magnet files for selection before adding to session. @@ -446,19 +447,10 @@ class Core(component.Component): timeout: Number of seconds to wait before canceling request. Returns: - A tuple of (torrent_id (str), metadata (str)) for the magnet. - """ + A tuple of (torrent_id, metadata) for the magnet. - def on_metadata(result, result_d): - """Return result of torrent_id and metadata""" - result_d.callback(result) - return result - - d = self.torrentmanager.prefetch_metadata(magnet, timeout) - # Use a separate callback chain to handle existing prefetching magnet. - result_d = defer.Deferred() - d.addBoth(on_metadata, result_d) - return result_d + """ + return await self.torrentmanager.prefetch_metadata(magnet, timeout) @export def add_torrent_file( diff --git a/deluge/core/rpcserver.py b/deluge/core/rpcserver.py index 104399df9..d4ca5d19d 100644 --- a/deluge/core/rpcserver.py +++ b/deluge/core/rpcserver.py @@ -13,6 +13,7 @@ import sys import traceback from collections import namedtuple from types import FunctionType +from typing import Callable, TypeVar, overload from twisted.internet import defer, reactor from twisted.internet.protocol import Factory, connectionDone @@ -41,6 +42,18 @@ RPC_EVENT = 3 log = logging.getLogger(__name__) +TCallable = TypeVar('TCallable', bound=Callable) + + +@overload +def export(func: TCallable) -> TCallable: + ... + + +@overload +def export(auth_level: int) -> Callable[[TCallable], TCallable]: + ... + def export(auth_level=AUTH_LEVEL_DEFAULT): """ diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py index 41e1966b0..4904e94ed 100644 --- a/deluge/core/torrentmanager.py +++ b/deluge/core/torrentmanager.py @@ -14,10 +14,10 @@ import os import pickle import time from base64 import b64encode -from collections import namedtuple from tempfile import gettempdir +from typing import Dict, List, NamedTuple, Tuple -from twisted.internet import defer, error, reactor, threads +from twisted.internet import defer, reactor, threads from twisted.internet.defer import Deferred, DeferredList from twisted.internet.task import LoopingCall @@ -57,6 +57,11 @@ LT_DEFAULT_ADD_TORRENT_FLAGS = ( ) +class PrefetchQueueItem(NamedTuple): + alert_deferred: Deferred + result_queue: List[Deferred] + + class TorrentState: # pylint: disable=old-style-class """Create a torrent state. @@ -134,7 +139,8 @@ class TorrentManager(component.Component): """ - callLater = reactor.callLater # noqa: N815 + # This is used in the test to mock out timeouts + clock = reactor def __init__(self): component.Component.__init__( @@ -163,7 +169,7 @@ class TorrentManager(component.Component): self.is_saving_state = False self.save_resume_data_file_lock = defer.DeferredLock() self.torrents_loading = {} - self.prefetching_metadata = {} + self.prefetching_metadata: Dict[str, PrefetchQueueItem] = {} # This is a map of torrent_ids to Deferreds used to track needed resume data. # The Deferreds will be completed when resume data has been saved. @@ -339,21 +345,24 @@ class TorrentManager(component.Component): else: return torrent_info - def prefetch_metadata(self, magnet, timeout): + @maybe_coroutine + async def prefetch_metadata(self, magnet: str, timeout: int) -> Tuple[str, bytes]: """Download the metadata for a magnet URI. Args: - magnet (str): A magnet URI to download the metadata for. - timeout (int): Number of seconds to wait before canceling. + magnet: A magnet URI to download the metadata for. + timeout: Number of seconds to wait before canceling. Returns: - Deferred: A tuple of (torrent_id (str), metadata (dict)) + A tuple of (torrent_id, metadata) """ torrent_id = get_magnet_info(magnet)['info_hash'] if torrent_id in self.prefetching_metadata: - return self.prefetching_metadata[torrent_id].defer + d = Deferred() + self.prefetching_metadata[torrent_id].result_queue.append(d) + return await d add_torrent_params = lt.parse_magnet_uri(magnet) add_torrent_params.save_path = gettempdir() @@ -371,36 +380,29 @@ class TorrentManager(component.Component): d = Deferred() # Cancel the defer if timeout reached. - defer_timeout = self.callLater(timeout, d.cancel) - d.addBoth(self.on_prefetch_metadata, torrent_id, defer_timeout) - Prefetch = namedtuple('Prefetch', 'defer handle') - self.prefetching_metadata[torrent_id] = Prefetch(defer=d, handle=torrent_handle) - return d + d.addTimeout(timeout, self.clock) + self.prefetching_metadata[torrent_id] = PrefetchQueueItem(d, []) - def on_prefetch_metadata(self, torrent_info, torrent_id, defer_timeout): - # Cancel reactor.callLater. try: - defer_timeout.cancel() - except error.AlreadyCalled: - pass - - log.debug('remove prefetch magnet from session') - try: - torrent_handle = self.prefetching_metadata.pop(torrent_id).handle - except KeyError: - pass + torrent_info = await d + except (defer.TimeoutError, defer.CancelledError): + log.debug(f'Prefetching metadata for {torrent_id} timed out or cancelled.') + metadata = b'' else: - self.session.remove_torrent(torrent_handle, 1) - - metadata = b'' - if isinstance(torrent_info, lt.torrent_info): log.debug('prefetch metadata received') if VersionSplit(LT_VERSION) < VersionSplit('2.0.0.0'): metadata = torrent_info.metadata() else: metadata = torrent_info.info_section() - return torrent_id, b64encode(metadata) + log.debug('remove prefetch magnet from session') + result_queue = self.prefetching_metadata.pop(torrent_id).result_queue + self.session.remove_torrent(torrent_handle, 1) + result = torrent_id, b64encode(metadata) + + for d in result_queue: + d.callback(result) + return result def _build_torrent_options(self, options): """Load default options and update if needed.""" @@ -451,7 +453,7 @@ class TorrentManager(component.Component): raise AddTorrentError('Torrent already being added (%s).' % torrent_id) elif torrent_id in self.prefetching_metadata: # Cancel and remove metadata fetching torrent. - self.prefetching_metadata[torrent_id].defer.cancel() + self.prefetching_metadata[torrent_id].alert_deferred.cancel() # Check for renamed files and if so, rename them in the torrent_info before adding. if options['mapped_files'] and torrent_info: @@ -1559,7 +1561,7 @@ class TorrentManager(component.Component): # Try callback to prefetch_metadata method. try: - d = self.prefetching_metadata[torrent_id].defer + d = self.prefetching_metadata[torrent_id].alert_deferred except KeyError: pass else: diff --git a/deluge/tests/test_core.py b/deluge/tests/test_core.py index 42457480d..f1c2e3be8 100644 --- a/deluge/tests/test_core.py +++ b/deluge/tests/test_core.py @@ -80,10 +80,10 @@ class TopLevelResource(Resource): class TestCore(BaseTestCase): def set_up(self): self.rpcserver = RPCServer(listen=False) - self.core = Core() + self.core: Core = Core() self.core.config.config['lsd'] = False self.clock = task.Clock() - self.core.torrentmanager.callLater = self.clock.callLater + self.core.torrentmanager.clock = self.clock self.listen_port = 51242 return component.start().addCallback(self.start_web_server) @@ -313,20 +313,18 @@ class TestCore(BaseTestCase): r2 = self.core.get_torrent_status(tid2, ['paused']) assert r2['paused'] + @pytest_twisted.inlineCallbacks def test_prefetch_metadata_existing(self): """Check another call with same magnet returns existing deferred.""" magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'') - def on_result(result): - assert result == expected - - d = self.core.prefetch_magnet_metadata(magnet) - d.addCallback(on_result) + d1 = self.core.prefetch_magnet_metadata(magnet) d2 = self.core.prefetch_magnet_metadata(magnet) - d2.addCallback(on_result) + dg = defer.gatherResults([d1, d2], consumeErrors=True) self.clock.advance(30) - return defer.DeferredList([d, d2]) + result = yield dg + assert result == [expected] * 2 @pytest_twisted.inlineCallbacks def test_remove_torrent(self): diff --git a/deluge/tests/test_torrentmanager.py b/deluge/tests/test_torrentmanager.py index e1eaa6e50..0ead27230 100644 --- a/deluge/tests/test_torrentmanager.py +++ b/deluge/tests/test_torrentmanager.py @@ -12,7 +12,7 @@ from unittest import mock import pytest import pytest_twisted -from twisted.internet import task +from twisted.internet import reactor, task from deluge import component from deluge.bencode import bencode @@ -78,7 +78,9 @@ class TestTorrentmanager(BaseTestCase): magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' d = self.tm.prefetch_metadata(magnet, 30) - self.tm.on_alert_metadata_received(mock_alert) + # Make sure to use calllater, because the above prefetch call won't + # actually start running until we await it. + reactor.callLater(0, self.tm.on_alert_metadata_received, mock_alert) expected = ( 'ab570cdd5a17ea1b61e970bb72047de141bce173', |