STORM-3504: changed AsyncLocalizerTest to not mock AdvancedFSOps (#3126)
* STORM-3504: changed AsyncLocalizerTest to not mock AdvancedFSOps
diff --git a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
index 550c808..d3c1d74 100644
--- a/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/localizer/AsyncLocalizerTest.java
@@ -18,7 +18,6 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -28,13 +27,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
+
import org.apache.commons.io.IOUtils;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
@@ -53,13 +51,12 @@
import org.apache.storm.generated.SettableBlobMeta;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.security.auth.DefaultPrincipalToLocal;
+import org.apache.storm.testing.TmpPath;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
@@ -86,68 +83,66 @@
public class AsyncLocalizerTest {
private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizerTest.class);
+
private final String user1 = "user1";
private final String user2 = "user2";
private final String user3 = "user3";
- //From LocalizerTest
- private File baseDir;
- private ClientBlobStore mockblobstore = mock(ClientBlobStore.class);
- private static String getTestLocalizerRoot() {
- File f = new File("./target/" + Thread.currentThread().getStackTrace()[2].getMethodName() + "/localizer/");
- f.deleteOnExit();
- return f.getPath();
- }
+ private ClientBlobStore mockBlobStore = mock(ClientBlobStore.class);
@Test
public void testRequestDownloadBaseTopologyBlobs() throws Exception {
- final String topoId = "TOPO";
- final String user = "user";
- LocalAssignment la = new LocalAssignment();
- la.set_topology_id(topoId);
- la.set_owner(user);
- ExecutorInfo ei = new ExecutorInfo();
- ei.set_task_start(1);
- ei.set_task_end(1);
- la.add_to_executors(ei);
- final int port = 8080;
- final String stormLocal = "./target/DOWNLOAD-TEST/storm-local/";
- ClientBlobStore blobStore = mock(ClientBlobStore.class);
- Map<String, Object> conf = new HashMap<>();
- conf.put(DaemonConfig.SUPERVISOR_BLOBSTORE, ClientBlobStore.class.getName());
- conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName());
- conf.put(Config.STORM_CLUSTER_MODE, "distributed");
- conf.put(Config.STORM_LOCAL_DIR, stormLocal);
- AdvancedFSOps ops = mock(AdvancedFSOps.class);
- ReflectionUtils mockedRU = mock(ReflectionUtils.class);
- ServerUtils mockedU = mock(ServerUtils.class);
+ ReflectionUtils mockedReflectionUtils = mock(ReflectionUtils.class);
+ ServerUtils mockedServerUtils = mock(ServerUtils.class);
- AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, getTestLocalizerRoot(), new StormMetricsRegistry()));
- LocallyCachedTopologyBlob jarBlob = mock(LocallyCachedTopologyBlob.class);
- doReturn(jarBlob).when(bl).getTopoJar(topoId, la.get_owner());
- when(jarBlob.getLocalVersion()).thenReturn(-1L);
- when(jarBlob.getRemoteVersion(any())).thenReturn(100L);
- when(jarBlob.fetchUnzipToTemp(any())).thenReturn(100L);
+ ReflectionUtils previousReflectionUtils = ReflectionUtils.setInstance(mockedReflectionUtils);
+ ServerUtils previousServerUtils = ServerUtils.setInstance(mockedServerUtils);
- LocallyCachedTopologyBlob codeBlob = mock(LocallyCachedTopologyBlob.class);
- doReturn(codeBlob).when(bl).getTopoCode(topoId, la.get_owner());
- when(codeBlob.getLocalVersion()).thenReturn(-1L);
- when(codeBlob.getRemoteVersion(any())).thenReturn(200L);
- when(codeBlob.fetchUnzipToTemp(any())).thenReturn(200L);
+ // cannot use automatic resource management here in this try because the AsyncLocalizer depends on a config map,
+ // which should take the storm local dir, and that storm local dir is declared in the try-with-resources.
+ AsyncLocalizer victim = null;
- LocallyCachedTopologyBlob confBlob = mock(LocallyCachedTopologyBlob.class);
- doReturn(confBlob).when(bl).getTopoConf(topoId, la.get_owner());
- when(confBlob.getLocalVersion()).thenReturn(-1L);
- when(confBlob.getRemoteVersion(any())).thenReturn(300L);
- when(confBlob.fetchUnzipToTemp(any())).thenReturn(300L);
+ try (TmpPath stormRoot = new TmpPath(); TmpPath localizerRoot = new TmpPath()) {
- ReflectionUtils origRU = ReflectionUtils.setInstance(mockedRU);
- ServerUtils origUtils = ServerUtils.setInstance(mockedU);
- try {
- when(mockedRU.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(DaemonConfig.SUPERVISOR_BLOBSTORE, ClientBlobStore.class.getName());
+ conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, DefaultPrincipalToLocal.class.getName());
+ conf.put(Config.STORM_CLUSTER_MODE, "distributed");
+ conf.put(Config.STORM_LOCAL_DIR, stormRoot.getPath());
- PortAndAssignment pna = new PortAndAssignmentImpl(port, la);
- Future<Void> f = bl.requestDownloadBaseTopologyBlobs(pna, null);
+ AdvancedFSOps ops = AdvancedFSOps.make(conf);
+
+ victim = spy(new AsyncLocalizer(conf, ops, localizerRoot.getPath(), new StormMetricsRegistry()));
+
+ final String topoId = "TOPO";
+
+ final LocalAssignment localAssignment = constructLocalAssignment(topoId, "user");
+ final int port = 8080;
+
+ ClientBlobStore blobStore = mock(ClientBlobStore.class);
+
+ LocallyCachedTopologyBlob jarBlob = mock(LocallyCachedTopologyBlob.class);
+ doReturn(jarBlob).when(victim).getTopoJar(topoId, localAssignment.get_owner());
+ when(jarBlob.getLocalVersion()).thenReturn(-1L);
+ when(jarBlob.getRemoteVersion(any())).thenReturn(100L);
+ when(jarBlob.fetchUnzipToTemp(any())).thenReturn(100L);
+
+ LocallyCachedTopologyBlob codeBlob = mock(LocallyCachedTopologyBlob.class);
+ doReturn(codeBlob).when(victim).getTopoCode(topoId, localAssignment.get_owner());
+ when(codeBlob.getLocalVersion()).thenReturn(-1L);
+ when(codeBlob.getRemoteVersion(any())).thenReturn(200L);
+ when(codeBlob.fetchUnzipToTemp(any())).thenReturn(200L);
+
+ LocallyCachedTopologyBlob confBlob = mock(LocallyCachedTopologyBlob.class);
+ doReturn(confBlob).when(victim).getTopoConf(topoId, localAssignment.get_owner());
+ when(confBlob.getLocalVersion()).thenReturn(-1L);
+ when(confBlob.getRemoteVersion(any())).thenReturn(300L);
+ when(confBlob.fetchUnzipToTemp(any())).thenReturn(300L);
+
+ when(mockedReflectionUtils.newInstanceImpl(ClientBlobStore.class)).thenReturn(blobStore);
+
+ PortAndAssignment pna = new PortAndAssignmentImpl(port, localAssignment);
+ Future<Void> f = victim.requestDownloadBaseTopologyBlobs(pna, null);
f.get(20, TimeUnit.SECONDS);
verify(jarBlob).fetchUnzipToTemp(any());
@@ -161,92 +156,88 @@
verify(confBlob).fetchUnzipToTemp(any());
verify(confBlob).informReferencesAndCommitNewVersion(300L);
verify(confBlob).cleanupOrphanedData();
+
} finally {
- bl.close();
- ReflectionUtils.setInstance(origRU);
- ServerUtils.setInstance(origUtils);
+ ReflectionUtils.setInstance(previousReflectionUtils);
+ ServerUtils.setInstance(previousServerUtils);
+
+ if (victim != null) {
+ victim.close();
+ }
}
}
@Test
public void testRequestDownloadTopologyBlobs() throws Exception {
- final String topoId = "TOPO-12345";
- final String user = "user";
- LocalAssignment la = new LocalAssignment();
- la.set_topology_id(topoId);
- la.set_owner(user);
- ExecutorInfo ei = new ExecutorInfo();
- ei.set_task_start(1);
- ei.set_task_end(1);
- la.add_to_executors(ei);
- final String topoName = "TOPO";
- final int port = 8080;
- final String simpleLocalName = "simple.txt";
- final String simpleKey = "simple";
+ ConfigUtils mockedConfigUtils = mock(ConfigUtils.class);
+ ConfigUtils previousConfigUtils = ConfigUtils.setInstance(mockedConfigUtils);
- final String stormLocal = "/tmp/storm-local/";
- final File userDir = new File(stormLocal, user);
- final String stormRoot = stormLocal + topoId + "/";
+ AsyncLocalizer victim = null;
- final String localizerRoot = getTestLocalizerRoot();
- final String simpleCurrentLocalFile = localizerRoot + "/usercache/" + user + "/filecache/files/simple.current";
+ try (TmpPath stormLocal = new TmpPath(); TmpPath localizerRoot = new TmpPath()) {
- final StormTopology st = new StormTopology();
- st.set_spouts(new HashMap<>());
- st.set_bolts(new HashMap<>());
- st.set_state_spouts(new HashMap<>());
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.STORM_LOCAL_DIR, stormLocal.getPath());
- Map<String, Map<String, Object>> topoBlobMap = new HashMap<>();
- Map<String, Object> simple = new HashMap<>();
- simple.put("localname", simpleLocalName);
- simple.put("uncompress", false);
- topoBlobMap.put(simpleKey, simple);
+ AdvancedFSOps ops = AdvancedFSOps.make(conf);
+ StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
- Map<String, Object> conf = new HashMap<>();
- conf.put(Config.STORM_LOCAL_DIR, stormLocal);
- AdvancedFSOps ops = mock(AdvancedFSOps.class);
- ConfigUtils mockedCU = mock(ConfigUtils.class);
+ victim = spy(new AsyncLocalizer(conf, ops, localizerRoot.getPath(), metricsRegistry));
- Map<String, Object> topoConf = new HashMap<>(conf);
- topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap);
- topoConf.put(Config.TOPOLOGY_NAME, topoName);
+ final String topoId = "TOPO-12345";
+ final String user = "user";
- List<LocalizedResource> localizedList = new ArrayList<>();
- StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
- LocalizedResource simpleLocal = new LocalizedResource(simpleKey, Paths.get(localizerRoot), false, ops, conf, user, metricsRegistry);
- localizedList.add(simpleLocal);
+ final Path userDir = Paths.get(stormLocal.getPath(), user);
+ final Path topologyDirRoot = Paths.get(stormLocal.getPath(), topoId);
- AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, localizerRoot, metricsRegistry));
- ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
- try {
- when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot);
- when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
- when(mockedCU.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(st);
+ final String simpleLocalName = "simple.txt";
+ final String simpleKey = "simple";
+ Map<String, Map<String, Object>> topoBlobMap = new HashMap<>();
+ Map<String, Object> simple = new HashMap<>();
+ simple.put("localname", simpleLocalName);
+ simple.put("uncompress", false);
+ topoBlobMap.put(simpleKey, simple);
+
+ final int port = 8080;
+
+ Map<String, Object> topoConf = new HashMap<>(conf);
+ topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap);
+ topoConf.put(Config.TOPOLOGY_NAME, "TOPO");
+
+ List<LocalizedResource> localizedList = new ArrayList<>();
+ LocalizedResource simpleLocal = new LocalizedResource(simpleKey, localizerRoot.getFile().toPath(), false, ops, conf, user, metricsRegistry);
+ localizedList.add(simpleLocal);
+
+ when(mockedConfigUtils.supervisorStormDistRootImpl(conf, topoId)).thenReturn(topologyDirRoot.toString());
+ when(mockedConfigUtils.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
+ when(mockedConfigUtils.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(constructEmptyStormTopology());
//Write the mocking backwards so the actual method is not called on the spy object
- doReturn(CompletableFuture.supplyAsync(() -> null)).when(bl)
- .requestDownloadBaseTopologyBlobs(any(), eq(null));
- doReturn(userDir).when(bl).getLocalUserFileCacheDir(user);
- doReturn(localizedList).when(bl).getBlobs(any(List.class), any(), any());
+ doReturn(CompletableFuture.supplyAsync(() -> null)).when(victim)
+ .requestDownloadBaseTopologyBlobs(any(), eq(null));
- Future<Void> f = bl.requestDownloadTopologyBlobs(la, port, null);
+ Files.createDirectories(topologyDirRoot);
+
+ doReturn(userDir.toFile()).when(victim).getLocalUserFileCacheDir(user);
+ doReturn(localizedList).when(victim).getBlobs(any(List.class), any(), any());
+
+ Future<Void> f = victim.requestDownloadTopologyBlobs(constructLocalAssignment(topoId, user), port, null);
f.get(20, TimeUnit.SECONDS);
// We should be done now...
- verify(bl).getLocalUserFileCacheDir(user);
+ verify(victim).getLocalUserFileCacheDir(user);
- verify(ops).fileExists(userDir);
- verify(ops).forceMkdir(userDir);
+ assertTrue(ops.fileExists(userDir));
- verify(bl).getBlobs(any(List.class), any(), any());
+ verify(victim).getBlobs(any(List.class), any(), any());
- verify(ops).createSymlink(new File(stormRoot, simpleLocalName), new File(simpleCurrentLocalFile));
+ // symlink was created
+ assertTrue(Files.isSymbolicLink(topologyDirRoot.resolve(simpleLocalName)));
+
} finally {
- try {
- ConfigUtils.setInstance(orig);
- bl.close();
- } catch (Throwable e) {
- LOG.error("ERROR trying to close an object", e);
+ ConfigUtils.setInstance(previousConfigUtils);
+ if (victim != null) {
+ victim.close();
}
}
}
@@ -255,226 +246,220 @@
@Test
public void testRequestDownloadTopologyBlobsLocalMode() throws Exception {
// tests download of topology blobs in local mode on a topology without resources folder
- final String topoId = "TOPO-12345";
- final String user = "user";
- LocalAssignment la = new LocalAssignment();
- la.set_topology_id(topoId);
- la.set_owner(user);
- ExecutorInfo ei = new ExecutorInfo();
- ei.set_task_start(1);
- ei.set_task_end(1);
- la.add_to_executors(ei);
- final String topoName = "TOPO";
- final int port = 8080;
- final String simpleLocalName = "simple.txt";
- final String simpleKey = "simple";
- final String stormLocal = "/tmp/storm-local/";
- final File userDir = new File(stormLocal, user);
- final String stormRoot = stormLocal + topoId + "/";
+ ConfigUtils mockedConfigUtils = mock(ConfigUtils.class);
+ ServerUtils mockedServerUtils = mock(ServerUtils.class);
- final String localizerRoot = getTestLocalizerRoot();
+ ConfigUtils previousConfigUtils = ConfigUtils.setInstance(mockedConfigUtils);
+ ServerUtils previousServerUtils = ServerUtils.setInstance(mockedServerUtils);
- final StormTopology st = new StormTopology();
- st.set_spouts(new HashMap<>());
- st.set_bolts(new HashMap<>());
- st.set_state_spouts(new HashMap<>());
+ AsyncLocalizer victim = null;
- Map<String, Map<String, Object>> topoBlobMap = new HashMap<>();
- Map<String, Object> simple = new HashMap<>();
- simple.put("localname", simpleLocalName);
- simple.put("uncompress", false);
- topoBlobMap.put(simpleKey, simple);
+ try (TmpPath stormLocal = new TmpPath(); TmpPath localizerRoot = new TmpPath()) {
- Map<String, Object> conf = new HashMap<>();
- conf.put(Config.STORM_LOCAL_DIR, stormLocal);
- conf.put(Config.STORM_CLUSTER_MODE, "local");
- AdvancedFSOps ops = mock(AdvancedFSOps.class);
- ConfigUtils mockedCU = mock(ConfigUtils.class);
- ServerUtils mockedSU = mock(ServerUtils.class);
+ Map<String, Object> conf = new HashMap<>();
+ conf.put(Config.STORM_LOCAL_DIR, stormLocal.getPath());
+ conf.put(Config.STORM_CLUSTER_MODE, "local");
- Map<String, Object> topoConf = new HashMap<>(conf);
- topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap);
- topoConf.put(Config.TOPOLOGY_NAME, topoName);
+ StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
- List<LocalizedResource> localizedList = new ArrayList<>();
- StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
- LocalizedResource simpleLocal = new LocalizedResource(simpleKey, Paths.get(localizerRoot), false, ops, conf, user, metricsRegistry);
- localizedList.add(simpleLocal);
+ AdvancedFSOps ops = AdvancedFSOps.make(conf);
- AsyncLocalizer bl = spy(new AsyncLocalizer(conf, ops, localizerRoot, metricsRegistry));
- ConfigUtils orig = ConfigUtils.setInstance(mockedCU);
- ServerUtils origSU = ServerUtils.setInstance(mockedSU);
+ victim = spy(new AsyncLocalizer(conf, ops, localizerRoot.getPath(), metricsRegistry));
- try {
- when(mockedCU.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot);
- when(mockedCU.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
- when(mockedCU.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(st);
+ final String topoId = "TOPO-12345";
+ final String user = "user";
- doReturn(mockblobstore).when(bl).getClientBlobStore();
- doReturn(userDir).when(bl).getLocalUserFileCacheDir(user);
- doReturn(localizedList).when(bl).getBlobs(any(List.class), any(), any());
- doReturn(mock(OutputStream.class)).when(ops).getOutputStream(any());
+ final int port = 8080;
+
+ final Path userDir = Paths.get(stormLocal.getPath(), user);
+ final Path stormRoot = Paths.get(stormLocal.getPath(), topoId);
+
+ final String simpleLocalName = "simple.txt";
+ final String simpleKey = "simple";
+ Map<String, Map<String, Object>> topoBlobMap = new HashMap<>();
+ Map<String, Object> simple = new HashMap<>();
+ simple.put("localname", simpleLocalName);
+ simple.put("uncompress", false);
+ topoBlobMap.put(simpleKey, simple);
+
+
+ Map<String, Object> topoConf = new HashMap<>(conf);
+ topoConf.put(Config.TOPOLOGY_BLOBSTORE_MAP, topoBlobMap);
+ topoConf.put(Config.TOPOLOGY_NAME, "TOPO");
+
+ List<LocalizedResource> localizedList = new ArrayList<>();
+ LocalizedResource simpleLocal = new LocalizedResource(simpleKey, localizerRoot.getFile().toPath(), false, ops, conf, user, metricsRegistry);
+ localizedList.add(simpleLocal);
+
+ when(mockedConfigUtils.supervisorStormDistRootImpl(conf, topoId)).thenReturn(stormRoot.toString());
+ when(mockedConfigUtils.readSupervisorStormConfImpl(conf, topoId)).thenReturn(topoConf);
+ when(mockedConfigUtils.readSupervisorTopologyImpl(conf, topoId, ops)).thenReturn(constructEmptyStormTopology());
+
+ doReturn(mockBlobStore).when(victim).getClientBlobStore();
+ doReturn(userDir.toFile()).when(victim).getLocalUserFileCacheDir(user);
+ doReturn(localizedList).when(victim).getBlobs(any(List.class), any(), any());
ReadableBlobMeta blobMeta = new ReadableBlobMeta();
blobMeta.set_version(1);
- doReturn(blobMeta).when(mockblobstore).getBlobMeta(any());
- when(mockblobstore.getBlob(any())).thenAnswer(invocation -> new TestInputStreamWithMeta(LOCAL_MODE_JAR_VERSION));
+ doReturn(blobMeta).when(mockBlobStore).getBlobMeta(any());
+ when(mockBlobStore.getBlob(any())).thenAnswer(invocation -> new TestInputStreamWithMeta(LOCAL_MODE_JAR_VERSION));
- Future<Void> f = bl.requestDownloadTopologyBlobs(la, port, null);
+ Future<Void> f = victim.requestDownloadTopologyBlobs(constructLocalAssignment(topoId, user), port, null);
f.get(20, TimeUnit.SECONDS);
- verify(bl).getLocalUserFileCacheDir(user);
+ verify(victim).getLocalUserFileCacheDir(user);
- verify(ops).fileExists(userDir);
- verify(ops).forceMkdir(userDir);
+ assertTrue(ops.fileExists(userDir));
- verify(bl).getBlobs(any(List.class), any(), any());
+ verify(victim).getBlobs(any(List.class), any(), any());
- Path extractionDir = Paths.get(stormRoot,
- LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR.getTempExtractionDir(LOCAL_MODE_JAR_VERSION));
-
- // make sure resources dir is created.
- verify(ops).forceMkdir(extractionDir);
+ // make sure resources directory after blob version commit is created.
+ Path extractionDir = stormRoot.resolve(LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR.getExtractionDir());
+ assertTrue(ops.fileExists(extractionDir));
} finally {
- try {
- ConfigUtils.setInstance(orig);
- ServerUtils.setInstance(origSU);
- bl.close();
- } catch (Throwable e) {
- LOG.error("ERROR trying to close an object", e);
+ ConfigUtils.setInstance(previousConfigUtils);
+ ServerUtils.setInstance(previousServerUtils);
+
+ if (victim != null) {
+ victim.close();
}
}
}
- @Before
- public void setUp() throws Exception {
- baseDir = new File(System.getProperty("java.io.tmpdir") + "/blob-store-localizer-test-" + UUID.randomUUID());
- if (!baseDir.mkdir()) {
- throw new IOException("failed to create base directory");
- }
+ private LocalAssignment constructLocalAssignment(String topoId, String owner) {
+ return constructLocalAssignment(topoId, owner,
+ Collections.singletonList(new ExecutorInfo(1, 1))
+ );
}
- @After
- public void tearDown() throws Exception {
- try {
- FileUtils.deleteDirectory(baseDir);
- } catch (IOException ignore) {
- }
+ private LocalAssignment constructLocalAssignment(String topoId, String owner, List<ExecutorInfo> executorInfos) {
+ LocalAssignment assignment = new LocalAssignment(topoId, executorInfos);
+ assignment.set_owner(owner);
+ return assignment;
}
- ;
+ private StormTopology constructEmptyStormTopology() {
+ StormTopology topology = new StormTopology();
+ topology.set_spouts(new HashMap<>());
+ topology.set_bolts(new HashMap<>());
+ topology.set_state_spouts(new HashMap<>());
+ return topology;
+ }
- protected String joinPath(String... pathList) {
+ private String joinPath(String... pathList) {
return Joiner.on(File.separator).join(pathList);
}
- public String constructUserCacheDir(String base, String user) {
+ private String constructUserCacheDir(String base, String user) {
return joinPath(base, USERCACHE, user);
}
- public String constructExpectedFilesDir(String base, String user) {
+ private String constructExpectedFilesDir(String base, String user) {
return joinPath(constructUserCacheDir(base, user), LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
}
- public String constructExpectedArchivesDir(String base, String user) {
+ private String constructExpectedArchivesDir(String base, String user) {
return joinPath(constructUserCacheDir(base, user), LocalizedResource.FILECACHE, LocalizedResource.ARCHIVESDIR);
}
@Test
public void testDirPaths() throws Exception {
- Map<String, Object> conf = new HashMap();
- AsyncLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ try (TmpPath tmp = new TmpPath()) {
+ Map<String, Object> conf = new HashMap();
+ AsyncLocalizer localizer = new TestLocalizer(conf, tmp.getPath());
- String expectedDir = constructUserCacheDir(baseDir.toString(), user1);
- assertEquals("get local user dir doesn't return right value",
- expectedDir, localizer.getLocalUserDir(user1).toString());
+ String expectedDir = constructUserCacheDir(tmp.getPath(), user1);
+ assertEquals("get local user dir doesn't return right value",
+ expectedDir, localizer.getLocalUserDir(user1).toString());
- String expectedFileDir = joinPath(expectedDir, LocalizedResource.FILECACHE);
- assertEquals("get local user file dir doesn't return right value",
- expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString());
+ String expectedFileDir = joinPath(expectedDir, LocalizedResource.FILECACHE);
+ assertEquals("get local user file dir doesn't return right value",
+ expectedFileDir, localizer.getLocalUserFileCacheDir(user1).toString());
+ }
}
@Test
public void testReconstruct() throws Exception {
- Map<String, Object> conf = new HashMap<>();
+ try (TmpPath tmp = new TmpPath()){
+ Map<String, Object> conf = new HashMap<>();
- String expectedFileDir1 = constructExpectedFilesDir(baseDir.toString(), user1);
- String expectedArchiveDir1 = constructExpectedArchivesDir(baseDir.toString(), user1);
- String expectedFileDir2 = constructExpectedFilesDir(baseDir.toString(), user2);
- String expectedArchiveDir2 = constructExpectedArchivesDir(baseDir.toString(), user2);
+ String expectedFileDir1 = constructExpectedFilesDir(tmp.getPath(), user1);
+ String expectedArchiveDir1 = constructExpectedArchivesDir(tmp.getPath(), user1);
+ String expectedFileDir2 = constructExpectedFilesDir(tmp.getPath(), user2);
+ String expectedArchiveDir2 = constructExpectedArchivesDir(tmp.getPath(), user2);
- String key1 = "testfile1.txt";
- String key2 = "testfile2.txt";
- String key3 = "testfile3.txt";
- String key4 = "testfile4.txt";
+ String key1 = "testfile1.txt";
+ String key2 = "testfile2.txt";
+ String key3 = "testfile3.txt";
+ String key4 = "testfile4.txt";
- String archive1 = "archive1";
- String archive2 = "archive2";
+ String archive1 = "archive1";
+ String archive2 = "archive2";
- File user1file1 = new File(expectedFileDir1, key1 + LocalizedResource.CURRENT_BLOB_SUFFIX);
- File user1file2 = new File(expectedFileDir1, key2 + LocalizedResource.CURRENT_BLOB_SUFFIX);
- File user2file3 = new File(expectedFileDir2, key3 + LocalizedResource.CURRENT_BLOB_SUFFIX);
- File user2file4 = new File(expectedFileDir2, key4 + LocalizedResource.CURRENT_BLOB_SUFFIX);
+ File user1file1 = new File(expectedFileDir1, key1 + LocalizedResource.CURRENT_BLOB_SUFFIX);
+ File user1file2 = new File(expectedFileDir1, key2 + LocalizedResource.CURRENT_BLOB_SUFFIX);
+ File user2file3 = new File(expectedFileDir2, key3 + LocalizedResource.CURRENT_BLOB_SUFFIX);
+ File user2file4 = new File(expectedFileDir2, key4 + LocalizedResource.CURRENT_BLOB_SUFFIX);
- File user1archive1 = new File(expectedArchiveDir1, archive1 + LocalizedResource.CURRENT_BLOB_SUFFIX);
- File user2archive2 = new File(expectedArchiveDir2, archive2 + LocalizedResource.CURRENT_BLOB_SUFFIX);
- File user1archive1file = new File(user1archive1, "file1");
- File user2archive2file = new File(user2archive2, "file2");
+ File user1archive1 = new File(expectedArchiveDir1, archive1 + LocalizedResource.CURRENT_BLOB_SUFFIX);
+ File user2archive2 = new File(expectedArchiveDir2, archive2 + LocalizedResource.CURRENT_BLOB_SUFFIX);
+ File user1archive1file = new File(user1archive1, "file1");
+ File user2archive2file = new File(user2archive2, "file2");
- // setup some files/dirs to emulate supervisor restart
- assertTrue("Failed setup filecache dir1", new File(expectedFileDir1).mkdirs());
- assertTrue("Failed setup filecache dir2", new File(expectedFileDir2).mkdirs());
- assertTrue("Failed setup file1", user1file1.createNewFile());
- assertTrue("Failed setup file2", user1file2.createNewFile());
- assertTrue("Failed setup file3", user2file3.createNewFile());
- assertTrue("Failed setup file4", user2file4.createNewFile());
- assertTrue("Failed setup archive dir1", user1archive1.mkdirs());
- assertTrue("Failed setup archive dir2", user2archive2.mkdirs());
- assertTrue("Failed setup file in archivedir1", user1archive1file.createNewFile());
- assertTrue("Failed setup file in archivedir2", user2archive2file.createNewFile());
+ // setup some files/dirs to emulate supervisor restart
+ assertTrue("Failed setup filecache dir1", new File(expectedFileDir1).mkdirs());
+ assertTrue("Failed setup filecache dir2", new File(expectedFileDir2).mkdirs());
+ assertTrue("Failed setup file1", user1file1.createNewFile());
+ assertTrue("Failed setup file2", user1file2.createNewFile());
+ assertTrue("Failed setup file3", user2file3.createNewFile());
+ assertTrue("Failed setup file4", user2file4.createNewFile());
+ assertTrue("Failed setup archive dir1", user1archive1.mkdirs());
+ assertTrue("Failed setup archive dir2", user2archive2.mkdirs());
+ assertTrue("Failed setup file in archivedir1", user1archive1file.createNewFile());
+ assertTrue("Failed setup file in archivedir2", user2archive2file.createNewFile());
- TestLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ TestLocalizer localizer = new TestLocalizer(conf, tmp.getPath());
- ArrayList<LocalResource> arrUser1Keys = new ArrayList<>();
- arrUser1Keys.add(new LocalResource(key1, false, false));
- arrUser1Keys.add(new LocalResource(archive1, true, false));
- LocalAssignment topo1 = new LocalAssignment("topo1", Collections.emptyList());
- topo1.set_owner(user1);
- localizer.addReferences(arrUser1Keys, new PortAndAssignmentImpl(1, topo1), null);
+ ArrayList<LocalResource> arrUser1Keys = new ArrayList<>();
+ arrUser1Keys.add(new LocalResource(key1, false, false));
+ arrUser1Keys.add(new LocalResource(archive1, true, false));
+ LocalAssignment topo1 = constructLocalAssignment("topo1", user1, Collections.emptyList());
+ localizer.addReferences(arrUser1Keys, new PortAndAssignmentImpl(1, topo1), null);
- ConcurrentMap<String, LocalizedResource> lrsrcFiles = localizer.getUserFiles().get(user1);
- ConcurrentMap<String, LocalizedResource> lrsrcArchives = localizer.getUserArchives().get(user1);
- assertEquals("local resource set size wrong", 3, lrsrcFiles.size() + lrsrcArchives.size());
- LocalizedResource key1rsrc = lrsrcFiles.get(key1);
- assertNotNull("Local resource doesn't exist but should", key1rsrc);
- assertEquals("key doesn't match", key1, key1rsrc.getKey());
- assertEquals("references doesn't match " + key1rsrc.getDependencies(), true, key1rsrc.isUsed());
- LocalizedResource key2rsrc = lrsrcFiles.get(key2);
- assertNotNull("Local resource doesn't exist but should", key2rsrc);
- assertEquals("key doesn't match", key2, key2rsrc.getKey());
- assertEquals("refcount doesn't match " + key2rsrc.getDependencies(), false, key2rsrc.isUsed());
- LocalizedResource archive1rsrc = lrsrcArchives.get(archive1);
- assertNotNull("Local resource doesn't exist but should", archive1rsrc);
- assertEquals("key doesn't match", archive1, archive1rsrc.getKey());
- assertEquals("refcount doesn't match " + archive1rsrc.getDependencies(), true, archive1rsrc.isUsed());
+ ConcurrentMap<String, LocalizedResource> lrsrcFiles = localizer.getUserFiles().get(user1);
+ ConcurrentMap<String, LocalizedResource> lrsrcArchives = localizer.getUserArchives().get(user1);
+ assertEquals("local resource set size wrong", 3, lrsrcFiles.size() + lrsrcArchives.size());
+ LocalizedResource key1rsrc = lrsrcFiles.get(key1);
+ assertNotNull("Local resource doesn't exist but should", key1rsrc);
+ assertEquals("key doesn't match", key1, key1rsrc.getKey());
+ assertEquals("references doesn't match " + key1rsrc.getDependencies(), true, key1rsrc.isUsed());
+ LocalizedResource key2rsrc = lrsrcFiles.get(key2);
+ assertNotNull("Local resource doesn't exist but should", key2rsrc);
+ assertEquals("key doesn't match", key2, key2rsrc.getKey());
+ assertEquals("refcount doesn't match " + key2rsrc.getDependencies(), false, key2rsrc.isUsed());
+ LocalizedResource archive1rsrc = lrsrcArchives.get(archive1);
+ assertNotNull("Local resource doesn't exist but should", archive1rsrc);
+ assertEquals("key doesn't match", archive1, archive1rsrc.getKey());
+ assertEquals("refcount doesn't match " + archive1rsrc.getDependencies(), true, archive1rsrc.isUsed());
- ConcurrentMap<String, LocalizedResource> lrsrcFiles2 = localizer.getUserFiles().get(user2);
- ConcurrentMap<String, LocalizedResource> lrsrcArchives2 = localizer.getUserArchives().get(user2);
- assertEquals("local resource set size wrong", 3, lrsrcFiles2.size() + lrsrcArchives2.size());
- LocalizedResource key3rsrc = lrsrcFiles2.get(key3);
- assertNotNull("Local resource doesn't exist but should", key3rsrc);
- assertEquals("key doesn't match", key3, key3rsrc.getKey());
- assertEquals("refcount doesn't match " + key3rsrc.getDependencies(), false, key3rsrc.isUsed());
- LocalizedResource key4rsrc = lrsrcFiles2.get(key4);
- assertNotNull("Local resource doesn't exist but should", key4rsrc);
- assertEquals("key doesn't match", key4, key4rsrc.getKey());
- assertEquals("refcount doesn't match " + key4rsrc.getDependencies(), false, key4rsrc.isUsed());
- LocalizedResource archive2rsrc = lrsrcArchives2.get(archive2);
- assertNotNull("Local resource doesn't exist but should", archive2rsrc);
- assertEquals("key doesn't match", archive2, archive2rsrc.getKey());
- assertEquals("refcount doesn't match " + archive2rsrc.getDependencies(), false, archive2rsrc.isUsed());
+ ConcurrentMap<String, LocalizedResource> lrsrcFiles2 = localizer.getUserFiles().get(user2);
+ ConcurrentMap<String, LocalizedResource> lrsrcArchives2 = localizer.getUserArchives().get(user2);
+ assertEquals("local resource set size wrong", 3, lrsrcFiles2.size() + lrsrcArchives2.size());
+ LocalizedResource key3rsrc = lrsrcFiles2.get(key3);
+ assertNotNull("Local resource doesn't exist but should", key3rsrc);
+ assertEquals("key doesn't match", key3, key3rsrc.getKey());
+ assertEquals("refcount doesn't match " + key3rsrc.getDependencies(), false, key3rsrc.isUsed());
+ LocalizedResource key4rsrc = lrsrcFiles2.get(key4);
+ assertNotNull("Local resource doesn't exist but should", key4rsrc);
+ assertEquals("key doesn't match", key4, key4rsrc.getKey());
+ assertEquals("refcount doesn't match " + key4rsrc.getDependencies(), false, key4rsrc.isUsed());
+ LocalizedResource archive2rsrc = lrsrcArchives2.get(archive2);
+ assertNotNull("Local resource doesn't exist but should", archive2rsrc);
+ assertEquals("key doesn't match", archive2, archive2rsrc.getKey());
+ assertEquals("refcount doesn't match " + archive2rsrc.getDependencies(), false, archive2rsrc.isUsed());
+ }
}
@Test
@@ -513,7 +498,7 @@
// Windows should set this to false cause symlink in compressed file doesn't work properly.
supportSymlinks = false;
}
- try (Time.SimulatedTime st = new Time.SimulatedTime()) {
+ try (Time.SimulatedTime st = new Time.SimulatedTime(); TmpPath tmp = new TmpPath()) {
Map<String, Object> conf = new HashMap<>();
// set clean time really high so doesn't kick in
@@ -522,16 +507,16 @@
String key1 = archiveFile.getName();
String topo1 = "topo1";
LOG.info("About to create new AsyncLocalizer...");
- TestLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ TestLocalizer localizer = new TestLocalizer(conf, tmp.getPath());
// set really small so will do cleanup
localizer.setTargetCacheSize(1);
LOG.info("created AsyncLocalizer...");
ReadableBlobMeta rbm = new ReadableBlobMeta();
rbm.set_settable(new SettableBlobMeta(WORLD_EVERYTHING));
- when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+ when(mockBlobStore.getBlobMeta(key1)).thenReturn(rbm);
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new
+ when(mockBlobStore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(new
FileInputStream(archiveFile.getAbsolutePath()),
0, archiveFile.length()));
@@ -539,15 +524,14 @@
Time.advanceTime(10);
File user1Dir = localizer.getLocalUserFileCacheDir(user1);
assertTrue("failed to create user dir", user1Dir.mkdirs());
- LocalAssignment topo1Assignment = new LocalAssignment(topo1, Collections.emptyList());
- topo1Assignment.set_owner(user1);
+ LocalAssignment topo1Assignment = constructLocalAssignment(topo1, user1, Collections.emptyList());
PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, true, false), topo1Pna, null);
Time.advanceTime(10);
long timeAfter = Time.currentTimeMillis();
Time.advanceTime(10);
- String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+ String expectedUserDir = joinPath(tmp.getPath(), USERCACHE, user1);
String expectedFileDir = joinPath(expectedUserDir, LocalizedResource.FILECACHE, LocalizedResource.ARCHIVESDIR);
assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
File keyFile = new File(expectedFileDir, key1 + ".0");
@@ -602,36 +586,35 @@
@Test
public void testBasic() throws Exception {
- try (Time.SimulatedTime st = new Time.SimulatedTime()) {
+ try (Time.SimulatedTime st = new Time.SimulatedTime(); TmpPath tmp = new TmpPath()) {
Map<String, Object> conf = new HashMap();
// set clean time really high so doesn't kick in
conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
String key1 = "key1";
String topo1 = "topo1";
- TestLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ TestLocalizer localizer = new TestLocalizer(conf, tmp.getPath());
// set really small so will do cleanup
localizer.setTargetCacheSize(1);
ReadableBlobMeta rbm = new ReadableBlobMeta();
rbm.set_settable(new SettableBlobMeta(WORLD_EVERYTHING));
- when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
+ when(mockBlobStore.getBlobMeta(key1)).thenReturn(rbm);
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(1));
+ when(mockBlobStore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(1));
long timeBefore = Time.currentTimeMillis();
Time.advanceTime(10);
File user1Dir = localizer.getLocalUserFileCacheDir(user1);
assertTrue("failed to create user dir", user1Dir.mkdirs());
Time.advanceTime(10);
- LocalAssignment topo1Assignment = new LocalAssignment(topo1, Collections.emptyList());
- topo1Assignment.set_owner(user1);
+ LocalAssignment topo1Assignment = constructLocalAssignment(topo1, user1, Collections.emptyList());
PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false, false), topo1Pna, null);
long timeAfter = Time.currentTimeMillis();
Time.advanceTime(10);
- String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
+ String expectedUserDir = joinPath(tmp.getPath(), USERCACHE, user1);
String expectedFileDir = joinPath(expectedUserDir, LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
File keyFile = new File(expectedFileDir, key1 + ".current");
@@ -678,7 +661,7 @@
@Test
public void testMultipleKeysOneUser() throws Exception {
- try (Time.SimulatedTime st = new Time.SimulatedTime()) {
+ try (Time.SimulatedTime st = new Time.SimulatedTime(); TmpPath tmp = new TmpPath()) {
Map<String, Object> conf = new HashMap<>();
// set clean time really high so doesn't kick in
conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1_000);
@@ -687,32 +670,31 @@
String topo1 = "topo1";
String key2 = "key2";
String key3 = "key3";
- TestLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ TestLocalizer localizer = new TestLocalizer(conf, tmp.getPath());
// set to keep 2 blobs (each of size 34)
localizer.setTargetCacheSize(68);
ReadableBlobMeta rbm = new ReadableBlobMeta();
rbm.set_settable(new SettableBlobMeta(WORLD_EVERYTHING));
- when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
- when(mockblobstore.isRemoteBlobExists(anyString())).thenReturn(true);
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(0));
- when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta(0));
- when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta(0));
+ when(mockBlobStore.getBlobMeta(anyString())).thenReturn(rbm);
+ when(mockBlobStore.isRemoteBlobExists(anyString())).thenReturn(true);
+ when(mockBlobStore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(0));
+ when(mockBlobStore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta(0));
+ when(mockBlobStore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta(0));
List<LocalResource> keys = Arrays.asList(new LocalResource(key1, false, false),
new LocalResource(key2, false, false), new LocalResource(key3, false, false));
File user1Dir = localizer.getLocalUserFileCacheDir(user1);
assertTrue("failed to create user dir", user1Dir.mkdirs());
- LocalAssignment topo1Assignment = new LocalAssignment(topo1, Collections.emptyList());
- topo1Assignment.set_owner(user1);
+ LocalAssignment topo1Assignment = constructLocalAssignment(topo1, user1, Collections.emptyList());
PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
List<LocalizedResource> lrsrcs = localizer.getBlobs(keys, topo1Pna, null);
LocalizedResource lrsrc = lrsrcs.get(0);
LocalizedResource lrsrc2 = lrsrcs.get(1);
LocalizedResource lrsrc3 = lrsrcs.get(2);
- String expectedFileDir = joinPath(baseDir.toString(), USERCACHE, user1,
+ String expectedFileDir = joinPath(tmp.getPath(), USERCACHE, user1,
LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
File keyFile = new File(expectedFileDir, key1 + LocalizedResource.CURRENT_BLOB_SUFFIX);
@@ -782,31 +764,32 @@
@Test(expected = AuthorizationException.class)
public void testFailAcls() throws Exception {
- Map<String, Object> conf = new HashMap();
- // set clean time really high so doesn't kick in
- conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
- // enable blobstore acl validation
- conf.put(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED, true);
+ try (TmpPath tmp = new TmpPath()) {
+ Map<String, Object> conf = new HashMap();
+ // set clean time really high so doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
+ // enable blobstore acl validation
+ conf.put(Config.STORM_BLOBSTORE_ACL_VALIDATION_ENABLED, true);
- String topo1 = "topo1";
- String key1 = "key1";
- TestLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ String topo1 = "topo1";
+ String key1 = "key1";
+ TestLocalizer localizer = new TestLocalizer(conf, tmp.getPath());
- ReadableBlobMeta rbm = new ReadableBlobMeta();
- // set acl so user doesn't have read access
- AccessControl acl = new AccessControl(AccessControlType.USER, BlobStoreAclHandler.ADMIN);
- acl.set_name(user1);
- rbm.set_settable(new SettableBlobMeta(Arrays.asList(acl)));
- when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(1));
- File user1Dir = localizer.getLocalUserFileCacheDir(user1);
- assertTrue("failed to create user dir", user1Dir.mkdirs());
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ // set acl so user doesn't have read access
+ AccessControl acl = new AccessControl(AccessControlType.USER, BlobStoreAclHandler.ADMIN);
+ acl.set_name(user1);
+ rbm.set_settable(new SettableBlobMeta(Arrays.asList(acl)));
+ when(mockBlobStore.getBlobMeta(anyString())).thenReturn(rbm);
+ when(mockBlobStore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(1));
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
- LocalAssignment topo1Assignment = new LocalAssignment(topo1, Collections.emptyList());
- topo1Assignment.set_owner(user1);
- PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
- // This should throw AuthorizationException because auth failed
- localizer.getBlob(new LocalResource(key1, false, false), topo1Pna, null);
+ LocalAssignment topo1Assignment = constructLocalAssignment(topo1, user1, Collections.emptyList());
+ PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
+ // This should throw AuthorizationException because auth failed
+ localizer.getBlob(new LocalResource(key1, false, false), topo1Pna, null);
+ }
}
@Test(expected = KeyNotFoundException.class)
@@ -824,163 +807,162 @@
@Test
public void testMultipleUsers() throws Exception {
- Map<String, Object> conf = new HashMap<>();
- // set clean time really high so doesn't kick in
- conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
+ try (TmpPath tmp = new TmpPath()){
+ Map<String, Object> conf = new HashMap<>();
+ // set clean time really high so doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
- String topo1 = "topo1";
- String topo2 = "topo2";
- String topo3 = "topo3";
- String key1 = "key1";
- String key2 = "key2";
- String key3 = "key3";
- TestLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
- // set to keep 2 blobs (each of size 34)
- localizer.setTargetCacheSize(68);
+ String topo1 = "topo1";
+ String topo2 = "topo2";
+ String topo3 = "topo3";
+ String key1 = "key1";
+ String key2 = "key2";
+ String key3 = "key3";
+ TestLocalizer localizer = new TestLocalizer(conf, tmp.getPath());
+ // set to keep 2 blobs (each of size 34)
+ localizer.setTargetCacheSize(68);
- ReadableBlobMeta rbm = new ReadableBlobMeta();
- rbm.set_settable(new SettableBlobMeta(WORLD_EVERYTHING));
- when(mockblobstore.getBlobMeta(anyString())).thenReturn(rbm);
- //thenReturn always returns the same object, which is already consumed by the time User3 tries to getBlob!
- when(mockblobstore.getBlob(key1)).thenAnswer((i) -> new TestInputStreamWithMeta(1));
- when(mockblobstore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta(1));
- when(mockblobstore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta(1));
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ rbm.set_settable(new SettableBlobMeta(WORLD_EVERYTHING));
+ when(mockBlobStore.getBlobMeta(anyString())).thenReturn(rbm);
+ //thenReturn always returns the same object, which is already consumed by the time User3 tries to getBlob!
+ when(mockBlobStore.getBlob(key1)).thenAnswer((i) -> new TestInputStreamWithMeta(1));
+ when(mockBlobStore.getBlob(key2)).thenReturn(new TestInputStreamWithMeta(1));
+ when(mockBlobStore.getBlob(key3)).thenReturn(new TestInputStreamWithMeta(1));
- File user1Dir = localizer.getLocalUserFileCacheDir(user1);
- assertTrue("failed to create user dir", user1Dir.mkdirs());
- File user2Dir = localizer.getLocalUserFileCacheDir(user2);
- assertTrue("failed to create user dir", user2Dir.mkdirs());
- File user3Dir = localizer.getLocalUserFileCacheDir(user3);
- assertTrue("failed to create user dir", user3Dir.mkdirs());
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
+ File user2Dir = localizer.getLocalUserFileCacheDir(user2);
+ assertTrue("failed to create user dir", user2Dir.mkdirs());
+ File user3Dir = localizer.getLocalUserFileCacheDir(user3);
+ assertTrue("failed to create user dir", user3Dir.mkdirs());
- LocalAssignment topo1Assignment = new LocalAssignment(topo1, Collections.emptyList());
- topo1Assignment.set_owner(user1);
- PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
- LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false, false), topo1Pna, null);
+ LocalAssignment topo1Assignment = constructLocalAssignment(topo1, user1, Collections.emptyList());
+ PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
+ LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false, false), topo1Pna, null);
- LocalAssignment topo2Assignment = new LocalAssignment(topo2, Collections.emptyList());
- topo2Assignment.set_owner(user2);
- PortAndAssignment topo2Pna = new PortAndAssignmentImpl(2, topo2Assignment);
- LocalizedResource lrsrc2 = localizer.getBlob(new LocalResource(key2, false, false), topo2Pna, null);
+ LocalAssignment topo2Assignment = constructLocalAssignment(topo2, user2, Collections.emptyList());
+ PortAndAssignment topo2Pna = new PortAndAssignmentImpl(2, topo2Assignment);
+ LocalizedResource lrsrc2 = localizer.getBlob(new LocalResource(key2, false, false), topo2Pna, null);
- LocalAssignment topo3Assignment = new LocalAssignment(topo3, Collections.emptyList());
- topo3Assignment.set_owner(user3);
- PortAndAssignment topo3Pna = new PortAndAssignmentImpl(3, topo3Assignment);
- LocalizedResource lrsrc3 = localizer.getBlob(new LocalResource(key3, false, false), topo3Pna, null);
+ LocalAssignment topo3Assignment = constructLocalAssignment(topo3, user3, Collections.emptyList());
+ PortAndAssignment topo3Pna = new PortAndAssignmentImpl(3, topo3Assignment);
+ LocalizedResource lrsrc3 = localizer.getBlob(new LocalResource(key3, false, false), topo3Pna, null);
- // make sure we support different user reading same blob
- LocalizedResource lrsrc1_user3 = localizer.getBlob(new LocalResource(key1, false, false), topo3Pna, null);
+ // make sure we support different user reading same blob
+ LocalizedResource lrsrc1_user3 = localizer.getBlob(new LocalResource(key1, false, false), topo3Pna, null);
- String expectedUserDir1 = joinPath(baseDir.toString(), USERCACHE, user1);
- String expectedFileDirUser1 = joinPath(expectedUserDir1, LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
- String expectedFileDirUser2 = joinPath(baseDir.toString(), USERCACHE, user2,
- LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
- String expectedFileDirUser3 = joinPath(baseDir.toString(), USERCACHE, user3,
- LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
- assertTrue("user filecache dir user1 not created", new File(expectedFileDirUser1).exists());
- assertTrue("user filecache dir user2 not created", new File(expectedFileDirUser2).exists());
- assertTrue("user filecache dir user3 not created", new File(expectedFileDirUser3).exists());
+ String expectedUserDir1 = joinPath(tmp.getPath(), USERCACHE, user1);
+ String expectedFileDirUser1 = joinPath(expectedUserDir1, LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
+ String expectedFileDirUser2 = joinPath(tmp.getPath(), USERCACHE, user2,
+ LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
+ String expectedFileDirUser3 = joinPath(tmp.getPath(), USERCACHE, user3,
+ LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
+ assertTrue("user filecache dir user1 not created", new File(expectedFileDirUser1).exists());
+ assertTrue("user filecache dir user2 not created", new File(expectedFileDirUser2).exists());
+ assertTrue("user filecache dir user3 not created", new File(expectedFileDirUser3).exists());
- File keyFile = new File(expectedFileDirUser1, key1 + LocalizedResource.CURRENT_BLOB_SUFFIX);
- File keyFile2 = new File(expectedFileDirUser2, key2 + LocalizedResource.CURRENT_BLOB_SUFFIX);
- File keyFile3 = new File(expectedFileDirUser3, key3 + LocalizedResource.CURRENT_BLOB_SUFFIX);
- File keyFile1user3 = new File(expectedFileDirUser3, key1 + LocalizedResource.CURRENT_BLOB_SUFFIX);
+ File keyFile = new File(expectedFileDirUser1, key1 + LocalizedResource.CURRENT_BLOB_SUFFIX);
+ File keyFile2 = new File(expectedFileDirUser2, key2 + LocalizedResource.CURRENT_BLOB_SUFFIX);
+ File keyFile3 = new File(expectedFileDirUser3, key3 + LocalizedResource.CURRENT_BLOB_SUFFIX);
+ File keyFile1user3 = new File(expectedFileDirUser3, key1 + LocalizedResource.CURRENT_BLOB_SUFFIX);
- assertTrue("blob not created", keyFile.exists());
- assertTrue("blob not created", keyFile2.exists());
- assertTrue("blob not created", keyFile3.exists());
- assertTrue("blob not created", keyFile1user3.exists());
+ assertTrue("blob not created", keyFile.exists());
+ assertTrue("blob not created", keyFile2.exists());
+ assertTrue("blob not created", keyFile3.exists());
+ assertTrue("blob not created", keyFile1user3.exists());
- //Should assert file size
- assertEquals("size doesn't match", 34, lrsrc.getSizeOnDisk());
- assertEquals("size doesn't match", 34, lrsrc2.getSizeOnDisk());
- assertEquals("size doesn't match", 34, lrsrc3.getSizeOnDisk());
- //This was 0 byte in test
- assertEquals("size doesn't match", 34, lrsrc1_user3.getSizeOnDisk());
+ //Should assert file size
+ assertEquals("size doesn't match", 34, lrsrc.getSizeOnDisk());
+ assertEquals("size doesn't match", 34, lrsrc2.getSizeOnDisk());
+ assertEquals("size doesn't match", 34, lrsrc3.getSizeOnDisk());
+ //This was 0 byte in test
+ assertEquals("size doesn't match", 34, lrsrc1_user3.getSizeOnDisk());
- ConcurrentMap<String, LocalizedResource> lrsrcSet = localizer.getUserFiles().get(user1);
- assertEquals("local resource set size wrong", 1, lrsrcSet.size());
- ConcurrentMap<String, LocalizedResource> lrsrcSet2 = localizer.getUserFiles().get(user2);
- assertEquals("local resource set size wrong", 1, lrsrcSet2.size());
- ConcurrentMap<String, LocalizedResource> lrsrcSet3 = localizer.getUserFiles().get(user3);
- assertEquals("local resource set size wrong", 2, lrsrcSet3.size());
+ ConcurrentMap<String, LocalizedResource> lrsrcSet = localizer.getUserFiles().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.size());
+ ConcurrentMap<String, LocalizedResource> lrsrcSet2 = localizer.getUserFiles().get(user2);
+ assertEquals("local resource set size wrong", 1, lrsrcSet2.size());
+ ConcurrentMap<String, LocalizedResource> lrsrcSet3 = localizer.getUserFiles().get(user3);
+ assertEquals("local resource set size wrong", 2, lrsrcSet3.size());
- localizer.removeBlobReference(lrsrc.getKey(), topo1Pna, false);
- // should remove key1
- localizer.cleanup();
+ localizer.removeBlobReference(lrsrc.getKey(), topo1Pna, false);
+ // should remove key1
+ localizer.cleanup();
- lrsrcSet = localizer.getUserFiles().get(user1);
- lrsrcSet3 = localizer.getUserFiles().get(user3);
- assertNull("user set should be null", lrsrcSet);
- assertFalse("blob dir not deleted", new File(expectedFileDirUser1).exists());
- assertFalse("blob dir not deleted", new File(expectedUserDir1).exists());
- assertEquals("local resource set size wrong", 2, lrsrcSet3.size());
+ lrsrcSet = localizer.getUserFiles().get(user1);
+ lrsrcSet3 = localizer.getUserFiles().get(user3);
+ assertNull("user set should be null", lrsrcSet);
+ assertFalse("blob dir not deleted", new File(expectedFileDirUser1).exists());
+ assertFalse("blob dir not deleted", new File(expectedUserDir1).exists());
+ assertEquals("local resource set size wrong", 2, lrsrcSet3.size());
- assertTrue("blob deleted", keyFile2.exists());
- assertFalse("blob not deleted", keyFile.exists());
- assertTrue("blob deleted", keyFile3.exists());
- assertTrue("blob deleted", keyFile1user3.exists());
+ assertTrue("blob deleted", keyFile2.exists());
+ assertFalse("blob not deleted", keyFile.exists());
+ assertTrue("blob deleted", keyFile3.exists());
+ assertTrue("blob deleted", keyFile1user3.exists());
+ }
}
@Test
public void testUpdate() throws Exception {
- Map<String, Object> conf = new HashMap<>();
- // set clean time really high so doesn't kick in
- conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
+ try (TmpPath tmp = new TmpPath()) {
+ Map<String, Object> conf = new HashMap<>();
+ // set clean time really high so doesn't kick in
+ conf.put(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, 60 * 60 * 1000);
- String key1 = "key1";
- String topo1 = "topo1";
- String topo2 = "topo2";
- TestLocalizer localizer = new TestLocalizer(conf, baseDir.toString());
+ String key1 = "key1";
+ String topo1 = "topo1";
+ String topo2 = "topo2";
+ TestLocalizer localizer = new TestLocalizer(conf, tmp.getPath());
- ReadableBlobMeta rbm = new ReadableBlobMeta();
- rbm.set_version(1);
- rbm.set_settable(new SettableBlobMeta(WORLD_EVERYTHING));
- when(mockblobstore.getBlobMeta(key1)).thenReturn(rbm);
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(1));
+ ReadableBlobMeta rbm = new ReadableBlobMeta();
+ rbm.set_version(1);
+ rbm.set_settable(new SettableBlobMeta(WORLD_EVERYTHING));
+ when(mockBlobStore.getBlobMeta(key1)).thenReturn(rbm);
+ when(mockBlobStore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(1));
- File user1Dir = localizer.getLocalUserFileCacheDir(user1);
- assertTrue("failed to create user dir", user1Dir.mkdirs());
- LocalAssignment topo1Assignment = new LocalAssignment(topo1, Collections.emptyList());
- topo1Assignment.set_owner(user1);
- PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
- LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false, false), topo1Pna, null);
+ File user1Dir = localizer.getLocalUserFileCacheDir(user1);
+ assertTrue("failed to create user dir", user1Dir.mkdirs());
+ LocalAssignment topo1Assignment = constructLocalAssignment(topo1, user1, Collections.emptyList());
+ PortAndAssignment topo1Pna = new PortAndAssignmentImpl(1, topo1Assignment);
+ LocalizedResource lrsrc = localizer.getBlob(new LocalResource(key1, false, false), topo1Pna, null);
- String expectedUserDir = joinPath(baseDir.toString(), USERCACHE, user1);
- String expectedFileDir = joinPath(expectedUserDir, LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
- assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
- Path keyVersionFile = Paths.get(expectedFileDir, key1 + ".version");
- File keyFileCurrentSymlink = new File(expectedFileDir, key1 + LocalizedResource.CURRENT_BLOB_SUFFIX);
- assertTrue("blob not created", keyFileCurrentSymlink.exists());
- File versionFile = new File(expectedFileDir, key1 + LocalizedResource.BLOB_VERSION_SUFFIX);
- assertTrue("blob version file not created", versionFile.exists());
- assertEquals("blob version not correct", 1, LocalizedResource.localVersionOfBlob(keyVersionFile));
+ String expectedUserDir = joinPath(tmp.getPath(), USERCACHE, user1);
+ String expectedFileDir = joinPath(expectedUserDir, LocalizedResource.FILECACHE, LocalizedResource.FILESDIR);
+ assertTrue("user filecache dir not created", new File(expectedFileDir).exists());
+ Path keyVersionFile = Paths.get(expectedFileDir, key1 + ".version");
+ File keyFileCurrentSymlink = new File(expectedFileDir, key1 + LocalizedResource.CURRENT_BLOB_SUFFIX);
+ assertTrue("blob not created", keyFileCurrentSymlink.exists());
+ File versionFile = new File(expectedFileDir, key1 + LocalizedResource.BLOB_VERSION_SUFFIX);
+ assertTrue("blob version file not created", versionFile.exists());
+ assertEquals("blob version not correct", 1, LocalizedResource.localVersionOfBlob(keyVersionFile));
- ConcurrentMap<String, LocalizedResource> lrsrcSet = localizer.getUserFiles().get(user1);
- assertEquals("local resource set size wrong", 1, lrsrcSet.size());
+ ConcurrentMap<String, LocalizedResource> lrsrcSet = localizer.getUserFiles().get(user1);
+ assertEquals("local resource set size wrong", 1, lrsrcSet.size());
- // test another topology getting blob with updated version - it should update version now
- rbm.set_version(2);
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(2));
+ // test another topology getting blob with updated version - it should update version now
+ rbm.set_version(2);
+ when(mockBlobStore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(2));
- LocalAssignment topo2Assignment = new LocalAssignment(topo2, Collections.emptyList());
- topo2Assignment.set_owner(user1);
- PortAndAssignment topo2Pna = new PortAndAssignmentImpl(1, topo2Assignment);
- localizer.getBlob(new LocalResource(key1, false, false), topo2Pna, null);
- assertTrue("blob version file not created", versionFile.exists());
- assertEquals("blob version not correct", 2, LocalizedResource.localVersionOfBlob(keyVersionFile));
- assertTrue("blob file with version 2 not created", new File(expectedFileDir, key1 + ".2").exists());
+ LocalAssignment topo2Assignment = constructLocalAssignment(topo2, user1, Collections.emptyList());
+ PortAndAssignment topo2Pna = new PortAndAssignmentImpl(1, topo2Assignment);
+ localizer.getBlob(new LocalResource(key1, false, false), topo2Pna, null);
+ assertTrue("blob version file not created", versionFile.exists());
+ assertEquals("blob version not correct", 2, LocalizedResource.localVersionOfBlob(keyVersionFile));
+ assertTrue("blob file with version 2 not created", new File(expectedFileDir, key1 + ".2").exists());
- // now test regular updateBlob
- rbm.set_version(3);
- when(mockblobstore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(3));
+ // now test regular updateBlob
+ rbm.set_version(3);
+ when(mockBlobStore.getBlob(key1)).thenReturn(new TestInputStreamWithMeta(3));
- ArrayList<LocalResource> arr = new ArrayList<>();
- arr.add(new LocalResource(key1, false, false));
- localizer.updateBlobs();
- assertTrue("blob version file not created", versionFile.exists());
- assertEquals("blob version not correct", 3, LocalizedResource.localVersionOfBlob(keyVersionFile));
- assertTrue("blob file with version 3 not created", new File(expectedFileDir, key1 + ".3").exists());
+ ArrayList<LocalResource> arr = new ArrayList<>();
+ arr.add(new LocalResource(key1, false, false));
+ localizer.updateBlobs();
+ assertTrue("blob version file not created", versionFile.exists());
+ assertEquals("blob version not correct", 3, LocalizedResource.localVersionOfBlob(keyVersionFile));
+ assertTrue("blob file with version 3 not created", new File(expectedFileDir, key1 + ".3").exists());
+ }
}
@Test
@@ -989,9 +971,9 @@
PortAndAssignment pna = new PortAndAssignmentImpl(1, la);
PortAndAssignment tpna = new TimePortAndAssignment(pna, new Timer());
- assertTrue(pna.equals(tpna));
- assertTrue(tpna.equals(pna));
- assertTrue(pna.hashCode() == tpna.hashCode());
+ assertEquals(pna, tpna);
+ assertEquals(tpna, pna);
+ assertEquals(pna.hashCode(), tpna.hashCode());
}
class TestLocalizer extends AsyncLocalizer {
@@ -1002,7 +984,7 @@
@Override
protected ClientBlobStore getClientBlobStore() {
- return mockblobstore;
+ return mockBlobStore;
}
synchronized void addReferences(List<LocalResource> localresource, PortAndAssignment pna, BlobChangingCallback cb) {