/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import java.util.Map
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.util.Pair
import org.apache.yetus.audience.InterfaceAudience
import org.apache.hadoop.hbase.client.{Connection, Delete, Get, Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{FlatMapFunction, Function, VoidFunction}
import org.apache.spark.streaming.api.java.JavaDStream
import java.lang.Iterable
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
/**
* This is the Java wrapper over HBaseContext, which is written in
* Scala. This class is intended for developers who want to
* work with Spark or Spark Streaming in Java
*
* @param jsc This is the JavaSparkContext that we will wrap
* @param config This is the configuration information for our HBase cluster
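*
* Example usage from Java (an illustrative sketch; the app name and the way the
* configuration is built are assumptions, not part of this API):
* {{{
*   SparkConf sparkConf = new SparkConf().setAppName("JavaHBaseContextExample");
*   JavaSparkContext jsc = new JavaSparkContext(sparkConf);
*   Configuration conf = HBaseConfiguration.create();
*   JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
* }}}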
*/
@InterfaceAudience.Public
class JavaHBaseContext(@transient val jsc: JavaSparkContext,
@transient val config: Configuration) extends Serializable {
val hbaseContext = new HBaseContext(jsc.sc, config)
/**
* A simple enrichment of the traditional Spark JavaRDD foreachPartition.
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param javaRdd Original JavaRDD with data to iterate over
* @param f Function to be given an iterator to iterate through
* the RDD values and a Connection object to interact
* with HBase
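*
* Example usage from Java (an illustrative sketch; the JavaRDD of row keys and
* the table "t1" are assumptions):
* {{{
*   hbaseContext.foreachPartition(javaRdd,
*     (Tuple2<Iterator<byte[]>, Connection> t) -> {
*       Table table = t._2().getTable(TableName.valueOf("t1"));
*       while (t._1().hasNext()) {
*         table.get(new Get(t._1().next()));
*       }
*       table.close();
*     });
* }}}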
*/
def foreachPartition[T](javaRdd: JavaRDD[T],
f: VoidFunction[(java.util.Iterator[T], Connection)]) = {
hbaseContext.foreachPartition(javaRdd.rdd,
(it: Iterator[T], conn: Connection) => {
f.call((it, conn))
})
}
/**
* A simple enrichment of the traditional Spark Streaming dStream foreach
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param javaDstream Original DStream with data to iterate over
* @param f Function to be given an iterator to iterate through
* the JavaDStream values and a Connection object to
* interact with HBase
*/
def foreachPartition[T](javaDstream: JavaDStream[T],
f: VoidFunction[(Iterator[T], Connection)]) = {
hbaseContext.foreachPartition(javaDstream.dstream,
(it: Iterator[T], conn: Connection) => f.call(it, conn))
}
/**
* A simple enrichment of the traditional Spark JavaRDD mapPartitions.
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* Note: Make sure to partition correctly to avoid memory issues when
* getting data from HBase
*
* @param javaRdd Original JavaRDD with data to iterate over
* @param f Function to be given an iterator to iterate through
* the RDD values and a Connection object to interact
* with HBase
* @return Returns a new RDD generated by the user-defined
* function, just like normal mapPartitions
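*
* Example usage from Java (an illustrative sketch; it assumes Spark 2.x, where
* FlatMapFunction.call returns an Iterator, and a table "t1"):
* {{{
*   JavaRDD<String> existence = hbaseContext.mapPartitions(javaRdd,
*     (Tuple2<Iterator<byte[]>, Connection> t) -> {
*       Table table = t._2().getTable(TableName.valueOf("t1"));
*       List<String> out = new ArrayList<>();
*       while (t._1().hasNext()) {
*         out.add(table.exists(new Get(t._1().next())) ? "present" : "absent");
*       }
*       table.close();
*       return out.iterator();
*     });
* }}}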
*/
def mapPartitions[T, R](javaRdd: JavaRDD[T],
f: FlatMapFunction[(java.util.Iterator[T],
Connection), R]): JavaRDD[R] = {
JavaRDD.fromRDD(hbaseContext.mapPartitions(javaRdd.rdd,
(it: Iterator[T], conn: Connection) =>
f.call(it, conn))(fakeClassTag[R]))(fakeClassTag[R])
}
/**
* A simple enrichment of the traditional Spark Streaming JavaDStream
* mapPartition.
*
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* Note: Make sure to partition correctly to avoid memory issues when
* getting data from HBase
*
* @param javaDstream Original JavaDStream with data to iterate over
* @param mp Function to be given an iterator to iterate through
* the JavaDStream values and a Connection object to
* interact with HBase
* @return Returns a new JavaDStream generated by the user-defined
* function, just like normal mapPartitions
*/
def streamMap[T, U](javaDstream: JavaDStream[T],
mp: Function[(Iterator[T], Connection), Iterator[U]]):
JavaDStream[U] = {
JavaDStream.fromDStream(hbaseContext.streamMapPartitions(javaDstream.dstream,
(it: Iterator[T], conn: Connection) =>
mp.call(it, conn))(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* A simple abstraction over the HBaseContext.foreachPartition method.
*
* It adds support for a user to take a JavaRDD,
* generate Puts, and send them to HBase.
* The complexity of managing the Connection is
* removed from the developer
*
* @param javaRdd Original JavaRDD with data to iterate over
* @param tableName The name of the table to put into
* @param f Function to convert a value in the JavaRDD
* to an HBase Put
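*
* Example usage from Java (an illustrative sketch; the table "t1", family "cf",
* qualifier "q" and the JavaRDD of row keys are assumptions):
* {{{
*   hbaseContext.bulkPut(javaRdd, TableName.valueOf("t1"),
*     (byte[] rowKey) -> new Put(rowKey)
*       .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value")));
* }}}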
*/
def bulkPut[T](javaRdd: JavaRDD[T],
tableName: TableName,
f: Function[(T), Put]) {
hbaseContext.bulkPut(javaRdd.rdd, tableName, (t: T) => f.call(t))
}
/**
* A simple abstraction over the HBaseContext.streamMapPartition method.
*
* It adds support for a user to take a JavaDStream and
* generate Puts and send them to HBase.
*
* The complexity of managing the Connection is
* removed from the developer
*
* @param javaDstream Original DStream with data to iterate over
* @param tableName The name of the table to put into
* @param f Function to convert a value in
* the JavaDStream to an HBase Put
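*
* Example usage from Java (an illustrative sketch; the table "t1", family "cf",
* qualifier "q" and the JavaDStream of row keys are assumptions):
* {{{
*   hbaseContext.streamBulkPut(javaDstream, TableName.valueOf("t1"),
*     (byte[] rowKey) -> new Put(rowKey)
*       .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value")));
* }}}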
*/
def streamBulkPut[T](javaDstream: JavaDStream[T],
tableName: TableName,
f: Function[T, Put]) = {
hbaseContext.streamBulkPut(javaDstream.dstream,
tableName,
(t: T) => f.call(t))
}
/**
* A simple abstraction over the HBaseContext.foreachPartition method.
*
* It adds support for a user to take a JavaRDD and
* generate Deletes and send them to HBase.
*
* The complexity of managing the Connection is
* removed from the developer
*
* @param javaRdd Original JavaRDD with data to iterate over
* @param tableName The name of the table to delete from
* @param f Function to convert a value in the JavaRDD to an
* HBase Delete
* @param batchSize The number of deletes to batch before sending to HBase
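*
* Example usage from Java (an illustrative sketch; the table "t1", the batch size
* and the JavaRDD of row keys are assumptions):
* {{{
*   hbaseContext.bulkDelete(javaRdd, TableName.valueOf("t1"),
*     (byte[] rowKey) -> new Delete(rowKey), 4);
* }}}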
*/
def bulkDelete[T](javaRdd: JavaRDD[T], tableName: TableName,
f: Function[T, Delete], batchSize: Integer) {
hbaseContext.bulkDelete(javaRdd.rdd, tableName, (t: T) => f.call(t), batchSize)
}
/**
* A simple abstraction over the HBaseContext.streamBulkMutation method.
*
* It adds support for a user to take a JavaDStream and
* generate Deletes and send them to HBase.
*
* The complexity of managing the Connection is
* removed from the developer
*
* @param javaDStream Original DStream with data to iterate over
* @param tableName The name of the table to delete from
* @param f Function to convert a value in the JavaDStream to an
* HBase Delete
* @param batchSize The number of deletes to be sent at once
*/
def streamBulkDelete[T](javaDStream: JavaDStream[T],
tableName: TableName,
f: Function[T, Delete],
batchSize: Integer) = {
hbaseContext.streamBulkDelete(javaDStream.dstream, tableName,
(t: T) => f.call(t),
batchSize)
}
/**
* A simple abstraction over the HBaseContext.mapPartition method.
*
* It adds support for a user to take a JavaRDD and
* generate a new RDD based on Gets and the results
* they bring back from HBase
*
* @param tableName The name of the table to get from
* @param batchSize Batch size of how many gets to retrieve in a single fetch
* @param javaRdd Original JavaRDD with data to iterate over
* @param makeGet Function to convert a value in the JavaRDD to an
* HBase Get
* @param convertResult This will convert the HBase Result object to
* whatever the user wants to put in the resulting
* JavaRDD
* @return New JavaRDD that is created by the Gets to HBase
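*
* Example usage from Java (an illustrative sketch; the table "t1", the batch size
* and the JavaRDD of row keys are assumptions):
* {{{
*   JavaRDD<String> rowsFound = hbaseContext.bulkGet(TableName.valueOf("t1"), 2, javaRdd,
*     (byte[] rowKey) -> new Get(rowKey),
*     (Result result) -> result.isEmpty() ? "missing" : Bytes.toString(result.getRow()));
* }}}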
*/
def bulkGet[T, U](tableName: TableName,
batchSize: Integer,
javaRdd: JavaRDD[T],
makeGet: Function[T, Get],
convertResult: Function[Result, U]): JavaRDD[U] = {
JavaRDD.fromRDD(hbaseContext.bulkGet[T, U](tableName,
batchSize,
javaRdd.rdd,
(t: T) => makeGet.call(t),
(r: Result) => {
convertResult.call(r)
})(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* A simple abstraction over the HBaseContext.streamMap method.
*
* It adds support for a user to take a DStream and
* generate a new DStream based on Gets and the results
* they bring back from HBase
*
* @param tableName The name of the table to get from
* @param batchSize The number of gets to be batched together
* @param javaDStream Original DStream with data to iterate over
* @param makeGet Function to convert a value in the JavaDStream to an
* HBase Get
* @param convertResult This will convert the HBase Result object to
* whatever the user wants to put in the resulting
* JavaDStream
* @return New JavaDStream that is created by the Gets to HBase
*/
def streamBulkGet[T, U](tableName: TableName,
batchSize: Integer,
javaDStream: JavaDStream[T],
makeGet: Function[T, Get],
convertResult: Function[Result, U]): JavaDStream[U] = {
JavaDStream.fromDStream(hbaseContext.streamBulkGet(tableName,
batchSize,
javaDStream.dstream,
(t: T) => makeGet.call(t),
(r: Result) => convertResult.call(r))(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* A simple abstraction over the HBaseContext.bulkLoad method.
* It adds support for a user to take a JavaRDD and
* convert it into a new JavaRDD[Pair] based on a map function;
* HFiles will be generated in stagingDir for bulk loading
*
* @param javaRdd The JavaRDD we are bulk loading from
* @param tableName The HBase table we are loading into
* @param mapFunc A Function that will convert a value in the JavaRDD
* to Pair(KeyFamilyQualifier, Array[Byte])
* @param stagingDir The location on the FileSystem to bulk load into
* @param familyHFileWriteOptionsMap Options that will define how the HFile for a
* column family is written
* @param compactionExclude Whether compaction is excluded for the HFiles
* @param maxSize Max size for the HFiles before they roll
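*
* Example usage from Java (an illustrative sketch; the table "t1", family "cf",
* qualifier "q", staging path and JavaRDD of row keys are assumptions, and the
* KeyFamilyQualifier(rowKey, family, qualifier) constructor order is assumed):
* {{{
*   hbaseContext.bulkLoad(javaRdd, TableName.valueOf("t1"),
*     (byte[] rowKey) -> new Pair<KeyFamilyQualifier, byte[]>(
*       new KeyFamilyQualifier(rowKey, Bytes.toBytes("cf"), Bytes.toBytes("q")), rowKey),
*     "/tmp/hbase-staging",
*     new HashMap<byte[], FamilyHFileWriteOptions>(),
*     false,
*     HConstants.DEFAULT_MAX_FILE_SIZE);
* }}}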
*/
def bulkLoad[T](javaRdd: JavaRDD[T],
tableName: TableName,
mapFunc : Function[T, Pair[KeyFamilyQualifier, Array[Byte]]],
stagingDir: String,
familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
compactionExclude: Boolean,
maxSize: Long):
Unit = {
hbaseContext.bulkLoad[Pair[KeyFamilyQualifier, Array[Byte]]](javaRdd.map(mapFunc).rdd, tableName, t => {
val keyFamilyQualifier = t.getFirst
val value = t.getSecond
Seq((keyFamilyQualifier, value)).iterator
}, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
}
/**
* A simple abstraction over the HBaseContext.bulkLoadThinRows method.
* It adds support for a user to take a JavaRDD and
* convert it into a new JavaRDD[Pair] based on a map function;
* HFiles will be generated in stagingDir for bulk loading
*
* @param javaRdd The javaRDD we are bulk loading from
* @param tableName The HBase table we are loading into
* @param mapFunc A Function that will convert a value in the JavaRDD
* to Pair(ByteArrayWrapper, FamiliesQualifiersValues)
* @param stagingDir The location on the FileSystem to bulk load into
* @param familyHFileWriteOptionsMap Options that will define how the HFile for a
* column family is written
* @param compactionExclude Whether compaction is excluded for the HFiles
* @param maxSize Max size for the HFiles before they roll
*/
def bulkLoadThinRows[T](javaRdd: JavaRDD[T],
tableName: TableName,
mapFunc : Function[T, Pair[ByteArrayWrapper, FamiliesQualifiersValues]],
stagingDir: String,
familyHFileWriteOptionsMap: Map[Array[Byte], FamilyHFileWriteOptions],
compactionExclude: Boolean,
maxSize: Long):
Unit = {
hbaseContext.bulkLoadThinRows[Pair[ByteArrayWrapper, FamiliesQualifiersValues]](javaRdd.map(mapFunc).rdd,
tableName, t => {
(t.getFirst, t.getSecond)
}, stagingDir, familyHFileWriteOptionsMap, compactionExclude, maxSize)
}
/**
* This function will use the native HBase TableInputFormat with the
* given scan object to generate a new JavaRDD
*
* @param tableName The name of the table to scan
* @param scans The HBase scan object to use to read data from HBase
* @param f Function to convert a Result object from HBase into
* what the user wants in the final generated JavaRDD
* @return New JavaRDD with results from scan
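*
* Example usage from Java (an illustrative sketch; the table "t1" is an assumption):
* {{{
*   JavaRDD<String> rowKeys = hbaseContext.hbaseRDD(TableName.valueOf("t1"), new Scan(),
*     (Tuple2<ImmutableBytesWritable, Result> t) -> Bytes.toString(t._2().getRow()));
* }}}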
*/
def hbaseRDD[U](tableName: TableName,
scans: Scan,
f: Function[(ImmutableBytesWritable, Result), U]):
JavaRDD[U] = {
JavaRDD.fromRDD(
hbaseContext.hbaseRDD[U](tableName,
scans,
(v: (ImmutableBytesWritable, Result)) =>
f.call(v._1, v._2))(fakeClassTag[U]))(fakeClassTag[U])
}
/**
* An overloaded version of HBaseContext hbaseRDD that defines the
* type of the resulting JavaRDD
*
* @param tableName The name of the table to scan
* @param scans The HBase scan object to use to read data from HBase
* @return New JavaRDD with results from scan
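*
* Example usage from Java (an illustrative sketch; the table "t1" is an assumption):
* {{{
*   JavaRDD<Tuple2<ImmutableBytesWritable, Result>> results =
*     hbaseContext.hbaseRDD(TableName.valueOf("t1"), new Scan());
* }}}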
*/
def hbaseRDD(tableName: TableName,
scans: Scan):
JavaRDD[(ImmutableBytesWritable, Result)] = {
JavaRDD.fromRDD(hbaseContext.hbaseRDD(tableName, scans))
}
/**
* Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
*
* This method is used to keep ClassTags out of the external Java API, as the Java compiler
* cannot produce them automatically. While this ClassTag-faking does please the compiler,
* it can cause problems at runtime if the Scala API relies on ClassTags for correctness.
*
* Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
* just worse performance or security issues. For instance, an Array[AnyRef]
* can hold any type T, but may lose primitive specialization.
*/
private[spark]
def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
}