diff options
Diffstat (limited to 'deluge/core/torrentmanager.py')
-rw-r--r-- | deluge/core/torrentmanager.py | 160 |
1 files changed, 73 insertions, 87 deletions
diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py index 365f37233..5609df4bd 100644 --- a/deluge/core/torrentmanager.py +++ b/deluge/core/torrentmanager.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # # Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com> # @@ -8,25 +7,23 @@ # """TorrentManager handles Torrent objects""" -from __future__ import unicode_literals - import datetime import logging import operator import os +import pickle import time -from collections import namedtuple +from base64 import b64encode from tempfile import gettempdir +from typing import Dict, List, NamedTuple, Tuple -import six.moves.cPickle as pickle # noqa: N813 -from twisted.internet import defer, error, reactor, threads +from twisted.internet import defer, reactor, threads from twisted.internet.defer import Deferred, DeferredList from twisted.internet.task import LoopingCall import deluge.component as component from deluge._libtorrent import LT_VERSION, lt from deluge.common import ( - PY2, VersionSplit, archive_files, decode_bytes, @@ -36,6 +33,7 @@ from deluge.common import ( from deluge.configmanager import ConfigManager, get_config_dir from deluge.core.authmanager import AUTH_LEVEL_ADMIN from deluge.core.torrent import Torrent, TorrentOptions, sanitize_filepath +from deluge.decorators import maybe_coroutine from deluge.error import AddTorrentError, InvalidTorrentError from deluge.event import ( ExternalIPEvent, @@ -59,6 +57,11 @@ LT_DEFAULT_ADD_TORRENT_FLAGS = ( ) +class PrefetchQueueItem(NamedTuple): + alert_deferred: Deferred + result_queue: List[Deferred] + + class TorrentState: # pylint: disable=old-style-class """Create a torrent state. @@ -136,7 +139,8 @@ class TorrentManager(component.Component): """ - callLater = reactor.callLater # noqa: N815 + # This is used in the test to mock out timeouts + clock = reactor def __init__(self): component.Component.__init__( @@ -165,7 +169,7 @@ class TorrentManager(component.Component): self.is_saving_state = False self.save_resume_data_file_lock = defer.DeferredLock() self.torrents_loading = {} - self.prefetching_metadata = {} + self.prefetching_metadata: Dict[str, PrefetchQueueItem] = {} # This is a map of torrent_ids to Deferreds used to track needed resume data. # The Deferreds will be completed when resume data has been saved. @@ -250,8 +254,8 @@ class TorrentManager(component.Component): self.save_resume_data_timer.start(190, False) self.prev_status_cleanup_loop.start(10) - @defer.inlineCallbacks - def stop(self): + @maybe_coroutine + async def stop(self): # Stop timers if self.save_state_timer.running: self.save_state_timer.stop() @@ -263,11 +267,11 @@ class TorrentManager(component.Component): self.prev_status_cleanup_loop.stop() # Save state on shutdown - yield self.save_state() + await self.save_state() self.session.pause() - result = yield self.save_resume_data(flush_disk_cache=True) + result = await self.save_resume_data(flush_disk_cache=True) # Remove the temp_file to signify successfully saved state if result and os.path.isfile(self.temp_file): os.remove(self.temp_file) @@ -281,11 +285,6 @@ class TorrentManager(component.Component): 'Paused', 'Queued', ): - # If the global setting is set, but the per-torrent isn't... - # Just skip to the next torrent. - # This is so that a user can turn-off the stop at ratio option on a per-torrent basis - if not torrent.options['stop_at_ratio']: - continue if ( torrent.get_ratio() >= torrent.options['stop_ratio'] and torrent.is_finished @@ -293,7 +292,7 @@ class TorrentManager(component.Component): if torrent.options['remove_at_ratio']: self.remove(torrent_id) break - if not torrent.handle.status().paused: + if not torrent.status.paused: torrent.pause() def __getitem__(self, torrent_id): @@ -346,26 +345,28 @@ class TorrentManager(component.Component): else: return torrent_info - def prefetch_metadata(self, magnet, timeout): + @maybe_coroutine + async def prefetch_metadata(self, magnet: str, timeout: int) -> Tuple[str, bytes]: """Download the metadata for a magnet URI. Args: - magnet (str): A magnet URI to download the metadata for. - timeout (int): Number of seconds to wait before canceling. + magnet: A magnet URI to download the metadata for. + timeout: Number of seconds to wait before canceling. Returns: - Deferred: A tuple of (torrent_id (str), metadata (dict)) + A tuple of (torrent_id, metadata) """ torrent_id = get_magnet_info(magnet)['info_hash'] if torrent_id in self.prefetching_metadata: - return self.prefetching_metadata[torrent_id].defer + d = Deferred() + self.prefetching_metadata[torrent_id].result_queue.append(d) + return await d - add_torrent_params = {} - add_torrent_params['save_path'] = gettempdir() - add_torrent_params['url'] = magnet.strip().encode('utf8') - add_torrent_params['flags'] = ( + add_torrent_params = lt.parse_magnet_uri(magnet) + add_torrent_params.save_path = gettempdir() + add_torrent_params.flags = ( ( LT_DEFAULT_ADD_TORRENT_FLAGS | lt.add_torrent_params_flags_t.flag_duplicate_is_error @@ -379,33 +380,29 @@ class TorrentManager(component.Component): d = Deferred() # Cancel the defer if timeout reached. - defer_timeout = self.callLater(timeout, d.cancel) - d.addBoth(self.on_prefetch_metadata, torrent_id, defer_timeout) - Prefetch = namedtuple('Prefetch', 'defer handle') - self.prefetching_metadata[torrent_id] = Prefetch(defer=d, handle=torrent_handle) - return d + d.addTimeout(timeout, self.clock) + self.prefetching_metadata[torrent_id] = PrefetchQueueItem(d, []) - def on_prefetch_metadata(self, torrent_info, torrent_id, defer_timeout): - # Cancel reactor.callLater. try: - defer_timeout.cancel() - except error.AlreadyCalled: - pass - - log.debug('remove prefetch magnet from session') - try: - torrent_handle = self.prefetching_metadata.pop(torrent_id).handle - except KeyError: - pass + torrent_info = await d + except (defer.TimeoutError, defer.CancelledError): + log.debug(f'Prefetching metadata for {torrent_id} timed out or cancelled.') + metadata = b'' else: - self.session.remove_torrent(torrent_handle, 1) - - metadata = None - if isinstance(torrent_info, lt.torrent_info): log.debug('prefetch metadata received') - metadata = lt.bdecode(torrent_info.metadata()) + if VersionSplit(LT_VERSION) < VersionSplit('2.0.0.0'): + metadata = torrent_info.metadata() + else: + metadata = torrent_info.info_section() - return torrent_id, metadata + log.debug('remove prefetch magnet from session') + result_queue = self.prefetching_metadata.pop(torrent_id).result_queue + self.session.remove_torrent(torrent_handle, 1) + result = torrent_id, b64encode(metadata) + + for d in result_queue: + d.callback(result) + return result def _build_torrent_options(self, options): """Load default options and update if needed.""" @@ -438,14 +435,10 @@ class TorrentManager(component.Component): elif magnet: magnet_info = get_magnet_info(magnet) if magnet_info: - add_torrent_params['url'] = magnet.strip().encode('utf8') add_torrent_params['name'] = magnet_info['name'] + add_torrent_params['trackers'] = list(magnet_info['trackers']) torrent_id = magnet_info['info_hash'] - # Workaround lt 1.2 bug for magnet resume data with no metadata - if resume_data and VersionSplit(LT_VERSION) >= VersionSplit('1.2.10.0'): - add_torrent_params['info_hash'] = bytes( - bytearray.fromhex(torrent_id) - ) + add_torrent_params['info_hash'] = bytes(bytearray.fromhex(torrent_id)) else: raise AddTorrentError( 'Unable to add magnet, invalid magnet info: %s' % magnet @@ -460,7 +453,7 @@ class TorrentManager(component.Component): raise AddTorrentError('Torrent already being added (%s).' % torrent_id) elif torrent_id in self.prefetching_metadata: # Cancel and remove metadata fetching torrent. - self.prefetching_metadata[torrent_id].defer.cancel() + self.prefetching_metadata[torrent_id].alert_deferred.cancel() # Check for renamed files and if so, rename them in the torrent_info before adding. if options['mapped_files'] and torrent_info: @@ -821,12 +814,9 @@ class TorrentManager(component.Component): try: with open(filepath, 'rb') as _file: - if PY2: - state = pickle.load(_file) - else: - state = pickle.load(_file, encoding='utf8') - except (IOError, EOFError, pickle.UnpicklingError) as ex: - message = 'Unable to load {}: {}'.format(filepath, ex) + state = pickle.load(_file, encoding='utf8') + except (OSError, EOFError, pickle.UnpicklingError) as ex: + message = f'Unable to load {filepath}: {ex}' log.error(message) if not filepath.endswith('.bak'): self.archive_state(message) @@ -1082,7 +1072,7 @@ class TorrentManager(component.Component): try: with open(_filepath, 'rb') as _file: resume_data = lt.bdecode(_file.read()) - except (IOError, EOFError, RuntimeError) as ex: + except (OSError, EOFError, RuntimeError) as ex: if self.torrents: log.warning('Unable to load %s: %s', _filepath, ex) resume_data = None @@ -1366,10 +1356,8 @@ class TorrentManager(component.Component): torrent.set_tracker_status('Announce OK') # Check for peer information from the tracker, if none then send a scrape request. - if ( - alert.handle.status().num_complete == -1 - or alert.handle.status().num_incomplete == -1 - ): + torrent.get_lt_status() + if torrent.status.num_complete == -1 or torrent.status.num_incomplete == -1: torrent.scrape_tracker() def on_alert_tracker_announce(self, alert): @@ -1404,22 +1392,18 @@ class TorrentManager(component.Component): log.debug( 'Tracker Error Alert: %s [%s]', decode_bytes(alert.message()), error_message ) - if VersionSplit(LT_VERSION) >= VersionSplit('1.2.0.0'): - # libtorrent 1.2 added endpoint struct to each tracker. to prevent false updates - # we will need to verify that at least one endpoint to the errored tracker is working - for tracker in torrent.handle.trackers(): - if tracker['url'] == alert.url: - if any( - endpoint['last_error']['value'] == 0 - for endpoint in tracker['endpoints'] - ): - torrent.set_tracker_status('Announce OK') - else: - torrent.set_tracker_status('Error: ' + error_message) - break - else: - # preserve old functionality for libtorrent < 1.2 - torrent.set_tracker_status('Error: ' + error_message) + # libtorrent 1.2 added endpoint struct to each tracker. to prevent false updates + # we will need to verify that at least one endpoint to the errored tracker is working + for tracker in torrent.handle.trackers(): + if tracker['url'] == alert.url: + if any( + endpoint['last_error']['value'] == 0 + for endpoint in tracker['endpoints'] + ): + torrent.set_tracker_status('Announce OK') + else: + torrent.set_tracker_status('Error: ' + error_message) + break def on_alert_storage_moved(self, alert): """Alert handler for libtorrent storage_moved_alert""" @@ -1493,7 +1477,9 @@ class TorrentManager(component.Component): return if torrent_id in self.torrents: # libtorrent add_torrent expects bencoded resume_data. - self.resume_data[torrent_id] = lt.bencode(alert.resume_data) + self.resume_data[torrent_id] = lt.bencode( + lt.write_resume_data(alert.params) + ) if torrent_id in self.waiting_on_resume_data: self.waiting_on_resume_data[torrent_id].callback(None) @@ -1575,7 +1561,7 @@ class TorrentManager(component.Component): # Try callback to prefetch_metadata method. try: - d = self.prefetching_metadata[torrent_id].defer + d = self.prefetching_metadata[torrent_id].alert_deferred except KeyError: pass else: @@ -1621,7 +1607,7 @@ class TorrentManager(component.Component): except RuntimeError: continue if torrent_id in self.torrents: - self.torrents[torrent_id].update_status(t_status) + self.torrents[torrent_id].status = t_status self.handle_torrents_status_callback(self.torrents_status_requests.pop()) |