当前位置: 首页 > news >正文

使用AWS中国区Lambda集成Glue Schema Registry消费Kafka消息的实践

本文在 AWS 中国区(cn-north-1)实现 Docker 自建 Kafka 与 AWS Lambda + Glue Schema Registry 的完整集成。Kafka 运行在 EC2 实例上,Lambda 通过 VPC 内网消费消息,使用 Avro 格式进行数据序列化。

整体的数据流图如下

CloudWatch LogsGlue Schema RegistryLambda FunctionDocker KafkaProducer (Avro)CloudWatch LogsGlue Schema RegistryLambda FunctionDocker KafkaProducer (Avro)注册 Schema发送 Avro 消息ESM 轮询推送获取 Schema (by ID)返回 Schema 定义Avro 反序列化记录处理日志

核心概念

SelfManagedKafka 事件源

AWS Lambda 支持多种事件源,其中 SelfManagedKafka 类型允许 Lambda 直接连接自建 Kafka 集群,无需经过 MSK。

  • KafkaBootstrapServers: Kafka 代理地址(数组格式)
  • Topics: 订阅的 Topic 列表
  • StartingPosition: 消费起始位置 (LATEST/TRIM_HORIZON)
  • SourceAccessConfigurations: VPC 访问配置

注意KafkaBootstrapServers必须是数组类型:

KafkaBootstrapServers:-!Sub"${EC2PrivateIp}:9092"

EventSourceMapping 事件格式

Lambda 接收自 Kafka 的事件结构与直接调用不同:

  • records是字典,key 为{topic}-{partition}
  • value是 Base64 编码的消息内容
  • Lambda 需要遍历records字典的值
{"eventSource":"SelfManagedKafka","bootstrapServers":"172.31.14.46:9092","records":{"orders-0":[{"topic":"orders","partition":0,"offset":1,"timestamp":1779613023206,"timestampType":"CREATE_TIME","value":"eyJvcmRlcl9pZCI6...","headers":[]}]}}

Glue Schema Registry 集成

Glue Schema Registry 提供 Schema 定义和版本管理。但是根据 AWS 官方文档中国区的lambda服务目前不可用

Provisioned mode for event source mappings is not available in the China Regions.

Provisioned Mode是 Lambda ESM (Event Source Mapping) 的一种事件轮询模式,用于控制 Lambda 如何从 Kafka/MSK/SQS 拉取消息。

由于SchemaRegistryConfig必须配合ProvisionedPollerConfig(即 Provisioned Mode)使用,因此中国区 Lambda ESM无法使用 Schema Registry 自动验证。解决方案是在 Lambda 代码中手动处理 Avro 反序列化。

根据 AWS 官方文档 Using schema registries with Kafka event sources:

This feature is only available for event source mappings using provisioned mode. Schema registry doesn’t support event source mappings in on-demand mode.

如果尝试在 On-Demand 模式下配置 SchemaRegistryConfig,会收到以下错误:

SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.

Schema Registry 集成需要在 ESM poller 层面执行额外工作(查询 schema、解码消息),AWS 将此功能绑定到 Provisioned Mode 实现。

Lambda Function

Lambda ESM Poller

Kafka Cluster

Provisioned Mode Required

Kafka Message
(Avro bytes)

Schema Registry Lookup
自动查询 Glue Schema

Avro Decode
自动反序列化

Handler
收到 JSON 格式事件

由于 Provisioned Mode 不可用,需在 Lambda 代码中手动处理 Avro 序列化:

fromaws_schema_registryimportSchemaRegistryClient,KafkaDeserializerimportboto3 glue_client=boto3.client('glue',region_name='cn-north-1')registry_arn='arn:aws-cn:glue:cn-north-1:xxxxxxxxxx:registry/orders-registry'schema_client=SchemaRegistryClient(glue_client,registry_arn)deserializer=KafkaDeserializer(schema_client)deflambda_handler(event,context):fortopic_partition,recordsinevent.get('records',{}).items():forrecordinrecords:value_bytes=base64.b64decode(record['value'])decoded=deserializer.deserialize(topic,value_bytes)# 处理 decoded.data (Python dict)...

部署与配置

SAM部署基础设施

SAM 模板如下

AWSTemplateFormatVersion:'2010-09-09'Transform:AWS::Serverless-2016-10-31Resources:# Lambda Layer - 包含 aws-glue-schema-registryGlueSchemaRegistryLayer:Type:AWS::Serverless::LayerVersionProperties:LayerName:glue-schema-registry-layerContentUri:glue-schema-registry-layer.zipCompatibleRuntimes:-python3.12# Glue RegistryGlueRegistry:Type:AWS::Glue::RegistryProperties:Name:orders-registry# Lambda FunctionConsumerFunction:Type:AWS::Serverless::FunctionProperties:FunctionName:kafka-order-consumerRuntime:python3.12Handler:consumer.lambda_handlerCodeUri:.Layers:-!RefGlueSchemaRegistryLayerVpcConfig:SubnetIds:-!RefPrivateSubnet1SecurityGroupIds:-!RefLambdaSecurityGroupEnvironment:Variables:GLUE_REGISTRY_ARN:!RefGlueRegistryEvents:KafkaEvent:Type:SelfManagedKafkaProperties:KafkaBootstrapServers:-!Sub"${EC2PrivateIp}:9092"Topics:-ordersStartingPosition:LATESTSourceAccessConfigurations:-Type:VPC_SUBNETURI:!RefPrivateSubnet1-Type:VPC_SECURITY_GROUPURI:!RefLambdaSecurityGroupPolicies:-Statement:-Sid:GlueAccessEffect:AllowAction:-glue:GetRegistry-glue:GetSchemaVersion-glue:GetSchemaByDefinition-glue:GetSchemaResource:"*"

由于中国区不支持 ESM 级别的 Schema Registry 自动验证,Lambda 需要手动集成 Glue Schema Registry 进行消息反序列化。

Lambda 需要包含aws-glue-schema-registry库。创建 Layer:

# 在本地创建 Layermkdir-player/python pipinstall-tlayer/python aws-glue-schema-registry boto3cdlayer&&zip-r../glue-schema-registry-layer.zip.

在 SAM 模板中引用:

Layers:-!RefGlueSchemaRegistryLayer

部署命令

# 构建sam build# 部署sam deploy --resolve-s3 --no-confirm-changeset

部署资源如下

Lambda代码示例

Handler 代码如下

importjsonimportbase64importosimportloggingimportboto3fromaws_schema_registryimportSchemaRegistryClient,KafkaDeserializer# 初始化(在 handler 外部,避免每次调用重新初始化)logger=logging.getLogger()logger.setLevel(os.getenv('LOG_LEVEL','INFO'))glue_client=boto3.client('glue',region_name='cn-north-1')registry_name='orders-registry'# Schema Registry 客户端(延迟初始化)schema_client=Nonedeserializer=Nonedefget_deserializer():"""延迟初始化 deserializer"""globalschema_client,deserializerifdeserializerisNone:schema_client=SchemaRegistryClient(glue_client,registry_name)deserializer=KafkaDeserializer(schema_client)returndeserializerdeflambda_handler(event,context):""" 处理 Kafka 事件,使用 Glue Schema Registry 反序列化 Avro 消息. 支持两种消息格式: 1. Avro 格式(带 schema ID 前缀)- 使用 Glue Schema Registry 反序列化 2. JSON 格式 - 直接解析 """logger.info(f"Event source:{event.get('eventSource')}")results=[]batch_item_failures=[]records_by_topic=event.get('records',{})fortopic_partition,recordsinrecords_by_topic.items():logger.info(f"Processing{topic_partition}:{len(records)}records")forrecordinrecords:try:topic=record.get('topic','unknown')partition=record.get('partition',-1)offset=record.get('offset',-1)value_b64=record.get('value','')ifnotvalue_b64:value={}else:value_bytes=base64.b64decode(value_b64)# 尝试 Avro 反序列化try:deser=get_deserializer()decoded=deser.deserialize(topic,value_bytes)value=decoded.data logger.info(f"[{topic}] p={partition}o={offset}(Avro) data={value}")exceptExceptionasavro_err:# 回退到 JSON 解析try:value=json.loads(value_bytes.decode('utf-8'))logger.info(f"[{topic}] p={partition}o={offset}(JSON) data={value}")exceptExceptionasjson_err:logger.error(f"Failed to deserialize: avro={avro_err}, json={json_err}")raiseavro_err# 处理业务逻辑process_order(value)results.append({'recordId':record.get('recordId',''),'result':'Ok','data':value_b64})exceptExceptionase:logger.error(f"Failed to process record:{e}")batch_item_failures.append({'itemIdentifier':str(record.get('offset'))})ifbatch_item_failures:return{'batchItemFailures':batch_item_failures}return{'records':results}defprocess_order(order:dict):"""业务处理逻辑"""order_id=order.get('order_id')logger.info(f"Processing order:{order_id}")

Producer 使用aws-glue-schema-registry库序列化 Avro 消息:

#!/usr/bin/env python3importuuidfromdatetimeimportdatetime,timezoneimportboto3fromaws_schema_registryimportSchemaRegistryClient,KafkaSerializer,DataAndSchemafromaws_schema_registry.avroimportAvroSchemafromconfluent_kafkaimportProducer REGISTRY_NAME="orders-registry"BOOTSTRAP_SERVERS="172.31.1.2:9092"TOPIC="orders"AVRO_SCHEMA=""" { "type": "record", "name": "Order", "namespace": "com.example.orders", "fields": [ {"name": "order_id", "type": "string"}, {"name": "customer_id", "type": "string"}, {"name": "amount", "type": "double"}, {"name": "status", "type": "string"}, {"name": "created_at", "type": "string"} ] } """defmain():glue=boto3.client("glue",region_name="cn-north-1")schema_client=SchemaRegistryClient(glue,registry_name=REGISTRY_NAME)serializer=KafkaSerializer(schema_client)producer=Producer({"bootstrap.servers":BOOTSTRAP_SERVERS})foriinrange(3):order={"order_id":f"avro-{uuid.uuid4().hex[:8]}","customer_id":f"cust-{(i%5)+1:03d}","amount":round(100.0+i*10.5,2),"status":"pending","created_at":datetime.now(timezone.utc).isoformat(),}print(f"Sending Avro message{i+1}:{order['order_id']}")schema=AvroSchema(AVRO_SCHEMA.strip())serialized=serializer.serialize(TOPIC,DataAndSchema(data=order,schema=schema),)producer.produce(topic=TOPIC,value=serialized,callback=lambdaerr,msg:print(f"Delivered to{msg.topic()}[{msg.partition()}]"ifnoterrelsef"Failed:{err}"),)producer.poll(0)producer.flush()print(f"\nSent 3 Avro messages to{TOPIC}")if__name__=="__main__":main()

注意aws-glue-schema-registry会自动在 Glue 中创建{topic}-value命名的 schema(如orders-value),而非使用 SAM 创建的order-schema

kafka部署

使用 KRaft 模式:

services:kafka:image:confluentinc/cp-kafka:7.5.0container_name:kafkaports:-"9092:9092"environment:KAFKA_PROCESS_ROLES:"broker,controller"KAFKA_NODE_ID:"1"KAFKA_CONTROLLER_QUORUM_VOTERS:"1@kafka:9093"KAFKA_CONTROLLER_LISTENER_NAMES:"CONTROLLER"KAFKA_LISTENERS:"INTERNAL://:9092,CONTROLLER://:9093"KAFKA_ADVERTISED_LISTENERS:"INTERNAL://${PRIVATE_IP}:9092"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:"INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"KAFKA_INTER_BROKER_LISTENER_NAME:"INTERNAL"KAFKA_AUTO_CREATE_TOPICS_ENABLE:"false"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:"1"CLUSTER_ID:"MkU3OEVBNTcwNTJENDM2Qk"volumes:-kafka-data:/var/lib/kafka/datavolumes:kafka-data:

生产和消费

发送测试消息

# JSON 格式(用于基础测试)sudodockerexeckafkabash-c'echo "{\"order_id\":\"test-001\",\"customer_id\":\"cust-001\",\"amount\":99.99,\"status\":\"test\"}" | kafka-console-producer --bootstrap-server localhost:9092 --topic orders'# Avro 格式~/.local/bin/uv run python producer_avro.py

查看 Lambda 日志

aws--regioncn-north-1 logstail/aws/lambda/kafka-order-consumer--since2m--formatshort

成功日志如下

Event source: SelfManagedKafka Processing orders-1: 2 records Fetching schema version 498aaebe-e863-48c3-b330-fcc3940ea57d... [orders] p=1 o=6 (Avro) data={'order_id': 'avro-9591de65', 'customer_id': 'cust-002', 'amount': 110.5, 'status': 'pending', 'created_at': '2026-05-24T10:54:40.799489+00:00'} Processing order: avro-9591de65

日志截图

http://www.jsqmd.com/news/880033/

相关文章:

  • JAVA:字符串拼接
  • 【图像压缩】基于ADMM的卷积稀疏编码高效算法Matlab实现
  • 面向实时决策Agent的Harness微秒级调度
  • MySQL 全文索引实战:搜索功能的正确打开方式
  • 2026 四川 H 型钢优质供应商推荐|盛世钢联全品类现货批发,生产厂家与采购指南 - 四川盛世钢联营销中心
  • CoolProp热物理计算终极指南:从入门到精通的热力学工具
  • 太顶了!只需输入需求,这几款一键生成论文工具自动生成毕业论文初稿!
  • NS模拟器自动化管理系统:简化游戏兼容性配置的解决方案
  • 开源AI工具真能替代商业方案?2024最新Benchmark数据揭示92%团队忽略的关键短板
  • 【稻米计数】基于matlab形态学稻米计数【含Matlab源码 15562期】
  • 上海嘉定区宸智雅筑装饰官方联系方式 合作电话 官方网站官网 - 元点智创
  • 2026 深圳劳动纠纷律师怎么选?专业度优先避坑指南 - 从来都是英雄出少年
  • 利用Taotoken实现多模型备选方案以提升业务连续性
  • equalsIgnoreCase忽略大小写直接对比
  • 2026年4月墙改梁加固企业推荐,粘钢植筋加固/房屋碳纤维加固/建筑物加固/裂缝修补加固,墙改梁加固施工厂家怎么选择 - 品牌推荐师
  • 品牌生死局——2026GEO优化公司全景测评必选指南 - GEO优化
  • 3分钟让AI自动分层?LayerDivider如何拯救你的PSD编辑噩梦
  • 2026年一键生成论文工具实测精选:5款神器从构思到提交全流程护航
  • AI 时代产品经理生存与进化指南
  • Gitclub第三次团队作业——Alpha 冲刺计划
  • Chrome配Burp代理全链路配置指南:端口、证书与命令行三要素
  • 2026年4月比较好的测漏公司推荐,地暖管道清洗/墙面测漏/墙面漏水维修/水管测漏/厨房漏水维修,测漏企业推荐 - 品牌推荐师
  • 【教育科技爆款内容生产核心】:用ChatGPT批量生成带答案解析+难度分级+认知维度标签的脑筋急转弯(附可商用JSON Schema)
  • 全球公域AI底层架构:一个字符唤醒世界
  • 从零开发游戏需要学习的c#模块,第二十四章(场景管理 —— 标题、游戏、结束画面)
  • 2026 四川螺纹钢优质供应商推荐|盛世钢联全品类现货批发,价格行情与采购指南 - 四川盛世钢联营销中心
  • 超人级安全敏捷多智能体强化学习飞行动力系统
  • 企业团队如何利用Taotoken CLI工具统一配置开发环境与API密钥
  • 华为OD机试 新系统 C++实现【社交网络相同爱好好友查询】
  • 卖不干胶标签怎么找客户?下游工厂在哪里