blob: 88610514ea89b97cdce52f2e3002c74661208e06 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.server.recovery
import scala.util.Success
import org.mockito.Mockito._
import org.scalatest.FunSpec
import org.scalatest.Matchers._
import org.scalatestplus.mockito.MockitoSugar.mock
import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
import org.apache.livy.sessions.Session.RecoveryMetadata
/**
 * Unit tests for [[SessionStore]].
 *
 * Every test runs against a Mockito mock of [[StateStore]], so what is checked is the
 * keys and values that SessionStore hands to (or reads from) the underlying store.
 */
class SessionStoreSpec extends FunSpec with LivyBaseUnitTestSuite {
  describe("SessionStore") {
    // Minimal RecoveryMetadata implementation used as the payload in these tests.
    case class TestRecoveryMetadata(id: Int) extends RecoveryMetadata

    val sessionType = "test"
    // Key prefix under which these tests expect per-session metadata to live.
    val sessionPath = s"v1/$sessionType"
    // Key under which these tests expect the SessionManagerState to live.
    val sessionManagerPath = s"v1/$sessionType/state"
    val conf = new LivyConf()

    // NOTE(review): the description mentions the session counter, but only the write of
    // the session metadata is verified here.
    it("should set session state and session counter when saving a session.") {
      val stateStore = mock[StateStore]
      val sessionStore = new SessionStore(conf, stateStore)

      val m = TestRecoveryMetadata(99)
      sessionStore.save(sessionType, m)

      // The metadata must be written under "<sessionPath>/<metadata id>".
      verify(stateStore).set(s"$sessionPath/99", m)
    }

    it("should return existing sessions") {
      // Keys whose lookup returns normally; "5" is stubbed to return None.
      val validMetadata = Map(
        "0" -> Some(TestRecoveryMetadata(0)),
        "5" -> None,
        "77" -> Some(TestRecoveryMetadata(77)))
      // Keys whose lookup throws.
      val corruptedMetadata = Map(
        "7" -> new RuntimeException("Test"),
        "11212" -> new RuntimeException("Test")
      )

      val stateStore = mock[StateStore]
      val sessionStore = new SessionStore(conf, stateStore)
      when(stateStore.getChildren(sessionPath))
        .thenReturn((validMetadata ++ corruptedMetadata).keys.toList)

      validMetadata.foreach { case (id, m) =>
        when(stateStore.get[TestRecoveryMetadata](s"$sessionPath/$id")).thenReturn(m)
      }
      corruptedMetadata.foreach { case (id, ex) =>
        when(stateStore.get[TestRecoveryMetadata](s"$sessionPath/$id")).thenThrow(ex)
      }

      val s = sessionStore.getAllSessions[TestRecoveryMetadata](sessionType)

      // Verify normal metadata are retrieved. Only the entries stubbed with Some(...) are
      // expected as successes; collect avoids calling .get on an Option.
      val expectedSuccesses = validMetadata.values.collect { case Some(m) => Success(m) }
      s.filter(_.isSuccess) should contain theSameElementsAs expectedSuccesses

      // Verify exceptions are wrapped in a Try and are returned.
      s.filter(_.isFailure) should have size corruptedMetadata.size
    }

    it("should not throw if the state store is empty") {
      val stateStore = mock[StateStore]
      val sessionStore = new SessionStore(conf, stateStore)
      when(stateStore.getChildren(sessionPath)).thenReturn(Seq.empty)

      val s = sessionStore.getAllSessions[TestRecoveryMetadata](sessionType)

      // With no children in the store there must be neither successes nor failures.
      s shouldBe empty
    }

    it("should return correct next session id") {
      val stateStore = mock[StateStore]
      val sessionStore = new SessionStore(conf, stateStore)

      // No SessionManagerState stored yet: the next session id is expected to be 0.
      when(stateStore.get[SessionManagerState](sessionManagerPath)).thenReturn(None)
      sessionStore.getNextSessionId(sessionType) shouldBe 0

      // A stored SessionManagerState: its nextSessionId is expected to be returned.
      val sms = SessionManagerState(100)
      when(stateStore.get[SessionManagerState](sessionManagerPath)).thenReturn(Some(sms))
      sessionStore.getNextSessionId(sessionType) shouldBe sms.nextSessionId
    }

    it("should remove session") {
      val stateStore = mock[StateStore]
      val sessionStore = new SessionStore(conf, stateStore)

      // Use the same id for the call and for the expected key so the two cannot drift apart.
      val id = 1
      sessionStore.remove(sessionType, id)

      verify(stateStore).remove(s"$sessionPath/$id")
    }
  }
}