From 2b31c45684ac321d19189f5f2d0bfe19509f05a3 Mon Sep 17 00:00:00 2001 From: Mike Frysinger Date: Sun, 28 Mar 2021 12:07:13 -0400 Subject: [PATCH] git-repack: rewrite with pathlib --- .bin/git-repack | 138 +++++++++++++++++++++++------------------------- 1 file changed, 66 insertions(+), 72 deletions(-) diff --git a/.bin/git-repack b/.bin/git-repack index 82f8f17..865ca5e 100755 --- a/.bin/git-repack +++ b/.bin/git-repack @@ -8,51 +8,52 @@ from __future__ import print_function import argparse -import glob import os +from pathlib import Path import shutil import subprocess import sys import tempfile +from typing import Dict, List, Optional -def mount_settings(): +def mount_settings() -> Dict[str, str]: """Return dict mapping path to its type""" ret = {} - with open('/proc/mounts') as fp: + with Path('/proc/mounts').open(encoding='utf-8') as fp: for line in fp: ele = line.split() ret[ele[1]] = ele[2] return ret -def is_git_dir(path): +def is_git_dir(path: Path) -> bool: """Whether |path| is a .git dir""" - return (os.path.isdir(os.path.join(path, 'refs')) and - os.path.isdir(os.path.join(path, 'objects')) and - os.path.isfile(os.path.join(path, 'config'))) + return ((path / 'refs').is_dir() and + (path / 'objects').is_dir() and + (path / 'config').is_file()) -def find_git_dir(path): +def find_git_dir(path: Path) -> Path: """Try to find the .git dir to operate on""" orig_path = path - real_path = path = os.path.realpath(path) + real_path = path = path.resolve() while True: curr_path = path - if os.path.isdir(os.path.join(path, '.git')): - curr_path = os.path.join(path, '.git') + if (path / '.git').is_dir(): + curr_path = path / '.git' if is_git_dir(curr_path): return curr_path - path = os.path.dirname(path) - - if path == '/': + parent = path.parent + if path == parent: raise ValueError('could not locate .git dir: %s (%s)' % (orig_path, real_path)) + path = parent -def find_temp_dir(): +def find_temp_dir() -> Optional[Path]: """Find a good temp dir (one backed by tmpfs)""" SEARCH_PATHS = ( '/var/tmp/portage', @@ -63,94 +64,89 @@ def find_temp_dir(): mounts = mount_settings() for path in SEARCH_PATHS: if mounts.get(path) == 'tmpfs': - return path + return Path(path) return None -def readfile(path): +def readfile(path: Path) -> str: """Read |path| and return its data""" - if os.path.isfile(path): - with open(path) as fp: - return fp.read() + if path.is_file(): + return path.read_text(encoding='utf-8') return '' -def unlink(path): - """Unlink |path| if it exists else do nothing""" - if os.path.isfile(path): - os.unlink(path) - - def clean_hooks(path): """Strip out sample files from hooks/""" - hooks_path = os.path.join(path, 'hooks') - for hook in glob.glob(os.path.join(hooks_path, '*.sample')): - print('Trimming hook: %s' % hook) - os.unlink(hook) + for hook in (path / 'hooks').glob('*.sample'): + print('Trimming hook:', hook) + hook.unlink() def clean_packs(path): """Strip out temp files from objects/packs/""" - packs_path = os.path.join(path, 'objects', 'packs') - for pack in glob.glob(os.path.join(packs_path, 'tmp_pack_*')): - print('Trimming pack: %s' % pack) - os.unlink(pack) + for pack in (path / 'objects' / 'packs').glob('tmp_pack_*'): + print('Trimming pack:', pack) + pack.unlink() def is_packed(path): """See if the git repo is already packed""" - obj_path = os.path.join(path, 'objects') - paths = set(os.listdir(obj_path)) + obj_path = path / 'objects' + paths = {x.name for x in obj_path.iterdir()} if paths not in ({'info', 'pack'}, {'pack'}): return False - packs = os.listdir(os.path.join(obj_path, 'pack')) + packs = tuple((obj_path / 'pack').iterdir()) if len(packs) != 2: return False return True -def repack(path): +def repack(path: Path): """Clean up and trim cruft and repack |path|""" path = find_git_dir(path) - print('Repacking %s' % path) + print('Repacking', path) # Repack any submodules this project might use. - modules_path = os.path.join(path, 'modules') - if os.path.isdir(modules_path): + modules_path = path / 'modules' + if modules_path.is_dir(): for root, dirs, _ in os.walk(modules_path): + root = Path(root) dirs.sort() for d in dirs: - mod_path = os.path.join(root, d) + mod_path = root / d if is_git_dir(mod_path): repack(mod_path) tmpdir = find_temp_dir() if tmpdir: - tmpdir = tempfile.mkdtemp(prefix='git-repack.', dir=tmpdir) - print('Using tempdir: %s' % tmpdir) - os.rmdir(tmpdir) + tmpdir = Path(tempfile.mkdtemp(prefix='git-repack.', dir=tmpdir)) + print('Using tempdir:', tmpdir) + tmpdir.rmdir() # Doesn't matter for these needs. - os.environ['GIT_WORK_TREE'] = tmpdir + os.environ['GIT_WORK_TREE'] = str(tmpdir) grafts = alts = None try: # Push/pop the graft & alternate paths so we don't read them. # XXX: In some cases, this is bad, but I don't use them that way ... - graft_file = os.path.join(path, 'info', 'grafts') + graft_file = path / 'info' / 'grafts' grafts = readfile(graft_file) - unlink(graft_file) + graft_file.unlink(missing_ok=True) - alt_file = os.path.join(path, 'objects', 'info', 'alternates') + alt_file = path / 'objects' / 'info' / 'alternates' alts = readfile(alt_file) - unlink(alt_file) + alt_file.unlink(missing_ok=True) clean_hooks(path) # XXX: Should do this for all remotes? - origin_path = os.path.join(path, 'refs', 'remotes', 'origin') - packed_refs = readfile(os.path.join(path, 'packed-refs')) - if os.path.exists(origin_path) or 'refs/remotes/origin/' in packed_refs: - cmd = ['git', '--git-dir', path, 'remote', 'prune', 'origin'] + origin_path = path / 'refs' / 'remotes' / 'origin' + # Delete remote HEAD as we don't need it, and it might be stale. + head = origin_path / 'HEAD' + head.unlink(missing_ok=True) + packed_refs = readfile(path / 'packed-refs') + if origin_path.exists() or 'refs/remotes/origin/' in packed_refs: + cmd = ['git', '--git-dir', str(path), 'remote', 'prune', 'origin'] subprocess.run(cmd, cwd='/', check=True) clean_packs(path) @@ -166,37 +162,35 @@ def repack(path): else: rundir = path - cmd = ['git', '--git-dir', rundir, 'reflog', 'expire', '--all', '--stale-fix'] - print('Cleaning reflog: %s' % ' '.join(cmd)) + cmd = ['git', '--git-dir', str(rundir), 'reflog', 'expire', '--all', '--stale-fix'] + print('Cleaning reflog:', ' '.join(cmd)) subprocess.run(cmd, cwd='/', check=True) # This also packs refs/tags for us. - cmd = ['git', '--git-dir', rundir, 'gc', '--aggressive', '--prune=all'] - print('Repacking git repo: %s' % ' '.join(cmd)) + cmd = ['git', '--git-dir', str(rundir), 'gc', '--aggressive', '--prune=all'] + print('Repacking git repo:', ' '.join(cmd)) subprocess.run(cmd, cwd='/', check=True) # Clean empty dirs. - cmd = ['find', rundir, '-depth', '-type', 'd', '-exec', 'rmdir', '{}', '+'] - subprocess.call(cmd, stderr=subprocess.DEVNULL) + cmd = ['find', str(rundir), '-depth', '-type', 'd', '-exec', 'rmdir', '{}', '+'] + subprocess.run(cmd, stderr=subprocess.DEVNULL, check=False) # There's a few dirs we need to exist even if they're empty. - refdir = os.path.join(rundir, 'refs') - os.makedirs(refdir, exist_ok=True) + refdir = rundir / 'refs' + refdir.mkdir(exist_ok=True) if tmpdir: - cmd = ['rsync', '-a', '--delete', tmpdir + '/', path + '/'] - print('Syncing back git repo: %s' % ' '.join(cmd)) + cmd = ['rsync', '-a', '--delete', str(tmpdir) + '/', str(path) + '/'] + print('Syncing back git repo:', ' '.join(cmd)) subprocess.run(cmd, cwd='/', check=True) - cmd = ['find', path + '/', '-exec', 'chmod', 'u+rw', '{}', '+'] + cmd = ['find', str(path) + '/', '-exec', 'chmod', 'u+rw', '{}', '+'] subprocess.run(cmd, cwd='/', check=True) finally: if grafts: - with open(graft_file, 'w') as fp: - fp.write(grafts) + graft_file.write_text(grafts, encoding='utf-8') if alts: - with open(alt_file, 'w') as fp: - fp.write(alts) + alt_file.write_text(alts, encoding='utf-8') if tmpdir: shutil.rmtree(tmpdir, ignore_errors=True) @@ -204,11 +198,11 @@ def repack(path): def get_parser(): """Get the command line parser""" parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument('dir', help='The git repo to process') + parser.add_argument('dir', type=Path, help='The git repo to process') return parser -def main(argv): +def main(argv: List[str]): """The main script entry point""" parser = get_parser() opts = parser.parse_args(argv) @@ -216,4 +210,4 @@ def main(argv): if __name__ == '__main__': - exit(main(sys.argv[1:])) + sys.exit(main(sys.argv[1:])) -- 2.39.5