blob: 605f72fd831fb06700e9386de0c43ab0ea9e951f [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.iotdb.db.queryengine.plan.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.DuplicateInsertException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.MeasurementSchema;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public abstract class InsertBaseStatement extends Statement {
* if use id table, this filed is id form of device path <br>
* if not, this filed is device path<br>
protected PartialPath devicePath;
protected boolean isAligned;
protected MeasurementSchema[] measurementSchemas;
protected String[] measurements;
// get from client
protected TSDataType[] dataTypes;
/** index of failed measurements -> info including measurement, data type and value */
protected Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info;
// region params used by analyzing logical views.
/** This param records the logical view schema appeared in this statement. */
List<LogicalViewSchema> logicalViewSchemaList;
* This param records the index of the location where the source of this view should be placed.
* <p>For example, indexListOfLogicalViewPaths[alpha] = beta means source of
* logicalViewSchemaList[alpha] should be filled into measurementSchemas[beta].
List<Integer> indexOfSourcePathsOfLogicalViews;
/** it is the end of last range, the beginning of current range. */
int recordedBeginOfLogicalViewSchemaList = 0;
/** it is the end of current range. */
int recordedEndOfLogicalViewSchemaList = 0;
// endregion
public PartialPath getDevicePath() {
return devicePath;
public void setDevicePath(PartialPath devicePath) {
this.devicePath = devicePath;
public String[] getMeasurements() {
return measurements;
public void setMeasurements(String[] measurements) {
this.measurements = measurements;
public MeasurementSchema[] getMeasurementSchemas() {
return measurementSchemas;
public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
this.measurementSchemas = measurementSchemas;
public boolean isAligned() {
return isAligned;
public void setAligned(boolean aligned) {
isAligned = aligned;
public TSDataType[] getDataTypes() {
return dataTypes;
public void setDataTypes(TSDataType[] dataTypes) {
this.dataTypes = dataTypes;
/** Returns true when this statement is empty and no need to write into the server */
public abstract boolean isEmpty();
public List<PartialPath> getPaths() {
return Collections.emptyList();
public TSStatus checkPermissionBeforeProcess(String userName) {
if (AuthorityChecker.SUPER_USER.equals(userName)) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
List<PartialPath> checkedPaths = getPaths().stream().distinct().collect(Collectors.toList());
return AuthorityChecker.getTSStatus(
userName, checkedPaths, PrivilegeType.WRITE_DATA.ordinal()),
public abstract ISchemaValidation getSchemaValidation();
public abstract List<ISchemaValidation> getSchemaValidationList();
public void updateAfterSchemaValidation() throws QueryProcessException {}
/** Check whether data types are matched with measurement schemas */
protected void selfCheckDataTypes(int index)
throws DataTypeMismatchException, PathNotExistException {
if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
// if enable partial insert, mark failed measurements with exception
if (measurementSchemas[index] == null) {
new PathNotExistException(devicePath.concatNode(measurements[index]).getFullPath()));
} else if ((dataTypes[index] != measurementSchemas[index].getType()
&& !checkAndCastDataType(index, measurementSchemas[index].getType()))) {
new DataTypeMismatchException(
} else {
// if not enable partial insert, throw the exception directly
if (measurementSchemas[index] == null) {
throw new PathNotExistException(devicePath.concatNode(measurements[index]).getFullPath());
} else if ((dataTypes[index] != measurementSchemas[index].getType()
&& !checkAndCastDataType(index, measurementSchemas[index].getType()))) {
throw new DataTypeMismatchException(
protected abstract boolean checkAndCastDataType(int columnIndex, TSDataType dataType);
public abstract long getMinTime();
public abstract Object getFirstValueOfIndex(int index);
// region partial insert
* Mark failed measurement, measurements[index], dataTypes[index] and values/columns[index] would
* be null. We'd better use "measurements[index] == null" to determine if the measurement failed.
* <br>
* This method is not concurrency-safe.
* @param index failed measurement index
* @param cause cause Exception of failure
public void markFailedMeasurement(int index, Exception cause) {
throw new UnsupportedOperationException();
public boolean hasValidMeasurements() {
for (Object o : measurements) {
if (o != null) {
return true;
return false;
public boolean hasFailedMeasurements() {
return failedMeasurementIndex2Info != null && !failedMeasurementIndex2Info.isEmpty();
public int getFailedMeasurementNumber() {
return failedMeasurementIndex2Info == null ? 0 : failedMeasurementIndex2Info.size();
public List<String> getFailedMeasurements() {
return failedMeasurementIndex2Info == null
? Collections.emptyList()
: failedMeasurementIndex2Info.values().stream()
.map(info -> info.measurement)
public List<Exception> getFailedExceptions() {
return failedMeasurementIndex2Info == null
? Collections.emptyList()
: failedMeasurementIndex2Info.values().stream()
.map(info -> info.cause)
public List<String> getFailedMessages() {
return failedMeasurementIndex2Info == null
? Collections.emptyList()
: failedMeasurementIndex2Info.values().stream()
info -> {
Throwable cause = info.cause;
while (cause.getCause() != null) {
cause = cause.getCause();
return cause.getMessage();
protected static class FailedMeasurementInfo {
protected String measurement;
protected TSDataType dataType;
protected Object value;
protected Exception cause;
public FailedMeasurementInfo(
String measurement, TSDataType dataType, Object value, Exception cause) {
this.measurement = measurement;
this.dataType = dataType;
this.value = value;
this.cause = cause;
// endregion
// region functions used by analyzing logical views
* Remove logical view in this statement according to validated schemas. So this function should
* be called after validating schemas.
public abstract InsertBaseStatement removeLogicalView();
public void setFailedMeasurementIndex2Info(
Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info) {
this.failedMeasurementIndex2Info = failedMeasurementIndex2Info;
* This function is used in splitting. Traverse two lists: logicalViewSchemaList, measurements,
* then find out all devices in this statement. Those devices will map to their measurements,
* recorded in a pair of measurement name and the index of measurement schemas
* (this.measurementSchemas).
* @return map from device path to its measurements.
protected Map<PartialPath, List<Pair<String, Integer>>> getMapFromDeviceToMeasurementAndIndex() {
boolean[] isLogicalView = new boolean[this.measurements.length];
int[] indexMapToLogicalViewList = new int[this.measurements.length];
Arrays.fill(isLogicalView, false);
if (this.indexOfSourcePathsOfLogicalViews != null) {
for (int i = 0; i < this.indexOfSourcePathsOfLogicalViews.size(); i++) {
int realIndex = this.indexOfSourcePathsOfLogicalViews.get(i);
isLogicalView[realIndex] = true;
indexMapToLogicalViewList[realIndex] = i;
// construct map from device to measurements and record the index of its measurement
// schemaengine
Map<PartialPath, List<Pair<String, Integer>>> mapFromDeviceToMeasurementAndIndex =
new HashMap<>();
for (int i = 0; i < this.measurements.length; i++) {
PartialPath targetDevicePath;
String measurementName;
if (isLogicalView[i]) {
int viewIndex = indexMapToLogicalViewList[i];
targetDevicePath =
measurementName =
} else {
targetDevicePath = this.devicePath;
measurementName = this.measurements[i];
int index = i;
final String finalMeasurementName = measurementName;
(k, v) -> {
if (v == null) {
List<Pair<String, Integer>> valueList = new ArrayList<>();
valueList.add(new Pair<>(finalMeasurementName, index));
return valueList;
} else {
v.add(new Pair<>(finalMeasurementName, index));
return v;
// check this map, ensure that all time series (measurements in each device) only appear once
return mapFromDeviceToMeasurementAndIndex;
protected static void validateMapFromDeviceToMeasurement(
Map<PartialPath, List<Pair<String, Integer>>> map) {
if (map == null) {
for (Map.Entry<PartialPath, List<Pair<String, Integer>>> entry : map.entrySet()) {
List<Pair<String, Integer>> measurementList = entry.getValue();
if (measurementList.size() <= 1) {
Set<String> measurementSet = new HashSet<>();
for (Pair<String, Integer> thisPair : measurementList) {
boolean measurementNotExists = measurementSet.add(thisPair.left);
if (!measurementNotExists) {
PartialPath devicePath = entry.getKey();
throw new SemanticException(
new DuplicateInsertException(devicePath.getFullPath(), thisPair.left));
// endregion