# ai-station/app.py.broken-20251229-081214


import os
import re
import uuid
import shutil
from datetime import datetime
from typing import Optional, Dict, List
import chainlit as cl
import ollama
import fitz # PyMuPDF
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct, Distance, VectorParams
from chainlit.data.sql_alchemy import SQLAlchemyDataLayer
from chainlit.data.storage_clients import BaseStorageClient
# === CONFIGURATION ===
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://ai_user:secure_password_here@postgres:5432/ai_station")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://192.168.1.243:11434")
QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant:6333")
WORKSPACES_DIR = "./workspaces"
STORAGE_DIR = "./.files"
os.makedirs(STORAGE_DIR, exist_ok=True)
os.makedirs(WORKSPACES_DIR, exist_ok=True)
# === USER AND ROLE MAPPING ===
USER_PROFILES = {
    "giuseppe@defranceschi.pro": {
        "role": "admin",
        "name": "Giuseppe",
        "workspace": "admin_workspace",
        "rag_collection": "admin_docs",
        "capabilities": ["debug", "system_prompts", "user_management", "all_models"],
        "show_code": True
    },
    "giuseppe.defranceschi@gmail.com": {
        "role": "admin",
        "name": "Giuseppe",
        "workspace": "admin_workspace",
        "rag_collection": "admin_docs",
        "capabilities": ["debug", "system_prompts", "user_management", "all_models"],
        "show_code": True
    },
    "federica.tecchio@gmail.com": {
        "role": "business",
        "name": "Federica",
        "workspace": "business_workspace",
        "rag_collection": "contabilita",
        "capabilities": ["pdf_upload", "basic_chat"],
        "show_code": False
    },
    "riccardob545@gmail.com": {
        "role": "engineering",
        "name": "Riccardo",
        "workspace": "engineering_workspace",
        "rag_collection": "engineering_docs",
        "capabilities": ["code_execution", "data_viz", "advanced_chat"],
        "show_code": True
    },
    "giuliadefranceschi05@gmail.com": {
        "role": "architecture",
        "name": "Giulia",
        "workspace": "architecture_workspace",
        "rag_collection": "architecture_manuals",
        "capabilities": ["visual_chat", "pdf_upload", "image_gen"],
        "show_code": False
    }
}
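# Editor's note: a minimal sketch of how a profile entry drives the session, assuming a
# hypothetical address 'someone@example.com' were added above. Each key feeds a distinct
# concern: 'workspace' selects the filesystem area (see create_workspace below),
# 'rag_collection' selects the Qdrant collection, and 'capabilities'/'show_code' gate the
# UI behaviour in the Chainlit handlers.
#
#   profile = USER_PROFILES['someone@example.com']        # hypothetical entry
#   workspace_path = create_workspace(profile['workspace'])
#   collection = profile['rag_collection']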
# === CUSTOM LOCAL STORAGE CLIENT ===
class LocalStorageClient(BaseStorageClient):
    """Local filesystem storage for files/elements"""

    def __init__(self, storage_path: str):
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    async def upload_file(
        self,
        object_key: str,
        data: bytes,
        mime: str = "application/octet-stream",
        overwrite: bool = True,
    ) -> Dict[str, str]:
        """Save the file locally"""
        file_path = os.path.join(self.storage_path, object_key)
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "wb") as f:
            f.write(data)
        return {
            "object_key": object_key,
            "url": f"/files/{object_key}"
        }
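# Editor's note: depending on the Chainlit version, BaseStorageClient may declare
# additional abstract methods (for example for deleting stored files or producing read
# URLs); this subclass only implements upload_file, which is what the SQLAlchemy data
# layer below uses for element storage. Treat it as a sketch, not a complete backend.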
# === DATA LAYER INITIALIZATION ===
print("🔧 Inizializzazione database...")
storage_client = LocalStorageClient(storage_path=STORAGE_DIR)
try:
    data_layer = SQLAlchemyDataLayer(
        conninfo=DATABASE_URL,
        storage_provider=storage_client,
        user_thread_limit=1000,
        show_logger=False
    )
    # ⬇️ THIS LINE IS CRUCIAL FOR PERSISTENCE
    cl.data_layer = data_layer
    print("✅ SQLAlchemyDataLayer + LocalStorage initialized successfully")
    print(f"✅ Data layer set: {cl.data_layer is not None}")
except Exception as e:
    print(f"❌ Failed to initialize data layer: {e}")
    cl.data_layer = None
# === OAUTH CALLBACK WITH ROLES ===
@cl.oauth_callback
def oauth_callback(
    provider_id: str,
    token: str,
    raw_user_data: Dict[str, str],
    default_user: cl.User,
) -> Optional[cl.User]:
    """Validate and enrich user data with roles"""
    if provider_id == "google":
        email = raw_user_data.get("email", "").lower()
        # Check whether the user is authorized
        if email not in USER_PROFILES:
            print(f"❌ Utente non autorizzato: {email}")
            return None  # Deny access
        # Enrich metadata with the profile
        profile = USER_PROFILES[email]
        default_user.metadata.update({
            "picture": raw_user_data.get("picture", ""),
            "locale": raw_user_data.get("locale", "en"),
            "role": profile["role"],
            "workspace": profile["workspace"],
            "rag_collection": profile["rag_collection"],
            "capabilities": profile["capabilities"],
            "show_code": profile["show_code"],
            "display_name": profile["name"]
        })
        print(f"✅ Utente autorizzato: {email} - Ruolo: {profile['role']}")
        return default_user
    return default_user
# === UTILITY FUNCTIONS ===
def get_user_profile(user_email: str) -> Dict:
    """Return the user profile, falling back to a guest profile"""
    return USER_PROFILES.get(user_email.lower(), {
        "role": "guest",
        "name": "Ospite",
        "workspace": "guest_workspace",
        "rag_collection": "documents",
        "capabilities": [],
        "show_code": False
    })
def create_workspace(workspace_name: str) -> str:
    """Create the workspace directory if it does not exist"""
    workspace_path = os.path.join(WORKSPACES_DIR, workspace_name)
    os.makedirs(workspace_path, exist_ok=True)
    return workspace_path


def save_code_to_file(code: str, workspace: str) -> str:
    """Save a code block as a .py file in the workspace"""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_name = f"code_{timestamp}.py"
    file_path = os.path.join(WORKSPACES_DIR, workspace, file_name)
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(code)
    return file_path
def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract text from a PDF using PyMuPDF"""
    try:
        doc = fitz.open(pdf_path)
        text_parts = []
        for page_num in range(len(doc)):
            page = doc[page_num]
            text = page.get_text()
            text_parts.append(f"--- Pagina {page_num + 1} ---\n{text}\n")
        doc.close()
        return "\n".join(text_parts)
    except Exception as e:
        print(f"❌ Errore estrazione PDF: {e}")
        return ""
# === QDRANT FUNCTIONS ===
async def get_qdrant_client() -> AsyncQdrantClient:
    """Connect to Qdrant"""
    return AsyncQdrantClient(url=QDRANT_URL)


async def ensure_collection(collection_name: str):
    """Create the collection if it does not exist"""
    client = await get_qdrant_client()
    if not await client.collection_exists(collection_name):
        await client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=768, distance=Distance.COSINE)
        )
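# Editor's note: the vector size of 768 is assumed to match the embedding model used in
# get_embeddings() below (nomic-embed-text emits 768-dimensional vectors); if the model
# changes, this VectorParams size and any existing collections must change with it.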
async def get_embeddings(text: str) -> list:
    """Generate embeddings with Ollama"""
    max_length = 2000
    if len(text) > max_length:
        text = text[:max_length]
    client = ollama.Client(host=OLLAMA_URL)
    try:
        response = client.embed(model='nomic-embed-text', input=text)
        if 'embeddings' in response:
            return response['embeddings'][0]
        return response.get('embedding', [])
    except Exception as e:
        print(f"❌ Errore Embedding: {e}")
        return []
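# Editor's note: the two lookups above cover different Ollama response shapes. Recent
# clients return {'embeddings': [[...]]} from embed(), while the older embeddings API
# exposed a single {'embedding': [...]}; only one branch is expected to apply.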
def chunk_text(text: str, max_length: int = 1500, overlap: int = 200) -> list:
    """Split text into overlapping chunks"""
    if len(text) <= max_length:
        return [text]
    chunks = []
    start = 0
    while start < len(text):
        end = start + max_length
        if end < len(text):
            last_period = text.rfind('.', start, end)
            last_newline = text.rfind('\n', start, end)
            split_point = max(last_period, last_newline)
            if split_point > start:
                end = split_point + 1
        chunks.append(text[start:end].strip())
        # Always advance, so a very early split point cannot cause an infinite loop
        start = max(end - overlap, start + 1)
    return chunks
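# Editor's note: a rough, illustrative walk-through with made-up numbers. For a
# 3000-character text and the defaults above, the first chunk ends at the last '.' or
# newline found before position 1500, and the next chunk restarts 200 characters
# earlier, so consecutive chunks share some context for retrieval.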
async def index_document(file_name: str, content: str, collection_name: str) -> bool:
    """Index a document into the given Qdrant collection"""
    try:
        await ensure_collection(collection_name)
        chunks = chunk_text(content, max_length=1500)
        qdrant_client = await get_qdrant_client()
        points = []
        for i, chunk in enumerate(chunks):
            embeddings = await get_embeddings(chunk)
            if not embeddings:
                continue
            point_id = str(uuid.uuid4())
            point = PointStruct(
                id=point_id,
                vector=embeddings,
                payload={
                    "file_name": file_name,
                    "content": chunk,
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                    "indexed_at": datetime.now().isoformat()
                }
            )
            points.append(point)
        if points:
            await qdrant_client.upsert(collection_name=collection_name, points=points)
            return True
        return False
    except Exception as e:
        print(f"❌ Errore indicizzazione: {e}")
        return False
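# Editor's note: embeddings are requested one chunk at a time, so a large PDF produces
# one Ollama call per chunk. Passing the whole list of chunks to a single embed() call
# is a possible optimisation; it is not applied here to keep the original flow.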
async def search_qdrant(query_text: str, collection_name: str, limit: int = 5) -> str:
    """Search for relevant documents in the given collection"""
    try:
        qdrant_client = await get_qdrant_client()
        # Check whether the collection exists
        if not await qdrant_client.collection_exists(collection_name):
            return ""
        query_embedding = await get_embeddings(query_text)
        if not query_embedding:
            return ""
        search_result = await qdrant_client.query_points(
            collection_name=collection_name,
            query=query_embedding,
            limit=limit
        )
        contexts = []
        seen_files = set()
        for hit in search_result.points:
            if hit.payload:
                file_name = hit.payload.get('file_name', 'Unknown')
                content = hit.payload.get('content', '')
                chunk_idx = hit.payload.get('chunk_index', 0)
                score = hit.score if hasattr(hit, 'score') else 0
                file_key = f"{file_name}_{chunk_idx}"
                if file_key not in seen_files:
                    seen_files.add(file_key)
                    # ✅ FIX: render the chunk inside a proper Markdown code block
                    contexts.append(
                        f"📄 **{file_name}** (chunk {chunk_idx+1}, score: {score:.2f})\n"
                        f"```\n{content}\n```"
                    )
        return "\n\n".join(contexts) if contexts else ""
    except Exception as e:
        print(f"❌ Errore ricerca Qdrant: {e}")
        return ""
# === CHAINLIT HANDLERS ===
@cl.on_chat_start
async def on_chat_start():
    """Initialize the chat with the user profile"""
    user = cl.user_session.get("user")
    if user:
        user_email = user.identifier
        profile = get_user_profile(user_email)
        user_name = profile["name"]
        user_role = profile["role"]
        workspace = profile["workspace"]
        user_picture = user.metadata.get("picture", "")
        show_code = profile["show_code"]
        capabilities = profile["capabilities"]
    else:
        user_email = "guest@local"
        profile = get_user_profile(user_email)  # guest fallback profile
        user_name = "Ospite"
        user_role = "guest"
        workspace = "guest_workspace"
        user_picture = ""
        show_code = False
        capabilities = []
    create_workspace(workspace)
    # Store in the session
    cl.user_session.set("email", user_email)
    cl.user_session.set("name", user_name)
    cl.user_session.set("role", user_role)
    cl.user_session.set("workspace", workspace)
    cl.user_session.set("show_code", show_code)
    cl.user_session.set("capabilities", capabilities)
    cl.user_session.set("rag_collection", profile.get("rag_collection", "documents"))
    # Role-based settings
    settings_widgets = [
        cl.input_widget.Select(
            id="model",
            label="🤖 Modello AI",
            values=["glm-4.6:cloud", "llama3.2", "mistral", "qwen2.5-coder:32b"],
            initial_value="glm-4.6:cloud",
        ),
        cl.input_widget.Slider(
            id="temperature",
            label="🌡️ Temperatura",
            initial=0.7,
            min=0,
            max=2,
            step=0.1,
        ),
    ]
    # Only admins can disable RAG
    if user_role == "admin":
        settings_widgets.append(
            cl.input_widget.Switch(
                id="rag_enabled",
                label="📚 Abilita RAG",
                initial=True,
            )
        )
    # ⬇️ SEND THE SETTINGS (this enables the ⚙️ icon)
    await cl.ChatSettings(settings_widgets).send()
    # Role emoji
    role_emoji = {
        "admin": "👑",
        "business": "💼",
        "engineering": "⚙️",
        "architecture": "🏛️",
        "guest": "👤"
    }
    persistence_status = "✅ Attiva" if cl.data_layer else "⚠️ Disattivata"
    welcome_msg = f"{role_emoji.get(user_role, '👋')} **Benvenuto, {user_name}!**\n\n"
    if user_picture:
        welcome_msg += f"![Avatar]({user_picture})\n\n"
    welcome_msg += (
        f"🎭 **Ruolo**: {user_role.upper()}\n"
        f"📁 **Workspace**: `{workspace}`\n"
        f"💾 **Persistenza**: {persistence_status}\n"
        f"🤖 **Modello**: `glm-4.6:cloud`\n\n"
    )
    # Role-specific capabilities
    if "debug" in capabilities:
        welcome_msg += "🔧 **Modalità Debug**: Attiva\n"
    if "user_management" in capabilities:
        welcome_msg += "👥 **Gestione Utenti**: Disponibile\n"
    if not show_code:
        welcome_msg += "🎨 **Modalità Visuale**: Codice nascosto\n"
    welcome_msg += "\n⚙ **Usa le Settings (icona ⚙️ in alto a destra) per personalizzare!**"
    await cl.Message(content=welcome_msg).send()
@cl.on_settings_update
async def on_settings_update(settings):
    """Handle user settings updates"""
    cl.user_session.set("settings", settings)
    model = settings.get("model", "glm-4.6:cloud")
    temp = settings.get("temperature", 0.7)
    rag = settings.get("rag_enabled", True)
    await cl.Message(
        content=f"✅ **Settings aggiornati**:\n"
        f"- 🤖 Modello: `{model}`\n"
        f"- 🌡️ Temperatura: `{temp}`\n"
        f"- 📚 RAG: {'✅ Attivo' if rag else '❌ Disattivato'}"
    ).send()
@cl.on_message
async def on_message(message: cl.Message):
    """Handle user messages with retrieval-augmented generation"""
    user_email = cl.user_session.get("email", "guest")
    user_role = cl.user_session.get("role", "guest")
    workspace = cl.user_session.get("workspace", "guest_workspace")
    show_code = cl.user_session.get("show_code", False)
    rag_collection = cl.user_session.get("rag_collection", "documents")
    settings = cl.user_session.get("settings", {})
    model = settings.get("model", "glm-4.6:cloud")
    temperature = settings.get("temperature", 0.7)
    # Admins can disable RAG; everyone else always has it on
    rag_enabled = settings.get("rag_enabled", True) if user_role == "admin" else True
    try:
        # Handle file uploads
        if message.elements:
            await handle_file_uploads(message.elements, workspace, rag_collection)
        # RAG search only if enabled
        context_text = ""
        if rag_enabled:
            context_text = await search_qdrant(message.content, rag_collection, limit=5)
        # Build the prompt with or without context
        if context_text:
            system_prompt = (
                "Sei un assistente AI esperto. "
                "Usa il seguente contesto per arricchire la tua risposta, "
                "ma puoi anche rispondere usando la tua conoscenza generale se il contesto non è sufficiente."
            )
            full_prompt = f"{system_prompt}\n\n**CONTESTO DOCUMENTI:**\n{context_text}\n\n**DOMANDA UTENTE:**\n{message.content}"
        else:
            system_prompt = "Sei un assistente AI esperto e disponibile. Rispondi in modo chiaro e utile."
            full_prompt = f"{system_prompt}\n\n**DOMANDA UTENTE:**\n{message.content}"
        # Stream the response from Ollama
        client = ollama.Client(host=OLLAMA_URL)
        msg = cl.Message(content="")
        await msg.send()
        messages = [{"role": "user", "content": full_prompt}]
        stream = client.chat(
            model=model,
            messages=messages,
            stream=True,
            options={"temperature": temperature}
        )
        full_response = ""
        for chunk in stream:
            content = chunk['message']['content']
            full_response += content
            await msg.stream_token(content)
        await msg.update()
        # ✅ FIX: extract Python code blocks with a working regex
        code_blocks = re.findall(r"```python\s*(.*?)```", full_response, re.DOTALL)
        if code_blocks:
            elements = []
            # If show_code is False, hide the code from the response
            if not show_code:
                cleaned_response = re.sub(
                    r"```python\s*.*?```",
                    "[💻 Codice eseguito internamente]",
                    full_response,
                    flags=re.DOTALL
                )
                msg.content = cleaned_response
                await msg.update()
            # Save the code blocks to the workspace
            for code in code_blocks:
                file_path = save_code_to_file(code.strip(), workspace)
                elements.append(
                    cl.File(
                        name=os.path.basename(file_path),
                        path=file_path,
                        display="inline" if show_code else "side"
                    )
                )
            if show_code:
                await cl.Message(
                    content=f"💾 Codice salvato in workspace `{workspace}`",
                    elements=elements
                ).send()
    except Exception as e:
        await cl.Message(content=f"❌ **Errore:** {str(e)}").send()
async def handle_file_uploads(elements, workspace: str, collection_name: str):
    """Handle file uploads and index them into the given collection"""
    for element in elements:
        try:
            dest_path = os.path.join(WORKSPACES_DIR, workspace, element.name)
            shutil.copy(element.path, dest_path)
            content = None
            if element.name.lower().endswith('.pdf'):
                await cl.Message(content=f"📄 Elaborazione PDF **{element.name}**...").send()
                content = extract_text_from_pdf(dest_path)
                if not content:
                    await cl.Message(
                        content=f"⚠️ **{element.name}**: PDF vuoto o non leggibile"
                    ).send()
                    continue
            elif element.name.lower().endswith('.txt'):
                with open(dest_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            else:
                await cl.Message(
                    content=f"📁 **{element.name}** salvato in workspace (supportati: .pdf, .txt)"
                ).send()
                continue
            if content:
                success = await index_document(element.name, content, collection_name)
                if success:
                    word_count = len(content.split())
                    await cl.Message(
                        content=f"✅ **{element.name}** indicizzato in `{collection_name}`\n"
                        f"📊 Parole estratte: {word_count:,}"
                    ).send()
                else:
                    await cl.Message(
                        content=f"⚠️ Errore indicizzazione **{element.name}**"
                    ).send()
        except Exception as e:
            await cl.Message(
                content=f"❌ Errore con **{element.name}**: {str(e)}"
            ).send()
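

# Editor's note: a minimal offline smoke test for the pure-Python helpers above,
# assuming a hypothetical local "sample.pdf"; it exercises PDF extraction and chunking
# without touching Chainlit, Ollama, Postgres or Qdrant, and is skipped when the app is
# started via `chainlit run app.py`.
if __name__ == "__main__":
    sample_text = extract_text_from_pdf("sample.pdf")  # hypothetical local file
    if sample_text:
        sample_chunks = chunk_text(sample_text)
        print(f"Extracted {len(sample_text)} characters into {len(sample_chunks)} chunks")
    else:
        print("No sample.pdf found or PDF not readable")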