////
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
////
[[spark]]
= HBase and Spark
:doctype: book
:numbered:
:toc: left
:icons: font
:experimental:
link:http://spark.apache.org/[Apache Spark] is a software framework that is used
to process data in memory in a distributed manner, and is replacing MapReduce in
many use cases.
Spark itself is out of scope of this document; please refer to the Spark site for
more information on the Spark project and subprojects. This document focuses
on four main interaction points between Spark and HBase:
Basic Spark::
The ability to have an HBase Connection at any point in your Spark DAG.
Spark Streaming::
The ability to have an HBase Connection at any point in your Spark Streaming
application.
Spark Bulk Load::
The ability to write directly to HBase HFiles for bulk insertion into HBase.
SparkSQL/DataFrames::
The ability to write SparkSQL that draws on tables that are represented in HBase.
The following sections will walk through examples of all these interaction points.
== Basic Spark
This section discusses Spark HBase integration at the lowest and simplest levels.
All the other interaction points are built upon the concepts that will be described
here.
At the root of all Spark and HBase integration is the HBaseContext. The HBaseContext
takes in HBase configurations and pushes them to the Spark executors. This allows
us to have an HBase Connection per Spark Executor in a static location.
For reference, Spark Executors can run on the same nodes as the Region Servers or
on different nodes; there is no dependence on co-location. Think of every Spark
Executor as a multi-threaded client application. This allows any Spark Tasks
running on the executors to access the shared Connection object.
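
The idea of one shared connection per executor can be illustrated without any HBase or Spark dependency. The following Java sketch is not the HBaseContext implementation: `SharedConnectionHolder` and its `created` counter are hypothetical names, and a plain `Object` stands in for an HBase Connection. It only shows the pattern of a lazily created, JVM-wide instance that every task thread reuses.

[source, java]
----
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical holder: one lazily created instance shared by all threads in a JVM. */
final class SharedConnectionHolder {
  // Stands in for an HBase Connection kept in a static location.
  private static Object connection;

  // Counts how many times the factory actually ran (for illustration only).
  static final AtomicInteger created = new AtomicInteger();

  private SharedConnectionHolder() {}

  /** Returns the shared instance, invoking the factory on first use only. */
  static synchronized Object get(Supplier<Object> factory) {
    if (connection == null) {
      connection = factory.get();
      created.incrementAndGet();
    }
    return connection;
  }
}
----

Every caller of `SharedConnectionHolder.get(...)` in the same JVM receives the same object, which mirrors how tasks on one executor can share a single connection instead of opening one each.
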
.HBaseContext Usage Example
====
This example shows how HBaseContext can be used to do a `foreachPartition` on an RDD
in Scala:
[source, scala]
----
val sc = new SparkContext("local", "test")
val config = HBaseConfiguration.create()
...
val hbaseContext = new HBaseContext(sc, config)
// Each partition is handed an iterator of records and an HBase Connection.
rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
  val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
  it.foreach((putRecord) => {
    val put = new Put(putRecord._1)
    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
    bufferedMutator.mutate(put)
  })
  bufferedMutator.flush()
  bufferedMutator.close()
})
----
Here is the same example implemented in Java:
[source, java]
----
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
  List<byte[]> list = new ArrayList<>();
  list.add(Bytes.toBytes("1"));
  ...
  list.add(Bytes.toBytes("5"));
  JavaRDD<byte[]> rdd = jsc.parallelize(list);
  Configuration conf = HBaseConfiguration.create();
  JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
  hbaseContext.foreachPartition(rdd,
      new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
    public void call(Tuple2<Iterator<byte[]>, Connection> t)
        throws Exception {
      Table table = t._2().getTable(TableName.valueOf(tableName));
      BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
      while (t._1().hasNext()) {
        byte[] b = t._1().next();
        Result r = table.get(new Get(b));
        // getExists() is only populated for existence-only Gets, so test for a non-empty Result.
        if (!r.isEmpty()) {
          mutator.mutate(new Put(b));
        }
      }
      mutator.flush();
      mutator.close();
      table.close();
    }
  });
} finally {
  jsc.stop();
}
----
====
All functionality between Spark and HBase will be supported both in Scala and in
Java, with the exception of SparkSQL, which will support any language that is
supported by Spark. For the remainder of this document, we will focus on
Scala examples.
The examples above illustrate how to do a foreachPartition with a connection. A
number of other Spark base functions are supported out of the box:
// tag::spark_base_functions[]
`bulkPut`:: For massively parallel sending of puts to HBase
`bulkDelete`:: For massively parallel sending of deletes to HBase
`bulkGet`:: For massively parallel sending of gets to HBase to create a new RDD
`mapPartition`:: To do a Spark Map function with a Connection object to allow full
access to HBase
`hBaseRDD`:: To simplify a distributed scan to create an RDD
// end::spark_base_functions[]
For examples of all these functionalities, see the HBase-Spark Module.
== Spark Streaming
http://spark.apache.org/streaming/[Spark Streaming] is a micro-batching stream
processing framework built on top of Spark. HBase and Spark Streaming make great
companions, in that HBase can provide the following benefits alongside Spark
Streaming:
* A place to grab reference data or profile data on the fly
* A place to store counts or aggregates in a way that supports Spark Streaming's
promise of _only once processing_.
The HBase-Spark module’s integration points with Spark Streaming are similar to
its normal Spark integration points, in that the following commands are possible
straight off a Spark Streaming DStream.
include::spark.adoc[tags=spark_base_functions]
.`bulkPut` Example with DStreams
====
Below is an example of bulkPut with DStreams. It is very close in feel to the RDD
bulk put.
[source, scala]
----
val sc = new SparkContext("local", "test")
val config = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, config)
val ssc = new StreamingContext(sc, Milliseconds(200))
val rdd1 = ...
val rdd2 = ...
val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
    Array[Byte], Array[Byte])])]]()
queue += rdd1
queue += rdd2
val dStream = ssc.queueStream(queue)
dStream.hbaseBulkPut(
  hbaseContext,
  TableName.valueOf(tableName),
  (putRecord) => {
    // Convert one DStream record into an HBase Put.
    val put = new Put(putRecord._1)
    putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
    put
  })
----
There are three inputs to the `hbaseBulkPut` function.
. The hbaseContext that carries the configuration broadcast information and links us
to the HBase Connections in the executors
. The table name of the table we are putting data into
. A function that will convert a record in the DStream into an HBase Put object.
====
== Bulk Load
There are two options for bulk loading data into HBase with Spark. The
basic bulk load functionality works for cases where your rows have
millions of columns and cases where your columns are not consolidated and
partitioned before the map side of the Spark bulk load process.
The second option is a thin record bulk load, which is
designed for tables that have fewer than 10,000 columns per row. The advantage
of this second option is higher throughput and less overall load on the Spark
shuffle operation.
Both implementations work more or less like the MapReduce bulk load process in
that a partitioner partitions the rowkeys based on region splits and
the row keys are sent to the reducers in order, so that HFiles can be written
out directly from the reduce phase.
In Spark terms, the bulk load is implemented around a Spark
`repartitionAndSortWithinPartitions` followed by a Spark `foreachPartition`.
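
The partition-then-sort step described above can be sketched in plain Java with no Spark or HBase dependency. This is an illustration of the idea only, not the module's partitioner: `RegionSplitPartitioning` and its methods are hypothetical names, and the region start keys are assumed to be sorted with the empty key first.

[source, java]
----
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating region-split partitioning of row keys. */
final class RegionSplitPartitioning {
  private RegionSplitPartitioning() {}

  static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  /** Unsigned lexicographic byte comparison, the ordering HBase applies to row keys. */
  static int compare(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int diff = (a[i] & 0xff) - (b[i] & 0xff);
      if (diff != 0) {
        return diff;
      }
    }
    return a.length - b.length;
  }

  /**
   * Returns the index of the region that owns rowKey: the last region whose
   * start key is less than or equal to rowKey. startKeys must be sorted and
   * begin with the empty key.
   */
  static int partitionFor(byte[][] startKeys, byte[] rowKey) {
    int lo = 0;
    int hi = startKeys.length - 1;
    while (lo < hi) {
      int mid = (lo + hi + 1) >>> 1;
      if (compare(startKeys[mid], rowKey) <= 0) {
        lo = mid;
      } else {
        hi = mid - 1;
      }
    }
    return lo;
  }

  /** Routes each row key to its region's partition, then sorts every partition. */
  static List<List<String>> partitionAndSort(String[] startKeys, String[] rowKeys) {
    byte[][] starts = new byte[startKeys.length][];
    for (int i = 0; i < startKeys.length; i++) {
      starts[i] = bytes(startKeys[i]);
    }
    List<List<String>> partitions = new ArrayList<>();
    for (int i = 0; i < starts.length; i++) {
      partitions.add(new ArrayList<>());
    }
    for (String key : rowKeys) {
      partitions.get(partitionFor(starts, bytes(key))).add(key);
    }
    for (List<String> partition : partitions) {
      partition.sort((x, y) -> compare(bytes(x), bytes(y)));
    }
    return partitions;
  }
}
----

With start keys `""`, `"g"`, and `"p"`, the row keys `q, b, h, a, z, g` are grouped as `[[a, b], [g, h], [q, z]]`: each inner list corresponds to one region and is already in the order an HFile writer needs.
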
First, let's look at an example of using the basic bulk load functionality.
.Bulk Loading Example
====
The following example shows bulk loading with Spark.
[source, scala]
----
val sc = new SparkContext("local", "test")
val config = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, config)
val stagingFolder = ...
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"),
    (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
  (Bytes.toBytes("3"),
    (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    val family:Array[Byte] = t._2(0)._1
    val qualifier = t._2(0)._2
    val value = t._2(0)._3
    val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
    Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath)
// Load the HFiles written to the staging folder into the table.
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
----
====
The `hbaseBulkLoad` function takes three required parameters:
. The name of the table we intend to bulk load to
. A function that converts a record in the RDD to a tuple key-value pair, with
the tuple key being a KeyFamilyQualifier object and the value being the cell value.
The KeyFamilyQualifier object holds the RowKey, Column Family, and Column Qualifier.
The shuffle partitions on the RowKey but sorts by all three values.
. The temporary path for the HFiles to be written out to
Following the Spark bulk load command, use HBase's LoadIncrementalHFiles object
to load the newly created HFiles into HBase.
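
The ordering by row key, column family, and column qualifier mentioned above can be written as a chained comparator. The Java sketch below is an illustration under stated assumptions: `CellKey` is a hypothetical stand-in for KeyFamilyQualifier, and it compares strings for readability, whereas HBase compares raw bytes (the two orders agree for ASCII keys).

[source, java]
----
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for a (row key, column family, qualifier) key. */
final class CellKey {
  final String row;
  final String family;
  final String qualifier;

  CellKey(String row, String family, String qualifier) {
    this.row = row;
    this.family = family;
    this.qualifier = qualifier;
  }

  // Sort by row key first, then column family, then qualifier.
  static final Comparator<CellKey> ORDER =
      Comparator.comparing((CellKey k) -> k.row)
          .thenComparing(k -> k.family)
          .thenComparing(k -> k.qualifier);

  /** Returns a new list containing the given keys in ORDER. */
  static List<CellKey> sorted(CellKey... keys) {
    List<CellKey> out = new ArrayList<>(Arrays.asList(keys));
    out.sort(ORDER);
    return out;
  }

  @Override
  public String toString() {
    return row + "/" + family + ":" + qualifier;
  }
}
----

Because the partition is chosen from the row key alone, all cells of a row land in the same partition, and this ordering then places them in the sequence in which they are written out.
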
.Additional Parameters for Bulk Loading with Spark
You can set the following attributes with additional parameter options on hbaseBulkLoad.
* Max file size of the HFiles
* A flag to exclude HFiles from compactions
* Column Family settings for compression, bloomType, blockSize, and dataBlockEncoding
.Using Additional Parameters
====
[source, scala]
----
val sc = new SparkContext("local", "test")
val config = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, config)
val stagingFolder = ...
val rdd = sc.parallelize(Array(
  (Bytes.toBytes("1"),
    (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
  (Bytes.toBytes("3"),
    (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
// Per-family HFile settings: compression, bloomType, blockSize, dataBlockEncoding.
val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)
rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    val family:Array[Byte] = t._2(0)._1
    val qualifier = t._2(0)._2
    val value = t._2(0)._3
    val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
    Seq((keyFamilyQualifier, value)).iterator
  },
  stagingFolder.getPath,
  familyHBaseWriterOptions,
  compactionExclude = false,
  HConstants.DEFAULT_MAX_FILE_SIZE)
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
----
====
Now let's look at how you would call the thin record bulk load implementation.
.Using thin record bulk load
====
[source, scala]
----
val sc = new SparkContext("local", "test")
val config = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, config)
val stagingFolder = ...
val rdd = sc.parallelize(Array(
  ("1",
    (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
  ("3",
    (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
rdd.hbaseBulkLoadThinRows(hbaseContext,
  TableName.valueOf(tableName),
  t => {
    val rowKey = t._1
    // Collect every cell of this row into a single FamiliesQualifiersValues object.
    val familyQualifiersValues = new FamiliesQualifiersValues
    t._2.foreach(f => {
      val family:Array[Byte] = f._1
      val qualifier = f._2
      val value:Array[Byte] = f._3
      familyQualifiersValues +=(family, qualifier, value)
    })
    (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
  },
  stagingFolder.getPath,
  new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
  compactionExclude = false,
  20)
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
  conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
----
====
Note that the big difference in using bulk load for thin rows is that the function
returns a tuple with the first value being the row key and the second value
being a FamiliesQualifiersValues object, which contains all the
values for this row for all column families.
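
To make the difference concrete, the Java sketch below groups individual cells into one record per row, which is the shape the thin record path works with. It is only an illustration: `ThinRowGrouping` is a hypothetical name, nested sorted maps stand in for FamiliesQualifiersValues, and strings are used instead of byte arrays for readability.

[source, java]
----
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical helper: collapses per-cell records into one record per row. */
final class ThinRowGrouping {
  private ThinRowGrouping() {}

  /**
   * Each input cell is {rowKey, family, qualifier, value}. The result maps
   * every row key to its cells, keyed by "family:qualifier" and kept sorted.
   */
  static SortedMap<String, SortedMap<String, String>> groupByRow(String[][] cells) {
    SortedMap<String, SortedMap<String, String>> rows = new TreeMap<>();
    for (String[] cell : cells) {
      rows.computeIfAbsent(cell[0], rowKey -> new TreeMap<>())
          .put(cell[1] + ":" + cell[2], cell[3]);
    }
    return rows;
  }
}
----

Three cells spread over two rows produce two grouped records rather than three individual ones, so fewer records need to be moved when rows are thin.
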
== SparkSQL/DataFrames
http://spark.apache.org/sql/[SparkSQL] is a subproject of Spark that supports
SQL that compiles down to a Spark DAG. In addition, SparkSQL is a heavy user
of DataFrames. DataFrames are like RDDs with schema information.
The HBase-Spark module includes support for Spark SQL and DataFrames, which allows
you to write SparkSQL directly on HBase tables. In addition, the HBase-Spark module
pushes down query filtering logic to HBase.
In HBaseSparkConf, four parameters related to timestamps can be set: TIMESTAMP,
MIN_TIMESTAMP, MAX_TIMESTAMP, and MAX_VERSIONS. Users can query records
with different timestamps or time ranges with MIN_TIMESTAMP and MAX_TIMESTAMP.
In the examples below, replace `tsSpecified` and `oldMs` with concrete values.
.Query with different timestamps
====
The example below shows how to load the `df` DataFrame with different timestamps.
`tsSpecified` is specified by the user.
`HBaseTableCatalog` defines the schema mapping between HBase and the relation.
`writeCatalog` defines the catalog for the schema mapping.
[source, scala]
----
val df = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
.format("org.apache.hadoop.hbase.spark")
.load()
----
The example below shows how to load the `df` DataFrame with different time ranges.
`oldMs` is specified by the user.
[source, scala]
----
val df = sqlContext.read
.options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
.format("org.apache.hadoop.hbase.spark")
.load()
----
After loading the `df` DataFrame, users can query data.
[source, scala]
----
df.registerTempTable("table")
sqlContext.sql("select count(col1) from table").show
----
====
=== Predicate Push Down
There are two examples of predicate push down in the HBase-Spark implementation.
The first example shows the push down of filtering logic on the RowKey. HBase-Spark
will reduce the filters on RowKeys down to a set of Get and/or Scan commands.
NOTE: The Scans are distributed scans, rather than a single client scan operation.
If the query looks something like the following, the logic will push down and get
the rows through 3 Gets and 0 Scans. We can do gets because all the operations
are `equal` operations.
[source,sql]
----
SELECT
KEY_FIELD,
B_FIELD,
A_FIELD
FROM hbaseTmp
WHERE (KEY_FIELD = 'get1' or KEY_FIELD = 'get2' or KEY_FIELD = 'get3')
----
Now let's look at an example where we will end up doing two scans on HBase.
[source, sql]
----
SELECT
KEY_FIELD,
B_FIELD,
A_FIELD
FROM hbaseTmp
WHERE KEY_FIELD < 'get2' or KEY_FIELD > 'get3'
----
In this example we will get 0 Gets and 2 Scans. One scan will load everything
from the first row in the table until “get2” and the second scan will get
everything from “get3” until the last row in the table.
The next query is a good example of having a good deal of range checks. However,
the ranges overlap, so the code is smart enough to get the following data
in a single scan that encompasses all the data asked for by the query.
[source, sql]
----
SELECT
KEY_FIELD,
B_FIELD,
A_FIELD
FROM hbaseTmp
WHERE
(KEY_FIELD >= 'get1' and KEY_FIELD <= 'get3') or
(KEY_FIELD > 'get3' and KEY_FIELD <= 'get5')
----
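
The consolidation of overlapping row key ranges into a single scan can be sketched as a standard interval merge. The Java code below is not the HBase-Spark implementation: `RowKeyRange` is a hypothetical class, it handles bounded ranges only, and it compares strings where HBase compares raw bytes.

[source, java]
----
import java.util.ArrayList;
import java.util.List;

/** Hypothetical bounded row key range with inclusive or exclusive ends. */
final class RowKeyRange {
  final String start;
  final boolean startInclusive;
  final String end;
  final boolean endInclusive;

  RowKeyRange(String start, boolean startInclusive, String end, boolean endInclusive) {
    this.start = start;
    this.startInclusive = startInclusive;
    this.end = end;
    this.endInclusive = endInclusive;
  }

  /** True when next overlaps or touches this range; assumes this.start <= next.start. */
  boolean connectsTo(RowKeyRange next) {
    int c = next.start.compareTo(end);
    return c < 0 || (c == 0 && (endInclusive || next.startInclusive));
  }

  /** Merges connected ranges so each resulting range maps to one scan. */
  static List<RowKeyRange> merge(List<RowKeyRange> ranges) {
    List<RowKeyRange> sorted = new ArrayList<>(ranges);
    // Order by start key; for equal starts, the inclusive start comes first.
    sorted.sort((a, b) -> {
      int c = a.start.compareTo(b.start);
      return c != 0 ? c : Boolean.compare(b.startInclusive, a.startInclusive);
    });
    List<RowKeyRange> merged = new ArrayList<>();
    for (RowKeyRange r : sorted) {
      if (!merged.isEmpty() && merged.get(merged.size() - 1).connectsTo(r)) {
        RowKeyRange last = merged.remove(merged.size() - 1);
        int c = r.end.compareTo(last.end);
        String end = c > 0 ? r.end : last.end;
        boolean endInclusive = c > 0 ? r.endInclusive
            : c < 0 ? last.endInclusive
            : (r.endInclusive || last.endInclusive);
        merged.add(new RowKeyRange(last.start, last.startInclusive, end, endInclusive));
      } else {
        merged.add(r);
      }
    }
    return merged;
  }

  @Override
  public String toString() {
    return (startInclusive ? "[" : "(") + start + ", " + end + (endInclusive ? "]" : ")");
  }
}
----

Applied to the query above, the ranges `[get1, get3]` and `(get3, get5]` touch at `get3`, so they merge into the single range `[get1, get5]`, which corresponds to one scan.
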
The second example of push down functionality offered by the HBase-Spark module
is the ability to push down filter logic for column and cell fields. Just like
the RowKey logic, all query logic is consolidated into the minimum number
of range checks and equal checks by sending a Filter object along with the Scan
with information about the consolidated push down predicates.
.SparkSQL Code Example
====
This example shows how we can interact with HBase with SQL.
[source, scala]
----
val sc = new SparkContext("local", "test")
val config = HBaseConfiguration.create()
new HBaseContext(sc, config)
val sqlContext = new SQLContext(sc)
val df = sqlContext.load("org.apache.hadoop.hbase.spark",
  Map("hbase.columns.mapping" ->
    "KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b",
    "hbase.table" -> "t1"))
df.registerTempTable("hbaseTmp")
val results = sqlContext.sql("SELECT KEY_FIELD, B_FIELD FROM hbaseTmp " +
  "WHERE " +
  "(KEY_FIELD = 'get1' and B_FIELD < '3') or " +
  "(KEY_FIELD >= 'get3' and B_FIELD = '8')").take(5)
----
There are three major parts of this example that deserve explaining.
The sqlContext.load function::
In the sqlContext.load function we see two
parameters. The first of these parameters points Spark to the HBase
DefaultSource class that acts as the interface between SparkSQL and HBase.
A map of key value pairs::
In this example we have two keys in our map, `hbase.columns.mapping` and
`hbase.table`. The `hbase.table` key directs SparkSQL to use the given HBase table.
The `hbase.columns.mapping` key gives us the logic to translate HBase columns to
SparkSQL columns.
+
The `hbase.columns.mapping` is a string in the following format:
+
[source, scala]
----
(SparkSQL.ColumnName) (SparkSQL.ColumnType) (HBase.ColumnFamily):(HBase.Qualifier)
----
+
In the example below we see the definition of three fields. Because KEY_FIELD has
no ColumnFamily, it is the RowKey.
+
----
KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b
----
The registerTempTable function::
This is a SparkSQL function that frees us from Scala when accessing
our HBase table: we can now query it directly with SQL under the table name "hbaseTmp".
The last major point to note in the example is the `sqlContext.sql` function, which
allows the user to ask their questions in SQL, which is then pushed down to the
DefaultSource code in the HBase-Spark module. The result of this command is
a DataFrame with the schema of KEY_FIELD and B_FIELD.
====
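
The `hbase.columns.mapping` format described in the example above is simple enough to parse by hand. The Java sketch below shows one way to read it; `ColumnMapping` is a hypothetical class, not the parser used by the HBase-Spark module, and it assumes fields are separated by commas and parts by whitespace.

[source, java]
----
import java.util.ArrayList;
import java.util.List;

/** Hypothetical holder for one field of an hbase.columns.mapping string. */
final class ColumnMapping {
  final String sqlName;
  final String sqlType;
  final String family;
  final String qualifier;

  ColumnMapping(String sqlName, String sqlType, String family, String qualifier) {
    this.sqlName = sqlName;
    this.sqlType = sqlType;
    this.family = family;
    this.qualifier = qualifier;
  }

  /** A field with no column family refers to the row key. */
  boolean isRowKey() {
    return family.isEmpty();
  }

  /** Parses "NAME TYPE family:qualifier, NAME TYPE family:qualifier, ...". */
  static List<ColumnMapping> parse(String mapping) {
    List<ColumnMapping> fields = new ArrayList<>();
    for (String field : mapping.split(",")) {
      String[] parts = field.trim().split("\\s+");
      if (parts.length != 3) {
        throw new IllegalArgumentException("Expected 'NAME TYPE family:qualifier' but got: " + field);
      }
      int colon = parts[2].indexOf(':');
      if (colon < 0) {
        throw new IllegalArgumentException("Missing ':' in column reference: " + parts[2]);
      }
      fields.add(new ColumnMapping(
          parts[0], parts[1], parts[2].substring(0, colon), parts[2].substring(colon + 1)));
    }
    return fields;
  }
}
----

Parsing `KEY_FIELD STRING :key, A_FIELD STRING c:a, B_FIELD STRING c:b` yields three fields, of which only `KEY_FIELD` has an empty column family and is therefore treated as the row key.
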