"""Helper to rebase all local branches."""
import argparse
+import functools
import os
from pathlib import Path
import re
+import shlex
import subprocess
import sys
+# Not sure if newer features are used.
+assert sys.version_info >= (3, 6), f'Python 3.6+ required but found {sys.version}'
+
+
PROG = os.path.basename(__file__)
+DEBUG = False
class Terminal:
HILITE = _combine(Terminal.BOLD, Terminal.FG_CYAN)
BRACKET = _combine(Terminal.BOLD, Terminal.FG_BLUE)
+ @classmethod
+ def good(cls, msg):
+ return cls.GOOD + msg + cls.NORMAL
+
+ @classmethod
+ def bad(cls, msg):
+ return cls.BAD + msg + cls.NORMAL
+
+
+def dbg(*args, **kwargs):
+ """Print a debug |msg|."""
+ if DEBUG:
+ print(*args, file=sys.stderr, **kwargs)
+
def fatal(msg):
"""Show an error |msg| then exit."""
- print(f'{Color.BAD}{PROG}: error: {msg}{Color.NORMAL}', file=sys.stderr)
+ print(Color.bad(f'{PROG}: error: {msg}'), file=sys.stderr)
sys.exit(1)
def git(args, **kwargs):
"""Run git."""
kwargs.setdefault('check', True)
- kwargs.setdefault('capture_output', True)
+ kwargs.setdefault('stdout', subprocess.PIPE)
+ kwargs.setdefault('stderr', subprocess.STDOUT)
kwargs.setdefault('encoding', 'utf-8')
- #print('+', 'git', *args)
- return subprocess.run(['git'] + args, **kwargs)
-
-
+ if DEBUG:
+ dbg('+', 'git', shlex.join(args))
+ ret = subprocess.run(['git'] + args, **kwargs)
+ if DEBUG:
+ if ret.stdout:
+ dbg(ret.stdout.rstrip())
+ if ret.stderr:
+ dbg('stderr =', ret.stderr)
+ if ret.returncode:
+ dbg('++ exit', ret.returncode)
+ return ret
+
+
+@functools.lru_cache(maxsize=None)
+def checkout_is_dirty():
+ """Determine whether the checkout is dirty (e.g. modified files)."""
+ output = git(['status', '--porcelain']).stdout
+ return any(x for x in output.splitlines() if '?' not in x[0:2])
+
+
+@functools.lru_cache(maxsize=None)
def rebase_inprogress():
"""Determine whether a rebase is already in progress."""
output = git(['rev-parse', '--git-path', 'rebase-merge']).stdout.strip()
return False
+@functools.lru_cache(maxsize=None)
+def cherry_pick_inprogress():
+ """Determine whether a cherry-pick is in progress."""
+ output = git(['rev-parse', '--git-path', 'CHERRY_PICK_HEAD']).stdout.strip()
+ return Path(output).exists()
+
+
+@functools.lru_cache(maxsize=None)
+def top_dir() -> Path:
+ """Find the top dir of the git checkout."""
+ output = git(['rev-parse', '--show-toplevel'], stderr=subprocess.PIPE).stdout.strip()
+ return Path(output).resolve()
+
+
+@functools.lru_cache(maxsize=None)
+def git_dir() -> Path:
+ """Find the internal git dir for this project."""
+ output = git(['rev-parse', '--git-dir']).stdout.strip()
+ return Path(output).resolve()
+
+
+@functools.lru_cache(maxsize=None)
+def worktree_is_local(worktree: str) -> bool:
+ """See whether |worktree| is the cwd git repo."""
+ if not worktree:
+ return True
+
+ # If .git is a symlink, worktree result might be the target.
+ if worktree == str(git_dir().resolve()):
+ return True
+
+ # NB: worktree path is supposed to be absolute from for-each-ref, but it's
+ # not always, so we have to resolve it. https://crbug.com/git/88
+ worktree = (git_dir() / worktree).resolve()
+ return worktree == top_dir()
+
+
class AppendOption(argparse.Action):
"""Append the command line option (with no arguments) to dest.
parser.add_argument(
'--catchup', action='store_true',
help='run git-rb-catchup when rebasing')
+ parser.add_argument(
+ '-d', '--debug', action='store_true',
+ help='enable debug output')
parser.add_argument(
'-q', '--quiet', dest='git_options', action=AppendOption, default=['-q'],
help='passthru to git rebase')
parser = get_parser()
opts = parser.parse_args(argv)
- # Skip if rebase is in progress.
- if rebase_inprogress():
- fatal('skipping due to active rebase')
+ global DEBUG
+ DEBUG = opts.debug
# Switch to the top dir in case the working dir doesn't exist in every branch.
- topdir = git(['rev-parse', '--show-toplevel']).stdout.strip()
+ try:
+ topdir = top_dir()
+ except subprocess.CalledProcessError as e:
+ sys.exit(f'{os.path.basename(sys.argv[0])}: {Path.cwd()}:\n{e}\n{e.stderr.strip()}')
os.chdir(topdir)
# Example output:
# ||s-stash||
state = git(
['for-each-ref', '--format=%(HEAD)|%(worktreepath)|%(refname:short)|%(upstream)|%(upstream:track,nobracket)',
- 'refs/heads/*']).stdout.splitlines()
+ 'HEAD', 'refs/heads/*']).stdout.splitlines()
curr_state = None
branch_width = 0
branch_width = max(branch_width, len(branch))
if head == '*':
curr_state = branch
- if not (worktreepath and worktreepath != topdir):
local_count += 1
+ elif worktree_is_local(worktreepath):
+ local_count += 1
+ else:
+ dbg(f'{worktreepath}:{branch}: Skipping branch checked out in diff worktree')
# If there are no branches to rebase, go silently.
if not local_count:
return 0
if not curr_state:
- fatal('unable to resolve current branch')
+ # Are we in a detached head state?
+ if not git(['symbolic-ref', '-q', 'HEAD'], check=False).returncode:
+ fatal('unable to resolve current branch')
+ curr_state = git(['rev-parse', 'HEAD']).stdout.strip()
- is_dirty = None
+ switched_head = False
branches = {}
for line in state:
- head, worktreepath, branch, tracking, ahead_behind = line.split('|')
+ _, worktreepath, branch, tracking, ahead_behind = line.split('|')
# If it's a branch in another worktree, ignore it.
- if worktreepath and worktreepath != topdir:
+ if not worktree_is_local(worktreepath):
+ dbg(f'{worktreepath}:{branch}: Skipping branch checked out in diff worktree')
continue
print(f'{Color.BRACKET}### {Color.GOOD}{branch:{branch_width}}{Color.NORMAL} ',
m = re.search(r'behind ([0-9]+)', ahead_behind)
behind = int(m.group(1)) if m else 0
if not behind:
- print('Up-to-date!')
+ print(Color.good('up-to-date'))
continue
elif not ahead:
- git(['update-ref', f'refs/heads/{branch}', tracking])
- print('fast forwarded')
+ # If we haven't switched the checkout, update-ref on current HEAD
+ # will get us into a dirty checkout, so use merge to handle it.
+ if switched_head or curr_state != branch:
+ git(['update-ref', f'refs/heads/{branch}', tracking])
+ print('fast forwarded [updated ref]')
+ else:
+ result = git(['merge', '-q', '--ff-only'], check=False)
+ if result.returncode:
+ print(Color.bad('unable to merge') + '\n' + result.stdout.strip())
+ else:
+ print('fast forwarded [merged]')
continue
- if is_dirty is None:
- output = git(['status', '--porcelain']).stdout
- is_dirty = any(x for x in output.splitlines() if '?' not in x[0:2])
- if is_dirty:
- print(f'{Color.BAD}unable to rebase: tree is dirty{Color.NORMAL}')
+ # Skip this ref if tree is in a bad state.
+ if rebase_inprogress():
+ print(Color.bad('skipping due to active rebase'))
+ continue
+ if cherry_pick_inprogress():
+ print(Color.bad('skipping due to active cherry-pick'))
+ continue
+ if checkout_is_dirty():
+ print(Color.bad('unable to rebase: tree is dirty'))
continue
print(f'rebasing [{ahead_behind}] ', end='', flush=True)
- git(['checkout', '-q', branch])
+ result = git(['checkout', '-q', branch], check=False)
+ if result.returncode:
+ print(Color.bad('unable to checkout') + '\n' + result.stdout.strip())
+ continue
+ switched_head = True
if opts.catchup:
print()
result = git(['rb-catchup'], capture_output=False, check=False)
else:
result = git(['rebase'] + opts.git_options, check=False)
if result.returncode:
- git(['rebase', '--abort'])
- print(f'{Color.BAD}failed{Color.NORMAL}\n' + result.stdout.strip())
+ git(['rebase', '--abort'], check=False)
+ print(Color.bad('failed') + '\n' + result.stdout.strip())
else:
- print('OK!')
+ print(Color.good('OK'))
- git(['checkout', '-q', curr_state])
+ if switched_head:
+ git(['checkout', '-q', curr_state])
return 0