+# Copyright (C) 2021 Free Software Foundation, Inc.
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""urwid (terminal/curses) UI."""
+
+import email.utils
+import os
+from pathlib import Path
+import random
+import re
+import subprocess
+import tempfile
+from typing import Any, List, NamedTuple, Optional, Tuple
+
+import urwid
+
+from .. import __homepage__, __version__
+from .. import messages
+from ..control import MessageQueue
+from . import UIBase
+
+
+class HelpContents(NamedTuple):
+ """Content to show in the help page."""
+
+ name: str
+ shortcuts: Tuple[str]
+ contents: Tuple[Any] = ()
+
+
+class SlimButton(urwid.Button):
+ """A button w/out <> markers and w/hidden cursor."""
+
+ def __init__(self, label):
+ self._label = urwid.SelectableIcon("", 0)
+ cols = urwid.Columns([self._label], dividechars=1)
+ urwid.WidgetWrap.__init__(self, cols)
+ self.set_label(label)
+ # Hide the cursor on the button since we style it with reverse text. A
+ # hack, but works. If urwid ever fixes things, we can clean this up.
+ # https://github.com/urwid/urwid/issues/170
+ self._label._cursor_position = len(label) + 1
+
+ def pack(self, size=None, focus=False):
+ return self._label.pack(size, focus)
+
+
+class MemoryEdit(urwid.Edit):
+ """Edit widget that tracks its original value."""
+
+ def __init__(self, *args, **kwargs):
+ self.control = kwargs.pop('control')
+ super().__init__(*args, **kwargs)
+ self.original_edit_text = self.edit_text
+
+ def keypress(self, size, key):
+ if key == 'esc':
+ self.set_edit_text(self.original_edit_text)
+ else:
+ return super().keypress(size, key)
+
+ def is_modified(self):
+ return self.original_edit_text != self.edit_text
+
+
+class MemoryCheckBox(urwid.CheckBox):
+ """CheckBox widget that tracks its original value."""
+
+ def __init__(self, *args, **kwargs):
+ self.control = kwargs.pop('control')
+ super().__init__(*args, **kwargs)
+ self.original_state = self.get_state()
+
+ def keypress(self, size, key):
+ if key == 'esc':
+ self.set_state(self.original_state)
+ else:
+ return super().keypress(size, key)
+
+ def is_modified(self):
+ return self.original_state != self.get_state()
+
+
+def ClickableButton(label, callback, *args, attr_map=None):
+ """Convenience function for generating clickable buttons."""
+ def _trampoline(button):
+ """Helper to swallow the |button| argument."""
+ return callback(*args)
+ button = SlimButton(label)
+ urwid.connect_signal(button, 'click', _trampoline)
+ return urwid.AttrMap(button, attr_map, focus_map='reversed')
+
+
+BUTTON_SEP = urwid.Text(['┃'])
+
+
+class StatusBar(urwid.Columns):
+ """The status bar."""
+
+ def __init__(self, holder):
+ self.holder = holder
+ self._status = urwid.Text(['Initializing ...'])
+ super().__init__([
+ self._status,
+ ('pack', BUTTON_SEP),
+ ('pack', ClickableButton('^G HELP', self.holder.show_help)),
+ ('pack', BUTTON_SEP),
+ ('pack', ClickableButton('M-LEFT BACK', self.holder.go_back)),
+ ('pack', BUTTON_SEP),
+ ('pack', ClickableButton('^C QUIT', self.holder.exit)),
+ ('pack', BUTTON_SEP),
+ ])
+
+ def status(self, text: Optional[str] = '') -> None:
+ if not text:
+ text = f'Idle (GNU/debbugs v{__version__})'
+ else:
+ text = ('standout', text)
+ self._status.set_text(text)
+
+
+class PaneHolder:
+ """The root (blank) dialog that holds UI elements."""
+
+ def __init__(self, statusbar: bool = True, mouse: bool = False):
+ self._status_bar = StatusBar(self)
+ self.panes = []
+ self.widget = urwid.Frame(urwid.SolidFill(), footer=self._status_bar)
+ palette = [
+ ('reversed', 'standout', 'default'),
+ ('frame header', 'dark blue', 'default'),
+ ('e-mail header', 'dark green', 'default'),
+ ('edit', 'white', 'dark gray'),
+ ]
+ self.loop = urwid.MainLoop(
+ self.widget, palette=palette, handle_mouse=mouse,
+ unhandled_input=self.unhandled_input)
+
+ def start(self):
+ self.loop.start()
+ #self.loop.screen.clear()
+ # NB: We don't restore the keys ourselves because loop.screen already
+ # saves & restores them on start & stop.
+ keys = self.loop.screen.tty_signal_keys()
+ self.loop.screen.tty_signal_keys(intr=keys[0], quit=0, start=0, stop=0, susp=keys[4])
+
+ def stop(self):
+ self.loop.stop()
+ # TODO: Doesn't seem to work?
+ _, rows = self.loop.screen.get_cols_rows()
+ urwid.escape.set_cursor_position(0, rows)
+
+ def run(self):
+ self.loop.event_loop.run()
+
+ def unhandled_input(self, key):
+ # First global shortcuts.
+ if key == 'ctrl c':
+ raise urwid.ExitMainLoop()
+ elif key in ('h', '?', 'ctrl g'):
+ self.show_help()
+ elif key == 'meta left':
+ self.go_back()
+ elif key in ('up', 'page up', 'home') and self.widget.focus_position == 'footer':
+ self.widget.set_focus('body')
+ elif self.pane.keypress(key):
+ pass
+ elif key == 'down' and self.widget.focus_position == 'body':
+ self.widget.set_focus('footer')
+ else:
+ self.status(f'Unhandled key "{key}"')
+ return key
+
+ def status(self, text: Optional[str] = '') -> None:
+ self._status_bar.status(text)
+ self.loop.draw_screen()
+
+ def get_help_contents(self) -> Optional[HelpContents]:
+ return HelpContents(
+ name='Global',
+ shortcuts=(
+ ('Ctrl+C', 'Quit'),
+ *((x, 'Show help for the current page') for x in ('Ctrl+G', 'h', '?')),
+ ('Alt+Left', 'Go back to previous page'),
+ ),
+ )
+
+ def show_help(self):
+ if not isinstance(self.pane, HelpPane):
+ self.push_pane(HelpPane(self, self.pane))
+ else:
+ self.status(random.choice((
+ 'Sorry, there is only so much help one can give',
+ 'Sometimes the answer you seek lies inside yourself',
+ 'IDK, try Googling it?',
+ 'Maybe no one knows the answer :(',
+ '(╯°□°)╯︵ ┻━┻',
+ )))
+
+ def go_back(self):
+ if len(self.panes) > 1:
+ # TODO: Add callback to save/check state.
+ self.pop_pane()
+ else:
+ self.status('Cannot go back: already at top')
+
+ def exit(self):
+ raise urwid.ExitMainLoop()
+
+ @property
+ def pane(self):
+ return self.panes[-1] if self.panes else None
+
+ def push_pane(self, pane):
+ self.status()
+ self.panes.append(pane)
+ self.widget.set_body(urwid.Overlay(
+ pane.widget, self.widget.get_body(),
+ align='center', width=('relative', 100),
+ valign='middle', height=('relative', 100),
+ ))
+
+ def pop_pane(self):
+ self.status()
+ self.panes.pop()
+ self.widget.set_body(self.widget.get_body()[0])
+
+
+class Pane:
+ """The common pane API."""
+
+ def __init__(self, holder):
+ self.holder = holder
+
+ def status(self, text: Optional[str] = '') -> None:
+ self.holder.status(text)
+
+ def get_help_contents(self) -> Optional[HelpContents]:
+ return None
+
+ def keypress(self, key) -> bool:
+ return False
+
+
+class HelpPane(Pane):
+ """The help page."""
+
+ def __init__(self, holder, pane):
+ super().__init__(holder)
+ self.pane = pane
+ self.populate()
+
+ def populate(self):
+ contents = [
+ urwid.Divider(),
+ urwid.Text([f'Homepage: {__homepage__}']),
+ ]
+
+ for obj in (self.holder, self, self.pane):
+ help = obj.get_help_contents()
+ if help is None:
+ continue
+
+ contents += [
+ urwid.Divider(),
+ urwid.Text([help.name, ' keyboard shortcuts:']),
+ ]
+ contents.extend(
+ urwid.Columns([(15, urwid.Text([key])), urwid.Text([command])])
+ for key, command in help.shortcuts
+ )
+ if help.contents:
+ contents.append(urwid.Divider())
+ for entry in help.contents:
+ if not entry:
+ contents.append(urwid.Divider())
+ elif isinstance(entry, str):
+ contents.append(urwid.Text([entry]))
+ else:
+ contents.append(entry)
+
+ body = urwid.ListBox(urwid.SimpleListWalker(contents))
+ self.widget = urwid.Frame(body, header=urwid.Text([f'GNU/debbugs version {__version__} help']))
+
+ def get_help_contents(self) -> HelpContents:
+ return HelpContents(
+ name='Help page',
+ shortcuts=(
+ *((x, 'Close help') for x in ('q', 'Escape')),
+ ),
+ )
+
+ def keypress(self, key) -> bool:
+ if key in ('q', 'esc'):
+ self.holder.go_back()
+ else:
+ return False
+ return True
+
+
+class SaveFilePane(Pane):
+ """Save a file to disk."""
+
+ def __init__(self, holder, name, data, callback):
+ super().__init__(holder)
+ self.name = name
+ self.data = data
+ self.callback = callback
+ self.populate()
+
+ def populate(self):
+ self.path = MemoryEdit(edit_text=str(Path.cwd() / self.name))
+ urwid.connect_signal(self.path, 'change', self.check_path)
+ self.status = urwid.Text([''])
+ contents = [
+ urwid.LineBox(self.path),
+ self.status,
+ ]
+ self.check_path(self.path)
+ body = urwid.ListBox(urwid.SimpleFocusListWalker(contents))
+ self.widget = urwid.Frame(body, header=urwid.Text([f'Pick path to save {self.name} ...']))
+
+ def get_help_contents(self) -> HelpContents:
+ return HelpContents(
+ name='Save page',
+ shortcuts=(
+ ('Enter', 'Save the file to the specified path'),
+ ('Escape', 'Cancel save'),
+ ),
+ )
+
+ def keypress(self, key) -> bool:
+ if key == 'enter':
+ self.save()
+ elif key == 'esc':
+ self.callback(False, self.path.edit_text)
+ else:
+ return False
+ return True
+
+ def check_path(self, element, path=None):
+ path = Path(element.edit_text if path is None else path)
+ ret = False
+ if not path.exists():
+ if not path.parent.exists():
+ msg = f'Error: {path.parent} is not a directory'
+ elif not os.access(path.parent, os.W_OK):
+ msg = f'Error: {path.parent} is not writable'
+ else:
+ msg = 'Press enter to save!'
+ ret = True
+ else:
+ msg = f'Error: path already exists; please pick a different name'
+ self.status.set_text(msg)
+ return ret
+
+ def save(self):
+ path = Path(self.path.edit_text)
+ if self.check_path(self.path):
+ try:
+ with path.open('wb') as fp:
+ fp.write(self.data)
+ self.callback(True, self.path.edit_text)
+ except OSError as e:
+ self.status(f'Saving failed: {e}')
+
+
+class BugListPane(Pane):
+ """The bug picker list."""
+
+ def __init__(self, holder, client, control, statuses, bug_width, summary_tags, tags_width, summary_states, states_width):
+ super().__init__(holder)
+ self.client = client
+ self.control = control
+ self.populate(statuses, bug_width, summary_tags, tags_width, summary_states, states_width)
+
+ def populate(self, statuses, bug_width, summary_tags, tags_width, summary_states, states_width) -> None:
+ contents = []
+ for bug, status in sorted(statuses.items()):
+ # Severity levels all happen to start with a diff first letter.
+ sev = status["severity"][0]
+ text = (
+ f"{bug:{bug_width}} " +
+ f"{sev}{summary_tags[bug]:{tags_width}}{summary_states[bug]:{states_width}} " +
+ status["subject"]
+ )
+ contents.append(ClickableButton(text, self.select_bug, status))
+ self.widget = urwid.ListBox(urwid.SimpleFocusListWalker(contents))
+
+ def select_bug(self, status):
+ self.status(f'loading: #{status["id"]}: {status["subject"]}')
+ self.holder.push_pane(BugViewPane(self.holder, self.client, self.control, status))
+
+ def get_help_contents(self) -> HelpContents:
+ return HelpContents(
+ name='Bug list',
+ shortcuts=(
+ *((x, 'Close the list (i.e. quit)') for x in ('q', 'Escape')),
+ ('Enter', 'View selected bug/issue'),
+ ),
+ contents=(
+ 'The bug list is for navigating all the currently discovered issues.',
+ 'Simplify select the issue you want to view and press Enter!',
+ ),
+ )
+
+ def keypress(self, key) -> bool:
+ if key in ('q', 'esc'):
+ raise urwid.ExitMainLoop()
+ else:
+ return False
+ return True
+
+
+class BugViewPane(Pane):
+ """Show details about a specific bug."""
+
+ def __init__(self, holder, client, control, status):
+ super().__init__(holder)
+ self.client = client
+ self.control = control
+ self.bug_status = status
+ self.bug_log = client.get_bug_log(status['id'])
+ self.inputs = []
+ self.populate()
+
+ def populate(self) -> None:
+ status = self.bug_status
+ # https://debbugs.gnu.org/cgi/bugreport.cgi?bug=25740
+ self._header = urwid.Columns([
+ urwid.Text([('frame header', f'#{status["id"]} {status["subject"]}')]),
+ ('pack', BUTTON_SEP),
+ ('pack', ClickableButton('^S SEND', self.send)),
+ ('pack', BUTTON_SEP),
+ ])
+
+ label_width = 10
+ def _label_row(label, element):
+ assert len(label) < label_width
+ return urwid.Columns([
+ #(1, urwid.Text(['|'])),
+ (label_width, urwid.Text([label])),
+ element,
+ ])
+ def _label_edit_row(label, element):
+ self.inputs.append(element)
+ return _label_row(label, urwid.AttrMap(element, 'edit'))
+
+ report_date = email.utils.formatdate(status['date'], localtime=True)
+ # TODO: Show log_modified ?
+ modify_date = email.utils.formatdate(status['last_modified'], localtime=True)
+ contents = [
+ _label_edit_row('Title:', MemoryEdit(edit_text=status["subject"], control='title')),
+ _label_edit_row('Package:', MemoryEdit(edit_text=status["package"], control='package')),
+ _label_row('Reporter:', urwid.Text(status["originator"])),
+ _label_row('Reported:', urwid.Text(report_date)),
+ _label_row('Updated:', urwid.Text(modify_date)),
+ _label_edit_row('Severity:', MemoryEdit(edit_text=status["severity"], control='severity')),
+ _label_edit_row('Tags:', MemoryEdit(edit_text=status['tags'], control='tag')),
+ _label_edit_row('Done:', MemoryEdit(edit_text=status['done'], control='done')),
+ _label_edit_row('Archived:', MemoryCheckBox('', state=status['archived'], control='archive')),
+ # TODO: Add more fields.
+ ]
+
+ for entry in messages.parse_log(self.bug_log):
+ reply_placeholder = urwid.WidgetPlaceholder(urwid.Divider())
+ contents += [
+ urwid.AttrMap(urwid.Divider('─'), 'e-mail header'),
+ # https://debbugs.gnu.org/cgi/bugreport.cgi?msg=14;bug=25740
+ urwid.Columns([
+ urwid.Text([('e-mail header', f'|Message #{entry.number}')]),
+ ('pack', BUTTON_SEP),
+ ('pack', ClickableButton('REPLY', self.reply, entry, reply_placeholder)),
+ ('pack', BUTTON_SEP),
+ ]),
+ ]
+ for header in ('From', 'To', 'CC', 'Subject', 'Date'):
+ if header in entry.headers:
+ contents.append(urwid.Text([('e-mail header', f'|{header}: {entry.headers[header]}')]))
+ contents.append(urwid.Divider())
+
+ for part in entry.attachments:
+ # TODO: Add inline viewer for text/* files, and application/octet-stream (w/user prompt).
+ # e.g. A .patch file encoded as application/octet-stream.
+ contents.append(urwid.Columns([
+ ('pack', urwid.Text([('e-mail header', '[')])),
+ ('pack', ClickableButton(
+ part.name, self.save_attachment, part,
+ attr_map='e-mail header')),
+ ('pack', urwid.Text([('e-mail header', f' ({part.mime_type}, attachment)]')])),
+ ]))
+ if entry.attachments:
+ contents.append(urwid.Divider())
+
+ first = True
+ for part in entry.body:
+ if first:
+ first = False
+ else:
+ contents.append(urwid.Divider())
+ contents.append(urwid.Text([part]))
+
+ contents.append(reply_placeholder)
+
+ contents.append(urwid.AttrMap(urwid.Divider('━'), 'frame header'))
+
+ body = urwid.ListBox(urwid.SimpleListWalker(contents))
+ self.widget = urwid.Frame(body, header=self._header)
+
+ def get_help_contents(self) -> HelpContents:
+ return HelpContents(
+ name='Bug page',
+ shortcuts=(
+ ('Ctrl+S', 'Send updates to the tracker'),
+ ),
+ )
+
+ def keypress(self, key):
+ if key in ('down', 'page down', 'end') and self.widget.focus_position == 'header':
+ self.widget.set_focus('body')
+ elif key == 'up' and self.widget.focus_position == 'body':
+ self.widget.set_focus('header')
+ elif key == 'ctrl s':
+ self.send()
+ else:
+ return False
+ return True
+
+ def reply(self, msg, placeholder):
+ msg_date = msg.headers.get('Date')
+ msg_from = msg.headers.get('From')
+ if msg_from:
+ from_name, _ = email.utils.parseaddr(msg_from)
+ if from_name:
+ msg_from = from_name
+ with tempfile.NamedTemporaryFile() as tmp:
+ if msg_date and msg_from:
+ tmp.write(f'On {msg_date}, {msg_from} wrote:\n'.encode())
+ elif msg_from:
+ tmp.write(f'{msg_from} wrote:\n'.encode())
+ for line in msg.body[0].splitlines():
+ if line:
+ tmp.write(f'> {line}\n'.encode())
+ else:
+ tmp.write(b'>\n')
+ tmp.flush()
+ editor = os.getenv("EDITOR", "true")
+ result = subprocess.run(f'{editor} "{tmp.name}"', shell=True, check=False)
+ tmp.seek(0)
+ edit = MemoryEdit(edit_text=tmp.read(), multiline=True, control='reply')
+ edit.headers = msg.headers
+ self.inputs.append(edit)
+ placeholder.original_widget = urwid.LineBox(edit, title='REPLY', title_attr='e-mail header')
+ self.holder.loop.screen.clear()
+
+ def save_attachment(self, part):
+ def done(success, name):
+ self.holder.go_back()
+ if success:
+ self.status(f'Saved to {name}')
+ else:
+ self.status('Saving failed')
+
+ self.status(f'Saving {part.name} ...')
+ # Try to avoid malicious names.
+ name = part.name.lstrip('.').replace('/', '_')
+ self.holder.push_pane(SaveFilePane(self.holder, name, part.data, done))
+
+ def _gather_control(self):
+ def _tag(num, tags):
+ ret.tag(num, '=', *tags.split())
+ def _done(num, fixer):
+ if fixer:
+ ret.close(num, fixer)
+ else:
+ ret.reopen(num)
+
+ ret = MessageQueue()
+ bug_num = self.bug_status['id']
+
+ mapping = {
+ 'done': _done,
+ 'package': ret.reassign,
+ 'severity': ret.severity,
+ 'tag': _tag,
+ 'title': ret.retitle,
+ }
+
+ for field in self.inputs:
+ if not field.is_modified():
+ continue
+ if not field.control or field.control in {'reply'}:
+ continue
+ func = mapping[field.control]
+ func(bug_num, field.get_edit_text())
+ return ret._get_messages()
+
+ def send(self):
+ self.status('Preparing to send updates ...')
+ # TODO: Find reply ...
+ # TODO: Use subject from reply, or from title.
+ # TODO: Use In-Reply-To from reply.
+ # TODO: Setup To/CC from reply
+
+ replies = [x for x in self.inputs if x.control == 'reply']
+
+ control_domain = self.control.split('@', 1)[1]
+ bug_control_email = f'{self.bug_status["id"]}@{control_domain}'
+ to = None
+ # Mapping unique e-mail address to the display name.
+ cc = {}
+ def _add_cc(name, email):
+ # If the address doesn't exist or the name is blank, add this entry.
+ if not cc.get(email):
+ cc[email] = name
+ subject = None
+ if not replies:
+ to = self.control
+ else:
+ for reply in replies:
+ reply_from = email.utils.parseaddr(reply.headers['From'])
+ if to is None:
+ to = reply_from
+ _add_cc(*reply_from)
+ _add_cc(*email.utils.parseaddr(reply.headers['To']))
+ for addr in email.utils.getaddresses([reply.headers.get('Cc', '')]):
+ _add_cc(*addr)
+ if not subject:
+ subject = reply.headers.get('Subject')
+ cc[bug_control_email] = ''
+ del cc[to[1]]
+ to = email.utils.formataddr(to)
+ if not subject:
+ subject = self.bug_status['subject']
+ else:
+ # Filter out reply prefixes.
+ # English & Spanish: RE
+ # English: FWD
+ # German: AW
+ # Danish: SV
+ # Mandarin: 回覆
+ subject = 'Re: ' + re.sub(r"^((RE|AW|FWD|SV|回复):\s*)+", '', subject, flags=re.I)
+ headers = {
+ 'From': 'Mike Frysinger <vapier@gentoo.org>',
+ 'To': to,
+ 'CC': ', '.join(email.utils.formataddr((v, k)) for (k, v) in cc.items()),
+ 'Subject': subject,
+ }
+
+ with tempfile.NamedTemporaryFile() as tmp:
+ for header in ('From', 'To', 'CC', 'Subject'):
+ tmp.write(f'{header}: {headers[header]}\n'.encode())
+ tmp.write(f'User-Agent: GNU debbugs/{__version__}\n'.encode())
+ tmp.write(b'\n')
+ control_message = self._gather_control()
+ if control_message:
+ tmp.write(b'\n' + control_message.encode() + b'\n')
+ for i, reply in enumerate(replies):
+ if i:
+ tmp.write(b'\n')
+ tmp.write(reply.get_edit_text().strip().encode() + b'\n')
+ tmp.flush()
+ editor = os.getenv("EDITOR", "true")
+ result = subprocess.run(f'{editor} "{tmp.name}"', shell=True, check=False)
+ tmp.seek(0)
+ msg = tmp.read()
+
+ self.holder.loop.screen.clear()
+
+ # TODO: Prompt here.
+
+
+
+class UI(UIBase):
+ """Command line interfacee."""
+
+ def __init__(self, mouse: bool = False):
+ self.dialog = PaneHolder(mouse=mouse)
+ self._started = False
+
+ def start(self):
+ # Only allow one start.
+ assert not self._started
+ self._started = True
+ self.dialog.start()
+
+ def stop(self):
+ # Allow multiple stops.
+ if self._started:
+ self._started = False
+ self.dialog.stop()
+
+ def status(self, text: Optional[str] = '') -> None:
+ """Print status information."""
+ self.dialog.status(text)
+
+ def interface_query(self, client, control, statuses, bug_width, summary_tags, tags_width, summary_states, states_width) -> None:
+ """Run the query interface."""
+ bug_list = BugListPane(self.dialog, client, control, statuses, bug_width, summary_tags, tags_width, summary_states, states_width)
+ self.dialog.push_pane(bug_list)
+
+ # If there's only one bug, automatically display it.
+ if len(statuses) == 1:
+ bug_list.select_bug(next(iter(statuses.values())))
+
+ try:
+ self.dialog.run()
+ finally:
+ self.stop()