diff options
Diffstat (limited to 'deluge/core/alertmanager.py')
-rw-r--r-- | deluge/core/alertmanager.py | 129 |
1 files changed, 90 insertions, 39 deletions
diff --git a/deluge/core/alertmanager.py b/deluge/core/alertmanager.py index 9a1ded52e..cf541f015 100644 --- a/deluge/core/alertmanager.py +++ b/deluge/core/alertmanager.py @@ -14,10 +14,15 @@ This should typically only be used by the Core. Plugins should utilize the `:mod:EventManager` for similar functionality. """ +import contextlib import logging -from types import SimpleNamespace +import threading +import time +from collections import defaultdict +from functools import partial +from typing import Any, Callable -from twisted.internet import reactor +from twisted.internet import reactor, task, threads import deluge.component as component from deluge._libtorrent import lt @@ -31,7 +36,7 @@ class AlertManager(component.Component): def __init__(self): log.debug('AlertManager init...') - component.Component.__init__(self, 'AlertManager', interval=0.3) + component.Component.__init__(self, 'AlertManager') self.session = component.get('Core').session # Increase the alert queue size so that alerts don't get lost. @@ -52,48 +57,88 @@ class AlertManager(component.Component): self.session.apply_settings({'alert_mask': alert_mask}) # handlers is a dictionary of lists {"alert_type": [handler1,h2,..]} - self.handlers = {} + self.handlers = defaultdict(list) + self.handlers_timeout_secs = 2 self.delayed_calls = [] + self._event = threading.Event() def update(self): - self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()] - self.handle_alerts() + pass + + def start(self): + thread = threading.Thread( + target=self.wait_for_alert_in_thread, name='alert-poller', daemon=True + ) + thread.start() + self._event.set() def stop(self): + self.cancel_delayed_calls() + + def pause(self): + self._event.clear() + + def resume(self): + self._event.set() + + def wait_for_alert_in_thread(self): + while self._component_state not in ('Stopping', 'Stopped'): + if self.check_delayed_calls(): + time.sleep(0.05) + continue + + if self.session.wait_for_alert(1000) is None: + continue + if self._event.wait(): + threads.blockingCallFromThread(reactor, self.maybe_handle_alerts) + + def on_delayed_call_timeout(self, result, timeout, **kwargs): + log.warning('Alert handler was timed-out before being called %s', kwargs) + + def cancel_delayed_calls(self): + """Cancel all delayed handlers.""" for delayed_call in self.delayed_calls: - if delayed_call.active(): - delayed_call.cancel() + delayed_call.cancel() self.delayed_calls = [] - def register_handler(self, alert_type, handler): + def check_delayed_calls(self) -> bool: + """Returns True if any handler calls are delayed.""" + self.delayed_calls = [dc for dc in self.delayed_calls if not dc.called] + return len(self.delayed_calls) > 0 + + def maybe_handle_alerts(self) -> None: + if self._component_state != 'Started': + return + + self.handle_alerts() + + def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None: """ Registers a function that will be called when 'alert_type' is pop'd in handle_alerts. The handler function should look like: handler(alert) Where 'alert' is the actual alert object from libtorrent. - :param alert_type: str, this is string representation of the alert name - :param handler: func(alert), the function to be called when the alert is raised + Args: + alert_type: String representation of the libtorrent alert name. + Can be supplied with or without `_alert` suffix. + handler: Callback function when the alert is raised. """ - if alert_type not in self.handlers: - # There is no entry for this alert type yet, so lets make it with an - # empty list. - self.handlers[alert_type] = [] + if alert_type and alert_type.endswith('_alert'): + alert_type = alert_type[: -len('_alert')] - # Append the handler to the list in the handlers dictionary self.handlers[alert_type].append(handler) log.debug('Registered handler for alert %s', alert_type) - def deregister_handler(self, handler): + def deregister_handler(self, handler: Callable[[Any], None]): """ - De-registers the `:param:handler` function from all alert types. + De-registers the `handler` function from all alert types. - :param handler: func, the handler function to deregister + Args: + handler: The handler function to deregister. """ - # Iterate through all handlers and remove 'handler' where found - for (dummy_key, value) in self.handlers.items(): - if handler in value: - # Handler is in this alert type list - value.remove(handler) + for alert_type_handlers in self.handlers.values(): + with contextlib.suppress(ValueError): + alert_type_handlers.remove(handler) def handle_alerts(self): """ @@ -112,26 +157,32 @@ class AlertManager(component.Component): num_alerts, ) - # Loop through all alerts in the queue for alert in alerts: - alert_type = type(alert).__name__ + alert_type = alert.what() + # Display the alert message if log.isEnabledFor(logging.DEBUG): log.debug('%s: %s', alert_type, decode_bytes(alert.message())) + + if alert_type not in self.handlers: + continue + # Call any handlers for this alert type - if alert_type in self.handlers: - for handler in self.handlers[alert_type]: - if log.isEnabledFor(logging.DEBUG): - log.debug('Handling alert: %s', alert_type) - # Copy alert attributes - alert_copy = SimpleNamespace( - **{ - attr: getattr(alert, attr) - for attr in dir(alert) - if not attr.startswith('__') - } - ) - self.delayed_calls.append(reactor.callLater(0, handler, alert_copy)) + for handler in self.handlers[alert_type]: + if log.isEnabledFor(logging.DEBUG): + log.debug('Handling alert: %s', alert_type) + d = task.deferLater(reactor, 0, handler, alert) + on_handler_timeout = partial( + self.on_delayed_call_timeout, + handler=handler.__qualname__, + alert_type=alert_type, + ) + d.addTimeout( + self.handlers_timeout_secs, + reactor, + onTimeoutCancel=on_handler_timeout, + ) + self.delayed_calls.append(d) def set_alert_queue_size(self, queue_size): """Sets the maximum size of the libtorrent alert queue""" |