]> git.wh0rd.org - home.git/commitdiff
git-repack: rewrite with pathlib
authorMike Frysinger <vapier@gentoo.org>
Sun, 28 Mar 2021 16:07:13 +0000 (12:07 -0400)
committerMike Frysinger <vapier@gentoo.org>
Sun, 28 Mar 2021 16:07:13 +0000 (12:07 -0400)
.bin/git-repack

index 82f8f17ced5c86d72f726eba77fc430b0beae739..865ca5e51cc18c06c42e99f8e7b2642df9c925c7 100755 (executable)
@@ -8,51 +8,52 @@
 from __future__ import print_function
 
 import argparse
-import glob
 import os
+from pathlib import Path
 import shutil
 import subprocess
 import sys
 import tempfile
+from typing import Dict, List, Optional
 
 
-def mount_settings():
+def mount_settings() -> Dict[str, str]:
     """Return dict mapping path to its type"""
     ret = {}
-    with open('/proc/mounts') as fp:
+    with Path('/proc/mounts').open(encoding='utf-8') as fp:
         for line in fp:
             ele = line.split()
             ret[ele[1]] = ele[2]
     return ret
 
 
-def is_git_dir(path):
+def is_git_dir(path: Path) -> bool:
     """Whether |path| is a .git dir"""
-    return (os.path.isdir(os.path.join(path, 'refs')) and
-            os.path.isdir(os.path.join(path, 'objects')) and
-            os.path.isfile(os.path.join(path, 'config')))
+    return ((path / 'refs').is_dir() and
+            (path / 'objects').is_dir() and
+            (path / 'config').is_file())
 
 
-def find_git_dir(path):
+def find_git_dir(path: Path) -> Path:
     """Try to find the .git dir to operate on"""
     orig_path = path
-    real_path = path = os.path.realpath(path)
+    real_path = path = path.resolve()
     while True:
         curr_path = path
-        if os.path.isdir(os.path.join(path, '.git')):
-            curr_path = os.path.join(path, '.git')
+        if (path / '.git').is_dir():
+            curr_path = path / '.git'
 
         if is_git_dir(curr_path):
             return curr_path
 
-        path = os.path.dirname(path)
-
-        if path == '/':
+        parent = path.parent
+        if path == parent:
             raise ValueError('could not locate .git dir: %s (%s)' %
                              (orig_path, real_path))
+        path = parent
 
 
-def find_temp_dir():
+def find_temp_dir() -> Optional[Path]:
     """Find a good temp dir (one backed by tmpfs)"""
     SEARCH_PATHS = (
         '/var/tmp/portage',
@@ -63,94 +64,89 @@ def find_temp_dir():
     mounts = mount_settings()
     for path in SEARCH_PATHS:
         if mounts.get(path) == 'tmpfs':
-            return path
+            return Path(path)
     return None
 
 
-def readfile(path):
+def readfile(path: Path) -> str:
     """Read |path| and return its data"""
-    if os.path.isfile(path):
-        with open(path) as fp:
-            return fp.read()
+    if path.is_file():
+        return path.read_text(encoding='utf-8')
     return ''
 
 
-def unlink(path):
-    """Unlink |path| if it exists else do nothing"""
-    if os.path.isfile(path):
-        os.unlink(path)
-
-
 def clean_hooks(path):
     """Strip out sample files from hooks/"""
-    hooks_path = os.path.join(path, 'hooks')
-    for hook in glob.glob(os.path.join(hooks_path, '*.sample')):
-        print('Trimming hook: %s' % hook)
-        os.unlink(hook)
+    for hook in (path / 'hooks').glob('*.sample'):
+        print('Trimming hook:', hook)
+        hook.unlink()
 
 
 def clean_packs(path):
     """Strip out temp files from objects/packs/"""
-    packs_path = os.path.join(path, 'objects', 'packs')
-    for pack in glob.glob(os.path.join(packs_path, 'tmp_pack_*')):
-        print('Trimming pack: %s' % pack)
-        os.unlink(pack)
+    for pack in (path / 'objects' / 'packs').glob('tmp_pack_*'):
+        print('Trimming pack:', pack)
+        pack.unlink()
 
 
 def is_packed(path):
     """See if the git repo is already packed"""
-    obj_path = os.path.join(path, 'objects')
-    paths = set(os.listdir(obj_path))
+    obj_path = path / 'objects'
+    paths = {x.name for x in obj_path.iterdir()}
     if paths not in ({'info', 'pack'}, {'pack'}):
         return False
-    packs = os.listdir(os.path.join(obj_path, 'pack'))
+    packs = tuple((obj_path / 'pack').iterdir())
     if len(packs) != 2:
         return False
     return True
 
 
-def repack(path):
+def repack(path: Path):
     """Clean up and trim cruft and repack |path|"""
     path = find_git_dir(path)
-    print('Repacking %s' % path)
+    print('Repacking', path)
 
     # Repack any submodules this project might use.
-    modules_path = os.path.join(path, 'modules')
-    if os.path.isdir(modules_path):
+    modules_path = path / 'modules'
+    if modules_path.is_dir():
         for root, dirs, _ in os.walk(modules_path):
+            root = Path(root)
             dirs.sort()
             for d in dirs:
-                mod_path = os.path.join(root, d)
+                mod_path = root / d
                 if is_git_dir(mod_path):
                     repack(mod_path)
 
     tmpdir = find_temp_dir()
     if tmpdir:
-        tmpdir = tempfile.mkdtemp(prefix='git-repack.', dir=tmpdir)
-        print('Using tempdir: %s' % tmpdir)
-        os.rmdir(tmpdir)
+        tmpdir = Path(tempfile.mkdtemp(prefix='git-repack.', dir=tmpdir))
+        print('Using tempdir:', tmpdir)
+        tmpdir.rmdir()
         # Doesn't matter for these needs.
-        os.environ['GIT_WORK_TREE'] = tmpdir
+        os.environ['GIT_WORK_TREE'] = str(tmpdir)
 
     grafts = alts = None
     try:
         # Push/pop the graft & alternate paths so we don't read them.
         # XXX: In some cases, this is bad, but I don't use them that way ...
-        graft_file = os.path.join(path, 'info', 'grafts')
+        graft_file = path / 'info' / 'grafts'
         grafts = readfile(graft_file)
-        unlink(graft_file)
+        graft_file.unlink(missing_ok=True)
 
-        alt_file = os.path.join(path, 'objects', 'info', 'alternates')
+        alt_file = path / 'objects' / 'info' / 'alternates'
         alts = readfile(alt_file)
-        unlink(alt_file)
+        alt_file.unlink(missing_ok=True)
 
         clean_hooks(path)
 
         # XXX: Should do this for all remotes?
-        origin_path = os.path.join(path, 'refs', 'remotes', 'origin')
-        packed_refs = readfile(os.path.join(path, 'packed-refs'))
-        if os.path.exists(origin_path) or 'refs/remotes/origin/' in packed_refs:
-            cmd = ['git', '--git-dir', path, 'remote', 'prune', 'origin']
+        origin_path = path / 'refs' / 'remotes' / 'origin'
+        # Delete remote HEAD as we don't need it, and it might be stale.
+        head = origin_path / 'HEAD'
+        head.unlink(missing_ok=True)
+        packed_refs = readfile(path / 'packed-refs')
+        if origin_path.exists() or 'refs/remotes/origin/' in packed_refs:
+            cmd = ['git', '--git-dir', str(path), 'remote', 'prune', 'origin']
             subprocess.run(cmd, cwd='/', check=True)
 
         clean_packs(path)
@@ -166,37 +162,35 @@ def repack(path):
         else:
             rundir = path
 
-        cmd = ['git', '--git-dir', rundir, 'reflog', 'expire', '--all', '--stale-fix']
-        print('Cleaning reflog: %s' % ' '.join(cmd))
+        cmd = ['git', '--git-dir', str(rundir), 'reflog', 'expire', '--all', '--stale-fix']
+        print('Cleaning reflog:', ' '.join(cmd))
         subprocess.run(cmd, cwd='/', check=True)
 
         # This also packs refs/tags for us.
-        cmd = ['git', '--git-dir', rundir, 'gc', '--aggressive', '--prune=all']
-        print('Repacking git repo: %s' % ' '.join(cmd))
+        cmd = ['git', '--git-dir', str(rundir), 'gc', '--aggressive', '--prune=all']
+        print('Repacking git repo:', ' '.join(cmd))
         subprocess.run(cmd, cwd='/', check=True)
 
         # Clean empty dirs.
-        cmd = ['find', rundir, '-depth', '-type', 'd', '-exec', 'rmdir', '{}', '+']
-        subprocess.call(cmd, stderr=subprocess.DEVNULL)
+        cmd = ['find', str(rundir), '-depth', '-type', 'd', '-exec', 'rmdir', '{}', '+']
+        subprocess.run(cmd, stderr=subprocess.DEVNULL, check=False)
 
         # There's a few dirs we need to exist even if they're empty.
-        refdir = os.path.join(rundir, 'refs')
-        os.makedirs(refdir, exist_ok=True)
+        refdir = rundir / 'refs'
+        refdir.mkdir(exist_ok=True)
 
         if tmpdir:
-            cmd = ['rsync', '-a', '--delete', tmpdir + '/', path + '/']
-            print('Syncing back git repo: %s' % ' '.join(cmd))
+            cmd = ['rsync', '-a', '--delete', str(tmpdir) + '/', str(path) + '/']
+            print('Syncing back git repo:', ' '.join(cmd))
             subprocess.run(cmd, cwd='/', check=True)
-            cmd = ['find', path + '/', '-exec', 'chmod', 'u+rw', '{}', '+']
+            cmd = ['find', str(path) + '/', '-exec', 'chmod', 'u+rw', '{}', '+']
             subprocess.run(cmd, cwd='/', check=True)
 
     finally:
         if grafts:
-            with open(graft_file, 'w') as fp:
-                fp.write(grafts)
+            graft_file.write_text(grafts, encoding='utf-8')
         if alts:
-            with open(alt_file, 'w') as fp:
-                fp.write(alts)
+            alt_file.write_text(alts, encoding='utf-8')
         if tmpdir:
             shutil.rmtree(tmpdir, ignore_errors=True)
 
@@ -204,11 +198,11 @@ def repack(path):
 def get_parser():
     """Get the command line parser"""
     parser = argparse.ArgumentParser(description=__doc__)
-    parser.add_argument('dir', help='The git repo to process')
+    parser.add_argument('dir', type=Path, help='The git repo to process')
     return parser
 
 
-def main(argv):
+def main(argv: List[str]):
     """The main script entry point"""
     parser = get_parser()
     opts = parser.parse_args(argv)
@@ -216,4 +210,4 @@ def main(argv):
 
 
 if __name__ == '__main__':
-    exit(main(sys.argv[1:]))
+    sys.exit(main(sys.argv[1:]))