
Python Web Scraping in Practice: A Book Sales Ranking Monitor Built on Async Techniques and Data Mining

Abstract

This article walks through building an efficient, stable crawler system for book sales rankings using a modern Python stack. It covers the complete pipeline: async crawling, data storage, countermeasures against anti-bot defenses, data visualization, and trend analysis, automating the monitoring and analysis of book sales data across multiple e-commerce platforms.


1. Project Overview and Technology Choices

1.1 Background

In the era of digital publishing, real-time monitoring of book sales data matters to publishers, bookstores, and authors alike. Analyzing the bestseller rankings of the major e-commerce platforms reveals market trends, reader preferences, and the competitive landscape, providing data to support publishing decisions.

1.2 Technology Choices

  • Crawling: aiohttp + httpx (async HTTP clients)

  • HTML parsing: parsel + BeautifulSoup4

  • Storage: SQLAlchemy + Alembic + PostgreSQL

  • Async task queue: Celery + Redis

  • Visualization: Plotly + Dash

  • Proxy IP management: self-hosted proxy pool

  • Browser automation: Playwright (for JavaScript-rendered pages)

  • Deployment: Docker + Kubernetes


2. Environment Setup and Dependency Installation

2.1 Create a Virtual Environment

bash

python -m venv book_monitor_env
source book_monitor_env/bin/activate   # Linux/Mac
# or: book_monitor_env\Scripts\activate   # Windows

2.2 Install the Dependencies

Create a requirements.txt file:

text

aiohttp==3.8.5
httpx==0.24.1
parsel==1.7.0
beautifulsoup4==4.12.2
sqlalchemy==2.0.23
alembic==1.12.1
psycopg2-binary==2.9.9
celery==5.3.4
redis==5.0.1
playwright==1.40.0
pandas==2.1.4
numpy==1.26.2
plotly==5.18.0
dash==2.14.2
dash-bootstrap-components==1.5.0
schedule==1.2.0
loguru==0.7.2
fake-useragent==1.4.0
python-dotenv==1.0.0
scikit-learn==1.3.2

Install the dependencies:

bash

pip install -r requirements.txt
playwright install   # download the browser binaries
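Since python-dotenv is on the dependency list, the connection strings used in later sections (PostgreSQL, Redis) can be read from a local `.env` file rather than hard-coded. A minimal sketch; the variable names `DATABASE_URL` and `REDIS_URL` and the defaults are illustrative assumptions, not fixed by this article:

```python
# config.py -- hypothetical settings helper
import os

try:
    # python-dotenv (from requirements.txt) reads key=value pairs from .env
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass  # fall back to plain environment variables

# Assumed variable names; adjust to your deployment.
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+psycopg2://localhost/book_monitor")
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

print(DATABASE_URL, REDIS_URL)
```

Every later component (the `DatabaseManager`, the Redis cache) can then take these values as constructor arguments.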

3. System Architecture Design

3.1 Architecture Diagram

text

┌─────────────────────────────────────────────────┐
│                    UI Layer                     │
│  ┌──────────┐  ┌──────────┐  ┌─────────────┐    │
│  │   Dash   │  │   API    │  │  CLI tools  │    │
│  │dashboards│  │endpoints │  │             │    │
│  └──────────┘  └──────────┘  └─────────────┘    │
└─────────────────────────────────────────────────┘
                        │
┌─────────────────────────────────────────────────┐
│              Business Logic Layer               │
│  ┌──────────┐  ┌──────────┐  ┌─────────────┐    │
│  │  Celery  │  │   Data   │  │    Trend    │    │
│  │task queue│  │ cleaning │  │  analysis   │    │
│  └──────────┘  └──────────┘  └─────────────┘    │
└─────────────────────────────────────────────────┘
                        │
┌─────────────────────────────────────────────────┐
│                Data Access Layer                │
│  ┌──────────┐  ┌──────────┐  ┌─────────────┐    │
│  │  Async   │  │   Sync   │  │ Playwright  │    │
│  │ spiders  │  │ spiders  │  │   spiders   │    │
│  └──────────┘  └──────────┘  └─────────────┘    │
└─────────────────────────────────────────────────┘
                        │
┌─────────────────────────────────────────────────┐
│              Infrastructure Layer               │
│  ┌──────────┐  ┌──────────┐  ┌─────────────┐    │
│  │  Proxy   │  │ CAPTCHA  │  │   Logging   │    │
│  │   pool   │  │ handling │  │             │    │
│  └──────────┘  └──────────┘  └─────────────┘    │
└─────────────────────────────────────────────────┘

3.2 Database Schema Design

python

# models.py
from datetime import datetime

# Note: the original import list omitted Index (used in __table_args__ below)
from sqlalchemy import Column, DateTime, Float, Index, Integer, JSON, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class BookPlatform(Base):
    """E-commerce platform table."""
    __tablename__ = 'book_platforms'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)        # platform name
    base_url = Column(String(500), nullable=False)    # base URL
    category_url = Column(String(500))                # category page URL
    status = Column(Integer, default=1)               # 1 = enabled, 0 = disabled
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)


class BookCategory(Base):
    """Book category table."""
    __tablename__ = 'book_categories'

    id = Column(Integer, primary_key=True)
    platform_id = Column(Integer, nullable=False)     # platform ID
    name = Column(String(200), nullable=False)        # category name
    url = Column(String(500), nullable=False)         # category URL
    parent_id = Column(Integer, default=0)            # parent category ID
    level = Column(Integer, default=1)                # depth in the category tree
    created_at = Column(DateTime, default=datetime.now)


class BookRank(Base):
    """Book ranking snapshot table."""
    __tablename__ = 'book_ranks'

    id = Column(Integer, primary_key=True)
    platform_id = Column(Integer, nullable=False)
    category_id = Column(Integer, nullable=False)
    book_id = Column(String(100), nullable=False)     # unique book identifier
    title = Column(String(500), nullable=False)       # book title
    author = Column(String(300))                      # author
    publisher = Column(String(300))                   # publisher
    price = Column(Float)                             # current price
    original_price = Column(Float)                    # list price
    discount = Column(Float)                          # discount
    sales_count = Column(Integer)                     # sales volume
    comment_count = Column(Integer)                   # number of reviews
    rating = Column(Float)                            # rating
    ranking = Column(Integer)                         # rank position
    rank_date = Column(DateTime, nullable=False)      # date of the ranking
    url = Column(String(500))                         # detail page URL
    cover_url = Column(String(500))                   # cover image URL
    tags = Column(JSON)                               # tags
    extra_info = Column(JSON)                         # extra information
    created_at = Column(DateTime, default=datetime.now)

    __table_args__ = (
        Index('idx_platform_category_date', 'platform_id', 'category_id', 'rank_date'),
        Index('idx_book_rank_date', 'book_id', 'rank_date'),
    )


class BookTrend(Base):
    """Book trend analysis table."""
    __tablename__ = 'book_trends'

    id = Column(Integer, primary_key=True)
    book_id = Column(String(100), nullable=False)
    platform_id = Column(Integer, nullable=False)
    analysis_date = Column(DateTime, nullable=False)
    trend_type = Column(String(50))                   # trend type: sales/price/ranking
    trend_data = Column(JSON)                         # trend series
    prediction = Column(JSON)                         # forecast data
    created_at = Column(DateTime, default=datetime.now)

4. Multi-Platform Spider Implementation

4.1 The Base Spider Class

python

# base_spider.py
import asyncio
import json
import random
from dataclasses import dataclass
from typing import Dict, List, Optional

import aiohttp
from fake_useragent import UserAgent
from loguru import logger


@dataclass
class RequestConfig:
    """Per-spider request configuration."""
    headers: Dict[str, str] = None
    cookies: Dict[str, str] = None
    proxy: str = None
    timeout: int = 30
    retry_count: int = 3
    delay: float = 1.0


class BaseAsyncSpider:
    """Base class for async spiders."""

    def __init__(self, name: str, config: RequestConfig = None):
        self.name = name
        self.config = config or RequestConfig()
        self.ua = UserAgent()
        self.session: Optional[aiohttp.ClientSession] = None
        self.request_count = 0

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            headers=self._get_headers(),
            timeout=aiohttp.ClientTimeout(total=self.config.timeout)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    def _get_headers(self) -> Dict[str, str]:
        """Build request headers with a random User-Agent."""
        headers = {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        if self.config.headers:
            headers.update(self.config.headers)
        return headers

    async def fetch(self, url: str, method: str = 'GET', **kwargs) -> Optional[str]:
        """Fetch a page asynchronously, with retries and exponential backoff."""
        for attempt in range(self.config.retry_count):
            try:
                async with self.session.request(method, url, **kwargs) as response:
                    self.request_count += 1
                    if response.status == 200:
                        content = await response.text()
                        logger.info(f"Fetched {url}, length: {len(content)}")
                        # Random delay so we don't hammer the server
                        await asyncio.sleep(random.uniform(self.config.delay, self.config.delay * 2))
                        return content
                    else:
                        logger.warning(f"Request to {url} failed, status: {response.status}")
            except Exception as e:
                logger.error(f"Request to {url} raised (attempt {attempt + 1}/{self.config.retry_count}): {e}")
                if attempt < self.config.retry_count - 1:
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
        return None

    async def fetch_json(self, url: str, **kwargs) -> Optional[Dict]:
        """Fetch and decode a JSON response."""
        content = await self.fetch(url, **kwargs)
        if content:
            try:
                return json.loads(content)
            except json.JSONDecodeError as e:
                logger.error(f"JSON decoding failed: {e}")
        return None

    async def fetch_multiple(self, urls: List[str], max_concurrent: int = 5) -> List[Optional[str]]:
        """Fetch several URLs concurrently, bounded by a semaphore."""
        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch_with_semaphore(url):
            async with semaphore:
                return await self.fetch(url)

        tasks = [fetch_with_semaphore(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Normalize exceptions to None
        final_results = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Concurrent request raised: {result}")
                final_results.append(None)
            else:
                final_results.append(result)
        return final_results
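The retry logic above sleeps `2 ** attempt` seconds between failed attempts. A common refinement, not in the original class, is capped exponential backoff with full jitter, so that many coroutines retrying at once do not all wake up in lockstep. A small stand-alone sketch:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Delay before retry number `attempt` (0-based): capped exponential, full jitter.

    The uniform draw spreads retries out in time; the cap bounds worst-case waits.
    """
    return random.uniform(0, min(cap, base * (2 ** attempt)))

# Average delay grows with attempt number but never exceeds the cap:
delays = [backoff_delay(a) for a in range(6)]
print(delays)
```

Swapping this in for the fixed `2 ** attempt` sleep is a one-line change in `fetch`.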

4.2 JD.com Book Spider

python

# jd_spider.py
import asyncio
import random
import re
from dataclasses import dataclass
from typing import Any, Dict, List
from urllib.parse import urljoin

from loguru import logger
from parsel import Selector

from base_spider import BaseAsyncSpider, RequestConfig


@dataclass
class BookInfo:
    """Book record data class."""
    platform: str = "JD"
    book_id: str = ""
    title: str = ""
    author: str = ""
    publisher: str = ""
    price: float = 0.0
    original_price: float = 0.0
    discount: float = 0.0
    sales_count: int = 0
    comment_count: int = 0
    rating: float = 0.0
    ranking: int = 0
    url: str = ""
    cover_url: str = ""
    category: str = ""
    tags: List[str] = None
    publish_date: str = ""
    isbn: str = ""

    def to_dict(self) -> Dict:
        """Convert to a plain dict."""
        return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}


class JDBookSpider(BaseAsyncSpider):
    """JD.com book spider."""

    BASE_URL = "https://book.jd.com"

    # Category-to-ranking-URL mapping
    CATEGORIES = {
        "小说": "https://book.jd.com/booktop/0-0-0.html?category=1713-0-0-0-10001-1",
        "文学": "https://book.jd.com/booktop/0-0-0.html?category=1713-0-0-0-10002-1",
        "童书": "https://book.jd.com/booktop/0-0-0.html?category=1713-0-0-0-10003-1",
        "教材": "https://book.jd.com/booktop/0-0-0.html?category=1713-0-0-0-10004-1",
        "艺术": "https://book.jd.com/booktop/0-0-0.html?category=1713-0-0-0-10005-1",
        "管理": "https://book.jd.com/booktop/0-0-0.html?category=1713-0-0-0-10006-1",
        "科技": "https://book.jd.com/booktop/0-0-0.html?category=1713-0-0-0-10007-1",
    }

    def __init__(self, config: RequestConfig = None):
        super().__init__("JDBookSpider", config)

    async def get_category_books(self, category: str, pages: int = 5) -> List[BookInfo]:
        """Fetch the ranking pages for one category."""
        books = []
        for page in range(1, pages + 1):
            url = self.CATEGORIES.get(category)
            if not url:
                logger.warning(f"Unknown category: {category}")
                continue

            # Adjust the pagination parameter
            url = url.replace("-1", f"-{page}")
            logger.info(f"Fetching {category} page {page}: {url}")

            content = await self.fetch(url)
            if not content:
                continue

            page_books = self.parse_book_list(content, category, page)
            books.extend(page_books)

            # Random delay between pages
            await asyncio.sleep(random.uniform(2, 4))
        return books

    def parse_book_list(self, html: str, category: str, page: int) -> List[BookInfo]:
        """Parse one ranking list page."""
        selector = Selector(html)
        books = []

        # JD ranking page structure
        book_elements = selector.css('.mc .book')
        for idx, element in enumerate(book_elements):
            try:
                book = BookInfo()
                book.category = category
                book.ranking = (page - 1) * len(book_elements) + idx + 1

                # Book URL and ID
                book_url = element.css('.p-img a::attr(href)').get()
                if book_url:
                    book.url = urljoin(self.BASE_URL, book_url)
                    # Extract the product ID from the URL
                    match = re.search(r'/(\d+)\.html', book_url)
                    if match:
                        book.book_id = f"JD_{match.group(1)}"

                # Title
                title = element.css('.p-name a::attr(title)').get()
                book.title = title.strip() if title else ""

                # Author and publisher
                author_pub = element.css('.p-bi-name').xpath('string()').get()
                if author_pub:
                    # Naive split into author / publisher
                    parts = author_pub.split('/')
                    if len(parts) >= 1:
                        book.author = parts[0].strip()
                    if len(parts) >= 2:
                        book.publisher = parts[1].strip()

                # Price
                price_text = element.css('.p-price .price::text').get()
                if price_text:
                    try:
                        book.price = float(price_text.replace('¥', '').strip())
                    except ValueError:
                        pass

                # List price and discount
                original_price_text = element.css('.p-price del::text').get()
                if original_price_text:
                    try:
                        book.original_price = float(original_price_text.replace('¥', '').strip())
                        if book.original_price > 0:
                            book.discount = round(book.price / book.original_price * 10, 1)
                    except ValueError:
                        pass

                # Review count
                comment_text = element.css('.p-commit a::text').get()
                if comment_text:
                    match = re.search(r'(\d+)', comment_text)
                    if match:
                        book.comment_count = int(match.group(1))

                # Rating (JD usually shows star counts)
                rating_text = element.css('.p-score::attr(title)').get()
                if rating_text:
                    match = re.search(r'(\d+\.?\d*)', rating_text)
                    if match:
                        book.rating = float(match.group(1))

                # Cover image
                cover_url = element.css('.p-img img::attr(src)').get()
                if cover_url:
                    if cover_url.startswith('//'):
                        cover_url = 'https:' + cover_url
                    book.cover_url = cover_url

                # Sales volume (may require the detail page; estimated here)
                book.sales_count = self._estimate_sales(book.comment_count)

                books.append(book)
            except Exception as e:
                logger.error(f"Failed to parse a book entry: {e}")
                continue

        logger.info(f"Parsed {len(books)} books")
        return books

    def _estimate_sales(self, comment_count: int) -> int:
        """Estimate sales from the review count (rule of thumb)."""
        if comment_count <= 10:
            return comment_count * 100
        elif comment_count <= 100:
            return comment_count * 50
        elif comment_count <= 1000:
            return comment_count * 20
        else:
            return comment_count * 10

    async def get_book_detail(self, book_id: str) -> Dict[str, Any]:
        """Fetch extra fields from the book's detail page."""
        url = f"https://item.jd.com/{book_id.replace('JD_', '')}.html"
        content = await self.fetch(url)
        if not content:
            return {}

        detail_info = {}
        selector = Selector(content)
        try:
            # SKU image
            sku_info = selector.css('#spec-n1 img::attr(src)').get()
            if sku_info:
                detail_info['sku_images'] = [urljoin('https:', sku_info)]

            # Description
            description = selector.css('.book-detail-content::text').get()
            if description:
                detail_info['description'] = description.strip()

            # ISBN
            isbn_match = re.search(r'ISBN:(\d+)', content)
            if isbn_match:
                detail_info['isbn'] = isbn_match.group(1)

            # Publication date ("出版时间" is the page's field label)
            pub_date_match = re.search(r'出版时间:(\d{4}-\d{1,2}-\d{1,2})', content)
            if pub_date_match:
                detail_info['publish_date'] = pub_date_match.group(1)
        except Exception as e:
            logger.error(f"Failed to parse the detail page: {e}")

        return detail_info

4.3 Dangdang Book Spider

python

# dangdang_spider.py
import asyncio
import random
import re
from typing import List

from loguru import logger
from parsel import Selector

from base_spider import BaseAsyncSpider, RequestConfig
from jd_spider import BookInfo


class DangDangSpider(BaseAsyncSpider):
    """Dangdang book spider."""

    BASE_URL = "http://bang.dangdang.com"

    CATEGORIES = {
        "小说": "http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-24hours-0-0-1-1",
        "文学": "http://bang.dangdang.com/books/bestsellers/01.41.00.00.00.00-24hours-0-0-1-1",
        "童书": "http://bang.dangdang.com/books/bestsellers/01.47.00.00.00.00-24hours-0-0-1-1",
        "教材": "http://bang.dangdang.com/books/bestsellers/01.54.00.00.00.00-24hours-0-0-1-1",
        "艺术": "http://bang.dangdang.com/books/bestsellers/01.38.00.00.00.00-24hours-0-0-1-1",
        "管理": "http://bang.dangdang.com/books/bestsellers/01.01.00.00.00.00-24hours-0-0-1-1",
        "科技": "http://bang.dangdang.com/books/bestsellers/01.36.00.00.00.00-24hours-0-0-1-1",
    }

    def __init__(self, config: RequestConfig = None):
        super().__init__("DangDangSpider", config)

    async def get_bestsellers(self, category: str, pages: int = 3) -> List[BookInfo]:
        """Fetch the bestseller list for one category."""
        books = []
        for page in range(1, pages + 1):
            base_url = self.CATEGORIES.get(category)
            if not base_url:
                continue

            url = base_url.replace("-1-1", f"-{page}-1")
            logger.info(f"Fetching Dangdang {category} page {page}: {url}")

            content = await self.fetch(url)
            if not content:
                continue

            page_books = self.parse_bestseller_page(content, category, page)
            books.extend(page_books)
            await asyncio.sleep(random.uniform(1, 3))
        return books

    def parse_bestseller_page(self, html: str, category: str, page: int) -> List[BookInfo]:
        """Parse one bestseller page."""
        selector = Selector(html)
        books = []

        items = selector.css('.bang_list li')
        for idx, item in enumerate(items):
            try:
                book = BookInfo()
                book.platform = "DangDang"
                book.category = category
                book.ranking = (page - 1) * len(items) + idx + 1

                # Book URL and ID
                book_url = item.css('.name a::attr(href)').get()
                if book_url:
                    book.url = book_url
                    match = re.search(r'/(\d+)\.html', book_url)
                    if match:
                        book.book_id = f"DD_{match.group(1)}"

                # Title
                title = item.css('.name a::attr(title)').get()
                book.title = title.strip() if title else ""

                # Author and publisher
                author_pub = item.css('.publisher_info').xpath('string()').get()
                if author_pub:
                    parts = [p.strip() for p in author_pub.split('/') if p.strip()]
                    if len(parts) >= 1:
                        book.author = parts[0]
                    if len(parts) >= 2:
                        book.publisher = parts[1]

                # Price
                price_text = item.css('.price .price_n::text').get()
                if price_text:
                    try:
                        book.price = float(price_text.replace('¥', '').strip())
                    except ValueError:
                        pass

                # List price and discount
                original_price_text = item.css('.price .price_r::text').get()
                if original_price_text:
                    try:
                        book.original_price = float(original_price_text.replace('¥', '').strip())
                        if book.original_price > 0:
                            book.discount = round(book.price / book.original_price * 10, 1)
                    except ValueError:
                        pass

                # Review count
                comment_text = item.css('.star .comment_num::text').get()
                if comment_text:
                    match = re.search(r'(\d+)', comment_text)
                    if match:
                        book.comment_count = int(match.group(1))

                # Recommendation rate (Dangdang's analogue of a rating)
                recommend_text = item.css('.star .tuijian::text').get()
                if recommend_text:
                    match = re.search(r'(\d+\.?\d*)%', recommend_text)
                    if match:
                        book.rating = float(match.group(1)) / 20  # convert to a 5-point scale

                # Cover image
                cover_url = item.css('.pic img::attr(src)').get()
                if cover_url:
                    if cover_url.startswith('//'):
                        cover_url = 'http:' + cover_url
                    book.cover_url = cover_url

                # Sales volume (sometimes displayed by Dangdang)
                sales_text = item.css('.biaosheng .num::text').get()
                if sales_text:
                    match = re.search(r'(\d+)', sales_text)
                    if match:
                        book.sales_count = int(match.group(1))
                else:
                    book.sales_count = book.comment_count * 15  # rough estimate

                books.append(book)
            except Exception as e:
                logger.error(f"Failed to parse a Dangdang entry: {e}")
                continue

        return books
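Both spiders repeat the same inline price- and count-cleaning logic. Factoring it into small, pure helpers keeps the parsers shorter and makes the cleaning rules unit-testable without any network access. A sketch; the function names are my own, not from the original code:

```python
import re
from typing import Optional

def parse_price(text: Optional[str]) -> float:
    """'¥89.00' -> 89.0; returns 0.0 for missing or garbled input."""
    if not text:
        return 0.0
    try:
        return float(text.replace('¥', '').replace(',', '').strip())
    except ValueError:
        return 0.0

def parse_count(text: Optional[str]) -> int:
    """'1234条评论' -> 1234; returns 0 when no digits are present."""
    match = re.search(r'(\d+)', text or '')
    return int(match.group(1)) if match else 0

def discount_of(price: float, original: float) -> float:
    """Chinese-style discount on a 10-point scale, e.g. 89.0 / 100.0 -> 8.9."""
    return round(price / original * 10, 1) if original > 0 else 0.0

print(parse_price('¥89.00'), parse_count('1234条评论'), discount_of(89.0, 100.0))
# → 89.0 1234 8.9
```

Each `try/except ValueError` block in the parsers then collapses to a single helper call.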

4.4 Async Task Scheduler

python

# task_scheduler.py
import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from functools import partial
from typing import Callable, Dict

import schedule
from loguru import logger


class AsyncTaskScheduler:
    """Async task scheduler built on the `schedule` library."""

    def __init__(self, max_workers: int = 10):
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.tasks: Dict[str, Dict] = {}
        self.running = False

    def add_task(self, name: str, task_func: Callable,
                 interval: int = 3600, immediate: bool = True, **kwargs):
        """Register a periodic task."""
        self.tasks[name] = {
            'func': task_func,
            'interval': interval,
            'kwargs': kwargs,
            'last_run': None,
            'next_run': datetime.now() if immediate else None
        }

        # `schedule` calls sync callables, so hand it a thunk that schedules
        # the coroutine on the running loop (a bare coroutine would never be awaited).
        schedule.every(interval).seconds.do(
            lambda: asyncio.create_task(self._run_task_wrapper(name, task_func, kwargs))
        )
        logger.info(f"Registered task: {name}, interval: {interval}s")

    async def _run_task_wrapper(self, name: str, task_func: Callable, kwargs: Dict):
        """Run one task, whether async or sync."""
        try:
            logger.info(f"Starting task: {name}")
            self.tasks[name]['last_run'] = datetime.now()

            if asyncio.iscoroutinefunction(task_func):
                await task_func(**kwargs)
            else:
                # Sync functions run in the thread pool; run_in_executor takes
                # no keyword arguments, so bind them with functools.partial.
                await asyncio.get_running_loop().run_in_executor(
                    self.executor, partial(task_func, **kwargs)
                )
            logger.info(f"Task finished: {name}")
        except Exception as e:
            logger.error(f"Task {name} failed: {e}")

    async def run_task(self, name: str):
        """Run one registered task immediately."""
        if name in self.tasks:
            task_info = self.tasks[name]
            await self._run_task_wrapper(name, task_info['func'], task_info['kwargs'])

    async def start(self):
        """Start the scheduler loop."""
        self.running = True
        logger.info("Task scheduler started")

        while self.running:
            try:
                # Fire all due jobs
                schedule.run_pending()

                # Update each task's projected next run
                for name, task_info in self.tasks.items():
                    if task_info['last_run']:
                        task_info['next_run'] = (
                            task_info['last_run'] + timedelta(seconds=task_info['interval'])
                        )
                await asyncio.sleep(1)
            except KeyboardInterrupt:
                logger.info("Stop signal received")
                break
            except Exception as e:
                logger.error(f"Scheduler loop error: {e}")
                await asyncio.sleep(5)

    async def stop(self):
        """Stop the scheduler."""
        self.running = False
        self.executor.shutdown(wait=True)
        logger.info("Task scheduler stopped")
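Mixing the synchronous `schedule` library with an event loop is workable but easy to get wrong: a coroutine function handed to `schedule.do()` returns a coroutine that must still be scheduled on the loop. When the task list is simple, a pure-asyncio periodic runner avoids the extra dependency entirely. A minimal stand-alone sketch (not part of the scheduler above; `max_runs` exists only so the loop is bounded for demonstration):

```python
import asyncio
from typing import Awaitable, Callable, Optional

async def run_periodically(coro_func: Callable[[], Awaitable[None]],
                           interval: float,
                           max_runs: Optional[int] = None) -> None:
    """Await `coro_func` every `interval` seconds; `max_runs=None` means forever."""
    runs = 0
    while max_runs is None or runs < max_runs:
        await coro_func()
        runs += 1
        if max_runs is None or runs < max_runs:
            await asyncio.sleep(interval)

async def main():
    counter = []

    async def tick():
        # Stand-in for a real crawl job
        counter.append(len(counter) + 1)

    await run_periodically(tick, interval=0.01, max_runs=3)
    return counter

print(asyncio.run(main()))  # → [1, 2, 3]
```

Several independent schedules then become `asyncio.gather(run_periodically(...), run_periodically(...))`.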

5. Async Crawler Optimization Strategies

5.1 Connection Pool Management

python

# connection_pool.py
import asyncio
from dataclasses import dataclass
from typing import Dict

import aiohttp
from loguru import logger


@dataclass
class ConnectionStats:
    """Connection statistics."""
    total_requests: int = 0
    successful_requests: int = 0
    failed_requests: int = 0
    avg_response_time: float = 0.0
    active_connections: int = 0


class SmartConnectionPool:
    """Connection pool that keeps one session per base URL."""

    def __init__(self, max_size: int = 100, max_per_host: int = 10):
        self.max_size = max_size
        self.max_per_host = max_per_host
        self.sessions: Dict[str, aiohttp.ClientSession] = {}
        self.stats = ConnectionStats()
        self.lock = asyncio.Lock()

    async def get_session(self, base_url: str) -> aiohttp.ClientSession:
        """Get or lazily create the session for a base URL."""
        async with self.lock:
            if base_url not in self.sessions:
                connector = aiohttp.TCPConnector(
                    limit=self.max_size,
                    limit_per_host=self.max_per_host,
                    enable_cleanup_closed=True,
                    ttl_dns_cache=300  # cache DNS lookups for 5 minutes
                )
                timeout = aiohttp.ClientTimeout(
                    total=30,
                    connect=10,
                    sock_read=20
                )
                self.sessions[base_url] = aiohttp.ClientSession(
                    connector=connector,
                    timeout=timeout
                )
                logger.info(f"Created session: {base_url}")
            return self.sessions[base_url]

    async def close_all(self):
        """Close every session."""
        async with self.lock:
            for url, session in self.sessions.items():
                await session.close()
                logger.info(f"Closed session: {url}")
            self.sessions.clear()
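`TCPConnector(limit_per_host=...)` caps open sockets, but it does not bound in-flight requests at the application level once connections are reused. A complementary pattern is one `asyncio.Semaphore` per host, so each site sees a fixed maximum concurrency. A self-contained sketch with a fake fetch standing in for network I/O (`HostThrottle` is my own illustrative name):

```python
import asyncio
from collections import defaultdict
from urllib.parse import urlparse

class HostThrottle:
    """Allow at most `per_host` concurrent requests to any single host."""

    def __init__(self, per_host: int = 2):
        self._sems = defaultdict(lambda: asyncio.Semaphore(per_host))
        self._active = defaultdict(int)
        self.peak = defaultdict(int)  # observed peak concurrency, for the demo

    async def fetch(self, url: str) -> str:
        host = urlparse(url).netloc
        async with self._sems[host]:
            self._active[host] += 1
            self.peak[host] = max(self.peak[host], self._active[host])
            await asyncio.sleep(0.01)  # stand-in for real network I/O
            self._active[host] -= 1
            return f"ok:{url}"

async def main():
    throttle = HostThrottle(per_host=2)
    urls = [f"https://example.com/page/{i}" for i in range(8)]
    await asyncio.gather(*(throttle.fetch(u) for u in urls))
    return throttle.peak["example.com"]

print(asyncio.run(main()))  # → 2  (8 tasks, but never more than 2 in flight)
```

In the real pool, the semaphore acquisition would wrap the `session.request(...)` call.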

5.2 Request Deduplication and Caching

python

# request_cache.py
import hashlib
import pickle
from typing import Any, Optional

import redis
from loguru import logger


class RequestCache:
    """Redis-backed response cache."""

    def __init__(self, redis_url: str = "redis://localhost:6379/0", ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl

    def _generate_key(self, url: str, params: dict) -> str:
        """Derive a cache key; sorting makes it independent of parameter order."""
        key_data = f"{url}:{sorted(params.items())}"
        return f"cache:{hashlib.md5(key_data.encode()).hexdigest()}"

    def get(self, url: str, params: dict = None) -> Optional[Any]:
        """Read from the cache."""
        try:
            cache_key = self._generate_key(url, params or {})
            cached_data = self.redis.get(cache_key)
            if cached_data:
                logger.debug(f"Cache hit: {url}")
                return pickle.loads(cached_data)
        except Exception as e:
            logger.error(f"Cache read failed: {e}")
        return None

    def set(self, url: str, data: Any, params: dict = None, ttl: Optional[int] = None):
        """Write to the cache."""
        try:
            cache_key = self._generate_key(url, params or {})
            ttl = ttl or self.ttl
            self.redis.setex(cache_key, ttl, pickle.dumps(data))
            logger.debug(f"Cache set: {url}, TTL: {ttl}")
        except Exception as e:
            logger.error(f"Cache write failed: {e}")
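The key derivation hashes `sorted(params.items())`, so two requests with the same parameters in a different order map to the same cache entry. Because the key function is pure, this property can be verified without a Redis server; a stand-alone re-statement of the same derivation:

```python
import hashlib

def cache_key(url: str, params: dict) -> str:
    """Same derivation as RequestCache._generate_key: order-insensitive MD5 digest."""
    key_data = f"{url}:{sorted(params.items())}"
    return f"cache:{hashlib.md5(key_data.encode()).hexdigest()}"

k1 = cache_key("https://example.com/list", {"page": 1, "cat": "fiction"})
k2 = cache_key("https://example.com/list", {"cat": "fiction", "page": 1})
k3 = cache_key("https://example.com/list", {"page": 2, "cat": "fiction"})

print(k1 == k2, k1 == k3)  # → True False
```

One caveat: `sorted` requires the parameter keys to be mutually comparable, which holds for the string-keyed query dicts used here.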

6. Data Storage and Processing

6.1 The Database Manager

python

# database.py
from contextlib import contextmanager
from typing import Dict, Generator, List

import pandas as pd
from loguru import logger
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker

from models import Base, BookRank


class DatabaseManager:
    """Database access layer."""

    def __init__(self, connection_string: str):
        self.engine = create_engine(
            connection_string,
            pool_size=20,
            max_overflow=10,
            pool_pre_ping=True,
            pool_recycle=3600,
            echo=False
        )
        self.session_factory = sessionmaker(bind=self.engine)
        self.Session = scoped_session(self.session_factory)

    @contextmanager
    def get_session(self) -> Generator:
        """Session context manager with commit/rollback handling."""
        session = self.Session()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"Database operation failed: {e}")
            raise
        finally:
            session.close()

    def init_database(self):
        """Create all tables."""
        Base.metadata.create_all(self.engine)
        logger.info("Database initialized")

    def bulk_insert_books(self, books: List[BookRank]):
        """Bulk-insert ranking rows."""
        if not books:
            return
        with self.get_session() as session:
            try:
                session.bulk_save_objects(books)
                logger.info(f"Bulk-inserted {len(books)} rows")
            except SQLAlchemyError as e:
                logger.error(f"Bulk insert failed: {e}")

    def get_recent_books(self, platform_id: int = None,
                         category_id: int = None, limit: int = 100) -> List[Dict]:
        """Fetch the most recent ranking rows.

        Filters use the model's ID columns (the original filtered on
        BookRank.platform/category, which do not exist on the model).
        """
        with self.get_session() as session:
            query = session.query(BookRank)
            if platform_id:
                query = query.filter(BookRank.platform_id == platform_id)
            if category_id:
                query = query.filter(BookRank.category_id == category_id)

            results = query.order_by(BookRank.rank_date.desc()).limit(limit).all()
            columns = [c.name for c in BookRank.__table__.columns]
            return [{c: getattr(book, c) for c in columns} for book in results]

    def get_sales_trend(self, book_id: str, days: int = 30) -> pd.DataFrame:
        """Daily sales/price/ranking averages for one book.

        Uses PostgreSQL interval syntax; the original `INTERVAL :days DAY`
        form is MySQL-specific and fails on the PostgreSQL target.
        """
        sql = text("""
            SELECT DATE(rank_date) AS date,
                   AVG(sales_count) AS avg_sales,
                   AVG(price) AS avg_price,
                   AVG(ranking) AS avg_ranking
            FROM book_ranks
            WHERE book_id = :book_id
              AND rank_date >= NOW() - :days * INTERVAL '1 day'
            GROUP BY DATE(rank_date)
            ORDER BY date
        """)
        with self.get_session() as session:
            result = session.execute(sql, {'book_id': book_id, 'days': days})
            return pd.DataFrame(result.fetchall(), columns=result.keys())
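The `BookTrend` table stores a `trend_type` per book, but the article stops short of showing how a trend label might be derived from the daily series that `get_sales_trend` returns. One simple, dependency-free option is the sign of the least-squares slope over the series; a sketch where the function name and the `flat_eps` threshold are my own illustrative choices:

```python
from typing import List

def trend_label(values: List[float], flat_eps: float = 0.5) -> str:
    """Classify a daily series as 'rising'/'falling'/'flat' via its least-squares slope."""
    n = len(values)
    if n < 2:
        return "flat"
    xs = range(n)
    mean_x = (n - 1) / 2
    mean_y = sum(values) / n
    # Ordinary least squares slope for x = 0..n-1
    num = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, values))
    den = sum((x - mean_x) ** 2 for x in xs)
    slope = num / den
    if slope > flat_eps:
        return "rising"
    if slope < -flat_eps:
        return "falling"
    return "flat"

print(trend_label([100, 110, 125, 140, 160]))  # → rising
```

In practice this would run over `df['avg_sales'].tolist()` from `get_sales_trend` and be written into `BookTrend.trend_type`; scikit-learn (already in requirements.txt) could replace the hand-rolled slope once features grow beyond a single series.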
