blob: c56204c38ada0015f19a392aba2efa997406662f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.connectors.seatunnel.iotdb.sink;
import org.apache.seatunnel.connectors.seatunnel.iotdb.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.iotdb.serialize.IoTDBRecord;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
public class IoTDBSinkClient {
private final SinkConfig sinkConfig;
private final List<IoTDBRecord> batchList;
private Session session;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
private volatile boolean initialize;
private volatile Exception flushException;
public IoTDBSinkClient(SinkConfig sinkConfig) {
this.sinkConfig = sinkConfig;
this.batchList = new ArrayList<>();
}
private void tryInit() throws IOException {
if (initialize) {
return;
}
Session.Builder sessionBuilder = new Session.Builder()
.nodeUrls(sinkConfig.getNodeUrls())
.username(sinkConfig.getUsername())
.password(sinkConfig.getPassword());
if (sinkConfig.getThriftDefaultBufferSize() != null) {
sessionBuilder.thriftDefaultBufferSize(sinkConfig.getThriftDefaultBufferSize());
}
if (sinkConfig.getThriftMaxFrameSize() != null) {
sessionBuilder.thriftMaxFrameSize(sinkConfig.getThriftMaxFrameSize());
}
if (sinkConfig.getZoneId() != null) {
sessionBuilder.zoneId(sinkConfig.getZoneId());
}
session = sessionBuilder.build();
try {
if (sinkConfig.getConnectionTimeoutInMs() != null) {
session.open(sinkConfig.getEnableRPCCompression(), sinkConfig.getConnectionTimeoutInMs());
} else if (sinkConfig.getEnableRPCCompression() != null) {
session.open(sinkConfig.getEnableRPCCompression());
} else {
session.open();
}
} catch (IoTDBConnectionException e) {
log.error("Initialize IoTDB client failed.", e);
throw new IOException(e);
}
if (sinkConfig.getBatchIntervalMs() != null) {
scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("IoTDB-sink-output-%s").build());
scheduledFuture = scheduler.scheduleAtFixedRate(
() -> {
try {
flush();
} catch (IOException e) {
flushException = e;
}
},
sinkConfig.getBatchIntervalMs(),
sinkConfig.getBatchIntervalMs(),
TimeUnit.MILLISECONDS);
}
initialize = true;
}
public synchronized void write(IoTDBRecord record) throws IOException {
tryInit();
checkFlushException();
batchList.add(record);
if (sinkConfig.getBatchSize() > 0
&& batchList.size() >= sinkConfig.getBatchSize()) {
flush();
}
}
public synchronized void close() throws IOException {
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
scheduler.shutdown();
}
flush();
try {
session.close();
} catch (IoTDBConnectionException e) {
log.error("Close IoTDB client failed.", e);
throw new IOException("Close IoTDB client failed.", e);
}
}
private synchronized void flush() throws IOException {
checkFlushException();
if (batchList.isEmpty()) {
return;
}
BatchRecords batchRecords = new BatchRecords(batchList);
for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) {
try {
if (batchRecords.getTypesList().isEmpty()) {
session.insertRecords(batchRecords.getDeviceIds(),
batchRecords.getTimestamps(),
batchRecords.getMeasurementsList(),
batchRecords.getStringValuesList());
} else {
session.insertRecords(batchRecords.getDeviceIds(),
batchRecords.getTimestamps(),
batchRecords.getMeasurementsList(),
batchRecords.getTypesList(),
batchRecords.getValuesList());
}
} catch (IoTDBConnectionException | StatementExecutionException e) {
log.error("Writing records to IoTDB failed, retry times = {}", i, e);
if (i >= sinkConfig.getMaxRetries()) {
throw new IOException("Writing records to IoTDB failed.", e);
}
try {
long backoff = Math.min(sinkConfig.getRetryBackoffMultiplierMs() * i,
sinkConfig.getMaxRetryBackoffMs());
Thread.sleep(backoff);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException(
"Unable to flush; interrupted while doing another attempt.", e);
}
}
}
batchList.clear();
}
private void checkFlushException() {
if (flushException != null) {
throw new RuntimeException("Writing records to IoTDB failed.", flushException);
}
}
@Getter
private class BatchRecords {
private List<String> deviceIds;
private List<Long> timestamps;
private List<List<String>> measurementsList;
private List<List<TSDataType>> typesList;
private List<List<Object>> valuesList;
public BatchRecords(List<IoTDBRecord> batchList) {
int batchSize = batchList.size();
this.deviceIds = new ArrayList<>(batchSize);
this.timestamps = new ArrayList<>(batchSize);
this.measurementsList = new ArrayList<>(batchSize);
this.typesList = new ArrayList<>(batchSize);
this.valuesList = new ArrayList<>(batchSize);
for (IoTDBRecord record : batchList) {
deviceIds.add(record.getDevice());
timestamps.add(record.getTimestamp());
measurementsList.add(record.getMeasurements());
if (record.getTypes() != null && !record.getTypes().isEmpty()) {
typesList.add(record.getTypes());
}
valuesList.add(record.getValues());
}
}
private List<List<String>> getStringValuesList() {
List<?> tmp = valuesList;
return (List<List<String>>) tmp;
}
}
}