blob: e6403c3857705f960fdd8edf917bddc44252b588 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class InsertRowStatement extends InsertBaseStatement implements ISchemaValidation {
private static final Logger LOGGER = LoggerFactory.getLogger(InsertRowStatement.class);
private static final byte TYPE_RAW_STRING = -1;
private static final byte TYPE_NULL = -2;
private long time;
private Object[] values;
private boolean isNeedInferType = false;
/**
* This param record whether the source of logical view is aligned. Only used when there are
* views.
*/
private boolean[] measurementIsAligned;
public InsertRowStatement() {
super();
statementType = StatementType.INSERT;
this.recordedBeginOfLogicalViewSchemaList = 0;
this.recordedEndOfLogicalViewSchemaList = 0;
}
@Override
public List<PartialPath> getPaths() {
List<PartialPath> ret = new ArrayList<>();
for (String m : measurements) {
PartialPath fullPath = devicePath.concatNode(m);
ret.add(fullPath);
}
return ret;
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public Object[] getValues() {
return values;
}
public void setValues(Object[] values) {
this.values = values;
}
public boolean isNeedInferType() {
return isNeedInferType;
}
public void setNeedInferType(boolean needInferType) {
isNeedInferType = needInferType;
}
@Override
public boolean isEmpty() {
return values.length == 0;
}
public void fillValues(ByteBuffer buffer) throws QueryProcessException {
this.values = new Object[measurements.length];
this.dataTypes = new TSDataType[measurements.length];
for (int i = 0; i < dataTypes.length; i++) {
// Types are not determined, the situation mainly occurs when the plan uses string values
// and is forwarded to other nodes
byte typeNum = (byte) ReadWriteIOUtils.read(buffer);
if (typeNum == TYPE_RAW_STRING || typeNum == TYPE_NULL) {
values[i] = typeNum == TYPE_RAW_STRING ? ReadWriteIOUtils.readString(buffer) : null;
continue;
}
dataTypes[i] = TSDataType.values()[typeNum];
switch (dataTypes[i]) {
case BOOLEAN:
values[i] = ReadWriteIOUtils.readBool(buffer);
break;
case INT32:
values[i] = ReadWriteIOUtils.readInt(buffer);
break;
case INT64:
values[i] = ReadWriteIOUtils.readLong(buffer);
break;
case FLOAT:
values[i] = ReadWriteIOUtils.readFloat(buffer);
break;
case DOUBLE:
values[i] = ReadWriteIOUtils.readDouble(buffer);
break;
case TEXT:
values[i] = ReadWriteIOUtils.readBinary(buffer);
break;
default:
throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
}
}
}
public TTimePartitionSlot getTimePartitionSlot() {
return TimePartitionUtils.getTimePartitionSlot(time);
}
@Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitInsertRow(this, context);
}
@Override
public long getMinTime() {
return getTime();
}
@Override
public Object getFirstValueOfIndex(int index) {
return values[index];
}
@Override
protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) {
if (CommonUtils.checkCanCastType(dataTypes[columnIndex], dataType)) {
LOGGER.warn(
"Inserting to {}.{} : Cast from {} to {}",
devicePath,
measurements[columnIndex],
dataTypes[columnIndex],
dataType);
values[columnIndex] =
CommonUtils.castValue(dataTypes[columnIndex], dataType, values[columnIndex]);
dataTypes[columnIndex] = dataType;
return true;
}
return false;
}
/**
* transfer String[] values to specific data types when isNeedInferType is true. <br>
* Notice: measurementSchemas must be initialized before calling this method
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void transferType() throws QueryProcessException {
for (int i = 0; i < measurementSchemas.length; i++) {
// null when time series doesn't exist
if (measurementSchemas[i] == null) {
if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
throw new QueryProcessException(
new PathNotExistException(
devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
} else {
markFailedMeasurement(
i,
new QueryProcessException(
new PathNotExistException(
devicePath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i])));
}
continue;
}
// parse string value to specific type
dataTypes[i] = measurementSchemas[i].getType();
try {
values[i] = CommonUtils.parseValue(dataTypes[i], values[i].toString());
} catch (Exception e) {
LOGGER.warn(
"data type of {}.{} is not consistent, "
+ "registered type {}, inserting timestamp {}, value {}",
devicePath,
measurements[i],
dataTypes[i],
time,
values[i]);
if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
throw e;
} else {
markFailedMeasurement(i, e);
}
}
}
isNeedInferType = false;
}
@Override
public void markFailedMeasurement(int index, Exception cause) {
if (measurements[index] == null) {
return;
}
if (failedMeasurementIndex2Info == null) {
failedMeasurementIndex2Info = new HashMap<>();
}
InsertBaseStatement.FailedMeasurementInfo failedMeasurementInfo =
new InsertBaseStatement.FailedMeasurementInfo(
measurements[index], dataTypes[index], values[index], cause);
failedMeasurementIndex2Info.putIfAbsent(index, failedMeasurementInfo);
measurements[index] = null;
dataTypes[index] = null;
values[index] = null;
}
public boolean isNeedSplit() {
return hasLogicalViewNeedProcess();
}
public List<InsertRowStatement> getSplitList() {
if (!isNeedSplit()) {
return Collections.singletonList(this);
}
Map<PartialPath, List<Pair<String, Integer>>> mapFromDeviceToMeasurementAndIndex =
this.getMapFromDeviceToMeasurementAndIndex();
// Reconstruct statements
List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
for (Map.Entry<PartialPath, List<Pair<String, Integer>>> entry :
mapFromDeviceToMeasurementAndIndex.entrySet()) {
List<Pair<String, Integer>> pairList = entry.getValue();
InsertRowStatement statement = new InsertRowStatement();
statement.setTime(this.time);
statement.setNeedInferType(this.isNeedInferType);
statement.setDevicePath(entry.getKey());
statement.setAligned(this.isAligned);
Object[] copiedValues = new Object[pairList.size()];
String[] measurements = new String[pairList.size()];
MeasurementSchema[] measurementSchemas = new MeasurementSchema[pairList.size()];
TSDataType[] dataTypes = new TSDataType[pairList.size()];
for (int i = 0; i < pairList.size(); i++) {
int realIndex = pairList.get(i).right;
copiedValues[i] = this.values[realIndex];
measurements[i] = pairList.get(i).left;
measurementSchemas[i] = this.measurementSchemas[realIndex];
dataTypes[i] = this.dataTypes[realIndex];
if (this.measurementIsAligned != null) {
statement.setAligned(this.measurementIsAligned[realIndex]);
}
}
statement.setValues(copiedValues);
statement.setMeasurements(measurements);
statement.setMeasurementSchemas(measurementSchemas);
statement.setDataTypes(dataTypes);
statement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info);
insertRowStatementList.add(statement);
}
return insertRowStatementList;
}
@Override
public InsertBaseStatement removeLogicalView() {
if (!isNeedSplit()) {
return this;
}
List<InsertRowStatement> insertRowStatementList = this.getSplitList();
if (insertRowStatementList.size() == 1) {
return insertRowStatementList.get(0);
}
InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
insertRowsStatement.setInsertRowStatementList(insertRowStatementList);
return insertRowsStatement;
}
@Override
public ISchemaValidation getSchemaValidation() {
return this;
}
@Override
public List<ISchemaValidation> getSchemaValidationList() {
throw new UnsupportedOperationException();
}
@Override
public void updateAfterSchemaValidation() throws QueryProcessException {
if (isNeedInferType) {
transferType();
}
}
@Override
public TSDataType getDataType(int index) {
if (isNeedInferType) {
return TypeInferenceUtils.getPredictedDataType(values[index], true);
} else {
return dataTypes[index];
}
}
@Override
public TSEncoding getEncoding(int index) {
return null;
}
@Override
public CompressionType getCompressionType(int index) {
return null;
}
@Override
public void validateDeviceSchema(boolean isAligned) {
this.isAligned = isAligned;
}
@Override
public void validateMeasurementSchema(int index, IMeasurementSchemaInfo measurementSchemaInfo) {
if (measurementSchemas == null) {
measurementSchemas = new MeasurementSchema[measurements.length];
}
if (measurementSchemaInfo == null) {
measurementSchemas[index] = null;
} else {
if (measurementSchemaInfo.isLogicalView()) {
if (logicalViewSchemaList == null || indexOfSourcePathsOfLogicalViews == null) {
logicalViewSchemaList = new ArrayList<>();
indexOfSourcePathsOfLogicalViews = new ArrayList<>();
}
logicalViewSchemaList.add(measurementSchemaInfo.getSchemaAsLogicalViewSchema());
indexOfSourcePathsOfLogicalViews.add(index);
return;
} else {
measurementSchemas[index] = measurementSchemaInfo.getSchemaAsMeasurementSchema();
}
}
if (isNeedInferType) {
return;
}
try {
selfCheckDataTypes(index);
} catch (DataTypeMismatchException | PathNotExistException e) {
throw new SemanticException(e);
}
}
@Override
public void validateMeasurementSchema(
int index, IMeasurementSchemaInfo measurementSchemaInfo, boolean isAligned) {
this.validateMeasurementSchema(index, measurementSchemaInfo);
if (this.measurementIsAligned == null) {
this.measurementIsAligned = new boolean[this.measurements.length];
Arrays.fill(this.measurementIsAligned, this.isAligned);
}
this.measurementIsAligned[index] = isAligned;
}
@Override
public boolean hasLogicalViewNeedProcess() {
if (this.indexOfSourcePathsOfLogicalViews == null) {
return false;
}
return !this.indexOfSourcePathsOfLogicalViews.isEmpty();
}
@Override
public List<LogicalViewSchema> getLogicalViewSchemaList() {
return this.logicalViewSchemaList;
}
@Override
public List<Integer> getIndexListOfLogicalViewPaths() {
return this.indexOfSourcePathsOfLogicalViews;
}
@Override
public void recordRangeOfLogicalViewSchemaListNow() {
if (this.logicalViewSchemaList != null) {
this.recordedBeginOfLogicalViewSchemaList = this.recordedEndOfLogicalViewSchemaList;
this.recordedEndOfLogicalViewSchemaList = this.logicalViewSchemaList.size();
}
}
@Override
public Pair<Integer, Integer> getRangeOfLogicalViewSchemaListRecorded() {
return new Pair<>(
this.recordedBeginOfLogicalViewSchemaList, this.recordedEndOfLogicalViewSchemaList);
}
}