1
0
Fork 0

Adding upstream version 0.10.9.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-11 18:25:06 +01:00
parent c0f23aff1f
commit a9c588f707
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
27 changed files with 1822 additions and 0 deletions

3
tests/clash_path_file Normal file
View file

@ -0,0 +1,3 @@
/a/bcd/repo1,repo1
/e/fgh/repo2,repo2
/root/x/repo1,repo1

26
tests/conftest.py Normal file
View file

@ -0,0 +1,26 @@
from pathlib import Path
from unittest.mock import MagicMock
TEST_DIR = Path(__file__).parents[0]
def fullpath(fname: str):
return str(TEST_DIR / fname)
PATH_FNAME = fullpath('mock_path_file')
PATH_FNAME_EMPTY = fullpath('empty_path_file')
PATH_FNAME_CLASH = fullpath('clash_path_file')
GROUP_FNAME = fullpath('mock_group_file')
def async_mock():
"""
Mock an async function. The calling arguments are saved in a MagicMock.
"""
m = MagicMock()
async def coro(*args, **kwargs):
return m(*args, **kwargs)
coro.mock = m
return coro

0
tests/empty_path_file Normal file
View file

2
tests/mock_group_file Normal file
View file

@ -0,0 +1,2 @@
xx: [a, b]
yy: [a, c, d]

4
tests/mock_path_file Normal file
View file

@ -0,0 +1,4 @@
/a/bcd/repo1,repo1
/a/b/c/repo3,xxx
/e/fgh/repo2,repo2

16
tests/test_info.py Normal file
View file

@ -0,0 +1,16 @@
import subprocess
from unittest.mock import patch, MagicMock
from gita import info
@patch('subprocess.run')
def test_run_quiet_diff(mock_run):
mock_return = MagicMock()
mock_run.return_value = mock_return
got = info.run_quiet_diff(['my', 'args'])
mock_run.assert_called_once_with(
['git', 'diff', '--quiet', 'my', 'args'],
stderr=subprocess.DEVNULL,
)
assert got == mock_return.returncode

167
tests/test_main.py Normal file
View file

@ -0,0 +1,167 @@
import pytest
from unittest.mock import patch
import argparse
import shlex
from gita import __main__
from gita import utils
from conftest import (
PATH_FNAME, PATH_FNAME_EMPTY, PATH_FNAME_CLASH, GROUP_FNAME,
async_mock
)
class TestLsLl:
@patch('gita.utils.get_config_fname')
def testLl(self, mock_path_fname, capfd, tmp_path):
""" functional test """
# avoid modifying the local configuration
mock_path_fname.return_value = tmp_path / 'path_config.txt'
__main__.main(['add', '.'])
out, err = capfd.readouterr()
assert err == ''
assert 'Found 1 new repo(s).\n' == out
# in production this is not needed
utils.get_repos.cache_clear()
__main__.main(['ls'])
out, err = capfd.readouterr()
assert err == ''
assert 'gita\n' == out
__main__.main(['ll'])
out, err = capfd.readouterr()
assert err == ''
assert 'gita' in out
__main__.main(['ls', 'gita'])
out, err = capfd.readouterr()
assert err == ''
assert out.strip() == utils.get_repos()['gita']
def testLs(self, monkeypatch, capfd):
monkeypatch.setattr(utils, 'get_repos',
lambda: {'repo1': '/a/', 'repo2': '/b/'})
monkeypatch.setattr(utils, 'describe', lambda x: x)
__main__.main(['ls'])
out, err = capfd.readouterr()
assert err == ''
assert out == "repo1 repo2\n"
__main__.main(['ls', 'repo1'])
out, err = capfd.readouterr()
assert err == ''
assert out == '/a/\n'
@pytest.mark.parametrize('path_fname, expected', [
(PATH_FNAME,
"repo1 cmaster dsu\x1b[0m msg\nrepo2 cmaster dsu\x1b[0m msg\nxxx cmaster dsu\x1b[0m msg\n"),
(PATH_FNAME_EMPTY, ""),
(PATH_FNAME_CLASH,
"repo1 cmaster dsu\x1b[0m msg\nrepo2 cmaster dsu\x1b[0m msg\nx/repo1 cmaster dsu\x1b[0m msg\n"
),
])
@patch('gita.utils.is_git', return_value=True)
@patch('gita.info.get_head', return_value="master")
@patch('gita.info._get_repo_status', return_value=("d", "s", "u", "c"))
@patch('gita.info.get_commit_msg', return_value="msg")
@patch('gita.utils.get_config_fname')
def testWithPathFiles(self, mock_path_fname, _0, _1, _2, _3, path_fname,
expected, capfd):
mock_path_fname.return_value = path_fname
utils.get_repos.cache_clear()
__main__.main(['ll'])
out, err = capfd.readouterr()
print(out)
assert err == ''
assert out == expected
@patch('os.path.isfile', return_value=True)
@patch('gita.utils.get_config_fname', return_value='some path')
@patch('gita.utils.get_repos', return_value={'repo1': '/a/', 'repo2': '/b/'})
@patch('gita.utils.write_to_repo_file')
def test_rm(mock_write, *_):
args = argparse.Namespace()
args.repo = ['repo1']
__main__.f_rm(args)
mock_write.assert_called_once_with({'repo2': '/b/'}, 'w')
def test_not_add():
# this won't write to disk because the repo is not valid
__main__.main(['add', '/home/some/repo/'])
@patch('gita.utils.get_repos', return_value={'repo2': '/d/efg'})
@patch('subprocess.run')
def test_fetch(mock_run, *_):
__main__.main(['fetch'])
mock_run.assert_called_once_with(['git', 'fetch'], cwd='/d/efg')
@patch(
'gita.utils.get_repos', return_value={
'repo1': '/a/bc',
'repo2': '/d/efg'
})
@patch('gita.utils.run_async', new=async_mock())
@patch('subprocess.run')
def test_async_fetch(*_):
__main__.main(['fetch'])
mock_run = utils.run_async.mock
assert mock_run.call_count == 2
cmds = ['git', 'fetch']
# print(mock_run.call_args_list)
mock_run.assert_any_call('repo1', '/a/bc', cmds)
mock_run.assert_any_call('repo2', '/d/efg', cmds)
@pytest.mark.parametrize('input', [
'diff --name-only --staged',
"commit -am 'lala kaka'",
])
@patch('gita.utils.get_repos', return_value={'repo7': 'path7'})
@patch('subprocess.run')
def test_superman(mock_run, _, input):
mock_run.reset_mock()
args = ['super', 'repo7'] + shlex.split(input)
__main__.main(args)
expected_cmds = ['git'] + shlex.split(input)
mock_run.assert_called_once_with(expected_cmds, cwd='path7')
@pytest.mark.parametrize('input, expected', [
('a', {'xx': ['b'], 'yy': ['c', 'd']}),
("c", {'xx': ['a', 'b'], 'yy': ['a', 'd']}),
("a b", {'yy': ['c', 'd']}),
])
@patch('gita.utils.get_repos', return_value={'a': '', 'b': '', 'c': '', 'd': ''})
@patch('gita.utils.get_config_fname', return_value=GROUP_FNAME)
@patch('gita.utils.write_to_groups_file')
def test_ungroup(mock_write, _, __, input, expected):
utils.get_groups.cache_clear()
args = ['ungroup'] + shlex.split(input)
__main__.main(args)
mock_write.assert_called_once_with(expected, 'w')
@patch('gita.utils.is_git', return_value=True)
@patch('gita.utils.get_config_fname', return_value=PATH_FNAME)
@patch('gita.utils.rename_repo')
def test_rename(mock_rename, _, __):
utils.get_repos.cache_clear()
args = ['rename', 'repo1', 'abc']
__main__.main(args)
mock_rename.assert_called_once_with(
{'repo1': '/a/bcd/repo1', 'repo2': '/e/fgh/repo2',
'xxx': '/a/b/c/repo3'},
'repo1', 'abc')
@patch('os.path.isfile', return_value=False)
def test_info(mock_isfile, capfd):
__main__.f_info(None)
out, err = capfd.readouterr()
assert 'In use: branch,commit_msg\nUnused: path\n' == out
assert err == ''

118
tests/test_utils.py Normal file
View file

@ -0,0 +1,118 @@
import pytest
import asyncio
from unittest.mock import patch, mock_open
from gita import utils, info
from conftest import (
PATH_FNAME, PATH_FNAME_EMPTY, PATH_FNAME_CLASH, GROUP_FNAME,
)
@pytest.mark.parametrize('test_input, diff_return, expected', [
({
'abc': '/root/repo/'
}, True, 'abc \x1b[31mrepo *+_ \x1b[0m msg'),
({
'repo': '/root/repo2/'
}, False, 'repo \x1b[32mrepo _ \x1b[0m msg'),
])
def test_describe(test_input, diff_return, expected, monkeypatch):
monkeypatch.setattr(info, 'get_head', lambda x: 'repo')
monkeypatch.setattr(info, 'run_quiet_diff', lambda _: diff_return)
monkeypatch.setattr(info, 'get_commit_msg', lambda _: "msg")
monkeypatch.setattr(info, 'has_untracked', lambda: True)
monkeypatch.setattr('os.chdir', lambda x: None)
print('expected: ', repr(expected))
print('got: ', repr(next(utils.describe(test_input))))
assert expected == next(utils.describe(test_input))
@pytest.mark.parametrize('path_fname, expected', [
(PATH_FNAME, {
'repo1': '/a/bcd/repo1',
'repo2': '/e/fgh/repo2',
'xxx': '/a/b/c/repo3',
}),
(PATH_FNAME_EMPTY, {}),
(PATH_FNAME_CLASH, {
'repo1': '/a/bcd/repo1',
'repo2': '/e/fgh/repo2',
'x/repo1': '/root/x/repo1'
}),
])
@patch('gita.utils.is_git', return_value=True)
@patch('gita.utils.get_config_fname')
def test_get_repos(mock_path_fname, _, path_fname, expected):
mock_path_fname.return_value = path_fname
utils.get_repos.cache_clear()
assert utils.get_repos() == expected
@pytest.mark.parametrize('group_fname, expected', [
(GROUP_FNAME, {'xx': ['a', 'b'], 'yy': ['a', 'c', 'd']}),
])
@patch('gita.utils.get_config_fname')
def test_get_groups(mock_group_fname, group_fname, expected):
mock_group_fname.return_value = group_fname
utils.get_groups.cache_clear()
assert utils.get_groups() == expected
@patch('os.path.isfile', return_value=True)
@patch('os.path.getsize', return_value=True)
def test_custom_push_cmd(*_):
with patch('builtins.open',
mock_open(read_data='push:\n cmd: hand\n help: me')):
cmds = utils.get_cmds_from_files()
assert cmds['push'] == {'cmd': 'hand', 'help': 'me'}
@pytest.mark.parametrize(
'path_input, expected',
[
(['/home/some/repo/'], '/home/some/repo,repo\n'), # add one new
(['/home/some/repo1', '/repo2'],
{'/repo2,repo2\n/home/some/repo1,repo1\n', # add two new
'/home/some/repo1,repo1\n/repo2,repo2\n'}), # add two new
(['/home/some/repo1', '/nos/repo'],
'/home/some/repo1,repo1\n'), # add one old one new
])
@patch('os.makedirs')
@patch('gita.utils.is_git', return_value=True)
def test_add_repos(_0, _1, path_input, expected, monkeypatch):
monkeypatch.setenv('XDG_CONFIG_HOME', '/config')
with patch('builtins.open', mock_open()) as mock_file:
utils.add_repos({'repo': '/nos/repo'}, path_input)
mock_file.assert_called_with('/config/gita/repo_path', 'a+')
handle = mock_file()
if type(expected) == str:
handle.write.assert_called_once_with(expected)
else:
handle.write.assert_called_once()
args, kwargs = handle.write.call_args
assert args[0] in expected
assert not kwargs
@patch('gita.utils.write_to_repo_file')
def test_rename_repo(mock_write):
utils.rename_repo({'r1': '/a/b', 'r2': '/c/c'}, 'r2', 'xxx')
mock_write.assert_called_once_with({'r1': '/a/b', 'xxx': '/c/c'}, 'w')
def test_async_output(capfd):
tasks = [
utils.run_async('myrepo', '.', [
'python3', '-c',
f"print({i});import time; time.sleep({i});print({i})"
]) for i in range(4)
]
# I don't fully understand why a new loop is needed here. Without a new
# loop, "pytest" fails but "pytest tests/test_utils.py" works. Maybe pytest
# itself uses asyncio (or maybe pytest-xdist)?
asyncio.set_event_loop(asyncio.new_event_loop())
utils.exec_async_tasks(tasks)
out, err = capfd.readouterr()
assert err == ''
assert out == 'myrepo: 0\nmyrepo: 0\n\nmyrepo: 1\nmyrepo: 1\n\nmyrepo: 2\nmyrepo: 2\n\nmyrepo: 3\nmyrepo: 3\n\n'