| // Copyright 2016 Twitter. All rights reserved. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.twitter.heron.statemgr.localfs; |
| |
| import java.nio.charset.Charset; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.util.concurrent.TimeUnit; |
| |
| import com.google.common.util.concurrent.ListenableFuture; |
| |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.mockito.Mockito; |
| import org.powermock.api.mockito.PowerMockito; |
| import org.powermock.core.classloader.annotations.PrepareForTest; |
| import org.powermock.modules.junit4.PowerMockRunner; |
| |
| import com.twitter.heron.api.generated.TopologyAPI; |
| import com.twitter.heron.common.basics.FileUtils; |
| import com.twitter.heron.proto.scheduler.Scheduler; |
| import com.twitter.heron.proto.system.ExecutionEnvironment; |
| import com.twitter.heron.proto.system.PackingPlans; |
| import com.twitter.heron.spi.common.Config; |
| import com.twitter.heron.spi.common.Keys; |
| import com.twitter.heron.spi.statemgr.Lock; |
| |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.anyBoolean; |
| import static org.mockito.Matchers.anyString; |
| import static org.mockito.Matchers.eq; |
| import static org.mockito.Mockito.times; |
| |
| /** |
| * LocalFileSystemStateManager Tester. |
| */ |
| @RunWith(PowerMockRunner.class) |
| @PrepareForTest(FileUtils.class) |
| public class LocalFileSystemStateManagerTest { |
| |
| private static final String TOPOLOGY_NAME = "topologyName"; |
| private static final String LOCK_NAME = "lockName"; |
| private static final String ROOT_ADDR = "/"; |
| private LocalFileSystemStateManager manager; |
| |
| @Before |
| public void before() throws Exception { |
| manager = initMockManager(ROOT_ADDR, false); |
| } |
| |
| private static LocalFileSystemStateManager initMockManager(String rootPath, boolean initTree) { |
| Config config = Config.newBuilder() |
| .put(Keys.stateManagerRootPath(), rootPath) |
| .put(LocalFileSystemKeys.initializeFileTree(), initTree) |
| .build(); |
| LocalFileSystemStateManager manager = Mockito.spy(new LocalFileSystemStateManager()); |
| manager.initialize(config); |
| return manager; |
| } |
| |
| @After |
| public void after() throws Exception { |
| } |
| |
| private void initMocks() throws Exception { |
| PowerMockito.spy(FileUtils.class); |
| PowerMockito.doReturn(true).when(FileUtils.class, "createDirectory", anyString()); |
| |
| Assert.assertTrue(manager.initTree()); |
| |
| PowerMockito.doReturn(true) |
| .when(FileUtils.class, "writeToFile", anyString(), any(byte[].class), anyBoolean()); |
| |
| PowerMockito.doReturn(true).when(FileUtils.class, "deleteFile", anyString()); |
| } |
| |
| @Test |
| public void testInitialize() throws Exception { |
| initMocks(); |
| |
| PowerMockito.verifyStatic(Mockito.atLeastOnce()); |
| FileUtils.createDirectory(anyString()); |
| } |
| |
| /** |
| * Method: getSchedulerLocation(String topologyName, WatchCallback watcher) |
| */ |
| @Test |
| public void testGetSchedulerLocation() throws Exception { |
| Scheduler.SchedulerLocation location = Scheduler.SchedulerLocation.newBuilder() |
| .setHttpEndpoint("host:1") |
| .setTopologyName(TOPOLOGY_NAME) |
| .build(); |
| PowerMockito.spy(FileUtils.class); |
| PowerMockito.doReturn(true).when(FileUtils.class, "isFileExists", anyString()); |
| PowerMockito.doReturn(location.toByteArray()). |
| when(FileUtils.class, "readFromFile", anyString()); |
| |
| Scheduler.SchedulerLocation locationFetched = |
| manager.getSchedulerLocation(null, TOPOLOGY_NAME).get(); |
| |
| Assert.assertEquals(location, locationFetched); |
| } |
| |
| /** |
| * Method: setSchedulerLocation(Scheduler.SchedulerLocation location, String topologyName) |
| */ |
| @Test |
| public void testSetSchedulerLocation() throws Exception { |
| Mockito.doReturn(Mockito.mock(ListenableFuture.class)).when(manager). |
| setData(anyString(), any(byte[].class), anyBoolean()); |
| |
| manager.setSchedulerLocation(Scheduler.SchedulerLocation.getDefaultInstance(), ""); |
| Mockito.verify(manager). |
| setData(anyString(), any(byte[].class), eq(true)); |
| } |
| |
| /** |
| * Method: setExecutionState(ExecutionEnvironment.ExecutionState executionState) |
| */ |
| @Test |
| public void testSetExecutionState() throws Exception { |
| initMocks(); |
| ExecutionEnvironment.ExecutionState defaultState = |
| ExecutionEnvironment.ExecutionState.getDefaultInstance(); |
| |
| Assert.assertTrue(manager.setExecutionState(defaultState, "").get()); |
| |
| assertWriteToFile( |
| String.format("%s/%s/%s", ROOT_ADDR, "executionstate", defaultState.getTopologyName()), |
| defaultState.toByteArray(), false); |
| } |
| |
| /** |
| * Method: setTopology(TopologyAPI.Topology topology) |
| */ |
| @Test |
| public void testSetTopology() throws Exception { |
| initMocks(); |
| TopologyAPI.Topology topology = TopologyAPI.Topology.getDefaultInstance(); |
| |
| Assert.assertTrue(manager.setTopology(topology, TOPOLOGY_NAME).get()); |
| |
| assertWriteToFile( |
| String.format("%s/%s/%s", ROOT_ADDR, "topologies", TOPOLOGY_NAME), |
| topology.toByteArray(), false); |
| } |
| |
| @Test |
| public void testSetPackingPlan() throws Exception { |
| initMocks(); |
| PackingPlans.PackingPlan packingPlan = PackingPlans.PackingPlan.getDefaultInstance(); |
| |
| Assert.assertTrue(manager.setPackingPlan(packingPlan, TOPOLOGY_NAME).get()); |
| |
| assertWriteToFile( |
| String.format("%s/%s/%s", ROOT_ADDR, "packingplans", TOPOLOGY_NAME), |
| packingPlan.toByteArray(), true); |
| } |
| |
| /** |
| * Method: deleteExecutionState() |
| */ |
| @Test |
| public void testDeleteExecutionState() throws Exception { |
| initMocks(); |
| |
| Assert.assertTrue(manager.deleteExecutionState(TOPOLOGY_NAME).get()); |
| |
| assertDeleteFile(String.format("%s/%s/%s", ROOT_ADDR, "executionstate", TOPOLOGY_NAME)); |
| } |
| |
| /** |
| * Method: deleteTopology() |
| */ |
| @Test |
| public void testDeleteTopology() throws Exception { |
| initMocks(); |
| |
| Assert.assertTrue(manager.deleteTopology(TOPOLOGY_NAME).get()); |
| |
| assertDeleteFile(String.format("%s/%s/%s", ROOT_ADDR, "topologies", TOPOLOGY_NAME)); |
| } |
| |
| @Test |
| public void testDeletePackingPlan() throws Exception { |
| initMocks(); |
| |
| Assert.assertTrue(manager.deletePackingPlan(TOPOLOGY_NAME).get()); |
| |
| assertDeleteFile(String.format("%s/%s/%s", ROOT_ADDR, "packingplans", TOPOLOGY_NAME)); |
| } |
| |
| @Test |
| public void testGetLock() throws Exception { |
| initMocks(); |
| String expectedLockPath = String.format("//locks/%s__%s", TOPOLOGY_NAME, LOCK_NAME); |
| byte[] expectedContents = Thread.currentThread().getName().getBytes(Charset.defaultCharset()); |
| |
| Lock lock = manager.getLock(TOPOLOGY_NAME, LOCK_NAME); |
| |
| Assert.assertTrue(lock.tryLock(0, TimeUnit.MILLISECONDS)); |
| assertWriteToFile(expectedLockPath, expectedContents, false); |
| |
| lock.unlock(); |
| assertDeleteFile(expectedLockPath); |
| } |
| |
| @Test |
| public void testGetFilesystemLock() throws Exception { |
| Path tempDir = Files.createTempDirectory("heron-testGetFilesystemLock"); |
| LocalFileSystemStateManager fsBackedManager = initMockManager(tempDir.toString(), true); |
| Lock lock = fsBackedManager.getLock(TOPOLOGY_NAME, LOCK_NAME); |
| Assert.assertTrue("Failed to get lock", lock.tryLock(0, TimeUnit.MILLISECONDS)); |
| lock.unlock(); |
| } |
| |
| @Test |
| public void testLockTaken() throws Exception { |
| String expectedLockPath = String.format("//locks/%s__%s", TOPOLOGY_NAME, LOCK_NAME); |
| byte[] expectedContents = Thread.currentThread().getName().getBytes(Charset.defaultCharset()); |
| |
| PowerMockito.spy(FileUtils.class); |
| PowerMockito.doReturn(false) |
| .when(FileUtils.class, "writeToFile", anyString(), any(byte[].class), anyBoolean()); |
| |
| Lock lock = manager.getLock(TOPOLOGY_NAME, LOCK_NAME); |
| |
| Assert.assertFalse(lock.tryLock(0, TimeUnit.MILLISECONDS)); |
| assertWriteToFile(expectedLockPath, expectedContents, false); |
| } |
| |
| private static void assertWriteToFile(String path, byte[] bytes, boolean overwrite) { |
| PowerMockito.verifyStatic(times(1)); |
| FileUtils.writeToFile(eq(path), eq(bytes), eq(overwrite)); |
| } |
| |
| private static void assertDeleteFile(String path) { |
| PowerMockito.verifyStatic(times(1)); |
| FileUtils.deleteFile(eq(path)); |
| } |
| } |