用Flask和Python搞定m3u8视频下载与备份:本地存储+Cloudflare R2上传保姆级教程
Flask与Python构建m3u8视频自动化处理系统:从本地存储到Cloudflare R2的工程实践
在数字内容爆炸式增长的时代,视频资源的获取与存储已成为开发者经常需要处理的任务之一。m3u8作为HTTP Live Streaming(HLS)协议的核心播放列表格式,广泛应用于各类视频平台。本文将深入探讨如何利用Flask框架构建一个完整的m3u8视频处理系统,实现从解析、下载到本地存储和Cloudflare R2云存储上传的全流程自动化。
1. 系统架构设计与核心组件
一个健壮的m3u8视频处理系统需要考虑多个关键环节,包括网络请求管理、文件分片处理、错误恢复机制以及存储策略等。以下是系统的主要架构组件:
- 前端接口层:基于Flask构建RESTful API,提供任务提交、状态查询等接口
- 任务调度中心:负责协调下载任务的分发与状态跟踪
- m3u8解析引擎:处理m3u8文件解析与ts分片URL提取
- 下载工作器:执行实际的文件下载操作
- 存储管理层:协调本地存储与云存储的上传操作
- 配置中心:集中管理系统配置与敏感信息
# Example skeleton of the system's core class
from queue import Queue


class VideoProcessingSystem:
    """Top-level container wiring the task queue, workers and storage backends.

    Args:
        local_storage: Backend stored under the ``'local'`` key. When omitted,
            a ``LocalStorage()`` with default settings is created.
        r2_storage: Backend stored under the ``'r2'`` key. ``R2Storage``
            requires an endpoint, credentials and a bucket name, so it cannot
            be constructed without arguments; pass a configured instance, or
            omit it to run with local storage only (no ``'r2'`` key is added).
    """

    def __init__(self, local_storage=None, r2_storage=None):
        self.task_queue = Queue()
        self.download_workers = []
        self.storage_backends = {
            'local': LocalStorage() if local_storage is None else local_storage,
        }
        if r2_storage is not None:
            self.storage_backends['r2'] = r2_storage


# 2. Flask应用工程化配置
构建生产级Flask应用需要关注配置管理、数据库集成和错误处理等关键方面。我们采用工厂模式创建应用,实现更好的可测试性和模块化。
2.1 应用工厂与配置管理
from flask import Flask

from config import Config


def create_app(config_class=Config):
    """Application factory: build and return a configured Flask app.

    Args:
        config_class: Object whose attributes are loaded into ``app.config``.

    Returns:
        The Flask application with extensions initialised and the video
        blueprint registered.
    """
    flask_app = Flask(__name__)
    flask_app.config.from_object(config_class)

    # Extensions are imported lazily, inside the factory.
    from extensions import cors, db
    for extension in (db, cors):
        extension.init_app(flask_app)

    # Register the API blueprint.
    from api import video_bp
    flask_app.register_blueprint(video_bp)

    return flask_app


# 2.2 数据库模型设计
使用SQLAlchemy ORM定义系统数据模型,包括任务状态跟踪和配置存储:
from datetime import datetime

from extensions import db


class DownloadTask(db.Model):
    """Database record tracking one m3u8 download job."""

    id = db.Column(db.Integer, primary_key=True)
    task_id = db.Column(db.String(64), unique=True, index=True)
    m3u8_url = db.Column(db.String(512))
    status = db.Column(db.String(32), default='pending')
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    completed_at = db.Column(db.DateTime)
    storage_path = db.Column(db.String(512))

    def to_dict(self):
        """Return the JSON-serialisable fields exposed by the API.

        ``created_at`` is filled in by the column default when the row is
        inserted, so it is still ``None`` on an instance that has not been
        flushed; in that case ``None`` is returned instead of raising
        ``AttributeError``.
        """
        created_at = self.created_at
        return {
            'task_id': self.task_id,
            'status': self.status,
            'created_at': created_at.isoformat() if created_at is not None else None,
            'storage_path': self.storage_path,
        }


# 3. m3u8处理核心逻辑实现
3.1 m3u8解析与ts分片下载
高效解析m3u8文件并处理ts分片下载需要考虑网络异常、重试机制和并发控制:
import time
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlsplit

import requests


class M3U8ParseError(Exception):
    """Raised when an m3u8 playlist cannot be fetched."""


class M3U8Processor:
    """Fetch m3u8 playlists and download the segments they list."""

    def __init__(self, max_workers=5):
        # One shared session so connections are reused across segment requests.
        self.session = requests.Session()
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def parse_m3u8(self, m3u8_url):
        """Return the non-comment entries of the playlist at ``m3u8_url``.

        Lines that are blank or start with ``#`` (tags/comments) are dropped;
        surrounding whitespace is stripped from the remaining entries.

        Raises:
            M3U8ParseError: If the HTTP request fails or returns an error status.
        """
        try:
            response = self.session.get(m3u8_url, timeout=10)
            response.raise_for_status()
        except requests.RequestException as e:
            raise M3U8ParseError(f"Failed to parse m3u8: {str(e)}") from e

        entries = (line.strip() for line in response.text.splitlines())
        return [entry for entry in entries if entry and not entry.startswith('#')]

    def download_ts(self, base_url, ts_url, retries=3):
        """Download one segment, retrying with exponential backoff.

        Args:
            base_url: Prefix prepended to relative segment entries.
            ts_url: Playlist entry; used as-is when it is already an absolute
                http(s) URL, otherwise appended to ``base_url``.
            retries: Maximum number of attempts (must be >= 1).

        Returns:
            Tuple ``(ts_url, content_bytes)``.

        Raises:
            ValueError: If ``retries`` is less than 1.
            requests.RequestException: If the final attempt fails.
        """
        if retries < 1:
            raise ValueError("retries must be at least 1")

        # Playlists may list absolute URLs; prefixing those would corrupt them.
        if urlsplit(ts_url).scheme in ('http', 'https'):
            full_url = ts_url
        else:
            full_url = f"{base_url}{ts_url}"

        for attempt in range(retries):
            try:
                response = self.session.get(full_url, timeout=30)
                response.raise_for_status()
                return ts_url, response.content
            except requests.RequestException:
                if attempt == retries - 1:
                    raise
                time.sleep(2 ** attempt)  # back off 1s, 2s, 4s, ...


# 3.2 分片合并与完整性校验
下载完成后,需要对分片进行合并和校验:
import hashlib
import os
import re
import shutil
from pathlib import Path


def _natural_sort_key(path):
    """Sort key that orders embedded numbers numerically (seg2 before seg10)."""
    parts = re.split(r'(\d+)', str(path))
    # re.split with a capturing group alternates text, digits, text, ...;
    # odd positions are therefore always the digit runs.
    return [int(part) if index % 2 else part for index, part in enumerate(parts)]


def merge_ts_files(ts_files, output_path):
    """Concatenate segment files into ``output_path`` in natural name order.

    Args:
        ts_files: Iterable of segment file paths.
        output_path: Destination file; overwritten if it exists.

    Returns:
        ``output_path``.

    Raises:
        ValueError: If the merged file is empty.
    """
    with open(output_path, 'wb') as output:
        for ts_file in sorted(ts_files, key=_natural_sort_key):
            with open(ts_file, 'rb') as segment:
                # Stream the copy so a large segment is never held fully in memory.
                shutil.copyfileobj(segment, output)

    # Sanity check on the merged result
    if os.path.getsize(output_path) == 0:
        raise ValueError("合并后的文件大小为0,可能下载失败")
    return output_path


# 4. 存储策略实现:本地与Cloudflare R2
4.1 本地存储管理
实现带目录结构和文件管理的本地存储方案:
import os
from datetime import datetime


class LocalStorage:
    """Store files on the local filesystem beneath a single base directory."""

    def __init__(self, base_dir='videos'):
        self.base_dir = base_dir
        os.makedirs(base_dir, exist_ok=True)

    def save(self, content, filename, subdir=None):
        """Write ``content`` to ``base_dir[/subdir]/filename``.

        ``filename`` may contain sub-directories (playlist entries often do);
        missing parent directories are created.

        Args:
            content: Bytes to write.
            filename: Relative file name under the target directory.
            subdir: Optional sub-directory of ``base_dir``.

        Returns:
            The path the file was written to.

        Raises:
            ValueError: If the resulting path would fall outside ``base_dir``
                (for example via ``..`` components or an absolute name).
        """
        save_dir = os.path.join(self.base_dir, subdir) if subdir else self.base_dir
        filepath = os.path.join(save_dir, filename)

        # File names come from remote playlists, so refuse path traversal.
        root = os.path.abspath(self.base_dir)
        target = os.path.abspath(filepath)
        if os.path.commonpath([root, target]) != root:
            raise ValueError(
                f"Refusing to write outside of {self.base_dir!r}: {filename!r}"
            )

        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(filepath, 'wb') as f:
            f.write(content)
        return filepath

    def cleanup_old_files(self, days=30):
        """Delete every file under ``base_dir`` last modified over ``days`` ago."""
        cutoff = datetime.now().timestamp() - days * 86400
        for root, _, files in os.walk(self.base_dir):
            for file in files:
                filepath = os.path.join(root, file)
                if os.path.getmtime(filepath) < cutoff:
                    os.remove(filepath)


# 4.2 Cloudflare R2集成
安全集成Cloudflare R2对象存储,通过S3兼容接口实现文件上传与预签名URL生成(大文件由boto3自动分片上传):
import io
import logging

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class R2Storage:
    """Thin wrapper around a boto3 S3 client pointed at a Cloudflare R2 endpoint."""

    def __init__(self, endpoint_url, access_key, secret_key, bucket_name):
        self.client = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )
        self.bucket_name = bucket_name

    def upload_file(self, file_obj, object_name):
        """Upload ``file_obj`` to the bucket under ``object_name``.

        Args:
            file_obj: A binary file-like object, or raw ``bytes``-like data.
                Raw data is wrapped in ``io.BytesIO`` because
                ``upload_fileobj`` requires an object with ``read()``.
            object_name: Destination key inside the bucket.

        Returns:
            ``True`` on success, ``False`` if the client reported an error.
        """
        if isinstance(file_obj, (bytes, bytearray, memoryview)):
            file_obj = io.BytesIO(file_obj)
        try:
            self.client.upload_fileobj(
                file_obj,
                self.bucket_name,
                object_name,
                ExtraArgs={'ACL': 'private'},
            )
            return True
        except ClientError as e:
            logger.error("R2 upload error: %s", e)
            return False

    def generate_presigned_url(self, object_name, expiration=3600):
        """Return a time-limited GET URL for ``object_name``, or ``None`` on error.

        Args:
            object_name: Key of the object inside the bucket.
            expiration: Lifetime of the URL in seconds.
        """
        try:
            return self.client.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket_name, 'Key': object_name},
                ExpiresIn=expiration,
            )
        except ClientError as e:
            logger.error("Presigned URL error: %s", e)
            return None


# 5. 任务调度与状态管理
5.1 异步任务处理
使用Celery实现异步任务队列,处理长时间运行的下载任务:
import io

from celery import Celery
from flask import current_app


def make_celery(app):
    """Create a Celery instance whose tasks run inside ``app``'s app context."""
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND'],
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            # Push an application context so current_app and db work in tasks.
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery


celery = make_celery(create_app())


@celery.task(bind=True)
def process_m3u8_task(self, m3u8_url, storage_backend='both'):
    """Download every segment of a playlist and store it locally and/or on R2.

    Args:
        m3u8_url: URL of the playlist to process.
        storage_backend: ``'local'``, ``'r2'`` or ``'both'``.

    Returns:
        ``{'status': 'success', 'task_id': ...}`` on success.

    Raises:
        celery.exceptions.Retry: On any failure, after the task row (if found)
            has been marked ``'failed'``; the task is retried after 60 seconds.
    """
    task_id = self.request.id
    current_app.logger.info(f"Starting task {task_id} for {m3u8_url}")

    use_local = storage_backend in ('local', 'both')
    use_r2 = storage_backend in ('r2', 'both')

    # Bound before the try block so the except handler can always refer to it.
    task = None
    try:
        # Mark the task as running
        task = DownloadTask.query.filter_by(task_id=task_id).first()
        if task:
            task.status = 'processing'
            db.session.commit()

        # Set up the processor and storage backends. R2 is only configured
        # when requested, so local-only jobs do not need R2 settings.
        processor = M3U8Processor()
        local_storage = LocalStorage()
        r2_storage = None
        if use_r2:
            r2_storage = R2Storage(
                current_app.config['R2_ENDPOINT'],
                current_app.config['R2_ACCESS_KEY'],
                current_app.config['R2_SECRET_KEY'],
                current_app.config['R2_BUCKET'],
            )

        # Parse the playlist
        ts_urls = processor.parse_m3u8(m3u8_url)

        # Download and store each segment
        base_url = extract_base_url(m3u8_url)
        for ts_url in ts_urls:
            _, content = processor.download_ts(base_url, ts_url)

            if use_local:
                local_storage.save(content, ts_url, task_id)

            if use_r2:
                object_name = f"{task_id}/{ts_url}"
                # upload_file expects a file-like object and reports failure
                # through its return value, so wrap the bytes and check it.
                if not r2_storage.upload_file(io.BytesIO(content), object_name):
                    raise RuntimeError(f"R2 upload failed for {object_name}")

        # Mark the task as finished
        if task:
            task.status = 'completed'
            task.completed_at = datetime.utcnow()
            task.storage_path = f"{task_id}/"
            db.session.commit()

        return {'status': 'success', 'task_id': task_id}

    except Exception as e:
        # Discard any half-finished transaction before recording the failure.
        db.session.rollback()
        if task:
            task.status = 'failed'
            db.session.commit()
        current_app.logger.error(f"Task {task_id} failed: {str(e)}")
        raise self.retry(exc=e, countdown=60)


# 5.2 API端点设计
提供清晰的任务管理API接口:
import uuid
from urllib.parse import urlsplit

from flask import Blueprint, request, jsonify
from werkzeug.utils import secure_filename

video_bp = Blueprint('video', __name__, url_prefix='/api/video')

# Values understood by the background task's storage_backend argument.
_ALLOWED_BACKENDS = ('local', 'r2', 'both')


@video_bp.route('/tasks', methods=['POST'])
def create_task():
    """Create a download task and queue it for asynchronous processing.

    Expects a JSON object with ``m3u8_url`` (required, http/https) and
    ``storage_backend`` (optional; ``'local'``, ``'r2'`` or ``'both'``).

    Returns:
        202 with the serialised task, or 400 with an ``error`` message when
        the request body is invalid.
    """
    # silent=True yields None instead of raising on a missing/invalid body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    m3u8_url = data.get('m3u8_url')
    storage_backend = data.get('storage_backend', 'both')

    if not m3u8_url:
        return jsonify({'error': 'm3u8_url is required'}), 400
    if not isinstance(m3u8_url, str) or urlsplit(m3u8_url).scheme not in ('http', 'https'):
        return jsonify({'error': 'm3u8_url must be an http(s) URL'}), 400
    if storage_backend not in _ALLOWED_BACKENDS:
        return jsonify({
            'error': "storage_backend must be one of 'local', 'r2', 'both'"
        }), 400

    # Create the database record
    task = DownloadTask(
        task_id=str(uuid.uuid4()),
        m3u8_url=m3u8_url,
        status='pending',
    )
    db.session.add(task)
    db.session.commit()

    # Queue the background job, reusing the record's id as the Celery task id
    process_m3u8_task.apply_async(
        args=[m3u8_url, storage_backend],
        task_id=task.task_id,
    )

    return jsonify(task.to_dict()), 202


@video_bp.route('/tasks/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """Return the serialised task with the given ``task_id``, or 404."""
    task = DownloadTask.query.filter_by(task_id=task_id).first_or_404()
    return jsonify(task.to_dict())


# 6. 系统监控与错误处理
6.1 日志记录配置
配置统一格式的日志记录(滚动文件 + 控制台输出),便于问题排查:
import logging
from logging.handlers import RotatingFileHandler

# Handler names used to recognise handlers installed by a previous call.
_FILE_HANDLER_NAME = 'video_processor.file'
_STREAM_HANDLER_NAME = 'video_processor.console'


def configure_logging(app):
    """Attach a rotating file handler and a console handler to ``app.logger``.

    Safe to call more than once for the same logger: handlers that were
    already installed by this function are not added again, which would
    otherwise duplicate every log line.

    Args:
        app: Object exposing a ``logger`` attribute (e.g. a Flask app).
    """
    formatter = logging.Formatter(
        '%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
    )
    installed = {handler.get_name() for handler in app.logger.handlers}

    # File log: 10 MiB per file, 5 backups
    if _FILE_HANDLER_NAME not in installed:
        file_handler = RotatingFileHandler(
            'video_processor.log',
            maxBytes=1024 * 1024 * 10,
            backupCount=5,
        )
        file_handler.set_name(_FILE_HANDLER_NAME)
        file_handler.setFormatter(formatter)
        file_handler.setLevel(logging.INFO)
        app.logger.addHandler(file_handler)

    # Console log
    if _STREAM_HANDLER_NAME not in installed:
        stream_handler = logging.StreamHandler()
        stream_handler.set_name(_STREAM_HANDLER_NAME)
        stream_handler.setFormatter(formatter)
        stream_handler.setLevel(logging.DEBUG)
        app.logger.addHandler(stream_handler)

    app.logger.setLevel(logging.DEBUG)


# 6.2 错误处理中间件
实现全局错误处理,提供友好的API错误响应:
from http import HTTPStatus

from werkzeug.exceptions import HTTPException


@video_bp.app_errorhandler(Exception)
def handle_exception(e):
    """Convert any unhandled exception into a JSON error response.

    HTTP exceptions keep their own status code and description. Any other
    exception is reported as a generic 500: its message is written to the
    log only, so internal details are not exposed to API clients.

    Returns:
        Tuple of (JSON response, HTTP status code).
    """
    # Record the error with its traceback
    current_app.logger.error(f"Unhandled exception: {str(e)}", exc_info=True)

    if isinstance(e, HTTPException):
        # A bare HTTPException has code None; fall back to 500 in that case.
        code = e.code or int(HTTPStatus.INTERNAL_SERVER_ERROR)
        message = e.description
    else:
        code = int(HTTPStatus.INTERNAL_SERVER_ERROR)
        message = 'Internal Server Error'

    response = {
        'error': message,
        'status_code': code,
    }
    return jsonify(response), code


# 7. 性能优化与扩展性考虑
7.1 并发下载优化
实现更高效的并发下载策略:
import asyncio

import aiohttp


async def download_ts_concurrently(base_url, ts_urls, max_concurrent=10):
    """Download segments concurrently, at most ``max_concurrent`` at a time.

    Args:
        base_url: Prefix prepended to every entry of ``ts_urls``.
        ts_urls: Iterable of segment names.
        max_concurrent: Upper bound on simultaneous requests.

    Returns:
        List of ``(ts_url, content_bytes)`` tuples for the downloads that
        succeeded, in the order of ``ts_urls``; failed downloads are reported
        on stdout and left out.
    """
    limiter = asyncio.Semaphore(max_concurrent)

    async def download_one(session, ts_url):
        # The semaphore is held for the full request/response cycle.
        async with limiter:
            try:
                async with session.get(f"{base_url}{ts_url}") as response:
                    response.raise_for_status()
                    payload = await response.read()
                    return ts_url, payload
            except Exception as e:
                print(f"Failed to download {ts_url}: {str(e)}")
                return None

    async with aiohttp.ClientSession() as session:
        outcomes = await asyncio.gather(
            *(download_one(session, ts_url) for ts_url in ts_urls)
        )

    return [outcome for outcome in outcomes if outcome is not None]


# 7.2 断点续传实现
添加断点续传功能,提高大文件下载的可靠性:
def download_with_resume(url, filepath, headers=None, timeout=30):
    """Download ``url`` to ``filepath``, resuming from ``<filepath>.part`` if present.

    Args:
        url: Resource to download.
        filepath: Final destination; replaced once the download is complete.
        headers: Optional extra request headers. The mapping is copied, never
            modified.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        ``True`` if the file was completed and moved into place, ``False`` if
        fewer/more bytes than announced were received (the ``.part`` file is
        kept for a later attempt).

    Raises:
        requests.HTTPError: If the server answers with an error status. This
            includes 416 when the ``.part`` file already covers the whole
            resource.
    """
    temp_filepath = f"{filepath}.part"
    request_headers = dict(headers or {})

    # Resume from whatever has already been downloaded
    resume_from = os.path.getsize(temp_filepath) if os.path.exists(temp_filepath) else 0
    if resume_from > 0:
        request_headers['Range'] = f'bytes={resume_from}-'

    with requests.get(url, headers=request_headers, stream=True, timeout=timeout) as r:
        r.raise_for_status()

        # Only a 206 response honours the Range header. A plain 200 carries
        # the whole body, so appending it to the partial file would corrupt it.
        if resume_from > 0 and r.status_code != 206:
            resume_from = 0

        content_length = r.headers.get('content-length')
        expected_total = int(content_length) + resume_from if content_length else None

        written = resume_from
        with open(temp_filepath, 'ab' if resume_from > 0 else 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
                    written += len(chunk)

    # Move the file into place once it is complete. Without a Content-Length
    # there is nothing to compare against, so a cleanly finished stream counts
    # as complete.
    if expected_total is None or written == expected_total:
        os.replace(temp_filepath, filepath)  # also overwrites on Windows
        return True
    return False


# 8. 安全最佳实践
8.1 敏感信息管理
使用环境变量和加密存储保护敏感信息:
import os

from cryptography.fernet import Fernet
import dotenv

dotenv.load_dotenv()


class ConfigManager:
    """Read and write configuration in environment variables.

    Values of keys starting with ``SECRET_`` are stored Fernet-encrypted when
    an encryption key is supplied.

    Args:
        encryption_key: Fernet key; when falsy, no encryption is performed.
    """

    def __init__(self, encryption_key=None):
        self.fernet = Fernet(encryption_key) if encryption_key else None

    def get_config(self, key, default=None):
        """Return the value of ``key``, decrypting ``SECRET_`` keys.

        ``default`` is returned unchanged when the variable is not set; it is
        treated as plaintext and never passed through decryption.
        """
        value = os.environ.get(key)
        if value is None:
            return default
        if value and self.fernet and key.startswith('SECRET_'):
            return self.fernet.decrypt(value.encode()).decode()
        return value

    def set_config(self, key, value):
        """Store ``value`` under ``key``, encrypting ``SECRET_`` keys if possible."""
        if self.fernet and key.startswith('SECRET_'):
            encrypted = self.fernet.encrypt(value.encode())
            os.environ[key] = encrypted.decode()
        else:
            os.environ[key] = value


# 8.2 请求验证与限流
实现API请求验证和速率限制:
import hmac

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"],
)


@video_bp.route('/tasks', methods=['POST'])
@limiter.limit("10 per minute")
def create_task():
    """Rate-limited, API-key-protected variant of the task creation endpoint.

    Returns 401 when the ``X-API-KEY`` header is missing or wrong, and also
    when no ``API_KEY`` is configured, so the endpoint fails closed.
    """
    # Validate the API key
    expected_key = current_app.config.get('API_KEY')
    api_key = request.headers.get('X-API-KEY')
    # compare_digest takes time independent of where the values differ,
    # which avoids leaking the key through response timing.
    if (
        not expected_key
        or not api_key
        or not hmac.compare_digest(api_key.encode(), str(expected_key).encode())
    ):
        return jsonify({'error': 'Invalid API key'}), 401

    # Remaining task-creation logic...