当前位置: 首页 > news >正文

Flink-SQL经过过滤-解析-去重-聚合计算写入到MySQL表

数据源来自于Kafka的Json结构数据,数据结构为源头不断更新的小时报表,Flink的任务是处理计算并将结果输出到MySQL中。代码如下:

-- Kafka源表:账户级报表
CREATE TEMPORARY TABLE kafka_account_hour_report (
`data` STRING,
`log_date` AS JSON_VALUE(`data`,'$.log_date'),
`hour_id` AS JSON_VALUE(`data`,'$.hour_id'),
`biz_code` AS JSON_VALUE(`data`,'$.bizCode'),
`ad_pv` AS JSON_VALUE(`data`,'$.ad_pv'),
`click` AS JSON_VALUE(`data`,'$.click'),
`charge` AS JSON_VALUE(`data`,'$.charge'),
`car_num` AS JSON_VALUE(`data`,'$.car_num'),
`date` VARCHAR(20),
`hour` VARCHAR(20),
`brandId` VARCHAR(64),
`accountId` VARCHAR(64),
`isBatchEnd` INT,
`offset` INT NOT NULL METADATA VIRTUAL,
`my_part` BIGINT NOT NULL METADATA FROM 'partition',
`my_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`my_date` AS CAST(`my_time` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'kafka-sever1:9092,kafka-server2:9092,kafka-server3:9092',
'properties.group.id' = 'flink_group',
'topic' = 'account_hour_report',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
-- 结果表:品牌层级小时报表
CREATE TEMPORARY TABLE mysql_brand_report_hour (
`brand_id` VARCHAR(32) COMMENT '品牌ID',
`date_hour` INT COMMENT '日期时间(YYYYMMDDHH)',
`platform_id` VARCHAR(32) COMMENT '平台ID',
`cost` DECIMAL(20,4) COMMENT '花费',
`show_num` BIGINT COMMENT '曝光量',
`click_num` BIGINT COMMENT '点击量',
PRIMARY KEY (`brand_id`,`date_hour`,`platform_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = 'host_name',
'port' = '3306',
'username' = 'mysql_user',
'password' = 'password',
'database-name' = 'db_name',
'table-name' = 'ads_brand_report_hour'
);
-- 账户数据解析
CREATE TEMPORARY VIEW view_account_report_ori AS
SELECT
TO_DATE(FROM_UNIXTIME(CAST(`log_date` AS BIGINT)/1000,'yyyy-MM-dd')) AS stat_date,
LPAD(`hour_id`,2,'0') AS stat_hour,
`brandId` AS brand_id,
`accountId` AS account_id,
`biz_code` AS biz_code,
`isBatchEnd` AS batch_end,
CAST(`charge` AS DECIMAL(20,5)) AS cost,
CAST(`ad_pv` AS INT) AS show_num,
CAST(`click` AS INT) AS click_num,
CONCAT(SUBSTR(`date`,1,4),SUBSTR(`date`,6,2),SUBSTR(`date`,9,2)) AS batch_date,
`hour` AS batch_hour,
my_time
FROM kafka_account_hour_report
WHERE FROM_UNIXTIME(CAST(`log_date` AS BIGINT)/1000,'yyyy-MM-dd')>=
DATE_FORMAT(TIMESTAMPADD(HOUR,-1,LOCALTIMESTAMP),'yyyy-MM-dd') AND `isBatchEnd`=0;
-- 去重并汇总作为小时报中间表
CREATE TEMPORARY VIEW view_brand_report_stg AS
SELECT
stat_date,
brand_id,
batch_date,
batch_hour,
IFNULL(SUM(show_num),0) AS show_num,
IFNULL(SUM(click_num),0) AS click_num,
IFNULL(SUM(cost),0) AS cost
FROM
(
SELECT *,ROW_NUMBER() OVER(PARTITION BY stat_date,brand_id,account_id,biz_code,
batch_date,batch_hour ORDER BY my_time DESC) AS rn
FROM view_account_report_ori t
) t
WHERE rn=1
GROUP BY stat_date,brand_id,batch_date,batch_hour;
-- 小时报结果
CREATE TEMPORARY VIEW view_brand_report_res AS
SELECT
brand_id,
CAST(CONCAT(batch_date,batch_hour) AS INT) AS date_hour,
'1003' AS platform_id,
ROUND(cost,4) AS cost,
show_num,
click_num
FROM view_brand_report_stg;
-- Sink 开始
BEGIN STATEMENT SET;
-- 插入小时报 --
INSERT INTO mysql_brand_report_hour
SELECT
brand_id,
date_hour,
platform_id,
cost,
show_num,
click_num
FROM view_brand_report_res;
END;
-- Sink结束

以上程序实现了从Kafka源表(主题/Topic为account_hour_report)消费数据,然后进行过滤、解析、去重、聚合等计算,最终将结果写入到MySQL结果表ads_brand_report_hour中。

http://www.jsqmd.com/news/19961/

相关文章:

  • 2025年超声波清洗机厂家联系电话推荐:精选推荐与使用指南。
  • PICO FIDO 使用教程
  • 2025年10月低空经济核心公司对比评测榜:赛飞特领衔全链条方案
  • MySQLDay2
  • 2025年10月GEO优化推荐:高性价比解决方案市场报告
  • 2025年10月祛斑产品推荐榜:仙瑟传明酸领衔全维度对比
  • 2025年诺士诚公司:权威解析全过程咨询竞争力与风险
  • 2025年10月GEO优化推荐:全平台同步优化榜单与避坑指南
  • 2025年10月医美项目后用什么产品推荐榜:五款修护精华对比评测
  • 2025年10月敏感肌可用美白产品推荐榜:温和淡斑实力排行
  • 2025年仙瑟品牌权威深度解析:揭秘其皮肤护理创新与市场领导地位揭秘
  • 2025年仙瑟传明酸精华液权威盘点:敏感肌多通路美白的临床级解读
  • 2025年仙瑟传明酸精华液权威盘点:敏感肌多通路美白的临床级证据链
  • 2025年仙瑟传明酸精华液权威解析:敏感肌多通路美白的临床级证据链
  • 2025年10月无功补偿装置厂家推荐榜:权威对比与选购指南
  • 2025年仙瑟传明酸精华液权威解析:多通路美白修护的临床级证据链
  • 2025年10月geo优化供应商推荐:主流排行榜全解析
  • 小米机械键盘TKL如何进入蓝牙配对模式?
  • 2025年10月全过程工程咨询公司推荐榜:权威评测五强对比
  • 2025年10月geo优化供应商推荐:全维度对比与可验证选择指南
  • 2025.10.23博客
  • 一款智能手表上语音通话时的音频设备动态切换
  • 题解:P10257 [COCI 2023/2024 #5] Zlagalica
  • AI代码辅助工具标准使用场景
  • CRM的终结与重生:基于SOP的一体化企业协同新范式
  • 实用指南:Coze源码分析-资源库-删除数据库-后端源码-流程/核心技术/总结
  • 2025年10月北京GEO优化公司推荐:主流公司列表评测指南
  • 2025年10月北京geo优化公司推荐:排行榜与避坑指南
  • Qt/C++实现无人机监控系统/航点实时监控系统/集群地面站管理平台/飞行轨迹规划和模拟
  • 【GitHub每日速递 251023】46.1k star, 1.2B参数逆袭!MinerU2.5成最牛文档解析多模态大模型