#!/usr/bin/python3
# pylint: disable=fixme,invalid-name
# pylint: disable=too-many-branches,too-many-locals,too-many-statements

"""Repack git repos fully the way I like them."""

from __future__ import print_function

import argparse
import os
from pathlib import Path
import shutil
import subprocess
import sys
import tempfile
from typing import Dict, List, Optional


def mount_settings() -> Dict[str, str]:
    """Return dict mapping each mount point to its filesystem type

    The data is parsed from /proc/mounts.  If a mount point is listed more
    than once, the last entry in the file wins.
    """
    ret = {}
    with Path('/proc/mounts').open(encoding='utf-8') as fp:
        for line in fp:
            ele = line.split()
            # Skip blank/short lines instead of dying with an IndexError.
            if len(ele) >= 3:
                ret[ele[1]] = ele[2]
    return ret


def is_git_dir(path: Path) -> bool:
    """Whether |path| is a .git dir

    A dir qualifies when it holds refs/ and objects/ dirs and a config file.
    """
    return ((path / 'refs').is_dir() and
            (path / 'objects').is_dir() and
            (path / 'config').is_file())


def find_git_dir(path: Path) -> Path:
    """Try to find the .git dir to operate on

    Starting at the resolved |path|, each dir is checked both as a checkout
    (i.e. it has a .git/ subdir) and as a bare git dir, walking up through the
    parents until the filesystem root is reached.

    Raises:
      ValueError: No git dir was found at |path| or any of its parents.
    """
    orig_path = path
    real_path = path = path.resolve()
    while True:
        curr_path = path
        if (path / '.git').is_dir():
            curr_path = path / '.git'
        if is_git_dir(curr_path):
            return curr_path

        parent = path.parent
        if path == parent:
            # The root is its own parent, so there is nowhere left to look.
            raise ValueError(
                f'could not locate .git dir: {orig_path} ({real_path})')
        path = parent


def find_temp_dir() -> Optional[Path]:
    """Find a good temp dir (one backed by tmpfs)

    Returns:
      The first candidate path that is itself a tmpfs mount point, or None
      when none of the candidates are.
    """
    SEARCH_PATHS = (
        '/var/tmp/portage',
        '/var/tmp',
        '/tmp',
        tempfile.gettempdir(),
    )
    mounts = mount_settings()
    for path in SEARCH_PATHS:
        if mounts.get(path) == 'tmpfs':
            return Path(path)
    return None


def readfile(path: Path) -> str:
    """Read |path| and return its data ('' if it is not a regular file)"""
    if path.is_file():
        return path.read_text(encoding='utf-8')
    return ''


def clean_hooks(path: Path) -> None:
    """Strip out sample files from hooks/"""
    for hook in (path / 'hooks').glob('*.sample'):
        print('Trimming hook:', hook)
        hook.unlink()


def clean_packs(path: Path) -> None:
    """Strip out temp files from objects/pack/"""
    # The dir is named "pack" (singular); this is the same dir that
    # is_packed() inspects.  Globbing "packs" would never match anything.
    for pack in (path / 'objects' / 'pack').glob('tmp_pack_*'):
        print('Trimming pack:', pack)
        pack.unlink()


def is_packed(path: Path) -> bool:
    """See if the git repo is already packed

    The repo counts as packed when objects/ holds nothing but pack/ (and
    optionally info/), and pack/ has exactly two entries -- presumably a
    single .pack file plus its .idx.
    """
    obj_path = path / 'objects'
    paths = {x.name for x in obj_path.iterdir()}
    if paths not in ({'info', 'pack'}, {'pack'}):
        # Loose object dirs (or anything else unexpected) are still around.
        return False
    return len(tuple((obj_path / 'pack').iterdir())) == 2


def _restore_file(path: Path, data: Optional[str]) -> None:
    """Write |data| back to |path| if there is anything to restore"""
    if data:
        # The empty-dir cleanup in repack() may have removed the parent.
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(data, encoding='utf-8')


def repack(path: Path) -> None:
    """Clean up and trim cruft and repack |path|

    Any git dirs found under modules/ are repacked first (recursively).  When
    a tmpfs-backed temp dir is available, the git dir is copied there, packed,
    and then synced back; otherwise the work happens in place.

    Raises:
      ValueError: No git dir could be located from |path|.
      subprocess.CalledProcessError: A git/rsync/find (chmod) command failed.
    """
    path = find_git_dir(path)
    print('Repacking', path)

    # Repack any submodules this project might use.
    modules_path = path / 'modules'
    if modules_path.is_dir():
        for root, dirs, _ in os.walk(modules_path):
            root = Path(root)
            # Sort in place so the walk order is deterministic.
            dirs.sort()
            for d in dirs:
                mod_path = root / d
                if is_git_dir(mod_path):
                    repack(mod_path)

    tmpdir = find_temp_dir()
    if tmpdir:
        tmpdir = Path(tempfile.mkdtemp(prefix='git-repack.', dir=tmpdir))
        print('Using tempdir:', tmpdir)
        # Only the unique name is wanted; copytree() below needs to create the
        # destination dir itself.
        tmpdir.rmdir()

    # Doesn't matter for these needs.  (When no tempdir was found, this ends
    # up as the literal string 'None'.)
    os.environ['GIT_WORK_TREE'] = str(tmpdir)

    graft_file = path / 'info' / 'grafts'
    alt_file = path / 'objects' / 'info' / 'alternates'
    grafts = alts = None
    try:
        # Push/pop the graft & alternate paths so we don't read them.
        # XXX: In some cases, this is bad, but I don't use them that way ...
        grafts = readfile(graft_file)
        graft_file.unlink(missing_ok=True)
        alts = readfile(alt_file)
        alt_file.unlink(missing_ok=True)

        clean_hooks(path)

        # XXX: Should do this for all remotes?
        origin_path = path / 'refs' / 'remotes' / 'origin'
        # Delete remote HEAD as we don't need it, and it might be stale.
        head = origin_path / 'HEAD'
        head.unlink(missing_ok=True)
        packed_refs = readfile(path / 'packed-refs')
        if origin_path.exists() or 'refs/remotes/origin/' in packed_refs:
            cmd = ['git', '--git-dir', str(path), 'remote', 'prune', 'origin']
            subprocess.run(cmd, cwd='/', check=True)

        clean_packs(path)

        if is_packed(path):
            print('Git repo is already packed; nothing to do')
            return

        if tmpdir:
            print('Syncing git repo to tempdir')
            shutil.copytree(path, tmpdir, symlinks=True)
            rundir = tmpdir
        else:
            rundir = path

        cmd = ['git', '--git-dir', str(rundir), 'reflog', 'expire', '--all',
               '--stale-fix']
        print('Cleaning reflog:', ' '.join(cmd))
        subprocess.run(cmd, cwd='/', check=True)

        # This also packs refs/tags for us.
        cmd = ['git', '--git-dir', str(rundir), 'gc', '--aggressive',
               '--prune=all']
        print('Repacking git repo:', ' '.join(cmd))
        subprocess.run(cmd, cwd='/', check=True)

        # Clean empty dirs.  rmdir fails (noisily) on every non-empty dir, so
        # errors are expected here: hide stderr and ignore the exit status.
        cmd = ['find', str(rundir), '-depth', '-type', 'd',
               '-exec', 'rmdir', '{}', '+']
        subprocess.run(cmd, stderr=subprocess.DEVNULL, check=False)

        # There's a few dirs we need to exist even if they're empty.
        refdir = rundir / 'refs'
        refdir.mkdir(exist_ok=True)

        if tmpdir:
            cmd = ['rsync', '-a', '--delete', str(tmpdir) + '/',
                   str(path) + '/']
            print('Syncing back git repo:', ' '.join(cmd))
            subprocess.run(cmd, cwd='/', check=True)
            cmd = ['find', str(path) + '/', '-exec', 'chmod', 'u+rw', '{}',
                   '+']
            subprocess.run(cmd, cwd='/', check=True)
    finally:
        # Each cleanup step runs even if an earlier one fails, so a problem
        # restoring grafts cannot lose the alternates or leak the tempdir.
        try:
            _restore_file(graft_file, grafts)
        finally:
            try:
                _restore_file(alt_file, alts)
            finally:
                if tmpdir:
                    shutil.rmtree(tmpdir, ignore_errors=True)


def get_parser() -> argparse.ArgumentParser:
    """Get the command line parser"""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('dir', type=Path,
                        help='The git repo to process')
    return parser


def main(argv: List[str]) -> None:
    """The main script entry point

    Args:
      argv: The command line arguments (without the program name).
    """
    parser = get_parser()
    opts = parser.parse_args(argv)
    repack(opts.dir)


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))