blob: 722adb1cd2b97cbc9357afc33c75b8473ea49f52 [file] [log] [blame]
// Copyright 2016 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.statemgr.localfs;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ListenableFuture;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.twitter.heron.api.generated.TopologyAPI;
import com.twitter.heron.common.basics.FileUtils;
import com.twitter.heron.proto.scheduler.Scheduler;
import com.twitter.heron.proto.system.ExecutionEnvironment;
import com.twitter.heron.proto.system.PackingPlans;
import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.common.Keys;
import com.twitter.heron.spi.statemgr.Lock;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
/**
* LocalFileSystemStateManager Tester.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(FileUtils.class)
public class LocalFileSystemStateManagerTest {
private static final String TOPOLOGY_NAME = "topologyName";
private static final String LOCK_NAME = "lockName";
private static final String ROOT_ADDR = "/";
private LocalFileSystemStateManager manager;
@Before
public void before() throws Exception {
manager = initMockManager(ROOT_ADDR, false);
}
private static LocalFileSystemStateManager initMockManager(String rootPath, boolean initTree) {
Config config = Config.newBuilder()
.put(Keys.stateManagerRootPath(), rootPath)
.put(LocalFileSystemKeys.initializeFileTree(), initTree)
.build();
LocalFileSystemStateManager manager = Mockito.spy(new LocalFileSystemStateManager());
manager.initialize(config);
return manager;
}
@After
public void after() throws Exception {
}
private void initMocks() throws Exception {
PowerMockito.spy(FileUtils.class);
PowerMockito.doReturn(true).when(FileUtils.class, "createDirectory", anyString());
Assert.assertTrue(manager.initTree());
PowerMockito.doReturn(true)
.when(FileUtils.class, "writeToFile", anyString(), any(byte[].class), anyBoolean());
PowerMockito.doReturn(true).when(FileUtils.class, "deleteFile", anyString());
}
@Test
public void testInitialize() throws Exception {
initMocks();
PowerMockito.verifyStatic(Mockito.atLeastOnce());
FileUtils.createDirectory(anyString());
}
/**
* Method: getSchedulerLocation(String topologyName, WatchCallback watcher)
*/
@Test
public void testGetSchedulerLocation() throws Exception {
Scheduler.SchedulerLocation location = Scheduler.SchedulerLocation.newBuilder()
.setHttpEndpoint("host:1")
.setTopologyName(TOPOLOGY_NAME)
.build();
PowerMockito.spy(FileUtils.class);
PowerMockito.doReturn(true).when(FileUtils.class, "isFileExists", anyString());
PowerMockito.doReturn(location.toByteArray()).
when(FileUtils.class, "readFromFile", anyString());
Scheduler.SchedulerLocation locationFetched =
manager.getSchedulerLocation(null, TOPOLOGY_NAME).get();
Assert.assertEquals(location, locationFetched);
}
/**
* Method: setSchedulerLocation(Scheduler.SchedulerLocation location, String topologyName)
*/
@Test
public void testSetSchedulerLocation() throws Exception {
Mockito.doReturn(Mockito.mock(ListenableFuture.class)).when(manager).
setData(anyString(), any(byte[].class), anyBoolean());
manager.setSchedulerLocation(Scheduler.SchedulerLocation.getDefaultInstance(), "");
Mockito.verify(manager).
setData(anyString(), any(byte[].class), eq(true));
}
/**
* Method: setExecutionState(ExecutionEnvironment.ExecutionState executionState)
*/
@Test
public void testSetExecutionState() throws Exception {
initMocks();
ExecutionEnvironment.ExecutionState defaultState =
ExecutionEnvironment.ExecutionState.getDefaultInstance();
Assert.assertTrue(manager.setExecutionState(defaultState, "").get());
assertWriteToFile(
String.format("%s/%s/%s", ROOT_ADDR, "executionstate", defaultState.getTopologyName()),
defaultState.toByteArray(), false);
}
/**
* Method: setTopology(TopologyAPI.Topology topology)
*/
@Test
public void testSetTopology() throws Exception {
initMocks();
TopologyAPI.Topology topology = TopologyAPI.Topology.getDefaultInstance();
Assert.assertTrue(manager.setTopology(topology, TOPOLOGY_NAME).get());
assertWriteToFile(
String.format("%s/%s/%s", ROOT_ADDR, "topologies", TOPOLOGY_NAME),
topology.toByteArray(), false);
}
@Test
public void testSetPackingPlan() throws Exception {
initMocks();
PackingPlans.PackingPlan packingPlan = PackingPlans.PackingPlan.getDefaultInstance();
Assert.assertTrue(manager.setPackingPlan(packingPlan, TOPOLOGY_NAME).get());
assertWriteToFile(
String.format("%s/%s/%s", ROOT_ADDR, "packingplans", TOPOLOGY_NAME),
packingPlan.toByteArray(), true);
}
/**
* Method: deleteExecutionState()
*/
@Test
public void testDeleteExecutionState() throws Exception {
initMocks();
Assert.assertTrue(manager.deleteExecutionState(TOPOLOGY_NAME).get());
assertDeleteFile(String.format("%s/%s/%s", ROOT_ADDR, "executionstate", TOPOLOGY_NAME));
}
/**
* Method: deleteTopology()
*/
@Test
public void testDeleteTopology() throws Exception {
initMocks();
Assert.assertTrue(manager.deleteTopology(TOPOLOGY_NAME).get());
assertDeleteFile(String.format("%s/%s/%s", ROOT_ADDR, "topologies", TOPOLOGY_NAME));
}
@Test
public void testDeletePackingPlan() throws Exception {
initMocks();
Assert.assertTrue(manager.deletePackingPlan(TOPOLOGY_NAME).get());
assertDeleteFile(String.format("%s/%s/%s", ROOT_ADDR, "packingplans", TOPOLOGY_NAME));
}
@Test
public void testGetLock() throws Exception {
initMocks();
String expectedLockPath = String.format("//locks/%s__%s", TOPOLOGY_NAME, LOCK_NAME);
byte[] expectedContents = Thread.currentThread().getName().getBytes(Charset.defaultCharset());
Lock lock = manager.getLock(TOPOLOGY_NAME, LOCK_NAME);
Assert.assertTrue(lock.tryLock(0, TimeUnit.MILLISECONDS));
assertWriteToFile(expectedLockPath, expectedContents, false);
lock.unlock();
assertDeleteFile(expectedLockPath);
}
@Test
public void testGetFilesystemLock() throws Exception {
Path tempDir = Files.createTempDirectory("heron-testGetFilesystemLock");
LocalFileSystemStateManager fsBackedManager = initMockManager(tempDir.toString(), true);
Lock lock = fsBackedManager.getLock(TOPOLOGY_NAME, LOCK_NAME);
Assert.assertTrue("Failed to get lock", lock.tryLock(0, TimeUnit.MILLISECONDS));
lock.unlock();
}
@Test
public void testLockTaken() throws Exception {
String expectedLockPath = String.format("//locks/%s__%s", TOPOLOGY_NAME, LOCK_NAME);
byte[] expectedContents = Thread.currentThread().getName().getBytes(Charset.defaultCharset());
PowerMockito.spy(FileUtils.class);
PowerMockito.doReturn(false)
.when(FileUtils.class, "writeToFile", anyString(), any(byte[].class), anyBoolean());
Lock lock = manager.getLock(TOPOLOGY_NAME, LOCK_NAME);
Assert.assertFalse(lock.tryLock(0, TimeUnit.MILLISECONDS));
assertWriteToFile(expectedLockPath, expectedContents, false);
}
private static void assertWriteToFile(String path, byte[] bytes, boolean overwrite) {
PowerMockito.verifyStatic(times(1));
FileUtils.writeToFile(eq(path), eq(bytes), eq(overwrite));
}
private static void assertDeleteFile(String path) {
PowerMockito.verifyStatic(times(1));
FileUtils.deleteFile(eq(path));
}
}