Compare commits

..

1 Commits

Author SHA1 Message Date
AI Station Server 7095e83b41 Chore: Aggiunto .files al gitignore e rimosso tracking dati locali 2025-12-30 17:19:30 +01:00
12 changed files with 199 additions and 230 deletions

4
.gitignore vendored
View File

@ -3,4 +3,6 @@ __pycache__/
*.pyc *.pyc
.aider* .aider*
workspaces/* workspaces/*
qdrant_storage/ qdrant_storage/.files/
__pycache__/
.env

419
app.py
View File

@ -2,19 +2,17 @@ import os
import re import re
import uuid import uuid
import shutil import shutil
import pandas as pd # NUOVO: Gestione Excel
import httpx # NUOVO: Chiamate API Remote
from datetime import datetime from datetime import datetime
from typing import Optional, Dict, List from typing import Optional, Dict, List
import chainlit as cl import chainlit as cl
import ollama import ollama
import fitz # PyMuPDF import fitz # PyMuPDF
from qdrant_client import AsyncQdrantClient from qdrant_client import AsyncQdrantClient
from qdrant_client import models from qdrant_client.models import PointStruct, Distance, VectorParams
from qdrant_client.models import PointStruct, Distance, VectorParams, SparseVectorParams, SparseIndexParams
from chainlit.data.sql_alchemy import SQLAlchemyDataLayer from chainlit.data.sql_alchemy import SQLAlchemyDataLayer
# === FIX IMPORT === # === FIX IMPORT ROBUSTO ===
# Gestisce le differenze tra le versioni di Chainlit 2.x
try: try:
from chainlit.data.storage_clients import BaseStorageClient from chainlit.data.storage_clients import BaseStorageClient
except ImportError: except ImportError:
@ -25,11 +23,8 @@ except ImportError:
# === CONFIGURAZIONE === # === CONFIGURAZIONE ===
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://ai_user:secure_password_here@postgres:5432/ai_station") DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://ai_user:secure_password_here@postgres:5432/ai_station")
# PUNTANO AL SERVER .243 (Il "Cervello")
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://192.168.1.243:11434") OLLAMA_URL = os.getenv("OLLAMA_URL", "http://192.168.1.243:11434")
BGE_API_URL = os.getenv("BGE_API_URL", "http://192.168.1.243:8001")
QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant:6333") QDRANT_URL = os.getenv("QDRANT_URL", "http://qdrant:6333")
WORKSPACES_DIR = "./workspaces" WORKSPACES_DIR = "./workspaces"
STORAGE_DIR = "./.files" STORAGE_DIR = "./.files"
@ -80,228 +75,178 @@ USER_PROFILES = {
} }
} }
# === STORAGE CLIENT === # === CUSTOM LOCAL STORAGE CLIENT (FIXED) ===# Questa classe ora implementa tutti i metodi astratti richiesti da Chainlit 2.8.3
class LocalStorageClient(BaseStorageClient):
    """Filesystem-backed storage client for Chainlit file elements.

    Implements the abstract storage interface required by Chainlit 2.8.x,
    persisting uploaded payloads under a local directory tree.
    """

    def __init__(self, storage_path: str):
        # Remember the root folder and make sure it exists up front.
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    async def upload_file(
        self,
        object_key: str,
        data: bytes,
        mime: str = "application/octet-stream",
        overwrite: bool = True,
    ) -> Dict[str, str]:
        """Write *data* under *object_key* and return its access metadata."""
        target = os.path.join(self.storage_path, object_key)
        # object_key may contain sub-directories; create them on demand.
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(target, "wb") as handle:
            handle.write(data)
        return {"object_key": object_key, "url": f"/files/{object_key}"}

    async def get_read_url(self, object_key: str) -> str:
        """Return the app-relative URL used to read a stored object."""
        return f"/files/{object_key}"

    async def delete_file(self, object_key: str) -> bool:
        """Remove the stored object; True when something was deleted."""
        target = os.path.join(self.storage_path, object_key)
        if os.path.exists(target):
            os.remove(target)
            return True
        return False

    async def close(self):
        # Nothing to release for plain filesystem access.
        pass
# === DATA LAYER ===
@cl.data_layer
def get_data_layer():
    """Build the SQLAlchemy persistence layer backed by local file storage."""
    storage = LocalStorageClient(storage_path=STORAGE_DIR)
    return SQLAlchemyDataLayer(
        conninfo=DATABASE_URL,
        user_thread_limit=1000,
        storage_provider=storage,
    )
# === OAUTH === # === OAUTH CALLBACK ===
@cl.oauth_callback
def oauth_callback(
    provider_id: str,
    token: str,
    raw_user_data: Dict[str, str],
    default_user: cl.User,
) -> Optional[cl.User]:
    """Post-login hook: attach the local profile to the authenticated user.

    Only Google logins are enriched; any other provider passes through
    unchanged. Unknown e-mails fall back to the guest profile.
    """
    if provider_id != "google":
        return default_user
    email = raw_user_data.get("email", "").lower()
    profile = USER_PROFILES.get(email, get_user_profile("guest"))
    default_user.metadata.update({
        "picture": raw_user_data.get("picture", ""),
        "role": profile["role"],
        "workspace": profile["workspace"],
        "rag_collection": profile["rag_collection"],
        "capabilities": profile["capabilities"],
        "show_code": profile["show_code"],
        "display_name": profile["name"],
    })
    return default_user
# === UTILITY FUNCTIONS ===
def get_user_profile(user_email: str) -> Dict:
    """Look up a user's profile by e-mail, falling back to a guest profile."""
    guest_profile = {
        "role": "guest",
        "name": "Ospite",
        "workspace": "guest_workspace",
        "rag_collection": "documents",
        "capabilities": [],
        "show_code": False,
    }
    # Lookup is case-insensitive on the e-mail address.
    return USER_PROFILES.get(user_email.lower(), guest_profile)
def create_workspace(workspace_name: str) -> str:
    """Ensure the named workspace directory exists and return its path."""
    workspace_path = os.path.join(WORKSPACES_DIR, workspace_name)
    os.makedirs(workspace_path, exist_ok=True)
    return workspace_path
def save_code_to_file(code: str, workspace: str) -> str:
    """Persist a generated code snippet into the workspace; return its path."""
    # Timestamped name keeps successive snippets from overwriting each other.
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    target = os.path.join(WORKSPACES_DIR, workspace, f"code_{stamp}.py")
    with open(target, "w", encoding="utf-8") as out:
        out.write(code)
    return target
def extract_text_from_pdf(pdf_path: str) -> str:
    """Extract all page text from a PDF; empty string on any failure."""
    try:
        doc = fitz.open(pdf_path)
        pages = [page.get_text() for page in doc]
        doc.close()
        return "\n".join(pages)
    except Exception:
        # Corrupt or unreadable PDFs are treated as "no text".
        return ""
# === QDRANT FUNCTIONS ===
async def get_qdrant_client() -> AsyncQdrantClient:
    """Return a fresh async Qdrant client pointed at the configured URL."""
    return AsyncQdrantClient(url=QDRANT_URL)
async def ensure_collection(collection_name: str):
    """Create the collection (768-d cosine vectors) if it does not exist."""
    client = await get_qdrant_client()
    exists = await client.collection_exists(collection_name)
    if exists:
        return
    await client.create_collection(
        collection_name=collection_name,
        # 768 matches the nomic-embed-text embedding dimension.
        vectors_config=VectorParams(size=768, distance=Distance.COSINE),
    )
async def get_embeddings(text: str) -> list:
    """Embed *text* with the remote Ollama `nomic-embed-text` model.

    Returns the embedding vector, or [] when the service fails or the
    response carries no embedding.
    """
    client = ollama.Client(host=OLLAMA_URL)
    try:
        # Input truncated to 2000 chars to keep the request bounded.
        response = client.embed(model='nomic-embed-text', input=text[:2000])
        if 'embeddings' in response:
            # Batch-style response: take the vector for our single input.
            return response['embeddings'][0]
        # Fallback for the older single-input response shape.
        return response.get('embedding', [])
    except Exception as exc:
        # FIX: was a bare `except:` which also swallowed SystemExit /
        # KeyboardInterrupt and hid failures; narrow and log instead.
        print(f"⚠️ Embedding error: {exc}")
        return []
async def index_document(file_name: str, content: str, collection_name: str) -> bool:
    """Embed a document and upsert it into Qdrant as a single point.

    The payload keeps only the first 3000 characters of the content plus
    the file name and indexing timestamp. Returns True on success, False
    when embedding fails or any error occurs.
    """
    try:
        await ensure_collection(collection_name)
        embedding = await get_embeddings(content)
        if not embedding:
            return False
        qdrant = await get_qdrant_client()
        await qdrant.upsert(
            collection_name=collection_name,
            points=[PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding,
                # Content is truncated to a 3000-char preview in the payload.
                payload={"file_name": file_name, "content": content[:3000], "indexed_at": datetime.now().isoformat()}
            )]
        )
        return True
    except Exception as exc:
        # FIX: was a bare `except:` that silently hid every indexing
        # failure; narrow to Exception and log so problems surface.
        print(f"Index error: {exc}")
        return False
async def search_qdrant(query: str, collection: str) -> str:
    """Return the top-3 matching chunks from Qdrant joined into one string.

    Empty string when the collection does not exist, the query cannot be
    embedded, or any error occurs.
    """
    try:
        client = await get_qdrant_client()
        if not await client.collection_exists(collection):
            return ""
        emb = await get_embeddings(query)
        if not emb:
            return ""
        res = await client.query_points(collection_name=collection, query=emb, limit=3)
        return "\n\n".join([hit.payload['content'] for hit in res.points if hit.payload])
    except Exception as exc:
        # FIX: was a bare, silent `except:`; the previous revision at least
        # logged search failures — restore that visibility.
        print(f"Search error: {exc}")
        return ""
# === CHAT LOGIC ===
@cl.on_chat_start @cl.on_chat_start
async def on_chat_start(): async def on_chat_start():
user = cl.user_session.get("user") user = cl.user_session.get("user")
if not user: if not user:
email = "guest@local" # Fallback locale se non c'è auth
profile = get_user_profile(email) user_email = "guest@local"
profile = get_user_profile(user_email)
else: else:
email = user.identifier user_email = user.identifier
profile = USER_PROFILES.get(email, get_user_profile("guest")) # I metadati sono già popolati dalla callback oauth
profile = USER_PROFILES.get(user_email, get_user_profile("guest"))
cl.user_session.set("email", email) # Salva in sessione
cl.user_session.set("email", user_email)
cl.user_session.set("role", profile["role"]) cl.user_session.set("role", profile["role"])
cl.user_session.set("workspace", profile["workspace"]) cl.user_session.set("workspace", profile["workspace"])
cl.user_session.set("rag_collection", profile["rag_collection"]) cl.user_session.set("rag_collection", profile["rag_collection"])
@ -309,65 +254,91 @@ async def on_chat_start():
create_workspace(profile["workspace"]) create_workspace(profile["workspace"])
settings = [ # === SETTINGS WIDGETS ===
cl.input_widget.Select(id="model", label="Modello", values=["glm-4.6:cloud", "llama3"], initial_value="glm-4.6:cloud"), settings_widgets = [
cl.input_widget.Slider(id="temp", label="Temperatura", initial=0.5, min=0, max=1, step=0.1) cl.input_widget.Select(
id="model",
label="Modello AI",
values=["glm-4.6:cloud", "llama3.2", "mistral", "qwen2.5-coder:32b"],
initial_value="glm-4.6:cloud",
),
cl.input_widget.Slider(
id="temperature",
label="Temperatura",
initial=0.7, min=0, max=2, step=0.1,
),
] ]
if profile["role"] == "admin": if profile["role"] == "admin":
settings.append(cl.input_widget.Switch(id="rag", label="RAG Attivo", initial=True)) settings_widgets.append(cl.input_widget.Switch(id="rag_enabled", label="Abilita RAG", initial=True))
await cl.ChatSettings(settings).send() await cl.ChatSettings(settings_widgets).send()
await cl.Message(content=f"👋 Ciao **{profile['name']}**! Pronto per l'automazione.").send()
await cl.Message(
content=f"👋 Ciao **{profile['name']}**!\n"
f"Ruolo: `{profile['role']}` | Workspace: `{profile['workspace']}`\n"
).send()
@cl.on_settings_update
async def on_settings_update(settings):
    """Persist updated chat settings in the session and confirm to the user."""
    cl.user_session.set("settings", settings)
    await cl.Message(content="✅ Impostazioni aggiornate").send()
@cl.on_message
async def on_message(message: cl.Message):
    """Main chat handler: ingest attachments, run RAG, stream the reply.

    Pipeline: (1) copy uploaded files into the user's workspace and index
    PDFs, (2) optionally fetch RAG context, (3) stream the LLM answer,
    (4) save any generated fenced code blocks to the workspace.
    """
    workspace = cl.user_session.get("workspace")
    rag_collection = cl.user_session.get("rag_collection")
    user_role = cl.user_session.get("role")
    show_code = cl.user_session.get("show_code")
    settings = cl.user_session.get("settings", {})
    model = settings.get("model", "glm-4.6:cloud")
    temperature = settings.get("temperature", 0.7)
    # Non-admin users always get RAG; only admins can toggle it off.
    rag_enabled = settings.get("rag_enabled", True) if user_role == "admin" else True

    # 1. FILE HANDLING: copy uploads into the workspace, index PDFs.
    if message.elements:
        for element in message.elements:
            dest = os.path.join(WORKSPACES_DIR, workspace, element.name)
            shutil.copy(element.path, dest)
            if element.name.endswith(".pdf"):
                text = extract_text_from_pdf(dest)
                if text:
                    await index_document(element.name, text, rag_collection)
                    await cl.Message(content=f"✅ **{element.name}** indicizzato.").send()

    # 2. RAG context retrieval.
    context = ""
    if rag_enabled:
        context = await search_qdrant(message.content, rag_collection)

    system_prompt = "Sei un assistente esperto."
    if context:
        system_prompt += f"\n\nCONTESTO:\n{context}"

    # 3. GENERATION: stream tokens from the remote Ollama server.
    client = ollama.AsyncClient(host=OLLAMA_URL)
    msg = cl.Message(content="")
    await msg.send()
    full_resp = ""
    try:
        stream = await client.chat(
            model=model,
            messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": message.content}],
            options={"temperature": temperature},
            stream=True
        )
        async for chunk in stream:
            token = chunk['message']['content']
            full_resp += token
            await msg.stream_token(token)
    except Exception as e:
        # FIX: an unreachable Ollama server previously raised out of the
        # handler; report the error in-chat like the earlier revision did.
        await msg.stream_token(f"Errore connessione AI: {e}")
    await msg.update()

    # 4. CODE SAVING: extract fenced code blocks and store them as files.
    if show_code:
        # FIX: previous pattern was six literal backticks with no capture
        # group, so it could never extract fenced code. Match ``` blocks
        # with an optional language tag and capture their body.
        blocks = re.findall(r"```(?:[\w+-]*)\n?(.*?)```", full_resp, re.DOTALL)
        elements = []
        for code in blocks:
            path = save_code_to_file(code.strip(), workspace)
            elements.append(cl.File(name=os.path.basename(path), path=path, display="inline"))
        if elements:
            await cl.Message(content="💾 Codice salvato", elements=elements).send()

View File

@ -26,8 +26,4 @@ aiofiles>=23.0.0
sniffio sniffio
aiohttp aiohttp
boto3>=1.28.0 boto3>=1.28.0
azure-storage-file-datalake>=12.14.0 azure-storage-file-datalake>=12.14.0
# NUOVI PER EXCEL
pandas
openpyxl
tabulate