当前位置: 首页 > news >正文

Kettle9.4(Pentaho Data Integration)调度PostgreSQL18存储过程或函数,在传入指定日期时优先指定日期,未传入指定日期默认T-1昨天

目录

环境说明

一、过程参数/函数参数

1.1 参数说明

1.2 参数示例 IN/OUT

1.3 存储过程与函数对比

二、调度结构

2.1 转换结构如下

2.2 作业结构如下

三、存储过程与函数结构

3.1 建表

[3.1.1] d_run_log:日志表

[3.1.2] d_bsz_proc_batchdate:函数和过程统一的目标表

3.2 建存储过程

[3.2.1] sp_d_run_log:日志存储过程

[3.2.2] sp_d_bsz_proc_batchdate:调度存储过程

3.3 建函数(功能与过程一致)

[3.3.1] sp_d_bsz_func_batchdate:调度函数

四、Kettle(Pentaho Data Integration)配置

4.1 转换

4.1.1 获取变量

4.1.2 JS脚本

4.1.3 输出日志

4.1.4 执行SQL-存储过程

4.1.5 执行SQL-函数

4.1.6 测试并保存转换

4.2 作业

4.2.1 设置变量

4.2.2 设置转换

4.2.3 测试执行


环境说明

Ubuntu24.04

Kettle9.4(pdi-ce-9.4.0.0-343.zip)

PostgreSQL18

这里会分别按存储过程与函数来演示调用过程,可以按需选择;

一、过程参数/函数参数

在真实项目中,为了日常跑批和管理,通常过程和函数入参都是固定的昨天为日期参数,为字符串格式,例'20260101'

月末跑批也是每月1号跑昨日上月末数据,这种周期跑批可以由作业控制,参数中仅为日期字符串即可;

场景一:每天批量跑昨天数据;

场景二:指定日期重跑数据(逻辑口径紧急调整重跑);

场景三:指定日期区间重跑数据(历史逻辑口径有误,重跑历史区间数据);

1.1 参数说明

存储过程参数有两个,日期入参整型出参

存储过程也可以改写为函数;

函数只有日期参数,并对应一个整型返回值;(两者在执行语句上有细微差别,call+存储过程名,select+函数名)

1.2 参数示例 IN/OUT

存储过程参数示例: 入参p_i_date,出参p_o_rtn: CREATE OR REPLACE PROCEDURE sp_d_bsz_proc_batchdate(IN p_i_date character varying, OUT p_o_rtn integer)
函数参数示例:入参p_i_date CREATE OR REPLACE FUNCTION sp_d_bsz_proc_batchdate(IN p_i_date character varying)

1.3 存储过程与函数对比

特性函数(FUNCTION)存储过程(PROCEDURE)
引入版本一直存在PostgreSQL 11
返回值必须返回一个值(单值、记录或集合)可以不返回值(通过 OUT 参数模拟返回)
调用方式SELECT function(...)或作为表达式的一部分CALL procedure(...)
事务控制不允许包含COMMIT/ROLLBACK允许包含事务控制语句
在 SQL 中使用可以在SELECTWHEREINSERT等语句中直接使用不能在 SQL 语句中直接使用
输出参数支持OUT参数(但通常用返回值)支持INOUT/OUT参数
常见用途计算并返回结果,封装复杂查询逻辑执行数据修改、批量操作、需要事务控制的流程

⚠️注意

Kettle 的“调用 DB 存储过程”步骤默认使用函数调用语法(SELECT procedure(...));

PostgreSQL 引入的存储过程(PROCEDURE)必须使用 CALL procedure(...)来执行;

Kettle 的该步骤尚未适配,需要用“执行SQL”步骤替代“调用 DB 存储过程”步骤。

二、调度结构

由作业设置变量传给转换 --> 转换获取变量 --> 转换处理变量传入参数 --> 转换执行函数和存储过程 --> 作业成功;

主要的执行过程在转换中完成;

2.1 转换结构如下

2.2 作业结构如下

三、存储过程与函数结构

⚠️注意:postgresql名称有大小写敏感,请统一;

这里会创建一个统一的日志存储过程用来记录存储和函数的执行日志(sp_d_run_log),同时也便于排查报错问题;(如果存储或函数报错了,用来看执行步骤到了第几步,什么时候执行的)

存储过程和函数功能都一样,改写一下的区别;

3.1 建表

[3.1.1] d_run_log:日志表

-- DROP TABLE public.d_run_log; CREATE TABLE public.d_run_log ( run_time timestamp DEFAULT now() NULL, -- 运行时间,记录日志插入的时间戳,默认当前时间 prc_nm varchar(60) NOT NULL, -- 存储过程名 batch_date date NOT NULL, -- 批量日期(业务日期),用于区分不同批次的日志 step_no int4 NOT NULL, -- 执行步骤编号,1234,,便于排序和定位 step_desc varchar(100) NULL, -- 执行步骤说明,描述该步骤的具体操作或状态 step_type varchar(20) NOT NULL -- 步骤类型,使用 START、END、INFO、ERROR 等约定值,便于过滤分析 ); CREATE INDEX idx_run_log_batch_date ON public.d_run_log USING btree (batch_date); CREATE INDEX idx_run_log_run_time ON public.d_run_log USING btree (run_time); COMMENT ON TABLE public.d_run_log IS '运行日志表,记录业务批处理过程中的步骤信息'; -- Column comments COMMENT ON COLUMN public.d_run_log.run_time IS '运行时间,记录日志插入的时间戳,默认当前时间'; COMMENT ON COLUMN public.d_run_log.prc_nm IS '存储过程名'; COMMENT ON COLUMN public.d_run_log.batch_date IS '批量日期(业务日期),用于区分不同批次的日志'; COMMENT ON COLUMN public.d_run_log.step_no IS '执行步骤编号,1234,,便于排序和定位'; COMMENT ON COLUMN public.d_run_log.step_desc IS '执行步骤说明,描述该步骤的具体操作或状态'; COMMENT ON COLUMN public.d_run_log.step_type IS '步骤类型,使用 START、END、INFO、ERROR 等约定值,便于过滤分析';

[3.1.2] d_bsz_proc_batchdate:函数和过程统一的目标表

-- public.d_bsz_proc_batchdate definition -- Drop table -- DROP TABLE public.d_bsz_proc_batchdate; CREATE TABLE public.d_bsz_proc_batchdate ( prc_nm varchar(100) NOT NULL, -- 执行的存储过程名称 run_time timestamp DEFAULT now() NOT NULL, -- 执行时间(插入时自动记录) batch_date date NOT NULL -- 业务日期 ); COMMENT ON TABLE public.d_bsz_proc_batchdate IS '存储过程执行记录表'; -- Column comments COMMENT ON COLUMN public.d_bsz_proc_batchdate.prc_nm IS '执行的存储过程名称'; COMMENT ON COLUMN public.d_bsz_proc_batchdate.run_time IS '执行时间(插入时自动记录)'; COMMENT ON COLUMN public.d_bsz_proc_batchdate.batch_date IS '业务日期';

3.2 建存储过程

[3.2.1] sp_d_run_log:日志存储过程

被日常存储过程和函数调用,记录执行日志

-- DROP PROCEDURE public.sp_d_run_log(varchar, date, int4, varchar, varchar); CREATE OR REPLACE PROCEDURE public.sp_d_run_log(IN p_prc_nm character varying, IN p_batch_date date, IN p_step_no integer, IN p_step_desc character varying, IN p_step_type character varying) LANGUAGE plpgsql AS $procedure$ BEGIN INSERT INTO public.d_run_log (run_time,prc_nm, batch_date, step_no, step_desc, step_type) VALUES (now(),p_prc_nm, p_batch_date, p_step_no, p_step_desc, p_step_type); -- 可选:提交事务(如果在调用者的事务中已包含则无需此句) -- COMMIT; EXCEPTION WHEN OTHERS THEN -- 记录错误日志到标准输出或应用日志,避免存储过程抛出异常影响主流程 RAISE WARNING 'Failed to insert run log: %', SQLERRM; END; $procedure$ ;

[3.2.2] sp_d_bsz_proc_batchdate:调度存储过程

用来将本过程执行的执行时间和对应业务日期插入目标表,功能与函数一致

-- DROP PROCEDURE public.sp_d_bsz_proc_batchdate(in varchar, out int4); CREATE OR REPLACE PROCEDURE public.sp_d_bsz_proc_batchdate(IN p_i_date character varying, OUT p_o_rtn integer) LANGUAGE plpgsql AS $procedure$ DECLARE /*日志变量区域*/ V_DATE DATE; p_prc_nm VARCHAR(60); p_batch_date date; p_step_no int; p_step_desc VARCHAR(100); p_step_type VARCHAR(20); /**********************************************************************/ BEGIN -- 将字符串转换为日期,格式 YYYYMMDD V_DATE := to_date(P_I_DATE, 'YYYYMMDD'); p_prc_nm := 'sp_d_bsz_proc_batchdate'; p_batch_date := V_DATE; p_step_no := 1; p_step_desc := '插入目标表'; p_step_type := 'START'; call sp_d_run_log(p_prc_nm, p_batch_date, p_step_no, p_step_desc, p_step_type); -- 插入记录 INSERT INTO public.d_bsz_proc_batchdate (prc_nm, batch_date, run_time) VALUES ('sp_d_bsz_proc_batchdate', V_DATE, now()); -- 成功返回 0 P_O_RTN := 0; p_prc_nm := 'sp_d_bsz_proc_batchdate'; p_batch_date := V_DATE; p_step_no := 99; p_step_desc := '运行成功'; p_step_type := 'END'; call sp_d_run_log(p_prc_nm, p_batch_date, p_step_no, p_step_desc, p_step_type); EXCEPTION WHEN OTHERS THEN -- 发生任何异常返回 1 P_O_RTN := 1; -- 可选的错误记录(输出到控制台或日志) RAISE WARNING 'Failed to insert log for procedure %: %', p_prc_nm, SQLERRM; END; $procedure$ ;

3.3 建函数(功能与过程一致)

[3.3.1] sp_d_bsz_func_batchdate:调度函数

-- DROP FUNCTION public.sp_d_bsz_func_batchdate(varchar); CREATE OR REPLACE FUNCTION public.sp_d_bsz_func_batchdate(p_i_date character varying) RETURNS integer LANGUAGE plpgsql AS $function$ DECLARE /*日志变量区域*/ V_DATE DATE; p_prc_nm VARCHAR(60); p_batch_date date; p_step_no int; p_step_desc VARCHAR(100); p_step_type VARCHAR(20); /**********************************************************************/ BEGIN -- 将字符串转换为日期,格式 YYYYMMDD V_DATE := to_date(P_I_DATE, 'YYYYMMDD'); p_prc_nm := 'sp_d_bsz_func_batchdate'; p_batch_date := V_DATE; p_step_no := 1; p_step_desc := '插入目标表'; p_step_type := 'START'; CALL sp_d_run_log(p_prc_nm, p_batch_date, p_step_no, p_step_desc, p_step_type); -- 插入记录 INSERT INTO public.d_bsz_proc_batchdate (prc_nm, batch_date, run_time) VALUES ('sp_d_bsz_func_batchdate', V_DATE, now()); -- 成功返回 0 -- P_O_RTN := 0; -- 移除OUT参数 p_prc_nm := 'sp_d_bsz_func_batchdate'; p_batch_date := V_DATE; p_step_no := 99; p_step_desc := '运行成功'; p_step_type := 'END'; CALL sp_d_run_log(p_prc_nm, p_batch_date, p_step_no, p_step_desc, p_step_type); RETURN 0; -- 显式返回成功值 EXCEPTION WHEN OTHERS THEN -- 发生任何异常返回 1 -- P_O_RTN := 1; RAISE WARNING 'Failed to insert log for func %: %', p_prc_nm, SQLERRM; RETURN 1; -- 显式返回失败值 END; $function$ ;

四、Kettle(Pentaho Data Integration)配置

4.1 转换

4.1.1 获取变量

由作业传入run_date参数(变量res是针对存储过程,用来获取存储过程的返回值,函数不需要)

在界面空白处右键-转换设置

命名参数-新增 run_date;为了能在测试执行转换的时候指定日期;

4.1.2 JS脚本

用来处理传入的run_date变量,并输出p_i_date作为变量和参数

//Script here //Script here // 获取传入的日期字符串参数(可能为空) var dateStr = run_date; // 来自“获取变量”步骤的字段 var res; var targetDate; if (dateStr == null || dateStr.trim() == '') { // 无参数,默认使用昨天日期 targetDate = new Date(); targetDate.setDate(targetDate.getDate() - 1); } else { // 解析传入的 yyyymmdd 格式字符串 var year = parseInt(dateStr.substring(0, 4), 10); var month = parseInt(dateStr.substring(4, 6), 10) - 1; // 月份从0开始 var day = parseInt(dateStr.substring(6, 8), 10); targetDate = new Date(year, month, day); } // 格式化为 yyyymmdd(不使用 padStart) var year = targetDate.getFullYear(); var month = targetDate.getMonth() + 1; var day = targetDate.getDate(); // 补零 var monthStr = month < 10 ? '0' + month : '' + month; var dayStr = day < 10 ? '0' + day : '' + day; var yesterdayStr = year + monthStr + dayStr; setVariable("p_i_date", yesterdayStr, "r");

4.1.3 输出日志

4.1.4 执行SQL-存储过程

执行call语句:

call sp_d_bsz_proc_batchdate(?,?);

?表示参数,需要在参数列表Bind绑定;参数会按编号顺序绑定,顺序不能乱

下面的Bind parameters?一定要勾上,要不然加不了参数;

参数p_i_date作为日期字符串的输入,res作为存储过程的输出;

4.1.5 执行SQL-函数

执行select语句:

select sp_d_bsz_func_batchdate(?) AS result;

函数只有一个参数,p_i_date作为日期字符串的输入;

4.1.6 测试并保存转换

点击执行,可以自定义日志级别,可以选调试级别日志更加详细;命名参数中可以在值位置指定业务日期,也可以留空,默认昨天日期;

点击启动,日志中会打印各项参数信息,确认是否符合自己的预期;我这边run_date留空,默认为昨天'20260219',无误;

转换完成查看目标表,函数与存储过程都已入表,转换完成;

4.2 作业

4.2.1 设置变量

拖入设置变量 run_date,值 ${run_date}

空白处右键-作业设置

新增命名参数 run_date,在测试执行作业时可以自定义指定日期;

4.2.2 设置转换

拖入转换,选择保存的转换文件;

4.2.3 测试执行

选择起始位置 run_date,可以按需选择是否要指定日期,点击执行;

我这里指定日期 '20251231' 进行测试;

指定日期、传入参数无误;

查询目标表,函数与存储过程插入正常;

最后在收尾拖入Start与成功步骤,用来配置调度定时和结束标记;

Start步骤:

作业调度配置完成。

http://www.jsqmd.com/news/511247/

相关文章:

  • PHP 8 新特性、Laravel/Hyperf 源码理解、MySQL 索引优化、Redis 场景应用的庖丁解牛
  • 【限时解密】Dify 0.12+版本Multi-Agent热协同协议:支持200+并发Agent动态协商,延迟<87ms——附性能调优checklist》
  • Vue—条件渲染与循环渲染
  • 代码随想录一刷记录Day1—— leetcode704. 二分查找 leetcode27. 移除元素 leetcode977.有序数组的平方
  • EasyCVR视频届的万能接口
  • Fun-ASR-MLT-Nano实战:搭建支持31种语言的语音识别服务
  • java微信小程序的外卖点餐点单系统 商家协同过滤
  • VOOHU 沃虎电子 SFP28 高速连接器 WHSFP32221F013 集成导光柱与散热孔 满足25G数据中心高密度应用
  • 提升自控力差孩子的学习生活:有效的学习障碍帮助与冲动控制训练方法
  • 2026年3月,评测精选皮带导轨厂家,导轨品牌分析深度剖析助力明智之选 - 品牌推荐师
  • 嵌入式C代码安全防线如何崩塌?静态分析7大盲区正在 silently 毁掉你的量产固件
  • 网络安全之linux2
  • LightOnOCR-2-1B多语种OCR落地:国际NGO多语言援助文件OCR+机器翻译流水线
  • 互联网是从0到1,AI是1到无穷大
  • Python基础学习(3)——容器数据类型
  • MGeo门址模型部署教程:阿里云ACK集群中MGeo服务CI/CD自动化发布流程
  • 长沙有没有能解决频繁染发问题且提供贴心售后的男士补发实体店 - myqiye
  • Dify多智能体工作流实战手册:从零搭建高可用协同架构,7天上线金融级审批Agent集群
  • 5分钟快速上手:终极免费生态系统模拟器Ecosim完整指南
  • 小白也能懂:LingBot-Depth模型卡解读,快速上手单目深度估计
  • 讲讲山西靠谱的防腐实验室工作台品牌有哪些 - 工业推荐榜
  • 配电网有功电压控制:多智能体强化学习的奇妙之旅
  • Luos串行网络协议:嵌入式微服务的确定性串行总线实现
  • Anaconda环境管理:为BERT文本分割模型创建独立的Python开发环境
  • 【Dify企业级私有化部署权威指南】:2026年GPU资源优化率提升37%的5大架构跃迁实践
  • 使用LaTeX撰写MogFace-large模型技术报告与论文
  • L298N电机驱动模块原理与HC32F4A0嵌入式移植实践
  • 诡异代码
  • gazebo 中通过sac 训练机械臂进行轨迹规划
  • Pixel Dimension Fissioner多场景落地:医疗问诊记录→患者教育材料生成