blob: 3fbabe5166e0b79d5deb20e091d56e55fb7acf1d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.datax;
import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.spi.task.TaskConstants.RWXR_XR_X;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DatasourceUtil;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.util.MapUtils;
import org.apache.dolphinscheduler.plugin.task.util.OSUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.dolphinscheduler.spi.task.AbstractParameters;
import org.apache.dolphinscheduler.spi.task.Property;
import org.apache.dolphinscheduler.spi.task.paramparser.ParamUtils;
import org.apache.dolphinscheduler.spi.task.paramparser.ParameterUtils;
import org.apache.dolphinscheduler.spi.task.request.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class DataxTask extends AbstractTaskExecutor {
/**
* jvm parameters
*/
public static final String JVM_PARAM = " --jvm=\"-Xms%sG -Xmx%sG\" ";
/**
* python process(datax only supports version 2.7 by default)
*/
private static final String DATAX_PYTHON = "python2.7";
private static final Pattern PYTHON_PATH_PATTERN = Pattern.compile("/bin/python[\\d.]*$");
/**
* datax path
*/
private static final String DATAX_PATH = "${DATAX_HOME}/bin/datax.py";
/**
* datax channel count
*/
private static final int DATAX_CHANNEL_COUNT = 1;
/**
* datax parameters
*/
private DataxParameters dataXParameters;
/**
* shell command executor
*/
private ShellCommandExecutor shellCommandExecutor;
/**
* taskExecutionContext
*/
private TaskRequest taskExecutionContext;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
public DataxTask(TaskRequest taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext, logger);
}
/**
* init DataX config
*/
@Override
public void init() {
logger.info("datax task params {}", taskExecutionContext.getTaskParams());
dataXParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DataxParameters.class);
if (!dataXParameters.checkParameters()) {
throw new RuntimeException("datax task params is not valid");
}
}
/**
* run DataX process
*
* @throws Exception if error throws Exception
*/
@Override
public void handle() throws Exception {
try {
// set the name of the current thread
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskExecutionContext, getParameters());
if (MapUtils.isEmpty(paramsMap)) {
paramsMap = new HashMap<>();
}
if (MapUtils.isNotEmpty(taskExecutionContext.getParamsMap())) {
paramsMap.putAll(taskExecutionContext.getParamsMap());
}
// run datax procesDataSourceService.s
String jsonFilePath = buildDataxJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) {
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
}
}
/**
* cancel DataX process
*
* @param cancelApplication cancelApplication
* @throws Exception if error throws Exception
*/
@Override
public void cancelApplication(boolean cancelApplication)
throws Exception {
// cancel process
shellCommandExecutor.cancelApplication();
}
/**
* build datax configuration file
*
* @return datax json file name
* @throws Exception if error throws Exception
*/
private String buildDataxJsonFile(Map<String, Property> paramsMap)
throws Exception {
// generate json
String fileName = String.format("%s/%s_job.json",
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId());
String json;
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()) {
json = dataXParameters.getJson().replaceAll("\\r\\n", "\n");
} else {
ObjectNode job = JSONUtils.createObjectNode();
job.putArray("content").addAll(buildDataxJobContentJson());
job.set("setting", buildDataxJobSettingJson());
ObjectNode root = JSONUtils.createObjectNode();
root.set("job", job);
root.set("core", buildDataxCoreJson());
json = root.toString();
}
// replace placeholder
json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap));
logger.debug("datax job json : {}", json);
// create datax json file
FileUtils.writeStringToFile(new File(fileName), json, StandardCharsets.UTF_8);
return fileName;
}
/**
* build datax job config
*
* @return collection of datax job config JSONObject
* @throws SQLException if error throws SQLException
*/
private List<ObjectNode> buildDataxJobContentJson() {
DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext.getDataxTaskExecutionContext();
BaseConnectionParam dataSourceCfg = (BaseConnectionParam) DatasourceUtil.buildConnectionParams(
DbType.of(dataxTaskExecutionContext.getSourcetype()),
dataxTaskExecutionContext.getSourceConnectionParams());
BaseConnectionParam dataTargetCfg = (BaseConnectionParam) DatasourceUtil.buildConnectionParams(
DbType.of(dataxTaskExecutionContext.getTargetType()),
dataxTaskExecutionContext.getTargetConnectionParams());
List<ObjectNode> readerConnArr = new ArrayList<>();
ObjectNode readerConn = JSONUtils.createObjectNode();
ArrayNode sqlArr = readerConn.putArray("querySql");
for (String sql : new String[]{dataXParameters.getSql()}) {
sqlArr.add(sql);
}
ArrayNode urlArr = readerConn.putArray("jdbcUrl");
urlArr.add(DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDsType()), dataSourceCfg));
readerConnArr.add(readerConn);
ObjectNode readerParam = JSONUtils.createObjectNode();
readerParam.put("username", dataSourceCfg.getUser());
readerParam.put("password", decodePassword(dataSourceCfg.getPassword()));
readerParam.putArray("connection").addAll(readerConnArr);
ObjectNode reader = JSONUtils.createObjectNode();
reader.put("name", DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype())));
reader.set("parameter", readerParam);
List<ObjectNode> writerConnArr = new ArrayList<>();
ObjectNode writerConn = JSONUtils.createObjectNode();
ArrayNode tableArr = writerConn.putArray("table");
tableArr.add(dataXParameters.getTargetTable());
writerConn.put("jdbcUrl", DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), dataTargetCfg));
writerConnArr.add(writerConn);
ObjectNode writerParam = JSONUtils.createObjectNode();
writerParam.put("username", dataTargetCfg.getUser());
writerParam.put("password", decodePassword(dataTargetCfg.getPassword()));
String[] columns = parsingSqlColumnNames(DbType.of(dataxTaskExecutionContext.getSourcetype()),
DbType.of(dataxTaskExecutionContext.getTargetType()),
dataSourceCfg, dataXParameters.getSql());
ArrayNode columnArr = writerParam.putArray("column");
for (String column : columns) {
columnArr.add(column);
}
writerParam.putArray("connection").addAll(writerConnArr);
if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) {
ArrayNode preSqlArr = writerParam.putArray("preSql");
for (String preSql : dataXParameters.getPreStatements()) {
preSqlArr.add(preSql);
}
}
if (CollectionUtils.isNotEmpty(dataXParameters.getPostStatements())) {
ArrayNode postSqlArr = writerParam.putArray("postSql");
for (String postSql : dataXParameters.getPostStatements()) {
postSqlArr.add(postSql);
}
}
ObjectNode writer = JSONUtils.createObjectNode();
writer.put("name", DataxUtils.getWriterPluginName(DbType.of(dataxTaskExecutionContext.getTargetType())));
writer.set("parameter", writerParam);
List<ObjectNode> contentList = new ArrayList<>();
ObjectNode content = JSONUtils.createObjectNode();
content.set("reader", reader);
content.set("writer", writer);
contentList.add(content);
return contentList;
}
/**
* build datax setting config
*
* @return datax setting config JSONObject
*/
private ObjectNode buildDataxJobSettingJson() {
ObjectNode speed = JSONUtils.createObjectNode();
speed.put("channel", DATAX_CHANNEL_COUNT);
if (dataXParameters.getJobSpeedByte() > 0) {
speed.put("byte", dataXParameters.getJobSpeedByte());
}
if (dataXParameters.getJobSpeedRecord() > 0) {
speed.put("record", dataXParameters.getJobSpeedRecord());
}
ObjectNode errorLimit = JSONUtils.createObjectNode();
errorLimit.put("record", 0);
errorLimit.put("percentage", 0);
ObjectNode setting = JSONUtils.createObjectNode();
setting.set("speed", speed);
setting.set("errorLimit", errorLimit);
return setting;
}
private ObjectNode buildDataxCoreJson() {
ObjectNode speed = JSONUtils.createObjectNode();
speed.put("channel", DATAX_CHANNEL_COUNT);
if (dataXParameters.getJobSpeedByte() > 0) {
speed.put("byte", dataXParameters.getJobSpeedByte());
}
if (dataXParameters.getJobSpeedRecord() > 0) {
speed.put("record", dataXParameters.getJobSpeedRecord());
}
ObjectNode channel = JSONUtils.createObjectNode();
channel.set("speed", speed);
ObjectNode transport = JSONUtils.createObjectNode();
transport.set("channel", channel);
ObjectNode core = JSONUtils.createObjectNode();
core.set("transport", transport);
return core;
}
/**
* create command
*
* @return shell command file name
* @throws Exception if error throws Exception
*/
private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap)
throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s",
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(),
OSUtils.isWindows() ? "bat" : "sh");
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
// datax python command
StringBuilder sbr = new StringBuilder();
sbr.append(getPythonCommand());
sbr.append(" ");
sbr.append(DATAX_PATH);
sbr.append(" ");
sbr.append(loadJvmEnv(dataXParameters));
sbr.append(jobConfigFilePath);
// replace placeholder
String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr.toString(), ParamUtils.convert(paramsMap));
logger.debug("raw script : {}", dataxCommand);
// create shell command file
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
if (OSUtils.isWindows()) {
Files.createFile(path);
} else {
Files.createFile(path, attr);
}
Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND);
return fileName;
}
public String getPythonCommand() {
String pythonHome = System.getenv("PYTHON_HOME");
return getPythonCommand(pythonHome);
}
public String getPythonCommand(String pythonHome) {
if (StringUtils.isEmpty(pythonHome)) {
return DATAX_PYTHON;
}
String pythonBinPath = "/bin/" + DATAX_PYTHON;
Matcher matcher = PYTHON_PATH_PATTERN.matcher(pythonHome);
if (matcher.find()) {
return matcher.replaceAll(pythonBinPath);
}
return Paths.get(pythonHome, pythonBinPath).toString();
}
public String loadJvmEnv(DataxParameters dataXParameters) {
int xms = Math.max(dataXParameters.getXms(), 1);
int xmx = Math.max(dataXParameters.getXmx(), 1);
return String.format(JVM_PARAM, xms, xmx);
}
/**
* parsing synchronized column names in SQL statements
*
* @param sourceType the database type of the data source
* @param targetType the database type of the data target
* @param dataSourceCfg the database connection parameters of the data source
* @param sql sql for data synchronization
* @return Keyword converted column names
*/
private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg, String sql) {
String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(sourceType, sql);
if (columnNames == null || columnNames.length == 0) {
logger.info("try to execute sql analysis query column name");
columnNames = tryExecuteSqlResolveColumnNames(sourceType, dataSourceCfg, sql);
}
notNull(columnNames, String.format("parsing sql columns failed : %s", sql));
return DataxUtils.convertKeywordsColumns(targetType, columnNames);
}
/**
* try grammatical parsing column
*
* @param dbType database type
* @param sql sql for data synchronization
* @return column name array
* @throws RuntimeException if error throws RuntimeException
*/
private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql) {
String[] columnNames;
try {
SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql);
if (parser == null) {
logger.warn("database driver [{}] is not support grammatical analysis sql", dbType);
return new String[0];
}
SQLStatement sqlStatement = parser.parseStatement();
SQLSelectStatement sqlSelectStatement = (SQLSelectStatement) sqlStatement;
SQLSelect sqlSelect = sqlSelectStatement.getSelect();
List<SQLSelectItem> selectItemList = null;
if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) {
SQLSelectQueryBlock block = (SQLSelectQueryBlock) sqlSelect.getQuery();
selectItemList = block.getSelectList();
} else if (sqlSelect.getQuery() instanceof SQLUnionQuery) {
SQLUnionQuery unionQuery = (SQLUnionQuery) sqlSelect.getQuery();
SQLSelectQueryBlock block = (SQLSelectQueryBlock) unionQuery.getRight();
selectItemList = block.getSelectList();
}
notNull(selectItemList,
String.format("select query type [%s] is not support", sqlSelect.getQuery().toString()));
columnNames = new String[selectItemList.size()];
for (int i = 0; i < selectItemList.size(); i++) {
SQLSelectItem item = selectItemList.get(i);
String columnName = null;
if (item.getAlias() != null) {
columnName = item.getAlias();
} else if (item.getExpr() != null) {
if (item.getExpr() instanceof SQLPropertyExpr) {
SQLPropertyExpr expr = (SQLPropertyExpr) item.getExpr();
columnName = expr.getName();
} else if (item.getExpr() instanceof SQLIdentifierExpr) {
SQLIdentifierExpr expr = (SQLIdentifierExpr) item.getExpr();
columnName = expr.getName();
}
} else {
throw new RuntimeException(
String.format("grammatical analysis sql column [ %s ] failed", item.toString()));
}
if (columnName == null) {
throw new RuntimeException(
String.format("grammatical analysis sql column [ %s ] failed", item.toString()));
}
columnNames[i] = columnName;
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
return new String[0];
}
return columnNames;
}
/**
* try to execute sql to resolve column names
*
* @param baseDataSource the database connection parameters
* @param sql sql for data synchronization
* @return column name array
*/
public String[] tryExecuteSqlResolveColumnNames(DbType sourceType, BaseConnectionParam baseDataSource, String sql) {
String[] columnNames;
sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql);
sql = sql.replace(";", "");
try (
Connection connection = DataSourceClientProvider.getInstance().getConnection(sourceType, baseDataSource);
PreparedStatement stmt = connection.prepareStatement(sql);
ResultSet resultSet = stmt.executeQuery()) {
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
columnNames = new String[num];
for (int i = 1; i <= num; i++) {
columnNames[i - 1] = md.getColumnName(i);
}
} catch (SQLException e) {
logger.warn(e.getMessage(), e);
return null;
}
return columnNames;
}
@Override
public AbstractParameters getParameters() {
return dataXParameters;
}
private void notNull(Object obj, String message) {
if (obj == null) {
throw new RuntimeException(message);
}
}
}