blob: 826a2fbd79a073c1a20884052e31c77d1a911a72 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.server.recovery
import java.io.{FileNotFoundException, IOException}
import java.net.URI
import java.util
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.Options.{CreateOpts, Rename}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.Utils.usingResource
class FileSystemStateStore(
livyConf: LivyConf,
mockFileContext: Option[FileContext])
extends StateStore(livyConf) with Logging {
// Constructor defined for StateStore factory to new this class using reflection.
def this(livyConf: LivyConf) {
this(livyConf, None)
}
private val fsUri = {
val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
require(fsPath != null && !fsPath.isEmpty,
s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
new URI(fsPath)
}
private val fileContext: FileContext = mockFileContext.getOrElse {
FileContext.getFileContext(fsUri)
}
{
// Only Livy user should have access to state files.
fileContext.setUMask(new FsPermission("077"))
// Create state store dir if it doesn't exist.
val stateStorePath = absPath(".")
try {
fileContext.mkdir(stateStorePath, FsPermission.getDirDefault(), true)
} catch {
case _: FileAlreadyExistsException =>
if (!fileContext.getFileStatus(stateStorePath).isDirectory()) {
throw new IOException(s"$stateStorePath is not a directory.")
}
}
// Check permission of state store dir.
val fileStatus = fileContext.getFileStatus(absPath("."))
require(fileStatus.getPermission.getUserAction() == FsAction.ALL,
s"Livy doesn't have permission to access state store: $fsUri.")
if (fileStatus.getPermission.getGroupAction != FsAction.NONE) {
warn(s"Group users have permission to access state store: $fsUri. This is insecure.")
}
if (fileStatus.getPermission.getOtherAction != FsAction.NONE) {
warn(s"Other users have permission to access state store: $fsUri. This is in secure.")
}
}
override def set(key: String, value: Object): Unit = {
// Write to a temp file then rename to avoid file corruption if livy-server crashes
// in the middle of the write.
val tmpPath = absPath(s"$key.tmp")
val createFlag = util.EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
usingResource(fileContext.create(tmpPath, createFlag, CreateOpts.createParent())) { tmpFile =>
tmpFile.write(serializeToBytes(value))
tmpFile.close()
// Assume rename is atomic.
fileContext.rename(tmpPath, absPath(key), Rename.OVERWRITE)
}
try {
val crcPath = new Path(tmpPath.getParent, s".${tmpPath.getName}.crc")
fileContext.delete(crcPath, false)
} catch {
case NonFatal(e) => // Swallow the exception.
}
}
override def get[T: ClassTag](key: String): Option[T] = {
try {
usingResource(fileContext.open(absPath(key))) { is =>
Option(deserialize[T](IOUtils.toByteArray(is)))
}
} catch {
case _: FileNotFoundException => None
case e: IOException =>
warn(s"Failed to read $key from state store.", e)
None
}
}
override def getChildren(key: String): Seq[String] = {
try {
fileContext.util.listStatus(absPath(key)).map(_.getPath.getName)
} catch {
case _: FileNotFoundException => Seq.empty
case e: IOException =>
warn(s"Failed to list $key from state store.", e)
Seq.empty
}
}
override def remove(key: String): Unit = {
try {
fileContext.delete(absPath(key), false)
} catch {
case _: FileNotFoundException => warn(s"Failed to remove non-existed file: ${key}")
}
}
private def absPath(key: String): Path = new Path(fsUri.getPath(), key)
}