"""Chainlit chat app backed by Ollama (chat + embeddings) and Qdrant.

Uploaded text files are copied into a per-role workspace and indexed in
Qdrant; Python code blocks found in model replies are saved to the same
workspace and offered back to the user as file attachments.
"""
import os
import re
import shutil
import uuid
from datetime import datetime

import chainlit as cl
import ollama
from qdrant_client import QdrantClient, models

# --- CONFIGURATION ---
USER_ROLES = {
    'moglie@esempio.com': 'business',
    'ingegnere@esempio.com': 'engineering',
    'architetto@esempio.com': 'architecture',
    'admin@esempio.com': 'admin',
}
WORKSPACES_DIR = "./workspaces"

# URL config
OLLAMA_URL = os.getenv('OLLAMA_API_BASE', 'http://192.168.1.243:11434')
QDRANT_URL = "http://qdrant:6333"  # Service name in docker-compose

# Qdrant collection and the file types that get indexed into it.
COLLECTION_NAME = "documents"
INDEXABLE_SUFFIXES = ('.txt', '.md', '.py')

# Matches fenced ```python ... ``` blocks in a model reply.
PYTHON_BLOCK_RE = re.compile(r"```python(.*?)```", re.DOTALL)

# Global clients
aclient = ollama.AsyncClient(host=OLLAMA_URL)  # Async client used for chat
# Synchronous client used for embeddings
embed_client = ollama.Client(host=OLLAMA_URL)


# --- UTILITY FUNCTIONS ---
def create_workspace(user_role):
    """Create the workspace directory for *user_role* if it does not exist."""
    path = os.path.join(WORKSPACES_DIR, user_role)
    os.makedirs(path, exist_ok=True)


def save_code_to_file(code, user_role):
    """Write *code* to a new ``.py`` file in the role's workspace.

    The file name combines a second-resolution timestamp with a short random
    suffix, so several snippets saved within the same second do not
    overwrite each other.

    Returns:
        The path of the file that was written.
    """
    workspace = os.path.join(WORKSPACES_DIR, user_role)
    os.makedirs(workspace, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    file_name = f"code_{timestamp}_{uuid.uuid4().hex[:8]}.py"
    file_path = os.path.join(workspace, file_name)
    with open(file_path, "w", encoding="utf-8") as file:
        file.write(code)
    return file_path


def get_qdrant_client():
    """Return a new Qdrant client pointed at ``QDRANT_URL``."""
    return QdrantClient(url=QDRANT_URL)


def ensure_collection(client):
    """Create the ``documents`` collection when looking it up fails."""
    try:
        client.get_collection(COLLECTION_NAME)
    except Exception:
        # Any lookup failure is treated as "collection missing"; if the real
        # cause is connectivity, create_collection raises and the caller
        # reports it.
        client.create_collection(
            collection_name=COLLECTION_NAME,
            vectors_config=models.VectorParams(
                size=768, distance=models.Distance.COSINE
            ),
        )


def get_embeddings(text):
    """Return the embedding vector for *text* from the Ollama embed endpoint.

    The text is truncated to its first 8000 characters before embedding.
    """
    # Cut overly long text to avoid errors (max safe context).
    text = text[:8000]
    response = embed_client.embed(model='nomic-embed-text', input=text)
    # Handle the different response formats across Ollama versions.
    if 'embeddings' in response:
        return response['embeddings'][0]
    return response['embedding']


def _safe_upload_name(raw_name):
    """Reduce a client-supplied file name to a bare name inside the workspace."""
    # Normalise Windows separators first so basename strips them as well.
    candidate = os.path.basename((raw_name or "").replace("\\", "/"))
    if candidate in ("", ".", ".."):
        candidate = f"upload_{uuid.uuid4().hex[:8]}"
    return candidate


async def _store_and_index_uploads(elements, user_role):
    """Copy uploaded elements into the workspace and index text files.

    Indexing errors are reported to the user per file and do not abort the
    remaining uploads.

    Returns:
        The list of file names that were saved to the workspace.
    """
    workspace = os.path.join(WORKSPACES_DIR, user_role)
    os.makedirs(workspace, exist_ok=True)
    q_client = get_qdrant_client()
    saved_names = []

    for element in elements:
        # Save to disk under a sanitised name so the upload cannot escape
        # the workspace directory.
        safe_name = _safe_upload_name(element.name)
        dest_path = os.path.join(workspace, safe_name)
        shutil.copyfile(element.path, dest_path)
        saved_names.append(safe_name)

        # Only text-like files are indexed in Qdrant.
        if not safe_name.endswith(INDEXABLE_SUFFIXES):
            continue
        try:
            with open(dest_path, 'r', encoding='utf-8') as f:
                content = f.read()
            # The embedding and Qdrant clients are synchronous; run them in a
            # worker thread so the event loop keeps serving other sessions.
            vector = await cl.make_async(get_embeddings)(content)
            point = models.PointStruct(
                id=str(uuid.uuid4()),
                vector=vector,
                # Only a preview of the text is stored.
                payload={"filename": safe_name, "text": content[:500]},
            )
            await cl.make_async(q_client.upsert)(
                collection_name=COLLECTION_NAME,
                points=[point],
            )
        except Exception as e:
            await cl.Message(
                content=f"❌ Errore indicizzazione {safe_name}: {e}"
            ).send()

    return saved_names


def _save_code_blocks(text, user_role):
    """Save every non-empty Python code block in *text*; return cl.File list."""
    attachments = []
    for code in PYTHON_BLOCK_RE.findall(text):
        code = code.strip()
        if not code:
            continue
        path = save_code_to_file(code, user_role)
        attachments.append(cl.File(name=os.path.basename(path), path=path))
    return attachments


# --- CHAT LOGIC ---
@cl.on_chat_start
async def chat_start():
    """Set up the session (role, history, workspace) and greet the user."""
    user_email = "admin@esempio.com"  # In prod, take it from headers/auth
    user_role = USER_ROLES.get(user_email, 'guest')
    create_workspace(user_role)

    cl.user_session.set("history", [])
    cl.user_session.set("role", user_role)

    # Initialise Qdrant at startup; a failure is reported but not fatal.
    try:
        q_client = get_qdrant_client()
        ensure_collection(q_client)
        status_msg = "✅ System ready. Qdrant connected."
    except Exception as e:
        status_msg = f"⚠️ System ready, but Qdrant error: {e}"

    await cl.Message(content=f"Welcome {user_role}! {status_msg}").send()


@cl.on_message
async def message(message: cl.Message):
    """Handle one user message: store uploads, stream a reply, save code."""
    user_role = cl.user_session.get("role", 'guest')
    history = cl.user_session.get("history", [])

    # 1. UPLOADED FILE HANDLING (RAG)
    if message.elements:
        processing_msg = cl.Message(content="⚙️ Elaborazione file in corso...")
        await processing_msg.send()

        uploaded_files = await _store_and_index_uploads(
            message.elements, user_role
        )

        await processing_msg.remove()
        await cl.Message(
            content=f"📂 File salvati e indicizzati: {', '.join(uploaded_files)}"
        ).send()

    # 2. UPDATE HISTORY AND CHAT
    history.append({"role": "user", "content": message.content})

    msg = cl.Message(content="")
    await msg.send()

    full_response = ""

    try:
        # Stream the response token by token.
        stream = await aclient.chat(
            model='qwen2.5-coder:7b', messages=history, stream=True
        )
        async for part in stream:
            token = part['message']['content']
            full_response += token
            await msg.stream_token(token)

        await msg.update()

        # Record the reply before touching the filesystem, so a failure
        # while saving code does not drop it from the conversation history.
        history.append({"role": "assistant", "content": full_response})
        cl.user_session.set("history", history)

        # 3. CODE EXTRACTION (automatic save)
        files_generated = _save_code_blocks(full_response, user_role)
        if files_generated:
            await cl.Message(
                content="💾 Ho estratto il codice per te:",
                elements=files_generated,
            ).send()

    except Exception as e:
        await cl.Message(content=f"❌ Errore generazione AI: {e}").send()