import click
import librosa
import numpy as np
import pyloudnorm as pyln
import torch
import torchaudio
from pathlib import Path
from tqdm import tqdm
from torch.amp import autocast

from rift_svc import DiT, RF
from rift_svc.feature_extractors import HubertModelWithFinalProj, RMSExtractor, get_mel_spectrogram
from rift_svc.nsf_hifigan import NsfHifiGAN
from rift_svc.rmvpe import RMVPE
from rift_svc.utils import linear_interpolate_tensor, post_process_f0, f0_ensemble, f0_ensemble_light, get_f0_pw, get_f0_pm
from slicer import Slicer

torch.set_grad_enabled(False)
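
# Inference pipeline overview:
#   1. Slice the input audio into segments (Slicer).
#   2. Per segment, extract features: mel spectrogram, content vectors (ContentVec/HuBERT),
#      F0 via RMVPE (optionally ensembled with get_f0_pw / get_f0_pm), and frame-level RMS.
#   3. Run the conversion model (RF wrapping a DiT transformer) to generate the target mel.
#   4. Vocode with NSF-HiFiGAN, restore loudness, and crossfade the segments back together.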

def extract_state_dict(ckpt):
    state_dict = ckpt['state_dict']
    new_state_dict = {}
    for k, v in state_dict.items():
        if k.startswith('model.'):
            new_k = k.replace('model.', '')
            new_state_dict[new_k] = v
    spk2idx = ckpt['hyper_parameters']['cfg']['spk2idx']
    model_cfg = ckpt['hyper_parameters']['cfg']['model']
    dataset_cfg = ckpt['hyper_parameters']['cfg']['dataset']
    return new_state_dict, spk2idx, model_cfg, dataset_cfg
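
# Note: the checkpoint layout above ('state_dict' plus 'hyper_parameters' -> 'cfg')
# looks like a PyTorch Lightning-style checkpoint in which the SVC model was stored
# under a 'model.' prefix; extract_state_dict strips that prefix and pulls out the
# speaker map and the model/dataset configs.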

def load_models(model_path, device, use_fp16=True):
    """Load all required models and return them."""
    click.echo("Loading models...")

    # Load the conversion model
    ckpt = torch.load(model_path, map_location='cpu')
    state_dict, spk2idx, dit_cfg, dataset_cfg = extract_state_dict(ckpt)

    transformer = DiT(num_speaker=len(spk2idx), **dit_cfg)
    svc_model = RF(transformer=transformer)
    svc_model.load_state_dict(state_dict)
    svc_model = svc_model.to(device)

    # Convert to half precision (float16) if requested and not running on CPU
    # (compare on device.type so this works with a torch.device object)
    if use_fp16 and device.type != 'cpu':
        svc_model = svc_model.half()
    svc_model.eval()

    # Load additional models
    vocoder = NsfHifiGAN('pretrained/nsf_hifigan_44.1k_hop512_128bin_2024.02/model.ckpt').to(device)
    rmvpe = RMVPE(model_path="pretrained/rmvpe/model.pt", hop_length=160, device=device)
    hubert = HubertModelWithFinalProj.from_pretrained("pretrained/content-vec-best").to(device)
    rms_extractor = RMSExtractor().to(device)

    # Convert additional models to half precision if requested and not running on CPU
    if use_fp16 and device.type != 'cpu':
        vocoder = vocoder.half()
        hubert = hubert.half()
        rms_extractor = rms_extractor.half()
        # RMVPE is handled separately as it may have a custom implementation

    return svc_model, vocoder, rmvpe, hubert, rms_extractor, spk2idx, dataset_cfg

def load_audio(file_path, target_sr):
    """Load and preprocess audio file"""
    click.echo("Loading audio...")
    audio, sr = torchaudio.load(file_path)
    if sr != target_sr:
        audio = torchaudio.functional.resample(audio, sr, target_sr)
    if len(audio.shape) > 1:
        audio = audio.mean(dim=0, keepdim=True)
    return audio.numpy().squeeze()

def apply_fade(audio, fade_samples, fade_in=True):
    """Apply fade in/out using half of a Hanning window"""
    fade_window = np.hanning(fade_samples * 2)
    if fade_in:
        # Rising half of the window applied to the start of the segment
        audio[:fade_samples] *= fade_window[:fade_samples]
    else:
        # Falling half of the window applied to the end of the segment
        audio[-fade_samples:] *= fade_window[fade_samples:]
    return audio

def extract_features(audio_segment, sample_rate, hop_length, rmvpe, hubert, rms_extractor,
                     device, key_shift=0, ds_cfg_strength=0.0, cvec_downsample_rate=2, target_loudness=-18.0,
                     robust_f0=0, use_fp16=True):
    """Extract all required features from an audio segment"""
    # Normalize input segment
    meter = pyln.Meter(sample_rate)
    original_loudness = meter.integrated_loudness(audio_segment)
    normalized_audio = pyln.normalize.loudness(audio_segment, original_loudness, target_loudness)

    # Handle potential clipping
    max_amp = np.max(np.abs(normalized_audio))
    if max_amp > 1.0:
        normalized_audio = normalized_audio * (0.99 / max_amp)

    audio_tensor = torch.from_numpy(normalized_audio).float().unsqueeze(0).to(device)
    audio_16khz = torch.from_numpy(librosa.resample(normalized_audio, orig_sr=sample_rate, target_sr=16000)).float().unsqueeze(0).to(device)

    # Convert to half precision if specified and using CUDA
    if use_fp16 and device.type != 'cpu':
        audio_tensor = audio_tensor.half()
        audio_16khz = audio_16khz.half()

    # Extract mel spectrogram
    mel = get_mel_spectrogram(
        audio_tensor,
        sampling_rate=sample_rate,
        n_fft=2048,
        num_mels=128,
        hop_size=512,
        win_size=2048,
        fmin=40,
        fmax=16000
    ).transpose(1, 2)

    # Extract content vector
    device_type = 'cuda' if device.type == 'cuda' else 'cpu'
    with autocast(device_type=device_type, enabled=use_fp16):
        cvec = hubert(audio_16khz)["last_hidden_state"].squeeze(0)
    cvec = linear_interpolate_tensor(cvec, mel.shape[1])[None, :]

    # Create bad_cvec (downsampled) for classifier-free guidance
    if ds_cfg_strength > 0:
        cvec_ds = cvec.clone()
        # Downsample and then interpolate back, similar to dataset.py
        cvec_ds = cvec_ds[0, ::2, :]  # Take every other frame
        cvec_ds = linear_interpolate_tensor(cvec_ds, cvec_ds.shape[0] // cvec_downsample_rate)
        cvec_ds = linear_interpolate_tensor(cvec_ds, mel.shape[1])[None, :]
    else:
        cvec_ds = None

    # Extract f0
    if robust_f0 > 0:
        # Parameters for F0 extraction
        time_step = hop_length / sample_rate
        f0_min = 40
        f0_max = 1100

        # Extract F0 using multiple methods
        with autocast(device_type=device_type, enabled=use_fp16):
            rmvpe_f0 = rmvpe.infer_from_audio(audio_tensor, sample_rate=sample_rate, device=device)
        rmvpe_f0 = post_process_f0(rmvpe_f0, sample_rate, hop_length, mel.shape[1], silence_front=0.0, cut_last=False)
        pw_f0 = get_f0_pw(normalized_audio, sample_rate, time_step, f0_min, f0_max)
        pmac_f0 = get_f0_pm(normalized_audio, sample_rate, time_step, f0_min, f0_max)

        if robust_f0 == 1:
            # Level 1: Light ensemble that preserves expressiveness
            with autocast(device_type=device_type, enabled=use_fp16):
                rms_np = rms_extractor(audio_tensor).squeeze().cpu().numpy()
            f0 = f0_ensemble_light(rmvpe_f0, pw_f0, pmac_f0, rms=rms_np)
        else:
            # Level 2: Strong ensemble with more filtering
            f0 = f0_ensemble(rmvpe_f0, pw_f0, pmac_f0)
    else:
        # Level 0: Use only RMVPE for F0 extraction (original method)
        with autocast(device_type=device_type, enabled=use_fp16):
            f0 = rmvpe.infer_from_audio(audio_tensor, sample_rate=sample_rate, device=device)
        f0 = post_process_f0(f0, sample_rate, hop_length, mel.shape[1], silence_front=0.0, cut_last=False)

    if key_shift != 0:
        f0 = f0 * 2 ** (key_shift / 12)
    f0 = torch.from_numpy(f0).float().to(device)[None, :]

    # Extract RMS
    rms = rms_extractor(audio_tensor)

    return mel, cvec, cvec_ds, f0, rms, original_loudness
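
# Note: every frame-level feature returned above (mel, cvec, cvec_ds, f0, rms) is
# aligned to mel.shape[1] frames; at 44.1 kHz with hop_size=512, one frame covers
# roughly 11.6 ms of audio.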

def run_inference(
    model, mel, cvec, f0, rms, cvec_ds, spk_id,
    infer_steps, ds_cfg_strength, spk_cfg_strength,
    skip_cfg_strength, cfg_skip_layers, cfg_rescale,
    sliced_inference=False, use_fp16=True, frame_lengths=None
):
    """Run the actual inference through the model"""
    device_type = 'cuda' if mel.device.type == 'cuda' else 'cpu'

    if frame_lengths is not None:
        # Use batch inference with frame lengths
        with autocast(device_type=device_type, enabled=use_fp16):
            mel_out, _ = model.sample(
                src_mel=mel,
                spk_id=spk_id,
                f0=f0,
                rms=rms,
                cvec=cvec,
                steps=infer_steps,
                bad_cvec=cvec_ds,
                ds_cfg_strength=ds_cfg_strength,
                spk_cfg_strength=spk_cfg_strength,
                skip_cfg_strength=skip_cfg_strength,
                cfg_skip_layers=cfg_skip_layers,
                cfg_rescale=cfg_rescale,
                frame_len=frame_lengths,
            )
        return mel_out
    elif sliced_inference:
        # Use sliced inference for long segments
        sliced_len = 256
        mel_crossfade_len = 8  # Number of frames to crossfade in mel domain

        # If the segment is shorter than one slice, just process it directly
        if mel.shape[1] <= sliced_len:
            with autocast(device_type=device_type, enabled=use_fp16):
                mel_out, _ = model.sample(
                    src_mel=mel,
                    spk_id=spk_id,
                    f0=f0,
                    rms=rms,
                    cvec=cvec,
                    steps=infer_steps,
                    bad_cvec=cvec_ds,
                    ds_cfg_strength=ds_cfg_strength,
                    spk_cfg_strength=spk_cfg_strength,
                    skip_cfg_strength=skip_cfg_strength,
                    cfg_skip_layers=cfg_skip_layers,
                    cfg_rescale=cfg_rescale,
                )
            return mel_out

        # Create a tensor to hold the full output with crossfading
        full_mel_out = torch.zeros_like(mel)

        # Process each slice
        for i in range(0, mel.shape[1], sliced_len - mel_crossfade_len):
            # Determine slice boundaries
            start_idx = i
            end_idx = min(i + sliced_len, mel.shape[1])

            # Skip if we're at the end
            if start_idx >= mel.shape[1]:
                break

            # Extract slices for this window
            mel_slice = mel[:, start_idx:end_idx, :]
            cvec_slice = cvec[:, start_idx:end_idx, :]
            f0_slice = f0[:, start_idx:end_idx]
            rms_slice = rms[:, start_idx:end_idx]

            # Slice the bad_cvec if it exists
            cvec_ds_slice = None
            if cvec_ds is not None:
                cvec_ds_slice = cvec_ds[:, start_idx:end_idx, :]

            # Process with model using mixed precision if enabled
            with autocast(device_type=device_type, enabled=use_fp16):
                mel_out_slice, _ = model.sample(
                    src_mel=mel_slice,
                    spk_id=spk_id,
                    f0=f0_slice,
                    rms=rms_slice,
                    cvec=cvec_slice,
                    steps=infer_steps,
                    bad_cvec=cvec_ds_slice,
                    ds_cfg_strength=ds_cfg_strength,
                    spk_cfg_strength=spk_cfg_strength,
                    skip_cfg_strength=skip_cfg_strength,
                    cfg_skip_layers=cfg_skip_layers,
                    cfg_rescale=cfg_rescale,
                )

            # Create crossfade weights
            slice_len = end_idx - start_idx

            # Apply different strategies depending on position
            if i == 0:  # First slice
                # No crossfade at the beginning
                weights = torch.ones((1, slice_len, 1), device=mel.device)
                if i + sliced_len < mel.shape[1]:  # If not the last slice too
                    # Fade out at the end - use the minimum of slice_len and mel_crossfade_len
                    actual_crossfade_len = min(mel_crossfade_len, slice_len)
                    if actual_crossfade_len > 0:  # Only apply if we have space
                        fade_out = torch.linspace(1, 0, actual_crossfade_len, device=mel.device)
                        weights[:, -actual_crossfade_len:, :] = fade_out.view(1, -1, 1)
            elif end_idx >= mel.shape[1]:  # Last slice
                # Fade in at the beginning - use the minimum of slice_len and mel_crossfade_len
                weights = torch.ones((1, slice_len, 1), device=mel.device)
                actual_crossfade_len = min(mel_crossfade_len, slice_len)
                if actual_crossfade_len > 0:  # Only apply if we have space
                    fade_in = torch.linspace(0, 1, actual_crossfade_len, device=mel.device)
                    weights[:, :actual_crossfade_len, :] = fade_in.view(1, -1, 1)
            else:  # Middle slice
                # Crossfade both ends
                weights = torch.ones((1, slice_len, 1), device=mel.device)
                # Fade in at the beginning
                if mel_crossfade_len > 0:  # Only apply if we have space
                    fade_in = torch.linspace(0, 1, mel_crossfade_len, device=mel.device)
                    weights[:, :mel_crossfade_len, :] = fade_in.view(1, -1, 1)
                # Fade out at the end
                if mel_crossfade_len > 0:  # Only apply if we have space
                    fade_out = torch.linspace(1, 0, mel_crossfade_len, device=mel.device)
                    weights[:, -mel_crossfade_len:, :] = fade_out.view(1, -1, 1)

            # Apply weighted update to the output
            full_mel_out[:, start_idx:end_idx, :] += weights * mel_out_slice

        # Return the full crossfaded output
        mel_out = full_mel_out
    else:
        # Process the entire segment at once with mixed precision if enabled
        with autocast(device_type=device_type, enabled=use_fp16):
            mel_out, _ = model.sample(
                src_mel=mel,
                spk_id=spk_id,
                f0=f0,
                rms=rms,
                cvec=cvec,
                steps=infer_steps,
                bad_cvec=cvec_ds,
                ds_cfg_strength=ds_cfg_strength,
                spk_cfg_strength=spk_cfg_strength,
                skip_cfg_strength=skip_cfg_strength,
                cfg_skip_layers=cfg_skip_layers,
                cfg_rescale=cfg_rescale,
            )

    return mel_out
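
# run_inference supports three modes: batched inference (frame_lengths given, used by
# batch_process_segments), sliced inference (long segments processed in 256-frame
# windows that are overlap-added with 8-frame linear crossfades in the mel domain),
# and plain whole-segment inference.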

def generate_audio(vocoder, mel_out, f0, original_loudness=None, restore_loudness=True, use_fp16=True):
    """Generate audio from mel spectrogram using vocoder"""
    # Use mixed precision for vocoder inference if enabled
    device_type = 'cuda' if mel_out.device.type == 'cuda' else 'cpu'
    with autocast(device_type=device_type, enabled=use_fp16):
        audio_out = vocoder(mel_out.transpose(1, 2), f0)
    audio_out = audio_out.squeeze().cpu().numpy()

    if restore_loudness and original_loudness is not None:
        # Restore original loudness
        meter = pyln.Meter(44100)
        audio_out_loudness = meter.integrated_loudness(audio_out)
        audio_out = pyln.normalize.loudness(audio_out, audio_out_loudness, original_loudness)

        # Handle clipping
        max_amp = np.max(np.abs(audio_out))
        if max_amp > 1.0:
            audio_out = audio_out * (0.99 / max_amp)

    return audio_out

def process_segment(
    audio_segment,
    svc_model, vocoder, rmvpe, hubert, rms_extractor,
    speaker_id, sample_rate, hop_length, device,
    key_shift=0,
    infer_steps=32,
    ds_cfg_strength=0.0,
    spk_cfg_strength=0.0,
    skip_cfg_strength=0.0,
    cfg_skip_layers=None,
    cfg_rescale=0.7,
    cvec_downsample_rate=2,
    target_loudness=-18.0,
    restore_loudness=True,
    sliced_inference=False,
    robust_f0=0,
    use_fp16=True
):
    """Process a single audio segment and return the converted audio"""
    # Extract features
    mel, cvec, cvec_ds, f0, rms, original_loudness = extract_features(
        audio_segment, sample_rate, hop_length, rmvpe, hubert, rms_extractor,
        device, key_shift, ds_cfg_strength, cvec_downsample_rate, target_loudness,
        robust_f0, use_fp16
    )

    # Prepare speaker ID - convert to tensor
    spk_id = torch.LongTensor([speaker_id]).to(device)

    # Run inference to generate output mel spectrogram
    mel_out = run_inference(
        model=svc_model,
        mel=mel,
        cvec=cvec,
        f0=f0,
        rms=rms,
        cvec_ds=cvec_ds,
        spk_id=spk_id,
        infer_steps=infer_steps,
        ds_cfg_strength=ds_cfg_strength,
        spk_cfg_strength=spk_cfg_strength,
        skip_cfg_strength=skip_cfg_strength,
        cfg_skip_layers=cfg_skip_layers,
        cfg_rescale=cfg_rescale,
        sliced_inference=sliced_inference,
        use_fp16=use_fp16
    )

    # Generate audio
    audio_out = generate_audio(
        vocoder, mel_out, f0,
        original_loudness if restore_loudness else None,
        restore_loudness, use_fp16
    )

    return audio_out

def pad_tensor_to_length(tensor, length):
    """Pad a tensor to the specified length along the sequence dimension (dim=1)"""
    curr_len = tensor.shape[1]
    if curr_len >= length:
        return tensor

    pad_len = length - curr_len
    if tensor.dim() == 2:
        # (batch, time) tensors such as f0 / rms: pad the last dimension
        padding = (0, pad_len)
    elif tensor.dim() == 3:
        # (batch, time, channels) tensors such as mel / cvec: pad the time dimension
        padding = (0, 0, 0, pad_len)
    else:
        raise ValueError(f"Unsupported tensor dimension: {tensor.dim()}")

    padded = torch.nn.functional.pad(tensor, padding, "constant", 0)
    return padded
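
# Padding sketch: a (1, 250, 128) mel padded to length 300 becomes (1, 300, 128) with
# zeros in the extra frames; the true per-segment lengths are collected in
# frame_lengths and passed to model.sample as frame_len, presumably so the padded
# frames can be masked out inside the model.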

def batch_process_segments(
    segments_with_pos,
    svc_model, vocoder, rmvpe, hubert, rms_extractor,
    speaker_id, sample_rate, hop_length, device,
    key_shift=0,
    infer_steps=32,
    ds_cfg_strength=0.0,
    spk_cfg_strength=0.0,
    skip_cfg_strength=0.0,
    cfg_skip_layers=None,
    cfg_rescale=0.7,
    cvec_downsample_rate=2,
    target_loudness=-18.0,
    restore_loudness=True,
    robust_f0=0,
    use_fp16=True,
    batch_size=1,
    gr_progress=None,
    progress_desc=None
):
    """Process audio segments in batches for faster inference"""
    if batch_size <= 1:
        # Fall back to sequential per-segment processing
        results = []
        for i, (start_sample, chunk) in enumerate(tqdm(segments_with_pos, desc="Processing segments")):
            if gr_progress is not None:
                gr_progress(0.2 + (0.7 * (i / len(segments_with_pos))), desc=progress_desc.format(i + 1, len(segments_with_pos)))
            audio_out = process_segment(
                chunk, svc_model, vocoder, rmvpe, hubert, rms_extractor,
                speaker_id, sample_rate, hop_length, device,
                key_shift, infer_steps, ds_cfg_strength, spk_cfg_strength,
                skip_cfg_strength, cfg_skip_layers, cfg_rescale,
                cvec_downsample_rate, target_loudness, restore_loudness,
                # Pass the trailing arguments by keyword so robust_f0 does not
                # slip into process_segment's sliced_inference parameter
                robust_f0=robust_f0, use_fp16=use_fp16
            )
            results.append((start_sample, audio_out, len(chunk)))
        return results
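
    # Length-bucketed batching: sort segments by duration so each batch holds
    # similarly sized chunks (minimizing padding), and remember the original
    # indices so results can be returned in input order at the end.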
    sorted_with_idx = sorted(enumerate(segments_with_pos), key=lambda x: len(x[1][1]))
    sorted_segments = []
    original_indices = []
    for orig_idx, (pos, chunk) in sorted_with_idx:
        original_indices.append(orig_idx)
        sorted_segments.append((pos, chunk))

    batched_segments = [sorted_segments[i:i + batch_size] for i in range(0, len(sorted_segments), batch_size)]

    all_results = []
    for batch_idx, batch in enumerate(tqdm(batched_segments, desc="Processing batches")):
        if gr_progress is not None:
            gr_progress(
                0.2 + (0.7 * (batch_idx / len(batched_segments))),
                desc=progress_desc.format(batch_idx + 1, len(batched_segments)))

        batch_start_samples = [pos for pos, _ in batch]
        batch_chunks = [chunk for _, chunk in batch]
        batch_lengths = [len(chunk) for chunk in batch_chunks]

        # Extract features for every chunk in the batch
        batch_features = []
        for chunk in batch_chunks:
            mel, cvec, cvec_ds, f0, rms, original_loudness = extract_features(
                chunk, sample_rate, hop_length, rmvpe, hubert, rms_extractor,
                device, key_shift, ds_cfg_strength, cvec_downsample_rate, target_loudness,
                robust_f0, use_fp16
            )
            batch_features.append({
                'mel': mel,
                'cvec': cvec,
                'cvec_ds': cvec_ds,
                'f0': f0,
                'rms': rms,
                'original_loudness': original_loudness,
                'length': mel.shape[1]
            })

        # Pad every feature to the longest sequence in the batch
        max_length = max(feat['length'] for feat in batch_features)
        padded_mels = []
        padded_cvecs = []
        padded_f0s = []
        padded_rmss = []
        frame_lengths = []
        original_loudness_values = []
        if ds_cfg_strength > 0:
            padded_cvec_ds = []

        for feat in batch_features:
            curr_len = feat['length']
            frame_lengths.append(curr_len)
            padded_mels.append(pad_tensor_to_length(feat['mel'], max_length))
            padded_cvecs.append(pad_tensor_to_length(feat['cvec'], max_length))
            padded_f0s.append(pad_tensor_to_length(feat['f0'], max_length))
            padded_rmss.append(pad_tensor_to_length(feat['rms'], max_length))
            if ds_cfg_strength > 0:
                padded_cvec_ds.append(pad_tensor_to_length(feat['cvec_ds'], max_length))
            original_loudness_values.append(feat['original_loudness'])

        batched_mel = torch.cat(padded_mels, dim=0)
        batched_cvec = torch.cat(padded_cvecs, dim=0)
        batched_f0 = torch.cat(padded_f0s, dim=0)
        batched_rms = torch.cat(padded_rmss, dim=0)
        if ds_cfg_strength > 0:
            batched_cvec_ds = torch.cat(padded_cvec_ds, dim=0)
        else:
            batched_cvec_ds = None

        frame_lengths = torch.tensor(frame_lengths, device=device)
        batch_spk_id = torch.LongTensor([speaker_id] * len(batch)).to(device)

        with torch.no_grad():
            mel_out = run_inference(
                model=svc_model,
                mel=batched_mel,
                cvec=batched_cvec,
                f0=batched_f0,
                rms=batched_rms,
                cvec_ds=batched_cvec_ds,
                spk_id=batch_spk_id,
                infer_steps=infer_steps,
                ds_cfg_strength=ds_cfg_strength,
                spk_cfg_strength=spk_cfg_strength,
                skip_cfg_strength=skip_cfg_strength,
                cfg_skip_layers=cfg_skip_layers,
                cfg_rescale=cfg_rescale,
                frame_lengths=frame_lengths,
                use_fp16=use_fp16
            )
            with autocast(device_type='cuda' if device.type == 'cuda' else 'cpu', enabled=use_fp16):
                audio_out = vocoder(mel_out.transpose(1, 2), batched_f0)

        # Post-process each item in the batch: trim padding, restore loudness, avoid clipping
        for i in range(len(batch)):
            expected_audio_length = batch_lengths[i]
            curr_audio = audio_out[i].squeeze().cpu().numpy()
            if len(curr_audio) > expected_audio_length:
                curr_audio = curr_audio[:expected_audio_length]
            elif len(curr_audio) < expected_audio_length:
                curr_audio = np.pad(curr_audio, (0, expected_audio_length - len(curr_audio)), 'constant')

            if restore_loudness:
                meter = pyln.Meter(44100, block_size=0.1)
                curr_loudness = meter.integrated_loudness(curr_audio)
                curr_audio = pyln.normalize.loudness(curr_audio, curr_loudness, original_loudness_values[i])
                max_amp = np.max(np.abs(curr_audio))
                if max_amp > 1.0:
                    curr_audio = curr_audio * (0.99 / max_amp)

            expected_length = batch_lengths[i]
            all_results.append((batch_idx, i, batch_start_samples[i], curr_audio, expected_length, original_indices[batch_size * batch_idx + i]))

    # Restore the original segment order before returning
    all_results.sort(key=lambda x: x[5])
    return [(pos, audio, length) for _, _, pos, audio, length, _ in all_results]
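
# CLI wiring for main(). This is a hedged reconstruction, not the project's actual
# option list: option names mirror main()'s parameters and defaults mirror the
# function defaults used elsewhere in this file; values marked "assumed" (the
# fade duration, the slicer settings and option types that cannot be inferred)
# are illustrative only.
@click.command()
@click.option('--model', type=click.Path(exists=True), required=True, help='Path to the model checkpoint.')
@click.option('--input', type=click.Path(exists=True), required=True, help='Input audio file.')
@click.option('--output', type=click.Path(), required=True, help='Output audio file.')
@click.option('--speaker', type=str, required=True, help='Target speaker name.')
@click.option('--key-shift', type=int, default=0, help='Pitch shift in semitones.')
@click.option('--device', type=str, default=None, help='Device to run on (cuda/cpu); auto-detected if omitted.')
@click.option('--infer-steps', type=int, default=32, help='Number of sampling steps.')
@click.option('--ds-cfg-strength', type=float, default=0.0, help='Downsampled-cvec CFG strength.')
@click.option('--spk-cfg-strength', type=float, default=0.0, help='Speaker CFG strength.')
@click.option('--skip-cfg-strength', type=float, default=0.0, help='Skip-layer CFG strength.')
@click.option('--cfg-skip-layers', type=int, default=None, help='Layer index to skip for CFG (type assumed).')
@click.option('--cfg-rescale', type=float, default=0.7, help='CFG rescale factor.')
@click.option('--cvec-downsample-rate', type=int, default=2, help='Content-vector downsample rate for bad_cvec.')
@click.option('--target-loudness', type=float, default=-18.0, help='Loudness (LUFS) used for input normalization.')
@click.option('--restore-loudness/--no-restore-loudness', default=True, help='Restore the original loudness after conversion.')
@click.option('--fade-duration', type=float, default=20.0, help='Crossfade duration between segments in ms (assumed default).')
@click.option('--sliced-inference/--no-sliced-inference', default=False, help='Use sliced inference for long segments.')
@click.option('--robust-f0', type=int, default=0, help='F0 robustness: 0=RMVPE only, 1=light ensemble, 2=strong ensemble.')
@click.option('--slicer-threshold', type=float, default=-40.0, help='Slicer silence threshold in dB (assumed default).')
@click.option('--slicer-min-length', type=int, default=5000, help='Slicer minimum segment length in ms (assumed default).')
@click.option('--slicer-min-interval', type=int, default=300, help='Slicer minimum silence interval in ms (assumed default).')
@click.option('--slicer-hop-size', type=int, default=10, help='Slicer hop size in ms (assumed default).')
@click.option('--slicer-max-sil-kept', type=int, default=500, help='Slicer max silence kept in ms (assumed default).')
@click.option('--use-fp16/--no-use-fp16', default=True, help='Use float16 inference on GPU.')
@click.option('--batch-size', type=int, default=1, help='Number of segments per batch (>1 enables batched inference).')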
def main(
    model,
    input,
    output,
    speaker,
    key_shift,
    device,
    infer_steps,
    ds_cfg_strength,
    spk_cfg_strength,
    skip_cfg_strength,
    cfg_skip_layers,
    cfg_rescale,
    cvec_downsample_rate,
    target_loudness,
    restore_loudness,
    fade_duration,
    sliced_inference,
    robust_f0,
    slicer_threshold,
    slicer_min_length,
    slicer_min_interval,
    slicer_hop_size,
    slicer_max_sil_kept,
    use_fp16,
    batch_size
):
    """Convert the voice in an audio file to a target speaker."""
    # Setup device
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    device = torch.device(device)

    # Load models
    svc_model, vocoder, rmvpe, hubert, rms_extractor, spk2idx, dataset_cfg = load_models(model, device, use_fp16)

    try:
        speaker_id = spk2idx[speaker]
    except KeyError:
        raise ValueError(f"Speaker {speaker} not found in the model's speaker list, valid speakers are {list(spk2idx.keys())}")

    # Get config from loaded model
    hop_length = 512
    sample_rate = 44100

    # Load audio
    audio = load_audio(input, sample_rate)

    # Initialize Slicer
    slicer = Slicer(
        sr=sample_rate,
        threshold=slicer_threshold,
        min_length=slicer_min_length,
        min_interval=slicer_min_interval,
        hop_size=slicer_hop_size,
        max_sil_kept=slicer_max_sil_kept
    )

    # Step (1): Use slicer to segment the input audio and get positions
    click.echo("Slicing audio...")
    segments_with_pos = slicer.slice(audio)  # Returns a list of (start_pos, chunk)

    if restore_loudness:
        click.echo("Will restore loudness to original")

    # Calculate fade size in samples (fade_duration is in milliseconds)
    fade_samples = int(fade_duration * sample_rate / 1000)

    # Process segments
    if batch_size > 1:
        click.echo(f"Processing {len(segments_with_pos)} segments with batch size {batch_size}...")
        result_audio = np.zeros(len(audio) + fade_samples)  # Extra space for potential overlap
        with torch.no_grad():
            processed_segments = batch_process_segments(
                segments_with_pos, svc_model, vocoder, rmvpe, hubert, rms_extractor,
                speaker_id, sample_rate, hop_length, device,
                key_shift, infer_steps, ds_cfg_strength, spk_cfg_strength,
                skip_cfg_strength, cfg_skip_layers, cfg_rescale,
                cvec_downsample_rate, target_loudness, restore_loudness,
                robust_f0, use_fp16, batch_size
            )
            for idx, (start_sample, audio_out, expected_length) in enumerate(processed_segments):
                # Apply fades
                if idx > 0:  # Not first segment
                    audio_out = apply_fade(audio_out.copy(), fade_samples, fade_in=True)
                    result_audio[start_sample:start_sample + fade_samples] *= \
                        np.linspace(1, 0, fade_samples)  # Fade out previous
                if idx < len(processed_segments) - 1:  # Not last segment
                    audio_out[-fade_samples:] *= np.linspace(1, 0, fade_samples)  # Fade out

                # Add to result
                result_audio[start_sample:start_sample + len(audio_out)] += audio_out
    else:
        # Original processing method using sliced_inference
        click.echo(f"Processing {len(segments_with_pos)} segments...")
        result_audio = np.zeros(len(audio) + fade_samples)  # Extra space for potential overlap
        with torch.no_grad():
            for idx, (start_sample, chunk) in enumerate(tqdm(segments_with_pos)):
                # Process the segment
                audio_out = process_segment(
                    chunk, svc_model, vocoder, rmvpe, hubert, rms_extractor,
                    speaker_id, sample_rate, hop_length, device,
                    key_shift, infer_steps, ds_cfg_strength, spk_cfg_strength,
                    skip_cfg_strength, cfg_skip_layers, cfg_rescale,
                    cvec_downsample_rate, target_loudness, restore_loudness, sliced_inference,
                    robust_f0, use_fp16
                )

                # Ensure consistent length
                expected_length = len(chunk)
                if len(audio_out) > expected_length:
                    audio_out = audio_out[:expected_length]
                elif len(audio_out) < expected_length:
                    audio_out = np.pad(audio_out, (0, expected_length - len(audio_out)), 'constant')

                # Apply fades
                if idx > 0:  # Not first segment
                    audio_out = apply_fade(audio_out.copy(), fade_samples, fade_in=True)
                    result_audio[start_sample:start_sample + fade_samples] *= \
                        np.linspace(1, 0, fade_samples)  # Fade out previous
                if idx < len(segments_with_pos) - 1:  # Not last segment
                    audio_out[-fade_samples:] *= np.linspace(1, 0, fade_samples)  # Fade out

                # Add to result
                result_audio[start_sample:start_sample + len(audio_out)] += audio_out

    # Trim any extra padding
    result_audio = result_audio[:len(audio)]

    # Save output
    click.echo("Saving output...")
    output_path = Path(output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # Cast to float32 so the saved wav uses a standard sample format
    torchaudio.save(output, torch.from_numpy(result_audio).float().unsqueeze(0), sample_rate)
    click.echo("Done!")


if __name__ == '__main__':
    main()
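
# Example invocation (relying on the hypothetical CLI options sketched above main();
# the project's actual flag names and defaults may differ):
#   python infer.py --model pretrained/model.ckpt --input input.wav \
#       --output output.wav --speaker <speaker-name> --key-shift 0 --batch-size 2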