/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.spark.example.datasources

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.hadoop.hbase.spark.AvroSerdes
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
import org.apache.yetus.audience.InterfaceAudience

/**
 * An example record persisted to HBase.
 *
 * @param col0 Column #0, the row key, of type String
 * @param col1 Column #1, the Avro-serialized payload, of type Array[Byte]
 */
@InterfaceAudience.Private
case class AvroHBaseRecord(col0: String, col1: Array[Byte])

@InterfaceAudience.Private
object AvroHBaseRecord {

  // Avro schema used both to generate the sample records and to decode them on read.
  val schemaString: String =
    s"""{"namespace": "example.avro",
       |  "type": "record", "name": "User",
       |  "fields": [
       |    {"name": "name", "type": "string"},
       |    {"name": "favorite_number", "type": ["int", "null"]},
       |    {"name": "favorite_color", "type": ["string", "null"]},
       |    {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
       |    {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
       |  ]}""".stripMargin

  val avroSchema: Schema = {
    val p = new Schema.Parser
    p.parse(schemaString)
  }

  def apply(i: Int): AvroHBaseRecord = {
    val user = new GenericData.Record(avroSchema)
    user.put("name", s"name${"%03d".format(i)}")
    user.put("favorite_number", i)
    user.put("favorite_color", s"color${"%03d".format(i)}")
    val favoriteArray =
      new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
    favoriteArray.add(s"number$i")
    favoriteArray.add(s"number${i + 1}")
    user.put("favorite_array", favoriteArray)
    import scala.collection.JavaConverters._
    val favoriteMap = Map[String, Int]("key1" -> i, "key2" -> (i + 1)).asJava
    user.put("favorite_map", favoriteMap)
    // Serialize the populated record to Avro bytes for storage in an HBase cell.
    val avroByte = AvroSerdes.serialize(user, avroSchema)
    AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
  }
}
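
/**
 * End-to-end example: generates 256 AvroHBaseRecords, writes them to HBase
 * through the hbase-spark DataSource, reads them back with an Avro-decoding
 * catalog, and runs a few SQL and DataFrame queries over the decoded columns.
 */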
@InterfaceAudience.Private
object AvroSource {

  // Write-side catalog: col0 maps to the HBase row key ("cf":"rowkey"), and
  // col1 stores the serialized Avro payload as raw binary in column family cf1.
  def catalog: String =
    s"""{
       |"table":{"namespace":"default", "name":"ExampleAvrotable"},
       |"rowkey":"key",
       |"columns":{
       |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
       |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
       |}
       |}""".stripMargin
  def avroCatalog: String =
    s"""{
       |"table":{"namespace":"default", "name":"ExampleAvrotable"},
       |"rowkey":"key",
       |"columns":{
       |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
       |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
       |}
       |}""".stripMargin
  def avroCatalogInsert: String =
    s"""{
       |"table":{"namespace":"default", "name":"ExampleAvrotableInsert"},
       |"rowkey":"key",
       |"columns":{
       |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
       |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
       |}
       |}""".stripMargin

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AvroSourceExample")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    // Load the table described by `cat`; the "avroSchema" option carries the
    // writer schema so the connector can decode col1 into a nested struct.
    // (The original ignored `cat` and always read with avroCatalog; it is now
    // honored, so callers must pass an Avro-aware catalog when they need the
    // decoded columns.)
    def withCatalog(cat: String): DataFrame = {
      sqlContext.read
        .options(Map(
          "avroSchema" -> AvroHBaseRecord.schemaString,
          HBaseTableCatalog.tableCatalog -> cat))
        .format("org.apache.hadoop.hbase.spark")
        .load()
    }
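
    // Generate 256 sample records and write them with the binary catalog, so
    // col1's Avro bytes are stored untouched. HBaseTableCatalog.newTable -> "5"
    // has the connector create the table if needed; the value sets the number
    // of regions.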
    val data = (0 to 255).map { i =>
      AvroHBaseRecord(i)
    }
    sc.parallelize(data).toDF.write.options(
      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.hadoop.hbase.spark")
      .save()
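
    // Read the rows back with the Avro catalog; col1 comes back as a nested
    // struct whose fields can be addressed as col1.name, col1.favorite_array, etc.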
    val df = withCatalog(avroCatalog)
    df.show()
    df.printSchema()
    df.registerTempTable("ExampleAvrotable")
    val c = sqlContext.sql("select count(1) from ExampleAvrotable")
    c.show()
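
    // Select a nested Avro field and sanity-check the array contents for row
    // "name001" (built from i = 1, so its array holds "number1" and "number2").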
    val filtered = df.select($"col0", $"col1.favorite_array").where($"col0" === "name001")
    filtered.show()
    val collected = filtered.collect()
    // UserCustomizedSampleException is assumed to be defined alongside these
    // examples in the same package.
    if (collected(0).getSeq[String](1)(0) != "number1") {
      throw new UserCustomizedSampleException("value invalid")
    }
    if (collected(0).getSeq[String](1)(1) != "number2") {
      throw new UserCustomizedSampleException("value invalid")
    }
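
    // Copy the DataFrame into a second table. Because this catalog tags col1
    // with the Avro schema, the connector re-encodes the struct to Avro bytes
    // on write.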
    df.write.options(
      Map("avroSchema" -> AvroHBaseRecord.schemaString,
        HBaseTableCatalog.tableCatalog -> avroCatalogInsert,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.hadoop.hbase.spark")
      .save()
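
    // Read the copy back and verify that all 256 rows made it across.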
    val newDF = withCatalog(avroCatalogInsert)
    newDF.show()
    newDF.printSchema()
    if (newDF.count() != 256) {
      throw new UserCustomizedSampleException("value invalid")
    }
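
    // Predicates over the decoded Avro fields compose like any other nested
    // struct columns in the DataFrame DSL.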
df.filter($"col1.name" === "name005" || $"col1.name" <= "name005")
.select("col0", "col1.favorite_color", "col1.favorite_number")
.show()
df.filter($"col1.name" <= "name005" || $"col1.name".contains("name007"))
.select("col0", "col1.favorite_color", "col1.favorite_number")
.show()
}
}