Adding upstream version 0.16.3.
Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
parent
149ef1ff29
commit
7cdc86fc2c
9 changed files with 371 additions and 233 deletions
20
.github/workflows/nos.yml
vendored
20
.github/workflows/nos.yml
vendored
|
@ -4,24 +4,26 @@ on: [push, pull_request]
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
build:
|
build:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-20.04
|
||||||
strategy:
|
strategy:
|
||||||
matrix:
|
matrix:
|
||||||
os: [ubuntu-latest, macos-latest, windows-latest]
|
os: [ubuntu-20.04, macos-latest, windows-latest]
|
||||||
python-version: [3.6, 3.7, 3.8, 3.9]
|
python-version: [3.6, 3.7, 3.8, 3.9, "3.10", "3.11"]
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v2
|
- uses: actions/checkout@v3
|
||||||
- name: Set up Python ${{ matrix.python-version }}
|
- name: Set up Python ${{ matrix.python-version }}
|
||||||
uses: actions/setup-python@v2
|
uses: actions/setup-python@v4
|
||||||
with:
|
with:
|
||||||
python-version: ${{ matrix.python-version }}
|
python-version: ${{ matrix.python-version }}
|
||||||
- name: Install dependences
|
- name: Install dependences
|
||||||
run: |
|
run: |
|
||||||
python -m pip install --upgrade pip
|
python -m pip install --upgrade pip wheel
|
||||||
pip install -r requirements.txt
|
pip install -r requirements.txt
|
||||||
pip install .
|
pip install -e .
|
||||||
- name: Pytest
|
- name: Pytest
|
||||||
run: |
|
run: |
|
||||||
pytest tests --cov=./gita
|
pytest tests --cov=./gita --cov-report=xml
|
||||||
- name: Upload coverage to Codecov
|
- name: Upload coverage to Codecov
|
||||||
uses: codecov/codecov-action@v2
|
uses: codecov/codecov-action@v3
|
||||||
|
with:
|
||||||
|
token: ${{ secrets.CODECOV_TOKEN }}
|
||||||
|
|
|
@ -128,8 +128,8 @@ If more than one repos are specified, the `git` command runs asynchronously,
|
||||||
with the exception of `log`, `difftool` and `mergetool`,
|
with the exception of `log`, `difftool` and `mergetool`,
|
||||||
which require non-trivial user input.
|
which require non-trivial user input.
|
||||||
|
|
||||||
Repo configuration is saved in `$XDG_CONFIG_HOME/gita/repos.csv`
|
Repo configuration global is saved in `$XDG_CONFIG_HOME/gita/repos.csv`
|
||||||
(most likely `~/.config/gita/repos.csv`).
|
(most likely `~/.config/gita/repos.csv`) or if you prefered at project configuration add environment variable `GITA_PROJECT_HOME`.
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
|
|
|
@ -48,10 +48,10 @@ def _group_name(name: str, exclude_old_names=True) -> str:
|
||||||
|
|
||||||
def _path_name(name: str) -> str:
|
def _path_name(name: str) -> str:
|
||||||
"""
|
"""
|
||||||
Return absolute path without trailing /
|
Return absolute path
|
||||||
"""
|
"""
|
||||||
if name:
|
if name:
|
||||||
return os.path.abspath(name).rstrip(os.path.sep)
|
return os.path.abspath(name)
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ import os
|
||||||
|
|
||||||
|
|
||||||
def get_config_dir() -> str:
|
def get_config_dir() -> str:
|
||||||
root = os.environ.get('XDG_CONFIG_HOME') or os.path.join(
|
root = os.environ.get('GITA_PROJECT_HOME') or os.environ.get('XDG_CONFIG_HOME') or os.path.join(
|
||||||
os.path.expanduser('~'), '.config')
|
os.path.expanduser('~'), '.config')
|
||||||
return os.path.join(root, "gita")
|
return os.path.join(root, "gita")
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,7 @@ from typing import Tuple, List, Callable, Dict
|
||||||
from . import common
|
from . import common
|
||||||
|
|
||||||
|
|
||||||
class Color(str, Enum):
|
class Color(Enum):
|
||||||
"""
|
"""
|
||||||
Terminal color
|
Terminal color
|
||||||
"""
|
"""
|
||||||
|
@ -32,6 +32,12 @@ class Color(str, Enum):
|
||||||
b_white = '\x1b[37;1m'
|
b_white = '\x1b[37;1m'
|
||||||
underline = '\x1B[4m'
|
underline = '\x1B[4m'
|
||||||
|
|
||||||
|
# Make f"{Color.foo}" expand to Color.foo.value .
|
||||||
|
#
|
||||||
|
# See https://stackoverflow.com/a/24487545
|
||||||
|
def __str__(self):
|
||||||
|
return f"{self.value}"
|
||||||
|
|
||||||
|
|
||||||
default_colors = {
|
default_colors = {
|
||||||
'no-remote': Color.white.name,
|
'no-remote': Color.white.name,
|
||||||
|
|
262
gita/utils.py
262
gita/utils.py
|
@ -9,6 +9,8 @@ from functools import lru_cache, partial
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List, Dict, Coroutine, Union, Iterator, Tuple
|
from typing import List, Dict, Coroutine, Union, Iterator, Tuple
|
||||||
from collections import Counter, defaultdict
|
from collections import Counter, defaultdict
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
import multiprocessing
|
||||||
|
|
||||||
from . import info
|
from . import info
|
||||||
from . import common
|
from . import common
|
||||||
|
@ -17,24 +19,25 @@ from . import common
|
||||||
MAX_INT = sys.maxsize
|
MAX_INT = sys.maxsize
|
||||||
|
|
||||||
|
|
||||||
def get_relative_path(kid: str, parent: str) -> Union[List[str], None]:
|
def get_relative_path(kid: os.PathLike, parent: str) -> Union[List[str], None]:
|
||||||
"""
|
"""
|
||||||
Return the relative path depth if relative, otherwise MAX_INT.
|
Return the relative path depth if relative, otherwise None.
|
||||||
|
|
||||||
Both the `kid` and `parent` should be absolute paths without trailing /
|
Both the `kid` and `parent` should be absolute paths
|
||||||
"""
|
"""
|
||||||
# Note that os.path.commonpath has no trailing /
|
if parent == "":
|
||||||
# TODO: python3.9 pathlib has is_relative_to() function
|
|
||||||
# TODO: Maybe use os.path.commonprefix? since it's faster?
|
|
||||||
if parent == '':
|
|
||||||
return None
|
return None
|
||||||
if parent == os.path.commonpath((kid, parent)):
|
|
||||||
rel = os.path.normpath(os.path.relpath(kid, parent)).split(os.sep)
|
p_kid = Path(kid)
|
||||||
if rel == ['.']:
|
# p_kid = Path(kid).resolve()
|
||||||
rel = []
|
try:
|
||||||
return rel
|
p_rel = p_kid.relative_to(parent)
|
||||||
else:
|
except ValueError:
|
||||||
return None
|
return None
|
||||||
|
rel = str(p_rel).split(os.sep)
|
||||||
|
if rel == ["."]:
|
||||||
|
rel = []
|
||||||
|
return rel
|
||||||
|
|
||||||
|
|
||||||
@lru_cache()
|
@lru_cache()
|
||||||
|
@ -43,16 +46,22 @@ def get_repos() -> Dict[str, Dict[str, str]]:
|
||||||
Return a `dict` of repo name to repo absolute path and repo type
|
Return a `dict` of repo name to repo absolute path and repo type
|
||||||
|
|
||||||
"""
|
"""
|
||||||
path_file = common.get_config_fname('repos.csv')
|
path_file = common.get_config_fname("repos.csv")
|
||||||
repos = {}
|
repos = {}
|
||||||
if os.path.isfile(path_file) and os.stat(path_file).st_size > 0:
|
if os.path.isfile(path_file) and os.stat(path_file).st_size > 0:
|
||||||
with open(path_file) as f:
|
with open(path_file) as f:
|
||||||
rows = csv.DictReader(f, ['path', 'name', 'type', 'flags'],
|
rows = csv.DictReader(
|
||||||
restval='') # it's actually a reader
|
f, ["path", "name", "type", "flags"], restval=""
|
||||||
repos = {r['name']:
|
) # it's actually a reader
|
||||||
{'path': r['path'], 'type': r['type'],
|
repos = {
|
||||||
'flags': r['flags'].split()}
|
r["name"]: {
|
||||||
for r in rows if is_git(r['path'], include_bare=True)}
|
"path": r["path"],
|
||||||
|
"type": r["type"],
|
||||||
|
"flags": r["flags"].split(),
|
||||||
|
}
|
||||||
|
for r in rows
|
||||||
|
if is_git(r["path"], include_bare=True)
|
||||||
|
}
|
||||||
return repos
|
return repos
|
||||||
|
|
||||||
|
|
||||||
|
@ -65,20 +74,19 @@ def get_context() -> Union[Path, None]:
|
||||||
|
|
||||||
"""
|
"""
|
||||||
config_dir = Path(common.get_config_dir())
|
config_dir = Path(common.get_config_dir())
|
||||||
matches = list(config_dir.glob('*.context'))
|
matches = list(config_dir.glob("*.context"))
|
||||||
if len(matches) > 1:
|
if len(matches) > 1:
|
||||||
print("Cannot have multiple .context file")
|
print("Cannot have multiple .context file")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
if not matches:
|
if not matches:
|
||||||
return None
|
return None
|
||||||
ctx = matches[0]
|
ctx = matches[0]
|
||||||
if ctx.stem == 'auto':
|
if ctx.stem == "auto":
|
||||||
cwd = str(Path.cwd())
|
|
||||||
# The context is set to be the group with minimal distance to cwd
|
# The context is set to be the group with minimal distance to cwd
|
||||||
candidate = None
|
candidate = None
|
||||||
min_dist = MAX_INT
|
min_dist = MAX_INT
|
||||||
for gname, prop in get_groups().items():
|
for gname, prop in get_groups().items():
|
||||||
rel = get_relative_path(cwd, prop['path'])
|
rel = get_relative_path(Path.cwd(), prop["path"])
|
||||||
if rel is None:
|
if rel is None:
|
||||||
continue
|
continue
|
||||||
d = len(rel)
|
d = len(rel)
|
||||||
|
@ -88,7 +96,7 @@ def get_context() -> Union[Path, None]:
|
||||||
if not candidate:
|
if not candidate:
|
||||||
ctx = None
|
ctx = None
|
||||||
else:
|
else:
|
||||||
ctx = ctx.with_name(f'{candidate}.context')
|
ctx = ctx.with_name(f"{candidate}.context")
|
||||||
return ctx
|
return ctx
|
||||||
|
|
||||||
|
|
||||||
|
@ -98,19 +106,23 @@ def get_groups() -> Dict[str, Dict[str, Union[str, List]]]:
|
||||||
Return a `dict` of group name to group properties such as repo names and
|
Return a `dict` of group name to group properties such as repo names and
|
||||||
group path.
|
group path.
|
||||||
"""
|
"""
|
||||||
fname = common.get_config_fname('groups.csv')
|
fname = common.get_config_fname("groups.csv")
|
||||||
groups = {}
|
groups = {}
|
||||||
|
repos = get_repos()
|
||||||
# Each line is: group-name:repo1 repo2 repo3:group-path
|
# Each line is: group-name:repo1 repo2 repo3:group-path
|
||||||
if os.path.isfile(fname) and os.stat(fname).st_size > 0:
|
if os.path.isfile(fname) and os.stat(fname).st_size > 0:
|
||||||
with open(fname, 'r') as f:
|
with open(fname, "r") as f:
|
||||||
rows = csv.DictReader(f, ['name', 'repos', 'path'],
|
rows = csv.DictReader(
|
||||||
restval='', delimiter=':')
|
f, ["name", "repos", "path"], restval="", delimiter=":"
|
||||||
|
)
|
||||||
|
# filter out invalid repos
|
||||||
groups = {
|
groups = {
|
||||||
r['name']: {
|
r["name"]: {
|
||||||
'repos': r['repos'].split(),
|
"repos": [repo for repo in r["repos"].split() if repo in repos],
|
||||||
'path': r['path']
|
"path": r["path"],
|
||||||
}
|
}
|
||||||
for r in rows}
|
for r in rows
|
||||||
|
}
|
||||||
return groups
|
return groups
|
||||||
|
|
||||||
|
|
||||||
|
@ -121,7 +133,7 @@ def delete_repo_from_groups(repo: str, groups: Dict[str, Dict]) -> bool:
|
||||||
deleted = False
|
deleted = False
|
||||||
for name in groups:
|
for name in groups:
|
||||||
try:
|
try:
|
||||||
groups[name]['repos'].remove(repo)
|
groups[name]["repos"].remove(repo)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
|
@ -130,20 +142,18 @@ def delete_repo_from_groups(repo: str, groups: Dict[str, Dict]) -> bool:
|
||||||
|
|
||||||
|
|
||||||
def replace_context(old: Union[Path, None], new: str):
|
def replace_context(old: Union[Path, None], new: str):
|
||||||
"""
|
""" """
|
||||||
|
auto = Path(common.get_config_dir()) / "auto.context"
|
||||||
"""
|
|
||||||
auto = Path(common.get_config_dir()) / 'auto.context'
|
|
||||||
if auto.exists():
|
if auto.exists():
|
||||||
old = auto
|
old = auto
|
||||||
|
|
||||||
if new == 'none': # delete
|
if new == "none": # delete
|
||||||
old and old.unlink()
|
old and old.unlink()
|
||||||
elif old:
|
elif old:
|
||||||
# ctx.rename(ctx.with_stem(new_name)) # only works in py3.9
|
# ctx.rename(ctx.with_stem(new_name)) # only works in py3.9
|
||||||
old.rename(old.with_name(f'{new}.context'))
|
old.rename(old.with_name(f"{new}.context"))
|
||||||
else:
|
else:
|
||||||
Path(auto.with_name(f'{new}.context')).write_text('')
|
Path(auto.with_name(f"{new}.context")).write_text("")
|
||||||
|
|
||||||
|
|
||||||
def get_choices() -> List[Union[str, None]]:
|
def get_choices() -> List[Union[str, None]]:
|
||||||
|
@ -162,10 +172,8 @@ def get_choices() -> List[Union[str, None]]:
|
||||||
|
|
||||||
|
|
||||||
def is_submodule_repo(p: Path) -> bool:
|
def is_submodule_repo(p: Path) -> bool:
|
||||||
"""
|
""" """
|
||||||
|
if p.is_file() and ".git/modules" in p.read_text():
|
||||||
"""
|
|
||||||
if p.is_file() and '.git/modules' in p.read_text():
|
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -183,7 +191,7 @@ def is_git(path: str, include_bare=False, exclude_submodule=False) -> bool:
|
||||||
# A more reliable way to differentiable regular and worktree repos is to
|
# A more reliable way to differentiable regular and worktree repos is to
|
||||||
# compare the result of `git rev-parse --git-dir` and
|
# compare the result of `git rev-parse --git-dir` and
|
||||||
# `git rev-parse --git-common-dir`
|
# `git rev-parse --git-common-dir`
|
||||||
loc = os.path.join(path, '.git')
|
loc = os.path.join(path, ".git")
|
||||||
# TODO: we can display the worktree repos in a different font.
|
# TODO: we can display the worktree repos in a different font.
|
||||||
if os.path.exists(loc):
|
if os.path.exists(loc):
|
||||||
if exclude_submodule and is_submodule_repo(Path(loc)):
|
if exclude_submodule and is_submodule_repo(Path(loc)):
|
||||||
|
@ -192,11 +200,13 @@ def is_git(path: str, include_bare=False, exclude_submodule=False) -> bool:
|
||||||
if not include_bare:
|
if not include_bare:
|
||||||
return False
|
return False
|
||||||
# detect bare repo
|
# detect bare repo
|
||||||
got = subprocess.run('git rev-parse --is-bare-repository'.split(),
|
got = subprocess.run(
|
||||||
stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
|
"git rev-parse --is-bare-repository".split(),
|
||||||
cwd=path
|
stdout=subprocess.PIPE,
|
||||||
)
|
stderr=subprocess.DEVNULL,
|
||||||
if got.returncode == 0 and got.stdout == b'true\n':
|
cwd=path,
|
||||||
|
)
|
||||||
|
if got.returncode == 0 and got.stdout == b"true\n":
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -211,16 +221,16 @@ def rename_repo(repos: Dict[str, Dict[str, str]], repo: str, new_name: str):
|
||||||
prop = repos[repo]
|
prop = repos[repo]
|
||||||
del repos[repo]
|
del repos[repo]
|
||||||
repos[new_name] = prop
|
repos[new_name] = prop
|
||||||
write_to_repo_file(repos, 'w')
|
write_to_repo_file(repos, "w")
|
||||||
|
|
||||||
groups = get_groups()
|
groups = get_groups()
|
||||||
for g, values in groups.items():
|
for g, values in groups.items():
|
||||||
members = values['repos']
|
members = values["repos"]
|
||||||
if repo in members:
|
if repo in members:
|
||||||
members.remove(repo)
|
members.remove(repo)
|
||||||
members.append(new_name)
|
members.append(new_name)
|
||||||
groups[g]['repos'] = sorted(members)
|
groups[g]["repos"] = sorted(members)
|
||||||
write_to_groups_file(groups, 'w')
|
write_to_groups_file(groups, "w")
|
||||||
|
|
||||||
|
|
||||||
def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str):
|
def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str):
|
||||||
|
@ -228,40 +238,43 @@ def write_to_repo_file(repos: Dict[str, Dict[str, str]], mode: str):
|
||||||
@param repos: each repo is {name: {properties}}
|
@param repos: each repo is {name: {properties}}
|
||||||
"""
|
"""
|
||||||
# The 3rd column is repo type; unused field
|
# The 3rd column is repo type; unused field
|
||||||
data = [(prop['path'], name, '', ' '.join(prop['flags']))
|
data = [
|
||||||
for name, prop in repos.items()]
|
(prop["path"], name, "", " ".join(prop["flags"]))
|
||||||
fname = common.get_config_fname('repos.csv')
|
for name, prop in repos.items()
|
||||||
|
]
|
||||||
|
fname = common.get_config_fname("repos.csv")
|
||||||
os.makedirs(os.path.dirname(fname), exist_ok=True)
|
os.makedirs(os.path.dirname(fname), exist_ok=True)
|
||||||
with open(fname, mode, newline='') as f:
|
with open(fname, mode, newline="") as f:
|
||||||
writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
|
writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
|
||||||
writer.writerows(data)
|
writer.writerows(data)
|
||||||
|
|
||||||
|
|
||||||
# TODO: combine with the repo writer
|
# TODO: combine with the repo writer
|
||||||
def write_to_groups_file(groups: Dict[str, Dict], mode: str):
|
def write_to_groups_file(groups: Dict[str, Dict], mode: str):
|
||||||
"""
|
""" """
|
||||||
|
fname = common.get_config_fname("groups.csv")
|
||||||
"""
|
|
||||||
fname = common.get_config_fname('groups.csv')
|
|
||||||
os.makedirs(os.path.dirname(fname), exist_ok=True)
|
os.makedirs(os.path.dirname(fname), exist_ok=True)
|
||||||
if not groups: # all groups are deleted
|
if not groups: # all groups are deleted
|
||||||
Path(fname).write_text('')
|
Path(fname).write_text("")
|
||||||
else:
|
else:
|
||||||
# delete the group if there are no repos
|
# delete the group if there are no repos
|
||||||
for name in list(groups):
|
for name in list(groups):
|
||||||
if not groups[name]['repos']:
|
if not groups[name]["repos"]:
|
||||||
del groups[name]
|
del groups[name]
|
||||||
with open(fname, mode, newline='') as f:
|
with open(fname, mode, newline="") as f:
|
||||||
data = [
|
data = [
|
||||||
(group, ' '.join(prop['repos']), prop['path'])
|
(group, " ".join(prop["repos"]), prop["path"])
|
||||||
for group, prop in groups.items()
|
for group, prop in groups.items()
|
||||||
]
|
]
|
||||||
writer = csv.writer(f, delimiter=':', quotechar='"', quoting=csv.QUOTE_MINIMAL)
|
writer = csv.writer(
|
||||||
|
f, delimiter=":", quotechar='"', quoting=csv.QUOTE_MINIMAL
|
||||||
|
)
|
||||||
writer.writerows(data)
|
writer.writerows(data)
|
||||||
|
|
||||||
|
|
||||||
def _make_name(path: str, repos: Dict[str, Dict[str, str]],
|
def _make_name(
|
||||||
name_counts: Counter) -> str:
|
path: str, repos: Dict[str, Dict[str, str]], name_counts: Counter
|
||||||
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Given a new repo `path`, create a repo name. By default, basename is used.
|
Given a new repo `path`, create a repo name. By default, basename is used.
|
||||||
If name collision exists, further include parent path name.
|
If name collision exists, further include parent path name.
|
||||||
|
@ -276,17 +289,19 @@ def _make_name(path: str, repos: Dict[str, Dict[str, str]],
|
||||||
return name
|
return name
|
||||||
|
|
||||||
|
|
||||||
def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str],
|
def add_repos(
|
||||||
include_bare=False,
|
repos: Dict[str, Dict[str, str]],
|
||||||
exclude_submodule=False,
|
new_paths: List[str],
|
||||||
dry_run=False,
|
include_bare=False,
|
||||||
) -> Dict[str, Dict[str, str]]:
|
exclude_submodule=False,
|
||||||
|
dry_run=False,
|
||||||
|
) -> Dict[str, Dict[str, str]]:
|
||||||
"""
|
"""
|
||||||
Write new repo paths to file; return the added repos.
|
Write new repo paths to file; return the added repos.
|
||||||
|
|
||||||
@param repos: name -> path
|
@param repos: name -> path
|
||||||
"""
|
"""
|
||||||
existing_paths = {prop['path'] for prop in repos.values()}
|
existing_paths = {prop["path"] for prop in repos.values()}
|
||||||
new_paths = {p for p in new_paths if is_git(p, include_bare, exclude_submodule)}
|
new_paths = {p for p in new_paths if is_git(p, include_bare, exclude_submodule)}
|
||||||
new_paths = new_paths - existing_paths
|
new_paths = new_paths - existing_paths
|
||||||
new_repos = {}
|
new_repos = {}
|
||||||
|
@ -296,21 +311,21 @@ def add_repos(repos: Dict[str, Dict[str, str]], new_paths: List[str],
|
||||||
for p in new_paths:
|
for p in new_paths:
|
||||||
print(p)
|
print(p)
|
||||||
return {}
|
return {}
|
||||||
name_counts = Counter(
|
name_counts = Counter(os.path.basename(os.path.normpath(p)) for p in new_paths)
|
||||||
os.path.basename(os.path.normpath(p)) for p in new_paths
|
new_repos = {
|
||||||
)
|
_make_name(path, repos, name_counts): {
|
||||||
new_repos = {_make_name(path, repos, name_counts): {
|
"path": path,
|
||||||
'path': path,
|
"flags": "",
|
||||||
'flags': '',
|
}
|
||||||
} for path in new_paths}
|
for path in new_paths
|
||||||
write_to_repo_file(new_repos, 'a+')
|
}
|
||||||
|
write_to_repo_file(new_repos, "a+")
|
||||||
else:
|
else:
|
||||||
print('No new repos found!')
|
print("No new repos found!")
|
||||||
return new_repos
|
return new_repos
|
||||||
|
|
||||||
|
|
||||||
def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[
|
def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[Tuple[str, ...], str]:
|
||||||
Tuple[str, ...], str]:
|
|
||||||
"""
|
"""
|
||||||
Return relative parent strings, and the parent head string
|
Return relative parent strings, and the parent head string
|
||||||
|
|
||||||
|
@ -322,13 +337,12 @@ def _generate_dir_hash(repo_path: str, paths: List[str]) -> Tuple[
|
||||||
if rel is not None:
|
if rel is not None:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
return (), ''
|
return (), ""
|
||||||
head, tail = os.path.split(p)
|
head, tail = os.path.split(p)
|
||||||
return (tail, *rel), head
|
return (tail, *rel), head
|
||||||
|
|
||||||
|
|
||||||
def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]
|
def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]) -> Dict[str, Dict]:
|
||||||
) -> Dict[str, Dict]:
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@params repos: repos to be grouped
|
@params repos: repos to be grouped
|
||||||
|
@ -337,17 +351,17 @@ def auto_group(repos: Dict[str, Dict[str, str]], paths: List[str]
|
||||||
# i.e., each repo should be contained in one and only one path
|
# i.e., each repo should be contained in one and only one path
|
||||||
new_groups = defaultdict(dict)
|
new_groups = defaultdict(dict)
|
||||||
for repo_name, prop in repos.items():
|
for repo_name, prop in repos.items():
|
||||||
hash, head = _generate_dir_hash(prop['path'], paths)
|
hash, head = _generate_dir_hash(prop["path"], paths)
|
||||||
if not hash:
|
if not hash:
|
||||||
continue
|
continue
|
||||||
for i in range(1, len(hash)+1):
|
for i in range(1, len(hash) + 1):
|
||||||
group_name = '-'.join(hash[:i])
|
group_name = "-".join(hash[:i])
|
||||||
prop = new_groups[group_name]
|
prop = new_groups[group_name]
|
||||||
prop['path'] = os.path.join(head, *hash[:i])
|
prop["path"] = os.path.join(head, *hash[:i])
|
||||||
if 'repos' not in prop:
|
if "repos" not in prop:
|
||||||
prop['repos'] = [repo_name]
|
prop["repos"] = [repo_name]
|
||||||
else:
|
else:
|
||||||
prop['repos'].append(repo_name)
|
prop["repos"].append(repo_name)
|
||||||
# FIXME: need to make sure the new group names don't clash with old ones
|
# FIXME: need to make sure the new group names don't clash with old ones
|
||||||
# or repo names
|
# or repo names
|
||||||
return new_groups
|
return new_groups
|
||||||
|
@ -359,7 +373,7 @@ def parse_clone_config(fname: str) -> Iterator[List[str]]:
|
||||||
"""
|
"""
|
||||||
with open(fname) as f:
|
with open(fname) as f:
|
||||||
for line in f:
|
for line in f:
|
||||||
yield line.strip().split(',')
|
yield line.strip().split(",")
|
||||||
|
|
||||||
|
|
||||||
async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, str]:
|
async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, str]:
|
||||||
|
@ -374,7 +388,8 @@ async def run_async(repo_name: str, path: str, cmds: List[str]) -> Union[None, s
|
||||||
stdout=asyncio.subprocess.PIPE,
|
stdout=asyncio.subprocess.PIPE,
|
||||||
stderr=asyncio.subprocess.PIPE,
|
stderr=asyncio.subprocess.PIPE,
|
||||||
start_new_session=True,
|
start_new_session=True,
|
||||||
cwd=path)
|
cwd=path,
|
||||||
|
)
|
||||||
stdout, stderr = await process.communicate()
|
stdout, stderr = await process.communicate()
|
||||||
for pipe in (stdout, stderr):
|
for pipe in (stdout, stderr):
|
||||||
if pipe:
|
if pipe:
|
||||||
|
@ -389,7 +404,7 @@ def format_output(s: str, prefix: str):
|
||||||
"""
|
"""
|
||||||
Prepends every line in given string with the given prefix.
|
Prepends every line in given string with the given prefix.
|
||||||
"""
|
"""
|
||||||
return ''.join([f'{prefix}: {line}' for line in s.splitlines(keepends=True)])
|
return "".join([f"{prefix}: {line}" for line in s.splitlines(keepends=True)])
|
||||||
|
|
||||||
|
|
||||||
def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
|
def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
|
||||||
|
@ -397,7 +412,7 @@ def exec_async_tasks(tasks: List[Coroutine]) -> List[Union[None, str]]:
|
||||||
Execute tasks asynchronously
|
Execute tasks asynchronously
|
||||||
"""
|
"""
|
||||||
# TODO: asyncio API is nicer in python 3.7
|
# TODO: asyncio API is nicer in python 3.7
|
||||||
if platform.system() == 'Windows':
|
if platform.system() == "Windows":
|
||||||
loop = asyncio.ProactorEventLoop()
|
loop = asyncio.ProactorEventLoop()
|
||||||
asyncio.set_event_loop(loop)
|
asyncio.set_event_loop(loop)
|
||||||
else:
|
else:
|
||||||
|
@ -415,17 +430,20 @@ def describe(repos: Dict[str, Dict[str, str]], no_colors: bool = False) -> str:
|
||||||
Return the status of all repos
|
Return the status of all repos
|
||||||
"""
|
"""
|
||||||
if repos:
|
if repos:
|
||||||
name_width = max(len(n) for n in repos) + 1
|
name_width = len(max(repos, key=len)) + 1
|
||||||
funcs = info.get_info_funcs()
|
funcs = info.get_info_funcs()
|
||||||
|
|
||||||
get_repo_status = info.get_repo_status
|
get_repo_status = info.get_repo_status
|
||||||
if get_repo_status in funcs and no_colors:
|
if get_repo_status in funcs and no_colors:
|
||||||
idx = funcs.index(get_repo_status)
|
idx = funcs.index(get_repo_status)
|
||||||
funcs[idx] = partial(get_repo_status, no_colors=True)
|
funcs[idx] = partial(get_repo_status, no_colors=True)
|
||||||
|
|
||||||
for name in sorted(repos):
|
num_threads = min(multiprocessing.cpu_count(), len(repos))
|
||||||
info_items = ' '.join(f(repos[name]) for f in funcs)
|
with ThreadPoolExecutor(max_workers=num_threads) as executor:
|
||||||
yield f'{name:<{name_width}}{info_items}'
|
for line in executor.map(
|
||||||
|
lambda repo: f'{repo:<{name_width}}{" ".join(f(repos[repo]) for f in funcs)}',
|
||||||
|
sorted(repos)):
|
||||||
|
yield line
|
||||||
|
|
||||||
|
|
||||||
def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
|
def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
|
||||||
|
@ -442,15 +460,15 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
|
||||||
"""
|
"""
|
||||||
# default config file
|
# default config file
|
||||||
fname = os.path.join(os.path.dirname(__file__), "cmds.json")
|
fname = os.path.join(os.path.dirname(__file__), "cmds.json")
|
||||||
with open(fname, 'r') as f:
|
with open(fname, "r") as f:
|
||||||
cmds = json.load(f)
|
cmds = json.load(f)
|
||||||
|
|
||||||
# custom config file
|
# custom config file
|
||||||
root = common.get_config_dir()
|
root = common.get_config_dir()
|
||||||
fname = os.path.join(root, 'cmds.json')
|
fname = os.path.join(root, "cmds.json")
|
||||||
custom_cmds = {}
|
custom_cmds = {}
|
||||||
if os.path.isfile(fname) and os.path.getsize(fname):
|
if os.path.isfile(fname) and os.path.getsize(fname):
|
||||||
with open(fname, 'r') as f:
|
with open(fname, "r") as f:
|
||||||
custom_cmds = json.load(f)
|
custom_cmds = json.load(f)
|
||||||
|
|
||||||
# custom commands shadow default ones
|
# custom commands shadow default ones
|
||||||
|
@ -458,8 +476,10 @@ def get_cmds_from_files() -> Dict[str, Dict[str, str]]:
|
||||||
return cmds
|
return cmds
|
||||||
|
|
||||||
|
|
||||||
def parse_repos_and_rest(input: List[str], quote_mode=False,
|
def parse_repos_and_rest(
|
||||||
) -> Tuple[Dict[str, Dict[str, str]], List[str]]:
|
input: List[str],
|
||||||
|
quote_mode=False,
|
||||||
|
) -> Tuple[Dict[str, Dict[str, str]], List[str]]:
|
||||||
"""
|
"""
|
||||||
Parse gita input arguments
|
Parse gita input arguments
|
||||||
|
|
||||||
|
@ -481,7 +501,7 @@ def parse_repos_and_rest(input: List[str], quote_mode=False,
|
||||||
if not names and ctx:
|
if not names and ctx:
|
||||||
names = [ctx.stem]
|
names = [ctx.stem]
|
||||||
if quote_mode and i + 1 != len(input):
|
if quote_mode and i + 1 != len(input):
|
||||||
print(input[i], 'is not a repo or group' )
|
print(input[i], "is not a repo or group")
|
||||||
sys.exit(2)
|
sys.exit(2)
|
||||||
|
|
||||||
if names:
|
if names:
|
||||||
|
@ -490,7 +510,7 @@ def parse_repos_and_rest(input: List[str], quote_mode=False,
|
||||||
if k in repos:
|
if k in repos:
|
||||||
chosen[k] = repos[k]
|
chosen[k] = repos[k]
|
||||||
if k in groups:
|
if k in groups:
|
||||||
for r in groups[k]['repos']:
|
for r in groups[k]["repos"]:
|
||||||
chosen[r] = repos[r]
|
chosen[r] = repos[r]
|
||||||
# if not set here, all repos are chosen
|
# if not set here, all repos are chosen
|
||||||
repos = chosen
|
repos = chosen
|
||||||
|
|
2
setup.py
2
setup.py
|
@ -7,7 +7,7 @@ with open("README.md", encoding="utf-8") as f:
|
||||||
setup(
|
setup(
|
||||||
name="gita",
|
name="gita",
|
||||||
packages=["gita"],
|
packages=["gita"],
|
||||||
version="0.16.2",
|
version="0.16.3",
|
||||||
license="MIT",
|
license="MIT",
|
||||||
description="Manage multiple git repos with sanity",
|
description="Manage multiple git repos with sanity",
|
||||||
long_description=long_description,
|
long_description=long_description,
|
||||||
|
|
|
@ -94,14 +94,14 @@ class TestLsLl:
|
||||||
out, err = capfd.readouterr()
|
out, err = capfd.readouterr()
|
||||||
assert err == ""
|
assert err == ""
|
||||||
assert "gita" in out
|
assert "gita" in out
|
||||||
assert info.Color.end in out
|
assert info.Color.end.value in out
|
||||||
|
|
||||||
# no color on branch name
|
# no color on branch name
|
||||||
__main__.main(["ll", "-C"])
|
__main__.main(["ll", "-C"])
|
||||||
out, err = capfd.readouterr()
|
out, err = capfd.readouterr()
|
||||||
assert err == ""
|
assert err == ""
|
||||||
assert "gita" in out
|
assert "gita" in out
|
||||||
assert info.Color.end not in out
|
assert info.Color.end.value not in out
|
||||||
|
|
||||||
__main__.main(["ls", "gita"])
|
__main__.main(["ls", "gita"])
|
||||||
out, err = capfd.readouterr()
|
out, err = capfd.readouterr()
|
||||||
|
@ -367,8 +367,9 @@ class TestGroupCmd:
|
||||||
assert err == ""
|
assert err == ""
|
||||||
assert "xx yy\n" == out
|
assert "xx yy\n" == out
|
||||||
|
|
||||||
|
@patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""})
|
||||||
@patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
|
@patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
|
||||||
def test_ll(self, _, capfd):
|
def test_ll(self, _, __, capfd):
|
||||||
args = argparse.Namespace()
|
args = argparse.Namespace()
|
||||||
args.to_group = None
|
args.to_group = None
|
||||||
args.group_cmd = None
|
args.group_cmd = None
|
||||||
|
@ -382,8 +383,9 @@ class TestGroupCmd:
|
||||||
== "\x1b[4mxx\x1b[0m: \n - a\n - b\n\x1b[4myy\x1b[0m: \n - a\n - c\n - d\n"
|
== "\x1b[4mxx\x1b[0m: \n - a\n - b\n\x1b[4myy\x1b[0m: \n - a\n - c\n - d\n"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""})
|
||||||
@patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
|
@patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
|
||||||
def test_ll_with_group(self, _, capfd):
|
def test_ll_with_group(self, _, __, capfd):
|
||||||
args = argparse.Namespace()
|
args = argparse.Namespace()
|
||||||
args.to_group = None
|
args.to_group = None
|
||||||
args.group_cmd = None
|
args.group_cmd = None
|
||||||
|
@ -394,9 +396,10 @@ class TestGroupCmd:
|
||||||
assert err == ""
|
assert err == ""
|
||||||
assert "a c d\n" == out
|
assert "a c d\n" == out
|
||||||
|
|
||||||
|
@patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""})
|
||||||
@patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
|
@patch("gita.common.get_config_fname", return_value=GROUP_FNAME)
|
||||||
@patch("gita.utils.write_to_groups_file")
|
@patch("gita.utils.write_to_groups_file")
|
||||||
def test_rename(self, mock_write, _):
|
def test_rename(self, mock_write, *_):
|
||||||
args = argparse.Namespace()
|
args = argparse.Namespace()
|
||||||
args.gname = "xx"
|
args.gname = "xx"
|
||||||
args.new_name = "zz"
|
args.new_name = "zz"
|
||||||
|
|
|
@ -6,134 +6,237 @@ from unittest.mock import patch, mock_open
|
||||||
|
|
||||||
from gita import utils, info
|
from gita import utils, info
|
||||||
from conftest import (
|
from conftest import (
|
||||||
PATH_FNAME, PATH_FNAME_EMPTY, PATH_FNAME_CLASH, GROUP_FNAME, TEST_DIR,
|
PATH_FNAME,
|
||||||
|
PATH_FNAME_EMPTY,
|
||||||
|
PATH_FNAME_CLASH,
|
||||||
|
GROUP_FNAME,
|
||||||
|
TEST_DIR,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('input, expected', [
|
@pytest.mark.parametrize(
|
||||||
([], ({'repo1': {'path': '/a/bcd/repo1', 'type': '', 'flags': []}, 'xxx': {'path': '/a/b/c/repo3', 'type': '', 'flags': []}, 'repo2': {'path': '/e/fgh/repo2', 'type': '', 'flags': []}}, [])),
|
"kid, parent, expected",
|
||||||
(['st'], ({'repo1': {'path': '/a/bcd/repo1', 'type': '', 'flags': []}, 'xxx': {'path': '/a/b/c/repo3', 'type': '', 'flags': []}, 'repo2': {'path': '/e/fgh/repo2', 'type': '', 'flags': []}}, ['st'])),
|
[
|
||||||
(['repo1', 'st'], ({'repo1': {'flags': [], 'path': '/a/bcd/repo1', 'type': ''}}, ['st'])),
|
("/a/b/repo", "/a/b", ["repo"]),
|
||||||
(['repo1'], ({'repo1': {'flags': [], 'path': '/a/bcd/repo1', 'type': ''}}, [])),
|
("/a/b/repo", "/a", ["b", "repo"]),
|
||||||
])
|
("/a/b/repo", "/a/", ["b", "repo"]),
|
||||||
@patch('gita.utils.is_git', return_value=True)
|
("/a/b/repo", "", None),
|
||||||
@patch('gita.common.get_config_fname', return_value=PATH_FNAME)
|
("/a/b/repo", "/a/b/repo", []),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
def test_get_relative_path(kid, parent, expected):
|
||||||
|
assert expected == utils.get_relative_path(kid, parent)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
"input, expected",
|
||||||
|
[
|
||||||
|
(
|
||||||
|
[],
|
||||||
|
(
|
||||||
|
{
|
||||||
|
"repo1": {"path": "/a/bcd/repo1", "type": "", "flags": []},
|
||||||
|
"xxx": {"path": "/a/b/c/repo3", "type": "", "flags": []},
|
||||||
|
"repo2": {"path": "/e/fgh/repo2", "type": "", "flags": []},
|
||||||
|
},
|
||||||
|
[],
|
||||||
|
),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
["st"],
|
||||||
|
(
|
||||||
|
{
|
||||||
|
"repo1": {"path": "/a/bcd/repo1", "type": "", "flags": []},
|
||||||
|
"xxx": {"path": "/a/b/c/repo3", "type": "", "flags": []},
|
||||||
|
"repo2": {"path": "/e/fgh/repo2", "type": "", "flags": []},
|
||||||
|
},
|
||||||
|
["st"],
|
||||||
|
),
|
||||||
|
),
|
||||||
|
(
|
||||||
|
["repo1", "st"],
|
||||||
|
({"repo1": {"flags": [], "path": "/a/bcd/repo1", "type": ""}}, ["st"]),
|
||||||
|
),
|
||||||
|
(["repo1"], ({"repo1": {"flags": [], "path": "/a/bcd/repo1", "type": ""}}, [])),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@patch("gita.utils.is_git", return_value=True)
|
||||||
|
@patch("gita.common.get_config_fname", return_value=PATH_FNAME)
|
||||||
def test_parse_repos_and_rest(mock_path_fname, _, input, expected):
|
def test_parse_repos_and_rest(mock_path_fname, _, input, expected):
|
||||||
got = utils.parse_repos_and_rest(input)
|
got = utils.parse_repos_and_rest(input)
|
||||||
assert got == expected
|
assert got == expected
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('repo_path, paths, expected', [
|
@pytest.mark.parametrize(
|
||||||
('/a/b/c/repo', ['/a/b'], (('b', 'c'), '/a')),
|
"repo_path, paths, expected",
|
||||||
])
|
[
|
||||||
|
("/a/b/c/repo", ["/a/b"], (("b", "c"), "/a")),
|
||||||
|
],
|
||||||
|
)
|
||||||
def test_generate_dir_hash(repo_path, paths, expected):
|
def test_generate_dir_hash(repo_path, paths, expected):
|
||||||
got = utils._generate_dir_hash(repo_path, paths)
|
got = utils._generate_dir_hash(repo_path, paths)
|
||||||
assert got == expected
|
assert got == expected
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('repos, paths, expected', [
|
@pytest.mark.parametrize(
|
||||||
({'r1': {'path': '/a/b//repo1'}, 'r2': {'path': '/a/b/repo2'}},
|
"repos, paths, expected",
|
||||||
['/a/b'], {'b': {'repos': ['r1', 'r2'], 'path': '/a/b'}}),
|
[
|
||||||
({'r1': {'path': '/a/b//repo1'}, 'r2': {'path': '/a/b/c/repo2'}},
|
(
|
||||||
['/a/b'], {'b': {'repos': ['r1', 'r2'], 'path': '/a/b'},
|
{"r1": {"path": "/a/b//repo1"}, "r2": {"path": "/a/b/repo2"}},
|
||||||
'b-c': {'repos': ['r2'], 'path': "/a/b/c"}}),
|
["/a/b"],
|
||||||
({'r1': {'path': '/a/b/c/repo1'}, 'r2': {'path': '/a/b/c/repo2'}},
|
{"b": {"repos": ["r1", "r2"], "path": "/a/b"}},
|
||||||
['/a/b'], {'b-c': {'repos': ['r1', 'r2'], 'path': '/a/b/c'},
|
),
|
||||||
'b': {'path': '/a/b', 'repos': ['r1', 'r2']}}),
|
(
|
||||||
])
|
{"r1": {"path": "/a/b//repo1"}, "r2": {"path": "/a/b/c/repo2"}},
|
||||||
|
["/a/b"],
|
||||||
|
{
|
||||||
|
"b": {"repos": ["r1", "r2"], "path": "/a/b"},
|
||||||
|
"b-c": {"repos": ["r2"], "path": "/a/b/c"},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
(
|
||||||
|
{"r1": {"path": "/a/b/c/repo1"}, "r2": {"path": "/a/b/c/repo2"}},
|
||||||
|
["/a/b"],
|
||||||
|
{
|
||||||
|
"b-c": {"repos": ["r1", "r2"], "path": "/a/b/c"},
|
||||||
|
"b": {"path": "/a/b", "repos": ["r1", "r2"]},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
def test_auto_group(repos, paths, expected):
|
def test_auto_group(repos, paths, expected):
|
||||||
got = utils.auto_group(repos, paths)
|
got = utils.auto_group(repos, paths)
|
||||||
assert got == expected
|
assert got == expected
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('test_input, diff_return, expected', [
|
@pytest.mark.parametrize(
|
||||||
([{'abc': {'path': '/root/repo/', 'type': '', 'flags': []}}, False],
|
"test_input, diff_return, expected",
|
||||||
True, 'abc \x1b[31mrepo *+_ \x1b[0m msg xx'),
|
[
|
||||||
([{'abc': {'path': '/root/repo/', 'type': '', 'flags': []}}, True],
|
(
|
||||||
True, 'abc repo *+_ msg xx'),
|
[{"abc": {"path": "/root/repo/", "type": "", "flags": []}}, False],
|
||||||
([{'repo': {'path': '/root/repo2/', 'type': '', 'flags': []}}, False],
|
True,
|
||||||
False, 'repo \x1b[32mrepo _ \x1b[0m msg xx'),
|
"abc \x1b[31mrepo *+_ \x1b[0m msg xx",
|
||||||
])
|
),
|
||||||
|
(
|
||||||
|
[{"abc": {"path": "/root/repo/", "type": "", "flags": []}}, True],
|
||||||
|
True,
|
||||||
|
"abc repo *+_ msg xx",
|
||||||
|
),
|
||||||
|
(
|
||||||
|
[{"repo": {"path": "/root/repo2/", "type": "", "flags": []}}, False],
|
||||||
|
False,
|
||||||
|
"repo \x1b[32mrepo _ \x1b[0m msg xx",
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
def test_describe(test_input, diff_return, expected, monkeypatch):
|
def test_describe(test_input, diff_return, expected, monkeypatch):
|
||||||
monkeypatch.setattr(info, 'get_head', lambda x: 'repo')
|
monkeypatch.setattr(info, "get_head", lambda x: "repo")
|
||||||
monkeypatch.setattr(info, 'run_quiet_diff', lambda *_: diff_return)
|
monkeypatch.setattr(info, "run_quiet_diff", lambda *_: diff_return)
|
||||||
monkeypatch.setattr(info, 'get_commit_msg', lambda *_: "msg")
|
monkeypatch.setattr(info, "get_commit_msg", lambda *_: "msg")
|
||||||
monkeypatch.setattr(info, 'get_commit_time', lambda *_: "xx")
|
monkeypatch.setattr(info, "get_commit_time", lambda *_: "xx")
|
||||||
monkeypatch.setattr(info, 'has_untracked', lambda *_: True)
|
monkeypatch.setattr(info, "has_untracked", lambda *_: True)
|
||||||
monkeypatch.setattr('os.chdir', lambda x: None)
|
monkeypatch.setattr("os.chdir", lambda x: None)
|
||||||
|
|
||||||
info.get_color_encoding.cache_clear() # avoid side effect
|
info.get_color_encoding.cache_clear() # avoid side effect
|
||||||
assert expected == next(utils.describe(*test_input))
|
assert expected == next(utils.describe(*test_input))
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('path_fname, expected', [
|
@pytest.mark.parametrize(
|
||||||
(PATH_FNAME, {
|
"path_fname, expected",
|
||||||
'repo1': {'path': '/a/bcd/repo1', 'type': '', 'flags': []},
|
[
|
||||||
'repo2': {'path': '/e/fgh/repo2', 'type': '', 'flags': []},
|
(
|
||||||
'xxx': {'path': '/a/b/c/repo3', 'type': '', 'flags': []},
|
PATH_FNAME,
|
||||||
}),
|
{
|
||||||
(PATH_FNAME_EMPTY, {}),
|
"repo1": {"path": "/a/bcd/repo1", "type": "", "flags": []},
|
||||||
(PATH_FNAME_CLASH, {
|
"repo2": {"path": "/e/fgh/repo2", "type": "", "flags": []},
|
||||||
'repo2': {'path': '/e/fgh/repo2', 'type': '', 'flags': ['--haha', '--pp']},
|
"xxx": {"path": "/a/b/c/repo3", "type": "", "flags": []},
|
||||||
'repo1': {'path': '/root/x/repo1', 'type': '', 'flags': []}
|
},
|
||||||
}),
|
),
|
||||||
])
|
(PATH_FNAME_EMPTY, {}),
|
||||||
@patch('gita.utils.is_git', return_value=True)
|
(
|
||||||
@patch('gita.common.get_config_fname')
|
PATH_FNAME_CLASH,
|
||||||
|
{
|
||||||
|
"repo2": {
|
||||||
|
"path": "/e/fgh/repo2",
|
||||||
|
"type": "",
|
||||||
|
"flags": ["--haha", "--pp"],
|
||||||
|
},
|
||||||
|
"repo1": {"path": "/root/x/repo1", "type": "", "flags": []},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@patch("gita.utils.is_git", return_value=True)
|
||||||
|
@patch("gita.common.get_config_fname")
|
||||||
def test_get_repos(mock_path_fname, _, path_fname, expected):
|
def test_get_repos(mock_path_fname, _, path_fname, expected):
|
||||||
mock_path_fname.return_value = path_fname
|
mock_path_fname.return_value = path_fname
|
||||||
utils.get_repos.cache_clear()
|
utils.get_repos.cache_clear()
|
||||||
assert utils.get_repos() == expected
|
assert utils.get_repos() == expected
|
||||||
|
|
||||||
|
|
||||||
@patch('gita.common.get_config_dir')
|
@patch("gita.common.get_config_dir")
|
||||||
def test_get_context(mock_config_dir):
|
def test_get_context(mock_config_dir):
|
||||||
mock_config_dir.return_value = TEST_DIR
|
mock_config_dir.return_value = TEST_DIR
|
||||||
utils.get_context.cache_clear()
|
utils.get_context.cache_clear()
|
||||||
assert utils.get_context() == TEST_DIR / 'xx.context'
|
assert utils.get_context() == TEST_DIR / "xx.context"
|
||||||
|
|
||||||
mock_config_dir.return_value = '/'
|
mock_config_dir.return_value = "/"
|
||||||
utils.get_context.cache_clear()
|
utils.get_context.cache_clear()
|
||||||
assert utils.get_context() == None
|
assert utils.get_context() == None
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('group_fname, expected', [
|
@pytest.mark.parametrize(
|
||||||
(GROUP_FNAME, {'xx': {'repos': ['a', 'b'], 'path': ''},
|
"group_fname, expected",
|
||||||
'yy': {'repos': ['a', 'c', 'd'], 'path': ''}}),
|
[
|
||||||
])
|
(
|
||||||
@patch('gita.common.get_config_fname')
|
GROUP_FNAME,
|
||||||
def test_get_groups(mock_group_fname, group_fname, expected):
|
{
|
||||||
|
"xx": {"repos": ["a", "b"], "path": ""},
|
||||||
|
"yy": {"repos": ["a", "c", "d"], "path": ""},
|
||||||
|
},
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@patch("gita.common.get_config_fname")
|
||||||
|
@patch("gita.utils.get_repos", return_value={"a": "", "b": "", "c": "", "d": ""})
|
||||||
|
def test_get_groups(_, mock_group_fname, group_fname, expected):
|
||||||
mock_group_fname.return_value = group_fname
|
mock_group_fname.return_value = group_fname
|
||||||
utils.get_groups.cache_clear()
|
utils.get_groups.cache_clear()
|
||||||
assert utils.get_groups() == expected
|
assert utils.get_groups() == expected
|
||||||
|
|
||||||
|
|
||||||
@patch('os.path.isfile', return_value=True)
|
@patch("os.path.isfile", return_value=True)
|
||||||
@patch('os.path.getsize', return_value=True)
|
@patch("os.path.getsize", return_value=True)
|
||||||
def test_custom_push_cmd(*_):
|
def test_custom_push_cmd(*_):
|
||||||
with patch('builtins.open',
|
with patch(
|
||||||
mock_open(read_data='{"push":{"cmd":"hand","help":"me","allow_all":true}}')):
|
"builtins.open",
|
||||||
|
mock_open(read_data='{"push":{"cmd":"hand","help":"me","allow_all":true}}'),
|
||||||
|
):
|
||||||
cmds = utils.get_cmds_from_files()
|
cmds = utils.get_cmds_from_files()
|
||||||
assert cmds['push'] == {'cmd': 'hand', 'help': 'me', 'allow_all': True}
|
assert cmds["push"] == {"cmd": "hand", "help": "me", "allow_all": True}
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'path_input, expected',
|
"path_input, expected",
|
||||||
[
|
[
|
||||||
(['/home/some/repo'], '/home/some/repo,some/repo,,\r\n'), # add one new
|
(["/home/some/repo"], "/home/some/repo,some/repo,,\r\n"), # add one new
|
||||||
(['/home/some/repo1', '/repo2'],
|
(
|
||||||
{'/repo2,repo2,,\r\n', # add two new
|
["/home/some/repo1", "/repo2"],
|
||||||
'/home/some/repo1,repo1,,\r\n'}), # add two new
|
{"/repo2,repo2,,\r\n", "/home/some/repo1,repo1,,\r\n"}, # add two new
|
||||||
(['/home/some/repo1', '/nos/repo'],
|
), # add two new
|
||||||
'/home/some/repo1,repo1,,\r\n'), # add one old one new
|
(
|
||||||
])
|
["/home/some/repo1", "/nos/repo"],
|
||||||
@patch('os.makedirs')
|
"/home/some/repo1,repo1,,\r\n",
|
||||||
@patch('gita.utils.is_git', return_value=True)
|
), # add one old one new
|
||||||
|
],
|
||||||
|
)
|
||||||
|
@patch("os.makedirs")
|
||||||
|
@patch("gita.utils.is_git", return_value=True)
|
||||||
def test_add_repos(_0, _1, path_input, expected, monkeypatch):
|
def test_add_repos(_0, _1, path_input, expected, monkeypatch):
|
||||||
monkeypatch.setenv('XDG_CONFIG_HOME', '/config')
|
monkeypatch.setenv("XDG_CONFIG_HOME", "/config")
|
||||||
with patch('builtins.open', mock_open()) as mock_file:
|
with patch("builtins.open", mock_open()) as mock_file:
|
||||||
utils.add_repos({'repo': {'path': '/nos/repo'}}, path_input)
|
utils.add_repos({"repo": {"path": "/nos/repo"}}, path_input)
|
||||||
mock_file.assert_called_with('/config/gita/repos.csv', 'a+', newline='')
|
mock_file.assert_called_with("/config/gita/repos.csv", "a+", newline="")
|
||||||
handle = mock_file()
|
handle = mock_file()
|
||||||
if type(expected) == str:
|
if type(expected) == str:
|
||||||
handle.write.assert_called_once_with(expected)
|
handle.write.assert_called_once_with(expected)
|
||||||
|
@ -145,21 +248,22 @@ def test_add_repos(_0, _1, path_input, expected, monkeypatch):
|
||||||
assert not kwargs
|
assert not kwargs
|
||||||
|
|
||||||
|
|
||||||
@patch('gita.utils.write_to_groups_file')
|
@patch("gita.utils.write_to_groups_file")
|
||||||
@patch('gita.utils.write_to_repo_file')
|
@patch("gita.utils.write_to_repo_file")
|
||||||
def test_rename_repo(mock_write, _):
|
def test_rename_repo(mock_write, _):
|
||||||
repos = {'r1': {'path': '/a/b', 'type': None},
|
repos = {"r1": {"path": "/a/b", "type": None}, "r2": {"path": "/c/c", "type": None}}
|
||||||
'r2': {'path': '/c/c', 'type': None}}
|
utils.rename_repo(repos, "r2", "xxx")
|
||||||
utils.rename_repo(repos, 'r2', 'xxx')
|
mock_write.assert_called_once_with(repos, "w")
|
||||||
mock_write.assert_called_once_with(repos, 'w')
|
|
||||||
|
|
||||||
|
|
||||||
def test_async_output(capfd):
|
def test_async_output(capfd):
|
||||||
tasks = [
|
tasks = [
|
||||||
utils.run_async('myrepo', '.', [
|
utils.run_async(
|
||||||
'python3', '-c',
|
"myrepo",
|
||||||
f"print({i});import time; time.sleep({i});print({i})"
|
".",
|
||||||
]) for i in range(4)
|
["python3", "-c", f"print({i});import time; time.sleep({i});print({i})"],
|
||||||
|
)
|
||||||
|
for i in range(4)
|
||||||
]
|
]
|
||||||
# I don't fully understand why a new loop is needed here. Without a new
|
# I don't fully understand why a new loop is needed here. Without a new
|
||||||
# loop, "pytest" fails but "pytest tests/test_utils.py" works. Maybe pytest
|
# loop, "pytest" fails but "pytest tests/test_utils.py" works. Maybe pytest
|
||||||
|
@ -168,12 +272,15 @@ def test_async_output(capfd):
|
||||||
utils.exec_async_tasks(tasks)
|
utils.exec_async_tasks(tasks)
|
||||||
|
|
||||||
out, err = capfd.readouterr()
|
out, err = capfd.readouterr()
|
||||||
assert err == ''
|
assert err == ""
|
||||||
assert out == 'myrepo: 0\nmyrepo: 0\n\nmyrepo: 1\nmyrepo: 1\n\nmyrepo: 2\nmyrepo: 2\n\nmyrepo: 3\nmyrepo: 3\n\n'
|
assert (
|
||||||
|
out
|
||||||
|
== "myrepo: 0\nmyrepo: 0\n\nmyrepo: 1\nmyrepo: 1\n\nmyrepo: 2\nmyrepo: 2\n\nmyrepo: 3\nmyrepo: 3\n\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_is_git(tmpdir):
|
def test_is_git(tmpdir):
|
||||||
with tmpdir.as_cwd():
|
with tmpdir.as_cwd():
|
||||||
subprocess.run('git init --bare .'.split())
|
subprocess.run("git init --bare .".split())
|
||||||
assert utils.is_git(Path.cwd()) is False
|
assert utils.is_git(Path.cwd()) is False
|
||||||
assert utils.is_git(Path.cwd(), include_bare=True) is True
|
assert utils.is_git(Path.cwd(), include_bare=True) is True
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue