当前位置: 首页 > news >正文

手把手教你用Flask搭个视频中转站:爬取m3u8流,本地/Cloudflare R2双备份实战

Flask视频中转站实战:从m3u8爬取到双备份存储的完整方案

在数字内容爆炸式增长的今天,视频资源的管理和存储成为了许多独立开发者和内容创作者面临的挑战。想象一下这样的场景:你是一位知识付费内容创作者,课程视频分散在多个平台;或者你是一位自媒体运营者,需要整理来自不同渠道的素材。这些视频往往以m3u8流媒体形式存在,难以统一管理。本文将带你从零开始构建一个基于Flask的视频中转站,实现m3u8视频的自动爬取、本地存储与Cloudflare R2云存储的双备份方案。

1. 环境准备与基础架构设计

在开始编码之前,我们需要规划好整个系统的架构。一个健壮的视频中转站应该包含以下几个核心模块:

  • 视频爬取模块:负责解析m3u8文件并下载ts分片
  • 本地存储模块:将视频文件保存到服务器本地磁盘
  • 云存储模块:将视频同步上传到Cloudflare R2
  • 任务管理模块:记录爬取状态和存储位置
  • Web接口模块:提供API供外部调用

1.1 安装必要的Python包

首先创建一个新的Python虚拟环境,然后安装以下依赖:

pip install flask flask-sqlalchemy pymysql boto3 requests beautifulsoup4 python-dotenv

这些包各自的作用如下:

  • flask:我们的Web框架核心
  • flask-sqlalchemy:ORM工具,用于数据库操作
  • pymysql:MySQL数据库驱动
  • boto3:AWS SDK,用于与Cloudflare R2交互
  • requests:HTTP请求库,用于下载视频
  • beautifulsoup4:HTML解析,用于提取视频链接
  • python-dotenv:管理环境变量

1.2 数据库设计

我们使用MySQL来存储爬取任务和配置信息。创建两个主要表:

CREATE TABLE video_tasks ( id INT AUTO_INCREMENT PRIMARY KEY, source_url VARCHAR(512) NOT NULL, status ENUM('pending', 'downloading', 'completed', 'failed') DEFAULT 'pending', local_path VARCHAR(512), cloud_path VARCHAR(512), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); CREATE TABLE system_config ( id INT AUTO_INCREMENT PRIMARY KEY, config_key VARCHAR(255) NOT NULL UNIQUE, config_value TEXT, description VARCHAR(512) );

2. Flask应用核心实现

2.1 应用初始化与配置

创建一个app.py文件作为应用入口:

from flask import Flask from flask_sqlalchemy import SQLAlchemy from dotenv import load_dotenv import os load_dotenv() app = Flask(__name__) app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DB_URI') app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False db = SQLAlchemy(app) # 导入模型和路由 from models import VideoTask, SystemConfig from routes import api_blueprint app.register_blueprint(api_blueprint, url_prefix='/api')

2.2 视频下载与处理核心逻辑

创建一个video_processor.py文件处理视频下载:

import os import requests import tempfile from concurrent.futures import ThreadPoolExecutor from urllib.parse import urlparse class VideoProcessor: def __init__(self, max_workers=5): self.executor = ThreadPoolExecutor(max_workers=max_workers) def download_m3u8(self, m3u8_url, output_dir, headers=None): """下载m3u8文件及其所有ts分片""" try: # 下载m3u8文件 response = requests.get(m3u8_url, headers=headers) response.raise_for_status() # 解析m3u8内容 content = response.text lines = content.split('\n') ts_urls = [line.strip() for line in lines if line.endswith('.ts')] # 创建本地目录 os.makedirs(output_dir, exist_ok=True) # 保存m3u8文件 m3u8_path = os.path.join(output_dir, 'index.m3u8') with open(m3u8_path, 'w') as f: f.write(content) # 并行下载所有ts文件 futures = [] for ts_url in ts_urls: if not ts_url.startswith('http'): base_url = m3u8_url.rsplit('/', 1)[0] ts_url = f"{base_url}/{ts_url}" future = self.executor.submit( self.download_ts, ts_url, output_dir, headers ) futures.append(future) # 等待所有下载完成 for future in futures: future.result() return True, m3u8_path except Exception as e: return False, str(e) def download_ts(self, ts_url, output_dir, headers=None): """下载单个ts文件""" try: response = requests.get(ts_url, headers=headers) response.raise_for_status() filename = os.path.basename(urlparse(ts_url).path) filepath = os.path.join(output_dir, filename) with open(filepath, 'wb') as f: f.write(response.content) return True, filepath except Exception as e: return False, str(e)

3. Cloudflare R2集成

3.1 R2配置与初始化

.env文件中添加R2配置:

R2_ENDPOINT_URL=https://your-account-id.r2.cloudflarestorage.com R2_ACCESS_KEY_ID=your-access-key R2_SECRET_ACCESS_KEY=your-secret-key R2_BUCKET_NAME=your-bucket-name

创建r2_client.py处理与Cloudflare R2的交互:

import boto3 from botocore.exceptions import ClientError import os from dotenv import load_dotenv load_dotenv() class R2Client: def __init__(self): self.client = boto3.client( 's3', endpoint_url=os.getenv('R2_ENDPOINT_URL'), aws_access_key_id=os.getenv('R2_ACCESS_KEY_ID'), aws_secret_access_key=os.getenv('R2_SECRET_ACCESS_KEY') ) self.bucket_name = os.getenv('R2_BUCKET_NAME') def upload_file(self, file_path, object_name=None): """上传文件到R2""" if object_name is None: object_name = os.path.basename(file_path) try: self.client.upload_file(file_path, self.bucket_name, object_name) return True, f"r2://{self.bucket_name}/{object_name}" except ClientError as e: return False, str(e) def upload_directory(self, local_dir, r2_prefix=''): """上传整个目录到R2""" results = [] for root, _, files in os.walk(local_dir): for file in files: local_path = os.path.join(root, file) relative_path = os.path.relpath(local_path, local_dir) r2_path = os.path.join(r2_prefix, relative_path).replace('\\', '/') success, message = self.upload_file(local_path, r2_path) results.append({ 'file': file, 'success': success, 'message': message }) return results

3.2 分片上传优化

对于大文件上传,我们可以使用分片上传提高可靠性:

def multipart_upload(self, file_path, object_name=None, part_size=8*1024*1024): """分片上传大文件到R2""" if object_name is None: object_name = os.path.basename(file_path) try: # 初始化分片上传 mpu = self.client.create_multipart_upload( Bucket=self.bucket_name, Key=object_name ) mpu_id = mpu['UploadId'] # 分片上传 parts = [] with open(file_path, 'rb') as f: i = 1 while True: data = f.read(part_size) if not data: break part = self.client.upload_part( Bucket=self.bucket_name, Key=object_name, PartNumber=i, UploadId=mpu_id, Body=data ) parts.append({ 'PartNumber': i, 'ETag': part['ETag'] }) i += 1 # 完成分片上传 result = self.client.complete_multipart_upload( Bucket=self.bucket_name, Key=object_name, UploadId=mpu_id, MultipartUpload={'Parts': parts} ) return True, result['Location'] except Exception as e: # 出错时中止上传 self.client.abort_multipart_upload( Bucket=self.bucket_name, Key=object_name, UploadId=mpu_id ) return False, str(e)

4. 系统监控与错误处理

4.1 日志记录配置

良好的日志系统对于运维至关重要。在Flask中配置日志:

import logging from logging.handlers import RotatingFileHandler def setup_logging(app): # 禁用Flask默认的日志处理器 app.logger.handlers.clear() # 设置日志格式 formatter = logging.Formatter( '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]' ) # 文件日志 - 按大小轮转 file_handler = RotatingFileHandler( 'video_transfer.log', maxBytes=1024*1024*10, # 10MB backupCount=5 ) file_handler.setFormatter(formatter) file_handler.setLevel(logging.INFO) app.logger.addHandler(file_handler) # 控制台日志 console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) console_handler.setLevel(logging.DEBUG) app.logger.addHandler(console_handler) app.logger.setLevel(logging.DEBUG) app.logger.info('Video Transfer logging initialized')

4.2 错误处理中间件

创建错误处理中间件统一处理异常:

from functools import wraps from flask import jsonify def handle_errors(f): @wraps(f) def wrapper(*args, **kwargs): try: return f(*args, **kwargs) except requests.exceptions.RequestException as e: app.logger.error(f"Request error: {str(e)}") return jsonify({ 'status': 'error', 'message': f'Network error: {str(e)}' }), 500 except Exception as e: app.logger.error(f"Unexpected error: {str(e)}") return jsonify({ 'status': 'error', 'message': f'Internal server error: {str(e)}' }), 500 return wrapper

4.3 任务状态监控

我们可以创建一个简单的监控端点来检查系统状态:

@app.route('/api/status') def system_status(): # 获取数据库状态 db_status = 'ok' try: db.session.execute('SELECT 1') except Exception as e: db_status = str(e) # 获取存储空间状态 local_storage = { 'total': os.statvfs('/').f_blocks * os.statvfs('/').f_frsize, 'used': (os.statvfs('/').f_blocks - os.statvfs('/').f_bfree) * os.statvfs('/').f_frsize } # 获取任务统计 stats = { 'pending': VideoTask.query.filter_by(status='pending').count(), 'downloading': VideoTask.query.filter_by(status='downloading').count(), 'completed': VideoTask.query.filter_by(status='completed').count(), 'failed': VideoTask.query.filter_by(status='failed').count() } return jsonify({ 'database': db_status, 'storage': local_storage, 'tasks': stats })

5. 部署与性能优化

5.1 生产环境部署

对于生产环境,我们推荐使用Gunicorn作为WSGI服务器,配合Nginx作为反向代理:

pip install gunicorn

创建gunicorn.conf.py配置文件:

workers = 4 worker_class = 'gevent' bind = '0.0.0.0:8000' accesslog = '-' errorlog = '-' timeout = 120 keepalive = 5

Nginx配置示例:

server { listen 80; server_name yourdomain.com; location / { proxy_pass http://127.0.0.1:8000; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; } location /static { alias /path/to/your/static/files; } }

5.2 性能优化技巧

  1. 连接池优化
from sqlalchemy.pool import QueuePool app.config['SQLALCHEMY_ENGINE_OPTIONS'] = { 'poolclass': QueuePool, 'pool_size': 10, 'max_overflow': 20, 'pool_timeout': 30, 'pool_recycle': 3600 }
  1. 缓存常用配置
from flask_caching import Cache cache = Cache(config={ 'CACHE_TYPE': 'SimpleCache', 'CACHE_DEFAULT_TIMEOUT': 300 }) cache.init_app(app) @cache.memoize(timeout=60) def get_config_value(key): config = SystemConfig.query.filter_by(config_key=key).first() return config.config_value if config else None
  1. 异步任务处理

对于长时间运行的任务,可以使用Celery进行异步处理:

from celery import Celery def make_celery(app): celery = Celery( app.import_name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'] ) celery.conf.update(app.config) class ContextTask(celery.Task): def __call__(self, *args, **kwargs): with app.app_context(): return self.run(*args, **kwargs) celery.Task = ContextTask return celery app.config.update( CELERY_BROKER_URL='redis://localhost:6379/0', CELERY_RESULT_BACKEND='redis://localhost:6379/0' ) celery = make_celery(app) @celery.task(bind=True) def process_video_task(self, task_id): task = VideoTask.query.get(task_id) if not task: return {'status': 'error', 'message': 'Task not found'} try: task.status = 'downloading' db.session.commit() processor = VideoProcessor() success, result = processor.download_m3u8( task.source_url, f"videos/{task.id}" ) if success: # 上传到R2 r2 = R2Client() upload_results = r2.upload_directory( f"videos/{task.id}", f"videos/{task.id}" ) task.status = 'completed' task.local_path = result task.cloud_path = upload_results[0]['message'] # 取第一个文件的上传路径 db.session.commit() return {'status': 'success', 'task_id': task.id} else: task.status = 'failed' db.session.commit() return {'status': 'error', 'message': result} except Exception as e: task.status = 'failed' db.session.commit() return {'status': 'error', 'message': str(e)}
http://www.jsqmd.com/news/920319/

相关文章:

  • 不止于上报:用移远EC800M+QuecPython玩转MQTT双向通信(订阅/发布详解)
  • 别再死记硬背了!用Pikachu靶场实战,手把手教你理解XSS攻击的5种触发方式
  • 从零搭建一个AIoT小项目:用IMX6ULL和WS2812B灯带玩转智能环境感知
  • 2026实验室装修技术指南:大型写字楼装修、实验室装修、无尘车间装修、净化厂房装修、办公室装修、办公室设计、办公楼装修选择指南 - 优质品牌商家
  • ZYNQ7100实战:用AXI DMA把PL端ADC数据高速灌进PS DDR(Vivado 2017.4配置详解)
  • MySQL 5.7.44 安装后必做的5件事:从修改root密码到避免常见连接错误
  • 别再只会用默认参数了!MATLAB medfilt2滤波核大小[m n]和padopt参数实战避坑指南
  • QMCDecode终极指南:如何快速将QQ音乐加密格式转换为通用音频文件
  • 华为S5720/S6720交换机配置备份与恢复实操:FTP、TFTP、SFTP到底怎么选?
  • 从一次充电故障说起:我是如何通过分析USB PD消息头(Message Header)定位和解决握手问题的
  • Lindy安全响应自动化能力评估模型(Gartner未公开的7维成熟度框架)
  • 告别卡顿!实测最有效的CLion虚拟机参数调优与内存分配方案(Ubuntu环境)
  • 别再只盯着功放了!拆解TDA7294芯片,看它如何在400Hz精密电源里扮演‘稳压放大’核心角色
  • 2026年4月养老院软件系统诚信之选:智能化养老设备/最近养老院/养老管理系统/养老院平台运营/养老院护理系统/选择指南 - 优质品牌商家
  • RTMDet数据增强的‘缓存’黑科技:如何用CachedMosaic和MixUp让你的目标检测训练快起来
  • 别再手动写RAM了!Vivado里这个Distributed Memory Generator IP核,5分钟搞定ROM/RAM配置
  • 多智能体协作框架对比:LangGraph、AutoGen、CrewAI 的取舍维度
  • 告别手动抠图!用Labelme的AI-Polygon功能快速分割图像(Python 3.8 + Windows保姆级教程)
  • 保姆级教程:在Windows 10/11上手动配置MySQL 5.7.44的my.ini和环境变量
  • 手把手教你用Docker Compose一键部署WVP-PRO+ZLM+录像服务(含Nginx反代)
  • ThinkPad X1 Carbon相机罢工?别急着重装驱动,先试试这个‘暂停更新’大法(附0x80070103错误解决)
  • 从石英振荡到TDA7294功放:深入拆解一个400Hz中频电源的每个模块(含稳压电路设计)
  • 深入Xilinx AXI UART 16550 IP核:从16550历史到FIFO中断机制的底层逻辑全解析
  • 别再只盯着原理图了!400Hz电源设计中TDA7294功放芯片的实战选型与散热避坑指南
  • 【AI Daily】AI日报 | 2026-05-30
  • 【Lindy函数计算自动化白皮书】:基于17个行业真实案例,验证MTBF提升3.8倍的关键公式
  • 别再用MNIST了!用路透社数据集实战多分类,解决新闻主题自动归类问题
  • Zotero Style:让文献管理变得直观高效的智能插件
  • 告别手动点点点!用Auto.js脚本一键直达抖音直播间和用户主页(附完整Scheme清单)
  • 毕业设计救星:手把手教你用单片机+AD采集搞定400Hz中频电源(附完整电路图)