当前位置: 首页 > news >正文

【FlinkSQL笔记】(二)Flink SQL 基础语法详解

一、数据类型(常用)

Flink SQL 数据类型和MySQL高度兼容,常用类型如下:

  • STRING:字符串(对应MySQL varchar)

  • INT:整型

  • BIGINT:长整型(计数、时间戳常用)

  • DOUBLE:浮点型

  • TIMESTAMP(3):高精度时间戳(实时任务必备,保留3位毫秒)

  • BOOLEAN:布尔值

二、核心建表语法(重点Kafka源表为主)

Kafka是Flink实时任务最核心数据源,以下是生产通用标准模板,可直接复用:

-- 实时数据源:读取Kafka JSON数据CREATETABLEkafka_source(vin STRING,-- 车辆唯一标识online_statusINT,-- 在线状态high_voltage STRING,-- 高压状态event_timeTIMESTAMP(3),-- 数据产生时间(核心!用于开窗、水印)-- 水印定义:容忍5秒数据乱序,生产通用配置WATERMARKFORevent_timeASevent_time-INTERVAL'5'SECOND)WITH('connector'='kafka','topic'='vehicle_realtime_data',-- 你的Kafka主题名'properties.bootstrap.servers'='127.0.0.1:9092',-- Kafka地址'properties.group.id'='flink_sql_consumer_01',-- 消费者组'format'='json',-- 数据格式:JSON'scan.startup.mode'='latest'-- 启动消费位置:最新数据);

参数说明:

  • connector:指定数据源类型,固定kafka

  • topic:需要消费的Kafka主题名称

  • bootstrap.servers:Kafka集群地址端口

  • group.id:消费者组,自定义不重复即可,用于记录消费偏移量

  • format:数据序列化格式,企业99%为json

  • scan.startup.mode:启动规则

    • latest:从当前最新数据开始消费(生产默认)

    • earliest:从头消费所有历史数据(测试用)

三、 结果表建表语法(数据输出)

用于将实时计算结果,写入Kafka、MySQL等存储,模板如下:

-- 结果输出表:写入KafkaCREATETABLEkafka_sink(vin STRING,online_durationBIGINT,alert_timeTIMESTAMP(3),alert_msg STRING)WITH('connector'='kafka','topic'='vehicle_alert_result','properties.bootstrap.servers'='127.0.0.1:9092','format'='json');

四、Flink SQL 常用查询语法(和标准SQL基本一致)

1、 数据过滤 WHERE

SELECT*FROMkafka_sourceWHEREonline_status=1-- 只筛选在线车辆ANDhigh_voltage!='01';-- 筛选未上高压车辆

2、字段选取、别名

SELECTvinAScar_no,online_status,event_timeAScreate_timeFROMkafka_source;

3、分组聚合 GROUP BY

SELECTvin,COUNT(*)ASdata_count,-- 单车辆上报次数MAX(online_status)ASmax_statusFROMkafka_sourceGROUPBYvin;

4、去重 DISTINCT

SELECTDISTINCTvinFROMkafka_source;
http://www.jsqmd.com/news/863110/

相关文章:

  • Apifox压测模块深度解析:接口定义、场景编排与实时监控一体化
  • Unity地形Mesh草刷不上?底层限制与4种生产级解决方案
  • 3步解密网易云NCM音乐完整指南:高效实现跨平台播放自由
  • Unity集成DeepSeek AI对话的工程实践与避坑指南
  • SQL注入原理与sqlmap实战:从手工验证到自动化渗透
  • Unity低多边形资源包实战指南:POLYGON Knights深度解析
  • 空洞骑士模组管理器Scarab:高效管理你的游戏模组世界
  • 百度网盘高速下载终极指南:使用baidu-wangpan-parse突破限速
  • Python C扩展安全测试:Fuzzing+ASan+UBSan实战指南
  • Apifox压测功能如何替代JMeter实现高效接口性能测试
  • Unity VR开发环境配置避坑指南:从OpenXR初始化到Quest真机部署
  • 终极C盘瘦身指南:FreeMove一键释放Windows磁盘空间的完整教程
  • Unity传送门特效实现原理与渲染管线适配指南
  • Appium环境搭建与元素定位的底层原理与实战避坑指南
  • 如何在Blender中实现3D打印文件的无缝转换:终极3MF插件指南 [特殊字符]
  • 3步实现专业级直播效果:OBS背景移除插件完全指南
  • VR控制器编程:重构输入控制实现跨设备低延迟交互
  • Unity VR控制器输入控制重构:从延迟优化到语义分层
  • 会话管理:创建、切换、删除对话历史
  • 3步轻松实现炉石佣兵战记自动化:告别重复劳动的游戏助手
  • Unity背包系统实战:JSON配置+对象池+像素级UI优化
  • 书面沟通的5C原则
  • 基于平行素数对等腰梯形网格拓扑的完备性证明哥德巴赫猜想1+1
  • Unity背包系统实战:数据建模、UI性能与网络同步三位一体设计
  • 基于CentOS7.9部署的LAMP(2)——安装部署WordPress及Discuz
  • 思迈特SmartBI白泽V5正式发布 企业级Agent BI加速规模化落地
  • 使用 IndexedDB 在客户端存储对话记录
  • EC2 M3 Ultra Mac 实例实战:28 核 256GB 跑 12 路并行 Simulator 测试
  • GitHub中文界面插件架构解析与实战指南
  • 哥德巴赫猜想1+1基于平行素数对等腰梯形网格拓扑与素数渐近密度的大偶数满填充完备性证明