/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, HBaseTestingUtility, TableName}
import org.apache.spark.{SparkContext, SparkException}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
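/**
 * End-to-end tests for HBaseContext: each test drives one bulk operation
 * (put, delete, get, or distributed scan) through a local-mode SparkContext
 * against an HBase minicluster, then verifies the outcome with a plain HBase
 * client connection. The Logging mixin is assumed to resolve from this
 * package (hbase-spark ships its own Logging shim since
 * org.apache.spark.Logging became private); that resolution is not shown in
 * this file.
 */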
class HBaseContextSuite extends FunSuite with
BeforeAndAfterEach with BeforeAndAfterAll with Logging {
@transient var sc: SparkContext = null
val TEST_UTIL = new HBaseTestingUtility
val tableName = "t1"
val columnFamily = "c"
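// Start the minicluster once for the whole suite and recreate the test table
// so every run begins with a clean, empty t1.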
override def beforeAll() {
TEST_UTIL.startMiniCluster()
logInfo(" - minicluster started")
try {
TEST_UTIL.deleteTable(TableName.valueOf(tableName))
} catch {
case _: Exception =>
logInfo(" - no table " + tableName + " found")
}
logInfo(" - creating table " + tableName)
TEST_UTIL.createTable(TableName.valueOf(tableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
val envMap = Map[String, String]("Xmx" -> "512m")
sc = new SparkContext("local", "test", null, Nil, envMap)
}
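// afterAll runs even when tests fail, so neither the minicluster nor the
// SparkContext leaks into later suites.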
override def afterAll() {
logInfo("shuting down minicluster")
TEST_UTIL.shutdownMiniCluster()
logInfo(" - minicluster shut down")
TEST_UTIL.cleanupTestDir()
sc.stop()
}
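// bulkPut takes an RDD of arbitrary records plus a function turning each
// record into a Put. HBaseContext applies it per partition, so executors
// amortize connection setup instead of connecting per record (that batching
// is HBaseContext's documented design, not asserted by this test).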
test("bulkput to test HBase client") {
val config = TEST_UTIL.getConfiguration
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1")))),
(Bytes.toBytes("2"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo2")))),
(Bytes.toBytes("3"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("c"), Bytes.toBytes("foo3")))),
(Bytes.toBytes("4"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("d"), Bytes.toBytes("foo")))),
(Bytes.toBytes("5"),
Array((Bytes.toBytes(columnFamily), Bytes.toBytes("e"), Bytes.toBytes("bar"))))))
val hbaseContext = new HBaseContext(sc, config)
hbaseContext.bulkPut[(Array[Byte], Array[(Array[Byte], Array[Byte], Array[Byte])])](rdd,
TableName.valueOf(tableName),
(putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
put
})
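// Read each row back through the plain client API to verify the writes.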
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf(tableName))
try {
val foo1 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("1"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a"))))
assert(foo1 == "foo1")
val foo2 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("2"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("b"))))
assert(foo2 == "foo2")
val foo3 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("3"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("c"))))
assert(foo3 == "foo3")
val foo4 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("4"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("d"))))
assert(foo4 == "foo")
val foo5 = Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("5"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("e"))))
assert(foo5 == "bar")
} finally {
table.close()
connection.close()
}
}
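// bulkDelete mirrors bulkPut: seed three rows directly, delete two of them
// through Spark (batch size 4), and confirm the untouched row survives.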
test("bulkDelete to test HBase client") {
val config = TEST_UTIL.getConfiguration
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf(tableName))
try {
var put = new Put(Bytes.toBytes("delete1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
table.put(put)
put = new Put(Bytes.toBytes("delete2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("delete3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
val rdd = sc.parallelize(Array(
Bytes.toBytes("delete1"),
Bytes.toBytes("delete3")))
val hbaseContext = new HBaseContext(sc, config)
hbaseContext.bulkDelete[Array[Byte]](rdd,
TableName.valueOf(tableName),
putRecord => new Delete(putRecord),
4)
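// delete1 and delete3 should now be gone; delete2 must be intact.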
assert(table.get(new Get(Bytes.toBytes("delete1"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null)
assert(table.get(new Get(Bytes.toBytes("delete3"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")) == null)
assert(Bytes.toString(CellUtil.cloneValue(table.get(new Get(Bytes.toBytes("delete2"))).
getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes("a")))).equals("foo2"))
} finally {
table.close()
connection.close()
}
}
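// bulkGet maps an RDD of row keys to Gets, executes them in batches (size 2
// here), and converts each Result into a caller-defined type, here a String
// of the form "row:(qualifier,value)...".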
test("bulkGet to test HBase client") {
val config = TEST_UTIL.getConfiguration
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf(tableName))
try {
var put = new Put(Bytes.toBytes("get1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
table.put(put)
put = new Put(Bytes.toBytes("get2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("get3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
} finally {
table.close()
connection.close()
}
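// get4 was never written, so its Result has no cells and converts to "".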
val rdd = sc.parallelize(Array(
Bytes.toBytes("get1"),
Bytes.toBytes("get2"),
Bytes.toBytes("get3"),
Bytes.toBytes("get4")))
val hbaseContext = new HBaseContext(sc, config)
val getRdd = hbaseContext.bulkGet[Array[Byte], String](
TableName.valueOf(tableName),
2,
rdd,
record => {
new Get(record)
},
(result: Result) => {
if (result.listCells() != null) {
val it = result.listCells().iterator()
val sb = new StringBuilder
sb.append(Bytes.toString(result.getRow) + ":")
while (it.hasNext) {
val cell = it.next()
val q = Bytes.toString(CellUtil.cloneQualifier(cell))
// "counter" cells hold a long, which Bytes.toString would garble
if (q.equals("counter")) {
sb.append("(" + q + "," + Bytes.toLong(CellUtil.cloneValue(cell)) + ")")
} else {
sb.append("(" + q + "," + Bytes.toString(CellUtil.cloneValue(cell)) + ")")
}
}
sb.toString
} else {
""
}
})
val getArray = getRdd.collect()
assert(getArray.length == 4)
assert(getArray.contains("get1:(a,foo1)"))
assert(getArray.contains("get2:(a,foo2)"))
assert(getArray.contains("get3:(a,foo3)"))
}
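// A bulkGet against a table that does not exist should fail the Spark job:
// the executor-side HBase error surfaces wrapped in a SparkException.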
test("BulkGet failure test: bad table") {
val config = TEST_UTIL.getConfiguration
val rdd = sc.parallelize(Array(
Bytes.toBytes("get1"),
Bytes.toBytes("get2"),
Bytes.toBytes("get3"),
Bytes.toBytes("get4")))
val hbaseContext = new HBaseContext(sc, config)
intercept[SparkException] {
try {
val getRdd = hbaseContext.bulkGet[Array[Byte], String](
TableName.valueOf("badTableName"),
2,
rdd,
record => {
new Get(record)
},
(result: Result) => "1")
getRdd.collect()
fail("We should have failed and not reached this line")
} catch {
case ex: SparkException => {
assert(
ex.getMessage.contains(
"org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException"))
throw ex
}
}
}
}
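// Asking for a qualifier no row has is not an error: the Get succeeds and
// the missing cell simply comes back null.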
test("BulkGet failure test: bad column") {
val config = TEST_UTIL.getConfiguration
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf(tableName))
try {
var put = new Put(Bytes.toBytes("get1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
table.put(put)
put = new Put(Bytes.toBytes("get2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("get3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
} finally {
table.close()
connection.close()
}
val rdd = sc.parallelize(Array(
Bytes.toBytes("get1"),
Bytes.toBytes("get2"),
Bytes.toBytes("get3"),
Bytes.toBytes("get4")))
val hbaseContext = new HBaseContext(sc, config)
val getRdd = hbaseContext.bulkGet[Array[Byte], String](
TableName.valueOf(tableName),
2,
rdd,
record => {
new Get(record)
},
(result: Result) => {
if (result.listCells() != null) {
val cellValue = result.getColumnLatestCell(
Bytes.toBytes("c"), Bytes.toBytes("bad_column"))
if (cellValue == null) "null" else "bad"
} else "noValue"
})
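// The three seeded rows lack the bad_column qualifier ("null"); get4 has no
// row at all ("noValue").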
var nullCounter = 0
var noValueCounter = 0
getRdd.collect().foreach(r => {
if ("null".equals(r)) nullCounter += 1
else if ("noValue".equals(r)) noValueCounter += 1
})
assert(nullCounter == 3)
assert(noValueCounter == 1)
}
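// hbaseRDD turns a Scan into an RDD of (row key, Result) pairs. The range
// [scan2, scan4_) covers three rows, and FirstKeyOnlyFilter keeps only the
// first cell per row, which drops scan2's second column.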
test("distributedScan to test HBase client") {
val config = TEST_UTIL.getConfiguration
val connection = ConnectionFactory.createConnection(config)
val table = connection.getTable(TableName.valueOf(tableName))
try {
var put = new Put(Bytes.toBytes("scan1"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo1"))
table.put(put)
put = new Put(Bytes.toBytes("scan2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo2"))
table.put(put)
put = new Put(Bytes.toBytes("scan2"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("b"), Bytes.toBytes("foo-2"))
table.put(put)
put = new Put(Bytes.toBytes("scan3"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
put = new Put(Bytes.toBytes("scan4"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
put = new Put(Bytes.toBytes("scan5"))
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("a"), Bytes.toBytes("foo3"))
table.put(put)
} finally {
table.close()
connection.close()
}
val hbaseContext = new HBaseContext(sc, config)
val scan = new Scan()
val filter = new FirstKeyOnlyFilter()
scan.setCaching(100)
scan.setStartRow(Bytes.toBytes("scan2"))
scan.setStopRow(Bytes.toBytes("scan4_"))
scan.setFilter(filter)
val scanRdd = hbaseContext.hbaseRDD(TableName.valueOf(tableName), scan)
// Do not catch here: swallowing exceptions would also swallow assertion
// failures and let a broken scan pass silently.
val scanList = scanRdd.map(r => r._1.copyBytes()).collect()
assert(scanList.length == 3)
var cnt = 0
scanRdd.map(r => r._2.listCells().size()).collect().foreach(l => {
cnt += l
})
// the number of cells returned would be 4 without the Filter
assert(cnt == 3)
}
}