Spaces:
Runtime error
Runtime error
| import io | |
| from flask import Flask, Response, send_from_directory, jsonify, request, abort | |
| import os | |
| from flask_cors import CORS | |
| from multiprocessing import Queue | |
| import base64 | |
| from typing import Any, List, Dict, Tuple | |
| from multiprocessing import Queue | |
| import logging | |
| import sys | |
| from server.AudioTranscriber import AudioTranscriber | |
| from server.ActionProcessor import ActionProcessor | |
| from server.StandaloneApplication import StandaloneApplication | |
# Configure logging: records at INFO and above go to stdout in one shared format.
_stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    handlers=[_stdout_handler],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used throughout this file
logger = logging.getLogger(__name__)
# Directory holding the static files served by the app. The developer checkout
# is used only when the DEBUG environment variable is exactly "true"; any other
# value (or no value) selects the /app path.
if os.getenv("DEBUG") == "true":
    STATIC_DIR = "/home/gab/work/gogogo/html"
else:
    STATIC_DIR = "/app/server/static"
# multiprocessing queues shared by the request handlers and the background
# workers started in the __main__ block:
#   audio_queue  - decoded audio chunks put by process_data()
#   text_queue   - handed to both AudioTranscriber and ActionProcessor
#   action_queue - drained by get_actions()
audio_queue: "Queue[io.BytesIO]" = Queue()
text_queue: "Queue[str]" = Queue()
action_queue: "Queue[str]" = Queue()

# Flask application; its static folder is the directory chosen in STATIC_DIR.
app = Flask(__name__, static_folder=STATIC_DIR)

# CORS: any origin may call the app with the listed methods and headers.
_ = CORS(
    app,
    allow_headers=["Content-Type", "Authorization"],
    methods=["GET", "POST", "OPTIONS"],
    origins=["*"],
)
def add_header(response: Response):
    """Attach permissive CORS and cross-origin isolation headers to ``response``.

    Each header below is assigned unconditionally, replacing any value that
    was already set under the same name, and the same response object is
    returned.

    NOTE(review): no registration decorator (e.g. ``after_request``) is visible
    in this view -- confirm how this hook is attached to the app.
    """
    header_values = {
        # Permissive CORS headers
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
        "Access-Control-Allow-Headers": "*",  # Allow all headers
        # Cross-origin isolation headers
        "Cross-Origin-Embedder-Policy": "require-corp",
        "Cross-Origin-Opener-Policy": "same-origin",
        "Cross-Origin-Resource-Policy": "cross-origin",
    }
    for header_name, header_value in header_values.items():
        response.headers[header_name] = header_value
    return response
def serve_index():
    """Serve ``index.html`` from the static folder, or a diagnostics payload.

    When the request carries the query parameter ``logs=container`` the
    handler returns a JSON description of the static folder (path, whether it
    exists, its file listing), the current working directory and the ``USER``
    environment variable instead of the page.

    Otherwise ``index.html`` is sent with the two cross-origin isolation
    headers set on the response.

    NOTE(review): no route decorator is visible in this view -- confirm the
    URL rule this handler is bound to.
    """
    static_root = app.static_folder

    # Handle logs=container query parameter.
    # NOTE(review): this branch returns directory contents, cwd and user name
    # to whoever sends the parameter -- confirm that is intended.
    if request.args.get("logs") == "container":
        if os.path.exists(static_root):
            files = os.listdir(static_root)
        else:
            files = []
        diagnostics = {
            "static_folder": static_root,
            "exists": os.path.exists(static_root),
            "files": files,
            "pwd": os.getcwd(),
            "user": os.getenv("USER"),
        }
        return jsonify(diagnostics)

    try:
        response = send_from_directory(static_root, "index.html")
    except FileNotFoundError:
        # NOTE(review): confirm send_from_directory raises FileNotFoundError in
        # the installed Flask version; otherwise this handler is never reached.
        abort(
            404,
            description=f"Static folder or index.html not found. Static folder: {static_root}",
        )
    else:
        response.headers["Cross-Origin-Opener-Policy"] = "same-origin"
        response.headers["Cross-Origin-Embedder-Policy"] = "require-corp"
        return response
def get_data():
    """Return the fixed JSON payload ``{"status": "success"}``."""
    payload = {"status": "success"}
    return jsonify(payload)
def process_data():
    """Accept one base64-encoded audio chunk and enqueue it for processing.

    The chunk is read according to the request's ``Content-Type``:

    * ``application/json``    -- the ``audio_chunk`` key of the JSON object
    * ``multipart/form-data`` -- the ``audio_chunk`` form field
    * anything else           -- the raw request body, decoded as UTF-8

    Returns:
        A JSON response. ``{"status": "success"}`` once the decoded bytes have
        been put on ``audio_queue`` (wrapped in ``io.BytesIO``); a 400 with an
        ``error`` message when the chunk is missing or cannot be decoded; a
        500 with an ``error`` message for any unexpected failure.
    """
    try:
        content_type = request.headers.get("Content-Type", "")

        # Handle different content types
        if "application/json" in content_type:
            # silent=True yields None for a malformed body instead of raising,
            # so bad JSON is answered with the 400 below rather than being
            # caught by the outer handler and reported as a 500.
            data = request.get_json(silent=True)
            # A JSON array/scalar has no .get(); treat it as "chunk missing".
            audio_base64 = data.get("audio_chunk") if isinstance(data, dict) else None
        elif "multipart/form-data" in content_type:
            audio_base64 = request.form.get("audio_chunk")
        else:
            # Try to get raw data; a body that is not valid UTF-8 is a client
            # error, not a server error.
            try:
                audio_base64 = request.get_data().decode("utf-8")
            except UnicodeDecodeError as e:
                return (
                    jsonify(
                        {
                            "error": f"Failed to decode audio chunk: {str(e)}",
                            "status": "error",
                        }
                    ),
                    400,
                )

        # Validate the incoming data
        if not audio_base64:
            return (
                jsonify({"error": "Missing audio_chunk in request", "status": "error"}),
                400,
            )

        # Decode the base64 audio chunk. ValueError covers malformed base64
        # (binascii.Error is a ValueError subclass); TypeError covers a value
        # that is not a string or bytes-like object (e.g. a JSON number).
        try:
            audio_chunk = base64.b64decode(audio_base64)
        except (ValueError, TypeError) as e:
            return (
                jsonify(
                    {
                        "error": f"Failed to decode audio chunk: {str(e)}",
                        "status": "error",
                    }
                ),
                400,
            )

        # Put the audio chunk in the queue for processing
        audio_queue.put(io.BytesIO(audio_chunk))
        return jsonify({"status": "success"})
    except Exception as e:
        # Top-level boundary: record the traceback, then report a 500.
        logger.exception("Failed to process request")
        return (
            jsonify(
                {"error": f"Failed to process request: {str(e)}", "status": "error"}
            ),
            500,
        )
def get_actions() -> Tuple[Response, int]:
    """Retrieve and clear all pending actions from the queue.

    Items are taken from ``action_queue`` without blocking until the queue
    reports that nothing more is available.

    Returns:
        A ``(response, 200)`` pair whose JSON body is
        ``{"actions": [...], "status": "success"}``; ``actions`` is empty when
        nothing was pending.
    """
    from queue import Empty  # raised by get_nowait() when no item is available

    # The queue is declared as Queue[str] elsewhere in this file; Any is used
    # here because the item type is not verifiable from this function.
    actions: List[Any] = []

    # Drain with get_nowait() alone: Queue.empty() is documented as unreliable
    # for multiprocessing queues, so it is not used to gate the loop.
    while True:
        try:
            actions.append(action_queue.get_nowait())
        except Empty:
            break
        except Exception:
            # Keep the original best-effort behaviour (return what we have),
            # but leave a trace instead of swallowing the error silently.
            logger.exception("Failed to read from action queue")
            break

    return jsonify({"actions": actions, "status": "success"}), 200
def serve_static(path: str):
    """Send the file at ``path`` from the app's static folder.

    Responds with a 404 whose description names ``path`` when
    ``send_from_directory`` raises ``FileNotFoundError``.

    NOTE(review): no route decorator is visible in this view -- confirm the
    URL rule this handler is bound to.
    """
    try:
        static_file = send_from_directory(app.static_folder, path)
    except FileNotFoundError:
        abort(404, description=f"File {path} not found in static folder")
    else:
        return static_file
if __name__ == "__main__":
    # Log what the static folder currently holds, then make sure it exists.
    if os.path.exists(app.static_folder):
        logger.info("Static folder contents: %s", os.listdir(app.static_folder))
    os.makedirs(app.static_folder, exist_ok=True)

    # Validate required configuration BEFORE starting any background worker,
    # so a missing key aborts startup without leaving the transcriber running.
    MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")
    if not MISTRAL_API_KEY:
        raise ValueError("MISTRAL_API_KEY is not set")

    # Start the audio transcriber worker (audio_queue -> text_queue)
    transcriber = AudioTranscriber(audio_queue, text_queue)
    transcriber.start()

    # Start the action processor worker (text_queue -> action_queue)
    action_processor = ActionProcessor(text_queue, action_queue, MISTRAL_API_KEY)
    action_processor.start()

    # Server options handed to StandaloneApplication together with the app.
    options: Any = {
        "bind": "0.0.0.0:7860",
        "workers": 3,
        "worker_class": "sync",
        "timeout": 120,
        "forwarded_allow_ips": "*",
        "accesslog": None,  # Disable access logging
        "errorlog": "-",  # Keep error logging to stderr
        "capture_output": True,
        "enable_stdio_inheritance": True,
    }
    StandaloneApplication(app, options).run()