/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import java.net.InetSocketAddress
import java.util
import java.util.UUID
import javax.management.openmbean.KeyAlreadyExistsException
import org.apache.yetus.audience.InterfaceAudience
import org.apache.hadoop.hbase.fs.HFileSystem
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.io.compress.Compression
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
import org.apache.hadoop.hbase.io.hfile.{HFile, CacheConfig, HFileContextBuilder, HFileWriterImpl}
import org.apache.hadoop.hbase.regionserver.{HStore, HStoreFile, StoreFileWriter, BloomType}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
import org.apache.hadoop.hbase.client._
import scala.reflect.ClassTag
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
TableInputFormat, IdentityTableMapper}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.streaming.dstream.DStream
import java.io._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.hadoop.fs.{Path, FileAlreadyExistsException, FileSystem}
import scala.collection.mutable
/**
* HBaseContext is a façade for HBase operations
* like bulk put, get, increment, delete, and scan
*
* HBaseContext will take the responsibilities
* of disseminating the configuration information
* to the worker nodes and managing the life cycle of Connections.
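*
* A minimal construction sketch (assumes an existing SparkContext `sc`;
* the ZooKeeper quorum value is illustrative):
* {{{
* val conf = HBaseConfiguration.create()
* conf.set("hbase.zookeeper.quorum", "zkHost1,zkHost2")
* val hbaseContext = new HBaseContext(sc, conf)
* }}}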
*/
@InterfaceAudience.Public
class HBaseContext(@transient val sc: SparkContext,
@transient val config: Configuration,
val tmpHdfsConfgFile: String = null)
extends Serializable with Logging {
@transient var tmpHdfsConfiguration:Configuration = config
@transient var appliedCredentials = false
@transient val job = Job.getInstance(config)
TableMapReduceUtil.initCredentials(job)
val broadcastedConf = sc.broadcast(new SerializableWritable(config))
LatestHBaseContextCache.latest = this
if (tmpHdfsConfgFile != null && config != null) {
val fs = FileSystem.newInstance(config)
val tmpPath = new Path(tmpHdfsConfgFile)
if (!fs.exists(tmpPath)) {
val outputStream = fs.create(tmpPath)
config.write(outputStream)
outputStream.close()
} else {
logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!")
}
}
/**
* A simple enrichment of the traditional Spark RDD foreachPartition.
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param rdd Original RDD with data to iterate over
* @param f Function to be given an iterator to iterate through
* the RDD values and a Connection object to interact
* with HBase
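*
* Usage sketch, assuming an RDD of (rowKey, value) byte-array pairs;
* the table, family, and qualifier names are illustrative:
* {{{
* hbaseContext.foreachPartition(rdd,
*   (it: Iterator[(Array[Byte], Array[Byte])], connection: Connection) => {
*     val table = connection.getTable(TableName.valueOf("t1"))
*     it.foreach { case (rowKey, value) =>
*       table.put(new Put(rowKey)
*         .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), value))
*     }
*     table.close()
*   })
* }}}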
*/
def foreachPartition[T](rdd: RDD[T],
f: (Iterator[T], Connection) => Unit):Unit = {
rdd.foreachPartition(
it => hbaseForeachPartition(broadcastedConf, it, f))
}
/**
* A simple enrichment of the traditional Spark Streaming DStream foreach.
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param dstream Original DStream with data to iterate over
* @param f Function to be given an iterator to iterate through
* the DStream values and a Connection object to
* interact with HBase
*/
def foreachPartition[T](dstream: DStream[T],
f: (Iterator[T], Connection) => Unit):Unit = {
dstream.foreachRDD((rdd, time) => {
foreachPartition(rdd, f)
})
}
/**
* A simple enrichment of the traditional Spark RDD mapPartition.
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* @param rdd Original RDD with data to iterate over
* @param mp Function to be given an iterator to iterate through
* the RDD values and a Connection object to interact
* with HBase
* @return Returns a new RDD generated by the user-defined
* function, just like normal mapPartitions
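*
* Usage sketch, assuming an RDD of row keys as byte arrays; the table
* name is illustrative:
* {{{
* val cellCounts = hbaseContext.mapPartitions[Array[Byte], Int](rdd,
*   (it, connection) => {
*     val table = connection.getTable(TableName.valueOf("t1"))
*     // materialize before closing the table, since iterators are lazy
*     val counts = it.map(r => table.get(new Get(r)).size()).toList
*     table.close()
*     counts.iterator
*   })
* }}}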
*/
def mapPartitions[T, R: ClassTag](rdd: RDD[T],
mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = {
rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf,
it,
mp))
}
/**
* A simple enrichment of the traditional Spark Streaming DStream
* foreachPartition.
*
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* Note: Make sure to partition correctly to avoid memory issues when
* getting data from HBase
*
* @param dstream Original DStream with data to iterate over
* @param f Function to be given an iterator to iterate through
* the DStream values and a Connection object to
* interact with HBase
*/
def streamForeachPartition[T](dstream: DStream[T],
f: (Iterator[T], Connection) => Unit): Unit = {
dstream.foreachRDD(rdd => this.foreachPartition(rdd, f))
}
/**
* A simple enrichment of the traditional Spark Streaming DStream
* mapPartition.
*
* This function differs from the original in that it offers the
* developer access to an already connected Connection object
*
* Note: Do not close the Connection object. All Connection
* management is handled outside this method
*
* Note: Make sure to partition correctly to avoid memory issues when
* getting data from HBase
*
* @param dstream Original DStream with data to iterate over
* @param f Function to be given an iterator to iterate through
* the DStream values and a Connection object to
* interact with HBase
* @return Returns a new DStream generated by the user-defined
* function, just like normal mapPartitions
*/
def streamMapPartitions[T, U: ClassTag](dstream: DStream[T],
f: (Iterator[T], Connection) => Iterator[U]):
DStream[U] = {
dstream.mapPartitions(it => hbaseMapPartition[T, U](
broadcastedConf,
it,
f))
}
/**
* A simple abstraction over the HBaseContext.foreachPartition method.
*
* It allows a user to take an RDD,
* generate Puts, and send them to HBase.
* The complexity of managing the Connection is
* removed from the developer
*
* @param rdd Original RDD with data to iterate over
* @param tableName The name of the table to put into
* @param f Function to convert a value in the RDD to a HBase Put
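*
* Usage sketch; the record layout, table, and column names are
* illustrative:
* {{{
* val rdd = sc.parallelize(Seq(
*   (Bytes.toBytes("1"),
*     (Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value")))))
* hbaseContext.bulkPut[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))](
*   rdd,
*   TableName.valueOf("t1"),
*   putRecord => new Put(putRecord._1)
*     .addColumn(putRecord._2._1, putRecord._2._2, putRecord._2._3))
* }}}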
*/
def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) {
val tName = tableName.getName
rdd.foreachPartition(
it => hbaseForeachPartition[T](
broadcastedConf,
it,
(iterator, connection) => {
val m = connection.getBufferedMutator(TableName.valueOf(tName))
iterator.foreach(record => m.mutate(f(record)))
m.flush()
m.close()
}))
}
def applyCreds[T]() {
if (!appliedCredentials) {
appliedCredentials = true
@transient val ugi = UserGroupInformation.getCurrentUser
// specify that this is a proxy user
ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
}
}
/**
* A simple abstraction over the HBaseContext.streamMapPartition method.
*
* It allows a user to take a DStream and
* generate Puts and send them to HBase.
*
* The complexity of managing the Connection is
* removed from the developer
*
* @param dstream Original DStream with data to iterate over
* @param tableName The name of the table to put into
* @param f Function to convert a value in
* the DStream to a HBase Put
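*
* Usage sketch, assuming a DStream of row-key strings; the table and
* column names are illustrative:
* {{{
* hbaseContext.streamBulkPut[String](dstream,
*   TableName.valueOf("t1"),
*   s => new Put(Bytes.toBytes(s))
*     .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(s)))
* }}}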
*/
def streamBulkPut[T](dstream: DStream[T],
tableName: TableName,
f: (T) => Put) = {
val tName = tableName.getName
dstream.foreachRDD((rdd, time) => {
bulkPut(rdd, TableName.valueOf(tName), f)
})
}
/**
* A simple abstraction over the HBaseContext.foreachPartition method.
*
* It allows a user to take an RDD, generate Deletes,
* and send them to HBase. The complexity of managing the Connection is
* removed from the developer
*
* @param rdd Original RDD with data to iterate over
* @param tableName The name of the table to delete from
* @param f Function to convert a value in the RDD to a
* HBase Delete
* @param batchSize The number of deletes to batch before sending to HBase
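*
* Usage sketch, assuming an RDD of row keys as byte arrays; the table
* name and batch size are illustrative:
* {{{
* hbaseContext.bulkDelete[Array[Byte]](rdd,
*   TableName.valueOf("t1"),
*   deleteRecord => new Delete(deleteRecord),
*   4)
* }}}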
*/
def bulkDelete[T](rdd: RDD[T], tableName: TableName,
f: (T) => Delete, batchSize: Integer) {
bulkMutation(rdd, tableName, f, batchSize)
}
/**
* A simple abstraction over the HBaseContext.streamBulkMutation method.
*
* It allows a user to take a DStream and
* generate Deletes and send them to HBase.
*
* The complexity of managing the Connection is
* removed from the developer
*
* @param dstream Original DStream with data to iterate over
* @param tableName The name of the table to delete from
* @param f function to convert a value in the DStream to a
* HBase Delete
* @param batchSize The number of deletes to batch before sending to HBase
*/
def streamBulkDelete[T](dstream: DStream[T],
tableName: TableName,
f: (T) => Delete,
batchSize: Integer) = {
streamBulkMutation(dstream, tableName, f, batchSize)
}
/**
* Underlying function to support all bulk mutations
*
* May be opened up if requested
*/
private def bulkMutation[T](rdd: RDD[T], tableName: TableName,
f: (T) => Mutation, batchSize: Integer) {
val tName = tableName.getName
rdd.foreachPartition(
it => hbaseForeachPartition[T](
broadcastedConf,
it,
(iterator, connection) => {
val table = connection.getTable(TableName.valueOf(tName))
val mutationList = new java.util.ArrayList[Mutation]
iterator.foreach(record => {
mutationList.add(f(record))
if (mutationList.size >= batchSize) {
table.batch(mutationList, null)
mutationList.clear()
}
})
if (mutationList.size() > 0) {
table.batch(mutationList, null)
mutationList.clear()
}
table.close()
}))
}
/**
* Underlying function to support all bulk streaming mutations
*
* May be opened up if requested
*/
private def streamBulkMutation[T](dstream: DStream[T],
tableName: TableName,
f: (T) => Mutation,
batchSize: Integer) = {
val tName = tableName.getName
dstream.foreachRDD((rdd, time) => {
bulkMutation(rdd, TableName.valueOf(tName), f, batchSize)
})
}
/**
* A simple abstraction over the HBaseContext.mapPartition method.
*
* It allows a user to take an RDD and generate a
* new RDD based on Gets and the results they bring back from HBase
*
* @param tableName The name of the table to get from
* @param batchSize The number of Gets to be sent in a single batch
* @param rdd Original RDD with data to iterate over
* @param makeGet Function to convert a value in the RDD to a
* HBase Get
* @param convertResult This will convert the HBase Result object to
* whatever the user wants to put in the resulting
* RDD
* @return New RDD that is created by the Gets to HBase
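*
* Usage sketch, assuming an RDD of row keys as byte arrays and rows
* that exist; names are illustrative:
* {{{
* val values = hbaseContext.bulkGet[Array[Byte], String](
*   TableName.valueOf("t1"),
*   2,
*   rdd,
*   record => new Get(record),
*   (result: Result) => Bytes.toString(result.value()))
* }}}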
*/
def bulkGet[T, U: ClassTag](tableName: TableName,
batchSize: Integer,
rdd: RDD[T],
makeGet: (T) => Get,
convertResult: (Result) => U): RDD[U] = {
val getMapPartition = new GetMapPartition(tableName,
batchSize,
makeGet,
convertResult)
rdd.mapPartitions[U](it =>
hbaseMapPartition[T, U](
broadcastedConf,
it,
getMapPartition.run))
}
/**
* A simple abstraction over the HBaseContext.streamMap method.
*
* It allows a user to take a DStream and
* generate a new DStream based on Gets and the results
* they bring back from HBase
*
* @param tableName The name of the table to get from
* @param batchSize The number of Gets to be sent in a single batch
* @param dStream Original DStream with data to iterate over
* @param makeGet Function to convert a value in the DStream to a
* HBase Get
* @param convertResult This will convert the HBase Result object to
* whatever the user wants to put in the resulting
* DStream
* @return A new DStream that is created by the Get to HBase
*/
def streamBulkGet[T, U: ClassTag](tableName: TableName,
batchSize: Integer,
dStream: DStream[T],
makeGet: (T) => Get,
convertResult: (Result) => U): DStream[U] = {
val getMapPartition = new GetMapPartition(tableName,
batchSize,
makeGet,
convertResult)
dStream.mapPartitions[U](it => hbaseMapPartition[T, U](
broadcastedConf,
it,
getMapPartition.run))
}
/**
* This function will use the native HBase TableInputFormat with the
* given scan object to generate a new RDD
*
* @param tableName the name of the table to scan
* @param scan the HBase scan object to use to read data from HBase
* @param f function to convert a Result object from HBase into
* what the user wants in the final generated RDD
* @return new RDD with results from scan
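*
* Usage sketch; the table name and scan range are illustrative:
* {{{
* val scan = new Scan()
*   .withStartRow(Bytes.toBytes("a"))
*   .withStopRow(Bytes.toBytes("z"))
* val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf("t1"), scan,
*   (r: (ImmutableBytesWritable, Result)) => Bytes.toString(r._2.getRow))
* }}}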
*/
def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan,
f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
val job: Job = Job.getInstance(getConf(broadcastedConf))
TableMapReduceUtil.initCredentials(job)
TableMapReduceUtil.initTableMapperJob(tableName, scan,
classOf[IdentityTableMapper], null, null, job)
val jconf = new JobConf(job.getConfiguration)
val jobCreds = jconf.getCredentials()
UserGroupInformation.setConfiguration(sc.hadoopConfiguration)
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
new NewHBaseRDD(sc,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result],
job.getConfiguration,
this).map(f)
}
/**
* An overloaded version of HBaseContext hbaseRDD that defines the
* type of the resulting RDD
*
* @param tableName the name of the table to scan
* @param scans the HBase scan object to use to read data from HBase
* @return New RDD with results from scan
*
*/
def hbaseRDD(tableName: TableName, scans: Scan):
RDD[(ImmutableBytesWritable, Result)] = {
hbaseRDD[(ImmutableBytesWritable, Result)](
tableName,
scans,
(r: (ImmutableBytesWritable, Result)) => r)
}
/**
* Underlying wrapper for all foreach functions in HBaseContext
*/
private def hbaseForeachPartition[T](configBroadcast:
Broadcast[SerializableWritable[Configuration]],
it: Iterator[T],
f: (Iterator[T], Connection) => Unit) = {
val config = getConf(configBroadcast)
applyCreds
val smartConn = HBaseConnectionCache.getConnection(config)
f(it, smartConn.connection)
smartConn.close()
}
private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]):
Configuration = {
if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
val fs = FileSystem.newInstance(sc.hadoopConfiguration)
val inputStream = fs.open(new Path(tmpHdfsConfgFile))
tmpHdfsConfiguration = new Configuration(false)
tmpHdfsConfiguration.readFields(inputStream)
inputStream.close()
}
if (tmpHdfsConfiguration == null) {
try {
tmpHdfsConfiguration = configBroadcast.value.value
} catch {
case ex: Exception => logError("Unable to getConfig from broadcast", ex)
}
}
tmpHdfsConfiguration
}
/**
* Underlying wrapper for all mapPartition functions in HBaseContext
*
*/
private def hbaseMapPartition[K, U](
configBroadcast:
Broadcast[SerializableWritable[Configuration]],
it: Iterator[K],
mp: (Iterator[K], Connection) =>
Iterator[U]): Iterator[U] = {
val config = getConf(configBroadcast)
applyCreds
val smartConn = HBaseConnectionCache.getConnection(config)
val res = mp(it, smartConn.connection)
smartConn.close()
res
}
/**
* Underlying wrapper for all get mapPartition functions in HBaseContext
*/
private class GetMapPartition[T, U](tableName: TableName,
batchSize: Integer,
makeGet: (T) => Get,
convertResult: (Result) => U)
extends Serializable {
val tName = tableName.getName
def run(iterator: Iterator[T], connection: Connection): Iterator[U] = {
val table = connection.getTable(TableName.valueOf(tName))
val gets = new java.util.ArrayList[Get]()
var res = List[U]()
while (iterator.hasNext) {
gets.add(makeGet(iterator.next()))
if (gets.size() == batchSize) {
val results = table.get(gets)
res = res ++ results.map(convertResult)
gets.clear()
}
}
if (gets.size() > 0) {
val results = table.get(gets)
res = res ++ results.map(convertResult)
gets.clear()
}
table.close()
res.iterator
}
}
/**
* Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
*
* This method is used to keep ClassTags out of the external Java API, as
* the Java compiler cannot produce them automatically. While this
* ClassTag-faking does please the compiler, it can cause problems at runtime
* if the Scala API relies on ClassTags for correctness.
*
* Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
* just worse performance or security issues.
* For instance, an Array of AnyRef can hold any type T, but may lose primitive
* specialization.
*/
private[spark]
def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
/**
* Spark Implementation of HBase Bulk load for wide rows or when
* values are not already combined at the time of the map process
*
* This will take the content from an existing RDD then sort and shuffle
* it with respect to region splits. The result of that sort and shuffle
* will be written to HFiles.
*
* After this function is executed the user will have to call
* LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
*
* Also note this version of bulk load is different from past versions in
* that it includes the qualifier as part of the sort process. The
* reason for this is to be able to support rows with a very large number
* of columns.
*
* @param rdd The RDD we are bulk loading from
* @param tableName The HBase table we are loading into
* @param flatMap A flatMap function that will make every
* row in the RDD
* into N cells for the bulk load
* @param stagingDir The location on the FileSystem to bulk load into
* @param familyHFileWriteOptionsMap Options that will define how the HFile for a
* column family is written
* @param compactionExclude Compaction excluded for the HFiles
* @param maxSize Max size for the HFiles before they roll
* @param nowTimeStamp Version timestamp
* @tparam T The Type of values in the original RDD
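*
* Usage sketch, assuming an RDD of (rowKey, (family, qualifier, value))
* byte-array tuples; the table name and staging directory are
* illustrative. Remember to run LoadIncrementalHFiles.doBulkLoad(...)
* against the staging directory afterwards:
* {{{
* hbaseContext.bulkLoad[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))](
*   rdd,
*   TableName.valueOf("t1"),
*   t => {
*     val kfq = new KeyFamilyQualifier(t._1, t._2._1, t._2._2)
*     Seq((kfq, t._2._3)).iterator
*   },
*   "/tmp/hbaseBulkLoadStagingDir")
* }}}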
*/
def bulkLoad[T](rdd:RDD[T],
tableName: TableName,
flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
stagingDir:String,
familyHFileWriteOptionsMap:
util.Map[Array[Byte], FamilyHFileWriteOptions] =
new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
compactionExclude: Boolean = false,
maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE,
nowTimeStamp:Long = System.currentTimeMillis()):
Unit = {
val stagingPath = new Path(stagingDir)
val fs = stagingPath.getFileSystem(config)
if (fs.exists(stagingPath)) {
throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
}
val conn = HBaseConnectionCache.getConnection(config)
try {
val regionLocator = conn.getRegionLocator(tableName)
val startKeys = regionLocator.getStartKeys
if (startKeys.length == 0) {
logInfo("Table " + tableName.toString + " was not found")
}
val defaultCompressionStr = config.get("hfile.compression",
Compression.Algorithm.NONE.getName)
val hfileCompression = HFileWriterImpl
.compressionByName(defaultCompressionStr)
val tableRawName = tableName.getName
val familyHFileWriteOptionsMapInternal =
new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
while (entrySetIt.hasNext) {
val entry = entrySetIt.next()
familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
}
val regionSplitPartitioner =
new BulkLoadPartitioner(startKeys)
//This is where all the magic happens
//Here we are going to do the following things
// 1. FlatMap every row in the RDD into key column value tuples
// 2. Then we are going to repartition sort and shuffle
// 3. Finally we are going to write out our HFiles
rdd.flatMap( r => flatMap(r)).
repartitionAndSortWithinPartitions(regionSplitPartitioner).
hbaseForeachPartition(this, (it, conn) => {
val conf = broadcastedConf.value.value
val fs = FileSystem.get(conf)
val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
var rollOverRequested = false
val localTableName = TableName.valueOf(tableRawName)
//Here is where we finally iterate through the data in this partition of the
//RDD that has been sorted and partitioned
it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
keyFamilyQualifier.family,
keyFamilyQualifier.qualifier,
cellValue,
nowTimeStamp,
fs,
conn,
localTableName,
conf,
familyHFileWriteOptionsMapInternal,
hfileCompression,
writerMap,
stagingDir)
rollOverRequested = rollOverRequested || wl.written > maxSize
//This will only roll if we have at least one column family file that is
//bigger than maxSize and we have finished a given row key
if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
rollWriters(fs, writerMap,
regionSplitPartitioner,
previousRow,
compactionExclude)
rollOverRequested = false
}
previousRow = keyFamilyQualifier.rowKey
}
//We have finished all the data so let's close up the writers
rollWriters(fs, writerMap,
regionSplitPartitioner,
previousRow,
compactionExclude)
rollOverRequested = false
})
} finally {
if(null != conn) conn.close()
}
}
/**
* Spark Implementation of HBase Bulk load for short rows, somewhere under
* 1000 columns. This bulk load should be faster for tables with thinner
* rows than the other Spark implementation of bulk load that puts only one
* value into a record going into a shuffle
*
* This will take the content from an existing RDD then sort and shuffle
* it with respect to region splits. The result of that sort and shuffle
* will be written to HFiles.
*
* After this function is executed the user will have to call
* LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
*
* In this implementation, only the rowKey is given to the shuffle as the key
* and all the columns are already linked to the RowKey before the shuffle
* stage. The sorting of the qualifiers is done in memory outside of the
* shuffle stage
*
* Also make sure that incoming RDDs only have one record for every row key.
*
* @param rdd The RDD we are bulk loading from
* @param tableName The HBase table we are loading into
* @param mapFunction A function that will convert the RDD records to
* the key value format used for the shuffle to prep
* for writing to the bulk loaded HFiles
* @param stagingDir The location on the FileSystem to bulk load into
* @param familyHFileWriteOptionsMap Options that will define how the HFile for a
* column family is written
* @param compactionExclude Compaction excluded for the HFiles
* @param maxSize Max size for the HFiles before they roll
* @tparam T The Type of values in the original RDD
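*
* Usage sketch, assuming exactly one record per row key, shaped as
* (rowKey, Iterable of (family, qualifier, value)); names are
* illustrative:
* {{{
* hbaseContext.bulkLoadThinRows[(String, Iterable[(Array[Byte], Array[Byte], Array[Byte])])](
*   rdd,
*   TableName.valueOf("t1"),
*   t => {
*     val fqv = new FamiliesQualifiersValues
*     t._2.foreach { case (family, qualifier, value) =>
*       fqv += (family, qualifier, value)
*     }
*     (new ByteArrayWrapper(Bytes.toBytes(t._1)), fqv)
*   },
*   "/tmp/hbaseBulkLoadStagingDir")
* }}}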
*/
def bulkLoadThinRows[T](rdd:RDD[T],
tableName: TableName,
mapFunction: (T) =>
(ByteArrayWrapper, FamiliesQualifiersValues),
stagingDir:String,
familyHFileWriteOptionsMap:
util.Map[Array[Byte], FamilyHFileWriteOptions] =
new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
compactionExclude: Boolean = false,
maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
Unit = {
val stagingPath = new Path(stagingDir)
val fs = stagingPath.getFileSystem(config)
if (fs.exists(stagingPath)) {
throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
}
val conn = HBaseConnectionCache.getConnection(config)
try {
val regionLocator = conn.getRegionLocator(tableName)
val startKeys = regionLocator.getStartKeys
if (startKeys.length == 0) {
logInfo("Table " + tableName.toString + " was not found")
}
val defaultCompressionStr = config.get("hfile.compression",
Compression.Algorithm.NONE.getName)
val defaultCompression = HFileWriterImpl
.compressionByName(defaultCompressionStr)
val nowTimeStamp = System.currentTimeMillis()
val tableRawName = tableName.getName
val familyHFileWriteOptionsMapInternal =
new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
while (entrySetIt.hasNext) {
val entry = entrySetIt.next()
familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
}
val regionSplitPartitioner =
new BulkLoadPartitioner(startKeys)
//This is where all the magic happens
//Here we are going to do the following things
// 1. Map every row in the RDD into key column value tuples
// 2. Then we are going to repartition sort and shuffle
// 3. Finally we are going to write out our HFiles
rdd.map( r => mapFunction(r)).
repartitionAndSortWithinPartitions(regionSplitPartitioner).
hbaseForeachPartition(this, (it, conn) => {
val conf = broadcastedConf.value.value
val fs = FileSystem.get(conf)
val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
var rollOverRequested = false
val localTableName = TableName.valueOf(tableRawName)
//Here is where we finally iterate through the data in this partition of the
//RDD that has been sorted and partitioned
it.foreach{ case (rowKey:ByteArrayWrapper,
familiesQualifiersValues:FamiliesQualifiersValues) =>
if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
throw new KeyAlreadyExistsException("The following key was sent to the " +
"HFile load more then one: " + Bytes.toString(previousRow))
}
//The family map is a tree map so the families will be sorted
val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
while (familyIt.hasNext) {
val familyEntry = familyIt.next()
val family = familyEntry.getKey.value
val qualifierIt = familyEntry.getValue.entrySet().iterator()
//The qualifier map is a tree map so the qualifiers will be sorted
while (qualifierIt.hasNext) {
val qualifierEntry = qualifierIt.next()
val qualifier = qualifierEntry.getKey
val cellValue = qualifierEntry.getValue
writeValueToHFile(rowKey.value,
family,
qualifier.value, // qualifier
cellValue, // value
nowTimeStamp,
fs,
conn,
localTableName,
conf,
familyHFileWriteOptionsMapInternal,
defaultCompression,
writerMap,
stagingDir)
previousRow = rowKey.value
}
writerMap.values.foreach( wl => {
rollOverRequested = rollOverRequested || wl.written > maxSize
//This will only roll if we have at least one column family file that is
//bigger than maxSize and we have finished a given row key
if (rollOverRequested) {
rollWriters(fs, writerMap,
regionSplitPartitioner,
previousRow,
compactionExclude)
rollOverRequested = false
}
})
}
}
//We have finished all the data so let's close up the writers
rollWriters(fs, writerMap,
regionSplitPartitioner,
previousRow,
compactionExclude)
rollOverRequested = false
})
} finally {
if(null != conn) conn.close()
}
}
/**
* This will return a new HFile writer when requested
*
* @param family column family
* @param conf configuration to connect to HBase
* @param favoredNodes nodes that we would like to write to
* @param fs FileSystem object where we will be writing the HFiles to
* @return WriterLength object
*/
private def getNewHFileWriter(family: Array[Byte], conf: Configuration,
favoredNodes: Array[InetSocketAddress],
fs:FileSystem,
familydir:Path,
familyHFileWriteOptionsMapInternal:
util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
defaultCompression:Compression.Algorithm): WriterLength = {
var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))
if (familyOptions == null) {
familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
}
val tempConf = new Configuration(conf)
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
val contextBuilder = new HFileContextBuilder()
.withCompression(Algorithm.valueOf(familyOptions.compression))
.withChecksumType(HStore.getChecksumType(conf))
.withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
.withBlockSize(familyOptions.blockSize)
if (HFile.getFormatVersion(conf) >= HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
contextBuilder.withIncludesTags(true)
}
contextBuilder.withDataBlockEncoding(DataBlockEncoding.
valueOf(familyOptions.dataBlockEncoding))
val hFileContext = contextBuilder.build()
//Add a '_' to the file name because this is an unfinished file. A rename will happen
// to remove the '_' when the file is closed.
new WriterLength(0,
new StoreFileWriter.Builder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
.withBloomType(BloomType.valueOf(familyOptions.bloomType))
.withComparator(CellComparator.getInstance()).withFileContext(hFileContext)
.withFilePath(new Path(familydir, "_" + UUID.randomUUID.toString.replaceAll("-", "")))
.withFavoredNodes(favoredNodes).build())
}
/**
* Encompasses the logic to write a value to an HFile
*
* @param rowKey The RowKey for the record
* @param family HBase column family for the record
* @param qualifier HBase column qualifier for the record
* @param cellValue HBase cell value
* @param nowTimeStamp The cell time stamp
* @param fs Connection to the FileSystem for the HFile
* @param conn Connection to HBase
* @param tableName HBase TableName object
* @param conf Configuration to be used when making a new HFile
* @param familyHFileWriteOptionsMapInternal Extra configs for the HFile
* @param hfileCompression The compression codec for the new HFile
* @param writerMap HashMap of existing writers and their offsets
* @param stagingDir The staging directory on the FileSystem to store
* the HFiles
* @return The writer for the given HFile that was written
* to
*/
private def writeValueToHFile(rowKey: Array[Byte],
family: Array[Byte],
qualifier: Array[Byte],
cellValue:Array[Byte],
nowTimeStamp: Long,
fs: FileSystem,
conn: Connection,
tableName: TableName,
conf: Configuration,
familyHFileWriteOptionsMapInternal:
util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
hfileCompression:Compression.Algorithm,
writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
stagingDir: String
): WriterLength = {
val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(family), {
val familyDir = new Path(stagingDir, Bytes.toString(family))
fs.mkdirs(familyDir)
val loc:HRegionLocation = {
try {
val locator =
conn.getRegionLocator(tableName)
locator.getRegionLocation(rowKey)
} catch {
case e: Throwable =>
logWarning("there's something wrong when locating rowkey: " +
Bytes.toString(rowKey))
null
}
}
if (null == loc) {
if (log.isTraceEnabled) {
logTrace("failed to get region location, so use default writer: " +
Bytes.toString(rowKey))
}
getNewHFileWriter(family = family,
conf = conf,
favoredNodes = null,
fs = fs,
familydir = familyDir,
familyHFileWriteOptionsMapInternal,
hfileCompression)
} else {
if (log.isDebugEnabled) {
logDebug("first rowkey: [" + Bytes.toString(rowKey) + "]")
}
val initialIsa =
new InetSocketAddress(loc.getHostname, loc.getPort)
if (initialIsa.isUnresolved) {
if (log.isTraceEnabled) {
logTrace("failed to resolve bind address: " + loc.getHostname + ":"
+ loc.getPort + ", so use default writer")
}
getNewHFileWriter(family,
conf,
null,
fs,
familyDir,
familyHFileWriteOptionsMapInternal,
hfileCompression)
} else {
if(log.isDebugEnabled) {
logDebug("use favored nodes writer: " + initialIsa.getHostString)
}
getNewHFileWriter(family,
conf,
Array[InetSocketAddress](initialIsa),
fs,
familyDir,
familyHFileWriteOptionsMapInternal,
hfileCompression)
}
}
})
val keyValue = new KeyValue(rowKey,
family,
qualifier,
nowTimeStamp, cellValue)
wl.writer.append(keyValue)
wl.written += keyValue.getLength
wl
}
/**
* This will roll all Writers
* @param fs Hadoop FileSystem object
* @param writerMap HashMap that contains all the writers
* @param regionSplitPartitioner The partitioner with knowledge of how the
* Regions are split by row key
* @param previousRow The last row to fill the HFile ending range metadata
* @param compactionExclude The exclude compaction metadata flag for the HFile
*/
private def rollWriters(fs:FileSystem,
writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
regionSplitPartitioner: BulkLoadPartitioner,
previousRow: Array[Byte],
compactionExclude: Boolean): Unit = {
writerMap.values.foreach( wl => {
if (wl.writer != null) {
logDebug("Writer=" + wl.writer.getPath +
(if (wl.written == 0) "" else ", wrote=" + wl.written))
closeHFileWriter(fs, wl.writer,
regionSplitPartitioner,
previousRow,
compactionExclude)
}
})
writerMap.clear()
}
/**
* Function to close an HFile
* @param fs Hadoop FileSystem object
* @param w HFile Writer
* @param regionSplitPartitioner The partitioner with knowledge of how the
* Regions are split by row key
* @param previousRow The last row to fill the HFile ending range metadata
* @param compactionExclude The exclude compaction metadata flag for the HFile
*/
private def closeHFileWriter(fs:FileSystem,
w: StoreFileWriter,
regionSplitPartitioner: BulkLoadPartitioner,
previousRow: Array[Byte],
compactionExclude: Boolean): Unit = {
if (w != null) {
w.appendFileInfo(HStoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()))
w.appendFileInfo(HStoreFile.BULKLOAD_TASK_KEY,
Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
w.appendFileInfo(HStoreFile.MAJOR_COMPACTION_KEY,
Bytes.toBytes(true))
w.appendFileInfo(HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
Bytes.toBytes(compactionExclude))
w.appendTrackedTimestampsToMetadata()
w.close()
val srcPath = w.getPath
//In the new path you will see that we are using substring. This is to
// remove the '_' character in front of the HFile name. '_' is a character
// that will tell HBase that this file shouldn't be included in the bulk load
// This feature protects against unfinished HFiles being submitted to HBase
val newPath = new Path(w.getPath.getParent, w.getPath.getName.substring(1))
if (!fs.rename(srcPath, newPath)) {
throw new IOException("Unable to rename '" + srcPath +
"' to " + newPath)
}
}
}
/**
* This is a wrapper class around StoreFileWriter. The reason for the
* wrapper is to keep the length of the file alongside the writer
*
* @param written The number of bytes written to the writer
* @param writer The writer to be wrapped
*/
class WriterLength(var written:Long, val writer:StoreFileWriter)
}
@InterfaceAudience.Private
object LatestHBaseContextCache {
var latest:HBaseContext = null
}