| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.livy.server.recovery |
| |
| import scala.reflect.{classTag, ClassTag} |
| |
| import com.fasterxml.jackson.databind.ObjectMapper |
| import com.fasterxml.jackson.module.scala.DefaultScalaModule |
| |
| import org.apache.livy.{LivyConf, Logging} |
| import org.apache.livy.server.recovery.ZooKeeperManager |
| import org.apache.livy.sessions.SessionKindModule |
| import org.apache.livy.sessions.SessionManager._ |
| |
| protected trait JsonMapper { |
| protected val mapper = new ObjectMapper() |
| .registerModule(DefaultScalaModule) |
| .registerModule(new SessionKindModule()) |
| |
| def serializeToBytes(value: Object): Array[Byte] = mapper.writeValueAsBytes(value) |
| |
| def deserialize[T: ClassTag](json: Array[Byte]): T = |
| mapper.readValue(json, classTag[T].runtimeClass.asInstanceOf[Class[T]]) |
| } |
| |
| /** |
| * Interface of a key-value pair storage for state storage. |
| * It's responsible for de/serialization and retrieving/storing object. |
| * It's the low level interface used by higher level classes like SessionStore. |
| * |
| * Hardcoded to use JSON serialization for now for easier ops. Will add better serialization later. |
| */ |
| abstract class StateStore(livyConf: LivyConf) extends JsonMapper { |
| /** |
| * Set a key-value pair to this state store. It overwrites existing value. |
| * @throws Exception Throw when persisting the state store fails. |
| */ |
| def set(key: String, value: Object): Unit |
| |
| /** |
| * Get a key-value pair from this state store. |
| * @return Value if the key exists. None if the key doesn't exist. |
| * @throws Exception Throw when deserialization of the stored value fails. |
| */ |
| def get[T: ClassTag](key: String): Option[T] |
| |
| /** |
| * Treat keys in this state store as a directory tree and |
| * return names of the direct children of the key. |
| * @return List of names of the direct children of the key. |
| * Empty list if the key doesn't exist or have no child. |
| */ |
| def getChildren(key: String): Seq[String] |
| |
| /** |
| * Remove the key from this state store. Does not throw if the key doesn't exist. |
| * @throws Exception Throw when persisting the state store fails. |
| */ |
| def remove(key: String): Unit |
| } |
| |
| /** |
| * Factory to create the store chosen in LivyConf. |
| */ |
| object StateStore extends Logging { |
| private[this] var stateStore: Option[StateStore] = None |
| |
| def init(livyConf: LivyConf, zkManager: Option[ZooKeeperManager] = None): Unit = synchronized { |
| if (stateStore.isEmpty) { |
| val fileStateStoreClassTag = pickStateStore(livyConf) |
| if (fileStateStoreClassTag == classOf[ZooKeeperStateStore]) { |
| stateStore = Option(fileStateStoreClassTag. |
| getDeclaredConstructor(classOf[LivyConf], classOf[ZooKeeperManager]) |
| .newInstance(livyConf, zkManager.get).asInstanceOf[StateStore]) |
| } else { |
| stateStore = Option(fileStateStoreClassTag.getDeclaredConstructor(classOf[LivyConf]) |
| .newInstance(livyConf).asInstanceOf[StateStore]) |
| } |
| info(s"Using ${stateStore.get.getClass.getSimpleName} for recovery.") |
| } |
| } |
| |
| def cleanup(): Unit = synchronized { |
| stateStore = None |
| } |
| |
| def get: StateStore = { |
| assert(stateStore.isDefined, "StateStore hasn't been initialized.") |
| stateStore.get |
| } |
| |
| private[recovery] def pickStateStore(livyConf: LivyConf): Class[_] = { |
| livyConf.get(LivyConf.RECOVERY_MODE) match { |
| case SESSION_RECOVERY_MODE_OFF => classOf[BlackholeStateStore] |
| case SESSION_RECOVERY_MODE_RECOVERY => |
| livyConf.get(LivyConf.RECOVERY_STATE_STORE) match { |
| case "filesystem" => classOf[FileSystemStateStore] |
| case "zookeeper" => classOf[ZooKeeperStateStore] |
| case ss => throw new IllegalArgumentException(s"Unsupported state store $ss") |
| } |
| case rm => throw new IllegalArgumentException(s"Unsupported recovery mode $rm") |
| } |
| } |
| } |