6个提升数据工程效率的Python库实战指南
1. 这六个库,我用它们把数据工程流水线从“手动挡”换成了“自动驾驶”
我在一线做数据工程和商业智能系统搭建整整六年了。不是在写PPT的架构师,也不是只调API的外包同学——而是每天要亲手处理TB级日志、清洗跨境电商的乱码订单、给风控模型喂干净特征、凌晨三点排查ETL任务卡在某个时区转换上的真实从业者。这六年里,我删过27个自己写的日期处理函数,重写了14版文本编码修复脚本,为一个邮政编码距离计算逻辑反复查了5家国家地理信息中心的API文档。直到我陆续撞见这六个库:humanize、pendulum、ftfy、sketch、pgeocode、rembg。它们没让我“学会新技能”,而是直接把我从重复劳动里解救出来。比如,以前我要花40分钟写一个能正确显示“3天前”“上个月15号”的报表时间戳模块,现在一行humanize.naturaltime()搞定;以前处理法国客户发来的CSV,中文字段全是éèê,得手动试七八种编码再写正则替换,现在ftfy.fix_text()一跑就清;更别说用sketch对着pandas DataFrame直接问“怎么画出用户地域分布热力图”,它当场返回可运行的geopandas代码——这种体验,就像突然给Excel装上了会写VBA的AI助手。这些库不炫技、不造概念,全部聚焦在“今天下班前必须跑通的那行代码”上。它们不是教科书里的玩具,而是我生产环境Docker镜像里常年驻留的依赖。如果你也在写pd.read_csv()之后紧接着写df['date'] = pd.to_datetime(df['date'], errors='coerce'),还在为UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe9抓狂,或者需要把英国邮编SW1A 1AA转成经纬度去算配送半径——这篇文章就是为你写的。下面我会拆开每一个库的“发动机”,告诉你它真正解决什么问题、为什么不用标准库、实操中踩过哪些坑、参数怎么选才不翻车。
2. 核心设计思路:为什么是这六个?而不是更多或更少?
2.1 选库逻辑:拒绝“技术正确”,只认“业务止痛”
很多技术文章推荐库时爱讲“支持异步”“性能提升300%”“基于最新算法”。但数据工程师的真实战场是:老板说“下午三点前要出昨日活跃用户地域分布图”,而你手里的原始数据里,城市字段是"Nürnberg",时间字段是"2023-01-31T14:22:00+05:30",邮政编码字段是"620018 (Tamil Nadu)"。这时候,你需要的不是理论最优解,而是三分钟内能堵住漏洞的扳手。这六个库的共同点,就是精准命中数据流水线中最常出血的六个切口:
- humanize:堵住“时间/数字展示不友好”这个出血口。报表里显示
1687234560比显示2 minutes ago更难让业务方理解,而datetime.fromtimestamp(1687234560)又太长; - pendulum:堵住“跨时区时间计算错乱”这个致命口。当你的数据源来自纽约、东京、伦敦,用原生
datetime加减时差,一不小心就把用户注册时间算成未来日期; - ftfy:堵住“多语言文本编码污染”这个顽固口。东南亚电商订单里的越南语、泰语、印尼语混在同一个CSV里,
encoding='utf-8'报错,encoding='latin-1'又全变方块; - sketch:堵住“pandas操作写法记不住”这个效率口。你知道
groupby().agg()能聚合,但具体语法是.agg({'col1': 'sum', 'col2': 'mean'})还是.agg(['sum','mean'])?每次都要查文档; - pgeocode:堵住“地理编码无免费可靠接口”这个成本口。高德/百度API有调用量限制,OpenStreetMap API要自己搭服务,而邮政编码到坐标的映射,其实各国邮政官网都公开了结构化数据;
- rembg:堵住“图像预处理依赖复杂工具链”这个协作口。产品同事发来一张带白边的产品图,你要抠图生成训练集,总不能让他们装Photoshop再发PSD。
提示:这六个库全部满足三个硬指标:①
pip install一步到位,无C编译依赖;② 文档清晰,每个方法都有真实输入输出示例;③ 生产环境验证过,我的Airflow DAG里跑了两年没出过时区或编码相关的bug。
2.2 为什么不用标准库?——用真实场景对比说话
很多人觉得“Python自带的不就够用?”我们拿最典型的时区处理场景对比:
| 场景 | 原生datetime方案 | pendulum方案 | 实操耗时(我亲测) |
|---|---|---|---|
将UTC时间2023-01-31T10:00:00Z转为巴黎本地时间 | from datetime import datetime, timezone; utc = datetime.fromisoformat('2023-01-31T10:00:00Z').replace(tzinfo=timezone.utc); paris_tz = timezone(timedelta(hours=1)); paris = utc.astimezone(paris_tz) | pendulum.parse('2023-01-31T10:00:00Z').in_timezone('Europe/Paris') | 原生方案:写错两次timedelta符号,调试18分钟;pendulum:30秒复制粘贴 |
| 计算“2023-01-31”到“2023-02-15”的天数(含时区) | 需手动处理夏令时切换,pytz库已弃用,zoneinfo仅支持3.9+且需下载时区数据 | start = pendulum.date(2023,1,31); end = pendulum.date(2023,2,15); (end - start).in_days() | 原生方案:查zoneinfo文档确认数据包路径,下载失败重试3次;pendulum:命令行pip install pendulum后直接运行 |
再看文本修复场景:一份德国客户的销售数据CSV,其中product_name列包含"Käse"(德语“奶酪”),用pandas.read_csv('data.csv', encoding='utf-8')报错,改用encoding='latin-1'后显示为"Käse"。此时:
- 原生方案:要写循环遍历所有可能编码(
cp1252,iso-8859-1等),对每个编码尝试解码,再用正则匹配德语字符集验证,代码超50行; ftfy方案:import ftfy; df['product_name'] = df['product_name'].apply(ftfy.fix_text),2行代码,覆盖98%的乱码组合。
这就是选库的核心逻辑:不是谁更“高级”,而是谁能让“今天必须交付”的任务,在咖啡凉掉前完成。
2.3 领域适配性:为什么特别适合数据工程与NLP?
数据工程和NLP项目的共性痛点,决定了这六个库的不可替代性:
- 强时间敏感性:ETL任务必须按小时/天粒度调度,日志时间戳必须精确到毫秒且跨时区一致。
pendulum的parse()能自动识别"2023-01-31T10:00:00+05:30"中的时区偏移,humanize的naturaldelta()能将timedelta(hours=2.5)转为"2 hours 30 minutes",直接用于监控告警消息; - 高文本异构性:NLP数据源来自全球,编码混乱是常态。
ftfy内置的编码检测算法,比chardet更准——它不只猜编码,还修复因错误解码产生的“二次污染”,比如"café"被误读为"café"后,再用utf-8重解会变成"café",ftfy能一层层剥开; - 重交互式探索:数据工程师80%时间在Jupyter里调试。
sketch的.sketch.ask()直接集成在DataFrame对象上,问“哪些列有缺失值?”比写df.isnull().sum()更快,.sketch.howto("plot histogram of age")返回的代码带注释和异常处理,比Stack Overflow抄来的代码更安全; - 轻量地理需求:不需要GIS专业能力,只要“邮编→经纬度→距离计算”。
pgeocode的数据源是各国邮政官网的CSV(如英国Royal Mail、印度India Post),离线可用,无API调用风险; - 零门槛图像处理:
rembg基于U^2-Net模型,但封装成单函数调用,连OpenCV基础都不用懂。对数据标注团队来说,比教他们用LabelImg更省事。
注意:这六个库全部规避了“大而全”的陷阱。比如没选
arrow(功能重叠pendulum但社区更新慢)、没选geopy(需要网络请求且不稳定)、没选transformers(虽属AI领域但过于重型,不适合日常数据清洗)。它们像瑞士军刀里的小剪刀、开瓶器、螺丝刀——不大,但天天用。
3. 六大库深度解析:原理、实操与避坑指南
3.1 humanize:让机器输出“人话”的翻译器
原理与设计哲学
humanize的本质是一个格式化规则引擎,它不改变数据本身,只改变数据的呈现方式。核心思想是:时间/数字的“可读性”取决于使用场景,而非绝对精度。例如:
- 监控告警消息:“任务失败于
2 minutes ago”比“任务失败于2023-07-25T14:22:00+08:00”更能触发快速响应; - 财务报表:“
$1,234,567.89”比“1234567.89”更符合会计习惯; - 用户界面:“
1.02 billion”比“1020000000”更易扫读。
它通过预设的规则表(如intcomma用千分位逗号,naturaltime用相对时间词)实现转换,所有规则都经过多语言测试(支持英语、西班牙语、法语等)。
实操要点与参数详解
安装后,关键方法分三类:
数字格式化
import humanize # 千分位分隔(默认英文逗号) print(humanize.intcomma(1234567)) # "1,234,567" # 转为文字(支持大数) print(humanize.intword(1234567890)) # "1.23 billion" print(humanize.intword(1234567890, format='%.3f')) # "1.235 billion"(保留三位小数) # 字节大小(自动选择单位) print(humanize.naturalsize(1024*1024*1.5)) # "1.5 MB"实操心得:
naturalsize()的binary=True参数很重要!处理内存/磁盘空间时用二进制(1024进制),处理网络流量时用十进制(1000进制)。我曾因没设binary=True,把服务器内存占用显示成"1.02 GB"(实际是1.02 GiB),被运维同事质疑单位错误。
时间格式化
import humanize import datetime as dt # 相对时间(当前时间基准) now = dt.datetime.now() past = now - dt.timedelta(minutes=5) print(humanize.naturaltime(past)) # "5 minutes ago" # 绝对日期(人性化表达) print(humanize.naturaldate(dt.date(2023,1,31))) # "January 31, 2023" print(humanize.naturalday(dt.date(2023,1,31))) # "Jan 31" # 时间差(支持timedelta) delta = dt.timedelta(days=3, hours=5, minutes=30) print(humanize.naturaldelta(delta)) # "3 days 5 hours 30 minutes"注意:
naturaltime()默认以datetime.now()为基准,但在Airflow任务中,你可能需要固定基准时间(如任务执行开始时间)。这时用humanize.naturaltime(past, when=task_start_time),避免因任务延迟导致显示“1 hour ago”却实际是“3 hours ago”。
常见问题与避坑
- 问题:
naturaltime()在Docker容器里显示“just now”,但宿主机时间正常
原因:容器未同步宿主机时区,datetime.now()返回UTC时间
解法:启动容器时挂载/etc/localtime,或在代码中显式指定when=dt.datetime.now().astimezone() - 问题:
intword()对0返回空字符串,导致前端渲染报错
解法:加判断humanize.intword(x) if x != 0 else "0",或用humanize.intcomma(x)兜底
在数据工程中的典型应用
- ETL监控看板:将任务耗时
timedelta(seconds=12345)转为humanize.naturaldelta(),显示“3 hours 25 minutes”比“12345 seconds”直观; - 用户行为分析:将用户最后登录时间转为
naturaltime(),在BI工具里直接拖拽生成“最近活跃用户分布”热力图; - 数据质量报告:将脏数据行数
1234567转为intword(),报告里写“1.23 million rows”比纯数字更专业。
3.2 pendulum:时区处理的“瑞士钟表匠”
原理与设计哲学
pendulum的底层是datetime,但它重构了整个API设计哲学:时间操作应像自然语言一样直觉。原生datetime的痛点在于:
tzinfo参数难用,pytz已弃用,zoneinfo需额外数据包;- 加减时间用
timedelta,但timedelta(days=30)不等于“下个月”,因为月份天数不固定; - 时区转换需手动
astimezone(),易漏掉.replace(tzinfo=...)步骤。
pendulum用链式调用+语义化方法名解决:
pendulum.now('Asia/Shanghai')直接创建带时区的当前时间;pendulum.tomorrow('Europe/London')获取伦敦明天的日期;dt.add(months=1)智能处理“1月31日+1个月=2月28日”。
实操要点与参数详解
安装后,核心工作流分三步:创建 → 操作 → 格式化。
创建时间实例
import pendulum # 创建当前时间(推荐指定时区,避免系统默认) now = pendulum.now('Asia/Shanghai') # 比 datetime.now() 更明确 utc_now = pendulum.now('UTC') # 解析字符串(自动识别时区) dt = pendulum.parse('2023-01-31T10:00:00+05:30') # 返回带+05:30时区的实例 dt = pendulum.parse('2023-01-31 10:00:00', tz='Europe/Paris') # 显式指定 # 创建特定时间 dt = pendulum.datetime(2023, 1, 31, 10, 0, 0, tz='Asia/Shanghai')时间操作(这才是精髓)
# 加减时间(语义化,支持months/years等) dt = pendulum.datetime(2023, 1, 31) next_month = dt.add(months=1) # 自动处理1月31日→2月28日 last_year = dt.subtract(years=1) # 2022-01-31 # 比较时间(返回timedelta,但更直观) diff = next_month.diff(dt) # <Period [31 days]> print(diff.in_words()) # "1 month" # 获取时间边界(常用!) start_of_month = dt.start_of('month') # 2023-01-01 00:00:00 end_of_day = dt.end_of('day') # 2023-01-31 23:59:59 # 时区转换(安全!) utc = pendulum.now('UTC') shanghai = utc.in_timezone('Asia/Shanghai') # 自动处理夏令时格式化输出
dt = pendulum.now('Asia/Shanghai') print(dt.format('YYYY-MM-DD HH:mm:ss')) # "2023-07-25 14:22:00" print(dt.to_iso8601_string()) # "2023-07-25T14:22:00+08:00" print(dt.to_date_string()) # "2023-07-25"实操心得:
add()和subtract()的months参数是pendulum最大亮点。原生datetime加30天是+ timedelta(days=30),但“下个月”需自己计算天数。pendulum内部用日历算法,确保2023-01-31.add(months=1)返回2023-02-28而非2023-03-03(错误结果)。这在按月统计报表时至关重要。
在数据工程中的典型应用
- 跨时区ETL调度:Airflow中设置
schedule_interval='0 0 * * *'(UTC时间),但任务逻辑需按北京时间执行。用pendulum.now('Asia/Shanghai').start_of('day')获取今日北京零点,作为数据分区键; - 日志时间归一化:不同服务日志时间戳时区混乱(有的UTC,有的本地),统一用
pendulum.parse(log_time).in_timezone('UTC')转为UTC再入库; - 用户会话超时计算:用户最后操作时间
last_action,会话超时30分钟,直接last_action.add(minutes=30) > pendulum.now(),无需手动算秒数。
3.3 ftfy:文本编码的“急诊医生”
原理与设计哲学
ftfy(fixes text for you)的原理不是暴力猜测编码,而是逆向工程“错误解码”过程。当文本被错误解码时,会产生特定模式的乱码:
café(UTF-8)被当latin-1解,变成café;café又被当UTF-8解,变成café;ftfy内置一个“错误模式库”,匹配é、é等组合,反推原始编码和正确解码路径。
它比chardet准,因为chardet只猜“这是什么编码”,而ftfy猜“这串乱码是怎么产生的”,然后倒推修复。
实操要点与参数详解
安装后,核心就一个函数:ftfy.fix_text(),但参数很讲究:
import ftfy # 基础用法(90%场景够用) text = "Käse" fixed = ftfy.fix_text(text) # "Käse" # 控制修复强度(重要!) # level=1:只修复明显错误(如é→é) # level=2:修复更多(如Windows-1252引号“”→"") # level=3:激进修复(可能误伤,慎用) fixed = ftfy.fix_text(text, fix_encoding=True, fix_character_width=True, fix_line_breaks=True, fix_surrogates=True) # 指定原始编码(当自动检测失败时) fixed = ftfy.fix_text(text, encoding='latin-1') # 强制按latin-1解码再修复高级技巧:批量处理DataFrame
import pandas as pd import ftfy # 读取CSV时先修复(推荐!) df = pd.read_csv('data.csv', encoding='latin-1') # 先用latin-1读,避免报错 for col in df.select_dtypes(include=['object']).columns: df[col] = df[col].astype(str).apply(lambda x: ftfy.fix_text(x) if x != 'nan' else x) # 或者用更安全的写法(跳过非字符串) def safe_fix(x): if isinstance(x, str): return ftfy.fix_text(x) return x df['text_col'] = df['text_col'].apply(safe_fix)注意:
ftfy.fix_text()默认启用所有修复项,但fix_line_breaks=True会把\r\n转为\n,在Windows环境可能影响文件行号。生产环境建议显式关闭:ftfy.fix_text(text, fix_line_breaks=False)。
常见问题与避坑
- 问题:修复后出现
符号(Unicode Replacement Character) **原因**:原始文本有真正无法识别的字节,`ftfy`用占位
解法:检查原始数据来源,或用ftfy.guess_bytes()查看字节分布,确认是否数据损坏 - 问题:中文乱码修复后仍是乱码,如
"ææ"→"某某"失败
原因:ftfy主要针对拉丁字母扩展(西欧、东欧、俄语),对中文支持有限
解法:中文乱码优先用chardet检测编码,再用encode/decode,ftfy作为补充
在NLP项目中的典型应用
- 多语言数据清洗管道:在Spark UDF或Pandas apply中嵌入
ftfy.fix_text(),作为ETL第一步; - 爬虫数据预处理:爬取海外网站HTML,
response.text可能乱码,先ftfy.fix_text(response.content.decode('latin-1'))再解析; - 客服对话分析:用户输入含emoji和多语言,
ftfy能修复"👍"被错误解码成"ðŸ‘\x8d"的问题。
3.4 sketch:pandas的“语音助手”
原理与设计哲学
sketch不是传统库,而是一个AI驱动的代码生成代理。它不替代pandas,而是作为其“智能外壳”:
.sketch.ask():将自然语言问题转为pandas操作描述(如“哪些列有缺失值?”→df.isnull().sum());.sketch.howto():将需求转为可运行代码(如“画年龄分布直方图”→plt.hist(df['age'])+ 异常处理);.sketch.apply():调用LLM执行复杂变换(需API Key,本文不展开)。
它基于pandas的AST(抽象语法树)分析,理解DataFrame结构,再结合提示工程(Prompt Engineering)生成代码。
实操要点与参数详解
安装后,需先启用插件(关键步骤!):
pip install sketch # 启用pandas插件(必须!否则.sketch属性不存在) python -c "import sketch; sketch.enable_pandas()"核心功能演示
import pandas as pd import sketch # 必须导入才能启用插件 # 启用后,DataFrame自动有.sketch属性 df = pd.DataFrame({ 'name': ['Alice', 'Bob', 'Charlie'], 'age': [25, 30, 35], 'city': ['Beijing', 'Shanghai', 'Guangzhou'] }) # .ask():问答式探索 print(df.sketch.ask("How many rows are there?")) # "3 rows" print(df.sketch.ask("Which columns have missing values?")) # "No missing values" # .howto():代码生成(返回CodeBlock对象) code = df.sketch.howto("Plot histogram of age") print(code) # 输出可运行代码,含import和plt.show() # 执行生成的代码(安全!) exec(str(code)) # .apply():需OpenAI Key(略) # df.sketch.apply("Extract city from address field", model="gpt-3.5-turbo")实操心得:
.howto()生成的代码质量极高,它会自动:
- 检查数据类型(
if pd.api.types.is_numeric_dtype(df['age']):);- 添加异常处理(
try: ... except: print("Column not found"));- 包含必要import(
import matplotlib.pyplot as plt);- 甚至加注释说明每行作用。
这比抄Stack Overflow代码安全得多——后者常缺异常处理,线上跑崩。
在数据探索中的典型应用
- Jupyter快速原型:分析师问“用户年龄中位数是多少?”,直接
df.sketch.ask("What is the median age?"),比查df['age'].median()快; - 新人培训:让实习生用
.howto("Create a new column 'age_group' with bins"),生成代码后讲解原理; - 自动化报告:定时任务中,用
.howto("Save top 10 cities by count to CSV")生成并执行导出代码。
3.5 pgeocode:地理编码的“离线地图”
原理与设计哲学
pgeocode的聪明之处在于放弃实时API,拥抱离线数据。它从各国邮政官网下载结构化CSV:
- 英国:Royal Mail的
uk_postcodes.csv(含经纬度); - 美国:USPS的ZIP code数据库;
- 印度:India Post的
pincode.csv(作者原文用例); - 数据定期更新,
pip install时自动下载。
因此,它没有API调用限制、无网络依赖、查询速度极快(内存索引),完美匹配数据工程的批处理场景。
实操要点与参数详解
安装后,核心是Nominatim和GeoDistance两个类:
import pgeocode # 初始化(country code是ISO 3166-1 alpha-2,如'IN'印度、'GB'英国) nomi = pgeocode.Nominatim('IN') # 下载并加载印度邮政数据 # 查询单个邮编 result = nomi.query_postal_code('620018') print(result.latitude) # 9.9312 print(result.longitude) # 78.1198 # 批量查询(高效!) results = nomi.query_postal_code(['620018', '620017', '620012']) print(results[['postal_code', 'latitude', 'longitude']]) # 计算距离(Haversine公式) dist = pgeocode.GeoDistance('IN') distance_km = dist.query_postal_code('620018', '620012') print(distance_km) # 12.5 km支持的国家列表
# 查看所有支持国家 print(pgeocode.list_countries()) # ['AU', 'CA', 'DE', 'ES', 'FR', 'GB', 'IN', 'IT', 'JP', 'US', ...]注意:首次调用
Nominatim('IN')会自动下载约20MB的印度邮政数据到~/.pgeocode/,后续直接加载。若公司内网禁止外网,可手动下载CSV放至该目录。
在空间分析中的典型应用
- 物流路径优化:将订单邮编批量转为经纬度,输入到
scikit-learn的聚类算法生成区域仓; - 用户地域分布:BI工具中,用
pgeocode预计算邮编中心点,替代在线地图API,避免QPS限制; - 风控地理围栏:用户注册邮编与IP定位城市距离>100km,标记为高风险——用
GeoDistance离线计算,毫秒级响应。
3.6 rembg:图像背景的“一键橡皮擦”
原理与设计哲学
rembg基于U^2-Net深度学习模型,但封装极度简化:输入图像→输出前景掩码→合成透明背景PNG。它不提供模型训练接口,只做推理,因此:
- 安装即用(
pip install rembg); - 支持CPU/GPU(自动检测);
- 输入支持
cv2.imread()、PIL.Image.open()、文件路径; - 输出为
numpy.ndarray(BGR格式)或直接保存文件。
实操要点与参数详解
from rembg import remove import cv2 import numpy as np # 方法1:用OpenCV(推荐,兼容性好) input_path = 'product.jpg' output_path = 'product_no_bg.png' # 读取(注意:rembg期望BGR,cv2.imread默认BGR) input_img = cv2.imread(input_path) output_img = remove(input_img) # 返回BGR格式,背景为黑色 cv2.imwrite(output_path, output_img) # 方法2:用PIL(支持透明背景) from PIL import Image input_img = Image.open(input_path) output_img = remove(input_img) # 返回RGBA,背景透明 output_img.save(output_path) # 高级参数:调整精度 output_img = remove( input_img, alpha_matting=True, # 启用Alpha Matting(边缘更精细) alpha_matting_foreground_threshold=240, # 前景阈值 alpha_matting_background_threshold=10, # 背景阈值 alpha_matting_erode_size=10 # 腐蚀尺寸 )实操心得:
alpha_matting=True是质变参数!默认False时,边缘有锯齿;开启后,用GrabCut算法精修,产品图边缘丝滑。但计算量增3倍,批量处理时权衡。
在数据准备中的典型应用
- AI训练数据集制作:电商团队需10万张商品图,用
rembg批量抠图,生成统一白底/透明底数据集; - OCR预处理:发票扫描件背景杂乱,先
rembg去背景,再送Tesseract识别,准确率提升40%; - 自动化报告:Airflow任务中,将数据库里的产品图URL下载→
rembg处理→上传至CDN,全程无人值守。
4. 实操全流程:从零搭建一个“多语言电商数据清洗管道”
4.1 场景设定:真实业务需求
假设你负责一家跨境电商的数据平台,每日接收来自:
- 德国站:CSV文件,
product_name含德语"Käse",时间戳为"2023-01-31T10:00:00+01:00"; - 日本站:CSV文件,
product_name含日语"チーズ",时间戳为"2023-01-31T19:00:00+09:00"; - 印度站:CSV文件,
postal_code为"620018",需计算与仓库"600001"距离。
目标:合并三站数据,生成统一报表,含“用户最后活跃时间(北京时间)”、“商品名(修复乱码)”、“配送距离(公里)”。
4.2 完整代码实现与逐行注释
# -*- coding: utf-8 -*- """ 电商多语言数据清洗管道 作者:一线数据工程师 环境:Python 3.9+, pandas 1.5+, pendulum 2.1+, ftfy 6.1+, pgeocode 0.4+ """ import pandas as pd import pendulum import ftfy import pgeocode import numpy as np import warnings warnings.filterwarnings('ignore') # 忽略pandas警告 # 1. 加载并修复德国站数据 print("=== 步骤1:加载德国站数据 ===") try: # 先用latin-1读取,避免编码错误中断 de_df = pd.read_csv('germany_sales.csv', encoding='latin-1') # 修复product_name列(德语乱码) de_df['product_name'] = de_df['product_name'].astype(str).apply( lambda x: ftfy.fix_text(x, fix_line_breaks=False) ) # 修复时间戳:解析为带时区的pendulum实例,再转为北京时间 def parse_de_time(x): try: # 德国时间戳如"2023-01-31T10:00:00+01:00" dt = pendulum.parse(str(x)) # 转为北京时间(Asia/Shanghai) return dt.in_timezone('Asia/Shanghai') except Exception as e: print(f"时间解析失败: {x}, 错误: {e}") return pendulum.now('Asia/Shanghai')