Kokoro TTS自动化脚本编写:批量处理大量文件的完整方案
Kokoro TTS自动化脚本编写:批量处理大量文件的完整方案
【免费下载链接】kokoro-ttsA CLI text-to-speech tool using the Kokoro model, supporting multiple languages, voices (with blending), and various input formats including EPUB books and PDF documents.项目地址: https://gitcode.com/gh_mirrors/ko/kokoro-tts
Kokoro TTS是一款功能强大的命令行文本转语音工具,支持多种语言、语音混合以及EPUB和PDF文档处理。对于需要处理大量文件的用户来说,手动操作每个文件既耗时又容易出错。本文将为您提供一套完整的Kokoro TTS自动化脚本编写方案,帮助您高效批量处理大量文件,节省宝贵时间。🚀
为什么需要自动化脚本?
当您需要处理数十甚至数百个文本文件、电子书或文档时,手动运行Kokoro TTS命令不仅繁琐,还容易出错。自动化脚本可以帮助您:
- 批量处理:一次性处理多个文件
- 统一配置:确保所有文件使用相同的语音、语速等参数
- 错误处理:自动处理失败的任务并重试
- 进度跟踪:实时监控处理进度
- 结果整理:自动组织输出文件结构
准备工作与环境配置
在开始编写自动化脚本之前,确保您已经正确安装了Kokoro TTS:
# 使用uv安装(推荐) uv tool install kokoro-tts # 或使用pip安装 pip install kokoro-tts下载必要的模型文件:
wget https://github.com/nazdridoy/kokoro-tts/releases/download/v1.0.0/voices-v1.0.bin wget https://github.com/nazdridoy/kokoro-tts/releases/download/v1.0.0/kokoro-v1.0.onnx基础批量处理脚本
1. 文本文件批量转换脚本
创建一个简单的Python脚本来自动处理多个文本文件:
import os import subprocess import sys def batch_process_text_files(input_dir, output_dir, voice="af_sarah", speed=1.0, lang="en-us"): """ 批量处理文本文件 :param input_dir: 输入目录路径 :param output_dir: 输出目录路径 :param voice: 语音选择 :param speed: 语速 :param lang: 语言代码 """ # 确保输出目录存在 os.makedirs(output_dir, exist_ok=True) # 获取所有txt文件 txt_files = [f for f in os.listdir(input_dir) if f.endswith('.txt')] print(f"找到 {len(txt_files)} 个文本文件需要处理") for i, txt_file in enumerate(txt_files, 1): input_path = os.path.join(input_dir, txt_file) output_name = os.path.splitext(txt_file)[0] + ".wav" output_path = os.path.join(output_dir, output_name) print(f"处理文件 {i}/{len(txt_files)}: {txt_file}") # 构建Kokoro TTS命令 cmd = [ "kokoro-tts", input_path, output_path, "--voice", voice, "--speed", str(speed), "--lang", lang ] try: # 执行命令 result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: print(f"✓ 成功处理: {txt_file}") else: print(f"✗ 处理失败: {txt_file}") print(f"错误信息: {result.stderr}") except Exception as e: print(f"✗ 执行错误: {txt_file} - {e}") if __name__ == "__main__": # 示例用法 batch_process_text_files( input_dir="./texts", output_dir="./audio_output", voice="af_sarah", speed=1.2, lang="en-us" )2. EPUB电子书批量处理脚本
对于电子书爱好者,这个脚本可以批量处理EPUB文件:
import os import subprocess import json def batch_process_epub_files(input_dir, output_base_dir, voice="af_sarah", speed=1.0, lang="en-us"): """ 批量处理EPUB文件,每个文件单独创建目录 :param input_dir: 输入目录路径 :param output_base_dir: 输出基础目录 :param voice: 语音选择 :param speed: 语速 :param lang: 语言代码 """ # 确保输出目录存在 os.makedirs(output_base_dir, exist_ok=True) # 获取所有epub文件 epub_files = [f for f in os.listdir(input_dir) if f.endswith('.epub')] print(f"找到 {len(epub_files)} 个EPUB文件需要处理") for i, epub_file in enumerate(epub_files, 1): input_path = os.path.join(input_dir, epub_file) book_name = os.path.splitext(epub_file)[0] output_dir = os.path.join(output_base_dir, book_name) print(f"处理电子书 {i}/{len(epub_files)}: {book_name}") # 构建Kokoro TTS命令(分章节输出) cmd = [ "kokoro-tts", input_path, "--split-output", output_dir, "--voice", voice, "--speed", str(speed), "--lang", lang, "--format", "mp3" ] try: # 执行命令 result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: print(f"✓ 成功处理: {book_name}") # 生成处理报告 generate_processing_report(output_dir, book_name) else: print(f"✗ 处理失败: {book_name}") print(f"错误信息: {result.stderr}") except Exception as e: print(f"✗ 执行错误: {book_name} - {e}") def generate_processing_report(output_dir, book_name): """生成处理报告""" report_path = os.path.join(output_dir, "processing_report.json") # 统计章节信息 chapters = [] for item in os.listdir(output_dir): if item.startswith("chapter_") and os.path.isdir(os.path.join(output_dir, item)): chapter_dir = os.path.join(output_dir, item) info_file = os.path.join(chapter_dir, "info.txt") chapter_info = { "chapter": item, "chunks": len([f for f in os.listdir(chapter_dir) if f.startswith("chunk_") and f.endswith(".mp3")]) } if os.path.exists(info_file): with open(info_file, 'r', encoding='utf-8') as f: chapter_info["title"] = f.read().strip() chapters.append(chapter_info) # 生成报告 report = { "book": book_name, "total_chapters": len(chapters), "chapters": chapters, "total_audio_files": sum(c["chunks"] for c in chapters) } with open(report_path, 'w', encoding='utf-8') as f: json.dump(report, f, indent=2, ensure_ascii=False) print(f"生成处理报告: {report_path}") if __name__ == "__main__": batch_process_epub_files( input_dir="./ebooks", output_base_dir="./audio_books", voice="af_sarah", speed=1.0, lang="en-us" )高级自动化脚本方案
3. 带错误重试和进度监控的脚本
import os import subprocess import time import logging from datetime import datetime from concurrent.futures import ThreadPoolExecutor, as_completed # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('kokoro_batch.log'), logging.StreamHandler() ] ) class KokoroBatchProcessor: def __init__(self, config): self.config = config self.success_count = 0 self.failed_count = 0 self.start_time = None def process_single_file(self, input_file, output_file, retry_count=3): """处理单个文件,支持重试""" cmd = [ "kokoro-tts", input_file, output_file, "--voice", self.config.get("voice", "af_sarah"), "--speed", str(self.config.get("speed", 1.0)), "--lang", self.config.get("lang", "en-us"), "--format", self.config.get("format", "wav") ] for attempt in range(retry_count): try: logging.info(f"尝试 {attempt+1}/{retry_count}: {input_file}") result = subprocess.run( cmd, capture_output=True, text=True, timeout=self.config.get("timeout", 300) # 5分钟超时 ) if result.returncode == 0: logging.info(f"✓ 成功处理: {input_file}") return True else: logging.warning(f"处理失败 (尝试 {attempt+1}): {input_file}") logging.warning(f"错误: {result.stderr}") if attempt < retry_count - 1: wait_time = 5 * (attempt + 1) # 指数退避 logging.info(f"等待 {wait_time}秒后重试...") time.sleep(wait_time) except subprocess.TimeoutExpired: logging.error(f"超时: {input_file}") except Exception as e: logging.error(f"异常: {input_file} - {e}") logging.error(f"✗ 处理失败,已重试{retry_count}次: {input_file}") return False def batch_process(self, file_list, max_workers=2): """批量处理文件,支持并发""" self.start_time = datetime.now() total_files = len(file_list) logging.info(f"开始批量处理 {total_files} 个文件") logging.info(f"配置: {self.config}") with ThreadPoolExecutor(max_workers=max_workers) as executor: futures = {} for input_file, output_file in file_list: future = executor.submit( self.process_single_file, input_file, output_file, self.config.get("retry_count", 3) ) futures[future] = (input_file, output_file) for future in as_completed(futures): input_file, output_file = futures[future] try: success = future.result() if success: self.success_count += 1 else: self.failed_count += 1 # 更新进度 processed = self.success_count + self.failed_count progress = processed / total_files * 100 logging.info(f"进度: {processed}/{total_files} ({progress:.1f}%)") except Exception as e: logging.error(f"任务异常: {input_file} - {e}") self.failed_count += 1 self.generate_summary_report() def generate_summary_report(self): """生成汇总报告""" end_time = datetime.now() duration = (end_time - self.start_time).total_seconds() report = { "start_time": self.start_time.isoformat(), "end_time": end_time.isoformat(), "duration_seconds": duration, "total_files": self.success_count + self.failed_count, "success_count": self.success_count, "failed_count": self.failed_count, "success_rate": self.success_count / (self.success_count + self.failed_count) * 100, "config": self.config } report_file = f"batch_report_{self.start_time.strftime('%Y%m%d_%H%M%S')}.json" import json with open(report_file, 'w', encoding='utf-8') as f: json.dump(report, f, indent=2, ensure_ascii=False) logging.info(f"批量处理完成!报告已保存到: {report_file}") logging.info(f"成功: {self.success_count}, 失败: {self.failed_count}") logging.info(f"总耗时: {duration:.1f}秒") # 使用示例 if __name__ == "__main__": # 配置文件 config = { "voice": "af_sarah", "speed": 1.2, "lang": "en-us", "format": "mp3", "retry_count": 3, "timeout": 600 # 10分钟超时 } # 准备文件列表 input_dir = "./documents" output_dir = "./audio_output" os.makedirs(output_dir, exist_ok=True) file_list = [] for filename in os.listdir(input_dir): if filename.endswith(('.txt', '.pdf')): input_path = os.path.join(input_dir, filename) output_name = os.path.splitext(filename)[0] + ".mp3" output_path = os.path.join(output_dir, output_name) file_list.append((input_path, output_path)) # 创建处理器并执行 processor = KokoroBatchProcessor(config) processor.batch_process(file_list, max_workers=2)实用技巧与最佳实践
4. 配置文件管理
创建一个配置文件来管理不同的处理方案:
# config.yaml profiles: english_audiobook: voice: "af_sarah" speed: 1.0 lang: "en-us" format: "mp3" output_dir: "./audiobooks/english" chinese_document: voice: "zf_xiaoxiao" speed: 1.1 lang: "cmn" format: "wav" output_dir: "./audiobooks/chinese" fast_processing: voice: "am_adam" speed: 1.5 lang: "en-us" format: "mp3" output_dir: "./quick_results"5. 监控脚本
创建一个实时监控脚本,跟踪处理进度:
import time import os from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler class AudioFileHandler(FileSystemEventHandler): def __init__(self, output_dir): self.output_dir = output_dir self.processed_files = set() def on_created(self, event): if not event.is_directory and event.src_path.endswith(('.wav', '.mp3')): filename = os.path.basename(event.src_path) if filename not in self.processed_files: self.processed_files.add(filename) print(f"✓ 新音频文件生成: {filename}") print(f" 路径: {event.src_path}") print(f" 大小: {os.path.getsize(event.src_path) / 1024 / 1024:.2f} MB") def monitor_directory(directory_to_watch): """监控目录中的新文件""" print(f"开始监控目录: {directory_to_watch}") print("按 Ctrl+C 停止监控") event_handler = AudioFileHandler(directory_to_watch) observer = Observer() observer.schedule(event_handler, directory_to_watch, recursive=True) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() print("\n监控已停止") observer.join() if __name__ == "__main__": monitor_directory("./audio_output")常见问题与解决方案
Q1: 如何处理内存不足的问题?
解决方案:
- 使用
--split-output参数分章节处理大型文件 - 增加Python内存限制:
export PYTHONMALLOC=malloc - 分批处理文件,不要一次性加载所有内容
Q2: 如何处理处理中断的情况?
解决方案:
- 使用带断点续传的脚本
- 检查已存在的输出文件,跳过已处理的部分
- 使用日志记录处理进度
Q3: 如何优化处理速度?
解决方案:
- 使用并发处理(但注意不要超过系统资源限制)
- 调整
--speed参数提高语速 - 使用更简单的语音模型(如果质量可接受)
Q4: 如何确保语音质量一致性?
解决方案:
- 使用相同的语音和参数配置
- 预处理文本,确保格式统一
- 使用
--debug模式检查处理细节
总结与建议
Kokoro TTS的自动化脚本编写可以显著提高批量文件处理的效率。以下是一些关键建议:
- 从简单开始:先实现基础功能,再逐步添加高级特性
- 做好错误处理:确保脚本能够优雅地处理各种异常情况
- 记录日志:详细的日志有助于调试和监控
- 测试充分:在小规模数据集上测试脚本,确保稳定后再处理重要文件
- 资源管理:监控系统资源使用,避免过度消耗内存或CPU
通过本文提供的完整方案,您可以轻松构建适合自己需求的Kokoro TTS自动化处理系统,无论是处理文档、电子书还是其他文本内容,都能高效完成批量转换任务。🎯
记住,自动化脚本的核心目标是让您专注于内容创作,而不是重复的技术操作。现在就开始编写您的第一个Kokoro TTS自动化脚本吧!
【免费下载链接】kokoro-ttsA CLI text-to-speech tool using the Kokoro model, supporting multiple languages, voices (with blending), and various input formats including EPUB books and PDF documents.项目地址: https://gitcode.com/gh_mirrors/ko/kokoro-tts
创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考
