Java调用执行Kettle资源库信息示例
2024/6/3...大约 2 分钟
Java调用执行Kettle资源库信息示例
package com.boy.common.utils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.ProgressNullMonitorListener;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.parameters.UnknownParamException;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import java.util.Map;
import java.util.Set;
public class KettleUtils {
private static final Logger logger = LoggerFactory.getLogger(KettleUtils.class);
public static KettleDatabaseRepository repositoryConnection() {
/** 初始化 */
try {
KettleEnvironment.init();
} catch (KettleException ex) {
logger.error(ex.getMessage());
ex.printStackTrace();
}
/**
* Construct a new database connections. Note that not all these parameters are not always mandatory.
*
* @param name
* The database name
* @param type
* The type of database
* @param access
* The type of database access
* @param host
* The hostname or IP address
* @param db
* The database name
* @param port
* The port on which the database listens.
* @param user
* The username
* @param pass
* The password
*/
DatabaseMeta databaseMeta = new DatabaseMeta("kettle-repo", "MySQL", "Native(JDBC)", "127.0.0.1", "kettle-repo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai", "3306", "root", "root");
/** 数据库形式的资源库元对象 */
KettleDatabaseRepositoryMeta repositoryMeta = new KettleDatabaseRepositoryMeta();
repositoryMeta.setConnection(databaseMeta);
/** 数据库形式的资源库对象 */
KettleDatabaseRepository repository = new KettleDatabaseRepository();
/** 用资源库元对象初始化资源库对象 */
repository.init(repositoryMeta);
/** 连接到资源库:默认的连接资源库的用户名及密码 */
try {
repository.connect("admin", "admin");
} catch (KettleException e) {
e.printStackTrace();
}
if (repository.isConnected()) {
System.out.println("@>>>>>>>>> Connection Success!");
return repository;
} else {
System.out.println("@>>>>>>>>> Connection Failed!");
return null;
}
}
/**
* 执行转换
*
* @param repository kettle仓库对象
* @param directory 转换存储路径
* @param transName 转换名称
* @param params 转换参数
*/
public static void runTrans(KettleDatabaseRepository repository, String directory, String transName, Map<String, String> params) {
try {
/** 根据指定的字符串路径 找到目录 */
RepositoryDirectoryInterface repDirectory = repository.findDirectory(directory);
TransMeta transMeta = repository.loadTransformation(repository.getTransformationID(transName, repDirectory), null);
// 设置参数
if (!ObjectUtils.isEmpty(params)) {
Set<Map.Entry<String, String>> entries = params.entrySet();
entries.forEach(item -> {
String key = item.getKey();
String value = item.getValue();
try {
transMeta.setParameterValue(key, value);
} catch (UnknownParamException e) {
e.printStackTrace();
}
});
}
// transMeta.setParameterValue("", "");
Trans trans = new Trans(transMeta);
/** 执行转换 */
trans.execute(null);
trans.waitUntilFinished();
if (trans.getErrors() > 0) {
System.out.println("有异常");
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void runJobs(KettleDatabaseRepository repository, String directory, String jobName, Map<String, String> params) {
try {
/** 根据指定的字符串路径 找到目录 */
RepositoryDirectoryInterface repDirectory = repository.findDirectory(directory);
JobMeta jobMeta = repository.loadJob(jobName, repDirectory, new ProgressNullMonitorListener(), null);
Job job = new Job(repository, jobMeta);
// 向Job脚本传递参数, 脚本中获取参数值:${参数名}
if (!ObjectUtils.isEmpty(params)) {
Set<Map.Entry<String, String>> entries = params.entrySet();
entries.forEach(item -> {
String key = item.getKey();
String value = item.getValue();
job.setVariable(key, value);
});
}
job.start();
job.waitUntilFinished();
if (job.getErrors() > 0) {
throw new Exception("运行Kettle Job任务时发生错误!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
KettleDatabaseRepository repository = repositoryConnection();
/**
* @param repository kettle仓库对象
* @param directory 转换存储路径
* @param transName || jobName 转换/任务名称
* @param params 转换参数
*/
// runTrans(repository, "/Jobs", "DB2Excel2", null);
runJobs(repository, "/Jobs", "DB2Excel", null);
}
}