]> git.wh0rd.org Git - gnudebbugs.git/blob - src/gnudebbugs/control.py
initial release
[gnudebbugs.git] / src / gnudebbugs / control.py
1 # Copyright (C) 2021 Free Software Foundation, Inc.
2 #
3 # This program is free software: you can redistribute it and/or modify it under
4 # the terms of the GNU Lesser General Public License as published by the Free
5 # Software Foundation, either version 3 of the License, or at your option) any
6 # later version.
7 #
8 # This program is distributed in the hope that it will be useful, but WITHOUT
9 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10 # FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
11 # details.
12 #
13 # You should have received a copy of the GNU Lesser General Public License
14 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
15
16 """Control interface to debbugs.
17
18 https://debbugs.gnu.org/Advanced.html
19 https://debbugs.gnu.org/server-control.html
20 """
21
22 import collections
23 from email.message import EmailMessage
24 import smtplib
25 import subprocess
26 from typing import Any, List, Optional
27 import xml.etree.ElementTree as ET
28
29 import requests
30
31
32 class Control:
33     """A control interface to debbugs."""
34
35     def __init__(self, host: str):
36         self.host = host
37         self.to = f"control@{host}"
38
39
40 def assert_action(action: str) -> None:
41     """Verify |action| is a valid action."""
42     if action not in ("+", "-", "="):
43         raise ValueError(f'Invalid action "{action}"')
44
45
46 def assert_bug(bug: int) -> None:
47     """Verify |bug| is a valid syntax-wise bug number."""
48     if not isinstance(bug, int):
49         raise TypeError(f'Expected integer, but bug "{bug}" is of type {type(bug)}')
50     if bug <= 0:
51         raise ValueError(f"Bugs must be positive, not {bug}")
52
53
54 def assert_bugs(bugs: List[int]) -> None:
55     """Verify |bugs| are valid syntax-wise bug numbers."""
56     if not bugs:
57         raise ValueError(f"Need at least one bug to merge")
58     for b in bugs:
59         assert_bug(b)
60
61
62 def assert_email(email: str) -> None:
63     """Verify |email| is a valid e-mail address."""
64     # Special case: debbugs uses ! as an alias to the From address.
65     if email == "!":
66         return
67
68     # TODO: Lookup RFC.
69     if " " in email or "@" not in email:
70         raise ValueError(f'Invalid e-mail address "{email}"')
71
72
73 def assert_newid(newid: int) -> None:
74     """Verify |newid| is a valid new id (for cloning)."""
75     if not isinstance(newid, int):
76         raise TypeError(
77             f'Expected integer, but newid "{newid}" is of type {type(newid)}'
78         )
79     if newid >= 0:
80         raise ValueError(f"Newids must be negative, not {newid}")
81
82
83 def assert_package(package: str) -> None:
84     """Verify |package| is a valid package name."""
85     if " " in package:
86         raise ValueError(f'Invalid package "{package}"')
87
88
89 TAGS = {
90     "patch",
91     "wontfix",
92     "moreinfo",
93     "unreproducible",
94     "fixed",
95     "notabug",
96     "pending",
97     "help",
98     "security",
99     "confirmed",
100     "easy",
101 }
102
103
104 def assert_tags(tags: List[str]) -> None:
105     """Verify |tags| are valid tag name."""
106     if not tags:
107         raise ValueError(f"Need at least one tag")
108     for t in tags:
109         if t not in TAGS:
110             raise ValueError(f'Invalid tag "{tag}"')
111
112
113 SEVERITIES = {
114     "critical",
115     "grave",
116     "serious",
117     "important",
118     "normal",
119     "minor",
120     "wishlist",
121 }
122
123
124 def assert_severity(severity: str) -> None:
125     """Verify |severity| is a valid severity level."""
126     if severity not in SEVERITIES:
127         raise ValueError(f'Invalid severity "{severity}"')
128
129
130 class MessageQueue:
131     """A queue of messages."""
132
133     def __init__(self):
134         self.messages = []
135         self.stop_message = "thankyou"
136
137     def _get_messages(self):
138         if not self.messages:
139             return ''
140         return "\n".join(self.messages) + f"\n{self.stop_message}\n"
141
142     def _queue(self, message: str) -> None:
143         assert "\n" not in message
144         self.messages.append(message)
145
146     def close(self, bug: int, fixed: Optional[str] = "") -> None:
147         assert_bug(bug)
148         self._queue(f"close {bug} {fixed}")
149
150     def reassign(self, bug: int, package: str, version: Optional[str] = "") -> None:
151         assert_bug(bug)
152         assert_package(package)
153         self._queue(f"reassign {bug} {package} {version}")
154
155     def severity(self, bug: int, severity: str) -> None:
156         assert_bug(bug)
157         assert_severity(severity)
158         self._queue(f"severity {bug} {severity}")
159
160     def reopen(self, bug: int, address: Optional[str] = "") -> None:
161         assert_bug(bug)
162         if address and address != "=":
163             assert_email(address)
164         self._queue(f"reopen {bug} {address}")
165
166     def found(self, bug: int, version: Optional[str] = "") -> None:
167         assert_bug(bug)
168         if found:
169             assert_version(version)
170         self._queue(f"found {bug} {version}")
171
172     def notfound(self, bug: int, version: str) -> None:
173         assert_bug(bug)
174         assert_version(version)
175         self._queue(f"notfound {bug} {version}")
176
177     def submitter(self, bug: int, address: str) -> None:
178         assert_bug(bug)
179         assert_email(address)
180         self._queue(f"submitter {bug} {address}")
181
182     def forwarded(self, bug: int, address: str) -> None:
183         assert_bug(bug)
184         assert_email(address)
185         self._queue(f"forwarded {bug} {address}")
186
187     def notforwarded(self, bug: int) -> None:
188         assert_bug(bug)
189         self._queue(f"notforwarded {bug}")
190
191     def owner(self, bug: int, address: str) -> None:
192         assert_bug(bug)
193         assert_email(address)
194         self._queue(f"owner {bug} {address}")
195
196     def noowner(self, bug: int) -> None:
197         assert_bug(bug)
198         self._queue(f"noowner {bug}")
199
200     def retitle(self, bug: int, title: str) -> None:
201         assert_bug(bug)
202         if "\n" in title:
203             raise ValueError(f"Newlines not allowed in titles")
204         self._queue(f"retitle {bug} {title}")
205
206     def clone(self, bug: int, *newids: List[int]) -> None:
207         assert_bug(bug)
208         if not newids:
209             raise ValueError(f"Need at least one newid to clone")
210         for b in newids:
211             assert_newid(b)
212         self._queue(f'clone {bug} {" ".join(str(x) for x in newids)}')
213
214     def merge(self, bug: int, *bugs: List[int]) -> None:
215         assert_bug(bug)
216         assert_bugs(bugs)
217         self._queue(f'merge {bug} {" ".join(str(x) for x in bugs)}')
218
219     def forcemerge(self, bug: int, *bugs: List[int]) -> None:
220         assert_bug(bug)
221         assert_bugs(bugs)
222         self._queue(f'forcemerge {bug} {" ".join(str(x) for x in bugs)}')
223
224     def unmerge(self, bug: int) -> None:
225         assert_bug(bug)
226         self._queue(f"unmerge {bug}")
227
228     def tag(self, bug: int, action: str, *tags: List[str]) -> None:
229         assert_bug(bug)
230         assert_action(action)
231         # TODO: Double check you can clear tags this way.  The spec doesn't say it's possible at all ...
232         if action != '=' or tags:
233             assert_tags(tags)
234         self._queue(f'tag {bug} {action} {" ".join(tags)}')
235
236     def block(self, bug: int, *bugs: List[int]) -> None:
237         assert_bug(bug)
238         assert_bugs(bugs)
239         self._queue(f'block {bug} {" ".join(str(x) for x in bugs)}')
240
241     def unblock(self, bug: int, *bugs: List[int]) -> None:
242         assert_bug(bug)
243         assert_bugs(bugs)
244         self._queue(f'unblock {bug} {" ".join(str(x) for x in bugs)}')
245
246     def stop(self, form: Optional[str] = None):
247         """Halt further commands.
248
249         NB: This is handled automatically.
250         """
251         if form not in ("quit", "stop", "--") and not form.startswith("thank"):
252             raise ValueError(f'Invalid stop command "{form}"')
253         self.stop_message = form
254
255     def user(self, user: str) -> None:
256         """Set the user to the given email address."""
257         assert_email(user)
258         self._queue(f"user {user}")
259
260     def usertag(self, bug: int, action: str, *tags: List[str]) -> None:
261         """Sets usertags for the bug report.
262
263         Action may be +-= to add/remove/set.
264         """
265         assert_action(action)
266         # TODO: Need to check usertags are arbitrary.
267         # assert_tags(tags)
268         self._queue(f'usertag {bug} {action} {"".join(tags)}')
269
270
271 class Control:
272     def __init__(
273         self, to: str, email: str, use_sendmail: bool = True, sendmail: str = "sendmail"
274     ):
275         self.to = to
276         self.email = email
277         self.use_sendmail = use_sendmail
278         self.sendmail = sendmail
279         self.queue = MessageQueue()
280
281     def send(self, dryrun: Optional[bool] = False) -> None:
282         msg = EmailMessage()
283         msg["To"] = self.to
284         msg["From"] = self.email
285         msg.set_content(self.queue._get_messages())
286         self._send_email(msg, dryrun=dryrun)
287
288     def _send_email(self, msg: EmailMessage, dryrun: Optional[bool] = False) -> None:
289         if self.use_sendmail:
290             sendmail = self.sendmail + " -t"
291             if dryrun:
292                 print(f"Sending message:\n$ {sendmail}\n{msg}")
293                 sendmail = "exit 0; " + sendmail
294             subprocess.run(sendmail, shell=True, input=str(msg).encode(), check=True)
295         else:
296             with smtplib.SMTP("localhost") as s:
297                 s.send_message(msg)