Merging upstream version 4.66.2.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-05 19:14:31 +01:00
parent ec03e12832
commit 6759e100fe
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
61 changed files with 917 additions and 4364 deletions

View file

@ -18,24 +18,10 @@ def pretest_posttest():
n = len(tqdm._instances)
if n:
tqdm._instances.clear()
raise EnvironmentError(
"{0} `tqdm` instances still in existence PRE-test".format(n))
raise EnvironmentError(f"{n} `tqdm` instances still in existence PRE-test")
yield
if getattr(tqdm, "_instances", False):
n = len(tqdm._instances)
if n:
tqdm._instances.clear()
raise EnvironmentError(
"{0} `tqdm` instances still in existence POST-test".format(n))
if sys.version_info[0] > 2:
@fixture
def capsysbin(capsysbinary):
"""alias for capsysbinary (py3)"""
return capsysbinary
else:
@fixture
def capsysbin(capsys):
"""alias for capsys (py2)"""
return capsys
raise EnvironmentError(f"{n} `tqdm` instances still in existence POST-test")

View file

@ -1,128 +0,0 @@
import asyncio
from functools import partial
from sys import platform
from time import time
from tqdm.asyncio import tarange, tqdm_asyncio
from .tests_tqdm import StringIO, closing, mark
tqdm = partial(tqdm_asyncio, miniters=0, mininterval=0)
trange = partial(tarange, miniters=0, mininterval=0)
as_completed = partial(tqdm_asyncio.as_completed, miniters=0, mininterval=0)
gather = partial(tqdm_asyncio.gather, miniters=0, mininterval=0)
def count(start=0, step=1):
i = start
while True:
new_start = yield i
if new_start is None:
i += step
else:
i = new_start
async def acount(*args, **kwargs):
for i in count(*args, **kwargs):
yield i
@mark.asyncio
async def test_break():
"""Test asyncio break"""
pbar = tqdm(count())
async for _ in pbar:
break
pbar.close()
@mark.asyncio
async def test_generators(capsys):
"""Test asyncio generators"""
with tqdm(count(), desc="counter") as pbar:
async for i in pbar:
if i >= 8:
break
_, err = capsys.readouterr()
assert '9it' in err
with tqdm(acount(), desc="async_counter") as pbar:
async for i in pbar:
if i >= 8:
break
_, err = capsys.readouterr()
assert '9it' in err
@mark.asyncio
async def test_range():
"""Test asyncio range"""
with closing(StringIO()) as our_file:
async for _ in tqdm(range(9), desc="range", file=our_file):
pass
assert '9/9' in our_file.getvalue()
our_file.seek(0)
our_file.truncate()
async for _ in trange(9, desc="trange", file=our_file):
pass
assert '9/9' in our_file.getvalue()
@mark.asyncio
async def test_nested():
"""Test asyncio nested"""
with closing(StringIO()) as our_file:
async for _ in tqdm(trange(9, desc="inner", file=our_file),
desc="outer", file=our_file):
pass
assert 'inner: 100%' in our_file.getvalue()
assert 'outer: 100%' in our_file.getvalue()
@mark.asyncio
async def test_coroutines():
"""Test asyncio coroutine.send"""
with closing(StringIO()) as our_file:
with tqdm(count(), file=our_file) as pbar:
async for i in pbar:
if i == 9:
pbar.send(-10)
elif i < 0:
assert i == -9
break
assert '10it' in our_file.getvalue()
@mark.slow
@mark.asyncio
@mark.parametrize("tol", [0.2 if platform.startswith("darwin") else 0.1])
async def test_as_completed(capsys, tol):
"""Test asyncio as_completed"""
for retry in range(3):
t = time()
skew = time() - t
for i in as_completed([asyncio.sleep(0.01 * i) for i in range(30, 0, -1)]):
await i
t = time() - t - 2 * skew
try:
assert 0.3 * (1 - tol) < t < 0.3 * (1 + tol), t
_, err = capsys.readouterr()
assert '30/30' in err
except AssertionError:
if retry == 2:
raise
async def double(i):
return i * 2
@mark.asyncio
async def test_gather(capsys):
"""Test asyncio gather"""
res = await gather(*map(double, range(30)))
_, err = capsys.readouterr()
assert '30/30' in err
assert res == list(range(0, 30 * 2, 2))

View file

@ -1,11 +1,129 @@
"""Tests `tqdm.asyncio` on `python>=3.7`."""
import sys
"""Tests `tqdm.asyncio`."""
import asyncio
from functools import partial
from sys import platform
from time import time
if sys.version_info[:2] > (3, 6):
from .py37_asyncio import * # NOQA, pylint: disable=wildcard-import
else:
from .tests_tqdm import skip
try:
skip("async not supported", allow_module_level=True)
except TypeError:
pass
from tqdm.asyncio import tarange, tqdm_asyncio
from .tests_tqdm import StringIO, closing, mark
tqdm = partial(tqdm_asyncio, miniters=0, mininterval=0)
trange = partial(tarange, miniters=0, mininterval=0)
as_completed = partial(tqdm_asyncio.as_completed, miniters=0, mininterval=0)
gather = partial(tqdm_asyncio.gather, miniters=0, mininterval=0)
def count(start=0, step=1):
i = start
while True:
new_start = yield i
if new_start is None:
i += step
else:
i = new_start
async def acount(*args, **kwargs):
for i in count(*args, **kwargs):
yield i
@mark.asyncio
async def test_break():
"""Test asyncio break"""
pbar = tqdm(count())
async for _ in pbar:
break
pbar.close()
@mark.asyncio
async def test_generators(capsys):
"""Test asyncio generators"""
with tqdm(count(), desc="counter") as pbar:
async for i in pbar:
if i >= 8:
break
_, err = capsys.readouterr()
assert '9it' in err
with tqdm(acount(), desc="async_counter") as pbar:
async for i in pbar:
if i >= 8:
break
_, err = capsys.readouterr()
assert '9it' in err
@mark.asyncio
async def test_range():
"""Test asyncio range"""
with closing(StringIO()) as our_file:
async for _ in tqdm(range(9), desc="range", file=our_file):
pass
assert '9/9' in our_file.getvalue()
our_file.seek(0)
our_file.truncate()
async for _ in trange(9, desc="trange", file=our_file):
pass
assert '9/9' in our_file.getvalue()
@mark.asyncio
async def test_nested():
"""Test asyncio nested"""
with closing(StringIO()) as our_file:
async for _ in tqdm(trange(9, desc="inner", file=our_file),
desc="outer", file=our_file):
pass
assert 'inner: 100%' in our_file.getvalue()
assert 'outer: 100%' in our_file.getvalue()
@mark.asyncio
async def test_coroutines():
"""Test asyncio coroutine.send"""
with closing(StringIO()) as our_file:
with tqdm(count(), file=our_file) as pbar:
async for i in pbar:
if i == 9:
pbar.send(-10)
elif i < 0:
assert i == -9
break
assert '10it' in our_file.getvalue()
@mark.slow
@mark.asyncio
@mark.parametrize("tol", [0.2 if platform.startswith("darwin") else 0.1])
async def test_as_completed(capsys, tol):
"""Test asyncio as_completed"""
for retry in range(3):
t = time()
skew = time() - t
for i in as_completed([asyncio.sleep(0.01 * i) for i in range(30, 0, -1)]):
await i
t = time() - t - 2 * skew
try:
assert 0.3 * (1 - tol) < t < 0.3 * (1 + tol), t
_, err = capsys.readouterr()
assert '30/30' in err
except AssertionError:
if retry == 2:
raise
async def double(i):
return i * 2
@mark.asyncio
async def test_gather(capsys):
"""Test asyncio gather"""
res = await gather(*map(double, range(30)))
_, err = capsys.readouterr()
assert '30/30' in err
assert res == list(range(0, 30 * 2, 2))

View file

@ -1,8 +1,6 @@
"""
Tests for `tqdm.contrib`.
"""
import sys
import pytest
from tqdm import tqdm
@ -47,12 +45,9 @@ def test_zip(tqdm_kwargs):
with closing(StringIO()) as our_file:
a = range(9)
b = [i + 1 for i in a]
if sys.version_info[:1] < (3,):
assert tzip(a, b, file=our_file, **tqdm_kwargs) == zip(a, b)
else:
gen = tzip(a, b, file=our_file, **tqdm_kwargs)
assert gen != list(zip(a, b))
assert list(gen) == list(zip(a, b))
gen = tzip(a, b, file=our_file, **tqdm_kwargs)
assert gen != list(zip(a, b))
assert list(gen) == list(zip(a, b))
@pytest.mark.parametrize("tqdm_kwargs", [{}, {"tqdm_class": tqdm}])
@ -61,11 +56,6 @@ def test_map(tqdm_kwargs):
with closing(StringIO()) as our_file:
a = range(9)
b = [i + 1 for i in a]
if sys.version_info[:1] < (3,):
assert tmap(lambda x: x + 1, a, file=our_file, **tqdm_kwargs) == map(
incr, a
)
else:
gen = tmap(lambda x: x + 1, a, file=our_file, **tqdm_kwargs)
assert gen != b
assert list(gen) == b
gen = tmap(lambda x: x + 1, a, file=our_file, **tqdm_kwargs)
assert gen != b
assert list(gen) == b

View file

@ -1,7 +1,5 @@
# pylint: disable=missing-module-docstring, missing-class-docstring
# pylint: disable=missing-function-docstring, no-self-use
from __future__ import absolute_import
import logging
import logging.handlers
import sys

View file

@ -1,5 +1,3 @@
from __future__ import division
from time import sleep
from .tests_tqdm import importorskip, mark

View file

@ -1,5 +1,3 @@
from __future__ import division
from .tests_tqdm import importorskip, mark
pytestmark = mark.slow
@ -41,8 +39,8 @@ def test_keras(capsys):
verbose=0)])
_, res = capsys.readouterr()
assert "training: " in res
assert "{epochs}/{epochs}".format(epochs=epochs) in res
assert "{batches}/{batches}".format(batches=batches) not in res
assert f"{epochs}/{epochs}" in res
assert f"{batches}/{batches}" not in res
# full (epoch and batch) progress
model.fit(
@ -60,8 +58,8 @@ def test_keras(capsys):
verbose=2)])
_, res = capsys.readouterr()
assert "training: " in res
assert "{epochs}/{epochs}".format(epochs=epochs) in res
assert "{batches}/{batches}".format(batches=batches) in res
assert f"{epochs}/{epochs}" in res
assert f"{batches}/{batches}" in res
# auto-detect epochs and batches
model.fit(
@ -73,8 +71,8 @@ def test_keras(capsys):
callbacks=[TqdmCallback(desc="training", verbose=2)])
_, res = capsys.readouterr()
assert "training: " in res
assert "{epochs}/{epochs}".format(epochs=epochs) in res
assert "{batches}/{batches}".format(batches=batches) in res
assert f"{epochs}/{epochs}" in res
assert f"{batches}/{batches}" in res
# continue training (start from epoch != 0)
initial_epoch = 3
@ -89,5 +87,5 @@ def test_keras(capsys):
miniters=1, mininterval=0, maxinterval=0)])
_, res = capsys.readouterr()
assert "training: " in res
assert "{epochs}/{epochs}".format(epochs=initial_epoch - 1) not in res
assert "{epochs}/{epochs}".format(epochs=epochs) in res
assert f"{initial_epoch - 1}/{initial_epoch - 1}" not in res
assert f"{epochs}/{epochs}" in res

View file

@ -8,17 +8,17 @@ from os import linesep
from tqdm.cli import TqdmKeyError, TqdmTypeError, main
from tqdm.utils import IS_WIN
from .tests_tqdm import BytesIO, _range, closing, mark, raises
from .tests_tqdm import BytesIO, closing, mark, raises
def restore_sys(func):
"""Decorates `func(capsysbin)` to save & restore `sys.(stdin|argv)`."""
"""Decorates `func(capsysbinary)` to save & restore `sys.(stdin|argv)`."""
@wraps(func)
def inner(capsysbin):
"""function requiring capsysbin which may alter `sys.(stdin|argv)`"""
def inner(capsysbinary):
"""function requiring capsysbinary which may alter `sys.(stdin|argv)`"""
_SYS = sys.stdin, sys.argv
try:
res = func(capsysbin)
res = func(capsysbinary)
finally:
sys.stdin, sys.argv = _SYS
return res
@ -58,7 +58,7 @@ def test_main_import():
N = 123
_SYS = sys.stdin, sys.argv
# test direct import
sys.stdin = [str(i).encode() for i in _range(N)]
sys.stdin = [str(i).encode() for i in range(N)]
sys.argv = ['', '--desc', 'Test CLI import',
'--ascii', 'True', '--unit_scale', 'True']
try:
@ -68,19 +68,19 @@ def test_main_import():
@restore_sys
def test_main_bytes(capsysbin):
def test_main_bytes(capsysbinary):
"""Test CLI --bytes"""
N = 123
# test --delim
IN_DATA = '\0'.join(map(str, _range(N))).encode()
IN_DATA = '\0'.join(map(str, range(N))).encode()
with closing(BytesIO()) as sys.stdin:
sys.stdin.write(IN_DATA)
# sys.stdin.write(b'\xff') # TODO
sys.stdin.seek(0)
main(sys.stderr, ['--desc', 'Test CLI delim', '--ascii', 'True',
'--delim', r'\0', '--buf_size', '64'])
out, err = capsysbin.readouterr()
out, err = capsysbinary.readouterr()
assert out == IN_DATA
assert str(N) + "it" in err.decode("U8")
@ -90,27 +90,26 @@ def test_main_bytes(capsysbin):
sys.stdin.write(IN_DATA)
sys.stdin.seek(0)
main(sys.stderr, ['--ascii', '--bytes=True', '--unit_scale', 'False'])
out, err = capsysbin.readouterr()
out, err = capsysbinary.readouterr()
assert out == IN_DATA
assert str(len(IN_DATA)) + "B" in err.decode("U8")
@mark.skipif(sys.version_info[0] == 2, reason="no caplog on py2")
def test_main_log(capsysbin, caplog):
def test_main_log(capsysbinary, caplog):
"""Test CLI --log"""
_SYS = sys.stdin, sys.argv
N = 123
sys.stdin = [(str(i) + '\n').encode() for i in _range(N)]
sys.stdin = [(str(i) + '\n').encode() for i in range(N)]
IN_DATA = b''.join(sys.stdin)
try:
with caplog.at_level(logging.INFO):
main(sys.stderr, ['--log', 'INFO'])
out, err = capsysbin.readouterr()
out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA and b"123/123" in err
assert not caplog.record_tuples
with caplog.at_level(logging.DEBUG):
main(sys.stderr, ['--log', 'DEBUG'])
out, err = capsysbin.readouterr()
out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA and b"123/123" in err
assert caplog.record_tuples
finally:
@ -118,39 +117,39 @@ def test_main_log(capsysbin, caplog):
@restore_sys
def test_main(capsysbin):
def test_main(capsysbinary):
"""Test misc CLI options"""
N = 123
sys.stdin = [(str(i) + '\n').encode() for i in _range(N)]
sys.stdin = [(str(i) + '\n').encode() for i in range(N)]
IN_DATA = b''.join(sys.stdin)
# test --tee
main(sys.stderr, ['--mininterval', '0', '--miniters', '1'])
out, err = capsysbin.readouterr()
out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA and b"123/123" in err
assert N <= len(err.split(b"\r")) < N + 5
len_err = len(err)
main(sys.stderr, ['--tee', '--mininterval', '0', '--miniters', '1'])
out, err = capsysbin.readouterr()
out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA and b"123/123" in err
# spaces to clear intermediate lines could increase length
assert len_err + len(norm(out)) <= len(err)
# test --null
main(sys.stderr, ['--null'])
out, err = capsysbin.readouterr()
out, err = capsysbinary.readouterr()
assert not out and b"123/123" in err
# test integer --update
main(sys.stderr, ['--update'])
out, err = capsysbin.readouterr()
out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA
assert (str(N // 2 * N) + "it").encode() in err, "expected arithmetic sum formula"
# test integer --update_to
main(sys.stderr, ['--update-to'])
out, err = capsysbin.readouterr()
out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA
assert (str(N - 1) + "it").encode() in err
assert (str(N) + "it").encode() not in err
@ -161,23 +160,23 @@ def test_main(capsysbin):
# test integer --update --delim
sys.stdin.seek(0)
main(sys.stderr, ['--update', '--delim', 'D'])
out, err = capsysbin.readouterr()
out, err = capsysbinary.readouterr()
assert out == IN_DATA.replace(b'\n', b'D')
assert (str(N // 2 * N) + "it").encode() in err, "expected arithmetic sum"
# test integer --update_to --delim
sys.stdin.seek(0)
main(sys.stderr, ['--update-to', '--delim', 'D'])
out, err = capsysbin.readouterr()
out, err = capsysbinary.readouterr()
assert out == IN_DATA.replace(b'\n', b'D')
assert (str(N - 1) + "it").encode() in err
assert (str(N) + "it").encode() not in err
# test float --update_to
sys.stdin = [(str(i / 2.0) + '\n').encode() for i in _range(N)]
sys.stdin = [(str(i / 2.0) + '\n').encode() for i in range(N)]
IN_DATA = b''.join(sys.stdin)
main(sys.stderr, ['--update-to'])
out, err = capsysbin.readouterr()
out, err = capsysbinary.readouterr()
assert norm(out) == IN_DATA
assert (str((N - 1) / 2.0) + "it").encode() in err
assert (str(N / 2.0) + "it").encode() not in err
@ -213,30 +212,30 @@ def test_comppath(tmp_path):
@restore_sys
def test_exceptions(capsysbin):
def test_exceptions(capsysbinary):
"""Test CLI Exceptions"""
N = 123
sys.stdin = [str(i) + '\n' for i in _range(N)]
sys.stdin = [str(i) + '\n' for i in range(N)]
IN_DATA = ''.join(sys.stdin).encode()
with raises(TqdmKeyError, match="bad_arg_u_ment"):
main(sys.stderr, argv=['-ascii', '-unit_scale', '--bad_arg_u_ment', 'foo'])
out, _ = capsysbin.readouterr()
out, _ = capsysbinary.readouterr()
assert norm(out) == IN_DATA
with raises(TqdmTypeError, match="invalid_bool_value"):
main(sys.stderr, argv=['-ascii', '-unit_scale', 'invalid_bool_value'])
out, _ = capsysbin.readouterr()
out, _ = capsysbinary.readouterr()
assert norm(out) == IN_DATA
with raises(TqdmTypeError, match="invalid_int_value"):
main(sys.stderr, argv=['-ascii', '--total', 'invalid_int_value'])
out, _ = capsysbin.readouterr()
out, _ = capsysbinary.readouterr()
assert norm(out) == IN_DATA
with raises(TqdmKeyError, match="Can only have one of --"):
main(sys.stderr, argv=['--update', '--update_to'])
out, _ = capsysbin.readouterr()
out, _ = capsysbinary.readouterr()
assert norm(out) == IN_DATA
# test SystemExits

View file

@ -4,6 +4,7 @@ from .tests_tqdm import StringIO, closing, importorskip, mark, skip
pytestmark = mark.slow
np = importorskip('numpy')
random = importorskip('numpy.random')
rand = random.rand
randint = random.randint
@ -39,8 +40,8 @@ def test_pandas_rolling_expanding():
our_file.seek(0)
if our_file.getvalue().count(exres) < 2:
our_file.seek(0)
raise AssertionError("\nExpected:\n{0}\nIn:\n{1}\n".format(
exres + " at least twice.", our_file.read()))
raise AssertionError(
f"\nExpected:\n{exres} at least twice.\nIn:\n{our_file.read()}\n")
def test_pandas_series():
@ -62,10 +63,11 @@ def test_pandas_series():
our_file.seek(0)
if our_file.getvalue().count(exres) < 2:
our_file.seek(0)
raise AssertionError("\nExpected:\n{0}\nIn:\n{1}\n".format(
exres + " at least twice.", our_file.read()))
raise AssertionError(
f"\nExpected:\n{exres} at least twice.\nIn:\n{our_file.read()}\n")
@mark.filterwarnings("ignore:DataFrame.applymap has been deprecated:FutureWarning")
def test_pandas_data_frame():
"""Test pandas.DataFrame.progress_apply and .progress_applymap"""
with closing(StringIO()) as our_file:
@ -80,6 +82,12 @@ def test_pandas_data_frame():
res2 = df.applymap(task_func)
assert res1.equals(res2)
# map
if hasattr(df, 'map'): # pandas>=2.1.0
res1 = df.progress_map(task_func)
res2 = df.map(task_func)
assert res1.equals(res2)
# apply unhashable
res1 = []
df.progress_apply(res1.extend)
@ -94,8 +102,8 @@ def test_pandas_data_frame():
our_file.seek(0)
if our_file.read().count('100%') < 3:
our_file.seek(0)
raise AssertionError("\nExpected:\n{0}\nIn:\n{1}\n".format(
'100% at least three times', our_file.read()))
raise AssertionError(
f"\nExpected:\n100% at least three times\nIn:\n{our_file.read()}\n")
# apply_map, apply axis=0, apply axis=1
expects = ['20000/20000', '200/200', '100/100']
@ -103,10 +111,12 @@ def test_pandas_data_frame():
our_file.seek(0)
if our_file.getvalue().count(exres) < 1:
our_file.seek(0)
raise AssertionError("\nExpected:\n{0}\nIn:\n {1}\n".format(
exres + " at least once.", our_file.read()))
raise AssertionError(
f"\nExpected:\n{exres} at least once.\nIn:\n{our_file.read()}\n")
@mark.filterwarnings(
"ignore:DataFrameGroupBy.apply operated on the grouping columns:DeprecationWarning")
def test_pandas_groupby_apply():
"""Test pandas.DataFrame.groupby(...).progress_apply"""
with closing(StringIO()) as our_file:
@ -119,8 +129,8 @@ def test_pandas_groupby_apply():
dfs.groupby(['a']).progress_apply(lambda x: None)
df2 = df = pd.DataFrame({'a': randint(1, 8, 10000), 'b': rand(10000)})
res1 = df2.groupby("a").apply(max)
res2 = df2.groupby("a").progress_apply(max)
res1 = df2.groupby("a").apply(np.maximum.reduce)
res2 = df2.groupby("a").progress_apply(np.maximum.reduce)
assert res1.equals(res2)
our_file.seek(0)
@ -130,8 +140,7 @@ def test_pandas_groupby_apply():
nexres = '100%|##########|'
if nexres in our_file.read():
our_file.seek(0)
raise AssertionError("\nDid not expect:\n{0}\nIn:{1}\n".format(
nexres, our_file.read()))
raise AssertionError(f"\nDid not expect:\n{nexres}\nIn:{our_file.read()}\n")
with closing(StringIO()) as our_file:
tqdm.pandas(file=our_file, leave=True, ascii=True)
@ -140,26 +149,28 @@ def test_pandas_groupby_apply():
dfs.loc[0] = [2, 1, 1]
dfs['d'] = 100
expects = ['500/500', '1/1', '4/4', '2/2']
expects = ['500/500', '1/1', '4/4', '4/4']
dfs.groupby(dfs.index).progress_apply(lambda x: None)
dfs.groupby('d').progress_apply(lambda x: None)
dfs.groupby(dfs.columns, axis=1).progress_apply(lambda x: None)
dfs.groupby([2, 2, 1, 1], axis=1).progress_apply(lambda x: None)
dfs.T.groupby(dfs.columns).progress_apply(lambda x: None)
dfs.T.groupby([2, 2, 1, 1]).progress_apply(lambda x: None)
our_file.seek(0)
if our_file.read().count('100%') < 4:
our_file.seek(0)
raise AssertionError("\nExpected:\n{0}\nIn:\n{1}\n".format(
'100% at least four times', our_file.read()))
raise AssertionError(
f"\nExpected:\n100% at least four times\nIn:\n{our_file.read()}\n")
for exres in expects:
our_file.seek(0)
if our_file.getvalue().count(exres) < 1:
our_file.seek(0)
raise AssertionError("\nExpected:\n{0}\nIn:\n {1}\n".format(
exres + " at least once.", our_file.read()))
raise AssertionError(
f"\nExpected:\n{exres} at least once.\nIn:\n{our_file.read()}\n")
@mark.filterwarnings(
"ignore:DataFrameGroupBy.apply operated on the grouping columns:DeprecationWarning")
def test_pandas_leave():
"""Test pandas with `leave=True`"""
with closing(StringIO()) as our_file:
@ -172,8 +183,7 @@ def test_pandas_leave():
exres = '100%|##########| 100/100'
if exres not in our_file.read():
our_file.seek(0)
raise AssertionError("\nExpected:\n{0}\nIn:{1}\n".format(
exres, our_file.read()))
raise AssertionError(f"\nExpected:\n{exres}\nIn:{our_file.read()}\n")
def test_pandas_apply_args_deprecation():
@ -195,6 +205,8 @@ def test_pandas_apply_args_deprecation():
"keyword arguments instead"))
@mark.filterwarnings(
"ignore:DataFrameGroupBy.apply operated on the grouping columns:DeprecationWarning")
def test_pandas_deprecation():
"""Test bar object instance as argument deprecation"""
try:

View file

@ -1,5 +1,3 @@
from __future__ import division, print_function
import sys
from contextlib import contextmanager
from functools import wraps
@ -14,7 +12,7 @@ except ImportError:
from tqdm import tqdm, trange
from .tests_tqdm import _range, importorskip, mark, patch_lock, skip
from .tests_tqdm import importorskip, mark, patch_lock, skip
pytestmark = mark.slow
@ -98,10 +96,7 @@ def simple_progress(iterable=None, total=None, file=sys.stdout, desc='',
def format_interval(t):
mins, s = divmod(int(t), 60)
h, m = divmod(mins, 60)
if h:
return '{0:d}:{1:02d}:{2:02d}'.format(h, m, s)
else:
return '{0:02d}:{1:02d}'.format(m, s)
return f'{h:d}:{m:02d}:{s:02d}' if h else f'{m:02d}:{s:02d}'
def update_and_print(i=1):
n[0] += i
@ -143,20 +138,15 @@ def simple_progress(iterable=None, total=None, file=sys.stdout, desc='',
update_and_print(0)
if iterable is not None:
return update_and_yield()
else:
return update_and_print
return update_and_print
def assert_performance(thresh, name_left, time_left, name_right, time_right):
"""raises if time_left > thresh * time_right"""
if time_left > thresh * time_right:
raise ValueError(
('{name[0]}: {time[0]:f}, '
'{name[1]}: {time[1]:f}, '
'ratio {ratio:f} > {thresh:f}').format(
name=(name_left, name_right),
time=(time_left, time_right),
ratio=time_left / time_right, thresh=thresh))
f'{name_left}: {time_left:f}, {name_right}: {time_right:f}'
f', ratio {time_left / time_right:f} > {thresh:f}')
@retry_on_except()
@ -173,7 +163,7 @@ def test_iter_basic_overhead():
a = 0
with relative_timer() as time_bench:
for i in _range(total):
for i in range(total):
a += i
sys.stdout.write(str(a))
@ -188,13 +178,13 @@ def test_manual_basic_overhead():
with tqdm(total=total * 10, leave=True) as t:
a = 0
with relative_timer() as time_tqdm:
for i in _range(total):
for i in range(total):
a += i
t.update(10)
a = 0
with relative_timer() as time_bench:
for i in _range(total):
for i in range(total):
a += i
sys.stdout.write(str(a))
@ -249,7 +239,7 @@ def test_iter_overhead_hard():
a = 0
with relative_timer() as time_bench:
for i in _range(total):
for i in range(total):
a += i
sys.stdout.write(("%i" % a) * 40)
@ -265,13 +255,13 @@ def test_manual_overhead_hard():
mininterval=0, maxinterval=0) as t:
a = 0
with relative_timer() as time_tqdm:
for i in _range(total):
for i in range(total):
a += i
t.update(10)
a = 0
with relative_timer() as time_bench:
for i in _range(total):
for i in range(total):
a += i
sys.stdout.write(("%i" % a) * 40)
@ -292,7 +282,7 @@ def test_iter_overhead_simplebar_hard():
assert a == (total ** 2 - total) / 2.0
a = 0
s = simple_progress(_range(total), leave=True,
s = simple_progress(range(total), leave=True,
miniters=1, mininterval=0)
with relative_timer() as time_bench:
for i in s:
@ -310,7 +300,7 @@ def test_manual_overhead_simplebar_hard():
mininterval=0, maxinterval=0) as t:
a = 0
with relative_timer() as time_tqdm:
for i in _range(total):
for i in range(total):
a += i
t.update(10)
@ -318,7 +308,7 @@ def test_manual_overhead_simplebar_hard():
miniters=1, mininterval=0)
a = 0
with relative_timer() as time_bench:
for i in _range(total):
for i in range(total):
a += i
simplebar_update(10)

View file

@ -1,10 +1,7 @@
"""Test `tqdm.rich`."""
import sys
from .tests_tqdm import importorskip, mark
from .tests_tqdm import importorskip
@mark.skipif(sys.version_info[:3] < (3, 6, 1), reason="`rich` needs py>=3.6.1")
def test_rich_import():
"""Test `tqdm.rich` import"""
importorskip('tqdm.rich')

View file

@ -1,13 +1,9 @@
from __future__ import division
import sys
from functools import wraps
from threading import Event
from time import sleep, time
from tqdm import TMonitor, tqdm, trange
from .tests_perf import retry_on_except
from .tests_tqdm import StringIO, closing, importorskip, patch_lock, skip
@ -37,18 +33,13 @@ class Time(object):
sleep(0.000001) # sleep to allow interrupt (instead of pass)
def FakeEvent():
class FakeEvent(Event):
"""patched `threading.Event` where `wait()` uses `Time.fake_sleep()`"""
event = Event() # not a class in py2 so can't inherit
def wait(timeout=None):
def wait(self, timeout=None):
"""uses Time.fake_sleep"""
if timeout is not None:
Time.fake_sleep(timeout)
return event.is_set()
event.wait = wait
return event
return self.is_set()
def patch_sleep(func):
@ -206,19 +197,11 @@ def test_imap():
assert res[-1] == 100
# py2: locks won't propagate to incr_bar so may cause `AttributeError`
@retry_on_except(n=3 if sys.version_info < (3,) else 1, check_cpu_time=False)
@patch_lock(thread=True)
def test_threadpool():
"""Test concurrent.futures.ThreadPoolExecutor"""
ThreadPoolExecutor = importorskip('concurrent.futures').ThreadPoolExecutor
with ThreadPoolExecutor(8) as pool:
try:
res = list(tqdm(pool.map(incr_bar, range(100)), disable=True))
except AttributeError:
if sys.version_info < (3,):
skip("not supported on py2")
else:
raise
res = list(tqdm(pool.map(incr_bar, range(100)), disable=True))
assert sum(res) == sum(range(1, 101))

View file

@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
# Advice: use repr(our_file.read()) to print the full output of tqdm
# (else '\r' will replace the previous lines and you'll see only the latest.
from __future__ import print_function
import csv
import os
import re
@ -37,16 +35,6 @@ if getattr(StringIO, '__exit__', False) and getattr(StringIO, '__enter__', False
else:
from contextlib import closing
try:
_range = xrange
except NameError:
_range = range
try:
_unicode = unicode
except NameError:
_unicode = str
nt_and_no_colorama = False
if os.name == 'nt':
try:
@ -201,6 +189,8 @@ def test_format_num():
assert float(format_num(1337)) == 1337
assert format_num(int(1e6)) == '1e+6'
assert format_num(1239876) == '1' '239' '876'
assert format_num(0.00001234) == '1.23e-5'
assert format_num(-0.1234) == '-0.123'
def test_format_meter():
@ -271,11 +261,10 @@ def test_format_meter():
20, 100, 12, ncols=14, rate=8.1,
bar_format=r'{l_bar}{bar}|{n_fmt}/{total_fmt}') == " 20%|" + unich(0x258d) + " |20/100"
# Check wide characters
if sys.version_info >= (3,):
assert format_meter(0, 1000, 13, ncols=68, prefix=': ') == (
": 0%| | 0/1000 [00:13<?, ?it/s]")
assert format_meter(0, 1000, 13, ncols=68, prefix='ニッポン [ニッポン]: ') == (
"ニッポン [ニッポン]: 0%| | 0/1000 [00:13<?, ?it/s]")
assert format_meter(0, 1000, 13, ncols=68, prefix=': ') == (
": 0%| | 0/1000 [00:13<?, ?it/s]")
assert format_meter(0, 1000, 13, ncols=68, prefix='ニッポン [ニッポン]: ') == (
"ニッポン [ニッポン]: 0%| | 0/1000 [00:13<?, ?it/s]")
# Check that bar_format can print only {bar} or just one side
assert format_meter(20, 100, 12, ncols=2, rate=8.1,
bar_format=r'{bar}') == unich(0x258d) + " "
@ -328,11 +317,11 @@ def test_si_format():
def test_bar_formatspec():
"""Test Bar.__format__ spec"""
assert "{0:5a}".format(Bar(0.3)) == "#5 "
assert "{0:2}".format(Bar(0.5, charset=" .oO0")) == "0 "
assert "{0:2a}".format(Bar(0.5, charset=" .oO0")) == "# "
assert "{0:-6a}".format(Bar(0.5, 10)) == '## '
assert "{0:2b}".format(Bar(0.5, 10)) == ' '
assert f"{Bar(0.3):5a}" == "#5 "
assert f"{Bar(0.5, charset=' .oO0'):2}" == "0 "
assert f"{Bar(0.5, charset=' .oO0'):2a}" == "# "
assert f"{Bar(0.5, 10):-6a}" == '## '
assert f"{Bar(0.5, 10):2b}" == ' '
def test_all_defaults():
@ -401,7 +390,7 @@ def test_iterate_over_csv_rows():
# Create a test csv pseudo file
with closing(StringIO()) as test_csv_file:
writer = csv.writer(test_csv_file)
for _ in _range(3):
for _ in range(3):
writer.writerow(['test'] * 3)
test_csv_file.seek(0)
@ -415,7 +404,7 @@ def test_iterate_over_csv_rows():
def test_file_output():
"""Test output to arbitrary file-like objects"""
with closing(StringIO()) as our_file:
for i in tqdm(_range(3), file=our_file):
for i in tqdm(range(3), file=our_file):
if i == 1:
our_file.seek(0)
assert '0/3' in our_file.read()
@ -424,14 +413,14 @@ def test_file_output():
def test_leave_option():
"""Test `leave=True` always prints info about the last iteration"""
with closing(StringIO()) as our_file:
for _ in tqdm(_range(3), file=our_file, leave=True):
for _ in tqdm(range(3), file=our_file, leave=True):
pass
res = our_file.getvalue()
assert '| 3/3 ' in res
assert '\n' == res[-1] # not '\r'
with closing(StringIO()) as our_file2:
for _ in tqdm(_range(3), file=our_file2, leave=False):
for _ in tqdm(range(3), file=our_file2, leave=False):
pass
assert '| 3/3 ' not in our_file2.getvalue()
@ -452,7 +441,7 @@ def test_trange():
def test_min_interval():
"""Test mininterval"""
with closing(StringIO()) as our_file:
for _ in tqdm(_range(3), file=our_file, mininterval=1e-10):
for _ in tqdm(range(3), file=our_file, mininterval=1e-10):
pass
assert " 0%| | 0/3 [00:00<" in our_file.getvalue()
@ -484,7 +473,7 @@ def test_max_interval():
t.update(bigstep)
t2.update(bigstep)
# The next iterations should not trigger maxinterval (step 10)
for _ in _range(4):
for _ in range(4):
t.update(smallstep)
t2.update(smallstep)
timer.sleep(1e-5)
@ -504,7 +493,7 @@ def test_max_interval():
# Increase 10 iterations at once
t.update(bigstep)
# The next iterations should trigger maxinterval (step 5)
for _ in _range(4):
for _ in range(4):
t.update(smallstep)
timer.sleep(1e-2)
@ -513,7 +502,7 @@ def test_max_interval():
# Test iteration based tqdm with maxinterval effect
timer = DiscreteTimer()
with closing(StringIO()) as our_file:
with tqdm(_range(total), file=our_file, miniters=None,
with tqdm(range(total), file=our_file, miniters=None,
mininterval=1e-5, smoothing=1, maxinterval=1e-4) as t2:
cpu_timify(t2, timer)
@ -560,9 +549,9 @@ def test_max_interval():
mininterval = 0.1
maxinterval = 10
with closing(StringIO()) as our_file:
t1 = tqdm(_range(total), file=our_file, miniters=None, smoothing=1,
t1 = tqdm(range(total), file=our_file, miniters=None, smoothing=1,
mininterval=mininterval, maxinterval=maxinterval)
t2 = tqdm(_range(total), file=our_file, miniters=None, smoothing=1,
t2 = tqdm(range(total), file=our_file, miniters=None, smoothing=1,
mininterval=0, maxinterval=maxinterval)
cpu_timify(t1, timer1)
@ -605,7 +594,7 @@ def test_delay():
def test_min_iters():
"""Test miniters"""
with closing(StringIO()) as our_file:
for _ in tqdm(_range(3), file=our_file, leave=True, mininterval=0, miniters=2):
for _ in tqdm(range(3), file=our_file, leave=True, mininterval=0, miniters=2):
pass
out = our_file.getvalue()
@ -615,7 +604,7 @@ def test_min_iters():
assert '| 3/3 ' in out
with closing(StringIO()) as our_file:
for _ in tqdm(_range(3), file=our_file, leave=True, mininterval=0, miniters=1):
for _ in tqdm(range(3), file=our_file, leave=True, mininterval=0, miniters=1):
pass
out = our_file.getvalue()
@ -669,7 +658,7 @@ def test_dynamic_min_iters():
# Check iterable based tqdm
with closing(StringIO()) as our_file:
t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None,
t = tqdm(range(10), file=our_file, miniters=None, mininterval=None,
smoothing=0.5)
for _ in t:
pass
@ -677,7 +666,7 @@ def test_dynamic_min_iters():
# No smoothing
with closing(StringIO()) as our_file:
t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None,
t = tqdm(range(10), file=our_file, miniters=None, mininterval=None,
smoothing=0)
for _ in t:
pass
@ -685,7 +674,7 @@ def test_dynamic_min_iters():
# No dynamic_miniters (miniters is fixed manually)
with closing(StringIO()) as our_file:
t = tqdm(_range(10), file=our_file, miniters=1, mininterval=None)
t = tqdm(range(10), file=our_file, miniters=1, mininterval=None)
for _ in t:
pass
assert not t.dynamic_miniters
@ -694,12 +683,12 @@ def test_dynamic_min_iters():
def test_big_min_interval():
"""Test large mininterval"""
with closing(StringIO()) as our_file:
for _ in tqdm(_range(2), file=our_file, mininterval=1E10):
for _ in tqdm(range(2), file=our_file, mininterval=1E10):
pass
assert '50%' not in our_file.getvalue()
with closing(StringIO()) as our_file:
with tqdm(_range(2), file=our_file, mininterval=1E10) as t:
with tqdm(range(2), file=our_file, mininterval=1E10) as t:
t.update()
t.update()
assert '50%' not in our_file.getvalue()
@ -718,10 +707,10 @@ def test_smoothed_dynamic_min_iters():
timer.sleep(1)
t.update(10)
# The next iterations should be partially skipped
for _ in _range(2):
for _ in range(2):
timer.sleep(1)
t.update(4)
for _ in _range(20):
for _ in range(20):
timer.sleep(1)
t.update()
@ -750,7 +739,7 @@ def test_smoothed_dynamic_min_iters_with_min_interval():
t.update(10)
timer.sleep(1e-2)
for _ in _range(4):
for _ in range(4):
t.update()
timer.sleep(1e-2)
out = our_file.getvalue()
@ -758,7 +747,7 @@ def test_smoothed_dynamic_min_iters_with_min_interval():
with closing(StringIO()) as our_file:
# Test iteration-based tqdm
with tqdm(_range(total), file=our_file, miniters=None,
with tqdm(range(total), file=our_file, miniters=None,
mininterval=0.01, smoothing=1, maxinterval=0) as t2:
cpu_timify(t2, timer)
@ -817,7 +806,7 @@ def _rlock_creation_target():
def test_disable():
"""Test disable"""
with closing(StringIO()) as our_file:
for _ in tqdm(_range(3), file=our_file, disable=True):
for _ in tqdm(range(3), file=our_file, disable=True):
pass
assert our_file.getvalue() == ''
@ -831,7 +820,7 @@ def test_disable():
def test_infinite_total():
"""Test treatment of infinite total"""
with closing(StringIO()) as our_file:
for _ in tqdm(_range(3), file=our_file, total=float("inf")):
for _ in tqdm(range(3), file=our_file, total=float("inf")):
pass
@ -852,7 +841,7 @@ def test_nototal():
def test_unit():
"""Test SI unit prefix"""
with closing(StringIO()) as our_file:
for _ in tqdm(_range(3), file=our_file, miniters=1, unit="bytes"):
for _ in tqdm(range(3), file=our_file, miniters=1, unit="bytes"):
pass
assert 'bytes/s' in our_file.getvalue()
@ -866,7 +855,7 @@ def test_ascii():
# Test ascii bar
with closing(StringIO()) as our_file:
for _ in tqdm(_range(3), total=15, file=our_file, miniters=1,
for _ in tqdm(range(3), total=15, file=our_file, miniters=1,
mininterval=0, ascii=True):
pass
res = our_file.getvalue().strip("\r").split("\r")
@ -877,7 +866,7 @@ def test_ascii():
# Test unicode bar
with closing(UnicodeIO()) as our_file:
with tqdm(total=15, file=our_file, ascii=False, mininterval=0) as t:
for _ in _range(3):
for _ in range(3):
t.update()
res = our_file.getvalue().strip("\r").split("\r")
assert u"7%|\u258b" in res[1]
@ -887,7 +876,7 @@ def test_ascii():
# Test custom bar
for bars in [" .oO0", " #"]:
with closing(StringIO()) as our_file:
for _ in tqdm(_range(len(bars) - 1), file=our_file, miniters=1,
for _ in tqdm(range(len(bars) - 1), file=our_file, miniters=1,
mininterval=0, ascii=bars, ncols=27):
pass
res = our_file.getvalue().strip("\r").split("\r")
@ -949,8 +938,7 @@ def test_close():
res = our_file.getvalue()
assert res[-1] == '\n'
if not res.startswith(exres):
raise AssertionError("\n<<< Expected:\n{0}\n>>> Got:\n{1}\n===".format(
exres + ', ...it/s]\n', our_file.getvalue()))
raise AssertionError(f"\n<<< Expected:\n{exres}, ...it/s]\n>>> Got:\n{res}\n===")
# Closing after the output stream has closed
with closing(StringIO()) as our_file:
@ -976,7 +964,7 @@ def test_smoothing():
# -- Test disabling smoothing
with closing(StringIO()) as our_file:
with tqdm(_range(3), file=our_file, smoothing=None, leave=True) as t:
with tqdm(range(3), file=our_file, smoothing=None, leave=True) as t:
cpu_timify(t, timer)
for _ in t:
@ -987,11 +975,11 @@ def test_smoothing():
# 1st case: no smoothing (only use average)
with closing(StringIO()) as our_file2:
with closing(StringIO()) as our_file:
t = tqdm(_range(3), file=our_file2, smoothing=None, leave=True,
t = tqdm(range(3), file=our_file2, smoothing=None, leave=True,
miniters=1, mininterval=0)
cpu_timify(t, timer)
with tqdm(_range(3), file=our_file, smoothing=None, leave=True,
with tqdm(range(3), file=our_file, smoothing=None, leave=True,
miniters=1, mininterval=0) as t2:
cpu_timify(t2, timer)
@ -1017,11 +1005,11 @@ def test_smoothing():
# 2nd case: use max smoothing (= instant rate)
with closing(StringIO()) as our_file2:
with closing(StringIO()) as our_file:
t = tqdm(_range(3), file=our_file2, smoothing=1, leave=True,
t = tqdm(range(3), file=our_file2, smoothing=1, leave=True,
miniters=1, mininterval=0)
cpu_timify(t, timer)
with tqdm(_range(3), file=our_file, smoothing=1, leave=True,
with tqdm(range(3), file=our_file, smoothing=1, leave=True,
miniters=1, mininterval=0) as t2:
cpu_timify(t2, timer)
@ -1040,11 +1028,11 @@ def test_smoothing():
# 3rd case: use medium smoothing
with closing(StringIO()) as our_file2:
with closing(StringIO()) as our_file:
t = tqdm(_range(3), file=our_file2, smoothing=0.5, leave=True,
t = tqdm(range(3), file=our_file2, smoothing=0.5, leave=True,
miniters=1, mininterval=0)
cpu_timify(t, timer)
t2 = tqdm(_range(3), file=our_file, smoothing=0.5, leave=True,
t2 = tqdm(range(3), file=our_file, smoothing=0.5, leave=True,
miniters=1, mininterval=0)
cpu_timify(t2, timer)
@ -1098,7 +1086,7 @@ def test_bar_format():
with closing(StringIO()) as our_file:
bar_format = r'hello world'
with tqdm(ascii=False, bar_format=bar_format, file=our_file) as t:
assert isinstance(t.bar_format, _unicode)
assert isinstance(t.bar_format, str)
def test_custom_format():
@ -1127,7 +1115,7 @@ def test_eta(capsys):
bar_format='{l_bar}{eta:%Y-%m-%d}'):
pass
_, err = capsys.readouterr()
assert "\r100%|{eta:%Y-%m-%d}\n".format(eta=dt.now()) in err
assert f"\r100%|{dt.now():%Y-%m-%d}\n" in err
def test_unpause():
@ -1257,7 +1245,7 @@ def test_position():
t1 = tqdm(desc='pos0 bar', position=0, **kwargs)
t2 = tqdm(desc='pos1 bar', position=1, **kwargs)
t3 = tqdm(desc='pos2 bar', position=2, **kwargs)
for _ in _range(2):
for _ in range(2):
t1.update()
t3.update()
t2.update()
@ -1360,7 +1348,7 @@ def test_deprecated_gui():
# t.close()
# len(tqdm._instances) += 1 # undo the close() decrement
t = tqdm(_range(3), gui=True, file=our_file, miniters=1, mininterval=0)
t = tqdm(range(3), gui=True, file=our_file, miniters=1, mininterval=0)
try:
for _ in t:
pass
@ -1735,7 +1723,7 @@ def test_external_write():
def test_unit_scale():
"""Test numeric `unit_scale`"""
with closing(StringIO()) as our_file:
for _ in tqdm(_range(9), unit_scale=9, file=our_file,
for _ in tqdm(range(9), unit_scale=9, file=our_file,
miniters=1, mininterval=0):
pass
out = our_file.getvalue()
@ -1937,7 +1925,7 @@ def test_screen_shape():
def test_initial():
"""Test `initial`"""
with closing(StringIO()) as our_file:
for _ in tqdm(_range(9), initial=10, total=19, file=our_file,
for _ in tqdm(range(9), initial=10, total=19, file=our_file,
miniters=1, mininterval=0):
pass
out = our_file.getvalue()
@ -1948,7 +1936,7 @@ def test_initial():
def test_colour():
"""Test `colour`"""
with closing(StringIO()) as our_file:
for _ in tqdm(_range(9), file=our_file, colour="#beefed"):
for _ in tqdm(range(9), file=our_file, colour="#beefed"):
pass
out = our_file.getvalue()
assert '\x1b[38;2;%d;%d;%dm' % (0xbe, 0xef, 0xed) in out
@ -1961,7 +1949,7 @@ def test_colour():
assert "Unknown colour" in str(w[-1].message)
with closing(StringIO()) as our_file2:
for _ in tqdm(_range(9), file=our_file2, colour="blue"):
for _ in tqdm(range(9), file=our_file2, colour="blue"):
pass
out = our_file2.getvalue()
assert '\x1b[34m' in out
@ -1977,7 +1965,7 @@ def test_closed():
def test_reversed(capsys):
"""Test reversed()"""
for _ in reversed(tqdm(_range(9))):
for _ in reversed(tqdm(range(9))):
pass
out, err = capsys.readouterr()
assert not out
@ -1989,7 +1977,7 @@ def test_contains(capsys):
"""Test __contains__ doesn't iterate"""
with tqdm(list(range(9))) as t:
assert 9 not in t
assert all(i in t for i in _range(9))
assert all(i in t for i in range(9))
out, err = capsys.readouterr()
assert not out
assert ' 0%' in err

51
tests/tests_utils.py Normal file
View file

@ -0,0 +1,51 @@
from ast import literal_eval
from collections import defaultdict
from typing import Union # py<3.10
from tqdm.utils import envwrap
def test_envwrap(monkeypatch):
"""Test @envwrap (basic)"""
monkeypatch.setenv('FUNC_A', "42")
monkeypatch.setenv('FUNC_TyPe_HiNt', "1337")
monkeypatch.setenv('FUNC_Unused', "x")
@envwrap("FUNC_")
def func(a=1, b=2, type_hint: int = None):
return a, b, type_hint
assert (42, 2, 1337) == func()
assert (99, 2, 1337) == func(a=99)
def test_envwrap_types(monkeypatch):
"""Test @envwrap(types)"""
monkeypatch.setenv('FUNC_notype', "3.14159")
@envwrap("FUNC_", types=defaultdict(lambda: literal_eval))
def func(notype=None):
return notype
assert 3.14159 == func()
monkeypatch.setenv('FUNC_number', "1")
monkeypatch.setenv('FUNC_string', "1")
@envwrap("FUNC_", types={'number': int})
def nofallback(number=None, string=None):
return number, string
assert 1, "1" == nofallback()
def test_envwrap_annotations(monkeypatch):
"""Test @envwrap with typehints"""
monkeypatch.setenv('FUNC_number', "1.1")
monkeypatch.setenv('FUNC_string', "1.1")
@envwrap("FUNC_")
def annotated(number: Union[int, float] = None, string: int = None):
return number, string
assert 1.1, "1.1" == annotated()