blob: 572beaf8639013c306600011e2f41e3b572e8262 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.sync.externalpipe;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.sync.datasource.PipeOpManager;
import org.apache.iotdb.db.sync.datasource.PipeStorageGroupInfo;
import org.apache.iotdb.db.sync.externalpipe.operation.DeleteOperation;
import org.apache.iotdb.db.sync.externalpipe.operation.InsertOperation;
import org.apache.iotdb.db.sync.externalpipe.operation.Operation;
import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
import org.apache.iotdb.pipe.external.api.ExternalPipeSinkWriterStatus;
import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriter;
import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static java.lang.Math.abs;
/**
* This class handles the work of one external pipe plugin, which runs multiple working threads.
* All StorageGroups are distributed to the working threads according to the hash value of their
* names. Every working thread is responsible for putting the data of several StorageGroups into
* the external sink.
*/
public class ExtPipePlugin {
private static final Logger logger = LoggerFactory.getLogger(ExtPipePlugin.class);
// Type name of the external pipe; used to look up the writer factory and in log messages.
private String extPipeTypeName;
// ParamKey => ParamValue
Map<String, String> sinkParams;
// Source of the operations that the working threads transmit.
private PipeOpManager pipeOpManager;
// Manager that is asked to trigger a commit once transmitted data may be released.
private ExtPipePluginManager extPipePluginManager;
private IExternalPipeSinkWriterFactory pipeSinkWriterFactory;
// Built in start() from sinkParams.
private ExtPipePluginConfiguration configuration;
private volatile boolean alive = false;
// ArrayList: save DataTransmissionTasks' info
private List<DataTransmissionTask> dataTransmissionTasks;
private ExecutorService executorService;
// SG => DataCommitIndex
private Map<String, Long> dataCommitMap = new ConcurrentHashMap<>();
// Writer method name => (exception message => count).
// Created eagerly so that a failure can be recorded, and the status can be queried, at any time;
// start() replaces it with a fresh map.
private Map<String, Map<String, AtomicInteger>> writerInvocationFailures =
    new ConcurrentHashMap<>();
// Divisor that converts a timestamp of the configured precision into milliseconds.
private int timestampDivisor;
/**
 * Creates the plugin for one external pipe. Nothing is started here; see {@link #start()}.
 *
 * @param extPipeTypeName type name of the external pipe plugin
 * @param sinkParams parameters of the pipe sink (ParamKey => ParamValue)
 * @param extPipePluginManager manager that is asked to trigger commits
 * @param pipeOpManager manager the operations are read from
 * @throws IllegalArgumentException if the configured timestamp precision is none of "ms", "us"
 *     and "ns"
 */
public ExtPipePlugin(
    String extPipeTypeName,
    Map<String, String> sinkParams,
    ExtPipePluginManager extPipePluginManager,
    PipeOpManager pipeOpManager) {
  this.extPipeTypeName = extPipeTypeName;
  this.sinkParams = sinkParams;
  this.pipeOpManager = pipeOpManager;
  this.extPipePluginManager = extPipePluginManager;

  // Timestamps are handed to the writers in milliseconds, so remember how much a timestamp of
  // the configured precision has to be divided by.
  final String precision = IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision();
  final int divisor;
  switch (precision) {
    case "ns":
      divisor = 1_000_000;
      break;
    case "us":
      divisor = 1_000;
      break;
    case "ms":
      divisor = 1;
      break;
    default:
      throw new IllegalArgumentException("Unrecognized time precision: " + precision);
  }
  this.timestampDivisor = divisor;
}
/**
* Test-only constructor. The body is empty: none of the arguments is stored and no field is
* assigned here.
*
* @param Name not used
* @param factory not used
* @param conf not used
* @param tsFilePipe not used
*/
@TestOnly
public ExtPipePlugin(
String Name,
IExternalPipeSinkWriterFactory factory,
ExtPipePluginConfiguration conf,
TsFilePipe tsFilePipe) {}
/**
* Sets the writer factory used by this plugin. When a factory has been set, start() uses it
* instead of looking one up in ExtPipePluginRegister.
*
* @param pipeSinkWriterFactory the factory that creates the sink writers
*/
@TestOnly
public void setPipeSinkWriterFactory(IExternalPipeSinkWriterFactory pipeSinkWriterFactory) {
this.pipeSinkWriterFactory = pipeSinkWriterFactory;
}
/**
 * Get an integer parameter value from sinkParams, which come from CMD.
 *
 * @param paramName name of the parameter to look up
 * @param defaultValue value returned when the parameter is not set
 * @return the parsed parameter value, or {@code defaultValue} if the parameter is not set
 * @throws NumberFormatException if the parameter is set but is not a valid integer
 */
private int getIntParam(String paramName, int defaultValue) {
  String valueStr = sinkParams.get(paramName);
  if (valueStr == null) {
    return defaultValue;
  }
  try {
    // Surrounding whitespace carries no meaning, so it must not make the value invalid.
    return Integer.parseInt(valueStr.trim());
  } catch (NumberFormatException e) {
    // Same exception type as before, but the message now tells which parameter is wrong.
    NumberFormatException detailed =
        new NumberFormatException(
            "Invalid integer value '"
                + valueStr
                + "' for external pipe parameter '"
                + paramName
                + "'.");
    detailed.initCause(e);
    throw detailed;
  }
}
/**
 * Start the external pipe: build the configuration, initialize the writer factory and launch all
 * working threads.
 *
 * <p>If launching the working threads fails, the plugin is rolled back to the not-alive state
 * and the thread pool is shut down before the exception is rethrown.
 *
 * @throws IOException if the configuration or the writer factory cannot be initialized, or if
 *     creating a working thread task fails with an IOException
 * @throws IllegalStateException if the external pipe is already alive
 */
public void start() throws IOException {
  logger.debug("ExtPipePlugin start(), extPipeName={}.", extPipeTypeName);
  if (alive) {
    String errMsg = "Can not re-run alive External pipe: " + extPipeTypeName + ".";
    logger.error(errMsg);
    throw new IllegalStateException(errMsg);
  }

  int threadNum = getIntParam("thread_num", ExtPipePluginConfiguration.DEFAULT_NUM_OF_THREADS);
  int batchSize =
      getIntParam("batch_size", ExtPipePluginConfiguration.DEFAULT_OPERATION_BATCH_SIZE);
  int attemptTimes =
      getIntParam("attempt_times", ExtPipePluginConfiguration.DEFAULT_ATTEMPT_TIMES);
  int backoffInterval =
      getIntParam("retry_interval", ExtPipePluginConfiguration.DEFAULT_BACKOFF_INTERVAL);

  try {
    // == Pipe configuration
    configuration =
        new ExtPipePluginConfiguration.Builder(extPipeTypeName)
            .numOfThreads(threadNum)
            .operationBatchSize(batchSize)
            .attemptTimes(attemptTimes)
            .backOffInterval(backoffInterval)
            .build();
    if (pipeSinkWriterFactory == null) {
      pipeSinkWriterFactory =
          ExtPipePluginRegister.getInstance().getWriteFactory(extPipeTypeName);
    }
    // Try to init externalPipeSinkWriterFactory, also check the PIPESink parameter
    pipeSinkWriterFactory.initialize(sinkParams);
  } catch (Exception e) {
    logger.error("Failed to start External Pipe: {}.", extPipeTypeName, e);
    // Keep the original exception as the cause so that its stack trace is not lost.
    throw new IOException(
        "Failed to start External Pipe: " + extPipeTypeName + ". " + e.getMessage(), e);
  }

  // Must exist before the first worker is submitted: a running worker records writer failures
  // into this map.
  writerInvocationFailures = new ConcurrentHashMap<>();

  alive = true;
  logger.info("External pipe {} begin to START", extPipeTypeName);

  ExecutorService workerPool = null;
  try {
    // == Launch pipe worker threads
    workerPool =
        IoTDBThreadPoolFactory.newFixedThreadPool(
            threadNum, ThreadName.EXT_PIPE_PLUGIN_WORKER.getName() + "-" + extPipeTypeName);
    executorService = workerPool;

    // == Start threads that will run external PiPeSink plugin
    dataTransmissionTasks = new ArrayList<>(threadNum);
    for (int i = 0; i < threadNum; i++) {
      IExternalPipeSinkWriter writer = pipeSinkWriterFactory.get();
      DataTransmissionTask dataTransmissionTask =
          new DataTransmissionTask(writer, i, configuration);
      dataTransmissionTasks.add(dataTransmissionTask);
      workerPool.submit(dataTransmissionTask);
    }
  } catch (IOException | RuntimeException e) {
    // Roll back a partial start. Otherwise the plugin would claim to be alive with only some
    // (or none) of its workers running, and the thread pool would never be released. Workers
    // that were already submitted leave their loop once alive is false and close their writer.
    alive = false;
    if (workerPool != null) {
      workerPool.shutdownNow();
    }
    logger.error("Failed to launch working threads of External pipe {}.", extPipeTypeName, e);
    throw e;
  }

  logger.info("External pipe {} finish START.", extPipeTypeName);
}
/**
 * Stop all working threads. Waits up to 10 seconds for them to finish and force-terminates them
 * afterwards.
 *
 * @throws IllegalStateException if the external pipe is not alive
 */
public void stop() {
  if (!alive) {
    String errMsg = "Error: External pipe " + extPipeTypeName + " has not started.";
    logger.error(errMsg);
    throw new IllegalStateException(errMsg);
  }
  alive = false;

  // An idle worker blocks for up to 15 seconds waiting for new data, which is longer than the
  // 10 seconds granted below. Wake the workers up so that they see alive == false right away.
  // The notification arguments are not used by the task.
  List<DataTransmissionTask> tasks = dataTransmissionTasks;
  if (tasks != null) {
    for (DataTransmissionTask task : tasks) {
      task.notifyNewDataArrive(null, 0L, 0L);
    }
  }

  executorService.shutdown();
  boolean isExecutorServiceTerminated = false;
  try {
    isExecutorServiceTerminated = executorService.awaitTermination(10, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    logger.error(
        "Interrupted when waiting for the termination of external pipe, {}", extPipeTypeName, e);
    // Restore the flag so that the caller can still observe the interruption.
    Thread.currentThread().interrupt();
  } finally {
    if (!isExecutorServiceTerminated) {
      logger.warn(
          "ExtPipePlugin stop(), graceful termination of external pipe {} timed out. So force terminating working threads.",
          extPipeTypeName);
      executorService.shutdownNow();
    }
  }
}
/**
* Tell whether the external pipe is running. The flag is set by start() and cleared by stop().
*
* @return true if the external pipe is alive
*/
public boolean isAlive() {
return alive;
}
/**
 * Collect the status of External Pipe.
 *
 * <p>The writer statuses are only set when every working thread task reported its status. If
 * one of them throws, the failure is recorded under the method name "getStatus" and no writer
 * status is set. Before the working thread tasks have been created, no writer status is set
 * either.
 *
 * @return the status, containing the alive flag, the recorded writer invocation failures and, if
 *     available, the writer statuses
 */
public ExternalPipeStatus getStatus() {
  ExternalPipeStatus status = new ExternalPipeStatus();

  // The task list is only assigned in start(); there is nothing to ask before that.
  List<DataTransmissionTask> tasks = dataTransmissionTasks;
  if (tasks != null) {
    try {
      List<ExternalPipeSinkWriterStatus> writerStatuses =
          tasks.stream().map(DataTransmissionTask::getStatus).collect(Collectors.toList());
      status.setWriterStatuses(writerStatuses);
    } catch (Exception e) {
      handleExceptionsThrownByWriter("getStatus", e);
    }
  }

  status.setAlive(alive);
  status.setWriterInvocationFailures(writerInvocationFailures);
  return status;
}
/**
 * Count an exception thrown while invoking a writer method and log it.
 *
 * @param method name of the writer method the failure is counted under
 * @param e the exception; its message is the counting key, "N/A" if it has no message
 */
private void handleExceptionsThrownByWriter(String method, Exception e) {
  Map<String, AtomicInteger> countsByMessage =
      writerInvocationFailures.computeIfAbsent(method, key -> new ConcurrentHashMap<>());

  String rawMessage = e.getMessage();
  String countKey = (rawMessage == null) ? "N/A" : rawMessage;
  countsByMessage.computeIfAbsent(countKey, key -> new AtomicInteger(0)).incrementAndGet();

  logger.info("Exception thrown from writer", e);
}
/**
 * Get the data committed index of the given StorageGroup.
 *
 * @param sgName StorageGroup name
 * @return the committed index recorded for the StorageGroup, or {@code Long.MIN_VALUE} if none
 *     has been recorded
 */
public long getDataCommitIndex(String sgName) {
  Long committedIndex = dataCommitMap.get(sgName);
  if (committedIndex == null) {
    return Long.MIN_VALUE;
  }
  return committedIndex;
}
/**
 * Map a StorageGroup to the index of the working thread that is responsible for it.
 *
 * @param sgName StorageGroup name
 * @return a non-negative index smaller than the configured number of threads
 */
private int getThreadIndex(String sgName) {
  // Take the remainder before abs(): abs(Integer.MIN_VALUE) is still negative and would lead to
  // a negative index. The remainder is always smaller in magnitude than the number of threads,
  // so abs() is safe on it, and the result is unchanged for every other hash value.
  return abs(sgName.hashCode() % configuration.getNumOfThreads());
}
/**
 * Notify the working thread task that is responsible for the StorageGroup that new data arrive.
 * Does nothing when the working thread tasks have not been created yet.
 *
 * @param sgName StorageGroup that got new data
 * @param newDataBeginIndex index of the first new data
 * @param newDataCount number of new data
 */
public void notifyNewDataArrive(String sgName, long newDataBeginIndex, long newDataCount) {
  logger.debug(
      "notifyNewDataArrive(), sgName={}, newDataBeginIndex={}, newDataCount={}",
      sgName,
      newDataBeginIndex,
      newDataCount);

  // The task list is only assigned in start(). Dropping the notification before that is safe:
  // a working thread scans all StorageGroups for new data before it starts waiting.
  List<DataTransmissionTask> tasks = dataTransmissionTasks;
  if (tasks == null) {
    return;
  }

  DataTransmissionTask dataTransmissionTask = tasks.get(getThreadIndex(sgName));
  if (dataTransmissionTask != null) {
    dataTransmissionTask.notifyNewDataArrive(sgName, newDataBeginIndex, newDataCount);
  }
}
/**
* Working thread class. Every thread takes 1 IExternalPipeSinkWriter and is responsible for the
* data of several StorageGroups.
*/
private class DataTransmissionTask implements Callable<Void> {
private final IExternalPipeSinkWriter writer;
private final int threadIndex;
private final ExtPipePluginConfiguration configuration;
// SG ==> PipeStorageGroupInfo. Assigned once in the constructor.
private final Map<String, PipeStorageGroupInfo> sgInfoMap;
private long nextIndex;
// StorageGroup returned by the last successful waitForOperations(); it is checked first next
// time.
private String lastReadSgName;
// condition-lock for notifying new data arriving. Final, because threads that synchronize on a
// field that can be reassigned may end up holding different monitors.
private final byte[] newDataLocker = new byte[0];
// Number of notifications received since the working thread last waited. Guarded by
// newDataLocker.
private long newDataCounter = 0L;
/**
* Create the task of one working thread and open its writer.
*
* @param writer sink writer used by this task; checked with Validate.notNull
* @param threadIndex index of this working thread; also selects the StorageGroup info bucket
*     taken from the configuration
* @param configuration plugin configuration
* @throws IOException declared because writer.open() is called here
*/
DataTransmissionTask(
IExternalPipeSinkWriter writer, int threadIndex, ExtPipePluginConfiguration configuration)
throws IOException {
this.writer = Validate.notNull(writer);
this.threadIndex = threadIndex;
this.configuration = configuration;
this.sgInfoMap = configuration.getBucketSgInfoMap(threadIndex);
this.writer.open();
}
/**
 * Get the index of the next data to read for 1 SG, according to the past reading record. If the
 * SG has no local record yet, a record is created that starts right after the index committed
 * in the PIPE data source.
 *
 * @param sgName StorageGroup name
 * @return index of the next data to read
 */
private long getSgNextDataIndex(String sgName) {
  PipeStorageGroupInfo recorded = sgInfoMap.get(sgName);
  if (recorded == null) {
    // First time this SG is seen: continue right behind the committed data.
    long firstUnreadIndex = pipeOpManager.getCommittedIndex(sgName) + 1;
    long committedIndex = firstUnreadIndex - 1;
    sgInfoMap.put(sgName, new PipeStorageGroupInfo(sgName, committedIndex, firstUnreadIndex));
    return firstUnreadIndex;
  }
  return recorded.getNextReadIndex();
}
/**
 * Save the data index info of 1 SG to the local record, creating the record if it is missing.
 *
 * @param sgName StorageGroup name
 * @param nextReadIndex index of the next data to read
 * @param committedIndex index of the last committed data
 */
private void setSgNextDataIndex(String sgName, long nextReadIndex, long committedIndex) {
  logger.debug(
      "setSgNextDataIndex(), sgName={}, nextReadIndex={}, committedIndex={}.",
      sgName,
      nextReadIndex,
      committedIndex);

  PipeStorageGroupInfo record = sgInfoMap.get(sgName);
  if (record == null) {
    // Same semantics as computeIfAbsent: a missing (or null) entry is replaced by a new record.
    record = sgInfoMap.computeIfAbsent(sgName, name -> new PipeStorageGroupInfo(name, -1, 0));
  }
  record.setNextReadIndex(nextReadIndex);
  record.setCommittedIndex(committedIndex);
}
/**
 * Record the committed index of 1 SG and, when the Pipe data source says the operation block
 * needs a commit, ask the plugin manager to trigger it, so that the data source may remove
 * useless data.
 *
 * @param sgName StorageGroup name
 * @param committedIndex index of the last data that has been handled
 * @throws IOException declared by the original signature
 */
private void commitData(String sgName, long committedIndex) throws IOException {
  logger.debug("commitData(), sgName={}, committedIndex={}.", sgName, committedIndex);
  dataCommitMap.put(sgName, committedIndex);

  boolean commitNeeded = pipeOpManager.opBlockNeedCommit(sgName, committedIndex);
  if (!commitNeeded) {
    return;
  }
  extPipePluginManager.triggerCommit(sgName, committedIndex);
}
/**
 * Check whether the PIPE data source has new data for the given StorageGroup.
 *
 * @param sgName StorageGroup name
 * @return true if the SG has no local record yet, or if the next index of the data source is
 *     beyond the next index to read
 */
private boolean sgHasNewData(String sgName) {
  PipeStorageGroupInfo record = sgInfoMap.get(sgName);
  // A SG without local record has never been read, so it is treated as having new data.
  return record == null || record.getNextReadIndex() < pipeOpManager.getNextIndex(sgName);
}
/**
* Notify current working thread that new data arrive. Increments the new-data counter and wakes
* up every thread waiting on the lock. The parameters are not used by this method.
*
* @param sgName StorageGroup that got new data (not used here)
* @param newDataBeginIndex index of the first new data (not used here)
* @param newDataCount number of new data (not used here)
*/
public void notifyNewDataArrive(String sgName, long newDataBeginIndex, long newDataCount) {
synchronized (newDataLocker) {
// The counter lets a notification that arrives before the worker starts waiting be noticed.
newDataCounter++;
newDataLocker.notifyAll();
}
}
/**
 * Check/Wait for new data/operation of the StorageGroups this working thread is responsible
 * for.
 *
 * <p>The StorageGroup that was read last is checked first. Otherwise all StorageGroups of the
 * data source are scanned; if none has new data, the thread waits until it is notified, at most
 * 15 seconds, and scans again.
 *
 * @return name of a StorageGroup that has new data, or {@code null} if the external pipe is no
 *     longer alive
 * @throws InterruptedException if the thread is interrupted while waiting for new data
 */
public String waitForOperations() throws InterruptedException {
  if (lastReadSgName != null) {
    if (sgHasNewData(lastReadSgName)) {
      return lastReadSgName;
    }
    lastReadSgName = null;
  }

  // find new data for the StorageGroup in sgSet
  while (alive) {
    for (String sgName : pipeOpManager.getSgSet()) {
      if (threadIndex != getThreadIndex(sgName)) {
        // This SG belongs to another working thread.
        continue;
      }
      if (sgHasNewData(sgName)) {
        lastReadSgName = sgName;
        return sgName;
      }
    }

    synchronized (newDataLocker) {
      // Only wait when no notification arrived during the scan above.
      if (newDataCounter <= 0L) {
        // An interrupt is propagated on purpose: the method declares InterruptedException and
        // call() leaves its loop when it receives one.
        newDataLocker.wait(15000); // maximum time 15 seconds
      }
      newDataCounter = 0L;
    }
  }
  return null;
}
/**
 * Working thread entrance. While the external pipe is alive: wait for a StorageGroup with new
 * data, read a batch of operations, push it to the external sink, flush, and commit the handled
 * index. The writer is closed when the loop is left, however it is left.
 *
 * @return always {@code null}
 * @throws Exception declared by {@link Callable}
 */
@Override
public Void call() throws Exception {
  logger.info("ExternalPipeWorker start. thread={}.", Thread.currentThread().getName());

  try {
    while (alive) {
      try {
        String sgName = waitForOperations();
        if (sgName == null) {
          continue;
        }

        Operation operation;
        try {
          operation =
              pipeOpManager.getOperation(
                  sgName, getSgNextDataIndex(sgName), configuration.getOperationBatchSize());
        } catch (IOException e) {
          // Nothing has been read, so the same index is tried again in the next round. Report
          // the failure instead of hiding it.
          logger.warn("Failed to get operation of StorageGroup {}, will try again.", sgName, e);
          continue;
        }
        if ((operation == null) || (operation.getDataCount() <= 0)) {
          continue;
        }

        if (!handleOperationWithRetry(sgName, operation)) {
          logger.error(
              "Failed to handle operation after {} attempts: {}",
              configuration.getAttemptTimes(),
              operation);
        }

        if (!flushWithRetry()) {
          logger.error(
              "Failed to flush operations after {} attempts: startIndex={},endIndex={}",
              configuration.getAttemptTimes(),
              operation.getStartIndex(),
              operation.getEndIndex());
        }

        // TODO: Reminder, May cause data loss if the flush failed.
        // Update the next index and commit to the pipe source. This happens even when handling
        // or flushing failed above.
        nextIndex = operation.getEndIndex();
        setSgNextDataIndex(sgName, nextIndex, nextIndex - 1);
        commitData(sgName, nextIndex - 1);
      } catch (InterruptedException e) {
        break;
      } catch (Exception e) {
        logger.error("Unexpected system exception", e);
      }
    }
  } finally {
    // Release the writer even if the loop is left by something that is not caught above.
    try {
      writer.close();
    } catch (IOException e) {
      handleExceptionsThrownByWriter("close", e);
      logger.info("Exception happened when closing the writer", e);
    }
  }

  logger.info("ExternalPipeWorker exits. Thread={}", Thread.currentThread().getName());
  return null;
}
/**
* Get the status of this task's writer.
*
* @return the status reported by the writer
*/
public ExternalPipeSinkWriterStatus getStatus() {
return writer.getStatus();
}
/**
 * Push 1 operation to the external sink, retrying up to the configured number of attempts with
 * the configured backoff interval between two attempts. Every failure is logged and counted
 * under the operation's type name.
 *
 * <p>Retrying stops early when the external pipe is no longer alive or when the thread is
 * interrupted during the backoff; in the latter case the interrupt flag is restored.
 *
 * @param sgName StorageGroup the operation belongs to
 * @param operation operation to push
 * @return true if one attempt succeeded
 */
private boolean handleOperationWithRetry(String sgName, Operation operation) {
  int remainingAttempts = configuration.getAttemptTimes();
  while (alive && remainingAttempts > 0) {
    try {
      pushOperationToExtPipe(sgName, operation);
      return true;
    } catch (Exception e) {
      logger.error("When handle operation {}, Exception", operation.getOperationType(), e);
      handleExceptionsThrownByWriter(operation.getOperationTypeName(), e);
    }

    remainingAttempts--;
    if (remainingAttempts <= 0) {
      // No attempt is left, so backing off would only delay the caller.
      break;
    }

    // Backoff
    try {
      Thread.sleep(configuration.getBackOffInterval());
    } catch (InterruptedException e) {
      // Somebody wants this thread to stop: keep the flag for the caller and give up.
      Thread.currentThread().interrupt();
      break;
    }
  }
  return false;
}
/**
 * Dispatch 1 operation to the handler of its kind.
 *
 * @param sgName StorageGroup the operation belongs to
 * @param operation an InsertOperation or a DeleteOperation
 * @throws IOException if the handler fails with an IOException
 * @throws IllegalArgumentException if the operation is of neither kind
 */
private void pushOperationToExtPipe(String sgName, Operation operation)
    throws IOException, IllegalArgumentException {
  if (operation instanceof InsertOperation) {
    handleInsertOperation(sgName, (InsertOperation) operation);
  } else if (operation instanceof DeleteOperation) {
    handleDeleteOperation(sgName, (DeleteOperation) operation);
  } else {
    logger.error("pushOperationToExtPipe(), Unrecognized Operation: {}", operation);
    throw new IllegalArgumentException(
        "pushOperationToExtPipe(), Unrecognized Operation:" + operation);
  }
}
/**
 * Write every data point of an insert operation to the external sink, using the writer method
 * that matches the data type of the point. Null points are skipped. Timestamps are divided by
 * the timestamp divisor before they are passed to the writer.
 *
 * @param sgName StorageGroup the data belongs to
 * @param operation insert operation holding, per measurement path, a list of data points
 * @throws IOException if the writer fails with an IOException
 * @throws IllegalArgumentException if a data point has a data type that is not handled here
 */
private void handleInsertOperation(String sgName, InsertOperation operation)
    throws IOException, IllegalArgumentException {
  for (Pair<MeasurementPath, List<TimeValuePair>> series : operation.getDataList()) {
    final MeasurementPath seriesPath = series.left;
    for (TimeValuePair point : series.right) {
      if (point != null) {
        final String[] nodes = seriesPath.getNodes();
        final long timestampInMs = point.getTimestamp() / timestampDivisor;
        // Only the data types listed here are handled; every other type is rejected.
        switch (point.getValue().getDataType()) {
          case TEXT:
            writer.insertText(sgName, nodes, timestampInMs, point.getValue().getStringValue());
            break;
          case DOUBLE:
            writer.insertDouble(sgName, nodes, timestampInMs, point.getValue().getDouble());
            break;
          case FLOAT:
            writer.insertFloat(sgName, nodes, timestampInMs, point.getValue().getFloat());
            break;
          case INT64:
            writer.insertInt64(sgName, nodes, timestampInMs, point.getValue().getLong());
            break;
          case INT32:
            writer.insertInt32(sgName, nodes, timestampInMs, point.getValue().getInt());
            break;
          case BOOLEAN:
            writer.insertBoolean(sgName, nodes, timestampInMs, point.getValue().getBoolean());
            break;
          default:
            throw new IllegalArgumentException(
                "Unrecognized data type " + point.getValue().getDataType());
        }
      }
    }
  }
}
/**
 * Forward a delete operation to the external sink. Start and end time are divided by the
 * timestamp divisor before they are passed to the writer.
 *
 * @param sgName StorageGroup the deletion belongs to
 * @param deleteOperation operation holding the path and the time range to delete
 * @throws IOException if the writer fails with an IOException
 */
private void handleDeleteOperation(String sgName, DeleteOperation deleteOperation)
    throws IOException {
  final int divisor = timestampDivisor;
  writer.delete(
      sgName,
      deleteOperation.getDeletePathStr(),
      deleteOperation.getStartTime() / divisor,
      deleteOperation.getEndTime() / divisor);
}
/**
 * Flush the writer, retrying with the configured backoff interval until the configured number
 * of attempts is reached or the external pipe is no longer alive. Every failure is counted under
 * the method name "flush".
 *
 * <p>Note that only an IOException of the first attempt is handled here; any other exception of
 * the first attempt propagates to the caller, whereas retries handle every Exception.
 *
 * @return true if the writer was flushed
 */
private boolean flushWithRetry() {
  // First attempt.
  try {
    writer.flush();
    return true;
  } catch (IOException firstFailure) {
    logger.error("Exception happened when flushing operations", firstFailure);
    handleExceptionsThrownByWriter("flush", firstFailure);
  }

  // Retries; the first attempt above counts as attempt number 1.
  for (int attempt = 1; alive && attempt < configuration.getAttemptTimes(); attempt++) {
    // Backoff
    try {
      Thread.sleep(configuration.getBackOffInterval());
    } catch (InterruptedException ignored) {
    }
    // Retry
    try {
      writer.flush();
      return true;
    } catch (Exception retryFailure) {
      handleExceptionsThrownByWriter("flush", retryFailure);
    }
  }
  return false;
}
}
}