blob: a46158aafb0b5a6d034bbcf9f70cd58d768eafa0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.protocol.thrift.impl;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.common.rpc.thrift.TSetThrottleQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSettleReq;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.udf.UDFInformation;
import org.apache.iotdb.commons.udf.service.UDFManagementService;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.InternalClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.protocol.thrift.OperationType;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.execution.executor.RegionExecutionResult;
import org.apache.iotdb.db.queryengine.execution.executor.RegionReadExecutor;
import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceFailureInfo;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.queryengine.execution.operator.schema.source.ISchemaSource;
import org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.binary.GreaterEqualExpression;
import org.apache.iotdb.db.queryengine.plan.expression.binary.LessThanExpression;
import org.apache.iotdb.db.queryengine.plan.expression.binary.LogicAndExpression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.PreDeactivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.RollbackPreDeactivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.ConstructLogicalViewBlackListNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
import org.apache.iotdb.db.queryengine.plan.statement.component.WhereCondition;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.ITimeSeriesSchemaInfo;
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.reader.ISchemaReader;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUpdateType;
import org.apache.iotdb.db.service.DataNode;
import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.compaction.repair.RepairTaskStatus;
import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleTaskManager;
import org.apache.iotdb.db.storageengine.dataregion.compaction.settle.SettleRequestHandler;
import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
import org.apache.iotdb.db.trigger.executor.TriggerFireResult;
import org.apache.iotdb.db.trigger.service.TriggerManagementService;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.metrics.type.AutoGauge;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.SystemMetric;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TAlterViewReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelPlanFragmentReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.iotdb.mpp.rpc.thrift.TCancelResp;
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckSchemaRegionUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCheckTimeSeriesExistenceResp;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructSchemaBlackListWithTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TConstructViewSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateFunctionInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TDeactivateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteDataForDeleteSchemaReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.mpp.rpc.thrift.TDeleteViewSchemaReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropFunctionInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TExecuteCQ;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceInfoReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerReq;
import org.apache.iotdb.mpp.rpc.thrift.TFireTriggerResp;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceInfoResp;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TLoadSample;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaRespExceptionMessage;
import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushMultiTopicMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
import org.apache.iotdb.mpp.rpc.thrift.TPushSingleConsumerGroupMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushSinglePipeMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushSingleTopicMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackSchemaBlackListWithTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TRollbackViewSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchRequest;
import org.apache.iotdb.mpp.rpc.thrift.TSchemaFetchResponse;
import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
import org.apache.iotdb.trigger.api.enums.TriggerEvent;
import com.google.common.collect.ImmutableList;
import org.apache.thrift.TException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.NotImplementedException;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.record.Tablet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.service.RegionMigrateService.REGION_MIGRATE_PROCESS;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface {
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeInternalRPCServiceImpl.class);
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
private static final Coordinator COORDINATOR = Coordinator.getInstance();
private final IPartitionFetcher partitionFetcher;
private final ISchemaFetcher schemaFetcher;
private final SchemaEngine schemaEngine = SchemaEngine.getInstance();
private final StorageEngine storageEngine = StorageEngine.getInstance();
private final DataNodeRegionManager regionManager = DataNodeRegionManager.getInstance();
private final DataNodeSpaceQuotaManager spaceQuotaManager =
DataNodeSpaceQuotaManager.getInstance();
private final DataNodeThrottleQuotaManager throttleQuotaManager =
DataNodeThrottleQuotaManager.getInstance();
private final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
private static final String SYSTEM = "system";
public DataNodeInternalRPCServiceImpl() {
super();
partitionFetcher = ClusterPartitionFetcher.getInstance();
schemaFetcher = ClusterSchemaFetcher.getInstance();
}
@Override
public TSendFragmentInstanceResp sendFragmentInstance(TSendFragmentInstanceReq req) {
LOGGER.debug("receive FragmentInstance to group[{}]", req.getConsensusGroupId());
// Deserialize ConsensusGroupId
ConsensusGroupId groupId = null;
if (req.consensusGroupId != null) {
try {
groupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
} catch (Exception e) {
LOGGER.warn("Deserialize ConsensusGroupId failed. ", e);
TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
resp.setMessage("Deserialize ConsensusGroupId failed: " + e.getMessage());
return resp;
}
}
// We deserialize here instead of the underlying state machine because parallelism is possible
// here but not at the underlying state machine
FragmentInstance fragmentInstance;
try {
fragmentInstance = FragmentInstance.deserializeFrom(req.fragmentInstance.body);
} catch (Exception e) {
LOGGER.warn("Deserialize FragmentInstance failed.", e);
TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp(false);
resp.setMessage("Deserialize FragmentInstance failed: " + e.getMessage());
return resp;
}
RegionReadExecutor executor = new RegionReadExecutor();
RegionExecutionResult executionResult =
groupId == null
? executor.execute(fragmentInstance)
: executor.execute(groupId, fragmentInstance);
TSendFragmentInstanceResp resp = new TSendFragmentInstanceResp();
resp.setAccepted(executionResult.isAccepted());
resp.setMessage(executionResult.getMessage());
resp.setNeedRetry(executionResult.isNeedRetry());
return resp;
}
@Override
public TSendBatchPlanNodeResp sendBatchPlanNode(TSendBatchPlanNodeReq req) {
List<TSendSinglePlanNodeResp> responses =
req.getRequests().stream()
.map(
request -> {
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(
request.getConsensusGroupId());
PlanNode planNode = PlanNodeType.deserialize(request.planNode.body);
RegionWriteExecutor executor = new RegionWriteExecutor();
TSendSinglePlanNodeResp resp = new TSendSinglePlanNodeResp();
RegionExecutionResult executionResult = executor.execute(groupId, planNode);
resp.setAccepted(executionResult.isAccepted());
resp.setMessage(executionResult.getMessage());
resp.setStatus(executionResult.getStatus());
return resp;
})
.collect(Collectors.toList());
return new TSendBatchPlanNodeResp(responses);
}
@Override
public TFragmentInstanceInfoResp fetchFragmentInstanceInfo(TFetchFragmentInstanceInfoReq req) {
FragmentInstanceId instanceId = FragmentInstanceId.fromThrift(req.fragmentInstanceId);
FragmentInstanceInfo info = FragmentInstanceManager.getInstance().getInstanceInfo(instanceId);
if (info != null) {
TFragmentInstanceInfoResp resp = new TFragmentInstanceInfoResp(info.getState().toString());
resp.setEndTime(info.getEndTime());
resp.setFailedMessages(ImmutableList.of(info.getMessage()));
try {
List<ByteBuffer> failureInfoList = new ArrayList<>();
for (FragmentInstanceFailureInfo failureInfo : info.getFailureInfoList()) {
failureInfoList.add(failureInfo.serialize());
}
resp.setFailureInfoList(failureInfoList);
return resp;
} catch (IOException e) {
return resp;
}
} else {
return new TFragmentInstanceInfoResp(FragmentInstanceState.NO_SUCH_INSTANCE.toString());
}
}
@Override
public TCancelResp cancelQuery(TCancelQueryReq req) {
try (SetThreadName threadName = new SetThreadName(req.getQueryId())) {
List<FragmentInstanceId> taskIds =
req.getFragmentInstanceIds().stream()
.map(FragmentInstanceId::fromThrift)
.collect(Collectors.toList());
for (FragmentInstanceId taskId : taskIds) {
FragmentInstanceManager.getInstance().cancelTask(taskId, req.hasThrowable);
}
return new TCancelResp(true);
}
}
@Override
public TCancelResp cancelPlanFragment(TCancelPlanFragmentReq req) {
throw new NotImplementedException();
}
@Override
public TCancelResp cancelFragmentInstance(TCancelFragmentInstanceReq req) {
throw new NotImplementedException();
}
@Override
public TSchemaFetchResponse fetchSchema(TSchemaFetchRequest req) {
throw new UnsupportedOperationException();
}
@Override
public TLoadResp sendTsFilePieceNode(TTsFilePieceReq req) {
LOGGER.info("Receive load node from uuid {}.", req.uuid);
ConsensusGroupId groupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) PlanNodeType.deserialize(req.body);
if (pieceNode == null) {
return createTLoadResp(
new TSStatus(TSStatusCode.DESERIALIZE_PIECE_OF_TSFILE_ERROR.getStatusCode()));
}
TSStatus resultStatus =
StorageEngine.getInstance()
.writeLoadTsFileNode((DataRegionId) groupId, pieceNode, req.uuid);
return createTLoadResp(resultStatus);
}
@Override
public TLoadResp sendLoadCommand(TLoadCommandReq req) {
final ProgressIndex progressIndex;
if (req.isSetProgressIndex()) {
progressIndex = ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()));
} else {
// fallback to use local generated progress index for compatibility
progressIndex = PipeAgent.runtime().getNextProgressIndexForTsFileLoad();
LOGGER.info(
"Use local generated load progress index {} for uuid {}.", progressIndex, req.uuid);
}
return createTLoadResp(
StorageEngine.getInstance()
.executeLoadCommand(
LoadTsFileScheduler.LoadCommand.values()[req.commandType],
req.uuid,
req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
progressIndex));
}
private TLoadResp createTLoadResp(TSStatus resultStatus) {
boolean isAccepted = RpcUtils.SUCCESS_STATUS.equals(resultStatus);
TLoadResp loadResp = new TLoadResp(isAccepted);
if (!isAccepted) {
loadResp.setMessage(resultStatus.getMessage());
loadResp.setStatus(resultStatus);
}
return loadResp;
}
@Override
public TSStatus createSchemaRegion(TCreateSchemaRegionReq req) {
return regionManager.createSchemaRegion(req.getRegionReplicaSet(), req.getStorageGroup());
}
@Override
public TSStatus createDataRegion(TCreateDataRegionReq req) {
return regionManager.createDataRegion(
req.getRegionReplicaSet(), req.getStorageGroup(), req.getTtl());
}
@Override
public TSStatus invalidatePartitionCache(TInvalidateCacheReq req) {
ClusterPartitionFetcher.getInstance().invalidAllCache();
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
public TSStatus invalidateSchemaCache(TInvalidateCacheReq req) {
DataNodeSchemaCache.getInstance().takeWriteLock();
try {
// req.getFullPath() is a database path
DataNodeSchemaCache.getInstance().invalidate(req.getFullPath());
ClusterTemplateManager.getInstance().invalid(req.getFullPath());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} finally {
DataNodeSchemaCache.getInstance().releaseWriteLock();
}
}
@Override
public TSStatus constructSchemaBlackList(TConstructSchemaBlackListReq req) throws TException {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
AtomicInteger preDeletedNum = new AtomicInteger(0);
TSStatus executionResult =
executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
String storageGroup =
schemaEngine
.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
.getDatabaseFullPath();
PathPatternTree filteredPatternTree =
filterPathPatternTree(patternTree, storageGroup);
if (filteredPatternTree.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
}
RegionWriteExecutor executor = new RegionWriteExecutor();
TSStatus status =
executor
.execute(
new SchemaRegionId(consensusGroupId.getId()),
new ConstructSchemaBlackListNode(new PlanNodeId(""), filteredPatternTree))
.getStatus();
if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
preDeletedNum.getAndAdd(Integer.parseInt(status.getMessage()));
}
return status;
});
executionResult.setMessage(String.valueOf(preDeletedNum.get()));
return executionResult;
}
@Override
public TSStatus rollbackSchemaBlackList(TRollbackSchemaBlackListReq req) {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
return executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
String storageGroup =
schemaEngine
.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
.getDatabaseFullPath();
PathPatternTree filteredPatternTree = filterPathPatternTree(patternTree, storageGroup);
if (filteredPatternTree.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
}
RegionWriteExecutor executor = new RegionWriteExecutor();
return executor
.execute(
new SchemaRegionId(consensusGroupId.getId()),
new RollbackSchemaBlackListNode(new PlanNodeId(""), filteredPatternTree))
.getStatus();
});
}
@Override
public TSStatus invalidateMatchedSchemaCache(TInvalidateMatchedSchemaCacheReq req) {
DataNodeSchemaCache cache = DataNodeSchemaCache.getInstance();
DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
cache.takeWriteLock();
try {
cache.invalidate(PathPatternTree.deserialize(req.pathPatternTree).getAllPathPatterns());
} finally {
cache.releaseWriteLock();
DataNodeSchemaLockManager.getInstance().releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
}
return RpcUtils.SUCCESS_STATUS;
}
@Override
public TFetchSchemaBlackListResp fetchSchemaBlackList(TFetchSchemaBlackListReq req) {
PathPatternTree patternTree = PathPatternTree.deserialize(req.pathPatternTree);
TFetchSchemaBlackListResp resp = new TFetchSchemaBlackListResp();
PathPatternTree result = new PathPatternTree();
for (TConsensusGroupId consensusGroupId : req.getSchemaRegionIdList()) {
// todo implement as consensus layer read request
try {
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()));
PathPatternTree filteredPatternTree =
filterPathPatternTree(patternTree, schemaRegion.getDatabaseFullPath());
if (filteredPatternTree.isEmpty()) {
continue;
}
for (PartialPath path : schemaRegion.fetchSchemaBlackList(filteredPatternTree)) {
result.appendFullPath(path);
}
} catch (MetadataException e) {
LOGGER.warn(e.getMessage(), e);
resp.setStatus(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
return resp;
}
}
resp.setStatus(RpcUtils.SUCCESS_STATUS);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
result.constructTree();
try {
result.serialize(dataOutputStream);
} catch (IOException ignored) {
// Won't reach here
}
resp.setPathPatternTree(outputStream.toByteArray());
return resp;
}
@Override
public TSStatus deleteDataForDeleteSchema(TDeleteDataForDeleteSchemaReq req) {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
List<PartialPath> pathList = patternTree.getAllPathPatterns();
return executeInternalSchemaTask(
req.getDataRegionIdList(),
consensusGroupId -> {
RegionWriteExecutor executor = new RegionWriteExecutor();
return executor
.execute(
new DataRegionId(consensusGroupId.getId()),
req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe()
? new PipeEnrichedDeleteDataNode(
new DeleteDataNode(
new PlanNodeId(""), pathList, Long.MIN_VALUE, Long.MAX_VALUE))
: new DeleteDataNode(
new PlanNodeId(""), pathList, Long.MIN_VALUE, Long.MAX_VALUE))
.getStatus();
});
}
@Override
public TSStatus deleteTimeSeries(TDeleteTimeSeriesReq req) throws TException {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
return executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
String storageGroup =
schemaEngine
.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
.getDatabaseFullPath();
PathPatternTree filteredPatternTree = filterPathPatternTree(patternTree, storageGroup);
if (filteredPatternTree.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
}
RegionWriteExecutor executor = new RegionWriteExecutor();
return executor
.execute(
new SchemaRegionId(consensusGroupId.getId()),
req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe()
? new PipeEnrichedNonWritePlanNode(
new DeleteTimeSeriesNode(new PlanNodeId(""), filteredPatternTree))
: new DeleteTimeSeriesNode(new PlanNodeId(""), filteredPatternTree))
.getStatus();
});
}
@Override
public TSStatus constructSchemaBlackListWithTemplate(TConstructSchemaBlackListWithTemplateReq req)
throws TException {
AtomicInteger preDeactivateTemplateNum = new AtomicInteger(0);
Map<PartialPath, List<Integer>> templateSetInfo =
transformTemplateSetInfo(req.getTemplateSetInfo());
TSStatus executionResult =
executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
Map<PartialPath, List<Integer>> filteredTemplateSetInfo =
filterTemplateSetInfo(templateSetInfo, consensusGroupId);
if (filteredTemplateSetInfo.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
}
RegionWriteExecutor executor = new RegionWriteExecutor();
TSStatus status =
executor
.execute(
new SchemaRegionId(consensusGroupId.getId()),
new PreDeactivateTemplateNode(
new PlanNodeId(""), filteredTemplateSetInfo))
.getStatus();
if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
preDeactivateTemplateNum.getAndAdd(Integer.parseInt(status.getMessage()));
}
return status;
});
executionResult.setMessage(String.valueOf(preDeactivateTemplateNum.get()));
return executionResult;
}
private Map<PartialPath, List<Integer>> transformTemplateSetInfo(
Map<String, List<Integer>> rawTemplateSetInfo) {
Map<PartialPath, List<Integer>> result = new HashMap<>();
rawTemplateSetInfo.forEach(
(k, v) -> {
try {
result.put(new PartialPath(k), v);
} catch (IllegalPathException ignored) {
// Won't reach here
}
});
return result;
}
private Map<PartialPath, List<Integer>> filterTemplateSetInfo(
Map<PartialPath, List<Integer>> templateSetInfo, TConsensusGroupId consensusGroupId) {
Map<PartialPath, List<Integer>> result = new HashMap<>();
PartialPath storageGroupPath = getStorageGroupPath(consensusGroupId);
if (null != storageGroupPath) {
PartialPath storageGroupPattern = storageGroupPath.concatNode(MULTI_LEVEL_PATH_WILDCARD);
templateSetInfo.forEach(
(k, v) -> {
if (storageGroupPattern.overlapWith(k) || storageGroupPath.overlapWith(k)) {
result.put(k, v);
}
});
}
return result;
}
private PartialPath getStorageGroupPath(TConsensusGroupId consensusGroupId) {
PartialPath storageGroupPath = null;
try {
storageGroupPath =
new PartialPath(
schemaEngine
.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
.getDatabaseFullPath());
} catch (IllegalPathException ignored) {
// Won't reach here
}
return storageGroupPath;
}
@Override
public TSStatus rollbackSchemaBlackListWithTemplate(TRollbackSchemaBlackListWithTemplateReq req) {
Map<PartialPath, List<Integer>> templateSetInfo =
transformTemplateSetInfo(req.getTemplateSetInfo());
return executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
Map<PartialPath, List<Integer>> filteredTemplateSetInfo =
filterTemplateSetInfo(templateSetInfo, consensusGroupId);
if (filteredTemplateSetInfo.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
}
RegionWriteExecutor executor = new RegionWriteExecutor();
return executor
.execute(
new SchemaRegionId(consensusGroupId.getId()),
new RollbackPreDeactivateTemplateNode(
new PlanNodeId(""), filteredTemplateSetInfo))
.getStatus();
});
}
@Override
public TSStatus deactivateTemplate(TDeactivateTemplateReq req) throws TException {
Map<PartialPath, List<Integer>> templateSetInfo =
transformTemplateSetInfo(req.getTemplateSetInfo());
return executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
Map<PartialPath, List<Integer>> filteredTemplateSetInfo =
filterTemplateSetInfo(templateSetInfo, consensusGroupId);
if (filteredTemplateSetInfo.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
}
RegionWriteExecutor executor = new RegionWriteExecutor();
return executor
.execute(
new SchemaRegionId(consensusGroupId.getId()),
req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe()
? new PipeEnrichedNonWritePlanNode(
new DeactivateTemplateNode(new PlanNodeId(""), filteredTemplateSetInfo))
: new DeactivateTemplateNode(new PlanNodeId(""), filteredTemplateSetInfo))
.getStatus();
});
}
@Override
public TCountPathsUsingTemplateResp countPathsUsingTemplate(TCountPathsUsingTemplateReq req) {
PathPatternTree patternTree = PathPatternTree.deserialize(req.patternTree);
TCountPathsUsingTemplateResp resp = new TCountPathsUsingTemplateResp();
AtomicLong result = new AtomicLong(0);
resp.setStatus(
executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
ReadWriteLock readWriteLock =
regionManager.getRegionLock(new SchemaRegionId(consensusGroupId.getId()));
// Count paths using template for unset template shall block all template activation
readWriteLock.writeLock().lock();
try {
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()));
PathPatternTree filteredPatternTree =
filterPathPatternTree(patternTree, schemaRegion.getDatabaseFullPath());
if (filteredPatternTree.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
}
result.getAndAdd(
schemaRegion.countPathsUsingTemplate(req.getTemplateId(), filteredPatternTree));
return RpcUtils.SUCCESS_STATUS;
} catch (MetadataException e) {
LOGGER.warn(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
} finally {
readWriteLock.writeLock().unlock();
}
}));
resp.setCount(result.get());
return resp;
}
@Override
public TCheckSchemaRegionUsingTemplateResp checkSchemaRegionUsingTemplate(
TCheckSchemaRegionUsingTemplateReq req) {
TCheckSchemaRegionUsingTemplateResp resp = new TCheckSchemaRegionUsingTemplateResp();
AtomicBoolean result = new AtomicBoolean(false);
resp.setStatus(
executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
ReadWriteLock readWriteLock =
regionManager.getRegionLock(new SchemaRegionId(consensusGroupId.getId()));
readWriteLock.writeLock().lock();
try {
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()));
if (schemaRegion.getSchemaRegionStatistics().getTemplateActivatedNumber() > 0) {
result.set(true);
}
return RpcUtils.SUCCESS_STATUS;
} finally {
readWriteLock.writeLock().unlock();
}
}));
resp.setResult(result.get());
return resp;
}
@Override
public TCheckTimeSeriesExistenceResp checkTimeSeriesExistence(TCheckTimeSeriesExistenceReq req) {
PathPatternTree patternTree = PathPatternTree.deserialize(req.patternTree);
TCheckTimeSeriesExistenceResp resp = new TCheckTimeSeriesExistenceResp();
TSStatus status =
executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
ReadWriteLock readWriteLock =
regionManager.getRegionLock(new SchemaRegionId(consensusGroupId.getId()));
// Check timeseries existence for set template shall block all timeseries creation
readWriteLock.writeLock().lock();
try {
ISchemaRegion schemaRegion =
schemaEngine.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()));
PathPatternTree filteredPatternTree =
filterPathPatternTree(patternTree, schemaRegion.getDatabaseFullPath());
if (filteredPatternTree.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
}
for (PartialPath pattern : filteredPatternTree.getAllPathPatterns()) {
ISchemaSource<ITimeSeriesSchemaInfo> schemaSource =
SchemaSourceFactory.getTimeSeriesSchemaCountSource(
pattern, false, null, null, SchemaConstant.ALL_MATCH_SCOPE);
try (ISchemaReader<ITimeSeriesSchemaInfo> schemaReader =
schemaSource.getSchemaReader(schemaRegion)) {
if (schemaReader.hasNext()) {
return RpcUtils.getStatus(TSStatusCode.TIMESERIES_ALREADY_EXIST);
}
} catch (Exception e) {
LOGGER.warn(e.getMessage(), e);
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
}
return RpcUtils.SUCCESS_STATUS;
} finally {
readWriteLock.writeLock().unlock();
}
});
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
resp.setStatus(RpcUtils.SUCCESS_STATUS);
resp.setExists(false);
} else if (status.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
boolean hasFailure = false;
for (TSStatus subStatus : status.getSubStatus()) {
if (subStatus.getCode() == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
resp.setExists(true);
} else if (subStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
hasFailure = true;
break;
}
}
if (hasFailure) {
resp.setStatus(status);
} else {
resp.setStatus(RpcUtils.SUCCESS_STATUS);
}
} else {
resp.setStatus(status);
}
return resp;
}
@Override
public TSStatus constructViewSchemaBlackList(TConstructViewSchemaBlackListReq req) {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
AtomicInteger preDeletedNum = new AtomicInteger(0);
TSStatus executionResult =
executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
String storageGroup =
schemaEngine
.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
.getDatabaseFullPath();
PathPatternTree filteredPatternTree =
filterPathPatternTree(patternTree, storageGroup);
if (filteredPatternTree.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
}
RegionWriteExecutor executor = new RegionWriteExecutor();
TSStatus status =
executor
.execute(
new SchemaRegionId(consensusGroupId.getId()),
new ConstructLogicalViewBlackListNode(
new PlanNodeId(""), filteredPatternTree))
.getStatus();
if (status.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
preDeletedNum.getAndAdd(Integer.parseInt(status.getMessage()));
}
return status;
});
executionResult.setMessage(String.valueOf(preDeletedNum.get()));
return executionResult;
}
@Override
public TSStatus rollbackViewSchemaBlackList(TRollbackViewSchemaBlackListReq req) {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
return executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
String storageGroup =
schemaEngine
.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
.getDatabaseFullPath();
PathPatternTree filteredPatternTree = filterPathPatternTree(patternTree, storageGroup);
if (filteredPatternTree.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
}
RegionWriteExecutor executor = new RegionWriteExecutor();
return executor
.execute(
new SchemaRegionId(consensusGroupId.getId()),
new RollbackLogicalViewBlackListNode(new PlanNodeId(""), filteredPatternTree))
.getStatus();
});
}
@Override
public TSStatus deleteViewSchema(TDeleteViewSchemaReq req) {
PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));
return executeInternalSchemaTask(
req.getSchemaRegionIdList(),
consensusGroupId -> {
String storageGroup =
schemaEngine
.getSchemaRegion(new SchemaRegionId(consensusGroupId.getId()))
.getDatabaseFullPath();
PathPatternTree filteredPatternTree = filterPathPatternTree(patternTree, storageGroup);
if (filteredPatternTree.isEmpty()) {
return RpcUtils.SUCCESS_STATUS;
}
RegionWriteExecutor executor = new RegionWriteExecutor();
return executor
.execute(
new SchemaRegionId(consensusGroupId.getId()),
req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe()
? new PipeEnrichedNonWritePlanNode(
new DeleteLogicalViewNode(new PlanNodeId(""), filteredPatternTree))
: new DeleteLogicalViewNode(new PlanNodeId(""), filteredPatternTree))
.getStatus();
});
}
@Override
public TSStatus alterView(TAlterViewReq req) {
List<TConsensusGroupId> consensusGroupIdList = req.getSchemaRegionIdList();
List<ByteBuffer> viewBinaryList = req.getViewBinaryList();
Map<TConsensusGroupId, Map<PartialPath, ViewExpression>> schemaRegionRequestMap =
new HashMap<>();
for (int i = 0; i < consensusGroupIdList.size(); i++) {
ByteBuffer byteBuffer = viewBinaryList.get(i);
int size = ReadWriteIOUtils.readInt(byteBuffer);
Map<PartialPath, ViewExpression> viewMap = new HashMap<>();
for (int j = 0; j < size; j++) {
viewMap.put(
(PartialPath) PathDeserializeUtil.deserialize(byteBuffer),
ViewExpression.deserialize(byteBuffer));
}
schemaRegionRequestMap.put(consensusGroupIdList.get(i), viewMap);
}
return executeInternalSchemaTask(
consensusGroupIdList,
consensusGroupId -> {
RegionWriteExecutor executor = new RegionWriteExecutor();
return executor
.execute(
new SchemaRegionId(consensusGroupId.getId()),
req.isSetIsGeneratedByPipe() && req.isIsGeneratedByPipe()
? new PipeEnrichedNonWritePlanNode(
new AlterLogicalViewNode(
new PlanNodeId(""), schemaRegionRequestMap.get(consensusGroupId)))
: new AlterLogicalViewNode(
new PlanNodeId(""), schemaRegionRequestMap.get(consensusGroupId)))
.getStatus();
});
}
@Override
public TPushPipeMetaResp pushPipeMeta(TPushPipeMetaReq req) {
final List<PipeMeta> pipeMetas = new ArrayList<>();
for (ByteBuffer byteBuffer : req.getPipeMetas()) {
pipeMetas.add(PipeMeta.deserialize(byteBuffer));
}
try {
List<TPushPipeMetaRespExceptionMessage> exceptionMessages =
PipeAgent.task().handlePipeMetaChanges(pipeMetas);
return exceptionMessages.isEmpty()
? new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
: new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(exceptionMessages);
} catch (Exception e) {
LOGGER.error("Error occurred when pushing pipe meta", e);
return new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()));
}
}
@Override
public TPushPipeMetaResp pushSinglePipeMeta(TPushSinglePipeMetaReq req) {
try {
TPushPipeMetaRespExceptionMessage exceptionMessage;
if (req.isSetPipeNameToDrop()) {
exceptionMessage = PipeAgent.task().handleDropPipe(req.getPipeNameToDrop());
} else if (req.isSetPipeMeta()) {
final PipeMeta pipeMeta = PipeMeta.deserialize(ByteBuffer.wrap(req.getPipeMeta()));
exceptionMessage = PipeAgent.task().handleSinglePipeMetaChanges(pipeMeta);
} else {
throw new Exception("Invalid TPushSinglePipeMetaReq");
}
return exceptionMessage == null
? new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
: new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(Collections.singletonList(exceptionMessage));
} catch (Exception e) {
LOGGER.error("Error occurred when pushing single pipe meta", e);
return new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()));
}
}
@Override
public TPushPipeMetaResp pushMultiPipeMeta(TPushMultiPipeMetaReq req) {
boolean hasException = false;
// If there is any exception, we use the size of exceptionMessages to record the fail index
List<TPushPipeMetaRespExceptionMessage> exceptionMessages = new ArrayList<>();
try {
if (req.isSetPipeNamesToDrop()) {
for (String pipeNameToDrop : req.getPipeNamesToDrop()) {
TPushPipeMetaRespExceptionMessage message =
PipeAgent.task().handleDropPipe(pipeNameToDrop);
exceptionMessages.add(message);
if (message != null) {
// If there is any exception, skip the remaining pipes
hasException = true;
break;
}
}
} else if (req.isSetPipeMetas()) {
for (ByteBuffer byteBuffer : req.getPipeMetas()) {
final PipeMeta pipeMeta = PipeMeta.deserialize(byteBuffer);
TPushPipeMetaRespExceptionMessage message =
PipeAgent.task().handleSinglePipeMetaChanges(pipeMeta);
exceptionMessages.add(message);
if (message != null) {
// If there is any exception, skip the remaining pipes
hasException = true;
break;
}
}
} else {
throw new Exception("Invalid TPushMultiPipeMetaReq");
}
return hasException
? new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(exceptionMessages)
: new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
} catch (Exception e) {
LOGGER.warn("Error occurred when pushing multi pipe meta", e);
return new TPushPipeMetaResp()
.setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(exceptionMessages);
}
}
@Override
public TPushTopicMetaResp pushTopicMeta(TPushTopicMetaReq req) {
final List<TopicMeta> topicMetas = new ArrayList<>();
for (ByteBuffer byteBuffer : req.getTopicMetas()) {
topicMetas.add(TopicMeta.deserialize(byteBuffer));
}
try {
TPushTopicMetaRespExceptionMessage exceptionMessage =
SubscriptionAgent.topic().handleTopicMetaChanges(topicMetas);
return exceptionMessage == null
? new TPushTopicMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
: new TPushTopicMetaResp()
.setStatus(new TSStatus(TSStatusCode.TOPIC_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(Collections.singletonList(exceptionMessage));
} catch (Exception e) {
LOGGER.warn("Error occurred when pushing topic meta", e);
return new TPushTopicMetaResp()
.setStatus(new TSStatus(TSStatusCode.TOPIC_PUSH_META_ERROR.getStatusCode()));
}
}
@Override
public TPushTopicMetaResp pushSingleTopicMeta(TPushSingleTopicMetaReq req) {
try {
final TPushTopicMetaRespExceptionMessage exceptionMessage;
if (req.isSetTopicNameToDrop()) {
exceptionMessage = SubscriptionAgent.topic().handleDropTopic(req.getTopicNameToDrop());
} else if (req.isSetTopicMeta()) {
exceptionMessage =
SubscriptionAgent.topic()
.handleSingleTopicMetaChanges(
TopicMeta.deserialize(ByteBuffer.wrap(req.getTopicMeta())));
} else {
throw new SubscriptionException("Invalid request " + req + " from config node.");
}
return exceptionMessage == null
? new TPushTopicMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
: new TPushTopicMetaResp()
.setStatus(new TSStatus(TSStatusCode.TOPIC_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(Collections.singletonList(exceptionMessage));
} catch (Exception e) {
LOGGER.warn("Error occurred when pushing single topic meta", e);
return new TPushTopicMetaResp()
.setStatus(new TSStatus(TSStatusCode.TOPIC_PUSH_META_ERROR.getStatusCode()));
}
}
@Override
public TPushTopicMetaResp pushMultiTopicMeta(TPushMultiTopicMetaReq req) {
boolean hasException = false;
// If there is any exception, we use the size of exceptionMessages to record the fail index
List<TPushTopicMetaRespExceptionMessage> exceptionMessages = new ArrayList<>();
try {
if (req.isSetTopicNamesToDrop()) {
for (String topicNameToDrop : req.getTopicNamesToDrop()) {
TPushTopicMetaRespExceptionMessage message =
SubscriptionAgent.topic().handleDropTopic(topicNameToDrop);
exceptionMessages.add(message);
if (message != null) {
// If there is any exception, skip the remaining topics
hasException = true;
break;
}
}
} else if (req.isSetTopicMetas()) {
for (ByteBuffer byteBuffer : req.getTopicMetas()) {
final TopicMeta topicMeta = TopicMeta.deserialize(byteBuffer);
TPushTopicMetaRespExceptionMessage message =
SubscriptionAgent.topic().handleSingleTopicMetaChanges(topicMeta);
exceptionMessages.add(message);
if (message != null) {
// If there is any exception, skip the remaining pipes
hasException = true;
break;
}
}
} else {
throw new Exception("Invalid TPushMultiTopicMetaReq");
}
return hasException
? new TPushTopicMetaResp()
.setStatus(new TSStatus(TSStatusCode.TOPIC_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(exceptionMessages)
: new TPushTopicMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
} catch (Exception e) {
LOGGER.warn("Error occurred when pushing multi topic meta", e);
return new TPushTopicMetaResp()
.setStatus(new TSStatus(TSStatusCode.TOPIC_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(exceptionMessages);
}
}
@Override
public TPushConsumerGroupMetaResp pushConsumerGroupMeta(TPushConsumerGroupMetaReq req) {
final List<ConsumerGroupMeta> consumerGroupMetas = new ArrayList<>();
for (ByteBuffer byteBuffer : req.getConsumerGroupMetas()) {
consumerGroupMetas.add(ConsumerGroupMeta.deserialize(byteBuffer));
}
try {
TPushConsumerGroupMetaRespExceptionMessage exceptionMessage =
SubscriptionAgent.consumer().handleConsumerGroupMetaChanges(consumerGroupMetas);
return exceptionMessage == null
? new TPushConsumerGroupMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
: new TPushConsumerGroupMetaResp()
.setStatus(new TSStatus(TSStatusCode.CONSUMER_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(Collections.singletonList(exceptionMessage));
} catch (Exception e) {
LOGGER.warn("Error occurred when pushing consumer group meta", e);
return new TPushConsumerGroupMetaResp()
.setStatus(new TSStatus(TSStatusCode.CONSUMER_PUSH_META_ERROR.getStatusCode()));
}
}
@Override
public TPushConsumerGroupMetaResp pushSingleConsumerGroupMeta(
TPushSingleConsumerGroupMetaReq req) {
try {
final TPushConsumerGroupMetaRespExceptionMessage exceptionMessage;
if (req.isSetConsumerGroupNameToDrop()) {
exceptionMessage =
SubscriptionAgent.consumer().handleDropConsumerGroup(req.getConsumerGroupNameToDrop());
} else if (req.isSetConsumerGroupMeta()) {
exceptionMessage =
SubscriptionAgent.consumer()
.handleSingleConsumerGroupMetaChanges(
ConsumerGroupMeta.deserialize(ByteBuffer.wrap(req.getConsumerGroupMeta())));
} else {
throw new SubscriptionException("Invalid request " + req + " from config node.");
}
return exceptionMessage == null
? new TPushConsumerGroupMetaResp()
.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()))
: new TPushConsumerGroupMetaResp()
.setStatus(new TSStatus(TSStatusCode.CONSUMER_PUSH_META_ERROR.getStatusCode()))
.setExceptionMessages(Collections.singletonList(exceptionMessage));
} catch (Exception e) {
LOGGER.warn("Error occurred when pushing single consumer group meta", e);
return new TPushConsumerGroupMetaResp()
.setStatus(new TSStatus(TSStatusCode.CONSUMER_PUSH_META_ERROR.getStatusCode()));
}
}
@Override
public TPipeHeartbeatResp pipeHeartbeat(TPipeHeartbeatReq req) throws TException {
final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
PipeAgent.task().collectPipeMetaList(req, resp);
return resp;
}
private TSStatus executeInternalSchemaTask(
List<TConsensusGroupId> consensusGroupIdList,
Function<TConsensusGroupId, TSStatus> executeOnOneRegion) {
List<TSStatus> statusList = new ArrayList<>();
TSStatus status;
boolean hasFailure = false;
for (TConsensusGroupId consensusGroupId : consensusGroupIdList) {
status = executeOnOneRegion.apply(consensusGroupId);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
hasFailure = true;
}
statusList.add(status);
}
if (hasFailure) {
return RpcUtils.getStatus(statusList);
} else {
return RpcUtils.SUCCESS_STATUS;
}
}
@Override
public TSStatus executeCQ(TExecuteCQ req) {
IClientSession session = new InternalClientSession(req.cqId);
SESSION_MANAGER.registerSession(session);
SESSION_MANAGER.supplySession(
session, req.getUsername(), ZoneId.of(req.getZoneId()), ClientVersion.V_1_0);
String executedSQL = req.queryBody;
try {
QueryStatement s =
(QueryStatement) StatementGenerator.createStatement(req.queryBody, session.getZoneId());
if (s == null) {
return RpcUtils.getStatus(
TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported");
}
// 1. Add time filter in where
Expression timeFilter =
new LogicAndExpression(
new GreaterEqualExpression(
new TimestampOperand(),
new ConstantOperand(TSDataType.INT64, String.valueOf(req.startTime))),
new LessThanExpression(
new TimestampOperand(),
new ConstantOperand(TSDataType.INT64, String.valueOf(req.endTime))));
if (s.getWhereCondition() != null) {
s.getWhereCondition()
.setPredicate(new LogicAndExpression(timeFilter, s.getWhereCondition().getPredicate()));
} else {
s.setWhereCondition(new WhereCondition(timeFilter));
}
// 2. Add time range in group by time
if (s.getGroupByTimeComponent() != null) {
s.getGroupByTimeComponent().setStartTime(req.startTime);
s.getGroupByTimeComponent().setEndTime(req.endTime);
s.getGroupByTimeComponent().setLeftCRightO(true);
}
executedSQL = String.join(" ", s.constructFormattedSQL().split("\n")).replaceAll(" +", " ");
long queryId =
SESSION_MANAGER.requestQueryId(session, SESSION_MANAGER.requestStatementId(session));
// Create and cache dataset
ExecutionResult result =
COORDINATOR.executeForTreeModel(
s,
queryId,
SESSION_MANAGER.getSessionInfo(session),
executedSQL,
partitionFetcher,
schemaFetcher,
req.getTimeout());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
return result.status;
}
IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
if (queryExecution != null) {
// Consume up all the result
while (true) {
Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult();
if (!optionalTsBlock.isPresent()) {
break;
}
}
}
return result.status;
}
} catch (Exception e) {
// TODO call the coordinator to release query resource
return onQueryException(e, "\"" + executedSQL + "\". " + OperationType.EXECUTE_STATEMENT);
} finally {
SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution);
SESSION_MANAGER.removeCurrSession();
}
}
@Override
public TSStatus setSpaceQuota(TSetSpaceQuotaReq req) throws TException {
return spaceQuotaManager.setSpaceQuota(req);
}
@Override
public TSStatus setThrottleQuota(TSetThrottleQuotaReq req) throws TException {
return throttleQuotaManager.setThrottleQuota(req);
}
@Override
public TFetchFragmentInstanceStatisticsResp fetchFragmentInstanceStatistics(
TFetchFragmentInstanceStatisticsReq req) throws TException {
FragmentInstanceManager fragmentInstanceManager = FragmentInstanceManager.getInstance();
TFetchFragmentInstanceStatisticsResp resp;
try {
resp =
fragmentInstanceManager.getFragmentInstanceStatistics(
FragmentInstanceId.fromThrift(req.getFragmentInstanceId()));
resp.setStatus(RpcUtils.SUCCESS_STATUS);
} catch (Exception e) {
resp = new TFetchFragmentInstanceStatisticsResp();
resp.setStatus(RpcUtils.getStatus(TSStatusCode.EXPLAIN_ANALYZE_FETCH_ERROR, e.getMessage()));
LOGGER.error(e.getMessage());
}
return resp;
}
private PathPatternTree filterPathPatternTree(PathPatternTree patternTree, String storageGroup) {
PathPatternTree filteredPatternTree = new PathPatternTree();
try {
PartialPath storageGroupPattern =
new PartialPath(storageGroup).concatNode(MULTI_LEVEL_PATH_WILDCARD);
for (PartialPath pathPattern : patternTree.getOverlappedPathPatterns(storageGroupPattern)) {
filteredPatternTree.appendPathPattern(pathPattern);
}
filteredPatternTree.constructTree();
} catch (IllegalPathException e) {
// Won't reach here
}
return filteredPatternTree;
}
@Override
public TDataNodeHeartbeatResp getDataNodeHeartBeat(TDataNodeHeartbeatReq req) throws TException {
TDataNodeHeartbeatResp resp = new TDataNodeHeartbeatResp();
// Judging leader if necessary
if (req.isNeedJudgeLeader()) {
// Always get logical clock before judging leader
// to ensure that the leader is up-to-date
resp.setConsensusLogicalTimeMap(getLogicalClockMap());
resp.setJudgedLeaders(getJudgedLeaders());
}
// Sampling load if necessary
if (req.isNeedSamplingLoad()) {
TLoadSample loadSample = new TLoadSample();
// Sample cpu load
double cpuLoad =
MetricService.getInstance()
.getAutoGauge(
SystemMetric.SYS_CPU_LOAD.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
SYSTEM)
.getValue();
if (cpuLoad != 0) {
loadSample.setCpuUsageRate(cpuLoad);
}
// Sample memory load
double usedMemory = getMemory("jvm.memory.used.bytes");
double maxMemory = getMemory("jvm.memory.max.bytes");
if (usedMemory != 0 && maxMemory != 0) {
loadSample.setMemoryUsageRate(usedMemory * 100 / maxMemory);
}
// Sample disk load
sampleDiskLoad(loadSample);
resp.setLoadSample(loadSample);
}
AuthorityChecker.getAuthorityFetcher().refreshToken();
resp.setHeartbeatTimestamp(req.getHeartbeatTimestamp());
resp.setStatus(commonConfig.getNodeStatus().getStatus());
if (commonConfig.getStatusReason() != null) {
resp.setStatusReason(commonConfig.getStatusReason());
}
if (req.getSchemaRegionIds() != null) {
spaceQuotaManager.updateSpaceQuotaUsage(req.getSpaceQuotaUsage());
resp.setRegionDeviceUsageMap(
schemaEngine.countDeviceNumBySchemaRegion(req.getSchemaRegionIds()));
resp.setRegionSeriesUsageMap(
schemaEngine.countTimeSeriesNumBySchemaRegion(req.getSchemaRegionIds()));
}
if (req.getDataRegionIds() != null) {
spaceQuotaManager.setDataRegionIds(req.getDataRegionIds());
resp.setRegionDisk(spaceQuotaManager.getRegionDisk());
}
// Update schema quota if necessary
SchemaEngine.getInstance().updateAndFillSchemaCountMap(req, resp);
// Update pipe meta if necessary
if (req.isNeedPipeMetaList()) {
PipeAgent.task().collectPipeMetaList(resp);
}
if (req.isSetConfigNodeEndPoints()) {
if (ConfigNodeInfo.getInstance()
.updateConfigNodeList(new ArrayList<>(req.getConfigNodeEndPoints()))) {
resp.setConfirmedConfigNodeEndPoints(req.getConfigNodeEndPoints());
}
}
return resp;
}
@Override
public TSStatus updateRegionCache(TRegionRouteReq req) {
boolean result = ClusterPartitionFetcher.getInstance().updateRegionCache(req);
if (result) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} else {
return RpcUtils.getStatus(TSStatusCode.PARTITION_CACHE_UPDATE_ERROR);
}
}
private Map<TConsensusGroupId, Boolean> getJudgedLeaders() {
Map<TConsensusGroupId, Boolean> result = new HashMap<>();
DataRegionConsensusImpl.getInstance()
.getAllConsensusGroupIds()
.forEach(
groupId ->
result.put(
groupId.convertToTConsensusGroupId(),
DataRegionConsensusImpl.getInstance().isLeader(groupId)));
SchemaRegionConsensusImpl.getInstance()
.getAllConsensusGroupIds()
.forEach(
groupId ->
result.put(
groupId.convertToTConsensusGroupId(),
SchemaRegionConsensusImpl.getInstance().isLeader(groupId)));
return result;
}
private Map<TConsensusGroupId, Long> getLogicalClockMap() {
Map<TConsensusGroupId, Long> result = new HashMap<>();
DataRegionConsensusImpl.getInstance()
.getAllConsensusGroupIds()
.forEach(
groupId ->
result.put(
groupId.convertToTConsensusGroupId(),
DataRegionConsensusImpl.getInstance().getLogicalClock(groupId)));
SchemaRegionConsensusImpl.getInstance()
.getAllConsensusGroupIds()
.forEach(
groupId ->
result.put(
groupId.convertToTConsensusGroupId(),
SchemaRegionConsensusImpl.getInstance().getLogicalClock(groupId)));
return result;
}
private long getLogicalClock(TConsensusGroupId groupId) {
switch (groupId.getType()) {
case DataRegion:
return DataRegionConsensusImpl.getInstance()
.getLogicalClock(ConsensusGroupId.Factory.createFromTConsensusGroupId(groupId));
case SchemaRegion:
return SchemaRegionConsensusImpl.getInstance()
.getLogicalClock(ConsensusGroupId.Factory.createFromTConsensusGroupId(groupId));
default:
throw new IllegalArgumentException("Unknown consensus group type: " + groupId.getType());
}
}
private double getMemory(String gaugeName) {
double result = 0d;
try {
//
List<String> heapIds = Arrays.asList("PS Eden Space", "PS Old Eden", "Ps Survivor Space");
List<String> noHeapIds = Arrays.asList("Code Cache", "Compressed Class Space", "Metaspace");
for (String id : heapIds) {
AutoGauge gauge =
MetricService.getInstance()
.getAutoGauge(gaugeName, MetricLevel.IMPORTANT, "id", id, "area", "heap");
result += gauge.getValue();
}
for (String id : noHeapIds) {
AutoGauge gauge =
MetricService.getInstance()
.getAutoGauge(gaugeName, MetricLevel.IMPORTANT, "id", id, "area", "noheap");
result += gauge.getValue();
}
} catch (Exception e) {
LOGGER.warn("Failed to get memory from metric because: ", e);
return 0d;
}
return result;
}
private void sampleDiskLoad(TLoadSample loadSample) {
double availableDisk =
MetricService.getInstance()
.getAutoGauge(
SystemMetric.SYS_DISK_AVAILABLE_SPACE.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
SYSTEM)
.getValue();
double totalDisk =
MetricService.getInstance()
.getAutoGauge(
SystemMetric.SYS_DISK_TOTAL_SPACE.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
SYSTEM)
.getValue();
if (availableDisk != 0 && totalDisk != 0) {
double freeDiskRatio = availableDisk / totalDisk;
loadSample.setFreeDiskSpace(availableDisk);
loadSample.setDiskUsageRate(1d - freeDiskRatio);
// Reset NodeStatus if necessary
if (freeDiskRatio < commonConfig.getDiskSpaceWarningThreshold()) {
LOGGER.warn(
"The available disk space is : {}, "
+ "the total disk space is : {}, "
+ "and the remaining disk usage ratio: {} is "
+ "less than disk_spec_warning_threshold: {}, set system to readonly!",
RamUsageEstimator.humanReadableUnits((long) availableDisk),
RamUsageEstimator.humanReadableUnits((long) totalDisk),
freeDiskRatio,
commonConfig.getDiskSpaceWarningThreshold());
commonConfig.setNodeStatus(NodeStatus.ReadOnly);
commonConfig.setStatusReason(NodeStatus.DISK_FULL);
} else if (NodeStatus.ReadOnly.equals(commonConfig.getNodeStatus())
&& NodeStatus.DISK_FULL.equals(commonConfig.getStatusReason())) {
commonConfig.setNodeStatus(NodeStatus.Running);
commonConfig.setStatusReason(null);
}
}
}
@Override
public TSStatus invalidatePermissionCache(TInvalidatePermissionCacheReq req) {
if (AuthorityChecker.invalidateCache(req.getUsername(), req.getRoleName())) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
return RpcUtils.getStatus(TSStatusCode.CLEAR_PERMISSION_CACHE_ERROR);
}
@Override
public TSStatus merge() throws TException {
try {
storageEngine.mergeAll();
} catch (StorageEngineException e) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus startRepairData() throws TException {
if (!storageEngine.isAllSgReady()) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, "not all sg is ready");
}
IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
if (!iotdbConfig.isEnableSeqSpaceCompaction() || !iotdbConfig.isEnableUnseqSpaceCompaction()) {
return RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR,
"cannot start repair task because inner space compaction is not enabled");
}
try {
if (storageEngine.repairData()) {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} else {
if (CompactionScheduleTaskManager.getRepairTaskManagerInstance().getRepairTaskStatus()
== RepairTaskStatus.STOPPING) {
return RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "previous repair task is still stopping");
} else {
return RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "already have a running repair task");
}
}
} catch (StorageEngineException e) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
}
@Override
public TSStatus stopRepairData() throws TException {
try {
storageEngine.stopRepairData();
} catch (StorageEngineException e) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus flush(TFlushReq req) throws TException {
try {
storageEngine.operateFlush(req);
} catch (Exception e) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus clearCache() throws TException {
try {
storageEngine.clearCache();
} catch (Exception e) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus settle(TSettleReq req) throws TException {
return SettleRequestHandler.getInstance().handleSettleRequest(req);
}
@Override
public TSStatus loadConfiguration() throws TException {
try {
IoTDBDescriptor.getInstance().loadHotModifiedProps();
} catch (Exception e) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus setSystemStatus(String status) throws TException {
try {
commonConfig.setNodeStatus(NodeStatus.parse(status));
if (commonConfig.getNodeStatus().equals(NodeStatus.Removing)) {
PipeAgent.runtime().stop();
}
} catch (Exception e) {
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus killQueryInstance(String queryId) {
Coordinator coordinator = Coordinator.getInstance();
if (queryId == null) {
coordinator.getAllQueryExecutions().forEach(IQueryExecution::cancel);
} else {
Optional<IQueryExecution> queryExecution =
coordinator.getAllQueryExecutions().stream()
.filter(iQueryExecution -> iQueryExecution.getQueryId().equals(queryId))
.findAny();
if (queryExecution.isPresent()) {
queryExecution.get().cancel();
} else {
return new TSStatus(TSStatusCode.NO_SUCH_QUERY.getStatusCode()).setMessage("No such query");
}
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
public TSStatus setTTL(TSetTTLReq req) throws TException {
return storageEngine.setTTL(req);
}
@Override
public TSStatus updateTemplate(TUpdateTemplateReq req) {
switch (TemplateInternalRPCUpdateType.getType(req.type)) {
case ADD_TEMPLATE_SET_INFO:
DataNodeSchemaLockManager.getInstance()
.takeWriteLock(SchemaLockType.TIMESERIES_VS_TEMPLATE);
try {
ClusterTemplateManager.getInstance().addTemplateSetInfo(req.getTemplateInfo());
} finally {
DataNodeSchemaLockManager.getInstance()
.releaseWriteLock(SchemaLockType.TIMESERIES_VS_TEMPLATE);
}
break;
case INVALIDATE_TEMPLATE_SET_INFO:
ClusterTemplateManager.getInstance().invalidateTemplateSetInfo(req.getTemplateInfo());
break;
case ADD_TEMPLATE_PRE_SET_INFO:
ClusterTemplateManager.getInstance().addTemplatePreSetInfo(req.getTemplateInfo());
break;
case COMMIT_TEMPLATE_SET_INFO:
ClusterTemplateManager.getInstance().commitTemplatePreSetInfo(req.getTemplateInfo());
break;
case UPDATE_TEMPLATE_INFO:
ClusterTemplateManager.getInstance().updateTemplateInfo(req.getTemplateInfo());
break;
default:
LOGGER.warn("Unsupported type {} when updating template", req.type);
return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PARAMETER);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
ConsensusGroupId consensusGroupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId);
if (consensusGroupId instanceof DataRegionId) {
try {
DataRegionConsensusImpl.getInstance().deleteLocalPeer(consensusGroupId);
} catch (ConsensusException e) {
if (!(e instanceof ConsensusGroupNotExistException)) {
return RpcUtils.getStatus(TSStatusCode.DELETE_REGION_ERROR, e.getMessage());
}
}
return regionManager.deleteDataRegion((DataRegionId) consensusGroupId);
} else {
try {
SchemaRegionConsensusImpl.getInstance().deleteLocalPeer(consensusGroupId);
} catch (ConsensusException e) {
if (!(e instanceof ConsensusGroupNotExistException)) {
return RpcUtils.getStatus(TSStatusCode.DELETE_REGION_ERROR, e.getMessage());
}
}
return regionManager.deleteSchemaRegion((SchemaRegionId) consensusGroupId);
}
}
@Override
public TRegionLeaderChangeResp changeRegionLeader(TRegionLeaderChangeReq req) {
LOGGER.info("[ChangeRegionLeader] {}", req);
TRegionLeaderChangeResp resp = new TRegionLeaderChangeResp();
TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
TConsensusGroupId tgId = req.getRegionId();
ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tgId);
TEndPoint newNode = getConsensusEndPoint(req.getNewLeaderNode(), regionId);
Peer newLeaderPeer = new Peer(regionId, req.getNewLeaderNode().getDataNodeId(), newNode);
if (isLeader(regionId)) {
String msg =
"[ChangeRegionLeader] The current DataNode: "
+ req.getNewLeaderNode().getDataNodeId()
+ " is already the leader of RegionGroup: "
+ regionId
+ ", skip leader transfer.";
LOGGER.info(msg);
resp.setStatus(successStatus.setMessage(msg));
resp.setConsensusLogicalTimestamp(getLogicalClock(req.getRegionId()));
return resp;
}
LOGGER.info(
"[ChangeRegionLeader] Start change the leader of RegionGroup: {} to DataNode: {}",
regionId,
req.getNewLeaderNode().getDataNodeId());
resp.setStatus(transferLeader(regionId, newLeaderPeer));
resp.setConsensusLogicalTimestamp(getLogicalClock(req.getRegionId()));
return resp;
}
private TSStatus transferLeader(ConsensusGroupId regionId, Peer newLeaderPeer) {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
try {
if (regionId instanceof DataRegionId) {
DataRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
} else if (regionId instanceof SchemaRegionId) {
SchemaRegionConsensusImpl.getInstance().transferLeader(regionId, newLeaderPeer);
} else {
status.setCode(TSStatusCode.REGION_LEADER_CHANGE_ERROR.getStatusCode());
status.setMessage("[ChangeRegionLeader] Error Region type: " + regionId);
return status;
}
} catch (ConsensusException e) {
LOGGER.warn(
"[ChangeRegionLeader] Failed to change the leader of RegionGroup: {}", regionId, e);
status.setCode(TSStatusCode.REGION_LEADER_CHANGE_ERROR.getStatusCode());
status.setMessage(e.getMessage());
return status;
}
status.setMessage(
"[ChangeRegionLeader] Successfully change the leader of RegionGroup: "
+ regionId
+ " to "
+ newLeaderPeer.getNodeId());
return status;
}
private boolean isLeader(ConsensusGroupId regionId) {
if (regionId instanceof DataRegionId) {
return DataRegionConsensusImpl.getInstance().isLeader(regionId);
}
if (regionId instanceof SchemaRegionId) {
return SchemaRegionConsensusImpl.getInstance().isLeader(regionId);
}
LOGGER.warn("region {} type is illegal", regionId);
return false;
}
@Override
public TSStatus createNewRegionPeer(TCreatePeerReq req) {
ConsensusGroupId regionId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
List<Peer> peers =
req.getRegionLocations().stream()
.map(
location ->
new Peer(
regionId,
location.getDataNodeId(),
getConsensusEndPoint(location, regionId)))
.collect(Collectors.toList());
TSStatus status = createNewRegion(regionId, req.getStorageGroup(), req.getTtl());
if (!isSucceed(status)) {
return status;
}
return createNewRegionPeer(regionId, peers);
}
@Override
public TSStatus addRegionPeer(TMaintainPeerReq req) {
TConsensusGroupId regionId = req.getRegionId();
String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
boolean submitSucceed = RegionMigrateService.getInstance().submitAddRegionPeerTask(req);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (submitSucceed) {
LOGGER.info(
"Successfully submit addRegionPeer task for region: {}, target DataNode: {}",
regionId,
selectedDataNodeIP);
return status;
}
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage("Submit addRegionPeer task failed, region: " + regionId);
return status;
}
@Override
public TSStatus removeRegionPeer(TMaintainPeerReq req) {
TConsensusGroupId regionId = req.getRegionId();
String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
boolean submitSucceed = RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (submitSucceed) {
LOGGER.info(
"Successfully submit removeRegionPeer task for region: {}, DataNode to be removed: {}",
regionId,
selectedDataNodeIP);
return status;
}
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage("Submit removeRegionPeer task failed, region: " + regionId);
return status;
}
@Override
public TSStatus deleteOldRegionPeer(TMaintainPeerReq req) {
TConsensusGroupId regionId = req.getRegionId();
String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp();
boolean submitSucceed = RegionMigrateService.getInstance().submitDeleteOldRegionPeerTask(req);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
if (submitSucceed) {
LOGGER.info(
"Successfully submit deleteOldRegionPeer task for region: {}, DataNode to be removed: {}",
regionId,
selectedDataNodeIP);
return status;
}
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage("Submit deleteOldRegionPeer task failed, region: " + regionId);
return status;
}
// TODO: return which DataNode fail
@Override
public TSStatus resetPeerList(TResetPeerListReq req) throws TException {
return RegionMigrateService.getInstance().resetPeerList(req);
}
@Override
public TRegionMigrateResult getRegionMaintainResult(long taskId) throws TException {
return RegionMigrateService.getInstance().getRegionMaintainResult(taskId);
}
private TSStatus createNewRegion(ConsensusGroupId regionId, String storageGroup, long ttl) {
return regionManager.createNewRegion(regionId, storageGroup, ttl);
}
@Override
public TSStatus createFunction(TCreateFunctionInstanceReq req) {
try {
UDFInformation udfInformation = UDFInformation.deserialize(req.udfInformation);
UDFManagementService.getInstance().register(udfInformation, req.jarFile);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (Exception e) {
return new TSStatus(TSStatusCode.CREATE_UDF_ON_DATANODE_ERROR.getStatusCode())
.setMessage(e.getMessage());
}
}
@Override
public TSStatus dropFunction(TDropFunctionInstanceReq req) {
try {
UDFManagementService.getInstance().deregister(req.getFunctionName(), req.isNeedToDeleteJar());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (Exception e) {
return new TSStatus(TSStatusCode.DROP_UDF_ON_DATANODE_ERROR.getStatusCode())
.setMessage(e.getMessage());
}
}
@Override
public TSStatus createTriggerInstance(TCreateTriggerInstanceReq req) {
TriggerInformation triggerInformation = TriggerInformation.deserialize(req.triggerInformation);
try {
// Register trigger information with TriggerRegistrationService
// Config nodes take responsibility for synchronization control
TriggerManagementService.getInstance().register(triggerInformation, req.jarFile);
} catch (Exception e) {
LOGGER.warn(
"Error occurred when creating trigger instance for trigger: {}. The cause is {}.",
triggerInformation.getTriggerName(),
e);
return new TSStatus(TSStatusCode.CREATE_TRIGGER_INSTANCE_ERROR.getStatusCode())
.setMessage(e.getMessage());
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
public TSStatus activeTriggerInstance(TActiveTriggerInstanceReq req) {
try {
TriggerManagementService.getInstance().activeTrigger(req.triggerName);
} catch (Exception e) {
LOGGER.warn(
"Error occurred during active trigger instance for trigger: {}. The cause is {}.",
req.triggerName,
e);
return new TSStatus(TSStatusCode.ACTIVE_TRIGGER_INSTANCE_ERROR.getStatusCode())
.setMessage(e.getMessage());
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
public TSStatus inactiveTriggerInstance(TInactiveTriggerInstanceReq req) {
try {
TriggerManagementService.getInstance().inactiveTrigger(req.triggerName);
} catch (Exception e) {
LOGGER.warn(
"Error occurred when try to inactive trigger instance for trigger: {}. The cause is {}. ",
req.triggerName,
e);
return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage(e.getMessage());
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
public TSStatus dropTriggerInstance(TDropTriggerInstanceReq req) {
try {
TriggerManagementService.getInstance().dropTrigger(req.triggerName, req.needToDeleteJarFile);
} catch (Exception e) {
LOGGER.warn(
"Error occurred when dropping trigger instance for trigger: {}. The cause is {}.",
req.triggerName,
e);
return new TSStatus(TSStatusCode.DROP_TRIGGER_INSTANCE_ERROR.getStatusCode())
.setMessage(e.getMessage());
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
public TSStatus updateTriggerLocation(TUpdateTriggerLocationReq req) {
try {
TriggerManagementService.getInstance()
.updateLocationOfStatefulTrigger(req.triggerName, req.newLocation);
} catch (Exception e) {
LOGGER.warn(
"Error occurred when updating Location for trigger: {}. The cause is {}.",
req.triggerName,
e);
return new TSStatus(TSStatusCode.UPDATE_TRIGGER_LOCATION_ERROR.getStatusCode())
.setMessage(e.getMessage());
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
public TFireTriggerResp fireTrigger(TFireTriggerReq req) {
String triggerName = req.getTriggerName();
TriggerExecutor executor = TriggerManagementService.getInstance().getExecutor(triggerName);
// No executor for given trigger name on this data node
if (executor == null) {
return new TFireTriggerResp(false, TriggerFireResult.FAILED_NO_TERMINATION.getId());
}
TriggerFireResult result = TriggerFireResult.SUCCESS;
try {
boolean fireResult =
executor.fire(
Tablet.deserialize(req.tablet), TriggerEvent.construct(req.getTriggerEvent()));
if (!fireResult) {
result =
executor.getFailureStrategy().equals(FailureStrategy.PESSIMISTIC)
? TriggerFireResult.TERMINATION
: TriggerFireResult.FAILED_NO_TERMINATION;
}
} catch (Exception e) {
result =
executor.getFailureStrategy().equals(FailureStrategy.PESSIMISTIC)
? TriggerFireResult.TERMINATION
: TriggerFireResult.FAILED_NO_TERMINATION;
}
return new TFireTriggerResp(true, result.getId());
}
private TEndPoint getConsensusEndPoint(
TDataNodeLocation nodeLocation, ConsensusGroupId regionId) {
if (regionId instanceof DataRegionId) {
return nodeLocation.getDataRegionConsensusEndPoint();
}
return nodeLocation.getSchemaRegionConsensusEndPoint();
}
@Override
public TSStatus createPipePlugin(TCreatePipePluginInstanceReq req) {
try {
PipePluginMeta pipePluginMeta = PipePluginMeta.deserialize(req.pipePluginMeta);
PipeAgent.plugin().register(pipePluginMeta, req.jarFile);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (Exception e) {
return new TSStatus(TSStatusCode.CREATE_PIPE_PLUGIN_ON_DATANODE_ERROR.getStatusCode())
.setMessage(e.getMessage());
}
}
@Override
public TSStatus dropPipePlugin(TDropPipePluginInstanceReq req) {
try {
PipeAgent.plugin().deregister(req.getPipePluginName(), req.isNeedToDeleteJar());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (Exception e) {
return new TSStatus(TSStatusCode.DROP_PIPE_PLUGIN_ON_DATANODE_ERROR.getStatusCode())
.setMessage(e.getMessage());
}
}
private boolean isSucceed(TSStatus status) {
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
private TSStatus createNewRegionPeer(ConsensusGroupId regionId, List<Peer> peers) {
LOGGER.info(
"{}, Start to createNewRegionPeer {} to region {}",
REGION_MIGRATE_PROCESS,
peers,
regionId);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
try {
if (regionId instanceof DataRegionId) {
DataRegionConsensusImpl.getInstance().createLocalPeer(regionId, peers);
} else {
SchemaRegionConsensusImpl.getInstance().createLocalPeer(regionId, peers);
}
} catch (ConsensusException e) {
if (!(e instanceof ConsensusGroupAlreadyExistException)) {
LOGGER.warn(
"{}, CreateNewRegionPeer error, peers: {}, regionId: {}, errorMessage",
REGION_MIGRATE_PROCESS,
peers,
regionId,
e);
status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
status.setMessage(e.getMessage());
return status;
}
}
LOGGER.info(
"{}, Succeed to createNewRegionPeer {} for region {}",
REGION_MIGRATE_PROCESS,
peers,
regionId);
status.setMessage("createNewRegionPeer succeed, regionId: " + regionId);
return status;
}
@Override
public TSStatus disableDataNode(TDisableDataNodeReq req) {
LOGGER.info("start disable data node in the request: {}", req);
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
status.setMessage("disable datanode succeed");
// TODO what need to clean?
ClusterPartitionFetcher.getInstance().invalidAllCache();
DataNodeSchemaCache.getInstance().takeWriteLock();
try {
DataNodeSchemaCache.getInstance().cleanUp();
} finally {
DataNodeSchemaCache.getInstance().releaseWriteLock();
}
DataNodeDevicePathCache.getInstance().cleanUp();
return status;
}
@SuppressWarnings("squid:S2142") // ignore Either re-interrupt this method or rethrow
@Override
public TSStatus stopDataNode() {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
LOGGER.info("Execute stopDataNode RPC method");
// kill the datanode process 20 seconds later
// because datanode process cannot exit normally for the reason of InterruptedException
new Thread(
() -> {
try {
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
LOGGER.warn("Meets InterruptedException in stopDataNode RPC method");
} finally {
LOGGER.info("Executing system.exit(0) in stopDataNode RPC method after 20 seconds");
System.exit(0);
}
})
.start();
try {
DataNode.getInstance().stop();
status.setMessage("stop datanode succeed");
} catch (Exception e) {
LOGGER.warn("Stop Data Node error", e);
status.setCode(TSStatusCode.DATANODE_STOP_ERROR.getStatusCode());
status.setMessage(e.getMessage());
}
return status;
}
public void handleClientExit() {
// Do nothing
}
}