/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.cache.BloomFilterCache;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.snapshot.SnapshotLoader;
import org.apache.iotdb.db.engine.snapshot.SnapshotTaker;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
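/**
 * State machine that binds a consensus group to a {@link DataRegion}: it applies committed write
 * requests to the region, serves data queries, and takes/loads snapshots.
 *
 * <p>A minimal usage sketch (the consensus layer normally drives these calls; the request
 * variable is hypothetical):
 *
 * <pre>{@code
 * DataRegionStateMachine stateMachine = new DataRegionStateMachine(region);
 * stateMachine.start();
 * TSStatus status = stateMachine.write(request); // request is an IConsensusRequest
 * }</pre>
 */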
public class DataRegionStateMachine extends BaseStateMachine {
private static final Logger logger = LoggerFactory.getLogger(DataRegionStateMachine.class);
private static final FragmentInstanceManager QUERY_INSTANCE_MANAGER =
FragmentInstanceManager.getInstance();
private DataRegion region;
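// Maximum number of pending SyncLog batches cached per source peer; once the cache is full, the
// batch with the smallest startSyncIndex is force-written.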
private static final int MAX_REQUEST_CACHE_SIZE = 5;
private static final long CACHE_WINDOW_TIME_IN_MS =
IoTDBDescriptor.getInstance().getConfig().getCacheWindowTimeInMs();
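// One ordering queue per source peer (leader), keyed by the peer id.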
private final ConcurrentHashMap<String, SyncLogCacheQueue> cacheQueueMap;
public DataRegionStateMachine(DataRegion region) {
this.region = region;
this.cacheQueueMap = new ConcurrentHashMap<>();
}
@Override
public void start() {}
@Override
public void stop() {}
@Override
public boolean isReadOnly() {
return CommonDescriptor.getInstance().getConfig().isReadOnly();
}
@Override
public boolean takeSnapshot(File snapshotDir) {
try {
return new SnapshotTaker(region).takeFullSnapshot(snapshotDir.getAbsolutePath(), true);
} catch (Exception e) {
logger.error(
"Exception occurs when taking snapshot for {}-{} in {}",
region.getStorageGroupName(),
region.getDataRegionId(),
snapshotDir,
e);
return false;
}
}
@Override
public void loadSnapshot(File latestSnapshotRootDir) {
DataRegion newRegion =
new SnapshotLoader(
latestSnapshotRootDir.getAbsolutePath(),
region.getStorageGroupName(),
region.getDataRegionId())
.loadSnapshotForStateMachine();
if (newRegion == null) {
logger.error("Fail to load snapshot from {}", latestSnapshotRootDir);
return;
}
this.region = newRegion;
try {
StorageEngineV2.getInstance()
.setDataRegion(new DataRegionId(Integer.parseInt(region.getDataRegionId())), region);
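// Clear file-level caches so that entries referring to the replaced region's files cannot be
// served after the snapshot is loaded.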
ChunkCache.getInstance().clear();
TimeSeriesMetadataCache.getInstance().clear();
BloomFilterCache.getInstance().clear();
} catch (Exception e) {
logger.error("Exception occurs when replacing data region in storage engine.", e);
}
}
/**
 * This class caches the SyncLog batches of MultiLeader consensus so that the write order on a
 * follower stays the same as on the leader, while the deserialization of PlanNodes can still
 * proceed concurrently across threads.
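 *
 * <p>Illustrative scenario (the indexes are hypothetical): if batch [5, 9] from a peer arrives
 * before batch [0, 4], the thread carrying [5, 9] blocks in {@code cacheAndInsertLatestNode}
 * until [0, 4] has been written and {@code nextSyncIndex} has advanced to 5.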
*/
private class SyncLogCacheQueue {
private final String sourcePeerId;
private final Lock queueLock = new ReentrantLock();
private final Condition queueSortCondition = queueLock.newCondition();
private final PriorityQueue<InsertNodeWrapper> requestCache;
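// The next sync index this queue expects to write; -1 means unknown (e.g., right after startup
// or after an exception during SyncLog).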
private long nextSyncIndex = -1;
// NOTE: queueSize and timeout are currently unused; the class-level constants
// MAX_REQUEST_CACHE_SIZE and CACHE_WINDOW_TIME_IN_MS are used instead.
public SyncLogCacheQueue(String sourcePeerId, int queueSize, long timeout) {
this.sourcePeerId = sourcePeerId;
this.requestCache = new PriorityQueue<>();
}
/**
 * Cache the given batch and write it only when it becomes the next expected batch (i.e., its
 * startSyncIndex equals nextSyncIndex), blocking otherwise. This keeps the follower's write
 * order identical to the leader's while PlanNode deserialization remains concurrent.
*/
private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper insertNodeWrapper) {
queueLock.lock();
try {
requestCache.add(insertNodeWrapper);
// If the queue is full and the peek is not held by the current thread, notify the
// thread holding the peek so that it can process its request
if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
&& requestCache.peek().getStartSyncIndex() != insertNodeWrapper.getStartSyncIndex()) {
queueSortCondition.signalAll();
}
while (true) {
// If the current request is the next target request, write it
if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
requestCache.remove(insertNodeWrapper);
nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
break;
}
// If no write thread holds the request matching nextSyncIndex and the heap is full,
// write the peek request. This keeps the overall write path correct when nextSyncIndex
// is not set. We do not persist nextSyncIndex, to reduce complexity.
// nextSyncIndex may be unset in the following cases:
// 1. the system has just started
// 2. an exception occurred during SyncLog
if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
&& requestCache.peek().getStartSyncIndex() == insertNodeWrapper.getStartSyncIndex()) {
requestCache.remove();
nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
break;
}
try {
boolean timeout =
!queueSortCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
if (timeout) {
// Although the timeout has been triggered, the current thread cannot write its
// request unless it holds the peek request; some other thread must be holding the
// peek. In that scenario the current thread awaits again and waits until its
// request becomes the peek request
if (requestCache.peek().getStartSyncIndex()
== insertNodeWrapper.getStartSyncIndex()) {
// The current thread holds the peek request, so it can write the peek immediately.
logger.info(
"Waiting for the target request timed out. current index: {}, target index: {}",
insertNodeWrapper.getStartSyncIndex(),
nextSyncIndex);
requestCache.remove(insertNodeWrapper);
break;
}
}
} catch (InterruptedException e) {
logger.warn(
"The current wait was interrupted. SyncIndex: {}.",
insertNodeWrapper.getStartSyncIndex(),
e);
Thread.currentThread().interrupt();
}
}
logger.debug(
"source = {}, region = {}, queue size {}, startSyncIndex = {}, endSyncIndex = {}",
sourcePeerId,
region.getDataRegionId(),
requestCache.size(),
insertNodeWrapper.getStartSyncIndex(),
insertNodeWrapper.getEndSyncIndex());
List<TSStatus> subStatus = new LinkedList<>();
for (PlanNode planNode : insertNodeWrapper.getInsertNodes()) {
subStatus.add(write(planNode));
}
queueSortCondition.signalAll();
return new TSStatus().setSubStatus(subStatus);
} finally {
queueLock.unlock();
}
}
}
private static class InsertNodeWrapper implements Comparable<InsertNodeWrapper> {
private final long startSyncIndex;
private final long endSyncIndex;
private final List<PlanNode> insertNodes;
public InsertNodeWrapper(long startSyncIndex, long endSyncIndex) {
this.startSyncIndex = startSyncIndex;
this.endSyncIndex = endSyncIndex;
this.insertNodes = new LinkedList<>();
}
@Override
public int compareTo(InsertNodeWrapper o) {
return Long.compare(startSyncIndex, o.startSyncIndex);
}
public void add(PlanNode insertNode) {
this.insertNodes.add(insertNode);
}
public long getStartSyncIndex() {
return startSyncIndex;
}
public long getEndSyncIndex() {
return endSyncIndex;
}
public List<PlanNode> getInsertNodes() {
return insertNodes;
}
}
private InsertNodeWrapper deserializeAndWrap(BatchIndexedConsensusRequest batchRequest) {
InsertNodeWrapper insertNodeWrapper =
new InsertNodeWrapper(batchRequest.getStartSyncIndex(), batchRequest.getEndSyncIndex());
for (IndexedConsensusRequest indexedRequest : batchRequest.getRequests()) {
insertNodeWrapper.add(grabInsertNode(indexedRequest));
}
return insertNodeWrapper;
}
private PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
for (IConsensusRequest req : indexedRequest.getRequests()) {
// PlanNode in IndexedConsensusRequest should always be InsertNode
PlanNode planNode = getPlanNode(req);
if (planNode instanceof InsertNode) {
InsertNode innerNode = (InsertNode) planNode;
innerNode.setSearchIndex(indexedRequest.getSearchIndex());
insertNodes.add(innerNode);
} else if (indexedRequest.getRequests().size() == 1) {
// If the planNode is not InsertNode, it is expected that the IndexedConsensusRequest only
// contains one request
return planNode;
} else {
throw new IllegalArgumentException(
"PlanNodes in IndexedConsensusRequest are not InsertNode and the size of requests are larger than 1");
}
}
return mergeInsertNodes(insertNodes);
}
@Override
public List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
try {
return new SnapshotLoader(
latestSnapshotRootDir.getAbsolutePath(),
region.getStorageGroupName(),
region.getDataRegionId())
.getSnapshotFileInfo().stream().map(File::toPath).collect(Collectors.toList());
} catch (IOException e) {
logger.error(
"Meets error when getting snapshot files for {}-{}",
region.getStorageGroupName(),
region.getDataRegionId(),
e);
return null;
}
}
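/**
 * Apply a committed consensus request to the data region. An IndexedConsensusRequest is
 * deserialized into a (merged) InsertNode; a BatchIndexedConsensusRequest is routed through the
 * per-peer SyncLogCacheQueue so that batches are written in sync-index order; any other request
 * is deserialized directly into a PlanNode.
 */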
@Override
public TSStatus write(IConsensusRequest request) {
PlanNode planNode;
try {
if (request instanceof IndexedConsensusRequest) {
IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) request;
planNode = grabInsertNode(indexedRequest);
} else if (request instanceof BatchIndexedConsensusRequest) {
InsertNodeWrapper insertNodeWrapper =
deserializeAndWrap((BatchIndexedConsensusRequest) request);
String sourcePeerId = ((BatchIndexedConsensusRequest) request).getSourcePeerId();
return cacheQueueMap
.computeIfAbsent(
sourcePeerId,
k -> new SyncLogCacheQueue(k, MAX_REQUEST_CACHE_SIZE, CACHE_WINDOW_TIME_IN_MS))
.cacheAndInsertLatestNode(insertNodeWrapper);
} else {
planNode = getPlanNode(request);
}
return write(planNode);
} catch (IllegalArgumentException e) {
logger.error(e.getMessage(), e);
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
}
/**
* Merge insert nodes sharing the same search index (e.g., tablet-100, tablet-100, tablet-100
* will be merged into one multi-tablet node). <br>
* Notice: the upper layer must guarantee that insert nodes sharing the same search index are
* contiguous.
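*
* <p>For example (the nodes here are hypothetical): three InsertTabletNodes merge into one
* InsertMultiTabletsNode; InsertRowNodes merge into an InsertRowsOfOneDeviceNode when they all
* target the same device, and into an InsertRowsNode otherwise.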
*/
private InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
int size = insertNodes.size();
if (size == 0) {
throw new RuntimeException("failed to merge insert nodes: the node list is empty");
}
if (size == 1) {
return insertNodes.get(0);
}
InsertNode result;
if (insertNodes.get(0) instanceof InsertTabletNode) { // merge to InsertMultiTabletsNode
List<Integer> index = new ArrayList<>(size);
List<InsertTabletNode> insertTabletNodes = new ArrayList<>(size);
int i = 0;
for (InsertNode insertNode : insertNodes) {
insertTabletNodes.add((InsertTabletNode) insertNode);
index.add(i);
i++;
}
result =
new InsertMultiTabletsNode(insertNodes.get(0).getPlanNodeId(), index, insertTabletNodes);
} else { // merge to InsertRowsNode or InsertRowsOfOneDeviceNode
boolean sameDevice = true;
PartialPath device = insertNodes.get(0).getDevicePath();
List<Integer> index = new ArrayList<>(size);
List<InsertRowNode> insertRowNodes = new ArrayList<>(size);
int i = 0;
for (InsertNode insertNode : insertNodes) {
if (sameDevice && !insertNode.getDevicePath().equals(device)) {
sameDevice = false;
}
insertRowNodes.add((InsertRowNode) insertNode);
index.add(i);
i++;
}
result =
sameDevice
? new InsertRowsOfOneDeviceNode(
insertNodes.get(0).getPlanNodeId(), index, insertRowNodes)
: new InsertRowsNode(insertNodes.get(0).getPlanNodeId(), index, insertRowNodes);
}
result.setSearchIndex(insertNodes.get(0).getSearchIndex());
result.setDevicePath(insertNodes.get(0).getDevicePath());
return result;
}
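/** Apply a single PlanNode to the data region through a DataExecutionVisitor. */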
protected TSStatus write(PlanNode planNode) {
return planNode.accept(new DataExecutionVisitor(), region);
}
@Override
public DataSet read(IConsensusRequest request) {
if (request instanceof GetConsensusReqReaderPlan) {
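// The consensus layer requests the region's WAL node so that it can read back persisted
// consensus requests.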
return region.getWALNode();
} else {
FragmentInstance fragmentInstance;
try {
fragmentInstance = getFragmentInstance(request);
} catch (IllegalArgumentException e) {
logger.error(e.getMessage());
return null;
}
return QUERY_INSTANCE_MANAGER.execDataQueryFragmentInstance(fragmentInstance, region);
}
}
@Override
public boolean shouldRetry(TSStatus writeResult) {
// TODO implement this
return super.shouldRetry(writeResult);
}
@Override
public TSStatus updateResult(TSStatus previousResult, TSStatus retryResult) {
// TODO implement this
return super.updateResult(previousResult, retryResult);
}
@Override
public long getSleepTime() {
// TODO implement this
return super.getSleepTime();
}
}