+# GNU debbugs interface
+A simple CLI tool for working with https://debbugs.gnu.org/.
+#!/usr/bin/env python3
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""Wrapper to run directly out of git."""
+from pathlib import Path
+import sys
+# Doesn't make sense to use this script for anything else.
+assert __name__ == "__main__"
+assert sys.version_info >= (3, 8), "Python 3.8+ required; found " + sys.version
+TOPDIR = Path(__file__).resolve().parent
+sys.path.insert(0, str(TOPDIR / "src"))
+from gnudebbugs import __main__
+requires = [
+ "setuptools>=42",
+ "wheel"
+build-backend = "setuptools.build_meta"
+#!/usr/bin/env python3
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""Wrapper to run tests directly out of git."""
+from pathlib import Path
+import sys
+import pytest
+TOPDIR = Path(__file__).resolve().parent
+sys.path.insert(0, str(TOPDIR / "src"))
+#!/usr/bin/env python3
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""Tooling to build the project.
+Users should just run:
+$ ./setup.py build
+from pathlib import Path
+import re
+import setuptools
+TOPDIR = Path(__file__).resolve().parent
+long_description = (TOPDIR / "README.md").read_text()
+data = (TOPDIR / "src/gnudebbugs/__init__.py").read_text()
+for m in re.finditer(r"^__([^ ]+)__ = .(.*).$", data, flags=re.M):
+ DUNDERS[m.group(1)] = m.group(2)
+ name="gnudebbugs",
+ version=DUNDERS['version'],
+ author="Mike Frysinger",
+ author_email="vapier@gmail.com",
+ description="CLI for debbugs.gnu.org",
+ long_description=long_description,
+ long_description_content_type="text/markdown",
+ url=DUNDERS['homepage'],
+ project_urls={
+ "Bug Tracker": DUNDERS['issue_tracker'],
+ },
+ classifiers=[
+ "Development Status :: 3 - Alpha",
+ "Environment :: Console",
+ "License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)",
+ "Operating System :: OS Independent",
+ "Programming Language :: Python :: 3",
+ ],
+ package_dir={"": "src"},
+ packages=setuptools.find_packages(where="src"),
+ entry_points={
+ "console_scripts": [
+ 'gnudebbugs=gnudebbugs.__main__:main',
+ ],
+ },
+ python_requires=">=3.8",
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""Common module settings."""
+__version__ = "0"
+__homepage__ = 'https://savannah.gnu.org/projects/gnudebbugs'
+__issue_tracker__ = 'https://savannah.gnu.org/support/?group=gnudebbugs'
+class BaseError(Exception):
+ """Base error class for all errors in this project."""
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""CLI for debbugs.gnu.org (and other debbugs trackers)."""
+import argparse
+import collections
+import math
+from pathlib import Path
+import sys
+from typing import List, Optional, Tuple
+from .control import Control
+from . import pretty
+from . import soap
+from . import ui as ui_mod
+ "gnu": ("https://debbugs.gnu.org/cgi/soap.cgi", "control@debbugs.gnu.org"),
+# Servers reject attempts to query more than this many bugs at once.
+def add_raw_subparser(parser: argparse.ArgumentParser) -> None:
+ group = parser.add_argument_group("output format control")
+ group.add_argument(
+ "--pretty",
+ action="store_true",
+ help="Format the output in a human friendly form",
+ )
+ # Have to specify this for each one that uses dest=format and not just the
+ # canonical --format as argparse will use None by default, and clobber the
+ # dest=format.
+ default_format = "json"
+ group.add_argument(
+ "--xml",
+ default=default_format,
+ dest="format",
+ action="store_const",
+ const="full-xml",
+ help="Output the entire raw SOAP (XML) document",
+ )
+ group.add_argument(
+ "--json",
+ default=default_format,
+ dest="format",
+ action="store_const",
+ const="json",
+ help="Output the raw data in JSON (default)",
+ )
+ group.add_argument(
+ "--format",
+ default=default_format,
+ choices=("full-xml", "xml", "json"),
+ help="Select the output format",
+ )
+ parser.add_argument(
+ "--chunk",
+ default=True,
+ action="store_true",
+ help="Automatically break up large requests to avoid server limits",
+ )
+ parser.add_argument(
+ "--no-chunk",
+ default=True,
+ dest="chunk",
+ action="store_false",
+ help="Do not automatically break up large requests",
+ )
+ subparsers = parser.add_subparsers(title="API", dest="api", required=True)
+ subparser = subparsers.add_parser("get_bug_log")
+ subparser.add_argument("bug", type=int, help="The bug to inspect")
+ subparser = subparsers.add_parser("get_bugs")
+ subparser.add_argument("args", nargs="+", help="Search terms")
+ subparser = subparsers.add_parser("get_status")
+ subparser.add_argument("bugs", nargs="+", type=int, help="The bugs to inspect")
+ subparser = subparsers.add_parser("get_usertag")
+ subparser.add_argument("user", help="The user whose tags to lookup")
+ subparser.add_argument("tags", nargs="+", help="Tags to filter by")
+ subparser = subparsers.add_parser("newest_bugs")
+ subparser.add_argument("count", type=int, help="How many recent bugs to return")
+def get_parser() -> argparse.ArgumentParser:
+ """Get CLI parser."""
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument(
+ "-B", "--bts", default="gnu", metavar="SYSTEM", help="The bug tracker to use"
+ )
+ parser.add_argument(
+ "-U",
+ "--ui",
+ default="cli",
+ choices=("cli", "urwid"),
+ help="The UI to use (default: %(default)s)",
+ )
+ parser.add_argument(
+ "--mouse",
+ default=False,
+ action="store_true",
+ help="Enable mouse support (%(default)s)",
+ )
+ parser.add_argument(
+ "--no-mouse",
+ default=False,
+ dest="mouse",
+ action="store_false",
+ help="Do not enable mouse support",
+ )
+ # TODO: Add these to subparsers too.
+ parser.add_argument("-v", "--verbose", action="count", help="More verbose output")
+ parser.add_argument("-q", "--quiet", action="count", help="Only display errors")
+ parser.add_argument(
+ "-n", "--dry-run", dest="dryrun", action="store_true", help="Don't make changes"
+ )
+ group = parser.add_argument_group("e-mail options")
+ group.add_argument("--sendmail", default="sendmail", help="Sendmail command to use")
+ subparsers = parser.add_subparsers(
+ title="Subcommands", dest="command", required=True
+ )
+ # Subcommand: raw
+ subparser = subparsers.add_parser(
+ "raw",
+ aliases=["r"],
+ description="Make direct calls to the server & return raw results",
+ )
+ add_raw_subparser(subparser)
+ # Subcommand: interactive
+ subparser = subparsers.add_parser(
+ "interactive",
+ aliases=["i"],
+ description="Make direct calls to the server & return raw results",
+ )
+ add_raw_subparser(subparser)
+ # Subcommand: query
+ subparser = subparsers.add_parser("query")
+ subparser.add_argument(
+ "args",
+ nargs="+",
+ help="Package or bug numbers to query",
+ metavar="package|bugnum",
+ )
+ # Subcommand: control
+ subparser = subparsers.add_parser(
+ "control", description="Send control messages to change bug state"
+ )
+ subparser.add_argument("commands", nargs="+", help="Send control messages")
+ return parser
+def parse_args(
+ argv: Optional[List[str]] = None,
+) -> Tuple[argparse.ArgumentParser, argparse.Namespace]:
+ """Parse the command line."""
+ if argv is None:
+ argv = sys.argv[1:]
+ parser = get_parser()
+ opts = parser.parse_args(argv)
+ if opts.bts not in SYSTEMS:
+ parser.error(f"Unknown system: {opts.bts}")
+ return (parser, opts)
+ "patch": "+",
+ "wontfix": "W",
+ "moreinfo": "M",
+ "unreproducible": "R",
+ "fixed": "F",
+ "notabug": "N",
+ "pending": "P",
+ "help": "H",
+ "security": "*",
+ "confirmed": "x",
+ "easy": "E",
+def subcmd_query(opts: argparse.Namespace, client: soap.Client) -> Optional[int]:
+ try:
+ ui = ui_mod.get_backend(opts.ui)(mouse=opts.mouse)
+ except ui_mod.Error as e:
+ sys.exit(e)
+ try:
+ ui.start()
+ # Server doesn't maintain order, so we don't either (wrt the user args).
+ ui.status("Searching for bugs ...")
+ bugs = set()
+ for arg in opts.args:
+ try:
+ bug = int(arg)
+ bugs.add(bug)
+ except ValueError:
+ found_bugs = client.get_bugs(package=arg)
+ if found_bugs:
+ bugs.update(found_bugs)
+ if not bugs:
+ sys.exit('\nNo bugs found!')
+ bug_width = math.floor(math.log10(max(bugs))) + 1
+ bugs_list = list(bugs)
+ # Servers restrict how many bugs you can query at once.
+ statuses = {}
+ for i in range(0, len(bugs), GET_STATUS_MAX_COUNT):
+ slice = bugs_list[i : i + GET_STATUS_MAX_COUNT]
+ ui.status(f"Loading {i + len(slice)}/{len(bugs)} bugs ...")
+ found_bugs = client.get_status(slice)
+ if found_bugs:
+ statuses.update(found_bugs)
+ if not statuses:
+ sys.exit('\nNo bugs found!')
+ summary_tags = {}
+ tags_width = 1
+ summary_states = {}
+ states_width = 1
+ for bug, status in statuses.items():
+ bug_tags = set(status["tags"].split())
+ summary_bug_tags = "".join(TAG_SUMMARY[x] for x in bug_tags)
+ tags_width = max(tags_width, len(summary_bug_tags))
+ summary_tags[bug] = summary_bug_tags
+ # TODO: Figure out precedence order.
+ state = ""
+ if status["archived"]:
+ state = "♲"
+ elif status["mergedwith"]:
+ state = "="
+ elif status["blockedby"]:
+ state = "♙"
+ elif status["fixed_versions"]:
+ state = "☺"
+ states_width = max(states_width, len(state))
+ summary_states[bug] = state
+ ui.status()
+ ui.interface_query(client, opts.to, statuses, bug_width, summary_tags, tags_width, summary_states, states_width)
+ except KeyboardInterrupt:
+ # We ignore it here because interface_query already took care of quitting.
+ pass
+ finally:
+ ui.stop()
+def subcmd_control(opts: argparse.Namespace, client: soap.Client) -> Optional[int]:
+ # TODO: Make this configurable.
+ control = Control(opts.to, "Mike Frysinger <vapier@gentoo.org>")
+ for message in opts.commands:
+ control.queue._queue(message)
+ control.send(dryrun=opts.dryrun)
+def subcmd_raw(opts: argparse.Namespace, client: soap.Client) -> Optional[int]:
+ # Is this more terrible than hand maintaining a CLI<->enum mapping?
+ sformat = opts.format.upper().replace("-", "_")
+ for prop in dir(soap.Format):
+ if prop == sformat:
+ format = getattr(soap.Format, prop)
+ break
+ else:
+ raise RuntimeError(f'Unable to locate "{sformat}" in {dir(soap.Format)}')
+ if opts.api == "get_bug_log":
+ ret = client.get_bug_log(opts.bug, format=format)
+ elif opts.api == "get_bugs":
+ kwargs = {}
+ for arg in opts.args:
+ key, value = arg.split("=", 1)
+ kwargs[key] = value
+ ret = client.get_bugs(format=format, **kwargs)
+ elif opts.api == "get_status":
+ bugs = [int(x) for x in opts.bugs]
+ if opts.chunk:
+ # TODO: Implement XML chunking.
+ assert format == soap.Format.JSON, 'Chunking only supported for JSON currently'
+ ret = {}
+ for i in range(0, len(bugs), GET_STATUS_MAX_COUNT):
+ ret.update(
+ client.get_status(bugs[i : i + GET_STATUS_MAX_COUNT], format=format)
+ )
+ else:
+ ret = client.get_status(bugs, format=format)
+ elif opts.api == "get_usertag":
+ ret = client.get_usertag(opts.user, opts.tags, format=format)
+ elif opts.api == "newest_bugs":
+ ret = client.newest_bugs(opts.count, format=format)
+ else:
+ raise RuntimeError(f'Unhandled API "{opts.api}"')
+ if opts.pretty:
+ if format == soap.Format.JSON:
+ ret = pretty.json(ret)
+ print(ret)
+def main(argv: Optional[List[str]] = None) -> Optional[int]:
+ """The main entry point for scripts."""
+ _, opts = parse_args(argv)
+ tracker, to = SYSTEMS[opts.bts]
+ client = soap.Client(tracker)
+ opts.to = to
+ return {"query": subcmd_query, "control": subcmd_control, "raw": subcmd_raw}[
+ opts.command
+ ](opts, client)
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""Control interface to debbugs.
+import collections
+from email.message import EmailMessage
+import smtplib
+import subprocess
+from typing import Any, List, Optional
+import xml.etree.ElementTree as ET
+import requests
+class Control:
+ """A control interface to debbugs."""
+ def __init__(self, host: str):
+ self.host = host
+ self.to = f"control@{host}"
+def assert_action(action: str) -> None:
+ """Verify |action| is a valid action."""
+ if action not in ("+", "-", "="):
+ raise ValueError(f'Invalid action "{action}"')
+def assert_bug(bug: int) -> None:
+ """Verify |bug| is a valid syntax-wise bug number."""
+ if not isinstance(bug, int):
+ raise TypeError(f'Expected integer, but bug "{bug}" is of type {type(bug)}')
+ if bug <= 0:
+ raise ValueError(f"Bugs must be positive, not {bug}")
+def assert_bugs(bugs: List[int]) -> None:
+ """Verify |bugs| are valid syntax-wise bug numbers."""
+ if not bugs:
+ raise ValueError(f"Need at least one bug to merge")
+ for b in bugs:
+ assert_bug(b)
+def assert_email(email: str) -> None:
+ """Verify |email| is a valid e-mail address."""
+ # Special case: debbugs uses ! as an alias to the From address.
+ if email == "!":
+ return
+ # TODO: Lookup RFC.
+ if " " in email or "@" not in email:
+ raise ValueError(f'Invalid e-mail address "{email}"')
+def assert_newid(newid: int) -> None:
+ """Verify |newid| is a valid new id (for cloning)."""
+ if not isinstance(newid, int):
+ raise TypeError(
+ f'Expected integer, but newid "{newid}" is of type {type(newid)}'
+ )
+ if newid >= 0:
+ raise ValueError(f"Newids must be negative, not {newid}")
+def assert_package(package: str) -> None:
+ """Verify |package| is a valid package name."""
+ if " " in package:
+ raise ValueError(f'Invalid package "{package}"')
+TAGS = {
+ "patch",
+ "wontfix",
+ "moreinfo",
+ "unreproducible",
+ "fixed",
+ "notabug",
+ "pending",
+ "help",
+ "security",
+ "confirmed",
+ "easy",
+def assert_tags(tags: List[str]) -> None:
+ """Verify |tags| are valid tag name."""
+ if not tags:
+ raise ValueError(f"Need at least one tag")
+ for t in tags:
+ if t not in TAGS:
+ raise ValueError(f'Invalid tag "{tag}"')
+ "critical",
+ "grave",
+ "serious",
+ "important",
+ "normal",
+ "minor",
+ "wishlist",
+def assert_severity(severity: str) -> None:
+ """Verify |severity| is a valid severity level."""
+ if severity not in SEVERITIES:
+ raise ValueError(f'Invalid severity "{severity}"')
+class MessageQueue:
+ """A queue of messages."""
+ def __init__(self):
+ self.messages = []
+ self.stop_message = "thankyou"
+ def _get_messages(self):
+ if not self.messages:
+ return ''
+ return "\n".join(self.messages) + f"\n{self.stop_message}\n"
+ def _queue(self, message: str) -> None:
+ assert "\n" not in message
+ self.messages.append(message)
+ def close(self, bug: int, fixed: Optional[str] = "") -> None:
+ assert_bug(bug)
+ self._queue(f"close {bug} {fixed}")
+ def reassign(self, bug: int, package: str, version: Optional[str] = "") -> None:
+ assert_bug(bug)
+ assert_package(package)
+ self._queue(f"reassign {bug} {package} {version}")
+ def severity(self, bug: int, severity: str) -> None:
+ assert_bug(bug)
+ assert_severity(severity)
+ self._queue(f"severity {bug} {severity}")
+ def reopen(self, bug: int, address: Optional[str] = "") -> None:
+ assert_bug(bug)
+ if address and address != "=":
+ assert_email(address)
+ self._queue(f"reopen {bug} {address}")
+ def found(self, bug: int, version: Optional[str] = "") -> None:
+ assert_bug(bug)
+ if found:
+ assert_version(version)
+ self._queue(f"found {bug} {version}")
+ def notfound(self, bug: int, version: str) -> None:
+ assert_bug(bug)
+ assert_version(version)
+ self._queue(f"notfound {bug} {version}")
+ def submitter(self, bug: int, address: str) -> None:
+ assert_bug(bug)
+ assert_email(address)
+ self._queue(f"submitter {bug} {address}")
+ def forwarded(self, bug: int, address: str) -> None:
+ assert_bug(bug)
+ assert_email(address)
+ self._queue(f"forwarded {bug} {address}")
+ def notforwarded(self, bug: int) -> None:
+ assert_bug(bug)
+ self._queue(f"notforwarded {bug}")
+ def owner(self, bug: int, address: str) -> None:
+ assert_bug(bug)
+ assert_email(address)
+ self._queue(f"owner {bug} {address}")
+ def noowner(self, bug: int) -> None:
+ assert_bug(bug)
+ self._queue(f"noowner {bug}")
+ def retitle(self, bug: int, title: str) -> None:
+ assert_bug(bug)
+ if "\n" in title:
+ raise ValueError(f"Newlines not allowed in titles")
+ self._queue(f"retitle {bug} {title}")
+ def clone(self, bug: int, *newids: List[int]) -> None:
+ assert_bug(bug)
+ if not newids:
+ raise ValueError(f"Need at least one newid to clone")
+ for b in newids:
+ assert_newid(b)
+ self._queue(f'clone {bug} {" ".join(str(x) for x in newids)}')
+ def merge(self, bug: int, *bugs: List[int]) -> None:
+ assert_bug(bug)
+ assert_bugs(bugs)
+ self._queue(f'merge {bug} {" ".join(str(x) for x in bugs)}')
+ def forcemerge(self, bug: int, *bugs: List[int]) -> None:
+ assert_bug(bug)
+ assert_bugs(bugs)
+ self._queue(f'forcemerge {bug} {" ".join(str(x) for x in bugs)}')
+ def unmerge(self, bug: int) -> None:
+ assert_bug(bug)
+ self._queue(f"unmerge {bug}")
+ def tag(self, bug: int, action: str, *tags: List[str]) -> None:
+ assert_bug(bug)
+ assert_action(action)
+ # TODO: Double check you can clear tags this way. The spec doesn't say it's possible at all ...
+ if action != '=' or tags:
+ assert_tags(tags)
+ self._queue(f'tag {bug} {action} {" ".join(tags)}')
+ def block(self, bug: int, *bugs: List[int]) -> None:
+ assert_bug(bug)
+ assert_bugs(bugs)
+ self._queue(f'block {bug} {" ".join(str(x) for x in bugs)}')
+ def unblock(self, bug: int, *bugs: List[int]) -> None:
+ assert_bug(bug)
+ assert_bugs(bugs)
+ self._queue(f'unblock {bug} {" ".join(str(x) for x in bugs)}')
+ def stop(self, form: Optional[str] = None):
+ """Halt further commands.
+ NB: This is handled automatically.
+ """
+ if form not in ("quit", "stop", "--") and not form.startswith("thank"):
+ raise ValueError(f'Invalid stop command "{form}"')
+ self.stop_message = form
+ def user(self, user: str) -> None:
+ """Set the user to the given email address."""
+ assert_email(user)
+ self._queue(f"user {user}")
+ def usertag(self, bug: int, action: str, *tags: List[str]) -> None:
+ """Sets usertags for the bug report.
+ Action may be +-= to add/remove/set.
+ """
+ assert_action(action)
+ # TODO: Need to check usertags are arbitrary.
+ # assert_tags(tags)
+ self._queue(f'usertag {bug} {action} {"".join(tags)}')
+class Control:
+ def __init__(
+ self, to: str, email: str, use_sendmail: bool = True, sendmail: str = "sendmail"
+ ):
+ self.to = to
+ self.email = email
+ self.use_sendmail = use_sendmail
+ self.sendmail = sendmail
+ self.queue = MessageQueue()
+ def send(self, dryrun: Optional[bool] = False) -> None:
+ msg = EmailMessage()
+ msg["To"] = self.to
+ msg["From"] = self.email
+ msg.set_content(self.queue._get_messages())
+ self._send_email(msg, dryrun=dryrun)
+ def _send_email(self, msg: EmailMessage, dryrun: Optional[bool] = False) -> None:
+ if self.use_sendmail:
+ sendmail = self.sendmail + " -t"
+ if dryrun:
+ print(f"Sending message:\n$ {sendmail}\n{msg}")
+ sendmail = "exit 0; " + sendmail
+ subprocess.run(sendmail, shell=True, input=str(msg).encode(), check=True)
+ else:
+ with smtplib.SMTP("localhost") as s:
+ s.send_message(msg)
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""E-mail helpers.
+E-mail messages often contain a lot of mess & noise. Contain that here.
+import email.parser
+import email.policy
+import email.utils
+from typing import Dict, List, NamedTuple, Tuple
+class Attachment(NamedTuple):
+ """A single attachment."""
+ name: str
+ mime_type: str
+ data: bytes
+class Message(NamedTuple):
+ number: int
+ headers: Dict[str, str]
+ body: Tuple[str]
+ attachments: Tuple[Attachment]
+def parse_entry(entry: Dict) -> Message:
+ """Parse a single |entry| from the bug |log|."""
+ # TODO: Implement this! Have to find an example first ...
+ assert not entry['attachments'], 'attachments not supported!'
+ attachments = []
+ body = []
+ msg = email.message_from_string(entry['header'] + '\n\n' + entry['body'],
+ policy=email.policy.SMTPUTF8)
+ # Figure out which parts we'll actually show.
+ def _flatten_msg(msg):
+ if msg.is_multipart():
+ if msg.get_content_type() == 'multipart/alternative':
+ for part in msg.get_payload():
+ if part.get_content_maintype() == 'text':
+ yield part
+ break
+ else:
+ yield next(iter(msg.get_payload()))
+ else:
+ for part in msg.get_payload():
+ yield from _flatten_msg(part)
+ else:
+ ctype = msg.get_content_type()
+ if ctype != 'application/pgp-signature':
+ yield msg
+ for part in _flatten_msg(msg):
+ ctype = part.get_content_type()
+ mtype = part.get_content_maintype()
+ if mtype == 'text' or ctype == 'multipart/alternative':
+ body.append(part.get_payload().strip())
+ elif mtype == 'application':
+ attachments.append(Attachment(
+ name=part.get_filename(),
+ mime_type=ctype,
+ data=part.get_content(),
+ ))
+ else:
+ body.append(f'Unhandled type: {ctype}')
+ return Message(
+ number=entry['msg_num'],
+ headers=dict(msg),
+ body=body,
+ attachments=attachments,
+ )
+def parse_log(log: List[Dict]) -> Tuple[Message]:
+ """Parse the bug |log| and return structured messages."""
+ yield from (parse_entry(x) for x in log)
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""Helpers for format things for humans."""
+import json as mod_json
+from typing import Any
+def json(obj: Any) -> str:
+ return mod_json.dumps(
+ obj, ensure_ascii=False, indent=2, separators=(",", ": "), sort_keys=True
+ )
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""Bespoke interface to debbugs.
+Since debbugs doesn't provide a standard WSDL, we can't use off-the-shelf SOAP
+modules. Instead we have to define the APIs by hand. Nice.
+import base64
+import enum
+from typing import Any, List, Optional
+import xml.etree.ElementTree as ET
+from . import __version__, BaseError
+import requests
+ import requests_cache
+except ImportError:
+ requests_cache = None
+class Error(BaseError):
+ """Base error class."""
+class ServerError(Error):
+ """The server encountered an error."""
+def _encode(*args: List[Any]):
+ """Recursive encode Python |args|."""
+ for arg in args:
+ if isinstance(arg, int):
+ yield f"<xsd:int>{arg}</xsd:int>"
+ elif isinstance(arg, (list, set, tuple)):
+ yield "".join(_encode(*arg))
+ elif isinstance(arg, str):
+ yield f"<xsd:string>{arg}</xsd:string>"
+ elif isinstance(arg, dict):
+ n = 0
+ for key, value in arg.items():
+ yield f"<a{n}>{key}</a{n}>"
+ n += 1
+ yield f"<a{n}>{value}</a{n}>"
+ n += 1
+ else:
+ raise TypeError(f"Unsupported argument {type(arg)}")
+def _decode(node, node_type: Optional[str] = None) -> Any:
+ """Recursively decode the |node|."""
+ encodingStyle = "{http://schemas.xmlsoap.org/soap/encoding/}"
+ soap = "{http://schemas.xmlsoap.org/soap/envelope/}"
+ soapenc = "{http://schemas.xmlsoap.org/soap/encoding/}"
+ xsd = "{http://www.w3.org/2001/XMLSchema}"
+ xsi = "{http://www.w3.org/2001/XMLSchema-instance}"
+ debbugs = "{Debbugs/SOAP}"
+ gnudebbugs = "<<<gnudebbugs internal hack>>>"
+ if node_type is None:
+ node_type = node.attrib.get(f"{xsi}type")
+ if node.attrib.get(f"{xsi}nil") == "true":
+ assert not list(node)
+ return None
+ elif node_type == "xsd:int":
+ return int(node.text)
+ elif node_type == "xsd:float":
+ return node.text # float(node.text)
+ elif node_type == "xsd:string":
+ # Turn None into an empty string.
+ return "" if not node.text else node.text
+ elif node_type == "xsd:base64Binary":
+ return base64.b64decode(node.text).decode("utf-8")
+ elif node_type in (f"{soapenc}Array", "soapenc:Array"):
+ ret = []
+ for item in node:
+ assert item.tag == f"{debbugs}item"
+ # if f'{xsi}type' not in item.attrib:
+ # Assume typeless <item> is a map.
+ item.attrib.setdefault(f"{xsi}type", f"{gnudebbugs}map")
+ ret.append(_decode(item))
+ return ret
+ elif node_type == "apachens:Map":
+ ret = {}
+ for item in node:
+ key_node, value_node = list(item)
+ if key_node.tag != f"{debbugs}key":
+ key_node, value_node = value_node, key_node
+ key = _decode(key_node)
+ if value_node.attrib.get(f"{xsi}nil") == "true":
+ ret[key] = None
+ else:
+ ret[key] = _decode(value_node, node_type=f"{gnudebbugs}map")
+ return ret
+ elif node_type == f"{gnudebbugs}map":
+ ret = {}
+ for item in node:
+ key = item.tag
+ if key.startswith(debbugs):
+ key = key[len(debbugs) :]
+ ret[key] = _decode(item)
+ return ret
+ elif node_type is None:
+ if not list(node):
+ return None
+ raise TypeError(f"Unhandled node type {node}[{node.attrib}][{node.text}]")
+class Format(enum.IntEnum):
+ """Response output formats."""
+ # The entire document (XML & SOAP headers).
+ FULL_XML = 0
+ # The method-specific XML content.
+ XML = 1
+ # The method-specific XML content converted to JSON.
+ JSON = 2
+def parse(document: str, format: Optional[Format] = None) -> Any:
+ """Parse the SOAP response."""
+ if format == Format.FULL_XML:
+ return document.decode("utf-8")
+ # Assume the structure:
+ # <soap:Envelope>
+ # <soap:Body>
+ # <...method...>
+ # ...response...
+ root = ET.fromstring(document)
+ body = list(root)
+ assert len(body) == 1
+ method = list(body[0])
+ assert len(method) == 1
+ response = list(method[0])
+ assert len(response) == 1
+ if format == Format.XML:
+ assert False, "todo"
+ if format is not None and format != Format.JSON:
+ raise ValueError(f'Unknown format "{format}"')
+ return _decode(response[0])
+class Api:
+ """Generate SOAP documents for API requests."""
+ @classmethod
+ def document(cls, method: str, *args: List[Any]) -> str:
+ """Generate the document for |method| with |args|."""
+ return f"""\
+<?xml version="1.0" encoding="UTF-8"?>\
+<soap:Envelope soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" \
+xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" \
+xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" \
+xmlns:xsd="http://www.w3.org/2001/XMLSchema" \
+ @classmethod
+ def get_bug_log(cls, bug: int) -> str:
+ return cls.document("get_bug_log", bug)
+ @classmethod
+ def get_bugs(cls, **kwargs) -> str:
+ return cls.document("get_bugs", kwargs)
+ @classmethod
+ def get_status(cls, bugs: List[int]) -> str:
+ return cls.document("get_status", bugs)
+ @classmethod
+ def get_usertag(cls, user: str, tags: List[str]) -> str:
+ return cls.document("get_usertag", tags)
+ @classmethod
+ def newest_bugs(cls, count: int) -> str:
+ return cls.document("newest_bugs", count)
+class Client:
+ """Client to access a debbugs site."""
+ def __init__(self, tracker: str):
+ self.tracker = tracker
+ if requests_cache is not None:
+ self.session = requests_cache.CachedSession('gnudebbugs')
+ else:
+ self.session = requests.Session()
+ self.session.headers.update(
+ {
+ # "User-Agent": f'GNU debbugs/{__version__}',
+ "Content-Type": "application/soap+xml",
+ }
+ )
+ def link(self, bug: int) -> str:
+ """Return the URI to the |bug| report."""
+ return self.tracker.rsplit("/", 2)[0] + f"/{bug}"
+ def _post(self, data: Any) -> bytes:
+ """Cover function to make the request.
+ Provides an easy way to mock in tests.
+ """
+ return self.session.post(self.tracker, data=data).content
+ def get_bug_log(self, bug: int, format: Optional[Format] = None) -> Any:
+ """Get full |bug| log."""
+ data = Api.get_bug_log(bug)
+ response = self._post(data)
+ return parse(response, format=format)
+ def get_bugs(self, format: Optional[Format] = None, **kwargs) -> Any:
+ """Search for bugs."""
+ data = Api.get_bugs(**kwargs)
+ response = self._post(data)
+ return parse(response, format=format)
+ def get_status(self, bugs: List[int], format: Optional[Format] = None) -> Any:
+ """Get status for |bugs|."""
+ data = Api.get_status(bugs)
+ response = self._post(data)
+ return parse(response, format=format)
+ def get_usertag(
+ self, user: str, tags: List[str], format: Optional[Format] = None
+ ) -> str:
+ """Lookup usertags."""
+ data = Api.get_usertag(user, tags)
+ response = self._post(data)
+ return parse(response, format=format)
+ def newest_bugs(self, count: int, format: Optional[Format] = None) -> List[int]:
+ """Lookup new bugs."""
+ data = Api.newest_bugs(count)
+ response = self._post(data)
+ return parse(response, format=format)
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""Common UI settings."""
+import abc
+from .. import BaseError
+class Error(BaseError):
+ """Base UI error class."""
+class MissingUrwidError(Error):
+ """The urwid module is unavailable."""
+class UIBase(abc.ABC):
+ """Base class for all UI implementations."""
+ def __init__(self):
+ pass
+ def start(self) -> None:
+ pass
+ def stop(self) -> None:
+ pass
+ def status(self, text: str) -> None:
+ """Print status information."""
+ def interface_query(self, client, statuses, bug_width, summary_tags, tags_width, summary_states, states_width) -> None:
+ """Run the query interface."""
+def get_backend(backend: str) -> UIBase:
+ """Helper to dynamically load the right backend."""
+ if backend == "cli":
+ from . import cli
+ return cli.UI
+ elif backend == "urwid":
+ try:
+ from . import urwid
+ except ImportError as e:
+ raise MissingUrwidError(f'Unable to import urwid: {e}')
+ return urwid.UI
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""Command line interface UI."""
+import os
+from typing import Optional
+from . import UIBase
+class UI(UIBase):
+ """Command line interfacee."""
+ CSI_ERASE_LINE = "\x1b[2K"
+ def __init__(self):
+ self._clear_line_text = self.CSI_ERASE_LINE + "\r" if os.isatty(1) else "\n"
+ def osc8_link(self, url: str, text: str) -> str:
+ return f"\x1b]8;;{url}\x07{text}\x1b]8;;\x07"
+ def _clear_line(self, flush: bool = False):
+ print(self._clear_line_text, end='', flush=flush)
+ def status(self, text: Optional[str] = None) -> None:
+ """Print status information."""
+ self._clear_line()
+ if text:
+ print(text, end='', flush=True)
+ def interface_query(self, client, statuses, bug_width, summary_tags, tags_width, summary_states, states_width) -> None:
+ """Run the query interface."""
+ for bug, status in sorted(statuses.items()):
+ # Severity levels all happen to start with a diff first letter.
+ sev = status["severity"][0]
+ print(
+ self.osc8_link(client.link(bug), f"{bug:{bug_width}}"),
+ f"{sev}{summary_tags[bug]:{tags_width}}{summary_states[bug]:{states_width}}",
+ status["subject"],
+ )
--- /dev/null
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""urwid (terminal/curses) UI."""
+import email.utils
+import os
+from pathlib import Path
+import random
+import re
+import subprocess
+import tempfile
+from typing import Any, List, NamedTuple, Optional, Tuple
+import urwid
+from .. import __homepage__, __version__
+from .. import messages
+from ..control import MessageQueue
+from . import UIBase
+class HelpContents(NamedTuple):
+ """Content to show in the help page."""
+ name: str
+ shortcuts: Tuple[str]
+ contents: Tuple[Any] = ()
+class SlimButton(urwid.Button):
+ """A button w/out <> markers and w/hidden cursor."""
+ def __init__(self, label):
+ self._label = urwid.SelectableIcon("", 0)
+ cols = urwid.Columns([self._label], dividechars=1)
+ urwid.WidgetWrap.__init__(self, cols)
+ self.set_label(label)
+ # Hide the cursor on the button since we style it with reverse text. A
+ # hack, but works. If urwid ever fixes things, we can clean this up.
+ # https://github.com/urwid/urwid/issues/170
+ self._label._cursor_position = len(label) + 1
+ def pack(self, size=None, focus=False):
+ return self._label.pack(size, focus)
+class MemoryEdit(urwid.Edit):
+ """Edit widget that tracks its original value."""
+ def __init__(self, *args, **kwargs):
+ self.control = kwargs.pop('control')
+ super().__init__(*args, **kwargs)
+ self.original_edit_text = self.edit_text
+ def keypress(self, size, key):
+ if key == 'esc':
+ self.set_edit_text(self.original_edit_text)
+ else:
+ return super().keypress(size, key)
+ def is_modified(self):
+ return self.original_edit_text != self.edit_text
+class MemoryCheckBox(urwid.CheckBox):
+ """CheckBox widget that tracks its original value."""
+ def __init__(self, *args, **kwargs):
+ self.control = kwargs.pop('control')
+ super().__init__(*args, **kwargs)
+ self.original_state = self.get_state()
+ def keypress(self, size, key):
+ if key == 'esc':
+ self.set_state(self.original_state)
+ else:
+ return super().keypress(size, key)
+ def is_modified(self):
+ return self.original_state != self.get_state()
+def ClickableButton(label, callback, *args, attr_map=None):
+ """Convenience function for generating clickable buttons."""
+ def _trampoline(button):
+ """Helper to swallow the |button| argument."""
+ return callback(*args)
+ button = SlimButton(label)
+ urwid.connect_signal(button, 'click', _trampoline)
+ return urwid.AttrMap(button, attr_map, focus_map='reversed')
+BUTTON_SEP = urwid.Text(['┃'])
+class StatusBar(urwid.Columns):
+ """The status bar."""
+ def __init__(self, holder):
+ self.holder = holder
+ self._status = urwid.Text(['Initializing ...'])
+ super().__init__([
+ self._status,
+ ('pack', BUTTON_SEP),
+ ('pack', ClickableButton('^G HELP', self.holder.show_help)),
+ ('pack', BUTTON_SEP),
+ ('pack', ClickableButton('M-LEFT BACK', self.holder.go_back)),
+ ('pack', BUTTON_SEP),
+ ('pack', ClickableButton('^C QUIT', self.holder.exit)),
+ ('pack', BUTTON_SEP),
+ ])
+ def status(self, text: Optional[str] = '') -> None:
+ if not text:
+ text = f'Idle (GNU/debbugs v{__version__})'
+ else:
+ text = ('standout', text)
+ self._status.set_text(text)
+class PaneHolder:
+ """The root (blank) dialog that holds UI elements."""
+ def __init__(self, statusbar: bool = True, mouse: bool = False):
+ self._status_bar = StatusBar(self)
+ self.panes = []
+ self.widget = urwid.Frame(urwid.SolidFill(), footer=self._status_bar)
+ palette = [
+ ('reversed', 'standout', 'default'),
+ ('frame header', 'dark blue', 'default'),
+ ('e-mail header', 'dark green', 'default'),
+ ('edit', 'white', 'dark gray'),
+ ]
+ self.loop = urwid.MainLoop(
+ self.widget, palette=palette, handle_mouse=mouse,
+ unhandled_input=self.unhandled_input)
+ def start(self):
+ self.loop.start()
+ #self.loop.screen.clear()
+ # NB: We don't restore the keys ourselves because loop.screen already
+ # saves & restores them on start & stop.
+ keys = self.loop.screen.tty_signal_keys()
+ self.loop.screen.tty_signal_keys(intr=keys[0], quit=0, start=0, stop=0, susp=keys[4])
+ def stop(self):
+ self.loop.stop()
+ # TODO: Doesn't seem to work?
+ _, rows = self.loop.screen.get_cols_rows()
+ urwid.escape.set_cursor_position(0, rows)
+ def run(self):
+ self.loop.event_loop.run()
+ def unhandled_input(self, key):
+ # First global shortcuts.
+ if key == 'ctrl c':
+ raise urwid.ExitMainLoop()
+ elif key in ('h', '?', 'ctrl g'):
+ self.show_help()
+ elif key == 'meta left':
+ self.go_back()
+ elif key in ('up', 'page up', 'home') and self.widget.focus_position == 'footer':
+ self.widget.set_focus('body')
+ elif self.pane.keypress(key):
+ pass
+ elif key == 'down' and self.widget.focus_position == 'body':
+ self.widget.set_focus('footer')
+ else:
+ self.status(f'Unhandled key "{key}"')
+ return key
+ def status(self, text: Optional[str] = '') -> None:
+ self._status_bar.status(text)
+ self.loop.draw_screen()
+ def get_help_contents(self) -> Optional[HelpContents]:
+ return HelpContents(
+ name='Global',
+ shortcuts=(
+ ('Ctrl+C', 'Quit'),
+ *((x, 'Show help for the current page') for x in ('Ctrl+G', 'h', '?')),
+ ('Alt+Left', 'Go back to previous page'),
+ ),
+ )
+ def show_help(self):
+ if not isinstance(self.pane, HelpPane):
+ self.push_pane(HelpPane(self, self.pane))
+ else:
+ self.status(random.choice((
+ 'Sorry, there is only so much help one can give',
+ 'Sometimes the answer you seek lies inside yourself',
+ 'IDK, try Googling it?',
+ 'Maybe no one knows the answer :(',
+ '(╯°□°)╯︵ ┻━┻',
+ )))
+ def go_back(self):
+ if len(self.panes) > 1:
+ # TODO: Add callback to save/check state.
+ self.pop_pane()
+ else:
+ self.status('Cannot go back: already at top')
+ def exit(self):
+ raise urwid.ExitMainLoop()
+ @property
+ def pane(self):
+ return self.panes[-1] if self.panes else None
+ def push_pane(self, pane):
+ self.status()
+ self.panes.append(pane)
+ self.widget.set_body(urwid.Overlay(
+ pane.widget, self.widget.get_body(),
+ align='center', width=('relative', 100),
+ valign='middle', height=('relative', 100),
+ ))
+ def pop_pane(self):
+ self.status()
+ self.panes.pop()
+ self.widget.set_body(self.widget.get_body()[0])
+class Pane:
+ """The common pane API."""
+ def __init__(self, holder):
+ self.holder = holder
+ def status(self, text: Optional[str] = '') -> None:
+ self.holder.status(text)
+ def get_help_contents(self) -> Optional[HelpContents]:
+ return None
+ def keypress(self, key) -> bool:
+ return False
+class HelpPane(Pane):
+ """The help page."""
+ def __init__(self, holder, pane):
+ super().__init__(holder)
+ self.pane = pane
+ self.populate()
+ def populate(self):
+ contents = [
+ urwid.Divider(),
+ urwid.Text([f'Homepage: {__homepage__}']),
+ ]
+ for obj in (self.holder, self, self.pane):
+ help = obj.get_help_contents()
+ if help is None:
+ continue
+ contents += [
+ urwid.Divider(),
+ urwid.Text([help.name, ' keyboard shortcuts:']),
+ ]
+ contents.extend(
+ urwid.Columns([(15, urwid.Text([key])), urwid.Text([command])])
+ for key, command in help.shortcuts
+ )
+ if help.contents:
+ contents.append(urwid.Divider())
+ for entry in help.contents:
+ if not entry:
+ contents.append(urwid.Divider())
+ elif isinstance(entry, str):
+ contents.append(urwid.Text([entry]))
+ else:
+ contents.append(entry)
+ body = urwid.ListBox(urwid.SimpleListWalker(contents))
+ self.widget = urwid.Frame(body, header=urwid.Text([f'GNU/debbugs version {__version__} help']))
+ def get_help_contents(self) -> HelpContents:
+ return HelpContents(
+ name='Help page',
+ shortcuts=(
+ *((x, 'Close help') for x in ('q', 'Escape')),
+ ),
+ )
+ def keypress(self, key) -> bool:
+ if key in ('q', 'esc'):
+ self.holder.go_back()
+ else:
+ return False
+ return True
+class SaveFilePane(Pane):
+ """Save a file to disk."""
+ def __init__(self, holder, name, data, callback):
+ super().__init__(holder)
+ self.name = name
+ self.data = data
+ self.callback = callback
+ self.populate()
+ def populate(self):
+ self.path = MemoryEdit(edit_text=str(Path.cwd() / self.name))
+ urwid.connect_signal(self.path, 'change', self.check_path)
+ self.status = urwid.Text([''])
+ contents = [
+ urwid.LineBox(self.path),
+ self.status,
+ ]
+ self.check_path(self.path)
+ body = urwid.ListBox(urwid.SimpleFocusListWalker(contents))
+ self.widget = urwid.Frame(body, header=urwid.Text([f'Pick path to save {self.name} ...']))
+ def get_help_contents(self) -> HelpContents:
+ return HelpContents(
+ name='Save page',
+ shortcuts=(
+ ('Enter', 'Save the file to the specified path'),
+ ('Escape', 'Cancel save'),
+ ),
+ )
+ def keypress(self, key) -> bool:
+ if key == 'enter':
+ self.save()
+ elif key == 'esc':
+ self.callback(False, self.path.edit_text)
+ else:
+ return False
+ return True
+ def check_path(self, element, path=None):
+ path = Path(element.edit_text if path is None else path)
+ ret = False
+ if not path.exists():
+ if not path.parent.exists():
+ msg = f'Error: {path.parent} is not a directory'
+ elif not os.access(path.parent, os.W_OK):
+ msg = f'Error: {path.parent} is not writable'
+ else:
+ msg = 'Press enter to save!'
+ ret = True
+ else:
+ msg = f'Error: path already exists; please pick a different name'
+ self.status.set_text(msg)
+ return ret
+ def save(self):
+ path = Path(self.path.edit_text)
+ if self.check_path(self.path):
+ try:
+ with path.open('wb') as fp:
+ fp.write(self.data)
+ self.callback(True, self.path.edit_text)
+ except OSError as e:
+ self.status(f'Saving failed: {e}')
+class BugListPane(Pane):
+ """The bug picker list."""
+ def __init__(self, holder, client, control, statuses, bug_width, summary_tags, tags_width, summary_states, states_width):
+ super().__init__(holder)
+ self.client = client
+ self.control = control
+ self.populate(statuses, bug_width, summary_tags, tags_width, summary_states, states_width)
+ def populate(self, statuses, bug_width, summary_tags, tags_width, summary_states, states_width) -> None:
+ contents = []
+ for bug, status in sorted(statuses.items()):
+ # Severity levels all happen to start with a diff first letter.
+ sev = status["severity"][0]
+ text = (
+ f"{bug:{bug_width}} " +
+ f"{sev}{summary_tags[bug]:{tags_width}}{summary_states[bug]:{states_width}} " +
+ status["subject"]
+ )
+ contents.append(ClickableButton(text, self.select_bug, status))
+ self.widget = urwid.ListBox(urwid.SimpleFocusListWalker(contents))
+ def select_bug(self, status):
+ self.status(f'loading: #{status["id"]}: {status["subject"]}')
+ self.holder.push_pane(BugViewPane(self.holder, self.client, self.control, status))
+ def get_help_contents(self) -> HelpContents:
+ return HelpContents(
+ name='Bug list',
+ shortcuts=(
+ *((x, 'Close the list (i.e. quit)') for x in ('q', 'Escape')),
+ ('Enter', 'View selected bug/issue'),
+ ),
+ contents=(
+ 'The bug list is for navigating all the currently discovered issues.',
+ 'Simplify select the issue you want to view and press Enter!',
+ ),
+ )
+ def keypress(self, key) -> bool:
+ if key in ('q', 'esc'):
+ raise urwid.ExitMainLoop()
+ else:
+ return False
+ return True
+class BugViewPane(Pane):
+ """Show details about a specific bug."""
+ def __init__(self, holder, client, control, status):
+ super().__init__(holder)
+ self.client = client
+ self.control = control
+ self.bug_status = status
+ self.bug_log = client.get_bug_log(status['id'])
+ self.inputs = []
+ self.populate()
+ def populate(self) -> None:
+ status = self.bug_status
+ # https://debbugs.gnu.org/cgi/bugreport.cgi?bug=25740
+ self._header = urwid.Columns([
+ urwid.Text([('frame header', f'#{status["id"]} {status["subject"]}')]),
+ ('pack', BUTTON_SEP),
+ ('pack', ClickableButton('^S SEND', self.send)),
+ ('pack', BUTTON_SEP),
+ ])
+ label_width = 10
+ def _label_row(label, element):
+ assert len(label) < label_width
+ return urwid.Columns([
+ #(1, urwid.Text(['|'])),
+ (label_width, urwid.Text([label])),
+ element,
+ ])
+ def _label_edit_row(label, element):
+ self.inputs.append(element)
+ return _label_row(label, urwid.AttrMap(element, 'edit'))
+ report_date = email.utils.formatdate(status['date'], localtime=True)
+ # TODO: Show log_modified ?
+ modify_date = email.utils.formatdate(status['last_modified'], localtime=True)
+ contents = [
+ _label_edit_row('Title:', MemoryEdit(edit_text=status["subject"], control='title')),
+ _label_edit_row('Package:', MemoryEdit(edit_text=status["package"], control='package')),
+ _label_row('Reporter:', urwid.Text(status["originator"])),
+ _label_row('Reported:', urwid.Text(report_date)),
+ _label_row('Updated:', urwid.Text(modify_date)),
+ _label_edit_row('Severity:', MemoryEdit(edit_text=status["severity"], control='severity')),
+ _label_edit_row('Tags:', MemoryEdit(edit_text=status['tags'], control='tag')),
+ _label_edit_row('Done:', MemoryEdit(edit_text=status['done'], control='done')),
+ _label_edit_row('Archived:', MemoryCheckBox('', state=status['archived'], control='archive')),
+ # TODO: Add more fields.
+ ]
+ for entry in messages.parse_log(self.bug_log):
+ reply_placeholder = urwid.WidgetPlaceholder(urwid.Divider())
+ contents += [
+ urwid.AttrMap(urwid.Divider('─'), 'e-mail header'),
+ # https://debbugs.gnu.org/cgi/bugreport.cgi?msg=14;bug=25740
+ urwid.Columns([
+ urwid.Text([('e-mail header', f'|Message #{entry.number}')]),
+ ('pack', BUTTON_SEP),
+ ('pack', ClickableButton('REPLY', self.reply, entry, reply_placeholder)),
+ ('pack', BUTTON_SEP),
+ ]),
+ ]
+ for header in ('From', 'To', 'CC', 'Subject', 'Date'):
+ if header in entry.headers:
+ contents.append(urwid.Text([('e-mail header', f'|{header}: {entry.headers[header]}')]))
+ contents.append(urwid.Divider())
+ for part in entry.attachments:
+ # TODO: Add inline viewer for text/* files, and application/octet-stream (w/user prompt).
+ # e.g. A .patch file encoded as application/octet-stream.
+ contents.append(urwid.Columns([
+ ('pack', urwid.Text([('e-mail header', '[')])),
+ ('pack', ClickableButton(
+ part.name, self.save_attachment, part,
+ attr_map='e-mail header')),
+ ('pack', urwid.Text([('e-mail header', f' ({part.mime_type}, attachment)]')])),
+ ]))
+ if entry.attachments:
+ contents.append(urwid.Divider())
+ first = True
+ for part in entry.body:
+ if first:
+ first = False
+ else:
+ contents.append(urwid.Divider())
+ contents.append(urwid.Text([part]))
+ contents.append(reply_placeholder)
+ contents.append(urwid.AttrMap(urwid.Divider('━'), 'frame header'))
+ body = urwid.ListBox(urwid.SimpleListWalker(contents))
+ self.widget = urwid.Frame(body, header=self._header)
+ def get_help_contents(self) -> HelpContents:
+ return HelpContents(
+ name='Bug page',
+ shortcuts=(
+ ('Ctrl+S', 'Send updates to the tracker'),
+ ),
+ )
+ def keypress(self, key):
+ if key in ('down', 'page down', 'end') and self.widget.focus_position == 'header':
+ self.widget.set_focus('body')
+ elif key == 'up' and self.widget.focus_position == 'body':
+ self.widget.set_focus('header')
+ elif key == 'ctrl s':
+ self.send()
+ else:
+ return False
+ return True
+ def reply(self, msg, placeholder):
+ msg_date = msg.headers.get('Date')
+ msg_from = msg.headers.get('From')
+ if msg_from:
+ from_name, _ = email.utils.parseaddr(msg_from)
+ if from_name:
+ msg_from = from_name
+ with tempfile.NamedTemporaryFile() as tmp:
+ if msg_date and msg_from:
+ tmp.write(f'On {msg_date}, {msg_from} wrote:\n'.encode())
+ elif msg_from:
+ tmp.write(f'{msg_from} wrote:\n'.encode())
+ for line in msg.body[0].splitlines():
+ if line:
+ tmp.write(f'> {line}\n'.encode())
+ else:
+ tmp.write(b'>\n')
+ tmp.flush()
+ editor = os.getenv("EDITOR", "true")
+ result = subprocess.run(f'{editor} "{tmp.name}"', shell=True, check=False)
+ tmp.seek(0)
+ edit = MemoryEdit(edit_text=tmp.read(), multiline=True, control='reply')
+ edit.headers = msg.headers
+ self.inputs.append(edit)
+ placeholder.original_widget = urwid.LineBox(edit, title='REPLY', title_attr='e-mail header')
+ self.holder.loop.screen.clear()
+ def save_attachment(self, part):
+ def done(success, name):
+ self.holder.go_back()
+ if success:
+ self.status(f'Saved to {name}')
+ else:
+ self.status('Saving failed')
+ self.status(f'Saving {part.name} ...')
+ # Try to avoid malicious names.
+ name = part.name.lstrip('.').replace('/', '_')
+ self.holder.push_pane(SaveFilePane(self.holder, name, part.data, done))
+ def _gather_control(self):
+ def _tag(num, tags):
+ ret.tag(num, '=', *tags.split())
+ def _done(num, fixer):
+ if fixer:
+ ret.close(num, fixer)
+ else:
+ ret.reopen(num)
+ ret = MessageQueue()
+ bug_num = self.bug_status['id']
+ mapping = {
+ 'done': _done,
+ 'package': ret.reassign,
+ 'severity': ret.severity,
+ 'tag': _tag,
+ 'title': ret.retitle,
+ }
+ for field in self.inputs:
+ if not field.is_modified():
+ continue
+ if not field.control or field.control in {'reply'}:
+ continue
+ func = mapping[field.control]
+ func(bug_num, field.get_edit_text())
+ return ret._get_messages()
+ def send(self):
+ self.status('Preparing to send updates ...')
+ # TODO: Find reply ...
+ # TODO: Use subject from reply, or from title.
+ # TODO: Use In-Reply-To from reply.
+ # TODO: Setup To/CC from reply
+ replies = [x for x in self.inputs if x.control == 'reply']
+ control_domain = self.control.split('@', 1)[1]
+ bug_control_email = f'{self.bug_status["id"]}@{control_domain}'
+ to = None
+ # Mapping unique e-mail address to the display name.
+ cc = {}
+ def _add_cc(name, email):
+ # If the address doesn't exist or the name is blank, add this entry.
+ if not cc.get(email):
+ cc[email] = name
+ subject = None
+ if not replies:
+ to = self.control
+ else:
+ for reply in replies:
+ reply_from = email.utils.parseaddr(reply.headers['From'])
+ if to is None:
+ to = reply_from
+ _add_cc(*reply_from)
+ _add_cc(*email.utils.parseaddr(reply.headers['To']))
+ for addr in email.utils.getaddresses([reply.headers.get('Cc', '')]):
+ _add_cc(*addr)
+ if not subject:
+ subject = reply.headers.get('Subject')
+ cc[bug_control_email] = ''
+ del cc[to[1]]
+ to = email.utils.formataddr(to)
+ if not subject:
+ subject = self.bug_status['subject']
+ else:
+ # Filter out reply prefixes.
+ # English & Spanish: RE
+ # English: FWD
+ # German: AW
+ # Danish: SV
+ # Mandarin: 回覆
+ subject = 'Re: ' + re.sub(r"^((RE|AW|FWD|SV|回复):\s*)+", '', subject, flags=re.I)
+ headers = {
+ 'From': 'Mike Frysinger <vapier@gentoo.org>',
+ 'To': to,
+ 'CC': ', '.join(email.utils.formataddr((v, k)) for (k, v) in cc.items()),
+ 'Subject': subject,
+ }
+ with tempfile.NamedTemporaryFile() as tmp:
+ for header in ('From', 'To', 'CC', 'Subject'):
+ tmp.write(f'{header}: {headers[header]}\n'.encode())
+ tmp.write(f'User-Agent: GNU debbugs/{__version__}\n'.encode())
+ tmp.write(b'\n')
+ control_message = self._gather_control()
+ if control_message:
+ tmp.write(b'\n' + control_message.encode() + b'\n')
+ for i, reply in enumerate(replies):
+ if i:
+ tmp.write(b'\n')
+ tmp.write(reply.get_edit_text().strip().encode() + b'\n')
+ tmp.flush()
+ editor = os.getenv("EDITOR", "true")
+ result = subprocess.run(f'{editor} "{tmp.name}"', shell=True, check=False)
+ tmp.seek(0)
+ msg = tmp.read()
+ self.holder.loop.screen.clear()
+ # TODO: Prompt here.
+class UI(UIBase):
+ """Command line interfacee."""
+ def __init__(self, mouse: bool = False):
+ self.dialog = PaneHolder(mouse=mouse)
+ self._started = False
+ def start(self):
+ # Only allow one start.
+ assert not self._started
+ self._started = True
+ self.dialog.start()
+ def stop(self):
+ # Allow multiple stops.
+ if self._started:
+ self._started = False
+ self.dialog.stop()
+ def status(self, text: Optional[str] = '') -> None:
+ """Print status information."""
+ self.dialog.status(text)
+ def interface_query(self, client, control, statuses, bug_width, summary_tags, tags_width, summary_states, states_width) -> None:
+ """Run the query interface."""
+ bug_list = BugListPane(self.dialog, client, control, statuses, bug_width, summary_tags, tags_width, summary_states, states_width)
+ self.dialog.push_pane(bug_list)
+ # If there's only one bug, automatically display it.
+ if len(statuses) == 1:
+ bug_list.select_bug(next(iter(statuses.values())))
+ try:
+ self.dialog.run()
+ finally:
+ self.stop()
+<?xml version="1.0" encoding="UTF-8"?>
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
+ xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
+ <soap:Body>
+ <get_bug_logResponse xmlns="Debbugs/SOAP">
+ <soapenc:Array soapenc:arrayType="xsd:anyType[4]" xsi:type="soapenc:Array">
+ <item>
+ <msg_num xsi:type="xsd:int">5</msg_num>
+ <attachments soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <body xsi:type="xsd:string">I compiled ...</body>
+ <header xsi:type="xsd:string">...</header>
+ </item>
+ <item>
+ <msg_num xsi:type="xsd:int">8</msg_num>
+ <attachments soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <body xsi:type="xsd:string">reassign 33711 autoconf...</body>
+ <header xsi:type="xsd:string">...</header>
+ </item>
+ <item>
+ <header xsi:type="xsd:string">...</header>
+ <msg_num xsi:type="xsd:int">10</msg_num>
+ <attachments soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <body xsi:type="xsd:string">reassign 33711 autoconf
+ </item>
+ <item>
+ <body xsi:type="xsd:string">owner 33711 bug-autoconf@gnu.org
+ <msg_num xsi:type="xsd:int">12</msg_num>
+ <attachments soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <header xsi:type="xsd:string">...</header>
+ </item>
+ </soapenc:Array>
+ </get_bug_logResponse>
+ </soap:Body>
+<?xml version="1.0" encoding="UTF-8"?>
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
+ xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
+ <soap:Body>
+ <soap:Fault>
+ <faultcode>soap:Server</faultcode>
+ <faultstring>Unable to open bug log /var/lib/debbugs/spool/db-h/12/3371101212.log for reading: No such file or directory at /usr/share/perl5/Debbugs/Log.pm line 199.
+ </soap:Fault>
+ </soap:Body>
+<?xml version="1.0" encoding="UTF-8"?>
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
+ xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
+ <soap:Body>
+ <get_bugsResponse xmlns="Debbugs/SOAP">
+ <soapenc:Array soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ </get_bugsResponse>
+ </soap:Body>
+<?xml version="1.0" encoding="UTF-8"?>
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
+ xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
+ <soap:Body>
+ <get_bugsResponse xmlns="Debbugs/SOAP">
+ <soapenc:Array soapenc:arrayType="xsd:int[2]" xsi:type="soapenc:Array">
+ <item xsi:type="xsd:int">25740</item>
+ <item xsi:type="xsd:int">33711</item>
+ </soapenc:Array>
+ </get_bugsResponse>
+ </soap:Body>
+<?xml version="1.0" encoding="UTF-8"?>
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
+ xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
+ <soap:Body>
+ <get_statusResponse xmlns="Debbugs/SOAP">
+ <s-gensym3/>
+ </get_statusResponse>
+ </soap:Body>
+<?xml version="1.0" encoding="UTF-8"?>
+<soap:Envelope xmlns:apachens="http://xml.apache.org/xml-soap"
+ xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
+ xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
+ <soap:Body>
+ <get_statusResponse xmlns="Debbugs/SOAP">
+ <s-gensym3 xsi:type="apachens:Map">
+ <item>
+ <key xsi:type="xsd:int">52435</key>
+ <value>
+ <affects xsi:type="xsd:string"/>
+ <archived xsi:type="xsd:int">0</archived>
+ <blockedby xsi:type="xsd:string"/>
+ <blocks xsi:type="xsd:string"/>
+ <bug_num xsi:type="xsd:int">52435</bug_num>
+ <date xsi:type="xsd:int">1639234742</date>
+ <done xsi:type="xsd:string"/>
+ <fixed/>
+ <fixed_date soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <fixed_versions soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <forwarded xsi:type="xsd:string"/>
+ <found xsi:type="apachens:Map">
+ <item>
+ <key xsi:type="xsd:string">29.0.50</key>
+ <value xsi:nil="true"/>
+ </item>
+ </found>
+ <found_date soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <found_versions soapenc:arrayType="xsd:string[1]" xsi:type="soapenc:Array">
+ <item xsi:type="xsd:string">29.0.50</item>
+ </found_versions>
+ <id xsi:type="xsd:int">52435</id>
+ <keywords xsi:type="xsd:string"/>
+ <last_modified xsi:type="xsd:int">1639234742</last_modified>
+ <location xsi:type="xsd:string">db-h</location>
+ <log_modified xsi:type="xsd:int">1639234742</log_modified>
+ <mergedwith xsi:type="xsd:string"/>
+ <msgid xsi:type="xsd:string"><46037FBE-8DAB-4913-850C-96F4F78622E2@Web.DE></msgid>
+ <originator xsi:type="xsd:string">The Reporter <example@example.com></originator>
+ <owner xsi:type="xsd:string"/>
+ <package xsi:type="xsd:string">emacs</package>
+ <pending xsi:type="xsd:string">pending</pending>
+ <severity xsi:type="xsd:string">normal</severity>
+ <source xsi:type="xsd:string">unknown</source>
+ <subject xsi:type="xsd:base64Binary">MjkuMC41MDsgQy15IHNlZW1zIHRvIGNvbnRhaW4gYW4gb2xkIGNvbnRlbnRz</subject>
+ <summary xsi:type="xsd:string"/>
+ <tags xsi:type="xsd:string"/>
+ <unarchived xsi:type="xsd:string"/>
+ </value>
+ </item>
+ </s-gensym3>
+ </get_statusResponse>
+ </soap:Body>
+<?xml version="1.0" encoding="UTF-8"?>
+<soap:Envelope xmlns:apachens="http://xml.apache.org/xml-soap"
+ xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
+ xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
+ <soap:Body>
+ <get_statusResponse xmlns="Debbugs/SOAP">
+ <s-gensym3 xsi:type="apachens:Map">
+ <item>
+ <key xsi:type="xsd:int">52435</key>
+ <value>
+ <affects xsi:type="xsd:string"/>
+ <archived xsi:type="xsd:int">0</archived>
+ <blockedby xsi:type="xsd:string"/>
+ <blocks xsi:type="xsd:string"/>
+ <bug_num xsi:type="xsd:int">52435</bug_num>
+ <date xsi:type="xsd:int">1639234742</date>
+ <done xsi:type="xsd:string"/>
+ <fixed/>
+ <fixed_date soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <fixed_versions soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <forwarded xsi:type="xsd:string"/>
+ <found xsi:type="apachens:Map">
+ <item>
+ <key xsi:type="xsd:string">29.0.50</key>
+ <value xsi:nil="true"/>
+ </item>
+ <item>
+ <key xsi:type="xsd:float">4.16</key>
+ <value xsi:nil="true"/>
+ </item>
+ </found>
+ <found_date soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <found_versions soapenc:arrayType="xsd:string[1]" xsi:type="soapenc:Array">
+ <item xsi:type="xsd:string">29.0.50</item>
+ </found_versions>
+ <id xsi:type="xsd:int">52435</id>
+ <keywords xsi:type="xsd:string"/>
+ <last_modified xsi:type="xsd:int">1639234742</last_modified>
+ <location xsi:type="xsd:string">db-h</location>
+ <log_modified xsi:type="xsd:int">1639234742</log_modified>
+ <mergedwith xsi:type="xsd:string"/>
+ <msgid xsi:type="xsd:string"><46037FBE-8DAB-4913-850C-96F4F78622E2@Web.DE></msgid>
+ <originator xsi:type="xsd:string">The Reporter <example@example.com></originator>
+ <owner xsi:type="xsd:string"/>
+ <package xsi:type="xsd:string">emacs</package>
+ <pending xsi:type="xsd:string">pending</pending>
+ <severity xsi:type="xsd:string">normal</severity>
+ <source xsi:type="xsd:string">unknown</source>
+ <subject xsi:type="xsd:string">29.0.50; C-y seems to contain an old contents</subject>
+ <summary xsi:type="xsd:string"/>
+ <tags xsi:type="xsd:string"/>
+ <unarchived xsi:type="xsd:string"/>
+ </value>
+ </item>
+ <item>
+ <key xsi:type="xsd:int">52436</key>
+ <value>
+ <affects xsi:type="xsd:string"/>
+ <archived xsi:type="xsd:int">0</archived>
+ <blockedby xsi:type="xsd:string"/>
+ <blocks xsi:type="xsd:string"/>
+ <bug_num xsi:type="xsd:int">52436</bug_num>
+ <date xsi:type="xsd:int">1639234742</date>
+ <done xsi:type="xsd:string"/>
+ <fixed/>
+ <fixed_date soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <fixed_versions soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <forwarded xsi:type="xsd:string"/>
+ <found/>
+ <found_date soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <found_versions soapenc:arrayType="xsd:anyType[0]" xsi:type="soapenc:Array"/>
+ <id xsi:type="xsd:int">52436</id>
+ <keywords xsi:type="xsd:string">patch</keywords>
+ <last_modified xsi:type="xsd:int">1639234743</last_modified>
+ <location xsi:type="xsd:string">db-h</location>
+ <log_modified xsi:type="xsd:int">1639234743</log_modified>
+ <mergedwith xsi:type="xsd:string"/>
+ <msgid xsi:type="xsd:string"><87h7bfrxw9.fsf@web.de></msgid>
+ <originator xsi:type="xsd:string">"Dr. Reporter" <example@example.com></originator>
+ <owner xsi:type="xsd:string"/>
+ <package xsi:type="xsd:string">guile</package>
+ <pending xsi:type="xsd:string">pending</pending>
+ <severity xsi:type="xsd:string">normal</severity>
+ <source xsi:type="xsd:string">unknown</source>
+ <subject xsi:type="xsd:string">[patch] autoconf: Check for gperf if running from git</subject>
+ <summary xsi:type="xsd:string"/>
+ <tags xsi:type="xsd:string">patch</tags>
+ <unarchived xsi:type="xsd:string"/>
+ </value>
+ </item>
+ </s-gensym3>
+ </get_statusResponse>
+ </soap:Body>
+<?xml version="1.0" encoding="UTF-8"?>
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
+ <soap:Body>
+ <soap:Fault>
+ <faultcode>soap:Server</faultcode>
+ <faultstring>Too many bug reports (1001) requested, limit is 1000 at /usr/share/perl5/Debbugs/SOAP.pm line 128.
+ </soap:Fault>
+ </soap:Body>
+<?xml version="1.0" encoding="UTF-8"?>
+<soap:Envelope soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/"
+ xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
+ xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <soap:Body>
+ <get_usertagResponse xmlns="Debbugs/SOAP">
+ <s-gensym3 />
+ </get_usertagResponse>
+ </soap:Body>
+<?xml version="1.0" encoding="UTF-8"?>
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
+ xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
+ <soap:Body>
+ <newest_bugsResponse xmlns="Debbugs/SOAP">
+ <soapenc:Array soapenc:arrayType="xsd:anyType[0]"
+ xsi:type="soapenc:Array"/>
+ </newest_bugsResponse>
+ </soap:Body>
+<?xml version="1.0" encoding="UTF-8"?>
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
+ xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
+ <soap:Body>
+ <newest_bugsResponse xmlns="Debbugs/SOAP">
+ <soapenc:Array soapenc:arrayType="xsd:int[1]"
+ xsi:type="soapenc:Array">
+ <item xsi:type="xsd:int">52436</item>
+ </soapenc:Array>
+ </newest_bugsResponse>
+ </soap:Body>
+<?xml version="1.0" encoding="UTF-8"?>
+<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"
+ xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/"
+ xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
+ <soap:Body>
+ <newest_bugsResponse xmlns="Debbugs/SOAP">
+ <soapenc:Array soapenc:arrayType="xsd:int[2]"
+ xsi:type="soapenc:Array">
+ <item xsi:type="xsd:int">52435</item>
+ <item xsi:type="xsd:int">52436</item>
+ </soapenc:Array>
+ </newest_bugsResponse>
+ </soap:Body>
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+"""Tests for the soap module."""
+from pathlib import Path
+from typing import Any, List
+import unittest
+import pytest
+from gnudebbugs import soap
+TEST_DIR = Path(__file__).resolve().parent
+TEST_DATA = TEST_DIR / 'data'
+# The start of every SOAP request.
+SOAP_HEADER = """<?xml version="1.0" encoding="UTF-8"?>\
+<soap:Envelope soap:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/" \
+xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" \
+xmlns:soapenc="http://schemas.xmlsoap.org/soap/encoding/" \
+xmlns:xsd="http://www.w3.org/2001/XMLSchema" \
+# The end of every SOAP request.
+SOAP_FOOTER = """</soap:Body></soap:Envelope>"""
+@pytest.mark.parametrize("func,args,kwargs,expected", (
+ (soap.Api.get_bug_log, [1234], {}, '<get_bug_log><xsd:int>1234</xsd:int></get_bug_log>'),
+ (soap.Api.get_bugs, [], {}, '<get_bugs></get_bugs>'),
+ (soap.Api.get_bugs, [], {'package': 'automake'}, '<get_bugs><a0>package</a0><a1>automake</a1></get_bugs>'),
+ (soap.Api.get_status, [[]], {}, '<get_status></get_status>'),
+ (soap.Api.get_status, [[1]], {}, '<get_status><xsd:int>1</xsd:int></get_status>'),
+ (soap.Api.get_status, [[1, 2]], {}, '<get_status><xsd:int>1</xsd:int><xsd:int>2</xsd:int></get_status>'),
+# (soap.Api.get_usertag, ['v@b', []], {}, '<get_status><xsd:int>1</xsd:int><xsd:int>2</xsd:int></get_status>'),
+ (soap.Api.newest_bugs, [0], {}, '<newest_bugs><xsd:int>0</xsd:int></newest_bugs>'),
+ (soap.Api.newest_bugs, [100], {}, '<newest_bugs><xsd:int>100</xsd:int></newest_bugs>'),
+def test_encode(func, args: List[Any], kwargs, expected: str):
+ expected = SOAP_HEADER + expected + SOAP_FOOTER
+ result = func(*args, **kwargs)
+ assert result == expected
+@pytest.mark.parametrize("filename,expected", (
+ ('get_bug_log-33711.xml', [
+ {
+ 'attachments': [],
+ 'body': 'I compiled ...',
+ 'header': '...',
+ 'msg_num': 5,
+ },
+ {
+ 'attachments': [],
+ 'body': 'reassign 33711 autoconf...',
+ 'header': '...',
+ 'msg_num': 8,
+ },
+ {
+ 'attachments': [],
+ 'body': 'reassign 33711 autoconf\nstop\n',
+ 'header': '...',
+ 'msg_num': 10,
+ },
+ {
+ 'attachments': [],
+ 'body': 'owner 33711 bug-autoconf@gnu.org\nstop\n',
+ 'header': '...',
+ 'msg_num': 12,
+ },
+ ]),
+ ('get_bugs-0.xml', []),
+ ('get_bugs-2.xml', [25740, 33711]),
+ ('get_status-0.xml', None),
+ ('get_status-1.xml', {
+ 52435: {
+ 'affects': '',
+ 'archived': 0,
+ 'blockedby': '',
+ 'blocks': '',
+ 'bug_num': 52435,
+ 'date': 1639234742,
+ 'done': '',
+ 'fixed': None,
+ 'fixed_date': [],
+ 'fixed_versions': [],
+ 'forwarded': '',
+ 'found': {
+ '29.0.50': None,
+ },
+ 'found_date': [],
+ 'found_versions': [
+ '29.0.50',
+ ],
+ 'id': 52435,
+ 'keywords': '',
+ 'last_modified': 1639234742,
+ 'location': 'db-h',
+ 'log_modified': 1639234742,
+ 'mergedwith': '',
+ 'msgid': '<46037FBE-8DAB-4913-850C-96F4F78622E2@Web.DE>',
+ 'originator': 'The Reporter <example@example.com>',
+ 'owner': '',
+ 'package': 'emacs',
+ 'pending': 'pending',
+ 'severity': 'normal',
+ 'source': 'unknown',
+ 'subject': '29.0.50; C-y seems to contain an old contents',
+ 'summary': '',
+ 'tags': '',
+ 'unarchived': '',
+ },
+ }),
+ ('get_status-2.xml', {
+ 52435: {
+ 'affects': '',
+ 'archived': 0,
+ 'blockedby': '',
+ 'blocks': '',
+ 'bug_num': 52435,
+ 'date': 1639234742,
+ 'done': '',
+ 'fixed': None,
+ 'fixed_date': [],
+ 'fixed_versions': [],
+ 'forwarded': '',
+ 'found': {
+ '29.0.50': None,
+ '4.16': None,
+ },
+ 'found_date': [],
+ 'found_versions': [
+ '29.0.50',
+ ],
+ 'id': 52435,
+ 'keywords': '',
+ 'last_modified': 1639234742,
+ 'location': 'db-h',
+ 'log_modified': 1639234742,
+ 'mergedwith': '',
+ 'msgid': '<46037FBE-8DAB-4913-850C-96F4F78622E2@Web.DE>',
+ 'originator': 'The Reporter <example@example.com>',
+ 'owner': '',
+ 'package': 'emacs',
+ 'pending': 'pending',
+ 'severity': 'normal',
+ 'source': 'unknown',
+ 'subject': '29.0.50; C-y seems to contain an old contents',
+ 'summary': '',
+ 'tags': '',
+ 'unarchived': '',
+ },
+ 52436: {
+ 'affects': '',
+ 'archived': 0,
+ 'blockedby': '',
+ 'blocks': '',
+ 'bug_num': 52436,
+ 'date': 1639234742,
+ 'done': '',
+ 'fixed': None,
+ 'fixed_date': [],
+ 'fixed_versions': [],
+ 'forwarded': '',
+ 'found': None,
+ 'found_date': [],
+ 'found_versions': [],
+ 'id': 52436,
+ 'keywords': 'patch',
+ 'last_modified': 1639234743,
+ 'location': 'db-h',
+ 'log_modified': 1639234743,
+ 'mergedwith': '',
+ 'msgid': '<87h7bfrxw9.fsf@web.de>',
+ 'originator': '"Dr. Reporter" <example@example.com>',
+ 'owner': '',
+ 'package': 'guile',
+ 'pending': 'pending',
+ 'severity': 'normal',
+ 'source': 'unknown',
+ 'subject': '[patch] autoconf: Check for gperf if running from git',
+ 'summary': '',
+ 'tags': 'patch',
+ 'unarchived': '',
+ },
+ }),
+ ('get_usertag-0.xml', None),
+ ('newest_bugs-0.xml', []),
+ ('newest_bugs-1.xml', [52436]),
+ ('newest_bugs-2.xml', [52435, 52436]),
+def test_parse(filename: str, expected: Any):
+ data = (TEST_DATA / filename).read_text()
+ result = soap.parse(data)
+ assert result == expected
+# Copyright (C) 2021 Free Software Foundation, Inc.
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or at your option) any
+# later version.
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+# https://tox.readthedocs.io/
+envlist = py38, py39
+python =
+ 3.8: py38
+ 3.9: py39
+deps =
+ pytest
+ requests
+commands = {envpython} ./pytest