Use aggregation output instead of last-query output for FileAggregationScan
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java index 0f61f06..23ed2cc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanOperator.java
@@ -22,14 +22,15 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; -import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import java.io.IOException; +import java.util.Collections; import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES; @@ -51,7 +52,7 @@ this.aggregationScanUtil = new FileAggregationScanUtil( pathPattern, aggregationDescriptor, levels, new SeriesScanOptions.Builder().build()); - this.tsBlockBuilder = LastQueryUtil.createTsBlockBuilder(); + this.tsBlockBuilder = new TsBlockBuilder(Collections.singletonList(TSDataType.INT64)); } @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java index 9267d68..4b91a9f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/source/FileAggregationScanUtil.java
@@ -41,7 +41,6 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.IPageReader; -import org.apache.iotdb.tsfile.utils.Binary; import java.io.IOException; import java.util.ArrayList; @@ -122,20 +121,12 @@ public TsBlock getAggregationResult(TsBlockBuilder builder) { for (Map.Entry<PartialPath, Aggregator> entry : pathToAggregatorMap.entrySet()) { builder.getTimeColumnBuilder().writeLong(0L); - builder.getValueColumnBuilders()[0].writeBinary( - Binary.valueOf( - String.format( - "%s(%s)", - aggregationDescriptor.getAggregationFuncName(), entry.getKey().toString()))); Aggregator aggregator = entry.getValue(); ColumnBuilder[] columnBuilders = new ColumnBuilder[1]; - columnBuilders[0] = builder.getValueColumnBuilders()[1]; + columnBuilders[0] = builder.getValueColumnBuilders()[0]; aggregator.outputResult(columnBuilders); - builder.getValueColumnBuilders()[2].writeBinary( - Binary.valueOf(aggregator.getOutputType()[0].toString())); - builder.declarePosition(); } return builder.build();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java index 5c03ff0..35a6801 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -250,14 +250,20 @@ Set<Expression> aggregationExpressions = new HashSet<>(); FunctionExpression aggregationExpression = new FunctionExpression( - aggregationMeasurementExpression.getFunctionName(), - aggregationMeasurementExpression.getFunctionAttributes(), - Collections.singletonList(sourceExpression)); + "count", + new LinkedHashMap<>(), + Collections.singletonList( + new TimeSeriesOperand( + new MeasurementPath("root.*.*.*.*.*.*", TSDataType.INT64)))); + analyzeExpression(analysis, aggregationExpression); aggregationExpressions.add(aggregationExpression); analysis.setAggregationExpressions(aggregationExpressions); analysis.setRespDatasetHeader( - new DatasetHeader(ColumnHeaderConstant.lastQueryColumnHeaders, true)); + new DatasetHeader( + Collections.singletonList( + new ColumnHeader(aggregationExpression.toString(), TSDataType.INT64)), + true)); Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>(); sgNameToQueryParamsMap.put("root.iov", Collections.emptyList()); @@ -381,7 +387,7 @@ // fetch partition information analyzeDataPartition(analysis, queryStatement, schemaTree); - } catch (StatementAnalyzeException e) { + } catch (StatementAnalyzeException | IllegalPathException e) { logger.warn("Meet error when analyzing the query statement: ", e); throw new StatementAnalyzeException( "Meet error when analyzing the query statement: " + e.getMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java index 70b470e..7a14f75 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -1204,17 +1204,16 @@ new AggregationDescriptor( functionExpression.getFunctionName(), AggregationStep.SINGLE, - Collections.emptyList(), + Collections.singletonList(functionExpression.getExpressions().get(0)), Collections.emptyMap()); + updateTypeProvider(Collections.singletonList(functionExpression)); this.root = new FileAggregationScanNode( - context.getQueryId().genPlanNodeId(), pathPattern, aggregationDescriptor, levels); - - ColumnHeaderConstant.lastQueryColumnHeaders.forEach( - columnHeader -> - context - .getTypeProvider() - .setType(columnHeader.getColumnName(), columnHeader.getColumnType())); + context.getQueryId().genPlanNodeId(), + pathPattern, + aggregationDescriptor, + levels, + functionExpression); return this; } }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java index c9cfeb1..376284d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -454,17 +454,28 @@ return Collections.singletonList(node); } - LastQueryCollectNode lastQueryCollectNode = - new LastQueryCollectNode(context.queryContext.getQueryId().genPlanNodeId()); + CrossSeriesAggregationDescriptor rootAggregationDescriptor = + new CrossSeriesAggregationDescriptor( + "count", + AggregationStep.FINAL, + Collections.singletonList(node.getOutputExpression().getExpressions().get(0)), + Collections.emptyMap(), + node.getOutputExpression().getExpressions().get(0)); + AggregationNode aggregationNode = + new AggregationNode( + context.queryContext.getQueryId().genPlanNodeId(), + Collections.singletonList(rootAggregationDescriptor), + null, + Ordering.ASC); for (TRegionReplicaSet dataRegion : dataDistribution) { FileAggregationScanNode split = (FileAggregationScanNode) node.clone(); split.setAggregationDescriptor(node.getAggregationDescriptor()); split.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); split.setRegionReplicaSet(dataRegion); - lastQueryCollectNode.addChild(split); + aggregationNode.addChild(split); } - return Collections.singletonList(lastQueryCollectNode); + return Collections.singletonList(aggregationNode); } private List<PlanNode> processSeriesAggregationSource(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java index 5c997b9..ae4b2f6 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/source/FileAggregationScanNode.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.mpp.plan.expression.Expression; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; @@ -35,10 +36,9 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; -import static org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode.LAST_QUERY_HEADER_COLUMNS; - public class FileAggregationScanNode extends SeriesSourceNode { private final PartialPath pathPattern; @@ -50,15 +50,19 @@ // The id of DataRegion where the node will run private TRegionReplicaSet regionReplicaSet; + private final Expression outputExpression; + public FileAggregationScanNode( PlanNodeId id, PartialPath pathPattern, AggregationDescriptor aggregationDescriptor, - int[] levels) { + int[] levels, + Expression outputExpression) { super(id); this.pathPattern = pathPattern; this.aggregationDescriptor = aggregationDescriptor; this.levels = levels; + this.outputExpression = outputExpression; } public FileAggregationScanNode( @@ -66,8 +70,9 @@ PartialPath pathPattern, AggregationDescriptor aggregationDescriptor, int[] levels, + Expression outputExpression, TRegionReplicaSet regionReplicaSet) { - this(id, pathPattern, aggregationDescriptor, levels); + this(id, pathPattern, aggregationDescriptor, levels, outputExpression); this.regionReplicaSet = regionReplicaSet; } @@ -87,6 +92,10 @@ return levels; } + public Expression getOutputExpression() { + return outputExpression; + } + @Override public PartialPath getPartitionPath() { return null; @@ -135,12 +144,13 @@ getPathPattern(), getAggregationDescriptor(), getLevels(), + getOutputExpression(), 
getRegionReplicaSet()); } @Override public List<String> getOutputColumnNames() { - return LAST_QUERY_HEADER_COLUMNS; + return Collections.singletonList(outputExpression.toString()); } @Override @@ -152,6 +162,7 @@ for (int level : levels) { ReadWriteIOUtils.write(level, buffer); } + Expression.serialize(outputExpression, buffer); } @Override @@ -163,6 +174,7 @@ for (int level : levels) { ReadWriteIOUtils.write(level, stream); } + Expression.serialize(outputExpression, stream); } public static PlanNode deserialize(ByteBuffer buffer) { @@ -173,8 +185,10 @@ for (int i = 0; i < levelsSize; i++) { levels[i] = ReadWriteIOUtils.readInt(buffer); } + Expression outputExpression = Expression.deserialize(buffer); PlanNodeId planNodeId = PlanNodeId.deserialize(buffer); - return new FileAggregationScanNode(planNodeId, pathPattern, aggregationDescriptor, levels); + return new FileAggregationScanNode( + planNodeId, pathPattern, aggregationDescriptor, levels, outputExpression); } @Override