380 lines
16 KiB
Python
380 lines
16 KiB
Python
import sys
|
||
import os
|
||
import time
|
||
import sounddevice as sd
|
||
import soundfile as sf
|
||
import numpy as np
|
||
import yaml
|
||
|
||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||
from tts_engine import TTSEngine
|
||
|
||
# --- Функции записи и воспроизведения ---
|
||
|
||
def get_rms(data):
|
||
"""Вычисление среднеквадратичного уровня громкости"""
|
||
return np.sqrt(np.mean(data**2))
|
||
|
||
def record_sample_interactive(engine):
|
||
cfg = engine.config['recording']
|
||
sr = cfg['sample_rate']
|
||
channels = cfg['channels']
|
||
|
||
print("\n--- [REC] Запись нового сэмпла ---")
|
||
name = input("Имя сэмпла (латиница, без пробелов, Enter для auto): ").strip()
|
||
if not name: name = f"speaker_{int(time.time())}"
|
||
|
||
# Очистка имени
|
||
name = "".join(c for c in name if c.isalnum() or c in ('_', '-')).strip()
|
||
|
||
text_prompt = input("Фраза для чтения (сохранится как текст сэмпла): ").strip()
|
||
if not text_prompt:
|
||
print("[!] Текст сэмпла обязателен для качественного клонирования.")
|
||
return None
|
||
|
||
save_dir = os.path.join(engine.config['storage']['sample_dir'], name)
|
||
if os.path.exists(save_dir):
|
||
print(f"[!] Сэмпл с именем '{name}' уже существует. Перезапись.")
|
||
|
||
print("\n>>> НАЧАЛО ЗАПИСИ. Говорите сейчас!")
|
||
print(f">>> Запись остановится автоматически после паузы в {cfg['silence_duration']} сек.")
|
||
|
||
frames = []
|
||
silence_counter = 0
|
||
start_time = time.time()
|
||
|
||
try:
|
||
with sd.InputStream(samplerate=sr, channels=channels, dtype='float32') as stream:
|
||
while True:
|
||
data, overflowed = stream.read(1024)
|
||
if overflowed:
|
||
print("[!] Buffer overflow")
|
||
|
||
frames.append(data.copy())
|
||
|
||
# Логика VAD
|
||
rms = get_rms(data)
|
||
|
||
if rms < cfg['silence_threshold']:
|
||
silence_counter += len(data) / sr
|
||
else:
|
||
silence_counter = 0
|
||
|
||
current_duration = time.time() - start_time
|
||
if silence_counter >= cfg['silence_duration'] and current_duration > cfg['min_duration']:
|
||
print("\n[STOP] Тишина обнаружена. Остановка записи.")
|
||
break
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n[!] Запись прервана вручную.")
|
||
|
||
# Обработка и сохранение
|
||
recording = np.concatenate(frames, axis=0)
|
||
os.makedirs(save_dir, exist_ok=True)
|
||
|
||
audio_path = os.path.join(save_dir, "audio.wav")
|
||
sf.write(audio_path, recording, sr)
|
||
|
||
prompt_path = os.path.join(save_dir, "prompt.txt")
|
||
with open(prompt_path, 'w', encoding='utf-8') as f:
|
||
f.write(text_prompt)
|
||
|
||
print(f"[+] Сэмпл сохранен: {save_dir}")
|
||
return save_dir
|
||
|
||
def select_sample_ui(engine):
|
||
samples = engine.get_available_samples()
|
||
if not samples:
|
||
print("[!] Нет сохраненных сэмплов. Сначала запишите один.")
|
||
return None
|
||
|
||
print("\n--- Выберите сэмпл ---")
|
||
for i, s in enumerate(samples):
|
||
txt_preview = s['prompt'][:40] + "..." if len(s['prompt']) > 40 else s['prompt']
|
||
print(f"[{i+1}] {s['name']} : \"{txt_preview}\"")
|
||
|
||
try:
|
||
idx = int(input("Номер: ")) - 1
|
||
if 0 <= idx < len(samples):
|
||
return samples[idx]['path']
|
||
except ValueError:
|
||
pass
|
||
print("[!] Неверный выбор.")
|
||
return None
|
||
|
||
def select_audio_device():
|
||
print("\n--- Выбор устройства вывода ---")
|
||
devices = sd.query_devices()
|
||
output_devices = []
|
||
|
||
for i, dev in enumerate(devices):
|
||
if dev['max_output_channels'] > 0:
|
||
output_devices.append((i, dev['name']))
|
||
|
||
for idx, name in output_devices:
|
||
default_marker = " (DEFAULT)" if idx == sd.default.device[1] else ""
|
||
print(f"[{idx}] {name}{default_marker}")
|
||
|
||
print("\nВведите ID устройства или Enter для использования по умолчанию.")
|
||
choice = input(">> ").strip()
|
||
|
||
if choice.isdigit():
|
||
return int(choice)
|
||
return None
|
||
|
||
def play_audio(filepath, device_id=None):
|
||
try:
|
||
data, sr = sf.read(filepath, dtype='float32')
|
||
sd.play(data, sr, device=device_id)
|
||
print(f">>> Воспроизведение: {os.path.basename(filepath)}")
|
||
except Exception as e:
|
||
print(f"[!] Ошибка воспроизведения: {e}")
|
||
|
||
def get_test_phrase(engine):
|
||
"""Получает тестовую фразу из конфига или возвращает дефолт"""
|
||
return engine.config.get('voice_design', {}).get('test_phrase',
|
||
'Привет! Это тестовая фраза для проверки нового голоса.')
|
||
|
||
# --- Главное меню ---
|
||
|
||
def main():
|
||
# Попытка установить UTF-8 для консоли Windows
|
||
if sys.platform == 'win32':
|
||
try:
|
||
sys.stdout.reconfigure(encoding='utf-8')
|
||
except:
|
||
pass
|
||
|
||
print("Инициализация движка...")
|
||
try:
|
||
engine = TTSEngine("config.yaml")
|
||
except Exception as e:
|
||
print(f"Критическая ошибка инициализации: {e}")
|
||
return
|
||
|
||
current_sample_path = None
|
||
output_device = None
|
||
|
||
while True:
|
||
print("\n" + "="*50)
|
||
print(" QWEN3-TTS CONSOLE")
|
||
print("="*50)
|
||
print("1. Управление сэмплами (Запись/Выбор)")
|
||
print("2. Синтез речи")
|
||
print("3. История (Прослушивание/Чтение)")
|
||
print("4. Список стандартных голосов (CustomVoice)")
|
||
print("5. Загрузить/Проверить модели (кэширование)")
|
||
print("6. Настройки (Устройство вывода)")
|
||
print("0. Выход")
|
||
|
||
choice = input(">> ").strip()
|
||
|
||
# 1. Управление сэмплами
|
||
if choice == '1':
|
||
print("\n--- Управление сэмплами ---")
|
||
print("1. Записать новый сэмпл")
|
||
print("2. Выбрать существующий")
|
||
print("3. Сбросить текущий выбор")
|
||
sub = input(">> ").strip()
|
||
|
||
if sub == '1':
|
||
path = record_sample_interactive(engine)
|
||
if path: current_sample_path = path
|
||
elif sub == '2':
|
||
path = select_sample_ui(engine)
|
||
if path:
|
||
current_sample_path = path
|
||
print(f"[+] Выбран сэмпл: {path}")
|
||
elif sub == '3':
|
||
current_sample_path = None
|
||
print("[i] Сброшено. Будет использован голос по умолчанию.")
|
||
|
||
# 2. Синтез речи
|
||
elif choice == '2':
|
||
print("\nРежим синтеза:")
|
||
print(f"1. Клонирование (Сэмпл: {'Да' if current_sample_path else 'Нет'})")
|
||
print("2. Voice Design (описание + клонирование)")
|
||
print("3. Предпрослушка VoiceDesign (только генерация)")
|
||
print("4. Стандартный голос")
|
||
mode = input(">> ").strip()
|
||
|
||
# Для режимов 1, 2, 4 нужен ввод текста
|
||
# Для режима 3 используем тестовую фразу из конфига
|
||
text = None
|
||
if mode in ['1', '2', '4']:
|
||
text = input("\nВведите текст: ").strip()
|
||
if not text:
|
||
continue
|
||
elif mode == '3':
|
||
test_phrase = get_test_phrase(engine)
|
||
print(f"\nТестовая фраза из конфига: \"{test_phrase}\"")
|
||
use_custom = input("Использовать свой текст? (y/n): ").strip().lower()
|
||
if use_custom == 'y':
|
||
text = input("Введите текст: ").strip()
|
||
if not text:
|
||
continue
|
||
else:
|
||
text = test_phrase
|
||
else:
|
||
continue
|
||
|
||
try:
|
||
start_t = time.time()
|
||
wavs, sr = None, None
|
||
|
||
if mode == '1':
|
||
if not current_sample_path:
|
||
print("[!] Ошибка: Сэмпл не выбран!")
|
||
continue
|
||
wavs, sr = engine.generate_with_sample(text, current_sample_path)
|
||
|
||
elif mode == '2':
|
||
desc = input("Описание голоса (ТОЛЬКО АНГЛИЙСКИЙ!): ").strip()
|
||
if not desc:
|
||
desc = "A neutral clear voice with moderate pace" # дефолт на английском
|
||
if not desc: desc = "Neutral voice"
|
||
wavs, sr = engine.generate_with_description(text, desc)
|
||
|
||
elif mode == '3':
|
||
# Предпрослушка VoiceDesign
|
||
desc = input("Описание голоса (напр. 'Злой робот'): ").strip()
|
||
if not desc: desc = "Neutral voice"
|
||
|
||
print(f"\n🎨 Генерация VoiceDesign: '{desc}'")
|
||
wavs, sr = engine.generate_voice_design_only(text, desc)
|
||
|
||
# Автовоспроизведение
|
||
print("🎵 Воспроизведение...")
|
||
temp_path = os.path.join(engine.config['storage']['output_dir'], "_temp_preview.wav")
|
||
sf.write(temp_path, wavs[0], sr)
|
||
play_audio(temp_path, output_device)
|
||
|
||
# Меню после предпрослушки
|
||
print("\n--- Что дальше? ---")
|
||
print("1. Сгенерировать полный текст этим голосом (Design + Clone)")
|
||
print("2. Попробовать другое описание")
|
||
print("3. Сохранить результат")
|
||
print("4. Вернуться в меню")
|
||
next_action = input(">> ").strip()
|
||
|
||
if next_action == '1':
|
||
full_text = input("\nВведите полный текст: ").strip()
|
||
if full_text:
|
||
print(f"\n🔄 Генерация полного текста...")
|
||
start_t = time.time()
|
||
wavs, sr = engine.generate_with_description(full_text, desc)
|
||
else:
|
||
continue
|
||
elif next_action == '2':
|
||
continue
|
||
elif next_action == '3':
|
||
# Сохраняем текущий результат (тестовую фразу)
|
||
pass # wavs уже содержит аудио, сохранится ниже
|
||
else:
|
||
continue
|
||
|
||
elif mode == '4':
|
||
# Получаем список доступных спикеров
|
||
print("\nЗагрузка списка стандартных голосов...")
|
||
speakers = engine.get_custom_speakers_list()
|
||
|
||
selected_speaker = None
|
||
if speakers:
|
||
print(f"\n--- Доступные голоса ({len(speakers)}) ---")
|
||
for i, spk in enumerate(speakers):
|
||
marker = " (DEFAULT)" if spk == engine.config['generation']['default_speaker'] else ""
|
||
print(f"[{i+1}] {spk}{marker}")
|
||
|
||
print(f"\nВведите номер голоса (1-{len(speakers)}) или Enter для default:")
|
||
spk_choice = input(">> ").strip()
|
||
|
||
if spk_choice.isdigit():
|
||
spk_idx = int(spk_choice) - 1
|
||
if 0 <= spk_idx < len(speakers):
|
||
selected_speaker = speakers[spk_idx]
|
||
print(f"[+] Выбран голос: {selected_speaker}")
|
||
else:
|
||
print("[!] Неверный номер, используется default.")
|
||
else:
|
||
print("[i] Используется голос по умолчанию.")
|
||
else:
|
||
print("[!] Не удалось получить список голосов, используется default.")
|
||
|
||
wavs, sr = engine.generate_standard(text, speaker=selected_speaker)
|
||
|
||
if wavs is not None:
|
||
# Сохранение
|
||
saved_path = engine.save_result(text, wavs, sr)
|
||
elapsed = time.time() - start_t
|
||
print(f"\n[+] Успешно за {elapsed:.2f} сек.")
|
||
print(f"[+] Файл: {saved_path}")
|
||
|
||
# Для режима 3 (предпрослушка) уже воспроизвели, спрашиваем повторно только для других
|
||
if mode != '3':
|
||
ans = input("Воспроизвести сейчас? (y/n): ").strip().lower()
|
||
if ans == 'y':
|
||
play_audio(saved_path, output_device)
|
||
|
||
except Exception as e:
|
||
print(f"[!] Ошибка генерации: {e}")
|
||
|
||
# 3. История
|
||
elif choice == '3':
|
||
history = engine.get_history()
|
||
if not history:
|
||
print("\n[i] Папка out пуста.")
|
||
continue
|
||
|
||
print(f"\n--- История ({len(history)} файлов) ---")
|
||
for i, item in enumerate(history[:10]):
|
||
print(f"[{i+1}] {item['filename']}")
|
||
print(f" Текст: {item['text'][:50]}...")
|
||
|
||
if len(history) > 10:
|
||
print("... (показаны последние 10)")
|
||
|
||
print("\nВведите номер для прослушивания/чтения или Enter.")
|
||
sel = input(">> ").strip()
|
||
if sel.isdigit():
|
||
idx = int(sel) - 1
|
||
if 0 <= idx < len(history):
|
||
item = history[idx]
|
||
print(f"\n>>> Файл: {item['filename']}")
|
||
print(f"Текст:\n{item['text']}")
|
||
print("-" * 30)
|
||
play_audio(item['wav_path'], output_device)
|
||
|
||
# 4. Список стандартных голосов
|
||
elif choice == '4':
|
||
print("\nЗагрузка списка голосов из CustomVoice модели...")
|
||
try:
|
||
speakers = engine.get_custom_speakers_list()
|
||
if speakers:
|
||
print("\n--- Доступные голоса ---")
|
||
for spk in speakers:
|
||
print(f"- {spk}")
|
||
print("\n(Один из них используется в режиме 'Стандартный голос')")
|
||
else:
|
||
print("[!] Список пуст или модель не загружена.")
|
||
except Exception as e:
|
||
print(f"[!] Ошибка: {e}")
|
||
|
||
# 5. Принудительное скачивание моделей
|
||
elif choice == '5':
|
||
engine.download_all_models()
|
||
|
||
# 6. Настройки
|
||
elif choice == '6':
|
||
output_device = select_audio_device()
|
||
if output_device:
|
||
print(f"[+] Установлено устройство вывода: {sd.query_devices(output_device)['name']}")
|
||
else:
|
||
print("[i] Используется системное устройство по умолчанию.")
|
||
|
||
elif choice == '0':
|
||
print("Выход...")
|
||
break
|
||
|
||
if __name__ == "__main__":
|
||
main()
|