494 lines
15 KiB
Python
494 lines
15 KiB
Python
import inspect
|
|
from typing import Any, Awaitable, Callable, Dict, List, Optional, Union, overload
|
|
|
|
from fastapi import Request, Response
|
|
from mcp import ClientSession
|
|
from starlette.datastructures import Headers
|
|
|
|
from chainlit.action import Action
|
|
from chainlit.config import config
|
|
from chainlit.context import context
|
|
from chainlit.data.base import BaseDataLayer
|
|
from chainlit.mcp import McpConnection
|
|
from chainlit.message import Message
|
|
from chainlit.oauth_providers import get_configured_oauth_providers
|
|
from chainlit.step import Step, step
|
|
from chainlit.types import ChatProfile, Starter, ThreadDict
|
|
from chainlit.user import User
|
|
from chainlit.utils import wrap_user_function
|
|
|
|
|
|
def on_app_startup(func: Callable[[], Union[None, Awaitable[None]]]) -> Callable:
|
|
"""
|
|
Hook to run code when the Chainlit application starts.
|
|
Useful for initializing resources, loading models, setting up database connections, etc.
|
|
The function can be synchronous or asynchronous.
|
|
|
|
Args:
|
|
func (Callable[[], Union[None, Awaitable[None]]]): The startup hook to execute. Takes no arguments.
|
|
|
|
Example:
|
|
@cl.on_app_startup
|
|
async def startup():
|
|
print("Application is starting!")
|
|
# Initialize resources here
|
|
|
|
Returns:
|
|
Callable[[], Union[None, Awaitable[None]]]: The decorated startup hook.
|
|
"""
|
|
config.code.on_app_startup = wrap_user_function(func, with_task=False)
|
|
return func
|
|
|
|
|
|
def on_app_shutdown(func: Callable[[], Union[None, Awaitable[None]]]) -> Callable:
|
|
"""
|
|
Hook to run code when the Chainlit application shuts down.
|
|
Useful for cleaning up resources, closing connections, saving state, etc.
|
|
The function can be synchronous or asynchronous.
|
|
|
|
Args:
|
|
func (Callable[[], Union[None, Awaitable[None]]]): The shutdown hook to execute. Takes no arguments.
|
|
|
|
Example:
|
|
@cl.on_app_shutdown
|
|
async def shutdown():
|
|
print("Application is shutting down!")
|
|
# Clean up resources here
|
|
|
|
Returns:
|
|
Callable[[], Union[None, Awaitable[None]]]: The decorated shutdown hook.
|
|
"""
|
|
config.code.on_app_shutdown = wrap_user_function(func, with_task=False)
|
|
return func
|
|
|
|
|
|
def password_auth_callback(
|
|
func: Callable[[str, str], Awaitable[Optional[User]]],
|
|
) -> Callable:
|
|
"""
|
|
Framework agnostic decorator to authenticate the user.
|
|
|
|
Args:
|
|
func (Callable[[str, str], Awaitable[Optional[User]]]): The authentication callback to execute. Takes the email and password as parameters.
|
|
|
|
Example:
|
|
@cl.password_auth_callback
|
|
async def password_auth_callback(username: str, password: str) -> Optional[User]:
|
|
|
|
Returns:
|
|
Callable[[str, str], Awaitable[Optional[User]]]: The decorated authentication callback.
|
|
"""
|
|
|
|
config.code.password_auth_callback = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
def header_auth_callback(
|
|
func: Callable[[Headers], Awaitable[Optional[User]]],
|
|
) -> Callable:
|
|
"""
|
|
Framework agnostic decorator to authenticate the user via a header
|
|
|
|
Args:
|
|
func (Callable[[Headers], Awaitable[Optional[User]]]): The authentication callback to execute.
|
|
|
|
Example:
|
|
@cl.header_auth_callback
|
|
async def header_auth_callback(headers: Headers) -> Optional[User]:
|
|
|
|
Returns:
|
|
Callable[[Headers], Awaitable[Optional[User]]]: The decorated authentication callback.
|
|
"""
|
|
|
|
config.code.header_auth_callback = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
def oauth_callback(
|
|
func: Callable[
|
|
[str, str, Dict[str, str], User, Optional[str]], Awaitable[Optional[User]]
|
|
],
|
|
) -> Callable:
|
|
"""
|
|
Framework agnostic decorator to authenticate the user via oauth
|
|
|
|
Args:
|
|
func (Callable[[str, str, Dict[str, str], User, Optional[str]], Awaitable[Optional[User]]]): The authentication callback to execute.
|
|
|
|
Example:
|
|
@cl.oauth_callback
|
|
async def oauth_callback(provider_id: str, token: str, raw_user_data: Dict[str, str], default_app_user: User, id_token: Optional[str]) -> Optional[User]:
|
|
|
|
Returns:
|
|
Callable[[str, str, Dict[str, str], User, Optional[str]], Awaitable[Optional[User]]]: The decorated authentication callback.
|
|
"""
|
|
|
|
if len(get_configured_oauth_providers()) == 0:
|
|
raise ValueError(
|
|
"You must set the environment variable for at least one oauth provider to use oauth authentication."
|
|
)
|
|
|
|
config.code.oauth_callback = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
def on_logout(func: Callable[[Request, Response], Any]) -> Callable:
|
|
"""
|
|
Function called when the user logs out.
|
|
Takes the FastAPI request and response as parameters.
|
|
"""
|
|
|
|
config.code.on_logout = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
def on_message(func: Callable) -> Callable:
|
|
"""
|
|
Framework agnostic decorator to react to messages coming from the UI.
|
|
The decorated function is called every time a new message is received.
|
|
|
|
Args:
|
|
func (Callable[[Message], Any]): The function to be called when a new message is received. Takes a cl.Message.
|
|
|
|
Returns:
|
|
Callable[[str], Any]: The decorated on_message function.
|
|
"""
|
|
|
|
async def with_parent_id(message: Message):
|
|
async with Step(name="on_message", type="run", parent_id=message.id) as s:
|
|
s.input = message.content
|
|
if len(inspect.signature(func).parameters) > 0:
|
|
await func(message)
|
|
else:
|
|
await func()
|
|
|
|
config.code.on_message = wrap_user_function(with_parent_id)
|
|
return func
|
|
|
|
|
|
async def send_window_message(data: Any):
|
|
"""
|
|
Send custom data to the host window via a window.postMessage event.
|
|
|
|
Args:
|
|
data (Any): The data to send with the event.
|
|
"""
|
|
await context.emitter.send_window_message(data)
|
|
|
|
|
|
def on_window_message(func: Callable[[str], Any]) -> Callable:
|
|
"""
|
|
Hook to react to javascript postMessage events coming from the UI.
|
|
|
|
Args:
|
|
func (Callable[[str], Any]): The function to be called when a window message is received.
|
|
Takes the message content as a string parameter.
|
|
|
|
Returns:
|
|
Callable[[str], Any]: The decorated on_window_message function.
|
|
"""
|
|
config.code.on_window_message = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
def on_chat_start(func: Callable) -> Callable:
|
|
"""
|
|
Hook to react to the user websocket connection event.
|
|
|
|
Args:
|
|
func (Callable[], Any]): The connection hook to execute.
|
|
|
|
Returns:
|
|
Callable[], Any]: The decorated hook.
|
|
"""
|
|
|
|
config.code.on_chat_start = wrap_user_function(
|
|
step(func, name="on_chat_start", type="run"), with_task=True
|
|
)
|
|
return func
|
|
|
|
|
|
def on_chat_resume(func: Callable[[ThreadDict], Any]) -> Callable:
|
|
"""
|
|
Hook to react to resume websocket connection event.
|
|
|
|
Args:
|
|
func (Callable[], Any]): The connection hook to execute.
|
|
|
|
Returns:
|
|
Callable[], Any]: The decorated hook.
|
|
"""
|
|
|
|
config.code.on_chat_resume = wrap_user_function(func, with_task=True)
|
|
return func
|
|
|
|
|
|
@overload
|
|
def set_chat_profiles(
|
|
func: Callable[[Optional["User"]], Awaitable[List["ChatProfile"]]],
|
|
) -> Callable[[Optional["User"]], Awaitable[List["ChatProfile"]]]: ...
|
|
|
|
|
|
@overload
|
|
def set_chat_profiles(
|
|
func: Callable[[Optional["User"], Optional["str"]], Awaitable[List["ChatProfile"]]],
|
|
) -> Callable[[Optional["User"], Optional["str"]], Awaitable[List["ChatProfile"]]]: ...
|
|
|
|
|
|
def set_chat_profiles(func):
|
|
"""
|
|
Programmatic declaration of the available chat profiles (can depend on the User from the session if authentication is setup).
|
|
|
|
Args:
|
|
func (Callable[[Optional["User"]], Awaitable[List["ChatProfile"]]]): The function declaring the chat profiles.
|
|
|
|
Returns:
|
|
Callable[[Optional["User"]], Awaitable[List["ChatProfile"]]]: The decorated function.
|
|
"""
|
|
|
|
config.code.set_chat_profiles = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
@overload
|
|
def set_starters(
|
|
func: Callable[[Optional["User"]], Awaitable[List["Starter"]]],
|
|
) -> Callable[[Optional["User"]], Awaitable[List["Starter"]]]: ...
|
|
|
|
|
|
@overload
|
|
def set_starters(
|
|
func: Callable[[Optional["User"], Optional["str"]], Awaitable[List["Starter"]]],
|
|
) -> Callable[[Optional["User"], Optional["str"]], Awaitable[List["Starter"]]]: ...
|
|
|
|
|
|
def set_starters(func):
|
|
"""
|
|
Programmatic declaration of the available starter (can depend on the User from the session if authentication is setup).
|
|
|
|
Args:
|
|
func (Callable[[Optional["User"], Optional["str"]], Awaitable[List["Starter"]]]): The function declaring the starters with optional user and language arguments.
|
|
|
|
Returns:
|
|
Callable[[Optional["User"], Optional["str"]], Awaitable[List["Starter"]]]: The decorated function.
|
|
"""
|
|
|
|
config.code.set_starters = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
def on_chat_end(func: Callable) -> Callable:
|
|
"""
|
|
Hook to react to the user websocket disconnect event.
|
|
|
|
Args:
|
|
func (Callable[], Any]): The disconnect hook to execute.
|
|
|
|
Returns:
|
|
Callable[], Any]: The decorated hook.
|
|
"""
|
|
|
|
config.code.on_chat_end = wrap_user_function(func, with_task=True)
|
|
return func
|
|
|
|
|
|
def on_audio_start(func: Callable) -> Callable:
|
|
"""
|
|
Hook to react to the user initiating audio.
|
|
|
|
Returns:
|
|
Callable[], Any]: The decorated hook.
|
|
"""
|
|
|
|
config.code.on_audio_start = wrap_user_function(func, with_task=False)
|
|
return func
|
|
|
|
|
|
def on_audio_chunk(func: Callable) -> Callable:
|
|
"""
|
|
Hook to react to the audio chunks being sent.
|
|
|
|
Args:
|
|
chunk (InputAudioChunk): The audio chunk being sent.
|
|
|
|
Returns:
|
|
Callable[], Any]: The decorated hook.
|
|
"""
|
|
|
|
config.code.on_audio_chunk = wrap_user_function(func, with_task=False)
|
|
return func
|
|
|
|
|
|
def on_audio_end(func: Callable) -> Callable:
|
|
"""
|
|
Hook to react to the audio stream ending. This is called after the last audio chunk is sent.
|
|
|
|
Returns:
|
|
Callable[], Any]: The decorated hook.
|
|
"""
|
|
|
|
config.code.on_audio_end = wrap_user_function(
|
|
step(func, name="on_audio_end", type="run"), with_task=True
|
|
)
|
|
return func
|
|
|
|
|
|
def author_rename(
|
|
func: Callable[[str], Awaitable[str]],
|
|
) -> Callable[[str], Awaitable[str]]:
|
|
"""
|
|
Useful to rename the author of message to display more friendly author names in the UI.
|
|
Args:
|
|
func (Callable[[str], Awaitable[str]]): The function to be called to rename an author. Takes the original author name as parameter.
|
|
|
|
Returns:
|
|
Callable[[Any, str], Awaitable[Any]]: The decorated function.
|
|
"""
|
|
|
|
config.code.author_rename = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
def on_mcp_connect(
|
|
func: Callable[[McpConnection, ClientSession], Awaitable[None]],
|
|
) -> Callable[[McpConnection, ClientSession], Awaitable[None]]:
|
|
"""
|
|
Called everytime an MCP is connected
|
|
"""
|
|
|
|
config.code.on_mcp_connect = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
def on_mcp_disconnect(
|
|
func: Callable[[str, ClientSession], Awaitable[None]],
|
|
) -> Callable[[str, ClientSession], Awaitable[None]]:
|
|
"""
|
|
Called everytime an MCP is disconnected
|
|
"""
|
|
|
|
config.code.on_mcp_disconnect = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
def on_stop(func: Callable) -> Callable:
|
|
"""
|
|
Hook to react to the user stopping a thread.
|
|
|
|
Args:
|
|
func (Callable[[], Any]): The stop hook to execute.
|
|
|
|
Returns:
|
|
Callable[[], Any]: The decorated stop hook.
|
|
"""
|
|
|
|
config.code.on_stop = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
def action_callback(name: str) -> Callable:
|
|
"""
|
|
Callback to call when an action is clicked in the UI.
|
|
|
|
Args:
|
|
func (Callable[[Action], Any]): The action callback to execute. First parameter is the action.
|
|
"""
|
|
|
|
def decorator(func: Callable[[Action], Any]):
|
|
config.code.action_callbacks[name] = wrap_user_function(func, with_task=False)
|
|
return func
|
|
|
|
return decorator
|
|
|
|
|
|
def on_settings_update(
|
|
func: Callable[[Dict[str, Any]], Any],
|
|
) -> Callable[[Dict[str, Any]], Any]:
|
|
"""
|
|
Hook to react to the user changing any settings.
|
|
|
|
Args:
|
|
func (Callable[], Any]): The hook to execute after settings were changed.
|
|
|
|
Returns:
|
|
Callable[], Any]: The decorated hook.
|
|
"""
|
|
|
|
config.code.on_settings_update = wrap_user_function(func, with_task=True)
|
|
return func
|
|
|
|
|
|
def data_layer(
|
|
func: Callable[[], BaseDataLayer],
|
|
) -> Callable[[], BaseDataLayer]:
|
|
"""
|
|
Hook to configure custom data layer.
|
|
"""
|
|
|
|
# We don't use wrap_user_function here because:
|
|
# 1. We don't need to support async here and;
|
|
# 2. We don't want to change the API for get_data_layer() to be async, everywhere (at this point).
|
|
config.code.data_layer = func
|
|
return func
|
|
|
|
|
|
def on_feedback(func: Callable) -> Callable:
|
|
"""
|
|
Hook to react to user feedback events from the UI.
|
|
The decorated function is called every time feedback is received.
|
|
|
|
Args:
|
|
func (Callable[[Feedback], Any]): The function to be called when feedback is received. Takes a cl.Feedback object.
|
|
|
|
Example:
|
|
@cl.on_feedback
|
|
async def on_feedback(feedback: Feedback):
|
|
print(f"Received feedback: {feedback.value} for step {feedback.forId}")
|
|
# Handle feedback here
|
|
|
|
Returns:
|
|
Callable[[Feedback], Any]: The decorated on_feedback function.
|
|
"""
|
|
config.code.on_feedback = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
def on_slack_reaction_added(func: Callable[[Dict[str, Any]], Any]) -> Callable:
|
|
"""
|
|
Hook to react to Slack reaction_added events.
|
|
The decorated function is called every time a user adds a reaction to a message in Slack.
|
|
|
|
Args:
|
|
func (Callable[[Dict[str, Any]], Any]): The function to be called when a reaction is added.
|
|
Takes a Slack event dictionary containing:
|
|
- reaction: The emoji reaction name (e.g., "thumbsup")
|
|
- user: The user ID who added the reaction
|
|
- item: Dictionary with type, ts, and channel of the reacted item
|
|
|
|
Example:
|
|
@cl.on_slack_reaction_added
|
|
async def handle_reaction(event: Dict[str, Any]):
|
|
reaction = event.get("reaction")
|
|
user_id = event.get("user")
|
|
print(f"User {user_id} added reaction {reaction}")
|
|
# Handle reaction here
|
|
|
|
Returns:
|
|
Callable[[Dict[str, Any]], Any]: The decorated on_slack_reaction_added function.
|
|
"""
|
|
config.code.on_slack_reaction_added = wrap_user_function(func)
|
|
return func
|
|
|
|
|
|
def on_shared_thread_view(
|
|
func: Callable[[ThreadDict, Optional[User]], Awaitable[bool]],
|
|
) -> Callable[[ThreadDict, Optional[User]], Awaitable[bool]]:
|
|
"""Hook to authorize viewing a shared thread.
|
|
|
|
Users must implement and return True to allow a non-author to view a thread.
|
|
Thread metadata contains "is_shared" boolean flag and "shared_at" timestamp for custom thread sharing.
|
|
Signature: async (thread: ThreadDict, viewer: Optional[User]) -> bool
|
|
"""
|
|
config.code.on_shared_thread_view = wrap_user_function(func)
|
|
return func
|