summaryrefslogtreecommitdiffstats
path: root/deluge/core/alertmanager.py
diff options
context:
space:
mode:
Diffstat (limited to 'deluge/core/alertmanager.py')
-rw-r--r--deluge/core/alertmanager.py129
1 files changed, 90 insertions, 39 deletions
diff --git a/deluge/core/alertmanager.py b/deluge/core/alertmanager.py
index 9a1ded52e..cf541f015 100644
--- a/deluge/core/alertmanager.py
+++ b/deluge/core/alertmanager.py
@@ -14,10 +14,15 @@ This should typically only be used by the Core. Plugins should utilize the
`:mod:EventManager` for similar functionality.
"""
+import contextlib
import logging
-from types import SimpleNamespace
+import threading
+import time
+from collections import defaultdict
+from functools import partial
+from typing import Any, Callable
-from twisted.internet import reactor
+from twisted.internet import reactor, task, threads
import deluge.component as component
from deluge._libtorrent import lt
@@ -31,7 +36,7 @@ class AlertManager(component.Component):
def __init__(self):
log.debug('AlertManager init...')
- component.Component.__init__(self, 'AlertManager', interval=0.3)
+ component.Component.__init__(self, 'AlertManager')
self.session = component.get('Core').session
# Increase the alert queue size so that alerts don't get lost.
@@ -52,48 +57,88 @@ class AlertManager(component.Component):
self.session.apply_settings({'alert_mask': alert_mask})
# handlers is a dictionary of lists {"alert_type": [handler1,h2,..]}
- self.handlers = {}
+ self.handlers = defaultdict(list)
+ self.handlers_timeout_secs = 2
self.delayed_calls = []
+ self._event = threading.Event()
def update(self):
- self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()]
- self.handle_alerts()
+ pass
+
+ def start(self):
+ thread = threading.Thread(
+ target=self.wait_for_alert_in_thread, name='alert-poller', daemon=True
+ )
+ thread.start()
+ self._event.set()
def stop(self):
+ self.cancel_delayed_calls()
+
+ def pause(self):
+ self._event.clear()
+
+ def resume(self):
+ self._event.set()
+
+ def wait_for_alert_in_thread(self):
+ while self._component_state not in ('Stopping', 'Stopped'):
+ if self.check_delayed_calls():
+ time.sleep(0.05)
+ continue
+
+ if self.session.wait_for_alert(1000) is None:
+ continue
+ if self._event.wait():
+ threads.blockingCallFromThread(reactor, self.maybe_handle_alerts)
+
+ def on_delayed_call_timeout(self, result, timeout, **kwargs):
+ log.warning('Alert handler was timed-out before being called %s', kwargs)
+
+ def cancel_delayed_calls(self):
+ """Cancel all delayed handlers."""
for delayed_call in self.delayed_calls:
- if delayed_call.active():
- delayed_call.cancel()
+ delayed_call.cancel()
self.delayed_calls = []
- def register_handler(self, alert_type, handler):
+ def check_delayed_calls(self) -> bool:
+ """Returns True if any handler calls are delayed."""
+ self.delayed_calls = [dc for dc in self.delayed_calls if not dc.called]
+ return len(self.delayed_calls) > 0
+
+ def maybe_handle_alerts(self) -> None:
+ if self._component_state != 'Started':
+ return
+
+ self.handle_alerts()
+
+ def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None:
"""
Registers a function that will be called when 'alert_type' is pop'd
in handle_alerts. The handler function should look like: handler(alert)
Where 'alert' is the actual alert object from libtorrent.
- :param alert_type: str, this is string representation of the alert name
- :param handler: func(alert), the function to be called when the alert is raised
+ Args:
+ alert_type: String representation of the libtorrent alert name.
+ Can be supplied with or without `_alert` suffix.
+ handler: Callback function when the alert is raised.
"""
- if alert_type not in self.handlers:
- # There is no entry for this alert type yet, so lets make it with an
- # empty list.
- self.handlers[alert_type] = []
+ if alert_type and alert_type.endswith('_alert'):
+ alert_type = alert_type[: -len('_alert')]
- # Append the handler to the list in the handlers dictionary
self.handlers[alert_type].append(handler)
log.debug('Registered handler for alert %s', alert_type)
- def deregister_handler(self, handler):
+ def deregister_handler(self, handler: Callable[[Any], None]):
"""
- De-registers the `:param:handler` function from all alert types.
+ De-registers the `handler` function from all alert types.
- :param handler: func, the handler function to deregister
+ Args:
+ handler: The handler function to deregister.
"""
- # Iterate through all handlers and remove 'handler' where found
- for (dummy_key, value) in self.handlers.items():
- if handler in value:
- # Handler is in this alert type list
- value.remove(handler)
+ for alert_type_handlers in self.handlers.values():
+ with contextlib.suppress(ValueError):
+ alert_type_handlers.remove(handler)
def handle_alerts(self):
"""
@@ -112,26 +157,32 @@ class AlertManager(component.Component):
num_alerts,
)
- # Loop through all alerts in the queue
for alert in alerts:
- alert_type = type(alert).__name__
+ alert_type = alert.what()
+
# Display the alert message
if log.isEnabledFor(logging.DEBUG):
log.debug('%s: %s', alert_type, decode_bytes(alert.message()))
+
+ if alert_type not in self.handlers:
+ continue
+
# Call any handlers for this alert type
- if alert_type in self.handlers:
- for handler in self.handlers[alert_type]:
- if log.isEnabledFor(logging.DEBUG):
- log.debug('Handling alert: %s', alert_type)
- # Copy alert attributes
- alert_copy = SimpleNamespace(
- **{
- attr: getattr(alert, attr)
- for attr in dir(alert)
- if not attr.startswith('__')
- }
- )
- self.delayed_calls.append(reactor.callLater(0, handler, alert_copy))
+ for handler in self.handlers[alert_type]:
+ if log.isEnabledFor(logging.DEBUG):
+ log.debug('Handling alert: %s', alert_type)
+ d = task.deferLater(reactor, 0, handler, alert)
+ on_handler_timeout = partial(
+ self.on_delayed_call_timeout,
+ handler=handler.__qualname__,
+ alert_type=alert_type,
+ )
+ d.addTimeout(
+ self.handlers_timeout_secs,
+ reactor,
+ onTimeoutCancel=on_handler_timeout,
+ )
+ self.delayed_calls.append(d)
def set_alert_queue_size(self, queue_size):
"""Sets the maximum size of the libtorrent alert queue"""