/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.consensus.statemachine.schemaregion;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.template.TemplateIsInUseException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningQueue;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.BatchActivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeactivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalBatchActivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateMultiTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.MeasurementGroup;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.PreDeactivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.RollbackPreDeactivateTemplateNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.ConstructLogicalViewBlackListNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.CreateLogicalViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.DeleteLogicalViewNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.RollbackLogicalViewBlackListNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedNonWritePlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWritePlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.schemaengine.schemaregion.write.req.ICreateTimeSeriesPlan;
import org.apache.iotdb.db.schemaengine.schemaregion.write.req.SchemaRegionWritePlanFactory;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * Visitor that executes schema write {@link PlanNode}s against an {@link ISchemaRegion} and maps
 * the outcome (including any thrown exception) to a {@link TSStatus}.
 */
public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
private static final Logger logger = LoggerFactory.getLogger(SchemaExecutionVisitor.class);
@Override
public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
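// The second argument is an offset hint; -1 means none is supplied here (the exact semantics are
// defined by ISchemaRegion#createTimeseries).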
schemaRegion.createTimeseries(node, -1);
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
@Override
public TSStatus visitCreateAlignedTimeSeries(
CreateAlignedTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
schemaRegion.createAlignedTimeSeries(node);
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
@Override
public TSStatus visitCreateMultiTimeSeries(
CreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) {
Map<PartialPath, MeasurementGroup> measurementGroupMap = node.getMeasurementGroupMap();
List<TSStatus> failingStatus = new ArrayList<>();
PartialPath devicePath;
MeasurementGroup measurementGroup;
int size;
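// Create each measurement of every device individually and collect per-measurement failures
// instead of aborting the whole batch on the first error.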
for (Map.Entry<PartialPath, MeasurementGroup> entry : measurementGroupMap.entrySet()) {
devicePath = entry.getKey();
measurementGroup = entry.getValue();
size = measurementGroup.getMeasurements().size();
// todo implement batch creation of one device in SchemaRegion
for (int i = 0; i < size; i++) {
try {
schemaRegion.createTimeseries(
transformToCreateTimeSeriesPlan(devicePath, measurementGroup, i), -1);
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
}
if (!failingStatus.isEmpty()) {
return RpcUtils.getStatus(failingStatus);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
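// Builds a single-timeseries creation plan from the index-th entry of a MeasurementGroup; the
// optional lists (props, tags, attributes, alias) may be absent as a whole and are passed as null.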
private ICreateTimeSeriesPlan transformToCreateTimeSeriesPlan(
PartialPath devicePath, MeasurementGroup measurementGroup, int index) {
return SchemaRegionWritePlanFactory.getCreateTimeSeriesPlan(
devicePath.concatNode(measurementGroup.getMeasurements().get(index)),
measurementGroup.getDataTypes().get(index),
measurementGroup.getEncodings().get(index),
measurementGroup.getCompressors().get(index),
measurementGroup.getPropsList() == null ? null : measurementGroup.getPropsList().get(index),
measurementGroup.getTagsList() == null ? null : measurementGroup.getTagsList().get(index),
measurementGroup.getAttributesList() == null
? null
: measurementGroup.getAttributesList().get(index),
measurementGroup.getAliasList() == null
? null
: measurementGroup.getAliasList().get(index));
}
@Override
public TSStatus visitInternalCreateTimeSeries(
InternalCreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
PartialPath devicePath = node.getDevicePath();
MeasurementGroup measurementGroup = node.getMeasurementGroup();
List<TSStatus> alreadyExistingTimeseries = new ArrayList<>();
List<TSStatus> failingStatus = new ArrayList<>();
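// Timeseries that already exist are collected separately from hard failures so the caller can
// distinguish idempotent re-creation from real errors.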
if (node.isAligned()) {
executeInternalCreateAlignedTimeseries(
devicePath, measurementGroup, schemaRegion, alreadyExistingTimeseries, failingStatus);
} else {
executeInternalCreateTimeseries(
devicePath, measurementGroup, schemaRegion, alreadyExistingTimeseries, failingStatus);
}
if (!failingStatus.isEmpty()) {
return RpcUtils.getStatus(failingStatus);
}
if (!alreadyExistingTimeseries.isEmpty()) {
return RpcUtils.getStatus(alreadyExistingTimeseries);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
@Override
public TSStatus visitInternalCreateMultiTimeSeries(
InternalCreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) {
PartialPath devicePath;
MeasurementGroup measurementGroup;
List<TSStatus> alreadyExistingTimeseries = new ArrayList<>();
List<TSStatus> failingStatus = new ArrayList<>();
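// Handle each device independently; the boolean in the pair selects the aligned or non-aligned
// creation path.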
for (Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry :
node.getDeviceMap().entrySet()) {
devicePath = deviceEntry.getKey();
measurementGroup = deviceEntry.getValue().right;
if (deviceEntry.getValue().left) {
executeInternalCreateAlignedTimeseries(
devicePath, measurementGroup, schemaRegion, alreadyExistingTimeseries, failingStatus);
} else {
executeInternalCreateTimeseries(
devicePath, measurementGroup, schemaRegion, alreadyExistingTimeseries, failingStatus);
}
}
if (!failingStatus.isEmpty()) {
return RpcUtils.getStatus(failingStatus);
}
if (!alreadyExistingTimeseries.isEmpty()) {
return RpcUtils.getStatus(alreadyExistingTimeseries);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
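// Creates the measurements of a non-aligned device one by one; conflicts are recorded in
// alreadyExistingTimeseries, other metadata errors in failingStatus.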
private void executeInternalCreateTimeseries(
PartialPath devicePath,
MeasurementGroup measurementGroup,
ISchemaRegion schemaRegion,
List<TSStatus> alreadyExistingTimeseries,
List<TSStatus> failingStatus) {
int size = measurementGroup.getMeasurements().size();
// todo implement batch creation of one device in SchemaRegion
for (int i = 0; i < size; i++) {
try {
schemaRegion.createTimeseries(
transformToCreateTimeSeriesPlan(devicePath, measurementGroup, i), -1);
} catch (MeasurementAlreadyExistException e) {
// The timeseries already exists; there is no need to create it again, just record it.
alreadyExistingTimeseries.add(
RpcUtils.getStatus(
e.getErrorCode(), MeasurementPath.transformDataToString(e.getMeasurementPath())));
} catch (MetadataException e) {
logger.warn("{}: MetaData error: ", e.getMessage(), e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
}
private void executeInternalCreateAlignedTimeseries(
PartialPath devicePath,
MeasurementGroup measurementGroup,
ISchemaRegion schemaRegion,
List<TSStatus> alreadyExistingTimeseries,
List<TSStatus> failingStatus) {
List<String> measurementList = measurementGroup.getMeasurements();
List<TSDataType> dataTypeList = measurementGroup.getDataTypes();
List<TSEncoding> encodingList = measurementGroup.getEncodings();
List<CompressionType> compressionTypeList = measurementGroup.getCompressors();
ICreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
SchemaRegionWritePlanFactory.getCreateAlignedTimeSeriesPlan(
devicePath,
measurementList,
dataTypeList,
encodingList,
compressionTypeList,
null,
null,
null);
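// Retry the aligned creation: whenever a measurement already exists it is removed from the plan
// and the remaining measurements are attempted again, until everything is created, the plan is
// empty, or a non-conflict error occurs.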
boolean shouldRetry = true;
while (shouldRetry) {
try {
schemaRegion.createAlignedTimeSeries(createAlignedTimeSeriesPlan);
shouldRetry = false;
} catch (MeasurementAlreadyExistException e) {
// The existence check is executed before the actual creation, so there is no need to create
// this measurement again; record it as already existing.
MeasurementPath measurementPath = e.getMeasurementPath();
alreadyExistingTimeseries.add(
RpcUtils.getStatus(
e.getErrorCode(), MeasurementPath.transformDataToString(measurementPath)));
// Remove the already-existing timeseries from the plan and retry with the remaining ones.
int index = measurementList.indexOf(measurementPath.getMeasurement());
measurementList.remove(index);
dataTypeList.remove(index);
encodingList.remove(index);
compressionTypeList.remove(index);
if (measurementList.isEmpty()) {
shouldRetry = false;
}
} catch (MetadataException e) {
logger.warn("{}: MetaData error: ", e.getMessage(), e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
shouldRetry = false;
}
}
}
@Override
public TSStatus visitAlterTimeSeries(AlterTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
switch (node.getAlterType()) {
case RENAME:
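// The RENAME alter map is expected to contain a single entry: old key -> new key.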
String beforeName = node.getAlterMap().keySet().iterator().next();
String currentName = node.getAlterMap().get(beforeName);
schemaRegion.renameTagOrAttributeKey(beforeName, currentName, node.getPath());
break;
case SET:
schemaRegion.setTagsOrAttributesValue(node.getAlterMap(), node.getPath());
break;
case DROP:
schemaRegion.dropTagsOrAttributes(node.getAlterMap().keySet(), node.getPath());
break;
case ADD_TAGS:
schemaRegion.addTags(node.getAlterMap(), node.getPath());
break;
case ADD_ATTRIBUTES:
schemaRegion.addAttributes(node.getAlterMap(), node.getPath());
break;
case UPSERT:
schemaRegion.upsertAliasAndTagsAndAttributes(
node.getAlias(), node.getTagsMap(), node.getAttributesMap(), node.getPath());
break;
}
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
} catch (IOException e) {
logger.error("{}: IO error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
@Override
public TSStatus visitActivateTemplate(ActivateTemplateNode node, ISchemaRegion schemaRegion) {
try {
Template template = ClusterTemplateManager.getInstance().getTemplate(node.getTemplateId());
schemaRegion.activateSchemaTemplate(node, template);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitBatchActivateTemplate(
BatchActivateTemplateNode node, ISchemaRegion schemaRegion) {
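// For each device path, the pair's left value is the template id (used to look up the template)
// and the right value is forwarded to the activation plan.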
for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
node.getTemplateActivationMap().entrySet()) {
Template template = ClusterTemplateManager.getInstance().getTemplate(entry.getValue().left);
try {
schemaRegion.activateSchemaTemplate(
SchemaRegionWritePlanFactory.getActivateTemplateInClusterPlan(
entry.getKey(), entry.getValue().right, entry.getValue().left),
template);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus visitInternalBatchActivateTemplate(
InternalBatchActivateTemplateNode node, ISchemaRegion schemaRegion) {
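// Unlike visitBatchActivateTemplate, an already-activated template (TemplateIsInUseException) is
// tolerated and only logged here, which makes internal activation idempotent.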
for (Map.Entry<PartialPath, Pair<Integer, Integer>> entry :
node.getTemplateActivationMap().entrySet()) {
Template template = ClusterTemplateManager.getInstance().getTemplate(entry.getValue().left);
try {
schemaRegion.activateSchemaTemplate(
SchemaRegionWritePlanFactory.getActivateTemplateInClusterPlan(
entry.getKey(), entry.getValue().right, entry.getValue().left),
template);
} catch (TemplateIsInUseException e) {
logger.info(
"Device template has already been activated on path {}; there is no need to activate it again.",
entry.getKey());
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
@Override
public TSStatus visitConstructSchemaBlackList(
ConstructSchemaBlackListNode node, ISchemaRegion schemaRegion) {
try {
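// The number of timeseries that were pre-deleted (added to the black list) is returned in the
// status message.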
long preDeletedNum = schemaRegion.constructSchemaBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, String.valueOf(preDeletedNum));
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitRollbackSchemaBlackList(
RollbackSchemaBlackListNode node, ISchemaRegion schemaRegion) {
try {
schemaRegion.rollbackSchemaBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitDeleteTimeseries(DeleteTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
schemaRegion.deleteTimeseriesInBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitPreDeactivateTemplate(
PreDeactivateTemplateNode node, ISchemaRegion schemaRegion) {
try {
long preDeactivateNum = schemaRegion.constructSchemaBlackListWithTemplate(node);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, String.valueOf(preDeactivateNum));
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitRollbackPreDeactivateTemplate(
RollbackPreDeactivateTemplateNode node, ISchemaRegion schemaRegion) {
try {
schemaRegion.rollbackSchemaBlackListWithTemplate(node);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitDeactivateTemplate(DeactivateTemplateNode node, ISchemaRegion schemaRegion) {
try {
schemaRegion.deactivateTemplateInBlackList(node);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitCreateLogicalView(CreateLogicalViewNode node, ISchemaRegion schemaRegion) {
Map<PartialPath, ViewExpression> viewPathToSourceMap = node.getViewPathToSourceExpressionMap();
List<TSStatus> failingStatus = new ArrayList<>();
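// Create each logical view individually and collect failures so that one invalid view does not
// abort the rest of the batch.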
for (Map.Entry<PartialPath, ViewExpression> entry : viewPathToSourceMap.entrySet()) {
try {
schemaRegion.createLogicalView(
SchemaRegionWritePlanFactory.getCreateLogicalViewPlan(
entry.getKey(), entry.getValue()));
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
if (!failingStatus.isEmpty()) {
return RpcUtils.getStatus(failingStatus);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
@Override
public TSStatus visitAlterLogicalView(AlterLogicalViewNode node, ISchemaRegion schemaRegion) {
Map<PartialPath, ViewExpression> viewPathToSourceMap = node.getViewPathToSourceMap();
List<TSStatus> failingStatus = new ArrayList<>();
for (Map.Entry<PartialPath, ViewExpression> entry : viewPathToSourceMap.entrySet()) {
try {
schemaRegion.alterLogicalView(
SchemaRegionWritePlanFactory.getAlterLogicalViewPlan(entry.getKey(), entry.getValue()));
} catch (MetadataException e) {
logger.warn("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
if (!failingStatus.isEmpty()) {
return RpcUtils.getStatus(failingStatus);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
@Override
public TSStatus visitConstructLogicalViewBlackList(
ConstructLogicalViewBlackListNode node, ISchemaRegion schemaRegion) {
try {
long preDeletedNum = schemaRegion.constructLogicalViewBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, String.valueOf(preDeletedNum));
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitRollbackLogicalViewBlackList(
RollbackLogicalViewBlackListNode node, ISchemaRegion schemaRegion) {
try {
schemaRegion.rollbackLogicalViewBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitDeleteLogicalView(DeleteLogicalViewNode node, ISchemaRegion schemaRegion) {
try {
schemaRegion.deleteLogicalView(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitPipeEnrichedWritePlanNode(
PipeEnrichedWritePlanNode node, ISchemaRegion schemaRegion) {
return node.getWritePlanNode().accept(this, schemaRegion);
}
@Override
public TSStatus visitPipeEnrichedNonWritePlanNode(
PipeEnrichedNonWritePlanNode node, ISchemaRegion schemaRegion) {
return node.getNonWritePlanNode().accept(this, schemaRegion);
}
@Override
public TSStatus visitPipeOperateSchemaQueueNode(
PipeOperateSchemaQueueNode node, ISchemaRegion schemaRegion) {
final SchemaRegionId id = schemaRegion.getSchemaRegionId();
final SchemaRegionListeningQueue queue = PipeAgent.runtime().schemaListener(id);
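// Open or close the listening queue only when its current state differs from what the plan node
// requests, so the operation is idempotent.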
try {
if (node.isOpen() && !queue.isOpened()) {
logger.info("Opened pipe listening queue on schema region {}", id);
queue.open();
} else if (!node.isOpen() && queue.isOpened()) {
logger.info("Closed pipe listening queue on schema region {}", id);
queue.close();
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (IOException e) {
return new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
.setMessage("Failed to open/close the pipe listening queue, because " + e.getMessage());
}
}
@Override
public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
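// Fallback for plan node types that this visitor does not handle.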
return null;
}
}