告别手动刷新!用Python+Watchdog为你的Emby Server打造一个自动影片推送机器人(附Docker一键部署)
用Python+Watchdog构建Emby Server智能推送系统:从监听文件到Docker化部署
每次下载新影片后手动刷新Emby Server的日子该结束了。作为媒体库管理员,我们都经历过这样的场景:下载了一部期待已久的大片,却因为忘记刷新媒体库而错过第一时间欣赏的机会。本文将带你用Python的Watchdog模块打造一个智能监控系统,结合Telegram Bot实现"入库即推送"的自动化体验,最后通过Docker容器化让部署变得轻而易举。
1. 系统架构设计与核心组件
构建一个高效的Emby Server自动推送系统需要考虑三个核心组件:文件监听、信息处理和消息推送。Watchdog模块负责监听文件系统变化,TMDB API提供影片元数据,而Telegram Bot则作为用户交互的桥梁。
关键组件对比表:
| 组件 | 作用 | 替代方案 | 优势 |
|---|---|---|---|
| Watchdog | 实时监控文件系统变化 | pyinotify (Linux only) | 跨平台支持,API简洁 |
| TMDB API | 获取影片元数据 | OMDb API | 数据全面,更新及时 |
| Telegram Bot | 推送通知 | Discord Webhook | 用户友好,功能丰富 |
在Python生态中,Watchdog提供了跨平台的文件系统事件监控能力。它基于操作系统底层API实现,在Linux上使用inotify,在macOS使用FSEvents,在Windows使用ReadDirectoryChangesW,确保了我们方案的可移植性。
from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler class EmbyHandler(FileSystemEventHandler): def on_created(self, event): if event.src_path.endswith('.nfo'): process_new_media(event.src_path)这段基础监听代码展示了系统的核心逻辑——当检测到新的.nfo文件创建时触发处理流程。Emby Server会自动为每个新入库的媒体文件生成这种XML格式的元数据文件。
2. 深度解析.nfo文件与TMDB数据整合
Emby生成的.nfo文件包含了媒体内容的基本信息,但要想获得更丰富的元数据和高质量的海报图片,我们需要结合TMDB的API进行补充查询。
典型的.nfo文件结构示例:
<movie> <title>盗梦空间</title> <originaltitle>Inception</originaltitle> <year>2010</year> <tmdbid>27205</tmdbid> <plot>一群特殊的盗贼能够潜入人们梦境...</plot> </movie>解析这个XML文件后,我们可以提取关键字段如tmdbid,然后用它查询TMDB获取更详细的信息:
def get_tmdb_details(tmdb_id, media_type): base_url = "https://api.themoviedb.org/3" headers = {"Authorization": f"Bearer {TMDB_API}"} if media_type == "movie": url = f"{base_url}/movie/{tmdb_id}" else: url = f"{base_url}/tv/{tmdb_id}" response = requests.get(url, headers=headers) return response.json()提示:TMDB API有请求频率限制,建议实现本地缓存机制避免重复查询相同内容
处理剧集类内容时逻辑会稍复杂,因为需要区分季(Season)和集(Episode)的信息。一个完整的实现应该能够处理以下情况:
- 新电影入库
- 新剧集季入库
- 剧集新一集入库
3. Telegram Bot集成与消息模板设计
Telegram Bot提供了极其灵活的消息格式支持,我们可以根据媒体类型设计不同的消息模板,让推送内容更具吸引力。
电影类消息模板示例:
def format_movie_message(movie_info): return f""" 🎬 *{movie_info['title']}* ({movie_info['year']}) ⭐ 评分: {movie_info['vote_average']}/10 | 📅 上映: {movie_info['release_date']} {movie_info['overview']} [查看海报]({movie_info['poster_url']}) """对于剧集类内容,可以包含更多季和集的信息:
def format_tv_message(tv_info): seasons = "\n".join([f"• 第{season['season_number']}季 ({season['episode_count']}集)" for season in tv_info['seasons']]) return f""" 📺 *{tv_info['name']}* 🔄 状态: {tv_info['status']} | 🌍 语言: {tv_info['original_language']} {seasons} {tv_info['overview']} """注意:Telegram的Markdown解析器有些特殊,确保正确转义特殊字符避免格式错误
为了提升用户体验,可以考虑添加交互按钮,让用户可以直接从推送消息执行一些操作:
- 标记为已观看
- 添加到收藏
- 立即播放(需要配合Emby API)
4. Docker化部署与生产环境优化
将整个系统Docker化可以解决环境依赖问题,并简化部署流程。基于Python的Alpine镜像可以大幅减小镜像体积,提升部署效率。
优化后的Dockerfile示例:
FROM python:3.10-alpine WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY . . RUN mkdir -p /var/tmp/emby-watchdog ENV PYTHONUNBUFFERED=1 CMD ["python", "main.py"]这个精简的Dockerfile相比原始版本减少了约70%的镜像体积。关键优化点包括:
- 使用Alpine基础镜像
--no-cache-dir避免缓存pip包- 清理不必要的构建中间文件
环境变量配置指南:
| 变量名 | 必需 | 示例值 | 说明 |
|---|---|---|---|
| BOT_TOKEN | 是 | 123456:ABC-DEF1234 | Telegram Bot Token |
| CHAT_ID | 是 | -100123456789 | 目标聊天ID |
| TMDB_API | 是 | abcdef1234567890 | TMDB API密钥 |
| MEDIA_PATH | 是 | /media/movies | 媒体库挂载路径 |
| LOG_LEVEL | 否 | INFO | 日志级别(DEBUG/INFO/WARNING) |
启动容器时,确保正确挂载媒体库目录:
docker run -d --name emby-watchdog \ -v /path/to/media:/media/movies \ -e BOT_TOKEN="your_token" \ -e CHAT_ID="your_chat_id" \ -e TMDB_API="your_tmdb_key" \ -e MEDIA_PATH="/media/movies" \ your-image-name对于长期运行的服务,建议添加以下配置:
- 日志轮转策略
- 健康检查端点
- 资源使用限制
- 自动重启策略
5. 高级功能扩展与实践建议
基础功能实现后,可以考虑添加一些增强功能来提升系统的实用性和可靠性。
可能的扩展方向:
- 多媒体库路径支持
- 自定义消息模板
- 用户偏好设置(过滤某些类型的通知)
- 失败重试机制
- 推送历史记录
一个实用的扩展是实现媒体类型过滤,让用户可以选择只接收特定类型的通知:
# 在环境变量中添加 FILTER_TYPES = os.getenv('FILTER_TYPES', 'movie,tv').lower().split(',') # 在处理逻辑中添加检查 if media_type not in FILTER_TYPES: logging.info(f"Skipping {media_type} as it's not in filtered types") return性能优化建议:
- 使用异步IO处理网络请求(如aiohttp)
- 实现本地缓存减少API调用
- 批量处理短时间内的大量文件变更
- 优化日志记录级别减少I/O压力
监控方面,可以集成Prometheus指标导出,跟踪:
- 文件处理数量
- API调用次数
- 推送成功率
- 处理延迟
from prometheus_client import Counter, Gauge processed_files = Counter('emby_processed_files', 'Number of processed files') push_errors = Counter('emby_push_errors', 'Number of push failures') processing_time = Gauge('emby_processing_time', 'Time taken to process a file')实际部署中可能会遇到各种边缘情况,比如:
- 临时文件导致的误报
- 网络波动导致的API失败
- 文件系统事件丢失
- 权限问题
针对这些问题,我的经验是增加足够的日志记录,并实现适当的重试逻辑。例如,对于TMDB API调用:
def safe_tmdb_request(url, max_retries=3): for attempt in range(max_retries): try: response = requests.get(url, timeout=10) response.raise_for_status() return response.json() except Exception as e: if attempt == max_retries - 1: raise time.sleep(2 ** attempt)媒体文件处理本身可能很耗时,特别是当需要下载高清海报时。可以考虑实现一个预热的机制,在系统启动时扫描媒体库,建立本地缓存,避免首次推送时等待时间过长。
