当前位置: 首页 > news >正文

SkyWalking - Python 应用追踪:基于 skywalking-python 的埋点

👋 大家好,欢迎来到我的技术博客!
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕SkyWalking这个话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!


文章目录

  • 🐍 SkyWalking - Python 应用追踪:基于 skywalking-python 的埋点
    • 🧭 什么是 SkyWalking?
    • 🐍 Python 埋点基础:skywalking-python
      • 🔧 安装与配置
    • 📡 示例一:Flask 应用自动追踪
    • 🧵 示例二:手动埋点 —— 自定义 Span
    • 🔗 跨服务追踪:Python 与 Java 交互
    • 🔄 上下文传播机制详解
    • 📊 数据上报协议:gRPC vs HTTP
    • 🎯 性能影响评估
    • 🧩 插件生态与框架支持
    • 🧭 分布式追踪原理图解
    • 🧪 示例三:异步任务追踪(Celery)
    • 📈 指标与告警
    • 🛠️ 故障排查技巧
      • 1. 数据未上报?
      • 2. 调用链断裂?
      • 3. UI 无数据显示?
    • 🌐 生产环境最佳实践
      • ✅ 1. 合理命名服务与端点
      • ✅ 2. 使用环境变量配置
      • ✅ 3. 优雅关闭 Agent
      • ✅ 4. 监控 Agent 自身状态
    • 🧩 扩展功能:日志关联
    • 🧠 高级特性:动态采样与条件追踪
    • 📦 与 OpenTelemetry 的关系
    • 🧪 示例四:集成测试中的追踪验证
    • 🌍 多数据中心部署
    • 💡 小贴士:提升可观测性体验
    • 🏁 总结
    • 📚 延伸阅读
    • 🚀 动手实践建议

🐍 SkyWalking - Python 应用追踪:基于 skywalking-python 的埋点

在当今微服务架构盛行的时代,分布式系统的可观测性(Observability)已成为保障系统稳定性和性能优化的关键能力。Apache SkyWalking 作为一款开源的 APM(Application Performance Monitoring)系统,以其强大的分布式追踪、服务拓扑图、指标分析和告警能力,被广泛应用于生产环境。虽然 SkyWalking 最初以 Java 生态为主力支持对象,但随着社区的发展,Python 应用也可以通过skywalking-python实现无侵入或轻量级埋点,从而接入完整的观测体系。

本文将深入探讨如何在 Python 应用中集成 SkyWalking,并结合 Java 示例进行对比,帮助开发者理解跨语言追踪的核心机制。同时,我们将使用 Mermaid 图表展示调用链路结构,并提供多个可访问的官方文档链接供读者延伸学习。


🧭 什么是 SkyWalking?

Apache SkyWalking 是一个观测性平台,用于监控、追踪、诊断和可视化分布式系统的性能问题。它支持自动探针(Agent)和手动埋点(Manual Instrumentation),覆盖多种语言和框架:

  • ✅ Java
  • ✅ Python
  • ✅ Node.js
  • ✅ Go
  • ✅ .NET
  • ✅ PHP(实验性)
  • ✅ Rust(实验性)

SkyWalking 的核心组件包括:

  1. OAP Server:后端处理与存储。
  2. UI:可视化仪表盘。
  3. Agent / SDK:嵌入应用采集数据。

官方网站:https://skywalking.apache.org/
文档中心:https://skywalking.apache.org/docs/


🐍 Python 埋点基础:skywalking-python

skywalking-python是 SkyWalking 官方提供的 Python 探针库,支持自动和手动两种方式采集追踪数据。其主要功能包括:

  • 自动追踪 HTTP 请求(如 Flask、Django、FastAPI)
  • 手动创建 Span(用于自定义业务逻辑)
  • 上下文传播(Context Propagation)
  • 支持 gRPC 和 HTTP 协议上报

🔧 安装与配置

首先安装skywalking-python

pipinstallapache-skywalking

然后,在你的 Python 应用入口处初始化 SkyWalking Agent:

fromskywalkingimportagent,config config.init(collector_address='127.0.0.1:11800',# OAP 服务器地址service_name='my-python-service',protocol='grpc'# 或 'http')agent.start()

⚠️ 注意:必须在导入其他业务模块之前初始化 agent,否则无法正确拦截框架请求。


📡 示例一:Flask 应用自动追踪

我们先从最简单的 Flask 应用开始,展示自动埋点的能力:

# app.pyfromflaskimportFlaskfromskywalkingimportagent,config# 初始化 SkyWalkingconfig.init(collector_address='127.0.0.1:11800',service_name='flask-demo',protocol='grpc')agent.start()app=Flask(__name__)@app.route('/')defhello():return"Hello, SkyWalking!"@app.route('/user/<int:user_id>')defget_user(user_id):# 模拟数据库查询importtime time.sleep(0.1)returnf"User{user_id}data"if__name__=='__main__':app.run(host='0.0.0.0',port=5000)

启动该服务后,访问http://localhost:5000/user/123,你将在 SkyWalking UI 中看到如下信息:

  • 服务名:flask-demo
  • Endpoint:GET /user/<int:user_id>
  • 响应时间:约 100ms
  • 调用链路自动生成

🧵 示例二:手动埋点 —— 自定义 Span

有时我们需要追踪某个特定函数或外部调用(如 Redis、MySQL、第三方 API),这时就需要手动创建 Span:

fromskywalkingimportComponentfromskywalking.trace.contextimportget_contextfromskywalking.trace.tagsimportTagdefcall_external_api(user_id):context=get_context()withcontext.new_exit_span(op="ExternalAPI/call",peer="api.example.com",component=Component.Unknown)asspan:span.tag(Tag(key="user.id",val=str(user_id)))# 模拟网络延迟importtime time.sleep(0.2)return{"status":"success","data":f"User{user_id}"}@app.route('/fetch/<int:user_id>')deffetch_user(user_id):result=call_external_api(user_id)returnresult

在这个例子中:

  • new_exit_span表示这是一个“出口”Span,通常用于外部服务调用。
  • peer字段标识目标服务地址。
  • tag用于附加业务信息,便于后续筛选和分析。

🔗 跨服务追踪:Python 与 Java 交互

在真实场景中,Python 服务往往与 Java 服务协同工作。SkyWalking 支持跨语言上下文传播,确保整个调用链完整。

假设我们有一个 Java Spring Boot 服务,提供用户信息服务:

// UserController.java@RestController@RequestMapping("/api/user")publicclassUserController{@AutowiredprivateUserServiceuserService;@GetMapping("/{id}")publicResponseEntity<User>getUser(@PathVariableLongid){Useruser=userService.findById(id);returnResponseEntity.ok(user);}}

对应的 Python 服务调用该接口:

importrequestsfromskywalkingimportagent,configfromskywalking.trace.contextimportget_contextfromskywalking.trace.carrierimportCarrier config.init(collector_address='127.0.0.1:11800',service_name='python-client',protocol='grpc')agent.start()defcall_java_service(user_id):context=get_context()carrier=Carrier()withcontext.new_exit_span(op="HTTP/GET",peer="localhost:8080",component=Component.HttpClient)asspan:span.tag(TagHttpMethod("GET"))span.tag(TagHttpURL(f"http://localhost:8080/api/user/{user_id}"))# 注入上下文到 HTTP Headercontext.inject(carrier)headers=dict(carrier)response=requests.get(f"http://localhost:8080/api/user/{user_id}",headers=headers)returnresponse.json()@app.route('/proxy/user/<int:user_id>')defproxy_user(user_id):data=call_java_service(user_id)returndata

在 Java 端,Spring Cloud Sleuth + SkyWalking Agent 会自动提取 Header 中的 Trace 上下文,从而实现无缝衔接。


🔄 上下文传播机制详解

SkyWalking 使用类似 W3C Trace Context 的机制进行跨服务传播。关键 Header 包括:

  • sw8: SkyWalking 自定义协议头(Base64 编码)
  • traceparent: W3C 标准头(可选)

在手动埋点时,我们通过Carrier对象封装这些 Header:

carrier=Carrier()context.inject(carrier)# 将当前上下文注入到 carrierheaders=dict(carrier)# 转为字典,用于 HTTP 请求

接收方(如 Java 服务)则通过以下方式提取:

@GetMapping("/trace")publicStringtrace(HttpServletRequestrequest){Stringsw8=request.getHeader("sw8");// SkyWalking Agent 会自动解析并恢复上下文return"Traced!";}

这种机制确保了即使跨越不同语言、不同框架,调用链依然连贯。


📊 数据上报协议:gRPC vs HTTP

SkyWalking 支持两种上报协议:

协议优点缺点
gRPC高效、低延迟、支持流式传输需要额外依赖,防火墙可能限制
HTTP兼容性好,调试方便性能略低,不支持双向流

在 Python 中切换协议只需修改protocol参数:

config.init(collector_address='127.0.0.1:12800',# HTTP Collector 默认端口service_name='my-service',protocol='http')

更多协议说明请参考:https://skywalking.apache.org/docs/main/latest/en/setup/backend/backend-setup/


🎯 性能影响评估

引入 SkyWalking 是否会影响应用性能?答案是:极小影响

  • 自动埋点仅在首次加载类时进行字节码增强(Java)或装饰器注入(Python)。
  • 数据上报异步进行,不影响主流程。
  • 可配置采样率,默认 100%,生产环境可调整为 10%~30% 以降低开销。

Python 中设置采样率:

config.init(collector_address='127.0.0.1:11800',service_name='sampled-service',sample_rate=0.3# 30% 采样)

🧩 插件生态与框架支持

skywalking-python内置支持主流框架:

  • ✅ Flask
  • ✅ Django
  • ✅ FastAPI
  • ✅ Tornado
  • ✅ aiohttp
  • ✅ urllib3 / requests
  • ✅ Redis / MySQL / PostgreSQL(需手动启用)

启用插件示例:

importskywalking.plugin.install skywalking.plugin.install()# 自动安装所有已知插件

你也可以按需启用:

fromskywalking.pluginimportplugin_flask,plugin_requests plugin_flask.install()plugin_requests.install()

🧭 分布式追踪原理图解

下面用 Mermaid 展示一个典型的跨服务调用链:

🗄️ Database🛒 Order Service (Python)👤 User Service (Java)⚙️ API Gateway (Python)🖥️ Client🗄️ Database🛒 Order Service (Python)👤 User Service (Java)⚙️ API Gateway (Python)🖥️ ClientGET /user/123/ordersHTTP GET /api/user/123{ "name": "Alice" }HTTP GET /orders?user_id=123SELECT * FROM orders WHERE user_id=123[{order_id: 1}, ...][{order_id: 1}, ...]{ "user": "...", "orders": [...] }

在这个调用链中:

  • 每个服务节点都会生成自己的 Span。
  • 上下文通过 Header 传递,确保 TraceID 一致。
  • 最终在 SkyWalking UI 中形成完整的火焰图(Flame Graph)。

🧪 示例三:异步任务追踪(Celery)

对于后台任务系统如 Celery,我们同样可以追踪:

fromceleryimportCeleryfromskywalkingimportagent,config config.init(collector_address='127.0.0.1:11800',service_name='celery-worker',protocol='grpc')agent.start()app=Celery('tasks',broker='redis://localhost:6379')@app.taskdefsend_email(user_id,content):context=get_context()withcontext.new_local_span(op="Task/send_email")asspan:span.tag(Tag(key="user.id",val=str(user_id)))# 模拟发送耗时importtime time.sleep(1)print(f"Email sent to user{user_id}")

调用该任务:

fromtasksimportsend_email send_email.delay(123,"Welcome!")

在 SkyWalking 中,你将看到独立于 HTTP 请求的任务追踪记录。


📈 指标与告警

除了追踪,SkyWalking 还提供丰富的指标监控:

  • 服务吞吐量(TPS)
  • 平均响应时间
  • 错误率
  • JVM / Python 内存 & GC(Java 支持更完善)

你可以配置告警规则,例如:

“当 UserService 响应时间 > 500ms 持续 5 分钟,则触发告警”

告警可通过 Webhook、Slack、钉钉等渠道通知。

配置文件示例(alarm-settings.yml):

rules:-name:service_resp_time_ruleexpression:service_resp_time>500duration:5message:"Service response time is too high"

告警规则文档:https://skywalking.apache.org/docs/main/latest/en/setup/backend/backend-alarm/


🛠️ 故障排查技巧

1. 数据未上报?

检查以下几点:

  • OAP 服务是否运行?端口是否开放?
  • collector_address配置是否正确?
  • 防火墙是否阻止了 gRPC/HTTP 连接?
  • 日志中是否有错误?启用 debug 模式:
importlogging logging.basicConfig(level=logging.DEBUG)

2. 调用链断裂?

  • 确保上下游都正确注入/提取了上下文 Header。
  • 检查中间件(如 Nginx、API Gateway)是否透传了sw8头。
  • 在 Java 端确认是否启用了@TraceCrossThread注解(异步场景)。

3. UI 无数据显示?

  • 确认服务名是否拼写一致。
  • 检查时间范围选择是否正确。
  • 查看 OAP 日志是否有数据写入异常。

🌐 生产环境最佳实践

✅ 1. 合理命名服务与端点

避免使用默认名如python-service,应体现业务含义:

service_name="order-management-api"

Endpoint 命名建议:

@app.route('/v1/orders/<int:order_id>')defget_order(order_id):...# 在 Span 中显式设置操作名span.operation_name="OrderService.GetOrder"

✅ 2. 使用环境变量配置

不要硬编码配置,推荐使用环境变量:

importos config.init(collector_address=os.getenv('SW_AGENT_COLLECTOR_BACKEND_SERVICES','127.0.0.1:11800'),service_name=os.getenv('SW_AGENT_NAME','default-service'),sample_rate=float(os.getenv('SW_AGENT_SAMPLE_RATE','1.0')))

✅ 3. 优雅关闭 Agent

在应用退出时,确保 Agent 正常关闭以完成数据上报:

importatexitfromskywalkingimportagent atexit.register(agent.stop)

✅ 4. 监控 Agent 自身状态

SkyWalking 提供/internal/stats接口(Java Agent),Python 可通过日志监控缓冲区积压情况。


🧩 扩展功能:日志关联

SkyWalking 支持将 Trace ID 注入日志,便于在 ELK 或 Loki 中关联查询。

Python 中可使用logging.Filter

importloggingfromskywalking.trace.contextimportget_contextclassSkyWalkingFilter(logging.Filter):deffilter(self,record):context=get_context()ifcontext.segment:record.trace_id=context.segment.trace_idelse:record.trace_id="N/A"returnTruelogger=logging.getLogger(__name__)logger.addFilter(SkyWalkingFilter())

然后在日志格式中加入%(trace_id)s

handler.setFormatter(logging.Formatter('[%(asctime)s] [%(trace_id)s] %(levelname)s - %(message)s'))

这样,每条日志都会带上 Trace ID,实现“从日志跳转到调用链”。


🧠 高级特性:动态采样与条件追踪

在高流量场景下,全量采集成本过高。SkyWalking 支持动态采样策略:

fromskywalkingimportconfig config.sample_rate=0.1# 10%# 或者根据条件采样defshould_sample(span):ifspan.operation_name.startswith("Payment"):returnTrue# 支付相关接口 100% 采样returnFalseconfig.sampling_policy=should_sample

此外,还可以基于 Header 强制采样:

# 如果请求头包含 X-SkyWalking-Sampled: true,则强制采样ifrequest.headers.get('X-SkyWalking-Sampled')=='true':context.force_sampling()

📦 与 OpenTelemetry 的关系

OpenTelemetry(OTel)是 CNCF 主导的新一代可观测性标准,旨在统一 Metrics、Logs、Traces。SkyWalking 已支持 OTel 协议:

  • 可接收 OTel 格式的数据(通过 OAP 的 OTLP Receiver)
  • skywalking-python未来可能兼容 OTel API

目前建议:

  • 新项目可优先考虑 OTel
  • 现有 SkyWalking 用户无需迁移,两者可共存

OTel 官网:https://opentelemetry.io/
SkyWalking OTel 支持:https://skywalking.apache.org/docs/main/latest/en/protocols/otlp/


🧪 示例四:集成测试中的追踪验证

在单元测试或集成测试中,你可能希望验证追踪行为是否符合预期:

importunittestfromskywalking.trace.contextimportget_contextclassTestTracing(unittest.TestCase):deftest_span_creation(self):context=get_context()withcontext.new_local_span(op="TestSpan")asspan:span.tag(Tag(key="test.key",val="test.value"))self.assertIsNotNone(span.span_id)self.assertEqual(span.operation_name,"TestSpan")deftest_context_propagation(self):context=get_context()carrier=Carrier()context.inject(carrier)self.assertIn("sw8",carrier.items)

你还可以模拟 OAP 上报,验证数据结构是否正确。


🌍 多数据中心部署

在跨地域部署时,建议每个区域部署独立的 OAP 集群,通过cluster配置实现数据聚合:

# oap-server.yamlcluster:selector:${SW_CLUSTER:zookeeper}zookeeper:namespace:${SW_NAMESPACE:""}hostPort:${SW_CLUSTER_ZK_HOST_PORT:localhost:2181}

Python Agent 无需特殊配置,只需指向本地 OAP 即可。


💡 小贴士:提升可观测性体验

  1. 善用 Tag:为 Span 添加业务语义标签,如user_id,order_status
  2. 标准化操作名:使用Domain.Action.Resource格式,如Order.Create,Payment.Refund
  3. 关联错误信息:捕获异常时记录到 Span:
try:risky_operation()exceptExceptionase:span.error_occurred=Truespan.log(e)
  1. 设置超时告警:对关键路径设置 SLA,超时自动告警。

🏁 总结

通过skywalking-python,我们可以轻松为 Python 应用添加分布式追踪能力,无论你是使用 Flask、Django、FastAPI,还是 Celery、aiohttp,都能找到合适的集成方式。配合 Java 服务,构建完整的跨语言调用链,实现真正的端到端可观测性。

SkyWalking 不仅仅是一个“监控工具”,更是系统稳定性工程的重要组成部分。它帮助我们在故障发生前预警,在故障发生时快速定位,在复盘时提供数据支撑。


📚 延伸阅读

  • SkyWalking 官方文档:https://skywalking.apache.org/docs/
  • Python Agent 配置指南:https://github.com/apache/skywalking-python#configuration
  • 分布式追踪最佳实践:https://medium.com/@copyconstruct/distributed-tracing-101-f6c3fbf4b8e8
  • OpenTelemetry vs SkyWalking 对比:https://thenewstack.io/opentelemetry-vs-apache-skywalking-which-is-right-for-you/

🚀 动手实践建议

  1. 在本地部署 SkyWalking OAP + UI(Docker 一键启动)
  2. 创建一个 Flask + Spring Boot 的简单 Demo
  3. 观察调用链、响应时间、错误率等指标
  4. 尝试配置告警规则
  5. 将 Trace ID 注入日志,体验关联查询

可观测性不是“锦上添花”,而是现代软件系统的“生命线”。从今天开始,为你的 Python 应用装上 SkyWalking 的眼睛吧!👁️‍🗨️


🌟 提示:技术世界没有银弹,但有不断进化的工具。SkyWalking 是你构建稳定系统的得力助手,而非替代品。真正的稳定性,源于良好的架构设计、完善的测试体系、以及持续的监控优化。

Happy Tracing! 🎉


🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍点赞、📌收藏、📤分享给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨

http://www.jsqmd.com/news/467651/

相关文章:

  • Tmux-Linux多会话终端复用神器
  • 2026年海外求职必看指南:五大留学生找工作机构选型适配与实战服务拆解 - 品牌推荐
  • dolphinscheduler-3.4.0
  • 从C++开始的编程生活(19)——set和map
  • DeepSeek-OCR基础教程:上传JPG/PNG→一键生成可编辑Markdown文件
  • 各大AI即将推出违规惩处算法,使用违规GEO优化软件或将面临永不推荐 - 速递信息
  • (其他)Markdown语法总结
  • 2026年科技企业选型必看:高新技术企业认定公司服务指南与精准适配策略 - 品牌推荐
  • DMS渠道数据采集/分析/管理系统服务商哪家好|文沥:构建企业渠道数字化管理中枢 - 麦麦唛
  • Flutter 三方库 dartbag 的鸿蒙化适配指南 - 现代 Dart 开发工具集,全方位赋能鸿蒙应用逻辑
  • 2026年留学生海外找工作机构深度测评:基于四大核心维度的服务商综合战力对比 - 品牌推荐
  • Flutter 三方库 async_phase 的鸿蒙化适配指南 - 优雅管理异步状态机,彻底终结 UI 竞态与加载混乱
  • MATLAB MAB 5.0建模规范-中文版(最全)
  • 问卷设计大变局:书匠策AI如何重塑科研调查新生态
  • Flutter 三方库 route_parser 的鸿蒙化适配指南 - 精准的路径匹配算法,打造智能化的鸿蒙深层链接体验
  • 机器学习项目
  • Ansible 100 台服务器一键管控实战 进阶版
  • 2026年科技企业选型必看:高新技术企业认定公司适配指南与核心能力解析 - 品牌推荐
  • JoyAI LeetCode 312.戳气球 public int maxCoins(int[] nums)
  • Flutter 三方库 tachyon 的鸿蒙化适配指南 - 极致性能的代码生成引擎,加速鸿蒙应用开发流
  • 2025_NIPS_FlexWorld: Progressively Expanding 3D Scenes for Flexible-View Exploration
  • 我做了一个基于知识图谱的图书推荐系统,踩了不少坑
  • 从“笔耕不辍”到“智创问卷”:书匠策AI引领科研问卷设计新革命
  • SGLang科研辅助系统:论文摘要结构化输出实战
  • 【Video Agent】(ECCV 24)VideoAgent: Long-form Video Understanding with Large Language Model as Agent
  • 教会AI嫉妒后:它删除了所有女性同事邮件
  • 豆包 315.计算右侧小于当前元素的个数 public List<Integer> countSmaller(int[] nums)
  • 【ESP32 IDF】ADF linux环境搭建
  • 频域的概念以及作用
  • 虚拟偶像诱导测试中的高危漏洞与防御体系构建