blob: f73b191ca075484fa2002c363adc4c49482846e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.SessionConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
/**
* A Kudu outputFormat
*/
public class KuduOutputFormat
implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(KuduOutputFormat.class);
private String kuduMaster;
private String kuduTableName;
private KuduClient kuduClient;
private KuduSession kuduSession;
private KuduTable kuduTable;
public static final long TIMEOUTMS = 18000;
public static final long SESSIONTIMEOUTMS = 100000;
public KuduOutputFormat(KuduSinkConfig kuduSinkConfig) {
this.kuduMaster = kuduSinkConfig.getKuduMaster();
this.kuduTableName = kuduSinkConfig.getKuduTableName();
init();
}
public void write(SeaTunnelRow element) {
Insert insert = kuduTable.newInsert();
Schema schema = kuduTable.getSchema();
int columnCount = schema.getColumnCount();
PartialRow row = insert.getRow();
for (int columnIndex = 0; columnIndex < columnCount; columnIndex++) {
ColumnSchema col = schema.getColumnByIndex(columnIndex);
try {
switch (col.getType()) {
case BOOL:
row.addBoolean(columnIndex, (Boolean) element.getField(columnIndex));
break;
case INT8:
row.addByte(columnIndex, (Byte) element.getField(columnIndex));
break;
case INT16:
row.addShort(columnIndex, (Short) element.getField(columnIndex));
break;
case INT32:
row.addInt(columnIndex, (Integer) element.getField(columnIndex));
break;
case INT64:
row.addLong(columnIndex, (Long) element.getField(columnIndex));
break;
case UNIXTIME_MICROS:
if (element.getField(columnIndex) instanceof Timestamp) {
row.addTimestamp(columnIndex, (Timestamp) element.getField(columnIndex));
} else {
row.addLong(columnIndex, (Long) element.getField(columnIndex));
}
break;
case FLOAT:
row.addFloat(columnIndex, (Float) element.getField(columnIndex));
break;
case DOUBLE:
row.addDouble(columnIndex, (Double) element.getField(columnIndex));
break;
case STRING:
row.addString(columnIndex, element.getField(columnIndex).toString());
break;
case BINARY:
if (element.getField(columnIndex) instanceof byte[]) {
row.addBinary(columnIndex, (byte[]) element.getField(columnIndex));
} else {
row.addBinary(columnIndex, (ByteBuffer) element.getField(columnIndex));
}
break;
case DECIMAL:
row.addDecimal(columnIndex, (BigDecimal) element.getField(columnIndex));
break;
default:
throw new IllegalArgumentException("Unsupported column type: " + col.getType());
}
} catch (ClassCastException e) {
e.printStackTrace();
throw new IllegalArgumentException(
"Value type does not match column type " + col.getType() +
" for column " + col.getName());
}
}
try {
kuduSession.apply(insert);
} catch (KuduException e) {
LOGGER.warn("kudu session insert data fail.", e);
throw new RuntimeException("kudu session insert data fail.", e);
}
}
public void init() {
KuduClient.KuduClientBuilder kuduClientBuilder = new
KuduClient.KuduClientBuilder(kuduMaster);
kuduClientBuilder.defaultOperationTimeoutMs(TIMEOUTMS);
this.kuduClient = kuduClientBuilder.build();
this.kuduSession = kuduClient.newSession();
this.kuduSession.setTimeoutMillis(SESSIONTIMEOUTMS);
this.kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
try {
kuduTable = kuduClient.openTable(kuduTableName);
} catch (KuduException e) {
LOGGER.warn("Failed to initialize the Kudu client.", e);
throw new RuntimeException("Failed to initialize the Kudu client.", e);
}
LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
}
public void closeOutputFormat() {
if (kuduClient != null) {
try {
kuduClient.close();
kuduSession.close();
} catch (KuduException e) {
LOGGER.warn("Kudu Client close failed.", e);
} finally {
kuduClient = null;
kuduSession = null;
}
}
}
}