1
0
Fork 0

Merging upstream version 0.15.1.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-11 18:42:23 +01:00
parent 86d5d7fe9f
commit 9cbf6c15e9
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
22 changed files with 1805 additions and 372 deletions

View file

@ -16,22 +16,52 @@ https://github.com/nosarthur/gita/blob/master/.gita-completion.bash
import os
import sys
import yaml
import csv
import argparse
import subprocess
import pkg_resources
from itertools import chain
from pathlib import Path
import glob
from . import utils, info, common
def _group_name(name: str) -> str:
"""
"""
repos = utils.get_repos()
if name in repos:
print(f"Cannot use group name {name} since it's a repo name.")
sys.exit(1)
return name
def f_add(args: argparse.Namespace):
repos = utils.get_repos()
paths = args.paths
if args.recursive:
paths = chain.from_iterable(Path(p).glob('**') for p in args.paths)
utils.add_repos(repos, paths)
if args.main:
# add to global and tag as main
main_repos = utils.add_repos(repos, paths, repo_type='m')
# add sub-repo recursively and save to local config
for name, prop in main_repos.items():
main_path = prop['path']
print('Inside main repo:', name)
#sub_paths = Path(main_path).glob('**')
sub_paths = glob.glob(os.path.join(main_path,'**/'), recursive=True)
utils.add_repos({}, sub_paths, root=main_path)
else:
if args.recursive or args.auto_group:
paths = chain.from_iterable(
glob.glob(os.path.join(p, '**/'), recursive=True)
for p in args.paths)
new_repos = utils.add_repos(repos, paths, is_bare=args.bare)
if args.auto_group:
new_groups = utils.auto_group(new_repos, args.paths)
if new_groups:
print(f'Created {len(new_groups)} new group(s).')
utils.write_to_groups_file(new_groups, 'a+')
def f_rename(args: argparse.Namespace):
@ -39,16 +69,32 @@ def f_rename(args: argparse.Namespace):
utils.rename_repo(repos, args.repo[0], args.new_name)
def f_flags(args: argparse.Namespace):
cmd = args.flags_cmd or 'll'
repos = utils.get_repos()
if cmd == 'll':
for r, prop in repos.items():
if prop['flags']:
print(f"{r}: {prop['flags']}")
elif cmd == 'set':
# when in memory, flags are List[str], when on disk, they are space
# delimited str
repos[args.repo]['flags'] = args.flags
utils.write_to_repo_file(repos, 'w')
def f_color(args: argparse.Namespace):
cmd = args.color_cmd or 'll'
if cmd == 'll': # pragma: no cover
info.show_colors()
elif cmd == 'set':
colors = info.get_color_encoding()
colors[args.situation] = info.Color[args.color].value
yml_config = common.get_config_fname('color.yml')
with open(yml_config, 'w') as f:
yaml.dump(colors, f, default_flow_style=None)
colors[args.situation] = args.color
csv_config = common.get_config_fname('color.csv')
with open(csv_config, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=colors)
writer.writeheader()
writer.writerow(colors)
def f_info(args: argparse.Namespace):
@ -56,37 +102,53 @@ def f_info(args: argparse.Namespace):
cmd = args.info_cmd or 'll'
if cmd == 'll':
print('In use:', ','.join(to_display))
unused = set(info.ALL_INFO_ITEMS) - set(to_display)
unused = sorted(list(set(info.ALL_INFO_ITEMS) - set(to_display)))
if unused:
print('Unused:', ' '.join(unused))
print('Unused:', ','.join(unused))
return
if cmd == 'add' and args.info_item not in to_display:
to_display.append(args.info_item)
yml_config = common.get_config_fname('info.yml')
with open(yml_config, 'w') as f:
yaml.dump(to_display, f, default_flow_style=None)
csv_config = common.get_config_fname('info.csv')
with open(csv_config, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(to_display)
elif cmd == 'rm' and args.info_item in to_display:
to_display.remove(args.info_item)
yml_config = common.get_config_fname('info.yml')
with open(yml_config, 'w') as f:
yaml.dump(to_display, f, default_flow_style=None)
csv_config = common.get_config_fname('info.csv')
with open(csv_config, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(to_display)
def f_clone(args: argparse.Namespace):
path = Path.cwd()
errors = utils.exec_async_tasks(
if args.preserve_path:
utils.exec_async_tasks(
utils.run_async(repo_name, path, ['git', 'clone', url, abs_path])
for url, repo_name, abs_path in utils.parse_clone_config(args.fname))
else:
utils.exec_async_tasks(
utils.run_async(repo_name, path, ['git', 'clone', url])
for url, repo_name, _ in utils.parse_clone_config(args.fname))
def f_freeze(_):
repos = utils.get_repos()
for name, path in repos.items():
seen = {''}
for name, prop in repos.items():
path = prop['path']
# TODO: What do we do with main repos? Maybe give an option to print
# their sub-repos too.
url = ''
cp = subprocess.run(['git', 'remote', '-v'], cwd=path, capture_output=True)
if cp.returncode == 0:
url = cp.stdout.decode('utf-8').split('\n')[0].split()[1]
print(f'{url},{name},{path}')
lines = cp.stdout.decode('utf-8').split('\n')
if cp.returncode == 0 and len(lines) > 0:
parts = lines[0].split()
if len(parts)>1:
url = parts[1]
if url not in seen:
seen.add(url)
print(f'{url},{name},{path}')
def f_ll(args: argparse.Namespace):
@ -107,7 +169,7 @@ def f_ll(args: argparse.Namespace):
def f_ls(args: argparse.Namespace):
repos = utils.get_repos()
if args.repo: # one repo, show its path
print(repos[args.repo])
print(repos[args.repo]['path'])
else: # show names of all repos
print(' '.join(repos))
@ -128,6 +190,11 @@ def f_group(args: argparse.Namespace):
groups[new_name] = groups[gname]
del groups[gname]
utils.write_to_groups_file(groups, 'w')
# change context
ctx = utils.get_context()
if ctx and str(ctx.stem) == gname:
# ctx.rename(ctx.with_stem(new_name)) # only works in py3.9
ctx.rename(ctx.with_name(f'{new_name}.context'))
elif cmd == 'rm':
ctx = utils.get_context()
for name in args.to_ungroup:
@ -178,12 +245,22 @@ def f_rm(args: argparse.Namespace):
"""
Unregister repo(s) from gita
"""
path_file = common.get_config_fname('repo_path')
path_file = common.get_config_fname('repos.csv')
if os.path.isfile(path_file):
repos = utils.get_repos()
main_paths = [prop['path'] for prop in repos.values() if prop['type'] == 'm']
# TODO: add test case to delete main repo from main repo
# only local setting should be affected instead of the global one
for repo in args.repo:
del repos[repo]
utils.write_to_repo_file(repos, 'w')
# If cwd is relative to any main repo, write to local config
cwd = os.getcwd()
for p in main_paths:
if utils.is_relative_to(cwd, p):
utils.write_to_repo_file(repos, 'w', p)
break
else: # global config
utils.write_to_repo_file(repos, 'w')
def f_git_cmd(args: argparse.Namespace):
@ -205,21 +282,33 @@ def f_git_cmd(args: argparse.Namespace):
for r in groups[k]:
chosen[r] = repos[r]
repos = chosen
cmds = ['git'] + args.cmd
if len(repos) == 1 or cmds[1] in args.async_blacklist:
for path in repos.values():
per_repo_cmds = []
for prop in repos.values():
cmds = args.cmd.copy()
if cmds[0] == 'git' and prop['flags']:
cmds[1:1] = prop['flags']
per_repo_cmds.append(cmds)
# This async blacklist mechanism is broken if the git command name does
# not match with the gita command name.
if len(repos) == 1 or args.cmd[1] in args.async_blacklist:
for prop, cmds in zip(repos.values(), per_repo_cmds):
path = prop['path']
print(path)
subprocess.run(cmds, cwd=path)
subprocess.run(cmds, cwd=path, shell=args.shell)
else: # run concurrent subprocesses
# Async execution cannot deal with multiple repos' user name/password.
# Here we shut off any user input in the async execution, and re-run
# the failed ones synchronously.
errors = utils.exec_async_tasks(
utils.run_async(repo_name, path, cmds) for repo_name, path in repos.items())
utils.run_async(repo_name, prop['path'], cmds)
for cmds, (repo_name, prop) in zip(per_repo_cmds, repos.items()))
for path in errors:
if path:
print(path)
subprocess.run(cmds, cwd=path)
# FIXME: This is broken, flags are missing. But probably few
# people will use `gita flags`
subprocess.run(args.cmd, cwd=path)
def f_shell(args):
@ -249,10 +338,10 @@ def f_shell(args):
for r in groups[k]:
chosen[r] = repos[r]
repos = chosen
cmds = args.man[i:]
for name, path in repos.items():
cmds = ' '.join(args.man[i:]) # join the shell command into a single string
for name, prop in repos.items():
# TODO: pull this out as a function
got = subprocess.run(cmds, cwd=path, check=True,
got = subprocess.run(cmds, cwd=prop['path'], check=True, shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
print(utils.format_output(got.stdout.decode(), name))
@ -271,8 +360,9 @@ def f_super(args):
names.append(word)
else:
break
args.cmd = args.man[i:]
args.cmd = ['git'] + args.man[i:]
args.repo = names
args.shell = False
f_git_cmd(args)
@ -292,9 +382,17 @@ def main(argv=None):
# bookkeeping sub-commands
p_add = subparsers.add_parser('add', description='add repo(s)',
help='add repo(s)')
p_add.add_argument('paths', nargs='+', help="repo(s) to add")
p_add.add_argument('-r', dest='recursive', action='store_true',
help="recursively add repo(s) in the given path.")
p_add.add_argument('paths', nargs='+', type=os.path.abspath, help="repo(s) to add")
xgroup = p_add.add_mutually_exclusive_group()
xgroup.add_argument('-r', '--recursive', action='store_true',
help="recursively add repo(s) in the given path(s).")
xgroup.add_argument('-m', '--main', action='store_true',
help="make main repo(s), sub-repos are recursively added.")
xgroup.add_argument('-a', '--auto-group', action='store_true',
help="recursively add repo(s) in the given path(s) "
"and create hierarchical groups based on folder structure.")
xgroup.add_argument('-b', '--bare', action='store_true',
help="add bare repo(s)")
p_add.set_defaults(func=f_add)
p_rm = subparsers.add_parser('rm', description='remove repo(s)',
@ -305,15 +403,22 @@ def main(argv=None):
help="remove the chosen repo(s)")
p_rm.set_defaults(func=f_rm)
p_freeze = subparsers.add_parser('freeze', description='print all repo information')
p_freeze = subparsers.add_parser('freeze',
description='print all repo information',
help='print all repo information')
p_freeze.set_defaults(func=f_freeze)
p_clone = subparsers.add_parser('clone', description='clone repos from config file')
p_clone = subparsers.add_parser('clone',
description='clone repos from config file',
help='clone repos from config file')
p_clone.add_argument('fname',
help='config file. Its content should be the output of `gita freeze`.')
p_clone.add_argument('-p', '--preserve-path', dest='preserve_path', action='store_true',
help="clone repo(s) in their original paths")
p_clone.set_defaults(func=f_clone)
p_rename = subparsers.add_parser('rename', description='rename a repo')
p_rename = subparsers.add_parser('rename', description='rename a repo',
help='rename a repo')
p_rename.add_argument(
'repo',
nargs=1,
@ -322,8 +427,25 @@ def main(argv=None):
p_rename.add_argument('new_name', help="new name")
p_rename.set_defaults(func=f_rename)
p_flags = subparsers.add_parser('flags',
description='Set custom git flags for repo.',
help='git flags configuration')
p_flags.set_defaults(func=f_flags)
flags_cmds = p_flags.add_subparsers(dest='flags_cmd',
help='additional help with sub-command -h')
flags_cmds.add_parser('ll',
description='display repos with custom flags')
pf_set = flags_cmds.add_parser('set',
description='Set flags for repo.')
pf_set.add_argument('repo', choices=utils.get_repos(),
help="repo name")
pf_set.add_argument('flags',
nargs=argparse.REMAINDER,
help="custom flags, use quotes")
p_color = subparsers.add_parser('color',
description='display and modify branch coloring of the ll sub-command.')
description='display and modify branch coloring of the ll sub-command.',
help='color configuration')
p_color.set_defaults(func=f_color)
color_cmds = p_color.add_subparsers(dest='color_cmd',
help='additional help with sub-command -h')
@ -339,7 +461,8 @@ def main(argv=None):
help="available colors")
p_info = subparsers.add_parser('info',
description='list, add, or remove information items of the ll sub-command.')
description='list, add, or remove information items of the ll sub-command.',
help='information setting')
p_info.set_defaults(func=f_info)
info_cmds = p_info.add_subparsers(dest='info_cmd',
help='additional help with sub-command -h')
@ -347,11 +470,11 @@ def main(argv=None):
description='show used and unused information items of the ll sub-command')
info_cmds.add_parser('add', description='Enable information item.'
).add_argument('info_item',
choices=('branch', 'commit_msg', 'path'),
choices=info.ALL_INFO_ITEMS,
help="information item to add")
info_cmds.add_parser('rm', description='Disable information item.'
).add_argument('info_item',
choices=('branch', 'commit_msg', 'path'),
choices=info.ALL_INFO_ITEMS,
help="information item to delete")
@ -379,6 +502,7 @@ def main(argv=None):
p_ll.set_defaults(func=f_ll)
p_context = subparsers.add_parser('context',
help='set context',
description='Set and remove context. A context is a group.'
' When set, all operations apply only to repos in that group.')
p_context.add_argument('choice',
@ -388,7 +512,8 @@ def main(argv=None):
p_context.set_defaults(func=f_context)
p_ls = subparsers.add_parser(
'ls', description='display names of all repos, or path of a chosen repo')
'ls', help='show repo(s) or repo path',
description='display names of all repos, or path of a chosen repo')
p_ls.add_argument('repo',
nargs='?',
choices=utils.get_repos(),
@ -396,7 +521,8 @@ def main(argv=None):
p_ls.set_defaults(func=f_ls)
p_group = subparsers.add_parser(
'group', description='list, add, or remove repo group(s)')
'group', description='list, add, or remove repo group(s)',
help='group repos')
p_group.set_defaults(func=f_group)
group_cmds = p_group.add_subparsers(dest='group_cmd',
help='additional help with sub-command -h')
@ -410,6 +536,7 @@ def main(argv=None):
help="repo(s) to be grouped")
pg_add.add_argument('-n', '--name',
dest='gname',
type=_group_name,
metavar='group-name',
required=True,
help="group name")
@ -429,6 +556,7 @@ def main(argv=None):
choices=utils.get_groups(),
help="existing group to rename")
pg_rename.add_argument('new_name', metavar='new-name',
type=_group_name,
help="new group name")
group_cmds.add_parser('rm',
description='Remove group(s).').add_argument('to_ungroup',
@ -439,6 +567,7 @@ def main(argv=None):
# superman mode
p_super = subparsers.add_parser(
'super',
help='run any git command/alias',
description='Superman mode: delegate any git command/alias in specified or '
'all repo(s).\n'
'Examples:\n \t gita super myrepo1 commit -am "fix a bug"\n'
@ -446,7 +575,7 @@ def main(argv=None):
p_super.add_argument(
'man',
nargs=argparse.REMAINDER,
help="execute arbitrary git command/alias for specified or all repos "
help="execute arbitrary git command/alias for specified or all repos\n"
"Example: gita super myrepo1 diff --name-only --staged "
"Another: gita super checkout master ")
p_super.set_defaults(func=f_super)
@ -454,6 +583,7 @@ def main(argv=None):
# shell mode
p_shell = subparsers.add_parser(
'shell',
help='run any shell command',
description='shell mode: delegate any shell command in specified or '
'all repo(s).\n'
'Examples:\n \t gita shell pwd\n'
@ -470,7 +600,7 @@ def main(argv=None):
cmds = utils.get_cmds_from_files()
for name, data in cmds.items():
help = data.get('help')
cmd = data.get('cmd') or name
cmd = data['cmd']
if data.get('allow_all'):
choices = utils.get_choices()
nargs = '*'
@ -481,7 +611,14 @@ def main(argv=None):
help += ' for the chosen repo(s) or group(s)'
sp = subparsers.add_parser(name, description=help)
sp.add_argument('repo', nargs=nargs, choices=choices, help=help)
sp.set_defaults(func=f_git_cmd, cmd=cmd.split())
is_shell = bool(data.get('shell'))
sp.add_argument('-s', '--shell', default=is_shell, type=bool,
help='If set, run in shell mode')
if is_shell:
cmd = [cmd]
else:
cmd = cmd.split()
sp.set_defaults(func=f_git_cmd, cmd=cmd)
args = p.parse_args(argv)

89
gita/cmds.json Normal file
View file

@ -0,0 +1,89 @@
{
"br":{
"cmd": "git branch -vv",
"help":"show local branches"},
"clean":{
"cmd": "git clean -dfx",
"help": "remove all untracked files/folders"},
"diff":{
"cmd": "git diff",
"help": "git show differences"},
"difftool":{
"cmd": "git difftool",
"disable_async": true,
"help": "show differences using a tool"
},
"fetch":{
"cmd": "git fetch",
"allow_all": true,
"help": "fetch remote update"
},
"last":{
"cmd": "git log -1 HEAD",
"help": "show log information of HEAD"
},
"log":
{"cmd": "git log",
"disable_async": true,
"help": "show logs"
},
"merge":{
"cmd": "git merge @{u}",
"help": "merge remote updates"
},
"mergetool":{
"cmd": "git mergetool",
"disable_async": true,
"help": "merge updates with a tool"
},
"patch":{
"cmd": "git format-patch HEAD~",
"help": "make a patch"
},
"pull":{
"cmd": "git pull",
"allow_all": true,
"help": "pull remote updates"
},
"push":{
"cmd": "git push",
"help": "push the local updates"
},
"rebase":{
"cmd": "git rebase",
"help": "rebase from master"
},
"reflog":{
"cmd": "git reflog",
"help": "show ref logs"
},
"remote":{
"cmd": "git remote -v",
"help": "show remote settings"
},
"reset":{
"cmd": "git reset",
"help": "reset repo(s)"
},
"show":{
"cmd": "git show",
"disable_async": true,
"help": "show detailed commit information"
},
"stash":{
"cmd": "git stash",
"help": "store uncommited changes"
},
"stat":{
"cmd": "git diff --stat",
"help": "show edit statistics"
},
"st":{
"cmd": "git status",
"help": "show status"
},
"tag":{
"cmd": "git tag -n",
"help": "show tags"
}
}

View file

@ -1,65 +0,0 @@
br:
cmd: branch -vv
help: show local branches
clean:
cmd: clean -dfx
help: remove all untracked files/folders
diff:
help: show differences
difftool:
disable_async: true
help: show differences using a tool
fetch:
allow_all: true
help: fetch remote update
last:
cmd: log -1 HEAD
help: show log information of HEAD
log:
disable_async: true
help: show logs
merge:
cmd: merge @{u}
help: merge remote updates
mergetool:
disable_async: true
help: merge updates with a tool
patch:
cmd: format-patch HEAD~
help: make a patch
pull:
allow_all: true
help: pull remote updates
push:
help: push the local updates
rebase:
help: rebase from master
reflog:
help: show ref logs
remote:
cmd: remote -v
help: show remote settings
reset:
help: reset repo(s)
shortlog:
disable_async: true
help: show short log
show:
disable_async: true
help: show detailed commit information
show-branch:
disable_async: true
help: show detailed branch information
stash:
help: store uncommited changes
stat:
cmd: diff --stat
help: show edit statistics
st:
help: show status
tag:
cmd: tag -n
help: show tags
whatchanged:
disable_async: true
help: show detailed log

View file

@ -1,16 +1,17 @@
import os
def get_config_dir() -> str:
parent = os.environ.get('XDG_CONFIG_HOME') or os.path.join(
os.path.expanduser('~'), '.config')
root = os.path.join(parent, "gita")
return root
def get_config_dir(root=None) -> str:
if root is None:
root = os.environ.get('XDG_CONFIG_HOME') or os.path.join(
os.path.expanduser('~'), '.config')
return os.path.join(root, "gita")
else:
return os.path.join(root, ".gita")
def get_config_fname(fname: str) -> str:
def get_config_fname(fname: str, root=None) -> str:
"""
Return the file name that stores the repo locations.
"""
root = get_config_dir()
return os.path.join(root, fname)
return os.path.join(get_config_dir(root), fname)

View file

@ -1,5 +1,5 @@
import os
import sys
import csv
import yaml
import subprocess
from enum import Enum
@ -31,40 +31,42 @@ class Color(str, Enum):
b_purple = '\x1b[35;1m'
b_cyan = '\x1b[36;1m'
b_white = '\x1b[37;1m'
underline = '\x1B[4m'
def show_colors(): # pragma: no cover
"""
"""
names = {c.value: c.name for c in Color}
for i, c in enumerate(Color, start=1):
if c != Color.end:
if c != Color.end and c != Color.underline:
print(f'{c.value}{c.name:<8} ', end='')
if i % 9 == 0:
print()
print(f'{Color.end}')
for situation, c in sorted(get_color_encoding().items()):
print(f'{situation:<12}: {c}{names[c]:<8}{Color.end} ')
print(f'{situation:<12}: {Color[c].value}{c:<8}{Color.end} ')
@lru_cache()
def get_color_encoding() -> Dict[str, str]:
"""
Return color scheme for different local/remote situations.
In the format of {situation: color name}
"""
# custom settings
yml_config = Path(common.get_config_fname('color.yml'))
if yml_config.is_file():
with open(yml_config, 'r') as stream:
colors = yaml.load(stream, Loader=yaml.FullLoader)
csv_config = Path(common.get_config_fname('color.csv'))
if csv_config.is_file():
with open(csv_config, 'r') as f:
reader = csv.DictReader(f)
colors = next(reader)
else:
colors = {
'no-remote': Color.white.value,
'in-sync': Color.green.value,
'diverged': Color.red.value,
'local-ahead': Color.purple.value,
'remote-ahead': Color.yellow.value,
'no-remote': Color.white.name,
'in-sync': Color.green.name,
'diverged': Color.red.name,
'local-ahead': Color.purple.name,
'remote-ahead': Color.yellow.name,
}
return colors
@ -80,6 +82,7 @@ def get_info_funcs() -> List[Callable[[str], str]]:
all_info_items = {
'branch': get_repo_status,
'commit_msg': get_commit_msg,
'commit_time': get_commit_time,
'path': get_path,
}
return [all_info_items[k] for k in to_display]
@ -90,23 +93,26 @@ def get_info_items() -> List[str]:
Return the information items to be displayed in the `gita ll` command.
"""
# custom settings
yml_config = Path(common.get_config_fname('info.yml'))
if yml_config.is_file():
with open(yml_config, 'r') as stream:
display_items = yaml.load(stream, Loader=yaml.FullLoader)
csv_config = Path(common.get_config_fname('info.csv'))
if csv_config.is_file():
with open(csv_config, 'r') as f:
reader = csv.reader(f)
display_items = next(reader)
display_items = [x for x in display_items if x in ALL_INFO_ITEMS]
else:
# default settings
display_items = ['branch', 'commit_msg']
display_items = ['branch', 'commit_msg', 'commit_time']
return display_items
def get_path(path):
return f'{Color.cyan}{path}{Color.end}'
def get_path(prop: Dict[str, str]) -> str:
return f'{Color.cyan}{prop["path"]}{Color.end}'
# TODO: do we need to add the flags here too?
def get_head(path: str) -> str:
result = subprocess.run('git rev-parse --abbrev-ref HEAD'.split(),
result = subprocess.run('git symbolic-ref -q --short HEAD || git describe --tags --exact-match',
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
universal_newlines=True,
@ -114,12 +120,12 @@ def get_head(path: str) -> str:
return result.stdout.strip()
def run_quiet_diff(args: List[str]) -> bool:
def run_quiet_diff(flags: List[str], args: List[str]) -> int:
"""
Return the return code of git diff `args` in quiet mode
"""
result = subprocess.run(
['git', 'diff', '--quiet'] + args,
['git'] + flags + ['diff', '--quiet'] + args,
stderr=subprocess.DEVNULL,
)
return result.returncode
@ -135,50 +141,68 @@ def get_common_commit() -> str:
return result.stdout.strip()
def has_untracked() -> bool:
def has_untracked(flags: List[str]) -> bool:
"""
Return True if untracked file/folder exists
"""
result = subprocess.run('git ls-files -zo --exclude-standard'.split(),
cmd = ['git'] + flags + 'ls-files -zo --exclude-standard'.split()
result = subprocess.run(cmd,
stdout=subprocess.PIPE)
return bool(result.stdout)
def get_commit_msg(path: str) -> str:
def get_commit_msg(prop: Dict[str, str]) -> str:
"""
Return the last commit message.
"""
# `git show-branch --no-name HEAD` is faster than `git show -s --format=%s`
result = subprocess.run('git show-branch --no-name HEAD'.split(),
cmd = ['git'] + prop['flags'] + 'show-branch --no-name HEAD'.split()
result = subprocess.run(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
universal_newlines=True,
cwd=path)
cwd=prop['path'])
return result.stdout.strip()
def get_repo_status(path: str, no_colors=False) -> str:
head = get_head(path)
dirty, staged, untracked, color = _get_repo_status(path, no_colors)
def get_commit_time(prop: Dict[str, str]) -> str:
"""
Return the last commit time in parenthesis.
"""
cmd = ['git'] + prop['flags'] + 'log -1 --format=%cd --date=relative'.split()
result = subprocess.run(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
universal_newlines=True,
cwd=prop['path'])
return f"({result.stdout.strip()})"
def get_repo_status(prop: Dict[str, str], no_colors=False) -> str:
head = get_head(prop['path'])
dirty, staged, untracked, color = _get_repo_status(prop, no_colors)
if color:
return f'{color}{head+" "+dirty+staged+untracked:<10}{Color.end}'
return f'{head+" "+dirty+staged+untracked:<10}'
def _get_repo_status(path: str, no_colors: bool) -> Tuple[str]:
def _get_repo_status(prop: Dict[str, str], no_colors: bool) -> Tuple[str]:
"""
Return the status of one repo
"""
path = prop['path']
flags = prop['flags']
os.chdir(path)
dirty = '*' if run_quiet_diff([]) else ''
staged = '+' if run_quiet_diff(['--cached']) else ''
untracked = '_' if has_untracked() else ''
dirty = '*' if run_quiet_diff(flags, []) else ''
staged = '+' if run_quiet_diff(flags, ['--cached']) else ''
untracked = '_' if has_untracked(flags) else ''
if no_colors:
return dirty, staged, untracked, ''
colors = get_color_encoding()
diff_returncode = run_quiet_diff(['@{u}', '@{0}'])
colors = {situ: Color[name].value
for situ, name in get_color_encoding().items()}
diff_returncode = run_quiet_diff(flags, ['@{u}', '@{0}'])
has_no_remote = diff_returncode == 128
has_no_diff = diff_returncode == 0
if has_no_remote:
@ -187,9 +211,9 @@ def _get_repo_status(path: str, no_colors: bool) -> Tuple[str]:
color = colors['in-sync']
else:
common_commit = get_common_commit()
outdated = run_quiet_diff(['@{u}', common_commit])
outdated = run_quiet_diff(flags, ['@{u}', common_commit])
if outdated:
diverged = run_quiet_diff(['@{0}', common_commit])
diverged = run_quiet_diff(flags, ['@{0}', common_commit])
color = colors['diverged'] if diverged else colors['remote-ahead']
else: # local is ahead of remote
color = colors['local-ahead']
@ -199,5 +223,6 @@ def _get_repo_status(path: str, no_colors: bool) -> Tuple[str]:
ALL_INFO_ITEMS = {
'branch': get_repo_status,
'commit_msg': get_commit_msg,
'commit_time': get_commit_time,
'path': get_path,
}

View file

@ -1,15 +1,53 @@
import os
import yaml
import json
import csv
import asyncio
import platform
import subprocess
from functools import lru_cache, partial
from pathlib import Path
from typing import List, Dict, Coroutine, Union, Iterator
from typing import List, Dict, Coroutine, Union, Iterator, Tuple
from collections import Counter, defaultdict
from . import info
from . import common
# TODO: python3.9 pathlib has is_relative_to() function
def is_relative_to(kid: str, parent: str) -> bool:
"""
Both the `kid` and `parent` should be absolute path
"""
return parent == os.path.commonpath((kid, parent))
@lru_cache()
def get_repos(root=None) -> Dict[str, Dict[str, str]]:
"""
Return a `dict` of repo name to repo absolute path and repo type
@param root: Use local config if set. If None, use either global or local
config depending on cwd.
"""
path_file = common.get_config_fname('repos.csv', root)
repos = {}
if os.path.isfile(path_file) and os.stat(path_file).st_size > 0:
with open(path_file) as f:
rows = csv.DictReader(f, ['path', 'name', 'type', 'flags'],
restval='') # it's actually a reader
repos = {r['name']:
{'path': r['path'], 'type': r['type'],
'flags': r['flags'].split()}
for r in rows if is_git(r['path'], is_bare=True)}
if root is None: # detect if inside a main path
cwd = os.getcwd()
for prop in repos.values():
path = prop['path']
if prop['type'] == 'm' and is_relative_to(cwd, path):
return get_repos(path)
return repos
@lru_cache()
def get_context() -> Union[Path, None]:
"""
@ -21,42 +59,18 @@ def get_context() -> Union[Path, None]:
return matches[0] if matches else None
@lru_cache()
def get_repos() -> Dict[str, str]:
"""
Return a `dict` of repo name to repo absolute path
"""
path_file = common.get_config_fname('repo_path')
repos = {}
# Each line is a repo path and repo name separated by ,
if os.path.isfile(path_file) and os.stat(path_file).st_size > 0:
with open(path_file) as f:
for line in f:
line = line.rstrip()
if not line: # blank line
continue
path, name = line.split(',')
if not is_git(path):
continue
if name not in repos:
repos[name] = path
else: # repo name collision for different paths: include parent path name
par_name = os.path.basename(os.path.dirname(path))
repos[os.path.join(par_name, name)] = path
return repos
@lru_cache()
def get_groups() -> Dict[str, List[str]]:
"""
Return a `dict` of group name to repo names.
"""
fname = common.get_config_fname('groups.yml')
fname = common.get_config_fname('groups.csv')
groups = {}
# Each line is a repo path and repo name separated by ,
if os.path.isfile(fname) and os.stat(fname).st_size > 0:
with open(fname, 'r') as f:
groups = yaml.load(f, Loader=yaml.FullLoader)
rows = csv.reader(f, delimiter=':')
groups = {r[0]: r[1].split() for r in rows}
return groups
@ -75,10 +89,12 @@ def get_choices() -> List[Union[str, None]]:
return choices
def is_git(path: str) -> bool:
def is_git(path: str, is_bare=False) -> bool:
"""
Return True if the path is a git repo.
"""
if not os.path.exists(path):
return False
# An alternative is to call `git rev-parse --is-inside-work-tree`
# I don't see why that one is better yet.
# For a regular git repo, .git is a folder, for a worktree repo, .git is a file.
@ -88,59 +104,172 @@ def is_git(path: str) -> bool:
# `git rev-parse --git-common-dir`
loc = os.path.join(path, '.git')
# TODO: we can display the worktree repos in a different font.
return os.path.exists(loc)
if os.path.exists(loc):
return True
if not is_bare:
return False
# detect bare repo
got = subprocess.run('git rev-parse --is-bare-repository'.split(),
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
cwd=path
)
if got.returncode == 0 and got.stdout == b'true\n':
return True
return False
def rename_repo(repos: Dict[str, str], repo: str, new_name: str):
def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str):
"""
Write new repo name to file
"""
path = repos[repo]
if new_name in repos:
print(f"{new_name} is already in use!")
return
prop = repos[repo]
del repos[repo]
repos[new_name] = path
write_to_repo_file(repos, 'w')
repos[new_name] = prop
# write to local config if inside a main path
main_paths = (prop['path'] for prop in repos.values() if prop['type'] == 'm')
cwd = os.getcwd()
is_local_config = True
for p in main_paths:
if is_relative_to(cwd, p):
write_to_repo_file(repos, 'w', p)
break
else: # global config
write_to_repo_file(repos, 'w')
is_local_config = False
# update groups only when outside any main repos
if is_local_config:
return
groups = get_groups()
for g, members in groups.items():
if repo in members:
members.remove(repo)
members.append(new_name)
groups[g] = sorted(members)
write_to_groups_file(groups, 'w')
def write_to_repo_file(repos: Dict[str, str], mode: str):
def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str, root=None):
"""
@param repos: each repo is {name: {properties}}
"""
data = ''.join(f'{path},{name}\n' for name, path in repos.items())
fname = common.get_config_fname('repo_path')
data = [(prop['path'], name, prop['type'], ' '.join(prop['flags']))
for name, prop in repos.items()]
fname = common.get_config_fname('repos.csv', root)
os.makedirs(os.path.dirname(fname), exist_ok=True)
with open(fname, mode) as f:
f.write(data)
with open(fname, mode, newline='') as f:
writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerows(data)
def write_to_groups_file(groups: Dict[str, List[str]], mode: str):
"""
"""
fname = common.get_config_fname('groups.yml')
fname = common.get_config_fname('groups.csv')
os.makedirs(os.path.dirname(fname), exist_ok=True)
if not groups: # all groups are deleted
open(fname, 'w').close()
else:
with open(fname, mode) as f:
yaml.dump(groups, f, default_flow_style=None)
with open(fname, mode, newline='') as f:
data = [
(group, ' '.join(repos))
for group, repos in groups.items()
]
writer = csv.writer(f, delimiter=':', quotechar='"', quoting=csv.QUOTE_MINIMAL)
writer.writerows(data)
def add_repos(repos: Dict[str, str], new_paths: List[str]):
def _make_name(path: str, repos: Dict[str, Dict[str, str]],
name_counts: Counter) -> str:
"""
Write new repo paths to file
Given a new repo `path`, create a repo name. By default, basename is used.
If name collision exists, further include parent path name.
@param path: It should not be in `repos` and is absolute
"""
name = os.path.basename(os.path.normpath(path))
if name in repos or name_counts[name] > 1:
par_name = os.path.basename(os.path.dirname(path))
return os.path.join(par_name, name)
return name
def _get_repo_type(path, repo_type, root) -> str:
"""
"""
if repo_type != '': # explicitly set
return repo_type
if root is not None and os.path.normpath(root) == os.path.normpath(path):
return 'm'
return ''
def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str],
repo_type='', root=None, is_bare=False) -> Dict[str, Dict[str, str]]:
"""
Write new repo paths to file; return the added repos.
@param repos: name -> path
"""
existing_paths = set(repos.values())
new_paths = set(os.path.abspath(p) for p in new_paths if is_git(p))
existing_paths = {prop['path'] for prop in repos.values()}
new_paths = {p for p in new_paths if is_git(p, is_bare)}
new_paths = new_paths - existing_paths
new_repos = {}
if new_paths:
print(f"Found {len(new_paths)} new repo(s).")
new_repos = {
os.path.basename(os.path.normpath(path)): path
for path in new_paths}
write_to_repo_file(new_repos, 'a+')
name_counts = Counter(
os.path.basename(os.path.normpath(p)) for p in new_paths
)
new_repos = {_make_name(path, repos, name_counts): {
'path': path,
'type': _get_repo_type(path, repo_type, root),
'flags': '',
} for path in new_paths}
# When root is not None, we could optionally set its type to 'm', i.e.,
# main repo.
write_to_repo_file(new_repos, 'a+', root)
else:
print('No new repos found!')
return new_repos
def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[str, ...]:
"""
Return relative parent strings
For example, if `repo_path` is /a/b/c/d/here, and one of `paths` is /a/b/
then return (b, c, d)
"""
for p in paths:
if is_relative_to(repo_path, p):
break
else:
return ()
return (os.path.basename(p),
*os.path.normpath(os.path.relpath(repo_path, p)).split(os.sep)[:-1])
def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]
) -> Dict[str, List[str]]:
"""
"""
# FIXME: the upstream code should make sure that paths are all independent
# i.e., each repo should be contained in one and only one path
new_groups = defaultdict(list)
for repo_name, prop in repos.items():
hash = _generate_dir_hash(prop['path'], paths)
if not hash:
continue
for i in range(1, len(hash)+1):
group_name = '-'.join(hash[:i])
new_groups[group_name].append(repo_name)
# FIXME: need to make sure the new group names don't clash with old ones
# or repo names
return new_groups
def parse_clone_config(fname: str) -> Iterator[List[str]]:
@ -157,6 +286,7 @@ async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, s
Run `cmds` asynchronously in `path` directory. Return the `path` if
execution fails.
"""
# TODO: deprecated since 3.8, will be removed in 3.10
process = await asyncio.create_subprocess_exec(
*cmds,
stdin=asyncio.subprocess.DEVNULL,
@ -199,7 +329,7 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
return errors
def describe(repos: Dict[str, str], no_colors: bool=False) -> str:
def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str:
"""
Return the status of all repos
"""
@ -213,9 +343,14 @@ def describe(repos: Dict[str, str], no_colors: bool=False) -> str:
funcs[idx] = partial(get_repo_status, no_colors=True)
for name in sorted(repos):
path = repos[name]
info_items = ' '.join(f(path) for f in funcs)
yield f'{name:<{name_width}}{info_items}'
info_items = ' '.join(f(repos[name]) for f in funcs)
if repos[name]['type'] == 'm':
# ANSI color code also takes length in Python
name = f'{info.Color.underline}{name}{info.Color.end}'
width = name_width + 8
yield f'{name:<{width}}{info_items}'
else:
yield f'{name:<{name_width}}{info_items}'
def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
@ -231,17 +366,17 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
}
"""
# default config file
fname = os.path.join(os.path.dirname(__file__), "cmds.yml")
with open(fname, 'r') as stream:
cmds = yaml.load(stream, Loader=yaml.FullLoader)
fname = os.path.join(os.path.dirname(__file__), "cmds.json")
with open(fname, 'r') as f:
cmds = json.load(f)
# custom config file
root = common.get_config_dir()
fname = os.path.join(root, 'cmds.yml')
fname = os.path.join(root, 'cmds.json')
custom_cmds = {}
if os.path.isfile(fname) and os.path.getsize(fname):
with open(fname, 'r') as stream:
custom_cmds = yaml.load(stream, Loader=yaml.FullLoader)
with open(fname, 'r') as f:
custom_cmds = json.load(f)
# custom commands shadow default ones
cmds.update(custom_cmds)