The object will need to be registered with an :class:`RPCServer` to be effective. :param func: the function to export :type func: function :param auth_level: the auth level required to call this method :type auth_level: int """ def wrap(func, *args, **kwargs): func._rpcserver_export = True func._rpcserver_auth_level = auth_level doc = func.__doc__ func.__doc__ = "**RPC Exported Function** (*Auth Level: %s*)\n\n" % auth_level if doc: func.__doc__ += doc return func if type(auth_level) is FunctionType: func = auth_level auth_level = AUTH_LEVEL_DEFAULT return wrap(func) else: return wrap def format_request(call): """ Format the RPCRequest message for debug printing :param call: the request :type call: a RPCRequest :returns: a formatted string for printing :rtype: str """ try: s = call[1] + "(" if call[2]: s += ", ".join([str(x) for x in call[2]]) if call[3]: if call[2]: s += ", " s += ", ".join([key + "=" + str(value) for key, value in call[3].items()]) s += ")" except UnicodeEncodeError: return "UnicodeEncodeError, call: %s" % call else: return s class ServerContextFactory(object): def getContext(self): """ Create an SSL context. This loads the servers cert/private key SSL files for use with the SSL transport. """ ssl_dir = deluge.configmanager.get_config_dir("ssl") ctx = SSL.Context(SSL.SSLv3_METHOD) ctx.use_certificate_file(os.path.join(ssl_dir, "daemon.cert")) ctx.use_privatekey_file(os.path.join(ssl_dir, "daemon.pkey")) return ctx class DelugeRPCProtocol(DelugeTransferProtocol): def message_received(self, request): """ This method is called whenever a message is received from a client. The only message that a client sends to the server is a RPC Request message. If the RPC Request message is valid, then the method is called in :meth:`dispatch`. :param request: the request from the client. :type data: tuple """ if type(request) is not tuple: log.debug("Received invalid message: type is not tuple") return if len(request) < 1: log.debug("Received invalid message: there are no items") return for call in request: if len(call) != 4: log.debug("Received invalid rpc request: number of items " "in request is %s", len(call)) continue #log.debug("RPCRequest: %s", format_request(call)) reactor.callLater(0, self.dispatch, *call) def sendData(self, data): """ Sends the data to the client. :param data: the object that is to be sent to the client. This should be one of the RPC message types. :type data: object """ self.transfer_message(data) def connectionMade(self): """ This method is called when a new client connects. """ peer = self.transport.getPeer() log.info("Deluge Client connection made from: %s:%s", peer.host, peer.port) # Set the initial auth level of this session to AUTH_LEVEL_NONE self.factory.authorized_sessions[self.transport.sessionno] = AUTH_LEVEL_NONE def connectionLost(self, reason): """ This method is called when the client is disconnected. :param reason: the reason the client disconnected. :type reason: str """ # We need to remove this session from various dicts del self.factory.authorized_sessions[self.transport.sessionno] if self.transport.sessionno in self.factory.session_protocols: del self.factory.session_protocols[self.transport.sessionno] if self.transport.sessionno in self.factory.interested_events: del self.factory.interested_events[self.transport.sessionno] log.info("Deluge client disconnected: %s", reason.value) def valid_session(self): return self.transport.sessionno in self.factory.authorized_sessions def dispatch(self, request_id, method, args, kwargs): """ This method is run when a RPC Request is made. It will run the local method and will send either a RPC Response or RPC Error back to the client. :param request_id: the request_id from the client (sent in the RPC Request) :type request_id: int :param method: the local method to call. It must be registered with the :class:`RPCServer`. :type method: str :param args: the arguments to pass to `method` :type args: list :param kwargs: the keyword-arguments to pass to `method` :type kwargs: dict """ def sendError(): """ Sends an error response with the contents of the exception that was raised. """ exceptionType, exceptionValue, exceptionTraceback = sys.exc_info() formated_tb = "".join(traceback.format_tb(exceptionTraceback)) try: self.sendData(( RPC_ERROR, request_id, exceptionType.__name__, exceptionValue._args, exceptionValue._kwargs, formated_tb )) except Exception, err: # This most likely not a deluge exception, let's wrap it log.error("An exception occurred while sending RPC_ERROR to " "client. Wrapping it and resending. Error to " "send(causing exception goes next):\n%s", formated_tb) log.exception(err) try: raise WrappedException(str(exceptionValue), exceptionType.__name__, formated_tb) except: sendError() if method == "daemon.info": # This is a special case and used in the initial connection process self.sendData((RPC_RESPONSE, request_id, deluge.common.get_version())) return elif method == "daemon.login": # This is a special case and used in the initial connection process # We need to authenticate the user here log.debug("RPC dispatch daemon.login") try: client_version = kwargs.pop('client_version', None) if client_version is None: raise IncompatibleClient(deluge.common.get_version()) ret = component.get("AuthManager").authorize(*args, **kwargs) if ret: self.factory.authorized_sessions[self.transport.sessionno] = (ret, args[0]) self.factory.session_protocols[self.transport.sessionno] = self except Exception, e: sendError() if not isinstance(e, _ClientSideRecreateError): log.exception(e) else: self.sendData((RPC_RESPONSE, request_id, (ret))) if not ret: self.transport.loseConnection() finally: return elif method == "daemon.set_event_interest" and self.valid_session(): log.debug("RPC dispatch daemon.set_event_interest") # This special case is to allow clients to set which events they are # interested in receiving. # We are expecting a sequence from the client. try: if self.transport.sessionno not in self.factory.interested_events: self.factory.interested_events[self.transport.sessionno] = [] self.factory.interested_events[self.transport.sessionno].extend(args[0]) except Exception, e: sendError() else: self.sendData((RPC_RESPONSE, request_id, (True))) finally: return if method in self.factory.methods and self.valid_session(): log.debug("RPC dispatch %s", method) try: method_auth_requirement = self.factory.methods[method]._rpcserver_auth_level auth_level = self.factory.authorized_sessions[self.transport.sessionno][0] if auth_level < method_auth_requirement: # This session is not allowed to call this method log.debug("Session %s is trying to call a method it is not " "authorized to call!", self.transport.sessionno) raise NotAuthorizedError(auth_level, method_auth_requirement) # Set the session_id in the factory so that methods can know # which session is calling it. self.factory.session_id = self.transport.sessionno ret = self.factory.methods[method](*args, **kwargs) except Exception, e: sendError() # Don't bother printing out DelugeErrors, because they are just # for the client if not isinstance(e, DelugeError): log.exception("Exception calling RPC request: %s", e) else: # Check if the return value is a deferred, since we'll need to # wait for it to fire before sending the RPC_RESPONSE if isinstance(ret, defer.Deferred): def on_success(result): self.sendData((RPC_RESPONSE, request_id, result)) return result def on_fail(failure): try: failure.raiseException() except Exception, e: sendError() return failure ret.addCallbacks(on_success, on_fail) else: self.sendData((RPC_RESPONSE, request_id, ret)) class RPCServer(component.Component): """ This class is used to handle rpc requests from the client. Objects are registered with this class and their methods are exported using the export decorator. :param port: the port the RPCServer will listen on :type port: int :param interface: the interface to listen on, this may override the `allow_remote` setting :type interface: str :param allow_remote: set True if the server should allow remote connections :type allow_remote: bool :param listen: if False, will not start listening.. This is only useful in Classic Mode :type listen: bool """ def __init__(self, port=58846, interface="", allow_remote=False, listen=True): component.Component.__init__(self, "RPCServer") self.factory = Factory() self.factory.protocol = DelugeRPCProtocol self.factory.session_id = -1 # Holds the registered methods self.factory.methods = {} # Holds the session_ids and auth levels self.factory.authorized_sessions = {} # Holds the protocol objects with the session_id as key self.factory.session_protocols = {} # Holds the interested event list for the sessions self.factory.interested_events = {} self.listen = listen if not listen: return if allow_remote: hostname = "" else: hostname = "localhost" if interface: hostname = interface log.info("Starting DelugeRPC server %s:%s", hostname, port) # Check for SSL keys and generate some if needed check_ssl_keys() try: reactor.listenSSL(port, self.factory, ServerContextFactory(), interface=hostname) except Exception, e: log.info("Daemon already running or port not available..") log.error(e) sys.exit(0) def register_object(self, obj, name=None): """ Registers an object to export it's rpc methods. These methods should be exported with the export decorator prior to registering the object. :param obj: the object that we want to export :type obj: object :param name: the name to use, if None, it will be the class name of the object :type name: str """ if not name: name = obj.__class__.__name__.lower() for d in dir(obj): if d[0] == "_": continue if getattr(getattr(obj, d), '_rpcserver_export', False): log.debug("Registering method: %s", name + "." + d) self.factory.methods[name + "." + d] = getattr(obj, d) def deregister_object(self, obj): """ Deregisters an objects exported rpc methods. :param obj: the object that was previously registered """ for key, value in self.factory.methods.items(): if value.im_self == obj: del self.factory.methods[key] def get_object_method(self, name): """ Returns a registered method. :param name: the name of the method, usually in the form of 'object.method' :type name: str :returns: method :raises KeyError: if `name` is not registered """ return self.factory.methods[name] def get_method_list(self): """ Returns a list of the exported methods. :returns: the exported methods :rtype: list """ return self.factory.methods.keys() def get_session_id(self): """ Returns the session id of the current RPC. :returns: the session id, this will be -1 if no connections have been made :rtype: int """ return self.factory.session_id def get_session_user(self): """ Returns the username calling the current RPC. :returns: the username of the user calling the current RPC :rtype: string """ if not self.listen: return "localclient" session_id = self.get_session_id() if session_id > -1 and session_id in self.factory.authorized_sessions: return self.factory.authorized_sessions[session_id][1] else: # No connections made yet return "" def get_session_auth_level(self): """ Returns the auth level of the user calling the current RPC. :returns: the auth level :rtype: int """ if not self.listen or not self.is_session_valid(self.get_session_id()): return AUTH_LEVEL_ADMIN return self.factory.authorized_sessions[self.get_session_id()][0] def get_rpc_auth_level(self, rpc): """ Returns the auth level requirement for an exported rpc. :returns: the auth level :rtype: int """ self.factory.methods[rpc]._rpcserver_auth_level def is_session_valid(self, session_id): """ Checks if the session is still valid, eg, if the client is still connected. :param session_id: the session id :type session_id: int :returns: True if the session is valid :rtype: bool """ return session_id in self.factory.authorized_sessions def emit_event(self, event): """ Emits the event to interested clients. :param event: the event to emit :type event: :class:`deluge.event.DelugeEvent` """ log.debug("intevents: %s", self.factory.interested_events) # Find sessions interested in this event for session_id, interest in self.factory.interested_events.iteritems(): if event.name in interest: log.debug("Emit Event: %s %s", event.name, event.args) # This session is interested so send a RPC_EVENT self.factory.session_protocols[session_id].sendData( (RPC_EVENT, event.name, event.args) ) def emit_event_for_session_id(self, session_id, event): """ Emits the event to specified session_id. :param session_id: the event to emit :type session_id: int :param event: the event to emit :type event: :class:`deluge.event.DelugeEvent` """ if not self.is_session_valid(session_id): log.debug("Session ID %s is not valid. Not sending event \"%s\".", session_id, event.name) return if session_id not in self.factory.interested_events: log.debug("Session ID %s is not interested in any events. Not sending event \"%s\".", session_id, event.name) return if event.name not in self.factory.interested_events[session_id]: log.debug("Session ID %s is not interested in event \"%s\". Not sending it.", session_id, event.name) return log.debug("Sending event \"%s\" with args \"%s\" to session id \"%s\".", event.name, event.args, session_id) self.factory.session_protocols[session_id].sendData((RPC_EVENT, event.name, event.args)) def check_ssl_keys(): """ Check for SSL cert/key and create them if necessary """ ssl_dir = deluge.configmanager.get_config_dir("ssl") if not os.path.exists(ssl_dir): # The ssl folder doesn't exist so we need to create it os.makedirs(ssl_dir) generate_ssl_keys() else: for f in ("daemon.pkey", "daemon.cert"): if not os.path.exists(os.path.join(ssl_dir, f)): generate_ssl_keys() break def generate_ssl_keys(): """ This method generates a new SSL key/cert. """ digest = "md5" # Generate key pair pkey = crypto.PKey() pkey.generate_key(crypto.TYPE_RSA, 1024) # Generate cert request req = crypto.X509Req() subj = req.get_subject() setattr(subj, "CN", "Deluge Daemon") req.set_pubkey(pkey) req.sign(pkey, digest) # Generate certificate cert = crypto.X509() cert.set_serial_number(0) cert.gmtime_adj_notBefore(0) cert.gmtime_adj_notAfter(60*60*24*365*5) # Five Years cert.set_issuer(req.get_subject()) cert.set_subject(req.get_subject()) cert.set_pubkey(req.get_pubkey()) cert.sign(pkey, digest) # Write out files ssl_dir = deluge.configmanager.get_config_dir("ssl") open(os.path.join(ssl_dir, "daemon.pkey"), "w").write( crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey) ) open(os.path.join(ssl_dir, "daemon.cert"), "w").write( crypto.dump_certificate(crypto.FILETYPE_PEM, cert) ) # Make the files only readable by this user for f in ("daemon.pkey", "daemon.cert"): os.chmod(os.path.join(ssl_dir, f), stat.S_IREAD | stat.S_IWRITE)