KYLIN-5040 Add rule for exactly match cuboid
diff --git a/kylin-it/src/test/resources/query/sql_exactly_agg_multi_segment/query00.sql b/kylin-it/src/test/resources/query/sql_exactly_agg_multi_segment/query00.sql
new file mode 100644
index 0000000..4268b28
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_exactly_agg_multi_segment/query00.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select CAL_DT,LSTG_FORMAT_NAME,LSTG_SITE_ID,SLR_SEGMENT_CD,
+ sum(price) as GMV, count(1) as TRANS_CNT,
+ min(price) as minP, max(price) as maxP
+from test_kylin_fact
+group by CAL_DT,LSTG_FORMAT_NAME,SLR_SEGMENT_CD,LSTG_SITE_ID
+;{"scanRowCount":9287,"scanBytes":0,"scanFiles":2,"cuboidId":[276480],"exactlyMatched":[true]}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_exactly_agg_multi_segment/query01.sql b/kylin-it/src/test/resources/query/sql_exactly_agg_multi_segment/query01.sql
new file mode 100644
index 0000000..13d08e8
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_exactly_agg_multi_segment/query01.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select LSTG_FORMAT_NAME,LSTG_SITE_ID,SLR_SEGMENT_CD,
+ sum(price) as GMV, count(1) as TRANS_CNT,
+ min(price) as minP, max(price) as maxP
+from test_kylin_fact
+group by LSTG_FORMAT_NAME,SLR_SEGMENT_CD,LSTG_SITE_ID
+;{"scanRowCount":537,"scanBytes":0,"scanFiles":2,"cuboidId":[14336],"exactlyMatched":[false]}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
index 8c7c2f0..b2bdaef 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
@@ -22,7 +22,8 @@
import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.sql.SqlKind
import org.apache.kylin.common.KylinConfig
-import org.apache.kylin.metadata.model.FunctionDesc
+import org.apache.kylin.cube.CubeInstance
+import org.apache.kylin.metadata.model.{FunctionDesc, PartitionDesc, SegmentStatusEnum, TblColRef}
import org.apache.kylin.query.relnode.{KylinAggregateCall, OLAPAggregateRel}
import org.apache.kylin.query.runtime.RuntimeHelper
import org.apache.kylin.query.SchemaProcessor
@@ -244,7 +245,21 @@
     }
     val groupByCols = rel.getGroups.asScala.map(_.getIdentity).toSet
     if (groupByCols.isEmpty) return false
+    // Exact cuboid match is only safe across segments when the group-by keys
+    // include the partition date column; otherwise require exactly one READY
+    // segment, so no cross-segment re-aggregation can change the result.
+    val readySegmentCount = olapContext.realization.asInstanceOf[CubeInstance].getSegments(SegmentStatusEnum.READY).size()
+    if (!groupByContainsPartition(groupByCols, cuboid.getCubeDesc.getModel.getPartitionDesc) &&
+      readySegmentCount != 1) {
+      return false
+    }
     val cuboidDims = cuboid.getColumns.asScala.map(_.getIdentity).toSet
     groupByCols.equals(cuboidDims)
   }
+
+  /** True when the group-by columns contain the model's partition date column. */
+  def groupByContainsPartition(groupByCols: Set[String], partitionDesc: PartitionDesc): Boolean = {
+    partitionDesc != null && partitionDesc.getPartitionDateColumnRef != null &&
+      groupByCols.contains(partitionDesc.getPartitionDateColumnRef.getIdentity)
+  }
 }
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
index cb1b049..a0b7639 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
@@ -105,6 +105,22 @@
         report(results);
     }
 
+    @Test
+    public void exactlyMatchCuboidMultiSegmentTest() throws Exception {
+        final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        buildSegments("ci_left_join_cube", new SegmentRange.TSRange(dateToLong("2012-01-01"), dateToLong("2013-01-01")),
+                new SegmentRange.TSRange(dateToLong("2013-01-01"), dateToLong("2015-01-01")));
+
+        populateSSWithCSVData(config, getProject(), KylinSparkEnv.getSparkSession());
+
+        List<QueryCallable> tasks = new ArrayList<>();
+        // Run the queries added for this change: they verify exact matching over two segments.
+        tasks.add(new QueryCallable(CompareLevel.SAME, "left", "sql_exactly_agg_multi_segment"));
+        List<Pair<String, Throwable>> results = execAndGetResults(tasks);
+        Assert.assertEquals(tasks.size(), results.size());
+        report(results);
+    }
+
     private List<Pair<String, Throwable>> execAndGetResults(List<QueryCallable> tasks)
             throws InterruptedException, java.util.concurrent.ExecutionException {
         ThreadPoolExecutor executor = new ThreadPoolExecutor(9//