From: Mike Frysinger Date: Sat, 11 Dec 2021 18:22:05 +0000 (-0500) Subject: initial release X-Git-Url: https://git.wh0rd.org/?a=commitdiff_plain;ds=inline;p=gnudebbugs.git initial release --- 532fdf4c3fd3863ecb0506422fe113c745e17dab diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e63f60d --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +__pycache__ +*.egg-info + +/build/ +/.tox/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0a04128 --- /dev/null +++ b/LICENSE @@ -0,0 +1,165 @@ + GNU LESSER GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + + This version of the GNU Lesser General Public License incorporates +the terms and conditions of version 3 of the GNU General Public +License, supplemented by the additional permissions listed below. + + 0. Additional Definitions. + + As used herein, "this License" refers to version 3 of the GNU Lesser +General Public License, and the "GNU GPL" refers to version 3 of the GNU +General Public License. + + "The Library" refers to a covered work governed by this License, +other than an Application or a Combined Work as defined below. + + An "Application" is any work that makes use of an interface provided +by the Library, but which is not otherwise based on the Library. +Defining a subclass of a class defined by the Library is deemed a mode +of using an interface provided by the Library. + + A "Combined Work" is a work produced by combining or linking an +Application with the Library. The particular version of the Library +with which the Combined Work was made is also called the "Linked +Version". + + The "Minimal Corresponding Source" for a Combined Work means the +Corresponding Source for the Combined Work, excluding any source code +for portions of the Combined Work that, considered in isolation, are +based on the Application, and not on the Linked Version. + + The "Corresponding Application Code" for a Combined Work means the +object code and/or source code for the Application, including any data +and utility programs needed for reproducing the Combined Work from the +Application, but excluding the System Libraries of the Combined Work. + + 1. Exception to Section 3 of the GNU GPL. + + You may convey a covered work under sections 3 and 4 of this License +without being bound by section 3 of the GNU GPL. + + 2. Conveying Modified Versions. + + If you modify a copy of the Library, and, in your modifications, a +facility refers to a function or data to be supplied by an Application +that uses the facility (other than as an argument passed when the +facility is invoked), then you may convey a copy of the modified +version: + + a) under this License, provided that you make a good faith effort to + ensure that, in the event an Application does not supply the + function or data, the facility still operates, and performs + whatever part of its purpose remains meaningful, or + + b) under the GNU GPL, with none of the additional permissions of + this License applicable to that copy. + + 3. Object Code Incorporating Material from Library Header Files. + + The object code form of an Application may incorporate material from +a header file that is part of the Library. You may convey such object +code under terms of your choice, provided that, if the incorporated +material is not limited to numerical parameters, data structure +layouts and accessors, or small macros, inline functions and templates +(ten or fewer lines in length), you do both of the following: + + a) Give prominent notice with each copy of the object code that the + Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the object code with a copy of the GNU GPL and this license + document. + + 4. Combined Works. + + You may convey a Combined Work under terms of your choice that, +taken together, effectively do not restrict modification of the +portions of the Library contained in the Combined Work and reverse +engineering for debugging such modifications, if you also do each of +the following: + + a) Give prominent notice with each copy of the Combined Work that + the Library is used in it and that the Library and its use are + covered by this License. + + b) Accompany the Combined Work with a copy of the GNU GPL and this license + document. + + c) For a Combined Work that displays copyright notices during + execution, include the copyright notice for the Library among + these notices, as well as a reference directing the user to the + copies of the GNU GPL and this license document. + + d) Do one of the following: + + 0) Convey the Minimal Corresponding Source under the terms of this + License, and the Corresponding Application Code in a form + suitable for, and under terms that permit, the user to + recombine or relink the Application with a modified version of + the Linked Version to produce a modified Combined Work, in the + manner specified by section 6 of the GNU GPL for conveying + Corresponding Source. + + 1) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (a) uses at run time + a copy of the Library already present on the user's computer + system, and (b) will operate properly with a modified version + of the Library that is interface-compatible with the Linked + Version. + + e) Provide Installation Information, but only if you would otherwise + be required to provide such information under section 6 of the + GNU GPL, and only to the extent that such information is + necessary to install and execute a modified version of the + Combined Work produced by recombining or relinking the + Application with a modified version of the Linked Version. (If + you use option 4d0, the Installation Information must accompany + the Minimal Corresponding Source and Corresponding Application + Code. If you use option 4d1, you must provide the Installation + Information in the manner specified by section 6 of the GNU GPL + for conveying Corresponding Source.) + + 5. Combined Libraries. + + You may place library facilities that are a work based on the +Library side by side in a single library together with other library +facilities that are not Applications and are not covered by this +License, and convey such a combined library under terms of your +choice, if you do both of the following: + + a) Accompany the combined library with a copy of the same work based + on the Library, uncombined with any other library facilities, + conveyed under the terms of this License. + + b) Give prominent notice with the combined library that part of it + is a work based on the Library, and explaining where to find the + accompanying uncombined form of the same work. + + 6. Revised Versions of the GNU Lesser General Public License. + + The Free Software Foundation may publish revised and/or new versions +of the GNU Lesser General Public License from time to time. Such new +versions will be similar in spirit to the present version, but may +differ in detail to address new problems or concerns. + + Each version is given a distinguishing version number. If the +Library as you received it specifies that a certain numbered version +of the GNU Lesser General Public License "or any later version" +applies to it, you have the option of following the terms and +conditions either of that published version or of any later version +published by the Free Software Foundation. If the Library as you +received it does not specify a version number of the GNU Lesser +General Public License, you may choose any version of the GNU Lesser +General Public License ever published by the Free Software Foundation. + + If the Library as you received it specifies that a proxy can decide +whether future versions of the GNU Lesser General Public License shall +apply, that proxy's public statement of acceptance of any version is +permanent authorization for you to choose that version for the +Library. diff --git a/README.md b/README.md new file mode 100644 index 0000000..55ae3b5 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# GNU debbugs interface + +A simple CLI tool for working with https://debbugs.gnu.org/. diff --git a/gnudebbugs b/gnudebbugs new file mode 100755 index 0000000..51678d3 --- /dev/null +++ b/gnudebbugs @@ -0,0 +1,33 @@ +#!/usr/bin/env python3 +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""Wrapper to run directly out of git.""" + +from pathlib import Path +import sys + + +# Doesn't make sense to use this script for anything else. +assert __name__ == "__main__" +assert sys.version_info >= (3, 8), "Python 3.8+ required; found " + sys.version + + +TOPDIR = Path(__file__).resolve().parent +sys.path.insert(0, str(TOPDIR / "src")) + +from gnudebbugs import __main__ + +sys.exit(__main__.main()) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..374b58c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,6 @@ +[build-system] +requires = [ + "setuptools>=42", + "wheel" +] +build-backend = "setuptools.build_meta" diff --git a/pytest b/pytest new file mode 100755 index 0000000..464f538 --- /dev/null +++ b/pytest @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""Wrapper to run tests directly out of git.""" + +from pathlib import Path +import sys + +import pytest + +TOPDIR = Path(__file__).resolve().parent +sys.path.insert(0, str(TOPDIR / "src")) + +sys.exit(pytest.console_main()) diff --git a/setup.py b/setup.py new file mode 100755 index 0000000..6591a00 --- /dev/null +++ b/setup.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""Tooling to build the project. + +Users should just run: +$ ./setup.py build +""" + +from pathlib import Path +import re + +import setuptools + + +TOPDIR = Path(__file__).resolve().parent + +long_description = (TOPDIR / "README.md").read_text() + +DUNDERS = {} +data = (TOPDIR / "src/gnudebbugs/__init__.py").read_text() +for m in re.finditer(r"^__([^ ]+)__ = .(.*).$", data, flags=re.M): + DUNDERS[m.group(1)] = m.group(2) + + +setuptools.setup( + name="gnudebbugs", + version=DUNDERS['version'], + author="Mike Frysinger", + author_email="vapier@gmail.com", + description="CLI for debbugs.gnu.org", + long_description=long_description, + long_description_content_type="text/markdown", + url=DUNDERS['homepage'], + project_urls={ + "Bug Tracker": DUNDERS['issue_tracker'], + }, + classifiers=[ + "Development Status :: 3 - Alpha", + "Environment :: Console", + "License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + ], + package_dir={"": "src"}, + packages=setuptools.find_packages(where="src"), + entry_points={ + "console_scripts": [ + 'gnudebbugs=gnudebbugs.__main__:main', + ], + }, + python_requires=">=3.8", +) diff --git a/src/gnudebbugs/__init__.py b/src/gnudebbugs/__init__.py new file mode 100644 index 0000000..934d7bf --- /dev/null +++ b/src/gnudebbugs/__init__.py @@ -0,0 +1,24 @@ +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""Common module settings.""" + +__version__ = "0" +__homepage__ = 'https://savannah.gnu.org/projects/gnudebbugs' +__issue_tracker__ = 'https://savannah.gnu.org/support/?group=gnudebbugs' + + +class BaseError(Exception): + """Base error class for all errors in this project.""" diff --git a/src/gnudebbugs/__main__.py b/src/gnudebbugs/__main__.py new file mode 100644 index 0000000..97964ad --- /dev/null +++ b/src/gnudebbugs/__main__.py @@ -0,0 +1,344 @@ +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""CLI for debbugs.gnu.org (and other debbugs trackers).""" + +import argparse +import collections +import math +from pathlib import Path +import sys +from typing import List, Optional, Tuple + +from .control import Control +from . import pretty +from . import soap +from . import ui as ui_mod + + +SYSTEMS = { + "gnu": ("https://debbugs.gnu.org/cgi/soap.cgi", "control@debbugs.gnu.org"), +} + +# Servers reject attempts to query more than this many bugs at once. +GET_STATUS_MAX_COUNT = 1000 + + +def add_raw_subparser(parser: argparse.ArgumentParser) -> None: + group = parser.add_argument_group("output format control") + group.add_argument( + "--pretty", + action="store_true", + help="Format the output in a human friendly form", + ) + # Have to specify this for each one that uses dest=format and not just the + # canonical --format as argparse will use None by default, and clobber the + # dest=format. + default_format = "json" + group.add_argument( + "--xml", + default=default_format, + dest="format", + action="store_const", + const="full-xml", + help="Output the entire raw SOAP (XML) document", + ) + group.add_argument( + "--json", + default=default_format, + dest="format", + action="store_const", + const="json", + help="Output the raw data in JSON (default)", + ) + group.add_argument( + "--format", + default=default_format, + choices=("full-xml", "xml", "json"), + help="Select the output format", + ) + + parser.add_argument( + "--chunk", + default=True, + action="store_true", + help="Automatically break up large requests to avoid server limits", + ) + parser.add_argument( + "--no-chunk", + default=True, + dest="chunk", + action="store_false", + help="Do not automatically break up large requests", + ) + + subparsers = parser.add_subparsers(title="API", dest="api", required=True) + + subparser = subparsers.add_parser("get_bug_log") + subparser.add_argument("bug", type=int, help="The bug to inspect") + + subparser = subparsers.add_parser("get_bugs") + subparser.add_argument("args", nargs="+", help="Search terms") + + subparser = subparsers.add_parser("get_status") + subparser.add_argument("bugs", nargs="+", type=int, help="The bugs to inspect") + + subparser = subparsers.add_parser("get_usertag") + subparser.add_argument("user", help="The user whose tags to lookup") + subparser.add_argument("tags", nargs="+", help="Tags to filter by") + + subparser = subparsers.add_parser("newest_bugs") + subparser.add_argument("count", type=int, help="How many recent bugs to return") + + +def get_parser() -> argparse.ArgumentParser: + """Get CLI parser.""" + parser = argparse.ArgumentParser(description=__doc__) + + parser.add_argument( + "-B", "--bts", default="gnu", metavar="SYSTEM", help="The bug tracker to use" + ) + + parser.add_argument( + "-U", + "--ui", + default="cli", + choices=("cli", "urwid"), + help="The UI to use (default: %(default)s)", + ) + parser.add_argument( + "--mouse", + default=False, + action="store_true", + help="Enable mouse support (%(default)s)", + ) + parser.add_argument( + "--no-mouse", + default=False, + dest="mouse", + action="store_false", + help="Do not enable mouse support", + ) + + # TODO: Add these to subparsers too. + parser.add_argument("-v", "--verbose", action="count", help="More verbose output") + parser.add_argument("-q", "--quiet", action="count", help="Only display errors") + parser.add_argument( + "-n", "--dry-run", dest="dryrun", action="store_true", help="Don't make changes" + ) + + group = parser.add_argument_group("e-mail options") + group.add_argument("--sendmail", default="sendmail", help="Sendmail command to use") + + subparsers = parser.add_subparsers( + title="Subcommands", dest="command", required=True + ) + + # Subcommand: raw + subparser = subparsers.add_parser( + "raw", + aliases=["r"], + description="Make direct calls to the server & return raw results", + ) + add_raw_subparser(subparser) + + # Subcommand: interactive + subparser = subparsers.add_parser( + "interactive", + aliases=["i"], + description="Make direct calls to the server & return raw results", + ) + add_raw_subparser(subparser) + + # Subcommand: query + subparser = subparsers.add_parser("query") + subparser.add_argument( + "args", + nargs="+", + help="Package or bug numbers to query", + metavar="package|bugnum", + ) + + # Subcommand: control + subparser = subparsers.add_parser( + "control", description="Send control messages to change bug state" + ) + subparser.add_argument("commands", nargs="+", help="Send control messages") + + return parser + + +def parse_args( + argv: Optional[List[str]] = None, +) -> Tuple[argparse.ArgumentParser, argparse.Namespace]: + """Parse the command line.""" + if argv is None: + argv = sys.argv[1:] + + parser = get_parser() + opts = parser.parse_args(argv) + + if opts.bts not in SYSTEMS: + parser.error(f"Unknown system: {opts.bts}") + + return (parser, opts) + + +TAG_SUMMARY = { + "patch": "+", + "wontfix": "W", + "moreinfo": "M", + "unreproducible": "R", + "fixed": "F", + "notabug": "N", + "pending": "P", + "help": "H", + "security": "*", + "confirmed": "x", + "easy": "E", +} + + +def subcmd_query(opts: argparse.Namespace, client: soap.Client) -> Optional[int]: + try: + ui = ui_mod.get_backend(opts.ui)(mouse=opts.mouse) + except ui_mod.Error as e: + sys.exit(e) + + try: + ui.start() + + # Server doesn't maintain order, so we don't either (wrt the user args). + ui.status("Searching for bugs ...") + bugs = set() + for arg in opts.args: + try: + bug = int(arg) + bugs.add(bug) + except ValueError: + found_bugs = client.get_bugs(package=arg) + if found_bugs: + bugs.update(found_bugs) + if not bugs: + sys.exit('\nNo bugs found!') + bug_width = math.floor(math.log10(max(bugs))) + 1 + bugs_list = list(bugs) + + # Servers restrict how many bugs you can query at once. + statuses = {} + for i in range(0, len(bugs), GET_STATUS_MAX_COUNT): + slice = bugs_list[i : i + GET_STATUS_MAX_COUNT] + ui.status(f"Loading {i + len(slice)}/{len(bugs)} bugs ...") + found_bugs = client.get_status(slice) + if found_bugs: + statuses.update(found_bugs) + if not statuses: + sys.exit('\nNo bugs found!') + + summary_tags = {} + tags_width = 1 + summary_states = {} + states_width = 1 + for bug, status in statuses.items(): + bug_tags = set(status["tags"].split()) + summary_bug_tags = "".join(TAG_SUMMARY[x] for x in bug_tags) + tags_width = max(tags_width, len(summary_bug_tags)) + summary_tags[bug] = summary_bug_tags + + # TODO: Figure out precedence order. + state = "" + if status["archived"]: + state = "♲" + elif status["mergedwith"]: + state = "=" + elif status["blockedby"]: + state = "♙" + elif status["fixed_versions"]: + state = "☺" + states_width = max(states_width, len(state)) + summary_states[bug] = state + + ui.status() + ui.interface_query(client, opts.to, statuses, bug_width, summary_tags, tags_width, summary_states, states_width) + except KeyboardInterrupt: + # We ignore it here because interface_query already took care of quitting. + pass + finally: + ui.stop() + + +def subcmd_control(opts: argparse.Namespace, client: soap.Client) -> Optional[int]: + # TODO: Make this configurable. + control = Control(opts.to, "Mike Frysinger ") + for message in opts.commands: + control.queue._queue(message) + control.send(dryrun=opts.dryrun) + + +def subcmd_raw(opts: argparse.Namespace, client: soap.Client) -> Optional[int]: + # Is this more terrible than hand maintaining a CLI<->enum mapping? + sformat = opts.format.upper().replace("-", "_") + for prop in dir(soap.Format): + if prop == sformat: + format = getattr(soap.Format, prop) + break + else: + raise RuntimeError(f'Unable to locate "{sformat}" in {dir(soap.Format)}') + + if opts.api == "get_bug_log": + ret = client.get_bug_log(opts.bug, format=format) + elif opts.api == "get_bugs": + kwargs = {} + for arg in opts.args: + key, value = arg.split("=", 1) + kwargs[key] = value + ret = client.get_bugs(format=format, **kwargs) + elif opts.api == "get_status": + bugs = [int(x) for x in opts.bugs] + if opts.chunk: + # TODO: Implement XML chunking. + assert format == soap.Format.JSON, 'Chunking only supported for JSON currently' + ret = {} + for i in range(0, len(bugs), GET_STATUS_MAX_COUNT): + ret.update( + client.get_status(bugs[i : i + GET_STATUS_MAX_COUNT], format=format) + ) + else: + ret = client.get_status(bugs, format=format) + elif opts.api == "get_usertag": + ret = client.get_usertag(opts.user, opts.tags, format=format) + elif opts.api == "newest_bugs": + ret = client.newest_bugs(opts.count, format=format) + else: + raise RuntimeError(f'Unhandled API "{opts.api}"') + + if opts.pretty: + if format == soap.Format.JSON: + ret = pretty.json(ret) + + print(ret) + + +def main(argv: Optional[List[str]] = None) -> Optional[int]: + """The main entry point for scripts.""" + _, opts = parse_args(argv) + + tracker, to = SYSTEMS[opts.bts] + client = soap.Client(tracker) + opts.to = to + + return {"query": subcmd_query, "control": subcmd_control, "raw": subcmd_raw}[ + opts.command + ](opts, client) diff --git a/src/gnudebbugs/control.py b/src/gnudebbugs/control.py new file mode 100644 index 0000000..8afc2d8 --- /dev/null +++ b/src/gnudebbugs/control.py @@ -0,0 +1,297 @@ +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""Control interface to debbugs. + +https://debbugs.gnu.org/Advanced.html +https://debbugs.gnu.org/server-control.html +""" + +import collections +from email.message import EmailMessage +import smtplib +import subprocess +from typing import Any, List, Optional +import xml.etree.ElementTree as ET + +import requests + + +class Control: + """A control interface to debbugs.""" + + def __init__(self, host: str): + self.host = host + self.to = f"control@{host}" + + +def assert_action(action: str) -> None: + """Verify |action| is a valid action.""" + if action not in ("+", "-", "="): + raise ValueError(f'Invalid action "{action}"') + + +def assert_bug(bug: int) -> None: + """Verify |bug| is a valid syntax-wise bug number.""" + if not isinstance(bug, int): + raise TypeError(f'Expected integer, but bug "{bug}" is of type {type(bug)}') + if bug <= 0: + raise ValueError(f"Bugs must be positive, not {bug}") + + +def assert_bugs(bugs: List[int]) -> None: + """Verify |bugs| are valid syntax-wise bug numbers.""" + if not bugs: + raise ValueError(f"Need at least one bug to merge") + for b in bugs: + assert_bug(b) + + +def assert_email(email: str) -> None: + """Verify |email| is a valid e-mail address.""" + # Special case: debbugs uses ! as an alias to the From address. + if email == "!": + return + + # TODO: Lookup RFC. + if " " in email or "@" not in email: + raise ValueError(f'Invalid e-mail address "{email}"') + + +def assert_newid(newid: int) -> None: + """Verify |newid| is a valid new id (for cloning).""" + if not isinstance(newid, int): + raise TypeError( + f'Expected integer, but newid "{newid}" is of type {type(newid)}' + ) + if newid >= 0: + raise ValueError(f"Newids must be negative, not {newid}") + + +def assert_package(package: str) -> None: + """Verify |package| is a valid package name.""" + if " " in package: + raise ValueError(f'Invalid package "{package}"') + + +TAGS = { + "patch", + "wontfix", + "moreinfo", + "unreproducible", + "fixed", + "notabug", + "pending", + "help", + "security", + "confirmed", + "easy", +} + + +def assert_tags(tags: List[str]) -> None: + """Verify |tags| are valid tag name.""" + if not tags: + raise ValueError(f"Need at least one tag") + for t in tags: + if t not in TAGS: + raise ValueError(f'Invalid tag "{tag}"') + + +SEVERITIES = { + "critical", + "grave", + "serious", + "important", + "normal", + "minor", + "wishlist", +} + + +def assert_severity(severity: str) -> None: + """Verify |severity| is a valid severity level.""" + if severity not in SEVERITIES: + raise ValueError(f'Invalid severity "{severity}"') + + +class MessageQueue: + """A queue of messages.""" + + def __init__(self): + self.messages = [] + self.stop_message = "thankyou" + + def _get_messages(self): + if not self.messages: + return '' + return "\n".join(self.messages) + f"\n{self.stop_message}\n" + + def _queue(self, message: str) -> None: + assert "\n" not in message + self.messages.append(message) + + def close(self, bug: int, fixed: Optional[str] = "") -> None: + assert_bug(bug) + self._queue(f"close {bug} {fixed}") + + def reassign(self, bug: int, package: str, version: Optional[str] = "") -> None: + assert_bug(bug) + assert_package(package) + self._queue(f"reassign {bug} {package} {version}") + + def severity(self, bug: int, severity: str) -> None: + assert_bug(bug) + assert_severity(severity) + self._queue(f"severity {bug} {severity}") + + def reopen(self, bug: int, address: Optional[str] = "") -> None: + assert_bug(bug) + if address and address != "=": + assert_email(address) + self._queue(f"reopen {bug} {address}") + + def found(self, bug: int, version: Optional[str] = "") -> None: + assert_bug(bug) + if found: + assert_version(version) + self._queue(f"found {bug} {version}") + + def notfound(self, bug: int, version: str) -> None: + assert_bug(bug) + assert_version(version) + self._queue(f"notfound {bug} {version}") + + def submitter(self, bug: int, address: str) -> None: + assert_bug(bug) + assert_email(address) + self._queue(f"submitter {bug} {address}") + + def forwarded(self, bug: int, address: str) -> None: + assert_bug(bug) + assert_email(address) + self._queue(f"forwarded {bug} {address}") + + def notforwarded(self, bug: int) -> None: + assert_bug(bug) + self._queue(f"notforwarded {bug}") + + def owner(self, bug: int, address: str) -> None: + assert_bug(bug) + assert_email(address) + self._queue(f"owner {bug} {address}") + + def noowner(self, bug: int) -> None: + assert_bug(bug) + self._queue(f"noowner {bug}") + + def retitle(self, bug: int, title: str) -> None: + assert_bug(bug) + if "\n" in title: + raise ValueError(f"Newlines not allowed in titles") + self._queue(f"retitle {bug} {title}") + + def clone(self, bug: int, *newids: List[int]) -> None: + assert_bug(bug) + if not newids: + raise ValueError(f"Need at least one newid to clone") + for b in newids: + assert_newid(b) + self._queue(f'clone {bug} {" ".join(str(x) for x in newids)}') + + def merge(self, bug: int, *bugs: List[int]) -> None: + assert_bug(bug) + assert_bugs(bugs) + self._queue(f'merge {bug} {" ".join(str(x) for x in bugs)}') + + def forcemerge(self, bug: int, *bugs: List[int]) -> None: + assert_bug(bug) + assert_bugs(bugs) + self._queue(f'forcemerge {bug} {" ".join(str(x) for x in bugs)}') + + def unmerge(self, bug: int) -> None: + assert_bug(bug) + self._queue(f"unmerge {bug}") + + def tag(self, bug: int, action: str, *tags: List[str]) -> None: + assert_bug(bug) + assert_action(action) + # TODO: Double check you can clear tags this way. The spec doesn't say it's possible at all ... + if action != '=' or tags: + assert_tags(tags) + self._queue(f'tag {bug} {action} {" ".join(tags)}') + + def block(self, bug: int, *bugs: List[int]) -> None: + assert_bug(bug) + assert_bugs(bugs) + self._queue(f'block {bug} {" ".join(str(x) for x in bugs)}') + + def unblock(self, bug: int, *bugs: List[int]) -> None: + assert_bug(bug) + assert_bugs(bugs) + self._queue(f'unblock {bug} {" ".join(str(x) for x in bugs)}') + + def stop(self, form: Optional[str] = None): + """Halt further commands. + + NB: This is handled automatically. + """ + if form not in ("quit", "stop", "--") and not form.startswith("thank"): + raise ValueError(f'Invalid stop command "{form}"') + self.stop_message = form + + def user(self, user: str) -> None: + """Set the user to the given email address.""" + assert_email(user) + self._queue(f"user {user}") + + def usertag(self, bug: int, action: str, *tags: List[str]) -> None: + """Sets usertags for the bug report. + + Action may be +-= to add/remove/set. + """ + assert_action(action) + # TODO: Need to check usertags are arbitrary. + # assert_tags(tags) + self._queue(f'usertag {bug} {action} {"".join(tags)}') + + +class Control: + def __init__( + self, to: str, email: str, use_sendmail: bool = True, sendmail: str = "sendmail" + ): + self.to = to + self.email = email + self.use_sendmail = use_sendmail + self.sendmail = sendmail + self.queue = MessageQueue() + + def send(self, dryrun: Optional[bool] = False) -> None: + msg = EmailMessage() + msg["To"] = self.to + msg["From"] = self.email + msg.set_content(self.queue._get_messages()) + self._send_email(msg, dryrun=dryrun) + + def _send_email(self, msg: EmailMessage, dryrun: Optional[bool] = False) -> None: + if self.use_sendmail: + sendmail = self.sendmail + " -t" + if dryrun: + print(f"Sending message:\n$ {sendmail}\n{msg}") + sendmail = "exit 0; " + sendmail + subprocess.run(sendmail, shell=True, input=str(msg).encode(), check=True) + else: + with smtplib.SMTP("localhost") as s: + s.send_message(msg) diff --git a/src/gnudebbugs/messages.py b/src/gnudebbugs/messages.py new file mode 100644 index 0000000..301cadd --- /dev/null +++ b/src/gnudebbugs/messages.py @@ -0,0 +1,93 @@ +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""E-mail helpers. + +E-mail messages often contain a lot of mess & noise. Contain that here. +""" + +import email.parser +import email.policy +import email.utils +from typing import Dict, List, NamedTuple, Tuple + + +class Attachment(NamedTuple): + """A single attachment.""" + name: str + mime_type: str + data: bytes + + +class Message(NamedTuple): + number: int + headers: Dict[str, str] + body: Tuple[str] + attachments: Tuple[Attachment] + + +def parse_entry(entry: Dict) -> Message: + """Parse a single |entry| from the bug |log|.""" + # TODO: Implement this! Have to find an example first ... + assert not entry['attachments'], 'attachments not supported!' + attachments = [] + body = [] + + msg = email.message_from_string(entry['header'] + '\n\n' + entry['body'], + policy=email.policy.SMTPUTF8) + + # Figure out which parts we'll actually show. + def _flatten_msg(msg): + if msg.is_multipart(): + if msg.get_content_type() == 'multipart/alternative': + for part in msg.get_payload(): + if part.get_content_maintype() == 'text': + yield part + break + else: + yield next(iter(msg.get_payload())) + else: + for part in msg.get_payload(): + yield from _flatten_msg(part) + else: + ctype = msg.get_content_type() + if ctype != 'application/pgp-signature': + yield msg + + for part in _flatten_msg(msg): + ctype = part.get_content_type() + mtype = part.get_content_maintype() + if mtype == 'text' or ctype == 'multipart/alternative': + body.append(part.get_payload().strip()) + elif mtype == 'application': + attachments.append(Attachment( + name=part.get_filename(), + mime_type=ctype, + data=part.get_content(), + )) + else: + body.append(f'Unhandled type: {ctype}') + + return Message( + number=entry['msg_num'], + headers=dict(msg), + body=body, + attachments=attachments, + ) + + +def parse_log(log: List[Dict]) -> Tuple[Message]: + """Parse the bug |log| and return structured messages.""" + yield from (parse_entry(x) for x in log) diff --git a/src/gnudebbugs/pretty.py b/src/gnudebbugs/pretty.py new file mode 100644 index 0000000..9e30b9c --- /dev/null +++ b/src/gnudebbugs/pretty.py @@ -0,0 +1,25 @@ +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""Helpers for format things for humans.""" + +import json as mod_json +from typing import Any + + +def json(obj: Any) -> str: + return mod_json.dumps( + obj, ensure_ascii=False, indent=2, separators=(",", ": "), sort_keys=True + ) diff --git a/src/gnudebbugs/soap.py b/src/gnudebbugs/soap.py new file mode 100644 index 0000000..bb4890b --- /dev/null +++ b/src/gnudebbugs/soap.py @@ -0,0 +1,256 @@ +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""Bespoke interface to debbugs. + +Since debbugs doesn't provide a standard WSDL, we can't use off-the-shelf SOAP +modules. Instead we have to define the APIs by hand. Nice. +""" + +import base64 +import enum +from typing import Any, List, Optional +import xml.etree.ElementTree as ET + +from . import __version__, BaseError + +import requests +try: + import requests_cache +except ImportError: + requests_cache = None + + +class Error(BaseError): + """Base error class.""" + + +class ServerError(Error): + """The server encountered an error.""" + + +def _encode(*args: List[Any]): + """Recursive encode Python |args|.""" + for arg in args: + if isinstance(arg, int): + yield f"{arg}" + elif isinstance(arg, (list, set, tuple)): + yield "".join(_encode(*arg)) + elif isinstance(arg, str): + yield f"{arg}" + elif isinstance(arg, dict): + n = 0 + for key, value in arg.items(): + yield f"{key}" + n += 1 + yield f"{value}" + n += 1 + else: + raise TypeError(f"Unsupported argument {type(arg)}") + + +def _decode(node, node_type: Optional[str] = None) -> Any: + """Recursively decode the |node|.""" + encodingStyle = "{http://schemas.xmlsoap.org/soap/encoding/}" + soap = "{http://schemas.xmlsoap.org/soap/envelope/}" + soapenc = "{http://schemas.xmlsoap.org/soap/encoding/}" + xsd = "{http://www.w3.org/2001/XMLSchema}" + xsi = "{http://www.w3.org/2001/XMLSchema-instance}" + debbugs = "{Debbugs/SOAP}" + gnudebbugs = "<<>>" + + if node_type is None: + node_type = node.attrib.get(f"{xsi}type") + + if node.attrib.get(f"{xsi}nil") == "true": + assert not list(node) + return None + elif node_type == "xsd:int": + return int(node.text) + elif node_type == "xsd:float": + return node.text # float(node.text) + elif node_type == "xsd:string": + # Turn None into an empty string. + return "" if not node.text else node.text + elif node_type == "xsd:base64Binary": + return base64.b64decode(node.text).decode("utf-8") + elif node_type in (f"{soapenc}Array", "soapenc:Array"): + ret = [] + for item in node: + assert item.tag == f"{debbugs}item" + # if f'{xsi}type' not in item.attrib: + # Assume typeless is a map. + item.attrib.setdefault(f"{xsi}type", f"{gnudebbugs}map") + ret.append(_decode(item)) + return ret + elif node_type == "apachens:Map": + ret = {} + for item in node: + key_node, value_node = list(item) + if key_node.tag != f"{debbugs}key": + key_node, value_node = value_node, key_node + key = _decode(key_node) + if value_node.attrib.get(f"{xsi}nil") == "true": + ret[key] = None + else: + ret[key] = _decode(value_node, node_type=f"{gnudebbugs}map") + return ret + elif node_type == f"{gnudebbugs}map": + ret = {} + for item in node: + key = item.tag + if key.startswith(debbugs): + key = key[len(debbugs) :] + ret[key] = _decode(item) + return ret + elif node_type is None: + if not list(node): + return None + + raise TypeError(f"Unhandled node type {node}[{node.attrib}][{node.text}]") + + +class Format(enum.IntEnum): + """Response output formats.""" + + # The entire document (XML & SOAP headers). + FULL_XML = 0 + # The method-specific XML content. + XML = 1 + # The method-specific XML content converted to JSON. + JSON = 2 + + +def parse(document: str, format: Optional[Format] = None) -> Any: + """Parse the SOAP response.""" + if format == Format.FULL_XML: + return document.decode("utf-8") + + # Assume the structure: + # + # + # <...method...> + # ...response... + root = ET.fromstring(document) + body = list(root) + assert len(body) == 1 + method = list(body[0]) + assert len(method) == 1 + response = list(method[0]) + assert len(response) == 1 + if format == Format.XML: + assert False, "todo" + + if format is not None and format != Format.JSON: + raise ValueError(f'Unknown format "{format}"') + return _decode(response[0]) + + +class Api: + """Generate SOAP documents for API requests.""" + + @classmethod + def document(cls, method: str, *args: List[Any]) -> str: + """Generate the document for |method| with |args|.""" + return f"""\ +\ +\ +<{method}>{"".join(_encode(*args))}\ +""" + + @classmethod + def get_bug_log(cls, bug: int) -> str: + return cls.document("get_bug_log", bug) + + @classmethod + def get_bugs(cls, **kwargs) -> str: + return cls.document("get_bugs", kwargs) + + @classmethod + def get_status(cls, bugs: List[int]) -> str: + return cls.document("get_status", bugs) + + @classmethod + def get_usertag(cls, user: str, tags: List[str]) -> str: + return cls.document("get_usertag", tags) + + @classmethod + def newest_bugs(cls, count: int) -> str: + return cls.document("newest_bugs", count) + + +class Client: + """Client to access a debbugs site.""" + + def __init__(self, tracker: str): + self.tracker = tracker + + if requests_cache is not None: + self.session = requests_cache.CachedSession('gnudebbugs') + else: + self.session = requests.Session() + self.session.headers.update( + { + # "User-Agent": f'GNU debbugs/{__version__}', + "Content-Type": "application/soap+xml", + } + ) + + def link(self, bug: int) -> str: + """Return the URI to the |bug| report.""" + return self.tracker.rsplit("/", 2)[0] + f"/{bug}" + + def _post(self, data: Any) -> bytes: + """Cover function to make the request. + + Provides an easy way to mock in tests. + """ + return self.session.post(self.tracker, data=data).content + + def get_bug_log(self, bug: int, format: Optional[Format] = None) -> Any: + """Get full |bug| log.""" + data = Api.get_bug_log(bug) + response = self._post(data) + return parse(response, format=format) + + def get_bugs(self, format: Optional[Format] = None, **kwargs) -> Any: + """Search for bugs.""" + data = Api.get_bugs(**kwargs) + response = self._post(data) + return parse(response, format=format) + + def get_status(self, bugs: List[int], format: Optional[Format] = None) -> Any: + """Get status for |bugs|.""" + data = Api.get_status(bugs) + response = self._post(data) + return parse(response, format=format) + + def get_usertag( + self, user: str, tags: List[str], format: Optional[Format] = None + ) -> str: + """Lookup usertags.""" + data = Api.get_usertag(user, tags) + response = self._post(data) + return parse(response, format=format) + + def newest_bugs(self, count: int, format: Optional[Format] = None) -> List[int]: + """Lookup new bugs.""" + data = Api.newest_bugs(count) + response = self._post(data) + return parse(response, format=format) diff --git a/src/gnudebbugs/ui/__init__.py b/src/gnudebbugs/ui/__init__.py new file mode 100644 index 0000000..89f652c --- /dev/null +++ b/src/gnudebbugs/ui/__init__.py @@ -0,0 +1,62 @@ +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""Common UI settings.""" + +import abc + +from .. import BaseError + + +class Error(BaseError): + """Base UI error class.""" + + +class MissingUrwidError(Error): + """The urwid module is unavailable.""" + + +class UIBase(abc.ABC): + """Base class for all UI implementations.""" + + def __init__(self): + pass + + def start(self) -> None: + pass + + def stop(self) -> None: + pass + + def status(self, text: str) -> None: + """Print status information.""" + + def interface_query(self, client, statuses, bug_width, summary_tags, tags_width, summary_states, states_width) -> None: + """Run the query interface.""" + + +def get_backend(backend: str) -> UIBase: + """Helper to dynamically load the right backend.""" + if backend == "cli": + from . import cli + + return cli.UI + elif backend == "urwid": + try: + from . import urwid + except ImportError as e: + raise MissingUrwidError(f'Unable to import urwid: {e}') + + return urwid.UI diff --git a/src/gnudebbugs/ui/cli.py b/src/gnudebbugs/ui/cli.py new file mode 100644 index 0000000..aec5a68 --- /dev/null +++ b/src/gnudebbugs/ui/cli.py @@ -0,0 +1,53 @@ +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""Command line interface UI.""" + +import os +from typing import Optional + +from . import UIBase + + +class UI(UIBase): + """Command line interfacee.""" + + CSI_ERASE_LINE = "\x1b[2K" + + def __init__(self): + self._clear_line_text = self.CSI_ERASE_LINE + "\r" if os.isatty(1) else "\n" + + def osc8_link(self, url: str, text: str) -> str: + return f"\x1b]8;;{url}\x07{text}\x1b]8;;\x07" + + def _clear_line(self, flush: bool = False): + print(self._clear_line_text, end='', flush=flush) + + def status(self, text: Optional[str] = None) -> None: + """Print status information.""" + self._clear_line() + if text: + print(text, end='', flush=True) + + def interface_query(self, client, statuses, bug_width, summary_tags, tags_width, summary_states, states_width) -> None: + """Run the query interface.""" + for bug, status in sorted(statuses.items()): + # Severity levels all happen to start with a diff first letter. + sev = status["severity"][0] + print( + self.osc8_link(client.link(bug), f"{bug:{bug_width}}"), + f"{sev}{summary_tags[bug]:{tags_width}}{summary_states[bug]:{states_width}}", + status["subject"], + ) diff --git a/src/gnudebbugs/ui/urwid.py b/src/gnudebbugs/ui/urwid.py new file mode 100644 index 0000000..6988be2 --- /dev/null +++ b/src/gnudebbugs/ui/urwid.py @@ -0,0 +1,721 @@ +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""urwid (terminal/curses) UI.""" + +import email.utils +import os +from pathlib import Path +import random +import re +import subprocess +import tempfile +from typing import Any, List, NamedTuple, Optional, Tuple + +import urwid + +from .. import __homepage__, __version__ +from .. import messages +from ..control import MessageQueue +from . import UIBase + + +class HelpContents(NamedTuple): + """Content to show in the help page.""" + + name: str + shortcuts: Tuple[str] + contents: Tuple[Any] = () + + +class SlimButton(urwid.Button): + """A button w/out <> markers and w/hidden cursor.""" + + def __init__(self, label): + self._label = urwid.SelectableIcon("", 0) + cols = urwid.Columns([self._label], dividechars=1) + urwid.WidgetWrap.__init__(self, cols) + self.set_label(label) + # Hide the cursor on the button since we style it with reverse text. A + # hack, but works. If urwid ever fixes things, we can clean this up. + # https://github.com/urwid/urwid/issues/170 + self._label._cursor_position = len(label) + 1 + + def pack(self, size=None, focus=False): + return self._label.pack(size, focus) + + +class MemoryEdit(urwid.Edit): + """Edit widget that tracks its original value.""" + + def __init__(self, *args, **kwargs): + self.control = kwargs.pop('control') + super().__init__(*args, **kwargs) + self.original_edit_text = self.edit_text + + def keypress(self, size, key): + if key == 'esc': + self.set_edit_text(self.original_edit_text) + else: + return super().keypress(size, key) + + def is_modified(self): + return self.original_edit_text != self.edit_text + + +class MemoryCheckBox(urwid.CheckBox): + """CheckBox widget that tracks its original value.""" + + def __init__(self, *args, **kwargs): + self.control = kwargs.pop('control') + super().__init__(*args, **kwargs) + self.original_state = self.get_state() + + def keypress(self, size, key): + if key == 'esc': + self.set_state(self.original_state) + else: + return super().keypress(size, key) + + def is_modified(self): + return self.original_state != self.get_state() + + +def ClickableButton(label, callback, *args, attr_map=None): + """Convenience function for generating clickable buttons.""" + def _trampoline(button): + """Helper to swallow the |button| argument.""" + return callback(*args) + button = SlimButton(label) + urwid.connect_signal(button, 'click', _trampoline) + return urwid.AttrMap(button, attr_map, focus_map='reversed') + + +BUTTON_SEP = urwid.Text(['┃']) + + +class StatusBar(urwid.Columns): + """The status bar.""" + + def __init__(self, holder): + self.holder = holder + self._status = urwid.Text(['Initializing ...']) + super().__init__([ + self._status, + ('pack', BUTTON_SEP), + ('pack', ClickableButton('^G HELP', self.holder.show_help)), + ('pack', BUTTON_SEP), + ('pack', ClickableButton('M-LEFT BACK', self.holder.go_back)), + ('pack', BUTTON_SEP), + ('pack', ClickableButton('^C QUIT', self.holder.exit)), + ('pack', BUTTON_SEP), + ]) + + def status(self, text: Optional[str] = '') -> None: + if not text: + text = f'Idle (GNU/debbugs v{__version__})' + else: + text = ('standout', text) + self._status.set_text(text) + + +class PaneHolder: + """The root (blank) dialog that holds UI elements.""" + + def __init__(self, statusbar: bool = True, mouse: bool = False): + self._status_bar = StatusBar(self) + self.panes = [] + self.widget = urwid.Frame(urwid.SolidFill(), footer=self._status_bar) + palette = [ + ('reversed', 'standout', 'default'), + ('frame header', 'dark blue', 'default'), + ('e-mail header', 'dark green', 'default'), + ('edit', 'white', 'dark gray'), + ] + self.loop = urwid.MainLoop( + self.widget, palette=palette, handle_mouse=mouse, + unhandled_input=self.unhandled_input) + + def start(self): + self.loop.start() + #self.loop.screen.clear() + # NB: We don't restore the keys ourselves because loop.screen already + # saves & restores them on start & stop. + keys = self.loop.screen.tty_signal_keys() + self.loop.screen.tty_signal_keys(intr=keys[0], quit=0, start=0, stop=0, susp=keys[4]) + + def stop(self): + self.loop.stop() + # TODO: Doesn't seem to work? + _, rows = self.loop.screen.get_cols_rows() + urwid.escape.set_cursor_position(0, rows) + + def run(self): + self.loop.event_loop.run() + + def unhandled_input(self, key): + # First global shortcuts. + if key == 'ctrl c': + raise urwid.ExitMainLoop() + elif key in ('h', '?', 'ctrl g'): + self.show_help() + elif key == 'meta left': + self.go_back() + elif key in ('up', 'page up', 'home') and self.widget.focus_position == 'footer': + self.widget.set_focus('body') + elif self.pane.keypress(key): + pass + elif key == 'down' and self.widget.focus_position == 'body': + self.widget.set_focus('footer') + else: + self.status(f'Unhandled key "{key}"') + return key + + def status(self, text: Optional[str] = '') -> None: + self._status_bar.status(text) + self.loop.draw_screen() + + def get_help_contents(self) -> Optional[HelpContents]: + return HelpContents( + name='Global', + shortcuts=( + ('Ctrl+C', 'Quit'), + *((x, 'Show help for the current page') for x in ('Ctrl+G', 'h', '?')), + ('Alt+Left', 'Go back to previous page'), + ), + ) + + def show_help(self): + if not isinstance(self.pane, HelpPane): + self.push_pane(HelpPane(self, self.pane)) + else: + self.status(random.choice(( + 'Sorry, there is only so much help one can give', + 'Sometimes the answer you seek lies inside yourself', + 'IDK, try Googling it?', + 'Maybe no one knows the answer :(', + '(╯°□°)╯︵ ┻━┻', + ))) + + def go_back(self): + if len(self.panes) > 1: + # TODO: Add callback to save/check state. + self.pop_pane() + else: + self.status('Cannot go back: already at top') + + def exit(self): + raise urwid.ExitMainLoop() + + @property + def pane(self): + return self.panes[-1] if self.panes else None + + def push_pane(self, pane): + self.status() + self.panes.append(pane) + self.widget.set_body(urwid.Overlay( + pane.widget, self.widget.get_body(), + align='center', width=('relative', 100), + valign='middle', height=('relative', 100), + )) + + def pop_pane(self): + self.status() + self.panes.pop() + self.widget.set_body(self.widget.get_body()[0]) + + +class Pane: + """The common pane API.""" + + def __init__(self, holder): + self.holder = holder + + def status(self, text: Optional[str] = '') -> None: + self.holder.status(text) + + def get_help_contents(self) -> Optional[HelpContents]: + return None + + def keypress(self, key) -> bool: + return False + + +class HelpPane(Pane): + """The help page.""" + + def __init__(self, holder, pane): + super().__init__(holder) + self.pane = pane + self.populate() + + def populate(self): + contents = [ + urwid.Divider(), + urwid.Text([f'Homepage: {__homepage__}']), + ] + + for obj in (self.holder, self, self.pane): + help = obj.get_help_contents() + if help is None: + continue + + contents += [ + urwid.Divider(), + urwid.Text([help.name, ' keyboard shortcuts:']), + ] + contents.extend( + urwid.Columns([(15, urwid.Text([key])), urwid.Text([command])]) + for key, command in help.shortcuts + ) + if help.contents: + contents.append(urwid.Divider()) + for entry in help.contents: + if not entry: + contents.append(urwid.Divider()) + elif isinstance(entry, str): + contents.append(urwid.Text([entry])) + else: + contents.append(entry) + + body = urwid.ListBox(urwid.SimpleListWalker(contents)) + self.widget = urwid.Frame(body, header=urwid.Text([f'GNU/debbugs version {__version__} help'])) + + def get_help_contents(self) -> HelpContents: + return HelpContents( + name='Help page', + shortcuts=( + *((x, 'Close help') for x in ('q', 'Escape')), + ), + ) + + def keypress(self, key) -> bool: + if key in ('q', 'esc'): + self.holder.go_back() + else: + return False + return True + + +class SaveFilePane(Pane): + """Save a file to disk.""" + + def __init__(self, holder, name, data, callback): + super().__init__(holder) + self.name = name + self.data = data + self.callback = callback + self.populate() + + def populate(self): + self.path = MemoryEdit(edit_text=str(Path.cwd() / self.name)) + urwid.connect_signal(self.path, 'change', self.check_path) + self.status = urwid.Text(['']) + contents = [ + urwid.LineBox(self.path), + self.status, + ] + self.check_path(self.path) + body = urwid.ListBox(urwid.SimpleFocusListWalker(contents)) + self.widget = urwid.Frame(body, header=urwid.Text([f'Pick path to save {self.name} ...'])) + + def get_help_contents(self) -> HelpContents: + return HelpContents( + name='Save page', + shortcuts=( + ('Enter', 'Save the file to the specified path'), + ('Escape', 'Cancel save'), + ), + ) + + def keypress(self, key) -> bool: + if key == 'enter': + self.save() + elif key == 'esc': + self.callback(False, self.path.edit_text) + else: + return False + return True + + def check_path(self, element, path=None): + path = Path(element.edit_text if path is None else path) + ret = False + if not path.exists(): + if not path.parent.exists(): + msg = f'Error: {path.parent} is not a directory' + elif not os.access(path.parent, os.W_OK): + msg = f'Error: {path.parent} is not writable' + else: + msg = 'Press enter to save!' + ret = True + else: + msg = f'Error: path already exists; please pick a different name' + self.status.set_text(msg) + return ret + + def save(self): + path = Path(self.path.edit_text) + if self.check_path(self.path): + try: + with path.open('wb') as fp: + fp.write(self.data) + self.callback(True, self.path.edit_text) + except OSError as e: + self.status(f'Saving failed: {e}') + + +class BugListPane(Pane): + """The bug picker list.""" + + def __init__(self, holder, client, control, statuses, bug_width, summary_tags, tags_width, summary_states, states_width): + super().__init__(holder) + self.client = client + self.control = control + self.populate(statuses, bug_width, summary_tags, tags_width, summary_states, states_width) + + def populate(self, statuses, bug_width, summary_tags, tags_width, summary_states, states_width) -> None: + contents = [] + for bug, status in sorted(statuses.items()): + # Severity levels all happen to start with a diff first letter. + sev = status["severity"][0] + text = ( + f"{bug:{bug_width}} " + + f"{sev}{summary_tags[bug]:{tags_width}}{summary_states[bug]:{states_width}} " + + status["subject"] + ) + contents.append(ClickableButton(text, self.select_bug, status)) + self.widget = urwid.ListBox(urwid.SimpleFocusListWalker(contents)) + + def select_bug(self, status): + self.status(f'loading: #{status["id"]}: {status["subject"]}') + self.holder.push_pane(BugViewPane(self.holder, self.client, self.control, status)) + + def get_help_contents(self) -> HelpContents: + return HelpContents( + name='Bug list', + shortcuts=( + *((x, 'Close the list (i.e. quit)') for x in ('q', 'Escape')), + ('Enter', 'View selected bug/issue'), + ), + contents=( + 'The bug list is for navigating all the currently discovered issues.', + 'Simplify select the issue you want to view and press Enter!', + ), + ) + + def keypress(self, key) -> bool: + if key in ('q', 'esc'): + raise urwid.ExitMainLoop() + else: + return False + return True + + +class BugViewPane(Pane): + """Show details about a specific bug.""" + + def __init__(self, holder, client, control, status): + super().__init__(holder) + self.client = client + self.control = control + self.bug_status = status + self.bug_log = client.get_bug_log(status['id']) + self.inputs = [] + self.populate() + + def populate(self) -> None: + status = self.bug_status + # https://debbugs.gnu.org/cgi/bugreport.cgi?bug=25740 + self._header = urwid.Columns([ + urwid.Text([('frame header', f'#{status["id"]} {status["subject"]}')]), + ('pack', BUTTON_SEP), + ('pack', ClickableButton('^S SEND', self.send)), + ('pack', BUTTON_SEP), + ]) + + label_width = 10 + def _label_row(label, element): + assert len(label) < label_width + return urwid.Columns([ + #(1, urwid.Text(['|'])), + (label_width, urwid.Text([label])), + element, + ]) + def _label_edit_row(label, element): + self.inputs.append(element) + return _label_row(label, urwid.AttrMap(element, 'edit')) + + report_date = email.utils.formatdate(status['date'], localtime=True) + # TODO: Show log_modified ? + modify_date = email.utils.formatdate(status['last_modified'], localtime=True) + contents = [ + _label_edit_row('Title:', MemoryEdit(edit_text=status["subject"], control='title')), + _label_edit_row('Package:', MemoryEdit(edit_text=status["package"], control='package')), + _label_row('Reporter:', urwid.Text(status["originator"])), + _label_row('Reported:', urwid.Text(report_date)), + _label_row('Updated:', urwid.Text(modify_date)), + _label_edit_row('Severity:', MemoryEdit(edit_text=status["severity"], control='severity')), + _label_edit_row('Tags:', MemoryEdit(edit_text=status['tags'], control='tag')), + _label_edit_row('Done:', MemoryEdit(edit_text=status['done'], control='done')), + _label_edit_row('Archived:', MemoryCheckBox('', state=status['archived'], control='archive')), + # TODO: Add more fields. + ] + + for entry in messages.parse_log(self.bug_log): + reply_placeholder = urwid.WidgetPlaceholder(urwid.Divider()) + contents += [ + urwid.AttrMap(urwid.Divider('─'), 'e-mail header'), + # https://debbugs.gnu.org/cgi/bugreport.cgi?msg=14;bug=25740 + urwid.Columns([ + urwid.Text([('e-mail header', f'|Message #{entry.number}')]), + ('pack', BUTTON_SEP), + ('pack', ClickableButton('REPLY', self.reply, entry, reply_placeholder)), + ('pack', BUTTON_SEP), + ]), + ] + for header in ('From', 'To', 'CC', 'Subject', 'Date'): + if header in entry.headers: + contents.append(urwid.Text([('e-mail header', f'|{header}: {entry.headers[header]}')])) + contents.append(urwid.Divider()) + + for part in entry.attachments: + # TODO: Add inline viewer for text/* files, and application/octet-stream (w/user prompt). + # e.g. A .patch file encoded as application/octet-stream. + contents.append(urwid.Columns([ + ('pack', urwid.Text([('e-mail header', '[')])), + ('pack', ClickableButton( + part.name, self.save_attachment, part, + attr_map='e-mail header')), + ('pack', urwid.Text([('e-mail header', f' ({part.mime_type}, attachment)]')])), + ])) + if entry.attachments: + contents.append(urwid.Divider()) + + first = True + for part in entry.body: + if first: + first = False + else: + contents.append(urwid.Divider()) + contents.append(urwid.Text([part])) + + contents.append(reply_placeholder) + + contents.append(urwid.AttrMap(urwid.Divider('━'), 'frame header')) + + body = urwid.ListBox(urwid.SimpleListWalker(contents)) + self.widget = urwid.Frame(body, header=self._header) + + def get_help_contents(self) -> HelpContents: + return HelpContents( + name='Bug page', + shortcuts=( + ('Ctrl+S', 'Send updates to the tracker'), + ), + ) + + def keypress(self, key): + if key in ('down', 'page down', 'end') and self.widget.focus_position == 'header': + self.widget.set_focus('body') + elif key == 'up' and self.widget.focus_position == 'body': + self.widget.set_focus('header') + elif key == 'ctrl s': + self.send() + else: + return False + return True + + def reply(self, msg, placeholder): + msg_date = msg.headers.get('Date') + msg_from = msg.headers.get('From') + if msg_from: + from_name, _ = email.utils.parseaddr(msg_from) + if from_name: + msg_from = from_name + with tempfile.NamedTemporaryFile() as tmp: + if msg_date and msg_from: + tmp.write(f'On {msg_date}, {msg_from} wrote:\n'.encode()) + elif msg_from: + tmp.write(f'{msg_from} wrote:\n'.encode()) + for line in msg.body[0].splitlines(): + if line: + tmp.write(f'> {line}\n'.encode()) + else: + tmp.write(b'>\n') + tmp.flush() + editor = os.getenv("EDITOR", "true") + result = subprocess.run(f'{editor} "{tmp.name}"', shell=True, check=False) + tmp.seek(0) + edit = MemoryEdit(edit_text=tmp.read(), multiline=True, control='reply') + edit.headers = msg.headers + self.inputs.append(edit) + placeholder.original_widget = urwid.LineBox(edit, title='REPLY', title_attr='e-mail header') + self.holder.loop.screen.clear() + + def save_attachment(self, part): + def done(success, name): + self.holder.go_back() + if success: + self.status(f'Saved to {name}') + else: + self.status('Saving failed') + + self.status(f'Saving {part.name} ...') + # Try to avoid malicious names. + name = part.name.lstrip('.').replace('/', '_') + self.holder.push_pane(SaveFilePane(self.holder, name, part.data, done)) + + def _gather_control(self): + def _tag(num, tags): + ret.tag(num, '=', *tags.split()) + def _done(num, fixer): + if fixer: + ret.close(num, fixer) + else: + ret.reopen(num) + + ret = MessageQueue() + bug_num = self.bug_status['id'] + + mapping = { + 'done': _done, + 'package': ret.reassign, + 'severity': ret.severity, + 'tag': _tag, + 'title': ret.retitle, + } + + for field in self.inputs: + if not field.is_modified(): + continue + if not field.control or field.control in {'reply'}: + continue + func = mapping[field.control] + func(bug_num, field.get_edit_text()) + return ret._get_messages() + + def send(self): + self.status('Preparing to send updates ...') + # TODO: Find reply ... + # TODO: Use subject from reply, or from title. + # TODO: Use In-Reply-To from reply. + # TODO: Setup To/CC from reply + + replies = [x for x in self.inputs if x.control == 'reply'] + + control_domain = self.control.split('@', 1)[1] + bug_control_email = f'{self.bug_status["id"]}@{control_domain}' + to = None + # Mapping unique e-mail address to the display name. + cc = {} + def _add_cc(name, email): + # If the address doesn't exist or the name is blank, add this entry. + if not cc.get(email): + cc[email] = name + subject = None + if not replies: + to = self.control + else: + for reply in replies: + reply_from = email.utils.parseaddr(reply.headers['From']) + if to is None: + to = reply_from + _add_cc(*reply_from) + _add_cc(*email.utils.parseaddr(reply.headers['To'])) + for addr in email.utils.getaddresses([reply.headers.get('Cc', '')]): + _add_cc(*addr) + if not subject: + subject = reply.headers.get('Subject') + cc[bug_control_email] = '' + del cc[to[1]] + to = email.utils.formataddr(to) + if not subject: + subject = self.bug_status['subject'] + else: + # Filter out reply prefixes. + # English & Spanish: RE + # English: FWD + # German: AW + # Danish: SV + # Mandarin: 回覆 + subject = 'Re: ' + re.sub(r"^((RE|AW|FWD|SV|回复):\s*)+", '', subject, flags=re.I) + headers = { + 'From': 'Mike Frysinger ', + 'To': to, + 'CC': ', '.join(email.utils.formataddr((v, k)) for (k, v) in cc.items()), + 'Subject': subject, + } + + with tempfile.NamedTemporaryFile() as tmp: + for header in ('From', 'To', 'CC', 'Subject'): + tmp.write(f'{header}: {headers[header]}\n'.encode()) + tmp.write(f'User-Agent: GNU debbugs/{__version__}\n'.encode()) + tmp.write(b'\n') + control_message = self._gather_control() + if control_message: + tmp.write(b'\n' + control_message.encode() + b'\n') + for i, reply in enumerate(replies): + if i: + tmp.write(b'\n') + tmp.write(reply.get_edit_text().strip().encode() + b'\n') + tmp.flush() + editor = os.getenv("EDITOR", "true") + result = subprocess.run(f'{editor} "{tmp.name}"', shell=True, check=False) + tmp.seek(0) + msg = tmp.read() + + self.holder.loop.screen.clear() + + # TODO: Prompt here. + + + +class UI(UIBase): + """Command line interfacee.""" + + def __init__(self, mouse: bool = False): + self.dialog = PaneHolder(mouse=mouse) + self._started = False + + def start(self): + # Only allow one start. + assert not self._started + self._started = True + self.dialog.start() + + def stop(self): + # Allow multiple stops. + if self._started: + self._started = False + self.dialog.stop() + + def status(self, text: Optional[str] = '') -> None: + """Print status information.""" + self.dialog.status(text) + + def interface_query(self, client, control, statuses, bug_width, summary_tags, tags_width, summary_states, states_width) -> None: + """Run the query interface.""" + bug_list = BugListPane(self.dialog, client, control, statuses, bug_width, summary_tags, tags_width, summary_states, states_width) + self.dialog.push_pane(bug_list) + + # If there's only one bug, automatically display it. + if len(statuses) == 1: + bug_list.select_bug(next(iter(statuses.values()))) + + try: + self.dialog.run() + finally: + self.stop() diff --git a/tests/data/get_bug_log-33711.xml b/tests/data/get_bug_log-33711.xml new file mode 100644 index 0000000..8593d84 --- /dev/null +++ b/tests/data/get_bug_log-33711.xml @@ -0,0 +1,41 @@ + + + + + + + 5 + + I compiled ... +
...
+
+ + 8 + + reassign 33711 autoconf... +
...
+
+ +
...
+ 10 + + reassign 33711 autoconf +stop + +
+ + owner 33711 bug-autoconf@gnu.org +stop + + 12 + +
...
+
+
+
+
+
diff --git a/tests/data/get_bug_log-NA.xml b/tests/data/get_bug_log-NA.xml new file mode 100644 index 0000000..0a9ab97 --- /dev/null +++ b/tests/data/get_bug_log-NA.xml @@ -0,0 +1,14 @@ + + + + + soap:Server + Unable to open bug log /var/lib/debbugs/spool/db-h/12/3371101212.log for reading: No such file or directory at /usr/share/perl5/Debbugs/Log.pm line 199. + + + + diff --git a/tests/data/get_bugs-0.xml b/tests/data/get_bugs-0.xml new file mode 100644 index 0000000..6afe8f9 --- /dev/null +++ b/tests/data/get_bugs-0.xml @@ -0,0 +1,12 @@ + + + + + + + + diff --git a/tests/data/get_bugs-2.xml b/tests/data/get_bugs-2.xml new file mode 100644 index 0000000..d9cfee9 --- /dev/null +++ b/tests/data/get_bugs-2.xml @@ -0,0 +1,15 @@ + + + + + + 25740 + 33711 + + + + diff --git a/tests/data/get_status-0.xml b/tests/data/get_status-0.xml new file mode 100644 index 0000000..bcc3b13 --- /dev/null +++ b/tests/data/get_status-0.xml @@ -0,0 +1,12 @@ + + + + + + + + diff --git a/tests/data/get_status-1.xml b/tests/data/get_status-1.xml new file mode 100644 index 0000000..175b007 --- /dev/null +++ b/tests/data/get_status-1.xml @@ -0,0 +1,57 @@ + + + + + + + 52435 + + + 0 + + + 52435 + 1639234742 + + + + + + + + 29.0.50 + + + + + + 29.0.50 + + 52435 + + 1639234742 + db-h + 1639234742 + + <46037FBE-8DAB-4913-850C-96F4F78622E2@Web.DE> + The Reporter <example@example.com> + + emacs + pending + normal + unknown + MjkuMC41MDsgQy15IHNlZW1zIHRvIGNvbnRhaW4gYW4gb2xkIGNvbnRlbnRz + + + + + + + + + diff --git a/tests/data/get_status-2.xml b/tests/data/get_status-2.xml new file mode 100644 index 0000000..c76a0aa --- /dev/null +++ b/tests/data/get_status-2.xml @@ -0,0 +1,97 @@ + + + + + + + 52435 + + + 0 + + + 52435 + 1639234742 + + + + + + + + 29.0.50 + + + + 4.16 + + + + + + 29.0.50 + + 52435 + + 1639234742 + db-h + 1639234742 + + <46037FBE-8DAB-4913-850C-96F4F78622E2@Web.DE> + The Reporter <example@example.com> + + emacs + pending + normal + unknown + 29.0.50; C-y seems to contain an old contents + + + + + + + 52436 + + + 0 + + + 52436 + 1639234742 + + + + + + + + + 52436 + patch + 1639234743 + db-h + 1639234743 + + <87h7bfrxw9.fsf@web.de> + "Dr. Reporter" <example@example.com> + + guile + pending + normal + unknown + [patch] autoconf: Check for gperf if running from git + + patch + + + + + + + diff --git a/tests/data/get_status-NA.xml b/tests/data/get_status-NA.xml new file mode 100644 index 0000000..08b711c --- /dev/null +++ b/tests/data/get_status-NA.xml @@ -0,0 +1,10 @@ + + + + + soap:Server + Too many bug reports (1001) requested, limit is 1000 at /usr/share/perl5/Debbugs/SOAP.pm line 128. + + + + diff --git a/tests/data/get_usertag-0.xml b/tests/data/get_usertag-0.xml new file mode 100644 index 0000000..4ad81b4 --- /dev/null +++ b/tests/data/get_usertag-0.xml @@ -0,0 +1,12 @@ + + + + + + + + diff --git a/tests/data/newest_bugs-0.xml b/tests/data/newest_bugs-0.xml new file mode 100644 index 0000000..6b1d2b9 --- /dev/null +++ b/tests/data/newest_bugs-0.xml @@ -0,0 +1,13 @@ + + + + + + + + diff --git a/tests/data/newest_bugs-1.xml b/tests/data/newest_bugs-1.xml new file mode 100644 index 0000000..70ba4f7 --- /dev/null +++ b/tests/data/newest_bugs-1.xml @@ -0,0 +1,15 @@ + + + + + + 52436 + + + + diff --git a/tests/data/newest_bugs-2.xml b/tests/data/newest_bugs-2.xml new file mode 100644 index 0000000..a27864b --- /dev/null +++ b/tests/data/newest_bugs-2.xml @@ -0,0 +1,16 @@ + + + + + + 52435 + 52436 + + + + diff --git a/tests/test_soap.py b/tests/test_soap.py new file mode 100644 index 0000000..40a6aaf --- /dev/null +++ b/tests/test_soap.py @@ -0,0 +1,210 @@ +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +"""Tests for the soap module.""" + +from pathlib import Path +from typing import Any, List +import unittest + +import pytest + +from gnudebbugs import soap + +TEST_DIR = Path(__file__).resolve().parent +TEST_DATA = TEST_DIR / 'data' + + +# The start of every SOAP request. +SOAP_HEADER = """\ +\ +""" + +# The end of every SOAP request. +SOAP_FOOTER = """""" + + +@pytest.mark.parametrize("func,args,kwargs,expected", ( + (soap.Api.get_bug_log, [1234], {}, '1234'), + (soap.Api.get_bugs, [], {}, ''), + (soap.Api.get_bugs, [], {'package': 'automake'}, 'packageautomake'), + (soap.Api.get_status, [[]], {}, ''), + (soap.Api.get_status, [[1]], {}, '1'), + (soap.Api.get_status, [[1, 2]], {}, '12'), +# (soap.Api.get_usertag, ['v@b', []], {}, '12'), + (soap.Api.newest_bugs, [0], {}, '0'), + (soap.Api.newest_bugs, [100], {}, '100'), +)) +def test_encode(func, args: List[Any], kwargs, expected: str): + expected = SOAP_HEADER + expected + SOAP_FOOTER + result = func(*args, **kwargs) + assert result == expected + + +@pytest.mark.parametrize("filename,expected", ( + ('get_bug_log-33711.xml', [ + { + 'attachments': [], + 'body': 'I compiled ...', + 'header': '...', + 'msg_num': 5, + }, + { + 'attachments': [], + 'body': 'reassign 33711 autoconf...', + 'header': '...', + 'msg_num': 8, + }, + { + 'attachments': [], + 'body': 'reassign 33711 autoconf\nstop\n', + 'header': '...', + 'msg_num': 10, + }, + { + 'attachments': [], + 'body': 'owner 33711 bug-autoconf@gnu.org\nstop\n', + 'header': '...', + 'msg_num': 12, + }, + ]), + ('get_bugs-0.xml', []), + ('get_bugs-2.xml', [25740, 33711]), + ('get_status-0.xml', None), + ('get_status-1.xml', { + 52435: { + 'affects': '', + 'archived': 0, + 'blockedby': '', + 'blocks': '', + 'bug_num': 52435, + 'date': 1639234742, + 'done': '', + 'fixed': None, + 'fixed_date': [], + 'fixed_versions': [], + 'forwarded': '', + 'found': { + '29.0.50': None, + }, + 'found_date': [], + 'found_versions': [ + '29.0.50', + ], + 'id': 52435, + 'keywords': '', + 'last_modified': 1639234742, + 'location': 'db-h', + 'log_modified': 1639234742, + 'mergedwith': '', + 'msgid': '<46037FBE-8DAB-4913-850C-96F4F78622E2@Web.DE>', + 'originator': 'The Reporter ', + 'owner': '', + 'package': 'emacs', + 'pending': 'pending', + 'severity': 'normal', + 'source': 'unknown', + 'subject': '29.0.50; C-y seems to contain an old contents', + 'summary': '', + 'tags': '', + 'unarchived': '', + }, + }), + ('get_status-2.xml', { + 52435: { + 'affects': '', + 'archived': 0, + 'blockedby': '', + 'blocks': '', + 'bug_num': 52435, + 'date': 1639234742, + 'done': '', + 'fixed': None, + 'fixed_date': [], + 'fixed_versions': [], + 'forwarded': '', + 'found': { + '29.0.50': None, + '4.16': None, + }, + 'found_date': [], + 'found_versions': [ + '29.0.50', + ], + 'id': 52435, + 'keywords': '', + 'last_modified': 1639234742, + 'location': 'db-h', + 'log_modified': 1639234742, + 'mergedwith': '', + 'msgid': '<46037FBE-8DAB-4913-850C-96F4F78622E2@Web.DE>', + 'originator': 'The Reporter ', + 'owner': '', + 'package': 'emacs', + 'pending': 'pending', + 'severity': 'normal', + 'source': 'unknown', + 'subject': '29.0.50; C-y seems to contain an old contents', + 'summary': '', + 'tags': '', + 'unarchived': '', + }, + 52436: { + 'affects': '', + 'archived': 0, + 'blockedby': '', + 'blocks': '', + 'bug_num': 52436, + 'date': 1639234742, + 'done': '', + 'fixed': None, + 'fixed_date': [], + 'fixed_versions': [], + 'forwarded': '', + 'found': None, + 'found_date': [], + 'found_versions': [], + 'id': 52436, + 'keywords': 'patch', + 'last_modified': 1639234743, + 'location': 'db-h', + 'log_modified': 1639234743, + 'mergedwith': '', + 'msgid': '<87h7bfrxw9.fsf@web.de>', + 'originator': '"Dr. Reporter" ', + 'owner': '', + 'package': 'guile', + 'pending': 'pending', + 'severity': 'normal', + 'source': 'unknown', + 'subject': '[patch] autoconf: Check for gperf if running from git', + 'summary': '', + 'tags': 'patch', + 'unarchived': '', + }, + }), + ('get_usertag-0.xml', None), + ('newest_bugs-0.xml', []), + ('newest_bugs-1.xml', [52436]), + ('newest_bugs-2.xml', [52435, 52436]), +)) +def test_parse(filename: str, expected: Any): + data = (TEST_DATA / filename).read_text() + result = soap.parse(data) + assert result == expected diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..4f10f3c --- /dev/null +++ b/tox.ini @@ -0,0 +1,30 @@ +# Copyright (C) 2021 Free Software Foundation, Inc. +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation, either version 3 of the License, or at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program. If not, see . + +# https://tox.readthedocs.io/ + +[tox] +envlist = py38, py39 + +[gh-actions] +python = + 3.8: py38 + 3.9: py39 + +[testenv] +deps = + pytest + requests +commands = {envpython} ./pytest