// blob: f07b1d6e7b7c6e067e6a805dd7a4a9b03f46995f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.spark
import java.util.concurrent.ExecutorService
import scala.util.Random
import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator,
Connection, BufferedMutatorParams, Admin, TableBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.TableName
import org.scalatest.FunSuite
/**
 * Test stand-in for [[HBaseConnectionKey]] whose identity is defined purely by `confId`.
 *
 * `null` is passed to the parent constructor; `hashCode` and `equals` are overridden so
 * that two mockers carrying the same `confId` compare equal and hash alike.
 *
 * @param confId integer used as both the hash code and the equality criterion
 */
case class HBaseConnectionKeyMocker (confId: Int) extends HBaseConnectionKey (null) {
  /** The hash code is the `confId` itself. */
  override def hashCode: Int = confId

  /** Equal only to another `HBaseConnectionKeyMocker` with the same `confId`. */
  override def equals(obj: Any): Boolean = obj match {
    case other: HBaseConnectionKeyMocker => other.confId == confId
    case _ => false
  }
}
/**
 * Bare-bones [[Connection]] stub for the connection-cache tests.
 *
 * Every accessor returns `null`. The only stateful behaviour is `close()`, which records
 * that it was called and rejects a second call.
 */
class ConnectionMocker extends Connection {
  /** Flipped to `true` by the first successful `close()`. */
  var isClosed: Boolean = false

  // --- accessors: all stubbed out with null -------------------------------------------
  def getConfiguration: Configuration = null
  def getAdmin: Admin = null
  def getRegionLocator (tableName: TableName): RegionLocator = null

  override def getTable (tableName: TableName): Table = null
  override def getTable(tableName: TableName, pool: ExecutorService): Table = null
  def getTableBuilder(tableName: TableName, pool: ExecutorService): TableBuilder = null

  def getBufferedMutator (tableName: TableName): BufferedMutator = null
  def getBufferedMutator (params: BufferedMutatorParams): BufferedMutator = null

  /**
   * Marks this mock as closed.
   *
   * @throws IllegalStateException if the mock has already been closed
   */
  def close(): Unit = {
    if (!isClosed) {
      isClosed = true
    } else {
      throw new IllegalStateException()
    }
  }

  /** Always reports `true`. */
  def isAborted: Boolean = true

  /** No-op. */
  def abort(why: String, e: Throwable): Unit = {}

  /* Without override, we can also compile it against HBase 2.1. */
  /* override */ def clearRegionLocationCache(): Unit = {}
}
/**
 * Tests for `HBaseConnectionCache` driven through [[HBaseConnectionKeyMocker]] keys and
 * [[ConnectionMocker]] connections.
 *
 * The scenarios check the cache's request/creation/active counters, the reference counts
 * of cached entries, and the eviction of idle entries after the configured timeout.
 */
class HBaseConnectionCacheSuite extends FunSuite with Logging {
  /** Number of worker threads started by each pressure scenario. */
  private val NumThreads = 100

  /** Number of `getConnection` calls issued by each worker thread. */
  private val RequestsPerThread = 1000

  /** Number of distinct keys the workers draw from (`confId` in `[0, NumKeys)`). */
  private val NumKeys = 10

  /*
   * These tests must be performed sequentially because they operate on a
   * single shared running thread and resource.
   *
   * There appears to be no way to tell FunSuite to do so, so the test cases
   * are written as normal functions that are called sequentially from a
   * single test case.
   */
  test("all test cases") {
    testBasic()
    testWithPressureWithoutClose()
    testWithPressureWithClose()
  }

  /** Empties the shared connection map and zeroes the three cache counters. */
  def cleanEnv(): Unit = {
    HBaseConnectionCache.connectionMap.synchronized {
      HBaseConnectionCache.connectionMap.clear()
      HBaseConnectionCache.cacheStat.numActiveConnections = 0
      HBaseConnectionCache.cacheStat.numActualConnectionsCreated = 0
      HBaseConnectionCache.cacheStat.numTotalRequests = 0
    }
  }

  /**
   * Starts `threadCount` threads that each evaluate `work` once, then waits for all of
   * them to finish.
   *
   * The threads are launched with `start()` so that the work really runs concurrently;
   * calling `run()` would execute it synchronously on the calling thread and make the
   * subsequent `join()` calls meaningless.
   *
   * @param threadCount number of worker threads to launch
   * @param work        body evaluated once inside every worker thread
   */
  private def runConcurrently(threadCount: Int)(work: => Unit): Unit = {
    val workers: Array[Thread] = Array.fill(threadCount) {
      new Thread(new Runnable {
        override def run(): Unit = work
      })
    }
    workers.foreach(_.start())
    try {
      workers.foreach(_.join())
    } catch {
      case e: InterruptedException => println(e.getMessage)
    }
  }

  /**
   * Single-threaded walk-through: equal keys are expected to share one cache entry,
   * closing a connection is expected not to evict it immediately, and entries whose
   * connections were all closed are expected to be gone after the timeout has elapsed.
   */
  def testBasic(): Unit = {
    cleanEnv()
    HBaseConnectionCache.setTimeout(1 * 1000)

    val connKeyMocker1 = new HBaseConnectionKeyMocker(1)
    val connKeyMocker1a = new HBaseConnectionKeyMocker(1)
    val connKeyMocker2 = new HBaseConnectionKeyMocker(2)

    val c1 = HBaseConnectionCache
      .getConnection(connKeyMocker1, new ConnectionMocker)

    assert(HBaseConnectionCache.connectionMap.size === 1)
    assert(HBaseConnectionCache.getStat.numTotalRequests === 1)
    assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1)
    assert(HBaseConnectionCache.getStat.numActiveConnections === 1)

    // A second key with the same confId must hit the existing entry.
    val c1a = HBaseConnectionCache
      .getConnection(connKeyMocker1a, new ConnectionMocker)

    HBaseConnectionCache.connectionMap.synchronized {
      assert(HBaseConnectionCache.connectionMap.size === 1)
      assert(HBaseConnectionCache.getStat.numTotalRequests === 2)
      assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1)
      assert(HBaseConnectionCache.getStat.numActiveConnections === 1)
    }

    val c2 = HBaseConnectionCache
      .getConnection(connKeyMocker2, new ConnectionMocker)

    HBaseConnectionCache.connectionMap.synchronized {
      assert(HBaseConnectionCache.connectionMap.size === 2)
      assert(HBaseConnectionCache.getStat.numTotalRequests === 3)
      assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 2)
      assert(HBaseConnectionCache.getStat.numActiveConnections === 2)
    }

    // Closing handles must not remove entries right away.
    c1.close()
    HBaseConnectionCache.connectionMap.synchronized {
      assert(HBaseConnectionCache.connectionMap.size === 2)
      assert(HBaseConnectionCache.getStat.numActiveConnections === 2)
    }

    c1a.close()
    HBaseConnectionCache.connectionMap.synchronized {
      assert(HBaseConnectionCache.connectionMap.size === 2)
      assert(HBaseConnectionCache.getStat.numActiveConnections === 2)
    }

    Thread.sleep(3 * 1000) // Leave housekeeping thread enough time
    HBaseConnectionCache.connectionMap.synchronized {
      // Only the entry for key 2, which is still open, should remain.
      assert(HBaseConnectionCache.connectionMap.size === 1)
      assert(HBaseConnectionCache.connectionMap.iterator.next()._1
        .asInstanceOf[HBaseConnectionKeyMocker].confId === 2)
      assert(HBaseConnectionCache.getStat.numActiveConnections === 1)
    }

    c2.close()
  }

  /**
   * Many threads request connections for a small set of keys and never close them.
   *
   * Expects one cached entry per key, a request count equal to the number of calls, and
   * reference counts that add up to the number of calls. The reference counts are then
   * reset by hand and the entries back-dated so that they are expected to be evicted.
   */
  def testWithPressureWithoutClose(): Unit = {
    cleanEnv()
    HBaseConnectionCache.setTimeout(500)

    runConcurrently(NumThreads) {
      for (_ <- 0 until RequestsPerThread) {
        HBaseConnectionCache.getConnection(
          new HBaseConnectionKeyMocker(Random.nextInt(NumKeys)), new ConnectionMocker)
      }
    }

    Thread.sleep(1000)
    HBaseConnectionCache.connectionMap.synchronized {
      assert(HBaseConnectionCache.connectionMap.size === NumKeys)
      assert(HBaseConnectionCache.getStat.numTotalRequests === NumThreads * RequestsPerThread)
      assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === NumKeys)
      assert(HBaseConnectionCache.getStat.numActiveConnections === NumKeys)

      val totalRc: Int = HBaseConnectionCache.connectionMap.foldLeft(0) {
        case (sum, (_, cached)) => sum + cached.refCount
      }
      assert(totalRc === NumThreads * RequestsPerThread)

      // Nothing was closed, so release every entry manually and back-date it by one second.
      HBaseConnectionCache.connectionMap.foreach {
        case (_, cached) =>
          cached.refCount = 0
          cached.timestamp = System.currentTimeMillis() - 1000
      }
    }

    Thread.sleep(1000)
    HBaseConnectionCache.connectionMap.synchronized {
      assert(HBaseConnectionCache.connectionMap.size === 0)
      assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === NumKeys)
      assert(HBaseConnectionCache.getStat.numActiveConnections === 0)
    }
  }

  /**
   * Many threads request connections for a small set of keys and close each one right
   * after obtaining it.
   *
   * Expects one cached entry per key immediately after the workers finish, and an empty
   * cache once twice the configured timeout has passed.
   */
  def testWithPressureWithClose(): Unit = {
    cleanEnv()
    HBaseConnectionCache.setTimeout(3 * 1000)

    runConcurrently(NumThreads) {
      for (_ <- 0 until RequestsPerThread) {
        val conn = HBaseConnectionCache.getConnection(
          new HBaseConnectionKeyMocker(Random.nextInt(NumKeys)), new ConnectionMocker)
        Thread.`yield`()
        conn.close()
      }
    }

    HBaseConnectionCache.connectionMap.synchronized {
      assert(HBaseConnectionCache.connectionMap.size === NumKeys)
      assert(HBaseConnectionCache.getStat.numTotalRequests === NumThreads * RequestsPerThread)
      assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === NumKeys)
      assert(HBaseConnectionCache.getStat.numActiveConnections === NumKeys)
    }

    Thread.sleep(6 * 1000)
    HBaseConnectionCache.connectionMap.synchronized {
      assert(HBaseConnectionCache.connectionMap.size === 0)
      assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === NumKeys)
      assert(HBaseConnectionCache.getStat.numActiveConnections === 0)
    }
  }
}