/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}
import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
case class FilterRangeRecord(
intCol0: Int,
boolCol1: Boolean,
doubleCol2: Double,
floatCol3: Float,
intCol4: Int,
longCol5: Long,
shortCol6: Short,
stringCol7: String,
byteCol8: Byte)
object FilterRangeRecord {
def apply(i: Int): FilterRangeRecord = {
FilterRangeRecord(if (i % 2 == 0) i else -i,
i % 2 == 0,
if (i % 2 == 0) i.toDouble else -i.toDouble,
i.toFloat,
if (i % 2 == 0) i else -i,
i.toLong,
i.toShort,
s"String$i extra",
i.toByte)
}
}
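// For illustration (an editor's sketch, derivable from the generator above):
//   FilterRangeRecord(2) == FilterRangeRecord(2, true, 2.0, 2.0f, 2, 2L, 2, "String2 extra", 2)
//   FilterRangeRecord(3) == FilterRangeRecord(-3, false, -3.0, 3.0f, 3, 3L, 3, "String3 extra", 3)
// Odd indices negate intCol0, doubleCol2 and intCol4, so predicates such as
// intCol0 < 0 select exactly the odd-indexed records.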
class PartitionFilterSuite extends FunSuite with
BeforeAndAfterEach with BeforeAndAfterAll with Logging {
@transient var sc: SparkContext = null
var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
var sqlContext: SQLContext = null
var df: DataFrame = null
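// Builds a DataFrame over the HBase table described by the given catalog JSON,
// going through this module's DataSource implementation.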
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog -> cat))
.format("org.apache.hadoop.hbase.spark")
.load()
}
override def beforeAll() {
TEST_UTIL.startMiniCluster
val sparkConf = new SparkConf
sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
sc = new SparkContext("local", "test", sparkConf)
new HBaseContext(sc, TEST_UTIL.getConfiguration)
sqlContext = new SQLContext(sc)
}
override def afterAll() {
logInfo("shutting down minicluster")
TEST_UTIL.shutdownMiniCluster()
sc.stop()
}
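// Clear the static queue in which the connector records recent filter-execution
// rules for unit testing, so every test starts from a clean slate.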
override def beforeEach(): Unit = {
DefaultSourceStaticUtils.lastFiveExecutionRules.clear()
}
// The original raw data, used to construct the expected result set without going
// through the DataFrame logic. It verifies the result set retrieved via the DataFrame logic.
val rawResult = (0 until 32).map { i =>
FilterRangeRecord(i)
}
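// Collects the single projected column of `df` into a Set, making the
// comparison with the expected values order-insensitive (HBase scan order
// need not match the order of rawResult).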
def collectToSet[T](df: DataFrame): Set[T] = {
df.collect().map(_.getAs[T](0)).toSet
}
val catalog = s"""{
|"table":{"namespace":"default", "name":"rangeTable"},
|"rowkey":"key",
|"columns":{
|"intCol0":{"cf":"rowkey", "col":"key", "type":"int"},
|"boolCol1":{"cf":"cf1", "col":"boolCol1", "type":"boolean"},
|"doubleCol2":{"cf":"cf2", "col":"doubleCol2", "type":"double"},
|"floatCol3":{"cf":"cf3", "col":"floatCol3", "type":"float"},
|"intCol4":{"cf":"cf4", "col":"intCol4", "type":"int"},
|"longCol5":{"cf":"cf5", "col":"longCol5", "type":"bigint"},
|"shortCol6":{"cf":"cf6", "col":"shortCol6", "type":"smallint"},
|"stringCol7":{"cf":"cf7", "col":"stringCol7", "type":"string"},
|"byteCol8":{"cf":"cf8", "col":"byteCol8", "type":"tinyint"}
|}
|}""".stripMargin
test("populate rangeTable") {
val sql = sqlContext
import sql.implicits._
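// HBaseTableCatalog.newTable asks the datasource to create the table; the
// value is taken as the number of regions to pre-split it into (here 5).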
sc.parallelize(rawResult).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.hadoop.hbase.spark")
.save()
}
test("rangeTable full query") {
val df = withCatalog(catalog)
df.show
assert(df.count() === 32)
}
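// An editor's illustrative sketch (not part of the original suite): the same
// full-table read expressed through Spark SQL rather than the DataFrame DSL.
// Assumes the Spark 1.x registerTempTable API, consistent with the SQLContext
// used above.
test("rangeTable full query via SQL (illustrative)") {
  val df = withCatalog(catalog)
  df.registerTempTable("rangeTable")
  assert(sqlContext.sql("SELECT intCol0 FROM rangeTable").count() === 32)
}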
/**
*expected result:
*+-------+
*|intCol0|
*+-------+
*|    -31|
*|    -29|
*|    -27|
*|    -25|
*|    -23|
*|    -21|
*|    -19|
*|    -17|
*|    -15|
*|    -13|
*|    -11|
*|     -9|
*|     -7|
*|     -5|
*|     -3|
*|     -1|
*+-------+
*/
test("rangeTable rowkey less than 0") {
val sql = sqlContext
import sql.implicits._
val df = withCatalog(catalog)
val s = df.filter($"intCol0" < 0).select($"intCol0")
s.show
// filter results without going through dataframe
val expected = rawResult.filter(_.intCol0 < 0).map(_.intCol0).toSet
// filter results going through dataframe
val result = collectToSet[Int](s)
assert(expected === result)
}
/**
*expected result:
*+-------+
*|intCol4|
*+-------+
*|    -31|
*|    -29|
*|    -27|
*|    -25|
*|    -23|
*|    -21|
*|    -19|
*|    -17|
*|    -15|
*|    -13|
*|    -11|
*|     -9|
*|     -7|
*|     -5|
*|     -3|
*|     -1|
*+-------+
*/
test("rangeTable int col less than 0") {
val sql = sqlContext
import sql.implicits._
val df = withCatalog(catalog)
val s = df.filter($"intCol4" < 0).select($"intCol4")
s.show
// filter results without going through dataframe
val expected = rawResult.filter(_.intCol4 < 0).map(_.intCol4).toSet
// filter results going through dataframe
val result = collectToSet[Int](s)
assert(expected === result)
}
/**
*expected result:
*+-----------+
*| doubleCol2|
*+-----------+
*|        0.0|
*|        2.0|
*|      -31.0|
*|      -29.0|
*|      -27.0|
*|      -25.0|
*|      -23.0|
*|      -21.0|
*|      -19.0|
*|      -17.0|
*|      -15.0|
*|      -13.0|
*|      -11.0|
*|       -9.0|
*|       -7.0|
*|       -5.0|
*|       -3.0|
*|       -1.0|
*+-----------+
*/
test("rangeTable double col less than 0") {
val sql = sqlContext
import sql.implicits._
val df = withCatalog(catalog)
val s = df.filter($"doubleCol2" < 3.0).select($"doubleCol2")
s.show
// filter results without going through dataframe
val expected = rawResult.filter(_.doubleCol2 < 3.0).map(_.doubleCol2).toSet
// filter results going through dataframe
val result = collectToSet[Double](s)
assert(expected === result)
}
/**
* expected result:
*+-------+
*|intCol0|
*+-------+
*|    -31|
*|    -29|
*|    -27|
*|    -25|
*|    -23|
*|    -21|
*|    -19|
*|    -17|
*|    -15|
*|    -13|
*|    -11|
*+-------+
*
*/
test("rangeTable lessequal than -10") {
val sql = sqlContext
import sql.implicits._
val df = withCatalog(catalog)
val s = df.filter($"intCol0" <= -10).select($"intCol0")
s.show
// filter results without going through dataframe
val expected = rawResult.filter(_.intCol0 <= -10).map(_.intCol0).toSet
// filter results going through dataframe
val result = collectToSet[Int](s)
assert(expected === result)
}
/**
*expected result:
*+-------+
*|intCol0|
*+-------+
*|    -31|
*|    -29|
*|    -27|
*|    -25|
*|    -23|
*|    -21|
*|    -19|
*|    -17|
*|    -15|
*|    -13|
*|    -11|
*|     -9|
*+-------+
*/
test("rangeTable lessequal than -9") {
val sql = sqlContext
import sql.implicits._
val df = withCatalog(catalog)
val s = df.filter($"intCol0" <= -9).select($"intCol0")
s.show
// filter results without going through dataframe
val expected = rawResult.filter(_.intCol0 <= -9).map(_.intCol0).toSet
// filter results going through dataframe
val result = collectToSet[Int](s)
assert(expected === result)
}
/**
*expected result: only showing top 20 rows
*+-------+
*|intCol0|
*+-------+
*|      0|
*|      2|
*|      4|
*|      6|
*|      8|
*|     10|
*|     12|
*|     14|
*|     16|
*|     18|
*|     20|
*|     22|
*|     24|
*|     26|
*|     28|
*|     30|
*|     -9|
*|     -7|
*|     -5|
*|     -3|
*+-------+
*/
test("rangeTable greaterequal than -9") {
val sql = sqlContext
import sql.implicits._
val df = withCatalog(catalog)
val s = df.filter($"intCol0" >= -9).select($"intCol0")
s.show
// filter results without going through dataframe
val expected = rawResult.filter(_.intCol0 >= -9).map(_.intCol0).toSet
// filter results going through dataframe
val result = collectToSet[Int](s)
assert(expected === result)
}
/**
*expected result:
*+-------+
*|intCol0|
*+-------+
*|      0|
*|      2|
*|      4|
*|      6|
*|      8|
*|     10|
*|     12|
*|     14|
*|     16|
*|     18|
*|     20|
*|     22|
*|     24|
*|     26|
*|     28|
*|     30|
*+-------+
*/
test("rangeTable greaterequal than 0") {
val sql = sqlContext
import sql.implicits._
val df = withCatalog(catalog)
val s = df.filter($"intCol0" >= 0).select($"intCol0")
s.show
// filter results without going through dataframe
val expected = rawResult.filter(_.intCol0 >= 0).map(_.intCol0).toSet
// filter results going through dataframe
val result = collectToSet[Int](s)
assert(expected === result)
}
/**
*expected result:
*+-------+
*|intCol0|
*+-------+
*|     12|
*|     14|
*|     16|
*|     18|
*|     20|
*|     22|
*|     24|
*|     26|
*|     28|
*|     30|
*+-------+
*/
test("rangeTable greater than 10") {
val sql = sqlContext
import sql.implicits._
val df = withCatalog(catalog)
val s = df.filter($"intCol0" > 10).select($"intCol0")
s.show
// filter results without going through dataframe
val expected = rawResult.filter(_.intCol0 > 10).map(_.intCol0).toSet
// filter results going through dataframe
val result = collectToSet[Int](s)
assert(expected === result)
}
/**
*expected result:
*+-------+
*|intCol0|
*+-------+
*|      0|
*|      2|
*|      4|
*|      6|
*|      8|
*|     10|
*|     -9|
*|     -7|
*|     -5|
*|     -3|
*|     -1|
*+-------+
*/
test("rangeTable and") {
val sql = sqlContext
import sql.implicits._
val df = withCatalog(catalog)
val s = df.filter($"intCol0" > -10 && $"intCol0" <= 10).select($"intCol0")
s.show
// filter results without going through dataframe
val expected = rawResult.filter(x => x.intCol0 > -10 && x.intCol0 <= 10).map(_.intCol0).toSet
// filter results going through dataframe
val result = collectToSet[Int](s)
assert(expected === result)
}
/**
*expected result: only showing top 20 rows
*+-------+
*|intCol0|
*+-------+
*|     12|
*|     14|
*|     16|
*|     18|
*|     20|
*|     22|
*|     24|
*|     26|
*|     28|
*|     30|
*|    -31|
*|    -29|
*|    -27|
*|    -25|
*|    -23|
*|    -21|
*|    -19|
*|    -17|
*|    -15|
*|    -13|
*+-------+
*/
test("or") {
val sql = sqlContext
import sql.implicits._
val df = withCatalog(catalog)
val s = df.filter($"intCol0" <= -10 || $"intCol0" > 10).select($"intCol0")
s.show
// filter results without going through dataframe
val expected = rawResult.filter(x => x.intCol0 <= -10 || x.intCol0 > 10).map(_.intCol0).toSet
// filter results going through dataframe
val result = collectToSet[Int](s)
assert(expected === result)
}
/**
*expected result: only showing top 20 rows
*+-------+
*|intCol0|
*+-------+
*|      0|
*|      2|
*|      4|
*|      6|
*|      8|
*|     10|
*|     12|
*|     14|
*|     16|
*|     18|
*|     20|
*|     22|
*|     24|
*|     26|
*|     28|
*|     30|
*|    -31|
*|    -29|
*|    -27|
*|    -25|
*+-------+
*/
test("rangeTable all") {
val sql = sqlContext
import sql.implicits._
val df = withCatalog(catalog)
val s = df.filter($"intCol0" >= -100).select($"intCol0")
s.show
// filter results without going through dataframe
val expected = rawResult.filter(_.intCol0 >= -100).map(_.intCol0).toSet
// filter results going through dataframe
val result = collectToSet[Int](s)
assert(expected === result)
}
}