363 lines
11 KiB
Python
363 lines
11 KiB
Python
from __future__ import annotations
|
||
|
||
import re
|
||
from typing import TYPE_CHECKING, Iterable, Optional, Sequence
|
||
|
||
from rich.cells import cell_len
|
||
from rich.highlighter import Highlighter, ReprHighlighter
|
||
from rich.style import Style
|
||
from rich.text import Text
|
||
|
||
from textual import work
|
||
from textual._line_split import line_split
|
||
from textual.cache import LRUCache
|
||
from textual.geometry import Size
|
||
from textual.reactive import var
|
||
from textual.scroll_view import ScrollView
|
||
from textual.selection import Selection
|
||
from textual.strip import Strip
|
||
|
||
if TYPE_CHECKING:
|
||
from typing_extensions import Self
|
||
|
||
_sub_escape = re.compile("[\u0000-\u0014]").sub
|
||
|
||
|
||
class Log(ScrollView, can_focus=True):
|
||
"""A widget to log text."""
|
||
|
||
ALLOW_SELECT = True
|
||
DEFAULT_CSS = """
|
||
Log {
|
||
background: $surface;
|
||
color: $text;
|
||
overflow: scroll;
|
||
&:focus {
|
||
background-tint: $foreground 5%;
|
||
}
|
||
}
|
||
"""
|
||
|
||
max_lines: var[int | None] = var[Optional[int]](None)
|
||
"""Maximum number of lines to show"""
|
||
|
||
auto_scroll: var[bool] = var(True)
|
||
"""Automatically scroll to new lines."""
|
||
|
||
def __init__(
|
||
self,
|
||
highlight: bool = False,
|
||
max_lines: int | None = None,
|
||
auto_scroll: bool = True,
|
||
name: str | None = None,
|
||
id: str | None = None,
|
||
classes: str | None = None,
|
||
disabled: bool = False,
|
||
) -> None:
|
||
"""Create a Log widget.
|
||
|
||
Args:
|
||
highlight: Enable highlighting.
|
||
max_lines: Maximum number of lines to display.
|
||
auto_scroll: Scroll to end on new lines.
|
||
name: The name of the text log.
|
||
id: The ID of the text log in the DOM.
|
||
classes: The CSS classes of the text log.
|
||
disabled: Whether the text log is disabled or not.
|
||
"""
|
||
super().__init__(name=name, id=id, classes=classes, disabled=disabled)
|
||
self.highlight = highlight
|
||
"""Enable highlighting."""
|
||
self.max_lines = max_lines
|
||
self.auto_scroll = auto_scroll
|
||
self._lines: list[str] = []
|
||
self._width = 0
|
||
self._updates = 0
|
||
self._render_line_cache: LRUCache[int, Strip] = LRUCache(1024)
|
||
self.highlighter: Highlighter = ReprHighlighter()
|
||
"""The Rich Highlighter object to use, if `highlight=True`"""
|
||
self._clear_y = 0
|
||
|
||
@property
|
||
def allow_select(self) -> bool:
|
||
return True
|
||
|
||
@property
|
||
def lines(self) -> Sequence[str]:
|
||
"""The raw lines in the Log.
|
||
|
||
Note that this attribute is read only.
|
||
Changing the lines will not update the Log's contents.
|
||
|
||
"""
|
||
return self._lines
|
||
|
||
def notify_style_update(self) -> None:
|
||
"""Called by Textual when styles update."""
|
||
super().notify_style_update()
|
||
self._render_line_cache.clear()
|
||
|
||
def _update_maximum_width(self, updates: int, size: int) -> None:
|
||
"""Update the virtual size width.
|
||
|
||
Args:
|
||
updates: A counter of updates.
|
||
size: Maximum size of new lines.
|
||
"""
|
||
if updates == self._updates:
|
||
self._width = max(size, self._width)
|
||
self.virtual_size = Size(self._width, self.line_count)
|
||
|
||
@property
|
||
def line_count(self) -> int:
|
||
"""Number of lines of content."""
|
||
if self._lines:
|
||
return len(self._lines) - (self._lines[-1] == "")
|
||
return 0
|
||
|
||
@classmethod
|
||
def _process_line(cls, line: str) -> str:
|
||
"""Process a line before it is rendered to remove control codes.
|
||
|
||
Args:
|
||
line: A string.
|
||
|
||
Returns:
|
||
New string with no control codes.
|
||
"""
|
||
return _sub_escape("<EFBFBD>", line.expandtabs())
|
||
|
||
@work(thread=True)
|
||
def _update_size(self, updates: int, lines: list[str]) -> None:
|
||
"""A thread worker to update the width in the background.
|
||
|
||
Args:
|
||
updates: The update index at the time of invocation.
|
||
lines: Lines that were added.
|
||
"""
|
||
if lines:
|
||
_process_line = self._process_line
|
||
max_length = max(cell_len(_process_line(line)) for line in lines)
|
||
self.app.call_from_thread(self._update_maximum_width, updates, max_length)
|
||
|
||
def _prune_max_lines(self) -> None:
|
||
"""Prune lines if there are more than the maximum."""
|
||
if self.max_lines is None:
|
||
return
|
||
remove_lines = len(self._lines) - self.max_lines
|
||
if remove_lines > 0:
|
||
_cache = self._render_line_cache
|
||
# We've removed some lines, which means the y values in the cache are out of sync
|
||
# Calculated a new dict of cache values
|
||
updated_cache = {
|
||
y - remove_lines: _cache[y] for y in _cache.keys() if y > remove_lines
|
||
}
|
||
# Clear the cache
|
||
_cache.clear()
|
||
# Update the cache with previously calculated values
|
||
for y, line in updated_cache.items():
|
||
_cache[y] = line
|
||
del self._lines[:remove_lines]
|
||
|
||
def write(
|
||
self,
|
||
data: str,
|
||
scroll_end: bool | None = None,
|
||
) -> Self:
|
||
"""Write to the log.
|
||
|
||
Args:
|
||
data: Data to write.
|
||
scroll_end: Scroll to the end after writing, or `None` to use `self.auto_scroll`.
|
||
|
||
Returns:
|
||
The `Log` instance.
|
||
"""
|
||
is_vertical_scroll_end = self.is_vertical_scroll_end
|
||
if data:
|
||
if not self._lines:
|
||
self._lines.append("")
|
||
for line, ending in line_split(data):
|
||
self._lines[-1] += line
|
||
self._width = max(
|
||
self._width, cell_len(self._process_line(self._lines[-1]))
|
||
)
|
||
self.refresh_lines(len(self._lines) - 1)
|
||
if ending:
|
||
self._lines.append("")
|
||
self.virtual_size = Size(self._width, self.line_count)
|
||
|
||
if self.max_lines is not None and len(self._lines) > self.max_lines:
|
||
self._prune_max_lines()
|
||
|
||
auto_scroll = self.auto_scroll if scroll_end is None else scroll_end
|
||
if auto_scroll:
|
||
self.scroll_end(animate=False, immediate=True, x_axis=False)
|
||
return self
|
||
|
||
def write_line(
|
||
self,
|
||
line: str,
|
||
scroll_end: bool | None = None,
|
||
) -> Self:
|
||
"""Write content on a new line.
|
||
|
||
Args:
|
||
line: String to write to the log.
|
||
scroll_end: Scroll to the end after writing, or `None` to use `self.auto_scroll`.
|
||
|
||
Returns:
|
||
The `Log` instance.
|
||
"""
|
||
self.write_lines([line], scroll_end)
|
||
return self
|
||
|
||
def write_lines(
|
||
self,
|
||
lines: Iterable[str],
|
||
scroll_end: bool | None = None,
|
||
) -> Self:
|
||
"""Write an iterable of lines.
|
||
|
||
Args:
|
||
lines: An iterable of strings to write.
|
||
scroll_end: Scroll to the end after writing, or `None` to use `self.auto_scroll`.
|
||
|
||
Returns:
|
||
The `Log` instance.
|
||
"""
|
||
is_vertical_scroll_end = self.is_vertical_scroll_end
|
||
auto_scroll = self.auto_scroll if scroll_end is None else scroll_end
|
||
new_lines = []
|
||
for line in lines:
|
||
new_lines.extend(line.splitlines())
|
||
start_line = len(self._lines)
|
||
self._lines.extend(new_lines)
|
||
if self.max_lines is not None and len(self._lines) > self.max_lines:
|
||
self._prune_max_lines()
|
||
self.virtual_size = Size(self._width, len(self._lines))
|
||
self._update_size(self._updates, new_lines)
|
||
self.refresh_lines(start_line, len(new_lines))
|
||
if (
|
||
auto_scroll
|
||
and not self.is_vertical_scrollbar_grabbed
|
||
and is_vertical_scroll_end
|
||
):
|
||
self.scroll_end(animate=False, immediate=True, x_axis=False)
|
||
else:
|
||
self.refresh()
|
||
return self
|
||
|
||
def clear(self) -> Self:
|
||
"""Clear the Log.
|
||
|
||
Returns:
|
||
The `Log` instance.
|
||
"""
|
||
self._lines.clear()
|
||
self._width = 0
|
||
self._render_line_cache.clear()
|
||
self._updates += 1
|
||
self.virtual_size = Size(0, 0)
|
||
self._clear_y = 0
|
||
return self
|
||
|
||
def get_selection(self, selection: Selection) -> tuple[str, str] | None:
|
||
"""Get the text under the selection.
|
||
|
||
Args:
|
||
selection: Selection information.
|
||
|
||
Returns:
|
||
Tuple of extracted text and ending (typically "\n" or " "), or `None` if no text could be extracted.
|
||
"""
|
||
text = "\n".join(self._lines)
|
||
return selection.extract(text), "\n"
|
||
|
||
def selection_updated(self, selection: Selection | None) -> None:
|
||
self._render_line_cache.clear()
|
||
self.refresh()
|
||
|
||
def render_line(self, y: int) -> Strip:
|
||
"""Render a line of content.
|
||
|
||
Args:
|
||
y: Y Coordinate of line.
|
||
|
||
Returns:
|
||
A rendered line.
|
||
"""
|
||
scroll_x, scroll_y = self.scroll_offset
|
||
strip = self._render_line(scroll_y + y, scroll_x, self.size.width)
|
||
return strip
|
||
|
||
def _render_line(self, y: int, scroll_x: int, width: int) -> Strip:
|
||
"""Render a line into a cropped strip.
|
||
|
||
Args:
|
||
y: Y offset of line.
|
||
scroll_x: Current horizontal scroll.
|
||
width: Width of the widget.
|
||
|
||
Returns:
|
||
A Strip suitable for rendering.
|
||
"""
|
||
rich_style = self.rich_style
|
||
if y >= len(self._lines):
|
||
return Strip.blank(width, rich_style)
|
||
|
||
line = self._render_line_strip(y, rich_style)
|
||
assert line._cell_length is not None
|
||
line = line.crop_extend(scroll_x, scroll_x + width, rich_style)
|
||
line = line.apply_offsets(scroll_x, y)
|
||
return line
|
||
|
||
def _render_line_strip(self, y: int, rich_style: Style) -> Strip:
|
||
"""Render a line into a Strip.
|
||
|
||
Args:
|
||
y: Y offset of line.
|
||
rich_style: Rich style of line.
|
||
|
||
Returns:
|
||
An uncropped Strip.
|
||
"""
|
||
selection = self.text_selection
|
||
if y in self._render_line_cache and selection is None:
|
||
return self._render_line_cache[y]
|
||
|
||
_line = self._process_line(self._lines[y])
|
||
|
||
line_text = Text(_line, no_wrap=True)
|
||
line_text.stylize(rich_style)
|
||
|
||
if self.highlight:
|
||
line_text = self.highlighter(line_text)
|
||
if selection is not None:
|
||
if (select_span := selection.get_span(y - self._clear_y)) is not None:
|
||
start, end = select_span
|
||
if end == -1:
|
||
end = len(line_text)
|
||
|
||
selection_style = self.screen.get_component_rich_style(
|
||
"screen--selection"
|
||
)
|
||
line_text.stylize(selection_style, start, end)
|
||
|
||
line = Strip(line_text.render(self.app.console), cell_len(_line))
|
||
|
||
if selection is not None:
|
||
self._render_line_cache[y] = line
|
||
return line
|
||
|
||
def refresh_lines(self, y_start: int, line_count: int = 1) -> None:
|
||
"""Refresh one or more lines.
|
||
|
||
Args:
|
||
y_start: First line to refresh.
|
||
line_count: Total number of lines to refresh.
|
||
"""
|
||
for y in range(y_start, y_start + line_count):
|
||
self._render_line_cache.discard(y)
|
||
super().refresh_lines(y_start, line_count=line_count)
|