| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.livy.server.recovery |
| |
| import java.io.{FileNotFoundException, InputStream, IOException} |
| import java.util |
| |
| import org.apache.hadoop.fs._ |
| import org.apache.hadoop.fs.Options.{CreateOpts, Rename} |
| import org.apache.hadoop.fs.permission.FsPermission |
| import org.apache.hadoop.hdfs.DistributedFileSystem |
| import org.hamcrest.Description |
| import org.mockito.ArgumentMatcher |
| import org.mockito.Matchers.{any, anyInt, argThat, eq => equal} |
| import org.mockito.Mockito.{atLeastOnce, spy, verify, when} |
| import org.mockito.internal.matchers.Equals |
| import org.mockito.invocation.InvocationOnMock |
| import org.mockito.stubbing.Answer |
| import org.scalatest.FunSpec |
| import org.scalatest.Matchers._ |
| import org.scalatestplus.mockito.MockitoSugar.mock |
| |
| import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf} |
| |
| class FileSystemStateStoreSpec extends FunSpec with LivyBaseUnitTestSuite { |
| describe("FileSystemStateStore") { |
| def pathEq(wantedPath: String): Path = argThat(new ArgumentMatcher[Path] { |
| private val matcher = new Equals(wantedPath) |
| |
| override def matches(path: Any): Boolean = matcher.matches(path.toString) |
| |
| override def describeTo(d: Description): Unit = { matcher.describeTo(d) } |
| }) |
| |
| def makeConf(): LivyConf = { |
| val conf = new LivyConf() |
| conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/") |
| |
| conf |
| } |
| |
| def makeConfWithTwoSeconds(): LivyConf = { |
| val conf = new LivyConf() |
| conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "file://tmp/") |
| conf.set(LivyConf.HDFS_SAFE_MODE_INTERVAL_IN_SECONDS, new Integer(2)) |
| conf.set(LivyConf.HDFS_SAFE_MODE_MAX_RETRY_ATTEMPTS, new Integer(2)) |
| conf |
| } |
| |
| def mockFileContext(rootDirPermission: String): FileContext = { |
| val fileContext = mock[FileContext] |
| val rootDirStatus = mock[FileStatus] |
| when(fileContext.getFileStatus(any())).thenReturn(rootDirStatus) |
| when(rootDirStatus.getPermission).thenReturn(new FsPermission(rootDirPermission)) |
| |
| fileContext |
| } |
| |
| it("should throw if url is not configured") { |
| intercept[IllegalArgumentException](new FileSystemStateStore(new LivyConf())) |
| } |
| |
| it("should set and verify file permission") { |
| val fileContext = mockFileContext("700") |
| new FileSystemStateStore(makeConf(), Some(fileContext)) |
| |
| verify(fileContext).setUMask(new FsPermission("077")) |
| } |
| |
| it("should reject insecure permission") { |
| def test(permission: String): Unit = { |
| val fileContext = mockFileContext(permission) |
| |
| intercept[IllegalArgumentException](new FileSystemStateStore(makeConf(), Some(fileContext))) |
| } |
| test("600") |
| test("400") |
| test("677") |
| test("670") |
| test("607") |
| } |
| |
| it("set should write with an intermediate file") { |
| val fileContext = mockFileContext("700") |
| val outputStream = mock[FSDataOutputStream] |
| when(fileContext.create(pathEq("/key.tmp"), any[util.EnumSet[CreateFlag]], any[CreateOpts])) |
| .thenReturn(outputStream) |
| |
| val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) |
| |
| stateStore.set("key", "value") |
| |
| verify(outputStream).write(""""value"""".getBytes) |
| verify(outputStream, atLeastOnce).close() |
| |
| |
| verify(fileContext).rename(pathEq("/key.tmp"), pathEq("/key"), equal(Rename.OVERWRITE)) |
| verify(fileContext).delete(pathEq("/.key.tmp.crc"), equal(false)) |
| } |
| |
| it("get should read file") { |
| val fileContext = mockFileContext("700") |
| abstract class MockInputStream extends InputStream with Seekable with PositionedReadable {} |
| val inputStream: InputStream = mock[MockInputStream] |
| when(inputStream.read(any[Array[Byte]](), anyInt(), anyInt())).thenAnswer(new Answer[Int] { |
| private var firstCall = true |
| override def answer(invocation: InvocationOnMock): Int = { |
| if (firstCall) { |
| firstCall = false |
| val buf = invocation.getArguments()(0).asInstanceOf[Array[Byte]] |
| val b = """"value"""".getBytes() |
| b.copyToArray(buf) |
| b.length |
| } else { |
| -1 |
| } |
| } |
| }) |
| |
| when(fileContext.open(pathEq("/key"))).thenReturn(new FSDataInputStream(inputStream)) |
| |
| val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) |
| |
| stateStore.get[String]("key") shouldBe Some("value") |
| |
| verify(inputStream, atLeastOnce).close() |
| } |
| |
| it("get non-existent key should return None") { |
| val fileContext = mockFileContext("700") |
| when(fileContext.open(any())).thenThrow(new FileNotFoundException("Unit test")) |
| |
| val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) |
| |
| stateStore.get[String]("key") shouldBe None |
| } |
| |
| it("getChildren should list file") { |
| val parentPath = "path" |
| def makeFileStatus(name: String): FileStatus = { |
| val fs = new FileStatus() |
| fs.setPath(new Path(parentPath, name)) |
| fs |
| } |
| val children = Seq("c1", "c2") |
| |
| val fileContext = mockFileContext("700") |
| val util = mock[FileContext#Util] |
| when(util.listStatus(pathEq(s"/$parentPath"))) |
| .thenReturn(children.map(makeFileStatus).toArray) |
| when(fileContext.util()).thenReturn(util) |
| |
| val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) |
| stateStore.getChildren(parentPath) should contain theSameElementsAs children |
| } |
| |
| def getChildrenErrorTest(error: Exception): Unit = { |
| val parentPath = "path" |
| |
| val fileContext = mockFileContext("700") |
| val util = mock[FileContext#Util] |
| when(util.listStatus(pathEq(s"/$parentPath"))).thenThrow(error) |
| when(fileContext.util()).thenReturn(util) |
| |
| val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) |
| stateStore.getChildren(parentPath) shouldBe empty |
| } |
| |
| it("getChildren should return empty list if the key doesn't exist") { |
| getChildrenErrorTest(new IOException("Unit test")) |
| } |
| |
| it("getChildren should return empty list if key doesn't exist") { |
| getChildrenErrorTest(new FileNotFoundException("Unit test")) |
| } |
| |
| it("remove should delete file") { |
| val fileContext = mockFileContext("700") |
| |
| val stateStore = new FileSystemStateStore(makeConf(), Some(fileContext)) |
| stateStore.remove("key") |
| |
| verify(fileContext).delete(pathEq("/key"), equal(false)) |
| } |
| |
| it("set safe mode ON and wait") { |
| val fileContext = mockFileContext("700") |
| val provider = spy(new FileSystemStateStore(makeConf(), Some(fileContext))) |
| val dfs = mock[DistributedFileSystem] |
| provider.isFsInSafeMode() |
| assert(!provider.isFsInSafeMode(dfs)) |
| } |
| |
| it("provider throws IllegalStateException when reaches 'N' " + |
| "max attempts to access HDFS file system") { |
| val provider = new SafeModeTestProvider(makeConfWithTwoSeconds(), |
| Some(mockFileContext("700"))) |
| provider.inSafeMode = true |
| intercept[IllegalStateException](provider.startSafeModeCheck()) |
| } |
| } |
| |
| private class SafeModeTestProvider(conf: LivyConf, context: Option[FileContext]) |
| extends FileSystemStateStore(conf, context) { |
| var inSafeMode = true |
| |
| override def isFsInSafeMode(): Boolean = inSafeMode |
| } |
| |
| } |