/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.spark.example.datasources

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
import org.apache.yetus.audience.InterfaceAudience

@InterfaceAudience.Private
class UserCustomizedSampleException(message: String = null, cause: Throwable = null)
  extends RuntimeException(UserCustomizedSampleException.message(message, cause), cause)

@InterfaceAudience.Private
object UserCustomizedSampleException {
  def message(message: String, cause: Throwable) =
    if (message != null) message
    else if (cause != null) cause.toString()
    else null
}

@InterfaceAudience.Private
case class IntKeyRecord(
  col0: Integer,
  col1: Boolean,
  col2: Double,
  col3: Float,
  col4: Int,
  col5: Long,
  col6: Short,
  col7: String,
  col8: Byte)
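
// Builds a record from an index i: even values of i keep a non-negative row key
// (col0 = i) while odd values flip the sign (col0 = -i), so populating the table
// with i in 0 until 32 yields 16 rows with a negative key and 16 rows with a
// non-negative key. The filter counts checked in main() below rely on this split.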
object IntKeyRecord {
  def apply(i: Int): IntKeyRecord = {
    IntKeyRecord(
      if (i % 2 == 0) i else -i,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,
      i,
      i.toLong,
      i.toShort,
      s"String$i extra",
      i.toByte)
  }
}
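
// The catalog below maps col0 to the HBase row key and col1..col8 to columns in
// the column families cf1..cf8, with Spark SQL types matching the IntKeyRecord
// fields (int, boolean, double, float, bigint, smallint, string, tinyint).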
@InterfaceAudience.Private
object DataType {
  val cat = s"""{
                |"table":{"namespace":"default", "name":"DataTypeExampleTable"},
                |"rowkey":"key",
                |"columns":{
                |"col0":{"cf":"rowkey", "col":"key", "type":"int"},
                |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
                |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
                |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
                |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
                |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
                |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
                |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
                |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
                |}
                |}""".stripMargin
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DataTypeExample")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
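
    // Helper that reads the table back through the connector's DataFrame source,
    // using the same catalog that is used for the write below.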
    def withCatalog(cat: String): DataFrame = {
      sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog -> cat))
        .format("org.apache.hadoop.hbase.spark")
        .load()
    }

    // Populate the test table with 32 records.
    val data = (0 until 32).map { i =>
      IntKeyRecord(i)
    }
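    // The newTable option asks the connector to create the table (with the given
    // number of regions, here 5) when it does not already exist.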
    sc.parallelize(data).toDF.write
      .options(Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.hadoop.hbase.spark")
      .save()

    // test less than 0. The number of results is 16.
    val df = withCatalog(cat)
    val s = df.filter($"col0" < 0)
    s.show()
    if (s.count() != 16) {
      throw new UserCustomizedSampleException("value invalid")
    }
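
    // With the key scheme above, the negative keys are -1, -3, ..., -31 and the
    // non-negative keys are 0, 2, ..., 30, which is where the expected counts in
    // the range filters below come from.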

    // test less than or equal to -10. The number of results is 11.
    val num1 = df.filter($"col0" <= -10)
    num1.show()
    val c1 = num1.count()
    println(s"test result count should be 11: $c1")

    // test less than or equal to -9. The number of results is 12.
    val num2 = df.filter($"col0" <= -9)
    num2.show()
    val c2 = num2.count()
    println(s"test result count should be 12: $c2")

    // test greater than or equal to -9. The number of results is 21.
    val num3 = df.filter($"col0" >= -9)
    num3.show()
    val c3 = num3.count()
    println(s"test result count should be 21: $c3")

    // test greater than or equal to 0. The number of results is 16.
    val num4 = df.filter($"col0" >= 0)
    num4.show()
    val c4 = num4.count()
    println(s"test result count should be 16: $c4")

    // test greater than 10. The number of results is 10.
    val num5 = df.filter($"col0" > 10)
    num5.show()
    val c5 = num5.count()
    println(s"test result count should be 10: $c5")

    // test "and". The number of results is 11.
    val num6 = df.filter($"col0" > -10 && $"col0" <= 10)
    num6.show()
    val c6 = num6.count()
    println(s"test result count should be 11: $c6")

    // test "or". The number of results is 21.
    val num7 = df.filter($"col0" <= -10 || $"col0" > 10)
    num7.show()
    val c7 = num7.count()
    println(s"test result count should be 21: $c7")

    // test "all". The number of results is 32.
    val num8 = df.filter($"col0" >= -100)
    num8.show()
    val c8 = num8.count()
    println(s"test result count should be 32: $c8")
//test "full query"
val df1 = withCatalog(cat)
df1.show()
val c_df = df1.count()
println(s"df count should be 32: $c_df")
if(c_df != 32){
throw new UserCustomizedSampleException("value invalid")
}
}
}