ai-station/.venv/lib/python3.12/site-packages/textual/widgets/_log.py

363 lines
11 KiB
Python
Raw Permalink Blame History

from __future__ import annotations
import re
from typing import TYPE_CHECKING, Iterable, Optional, Sequence
from rich.cells import cell_len
from rich.highlighter import Highlighter, ReprHighlighter
from rich.style import Style
from rich.text import Text
from textual import work
from textual._line_split import line_split
from textual.cache import LRUCache
from textual.geometry import Size
from textual.reactive import var
from textual.scroll_view import ScrollView
from textual.selection import Selection
from textual.strip import Strip
if TYPE_CHECKING:
from typing_extensions import Self
_sub_escape = re.compile("[\u0000-\u0014]").sub
class Log(ScrollView, can_focus=True):
"""A widget to log text."""
ALLOW_SELECT = True
DEFAULT_CSS = """
Log {
background: $surface;
color: $text;
overflow: scroll;
&:focus {
background-tint: $foreground 5%;
}
}
"""
max_lines: var[int | None] = var[Optional[int]](None)
"""Maximum number of lines to show"""
auto_scroll: var[bool] = var(True)
"""Automatically scroll to new lines."""
def __init__(
self,
highlight: bool = False,
max_lines: int | None = None,
auto_scroll: bool = True,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
) -> None:
"""Create a Log widget.
Args:
highlight: Enable highlighting.
max_lines: Maximum number of lines to display.
auto_scroll: Scroll to end on new lines.
name: The name of the text log.
id: The ID of the text log in the DOM.
classes: The CSS classes of the text log.
disabled: Whether the text log is disabled or not.
"""
super().__init__(name=name, id=id, classes=classes, disabled=disabled)
self.highlight = highlight
"""Enable highlighting."""
self.max_lines = max_lines
self.auto_scroll = auto_scroll
self._lines: list[str] = []
self._width = 0
self._updates = 0
self._render_line_cache: LRUCache[int, Strip] = LRUCache(1024)
self.highlighter: Highlighter = ReprHighlighter()
"""The Rich Highlighter object to use, if `highlight=True`"""
self._clear_y = 0
@property
def allow_select(self) -> bool:
return True
@property
def lines(self) -> Sequence[str]:
"""The raw lines in the Log.
Note that this attribute is read only.
Changing the lines will not update the Log's contents.
"""
return self._lines
def notify_style_update(self) -> None:
"""Called by Textual when styles update."""
super().notify_style_update()
self._render_line_cache.clear()
def _update_maximum_width(self, updates: int, size: int) -> None:
"""Update the virtual size width.
Args:
updates: A counter of updates.
size: Maximum size of new lines.
"""
if updates == self._updates:
self._width = max(size, self._width)
self.virtual_size = Size(self._width, self.line_count)
@property
def line_count(self) -> int:
"""Number of lines of content."""
if self._lines:
return len(self._lines) - (self._lines[-1] == "")
return 0
@classmethod
def _process_line(cls, line: str) -> str:
"""Process a line before it is rendered to remove control codes.
Args:
line: A string.
Returns:
New string with no control codes.
"""
return _sub_escape("<EFBFBD>", line.expandtabs())
@work(thread=True)
def _update_size(self, updates: int, lines: list[str]) -> None:
"""A thread worker to update the width in the background.
Args:
updates: The update index at the time of invocation.
lines: Lines that were added.
"""
if lines:
_process_line = self._process_line
max_length = max(cell_len(_process_line(line)) for line in lines)
self.app.call_from_thread(self._update_maximum_width, updates, max_length)
def _prune_max_lines(self) -> None:
"""Prune lines if there are more than the maximum."""
if self.max_lines is None:
return
remove_lines = len(self._lines) - self.max_lines
if remove_lines > 0:
_cache = self._render_line_cache
# We've removed some lines, which means the y values in the cache are out of sync
# Calculated a new dict of cache values
updated_cache = {
y - remove_lines: _cache[y] for y in _cache.keys() if y > remove_lines
}
# Clear the cache
_cache.clear()
# Update the cache with previously calculated values
for y, line in updated_cache.items():
_cache[y] = line
del self._lines[:remove_lines]
def write(
self,
data: str,
scroll_end: bool | None = None,
) -> Self:
"""Write to the log.
Args:
data: Data to write.
scroll_end: Scroll to the end after writing, or `None` to use `self.auto_scroll`.
Returns:
The `Log` instance.
"""
is_vertical_scroll_end = self.is_vertical_scroll_end
if data:
if not self._lines:
self._lines.append("")
for line, ending in line_split(data):
self._lines[-1] += line
self._width = max(
self._width, cell_len(self._process_line(self._lines[-1]))
)
self.refresh_lines(len(self._lines) - 1)
if ending:
self._lines.append("")
self.virtual_size = Size(self._width, self.line_count)
if self.max_lines is not None and len(self._lines) > self.max_lines:
self._prune_max_lines()
auto_scroll = self.auto_scroll if scroll_end is None else scroll_end
if auto_scroll:
self.scroll_end(animate=False, immediate=True, x_axis=False)
return self
def write_line(
self,
line: str,
scroll_end: bool | None = None,
) -> Self:
"""Write content on a new line.
Args:
line: String to write to the log.
scroll_end: Scroll to the end after writing, or `None` to use `self.auto_scroll`.
Returns:
The `Log` instance.
"""
self.write_lines([line], scroll_end)
return self
def write_lines(
self,
lines: Iterable[str],
scroll_end: bool | None = None,
) -> Self:
"""Write an iterable of lines.
Args:
lines: An iterable of strings to write.
scroll_end: Scroll to the end after writing, or `None` to use `self.auto_scroll`.
Returns:
The `Log` instance.
"""
is_vertical_scroll_end = self.is_vertical_scroll_end
auto_scroll = self.auto_scroll if scroll_end is None else scroll_end
new_lines = []
for line in lines:
new_lines.extend(line.splitlines())
start_line = len(self._lines)
self._lines.extend(new_lines)
if self.max_lines is not None and len(self._lines) > self.max_lines:
self._prune_max_lines()
self.virtual_size = Size(self._width, len(self._lines))
self._update_size(self._updates, new_lines)
self.refresh_lines(start_line, len(new_lines))
if (
auto_scroll
and not self.is_vertical_scrollbar_grabbed
and is_vertical_scroll_end
):
self.scroll_end(animate=False, immediate=True, x_axis=False)
else:
self.refresh()
return self
def clear(self) -> Self:
"""Clear the Log.
Returns:
The `Log` instance.
"""
self._lines.clear()
self._width = 0
self._render_line_cache.clear()
self._updates += 1
self.virtual_size = Size(0, 0)
self._clear_y = 0
return self
def get_selection(self, selection: Selection) -> tuple[str, str] | None:
"""Get the text under the selection.
Args:
selection: Selection information.
Returns:
Tuple of extracted text and ending (typically "\n" or " "), or `None` if no text could be extracted.
"""
text = "\n".join(self._lines)
return selection.extract(text), "\n"
def selection_updated(self, selection: Selection | None) -> None:
self._render_line_cache.clear()
self.refresh()
def render_line(self, y: int) -> Strip:
"""Render a line of content.
Args:
y: Y Coordinate of line.
Returns:
A rendered line.
"""
scroll_x, scroll_y = self.scroll_offset
strip = self._render_line(scroll_y + y, scroll_x, self.size.width)
return strip
def _render_line(self, y: int, scroll_x: int, width: int) -> Strip:
"""Render a line into a cropped strip.
Args:
y: Y offset of line.
scroll_x: Current horizontal scroll.
width: Width of the widget.
Returns:
A Strip suitable for rendering.
"""
rich_style = self.rich_style
if y >= len(self._lines):
return Strip.blank(width, rich_style)
line = self._render_line_strip(y, rich_style)
assert line._cell_length is not None
line = line.crop_extend(scroll_x, scroll_x + width, rich_style)
line = line.apply_offsets(scroll_x, y)
return line
def _render_line_strip(self, y: int, rich_style: Style) -> Strip:
"""Render a line into a Strip.
Args:
y: Y offset of line.
rich_style: Rich style of line.
Returns:
An uncropped Strip.
"""
selection = self.text_selection
if y in self._render_line_cache and selection is None:
return self._render_line_cache[y]
_line = self._process_line(self._lines[y])
line_text = Text(_line, no_wrap=True)
line_text.stylize(rich_style)
if self.highlight:
line_text = self.highlighter(line_text)
if selection is not None:
if (select_span := selection.get_span(y - self._clear_y)) is not None:
start, end = select_span
if end == -1:
end = len(line_text)
selection_style = self.screen.get_component_rich_style(
"screen--selection"
)
line_text.stylize(selection_style, start, end)
line = Strip(line_text.render(self.app.console), cell_len(_line))
if selection is not None:
self._render_line_cache[y] = line
return line
def refresh_lines(self, y_start: int, line_count: int = 1) -> None:
"""Refresh one or more lines.
Args:
y_start: First line to refresh.
line_count: Total number of lines to refresh.
"""
for y in range(y_start, y_start + line_count):
self._render_line_cache.discard(y)
super().refresh_lines(y_start, line_count=line_count)