/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.spark.testsuite.filterexpr

import java.util

import org.apache.spark.sql.{CarbonEnv, DataFrame, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.datamap.DataMapFilter
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.scan.expression.logical.{AndExpression, TrueExpression}
import org.apache.carbondata.core.scan.filter.FilterUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.spark.rdd.CarbonScanRDD

/**
 * Test class to verify the functionality of implicit filter expressions
 * used for blocklet-level pruning
 */
class TestImplicitFilterExpression extends QueryTest with BeforeAndAfterAll {
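
  // Each test below runs the same query, then swaps the filter of the already
  // planned CarbonScanRDD for an AND(original filter, implicit filter) expression
  // that restricts the scan to a single blocklet. The single inserted row lives
  // in blocklet 0, so pruning to blocklet 0 returns one row while pruning to
  // blocklet 1 returns none.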

  override def beforeAll(): Unit = {
    sql("drop table if exists implicit_test")
    sql(
      "create table implicit_test(firstname string, lastname string, age int) " +
      "STORED AS carbondata")
    sql("insert into implicit_test select 'bob','marshall',35")
  }
test("test implicit filter expression for data pruning with valid implicit filter value") {
val query: DataFrame = sql("select count(*) from implicit_test where lastname='marshall'")
// 1 row should be returned for blockletId 0
verifyResultWithImplicitFilter(query, 1, 0)
}
test("test implicit filter expression for data pruning with invalid implicit filter value") {
val query: DataFrame = sql("select count(*) from implicit_test where lastname='marshall'")
// No row should be returned for blockletId 1
verifyResultWithImplicitFilter(query, 0, 1)
}

  private def verifyResultWithImplicitFilter(query: DataFrame,
      expectedResultCount: Int,
      blockletId: Int): Unit = {
    // from the plan extract the CarbonScanRDD
    val scanRDD = query.queryExecution.sparkPlan.collect {
      case scan: CarbonDataSourceScan if scan.rdd.isInstanceOf[CarbonScanRDD[InternalRow]] =>
        scan.rdd.asInstanceOf[CarbonScanRDD[InternalRow]]
    }.head
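    // note: checkAnswer at the end re-executes this same physical plan, so the
    // RDD instance captured here is the one whose filter is mutated below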
    // get carbon relation
    val relation: CarbonRelation = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore
      .lookupRelation(Some("default"), "implicit_test")(sqlContext.sparkSession)
      .asInstanceOf[CarbonRelation]
    // get carbon table from carbon relation
    val carbonTable = relation.carbonTable
    // get the segment path
    val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
    // list carbondata files from the segment path
    val files = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter {
      override def accept(file: CarbonFile): Boolean = {
        file.getName.endsWith(CarbonTablePath.getCarbonDataExtension)
      }
    })
    // assert that only 1 carbondata file exists
    assert(files.length == 1)
    // get the complete path and name of the carbondata file
    val carbondataFileName = FileFactory.getUpdatedFilePath(files.head.getPath)
    // get the short unique name for the carbondata file
    // Example: complete file name will be as below
    // /opt/db/implicit_test/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-1545986389020.carbondata
    // short file name: 0/0/0-0_batchno0-0-0-1545986389020
    val carbondataFileShortName = CarbonTablePath
      .getShortBlockId(carbondataFileName.substring(carbondataFileName.lastIndexOf("/Part") + 1))
    // create a block-to-blocklet mapping indicating which blocklets of a given block
    // contain the data
    val blockToBlockletMap = new util.HashMap[String, util.Set[Integer]]()
    val blockletList = new util.HashSet[Integer]()
    // add the given blocklet id to the list
    blockletList.add(blockletId)
    blockToBlockletMap.put(carbondataFileShortName, blockletList)
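    // the map now holds a single entry keyed by the short block id, e.g. (with
    // the sample name shown above): {"0/0/0-0_batchno0-0-0-1545986389020" -> {0}}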
    // create a new AND expression with a True expression as the right child
    val filterExpression =
      new AndExpression(scanRDD.dataMapFilter.getExpression, new TrueExpression(null))
    // create the implicit expression, which replaces the right child (the True expression)
    FilterUtil.createImplicitExpressionAndSetAsRightChild(filterExpression, blockToBlockletMap)
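    // filterExpression is now effectively AND(lastname = 'marshall',
    // IMPLICIT(blockToBlockletMap)), so only blocklets listed in the map survive pruning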
    // push the updated filter expression back into the scan RDD
    scanRDD.dataMapFilter = new DataMapFilter(carbonTable, filterExpression)
    // re-execute the query and verify the result count
    checkAnswer(query.toDF(), Seq(Row(expectedResultCount)))
  }

  override def afterAll(): Unit = {
    sql("drop table if exists implicit_test")
  }
}