import os import chainlit as cl import re from datetime import datetime import shutil import uuid from qdrant_client import QdrantClient from qdrant_client.http.models import PointStruct # Define user roles mapping USER_ROLES = { 'moglie@esempio.com': 'business', 'ingegnere@esempio.com': 'engineering', 'architetto@esempio.com': 'architecture', 'admin@esempio.com': 'admin' } # Define the path for workspaces WORKSPACES_DIR = "./workspaces" def create_workspace(user_role): workspace_path = os.path.join(WORKSPACES_DIR, user_role) if not os.path.exists(workspace_path): os.makedirs(workspace_path) def save_code_to_file(code, user_role): timestamp = datetime.now().strftime("%Y%m%d%H%M%S") file_name = f"code_{timestamp}.py" file_path = os.path.join(WORKSPACES_DIR, user_role, file_name) with open(file_path, "w") as file: file.write(code) return file_path def limit_history(history): if len(history) > 20: history = history[-20:] return history async def connect_to_qdrant(): client = QdrantClient("http://qdrant:6333") collection_name = "documents" try: client.get_collection(collection_name) except Exception as e: client.create_collection( collection_name=collection_name, vectors_config={"size": 768, "distance": "Cosine"} ) return client async def get_embeddings(text): import ollama client = ollama.Client(os.getenv('OLLAMA_API_BASE', 'http://192.168.1.243:11434')) response = client.embed(model='nomic-embed-text', input=text) return response['embedding'] @cl.on_chat_start async def chat_start(): # Set the user's email to a hardcoded value for testing purposes user_email = "admin@esempio.com" # Determine the user's role based on the email user_role = USER_ROLES.get(user_email, 'guest') # Create workspace directory if it doesn't exist create_workspace(user_role) # Initialize history in the session cl.user_session.set("history", []) # Set the user's role in the session cl.user_session.set("role", user_role) # Send a welcome message based on the user's role if user_role == 'admin': await cl.Message(content="Welcome, Admin!").send() elif user_role == 'engineering': await cl.Message(content="Welcome, Engineer!").send() elif user_role == 'business': await cl.Message(content="Welcome, Business User!").send() elif user_role == 'architecture': await cl.Message(content="Welcome, Architect!").send() else: await cl.Message(content="Welcome, Guest!").send() @cl.on_message async def message(message): # Retrieve the user's role from the session user_role = cl.user_session.get("role", 'guest') if not user_role: await cl.Message(content="User role not found").send() return # Initialize the Ollama client using the environment variable ollama_api_base = os.getenv('OLLAMA_API_BASE', 'http://192.168.1.243:11434') try: import ollama client = ollama.Client(ollama_api_base) # Retrieve the history from the session and limit it history = cl.user_session.get("history", []) history = limit_history(history) # Append the new user message to the history history.append({"role": "user", "content": message.content}) # Check if there are any elements in the message if message.elements: uploaded_files = [] for element in message.elements: try: # Save the content of the file temp_file_path = element.path destination_file_path = os.path.join(WORKSPACES_DIR, user_role, element.name) with open(temp_file_path, 'rb') as src, open(destination_file_path, 'wb') as dst: shutil.copyfileobj(src, dst) uploaded_files.append(element.name) except Exception as e: await cl.Message(content=f"Error saving {element.name}: {e}").send() if uploaded_files: await cl.Message(content=f"Files uploaded and saved: {', '.join(uploaded_files)}").send() # Call the model and get the response response = client.chat(model='qwen2.5-coder:7b', messages=history) # Extract code blocks from the AI response code_blocks = re.findall(r"```python(.*?)```", response['message']['content'], re.DOTALL) elements = [] if code_blocks: for code in code_blocks: file_path = save_code_to_file(code, user_role) elements.append(cl.File(name=os.path.basename(file_path), path=file_path)) # Append the AI response to the history history.append({"role": "assistant", "content": response['message']['content']}) # Save the updated history in the session cl.user_session.set("history", history) # Handle text files and index them in Qdrant for element in message.elements: if element.name.endswith('.txt'): with open(element.path, 'r') as f: content = f.read() embeddings = await get_embeddings(content) qdrant_client = await connect_to_qdrant() point_id = uuid.uuid4() point = PointStruct(id=point_id, vector=embeddings, payload={"file_name": element.name}) qdrant_client.upsert(collection_name="documents", points=[point]) await cl.Message(content=f"Documento indicizzato con successo su Qdrant.").send() # Send the final message including both text and files await cl.Message(content=response['message']['content'], elements=elements).send() except Exception as e: await cl.Message(content=f"Error: {e}").send()