
Adding upstream version 2.2.0.

Signed-off-by: Daniel Baumann <daniel@debian.org>
Daniel Baumann 2025-02-09 21:10:22 +01:00
parent 18c908e4f3
commit c0d06915b7
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
199 changed files with 14930 additions and 0 deletions

182
pre_commit/commands/autoupdate.py Normal file

@@ -0,0 +1,182 @@
import os.path
import re
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Sequence
from typing import Tuple
import pre_commit.constants as C
from pre_commit import git
from pre_commit import output
from pre_commit.clientlib import InvalidManifestError
from pre_commit.clientlib import load_config
from pre_commit.clientlib import load_manifest
from pre_commit.clientlib import LOCAL
from pre_commit.clientlib import META
from pre_commit.commands.migrate_config import migrate_config
from pre_commit.store import Store
from pre_commit.util import CalledProcessError
from pre_commit.util import cmd_output
from pre_commit.util import cmd_output_b
from pre_commit.util import tmpdir
from pre_commit.util import yaml_dump
from pre_commit.util import yaml_load
class RevInfo(NamedTuple):
repo: str
rev: str
frozen: Optional[str]
@classmethod
def from_config(cls, config: Dict[str, Any]) -> 'RevInfo':
return cls(config['repo'], config['rev'], None)
def update(self, tags_only: bool, freeze: bool) -> 'RevInfo':
if tags_only:
tag_cmd = ('git', 'describe', 'FETCH_HEAD', '--tags', '--abbrev=0')
else:
tag_cmd = ('git', 'describe', 'FETCH_HEAD', '--tags', '--exact')
with tmpdir() as tmp:
git.init_repo(tmp, self.repo)
cmd_output_b('git', 'fetch', 'origin', 'HEAD', '--tags', cwd=tmp)
try:
rev = cmd_output(*tag_cmd, cwd=tmp)[1].strip()
except CalledProcessError:
cmd = ('git', 'rev-parse', 'FETCH_HEAD')
rev = cmd_output(*cmd, cwd=tmp)[1].strip()
frozen = None
if freeze:
exact = cmd_output('git', 'rev-parse', rev, cwd=tmp)[1].strip()
if exact != rev:
rev, frozen = exact, rev
return self._replace(rev=rev, frozen=frozen)
class RepositoryCannotBeUpdatedError(RuntimeError):
pass
def _check_hooks_still_exist_at_rev(
repo_config: Dict[str, Any],
info: RevInfo,
store: Store,
) -> None:
try:
path = store.clone(repo_config['repo'], info.rev)
manifest = load_manifest(os.path.join(path, C.MANIFEST_FILE))
except InvalidManifestError as e:
raise RepositoryCannotBeUpdatedError(str(e))
# See if any of our hooks were deleted with the new commits
hooks = {hook['id'] for hook in repo_config['hooks']}
hooks_missing = hooks - {hook['id'] for hook in manifest}
if hooks_missing:
raise RepositoryCannotBeUpdatedError(
f'Cannot update because the tip of HEAD is missing these hooks:\n'
f'{", ".join(sorted(hooks_missing))}',
)
REV_LINE_RE = re.compile(r'^(\s+)rev:(\s*)([^\s#]+)(.*)(\r?\n)$', re.DOTALL)
def _original_lines(
path: str,
rev_infos: List[Optional[RevInfo]],
retry: bool = False,
) -> Tuple[List[str], List[int]]:
"""detect `rev:` lines or reformat the file"""
with open(path) as f:
original = f.read()
lines = original.splitlines(True)
idxs = [i for i, line in enumerate(lines) if REV_LINE_RE.match(line)]
if len(idxs) == len(rev_infos):
return lines, idxs
elif retry:
raise AssertionError('could not find rev lines')
else:
with open(path, 'w') as f:
f.write(yaml_dump(yaml_load(original)))
return _original_lines(path, rev_infos, retry=True)
def _write_new_config(path: str, rev_infos: List[Optional[RevInfo]]) -> None:
lines, idxs = _original_lines(path, rev_infos)
for idx, rev_info in zip(idxs, rev_infos):
if rev_info is None:
continue
match = REV_LINE_RE.match(lines[idx])
assert match is not None
new_rev_s = yaml_dump({'rev': rev_info.rev})
new_rev = new_rev_s.split(':', 1)[1].strip()
if rev_info.frozen is not None:
comment = f' # frozen: {rev_info.frozen}'
elif match[4].strip().startswith('# frozen:'):
comment = ''
else:
comment = match[4]
lines[idx] = f'{match[1]}rev:{match[2]}{new_rev}{comment}{match[5]}'
with open(path, 'w') as f:
f.write(''.join(lines))
def autoupdate(
config_file: str,
store: Store,
tags_only: bool,
freeze: bool,
repos: Sequence[str] = (),
) -> int:
"""Auto-update the pre-commit config to the latest versions of repos."""
migrate_config(config_file, quiet=True)
retv = 0
rev_infos: List[Optional[RevInfo]] = []
changed = False
config = load_config(config_file)
for repo_config in config['repos']:
if repo_config['repo'] in {LOCAL, META}:
continue
info = RevInfo.from_config(repo_config)
if repos and info.repo not in repos:
rev_infos.append(None)
continue
output.write(f'Updating {info.repo} ... ')
new_info = info.update(tags_only=tags_only, freeze=freeze)
try:
_check_hooks_still_exist_at_rev(repo_config, new_info, store)
except RepositoryCannotBeUpdatedError as error:
output.write_line(error.args[0])
rev_infos.append(None)
retv = 1
continue
if new_info.rev != info.rev:
changed = True
if new_info.frozen:
updated_to = f'{new_info.frozen} (frozen)'
else:
updated_to = new_info.rev
msg = f'updating {info.rev} -> {updated_to}.'
output.write_line(msg)
rev_infos.append(new_info)
else:
output.write_line('already up to date.')
rev_infos.append(None)
if changed:
_write_new_config(config_file, rev_infos)
return retv
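A standalone sketch (not part of the commit, revisions made up) of how the REV_LINE_RE pattern used by _write_new_config isolates the revision while preserving indentation, spacing, and any trailing comment:

import re

REV_LINE_RE = re.compile(r'^(\s+)rev:(\s*)([^\s#]+)(.*)(\r?\n)$', re.DOTALL)

line = '    rev: v2.4.0  # pinned\n'
match = REV_LINE_RE.match(line)
assert match is not None
indent, spacing, old_rev, comment, newline = match.groups()
# indent='    ', spacing=' ', old_rev='v2.4.0', comment='  # pinned'
print(f'{indent}rev:{spacing}v3.0.0{comment}{newline}', end='')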

14
pre_commit/commands/clean.py Normal file

@@ -0,0 +1,14 @@
import os.path
from pre_commit import output
from pre_commit.store import Store
from pre_commit.util import rmtree
def clean(store: Store) -> int:
legacy_path = os.path.expanduser('~/.pre-commit')
for directory in (store.directory, legacy_path):
if os.path.exists(directory):
rmtree(directory)
output.write_line(f'Cleaned {directory}.')
return 0
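A hypothetical invocation sketch: calling clean programmatically does the same as `pre-commit clean` on the command line, assuming the default Store() cache location:

from pre_commit.commands.clean import clean
from pre_commit.store import Store

if __name__ == '__main__':
    # removes the cache directory and the legacy ~/.pre-commit directory
    raise SystemExit(clean(Store()))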

90
pre_commit/commands/gc.py Normal file

@@ -0,0 +1,90 @@
import os.path
from typing import Any
from typing import Dict
from typing import Set
from typing import Tuple
import pre_commit.constants as C
from pre_commit import output
from pre_commit.clientlib import InvalidConfigError
from pre_commit.clientlib import InvalidManifestError
from pre_commit.clientlib import load_config
from pre_commit.clientlib import load_manifest
from pre_commit.clientlib import LOCAL
from pre_commit.clientlib import META
from pre_commit.store import Store
def _mark_used_repos(
store: Store,
all_repos: Dict[Tuple[str, str], str],
unused_repos: Set[Tuple[str, str]],
repo: Dict[str, Any],
) -> None:
if repo['repo'] == META:
return
elif repo['repo'] == LOCAL:
for hook in repo['hooks']:
deps = hook.get('additional_dependencies')
unused_repos.discard((
store.db_repo_name(repo['repo'], deps), C.LOCAL_REPO_VERSION,
))
else:
key = (repo['repo'], repo['rev'])
path = all_repos.get(key)
# can't inspect manifest if it isn't cloned
if path is None:
return
try:
manifest = load_manifest(os.path.join(path, C.MANIFEST_FILE))
except InvalidManifestError:
return
else:
unused_repos.discard(key)
by_id = {hook['id']: hook for hook in manifest}
for hook in repo['hooks']:
if hook['id'] not in by_id:
continue
deps = hook.get(
'additional_dependencies',
by_id[hook['id']]['additional_dependencies'],
)
unused_repos.discard((
store.db_repo_name(repo['repo'], deps), repo['rev'],
))
def _gc_repos(store: Store) -> int:
configs = store.select_all_configs()
repos = store.select_all_repos()
# delete config paths which do not exist
dead_configs = [p for p in configs if not os.path.exists(p)]
live_configs = [p for p in configs if os.path.exists(p)]
all_repos = {(repo, ref): path for repo, ref, path in repos}
unused_repos = set(all_repos)
for config_path in live_configs:
try:
config = load_config(config_path)
except InvalidConfigError:
dead_configs.append(config_path)
continue
else:
for repo in config['repos']:
_mark_used_repos(store, all_repos, unused_repos, repo)
store.delete_configs(dead_configs)
for db_repo_name, ref in unused_repos:
store.delete_repo(db_repo_name, ref, all_repos[(db_repo_name, ref)])
return len(unused_repos)
def gc(store: Store) -> int:
with store.exclusive_lock():
repos_removed = _gc_repos(store)
output.write_line(f'{repos_removed} repo(s) removed.')
return 0
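A standalone sketch of the mark-and-sweep idea behind _gc_repos: every cloned repo starts out unused, references found in live configs are discarded from the set, and whatever remains gets deleted (the repo names and paths below are made up):

all_repos = {
    ('https://github.com/pre-commit/pre-commit-hooks', 'v2.4.0'): '/cache/repo_abc',
    ('https://github.com/psf/black', '19.10b0'): '/cache/repo_def',
}
unused_repos = set(all_repos)

# a live config still references pre-commit-hooks at v2.4.0 ...
unused_repos.discard(('https://github.com/pre-commit/pre-commit-hooks', 'v2.4.0'))

# ... so only the stale black clone remains to be removed
assert unused_repos == {('https://github.com/psf/black', '19.10b0')}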

187
pre_commit/commands/hook_impl.py Normal file

@@ -0,0 +1,187 @@
import argparse
import os.path
import subprocess
import sys
from typing import Optional
from typing import Sequence
from typing import Tuple
from pre_commit.commands.run import run
from pre_commit.envcontext import envcontext
from pre_commit.parse_shebang import normalize_cmd
from pre_commit.store import Store
Z40 = '0' * 40
def _run_legacy(
hook_type: str,
hook_dir: str,
args: Sequence[str],
) -> Tuple[int, bytes]:
if os.environ.get('PRE_COMMIT_RUNNING_LEGACY'):
raise SystemExit(
f"bug: pre-commit's script is installed in migration mode\n"
f'run `pre-commit install -f --hook-type {hook_type}` to fix '
f'this\n\n'
f'Please report this bug at '
f'https://github.com/pre-commit/pre-commit/issues',
)
if hook_type == 'pre-push':
stdin = sys.stdin.buffer.read()
else:
stdin = b''
# not running in legacy mode
legacy_hook = os.path.join(hook_dir, f'{hook_type}.legacy')
if not os.access(legacy_hook, os.X_OK):
return 0, stdin
with envcontext((('PRE_COMMIT_RUNNING_LEGACY', '1'),)):
cmd = normalize_cmd((legacy_hook, *args))
return subprocess.run(cmd, input=stdin).returncode, stdin
def _validate_config(
retv: int,
config: str,
skip_on_missing_config: bool,
) -> None:
if not os.path.isfile(config):
if skip_on_missing_config or os.getenv('PRE_COMMIT_ALLOW_NO_CONFIG'):
print(f'`{config}` config file not found. Skipping `pre-commit`.')
raise SystemExit(retv)
else:
print(
f'No {config} file was found\n'
f'- To temporarily silence this, run '
f'`PRE_COMMIT_ALLOW_NO_CONFIG=1 git ...`\n'
f'- To permanently silence this, install pre-commit with the '
f'--allow-missing-config option\n'
f'- To uninstall pre-commit run `pre-commit uninstall`',
)
raise SystemExit(1)
def _ns(
hook_type: str,
color: bool,
*,
all_files: bool = False,
from_ref: Optional[str] = None,
to_ref: Optional[str] = None,
remote_name: Optional[str] = None,
remote_url: Optional[str] = None,
commit_msg_filename: Optional[str] = None,
checkout_type: Optional[str] = None,
) -> argparse.Namespace:
return argparse.Namespace(
color=color,
hook_stage=hook_type.replace('pre-', ''),
from_ref=from_ref,
to_ref=to_ref,
remote_name=remote_name,
remote_url=remote_url,
commit_msg_filename=commit_msg_filename,
all_files=all_files,
checkout_type=checkout_type,
files=(),
hook=None,
verbose=False,
show_diff_on_failure=False,
)
def _rev_exists(rev: str) -> bool:
return not subprocess.call(('git', 'rev-list', '--quiet', rev))
def _pre_push_ns(
color: bool,
args: Sequence[str],
stdin: bytes,
) -> Optional[argparse.Namespace]:
remote_name = args[0]
remote_url = args[1]
for line in stdin.decode().splitlines():
_, local_sha, _, remote_sha = line.split()
if local_sha == Z40:
continue
elif remote_sha != Z40 and _rev_exists(remote_sha):
return _ns(
'pre-push', color,
from_ref=remote_sha, to_ref=local_sha,
remote_name=remote_name, remote_url=remote_url,
)
else:
# ancestors not found in remote
ancestors = subprocess.check_output((
'git', 'rev-list', local_sha, '--topo-order', '--reverse',
'--not', f'--remotes={remote_name}',
)).decode().strip()
if not ancestors:
continue
else:
first_ancestor = ancestors.splitlines()[0]
cmd = ('git', 'rev-list', '--max-parents=0', local_sha)
roots = set(subprocess.check_output(cmd).decode().splitlines())
if first_ancestor in roots:
# pushing the whole tree including root commit
return _ns(
'pre-push', color,
all_files=True,
remote_name=remote_name, remote_url=remote_url,
)
else:
rev_cmd = ('git', 'rev-parse', f'{first_ancestor}^')
source = subprocess.check_output(rev_cmd).decode().strip()
return _ns(
'pre-push', color,
from_ref=source, to_ref=local_sha,
remote_name=remote_name, remote_url=remote_url,
)
# nothing to push
return None
def _run_ns(
hook_type: str,
color: bool,
args: Sequence[str],
stdin: bytes,
) -> Optional[argparse.Namespace]:
if hook_type == 'pre-push':
return _pre_push_ns(color, args, stdin)
elif hook_type in {'prepare-commit-msg', 'commit-msg'}:
return _ns(hook_type, color, commit_msg_filename=args[0])
elif hook_type in {'pre-merge-commit', 'pre-commit'}:
return _ns(hook_type, color)
elif hook_type == 'post-checkout':
return _ns(
hook_type, color,
from_ref=args[0], to_ref=args[1], checkout_type=args[2],
)
else:
raise AssertionError(f'unexpected hook type: {hook_type}')
def hook_impl(
store: Store,
*,
config: str,
color: bool,
hook_type: str,
hook_dir: str,
skip_on_missing_config: bool,
args: Sequence[str],
) -> int:
retv, stdin = _run_legacy(hook_type, hook_dir, args)
_validate_config(retv, config, skip_on_missing_config)
ns = _run_ns(hook_type, color, args, stdin)
if ns is None:
return retv
else:
return retv | run(config, store, ns)
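For reference, a standalone sketch of the stdin format `git push` hands to the pre-push hook and how _pre_push_ns interprets the all-zero sha (the shas below are fabricated):

Z40 = '0' * 40

stdin = (
    b'refs/heads/main 1111111111111111111111111111111111111111 '
    b'refs/heads/main 2222222222222222222222222222222222222222\n'
)
for line in stdin.decode().splitlines():
    _, local_sha, _, remote_sha = line.split()
    if local_sha == Z40:
        print('branch deletion: nothing to run')
    elif remote_sha == Z40:
        print('new branch: hooks run against discovered ancestors')
    else:
        print(f'hooks run on the range {remote_sha[:7]}..{local_sha[:7]}')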

33
pre_commit/commands/init_templatedir.py Normal file

@@ -0,0 +1,33 @@
import logging
import os.path
from typing import Sequence
from pre_commit.commands.install_uninstall import install
from pre_commit.store import Store
from pre_commit.util import CalledProcessError
from pre_commit.util import cmd_output
logger = logging.getLogger('pre_commit')
def init_templatedir(
config_file: str,
store: Store,
directory: str,
hook_types: Sequence[str],
) -> int:
install(
config_file, store, hook_types=hook_types,
overwrite=True, skip_on_missing_config=True, git_dir=directory,
)
try:
_, out, _ = cmd_output('git', 'config', 'init.templateDir')
except CalledProcessError:
configured_path = None
else:
configured_path = os.path.realpath(os.path.expanduser(out.strip()))
dest = os.path.realpath(directory)
if configured_path != dest:
logger.warning('`init.templateDir` not set to the target directory')
logger.warning(f'maybe `git config --global init.templateDir {dest}`?')
return 0
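A hypothetical usage sketch (paths are made up): populating a git template directory so freshly initialized or cloned repositories pick up the hook automatically, followed by the `git config` step the warning above asks for:

import os.path

from pre_commit.commands.init_templatedir import init_templatedir
from pre_commit.store import Store

template_dir = os.path.expanduser('~/.git-template')
init_templatedir(
    '.pre-commit-config.yaml',
    Store(),
    template_dir,
    hook_types=['pre-commit'],
)
# then: git config --global init.templateDir ~/.git-template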

175
pre_commit/commands/install_uninstall.py Normal file

@@ -0,0 +1,175 @@
import itertools
import logging
import os.path
import shutil
import sys
from typing import Optional
from typing import Sequence
from typing import Tuple
from pre_commit import git
from pre_commit import output
from pre_commit.clientlib import load_config
from pre_commit.repository import all_hooks
from pre_commit.repository import install_hook_envs
from pre_commit.store import Store
from pre_commit.util import make_executable
from pre_commit.util import resource_text
logger = logging.getLogger(__name__)
# This is used to identify the hook file we install
PRIOR_HASHES = (
'4d9958c90bc262f47553e2c073f14cfe',
'd8ee923c46731b42cd95cc869add4062',
'49fd668cb42069aa1b6048464be5d395',
'79f09a650522a87b0da915d0d983b2de',
'e358c9dae00eac5d06b38dfdb1e33a8c',
)
CURRENT_HASH = '138fd403232d2ddd5efb44317e38bf03'
TEMPLATE_START = '# start templated\n'
TEMPLATE_END = '# end templated\n'
# Homebrew/homebrew-core#35825: be more timid about appropriate `PATH`
# #1312 os.defpath is too restrictive on BSD
POSIX_SEARCH_PATH = ('/usr/local/bin', '/usr/bin', '/bin')
SYS_EXE = os.path.basename(os.path.realpath(sys.executable))
def _hook_paths(
hook_type: str,
git_dir: Optional[str] = None,
) -> Tuple[str, str]:
git_dir = git_dir if git_dir is not None else git.get_git_dir()
pth = os.path.join(git_dir, 'hooks', hook_type)
return pth, f'{pth}.legacy'
def is_our_script(filename: str) -> bool:
if not os.path.exists(filename): # pragma: win32 no cover (symlink)
return False
with open(filename) as f:
contents = f.read()
return any(h in contents for h in (CURRENT_HASH,) + PRIOR_HASHES)
def shebang() -> str:
if sys.platform == 'win32':
py = SYS_EXE
else:
exe_choices = [
f'python{sys.version_info[0]}.{sys.version_info[1]}',
f'python{sys.version_info[0]}',
]
# avoid searching for bare `python` as it's likely to be python 2
if SYS_EXE != 'python':
exe_choices.append(SYS_EXE)
for path, exe in itertools.product(POSIX_SEARCH_PATH, exe_choices):
if os.access(os.path.join(path, exe), os.X_OK):
py = exe
break
else:
py = SYS_EXE
return f'#!/usr/bin/env {py}'
def _install_hook_script(
config_file: str,
hook_type: str,
overwrite: bool = False,
skip_on_missing_config: bool = False,
git_dir: Optional[str] = None,
) -> None:
hook_path, legacy_path = _hook_paths(hook_type, git_dir=git_dir)
os.makedirs(os.path.dirname(hook_path), exist_ok=True)
# If we have an existing hook, move it to pre-commit.legacy
if os.path.lexists(hook_path) and not is_our_script(hook_path):
shutil.move(hook_path, legacy_path)
# If we specify overwrite, we simply delete the legacy file
if overwrite and os.path.exists(legacy_path):
os.remove(legacy_path)
elif os.path.exists(legacy_path):
output.write_line(
f'Running in migration mode with existing hooks at {legacy_path}\n'
f'Use -f to use only pre-commit.',
)
args = ['hook-impl', f'--config={config_file}', f'--hook-type={hook_type}']
if skip_on_missing_config:
args.append('--skip-on-missing-config')
params = {'INSTALL_PYTHON': sys.executable, 'ARGS': args}
with open(hook_path, 'w') as hook_file:
contents = resource_text('hook-tmpl')
before, rest = contents.split(TEMPLATE_START)
to_template, after = rest.split(TEMPLATE_END)
before = before.replace('#!/usr/bin/env python3', shebang())
hook_file.write(before + TEMPLATE_START)
for line in to_template.splitlines():
var = line.split()[0]
hook_file.write(f'{var} = {params[var]!r}\n')
hook_file.write(TEMPLATE_END + after)
make_executable(hook_path)
output.write_line(f'pre-commit installed at {hook_path}')
def install(
config_file: str,
store: Store,
hook_types: Sequence[str],
overwrite: bool = False,
hooks: bool = False,
skip_on_missing_config: bool = False,
git_dir: Optional[str] = None,
) -> int:
if git_dir is None and git.has_core_hookpaths_set():
logger.error(
'Cowardly refusing to install hooks with `core.hooksPath` set.\n'
'hint: `git config --unset-all core.hooksPath`',
)
return 1
for hook_type in hook_types:
_install_hook_script(
config_file, hook_type,
overwrite=overwrite,
skip_on_missing_config=skip_on_missing_config,
git_dir=git_dir,
)
if hooks:
install_hooks(config_file, store)
return 0
def install_hooks(config_file: str, store: Store) -> int:
install_hook_envs(all_hooks(load_config(config_file), store), store)
return 0
def _uninstall_hook_script(hook_type: str) -> None:
hook_path, legacy_path = _hook_paths(hook_type)
# If our file doesn't exist or it isn't ours, gtfo.
if not os.path.exists(hook_path) or not is_our_script(hook_path):
return
os.remove(hook_path)
output.write_line(f'{hook_type} uninstalled')
if os.path.exists(legacy_path):
os.rename(legacy_path, hook_path)
output.write_line(f'Restored previous hooks to {hook_path}')
def uninstall(hook_types: Sequence[str]) -> int:
for hook_type in hook_types:
_uninstall_hook_script(hook_type)
return 0
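For illustration, the templated region that _install_hook_script writes into .git/hooks/<hook-type> ends up looking roughly like this (values below are made up; only the lines between the markers are rewritten, using repr() of each parameter):

# start templated
INSTALL_PYTHON = '/usr/bin/python3'
ARGS = ['hook-impl', '--config=.pre-commit-config.yaml', '--hook-type=pre-commit']
# end templated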

59
pre_commit/commands/migrate_config.py Normal file

@@ -0,0 +1,59 @@
import re
import yaml
from pre_commit.util import yaml_load
def _indent(s: str) -> str:
lines = s.splitlines(True)
return ''.join(' ' * 4 + line if line.strip() else line for line in lines)
def _is_header_line(line: str) -> bool:
return line.startswith(('#', '---')) or not line.strip()
def _migrate_map(contents: str) -> str:
# Find the first non-header line
lines = contents.splitlines(True)
i = 0
# Only loop on non empty configuration file
while i < len(lines) and _is_header_line(lines[i]):
i += 1
header = ''.join(lines[:i])
rest = ''.join(lines[i:])
if isinstance(yaml_load(contents), list):
# If they are using the "default" flow style of yaml, this operation
# will yield a valid configuration
try:
trial_contents = f'{header}repos:\n{rest}'
yaml_load(trial_contents)
contents = trial_contents
except yaml.YAMLError:
contents = f'{header}repos:\n{_indent(rest)}'
return contents
def _migrate_sha_to_rev(contents: str) -> str:
return re.sub(r'(\n\s+)sha:', r'\1rev:', contents)
def migrate_config(config_file: str, quiet: bool = False) -> int:
with open(config_file) as f:
orig_contents = contents = f.read()
contents = _migrate_map(contents)
contents = _migrate_sha_to_rev(contents)
if contents != orig_contents:
with open(config_file, 'w') as f:
f.write(contents)
print('Configuration has been migrated.')
elif not quiet:
print('Configuration is already migrated.')
return 0
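A standalone sketch of the two transformations applied above to a pre-1.0 style config: the top-level list is nested under a `repos:` key and `sha:` becomes `rev:`:

import re

OLD = '''\
-   repo: https://github.com/pre-commit/pre-commit-hooks
    sha: v2.4.0
    hooks:
    -   id: trailing-whitespace
'''

contents = 'repos:\n' + OLD                             # what _migrate_map yields here
contents = re.sub(r'(\n\s+)sha:', r'\1rev:', contents)  # _migrate_sha_to_rev
print(contents, end='')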

360
pre_commit/commands/run.py Normal file

@@ -0,0 +1,360 @@
import argparse
import contextlib
import functools
import logging
import os
import re
import subprocess
import time
from typing import Any
from typing import Collection
from typing import Dict
from typing import List
from typing import Sequence
from typing import Set
from typing import Tuple
from identify.identify import tags_from_path
from pre_commit import color
from pre_commit import git
from pre_commit import output
from pre_commit.clientlib import load_config
from pre_commit.hook import Hook
from pre_commit.languages.all import languages
from pre_commit.repository import all_hooks
from pre_commit.repository import install_hook_envs
from pre_commit.staged_files_only import staged_files_only
from pre_commit.store import Store
from pre_commit.util import cmd_output_b
from pre_commit.util import EnvironT
logger = logging.getLogger('pre_commit')
def _start_msg(*, start: str, cols: int, end_len: int) -> str:
dots = '.' * (cols - len(start) - end_len - 1)
return f'{start}{dots}'
def _full_msg(
*,
start: str,
cols: int,
end_msg: str,
end_color: str,
use_color: bool,
postfix: str = '',
) -> str:
dots = '.' * (cols - len(start) - len(postfix) - len(end_msg) - 1)
end = color.format_color(end_msg, end_color, use_color)
return f'{start}{dots}{postfix}{end}\n'
def filter_by_include_exclude(
names: Collection[str],
include: str,
exclude: str,
) -> List[str]:
include_re, exclude_re = re.compile(include), re.compile(exclude)
return [
filename for filename in names
if include_re.search(filename)
if not exclude_re.search(filename)
]
class Classifier:
def __init__(self, filenames: Sequence[str]) -> None:
# on windows we normalize all filenames to use forward slashes
# this makes it easier to filter using the `files:` regex
# this also makes improperly quoted shell-based hooks work better
# see #1173
if os.altsep == '/' and os.sep == '\\':
filenames = [f.replace(os.sep, os.altsep) for f in filenames]
self.filenames = [f for f in filenames if os.path.lexists(f)]
@functools.lru_cache(maxsize=None)
def _types_for_file(self, filename: str) -> Set[str]:
return tags_from_path(filename)
def by_types(
self,
names: Sequence[str],
types: Collection[str],
exclude_types: Collection[str],
) -> List[str]:
types, exclude_types = frozenset(types), frozenset(exclude_types)
ret = []
for filename in names:
tags = self._types_for_file(filename)
if tags >= types and not tags & exclude_types:
ret.append(filename)
return ret
def filenames_for_hook(self, hook: Hook) -> Tuple[str, ...]:
names = self.filenames
names = filter_by_include_exclude(names, hook.files, hook.exclude)
names = self.by_types(names, hook.types, hook.exclude_types)
return tuple(names)
def _get_skips(environ: EnvironT) -> Set[str]:
skips = environ.get('SKIP', '')
return {skip.strip() for skip in skips.split(',') if skip.strip()}
SKIPPED = 'Skipped'
NO_FILES = '(no files to check)'
def _subtle_line(s: str, use_color: bool) -> None:
output.write_line(color.format_color(s, color.SUBTLE, use_color))
def _run_single_hook(
classifier: Classifier,
hook: Hook,
skips: Set[str],
cols: int,
verbose: bool,
use_color: bool,
) -> bool:
filenames = classifier.filenames_for_hook(hook)
if hook.id in skips or hook.alias in skips:
output.write(
_full_msg(
start=hook.name,
end_msg=SKIPPED,
end_color=color.YELLOW,
use_color=use_color,
cols=cols,
),
)
duration = None
retcode = 0
files_modified = False
out = b''
elif not filenames and not hook.always_run:
output.write(
_full_msg(
start=hook.name,
postfix=NO_FILES,
end_msg=SKIPPED,
end_color=color.TURQUOISE,
use_color=use_color,
cols=cols,
),
)
duration = None
retcode = 0
files_modified = False
out = b''
else:
# print hook and dots first in case the hook takes a while to run
output.write(_start_msg(start=hook.name, end_len=6, cols=cols))
diff_cmd = ('git', 'diff', '--no-ext-diff')
diff_before = cmd_output_b(*diff_cmd, retcode=None)
if not hook.pass_filenames:
filenames = ()
time_before = time.time()
language = languages[hook.language]
retcode, out = language.run_hook(hook, filenames, use_color)
duration = round(time.time() - time_before, 2) or 0
diff_after = cmd_output_b(*diff_cmd, retcode=None)
# if the hook makes changes, fail the commit
files_modified = diff_before != diff_after
if retcode or files_modified:
print_color = color.RED
status = 'Failed'
else:
print_color = color.GREEN
status = 'Passed'
output.write_line(color.format_color(status, print_color, use_color))
if verbose or hook.verbose or retcode or files_modified:
_subtle_line(f'- hook id: {hook.id}', use_color)
if (verbose or hook.verbose) and duration is not None:
_subtle_line(f'- duration: {duration}s', use_color)
if retcode:
_subtle_line(f'- exit code: {retcode}', use_color)
# Print a message if failing due to file modifications
if files_modified:
_subtle_line('- files were modified by this hook', use_color)
if out.strip():
output.write_line()
output.write_line_b(out.strip(), logfile_name=hook.log_file)
output.write_line()
return files_modified or bool(retcode)
def _compute_cols(hooks: Sequence[Hook]) -> int:
"""Compute the number of columns to display hook messages. The widest
that will be displayed is in the no files skipped case:
Hook name...(no files to check) Skipped
"""
if hooks:
name_len = max(len(hook.name) for hook in hooks)
else:
name_len = 0
cols = name_len + 3 + len(NO_FILES) + 1 + len(SKIPPED)
return max(cols, 80)
def _all_filenames(args: argparse.Namespace) -> Collection[str]:
if args.hook_stage == 'post-checkout': # no files for post-checkout
return ()
elif args.hook_stage in {'prepare-commit-msg', 'commit-msg'}:
return (args.commit_msg_filename,)
elif args.from_ref and args.to_ref:
return git.get_changed_files(args.from_ref, args.to_ref)
elif args.files:
return args.files
elif args.all_files:
return git.get_all_files()
elif git.is_in_merge_conflict():
return git.get_conflicted_files()
else:
return git.get_staged_files()
def _run_hooks(
config: Dict[str, Any],
hooks: Sequence[Hook],
args: argparse.Namespace,
environ: EnvironT,
) -> int:
"""Actually run the hooks."""
skips = _get_skips(environ)
cols = _compute_cols(hooks)
filenames = filter_by_include_exclude(
_all_filenames(args), config['files'], config['exclude'],
)
classifier = Classifier(filenames)
retval = 0
for hook in hooks:
retval |= _run_single_hook(
classifier, hook, skips, cols,
verbose=args.verbose, use_color=args.color,
)
if retval and config['fail_fast']:
break
if retval and args.show_diff_on_failure and git.has_diff():
if args.all_files:
output.write_line(
'pre-commit hook(s) made changes.\n'
'If you are seeing this message in CI, '
'reproduce locally with: `pre-commit run --all-files`.\n'
'To run `pre-commit` as part of git workflow, use '
'`pre-commit install`.',
)
output.write_line('All changes made by hooks:')
# args.color is a boolean.
# See user_color function in color.py
git_color_opt = 'always' if args.color else 'never'
subprocess.call((
'git', '--no-pager', 'diff', '--no-ext-diff',
f'--color={git_color_opt}',
))
return retval
def _has_unmerged_paths() -> bool:
_, stdout, _ = cmd_output_b('git', 'ls-files', '--unmerged')
return bool(stdout.strip())
def _has_unstaged_config(config_file: str) -> bool:
retcode, _, _ = cmd_output_b(
'git', 'diff', '--no-ext-diff', '--exit-code', config_file,
retcode=None,
)
# be explicit, other git errors don't mean it has an unstaged config.
return retcode == 1
def run(
config_file: str,
store: Store,
args: argparse.Namespace,
environ: EnvironT = os.environ,
) -> int:
stash = not args.all_files and not args.files
# Check if we have unresolved merge conflict files and fail fast.
if _has_unmerged_paths():
logger.error('Unmerged files. Resolve before committing.')
return 1
if bool(args.from_ref) != bool(args.to_ref):
logger.error('Specify both --from-ref and --to-ref.')
return 1
if stash and _has_unstaged_config(config_file):
logger.error(
f'Your pre-commit configuration is unstaged.\n'
f'`git add {config_file}` to fix this.',
)
return 1
if (
args.hook_stage in {'prepare-commit-msg', 'commit-msg'} and
not args.commit_msg_filename
):
logger.error(
f'`--commit-msg-filename` is required for '
f'`--hook-stage {args.hook_stage}`',
)
return 1
# Expose from-ref / to-ref as environment variables for hooks to consume
if args.from_ref and args.to_ref:
# legacy names
environ['PRE_COMMIT_ORIGIN'] = args.from_ref
environ['PRE_COMMIT_SOURCE'] = args.to_ref
# new names
environ['PRE_COMMIT_FROM_REF'] = args.from_ref
environ['PRE_COMMIT_TO_REF'] = args.to_ref
if args.remote_name and args.remote_url:
environ['PRE_COMMIT_REMOTE_NAME'] = args.remote_name
environ['PRE_COMMIT_REMOTE_URL'] = args.remote_url
if args.checkout_type:
environ['PRE_COMMIT_CHECKOUT_TYPE'] = args.checkout_type
with contextlib.ExitStack() as exit_stack:
if stash:
exit_stack.enter_context(staged_files_only(store.directory))
config = load_config(config_file)
hooks = [
hook
for hook in all_hooks(config, store)
if not args.hook or hook.id == args.hook or hook.alias == args.hook
if args.hook_stage in hook.stages
]
if args.hook and not hooks:
output.write_line(
f'No hook with id `{args.hook}` in stage `{args.hook_stage}`',
)
return 1
install_hook_envs(hooks, store)
return _run_hooks(config, hooks, args, environ)
# https://github.com/python/mypy/issues/7726
raise AssertionError('unreachable')
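A standalone sketch of the SKIP environment variable handling in _get_skips: a comma-separated list of hook ids, with empty entries and surrounding whitespace dropped (the hook ids below are examples):

environ = {'SKIP': 'flake8, ,black,'}
skips = {s.strip() for s in environ.get('SKIP', '').split(',') if s.strip()}
assert skips == {'flake8', 'black'}
# e.g. from a shell: SKIP=flake8,black git commit -m 'wip'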

21
pre_commit/commands/sample_config.py Normal file

@@ -0,0 +1,21 @@
# TODO: maybe `git ls-remote git://github.com/pre-commit/pre-commit-hooks` to
# determine the latest revision? This adds ~200ms from my tests (and is
# significantly faster than https:// or http://). For now, periodically
# manually updating the revision is fine.
SAMPLE_CONFIG = '''\
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
'''
def sample_config() -> int:
print(SAMPLE_CONFIG, end='')
return 0

77
pre_commit/commands/try_repo.py Normal file

@@ -0,0 +1,77 @@
import argparse
import logging
import os.path
from typing import Optional
from typing import Tuple
import pre_commit.constants as C
from pre_commit import git
from pre_commit import output
from pre_commit.clientlib import load_manifest
from pre_commit.commands.run import run
from pre_commit.store import Store
from pre_commit.util import cmd_output_b
from pre_commit.util import tmpdir
from pre_commit.util import yaml_dump
from pre_commit.xargs import xargs
logger = logging.getLogger(__name__)
def _repo_ref(tmpdir: str, repo: str, ref: Optional[str]) -> Tuple[str, str]:
# if `ref` is explicitly passed, use it
if ref is not None:
return repo, ref
ref = git.head_rev(repo)
# if it exists on disk, we'll try and clone it with the local changes
if os.path.exists(repo) and git.has_diff('HEAD', repo=repo):
logger.warning('Creating temporary repo with uncommitted changes...')
shadow = os.path.join(tmpdir, 'shadow-repo')
cmd_output_b('git', 'clone', repo, shadow)
cmd_output_b('git', 'checkout', ref, '-b', '_pc_tmp', cwd=shadow)
idx = git.git_path('index', repo=shadow)
objs = git.git_path('objects', repo=shadow)
env = dict(os.environ, GIT_INDEX_FILE=idx, GIT_OBJECT_DIRECTORY=objs)
staged_files = git.get_staged_files(cwd=repo)
if staged_files:
xargs(('git', 'add', '--'), staged_files, cwd=repo, env=env)
cmd_output_b('git', 'add', '-u', cwd=repo, env=env)
git.commit(repo=shadow)
return shadow, git.head_rev(shadow)
else:
return repo, ref
def try_repo(args: argparse.Namespace) -> int:
with tmpdir() as tempdir:
repo, ref = _repo_ref(tempdir, args.repo, args.ref)
store = Store(tempdir)
if args.hook:
hooks = [{'id': args.hook}]
else:
repo_path = store.clone(repo, ref)
manifest = load_manifest(os.path.join(repo_path, C.MANIFEST_FILE))
manifest = sorted(manifest, key=lambda hook: hook['id'])
hooks = [{'id': hook['id']} for hook in manifest]
config = {'repos': [{'repo': repo, 'rev': ref, 'hooks': hooks}]}
config_s = yaml_dump(config)
config_filename = os.path.join(tempdir, C.CONFIG_FILE)
with open(config_filename, 'w') as cfg:
cfg.write(config_s)
output.write_line('=' * 79)
output.write_line('Using config:')
output.write_line('=' * 79)
output.write(config_s)
output.write_line('=' * 79)
return run(config_filename, store, args)
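An illustrative sketch (repo, rev, and hook ids are made up) of the throwaway config that try_repo generates and prints between the `=` banners before running:

from pre_commit.util import yaml_dump

config = {
    'repos': [{
        'repo': 'https://github.com/pre-commit/pre-commit-hooks',
        'rev': 'v2.4.0',
        'hooks': [{'id': 'trailing-whitespace'}, {'id': 'check-yaml'}],
    }],
}
print(yaml_dump(config), end='')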