1
0
Fork 0

Merging upstream version 3.0.25.

Signed-off-by: Daniel Baumann <daniel@debian.org>
This commit is contained in:
Daniel Baumann 2025-02-09 18:31:41 +01:00
parent 8e41bc821f
commit 7159687519
Signed by: daniel
GPG key ID: FBB4F0E80A80222F
26 changed files with 770 additions and 530 deletions

View file

@ -22,13 +22,13 @@ jobs:
run: | run: |
sudo apt remove python3-pip sudo apt remove python3-pip
python -m pip install --upgrade pip python -m pip install --upgrade pip
python -m pip install . black isort mypy pytest readme_renderer python -m pip install . ruff mypy pytest readme_renderer
pip list pip list
- name: Type Checker - name: Type Checker
run: | run: |
mypy ptpython mypy ptpython
isort -c --profile black ptpython examples setup.py ruff .
black --check ptpython examples setup.py ruff format --check .
- name: Run Tests - name: Run Tests
run: | run: |
./tests/run_tests.py ./tests/run_tests.py

View file

@ -1,6 +1,33 @@
CHANGELOG CHANGELOG
========= =========
3.0.25: 2023-12-14
------------------
Fixes:
- Fix handling of 'config file does not exist' when embedding ptpython.
3.0.24: 2023-12-13
------------------
Fixes:
- Don't show "Impossible to read config file" warnings when no config file was
passed to `run_config()`.
- IPython integration fixes:
* Fix top-level await in IPython.
* Fix IPython `DeprecationWarning`.
- Output printing fixes:
* Paginate exceptions if pagination is enabled.
* Handle big outputs without running out of memory.
- Asyncio REPL improvements:
* From now on, passing `--asyncio` is required to activate the asyncio-REPL.
This will ensure that an event loop is created at the start in which we can
run top-level await statements.
* Use `get_running_loop()` instead of `get_event_loop()`.
* Better handling of `SystemExit` and control-c in the async REPL.
3.0.23: 2023-02-22 3.0.23: 2023-02-22
------------------ ------------------
@ -191,7 +218,7 @@ New features:
- Optional pager for displaying outputs that don't fit on the screen. - Optional pager for displaying outputs that don't fit on the screen.
- Added --light-bg and --dark-bg flags to automatically optimize the brightness - Added --light-bg and --dark-bg flags to automatically optimize the brightness
of the colors according to the terminal background. of the colors according to the terminal background.
- Addd `PTPYTHON_CONFIG_HOME` for explicitely setting the config directory. - Add `PTPYTHON_CONFIG_HOME` for explicitly setting the config directory.
- Show completion suffixes (like '(' for functions). - Show completion suffixes (like '(' for functions).
Fixes: Fixes:

View file

@ -1,4 +1,4 @@
Copyright (c) 2015, Jonathan Slenders Copyright (c) 2015-2023, Jonathan Slenders
All rights reserved. All rights reserved.
Redistribution and use in source and binary forms, with or without modification, Redistribution and use in source and binary forms, with or without modification,

View file

@ -71,6 +71,7 @@ The help menu shows basic command-line options.
-h, --help show this help message and exit -h, --help show this help message and exit
--vi Enable Vi key bindings --vi Enable Vi key bindings
-i, --interactive Start interactive shell after executing this file. -i, --interactive Start interactive shell after executing this file.
--asyncio Run an asyncio event loop to support top-level "await".
--light-bg Run on a light background (use dark colors for text). --light-bg Run on a light background (use dark colors for text).
--dark-bg Run on a dark background (use light colors for text). --dark-bg Run on a dark background (use light colors for text).
--config-file CONFIG_FILE --config-file CONFIG_FILE
@ -171,6 +172,20 @@ error.
.. image :: https://github.com/jonathanslenders/ptpython/raw/master/docs/images/validation.png .. image :: https://github.com/jonathanslenders/ptpython/raw/master/docs/images/validation.png
Asyncio REPL and top level await
********************************
In order to get top-level ``await`` support, start ptpython as follows:
.. code::
ptpython --asyncio
This will spawn an asyncio event loop and embed the async REPL in the event
loop. After this, top-level await will work and statements like ``await
asyncio.sleep(10)`` will execute.
Additional features Additional features
******************* *******************

View file

@ -19,7 +19,7 @@ loop = asyncio.get_event_loop()
counter = [0] counter = [0]
async def print_counter(): async def print_counter() -> None:
""" """
Coroutine that prints counters and saves it in a global variable. Coroutine that prints counters and saves it in a global variable.
""" """
@ -29,7 +29,7 @@ async def print_counter():
await asyncio.sleep(3) await asyncio.sleep(3)
async def interactive_shell(): async def interactive_shell() -> None:
""" """
Coroutine that starts a Python REPL from which we can access the global Coroutine that starts a Python REPL from which we can access the global
counter variable. counter variable.
@ -44,13 +44,10 @@ async def interactive_shell():
loop.stop() loop.stop()
def main(): async def main() -> None:
asyncio.ensure_future(print_counter()) asyncio.create_task(print_counter())
asyncio.ensure_future(interactive_shell()) await interactive_shell()
loop.run_forever()
loop.close()
if __name__ == "__main__": if __name__ == "__main__":
main() asyncio.run(main())

View file

@ -32,31 +32,25 @@ class MySSHServer(asyncssh.SSHServer):
return ReplSSHServerSession(self.get_namespace) return ReplSSHServerSession(self.get_namespace)
def main(port=8222): async def main(port: int = 8222) -> None:
""" """
Example that starts the REPL through an SSH server. Example that starts the REPL through an SSH server.
""" """
loop = asyncio.get_event_loop()
# Namespace exposed in the REPL. # Namespace exposed in the REPL.
environ = {"hello": "world"} environ = {"hello": "world"}
# Start SSH server. # Start SSH server.
def create_server(): def create_server() -> MySSHServer:
return MySSHServer(lambda: environ) return MySSHServer(lambda: environ)
print("Listening on :%i" % port) print("Listening on :%i" % port)
print('To connect, do "ssh localhost -p %i"' % port) print('To connect, do "ssh localhost -p %i"' % port)
loop.run_until_complete( await asyncssh.create_server(
asyncssh.create_server( create_server, "", port, server_host_keys=["/etc/ssh/ssh_host_dsa_key"]
create_server, "", port, server_host_keys=["/etc/ssh/ssh_host_dsa_key"]
)
) )
await asyncio.Future() # Wait forever.
# Run eventloop.
loop.run_forever()
if __name__ == "__main__": if __name__ == "__main__":
main() asyncio.run(main())

View file

@ -70,6 +70,9 @@ def configure(repl):
# Vi mode. # Vi mode.
repl.vi_mode = False repl.vi_mode = False
# Enable the modal cursor (when using Vi mode). Other options are 'Block', 'Underline', 'Beam', 'Blink under', 'Blink block', and 'Blink beam'
repl.cursor_shape_config = "Modal (vi)"
# Paste mode. (When True, don't insert whitespace after new line.) # Paste mode. (When True, don't insert whitespace after new line.)
repl.paste_mode = False repl.paste_mode = False

View file

@ -2,26 +2,26 @@
""" """
Example of embedding a Python REPL, and setting a custom prompt. Example of embedding a Python REPL, and setting a custom prompt.
""" """
from prompt_toolkit.formatted_text import HTML from prompt_toolkit.formatted_text import HTML, AnyFormattedText
from ptpython.prompt_style import PromptStyle from ptpython.prompt_style import PromptStyle
from ptpython.repl import embed from ptpython.repl import embed
def configure(repl): def configure(repl) -> None:
# Probably, the best is to add a new PromptStyle to `all_prompt_styles` and # Probably, the best is to add a new PromptStyle to `all_prompt_styles` and
# activate it. This way, the other styles are still selectable from the # activate it. This way, the other styles are still selectable from the
# menu. # menu.
class CustomPrompt(PromptStyle): class CustomPrompt(PromptStyle):
def in_prompt(self): def in_prompt(self) -> AnyFormattedText:
return HTML("<ansigreen>Input[%s]</ansigreen>: ") % ( return HTML("<ansigreen>Input[%s]</ansigreen>: ") % (
repl.current_statement_index, repl.current_statement_index,
) )
def in2_prompt(self, width): def in2_prompt(self, width: int) -> AnyFormattedText:
return "...: ".rjust(width) return "...: ".rjust(width)
def out_prompt(self): def out_prompt(self) -> AnyFormattedText:
return HTML("<ansired>Result[%s]</ansired>: ") % ( return HTML("<ansired>Result[%s]</ansired>: ") % (
repl.current_statement_index, repl.current_statement_index,
) )
@ -30,7 +30,7 @@ def configure(repl):
repl.prompt_style = "custom" repl.prompt_style = "custom"
def main(): def main() -> None:
embed(globals(), locals(), configure=configure) embed(globals(), locals(), configure=configure)

View file

@ -4,7 +4,7 @@
from ptpython.repl import embed from ptpython.repl import embed
def main(): def main() -> None:
embed(globals(), locals(), vi_mode=False) embed(globals(), locals(), vi_mode=False)

View file

@ -11,13 +11,16 @@ import pathlib
import asyncssh import asyncssh
from prompt_toolkit import print_formatted_text from prompt_toolkit import print_formatted_text
from prompt_toolkit.contrib.ssh.server import PromptToolkitSSHServer from prompt_toolkit.contrib.ssh.server import (
PromptToolkitSSHServer,
PromptToolkitSSHSession,
)
from prompt_toolkit.contrib.telnet.server import TelnetServer from prompt_toolkit.contrib.telnet.server import TelnetServer
from ptpython.repl import embed from ptpython.repl import embed
def ensure_key(filename="ssh_host_key"): def ensure_key(filename: str = "ssh_host_key") -> str:
path = pathlib.Path(filename) path = pathlib.Path(filename)
if not path.exists(): if not path.exists():
rsa_key = asyncssh.generate_private_key("ssh-rsa") rsa_key = asyncssh.generate_private_key("ssh-rsa")
@ -25,12 +28,12 @@ def ensure_key(filename="ssh_host_key"):
return str(path) return str(path)
async def interact(connection=None): async def interact(connection: PromptToolkitSSHSession) -> None:
global_dict = {**globals(), "print": print_formatted_text} global_dict = {**globals(), "print": print_formatted_text}
await embed(return_asyncio_coroutine=True, globals=global_dict) await embed(return_asyncio_coroutine=True, globals=global_dict)
async def main(ssh_port=8022, telnet_port=8023): async def main(ssh_port: int = 8022, telnet_port: int = 8023) -> None:
ssh_server = PromptToolkitSSHServer(interact=interact) ssh_server = PromptToolkitSSHServer(interact=interact)
await asyncssh.create_server( await asyncssh.create_server(
lambda: ssh_server, "", ssh_port, server_host_keys=[ensure_key()] lambda: ssh_server, "", ssh_port, server_host_keys=[ensure_key()]

View file

@ -6,7 +6,7 @@ import inspect
import keyword import keyword
import re import re
from enum import Enum from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple from typing import TYPE_CHECKING, Any, Callable, Iterable
from prompt_toolkit.completion import ( from prompt_toolkit.completion import (
CompleteEvent, CompleteEvent,
@ -259,7 +259,7 @@ class JediCompleter(Completer):
# See: https://github.com/jonathanslenders/ptpython/issues/223 # See: https://github.com/jonathanslenders/ptpython/issues/223
pass pass
except Exception: except Exception:
# Supress all other Jedi exceptions. # Suppress all other Jedi exceptions.
pass pass
else: else:
# Move function parameters to the top. # Move function parameters to the top.
@ -367,7 +367,7 @@ class DictionaryCompleter(Completer):
rf""" rf"""
{expression} {expression}
# Dict loopup to complete (square bracket open + start of # Dict lookup to complete (square bracket open + start of
# string). # string).
\[ \[
\s* ([^\[\]]*)$ \s* ([^\[\]]*)$
@ -380,7 +380,7 @@ class DictionaryCompleter(Completer):
rf""" rf"""
{expression} {expression}
# Attribute loopup to complete (dot + varname). # Attribute lookup to complete (dot + varname).
\. \.
\s* ([a-zA-Z0-9_]*)$ \s* ([a-zA-Z0-9_]*)$
""", """,

View file

@ -9,20 +9,20 @@ package should be installable in Python 2 as well!
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
from typing import Any, Optional, TextIO, cast from typing import Any, AnyStr, TextIO, cast
import asyncssh import asyncssh
from prompt_toolkit.data_structures import Size from prompt_toolkit.data_structures import Size
from prompt_toolkit.input import create_pipe_input from prompt_toolkit.input import create_pipe_input
from prompt_toolkit.output.vt100 import Vt100_Output from prompt_toolkit.output.vt100 import Vt100_Output
from ptpython.python_input import _GetNamespace from ptpython.python_input import _GetNamespace, _Namespace
from ptpython.repl import PythonRepl from ptpython.repl import PythonRepl
__all__ = ["ReplSSHServerSession"] __all__ = ["ReplSSHServerSession"]
class ReplSSHServerSession(asyncssh.SSHServerSession): class ReplSSHServerSession(asyncssh.SSHServerSession[str]):
""" """
SSH server session that runs a Python REPL. SSH server session that runs a Python REPL.
@ -35,7 +35,7 @@ class ReplSSHServerSession(asyncssh.SSHServerSession):
) -> None: ) -> None:
self._chan: Any = None self._chan: Any = None
def _globals() -> dict: def _globals() -> _Namespace:
data = get_globals() data = get_globals()
data.setdefault("print", self._print) data.setdefault("print", self._print)
return data return data
@ -79,7 +79,7 @@ class ReplSSHServerSession(asyncssh.SSHServerSession):
width, height, pixwidth, pixheight = self._chan.get_terminal_size() width, height, pixwidth, pixheight = self._chan.get_terminal_size()
return Size(rows=height, columns=width) return Size(rows=height, columns=width)
def connection_made(self, chan): def connection_made(self, chan: Any) -> None:
""" """
Client connected, run repl in coroutine. Client connected, run repl in coroutine.
""" """
@ -89,7 +89,7 @@ class ReplSSHServerSession(asyncssh.SSHServerSession):
f = asyncio.ensure_future(self.repl.run_async()) f = asyncio.ensure_future(self.repl.run_async())
# Close channel when done. # Close channel when done.
def done(_) -> None: def done(_: object) -> None:
chan.close() chan.close()
self._chan = None self._chan = None
@ -98,24 +98,28 @@ class ReplSSHServerSession(asyncssh.SSHServerSession):
def shell_requested(self) -> bool: def shell_requested(self) -> bool:
return True return True
def terminal_size_changed(self, width, height, pixwidth, pixheight): def terminal_size_changed(
self, width: int, height: int, pixwidth: int, pixheight: int
) -> None:
""" """
When the terminal size changes, report back to CLI. When the terminal size changes, report back to CLI.
""" """
self.repl.app._on_resize() self.repl.app._on_resize()
def data_received(self, data, datatype): def data_received(self, data: AnyStr, datatype: int | None) -> None:
""" """
When data is received, send to inputstream of the CLI and repaint. When data is received, send to inputstream of the CLI and repaint.
""" """
self._input_pipe.send(data) self._input_pipe.send(data) # type: ignore
def _print(self, *data, sep=" ", end="\n", file=None) -> None: def _print(
self, *data: object, sep: str = " ", end: str = "\n", file: Any = None
) -> None:
""" """
Alternative 'print' function that prints back into the SSH channel. Alternative 'print' function that prints back into the SSH channel.
""" """
# Pop keyword-only arguments. (We cannot use the syntax from the # Pop keyword-only arguments. (We cannot use the syntax from the
# signature. Otherwise, Python2 will give a syntax error message when # signature. Otherwise, Python2 will give a syntax error message when
# installing.) # installing.)
data = sep.join(map(str, data)) data_as_str = sep.join(map(str, data))
self._chan.write(data + end) self._chan.write(data_as_str + end)

View file

@ -9,6 +9,7 @@ optional arguments:
-h, --help show this help message and exit -h, --help show this help message and exit
--vi Enable Vi key bindings --vi Enable Vi key bindings
-i, --interactive Start interactive shell after executing this file. -i, --interactive Start interactive shell after executing this file.
--asyncio Run an asyncio event loop to support top-level "await".
--light-bg Run on a light background (use dark colors for text). --light-bg Run on a light background (use dark colors for text).
--dark-bg Run on a dark background (use light colors for text). --dark-bg Run on a dark background (use light colors for text).
--config-file CONFIG_FILE --config-file CONFIG_FILE
@ -24,11 +25,12 @@ environment variables:
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import asyncio
import os import os
import pathlib import pathlib
import sys import sys
from textwrap import dedent from textwrap import dedent
from typing import IO, Optional, Tuple from typing import IO
import appdirs import appdirs
from prompt_toolkit.formatted_text import HTML from prompt_toolkit.formatted_text import HTML
@ -68,16 +70,21 @@ def create_parser() -> _Parser:
action="store_true", action="store_true",
help="Start interactive shell after executing this file.", help="Start interactive shell after executing this file.",
) )
parser.add_argument(
"--asyncio",
action="store_true",
help='Run an asyncio event loop to support top-level "await".',
)
parser.add_argument( parser.add_argument(
"--light-bg", "--light-bg",
action="store_true", action="store_true",
help="Run on a light background (use dark colors for text).", help="Run on a light background (use dark colors for text).",
), )
parser.add_argument( parser.add_argument(
"--dark-bg", "--dark-bg",
action="store_true", action="store_true",
help="Run on a dark background (use light colors for text).", help="Run on a dark background (use light colors for text).",
), )
parser.add_argument( parser.add_argument(
"--config-file", type=str, help="Location of configuration file." "--config-file", type=str, help="Location of configuration file."
) )
@ -206,7 +213,7 @@ def run() -> None:
import __main__ import __main__
embed( embed_result = embed( # type: ignore
vi_mode=a.vi, vi_mode=a.vi,
history_filename=history_file, history_filename=history_file,
configure=configure, configure=configure,
@ -214,8 +221,14 @@ def run() -> None:
globals=__main__.__dict__, globals=__main__.__dict__,
startup_paths=startup_paths, startup_paths=startup_paths,
title="Python REPL (ptpython)", title="Python REPL (ptpython)",
return_asyncio_coroutine=a.asyncio,
) )
if a.asyncio:
print("Starting ptpython asyncio REPL")
print('Use "await" directly instead of "asyncio.run()".')
asyncio.run(embed_result)
if __name__ == "__main__": if __name__ == "__main__":
run() run()

View file

@ -7,7 +7,7 @@ run as a sub application of the Repl/PythonInput.
from __future__ import annotations from __future__ import annotations
from functools import partial from functools import partial
from typing import TYPE_CHECKING, Callable, List, Optional, Set from typing import TYPE_CHECKING, Callable
from prompt_toolkit.application import Application from prompt_toolkit.application import Application
from prompt_toolkit.application.current import get_app from prompt_toolkit.application.current import get_app
@ -107,6 +107,7 @@ Further, remember that searching works like in Emacs
class BORDER: class BORDER:
"Box drawing characters." "Box drawing characters."
HORIZONTAL = "\u2501" HORIZONTAL = "\u2501"
VERTICAL = "\u2503" VERTICAL = "\u2503"
TOP_LEFT = "\u250f" TOP_LEFT = "\u250f"

View file

@ -14,7 +14,7 @@ from typing import Iterable
from warnings import warn from warnings import warn
from IPython import utils as ipy_utils from IPython import utils as ipy_utils
from IPython.core.inputsplitter import IPythonInputSplitter from IPython.core.inputtransformer2 import TransformerManager
from IPython.terminal.embed import InteractiveShellEmbed as _InteractiveShellEmbed from IPython.terminal.embed import InteractiveShellEmbed as _InteractiveShellEmbed
from IPython.terminal.ipapp import load_default_config from IPython.terminal.ipapp import load_default_config
from prompt_toolkit.completion import ( from prompt_toolkit.completion import (
@ -38,6 +38,7 @@ from ptpython.prompt_style import PromptStyle
from .completer import PythonCompleter from .completer import PythonCompleter
from .python_input import PythonInput from .python_input import PythonInput
from .repl import PyCF_ALLOW_TOP_LEVEL_AWAIT
from .style import default_ui_style from .style import default_ui_style
from .validator import PythonValidator from .validator import PythonValidator
@ -65,7 +66,7 @@ class IPythonPrompt(PromptStyle):
class IPythonValidator(PythonValidator): class IPythonValidator(PythonValidator):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.isp = IPythonInputSplitter() self.isp = TransformerManager()
def validate(self, document: Document) -> None: def validate(self, document: Document) -> None:
document = Document(text=self.isp.transform_cell(document.text)) document = Document(text=self.isp.transform_cell(document.text))
@ -211,6 +212,12 @@ class IPythonInput(PythonInput):
self.ui_styles = {"default": Style.from_dict(style_dict)} self.ui_styles = {"default": Style.from_dict(style_dict)}
self.use_ui_colorscheme("default") self.use_ui_colorscheme("default")
def get_compiler_flags(self):
flags = super().get_compiler_flags()
if self.ipython_shell.autoawait:
flags |= PyCF_ALLOW_TOP_LEVEL_AWAIT
return flags
class InteractiveShellEmbed(_InteractiveShellEmbed): class InteractiveShellEmbed(_InteractiveShellEmbed):
""" """

View file

@ -7,7 +7,7 @@ import platform
import sys import sys
from enum import Enum from enum import Enum
from inspect import _ParameterKind as ParameterKind from inspect import _ParameterKind as ParameterKind
from typing import TYPE_CHECKING, Any, List, Optional, Type from typing import TYPE_CHECKING, Any
from prompt_toolkit.application import get_app from prompt_toolkit.application import get_app
from prompt_toolkit.enums import DEFAULT_BUFFER, SEARCH_BUFFER from prompt_toolkit.enums import DEFAULT_BUFFER, SEARCH_BUFFER
@ -17,11 +17,7 @@ from prompt_toolkit.filters import (
is_done, is_done,
renderer_height_is_known, renderer_height_is_known,
) )
from prompt_toolkit.formatted_text import ( from prompt_toolkit.formatted_text import fragment_list_width, to_formatted_text
AnyFormattedText,
fragment_list_width,
to_formatted_text,
)
from prompt_toolkit.formatted_text.base import StyleAndTextTuples from prompt_toolkit.formatted_text.base import StyleAndTextTuples
from prompt_toolkit.key_binding.vi_state import InputMode from prompt_toolkit.key_binding.vi_state import InputMode
from prompt_toolkit.layout.containers import ( from prompt_toolkit.layout.containers import (
@ -60,7 +56,6 @@ from prompt_toolkit.widgets.toolbars import (
SystemToolbar, SystemToolbar,
ValidationToolbar, ValidationToolbar,
) )
from pygments.lexers import PythonLexer
from .filters import HasSignature, ShowDocstring, ShowSidebar, ShowSignature from .filters import HasSignature, ShowDocstring, ShowSidebar, ShowSignature
from .prompt_style import PromptStyle from .prompt_style import PromptStyle
@ -74,6 +69,7 @@ __all__ = ["PtPythonLayout", "CompletionVisualisation"]
class CompletionVisualisation(Enum): class CompletionVisualisation(Enum):
"Visualisation method for the completions." "Visualisation method for the completions."
NONE = "none" NONE = "none"
POP_UP = "pop-up" POP_UP = "pop-up"
MULTI_COLUMN = "multi-column" MULTI_COLUMN = "multi-column"
@ -151,7 +147,7 @@ def python_sidebar(python_input: PythonInput) -> Window:
append_category(category) append_category(category)
for option in category.options: for option in category.options:
append(i, option.title, "%s" % (option.get_current_value(),)) append(i, option.title, str(option.get_current_value()))
i += 1 i += 1
tokens.pop() # Remove last newline. tokens.pop() # Remove last newline.
@ -302,13 +298,15 @@ def signature_toolbar(python_input: PythonInput) -> Container:
content=Window( content=Window(
FormattedTextControl(get_text_fragments), height=Dimension.exact(1) FormattedTextControl(get_text_fragments), height=Dimension.exact(1)
), ),
filter=
# Show only when there is a signature # Show only when there is a signature
HasSignature(python_input) & filter=HasSignature(python_input)
&
# Signature needs to be shown. # Signature needs to be shown.
ShowSignature(python_input) & ShowSignature(python_input)
&
# And no sidebar is visible. # And no sidebar is visible.
~ShowSidebar(python_input) & ~ShowSidebar(python_input)
&
# Not done yet. # Not done yet.
~is_done, ~is_done,
) )

View file

@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
from typing import Callable, Optional from typing import Callable
from prompt_toolkit.document import Document from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import StyleAndTextTuples from prompt_toolkit.formatted_text import StyleAndTextTuples

435
ptpython/printer.py Normal file
View file

@ -0,0 +1,435 @@
from __future__ import annotations
import sys
import traceback
from dataclasses import dataclass
from enum import Enum
from typing import Generator, Iterable
from prompt_toolkit.formatted_text import (
HTML,
AnyFormattedText,
FormattedText,
OneStyleAndTextTuple,
StyleAndTextTuples,
fragment_list_width,
merge_formatted_text,
to_formatted_text,
)
from prompt_toolkit.formatted_text.utils import split_lines
from prompt_toolkit.input import Input
from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
from prompt_toolkit.output import Output
from prompt_toolkit.shortcuts import PromptSession, print_formatted_text
from prompt_toolkit.styles import BaseStyle, StyleTransformation
from prompt_toolkit.styles.pygments import pygments_token_to_classname
from prompt_toolkit.utils import get_cwidth
from pygments.lexers import PythonLexer, PythonTracebackLexer
__all__ = ["OutputPrinter"]
# Never reformat results larger than this:
MAX_REFORMAT_SIZE = 1_000_000
@dataclass
class OutputPrinter:
"""
Result printer.
Usage::
printer = OutputPrinter(...)
printer.display_result(...)
printer.display_exception(...)
"""
output: Output
input: Input
style: BaseStyle
title: AnyFormattedText
style_transformation: StyleTransformation
def display_result(
self,
result: object,
*,
out_prompt: AnyFormattedText,
reformat: bool,
highlight: bool,
paginate: bool,
) -> None:
"""
Show __repr__ (or `__pt_repr__`) for an `eval` result and print to output.
:param reformat: Reformat result using 'black' before printing if the
result is parsable as Python code.
:param highlight: Syntax highlight the result.
:param paginate: Show paginator when the result does not fit on the
screen.
"""
out_prompt = to_formatted_text(out_prompt)
out_prompt_width = fragment_list_width(out_prompt)
result = self._insert_out_prompt_and_split_lines(
self._format_result_output(
result,
reformat=reformat,
highlight=highlight,
line_length=self.output.get_size().columns - out_prompt_width,
paginate=paginate,
),
out_prompt=out_prompt,
)
self._display_result(result, paginate=paginate)
def display_exception(
self, e: BaseException, *, highlight: bool, paginate: bool
) -> None:
"""
Render an exception.
"""
result = self._insert_out_prompt_and_split_lines(
self._format_exception_output(e, highlight=highlight),
out_prompt="",
)
self._display_result(result, paginate=paginate)
def display_style_and_text_tuples(
self,
result: Iterable[OneStyleAndTextTuple],
*,
paginate: bool,
) -> None:
self._display_result(
self._insert_out_prompt_and_split_lines(result, out_prompt=""),
paginate=paginate,
)
def _display_result(
self,
lines: Iterable[StyleAndTextTuples],
*,
paginate: bool,
) -> None:
if paginate:
self._print_paginated_formatted_text(lines)
else:
for line in lines:
self._print_formatted_text(line)
self.output.flush()
def _print_formatted_text(self, line: StyleAndTextTuples, end: str = "\n") -> None:
print_formatted_text(
FormattedText(line),
style=self.style,
style_transformation=self.style_transformation,
include_default_pygments_style=False,
output=self.output,
end=end,
)
def _format_result_output(
self,
result: object,
*,
reformat: bool,
highlight: bool,
line_length: int,
paginate: bool,
) -> Generator[OneStyleAndTextTuple, None, None]:
"""
Format __repr__ for an `eval` result.
Note: this can raise `KeyboardInterrupt` if either calling `__repr__`,
`__pt_repr__` or formatting the output with "Black" takes to long
and the user presses Control-C.
"""
# If __pt_repr__ is present, take this. This can return prompt_toolkit
# formatted text.
try:
if hasattr(result, "__pt_repr__"):
formatted_result_repr = to_formatted_text(
getattr(result, "__pt_repr__")()
)
yield from formatted_result_repr
return
except KeyboardInterrupt:
raise # Don't catch here.
except:
# For bad code, `__getattr__` can raise something that's not an
# `AttributeError`. This happens already when calling `hasattr()`.
pass
# Call `__repr__` of given object first, to turn it in a string.
try:
result_repr = repr(result)
except KeyboardInterrupt:
raise # Don't catch here.
except BaseException as e:
# Calling repr failed.
self.display_exception(e, highlight=highlight, paginate=paginate)
return
# Determine whether it's valid Python code. If not,
# reformatting/highlighting won't be applied.
if len(result_repr) < MAX_REFORMAT_SIZE:
try:
compile(result_repr, "", "eval")
except SyntaxError:
valid_python = False
else:
valid_python = True
else:
valid_python = False
if valid_python and reformat:
# Inline import. Slightly speed up start-up time if black is
# not used.
try:
import black
if not hasattr(black, "Mode"):
raise ImportError
except ImportError:
pass # no Black package in your installation
else:
result_repr = black.format_str(
result_repr,
mode=black.Mode(line_length=line_length),
)
if valid_python and highlight:
yield from _lex_python_result(result_repr)
else:
yield ("", result_repr)
def _insert_out_prompt_and_split_lines(
self, result: Iterable[OneStyleAndTextTuple], out_prompt: AnyFormattedText
) -> Iterable[StyleAndTextTuples]:
r"""
Split styled result in lines (based on the \n characters in the result)
an insert output prompt on whitespace in front of each line. (This does
not yet do the soft wrapping.)
Yield lines as a result.
"""
out_prompt = to_formatted_text(out_prompt)
out_prompt_width = fragment_list_width(out_prompt)
prefix = ("", " " * out_prompt_width)
for i, line in enumerate(split_lines(result)):
if i == 0:
line = [*out_prompt, *line]
else:
line = [prefix, *line]
yield line
def _apply_soft_wrapping(
self, lines: Iterable[StyleAndTextTuples]
) -> Iterable[StyleAndTextTuples]:
"""
Apply soft wrapping to the given lines. Wrap according to the terminal
width. Insert whitespace in front of each wrapped line to align it with
the output prompt.
"""
line_length = self.output.get_size().columns
# Iterate over hard wrapped lines.
for lineno, line in enumerate(lines):
columns_in_buffer = 0
current_line: list[OneStyleAndTextTuple] = []
for style, text, *_ in line:
for c in text:
width = get_cwidth(c)
# (Soft) wrap line if it doesn't fit.
if columns_in_buffer + width > line_length:
yield current_line
columns_in_buffer = 0
current_line = []
columns_in_buffer += width
current_line.append((style, c))
if len(current_line) > 0:
yield current_line
def _print_paginated_formatted_text(
self, lines: Iterable[StyleAndTextTuples]
) -> None:
"""
Print formatted text, using --MORE-- style pagination.
(Avoid filling up the terminal's scrollback buffer.)
"""
lines = self._apply_soft_wrapping(lines)
pager_prompt = create_pager_prompt(
self.style, self.title, output=self.output, input=self.input
)
abort = False
print_all = False
# Max number of lines allowed in the buffer before painting.
size = self.output.get_size()
max_rows = size.rows - 1
# Page buffer.
page: StyleAndTextTuples = []
def show_pager() -> None:
nonlocal abort, max_rows, print_all
# Run pager prompt in another thread.
# Same as for the input. This prevents issues with nested event
# loops.
pager_result = pager_prompt.prompt(in_thread=True)
if pager_result == PagerResult.ABORT:
print("...")
abort = True
elif pager_result == PagerResult.NEXT_LINE:
max_rows = 1
elif pager_result == PagerResult.NEXT_PAGE:
max_rows = size.rows - 1
elif pager_result == PagerResult.PRINT_ALL:
print_all = True
# Loop over lines. Show --MORE-- prompt when page is filled.
rows = 0
for lineno, line in enumerate(lines):
page.extend(line)
page.append(("", "\n"))
rows += 1
if rows >= max_rows:
self._print_formatted_text(page, end="")
page = []
rows = 0
if not print_all:
show_pager()
if abort:
return
self._print_formatted_text(page)
def _format_exception_output(
self, e: BaseException, highlight: bool
) -> Generator[OneStyleAndTextTuple, None, None]:
# Instead of just calling ``traceback.format_exc``, we take the
# traceback and skip the bottom calls of this framework.
t, v, tb = sys.exc_info()
# Required for pdb.post_mortem() to work.
sys.last_type, sys.last_value, sys.last_traceback = t, v, tb
tblist = list(traceback.extract_tb(tb))
for line_nr, tb_tuple in enumerate(tblist):
if tb_tuple[0] == "<stdin>":
tblist = tblist[line_nr:]
break
tb_list = traceback.format_list(tblist)
if tb_list:
tb_list.insert(0, "Traceback (most recent call last):\n")
tb_list.extend(traceback.format_exception_only(t, v))
tb_str = "".join(tb_list)
# Format exception and write to output.
# (We use the default style. Most other styles result
# in unreadable colors for the traceback.)
if highlight:
for index, tokentype, text in PythonTracebackLexer().get_tokens_unprocessed(
tb_str
):
yield ("class:" + pygments_token_to_classname(tokentype), text)
else:
yield ("", tb_str)
class PagerResult(Enum):
ABORT = "ABORT"
NEXT_LINE = "NEXT_LINE"
NEXT_PAGE = "NEXT_PAGE"
PRINT_ALL = "PRINT_ALL"
def create_pager_prompt(
style: BaseStyle,
title: AnyFormattedText = "",
input: Input | None = None,
output: Output | None = None,
) -> PromptSession[PagerResult]:
"""
Create a "--MORE--" prompt for paginated output.
"""
bindings = KeyBindings()
@bindings.add("enter")
@bindings.add("down")
def next_line(event: KeyPressEvent) -> None:
event.app.exit(result=PagerResult.NEXT_LINE)
@bindings.add("space")
def next_page(event: KeyPressEvent) -> None:
event.app.exit(result=PagerResult.NEXT_PAGE)
@bindings.add("a")
def print_all(event: KeyPressEvent) -> None:
event.app.exit(result=PagerResult.PRINT_ALL)
@bindings.add("q")
@bindings.add("c-c")
@bindings.add("c-d")
@bindings.add("escape", eager=True)
def no(event: KeyPressEvent) -> None:
event.app.exit(result=PagerResult.ABORT)
@bindings.add("<any>")
def _(event: KeyPressEvent) -> None:
"Disallow inserting other text."
pass
session: PromptSession[PagerResult] = PromptSession(
merge_formatted_text(
[
title,
HTML(
"<status-toolbar>"
"<more> -- MORE -- </more> "
"<key>[Enter]</key> Scroll "
"<key>[Space]</key> Next page "
"<key>[a]</key> Print all "
"<key>[q]</key> Quit "
"</status-toolbar>: "
),
]
),
key_bindings=bindings,
erase_when_done=True,
style=style,
input=input,
output=output,
)
return session
def _lex_python_result(result: str) -> Generator[tuple[str, str], None, None]:
"Return token list for Python string."
lexer = PythonLexer()
# Use `get_tokens_unprocessed`, so that we get exactly the same string,
# without line endings appended. `print_formatted_text` already appends a
# line ending, and otherwise we'll have two line endings.
tokens = lexer.get_tokens_unprocessed(result)
for index, tokentype, text in tokens:
yield ("class:" + pygments_token_to_classname(tokentype), text)

View file

@ -4,20 +4,9 @@ This can be used for creation of Python REPLs.
""" """
from __future__ import annotations from __future__ import annotations
from asyncio import get_event_loop from asyncio import get_running_loop
from functools import partial from functools import partial
from typing import ( from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Mapping, TypeVar, Union
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
List,
Mapping,
Optional,
Tuple,
TypeVar,
)
from prompt_toolkit.application import Application, get_app from prompt_toolkit.application import Application, get_app
from prompt_toolkit.auto_suggest import ( from prompt_toolkit.auto_suggest import (
@ -42,7 +31,7 @@ from prompt_toolkit.cursor_shapes import (
) )
from prompt_toolkit.document import Document from prompt_toolkit.document import Document
from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode from prompt_toolkit.enums import DEFAULT_BUFFER, EditingMode
from prompt_toolkit.filters import Condition from prompt_toolkit.filters import Condition, FilterOrBool
from prompt_toolkit.formatted_text import AnyFormattedText from prompt_toolkit.formatted_text import AnyFormattedText
from prompt_toolkit.history import ( from prompt_toolkit.history import (
FileHistory, FileHistory,
@ -60,8 +49,13 @@ from prompt_toolkit.key_binding.bindings.auto_suggest import load_auto_suggest_b
from prompt_toolkit.key_binding.bindings.open_in_editor import ( from prompt_toolkit.key_binding.bindings.open_in_editor import (
load_open_in_editor_bindings, load_open_in_editor_bindings,
) )
from prompt_toolkit.key_binding.key_bindings import Binding, KeyHandlerCallable
from prompt_toolkit.key_binding.key_processor import KeyPressEvent
from prompt_toolkit.key_binding.vi_state import InputMode from prompt_toolkit.key_binding.vi_state import InputMode
from prompt_toolkit.keys import Keys
from prompt_toolkit.layout.containers import AnyContainer from prompt_toolkit.layout.containers import AnyContainer
from prompt_toolkit.layout.dimension import AnyDimension
from prompt_toolkit.layout.processors import Processor
from prompt_toolkit.lexers import DynamicLexer, Lexer, SimpleLexer from prompt_toolkit.lexers import DynamicLexer, Lexer, SimpleLexer
from prompt_toolkit.output import ColorDepth, Output from prompt_toolkit.output import ColorDepth, Output
from prompt_toolkit.styles import ( from prompt_toolkit.styles import (
@ -102,22 +96,23 @@ if TYPE_CHECKING:
from typing_extensions import Protocol from typing_extensions import Protocol
class _SupportsLessThan(Protocol): class _SupportsLessThan(Protocol):
# Taken from typeshed. _T is used by "sorted", which needs anything # Taken from typeshed. _T_lt is used by "sorted", which needs anything
# sortable. # sortable.
def __lt__(self, __other: Any) -> bool: def __lt__(self, __other: Any) -> bool:
... ...
_T = TypeVar("_T", bound="_SupportsLessThan") _T_lt = TypeVar("_T_lt", bound="_SupportsLessThan")
_T_kh = TypeVar("_T_kh", bound=Union[KeyHandlerCallable, Binding])
class OptionCategory(Generic[_T]): class OptionCategory(Generic[_T_lt]):
def __init__(self, title: str, options: list[Option[_T]]) -> None: def __init__(self, title: str, options: list[Option[_T_lt]]) -> None:
self.title = title self.title = title
self.options = options self.options = options
class Option(Generic[_T]): class Option(Generic[_T_lt]):
""" """
Ptpython configuration option that can be shown and modified from the Ptpython configuration option that can be shown and modified from the
sidebar. sidebar.
@ -133,10 +128,10 @@ class Option(Generic[_T]):
self, self,
title: str, title: str,
description: str, description: str,
get_current_value: Callable[[], _T], get_current_value: Callable[[], _T_lt],
# We accept `object` as return type for the select functions, because # We accept `object` as return type for the select functions, because
# often they return an unused boolean. Maybe this can be improved. # often they return an unused boolean. Maybe this can be improved.
get_values: Callable[[], Mapping[_T, Callable[[], object]]], get_values: Callable[[], Mapping[_T_lt, Callable[[], object]]],
) -> None: ) -> None:
self.title = title self.title = title
self.description = description self.description = description
@ -144,7 +139,7 @@ class Option(Generic[_T]):
self.get_values = get_values self.get_values = get_values
@property @property
def values(self) -> Mapping[_T, Callable[[], object]]: def values(self) -> Mapping[_T_lt, Callable[[], object]]:
return self.get_values() return self.get_values()
def activate_next(self, _previous: bool = False) -> None: def activate_next(self, _previous: bool = False) -> None:
@ -219,10 +214,10 @@ class PythonInput:
_completer: Completer | None = None, _completer: Completer | None = None,
_validator: Validator | None = None, _validator: Validator | None = None,
_lexer: Lexer | None = None, _lexer: Lexer | None = None,
_extra_buffer_processors=None, _extra_buffer_processors: list[Processor] | None = None,
_extra_layout_body: AnyContainer | None = None, _extra_layout_body: AnyContainer | None = None,
_extra_toolbars=None, _extra_toolbars: list[AnyContainer] | None = None,
_input_buffer_height=None, _input_buffer_height: AnyDimension | None = None,
) -> None: ) -> None:
self.get_globals: _GetNamespace = get_globals or (lambda: {}) self.get_globals: _GetNamespace = get_globals or (lambda: {})
self.get_locals: _GetNamespace = get_locals or self.get_globals self.get_locals: _GetNamespace = get_locals or self.get_globals
@ -333,7 +328,7 @@ class PythonInput:
# Cursor shapes. # Cursor shapes.
self.cursor_shape_config = "Block" self.cursor_shape_config = "Block"
self.all_cursor_shape_configs: Dict[str, AnyCursorShapeConfig] = { self.all_cursor_shape_configs: dict[str, AnyCursorShapeConfig] = {
"Block": CursorShape.BLOCK, "Block": CursorShape.BLOCK,
"Underline": CursorShape.UNDERLINE, "Underline": CursorShape.UNDERLINE,
"Beam": CursorShape.BEAM, "Beam": CursorShape.BEAM,
@ -379,7 +374,7 @@ class PythonInput:
self.options = self._create_options() self.options = self._create_options()
self.selected_option_index: int = 0 self.selected_option_index: int = 0
#: Incremeting integer counting the current statement. #: Incrementing integer counting the current statement.
self.current_statement_index: int = 1 self.current_statement_index: int = 1
# Code signatures. (This is set asynchronously after a timeout.) # Code signatures. (This is set asynchronously after a timeout.)
@ -477,24 +472,36 @@ class PythonInput:
return flags return flags
@property def add_key_binding(
def add_key_binding(self) -> Callable[[_T], _T]: self,
*keys: Keys | str,
filter: FilterOrBool = True,
eager: FilterOrBool = False,
is_global: FilterOrBool = False,
save_before: Callable[[KeyPressEvent], bool] = (lambda e: True),
record_in_macro: FilterOrBool = True,
) -> Callable[[_T_kh], _T_kh]:
""" """
Shortcut for adding new key bindings. Shortcut for adding new key bindings.
(Mostly useful for a config.py file, that receives (Mostly useful for a config.py file, that receives
a PythonInput/Repl instance as input.) a PythonInput/Repl instance as input.)
All arguments are identical to prompt_toolkit's `KeyBindings.add`.
:: ::
@python_input.add_key_binding(Keys.ControlX, filter=...) @python_input.add_key_binding(Keys.ControlX, filter=...)
def handler(event): def handler(event):
... ...
""" """
return self.extra_key_bindings.add(
def add_binding_decorator(*k, **kw): *keys,
return self.extra_key_bindings.add(*k, **kw) filter=filter,
eager=eager,
return add_binding_decorator is_global=is_global,
save_before=save_before,
record_in_macro=record_in_macro,
)
def install_code_colorscheme(self, name: str, style: BaseStyle) -> None: def install_code_colorscheme(self, name: str, style: BaseStyle) -> None:
""" """
@ -607,10 +614,10 @@ class PythonInput:
description="Change the cursor style, possibly according " description="Change the cursor style, possibly according "
"to the Vi input mode.", "to the Vi input mode.",
get_current_value=lambda: self.cursor_shape_config, get_current_value=lambda: self.cursor_shape_config,
get_values=lambda: dict( get_values=lambda: {
(s, partial(enable, "cursor_shape_config", s)) s: partial(enable, "cursor_shape_config", s)
for s in self.all_cursor_shape_configs for s in self.all_cursor_shape_configs
), },
), ),
simple_option( simple_option(
title="Paste mode", title="Paste mode",
@ -835,7 +842,7 @@ class PythonInput:
[ [
simple_option( simple_option(
title="Syntax highlighting", title="Syntax highlighting",
description="Use colors for syntax highligthing", description="Use colors for syntax highlighting",
field_name="enable_syntax_highlighting", field_name="enable_syntax_highlighting",
), ),
simple_option( simple_option(
@ -1003,7 +1010,7 @@ class PythonInput:
app = self.app app = self.app
async def on_timeout_task() -> None: async def on_timeout_task() -> None:
loop = get_event_loop() loop = get_running_loop()
# Never run multiple get-signature threads. # Never run multiple get-signature threads.
if self._get_signatures_thread_running: if self._get_signatures_thread_running:

View file

@ -12,38 +12,24 @@ from __future__ import annotations
import asyncio import asyncio
import builtins import builtins
import os import os
import signal
import sys import sys
import traceback import traceback
import types import types
import warnings import warnings
from dis import COMPILER_FLAG_NAMES from dis import COMPILER_FLAG_NAMES
from enum import Enum from typing import Any, Callable, ContextManager, Iterable
from typing import Any, Callable, ContextManager, Dict, Optional
from prompt_toolkit.formatted_text import ( from prompt_toolkit.formatted_text import OneStyleAndTextTuple
HTML,
AnyFormattedText,
FormattedText,
PygmentsTokens,
StyleAndTextTuples,
fragment_list_width,
merge_formatted_text,
to_formatted_text,
)
from prompt_toolkit.formatted_text.utils import fragment_list_to_text, split_lines
from prompt_toolkit.key_binding import KeyBindings, KeyPressEvent
from prompt_toolkit.patch_stdout import patch_stdout as patch_stdout_context from prompt_toolkit.patch_stdout import patch_stdout as patch_stdout_context
from prompt_toolkit.shortcuts import ( from prompt_toolkit.shortcuts import (
PromptSession,
clear_title, clear_title,
print_formatted_text,
set_title, set_title,
) )
from prompt_toolkit.styles import BaseStyle from prompt_toolkit.utils import DummyContext
from prompt_toolkit.utils import DummyContext, get_cwidth from pygments.lexers import PythonTracebackLexer # noqa: F401
from pygments.lexers import PythonLexer, PythonTracebackLexer
from pygments.token import Token
from .printer import OutputPrinter
from .python_input import PythonInput from .python_input import PythonInput
PyCF_ALLOW_TOP_LEVEL_AWAIT: int PyCF_ALLOW_TOP_LEVEL_AWAIT: int
@ -108,7 +94,9 @@ class PythonRepl(PythonInput):
else: else:
# Print. # Print.
if result is not None: if result is not None:
self.show_result(result) self._show_result(result)
if self.insert_blank_line_after_output:
self.app.output.write("\n")
# Loop. # Loop.
self.current_statement_index += 1 self.current_statement_index += 1
@ -123,6 +111,24 @@ class PythonRepl(PythonInput):
# any case.) # any case.)
self._handle_keyboard_interrupt(e) self._handle_keyboard_interrupt(e)
def _get_output_printer(self) -> OutputPrinter:
return OutputPrinter(
output=self.app.output,
input=self.app.input,
style=self._current_style,
style_transformation=self.style_transformation,
title=self.title,
)
def _show_result(self, result: object) -> None:
self._get_output_printer().display_result(
result=result,
out_prompt=self.get_output_prompt(),
reformat=self.enable_output_formatting,
highlight=self.enable_syntax_highlighting,
paginate=self.enable_pager,
)
def run(self) -> None: def run(self) -> None:
""" """
Run the REPL loop. Run the REPL loop.
@ -153,27 +159,58 @@ class PythonRepl(PythonInput):
clear_title() clear_title()
self._remove_from_namespace() self._remove_from_namespace()
async def run_and_show_expression_async(self, text: str): async def run_and_show_expression_async(self, text: str) -> Any:
loop = asyncio.get_event_loop() loop = asyncio.get_running_loop()
system_exit: SystemExit | None = None
try: try:
result = await self.eval_async(text) try:
except KeyboardInterrupt: # KeyboardInterrupt doesn't inherit from Exception. # Create `eval` task. Ensure that control-c will cancel this
raise # task.
except SystemExit: async def eval() -> Any:
return nonlocal system_exit
except BaseException as e: try:
self._handle_exception(e) return await self.eval_async(text)
else: except SystemExit as e:
# Print. # Don't propagate SystemExit in `create_task()`. That
if result is not None: # will kill the event loop. We want to handle it
await loop.run_in_executor(None, lambda: self.show_result(result)) # gracefully.
system_exit = e
# Loop. task = asyncio.create_task(eval())
self.current_statement_index += 1 loop.add_signal_handler(signal.SIGINT, lambda *_: task.cancel())
self.signatures = [] result = await task
# Return the result for future consumers.
return result if system_exit is not None:
raise system_exit
except KeyboardInterrupt:
# KeyboardInterrupt doesn't inherit from Exception.
raise
except SystemExit:
raise
except BaseException as e:
self._handle_exception(e)
else:
# Print.
if result is not None:
await loop.run_in_executor(None, lambda: self._show_result(result))
# Loop.
self.current_statement_index += 1
self.signatures = []
# Return the result for future consumers.
return result
finally:
loop.remove_signal_handler(signal.SIGINT)
except KeyboardInterrupt as e:
# Handle all possible `KeyboardInterrupt` errors. This can
# happen during the `eval`, but also during the
# `show_result` if something takes too long.
# (Try/catch is around the whole block, because we want to
# prevent that a Control-C keypress terminates the REPL in
# any case.)
self._handle_keyboard_interrupt(e)
async def run_async(self) -> None: async def run_async(self) -> None:
""" """
@ -187,7 +224,7 @@ class PythonRepl(PythonInput):
(Both for control-C to work, as well as for the code to see the right (Both for control-C to work, as well as for the code to see the right
thread in which it was embedded). thread in which it was embedded).
""" """
loop = asyncio.get_event_loop() loop = asyncio.get_running_loop()
if self.terminal_title: if self.terminal_title:
set_title(self.terminal_title) set_title(self.terminal_title)
@ -217,6 +254,8 @@ class PythonRepl(PythonInput):
# `KeyboardInterrupt` exceptions can end up in the event # `KeyboardInterrupt` exceptions can end up in the event
# loop selector. # loop selector.
self._handle_keyboard_interrupt(e) self._handle_keyboard_interrupt(e)
except SystemExit:
return
finally: finally:
if self.terminal_title: if self.terminal_title:
clear_title() clear_title()
@ -245,7 +284,7 @@ class PythonRepl(PythonInput):
result = eval(code, self.get_globals(), self.get_locals()) result = eval(code, self.get_globals(), self.get_locals())
if _has_coroutine_flag(code): if _has_coroutine_flag(code):
result = asyncio.get_event_loop().run_until_complete(result) result = asyncio.get_running_loop().run_until_complete(result)
self._store_eval_result(result) self._store_eval_result(result)
return result return result
@ -258,7 +297,7 @@ class PythonRepl(PythonInput):
result = eval(code, self.get_globals(), self.get_locals()) result = eval(code, self.get_globals(), self.get_locals())
if _has_coroutine_flag(code): if _has_coroutine_flag(code):
result = asyncio.get_event_loop().run_until_complete(result) result = asyncio.get_running_loop().run_until_complete(result)
return None return None
@ -318,264 +357,12 @@ class PythonRepl(PythonInput):
dont_inherit=True, dont_inherit=True,
) )
def _format_result_output(self, result: object) -> StyleAndTextTuples:
"""
Format __repr__ for an `eval` result.
Note: this can raise `KeyboardInterrupt` if either calling `__repr__`,
`__pt_repr__` or formatting the output with "Black" takes to long
and the user presses Control-C.
"""
out_prompt = to_formatted_text(self.get_output_prompt())
# If the repr is valid Python code, use the Pygments lexer.
try:
result_repr = repr(result)
except KeyboardInterrupt:
raise # Don't catch here.
except BaseException as e:
# Calling repr failed.
self._handle_exception(e)
return []
try:
compile(result_repr, "", "eval")
except SyntaxError:
formatted_result_repr = to_formatted_text(result_repr)
else:
# Syntactically correct. Format with black and syntax highlight.
if self.enable_output_formatting:
# Inline import. Slightly speed up start-up time if black is
# not used.
try:
import black
if not hasattr(black, "Mode"):
raise ImportError
except ImportError:
pass # no Black package in your installation
else:
result_repr = black.format_str(
result_repr,
mode=black.Mode(line_length=self.app.output.get_size().columns),
)
formatted_result_repr = to_formatted_text(
PygmentsTokens(list(_lex_python_result(result_repr)))
)
# If __pt_repr__ is present, take this. This can return prompt_toolkit
# formatted text.
try:
if hasattr(result, "__pt_repr__"):
formatted_result_repr = to_formatted_text(
getattr(result, "__pt_repr__")()
)
if isinstance(formatted_result_repr, list):
formatted_result_repr = FormattedText(formatted_result_repr)
except KeyboardInterrupt:
raise # Don't catch here.
except:
# For bad code, `__getattr__` can raise something that's not an
# `AttributeError`. This happens already when calling `hasattr()`.
pass
# Align every line to the prompt.
line_sep = "\n" + " " * fragment_list_width(out_prompt)
indented_repr: StyleAndTextTuples = []
lines = list(split_lines(formatted_result_repr))
for i, fragment in enumerate(lines):
indented_repr.extend(fragment)
# Add indentation separator between lines, not after the last line.
if i != len(lines) - 1:
indented_repr.append(("", line_sep))
# Write output tokens.
if self.enable_syntax_highlighting:
formatted_output = merge_formatted_text([out_prompt, indented_repr])
else:
formatted_output = FormattedText(
out_prompt + [("", fragment_list_to_text(formatted_result_repr))]
)
return to_formatted_text(formatted_output)
def show_result(self, result: object) -> None:
"""
Show __repr__ for an `eval` result and print to output.
"""
formatted_text_output = self._format_result_output(result)
if self.enable_pager:
self.print_paginated_formatted_text(formatted_text_output)
else:
self.print_formatted_text(formatted_text_output)
self.app.output.flush()
if self.insert_blank_line_after_output:
self.app.output.write("\n")
def print_formatted_text(
self, formatted_text: StyleAndTextTuples, end: str = "\n"
) -> None:
print_formatted_text(
FormattedText(formatted_text),
style=self._current_style,
style_transformation=self.style_transformation,
include_default_pygments_style=False,
output=self.app.output,
end=end,
)
def print_paginated_formatted_text(
self,
formatted_text: StyleAndTextTuples,
end: str = "\n",
) -> None:
"""
Print formatted text, using --MORE-- style pagination.
(Avoid filling up the terminal's scrollback buffer.)
"""
pager_prompt = self.create_pager_prompt()
size = self.app.output.get_size()
abort = False
print_all = False
# Max number of lines allowed in the buffer before painting.
max_rows = size.rows - 1
# Page buffer.
rows_in_buffer = 0
columns_in_buffer = 0
page: StyleAndTextTuples = []
def flush_page() -> None:
nonlocal page, columns_in_buffer, rows_in_buffer
self.print_formatted_text(page, end="")
page = []
columns_in_buffer = 0
rows_in_buffer = 0
def show_pager() -> None:
nonlocal abort, max_rows, print_all
# Run pager prompt in another thread.
# Same as for the input. This prevents issues with nested event
# loops.
pager_result = pager_prompt.prompt(in_thread=True)
if pager_result == PagerResult.ABORT:
print("...")
abort = True
elif pager_result == PagerResult.NEXT_LINE:
max_rows = 1
elif pager_result == PagerResult.NEXT_PAGE:
max_rows = size.rows - 1
elif pager_result == PagerResult.PRINT_ALL:
print_all = True
# Loop over lines. Show --MORE-- prompt when page is filled.
formatted_text = formatted_text + [("", end)]
lines = list(split_lines(formatted_text))
for lineno, line in enumerate(lines):
for style, text, *_ in line:
for c in text:
width = get_cwidth(c)
# (Soft) wrap line if it doesn't fit.
if columns_in_buffer + width > size.columns:
# Show pager first if we get too many lines after
# wrapping.
if rows_in_buffer + 1 >= max_rows and not print_all:
page.append(("", "\n"))
flush_page()
show_pager()
if abort:
return
rows_in_buffer += 1
columns_in_buffer = 0
columns_in_buffer += width
page.append((style, c))
if rows_in_buffer + 1 >= max_rows and not print_all:
page.append(("", "\n"))
flush_page()
show_pager()
if abort:
return
else:
# Add line ending between lines (if `end="\n"` was given, one
# more empty line is added in `split_lines` automatically to
# take care of the final line ending).
if lineno != len(lines) - 1:
page.append(("", "\n"))
rows_in_buffer += 1
columns_in_buffer = 0
flush_page()
def create_pager_prompt(self) -> PromptSession[PagerResult]:
"""
Create pager --MORE-- prompt.
"""
return create_pager_prompt(self._current_style, self.title)
def _format_exception_output(self, e: BaseException) -> PygmentsTokens:
# Instead of just calling ``traceback.format_exc``, we take the
# traceback and skip the bottom calls of this framework.
t, v, tb = sys.exc_info()
# Required for pdb.post_mortem() to work.
sys.last_type, sys.last_value, sys.last_traceback = t, v, tb
tblist = list(traceback.extract_tb(tb))
for line_nr, tb_tuple in enumerate(tblist):
if tb_tuple[0] == "<stdin>":
tblist = tblist[line_nr:]
break
l = traceback.format_list(tblist)
if l:
l.insert(0, "Traceback (most recent call last):\n")
l.extend(traceback.format_exception_only(t, v))
tb_str = "".join(l)
# Format exception and write to output.
# (We use the default style. Most other styles result
# in unreadable colors for the traceback.)
if self.enable_syntax_highlighting:
tokens = list(_lex_python_traceback(tb_str))
else:
tokens = [(Token, tb_str)]
return PygmentsTokens(tokens)
def _handle_exception(self, e: BaseException) -> None: def _handle_exception(self, e: BaseException) -> None:
output = self.app.output self._get_output_printer().display_exception(
e,
tokens = self._format_exception_output(e) highlight=self.enable_syntax_highlighting,
paginate=self.enable_pager,
print_formatted_text(
tokens,
style=self._current_style,
style_transformation=self.style_transformation,
include_default_pygments_style=False,
output=output,
) )
output.flush()
def _handle_keyboard_interrupt(self, e: KeyboardInterrupt) -> None: def _handle_keyboard_interrupt(self, e: KeyboardInterrupt) -> None:
output = self.app.output output = self.app.output
@ -602,21 +389,16 @@ class PythonRepl(PythonInput):
globals = self.get_globals() globals = self.get_globals()
del globals["get_ptpython"] del globals["get_ptpython"]
def print_paginated_formatted_text(
def _lex_python_traceback(tb): self,
"Return token list for traceback string." formatted_text: Iterable[OneStyleAndTextTuple],
lexer = PythonTracebackLexer() end: str = "\n",
return lexer.get_tokens(tb) ) -> None:
# Warning: This is mainly here backwards-compatibility. Some projects
# call `print_paginated_formatted_text` on the Repl object.
def _lex_python_result(tb): self._get_output_printer().display_style_and_text_tuples(
"Return token list for Python string." formatted_text, paginate=True
lexer = PythonLexer() )
# Use `get_tokens_unprocessed`, so that we get exactly the same string,
# without line endings appended. `print_formatted_text` already appends a
# line ending, and otherwise we'll have two line endings.
tokens = lexer.get_tokens_unprocessed(tb)
return [(tokentype, value) for index, tokentype, value in tokens]
def enable_deprecation_warnings() -> None: def enable_deprecation_warnings() -> None:
@ -630,25 +412,31 @@ def enable_deprecation_warnings() -> None:
warnings.filterwarnings("default", category=DeprecationWarning, module="__main__") warnings.filterwarnings("default", category=DeprecationWarning, module="__main__")
def run_config( DEFAULT_CONFIG_FILE = "~/.config/ptpython/config.py"
repl: PythonInput, config_file: str = "~/.config/ptpython/config.py"
) -> None:
def run_config(repl: PythonInput, config_file: str | None = None) -> None:
""" """
Execute REPL config file. Execute REPL config file.
:param repl: `PythonInput` instance. :param repl: `PythonInput` instance.
:param config_file: Path of the configuration file. :param config_file: Path of the configuration file.
""" """
explicit_config_file = config_file is not None
# Expand tildes. # Expand tildes.
config_file = os.path.expanduser(config_file) config_file = os.path.expanduser(
config_file if config_file is not None else DEFAULT_CONFIG_FILE
)
def enter_to_continue() -> None: def enter_to_continue() -> None:
input("\nPress ENTER to continue...") input("\nPress ENTER to continue...")
# Check whether this file exists. # Check whether this file exists.
if not os.path.exists(config_file): if not os.path.exists(config_file):
print("Impossible to read %r" % config_file) if explicit_config_file:
enter_to_continue() print(f"Impossible to read {config_file}")
enter_to_continue()
return return
# Run the config file in an empty namespace. # Run the config file in an empty namespace.
@ -741,67 +529,3 @@ def embed(
else: else:
with patch_context: with patch_context:
repl.run() repl.run()
class PagerResult(Enum):
ABORT = "ABORT"
NEXT_LINE = "NEXT_LINE"
NEXT_PAGE = "NEXT_PAGE"
PRINT_ALL = "PRINT_ALL"
def create_pager_prompt(
style: BaseStyle, title: AnyFormattedText = ""
) -> PromptSession[PagerResult]:
"""
Create a "continue" prompt for paginated output.
"""
bindings = KeyBindings()
@bindings.add("enter")
@bindings.add("down")
def next_line(event: KeyPressEvent) -> None:
event.app.exit(result=PagerResult.NEXT_LINE)
@bindings.add("space")
def next_page(event: KeyPressEvent) -> None:
event.app.exit(result=PagerResult.NEXT_PAGE)
@bindings.add("a")
def print_all(event: KeyPressEvent) -> None:
event.app.exit(result=PagerResult.PRINT_ALL)
@bindings.add("q")
@bindings.add("c-c")
@bindings.add("c-d")
@bindings.add("escape", eager=True)
def no(event: KeyPressEvent) -> None:
event.app.exit(result=PagerResult.ABORT)
@bindings.add("<any>")
def _(event: KeyPressEvent) -> None:
"Disallow inserting other text."
pass
style
session: PromptSession[PagerResult] = PromptSession(
merge_formatted_text(
[
title,
HTML(
"<status-toolbar>"
"<more> -- MORE -- </more> "
"<key>[Enter]</key> Scroll "
"<key>[Space]</key> Next page "
"<key>[a]</key> Print all "
"<key>[q]</key> Quit "
"</status-toolbar>: "
),
]
),
key_bindings=bindings,
erase_when_done=True,
style=style,
)
return session

View file

@ -10,7 +10,7 @@ from __future__ import annotations
import inspect import inspect
from inspect import Signature as InspectSignature from inspect import Signature as InspectSignature
from inspect import _ParameterKind as ParameterKind from inspect import _ParameterKind as ParameterKind
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple from typing import TYPE_CHECKING, Any, Sequence
from prompt_toolkit.document import Document from prompt_toolkit.document import Document
@ -203,7 +203,6 @@ def get_signatures_using_eval(
running `eval()` over the detected function name. running `eval()` over the detected function name.
""" """
# Look for open parenthesis, before cursor position. # Look for open parenthesis, before cursor position.
text = document.text_before_cursor
pos = document.cursor_position - 1 pos = document.cursor_position - 1
paren_mapping = {")": "(", "}": "{", "]": "["} paren_mapping = {")": "(", "}": "{", "]": "["}

View file

@ -1,7 +1,5 @@
from __future__ import annotations from __future__ import annotations
from typing import Dict
from prompt_toolkit.styles import BaseStyle, Style, merge_styles from prompt_toolkit.styles import BaseStyle, Style, merge_styles
from prompt_toolkit.styles.pygments import style_from_pygments_cls from prompt_toolkit.styles.pygments import style_from_pygments_cls
from prompt_toolkit.utils import is_conemu_ansi, is_windows, is_windows_vt100_supported from prompt_toolkit.utils import is_conemu_ansi, is_windows, is_windows_vt100_supported

View file

@ -4,17 +4,7 @@ For internal use only.
from __future__ import annotations from __future__ import annotations
import re import re
from typing import ( from typing import TYPE_CHECKING, Any, Callable, Iterable, TypeVar, cast
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
Optional,
Type,
TypeVar,
cast,
)
from prompt_toolkit.document import Document from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import to_formatted_text from prompt_toolkit.formatted_text import to_formatted_text
@ -91,7 +81,7 @@ def get_jedi_script_from_document(
# Workaround Jedi issue #514: for https://github.com/davidhalter/jedi/issues/514 # Workaround Jedi issue #514: for https://github.com/davidhalter/jedi/issues/514
return None return None
except KeyError: except KeyError:
# Workaroud for a crash when the input is "u'", the start of a unicode string. # Workaround for a crash when the input is "u'", the start of a unicode string.
return None return None
except Exception: except Exception:
# Workaround for: https://github.com/jonathanslenders/ptpython/issues/91 # Workaround for: https://github.com/jonathanslenders/ptpython/issues/91

View file

@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
from typing import Callable, Optional from typing import Callable
from prompt_toolkit.document import Document from prompt_toolkit.document import Document
from prompt_toolkit.validation import ValidationError, Validator from prompt_toolkit.validation import ValidationError, Validator

View file

@ -1,13 +1,35 @@
[tool.black] [tool.ruff]
target-version = ['py36'] target-version = "py37"
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"C", # flake8-comprehensions
"T", # Print.
"I", # isort
# "B", # flake8-bugbear
"UP", # pyupgrade
"RUF100", # unused-noqa
"Q", # quotes
]
ignore = [
"E501", # Line too long, handled by black
"C901", # Too complex
"E722", # bare except.
]
[tool.isort] [tool.ruff.per-file-ignores]
# isort configuration that is compatible with Black. "examples/*" = ["T201"] # Print allowed in examples.
multi_line_output = 3 "examples/ptpython_config/config.py" = ["F401"] # Unused imports in config.
include_trailing_comma = true "ptpython/entry_points/run_ptipython.py" = ["T201", "F401"] # Print, import usage.
known_first_party = "ptpython" "ptpython/entry_points/run_ptpython.py" = ["T201"] # Print usage.
known_third_party = "prompt_toolkit,pygments,asyncssh" "ptpython/ipython.py" = ["T100"] # Import usage.
force_grid_wrap = 0 "ptpython/repl.py" = ["T201"] # Print usage.
use_parentheses = true "ptpython/printer.py" = ["T201"] # Print usage.
line_length = 88 "tests/run_tests.py" = ["F401"] # Unused imports.
[tool.ruff.isort]
known-first-party = ["ptpython"]
known-third-party = ["prompt_toolkit", "pygments", "asyncssh"]

View file

@ -11,7 +11,7 @@ with open(os.path.join(os.path.dirname(__file__), "README.rst")) as f:
setup( setup(
name="ptpython", name="ptpython",
author="Jonathan Slenders", author="Jonathan Slenders",
version="3.0.23", version="3.0.25",
url="https://github.com/prompt-toolkit/ptpython", url="https://github.com/prompt-toolkit/ptpython",
description="Python REPL build on top of prompt_toolkit", description="Python REPL build on top of prompt_toolkit",
long_description=long_description, long_description=long_description,
@ -21,12 +21,13 @@ setup(
"appdirs", "appdirs",
"importlib_metadata;python_version<'3.8'", "importlib_metadata;python_version<'3.8'",
"jedi>=0.16.0", "jedi>=0.16.0",
# Use prompt_toolkit 3.0.28, because of cursor shape support. # Use prompt_toolkit 3.0.34, because of `OneStyleAndTextTuple` import.
"prompt_toolkit>=3.0.28,<3.1.0", "prompt_toolkit>=3.0.34,<3.1.0",
"pygments", "pygments",
], ],
python_requires=">=3.7", python_requires=">=3.7",
classifiers=[ classifiers=[
"License :: OSI Approved :: BSD License",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.7",
@ -39,12 +40,14 @@ setup(
"ptpython = ptpython.entry_points.run_ptpython:run", "ptpython = ptpython.entry_points.run_ptpython:run",
"ptipython = ptpython.entry_points.run_ptipython:run", "ptipython = ptpython.entry_points.run_ptipython:run",
"ptpython%s = ptpython.entry_points.run_ptpython:run" % sys.version_info[0], "ptpython%s = ptpython.entry_points.run_ptpython:run" % sys.version_info[0],
"ptpython%s.%s = ptpython.entry_points.run_ptpython:run" "ptpython{}.{} = ptpython.entry_points.run_ptpython:run".format(
% sys.version_info[:2], *sys.version_info[:2]
),
"ptipython%s = ptpython.entry_points.run_ptipython:run" "ptipython%s = ptpython.entry_points.run_ptipython:run"
% sys.version_info[0], % sys.version_info[0],
"ptipython%s.%s = ptpython.entry_points.run_ptipython:run" "ptipython{}.{} = ptpython.entry_points.run_ptipython:run".format(
% sys.version_info[:2], *sys.version_info[:2]
),
] ]
}, },
extras_require={ extras_require={