1
0
Fork 0
pre-commit/tests/xargs_test.py

252 lines
6.7 KiB
Python
Raw Normal View History

from __future__ import annotations
import concurrent.futures
import multiprocessing
import os
import sys
import time
from unittest import mock
import pytest
from pre_commit import parse_shebang
from pre_commit import xargs
def test_cpu_count_sched_getaffinity_exists():
with mock.patch.object(
os, 'sched_getaffinity', create=True, return_value=set(range(345)),
):
assert xargs.cpu_count() == 345
@pytest.fixture
def no_sched_getaffinity():
# Simulates an OS without os.sched_getaffinity available (mac/windows)
# https://docs.python.org/3/library/os.html#interface-to-the-scheduler
with mock.patch.object(
os,
'sched_getaffinity',
create=True,
side_effect=AttributeError,
):
yield
def test_cpu_count_multiprocessing_cpu_count_implemented(no_sched_getaffinity):
with mock.patch.object(multiprocessing, 'cpu_count', return_value=123):
assert xargs.cpu_count() == 123
def test_cpu_count_multiprocessing_cpu_count_not_implemented(
no_sched_getaffinity,
):
with mock.patch.object(
multiprocessing, 'cpu_count', side_effect=NotImplementedError,
):
assert xargs.cpu_count() == 1
@pytest.mark.parametrize(
('env', 'expected'),
(
({}, 0),
({b'x': b'1'}, 12),
({b'x': b'12'}, 13),
({b'x': b'1', b'y': b'2'}, 24),
),
)
def test_environ_size(env, expected):
# normalize integer sizing
assert xargs._environ_size(_env=env) == expected
@pytest.fixture
def win32_mock():
with mock.patch.object(sys, 'getfilesystemencoding', return_value='utf-8'):
with mock.patch.object(sys, 'platform', 'win32'):
yield
@pytest.fixture
def linux_mock():
with mock.patch.object(sys, 'getfilesystemencoding', return_value='utf-8'):
with mock.patch.object(sys, 'platform', 'linux'):
yield
def test_partition_trivial():
assert xargs.partition(('cmd',), (), 1) == (('cmd',),)
def test_partition_simple():
assert xargs.partition(('cmd',), ('foo',), 1) == (('cmd', 'foo'),)
def test_partition_limits():
ret = xargs.partition(
('ninechars',), (
# Just match the end (with spaces)
'.' * 5, '.' * 4,
# Just match the end (single arg)
'.' * 10,
# Goes over the end
'.' * 5,
'.' * 6,
),
1,
_max_length=21,
)
assert ret == (
('ninechars', '.' * 5, '.' * 4),
('ninechars', '.' * 10),
('ninechars', '.' * 5),
('ninechars', '.' * 6),
)
def test_partition_limit_win32(win32_mock):
cmd = ('ninechars',)
# counted as half because of utf-16 encode
varargs = ('😑' * 5,)
ret = xargs.partition(cmd, varargs, 1, _max_length=21)
assert ret == (cmd + varargs,)
def test_partition_limit_linux(linux_mock):
cmd = ('ninechars',)
varargs = ('😑' * 5,)
ret = xargs.partition(cmd, varargs, 1, _max_length=31)
assert ret == (cmd + varargs,)
def test_argument_too_long_with_large_unicode(linux_mock):
cmd = ('ninechars',)
varargs = ('😑' * 10,) # 4 bytes * 10
with pytest.raises(xargs.ArgumentTooLongError):
xargs.partition(cmd, varargs, 1, _max_length=20)
def test_partition_target_concurrency():
ret = xargs.partition(
('foo',), ('A',) * 22,
4,
_max_length=50,
)
assert ret == (
('foo',) + ('A',) * 6,
('foo',) + ('A',) * 6,
('foo',) + ('A',) * 6,
('foo',) + ('A',) * 4,
)
def test_partition_target_concurrency_wont_make_tiny_partitions():
ret = xargs.partition(
('foo',), ('A',) * 10,
4,
_max_length=50,
)
assert ret == (
('foo',) + ('A',) * 4,
('foo',) + ('A',) * 4,
('foo',) + ('A',) * 2,
)
def test_argument_too_long():
with pytest.raises(xargs.ArgumentTooLongError):
xargs.partition(('a' * 5,), ('a' * 5,), 1, _max_length=10)
def test_xargs_smoke():
ret, out = xargs.xargs(('echo',), ('hello', 'world'))
assert ret == 0
assert out.replace(b'\r\n', b'\n') == b'hello world\n'
exit_cmd = parse_shebang.normalize_cmd(('bash', '-c', 'exit $1', '--'))
# Abuse max_length to control the exit code
max_length = len(' '.join(exit_cmd)) + 3
def test_xargs_retcode_normal():
ret, _ = xargs.xargs(exit_cmd, ('0',), _max_length=max_length)
assert ret == 0
ret, _ = xargs.xargs(exit_cmd, ('0', '1'), _max_length=max_length)
assert ret == 1
# takes the maximum return code
ret, _ = xargs.xargs(exit_cmd, ('0', '5', '1'), _max_length=max_length)
assert ret == 5
@pytest.mark.xfail(sys.platform == 'win32', reason='posix only')
def test_xargs_retcode_killed_by_signal():
ret, _ = xargs.xargs(
parse_shebang.normalize_cmd(('bash', '-c', 'kill -9 $$', '--')),
('foo', 'bar'),
)
assert ret == -9
def test_xargs_concurrency():
bash_cmd = parse_shebang.normalize_cmd(('bash', '-c'))
print_pid = ('sleep 0.5 && echo $$',)
start = time.time()
ret, stdout = xargs.xargs(
bash_cmd, print_pid * 5,
target_concurrency=5,
_max_length=len(' '.join(bash_cmd + print_pid)) + 1,
)
elapsed = time.time() - start
assert ret == 0
pids = stdout.splitlines()
assert len(pids) == 5
# It would take 0.5*5=2.5 seconds to run all of these in serial, so if it
# takes less, they must have run concurrently.
assert elapsed < 2.5
def test_thread_mapper_concurrency_uses_threadpoolexecutor_map():
with xargs._thread_mapper(10) as thread_map:
_self = thread_map.__self__ # type: ignore
assert isinstance(_self, concurrent.futures.ThreadPoolExecutor)
def test_thread_mapper_concurrency_uses_regular_map():
with xargs._thread_mapper(1) as thread_map:
assert thread_map is map
def test_xargs_propagate_kwargs_to_cmd():
env = {'PRE_COMMIT_TEST_VAR': 'Pre commit is awesome'}
cmd: tuple[str, ...] = ('bash', '-c', 'echo $PRE_COMMIT_TEST_VAR', '--')
cmd = parse_shebang.normalize_cmd(cmd)
ret, stdout = xargs.xargs(cmd, ('1',), env=env)
assert ret == 0
assert b'Pre commit is awesome' in stdout
@pytest.mark.xfail(sys.platform == 'win32', reason='posix only')
def test_xargs_color_true_makes_tty():
retcode, out = xargs.xargs(
(sys.executable, '-c', 'import sys; print(sys.stdout.isatty())'),
('1',),
color=True,
)
assert retcode == 0
assert out == b'True\n'
@pytest.mark.xfail(os.name == 'posix', reason='nt only')
@pytest.mark.parametrize('filename', ('t.bat', 't.cmd', 'T.CMD'))
def test_xargs_with_batch_files(tmpdir, filename):
f = tmpdir.join(filename)
f.write('echo it works\n')
retcode, out = xargs.xargs((str(f),), ('x',) * 8192)
assert retcode == 0, (retcode, out)