summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChase Sterling <chase.sterling@gmail.com>2022-02-07 19:40:05 -0500
committerCalum Lind <calumlind+deluge@gmail.com>2022-05-17 22:15:55 +0100
commit47e548fdb5fed54a8e607ece81ad9f862d96b64a (patch)
tree186d5389075a6dda5b98ceb86c8d37b644acb006
parentcd63efd93557d04c0e3570fa89b338696811c8e0 (diff)
downloaddeluge-47e548fdb5fed54a8e607ece81ad9f862d96b64a.tar.gz
deluge-47e548fdb5fed54a8e607ece81ad9f862d96b64a.tar.bz2
deluge-47e548fdb5fed54a8e607ece81ad9f862d96b64a.zip
[Core] Refactor prefetch_metadata for more clarity
Just trying to clean up some of the more complicated callback logic. Notable changes: * The test was awaiting a DeferredList. By default that will eat exceptions and just add them to the result list (including test assertion exceptions.) Added fireOnOneErrback=True to make sure that wasn't happening. * Moved the logic for multiple calls to await the same response into torrentmanager from core, so no matter where the prefetch is called from it will wait for the original call. * Implemented the multiple calls with an explicit queue of waiting callbacks, rather than a callback callback chain. * Moved to one inline async function rather than split into a main and callback after alert function. * Added some more type hints to the stuff I changed. Adjusted test since we are using prefetch as an async function now we have to schedule the alert to come after we start awaiting the prefetch call. Closes: https://github.com/deluge-torrent/deluge/pull/368
-rw-r--r--deluge/core/core.py20
-rw-r--r--deluge/core/rpcserver.py13
-rw-r--r--deluge/core/torrentmanager.py66
-rw-r--r--deluge/tests/test_core.py16
-rw-r--r--deluge/tests/test_torrentmanager.py6
5 files changed, 64 insertions, 57 deletions
diff --git a/deluge/core/core.py b/deluge/core/core.py
index 18fda68b2..35cf0194f 100644
--- a/deluge/core/core.py
+++ b/deluge/core/core.py
@@ -432,9 +432,10 @@ class Core(component.Component):
return d
@export
- def prefetch_magnet_metadata(
+ @maybe_coroutine
+ async def prefetch_magnet_metadata(
self, magnet: str, timeout: int = 30
- ) -> 'defer.Deferred[Tuple[str, bytes]]':
+ ) -> Tuple[str, bytes]:
"""Download magnet metadata without adding to Deluge session.
Used by UIs to get magnet files for selection before adding to session.
@@ -446,19 +447,10 @@ class Core(component.Component):
timeout: Number of seconds to wait before canceling request.
Returns:
- A tuple of (torrent_id (str), metadata (str)) for the magnet.
- """
+ A tuple of (torrent_id, metadata) for the magnet.
- def on_metadata(result, result_d):
- """Return result of torrent_id and metadata"""
- result_d.callback(result)
- return result
-
- d = self.torrentmanager.prefetch_metadata(magnet, timeout)
- # Use a separate callback chain to handle existing prefetching magnet.
- result_d = defer.Deferred()
- d.addBoth(on_metadata, result_d)
- return result_d
+ """
+ return await self.torrentmanager.prefetch_metadata(magnet, timeout)
@export
def add_torrent_file(
diff --git a/deluge/core/rpcserver.py b/deluge/core/rpcserver.py
index 104399df9..d4ca5d19d 100644
--- a/deluge/core/rpcserver.py
+++ b/deluge/core/rpcserver.py
@@ -13,6 +13,7 @@ import sys
import traceback
from collections import namedtuple
from types import FunctionType
+from typing import Callable, TypeVar, overload
from twisted.internet import defer, reactor
from twisted.internet.protocol import Factory, connectionDone
@@ -41,6 +42,18 @@ RPC_EVENT = 3
log = logging.getLogger(__name__)
+TCallable = TypeVar('TCallable', bound=Callable)
+
+
+@overload
+def export(func: TCallable) -> TCallable:
+ ...
+
+
+@overload
+def export(auth_level: int) -> Callable[[TCallable], TCallable]:
+ ...
+
def export(auth_level=AUTH_LEVEL_DEFAULT):
"""
diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py
index 41e1966b0..4904e94ed 100644
--- a/deluge/core/torrentmanager.py
+++ b/deluge/core/torrentmanager.py
@@ -14,10 +14,10 @@ import os
import pickle
import time
from base64 import b64encode
-from collections import namedtuple
from tempfile import gettempdir
+from typing import Dict, List, NamedTuple, Tuple
-from twisted.internet import defer, error, reactor, threads
+from twisted.internet import defer, reactor, threads
from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.task import LoopingCall
@@ -57,6 +57,11 @@ LT_DEFAULT_ADD_TORRENT_FLAGS = (
)
+class PrefetchQueueItem(NamedTuple):
+ alert_deferred: Deferred
+ result_queue: List[Deferred]
+
+
class TorrentState: # pylint: disable=old-style-class
"""Create a torrent state.
@@ -134,7 +139,8 @@ class TorrentManager(component.Component):
"""
- callLater = reactor.callLater # noqa: N815
+ # This is used in the test to mock out timeouts
+ clock = reactor
def __init__(self):
component.Component.__init__(
@@ -163,7 +169,7 @@ class TorrentManager(component.Component):
self.is_saving_state = False
self.save_resume_data_file_lock = defer.DeferredLock()
self.torrents_loading = {}
- self.prefetching_metadata = {}
+ self.prefetching_metadata: Dict[str, PrefetchQueueItem] = {}
# This is a map of torrent_ids to Deferreds used to track needed resume data.
# The Deferreds will be completed when resume data has been saved.
@@ -339,21 +345,24 @@ class TorrentManager(component.Component):
else:
return torrent_info
- def prefetch_metadata(self, magnet, timeout):
+ @maybe_coroutine
+ async def prefetch_metadata(self, magnet: str, timeout: int) -> Tuple[str, bytes]:
"""Download the metadata for a magnet URI.
Args:
- magnet (str): A magnet URI to download the metadata for.
- timeout (int): Number of seconds to wait before canceling.
+ magnet: A magnet URI to download the metadata for.
+ timeout: Number of seconds to wait before canceling.
Returns:
- Deferred: A tuple of (torrent_id (str), metadata (dict))
+ A tuple of (torrent_id, metadata)
"""
torrent_id = get_magnet_info(magnet)['info_hash']
if torrent_id in self.prefetching_metadata:
- return self.prefetching_metadata[torrent_id].defer
+ d = Deferred()
+ self.prefetching_metadata[torrent_id].result_queue.append(d)
+ return await d
add_torrent_params = lt.parse_magnet_uri(magnet)
add_torrent_params.save_path = gettempdir()
@@ -371,36 +380,29 @@ class TorrentManager(component.Component):
d = Deferred()
# Cancel the defer if timeout reached.
- defer_timeout = self.callLater(timeout, d.cancel)
- d.addBoth(self.on_prefetch_metadata, torrent_id, defer_timeout)
- Prefetch = namedtuple('Prefetch', 'defer handle')
- self.prefetching_metadata[torrent_id] = Prefetch(defer=d, handle=torrent_handle)
- return d
+ d.addTimeout(timeout, self.clock)
+ self.prefetching_metadata[torrent_id] = PrefetchQueueItem(d, [])
- def on_prefetch_metadata(self, torrent_info, torrent_id, defer_timeout):
- # Cancel reactor.callLater.
try:
- defer_timeout.cancel()
- except error.AlreadyCalled:
- pass
-
- log.debug('remove prefetch magnet from session')
- try:
- torrent_handle = self.prefetching_metadata.pop(torrent_id).handle
- except KeyError:
- pass
+ torrent_info = await d
+ except (defer.TimeoutError, defer.CancelledError):
+ log.debug(f'Prefetching metadata for {torrent_id} timed out or cancelled.')
+ metadata = b''
else:
- self.session.remove_torrent(torrent_handle, 1)
-
- metadata = b''
- if isinstance(torrent_info, lt.torrent_info):
log.debug('prefetch metadata received')
if VersionSplit(LT_VERSION) < VersionSplit('2.0.0.0'):
metadata = torrent_info.metadata()
else:
metadata = torrent_info.info_section()
- return torrent_id, b64encode(metadata)
+ log.debug('remove prefetch magnet from session')
+ result_queue = self.prefetching_metadata.pop(torrent_id).result_queue
+ self.session.remove_torrent(torrent_handle, 1)
+ result = torrent_id, b64encode(metadata)
+
+ for d in result_queue:
+ d.callback(result)
+ return result
def _build_torrent_options(self, options):
"""Load default options and update if needed."""
@@ -451,7 +453,7 @@ class TorrentManager(component.Component):
raise AddTorrentError('Torrent already being added (%s).' % torrent_id)
elif torrent_id in self.prefetching_metadata:
# Cancel and remove metadata fetching torrent.
- self.prefetching_metadata[torrent_id].defer.cancel()
+ self.prefetching_metadata[torrent_id].alert_deferred.cancel()
# Check for renamed files and if so, rename them in the torrent_info before adding.
if options['mapped_files'] and torrent_info:
@@ -1559,7 +1561,7 @@ class TorrentManager(component.Component):
# Try callback to prefetch_metadata method.
try:
- d = self.prefetching_metadata[torrent_id].defer
+ d = self.prefetching_metadata[torrent_id].alert_deferred
except KeyError:
pass
else:
diff --git a/deluge/tests/test_core.py b/deluge/tests/test_core.py
index 42457480d..f1c2e3be8 100644
--- a/deluge/tests/test_core.py
+++ b/deluge/tests/test_core.py
@@ -80,10 +80,10 @@ class TopLevelResource(Resource):
class TestCore(BaseTestCase):
def set_up(self):
self.rpcserver = RPCServer(listen=False)
- self.core = Core()
+ self.core: Core = Core()
self.core.config.config['lsd'] = False
self.clock = task.Clock()
- self.core.torrentmanager.callLater = self.clock.callLater
+ self.core.torrentmanager.clock = self.clock
self.listen_port = 51242
return component.start().addCallback(self.start_web_server)
@@ -313,20 +313,18 @@ class TestCore(BaseTestCase):
r2 = self.core.get_torrent_status(tid2, ['paused'])
assert r2['paused']
+ @pytest_twisted.inlineCallbacks
def test_prefetch_metadata_existing(self):
"""Check another call with same magnet returns existing deferred."""
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', b'')
- def on_result(result):
- assert result == expected
-
- d = self.core.prefetch_magnet_metadata(magnet)
- d.addCallback(on_result)
+ d1 = self.core.prefetch_magnet_metadata(magnet)
d2 = self.core.prefetch_magnet_metadata(magnet)
- d2.addCallback(on_result)
+ dg = defer.gatherResults([d1, d2], consumeErrors=True)
self.clock.advance(30)
- return defer.DeferredList([d, d2])
+ result = yield dg
+ assert result == [expected] * 2
@pytest_twisted.inlineCallbacks
def test_remove_torrent(self):
diff --git a/deluge/tests/test_torrentmanager.py b/deluge/tests/test_torrentmanager.py
index e1eaa6e50..0ead27230 100644
--- a/deluge/tests/test_torrentmanager.py
+++ b/deluge/tests/test_torrentmanager.py
@@ -12,7 +12,7 @@ from unittest import mock
import pytest
import pytest_twisted
-from twisted.internet import task
+from twisted.internet import reactor, task
from deluge import component
from deluge.bencode import bencode
@@ -78,7 +78,9 @@ class TestTorrentmanager(BaseTestCase):
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
d = self.tm.prefetch_metadata(magnet, 30)
- self.tm.on_alert_metadata_received(mock_alert)
+ # Make sure to use calllater, because the above prefetch call won't
+ # actually start running until we await it.
+ reactor.callLater(0, self.tm.on_alert_metadata_received, mock_alert)
expected = (
'ab570cdd5a17ea1b61e970bb72047de141bce173',