| //// |
| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| . . http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| //// |
| |
| [[spark]] |
| = HBase and Spark |
| :doctype: book |
| :numbered: |
| :toc: left |
| :icons: font |
| :experimental: |
| |
| link:http://spark.apache.org/[Apache Spark] is a software framework that is used |
| to process data in memory in a distributed manner, and is replacing MapReduce in |
| many use cases. |
| |
Spark itself is out of scope of this document; please refer to the Spark site for
more information on the Spark project and subprojects. This document focuses
on four main interaction points between Spark and HBase. Those interaction points are:
| |
| Basic Spark:: |
| The ability to have an HBase Connection at any point in your Spark DAG. |
| Spark Streaming:: |
| The ability to have an HBase Connection at any point in your Spark Streaming |
| application. |
| Spark Bulk Load:: |
The ability to write directly to HBase HFiles for bulk insertion into HBase.
| SparkSQL/DataFrames:: |
| The ability to write SparkSQL that draws on tables that are represented in HBase. |
| |
| The following sections will walk through examples of all these interaction points. |
| |
| == Basic Spark |
| |
| This section discusses Spark HBase integration at the lowest and simplest levels. |
| All the other interaction points are built upon the concepts that will be described |
| here. |
| |
| At the root of all Spark and HBase integration is the HBaseContext. The HBaseContext |
| takes in HBase configurations and pushes them to the Spark executors. This allows |
| us to have an HBase Connection per Spark Executor in a static location. |
| |
For reference, Spark Executors can be on the same nodes as the Region Servers or
on different nodes; there is no dependency on co-location. Think of every Spark
Executor as a multi-threaded client application. This allows any Spark Tasks
running on the executors to access the shared Connection object.
| |
| .HBaseContext Usage Example |
| ==== |
| |
This example shows how HBaseContext can be used to do a `foreachPartition` on an RDD
| in Scala: |
| |
| [source, scala] |
| ---- |
| val sc = new SparkContext("local", "test") |
| val config = new HBaseConfiguration() |
| |
| ... |
| |
| val hbaseContext = new HBaseContext(sc, config) |
| |
| rdd.hbaseForeachPartition(hbaseContext, (it, conn) => { |
| val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1")) |
| it.foreach((putRecord) => { |
   val put = new Put(putRecord._1)
   putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
   bufferedMutator.mutate(put)
| }) |
| bufferedMutator.flush() |
| bufferedMutator.close() |
| }) |
| ---- |
| |
| Here is the same example implemented in Java: |
| |
| [source, java] |
| ---- |
| JavaSparkContext jsc = new JavaSparkContext(sparkConf); |
| |
| try { |
| List<byte[]> list = new ArrayList<>(); |
| list.add(Bytes.toBytes("1")); |
| ... |
| list.add(Bytes.toBytes("5")); |
| |
| JavaRDD<byte[]> rdd = jsc.parallelize(list); |
| Configuration conf = HBaseConfiguration.create(); |
| |
| JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); |
| |
| hbaseContext.foreachPartition(rdd, |
| new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() { |
| public void call(Tuple2<Iterator<byte[]>, Connection> t) |
| throws Exception { |
| Table table = t._2().getTable(TableName.valueOf(tableName)); |
| BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName)); |
| while (t._1().hasNext()) { |
| byte[] b = t._1().next(); |
| Result r = table.get(new Get(b)); |
| if (r.getExists()) { |
| mutator.mutate(new Put(b)); |
| } |
| } |
| |
| mutator.flush(); |
| mutator.close(); |
| table.close(); |
| } |
| }); |
| } finally { |
| jsc.stop(); |
| } |
| ---- |
| ==== |
| |
All functionality between Spark and HBase is supported in both Scala and Java,
with the exception of SparkSQL, which supports any language that is supported
by Spark. For the remainder of this documentation we will focus on Scala
examples.
| |
The examples above illustrate how to do a `foreachPartition` with a connection. A
number of other Spark base functions are supported out of the box:
| |
| // tag::spark_base_functions[] |
| `bulkPut`:: For massively parallel sending of puts to HBase |
| `bulkDelete`:: For massively parallel sending of deletes to HBase |
| `bulkGet`:: For massively parallel sending of gets to HBase to create a new RDD |
| `mapPartition`:: To do a Spark Map function with a Connection object to allow full |
| access to HBase |
`hBaseRDD`:: To simplify a distributed scan to create an RDD
| // end::spark_base_functions[] |
| |
| For examples of all these functionalities, see the HBase-Spark Module. |
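
For instance, a minimal sketch of `bulkPut` and `hBaseRDD` might look like the following.
This assumes the method signatures provided by the HBase-Spark module's HBaseContext; the
table name `t1` and column family `cf1` are placeholders.

[source, scala]
----
val sc = new SparkContext("local", "test")
val config = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, config)

// An RDD of (rowKey, Array[(family, qualifier, value)]) records.
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"),
    Array((Bytes.toBytes("cf1"), Bytes.toBytes("a"), Bytes.toBytes("v1"))))))

// bulkPut: one Put per RDD record, sent to HBase in parallel from the executors.
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](
  rdd,
  TableName.valueOf("t1"),
  (putRecord) => {
    val put = new Put(putRecord._1)
    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
    put
  })

// hbaseRDD: a distributed scan of the table as an RDD of (rowkey, Result).
val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf("t1"), new Scan())
scanRdd.foreach(r => println(Bytes.toString(r._1.get())))
----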
| |
| == Spark Streaming |
http://spark.apache.org/streaming/[Spark Streaming] is a micro-batching stream
processing framework built on top of Spark. HBase and Spark Streaming make great
companions in that HBase can provide the following benefits alongside Spark
Streaming:
| |
| * A place to grab reference data or profile data on the fly |
* A place to store counts or aggregates in a way that supports Spark Streaming's
promise of _only once_ processing.
| |
| The HBase-Spark module’s integration points with Spark Streaming are similar to |
| its normal Spark integration points, in that the following commands are possible |
| straight off a Spark Streaming DStream. |
| |
| include::spark.adoc[tags=spark_base_functions] |
| |
| .`bulkPut` Example with DStreams |
| ==== |
| |
| Below is an example of bulkPut with DStreams. It is very close in feel to the RDD |
| bulk put. |
| |
| [source, scala] |
| ---- |
| val sc = new SparkContext("local", "test") |
| val config = new HBaseConfiguration() |
| |
| val hbaseContext = new HBaseContext(sc, config) |
| val ssc = new StreamingContext(sc, Milliseconds(200)) |
| |
| val rdd1 = ... |
| val rdd2 = ... |
| |
| val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte], |
| Array[Byte], Array[Byte])])]]() |
| |
| queue += rdd1 |
| queue += rdd2 |
| |
| val dStream = ssc.queueStream(queue) |
| |
| dStream.hbaseBulkPut( |
| hbaseContext, |
| TableName.valueOf(tableName), |
| (putRecord) => { |
| val put = new Put(putRecord._1) |
| putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) |
| put |
| }) |
| ---- |
| |
There are three inputs to the `hbaseBulkPut` function.

. The `hbaseContext` that carries the configuration broadcast information that links
us to the HBase Connections in the executors.
. The table name of the table we are putting data into.
. A function that will convert a record in the DStream into an HBase Put object.
| ==== |
| |
| == Bulk Load |
| |
There are two options for bulk loading data into HBase with Spark. There is the
basic bulk load functionality that will work for cases where your rows have
millions of columns and for cases where your columns are not consolidated and
partitioned before the map side of the Spark bulk load process.
| |
There is also a thin record bulk load option with Spark. This second option is
designed for tables that have fewer than 10k columns per row. The advantage
of this second option is higher throughput and less overall load on the Spark
shuffle operation.
| |
| Both implementations work more or less like the MapReduce bulk load process in |
| that a partitioner partitions the rowkeys based on region splits and |
| the row keys are sent to the reducers in order, so that HFiles can be written |
| out directly from the reduce phase. |
| |
In Spark terms, the bulk load is implemented around the Spark
`repartitionAndSortWithinPartitions` followed by a Spark `foreachPartition`.
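
Conceptually, the shuffle step looks like the following minimal sketch using plain
Spark primitives. This is illustrative only: the region split points and the composite
key ordering used by the actual module are simplified to strings here, and `sc` is
assumed to be an existing SparkContext.

[source, scala]
----
import org.apache.spark.Partitioner

// Illustrative partitioner: routes each row key to the "region" whose key
// range contains it, mimicking how the bulk load groups keys by region split.
class RegionSplitPartitioner(splits: Array[String]) extends Partitioner {
  override def numPartitions: Int = splits.length + 1
  override def getPartition(key: Any): Int = {
    val idx = splits.indexWhere(split => key.toString < split)
    if (idx < 0) splits.length else idx
  }
}

// Keys end up grouped per "region" and sorted within each partition, so a
// subsequent foreachPartition can write the cells out in HFile order.
val keyed = sc.parallelize(Seq(("row3", "v3"), ("row1", "v1"), ("row2", "v2")))
val shuffled = keyed.repartitionAndSortWithinPartitions(
  new RegionSplitPartitioner(Array("row2")))
----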
| |
First let's look at an example of using the basic bulk load functionality.
| |
| .Bulk Loading Example |
| ==== |
| |
| The following example shows bulk loading with Spark. |
| |
| [source, scala] |
| ---- |
| val sc = new SparkContext("local", "test") |
| val config = new HBaseConfiguration() |
| |
| val hbaseContext = new HBaseContext(sc, config) |
| |
| val stagingFolder = ... |
| val rdd = sc.parallelize(Array( |
| (Bytes.toBytes("1"), |
| (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), |
| (Bytes.toBytes("3"), |
| (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ... |
| |
| rdd.hbaseBulkLoad(TableName.valueOf(tableName), |
| t => { |
| val rowKey = t._1 |
| val family:Array[Byte] = t._2(0)._1 |
| val qualifier = t._2(0)._2 |
| val value = t._2(0)._3 |
| |
| val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) |
| |
| Seq((keyFamilyQualifier, value)).iterator |
| }, |
| stagingFolder.getPath) |
| |
| val load = new LoadIncrementalHFiles(config) |
| load.doBulkLoad(new Path(stagingFolder.getPath), |
| conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) |
| ---- |
| ==== |
| |
| The `hbaseBulkLoad` function takes three required parameters: |
| |
. The table name of the table we intend to bulk load into.
| |
. A function that will convert a record in the RDD to a tuple key value pair, with
the tuple key being a KeyFamilyQualifier object and the value being the cell value.
The KeyFamilyQualifier object will hold the RowKey, Column Family, and Column Qualifier.
The shuffle will partition on the RowKey but will sort by all three values (see the
illustrative sketch after this list).
| |
. The temporary path for the HFile to be written out to.
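
To make that ordering concrete, below is an illustrative sketch of a composite key
shaped like KeyFamilyQualifier (not the module's actual implementation), showing how
comparison proceeds by row key, then column family, then qualifier while partitioning
uses only the row key.

[source, scala]
----
import org.apache.hadoop.hbase.util.Bytes

// Illustrative composite key: partitioning uses only the row key, while
// sorting compares row key, then family, then qualifier.
class RowFamilyQualifierKey(val rowKey: Array[Byte],
                            val family: Array[Byte],
                            val qualifier: Array[Byte])
  extends Comparable[RowFamilyQualifierKey] with Serializable {

  override def compareTo(other: RowFamilyQualifierKey): Int = {
    var result = Bytes.compareTo(rowKey, other.rowKey)
    if (result == 0) {
      result = Bytes.compareTo(family, other.family)
      if (result == 0) result = Bytes.compareTo(qualifier, other.qualifier)
    }
    result
  }
}
----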
| |
Following the Spark bulk load command, use HBase's `LoadIncrementalHFiles` object
to load the newly created HFiles into HBase.
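
The bulk load examples use `conn` and `table` handles without showing where they come
from. A minimal sketch of obtaining them with the standard HBase client API (reusing
`config`, `tableName`, and `stagingFolder` from the example above) might look like this:

[source, scala]
----
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conn = ConnectionFactory.createConnection(config)
try {
  val table = conn.getTable(TableName.valueOf(tableName))
  val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))

  // Load the HFiles written by the Spark bulk load into the live table.
  val load = new LoadIncrementalHFiles(config)
  load.doBulkLoad(new Path(stagingFolder.getPath), conn.getAdmin, table, regionLocator)

  table.close()
} finally {
  conn.close()
}
----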
| |
| .Additional Parameters for Bulk Loading with Spark |
| |
You can set the following attributes with additional parameter options on `hbaseBulkLoad`:
| |
| * Max file size of the HFiles |
| * A flag to exclude HFiles from compactions |
| * Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding |
| |
| .Using Additional Parameters |
| ==== |
| |
| [source, scala] |
| ---- |
| val sc = new SparkContext("local", "test") |
| val config = new HBaseConfiguration() |
| |
| val hbaseContext = new HBaseContext(sc, config) |
| |
| val stagingFolder = ... |
| val rdd = sc.parallelize(Array( |
| (Bytes.toBytes("1"), |
| (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), |
| (Bytes.toBytes("3"), |
| (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ... |
| |
| val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions] |
| val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX") |
| |
| familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options) |
| |
| rdd.hbaseBulkLoad(TableName.valueOf(tableName), |
| t => { |
| val rowKey = t._1 |
| val family:Array[Byte] = t._2(0)._1 |
| val qualifier = t._2(0)._2 |
| val value = t._2(0)._3 |
| |
| val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) |
| |
| Seq((keyFamilyQualifier, value)).iterator |
| }, |
| stagingFolder.getPath, |
| familyHBaseWriterOptions, |
| compactionExclude = false, |
| HConstants.DEFAULT_MAX_FILE_SIZE) |
| |
| val load = new LoadIncrementalHFiles(config) |
| load.doBulkLoad(new Path(stagingFolder.getPath), |
| conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) |
| ---- |
| ==== |
| |
Now let's look at how you would call the thin record bulk load implementation.
| |
| .Using thin record bulk load |
| ==== |
| |
| [source, scala] |
| ---- |
| val sc = new SparkContext("local", "test") |
| val config = new HBaseConfiguration() |
| |
| val hbaseContext = new HBaseContext(sc, config) |
| |
| val stagingFolder = ... |
| val rdd = sc.parallelize(Array( |
| ("1", |
| (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), |
| ("3", |
| (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ... |
| |
| rdd.hbaseBulkLoadThinRows(hbaseContext, |
| TableName.valueOf(tableName), |
| t => { |
| val rowKey = t._1 |
| |
| val familyQualifiersValues = new FamiliesQualifiersValues |
| t._2.foreach(f => { |
| val family:Array[Byte] = f._1 |
| val qualifier = f._2 |
| val value:Array[Byte] = f._3 |
| |
| familyQualifiersValues +=(family, qualifier, value) |
| }) |
| (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues) |
| }, |
| stagingFolder.getPath, |
| new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions], |
| compactionExclude = false, |
| 20) |
| |
| val load = new LoadIncrementalHFiles(config) |
| load.doBulkLoad(new Path(stagingFolder.getPath), |
| conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) |
| ---- |
| ==== |
| |
Note that the big difference in using bulk load for thin rows is that the function
returns a tuple with the first value being the row key and the second value
being an object of FamiliesQualifiersValues, which will contain all the
values for this row for all column families.
| |
| == SparkSQL/DataFrames |
| |
The HBase-Spark Connector (in the HBase-Spark Module) leverages the
link:https://databricks.com/blog/2015/01/09/spark-sql-data-sources-api-unified-data-access-for-the-spark-platform.html[DataSource API]
(link:https://issues.apache.org/jira/browse/SPARK-3247[SPARK-3247])
introduced in Spark 1.2.0 to bridge the gap between the simple HBase key-value store and complex
relational SQL queries, enabling users to perform complex data analytical work
on top of HBase using Spark. An HBase DataFrame is a standard Spark DataFrame, and is able to
interact with any other data sources such as Hive, ORC, Parquet, JSON, etc.
The HBase-Spark Connector applies critical techniques such as partition pruning, column pruning,
predicate pushdown and data locality.
| |
To use the HBase-Spark connector, users need to define the Catalog for the schema mapping
between HBase and Spark tables, prepare the data and populate the HBase table,
then load the HBase DataFrame. After that, users can run integrated queries and access records
in the HBase table with SQL. The following illustrates the basic procedure.
| |
| === Define catalog |
| |
| [source, scala] |
| ---- |
| def catalog = s"""{ |
| |"table":{"namespace":"default", "name":"table1"}, |
| |"rowkey":"key", |
| |"columns":{ |
| |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |
| |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}, |
| |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |
| |"col3":{"cf":"cf3", "col":"col3", "type":"float"}, |
| |"col4":{"cf":"cf4", "col":"col4", "type":"int"}, |
| |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"}, |
| |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"}, |
| |"col7":{"cf":"cf7", "col":"col7", "type":"string"}, |
| |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"} |
| |} |
| |}""".stripMargin |
| ---- |
| |
The Catalog defines a mapping between HBase and Spark tables. There are two critical parts of this
catalog. One is the rowkey definition and the other is the mapping between a table column in Spark
and the column family and column qualifier in HBase. The above defines a schema for an HBase table
named `table1`, with the row key as `key` and a number of columns (`col1` - `col8`). Note that the
rowkey also has to be defined in detail as a column (`col0`), which has a specific column family (`rowkey`).
| |
| === Save the DataFrame |
| |
| [source, scala] |
| ---- |
| case class HBaseRecord( |
| col0: String, |
| col1: Boolean, |
| col2: Double, |
| col3: Float, |
| col4: Int, |
| col5: Long, |
| col6: Short, |
| col7: String, |
| col8: Byte) |
| |
| object HBaseRecord |
| { |
| def apply(i: Int, t: String): HBaseRecord = { |
| val s = s"""row${"%03d".format(i)}""" |
| HBaseRecord(s, |
| i % 2 == 0, |
| i.toDouble, |
| i.toFloat, |
| i, |
| i.toLong, |
| i.toShort, |
| s"String$i: $t", |
| i.toByte) |
| } |
| } |
| |
| val data = (0 to 255).map { i => HBaseRecord(i, "extra")} |
| |
| sc.parallelize(data).toDF.write.options( |
| Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")) |
 .format("org.apache.hadoop.hbase.spark")
| .save() |
| |
| ---- |
`data` prepared by the user is a local Scala collection which has 256 HBaseRecord objects.
The `sc.parallelize(data)` function distributes `data` to form an RDD. `toDF` returns a DataFrame.
The `write` function returns a DataFrameWriter used to write the DataFrame to external storage
systems (e.g. HBase here). Given a DataFrame with the specified schema `catalog`, the `save` function
will create an HBase table with 5 regions and save the DataFrame inside.
| |
| === Load the DataFrame |
| |
| [source, scala] |
| ---- |
| def withCatalog(cat: String): DataFrame = { |
| sqlContext |
| .read |
| .options(Map(HBaseTableCatalog.tableCatalog->cat)) |
| .format("org.apache.hadoop.hbase.spark") |
| .load() |
| } |
| val df = withCatalog(catalog) |
| ---- |
In the `withCatalog` function, `sqlContext` is a variable of SQLContext, which is the entry point
for working with structured data (rows and columns) in Spark.
`read` returns a DataFrameReader that can be used to read data in as a DataFrame.
The `option` function adds input options for the underlying data source to the DataFrameReader,
and the `format` function specifies the input data source format for the DataFrameReader.
The `load()` function loads input in as a DataFrame. The DataFrame `df` returned
by the `withCatalog` function can be used to access the HBase table, as shown in the
Language Integrated Query and SQL Query sections below.
| |
| === Language Integrated Query |
| |
| [source, scala] |
| ---- |
| val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") || |
| $"col0" === "row005" || |
| $"col0" <= "row005") |
| .select("col0", "col1", "col4") |
| s.show |
| ---- |
A DataFrame can perform various operations, such as join, sort, select, filter, orderBy and so on.
`df.filter` above filters rows using the given SQL expression. `select` selects a set of columns:
`col0`, `col1` and `col4`.
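
For example, a further query over the same DataFrame using standard DataFrame operations
might look like the following sketch (the column names are those defined by the catalog above):

[source, scala]
----
// Group by the boolean column col1, count rows per group, and order the result.
val grouped = df.groupBy($"col1").count().orderBy($"col1")
grouped.show
----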
| |
| === SQL Query |
| |
| [source, scala] |
| ---- |
| df.registerTempTable("table1") |
| sqlContext.sql("select count(col1) from table1").show |
| ---- |
| |
`registerTempTable` registers the `df` DataFrame as a temporary table using the table name `table1`.
The lifetime of this temporary table is tied to the SQLContext that was used to create `df`.
The `sqlContext.sql` function allows the user to execute SQL queries.
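
Any SQL supported by Spark can be run against the registered table. For instance, a filtered
projection over the same temporary table might look like this sketch (column names from the
catalog above; the threshold value is arbitrary):

[source, scala]
----
// Select two columns for rows whose integer column exceeds a threshold.
sqlContext.sql("select col0, col4 from table1 where col4 > 100").show
----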
| |
| === Others |
| |
| .Query with different timestamps |
| ==== |
In HBaseSparkConf, four parameters related to timestamps can be set: TIMESTAMP,
MIN_TIMESTAMP, MAX_TIMESTAMP and MAX_VERSIONS. Users can query records with a
specific timestamp, or with a time range by setting MIN_TIMESTAMP and MAX_TIMESTAMP.
Use concrete values instead of tsSpecified and oldMs in the examples below.

The example below shows how to load the df DataFrame with a specific timestamp.
tsSpecified is specified by the user.
`HBaseTableCatalog` defines the schema mapping between the HBase table and the Spark relation;
`writeCatalog` is the catalog that defines this schema mapping.
| |
| [source, scala] |
| ---- |
| val df = sqlContext.read |
| .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString)) |
| .format("org.apache.hadoop.hbase.spark") |
| .load() |
| ---- |
| |
The example below shows how to load the df DataFrame with a time range.
oldMs is specified by the user.
| |
| [source, scala] |
| ---- |
| val df = sqlContext.read |
| .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0", |
| HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString)) |
| .format("org.apache.hadoop.hbase.spark") |
| .load() |
| ---- |
After loading the df DataFrame, users can query the data.
| |
| [source, scala] |
| ---- |
| df.registerTempTable("table") |
| sqlContext.sql("select count(col1) from table").show |
| ---- |
| ==== |
| |
| .Native Avro support |
| ==== |
The HBase-Spark Connector supports different data formats like Avro, JSON, etc. The use case below
shows how Spark supports Avro. Users can persist Avro records into HBase directly. Internally,
the Avro schema is converted to a native Spark Catalyst data type automatically.
Note that both key and value parts in an HBase table can be defined in Avro format.
| |
| 1) Define catalog for the schema mapping: |
| |
| [source, scala] |
| ---- |
| def catalog = s"""{ |
| |"table":{"namespace":"default", "name":"Avrotable"}, |
| |"rowkey":"key", |
| |"columns":{ |
| |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |
| |"col1":{"cf":"cf1", "col":"col1", "type":"binary"} |
| |} |
| |}""".stripMargin |
| ---- |
| |
`catalog` is a schema for an HBase table named `Avrotable`, with the row key as `key` and
one column `col1`. The rowkey also has to be defined in detail as a column (`col0`),
which has a specific column family (`rowkey`).
| |
| 2) Prepare the Data: |
| |
| [source, scala] |
| ---- |
| object AvroHBaseRecord { |
| val schemaString = |
| s"""{"namespace": "example.avro", |
| | "type": "record", "name": "User", |
| | "fields": [ |
| | {"name": "name", "type": "string"}, |
| | {"name": "favorite_number", "type": ["int", "null"]}, |
| | {"name": "favorite_color", "type": ["string", "null"]}, |
| | {"name": "favorite_array", "type": {"type": "array", "items": "string"}}, |
| | {"name": "favorite_map", "type": {"type": "map", "values": "int"}} |
| | ] }""".stripMargin |
| |
| val avroSchema: Schema = { |
| val p = new Schema.Parser |
| p.parse(schemaString) |
| } |
| |
| def apply(i: Int): AvroHBaseRecord = { |
    val user = new GenericData.Record(avroSchema)
| user.put("name", s"name${"%03d".format(i)}") |
| user.put("favorite_number", i) |
| user.put("favorite_color", s"color${"%03d".format(i)}") |
| val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema()) |
| favoriteArray.add(s"number${i}") |
| favoriteArray.add(s"number${i+1}") |
| user.put("favorite_array", favoriteArray) |
| import collection.JavaConverters._ |
| val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava |
| user.put("favorite_map", favoriteMap) |
| val avroByte = AvroSedes.serialize(user, avroSchema) |
| AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte) |
| } |
| } |
| |
| val data = (0 to 255).map { i => |
| AvroHBaseRecord(i) |
| } |
| ---- |
| |
| `schemaString` is defined first, then it is parsed to get `avroSchema`. `avroSchema` is used to |
| generate `AvroHBaseRecord`. `data` prepared by users is a local Scala collection |
| which has 256 `AvroHBaseRecord` objects. |
| |
| 3) Save DataFrame: |
| |
| [source, scala] |
| ---- |
| sc.parallelize(data).toDF.write.options( |
| Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5")) |
| .format("org.apache.spark.sql.execution.datasources.hbase") |
| .save() |
| ---- |
| |
Given a DataFrame with the specified schema `catalog`, the code above will create an HBase table
with 5 regions and save the DataFrame inside.
| |
4) Load the DataFrame:
| |
| [source, scala] |
| ---- |
| def avroCatalog = s"""{ |
| |"table":{"namespace":"default", "name":"avrotable"}, |
| |"rowkey":"key", |
| |"columns":{ |
| |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |
| |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"} |
| |} |
| |}""".stripMargin |
| |
def withCatalog(cat: String): DataFrame = {
    sqlContext
        .read
        .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
}
val df = withCatalog(avroCatalog)
| ---- |
| |
In the `withCatalog` function, `read` returns a DataFrameReader that can be used to read data in as a DataFrame.
The `option` function adds input options for the underlying data source to the DataFrameReader.
There are two options: one sets `avroSchema` to `AvroHBaseRecord.schemaString`, and the other sets
`HBaseTableCatalog.tableCatalog` to the Avro catalog. The `load()` function loads input in as a DataFrame.
The DataFrame `df` returned by the `withCatalog` function can be used to access the HBase table.
| |
5) SQL Query:
| |
| [source, scala] |
| ---- |
| df.registerTempTable("avrotable") |
val c = sqlContext.sql("select count(1) from avrotable")
c.show
| ---- |
| |
After loading the df DataFrame, users can query data. `registerTempTable` registers the df DataFrame
as a temporary table using the table name `avrotable`. The `sqlContext.sql` function allows the
user to execute SQL queries.
| ==== |