Spaces:
Runtime error
Runtime error
| from multiprocessing import Process, connection | |
| from typing import Any, Dict, Optional, List, Deque | |
| from collections import deque | |
| import pyaudio | |
| import io | |
| import wave | |
class AudioRecorder:
    """Record microphone audio in a child process and push WAV data to a pipe.

    Audio is captured in windows of ``RECORD_SECONDS`` seconds. The most
    recent windows (at most ``audio_chunks.maxlen``) are kept in a rolling
    buffer; once the buffer is full, every new window triggers one message on
    ``output_pipe`` containing a single WAV file that covers the whole buffer.
    """

    def __init__(
        self,
        output_pipe: connection.Connection,
        input_device_index: Optional[int] = None,
    ):
        """Store the capture settings; no audio device is opened here.

        Args:
            output_pipe: Connection on which WAV-encoded bytes are sent.
            input_device_index: Device index passed to ``PyAudio.open``;
                ``None`` is forwarded unchanged.
        """
        self.CHUNK: int = 1024  # frames requested per stream.read() call
        self.FORMAT: int = pyaudio.paInt16
        self.CHANNELS: int = 1
        self.RATE: int = 44100  # sample rate passed to PyAudio, in Hz
        self.RECORD_SECONDS: int = 1  # length of one capture window
        self.recording_process: Optional[Process] = None
        # Rolling buffer of raw PCM data, one entry per capture window.
        self.audio_chunks: Deque[bytes] = deque(maxlen=2)
        self.output_pipe: connection.Connection = output_pipe
        self.input_device_index: Optional[int] = input_device_index

    @staticmethod
    def list_microphones() -> List[Dict[str, Any]]:
        """List all available input devices with their properties.

        Declared as a static method so that it can be called both on the
        class and on an instance.

        Returns:
            The PyAudio device-info mappings of every device whose
            ``maxInputChannels`` is greater than zero.
        """
        p = pyaudio.PyAudio()
        try:
            all_devices = (
                p.get_device_info_by_index(i) for i in range(p.get_device_count())
            )
            # Only input devices
            return [info for info in all_devices if info["maxInputChannels"] > 0]
        finally:
            # Release the PortAudio handle even if a device query fails.
            p.terminate()

    def create_wav_bytes(self, frames: List[bytes]) -> bytes:
        """Convert raw audio frames to WAV format in memory.

        Args:
            frames: Raw sample data; the items are concatenated in order.

        Returns:
            A complete WAV file (header plus data) using this recorder's
            channel count, sample format and sample rate.
        """
        wav_buffer = io.BytesIO()
        with wave.open(wav_buffer, "wb") as wf:
            wf.setnchannels(self.CHANNELS)
            wf.setsampwidth(pyaudio.get_sample_size(self.FORMAT))
            wf.setframerate(self.RATE)
            wf.writeframes(b"".join(frames))
        return wav_buffer.getvalue()

    def record_audio(self) -> None:
        """Capture audio forever and send the rolling buffer through the pipe.

        This loop never returns on its own; it is meant to be the target of
        the process created by ``start_recording``.
        """
        p = pyaudio.PyAudio()
        reads_per_window = int(self.RATE / self.CHUNK * self.RECORD_SECONDS)
        try:
            while True:
                stream = p.open(
                    format=self.FORMAT,
                    channels=self.CHANNELS,
                    rate=self.RATE,
                    input=True,
                    input_device_index=self.input_device_index,
                    frames_per_buffer=self.CHUNK,
                )
                frames: List[bytes] = []
                try:
                    # Record for RECORD_SECONDS
                    for _ in range(reads_per_window):
                        try:
                            frames.append(
                                stream.read(self.CHUNK, exception_on_overflow=False)
                            )
                        except OSError as e:
                            # Best effort: skip the failed read, keep recording.
                            print(f"Warning: Audio input overflow occurred: {e}")
                finally:
                    # Close the stream even if an unexpected error escapes.
                    stream.stop_stream()
                    stream.close()

                # Keep raw PCM in the rolling buffer. Joining already-encoded
                # WAV files would embed a second RIFF header in the data, and
                # the first header would describe only the first window.
                self.audio_chunks.append(b"".join(frames))

                # Send one WAV covering the whole buffer once it is full.
                if len(self.audio_chunks) == self.audio_chunks.maxlen:
                    self.output_pipe.send(
                        self.create_wav_bytes(list(self.audio_chunks))
                    )
        finally:
            p.terminate()

    def start_recording(self) -> None:
        """Start recording in a separate process.

        Does nothing if a recording process started by this object is still
        alive, so that a second call cannot orphan the first process.
        """
        current = self.recording_process
        if current is not None and current.is_alive():
            return
        process = Process(target=self.record_audio)
        process.start()
        self.recording_process = process

    def stop_recording(self) -> None:
        """Stop recording by terminating the recording process, if any."""
        if self.recording_process:
            self.recording_process.terminate()
            # Reap the child so it does not linger as a zombie; the timeout
            # keeps this call from blocking indefinitely.
            self.recording_process.join(timeout=2.0)
            self.recording_process = None