#!/usr/bin/python3
# pylint: disable=fixme,invalid-name
# pylint: disable=too-many-branches,too-many-locals,too-many-statements

"""Repack git repos fully the way I like them."""

from __future__ import print_function

import argparse
import glob
import os
import shutil
import subprocess
import sys
import tempfile


def mount_settings():
    """Return dict mapping each mount point to its filesystem type.

    The data is read from /proc/mounts (second field -> third field).  If
    that file cannot be read, an empty dict is returned so callers simply
    find no usable mounts instead of crashing.
    """
    ret = {}
    try:
        with open('/proc/mounts') as fp:
            for line in fp:
                ele = line.split()
                # Ignore malformed/short lines rather than raising IndexError.
                if len(ele) >= 3:
                    ret[ele[1]] = ele[2]
    except OSError:
        # No readable /proc/mounts: report whatever was parsed so far.
        pass
    return ret


def is_git_dir(path):
    """Whether |path| is a .git dir

    A dir qualifies when it holds refs/ and objects/ subdirs plus a config
    file.
    """
    return (os.path.isdir(os.path.join(path, 'refs')) and
            os.path.isdir(os.path.join(path, 'objects')) and
            os.path.isfile(os.path.join(path, 'config')))


def find_git_dir(path):
    """Try to find the .git dir to operate on

    Starting at the resolved |path|, check the dir itself (or its .git
    subdir when present) and then walk up through the parents.

    Returns:
      The path of the first git dir found.

    Raises:
      ValueError: if the walk reaches the root without finding one.
    """
    orig_path = path
    real_path = path = os.path.realpath(path)
    while True:
        curr_path = path
        if os.path.isdir(os.path.join(path, '.git')):
            curr_path = os.path.join(path, '.git')
        if is_git_dir(curr_path):
            return curr_path

        parent = os.path.dirname(path)
        # Stop at the root, and also whenever dirname() stops making
        # progress so the loop can never spin forever.
        if parent in ('/', path):
            raise ValueError('could not locate .git dir: %s (%s)' %
                             (orig_path, real_path))
        path = parent


def find_temp_dir():
    """Find a good temp dir (one backed by tmpfs)

    Returns:
      The first candidate path that is itself a tmpfs mount point, else
      None.
    """
    SEARCH_PATHS = (
        '/var/tmp/portage',
        '/var/tmp',
        '/tmp',
        tempfile.gettempdir(),
    )
    mounts = mount_settings()
    for path in SEARCH_PATHS:
        if mounts.get(path) == 'tmpfs':
            return path
    return None


def readfile(path):
    """Read |path| and return its data ('' if it is not a regular file)"""
    if os.path.isfile(path):
        with open(path) as fp:
            return fp.read()
    return ''


def unlink(path):
    """Unlink |path| if it exists else do nothing"""
    if os.path.isfile(path):
        os.unlink(path)


def clean_hooks(path):
    """Strip out sample files from hooks/ in the git dir |path|"""
    hooks_path = os.path.join(path, 'hooks')
    for hook in glob.glob(os.path.join(hooks_path, '*.sample')):
        print('Trimming hook: %s' % hook)
        os.unlink(hook)


def clean_packs(path):
    """Strip out temp files from objects/pack/ in the git dir |path|"""
    # The directory is named "pack" (singular); is_packed() looks at the
    # same place, so leftovers must be removed from there.
    packs_path = os.path.join(path, 'objects', 'pack')
    for pack in glob.glob(os.path.join(packs_path, 'tmp_pack_*')):
        print('Trimming pack: %s' % pack)
        os.unlink(pack)


def is_packed(path):
    """See if the git repo is already packed

    That means objects/ holds nothing but pack/ (and optionally info/), and
    pack/ holds exactly two entries.
    """
    obj_path = os.path.join(path, 'objects')
    paths = set(os.listdir(obj_path))
    if paths not in ({'info', 'pack'}, {'pack'}):
        return False
    packs = os.listdir(os.path.join(obj_path, 'pack'))
    return len(packs) == 2


def repack(path):
    """Clean up and trim cruft and repack |path|

    Submodule git dirs found under modules/ are repacked first (recursively).
    When a tmpfs-backed temp dir is available, the repo is copied there,
    repacked, and synced back; otherwise it is repacked in place.

    Raises:
      ValueError: if no git dir can be located from |path|.
      subprocess.CalledProcessError: if a git/rsync/chmod step fails.
    """
    path = find_git_dir(path)
    print('Repacking %s' % path)

    # Repack any submodules this project might use.
    modules_path = os.path.join(path, 'modules')
    if os.path.isdir(modules_path):
        for root, dirs, _ in os.walk(modules_path):
            dirs.sort()
            for d in dirs:
                mod_path = os.path.join(root, d)
                if is_git_dir(mod_path):
                    repack(mod_path)

    tmpdir = find_temp_dir()
    if tmpdir:
        tmpdir = tempfile.mkdtemp(prefix='git-repack.', dir=tmpdir)
        print('Using tempdir: %s' % tmpdir)
        # Only the unique name is wanted; copytree() below creates the dir.
        os.rmdir(tmpdir)
        # Doesn't matter for these needs.
        # Only set when a tempdir exists: os.environ rejects None values.
        os.environ['GIT_WORK_TREE'] = tmpdir

    grafts = alts = None
    try:
        # Push/pop the graft & alternate paths so we don't read them.
        # XXX: In some cases, this is bad, but I don't use them that way ...
        graft_file = os.path.join(path, 'info', 'grafts')
        grafts = readfile(graft_file)
        unlink(graft_file)

        alt_file = os.path.join(path, 'objects', 'info', 'alternates')
        alts = readfile(alt_file)
        unlink(alt_file)

        clean_hooks(path)

        # XXX: Should do this for all remotes?
        origin_path = os.path.join(path, 'refs', 'remotes', 'origin')
        packed_refs = readfile(os.path.join(path, 'packed-refs'))
        if (os.path.exists(origin_path) or
                'refs/remotes/origin/' in packed_refs):
            cmd = ['git', '--git-dir', path, 'remote', 'prune', 'origin']
            subprocess.run(cmd, cwd='/', check=True)

        clean_packs(path)

        if is_packed(path):
            print('Git repo is already packed; nothing to do')
            return

        if tmpdir:
            print('Syncing git repo to tempdir')
            shutil.copytree(path, tmpdir, symlinks=True)
            rundir = tmpdir
        else:
            rundir = path

        cmd = ['git', '--git-dir', rundir, 'reflog', 'expire', '--all',
               '--stale-fix']
        print('Cleaning reflog: %s' % ' '.join(cmd))
        subprocess.run(cmd, cwd='/', check=True)

        # This also packs refs/tags for us.
        cmd = ['git', '--git-dir', rundir, 'gc', '--aggressive',
               '--prune=all']
        print('Repacking git repo: %s' % ' '.join(cmd))
        subprocess.run(cmd, cwd='/', check=True)

        # Clean empty dirs.
        # Best effort: rmdir fails on every non-empty dir, so the exit status
        # and stderr are deliberately ignored.
        cmd = ['find', rundir, '-depth', '-type', 'd', '-exec', 'rmdir',
               '{}', '+']
        subprocess.call(cmd, stderr=subprocess.DEVNULL)

        # There's a few dirs we need to exist even if they're empty.
        refdir = os.path.join(rundir, 'refs')
        os.makedirs(refdir, exist_ok=True)

        if tmpdir:
            cmd = ['rsync', '-a', '--delete', tmpdir + '/', path + '/']
            print('Syncing back git repo: %s' % ' '.join(cmd))
            subprocess.run(cmd, cwd='/', check=True)

            cmd = ['find', path + '/', '-exec', 'chmod', 'u+rw', '{}', '+']
            subprocess.run(cmd, cwd='/', check=True)
    finally:
        # Put back whatever was stashed above, even on failure/early return.
        if grafts:
            with open(graft_file, 'w') as fp:
                fp.write(grafts)
        if alts:
            with open(alt_file, 'w') as fp:
                fp.write(alts)
        if tmpdir:
            shutil.rmtree(tmpdir, ignore_errors=True)


def get_parser():
    """Get the command line parser"""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('dir', help='The git repo to process')
    return parser


def main(argv):
    """The main script entry point"""
    parser = get_parser()
    opts = parser.parse_args(argv)
    repack(opts.dir)


if __name__ == '__main__':
    # sys.exit() rather than the site-provided exit() helper, which is not
    # guaranteed to exist (e.g. under "python -S").
    sys.exit(main(sys.argv[1:]))