from __future__ import print_function
import argparse
-import glob
import os
+from pathlib import Path
import shutil
import subprocess
import sys
import tempfile
+from typing import Dict, List, Optional
-def mount_settings():
+def mount_settings() -> Dict[str, str]:
"""Return dict mapping path to its type"""
ret = {}
- with open('/proc/mounts') as fp:
+ with Path('/proc/mounts').open(encoding='utf-8') as fp:
for line in fp:
ele = line.split()
ret[ele[1]] = ele[2]
return ret
-def is_git_dir(path):
+def is_git_dir(path: Path) -> bool:
"""Whether |path| is a .git dir"""
- return (os.path.isdir(os.path.join(path, 'refs')) and
- os.path.isdir(os.path.join(path, 'objects')) and
- os.path.isfile(os.path.join(path, 'config')))
+ return ((path / 'refs').is_dir() and
+ (path / 'objects').is_dir() and
+ (path / 'config').is_file())
-def find_git_dir(path):
+def find_git_dir(path: Path) -> Path:
"""Try to find the .git dir to operate on"""
orig_path = path
- real_path = path = os.path.realpath(path)
+ real_path = path = path.resolve()
while True:
curr_path = path
- if os.path.isdir(os.path.join(path, '.git')):
- curr_path = os.path.join(path, '.git')
+ if (path / '.git').is_dir():
+ curr_path = path / '.git'
if is_git_dir(curr_path):
return curr_path
- path = os.path.dirname(path)
-
- if path == '/':
+ parent = path.parent
+ if path == parent:
raise ValueError('could not locate .git dir: %s (%s)' %
(orig_path, real_path))
+ path = parent
-def find_temp_dir():
+def find_temp_dir() -> Optional[Path]:
"""Find a good temp dir (one backed by tmpfs)"""
SEARCH_PATHS = (
'/var/tmp/portage',
mounts = mount_settings()
for path in SEARCH_PATHS:
if mounts.get(path) == 'tmpfs':
- return path
+ return Path(path)
return None
-def readfile(path):
+def readfile(path: Path) -> str:
"""Read |path| and return its data"""
- if os.path.isfile(path):
- with open(path) as fp:
- return fp.read()
+ if path.is_file():
+ return path.read_text(encoding='utf-8')
return ''
-def unlink(path):
- """Unlink |path| if it exists else do nothing"""
- if os.path.isfile(path):
- os.unlink(path)
-
-
def clean_hooks(path):
"""Strip out sample files from hooks/"""
- hooks_path = os.path.join(path, 'hooks')
- for hook in glob.glob(os.path.join(hooks_path, '*.sample')):
- print('Trimming hook: %s' % hook)
- os.unlink(hook)
+ for hook in (path / 'hooks').glob('*.sample'):
+ print('Trimming hook:', hook)
+ hook.unlink()
def clean_packs(path):
"""Strip out temp files from objects/packs/"""
- packs_path = os.path.join(path, 'objects', 'packs')
- for pack in glob.glob(os.path.join(packs_path, 'tmp_pack_*')):
- print('Trimming pack: %s' % pack)
- os.unlink(pack)
+ for pack in (path / 'objects' / 'packs').glob('tmp_pack_*'):
+ print('Trimming pack:', pack)
+ pack.unlink()
def is_packed(path):
"""See if the git repo is already packed"""
- obj_path = os.path.join(path, 'objects')
- paths = set(os.listdir(obj_path))
+ obj_path = path / 'objects'
+ paths = {x.name for x in obj_path.iterdir()}
if paths not in ({'info', 'pack'}, {'pack'}):
return False
- packs = os.listdir(os.path.join(obj_path, 'pack'))
+ packs = tuple((obj_path / 'pack').iterdir())
if len(packs) != 2:
return False
return True
-def repack(path):
+def repack(path: Path):
"""Clean up and trim cruft and repack |path|"""
path = find_git_dir(path)
- print('Repacking %s' % path)
+ print('Repacking', path)
# Repack any submodules this project might use.
- modules_path = os.path.join(path, 'modules')
- if os.path.isdir(modules_path):
+ modules_path = path / 'modules'
+ if modules_path.is_dir():
for root, dirs, _ in os.walk(modules_path):
+ root = Path(root)
dirs.sort()
for d in dirs:
- mod_path = os.path.join(root, d)
+ mod_path = root / d
if is_git_dir(mod_path):
repack(mod_path)
tmpdir = find_temp_dir()
if tmpdir:
- tmpdir = tempfile.mkdtemp(prefix='git-repack.', dir=tmpdir)
- print('Using tempdir: %s' % tmpdir)
- os.rmdir(tmpdir)
+ tmpdir = Path(tempfile.mkdtemp(prefix='git-repack.', dir=tmpdir))
+ print('Using tempdir:', tmpdir)
+ tmpdir.rmdir()
# Doesn't matter for these needs.
- os.environ['GIT_WORK_TREE'] = tmpdir
+ os.environ['GIT_WORK_TREE'] = str(tmpdir)
grafts = alts = None
try:
# Push/pop the graft & alternate paths so we don't read them.
# XXX: In some cases, this is bad, but I don't use them that way ...
- graft_file = os.path.join(path, 'info', 'grafts')
+ graft_file = path / 'info' / 'grafts'
grafts = readfile(graft_file)
- unlink(graft_file)
+ graft_file.unlink(missing_ok=True)
- alt_file = os.path.join(path, 'objects', 'info', 'alternates')
+ alt_file = path / 'objects' / 'info' / 'alternates'
alts = readfile(alt_file)
- unlink(alt_file)
+ alt_file.unlink(missing_ok=True)
clean_hooks(path)
# XXX: Should do this for all remotes?
- origin_path = os.path.join(path, 'refs', 'remotes', 'origin')
- packed_refs = readfile(os.path.join(path, 'packed-refs'))
- if os.path.exists(origin_path) or 'refs/remotes/origin/' in packed_refs:
- cmd = ['git', '--git-dir', path, 'remote', 'prune', 'origin']
+ origin_path = path / 'refs' / 'remotes' / 'origin'
+ # Delete remote HEAD as we don't need it, and it might be stale.
+ head = origin_path / 'HEAD'
+ head.unlink(missing_ok=True)
+ packed_refs = readfile(path / 'packed-refs')
+ if origin_path.exists() or 'refs/remotes/origin/' in packed_refs:
+ cmd = ['git', '--git-dir', str(path), 'remote', 'prune', 'origin']
subprocess.run(cmd, cwd='/', check=True)
clean_packs(path)
else:
rundir = path
- cmd = ['git', '--git-dir', rundir, 'reflog', 'expire', '--all', '--stale-fix']
- print('Cleaning reflog: %s' % ' '.join(cmd))
+ cmd = ['git', '--git-dir', str(rundir), 'reflog', 'expire', '--all', '--stale-fix']
+ print('Cleaning reflog:', ' '.join(cmd))
subprocess.run(cmd, cwd='/', check=True)
# This also packs refs/tags for us.
- cmd = ['git', '--git-dir', rundir, 'gc', '--aggressive', '--prune=all']
- print('Repacking git repo: %s' % ' '.join(cmd))
+ cmd = ['git', '--git-dir', str(rundir), 'gc', '--aggressive', '--prune=all']
+ print('Repacking git repo:', ' '.join(cmd))
subprocess.run(cmd, cwd='/', check=True)
# Clean empty dirs.
- cmd = ['find', rundir, '-depth', '-type', 'd', '-exec', 'rmdir', '{}', '+']
- subprocess.call(cmd, stderr=subprocess.DEVNULL)
+ cmd = ['find', str(rundir), '-depth', '-type', 'd', '-exec', 'rmdir', '{}', '+']
+ subprocess.run(cmd, stderr=subprocess.DEVNULL, check=False)
# There's a few dirs we need to exist even if they're empty.
- refdir = os.path.join(rundir, 'refs')
- os.makedirs(refdir, exist_ok=True)
+ refdir = rundir / 'refs'
+ refdir.mkdir(exist_ok=True)
if tmpdir:
- cmd = ['rsync', '-a', '--delete', tmpdir + '/', path + '/']
- print('Syncing back git repo: %s' % ' '.join(cmd))
+ cmd = ['rsync', '-a', '--delete', str(tmpdir) + '/', str(path) + '/']
+ print('Syncing back git repo:', ' '.join(cmd))
subprocess.run(cmd, cwd='/', check=True)
- cmd = ['find', path + '/', '-exec', 'chmod', 'u+rw', '{}', '+']
+ cmd = ['find', str(path) + '/', '-exec', 'chmod', 'u+rw', '{}', '+']
subprocess.run(cmd, cwd='/', check=True)
finally:
if grafts:
- with open(graft_file, 'w') as fp:
- fp.write(grafts)
+ graft_file.write_text(grafts, encoding='utf-8')
if alts:
- with open(alt_file, 'w') as fp:
- fp.write(alts)
+ alt_file.write_text(alts, encoding='utf-8')
if tmpdir:
shutil.rmtree(tmpdir, ignore_errors=True)
def get_parser():
"""Get the command line parser"""
parser = argparse.ArgumentParser(description=__doc__)
- parser.add_argument('dir', help='The git repo to process')
+ parser.add_argument('dir', type=Path, help='The git repo to process')
return parser
-def main(argv):
+def main(argv: List[str]):
"""The main script entry point"""
parser = get_parser()
opts = parser.parse_args(argv)
if __name__ == '__main__':
- exit(main(sys.argv[1:]))
+ sys.exit(main(sys.argv[1:]))