#!/usr/bin/env python3
# Distributed under the terms of the GNU General Public License v2 or later.

"""Helper to automatically rebase onto latest commit possible.

Helpful when you have a branch tracking an old commit, and a lot of conflicting
changes have landed in the latest branch, but you still want to update.

A single rebase to the latest commit will require addressing all the different
changes at once which can be difficult, overwhelming, and error-prone.  Instead,
if you rebased onto each intermediate conflicting point, you'd break up the work
into smaller pieces, and be able to run tests to make sure things were still OK.
"""

import argparse
import os
from pathlib import Path
import subprocess
import sys
from typing import List, Tuple, Union


assert sys.version_info >= (3, 7), f'Need Python 3.7+, not {sys.version_info}'


def git(args: List[str], **kwargs) -> subprocess.CompletedProcess:
    """Run git.

    Args:
        args: Arguments passed to the `git` executable.
        **kwargs: Forwarded to subprocess.run().  Unless overridden, the call
            uses check=True, capture_output=True and encoding='utf-8'.

    Returns:
        The CompletedProcess returned by subprocess.run().

    Raises:
        subprocess.CalledProcessError: git exited non-zero while |check| is on.
    """
    kwargs.setdefault('check', True)
    kwargs.setdefault('capture_output', True)
    kwargs.setdefault('encoding', 'utf-8')
    # pylint: disable=subprocess-run-check
    return subprocess.run(['git'] + args, **kwargs)


def rebase(target: str) -> bool:
    """Try to rebase onto |target|.

    Args:
        target: Commit-ish handed to `git rebase`.

    Returns:
        True if the rebase succeeded.  False if git reported a failure, in
        which case the in-progress rebase has been aborted.
    """
    try:
        git(['rebase', target])
        return True
    except KeyboardInterrupt:
        # Best-effort cleanup: the interrupt may have landed before git started
        # the rebase, so a failing abort must not stop us from exiting.
        git(['rebase', '--abort'], check=False)
        print('aborted')
        sys.exit(1)
    except subprocess.CalledProcessError:
        # Only a failing git command means "this target does not rebase
        # cleanly".  Anything else (e.g. SystemExit) is left to propagate.
        git(['rebase', '--abort'])
        return False


def rebase_bisect(lbranch: str,
                  rbranch: str,
                  behind: int,
                  leave_rebase: bool = False,
                  force_checkout: bool = False) -> None:
    """Try to rebase branch as close to |rbranch| as possible.

    Bisects over the targets `rbranch~0` .. `rbranch~behind`, rebasing onto
    each midpoint that is tried, and reports the failing target that was found.

    Args:
        lbranch: Local branch name; only checked out when |force_checkout|.
        rbranch: Branch being rebased onto.
        behind: How many commits the local branch is behind |rbranch|.
        leave_rebase: Re-run the failing rebase at the end and leave it as-is.
        force_checkout: Run `git checkout -f` on the target and then on
            |lbranch| before every rebase attempt.
    """
    def attempt(pos: int) -> bool:
        target = f'{rbranch}~{pos}'
        print(f'Rebasing onto {target} ', end='')
        print('.', end='', flush=True)
        # Checking out these branches directly helps clobber orphaned files,
        # but is usually unnecessary, and can slow down the overall process.
        if force_checkout:
            git(['checkout', '-f', target])
        print('.', end='', flush=True)
        if force_checkout:
            git(['checkout', '-f', lbranch])
        print('. ', end='', flush=True)
        ret = rebase(target)
        print('OK' if ret else 'failed')
        return ret

    # "pmin" is the latest branch position while "pmax" is where we're now.
    pmin = 0
    pmax = behind
    old_mid = None
    first_fail = 0
    while True:
        mid = pmin + (pmax - pmin) // 2
        # Stop once the midpoint no longer moves or leaves the open range.
        if mid == old_mid or mid < pmin or mid >= pmax:
            break
        if attempt(mid):
            # Success: the branch now sits on rbranch~mid, so shrink from above.
            pmax = mid
        else:
            first_fail = max(first_fail, mid)
            pmin = mid
        old_mid = mid

    if pmin or pmax:
        last_target = f'{rbranch}~{first_fail}'
        result = git(['log', '-1', '--format=%s', last_target])
        subject = result.stdout.strip()
        if leave_rebase:
            print('Restarting', last_target)
            # Expected to stop on conflicts, so do not raise on failure.
            result = git(['rebase', last_target], check=False)
            print(result.stdout.strip())
        print(f'* Found first failure {last_target}: {subject}')
    else:
        print('All caught up!')


def get_ahead_behind(lbranch: str, rbranch: str) -> Tuple[int, int]:
    """Return number of commits |lbranch| is ahead & behind relative to |rbranch|.

    Counts come from `git rev-list --first-parent --left-right --count`.

    Returns:
        An (ahead, behind) tuple.

    Raises:
        ValueError: git did not print exactly two integers.
    """
    output = git(
        ['rev-list', '--first-parent', '--left-right', '--count',
         f'{lbranch}...{rbranch}']).stdout
    ahead, behind = (int(x) for x in output.split())
    return ahead, behind


def get_tracking_branch(branch: str) -> Union[str, None]:
    """Return branch that |branch| is tracking.

    Returns:
        `<remote>/<name>` when a remote is configured, the configured merge ref
        itself when there is no remote (or the remote is '.', meaning the local
        repository), or None when no merge ref is configured.
    """
    # `git config` exits non-zero when the key is unset, so these lookups must
    # not raise; an unset key simply yields empty output.
    merge = git(['config', '--local', f'branch.{branch}.merge'],
                check=False).stdout.strip()
    if not merge:
        return None

    remote = git(['config', '--local', f'branch.{branch}.remote'],
                 check=False).stdout.strip()
    if not remote or remote == '.':
        return merge

    prefix = 'refs/heads/'
    if merge.startswith(prefix):
        merge = merge[len(prefix):]
    return f'{remote}/{merge}'


def get_local_branch() -> str:
    """Return the name of the local checked out branch.

    This is the stripped output of `git branch --show-current`.
    """
    return git(['branch', '--show-current']).stdout.strip()


def get_parser() -> argparse.ArgumentParser:
    """Get CLI parser."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument(
        '--skip-initial-rebase-latest',
        dest='initial_rebase', action='store_false', default=True,
        help='skip initial rebase attempt onto the latest branch')
    parser.add_argument(
        '--leave-at-last-failed-rebase',
        dest='leave_rebase', action='store_true', default=False,
        help='leave tree state at last failing rebase')
    parser.add_argument(
        '--checkout-before-rebase',
        dest='force_checkout', action='store_true', default=False,
        help='force checkout before rebasing to target (to cleanup orphans)')
    parser.add_argument(
        'branch', nargs='?',
        help='branch to rebase onto')
    return parser


def main(argv: List[str]) -> int:
    """The main entry point for scripts.

    Args:
        argv: Command line arguments, without the program name.

    Returns:
        Process exit status: 0 on success, 1 when a branch can't be resolved.
    """
    parser = get_parser()
    opts = parser.parse_args(argv)

    try:
        lbranch = get_local_branch()
    except subprocess.CalledProcessError as e:
        sys.exit(f'{os.path.basename(sys.argv[0])}: {Path.cwd()}:\n{e}\n{e.stderr.strip()}')
    if not lbranch:
        print('Unable to resolve local branch', file=sys.stderr)
        return 1
    print(f'Local branch resolved to "{lbranch}".')

    if opts.branch:
        rbranch = opts.branch
    else:
        rbranch = get_tracking_branch(lbranch)
        if not rbranch:
            # Without this guard the literal string "None" would be handed to
            # git as a revision.
            print(f'Unable to resolve tracking branch for "{lbranch}"',
                  file=sys.stderr)
            return 1
    print(f'Tracking branch resolved to "{rbranch}".')

    ahead, behind = get_ahead_behind(lbranch, rbranch)
    print(f'Branch is {ahead} commits ahead and {behind} commits behind.')
    print('NB: Counts for the first parent in merge history, not all commits.')

    if not behind:
        print('Up-to-date!')
    elif not ahead:
        print('Fast forwarding ...')
        # A bare `git merge` uses the configured upstream, which is wrong when
        # the user named a different branch on the command line.
        git(['merge', opts.branch] if opts.branch else ['merge'])
    else:
        if opts.initial_rebase:
            print(f'Trying to rebase onto latest {rbranch} ... ',
                  end='', flush=True)
            if rebase(rbranch):
                print('OK!')
                return 0
            print('failed; falling back to bisect')
        rebase_bisect(lbranch, rbranch, behind, leave_rebase=opts.leave_rebase,
                      force_checkout=opts.force_checkout)

    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))