summaryrefslogtreecommitdiffstats
path: root/deluge/core/torrentmanager.py
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/core/torrentmanager.py')
-rw-r--r--deluge/core/torrentmanager.py160
1 files changed, 73 insertions, 87 deletions
diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py
index 365f37233..5609df4bd 100644
--- a/deluge/core/torrentmanager.py
+++ b/deluge/core/torrentmanager.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2007-2009 Andrew Resch <andrewresch@gmail.com>
#
@@ -8,25 +7,23 @@
#
"""TorrentManager handles Torrent objects"""
-from __future__ import unicode_literals
-
import datetime
import logging
import operator
import os
+import pickle
import time
-from collections import namedtuple
+from base64 import b64encode
from tempfile import gettempdir
+from typing import Dict, List, NamedTuple, Tuple
-import six.moves.cPickle as pickle # noqa: N813
-from twisted.internet import defer, error, reactor, threads
+from twisted.internet import defer, reactor, threads
from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.task import LoopingCall
import deluge.component as component
from deluge._libtorrent import LT_VERSION, lt
from deluge.common import (
- PY2,
VersionSplit,
archive_files,
decode_bytes,
@@ -36,6 +33,7 @@ from deluge.common import (
from deluge.configmanager import ConfigManager, get_config_dir
from deluge.core.authmanager import AUTH_LEVEL_ADMIN
from deluge.core.torrent import Torrent, TorrentOptions, sanitize_filepath
+from deluge.decorators import maybe_coroutine
from deluge.error import AddTorrentError, InvalidTorrentError
from deluge.event import (
ExternalIPEvent,
@@ -59,6 +57,11 @@ LT_DEFAULT_ADD_TORRENT_FLAGS = (
)
+class PrefetchQueueItem(NamedTuple):
+ alert_deferred: Deferred
+ result_queue: List[Deferred]
+
+
class TorrentState: # pylint: disable=old-style-class
"""Create a torrent state.
@@ -136,7 +139,8 @@ class TorrentManager(component.Component):
"""
- callLater = reactor.callLater # noqa: N815
+ # This is used in the test to mock out timeouts
+ clock = reactor
def __init__(self):
component.Component.__init__(
@@ -165,7 +169,7 @@ class TorrentManager(component.Component):
self.is_saving_state = False
self.save_resume_data_file_lock = defer.DeferredLock()
self.torrents_loading = {}
- self.prefetching_metadata = {}
+ self.prefetching_metadata: Dict[str, PrefetchQueueItem] = {}
# This is a map of torrent_ids to Deferreds used to track needed resume data.
# The Deferreds will be completed when resume data has been saved.
@@ -250,8 +254,8 @@ class TorrentManager(component.Component):
self.save_resume_data_timer.start(190, False)
self.prev_status_cleanup_loop.start(10)
- @defer.inlineCallbacks
- def stop(self):
+ @maybe_coroutine
+ async def stop(self):
# Stop timers
if self.save_state_timer.running:
self.save_state_timer.stop()
@@ -263,11 +267,11 @@ class TorrentManager(component.Component):
self.prev_status_cleanup_loop.stop()
# Save state on shutdown
- yield self.save_state()
+ await self.save_state()
self.session.pause()
- result = yield self.save_resume_data(flush_disk_cache=True)
+ result = await self.save_resume_data(flush_disk_cache=True)
# Remove the temp_file to signify successfully saved state
if result and os.path.isfile(self.temp_file):
os.remove(self.temp_file)
@@ -281,11 +285,6 @@ class TorrentManager(component.Component):
'Paused',
'Queued',
):
- # If the global setting is set, but the per-torrent isn't...
- # Just skip to the next torrent.
- # This is so that a user can turn-off the stop at ratio option on a per-torrent basis
- if not torrent.options['stop_at_ratio']:
- continue
if (
torrent.get_ratio() >= torrent.options['stop_ratio']
and torrent.is_finished
@@ -293,7 +292,7 @@ class TorrentManager(component.Component):
if torrent.options['remove_at_ratio']:
self.remove(torrent_id)
break
- if not torrent.handle.status().paused:
+ if not torrent.status.paused:
torrent.pause()
def __getitem__(self, torrent_id):
@@ -346,26 +345,28 @@ class TorrentManager(component.Component):
else:
return torrent_info
- def prefetch_metadata(self, magnet, timeout):
+ @maybe_coroutine
+ async def prefetch_metadata(self, magnet: str, timeout: int) -> Tuple[str, bytes]:
"""Download the metadata for a magnet URI.
Args:
- magnet (str): A magnet URI to download the metadata for.
- timeout (int): Number of seconds to wait before canceling.
+ magnet: A magnet URI to download the metadata for.
+ timeout: Number of seconds to wait before canceling.
Returns:
- Deferred: A tuple of (torrent_id (str), metadata (dict))
+ A tuple of (torrent_id, metadata)
"""
torrent_id = get_magnet_info(magnet)['info_hash']
if torrent_id in self.prefetching_metadata:
- return self.prefetching_metadata[torrent_id].defer
+ d = Deferred()
+ self.prefetching_metadata[torrent_id].result_queue.append(d)
+ return await d
- add_torrent_params = {}
- add_torrent_params['save_path'] = gettempdir()
- add_torrent_params['url'] = magnet.strip().encode('utf8')
- add_torrent_params['flags'] = (
+ add_torrent_params = lt.parse_magnet_uri(magnet)
+ add_torrent_params.save_path = gettempdir()
+ add_torrent_params.flags = (
(
LT_DEFAULT_ADD_TORRENT_FLAGS
| lt.add_torrent_params_flags_t.flag_duplicate_is_error
@@ -379,33 +380,29 @@ class TorrentManager(component.Component):
d = Deferred()
# Cancel the defer if timeout reached.
- defer_timeout = self.callLater(timeout, d.cancel)
- d.addBoth(self.on_prefetch_metadata, torrent_id, defer_timeout)
- Prefetch = namedtuple('Prefetch', 'defer handle')
- self.prefetching_metadata[torrent_id] = Prefetch(defer=d, handle=torrent_handle)
- return d
+ d.addTimeout(timeout, self.clock)
+ self.prefetching_metadata[torrent_id] = PrefetchQueueItem(d, [])
- def on_prefetch_metadata(self, torrent_info, torrent_id, defer_timeout):
- # Cancel reactor.callLater.
try:
- defer_timeout.cancel()
- except error.AlreadyCalled:
- pass
-
- log.debug('remove prefetch magnet from session')
- try:
- torrent_handle = self.prefetching_metadata.pop(torrent_id).handle
- except KeyError:
- pass
+ torrent_info = await d
+ except (defer.TimeoutError, defer.CancelledError):
+ log.debug(f'Prefetching metadata for {torrent_id} timed out or cancelled.')
+ metadata = b''
else:
- self.session.remove_torrent(torrent_handle, 1)
-
- metadata = None
- if isinstance(torrent_info, lt.torrent_info):
log.debug('prefetch metadata received')
- metadata = lt.bdecode(torrent_info.metadata())
+ if VersionSplit(LT_VERSION) < VersionSplit('2.0.0.0'):
+ metadata = torrent_info.metadata()
+ else:
+ metadata = torrent_info.info_section()
- return torrent_id, metadata
+ log.debug('remove prefetch magnet from session')
+ result_queue = self.prefetching_metadata.pop(torrent_id).result_queue
+ self.session.remove_torrent(torrent_handle, 1)
+ result = torrent_id, b64encode(metadata)
+
+ for d in result_queue:
+ d.callback(result)
+ return result
def _build_torrent_options(self, options):
"""Load default options and update if needed."""
@@ -438,14 +435,10 @@ class TorrentManager(component.Component):
elif magnet:
magnet_info = get_magnet_info(magnet)
if magnet_info:
- add_torrent_params['url'] = magnet.strip().encode('utf8')
add_torrent_params['name'] = magnet_info['name']
+ add_torrent_params['trackers'] = list(magnet_info['trackers'])
torrent_id = magnet_info['info_hash']
- # Workaround lt 1.2 bug for magnet resume data with no metadata
- if resume_data and VersionSplit(LT_VERSION) >= VersionSplit('1.2.10.0'):
- add_torrent_params['info_hash'] = bytes(
- bytearray.fromhex(torrent_id)
- )
+ add_torrent_params['info_hash'] = bytes(bytearray.fromhex(torrent_id))
else:
raise AddTorrentError(
'Unable to add magnet, invalid magnet info: %s' % magnet
@@ -460,7 +453,7 @@ class TorrentManager(component.Component):
raise AddTorrentError('Torrent already being added (%s).' % torrent_id)
elif torrent_id in self.prefetching_metadata:
# Cancel and remove metadata fetching torrent.
- self.prefetching_metadata[torrent_id].defer.cancel()
+ self.prefetching_metadata[torrent_id].alert_deferred.cancel()
# Check for renamed files and if so, rename them in the torrent_info before adding.
if options['mapped_files'] and torrent_info:
@@ -821,12 +814,9 @@ class TorrentManager(component.Component):
try:
with open(filepath, 'rb') as _file:
- if PY2:
- state = pickle.load(_file)
- else:
- state = pickle.load(_file, encoding='utf8')
- except (IOError, EOFError, pickle.UnpicklingError) as ex:
- message = 'Unable to load {}: {}'.format(filepath, ex)
+ state = pickle.load(_file, encoding='utf8')
+ except (OSError, EOFError, pickle.UnpicklingError) as ex:
+ message = f'Unable to load {filepath}: {ex}'
log.error(message)
if not filepath.endswith('.bak'):
self.archive_state(message)
@@ -1082,7 +1072,7 @@ class TorrentManager(component.Component):
try:
with open(_filepath, 'rb') as _file:
resume_data = lt.bdecode(_file.read())
- except (IOError, EOFError, RuntimeError) as ex:
+ except (OSError, EOFError, RuntimeError) as ex:
if self.torrents:
log.warning('Unable to load %s: %s', _filepath, ex)
resume_data = None
@@ -1366,10 +1356,8 @@ class TorrentManager(component.Component):
torrent.set_tracker_status('Announce OK')
# Check for peer information from the tracker, if none then send a scrape request.
- if (
- alert.handle.status().num_complete == -1
- or alert.handle.status().num_incomplete == -1
- ):
+ torrent.get_lt_status()
+ if torrent.status.num_complete == -1 or torrent.status.num_incomplete == -1:
torrent.scrape_tracker()
def on_alert_tracker_announce(self, alert):
@@ -1404,22 +1392,18 @@ class TorrentManager(component.Component):
log.debug(
'Tracker Error Alert: %s [%s]', decode_bytes(alert.message()), error_message
)
- if VersionSplit(LT_VERSION) >= VersionSplit('1.2.0.0'):
- # libtorrent 1.2 added endpoint struct to each tracker. to prevent false updates
- # we will need to verify that at least one endpoint to the errored tracker is working
- for tracker in torrent.handle.trackers():
- if tracker['url'] == alert.url:
- if any(
- endpoint['last_error']['value'] == 0
- for endpoint in tracker['endpoints']
- ):
- torrent.set_tracker_status('Announce OK')
- else:
- torrent.set_tracker_status('Error: ' + error_message)
- break
- else:
- # preserve old functionality for libtorrent < 1.2
- torrent.set_tracker_status('Error: ' + error_message)
+ # libtorrent 1.2 added endpoint struct to each tracker. to prevent false updates
+ # we will need to verify that at least one endpoint to the errored tracker is working
+ for tracker in torrent.handle.trackers():
+ if tracker['url'] == alert.url:
+ if any(
+ endpoint['last_error']['value'] == 0
+ for endpoint in tracker['endpoints']
+ ):
+ torrent.set_tracker_status('Announce OK')
+ else:
+ torrent.set_tracker_status('Error: ' + error_message)
+ break
def on_alert_storage_moved(self, alert):
"""Alert handler for libtorrent storage_moved_alert"""
@@ -1493,7 +1477,9 @@ class TorrentManager(component.Component):
return
if torrent_id in self.torrents:
# libtorrent add_torrent expects bencoded resume_data.
- self.resume_data[torrent_id] = lt.bencode(alert.resume_data)
+ self.resume_data[torrent_id] = lt.bencode(
+ lt.write_resume_data(alert.params)
+ )
if torrent_id in self.waiting_on_resume_data:
self.waiting_on_resume_data[torrent_id].callback(None)
@@ -1575,7 +1561,7 @@ class TorrentManager(component.Component):
# Try callback to prefetch_metadata method.
try:
- d = self.prefetching_metadata[torrent_id].defer
+ d = self.prefetching_metadata[torrent_id].alert_deferred
except KeyError:
pass
else:
@@ -1621,7 +1607,7 @@ class TorrentManager(component.Component):
except RuntimeError:
continue
if torrent_id in self.torrents:
- self.torrents[torrent_id].update_status(t_status)
+ self.torrents[torrent_id].status = t_status
self.handle_torrents_status_callback(self.torrents_status_requests.pop())