1 # Copyright (C) 2021 Free Software Foundation, Inc.
3 # This program is free software: you can redistribute it and/or modify it under
4 # the terms of the GNU Lesser General Public License as published by the Free
5 # Software Foundation, either version 3 of the License, or (at your option) any
8 # This program is distributed in the hope that it will be useful, but WITHOUT
9 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
10 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 # You should have received a copy of the GNU Lesser General Public License
14 # along with this program. If not, see <https://www.gnu.org/licenses/>.
16 """Control interface to debbugs.
18 https://debbugs.gnu.org/Advanced.html
19 https://debbugs.gnu.org/server-control.html
23 from email.message import EmailMessage
26 from typing import Any, List, Optional
27 import xml.etree.ElementTree as ET
33 """A control interface to debbugs."""
def __init__(self, host: str):
    """Derive the control mailbox address from the debbugs |host|."""
    self.to = "control@{}".format(host)
def assert_action(action: str) -> None:
    """Raise ValueError unless |action| is one of "+", "-" or "="."""
    valid_actions = ("+", "-", "=")
    if action in valid_actions:
        return
    raise ValueError(f'Invalid action "{action}"')
def assert_bug(bug: int) -> None:
    """Verify |bug| is a valid syntax-wise bug number.

    Raises:
        TypeError: |bug| is not an integer.
        ValueError: |bug| is zero or negative.
    """
    if not isinstance(bug, int):
        raise TypeError(f'Expected integer, but bug "{bug}" is of type {type(bug)}')
    # Only reject non-positive numbers; a valid bug number must pass silently.
    if bug <= 0:
        raise ValueError(f"Bugs must be positive, not {bug}")
def assert_bugs(bugs: List[int]) -> None:
    """Verify |bugs| are valid syntax-wise bug numbers.

    Raises:
        ValueError: |bugs| is empty (or any entry fails assert_bug()).
        TypeError: an entry is not an integer (raised by assert_bug()).
    """
    # An empty collection is the only case that is an error on its own.
    if not bugs:
        raise ValueError("Need at least one bug to merge")
    for bug in bugs:
        assert_bug(bug)
def assert_email(email: str) -> None:
    """Verify |email| is a valid e-mail address.

    The check is deliberately shallow: the address must contain "@" and no
    spaces.  The debbugs alias "!" is accepted as-is.

    Raises:
        ValueError: |email| is neither "!" nor a plausible address.
    """
    # Special case: debbugs uses ! as an alias to the From address.
    if email == "!":
        return
    if " " in email or "@" not in email:
        raise ValueError(f'Invalid e-mail address "{email}"')
def assert_newid(newid: int) -> None:
    """Verify |newid| is a valid new id (for cloning).

    Raises:
        TypeError: |newid| is not an integer.
        ValueError: |newid| is zero or positive.
    """
    if not isinstance(newid, int):
        raise TypeError(
            f'Expected integer, but newid "{newid}" is of type {type(newid)}'
        )
    # New ids are placeholders and must be negative; anything else is rejected.
    if newid >= 0:
        raise ValueError(f"Newids must be negative, not {newid}")
83 def assert_package(package: str) -> None:
84 """Verify |package| is a valid package name."""
86 raise ValueError(f'Invalid package "{package}"')
104 def assert_tags(tags: List[str]) -> None:
105 """Verify |tags| are valid tag name."""
107 raise ValueError(f"Need at least one tag")
110 raise ValueError(f'Invalid tag "{tag}"')
def assert_severity(severity: str) -> None:
    """Raise ValueError unless |severity| is listed in SEVERITIES."""
    if severity in SEVERITIES:
        return
    raise ValueError(f'Invalid severity "{severity}"')
131 """A queue of messages."""
135 self.stop_message = "thankyou"
137 def _get_messages(self):
138 if not self.messages:
140 return "\n".join(self.messages) + f"\n{self.stop_message}\n"
def _queue(self, message: str) -> None:
    """Append one single-line command to the pending message list."""
    has_newline = "\n" in message
    # Internal invariant: every queued entry is exactly one line.
    assert not has_newline
    pending = self.messages
    pending.append(message)
def close(self, bug: int, fixed: Optional[str] = "") -> None:
    """Queue a ``close`` command for |bug|.

    |fixed| is appended to the command verbatim and defaults to an empty
    string.
    """
    command = f"close {bug} {fixed}"
    self._queue(command)
def reassign(self, bug: int, package: str, version: Optional[str] = "") -> None:
    """Queue a ``reassign`` command moving |bug| to |package|.

    |package| is checked with assert_package(); |version| is appended to the
    command verbatim.
    """
    assert_package(package)
    command = f"reassign {bug} {package} {version}"
    self._queue(command)
def severity(self, bug: int, severity: str) -> None:
    """Queue a ``severity`` command after checking |severity| with assert_severity()."""
    assert_severity(severity)
    command = f"severity {bug} {severity}"
    self._queue(command)
def reopen(self, bug: int, address: Optional[str] = "") -> None:
    """Queue a ``reopen`` command for |bug|.

    |address| is checked with assert_email() unless it is empty or "=";
    either way it is appended to the command verbatim.
    """
    skip_check = not address or address == "="
    if not skip_check:
        assert_email(address)
    command = f"reopen {bug} {address}"
    self._queue(command)
def found(self, bug: int, version: Optional[str] = "") -> None:
    """Queue a ``found`` command for |bug|.

    |version| is optional.  When given it is checked with assert_version();
    when omitted (empty string or None) the command is queued without a
    version and no validation is attempted.
    """
    # Treat None like the default so the command never contains "None".
    version = version or ""
    if version:
        assert_version(version)
    self._queue(f"found {bug} {version}")
def notfound(self, bug: int, version: str) -> None:
    """Queue a ``notfound`` command after checking |version| with assert_version()."""
    assert_version(version)
    command = f"notfound {bug} {version}"
    self._queue(command)
def submitter(self, bug: int, address: str) -> None:
    """Queue a ``submitter`` command after checking |address| with assert_email()."""
    assert_email(address)
    command = f"submitter {bug} {address}"
    self._queue(command)
def forwarded(self, bug: int, address: str) -> None:
    """Queue a ``forwarded`` command after checking |address| with assert_email()."""
    assert_email(address)
    command = f"forwarded {bug} {address}"
    self._queue(command)
def notforwarded(self, bug: int) -> None:
    """Queue a ``notforwarded`` command for |bug|."""
    command = f"notforwarded {bug}"
    self._queue(command)
def owner(self, bug: int, address: str) -> None:
    """Queue an ``owner`` command after checking |address| with assert_email()."""
    assert_email(address)
    command = f"owner {bug} {address}"
    self._queue(command)
def noowner(self, bug: int) -> None:
    """Queue a ``noowner`` command for |bug|."""
    command = f"noowner {bug}"
    self._queue(command)
def retitle(self, bug: int, title: str) -> None:
    """Queue a ``retitle`` command giving |bug| the new |title|.

    Raises:
        ValueError: |title| contains a newline (each command must fit on a
            single line).
    """
    # Reject only multi-line titles; anything else is queued verbatim.
    if "\n" in title:
        raise ValueError("Newlines not allowed in titles")
    self._queue(f"retitle {bug} {title}")
def clone(self, bug: int, *newids: int) -> None:
    """Queue a ``clone`` command duplicating |bug| once per entry in |newids|.

    Raises:
        ValueError: no |newids| were supplied.
    """
    # The command is meaningless without at least one target id.
    if not newids:
        raise ValueError("Need at least one newid to clone")
    self._queue(f'clone {bug} {" ".join(str(x) for x in newids)}')
def merge(self, bug: int, *bugs: int) -> None:
    """Queue a ``merge`` command joining |bug| with every entry in |bugs|.

    Each extra positional argument is one bug number; they are written to the
    command separated by single spaces.
    """
    others = " ".join(str(x) for x in bugs)
    self._queue(f"merge {bug} {others}")
def forcemerge(self, bug: int, *bugs: int) -> None:
    """Queue a ``forcemerge`` command joining |bug| with every entry in |bugs|.

    Each extra positional argument is one bug number; they are written to the
    command separated by single spaces.
    """
    others = " ".join(str(x) for x in bugs)
    self._queue(f"forcemerge {bug} {others}")
def unmerge(self, bug: int) -> None:
    """Queue an ``unmerge`` command for |bug|."""
    command = f"unmerge {bug}"
    self._queue(command)
def tag(self, bug: int, action: str, *tags: str) -> None:
    """Queue a ``tag`` command applying |action| ("+", "-" or "=") with |tags|.

    |action| is checked with assert_action().  |tags| are checked with
    assert_tags() except when |action| is "=" and no tags are given, which is
    let through as an attempt to clear all tags.
    """
    assert_action(action)
    # TODO: Double check you can clear tags this way. The spec doesn't say it's possible at all ...
    if action != "=" or tags:
        assert_tags(tags)
    self._queue(f'tag {bug} {action} {" ".join(tags)}')
def block(self, bug: int, *bugs: int) -> None:
    """Queue a ``block`` command listing |bugs| after |bug|.

    Each extra positional argument is one bug number; they are written to the
    command separated by single spaces.
    """
    others = " ".join(str(x) for x in bugs)
    self._queue(f"block {bug} {others}")
def unblock(self, bug: int, *bugs: int) -> None:
    """Queue an ``unblock`` command listing |bugs| after |bug|.

    Each extra positional argument is one bug number; they are written to the
    command separated by single spaces.
    """
    others = " ".join(str(x) for x in bugs)
    self._queue(f"unblock {bug} {others}")
def stop(self, form: Optional[str] = None) -> None:
    """Halt further commands.

    NB: This is handled automatically.

    |form| selects the terminator written after the queued commands: "quit",
    "stop", "--" or anything starting with "thank".  When omitted (None) the
    default "thankyou" is used.

    Raises:
        ValueError: |form| is not one of the accepted terminators.
    """
    # The signature allows None, so resolve it before calling str methods.
    if form is None:
        form = "thankyou"
    if form not in ("quit", "stop", "--") and not form.startswith("thank"):
        raise ValueError(f'Invalid stop command "{form}"')
    self.stop_message = form
def user(self, user: str) -> None:
    """Queue a ``user`` command selecting |user| (an e-mail address)."""
    command = f"user {user}"
    self._queue(command)
def usertag(self, bug: int, action: str, *tags: str) -> None:
    """Sets usertags for the bug report.

    Action may be +-= to add/remove/set.  |action| is checked with
    assert_action(); each extra positional argument is one usertag, and they
    are written to the command separated by single spaces.
    """
    assert_action(action)
    # TODO: Need to check usertags are arbitrary.
    # Join with a space: an empty separator would glue all tags into one word.
    self._queue(f'usertag {bug} {action} {" ".join(tags)}')
273 self, to: str, email: str, use_sendmail: bool = True, sendmail: str = "sendmail"
277 self.use_sendmail = use_sendmail
278 self.sendmail = sendmail
279 self.queue = MessageQueue()
def send(self, dryrun: Optional[bool] = False) -> None:
    """Build one e-mail from the queued commands and pass it to _send_email().

    The message is addressed from self.email to self.to and its body is
    whatever self.queue._get_messages() returns.  |dryrun| is forwarded
    unchanged to _send_email().
    """
    # NOTE(review): assumes the constructor stores its |to| argument as
    # self.to, mirroring self.email -- confirm against the constructor.
    msg = EmailMessage()
    msg["To"] = self.to
    msg["From"] = self.email
    msg.set_content(self.queue._get_messages())
    self._send_email(msg, dryrun=dryrun)
288 def _send_email(self, msg: EmailMessage, dryrun: Optional[bool] = False) -> None:
289 if self.use_sendmail:
290 sendmail = self.sendmail + " -t"
292 print(f"Sending message:\n$ {sendmail}\n{msg}")
293 sendmail = "exit 0; " + sendmail
294 subprocess.run(sendmail, shell=True, input=str(msg).encode(), check=True)
296 with smtplib.SMTP("localhost") as s: