import os
import time
import json
import inspect
import threading
from typing import Dict, List, Optional, Tuple

import gradio as gr
import requests
import pandas as pd
from smolagents import DuckDuckGoSearchTool
from huggingface_hub import InferenceClient
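# Note (assumption, not stated in the original file): InferenceClient is expected to
# pick up a Hugging Face token from the environment (e.g. HF_TOKEN) so the hosted
# models selected below can be called from this Space.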
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# --- Global Cache for Answers ---
cached_answers = {}
cached_questions = []
processing_status = {"is_processing": False, "progress": 0, "total": 0}


# --- Intelligent Agent with Conditional Search ---
class IntelligentAgent:
    def __init__(self, debug: bool = False, model_name: str = "meta-llama/Llama-3.1-8B-Instruct"):
        self.search = DuckDuckGoSearchTool()
        self.client = InferenceClient(model=model_name)
        self.debug = debug
        if self.debug:
            print(f"IntelligentAgent initialized with model: {model_name}")
    def _should_search(self, question: str) -> bool:
        """
        Use the LLM to decide whether web search is needed for the question.
        Returns True if search is recommended, False otherwise.
        """
        decision_prompt = f"""Analyze this question and decide if it requires real-time information, recent data, or specific facts that might not be in your training data.

SEARCH IS NEEDED for:
- Current events, news, recent developments
- Real-time data (weather, stock prices, sports scores)
- Specific factual information that changes frequently
- Recent product releases, company information
- Current status of people, organizations, or projects
- Location-specific current information

SEARCH IS NOT NEEDED for:
- General knowledge questions
- Mathematical calculations
- Programming concepts and syntax
- Historical facts (older than 1 year)
- Definitions of well-established concepts
- How-to instructions for common tasks
- Creative writing or opinion-based responses

Question: "{question}"

Respond with only "SEARCH" or "NO_SEARCH" followed by a brief reason (max 20 words).

Example responses:
- "SEARCH - Current weather data needed"
- "NO_SEARCH - Mathematical concept, general knowledge sufficient"
"""
        try:
            response = self.client.text_generation(
                decision_prompt,
                max_new_tokens=50,
                temperature=0.1,
                do_sample=False
            )
            decision = response.strip().upper()
            should_search = decision.startswith("SEARCH")
            time.sleep(5)  # throttle between LLM calls
            if self.debug:
                print(f"Decision for '{question}': {decision}")
            return should_search
        except Exception as e:
            if self.debug:
                print(f"Error in search decision: {e}, defaulting to search")
            # Default to searching if the decision call fails
            return True
    def _answer_with_llm(self, question: str) -> str:
        """
        Generate an answer with the LLM alone, without search.
        """
        answer_prompt = f"""You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER]. YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. If you are asked for a number, don't use commas to write your number, and don't use units such as $ or percent signs unless specified otherwise. If you are asked for a string, don't use articles or abbreviations (e.g. for cities), and write digits in plain text unless specified otherwise. If you are asked for a comma separated list, apply the above rules depending on whether each element of the list is a number or a string.

Question: {question}

Answer:"""
        try:
            response = self.client.text_generation(
                answer_prompt,
                max_new_tokens=500,
                temperature=0.3,
                do_sample=True
            )
            return response.strip()
        except Exception as e:
            return f"Sorry, I encountered an error generating the response: {e}"
    def _answer_with_search(self, question: str) -> str:
        """
        Generate an answer using web search results plus the LLM.
        """
        try:
            # Perform the search (pause first to avoid hitting search rate limits)
            time.sleep(10)
            search_results = self.search(question)
            if self.debug:
                print(f"Search results type: {type(search_results)}")
                # print(f"Search results: {search_results}")
            if not search_results:
                return ("No search results found. Let me try to answer based on my knowledge:\n\n"
                        + self._answer_with_llm(question))

            # Format search results - handle the different shapes the tool may return
            if self.debug:
                print(f"First result type: {type(search_results[0]) if search_results else 'None'}")
                print(f"First result: {search_results[0] if search_results else 'None'}")

            if isinstance(search_results, str):
                # If search_results is a single string, use it directly
                search_context = search_results
            else:
                # Handle a list of results
                formatted_results = []
                for i, result in enumerate(search_results[:3]):  # Use the top 3 results
                    if isinstance(result, dict):
                        title = result.get("title", "No title")
                        snippet = result.get("snippet", "").strip()
                        link = result.get("link", "")
                        formatted_results.append(f"Title: {title}\nContent: {snippet}\nSource: {link}")
                    elif isinstance(result, str):
                        # If the result is a plain string, use it directly
                        formatted_results.append(result)
                    else:
                        # Handle any other format
                        formatted_results.append(str(result))
                search_context = "\n\n".join(formatted_results)

            # Generate the answer using the search context
            answer_prompt = f"""You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER]. YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. If you are asked for a number, don't use commas to write your number, and don't use units such as $ or percent signs unless specified otherwise. If you are asked for a string, don't use articles or abbreviations (e.g. for cities), and write digits in plain text unless specified otherwise. If you are asked for a comma separated list, apply the above rules depending on whether each element of the list is a number or a string.

Question: {question}

Search Results:
{search_context}

Based on the search results above, provide an answer to the question. If the search results don't fully answer the question, you can supplement with your general knowledge.

Answer:"""
            try:
                response = self.client.text_generation(
                    answer_prompt,
                    max_new_tokens=600,
                    temperature=0.3,
                    do_sample=True
                )
                return response.strip()
            except Exception as e:
                if self.debug:
                    print(f"LLM generation error: {e}")
                # Fall back to a simple formatting of the raw search results
                if search_results:
                    if isinstance(search_results, str):
                        return search_results
                    elif isinstance(search_results, list) and len(search_results) > 0:
                        first_result = search_results[0]
                        if isinstance(first_result, dict):
                            title = first_result.get("title", "Search Result")
                            snippet = first_result.get("snippet", "").strip()
                            link = first_result.get("link", "")
                            return f"**{title}**\n\n{snippet}\n\n{f'Source: {link}' if link else ''}"
                        else:
                            return str(first_result)
                    else:
                        return str(search_results)
                else:
                    return "Search completed but no usable results found."
        except Exception as e:
            return (f"Search failed: {e}. Let me try to answer based on my knowledge:\n\n"
                    + self._answer_with_llm(question))
    def __call__(self, question: str) -> str:
        """
        Main entry point - decide whether to search, then generate the appropriate response.
        """
        if self.debug:
            print(f"Agent received question: {question}")

        # Early validation
        if not question or not question.strip():
            return "Please provide a valid question."

        try:
            # Decide whether to search
            if self._should_search(question):
                if self.debug:
                    print("Using search-based approach")
                answer = self._answer_with_search(question)
            else:
                if self.debug:
                    print("Using LLM-only approach")
                answer = self._answer_with_llm(question)
        except Exception as e:
            answer = f"Sorry, I encountered an error: {e}"

        if self.debug:
            print(f"Agent returning answer: {answer[:100]}...")
        return answer
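
# Minimal standalone usage sketch (illustrative only, not executed by the Space).
# It assumes a valid HF token is available in the environment for the Inference API:
#
#     agent = IntelligentAgent(debug=True)
#     print(agent("Who wrote 'Pride and Prejudice'?"))      # likely NO_SEARCH path
#     print(agent("What is the weather in Paris today?"))   # likely SEARCH path
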
def fetch_questions() -> Tuple[str, Optional[pd.DataFrame]]:
    """
    Fetch questions from the API and cache them.
    """
    global cached_questions
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            return "Fetched questions list is empty.", None
        cached_questions = questions_data

        # Create a DataFrame for display
        display_data = []
        for item in questions_data:
            display_data.append({
                "Task ID": item.get("task_id", "Unknown"),
                "Question": item.get("question", "")
            })
        df = pd.DataFrame(display_data)
        status_msg = f"Successfully fetched {len(questions_data)} questions. Ready to generate answers."
        return status_msg, df
    except requests.exceptions.RequestException as e:
        return f"Error fetching questions: {e}", None
    except Exception as e:
        return f"An unexpected error occurred: {e}", None
| def generate_answers_async(model_name: str = "meta-llama/Llama-3.1-8B-Instruct", progress_callback=None): | |
| """ | |
| Generate answers for all cached questions asynchronously using the intelligent agent. | |
| """ | |
| global cached_answers, processing_status | |
| if not cached_questions: | |
| return "No questions available. Please fetch questions first." | |
| processing_status["is_processing"] = True | |
| processing_status["progress"] = 0 | |
| processing_status["total"] = len(cached_questions) | |
| try: | |
| agent = IntelligentAgent(debug=True, model_name=model_name) | |
| cached_answers = {} | |
| for i, item in enumerate(cached_questions): | |
| if not processing_status["is_processing"]: # Check if cancelled | |
| break | |
| task_id = item.get("task_id") | |
| question_text = item.get("question") | |
| if not task_id or question_text is None: | |
| continue | |
| try: | |
| answer = agent(question_text) | |
| cached_answers[task_id] = { | |
| "question": question_text, | |
| "answer": answer | |
| } | |
| except Exception as e: | |
| cached_answers[task_id] = { | |
| "question": question_text, | |
| "answer": f"AGENT ERROR: {e}" | |
| } | |
| processing_status["progress"] = i + 1 | |
| if progress_callback: | |
| progress_callback(i + 1, len(cached_questions)) | |
| except Exception as e: | |
| print(f"Error in generate_answers_async: {e}") | |
| finally: | |
| processing_status["is_processing"] = False | |
def start_answer_generation(model_choice: str):
    """
    Start the answer generation process in a separate thread.
    """
    if processing_status["is_processing"]:
        return "Answer generation is already in progress.", None
    if not cached_questions:
        return "No questions available. Please fetch questions first.", None

    # Map the UI choice to the actual model name
    model_map = {
        "Llama 3.1 8B": "meta-llama/Llama-3.1-8B-Instruct",
        "Llama 3.1 70B": "meta-llama/Llama-3.1-70B-Instruct",
        "Mistral 7B": "mistralai/Mistral-7B-Instruct-v0.3",
        "CodeLlama 7B": "codellama/CodeLlama-7b-Instruct-hf"
    }
    selected_model = model_map.get(model_choice, "meta-llama/Llama-3.1-8B-Instruct")

    # Start generation in a background thread
    thread = threading.Thread(target=generate_answers_async, args=(selected_model,))
    thread.daemon = True
    thread.start()

    return f"Answer generation started using {model_choice}. Check progress below.", None
def get_generation_progress():
    """
    Get the current progress of answer generation.
    """
    if not processing_status["is_processing"] and processing_status["progress"] == 0:
        return "Not started", None

    if processing_status["is_processing"]:
        progress = processing_status["progress"]
        total = processing_status["total"]
        status_msg = f"Generating answers... {progress}/{total} completed"
        return status_msg, None
    else:
        # Generation completed
        if cached_answers:
            # Create a DataFrame with the results
            display_data = []
            for task_id, data in cached_answers.items():
                display_data.append({
                    "Task ID": task_id,
                    "Question": (data["question"][:100] + "...") if len(data["question"]) > 100 else data["question"],
                    "Generated Answer": (data["answer"][:200] + "...") if len(data["answer"]) > 200 else data["answer"]
                })
            df = pd.DataFrame(display_data)
            status_msg = f"Answer generation completed! {len(cached_answers)} answers ready for submission."
            return status_msg, df
        else:
            return "Answer generation completed but no answers were generated.", None
def submit_cached_answers(profile: gr.OAuthProfile | None):
    """
    Submit the cached answers to the evaluation API.
    """
    global cached_answers
    if not profile:
        return "Please log in to Hugging Face first.", None
    if not cached_answers:
        return "No cached answers available. Please generate answers first.", None

    username = profile.username
    space_id = os.getenv("SPACE_ID")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main" if space_id else "Unknown"

    # Prepare the submission payload
    answers_payload = []
    for task_id, data in cached_answers.items():
        answers_payload.append({
            "task_id": task_id,
            "submitted_answer": data["answer"]
        })
    submission_data = {
        "username": username.strip(),
        "agent_code": agent_code,
        "answers": answers_payload
    }

    # Submit to the API
    api_url = DEFAULT_API_URL
    submit_url = f"{api_url}/submit"
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        # Create the results DataFrame
        results_log = []
        for task_id, data in cached_answers.items():
            results_log.append({
                "Task ID": task_id,
                "Question": data["question"],
                "Submitted Answer": data["answer"]
            })
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except Exception:
            error_detail += f" Response: {e.response.text[:500]}"
        return f"Submission Failed: {error_detail}", None
    except requests.exceptions.Timeout:
        return "Submission Failed: The request timed out.", None
    except Exception as e:
        return f"Submission Failed: {e}", None
def clear_cache():
    """
    Clear all cached data.
    """
    global cached_answers, cached_questions, processing_status
    cached_answers = {}
    cached_questions = []
    processing_status = {"is_processing": False, "progress": 0, "total": 0}
    return "Cache cleared successfully.", None

# --- Enhanced Gradio Interface ---
with gr.Blocks(title="Intelligent Agent with Conditional Search") as demo:
    gr.Markdown("# Intelligent Agent with Conditional Search")
    gr.Markdown("This agent uses an LLM to decide when search is needed, optimizing for both accuracy and efficiency.")

    with gr.Row():
        gr.LoginButton()
        clear_btn = gr.Button("Clear Cache", variant="secondary")

    with gr.Tab("Step 1: Fetch Questions"):
        gr.Markdown("### Fetch Questions from API")
        fetch_btn = gr.Button("Fetch Questions", variant="primary")
        fetch_status = gr.Textbox(label="Fetch Status", lines=2, interactive=False)
        questions_table = gr.DataFrame(label="Available Questions", wrap=True)

        fetch_btn.click(
            fn=fetch_questions,
            outputs=[fetch_status, questions_table]
        )

    with gr.Tab("Step 2: Generate Answers"):
        gr.Markdown("### Generate Answers with Intelligent Search Decision")
        with gr.Row():
            model_choice = gr.Dropdown(
                choices=["Llama 3.1 8B", "Llama 3.1 70B", "Mistral 7B", "CodeLlama 7B"],
                value="Llama 3.1 8B",
                label="Select Model"
            )
        generate_btn = gr.Button("Start Answer Generation", variant="primary")
        refresh_btn = gr.Button("Refresh Progress", variant="secondary")
        generation_status = gr.Textbox(label="Generation Status", lines=2, interactive=False)
        answers_preview = gr.DataFrame(label="Generated Answers Preview", wrap=True)

        generate_btn.click(
            fn=start_answer_generation,
            inputs=[model_choice],
            outputs=[generation_status, answers_preview]
        )
        refresh_btn.click(
            fn=get_generation_progress,
            outputs=[generation_status, answers_preview]
        )

    with gr.Tab("Step 3: Submit Results"):
        gr.Markdown("### Submit Generated Answers")
        submit_btn = gr.Button("Submit Cached Answers", variant="primary")
        submission_status = gr.Textbox(label="Submission Status", lines=5, interactive=False)
        final_results = gr.DataFrame(label="Final Submission Results", wrap=True)

        submit_btn.click(
            fn=submit_cached_answers,
            outputs=[submission_status, final_results]
        )
    # Clear cache functionality
    clear_btn.click(
        fn=clear_cache,
        outputs=[fetch_status, questions_table]
    )

    # Load the current progress once when the page loads
    # (use the "Refresh Progress" button for subsequent updates)
    demo.load(
        fn=get_generation_progress,
        outputs=[generation_status, answers_preview]
    )
| if __name__ == "__main__": | |
| print("\n" + "-"*30 + " Intelligent Agent Starting " + "-"*30) | |
| space_host_startup = os.getenv("SPACE_HOST") | |
| space_id_startup = os.getenv("SPACE_ID") | |
| if space_host_startup: | |
| print(f"✅ SPACE_HOST found: {space_host_startup}") | |
| print(f" Runtime URL should be: https://{space_host_startup}.hf.space") | |
| else: | |
| print("ℹ️ SPACE_HOST environment variable not found (running locally?).") | |
| if space_id_startup: | |
| print(f"✅ SPACE_ID found: {space_id_startup}") | |
| print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") | |
| print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") | |
| else: | |
| print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") | |
| print("-"*(60 + len(" Intelligent Agent Starting ")) + "\n") | |
| print("Launching Intelligent Agent Interface...") | |
| demo.launch(debug=True, share=False) |