239 lines
6.9 KiB
Python
239 lines
6.9 KiB
Python
from __future__ import annotations
|
|
|
|
import contextlib
|
|
import errno
|
|
import importlib.resources
|
|
import os.path
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
from collections.abc import Generator
|
|
from types import TracebackType
|
|
from typing import Any
|
|
from typing import Callable
|
|
|
|
from pre_commit import parse_shebang
|
|
|
|
|
|
def force_bytes(exc: Any) -> bytes:
|
|
with contextlib.suppress(TypeError):
|
|
return bytes(exc)
|
|
with contextlib.suppress(Exception):
|
|
return str(exc).encode()
|
|
return f'<unprintable {type(exc).__name__} object>'.encode()
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def clean_path_on_failure(path: str) -> Generator[None, None, None]:
|
|
"""Cleans up the directory on an exceptional failure."""
|
|
try:
|
|
yield
|
|
except BaseException:
|
|
if os.path.exists(path):
|
|
rmtree(path)
|
|
raise
|
|
|
|
|
|
def resource_text(filename: str) -> str:
|
|
files = importlib.resources.files('pre_commit.resources')
|
|
return files.joinpath(filename).read_text()
|
|
|
|
|
|
def make_executable(filename: str) -> None:
|
|
original_mode = os.stat(filename).st_mode
|
|
new_mode = original_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
|
|
os.chmod(filename, new_mode)
|
|
|
|
|
|
class CalledProcessError(RuntimeError):
|
|
def __init__(
|
|
self,
|
|
returncode: int,
|
|
cmd: tuple[str, ...],
|
|
stdout: bytes,
|
|
stderr: bytes | None,
|
|
) -> None:
|
|
super().__init__(returncode, cmd, stdout, stderr)
|
|
self.returncode = returncode
|
|
self.cmd = cmd
|
|
self.stdout = stdout
|
|
self.stderr = stderr
|
|
|
|
def __bytes__(self) -> bytes:
|
|
def _indent_or_none(part: bytes | None) -> bytes:
|
|
if part:
|
|
return b'\n ' + part.replace(b'\n', b'\n ').rstrip()
|
|
else:
|
|
return b' (none)'
|
|
|
|
return b''.join((
|
|
f'command: {self.cmd!r}\n'.encode(),
|
|
f'return code: {self.returncode}\n'.encode(),
|
|
b'stdout:', _indent_or_none(self.stdout), b'\n',
|
|
b'stderr:', _indent_or_none(self.stderr),
|
|
))
|
|
|
|
def __str__(self) -> str:
|
|
return self.__bytes__().decode()
|
|
|
|
|
|
def _setdefault_kwargs(kwargs: dict[str, Any]) -> None:
|
|
for arg in ('stdin', 'stdout', 'stderr'):
|
|
kwargs.setdefault(arg, subprocess.PIPE)
|
|
|
|
|
|
def _oserror_to_output(e: OSError) -> tuple[int, bytes, None]:
|
|
return 1, force_bytes(e).rstrip(b'\n') + b'\n', None
|
|
|
|
|
|
def cmd_output_b(
|
|
*cmd: str,
|
|
check: bool = True,
|
|
**kwargs: Any,
|
|
) -> tuple[int, bytes, bytes | None]:
|
|
_setdefault_kwargs(kwargs)
|
|
|
|
try:
|
|
cmd = parse_shebang.normalize_cmd(cmd, env=kwargs.get('env'))
|
|
except parse_shebang.ExecutableNotFoundError as e:
|
|
returncode, stdout_b, stderr_b = e.to_output()
|
|
else:
|
|
try:
|
|
proc = subprocess.Popen(cmd, **kwargs)
|
|
except OSError as e:
|
|
returncode, stdout_b, stderr_b = _oserror_to_output(e)
|
|
else:
|
|
stdout_b, stderr_b = proc.communicate()
|
|
returncode = proc.returncode
|
|
|
|
if check and returncode:
|
|
raise CalledProcessError(returncode, cmd, stdout_b, stderr_b)
|
|
|
|
return returncode, stdout_b, stderr_b
|
|
|
|
|
|
def cmd_output(*cmd: str, **kwargs: Any) -> tuple[int, str, str | None]:
|
|
returncode, stdout_b, stderr_b = cmd_output_b(*cmd, **kwargs)
|
|
stdout = stdout_b.decode() if stdout_b is not None else None
|
|
stderr = stderr_b.decode() if stderr_b is not None else None
|
|
return returncode, stdout, stderr
|
|
|
|
|
|
if sys.platform != 'win32': # pragma: win32 no cover
|
|
from os import openpty
|
|
import termios
|
|
|
|
class Pty:
|
|
def __init__(self) -> None:
|
|
self.r: int | None = None
|
|
self.w: int | None = None
|
|
|
|
def __enter__(self) -> Pty:
|
|
self.r, self.w = openpty()
|
|
|
|
# tty flags normally change \n to \r\n
|
|
attrs = termios.tcgetattr(self.w)
|
|
assert isinstance(attrs[1], int)
|
|
attrs[1] &= ~(termios.ONLCR | termios.OPOST)
|
|
termios.tcsetattr(self.w, termios.TCSANOW, attrs)
|
|
|
|
return self
|
|
|
|
def close_w(self) -> None:
|
|
if self.w is not None:
|
|
os.close(self.w)
|
|
self.w = None
|
|
|
|
def close_r(self) -> None:
|
|
assert self.r is not None
|
|
os.close(self.r)
|
|
self.r = None
|
|
|
|
def __exit__(
|
|
self,
|
|
exc_type: type[BaseException] | None,
|
|
exc_value: BaseException | None,
|
|
traceback: TracebackType | None,
|
|
) -> None:
|
|
self.close_w()
|
|
self.close_r()
|
|
|
|
def cmd_output_p(
|
|
*cmd: str,
|
|
check: bool = True,
|
|
**kwargs: Any,
|
|
) -> tuple[int, bytes, bytes | None]:
|
|
assert check is False
|
|
assert kwargs['stderr'] == subprocess.STDOUT, kwargs['stderr']
|
|
_setdefault_kwargs(kwargs)
|
|
|
|
try:
|
|
cmd = parse_shebang.normalize_cmd(cmd)
|
|
except parse_shebang.ExecutableNotFoundError as e:
|
|
return e.to_output()
|
|
|
|
with open(os.devnull) as devnull, Pty() as pty:
|
|
assert pty.r is not None
|
|
kwargs.update({'stdin': devnull, 'stdout': pty.w, 'stderr': pty.w})
|
|
try:
|
|
proc = subprocess.Popen(cmd, **kwargs)
|
|
except OSError as e:
|
|
return _oserror_to_output(e)
|
|
|
|
pty.close_w()
|
|
|
|
buf = b''
|
|
while True:
|
|
try:
|
|
bts = os.read(pty.r, 4096)
|
|
except OSError as e:
|
|
if e.errno == errno.EIO:
|
|
bts = b''
|
|
else:
|
|
raise
|
|
else:
|
|
buf += bts
|
|
if not bts:
|
|
break
|
|
|
|
return proc.wait(), buf, None
|
|
else: # pragma: no cover
|
|
cmd_output_p = cmd_output_b
|
|
|
|
|
|
def _handle_readonly(
|
|
func: Callable[[str], object],
|
|
path: str,
|
|
exc: BaseException,
|
|
) -> None:
|
|
if (
|
|
func in (os.rmdir, os.remove, os.unlink) and
|
|
isinstance(exc, OSError) and
|
|
exc.errno in {errno.EACCES, errno.EPERM}
|
|
):
|
|
for p in (path, os.path.dirname(path)):
|
|
os.chmod(p, os.stat(p).st_mode | stat.S_IWUSR)
|
|
func(path)
|
|
else:
|
|
raise
|
|
|
|
|
|
if sys.version_info < (3, 12): # pragma: <3.12 cover
|
|
def _handle_readonly_old(
|
|
func: Callable[[str], object],
|
|
path: str,
|
|
excinfo: tuple[type[BaseException], BaseException, TracebackType],
|
|
) -> None:
|
|
return _handle_readonly(func, path, excinfo[1])
|
|
|
|
def rmtree(path: str) -> None:
|
|
shutil.rmtree(path, ignore_errors=False, onerror=_handle_readonly_old)
|
|
else: # pragma: >=3.12 cover
|
|
def rmtree(path: str) -> None:
|
|
"""On windows, rmtree fails for readonly dirs."""
|
|
shutil.rmtree(path, ignore_errors=False, onexc=_handle_readonly)
|
|
|
|
|
|
def win_exe(s: str) -> str:
|
|
return s if sys.platform != 'win32' else f'{s}.exe'
|