blob: d00cc2bd3889415594a0a1d7a33d90eaea4bbaf9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.ExceptionUtil;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.RowResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class KuduInputFormat implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(KuduInputFormat.class);
public KuduInputFormat(String kuduMaster, String tableName, String columnsList) {
this.kuduMaster = kuduMaster;
this.columnsList = Arrays.asList(columnsList.split(","));
this.tableName = tableName;
}
/**
* Declare the global variable KuduClient and use it to manipulate the Kudu table
*/
public KuduClient kuduClient;
/**
* Specify kuduMaster address
*/
public String kuduMaster;
public List<String> columnsList;
public Schema schema;
public String keyColumn;
public static final int TIMEOUTMS = 18000;
/**
* Specifies the name of the table
*/
public String tableName;
public List<ColumnSchema> getColumnsSchemas() {
List<ColumnSchema> columns = null;
try {
schema = kuduClient.openTable(tableName).getSchema();
keyColumn = schema.getPrimaryKeyColumns().get(0).getName();
columns = schema.getColumns();
} catch (KuduException e) {
LOGGER.warn("get table Columns Schemas Fail.", e);
throw new RuntimeException("get table Columns Schemas Fail..", e);
}
return columns;
}
public static SeaTunnelRow getSeaTunnelRowData(RowResult rs, SeaTunnelRowType typeInfo) throws SQLException {
List<Object> fields = new ArrayList<>();
SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
for (int i = 0; i < seaTunnelDataTypes.length; i++) {
Object seatunnelField;
SeaTunnelDataType<?> seaTunnelDataType = seaTunnelDataTypes[i];
if (null == rs.getObject(i)) {
seatunnelField = null;
} else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getBoolean(i);
} else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getByte(i);
} else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getShort(i);
} else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getInt(i);
} else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getLong(i);
} else if (seaTunnelDataType instanceof DecimalType) {
Object value = rs.getObject(i);
seatunnelField = value instanceof BigInteger ?
new BigDecimal((BigInteger) value, 0)
: value;
} else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getFloat(i);
} else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getDouble(i);
} else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
seatunnelField = rs.getString(i);
} else {
throw new IllegalStateException("Unexpected value: " + seaTunnelDataType);
}
fields.add(seatunnelField);
}
return new SeaTunnelRow(fields.toArray());
}
public SeaTunnelRowType getSeaTunnelRowType(List<ColumnSchema> columnSchemaList) {
ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
ArrayList<String> fieldNames = new ArrayList<>();
try {
for (int i = 0; i < columnSchemaList.size(); i++) {
fieldNames.add(columnSchemaList.get(i).getName());
seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
}
} catch (Exception e) {
LOGGER.warn("get row type info exception.", e);
throw new PrepareFailException("kudu", PluginType.SOURCE, ExceptionUtil.getMessage(e));
}
return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
}
public void openInputFormat() {
KuduClient.KuduClientBuilder kuduClientBuilder = new
KuduClient.KuduClientBuilder(kuduMaster);
kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
kuduClient = kuduClientBuilder.build();
LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
}
/**
* @param lowerBound The beginning of each slice
* @param upperBound End of each slice
* @return Get the kuduScanner object for each slice
*/
public KuduScanner getKuduBuildSplit(int lowerBound, int upperBound) {
KuduScanner kuduScanner = null;
try {
KuduScanner.KuduScannerBuilder kuduScannerBuilder =
kuduClient.newScannerBuilder(kuduClient.openTable(tableName));
kuduScannerBuilder.setProjectedColumnNames(columnsList);
KuduPredicate lowerPred = KuduPredicate.newComparisonPredicate(
schema.getColumn("" + keyColumn),
KuduPredicate.ComparisonOp.GREATER_EQUAL,
lowerBound);
KuduPredicate upperPred = KuduPredicate.newComparisonPredicate(
schema.getColumn("" + keyColumn),
KuduPredicate.ComparisonOp.LESS,
upperBound);
kuduScanner = kuduScannerBuilder.addPredicate(lowerPred)
.addPredicate(upperPred).build();
} catch (KuduException e) {
LOGGER.warn("get the Kuduscan object for each splice exception", e);
throw new RuntimeException("get the Kuduscan object for each splice exception.", e);
}
return kuduScanner;
}
public void closeInputFormat() {
if (kuduClient != null) {
try {
kuduClient.close();
} catch (KuduException e) {
LOGGER.warn("Kudu Client close failed.", e);
throw new RuntimeException("Kudu Client close failed.", e);
} finally {
kuduClient = null;
}
}
}
}