blob: 4b41ba147fcfc7c2e367880fcc5dd09b6355785e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gearpump.external.hbase
import java.io.{File, ObjectInputStream, ObjectOutputStream}
import java.nio.file.Files
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.{Constants, FileUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.security.UserProvider
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.security.UserGroupInformation
/**
 * A [[DataSink]] that writes incoming messages into a single HBase table.
 *
 * The HBase connection and the table handle are created lazily, on first use, through the
 * `conn` factory. The Hadoop `configuration` is `@transient` and is shipped manually by
 * `writeObject` / `readObject`.
 *
 * @param userConfig    user configuration passed to `conn` when the connection is opened
 * @param tableName     name of the HBase table rows are written to
 * @param conn          factory that opens an HBase [[Connection]]
 * @param configuration Hadoop/HBase client configuration
 */
class HBaseSink(userConfig: UserConfig, tableName: String,
    val conn: (UserConfig, Configuration) => Connection,
    @transient var configuration: Configuration)
  extends DataSink {

  // Record whether each lazy resource was actually created, so that close() does not
  // open a connection (and look up the table) merely to close it again.
  private var connectionCreated = false
  private var tableCreated = false

  lazy val connection = {
    val created = conn(userConfig, configuration)
    connectionCreated = true
    created
  }

  lazy val table = {
    val created = connection.getTable(TableName.valueOf(tableName))
    tableCreated = true
    created
  }

  /** No-op: the connection is opened lazily on the first write. */
  override def open(context: TaskContext): Unit = {}

  /** Creates a sink whose connection is opened by `HBaseSink.getConnection`. */
  def this(userConfig: UserConfig, tableName: String, configuration: Configuration) = {
    this(userConfig, tableName, HBaseSink.getConnection, configuration)
  }

  /** Creates a sink using a configuration from `HBaseConfiguration.create()`. */
  def this(userConfig: UserConfig, tableName: String) = {
    this(userConfig, tableName, HBaseConfiguration.create())
  }

  /** Writes one cell, converting each `String` argument to bytes with `Bytes.toBytes`. */
  def insert(rowKey: String, columnGroup: String, columnName: String, value: String): Unit = {
    insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup),
      Bytes.toBytes(columnName), Bytes.toBytes(value))
  }

  /** Writes one cell `(rowKey, columnGroup:columnName) = value` with a single `Put`. */
  def insert(
      rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], value: Array[Byte])
    : Unit = {
    val put = new Put(rowKey)
    put.addColumn(columnGroup, columnName, value)
    table.put(put)
  }

  /**
   * Writes `msg` into the table.
   *
   * Accepted shapes:
   *  - a `Seq`: every element is passed to `put` recursively;
   *  - a 4-tuple `(rowKey, columnGroup, columnName, value)` whose first element is a
   *    `String` (all four elements are then cast to `String`) or an `Array[Byte]` (all four
   *    are then cast to `Array[Byte]`). A failing cast throws `ClassCastException`.
   *
   * A 4-tuple whose first element has any other type is skipped. Any other message type
   * matches no case and throws a `MatchError`.
   */
  def put(msg: Any): Unit = {
    msg match {
      case seq: Seq[Any] =>
        seq.foreach(put)
      case (rowKey: String, columnGroup, columnName, value) =>
        insert(rowKey, columnGroup.asInstanceOf[String],
          columnName.asInstanceOf[String], value.asInstanceOf[String])
      case (rowKey: Array[Byte @unchecked], columnGroup, columnName, value) =>
        insert(rowKey, columnGroup.asInstanceOf[Array[Byte]],
          columnName.asInstanceOf[Array[Byte]], value.asInstanceOf[Array[Byte]])
      case (_, _, _, _) =>
        // Skip: the row key is neither a String nor a byte array.
    }
  }

  /** Writes the payload of `message` via [[put]]. */
  override def write(message: Message): Unit = {
    put(message.msg)
  }

  /**
   * Closes the table handle and then the connection, but only those that were actually
   * created. The connection is closed even if closing the table throws.
   */
  def close(): Unit = {
    try {
      if (tableCreated) table.close()
    } finally {
      if (connectionCreated) connection.close()
    }
  }

  /**
   * Overrides Java's default serialization
   * Please do not remove this
   *
   * Writes the non-transient fields, then appends the `@transient` Hadoop configuration
   * using `Configuration.write`.
   */
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    configuration.write(out)
  }

  /**
   * Overrides Java's default deserialization
   * Please do not remove this
   *
   * Reads the fields written by `writeObject`. The configuration properties are loaded
   * into an empty `Configuration(false)` and passed through `HBaseConfiguration.create`.
   */
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val clientConf = new Configuration(false)
    clientConf.readFields(in)
    configuration = HBaseConfiguration.create(clientConf)
  }
}
object HBaseSink {
  val HBASESINK = "hbasesink"
  val TABLE_NAME = "hbase.table.name"
  val COLUMN_FAMILY = "hbase.table.column.family"
  val COLUMN_NAME = "hbase.table.column.name"
  val HBASE_USER = "hbase.user"

  /** Creates a sink writing to `tableName` with the given HBase client `configuration`. */
  def apply[T](userConfig: UserConfig, tableName: String, configuration: Configuration)
    : HBaseSink = {
    new HBaseSink(userConfig, tableName, configuration)
  }

  /** Creates a sink writing to `tableName` using the two-argument class constructor. */
  def apply[T](userConfig: UserConfig, tableName: String)
    : HBaseSink = {
    new HBaseSink(userConfig, tableName)
  }

  /**
   * Opens an HBase connection.
   *
   * When Hadoop security is enabled, first logs in from the keytab carried in `userConfig`.
   * If `userConfig` has a value under [[HBASE_USER]], the connection is created on behalf of
   * a remote user with that name; otherwise it is created with `configuration` alone.
   *
   * @throws Exception if security is enabled but the principal or keytab is missing
   */
  private def getConnection(userConfig: UserConfig, configuration: Configuration): Connection = {
    if (UserGroupInformation.isSecurityEnabled) {
      loginFromKeytab(userConfig, configuration)
    }
    userConfig.getString(HBASE_USER) match {
      case Some(userName) =>
        val user = UserProvider.instantiate(configuration)
          .create(UserGroupInformation.createRemoteUser(userName))
        ConnectionFactory.createConnection(configuration, user)
      case None =>
        ConnectionFactory.createConnection(configuration)
    }
  }

  /**
   * Logs in to Kerberos with the principal and keytab bytes stored in `userConfig`.
   *
   * The keytab is written to a temporary file for the duration of the login and is always
   * deleted afterwards, even if writing it or logging in fails.
   *
   * @throws Exception if the principal or the keytab content is missing
   */
  private def loginFromKeytab(userConfig: UserConfig, configuration: Configuration): Unit = {
    val principal = userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)
    val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE)
    (principal, keytabContent) match {
      case (Some(principalName), Some(keytabBytes)) =>
        // Files.createTempFile, unlike File.createTempFile, creates the file with
        // owner-only permissions by default on POSIX file systems. The keytab is therefore
        // never readable by other local users, not even between creation and chmod.
        val keytabFile = Files.createTempFile("login", ".keytab").toFile
        try {
          FileUtils.writeByteArrayToFile(keytabFile, keytabBytes)
          keytabFile.setExecutable(false)
          keytabFile.setWritable(false)
          keytabFile.setReadable(true, true)
          UserGroupInformation.setConfiguration(configuration)
          UserGroupInformation.loginUserFromKeytab(principalName, keytabFile.getAbsolutePath)
        } finally {
          // Never leave credential material on disk, even when the login fails.
          keytabFile.delete()
        }
      case _ =>
        val errorMsg = s"HBase is security enabled, user should provide kerberos principal in " +
          s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} and keytab file " +
          s"in ${Constants.GEARPUMP_KEYTAB_FILE}"
        throw new Exception(errorMsg)
    }
  }
}