862 lines
28 KiB
Python
862 lines
28 KiB
Python
"""
|
|
Gita manages multiple git repos. It has two functionalities
|
|
|
|
1. display the status of multiple repos side by side
|
|
2. delegate git commands/aliases from any working directory
|
|
|
|
Examples:
|
|
gita ls
|
|
gita fetch
|
|
gita stat myrepo2
|
|
gita super myrepo1 commit -am 'add some cool feature'
|
|
|
|
For bash auto completion, download and source
|
|
https://github.com/nosarthur/gita/blob/master/.gita-completion.bash
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import csv
|
|
import argparse
|
|
import argcomplete
|
|
import subprocess
|
|
from functools import partial
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
import glob
|
|
from typing import Dict, Optional
|
|
|
|
from . import utils, info, common, io, get_version
|
|
|
|
|
|
def _group_name(name: str, exclude_old_names=True) -> str:
|
|
"""
|
|
Return valid group name
|
|
"""
|
|
repos = utils.get_repos()
|
|
if name in repos:
|
|
print(f"Cannot use group name {name} since it's a repo name.")
|
|
sys.exit(1)
|
|
if exclude_old_names:
|
|
if name in utils.get_groups():
|
|
print(f"Cannot use group name {name} since it's already in use.")
|
|
sys.exit(1)
|
|
if name in {"none", "auto"}:
|
|
print(f"Cannot use group name {name} since it's a reserved keyword.")
|
|
sys.exit(1)
|
|
return name
|
|
|
|
|
|
def _path_name(name: str) -> str:
|
|
"""
|
|
Return absolute path
|
|
"""
|
|
if name:
|
|
return os.path.abspath(name)
|
|
return ""
|
|
|
|
|
|
def f_add(args: argparse.Namespace):
|
|
repos = utils.get_repos()
|
|
paths = args.paths
|
|
dry_run = args.dry_run
|
|
groups = utils.get_groups()
|
|
if args.recursive or args.auto_group:
|
|
paths = (
|
|
p.rstrip(os.path.sep)
|
|
for p in chain.from_iterable(
|
|
glob.glob(os.path.join(p, "**/"), recursive=True) for p in args.paths
|
|
)
|
|
)
|
|
new_repos = utils.add_repos(
|
|
repos,
|
|
paths,
|
|
include_bare=args.bare,
|
|
exclude_submodule=args.skip_submodule,
|
|
dry_run=dry_run,
|
|
)
|
|
if dry_run:
|
|
return
|
|
if new_repos and args.auto_group:
|
|
new_groups = utils.auto_group(new_repos, args.paths)
|
|
if new_groups:
|
|
print(f"Created {len(new_groups)} new group(s).")
|
|
utils.write_to_groups_file(new_groups, "a+")
|
|
if new_repos and args.group:
|
|
gname = args.group
|
|
gname_repos = set(groups[gname]["repos"])
|
|
gname_repos.update(new_repos)
|
|
groups[gname]["repos"] = sorted(gname_repos)
|
|
print(f"Added {len(new_repos)} repos to the {gname} group")
|
|
utils.write_to_groups_file(groups, "w")
|
|
|
|
|
|
def f_rename(args: argparse.Namespace):
|
|
repos = utils.get_repos()
|
|
utils.rename_repo(repos, args.repo[0], args.new_name)
|
|
|
|
|
|
def f_flags(args: argparse.Namespace):
|
|
cmd = args.flags_cmd or "ll"
|
|
repos = utils.get_repos()
|
|
if cmd == "ll":
|
|
for r, prop in repos.items():
|
|
if prop["flags"]:
|
|
print(f"{r}: {prop['flags']}")
|
|
elif cmd == "set":
|
|
# when in memory, flags are List[str], when on disk, they are space
|
|
# delimited str
|
|
repos[args.repo]["flags"] = args.flags
|
|
utils.write_to_repo_file(repos, "w")
|
|
|
|
|
|
def f_color(args: argparse.Namespace):
|
|
cmd = args.color_cmd or "ll"
|
|
if cmd == "ll": # pragma: no cover
|
|
info.show_colors()
|
|
elif cmd == "set":
|
|
colors = info.get_color_encoding()
|
|
colors[args.situation] = args.color
|
|
csv_config = common.get_config_fname("color.csv")
|
|
with open(csv_config, "w", newline="") as f:
|
|
writer = csv.DictWriter(f, fieldnames=colors)
|
|
writer.writeheader()
|
|
writer.writerow(colors)
|
|
elif cmd == "reset":
|
|
Path(common.get_config_fname("color.csv")).unlink(missing_ok=True)
|
|
|
|
|
|
def f_info(args: argparse.Namespace):
|
|
to_display = info.get_info_items()
|
|
cmd = args.info_cmd or "ll"
|
|
if cmd == "ll":
|
|
print("In use:", ",".join(to_display))
|
|
unused = sorted(list(set(info.ALL_INFO_ITEMS) - set(to_display)))
|
|
if unused:
|
|
print("Unused:", ",".join(unused))
|
|
return
|
|
if cmd == "add" and args.info_item not in to_display:
|
|
to_display.append(args.info_item)
|
|
csv_config = common.get_config_fname("info.csv")
|
|
with open(csv_config, "w", newline="") as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(to_display)
|
|
elif cmd == "rm" and args.info_item in to_display:
|
|
to_display.remove(args.info_item)
|
|
csv_config = common.get_config_fname("info.csv")
|
|
with open(csv_config, "w", newline="") as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(to_display)
|
|
elif cmd == "set-length":
|
|
csv_config = common.get_config_fname("layout.csv")
|
|
print(f"Settings are in {csv_config}")
|
|
defaults = {
|
|
"branch": 19,
|
|
"symbols": 5,
|
|
"branch_name": 27,
|
|
"commit_msg": 0,
|
|
"commit_time": 0, # 0 means no limit
|
|
"path": 30,
|
|
}
|
|
with open(csv_config, "w", newline="") as f:
|
|
writer = csv.DictWriter(f, fieldnames=defaults)
|
|
writer.writeheader()
|
|
writer.writerow(defaults)
|
|
|
|
|
|
def f_clone(args: argparse.Namespace):
|
|
|
|
if args.dry_run:
|
|
if args.from_file:
|
|
for url, repo_name, abs_path in io.parse_clone_config(args.clonee):
|
|
print(f"git clone {url} {abs_path}")
|
|
else:
|
|
print(f"git clone {args.clonee}")
|
|
return
|
|
|
|
if args.directory:
|
|
path = args.directory
|
|
else:
|
|
path = Path.cwd()
|
|
|
|
if not args.from_file:
|
|
subprocess.run(["git", "clone", args.clonee], cwd=path)
|
|
# add the cloned repo to gita; group is also supported
|
|
cloned_path = os.path.join(path, args.clonee.split("/")[-1].split(".")[0])
|
|
args.paths = [cloned_path]
|
|
args.recursive = args.auto_group = args.bare = args.skip_submodule = False
|
|
f_add(args)
|
|
return
|
|
|
|
# TODO: add repos to group too
|
|
repos, groups = io.parse_clone_config(args.clonee)
|
|
if args.preserve_path:
|
|
utils.exec_async_tasks(
|
|
utils.run_async(repo_name, path, ["git", "clone", r["url"], r["path"]])
|
|
for repo_name, r in repos.items()
|
|
)
|
|
else:
|
|
utils.exec_async_tasks(
|
|
utils.run_async(repo_name, path, ["git", "clone", r["url"]])
|
|
for repo_name, r in repos.items()
|
|
)
|
|
|
|
|
|
def f_freeze(args):
|
|
"""
|
|
print repo and group information for future cloning
|
|
"""
|
|
ctx = utils.get_context()
|
|
if args.group is None and ctx:
|
|
args.group = ctx.stem
|
|
repos = utils.get_repos()
|
|
group_name = args.group
|
|
group_repos = None
|
|
if group_name: # only display repos in this group
|
|
group_repos = utils.get_groups()[group_name]["repos"]
|
|
repos = {k: repos[k] for k in group_repos if k in repos}
|
|
seen = {""}
|
|
# print(repos)
|
|
for name, prop in repos.items():
|
|
path = prop["path"]
|
|
url = ""
|
|
# FIXME: capture_output is new in 3.7. Maybe drop support for 3.6
|
|
cp = subprocess.run(
|
|
["git", "remote", "-v"],
|
|
cwd=path,
|
|
universal_newlines=True,
|
|
capture_output=True,
|
|
)
|
|
lines = cp.stdout.split("\n")
|
|
if cp.returncode == 0 and len(lines) > 0:
|
|
parts = lines[0].split()
|
|
if len(parts) > 1:
|
|
url = parts[1]
|
|
if url not in seen:
|
|
seen.add(url)
|
|
# TODO: add another field to distinguish regular repo or worktree or submodule
|
|
print(f"{url},{name},{path},")
|
|
# group information: these lines don't have URL
|
|
if group_name:
|
|
group_path = utils.get_groups()[group_name]["path"]
|
|
print(f",{group_name},{group_path},{'|'.join(group_repos)}")
|
|
else: # show all groups
|
|
for gname, g in utils.get_groups().items():
|
|
group_repos = "|".join(g["repos"])
|
|
print(f",{gname},{g['path']},{group_repos}")
|
|
|
|
|
|
def f_ll(args: argparse.Namespace):
|
|
"""
|
|
Display details of all repos
|
|
"""
|
|
repos = utils.get_repos()
|
|
ctx = utils.get_context()
|
|
if args.group is None and ctx:
|
|
args.group = ctx.stem
|
|
group_repos = None
|
|
if args.group: # only display repos in this group
|
|
group_repos = utils.get_groups()[args.group]["repos"]
|
|
repos = {k: repos[k] for k in group_repos if k in repos}
|
|
if args.g: # display by group
|
|
if group_repos:
|
|
print(f"{args.group}:")
|
|
for line in utils.describe(repos, no_colors=args.no_colors):
|
|
print(" ", line)
|
|
else:
|
|
for g, prop in utils.get_groups().items():
|
|
print(f"{g}:")
|
|
g_repos = {k: repos[k] for k in prop["repos"] if k in repos}
|
|
for line in utils.describe(g_repos, no_colors=args.no_colors):
|
|
print(" ", line)
|
|
else:
|
|
for line in utils.describe(repos, no_colors=args.no_colors):
|
|
print(line)
|
|
|
|
|
|
def f_ls(args: argparse.Namespace):
|
|
repos = utils.get_repos()
|
|
if args.repo: # one repo, show its path
|
|
print(repos[args.repo]["path"])
|
|
else: # show names of all repos
|
|
print(" ".join(repos))
|
|
|
|
|
|
def f_group(args: argparse.Namespace):
|
|
groups = utils.get_groups()
|
|
cmd = args.group_cmd or "ll"
|
|
if cmd == "ll":
|
|
if "to_show" in args and args.to_show:
|
|
gname = args.to_show
|
|
print(" ".join(groups[gname]["repos"]))
|
|
else:
|
|
for group, prop in groups.items():
|
|
print(f"{info.Color.underline}{group}{info.Color.end}: {prop['path']}")
|
|
for r in prop["repos"]:
|
|
print(" -", r)
|
|
elif cmd == "ls":
|
|
print(" ".join(groups))
|
|
elif cmd == "rename":
|
|
new_name = args.new_name
|
|
gname = args.gname
|
|
groups[new_name] = groups[gname]
|
|
del groups[gname]
|
|
utils.write_to_groups_file(groups, "w")
|
|
# change context
|
|
ctx = utils.get_context()
|
|
if ctx and ctx.stem == gname:
|
|
utils.replace_context(ctx, new_name)
|
|
elif cmd == "rm":
|
|
ctx = utils.get_context()
|
|
for name in args.to_ungroup:
|
|
del groups[name]
|
|
if ctx and str(ctx.stem) == name:
|
|
utils.replace_context(ctx, "")
|
|
utils.write_to_groups_file(groups, "w")
|
|
elif cmd == "add":
|
|
gname = args.gname
|
|
if gname in groups:
|
|
gname_repos = set(groups[gname]["repos"])
|
|
gname_repos.update(args.to_group)
|
|
groups[gname]["repos"] = sorted(gname_repos)
|
|
if "gpath" in args:
|
|
groups[gname]["path"] = args.gpath
|
|
utils.write_to_groups_file(groups, "w")
|
|
else:
|
|
gpath = ""
|
|
if "gpath" in args:
|
|
gpath = args.gpath
|
|
utils.write_to_groups_file(
|
|
{gname: {"repos": sorted(args.to_group), "path": gpath}}, "a+"
|
|
)
|
|
elif cmd == "rmrepo":
|
|
gname = args.gname
|
|
if gname in groups:
|
|
group = {
|
|
gname: {"repos": groups[gname]["repos"], "path": groups[gname]["path"]}
|
|
}
|
|
for repo in args.to_rm:
|
|
utils.delete_repo_from_groups(repo, group)
|
|
groups[gname] = group[gname]
|
|
utils.write_to_groups_file(groups, "w")
|
|
|
|
|
|
def f_context(args: argparse.Namespace):
|
|
choice = args.choice
|
|
ctx = utils.get_context()
|
|
if choice is None: # display current context
|
|
if ctx:
|
|
group = ctx.stem
|
|
print(f"{group}: {' '.join(utils.get_groups()[group]['repos'])}")
|
|
elif (Path(common.get_config_dir()) / "auto.context").exists():
|
|
print("auto: none detected!")
|
|
else:
|
|
print("Context is not set")
|
|
else: # set context
|
|
utils.replace_context(ctx, choice)
|
|
|
|
|
|
def f_rm(args: argparse.Namespace):
|
|
"""
|
|
Unregister repo(s) from gita
|
|
"""
|
|
path_file = common.get_config_fname("repos.csv")
|
|
if os.path.isfile(path_file):
|
|
repos = utils.get_repos()
|
|
group_updated = False
|
|
groups = utils.get_groups()
|
|
for repo in args.repo:
|
|
del repos[repo]
|
|
up = utils.delete_repo_from_groups(repo, groups)
|
|
group_updated = group_updated or up
|
|
if group_updated:
|
|
utils.write_to_groups_file(groups, "w")
|
|
|
|
utils.write_to_repo_file(repos, "w")
|
|
|
|
|
|
def f_git_cmd(args: argparse.Namespace):
|
|
"""
|
|
Delegate git command/alias defined in `args.cmd`. Asynchronous execution is
|
|
disabled for commands in the `args.async_blacklist`.
|
|
"""
|
|
if "_parsed_repos" in args:
|
|
repos = args._parsed_repos
|
|
else:
|
|
repos, _ = utils.parse_repos_and_rest(args.repo)
|
|
|
|
per_repo_cmds = []
|
|
for prop in repos.values():
|
|
cmds = args.cmd.copy()
|
|
if cmds[0] == "git" and prop["flags"]:
|
|
cmds[1:1] = prop["flags"]
|
|
per_repo_cmds.append(cmds)
|
|
|
|
# This async blacklist mechanism is broken if the git command name does
|
|
# not match with the gita command name.
|
|
if len(repos) == 1 or args.cmd[1] in args.async_blacklist:
|
|
for prop, cmds in zip(repos.values(), per_repo_cmds):
|
|
path = prop["path"]
|
|
print(path)
|
|
subprocess.run(cmds, cwd=path, shell=args.shell)
|
|
else: # run concurrent subprocesses
|
|
# Async execution cannot deal with multiple repos' user name/password.
|
|
# Here we shut off any user input in the async execution, and re-run
|
|
# the failed ones synchronously.
|
|
errors = utils.exec_async_tasks(
|
|
utils.run_async(repo_name, prop["path"], cmds)
|
|
for cmds, (repo_name, prop) in zip(per_repo_cmds, repos.items())
|
|
)
|
|
for path in errors:
|
|
if path:
|
|
print(path)
|
|
# FIXME: This is broken, flags are missing. But probably few
|
|
# people will use `gita flags`
|
|
subprocess.run(args.cmd, cwd=path)
|
|
|
|
|
|
def f_shell(args):
|
|
"""
|
|
Delegate shell command defined in `args.man`, which may or may not
|
|
contain repo names.
|
|
"""
|
|
repos, cmds = utils.parse_repos_and_rest(args.man, args.quote_mode)
|
|
if not cmds:
|
|
print("Missing commands")
|
|
sys.exit(2)
|
|
|
|
cmds = " ".join(cmds) # join the shell command into a single string
|
|
for name, prop in repos.items():
|
|
# TODO: pull this out as a function
|
|
got = subprocess.run(
|
|
cmds,
|
|
cwd=prop["path"],
|
|
shell=True,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
)
|
|
print(utils.format_output(got.stdout.decode(), name))
|
|
|
|
|
|
def f_super(args):
|
|
"""
|
|
Delegate git command/alias defined in `args.man`, which may or may not
|
|
contain repo names.
|
|
"""
|
|
repos, cmds = utils.parse_repos_and_rest(args.man, args.quote_mode)
|
|
if not cmds:
|
|
print("Missing commands")
|
|
sys.exit(2)
|
|
|
|
args.cmd = ["git"] + cmds
|
|
args._parsed_repos = repos
|
|
args.shell = False
|
|
f_git_cmd(args)
|
|
|
|
|
|
def f_clear(_):
|
|
utils.write_to_groups_file({}, "w")
|
|
utils.write_to_repo_file({}, "w")
|
|
|
|
|
|
def main(argv=None):
|
|
p = argparse.ArgumentParser(
|
|
prog="gita", formatter_class=argparse.RawTextHelpFormatter, description=__doc__
|
|
)
|
|
subparsers = p.add_subparsers(
|
|
title="sub-commands", help="additional help with sub-command -h"
|
|
)
|
|
|
|
p.add_argument(
|
|
"-v", "--version", action="version", version=f"%(prog)s {get_version()}"
|
|
)
|
|
|
|
# bookkeeping sub-commands
|
|
p_add = subparsers.add_parser("add", description="add repo(s)", help="add repo(s)")
|
|
p_add.add_argument("paths", nargs="+", type=_path_name, help="repo(s) to add")
|
|
p_add.add_argument("-n", "--dry-run", action="store_true", help="dry run")
|
|
p_add.add_argument(
|
|
"-g",
|
|
"--group",
|
|
choices=utils.get_groups(),
|
|
help="add repo(s) to the specified group",
|
|
)
|
|
p_add.add_argument(
|
|
"-s", "--skip-submodule", action="store_true", help="skip submodule repo(s)"
|
|
)
|
|
xgroup = p_add.add_mutually_exclusive_group()
|
|
xgroup.add_argument(
|
|
"-r",
|
|
"--recursive",
|
|
action="store_true",
|
|
help="recursively add repo(s) in the given path(s).",
|
|
)
|
|
xgroup.add_argument(
|
|
"-a",
|
|
"--auto-group",
|
|
action="store_true",
|
|
help="recursively add repo(s) in the given path(s) "
|
|
"and create hierarchical groups based on folder structure.",
|
|
)
|
|
xgroup.add_argument("-b", "--bare", action="store_true", help="add bare repo(s)")
|
|
p_add.set_defaults(func=f_add)
|
|
|
|
p_rm = subparsers.add_parser(
|
|
"rm", description="remove repo(s)", help="remove repo(s)"
|
|
)
|
|
p_rm.add_argument(
|
|
"repo", nargs="+", choices=utils.get_repos(), help="remove the chosen repo(s)"
|
|
)
|
|
p_rm.set_defaults(func=f_rm)
|
|
|
|
p_freeze = subparsers.add_parser(
|
|
"freeze",
|
|
description="print all repo information",
|
|
help="print all repo information",
|
|
)
|
|
p_freeze.add_argument(
|
|
"-g",
|
|
"--group",
|
|
choices=utils.get_groups(),
|
|
help="freeze repos in the specified group",
|
|
)
|
|
p_freeze.set_defaults(func=f_freeze)
|
|
|
|
p_clone = subparsers.add_parser(
|
|
"clone", description="clone repos", help="clone repos"
|
|
)
|
|
p_clone.add_argument(
|
|
"clonee",
|
|
help="A URL or a config file.",
|
|
)
|
|
p_clone.add_argument(
|
|
"-C",
|
|
"--directory",
|
|
help="Change to DIRECTORY before doing anything.",
|
|
)
|
|
p_clone.add_argument(
|
|
"-p",
|
|
"--preserve-path",
|
|
dest="preserve_path",
|
|
action="store_true",
|
|
help="clone repo(s) in their original paths",
|
|
)
|
|
p_clone.add_argument(
|
|
"-n",
|
|
"--dry-run",
|
|
action="store_true",
|
|
help="If set, show command without execution",
|
|
)
|
|
xgroup = p_clone.add_mutually_exclusive_group()
|
|
xgroup.add_argument(
|
|
"-g",
|
|
"--group",
|
|
choices=utils.get_groups(),
|
|
help="If set, add repo to the specified group after cloning, otherwise add to gita without group.",
|
|
)
|
|
xgroup.add_argument(
|
|
"-f",
|
|
"--from-file",
|
|
action="store_true",
|
|
help="If set, clone repos in a config file rendered from `gita freeze`",
|
|
)
|
|
p_clone.set_defaults(func=f_clone)
|
|
|
|
p_rename = subparsers.add_parser(
|
|
"rename", description="rename a repo", help="rename a repo"
|
|
)
|
|
p_rename.add_argument(
|
|
"repo", nargs=1, choices=utils.get_repos(), help="rename the chosen repo"
|
|
)
|
|
p_rename.add_argument("new_name", help="new name")
|
|
p_rename.set_defaults(func=f_rename)
|
|
|
|
p_flags = subparsers.add_parser(
|
|
"flags",
|
|
description="Set custom git flags for repo.",
|
|
help="git flags configuration",
|
|
)
|
|
p_flags.set_defaults(func=f_flags)
|
|
flags_cmds = p_flags.add_subparsers(
|
|
dest="flags_cmd", help="additional help with sub-command -h"
|
|
)
|
|
flags_cmds.add_parser("ll", description="display repos with custom flags")
|
|
pf_set = flags_cmds.add_parser("set", description="Set flags for repo.")
|
|
pf_set.add_argument("repo", choices=utils.get_repos(), help="repo name")
|
|
pf_set.add_argument(
|
|
"flags", nargs=argparse.REMAINDER, help="custom flags, use quotes"
|
|
)
|
|
|
|
p_color = subparsers.add_parser(
|
|
"color",
|
|
description="display and modify branch coloring of the ll sub-command.",
|
|
help="color configuration",
|
|
)
|
|
p_color.set_defaults(func=f_color)
|
|
color_cmds = p_color.add_subparsers(
|
|
dest="color_cmd", help="additional help with sub-command -h"
|
|
)
|
|
color_cmds.add_parser(
|
|
"ll",
|
|
description="display available colors and the current branch coloring in the ll sub-command",
|
|
)
|
|
color_cmds.add_parser("reset", description="reset color scheme.")
|
|
pc_set = color_cmds.add_parser(
|
|
"set", description="Set color for local/remote situation."
|
|
)
|
|
pc_set.add_argument(
|
|
"situation",
|
|
choices=info.get_color_encoding(),
|
|
help="5 possible local/remote situations",
|
|
)
|
|
pc_set.add_argument(
|
|
"color", choices=[c.name for c in info.Color], help="available colors"
|
|
)
|
|
|
|
p_info = subparsers.add_parser(
|
|
"info",
|
|
description="list, add, or remove information items of the ll sub-command.",
|
|
help="information setting",
|
|
)
|
|
p_info.set_defaults(func=f_info)
|
|
info_cmds = p_info.add_subparsers(
|
|
dest="info_cmd", help="additional help with sub-command -h"
|
|
)
|
|
info_cmds.add_parser(
|
|
"ll", description="show used and unused information items of the ll sub-command"
|
|
)
|
|
info_cmds.add_parser("add", description="Enable information item.").add_argument(
|
|
"info_item", choices=info.ALL_INFO_ITEMS, help="information item to add"
|
|
)
|
|
info_cmds.add_parser("rm", description="Disable information item.").add_argument(
|
|
"info_item", choices=info.ALL_INFO_ITEMS, help="information item to delete"
|
|
)
|
|
info_cmds.add_parser(
|
|
"set-length",
|
|
description="Set default column widths for information items. "
|
|
"The settings are in layout.csv",
|
|
)
|
|
|
|
ll_doc = f""" status symbols:
|
|
+: staged changes
|
|
*: unstaged changes
|
|
?: untracked files/folders
|
|
$: stashed changes
|
|
|
|
branch colors:
|
|
{info.Color.white}white{info.Color.end}: local has no remote
|
|
{info.Color.green}green{info.Color.end}: local is the same as remote
|
|
{info.Color.red}red{info.Color.end}: local has diverged from remote
|
|
{info.Color.purple}purple{info.Color.end}: local is ahead of remote (good for push)
|
|
{info.Color.yellow}yellow{info.Color.end}: local is behind remote (good for merge)"""
|
|
p_ll = subparsers.add_parser(
|
|
"ll",
|
|
help="display summary of all repos",
|
|
formatter_class=argparse.RawTextHelpFormatter,
|
|
description=ll_doc,
|
|
)
|
|
p_ll.add_argument(
|
|
"group",
|
|
nargs="?",
|
|
choices=utils.get_groups(),
|
|
help="show repos in the chosen group",
|
|
)
|
|
p_ll.add_argument(
|
|
"-C",
|
|
"--no-colors",
|
|
action="store_true",
|
|
help="Disable coloring on the branch names.",
|
|
)
|
|
p_ll.add_argument("-g", action="store_true", help="Show repo summaries by group.")
|
|
p_ll.set_defaults(func=f_ll)
|
|
|
|
p_context = subparsers.add_parser(
|
|
"context",
|
|
help="set context",
|
|
description="Set and remove context. A context is a group."
|
|
" When set, all operations apply only to repos in that group.",
|
|
)
|
|
p_context.add_argument(
|
|
"choice",
|
|
nargs="?",
|
|
choices=set().union(utils.get_groups(), {"none", "auto"}),
|
|
help="Without this argument, show current context. "
|
|
"Otherwise choose a group as context, or use 'auto', "
|
|
"which sets the context/group automatically based on "
|
|
"the current working directory. "
|
|
"To remove context, use 'none'. ",
|
|
)
|
|
p_context.set_defaults(func=f_context)
|
|
|
|
p_ls = subparsers.add_parser(
|
|
"ls",
|
|
help="show repo(s) or repo path",
|
|
description="display names of all repos, or path of a chosen repo",
|
|
)
|
|
p_ls.add_argument(
|
|
"repo",
|
|
nargs="?",
|
|
choices=utils.get_repos(),
|
|
help="show path of the chosen repo",
|
|
)
|
|
p_ls.set_defaults(func=f_ls)
|
|
|
|
p_group = subparsers.add_parser(
|
|
"group", description="list, add, or remove repo group(s)", help="group repos"
|
|
)
|
|
p_group.set_defaults(func=f_group)
|
|
group_cmds = p_group.add_subparsers(
|
|
dest="group_cmd", help="additional help with sub-command -h"
|
|
)
|
|
pg_ll = group_cmds.add_parser("ll", description="List all groups with repos.")
|
|
pg_ll.add_argument(
|
|
"to_show", nargs="?", choices=utils.get_groups(), help="group to show"
|
|
)
|
|
group_cmds.add_parser("ls", description="List all group names.")
|
|
pg_add = group_cmds.add_parser("add", description="Add repo(s) to a group.")
|
|
pg_add.add_argument(
|
|
"to_group",
|
|
nargs="+",
|
|
metavar="repo",
|
|
choices=utils.get_repos(),
|
|
help="repo(s) to be grouped",
|
|
)
|
|
pg_add.add_argument(
|
|
"-n",
|
|
"--name",
|
|
dest="gname",
|
|
type=partial(_group_name, exclude_old_names=False),
|
|
metavar="group-name",
|
|
required=True,
|
|
)
|
|
pg_add.add_argument(
|
|
"-p", "--path", dest="gpath", type=_path_name, metavar="group-path"
|
|
)
|
|
|
|
pg_rmrepo = group_cmds.add_parser(
|
|
"rmrepo", description="remove repo(s) from a group."
|
|
)
|
|
pg_rmrepo.add_argument(
|
|
"to_rm",
|
|
nargs="+",
|
|
metavar="repo",
|
|
choices=utils.get_repos(),
|
|
help="repo(s) to be removed from the group",
|
|
)
|
|
pg_rmrepo.add_argument(
|
|
"-n",
|
|
"--name",
|
|
dest="gname",
|
|
metavar="group-name",
|
|
required=True,
|
|
help="group name",
|
|
)
|
|
pg_rename = group_cmds.add_parser("rename", description="Change group name.")
|
|
pg_rename.add_argument(
|
|
"gname",
|
|
metavar="group-name",
|
|
choices=utils.get_groups(),
|
|
help="existing group to rename",
|
|
)
|
|
pg_rename.add_argument(
|
|
"new_name", metavar="new-name", type=_group_name, help="new group name"
|
|
)
|
|
group_cmds.add_parser("rm", description="Remove group(s).").add_argument(
|
|
"to_ungroup", nargs="+", choices=utils.get_groups(), help="group(s) to delete"
|
|
)
|
|
|
|
# superman mode
|
|
p_super = subparsers.add_parser(
|
|
"super",
|
|
help="run any git command/alias",
|
|
description="Superman mode: delegate any git command/alias in specified repo(s), group(s), or "
|
|
"all repo(s).\n"
|
|
'Examples:\n \t gita super myrepo1 commit -am "fix a bug"\n'
|
|
"\t gita super repo1 repo2 repo3 checkout new-feature",
|
|
)
|
|
p_super.add_argument(
|
|
"man",
|
|
nargs=argparse.REMAINDER,
|
|
help="execute arbitrary git command/alias for specified repo(s), group(s), or all repos.\n"
|
|
"Example: gita super repo1 diff --name-only --staged;\n"
|
|
"gita super checkout master ",
|
|
)
|
|
p_super.add_argument(
|
|
"-q", "--quote-mode", action="store_true", help="use quote mode"
|
|
)
|
|
p_super.set_defaults(func=f_super)
|
|
|
|
# shell mode
|
|
p_shell = subparsers.add_parser(
|
|
"shell",
|
|
help="run any shell command",
|
|
description="shell mode: delegate any shell command in specified repo(s), group(s), or "
|
|
"all repo(s).\n"
|
|
"Examples:\n \t gita shell pwd; \n"
|
|
"\t gita shell repo1 repo2 repo3 touch xx",
|
|
)
|
|
p_shell.add_argument(
|
|
"man",
|
|
nargs=argparse.REMAINDER,
|
|
help="execute arbitrary shell command for specified repo(s), group(s), or all repos.\n"
|
|
"Example: gita shell myrepo1 ls\n"
|
|
"Another: gita shell git checkout master ",
|
|
)
|
|
p_shell.add_argument(
|
|
"-q", "--quote-mode", action="store_true", help="use quote mode"
|
|
)
|
|
p_shell.set_defaults(func=f_shell)
|
|
|
|
# clear
|
|
p_clear = subparsers.add_parser(
|
|
"clear",
|
|
description="removes all groups and repositories",
|
|
help="removes all groups and repositories",
|
|
)
|
|
p_clear.set_defaults(func=f_clear)
|
|
|
|
# sub-commands that fit boilerplate
|
|
cmds = utils.get_cmds_from_files()
|
|
for name, data in cmds.items():
|
|
help = data.get("help")
|
|
repo_help = help
|
|
cmd = data["cmd"]
|
|
if data.get("allow_all"):
|
|
choices = utils.get_choices()
|
|
nargs = "*"
|
|
repo_help += " for all repos or"
|
|
else:
|
|
choices = utils.get_repos().keys() | utils.get_groups().keys()
|
|
nargs = "+"
|
|
repo_help += " for the chosen repo(s) or group(s)"
|
|
sp = subparsers.add_parser(name, description=help, help=help)
|
|
sp.add_argument("repo", nargs=nargs, choices=choices, help=repo_help)
|
|
is_shell = bool(data.get("shell"))
|
|
sp.add_argument(
|
|
"-s",
|
|
"--shell",
|
|
default=is_shell,
|
|
type=bool,
|
|
help="If set, run in shell mode",
|
|
)
|
|
if is_shell:
|
|
cmd = [cmd]
|
|
else:
|
|
cmd = cmd.split()
|
|
sp.set_defaults(func=f_git_cmd, cmd=cmd)
|
|
|
|
argcomplete.autocomplete(p)
|
|
args = p.parse_args(argv)
|
|
|
|
args.async_blacklist = {
|
|
name for name, data in cmds.items() if data.get("disable_async")
|
|
}
|
|
|
|
if "func" in args:
|
|
args.func(args)
|
|
else:
|
|
p.print_help() # pragma: no cover
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() # pragma: no cover
|