]> git.wh0rd.org - home.git/blob - .bin/git-rb-catchup
git-rb-catchup: fix merge counts
[home.git] / .bin / git-rb-catchup
1 #!/usr/bin/env python3
2 # Distributed under the terms of the GNU General Public License v2 or later.
3
4 """Helper to automatically rebase onto latest commit possible.
5
6 Helpful when you have a branch tracking an old commit, and a lot of conflicting
7 changes have landed in the latest branch, but you still want to update.
8
9 A single rebase to the latest commit will require addressing all the different
10 changes at once which can be difficult, overwhelming, and error-prone. Instead,
11 if you rebased onto each intermediate conflicting point, you'd break up the work
12 into smaller pieces, and be able to run tests to make sure things were still OK.
13 """
14
15 import argparse
16 import subprocess
17 import sys
18 from typing import List, Tuple, Union
19
20
21 assert sys.version_info >= (3, 7), f'Need Python 3.7+, not {sys.version_info}'
22
23
24 def git(args: List[str], **kwargs) -> subprocess.CompletedProcess:
25 """Run git."""
26 kwargs.setdefault('check', True)
27 kwargs.setdefault('capture_output', True)
28 kwargs.setdefault('encoding', 'utf-8')
29 # pylint: disable=subprocess-run-check
30 return subprocess.run(['git'] + args, **kwargs)
31
32
33 def rebase(target: str) -> bool:
34 """Try to rebase onto |target|."""
35 try:
36 git(['rebase', target])
37 return True
38 except KeyboardInterrupt:
39 git(['rebase', '--abort'])
40 print('aborted')
41 sys.exit(1)
42 except:
43 git(['rebase', '--abort'])
44 return False
45
46
47 def rebase_bisect(lbranch: str,
48 rbranch: str,
49 behind: int,
50 leave_rebase: bool = False,
51 force_checkout: bool = False):
52 """Try to rebase branch as close to |rbranch| as possible."""
53 def attempt(pos: int) -> bool:
54 target = f'{rbranch}~{pos}'
55 print(f'Rebasing onto {target} ', end='')
56 print('.', end='', flush=True)
57 # Checking out these branches directly helps clobber orphaned files,
58 # but is usually unnessary, and can slow down the overall process.
59 if force_checkout:
60 git(['checkout', '-f', target])
61 print('.', end='', flush=True)
62 if force_checkout:
63 git(['checkout', '-f', lbranch])
64 print('. ', end='', flush=True)
65 ret = rebase(target)
66 print('OK' if ret else 'failed')
67 return ret
68
69 # "pmin" is the latest branch position while "pmax" is where we're now.
70 pmin = 0
71 pmax = behind
72 old_mid = None
73 first_fail = 0
74 while True:
75 mid = pmin + (pmax - pmin) // 2
76 if mid == old_mid or mid < pmin or mid >= pmax:
77 break
78 if attempt(mid):
79 pmax = mid
80 else:
81 first_fail = max(first_fail, mid)
82 pmin = mid
83 old_mid = mid
84
85 if pmin or pmax:
86 last_target = f'{rbranch}~{first_fail}'
87 if leave_rebase:
88 print('Restarting', last_target)
89 result = git(['rebase', last_target], check=False)
90 print(result.stdout.strip())
91 else:
92 print('Found first failure', last_target)
93 else:
94 print('All caught up!')
95
96
97 def get_ahead_behind(lbranch: str, rbranch: str) -> Tuple[int, int]:
98 """Return number of commits |lbranch| is ahead & behind relative to |rbranch|."""
99 output = git(
100 ['rev-list', '--first-parent', '--left-right', '--count',
101 f'{lbranch}...{rbranch}']).stdout
102 return [int(x) for x in output.split()]
103
104
105 def get_tracking_branch(branch: str) -> Union[str, None]:
106 """Return branch that |branch| is tracking."""
107 merge = git(['config', '--local', f'branch.{branch}.merge']).stdout.strip()
108 if not merge:
109 return None
110
111 remote = git(['config', '--local', f'branch.{branch}.remote']).stdout.strip()
112 if remote:
113 if merge.startswith('refs/heads/'):
114 merge = merge[11:]
115 return f'{remote}/{merge}'
116 else:
117 return merge
118
119
120 def get_local_branch() -> str:
121 """Return the name of the local checked out branch."""
122 return git(['branch', '--show-current']).stdout.strip()
123
124
125 def get_parser() -> argparse.ArgumentParser:
126 """Get CLI parser."""
127 parser = argparse.ArgumentParser(
128 description=__doc__,
129 formatter_class=argparse.RawDescriptionHelpFormatter)
130 parser.add_argument(
131 '--skip-initial-rebase-latest', dest='initial_rebase',
132 action='store_false', default=True,
133 help='skip initial rebase attempt onto the latest branch')
134 parser.add_argument(
135 '--leave-at-last-failed-rebase', dest='leave_rebase',
136 action='store_true', default=False,
137 help='leave tree state at last failing rebase')
138 parser.add_argument(
139 '--checkout-before-rebase', dest='force_checkout',
140 action='store_true', default=False,
141 help='force checkout before rebasing to target (to cleanup orphans)')
142 parser.add_argument(
143 'branch', nargs='?',
144 help='branch to rebase onto')
145 return parser
146
147
148 def main(argv: List[str]) -> int:
149 """The main entry point for scripts."""
150 parser = get_parser()
151 opts = parser.parse_args(argv)
152
153 lbranch = get_local_branch()
154 print(f'Local branch resolved to "{lbranch}".')
155 if not lbranch:
156 print('Unable to resolve local branch', file=sys.stderr)
157 return 1
158
159 if opts.branch:
160 rbranch = opts.branch
161 else:
162 rbranch = get_tracking_branch(lbranch)
163 print(f'Tracking branch resolved to "{rbranch}".')
164
165 ahead, behind = get_ahead_behind(lbranch, rbranch)
166 print(f'Branch is {ahead} commits ahead and {behind} commits behind.')
167 print('NB: Counts for the first parent in merge history, not all commits.')
168
169 if not behind:
170 print('Up-to-date!')
171 elif not ahead:
172 print('Fast forwarding ...')
173 git(['merge'])
174 else:
175 if opts.initial_rebase:
176 print(f'Trying to rebase onto latest {rbranch} ... ',
177 end='', flush=True)
178 if rebase(rbranch):
179 print('OK!')
180 return 0
181 print('failed; falling back to bisect')
182 rebase_bisect(lbranch, rbranch, behind, leave_rebase=opts.leave_rebase,
183 force_checkout=opts.force_checkout)
184
185 return 0
186
187
188 if __name__ == '__main__':
189 sys.exit(main(sys.argv[1:]))