STORM-3647: Adds OFF HEAP to worker child opts
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
index 4db6f3f..4d1609c 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -444,7 +444,7 @@
return CPJ.join(workercp);
}
- private String substituteChildOptsInternal(String string, int memOnheap) {
+ private String substituteChildOptsInternal(String string, int memOnheap, int memOffheap) {
if (StringUtils.isNotBlank(string)) {
String p = String.valueOf(port);
string = string.replace("%ID%", p);
@@ -454,6 +454,9 @@
if (memOnheap > 0) {
string = string.replace("%HEAP-MEM%", String.valueOf(memOnheap));
}
+ if (memOffheap > 0) {
+ string = string.replace("%OFF-HEAP-MEM%", String.valueOf(memOffheap));
+ }
if (memoryLimitMb > 0) {
string = string.replace("%LIMIT-MEM%", String.valueOf(memoryLimitMb));
}
@@ -462,13 +465,13 @@
}
protected List<String> substituteChildopts(Object value) {
- return substituteChildopts(value, -1);
+ return substituteChildopts(value, -1, -1);
}
- protected List<String> substituteChildopts(Object value, int memOnheap) {
+ protected List<String> substituteChildopts(Object value, int memOnheap, int memOffHeap) {
List<String> rets = new ArrayList<>();
if (value instanceof String) {
- String string = substituteChildOptsInternal((String) value, memOnheap);
+ String string = substituteChildOptsInternal((String) value, memOnheap, memOffHeap);
if (StringUtils.isNotBlank(string)) {
String[] strings = string.split("\\s+");
for (String s : strings) {
@@ -481,7 +484,7 @@
@SuppressWarnings("unchecked")
List<String> objects = (List<String>) value;
for (String object : objects) {
- String str = substituteChildOptsInternal(object, memOnheap);
+ String str = substituteChildOptsInternal(object, memOnheap, memOffHeap);
if (StringUtils.isNotBlank(str)) {
rets.add(str);
}
@@ -586,10 +589,20 @@
return memOnheap;
}
- private List<String> getWorkerProfilerChildOpts(int memOnheap) {
+ private int getMemOffHeap(WorkerResources resources) {
+ int memOffheap = 0;
+ if (resources != null && resources.is_set_mem_off_heap() && resources.get_mem_off_heap() > 0) {
+ memOffheap = (int) Math.ceil(resources.get_mem_off_heap());
+ }
+ return memOffheap;
+ }
+
+ private List<String> getWorkerProfilerChildOpts(int memOnheap, int memOffheap) {
List<String> workerProfilerChildopts = new ArrayList<>();
if (ObjectReader.getBoolean(conf.get(DaemonConfig.WORKER_PROFILER_ENABLED), false)) {
- workerProfilerChildopts = substituteChildopts(conf.get(DaemonConfig.WORKER_PROFILER_CHILDOPTS), memOnheap);
+ workerProfilerChildopts = substituteChildopts(
+ conf.get(DaemonConfig.WORKER_PROFILER_CHILDOPTS), memOnheap, memOffheap
+ );
}
return workerProfilerChildopts;
}
@@ -615,7 +628,7 @@
*
* @throws IOException on any error.
*/
- private List<String> mkLaunchCommand(final int memOnheap, final String stormRoot,
+ private List<String> mkLaunchCommand(final int memOnheap, final int memOffheap, final String stormRoot,
final String jlp, final String numaId) throws IOException {
final String javaCmd = javaCmd("java");
final String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
@@ -653,12 +666,12 @@
commandList.add("-server");
commandList.addAll(commonParams);
commandList.add("-Dlog4j.configurationFile=" + workerLog4jConfig);
- commandList.addAll(substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), memOnheap));
- commandList.addAll(substituteChildopts(topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap));
+ commandList.addAll(substituteChildopts(conf.get(Config.WORKER_CHILDOPTS), memOnheap, memOffheap));
+ commandList.addAll(substituteChildopts(topoConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS), memOnheap, memOffheap));
commandList.addAll(substituteChildopts(Utils.OR(
topoConf.get(Config.TOPOLOGY_WORKER_GC_CHILDOPTS),
- conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap));
- commandList.addAll(getWorkerProfilerChildOpts(memOnheap));
+ conf.get(Config.WORKER_GC_CHILDOPTS)), memOnheap, memOffheap));
+ commandList.addAll(getWorkerProfilerChildOpts(memOnheap, memOffheap));
commandList.add("-Djava.library.path=" + jlp);
commandList.add("-Dstorm.conf.file=" + topoConfFile);
commandList.add("-Dstorm.options=" + stormOptions);
@@ -833,6 +846,7 @@
final WorkerResources resources = assignment.get_resources();
final int memOnHeap = getMemOnHeap(resources);
+ final int memOffHeap = getMemOffHeap(resources);
memoryLimitMb = calculateMemoryLimit(resources, memOnHeap);
final String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, topologyId);
String jlp = javaLibraryPath(stormRoot, conf);
@@ -857,7 +871,7 @@
resourceIsolationManager.reserveResourcesForWorker(workerId, (int) memoryLimitMb, cpu, numaId);
}
- List<String> commandList = mkLaunchCommand(memOnHeap, stormRoot, jlp, numaId);
+ List<String> commandList = mkLaunchCommand(memOnHeap, memOffHeap, stormRoot, jlp, numaId);
LOG.info("Launching worker with command: {}. ", ServerUtils.shellCmd(commandList));
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
index e8bb376..cb95229 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
@@ -599,6 +599,7 @@
int supervisorPort = 6628;
int port = 9999;
int memOnheap = 512;
+ int memOffheap = 256;
LocalAssignment la = new LocalAssignment();
la.set_topology_id(topoId);
@@ -617,18 +618,24 @@
assertListEquals(Arrays.asList(
"-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
"-Xms256m",
- "-Xmx512m"),
- mc.substituteChildopts(
- "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m",
- memOnheap));
+ "-Xmx512m", "-XX:MaxDirectMemorySize=256m"),
+ mc.substituteChildopts(
+ "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log -Xms256m -Xmx%HEAP-MEM%m -XX:MaxDirectMemorySize=%OFF-HEAP-MEM%m",
+ memOnheap, memOffheap));
- assertListEquals(Arrays.asList(
- "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
- "-Xms256m",
- "-Xmx512m"),
- mc.substituteChildopts(Arrays.asList(
- "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log", "-Xms256m",
- "-Xmx%HEAP-MEM%m"), memOnheap));
+ assertListEquals(
+ Arrays.asList(
+ "-Xloggc:/tmp/storm/logs/gc.worker-9999-s-01-w-01-9999.log",
+ "-Xms256m",
+ "-Xmx512m"
+ ),
+ mc.substituteChildopts(
+ Arrays.asList(
+ "-Xloggc:/tmp/storm/logs/gc.worker-%ID%-%TOPOLOGY-ID%-%WORKER-ID%-%WORKER-PORT%.log", "-Xms256m",
+ "-Xmx%HEAP-MEM%m"
+ ), memOnheap, memOffheap
+ )
+ );
assertListEquals(Collections.emptyList(),
mc.substituteChildopts(null));
@@ -672,8 +679,8 @@
}
@Override
- public List<String> substituteChildopts(Object value, int memOnheap) {
- return super.substituteChildopts(value, memOnheap);
+ public List<String> substituteChildopts(Object value, int memOnheap, int memOffHeap) {
+ return super.substituteChildopts(value, memOnheap, memOffHeap);
}
@Override