【FlinkSQL笔记】(二)Flink SQL 基础语法详解
一、数据类型(常用)
Flink SQL 数据类型和MySQL高度兼容,常用类型如下:
STRING:字符串(对应MySQL varchar)
INT:整型
BIGINT:长整型(计数、时间戳常用)
DOUBLE:浮点型
TIMESTAMP(3):高精度时间戳(实时任务必备,保留3位毫秒)
BOOLEAN:布尔值
二、核心建表语法(重点Kafka源表为主)
Kafka是Flink实时任务最核心数据源,以下是生产通用标准模板,可直接复用:
-- 实时数据源:读取Kafka JSON数据CREATETABLEkafka_source(vin STRING,-- 车辆唯一标识online_statusINT,-- 在线状态high_voltage STRING,-- 高压状态event_timeTIMESTAMP(3),-- 数据产生时间(核心!用于开窗、水印)-- 水印定义:容忍5秒数据乱序,生产通用配置WATERMARKFORevent_timeASevent_time-INTERVAL'5'SECOND)WITH('connector'='kafka','topic'='vehicle_realtime_data',-- 你的Kafka主题名'properties.bootstrap.servers'='127.0.0.1:9092',-- Kafka地址'properties.group.id'='flink_sql_consumer_01',-- 消费者组'format'='json',-- 数据格式:JSON'scan.startup.mode'='latest'-- 启动消费位置:最新数据);参数说明:
connector:指定数据源类型,固定kafka
topic:需要消费的Kafka主题名称
bootstrap.servers:Kafka集群地址端口
group.id:消费者组,自定义不重复即可,用于记录消费偏移量
format:数据序列化格式,企业99%为json
scan.startup.mode:启动规则
latest:从当前最新数据开始消费(生产默认)
earliest:从头消费所有历史数据(测试用)
三、 结果表建表语法(数据输出)
用于将实时计算结果,写入Kafka、MySQL等存储,模板如下:
-- 结果输出表:写入KafkaCREATETABLEkafka_sink(vin STRING,online_durationBIGINT,alert_timeTIMESTAMP(3),alert_msg STRING)WITH('connector'='kafka','topic'='vehicle_alert_result','properties.bootstrap.servers'='127.0.0.1:9092','format'='json');四、Flink SQL 常用查询语法(和标准SQL基本一致)
1、 数据过滤 WHERE
SELECT*FROMkafka_sourceWHEREonline_status=1-- 只筛选在线车辆ANDhigh_voltage!='01';-- 筛选未上高压车辆2、字段选取、别名
SELECTvinAScar_no,online_status,event_timeAScreate_timeFROMkafka_source;3、分组聚合 GROUP BY
SELECTvin,COUNT(*)ASdata_count,-- 单车辆上报次数MAX(online_status)ASmax_statusFROMkafka_sourceGROUPBYvin;4、去重 DISTINCT
SELECTDISTINCTvinFROMkafka_source;