ai-station/.venv/lib/python3.12/site-packages/textual/drivers/web_driver.py

355 lines
12 KiB
Python

"""
The Remote driver uses the following packet structure.
1 byte for packet type. "D" for data, "M" for meta.
4 byte little endian integer for the size of the payload.
Arbitrary payload.
"""
from __future__ import annotations
import asyncio
import json
import os
import signal
import sys
from codecs import getincrementaldecoder
from functools import partial
from pathlib import Path
from threading import Event, Thread
from typing import Any, BinaryIO, Literal, TextIO, cast
from textual import events, log, messages
from textual._binary_encode import dump as binary_dump
from textual._xterm_parser import XTermParser
from textual.app import App
from textual.driver import Driver
from textual.drivers._byte_stream import ByteStream
from textual.drivers._input_reader import InputReader
from textual.geometry import Size
WINDOWS = sys.platform == "win32"
class _ExitInput(Exception):
"""Internal exception to force exit of input loop."""
class WebDriver(Driver):
"""A headless driver that may be run remotely."""
def __init__(
self,
app: App[Any],
*,
debug: bool = False,
mouse: bool = True,
size: tuple[int, int] | None = None,
):
if size is None:
try:
width = int(os.environ.get("COLUMNS", 80))
height = int(os.environ.get("ROWS", 24))
except ValueError:
pass
else:
size = width, height
super().__init__(app, debug=debug, mouse=mouse, size=size)
self.stdout = sys.__stdout__
self.fileno = sys.__stdout__.fileno()
self._write = partial(os.write, self.fileno)
self.exit_event = Event()
self._key_thread: Thread = Thread(
target=self.run_input_thread, name="textual-input"
)
self._input_reader = InputReader()
self._deliveries: dict[str, BinaryIO | TextIO] = {}
"""Maps delivery keys to file-like objects, used
for delivering files to the browser."""
@property
def is_web(self) -> bool:
return True
def write(self, data: str) -> None:
"""Write string data to the output device, which may be piped to
the parent process (i.e. textual-web/textual-serve).
Args:
data: Raw data.
"""
data_bytes = data.encode("utf-8")
self._write(b"D%s%s" % (len(data_bytes).to_bytes(4, "big"), data_bytes))
def write_meta(self, data: dict[str, object]) -> None:
"""Write a dictionary containing some metadata to stdout, which
may be piped to the parent process (i.e. textual-web/textual-serve).
Args:
data: Meta dict.
"""
meta_bytes = json.dumps(data).encode("utf-8", errors="ignore")
self._write(b"M%s%s" % (len(meta_bytes).to_bytes(4, "big"), meta_bytes))
def write_binary_encoded(self, data: tuple[str | bytes, ...]) -> None:
"""Binary encode a data-structure and write to stdout.
Args:
data: The data to binary encode and write.
"""
packed_bytes = binary_dump(data)
self._write(b"P%s%s" % (len(packed_bytes).to_bytes(4, "big"), packed_bytes))
def flush(self) -> None:
pass
def _enable_mouse_support(self) -> None:
"""Enable reporting of mouse events."""
write = self.write
write("\x1b[?1000h") # SET_VT200_MOUSE
write("\x1b[?1003h") # SET_ANY_EVENT_MOUSE
write("\x1b[?1015h") # SET_VT200_HIGHLIGHT_MOUSE
write("\x1b[?1006h") # SET_SGR_EXT_MODE_MOUSE
def _enable_bracketed_paste(self) -> None:
"""Enable bracketed paste mode."""
self.write("\x1b[?2004h")
def _disable_bracketed_paste(self) -> None:
"""Disable bracketed paste mode."""
self.write("\x1b[?2004l")
def _disable_mouse_support(self) -> None:
"""Disable reporting of mouse events."""
write = self.write
write("\x1b[?1000l") #
write("\x1b[?1003l") #
write("\x1b[?1015l")
write("\x1b[?1006l")
def _request_terminal_sync_mode_support(self) -> None:
"""Writes an escape sequence to query the terminal support for the sync protocol."""
self.write("\033[?2026$p")
def start_application_mode(self) -> None:
"""Start application mode."""
loop = asyncio.get_running_loop()
def do_exit() -> None:
"""Callback to force exit."""
asyncio.run_coroutine_threadsafe(
self._app._post_message(messages.ExitApp()), loop=loop
)
if not WINDOWS:
for _signal in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(_signal, do_exit)
self._write(b"__GANGLION__\n")
self.write("\x1b[?1049h") # Alt screen
self._enable_mouse_support()
self.write("\x1b[?25l") # Hide cursor
self.write("\033[?1003h")
size = Size(80, 24) if self._size is None else Size(*self._size)
event = events.Resize(size, size)
asyncio.run_coroutine_threadsafe(
self._app._post_message(event),
loop=loop,
)
self._request_terminal_sync_mode_support()
self._enable_bracketed_paste()
self.flush()
self._key_thread.start()
self._app.call_later(self._app.post_message, events.AppBlur())
def disable_input(self) -> None:
"""Disable further input."""
def stop_application_mode(self) -> None:
"""Stop application mode, restore state."""
self.exit_event.set()
self._input_reader.close()
self.write_meta({"type": "exit"})
def run_input_thread(self) -> None:
"""Wait for input and dispatch events."""
input_reader = self._input_reader
parser = XTermParser(debug=self._debug)
utf8_decoder = getincrementaldecoder("utf-8")().decode
decode = utf8_decoder
# The server sends us a stream of bytes, which contains the equivalent of stdin, plus
# in band data packets.
byte_stream = ByteStream()
try:
for data in input_reader:
if data:
for packet_type, payload in byte_stream.feed(data):
if packet_type == "D":
# Treat as stdin
for event in parser.feed(decode(payload)):
self.process_message(event)
else:
# Process meta information separately
self._on_meta(packet_type, payload)
for event in parser.tick():
self.process_message(event)
except _ExitInput:
pass
except Exception:
from traceback import format_exc
log(format_exc())
finally:
input_reader.close()
def _on_meta(self, packet_type: str, payload: bytes) -> None:
"""Private method to dispatch meta.
Args:
packet_type: Packet type (currently always "M")
payload: Meta payload (JSON encoded as bytes).
"""
payload_map: dict[str, object] = json.loads(payload)
_type = payload_map.get("type", {})
if isinstance(_type, str):
self.on_meta(_type, payload_map)
else:
log.error(
f"Protocol error: type field value is not a string. Value is {_type!r}"
)
def on_meta(self, packet_type: str, payload: dict[str, object]) -> None:
"""Process a dictionary containing information received from the controlling process.
Args:
packet_type: The type of the packet.
payload: meta dict.
"""
if packet_type == "resize":
self._size = (payload["width"], payload["height"])
requested_size = Size(*self._size)
self._app.post_message(events.Resize(requested_size, requested_size))
elif packet_type == "focus":
self._app.post_message(events.AppFocus())
elif packet_type == "blur":
self._app.post_message(events.AppBlur())
elif packet_type == "quit":
self._app.post_message(messages.ExitApp())
elif packet_type == "exit":
raise _ExitInput()
elif packet_type == "deliver_chunk_request":
# A request from the server to deliver another chunk of a file
log.debug(f"Deliver chunk request: {payload}")
try:
delivery_key = cast(str, payload["key"])
requested_size = cast(int, payload["size"])
except KeyError:
log.error("Protocol error: deliver_chunk_request missing key or size")
return
deliveries = self._deliveries
file_like: BinaryIO | TextIO | None = None
try:
file_like = deliveries[delivery_key]
except KeyError:
log.error(
f"Protocol error: deliver_chunk_request invalid key {delivery_key!r}"
)
else:
# Read the requested amount of data from the file
name: str | None = payload.get("name", None)
try:
log.debug(f"Reading {requested_size} bytes from {delivery_key}")
chunk = file_like.read(requested_size)
log.debug(f"Delivering chunk {delivery_key!r} of len {len(chunk)}")
self.write_binary_encoded(("deliver_chunk", delivery_key, chunk))
# We've hit an empty chunk, so we're done
if not chunk:
log.info(f"Delivery complete for {delivery_key}")
file_like.close()
del deliveries[delivery_key]
self._delivery_complete(delivery_key, save_path=None, name=name)
except Exception as error:
file_like.close()
del deliveries[delivery_key]
log.error(
f"Error delivering file chunk for key {delivery_key!r}. "
"Cancelling delivery."
)
import traceback
log.error(str(traceback.format_exc()))
self._delivery_failed(delivery_key, exception=error, name=name)
def open_url(self, url: str, new_tab: bool = True) -> None:
"""Open a URL in the default web browser.
Args:
url: The URL to open.
new_tab: Whether to open the URL in a new tab.
"""
self.write_meta({"type": "open_url", "url": url, "new_tab": new_tab})
def deliver_binary(
self,
binary: BinaryIO | TextIO,
*,
delivery_key: str,
save_path: Path,
open_method: Literal["browser", "download"] = "download",
encoding: str | None = None,
mime_type: str | None = None,
name: str | None = None,
) -> None:
self._deliver_file(
binary,
delivery_key=delivery_key,
save_path=save_path,
open_method=open_method,
encoding=encoding,
mime_type=mime_type,
name=name,
)
def _deliver_file(
self,
binary: BinaryIO | TextIO,
*,
delivery_key: str,
save_path: Path,
open_method: Literal["browser", "download"],
encoding: str | None = None,
mime_type: str | None = None,
name: str | None = None,
) -> None:
"""Deliver a file to the end-user of the application."""
binary.seek(0)
self._deliveries[delivery_key] = binary
# Inform the server that we're starting a new file delivery
meta: dict[str, object] = {
"type": "deliver_file_start",
"key": delivery_key,
"path": str(save_path.resolve()),
"open_method": open_method,
"encoding": encoding or "",
"mime_type": mime_type or "",
"name": name,
}
self.write_meta(meta)
log.info(f"Delivering file {meta['path']!r}: {meta!r}")