summaryrefslogtreecommitdiffstats
path: root/deluge/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/common.py')
-rw-r--r--deluge/common.py268
1 files changed, 136 insertions, 132 deletions
diff --git a/deluge/common.py b/deluge/common.py
index a0e3fdd3b..ecf90a390 100644
--- a/deluge/common.py
+++ b/deluge/common.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
#
# Copyright (C) 2007,2008 Andrew Resch <andrewresch@gmail.com>
#
@@ -8,25 +7,25 @@
#
"""Common functions for various parts of Deluge to use."""
-from __future__ import division, print_function, unicode_literals
-
import base64
import binascii
import functools
import glob
-import locale
import logging
import numbers
import os
import platform
import re
+import socket
import subprocess
import sys
import tarfile
import time
from contextlib import closing
from datetime import datetime
-from io import BytesIO, open
+from io import BytesIO
+from urllib.parse import unquote_plus, urljoin
+from urllib.request import pathname2url
import pkg_resources
@@ -38,14 +37,6 @@ try:
except ImportError:
chardet = None
-try:
- from urllib.parse import unquote_plus, urljoin
- from urllib.request import pathname2url
-except ImportError:
- # PY2 fallback
- from urllib import pathname2url, unquote_plus # pylint: disable=ungrouped-imports
- from urlparse import urljoin # pylint: disable=ungrouped-imports
-
# Windows workaround for HTTPS requests requiring certificate authority bundle.
# see: https://twistedmatrix.com/trac/ticket/9209
if platform.system() in ('Windows', 'Microsoft'):
@@ -53,6 +44,11 @@ if platform.system() in ('Windows', 'Microsoft'):
os.environ['SSL_CERT_FILE'] = where()
+try:
+ import ifaddr
+except ImportError:
+ ifaddr = None
+
if platform.system() not in ('Windows', 'Microsoft', 'Darwin'):
# gi makes dbus available on Window but don't import it as unused.
@@ -84,7 +80,8 @@ JSON_FORMAT = {'indent': 4, 'sort_keys': True, 'ensure_ascii': False}
DBUS_FM_ID = 'org.freedesktop.FileManager1'
DBUS_FM_PATH = '/org/freedesktop/FileManager1'
-PY2 = sys.version_info.major == 2
+# Retained for plugin backward compatibility
+PY2 = False
def get_version():
@@ -111,10 +108,8 @@ def get_default_config_dir(filename=None):
def save_config_path(resource):
app_data_path = os.environ.get('APPDATA')
if not app_data_path:
- try:
- import winreg
- except ImportError:
- import _winreg as winreg # For Python 2.
+ import winreg
+
hkey = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
'Software\\Microsoft\\Windows\\CurrentVersion\\Explorer\\Shell Folders',
@@ -147,14 +142,14 @@ def get_default_download_dir():
try:
user_dirs_path = os.path.join(xdg_config_home, 'user-dirs.dirs')
- with open(user_dirs_path, 'r', encoding='utf8') as _file:
+ with open(user_dirs_path, encoding='utf8') as _file:
for line in _file:
if not line.startswith('#') and line.startswith('XDG_DOWNLOAD_DIR'):
download_dir = os.path.expandvars(
line.partition('=')[2].rstrip().strip('"')
)
break
- except IOError:
+ except OSError:
pass
if not download_dir:
@@ -178,8 +173,8 @@ def archive_files(arc_name, filepaths, message=None, rotate=10):
from deluge.configmanager import get_config_dir
- # Set archive compression to lzma with bz2 fallback.
- arc_comp = 'xz' if not PY2 else 'bz2'
+ # Set archive compression to lzma
+ arc_comp = 'xz'
archive_dir = os.path.join(get_config_dir(), 'archive')
timestamp = datetime.now().replace(microsecond=0).isoformat().replace(':', '-')
@@ -275,7 +270,7 @@ def get_os_version():
os_version = list(platform.mac_ver())
os_version[1] = '' # versioninfo always empty.
elif distro:
- os_version = distro.linux_distribution()
+ os_version = (distro.name(), distro.version(), distro.codename())
else:
os_version = (platform.release(),)
@@ -441,22 +436,22 @@ def fsize(fsize_b, precision=1, shortform=False):
"""
- if fsize_b >= 1024 ** 4:
+ if fsize_b >= 1024**4:
return '%.*f %s' % (
precision,
- fsize_b / 1024 ** 4,
+ fsize_b / 1024**4,
tib_txt_short if shortform else tib_txt,
)
- elif fsize_b >= 1024 ** 3:
+ elif fsize_b >= 1024**3:
return '%.*f %s' % (
precision,
- fsize_b / 1024 ** 3,
+ fsize_b / 1024**3,
gib_txt_short if shortform else gib_txt,
)
- elif fsize_b >= 1024 ** 2:
+ elif fsize_b >= 1024**2:
return '%.*f %s' % (
precision,
- fsize_b / 1024 ** 2,
+ fsize_b / 1024**2,
mib_txt_short if shortform else mib_txt,
)
elif fsize_b >= 1024:
@@ -508,28 +503,28 @@ def fspeed(bps, precision=1, shortform=False):
"""
- if bps < 1024 ** 2:
+ if bps < 1024**2:
return '%.*f %s' % (
precision,
bps / 1024,
_('K/s') if shortform else _('KiB/s'),
)
- elif bps < 1024 ** 3:
+ elif bps < 1024**3:
return '%.*f %s' % (
precision,
- bps / 1024 ** 2,
+ bps / 1024**2,
_('M/s') if shortform else _('MiB/s'),
)
- elif bps < 1024 ** 4:
+ elif bps < 1024**4:
return '%.*f %s' % (
precision,
- bps / 1024 ** 3,
+ bps / 1024**3,
_('G/s') if shortform else _('GiB/s'),
)
else:
return '%.*f %s' % (
precision,
- bps / 1024 ** 4,
+ bps / 1024**4,
_('T/s') if shortform else _('TiB/s'),
)
@@ -552,9 +547,9 @@ def fpeer(num_peers, total_peers):
"""
if total_peers > -1:
- return '{:d} ({:d})'.format(num_peers, total_peers)
+ return f'{num_peers:d} ({total_peers:d})'
else:
- return '{:d}'.format(num_peers)
+ return f'{num_peers:d}'
def ftime(secs):
@@ -580,17 +575,17 @@ def ftime(secs):
if secs <= 0:
time_str = ''
elif secs < 60:
- time_str = '{}s'.format(secs)
+ time_str = f'{secs}s'
elif secs < 3600:
- time_str = '{}m {}s'.format(secs // 60, secs % 60)
+ time_str = f'{secs // 60}m {secs % 60}s'
elif secs < 86400:
- time_str = '{}h {}m'.format(secs // 3600, secs // 60 % 60)
+ time_str = f'{secs // 3600}h {secs // 60 % 60}m'
elif secs < 604800:
- time_str = '{}d {}h'.format(secs // 86400, secs // 3600 % 24)
+ time_str = f'{secs // 86400}d {secs // 3600 % 24}h'
elif secs < 31449600:
- time_str = '{}w {}d'.format(secs // 604800, secs // 86400 % 7)
+ time_str = f'{secs // 604800}w {secs // 86400 % 7}d'
else:
- time_str = '{}y {}w'.format(secs // 31449600, secs // 604800 % 52)
+ time_str = f'{secs // 31449600}y {secs // 604800 % 52}w'
return time_str
@@ -644,17 +639,17 @@ def tokenize(text):
size_units = [
{'prefix': 'b', 'divider': 1, 'singular': 'byte', 'plural': 'bytes'},
- {'prefix': 'KiB', 'divider': 1024 ** 1},
- {'prefix': 'MiB', 'divider': 1024 ** 2},
- {'prefix': 'GiB', 'divider': 1024 ** 3},
- {'prefix': 'TiB', 'divider': 1024 ** 4},
- {'prefix': 'PiB', 'divider': 1024 ** 5},
- {'prefix': 'KB', 'divider': 1000 ** 1},
- {'prefix': 'MB', 'divider': 1000 ** 2},
- {'prefix': 'GB', 'divider': 1000 ** 3},
- {'prefix': 'TB', 'divider': 1000 ** 4},
- {'prefix': 'PB', 'divider': 1000 ** 5},
- {'prefix': 'm', 'divider': 1000 ** 2},
+ {'prefix': 'KiB', 'divider': 1024**1},
+ {'prefix': 'MiB', 'divider': 1024**2},
+ {'prefix': 'GiB', 'divider': 1024**3},
+ {'prefix': 'TiB', 'divider': 1024**4},
+ {'prefix': 'PiB', 'divider': 1024**5},
+ {'prefix': 'KB', 'divider': 1000**1},
+ {'prefix': 'MB', 'divider': 1000**2},
+ {'prefix': 'GB', 'divider': 1000**3},
+ {'prefix': 'TB', 'divider': 1000**4},
+ {'prefix': 'PB', 'divider': 1000**5},
+ {'prefix': 'm', 'divider': 1000**2},
]
@@ -712,6 +707,9 @@ def is_url(url):
True
"""
+ if not url:
+ return False
+
return url.partition('://')[0] in ('http', 'https', 'ftp', 'udp')
@@ -726,6 +724,9 @@ def is_infohash(infohash):
bool: True if valid infohash, False otherwise.
"""
+ if not infohash:
+ return False
+
return len(infohash) == 40 and infohash.isalnum()
@@ -733,6 +734,8 @@ MAGNET_SCHEME = 'magnet:?'
XT_BTIH_PARAM = 'xt=urn:btih:'
DN_PARAM = 'dn='
TR_PARAM = 'tr='
+TR_TIER_PARAM = 'tr.'
+TR_TIER_REGEX = re.compile(r'^tr.(\d+)=(\S+)')
def is_magnet(uri):
@@ -775,8 +778,6 @@ def get_magnet_info(uri):
"""
- tr0_param = 'tr.'
- tr0_param_regex = re.compile(r'^tr.(\d+)=(\S+)')
if not uri.startswith(MAGNET_SCHEME):
return {}
@@ -804,12 +805,14 @@ def get_magnet_info(uri):
tracker = unquote_plus(param[len(TR_PARAM) :])
trackers[tracker] = tier
tier += 1
- elif param.startswith(tr0_param):
- try:
- tier, tracker = re.match(tr0_param_regex, param).groups()
- trackers[tracker] = tier
- except AttributeError:
- pass
+ elif param.startswith(TR_TIER_PARAM):
+ tracker_match = re.match(TR_TIER_REGEX, param)
+ if not tracker_match:
+ continue
+
+ tier, tracker = tracker_match.groups()
+ tracker = unquote_plus(tracker)
+ trackers[tracker] = int(tier)
if info_hash:
if not name:
@@ -904,6 +907,29 @@ def free_space(path):
return disk_data.f_bavail * block_size
+def is_interface(interface):
+ """Check if interface is a valid IP or network adapter.
+
+ Args:
+ interface (str): The IP or interface name to test.
+
+ Returns:
+ bool: Whether interface is valid is not.
+
+ Examples:
+ Windows:
+ >>> is_interface('{7A30AE62-23ZA-3744-Z844-A5B042524871}')
+ >>> is_interface('127.0.0.1')
+ True
+ Linux:
+ >>> is_interface('lo')
+ >>> is_interface('127.0.0.1')
+ True
+
+ """
+ return is_ip(interface) or is_interface_name(interface)
+
+
def is_ip(ip):
"""A test to see if 'ip' is a valid IPv4 or IPv6 address.
@@ -939,15 +965,12 @@ def is_ipv4(ip):
"""
- import socket
-
try:
- if windows_check():
- return socket.inet_aton(ip)
- else:
- return socket.inet_pton(socket.AF_INET, ip)
- except socket.error:
+ socket.inet_pton(socket.AF_INET, ip)
+ except OSError:
return False
+ else:
+ return True
def is_ipv6(ip):
@@ -966,23 +989,51 @@ def is_ipv6(ip):
"""
try:
- import ipaddress
- except ImportError:
- import socket
-
- try:
- return socket.inet_pton(socket.AF_INET6, ip)
- except (socket.error, AttributeError):
- if windows_check():
- log.warning('Unable to verify IPv6 Address on Windows.')
- return True
+ socket.inet_pton(socket.AF_INET6, ip)
+ except OSError:
+ return False
else:
+ return True
+
+
+def is_interface_name(name):
+ """Returns True if an interface name exists.
+
+ Args:
+ name (str): The Interface to test. eg. eth0 linux. GUID on Windows.
+
+ Returns:
+ bool: Whether name is valid or not.
+
+ Examples:
+ >>> is_interface_name("eth0")
+ True
+ >>> is_interface_name("{7A30AE62-23ZA-3744-Z844-A5B042524871}")
+ True
+
+ """
+
+ if not windows_check():
try:
- return ipaddress.IPv6Address(decode_bytes(ip))
- except ipaddress.AddressValueError:
+ socket.if_nametoindex(name)
+ except OSError:
pass
+ else:
+ return True
+
+ if ifaddr:
+ try:
+ adapters = ifaddr.get_adapters()
+ except OSError:
+ return True
+ else:
+ return any([name == a.name for a in adapters])
+
+ if windows_check():
+ regex = '^{[0-9A-Z]{8}-([0-9A-Z]{4}-){3}[0-9A-Z]{12}}$'
+ return bool(re.search(regex, str(name)))
- return False
+ return True
def decode_bytes(byte_str, encoding='utf8'):
@@ -1060,7 +1111,7 @@ def utf8_encode_structure(data):
@functools.total_ordering
-class VersionSplit(object):
+class VersionSplit:
"""
Used for comparing version numbers.
@@ -1239,11 +1290,7 @@ def set_env_variable(name, value):
http://sourceforge.net/p/gramps/code/HEAD/tree/branches/maintenance/gramps32/src/TransUtils.py
"""
# Update Python's copy of the environment variables
- try:
- os.environ[name] = value
- except UnicodeEncodeError:
- # Python 2
- os.environ[name] = value.encode('utf8')
+ os.environ[name] = value
if windows_check():
from ctypes import cdll, windll
@@ -1262,56 +1309,13 @@ def set_env_variable(name, value):
)
# Update the copy maintained by msvcrt (used by gtk+ runtime)
- result = cdll.msvcrt._wputenv('%s=%s' % (name, value))
+ result = cdll.msvcrt._wputenv(f'{name}={value}')
if result != 0:
log.info("Failed to set Env Var '%s' (msvcrt._putenv)", name)
else:
log.debug("Set Env Var '%s' to '%s' (msvcrt._putenv)", name, value)
-def unicode_argv():
- """ Gets sys.argv as list of unicode objects on any platform."""
- if windows_check():
- # Versions 2.x of Python don't support Unicode in sys.argv on
- # Windows, with the underlying Windows API instead replacing multi-byte
- # characters with '?'.
- from ctypes import POINTER, byref, c_int, cdll, windll
- from ctypes.wintypes import LPCWSTR, LPWSTR
-
- get_cmd_linew = cdll.kernel32.GetCommandLineW
- get_cmd_linew.argtypes = []
- get_cmd_linew.restype = LPCWSTR
-
- cmdline_to_argvw = windll.shell32.CommandLineToArgvW
- cmdline_to_argvw.argtypes = [LPCWSTR, POINTER(c_int)]
- cmdline_to_argvw.restype = POINTER(LPWSTR)
-
- cmd = get_cmd_linew()
- argc = c_int(0)
- argv = cmdline_to_argvw(cmd, byref(argc))
- if argc.value > 0:
- # Remove Python executable and commands if present
- start = argc.value - len(sys.argv)
- return [argv[i] for i in range(start, argc.value)]
- else:
- # On other platforms, we have to find the likely encoding of the args and decode
- # First check if sys.stdout or stdin have encoding set
- encoding = getattr(sys.stdout, 'encoding') or getattr(sys.stdin, 'encoding')
- # If that fails, check what the locale is set to
- encoding = encoding or locale.getpreferredencoding()
- # As a last resort, just default to utf-8
- encoding = encoding or 'utf-8'
-
- arg_list = []
- for arg in sys.argv:
- try:
- arg_list.append(arg.decode(encoding))
- except AttributeError:
- arg_list.append(arg)
-
- return arg_list
-
-
def run_profiled(func, *args, **kwargs):
"""
Profile a function with cProfile