1
0
Fork 0

Merging upstream version 0.15.7.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-11 18:44:38 +01:00
parent 95bdcb5aa6
commit 83fc50a933
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
10 changed files with 324 additions and 206 deletions

View file

@ -19,6 +19,7 @@ import sys
import csv
import argparse
import subprocess
from functools import partial
import pkg_resources
from itertools import chain
from pathlib import Path
@ -27,21 +28,37 @@ import glob
from . import utils, info, common
def _group_name(name: str) -> str:
def _group_name(name: str, exclude_old_names=True) -> str:
"""
Return valid group name
"""
repos = utils.get_repos()
if name in repos:
print(f"Cannot use group name {name} since it's a repo name.")
sys.exit(1)
if exclude_old_names:
if name in utils.get_groups():
print(f"Cannot use group name {name} since it's already in use.")
sys.exit(1)
if name in {'none', 'auto'}:
print(f"Cannot use group name {name} since it's a reserved keyword.")
sys.exit(1)
return name
def _path_name(name: str) -> str:
"""
Return absolute path without trailing /
"""
if name:
return os.path.abspath(name).rstrip(os.path.sep)
return ''
def f_add(args: argparse.Namespace):
repos = utils.get_repos()
paths = args.paths
if args.main:
if 0:
# add to global and tag as main
main_repos = utils.add_repos(repos, paths, repo_type='m')
# add sub-repo recursively and save to local config
@ -53,11 +70,11 @@ def f_add(args: argparse.Namespace):
utils.add_repos({}, sub_paths, root=main_path)
else:
if args.recursive or args.auto_group:
paths = chain.from_iterable(
paths = (p.rstrip(os.path.sep) for p in chain.from_iterable(
glob.glob(os.path.join(p, '**/'), recursive=True)
for p in args.paths)
for p in args.paths))
new_repos = utils.add_repos(repos, paths, is_bare=args.bare)
if args.auto_group:
if new_repos and args.auto_group:
new_groups = utils.auto_group(new_repos, args.paths)
if new_groups:
print(f'Created {len(new_groups)} new group(s).')
@ -159,11 +176,24 @@ def f_ll(args: argparse.Namespace):
ctx = utils.get_context()
if args.group is None and ctx:
args.group = ctx.stem
group_repos = None
if args.group: # only display repos in this group
group_repos = utils.get_groups()[args.group]
group_repos = utils.get_groups()[args.group]['repos']
repos = {k: repos[k] for k in group_repos if k in repos}
for line in utils.describe(repos, no_colors=args.no_colors):
print(line)
if args.g: # display by group
if group_repos:
print(f'{args.group}:')
for line in utils.describe(repos, no_colors=args.no_colors):
print(' ', line)
else:
for g, g_repos in utils.get_groups().items():
print(f'{g}:')
g_repos = {k: repos[k] for k in g_repos if k in repos}
for line in utils.describe(g_repos, no_colors=args.no_colors):
print(' ', line)
else:
for line in utils.describe(repos, no_colors=args.no_colors):
print(line)
def f_ls(args: argparse.Namespace):
@ -180,69 +210,73 @@ def f_group(args: argparse.Namespace):
if cmd == 'll':
if 'to_show' in args and args.to_show:
gname = args.to_show
print(' '.join(groups[gname]))
print(' '.join(groups[gname]['repos']))
else:
for group, repos in groups.items():
print(f"{group}: {' '.join(repos)}")
for group, prop in groups.items():
print(f"{info.Color.underline}{group}{info.Color.end}: {prop['path']}")
for r in prop['repos']:
print(' -', r)
elif cmd == 'ls':
print(' '.join(groups))
elif cmd == 'rename':
new_name = args.new_name
if new_name in groups:
sys.exit(f'{new_name} already exists.')
gname = args.gname
groups[new_name] = groups[gname]
del groups[gname]
utils.write_to_groups_file(groups, 'w')
# change context
ctx = utils.get_context()
if ctx and str(ctx.stem) == gname:
# ctx.rename(ctx.with_stem(new_name)) # only works in py3.9
ctx.rename(ctx.with_name(f'{new_name}.context'))
if ctx and ctx.stem == gname:
utils.replace_context(ctx, new_name)
elif cmd == 'rm':
ctx = utils.get_context()
for name in args.to_ungroup:
del groups[name]
if ctx and str(ctx.stem) == name:
ctx.unlink()
utils.replace_context(ctx, '')
utils.write_to_groups_file(groups, 'w')
elif cmd == 'add':
gname = args.gname
if gname in groups:
gname_repos = set(groups[gname])
gname_repos = set(groups[gname]['repos'])
gname_repos.update(args.to_group)
groups[gname] = sorted(gname_repos)
groups[gname]['repos'] = sorted(gname_repos)
if 'gpath' in args:
groups[gname]['path'] = args.gpath
utils.write_to_groups_file(groups, 'w')
else:
utils.write_to_groups_file({gname: sorted(args.to_group)}, 'a+')
gpath = ''
if 'gpath' in args:
gpath = args.gpath
utils.write_to_groups_file(
{gname: {'repos': sorted(args.to_group),
'path': gpath}},
'a+')
elif cmd == 'rmrepo':
gname = args.gname
if gname in groups:
for repo in args.from_group:
try:
groups[gname].remove(repo)
except ValueError as e:
pass
group = {gname: {'repos': groups[gname]['repos'],
'path': groups[gname]['path']
}}
for repo in args.to_rm:
utils.delete_repo_from_groups(repo, group)
groups[gname] = group[gname]
utils.write_to_groups_file(groups, 'w')
def f_context(args: argparse.Namespace):
choice = args.choice
ctx = utils.get_context()
if choice is None:
if choice is None: # display current context
if ctx:
group = ctx.stem
print(f"{group}: {' '.join(utils.get_groups()[group])}")
print(f"{group}: {' '.join(utils.get_groups()[group]['repos'])}")
elif (Path(common.get_config_dir()) / 'auto.context').exists():
print('auto: none detected!')
else:
print('Context is not set')
elif choice == 'none': # remove context
ctx and ctx.unlink()
else: # set context
fname = Path(common.get_config_dir()) / (choice + '.context')
if ctx:
ctx.rename(fname)
else:
open(fname, 'w').close()
utils.replace_context(ctx, choice)
def f_rm(args: argparse.Namespace):
@ -255,12 +289,19 @@ def f_rm(args: argparse.Namespace):
main_paths = [prop['path'] for prop in repos.values() if prop['type'] == 'm']
# TODO: add test case to delete main repo from main repo
# only local setting should be affected instead of the global one
group_updated = False
for repo in args.repo:
del repos[repo]
groups = utils.get_groups()
group_updated = group_updated or utils.delete_repo_from_groups(repo, groups)
if group_updated:
utils.write_to_groups_file(groups, 'w')
# If cwd is relative to any main repo, write to local config
cwd = os.getcwd()
# TODO: delete main path mechanism
for p in main_paths:
if utils.is_relative_to(cwd, p):
if utils.get_relative_path(cwd, p) is not None:
utils.write_to_repo_file(repos, 'w', p)
break
else: # global config
@ -283,7 +324,7 @@ def f_git_cmd(args: argparse.Namespace):
if k in repos:
chosen[k] = repos[k]
if k in groups:
for r in groups[k]:
for r in groups[k]['repos']:
chosen[r] = repos[r]
repos = chosen
per_repo_cmds = []
@ -343,7 +384,6 @@ def f_shell(args):
chosen[r] = repos[r]
repos = chosen
cmds = ' '.join(args.man[i:]) # join the shell command into a single string
#cmds = args.man[i:]
for name, prop in repos.items():
# TODO: pull this out as a function
got = subprocess.run(cmds, cwd=prop['path'], shell=True,
@ -387,12 +427,10 @@ def main(argv=None):
# bookkeeping sub-commands
p_add = subparsers.add_parser('add', description='add repo(s)',
help='add repo(s)')
p_add.add_argument('paths', nargs='+', type=os.path.abspath, help="repo(s) to add")
p_add.add_argument('paths', nargs='+', type=_path_name, help="repo(s) to add")
xgroup = p_add.add_mutually_exclusive_group()
xgroup.add_argument('-r', '--recursive', action='store_true',
help="recursively add repo(s) in the given path(s).")
xgroup.add_argument('-m', '--main', action='store_true',
help="make main repo(s), sub-repos are recursively added.")
xgroup.add_argument('-a', '--auto-group', action='store_true',
help="recursively add repo(s) in the given path(s) "
"and create hierarchical groups based on folder structure.")
@ -504,6 +542,8 @@ def main(argv=None):
help="show repos in the chosen group")
p_ll.add_argument('-C', '--no-colors', action='store_true',
help='Disable coloring on the branch names.')
p_ll.add_argument('-g', action='store_true',
help='Show repo summaries by group.')
p_ll.set_defaults(func=f_ll)
p_context = subparsers.add_parser('context',
@ -512,8 +552,12 @@ def main(argv=None):
' When set, all operations apply only to repos in that group.')
p_context.add_argument('choice',
nargs='?',
choices=set().union(utils.get_groups(), {'none'}),
help="Without argument, show current context. Otherwise choose a group as context. To remove context, use 'none'. ")
choices=set().union(utils.get_groups(), {'none', 'auto'}),
help="Without this argument, show current context. "
"Otherwise choose a group as context, or use 'auto', "
"which sets the context/group automatically based on "
"the current working directory. "
"To remove context, use 'none'. ")
p_context.set_defaults(func=f_context)
p_ls = subparsers.add_parser(
@ -545,12 +589,16 @@ def main(argv=None):
help="repo(s) to be grouped")
pg_add.add_argument('-n', '--name',
dest='gname',
type=_group_name,
type=partial(_group_name, exclude_old_names=False),
metavar='group-name',
required=True,
help="group name")
required=True)
pg_add.add_argument('-p', '--path',
dest='gpath',
type=_path_name,
metavar='group-path')
pg_rmrepo = group_cmds.add_parser('rmrepo', description='remove repo(s) from a group.')
pg_rmrepo.add_argument('from_group',
pg_rmrepo.add_argument('to_rm',
nargs='+',
metavar='repo',
choices=utils.get_repos(),
@ -641,6 +689,5 @@ def main(argv=None):
else:
p.print_help() # pragma: no cover
if __name__ == '__main__':
main() # pragma: no cover

View file

@ -1,6 +1,5 @@
import os
import csv
import yaml
import subprocess
from enum import Enum
from pathlib import Path
@ -34,6 +33,15 @@ class Color(str, Enum):
underline = '\x1B[4m'
default_colors = {
'no-remote': Color.white.name,
'in-sync': Color.green.name,
'diverged': Color.red.name,
'local-ahead': Color.purple.name,
'remote-ahead': Color.yellow.name,
}
def show_colors(): # pragma: no cover
"""
@ -61,13 +69,7 @@ def get_color_encoding() -> Dict[str, str]:
reader = csv.DictReader(f)
colors = next(reader)
else:
colors = {
'no-remote': Color.white.name,
'in-sync': Color.green.name,
'diverged': Color.red.name,
'local-ahead': Color.purple.name,
'remote-ahead': Color.yellow.name,
}
colors = default_colors
return colors

View file

@ -1,3 +1,4 @@
import sys
import os
import json
import csv
@ -13,12 +14,27 @@ from . import info
from . import common
# TODO: python3.9 pathlib has is_relative_to() function
def is_relative_to(kid: str, parent: str) -> bool:
MAX_INT = sys.maxsize
def get_relative_path(kid: str, parent: str) -> Union[List[str], None]:
"""
Both the `kid` and `parent` should be absolute path
Return the relative path depth if relative, otherwise MAX_INT.
Both the `kid` and `parent` should be absolute paths without trailing /
"""
return parent == os.path.commonpath((kid, parent))
# Note that os.path.commonpath has no trailing /
# TODO: python3.9 pathlib has is_relative_to() function
# TODO: Maybe use os.path.commonprefix? since it's faster?
if parent == '':
return None
if parent == os.path.commonpath((kid, parent)):
rel = os.path.normpath(os.path.relpath(kid, parent)).split(os.sep)
if rel == ['.']:
rel = []
return rel
else:
return None
@lru_cache()
@ -43,7 +59,7 @@ def get_repos(root=None) -> Dict[str, Dict[str, str]]:
cwd = os.getcwd()
for prop in repos.values():
path = prop['path']
if prop['type'] == 'm' and is_relative_to(cwd, path):
if prop['type'] == 'm' and get_relative_path(cwd, path) != MAX_INT:
return get_repos(path)
return repos
@ -51,29 +67,94 @@ def get_repos(root=None) -> Dict[str, Dict[str, str]]:
@lru_cache()
def get_context() -> Union[Path, None]:
"""
Return the context: either a group name or 'none'
Return context file path, or None if not set. Note that if in auto context
mode, the return value is not auto.context but the resolved context,
which could be None.
"""
config_dir = Path(common.get_config_dir())
matches = list(config_dir.glob('*.context'))
assert len(matches) < 2, "Cannot have multiple .context file"
return matches[0] if matches else None
if len(matches) > 1:
print("Cannot have multiple .context file")
sys.exit(1)
if not matches:
return None
ctx = matches[0]
if ctx.stem == 'auto':
cwd = str(Path.cwd())
repos = get_repos()
# The context is set to be the group with minimal distance to cwd
candidate = None
min_dist = MAX_INT
for gname, prop in get_groups().items():
rel = get_relative_path(cwd, prop['path'])
if rel is None:
continue
d = len(rel)
if d < min_dist:
candidate = gname
min_dist = d
if not candidate:
ctx = None
else:
ctx = ctx.with_name(f'{candidate}.context')
return ctx
@lru_cache()
def get_groups() -> Dict[str, List[str]]:
def get_groups() -> Dict[str, Dict]:
"""
Return a `dict` of group name to repo names.
Return a `dict` of group name to group properties such as repo names and
group path.
"""
fname = common.get_config_fname('groups.csv')
groups = {}
# Each line is a repo path and repo name separated by ,
# Each line is: group-name:repo1 repo2 repo3:group-path
if os.path.isfile(fname) and os.stat(fname).st_size > 0:
with open(fname, 'r') as f:
rows = csv.reader(f, delimiter=':')
groups = {r[0]: r[1].split() for r in rows}
rows = csv.DictReader(f, ['name', 'repos', 'path'],
restval='', delimiter=':')
groups = {
r['name']: {
'repos': r['repos'].split(),
'path': r['path']
}
for r in rows}
return groups
def delete_repo_from_groups(repo: str, groups: Dict[str, Dict]) -> bool:
"""
Delete repo from groups
"""
deleted = False
for name in groups:
try:
groups[name]['repos'].remove(repo)
except ValueError as e:
pass
else:
deleted = True
return deleted
def replace_context(old: Union[Path, None], new: str):
"""
"""
auto = Path(common.get_config_dir()) / 'auto.context'
if auto.exists():
old = auto
if new == 'none': # delete
old and old.unlink()
elif old:
# ctx.rename(ctx.with_stem(new_name)) # only works in py3.9
old.rename(old.with_name(f'{new}.context'))
else:
open(auto.with_name(f'{new}.context'), 'w').close()
def get_choices() -> List[Union[str, None]]:
"""
Return all repo names, group names, and an additional empty list. The empty
@ -117,6 +198,7 @@ def is_git(path: str, is_bare=False) -> bool:
return True
return False
def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str):
"""
Write new repo name to file
@ -131,8 +213,9 @@ def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str):
main_paths = (prop['path'] for prop in repos.values() if prop['type'] == 'm')
cwd = os.getcwd()
is_local_config = True
# TODO: delete
for p in main_paths:
if is_relative_to(cwd, p):
if get_relative_path(cwd, p) != MAX_INT:
write_to_repo_file(repos, 'w', p)
break
else: # global config
@ -163,7 +246,8 @@ def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str, root=None):
writer.writerows(data)
def write_to_groups_file(groups: Dict[str, List[str]], mode: str):
# TODO: combine with the repo writer
def write_to_groups_file(groups: Dict[str, Dict], mode: str):
"""
"""
@ -174,8 +258,8 @@ def write_to_groups_file(groups: Dict[str, List[str]], mode: str):
else:
with open(fname, mode, newline='') as f:
data = [
(group, ' '.join(repos))
for group, repos in groups.items()
(group, ' '.join(prop['repos']), prop['path'])
for group, prop in groups.items()
]
writer = csv.writer(f, delimiter=':', quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerows(data)
@ -191,11 +275,13 @@ def _make_name(path: str, repos: Dict[str, Dict[str, str]],
"""
name = os.path.basename(os.path.normpath(path))
if name in repos or name_counts[name] > 1:
# path has no trailing /
par_name = os.path.basename(os.path.dirname(path))
return os.path.join(par_name, name)
return name
# TODO: delete
def _get_repo_type(path, repo_type, root) -> str:
"""
@ -236,37 +322,45 @@ def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str],
return new_repos
def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[str, ...]:
def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[
Tuple[str, ...], str]:
"""
Return relative parent strings
Return relative parent strings, and the parent head string
For example, if `repo_path` is /a/b/c/d/here, and one of `paths` is /a/b/
then return (b, c, d)
"""
for p in paths:
if is_relative_to(repo_path, p):
rel = get_relative_path(repo_path, p)[:-1]
if rel is not None:
break
else:
return ()
return (os.path.basename(p),
*os.path.normpath(os.path.relpath(repo_path, p)).split(os.sep)[:-1])
return (), ''
head, tail = os.path.split(p)
return (tail, *rel), head
def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]
) -> Dict[str, List[str]]:
) -> Dict[str, Dict]:
"""
@params repos: repos to be grouped
"""
# FIXME: the upstream code should make sure that paths are all independent
# i.e., each repo should be contained in one and only one path
new_groups = defaultdict(list)
new_groups = defaultdict(dict)
for repo_name, prop in repos.items():
hash = _generate_dir_hash(prop['path'], paths)
hash, head = _generate_dir_hash(prop['path'], paths)
if not hash:
continue
for i in range(1, len(hash)+1):
group_name = '-'.join(hash[:i])
new_groups[group_name].append(repo_name)
prop = new_groups[group_name]
prop['path'] = os.path.join(head, *hash[:i])
if 'repos' not in prop:
prop['repos'] = [repo_name]
else:
prop['repos'].append(repo_name)
# FIXME: need to make sure the new group names don't clash with old ones
# or repo names
return new_groups