/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import org.apache.commons.lang3.Validate;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import javax.annotation.Nullable;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.stream.Collectors;
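/**
 * Plan node for <code>GROUP BY TAGS</code> queries. It groups the aggregation results of its
 * children by the values of the given tag keys, merging series that share the same tag values
 * through cross-series aggregation descriptors.
 */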
public class GroupByTagNode extends MultiChildProcessNode {
private final List<String> tagKeys;
private final Map<List<String>, List<CrossSeriesAggregationDescriptor>>
tagValuesToAggregationDescriptors;
private final List<String> outputColumnNames;
// The parameter of the `GROUP BY TIME` clause.
// Null when the query contains no `GROUP BY TIME` clause.
@Nullable protected GroupByTimeParameter groupByTimeParameter;
protected Ordering scanOrder;
public GroupByTagNode(
PlanNodeId id,
List<PlanNode> children,
@Nullable GroupByTimeParameter groupByTimeParameter,
Ordering scanOrder,
List<String> tagKeys,
Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors,
List<String> outputColumnNames) {
super(id, children);
this.groupByTimeParameter = groupByTimeParameter;
this.scanOrder = Validate.notNull(scanOrder);
this.tagKeys = Validate.notNull(tagKeys);
this.tagValuesToAggregationDescriptors = Validate.notNull(tagValuesToAggregationDescriptors);
this.outputColumnNames = Validate.notNull(outputColumnNames);
}
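// Constructor without children; used when the node is rebuilt from its serialized form
// (see deserialize below), with children presumably attached afterwards by the framework.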
public GroupByTagNode(
PlanNodeId id,
@Nullable GroupByTimeParameter groupByTimeParameter,
Ordering scanOrder,
List<String> tagKeys,
Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors,
List<String> outputColumnNames) {
super(id);
this.groupByTimeParameter = groupByTimeParameter;
this.scanOrder = Validate.notNull(scanOrder);
this.tagKeys = Validate.notNull(tagKeys);
this.tagValuesToAggregationDescriptors = Validate.notNull(tagValuesToAggregationDescriptors);
this.outputColumnNames = Validate.notNull(outputColumnNames);
}
@Override
public PlanNodeType getType() {
return PlanNodeType.GROUP_BY_TAG;
}
@Override
public PlanNode clone() {
// TODO: a deep copy of the descriptors and parameters would be safer here
return new GroupByTagNode(
getPlanNodeId(),
this.groupByTimeParameter,
this.scanOrder,
this.tagKeys,
this.tagValuesToAggregationDescriptors,
this.outputColumnNames);
}
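// Wraps the children in [startIndex, endIndex) into a HorizontallyConcatNode, presumably so
// that a wide child set can be split into smaller groups when the plan is distributed.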
@Override
public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
return new HorizontallyConcatNode(
new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
new ArrayList<>(children.subList(startIndex, endIndex)));
}
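// The output schema is the tag key columns followed by the aggregated value columns.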
@Override
public List<String> getOutputColumnNames() {
List<String> ret = new ArrayList<>(tagKeys);
ret.addAll(outputColumnNames);
return ret;
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitGroupByTag(this, context);
}
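// Serialized attribute layout (identical for the ByteBuffer and DataOutputStream overloads):
//   1. plan node type code
//   2. tag keys (string list)
//   3. entry count of tagValuesToAggregationDescriptors, then per entry:
//      tag values (string list), descriptor count, and per descriptor a null-marker
//      byte (0 = null, 1 = present) followed by the descriptor body
//   4. output column names (string list)
//   5. group-by-time marker byte (0 = absent, 1 = present) followed by the parameter
//   6. scan order ordinal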
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
// Plan type.
PlanNodeType.GROUP_BY_TAG.serialize(byteBuffer);
// Tag keys.
ReadWriteIOUtils.writeStringList(tagKeys, byteBuffer);
// Tag values to aggregation descriptors.
ReadWriteIOUtils.write(tagValuesToAggregationDescriptors.size(), byteBuffer);
for (Entry<List<String>, List<CrossSeriesAggregationDescriptor>> entry :
tagValuesToAggregationDescriptors.entrySet()) {
ReadWriteIOUtils.writeStringList(entry.getKey(), byteBuffer);
ReadWriteIOUtils.write(entry.getValue().size(), byteBuffer);
for (CrossSeriesAggregationDescriptor aggregationDescriptor : entry.getValue()) {
if (aggregationDescriptor == null) {
ReadWriteIOUtils.write((byte) 0, byteBuffer);
} else {
ReadWriteIOUtils.write((byte) 1, byteBuffer);
aggregationDescriptor.serialize(byteBuffer);
}
}
}
// Output column names.
ReadWriteIOUtils.writeStringList(outputColumnNames, byteBuffer);
// Group by time parameter.
if (groupByTimeParameter == null) {
ReadWriteIOUtils.write((byte) 0, byteBuffer);
} else {
ReadWriteIOUtils.write((byte) 1, byteBuffer);
groupByTimeParameter.serialize(byteBuffer);
}
// Scan order.
ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
}
@Override
protected void serializeAttributes(DataOutputStream stream) throws IOException {
// Plan type.
PlanNodeType.GROUP_BY_TAG.serialize(stream);
// Tag keys.
ReadWriteIOUtils.writeStringList(tagKeys, stream);
// Tag values to aggregation descriptors.
ReadWriteIOUtils.write(tagValuesToAggregationDescriptors.size(), stream);
for (Entry<List<String>, List<CrossSeriesAggregationDescriptor>> entry :
tagValuesToAggregationDescriptors.entrySet()) {
ReadWriteIOUtils.writeStringList(entry.getKey(), stream);
ReadWriteIOUtils.write(entry.getValue().size(), stream);
for (CrossSeriesAggregationDescriptor aggregationDescriptor : entry.getValue()) {
if (aggregationDescriptor == null) {
ReadWriteIOUtils.write((byte) 0, stream);
} else {
ReadWriteIOUtils.write((byte) 1, stream);
aggregationDescriptor.serialize(stream);
}
}
}
// Output column names.
ReadWriteIOUtils.writeStringList(outputColumnNames, stream);
// Group by time parameter.
if (groupByTimeParameter == null) {
ReadWriteIOUtils.write((byte) 0, stream);
} else {
ReadWriteIOUtils.write((byte) 1, stream);
groupByTimeParameter.serialize(stream);
}
// Scan order.
ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
}
@Nullable
public GroupByTimeParameter getGroupByTimeParameter() {
return groupByTimeParameter;
}
public Ordering getScanOrder() {
return scanOrder;
}
public List<String> getTagKeys() {
return tagKeys;
}
/**
 * A {@link CrossSeriesAggregationDescriptor} in the value lists may be null when a tag value
 * matches no timeseries for the corresponding aggregation.
 *
 * <p>For example, given the following timeseries:
 *
 * <ul>
 *   <li>root.sg.d1.s1(k1=v1)
 *   <li>root.sg.d1.s2(k1=v1)
 *   <li>root.sg.d2.s1(k1=v2)
 *   <li>root.sg.d3.s1(k1=v2)
 * </ul>
 *
 * the query <code>SELECT avg(s1), avg(s2) FROM root.sg.** GROUP BY TAGS(k1)</code> generates a
 * {@link GroupByTagNode} whose <code>tagValuesToAggregationDescriptors</code> looks like:
 * <code>
 * {
 *   ["v1"]: [["avg(root.sg.d1.s1)"], ["avg(root.sg.d1.s2)"]],
 *   ["v2"]: [["avg(root.sg.d2.s1)", "avg(root.sg.d3.s1)"], null],
 * }
 * </code>
 *
 * <p>There is no s2 series under tag value v2, so its descriptor is null. Callers must handle
 * these null entries explicitly.
 */
public Map<List<String>, List<CrossSeriesAggregationDescriptor>>
getTagValuesToAggregationDescriptors() {
return tagValuesToAggregationDescriptors;
}
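// Deserialization mirrors serializeAttributes. The leading plan node type code is assumed to
// have been consumed already by the dispatching reader, which is why it is not read here; the
// PlanNodeId is read last, after all attributes. A minimal round-trip sketch, assuming the
// framework's usual entry points:
//   node.serialize(buffer);
//   buffer.flip();
//   GroupByTagNode copy = (GroupByTagNode) PlanNodeType.deserialize(buffer);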
public static GroupByTagNode deserialize(ByteBuffer byteBuffer) {
// Tag keys.
List<String> tagKeys = ReadWriteIOUtils.readStringList(byteBuffer);
// Tag values to aggregation descriptors.
int numOfEntries = ReadWriteIOUtils.readInt(byteBuffer);
Map<List<String>, List<CrossSeriesAggregationDescriptor>> tagValuesToAggregationDescriptors =
new HashMap<>();
while (numOfEntries > 0) {
List<String> tagValues = ReadWriteIOUtils.readStringList(byteBuffer);
List<CrossSeriesAggregationDescriptor> aggregationDescriptors = new ArrayList<>();
int numOfAggregationDescriptors = ReadWriteIOUtils.readInt(byteBuffer);
while (numOfAggregationDescriptors > 0) {
byte isNotNull = ReadWriteIOUtils.readByte(byteBuffer);
if (isNotNull == 1) {
aggregationDescriptors.add(CrossSeriesAggregationDescriptor.deserialize(byteBuffer));
} else {
aggregationDescriptors.add(null);
}
numOfAggregationDescriptors -= 1;
}
tagValuesToAggregationDescriptors.put(tagValues, aggregationDescriptors);
numOfEntries -= 1;
}
// Output column names.
List<String> outputColumnNames = ReadWriteIOUtils.readStringList(byteBuffer);
// Group by time parameter.
byte hasGroupByTimeParameter = ReadWriteIOUtils.readByte(byteBuffer);
GroupByTimeParameter groupByTimeParameter = null;
if (hasGroupByTimeParameter == 1) {
groupByTimeParameter = GroupByTimeParameter.deserialize(byteBuffer);
}
// Scan order.
Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new GroupByTagNode(
planNodeId,
groupByTimeParameter,
scanOrder,
tagKeys,
tagValuesToAggregationDescriptors,
outputColumnNames);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
GroupByTagNode that = (GroupByTagNode) o;
return Objects.equals(groupByTimeParameter, that.groupByTimeParameter)
&& scanOrder == that.scanOrder
&& Objects.equals(tagKeys, that.tagKeys)
&& Objects.equals(tagValuesToAggregationDescriptors, that.tagValuesToAggregationDescriptors)
&& Objects.equals(outputColumnNames, that.outputColumnNames);
}
@Override
public int hashCode() {
return Objects.hash(
super.hashCode(),
groupByTimeParameter,
scanOrder,
tagKeys,
tagValuesToAggregationDescriptors,
outputColumnNames);
}
@Override
public String toString() {
return String.format(
"GroupByTagNode-%s: Output: %s, Input: %s",
getPlanNodeId(),
getOutputColumnNames(),
tagValuesToAggregationDescriptors.values().stream()
    .flatMap(List::stream)
    // Descriptor entries may be null for tag values that match no timeseries
    // (see getTagValuesToAggregationDescriptors), so skip them to avoid an NPE.
    .filter(Objects::nonNull)
    .map(CrossSeriesAggregationDescriptor::getInputExpressions)
    .collect(Collectors.toList()));
}
}