ai-station/.venv/lib/python3.12/site-packages/textual/await_complete.py

86 lines
2.5 KiB
Python
Raw Permalink Normal View History

2025-12-25 14:54:33 +00:00
from __future__ import annotations
from asyncio import Future, gather
from typing import TYPE_CHECKING, Any, Awaitable, Generator
import rich.repr
from typing_extensions import Self
from textual._debug import get_caller_file_and_line
from textual.message_pump import MessagePump
if TYPE_CHECKING:
from textual.types import CallbackType
@rich.repr.auto(angular=True)
class AwaitComplete:
"""An 'optionally-awaitable' object which runs one or more coroutines (or other awaitables) concurrently."""
def __init__(
self, *awaitables: Awaitable, pre_await: CallbackType | None = None
) -> None:
"""Create an AwaitComplete.
Args:
awaitables: One or more awaitables to run concurrently.
"""
self._awaitables = awaitables
self._future: Future[Any] = gather(*awaitables)
self._pre_await: CallbackType | None = pre_await
self._caller = get_caller_file_and_line()
def __rich_repr__(self) -> rich.repr.Result:
yield self._awaitables
yield "pre_await", self._pre_await, None
yield "caller", self._caller, None
def set_pre_await_callback(self, pre_await: CallbackType | None) -> None:
"""Set a callback to run prior to awaiting.
This is used by Textual, mainly to check for possible deadlocks.
You are unlikely to need to call this method in an app.
Args:
pre_await: A callback.
"""
self._pre_await = pre_await
def call_next(self, node: MessagePump) -> Self:
"""Await after the next message.
Args:
node: The node which created the object.
"""
node.call_next(self)
return self
async def __call__(self) -> Any:
return await self
def __await__(self) -> Generator[Any, None, Any]:
_rich_traceback_omit = True
if self._pre_await is not None:
self._pre_await()
return self._future.__await__()
@property
def is_done(self) -> bool:
"""`True` if the task has completed."""
return self._future.done()
@property
def exception(self) -> BaseException | None:
"""An exception if the awaitables failed."""
if self._future.done():
return self._future.exception()
return None
@classmethod
def nothing(cls):
"""Returns an already completed instance of AwaitComplete."""
instance = cls()
instance._future = Future()
instance._future.set_result(None) # Mark it as completed with no result
return instance