当前位置: 首页 > news >正文

使用OpenMetadata结合Great Expectations实现数据质量管理的实践

想象你是一家电商公司的数据分析师。每天有大量数据进入数据库:用户订单、商品库存、用户行为日志等。如果数据有问题(比如订单金额是负数、用户邮箱格式错误、同一订单号出现两次),你的分析结果就是错的,业务决策也会出错。

概念和理论

数据质量管理就是一套方法和工具,用来:

  1. 定义规则:比如"邮箱必须符合格式"、"订单金额必须大于0"
  2. 自动检查:用程序定期扫描数据,找出不符合规则的记录
  3. 记录结果:保存每次检查的结果,方便追溯和汇报

OpenMetadata

OpenMetadata(OM) 是一个数据目录平台。可以把它理解为一个"数据库的数据库",它记录所有数据库的元信息:

  • 有多少个数据库?叫什么名字?
  • 每个库里有哪些表?每个表有哪些列?
  • 表之间的血缘关系是什么?(比如表A的数据来自表B)
  • 谁在用这些数据?多久没更新了?

还会保存数据质量检查的结果,这正是我们集成 Great Expectations 的原因。

对比 Hive Metastore

那么OpenMetadata 能替代 Hive Metastore 吗?不能直接替代,但可以同步。

Hive Metastore 不只是"存元数据",它是 Hadoop 生态的基础设施,Hive/Spark/Trino 通过 Thrift RPC 调用 Metastore,这是硬编码的协议。

graph TBsubgraph "Hive Metastore 职责"H1[存储表定义<br/>列名/类型/分区]H2[存储数据位置<br/>HDFS/S3 路径]H3[存储序列化格式<br/>ORC/Parquet]H4[提供 Thrift RPC<br/>供引擎调用]endHIVE[Hive] --> H1HIVE --> H2SPARK[Spark] --> H1SPARK --> H3PRESTO[Trino] --> H4

OpenMetadata 则是用来"看"数据的。正确的架构是两者并存,例如下图

graph TBsubgraph "数据层"DATA[(HDFS/S3)]endsubgraph "计算层"HIVE[Hive]SPARK[Spark]TRINO[Trino]endsubgraph "元数据层"MS[(Hive Metastore<br/>Thrift 接口)]OM[OpenMetadata<br/>REST API]endDATA --> HIVEDATA --> SPARKDATA --> TRINOHIVE <-->|Thrift RPC| MSSPARK <-->|Thrift RPC| MSTRINO <-->|Thrift RPC| MSMS -->|同步| OMBI[BI 工具] -->|SQL 查询| TRINOGOV[数据治理] -->|搜索/血缘| OMstyle MS fill:#ffcdd2style OM fill:#c8e6c9

如何将mestore同步到openmetadata?

# 摄取配置:hive-metastore-ingestion.yaml
source:type: HiveserviceName: hive-metastore-prodserviceConnection:config:type: HivehostPort: hive-metastore:9083  # Thrift 端口metastoreConnection: thrift://hive-metastore:9083sink:type: metadata-restconfig: {}workflowConfig:openMetadataServerConfig:hostPort: http://localhost:8585/api

运行同步指令

metadata ingest -c hive-metastore-ingestion.yaml

同步后OM 中会有:

  • 所有数据库和表
  • 表的分区信息
  • 列的类型定义
  • 数据位置(HDFS/S3 路径)

Great Expectations

Great Expectations(GX) 是一个 Python 数据质量检查框架。"Expectation"的意思是"期望"——你告诉它"我期望邮箱列不为空",它就会检查数据是否满足这个期望。

  1. 读取数据,检查每一行的 email 列,统计有多少行是空的
  2. 返回结果:通过/失败,失败的数量,失败的比例
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="email")
)

为什么要使用 GX?

  • 不用写 SQL,用简单的 Python API 定义规则
  • 自动生成检查报告
  • 有 300+ 种内置期望规则
  • 与 OpenMetadata 无缝集成

GX和OpenMetadata集成的具体流程

  1. 在 GX 里写好规则(比如"邮箱不能为空")
  2. GX 检查 PostgreSQL 中的数据
  3. GX 把检查结果推送到 OM
  4. 打开 OM 的网页,看到仪表盘上有红色/绿色的测试结果

下图说明了本文涉及到的所有组件关系

graph TBsubgraph "第一步:存储数据"PG[(PostgreSQL<br/>存业务数据<br/>端口 5433)]endsubgraph "第二步:检查数据质量"GX[Great Expectations<br/>Python 脚本]endsubgraph "第三步:展示结果"OM[OpenMetadata Server<br/>端口 8585]MY[(MySQL<br/>存元数据<br/>端口 3307)]OS[(OpenSearch<br/>搜索引擎<br/>端口 9202)]UI[网页界面]endPG -->|读取数据| GXGX -->|发送检查结果| OMOM -->|存储| MYOM -->|搜索| OSUI -->|查看| OMstyle PG fill:#c8e6c9style GX fill:#bbdefbstyle OM fill:#fff9c4

OpenMetadata需要 MySQL 和 OpenSearch

  • MySQL,把"有哪些数据库"、"每个表的列信息"存在 MySQL 里。
  • OpenSearch,当在 OM 网页里搜索"用户表"时,是 OpenSearch 在帮忙快速查找。

因此,OM Server 启动前,MySQL 必须已经准备好,数据库迁移必须在 Server 启动前完成(迁移就是创建 OM 需要的表结构)。此外,GX 检查前,OM 必须知道有这张表(否则 GX 的结果无处存放)

数据库迁移

例如以下场景:开发了一个应用,第一版只需要一张 users 表。第二版加了 orders 表。第三版改了 users 表的结构,加了一个 phone 列。怎么让数据库从第一版升级到第三版?手动执行 SQL 脚本?那如果生产环境已经是第二版了呢?

数据库迁移就是一套自动化的数据库版本管理机制。每个版本对应一个迁移脚本,系统会记录当前是哪个版本,然后自动执行未运行的脚本。

版本          迁移脚本内容
-----------   ---------------------------
V001          CREATE TABLE users (...)
V002          CREATE TABLE orders (...)
V003          ALTER TABLE users ADD phone VARCHAR(20)

在 OM 启动过程中可能会出现如下错误,这说明数据库里缺少某些表。必须先运行迁移,才能启动服务。

ERROR: There are pending migrations to be run on the database.

OM 官方提供了一个 openmetadata/db:1.12.0 镜像,但这个镜像有个问题:它里面已经创建了一些表(Flowable 工作流引擎的 ACT_* 表)。

当运行迁移时,迁移脚本会尝试创建这些表,发现表已存在就报错:

Table 'ACT_GE_PROPERTY' already exists

我们不用官方 db 镜像,改用标准空的 MySQL 镜像(如 mysql:8.2),让迁移脚本从头创建所有表。

OpenMetadata 的数据模型层级

OM 用"层级"来组织数据的元信息。就像文件夹结构一样。OM 的层级是:

DatabaseService(数据库服务)└── Database(数据库)└── DatabaseSchema(模式)└── Table(表)└── Column(列)

为什么要有这么多层级?

  • DatabaseService:可能有好几个数据库(PostgreSQL、MySQL、Oracle),每个都是一个 Service
  • Database:一个 PostgreSQL 里可以有好几个库(testdb、production、analytics)
  • Schema:PostgreSQL 的概念,类似文件夹,把表分组(public 是默认 schema)
  • Table:就是具体的表
  • Column:表的列

每一层都有名字,组合起来就是 FQN,唯一标识一个实体,表的 FQN = test-postgres.testdb.public.users= 服务名.库名.schema名.表名

GX 推送检查结果到 OM 时,必须告诉 OM "这是哪张表的检查结果"。用的就是 FQN。

Great Expectations 核心概念

flowchart LRA[数据] --> B[Checkpoint]B --> C[执行 Expectations]C --> D[生成结果]D --> E[Action: 推送到 OM]

Expectation

就是一条数据质量规则。例如:

  • ExpectColumnValuesToNotBeNull:列数据不能为空
  • ExpectColumnValuesToBeUnique:列数据不可重复
  • ExpectColumnValuesToMatchRegex:列数据符合正则格式

ExpectationSuite是一组期望的集合。比如针对 users 表,我们有 3 个期望:

suite = ExpectationSuite(name="users_suite")
suite.add_expectation(ExpectationValuesToNotBeNull(column="email"))
suite.add_expectation(ExpectColumnValuesToBeUnique(column="id"))
suite.add_expectation(ExpectColumnValuesToMatchRegex(column="email", regex=".*@.*"))

Checkpoint

执行检查的入口。它的作用是:

  1. 连接数据库,读取数据
  2. 运行期望套件里的所有期望
  3. 把结果交给 Action 处理

Action

检查完成后的后续操作。常用的 Action:

  • ExpectationStore: 把结果保存到本地文件
  • OpenMetadataValidationAction1xx: 把结果推送到 OpenMetadata

GX 的工作流程用代码理解如下

# 第一步:准备数据(告诉 GX 数据在哪里)
data_source = context.data_sources.add_postgres(name="my_db", connection_string="...")
data_asset = data_source.add_table_asset(name="my_table", table_name="users")
batch_definition = data_asset.add_batch_definition_whole_table("full_table")# 第二步:准备规则(告诉 GX 检查什么)
suite = context.suites.add(ExpectationSuite(name="my_suite"))
suite.add_expectation(ExpectColumnValuesToNotBeNull(column="email"))# 第三步:绑定数据和规则
validation = context.validation_definitions.add(ValidationDefinition(name="my_validation", data=batch_definition, suite=suite)
)# 第四步:执行检查
checkpoint = context.checkpoints.add(Checkpoint(name="my_checkpoint", validation_definitions=[validation], actions=[...])
)
result = checkpoint.run()

多数据源与血缘关系

前面的例子只有一个 PostgreSQL。但现实中,数据往往分散在多个系统,不同数据源需要不同的处理方式:

数据源类型 处理工具 原因
关系数据库(MySQL/PG/Oracle) DataX 结构化数据,直接同步
对象存储(S3/MinIO 文件) Spark 需要解析文件格式、处理非结构化数据
消息队列(Kafka) Flink 实时流处理,低延迟
复杂转换逻辑 Spark/dbt SQL 处理不了的场景

注意:S3 文件需要经过 Spark 解析才能入库。S3 中存放的是原始文件(CSV、JSON、Parquet),不能直接变成数仓表,必须通过 Spark 读取、解析格式、清洗后再写入。

graph LRsubgraph "数据源层"MYSQL[(MySQL<br/>业务库)]PG[(PostgreSQL<br/>分析库)]S3[(S3/MinIO<br/>文件)]KAFKA[Kafka<br/>消息队列]endsubgraph "ETL 工具层(真正干活的)"DATAX[DataX<br/>数据同步]SPARK[Spark<br/>批处理]FLINK[Flink<br/>实时流]endsubgraph "调度层(指挥官)"AIRFLOW[Airflow<br/>调度编排]endsubgraph "数据仓库层"DWH[(数据仓库<br/>ClickHouse)]endAIRFLOW -->|调度触发| DATAXAIRFLOW -->|调度触发| SPARKAIRFLOW -->|调度触发| FLINKMYSQL -->|结构化同步| DATAXPG -->|结构化同步| DATAXDATAX -->|写入| DWHS3 -->|文件解析| SPARKSPARK -->|写入| DWHKAFKA -->|实时流| FLINKFLINK -->|写入| DWHstyle AIRFLOW fill:#fff9c4style DATAX fill:#c8e6c9style SPARK fill:#c8e6c9style FLINK fill:#c8e6c9

Airflow编排任务

Airflow 不做数据抽取,它只负责调度。核心工作是:

  1. 定时触发任务(每天凌晨 2 点运行)
  2. 调用真正的 ETL 工具(DataX、Spark、Flink 等)
  3. 监控任务状态(成功/失败/重试)
  4. 发送告警(任务失败时通知)

一个典型的 Airflow DAG 示例如下

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperatorwith DAG("daily_etl") as dag:# 任务1:调用 DataX 同步 MySQL 数据sync_mysql = BashOperator(task_id="sync_mysql_to_clickhouse",bash_command="python /opt/datax/bin/datax.py /jobs/mysql_to_ch.json")# 任务2:调用 Spark 做数据清洗process_data = SparkSubmitOperator(task_id="spark_clean",application="/jobs/clean_data.py")# 任务3:运行质量检查quality_check = BashOperator(task_id="quality_check",bash_command="cd /app && uv run python run_checkpoint.py")# 任务顺序sync_mysql >> process_data >> quality_check

解决了任务的编排,对于复杂的数据流管道,下面的问题是:

  1. 数据从 A 流到 B,质量检查应该在哪做?
  2. A 的数据有问题,会影响到 B 吗?
  3. 多个数据源,检查规则怎么统一管理?

多数据源检查方案

业界最佳实践是在数据生命周期的不同阶段设置检查点:

flowchart TBsubgraph "源头检查"S1[MySQL 入库前检查]S2[API 数据校验]endsubgraph "传输检查"T1[ETL 行数对比<br/>源 vs 目标]T2[数据完整性校验]endsubgraph "消费检查"C1[数据仓库表检查]C2[报表指标监控]endS1 --> T1S2 --> T1T1 --> C1C1 --> C2

源头检查,不让脏数据进入系统,在应用层做校验,数据写入数据库前拦截:

# 在应用写入数据库前检查
def create_user(name, email):if not email or "@" not in email:raise ValueError("邮箱格式错误")db.insert("users", {"name": name, "email": email})

传输检查,确保数据完整传输

# 方式一:SQL 聚合对比(最轻量)
def check_data_integrity(source_conn, target_conn, table_name):with source_conn.cursor() as src, target_conn.cursor() as tgt:src.execute(f"SELECT COUNT(*) FROM {table_name}")tgt.execute(f"SELECT COUNT(*) FROM {table_name}")if src.fetchone()[0] != tgt.fetchone()[0]:return {"passed": False, "error": "行数不一致"}# 关键指标对比src.execute(f"SELECT SUM(amount) FROM {table_name}")tgt.execute(f"SELECT SUM(amount) FROM {table_name}")if src.fetchone() != tgt.fetchone():return {"passed": False, "error": "汇总值不一致"}return {"passed": True}# 方式二:DataDiff 工具(逐行差异检查)
from datadiff import diff_tables
diff = diff_tables(source_url, target_url, table_name="orders", key_columns=["id"])

消费检查,确保分析用正确数据。使用 GX + OM 定期检查数据仓库中的表。

suite = ExpectationSuite(name="users_suite")
suite.add_expectation(ExpectationValuesToNotBeNull(column="email"))

与 Airflow 集成实现完整 ETL和质量检查流程示例如下,实现质量检查的分层职责

flowchart LRA[MySQL 源表] -->|源头检查 GX| B{检查通过?}B -->|通过| C[数据同步]B -->|不通过| D[阻断 + 告警]C --> E[传输检查<br/>SQL 聚合对比]E -->|通过| F[ClickHouse 目标表]E -->|不通过| G[告警 + 人工介入]F --> H[消费检查<br/>定期 GX + OM]

示例代码

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperatordef run_checkpoint(source, table):"""运行 GX 检查并推送到 OM"""import great_expectations as gx# ... checkpoint 代码 ...def check_row_count():"""传输检查:对比源和目标的行数"""source_count = mysql.query("SELECT COUNT(*) FROM orders")target_count = clickhouse.query("SELECT COUNT(*) FROM orders")if source_count != target_count:raise Exception(f"行数不一致: {source_count} vs {target_count}")with DAG("daily_etl", schedule_interval="@daily") as dag:# 1. 源头检查(GX)source_check = PythonOperator(task_id="source_quality_check",python_callable=run_checkpoint,op_kwargs={"source": "mysql_prod", "table": "orders"})# 2. 数据同步sync_data = BashOperator(task_id="sync_orders",bash_command="python /opt/datax/bin/datax.py /jobs/orders.json")# 3. 传输检查(SQL 聚合,不用 GX)transfer_check = PythonOperator(task_id="transfer_check",python_callable=check_row_count)# 4. 消费检查(GX + OM)sink_check = PythonOperator(task_id="sink_quality_check",python_callable=run_checkpoint,op_kwargs={"source": "clickhouse_dwh", "table": "orders"})# 任务顺序:源头检查 → 同步 → 传输检查 → 消费检查source_check >> sync_data >> transfer_check >> sink_check

数据血缘与质量追溯

血缘就是记录数据"从哪来、到哪去"的关系:

订单表(MySQL) ──ETL──→ 订单宽表(数仓) ──SQL──→ 销售报表│                      │                    │└── 数据有问题 ────────┴── 会传播到 ────────┘

如果"订单表"的 amount 列有负数,这个问题会传播到所有下游:

  • 订单宽表的 total_amount 会出错
  • 销售报表的"总销售额"会偏低

OM 中的血缘

graph LRA[users<br/>MySQL] -->|INSERT INTO| B[user_stats<br/>PostgreSQL]B -->|CREATE VIEW| C[daily_report<br/>ClickHouse]A -.->|质量检查失败| D[告警]D -.->|影响分析| BD -.->|影响分析| C

users 表质量检查失败时,OM 会提示:"下游影响:user_stats、daily_report 可能受影响"

如何在 OM 中建立血缘?通常OM 可以自动解析血缘(例如 dbt 管理 SQL 转换,)

-- models/user_stats.sql
SELECT u.id,u.name,COUNT(o.id) as order_count
FROM {{ ref('users') }} u      -- dbt 知道依赖 users
LEFT JOIN {{ ref('orders') }} o ON u.id = o.user_id
GROUP BY u.id, u.name

OM 摄取 dbt 的 manifest.json 后自动建立血缘:users → user_statsorders → user_stats

在集成Airflow场景中,OM 还可以解析 Airflow DAG 中的 SQL,自动推断血缘。

环境搭建

OpenSearch 需要设置内核参数设置

echo "vm.max_map_count=262144" | sudo tee -a /etc/sysctl.conf
sudo sysctl -p

OpenSearch 内部使用一种叫"mmapfs"的文件系统来存储索引数据。vm.max_map_count 控制进程可以创建的内存映射区域数量。默认值(通常是 65530)太小,OpenSearch 启动时会报错:

max virtual memory areas vm.max_map_count [65530] is too low

Docker Compose 配置

创建一个配置文件来管理所有 OM 组件:

services:# OM 的数据库mysql:image: mysql:8.2container_name: om-mysqlrestart: unless-stoppedenvironment:MYSQL_ROOT_PASSWORD: passwordMYSQL_USER: openmetadata_userMYSQL_PASSWORD: openmetadata_passwordMYSQL_DATABASE: openmetadata_dbports:- "3307:3306"volumes:- om-db-data:/var/lib/mysqlhealthcheck:test: ["CMD", "mysqladmin", "ping", "-h", "localhost"]interval: 10stimeout: 5sretries: 10networks:- om-net# OM 的搜索引擎opensearch:image: opensearchproject/opensearch:2.19.1container_name: om-opensearchrestart: unless-stoppedenvironment:- discovery.type=single-node- DISABLE_SECURITY_PLUGIN=true- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"- cluster.name=open-metadataports:- "9202:9200"volumes:- om-es-data:/usr/share/opensearch/datahealthcheck:test: ["CMD-SHELL", "curl -s http://localhost:9200/_cluster/health | grep -q '\"status\":\"green\\|yellow\"'"]interval: 10stimeout: 5sretries: 10networks:- om-net# OM 服务器server:image: docker.getcollate.io/openmetadata/server:1.12.0container_name: om-serverrestart: unless-stoppeddepends_on:mysql:condition: service_healthyopensearch:condition: service_healthyenvironment:- DB_HOST=mysql- DB_PORT=3306- DB_USER=openmetadata_user- DB_USER_PASSWORD=openmetadata_password- OM_DATABASE=openmetadata_db- SEARCH_TYPE=opensearch- ELASTICSEARCH_HOST=opensearch- ELASTICSEARCH_PORT=9200- ELASTICSEARCH_SCHEME=http- AUTHENTICATION_PROVIDER=basic- OM_HEAP_SIZE=1Gports:- "8585:8585"healthcheck:test: ["CMD-SHELL", "curl -s http://localhost:8585/api/v1/system/version"]interval: 30stimeout: 10sretries: 15start_period: 180snetworks:- om-net# 测试用的 PostgreSQLtest-postgres:image: postgres:15container_name: test-postgresrestart: unless-stoppedenvironment:POSTGRES_PASSWORD: postgresPOSTGRES_DB: testdbports:- "5433:5432"networks:- om-netnetworks:om-net:driver: bridgevolumes:om-db-data:om-es-data:

一键启动所有服务

docker-compose -f docker-compose.om.yml up -d

首次启动时执行openmetadata的迁移命令

# 运行迁移(首次必须)
docker run --rm \--network om-net \-e DB_HOST=mysql \-e DB_PORT=3306 \-e DB_USER=openmetadata_user \-e DB_USER_PASSWORD=openmetadata_password \-e OM_DATABASE=openmetadata_db \-e SEARCH_TYPE=opensearch \-e ELASTICSEARCH_HOST=opensearch \-e ELASTICSEARCH_PORT=9200 \-e ELASTICSEARCH_SCHEME=http \-e AUTHENTICATION_PROVIDER=basic \docker.getcollate.io/openmetadata/server:1.12.0 \./bootstrap/openmetadata-ops.sh migrate

迁移脚本会打印很多日志,关键是看有没有报错:

SERVER_CHANGE_LOG table doesn't exist yet, will run all migrations
[MigrationWorkFlow] Migration Run started for Version: 0.0.0
[MigrationWorkFlow] Migration Run finished for Version: 0.0.0
...
[MigrationWorkFlow] Migration Run finished for Version: 1.12.0
Successfully completed

如果看到 Table 'ACT_GE_PROPERTY' already exists,说明用了错误的 MySQL 镜像。检查 docker images 确认使用的是 mysql:8.2

创建测试数据

# 进入 PostgreSQL 容器
docker exec -it test-postgres psql -U postgres -d testdb# 创建表
CREATE TABLE users (id SERIAL PRIMARY KEY,name VARCHAR(100),email VARCHAR(100),created_at TIMESTAMP DEFAULT NOW()
);# 插入数据
INSERT INTO users (name, email) VALUES('Alice', 'alice@example.com'),('Bob', 'bob@example.com'),('Charlie', 'charlie@example.com'),('Diana', 'diana@example.com'),('Eve', 'eve@example.com'),('Frank', 'frank@example.com'),('Grace', 'grace@example.com');

安装python依赖

uv init
uv add great_expectations psycopg2-binary
uv add "openmetadata-ingestion[great-expectations-1xx]"

连接 OpenMetadata

OM 使用 JWT 令牌认证。需要先登录获取令牌:

echo -n "admin" | base64 # 输出:YWRtaW4=
curl -X POST http://localhost:8585/api/v1/users/login \-H "Content-Type: application/json" \-d '{"email": "admin@open-metadata.org", "password": "YWRtaW4="}' \| jq -r '.accessToken'

创建配置文件

# 文件:om_config/config.yml(注意是 .yml 不是 .yaml)
hostPort: http://localhost:8585/api
authProvider: basic
apiVersion: v1
securityConfig:jwtToken: "<刚才获取的令牌>"

文件名必须是 config.yml(不是 config.yaml)。如果文件名错了,GX 运行时会打印警告:

WARNING - Template file at config.yml not found

securityConfig 只接受 jwtToken 字段。如果你写 usernamepassword,会触发 Pydantic 校验错误:

ValidationError: Extra inputs are not permitted

注册表元数据

这是最关键的一步,很多人在这里踩坑。

GX 推送结果到 OM 时,会:

  1. 用 FQN 查找 OM 中的表
  2. 如果找到,把测试结果挂在这张表下面
  3. 如果找不到,静默跳过(不报错,但也没有结果)

所以必须先在 OM 里创建实体

import requestsBASE = "http://localhost:8585/api/v1"resp = requests.post(f"{BASE}/users/login",json={"email": "admin@open-metadata.org", "password": "YWRtaW4="}
)
headers = {"Authorization": f"Bearer {resp.json()['accessToken']}"}# 1. 创建数据库服务
print("创建 DatabaseService...")
resp = requests.post(f"{BASE}/services/databaseServices",headers=headers,json={"name": "test-postgres","serviceType": "Postgres","connection": {"config": {"type": "Postgres","hostPort": "test-postgres:5432","database": "testdb"}}}
)# 2. 创建数据库
print("创建 Database...")
resp = requests.post(f"{BASE}/databases",headers=headers,json={"name": "testdb", "service": "test-postgres"}
)
print(f"  状态: {resp.status_code}")# 3. 创建 Schema
print("创建 Schema...")
resp = requests.post(f"{BASE}/databaseSchemas",headers=headers,json={"name": "public", "database": "test-postgres.testdb"}
)
print(f"  状态: {resp.status_code}")# 4. 创建表(注意 VARCHAR 要指定长度)
print("创建 Table...")
resp = requests.post(f"{BASE}/tables",headers=headers,json={"name": "users","databaseSchema": "test-postgres.testdb.public","columns": [{"name": "id", "dataType": "INT"},{"name": "name", "dataType": "VARCHAR", "dataLength": 100},{"name": "email", "dataType": "VARCHAR", "dataLength": 100},{"name": "created_at", "dataType": "TIMESTAMP"}]}
)
print(f"  状态: {resp.status_code}")

运行:

uv add requests
uv run python register_table.py

创建表时如果看到这个错误:

{"code":400,"message":"For column data types char, varchar, binary, varbinary dataLength must not be null"}

说明 VARCHAR 列没有指定长度。OM 要求所有变长类型必须提供 dataLength

{"name": "email", "dataType": "VARCHAR", "dataLength": 100}

验证查询表是否存在

curl -s "http://localhost:8585/api/v1/tables/name/test-postgres.testdb.public.users" \-H "Authorization: Bearer $TOKEN" | jq '.name'

编写数据质量检查脚本

# 文件:run_checkpoint.py
import os
os.environ['GX_ANALYTICS_ENABLED'] = 'False'
os.environ['GE_ANALYTICS_ENABLED'] = 'False'import great_expectations as gx
from metadata.great_expectations.action1xx import OpenMetadataValidationAction1xx# 1. 创建上下文(GX 的入口)
context = gx.get_context()# 2. 连接数据库
data_source = context.data_sources.add_postgres(name="test_datasource",connection_string="postgresql+psycopg2://postgres:postgres@test-postgres:5432/testdb"
)# 3. 指定要检查的表
data_asset = data_source.add_table_asset(name="users_table",table_name="users",schema_name="public"
)
batch_definition = data_asset.add_batch_definition_whole_table("users_batch")# 4. 定义期望规则
suite = context.suites.add(gx.core.expectation_suite.ExpectationSuite(name="users_suite")
)# 规则1:邮箱不能为空
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="email")
)# 规则2:ID 必须唯一
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeUnique(column="id")
)# 规则3:邮箱格式必须正确
suite.add_expectation(gx.expectations.ExpectColumnValuesToMatchRegex(column="email",regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
)# 5. 绑定数据和规则
validation_definition = context.validation_definitions.add(gx.core.validation_definition.ValidationDefinition(name="users_validation",data=batch_definition,suite=suite)
)# 6. 配置推送到 OM 的 Action
actions = [OpenMetadataValidationAction1xx(database_service_name="test-postgres",database_name="testdb",schema_name="public",table_name="users",config_file_path="om_config/",)
]# 7. 创建检查点
checkpoint = context.checkpoints.add(gx.checkpoint.checkpoint.Checkpoint(name="users_checkpoint",validation_definitions=[validation_definition],actions=actions)
)# 8. 执行
result = checkpoint.run()
print(f"\n检查结果: {'通过' if result.success else '失败'}")
print(f"详细结果: {result}")

运行

uv run python run_checkpoint.py

成功输出示例

Calculating Metrics: 100%|██████████| 24/24
Checkpoint result: True

验证结果

检查 OM 中的测试用例

# 获取令牌
TOKEN=$(curl -s -X POST http://localhost:8585/api/v1/users/login \-H "Content-Type: application/json" \-d '{"email": "admin@open-metadata.org", "password": "YWRtaW4="}' \| jq -r '.accessToken')# 查看测试用例
curl -s "http://localhost:8585/api/v1/dataQuality/testCases?limit=10" \-H "Authorization: Bearer $TOKEN" | jq '.data[].name'

期望输出如下

"expect_column_values_to_not_be_null"
"expect_column_values_to_be_unique"
"expect_column_values_to_match_regex"

打开 Web 界面访问数据质控页面

image

http://www.jsqmd.com/news/879888/

相关文章:

  • Windows下JMeter高并发压测端口耗尽问题排查与修复
  • CPT 强化学习完整实现(PyTorch 版 - Actor-Critic + CPT)
  • 2026年装修季必看!专业明装暖气怎么选的实用攻略来了 - 资讯纵览
  • 从救援受阻事故案例,看无感定位技术普及的迫切意义
  • m4s-converter终极指南:3步解锁B站缓存视频的离线观看自由
  • 如何免费解锁Wand专业版功能:Wand-Enhancer完整使用指南
  • 六音音源修复版:三步解决洛雪音乐播放失效问题
  • DeepSeek计费策略终极对比:RPM限制、上下文长度溢价、多模态附加费,一文讲透
  • 在Windows 10上从零开始:手把手教你安装和运行TELEMAC-MASCARET V8P4水动力模型
  • BooruDatasetTagManager:如何用AI智能标注工具将图像数据集处理效率提升10倍
  • Claude Code用户如何通过Taotoken解决API不稳定与Token不足问题
  • 2026 北京包包回收实测:上门回收估价 vs 线下实体店,哪个更划算 - 奢侈品回收测评
  • 对比直接使用原厂API,Taotoken在计费透明性上给我们的感受
  • 等保2.0三级Linux服务器合规基线重建实战指南
  • 终极指南:让老旧Mac免费升级最新macOS系统的完整方案
  • 新沂沙发翻新换皮换布面靠谱商家优选推荐|匠阁沙发翻新、御匠沙发翻新、锦修沙发翻新三大品牌、全品类沙发翻新换皮换布一站式服务 - 卓一科技
  • 纯视觉破界空间感知 自研体系领跑视频孪生领域
  • 5分钟搞定Sunshine游戏串流:从安装到畅玩的完整指南
  • 2026年5月有实力的电磁阀厂家推荐钢特阀门科技有限公司,优化产品结构提升流体调控效能 - 品牌鉴赏师
  • Python Anaconda,为什么要创建虚拟环境,Pycharm使用
  • 因果推断与双机器学习在LED制造返工决策中的实战应用
  • Gemini企业社会责任实践白皮书(2024独家解密版):覆盖AI伦理、碳足迹追踪与社区赋能的3层合规架构
  • 夏季前挡膜怎么选?固驰蓝闪幻蝶车窗膜给出不止隔热的答案
  • 3分钟快速找回Navicat数据库连接密码:开源解密工具完整教程
  • DeepSeek模型版本选择实战手册(2024最新版):从推理延迟、显存占用到LoRA兼容性全拆解
  • 抖音无水印视频解析工具:3分钟搭建你的个人视频素材库
  • 量子机器学习模型鲁棒性验证:VeriQR工具原理与应用实战
  • AI构建的Python学习路线
  • 3个场景告诉你:为什么你需要PowerToys Text Extractor
  • 告别笔记本续航焦虑:手把手教你用NVMe电源管理给SSD“降频省电”