From 9af128ffc6467d0dafa081ae45c9aa1d38f4be4d Mon Sep 17 00:00:00 2001 From: Komisar Date: Mon, 23 Mar 2026 20:08:04 +0300 Subject: [PATCH] FIX state: Curent in worked state --- config.yaml | 14 ++- main.py | 305 ++++++++++++++++++++++++++++++++--------------- requirements.txt | 2 + tts_engine.py | 230 +++++++++++++++++++++-------------- 4 files changed, 364 insertions(+), 187 deletions(-) diff --git a/config.yaml b/config.yaml index 39a1068..8bc6e07 100644 --- a/config.yaml +++ b/config.yaml @@ -22,9 +22,19 @@ models: generation: default_language: "Russian" - default_speaker: "Chelsie" + default_speaker: "serena" device: "auto" - dtype: "bfloat16" + dtype: "float16" + +# Настройки для VoiceDesign +voice_design: + # Тестовая фраза для предпрослушки голоса + # Используется в пункте "3. Предпрослушка VoiceDesign" + test_phrase: "Привет! Это тестовая фраза. Я готов помочь тебе с любой задачей. Как тебе мой новый голос?" + + # Альтернативные варианты (можно раскомментировать): + # test_phrase: "Здравствуй! Меня зовут... ну, пока у меня нет имени. Но звучу я классно, правда?" + # test_phrase: "Добрый день. Это короткая демонстрация синтезированной речи. Спасибо за внимание." recording: sample_rate: 16000 diff --git a/main.py b/main.py index 4064ca3..5ddb5a5 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,11 @@ -import sys +import sys import os import time import sounddevice as sd +import soundfile as sf import numpy as np import yaml -# Добавляем текущую директорию в путь для импорта sys.path.append(os.path.dirname(os.path.abspath(__file__))) from tts_engine import TTSEngine @@ -19,109 +19,105 @@ def record_sample_interactive(engine): cfg = engine.config['recording'] sr = cfg['sample_rate'] channels = cfg['channels'] - - print("\n--- 🎙 Запись нового сэмпла ---") + + print("\n--- [REC] Запись нового сэмпла ---") name = input("Имя сэмпла (латиница, без пробелов, Enter для auto): ").strip() if not name: name = f"speaker_{int(time.time())}" - - # Очистка имени от недопустимых символов + + # Очистка имени name = "".join(c for c in name if c.isalnum() or c in ('_', '-')).strip() - - text_prompt = input("Фраза для чтения (будет сохранена как текст сэмпла): ").strip() + + text_prompt = input("Фраза для чтения (сохранится как текст сэмпла): ").strip() if not text_prompt: - print("❌ Текст сэмпла обязателен для качественного клонирования.") + print("[!] Текст сэмпла обязателен для качественного клонирования.") return None save_dir = os.path.join(engine.config['storage']['sample_dir'], name) if os.path.exists(save_dir): - print(f"⚠️ Сэмпл с именем '{name}' уже существует. Перезапись.") - - print("\n🎤 НАЧАЛО ЗАПИСИ. Говорите сейчас!") - print(f"⏳ Запись остановится автоматически после паузы в {cfg['silence_duration']} сек.") - + print(f"[!] Сэмпл с именем '{name}' уже существует. Перезапись.") + + print("\n>>> НАЧАЛО ЗАПИСИ. Говорите сейчас!") + print(f">>> Запись остановится автоматически после паузы в {cfg['silence_duration']} сек.") + frames = [] silence_counter = 0 start_time = time.time() - + try: with sd.InputStream(samplerate=sr, channels=channels, dtype='float32') as stream: while True: data, overflowed = stream.read(1024) if overflowed: - print("⚠️ Buffer overflow") - + print("[!] Buffer overflow") + frames.append(data.copy()) - - # Логика VAD (Voice Activity Detection) + + # Логика VAD rms = get_rms(data) - - # Если звук тихий + if rms < cfg['silence_threshold']: silence_counter += len(data) / sr else: - silence_counter = 0 # Сброс при наличии голоса - - # Авто-стоп + silence_counter = 0 + current_duration = time.time() - start_time if silence_counter >= cfg['silence_duration'] and current_duration > cfg['min_duration']: - print("\n🛑 Тишина обнаружена. Остановка записи.") + print("\n[STOP] Тишина обнаружена. Остановка записи.") break - + except KeyboardInterrupt: - print("\nЗапись прервана вручную.") + print("\n[!] Запись прервана вручную.") # Обработка и сохранение recording = np.concatenate(frames, axis=0) - - # Создаем папку и сохраняем os.makedirs(save_dir, exist_ok=True) - + audio_path = os.path.join(save_dir, "audio.wav") sf.write(audio_path, recording, sr) - + prompt_path = os.path.join(save_dir, "prompt.txt") with open(prompt_path, 'w', encoding='utf-8') as f: f.write(text_prompt) - - print(f"✅ Сэмпл сохранен: {save_dir}") + + print(f"[+] Сэмпл сохранен: {save_dir}") return save_dir def select_sample_ui(engine): samples = engine.get_available_samples() if not samples: - print("Нет сохраненных сэмплов. Сначала запишите один.") + print("[!] Нет сохраненных сэмплов. Сначала запишите один.") return None - - print("\n--- 📂 Выберите сэмпл ---") + + print("\n--- Выберите сэмпл ---") for i, s in enumerate(samples): txt_preview = s['prompt'][:40] + "..." if len(s['prompt']) > 40 else s['prompt'] print(f"[{i+1}] {s['name']} : \"{txt_preview}\"") - + try: idx = int(input("Номер: ")) - 1 if 0 <= idx < len(samples): return samples[idx]['path'] except ValueError: pass - print("Неверный выбор.") + print("[!] Неверный выбор.") return None def select_audio_device(): - print("\n--- 🔊 Выбор устройства вывода ---") + print("\n--- Выбор устройства вывода ---") devices = sd.query_devices() output_devices = [] - + for i, dev in enumerate(devices): if dev['max_output_channels'] > 0: output_devices.append((i, dev['name'])) - + for idx, name in output_devices: default_marker = " (DEFAULT)" if idx == sd.default.device[1] else "" print(f"[{idx}] {name}{default_marker}") - + print("\nВведите ID устройства или Enter для использования по умолчанию.") choice = input(">> ").strip() - + if choice.isdigit(): return int(choice) return None @@ -130,14 +126,25 @@ def play_audio(filepath, device_id=None): try: data, sr = sf.read(filepath, dtype='float32') sd.play(data, sr, device=device_id) - print(f"▶️ Воспроизведение: {os.path.basename(filepath)}") - # sd.wait() # Раскомментировать, если нужно блокировать консоль до конца воспроизведения + print(f">>> Воспроизведение: {os.path.basename(filepath)}") except Exception as e: - print(f"❌ Ошибка воспроизведения: {e}") + print(f"[!] Ошибка воспроизведения: {e}") + +def get_test_phrase(engine): + """Получает тестовую фразу из конфига или возвращает дефолт""" + return engine.config.get('voice_design', {}).get('test_phrase', + 'Привет! Это тестовая фраза для проверки нового голоса.') # --- Главное меню --- def main(): + # Попытка установить UTF-8 для консоли Windows + if sys.platform == 'win32': + try: + sys.stdout.reconfigure(encoding='utf-8') + except: + pass + print("Инициализация движка...") try: engine = TTSEngine("config.yaml") @@ -147,26 +154,29 @@ def main(): current_sample_path = None output_device = None - + while True: - print("\n" + "="*40) - print(" QWEN3-TTS CONSOLE (Full Version)") - print("="*40) - print("1. 🎙 Управление сэмплами") - print("2. 🗣 Синтез речи") - print("3. 📁 История (Прослушивание/Чтение)") - print("4. ⚙️ Настройки") + print("\n" + "="*50) + print(" QWEN3-TTS CONSOLE") + print("="*50) + print("1. Управление сэмплами (Запись/Выбор)") + print("2. Синтез речи") + print("3. История (Прослушивание/Чтение)") + print("4. Список стандартных голосов (CustomVoice)") + print("5. Загрузить/Проверить модели (кэширование)") + print("6. Настройки (Устройство вывода)") print("0. Выход") - + choice = input(">> ").strip() - + + # 1. Управление сэмплами if choice == '1': print("\n--- Управление сэмплами ---") print("1. Записать новый сэмпл") - print("2. Вырать существующий") + print("2. Выбрать существующий") print("3. Сбросить текущий выбор") sub = input(">> ").strip() - + if sub == '1': path = record_sample_interactive(engine) if path: current_sample_path = path @@ -174,89 +184,192 @@ def main(): path = select_sample_ui(engine) if path: current_sample_path = path - print(f"✅ Выбран сэмпл: {path}") + print(f"[+] Выбран сэмпл: {path}") elif sub == '3': current_sample_path = None - print("Сброшено. Будет использован голос по умолчанию.") + print("[i] Сброшено. Будет использован голос по умолчанию.") + # 2. Синтез речи elif choice == '2': - text = input("\nВведите текст: ").strip() - if not text: continue - print("\nРежим синтеза:") print(f"1. Клонирование (Сэмпл: {'Да' if current_sample_path else 'Нет'})") - print("2. Описание голоса (Voice Design)") - print("3. Стандартный голос") + print("2. Voice Design (описание + клонирование)") + print("3. Предпрослушка VoiceDesign (только генерация)") + print("4. Стандартный голос") mode = input(">> ").strip() - + + # Для режимов 1, 2, 4 нужен ввод текста + # Для режима 3 используем тестовую фразу из конфига + text = None + if mode in ['1', '2', '4']: + text = input("\nВведите текст: ").strip() + if not text: + continue + elif mode == '3': + test_phrase = get_test_phrase(engine) + print(f"\nТестовая фраза из конфига: \"{test_phrase}\"") + use_custom = input("Использовать свой текст? (y/n): ").strip().lower() + if use_custom == 'y': + text = input("Введите текст: ").strip() + if not text: + continue + else: + text = test_phrase + else: + continue + try: start_t = time.time() wavs, sr = None, None - + if mode == '1': if not current_sample_path: - print("❌ Ошибка: Сэмпл не выбран!") + print("[!] Ошибка: Сэмпл не выбран!") continue wavs, sr = engine.generate_with_sample(text, current_sample_path) - + elif mode == '2': - desc = input("Описание голоса (напр. 'Добрый женский голос'): ").strip() + desc = input("Описание голоса (ТОЛЬКО АНГЛИЙСКИЙ!): ").strip() + if not desc: + desc = "A neutral clear voice with moderate pace" # дефолт на английском if not desc: desc = "Neutral voice" wavs, sr = engine.generate_with_description(text, desc) - + elif mode == '3': - wavs, sr = engine.generate_standard(text) - + # Предпрослушка VoiceDesign + desc = input("Описание голоса (напр. 'Злой робот'): ").strip() + if not desc: desc = "Neutral voice" + + print(f"\n🎨 Генерация VoiceDesign: '{desc}'") + wavs, sr = engine.generate_voice_design_only(text, desc) + + # Автовоспроизведение + print("🎵 Воспроизведение...") + temp_path = os.path.join(engine.config['storage']['output_dir'], "_temp_preview.wav") + sf.write(temp_path, wavs[0], sr) + play_audio(temp_path, output_device) + + # Меню после предпрослушки + print("\n--- Что дальше? ---") + print("1. Сгенерировать полный текст этим голосом (Design + Clone)") + print("2. Попробовать другое описание") + print("3. Сохранить результат") + print("4. Вернуться в меню") + next_action = input(">> ").strip() + + if next_action == '1': + full_text = input("\nВведите полный текст: ").strip() + if full_text: + print(f"\n🔄 Генерация полного текста...") + start_t = time.time() + wavs, sr = engine.generate_with_description(full_text, desc) + else: + continue + elif next_action == '2': + continue + elif next_action == '3': + # Сохраняем текущий результат (тестовую фразу) + pass # wavs уже содержит аудио, сохранится ниже + else: + continue + + elif mode == '4': + # Получаем список доступных спикеров + print("\nЗагрузка списка стандартных голосов...") + speakers = engine.get_custom_speakers_list() + + selected_speaker = None + if speakers: + print(f"\n--- Доступные голоса ({len(speakers)}) ---") + for i, spk in enumerate(speakers): + marker = " (DEFAULT)" if spk == engine.config['generation']['default_speaker'] else "" + print(f"[{i+1}] {spk}{marker}") + + print(f"\nВведите номер голоса (1-{len(speakers)}) или Enter для default:") + spk_choice = input(">> ").strip() + + if spk_choice.isdigit(): + spk_idx = int(spk_choice) - 1 + if 0 <= spk_idx < len(speakers): + selected_speaker = speakers[spk_idx] + print(f"[+] Выбран голос: {selected_speaker}") + else: + print("[!] Неверный номер, используется default.") + else: + print("[i] Используется голос по умолчанию.") + else: + print("[!] Не удалось получить список голосов, используется default.") + + wavs, sr = engine.generate_standard(text, speaker=selected_speaker) + if wavs is not None: # Сохранение saved_path = engine.save_result(text, wavs, sr) elapsed = time.time() - start_t - print(f"\n✅ Успешно за {elapsed:.2f} сек.") - print(f"📁 Файл: {saved_path}") - - # Вопрос о воспроизведении - ans = input("Воспроизвести сейчас? (y/n): ").strip().lower() - if ans == 'y': - play_audio(saved_path, output_device) - - except Exception as e: - print(f"❌ Ошибка генерации: {e}") + print(f"\n[+] Успешно за {elapsed:.2f} сек.") + print(f"[+] Файл: {saved_path}") + # Для режима 3 (предпрослушка) уже воспроизвели, спрашиваем повторно только для других + if mode != '3': + ans = input("Воспроизвести сейчас? (y/n): ").strip().lower() + if ans == 'y': + play_audio(saved_path, output_device) + + except Exception as e: + print(f"[!] Ошибка генерации: {e}") + + # 3. История elif choice == '3': history = engine.get_history() if not history: - print("\n📂 Папка out пуста.") + print("\n[i] Папка out пуста.") continue - - print(f"\n--- 📂 История ({len(history)} файлов) ---") - # Выводим последние 10 + + print(f"\n--- История ({len(history)} файлов) ---") for i, item in enumerate(history[:10]): print(f"[{i+1}] {item['filename']}") print(f" Текст: {item['text'][:50]}...") - + if len(history) > 10: print("... (показаны последние 10)") - + print("\nВведите номер для прослушивания/чтения или Enter.") sel = input(">> ").strip() if sel.isdigit(): idx = int(sel) - 1 - # Ищем в полном списке, но отображаем 10 - # Для простоты берем индекс из отображаемого списка (0-9) - # Но правильнее из полного списка history if 0 <= idx < len(history): item = history[idx] - print(f"\n▶️ Файл: {item['filename']}") - print(f"📝 Текст:\n{item['text']}") + print(f"\n>>> Файл: {item['filename']}") + print(f"Текст:\n{item['text']}") print("-" * 30) play_audio(item['wav_path'], output_device) + # 4. Список стандартных голосов elif choice == '4': + print("\nЗагрузка списка голосов из CustomVoice модели...") + try: + speakers = engine.get_custom_speakers_list() + if speakers: + print("\n--- Доступные голоса ---") + for spk in speakers: + print(f"- {spk}") + print("\n(Один из них используется в режиме 'Стандартный голос')") + else: + print("[!] Список пуст или модель не загружена.") + except Exception as e: + print(f"[!] Ошибка: {e}") + + # 5. Принудительное скачивание моделей + elif choice == '5': + engine.download_all_models() + + # 6. Настройки + elif choice == '6': output_device = select_audio_device() if output_device: - print(f"Установлено устройство вывода: {sd.query_devices(output_device)['name']}") + print(f"[+] Установлено устройство вывода: {sd.query_devices(output_device)['name']}") else: - print("Используется системное устройство по умолчанию.") + print("[i] Используется системное устройство по умолчанию.") elif choice == '0': print("Выход...") diff --git a/requirements.txt b/requirements.txt index 4a1c13b..3e43166 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,5 @@ huggingface_hub transformers accelerate qwen-tts +bitsandbytes +hf_xet diff --git a/tts_engine.py b/tts_engine.py index 88951d6..626519c 100644 --- a/tts_engine.py +++ b/tts_engine.py @@ -1,115 +1,162 @@ -import torch +import torch import soundfile as sf import yaml import os import time import datetime import numpy as np +import gc from pathlib import Path from typing import Optional, List, Dict, Tuple from huggingface_hub import snapshot_download # --- # Блок импорта модели. -# Если у вас установлен отдельный пакет qwen-tts: try: from qwen_tts import Qwen3TTSModel except ImportError: - # Заглушка для тестирования без реальной модели (эмуляция) print("WARNING: qwen_tts not found. Using Mock Model for testing.") class Qwen3TTSModel: @staticmethod def from_pretrained(path, **kwargs): print(f"[Mock] Loading model from {path}") return Qwen3TTSModel() - + def create_voice_clone_prompt(self, **kwargs): return "mock_prompt" - + def generate_voice_clone(self, text, **kwargs): print(f"[Mock] Generating voice clone for: {text[:30]}...") - # Возвращаем пустой массив нужной размерности sr = 24000 duration = len(text) * 0.1 return np.random.rand(1, int(sr * duration)).astype(np.float32), sr def generate_custom_voice(self, text, **kwargs): return self.generate_voice_clone(text, **kwargs) - + def generate_voice_design(self, text, **kwargs): return self.generate_voice_clone(text, **kwargs) + + def get_supported_speakers(self): + return ["Chelsie", "Dylan", "Eric", "Serena", "Vivian", "Aiden", "Ryan"] # --- class TTSEngine: def __init__(self, config_path: str = "config.yaml"): with open(config_path, 'r', encoding='utf-8') as f: self.config = yaml.safe_load(f) - + self.models = {} + self.current_model_type = None + try: self.dtype = getattr(torch, self.config['generation']['dtype']) except AttributeError: - self.dtype = torch.float32 - + self.dtype = torch.float16 # По умолчанию FP16 + + if torch.cuda.is_available(): + gpu_mem = torch.cuda.get_device_properties(0).total_memory / (1024**3) + print(f"📊 GPU: {torch.cuda.get_device_name(0)}") + print(f"📊 GPU Memory: {gpu_mem:.1f} GB") + # Инициализация папок Path(self.config['storage']['model_path']).mkdir(parents=True, exist_ok=True) Path(self.config['storage']['sample_dir']).mkdir(parents=True, exist_ok=True) Path(self.config['storage']['output_dir']).mkdir(parents=True, exist_ok=True) def _resolve_model(self, model_key: str) -> str: - """ - Умная загрузка моделей: - 1. Абсолютный путь -> использовать его. - 2. Локальный путь внутри model_path -> использовать. - 3. Скачать с HF в model_path. - """ + """Умная загрузка моделей""" model_cfg_value = self.config['models'][model_key] base_model_path = self.config['storage']['model_path'] - - # 1. Если это абсолютный путь или файл уже существует по этому пути + if os.path.isabs(model_cfg_value) or os.path.exists(model_cfg_value): - print(f"📂 Model [{model_key}]: Using direct path {model_cfg_value}") return model_cfg_value - - # 2. Формируем путь внутри хранилища - # Используем имя репозития как имя папки (замена / на _ если нужно, или сохранение структуры) + folder_name = model_cfg_value.split('/')[-1] local_path = os.path.join(base_model_path, folder_name) - + if os.path.exists(local_path) and os.listdir(local_path): - print(f"📂 Model [{model_key}]: Found locally at {local_path}") return local_path - - # 3. Скачивание с Hugging Face - print(f"⬇️ Model [{model_key}]: Not found. Downloading from HF to {local_path}...") + + print(f"⬇️ Downloading {model_key}...") try: - snapshot_download( - repo_id=model_cfg_value, - local_dir=local_path, - local_dir_use_symlinks=False - ) - print(f"✅ Model [{model_key}]: Downloaded.") + snapshot_download(repo_id=model_cfg_value, local_dir=local_path, local_dir_use_symlinks=False) return local_path except Exception as e: - print(f"❌ Error downloading model {model_cfg_value}: {e}") - raise RuntimeError(f"Failed to load model {model_key}") + raise RuntimeError(f"Failed to load model {model_key}: {e}") + + def _unload_other_models(self, keep_model_type: str): + """Выгружает все модели кроме указанной""" + for mtype in list(self.models.keys()): + if mtype != keep_model_type and mtype in self.models: + print(f"🗑️ Unloading model [{mtype}] to free memory...") + del self.models[mtype] + gc.collect() + if torch.cuda.is_available(): + torch.cuda.empty_cache() def _get_model(self, model_type: str): - if model_type not in self.models: - model_path = self._resolve_model(model_type) - print(f"🚀 Loading model [{model_type}] into memory...") + # Если модель уже загружена — возвращаем её + if model_type in self.models: + return self.models[model_type] + + # Выгружаем другие модели чтобы освободить память + self._unload_other_models(model_type) + + model_path = self._resolve_model(model_type) + print(f"🚀 Loading model [{model_type}]...") + + if torch.cuda.is_available(): + torch.cuda.empty_cache() + free_before = (torch.cuda.get_device_properties(0).total_memory - torch.cuda.memory_allocated()) / (1024**3) + print(f"📊 Free GPU memory before load: {free_before:.2f} GB") + + try: + # Стратегия 1: Пробуем загрузить всё на GPU в FP16 + print(f"⚙️ Trying FP16 on GPU...") self.models[model_type] = Qwen3TTSModel.from_pretrained( model_path, - device_map=self.config['generation']['device'], - torch_dtype=self.dtype + dtype=torch.float16, + device_map="cuda:0", + low_cpu_mem_usage=True ) + print(f"✅ Model [{model_type}] loaded on GPU (FP16)") + + except RuntimeError as e: + if "out of memory" in str(e).lower(): + print(f"⚠️ GPU OOM, trying CPU offloading...") + torch.cuda.empty_cache() + gc.collect() + + # Стратегия 2: Используем accelerate с offloading + # Но при этом избегаем bitsandbytes который вызывает pickle ошибку + print(f"⚙️ Using accelerate with CPU offloading (FP16)...") + + # Ограничиваем память GPU чтобы force offloading + max_memory = {0: "3GiB", "cpu": "28GiB"} # Оставляем 3GB для одной модели + + self.models[model_type] = Qwen3TTSModel.from_pretrained( + model_path, + dtype=torch.float16, + device_map="auto", + max_memory=max_memory, + low_cpu_mem_usage=True + ) + print(f"✅ Model [{model_type}] loaded with CPU offloading") + else: + raise e + + if torch.cuda.is_available(): + allocated = torch.cuda.memory_allocated() / (1024**3) + print(f"📊 GPU memory allocated: {allocated:.2f} GB") + return self.models[model_type] def get_available_samples(self) -> List[Dict[str, str]]: samples = [] sample_dir = self.config['storage']['sample_dir'] if not os.path.exists(sample_dir): return samples - + for name in sorted(os.listdir(sample_dir)): full_path = os.path.join(sample_dir, name) if os.path.isdir(full_path): @@ -120,33 +167,23 @@ class TTSEngine: if os.path.exists(prompt_path): with open(prompt_path, 'r', encoding='utf-8') as f: prompt = f.read().strip() - samples.append({ - "name": name, - "path": full_path, - "prompt": prompt - }) + samples.append({"name": name, "path": full_path, "prompt": prompt}) return samples def generate_with_sample(self, text: str, sample_path: str) -> Tuple[np.ndarray, int]: - """Режим 1: Клонирование по сэмплу""" model = self._get_model('base') - + audio_file = os.path.join(sample_path, "audio.wav") prompt_file = os.path.join(sample_path, "prompt.txt") - + ref_text = None if os.path.exists(prompt_file): with open(prompt_file, 'r', encoding='utf-8') as f: ref_text = f.read().strip() - print(f"🎤 Cloning voice from: {sample_path}") - - # Создаем промпт клонирования - prompt = model.create_voice_clone_prompt( - ref_audio=audio_file, - ref_text=ref_text - ) - + print(f"🎤 Cloning voice...") + + prompt = model.create_voice_clone_prompt(ref_audio=audio_file, ref_text=ref_text) wavs, sr = model.generate_voice_clone( text=text, language=self.config['generation']['default_language'], @@ -155,29 +192,26 @@ class TTSEngine: return wavs, sr def generate_with_description(self, text: str, description: str) -> Tuple[np.ndarray, int]: - """Режим 2: Генерация голоса по описанию (Design -> Clone)""" print(f"🎨 Designing voice: '{description}'") - - # Шаг А: Генерируем референс через VoiceDesign + + # Генерируем референс через VoiceDesign vd_model = self._get_model('voice_design') ref_text = text[:100] if len(text) > 100 else text - - # Генерируем сэмпл для будущего клонирования + ref_wavs, ref_sr = vd_model.generate_voice_design( text=ref_text, language=self.config['generation']['default_language'], instruct=description ) - - # Шаг Б: Клонируем этот сгенерированный голос через Base модель + + # Переключаемся на Base (VoiceDesign автоматически выгрузится) base_model = self._get_model('base') - - # Передаем tuple (numpy_array, sr) как ref_audio + prompt = base_model.create_voice_clone_prompt( ref_audio=(ref_wavs[0], ref_sr), ref_text=ref_text ) - + wavs, sr = base_model.generate_voice_clone( text=text, language=self.config['generation']['default_language'], @@ -185,12 +219,24 @@ class TTSEngine: ) return wavs, sr + def generate_voice_design_only(self, text: str, description: str) -> Tuple[np.ndarray, int]: + """Режим предпрослушки: только VoiceDesign без клонирования""" + print(f"🎨 VoiceDesign preview: '{description}'") + + model = self._get_model('voice_design') + + wavs, sr = model.generate_voice_design( + text=text, + language=self.config['generation']['default_language'], + instruct=description + ) + return wavs, sr + def generate_standard(self, text: str, speaker: str = None) -> Tuple[np.ndarray, int]: - """Режим 3: Стандартный голос""" model = self._get_model('custom_voice') speaker = speaker or self.config['generation']['default_speaker'] - - print(f"🗣️ Using built-in speaker: {speaker}") + + print(f"🗣️ Using speaker: {speaker}") wavs, sr = model.generate_custom_voice( text=text, language=self.config['generation']['default_language'], @@ -198,46 +244,52 @@ class TTSEngine: ) return wavs, sr + def download_all_models(self): + print("\n--- Checking models ---") + for key in ['base', 'voice_design', 'custom_voice']: + try: + self._resolve_model(key) + print(f"✅ {key}: OK") + except Exception as e: + print(f"❌ {key}: {e}") + + def get_custom_speakers_list(self): + try: + model = self._get_model('custom_voice') + speakers = model.get_supported_speakers() + return list(speakers) if hasattr(speakers, '__iter__') else speakers + except Exception as e: + print(f"Error: {e}") + return ["Chelsie", "Dylan", "Eric", "Serena", "Vivian", "Aiden", "Ryan"] + def save_result(self, text: str, wavs: np.ndarray, sr: int) -> str: - """Сохраняет WAV и TXT в папку out""" out_dir = self.config['storage']['output_dir'] timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"speech_{timestamp}" - + wav_path = os.path.join(out_dir, f"{filename}.wav") txt_path = os.path.join(out_dir, f"{filename}.txt") - - # Сохраняем аудио + sf.write(wav_path, wavs[0], sr) - - # Сохраняем текст with open(txt_path, 'w', encoding='utf-8') as f: f.write(text) - return wav_path def get_history(self) -> List[Dict[str, str]]: - """Возвращает список сгенерированных файлов""" out_dir = self.config['storage']['output_dir'] history = [] if not os.path.exists(out_dir): return history - - files = sorted(os.listdir(out_dir), reverse=True) - for f in files: + + for f in sorted(os.listdir(out_dir), reverse=True): if f.endswith(".wav"): base_name = f[:-4] txt_path = os.path.join(out_dir, f"{base_name}.txt") wav_path = os.path.join(out_dir, f) - + text_content = "(Текст не найден)" if os.path.exists(txt_path): with open(txt_path, 'r', encoding='utf-8') as file: text_content = file.read() - - history.append({ - "filename": f, - "wav_path": wav_path, - "txt_path": txt_path, - "text": text_content - }) + + history.append({"filename": f, "wav_path": wav_path, "txt_path": txt_path, "text": text_content}) return history