summaryrefslogtreecommitdiffstats
path: root/deluge/ui/console/cmdline/command.py
blob: 40edd78f08a8b547f50cdad252a4931f4b6cbd9f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
#
# Copyright (C) 2008-2009 Ido Abramovich <ido.deluge@gmail.com>
# Copyright (C) 2009 Andrew Resch <andrewresch@gmail.com>
# Copyright (C) 2011 Nick Lanham <nick@afternight.org>
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#

import logging
import shlex

from twisted.internet import defer

from deluge.ui.client import client
from deluge.ui.console.parser import OptionParser, OptionParserError
from deluge.ui.console.utils.colors import strip_colors

log = logging.getLogger(__name__)


class Commander:
    """Parse console command lines and dispatch them to command objects.

    Args:
        cmds (dict): Command objects keyed by the name typed on the command line.
        interactive (bool): If True, command help is emitted through ``write``
            instead of the parser's own ``print_help``.

    """

    def __init__(self, cmds, interactive=False):
        self._commands = cmds
        self.interactive = interactive

    def write(self, line):
        """Print ``line`` to stdout after passing it through ``strip_colors``."""
        print(strip_colors(line))

    def do_command(self, cmd_line):
        """Run a console command.

        Args:
            cmd_line (str): Console command.

        Returns:
            The value returned by ``exec_command`` when the line parsed
            successfully, otherwise an already-fired Deferred carrying None.

        """
        options = self.parse_command(cmd_line)
        if options:
            return self.exec_command(options)
        return defer.succeed(None)

    def exit(self, status=0, msg=None):
        """Flag this commander as exited and optionally print a message.

        Args:
            status (int): Accepted for call compatibility; not used here.
            msg (str): Printed to stdout when non-empty.

        """
        self._exit = True
        if msg:
            print(msg)

    def _not_connected_commands(self):
        """Return the command names (and aliases) usable without a daemon.

        Commands from the fixed list that are not registered are skipped
        when collecting aliases, so a partial command table does not raise.

        Returns:
            list: ``help``, ``connect`` and ``quit`` followed by the aliases
            of whichever of them are registered.

        """
        names = ['help', 'connect', 'quit']
        aliases = []
        for name in names:
            try:
                command = self._commands[name]
            except KeyError:
                continue
            aliases.extend(command.aliases)
        return names + aliases

    def parse_command(self, cmd_line):
        """Parse a console command and process with argparse.

        Args:
            cmd_line (str): Console command.

        Returns:
            argparse.Namespace: The parsed command, with ``command`` set to the
            name that was typed. None is returned when the line is empty, the
            command is unknown, the arguments cannot be split or parsed, no
            daemon is connected for a command that needs one, or the parser
            carries a truthy ``_exit`` attribute.

        """
        if not cmd_line:
            return None
        cmd, _sep, line = cmd_line.partition(' ')
        try:
            parser = self._commands[cmd].create_parser()
        except KeyError:
            self.write('{!error!}Unknown command: %s' % cmd)
            return None

        try:
            args = [cmd] + self._commands[cmd].split(line)
        except ValueError as ex:
            self.write('{!error!}Error parsing command: %s' % ex)
            return None

        # Do a little hack here to print 'command --help' properly:
        # keep the real print_help around and route help through write()
        # when running interactively.
        parser._print_help = parser.print_help

        def print_help(f=None):
            if self.interactive:
                self.write(parser.format_help())
            else:
                parser._print_help(f)

        parser.print_help = print_help

        # Only these commands can be run when not connected to a daemon
        if not client.connected() and cmd not in self._not_connected_commands():
            self.write(
                '{!error!}Not connected to a daemon, please use the connect command first.'
            )
            return None

        try:
            options = parser.parse_args(args=args)
            options.command = cmd
        except TypeError as ex:
            self.write('{!error!}Error parsing options: %s' % ex)
            import traceback

            self.write('%s' % traceback.format_exc())
            return None
        except OptionParserError as ex:
            log.warning('Error parsing command "%s":  %s', args, ex)
            self.write('{!error!} %s' % ex)
            parser.print_help()
            return None

        # NOTE(review): presumably the parser sets `_exit` when it already
        # handled the request itself (e.g. --help) -- confirm in OptionParser.
        if getattr(parser, '_exit', False):
            return None
        return options

    def exec_command(self, options, *args):
        """Execute a console command.

        Args:
            options (argparse.Namespace): The command to execute; its
                ``command`` attribute selects the handler.
            *args: Ignored.

        Returns:
            Whatever the command's ``handle`` returns. If ``handle`` raises,
            the error and traceback are written out and an already-fired
            Deferred carrying True is returned instead.

        """
        try:
            ret = self._commands[options.command].handle(options)
        except Exception as ex:  # pylint: disable=broad-except
            self.write('{!error!} %s' % ex)
            log.exception(ex)
            import traceback

            self.write('%s' % traceback.format_exc())
            return defer.succeed(True)
        else:
            return ret


class BaseCommand:
    # Base class for console commands.
    #
    # NOTE: deliberately no class docstring here. `__doc__` is read at runtime
    # as the command description/help text (see `description`,
    # `create_parser` and `add_subparser`), so adding one would change output.

    # Optional usage string passed to the parser; omitted when falsy.
    usage = None
    interactive_only = False
    # Alternative names for the command. Subclasses should override rather
    # than mutate this shared list.
    aliases = []
    _name = 'base'
    # Text passed to the parser as its epilog.
    epilog = ''

    def complete(self, text, *args):
        """Return completion candidates for ``text``; the base has none."""
        return []

    def handle(self, options):
        """Execute the command with parsed ``options``; no-op in the base."""
        pass

    @property
    def name(self):
        """The primary command name."""
        return self._name

    @property
    def name_with_alias(self):
        """The command name and its aliases joined with ``/``."""
        return '/'.join([self._name, *self.aliases])

    @property
    def description(self):
        """The command description, taken from the class docstring."""
        return self.__doc__

    def split(self, text):
        """Split an argument string into a list of arguments.

        Backslashes are doubled before tokenizing so that ``shlex`` keeps
        them as literal characters, then any ``\\ `` sequence left inside a
        token is turned back into a plain space. Empty tokens are dropped.

        Args:
            text (str): The argument portion of a command line.

        Returns:
            list: The non-empty argument strings.

        Raises:
            ValueError: Propagated from ``shlex.split`` on malformed input.

        """
        escaped = text.replace('\\', '\\\\')
        tokens = (token.replace(r'\ ', ' ') for token in shlex.split(escaped))
        return [token for token in tokens if token != '']

    def create_parser(self):
        """Build a standalone parser for this command.

        Returns:
            OptionParser: Parser with a positional slot for the command name
            and the arguments registered by ``add_arguments``.

        """
        opts = {
            'prog': self.name_with_alias,
            'description': self.__doc__,
            'epilog': self.epilog,
        }
        if self.usage:
            opts['usage'] = self.usage
        parser = OptionParser(**opts)
        # The command name itself is the first item of the parsed args.
        parser.add_argument(self.name, metavar='')
        parser.base_parser = parser
        self.add_arguments(parser)
        return parser

    def add_subparser(self, subparsers):
        """Register this command under its next unregistered name.

        Each call adds at most one sub-parser: the first of the sorted
        name/aliases that ``subparsers`` does not know yet. When every name
        is already registered the call does nothing.

        Args:
            subparsers: The object returned by ``add_subparsers``.

        """
        opts = {
            'prog': self.name_with_alias,
            'help': self.__doc__,
            'description': self.__doc__,
        }
        if self.usage:
            opts['usage'] = self.usage

        # A workaround for aliases showing as duplicate command names in help output.
        for cmd_name in sorted([self.name, *self.aliases]):
            if cmd_name in subparsers._name_parser_map:
                continue
            if cmd_name in self.aliases:
                # Translate the template first, then interpolate, so the
                # catalogue lookup sees the constant msgid.
                opts['help'] = _('`%s` alias') % self.name
            parser = subparsers.add_parser(cmd_name, **opts)
            break
        else:
            # Nothing left to register; there is no new parser to populate.
            return

        self.add_arguments(parser)

    def add_arguments(self, parser):
        """Hook for subclasses to add their arguments to ``parser``."""
        pass