| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.test.runtime; |
| |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.runtime.clusterframework.BootstrapTools; |
| import org.apache.flink.runtime.entrypoint.FlinkParseException; |
| import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; |
| import org.apache.flink.runtime.testutils.CommonTestUtils; |
| import org.apache.flink.test.util.ShellScript; |
| import org.apache.flink.util.OperatingSystem; |
| import org.apache.flink.util.TestLogger; |
| |
| import org.hamcrest.Matchers; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.StringWriter; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME; |
| import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath; |
| import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| |
| /** |
| * Tests for passing and loading dynamical properties of task manager. Usually(in Yarn, Kubernetes), |
| * the taskmanager java start commands are wrapped within bash. This test will generate a |
| * launch_container.sh script to validate the dynamical properties passing and loading. |
| */ |
| public class TaskManagerLoadingDynamicPropertiesITCase extends TestLogger { |
| |
| private static final String KEY_A = "key.a"; |
| private static final String VALUE_A = "#a,b&c^d*e@f(g!h"; |
| private static final String KEY_B = "key.b"; |
| private static final String VALUE_B = "'foobar"; |
| private static final String KEY_C = "key.c"; |
| private static final String VALUE_C = "foo''bar"; |
| private static final String KEY_D = "key.d"; |
| private static final String VALUE_D = "'foo' 'bar'"; |
| private static final String KEY_E = "key.e"; |
| private static final String VALUE_E = "foo\"bar'"; |
| private static final String KEY_F = "key.f"; |
| private static final String VALUE_F = "\"foo\" \"bar\""; |
| |
| @Rule public TemporaryFolder folder = new TemporaryFolder(); |
| |
| @Test |
| public void testLoadingDynamicPropertiesInBash() throws Exception { |
| final Configuration clientConfiguration = new Configuration(); |
| final File root = folder.getRoot(); |
| final File homeDir = new File(root, "home"); |
| assertTrue(homeDir.mkdir()); |
| BootstrapTools.writeConfiguration( |
| clientConfiguration, new File(homeDir, FLINK_CONF_FILENAME)); |
| |
| final Configuration jmUpdatedConfiguration = getJobManagerUpdatedConfiguration(); |
| |
| final File shellScriptFile = |
| generateLaunchContainerScript( |
| homeDir, |
| BootstrapTools.getDynamicPropertiesAsString( |
| clientConfiguration, jmUpdatedConfiguration)); |
| |
| Process process = new ProcessBuilder(shellScriptFile.getAbsolutePath()).start(); |
| try { |
| final StringWriter processOutput = new StringWriter(); |
| new CommonTestUtils.PipeForwarder(process.getErrorStream(), processOutput); |
| |
| if (!process.waitFor(10, TimeUnit.SECONDS)) { |
| throw new Exception("TestingTaskManagerRunner did not shutdown in time."); |
| } |
| assertEquals(processOutput.toString(), 0, process.exitValue()); |
| } finally { |
| process.destroy(); |
| } |
| } |
| |
| private Configuration getJobManagerUpdatedConfiguration() { |
| final Configuration updatedConfig = new Configuration(); |
| updatedConfig.setString(KEY_A, VALUE_A); |
| updatedConfig.setString(KEY_B, VALUE_B); |
| updatedConfig.setString(KEY_C, VALUE_C); |
| updatedConfig.setString(KEY_D, VALUE_D); |
| updatedConfig.setString(KEY_E, VALUE_E); |
| updatedConfig.setString(KEY_F, VALUE_F); |
| return updatedConfig; |
| } |
| |
| private File generateLaunchContainerScript(File homeDir, String dynamicProperties) |
| throws IOException { |
| final ShellScript.ShellScriptBuilder shellScriptBuilder = |
| ShellScript.createShellScriptBuilder(); |
| final List<String> commands = new ArrayList<>(); |
| commands.add(getJavaCommandWithOS()); |
| commands.add(getInternalClassNameWithOS(TestingTaskManagerRunner.class.getName())); |
| commands.add("--configDir"); |
| commands.add(homeDir.getAbsolutePath()); |
| commands.add(dynamicProperties); |
| shellScriptBuilder.env("CLASSPATH", getCurrentClasspath()); |
| shellScriptBuilder.command(commands); |
| final File shellScriptFile = |
| new File(homeDir, "launch_container" + ShellScript.getScriptExtension()); |
| shellScriptBuilder.write(shellScriptFile); |
| return shellScriptFile; |
| } |
| |
| private String getJavaCommandWithOS() { |
| if (OperatingSystem.isWindows()) { |
| return "\"" + getJavaCommandPath() + "\""; |
| } |
| return getJavaCommandPath(); |
| } |
| |
| private String getInternalClassNameWithOS(String className) { |
| if (!OperatingSystem.isWindows()) { |
| return className.replace("$", "'$'"); |
| } |
| return className; |
| } |
| |
| // -------------------------------------------------------------------------------------------- |
| |
| /** |
| * The testing taskmanager runner. The exit code will be 0 if configuration values check passed. |
| */ |
| public static class TestingTaskManagerRunner { |
| |
| public static void main(String[] args) throws FlinkParseException { |
| final Configuration flinkConfig = TaskManagerRunner.loadConfiguration(args); |
| assertThat( |
| flinkConfig.toMap().values(), |
| Matchers.containsInAnyOrder( |
| VALUE_A, VALUE_B, VALUE_C, VALUE_D, VALUE_E, VALUE_F)); |
| } |
| } |
| } |