ai-station/bck/app-final.py

360 lines
12 KiB
Python
Raw Permalink Normal View History

import os
import re
import uuid
import shutil
from datetime import datetime
from typing import Optional
import chainlit as cl
import ollama
import fitz # PyMuPDF
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct, Distance, VectorParams
from chainlit.data.sql_alchemy import SQLAlchemyDataLayer
# === CONFIGURATION ===
# Service endpoints. Each value is read from the environment variable of the
# same name and falls back to the literal default when the variable is unset.
# NOTE(review): the DATABASE_URL fallback embeds a user name and password in
# source; confirm this default is only a placeholder.
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://ai_user:secure_password_here@postgres:5432/ai_station")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://192.168.1.243:11434")
QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant:6333")
# === DATA LAYER INITIALIZATION ===
# Runs at import time. On any failure the error is printed and
# cl.data_layer is set to None, which on_chat_start reports as persistence
# being disabled.
try:
    data_layer = SQLAlchemyDataLayer(conninfo=DATABASE_URL)
    cl.data_layer = data_layer
    print("✅ SQLAlchemyDataLayer initialized successfully")
except Exception as e:
    print(f"❌ Failed to initialize data layer: {e}")
    cl.data_layer = None
# Root directory under which one sub-directory per role is created.
WORKSPACES_DIR = "./workspaces"
# Role stored in the user session at chat start; also names the workspace
# sub-directory.
USER_ROLE = "admin"
# === UTILITY FUNCTIONS ===
def create_workspace(user_role: str):
    """Make sure the workspace directory for *user_role* exists.

    The directory is ``WORKSPACES_DIR/<user_role>``. Missing parent
    directories are created; an existing directory is left untouched.

    Returns:
        The path of the workspace directory.
    """
    role_dir = os.path.join(WORKSPACES_DIR, user_role)
    if not os.path.isdir(role_dir):
        os.makedirs(role_dir, exist_ok=True)
    return role_dir
def save_code_to_file(code: str, user_role: str) -> str:
    """Save a code snippet as a new ``.py`` file in the role's workspace.

    The file is named ``code_<YYYYmmdd_HHMMSS>.py``. If that name is already
    taken (for example when several snippets are saved within the same
    second), a numeric suffix ``_1``, ``_2``, ... is appended so that an
    earlier snippet is never overwritten.

    Args:
        code: Source text to write.
        user_role: Name of the workspace sub-directory under
            ``WORKSPACES_DIR``.

    Returns:
        The path of the file that was written.

    Raises:
        OSError: If the directory cannot be created or the file cannot be
            written.
    """
    workspace_path = os.path.join(WORKSPACES_DIR, user_role)
    # Callers are not required to have created the workspace beforehand.
    os.makedirs(workspace_path, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_path = os.path.join(workspace_path, f"code_{timestamp}.py")
    attempt = 0
    while True:
        try:
            # Mode "x" fails instead of truncating when the name is taken.
            with open(file_path, "x", encoding="utf-8") as f:
                f.write(code)
        except FileExistsError:
            attempt += 1
            file_path = os.path.join(
                workspace_path, f"code_{timestamp}_{attempt}.py"
            )
        else:
            return file_path
def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract the text of every page of a PDF using PyMuPDF.

    Each page's text is preceded by a ``--- Pagina N ---`` header, where N is
    the 1-based page number.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        The page texts joined by newlines, or ``""`` when the file cannot be
        opened or read (the error is printed).
    """
    try:
        # The context manager closes the document even if a page fails to
        # extract, so the file handle is not leaked on errors.
        with fitz.open(pdf_path) as doc:
            text_parts = [
                f"--- Pagina {page_num} ---\n{page.get_text()}\n"
                for page_num, page in enumerate(doc, start=1)
            ]
        return "\n".join(text_parts)
    except Exception as e:
        # Best effort: callers treat an empty string as "unreadable PDF".
        print(f"❌ Errore estrazione PDF: {e}")
        return ""
# === QDRANT FUNCTIONS ===
async def get_qdrant_client() -> AsyncQdrantClient:
    """Open a Qdrant client and ensure the ``documents`` collection exists.

    A new ``AsyncQdrantClient`` for ``QDRANT_URL`` is created on every call.
    When the collection is missing it is created with 768-dimensional
    vectors compared by cosine distance.

    Returns:
        The connected client.
    """
    collection = "documents"
    qdrant = AsyncQdrantClient(url=QDRANT_URL)

    already_present = await qdrant.collection_exists(collection)
    if already_present:
        return qdrant

    # First use: create the collection before handing the client out.
    vector_settings = VectorParams(size=768, distance=Distance.COSINE)
    await qdrant.create_collection(
        collection_name=collection,
        vectors_config=vector_settings,
    )
    return qdrant
async def get_embeddings(text: str) -> list:
    """Compute an embedding vector for *text* with Ollama.

    The request goes through ``ollama.AsyncClient`` so the HTTP round trip
    does not block the event loop while the model is working.

    Args:
        text: Text to embed. Only the first 2000 characters are sent.

    Returns:
        The embedding as a list, or ``[]`` when the request fails (the error
        is printed).
    """
    # Cap the input length to avoid errors on long texts.
    max_length = 2000
    text = text[:max_length]
    try:
        client = ollama.AsyncClient(host=OLLAMA_URL)
        response = await client.embed(model='nomic-embed-text', input=text)
        # Responses carry either an 'embeddings' list (one vector per input)
        # or a single 'embedding'; accept both.
        if 'embeddings' in response:
            return response['embeddings'][0]
        return response.get('embedding', [])
    except Exception as e:
        print(f"❌ Errore Embedding: {e}")
        return []
async def index_document(file_name: str, content: str) -> bool:
    """Split a document into chunks, embed them and store them in Qdrant.

    Chunks that are blank or whose embedding could not be computed are
    skipped. Each stored point carries the file name, the chunk text, the
    chunk index, the total number of chunks and an ISO timestamp.

    Args:
        file_name: Name recorded in each point's payload.
        content: Full text of the document.

    Returns:
        ``True`` if at least one chunk was stored, ``False`` if nothing could
        be indexed or an error occurred (the error is printed).
    """
    try:
        # Split long documents into chunks.
        chunks = chunk_text(content, max_length=1500)
        points = []
        for i, chunk in enumerate(chunks):
            if not chunk.strip():
                # Nothing meaningful to embed.
                continue
            embeddings = await get_embeddings(chunk)
            if not embeddings:
                continue
            points.append(
                PointStruct(
                    id=str(uuid.uuid4()),
                    vector=embeddings,
                    payload={
                        "file_name": file_name,
                        "content": chunk,
                        "chunk_index": i,
                        "total_chunks": len(chunks),
                        "indexed_at": datetime.now().isoformat(),
                    },
                )
            )
        if not points:
            return False

        # Connect only once there is something to store, and always release
        # the client: a fresh one is created on every call.
        qdrant_client = await get_qdrant_client()
        try:
            await qdrant_client.upsert(collection_name="documents", points=points)
        finally:
            await qdrant_client.close()
        return True
    except Exception as e:
        print(f"❌ Errore indicizzazione: {e}")
        return False
def chunk_text(text: str, max_length: int = 1500, overlap: int = 200) -> list:
    """Split *text* into overlapping chunks of at most *max_length* characters.

    A chunk preferably ends just after the last ``.`` or newline inside its
    window. Only break points lying beyond the overlap region are accepted,
    so every chunk advances past the previous one and the loop always
    terminates; when no such break point exists the window is cut at
    *max_length*.

    Args:
        text: Text to split.
        max_length: Maximum number of characters per chunk.
        overlap: Number of characters shared between consecutive chunks.
            Values outside ``0 .. max_length - 1`` are clamped to that range.

    Returns:
        The list of chunks, each stripped of surrounding whitespace. A text
        no longer than *max_length* is returned unchanged as the only
        element. Chunks that are empty after stripping are omitted.

    Raises:
        ValueError: If the text must be split and *max_length* is below 1.
    """
    if len(text) <= max_length:
        return [text]
    if max_length < 1:
        raise ValueError("max_length must be at least 1")

    # An overlap as large as the window would prevent any forward progress.
    overlap = max(0, min(overlap, max_length - 1))
    text_len = len(text)
    chunks = []
    start = 0
    while start < text_len:
        end = min(start + max_length, text_len)
        if end < text_len:
            # Look for the last period/newline before the limit, ignoring the
            # overlap region: a break there would make the next window start
            # at or before this one.
            search_from = start + max(overlap, 1)
            split_point = max(
                text.rfind('.', search_from, end),
                text.rfind('\n', search_from, end),
            )
            if split_point != -1:
                end = split_point + 1
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end >= text_len:
            # The tail is already covered; stepping back by the overlap
            # would only yield a redundant chunk.
            break
        start = end - overlap  # overlap for continuity between chunks
    return chunks
async def search_qdrant(query_text: str, limit: int = 5) -> str:
    """Search Qdrant for the chunks most relevant to *query_text*.

    Args:
        query_text: Text of the query; it is embedded before searching.
        limit: Maximum number of hits requested from Qdrant.

    Returns:
        A Markdown string with one entry per distinct (file, chunk) hit,
        showing the file name, chunk number, score and the first 600
        characters of the chunk. ``""`` is returned when the query cannot be
        embedded, nothing is found, or an error occurs (the error is
        printed).
    """
    try:
        # Embed first: without a query vector there is no reason to connect.
        query_embedding = await get_embeddings(query_text)
        if not query_embedding:
            return ""

        # A fresh client is created on every call, so release it once the
        # query has completed.
        qdrant_client = await get_qdrant_client()
        try:
            search_result = await qdrant_client.query_points(
                collection_name="documents",
                query=query_embedding,
                limit=limit
            )
        finally:
            await qdrant_client.close()

        contexts = []
        seen_chunks = set()
        for hit in search_result.points:
            if not hit.payload:
                continue
            file_name = hit.payload.get('file_name', 'Unknown')
            content = hit.payload.get('content', '')
            chunk_idx = hit.payload.get('chunk_index', 0)
            score = getattr(hit, 'score', 0)
            # Skip repeated hits for the same chunk of the same file.
            chunk_key = f"{file_name}_{chunk_idx}"
            if chunk_key in seen_chunks:
                continue
            seen_chunks.add(chunk_key)
            contexts.append(
                f"📄 **{file_name}** (chunk {chunk_idx+1}, score: {score:.2f})\n"
                f"```\n{content[:600]}...\n```"
            )
        return "\n\n".join(contexts)
    except Exception as e:
        print(f"❌ Errore ricerca Qdrant: {e}")
        return ""
# === CHAINLIT HANDLERS ===
@cl.on_chat_start
async def on_chat_start():
    """Prepare the workspace and session, then send the welcome message.

    Creates the workspace directory for ``USER_ROLE``, stores the role in the
    user session and reports whether a data layer is configured.
    """
    create_workspace(USER_ROLE)
    # Session variable read back by the message handler.
    cl.user_session.set("role", USER_ROLE)

    # Persistence is reported as active only when a data layer is set.
    if cl.data_layer:
        persistence_status = "✅ Attiva"
    else:
        persistence_status = "⚠️ Disattivata"

    welcome_parts = [
        f"🚀 **AI Station Ready** - Workspace: `{USER_ROLE}`\n\n",
        "📤 Upload **PDF** o **.txt** per indicizzarli nel RAG\n",
        f"💾 Persistenza conversazioni: {persistence_status}\n",
        f"🤖 Modello: `glm-4.6:cloud` @ {OLLAMA_URL}\n\n",
        "💡 **Supporto PDF attivo**: Carica fatture, F24, dichiarazioni fiscali!",
    ]
    welcome = cl.Message(content="".join(welcome_parts))
    await welcome.send()
@cl.on_message
async def on_message(message: cl.Message):
    """Handle a user message: index uploads, retrieve context, stream a reply.

    Steps:
      1. Index any files attached to the message.
      2. Search Qdrant for context relevant to the message text.
      3. Build the prompt from the instructions, the context (if any) and
         the user question.
      4. Stream the answer of ``glm-4.6:cloud`` into a Chainlit message,
         using the async Ollama client so the event loop is not blocked
         while tokens arrive.
      5. Save every fenced ``python`` block of the answer to the workspace
         and attach the files to a follow-up message.

    Any exception is reported to the user as an error message.
    """
    user_role = cl.user_session.get("role", "guest")
    try:
        # === STEP 1: uploads ===
        if message.elements:
            await handle_file_uploads(message.elements, user_role)

        # === STEP 2: RAG search ===
        context_text = await search_qdrant(message.content, limit=5)

        # === STEP 3: prompt ===
        system_prompt = (
            "Sei un assistente AI esperto in analisi documentale e fiscale. "
            "Usa ESCLUSIVAMENTE il seguente contesto per rispondere. "
            "Se la risposta non è nel contesto, dillo chiaramente."
        )
        prompt_parts = [system_prompt]
        if context_text:
            prompt_parts.append(f"**CONTESTO DOCUMENTI:**\n{context_text}")
        prompt_parts.append(f"**DOMANDA UTENTE:**\n{message.content}")
        full_prompt = "\n\n".join(prompt_parts)

        # === STEP 4: stream the answer from glm-4.6:cloud ===
        client = ollama.AsyncClient(host=OLLAMA_URL)
        msg = cl.Message(content="")
        await msg.send()
        stream = await client.chat(
            model='glm-4.6:cloud',
            messages=[{"role": "user", "content": full_prompt}],
            stream=True
        )
        response_parts = []
        async for chunk in stream:
            content = chunk['message']['content']
            response_parts.append(content)
            await msg.stream_token(content)
        await msg.update()
        full_response = "".join(response_parts)

        # === STEP 5: extract and save code ===
        code_blocks = re.findall(r"```python\n(.*?)```", full_response, re.DOTALL)
        if code_blocks:
            elements = []
            for code in code_blocks:
                file_path = save_code_to_file(code.strip(), user_role)
                elements.append(
                    cl.File(
                        name=os.path.basename(file_path),
                        path=file_path,
                        display="inline"
                    )
                )
            await cl.Message(
                content=f"💾 Codice salvato in `{user_role}/`",
                elements=elements
            ).send()
    except Exception as e:
        # Top-level boundary of the handler: surface the failure in the chat.
        await cl.Message(content=f"❌ **Errore:** {str(e)}").send()
async def handle_file_uploads(elements, user_role: str):
    """Store uploaded files in the role's workspace and index TXT/PDF content.

    Each element is copied to ``WORKSPACES_DIR/<user_role>/``. Text is
    extracted from ``.pdf`` files and read as UTF-8 from ``.txt`` files, then
    indexed in Qdrant. Other file types are only stored. The outcome of each
    file is reported with a chat message, and a failure on one file does not
    stop the processing of the others.

    Args:
        elements: Uploaded elements; each is expected to expose ``name`` and
            ``path`` attributes.
        user_role: Name of the workspace sub-directory.
    """
    workspace_path = os.path.join(WORKSPACES_DIR, user_role)
    for element in elements:
        try:
            # The upload name is supplied by the client: keep only its final
            # path component so the copy cannot land outside the workspace.
            safe_name = os.path.basename(element.name.replace("\\", "/"))
            if safe_name in ("", ".", ".."):
                raise ValueError("nome file non valido")

            # Save the file; the role's workspace may not exist yet.
            os.makedirs(workspace_path, exist_ok=True)
            dest_path = os.path.join(workspace_path, safe_name)
            shutil.copy(element.path, dest_path)

            content = None
            lower_name = element.name.lower()
            # Extract text according to the file type.
            if lower_name.endswith('.pdf'):
                await cl.Message(content=f"📄 Elaborazione PDF **{element.name}**...").send()
                content = extract_text_from_pdf(dest_path)
                if not content:
                    await cl.Message(
                        content=f"⚠️ **{element.name}**: PDF vuoto o non leggibile"
                    ).send()
                    continue
            elif lower_name.endswith('.txt'):
                with open(dest_path, 'r', encoding='utf-8') as f:
                    content = f.read()
            else:
                await cl.Message(
                    content=f"📁 **{element.name}** salvato (supportati: .pdf, .txt)"
                ).send()
                continue

            # Index in Qdrant.
            if content:
                success = await index_document(element.name, content)
                if success:
                    word_count = len(content.split())
                    await cl.Message(
                        content=f"✅ **{element.name}** indicizzato in Qdrant\n"
                        f"📊 Parole estratte: {word_count:,}"
                    ).send()
                else:
                    await cl.Message(
                        content=f"⚠️ Errore indicizzazione **{element.name}**"
                    ).send()
        except Exception as e:
            await cl.Message(
                content=f"❌ Errore con **{element.name}**: {str(e)}"
            ).send()