Adding upstream version 0.15.1.
Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
parent
d01d7be95b
commit
f2586667ea
22 changed files with 1805 additions and 372 deletions
259
gita/utils.py
259
gita/utils.py
|
@ -1,15 +1,53 @@
|
|||
import os
|
||||
import yaml
|
||||
import json
|
||||
import csv
|
||||
import asyncio
|
||||
import platform
|
||||
import subprocess
|
||||
from functools import lru_cache, partial
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Coroutine, Union, Iterator
|
||||
from typing import List, Dict, Coroutine, Union, Iterator, Tuple
|
||||
from collections import Counter, defaultdict
|
||||
|
||||
from . import info
|
||||
from . import common
|
||||
|
||||
|
||||
# TODO: python3.9 pathlib has is_relative_to() function
|
||||
def is_relative_to(kid: str, parent: str) -> bool:
    """
    Return True if `kid` lives under `parent` (or equals it).

    Both the `kid` and `parent` should be absolute path
    """
    # `parent` contains `kid` exactly when it is their common path prefix
    common = os.path.commonpath((kid, parent))
    return common == parent
|
||||
|
||||
|
||||
@lru_cache()
def get_repos(root=None) -> Dict[str, Dict[str, str]]:
    """
    Return a `dict` of repo name to repo properties (absolute path, repo
    type, and flags).

    @param root: Use local config if set. If None, use either global or local
        config depending on cwd.
    """
    config_file = common.get_config_fname('repos.csv', root)
    repos = {}
    if os.path.isfile(config_file) and os.stat(config_file).st_size > 0:
        with open(config_file) as f:
            reader = csv.DictReader(f, ['path', 'name', 'type', 'flags'],
                                    restval='')
            for row in reader:
                if not is_git(row['path'], is_bare=True):
                    continue
                repos[row['name']] = {
                    'path': row['path'],
                    'type': row['type'],
                    'flags': row['flags'].split(),
                }
    if root is None:  # detect if cwd is inside a main repo's path
        cwd = os.getcwd()
        for prop in repos.values():
            if prop['type'] == 'm' and is_relative_to(cwd, prop['path']):
                # delegate to the local config rooted at that main repo
                return get_repos(prop['path'])
    return repos
|
||||
|
||||
|
||||
@lru_cache()
|
||||
def get_context() -> Union[Path, None]:
|
||||
"""
|
||||
|
@ -21,42 +59,18 @@ def get_context() -> Union[Path, None]:
|
|||
return matches[0] if matches else None
|
||||
|
||||
|
||||
@lru_cache()
|
||||
def get_repos() -> Dict[str, str]:
|
||||
"""
|
||||
Return a `dict` of repo name to repo absolute path
|
||||
"""
|
||||
path_file = common.get_config_fname('repo_path')
|
||||
repos = {}
|
||||
# Each line is a repo path and repo name separated by ,
|
||||
if os.path.isfile(path_file) and os.stat(path_file).st_size > 0:
|
||||
with open(path_file) as f:
|
||||
for line in f:
|
||||
line = line.rstrip()
|
||||
if not line: # blank line
|
||||
continue
|
||||
path, name = line.split(',')
|
||||
if not is_git(path):
|
||||
continue
|
||||
if name not in repos:
|
||||
repos[name] = path
|
||||
else: # repo name collision for different paths: include parent path name
|
||||
par_name = os.path.basename(os.path.dirname(path))
|
||||
repos[os.path.join(par_name, name)] = path
|
||||
return repos
|
||||
|
||||
|
||||
@lru_cache()
def get_groups() -> Dict[str, List[str]]:
    """
    Return a `dict` of group name to repo names.
    """
    fname = common.get_config_fname('groups.csv')
    groups = {}
    # Each line is a group name and its space-separated member repo names,
    # separated by :
    if os.path.isfile(fname) and os.stat(fname).st_size > 0:
        with open(fname, 'r') as f:
            rows = csv.reader(f, delimiter=':')
            # `if r` skips blank lines: csv.reader yields [] for them, which
            # would otherwise raise IndexError on r[0]
            groups = {r[0]: r[1].split() for r in rows if r}
    return groups
|
||||
|
||||
|
||||
|
@ -75,10 +89,12 @@ def get_choices() -> List[Union[str, None]]:
|
|||
return choices
|
||||
|
||||
|
||||
def is_git(path: str, is_bare=False) -> bool:
    """
    Return True if the path is a git repo.

    @param is_bare: if True, also probe `git rev-parse --is-bare-repository`
        so bare repos (which have no .git entry) are recognized.
    """
    if not os.path.exists(path):
        return False
    # An alternative is to call `git rev-parse --is-inside-work-tree`
    # I don't see why that one is better yet.
    # For a regular git repo, .git is a folder, for a worktree repo, .git is a file.
    # Another alternative: `git rev-parse --git-common-dir`
    # TODO: we can display the worktree repos in a different font.
    if os.path.exists(os.path.join(path, '.git')):
        return True
    if not is_bare:
        return False
    # detect bare repo by asking git itself; stderr is discarded
    probe = subprocess.run(
        'git rev-parse --is-bare-repository'.split(),
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        cwd=path,
    )
    return probe.returncode == 0 and probe.stdout == b'true\n'
|
||||
|
||||
|
||||
def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str):
    """
    Rename `repo` to `new_name` and write the change back to the config file.

    Prints a warning and does nothing if `new_name` is already taken.
    """
    if new_name in repos:
        print(f"{new_name} is already in use!")
        return
    repos[new_name] = repos.pop(repo)
    # write to local config if inside a main path
    cwd = os.getcwd()
    local_root = None
    for prop in repos.values():
        if prop['type'] == 'm' and is_relative_to(cwd, prop['path']):
            local_root = prop['path']
            break
    if local_root is not None:
        # update groups only when outside any main repos
        write_to_repo_file(repos, 'w', local_root)
        return
    # global config
    write_to_repo_file(repos, 'w')
    groups = get_groups()
    for group_name, members in groups.items():
        if repo in members:
            members.remove(repo)
            members.append(new_name)
            groups[group_name] = sorted(members)
    write_to_groups_file(groups, 'w')
|
||||
|
||||
|
||||
def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str, root=None):
    """
    Serialize repos to the repos.csv config file.

    @param repos: each repo is {name: {properties}}
    @param mode: file open mode, e.g. 'w' or 'a+'
    @param root: forwarded to locate the local vs global config file
    """
    fname = common.get_config_fname('repos.csv', root)
    os.makedirs(os.path.dirname(fname), exist_ok=True)
    rows = []
    for name, prop in repos.items():
        rows.append((prop['path'], name, prop['type'], ' '.join(prop['flags'])))
    with open(fname, mode, newline='') as f:
        writer = csv.writer(f, delimiter=',', quotechar='"',
                            quoting=csv.QUOTE_MINIMAL)
        writer.writerows(rows)
|
||||
|
||||
|
||||
def write_to_groups_file(groups: Dict[str, List[str]], mode: str):
    """
    Serialize groups to the groups.csv config file, one `name:members` row
    per group with members space-separated.
    """
    fname = common.get_config_fname('groups.csv')
    os.makedirs(os.path.dirname(fname), exist_ok=True)
    if not groups:  # all groups are deleted
        open(fname, 'w').close()
        return
    with open(fname, mode, newline='') as f:
        writer = csv.writer(f, delimiter=':', quotechar='"',
                            quoting=csv.QUOTE_MINIMAL)
        writer.writerows(
            (name, ' '.join(members)) for name, members in groups.items())
|
||||
|
||||
|
||||
def add_repos(repos: Dict[str, str], new_paths: List[str]):
|
||||
def _make_name(path: str, repos: Dict[str, Dict[str, str]],
|
||||
name_counts: Counter) -> str:
|
||||
"""
|
||||
Write new repo paths to file
|
||||
Given a new repo `path`, create a repo name. By default, basename is used.
|
||||
If name collision exists, further include parent path name.
|
||||
|
||||
@param path: It should not be in `repos` and is absolute
|
||||
"""
|
||||
name = os.path.basename(os.path.normpath(path))
|
||||
if name in repos or name_counts[name] > 1:
|
||||
par_name = os.path.basename(os.path.dirname(path))
|
||||
return os.path.join(par_name, name)
|
||||
return name
|
||||
|
||||
|
||||
def _get_repo_type(path, repo_type, root) -> str:
|
||||
"""
|
||||
|
||||
"""
|
||||
if repo_type != '': # explicitly set
|
||||
return repo_type
|
||||
if root is not None and os.path.normpath(root) == os.path.normpath(path):
|
||||
return 'm'
|
||||
return ''
|
||||
|
||||
|
||||
def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str],
              repo_type='', root=None, is_bare=False) -> Dict[str, Dict[str, str]]:
    """
    Write new repo paths to file; return the added repos.

    @param repos: existing repos, name -> properties dict
    @param new_paths: candidate paths; non-git paths and already-registered
        paths are filtered out
    @param repo_type: explicit type for the new repos; '' means auto-detect
    @param root: config root; a repo located at `root` gets type 'm'
    @param is_bare: also accept bare repos
    """
    existing_paths = {prop['path'] for prop in repos.values()}
    new_paths = {p for p in new_paths if is_git(p, is_bare)}
    new_paths = new_paths - existing_paths
    new_repos = {}
    if new_paths:
        print(f"Found {len(new_paths)} new repo(s).")
        name_counts = Counter(
            os.path.basename(os.path.normpath(p)) for p in new_paths
        )
        new_repos = {_make_name(path, repos, name_counts): {
            'path': path,
            'type': _get_repo_type(path, repo_type, root),
            # a list, for consistency with get_repos() which parses flags
            # into a list; [] serializes to the same '' as the empty string
            'flags': [],
        } for path in new_paths}
        # When root is not None, we could optionally set its type to 'm', i.e.,
        # main repo.
        write_to_repo_file(new_repos, 'a+', root)
    else:
        print('No new repos found!')
    return new_repos
|
||||
|
||||
|
||||
def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[str, ...]:
|
||||
"""
|
||||
Return relative parent strings
|
||||
|
||||
For example, if `repo_path` is /a/b/c/d/here, and one of `paths` is /a/b/
|
||||
then return (b, c, d)
|
||||
"""
|
||||
for p in paths:
|
||||
if is_relative_to(repo_path, p):
|
||||
break
|
||||
else:
|
||||
return ()
|
||||
return (os.path.basename(p),
|
||||
*os.path.normpath(os.path.relpath(repo_path, p)).split(os.sep)[:-1])
|
||||
|
||||
|
||||
def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]
               ) -> Dict[str, List[str]]:
    """
    Group repos by their parent directories under `paths`.

    For each repo located under one of `paths`, one group is created per
    directory level, with level names joined by '-'.
    """
    # FIXME: the upstream code should make sure that paths are all independent
    # i.e., each repo should be contained in one and only one path
    new_groups = defaultdict(list)
    for repo_name, prop in repos.items():
        # renamed from `hash` to avoid shadowing the builtin
        dir_hash = _generate_dir_hash(prop['path'], paths)
        if not dir_hash:
            continue
        for i in range(1, len(dir_hash) + 1):
            group_name = '-'.join(dir_hash[:i])
            new_groups[group_name].append(repo_name)
    # FIXME: need to make sure the new group names don't clash with old ones
    # or repo names
    return new_groups
|
||||
|
||||
|
||||
def parse_clone_config(fname: str) -> Iterator[List[str]]:
|
||||
|
@ -157,6 +286,7 @@ async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, s
|
|||
Run `cmds` asynchronously in `path` directory. Return the `path` if
|
||||
execution fails.
|
||||
"""
|
||||
# TODO: deprecated since 3.8, will be removed in 3.10
|
||||
process = await asyncio.create_subprocess_exec(
|
||||
*cmds,
|
||||
stdin=asyncio.subprocess.DEVNULL,
|
||||
|
@ -199,7 +329,7 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
|
|||
return errors
|
||||
|
||||
|
||||
def describe(repos: Dict[str, str], no_colors: bool=False) -> str:
|
||||
def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str:
|
||||
"""
|
||||
Return the status of all repos
|
||||
"""
|
||||
|
@ -213,9 +343,14 @@ def describe(repos: Dict[str, str], no_colors: bool=False) -> str:
|
|||
funcs[idx] = partial(get_repo_status, no_colors=True)
|
||||
|
||||
for name in sorted(repos):
|
||||
path = repos[name]
|
||||
info_items = ' '.join(f(path) for f in funcs)
|
||||
yield f'{name:<{name_width}}{info_items}'
|
||||
info_items = ' '.join(f(repos[name]) for f in funcs)
|
||||
if repos[name]['type'] == 'm':
|
||||
# ANSI color code also takes length in Python
|
||||
name = f'{info.Color.underline}{name}{info.Color.end}'
|
||||
width = name_width + 8
|
||||
yield f'{name:<{width}}{info_items}'
|
||||
else:
|
||||
yield f'{name:<{name_width}}{info_items}'
|
||||
|
||||
|
||||
def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
|
||||
|
@ -231,17 +366,17 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
|
|||
}
|
||||
"""
|
||||
# default config file
|
||||
fname = os.path.join(os.path.dirname(__file__), "cmds.yml")
|
||||
with open(fname, 'r') as stream:
|
||||
cmds = yaml.load(stream, Loader=yaml.FullLoader)
|
||||
fname = os.path.join(os.path.dirname(__file__), "cmds.json")
|
||||
with open(fname, 'r') as f:
|
||||
cmds = json.load(f)
|
||||
|
||||
# custom config file
|
||||
root = common.get_config_dir()
|
||||
fname = os.path.join(root, 'cmds.yml')
|
||||
fname = os.path.join(root, 'cmds.json')
|
||||
custom_cmds = {}
|
||||
if os.path.isfile(fname) and os.path.getsize(fname):
|
||||
with open(fname, 'r') as stream:
|
||||
custom_cmds = yaml.load(stream, Loader=yaml.FullLoader)
|
||||
with open(fname, 'r') as f:
|
||||
custom_cmds = json.load(f)
|
||||
|
||||
# custom commands shadow default ones
|
||||
cmds.update(custom_cmds)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue