ETL Tools - Calling Kettle Transformation and Job Scripts from Java
1. Calling a Kettle Transformation from Java
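Before writing any Java code, first design the transformation in Spoon. As an example, we fetch a list of CSDN articles and save it to a txt file:
The API being called is https://blog.csdn.net/community/home-api/v1/get-business-list?page=1&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=qq_43692950
It returns data in the following format: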
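Only the `page` query parameter differs between the transformation run (page=1) and the job run (page=2) later in this article. As a minimal sketch, a small stdlib helper can build the url value instead of hard-coding the whole string. The class and method names (`CsdnListUrl`, `forPage`) are hypothetical; the fixed query parameters are copied from the URL above, and the `Charset` overload of `URLEncoder.encode` needs Java 10 or later.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical helper: builds the article-list URL used as the Kettle "url" variable.
final class CsdnListUrl {
    // Endpoint and fixed query parameters copied from the example URL in the article.
    private static final String TEMPLATE =
            "https://blog.csdn.net/community/home-api/v1/get-business-list"
            + "?page=%d&size=%d&businessType=blog&orderby=&noMore=false&year=&month=&username=%s";

    private CsdnListUrl() {
    }

    // Returns the URL for one page; the username is URL-encoded as a precaution.
    static String forPage(String username, int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        String user = URLEncoder.encode(username, StandardCharsets.UTF_8);
        // Locale.ROOT keeps the page/size digits ASCII regardless of the default locale.
        return String.format(Locale.ROOT, TEMPLATE, page, size, user);
    }
}
```

For example, `CsdnListUrl.forPage("qq_43692950", 2, 20)` would produce the URL used by the job example.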
{
  "code": 200,
  "message": "success",
  "traceId": "b1e8ccb0-2e39-4834-bacd-b52a260bb521",
  "data": {
    "list": [
      {
        "articleId": 130450076,
        "title": "ETL工具 - Kettle 查询、连接、统计、脚本算子介绍",
        "description": "连接算子一般将多个数据集通过关键字进行连接,类似 `SQL` 中的连接操作,统计算子可以提供数据的采样和统计功能,脚本算子可以通过程序代码完成一些复杂的操作",
        "url": "https://xiaobichao.blog.csdn.net/article/details/130450076",
        "type": 1,
        "top": false,
        "forcePlan": false,
        "viewCount": 313,
        "commentCount": 0,
        "editUrl": "https://editor.csdn.net/md?articleId=130450076",
        "postTime": "2023-04-30 23:12:13",
        "diggCount": 1,
        "formatTime": "前天 23:12",
        "picList": [
          "https://img-blog.csdnimg.cn/2e817e14046f4cba9663c89978198f12.png"
        ]
      }
    ],
    "total": 287
  }
}
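The response carries `total` (287 in this sample) next to a page of up to `size` items. Assuming `total` counts all of the user's articles, pulling every page (for example, by running the job once per page with a different url) needs a ceiling division to get the page count. A minimal sketch; `PageMath` is a hypothetical name.

```java
// Hypothetical helper: number of pages needed to cover `total` items at `size` per page.
final class PageMath {
    private PageMath() {
    }

    static int pageCount(int total, int size) {
        if (total < 0 || size <= 0) {
            throw new IllegalArgumentException("total must be >= 0 and size > 0");
        }
        // Ceiling division without floating point and without overflow near Integer.MAX_VALUE.
        return total / size + (total % size == 0 ? 0 : 1);
    }
}
```

With the sample response, `PageMath.pageCount(287, 20)` returns 15, so pages 1 through 15 would cover every article.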
1.1 Designing the Transformation
Here the url is passed in as a variable. The overall transformation design is as follows:

Get Variables:

REST client:

JSON input:


Select Values:

Text File Output:
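The Get Variables step at the start of this design resolves a variable reference (presumably `${url}`) into a field at run time, and the Java code further down supplies the value through `setVariable("url", ...)`. The sketch below is a simplified, stdlib-only illustration of what `${name}` substitution does; it is not Kettle's implementation, the class name `VariableSubstitution` is hypothetical, and leaving unknown names untouched is a choice made in this sketch.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified illustration of ${name} substitution (not Kettle's own code).
final class VariableSubstitution {
    // Matches ${name}; group 1 captures the variable name.
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

    private VariableSubstitution() {
    }

    static String substitute(String text, Map<String, String> variables) {
        Matcher m = VAR.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = variables.get(m.group(1));
            // Unknown variables are kept verbatim in this sketch.
            String replacement = value != null ? value : m.group();
            // quoteReplacement stops '$' or '\' in the value from being read as group references.
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```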

Once the design is complete, save it as a .ktr file:
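A .ktr file is plain XML. As a sketch, assuming each step is saved as a `<step>` element with `<name>` and `<type>` children directly under the root `<transformation>` element, the JDK's DOM parser can list the steps, which is a quick way to confirm that the file the Java code will load contains the design above. `KtrInspector` is a hypothetical name; it only reads the file and does not replace `TransMeta`.

```java
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;

// Hypothetical helper: lists the steps declared in a saved .ktr file.
final class KtrInspector {
    private KtrInspector() {
    }

    // Returns "name (type)" for every <step> element directly under the root element.
    static List<String> listSteps(InputStream ktr) {
        try {
            Element root = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder().parse(ktr).getDocumentElement();
            List<String> steps = new ArrayList<>();
            for (Node n = root.getFirstChild(); n != null; n = n.getNextSibling()) {
                if (n instanceof Element && "step".equals(n.getNodeName())) {
                    Element step = (Element) n;
                    steps.add(childText(step, "name") + " (" + childText(step, "type") + ")");
                }
            }
            return steps;
        } catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse .ktr content", e);
        }
    }

    // Text of the first direct child with the given tag, so nested <field><name> is ignored.
    private static String childText(Element parent, String tag) {
        for (Node n = parent.getFirstChild(); n != null; n = n.getNextSibling()) {
            if (n instanceof Element && tag.equals(n.getNodeName())) {
                return n.getTextContent().trim();
            }
        }
        return "";
    }
}
```

Usage would be something like opening `D:/data/job/trans.ktr` with `Files.newInputStream` and printing each entry of `KtrInspector.listSteps(...)`.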

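1.2 Calling the Transformation Script from Java
Create a new Maven project and add the following dependencies and the Pentaho repository to the pom: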
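<dependencies>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-core</artifactId>
        <version>9.6.0.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-engine</artifactId>
        <version>9.6.0.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.pentaho.di.plugins</groupId>
        <artifactId>pdi-core-plugins-impl</artifactId>
        <version>9.6.0.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>pentaho</groupId>
        <artifactId>pentaho-capability-manager</artifactId>
        <version>9.6.0.0-SNAPSHOT</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>commons-cli</groupId>
        <artifactId>commons-cli</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>com.sun.jersey.contribs</groupId>
        <artifactId>jersey-apache-client4</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
        <groupId>com.sun.jersey</groupId>
        <artifactId>jersey-core</artifactId>
        <version>1.19.1</version>
    </dependency>
    <dependency>
        <groupId>com.sun.jersey</groupId>
        <artifactId>jersey-client</artifactId>
        <version>1.19.1</version>
    </dependency>
    <dependency>
        <groupId>com.sun.jersey</groupId>
        <artifactId>jersey-bundle</artifactId>
        <version>1.19.1</version>
    </dependency>
</dependencies>

<repositories>
    <repository>
        <id>pentaho-public</id>
        <name>Pentaho Public</name>
        <url>https://repo.orl.eng.hitachivantara.com/artifactory/pnt-mvn/</url>
        <releases>
            <enabled>true</enabled>
            <updatePolicy>daily</updatePolicy>
        </releases>
        <snapshots>
            <enabled>true</enabled>
            <updatePolicy>interval:15</updatePolicy>
        </snapshots>
    </repository>
</repositories>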
Under resources, create a file named kettle-password-encoder-plugins.xml with the following content:
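<password-encoder-plugins>
    <password-encoder-plugin id="Kettle">
        <description>Kettle Password Encoder</description>
        <classname>org.pentaho.di.core.encryption.KettleTwoWayPasswordEncoder</classname>
    </password-encoder-plugin>
</password-encoder-plugins>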
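In a standard Maven build, files under src/main/resources end up at the root of the classpath, which is presumably where Kettle looks for kettle-password-encoder-plugins.xml during `KettleEnvironment.init()`. If initialization fails, a quick check like the hypothetical helper below can rule out a misplaced file; `ClasspathCheck` is not part of Kettle.

```java
import java.net.URL;

// Hypothetical helper: reports whether a resource is visible on the classpath.
final class ClasspathCheck {
    private ClasspathCheck() {
    }

    static boolean isOnClasspath(String resourceName) {
        // Prefer the context class loader, falling back to this class's loader.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClasspathCheck.class.getClassLoader();
        }
        URL url = loader.getResource(resourceName);
        return url != null;
    }
}
```

For example, calling `ClasspathCheck.isOnClasspath("kettle-password-encoder-plugins.xml")` right before `KettleEnvironment.init()` should return true when the file is in place.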
The Java calling code:
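import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.KettleLoggingEvent;
import org.pentaho.di.core.logging.KettleLoggingEventListener;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.parameters.UnknownParamException;
import org.pentaho.di.core.plugins.PluginFolder;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

public class RunTrans {

    public static void main(String[] args) {
        try {
            // Register the plugin folder; change this to your own PDI installation directory
            StepPluginType.getInstance().getPluginFolders()
                    .add(new PluginFolder("D:/data-integration_9_3/plugins/", false, true));
            // Initialize the Kettle environment
            KettleEnvironment.init();
        } catch (KettleException e) {
            e.printStackTrace();
            // Nothing can run without an initialized environment
            return;
        }
        String ktrPath = "D:/data/job/trans.ktr";
        String url = "https://blog.csdn.net/community/home-api/v1/get-business-list?page=1&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=qq_43692950";
        // Variables passed into the transformation
        Map<String, String> variableMap = new HashMap<>();
        variableMap.put("url", url);
        boolean res = runTrans(ktrPath, variableMap, null);
        System.out.println("Transformation result: " + res);
    }

    private static boolean runTrans(String ktrPath, Map<String, String> variableMap, Map<String, String> parameterMap) {
        try {
            // Load the .ktr file (no repository)
            TransMeta transMeta = new TransMeta(ktrPath, (Repository) null);
            Trans trans = new Trans(transMeta);
            trans.setLogLevel(LogLevel.MINIMAL);
            // Variables
            if (Objects.nonNull(variableMap) && !variableMap.isEmpty()) {
                variableMap.forEach(trans::setVariable);
            }
            // Named parameters; UnknownParamException means the name is not declared in the transformation
            if (Objects.nonNull(parameterMap) && !parameterMap.isEmpty()) {
                parameterMap.forEach((k, v) -> {
                    try {
                        trans.setParameterValue(k, v);
                    } catch (UnknownParamException e) {
                        e.printStackTrace();
                    }
                });
            }
            // Print Kettle log events to the console
            KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() {
                @Override
                public void eventAdded(KettleLoggingEvent logs) {
                    System.out.println("Kettle log: level = " + logs.getLevel() + ", time = " + logs.getTimeStamp() + ", message = " + logs.getMessage());
                }
            });
            // Run the transformation with no command-line arguments
            trans.execute(new String[0]);
            // Block until all steps have finished
            trans.waitUntilFinished();
            // Zero errors counts as success
            return trans.getErrors() == 0;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}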
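The `main` method above hard-codes the url. As a hypothetical alternative, variables could be passed on the command line as `key=value` pairs and collected into the `variableMap` handed to `runTrans`. The sketch splits on the first `=` only, because the URL value itself contains `=` characters. `VariableArgs` is a hypothetical name, not part of Kettle.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns "key=value" command-line arguments into a variable map.
final class VariableArgs {
    private VariableArgs() {
    }

    static Map<String, String> parse(String[] args) {
        Map<String, String> variables = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + arg);
            }
            // Split on the first '=' only: URL values contain '=' themselves.
            variables.put(arg.substring(0, eq), arg.substring(eq + 1));
        }
        return variables;
    }
}
```

In `main`, `Map<String, String> variableMap = VariableArgs.parse(args);` would replace the hard-coded map; quote the `url=...` argument when launching, since `&` is special in most shells.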

Now check the result in the output directory:
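Instead of opening the file by hand, a short stdlib sketch can count the lines written by the Text File Output step and print the first few. The output path depends on what was configured in that step, so the default path in `main` is a placeholder assumption; the sketch also assumes the file is UTF-8 (change the charset to match the step's encoding), and whether the first line is a header depends on that step's settings. `OutputPreview` is a hypothetical name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: summarizes the text file produced by the transformation.
final class OutputPreview {
    // Total line count plus the first few lines.
    static final class Summary {
        final int lineCount;
        final List<String> preview;

        Summary(int lineCount, List<String> preview) {
            this.lineCount = lineCount;
            this.preview = preview;
        }
    }

    private OutputPreview() {
    }

    static Summary summarize(BufferedReader reader, int previewLines) {
        List<String> preview = new ArrayList<>();
        int count = 0;
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (count < previewLines) {
                    preview.add(line);
                }
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new Summary(count, preview);
    }

    public static void main(String[] args) throws IOException {
        // Placeholder path: use the file name configured in the Text File Output step.
        Path file = Paths.get(args.length > 0 ? args[0] : "D:/data/job/output.txt");
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            Summary s = summarize(reader, 5);
            System.out.println("Lines: " + s.lineCount);
            s.preview.forEach(System.out::println);
        }
    }
}
```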


2. Calling a Kettle Job from Java
For this test, the job simply runs the transformation above:

Save the .kjb file:

The Java calling code:
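import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.KettleLoggingEvent;
import org.pentaho.di.core.logging.KettleLoggingEventListener;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.parameters.UnknownParamException;
import org.pentaho.di.core.plugins.PluginFolder;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.Repository;

public class RunJob {

    public static void main(String[] args) {
        try {
            // Register the plugin folder; change this to your own PDI installation directory
            StepPluginType.getInstance().getPluginFolders()
                    .add(new PluginFolder("D:/data-integration_9_3/plugins/", false, true));
            // Initialize the Kettle environment
            KettleEnvironment.init();
        } catch (KettleException e) {
            e.printStackTrace();
            // Nothing can run without an initialized environment
            return;
        }
        String kjbPath = "D:/data/job/job.kjb";
        String url = "https://blog.csdn.net/community/home-api/v1/get-business-list?page=2&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=qq_43692950";
        // Variables passed into the job
        Map<String, String> variableMap = new HashMap<>();
        variableMap.put("url", url);
        boolean res = runJob(kjbPath, variableMap, null);
        System.out.println("Job result: " + res);
    }

    private static boolean runJob(String kjbPath, Map<String, String> variableMap, Map<String, String> parameterMap) {
        try {
            // Load the .kjb file (no repository)
            JobMeta jobMeta = new JobMeta(kjbPath, (Repository) null);
            Job job = new Job(null, jobMeta);
            job.setLogLevel(LogLevel.MINIMAL);
            // Variables
            if (Objects.nonNull(variableMap) && !variableMap.isEmpty()) {
                variableMap.forEach(job::setVariable);
            }
            // Named parameters; UnknownParamException means the name is not declared in the job
            if (Objects.nonNull(parameterMap) && !parameterMap.isEmpty()) {
                parameterMap.forEach((k, v) -> {
                    try {
                        job.setParameterValue(k, v);
                    } catch (UnknownParamException e) {
                        e.printStackTrace();
                    }
                });
            }
            // Print Kettle log events to the console
            KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() {
                @Override
                public void eventAdded(KettleLoggingEvent logs) {
                    System.out.println("Kettle log: level = " + logs.getLevel() + ", time = " + logs.getTimeStamp() + ", message = " + logs.getMessage());
                }
            });
            // Start the job (it runs in its own thread)
            job.start();
            // Block until the job has finished
            job.waitUntilFinished();
            // Zero errors counts as success
            return job.getErrors() == 0;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}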

Now check the result in the output directory:

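This article was collected from the internet and does not represent the views of 协通编程. If you repost it, please credit the source: https://net2asp.com/57b9eb57c1.html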
