import os
import re
import uuid
import shutil
import requests
import time
import json
from datetime import datetime
from typing import Optional, Dict, List, Any

import chainlit as cl
import ollama
from docling.document_converter import DocumentConverter
from qdrant_client import AsyncQdrantClient
# IMPORT FIX: import the required classes directly from the library
from qdrant_client.models import PointStruct, Distance, VectorParams, SparseVectorParams, Prefetch
from chainlit.data.sql_alchemy import SQLAlchemyDataLayer
from chainlit.types import ThreadDict
from functools import lru_cache

# === ROBUST IMPORT FALLBACK ===
try:
    from chainlit.data.storage_clients import BaseStorageClient
except ImportError:
    try:
        from chainlit.data.base import BaseStorageClient
    except ImportError:
        from chainlit.data.storage_clients.base import BaseStorageClient

# === CONFIGURATION ===
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://ai_user:secure_password_here@postgres:5432/ai_station")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://192.168.1.243:11434")
QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant:6333")
BGE_API_URL = os.getenv("BGE_API_URL", "http://192.168.1.243:8001/embed")

VISION_MODEL = "minicpm-v"
DEFAULT_TEXT_MODEL = "glm-4.6:cloud"

WORKSPACES_DIR = "./workspaces"
STORAGE_DIR = "./.files"
os.makedirs(STORAGE_DIR, exist_ok=True)
os.makedirs(WORKSPACES_DIR, exist_ok=True)

# === USER MAPPING ===
USER_PROFILES = {
    "giuseppe@defranceschi.pro": {
        "role": "admin",
        "name": "Giuseppe",
        "workspace": "admin_workspace",
        "rag_collection": "admin_docs",
        "capabilities": ["debug", "all"],
        "show_code": True
    },
    "giuseppe.defranceschi@gmail.com": {
        "role": "admin",
        "name": "Giuseppe",
        "workspace": "admin_workspace",
        "rag_collection": "admin_docs",
        "capabilities": ["debug", "all"],
        "show_code": True
    },
    "federica.tecchio@gmail.com": {
        "role": "business",
        "name": "Federica",
        "workspace": "business_workspace",
        "rag_collection": "contabilita",
        "capabilities": ["basic_chat"],
        "show_code": False
    },
    "riccardob545@gmail.com": {
        "role": "engineering",
        "name": "Riccardo",
        "workspace": "engineering_workspace",
        "rag_collection": "engineering_docs",
        "capabilities": ["code"],
        "show_code": True
    },
    "giuliadefranceschi05@gmail.com": {
        "role": "architecture",
        "name": "Giulia",
        "workspace": "architecture_workspace",
        "rag_collection": "architecture_manuals",
        "capabilities": ["visual"],
        "show_code": False
    }
}

# === STORAGE CLIENT ===
class LocalStorageClient(BaseStorageClient):
    """Persists Chainlit file elements on the local filesystem instead of a cloud bucket."""

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    async def upload_file(self, object_key: str, data: bytes, mime: str = "application/octet-stream", overwrite: bool = True) -> Dict[str, str]:
        file_path = os.path.join(self.storage_path, object_key)
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "wb") as f:
            f.write(data)
        return {"object_key": object_key, "url": f"/files/{object_key}"}

    async def get_read_url(self, object_key: str) -> str:
        return f"/files/{object_key}"

    async def delete_file(self, object_key: str) -> bool:
        path = os.path.join(self.storage_path, object_key)
        if os.path.exists(path):
            os.remove(path)
            return True
        return False

    async def close(self):
        pass


@cl.data_layer
def get_data_layer():
    return SQLAlchemyDataLayer(conninfo=DATABASE_URL, storage_provider=LocalStorageClient(STORAGE_DIR))
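
# Illustrative sketch (hypothetical helper, not called by the app): a minimal
# round trip through LocalStorageClient, assuming the "/files/" route is served
# elsewhere by the web layer. The name `_demo_storage_roundtrip` is ours, not
# part of the application.
async def _demo_storage_roundtrip() -> str:
    client = LocalStorageClient(STORAGE_DIR)
    key = f"demo/{uuid.uuid4()}.txt"
    await client.upload_file(key, b"hello", mime="text/plain")
    url = await client.get_read_url(key)   # -> "/files/demo/<uuid>.txt"
    assert await client.delete_file(key)   # True: the file existed and was removed
    return url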
# === OAUTH & UTILS ===
@cl.oauth_callback
def oauth_callback(provider_id: str, token: str, raw_user_data: Dict[str, str], default_user: cl.User) -> Optional[cl.User]:
    if provider_id == "google":
        email = raw_user_data.get("email", "").lower()
        # "guest" is not a key in USER_PROFILES, so the inline default applies.
        profile = USER_PROFILES.get(email, USER_PROFILES.get("guest", {
            "role": "guest",
            "name": "Guest",
            "workspace": "guest",
            "rag_collection": "public",
            "show_code": False
        }))
        default_user.metadata.update({
            "role": profile["role"],
            "workspace": profile["workspace"],
            "rag_collection": profile["rag_collection"],
            "show_code": profile["show_code"],
            "display_name": profile["name"]
        })
        return default_user
    return default_user


def create_workspace(workspace_name: str) -> str:
    path = os.path.join(WORKSPACES_DIR, workspace_name)
    os.makedirs(path, exist_ok=True)
    return path


# === CORE: DOCLING ===
def process_file_with_docling(file_path: str) -> str:
    try:
        converter = DocumentConverter()
        result = converter.convert(file_path)
        return result.document.export_to_markdown()
    except Exception as e:
        print(f"❌ Docling Error: {e}")
        return ""


# === CORE: BGE-M3 CLIENT ===
def get_bge_embeddings(text: str) -> Optional[Dict[str, Any]]:
    try:
        payload = {"texts": [text[:8000]]}
        response = requests.post(BGE_API_URL, json=payload, timeout=30)
        response.raise_for_status()
        data = response.json().get("data", [])
        if data:
            return data[0]
        return None
    except Exception as e:
        print(f"❌ BGE API Error: {e}")
        return None


# === CORE: QDRANT ===
async def ensure_collection(collection_name: str):
    client = AsyncQdrantClient(url=QDRANT_URL)
    if not await client.collection_exists(collection_name):
        await client.create_collection(
            collection_name=collection_name,
            vectors_config={"dense": VectorParams(size=1024, distance=Distance.COSINE)},
            sparse_vectors_config={"sparse": SparseVectorParams()}
        )


async def index_document(file_name: str, content: str, collection_name: str):
    await ensure_collection(collection_name)
    client = AsyncQdrantClient(url=QDRANT_URL)
    chunk_size = 2000
    overlap = 200
    points = []
    # Slide a 2000-char window that advances 1800 chars per step, so
    # consecutive chunks share a 200-char overlap.
    for i in range(0, len(content), chunk_size - overlap):
        chunk = content[i : i + chunk_size]
        embedding_data = get_bge_embeddings(chunk)
        if embedding_data:
            points.append(PointStruct(
                id=str(uuid.uuid4()),
                vector={
                    "dense": embedding_data["dense"],
                    "sparse": embedding_data["sparse"]
                },
                payload={
                    "file_name": file_name,
                    "content": chunk,
                    "indexed_at": datetime.now().isoformat()
                }
            ))
    if points:
        await client.upsert(collection_name=collection_name, points=points)
        return len(points)
    return 0


async def search_hybrid(query: str, collection_name: str, limit: int = 4) -> str:
    client = AsyncQdrantClient(url=QDRANT_URL)
    if not await client.collection_exists(collection_name):
        return ""
    query_emb = get_bge_embeddings(query)
    if not query_emb:
        return ""
    # FIX HERE: use the correctly imported Prefetch object
    results = await client.query_points(
        collection_name=collection_name,
        prefetch=[
            Prefetch(
                query=query_emb["sparse"],
                using="sparse",
                limit=limit * 2
            )
        ],
        query=query_emb["dense"],
        using="dense",
        limit=limit
    )
    context = []
    for hit in results.points:
        context.append(f"--- FROM {hit.payload['file_name']} ---\n{hit.payload['content']}")
    return "\n\n".join(context)


# === Embedding cache ===
@lru_cache(maxsize=1000)
def get_bge_embeddings_cached(text: str):
    """Cache for repeated queries."""
    return get_bge_embeddings(text)
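
# Note / illustrative sketch: search_hybrid() above calls get_bge_embeddings()
# directly, so the LRU cache only helps callers that route through the cached
# wrapper. A minimal example of doing so (the helper name is hypothetical and
# not wired into the app). lru_cache keys on the exact string, so only
# verbatim repeats are served from memory.
def _embed_query_with_cache(query: str) -> Optional[Dict[str, Any]]:
    # Collapse whitespace so trivially different repeats share a cache entry.
    return get_bge_embeddings_cached(" ".join(query.split()))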
# === CHAINLIT HANDLERS ===
@cl.on_chat_start
async def start():
    # 1. User profile (unknown emails currently fall back to the admin profile)
    user = cl.user_session.get("user")
    email = user.identifier if user else "guest"
    profile = USER_PROFILES.get(email, USER_PROFILES["giuseppe@defranceschi.pro"])
    cl.user_session.set("profile", profile)
    create_workspace(profile["workspace"])

    # 2. Custom HTML badge
    role_color = {
        "admin": "#e74c3c",
        "engineering": "#3498db",
        "business": "#2ecc71",
        "architecture": "#9b59b6",
    }.get(profile["role"], "#95a5a6")
    badge_html = f"""