/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.spark.datasources.{HBaseSparkConf, HBaseTableCatalog}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseTestingUtility, TableName}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.xml.sax.SAXParseException
case class HBaseRecord(
col0: String,
col1: Boolean,
col2: Double,
col3: Float,
col4: Int,
col5: Long,
col6: Short,
col7: String,
col8: Byte)
object HBaseRecord {
def apply(i: Int, t: String): HBaseRecord = {
val s = s"""row${"%03d".format(i)}"""
HBaseRecord(s,
i % 2 == 0,
i.toDouble,
i.toFloat,
i,
i.toLong,
i.toShort,
s"String$i: $t",
i.toByte)
}
}
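// HBase sorts row keys lexicographically, which is why the key is zero-padded
// with "%03d": e.g. HBaseRecord(7, "x").col0 == "row007", so "row007" < "row010"
// < "row100" holds in string order just as in numeric order.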
case class AvroHBaseKeyRecord(col0: Array[Byte],
col1: Array[Byte])
object AvroHBaseKeyRecord {
val schemaString =
s"""{"namespace": "example.avro",
| "type": "record", "name": "User",
| "fields": [ {"name": "name", "type": "string"},
| {"name": "favorite_number", "type": ["int", "null"]},
| {"name": "favorite_color", "type": ["string", "null"]} ] }""".stripMargin
val avroSchema: Schema = {
val p = new Schema.Parser
p.parse(schemaString)
}
def apply(i: Int): AvroHBaseKeyRecord = {
val user = new GenericData.Record(avroSchema)
user.put("name", s"name${"%03d".format(i)}")
user.put("favorite_number", i)
user.put("favorite_color", s"color${"%03d".format(i)}")
val avroByte = AvroSerdes.serialize(user, avroSchema)
AvroHBaseKeyRecord(avroByte, avroByte)
}
}
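// The same serialized Avro blob is used for both the row key and the cf1:col1
// value, so the suite exercises Avro encoding and decoding on both paths.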
class DefaultSourceSuite extends FunSuite with
BeforeAndAfterEach with BeforeAndAfterAll with Logging {
@transient var sc: SparkContext = null
var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility
val t1TableName = "t1"
val t2TableName = "t2"
val columnFamily = "c"
var sqlContext: SQLContext = null
var df: DataFrame = null
override def beforeAll() {
TEST_UTIL.startMiniCluster
logInfo(" - minicluster started")
try
TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
catch {
case e: Exception => logInfo(" - no table " + t1TableName + " found")
}
try
TEST_UTIL.deleteTable(TableName.valueOf(t2TableName))
catch {
case e: Exception => logInfo(" - no table " + t2TableName + " found")
}
logInfo(" - creating table " + t1TableName)
TEST_UTIL.createTable(TableName.valueOf(t1TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
logInfo(" - creating table " + t2TableName)
TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
val sparkConf = new SparkConf
sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
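// These scan-tuning settings travel from the SparkConf into the connector; the
// "Test HBaseSparkConf matching" test below expects these exact values.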
sc = new SparkContext("local", "test", sparkConf)
val connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration)
try {
val t1Table = connection.getTable(TableName.valueOf("t1"))
try {
var put = new Put(Bytes.toBytes("get1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
t1Table.put(put)
put = new Put(Bytes.toBytes("get2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
t1Table.put(put)
put = new Put(Bytes.toBytes("get3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
t1Table.put(put)
put = new Put(Bytes.toBytes("get4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
t1Table.put(put)
put = new Put(Bytes.toBytes("get5"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
t1Table.put(put)
} finally {
t1Table.close()
}
val t2Table = connection.getTable(TableName.valueOf("t2"))
try {
var put = new Put(Bytes.toBytes(1))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(1))
t2Table.put(put)
put = new Put(Bytes.toBytes(2))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(4))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("FOO"))
t2Table.put(put)
put = new Put(Bytes.toBytes(3))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
t2Table.put(put)
put = new Put(Bytes.toBytes(4))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("10"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(10))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("z"), Bytes.toBytes("BAR"))
t2Table.put(put)
put = new Put(Bytes.toBytes(5))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo5"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("8"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("i"), Bytes.toBytes(8))
t2Table.put(put)
} finally {
t2Table.close()
}
} finally {
connection.close()
}
def hbaseTable1Catalog = s"""{
|"table":{"namespace":"default", "name":"t1"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
|}
|}""".stripMargin
new HBaseContext(sc, TEST_UTIL.getConfiguration)
sqlContext = new SQLContext(sc)
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map(HBaseTableCatalog.tableCatalog->hbaseTable1Catalog))
df.registerTempTable("hbaseTable1")
def hbaseTable2Catalog = s"""{
|"table":{"namespace":"default", "name":"t2"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"int"},
|"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"B_FIELD":{"cf":"c", "col":"b", "type":"string"}
|}
|}""".stripMargin
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map(HBaseTableCatalog.tableCatalog->hbaseTable2Catalog))
df.registerTempTable("hbaseTable2")
}
override def afterAll() {
TEST_UTIL.deleteTable(TableName.valueOf(t1TableName))
logInfo("shuting down minicluster")
TEST_UTIL.shutdownMiniCluster()
sc.stop()
}
override def beforeEach(): Unit = {
DefaultSourceStaticUtils.lastFiveExecutionRules.clear()
}
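// Each query through the connector pushes the rules it derived (rowKey points,
// scan ranges and the pushed-down DynamicLogicExpression) onto this queue;
// every test polls exactly one entry, so the queue is emptied before each test.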
/**
 * An example of querying three fields, using only rowkey points for the filter
 */
test("Test rowKey point only rowKey query") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"(KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 3)
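//The integers in the expression string are value indexes, not literals: each
//bound filter value is referenced by its position, so "KEY_FIELD == 0" compares
//KEY_FIELD against the first bound value ('get1').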
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( ( KEY_FIELD == 0 OR KEY_FIELD == 1 ) OR KEY_FIELD == 2 )"))
assert(executionRules.rowKeyFilter.points.size == 3)
assert(executionRules.rowKeyFilter.ranges.size == 0)
}
/**
 * An example of querying three fields, using only cell points for the filter
 */
test("Test cell point only rowKey query") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"(B_FIELD = '4' or B_FIELD = '10' or A_FIELD = 'foo1')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 3)
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( ( B_FIELD == 0 OR B_FIELD == 1 ) OR A_FIELD == 2 )"))
}
/**
 * An example of an OR merge between two ranges; the result is two ranges.
 * Also an example of less-than and greater-than.
 */
test("Test two range rowKey query") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"( KEY_FIELD < 'get2' or KEY_FIELD > 'get3')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 3)
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 2)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
assert(Bytes.equals(scanRange1.upperBound,Bytes.toBytes("get2")))
assert(scanRange1.isLowerBoundEqualTo)
assert(!scanRange1.isUpperBoundEqualTo)
val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes("get3")))
assert(scanRange2.upperBound == null)
assert(!scanRange2.isLowerBoundEqualTo)
assert(scanRange2.isUpperBoundEqualTo)
}
/**
 * An example of an OR merge between two overlapping ranges.
 * Also an example of less-than and greater-than.
 *
 * This example makes sure the code works for an int rowKey
 */
test("Test two range rowKey query where the rowKey is Int and there is a range over lap") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
"WHERE " +
"( KEY_FIELD < 4 or KEY_FIELD > 2)").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 2)
assert(results.length == 5)
}
/**
 * An example of an OR merge between two ranges that do not overlap, so they
 * cannot be merged into one. Also an example of less-than and greater-than.
 *
 * This example makes sure the code works for an int rowKey
 */
test("Test two range rowKey query where the rowKey is Int and the ranges don't over lap") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
"WHERE " +
"( KEY_FIELD < 2 or KEY_FIELD > 4)").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( KEY_FIELD < 0 OR KEY_FIELD > 1 )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 3)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
assert(scanRange1.isLowerBoundEqualTo)
assert(!scanRange1.isUpperBoundEqualTo)
val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
assert(scanRange2.isUpperBoundEqualTo)
assert(results.length == 2)
}
/**
 * An example of an AND merge between two ranges where the result is one range.
 * Also an example of less-than-or-equal-to and greater-than-or-equal-to.
 */
test("Test one combined range rowKey query") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 2)
val expr = executionRules.dynamicLogicExpression.toExpressionString
assert(expr.equals("( ( KEY_FIELD isNotNull AND KEY_FIELD <= 0 ) AND KEY_FIELD >= 1 )"), expr)
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 1)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get2")))
assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes("get3")))
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
}
/**
* Do a select with no filters
*/
test("Test select only query") {
val results = df.select("KEY_FIELD").take(10)
assert(results.length == 5)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression == null)
}
/**
 * A complex query with one point and one range for both the
 * rowKey and a column
 */
test("Test SQL point and range combo") {
val results = sqlContext.sql("SELECT KEY_FIELD FROM hbaseTable1 " +
"WHERE " +
"(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
"(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( ( KEY_FIELD == 0 AND B_FIELD < 1 ) OR " +
"( KEY_FIELD >= 2 AND B_FIELD == 3 ) )"))
assert(executionRules.rowKeyFilter.points.size == 1)
assert(executionRules.rowKeyFilter.ranges.size == 1)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get3")))
assert(scanRange1.upperBound == null)
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
assert(results.length == 3)
}
/**
 * A complex query with two complex ranges that don't merge into one
 */
test("Test two complete range non merge rowKey query") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable2 " +
"WHERE " +
"( KEY_FIELD >= 1 and KEY_FIELD <= 2) or" +
"( KEY_FIELD > 3 and KEY_FIELD <= 5)").take(10)
assert(results.length == 4)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( ( KEY_FIELD >= 0 AND KEY_FIELD <= 1 ) OR " +
"( KEY_FIELD > 2 AND KEY_FIELD <= 3 ) )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 2)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes(1)))
assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes(2)))
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
assert(Bytes.equals(scanRange2.lowerBound,Bytes.toBytes(3)))
assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes(5)))
assert(!scanRange2.isLowerBoundEqualTo)
assert(scanRange2.isUpperBoundEqualTo)
}
/**
* A complex query with two complex ranges that does merge into one
*/
test("Test two complete range merge rowKey query") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get2') or" +
"( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 4)
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( ( KEY_FIELD >= 0 AND KEY_FIELD <= 1 ) OR " +
"( KEY_FIELD > 2 AND KEY_FIELD <= 3 ) )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 2)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("get1")))
assert(Bytes.equals(scanRange1.upperBound, Bytes.toBytes("get2")))
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
val scanRange2 = executionRules.rowKeyFilter.ranges.get(1).get
assert(Bytes.equals(scanRange2.lowerBound, Bytes.toBytes("get3")))
assert(Bytes.equals(scanRange2.upperBound, Bytes.toBytes("get5")))
assert(!scanRange2.isLowerBoundEqualTo)
assert(scanRange2.isUpperBoundEqualTo)
}
test("Test OR logic with a one RowKey and One column") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"( KEY_FIELD >= 'get1' or A_FIELD <= 'foo2') or" +
"( KEY_FIELD > 'get3' or B_FIELD <= '4')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 5)
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( ( KEY_FIELD >= 0 OR A_FIELD <= 1 ) OR " +
"( KEY_FIELD > 2 OR B_FIELD <= 3 ) )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 1)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
//This is the main test for HBASE-14406.
//Because the rowKey is joined through an OR with a column qualifier,
//there is no filter on the rowKey.
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
assert(scanRange1.upperBound == null)
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
}
test("Test OR logic with a two columns") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"( B_FIELD > '4' or A_FIELD <= 'foo2') or" +
"( A_FIELD > 'foo2' or B_FIELD < '4')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 5)
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( ( B_FIELD > 0 OR A_FIELD <= 1 ) OR " +
"( A_FIELD > 2 OR B_FIELD < 3 ) )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 1)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
assert(scanRange1.upperBound == null)
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
}
test("Test single RowKey Or Column logic") {
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseTable1 " +
"WHERE " +
"( KEY_FIELD >= 'get4' or A_FIELD <= 'foo2' )").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 4)
assert(executionRules.dynamicLogicExpression.toExpressionString.
equals("( KEY_FIELD >= 0 OR A_FIELD <= 1 )"))
assert(executionRules.rowKeyFilter.points.size == 0)
assert(executionRules.rowKeyFilter.ranges.size == 1)
val scanRange1 = executionRules.rowKeyFilter.ranges.get(0).get
assert(Bytes.equals(scanRange1.lowerBound,Bytes.toBytes("")))
assert(scanRange1.upperBound == null)
assert(scanRange1.isLowerBoundEqualTo)
assert(scanRange1.isUpperBoundEqualTo)
}
test("Test table that doesn't exist") {
val catalog = s"""{
|"table":{"namespace":"default", "name":"t1NotThere"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"B_FIELD":{"cf":"c", "col":"c", "type":"string"}
|}
|}""".stripMargin
intercept[Exception] {
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map(HBaseTableCatalog.tableCatalog->catalog))
df.registerTempTable("hbaseNonExistingTmp")
sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseNonExistingTmp " +
"WHERE " +
"( KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or" +
"( KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')").count()
}
DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
}
test("Test table with column that doesn't exist") {
val catalog = s"""{
|"table":{"namespace":"default", "name":"t1"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
|"C_FIELD":{"cf":"c", "col":"c", "type":"string"}
|}
|}""".stripMargin
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map(HBaseTableCatalog.tableCatalog->catalog))
df.registerTempTable("hbaseFactColumnTmp")
val result = sqlContext.sql("SELECT KEY_FIELD, " +
"B_FIELD, A_FIELD FROM hbaseFactColumnTmp")
assert(result.count() == 5)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression == null)
}
test("Test table with INT column") {
val catalog = s"""{
|"table":{"namespace":"default", "name":"t1"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
|"I_FIELD":{"cf":"c", "col":"i", "type":"int"}
|}
|}""".stripMargin
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map(HBaseTableCatalog.tableCatalog->catalog))
df.registerTempTable("hbaseIntTmp")
val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, I_FIELD FROM hbaseIntTmp"+
" where I_FIELD > 4 and I_FIELD < 10")
val localResult = result.take(5)
assert(localResult.length == 2)
assert(localResult(0).getInt(2) == 8)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
val expr = executionRules.dynamicLogicExpression.toExpressionString
logInfo(expr)
assert(expr.equals("( ( I_FIELD isNotNull AND I_FIELD > 0 ) AND I_FIELD < 1 )"), expr)
}
test("Test table with INT column defined at wrong type") {
val catalog = s"""{
|"table":{"namespace":"default", "name":"t1"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
|"I_FIELD":{"cf":"c", "col":"i", "type":"string"}
|}
|}""".stripMargin
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map(HBaseTableCatalog.tableCatalog->catalog))
df.registerTempTable("hbaseIntWrongTypeTmp")
val result = sqlContext.sql("SELECT KEY_FIELD, " +
"B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
val localResult = result.take(10)
assert(localResult.length == 5)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression == null)
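// The cell was written with Bytes.toBytes(1), a 4-byte big-endian int, so reading
// it back as a string yields four characters with byte values 0, 0, 0 and 1.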
assert(localResult(0).getString(2).length == 4)
assert(localResult(0).getString(2).charAt(0).toByte == 0)
assert(localResult(0).getString(2).charAt(1).toByte == 0)
assert(localResult(0).getString(2).charAt(2).toByte == 0)
assert(localResult(0).getString(2).charAt(3).toByte == 1)
}
test("Test bad column type") {
val catalog = s"""{
|"table":{"namespace":"default", "name":"t1"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"FOOBAR"},
|"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"I_FIELD":{"cf":"c", "col":"i", "type":"string"}
|}
|}""".stripMargin
intercept[Exception] {
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map(HBaseTableCatalog.tableCatalog->catalog))
df.registerTempTable("hbaseIntWrongTypeTmp")
val result = sqlContext.sql("SELECT KEY_FIELD, " +
"B_FIELD, I_FIELD FROM hbaseIntWrongTypeTmp")
val localResult = result.take(10)
assert(localResult.length == 5)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression == null)
}
}
test("Test HBaseSparkConf matching") {
val df = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
Map("cacheSize" -> "100",
"batchNum" -> "100",
"blockCacheingEnable" -> "true", "rowNum" -> "10"))
assert(df.count() == 10)
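// HBaseTestSource appears to check the relation parameters against the SparkConf
// set in beforeAll; a mismatched cacheSize, batchNum or blockCacheingEnable value
// makes the count throw, which the intercepts below rely on.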
val df1 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
Map("cacheSize" -> "1000",
"batchNum" -> "100", "blockCacheingEnable" -> "true", "rowNum" -> "10"))
intercept[Exception] {
assert(df1.count() == 10)
}
val df2 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
Map("cacheSize" -> "100",
"batchNum" -> "1000", "blockCacheingEnable" -> "true", "rowNum" -> "10"))
intercept[Exception] {
assert(df2.count() == 10)
}
val df3 = sqlContext.load("org.apache.hadoop.hbase.spark.HBaseTestSource",
Map("cacheSize" -> "100",
"batchNum" -> "100", "blockCacheingEnable" -> "false", "rowNum" -> "10"))
intercept[Exception] {
assert(df3.count() == 10)
}
}
test("Test table with sparse column") {
val catalog = s"""{
|"table":{"namespace":"default", "name":"t1"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
|"Z_FIELD":{"cf":"c", "col":"z", "type":"string"}
|}
|}""".stripMargin
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map(HBaseTableCatalog.tableCatalog->catalog))
df.registerTempTable("hbaseZTmp")
val result = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, Z_FIELD FROM hbaseZTmp")
val localResult = result.take(10)
assert(localResult.length == 5)
assert(localResult(0).getString(2) == null)
assert(localResult(1).getString(2) == "FOO")
assert(localResult(2).getString(2) == null)
assert(localResult(3).getString(2) == "BAR")
assert(localResult(4).getString(2) == null)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(executionRules.dynamicLogicExpression == null)
}
test("Test with column logic disabled") {
val catalog = s"""{
|"table":{"namespace":"default", "name":"t1"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"B_FIELD":{"cf":"c", "col":"b", "type":"string"},
|"Z_FIELD":{"cf":"c", "col":"z", "type":"string"}
|}
|}""".stripMargin
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map(HBaseTableCatalog.tableCatalog->catalog,
HBaseSparkConf.PUSHDOWN_COLUMN_FILTER -> "false"))
df.registerTempTable("hbaseNoPushDownTmp")
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD, A_FIELD FROM hbaseNoPushDownTmp " +
"WHERE " +
"(KEY_FIELD <= 'get3' and KEY_FIELD >= 'get2')").take(10)
val executionRules = DefaultSourceStaticUtils.lastFiveExecutionRules.poll()
assert(results.length == 2)
assert(executionRules.dynamicLogicExpression == null)
}
def writeCatalog = s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf1", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf3", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.hadoop.hbase.spark")
.load()
}
test("populate table") {
val sql = sqlContext
import sql.implicits._
val data = (0 to 255).map { i =>
HBaseRecord(i, "extra")
}
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.hadoop.hbase.spark")
.save()
}
test("empty column") {
val df = withCatalog(writeCatalog)
df.registerTempTable("table0")
val c = sqlContext.sql("select count(1) from table0").rdd.collect()(0)(0).asInstanceOf[Long]
assert(c == 256)
}
test("full query") {
val df = withCatalog(writeCatalog)
df.show()
assert(df.count() == 256)
}
test("filtered query0") {
val sql = sqlContext
import sql.implicits._
val df = withCatalog(writeCatalog)
val s = df.filter($"col0" <= "row005")
.select("col0", "col1")
s.show()
assert(s.count() == 6)
}
test("Timestamp semantics") {
val sql = sqlContext
import sql.implicits._
// There's already some recent data in this table. Write some rows with an explicit
// timestamp from 1993 that we can include or exclude, and some rows with the
// implicit (current) timestamp. We should then be able to cross-section the table:
// read only the points in between, read the most recent view, and read an old view.
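// (754869600000 ms after the epoch is 1993-12-02T22:00:00Z.)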
val oldMs = 754869600000L
val startMs = System.currentTimeMillis()
val oldData = (0 to 100).map { i =>
HBaseRecord(i, "old")
}
val newData = (200 to 255).map { i =>
HBaseRecord(i, "new")
}
sc.parallelize(oldData).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.newTable -> "5",
HBaseSparkConf.TIMESTAMP -> oldMs.toString))
.format("org.apache.hadoop.hbase.spark")
.save()
sc.parallelize(newData).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.hadoop.hbase.spark")
.save()
// Test specific timestamp -- Full scan, Timestamp
val individualTimestamp = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> oldMs.toString))
.format("org.apache.hadoop.hbase.spark")
.load()
assert(individualTimestamp.count() == 101)
// Test getting everything -- Full Scan, No range
val everything = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog))
.format("org.apache.hadoop.hbase.spark")
.load()
assert(everything.count() == 256)
// Test getting specific fields -- Pruned Scan, no range
val element50 = everything.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
assert(element50 == "String50: extra")
val element200 = everything.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
assert(element200 == "String200: new")
// Test Getting old stuff -- Full Scan, TimeRange
val oldRange = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
HBaseSparkConf.TIMERANGE_END -> (oldMs + 100).toString))
.format("org.apache.hadoop.hbase.spark")
.load()
assert(oldRange.count() == 101)
// Test Getting old stuff -- Pruned Scan, TimeRange
val oldElement50 = oldRange.where(col("col0") === lit("row050")).select("col7").collect()(0)(0)
assert(oldElement50 == "String50: old")
// Test Getting middle stuff -- Full Scan, TimeRange
val middleRange = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
HBaseSparkConf.TIMERANGE_END -> (startMs + 100).toString))
.format("org.apache.hadoop.hbase.spark")
.load()
assert(middleRange.count() == 256)
// Test Getting middle stuff -- Pruned Scan, TimeRange
val middleElement200 = middleRange.where(col("col0") === lit("row200")).select("col7").collect()(0)(0)
assert(middleElement200 == "String200: extra")
}
// catalog for insertion
def avroWriteCatalog = s"""{
|"table":{"namespace":"default", "name":"avrotable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"binary"},
|"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
|}
|}""".stripMargin
// catalog for reading
def avroCatalog = s"""{
|"table":{"namespace":"default", "name":"avrotable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "avro":"avroSchema"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
// catalog for inserting into another table
def avroCatalogInsert = s"""{
|"table":{"namespace":"default", "name":"avrotableInsert"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "avro":"avroSchema"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
def withAvroCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map("avroSchema"->AvroHBaseKeyRecord.schemaString,
HBaseTableCatalog.tableCatalog->avroCatalog))
.format("org.apache.hadoop.hbase.spark")
.load()
}
test("populate avro table") {
val sql = sqlContext
import sql.implicits._
val data = (0 to 255).map { i =>
AvroHBaseKeyRecord(i)
}
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> avroWriteCatalog,
HBaseTableCatalog.newTable -> "5"))
.format("org.apache.hadoop.hbase.spark")
.save()
}
test("avro empty column") {
val df = withAvroCatalog(avroCatalog)
df.registerTempTable("avrotable")
val c = sqlContext.sql("select count(1) from avrotable")
.rdd.collect()(0)(0).asInstanceOf[Long]
assert(c == 256)
}
test("avro full query") {
val df = withAvroCatalog(avroCatalog)
df.show()
df.printSchema()
assert(df.count() == 256)
}
test("avro serialization and deserialization query") {
val df = withAvroCatalog(avroCatalog)
df.write.options(
Map("avroSchema"->AvroHBaseKeyRecord.schemaString,
HBaseTableCatalog.tableCatalog->avroCatalogInsert,
HBaseTableCatalog.newTable -> "5"))
.format("org.apache.hadoop.hbase.spark")
.save()
val newDF = withAvroCatalog(avroCatalogInsert)
newDF.show()
newDF.printSchema()
assert(newDF.count() == 256)
}
test("avro filtered query") {
val sql = sqlContext
import sql.implicits._
val df = withAvroCatalog(avroCatalog)
val r = df.filter($"col1.name" === "name005" || $"col1.name" <= "name005")
.select("col0", "col1.favorite_color", "col1.favorite_number")
r.show()
assert(r.count() == 6)
}
test("avro Or filter") {
val sql = sqlContext
import sql.implicits._
val df = withAvroCatalog(avroCatalog)
val s = df.filter($"col1.name" <= "name005" || $"col1.name".contains("name007"))
.select("col0", "col1.favorite_color", "col1.favorite_number")
s.show()
assert(s.count() == 7)
}
test("test create HBaseRelation with new context throws SAXParseException") {
val catalog = s"""{
|"table":{"namespace":"default", "name":"t1NotThere"},
|"rowkey":"key",
|"columns":{
|"KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"},
|"A_FIELD":{"cf":"c", "col":"a", "type":"string"},
|"B_FIELD":{"cf":"c", "col":"c", "type":"string"}
|}
|}""".stripMargin
try {
HBaseRelation(Map(HBaseTableCatalog.tableCatalog -> catalog,
HBaseSparkConf.USE_HBASECONTEXT -> "false"), None)(sqlContext)
} catch {
case e: Throwable => if (e.getCause.isInstanceOf[SAXParseException]) {
fail("SAXParseException due to configuration loading empty resource")
} else {
println("Failed due to some other exception, ignore " + e.getMessage)
}
}
}
}