blob: 2a4b2377c6b04feee708f1e2b240335cc9a74667 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink;
import static org.hamcrest.CoreMatchers.allOf;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.StringContains;
import org.junit.Assert;
import org.junit.Test;
/**
* Test for {@link FlinkRunner}.
*
* <p>This test is copied to 1.10 is becauses the signature of the method getPipeline in
* OptimizerPlanEnvironment has been changed in Flink 1.10, please refer to
* https://github.com/apache/flink/commit/0ea4dd7e9d56a017743ca6794d28537800faab6f for more details.
*/
public class FlinkRunnerTest {
@Test
public void testEnsureStdoutStdErrIsRestored() throws Exception {
PackagedProgram packagedProgram =
PackagedProgram.newBuilder().setEntryPointClassName(getClass().getName()).build();
// constructor changed between Flink 1.10.0 and 1.10.1 and will again change in 1.11
OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(new Configuration());
try {
// Flink will throw an error because no job graph will be generated by the main method
env.getPipeline(packagedProgram, false);
Assert.fail("This should have failed to create the Flink Plan.");
} catch (ProgramInvocationException e) {
// Test that Flink wasn't able to intercept the stdout/stderr and we printed to the regular
// output instead
MatcherAssert.assertThat(
e.getMessage(),
allOf(
StringContains.containsString("System.out: (none)"),
StringContains.containsString("System.err: (none)")));
}
}
/** Main method for {@code testEnsureStdoutStdErrIsRestored()}. */
public static void main(String[] args) {
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(NotExecutingFlinkRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(GenerateSequence.from(0));
// This will call Workarounds.restoreOriginalStdOutAndStdErr() through the constructor of
// FlinkRunner
p.run();
}
private static class NotExecutingFlinkRunner extends FlinkRunner {
protected NotExecutingFlinkRunner(FlinkPipelineOptions options) {
// Stdout/Stderr is restored here
super(options);
}
@SuppressWarnings("unused")
public static NotExecutingFlinkRunner fromOptions(PipelineOptions options) {
return new NotExecutingFlinkRunner(options.as(FlinkPipelineOptions.class));
}
@Override
public PipelineResult run(Pipeline pipeline) {
// Do not execute to test the stdout printing
return null;
}
}
}