| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.db.queryengine.execution.executor; |
| |
| import org.apache.iotdb.common.rpc.thrift.TSStatus; |
| import org.apache.iotdb.commons.conf.CommonDescriptor; |
| import org.apache.iotdb.commons.consensus.ConsensusGroupId; |
| import org.apache.iotdb.commons.consensus.DataRegionId; |
| import org.apache.iotdb.commons.consensus.SchemaRegionId; |
| import org.apache.iotdb.commons.exception.MetadataException; |
| import org.apache.iotdb.commons.path.MeasurementPath; |
| import org.apache.iotdb.commons.path.PartialPath; |
| import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; |
| import org.apache.iotdb.commons.utils.TestOnly; |
| import org.apache.iotdb.consensus.ConsensusFactory; |
| import org.apache.iotdb.consensus.IConsensus; |
| import org.apache.iotdb.consensus.exception.ConsensusException; |
| import org.apache.iotdb.db.conf.IoTDBConfig; |
| import org.apache.iotdb.db.conf.IoTDBDescriptor; |
| import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; |
| import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; |
| import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException; |
| import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException; |
| import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.BatchActivateTemplateNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWritePlanNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; |
| import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; |
| import org.apache.iotdb.db.schemaengine.SchemaEngine; |
| import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion; |
| import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager; |
| import org.apache.iotdb.db.schemaengine.template.Template; |
| import org.apache.iotdb.db.trigger.executor.TriggerFireResult; |
| import org.apache.iotdb.db.trigger.executor.TriggerFireVisitor; |
| import org.apache.iotdb.rpc.RpcUtils; |
| import org.apache.iotdb.rpc.TSStatusCode; |
| import org.apache.iotdb.trigger.api.enums.TriggerEvent; |
| |
| import org.apache.tsfile.utils.Pair; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
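| /** |
| * Executes write {@link PlanNode}s on a local data region or schema region through the |
| * corresponding consensus layer. Data inserts additionally fire triggers before and after the |
| * write, while schema writes perform schema-quota checks (and, under ratis consensus, |
| * measurement-existence checks) before being submitted to consensus. |
| * |
| * <p>A minimal usage sketch (illustrative only; it assumes the consensus implementations and the |
| * {@link DataNodeRegionManager} singleton are already initialized, and that {@code groupId} and |
| * {@code planNode} come from the planning layer): |
| * |
| * <pre>{@code |
| * RegionWriteExecutor executor = new RegionWriteExecutor(); |
| * RegionExecutionResult result = executor.execute(groupId, planNode); |
| * if (!result.isAccepted()) { |
| *   // the write was rejected or failed; details are carried in result.getStatus() |
| * } |
| * }</pre> |
| */ |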
| public class RegionWriteExecutor { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(RegionWriteExecutor.class); |
| |
| private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = |
| PerformanceOverviewMetrics.getInstance(); |
| |
| private static final String METADATA_ERROR_MSG = "Metadata error: "; |
| |
| private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); |
| |
| private final IConsensus dataRegionConsensus; |
| |
| private final IConsensus schemaRegionConsensus; |
| |
| private final DataNodeRegionManager regionManager; |
| |
| private final SchemaEngine schemaEngine; |
| |
| private final ClusterTemplateManager clusterTemplateManager; |
| |
| private final TriggerFireVisitor triggerFireVisitor; |
| |
| private final WritePlanNodeExecutionVisitor executionVisitor; |
| |
| private final PipeEnrichedWriteSchemaNodeExecutionVisitor pipeExecutionVisitor; |
| |
| public RegionWriteExecutor() { |
| dataRegionConsensus = DataRegionConsensusImpl.getInstance(); |
| schemaRegionConsensus = SchemaRegionConsensusImpl.getInstance(); |
| regionManager = DataNodeRegionManager.getInstance(); |
| schemaEngine = SchemaEngine.getInstance(); |
| clusterTemplateManager = ClusterTemplateManager.getInstance(); |
| triggerFireVisitor = new TriggerFireVisitor(); |
| executionVisitor = new WritePlanNodeExecutionVisitor(); |
| pipeExecutionVisitor = new PipeEnrichedWriteSchemaNodeExecutionVisitor(executionVisitor); |
| } |
| |
| @TestOnly |
| public RegionWriteExecutor( |
| IConsensus dataRegionConsensus, |
| IConsensus schemaRegionConsensus, |
| DataNodeRegionManager regionManager, |
| SchemaEngine schemaEngine, |
| ClusterTemplateManager clusterTemplateManager, |
| TriggerFireVisitor triggerFireVisitor) { |
| this.dataRegionConsensus = dataRegionConsensus; |
| this.schemaRegionConsensus = schemaRegionConsensus; |
| this.regionManager = regionManager; |
| this.schemaEngine = schemaEngine; |
| this.clusterTemplateManager = clusterTemplateManager; |
| this.triggerFireVisitor = triggerFireVisitor; |
| executionVisitor = new WritePlanNodeExecutionVisitor(); |
| pipeExecutionVisitor = new PipeEnrichedWriteSchemaNodeExecutionVisitor(executionVisitor); |
| } |
| |
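| /** |
| * Executes the given write {@link PlanNode} on the region identified by {@code groupId}. The |
| * node is dispatched to {@code WritePlanNodeExecutionVisitor}; any throwable is caught and |
| * turned into a rejected {@link RegionExecutionResult} carrying an INTERNAL_SERVER_ERROR status. |
| */ |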
| @SuppressWarnings("squid:S1181") |
| public RegionExecutionResult execute(ConsensusGroupId groupId, PlanNode planNode) { |
| try { |
| WritePlanNodeExecutionContext context = |
| new WritePlanNodeExecutionContext(groupId, regionManager.getRegionLock(groupId)); |
| return planNode.accept(executionVisitor, context); |
| } catch (Throwable e) { |
| LOGGER.error(e.getMessage(), e); |
| RegionExecutionResult result = new RegionExecutionResult(); |
| result.setAccepted(false); |
| result.setMessage(e.getMessage()); |
| result.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage())); |
| return result; |
| } |
| } |
| |
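| /** |
| * Visitor holding the per-node-type write logic. The default {@code visitPlan} rejects writes |
| * while the system is read-only and otherwise submits the node to the consensus layer directly; |
| * the overrides add locking, trigger firing and schema validation for specific node types. |
| */ |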
| private class WritePlanNodeExecutionVisitor |
| extends PlanVisitor<RegionExecutionResult, WritePlanNodeExecutionContext> { |
| |
| @Override |
| public RegionExecutionResult visitPlan(PlanNode node, WritePlanNodeExecutionContext context) { |
| RegionExecutionResult response = new RegionExecutionResult(); |
| |
| if (CommonDescriptor.getInstance().getConfig().isReadOnly()) { |
| String readOnlyMessage = |
| "Failed to execute non-query operations because the system is read-only."; |
| response.setAccepted(false); |
| response.setMessage(readOnlyMessage); |
| response.setStatus(RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY, readOnlyMessage)); |
| return response; |
| } |
| |
| try { |
| TSStatus status = executePlanNodeInConsensusLayer(context.getRegionId(), node); |
| response.setAccepted(TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()); |
| response.setMessage(status.getMessage()); |
| response.setStatus(status); |
| } catch (ConsensusException e) { |
| LOGGER.error("Failed to execute the write in the consensus layer due to: ", e); |
| response.setAccepted(false); |
| response.setMessage(e.toString()); |
| response.setStatus( |
| RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, e.getMessage())); |
| } |
| return response; |
| } |
| |
| private TSStatus executePlanNodeInConsensusLayer(ConsensusGroupId groupId, PlanNode planNode) |
| throws ConsensusException { |
| if (groupId instanceof DataRegionId) { |
| return dataRegionConsensus.write(groupId, planNode); |
| } else { |
| return schemaRegionConsensus.write(groupId, planNode); |
| } |
| } |
| |
| @Override |
| public RegionExecutionResult visitInsertRow( |
| InsertRowNode node, WritePlanNodeExecutionContext context) { |
| return executeDataInsert(node, context); |
| } |
| |
| @Override |
| public RegionExecutionResult visitInsertTablet( |
| InsertTabletNode node, WritePlanNodeExecutionContext context) { |
| return executeDataInsert(node, context); |
| } |
| |
| @Override |
| public RegionExecutionResult visitInsertRows( |
| InsertRowsNode node, WritePlanNodeExecutionContext context) { |
| return executeDataInsert(node, context); |
| } |
| |
| @Override |
| public RegionExecutionResult visitInsertMultiTablets( |
| InsertMultiTabletsNode node, WritePlanNodeExecutionContext context) { |
| return executeDataInsert(node, context); |
| } |
| |
| @Override |
| public RegionExecutionResult visitInsertRowsOfOneDevice( |
| InsertRowsOfOneDeviceNode node, WritePlanNodeExecutionContext context) { |
| return executeDataInsert(node, context); |
| } |
| |
| @Override |
| public RegionExecutionResult visitPipeEnrichedInsertNode( |
| PipeEnrichedInsertNode node, WritePlanNodeExecutionContext context) { |
| return executeDataInsert(node, context); |
| } |
| |
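| /** |
| * Shared path for all insert node types: takes the region validation read lock so that |
| * operations holding the write lock (e.g. data deletion) are excluded, then fires triggers and |
| * writes through the data region consensus. |
| */ |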
| private RegionExecutionResult executeDataInsert( |
| InsertNode insertNode, WritePlanNodeExecutionContext context) { |
| RegionExecutionResult response = new RegionExecutionResult(); |
| context.getRegionWriteValidationRWLock().readLock().lock(); |
| try { |
| TSStatus status = fireTriggerAndInsert(context.getRegionId(), insertNode); |
| response.setAccepted(TSStatusCode.SUCCESS_STATUS.getStatusCode() == status.getCode()); |
| response.setMessage(status.getMessage()); |
| if (!response.isAccepted()) { |
| response.setStatus(status); |
| } |
| return response; |
| } catch (ConsensusException e) { |
| LOGGER.warn("Failed to execute the write in the consensus layer due to: ", e); |
| response.setAccepted(false); |
| response.setMessage(e.toString()); |
| response.setStatus(RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR, e.toString())); |
| return response; |
| } finally { |
| context.getRegionWriteValidationRWLock().readLock().unlock(); |
| } |
| } |
| |
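| /** |
| * Fires BEFORE_INSERT triggers, writes the insert node through consensus unless a trigger |
| * requested termination, then fires AFTER_INSERT triggers. Trigger and storage costs are |
| * recorded in the performance overview metrics. |
| */ |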
| private TSStatus fireTriggerAndInsert(ConsensusGroupId groupId, InsertNode insertNode) |
| throws ConsensusException { |
| long triggerCostTime = 0; |
| TSStatus status; |
| long startTime = System.nanoTime(); |
| // fire Trigger before the insertion |
| TriggerFireResult result = triggerFireVisitor.process(insertNode, TriggerEvent.BEFORE_INSERT); |
| triggerCostTime += (System.nanoTime() - startTime); |
| if (result.equals(TriggerFireResult.TERMINATION)) { |
| status = |
| RpcUtils.getStatus( |
| TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(), |
| "Failed to complete the insertion because of a trigger error before the insertion."); |
| } else { |
| long startWriteTime = System.nanoTime(); |
| status = dataRegionConsensus.write(groupId, insertNode); |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleStorageCost(System.nanoTime() - startWriteTime); |
| |
| // fire Trigger after the insertion |
| startTime = System.nanoTime(); |
| boolean hasFailedTriggerBeforeInsertion = |
| result.equals(TriggerFireResult.FAILED_NO_TERMINATION); |
| result = triggerFireVisitor.process(insertNode, TriggerEvent.AFTER_INSERT); |
| if (hasFailedTriggerBeforeInsertion || !result.equals(TriggerFireResult.SUCCESS)) { |
| status = |
| RpcUtils.getStatus( |
| TSStatusCode.TRIGGER_FIRE_ERROR.getStatusCode(), |
| "Encountered a trigger error before/after the insertion; the insertion itself was completed."); |
| } |
| triggerCostTime += (System.nanoTime() - startTime); |
| } |
| PERFORMANCE_OVERVIEW_METRICS.recordScheduleTriggerCost(triggerCostTime); |
| return status; |
| } |
| |
| @Override |
| public RegionExecutionResult visitPipeEnrichedDeleteDataNode( |
| PipeEnrichedDeleteDataNode node, WritePlanNodeExecutionContext context) { |
| // data deletion should block data insertion, especially when executed for deleting timeseries |
| context.getRegionWriteValidationRWLock().writeLock().lock(); |
| try { |
| return super.visitPipeEnrichedDeleteDataNode(node, context); |
| } finally { |
| context.getRegionWriteValidationRWLock().writeLock().unlock(); |
| } |
| } |
| |
| @Override |
| public RegionExecutionResult visitDeleteData( |
| DeleteDataNode node, WritePlanNodeExecutionContext context) { |
| // data deletion should block data insertion, especially when executed for deleting |
| // timeseries, so the region write lock is taken here |
| context.getRegionWriteValidationRWLock().writeLock().lock(); |
| try { |
| return super.visitDeleteData(node, context); |
| } finally { |
| context.getRegionWriteValidationRWLock().writeLock().unlock(); |
| } |
| } |
| |
| @Override |
| public RegionExecutionResult visitCreateTimeSeries( |
| CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) { |
| return executeCreateTimeSeries(node, context, false); |
| } |
| |
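| /** |
| * Creates a single timeseries. The schema quota is checked first; under ratis consensus the |
| * region write lock is taken and the measurement's existence is validated, so duplicates are |
| * rejected before the plan reaches consensus. |
| */ |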
| private RegionExecutionResult executeCreateTimeSeries( |
| CreateTimeSeriesNode node, |
| WritePlanNodeExecutionContext context, |
| boolean receivedFromPipe) { |
| ISchemaRegion schemaRegion = |
| schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); |
| RegionExecutionResult result = |
| checkQuotaBeforeCreatingTimeSeries(schemaRegion, node.getPath().getDevicePath(), 1); |
| if (result != null) { |
| return result; |
| } |
| if (CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { |
| context.getRegionWriteValidationRWLock().writeLock().lock(); |
| try { |
| Map<Integer, MetadataException> failingMeasurementMap = |
| schemaRegion.checkMeasurementExistence( |
| node.getPath().getDevicePath(), |
| Collections.singletonList(node.getPath().getMeasurement()), |
| Collections.singletonList(node.getAlias())); |
| if (failingMeasurementMap.isEmpty()) { |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitCreateTimeSeries(node, context); |
| } else { |
| MetadataException metadataException = failingMeasurementMap.get(0); |
| LOGGER.error(METADATA_ERROR_MSG, metadataException); |
| result = new RegionExecutionResult(); |
| result.setAccepted(false); |
| result.setMessage(metadataException.getMessage()); |
| result.setStatus( |
| RpcUtils.getStatus( |
| metadataException.getErrorCode(), metadataException.getMessage())); |
| return result; |
| } |
| } finally { |
| context.getRegionWriteValidationRWLock().writeLock().unlock(); |
| } |
| } else { |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitCreateTimeSeries(node, context); |
| } |
| } |
| |
| @Override |
| public RegionExecutionResult visitCreateAlignedTimeSeries( |
| CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext context) { |
| return executeCreateAlignedTimeSeries(node, context, false); |
| } |
| |
| private RegionExecutionResult executeCreateAlignedTimeSeries( |
| CreateAlignedTimeSeriesNode node, |
| WritePlanNodeExecutionContext context, |
| boolean receivedFromPipe) { |
| ISchemaRegion schemaRegion = |
| schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); |
| RegionExecutionResult result = |
| checkQuotaBeforeCreatingTimeSeries( |
| schemaRegion, node.getDevicePath(), node.getMeasurements().size()); |
| if (result != null) { |
| return result; |
| } |
| if (CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { |
| context.getRegionWriteValidationRWLock().writeLock().lock(); |
| try { |
| Map<Integer, MetadataException> failingMeasurementMap = |
| schemaRegion.checkMeasurementExistence( |
| node.getDevicePath(), node.getMeasurements(), node.getAliasList()); |
| if (failingMeasurementMap.isEmpty()) { |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitCreateAlignedTimeSeries(node, context); |
| } else { |
| MetadataException metadataException = failingMeasurementMap.values().iterator().next(); |
| LOGGER.error(METADATA_ERROR_MSG, metadataException); |
| result = new RegionExecutionResult(); |
| result.setAccepted(false); |
| result.setMessage(metadataException.getMessage()); |
| result.setStatus( |
| RpcUtils.getStatus( |
| metadataException.getErrorCode(), metadataException.getMessage())); |
| return result; |
| } |
| } finally { |
| context.getRegionWriteValidationRWLock().writeLock().unlock(); |
| } |
| } else { |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitCreateAlignedTimeSeries(node, context); |
| } |
| } |
| |
| @Override |
| public RegionExecutionResult visitCreateMultiTimeSeries( |
| CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) { |
| return executeCreateMultiTimeSeries(node, context, false); |
| } |
| |
| private RegionExecutionResult executeCreateMultiTimeSeries( |
| CreateMultiTimeSeriesNode node, |
| WritePlanNodeExecutionContext context, |
| boolean receivedFromPipe) { |
| ISchemaRegion schemaRegion = |
| schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); |
| RegionExecutionResult result; |
| for (Map.Entry<PartialPath, MeasurementGroup> entry : |
| node.getMeasurementGroupMap().entrySet()) { |
| result = |
| checkQuotaBeforeCreatingTimeSeries( |
| schemaRegion, entry.getKey(), entry.getValue().getMeasurements().size()); |
| if (result != null) { |
| return result; |
| } |
| } |
| if (CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { |
| context.getRegionWriteValidationRWLock().writeLock().lock(); |
| try { |
| List<TSStatus> failingStatus = new ArrayList<>(); |
| Map<PartialPath, MeasurementGroup> measurementGroupMap = node.getMeasurementGroupMap(); |
| List<PartialPath> emptyDeviceList = new ArrayList<>(); |
| |
| checkMeasurementExistence( |
| measurementGroupMap, schemaRegion, failingStatus, emptyDeviceList); |
| |
| for (PartialPath emptyDevice : emptyDeviceList) { |
| measurementGroupMap.remove(emptyDevice); |
| } |
| |
| RegionExecutionResult failingResult = |
| registerTimeSeries( |
| measurementGroupMap, node, context, failingStatus, receivedFromPipe); |
| |
| if (failingResult != null) { |
| return failingResult; |
| } |
| |
| TSStatus status = RpcUtils.getStatus(failingStatus); |
| failingResult = new RegionExecutionResult(); |
| failingResult.setAccepted(false); |
| failingResult.setMessage(status.getMessage()); |
| failingResult.setStatus(status); |
| return failingResult; |
| } finally { |
| context.getRegionWriteValidationRWLock().writeLock().unlock(); |
| } |
| } else { |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitCreateMultiTimeSeries(node, context); |
| } |
| } |
| |
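| /** |
| * For each device, checks which measurements already exist or otherwise fail validation, |
| * records a failing status for each, removes them from the measurement group, and collects |
| * devices whose groups become empty into {@code emptyDeviceList}. |
| */ |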
| private void checkMeasurementExistence( |
| Map<PartialPath, MeasurementGroup> measurementGroupMap, |
| ISchemaRegion schemaRegion, |
| List<TSStatus> failingStatus, |
| List<PartialPath> emptyDeviceList) { |
| for (Map.Entry<PartialPath, MeasurementGroup> entry : measurementGroupMap.entrySet()) { |
| Map<Integer, MetadataException> failingMeasurementMap = |
| schemaRegion.checkMeasurementExistence( |
| entry.getKey(), |
| entry.getValue().getMeasurements(), |
| entry.getValue().getAliasList()); |
| if (failingMeasurementMap.isEmpty()) { |
| continue; |
| } |
| |
| for (Map.Entry<Integer, MetadataException> failingMeasurement : |
| failingMeasurementMap.entrySet()) { |
| LOGGER.error(METADATA_ERROR_MSG, failingMeasurement.getValue()); |
| failingStatus.add( |
| RpcUtils.getStatus( |
| failingMeasurement.getValue().getErrorCode(), |
| failingMeasurement.getValue().getMessage())); |
| } |
| entry.getValue().removeMeasurements(failingMeasurementMap.keySet()); |
| |
| if (entry.getValue().isEmpty()) { |
| emptyDeviceList.add(entry.getKey()); |
| } |
| } |
| } |
| |
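| /** |
| * Registers the timeseries remaining after pre-validation. Returns the execution result |
| * directly when there were no pre-validation failures; otherwise merges the execution status |
| * into {@code failingStatus} and returns {@code null} so the caller builds the combined result. |
| */ |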
| private RegionExecutionResult registerTimeSeries( |
| Map<PartialPath, MeasurementGroup> measurementGroupMap, |
| CreateMultiTimeSeriesNode node, |
| WritePlanNodeExecutionContext context, |
| List<TSStatus> failingStatus, |
| boolean receivedFromPipe) { |
| if (!measurementGroupMap.isEmpty()) { |
| // try registering the remaining timeseries |
| RegionExecutionResult executionResult = |
| receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitCreateMultiTimeSeries(node, context); |
| if (failingStatus.isEmpty()) { |
| return executionResult; |
| } |
| |
| TSStatus executionStatus = executionResult.getStatus(); |
| if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { |
| failingStatus.addAll(executionStatus.getSubStatus()); |
| } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| failingStatus.add(executionStatus); |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public RegionExecutionResult visitInternalCreateTimeSeries( |
| InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext context) { |
| return executeInternalCreateTimeSeries(node, context, false); |
| } |
| |
| private RegionExecutionResult executeInternalCreateTimeSeries( |
| InternalCreateTimeSeriesNode node, |
| WritePlanNodeExecutionContext context, |
| boolean receivedFromPipe) { |
| ISchemaRegion schemaRegion = |
| schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); |
| RegionExecutionResult result = |
| checkQuotaBeforeCreatingTimeSeries( |
| schemaRegion, node.getDevicePath(), node.getMeasurementGroup().size()); |
| if (result != null) { |
| return result; |
| } |
| if (CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { |
| context.getRegionWriteValidationRWLock().writeLock().lock(); |
| try { |
| List<TSStatus> failingStatus = new ArrayList<>(); |
| List<TSStatus> alreadyExistingStatus = new ArrayList<>(); |
| MeasurementGroup measurementGroup = node.getMeasurementGroup(); |
| Map<Integer, MetadataException> failingMeasurementMap = |
| schemaRegion.checkMeasurementExistence( |
| node.getDevicePath(), |
| measurementGroup.getMeasurements(), |
| measurementGroup.getAliasList()); |
| MetadataException metadataException; |
| // filter out failed measurements and keep the rest for execution |
| for (Map.Entry<Integer, MetadataException> failingMeasurement : |
| failingMeasurementMap.entrySet()) { |
| metadataException = failingMeasurement.getValue(); |
| if (metadataException.getErrorCode() |
| == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { |
| // There's no need to create this timeseries internally since it already exists. |
| alreadyExistingStatus.add( |
| RpcUtils.getStatus( |
| metadataException.getErrorCode(), |
| MeasurementPath.transformDataToString( |
| ((MeasurementAlreadyExistException) metadataException) |
| .getMeasurementPath()))); |
| } else { |
| int errorCode = metadataException.getErrorCode(); |
| // only warn for unexpected errors; "already exists" conflicts are expected here |
| if (errorCode != TSStatusCode.PATH_ALREADY_EXIST.getStatusCode() |
| && errorCode != TSStatusCode.ALIAS_ALREADY_EXIST.getStatusCode()) { |
| LOGGER.warn(METADATA_ERROR_MSG, metadataException); |
| } |
| failingStatus.add( |
| RpcUtils.getStatus( |
| metadataException.getErrorCode(), metadataException.getMessage())); |
| } |
| } |
| measurementGroup.removeMeasurements(failingMeasurementMap.keySet()); |
| |
| return processExecutionResultOfInternalCreateSchema( |
| receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode( |
| new PipeEnrichedWritePlanNode(node), context) |
| : super.visitInternalCreateTimeSeries(node, context), |
| failingStatus, |
| alreadyExistingStatus); |
| } finally { |
| context.getRegionWriteValidationRWLock().writeLock().unlock(); |
| } |
| } else { |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitInternalCreateTimeSeries(node, context); |
| } |
| } |
| |
| @Override |
| public RegionExecutionResult visitInternalCreateMultiTimeSeries( |
| InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) { |
| return executeInternalCreateMultiTimeSeries(node, context, false); |
| } |
| |
| private RegionExecutionResult executeInternalCreateMultiTimeSeries( |
| InternalCreateMultiTimeSeriesNode node, |
| WritePlanNodeExecutionContext context, |
| boolean receivedFromPipe) { |
| ISchemaRegion schemaRegion = |
| schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); |
| RegionExecutionResult result; |
| for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry : |
| node.getDeviceMap().entrySet()) { |
| result = |
| checkQuotaBeforeCreatingTimeSeries( |
| schemaRegion, deviceEntry.getKey(), deviceEntry.getValue().getRight().size()); |
| if (result != null) { |
| return result; |
| } |
| } |
| if (CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { |
| context.getRegionWriteValidationRWLock().writeLock().lock(); |
| try { |
| List<TSStatus> failingStatus = new ArrayList<>(); |
| List<TSStatus> alreadyExistingStatus = new ArrayList<>(); |
| |
| MeasurementGroup measurementGroup; |
| Map<Integer, MetadataException> failingMeasurementMap; |
| MetadataException metadataException; |
| for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry : |
| node.getDeviceMap().entrySet()) { |
| measurementGroup = deviceEntry.getValue().right; |
| failingMeasurementMap = |
| schemaRegion.checkMeasurementExistence( |
| deviceEntry.getKey(), |
| measurementGroup.getMeasurements(), |
| measurementGroup.getAliasList()); |
| // filter out failed measurements and keep the rest for execution |
| for (Map.Entry<Integer, MetadataException> failingMeasurement : |
| failingMeasurementMap.entrySet()) { |
| metadataException = failingMeasurement.getValue(); |
| if (metadataException.getErrorCode() |
| == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { |
| // There's no need to create this timeseries internally since it already exists. |
| alreadyExistingStatus.add( |
| RpcUtils.getStatus( |
| metadataException.getErrorCode(), |
| MeasurementPath.transformDataToString( |
| ((MeasurementAlreadyExistException) metadataException) |
| .getMeasurementPath()))); |
| } else { |
| LOGGER.warn(METADATA_ERROR_MSG, metadataException); |
| failingStatus.add( |
| RpcUtils.getStatus( |
| metadataException.getErrorCode(), metadataException.getMessage())); |
| } |
| } |
| measurementGroup.removeMeasurements(failingMeasurementMap.keySet()); |
| } |
| |
| return processExecutionResultOfInternalCreateSchema( |
| receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode( |
| new PipeEnrichedWritePlanNode(node), context) |
| : super.visitInternalCreateMultiTimeSeries(node, context), |
| failingStatus, |
| alreadyExistingStatus); |
| } finally { |
| context.getRegionWriteValidationRWLock().writeLock().unlock(); |
| } |
| } else { |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitInternalCreateMultiTimeSeries(node, context); |
| } |
| } |
| |
| /** |
| * Checks the schema quota before creating {@code size} timeseries under {@code path}. |
| * |
| * @return {@code null} if the quota is not exceeded, otherwise a rejected execution result |
| */ |
| private RegionExecutionResult checkQuotaBeforeCreatingTimeSeries( |
| ISchemaRegion schemaRegion, PartialPath path, int size) { |
| try { |
| schemaRegion.checkSchemaQuota(path, size); |
| } catch (SchemaQuotaExceededException e) { |
| RegionExecutionResult result = new RegionExecutionResult(); |
| result.setAccepted(false); |
| result.setMessage(e.getMessage()); |
| result.setStatus(RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); |
| return result; |
| } |
| return null; |
| } |
| |
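| /** |
| * Combines the consensus execution result with the statuses collected during pre-validation. |
| * "Already exists" statuses alone still yield an accepted result, since concurrent creation of |
| * the same timeseries is tolerated; any other failure makes the result rejected. |
| */ |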
| private RegionExecutionResult processExecutionResultOfInternalCreateSchema( |
| RegionExecutionResult executionResult, |
| List<TSStatus> failingStatus, |
| List<TSStatus> alreadyExistingStatus) { |
| TSStatus executionStatus = executionResult.getStatus(); |
| |
| separateMeasurementAlreadyExistException( |
| failingStatus, executionStatus, alreadyExistingStatus); |
| |
| RegionExecutionResult result = new RegionExecutionResult(); |
| TSStatus status; |
| if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) { |
| status = RpcUtils.SUCCESS_STATUS; |
| result.setAccepted(true); |
| } else if (failingStatus.isEmpty()) { |
| status = RpcUtils.getStatus(alreadyExistingStatus); |
| result.setAccepted(true); |
| } else { |
| status = RpcUtils.getStatus(failingStatus); |
| result.setAccepted(false); |
| } |
| |
| result.setMessage(status.getMessage()); |
| result.setStatus(status); |
| return result; |
| } |
| |
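| /** |
| * Routes the sub-statuses of {@code executionStatus} into {@code alreadyExistingStatus} or |
| * {@code failingStatus}. A MULTIPLE_ERROR whose first sub-status is measurement_already_exist |
| * is treated as containing only tolerated "already exists" conflicts; when failures were already |
| * collected during pre-validation, such conflicts are dropped. |
| */ |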
| private void separateMeasurementAlreadyExistException( |
| List<TSStatus> failingStatus, |
| TSStatus executionStatus, |
| List<TSStatus> alreadyExistingStatus) { |
| // Separate measurement_already_exist exceptions from other exceptions; |
| // measurement_already_exist is acceptable because of concurrent timeseries creation. |
| if (failingStatus.isEmpty()) { |
| if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { |
| if (executionStatus.getSubStatus().get(0).getCode() |
| == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { |
| // all sub-statuses are measurement_already_exist exceptions |
| alreadyExistingStatus.addAll(executionStatus.getSubStatus()); |
| } else { |
| failingStatus.addAll(executionStatus.getSubStatus()); |
| } |
| } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| failingStatus.add(executionStatus); |
| } |
| } else { |
| if (executionStatus.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { |
| if (executionStatus.getSubStatus().get(0).getCode() |
| != TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { |
| failingStatus.addAll(executionStatus.getSubStatus()); |
| } |
| } else if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| failingStatus.add(executionStatus); |
| } |
| } |
| } |
| |
| @Override |
| public RegionExecutionResult visitAlterTimeSeries( |
| AlterTimeSeriesNode node, WritePlanNodeExecutionContext context) { |
| return executeAlterTimeSeries(node, context, false); |
| } |
| |
| private RegionExecutionResult executeAlterTimeSeries( |
| AlterTimeSeriesNode node, WritePlanNodeExecutionContext context, boolean receivedFromPipe) { |
| ISchemaRegion schemaRegion = |
| schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); |
| try { |
| MeasurementPath measurementPath = schemaRegion.fetchMeasurementPath(node.getPath()); |
| if (node.isAlterView() && !measurementPath.getMeasurementSchema().isLogicalView()) { |
| throw new MetadataException( |
| String.format("%s is not a view.", measurementPath.getFullPath())); |
| } |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitAlterTimeSeries(node, context); |
| } catch (MetadataException e) { |
| RegionExecutionResult result = new RegionExecutionResult(); |
| result.setAccepted(true); |
| result.setMessage(e.getMessage()); |
| result.setStatus(RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); |
| return result; |
| } |
| } |
| |
| @Override |
| public RegionExecutionResult visitActivateTemplate( |
| ActivateTemplateNode node, WritePlanNodeExecutionContext context) { |
| return executeActivateTemplate(node, context, false); |
| } |
| |
| private RegionExecutionResult executeActivateTemplate( |
| ActivateTemplateNode node, |
| WritePlanNodeExecutionContext context, |
| boolean receivedFromPipe) { |
| // activate template operation shall be blocked by unset template check |
| context.getRegionWriteValidationRWLock().readLock().lock(); |
| try { |
| Pair<Template, PartialPath> templateSetInfo = |
| clusterTemplateManager.checkTemplateSetInfo(node.getActivatePath()); |
| if (templateSetInfo == null) { |
| // The activation has already been validated during analysis. |
| // That means the template is being unset while the activation plan is in transit. |
| RegionExecutionResult result = new RegionExecutionResult(); |
| result.setAccepted(false); |
| String message = |
| String.format( |
| "Template is being unset from path %s. Please try activating it later.", |
| node.getPathSetTemplate()); |
| result.setMessage(message); |
| result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, message)); |
| return result; |
| } |
| ISchemaRegion schemaRegion = |
| schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); |
| RegionExecutionResult result = |
| checkQuotaBeforeCreatingTimeSeries( |
| schemaRegion, node.getActivatePath(), templateSetInfo.left.getMeasurementNumber()); |
| if (result == null) { |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitActivateTemplate(node, context); |
| } else { |
| return result; |
| } |
| } finally { |
| context.getRegionWriteValidationRWLock().readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public RegionExecutionResult visitBatchActivateTemplate( |
| BatchActivateTemplateNode node, WritePlanNodeExecutionContext context) { |
| return executeBatchActivateTemplate(node, context, false); |
| } |
| |
| private RegionExecutionResult executeBatchActivateTemplate( |
| BatchActivateTemplateNode node, |
| WritePlanNodeExecutionContext context, |
| boolean receivedFromPipe) { |
| // activate template operation shall be blocked by unset template check |
| context.getRegionWriteValidationRWLock().readLock().lock(); |
| try { |
| ISchemaRegion schemaRegion = |
| schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); |
| for (PartialPath devicePath : node.getTemplateActivationMap().keySet()) { |
| Pair<Template, PartialPath> templateSetInfo = |
| clusterTemplateManager.checkTemplateSetInfo(devicePath); |
| if (templateSetInfo == null) { |
| // The activation has already been validated during analysis. |
| // That means the template is being unset while the activation plan is in transit. |
| RegionExecutionResult result = new RegionExecutionResult(); |
| result.setAccepted(false); |
| String message = |
| String.format( |
| "Template is being unset from path %s. Please try activating it later.", |
| node.getPathSetTemplate(devicePath)); |
| result.setMessage(message); |
| result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, message)); |
| return result; |
| } |
| RegionExecutionResult result = |
| checkQuotaBeforeCreatingTimeSeries( |
| schemaRegion, devicePath, templateSetInfo.left.getMeasurementNumber()); |
| if (result != null) { |
| return result; |
| } |
| } |
| |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitBatchActivateTemplate(node, context); |
| } finally { |
| context.getRegionWriteValidationRWLock().readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public RegionExecutionResult visitInternalBatchActivateTemplate( |
| InternalBatchActivateTemplateNode node, WritePlanNodeExecutionContext context) { |
| return executeInternalBatchActivateTemplate(node, context, false); |
| } |
| |
| private RegionExecutionResult executeInternalBatchActivateTemplate( |
| InternalBatchActivateTemplateNode node, |
| WritePlanNodeExecutionContext context, |
| boolean receivedFromPipe) { |
| // activate template operation shall be blocked by unset template check |
| context.getRegionWriteValidationRWLock().readLock().lock(); |
| try { |
| ISchemaRegion schemaRegion = |
| schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); |
| for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry : |
| node.getTemplateActivationMap().entrySet()) { |
| Pair<Template, PartialPath> templateSetInfo = |
| clusterTemplateManager.checkTemplateSetInfo(entry.getKey()); |
| if (templateSetInfo == null) { |
| // The activation has already been validated during analysis. |
| // That means the template is being unset while the activation plan is in transit. |
| RegionExecutionResult result = new RegionExecutionResult(); |
| result.setAccepted(false); |
| String message = |
| String.format( |
| "Template is being unset from the prefix path of %s. Please try activating it later.", |
| new PartialPath( |
| Arrays.copyOf(entry.getKey().getNodes(), entry.getValue().right + 1)) |
| .getFullPath()); |
| result.setMessage(message); |
| result.setStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, message)); |
| return result; |
| } |
| RegionExecutionResult result = |
| checkQuotaBeforeCreatingTimeSeries( |
| schemaRegion, entry.getKey(), templateSetInfo.left.getMeasurementNumber()); |
| if (result != null) { |
| return result; |
| } |
| } |
| |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitInternalBatchActivateTemplate(node, context); |
| } finally { |
| context.getRegionWriteValidationRWLock().readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public RegionExecutionResult visitCreateLogicalView( |
| CreateLogicalViewNode node, WritePlanNodeExecutionContext context) { |
| return executeCreateLogicalView(node, context, false); |
| } |
| |
| private RegionExecutionResult executeCreateLogicalView( |
| CreateLogicalViewNode node, |
| WritePlanNodeExecutionContext context, |
| boolean receivedFromPipe) { |
| ISchemaRegion schemaRegion = |
| schemaEngine.getSchemaRegion((SchemaRegionId) context.getRegionId()); |
| if (CONFIG.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.RATIS_CONSENSUS)) { |
| context.getRegionWriteValidationRWLock().writeLock().lock(); |
| try { |
| // step 1. make sure all target paths do NOT exist. |
| List<PartialPath> targetPaths = node.getViewPathList(); |
| List<MetadataException> failingMetadataException = new ArrayList<>(); |
| for (PartialPath thisPath : targetPaths) { |
| // no alias list for a view, so the third parameter is null |
| Map<Integer, MetadataException> failingMeasurementMap = |
| schemaRegion.checkMeasurementExistence( |
| thisPath.getDevicePath(), |
| Collections.singletonList(thisPath.getMeasurement()), |
| null); |
| // merge all exceptions into one map |
| for (Map.Entry<Integer, MetadataException> entry : failingMeasurementMap.entrySet()) { |
| failingMetadataException.add(entry.getValue()); |
| } |
| } |
| // if there are any exceptions, log and return the first of them. |
| if (!failingMetadataException.isEmpty()) { |
| MetadataException metadataException = failingMetadataException.get(0); |
| LOGGER.error(METADATA_ERROR_MSG, metadataException); |
| RegionExecutionResult result = new RegionExecutionResult(); |
| result.setAccepted(false); |
| result.setMessage(metadataException.getMessage()); |
| result.setStatus( |
| RpcUtils.getStatus( |
| metadataException.getErrorCode(), metadataException.getMessage())); |
| return result; |
| } |
| // step 2. make sure all source paths exist. |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitCreateLogicalView(node, context); |
| } finally { |
| context.getRegionWriteValidationRWLock().writeLock().unlock(); |
| } |
| } else { |
| return receivedFromPipe |
| ? super.visitPipeEnrichedWritePlanNode(new PipeEnrichedWritePlanNode(node), context) |
| : super.visitCreateLogicalView(node, context); |
| } |
| // end of executeCreateLogicalView |
| } |
| |
| @Override |
| public RegionExecutionResult visitPipeEnrichedWritePlanNode( |
| PipeEnrichedWritePlanNode node, WritePlanNodeExecutionContext context) { |
| return node.getWritePlanNode().accept(pipeExecutionVisitor, context); |
| } |
| } |
| |
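| /** |
| * Visitor used when a schema write arrives wrapped in a {@link PipeEnrichedWritePlanNode}: each |
| * visit re-invokes the corresponding execute* method of {@code WritePlanNodeExecutionVisitor} |
| * with {@code receivedFromPipe} set to {@code true}, so the node is re-wrapped as pipe-enriched |
| * before being written through consensus. |
| */ |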
| private class PipeEnrichedWriteSchemaNodeExecutionVisitor |
| extends PlanVisitor<RegionExecutionResult, WritePlanNodeExecutionContext> { |
| |
| private final WritePlanNodeExecutionVisitor visitor; |
| |
| private PipeEnrichedWriteSchemaNodeExecutionVisitor(WritePlanNodeExecutionVisitor visitor) { |
| this.visitor = visitor; |
| } |
| |
| @Override |
| public RegionExecutionResult visitPlan(PlanNode node, WritePlanNodeExecutionContext context) { |
| throw new UnsupportedOperationException( |
| "PipeEnrichedWriteSchemaNodeExecutionVisitor does not support visiting general plan nodes."); |
| } |
| |
| @Override |
| public RegionExecutionResult visitCreateTimeSeries( |
| CreateTimeSeriesNode node, WritePlanNodeExecutionContext context) { |
| return visitor.executeCreateTimeSeries(node, context, true); |
| } |
| |
| @Override |
| public RegionExecutionResult visitCreateAlignedTimeSeries( |
| CreateAlignedTimeSeriesNode node, WritePlanNodeExecutionContext context) { |
| return visitor.executeCreateAlignedTimeSeries(node, context, true); |
| } |
| |
| @Override |
| public RegionExecutionResult visitCreateMultiTimeSeries( |
| CreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) { |
| return visitor.executeCreateMultiTimeSeries(node, context, true); |
| } |
| |
| @Override |
| public RegionExecutionResult visitInternalCreateTimeSeries( |
| InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext context) { |
| return visitor.executeInternalCreateTimeSeries(node, context, true); |
| } |
| |
| @Override |
| public RegionExecutionResult visitInternalCreateMultiTimeSeries( |
| InternalCreateMultiTimeSeriesNode node, WritePlanNodeExecutionContext context) { |
| return visitor.executeInternalCreateMultiTimeSeries(node, context, true); |
| } |
| |
| @Override |
| public RegionExecutionResult visitAlterTimeSeries( |
| AlterTimeSeriesNode node, WritePlanNodeExecutionContext context) { |
| return visitor.executeAlterTimeSeries(node, context, true); |
| } |
| |
| @Override |
| public RegionExecutionResult visitActivateTemplate( |
| ActivateTemplateNode node, WritePlanNodeExecutionContext context) { |
| return visitor.executeActivateTemplate(node, context, true); |
| } |
| |
| @Override |
| public RegionExecutionResult visitBatchActivateTemplate( |
| BatchActivateTemplateNode node, WritePlanNodeExecutionContext context) { |
| return visitor.executeBatchActivateTemplate(node, context, true); |
| } |
| |
| @Override |
| public RegionExecutionResult visitInternalBatchActivateTemplate( |
| InternalBatchActivateTemplateNode node, WritePlanNodeExecutionContext context) { |
| return visitor.executeInternalBatchActivateTemplate(node, context, true); |
| } |
| |
| @Override |
| public RegionExecutionResult visitCreateLogicalView( |
| CreateLogicalViewNode node, WritePlanNodeExecutionContext context) { |
| return visitor.executeCreateLogicalView(node, context, true); |
| } |
| } |
| |
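| /** |
| * Per-execution context: the target region id plus the region's validation read-write lock |
| * obtained from {@link DataNodeRegionManager}. |
| */ |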
| private static class WritePlanNodeExecutionContext { |
| |
| private final ConsensusGroupId regionId; |
| |
| private final ReentrantReadWriteLock regionRWLock; |
| |
| WritePlanNodeExecutionContext(ConsensusGroupId regionId, ReentrantReadWriteLock regionRWLock) { |
| this.regionId = regionId; |
| this.regionRWLock = regionRWLock; |
| } |
| |
| public ConsensusGroupId getRegionId() { |
| return regionId; |
| } |
| |
| public ReentrantReadWriteLock getRegionWriteValidationRWLock() { |
| return regionRWLock; |
| } |
| } |
| } |