blob: bb629566c50cc34ec77d8668b55276e36c3571dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.cli;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import static org.apache.flink.client.cli.CliFrontendTestUtils.getTestJarPath;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Tests for the RUN command.
*/
public class CliFrontendRunTest extends CliFrontendTestBase {
@BeforeClass
public static void init() {
CliFrontendTestUtils.pipeSystemOutToNull();
}
@AfterClass
public static void shutdown() {
CliFrontendTestUtils.restoreSystemOut();
}
@Test
public void testRun() throws Exception {
final Configuration configuration = getConfiguration();
// test without parallelism
{
String[] parameters = {"-v", getTestJarPath()};
verifyCliFrontend(getCli(configuration), parameters, 1, true, false);
}
// test configure parallelism
{
String[] parameters = {"-v", "-p", "42", getTestJarPath()};
verifyCliFrontend(getCli(configuration), parameters, 42, true, false);
}
// test configure sysout logging
{
String[] parameters = {"-p", "2", "-q", getTestJarPath()};
verifyCliFrontend(getCli(configuration), parameters, 2, false, false);
}
// test detached mode
{
String[] parameters = {"-p", "2", "-d", getTestJarPath()};
verifyCliFrontend(getCli(configuration), parameters, 2, true, true);
}
// test configure savepoint path (no ignore flag)
{
String[] parameters = {"-s", "expectedSavepointPath", getTestJarPath()};
RunOptions options = CliFrontendParser.parseRunCommand(parameters);
SavepointRestoreSettings savepointSettings = options.getSavepointRestoreSettings();
assertTrue(savepointSettings.restoreSavepoint());
assertEquals("expectedSavepointPath", savepointSettings.getRestorePath());
assertFalse(savepointSettings.allowNonRestoredState());
assertFalse(savepointSettings.resumeFromLatestCheckpoint());
}
{
String resumeJobPath = "resumeJobPath";
String[] parameters = {"-r", resumeJobPath, getTestJarPath()};
RunOptions options = CliFrontendParser.parseRunCommand(parameters);
SavepointRestoreSettings savepointSettings = options.getSavepointRestoreSettings();
assertTrue(savepointSettings.restoreSavepoint());
assertEquals(resumeJobPath, savepointSettings.getRestorePath());
assertFalse(savepointSettings.allowNonRestoredState());
assertTrue(savepointSettings.resumeFromLatestCheckpoint());
}
// test configure savepoint path (with ignore flag)
{
String[] parameters = {"-s", "expectedSavepointPath", "-n", getTestJarPath()};
RunOptions options = CliFrontendParser.parseRunCommand(parameters);
SavepointRestoreSettings savepointSettings = options.getSavepointRestoreSettings();
assertTrue(savepointSettings.restoreSavepoint());
assertEquals("expectedSavepointPath", savepointSettings.getRestorePath());
assertTrue(savepointSettings.allowNonRestoredState());
}
// test jar arguments
{
String[] parameters =
{ getTestJarPath(), "-arg1", "value1", "justavalue", "--arg2", "value2"};
RunOptions options = CliFrontendParser.parseRunCommand(parameters);
assertEquals("-arg1", options.getProgramArgs()[0]);
assertEquals("value1", options.getProgramArgs()[1]);
assertEquals("justavalue", options.getProgramArgs()[2]);
assertEquals("--arg2", options.getProgramArgs()[3]);
assertEquals("value2", options.getProgramArgs()[4]);
}
// test libjars, files
{
final String libjar1 = "file:///tmp/libjar1.jar";
final String file = "/tmp/file.txt#newname";
String[] parameters = {
"--libjars", libjar1,
"--files", file,
getTestJarPath()};
RunOptions options = CliFrontendParser.parseRunCommand(parameters);
final List<URI> libjars = options.getLibjars();
assertEquals(1, libjars.size());
assertEquals(new URI(libjar1), libjars.get(0));
final List<URI> files = options.getFiles();
assertEquals(1, files.size());
assertEquals(new URI("file://" + file), files.get(0));
assertEquals("newname", files.get(0).getFragment());
}
}
@Test(expected = CliArgsException.class)
public void testUnrecognizedOption() throws Exception {
// test unrecognized option
String[] parameters = {"-v", "-l", "-a", "some", "program", "arguments"};
verifyParameters(parameters);
}
@Test(expected = CliArgsException.class)
public void testInvalidParallelismOption() throws Exception {
// test configure parallelism with non integer value
String[] parameters = {"-v", "-p", "text", getTestJarPath()};
verifyParameters(parameters);
}
@Test(expected = CliArgsException.class)
public void testParallelismWithOverflow() throws Exception {
// test configure parallelism with overflow integer value
String[] parameters = {"-v", "-p", "475871387138", getTestJarPath()};
verifyParameters(parameters);
}
@Test(expected = CliArgsException.class)
public void testBothRestoreAndResumeOptions() throws Exception {
// test configure both fromSavepoint and resume
String[] parameters = {"-s", "hdfs:///checkpoint/job-id1/chk-18", "-r", "hdfs:///checkpoint/job-id1", getTestJarPath()};
verifyParameters(parameters);
}
// --------------------------------------------------------------------------------------------
public static void verifyCliFrontend(
AbstractCustomCommandLine<?> cli,
String[] parameters,
int expectedParallelism,
boolean logging,
boolean isDetached) throws Exception {
RunTestingCliFrontend testFrontend =
new RunTestingCliFrontend(cli, expectedParallelism, logging,
isDetached);
testFrontend.run(parameters); // verifies the expected values (see below)
}
private static final class RunTestingCliFrontend extends CliFrontend {
private final int expectedParallelism;
private final boolean sysoutLogging;
private final boolean isDetached;
private RunTestingCliFrontend(
AbstractCustomCommandLine<?> cli,
int expectedParallelism,
boolean logging,
boolean isDetached) throws Exception {
super(
cli.getConfiguration(),
Collections.singletonList(cli));
this.expectedParallelism = expectedParallelism;
this.sysoutLogging = logging;
this.isDetached = isDetached;
}
@Override
protected void executeProgram(PackagedProgram program, ClusterClient client, int parallelism) {
assertEquals(isDetached, client.isDetached());
assertEquals(sysoutLogging, client.getPrintStatusDuringExecution());
assertEquals(expectedParallelism, parallelism);
}
}
private static void verifyParameters(String[] parameters) throws Exception {
Configuration configuration = new Configuration();
CliFrontend testFrontend = new CliFrontend(
configuration,
Collections.singletonList(getCli(configuration)));
testFrontend.run(parameters);
}
}