# ai-station/app-oauth2.py
#
# 420 lines
# 14 KiB
# Python

import os
import re
import uuid
import shutil
import httpx
from datetime import datetime
from typing import Optional
import chainlit as cl
import ollama
import fitz # PyMuPDF
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct, Distance, VectorParams
from chainlit.data.sql_alchemy import SQLAlchemyDataLayer
from authlib.integrations.httpx_client import AsyncOAuth2Client
from authlib.integrations.starlette_client import OAuth
# === CONFIGURATION ===
# Each setting may be overridden by an environment variable of the same name.
# NOTE(review): the DATABASE_URL fallback embeds a placeholder password; make
# sure deployments always set DATABASE_URL explicitly.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://ai_user:secure_password_here@postgres:5432/ai_station",
)
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://192.168.1.243:11434")
QDRANT_URL = os.environ.get("QDRANT_URL", "http://qdrant:6333")
# Google OAuth credentials; empty strings when the variables are unset.
GOOGLE_CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID", "")
GOOGLE_CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET", "")
# === DATA LAYER INITIALISATION ===
# Build the SQLAlchemy-backed Chainlit data layer for DATABASE_URL and expose it
# as cl.data_layer; on any construction error, report it and set
# cl.data_layer to None so the app runs without persistence.
# NOTE(review): newer Chainlit releases register the data layer through the
# `@cl.data_layer` decorator instead of attribute assignment — confirm the
# installed version actually honours this assignment.
try:
    data_layer = SQLAlchemyDataLayer(conninfo=DATABASE_URL)
    cl.data_layer = data_layer
    print("✅ SQLAlchemyDataLayer initialized successfully")
except Exception as init_error:
    print(f"❌ Failed to initialize data layer: {init_error}")
    cl.data_layer = None
# === OAUTH2 SETUP ===
# Authlib registry holding a "google" OpenID Connect client configured from the
# GOOGLE_CLIENT_ID / GOOGLE_CLIENT_SECRET settings.
# NOTE(review): nothing visible in this file uses `oauth`; Chainlit's own
# @cl.oauth_callback flow may be the only login path — confirm before relying on it.
oauth = OAuth()
oauth.register(
    name="google",
    client_id=GOOGLE_CLIENT_ID,
    client_secret=GOOGLE_CLIENT_SECRET,
    server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
    client_kwargs=dict(scope="openid profile email"),
)

# Root directory (relative to the working directory) holding one sub-folder per user.
WORKSPACES_DIR = "./workspaces"
# === UTILITY FUNCTIONS ===
def create_workspace(user_email: str):
    """Make sure the user's workspace directory exists and return its path.

    The folder name is *user_email* with every "@" and "." turned into "_",
    placed under WORKSPACES_DIR. An existing directory is left as is.
    """
    folder = user_email.translate(str.maketrans({"@": "_", ".": "_"}))
    workspace = os.path.join(WORKSPACES_DIR, folder)
    os.makedirs(workspace, exist_ok=True)
    return workspace
def save_code_to_file(code: str, user_email: str, workspaces_dir: Optional[str] = None) -> str:
    """Save a code block as a new ``.py`` file in the user's workspace.

    The file is named ``code_<YYYYmmdd_HHMMSS>.py``. If that name already exists
    (e.g. several code blocks saved within the same second), ``_1``, ``_2``, …
    is appended, so an earlier file is never overwritten.

    Args:
        code: source text to write (UTF-8).
        user_email: owner; the folder name is the e-mail with "@" and "."
            replaced by "_".
        workspaces_dir: root of all workspaces; defaults to WORKSPACES_DIR.

    Returns:
        Path of the file that was written.
    """
    base_dir = WORKSPACES_DIR if workspaces_dir is None else workspaces_dir
    safe_email = user_email.replace("@", "_").replace(".", "_")
    target_dir = os.path.join(base_dir, safe_email)
    # The workspace may not exist yet (e.g. the "guest" fallback user).
    os.makedirs(target_dir, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    suffix = 0
    while True:
        file_name = f"code_{timestamp}.py" if suffix == 0 else f"code_{timestamp}_{suffix}.py"
        file_path = os.path.join(target_dir, file_name)
        try:
            # "x" fails instead of silently overwriting an existing file.
            with open(file_path, "x", encoding="utf-8") as f:
                f.write(code)
        except FileExistsError:
            suffix += 1
            continue
        return file_path
def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract the text of a PDF with PyMuPDF, one section per page.

    Each page becomes ``"--- Pagina N ---\\n<text>\\n"`` (N starting at 1) and the
    sections are joined with newlines.

    Returns:
        The extracted text, or "" when the file cannot be read or no page
        contains any non-whitespace text (e.g. scanned, image-only PDFs). In
        that case the page headers alone would otherwise look like real content.
        Errors are printed, not raised.
    """
    try:
        # The context manager closes the document even if a page fails to parse.
        with fitz.open(pdf_path) as doc:
            text_parts = []
            has_text = False
            for page_num, page in enumerate(doc, start=1):
                text = page.get_text()
                if text.strip():
                    has_text = True
                text_parts.append(f"--- Pagina {page_num} ---\n{text}\n")
    except Exception as e:
        print(f"❌ Errore estrazione PDF: {e}")
        return ""
    return "\n".join(text_parts) if has_text else ""
# === QDRANT FUNCTIONS ===
# Shared Qdrant client, created lazily by get_qdrant_client().
_qdrant_client: Optional[AsyncQdrantClient] = None


async def get_qdrant_client() -> AsyncQdrantClient:
    """Return a shared AsyncQdrantClient, creating the "documents" collection if needed.

    On the first successful call the client is created and the "documents"
    collection is created when missing: 768-dim vectors with cosine distance,
    presumably matching the nomic-embed-text output size used by
    get_embeddings. Later calls reuse the same client instead of building a new
    one per call, which was never closed.

    NOTE(review): because the existence check runs once, a collection deleted
    externally at runtime is not recreated until the process restarts.
    """
    global _qdrant_client
    if _qdrant_client is not None:
        return _qdrant_client

    client = AsyncQdrantClient(url=QDRANT_URL)
    collection_name = "documents"
    if not await client.collection_exists(collection_name):
        await client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(size=768, distance=Distance.COSINE),
        )
    # Cache only after the collection check succeeded, so a failed first call
    # (e.g. Qdrant unreachable) is retried on the next one.
    _qdrant_client = client
    return client
async def get_embeddings(text: str) -> list:
    """Embed *text* with Ollama's ``nomic-embed-text`` model.

    The input is truncated to its first 2000 characters to avoid model errors.

    Returns:
        The embedding vector, or [] if the request fails (the error is printed).
    """
    # AsyncClient: the synchronous ollama.Client would block the event loop
    # (and every other chat session) for the duration of each request.
    client = ollama.AsyncClient(host=OLLAMA_URL)
    max_length = 2000
    text = text[:max_length]
    try:
        response = await client.embed(model='nomic-embed-text', input=text)
        # /api/embed answers with a list under "embeddings"; fall back to the
        # single-vector "embedding" key.
        if 'embeddings' in response:
            return response['embeddings'][0]
        return response.get('embedding', [])
    except Exception as e:
        print(f"❌ Errore Embedding: {e}")
        return []
async def index_document(file_name: str, content: str) -> bool:
    """Chunk *content*, embed every chunk and upsert the results into Qdrant.

    Each stored point gets a random UUID and a payload with the file name,
    chunk text, chunk index, total number of chunks and an indexing timestamp.
    Chunks whose embedding comes back empty are skipped.

    Returns:
        True if at least one point was upserted into "documents"; False when
        nothing could be embedded or any error occurred (the error is printed).
    """
    try:
        pieces = chunk_text(content, max_length=1500)
        client = await get_qdrant_client()
        batch = []
        for idx, piece in enumerate(pieces):
            vector = await get_embeddings(piece)
            if not vector:
                continue
            batch.append(
                PointStruct(
                    id=str(uuid.uuid4()),
                    vector=vector,
                    payload={
                        "file_name": file_name,
                        "content": piece,
                        "chunk_index": idx,
                        "total_chunks": len(pieces),
                        "indexed_at": datetime.now().isoformat(),
                    },
                )
            )
        if not batch:
            return False
        await client.upsert(collection_name="documents", points=batch)
        return True
    except Exception as exc:
        print(f"❌ Errore indicizzazione: {exc}")
        return False
def chunk_text(text: str, max_length: int = 1500, overlap: int = 200) -> list:
    """Split *text* into chunks of at most *max_length* characters.

    A chunk preferably ends right after the last "." or newline inside its
    window. Consecutive chunks share up to *overlap* characters for context
    continuity. Text that already fits is returned unchanged as a one-element
    list. Otherwise chunks are whitespace-stripped and empty ones are dropped.

    Raises:
        ValueError: if *max_length* is not positive (no forward progress possible).
    """
    if max_length <= 0:
        raise ValueError("max_length must be positive")
    if len(text) <= max_length:
        return [text]

    text_len = len(text)
    chunks = []
    start = 0
    while start < text_len:
        end = min(start + max_length, text_len)
        if end < text_len:
            # Prefer to cut after the last sentence end / newline in the window.
            split_point = max(text.rfind('.', start, end), text.rfind('\n', start, end))
            if split_point > start:
                end = split_point + 1
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end >= text_len:
            # Last window reached: stepping back by `overlap` would only emit
            # a redundant tail chunk.
            break
        next_start = end - overlap
        # Overlap only when it still moves forward. A split point close to
        # `start` (or overlap >= chunk size) would otherwise move `start`
        # backwards and loop forever.
        start = next_start if next_start > start else end
    return chunks
async def search_qdrant(query_text: str, limit: int = 5, preview_chars: int = 600) -> str:
    """Find the chunks most similar to *query_text* and format them as Markdown.

    Args:
        query_text: text to embed and search for in the "documents" collection.
        limit: maximum number of hits requested from Qdrant.
        preview_chars: number of characters of each chunk included in the
            output; only chunks that are actually longer are cut and marked "...".

    Returns:
        One Markdown section per distinct (file, chunk) hit, separated by blank
        lines. Returns "" when nothing is found, the query cannot be embedded, or
        any error occurs (the error is printed).
    """
    try:
        qdrant_client = await get_qdrant_client()
        query_embedding = await get_embeddings(query_text)
        if not query_embedding:
            return ""

        search_result = await qdrant_client.query_points(
            collection_name="documents",
            query=query_embedding,
            limit=limit,
        )

        contexts = []
        seen_chunks = set()
        for hit in search_result.points:
            if not hit.payload:
                continue
            file_name = hit.payload.get('file_name', 'Unknown')
            content = hit.payload.get('content', '')
            chunk_idx = hit.payload.get('chunk_index', 0)
            # A missing/None score would make the ":.2f" format raise and
            # discard every result via the except below.
            score = getattr(hit, 'score', None) or 0

            # Skip repeated hits for the same chunk of the same file (points get
            # random ids, so re-indexing a file stores duplicates).
            chunk_key = (file_name, chunk_idx)
            if chunk_key in seen_chunks:
                continue
            seen_chunks.add(chunk_key)

            if len(content) > preview_chars:
                preview = content[:preview_chars] + "..."
            else:
                preview = content
            contexts.append(
                f"📄 **{file_name}** (chunk {chunk_idx+1}, score: {score:.2f})\n"
                f"```\n{preview}\n```"
            )
        return "\n\n".join(contexts)
    except Exception as e:
        print(f"❌ Errore ricerca Qdrant: {e}")
        return ""
# === CHAINLIT HANDLERS ===
@cl.oauth_callback
def oauth_callback(provider_id: str, token: str, raw_user_data: dict, question_filter) -> Optional[cl.User]:
    """Turn a successful Google OAuth login into a Chainlit user.

    Args:
        provider_id: OAuth provider name; only "google" is accepted.
        token: provider access token (unused here).
        raw_user_data: user-info payload from the provider; "email", "name"
            and "picture" are read.
        question_filter: fourth positional argument from Chainlit, presumably
            its default ``cl.User``; unused. (Name kept for compatibility.)

    Returns:
        A ``cl.User`` identified by the e-mail, with its workspace created.
        Returns None (login rejected) for other providers or when no e-mail is
        present.
    """
    if provider_id != "google":
        return None

    user_email = raw_user_data.get("email", "")
    if not user_email:
        # Without an e-mail every such login would share the identifier "" and
        # the workspace would resolve to the WORKSPACES_DIR root itself.
        return None
    user_name = raw_user_data.get("name", "User")

    user = cl.User(
        identifier=user_email,
        metadata={
            "email": user_email,
            "name": user_name,
            "picture": raw_user_data.get("picture", ""),
            "provider": "google",
        },
    )
    create_workspace(user_email)
    return user
@cl.on_chat_start
async def on_chat_start():
    """Greet the authenticated user and cache e-mail and name in the session.

    When the session has no authenticated user, only an authentication-failure
    message is sent.
    """
    current_user = cl.user_session.get("user")
    if not current_user:
        await cl.Message(
            content="❌ Autenticazione fallita. Riprova."
        ).send()
        return

    email = current_user.identifier
    display_name = current_user.metadata.get("name", "User")
    create_workspace(email)
    cl.user_session.set("email", email)
    cl.user_session.set("name", display_name)

    # NOTE(review): if cl.data_layer is Chainlit's decorator function (newer
    # versions), this check is always truthy — confirm against the installed version.
    persistence_status = "⚠️ Disattivata" if not cl.data_layer else "✅ Attiva"
    welcome_lines = [
        f"👋 **Benvenuto, {display_name}!**\n\n",
        "🚀 **AI Station Ready**\n",
        "📤 Upload **PDF** o **.txt** per indicizzarli nel RAG\n",
        f"💾 Persistenza conversazioni: {persistence_status}\n",
        f"🤖 Modello: `glm-4.6:cloud` @ {OLLAMA_URL}\n\n",
        "💡 **Supporto PDF attivo**: Carica fatture, F24, dichiarazioni fiscali!",
    ]
    await cl.Message(content="".join(welcome_lines)).send()
@cl.on_message
async def on_message(message: cl.Message):
    """Handle a user message end to end.

    Steps:
        1. Index attached files (``handle_file_uploads``).
        2. Retrieve RAG context for the message text (``search_qdrant``).
        3. Build the prompt, with or without the retrieved context.
        4. Stream the ``glm-4.6:cloud`` answer from Ollama into a chat message.
        5. Save every ```python block of the answer into the workspace and
           attach the files.

    Any exception is reported to the user as an error message.
    """
    user_email = cl.user_session.get("email", "guest")

    try:
        # === STEP 1: Upload handling ===
        if message.elements:
            await handle_file_uploads(message.elements, user_email)

        # === STEP 2: RAG search ===
        context_text = await search_qdrant(message.content, limit=5)

        # === STEP 3: Prompt ===
        system_prompt = (
            "Sei un assistente AI esperto in analisi documentale e fiscale. "
            "Usa ESCLUSIVAMENTE il seguente contesto per rispondere. "
            "Se la risposta non è nel contesto, dillo chiaramente."
        )
        if context_text:
            full_prompt = f"{system_prompt}\n\n**CONTESTO DOCUMENTI:**\n{context_text}\n\n**DOMANDA UTENTE:**\n{message.content}"
        else:
            full_prompt = f"{system_prompt}\n\n**DOMANDA UTENTE:**\n{message.content}"

        # === STEP 4: Stream glm-4.6:cloud ===
        # AsyncClient + `async for`: iterating the synchronous client's stream
        # here would block the event loop (all sessions) for the whole reply.
        client = ollama.AsyncClient(host=OLLAMA_URL)
        msg = cl.Message(content="")
        await msg.send()

        messages = [{"role": "user", "content": full_prompt}]
        stream = await client.chat(
            model='glm-4.6:cloud',
            messages=messages,
            stream=True,
        )
        response_parts = []
        async for chunk in stream:
            # Content may be None/empty for some chunks; skip them.
            content = chunk['message']['content'] or ""
            if not content:
                continue
            response_parts.append(content)
            await msg.stream_token(content)
        await msg.update()
        full_response = "".join(response_parts)

        # === STEP 5: Extract and save code ===
        code_blocks = re.findall(r"```python\n(.*?)```", full_response, re.DOTALL)
        if code_blocks:
            elements = []
            for code in code_blocks:
                file_path = save_code_to_file(code.strip(), user_email)
                elements.append(
                    cl.File(
                        name=os.path.basename(file_path),
                        path=file_path,
                        display="inline",
                    )
                )
            await cl.Message(
                content="💾 Codice salvato in workspace",
                elements=elements,
            ).send()
    except Exception as e:
        await cl.Message(content=f"❌ **Errore:** {str(e)}").send()
async def handle_file_uploads(elements, user_email: str):
    """Copy uploaded files into the user's workspace and index .pdf/.txt ones in Qdrant.

    Each element is processed independently. Progress and outcome are reported
    as chat messages, and a failure on one file does not stop the others.
    Files with other extensions are only saved.

    Args:
        elements: uploaded Chainlit elements; ``name`` and ``path`` are read.
        user_email: owner; the folder name is the e-mail with "@" and "."
            replaced by "_" under WORKSPACES_DIR.
    """
    safe_email = user_email.replace("@", "_").replace(".", "_")
    workspace_dir = os.path.join(WORKSPACES_DIR, safe_email)

    for element in elements:
        try:
            # The workspace may not exist yet (e.g. the "guest" fallback user).
            os.makedirs(workspace_dir, exist_ok=True)
            # element.name is client-supplied: keep only its final component so
            # names like "../../etc/x" cannot write outside the workspace.
            safe_name = os.path.basename(element.name.replace("\\", "/"))
            if safe_name in ("", ".", ".."):
                safe_name = f"upload_{uuid.uuid4().hex}"
            dest_path = os.path.join(workspace_dir, safe_name)
            shutil.copy(element.path, dest_path)

            lower_name = element.name.lower()
            if lower_name.endswith('.pdf'):
                await cl.Message(content=f"📄 Elaborazione PDF **{element.name}**...").send()
                content = extract_text_from_pdf(dest_path)
                if not content:
                    await cl.Message(
                        content=f"⚠️ **{element.name}**: PDF vuoto o non leggibile"
                    ).send()
                    continue
            elif lower_name.endswith('.txt'):
                # errors="replace": non-UTF-8 text files (e.g. cp1252) are still
                # indexed instead of aborting with UnicodeDecodeError.
                with open(dest_path, 'r', encoding='utf-8', errors='replace') as f:
                    content = f.read()
            else:
                await cl.Message(
                    content=f"📁 **{element.name}** salvato (supportati: .pdf, .txt)"
                ).send()
                continue

            if not content.strip():
                # Previously an empty .txt was skipped without any feedback.
                await cl.Message(content=f"⚠️ **{element.name}**: file vuoto").send()
                continue

            success = await index_document(element.name, content)
            if success:
                word_count = len(content.split())
                await cl.Message(
                    content=f"✅ **{element.name}** indicizzato in Qdrant\n"
                    f"📊 Parole estratte: {word_count:,}"
                ).send()
            else:
                await cl.Message(
                    content=f"⚠️ Errore indicizzazione **{element.name}**"
                ).send()
        except Exception as e:
            await cl.Message(
                content=f"❌ Errore con **{element.name}**: {str(e)}"
            ).send()