
Building a Data Pipeline with Flink to Analyze User Clickstream Data and Visualize It in Dashboards

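This post builds a complete real-time analytics pipeline that collects, processes and visualizes clickstream data in real time, covering how Schema Registry works, Flink's time-window mechanics, and more.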

The overall architecture is as follows:

```mermaid
flowchart LR
    subgraph produce [Data Production]
        A[Java Producer<br/>Avro serialization]
    end
    subgraph mq [Message Queue]
        B[Kafka 3.9.0<br/>KRaft mode]
        C[Schema Registry<br/>Confluent 7.4.4]
    end
    subgraph stream [Stream Processing]
        D[Flink 1.20<br/>JobManager]
        E[Flink 1.20<br/>TaskManager]
    end
    subgraph store [Storage and Visualization]
        F[OpenSearch 2.19]
        G[Dashboards]
    end
    A -->|Avro| B
    A -->|Schema| C
    B -->|Consumer Group| D
    D -->|RPC| E
    C -->|Deserialize| E
    E -->|Bulk Index| F
    F -->|Visualize| G
```

The core data flow registers the schema with Schema Registry automatically, then produces and consumes Avro data:

```mermaid
sequenceDiagram
    participant P as Producer
    participant K as Kafka
    participant SR as Schema Registry
    participant F as Flink
    participant OS as OpenSearch
    P->>SR: Register schema
    SR-->>P: Schema ID
    P->>K: Send Avro message
    K->>F: Consumer polls
    F->>SR: Fetch schema to deserialize
    F->>F: Session window aggregation
    F->>F: Tumbling window aggregation
    F->>OS: Bulk index write
```

The producer and the consumer are based on the source code of the official AWS sample repositories:

```bash
# Producer: simulates e-commerce user click behavior
git clone https://github.com/aws-samples/clickstream-producer-for-apache-kafka.git
# Flink Processor: stream-processing aggregation logic
git clone https://github.com/aws-samples/flink-clickstream-processor-msk.git
```

How Schema Registry Works

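Schema Registry is a metadata management service in the Kafka ecosystem. It exposes a RESTful interface for storing and retrieving schema definitions in formats such as Avro, JSON Schema and Protobuf.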

The core components and their roles are shown below:

```mermaid
flowchart TB
    subgraph ProducerSide [Producer side]
        A[Java/Python application] --> B[KafkaAvroSerializer]
        B --> C{Schema registered?}
        C -- No --> D[Register new schema]
        C -- Yes --> E[Get schema ID]
        D --> E
    end
    subgraph SchemaRegistry [Schema Registry]
        F[(_schemas topic<br/>stored in Kafka)]
        G[REST API :8081]
        H[Schema ID mapping table]
        G --> H
    end
    subgraph ConsumerSide [Consumer side]
        I[KafkaAvroDeserializer] --> J[Extract schema ID]
        J --> K[Fetch schema definition]
        K --> L[Deserialize data]
    end
    E --> F
    F --> I
    style ProducerSide fill:#e3f2fd,stroke:#2196f3
    style SchemaRegistry fill:#f3e5f5,stroke:#9c27b0
    style ConsumerSide fill:#e8f5e9,stroke:#4caf50
```

Schema Registry uses Kafka as its underlying storage: a special topic, _schemas, serves as a highly available write-ahead log. A stored record looks like this:

```text
Key:   {"keytype":"SCHEMA","subject":"ExampleTopic-value","version":1,"magic":1}
Value: {
  "subject": "ExampleTopic-value",
  "version": 1,
  "id": 2,
  "schema": "{\"type\":\"record\",\"name\":\"ClickEvent\"...}"
}
```

Magic Byte and the Message Wire Format

Confluent Schema Registry defines a standard wire format: every Avro message is prefixed with 5 bytes of metadata.

```text
Byte layout:
┌────────┬────────────────────────┬─────────────────────────┐
│ Byte 0 │ Bytes 1-4              │ Bytes 5+                │
│ Magic  │ Schema ID              │ Avro Serialized Data    │
│ 0x00   │ (4-byte int)           │ (variable length)       │
└────────┴────────────────────────┴─────────────────────────┘

Example message breakdown:
Raw data: 00 | 00 00 00 02 | 18 66.249.1.114...
          │    │             └── Avro-encoded ClickEvent (0x18 = zig-zag length 12 of the ip string)
          │    └── Schema ID = 2
          └── Magic Byte (format version)
```

What the magic byte (0x00) is for:

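  1. Format marker: identifies the message as using the Confluent Schema Registry serialization format
  2. Version reservation: the current version is 0, leaving room for future format changes
  3. Compatibility promise: Confluent guarantees that, within the same magic-byte version, the format will not change in a backward-incompatible way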

When the Flink consumer tries to deserialize a message that is not in this Avro format (for example, plain text sent with echo "test" | kafka-console-producer), it fails with an Unknown data format error:

```text
java.io.IOException: Failed to deserialize consumer record
Caused by: java.io.IOException: Unknown data format. Magic number does not match
```

This happens because the consumer expects the first byte to be 0x00 but receives an ASCII character instead (for example, t = 0x74).
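The header check described above can be sketched in plain Java. This is a minimal, hypothetical helper (`WireFormatHeader` is not part of Flink or Confluent); it only reads the 5-byte prefix and leaves the Avro payload undecoded, but it reproduces why plain text trips the magic-byte check.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper: reads the Confluent wire-format header
// [magic byte 0x00][4-byte big-endian schema ID][Avro payload ...]
final class WireFormatHeader {
    static final byte MAGIC_BYTE = 0x00;

    final int schemaId;
    final byte[] avroPayload;

    private WireFormatHeader(int schemaId, byte[] avroPayload) {
        this.schemaId = schemaId;
        this.avroPayload = avroPayload;
    }

    static WireFormatHeader parse(byte[] message) throws IOException {
        // Check the magic byte first: plain text such as "test" starts with 't' (0x74)
        if (message.length == 0 || message[0] != MAGIC_BYTE) {
            throw new IOException("Unknown data format. Magic number does not match");
        }
        if (message.length < 5) {
            throw new IOException("Message shorter than the 5-byte header");
        }
        ByteBuffer buf = ByteBuffer.wrap(message, 1, 4); // ByteBuffer reads big-endian by default
        int schemaId = buf.getInt();                      // bytes 1-4
        byte[] payload = Arrays.copyOfRange(message, 5, message.length);
        return new WireFormatHeader(schemaId, payload);
    }

    public static void main(String[] args) throws IOException {
        byte[] avroMessage = {0x00, 0x00, 0x00, 0x00, 0x02, 0x18};
        System.out.println("schema id = " + parse(avroMessage).schemaId); // schema id = 2
        try {
            parse("test".getBytes(StandardCharsets.US_ASCII));
        } catch (IOException e) {
            System.out.println(e.getMessage()); // the magic-number error
        }
    }
}
```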

Schema Evolution

Schema Registry supports schema evolution, which lets producers and consumers upgrade their data formats independently.

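Compatibility modes:

| Mode | Definition | Allowed changes | Typical use |
|------|------------|-----------------|-------------|
| BACKWARD | The new schema can read data written with the old schema | Add optional fields (with defaults), delete fields | Upgrade consumers first |
| FORWARD | The old schema can read data written with the new schema | Add fields, delete optional fields (with defaults) | Upgrade producers first |
| FULL | Both backward and forward compatible | Add or delete optional fields (with defaults) only | Upgrade producers and consumers independently |
| NONE | No compatibility check | Any change | Development environments |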

For a more detailed explanation, see the compatibility-modes section of https://www.cnblogs.com/peacemaple/p/20017661.
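To make the table concrete, here is a deliberately simplified model, assuming a schema is just a set of field names, each flagged with whether it has a default. It ignores type changes, unions and aliases, so it is not Schema Registry's actual checker; `CompatibilityCheck` and its methods are hypothetical.

```java
import java.util.Map;

// Toy model: a schema is a map of field name -> "has a default value".
// Only adding and removing fields is modeled.
final class CompatibilityCheck {

    // Can a reader using `reader` decode data written with `writer`?
    // Any field the reader expects but the writer never wrote must have a default.
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            if (!writer.containsKey(field.getKey()) && !field.getValue()) {
                return false;
            }
        }
        return true; // fields present only in the writer schema are simply skipped
    }

    static boolean backward(Map<String, Boolean> newer, Map<String, Boolean> older) {
        return canRead(newer, older); // new schema reads old data
    }

    static boolean forward(Map<String, Boolean> newer, Map<String, Boolean> older) {
        return canRead(older, newer); // old schema reads new data
    }

    static boolean full(Map<String, Boolean> newer, Map<String, Boolean> older) {
        return backward(newer, older) && forward(newer, older);
    }

    public static void main(String[] args) {
        Map<String, Boolean> v1 = Map.of("ip", false, "userid", false);
        // v2 adds a field WITHOUT a default
        Map<String, Boolean> v2 = Map.of("ip", false, "userid", false, "globalseq", false);
        // v3 adds a field WITH a default
        Map<String, Boolean> v3 = Map.of("ip", false, "userid", false, "prevglobalseq", true);
        System.out.println(backward(v2, v1) + " " + forward(v2, v1)); // false true
        System.out.println(full(v3, v1));                              // true
    }
}
```

Adding a field without a default breaks BACKWARD (new readers have nothing to fill it with) but not FORWARD (old readers skip it), which is why the table pairs BACKWARD with upgrading consumers first.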

Serialization and Deserialization

Producer-side serialization

```java
// KafkaProducerFactory.java - configure the Avro serializer
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroSerializer");
// Where to find the Avro Schema Registry
producerProps.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
        "http://localhost:8081");
```

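Internally, when the producer sends an Avro message, the serializer registers the record's schema with Schema Registry (auto-registration is on by default) and embeds the returned schema ID in front of the serialized data: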

```mermaid
sequenceDiagram
    participant App as Application
    participant Ser as KafkaAvroSerializer
    participant SR as Schema Registry
    participant Kafka as Kafka Broker
    App->>Ser: send(topic, ClickEvent object)
    Ser->>Ser: Check local cache
    alt Schema ID not cached
        Ser->>SR: POST /subjects/{topic}-value/versions
        SR->>SR: Compatibility check
        SR-->>Ser: Schema ID (e.g., 2)
        Ser->>Ser: Cache ID
    end
    Ser->>Ser: Avro-serialize data
    Ser->>Ser: Prepend [0x00, Schema ID]
    Ser->>Kafka: Send byte array
```
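The cache-then-register flow in the diagram can be sketched as follows. `FramingSerializer` and its `register` function are hypothetical stand-ins: a real KafkaAvroSerializer talks to Schema Registry over REST, whereas here registration is an in-memory function so the framing logic can be shown on its own.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of "check cache -> register if missing -> frame the bytes".
// `register` stands in for POST /subjects/{subject}/versions (hypothetical, in-memory).
final class FramingSerializer {
    private final Map<String, Integer> idCache = new HashMap<>();
    private final Function<String, Integer> register;
    int registryCalls = 0;

    FramingSerializer(Function<String, Integer> register) {
        this.register = register;
    }

    byte[] serialize(String topic, String schemaJson, byte[] avroBytes) {
        String subject = topic + "-value";          // default TopicNameStrategy subject
        String cacheKey = subject + "|" + schemaJson;
        Integer id = idCache.get(cacheKey);
        if (id == null) {                            // schema ID not cached yet
            id = register.apply(schemaJson);
            registryCalls++;
            idCache.put(cacheKey, id);
        }
        return ByteBuffer.allocate(5 + avroBytes.length)
                .put((byte) 0x00)  // magic byte
                .putInt(id)        // 4-byte big-endian schema ID
                .put(avroBytes)    // Avro-encoded record
                .array();
    }
}
```

Serializing twice with the same schema triggers only one registration, so steady-state overhead per message is just the 5-byte prefix.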

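Consumer-side deserialization works internally as follows:

  1. Read the first 5 bytes of the message
  2. byte[0] = 0x00 (magic byte)
  3. byte[1-4] = Schema ID (big-endian int)
  4. Request the schema definition from Schema Registry
  5. Decode the remaining bytes with Avro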
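```java
// Flink KafkaSource configuration (other builder settings omitted here).
// ConfluentRegistryAvroDeserializationSchema comes from the flink-avro-confluent-registry module.
KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
        .setValueOnlyDeserializer(
                ConfluentRegistryAvroDeserializationSchema.forSpecific(ClickEvent.class, schemaRegistryUrl))
        .build();
```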

Worked example: the schema definition (ClickEvent.avsc)

```json
{
  "namespace": "samples.clickstream.avro",
  "type": "record",
  "name": "ClickEvent",
  "fields": [
    {"name": "ip", "type": "string"},
    {"name": "eventtimestamp", "type": "long"},
    {"name": "devicetype", "type": "string"},
    {"name": "event_type", "type": ["string", "null"]},
    {"name": "product_type", "type": ["string", "null"]},
    {"name": "userid", "type": "int"},
    {"name": "globalseq", "type": "long"},
    {"name": "prevglobalseq", "type": "long", "default": 0}
  ]
}
```
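To tie this schema to the byte dump shown earlier (the 0x18 byte in front of 66.249.1.114), here is a hand-written sketch of Avro's binary encoding for long and string values: the length is written as a zig-zag varint, followed by the UTF-8 bytes. `AvroBinary` is a hypothetical illustration; a real producer uses the Avro library's encoder.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hand-written illustration of Avro binary encoding for `long` and `string`.
final class AvroBinary {
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);             // zig-zag: small |n| -> small unsigned value
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));  // 7 data bits + continuation bit
            z >>>= 7;
        }
        out.write((int) z);                        // last byte, continuation bit clear
    }

    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);               // length prefix as an Avro long
        out.write(utf8, 0, utf8.length);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, "66.249.1.114");          // ip is the first ClickEvent field
        byte[] bytes = out.toByteArray();
        System.out.printf("first byte = 0x%02X, total = %d bytes%n", bytes[0], bytes.length);
    }
}
```

The 12-character IP string gets the zig-zag length 24 (0x18), which is exactly the byte that follows the 5-byte header in the example message.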

Verifying schema registration

```bash
# List the registered subjects
curl http://localhost:8081/subjects
# ["ExampleTopic-key", "ExampleTopic-value"]

# Get the latest schema (the /schema suffix returns only the schema itself)
curl http://localhost:8081/subjects/ExampleTopic-value/versions/latest/schema
```

```json
{
  "type": "record",
  "name": "ClickEvent",
  "namespace": "samples.clickstream.avro",
  "fields": [
    {"name": "ip", "type": "string"},
    {"name": "eventtimestamp", "type": "long"},
    {"name": "devicetype", "type": "string"},
    {"name": "event_type", "type": ["string", "null"]},
    {"name": "product_type", "type": ["string", "null"]},
    {"name": "userid", "type": "int"},
    {"name": "globalseq", "type": "long"},
    {"name": "prevglobalseq", "type": "long", "default": 0}
  ]
}
```

Docker Compose Configuration

When the producer runs on the host machine, Kafka returns kafka:9092 as the broker address, and the host cannot resolve that container hostname:

```text
org.apache.kafka.common.errors.TimeoutException
Failed to send record to Kafka after 5 retries
No data format. Magic number does not match
```

Kafka's ADVERTISED_LISTENERS setting determines which address the broker hands back to clients. When the producer connects from the host:

  1. The client first connects to localhost:9092
  2. The broker returns PLAINTEXT://kafka:9092
  3. The client tries to connect to kafka:9092, which the host cannot resolve

The fix is to configure two listeners:

```yaml
# docker-compose.yml - Kafka service configuration
kafka:
  image: apache/kafka:3.9.0
  ports:
    - "9092:9094"  # host port 9092 maps to container port 9094
  environment:
    # PLAINTEXT: container-to-container traffic
    # EXTERNAL: access from the host
    KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
```

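Notes:

  1. The PLAINTEXT listener carries traffic inside the Docker network (Schema Registry, Flink)
  2. The EXTERNAL listener serves the producer running on the host
  3. The port mapping must line up: host port 9092 maps to container port 9094
  4. Schema Registry and Flink keep using kafka:9092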
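Why the dual-listener setup works can be shown with a toy model: the broker replies with the advertised address of whichever listener the connection arrived on. The `ListenerResolver` class below is hypothetical and only parses the two settings from the compose file; it is not how Kafka is implemented internally.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: map the container port a connection arrived on to the listener name,
// then return that listener's ADVERTISED address.
final class ListenerResolver {
    // "NAME://host:port,..." -> {NAME -> "host:port"}
    static Map<String, String> parse(String listeners) {
        Map<String, String> byName = new HashMap<>();
        for (String entry : listeners.split(",")) {
            String[] parts = entry.trim().split("://", 2);
            byName.put(parts[0], parts[1]);
        }
        return byName;
    }

    static String advertisedFor(int containerPort, String listeners, String advertised) {
        Map<String, String> bound = parse(listeners);
        Map<String, String> adv = parse(advertised);
        for (Map.Entry<String, String> e : bound.entrySet()) {
            String bindAddr = e.getValue(); // e.g. ":9094"
            int port = Integer.parseInt(bindAddr.substring(bindAddr.lastIndexOf(':') + 1));
            if (port == containerPort) {
                return adv.get(e.getKey()); // null for listeners that are not advertised
            }
        }
        throw new IllegalArgumentException("No listener on port " + containerPort);
    }

    public static void main(String[] args) {
        String l = "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094";
        String a = "PLAINTEXT://kafka:9092,EXTERNAL://localhost:9092";
        System.out.println("host producer -> " + advertisedFor(9094, l, a)); // via 9092:9094 mapping
        System.out.println("flink         -> " + advertisedFor(9092, l, a));
    }
}
```

The host's connection lands on container port 9094 (EXTERNAL) and gets localhost:9092 back, while Flink and Schema Registry land on 9092 (PLAINTEXT) and get kafka:9092.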

The complete Docker Compose file:

```yaml
services:
  # Kafka
  kafka:
    image: apache/kafka:3.9.0
    container_name: kafka
    hostname: kafka
    ports:
      - "9092:9094"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,EXTERNAL://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
    healthcheck:
      # absolute path of the script inside the apache/kafka image
      test: ["CMD-SHELL", "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092"]
      interval: 10s
      timeout: 10s
      retries: 5

  # Schema Registry
  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.4
    depends_on:
      kafka:
        condition: service_healthy
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092  # use the internal listener

  # Flink JobManager
  flink-jobmanager:
    image: flink:1.20-java17
    ports:
      - "8082:8081"
    command: jobmanager
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 4
    volumes:
      - ./flink-jobs:/opt/flink/usrlib

  # Flink TaskManager
  flink-taskmanager:
    image: flink:1.20-java17
    depends_on:
      flink-jobmanager:
        # the JobManager defines no healthcheck, so service_healthy cannot be satisfied
        condition: service_started
    command: taskmanager
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 4

  # OpenSearch
  opensearch:
    image: opensearchproject/opensearch:2.19.1
    ports:
      - "9200:9200"
    environment:
      discovery.type: single-node
      DISABLE_SECURITY_PLUGIN: "true"  # simplifies local development

  # OpenSearch Dashboards
  opensearch-dashboards:
    image: opensearchproject/opensearch-dashboards:2.19.1
    ports:
      - "5601:5601"
    environment:
      OPENSEARCH_HOSTS: '["http://opensearch:9200"]'
      DISABLE_SECURITY_DASHBOARDS_PLUGIN: "true"
```

Producer Changes

Configure the Confluent Schema Registry dependency: the project uses Confluent Schema Registry for Avro serialization.

Switch the serializer to Confluent's kafka-avro-serializer:

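```xml
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.4.4</version>
</dependency>

<!-- io.confluent artifacts are not published to Maven Central; add Confluent's repository -->
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>
```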

Avro schema definition

```json
{
  "type": "record",
  "name": "ClickEvent",
  "namespace": "samples.clickstream.avro",
  "fields": [
    {"name": "ip", "type": "string"},
    {"name": "eventtimestamp", "type": "long"},
    {"name": "devicetype", "type": "string"},
    {"name": "event_type", "type": ["string", "null"]},
    {"name": "product_type", "type": ["string", "null"]},
    {"name": "userid", "type": "int"},
    {"name": "globalseq", "type": "long"},
    {"name": "prevglobalseq", "type": "long", "default": 0}
  ]
}
```

User behavior simulation logic

The producer models e-commerce user sessions as Markov-chain state transitions, mimicking how real users browse and buy:

```mermaid
stateDiagram-v2
    [*] --> home_page: user arrives
    home_page --> product_catalog: browse catalog
    product_catalog --> product_detail: view product
    product_detail --> add_to_cart: add to cart
    add_to_cart --> order: place order
    order --> order_checkout: check out
    order_checkout --> [*]: purchase complete
    add_to_cart --> remove_from_cart: remove item
    remove_from_cart --> product_catalog: keep browsing
```

State transition code:

```java
// Events.java - core event-generation logic (excerpt: the fields rand, counter, topic,
// logger, eventCount, errorCount and the nextProductType() helper are not shown)
class Events {
    // Pool of device types
    private static final String[] deviceType = {"mobile", "computer", "tablet"};
    // Pool of product types
    private static final String[] productTypeOptions = {"cell phones", "laptops", "ear phones",
            "soundbars", "cd players", "AirPods", "video games", "cameras"};

    /**
     * Generates the event sequence for a single user.
     * @param kafkaProducer Kafka producer
     * @param userID        user ID
     */
    void genEvents(Producer<String, ClickEvent> kafkaProducer, Integer userID) {
        String userDeviceType = deviceType[rand.nextInt(deviceType.length)];
        String userIP = "66.249.1." + rand.nextInt(255);
        String previousEventType = null;
        String previousProductType = null;
        Long previousGlobalSeqNo = 0L; // first event has no predecessor (matches the schema default 0)
        // Keep generating events until the event type is null (session over) or a send fails
        do {
            ClickEvent event = genUserEvent(userID, userDeviceType, previousEventType,
                    previousProductType, userIP, previousGlobalSeqNo);
            // event_type and product_type are nullable in the schema: guard before toString()
            previousEventType = event.getEventType() == null ? null : event.getEventType().toString();
            previousProductType = event.getProductType() == null ? "N/A" : event.getProductType().toString();
            previousGlobalSeqNo = event.getGlobalseq();
            // Send to Kafka asynchronously
            kafkaProducer.send(new ProducerRecord<>(topic, userID.toString(), event),
                    (metadata, e) -> {
                        if (e != null) {
                            logger.error("Send failed", e);
                            errorCount.incrementAndGet();
                        } else {
                            eventCount.incrementAndGet();
                        }
                    });
        } while (previousEventType != null && errorCount.get() < 1);
    }

    /**
     * Generates the next event based on the current state.
     */
    private ClickEvent genUserEvent(Integer userId, String userDeviceType, String previousEventType,
                                    String previousProductType, String userIP, Long previousGlobalSeqNo) {
        String eventType;
        String productType;
        if (previousEventType == null) {
            // First event: home page
            eventType = "home_page";
            productType = "N/A";
        } else {
            // Derive the next event from the previous event type
            eventType = nextEventType(previousEventType);
            productType = nextProductType(previousProductType, eventType);
        }
        return ClickEvent.newBuilder()
                .setIp(userIP)
                .setEventtimestamp(System.currentTimeMillis())
                .setDevicetype(userDeviceType)
                .setEventType(eventType.isEmpty() ? null : eventType) // "" = session over -> null
                .setProductType("N/A".equals(productType) ? null : productType)
                .setUserid(userId)
                .setGlobalseq(counter.incrementAndGet())
                .setPrevglobalseq(previousGlobalSeqNo)
                .build();
    }

    /**
     * State transition logic: a Markov chain.
     */
    private String nextEventType(String previousEventType) {
        switch (previousEventType) {
            case "home_page":
                return rand.nextBoolean() ? "product_catalog" : "";
            case "product_catalog":
                return rand.nextBoolean() ? "product_detail" : "home_page";
            case "product_detail": {
                double r = rand.nextDouble();
                if (r < 0.3) return "add_to_cart";
                if (r < 0.6) return "product_catalog";
                return "";
            }
            case "add_to_cart":
                return rand.nextBoolean() ? "order" : "remove_from_cart";
            case "remove_from_cart":
                return rand.nextBoolean() ? "product_catalog" : "";
            case "order":
                return rand.nextBoolean() ? "order_checkout" : "";
            case "order_checkout":
                return ""; // purchase complete, session over
            default:
                return "";
        }
    }
}
```

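This method implements a finite state machine (FSM) that models a realistic e-commerce shopping path. The core idea:

  • Returning the empty string "" ends the session (the user leaves or completes a purchase)
  • Returning an event name continues the session with that event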

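Key points:

  1. Each user session produces multiple events that together form a purchase funnel
  2. A session ends when eventType becomes the empty string (sent as a null event_type) or when a send fails
  3. The global sequence number (globalseq) gives every event a unique, increasing ID, so event order can be reconstructed
  4. Timestamps come from System.currentTimeMillis() to simulate event time

Per-state transition rules:

| Event type | Meaning | Transition probabilities |
|------------|---------|--------------------------|
| home_page | Home page | 50% → product_catalog, 50% → end |
| product_catalog | Browse catalog | 50% → product_detail, 50% → home_page |
| product_detail | Product details | 30% → add_to_cart, 30% → product_catalog, 40% → end |
| add_to_cart | Add to cart | 50% → order, 50% → remove_from_cart |
| remove_from_cart | Remove from cart | 50% → product_catalog, 50% → end |
| order | Place order | 50% → order_checkout, 50% → end |
| order_checkout | Checkout | 100% → end (purchase complete) |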
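Because the transition table fully defines the chain, the share of sessions that should reach order_checkout can be computed directly. This sketch (hypothetical class `CheckoutProbability`) runs fixed-point iteration over the table above, assuming every session starts at home_page, as in genUserEvent, and treating "" as an absorbing end state.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Probability that a session starting in each state eventually reaches order_checkout.
final class CheckoutProbability {
    // Transition table copied from the rules above; "" means the session ends.
    static final Map<String, Map<String, Double>> T = new LinkedHashMap<>();
    static {
        T.put("home_page", Map.of("product_catalog", 0.5, "", 0.5));
        T.put("product_catalog", Map.of("product_detail", 0.5, "home_page", 0.5));
        T.put("product_detail", Map.of("add_to_cart", 0.3, "product_catalog", 0.3, "", 0.4));
        T.put("add_to_cart", Map.of("order", 0.5, "remove_from_cart", 0.5));
        T.put("remove_from_cart", Map.of("product_catalog", 0.5, "", 0.5));
        T.put("order", Map.of("order_checkout", 0.5, "", 0.5));
    }

    // Fixed-point iteration: p(s) = sum over next states n of P(s -> n) * p(n),
    // with p(order_checkout) = 1 and p("") = 0.
    static Map<String, Double> solve(int iterations) {
        Map<String, Double> p = new LinkedHashMap<>();
        T.keySet().forEach(s -> p.put(s, 0.0));
        p.put("order_checkout", 1.0);
        p.put("", 0.0);
        for (int i = 0; i < iterations; i++) {
            for (Map.Entry<String, Map<String, Double>> row : T.entrySet()) {
                double v = 0.0;
                for (Map.Entry<String, Double> next : row.getValue().entrySet()) {
                    v += next.getValue() * p.get(next.getKey());
                }
                p.put(row.getKey(), v);
            }
        }
        return p;
    }

    public static void main(String[] args) {
        System.out.printf("P(checkout | start at home_page) = %.4f%n", solve(200).get("home_page"));
    }
}
```

Solving the same equations by hand gives p(product_catalog) = 1/15 and p(home_page) = 1/30, so under these probabilities only about 3.3% of simulated sessions should end in a purchase. That is a rough baseline for the percentSessionswithBuy field later, keeping in mind that Flink's 1 s session gap may not cut sessions exactly where the producer does.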

After upgrading from Flink 1.8.2 to 1.20.1, the deprecated FlinkKafkaConsumer is replaced by the KafkaSource API, configured as follows:

```java
// ClickstreamProcessor.java
KafkaSource<ClickEvent> source = KafkaSource.<ClickEvent>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("ExampleTopic")
        .setGroupId("flink-clickstream-processor")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(
                ConfluentRegistryAvroDeserializationSchema.forSpecific(ClickEvent.class, schemaRegistryUrl))
        .build();

// Note: with noWatermarks() the event-time windows never fire; the job needs
// the WatermarkStrategy from the Watermark strategy section instead.
DataStream<ClickEvent> events = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
```

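A Jackson version conflict then appeared. The StreamWriteConstraints class was introduced in Jackson 2.15.0, and the Jackson version used by the OpenSearch sink conflicts with the one Flink uses internally: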

```text
java.io.IOException: Failed to deserialize consumer record
Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/core/StreamWriteConstraints$Defaults
```

Pin the Jackson version in pom.xml by importing the Jackson BOM:

```xml
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.fasterxml.jackson</groupId>
      <artifactId>jackson-bom</artifactId>
      <version>2.15.2</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <!-- Declare the versions explicitly -->
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.15.2</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.15.2</version>
  </dependency>
</dependencies>
```
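When it is unclear which Jackson artifact actually wins on the classpath, a small diagnostic can report where a class was loaded from. This is a generic, JDK-only sketch (hypothetical class `ClassOrigin`); inside a Flink job the user-code class loader may differ from the application class loader used here, so treat the output as a hint rather than proof.

```java
import java.security.CodeSource;

// Reports which JAR (if any) a class is loaded from, without initializing it.
final class ClassOrigin {
    static String locate(String className) {
        try {
            Class<?> c = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // JDK bootstrap classes have no code source
            return src == null ? "bootstrap/unknown" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "NOT FOUND (" + e.getClass().getSimpleName() + ")";
        }
    }

    public static void main(String[] args) {
        for (String name : new String[] {
                "com.fasterxml.jackson.core.StreamWriteConstraints", // only in jackson-core 2.15+
                "com.fasterxml.jackson.core.JsonFactory"}) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If JsonFactory resolves to a pre-2.15 jackson-core JAR while StreamWriteConstraints is missing, an older Jackson is shadowing the version pinned above.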

Time-Window Aggregation

Windowing is a core stream-processing concept in Flink: it carves finite sets of data out of an unbounded stream so they can be computed over.

Window types compared

```mermaid
flowchart LR
    subgraph T [Tumbling window]
        direction TB
        T1[0-10s] --> T2[10-20s] --> T3[20-30s]
        style T1 fill:#e1f5fe
        style T2 fill:#e8f5e9
        style T3 fill:#fff3e0
    end
    subgraph S [Sliding window]
        direction TB
        S1[0-10s] --> S2[5-15s] --> S3[10-20s]
        style S1 fill:#e1f5fe
        style S2 fill:#e1f5fe,stroke-dasharray: 5 5
        style S3 fill:#e8f5e9
    end
    subgraph SS [Session window]
        direction TB
        SS1[Events 1-3] -.->|gap| SS2[Events 4-6] -.->|gap| SS3[Event 7]
        style SS1 fill:#e1f5fe
        style SS2 fill:#e8f5e9
        style SS3 fill:#fff3e0
    end
    T ~~~ S ~~~ SS
```
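| Window type | Characteristics | Typical use | Used in this project |
|-------------|-----------------|-------------|----------------------|
| Tumbling | Fixed size, non-overlapping | Periodic statistics | Click count per department (10 s) |
| Sliding | Fixed size, may overlap | Moving averages | Not used |
| Session | Dynamic size, grouped by activity | User behavior analysis | User sessions (1 s gap) |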
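Which tumbling window an event lands in is plain arithmetic on its timestamp. The sketch below follows the formula used by Flink's `TimeWindow.getWindowStartWithOffset`, with offset 0 and the 10 s size used in this project; `TumblingAssign` itself is a hypothetical helper.

```java
// Assigns an event timestamp to its tumbling window [start, start + size).
final class TumblingAssign {
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        long remainder = (timestampMs - offsetMs) % sizeMs;
        // Java's % can be negative for negative inputs; shift back into [0, size)
        return remainder < 0 ? timestampMs - (remainder + sizeMs) : timestampMs - remainder;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 s windows, aligned to the epoch (offset 0)
        for (long ts : new long[] {0L, 9_999L, 10_000L, 23_456L}) {
            long start = windowStart(ts, 0L, size);
            // the end is exclusive, so 10_000 already belongs to the next window
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Each event belongs to exactly one window, which is what makes tumbling windows suitable for non-overlapping per-department counts.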

Event-time semantics are illustrated below; for details, see https://www.cnblogs.com/peacemaple/p/20128616.

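```mermaid
sequenceDiagram
    participant Source as Kafka Source
    participant Watermark as WatermarkGenerator
    participant Window as Window Operator
    participant Sink as OpenSearch Sink
    Source->>Source: Extract eventtimestamp
    Source->>Watermark: Update max timestamp
    Watermark->>Watermark: Compute Watermark = max timestamp - out-of-orderness
    Note over Watermark: The watermark decides which data counts as late
    alt Window still open (window max timestamp > Watermark)
        Source->>Window: Processed normally, enters the window
    else Window already fired (window max timestamp <= Watermark)
        Source->>Window: Late data, dropped or sent to a side output
    end
    Window->>Window: Wait until the Watermark passes the window end
    Window->>Sink: Fire the window computation and emit the result
```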

The two windows are defined as follows:

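```java
// 1. Session window: per-user session aggregation
events.keyBy(ClickEvent::getUserid)  // group by userId
        // gap = 1 s: if two consecutive events are more than 1 s apart, a new session starts
        .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
        .process(new SessionAggregator());

// 2. Tumbling window: click counts per department (product type)
events.filter(e -> e.getProductType() != null)      // product_type is nullable; Flink rejects null keys
        .keyBy(e -> e.getProductType().toString())  // group by product type
        // one window every 10 s; windows never overlap
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .aggregate(new DepartmentAggregator());
```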
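The 1 s session gap can be illustrated for a single key. This hypothetical `SessionSplit` helper assumes the timestamps of one userid are already sorted; Flink instead gives each event a window [ts, ts + gap) and merges overlapping windows, which for sorted input should produce the same grouping as splitting wherever consecutive events are more than the gap apart.

```java
import java.util.ArrayList;
import java.util.List;

// Splits one key's sorted event timestamps into sessions separated by gaps > gapMs.
final class SessionSplit {
    static List<List<Long>> split(long[] sortedTs, long gapMs) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (int i = 0; i < sortedTs.length; i++) {
            if (i > 0 && sortedTs[i] - sortedTs[i - 1] > gapMs) {
                sessions.add(current);      // gap exceeded: close the current session
                current = new ArrayList<>();
            }
            current.add(sortedTs[i]);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 400L, 900L, 2_500L, 2_600L, 5_000L};
        System.out.println(split(ts, 1_000L)); // three sessions
    }
}
```

With a 1 s gap, events produced with --noDelay by one simulated user will usually fall into a single session, while a pause longer than a second starts a new one.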

Watermark strategy

```java
// WatermarkStrategy configuration
WatermarkStrategy<ClickEvent> watermarkStrategy = WatermarkStrategy
        .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2))  // tolerate events up to 2 s out of order
        .withTimestampAssigner((event, timestamp) -> event.getEventtimestamp())
        .withIdleness(Duration.ofSeconds(10));  // mark a split idle after 10 s without data

DataStream<ClickEvent> events = env.fromSource(kafkaSource, watermarkStrategy, "kafka-source");
```

OpenSearch Sink

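Notes on the example:

  1. The OpenSearch 2.x client uses its own request objects (IndexRequest from the org.opensearch packages), which are distinct from Elasticsearch 7.x's IndexRequest
  2. DISABLE_SECURITY_PLUGIN=true simplifies local development; production environments need authentication configured
  3. The bulk flush parameters trade latency against throughput: larger batches raise throughput, while the flush interval caps how long documents wait in the buffer

```java
// Flink OpenSearch connector (flink-connector-opensearch)
OpensearchSink<String> opensearchSink = new OpensearchSinkBuilder<String>()
        .setHosts(new HttpHost("opensearch", 9200, "http"))
        // the security plugin is disabled locally, so no username/password is set
        .setBulkFlushMaxActions(100)   // flush after 100 buffered requests...
        .setBulkFlushInterval(5000)    // ...or every 5000 ms, whichever comes first
        .setEmitter((element, context, indexer) ->
                indexer.add(Requests.indexRequest()
                        .index("user_session_details")
                        .source(element, XContentType.JSON)))  // element is a JSON document string
        .build();
```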
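The interplay of the two flush settings can be seen in a toy buffer. `BulkBuffer` is hypothetical and much simpler than the connector (no retries, no backpressure, no byte-size limit); it only shows that either reaching 100 buffered actions or 5000 ms elapsing triggers a flush.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the two bulk-flush triggers (not the connector's implementation).
final class BulkBuffer {
    private final int maxActions;
    private final long intervalMs;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushMs;
    final List<Integer> flushedBatchSizes = new ArrayList<>();

    BulkBuffer(int maxActions, long intervalMs, long nowMs) {
        this.maxActions = maxActions;
        this.intervalMs = intervalMs;
        this.lastFlushMs = nowMs;
    }

    void add(String doc, long nowMs) {
        buffer.add(doc);
        if (buffer.size() >= maxActions) {
            flush(nowMs); // size-driven flush: favors throughput
        }
    }

    // Called periodically (e.g. by a timer) to enforce the latency bound.
    void tick(long nowMs) {
        if (!buffer.isEmpty() && nowMs - lastFlushMs >= intervalMs) {
            flush(nowMs); // time-driven flush: bounds latency
        }
    }

    private void flush(long nowMs) {
        flushedBatchSizes.add(buffer.size()); // a real sink would send one _bulk request here
        buffer.clear();
        lastFlushMs = nowMs;
    }
}
```

Under a steady load the size trigger dominates; when traffic thins out, the interval trigger keeps the last few documents from waiting indefinitely.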

Startup and Verification

Start all services:

```bash
docker-compose up -d
```

Create the Kafka topic:

```bash
docker exec kafka /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic ExampleTopic --partitions 3 --replication-factor 1
```

Create the index mappings:

```bash
# user_session_details - per-user session details
curl -X PUT "http://localhost:9200/user_session_details" -H 'Content-Type: application/json' -d '{
  "mappings": {
    "properties": {
      "userId": {"type": "integer"},
      "eventCount": {"type": "integer"},
      "orderCheckoutEventCount": {"type": "integer"},
      "deptList": {"type": "keyword"},
      "windowBeginTime": {"type": "date"},
      "windowEndTime": {"type": "date"}
    }
  }
}'

# departments_count - click counts per department
curl -X PUT "http://localhost:9200/departments_count" -H 'Content-Type: application/json' -d '{
  "mappings": {
    "properties": {
      "departmentName": {"type": "keyword"},
      "departmentCount": {"type": "integer"},
      "windowBeginTime": {"type": "date"},
      "windowEndTime": {"type": "date"}
    }
  }
}'

# user_session_counts - aggregated session statistics
curl -X PUT "http://localhost:9200/user_session_counts" -H 'Content-Type: application/json' -d '{
  "mappings": {
    "properties": {
      "userSessionCount": {"type": "integer"},
      "userSessionCountWithOrderCheckout": {"type": "integer"},
      "percentSessionswithBuy": {"type": "float"},
      "windowBeginTime": {"type": "date"},
      "windowEndTime": {"type": "date"}
    }
  }
}'
```

Upload the Flink JAR:

```bash
curl -X POST -F "jarfile=@flink-jobs/ClickstreamProcessor-1.0-SNAPSHOT.jar" \
  http://localhost:8082/jars/upload
```

Submit the Flink job:

```bash
# <jar-id> is the last path segment of the "filename" returned by the upload request
curl -X POST "http://localhost:8082/jars/<jar-id>/run"
```

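Check the job status in the Flink Web UI at http://localhost:8082 (the JobManager's port 8081 is mapped to host port 8082 in the compose file):

[Figure: Flink job running status]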

Start the producer:

```bash
java -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar \
  --propertiesFilePath producer.properties \
  --numThreads 4 --numberOfUsers 100 --runFor 120 --noDelay
```

Verify the data flow:

```bash
# 1. Schema Registry - confirm the schema is registered
curl http://localhost:8081/subjects | jq '.'
# Expected output: ["ExampleTopic-key", "ExampleTopic-value"]

# 2. Kafka - confirm messages exist (Avro format)
docker exec kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic ExampleTopic --from-beginning --max-messages 1
# Expected output: unreadable bytes (Avro binary), which shows the data is there

# 3. Flink consumer group - confirm consumption has caught up (LAG = 0)
docker exec kafka /opt/kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe --group flink-clickstream-processor
```

```text
GROUP                       TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
flink-clickstream-processor ExampleTopic    0          83550           83550           0               -               -               -
flink-clickstream-processor ExampleTopic    2          87172           87172           0               -               -               -
flink-clickstream-processor ExampleTopic    1          86511           86511           0               -               -               -
```

```bash
# 4. OpenSearch - confirm documents were written
curl http://localhost:9200/user_session_details/_count | jq '.count'
curl http://localhost:9200/departments_count/_count | jq '.count'
curl http://localhost:9200/user_session_counts/_count | jq '.count'
```

Dashboards Visualization

OpenSearch Dashboards needs an index pattern for each index before the data can be visualized.

```bash
# Create the user_session_details index pattern
curl -X POST "http://localhost:5601/api/saved_objects/index-pattern" \
  -H "Content-Type: application/json" \
  -H "osd-xsrf: true" \
  -d '{"attributes": {"title": "user_session_details", "timeFieldName": "windowBeginTime"}}'

# Create the departments_count index pattern
curl -X POST "http://localhost:5601/api/saved_objects/index-pattern" \
  -H "Content-Type: application/json" \
  -H "osd-xsrf: true" \
  -d '{"attributes": {"title": "departments_count", "timeFieldName": "windowBeginTime"}}'

# Create the user_session_counts index pattern
curl -X POST "http://localhost:5601/api/saved_objects/index-pattern" \
  -H "Content-Type: application/json" \
  -H "osd-xsrf: true" \
  -d '{"attributes": {"title": "user_session_counts", "timeFieldName": "windowBeginTime"}}'
```

Verify that the index patterns were created:

```bash
# List all index patterns
curl -s "http://localhost:5601/api/saved_objects/_find?type=index-pattern" | jq '.saved_objects[].attributes.title'
# Expected output:
# "user_session_details"
# "departments_count"
# "user_session_counts"
```

Creating visualizations

Open the Visualize page: in the left-hand menu, click Visualize, then Create visualization.

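Pie chart: click distribution by department

  1. Choose the "Pie" chart type
  2. Select the index pattern: departments_count
  3. Configure Metrics:
    • Aggregation: Count (the default; counts documents)
    • Or choose Sum with Field: departmentCount (total clicks)
  4. Configure Buckets (click "Add"):
    • Split Slices
    • Aggregation: Terms
    • Field: departmentName (it should now appear in the drop-down list)
    • Order By: Metric: Count
    • Order: Descending
    • Size: 10
  5. Click "Update" in the top-right corner to preview the chart
  6. Click "Save" and name it "Department click distribution"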

Vertical bar chart: user session statistics

Steps:

  1. Choose the "Vertical Bar" chart type
  2. Select the index pattern: user_session_counts
  3. Configure Metrics:
    • Y-axis
    • Aggregation: Average
    • Field: userSessionCount
    • Custom label: "Average sessions"
  4. Configure Buckets (click "Add"):
    • X-axis
    • Aggregation: Date Histogram
    • Field: windowBeginTime
    • Interval: Auto (or pick one manually, such as 1 hour)
  5. Click "Update" to preview
  6. Save it as "User session trend"

Data table: user session details

Steps:

  1. Choose the "Data Table" chart type
  2. Select the index pattern: user_session_details
  3. Configure Metrics:
    • Aggregation: Count (the default)
  4. Configure Buckets (click "Add"):
    • Split Rows
    • Aggregation: Terms
    • Field: userId
    • Order By: Metric: Count
    • Order: Descending
    • Size: 100
  5. To show more columns, add further metrics:
    • Click "Add Metric" below Metrics
    • Aggregation: Average
    • Field: eventCount
  6. Click "Update" to preview the table
  7. Save it as "User session details"

The final result:

[Figure: final dashboard]
