summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--deluge/core/core.py20
-rw-r--r--deluge/core/rpcserver.py13
-rw-r--r--deluge/core/torrentmanager.py66
-rw-r--r--deluge/tests/test_core.py16
-rw-r--r--deluge/tests/test_torrentmanager.py6
5 files changed, 64 insertions, 57 deletions
diff --git a/deluge/core/core.py b/deluge/core/core.py
index 18fda68b2..35cf0194f 100644
--- a/deluge/core/core.py
+++ b/deluge/core/core.py
@@ -432,9 +432,10 @@ class Core(component.Component):
return d
@export
- def prefetch_magnet_metadata(
+ @maybe_coroutine
+ async def prefetch_magnet_metadata(
self, magnet: str, timeout: int = 30
- ) -> 'defer.Deferred[Tuple[str, bytes]]':
+ ) -> Tuple[str, bytes]:
"""Download magnet metadata without adding to Deluge session.
Used by UIs to get magnet files for selection before adding to session.
@@ -446,19 +447,10 @@ class Core(component.Component):
timeout: Number of seconds to wait before canceling request.
Returns:
- A tuple of (torrent_id (str), metadata (str)) for the magnet.
- """
+ A tuple of (torrent_id, metadata) for the magnet.
- def on_metadata(result, result_d):
- """Return result of torrent_id and metadata"""
- result_d.callback(result)
- return result
-
- d = self.torrentmanager.prefetch_metadata(magnet, timeout)
- # Use a separate callback chain to handle existing prefetching magnet.
- result_d = defer.Deferred()
- d.addBoth(on_metadata, result_d)
- return result_d
+ """
+ return await self.torrentmanager.prefetch_metadata(magnet, timeout)
@export
def add_torrent_file(
diff --git a/deluge/core/rpcserver.py b/deluge/core/rpcserver.py
index 104399df9..d4ca5d19d 100644
--- a/deluge/core/rpcserver.py
+++ b/deluge/core/rpcserver.py
@@ -13,6 +13,7 @@ import sys
import traceback
from collections import namedtuple
from types import FunctionType
+from typing import Callable, TypeVar, overload
from twisted.internet import defer, reactor
from twisted.internet.protocol import Factory, connectionDone
@@ -41,6 +42,18 @@ RPC_EVENT = 3
log = logging.getLogger(__name__)
+TCallable = TypeVar('TCallable', bound=Callable)
+
+
+@overload
+def export(func: TCallable) -> TCallable:
+ ...
+
+
+@overload
+def export(auth_level: int) -> Callable[[TCallable], TCallable]:
+ ...
+
def export(auth_level=AUTH_LEVEL_DEFAULT):
"""
diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py
index 41e1966b0..4904e94ed 100644
--- a/deluge/core/torrentmanager.py
+++ b/deluge/core/torrentmanager.py
@@ -14,10 +14,10 @@ import os
import pickle
import time
from base64 import b64encode
-from collections import namedtuple
from tempfile import gettempdir
+from typing import Dict, List, NamedTuple, Tuple
-from twisted.internet import defer, error, reactor, threads
+from twisted.internet import defer, reactor, threads
from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.task import LoopingCall
@@ -57,6 +57,11 @@ LT_DEFAULT_ADD_TORRENT_FLAGS = (
)
+class PrefetchQueueItem(NamedTuple):
+ alert_deferred: Deferred
+ result_queue: List[Deferred]
+
+
class TorrentState: # pylint: disable=old-style-class
"""Create a torrent state.
@@ -134,7 +139,8 @@ class TorrentManager(component.Component):
"""
- callLater = reactor.callLater # noqa: N815
+ # This is used in the test to mock out timeouts
+ clock = reactor
def __init__(self):
component.Component.__init__(
@@ -163,7 +169,7 @@ class TorrentManager(component.Component):
self.is_saving_state = False
self.save_resume_data_file_lock = defer.DeferredLock()
self.torrents_loading = {}
- self.prefetching_metadata = {}
+ self.prefetching_metadata: Dict[str, PrefetchQueueItem] = {}
# This is a map of torrent_ids to Deferreds used to track needed resume data.
# The Deferreds will be completed when resume data has been saved.
@@ -339,21 +345,24 @@ class TorrentManager(component.Component):
else:
return torrent_info
- def prefetch_metadata(self, magnet, timeout):
+ @maybe_coroutine
+ async def prefetch_metadata(self, magnet: str, timeout: int) -> Tuple[str, bytes]:
"""Download the metadata for a magnet URI.
Args:
- magnet (str): A magnet URI to download the metadata for.
- timeout (int): Number of seconds to wait before canceling.
+ magnet: A magnet URI to download the metadata for.
+ timeout: Number of seconds to wait before canceling.
Returns:
- Deferred: A tuple of (torrent_id (str), metadata (dict))
+ A tuple of (torrent_id, metadata)
"""
torrent_id = get_magnet_info(magnet)['info_hash']
if torrent_id in self.prefetching_metadata:
- return self.prefetching_metadata[torrent_id].defer
+ d = Deferred()
+ self.prefetching_metadata[torrent_id].result_queue.append(d)
+ return await d
add_torrent_params = lt.parse_magnet_uri(magnet)
add_torrent_params.save_path = gettempdir()
@@ -371,36 +380,29 @@ class TorrentManager(component.Component):
d = Deferred()
# Cancel the defer if timeout reached.
- defer_timeout = self.callLater(timeout, d.cancel)
- d.addBoth(self.on_prefetch_metadata, torrent_id, defer_timeout)
- Prefetch = namedtuple('Prefetch', 'defer handle')
- self.prefetching_metadata[torrent_id] = Prefetch(defer=d, handle=torrent_handle)
- return d
+ d.addTimeout(timeout, self.clock)
+ self.prefetching_metadata[torrent_id] = PrefetchQueueItem(d, [])
- def on_prefetch_metadata(self, torrent_info, torrent_id, defer_timeout):
- # Cancel reactor.callLater.
try:
- defer_timeout.cancel()
- except error.AlreadyCalled:
- pass
-
- log.debug('remove prefetch magnet from session')
- try:
- torrent_handle = self.prefetching_metadata.pop(torrent_id).handle
- except KeyError:
- pass
+ torrent_info = await d
+ except (defer.TimeoutError, defer.CancelledError):
+ log.debug(f'Prefetching metadata for {torrent_id} timed out or cancelled.')
+ metadata = b''
else:
- self.session.remove_torrent(torrent_handle, 1)
-
- metadata = b''
- if isinstance(torrent_info, lt.torrent_info):
log.debug('prefetch metadata received')
if VersionSplit(LT_VERSION) < VersionSplit('2.0.0.0'):
metadata = torrent_info.metadata()
else:
metadata = torrent_info.info_section()
- return torrent_id, b64encode(metadata)
+ log.debug('remove prefetch magnet from session')
+ result_queue = self.prefetching_metadata.pop(torrent_id).result_queue
+ self.session.remove_torrent(torrent_handle, 1)
+ result = torrent_id, b64encode(metadata)
+
+ for d in result_queue:
+ d.callback(result)
+ return result
def _build_torrent_options(self, options):
"""Load default options and update if needed."""
@@ -451,7 +453,7 @@ class TorrentManager(component.Component):
raise AddTorrentError('Torrent already being added (%s).' % torrent_id)
elif torrent_id in self.prefetching_metadata:
# Cancel and remove metadata fetching torrent.
- self.prefetching_metadata[torrent_id].defer.cancel()
+ self.prefetching_metadata[torrent_id].alert_deferred.cancel()
# Check for renamed files and if so, rename them in the torrent_info before adding.
if options['mapped_files'] and torrent_info:
@@ -1559,7 +1561,7 @@ class TorrentManager(component.Component):
# Try callback to prefetch_metadata method.
try:
- d = self.prefetching_metadata[torrent_id].defer
+ d = self.prefetching_metadata[torrent_id].alert_deferred
except KeyError:
pass
else:
diff --git a/deluge/tests/test_core.py b/deluge/tests/test_core.py
index 42457480d..f1c2e3be8 100644
--- a/deluge/tests/test_core.py
+++ b/deluge/tests/test_core.py
@@ -80,10 +80,10 @@ class TopLevelResource(Resource):
class TestCore(BaseTestCase):
def set_up(self):
self.rpcserver = RPCServer(listen=False)
- self.core = Core()
+ self.core: Core = Core()
self.core.config.config['lsd'] = False
self.clock = task.Clock()
- self.core.torrentmanager.callLater = self.clock.callLater
+ self.core.torrentmanager.clock = self.clock
self.listen_port = 51242
return component.start().addCallback(self.start_web_server)
@@ -313,20 +313,18 @@ class TestCore(BaseTestCase):
r2 = self.core.get_torrent_status(tid2, ['paused'])
assert r2['paused']
+ @pytest_twisted.inlineCallbacks
def test_prefetch_metadata_existing(self):
"""Check another call with same magnet returns existing deferred."""
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'')
- def on_result(result):
- assert result == expected
-
- d = self.core.prefetch_magnet_metadata(magnet)
- d.addCallback(on_result)
+ d1 = self.core.prefetch_magnet_metadata(magnet)
d2 = self.core.prefetch_magnet_metadata(magnet)
- d2.addCallback(on_result)
+ dg = defer.gatherResults([d1, d2], consumeErrors=True)
self.clock.advance(30)
- return defer.DeferredList([d, d2])
+ result = yield dg
+ assert result == [expected] * 2
@pytest_twisted.inlineCallbacks
def test_remove_torrent(self):
diff --git a/deluge/tests/test_torrentmanager.py b/deluge/tests/test_torrentmanager.py
index e1eaa6e50..0ead27230 100644
--- a/deluge/tests/test_torrentmanager.py
+++ b/deluge/tests/test_torrentmanager.py
@@ -12,7 +12,7 @@ from unittest import mock
import pytest
import pytest_twisted
-from twisted.internet import task
+from twisted.internet import reactor, task
from deluge import component
from deluge.bencode import bencode
@@ -78,7 +78,9 @@ class TestTorrentmanager(BaseTestCase):
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
d = self.tm.prefetch_metadata(magnet, 30)
- self.tm.on_alert_metadata_received(mock_alert)
+ # Make sure to use calllater, because the above prefetch call won't
+ # actually start running until we await it.
+ reactor.callLater(0, self.tm.on_alert_metadata_received, mock_alert)
expected = (
'ab570cdd5a17ea1b61e970bb72047de141bce173',