blob: 7c51efcbacbcbe5e69a92fb31613fbafb008395e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.container.entrypoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.entrypoint.FlinkParseException;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsNull.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** Tests for the {@link StandaloneApplicationClusterConfigurationParserFactory}. */
public class StandaloneApplicationClusterConfigurationParserFactoryTest extends TestLogger {
@Rule public TemporaryFolder tempFolder = new TemporaryFolder();
private File confFile;
private String confDirPath;
@Before
public void createEmptyFlinkConfiguration() throws IOException {
File confDir = tempFolder.getRoot();
confDirPath = confDir.getAbsolutePath();
confFile = new File(confDir, GlobalConfiguration.FLINK_CONF_FILENAME);
confFile.createNewFile();
}
private static final CommandLineParser<StandaloneApplicationClusterConfiguration>
commandLineParser =
new CommandLineParser<>(
new StandaloneApplicationClusterConfigurationParserFactory());
private static final String JOB_CLASS_NAME = "foobar";
@Test
public void testEntrypointClusterConfigurationToConfigurationParsing()
throws FlinkParseException {
final JobID jobID = JobID.generate();
final SavepointRestoreSettings savepointRestoreSettings =
SavepointRestoreSettings.forPath("/test/savepoint/path", true);
final String key = DeploymentOptions.TARGET.key();
final String value = "testDynamicExecutorConfig";
final int restPort = 1234;
final String arg1 = "arg1";
final String arg2 = "arg2";
final String[] args = {
"--configDir",
confDirPath,
"--job-id",
jobID.toHexString(),
"--fromSavepoint",
savepointRestoreSettings.getRestorePath(),
"--allowNonRestoredState",
"--webui-port",
String.valueOf(restPort),
"--job-classname",
JOB_CLASS_NAME,
String.format("-D%s=%s", key, value),
arg1,
arg2
};
final StandaloneApplicationClusterConfiguration clusterConfiguration =
commandLineParser.parse(args);
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(JOB_CLASS_NAME)));
assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2));
final Configuration configuration =
StandaloneApplicationClusterEntryPoint.loadConfigurationFromClusterConfig(
clusterConfiguration);
final String strJobId = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
assertThat(JobID.fromHexString(strJobId), is(equalTo(jobID)));
assertThat(
SavepointRestoreSettings.fromConfiguration(configuration),
is(equalTo(savepointRestoreSettings)));
assertThat(configuration.get(RestOptions.PORT), is(equalTo(restPort)));
assertThat(configuration.get(DeploymentOptions.TARGET), is(equalTo(value)));
}
@Test
public void testEntrypointClusterConfigWOSavepointSettingsToConfigurationParsing()
throws FlinkParseException {
final JobID jobID = JobID.generate();
final String[] args = {"-c", confDirPath, "--job-id", jobID.toHexString()};
final StandaloneApplicationClusterConfiguration clusterConfiguration =
commandLineParser.parse(args);
final Configuration configuration =
StandaloneApplicationClusterEntryPoint.loadConfigurationFromClusterConfig(
clusterConfiguration);
final String strJobId = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
assertThat(JobID.fromHexString(strJobId), is(equalTo(jobID)));
assertThat(
SavepointRestoreSettings.fromConfiguration(configuration),
is(equalTo(SavepointRestoreSettings.none())));
}
@Test
public void testEntrypointClusterConfigurationParsing() throws FlinkParseException {
final String key = "key";
final String value = "value";
final int restPort = 1234;
final String arg1 = "arg1";
final String arg2 = "arg2";
final String[] args = {
"--configDir",
confDirPath,
"--webui-port",
String.valueOf(restPort),
"--job-classname",
JOB_CLASS_NAME,
String.format("-D%s=%s", key, value),
arg1,
arg2
};
final StandaloneApplicationClusterConfiguration clusterConfiguration =
commandLineParser.parse(args);
assertThat(clusterConfiguration.getConfigDir(), is(equalTo(confDirPath)));
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(JOB_CLASS_NAME)));
assertThat(clusterConfiguration.getRestPort(), is(equalTo(restPort)));
final Properties dynamicProperties = clusterConfiguration.getDynamicProperties();
assertThat(dynamicProperties, hasEntry(key, value));
assertThat(clusterConfiguration.getArgs(), arrayContaining(arg1, arg2));
assertThat(
clusterConfiguration.getSavepointRestoreSettings(),
is(equalTo(SavepointRestoreSettings.none())));
assertThat(clusterConfiguration.getJobId(), is(nullValue()));
}
@Test
public void testOnlyRequiredArguments() throws FlinkParseException {
final String[] args = {"--configDir", confDirPath};
final StandaloneApplicationClusterConfiguration clusterConfiguration =
commandLineParser.parse(args);
assertThat(clusterConfiguration.getConfigDir(), is(equalTo(confDirPath)));
assertThat(clusterConfiguration.getDynamicProperties(), is(equalTo(new Properties())));
assertThat(clusterConfiguration.getArgs(), is(new String[0]));
assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1)));
assertThat(clusterConfiguration.getHostname(), is(nullValue()));
assertThat(
clusterConfiguration.getSavepointRestoreSettings(),
is(equalTo(SavepointRestoreSettings.none())));
assertThat(clusterConfiguration.getJobId(), is(nullValue()));
assertThat(clusterConfiguration.getJobClassName(), is(nullValue()));
}
@Test(expected = FlinkParseException.class)
public void testMissingRequiredArgument() throws FlinkParseException {
final String[] args = {};
commandLineParser.parse(args);
}
@Test
public void testSavepointRestoreSettingsParsing() throws FlinkParseException {
final String restorePath = "foobar";
final String[] args = {"-c", confDirPath, "-j", JOB_CLASS_NAME, "-s", restorePath, "-n"};
final StandaloneApplicationClusterConfiguration standaloneApplicationClusterConfiguration =
commandLineParser.parse(args);
final SavepointRestoreSettings savepointRestoreSettings =
standaloneApplicationClusterConfiguration.getSavepointRestoreSettings();
assertThat(savepointRestoreSettings.restoreSavepoint(), is(true));
assertThat(savepointRestoreSettings.getRestorePath(), is(equalTo(restorePath)));
assertThat(savepointRestoreSettings.allowNonRestoredState(), is(true));
}
@Test
public void testSetJobIdManually() throws FlinkParseException {
final JobID jobId = new JobID();
final String[] args = {
"--configDir", confDirPath, "--job-classname", "foobar", "--job-id", jobId.toString()
};
final StandaloneApplicationClusterConfiguration standaloneApplicationClusterConfiguration =
commandLineParser.parse(args);
assertThat(standaloneApplicationClusterConfiguration.getJobId(), is(equalTo(jobId)));
}
@Test
public void testInvalidJobIdThrows() {
final String invalidJobId = "0xINVALID";
final String[] args = {
"--configDir", confDirPath, "--job-classname", "foobar", "--job-id", invalidJobId
};
try {
commandLineParser.parse(args);
fail("Did not throw expected FlinkParseException");
} catch (FlinkParseException e) {
Optional<IllegalArgumentException> cause =
ExceptionUtils.findThrowable(e, IllegalArgumentException.class);
assertTrue(cause.isPresent());
assertThat(cause.get().getMessage(), containsString(invalidJobId));
}
}
@Test
public void testShortOptions() throws FlinkParseException {
final String jobClassName = "foobar";
final JobID jobId = new JobID();
final String savepointRestorePath = "s3://foo/bar";
final String[] args = {
"-c", confDirPath,
"-j", jobClassName,
"-jid", jobId.toString(),
"-s", savepointRestorePath,
"-n"
};
final StandaloneApplicationClusterConfiguration clusterConfiguration =
commandLineParser.parse(args);
assertThat(clusterConfiguration.getConfigDir(), is(equalTo(confDirPath)));
assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
assertThat(clusterConfiguration.getJobId(), is(equalTo(jobId)));
final SavepointRestoreSettings savepointRestoreSettings =
clusterConfiguration.getSavepointRestoreSettings();
assertThat(savepointRestoreSettings.restoreSavepoint(), is(true));
assertThat(savepointRestoreSettings.getRestorePath(), is(equalTo(savepointRestorePath)));
assertThat(savepointRestoreSettings.allowNonRestoredState(), is(true));
}
@Test
public void testHostOption() throws FlinkParseException {
final String hostName = "user-specified-hostname";
final String[] args = {
"--configDir", confDirPath, "--job-classname", "foobar", "--host", hostName
};
final StandaloneApplicationClusterConfiguration applicationClusterConfiguration =
commandLineParser.parse(args);
assertThat(applicationClusterConfiguration.getHostname(), is(hostName));
}
}