From 5b79cfeb713cf8a5f55ad124e15f65a4bd20f088 Mon Sep 17 00:00:00 2001 From: Komisar Date: Mon, 23 Mar 2026 15:27:57 +0300 Subject: [PATCH] Initial commit: Qwen3-TTS Console Assistant implementation --- .gitignore | 36 +++++++ config.yaml | 38 +++++++ main.py | 266 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 9 ++ tts_engine.py | 243 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 592 insertions(+) create mode 100644 .gitignore create mode 100644 config.yaml create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 tts_engine.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..08ca9b4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,36 @@ +echo "# Python +__pycache__/ +*.py[cod] +*.so +*.egg-info/ +dist/ +build/ + +# Virtual Environment +venv/ +env/ +.venv/ + +# Models (Игнорируем тяжелые файлы моделей) +models/ +*.bin +*.safetensors +*.pt + +# Generated Data (История и сэмплы) +out/ +samples/ + +# IDE +.vscode/ +.idea/ +*.swp + +# OS +.DS_Store +Thumbs.db + +# Logs +*.log" > .gitignore + +info.txt diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..39a1068 --- /dev/null +++ b/config.yaml @@ -0,0 +1,38 @@ +# config.yaml +storage: + # Базовый путь для хранения моделей. + # Если папки не существует, она будет создана. + model_path: "./models" + + # Папка для записанных сэмплов голосов + sample_dir: "./samples" + + # Папка для результатов синтеза (история) + output_dir: "./out" + +models: + # Идентификаторы моделей. + # Логика: + # 1. Если путь абсолютный (начинается с / или C:/) -> используется он. + # 2. Иначе ищет в storage.model_path/. + # 3. Если не находит -> качает с HuggingFace в storage.model_path/. + base: "Qwen/Qwen3-TTS-12Hz-1.7B-Base" + voice_design: "Qwen/Qwen3-TTS-12Hz-1.7B-VoiceDesign" + custom_voice: "Qwen/Qwen3-TTS-12Hz-1.7B-CustomVoice" + +generation: + default_language: "Russian" + default_speaker: "Chelsie" + device: "auto" + dtype: "bfloat16" + +recording: + sample_rate: 16000 + channels: 1 + # Чувствительность тишины (0.0 - 1.0). + # Чем меньше число, тем тише звук считается тишиной. + silence_threshold: 0.015 + # Длительность тишины в секундах для автоматической остановки + silence_duration: 1.0 + # Минимальная длительность записи (защита от случайного клика) + min_duration: 2.0 diff --git a/main.py b/main.py new file mode 100644 index 0000000..4064ca3 --- /dev/null +++ b/main.py @@ -0,0 +1,266 @@ +import sys +import os +import time +import sounddevice as sd +import numpy as np +import yaml + +# Добавляем текущую директорию в путь для импорта +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +from tts_engine import TTSEngine + +# --- Функции записи и воспроизведения --- + +def get_rms(data): + """Вычисление среднеквадратичного уровня громкости""" + return np.sqrt(np.mean(data**2)) + +def record_sample_interactive(engine): + cfg = engine.config['recording'] + sr = cfg['sample_rate'] + channels = cfg['channels'] + + print("\n--- 🎙 Запись нового сэмпла ---") + name = input("Имя сэмпла (латиница, без пробелов, Enter для auto): ").strip() + if not name: name = f"speaker_{int(time.time())}" + + # Очистка имени от недопустимых символов + name = "".join(c for c in name if c.isalnum() or c in ('_', '-')).strip() + + text_prompt = input("Фраза для чтения (будет сохранена как текст сэмпла): ").strip() + if not text_prompt: + print("❌ Текст сэмпла обязателен для качественного клонирования.") + return None + + save_dir = os.path.join(engine.config['storage']['sample_dir'], name) + if os.path.exists(save_dir): + print(f"⚠️ Сэмпл с именем '{name}' уже существует. Перезапись.") + + print("\n🎤 НАЧАЛО ЗАПИСИ. Говорите сейчас!") + print(f"⏳ Запись остановится автоматически после паузы в {cfg['silence_duration']} сек.") + + frames = [] + silence_counter = 0 + start_time = time.time() + + try: + with sd.InputStream(samplerate=sr, channels=channels, dtype='float32') as stream: + while True: + data, overflowed = stream.read(1024) + if overflowed: + print("⚠️ Buffer overflow") + + frames.append(data.copy()) + + # Логика VAD (Voice Activity Detection) + rms = get_rms(data) + + # Если звук тихий + if rms < cfg['silence_threshold']: + silence_counter += len(data) / sr + else: + silence_counter = 0 # Сброс при наличии голоса + + # Авто-стоп + current_duration = time.time() - start_time + if silence_counter >= cfg['silence_duration'] and current_duration > cfg['min_duration']: + print("\n🛑 Тишина обнаружена. Остановка записи.") + break + + except KeyboardInterrupt: + print("\nЗапись прервана вручную.") + + # Обработка и сохранение + recording = np.concatenate(frames, axis=0) + + # Создаем папку и сохраняем + os.makedirs(save_dir, exist_ok=True) + + audio_path = os.path.join(save_dir, "audio.wav") + sf.write(audio_path, recording, sr) + + prompt_path = os.path.join(save_dir, "prompt.txt") + with open(prompt_path, 'w', encoding='utf-8') as f: + f.write(text_prompt) + + print(f"✅ Сэмпл сохранен: {save_dir}") + return save_dir + +def select_sample_ui(engine): + samples = engine.get_available_samples() + if not samples: + print("Нет сохраненных сэмплов. Сначала запишите один.") + return None + + print("\n--- 📂 Выберите сэмпл ---") + for i, s in enumerate(samples): + txt_preview = s['prompt'][:40] + "..." if len(s['prompt']) > 40 else s['prompt'] + print(f"[{i+1}] {s['name']} : \"{txt_preview}\"") + + try: + idx = int(input("Номер: ")) - 1 + if 0 <= idx < len(samples): + return samples[idx]['path'] + except ValueError: + pass + print("Неверный выбор.") + return None + +def select_audio_device(): + print("\n--- 🔊 Выбор устройства вывода ---") + devices = sd.query_devices() + output_devices = [] + + for i, dev in enumerate(devices): + if dev['max_output_channels'] > 0: + output_devices.append((i, dev['name'])) + + for idx, name in output_devices: + default_marker = " (DEFAULT)" if idx == sd.default.device[1] else "" + print(f"[{idx}] {name}{default_marker}") + + print("\nВведите ID устройства или Enter для использования по умолчанию.") + choice = input(">> ").strip() + + if choice.isdigit(): + return int(choice) + return None + +def play_audio(filepath, device_id=None): + try: + data, sr = sf.read(filepath, dtype='float32') + sd.play(data, sr, device=device_id) + print(f"▶️ Воспроизведение: {os.path.basename(filepath)}") + # sd.wait() # Раскомментировать, если нужно блокировать консоль до конца воспроизведения + except Exception as e: + print(f"❌ Ошибка воспроизведения: {e}") + +# --- Главное меню --- + +def main(): + print("Инициализация движка...") + try: + engine = TTSEngine("config.yaml") + except Exception as e: + print(f"Критическая ошибка инициализации: {e}") + return + + current_sample_path = None + output_device = None + + while True: + print("\n" + "="*40) + print(" QWEN3-TTS CONSOLE (Full Version)") + print("="*40) + print("1. 🎙 Управление сэмплами") + print("2. 🗣 Синтез речи") + print("3. 📁 История (Прослушивание/Чтение)") + print("4. ⚙️ Настройки") + print("0. Выход") + + choice = input(">> ").strip() + + if choice == '1': + print("\n--- Управление сэмплами ---") + print("1. Записать новый сэмпл") + print("2. Вырать существующий") + print("3. Сбросить текущий выбор") + sub = input(">> ").strip() + + if sub == '1': + path = record_sample_interactive(engine) + if path: current_sample_path = path + elif sub == '2': + path = select_sample_ui(engine) + if path: + current_sample_path = path + print(f"✅ Выбран сэмпл: {path}") + elif sub == '3': + current_sample_path = None + print("Сброшено. Будет использован голос по умолчанию.") + + elif choice == '2': + text = input("\nВведите текст: ").strip() + if not text: continue + + print("\nРежим синтеза:") + print(f"1. Клонирование (Сэмпл: {'Да' if current_sample_path else 'Нет'})") + print("2. Описание голоса (Voice Design)") + print("3. Стандартный голос") + mode = input(">> ").strip() + + try: + start_t = time.time() + wavs, sr = None, None + + if mode == '1': + if not current_sample_path: + print("❌ Ошибка: Сэмпл не выбран!") + continue + wavs, sr = engine.generate_with_sample(text, current_sample_path) + + elif mode == '2': + desc = input("Описание голоса (напр. 'Добрый женский голос'): ").strip() + if not desc: desc = "Neutral voice" + wavs, sr = engine.generate_with_description(text, desc) + + elif mode == '3': + wavs, sr = engine.generate_standard(text) + + if wavs is not None: + # Сохранение + saved_path = engine.save_result(text, wavs, sr) + elapsed = time.time() - start_t + print(f"\n✅ Успешно за {elapsed:.2f} сек.") + print(f"📁 Файл: {saved_path}") + + # Вопрос о воспроизведении + ans = input("Воспроизвести сейчас? (y/n): ").strip().lower() + if ans == 'y': + play_audio(saved_path, output_device) + + except Exception as e: + print(f"❌ Ошибка генерации: {e}") + + elif choice == '3': + history = engine.get_history() + if not history: + print("\n📂 Папка out пуста.") + continue + + print(f"\n--- 📂 История ({len(history)} файлов) ---") + # Выводим последние 10 + for i, item in enumerate(history[:10]): + print(f"[{i+1}] {item['filename']}") + print(f" Текст: {item['text'][:50]}...") + + if len(history) > 10: + print("... (показаны последние 10)") + + print("\nВведите номер для прослушивания/чтения или Enter.") + sel = input(">> ").strip() + if sel.isdigit(): + idx = int(sel) - 1 + # Ищем в полном списке, но отображаем 10 + # Для простоты берем индекс из отображаемого списка (0-9) + # Но правильнее из полного списка history + if 0 <= idx < len(history): + item = history[idx] + print(f"\n▶️ Файл: {item['filename']}") + print(f"📝 Текст:\n{item['text']}") + print("-" * 30) + play_audio(item['wav_path'], output_device) + + elif choice == '4': + output_device = select_audio_device() + if output_device: + print(f"Установлено устройство вывода: {sd.query_devices(output_device)['name']}") + else: + print("Используется системное устройство по умолчанию.") + + elif choice == '0': + print("Выход...") + break + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..4a1c13b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +torch +soundfile +sounddevice +numpy +pyyaml +huggingface_hub +transformers +accelerate +qwen-tts diff --git a/tts_engine.py b/tts_engine.py new file mode 100644 index 0000000..88951d6 --- /dev/null +++ b/tts_engine.py @@ -0,0 +1,243 @@ +import torch +import soundfile as sf +import yaml +import os +import time +import datetime +import numpy as np +from pathlib import Path +from typing import Optional, List, Dict, Tuple +from huggingface_hub import snapshot_download + +# --- +# Блок импорта модели. +# Если у вас установлен отдельный пакет qwen-tts: +try: + from qwen_tts import Qwen3TTSModel +except ImportError: + # Заглушка для тестирования без реальной модели (эмуляция) + print("WARNING: qwen_tts not found. Using Mock Model for testing.") + class Qwen3TTSModel: + @staticmethod + def from_pretrained(path, **kwargs): + print(f"[Mock] Loading model from {path}") + return Qwen3TTSModel() + + def create_voice_clone_prompt(self, **kwargs): + return "mock_prompt" + + def generate_voice_clone(self, text, **kwargs): + print(f"[Mock] Generating voice clone for: {text[:30]}...") + # Возвращаем пустой массив нужной размерности + sr = 24000 + duration = len(text) * 0.1 + return np.random.rand(1, int(sr * duration)).astype(np.float32), sr + + def generate_custom_voice(self, text, **kwargs): + return self.generate_voice_clone(text, **kwargs) + + def generate_voice_design(self, text, **kwargs): + return self.generate_voice_clone(text, **kwargs) +# --- + +class TTSEngine: + def __init__(self, config_path: str = "config.yaml"): + with open(config_path, 'r', encoding='utf-8') as f: + self.config = yaml.safe_load(f) + + self.models = {} + try: + self.dtype = getattr(torch, self.config['generation']['dtype']) + except AttributeError: + self.dtype = torch.float32 + + # Инициализация папок + Path(self.config['storage']['model_path']).mkdir(parents=True, exist_ok=True) + Path(self.config['storage']['sample_dir']).mkdir(parents=True, exist_ok=True) + Path(self.config['storage']['output_dir']).mkdir(parents=True, exist_ok=True) + + def _resolve_model(self, model_key: str) -> str: + """ + Умная загрузка моделей: + 1. Абсолютный путь -> использовать его. + 2. Локальный путь внутри model_path -> использовать. + 3. Скачать с HF в model_path. + """ + model_cfg_value = self.config['models'][model_key] + base_model_path = self.config['storage']['model_path'] + + # 1. Если это абсолютный путь или файл уже существует по этому пути + if os.path.isabs(model_cfg_value) or os.path.exists(model_cfg_value): + print(f"📂 Model [{model_key}]: Using direct path {model_cfg_value}") + return model_cfg_value + + # 2. Формируем путь внутри хранилища + # Используем имя репозития как имя папки (замена / на _ если нужно, или сохранение структуры) + folder_name = model_cfg_value.split('/')[-1] + local_path = os.path.join(base_model_path, folder_name) + + if os.path.exists(local_path) and os.listdir(local_path): + print(f"📂 Model [{model_key}]: Found locally at {local_path}") + return local_path + + # 3. Скачивание с Hugging Face + print(f"⬇️ Model [{model_key}]: Not found. Downloading from HF to {local_path}...") + try: + snapshot_download( + repo_id=model_cfg_value, + local_dir=local_path, + local_dir_use_symlinks=False + ) + print(f"✅ Model [{model_key}]: Downloaded.") + return local_path + except Exception as e: + print(f"❌ Error downloading model {model_cfg_value}: {e}") + raise RuntimeError(f"Failed to load model {model_key}") + + def _get_model(self, model_type: str): + if model_type not in self.models: + model_path = self._resolve_model(model_type) + print(f"🚀 Loading model [{model_type}] into memory...") + self.models[model_type] = Qwen3TTSModel.from_pretrained( + model_path, + device_map=self.config['generation']['device'], + torch_dtype=self.dtype + ) + return self.models[model_type] + + def get_available_samples(self) -> List[Dict[str, str]]: + samples = [] + sample_dir = self.config['storage']['sample_dir'] + if not os.path.exists(sample_dir): return samples + + for name in sorted(os.listdir(sample_dir)): + full_path = os.path.join(sample_dir, name) + if os.path.isdir(full_path): + audio_path = os.path.join(full_path, "audio.wav") + prompt_path = os.path.join(full_path, "prompt.txt") + if os.path.exists(audio_path): + prompt = "" + if os.path.exists(prompt_path): + with open(prompt_path, 'r', encoding='utf-8') as f: + prompt = f.read().strip() + samples.append({ + "name": name, + "path": full_path, + "prompt": prompt + }) + return samples + + def generate_with_sample(self, text: str, sample_path: str) -> Tuple[np.ndarray, int]: + """Режим 1: Клонирование по сэмплу""" + model = self._get_model('base') + + audio_file = os.path.join(sample_path, "audio.wav") + prompt_file = os.path.join(sample_path, "prompt.txt") + + ref_text = None + if os.path.exists(prompt_file): + with open(prompt_file, 'r', encoding='utf-8') as f: + ref_text = f.read().strip() + + print(f"🎤 Cloning voice from: {sample_path}") + + # Создаем промпт клонирования + prompt = model.create_voice_clone_prompt( + ref_audio=audio_file, + ref_text=ref_text + ) + + wavs, sr = model.generate_voice_clone( + text=text, + language=self.config['generation']['default_language'], + voice_clone_prompt=prompt + ) + return wavs, sr + + def generate_with_description(self, text: str, description: str) -> Tuple[np.ndarray, int]: + """Режим 2: Генерация голоса по описанию (Design -> Clone)""" + print(f"🎨 Designing voice: '{description}'") + + # Шаг А: Генерируем референс через VoiceDesign + vd_model = self._get_model('voice_design') + ref_text = text[:100] if len(text) > 100 else text + + # Генерируем сэмпл для будущего клонирования + ref_wavs, ref_sr = vd_model.generate_voice_design( + text=ref_text, + language=self.config['generation']['default_language'], + instruct=description + ) + + # Шаг Б: Клонируем этот сгенерированный голос через Base модель + base_model = self._get_model('base') + + # Передаем tuple (numpy_array, sr) как ref_audio + prompt = base_model.create_voice_clone_prompt( + ref_audio=(ref_wavs[0], ref_sr), + ref_text=ref_text + ) + + wavs, sr = base_model.generate_voice_clone( + text=text, + language=self.config['generation']['default_language'], + voice_clone_prompt=prompt + ) + return wavs, sr + + def generate_standard(self, text: str, speaker: str = None) -> Tuple[np.ndarray, int]: + """Режим 3: Стандартный голос""" + model = self._get_model('custom_voice') + speaker = speaker or self.config['generation']['default_speaker'] + + print(f"🗣️ Using built-in speaker: {speaker}") + wavs, sr = model.generate_custom_voice( + text=text, + language=self.config['generation']['default_language'], + speaker=speaker + ) + return wavs, sr + + def save_result(self, text: str, wavs: np.ndarray, sr: int) -> str: + """Сохраняет WAV и TXT в папку out""" + out_dir = self.config['storage']['output_dir'] + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"speech_{timestamp}" + + wav_path = os.path.join(out_dir, f"{filename}.wav") + txt_path = os.path.join(out_dir, f"{filename}.txt") + + # Сохраняем аудио + sf.write(wav_path, wavs[0], sr) + + # Сохраняем текст + with open(txt_path, 'w', encoding='utf-8') as f: + f.write(text) + + return wav_path + + def get_history(self) -> List[Dict[str, str]]: + """Возвращает список сгенерированных файлов""" + out_dir = self.config['storage']['output_dir'] + history = [] + if not os.path.exists(out_dir): return history + + files = sorted(os.listdir(out_dir), reverse=True) + for f in files: + if f.endswith(".wav"): + base_name = f[:-4] + txt_path = os.path.join(out_dir, f"{base_name}.txt") + wav_path = os.path.join(out_dir, f) + + text_content = "(Текст не найден)" + if os.path.exists(txt_path): + with open(txt_path, 'r', encoding='utf-8') as file: + text_content = file.read() + + history.append({ + "filename": f, + "wav_path": wav_path, + "txt_path": txt_path, + "text": text_content + }) + return history