blob: 3ad7912ea7dc30ca5e556d4532ef53a8202a4c7d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.server.recovery
import scala.collection.JavaConverters._
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.api._
import org.apache.curator.framework.listen.Listenable
import org.apache.zookeeper.data.Stat
import org.mockito.Mockito._
import org.scalatest.FunSpec
import org.scalatest.Matchers._
import org.scalatest.mock.MockitoSugar.mock
import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
class ZooKeeperStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
describe("ZooKeeperStateStore") {
case class TestFixture(stateStore: ZooKeeperStateStore, curatorClient: CuratorFramework)
val conf = new LivyConf()
conf.set(LivyConf.RECOVERY_STATE_STORE, "zookeeper")
conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
val key = "key"
val prefixedKey = s"/livy/$key"
def withMock[R](testBody: TestFixture => R): R = {
val curatorClient = mock[CuratorFramework]
when(curatorClient.getUnhandledErrorListenable())
.thenReturn(mock[Listenable[UnhandledErrorListener]])
val zkManager = new ZooKeeperManager(conf, Some(curatorClient))
zkManager.start()
val stateStore = new ZooKeeperStateStore(conf, zkManager)
testBody(TestFixture(stateStore, curatorClient))
}
def mockExistsBuilder(curatorClient: CuratorFramework, exists: Boolean): Unit = {
val existsBuilder = mock[ExistsBuilder]
when(curatorClient.checkExists()).thenReturn(existsBuilder)
if (exists) {
when(existsBuilder.forPath(prefixedKey)).thenReturn(mock[Stat])
}
}
it("should throw on bad config") {
withMock { f =>
val conf = new LivyConf()
intercept[IllegalArgumentException] { new ZooKeeperManager(conf) }
conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
conf.set(LivyConf.ZK_RETRY_POLICY, "bad")
intercept[IllegalArgumentException] { new ZooKeeperManager(conf) }
}
}
it("set should use curatorClient") {
withMock { f =>
mockExistsBuilder(f.curatorClient, true)
val setDataBuilder = mock[SetDataBuilder]
when(f.curatorClient.setData()).thenReturn(setDataBuilder)
f.stateStore.set("key", 1.asInstanceOf[Object])
verify(f.curatorClient).start()
verify(setDataBuilder).forPath(prefixedKey, Array[Byte](49))
}
}
it("set should create parents if they don't exist") {
withMock { f =>
mockExistsBuilder(f.curatorClient, false)
val createBuilder = mock[CreateBuilder]
when(f.curatorClient.create()).thenReturn(createBuilder)
val p = mock[ProtectACLCreateModePathAndBytesable[String]]
when(createBuilder.creatingParentsIfNeeded()).thenReturn(p)
f.stateStore.set("key", 1.asInstanceOf[Object])
verify(f.curatorClient).start()
verify(p).forPath(prefixedKey, Array[Byte](49))
}
}
it("get should retrieve retry policy configs") {
conf.set(LivyConf.ZK_RETRY_POLICY, "11,77")
withMock { f =>
mockExistsBuilder(f.curatorClient, true)
f.stateStore.getZooKeeperManager().retryPolicy should not be null
f.stateStore.getZooKeeperManager().retryPolicy.getN shouldBe 11
}
}
it("get should retrieve data from curatorClient") {
withMock { f =>
mockExistsBuilder(f.curatorClient, true)
val getDataBuilder = mock[GetDataBuilder]
when(f.curatorClient.getData()).thenReturn(getDataBuilder)
when(getDataBuilder.forPath(prefixedKey)).thenReturn(Array[Byte](50))
val v = f.stateStore.get[Int]("key")
verify(f.curatorClient).start()
v shouldBe Some(2)
}
}
it("get should return None if key doesn't exist") {
withMock { f =>
mockExistsBuilder(f.curatorClient, false)
val v = f.stateStore.get[Int]("key")
verify(f.curatorClient).start()
v shouldBe None
}
}
it("getChildren should use curatorClient") {
withMock { f =>
mockExistsBuilder(f.curatorClient, true)
val getChildrenBuilder = mock[GetChildrenBuilder]
when(f.curatorClient.getChildren()).thenReturn(getChildrenBuilder)
val children = List("abc", "def")
when(getChildrenBuilder.forPath(prefixedKey)).thenReturn(children.asJava)
val c = f.stateStore.getChildren("key")
verify(f.curatorClient).start()
c shouldBe children
}
}
it("getChildren should return empty list if key doesn't exist") {
withMock { f =>
mockExistsBuilder(f.curatorClient, false)
val c = f.stateStore.getChildren("key")
verify(f.curatorClient).start()
c shouldBe empty
}
}
it("remove should use curatorClient") {
withMock { f =>
val deleteBuilder = mock[DeleteBuilder]
when(f.curatorClient.delete()).thenReturn(deleteBuilder)
val g = mock[ChildrenDeletable]
when(deleteBuilder.guaranteed()).thenReturn(g)
f.stateStore.remove(key)
verify(g).forPath(prefixedKey)
}
}
}
}