blob: aaf807b3c5b406b97fa666c40f96005574372fd8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.service.thrift.impl;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFRegistrationService;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.db.auth.AuthorizerManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngineV2;
import org.apache.iotdb.db.engine.cache.BloomFilterCache;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUpdateType;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.mpp.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.service.DataNode;
import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.type.Gauge;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionRequest;
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStateReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceStateResp;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendPlanNodeResp;
import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateConfigNodeGroupReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TcreateTriggerInstanceReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import com.google.common.collect.ImmutableList;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface {
private static final Logger LOGGER =
    LoggerFactory.getLogger(DataNodeInternalRPCServiceImpl.class);

// Singleton engines shared by every RPC handler in this service.
private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
private final StorageEngineV2 storageEngine = StorageEngineV2.getInstance();

// Owns region lifecycle (create/delete) and plan-node execution on this DataNode.
private final DataNodeRegionManager regionManager;
/** Builds the service; the region manager is backed by the singleton engines above. */
public DataNodeInternalRPCServiceImpl() {
  super();
  regionManager = new DataNodeRegionManager(schemaEngine, storageEngine);
}
/**
 * Deserializes and executes a query {@code FragmentInstance} on the consensus group named in
 * the request. All failures (deserialization or execution) are reported back in the response
 * object rather than thrown to the RPC layer.
 */
@Override
public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
  LOGGER.info("receive FragmentInstance to group[{}]", req.getConsensusGroupId());

  // deserialize ConsensusGroupId
  ConsensusGroupId groupId;
  try {
    groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
  } catch (Throwable t) {
    LOGGER.error("Deserialize ConsensusGroupId failed. ", t);
    TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
    resp.setMessage("Deserialize ConsensusGroupId failed: " + t.getMessage());
    return resp;
  }

  // We deserialize here instead of the underlying state machine because parallelism is possible
  // here but not at the underlying state machine
  FragmentInstance fragmentInstance;
  try {
    fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body);
  } catch (Throwable t) {
    LOGGER.error("Deserialize FragmentInstance failed.", t);
    TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
    resp.setMessage("Deserialize FragmentInstance failed: " + t.getMessage());
    return resp;
  }

  // execute fragment instance in state machine
  // SetThreadName tags the executing thread with the instance id for log correlation.
  ConsensusReadResponse readResponse;
  try (SetThreadName threadName = new SetThreadName(fragmentInstance.getId().getFullId())) {
    // Route the read to the data- or schema-region consensus layer based on the group type.
    if (groupId instanceof DataRegionId) {
      readResponse = DataRegionConsensusImpl.getInstance().read(groupId, fragmentInstance);
    } else {
      readResponse = SchemaRegionConsensusImpl.getInstance().read(groupId, fragmentInstance);
    }
    TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
    if (!readResponse.isSuccess()) {
      LOGGER.error(
          "Execute FragmentInstance in ConsensusGroup {} failed.",
          req.getConsensusGroupId(),
          readResponse.getException());
      resp.setAccepted(false);
      resp.setMessage(
          "Execute FragmentInstance failed: "
              + (readResponse.getException() == null
                  ? ""
                  : readResponse.getException().getMessage()));
    } else {
      // The consensus layer succeeded; acceptance now depends on the instance's own state.
      FragmentInstanceInfo info = (FragmentInstanceInfo) readResponse.getDataset();
      resp.setAccepted(!info.getState().isFailed());
      resp.setMessage(info.getMessage());
    }
    return resp;
  } catch (Throwable t) {
    LOGGER.error(
        "Execute FragmentInstance in ConsensusGroup {} failed.", req.getConsensusGroupId(), t);
    TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
    resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
    return resp;
  }
}
/**
 * Deserializes a write {@code PlanNode} and executes it on the target region via the region
 * manager.
 *
 * <p>NOTE(review): unlike {@link #sendFragmentInstance}, deserialization failures here are not
 * caught and will propagate to the RPC layer — confirm whether this asymmetry is intended.
 */
@Override
public TSendPlanNodeResp sendPlanNode(TSendPlanNodeReq req) {
  LOGGER.info("receive PlanNode to group[{}]", req.getConsensusGroupId());
  ConsensusGroupId groupId =
      ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
  PlanNode planNode = PlanNodeType.deserialize(req.planNode.body);
  return regionManager.executePlanNode(groupId, planNode);
}
/**
 * Reports the current state of a fragment instance; answers {@code NO_SUCH_INSTANCE} when the
 * instance is unknown to this node.
 */
@Override
public TFragmentInstanceStateResp fetchFragmentInstanceState(TFetchFragmentInstanceStateReq req) {
  FragmentInstanceInfo instanceInfo =
      FragmentInstanceManager.getInstance()
          .getInstanceInfo(FragmentInstanceId.fromThrift(req.fragmentInstanceId));
  // Guard clause: unknown instance id.
  if (instanceInfo == null) {
    return new TFragmentInstanceStateResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString());
  }
  TFragmentInstanceStateResp stateResp =
      new TFragmentInstanceStateResp(instanceInfo.getState().toString());
  stateResp.setFailedMessages(ImmutableList.of(instanceInfo.getMessage()));
  return stateResp;
}
/** Cancels every fragment instance belonging to the given query, then acknowledges. */
@Override
public TCancelResp cancelQuery(TCancelQueryReq req) {
  // Tag the worker thread with the query id for log correlation while cancelling.
  try (SetThreadName ignored = new SetThreadName(req.getQueryId())) {
    req.getFragmentInstanceIds().stream()
        .map(FragmentInstanceId::fromThrift)
        .forEach(taskId -> FragmentInstanceManager.getInstance().cancelTask(taskId));
    return new TCancelResp(true);
  }
}
/** Not implemented yet; callers receive a NotImplementedException via the RPC layer. */
@Override
public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) {
  throw new NotImplementedException();
}
/** Not implemented yet; callers receive a NotImplementedException via the RPC layer. */
@Override
public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) {
  throw new NotImplementedException();
}
/** Not supported on this endpoint; schema fetch goes through a different code path. */
@Override
public TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req) {
  throw new UnsupportedOperationException();
}
/**
 * Receives one piece of a TsFile during a cluster load-TsFile operation and hands it to the
 * storage engine of the target data region.
 *
 * @param req carries the load session uuid, the target consensus group, and the serialized
 *     {@link LoadTsFilePieceNode}
 * @return accepted resp on success; NODE_DESERIALIZE_ERROR when the piece node cannot be decoded
 */
@Override
public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req) throws TException {
  // Parameterized logging instead of eager String.format (message only built if INFO enabled).
  LOGGER.info("Receive load node from uuid {}.", req.uuid);

  ConsensusGroupId groupId =
      ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
  LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) PlanNodeType.deserialize(req.body);
  if (pieceNode == null) {
    return createTLoadResp(new TSStatus(TSStatusCode.NODE_DESERIALIZE_ERROR.getStatusCode()));
  }
  // Use the shared storageEngine field, consistent with the other handlers in this class.
  TSStatus resultStatus =
      storageEngine.writeLoadTsFileNode((DataRegionId) groupId, pieceNode, req.uuid);
  return createTLoadResp(resultStatus);
}
/**
 * Executes a load-phase command (e.g. commit/rollback of a load session) identified by the
 * ordinal {@code req.commandType}.
 *
 * <p>NOTE(review): an out-of-range {@code commandType} throws ArrayIndexOutOfBoundsException —
 * confirm the sender always validates it.
 */
@Override
public TLoadResp sendLoadCommand(TLoadCommandReq req) throws TException {
  // Use the shared storageEngine field, consistent with the other handlers in this class.
  TSStatus resultStatus =
      storageEngine.executeLoadCommand(
          LoadTsFileScheduler.LoadCommand.values()[req.commandType], req.uuid);
  return createTLoadResp(resultStatus);
}
/**
 * Converts a storage-engine status into a thrift {@code TLoadResp}; message and status are only
 * attached on failure.
 */
private TLoadResp createTLoadResp(TSStatus resultStatus) {
  final boolean accepted = RpcUtils.SUCCESS_STATUS.equals(resultStatus);
  final TLoadResp response = new TLoadResp(accepted);
  if (accepted) {
    return response;
  }
  response.setMessage(resultStatus.getMessage());
  response.setStatus(resultStatus);
  return response;
}
/** Creates a schema region replica on this node; delegates to the region manager. */
@Override
public TSStatus createSchemaRegion(TCreateSchemaRegionReq req) {
  return regionManager.createSchemaRegion(req.getRegionReplicaSet(), req.getStorageGroup());
}
/** Creates a data region replica on this node with the given TTL; delegates to the region manager. */
@Override
public TSStatus createDataRegion(TCreateDataRegionReq req) {
  return regionManager.createDataRegion(
      req.getRegionReplicaSet(), req.getStorageGroup(), req.getTtl());
}
/** Drops the whole local partition cache (req currently carries no finer-grained scope). */
@Override
public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) {
  ClusterPartitionFetcher.getInstance().invalidAllCache();
  return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
/** Clears the whole local schema cache (req currently carries no finer-grained scope). */
@Override
public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
  DataNodeSchemaCache.getInstance().cleanUp();
  return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
/**
 * First phase of distributed timeseries deletion: marks matched timeseries as pre-deleted
 * (black-listed) in every targeted schema region.
 *
 * <p>On full success the returned status message carries the total number of pre-deleted
 * timeseries (summed from the per-region status messages); otherwise the aggregated failure
 * statuses are returned.
 */
@Override
public TSStatus constructSchemaBlackList(TConstructSchemaBlackListReq req) throws TException {
  PathPatternTree patternTree =
      PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
  List<TSStatus> failureList = new ArrayList<>();
  TSStatus status;
  int preDeletedNum = 0;
  for (TConsensusGroupId consensusGroupId : req.getSchemaRegionIdList()) {
    // Restrict the pattern tree to the storage group served by this schema region.
    String storageGroup =
        schemaEngine
            .getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
            .getStorageGroupFullPath();
    PathPatternTree filteredPatternTree = filterPathPatternTree(patternTree, storageGroup);
    if (filteredPatternTree.isEmpty()) {
      continue;
    }
    status =
        regionManager.executeSchemaPlanNode(
            new SchemaRegionId(consensusGroupId.getId()),
            new ConstructSchemaBlackListNode(new PlanNodeId(""), filteredPatternTree));
    if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      // By contract the success message holds the per-region pre-deleted count.
      preDeletedNum += Integer.parseInt(status.getMessage());
    } else {
      failureList.add(status);
    }
  }

  if (!failureList.isEmpty()) {
    return RpcUtils.getStatus(failureList);
  }

  return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, String.valueOf(preDeletedNum));
}
/**
 * Rolls back the pre-delete marking done by {@link #constructSchemaBlackList} in every targeted
 * schema region; aggregates per-region failures into one status.
 */
@Override
public TSStatus rollbackSchemaBlackList(TRollbackSchemaBlackListReq req) throws TException {
  PathPatternTree patternTree =
      PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
  List<TSStatus> failureList = new ArrayList<>();
  TSStatus status;
  for (TConsensusGroupId consensusGroupId : req.getSchemaRegionIdList()) {
    // Restrict the pattern tree to the storage group served by this schema region.
    String storageGroup =
        schemaEngine
            .getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
            .getStorageGroupFullPath();
    PathPatternTree filteredPatternTree = filterPathPatternTree(patternTree, storageGroup);
    if (filteredPatternTree.isEmpty()) {
      continue;
    }
    status =
        regionManager.executeSchemaPlanNode(
            new SchemaRegionId(consensusGroupId.getId()),
            new RollbackSchemaBlackListNode(new PlanNodeId(""), filteredPatternTree));
    if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      failureList.add(status);
    }
  }

  if (!failureList.isEmpty()) {
    return RpcUtils.getStatus(failureList);
  }

  return RpcUtils.SUCCESS_STATUS;
}
/**
 * Invalidates every cached schema entry matching the request's pattern tree, under the cache's
 * write lock so concurrent readers never observe a partially purged cache.
 */
@Override
public TSStatus invalidateMatchedSchemaCache(TInvalidateMatchedSchemaCacheReq req)
    throws TException {
  final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance();
  final List<PartialPath> patterns =
      PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree())).getAllPathPatterns();
  schemaCache.takeWriteLock();
  try {
    patterns.forEach(schemaCache::invalidateMatchedSchema);
  } finally {
    schemaCache.releaseWriteLock();
  }
  return RpcUtils.SUCCESS_STATUS;
}
/**
 * Collects, from every targeted schema region, the full paths of black-listed (pre-deleted)
 * timeseries matching the request's pattern tree, and returns them as a serialized
 * {@code PathPatternTree}. Stops and reports the error on the first failing region.
 */
@Override
public TFetchSchemaBlackListResp fetchSchemaBlackList(TFetchSchemaBlackListReq req)
    throws TException {
  PathPatternTree patternTree = PathPatternTree.deserialize(req.pathPatternTree);
  TFetchSchemaBlackListResp resp = new TFetchSchemaBlackListResp();
  PathPatternTree result = new PathPatternTree();
  for (TConsensusGroupId consensusGroupId : req.getSchemaRegionIdList()) {
    // todo implement as consensus layer read request
    try {
      ISchemaRegion schemaRegion =
          schemaEngine.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()));
      // Restrict the pattern tree to this region's storage group before querying it.
      PathPatternTree filteredPatternTree =
          filterPathPatternTree(patternTree, schemaRegion.getStorageGroupFullPath());
      if (filteredPatternTree.isEmpty()) {
        continue;
      }
      for (PartialPath path : schemaRegion.fetchSchemaBlackList(filteredPatternTree)) {
        result.appendFullPath(path);
      }
    } catch (MetadataException e) {
      // Fail fast: report the first region error and abandon the partial result.
      LOGGER.error(e.getMessage(), e);
      resp.setStatus(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
      return resp;
    }
  }
  resp.setStatus(RpcUtils.SUCCESS_STATUS);
  ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
  DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
  result.constructTree();
  try {
    result.serialize(dataOutputStream);
  } catch (IOException ignored) {
    // won't reach here: ByteArrayOutputStream never throws IOException
  }
  resp.setPathPatternTree(outputStream.toByteArray());
  return resp;
}
/**
 * Second phase of distributed timeseries deletion: removes all data (full time range) of the
 * matched timeseries in every targeted data region; aggregates per-region failures.
 */
@Override
public TSStatus deleteDataForDeleteTimeSeries(TDeleteDataForDeleteTimeSeriesReq req)
    throws TException {
  PathPatternTree patternTree =
      PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
  List<PartialPath> pathList = patternTree.getAllPathPatterns();
  List<TSStatus> failureList = new ArrayList<>();
  TSStatus status;
  for (TConsensusGroupId consensusGroupId : req.getDataRegionIdList()) {
    // Long.MIN_VALUE..Long.MAX_VALUE deletes data across the entire time range.
    status =
        regionManager.executeDeleteDataForDeleteTimeSeries(
            new DataRegionId(consensusGroupId.getId()),
            new DeleteDataNode(new PlanNodeId(""), pathList, Long.MIN_VALUE, Long.MAX_VALUE));
    if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      failureList.add(status);
    }
  }

  if (!failureList.isEmpty()) {
    return RpcUtils.getStatus(failureList);
  }

  return RpcUtils.SUCCESS_STATUS;
}
/**
 * Final phase of distributed timeseries deletion: removes the matched (previously black-listed)
 * timeseries from every targeted schema region; aggregates per-region failures.
 */
@Override
public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) throws TException {
  PathPatternTree patternTree =
      PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
  List<TSStatus> failureList = new ArrayList<>();
  TSStatus status;
  for (TConsensusGroupId consensusGroupId : req.getSchemaRegionIdList()) {
    // Restrict the pattern tree to the storage group served by this schema region.
    String storageGroup =
        schemaEngine
            .getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
            .getStorageGroupFullPath();
    PathPatternTree filteredPatternTree = filterPathPatternTree(patternTree, storageGroup);
    if (filteredPatternTree.isEmpty()) {
      continue;
    }
    status =
        regionManager.executeSchemaPlanNode(
            new SchemaRegionId(consensusGroupId.getId()),
            new DeleteTimeSeriesNode(new PlanNodeId(""), filteredPatternTree));
    if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      failureList.add(status);
    }
  }

  if (!failureList.isEmpty()) {
    return RpcUtils.getStatus(failureList);
  }

  return RpcUtils.SUCCESS_STATUS;
}
/**
 * Intersects {@code patternTree} with the scope of one storage group: returns a new tree holding
 * only the patterns that overlap {@code storageGroup.**}. The result may be empty.
 *
 * @param patternTree the full pattern tree from the request
 * @param storageGroup full path of the storage group served by the target schema region
 * @return the filtered (possibly empty) pattern tree
 */
private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, String storageGroup) {
  PathPatternTree filteredPatternTree = new PathPatternTree();
  try {
    // storageGroup.** matches every path under the storage group.
    PartialPath storageGroupPattern =
        new PartialPath(storageGroup).concatNode(MULTI_LEVEL_PATH_WILDCARD);
    for (PartialPath pathPattern : patternTree.getOverlappedPathPatterns(storageGroupPattern)) {
      filteredPatternTree.appendPathPattern(pathPattern);
    }
    filteredPatternTree.constructTree();
  } catch (IllegalPathException e) {
    // The storage group name comes from the schema engine and should always be a legal path,
    // so this branch should be unreachable — but log instead of silently swallowing in case
    // that assumption ever breaks.
    LOGGER.warn("Illegal path when filtering pattern tree by storage group {}", storageGroup, e);
  }
  return filteredPatternTree;
}
/**
 * Answers a ConfigNode heartbeat: echoes the timestamp, reports node status, and optionally
 * attaches per-region leadership info and CPU/memory load samples.
 */
@Override
public THeartbeatResp getDataNodeHeartBeat(THeartbeatReq req) throws TException {
  THeartbeatResp resp = new THeartbeatResp();
  resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp());
  resp.setStatus(CommonDescriptor.getInstance().getConfig().getNodeStatus().getStatus());

  // Judging leader if necessary
  if (req.isNeedJudgeLeader()) {
    resp.setJudgedLeaders(getJudgedLeaders());
  }

  // Sampling load if necessary; only meaningful when the metric module is enabled.
  if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()
      && req.isNeedSamplingLoad()) {
    long cpuLoad =
        MetricService.getInstance()
            .getOrCreateGauge(
                Metric.SYS_CPU_LOAD.toString(), MetricLevel.CORE, Tag.NAME.toString(), "system")
            .value();
    // A zero gauge value means "no sample yet"; skip it rather than report 0% load.
    if (cpuLoad != 0) {
      resp.setCpu((short) cpuLoad);
    }
    long usedMemory = getMemory("jvm.memory.used.bytes");
    long maxMemory = getMemory("jvm.memory.max.bytes");
    if (usedMemory != 0 && maxMemory != 0) {
      // Memory usage is reported as an integer percentage.
      resp.setMemory((short) (usedMemory * 100 / maxMemory));
    }
  }
  return resp;
}
/** Pushes the latest region routing table into the partition fetcher's local cache. */
@Override
public TSStatus updateRegionCache(TRegionRouteReq req) throws TException {
  return ClusterPartitionFetcher.getInstance().updateRegionCache(req)
      ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
      : RpcUtils.getStatus(TSStatusCode.CACHE_UPDATE_FAIL);
}
/**
 * Reports, for every consensus group hosted on this node, whether this node is currently the
 * group's leader. A consensus implementation may be null before it is started.
 */
private Map<TConsensusGroupId, Boolean> getJudgedLeaders() {
  final Map<TConsensusGroupId, Boolean> leaderMap = new HashMap<>();

  if (DataRegionConsensusImpl.getInstance() != null) {
    for (ConsensusGroupId groupId :
        DataRegionConsensusImpl.getInstance().getAllConsensusGroupIds()) {
      leaderMap.put(
          groupId.convertToTConsensusGroupId(),
          DataRegionConsensusImpl.getInstance().isLeader(groupId));
    }
  }

  if (SchemaRegionConsensusImpl.getInstance() != null) {
    for (ConsensusGroupId groupId :
        SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIds()) {
      leaderMap.put(
          groupId.convertToTConsensusGroupId(),
          SchemaRegionConsensusImpl.getInstance().isLeader(groupId));
    }
  }

  return leaderMap;
}
/**
 * Sums the given JVM memory gauge (e.g. {@code jvm.memory.used.bytes}) over the known heap and
 * non-heap memory pools.
 *
 * @param gaugeName metric name to read per memory pool
 * @return the summed value, or 0 when any metric access fails
 */
private long getMemory(String gaugeName) {
  long result = 0;
  try {
    // Pool names below are those of the ParallelScavenge collector. The original list contained
    // "PS Old Eden" (no such pool) and the mis-cased "Ps Survivor Space", so old-gen and
    // survivor space were never counted; fixed to the real pool names.
    // NOTE(review): other collectors (G1, ZGC, ...) use different pool names and will still
    // report 0 here — confirm whether that is acceptable.
    List<String> heapIds = Arrays.asList("PS Eden Space", "PS Old Gen", "PS Survivor Space");
    List<String> noHeapIds = Arrays.asList("Code Cache", "Compressed Class Space", "Metaspace");

    for (String id : heapIds) {
      Gauge gauge =
          MetricService.getInstance()
              .getOrCreateGauge(gaugeName, MetricLevel.IMPORTANT, "id", id, "area", "heap");
      result += gauge.value();
    }
    for (String id : noHeapIds) {
      Gauge gauge =
          MetricService.getInstance()
              .getOrCreateGauge(gaugeName, MetricLevel.IMPORTANT, "id", id, "area", "noheap");
      result += gauge.value();
    }
  } catch (Exception e) {
    // Log the throwable itself so the stack trace is preserved, not just the message.
    LOGGER.error("Failed to get memory from metric module", e);
    return 0;
  }
  return result;
}
/** Drops cached authorization info for the given user/role; errors when nothing was evicted. */
@Override
public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req) {
  boolean invalidated =
      AuthorizerManager.getInstance().invalidateCache(req.getUsername(), req.getRoleName());
  return invalidated
      ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
      : RpcUtils.getStatus(TSStatusCode.INVALIDATE_PERMISSION_CACHE_ERROR);
}
/** Triggers a full compaction (merge) across the storage engine. */
@Override
public TSStatus merge() throws TException {
  try {
    storageEngine.mergeAll();
  } catch (StorageEngineException e) {
    return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
  }
  return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
/** Flushes memtables to disk as described by the request (scope decided by the engine). */
@Override
public TSStatus flush(TFlushReq req) throws TException {
  return storageEngine.operateFlush(req);
}
/** Clears all three read caches: chunk, timeseries-metadata, and bloom-filter. */
@Override
public TSStatus clearCache() throws TException {
  ChunkCache.getInstance().clear();
  TimeSeriesMetadataCache.getInstance().clear();
  BloomFilterCache.getInstance().clear();
  return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
/** Re-reads hot-modifiable configuration properties from disk. */
@Override
public TSStatus loadConfiguration() throws TException {
  try {
    IoTDBDescriptor.getInstance().loadHotModifiedProps();
  } catch (QueryProcessException e) {
    return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
  }
  return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
/**
 * Sets this node's status from its string form (e.g. "Running", "ReadOnly"); an unparsable
 * value is reported as an error status rather than thrown.
 */
@Override
public TSStatus setSystemStatus(String status) throws TException {
  try {
    CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.parse(status));
  } catch (Exception e) {
    return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
  }
  return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
/** Applies a storage-group TTL change; delegates to the storage engine. */
@Override
public TSStatus setTTL(TSetTTLReq req) throws TException {
  return storageEngine.setTTL(req);
}
/**
 * Refreshes the locally cached list of ConfigNode internal endpoints. A null location list is
 * ignored (treated as "no update").
 */
@Override
public TSStatus updateConfigNodeGroup(TUpdateConfigNodeGroupReq req) {
  List<TConfigNodeLocation> configNodeLocations = req.getConfigNodeLocations();
  if (configNodeLocations != null) {
    // A sequential stream suffices: the config-node list is tiny, and parallelStream() would
    // only add fork-join overhead for a handful of elements.
    ConfigNodeInfo.getInstance()
        .updateConfigNodeList(
            configNodeLocations.stream()
                .map(TConfigNodeLocation::getInternalEndPoint)
                .collect(Collectors.toList()));
  }
  return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
/**
 * Applies a schema-template set-info change pushed from the ConfigNode.
 *
 * <p>NOTE(review): an unrecognized update type falls through the switch and still returns
 * SUCCESS — confirm that is intended if new types are added.
 */
@Override
public TSStatus updateTemplate(TUpdateTemplateReq req) throws TException {
  switch (TemplateInternalRPCUpdateType.getType(req.type)) {
    case ADD_TEMPLATE_SET_INFO:
      ClusterTemplateManager.getInstance().updateTemplateSetInfo(req.getTemplateInfo());
      break;
    case INVALIDATE_TEMPLATE_SET_INFO:
      ClusterTemplateManager.getInstance().invalidateTemplateSetInfo(req.getTemplateInfo());
      break;
  }
  return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
/**
 * Removes this node's replica of the given region: first leaves the consensus group, then
 * deletes the local region storage via the region manager.
 *
 * <p>A PeerNotInConsensusGroupException from deletePeer is tolerated: the peer is already gone,
 * so local deletion should proceed anyway.
 */
@Override
public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
  ConsensusGroupId consensusGroupId =
      ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId);
  if (consensusGroupId instanceof DataRegionId) {
    ConsensusGenericResponse response =
        DataRegionConsensusImpl.getInstance().deletePeer(consensusGroupId);
    if (!response.isSuccess()
        && !(response.getException() instanceof PeerNotInConsensusGroupException)) {
      return RpcUtils.getStatus(
          TSStatusCode.DELETE_REGION_ERROR, response.getException().getMessage());
    }
    return regionManager.deleteDataRegion((DataRegionId) consensusGroupId);
  } else {
    ConsensusGenericResponse response =
        SchemaRegionConsensusImpl.getInstance().deletePeer(consensusGroupId);
    if (!response.isSuccess()
        && !(response.getException() instanceof PeerNotInConsensusGroupException)) {
      return RpcUtils.getStatus(
          TSStatusCode.DELETE_REGION_ERROR, response.getException().getMessage());
    }
    return regionManager.deleteSchemaRegion((SchemaRegionId) consensusGroupId);
  }
}
/**
 * Transfers leadership of the given region to the requested node, but only when this node is
 * currently the region's leader; otherwise this is a successful no-op.
 *
 * <p>NOTE(review): every sibling RPC handler carries {@code @Override} but this one does not —
 * confirm whether it is part of {@code IDataNodeRPCService.Iface} and annotate accordingly.
 */
public TSStatus changeRegionLeader(TRegionLeaderChangeReq req) throws TException {
  TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  TConsensusGroupId tgId = req.getRegionId();
  ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tgId);
  // Resolve the new leader's consensus endpoint for this region type.
  TEndPoint newNode = getConsensusEndPoint(req.getNewLeaderNode(), regionId);
  Peer newLeaderPeer = new Peer(regionId, newNode);
  if (!isLeader(regionId)) {
    LOGGER.info("region {} is not leader, no need to change leader", regionId);
    return status;
  }
  LOGGER.info("region {} is leader, will change leader", regionId);
  return transferLeader(regionId, newLeaderPeer);
}
/**
 * Asks the appropriate consensus implementation (data or schema) to transfer leadership of
 * {@code regionId} to {@code newLeaderPeer}.
 *
 * @return SUCCESS with a confirmation message, or REGION_LEADER_CHANGE_FAILED with the cause
 */
private TSStatus transferLeader(ConsensusGroupId regionId, Peer newLeaderPeer) {
  TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  ConsensusGenericResponse resp;
  if (regionId instanceof DataRegionId) {
    resp = DataRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
  } else if (regionId instanceof SchemaRegionId) {
    resp = SchemaRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
  } else {
    // Neither a data nor a schema region id: nothing to transfer.
    status.setCode(TSStatusCode.REGION_LEADER_CHANGE_FAILED.getStatusCode());
    status.setMessage("Error Region type. region: " + regionId);
    return status;
  }
  if (!resp.isSuccess()) {
    LOGGER.error("change region {} leader failed", regionId, resp.getException());
    status.setCode(TSStatusCode.REGION_LEADER_CHANGE_FAILED.getStatusCode());
    status.setMessage(resp.getException().getMessage());
    return status;
  }
  status.setMessage("change region " + regionId + " leader succeed");
  return status;
}
/**
 * Returns whether this node is the consensus leader of the given region; unknown region types
 * are logged and treated as "not leader".
 */
private boolean isLeader(ConsensusGroupId regionId) {
  if (regionId instanceof DataRegionId) {
    return DataRegionConsensusImpl.getInstance().isLeader(regionId);
  } else if (regionId instanceof SchemaRegionId) {
    return SchemaRegionConsensusImpl.getInstance().isLeader(regionId);
  } else {
    LOGGER.error("region {} type is illegal", regionId);
    return false;
  }
}
/**
 * Creates a new replica of an existing region on this node (used during region migration):
 * first creates the local region storage, then joins the consensus group with the given peers.
 */
@Override
public TSStatus createNewRegionPeer(TCreatePeerReq req) throws TException {
  ConsensusGroupId regionId =
      ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
  // Resolve each replica location to its consensus endpoint for this region type.
  List<Peer> peers =
      req.getRegionLocations().stream()
          .map(location -> new Peer(regionId, getConsensusEndPoint(location, regionId)))
          .collect(Collectors.toList());
  TSStatus status = createNewRegion(regionId, req.getStorageGroup(), req.getTtl());
  if (!isSucceed(status)) {
    return status;
  }
  return createNewRegionPeer(regionId, peers);
}
/**
 * Submits an asynchronous task that adds the destination node as a new peer of the given
 * region's consensus group; only the submission result is reported here.
 */
@Override
public TSStatus addRegionPeer(TMaintainPeerReq req) throws TException {
  final TConsensusGroupId regionId = req.getRegionId();
  final String destDataNodeIp = req.getDestNode().getInternalEndPoint().getIp();
  final TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  // Guard clause: submission failure.
  if (!RegionMigrateService.getInstance().submitAddRegionPeerTask(req)) {
    status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
    status.setMessage("Submit addRegionPeer task failed, region: " + regionId);
    return status;
  }
  LOGGER.info(
      "Successfully submit addRegionPeer task for region: {} on DataNode: {}",
      regionId,
      destDataNodeIp);
  return status;
}
/**
 * Submits an asynchronous task that removes the destination node from the given region's
 * consensus group; only the submission result is reported here.
 */
@Override
public TSStatus removeRegionPeer(TMaintainPeerReq req) throws TException {
  TConsensusGroupId regionId = req.getRegionId();
  String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
  boolean submitSucceed = RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
  TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  if (submitSucceed) {
    LOGGER.info(
        "Successfully submit removeRegionPeer task for region: {} on DataNode: {}",
        regionId,
        selectedDataNodeIP);
    return status;
  }
  status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
  status.setMessage("Submit removeRegionPeer task failed, region: " + regionId);
  return status;
}
/**
 * Submits an asynchronous task that deletes the old (migrated-away) replica of the given
 * region; only the submission result is reported here.
 */
@Override
public TSStatus deleteOldRegionPeer(TMaintainPeerReq req) throws TException {
  TConsensusGroupId regionId = req.getRegionId();
  String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
  boolean submitSucceed = RegionMigrateService.getInstance().submitDeleteOldRegionPeerTask(req);
  TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  if (submitSucceed) {
    LOGGER.info(
        "Successfully submit deleteOldRegionPeer task for region: {} on DataNode: {}",
        regionId,
        selectedDataNodeIP);
    return status;
  }
  status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
  status.setMessage("Submit deleteOldRegionPeer task failed, region: " + regionId);
  return status;
}
/** Creates the local storage for a new region replica; thin delegate to the region manager. */
private TSStatus createNewRegion(ConsensusGroupId regionId, String storageGroup, long ttl) {
  return regionManager.createNewRegion(regionId, storageGroup, ttl);
}
/**
 * Registers a UDF on this node from the given class name and jar URIs; failures are reported
 * as an error status carrying the exception message.
 */
@Override
public TSStatus createFunction(TCreateFunctionRequest request) {
  try {
    UDFRegistrationService.getInstance()
        .register(
            request.getUdfName(),
            request.getClassName(),
            request.getUris(),
            UDFExecutableManager.getInstance(),
            true);
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  } catch (Exception e) {
    return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
        .setMessage(e.getMessage());
  }
}
/**
 * Deregisters the named UDF from the local {@link UDFRegistrationService}.
 *
 * @param request carries the name of the UDF to drop
 * @return SUCCESS_STATUS on successful deregistration; EXECUTE_STATEMENT_ERROR with the
 *     exception message otherwise
 */
@Override
public TSStatus dropFunction(TDropFunctionRequest request) {
  try {
    UDFRegistrationService.getInstance().deregister(request.getUdfName());
  } catch (Exception e) {
    return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
        .setMessage(e.getMessage());
  }
  return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
/**
 * Creates a local trigger instance from the serialized trigger information in the request.
 *
 * <p>The trigger is created in the INACTIVE state, its jar is written to the trigger lib
 * directory, and it is then registered with the {@link TriggerManagementService}. ConfigNodes
 * take responsibility for synchronization control across the cluster.
 *
 * @param req carries the serialized trigger information and the jar file bytes
 * @return SUCCESS_STATUS when the instance was created; EXECUTE_STATEMENT_ERROR with the
 *     exception message otherwise
 */
@Override
public TSStatus createTriggerInstance(TcreateTriggerInstanceReq req) throws TException {
  TriggerInformation info = TriggerInformation.deserialize(req.triggerInformation);
  try {
    // New instances always start INACTIVE; activation happens via activeTriggerInstance.
    info.setTriggerState(TTriggerState.INACTIVE);
    TriggerExecutableManager.getInstance().writeToLibDir(req.jarFile, info.getJarName());
    TriggerManagementService.getInstance().register(info);
    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  } catch (Exception e) {
    LOGGER.warn(
        "Error occurred when creating trigger instance for trigger: {}. The cause is {}.",
        info.getTriggerName(),
        e.getMessage());
    return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
        .setMessage(e.getMessage());
  }
}
/**
 * Activates the named trigger instance via the {@link TriggerManagementService}.
 *
 * @param req carries the name of the trigger to activate
 * @return SUCCESS_STATUS on successful activation; EXECUTE_STATEMENT_ERROR with the
 *     exception message otherwise
 */
@Override
public TSStatus activeTriggerInstance(TactiveTriggerInstanceReq req) throws TException {
  try {
    TriggerManagementService.getInstance().activeTrigger(req.triggerName);
  } catch (Exception e) {
    // Fix: previous message was truncated ("Error occurred during ") and dropped both the
    // trigger name and the cause; pass the exception last so SLF4J logs its stack trace.
    LOGGER.error("Error occurred when activating trigger instance: {}", req.triggerName, e);
    return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
        .setMessage(e.getMessage());
  }
  return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
// NOTE(review): not implemented yet — unconditionally reports success regardless of
// whether any trigger instance was actually dropped; confirm callers tolerate this.
@Override
public TSStatus dropTriggerInstance(TDropTriggerInstanceReq req) throws TException {
// todo: implementation
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
/**
 * Selects the consensus endpoint of the given node that matches the region's kind:
 * the data-region endpoint for a {@link DataRegionId}, the schema-region endpoint otherwise.
 */
private TEndPoint getConsensusEndPoint(
    TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
  return regionId instanceof DataRegionId
      ? nodeLocation.getDataRegionConsensusEndPoint()
      : nodeLocation.getSchemaRegionConsensusEndPoint();
}
/** Returns {@code true} iff the status carries the SUCCESS_STATUS code. */
private boolean isSucceed(TSStatus status) {
  return TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode();
}
/**
 * Creates a local consensus peer for the given region, joining the supplied peer list.
 * Dispatches to the data- or schema-region consensus implementation based on the region kind.
 *
 * @param regionId the consensus group to create a peer for
 * @param peers the full peer list of the target consensus group
 * @return SUCCESS_STATUS with a success message, or REGION_MIGRATE_FAILED carrying the
 *     consensus layer's exception message
 */
private TSStatus createNewRegionPeer(ConsensusGroupId regionId, List<Peer> peers) {
  LOGGER.info("Start to createNewRegionPeer {} to region {}", peers, regionId);
  TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  ConsensusGenericResponse resp;
  if (regionId instanceof DataRegionId) {
    resp = DataRegionConsensusImpl.getInstance().createPeer(regionId, peers);
  } else {
    resp = SchemaRegionConsensusImpl.getInstance().createPeer(regionId, peers);
  }
  if (!resp.isSuccess()) {
    // Fix: the message pattern ended with a dangling "errorMessage" token and no {}
    // placeholder; pass the exception as the final argument so SLF4J logs its stack trace.
    LOGGER.error(
        "CreateNewRegionPeer error, peers: {}, regionId: {}",
        peers,
        regionId,
        resp.getException());
    status.setCode(TSStatusCode.REGION_MIGRATE_FAILED.getStatusCode());
    status.setMessage(resp.getException().getMessage());
    return status;
  }
  LOGGER.info("Succeed to createNewRegionPeer {} for region {}", peers, regionId);
  status.setMessage("createNewRegionPeer succeed, regionId: " + regionId);
  return status;
}
/**
 * Disables this DataNode by invalidating its local caches so stale partition and schema
 * information is no longer served.
 *
 * @param req the disable request (logged for traceability)
 * @return always SUCCESS_STATUS
 */
@Override
public TSStatus disableDataNode(TDisableDataNodeReq req) throws TException {
  LOGGER.info("start disable data node in the request: {}", req);
  // TODO what need to clean?
  ClusterPartitionFetcher.getInstance().invalidAllCache();
  DataNodeSchemaCache.getInstance().cleanUp();
  TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  status.setMessage("disable datanode succeed");
  return status;
}
/**
 * Stops this DataNode. A watchdog thread force-exits the process 20 seconds later because
 * the DataNode process cannot always exit normally due to InterruptedException raised
 * during shutdown.
 *
 * @return SUCCESS_STATUS when {@code DataNode.stop()} completes; DATANODE_STOP_ERROR with
 *     the exception message otherwise (the process still exits via the watchdog)
 */
@Override
public TSStatus stopDataNode() {
  TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
  LOGGER.info("Execute stopDataNode RPC method");
  // Watchdog: kill the DataNode process 20 seconds later regardless of shutdown outcome.
  new Thread(
          () -> {
            try {
              TimeUnit.SECONDS.sleep(20);
            } catch (InterruptedException e) {
              LOGGER.error("Meets InterruptedException in stopDataNode RPC method");
              // Fix: restore the interrupt flag instead of swallowing it.
              Thread.currentThread().interrupt();
            } finally {
              LOGGER.info("Executing system.exit(0) in stopDataNode RPC method after 20 seconds");
              System.exit(0);
            }
          })
      .start();
  try {
    DataNode.getInstance().stop();
    status.setMessage("stop datanode succeed");
  } catch (Exception e) {
    LOGGER.error("stop Data Node error", e);
    status.setCode(TSStatusCode.DATANODE_STOP_ERROR.getStatusCode());
    status.setMessage(e.getMessage());
  }
  return status;
}
// Intentionally empty: this service keeps no per-connection state to clean up on client exit.
public void handleClientExit() {}
}