blob: 75925258e2cdcb0ea567db7db2f44261fdffe3ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, TableName, HBaseTestingUtility}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.spark.HBaseDStreamFunctions._
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import scala.collection.mutable
/**
 * Integration test that pushes two queued RDDs through a Spark Streaming
 * `DStream`, writes them to an HBase mini cluster with `hbaseBulkPut`, and then
 * reads the rows back with the plain HBase client to verify the stored values.
 *
 * The mini cluster, the test table and the `SparkContext` are created once in
 * `beforeAll` and torn down in `afterAll`.
 */
class HBaseDStreamFunctionsSuite extends FunSuite with
  BeforeAndAfterEach with BeforeAndAfterAll with Logging {

  // Assigned in beforeAll; stays null if beforeAll fails before reaching it.
  @transient var sc: SparkContext = null

  var TEST_UTIL: HBaseTestingUtility = new HBaseTestingUtility

  val tableName = "t1"
  val columnFamily = "c"

  /**
   * Starts the HBase mini cluster, (re)creates the test table with a single
   * column family and creates the local SparkContext.
   */
  override def beforeAll(): Unit = {
    TEST_UTIL.startMiniCluster()
    logInfo(" - minicluster started")

    // Best effort: the table normally does not exist yet, so a failure here
    // is only logged.
    try {
      TEST_UTIL.deleteTable(TableName.valueOf(tableName))
    } catch {
      case e: Exception => logInfo(" - no table " + tableName + " found")
    }

    logInfo(" - creating table " + tableName)
    TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
    logInfo(" - created table")

    sc = new SparkContext("local", "test")
  }

  /**
   * Drops the test table, shuts the mini cluster down and stops the
   * SparkContext. Each step runs even if an earlier one throws, so a failed
   * table delete cannot leak the cluster or the SparkContext.
   */
  override def afterAll(): Unit = {
    try {
      TEST_UTIL.deleteTable(TableName.valueOf(tableName))
    } finally {
      try {
        TEST_UTIL.shutdownMiniCluster()
      } finally {
        // sc is still null when beforeAll failed before creating it.
        if (sc != null) {
          sc.stop()
        }
      }
    }
  }

  /**
   * Reads the latest cell of `columnFamily:qualifier` for the given row and
   * returns its value decoded as a String. Fails the test with a descriptive
   * message when the cell is absent.
   */
  private def readValue(table: Table, rowKey: String, qualifier: String): String = {
    val result = table.get(new Get(Bytes.toBytes(rowKey)))
    val cell = result.getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier))
    assert(cell != null,
      "no cell found for row " + rowKey + ", column " + columnFamily + ":" + qualifier)
    Bytes.toString(CellUtil.cloneValue(cell))
  }

  test("bulkput to test HBase client") {
    val config = TEST_UTIL.getConfiguration

    // Each record is (rowKey, Array((family, qualifier, value))).
    val rdd1 = sc.parallelize(Array(
      (Bytes.toBytes("1"),
        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
      (Bytes.toBytes("2"),
        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))),
      (Bytes.toBytes("3"),
        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3"))))))

    val rdd2 = sc.parallelize(Array(
      (Bytes.toBytes("4"),
        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))),
      (Bytes.toBytes("5"),
        Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))

    // Set from the streaming scheduler thread and polled from the test thread,
    // so it must be a thread-safe flag rather than a plain captured var.
    val isFinished = new java.util.concurrent.atomic.AtomicBoolean(false)

    // Upper bound for the wait below so a broken stream fails the test
    // instead of hanging the build.
    val maxWaitMillis = 120000L

    val hbaseContext = new HBaseContext(sc, config)
    val ssc = new StreamingContext(sc, Milliseconds(200))

    val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
      Array[Byte], Array[Byte])])]]()
    queue += rdd1
    queue += rdd2
    val dStream = ssc.queueStream(queue)

    dStream.hbaseBulkPut(
      hbaseContext,
      TableName.valueOf(tableName),
      (putRecord) => {
        val put = new Put(putRecord._1)
        putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
        put
      })

    // The first empty batch means both queued RDDs have been consumed.
    dStream.foreachRDD(rdd => {
      if (rdd.count() == 0) {
        isFinished.set(true)
      }
    })

    ssc.start()
    try {
      val deadline = System.currentTimeMillis() + maxWaitMillis
      while (!isFinished.get() && System.currentTimeMillis() < deadline) {
        Thread.sleep(100)
      }
    } finally {
      // Always stop the streaming context, even when the wait is interrupted.
      // Note: this also stops the shared SparkContext; afterAll calls
      // sc.stop() again.
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
    assert(isFinished.get(),
      "stream did not drain the queued RDDs within " + maxWaitMillis + " ms")

    // (rowKey, qualifier, expected value) for every record written above.
    val expectedCells = Seq(
      ("1", "a", "foo1"),
      ("2", "b", "foo2"),
      ("3", "c", "foo3"),
      ("4", "d", "foo"),
      ("5", "e", "bar"))

    val connection = ConnectionFactory.createConnection(config)
    try {
      val table = connection.getTable(TableName.valueOf(tableName))
      try {
        expectedCells.foreach { case (rowKey, qualifier, expected) =>
          val actual = readValue(table, rowKey, qualifier)
          assert(actual == expected,
            "unexpected value for row " + rowKey + ", qualifier " + qualifier)
        }
      } finally {
        table.close()
      }
    } finally {
      // Runs even if getTable or table.close() throws.
      connection.close()
    }
  }
}