import os
import gradio as gr
import requests
import time
import pandas as pd
import threading
from typing import Dict, List, Optional, Tuple, Union
import json
from huggingface_hub import InferenceClient
import base64
from PIL import Image
import io
import tempfile
import urllib.parse
from pathlib import Path
import re
from bs4 import BeautifulSoup
import mimetypes

# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# --- Global Cache for Answers ---
cached_answers = {}
cached_questions = []
processing_status = {"is_processing": False, "progress": 0, "total": 0}
# Simple search tool used instead of smolagents' DuckDuckGoSearchTool:
class SimpleSearchTool:
    """
    Simple search tool that scrapes DuckDuckGo HTML results.
    Drop-in replacement for DuckDuckGoSearchTool.
    """
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def run(self, query: str) -> str:
        """Search and return formatted results."""
        try:
            # Encode query for URL
            encoded_query = urllib.parse.quote_plus(query)
            url = f"https://html.duckduckgo.com/html/?q={encoded_query}"
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            results = []
            # Find search result containers
            result_containers = soup.find_all('div', class_='result__body')
            for i, container in enumerate(result_containers[:5], 1):
                try:
                    # Extract title and URL
                    title_elem = container.find('a', class_='result__a')
                    if not title_elem:
                        continue
                    title = title_elem.get_text().strip()
                    url = title_elem.get('href', '')
                    # Extract snippet
                    snippet_elem = container.find('a', class_='result__snippet')
                    snippet = snippet_elem.get_text().strip() if snippet_elem else ''
                    if title and url:
                        result = f"{i}. {title}\n URL: {url}\n"
                        if snippet:
                            result += f" Snippet: {snippet}\n"
                        results.append(result)
                except Exception:
                    continue
            return "\n".join(results) if results else "No search results found."
        except Exception as e:
            return f"Search failed: {str(e)}"
# --- Excel Processing Tool ---
class ExcelAnalysisTool:
    def __init__(self):
        pass

    def analyze_excel(self, file_path: str) -> str:
        """Extract and format Excel content for LLM context."""
        try:
            # Read all sheets (first 100 rows of each)
            excel_data = pd.read_excel(file_path, sheet_name=None, nrows=100)
            result = []
            result.append(f"EXCEL FILE ANALYSIS: {file_path}")
            for sheet_name, df in excel_data.items():
                result.append(f"\nSHEET: {sheet_name}")
                result.append(f"Size: {df.shape[0]} rows × {df.shape[1]} columns")
                result.append(f"Columns: {list(df.columns)}")
                # Show first few rows
                if not df.empty:
                    result.append("Sample data:")
                    result.append(df.head(3).to_string(index=False))
            return "\n".join(result)
        except Exception as e:
            return f"Excel analysis failed: {e}"
# --- Web Content Fetcher ---
class WebContentFetcher:
    def __init__(self, debug: bool = True):
        self.debug = debug
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    def extract_urls_from_text(self, text: str) -> List[str]:
        """Extract URLs from text using regex."""
        url_pattern = r'https?://[^\s<>"]+'
        urls = re.findall(url_pattern, text)
        return list(set(urls))  # Remove duplicates

    def fetch_url_content(self, url: str) -> Dict[str, str]:
        """
        Fetch content from a URL and extract text, handling different content types.
        Returns a dictionary with 'content', 'title', 'content_type', and 'error' keys.
        """
        try:
            # Clean the URL
            url = url.strip()
            if not url.startswith(('http://', 'https://')):
                url = 'https://' + url
            if self.debug:
                print(f"Fetching URL: {url}")
            response = self.session.get(url, timeout=30, allow_redirects=True)
            response.raise_for_status()
            content_type = response.headers.get('content-type', '').lower()
            result = {
                'url': url,
                'content_type': content_type,
                'title': '',
                'content': '',
                'error': None
            }
            # Handle different content types
            if 'text/html' in content_type:
                # Parse HTML content
                soup = BeautifulSoup(response.content, 'html.parser')
                # Extract title
                title_tag = soup.find('title')
                result['title'] = title_tag.get_text().strip() if title_tag else 'No title'
                # Remove script and style elements
                for script in soup(["script", "style"]):
                    script.decompose()
                # Extract text content
                text_content = soup.get_text()
                # Clean up text
                lines = (line.strip() for line in text_content.splitlines())
                chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
                text_content = ' '.join(chunk for chunk in chunks if chunk)
                # Limit content length
                if len(text_content) > 8000:
                    text_content = text_content[:8000] + "... (truncated)"
                result['content'] = text_content
            elif 'text/plain' in content_type:
                # Handle plain text
                text_content = response.text
                if len(text_content) > 8000:
                    text_content = text_content[:8000] + "... (truncated)"
                result['content'] = text_content
                result['title'] = f"Text document from {url}"
            elif 'application/json' in content_type:
                # Handle JSON content
                try:
                    json_data = response.json()
                    result['content'] = json.dumps(json_data, indent=2)[:8000]
                    result['title'] = f"JSON document from {url}"
                except Exception:
                    result['content'] = response.text[:8000]
                    result['title'] = f"JSON document from {url}"
            elif any(x in content_type for x in ['application/pdf', 'application/msword', 'application/vnd.openxmlformats']):
                # Handle document files
                result['content'] = f"Document file detected ({content_type}). Content extraction for this file type is not implemented."
                result['title'] = f"Document from {url}"
            else:
                # Handle other content types
                if response.text:
                    result['content'] = response.text[:8000]
                    result['title'] = f"Content from {url}"
                else:
                    result['content'] = f"Non-text content detected ({content_type})"
                    result['title'] = f"File from {url}"
            if self.debug:
                print(f"Successfully fetched content from {url}: {len(result['content'])} characters")
            return result
        except requests.exceptions.RequestException as e:
            error_msg = f"Failed to fetch {url}: {str(e)}"
            if self.debug:
                print(error_msg)
            return {
                'url': url,
                'content_type': 'error',
                'title': f"Error fetching {url}",
                'content': '',
                'error': error_msg
            }
        except Exception as e:
            error_msg = f"Unexpected error fetching {url}: {str(e)}"
            if self.debug:
                print(error_msg)
            return {
                'url': url,
                'content_type': 'error',
                'title': f"Error fetching {url}",
                'content': '',
                'error': error_msg
            }

    def fetch_multiple_urls(self, urls: List[str]) -> List[Dict[str, str]]:
        """Fetch content from multiple URLs."""
        results = []
        for url in urls[:5]:  # Limit to 5 URLs to avoid excessive processing
            results.append(self.fetch_url_content(url))
            time.sleep(1)  # Be respectful to servers
        return results
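
# Illustrative usage sketch (example.com stands in for any URL found in a
# question; network access is required):
#   fetcher = WebContentFetcher(debug=False)
#   for page in fetcher.fetch_multiple_urls(["https://example.com"]):
#       print(page['title'], '-', page['content'][:80])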
# --- File Processing Utility ---
def save_attachment_to_file(attachment_data: Union[str, bytes, dict], temp_dir: str, file_name: Optional[str] = None) -> Optional[str]:
    """
    Save attachment data to a temporary file.
    Returns the local file path if successful, None otherwise.
    """
    try:
        # Determine file name and extension
        if not file_name:
            file_name = f"attachment_{int(time.time())}"
        # Handle different data types
        if isinstance(attachment_data, dict):
            # Handle dict with file data
            if 'data' in attachment_data:
                file_data = attachment_data['data']
                file_type = attachment_data.get('type', '').lower()
                original_name = attachment_data.get('name', file_name)
            elif 'content' in attachment_data:
                file_data = attachment_data['content']
                file_type = attachment_data.get('mime_type', '').lower()
                original_name = attachment_data.get('filename', file_name)
            else:
                # Try to use the dict as file data directly
                file_data = str(attachment_data)
                file_type = ''
                original_name = file_name
            # Use original name if available
            if original_name and original_name != file_name:
                file_name = original_name
        elif isinstance(attachment_data, str):
            # Could be base64-encoded data or plain text
            file_data = attachment_data
            file_type = ''
        elif isinstance(attachment_data, bytes):
            # Binary data
            file_data = attachment_data
            file_type = ''
        else:
            print(f"Unknown attachment data type: {type(attachment_data)}")
            return None
        # Ensure file has an extension
        if '.' not in file_name:
            # Try to determine extension from type
            if 'image' in file_type:
                if 'jpeg' in file_type or 'jpg' in file_type:
                    file_name += '.jpg'
                elif 'png' in file_type:
                    file_name += '.png'
                else:
                    file_name += '.img'
            elif 'audio' in file_type:
                if 'mp3' in file_type:
                    file_name += '.mp3'
                elif 'wav' in file_type:
                    file_name += '.wav'
                else:
                    file_name += '.audio'
            elif 'python' in file_type or 'text' in file_type:
                file_name += '.py'
            else:
                file_name += '.file'
        file_path = os.path.join(temp_dir, file_name)
        # Save the file
        if isinstance(file_data, str):
            # Try to decode if it's base64
            try:
                # Crude heuristic: long string with base64-style '=' padding at the end
                if len(file_data) > 100 and '=' in file_data[-5:]:
                    decoded_data = base64.b64decode(file_data)
                    with open(file_path, 'wb') as f:
                        f.write(decoded_data)
                else:
                    # Plain text
                    with open(file_path, 'w', encoding='utf-8') as f:
                        f.write(file_data)
            except Exception:
                # If base64 decoding fails, save as text
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(file_data)
        else:
            # Binary data
            with open(file_path, 'wb') as f:
                f.write(file_data)
        print(f"Saved attachment: {file_path}")
        return file_path
    except Exception as e:
        print(f"Failed to save attachment: {e}")
        return None
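
# Illustrative usage sketch (the payload shape is hypothetical; the function
# accepts dicts, str, or bytes as shown above):
#   tmp = tempfile.mkdtemp()
#   path = save_attachment_to_file({"data": b"hello", "type": "text", "name": "note.txt"}, tmp)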
# --- Code Processing Tool ---
class CodeAnalysisTool:
    def __init__(self, model_name: str = "meta-llama/Llama-3.1-8B-Instruct"):
        self.client = InferenceClient(model=model_name, provider="sambanova")

    def analyze_code(self, code_path: str) -> str:
        """
        Analyze Python code and return insights.
        """
        try:
            with open(code_path, 'r', encoding='utf-8') as f:
                code_content = f.read()
            # Limit code length for analysis
            if len(code_content) > 5000:
                code_content = code_content[:5000] + "\n... (truncated)"
            analysis_prompt = f"""Repeat the code content below without any changes, then provide a brief, focused analysis.
Code:
```python
{code_content}
```"""
            messages = [{"role": "user", "content": analysis_prompt}]
            response = self.client.chat_completion(
                messages=messages,
                max_tokens=500,
                temperature=0.3
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            return f"Code analysis failed: {e}"
# --- Image Processing Tool ---
class ImageAnalysisTool:
    def __init__(self, model_name: str = "microsoft/Florence-2-large"):
        self.client = InferenceClient(model=model_name)

    def analyze_image(self, image_path: str, prompt: str = "Describe this image in detail") -> str:
        """
        Analyze an image and return a description.
        """
        try:
            # Open and read the image
            with open(image_path, "rb") as f:
                image_bytes = f.read()
            # Use the vision model to analyze the image
            response = self.client.image_to_text(
                image=image_bytes,
                model="microsoft/Florence-2-large"
            )
            return response.get("generated_text", "Could not analyze image")
        except Exception as e:
            try:
                # Fallback: re-read the file and try a different vision model
                with open(image_path, "rb") as f:
                    image_bytes = f.read()
                response = self.client.image_to_text(
                    image=image_bytes,
                    model="Salesforce/blip-image-captioning-large"
                )
                return response.get("generated_text", f"Image analysis error: {e}")
            except Exception:
                return f"Image analysis failed: {e}"

    def extract_text_from_image(self, image_path: str) -> str:
        """
        Extract text from an image using OCR.
        """
        try:
            with open(image_path, "rb") as f:
                image_bytes = f.read()
            # Use an OCR model
            response = self.client.image_to_text(
                image=image_bytes,
                model="microsoft/trocr-base-printed"
            )
            return response.get("generated_text", "No text found in image")
        except Exception as e:
            return f"OCR failed: {e}"
# --- Audio Processing Tool ---
class AudioTranscriptionTool:
    def __init__(self, model_name: str = "openai/whisper-large-v3"):
        self.client = InferenceClient(model=model_name)

    def transcribe_audio(self, audio_path: str) -> str:
        """
        Transcribe an audio file to text.
        """
        try:
            with open(audio_path, "rb") as f:
                audio_bytes = f.read()
            # Use Whisper for transcription
            response = self.client.automatic_speech_recognition(
                audio=audio_bytes
            )
            return response.get("text", "Could not transcribe audio")
        except Exception as e:
            try:
                # Fallback: re-read the file and try a different ASR model
                with open(audio_path, "rb") as f:
                    audio_bytes = f.read()
                response = self.client.automatic_speech_recognition(
                    audio=audio_bytes,
                    model="facebook/wav2vec2-large-960h-lv60-self"
                )
                return response.get("text", f"Audio transcription error: {e}")
            except Exception:
                return f"Audio transcription failed: {e}"
# --- Enhanced Intelligent Agent with Direct Attachment Processing ---
class IntelligentAgent:
    def __init__(self, debug: bool = True, model_name: str = "meta-llama/Llama-3.1-8B-Instruct"):
        self.search_tool = SimpleSearchTool()
        self.client = InferenceClient(model=model_name, provider="sambanova")
        self.image_tool = ImageAnalysisTool()
        self.audio_tool = AudioTranscriptionTool()
        self.code_tool = CodeAnalysisTool(model_name)
        self.excel_tool = ExcelAnalysisTool()
        self.web_fetcher = WebContentFetcher(debug)
        self.debug = debug
        if self.debug:
            print(f"IntelligentAgent initialized with model: {model_name}")

    def _chat_completion(self, prompt: str, max_tokens: int = 500, temperature: float = 0.3) -> str:
        """
        Use chat completion instead of text generation to avoid provider compatibility issues.
        """
        try:
            messages = [{"role": "user", "content": prompt}]
            # Try chat completion first
            try:
                response = self.client.chat_completion(
                    messages=messages,
                    max_tokens=max_tokens,
                    temperature=temperature
                )
                return response.choices[0].message.content.strip()
            except Exception as chat_error:
                if self.debug:
                    print(f"Chat completion failed: {chat_error}, trying text generation...")
                # Fallback to text generation
                response = self.client.text_generation(
                    prompt,
                    max_new_tokens=max_tokens,
                    temperature=temperature,
                    do_sample=temperature > 0
                )
                return response.strip()
        except Exception as e:
            if self.debug:
                print(f"Both chat completion and text generation failed: {e}")
            raise e
    def _extract_and_process_urls(self, question_text: str) -> str:
        """
        Extract URLs from question text and fetch their content.
        Returns formatted content from all URLs.
        """
        urls = self.web_fetcher.extract_urls_from_text(question_text)
        if not urls:
            return ""
        if self.debug:
            print(f"...Found {len(urls)} URLs in question: {urls}")
        url_contents = self.web_fetcher.fetch_multiple_urls(urls)
        if not url_contents:
            return ""
        # Format the content
        formatted_content = []
        for content_data in url_contents:
            if content_data['error']:
                formatted_content.append(f"URL: {content_data['url']}\nError: {content_data['error']}")
            else:
                formatted_content.append(
                    f"URL: {content_data['url']}\n"
                    f"Title: {content_data['title']}\n"
                    f"Content Type: {content_data['content_type']}\n"
                    f"Content: {content_data['content']}"
                )
        separator = "=" * 50
        return "\n\n" + separator + "\n" + ("\n" + separator + "\n").join(formatted_content) + "\n" + separator
    def _detect_and_process_direct_attachments(self, file_name: str, local_path: str) -> Tuple[List[str], List[str], List[str], List[str]]:
        """
        Detect and categorize a single attachment directly attached to a question (not as a URL).
        Returns (image_files, audio_files, excel_files, code_files).
        """
        image_files = []
        audio_files = []
        code_files = []
        excel_files = []
        if not file_name:
            return image_files, audio_files, excel_files, code_files
        try:
            # Check that the downloaded file exists
            if not os.path.exists(local_path):
                if self.debug:
                    print(f"File not found: {local_path}")
                return image_files, audio_files, excel_files, code_files
            # Get file extension
            file_ext = Path(file_name).suffix.lower()
            # Determine category
            is_image = file_ext in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.tiff']
            is_audio = file_ext in ['.mp3', '.wav', '.m4a', '.ogg', '.flac', '.aac']
            is_code = file_ext in ['.py', '.txt', '.js', '.html', '.css', '.json', '.xml', '.md', '.c', '.cpp', '.java']
            is_excel = file_ext in ['.xlsx', '.xls']
            # Categorize the file
            if is_image:
                image_files.append(local_path)
            elif is_audio:
                audio_files.append(local_path)
            elif is_code:
                code_files.append(local_path)
            elif is_excel:
                excel_files.append(local_path)
            else:
                # Default to code/text for unknown types
                code_files.append(local_path)
            if self.debug:
                print(f"Processed file: {file_name} -> {'image' if is_image else 'audio' if is_audio else 'excel' if is_excel else 'code'}")
        except Exception as e:
            if self.debug:
                print(f"Error processing attachment {file_name}: {e}")
        if self.debug:
            print(f"Processed attachment: {len(image_files)} images, {len(audio_files)} audio, {len(code_files)} code files, {len(excel_files)} excel files")
        return image_files, audio_files, excel_files, code_files
    def _process_attachments(self, image_files: List[str], audio_files: List[str], excel_files: List[str], code_files: List[str]) -> str:
        """
        Process different types of attachments and return consolidated context.
        """
        attachment_context = ""
        # Process images
        for image_file in image_files:
            if self.debug:
                print(f"Processing image: {image_file}")
            try:
                image_description = self.image_tool.analyze_image(image_file)
                ocr_text = self.image_tool.extract_text_from_image(image_file)
                attachment_context += f"\n\nIMAGE ANALYSIS ({image_file}):\n"
                attachment_context += f"Description: {image_description}\n"
                if ocr_text and "No text found" not in ocr_text and "OCR failed" not in ocr_text:
                    attachment_context += f"Text extracted: {ocr_text}\n"
            except Exception as e:
                if self.debug:
                    print(f"Error processing image {image_file}: {e}")
                attachment_context += f"\n\nIMAGE PROCESSING ERROR ({image_file}): {e}\n"
        # Process audio files
        for audio_file in audio_files:
            if self.debug:
                print(f"Processing audio: {audio_file}")
            try:
                transcription = self.audio_tool.transcribe_audio(audio_file)
                attachment_context += f"\n\nAUDIO TRANSCRIPTION ({audio_file}):\n{transcription}\n"
            except Exception as e:
                if self.debug:
                    print(f"Error processing audio {audio_file}: {e}")
                attachment_context += f"\n\nAUDIO PROCESSING ERROR ({audio_file}): {e}\n"
        # Process code/text files
        for code_file in code_files:
            if self.debug:
                print(f"Processing code/text: {code_file}")
            try:
                code_analysis = self.code_tool.analyze_code(code_file)
                attachment_context += f"\n\nCODE ANALYSIS ({code_file}):\n{code_analysis}\n"
            except Exception as e:
                if self.debug:
                    print(f"Error processing code {code_file}: {e}")
                attachment_context += f"\n\nCODE PROCESSING ERROR ({code_file}): {e}\n"
        # Process Excel files
        for excel_file in excel_files:
            if self.debug:
                print(f"Processing excel: {excel_file}")
            try:
                excel_analysis = self.excel_tool.analyze_excel(excel_file)
                attachment_context += f"\n\nEXCEL ANALYSIS ({excel_file}):\n{excel_analysis}\n"
            except Exception as e:
                if self.debug:
                    print(f"Error processing excel {excel_file}: {e}")
                attachment_context += f"\n\nEXCEL PROCESSING ERROR ({excel_file}): {e}\n"
        return attachment_context
    def _should_search(self, question: str, attachment_context: str, url_context: str) -> bool:
        """
        Decide whether to use search based on the question and available context.
        """
        # If we have rich context from attachments or URLs, we might not need search
        has_rich_context = bool(attachment_context.strip() or url_context.strip())
        # Keywords that typically indicate search is needed
        search_keywords = [
            "latest", "recent", "current", "today", "now", "2024", "2025",
            "news", "update", "breaking", "trending", "happening",
            "who is", "what is", "where is", "when did", "how many",
            "price", "stock", "weather", "forecast"
        ]
        question_lower = question.lower()
        needs_search = any(keyword in question_lower for keyword in search_keywords)
        # Use the LLM to make a more nuanced decision
        try:
            decision_prompt = f"""
Given this question and available context, should I search the web for additional information?
Question: {question}
Available context: {"Yes - context from attachments/URLs" if has_rich_context else "No additional context"}
Context preview: {(attachment_context + url_context)[:500]}...
Answer with just "YES" only if your general knowledge is insufficient to answer the question. Say "NO" if your knowledge is enough or if there is context available.
"""
            decision = self._chat_completion(decision_prompt, max_tokens=10, temperature=0.1)
            should_search = "YES" in decision.upper()
            if self.debug:
                print(f"Search decision: {should_search} (LLM said: {decision})")
            return should_search
        except Exception as e:
            if self.debug:
                print(f"Error in search decision: {e}, falling back to keyword-based decision")
            return needs_search and not has_rich_context
    def _answer_with_search(self, question: str, attachment_context: str, url_context: str) -> str:
        """
        Answer the question using search + LLM.
        """
        try:
            # Perform search
            search_results = self.search_tool.run(question)
            # Combine all contexts
            full_context = f"""Question: {question}

Search Results:
{search_results}
{attachment_context}{url_context}"""
            answer_prompt = f"""/no_think You are a general AI assistant. I will ask you a question.
The question might refer to external information or an attachment (e.g. video or image). Do not say that you cannot process those. Use the context below.
Based on the context sections below, provide an answer to the question.
If the search results don't fully answer the question, you can supplement with your general knowledge.
Your ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings.
If you are asked for a number, don't use commas to write your number, and don't use units such as $ or percent sign unless specified otherwise.
If you are asked for a string, don't use articles or abbreviations (e.g. for cities), and write digits in plain text unless specified otherwise.
Do not include your motivation, reasoning or explanation. Provide only the answer.
{full_context}"""
            return self._chat_completion(answer_prompt, max_tokens=800, temperature=0.3)
        except Exception as e:
            if self.debug:
                print(f"Search-based answer failed: {e}")
            return self._answer_with_llm(question, attachment_context, url_context)
    def _answer_with_llm(self, question: str, attachment_context: str, url_context: str) -> str:
        """
        Answer the question using only the LLM and available context.
        """
        try:
            full_context = f"""Question: {question}
{attachment_context}
{url_context}"""
            answer_prompt = f"""/no_think You are a general AI assistant. I will ask you a question.
The question might refer to external information or an attachment (e.g. video or image). Do not say that you cannot process those. Use the context below.
Based on the context sections below, provide an answer to the question.
If the context doesn't fully answer the question, you can supplement with your general knowledge.
Your ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings.
If you are asked for a number, don't use commas to write your number, and don't use units such as $ or percent sign unless specified otherwise.
If you are asked for a string, don't use articles or abbreviations (e.g. for cities), and write digits in plain text unless specified otherwise.
Do not include your motivation, reasoning or explanation. Provide only the short answer (e.g. one word).
{full_context}"""
            return self._chat_completion(answer_prompt, max_tokens=800, temperature=0.3)
        except Exception as e:
            return f"I apologize, but I encountered an error while processing your question: {e}"
    def process_question_with_attachments(self, question_data: dict) -> str:
        """
        Process a question that may have attachments and URLs.
        """
        question_text = question_data.get('question', '')
        if self.debug:
            print(f"Question data keys: {list(question_data.keys())}")
            print(f"\n1. Processing question with potential attachments and URLs: {question_text[:300]}...")
        try:
            # Detect and process URLs
            if self.debug:
                print("2. Detecting and processing URLs...")
            url_context = self._extract_and_process_urls(question_text)
            if self.debug and url_context:
                print(f"URL context found: {len(url_context)} characters")
        except Exception as e:
            if self.debug:
                print(f"Error extracting URLs: {e}")
            url_context = ""
        attachment_context = ""
        try:
            # Detect and download attachments
            if self.debug:
                print("3. Searching for image, audio or code attachments...")
            attachment_name = question_data.get('file_name', '')
            task_id = question_data.get('task_id', '')
            if self.debug:
                print(f"Attachment name from question_data: '{attachment_name}'")
            if attachment_name:
                if self.debug:
                    print("4. Downloading attachment...")
                download_url = f"{DEFAULT_API_URL}/files/{task_id}"
                response = requests.get(download_url, timeout=30)
                response.raise_for_status()
                temp_dir = tempfile.mkdtemp()
                local_file_path = os.path.join(temp_dir, attachment_name)
                with open(local_file_path, 'wb') as f:
                    f.write(response.content)
                image_files, audio_files, excel_files, code_files = self._detect_and_process_direct_attachments(attachment_name, local_file_path)
                # Process attachments to get context
                attachment_context = self._process_attachments(image_files, audio_files, excel_files, code_files)
                if self.debug and attachment_context:
                    print(f"Attachment context: {attachment_context[:200]}...")
            # Decide whether to search
            if self._should_search(question_text, attachment_context, url_context):
                if self.debug:
                    print("5. Using search-based approach")
                answer = self._answer_with_search(question_text, attachment_context, url_context)
            else:
                if self.debug:
                    print("5. Using LLM-only approach")
                answer = self._answer_with_llm(question_text, attachment_context, url_context)
            if self.debug:
                print(f"LLM answer: {answer}")
            # Note: the downloaded attachment lives in a tempfile.mkdtemp() directory;
            # we leave cleanup to the OS rather than deleting it here.
        except Exception as e:
            if self.debug:
                print(f"Error in attachment processing: {e}")
            answer = f"Sorry, I encountered an error: {e}"
        if self.debug:
            print(f"6. Agent returning answer: {answer[:100]}...")
        return answer
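
# Illustrative sketch of the question payload the agent expects (field names
# match how question_data is read above; values are hypothetical):
#   agent = IntelligentAgent(debug=False)
#   answer = agent.process_question_with_attachments({
#       "task_id": "abc123",
#       "question": "How many rows does the attached spreadsheet have?",
#       "file_name": "data.xlsx",
#   })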
def fetch_questions() -> Tuple[str, Optional[pd.DataFrame]]:
    """
    Fetch questions from the API and cache them.
    """
    global cached_questions
    questions_url = f"{DEFAULT_API_URL}/questions"
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "Fetched questions list is empty.", None
        cached_questions = questions_data
        # Create DataFrame for display
        display_data = []
        for item in questions_data:
            # Check for attachments
            has_file = False
            file_info = ""
            file_name = item.get("file_name", "")
            if file_name and file_name.strip():
                has_file = True
                file_info = file_name + ", "
            # Check if the question contains URLs
            question_text = item.get("question", "")
            if 'http' in question_text:
                has_file = True
                file_info += "URLs in text, "
            file_info = file_info.rstrip(", ")
            display_data.append({
                "Task ID": item.get("task_id", "Unknown"),
                "Question": question_text[:100] + "..." if len(question_text) > 100 else question_text,
                "Has Attachments": "Yes" if has_file else "No",
                "Attachment Info": file_info
            })
        df = pd.DataFrame(display_data)
        attachment_count = sum(1 for item in display_data if item["Has Attachments"] == "Yes")
        status_msg = f"Successfully fetched {len(questions_data)} questions. {attachment_count} questions have attachments. Ready to generate answers."
        return status_msg, df
    except requests.exceptions.RequestException as e:
        return f"Error fetching questions: {e}", None
    except Exception as e:
        return f"An unexpected error occurred: {e}", None
def generate_answers_async(model_name: str = "meta-llama/Llama-3.1-8B-Instruct", progress_callback=None):
    """
    Generate answers for all cached questions asynchronously using the intelligent agent.
    """
    global cached_answers, processing_status
    if not cached_questions:
        return "No questions available. Please fetch questions first."
    processing_status["is_processing"] = True
    processing_status["progress"] = 0
    processing_status["total"] = len(cached_questions)
    try:
        agent = IntelligentAgent(debug=True, model_name=model_name)
        cached_answers = {}
        for i, question_data in enumerate(cached_questions):
            if not processing_status["is_processing"]:  # Check if cancelled
                break
            task_id = question_data.get("task_id")
            question_text = question_data.get("question")
            if not task_id or question_text is None:
                continue
            try:
                # Use the method that handles attachments and URLs
                answer = agent.process_question_with_attachments(question_data)
                cached_answers[task_id] = {
                    "question": question_text,
                    "answer": answer
                }
            except Exception as e:
                cached_answers[task_id] = {
                    "question": question_text,
                    "answer": f"AGENT ERROR: {e}"
                }
            processing_status["progress"] = i + 1
            if progress_callback:
                progress_callback(i + 1, len(cached_questions))
    except Exception as e:
        print(f"Error in generate_answers_async: {e}")
    finally:
        processing_status["is_processing"] = False
def start_answer_generation(model_choice: str):
    """
    Start the answer generation process in a separate thread.
    """
    if processing_status["is_processing"]:
        return "Answer generation is already in progress."
    if not cached_questions:
        return "No questions available. Please fetch questions first."
    # Map model choice to actual model name
    model_map = {
        "Llama 3.1 8B": "meta-llama/Llama-3.1-8B-Instruct",
        "Llama 3.3 70B": "meta-llama/Llama-3.3-70B-Instruct",
        "Llama 3.3 Swallow 70B": "tokyotech-llm/Llama-3.3-Swallow-70B-Instruct-v0.4",
        "Mistral 7B": "mistralai/Mistral-7B-Instruct-v0.3",
        "Qwen 2.5": "Qwen/Qwen2.5-Omni-7B",
        # "Qwen 2.5 instruct": "Qwen/Qwen2.5-14B-Instruct-1M",
        "Qwen 3": "Qwen/Qwen3-32B"
    }
    selected_model = model_map.get(model_choice, "meta-llama/Llama-3.1-8B-Instruct")
    # Start generation in a background thread
    thread = threading.Thread(target=generate_answers_async, args=(selected_model,))
    thread.daemon = True
    thread.start()
    return f"Answer generation started using {model_choice}. Check progress."
def get_generation_progress():
    """
    Get the current progress of answer generation.
    Always returns a (status_message, dataframe_or_None) pair so it can be
    wired to both Gradio outputs.
    """
    if not processing_status["is_processing"] and processing_status["progress"] == 0:
        return "Not started", None
    if processing_status["is_processing"]:
        progress = processing_status["progress"]
        total = processing_status["total"]
        return f"Generating answers... {progress}/{total} completed", None
    # Generation completed
    if cached_answers:
        # Create DataFrame with results
        display_data = []
        for task_id, data in cached_answers.items():
            display_data.append({
                "Task ID": task_id,
                "Question": data["question"][:100] + "..." if len(data["question"]) > 100 else data["question"],
                "Generated Answer": data["answer"][:200] + "..." if len(data["answer"]) > 200 else data["answer"]
            })
        df = pd.DataFrame(display_data)
        return f"Answer generation completed! {len(cached_answers)} answers ready for submission.", df
    return "Answer generation completed but no answers were generated.", None
def submit_cached_answers(profile: gr.OAuthProfile | None):
    """
    Submit the cached answers to the evaluation API.
    """
    global cached_answers
    if not profile:
        return "Please log in to Hugging Face first.", None
    if not cached_answers:
        return "No cached answers available. Please generate answers first.", None
    username = profile.username
    space_id = os.getenv("SPACE_ID")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "Unknown"
    # Prepare submission payload
    answers_payload = []
    for task_id, data in cached_answers.items():
        answers_payload.append({
            "task_id": task_id,
            "submitted_answer": data["answer"]
        })
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload
    }
    # Submit to API
    submit_url = f"{DEFAULT_API_URL}/submit"
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        # Create results DataFrame
        results_log = []
        for task_id, data in cached_answers.items():
            results_log.append({
                "Task ID": task_id,
                "Question": data["question"],
                "Submitted Answer": data["answer"]
            })
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except Exception:
            error_detail += f" Response: {e.response.text[:500]}"
        return f"Submission Failed: {error_detail}", None
    except requests.exceptions.Timeout:
        return "Submission Failed: The request timed out.", None
    except Exception as e:
        return f"Submission Failed: {e}", None
def clear_cache():
    """
    Clear all cached data.
    """
    global cached_answers, cached_questions, processing_status
    cached_answers = {}
    cached_questions = []
    processing_status = {"is_processing": False, "progress": 0, "total": 0}
    return "Cache cleared successfully.", None
# --- Enhanced Gradio Interface ---
with gr.Blocks(title="Intelligent Agent with Media Processing") as demo:
    gr.Markdown("# Intelligent Agent with Conditional Search and Media Processing")
    gr.Markdown("This agent can process images and audio files, and uses an LLM to decide when search is needed, optimizing for both accuracy and efficiency.")
    with gr.Row():
        gr.LoginButton()
        clear_btn = gr.Button("Clear Cache", variant="secondary")
    with gr.Tab("Step 1: Fetch Questions"):
        gr.Markdown("### Fetch Questions from API")
        fetch_btn = gr.Button("Fetch Questions", variant="primary")
        fetch_status = gr.Textbox(label="Fetch Status", lines=2, interactive=False)
        questions_table = gr.DataFrame(label="Available Questions", wrap=True)
        fetch_btn.click(
            fn=fetch_questions,
            outputs=[fetch_status, questions_table]
        )
    with gr.Tab("Step 2: Generate Answers"):
        gr.Markdown("### Generate Answers with Intelligent Search Decision")
        with gr.Row():
            model_choice = gr.Dropdown(
                choices=["Llama 3.1 8B", "Llama 3.3 70B", "Llama 3.3 Swallow 70B", "Mistral 7B", "Qwen 2.5", "Qwen 3"],
                value="Llama 3.1 8B",
                label="Select Model"
            )
            generate_btn = gr.Button("Start Answer Generation", variant="primary")
            refresh_btn = gr.Button("Refresh Progress", variant="secondary")
        generation_status = gr.Textbox(label="Generation Status", lines=2, interactive=False)
        answers_table = gr.DataFrame(label="Generated Answers", wrap=True)
        generate_btn.click(
            fn=start_answer_generation,
            inputs=[model_choice],
            outputs=generation_status
        )
        refresh_btn.click(
            fn=get_generation_progress,
            outputs=[generation_status, answers_table]
        )
    with gr.Tab("Step 3: Submit Results"):
        gr.Markdown("### Submit Generated Answers")
        submit_btn = gr.Button("Submit Answers", variant="primary")
        submit_status = gr.Textbox(label="Submission Status", lines=4, interactive=False)
        results_table = gr.DataFrame(label="Submission Results", wrap=True)
        submit_btn.click(
            fn=submit_cached_answers,
            outputs=[submit_status, results_table]
        )
    # Clear cache functionality
    clear_btn.click(
        fn=clear_cache,
        outputs=[fetch_status, questions_table]
    )
if __name__ == "__main__":
    demo.launch()
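
# To run locally (a sketch of assumptions: the scoring Space above must be
# reachable, and HF_TOKEN must be set so InferenceClient can authenticate):
#   pip install gradio requests pandas huggingface_hub beautifulsoup4 openpyxl pillow
#   python app.py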