blob: bd624f25634c268fa503e7d9bd93540ed7d9a130 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.analyze;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
/**
* If in align by device query ,all queried devices are set in one template, we can store the common
* variables in TemplatedInfo to avoid repeated creation.
*/
public class TemplatedInfo {
// measurement info
private final List<String> measurementList;
private final List<IMeasurementSchema> schemaList;
private final List<TSDataType> dataTypes;
// query info
private final Ordering scanOrder;
private final boolean queryAllSensors;
// variables used in DeviceViewOperator
private final List<String> selectMeasurements;
private final List<Integer> deviceToMeasurementIndexes;
// variables related to LIMIT/OFFSET push down
private final long offsetValue;
private final long limitValue;
// variables used in FilterOperator
private final Expression predicate;
private boolean keepNull;
// utils variables, not serialize
private Map<String, IMeasurementSchema> schemaMap;
private Map<String, List<InputLocation>> layoutMap;
private int maxTsBlockLineNum = -1;
// variables related to predicate push down
private Expression pushDownPredicate;
public TemplatedInfo(
List<String> measurementList,
List<IMeasurementSchema> schemaList,
List<TSDataType> dataTypes,
Ordering scanOrder,
boolean queryAllSensors,
List<String> selectMeasurements,
List<Integer> deviceToMeasurementIndexes,
long offsetValue,
long limitValue,
Expression predicate,
boolean keepNull,
Map<String, IMeasurementSchema> schemaMap,
Map<String, List<InputLocation>> layoutMap,
Expression pushDownPredicate) {
this.measurementList = measurementList;
this.schemaList = schemaList;
this.dataTypes = dataTypes;
this.scanOrder = scanOrder;
this.queryAllSensors = queryAllSensors;
this.selectMeasurements = selectMeasurements;
this.deviceToMeasurementIndexes = deviceToMeasurementIndexes;
this.offsetValue = offsetValue;
this.limitValue = limitValue;
this.predicate = predicate;
if (predicate != null) {
this.keepNull = keepNull;
this.schemaMap = schemaMap;
this.layoutMap = layoutMap;
}
this.pushDownPredicate = pushDownPredicate;
}
public List<String> getMeasurementList() {
return this.measurementList;
}
public List<IMeasurementSchema> getSchemaList() {
return this.schemaList;
}
public List<TSDataType> getDataTypes() {
return this.dataTypes;
}
public Ordering getScanOrder() {
return this.scanOrder;
}
public boolean isQueryAllSensors() {
return this.queryAllSensors;
}
public List<String> getSelectMeasurements() {
return this.selectMeasurements;
}
public long getOffsetValue() {
return this.offsetValue;
}
public long getLimitValue() {
return this.limitValue;
}
public List<Integer> getDeviceToMeasurementIndexes() {
return this.deviceToMeasurementIndexes;
}
public Expression getPredicate() {
return this.predicate;
}
public boolean isKeepNull() {
return this.keepNull;
}
public Map<String, IMeasurementSchema> getSchemaMap() {
return this.schemaMap;
}
public Map<String, List<InputLocation>> getLayoutMap() {
return this.layoutMap;
}
public Expression getPushDownPredicate() {
return this.pushDownPredicate;
}
public void setPushDownPredicate(Expression pushDownPredicate) {
this.pushDownPredicate = pushDownPredicate;
}
public boolean hasPushDownPredicate() {
return this.pushDownPredicate != null;
}
public Expression[] getProjectExpressions() {
Expression[] projectExpressions = new Expression[measurementList.size()];
for (int i = 0; i < measurementList.size(); i++) {
projectExpressions[i] =
new TimeSeriesOperand(
new MeasurementPath(
new PartialPath(new String[] {measurementList.get(i)}), schemaList.get(i)));
}
return projectExpressions;
}
public static Map<String, List<InputLocation>> makeLayout(List<String> measurementList) {
Map<String, List<InputLocation>> outputMappings = new LinkedHashMap<>();
int tsBlockIndex = 0;
outputMappings
.computeIfAbsent(TIMESTAMP_EXPRESSION_STRING, key -> new ArrayList<>())
.add(new InputLocation(tsBlockIndex, -1));
int valueColumnIndex = 0;
for (String columnName : measurementList) {
outputMappings
.computeIfAbsent(columnName, key -> new ArrayList<>())
.add(new InputLocation(tsBlockIndex, valueColumnIndex));
valueColumnIndex++;
}
return outputMappings;
}
public void serialize(ByteBuffer byteBuffer) {
ReadWriteIOUtils.write(measurementList.size(), byteBuffer);
for (String measurement : measurementList) {
ReadWriteIOUtils.write(measurement, byteBuffer);
}
for (IMeasurementSchema schema : schemaList) {
schema.serializeTo(byteBuffer);
}
for (TSDataType dataType : dataTypes) {
ReadWriteIOUtils.write(dataType, byteBuffer);
}
ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
ReadWriteIOUtils.write(queryAllSensors, byteBuffer);
ReadWriteIOUtils.write(selectMeasurements.size(), byteBuffer);
for (String selectMeasurement : selectMeasurements) {
ReadWriteIOUtils.write(selectMeasurement, byteBuffer);
}
ReadWriteIOUtils.write(deviceToMeasurementIndexes.size(), byteBuffer);
for (int index : deviceToMeasurementIndexes) {
ReadWriteIOUtils.write(index, byteBuffer);
}
ReadWriteIOUtils.write(offsetValue, byteBuffer);
ReadWriteIOUtils.write(limitValue, byteBuffer);
if (predicate != null) {
ReadWriteIOUtils.write((byte) 1, byteBuffer);
Expression.serialize(predicate, byteBuffer);
ReadWriteIOUtils.write(keepNull, byteBuffer);
} else {
ReadWriteIOUtils.write((byte) 0, byteBuffer);
}
if (pushDownPredicate != null) {
ReadWriteIOUtils.write((byte) 1, byteBuffer);
Expression.serialize(pushDownPredicate, byteBuffer);
} else {
ReadWriteIOUtils.write((byte) 0, byteBuffer);
}
}
public void serialize(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(measurementList.size(), stream);
for (String measurement : measurementList) {
ReadWriteIOUtils.write(measurement, stream);
}
for (IMeasurementSchema schema : schemaList) {
schema.serializeTo(stream);
}
for (TSDataType dataType : dataTypes) {
ReadWriteIOUtils.write(dataType, stream);
}
ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
ReadWriteIOUtils.write(queryAllSensors, stream);
ReadWriteIOUtils.write(selectMeasurements.size(), stream);
for (String selectMeasurement : selectMeasurements) {
ReadWriteIOUtils.write(selectMeasurement, stream);
}
ReadWriteIOUtils.write(deviceToMeasurementIndexes.size(), stream);
for (int index : deviceToMeasurementIndexes) {
ReadWriteIOUtils.write(index, stream);
}
ReadWriteIOUtils.write(offsetValue, stream);
ReadWriteIOUtils.write(limitValue, stream);
if (predicate != null) {
ReadWriteIOUtils.write((byte) 1, stream);
Expression.serialize(predicate, stream);
ReadWriteIOUtils.write(keepNull, stream);
} else {
ReadWriteIOUtils.write((byte) 0, stream);
}
if (pushDownPredicate != null) {
ReadWriteIOUtils.write((byte) 1, stream);
Expression.serialize(pushDownPredicate, stream);
} else {
ReadWriteIOUtils.write((byte) 0, stream);
}
}
public static TemplatedInfo deserialize(ByteBuffer byteBuffer) {
int measurementSize = ReadWriteIOUtils.readInt(byteBuffer);
List<String> measurementList = new ArrayList<>();
int cnt = measurementSize;
while (cnt-- > 0) {
measurementList.add(ReadWriteIOUtils.readString(byteBuffer));
}
cnt = measurementSize;
List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
while (cnt-- > 0) {
// MeasurementSchema is ok?
measurementSchemaList.add(MeasurementSchema.deserializeFrom(byteBuffer));
}
cnt = measurementSize;
List<TSDataType> dataTypeList = new ArrayList<>();
while (cnt-- > 0) {
dataTypeList.add(ReadWriteIOUtils.readDataType(byteBuffer));
}
Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
boolean queryAllSensors = ReadWriteIOUtils.readBool(byteBuffer);
int listSize = ReadWriteIOUtils.readInt(byteBuffer);
List<String> selectMeasurements = new ArrayList<>(listSize);
while (listSize-- > 0) {
selectMeasurements.add(ReadWriteIOUtils.readString(byteBuffer));
}
listSize = ReadWriteIOUtils.readInt(byteBuffer);
List<Integer> deviceToMeasurementIndexes = new ArrayList<>(listSize);
while (listSize-- > 0) {
deviceToMeasurementIndexes.add(ReadWriteIOUtils.readInt(byteBuffer));
}
long offsetValue = ReadWriteIOUtils.readLong(byteBuffer);
long limitValue = ReadWriteIOUtils.readLong(byteBuffer);
Expression predicate = null;
byte hasFilter = ReadWriteIOUtils.readByte(byteBuffer);
Map<String, IMeasurementSchema> currentSchemaMap = null;
Map<String, List<InputLocation>> layoutMap = null;
boolean keepNull = false;
if (hasFilter == 1) {
predicate = Expression.deserialize(byteBuffer);
keepNull = ReadWriteIOUtils.readBool(byteBuffer);
currentSchemaMap = new HashMap<>();
for (IMeasurementSchema measurementSchema : measurementSchemaList) {
currentSchemaMap.put(measurementSchema.getMeasurementId(), measurementSchema);
}
layoutMap = makeLayout(measurementList);
}
Expression pushDownPredicate = null;
byte hasPushDownFilter = ReadWriteIOUtils.readByte(byteBuffer);
if (hasPushDownFilter == 1) {
pushDownPredicate = Expression.deserialize(byteBuffer);
}
return new TemplatedInfo(
measurementList,
measurementSchemaList,
dataTypeList,
scanOrder,
queryAllSensors,
selectMeasurements,
deviceToMeasurementIndexes,
offsetValue,
limitValue,
predicate,
keepNull,
currentSchemaMap,
layoutMap,
pushDownPredicate);
}
}