summaryrefslogtreecommitdiffstats
path: root/deluge/ui/web/auth.py
blob: d631f9186b45f1a3adf2cc054e145df92a727a6a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# -*- coding: utf-8 -*-
#
# Copyright (C) 2009 Damien Churchill <damoxc@gmail.com>
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#

from __future__ import unicode_literals

import hashlib
import logging
import os
import time
from datetime import datetime, timedelta
from email.utils import formatdate

from twisted.internet.task import LoopingCall

from deluge.common import AUTH_LEVEL_ADMIN, AUTH_LEVEL_NONE
from deluge.error import NotAuthorizedError
from deluge.ui.web.json_api import JSONComponent, export

log = logging.getLogger(__name__)


def make_checksum(session_id):
    """
    Compute the checksum of a session id.

    :param session_id: the text to compute the checksum for
    :type session_id: string
    :returns: the sum of the code points of every character, 0 when empty
    :rtype: int
    """
    return sum(ord(char) for char in session_id)


def get_session_id(session_id):
    """
    Checks a session id against its checksum.

    The last four characters of the value are read as a decimal checksum and
    compared with ``make_checksum`` of the characters that precede them.

    :param session_id: the session id with its checksum appended
    :type session_id: string
    :returns: the session id with the checksum stripped, or None when the
        value is empty, cannot be parsed or the checksum does not match
    :rtype: string or None
    """
    if session_id:
        try:
            body, suffix = session_id[:-4], session_id[-4:]
            if int(suffix) == make_checksum(body):
                return body
        except Exception as ex:
            # Any failure while parsing is logged and treated as "invalid".
            log.exception(ex)
    return None


def make_expires(timeout):
    """
    Work out when something created now expires.

    :param timeout: the lifetime in seconds, counted from the current time
    :type timeout: int or float
    :returns: a tuple ``(expires, expires_str)`` where ``expires`` is the
        expiry time as returned by ``time.mktime`` and ``expires_str`` is the
        same instant formatted by ``formatdate`` as a date string in GMT
    :rtype: tuple
    """
    deadline = datetime.now() + timedelta(seconds=timeout)
    expires = time.mktime(deadline.timetuple())
    return expires, formatdate(timeval=expires, localtime=False, usegmt=True)


class Auth(JSONComponent):
    """
    The component that implements authentication into the JSON interface.

    Sessions are stored in ``config['sessions']`` keyed by session id; a
    looping call periodically removes the ones that have expired.
    """

    def __init__(self, config):
        """
        :param config: the configuration this component reads and writes its
            ``sessions``, ``session_timeout``, ``pwd_salt`` and ``pwd_sha1``
            entries in
        """
        super(Auth, self).__init__('Auth')
        self.worker = LoopingCall(self._clean_sessions)
        self.config = config

    def start(self):
        """
        Start the looping call that cleans up sessions, with an interval of 5.
        """
        self.worker.start(5)

    def stop(self):
        """
        Stop the looping call that cleans up sessions.
        """
        self.worker.stop()

    def _clean_sessions(self):
        """
        Remove every stored session that has expired or has no expiry time.
        """
        now = time.gmtime()
        # Iterate over a snapshot of the ids so entries can be deleted safely.
        for session_id in list(self.config['sessions']):
            session = self.config['sessions'][session_id]

            if 'expires' not in session or time.gmtime(session['expires']) < now:
                del self.config['sessions'][session_id]

    def _create_session(self, request, login='admin'):
        """
        Creates a new session and sets its cookie on the request.

        :param request: the HTTP request the ``_session_id`` cookie is added to
        :param login: the username of the user logging in, currently only
            stored with the session for future use
        :type login: string
        :returns: True
        :rtype: bool
        """
        m = hashlib.sha256()
        m.update(os.urandom(32))
        session_id = m.hexdigest()

        expires, expires_str = make_expires(self.config['session_timeout'])
        checksum = str(make_checksum(session_id))

        # The cookie carries the session id followed by its checksum; only the
        # bare session id is used as the key of the stored session.
        request.addCookie(
            b'_session_id',
            session_id + checksum,
            path=request.base + b'json',
            expires=expires_str,
        )

        log.debug('Creating session for %s', login)

        # Replace a list-typed sessions value with a dict before storing.
        if isinstance(self.config['sessions'], list):
            self.config['sessions'] = {}

        self.config['sessions'][session_id] = {
            'login': login,
            'level': AUTH_LEVEL_ADMIN,
            'expires': expires,
        }
        return True

    def check_password(self, password):
        """
        Check a password against the salted SHA1 hash stored in the config.

        :param password: the password to test
        :type password: string
        :returns: True if the password matches, False if it does not or if no
            password hash is stored
        :rtype: bool
        """
        config = self.config
        if 'pwd_sha1' not in config.config:
            log.debug('Failed to find config login details.')
            return False

        s = hashlib.sha1()
        s.update(config['pwd_salt'].encode('utf8'))
        s.update(password.encode('utf8'))
        return s.hexdigest() == config['pwd_sha1']

    def check_request(self, request, method=None, level=None):
        """
        Check to ensure that a request is authorised to call the specified
        method of authentication level.

        As a side effect ``request.auth_level`` and ``request.session_id`` are
        set, and a valid session has its expiry time and cookie refreshed.

        :param request: The HTTP request in question
        :type request: twisted.web.http.Request
        :param method: Check the specified method; when given, its auth level
            replaces ``level``
        :type method: function
        :param level: Check the specified auth level
        :type level: integer

        :raises NotAuthorizedError: if the request's auth level is lower than
            the required level
        :raises Exception: if the method is not exported, has no auth level,
            or neither a method nor a level was given
        """
        cookie_sess_id = request.getCookie(b'_session_id')
        session_id = None
        if cookie_sess_id:
            try:
                session_id = get_session_id(cookie_sess_id.decode())
            except UnicodeDecodeError:
                # The cookie comes from the client; bytes that cannot be
                # decoded cannot name a session, so treat it as absent.
                log.debug('Ignoring undecodable session cookie')

        if session_id not in self.config['sessions']:
            auth_level = AUTH_LEVEL_NONE
            session_id = None
        else:
            session = self.config['sessions'][session_id]
            auth_level = session['level']
            expires, expires_str = make_expires(self.config['session_timeout'])
            session['expires'] = expires

            # Send the same cookie value back with the extended expiry time.
            request.addCookie(
                b'_session_id',
                cookie_sess_id,
                path=request.base + b'json',
                expires=expires_str.encode('utf8'),
            )

        if method:
            if not hasattr(method, '_json_export'):
                raise Exception('Not an exported method')

            # Default to None so a missing attribute reports the error below
            # rather than an AttributeError.
            method_level = getattr(method, '_json_auth_level', None)
            if method_level is None:
                raise Exception('Method has no auth level')

            level = method_level

        if level is None:
            raise Exception('No level specified to check against')

        request.auth_level = auth_level
        request.session_id = session_id

        if auth_level < level:
            raise NotAuthorizedError(auth_level, level)

    def _change_password(self, new_password):
        """
        Change the password. This is to allow the UI to change/reset a
        password.

        A new random salt is generated and stored together with the salted
        SHA1 hash of the password.

        :param new_password: the password to change to
        :type new_password: string
        :returns: True
        :rtype: bool
        """
        log.debug('Changing password')
        salt = hashlib.sha1(os.urandom(32)).hexdigest()
        s = hashlib.sha1(salt.encode('utf-8'))
        s.update(new_password.encode('utf8'))
        self.config['pwd_salt'] = salt
        self.config['pwd_sha1'] = s.hexdigest()
        return True

    @export
    def change_password(self, old_password, new_password):
        """
        Change the password.

        :param old_password: the current password
        :type old_password: string
        :param new_password: the password to change to
        :type new_password: string
        :returns: True if the password was changed, False if ``old_password``
            is not the current password
        :rtype: bool
        """
        if not self.check_password(old_password):
            return False
        return self._change_password(new_password)

    @export(AUTH_LEVEL_NONE)
    def check_session(self, session_id=None):
        """
        Check a session to see if it's still valid.

        :param session_id: unused; the session of the current request is the
            one that is checked
        :returns: True if the session is valid, False if not.
        :rtype: boolean
        """
        # NOTE(review): __request__ is not defined in this file; presumably it
        # is provided by the JSON API layer for the duration of the call.
        return __request__.session_id is not None

    @export
    def delete_session(self):
        """
        Removes the session of the current request.

        :returns: True
        :rtype: bool
        """
        del self.config['sessions'][__request__.session_id]
        return True

    @export(AUTH_LEVEL_NONE)
    def login(self, password):
        """
        Test a password and, if it's valid, create a session for the request.

        :param password: the password to test
        :type password: string
        :returns: True if the password is valid and a session was created,
            False otherwise
        :rtype: bool
        """
        if self.check_password(password):
            log.info('Login success (ClientIP %s)', __request__.getClientIP())
            return self._create_session(__request__)
        else:
            log.error('Login failed (ClientIP %s)', __request__.getClientIP())
            return False