KYLIN-5040 Add rule for exactly matched cuboid
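
A cuboid is treated as exactly matched only when the GROUP BY columns equal the cuboid dimensions. This change tightens that rule for cubes with more than one READY segment: unless the GROUP BY also contains the model's partition date column, the per-segment pre-aggregated results still have to be merged, so the cuboid is no longer reported as exactly matched. Two IT queries are added under sql_exactly_agg_multi_segment (query00 groups by the partition column CAL_DT and expects exactlyMatched=true; query01 omits it and expects exactlyMatched=false), together with a multi-segment test in NBuildAndQueryTest.
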
diff --git a/kylin-it/src/test/resources/query/sql_exactly_agg_multi_segment/query00.sql b/kylin-it/src/test/resources/query/sql_exactly_agg_multi_segment/query00.sql
new file mode 100644
index 0000000..4268b28
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_exactly_agg_multi_segment/query00.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select CAL_DT,LSTG_FORMAT_NAME,LSTG_SITE_ID,SLR_SEGMENT_CD,
+       sum(price) as GMV, count(1) as TRANS_CNT,
+       min(price) as minP, max(price) as maxP
+from test_kylin_fact
+group by CAL_DT,LSTG_FORMAT_NAME,SLR_SEGMENT_CD,LSTG_SITE_ID
+;{"scanRowCount":9287,"scanBytes":0,"scanFiles":2,"cuboidId":[276480],"exactlyMatched":[true]}
\ No newline at end of file
diff --git a/kylin-it/src/test/resources/query/sql_exactly_agg_multi_segment/query01.sql b/kylin-it/src/test/resources/query/sql_exactly_agg_multi_segment/query01.sql
new file mode 100644
index 0000000..13d08e8
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql_exactly_agg_multi_segment/query01.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select LSTG_FORMAT_NAME,LSTG_SITE_ID,SLR_SEGMENT_CD,
+       sum(price) as GMV, count(1) as TRANS_CNT,
+       min(price) as minP, max(price) as maxP
+from test_kylin_fact
+group by LSTG_FORMAT_NAME,SLR_SEGMENT_CD,LSTG_SITE_ID
+;{"scanRowCount":537,"scanBytes":0,"scanFiles":2,"cuboidId":[14336],"exactlyMatched":[false]}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
index 8c7c2f0..b2bdaef 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/AggregatePlan.scala
@@ -22,7 +22,8 @@
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.sql.SqlKind
 import org.apache.kylin.common.KylinConfig
-import org.apache.kylin.metadata.model.FunctionDesc
+import org.apache.kylin.cube.CubeInstance
+import org.apache.kylin.metadata.model.{FunctionDesc, PartitionDesc, SegmentStatusEnum, TblColRef}
 import org.apache.kylin.query.relnode.{KylinAggregateCall, OLAPAggregateRel}
 import org.apache.kylin.query.runtime.RuntimeHelper
 import org.apache.kylin.query.SchemaProcessor
@@ -244,7 +245,22 @@
     }
     val groupByCols = rel.getGroups.asScala.map(_.getIdentity).toSet
     if (groupByCols.isEmpty) return false
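+    // With more than one READY segment, a group may span segments and the
+    // per-segment pre-aggregated results still need to be merged, unless the
+    // GROUP BY contains the model's partition date column.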
+    val readySegmentCount = olapContext.realization.asInstanceOf[CubeInstance]
+      .getSegments(SegmentStatusEnum.READY).size()
+    if (!groupByContainsPartition(groupByCols, cuboid.getCubeDesc.getModel.getPartitionDesc) &&
+      readySegmentCount != 1) {
+      return false
+    }
     val cuboidDims = cuboid.getColumns.asScala.map(_.getIdentity).toSet
     groupByCols.equals(cuboidDims)
   }
+
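+  // True if the model defines a partition date column and it appears in the GROUP BY columns.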
+  def groupByContainsPartition(groupByCols: Set[String], partitionDesc: PartitionDesc): Boolean = {
+    partitionDesc != null && partitionDesc.getPartitionDateColumnRef != null &&
+      groupByCols.contains(partitionDesc.getPartitionDateColumnRef.getIdentity)
+  }
 }
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
index cb1b049..a0b7639 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
@@ -105,6 +105,23 @@
         report(results);
     }
 
+    @Test
+    public void exactlyMatchCuboidMultiSegmentTest() throws Exception {
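+        // Build two READY segments so that, under the new rule, only queries whose
+        // GROUP BY contains the partition column (CAL_DT) can be exactly matched.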
+        final KylinConfig config = KylinConfig.getInstanceFromEnv();
+        buildSegments("ci_left_join_cube", new SegmentRange.TSRange(dateToLong("2012-01-01"), dateToLong("2013-01-01")),
+                new SegmentRange.TSRange(dateToLong("2013-01-01"), dateToLong("2015-01-01")));
+
+        populateSSWithCSVData(config, getProject(), KylinSparkEnv.getSparkSession());
+
+        List<QueryCallable> tasks = new ArrayList<>();
+        tasks.add(new QueryCallable(CompareLevel.SAME, "left", "sql_exactly_agg_multi_segment"));
+        List<Pair<String, Throwable>> results = execAndGetResults(tasks);
+        Assert.assertEquals(results.size(), tasks.size());
+        report(results);
+    }
+
     private List<Pair<String, Throwable>> execAndGetResults(List<QueryCallable> tasks)
             throws InterruptedException, java.util.concurrent.ExecutionException {
         ThreadPoolExecutor executor = new ThreadPoolExecutor(9//