当前位置: 首页 > news >正文

GBase 8c逻辑解码解析

本文针对南大通用 GBase 8c (gbase database)数据库,围绕逻辑解码增量数据推送至 Kafka 场景,详细介绍配置流程、操作步骤及注意事项,指导用户快速完成增量数据实时同步到 Kafka 的落地部署。

概述

GBase 8c 提供逻辑解码能力,通过解析 WAL 预写日志生成逻辑日志,目标数据库可基于逻辑日志实现实时数据复制。逻辑复制方式对目标数据库部署形态约束较低,支持异构数据库、同库不同部署形态之间的数据同步;同步过程中目标数据库可正常读写,整体同步时延较低。
依托逻辑复制机制,结合 GBase 8c 自研的逻辑转 Kafka 能力,可按数据节点并发从 WAL 日志中实时解析增量数据,并推送至 Kafka 集群。

配置步骤

当前测试是将 GBase8c 集群中 testa 库下的 public.t_user 的数据变更操作推送到 kafka,环境信息如下:

配置项目配置地址
推送的数据名称testa
推送表public.t_user
Kafka 地址172.16.5.51:9092,172.16.5.53:9092,172.16.5.54:9092
Topic 名称gbase_user_topic

kafka 配置

配置时只需提供 Kafka 地址、Topic 名称,若开启 SSL 安全认证,同步提供 kafka 认证信息即可。

数据库配置

  1. 修改数据库配置参数,开启参数后,数据库即可支持将数据变更推送至 Kafka。
--开启l2k同步开关 gs_guc reload -Z datanode -N all -I all -c "enable_l2k=true" --指定wal日志存储的级别为logical,在replica级别(支持wal归档和复制)的基础上,还支持逻辑解码信息 gs_guc reload -Z datanode -N all -l all -c "wal_level=logical" --开启默认用户的replication连接认证 gs_guc reload -Z datanode -N all -l all -h "local replication gbase trust"
  1. 新建配置表
    配置表用于记录需要将数据变更推送至 Kafka 的表清单,必须以复制表形式创建。如:
create table gbase_to_kafka_cnf( tablename varchar(100), topic varchar(100) ) DISTRIBUTE BY REPLICATION; 字段解释: Tablename:需要推送表的名称,字段名称不能修改,字段长度可以调整; Topic:推送到kafka的topic名称,字段名称不能修改,字段长度可以调整;
  1. 配置表写入数据
    将需要推送的表清单写入配置表,表名必须按照 schema.table_name 格式填写。
INSERT INTO gbase_to_kafka_cnf values('public.t_user,'gbase_user_topic');
  1. 开启数据同步
gs_guc reload -Z datanode -N all -I all -c "l2k_conf='{\"config\":[{\"dbname\":\"testa\",\"kafka_broker\":\"172.16.5.51:9092,172.16.5.53:9092,172.16.5.54:9092\",\"kafka_topic_table\":\"gbase_to_kafka_cnf\"}],\"type\":\"wal2json\",\"verbose\":\"off\"}'"

其他配置项如:

配置信息描述举例
dbname数据库名称。test
kafka_brokerkafka 的 broker 配置,用","分隔。192.168.1.10:9092
kafka_topickafka 的 topic 配置,与 kafka_topic_table 互斥。test1
kafka_topic_table记录表名和 kafka topic 映射关系的数据库表名,与 kafka_topic 互斥。testft1
schema2db为 schema 与 db 的映射关系,用","分隔;schema 和 db 间用":"分隔。test1:public
type转换类型。不填写为默认的 wal2json。wal2json
verbose详细日志。默认关闭。on
pause同步暂停开关。默认关闭。on

其中 kafka_topic_table 数据表格式要求:

  • kafka_topic_table 数据表需为复制表,包含 tablename 和 topic 列。
  • tablename 需填写转换后的表名。例如:若配置 schema2db 转换规则为 test1::public,则表 test1.k1 需填写为 public.k1。

测试推送

  1. 在数据库中对 public.t_user 表进行数据 DML 操作。
update t_user set age=28,city='深圳',update_time='2026-05-11 15:38:43.150452' where id=9; insert into t_user values(12,'王五',28,'北京','2026-05-11 16:55:09.453177'); delete from t_user where id=6;
  1. 在 kafka 中读取内容,内容为 JSON 文档格式,其他应用程序可以通过消费 kafka 内容,读取到指定表的数据变更。
{"change":[{"kind":"update","schema":"public","table":"t_user","columnnames":["id","name","age","city","update_time"],"columntypes":["integer","character varying(50)","integer","character varying(50)","timestamp without time zone"],"columnvalues":[9,"赵六",28,"深圳","2026-05-11 15:38:43.150452"],"oldkeys":{"keynames":["id"],"keytypes":["integer"],"keyvalues":[9]}}]} {"change":[{"kind":"insert","schema":"public","table":"t_user","columnnames":["id","name","age","city","update_time"],"columntypes":["integer","character varying(50)","integer","character varying(50)","timestamp without time zone"],"columnvalues":[12,"王五",28,"北京","2026-05-11 16:55:09.453177"]}]} {"change":[{"kind":"delete","schema":"public","table":"t_user","oldkeys":{"keynames":["id","name","age","city","update_time"],"keytypes":["integer","character varying(50)","integer","character varying(50)","timestamp without time zone"],"keyvalues":[6,"张三",25,"南京","2026-05-11 15:38:43.150452"]}}]}

注意事项

  • 默认情况下,推送到 Kafka 的 UPDATE 数据仅包含主键列及更新后的数据,不包含其他字段与修改前数据。若需要将修改前的值 + 全部字段一同推送至 kafka,需将推送表的 replica 属性设置为 full,命令如下:
alter table table_name replica identify full;
  • 需要注意 DN 节点的 max_replication_slots 和 max_replication_slots 参数,需满足:max_wal_senders>=物理复制槽数量 +max_replication_slots。
http://www.jsqmd.com/news/939449/

相关文章:

  • ai-agent 响应速度优化
  • ImageJ:开源科学图像分析的完整解决方案
  • 别再只盯着Gini和OOB了:用Python的sklearn实战对比随机森林特征重要性(附完整代码)
  • 从DeLong检验的数学原理到Python复现:一篇搞懂AUC显著性检验的底层逻辑(附完整代码)
  • 维修公司用什么工单系统比较好?2026年真实对比亲测好用
  • 2026年MRAM芯片价格分析,本土厂的优势在哪? - mypinpai
  • 别再手动调参数了!用UE5材质函数快速搞定下雨积水效果(附完整材质蓝图)
  • 用Python和PyTorch实战MADQN:在Switch4游戏里教会4个AI协作通关
  • 超越简单分类:用东南大学齿轮箱数据集实战故障严重度评估与迁移学习
  • 用Python从零实现混沌博弈算法(CGO):一个骰子如何帮你优化参数?
  • 作物生长模拟全流程研究:基于WOFOST与PCSE模型的理论、实操与应用对比
  • ASIC压缩加速器技术解析与存储优化实践
  • MIPI I3C从设备Verilog实现方案:高性能嵌入式通信架构解析
  • 如何用BepInEx框架为Unity游戏注入无限可能:从零到精通的完整指南
  • 2026年选购建筑垃圾清运公司,这些排名值得参考 - mypinpai
  • 计算机毕业设计之基于Hadoop和Echarts的京东消费者行为分析与可视化
  • ESP8266+阿里云物联网平台:从设备创建到双向通信的保姆级配置指南
  • 全光网与PON网络区别对比分析
  • 泰安双龙线路器材包塑金属软管如何检测环境适应性
  • 2026年Q355B钢管好用的厂家推荐 - mypinpai
  • 答辩PPT制作效率翻倍!百考通AI学术PPT实战测评
  • 从实验设计到结果解读:RNA-seq数据归一化(RPKM/TPM)的常见误区与避坑指南
  • 2026年q2郑州优质专科学校选型推荐:郑州工业应用技术学院怎么样/郑州民办大学有那些/实测维度解析 - 优质品牌商家
  • MMD分裂准则在分布随机森林中的原理与应用
  • 魔兽争霸III焕新指南:WarcraftHelper游戏增强插件完整教程
  • 算盘科技深度解析:定制智慧城市解决方案的顶层设计“珠算”逻辑
  • 【第 4 篇:RAG 知识库问答——检索只是第一步】
  • 大模型又把星期几算错了?一行Python代码彻底杜绝“幻觉”
  • IAR环境下HT1621B驱动笔段式LCD的可烧录工程包(含调试脚本与硬件验证)
  • Linux视频教程之高级运维企业实战(高级版)【共24课时】_Linux课程-51CTO学堂