当前位置: 首页 > news >正文

flink代码最佳实践 - --

总体代码流程

  1. 获取一个执行环境
  2. 加载/创建初始数据
  3. 指定数据上的转换
  4. 指定计算结果放在哪里
  5. 触发程序执行

获取执行环境

通常使用固定的获取方式,根据建议定义变量为final:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

加载数据源

flink 1.15以后推荐使用新的API:fromSource:

kafka source:

简单读取string:

KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("kafka-broker:9092").setTopics("input-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStream<String> data = env.fromSource(source,WatermarkStrategy.noWatermarks(), // 指定水印策略"Kafka Source"
);

如果要在加载时转换为POJO,需要自定义序列化器:

样例数据:

{"id": 231762,"config_type": "JAVA服务-AutoMq","indicator_name": "kafka_log_end_offset_cold","instance_id": "kf-gan8h3m70kry7cgu","job_id": "4","request_time": "2026-02-02 11:38:34","response_data": "{\"message\":{\"traceId\":\"69801c3a00000000c0a221254446fb0c\",\"code\":200,\"success\":true,\"errorCode\":\"\",\"message\":\"\",\"content\":{\"data\":[{\"index_store_type\":\"\",\"query_progress\":{\"scanned_compressed_bytes\":0,\"nanos_to_finish\":0,\"total_rows\":0,\"scanned_uncompressed_bytes\":0,\"total_compressed_bytes\":0,\"total_percentage\":0,\"nanos_from_submitted\":0,\"total_uncompressed_bytes\":0,\"scanned_rows\":0,\"nanos_from_started\":0},\"cost\":\"\",\"next_cursor_time\":0,\"query_status\":\"\",\"index_names\":\"\",\"scan_completed\":false,\"async_id\":\"\",\"query_type\":\"\",\"sample\":0,\"is_cross_ws\":true,\"is_running\":false,\"is_data_latency\":false,\"next_cursor_token\":\"\",\"datasource\":\"\",\"series\":[{\"columns\":[\"time\",\"max(sum(kafka_log_end_offset{instance_id=~\\\"kf-gan8h3m70kry7cgu\\\",topic=~\\\"geea3_rawdata_prod\\\"}) - sum(kafka_group_commit_offset{instance_id=~\\\"kf-gan8h3m70kry7cgu\\\",topic=~\\\"geea3_rawdata_prod\\\",consumer_group=~\\\"flink-paimon-prod-0\\\"}), 0)\"],\"values\":[[1770003454156,71852]],\"column_names\":[\"time\",\"max(sum(kafka_log_end_offset{instance_id=~\\\"kf-gan8h3m70kry7cgu\\\",topic=~\\\"geea3_rawdata_prod\\\"}) - sum(kafka_group_commit_offset{instance_id=~\\\"kf-gan8h3m70kry7cgu\\\",topic=~\\\"geea3_rawdata_prod\\\",consumer_group=~\\\"flink-paimon-prod-0\\\"}), 0)\"]}],\"interval\":0,\"column_names\":[\"max(sum(kafka_log_end_offset{instance_id=~\\\"kf-gan8h3m70kry7cgu\\\",topic=~\\\"geea3_rawdata_prod\\\"}) - sum(kafka_group_commit_offset{instance_id=~\\\"kf-gan8h3m70kry7cgu\\\",topic=~\\\"geea3_rawdata_prod\\\",consumer_group=~\\\"flink-paimon-prod-0\\\"}), 0)\"],\"scan_index\":\"\",\"window\":0,\"complete\":false,\"index_name\":\"\"}],\"declaration\":{\"business\":\"\",\"organization\":\"default_private_organization\"}}}}","dt": "20260202"
}

原始POJO:


import java.io.Serializable;


/**
* Kafka消息POJO类
* 字段名与JSON中的key保持一致(使用下划线命名)
*/
public class KafkaMessage implements Serializable {
private static final long serialVersionUID = 1L;

private Long id;
private String config_type;
private String indicator_name;
private String instance_id;
private String job_id;
private String request_time;
private String response_data; // 保持JSON字符串,后续处理
private String dt;

// 必须有无参构造函数
public KafkaMessage() {}

// 简洁版getter/setter(生产环境建议用Lombok)
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }

public String getConfig_type() { return config_type; }
public void setConfig_type(String config_type) { this.config_type = config_type; }

public String getIndicator_name() { return indicator_name; }
public void setIndicator_name(String indicator_name) { this.indicator_name = indicator_name; }

public String getInstance_id() { return instance_id; }
public void setInstance_id(String instance_id) { this.instance_id = instance_id; }

public String getJob_id() { return job_id; }
public void setJob_id(String job_id) { this.job_id = job_id; }

public String getRequest_time() { return request_time; }
public void setRequest_time(String request_time) { this.request_time = request_time; }

public String getResponse_data() { return response_data; }
public void setResponse_data(String response_data) { this.response_data = response_data; }

public String getDt() { return dt; }
public void setDt(String dt) { this.dt = dt; }

@Override
public String toString() {
return String.format("KafkaMessage{id=%d, instance='%s', indicator='%s', dt='%s'}",
id, instance_id, indicator_name, dt);
}
}

 

序列化类:


import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;


/**
* Kafka消息反序列化器(只处理value部分)
* 使用Flink内置Jackson,无需额外依赖
*/
public class KafkaMessageDeserializer implements DeserializationSchema<KafkaMessage> {
private static final long serialVersionUID = 1L;

// 使用静态Holder模式实现单例ObjectMapper
private static final class ObjectMapperHolder {
static final ObjectMapper INSTANCE = createObjectMapper();

private static ObjectMapper createObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
// 忽略JSON中未知字段(防止字段变化导致解析失败)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// 允许空字符串转换为null
mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
return mapper;
}
}

@Override
public KafkaMessage deserialize(byte[] message) throws IOException {
try {
// 直接使用单例ObjectMapper反序列化
return ObjectMapperHolder.INSTANCE.readValue(message, KafkaMessage.class);
} catch (Exception e) {
// 返回错误标记的消息,避免任务失败
KafkaMessage errorMessage = new KafkaMessage();
errorMessage.setId(-1L);
errorMessage.setIndicatorName("parse_error");
// 保留原始消息前100字符便于排查
String rawMsg = new String(message, "UTF-8");
errorMessage.setResponseData(rawMsg.length() > 100 ?
rawMsg.substring(0, 100) + "..." : rawMsg);
return errorMessage;
}
}

@Override
public boolean isEndOfStream(KafkaMessage nextElement) {
return false; // Kafka流是无限的
}

@Override
public TypeInformation<KafkaMessage> getProducedType() {
return TypeInformation.of(KafkaMessage.class);
}

// 可选:open方法用于初始化指标等
@Override
public void open(InitializationContext context) {
// 可以在这里初始化监控指标
// context.getMetricGroup().counter("deserialize_count");
}
}

 

创建source:

KafkaSource.<KafkaMessage>builder().setBootstrapServers("localhost:9092")  // 生产环境替换为实际地址.setTopics("monitoring-data-topic")  // 你的监控数据主题.setGroupId("flink-monitoring-group")// 使用value-only反序列化器(只处理value部分).setValueOnlyDeserializer(new KafkaMessageDeserializer())// 从最新位置开始消费
            .setStartingOffsets(OffsetsInitializer.latest())// 重要:设置偏移量提交方式(与检查点绑定).setProperty("commit.offsets.on.checkpoint", "true")// 动态发现新分区(每分钟).setProperty("partition.discovery.interval.ms", "60000")// 消费者配置.setProperty("fetch.max.wait.ms", "500")  // 拉取最大等待时间.setProperty("fetch.min.bytes", "1")  // 最小拉取字节数.setProperty("max.poll.records", "500")  // 每次拉取最大记录数.build();

 

http://www.jsqmd.com/news/337673/

相关文章:

  • 递归函数 - 练习4
  • 导师推荐8个降AIGC网站,千笔AI帮你轻松降AI率
  • Linux 启动流程和启动过程中的故障定位
  • 逐行拆解Nginx事件管道:1147行代码背后的高性能设计,从ngx_event_pipe源码看如何实现零拷贝代理
  • 从此告别拖延,AI论文平台千笔AI VS 锐智 AI
  • 盐城本地生活增长引擎:三十六行全域代运营破局增长 - 野榜数据排行
  • JAVA:JRE免安装
  • 2026最新最全!网络安全学习路线规划
  • 收藏级RAG详解|从基础到进阶,小白也能看懂的大模型检索增强生成技术(附发展历程+适用场景)
  • [STM32L5] 【STM32L562E-DK测评活动】by clever:06 使用毫米波制作的生物体监测
  • Yandex广告投放效果怎么样?B2B外贸品牌实测报告
  • 2026国内最新全屋定制板材十大源头厂家推荐!山东等地优质全屋定制板材企业权威榜单发布,环保品质双优助力高品质家居 - 品牌推荐2026
  • [STM32L5] 【STM32L562E-DK试用】第2辑:综合芯片外设测试(GPIO、定时器、串口)
  • 收藏必学!RAG技术完全指南:让大模型告别“胡说八道“,企业级AI知识库构建实战
  • 互联网大厂Java求职面试实战:全栈技术与AI应用深度解析
  • Rust 输了?在 AI Agent 的战场上,TypeScript 才是唯一的“神”
  • 1GB大文件不再卡顿?揭秘JavaScript Streaming和背压的工作原理
  • iMeta高引论文 | 张龙超/王金勇/刘娣/李明洲/印遇龙/王立贤-率先完成猪T2T基因组组装
  • 如何理解工业超级智能体及其核心价值?
  • IDC 机房的隐形防线:如何用边缘温湿度节点实现“零盲区”环境监控?
  • 收藏级|小白/程序员大模型学习指南(避坑+实战,零基础也能上手)
  • 北京全域高丽参上门回收|记录者商行:深耕匠心,守护每一份珍品的健康价值 - 品牌排行榜单
  • GPT-5.2与Sora2强强联手:万字深度解析下一代多模态架构与Python落地实战(建议收藏)
  • 福州留学中介top10,好评多!留学选择必看指南 - 留学机构评审官
  • 本地部署 DeepSeek+VS Code,搭建本地大模型
  • 2026年市场技术好的不锈钢中厚板厂家哪个好, 304 不锈钢冷热轧板材/不锈钢冷轧板,不锈钢中厚板源头厂家哪个好 - 品牌推荐师
  • 2026年 吹塑场馆椅厂家推荐排行榜:HDPE吹塑座椅/中空吹塑看台椅/抗老化塑料椅,专业实力与耐用性深度解析 - 品牌企业推荐师(官方)
  • 合肥硕士留学中介如何选?看排名更需反馈及时的服务体验 - 留学机构评审官
  • 沈阳福道金属制品有限责任公司:2026年旗杆厂家推荐,不锈钢旗杆/电动旗杆/锥形旗杆/学校旗杆/手动旗杆全系供应 - 品牌推荐官
  • CodeForces 随机乱做