blob: d201ae7a2b770d48114f7dcf5ebe126015ef2887 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.qp.physical.crud;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
public class InsertRowPlan extends InsertPlan {
private static final Logger logger = LoggerFactory.getLogger(InsertRowPlan.class);
private static final byte TYPE_RAW_STRING = -1;
private static final byte TYPE_NULL = -2;
private long time;
private Object[] values;
// if isNeedInferType is true, the values must be String[], so we could infer types from them
// if values is object[], we could use the raw type of them, and we should set this to false
private boolean isNeedInferType = false;
private List<Object> failedValues;
public InsertRowPlan() {
super(OperatorType.INSERT);
}
public InsertRowPlan(InsertRowPlan another) {
super(OperatorType.INSERT);
this.prefixPath = another.prefixPath;
this.time = another.time;
this.measurements = new String[another.measurements.length];
System.arraycopy(another.measurements, 0, this.measurements, 0, another.measurements.length);
this.values = new Object[another.values.length];
System.arraycopy(another.values, 0, this.values, 0, another.values.length);
this.dataTypes = new TSDataType[another.dataTypes.length];
System.arraycopy(another.dataTypes, 0, this.dataTypes, 0, another.dataTypes.length);
}
public InsertRowPlan(
PartialPath prefixPath, long insertTime, String[] measurementList, String[] insertValues) {
super(Operator.OperatorType.INSERT);
this.time = insertTime;
this.prefixPath = prefixPath;
this.measurements = measurementList;
this.dataTypes = new TSDataType[insertValues.length];
// We need to create an Object[] for the data type casting, because we can not set Float, Long
// to String[i]
this.values = new Object[insertValues.length];
System.arraycopy(insertValues, 0, values, 0, insertValues.length);
isNeedInferType = true;
}
/** should be deprecated after insertRecords() and insertRowsOfOneDevice() support vector */
public InsertRowPlan(
PartialPath prefixPath, long insertTime, String[] measurementList, ByteBuffer values)
throws QueryProcessException {
super(Operator.OperatorType.INSERT);
this.time = insertTime;
this.prefixPath = prefixPath;
this.measurements = measurementList;
this.dataTypes = new TSDataType[measurementList.length];
this.values = new Object[measurementList.length];
this.fillValues(values);
isNeedInferType = false;
}
public InsertRowPlan(
PartialPath prefixPath,
long insertTime,
String[] measurementList,
ByteBuffer values,
boolean isAligned)
throws QueryProcessException {
super(Operator.OperatorType.INSERT);
this.time = insertTime;
this.prefixPath = prefixPath;
this.measurements = measurementList;
this.dataTypes = new TSDataType[measurementList.length];
this.values = new Object[measurementList.length];
this.fillValues(values);
isNeedInferType = false;
this.isAligned = isAligned;
}
@TestOnly
public InsertRowPlan(
PartialPath prefixPath,
long insertTime,
String[] measurements,
TSDataType[] dataTypes,
String[] insertValues) {
super(OperatorType.INSERT);
this.time = insertTime;
this.prefixPath = prefixPath;
this.measurements = measurements;
this.dataTypes = dataTypes;
this.values = new Object[dataTypes.length];
for (int i = 0; i < dataTypes.length; i++) {
try {
values[i] = CommonUtils.parseValueForTest(dataTypes[i], insertValues[i]);
} catch (QueryProcessException e) {
e.printStackTrace();
}
}
}
@TestOnly
public InsertRowPlan(
PartialPath prefixPath,
long insertTime,
String[] measurements,
TSDataType[] dataTypes,
String[] insertValues,
boolean isAligned) {
super(OperatorType.INSERT);
this.time = insertTime;
this.prefixPath = prefixPath;
this.measurements = measurements;
this.dataTypes = dataTypes;
this.values = new Object[dataTypes.length];
for (int i = 0; i < dataTypes.length; i++) {
try {
values[i] = CommonUtils.parseValueForTest(dataTypes[i], insertValues[i]);
} catch (QueryProcessException e) {
e.printStackTrace();
}
}
this.isAligned = isAligned;
}
@TestOnly
public InsertRowPlan(
PartialPath prefixPath,
long insertTime,
String measurement,
TSDataType type,
String insertValue) {
super(OperatorType.INSERT);
this.time = insertTime;
this.prefixPath = prefixPath;
this.measurements = new String[] {measurement};
this.dataTypes = new TSDataType[] {type};
this.values = new Object[1];
try {
values[0] = CommonUtils.parseValueForTest(dataTypes[0], insertValue);
} catch (QueryProcessException e) {
logger.error(e.getMessage());
}
}
@TestOnly
public InsertRowPlan(TSRecord tsRecord) throws IllegalPathException {
super(OperatorType.INSERT);
this.prefixPath = new PartialPath(tsRecord.deviceId);
this.time = tsRecord.time;
this.measurements = new String[tsRecord.dataPointList.size()];
this.measurementMNodes = new IMeasurementMNode[tsRecord.dataPointList.size()];
this.dataTypes = new TSDataType[tsRecord.dataPointList.size()];
this.values = new Object[tsRecord.dataPointList.size()];
for (int i = 0; i < tsRecord.dataPointList.size(); i++) {
measurements[i] = tsRecord.dataPointList.get(i).getMeasurementId();
measurementMNodes[i] =
new MeasurementMNode(
null,
measurements[i],
new MeasurementSchema(
measurements[i], tsRecord.dataPointList.get(i).getType(), TSEncoding.PLAIN),
null);
dataTypes[i] = tsRecord.dataPointList.get(i).getType();
values[i] = tsRecord.dataPointList.get(i).getValue();
}
}
public long getTime() {
return time;
}
public void setTime(long time) {
this.time = time;
}
public boolean isNeedInferType() {
return isNeedInferType;
}
public void setNeedInferType(boolean inferType) {
this.isNeedInferType = inferType;
}
/**
* if inferType is true, transfer String[] values to specific data types (Integer, Long, Float,
* Double, Binary)
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public void transferType() throws QueryProcessException {
if (isNeedInferType) {
int columnIndex = 0;
for (int i = 0; i < measurementMNodes.length; i++) {
if (measurementMNodes[i] == null) {
if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
markFailedMeasurementInsertion(
i,
new QueryProcessException(
new PathNotExistException(
prefixPath.getFullPath()
+ IoTDBConstant.PATH_SEPARATOR
+ measurements[i])));
} else {
throw new QueryProcessException(
new PathNotExistException(
prefixPath.getFullPath() + IoTDBConstant.PATH_SEPARATOR + measurements[i]));
}
columnIndex++;
continue;
}
if (measurementMNodes[i].getSchema().getType() != TSDataType.VECTOR) {
dataTypes[columnIndex] = measurementMNodes[i].getSchema().getType();
try {
values[columnIndex] =
CommonUtils.parseValue(dataTypes[columnIndex], values[columnIndex].toString());
} catch (Exception e) {
logger.warn(
"{}.{} data type is not consistent, input {}, registered {}",
prefixPath,
measurements[i],
values[i],
dataTypes[i]);
if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
markFailedMeasurementInsertion(i, e);
measurementMNodes[i] = null;
} else {
throw e;
}
}
columnIndex++;
}
// for aligned timeseries
else {
for (TSDataType dataType : measurementMNodes[i].getSchema().getValueTSDataTypeList()) {
dataTypes[columnIndex] = dataType;
try {
values[columnIndex] =
CommonUtils.parseValue(dataTypes[columnIndex], values[columnIndex].toString());
} catch (Exception e) {
logger.warn(
"{}.{} data type is not consistent, input {}, registered {}",
prefixPath,
measurements[i],
values[columnIndex],
dataTypes[columnIndex]);
if (IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
markFailedMeasurementInsertion(i, e);
measurementMNodes[i] = null;
} else {
throw e;
}
}
columnIndex++;
}
}
}
}
}
@Override
public long getMinTime() {
return getTime();
}
@Override
public void markFailedMeasurementInsertion(int index, Exception e) {
if (measurements[index] == null) {
return;
}
super.markFailedMeasurementInsertion(index, e);
if (failedValues == null) {
failedValues = new ArrayList<>();
}
failedValues.add(values[index]);
values[index] = null;
dataTypes[index] = null;
}
@Override
public List<PartialPath> getPaths() {
List<PartialPath> ret = new ArrayList<>();
for (String m : measurements) {
PartialPath fullPath = prefixPath.concatNode(m);
ret.add(fullPath);
}
return ret;
}
public Object[] getValues() {
return this.values;
}
public void setValues(Object[] values) {
this.values = values;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
InsertRowPlan that = (InsertRowPlan) o;
return time == that.time
&& Objects.equals(prefixPath, that.prefixPath)
&& Arrays.equals(measurements, that.measurements)
&& Arrays.equals(values, that.values);
}
@Override
public int hashCode() {
return Objects.hash(prefixPath, time);
}
@Override
public void serialize(DataOutputStream stream) throws IOException {
int type = PhysicalPlanType.INSERT.ordinal();
stream.writeByte((byte) type);
subSerialize(stream);
}
public void subSerialize(DataOutputStream stream) throws IOException {
stream.writeLong(time);
putString(stream, prefixPath.getFullPath());
serializeMeasurementsAndValues(stream);
}
void serializeMeasurementsAndValues(DataOutputStream stream) throws IOException {
stream.writeInt(
measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
for (String m : measurements) {
if (m != null) {
putString(stream, m);
}
}
try {
stream.writeInt(dataTypes.length);
putValues(stream);
} catch (QueryProcessException e) {
throw new IOException(e);
}
// the types are not inferred before the plan is serialized
stream.write((byte) (isNeedInferType ? 1 : 0));
stream.writeLong(index);
stream.write((byte) (isAligned ? 1 : 0));
}
private void putValues(DataOutputStream outputStream) throws QueryProcessException, IOException {
for (int i = 0; i < values.length; i++) {
if (values[i] == null) {
ReadWriteIOUtils.write(TYPE_NULL, outputStream);
continue;
}
// types are not determined, the situation mainly occurs when the plan uses string values
// and is forwarded to other nodes
if (dataTypes == null || dataTypes[i] == null) {
ReadWriteIOUtils.write(TYPE_RAW_STRING, outputStream);
ReadWriteIOUtils.write((String) values[i], outputStream);
} else {
ReadWriteIOUtils.write(dataTypes[i], outputStream);
switch (dataTypes[i]) {
case BOOLEAN:
ReadWriteIOUtils.write((Boolean) values[i], outputStream);
break;
case INT32:
ReadWriteIOUtils.write((Integer) values[i], outputStream);
break;
case INT64:
ReadWriteIOUtils.write((Long) values[i], outputStream);
break;
case FLOAT:
ReadWriteIOUtils.write((Float) values[i], outputStream);
break;
case DOUBLE:
ReadWriteIOUtils.write((Double) values[i], outputStream);
break;
case TEXT:
ReadWriteIOUtils.write((Binary) values[i], outputStream);
break;
default:
throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
}
}
}
}
private void putValues(ByteBuffer buffer) throws QueryProcessException {
for (int i = 0; i < values.length; i++) {
if (values[i] == null) {
ReadWriteIOUtils.write(TYPE_NULL, buffer);
continue;
}
// types are not determined, the situation mainly occurs when the plan uses string values
// and is forwarded to other nodes
if (dataTypes == null || dataTypes[i] == null) {
ReadWriteIOUtils.write(TYPE_RAW_STRING, buffer);
ReadWriteIOUtils.write((String) values[i], buffer);
} else {
ReadWriteIOUtils.write(dataTypes[i], buffer);
switch (dataTypes[i]) {
case BOOLEAN:
ReadWriteIOUtils.write((Boolean) values[i], buffer);
break;
case INT32:
ReadWriteIOUtils.write((Integer) values[i], buffer);
break;
case INT64:
ReadWriteIOUtils.write((Long) values[i], buffer);
break;
case FLOAT:
ReadWriteIOUtils.write((Float) values[i], buffer);
break;
case DOUBLE:
ReadWriteIOUtils.write((Double) values[i], buffer);
break;
case TEXT:
ReadWriteIOUtils.write((Binary) values[i], buffer);
break;
default:
throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
}
}
}
}
/** Make sure the values is already inited before calling this */
public void fillValues(ByteBuffer buffer) throws QueryProcessException {
for (int i = 0; i < dataTypes.length; i++) {
// types are not determined, the situation mainly occurs when the plan uses string values
// and is forwarded to other nodes
byte typeNum = (byte) ReadWriteIOUtils.read(buffer);
if (typeNum == TYPE_RAW_STRING || typeNum == TYPE_NULL) {
values[i] = typeNum == TYPE_RAW_STRING ? ReadWriteIOUtils.readString(buffer) : null;
continue;
}
dataTypes[i] = TSDataType.values()[typeNum];
switch (dataTypes[i]) {
case BOOLEAN:
values[i] = ReadWriteIOUtils.readBool(buffer);
break;
case INT32:
values[i] = ReadWriteIOUtils.readInt(buffer);
break;
case INT64:
values[i] = ReadWriteIOUtils.readLong(buffer);
break;
case FLOAT:
values[i] = ReadWriteIOUtils.readFloat(buffer);
break;
case DOUBLE:
values[i] = ReadWriteIOUtils.readDouble(buffer);
break;
case TEXT:
values[i] = ReadWriteIOUtils.readBinary(buffer);
break;
default:
throw new QueryProcessException("Unsupported data type:" + dataTypes[i]);
}
}
}
@Override
public void serialize(ByteBuffer buffer) {
int type = PhysicalPlanType.INSERT.ordinal();
buffer.put((byte) type);
subSerialize(buffer);
}
public void subSerialize(ByteBuffer buffer) {
buffer.putLong(time);
putString(buffer, prefixPath.getFullPath());
serializeMeasurementsAndValues(buffer);
}
void serializeMeasurementsAndValues(ByteBuffer buffer) {
buffer.putInt(
measurements.length - (failedMeasurements == null ? 0 : failedMeasurements.size()));
for (String measurement : measurements) {
if (measurement != null) {
putString(buffer, measurement);
}
}
try {
buffer.putInt(dataTypes.length);
putValues(buffer);
} catch (QueryProcessException e) {
logger.error("Failed to serialize values for {}", this, e);
}
// the types are not inferred before the plan is serialized
buffer.put((byte) (isNeedInferType ? 1 : 0));
buffer.putLong(index);
buffer.put((byte) (isAligned ? 1 : 0));
}
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
this.time = buffer.getLong();
this.prefixPath = new PartialPath(readString(buffer));
deserializeMeasurementsAndValues(buffer);
}
void deserializeMeasurementsAndValues(ByteBuffer buffer) {
int measurementSize = buffer.getInt();
this.measurements = new String[measurementSize];
for (int i = 0; i < measurementSize; i++) {
measurements[i] = readString(buffer);
}
int dataTypeSize = buffer.getInt();
this.dataTypes = new TSDataType[dataTypeSize];
this.values = new Object[dataTypeSize];
try {
fillValues(buffer);
} catch (QueryProcessException e) {
e.printStackTrace();
}
isNeedInferType = buffer.get() == 1;
this.index = buffer.getLong();
isAligned = buffer.get() == 1;
}
@Override
public String toString() {
return "prefixPath: "
+ prefixPath
+ ", time: "
+ time
+ ", measurements: "
+ Arrays.toString(measurements)
+ ", values: "
+ Arrays.toString(values);
}
boolean hasFailedValues() {
return failedValues != null && !failedValues.isEmpty();
}
public TimeValuePair composeTimeValuePair(int columnIndex) {
if (columnIndex >= values.length) {
return null;
}
Object value = values[columnIndex];
return new TimeValuePair(time, TsPrimitiveType.getByType(dataTypes[columnIndex], value));
}
@Override
public InsertPlan getPlanFromFailed() {
if (super.getPlanFromFailed() == null) {
return null;
}
values = failedValues.toArray(new Object[0]);
failedValues = null;
return this;
}
@Override
public void recoverFromFailure() {
if (failedMeasurements == null) {
return;
}
for (int i = 0; i < failedMeasurements.size(); i++) {
int index = failedIndices.get(i);
values[index] = failedValues.get(i);
}
super.recoverFromFailure();
failedValues = null;
}
@Override
public void checkIntegrity() throws QueryProcessException {
super.checkIntegrity();
if (values == null) {
throw new QueryProcessException("Values are null");
}
if (values.length == 0) {
throw new QueryProcessException("The size of values is 0");
}
for (Object value : values) {
if (value == null) {
throw new QueryProcessException("Values contain null: " + Arrays.toString(values));
}
}
}
}