355 lines
12 KiB
Python
355 lines
12 KiB
Python
"""
|
|
|
|
The Remote driver uses the following packet structure.
|
|
|
|
1 byte for packet type. "D" for data, "M" for meta.
|
|
4 byte little endian integer for the size of the payload.
|
|
Arbitrary payload.
|
|
|
|
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import signal
|
|
import sys
|
|
from codecs import getincrementaldecoder
|
|
from functools import partial
|
|
from pathlib import Path
|
|
from threading import Event, Thread
|
|
from typing import Any, BinaryIO, Literal, TextIO, cast
|
|
|
|
from textual import events, log, messages
|
|
from textual._binary_encode import dump as binary_dump
|
|
from textual._xterm_parser import XTermParser
|
|
from textual.app import App
|
|
from textual.driver import Driver
|
|
from textual.drivers._byte_stream import ByteStream
|
|
from textual.drivers._input_reader import InputReader
|
|
from textual.geometry import Size
|
|
|
|
# True when sys.platform reports "win32"; WebDriver.start_application_mode
# checks this flag and skips installing event-loop signal handlers when set.
WINDOWS = sys.platform == "win32"
|
|
|
|
|
|
class _ExitInput(Exception):
    """Signal that the input loop should stop reading.

    Raised by ``WebDriver.on_meta`` when an "exit" packet arrives and caught
    by ``WebDriver.run_input_thread`` to leave its read loop.
    """
|
|
|
|
|
|
class WebDriver(Driver):
    """A headless driver that may be run remotely.

    Output is written to the process's original stdout as framed packets:
    one byte of packet type, a 4 byte big endian payload size, then the
    payload. Input arrives as a byte stream which is split into packets by
    ``ByteStream`` on a dedicated thread (see ``run_input_thread``).
    """

    def __init__(
        self,
        app: App[Any],
        *,
        debug: bool = False,
        mouse: bool = True,
        size: tuple[int, int] | None = None,
    ):
        """Initialize the driver.

        Args:
            app: The application this driver serves.
            debug: Debug flag, forwarded to the base driver.
            mouse: Mouse flag, forwarded to the base driver.
            size: Initial (width, height). When ``None`` it is read from the
                ``COLUMNS`` / ``ROWS`` environment variables (defaults 80 x 24).
        """
        if size is None:
            try:
                width = int(os.environ.get("COLUMNS", 80))
                height = int(os.environ.get("ROWS", 24))
            except ValueError:
                # Deliberate best effort: with unparsable environment values
                # the size stays None and is passed on to the base driver.
                pass
            else:
                size = width, height
        super().__init__(app, debug=debug, mouse=mouse, size=size)
        self.stdout = sys.__stdout__
        self.fileno = sys.__stdout__.fileno()
        # All output bypasses Python's buffered stdout and goes straight to
        # the file descriptor.
        self._write = partial(os.write, self.fileno)
        self.exit_event = Event()
        self._key_thread: Thread = Thread(
            target=self.run_input_thread, name="textual-input"
        )
        self._input_reader = InputReader()

        self._deliveries: dict[str, BinaryIO | TextIO] = {}
        """Maps delivery keys to file-like objects, used
        for delivering files to the browser."""

    @property
    def is_web(self) -> bool:
        """Always ``True`` for this driver."""
        return True

    def _write_packet(self, packet_type: bytes, payload: bytes) -> None:
        """Frame a payload and write it to the output file descriptor.

        Args:
            packet_type: Single byte identifying the packet (b"D", b"M" or b"P").
            payload: The bytes to send after the 4 byte big endian size.
        """
        self._write(packet_type + len(payload).to_bytes(4, "big") + payload)

    def write(self, data: str) -> None:
        """Write string data to the output device, which may be piped to
        the parent process (i.e. textual-web/textual-serve).

        Args:
            data: Raw data.
        """
        self._write_packet(b"D", data.encode("utf-8"))

    def write_meta(self, data: dict[str, object]) -> None:
        """Write a dictionary containing some metadata to stdout, which
        may be piped to the parent process (i.e. textual-web/textual-serve).

        Args:
            data: Meta dict.
        """
        self._write_packet(b"M", json.dumps(data).encode("utf-8", errors="ignore"))

    def write_binary_encoded(self, data: tuple[str | bytes, ...]) -> None:
        """Binary encode a data-structure and write to stdout.

        Args:
            data: The data to binary encode and write.
        """
        self._write_packet(b"P", binary_dump(data))

    def flush(self) -> None:
        """Do nothing; every write goes directly to the file descriptor."""

    def _enable_mouse_support(self) -> None:
        """Enable reporting of mouse events."""
        write = self.write
        write("\x1b[?1000h")  # SET_VT200_MOUSE
        write("\x1b[?1003h")  # SET_ANY_EVENT_MOUSE
        write("\x1b[?1015h")  # SET_URXVT_EXT_MODE_MOUSE
        write("\x1b[?1006h")  # SET_SGR_EXT_MODE_MOUSE

    def _enable_bracketed_paste(self) -> None:
        """Enable bracketed paste mode."""
        self.write("\x1b[?2004h")

    def _disable_bracketed_paste(self) -> None:
        """Disable bracketed paste mode."""
        self.write("\x1b[?2004l")

    def _disable_mouse_support(self) -> None:
        """Disable reporting of mouse events."""
        write = self.write
        # Resets the same four modes that _enable_mouse_support sets.
        write("\x1b[?1000l")
        write("\x1b[?1003l")
        write("\x1b[?1015l")
        write("\x1b[?1006l")

    def _request_terminal_sync_mode_support(self) -> None:
        """Writes an escape sequence to query the terminal support for the sync protocol."""
        self.write("\033[?2026$p")

    def start_application_mode(self) -> None:
        """Start application mode.

        Installs signal handlers (non-Windows only), writes the handshake
        line and terminal setup sequences, posts the initial resize event and
        starts the input thread. Must be called with a running event loop.
        """
        loop = asyncio.get_running_loop()

        def do_exit() -> None:
            """Callback to force exit."""
            asyncio.run_coroutine_threadsafe(
                self._app._post_message(messages.ExitApp()), loop=loop
            )

        if not WINDOWS:
            for _signal in (signal.SIGINT, signal.SIGTERM):
                loop.add_signal_handler(_signal, do_exit)

        # Written raw (not framed as a packet), before any packet is sent.
        self._write(b"__GANGLION__\n")

        self.write("\x1b[?1049h")  # Alt screen
        self._enable_mouse_support()

        self.write("\x1b[?25l")  # Hide cursor
        self.write("\033[?1003h")

        size = Size(80, 24) if self._size is None else Size(*self._size)
        event = events.Resize(size, size)
        asyncio.run_coroutine_threadsafe(
            self._app._post_message(event),
            loop=loop,
        )

        self._request_terminal_sync_mode_support()
        self._enable_bracketed_paste()
        self.flush()
        self._key_thread.start()
        self._app.call_later(self._app.post_message, events.AppBlur())

    def disable_input(self) -> None:
        """Disable further input."""

    def stop_application_mode(self) -> None:
        """Stop application mode, restore state."""
        self.exit_event.set()
        self._input_reader.close()
        self.write_meta({"type": "exit"})

    def run_input_thread(self) -> None:
        """Wait for input and dispatch events.

        Runs on the "textual-input" thread created in ``__init__``. Any
        exception other than ``_ExitInput`` is logged with its traceback; in
        every case the input reader is closed on the way out.
        """
        input_reader = self._input_reader
        parser = XTermParser(debug=self._debug)
        # Incremental decoding keeps a multi-byte character that is split
        # across two payloads intact.
        decode = getincrementaldecoder("utf-8")().decode
        # The server sends us a stream of bytes, which contains the equivalent of stdin, plus
        # in band data packets.
        byte_stream = ByteStream()
        try:
            for data in input_reader:
                if data:
                    for packet_type, payload in byte_stream.feed(data):
                        if packet_type == "D":
                            # Treat as stdin
                            for event in parser.feed(decode(payload)):
                                self.process_message(event)
                        else:
                            # Process meta information separately
                            self._on_meta(packet_type, payload)
                for event in parser.tick():
                    self.process_message(event)
        except _ExitInput:
            pass
        except Exception:
            from traceback import format_exc

            log(format_exc())
        finally:
            input_reader.close()

    def _on_meta(self, packet_type: str, payload: bytes) -> None:
        """Private method to dispatch meta.

        Args:
            packet_type: Packet type (currently always "M")
            payload: Meta payload (JSON encoded as bytes).
        """
        payload_map: dict[str, object] = json.loads(payload)
        # The {} default is a non-string, so a missing "type" field is
        # reported through the same protocol-error branch below.
        _type = payload_map.get("type", {})
        if isinstance(_type, str):
            self.on_meta(_type, payload_map)
        else:
            log.error(
                f"Protocol error: type field value is not a string. Value is {_type!r}"
            )

    def on_meta(self, packet_type: str, payload: dict[str, object]) -> None:
        """Process a dictionary containing information received from the controlling process.

        Args:
            packet_type: The type of the packet.
            payload: meta dict.

        Raises:
            _ExitInput: When ``packet_type`` is "exit", to stop the input loop.
        """
        if packet_type == "resize":
            try:
                width = cast(int, payload["width"])
                height = cast(int, payload["height"])
            except KeyError:
                # A malformed packet must not take down the input thread.
                log.error("Protocol error: resize missing width or height")
                return
            self._size = (width, height)
            requested_size = Size(width, height)
            self._app.post_message(events.Resize(requested_size, requested_size))
        elif packet_type == "focus":
            self._app.post_message(events.AppFocus())
        elif packet_type == "blur":
            self._app.post_message(events.AppBlur())
        elif packet_type == "quit":
            self._app.post_message(messages.ExitApp())
        elif packet_type == "exit":
            raise _ExitInput()
        elif packet_type == "deliver_chunk_request":
            self._on_deliver_chunk_request(payload)

    def _on_deliver_chunk_request(self, payload: dict[str, object]) -> None:
        """Send the next chunk of a registered file delivery.

        Reads ``payload["size"]`` units from the file registered under
        ``payload["key"]`` and writes them as a "deliver_chunk" packet. An
        empty chunk marks the end of the file: the file is closed and
        unregistered. Any error cancels the delivery.

        Args:
            payload: The meta dict of a "deliver_chunk_request" packet.
        """
        # A request from the server to deliver another chunk of a file
        log.debug(f"Deliver chunk request: {payload}")
        try:
            delivery_key = cast(str, payload["key"])
            requested_size = cast(int, payload["size"])
        except KeyError:
            log.error("Protocol error: deliver_chunk_request missing key or size")
            return

        deliveries = self._deliveries
        try:
            file_like = deliveries[delivery_key]
        except KeyError:
            log.error(
                f"Protocol error: deliver_chunk_request invalid key {delivery_key!r}"
            )
            return

        # Read the requested amount of data from the file
        name = cast("str | None", payload.get("name", None))
        try:
            log.debug(f"Reading {requested_size} bytes from {delivery_key}")
            chunk = file_like.read(requested_size)
            log.debug(f"Delivering chunk {delivery_key!r} of len {len(chunk)}")
            self.write_binary_encoded(("deliver_chunk", delivery_key, chunk))
            # We've hit an empty chunk, so we're done
            if not chunk:
                log.info(f"Delivery complete for {delivery_key}")
                file_like.close()
                del deliveries[delivery_key]
                self._delivery_complete(delivery_key, save_path=None, name=name)
        except Exception as error:
            # The entry is already gone when the failure happened after the
            # final-chunk cleanup above, so remove it without assuming it is
            # still registered.
            deliveries.pop(delivery_key, None)
            file_like.close()

            log.error(
                f"Error delivering file chunk for key {delivery_key!r}. "
                "Cancelling delivery."
            )
            import traceback

            log.error(str(traceback.format_exc()))

            self._delivery_failed(delivery_key, exception=error, name=name)

    def open_url(self, url: str, new_tab: bool = True) -> None:
        """Open a URL in the default web browser.

        Args:
            url: The URL to open.
            new_tab: Whether to open the URL in a new tab.
        """
        self.write_meta({"type": "open_url", "url": url, "new_tab": new_tab})

    def deliver_binary(
        self,
        binary: BinaryIO | TextIO,
        *,
        delivery_key: str,
        save_path: Path,
        open_method: Literal["browser", "download"] = "download",
        encoding: str | None = None,
        mime_type: str | None = None,
        name: str | None = None,
    ) -> None:
        """Start delivering a file-like object to the end-user.

        Forwards every argument unchanged to ``_deliver_file``.

        Args:
            binary: The file-like object to deliver.
            delivery_key: Key identifying this delivery in later chunk requests.
            save_path: Path reported in the "deliver_file_start" packet.
            open_method: "browser" or "download", reported in the start packet.
            encoding: Encoding reported in the start packet ("" when ``None``).
            mime_type: MIME type reported in the start packet ("" when ``None``).
            name: Optional name reported in the start packet.
        """
        self._deliver_file(
            binary,
            delivery_key=delivery_key,
            save_path=save_path,
            open_method=open_method,
            encoding=encoding,
            mime_type=mime_type,
            name=name,
        )

    def _deliver_file(
        self,
        binary: BinaryIO | TextIO,
        *,
        delivery_key: str,
        save_path: Path,
        open_method: Literal["browser", "download"],
        encoding: str | None = None,
        mime_type: str | None = None,
        name: str | None = None,
    ) -> None:
        """Deliver a file to the end-user of the application.

        Rewinds the file, registers it under ``delivery_key`` and writes a
        "deliver_file_start" meta packet. The content itself is sent later,
        in response to "deliver_chunk_request" packets.
        """
        binary.seek(0)

        self._deliveries[delivery_key] = binary

        # Inform the server that we're starting a new file delivery
        meta: dict[str, object] = {
            "type": "deliver_file_start",
            "key": delivery_key,
            "path": str(save_path.resolve()),
            "open_method": open_method,
            "encoding": encoding or "",
            "mime_type": mime_type or "",
            "name": name,
        }
        self.write_meta(meta)
        log.info(f"Delivering file {meta['path']!r}: {meta!r}")
|