Spaces:
Running
Running
| """ | |
| VibeVoice Gradio Demo - High-Quality Dialogue Generation Interface with Streaming Support | |
| """ | |
| import argparse, os, tempfile | |
| import torch, spaces | |
| import gradio as gr | |
| from transformers.utils import logging | |
| from transformers import set_seed | |
| from cached_path import cached_path | |
| from model import VibeVoiceDemo | |
| logging.set_verbosity_info() | |
| logger = logging.get_logger(__name__) | |
| DEFAULT_NUM_SPEAKERS = 1 | |
| model_local_dir= str(cached_path("hf://microsoft/VibeVoice-1.5B")) | |
| #model_local_dir= "./ckpts/vibevoice" | |
| #snapshot_download(repo_id="microsoft/VibeVoice-1.5B", local_dir=model_local_dir) | |
| def create_demo_interface(demo_instance: VibeVoiceDemo): | |
| """Create the Gradio interface with streaming support.""" | |
| custom_css = "" | |
| with gr.Blocks( | |
| title="VibeVoice - AI Podcast Generator", | |
| css=custom_css, | |
| theme=gr.themes.Soft( | |
| primary_hue="blue", | |
| secondary_hue="purple", | |
| neutral_hue="slate", | |
| ) | |
| ) as interface: | |
| # Header | |
| gr.HTML(""" | |
| <div class="main-header"> | |
| <h1>ποΈ Vibe Podcasting </h1> | |
| <p>Generating Long-form Multi-speaker AI Podcast with VibeVoice</p> | |
| </div> | |
| """) | |
| with gr.Row(): | |
| # Left column - Settings | |
| with gr.Column(scale=1, elem_classes="settings-card"): | |
| def process_and_refresh_voices(uploaded_files: list[tempfile._TemporaryFileWrapper]): | |
| if not uploaded_files: return [gr.update() for _ in speaker_selections] + [None] | |
| for f in uploaded_files: | |
| demo_instance.available_voices[os.path.basename(f.name)] = f.name | |
| new_choices = list(demo_instance.available_voices.keys()) | |
| return [gr.update(choices=new_choices) for _ in speaker_selections] + [None] | |
| gr.Markdown("### ποΈ **Podcast Settings**") | |
| # Number of speakers | |
| num_speakers = gr.Slider( | |
| minimum=1, | |
| maximum=4, | |
| step=1, | |
| value=DEFAULT_NUM_SPEAKERS, | |
| label="Number of Speakers", | |
| elem_classes="slider-container" | |
| ) | |
| # Speaker selection | |
| gr.Markdown("### π **Speaker Selection**") | |
| available_speaker_names = list(demo_instance.available_voices.keys()) | |
| #default_speakers = available_speaker_names[:4] if len(available_speaker_names) >= 4 else available_speaker_names | |
| #default_speakers = ['en-Alice_woman', 'en-Carter_man', 'en-Frank_man', 'en-Maya_woman'] | |
| default_speakers = available_speaker_names | |
| speaker_selections = [] | |
| for i in range(4): | |
| default_value = default_speakers[i] if i < len(default_speakers) else None | |
| speaker = gr.Dropdown( | |
| choices=available_speaker_names, | |
| value=default_value, | |
| label=f"Speaker {i+1}", | |
| visible=(i < DEFAULT_NUM_SPEAKERS), # Initially show only first 2 speakers | |
| elem_classes="speaker-item" | |
| ) | |
| speaker_selections.append(speaker) | |
| with gr.Accordion("π€ Upload Custom Voices", open=True): | |
| upload_audio = gr.File(label="Upload Voice Samples", file_count="multiple", file_types=["audio"]) | |
| process_upload_btn = gr.Button("Add Uploaded Voices to Speaker Selection") | |
| process_upload_btn.click(fn=process_and_refresh_voices, inputs=upload_audio, outputs=speaker_selections + [upload_audio]) | |
| # Advanced settings | |
| gr.Markdown("### βοΈ **Advanced Settings**") | |
| # Sampling parameters (contains all generation settings) | |
| with gr.Accordion("Generation Parameters", open=False): | |
| cfg_scale = gr.Slider( | |
| minimum=1.0, | |
| maximum=2.0, | |
| value=1.3, | |
| step=0.05, | |
| label="CFG Scale (Guidance Strength)", | |
| # info="Higher values increase adherence to text", | |
| elem_classes="slider-container" | |
| ) | |
| disable_voice_cloning = gr.Checkbox( | |
| value=False, | |
| label="Disable voice cloning (skip conditioning voice prompts)", | |
| info="When enabled, sets is_prefill=False so the model ignores provided speaker audio." | |
| ) | |
| # Right column - Generation | |
| with gr.Column(scale=2, elem_classes="generation-card"): | |
| gr.Markdown("### π **Script Input**") | |
| script_input = gr.Textbox( | |
| label="Conversation Script", | |
| placeholder="""Enter your podcast script here. You can format it as: | |
| Speaker 1: Welcome to our podcast today! | |
| Speaker 2: Thanks for having me. I'm excited to discuss... | |
| Or paste text directly and it will auto-assign speakers.""", | |
| lines=12, | |
| max_lines=20, | |
| elem_classes="script-input" | |
| ) | |
| # Button row with Random Example on the left and Generate on the right | |
| with gr.Row(): | |
| # Random example button (now on the left) | |
| random_example_btn = gr.Button( | |
| "π² Random Example", | |
| size="lg", | |
| variant="secondary", | |
| elem_classes="random-btn", | |
| scale=1 # Smaller width | |
| ) | |
| # Generate button (now on the right) | |
| generate_btn = gr.Button( | |
| "π Generate Podcast", | |
| size="lg", | |
| variant="primary", | |
| elem_classes="generate-btn", | |
| scale=2 # Wider than random button | |
| ) | |
| # Stop button | |
| stop_btn = gr.Button( | |
| "π Stop Generation", | |
| size="lg", | |
| variant="stop", | |
| elem_classes="stop-btn", | |
| visible=False | |
| ) | |
| # Streaming status indicator | |
| streaming_status = gr.HTML( | |
| value=""" | |
| <div style="background: linear-gradient(135deg, #dcfce7 0%, #bbf7d0 100%); | |
| border: 1px solid rgba(34, 197, 94, 0.3); | |
| border-radius: 8px; | |
| padding: 0.75rem; | |
| margin: 0.5rem 0; | |
| text-align: center; | |
| font-size: 0.9rem; | |
| color: #166534;"> | |
| <span class="streaming-indicator"></span> | |
| <strong>LIVE STREAMING</strong> - Audio is being generated in real-time | |
| </div> | |
| """, | |
| visible=False, | |
| elem_id="streaming-status" | |
| ) | |
| # Output section | |
| gr.Markdown("### π΅ **Generated Podcast**") | |
| # Complete audio output (non-streaming) | |
| complete_audio_output = gr.Audio( | |
| label="Complete Podcast", | |
| type="numpy", | |
| elem_classes="audio-output complete-audio-section", | |
| streaming=False, # Non-streaming mode | |
| autoplay=False, | |
| show_download_button=True, # Explicitly show download button | |
| #visible=False # Initially hidden, shown when audio is ready | |
| ) | |
| # Streaming audio output (outside of tabs for simpler handling) | |
| audio_output = gr.Audio( | |
| label="Streaming Audio (Real-time)", | |
| type="numpy", | |
| elem_classes="audio-output", | |
| streaming=True, # Enable streaming mode | |
| autoplay=True, | |
| show_download_button=False, # Explicitly show download button | |
| visible=True | |
| ) | |
| gr.Markdown(""" | |
| *π‘ **Streaming**: Audio plays as it's being generated (may have slight pauses)* | |
| *π‘ **Complete Audio**: Will appear below after generation finishes* | |
| """) | |
| # Generation log | |
| log_output = gr.Textbox( | |
| label="Generation Log", | |
| lines=8, | |
| max_lines=15, | |
| interactive=False, | |
| elem_classes="log-output" | |
| ) | |
| def update_speaker_visibility(num_speakers): | |
| updates = [] | |
| for i in range(4): | |
| updates.append(gr.update(visible=(i < num_speakers))) | |
| return updates | |
| num_speakers.change( | |
| fn=update_speaker_visibility, | |
| inputs=[num_speakers], | |
| outputs=speaker_selections | |
| ) | |
| # Main generation function with streaming | |
| def generate_podcast_wrapper(num_speakers, script, speaker_1, speaker_2, speaker_3, speaker_4, cfg_scale, disable_voice_cloning): | |
| """Wrapper function to handle the streaming generation call.""" | |
| try: | |
| speakers = [speaker_1, speaker_2, speaker_3, speaker_4] | |
| # Clear outputs and reset visibility at start | |
| yield None, gr.update(value=None, visible=False), "ποΈ Starting generation...", gr.update(visible=True), gr.update(visible=False), gr.update(visible=True) | |
| # The generator will yield multiple times | |
| final_log = "Starting generation..." | |
| for streaming_audio, complete_audio, log, streaming_visible in demo_instance.generate_podcast_streaming( | |
| num_speakers=int(num_speakers), | |
| script=script, | |
| speaker_1=speakers[0], | |
| speaker_2=speakers[1], | |
| speaker_3=speakers[2], | |
| speaker_4=speakers[3], | |
| cfg_scale=cfg_scale, | |
| disable_voice_cloning=disable_voice_cloning | |
| ): | |
| final_log = log | |
| # Check if we have complete audio (final yield) | |
| if complete_audio is not None: | |
| # Final state: clear streaming, show complete audio | |
| yield None, gr.update(value=complete_audio, visible=True), log, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) | |
| else: | |
| # Streaming state: update streaming audio only | |
| if streaming_audio is not None: | |
| yield streaming_audio, None, log, streaming_visible, gr.update(visible=False), gr.update(visible=True) | |
| else: | |
| # No new audio, just update status | |
| yield None, None, log, streaming_visible, gr.update(visible=False), gr.update(visible=True) | |
| except Exception as e: | |
| error_msg = f"β A critical error occurred in the wrapper: {str(e)}" | |
| print(error_msg) | |
| import traceback | |
| traceback.print_exc() | |
| # Reset button states on error | |
| yield None, gr.update(value=None, visible=False), error_msg, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) | |
| def stop_generation_handler(): | |
| """Handle stopping generation.""" | |
| demo_instance.stop_audio_generation() | |
| # Return values for: log_output, streaming_status, generate_btn, stop_btn | |
| return "π Generation stopped.", gr.update(visible=False), gr.update(visible=True), gr.update(visible=False) | |
| # Add a clear audio function | |
| def clear_audio_outputs(): | |
| """Clear both audio outputs before starting new generation.""" | |
| return None, None | |
| # Connect generation button with streaming outputs | |
| generate_btn.click( | |
| fn=clear_audio_outputs, | |
| inputs=[], | |
| outputs=[audio_output, complete_audio_output], | |
| queue=False | |
| ).then( # Immediate UI update to hide Generate, show Stop (non-queued) | |
| fn=lambda: (gr.update(visible=False), gr.update(visible=True)), | |
| inputs=[], | |
| outputs=[generate_btn, stop_btn], | |
| queue=False | |
| ).then( | |
| fn=generate_podcast_wrapper, | |
| inputs=[num_speakers, script_input] + speaker_selections + [cfg_scale, disable_voice_cloning], | |
| outputs=[audio_output, complete_audio_output, log_output, streaming_status, generate_btn, stop_btn], | |
| queue=True # Enable Gradio's built-in queue | |
| ) | |
| # Connect stop button | |
| stop_btn.click( | |
| fn=stop_generation_handler, | |
| inputs=[], | |
| outputs=[log_output, streaming_status, generate_btn, stop_btn], | |
| queue=False # Don't queue stop requests | |
| ).then( | |
| # Clear both audio outputs after stopping | |
| fn=lambda: (None, None), | |
| inputs=[], | |
| outputs=[audio_output, complete_audio_output], | |
| queue=False | |
| ) | |
| # Function to randomly select an example | |
| def load_random_example(): | |
| """Randomly select and load an example script.""" | |
| import random | |
| # Get available examples | |
| if hasattr(demo_instance, 'example_scripts') and demo_instance.example_scripts: | |
| example_scripts = demo_instance.example_scripts | |
| else: | |
| # Fallback to default | |
| example_scripts = [ | |
| [2, "Speaker 0: Welcome to our AI podcast demonstration!\nSpeaker 1: Thanks for having me. This is exciting!"] | |
| ] | |
| # Randomly select one | |
| if example_scripts: | |
| selected = random.choice(example_scripts) | |
| num_speakers_value = selected[0] | |
| script_value = selected[1] | |
| # Return the values to update the UI | |
| return num_speakers_value, script_value | |
| # Default values if no examples | |
| return 2, "" | |
| # Connect random example button | |
| random_example_btn.click( | |
| fn=load_random_example, | |
| inputs=[], | |
| outputs=[num_speakers, script_input], | |
| queue=False # Don't queue this simple operation | |
| ) | |
| # Add usage tips | |
| gr.Markdown(""" | |
| ### π‘ **Usage Tips** | |
| - Click **π Generate Podcast** to start audio generation | |
| - **Live Streaming** tab shows audio as it's generated (may have slight pauses) | |
| - **Complete Audio** tab provides the full, uninterrupted podcast after generation | |
| - During generation, you can click **π Stop Generation** to interrupt the process | |
| - The streaming indicator shows real-time generation progress | |
| """) | |
| # Add example scripts | |
| gr.Markdown("### π **Example Scripts**") | |
| # Use dynamically loaded examples if available, otherwise provide a default | |
| if hasattr(demo_instance, 'example_scripts') and demo_instance.example_scripts: | |
| example_scripts = demo_instance.example_scripts | |
| else: | |
| # Fallback to a simple default example if no scripts loaded | |
| example_scripts = [ | |
| [1, "Speaker 1: Welcome to our AI podcast demonstration! This is a sample script showing how VibeVoice can generate natural-sounding speech."] | |
| ] | |
| gr.Examples( | |
| examples=example_scripts, | |
| inputs=[num_speakers, script_input], | |
| label="Try these example scripts:" | |
| ) | |
| # --- Risks & limitations (footer) --- | |
| gr.Markdown( | |
| """ | |
| ## Risks and limitations | |
| While efforts have been made to optimize it through various techniques, it may still produce outputs that are unexpected, biased, or inaccurate. VibeVoice inherits any biases, errors, or omissions produced by its base model (specifically, Qwen2.5 1.5b in this release). | |
| Potential for Deepfakes and Disinformation: High-quality synthetic speech can be misused to create convincing fake audio content for impersonation, fraud, or spreading disinformation. Users must ensure transcripts are reliable, check content accuracy, and avoid using generated content in misleading ways. Users are expected to use the generated content and to deploy the models in a lawful manner, in full compliance with all applicable laws and regulations in the relevant jurisdictions. It is best practice to disclose the use of AI when sharing AI-generated content. | |
| """, | |
| elem_classes="generation-card", # ε―ιοΌε€η¨ε‘ηζ ·εΌ | |
| ) | |
| return interface | |
| def parse_args(): | |
| parser = argparse.ArgumentParser(description="VibeVoice Gradio Demo") | |
| parser.add_argument( | |
| "--model_path", | |
| type=str, | |
| default=model_local_dir, | |
| help="Path to the VibeVoice model directory", | |
| ) | |
| parser.add_argument( | |
| "--device", | |
| type=str, | |
| default=("cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu")), | |
| help="Device for inference: cuda | mps | cpu", | |
| ) | |
| parser.add_argument( | |
| "--inference_steps", | |
| type=int, | |
| default=10, | |
| help="Number of inference steps for DDPM (not exposed to users)", | |
| ) | |
| parser.add_argument( | |
| "--share", | |
| action="store_true", | |
| help="Share the demo publicly via Gradio", | |
| ) | |
| parser.add_argument( | |
| "--port", | |
| type=int, | |
| default=7860, | |
| help="Port to run the demo on", | |
| ) | |
| parser.add_argument( | |
| "--checkpoint_path", | |
| type=str, | |
| default=None, | |
| help="Path to a fine-tuned checkpoint directory containing LoRA adapters (optional)", | |
| ) | |
| return parser.parse_args() | |
| def main(): | |
| """Main function to run the demo.""" | |
| args = parse_args() | |
| set_seed(42) # Set a fixed seed for reproducibility | |
| print("ποΈ Initializing VibeVoice Demo with Streaming Support...") | |
| # Initialize demo instance | |
| demo_instance = VibeVoiceDemo( | |
| model_path=args.model_path, | |
| device=args.device, | |
| inference_steps=args.inference_steps, | |
| adapter_path=args.checkpoint_path, | |
| ) | |
| # Create interface | |
| interface = create_demo_interface(demo_instance) | |
| print(f"π Launching demo on port {args.port}") | |
| print(f"π Model path: {args.model_path}") | |
| print(f"π Available voices: {len(demo_instance.available_voices)}") | |
| print(f"π΄ Streaming mode: ENABLED") | |
| print(f"π Session isolation: ENABLED") | |
| # Launch the interface | |
| try: | |
| interface.queue( | |
| max_size=10, # Maximum queue size | |
| default_concurrency_limit=1 # Process one request at a time | |
| ).launch( | |
| #share= args.share, | |
| #server_port=args.port, | |
| #server_name="0.0.0.0" if args.share else "127.0.0.1", | |
| show_error=True, | |
| mcp_server=True, | |
| ) | |
| except KeyboardInterrupt: | |
| print("\nπ Shutting down gracefully...") | |
| except Exception as e: | |
| print(f"β Server error: {e}") | |
| raise | |
| if __name__ == "__main__": | |
| main() |