| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.confignode.persistence.executor; |
| |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.common.rpc.thrift.TSchemaNode; |
| import org.apache.iotdb.commons.auth.AuthException; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.commons.schema.node.MNodeType; |
| import org.apache.iotdb.commons.snapshot.SnapshotProcessor; |
| import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; |
| import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan; |
| import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan; |
| import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.function.GetUDFJarPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.CountTimeSlotListPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.GetNodePathsPartitionPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.GetSchemaPartitionPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlotListPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.pipe.plugin.GetPipePluginJarPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplateSettablePlan; |
| import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan; |
| import org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan; |
| import org.apache.iotdb.confignode.consensus.request.read.template.GetTemplateSetInfoPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerJarPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerLocationPlan; |
| import org.apache.iotdb.confignode.consensus.request.read.trigger.GetTriggerTablePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateClusterIdPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.confignode.UpdateVersionInfoPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.database.AdjustMaxRegionGroupNumPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleLeaderChangePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.pipe.task.AlterPipePlanV2; |
| import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2; |
| import org.apache.iotdb.confignode.consensus.request.write.pipe.task.DropPipePlanV2; |
| import org.apache.iotdb.confignode.consensus.request.write.pipe.task.OperateMultiplePipesPlanV2; |
| import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; |
| import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.quota.SetSpaceQuotaPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.quota.SetThrottleQuotaPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.AlterConsumerGroupPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.ConsumerGroupHandleMetaChangePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.AlterMultipleTopicsPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.AlterTopicPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.template.CommitSetSchemaTemplatePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.template.ExtendSchemaTemplatePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.template.PreSetSchemaTemplatePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.template.RollbackPreUnsetSchemaTemplatePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.template.UnsetSchemaTemplatePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.trigger.AddTriggerInTablePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.trigger.DeleteTriggerInTablePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerLocationPlan; |
| import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggerStateInTablePlan; |
| import org.apache.iotdb.confignode.consensus.request.write.trigger.UpdateTriggersOnTransferNodesPlan; |
| import org.apache.iotdb.confignode.consensus.response.partition.SchemaNodeManagementResp; |
| import org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeException; |
| import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent; |
| import org.apache.iotdb.confignode.persistence.AuthorInfo; |
| import org.apache.iotdb.confignode.persistence.ClusterInfo; |
| import org.apache.iotdb.confignode.persistence.ProcedureInfo; |
| import org.apache.iotdb.confignode.persistence.TriggerInfo; |
| import org.apache.iotdb.confignode.persistence.UDFInfo; |
| import org.apache.iotdb.confignode.persistence.cq.CQInfo; |
| import org.apache.iotdb.confignode.persistence.node.NodeInfo; |
| import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; |
| import org.apache.iotdb.confignode.persistence.pipe.PipeInfo; |
| import org.apache.iotdb.confignode.persistence.quota.QuotaInfo; |
| import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo; |
| import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo; |
| import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; |
| import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; |
| import org.apache.iotdb.consensus.common.DataSet; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| |
| import org.apache.thrift.TException; |
| import org.apache.tsfile.utils.Pair; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.stream.Collectors; |
| |
| public class ConfigPlanExecutor { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(ConfigPlanExecutor.class); |
| |
| /** |
| * Every Info class that implement SnapshotProcessor in ConfigNode should be registered in |
| * snapshotProcessorList. |
| */ |
| private final List<SnapshotProcessor> snapshotProcessorList; |
| |
| private final ClusterInfo clusterInfo; |
| |
| private final NodeInfo nodeInfo; |
| |
| private final ClusterSchemaInfo clusterSchemaInfo; |
| |
| private final PartitionInfo partitionInfo; |
| |
| private final AuthorInfo authorInfo; |
| |
| private final ProcedureInfo procedureInfo; |
| |
| private final UDFInfo udfInfo; |
| |
| private final TriggerInfo triggerInfo; |
| |
| private final CQInfo cqInfo; |
| |
| private final PipeInfo pipeInfo; |
| |
| private final SubscriptionInfo subscriptionInfo; |
| |
| private final QuotaInfo quotaInfo; |
| |
| public ConfigPlanExecutor( |
| ClusterInfo clusterInfo, |
| NodeInfo nodeInfo, |
| ClusterSchemaInfo clusterSchemaInfo, |
| PartitionInfo partitionInfo, |
| AuthorInfo authorInfo, |
| ProcedureInfo procedureInfo, |
| UDFInfo udfInfo, |
| TriggerInfo triggerInfo, |
| CQInfo cqInfo, |
| PipeInfo pipeInfo, |
| SubscriptionInfo subscriptionInfo, |
| QuotaInfo quotaInfo) { |
| |
| this.snapshotProcessorList = new ArrayList<>(); |
| |
| this.clusterInfo = clusterInfo; |
| this.snapshotProcessorList.add(clusterInfo); |
| |
| this.nodeInfo = nodeInfo; |
| this.snapshotProcessorList.add(nodeInfo); |
| |
| this.clusterSchemaInfo = clusterSchemaInfo; |
| this.snapshotProcessorList.add(clusterSchemaInfo); |
| |
| this.partitionInfo = partitionInfo; |
| this.snapshotProcessorList.add(partitionInfo); |
| |
| this.authorInfo = authorInfo; |
| this.snapshotProcessorList.add(authorInfo); |
| |
| this.triggerInfo = triggerInfo; |
| this.snapshotProcessorList.add(triggerInfo); |
| |
| this.udfInfo = udfInfo; |
| this.snapshotProcessorList.add(udfInfo); |
| |
| this.cqInfo = cqInfo; |
| this.snapshotProcessorList.add(cqInfo); |
| |
| this.pipeInfo = pipeInfo; |
| this.snapshotProcessorList.add(pipeInfo); |
| |
| this.subscriptionInfo = subscriptionInfo; |
| this.snapshotProcessorList.add(subscriptionInfo); |
| |
| this.procedureInfo = procedureInfo; |
| this.snapshotProcessorList.add(procedureInfo); |
| |
| this.quotaInfo = quotaInfo; |
| this.snapshotProcessorList.add(quotaInfo); |
| |
| this.snapshotProcessorList.add(PipeConfigNodeAgent.runtime().listener()); |
| } |
| |
| public DataSet executeQueryPlan(ConfigPhysicalPlan req) |
| throws UnknownPhysicalPlanTypeException, AuthException { |
| switch (req.getType()) { |
| case GetDataNodeConfiguration: |
| return nodeInfo.getDataNodeConfiguration((GetDataNodeConfigurationPlan) req); |
| case CountDatabase: |
| return clusterSchemaInfo.countMatchedDatabases((CountDatabasePlan) req); |
| case GetDatabase: |
| return clusterSchemaInfo.getMatchedDatabaseSchemas((GetDatabasePlan) req); |
| case GetDataPartition: |
| case GetOrCreateDataPartition: |
| return partitionInfo.getDataPartition((GetDataPartitionPlan) req); |
| case GetSchemaPartition: |
| case GetOrCreateSchemaPartition: |
| return partitionInfo.getSchemaPartition((GetSchemaPartitionPlan) req); |
| case ListUser: |
| return authorInfo.executeListUsers((AuthorPlan) req); |
| case ListRole: |
| return authorInfo.executeListRoles((AuthorPlan) req); |
| case ListUserPrivilege: |
| return authorInfo.executeListUserPrivileges((AuthorPlan) req); |
| case ListRolePrivilege: |
| return authorInfo.executeListRolePrivileges((AuthorPlan) req); |
| case GetNodePathsPartition: |
| return getSchemaNodeManagementPartition(req); |
| case GetRegionInfoList: |
| return getRegionInfoList(req); |
| case GetAllSchemaTemplate: |
| return clusterSchemaInfo.getAllTemplates(); |
| case GetSchemaTemplate: |
| return clusterSchemaInfo.getTemplate((GetSchemaTemplatePlan) req); |
| case CheckTemplateSettable: |
| return clusterSchemaInfo.checkTemplateSettable((CheckTemplateSettablePlan) req); |
| case GetPathsSetTemplate: |
| return clusterSchemaInfo.getPathsSetTemplate((GetPathsSetTemplatePlan) req); |
| case GetAllTemplateSetInfo: |
| return clusterSchemaInfo.getAllTemplateSetInfo(); |
| case GetTemplateSetInfo: |
| return clusterSchemaInfo.getTemplateSetInfo((GetTemplateSetInfoPlan) req); |
| case GetTriggerTable: |
| return triggerInfo.getTriggerTable((GetTriggerTablePlan) req); |
| case GetTriggerLocation: |
| return triggerInfo.getTriggerLocation((GetTriggerLocationPlan) req); |
| case GetTriggerJar: |
| return triggerInfo.getTriggerJar((GetTriggerJarPlan) req); |
| case GetTransferringTriggers: |
| return triggerInfo.getTransferringTriggers(); |
| case GetRegionId: |
| return partitionInfo.getRegionId((GetRegionIdPlan) req); |
| case GetTimeSlotList: |
| return partitionInfo.getTimeSlotList((GetTimeSlotListPlan) req); |
| case CountTimeSlotList: |
| return partitionInfo.countTimeSlotList((CountTimeSlotListPlan) req); |
| case GetSeriesSlotList: |
| return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req); |
| case SHOW_CQ: |
| return cqInfo.showCQ(); |
| case GetFunctionTable: |
| return udfInfo.getUDFTable(); |
| case GetFunctionJar: |
| return udfInfo.getUDFJar((GetUDFJarPlan) req); |
| case GetPipePluginTable: |
| return pipeInfo.getPipePluginInfo().showPipePlugins(); |
| case GetPipePluginJar: |
| return pipeInfo.getPipePluginInfo().getPipePluginJar((GetPipePluginJarPlan) req); |
| case ShowPipeV2: |
| return pipeInfo.getPipeTaskInfo().showPipes(); |
| case ShowTopic: |
| return subscriptionInfo.showTopics(); |
| case ShowSubscription: |
| return subscriptionInfo.showSubscriptions(); |
| default: |
| throw new UnknownPhysicalPlanTypeException(req.getType()); |
| } |
| } |
| |
| public TSStatus executeNonQueryPlan(ConfigPhysicalPlan physicalPlan) |
| throws UnknownPhysicalPlanTypeException { |
| TSStatus status; |
| switch (physicalPlan.getType()) { |
| case RegisterDataNode: |
| return nodeInfo.registerDataNode((RegisterDataNodePlan) physicalPlan); |
| case RemoveDataNode: |
| return nodeInfo.removeDataNode((RemoveDataNodePlan) physicalPlan); |
| case UpdateDataNodeConfiguration: |
| status = nodeInfo.updateDataNode((UpdateDataNodePlan) physicalPlan); |
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| return status; |
| } |
| return partitionInfo.updateDataNode((UpdateDataNodePlan) physicalPlan); |
| case CreateDatabase: |
| status = clusterSchemaInfo.createDatabase((DatabaseSchemaPlan) physicalPlan); |
| if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| return status; |
| } |
| return partitionInfo.createDatabase((DatabaseSchemaPlan) physicalPlan); |
| case AlterDatabase: |
| return clusterSchemaInfo.alterDatabase((DatabaseSchemaPlan) physicalPlan); |
| case AdjustMaxRegionGroupNum: |
| return clusterSchemaInfo.adjustMaxRegionGroupCount( |
| (AdjustMaxRegionGroupNumPlan) physicalPlan); |
| case DeleteDatabase: |
| try { |
| return clusterSchemaInfo.deleteDatabase((DeleteDatabasePlan) physicalPlan); |
| } finally { |
| partitionInfo.deleteDatabase((DeleteDatabasePlan) physicalPlan); |
| } |
| case PreDeleteDatabase: |
| return partitionInfo.preDeleteDatabase((PreDeleteDatabasePlan) physicalPlan); |
| case SetTTL: |
| return clusterSchemaInfo.setTTL((SetTTLPlan) physicalPlan); |
| case SetSchemaReplicationFactor: |
| return clusterSchemaInfo.setSchemaReplicationFactor( |
| (SetSchemaReplicationFactorPlan) physicalPlan); |
| case SetDataReplicationFactor: |
| return clusterSchemaInfo.setDataReplicationFactor( |
| (SetDataReplicationFactorPlan) physicalPlan); |
| case SetTimePartitionInterval: |
| return clusterSchemaInfo.setTimePartitionInterval( |
| (SetTimePartitionIntervalPlan) physicalPlan); |
| case CreateRegionGroups: |
| return partitionInfo.createRegionGroups((CreateRegionGroupsPlan) physicalPlan); |
| case OfferRegionMaintainTasks: |
| return partitionInfo.offerRegionMaintainTasks((OfferRegionMaintainTasksPlan) physicalPlan); |
| case PollRegionMaintainTask: |
| return partitionInfo.pollRegionMaintainTask(); |
| case PollSpecificRegionMaintainTask: |
| return partitionInfo.pollSpecificRegionMaintainTask( |
| (PollSpecificRegionMaintainTaskPlan) physicalPlan); |
| case CreateSchemaPartition: |
| return partitionInfo.createSchemaPartition((CreateSchemaPartitionPlan) physicalPlan); |
| case CreateDataPartition: |
| return partitionInfo.createDataPartition((CreateDataPartitionPlan) physicalPlan); |
| case UpdateProcedure: |
| return procedureInfo.updateProcedure((UpdateProcedurePlan) physicalPlan); |
| case DeleteProcedure: |
| return procedureInfo.deleteProcedure((DeleteProcedurePlan) physicalPlan); |
| case CreateUser: |
| case CreateRole: |
| case DropUser: |
| case DropRole: |
| case GrantRole: |
| case GrantUser: |
| case GrantRoleToUser: |
| case RevokeUser: |
| case RevokeRole: |
| case RevokeRoleFromUser: |
| case UpdateUser: |
| case CreateUserWithRawPassword: |
| case CreateUserDep: |
| case CreateRoleDep: |
| case DropUserDep: |
| case DropRoleDep: |
| case GrantRoleDep: |
| case GrantUserDep: |
| case GrantRoleToUserDep: |
| case RevokeUserDep: |
| case RevokeRoleDep: |
| case RevokeRoleFromUserDep: |
| case UpdateUserDep: |
| return authorInfo.authorNonQuery((AuthorPlan) physicalPlan); |
| case ApplyConfigNode: |
| return nodeInfo.applyConfigNode((ApplyConfigNodePlan) physicalPlan); |
| case RemoveConfigNode: |
| return nodeInfo.removeConfigNode((RemoveConfigNodePlan) physicalPlan); |
| case UpdateVersionInfo: |
| return nodeInfo.updateVersionInfo((UpdateVersionInfoPlan) physicalPlan); |
| case UpdateClusterId: |
| return clusterInfo.updateClusterId((UpdateClusterIdPlan) physicalPlan); |
| case CreateFunction: |
| return udfInfo.addUDFInTable((CreateFunctionPlan) physicalPlan); |
| case DropFunction: |
| return udfInfo.dropFunction((DropFunctionPlan) physicalPlan); |
| case AddTriggerInTable: |
| return triggerInfo.addTriggerInTable((AddTriggerInTablePlan) physicalPlan); |
| case DeleteTriggerInTable: |
| return triggerInfo.deleteTriggerInTable((DeleteTriggerInTablePlan) physicalPlan); |
| case UpdateTriggerStateInTable: |
| return triggerInfo.updateTriggerStateInTable((UpdateTriggerStateInTablePlan) physicalPlan); |
| case UpdateTriggersOnTransferNodes: |
| return triggerInfo.updateTriggersOnTransferNodes( |
| (UpdateTriggersOnTransferNodesPlan) physicalPlan); |
| case UpdateTriggerLocation: |
| return triggerInfo.updateTriggerLocation((UpdateTriggerLocationPlan) physicalPlan); |
| case CreateSchemaTemplate: |
| return clusterSchemaInfo.createSchemaTemplate((CreateSchemaTemplatePlan) physicalPlan); |
| case UpdateRegionLocation: |
| return partitionInfo.updateRegionLocation((UpdateRegionLocationPlan) physicalPlan); |
| case AddRegionLocation: |
| return partitionInfo.addRegionLocation((AddRegionLocationPlan) physicalPlan); |
| case RemoveRegionLocation: |
| return partitionInfo.removeRegionLocation((RemoveRegionLocationPlan) physicalPlan); |
| case SetSchemaTemplate: |
| return clusterSchemaInfo.setSchemaTemplate((SetSchemaTemplatePlan) physicalPlan); |
| case PreSetSchemaTemplate: |
| return clusterSchemaInfo.preSetSchemaTemplate((PreSetSchemaTemplatePlan) physicalPlan); |
| case CommitSetSchemaTemplate: |
| return clusterSchemaInfo.commitSetSchemaTemplate( |
| (CommitSetSchemaTemplatePlan) physicalPlan); |
| case PreUnsetTemplate: |
| return clusterSchemaInfo.preUnsetSchemaTemplate((PreUnsetSchemaTemplatePlan) physicalPlan); |
| case RollbackUnsetTemplate: |
| return clusterSchemaInfo.rollbackUnsetSchemaTemplate( |
| (RollbackPreUnsetSchemaTemplatePlan) physicalPlan); |
| case UnsetTemplate: |
| return clusterSchemaInfo.unsetSchemaTemplate((UnsetSchemaTemplatePlan) physicalPlan); |
| case DropSchemaTemplate: |
| return clusterSchemaInfo.dropSchemaTemplate((DropSchemaTemplatePlan) physicalPlan); |
| case ExtendSchemaTemplate: |
| return clusterSchemaInfo.extendSchemaTemplate((ExtendSchemaTemplatePlan) physicalPlan); |
| case CreatePipeV2: |
| return pipeInfo.createPipe((CreatePipePlanV2) physicalPlan); |
| case SetPipeStatusV2: |
| return pipeInfo.setPipeStatus((SetPipeStatusPlanV2) physicalPlan); |
| case DropPipeV2: |
| return pipeInfo.dropPipe((DropPipePlanV2) physicalPlan); |
| case AlterPipeV2: |
| return pipeInfo.alterPipe((AlterPipePlanV2) physicalPlan); |
| case OperateMultiplePipesV2: |
| return pipeInfo.operateMultiplePipes((OperateMultiplePipesPlanV2) physicalPlan); |
| case PipeHandleLeaderChange: |
| return pipeInfo.handleLeaderChange((PipeHandleLeaderChangePlan) physicalPlan); |
| case PipeHandleMetaChange: |
| return pipeInfo.handleMetaChanges((PipeHandleMetaChangePlan) physicalPlan); |
| case CreateTopic: |
| return subscriptionInfo.createTopic((CreateTopicPlan) physicalPlan); |
| case DropTopic: |
| return subscriptionInfo.dropTopic((DropTopicPlan) physicalPlan); |
| case AlterTopic: |
| return subscriptionInfo.alterTopic((AlterTopicPlan) physicalPlan); |
| case AlterMultipleTopics: |
| return subscriptionInfo.alterMultipleTopics((AlterMultipleTopicsPlan) physicalPlan); |
| case ConsumerGroupHandleMetaChange: |
| return subscriptionInfo.handleConsumerGroupMetaChanges( |
| (ConsumerGroupHandleMetaChangePlan) physicalPlan); |
| case AlterConsumerGroup: |
| return subscriptionInfo.alterConsumerGroup((AlterConsumerGroupPlan) physicalPlan); |
| case TopicHandleMetaChange: |
| return subscriptionInfo.handleTopicMetaChanges((TopicHandleMetaChangePlan) physicalPlan); |
| case ADD_CQ: |
| return cqInfo.addCQ((AddCQPlan) physicalPlan); |
| case DROP_CQ: |
| return cqInfo.dropCQ((DropCQPlan) physicalPlan); |
| case ACTIVE_CQ: |
| return cqInfo.activeCQ((ActiveCQPlan) physicalPlan); |
| case UPDATE_CQ_LAST_EXEC_TIME: |
| return cqInfo.updateCQLastExecutionTime((UpdateCQLastExecTimePlan) physicalPlan); |
| case CreatePipePlugin: |
| return pipeInfo.getPipePluginInfo().createPipePlugin((CreatePipePluginPlan) physicalPlan); |
| case DropPipePlugin: |
| return pipeInfo.getPipePluginInfo().dropPipePlugin((DropPipePluginPlan) physicalPlan); |
| case CreatePipeSinkV1: |
| case DropPipeV1: |
| case DropPipeSinkV1: |
| case GetPipeSinkV1: |
| case PreCreatePipeV1: |
| case RecordPipeMessageV1: |
| case SetPipeStatusV1: |
| case ShowPipeV1: |
| return new TSStatus(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode()); |
| case setSpaceQuota: |
| return quotaInfo.setSpaceQuota((SetSpaceQuotaPlan) physicalPlan); |
| case setThrottleQuota: |
| return quotaInfo.setThrottleQuota((SetThrottleQuotaPlan) physicalPlan); |
| case PipeEnriched: |
| return executeNonQueryPlan(((PipeEnrichedPlan) physicalPlan).getInnerPlan()); |
| case PipeDeleteTimeSeries: |
| case PipeDeleteLogicalView: |
| case PipeDeactivateTemplate: |
| // Pipe payload, used to trigger plan extraction. |
| // Will not be actually executed. |
| return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); |
| case PipeUnsetTemplate: |
| // PipeUnsetTemplate plan will not be written here, and exists only after pipe sender |
| // collects UnsetTemplatePlan and before receiver calls ConfigManager. |
| throw new UnsupportedOperationException("PipeUnsetTemplate is not supported."); |
| case TestOnly: |
| return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); |
| default: |
| throw new UnknownPhysicalPlanTypeException(physicalPlan.getType()); |
| } |
| } |
| |
| public boolean takeSnapshot(File snapshotDir) { |
| // Consensus layer needs to ensure that the directory exists. |
| // if it does not exist, print a log to warn there may have a problem. |
| if (!snapshotDir.exists()) { |
| LOGGER.warn( |
| "snapshot directory [{}] is not exist,start to create it.", |
| snapshotDir.getAbsolutePath()); |
| // Try to create a directory to enable snapshot operation |
| if (!snapshotDir.mkdirs()) { |
| LOGGER.error("snapshot directory [{}] can not be created.", snapshotDir.getAbsolutePath()); |
| return false; |
| } |
| } |
| |
| // If the directory is not empty, we should not continue the snapshot operation, |
| // which may result in incorrect results. |
| File[] fileList = snapshotDir.listFiles(); |
| if (fileList != null && fileList.length > 0) { |
| LOGGER.error("Snapshot directory [{}] is not empty.", snapshotDir.getAbsolutePath()); |
| return false; |
| } |
| |
| AtomicBoolean result = new AtomicBoolean(true); |
| snapshotProcessorList.forEach( |
| x -> { |
| boolean takeSnapshotResult = true; |
| try { |
| long startTime = System.currentTimeMillis(); |
| LOGGER.info( |
| "[ConfigNodeSnapshot] Start to take snapshot for {} into {}", |
| x.getClass().getName(), |
| snapshotDir.getAbsolutePath()); |
| takeSnapshotResult = x.processTakeSnapshot(snapshotDir); |
| LOGGER.info( |
| "[ConfigNodeSnapshot] Finish to take snapshot for {}, time consumption: {} ms", |
| x.getClass().getName(), |
| System.currentTimeMillis() - startTime); |
| } catch (TException | IOException e) { |
| LOGGER.error("Take snapshot error: {}", e.getMessage()); |
| takeSnapshotResult = false; |
| } finally { |
| // If any snapshot fails, the whole fails |
| // So this is just going to be false |
| if (!takeSnapshotResult) { |
| result.set(false); |
| } |
| } |
| }); |
| if (result.get()) { |
| LOGGER.info("[ConfigNodeSnapshot] Task snapshot success, snapshotDir: {}", snapshotDir); |
| } |
| return result.get(); |
| } |
| |
| public void loadSnapshot(File latestSnapshotRootDir) { |
| if (!latestSnapshotRootDir.exists()) { |
| LOGGER.error( |
| "snapshot directory [{}] is not exist, can not load snapshot with this directory.", |
| latestSnapshotRootDir.getAbsolutePath()); |
| return; |
| } |
| |
| AtomicBoolean result = new AtomicBoolean(true); |
| snapshotProcessorList.parallelStream() |
| .forEach( |
| x -> { |
| try { |
| long startTime = System.currentTimeMillis(); |
| LOGGER.info( |
| "[ConfigNodeSnapshot] Start to load snapshot for {} from {}", |
| x.getClass().getName(), |
| latestSnapshotRootDir.getAbsolutePath()); |
| x.processLoadSnapshot(latestSnapshotRootDir); |
| LOGGER.info( |
| "[ConfigNodeSnapshot] Load snapshot for {} cost {} ms", |
| x.getClass().getName(), |
| System.currentTimeMillis() - startTime); |
| } catch (TException | IOException e) { |
| result.set(false); |
| LOGGER.error("Load snapshot error: {}", e.getMessage()); |
| } |
| }); |
| if (result.get()) { |
| LOGGER.info( |
| "[ConfigNodeSnapshot] Load snapshot success, latestSnapshotRootDir: {}", |
| latestSnapshotRootDir); |
| } |
| } |
| |
| private DataSet getSchemaNodeManagementPartition(ConfigPhysicalPlan req) { |
| int level; |
| PartialPath partialPath; |
| Set<TSchemaNode> alreadyMatchedNode; |
| Set<PartialPath> needMatchedNode; |
| List<String> matchedStorageGroups = new ArrayList<>(); |
| |
| GetNodePathsPartitionPlan getNodePathsPartitionPlan = (GetNodePathsPartitionPlan) req; |
| partialPath = getNodePathsPartitionPlan.getPartialPath(); |
| level = getNodePathsPartitionPlan.getLevel(); |
| if (-1 == level) { |
| // get child paths |
| Pair<Set<TSchemaNode>, Set<PartialPath>> matchedChildInNextLevel = |
| clusterSchemaInfo.getChildNodePathInNextLevel( |
| partialPath, getNodePathsPartitionPlan.getScope()); |
| alreadyMatchedNode = matchedChildInNextLevel.left; |
| if (!partialPath.hasMultiLevelMatchWildcard()) { |
| needMatchedNode = new HashSet<>(); |
| for (PartialPath databasePath : matchedChildInNextLevel.right) { |
| if (databasePath.getNodeLength() == partialPath.getNodeLength() + 1) { |
| // this database node is already the target child node, no need to traverse its |
| // schemaengine |
| // region |
| continue; |
| } |
| needMatchedNode.add(databasePath); |
| } |
| } else { |
| needMatchedNode = matchedChildInNextLevel.right; |
| } |
| } else { |
| // count nodes |
| Pair<List<PartialPath>, Set<PartialPath>> matchedChildInNextLevel = |
| clusterSchemaInfo.getNodesListInGivenLevel( |
| partialPath, level, getNodePathsPartitionPlan.getScope()); |
| alreadyMatchedNode = |
| matchedChildInNextLevel.left.stream() |
| .map(path -> new TSchemaNode(path.getFullPath(), MNodeType.UNIMPLEMENT.getNodeType())) |
| .collect(Collectors.toSet()); |
| needMatchedNode = matchedChildInNextLevel.right; |
| } |
| |
| needMatchedNode.forEach(nodePath -> matchedStorageGroups.add(nodePath.getFullPath())); |
| SchemaNodeManagementResp schemaNodeManagementResp = |
| (SchemaNodeManagementResp) |
| partitionInfo.getSchemaNodeManagementPartition(matchedStorageGroups); |
| if (schemaNodeManagementResp.getStatus().getCode() |
| == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| schemaNodeManagementResp.setMatchedNode(alreadyMatchedNode); |
| } |
| return schemaNodeManagementResp; |
| } |
| |
| private DataSet getRegionInfoList(ConfigPhysicalPlan req) { |
| final GetRegionInfoListPlan getRegionInfoListPlan = (GetRegionInfoListPlan) req; |
| TShowRegionReq showRegionReq = getRegionInfoListPlan.getShowRegionReq(); |
| if (showRegionReq != null && showRegionReq.isSetDatabases()) { |
| final List<String> storageGroups = showRegionReq.getDatabases(); |
| final List<String> matchedStorageGroups = |
| clusterSchemaInfo.getMatchedDatabaseSchemasByName(storageGroups).values().stream() |
| .map(TDatabaseSchema::getName) |
| .collect(Collectors.toList()); |
| if (!matchedStorageGroups.isEmpty()) { |
| showRegionReq.setDatabases(matchedStorageGroups); |
| } |
| } |
| return partitionInfo.getRegionInfoList(getRegionInfoListPlan); |
| } |
| } |