from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse, JSONResponse
import outetts
import io
import json
import base64
import struct
import os
import uuid
# Initialize the interface
interface = outetts.Interface(
    config=outetts.ModelConfig.auto_config(
        model=outetts.Models.VERSION_1_0_SIZE_1B,
        # For llama.cpp backend
        #backend=outetts.Backend.LLAMACPP,
        #quantization=outetts.LlamaCppQuantization.FP16
        # For transformers backend
        backend=outetts.Backend.HF,
    )
)
# Load the default speaker profile
speaker = interface.load_default_speaker("EN-FEMALE-1-NEUTRAL")

app = FastAPI()

@app.get("/")
def greet_json():
    return {"Hello": "World!"}

# NOTE: the WebSocket route decorator was missing; the path below is an assumption.
@app.websocket("/ws/tts")
async def websocket_tts(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # Receive a text chunk from the client
            data = await websocket.receive_text()
            # Status: warming up
            await websocket.send_text(json.dumps({"generation_status": "Warming up TTS model"}))
            output = interface.generate(
                config=outetts.GenerationConfig(
                    text=data,
                    generation_type=outetts.GenerationType.CHUNKED,
                    speaker=speaker,
                    sampler_config=outetts.SamplerConfig(
                        temperature=0.4
                    ),
                )
            )
            # Status: generating linguistic features
            await websocket.send_text(json.dumps({"generation_status": "Generating linguistic features"}))
            # Save the generated audio to a temporary WAV file
            temp_path = f"temp_{uuid.uuid4().hex}.wav"
            output.save(temp_path)
            chunk_size = 4096
            try:
                with open(temp_path, "rb") as f:
                    wav_data = f.read()
                # WAV header is typically 44 bytes, but let's detect it robustly:
                # find the end of the header (start of the 'data' chunk)
                if wav_data[:4] != b'RIFF' or wav_data[8:12] != b'WAVE':
                    raise ValueError("Not a valid WAV file")
                # Find 'data' subchunk
                data_offset = wav_data.find(b'data')
                if data_offset == -1:
                    raise ValueError("No 'data' chunk found in WAV file")
                header_end = data_offset + 8  # 'data' tag + 4-byte size field
                wav_header = bytearray(wav_data[:header_end])
                pcm_data = wav_data[header_end:]
                # Patch header: set data length to 0xFFFFFFFF (unknown/streaming)
                wav_header[data_offset + 4:data_offset + 8] = (0xFFFFFFFF).to_bytes(4, 'little')
                # Send header + first PCM chunk
                first_chunk = pcm_data[:chunk_size]
                audio_b64 = base64.b64encode(wav_header + first_chunk).decode("ascii")
                await websocket.send_text(json.dumps({
                    "data": {
                        "audio_bytes": audio_b64,
                        "duration": None,
                        "request_finished": False
                    }
                }))
                # Send rest of PCM data in chunks (without header)
                idx = chunk_size
                while idx < len(pcm_data):
                    chunk = pcm_data[idx:idx + chunk_size]
                    if not chunk:
                        break
                    audio_b64 = base64.b64encode(chunk).decode("ascii")
                    await websocket.send_text(json.dumps({
                        "data": {
                            "audio_bytes": audio_b64,
                            "duration": None,
                            "request_finished": False
                        }
                    }))
                    idx += chunk_size
            finally:
                try:
                    os.remove(temp_path)
                except FileNotFoundError:
                    pass
            # Final event
            await websocket.send_text(json.dumps({
                "data": {
                    "audio_bytes": "",
                    "duration": None,
                    "request_finished": True
                }
            }))
    except WebSocketDisconnect:
        pass
    except Exception as e:
        await websocket.send_text(json.dumps({"error": str(e)}))
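
For reference, here is a minimal client sketch for this endpoint, kept as a separate script. It assumes the /ws/tts route used above and the default Hugging Face Spaces port 7860 (both are assumptions, adjust them to your deployment) and uses the third-party websockets package. It sends one text message, prints status updates, and appends the decoded base64 chunks (patched WAV header first) to a single file until the final request_finished event arrives.

# client_example.py -- hypothetical companion script, not part of app.py
import asyncio
import base64
import json

import websockets  # pip install websockets


async def synthesize(text: str, out_path: str = "out.wav") -> None:
    uri = "ws://localhost:7860/ws/tts"  # host, port, and path are assumptions
    async with websockets.connect(uri) as ws:
        await ws.send(text)
        with open(out_path, "wb") as f:
            while True:
                msg = json.loads(await ws.recv())
                if "error" in msg:
                    raise RuntimeError(msg["error"])
                if "generation_status" in msg:
                    print(msg["generation_status"])  # progress updates
                    continue
                payload = msg["data"]
                if payload["audio_bytes"]:
                    f.write(base64.b64decode(payload["audio_bytes"]))
                if payload["request_finished"]:
                    break


if __name__ == "__main__":
    asyncio.run(synthesize("Hello from OuteTTS!"))

Because the server patches the WAV data-size field to 0xFFFFFFFF for streaming, strict WAV parsers may require the header to be rewritten with the real length once the final chunk has been received.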