blob: 7f8db8ed4bad6a9f310e91d14b8c43835c7e45f5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.dist;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Random;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/**
* Unit test that verifies that the task manager heap size calculation used by the bash script
* <tt>taskmanager.sh</tt> returns the same values as the heap size calculation of
* {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)}.
*
* <p>NOTE: the shell script uses <tt>awk</tt> to perform floating-point arithmetic which uses
* <tt>double</tt> precision but our Java code restrains to <tt>float</tt> because we actually do
* not need high precision.
*/
public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
/** Key that is used by <tt>config.sh</tt>. */
private static final String KEY_TASKM_MEM_SIZE = "taskmanager.heap.mb";
/**
* Number of tests with random values.
*
* <p>NOTE: calling the external test script is slow and thus low numbers are preferred for general
* testing.
*/
private static final int NUM_RANDOM_TESTS = 20;
@Before
public void checkOperatingSystem() {
Assume.assumeTrue("This test checks shell scripts which are not available on Windows.",
!OperatingSystem.isWindows());
}
/**
* Tests that {@link TaskManagerServices#calculateNetworkBufferMemory(long, Configuration)} has the same
* result as the shell script.
*/
@Test
public void compareNetworkBufShellScriptWithJava() throws Exception {
int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
compareNetworkBufJavaVsScript(
getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
compareNetworkBufJavaVsScript(
getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
compareNetworkBufJavaVsScript(
getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
// some automated tests with random (but valid) values
Random ran = new Random();
for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
// tolerate that values differ by 1% (due to different floating point precisions)
compareNetworkBufJavaVsScript(getRandomConfig(ran), 0.01f);
}
}
/**
* Tests that {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} has the same
* result as the shell script.
*/
@Test
public void compareHeapSizeShellScriptWithJava() throws Exception {
int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
// manual tests from org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateHeapSizeMB()
compareHeapSizeJavaVsScript(
getConfig(1000, false, 0.1f, 64L << 20, 1L << 30, managedMemSize, managedMemFrac), 0.0f);
compareHeapSizeJavaVsScript(
getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, 10 /*MB*/, managedMemFrac), 0.0f);
compareHeapSizeJavaVsScript(
getConfig(1000, true, 0.1f, 64L << 20, 1L << 30, managedMemSize, 0.1f), 0.0f);
// some automated tests with random (but valid) values
Random ran = new Random();
for (int i = 0; i < NUM_RANDOM_TESTS; ++i) {
// tolerate that values differ by 1% (due to different floating point precisions)
compareHeapSizeJavaVsScript(getRandomConfig(ran), 0.01f);
}
}
/**
* Returns a flink configuration object with the given values.
*
* @param javaMemMB
* total JVM memory to use (in megabytes)
* @param useOffHeap
* whether to use off-heap memory (<tt>true</tt>) or not (<tt>false</tt>)
* @param netBufMemFrac
* fraction of JVM memory to use for network buffers
* @param netBufMemMin
* minimum memory size for network buffers (in bytes)
* @param netBufMemMax
* maximum memory size for network buffers (in bytes)
* @param managedMemSizeMB
* amount of managed memory (in megabytes)
* @param managedMemFrac
* fraction of free memory to use for managed memory (if <tt>managedMemSizeMB</tt> is
* <tt>-1</tt>)
*
* @return flink configuration
*/
private static Configuration getConfig(
final int javaMemMB, final boolean useOffHeap, final float netBufMemFrac,
final long netBufMemMin, final long netBufMemMax, final int managedMemSizeMB,
final float managedMemFrac) {
Configuration config = new Configuration();
config.setLong(KEY_TASKM_MEM_SIZE, javaMemMB);
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, useOffHeap);
config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, netBufMemFrac);
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, netBufMemMin);
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, netBufMemMax);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, managedMemSizeMB);
config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, managedMemFrac);
return config;
}
/**
* Returns a flink configuration object with random values (only those relevant to the tests in
* this class.
*
* @param ran random number generator
*
* @return flink configuration
*/
private static Configuration getRandomConfig(final Random ran) {
float frac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
// note: we are testing with integers only here to avoid overly complicated checks for
// overflowing or negative Long values - this should be enough for any practical scenario
// though
long min = (long) TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue() + ran.nextInt(Integer.MAX_VALUE);
long max = ran.nextInt(Integer.MAX_VALUE) + min;
int javaMemMB = Math.max((int) (max >> 20), ran.nextInt(Integer.MAX_VALUE)) + 1;
boolean useOffHeap = ran.nextBoolean();
int managedMemSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue().intValue();
float managedMemFrac = TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue();
if (ran.nextBoolean()) {
// use fixed-size managed memory
Configuration config = getConfig(javaMemMB, useOffHeap, frac, min, max, managedMemSize, managedMemFrac);
long totalJavaMemorySize = ((long) javaMemMB) << 20; // megabytes to bytes
final int networkBufMB =
(int) (TaskManagerServices.calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20);
// max (exclusive): total - netbuf
managedMemSize = Math.min(javaMemMB - networkBufMB - 1, ran.nextInt(Integer.MAX_VALUE));
} else {
// use fraction of given memory
managedMemFrac = Math.max(ran.nextFloat(), Float.MIN_VALUE);
}
return getConfig(javaMemMB, useOffHeap, frac, min, max, managedMemSize, managedMemFrac);
}
// Helper functions
/**
* Calculates the heap size via
* {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} and the shell script
* and verifies that these are equal.
*
* @param config flink configuration
* @param tolerance tolerate values that are off by this factor (0.01 = 1%)
*/
private void compareNetworkBufJavaVsScript(final Configuration config, final float tolerance)
throws IOException {
final long totalJavaMemorySizeMB = config.getLong(KEY_TASKM_MEM_SIZE, 0L);
long javaNetworkBufMem = TaskManagerServices.calculateNetworkBufferMemory(totalJavaMemorySizeMB << 20, config);
String[] command = {"src/test/bin/calcTMNetBufMem.sh",
String.valueOf(totalJavaMemorySizeMB),
String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)),
String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX))};
String scriptOutput = executeScript(command);
long absoluteTolerance = (long) (javaNetworkBufMem * tolerance);
if (absoluteTolerance < 1) {
assertEquals(
"Different network buffer memory sizes with configuration: " + config.toString(),
String.valueOf(javaNetworkBufMem), scriptOutput);
} else {
Long scriptNetworkBufMem = Long.valueOf(scriptOutput);
assertThat(
"Different network buffer memory sizes (Java: " + javaNetworkBufMem + ", Script: " + scriptNetworkBufMem +
") with configuration: " + config.toString(), scriptNetworkBufMem,
allOf(greaterThanOrEqualTo(javaNetworkBufMem - absoluteTolerance),
lessThanOrEqualTo(javaNetworkBufMem + absoluteTolerance)));
}
}
/**
* Calculates the heap size via
* {@link TaskManagerServices#calculateHeapSizeMB(long, Configuration)} and the shell script
* and verifies that these are equal.
*
* @param config flink configuration
* @param tolerance tolerate values that are off by this factor (0.01 = 1%)
*/
private void compareHeapSizeJavaVsScript(final Configuration config, float tolerance)
throws IOException {
final long totalJavaMemorySizeMB = config.getLong(KEY_TASKM_MEM_SIZE, 0L);
long javaHeapSizeMB = TaskManagerServices.calculateHeapSizeMB(totalJavaMemorySizeMB, config);
String[] command = {"src/test/bin/calcTMHeapSizeMB.sh",
String.valueOf(totalJavaMemorySizeMB),
String.valueOf(config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)),
String.valueOf(config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION)),
String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)),
String.valueOf(config.getLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)),
String.valueOf(config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE)),
String.valueOf(config.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION))};
String scriptOutput = executeScript(command);
long absoluteTolerance = (long) (javaHeapSizeMB * tolerance);
if (absoluteTolerance < 1) {
assertEquals("Different heap sizes with configuration: " + config.toString(),
String.valueOf(javaHeapSizeMB), scriptOutput);
} else {
Long scriptHeapSizeMB = Long.valueOf(scriptOutput);
assertThat(
"Different heap sizes (Java: " + javaHeapSizeMB + ", Script: " + scriptHeapSizeMB +
") with configuration: " + config.toString(), scriptHeapSizeMB,
allOf(greaterThanOrEqualTo(javaHeapSizeMB - absoluteTolerance),
lessThanOrEqualTo(javaHeapSizeMB + absoluteTolerance)));
}
}
/**
* Executes the given shell script wrapper and returns its output.
*
* @param command command to run
*
* @return raw script output
*/
private String executeScript(final String[] command) throws IOException {
ProcessBuilder pb = new ProcessBuilder(command);
pb.redirectErrorStream(true);
Process process = pb.start();
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
StringBuilder sb = new StringBuilder();
String s;
while ((s = reader.readLine()) != null) {
sb.append(s);
}
return sb.toString();
}
}