| //// |
| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| *     http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| //// |
| |
| [[spark]] |
| = HBase and Spark |
| :doctype: book |
| :numbered: |
| :toc: left |
| :icons: font |
| :experimental: |
| |
| link:http://spark.apache.org/[Apache Spark] is a software framework that is used |
| to process data in memory in a distributed manner, and is replacing MapReduce in |
| many use cases. |
| |
| Spark itself is out of scope of this document; please refer to the Spark site for |
| more information on the Spark project and subprojects. This document will focus |
| on four main interaction points between Spark and HBase. Those interaction points are: |
| |
| Basic Spark:: |
| The ability to have an HBase Connection at any point in your Spark DAG. |
| Spark Streaming:: |
| The ability to have an HBase Connection at any point in your Spark Streaming |
| application. |
| Spark Bulk Load:: |
| The ability to write directly to HBase HFiles for bulk insertion into HBase. |
| SparkSQL/DataFrames:: |
| The ability to write SparkSQL that draws on tables that are represented in HBase. |
| |
| The following sections will walk through examples of all these interaction points. |
| |
| == Basic Spark |
| |
| This section discusses Spark HBase integration at the lowest and simplest levels. |
| All the other interaction points are built upon the concepts that will be described |
| here. |
| |
| At the root of all Spark and HBase integration is the HBaseContext. The HBaseContext |
| takes in HBase configurations and pushes them to the Spark executors. This allows |
| us to have an HBase Connection per Spark Executor in a static location. |
| |
| For reference, Spark Executors can be on the same nodes as the Region Servers or |
| on different nodes; there is no dependence on co-location. Think of every Spark |
| Executor as a multi-threaded client application. This allows any Spark Tasks |
| running on the executors to access the shared Connection object. |
| |
| .HBaseContext Usage Example |
| ==== |
| |
| This example shows how HBaseContext can be used to do a `foreachPartition` on an RDD |
| in Scala: |
| |
| [source, scala] |
| ---- |
| val sc = new SparkContext("local", "test") |
| val config = new HBaseConfiguration() |
| |
| ... |
| |
| val hbaseContext = new HBaseContext(sc, config) |
| |
| rdd.hbaseForeachPartition(hbaseContext, (it, conn) => { |
| val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1")) |
| it.foreach((putRecord) => { |
|   val put = new Put(putRecord._1) |
|   putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) |
|   bufferedMutator.mutate(put) |
| }) |
| bufferedMutator.flush() |
| bufferedMutator.close() |
| }) |
| ---- |
| |
| Here is the same example implemented in Java: |
| |
| [source, java] |
| ---- |
| JavaSparkContext jsc = new JavaSparkContext(sparkConf); |
| |
| try { |
| List<byte[]> list = new ArrayList<>(); |
| list.add(Bytes.toBytes("1")); |
| ... |
| list.add(Bytes.toBytes("5")); |
| |
| JavaRDD<byte[]> rdd = jsc.parallelize(list); |
| Configuration conf = HBaseConfiguration.create(); |
| |
| JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf); |
| |
| hbaseContext.foreachPartition(rdd, |
| new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() { |
| public void call(Tuple2<Iterator<byte[]>, Connection> t) |
| throws Exception { |
| Table table = t._2().getTable(TableName.valueOf(tableName)); |
| BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName)); |
| while (t._1().hasNext()) { |
| byte[] b = t._1().next(); |
| Result r = table.get(new Get(b)); |
| if (r.getExists()) { |
| mutator.mutate(new Put(b)); |
| } |
| } |
| |
| mutator.flush(); |
| mutator.close(); |
| table.close(); |
| } |
| }); |
| } finally { |
| jsc.stop(); |
| } |
| ---- |
| ==== |
| |
| All functionality between Spark and HBase is supported both in Scala and in |
| Java, with the exception of SparkSQL, which supports any language that is |
| supported by Spark. For the remainder of this documentation we will focus on |
| Scala examples for now. |
| |
| The examples above illustrate how to do a `foreachPartition` with a connection. A |
| number of other Spark base functions are supported out of the box: |
| |
| // tag::spark_base_functions[] |
| `bulkPut`:: For massively parallel sending of puts to HBase |
| `bulkDelete`:: For massively parallel sending of deletes to HBase |
| `bulkGet`:: For massively parallel sending of gets to HBase to create a new RDD |
| `mapPartition`:: To do a Spark Map function with a Connection object to allow full |
| access to HBase |
| `hBaseRDD`:: To simplify a distributed scan to create an RDD |
| // end::spark_base_functions[] |
| |
| For examples of all these functionalities, see the HBase-Spark Module. |
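| |
| .`bulkPut` Example on an RDD |
| ==== |
| |
| As a quick illustration, here is a minimal sketch of a bulk put on an RDD. It assumes |
| the same implicit RDD functions, imports, and `hbaseBulkPut` signature used in the |
| Spark Streaming example later in this chapter; the table name `t1`, column family `c`, |
| and sample values are illustrative placeholders. |
| |
| [source, scala] |
| ---- |
| // assumes the same imports as the HBaseContext usage example above |
| val sc = new SparkContext("local", "test") |
| val config = new HBaseConfiguration() |
| |
| val hbaseContext = new HBaseContext(sc, config) |
| |
| // Records shaped as (rowKey, Array[(columnFamily, qualifier, value)]) |
| val rdd = sc.parallelize(Array( |
|   (Bytes.toBytes("1"), Array((Bytes.toBytes("c"), Bytes.toBytes("a"), Bytes.toBytes("value1")))), |
|   (Bytes.toBytes("2"), Array((Bytes.toBytes("c"), Bytes.toBytes("a"), Bytes.toBytes("value2")))))) |
| |
| // Convert each record into a Put; the per-executor Connection is managed by the HBaseContext |
| rdd.hbaseBulkPut(hbaseContext, TableName.valueOf("t1"), |
|   (putRecord) => { |
|     val put = new Put(putRecord._1) |
|     putRecord._2.foreach((putValue) => |
|       put.addColumn(putValue._1, putValue._2, putValue._3)) |
|     put |
|   }) |
| ---- |
| ==== |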
| |
| == Spark Streaming |
| http://spark.apache.org/streaming/[Spark Streaming] is a micro-batching stream |
| processing framework built on top of Spark. HBase and Spark Streaming make great |
| companions in that HBase can provide the following benefits alongside Spark |
| Streaming: |
| |
| * A place to grab reference data or profile data on the fly |
| * A place to store counts or aggregates in a way that supports Spark Streaming's |
| promise of _only once processing_. |
| |
| The HBase-Spark module’s integration points with Spark Streaming are similar to |
| its normal Spark integration points, in that the following commands are possible |
| straight off a Spark Streaming DStream. |
| |
| include::spark.adoc[tags=spark_base_functions] |
| |
| .`bulkPut` Example with DStreams |
| ==== |
| |
| Below is an example of bulkPut with DStreams. It is very close in feel to the RDD |
| bulk put. |
| |
| [source, scala] |
| ---- |
| val sc = new SparkContext("local", "test") |
| val config = new HBaseConfiguration() |
| |
| val hbaseContext = new HBaseContext(sc, config) |
| val ssc = new StreamingContext(sc, Milliseconds(200)) |
| |
| val rdd1 = ... |
| val rdd2 = ... |
| |
| val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte], |
| Array[Byte], Array[Byte])])]]() |
| |
| queue += rdd1 |
| queue += rdd2 |
| |
| val dStream = ssc.queueStream(queue) |
| |
| dStream.hbaseBulkPut( |
| hbaseContext, |
| TableName.valueOf(tableName), |
| (putRecord) => { |
| val put = new Put(putRecord._1) |
| putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3)) |
| put |
| }) |
| ---- |
| |
| There are three inputs to the `hbaseBulkPut` function. |
| |
| . The hbaseContext that carries the configuration broadcast information that links us |
| to the HBase Connections in the executors |
| . The table name of the table we are putting data into |
| . A function that will convert a record in the DStream into an HBase Put object. |
| ==== |
| |
| == Bulk Load |
| |
| There are two options for bulk loading data into HBase with Spark. There is the |
| basic bulk load functionality that will work for cases where your rows have |
| millions of columns and for cases where your columns are not consolidated and |
| partitioned before the map side of the Spark bulk load process. |
| |
| There is also a thin record bulk load option with Spark. This second option is |
| designed for tables that have fewer than 10k columns per row. The advantage |
| of this second option is higher throughput and less overall load on the Spark |
| shuffle operation. |
| |
| Both implementations work more or less like the MapReduce bulk load process in |
| that a partitioner partitions the rowkeys based on region splits and |
| the row keys are sent to the reducers in order, so that HFiles can be written |
| out directly from the reduce phase. |
| |
| In Spark terms, the bulk load will be implemented around the Spark |
| `repartitionAndSortWithinPartitions` followed by a Spark `foreachPartition`. |
| |
| First let's look at an example of using the basic bulk load functionality. |
| |
| .Bulk Loading Example |
| ==== |
| |
| The following example shows bulk loading with Spark. |
| |
| [source, scala] |
| ---- |
| val sc = new SparkContext("local", "test") |
| val config = new HBaseConfiguration() |
| |
| val hbaseContext = new HBaseContext(sc, config) |
| |
| val stagingFolder = ... |
| val rdd = sc.parallelize(Array( |
| (Bytes.toBytes("1"), |
| (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), |
| (Bytes.toBytes("3"), |
| (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ... |
| |
| rdd.hbaseBulkLoad(TableName.valueOf(tableName), |
| t => { |
| val rowKey = t._1 |
| val family:Array[Byte] = t._2(0)._1 |
| val qualifier = t._2(0)._2 |
| val value = t._2(0)._3 |
| |
| val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) |
| |
| Seq((keyFamilyQualifier, value)).iterator |
| }, |
| stagingFolder.getPath) |
| |
| val load = new LoadIncrementalHFiles(config) |
| load.doBulkLoad(new Path(stagingFolder.getPath), |
| conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) |
| ---- |
| ==== |
| |
| The `hbaseBulkLoad` function takes three required parameters: |
| |
| . The table name of the table we intend to bulk load to |
| |
| . A function that will convert a record in the RDD to a tuple key value pair, with |
| the tuple key being a KeyFamilyQualifier object and the value being the cell value. |
| The KeyFamilyQualifier object will hold the RowKey, Column Family, and Column Qualifier. |
| The shuffle will partition on the RowKey but will sort by all three values. |
| |
| . The temporary path for the HFile to be written out to |
| |
| Following the Spark bulk load command, use HBase's LoadIncrementalHFiles object |
| to load the newly created HFiles into HBase. |
| |
| .Additional Parameters for Bulk Loading with Spark |
| |
| You can set the following attributes with additional parameter options on `hbaseBulkLoad`. |
| |
| * Max file size of the HFiles |
| * A flag to exclude HFiles from compactions |
| * Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding |
| |
| .Using Additional Parameters |
| ==== |
| |
| [source, scala] |
| ---- |
| val sc = new SparkContext("local", "test") |
| val config = new HBaseConfiguration() |
| |
| val hbaseContext = new HBaseContext(sc, config) |
| |
| val stagingFolder = ... |
| val rdd = sc.parallelize(Array( |
| (Bytes.toBytes("1"), |
| (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), |
| (Bytes.toBytes("3"), |
| (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ... |
| |
| val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions] |
| val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX") |
| |
| familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options) |
| |
| rdd.hbaseBulkLoad(TableName.valueOf(tableName), |
| t => { |
| val rowKey = t._1 |
| val family:Array[Byte] = t._2(0)._1 |
| val qualifier = t._2(0)._2 |
| val value = t._2(0)._3 |
| |
| val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier) |
| |
| Seq((keyFamilyQualifier, value)).iterator |
| }, |
| stagingFolder.getPath, |
| familyHBaseWriterOptions, |
| compactionExclude = false, |
| HConstants.DEFAULT_MAX_FILE_SIZE) |
| |
| val load = new LoadIncrementalHFiles(config) |
| load.doBulkLoad(new Path(stagingFolder.getPath), |
| conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) |
| ---- |
| ==== |
| |
| Now let's look at how you would call the thin record bulk load implementation. |
| |
| .Using thin record bulk load |
| ==== |
| |
| [source, scala] |
| ---- |
| val sc = new SparkContext("local", "test") |
| val config = new HBaseConfiguration() |
| |
| val hbaseContext = new HBaseContext(sc, config) |
| |
| val stagingFolder = ... |
| val rdd = sc.parallelize(Array( |
| ("1", |
| (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), |
| ("3", |
| (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ... |
| |
| rdd.hbaseBulkLoadThinRows(hbaseContext, |
| TableName.valueOf(tableName), |
| t => { |
| val rowKey = t._1 |
| |
| val familyQualifiersValues = new FamiliesQualifiersValues |
| t._2.foreach(f => { |
| val family:Array[Byte] = f._1 |
| val qualifier = f._2 |
| val value:Array[Byte] = f._3 |
| |
| familyQualifiersValues +=(family, qualifier, value) |
| }) |
| (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues) |
| }, |
| stagingFolder.getPath, |
| new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions], |
| compactionExclude = false, |
| 20) |
| |
| val load = new LoadIncrementalHFiles(config) |
| load.doBulkLoad(new Path(stagingFolder.getPath), |
| conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName))) |
| ---- |
| ==== |
| |
| Note that the big difference in using bulk load for thin rows is that the function |
| returns a tuple with the first value being the row key and the second value |
| being an object of FamiliesQualifiersValues, which will contain all the |
| values for this row for all column families. |
| |
| |
| == SparkSQL/DataFrames |
| |
| http://spark.apache.org/sql/[SparkSQL] is a subproject of Spark that supports |
| SQL that will compute down to a Spark DAG. In addition, SparkSQL is a heavy user |
| of DataFrames. DataFrames are like RDDs with schema information. |
| |
| The HBase-Spark module includes support for Spark SQL and DataFrames, which allows |
| you to write SparkSQL directly on HBase tables. In addition, the HBase-Spark module |
| will push down query filtering logic to HBase. |
| |
| In HBaseSparkConf, four parameters related to timestamps can be set. They are TIMESTAMP, |
| MIN_TIMESTAMP, MAX_TIMESTAMP and MAX_VERSIONS respectively. Users can query records |
| with a specific timestamp or with a time range using MIN_TIMESTAMP and MAX_TIMESTAMP. |
| Note that concrete values should be used instead of tsSpecified and oldMs in the examples below. |
| |
| .Query with different timestamps |
| ==== |
| |
| The example below shows how to load the df DataFrame with a specific timestamp, |
| where tsSpecified is specified by the user. |
| HBaseTableCatalog defines the HBase and Relation schema. |
| writeCatalog defines the catalog for the schema mapping. |
| [source, scala] |
| ---- |
| val df = sqlContext.read |
| .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString)) |
| .format("org.apache.hadoop.hbase.spark") |
| .load() |
| ---- |
| |
| The example below shows how to load the df DataFrame with a time range, |
| where oldMs is specified by the user. |
| [source, scala] |
| ---- |
| val df = sqlContext.read |
| .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0", |
| HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString)) |
| .format("org.apache.hadoop.hbase.spark") |
| .load() |
| ---- |
| |
| After loading the df DataFrame, users can query the data. |
| [source, scala] |
| ---- |
| df.registerTempTable("table") |
| sqlContext.sql("select count(col1) from table").show |
| ---- |
| ==== |
| |
| === Predicate Push Down |
| |
| There are two examples of predicate push down in the HBase-Spark implementation. |
| The first example shows the push down of filtering logic on the RowKey. HBase-Spark |
| will reduce the filters on RowKeys down to a set of Get and/or Scan commands. |
| |
| NOTE: The Scans are distributed scans, rather than a single client scan operation. |
| |
| If the query looks something like the following, the logic will push down and get |
| the rows through 3 Gets and 0 Scans. We can do gets because all the operations |
| are `equal` operations. |
| |
| [source,sql] |
| ---- |
| SELECT |
| KEY_FIELD, |
| B_FIELD, |
| A_FIELD |
| FROM hbaseTmp |
| WHERE (KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3') |
| ---- |
| |
| Now let's look at an example where we will end up doing two scans on HBase. |
| |
| [source, sql] |
| ---- |
| SELECT |
| KEY_FIELD, |
| B_FIELD, |
| A_FIELD |
| FROM hbaseTmp |
| WHERE KEY_FIELD < 'get2' or KEY_FIELD > 'get3' |
| ---- |
| |
| In this example we will get 0 Gets and 2 Scans. One scan will load everything |
| from the first row in the table until “get2” and the second scan will get |
| everything from “get3” until the last row in the table. |
| |
| The next query is a good example of having a good deal of range checks. However, |
| the ranges overlap. The code will be smart enough to get the following data |
| in a single scan that encompasses all the data asked for by the query. |
| |
| [source, sql] |
| ---- |
| SELECT |
| KEY_FIELD, |
| B_FIELD, |
| A_FIELD |
| FROM hbaseTmp |
| WHERE |
| (KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or |
| (KEY_FIELD > 'get3' and KEY_FIELD <= 'get5') |
| ---- |
| |
| The second example of push down functionality offered by the HBase-Spark module |
| is the ability to push down filter logic for column and cell fields. Just like |
| the RowKey logic, all query logic will be consolidated into the minimum number |
| of range checks and equal checks, by sending a Filter object along with the Scan |
| containing information about the consolidated push down predicates. |
| |
| .SparkSQL Code Example |
| ==== |
| This example shows how we can interact with HBase with SQL. |
| |
| [source, scala] |
| ---- |
| val sc = new SparkContext("local", "test") |
| val config = new HBaseConfiguration() |
| |
| new HBaseContext(sc, config) |
| val sqlContext = new SQLContext(sc) |
| |
| val df = sqlContext.load("org.apache.hadoop.hbase.spark", |
| Map("hbase.columns.mapping" -> |
| "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b", |
| "hbase.table" -> "t1")) |
| |
| df.registerTempTable("hbaseTmp") |
| |
| val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD FROM hbaseTmp " + |
| "WHERE " + |
| "(KEY_FIELD = 'get1' and B_FIELD < '3') or " + |
| "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5) |
| ---- |
| |
| There are three major parts of this example that deserve explaining. |
| |
| The sqlContext.load function:: |
| In the sqlContext.load function we see two |
| parameters. The first of these parameters is pointing Spark to the HBase |
| DefaultSource class that will act as the interface between SparkSQL and HBase. |
| |
| A map of key value pairs:: |
| In this example we have two keys in our map, `hbase.columns.mapping` and |
| `hbase.table`. The `hbase.table` directs SparkSQL to use the given HBase table. |
| The `hbase.columns.mapping` key gives us the logic to translate HBase columns to |
| SparkSQL columns. |
| + |
| The `hbase.columns.mapping` is a string in the following format: |
| + |
| [source, scala] |
| ---- |
| (SparkSQL.ColumnName) (SparkSQL.ColumnType) (HBase.ColumnFamily):(HBase.Qualifier) |
| ---- |
| + |
| In the example below we see the definition of three fields. Because KEY_FIELD has |
| no ColumnFamily, it is the RowKey. |
| + |
| ---- |
| KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b |
| ---- |
| |
| The registerTempTable function:: |
| This is a SparkSQL function that allows us to be free of Scala when accessing |
| our HBase table directly with SQL, using the table name "hbaseTmp". |
| |
| The last major point to note in the example is the `sqlContext.sql` function, which |
| allows the user to write queries in SQL that will be pushed down to the |
| DefaultSource code in the HBase-Spark module. The result of this command will be |
| a DataFrame with the schema of KEY_FIELD and B_FIELD. |
| ==== |