blob: c6d1b51af6f30d7d9cd9907b0f7ef3734db832a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.metadata.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.metadata.MeasurementAlreadyExistException;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ActivateTemplateNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.ConstructSchemaBlackListNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateMultiTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.DeleteTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.InternalCreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.MeasurementGroup;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.write.RollbackSchemaBlackListNode;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplateInClusterPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/** Schema write PlanNode visitor */
public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
private static final Logger logger = LoggerFactory.getLogger(SchemaExecutionVisitor.class);
@Override
public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
PhysicalPlan plan = node.accept(new PhysicalPlanTransformer(), new TransformerContext());
schemaRegion.createTimeseries((CreateTimeSeriesPlan) plan, -1);
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
@Override
public TSStatus visitCreateAlignedTimeSeries(
CreateAlignedTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
PhysicalPlan plan = node.accept(new PhysicalPlanTransformer(), new TransformerContext());
schemaRegion.createAlignedTimeSeries((CreateAlignedTimeSeriesPlan) plan);
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
@Override
public TSStatus visitCreateMultiTimeSeries(
CreateMultiTimeSeriesNode node, ISchemaRegion schemaRegion) {
Map<PartialPath, MeasurementGroup> measurementGroupMap = node.getMeasurementGroupMap();
List<TSStatus> failingStatus = new ArrayList<>();
PartialPath devicePath;
MeasurementGroup measurementGroup;
int size;
for (Map.Entry<PartialPath, MeasurementGroup> entry : measurementGroupMap.entrySet()) {
devicePath = entry.getKey();
measurementGroup = entry.getValue();
size = measurementGroup.getMeasurements().size();
// todo implement batch creation of one device in SchemaRegion
for (int i = 0; i < size; i++) {
try {
schemaRegion.createTimeseries(
transformToCreateTimeSeriesPlan(devicePath, measurementGroup, i), -1);
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
}
if (!failingStatus.isEmpty()) {
return RpcUtils.getStatus(failingStatus);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
private CreateTimeSeriesPlan transformToCreateTimeSeriesPlan(
PartialPath devicePath, MeasurementGroup measurementGroup, int index) {
return new CreateTimeSeriesPlan(
devicePath.concatNode(measurementGroup.getMeasurements().get(index)),
measurementGroup.getDataTypes().get(index),
measurementGroup.getEncodings().get(index),
measurementGroup.getCompressors().get(index),
measurementGroup.getPropsList() == null ? null : measurementGroup.getPropsList().get(index),
measurementGroup.getTagsList() == null ? null : measurementGroup.getTagsList().get(index),
measurementGroup.getAttributesList() == null
? null
: measurementGroup.getAttributesList().get(index),
measurementGroup.getAliasList() == null
? null
: measurementGroup.getAliasList().get(index));
}
@Override
public TSStatus visitInternalCreateTimeSeries(
InternalCreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
PartialPath devicePath = node.getDevicePath();
MeasurementGroup measurementGroup = node.getMeasurementGroup();
List<TSStatus> alreadyExistingTimeseries = new ArrayList<>();
List<TSStatus> failingStatus = new ArrayList<>();
if (node.isAligned()) {
executeInternalCreateAlignedTimeseries(
devicePath, measurementGroup, schemaRegion, alreadyExistingTimeseries, failingStatus);
} else {
executeInternalCreateTimeseries(
devicePath, measurementGroup, schemaRegion, alreadyExistingTimeseries, failingStatus);
}
if (!failingStatus.isEmpty()) {
return RpcUtils.getStatus(failingStatus);
}
if (!alreadyExistingTimeseries.isEmpty()) {
return RpcUtils.getStatus(alreadyExistingTimeseries);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
private void executeInternalCreateTimeseries(
PartialPath devicePath,
MeasurementGroup measurementGroup,
ISchemaRegion schemaRegion,
List<TSStatus> alreadyExistingTimeseries,
List<TSStatus> failingStatus) {
int size = measurementGroup.getMeasurements().size();
// todo implement batch creation of one device in SchemaRegion
for (int i = 0; i < size; i++) {
try {
schemaRegion.createTimeseries(
transformToCreateTimeSeriesPlan(devicePath, measurementGroup, i), -1);
} catch (MeasurementAlreadyExistException e) {
logger.info("There's no need to internal create timeseries. {}", e.getMessage());
alreadyExistingTimeseries.add(
RpcUtils.getStatus(
e.getErrorCode(), MeasurementPath.transformDataToString(e.getMeasurementPath())));
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
}
}
private void executeInternalCreateAlignedTimeseries(
PartialPath devicePath,
MeasurementGroup measurementGroup,
ISchemaRegion schemaRegion,
List<TSStatus> alreadyExistingTimeseries,
List<TSStatus> failingStatus) {
List<String> measurementList = measurementGroup.getMeasurements();
List<TSDataType> dataTypeList = measurementGroup.getDataTypes();
List<TSEncoding> encodingList = measurementGroup.getEncodings();
List<CompressionType> compressionTypeList = measurementGroup.getCompressors();
CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
new CreateAlignedTimeSeriesPlan(
devicePath,
measurementList,
dataTypeList,
encodingList,
compressionTypeList,
null,
null,
null);
boolean shouldRetry = true;
while (shouldRetry) {
try {
schemaRegion.createAlignedTimeSeries(createAlignedTimeSeriesPlan);
shouldRetry = false;
} catch (MeasurementAlreadyExistException e) {
// the existence check will be executed before truly creation
logger.info("There's no need to internal create timeseries. {}", e.getMessage());
MeasurementPath measurementPath = e.getMeasurementPath();
alreadyExistingTimeseries.add(
RpcUtils.getStatus(
e.getErrorCode(), MeasurementPath.transformDataToString(e.getMeasurementPath())));
// remove the existing timeseries from plan
int index = measurementList.indexOf(measurementPath.getMeasurement());
measurementList.remove(index);
dataTypeList.remove(index);
encodingList.remove(index);
compressionTypeList.remove(index);
if (measurementList.isEmpty()) {
shouldRetry = false;
}
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
shouldRetry = false;
}
}
}
@Override
public TSStatus visitAlterTimeSeries(AlterTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
switch (node.getAlterType()) {
case RENAME:
String beforeName = node.getAlterMap().keySet().iterator().next();
String currentName = node.getAlterMap().get(beforeName);
schemaRegion.renameTagOrAttributeKey(beforeName, currentName, node.getPath());
break;
case SET:
schemaRegion.setTagsOrAttributesValue(node.getAlterMap(), node.getPath());
break;
case DROP:
schemaRegion.dropTagsOrAttributes(node.getAlterMap().keySet(), node.getPath());
break;
case ADD_TAGS:
schemaRegion.addTags(node.getAlterMap(), node.getPath());
break;
case ADD_ATTRIBUTES:
schemaRegion.addAttributes(node.getAlterMap(), node.getPath());
break;
case UPSERT:
schemaRegion.upsertTagsAndAttributes(
node.getAlias(), node.getTagsMap(), node.getAttributesMap(), node.getPath());
break;
}
} catch (MetadataException e) {
logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
} catch (IOException e) {
logger.error("{}: IO error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
@Override
public TSStatus visitActivateTemplate(ActivateTemplateNode node, ISchemaRegion schemaRegion) {
try {
ActivateTemplateInClusterPlan plan =
(ActivateTemplateInClusterPlan)
new PhysicalPlanTransformer().visitActivateTemplate(node, new TransformerContext());
Template template = ClusterTemplateManager.getInstance().getTemplate(node.getTemplateId());
plan.setAligned(template.isDirectAligned());
schemaRegion.activateSchemaTemplate(plan, template);
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitConstructSchemaBlackList(
ConstructSchemaBlackListNode node, ISchemaRegion schemaRegion) {
try {
int preDeletedNum = schemaRegion.constructSchemaBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, String.valueOf(preDeletedNum));
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitRollbackSchemaBlackList(
RollbackSchemaBlackListNode node, ISchemaRegion schemaRegion) {
try {
schemaRegion.rollbackSchemaBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitDeleteTimeseries(DeleteTimeSeriesNode node, ISchemaRegion schemaRegion) {
try {
schemaRegion.deleteTimeseriesInBlackList(node.getPatternTree());
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} catch (MetadataException e) {
logger.error(e.getMessage(), e);
return RpcUtils.getStatus(e.getErrorCode(), e.getMessage());
}
}
@Override
public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
return null;
}
// TODO need remove
private static class PhysicalPlanTransformer
extends PlanVisitor<PhysicalPlan, TransformerContext> {
@Override
public PhysicalPlan visitPlan(PlanNode node, TransformerContext context) {
throw new NotImplementedException();
}
public PhysicalPlan visitCreateTimeSeries(
CreateTimeSeriesNode node, TransformerContext context) {
return new CreateTimeSeriesPlan(
node.getPath(),
node.getDataType(),
node.getEncoding(),
node.getCompressor(),
node.getProps(),
node.getTags(),
node.getAttributes(),
node.getAlias());
}
public PhysicalPlan visitCreateAlignedTimeSeries(
CreateAlignedTimeSeriesNode node, TransformerContext context) {
return new CreateAlignedTimeSeriesPlan(
node.getDevicePath(),
node.getMeasurements(),
node.getDataTypes(),
node.getEncodings(),
node.getCompressors(),
node.getAliasList(),
node.getTagsList(),
node.getAttributesList());
}
@Override
public PhysicalPlan visitActivateTemplate(
ActivateTemplateNode node, TransformerContext context) {
return new ActivateTemplateInClusterPlan(
node.getActivatePath(), node.getTemplateSetLevel(), node.getTemplateId());
}
}
private static class TransformerContext {}
}