ETL工具 – JAVA 调用 Kettle 转换、作业脚本

一、JAVA 调用 Kettle 转换

在写 Java 程序前,先使用 Spoon 设计一下转换的过程,这里以拉取 CSDN 文章列表存入 txt 文本为例:

拉取的接口为 https://blog.csdn.net/community/home-api/v1/get-business-list?page=1&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=qq_43692950

返回格式如下:

{
    "code": 200,
    "message": "success",
    "traceId": "b1e8ccb0-2e39-4834-bacd-b52a260bb521",
    "data": {
        "list": [
            {
                "articleId": 130450076,
                "title": "ETL工具 - Kettle 查询、连接、统计、脚本算子介绍",
                "description": "连接算子一般将多个数据集通过关键字进行连接,类似 `SQL` 中的连接操作,统计算子可以提供数据的采样和统计功能,脚本算子可以通过程序代码完成一些复杂的操作",
                "url": "https://xiaobichao.blog.csdn.net/article/details/130450076",
                "type": 1,
                "top": false,
                "forcePlan": false,
                "viewCount": 313,
                "commentCount": 0,
                "editUrl": "https://editor.csdn.net/md?articleId=130450076",
                "postTime": "2023-04-30 23:12:13",
                "diggCount": 1,
                "formatTime": "前天 23:12",
                "picList": [
                    "https://img-blog.csdnimg.cn/2e817e14046f4cba9663c89978198f12.png"
                ]
            }
        ],
        "total": 287
    }
}

1.1 转换设计过程

这里 url 通过变量的形式传递进来,整体的转换设计如下:

在这里插入图片描述

获取变量:

在这里插入图片描述

REST client:

在这里插入图片描述

JSON input:

在这里插入图片描述

在这里插入图片描述

字段选择:

在这里插入图片描述

文本文件输出:

在这里插入图片描述

设计完后,保存 ktr 脚本:

在这里插入图片描述

1.2 Java 调用转换脚本

新建一个 Mavne 项目,在 pom 中引入下面依赖:

    
        pentaho-kettle
        kettle-core
        9.6.0.0-SNAPSHOT
    

    
        pentaho-kettle
        kettle-engine
        9.6.0.0-SNAPSHOT
    

    
        org.pentaho.di.plugins
        pdi-core-plugins-impl
        9.6.0.0-SNAPSHOT
    

    
        pentaho
        pentaho-capability-manager
        9.6.0.0-SNAPSHOT
        compile
    

    
        commons-cli
        commons-cli
        1.3.1
    

    
        com.sun.jersey.contribs
        jersey-apache-client4
        1.9.1
    

    
        com.sun.jersey
        jersey-core
        1.19.1
    

    
        com.sun.jersey
        jersey-client
        1.19.1
    

    
        com.sun.jersey
        jersey-bundle
        1.19.1
    


    
        pentaho-public
        Pentaho Public
        https://repo.orl.eng.hitachivantara.com/artifactory/pnt-mvn/
        
            true
            daily
        
        
            true
            interval:15
        
    

在 resources 下新建 kettle-password-encoder-plugins.xml 文件,内容如下:

         
        Kettle Password Encoder    
        org.pentaho.di.core.encryption.KettleTwoWayPasswordEncoder
    

Java 调用逻辑:

public class RunTrans {

    public static void main(String[] args) {
        try {
            // 指定插件位置,注意改为你的安装目录
            StepPluginType.getInstance().getPluginFolders().
                    add(new PluginFolder("D:/data-integration_9_3/plugins/", false, true));
            // 初始化 kettle 环境
            KettleEnvironment.init();
        } catch (KettleException e) {
            e.printStackTrace();
        }
        String ktrPath = "D:/data/job/trans.ktr";
        String url = "https://blog.csdn.net/community/home-api/v1/get-business-list?page=1&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=qq_43692950";
        // 添加变量
        Map variableMap = new HashMap();
        variableMap.put("url", url);
        Boolean res = runTrans(ktrPath, variableMap, null);
        System.out.println("转换执行结果:" + res);
    }

    private static Boolean runTrans(String ktrPath, Map variableMap, Map parameterMap) {
        try {
            // 加载 ktr 文件
            TransMeta transMeta = new TransMeta(ktrPath, (Repository) null);
            Trans trans = new Trans(transMeta);
            trans.setLogLevel(LogLevel.MINIMAL);
            // 变量
            if (Objects.nonNull(variableMap) && !variableMap.isEmpty()) {
                variableMap.forEach(trans::setVariable);
            }
            // 参数
            if (Objects.nonNull(parameterMap) && !parameterMap.isEmpty()) {
                parameterMap.forEach((k, v) -> {
                    try {
                        trans.setParameterValue(k, v);
                    } catch (UnknownParamException e) {
                        e.printStackTrace();
                    }
                });
            }
            // 监听执行日志
            KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() {
                @Override
                public void eventAdded(KettleLoggingEvent logs) {
                    System.out.println("Kettle 日志:level = " + logs.getLevel() + " , time = " + logs.getTimeStamp() + " , message = " + logs.getMessage());
                }
            });
            // 执行转换
            trans.execute(new String[0]);
            // 等待执行完成
            trans.waitUntilFinished();
            // 是否执行成功
            return trans.getErrors() == 0;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

}

在这里插入图片描述

下面到输出目录查看结果:

在这里插入图片描述

在这里插入图片描述

二、JAVA 调用 Kettle 任务

任务就调用上面的转换,进行测试:

在这里插入图片描述

保存 kjb 文件:

在这里插入图片描述

Java 调用逻辑:

public class RunJob {

    public static void main(String[] args) {
        try {
            // 指定插件位置
            StepPluginType.getInstance().getPluginFolders().
                    add(new PluginFolder("D:/data-integration_9_3/plugins/", false, true));
            // 初始化 kettle 环境
            KettleEnvironment.init();
        } catch (KettleException e) {
            e.printStackTrace();
        }
        String kjbPath = "D:/data/job/job.kjb";
        String url = "https://blog.csdn.net/community/home-api/v1/get-business-list?page=2&size=20&businessType=blog&orderby=&noMore=false&year=&month=&username=qq_43692950";
        // 添加变量
        Map variableMap = new HashMap();
        variableMap.put("url", url);
        Boolean res = runJob(kjbPath, variableMap, null);
        System.out.println("转换执行结果:" + res);
    }

    private static Boolean runJob(String kjbPath, Map variableMap, Map parameterMap) {
        try {
            JobMeta jobMeta = new JobMeta(kjbPath, null);
            Job job = new Job(null, jobMeta);
            job.setLogLevel(LogLevel.MINIMAL);
            // 变量
            if (Objects.nonNull(variableMap) && !variableMap.isEmpty()) {
                variableMap.forEach(job::setVariable);
            }
            // 参数
            if (Objects.nonNull(parameterMap) && !parameterMap.isEmpty()) {
                parameterMap.forEach((k, v) -> {
                    try {
                        job.setParameterValue(k, v);
                    } catch (UnknownParamException e) {
                        e.printStackTrace();
                    }
                });
            }
            // 监听执行日志
            KettleLogStore.getAppender().addLoggingEventListener(new KettleLoggingEventListener() {
                @Override
                public void eventAdded(KettleLoggingEvent logs) {
                    System.out.println("Kettle 日志:level = " + logs.getLevel() + " , time = " + logs.getTimeStamp() + " , message = " + logs.getMessage());
                }
            });
            // 执行作业
            job.start();
            // 等待执行完成
            job.waitUntilFinished();
            // 是否执行成功
            return job.getErrors() == 0;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

}

在这里插入图片描述

下面到输出目录查看结果:

在这里插入图片描述

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://net2asp.com/57b9eb57c1.html