blob: 7b0c671e3481bf7b3dac42b2f60d4fb145fef0da [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.runtime;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.test.util.ShellScript;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
* Tests for passing and loading dynamical properties of task manager. Usually(in Yarn, Kubernetes),
* the taskmanager java start commands are wrapped within bash. This test will generate a
* launch_container.sh script to validate the dynamical properties passing and loading.
*/
public class TaskManagerLoadingDynamicPropertiesITCase extends TestLogger {
private static final String KEY_A = "key.a";
private static final String VALUE_A = "#a,b&c^d*e@f(g!h";
private static final String KEY_B = "key.b";
private static final String VALUE_B = "'foobar";
private static final String KEY_C = "key.c";
private static final String VALUE_C = "foo''bar";
private static final String KEY_D = "key.d";
private static final String VALUE_D = "'foo' 'bar'";
private static final String KEY_E = "key.e";
private static final String VALUE_E = "foo\"bar'";
private static final String KEY_F = "key.f";
private static final String VALUE_F = "\"foo\" \"bar\"";
@Rule public TemporaryFolder folder = new TemporaryFolder();
@Test
public void testLoadingDynamicPropertiesInBash() throws Exception {
final Configuration clientConfiguration = new Configuration();
final File root = folder.getRoot();
final File homeDir = new File(root, "home");
assertTrue(homeDir.mkdir());
BootstrapTools.writeConfiguration(
clientConfiguration, new File(homeDir, FLINK_CONF_FILENAME));
final Configuration jmUpdatedConfiguration = getJobManagerUpdatedConfiguration();
final File shellScriptFile =
generateLaunchContainerScript(
homeDir,
BootstrapTools.getDynamicPropertiesAsString(
clientConfiguration, jmUpdatedConfiguration));
Process process = new ProcessBuilder(shellScriptFile.getAbsolutePath()).start();
try {
final StringWriter processOutput = new StringWriter();
new CommonTestUtils.PipeForwarder(process.getErrorStream(), processOutput);
if (!process.waitFor(10, TimeUnit.SECONDS)) {
throw new Exception("TestingTaskManagerRunner did not shutdown in time.");
}
assertEquals(processOutput.toString(), 0, process.exitValue());
} finally {
process.destroy();
}
}
private Configuration getJobManagerUpdatedConfiguration() {
final Configuration updatedConfig = new Configuration();
updatedConfig.setString(KEY_A, VALUE_A);
updatedConfig.setString(KEY_B, VALUE_B);
updatedConfig.setString(KEY_C, VALUE_C);
updatedConfig.setString(KEY_D, VALUE_D);
updatedConfig.setString(KEY_E, VALUE_E);
updatedConfig.setString(KEY_F, VALUE_F);
return updatedConfig;
}
private File generateLaunchContainerScript(File homeDir, String dynamicProperties)
throws IOException {
final ShellScript.ShellScriptBuilder shellScriptBuilder =
ShellScript.createShellScriptBuilder();
final List<String> commands = new ArrayList<>();
commands.add(getJavaCommandWithOS());
commands.add(getInternalClassNameWithOS(TestingTaskManagerRunner.class.getName()));
commands.add("--configDir");
commands.add(homeDir.getAbsolutePath());
commands.add(dynamicProperties);
shellScriptBuilder.env("CLASSPATH", getCurrentClasspath());
shellScriptBuilder.command(commands);
final File shellScriptFile =
new File(homeDir, "launch_container" + ShellScript.getScriptExtension());
shellScriptBuilder.write(shellScriptFile);
return shellScriptFile;
}
private String getJavaCommandWithOS() {
if (OperatingSystem.isWindows()) {
return "\"" + getJavaCommandPath() + "\"";
}
return getJavaCommandPath();
}
private String getInternalClassNameWithOS(String className) {
if (!OperatingSystem.isWindows()) {
return className.replace("$", "'$'");
}
return className;
}
// --------------------------------------------------------------------------------------------
/**
* The testing taskmanager runner. The exit code will be 0 if configuration values check passed.
*/
public static class TestingTaskManagerRunner {
public static void main(String[] args) throws FlinkParseException {
final Configuration flinkConfig = TaskManagerRunner.loadConfiguration(args);
assertThat(
flinkConfig.toMap().values(),
Matchers.containsInAnyOrder(
VALUE_A, VALUE_B, VALUE_C, VALUE_D, VALUE_E, VALUE_F));
}
}
}