blob: e8bb37659c86a6f7c7cb423666292ad43b007d9d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package org.apache.storm.daemon.supervisor;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.container.ResourceIsolationInterface;
import org.apache.storm.daemon.supervisor.Container.ContainerType;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileAction;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.SimpleVersion;
import org.apache.storm.utils.Utils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ConfigUtils;
public class BasicContainerTest {
private static void setSystemProp(String key, String value) {
if (value == null) {
System.clearProperty(key);
} else {
System.setProperty(key, value);
}
}
private static void checkpoint(Run r, String... newValues) throws Exception {
if (newValues.length % 2 != 0) {
throw new IllegalArgumentException("Parameters are of the form system property name, new value");
}
Map<String, String> orig = new HashMap<>();
try {
for (int index = 0; index < newValues.length; index += 2) {
String key = newValues[index];
String value = newValues[index + 1];
orig.put(key, System.getProperty(key));
setSystemProp(key, value);
}
r.run();
} finally {
for (Map.Entry<String, String> entry : orig.entrySet()) {
setSystemProp(entry.getKey(), entry.getValue());
}
}
}
private static <T> void assertListEquals(List<T> a, List<T> b) {
if (a == null) {
assertNull(b);
}
if (b == null) {
assertNull(a);
}
int commonLen = Math.min(a.size(), b.size());
for (int i = 0; i < commonLen; i++) {
assertEquals("at index " + i + "\n" + a + " !=\n" + b + "\n", a.get(i), b.get(i));
}
assertEquals("size of lists don't match \n" + a + " !=\n" + b, a.size(), b.size());
}
@Test
public void testCreateNewWorkerId() throws Exception {
final String topoId = "test_topology";
final int supervisorPort = 6628;
final int port = 8080;
LocalAssignment la = new LocalAssignment();
la.set_topology_id(topoId);
Map<String, Object> superConf = new HashMap<>();
AdvancedFSOps ops = mock(AdvancedFSOps.class);
when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
LocalState ls = mock(LocalState.class);
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
"SUPERVISOR", supervisorPort, port, la, null, ls, null, new StormMetricsRegistry(),
new HashMap<>(), ops, "profile");
//null worker id means generate one...
assertNotNull(mc.workerId);
verify(ls).getApprovedWorkers();
Map<String, Integer> expectedNewState = new HashMap<String, Integer>();
expectedNewState.put(mc.workerId, port);
verify(ls).setApprovedWorkers(expectedNewState);
}
@Test
public void testRecovery() throws Exception {
final String topoId = "test_topology";
final String workerId = "myWorker";
final int supervisorPort = 6628;
final int port = 8080;
LocalAssignment la = new LocalAssignment();
la.set_topology_id(topoId);
Map<String, Integer> workerState = new HashMap<String, Integer>();
workerState.put(workerId, port);
LocalState ls = mock(LocalState.class);
when(ls.getApprovedWorkers()).thenReturn(workerState);
Map<String, Object> superConf = new HashMap<>();
AdvancedFSOps ops = mock(AdvancedFSOps.class);
when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
MockBasicContainer mc = new MockBasicContainer(ContainerType.RECOVER_FULL, superConf,
"SUPERVISOR", supervisorPort, port, la, null, ls, null, new StormMetricsRegistry(),
new HashMap<>(), ops, "profile");
assertEquals(workerId, mc.workerId);
}
@Test
public void testRecoveryMiss() throws Exception {
final String topoId = "test_topology";
final int supervisorPort = 6628;
final int port = 8080;
LocalAssignment la = new LocalAssignment();
la.set_topology_id(topoId);
Map<String, Integer> workerState = new HashMap<String, Integer>();
workerState.put("somethingelse", port + 1);
LocalState ls = mock(LocalState.class);
when(ls.getApprovedWorkers()).thenReturn(workerState);
try {
new MockBasicContainer(ContainerType.RECOVER_FULL, new HashMap<String, Object>(),
"SUPERVISOR", supervisorPort, port, la, null, ls, null, new StormMetricsRegistry(),
new HashMap<>(), null, "profile");
fail("Container recovered worker incorrectly");
} catch (ContainerRecoveryException e) {
//Expected
}
}
@Test
public void testCleanUp() throws Exception {
final String topoId = "test_topology";
final int supervisorPort = 6628;
final int port = 8080;
final String workerId = "worker-id";
LocalAssignment la = new LocalAssignment();
la.set_topology_id(topoId);
Map<String, Object> superConf = new HashMap<>();
AdvancedFSOps ops = mock(AdvancedFSOps.class);
when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
Map<String, Integer> workerState = new HashMap<String, Integer>();
workerState.put(workerId, port);
LocalState ls = mock(LocalState.class);
when(ls.getApprovedWorkers()).thenReturn(new HashMap<>(workerState));
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
"SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(), new HashMap<>(), ops,
"profile");
mc.cleanUp();
assertNull(mc.workerId);
verify(ls).getApprovedWorkers();
Map<String, Integer> expectedNewState = new HashMap<String, Integer>();
verify(ls).setApprovedWorkers(expectedNewState);
}
@Test
public void testRunProfiling() throws Exception {
final long pid = 100;
final String topoId = "test_topology";
final int supervisorPort = 6628;
final int port = 8080;
final String workerId = "worker-id";
final String stormLocal = ContainerTest.asAbsPath("tmp", "testing");
final String topoRoot = ContainerTest.asAbsPath(stormLocal, topoId, String.valueOf(port));
final File workerArtifactsPid = ContainerTest.asAbsFile(topoRoot, "worker.pid");
final Map<String, Object> superConf = new HashMap<>();
superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
LocalAssignment la = new LocalAssignment();
la.set_topology_id(topoId);
AdvancedFSOps ops = mock(AdvancedFSOps.class);
when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
when(ops.slurpString(workerArtifactsPid)).thenReturn(String.valueOf(pid));
LocalState ls = mock(LocalState.class);
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
"SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
new HashMap<>(), ops, "profile");
//HEAP DUMP
ProfileRequest req = new ProfileRequest();
req.set_action(ProfileAction.JMAP_DUMP);
mc.runProfiling(req, false);
assertEquals(1, mc.profileCmds.size());
CommandRun cmd = mc.profileCmds.get(0);
mc.profileCmds.clear();
assertEquals(Arrays.asList("profile", String.valueOf(pid), "jmap", topoRoot), cmd.cmd);
assertEquals(new File(topoRoot), cmd.pwd);
//JSTACK DUMP
req.set_action(ProfileAction.JSTACK_DUMP);
mc.runProfiling(req, false);
assertEquals(1, mc.profileCmds.size());
cmd = mc.profileCmds.get(0);
mc.profileCmds.clear();
assertEquals(Arrays.asList("profile", String.valueOf(pid), "jstack", topoRoot), cmd.cmd);
assertEquals(new File(topoRoot), cmd.pwd);
//RESTART
req.set_action(ProfileAction.JVM_RESTART);
mc.runProfiling(req, false);
assertEquals(1, mc.profileCmds.size());
cmd = mc.profileCmds.get(0);
mc.profileCmds.clear();
assertEquals(Arrays.asList("profile", String.valueOf(pid), "kill"), cmd.cmd);
assertEquals(new File(topoRoot), cmd.pwd);
//JPROFILE DUMP
req.set_action(ProfileAction.JPROFILE_DUMP);
mc.runProfiling(req, false);
assertEquals(1, mc.profileCmds.size());
cmd = mc.profileCmds.get(0);
mc.profileCmds.clear();
assertEquals(Arrays.asList("profile", String.valueOf(pid), "dump", topoRoot), cmd.cmd);
assertEquals(new File(topoRoot), cmd.pwd);
//JPROFILE START
req.set_action(ProfileAction.JPROFILE_STOP);
mc.runProfiling(req, false);
assertEquals(1, mc.profileCmds.size());
cmd = mc.profileCmds.get(0);
mc.profileCmds.clear();
assertEquals(Arrays.asList("profile", String.valueOf(pid), "start"), cmd.cmd);
assertEquals(new File(topoRoot), cmd.pwd);
//JPROFILE STOP
req.set_action(ProfileAction.JPROFILE_STOP);
mc.runProfiling(req, true);
assertEquals(1, mc.profileCmds.size());
cmd = mc.profileCmds.get(0);
mc.profileCmds.clear();
assertEquals(Arrays.asList("profile", String.valueOf(pid), "stop", topoRoot), cmd.cmd);
assertEquals(new File(topoRoot), cmd.pwd);
}
@Test
public void testLaunch() throws Exception {
final String topoId = "test_topology_current";
final int supervisorPort = 6628;
final int port = 8080;
final String stormHome = ContainerTest.asAbsPath("tmp", "storm-home");
final String stormLogDir = ContainerTest.asFile(".", "target").getCanonicalPath();
final String workerId = "worker-id";
final String stormLocal = ContainerTest.asAbsPath("tmp", "storm-local");
final String distRoot = ContainerTest.asAbsPath(stormLocal, "supervisor", "stormdist", topoId);
final File stormcode = new File(distRoot, "stormcode.ser");
final File stormjar = new File(distRoot, "stormjar.jar");
final String log4jdir = ContainerTest.asAbsPath(stormHome, "conf");
final String workerConf = ContainerTest.asAbsPath(log4jdir, "worker.xml");
final String workerRoot = ContainerTest.asAbsPath(stormLocal, "workers", workerId);
final String workerTmpDir = ContainerTest.asAbsPath(workerRoot, "tmp");
final StormTopology st = new StormTopology();
st.set_spouts(new HashMap<>());
st.set_bolts(new HashMap<>());
st.set_state_spouts(new HashMap<>());
byte[] serializedState = Utils.gzip(Utils.thriftSerialize(st));
final Map<String, Object> superConf = new HashMap<>();
superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
superConf.put(DaemonConfig.STORM_LOG4J2_CONF_DIR, log4jdir);
superConf.put(Config.WORKER_CHILDOPTS, " -Dtesting=true");
LocalAssignment la = new LocalAssignment();
la.set_topology_id(topoId);
AdvancedFSOps ops = mock(AdvancedFSOps.class);
when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
when(ops.slurp(stormcode)).thenReturn(serializedState);
LocalState ls = mock(LocalState.class);
checkpoint(() -> {
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
"SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
new HashMap<>(), ops, "profile");
mc.launch();
assertEquals(1, mc.workerCmds.size());
CommandRun cmd = mc.workerCmds.get(0);
mc.workerCmds.clear();
assertListEquals(Arrays.asList(
"java",
"-cp",
"FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
"-Dlogging.sensitivity=S3",
"-Dlogfile.name=worker.log",
"-Dstorm.home=" + stormHome,
"-Dworkers.artifacts=" + stormLocal,
"-Dstorm.id=" + topoId,
"-Dworker.id=" + workerId,
"-Dworker.port=" + port,
"-Dstorm.log.dir=" + stormLogDir,
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
"-Dstorm.local.dir=" + stormLocal,
"-Dworker.memory_limit_mb=768",
"-Dlog4j.configurationFile=" + workerConf,
"org.apache.storm.LogWriter",
"java",
"-server",
"-Dlogging.sensitivity=S3",
"-Dlogfile.name=worker.log",
"-Dstorm.home=" + stormHome,
"-Dworkers.artifacts=" + stormLocal,
"-Dstorm.id=" + topoId,
"-Dworker.id=" + workerId,
"-Dworker.port=" + port,
"-Dstorm.log.dir=" + stormLogDir,
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
"-Dstorm.local.dir=" + stormLocal,
"-Dworker.memory_limit_mb=768",
"-Dlog4j.configurationFile=" + workerConf,
"-Dtesting=true",
"-Djava.library.path=JLP",
"-Dstorm.conf.file=",
"-Dstorm.options=",
"-Djava.io.tmpdir=" + workerTmpDir,
"-cp",
"FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
"org.apache.storm.daemon.worker.Worker",
topoId,
"SUPERVISOR",
String.valueOf(supervisorPort),
String.valueOf(port),
workerId
), cmd.cmd);
assertEquals(new File(workerRoot), cmd.pwd);
},
ConfigUtils.STORM_HOME, stormHome,
"storm.log.dir", stormLogDir);
}
@Test
public void testLaunchStorm1version() throws Exception {
final String topoId = "test_topology_storm_1.x";
final int supervisorPort = 6628;
final int port = 8080;
final String stormHome = ContainerTest.asAbsPath("tmp", "storm-home");
final String stormLogDir = ContainerTest.asFile(".", "target").getCanonicalPath();
final String workerId = "worker-id";
final String stormLocal = ContainerTest.asAbsPath("tmp", "storm-local");
final String distRoot = ContainerTest.asAbsPath(stormLocal, "supervisor", "stormdist", topoId);
final File stormcode = new File(distRoot, "stormcode.ser");
final File stormjar = new File(distRoot, "stormjar.jar");
final String log4jdir = ContainerTest.asAbsPath(stormHome, "conf");
final String workerConf = ContainerTest.asAbsPath(log4jdir, "worker.xml");
final String workerRoot = ContainerTest.asAbsPath(stormLocal, "workers", workerId);
final String workerTmpDir = ContainerTest.asAbsPath(workerRoot, "tmp");
final StormTopology st = new StormTopology();
st.set_spouts(new HashMap<>());
st.set_bolts(new HashMap<>());
st.set_state_spouts(new HashMap<>());
// minimum 1.x version of supporting STORM-2448 would be 1.0.4
st.set_storm_version("1.0.4");
byte[] serializedState = Utils.gzip(Utils.thriftSerialize(st));
final Map<String, Object> superConf = new HashMap<>();
superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
superConf.put(DaemonConfig.STORM_LOG4J2_CONF_DIR, log4jdir);
superConf.put(Config.WORKER_CHILDOPTS, " -Dtesting=true");
LocalAssignment la = new LocalAssignment();
la.set_topology_id(topoId);
AdvancedFSOps ops = mock(AdvancedFSOps.class);
when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
when(ops.slurp(stormcode)).thenReturn(serializedState);
LocalState ls = mock(LocalState.class);
checkpoint(() -> {
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
"SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
new HashMap<>(), ops, "profile");
mc.launch();
assertEquals(1, mc.workerCmds.size());
CommandRun cmd = mc.workerCmds.get(0);
mc.workerCmds.clear();
assertListEquals(Arrays.asList(
"java",
"-cp",
"FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
"-Dlogging.sensitivity=S3",
"-Dlogfile.name=worker.log",
"-Dstorm.home=" + stormHome,
"-Dworkers.artifacts=" + stormLocal,
"-Dstorm.id=" + topoId,
"-Dworker.id=" + workerId,
"-Dworker.port=" + port,
"-Dstorm.log.dir=" + stormLogDir,
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
"-Dstorm.local.dir=" + stormLocal,
"-Dworker.memory_limit_mb=768",
"-Dlog4j.configurationFile=" + workerConf,
"org.apache.storm.LogWriter",
"java",
"-server",
"-Dlogging.sensitivity=S3",
"-Dlogfile.name=worker.log",
"-Dstorm.home=" + stormHome,
"-Dworkers.artifacts=" + stormLocal,
"-Dstorm.id=" + topoId,
"-Dworker.id=" + workerId,
"-Dworker.port=" + port,
"-Dstorm.log.dir=" + stormLogDir,
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
"-Dstorm.local.dir=" + stormLocal,
"-Dworker.memory_limit_mb=768",
"-Dlog4j.configurationFile=" + workerConf,
"-Dtesting=true",
"-Djava.library.path=JLP",
"-Dstorm.conf.file=",
"-Dstorm.options=",
"-Djava.io.tmpdir=" + workerTmpDir,
"-cp",
"FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
"org.apache.storm.daemon.worker",
topoId,
"SUPERVISOR",
String.valueOf(port),
workerId
), cmd.cmd);
assertEquals(new File(workerRoot), cmd.pwd);
},
ConfigUtils.STORM_HOME, stormHome,
"storm.log.dir", stormLogDir);
}
@Test
public void testLaunchStorm0version() throws Exception {
final String topoId = "test_topology_storm_0.x";
final int supervisorPort = 6628;
final int port = 8080;
final String stormHome = ContainerTest.asAbsPath("tmp", "storm-home");
final String stormLogDir = ContainerTest.asFile(".", "target").getCanonicalPath();
final String workerId = "worker-id";
final String stormLocal = ContainerTest.asAbsPath("tmp", "storm-local");
final String distRoot = ContainerTest.asAbsPath(stormLocal, "supervisor", "stormdist", topoId);
final File stormcode = new File(distRoot, "stormcode.ser");
final File stormjar = new File(distRoot, "stormjar.jar");
final String log4jdir = ContainerTest.asAbsPath(stormHome, "conf");
final String workerConf = ContainerTest.asAbsPath(log4jdir, "worker.xml");
final String workerRoot = ContainerTest.asAbsPath(stormLocal, "workers", workerId);
final String workerTmpDir = ContainerTest.asAbsPath(workerRoot, "tmp");
final StormTopology st = new StormTopology();
st.set_spouts(new HashMap<>());
st.set_bolts(new HashMap<>());
st.set_state_spouts(new HashMap<>());
// minimum 0.x version of supporting STORM-2448 would be 0.10.3
st.set_storm_version("0.10.3");
byte[] serializedState = Utils.gzip(Utils.thriftSerialize(st));
final Map<String, Object> superConf = new HashMap<>();
superConf.put(Config.STORM_LOCAL_DIR, stormLocal);
superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
superConf.put(DaemonConfig.STORM_LOG4J2_CONF_DIR, log4jdir);
superConf.put(Config.WORKER_CHILDOPTS, " -Dtesting=true");
LocalAssignment la = new LocalAssignment();
la.set_topology_id(topoId);
AdvancedFSOps ops = mock(AdvancedFSOps.class);
when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
when(ops.slurp(stormcode)).thenReturn(serializedState);
LocalState ls = mock(LocalState.class);
checkpoint(() -> {
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
"SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
new HashMap<>(), ops, "profile");
mc.launch();
assertEquals(1, mc.workerCmds.size());
CommandRun cmd = mc.workerCmds.get(0);
mc.workerCmds.clear();
assertListEquals(Arrays.asList(
"java",
"-cp",
"FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
"-Dlogging.sensitivity=S3",
"-Dlogfile.name=worker.log",
"-Dstorm.home=" + stormHome,
"-Dworkers.artifacts=" + stormLocal,
"-Dstorm.id=" + topoId,
"-Dworker.id=" + workerId,
"-Dworker.port=" + port,
"-Dstorm.log.dir=" + stormLogDir,
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
"-Dstorm.local.dir=" + stormLocal,
"-Dworker.memory_limit_mb=768",
"-Dlog4j.configurationFile=" + workerConf,
"backtype.storm.LogWriter",
"java",
"-server",
"-Dlogging.sensitivity=S3",
"-Dlogfile.name=worker.log",
"-Dstorm.home=" + stormHome,
"-Dworkers.artifacts=" + stormLocal,
"-Dstorm.id=" + topoId,
"-Dworker.id=" + workerId,
"-Dworker.port=" + port,
"-Dstorm.log.dir=" + stormLogDir,
"-DLog4jContextSelector=org.apache.logging.log4j.core.selector.BasicContextSelector",
"-Dstorm.local.dir=" + stormLocal,
"-Dworker.memory_limit_mb=768",
"-Dlog4j.configurationFile=" + workerConf,
"-Dtesting=true",
"-Djava.library.path=JLP",
"-Dstorm.conf.file=",
"-Dstorm.options=",
"-Djava.io.tmpdir=" + workerTmpDir,
"-cp",
"FRAMEWORK_CP:" + stormjar.getAbsolutePath(),
"backtype.storm.daemon.worker",
topoId,
"SUPERVISOR",
String.valueOf(port),
workerId
), cmd.cmd);
assertEquals(new File(workerRoot), cmd.pwd);
},
ConfigUtils.STORM_HOME, stormHome,
"storm.log.dir", stormLogDir);
}
@Test
public void testSubstChildOpts() throws Exception {
String workerId = "w-01";
String topoId = "s-01";
int supervisorPort = 6628;
int port = 9999;
int memOnheap = 512;
LocalAssignment la = new LocalAssignment();
la.set_topology_id(topoId);
Map<String, Object> superConf = new HashMap<>();
AdvancedFSOps ops = mock(AdvancedFSOps.class);
when(ops.doRequiredTopoFilesExist(superConf, topoId)).thenReturn(true);
LocalState ls = mock(LocalState.class);
MockBasicContainer mc = new MockBasicContainer(ContainerType.LAUNCH, superConf,
"SUPERVISOR", supervisorPort, port, la, null, ls, workerId, new StormMetricsRegistry(),
new HashMap<>(), ops, "profile");
assertListEquals(Arrays.asList(
"-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
"-Xms256m",
"-Xmx512m"),
mc.substituteChildopts(
"-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m",
memOnheap));
assertListEquals(Arrays.asList(
"-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
"-Xms256m",
"-Xmx512m"),
mc.substituteChildopts(Arrays.asList(
"-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log", "-Xms256m",
"-Xmx%HEAP-MEM%m"), memOnheap));
assertListEquals(Collections.emptyList(),
mc.substituteChildopts(null));
}
private static interface Run {
public void run() throws Exception;
}
public static class CommandRun {
final List<String> cmd;
final Map<String, String> env;
final File pwd;
public CommandRun(List<String> cmd, Map<String, String> env, File pwd) {
this.cmd = cmd;
this.env = env;
this.pwd = pwd;
}
}
public static class MockBasicContainer extends BasicContainer {
public final List<CommandRun> profileCmds = new ArrayList<>();
public final List<CommandRun> workerCmds = new ArrayList<>();
public MockBasicContainer(ContainerType type, Map<String, Object> conf, String supervisorId, int supervisorPort,
int port, LocalAssignment assignment, ResourceIsolationInterface resourceIsolationManager,
LocalState localState, String workerId, StormMetricsRegistry metricsRegistry,
Map<String, Object> topoConf, AdvancedFSOps ops, String profileCmd) throws IOException {
super(type, conf, supervisorId, supervisorPort, port, assignment, resourceIsolationManager, localState,
workerId, metricsRegistry,new ContainerMemoryTracker(metricsRegistry), topoConf, ops, profileCmd);
}
@Override
protected Map<String, Object> readTopoConf() throws IOException {
return new HashMap<>();
}
@Override
public void createNewWorkerId() {
super.createNewWorkerId();
}
@Override
public List<String> substituteChildopts(Object value, int memOnheap) {
return super.substituteChildopts(value, memOnheap);
}
@Override
protected boolean runProfilingCommand(List<String> command, Map<String, String> env, String logPrefix,
File targetDir) throws IOException, InterruptedException {
profileCmds.add(new CommandRun(command, env, targetDir));
return true;
}
@Override
protected void launchWorkerProcess(List<String> command, Map<String, String> env, String logPrefix,
ExitCodeCallback processExitCallback, File targetDir) throws IOException {
workerCmds.add(new CommandRun(command, env, targetDir));
}
@Override
protected String javaCmd(String cmd) {
//avoid system dependent things
return cmd;
}
@Override
protected List<String> frameworkClasspath(SimpleVersion version) {
//We are not really running anything so make this
// simple to check for
return Arrays.asList("FRAMEWORK_CP");
}
@Override
protected String javaLibraryPath(String stormRoot, Map<String, Object> conf) {
return "JLP";
}
}
}