238 lines
7.3 KiB
Python
238 lines
7.3 KiB
Python
import os
|
|
import yaml
|
|
import asyncio
|
|
import platform
|
|
from functools import lru_cache, partial
|
|
from pathlib import Path
|
|
from typing import List, Dict, Coroutine, Union
|
|
|
|
from . import info
|
|
from . import common
|
|
|
|
|
|
@lru_cache()
|
|
def get_context() -> Union[Path, None]:
|
|
"""
|
|
Return the context: either a group name or 'none'
|
|
"""
|
|
config_dir = Path(common.get_config_dir())
|
|
matches = list(config_dir.glob('*.context'))
|
|
assert len(matches) < 2, "Cannot have multiple .context file"
|
|
return matches[0] if matches else None
|
|
|
|
|
|
@lru_cache()
|
|
def get_repos() -> Dict[str, str]:
|
|
"""
|
|
Return a `dict` of repo name to repo absolute path
|
|
"""
|
|
path_file = common.get_config_fname('repo_path')
|
|
repos = {}
|
|
# Each line is a repo path and repo name separated by ,
|
|
if os.path.isfile(path_file) and os.stat(path_file).st_size > 0:
|
|
with open(path_file) as f:
|
|
for line in f:
|
|
line = line.rstrip()
|
|
if not line: # blank line
|
|
continue
|
|
path, name = line.split(',')
|
|
if not is_git(path):
|
|
continue
|
|
if name not in repos:
|
|
repos[name] = path
|
|
else: # repo name collision for different paths: include parent path name
|
|
par_name = os.path.basename(os.path.dirname(path))
|
|
repos[os.path.join(par_name, name)] = path
|
|
return repos
|
|
|
|
|
|
@lru_cache()
|
|
def get_groups() -> Dict[str, List[str]]:
|
|
"""
|
|
Return a `dict` of group name to repo names.
|
|
"""
|
|
fname = common.get_config_fname('groups.yml')
|
|
groups = {}
|
|
# Each line is a repo path and repo name separated by ,
|
|
if os.path.isfile(fname) and os.stat(fname).st_size > 0:
|
|
with open(fname, 'r') as f:
|
|
groups = yaml.load(f, Loader=yaml.FullLoader)
|
|
return groups
|
|
|
|
|
|
|
|
def get_choices() -> List[Union[str, None]]:
|
|
"""
|
|
Return all repo names, group names, and an additional empty list. The empty
|
|
list is added as a workaround of
|
|
argparse's problem with coexisting nargs='*' and choices.
|
|
See https://utcc.utoronto.ca/~cks/space/blog/python/ArgparseNargsChoicesLimitation
|
|
and
|
|
https://bugs.python.org/issue27227
|
|
"""
|
|
choices = list(get_repos())
|
|
choices.extend(get_groups())
|
|
choices.append([])
|
|
return choices
|
|
|
|
|
|
def is_git(path: str) -> bool:
|
|
"""
|
|
Return True if the path is a git repo.
|
|
"""
|
|
# An alternative is to call `git rev-parse --is-inside-work-tree`
|
|
# I don't see why that one is better yet.
|
|
# For a regular git repo, .git is a folder, for a worktree repo, .git is a file.
|
|
# However, git submodule repo also has .git as a file.
|
|
# A more reliable way to differentiable regular and worktree repos is to
|
|
# compare the result of `git rev-parse --git-dir` and
|
|
# `git rev-parse --git-common-dir`
|
|
loc = os.path.join(path, '.git')
|
|
# TODO: we can display the worktree repos in a different font.
|
|
return os.path.exists(loc)
|
|
|
|
|
|
def rename_repo(repos: Dict[str, str], repo: str, new_name: str):
|
|
"""
|
|
Write new repo name to file
|
|
"""
|
|
path = repos[repo]
|
|
del repos[repo]
|
|
repos[new_name] = path
|
|
write_to_repo_file(repos, 'w')
|
|
|
|
|
|
def write_to_repo_file(repos: Dict[str, str], mode: str):
|
|
"""
|
|
"""
|
|
data = ''.join(f'{path},{name}\n' for name, path in repos.items())
|
|
fname = common.get_config_fname('repo_path')
|
|
os.makedirs(os.path.dirname(fname), exist_ok=True)
|
|
with open(fname, mode) as f:
|
|
f.write(data)
|
|
|
|
|
|
def write_to_groups_file(groups: Dict[str, List[str]], mode: str):
|
|
"""
|
|
|
|
"""
|
|
fname = common.get_config_fname('groups.yml')
|
|
os.makedirs(os.path.dirname(fname), exist_ok=True)
|
|
if not groups: # all groups are deleted
|
|
open(fname, 'w').close()
|
|
else:
|
|
with open(fname, mode) as f:
|
|
yaml.dump(groups, f, default_flow_style=None)
|
|
|
|
|
|
def add_repos(repos: Dict[str, str], new_paths: List[str]):
|
|
"""
|
|
Write new repo paths to file
|
|
"""
|
|
existing_paths = set(repos.values())
|
|
new_paths = set(os.path.abspath(p) for p in new_paths if is_git(p))
|
|
new_paths = new_paths - existing_paths
|
|
if new_paths:
|
|
print(f"Found {len(new_paths)} new repo(s).")
|
|
new_repos = {
|
|
os.path.basename(os.path.normpath(path)): path
|
|
for path in new_paths}
|
|
write_to_repo_file(new_repos, 'a+')
|
|
else:
|
|
print('No new repos found!')
|
|
|
|
|
|
async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, str]:
|
|
"""
|
|
Run `cmds` asynchronously in `path` directory. Return the `path` if
|
|
execution fails.
|
|
"""
|
|
process = await asyncio.create_subprocess_exec(
|
|
*cmds,
|
|
stdin=asyncio.subprocess.DEVNULL,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
start_new_session=True,
|
|
cwd=path)
|
|
stdout, stderr = await process.communicate()
|
|
for pipe in (stdout, stderr):
|
|
if pipe:
|
|
print(format_output(pipe.decode(), f'{repo_name}: '))
|
|
# The existence of stderr is not good indicator since git sometimes write
|
|
# to stderr even if the execution is successful, e.g. git fetch
|
|
if process.returncode != 0:
|
|
return path
|
|
|
|
|
|
def format_output(s: str, prefix: str):
|
|
"""
|
|
Prepends every line in given string with the given prefix.
|
|
"""
|
|
return ''.join([f'{prefix}{line}' for line in s.splitlines(keepends=True)])
|
|
|
|
|
|
def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
|
|
"""
|
|
Execute tasks asynchronously
|
|
"""
|
|
# TODO: asyncio API is nicer in python 3.7
|
|
if platform.system() == 'Windows':
|
|
loop = asyncio.ProactorEventLoop()
|
|
asyncio.set_event_loop(loop)
|
|
else:
|
|
loop = asyncio.get_event_loop()
|
|
|
|
try:
|
|
errors = loop.run_until_complete(asyncio.gather(*tasks))
|
|
finally:
|
|
loop.close()
|
|
return errors
|
|
|
|
|
|
def describe(repos: Dict[str, str], no_colors: bool=False) -> str:
|
|
"""
|
|
Return the status of all repos
|
|
"""
|
|
if repos:
|
|
name_width = max(len(n) for n in repos) + 1
|
|
funcs = info.get_info_funcs()
|
|
|
|
get_repo_status = info.get_repo_status
|
|
if get_repo_status in funcs and no_colors:
|
|
idx = funcs.index(get_repo_status)
|
|
funcs[idx] = partial(get_repo_status, no_colors=True)
|
|
|
|
for name in sorted(repos):
|
|
path = repos[name]
|
|
info_items = ' '.join(f(path) for f in funcs)
|
|
yield f'{name:<{name_width}}{info_items}'
|
|
|
|
|
|
def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
|
|
"""
|
|
Parse delegated git commands from default config file
|
|
and custom config file.
|
|
|
|
Example return
|
|
{
|
|
'branch': {'help': 'show local branches'},
|
|
'clean': {'cmd': 'clean -dfx',
|
|
'help': 'remove all untracked files/folders'},
|
|
}
|
|
"""
|
|
# default config file
|
|
fname = os.path.join(os.path.dirname(__file__), "cmds.yml")
|
|
with open(fname, 'r') as stream:
|
|
cmds = yaml.load(stream, Loader=yaml.FullLoader)
|
|
|
|
# custom config file
|
|
root = common.get_config_dir()
|
|
fname = os.path.join(root, 'cmds.yml')
|
|
custom_cmds = {}
|
|
if os.path.isfile(fname) and os.path.getsize(fname):
|
|
with open(fname, 'r') as stream:
|
|
custom_cmds = yaml.load(stream, Loader=yaml.FullLoader)
|
|
|
|
# custom commands shadow default ones
|
|
cmds.update(custom_cmds)
|
|
return cmds
|