#!/usr/bin/env python3

"""Helper to rebase all local branches."""

import argparse
import functools
import os
from pathlib import Path
import re
import shlex
import subprocess
import sys


# f-strings and subprocess.run(encoding=...) both need Python 3.6.
assert sys.version_info >= (3, 6), f'Python 3.6+ required but found {sys.version}'


PROG = os.path.basename(__file__)
DEBUG = False


class Terminal:
    """Terminal escape sequences."""

    # Control Sequence Introducer & the suffix that ends an SGR sequence.
    CSI_PREFIX = '\033['
    SGR_SUFFIX = 'm'

    NORMAL = ''
    BOLD = '1'

    # Color offsets that get added to the foreground/background bases below.
    _BLACK, _RED, _GREEN, _YELLOW, _BLUE, _MAGENTA, _CYAN, _WHITE = range(0, 8)
    _FG = 30
    _BG = 40

    FG_BLACK = str(_FG + _BLACK)
    FG_RED = str(_FG + _RED)
    FG_GREEN = str(_FG + _GREEN)
    FG_YELLOW = str(_FG + _YELLOW)
    FG_BLUE = str(_FG + _BLUE)
    FG_MAGENTA = str(_FG + _MAGENTA)
    FG_CYAN = str(_FG + _CYAN)
    FG_WHITE = str(_FG + _WHITE)


class Color:
    """Helper colors."""

    def _combine(*args):
        """Join the SGR codes in |args| into one complete escape sequence.

        Only called as a plain function while the class body is evaluated.
        """
        return Terminal.CSI_PREFIX + ';'.join(args) + Terminal.SGR_SUFFIX

    NORMAL = _combine(Terminal.NORMAL)
    GOOD = _combine(Terminal.FG_GREEN)
    WARN = _combine(Terminal.FG_YELLOW)
    BAD = _combine(Terminal.FG_RED)
    HILITE = _combine(Terminal.BOLD, Terminal.FG_CYAN)
    BRACKET = _combine(Terminal.BOLD, Terminal.FG_BLUE)

    @classmethod
    def good(cls, msg):
        """Wrap |msg| in the "good" color and reset afterwards."""
        return cls.GOOD + msg + cls.NORMAL

    @classmethod
    def bad(cls, msg):
        """Wrap |msg| in the "bad" color and reset afterwards."""
        return cls.BAD + msg + cls.NORMAL


def dbg(*args, **kwargs):
    """Print a debug message to stderr when debugging is enabled.

    Arguments are passed through to print().
    """
    if DEBUG:
        print(*args, file=sys.stderr, **kwargs)


def fatal(msg):
    """Show an error |msg| then exit."""
    print(Color.bad(f'{PROG}: error: {msg}'), file=sys.stderr)
    sys.exit(1)


def git(args, **kwargs):
    """Run git.

    Args:
      args: List of arguments to pass to git (without the leading 'git').
      **kwargs: Passed to subprocess.run().  Unless overridden, the command is
        checked, and stdout+stderr are merged & captured as UTF-8 text.  Pass
        capture_output=False to leave stdout/stderr attached to the caller's
        streams; the result's stdout is then None.

    Returns:
      The subprocess.CompletedProcess object.

    Raises:
      subprocess.CalledProcessError: git failed and check was not disabled.
    """
    # Handle capture_output ourselves: subprocess.run() only knows it since
    # Python 3.7, and forwarding it alongside stdout=PIPE never disabled the
    # capturing in the first place.
    capture_output = kwargs.pop('capture_output', True)
    kwargs.setdefault('check', True)
    if capture_output:
        kwargs.setdefault('stdout', subprocess.PIPE)
        kwargs.setdefault('stderr', subprocess.STDOUT)
    kwargs.setdefault('encoding', 'utf-8')

    if DEBUG:
        # Same output as shlex.join(), which requires Python 3.8.
        dbg('+', 'git', ' '.join(shlex.quote(x) for x in args))
    ret = subprocess.run(['git'] + args, **kwargs)
    if DEBUG:
        if ret.stdout:
            dbg(ret.stdout.rstrip())
        if ret.stderr:
            dbg('stderr =', ret.stderr)
        if ret.returncode:
            dbg('++ exit', ret.returncode)
    return ret


@functools.lru_cache(maxsize=None)
def checkout_is_dirty():
    """Determine whether the checkout is dirty (e.g. modified files).

    The result is cached for the lifetime of the process.
    """
    output = git(['status', '--porcelain']).stdout
    # Entries with a '?' in the two status columns (untracked) don't count.
    return any(x for x in output.splitlines() if '?' not in x[0:2])


@functools.lru_cache(maxsize=None)
def rebase_inprogress():
    """Determine whether a rebase is already in progress.

    The result is cached for the lifetime of the process.
    """
    # Either state directory existing is treated as an active rebase.
    for state_dir in ('rebase-merge', 'rebase-apply'):
        output = git(['rev-parse', '--git-path', state_dir]).stdout.strip()
        if Path(output).exists():
            return True
    return False


@functools.lru_cache(maxsize=None)
def cherry_pick_inprogress():
    """Determine whether a cherry-pick is in progress.

    The result is cached for the lifetime of the process.
    """
    output = git(['rev-parse', '--git-path', 'CHERRY_PICK_HEAD']).stdout.strip()
    return Path(output).exists()


@functools.lru_cache(maxsize=None)
def top_dir() -> Path:
    """Find the top dir of the git checkout.

    Raises:
      subprocess.CalledProcessError: git failed; stderr is captured separately
        so callers can show it.
    """
    output = git(['rev-parse', '--show-toplevel'], stderr=subprocess.PIPE).stdout.strip()
    return Path(output).resolve()


@functools.lru_cache(maxsize=None)
def git_dir() -> Path:
    """Find the internal git dir for this project."""
    output = git(['rev-parse', '--git-dir']).stdout.strip()
    return Path(output).resolve()


@functools.lru_cache(maxsize=None)
def worktree_is_local(worktree: str) -> bool:
    """See whether |worktree| is the cwd git repo.

    An empty |worktree| is considered local.
    """
    if not worktree:
        return True

    # If .git is a symlink, worktree result might be the target.
    if worktree == str(git_dir().resolve()):
        return True

    # NB: worktree path is supposed to be absolute from for-each-ref, but it's
    # not always, so we have to resolve it.  https://crbug.com/git/88
    worktree = (git_dir() / worktree).resolve()
    return worktree == top_dir()


class AppendOption(argparse.Action):
    """Append the command line option (with no arguments) to dest.

    parser.add_argument('-b', '--barg', dest='out', action='append_option')
    options = parser.parse_args(['-b', '--barg'])
    options.out == ['-b', '--barg']
    """

    def __init__(self, option_strings, dest, **kwargs):
        if 'nargs' in kwargs:
            raise ValueError('nargs is not supported for append_option action')
        super().__init__(option_strings, dest, nargs=0, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        if getattr(namespace, self.dest, None) is None:
            setattr(namespace, self.dest, [])
        # Record the flag exactly as the user spelled it.
        getattr(namespace, self.dest).append(option_string)


def get_parser():
    """Get CLI parser."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        '--catchup', action='store_true',
        help='run git-rb-catchup when rebasing')
    parser.add_argument(
        '-d', '--debug', action='store_true',
        help='enable debug output')
    parser.add_argument(
        '-q', '--quiet', dest='git_options', action=AppendOption, default=['-q'],
        help='passthru to git rebase')
    return parser


def main(argv):
    """The main entry point for scripts.

    Args:
      argv: The command line arguments (without the program name).

    Returns:
      The process exit status.
    """
    global DEBUG

    parser = get_parser()
    opts = parser.parse_args(argv)
    DEBUG = opts.debug

    # Switch to the top dir in case the working dir doesn't exist in every branch.
    try:
        topdir = top_dir()
    except subprocess.CalledProcessError as e:
        sys.exit(f'{os.path.basename(sys.argv[0])}: {Path.cwd()}:\n{e}\n{e.stderr.strip()}')
    os.chdir(topdir)

    # Example output:
    # ||m|refs/remotes/origin/master|ahead 2, behind 203
    # *||master|refs/remotes/origin/master|ahead 1
    # |/usr/local/src/gnu/gdb/build/release|release||behind 10
    # ||s-stash||
    state = git(
        ['for-each-ref', '--format=%(HEAD)|%(worktreepath)|%(refname:short)|%(upstream)|%(upstream:track,nobracket)',
         'HEAD', 'refs/heads/*']).stdout.splitlines()
    # Split once up front; both passes below consume the same fields.
    refs = [line.split('|') for line in state]

    # First pass: find the current branch, the column width for the branch
    # names, and how many branches we are actually able to process.
    curr_state = None
    branch_width = 0
    local_count = 0
    for head, worktreepath, branch, _, _ in refs:
        branch_width = max(branch_width, len(branch))
        if head == '*':
            curr_state = branch
            local_count += 1
        elif worktree_is_local(worktreepath):
            local_count += 1
        else:
            dbg(f'{worktreepath}:{branch}: Skipping branch checked out in diff worktree')

    # If there are no branches to rebase, go silently.
    if not local_count:
        return 0

    if not curr_state:
        # Are we in a detached head state?
        if not git(['symbolic-ref', '-q', 'HEAD'], check=False).returncode:
            # HEAD is a symbolic ref, yet no ref was flagged as current.
            fatal('unable to resolve current branch')
        # Detached: remember the commit so it can be restored at the end.
        curr_state = git(['rev-parse', 'HEAD']).stdout.strip()

    # Second pass: bring every local branch up-to-date with its upstream.
    switched_head = False
    for _, worktreepath, branch, tracking, ahead_behind in refs:
        # If it's a branch in another worktree, ignore it.
        if not worktree_is_local(worktreepath):
            dbg(f'{worktreepath}:{branch}: Skipping branch checked out in diff worktree')
            continue

        print(f'{Color.BRACKET}### {Color.GOOD}{branch:{branch_width}}{Color.NORMAL} ',
              end='', flush=True)
        if not tracking:
            print(f'{Color.WARN}skipping{Color.NORMAL} due to missing merge branch')
            continue

        m = re.match(r'ahead ([0-9]+)', ahead_behind)
        ahead = int(m.group(1)) if m else 0
        m = re.search(r'behind ([0-9]+)', ahead_behind)
        behind = int(m.group(1)) if m else 0

        if not behind:
            print(Color.good('up-to-date'))
            continue

        if not ahead:
            # If we haven't switched the checkout, update-ref on current HEAD
            # will get us into a dirty checkout, so use merge to handle it.
            if switched_head or curr_state != branch:
                git(['update-ref', f'refs/heads/{branch}', tracking])
                print('fast forwarded [updated ref]')
            else:
                result = git(['merge', '-q', '--ff-only'], check=False)
                if result.returncode:
                    print(Color.bad('unable to merge') + '\n' + result.stdout.strip())
                else:
                    print('fast forwarded [merged]')
            continue

        # Skip this ref if tree is in a bad state.
        if rebase_inprogress():
            print(Color.bad('skipping due to active rebase'))
            continue
        if cherry_pick_inprogress():
            print(Color.bad('skipping due to active cherry-pick'))
            continue
        if checkout_is_dirty():
            print(Color.bad('unable to rebase: tree is dirty'))
            continue

        print(f'rebasing [{ahead_behind}] ', end='', flush=True)
        result = git(['checkout', '-q', branch], check=False)
        if result.returncode:
            print(Color.bad('unable to checkout') + '\n' + result.stdout.strip())
            continue
        switched_head = True

        if opts.catchup:
            # End the status line as rb-catchup writes straight to our output.
            print()
            result = git(['rb-catchup'], capture_output=False, check=False)
        else:
            result = git(['rebase'] + opts.git_options, check=False)

        if result.returncode:
            git(['rebase', '--abort'], check=False)
            msg = Color.bad('failed')
            # There is nothing captured to show when the output was streamed.
            if result.stdout is not None:
                msg += '\n' + result.stdout.strip()
            print(msg)
        else:
            print(Color.good('OK'))

    # Put the checkout back where the user had it.
    if switched_head:
        git(['checkout', '-q', curr_state])

    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))