// blob: b44df7bf101be6a6211b19f8bf9b6fe83f2d07e5 (VCS viewer artifact; not part of the source)
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.structuredstreaming;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.fail;
import java.io.Serializable;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** This suite tests that various scenarios result in proper states of the pipeline. */
@RunWith(JUnit4.class)
public class StructuredStreamingPipelineStateTest implements Serializable {

  /** Marker exception injected into a pipeline step to verify failure propagation. */
  private static class MyCustomException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    MyCustomException(final String message) {
      super(message);
    }
  }

  // transient: pipeline options are driver-side only and must not be captured in
  // serialized closures.
  private final transient SparkStructuredStreamingPipelineOptions options =
      PipelineOptionsFactory.create().as(SparkStructuredStreamingPipelineOptions.class);

  @Rule public transient TestName testName = new TestName();

  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";

  /** Returns a {@link ParDo} that prints each element prefixed with {@code prefix}. */
  private ParDo.SingleOutput<String, String> printParDo(final String prefix) {
    return ParDo.of(
        new DoFn<String, String>() {
          @ProcessElement
          public void processElement(final ProcessContext c) {
            System.out.println(prefix + " " + c.element());
          }
        });
  }

  /**
   * Returns a source of the two values {@code "one"} and {@code "two"}: an unbounded
   * {@link CreateStream} when the options request streaming, a bounded {@link Create} otherwise.
   */
  private PTransform<PBegin, PCollection<String>> getValues(
      final SparkStructuredStreamingPipelineOptions options) {
    final boolean doNotSyncWithWatermark = false;
    return options.isStreaming()
        ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1), doNotSyncWithWatermark)
            .nextBatch("one", "two")
        : Create.of("one", "two");
  }

  /** Configures the shared options for a streaming run on the structured-streaming runner. */
  private SparkStructuredStreamingPipelineOptions getStreamingOptions() {
    options.setRunner(SparkStructuredStreamingRunner.class);
    options.setStreaming(true);
    return options;
  }

  /** Configures the shared options for a batch run on the structured-streaming runner. */
  private SparkStructuredStreamingPipelineOptions getBatchOptions() {
    options.setRunner(SparkStructuredStreamingRunner.class);
    options.setStreaming(false); // explicit because options is reused throughout the test.
    return options;
  }

  /** Builds a simple pipeline that reads the test values and prints them tagged with the test name. */
  private Pipeline getPipeline(final SparkStructuredStreamingPipelineOptions options) {
    final Pipeline pipeline = Pipeline.create(options);
    final String name = testName.getMethodName() + "(isStreaming=" + options.isStreaming() + ")";
    pipeline.apply(getValues(options)).setCoder(StringUtf8Coder.of()).apply(printParDo(name));
    return pipeline;
  }

  /**
   * Runs a pipeline whose map step always throws and asserts that the failure surfaces as a
   * {@link Pipeline.PipelineExecutionException} wrapping the injected {@link MyCustomException},
   * and that the result reports {@link PipelineResult.State#FAILED}.
   */
  private void testFailedPipeline(final SparkStructuredStreamingPipelineOptions options)
      throws Exception {
    SparkStructuredStreamingPipelineResult result = null;

    try {
      final Pipeline pipeline = Pipeline.create(options);
      pipeline
          .apply(getValues(options))
          .setCoder(StringUtf8Coder.of())
          .apply(
              MapElements.via(
                  new SimpleFunction<String, String>() {
                    @Override
                    public String apply(final String input) {
                      throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
                    }
                  }));

      result = (SparkStructuredStreamingPipelineResult) pipeline.run();
      result.waitUntilFinish();
    } catch (final Exception e) {
      assertThat(e, instanceOf(Pipeline.PipelineExecutionException.class));
      assertThat(e.getCause(), instanceOf(MyCustomException.class));
      assertThat(e.getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
      // Guard against an NPE masking the real failure: if pipeline.run() itself threw,
      // result was never assigned and the state assertion below would otherwise NPE.
      assertThat(result, instanceOf(SparkStructuredStreamingPipelineResult.class));
      assertThat(result.getState(), is(PipelineResult.State.FAILED));
      result.cancel();
      return;
    }

    fail("An injected failure did not affect the pipeline as expected.");
  }

  /**
   * Asserts that waiting on a healthy pipeline with a tiny timeout returns while the pipeline is
   * still {@link PipelineResult.State#RUNNING}.
   */
  private void testTimeoutPipeline(final SparkStructuredStreamingPipelineOptions options)
      throws Exception {
    final Pipeline pipeline = getPipeline(options);

    final SparkStructuredStreamingPipelineResult result =
        (SparkStructuredStreamingPipelineResult) pipeline.run();

    result.waitUntilFinish(Duration.millis(1));

    assertThat(result.getState(), is(PipelineResult.State.RUNNING));

    result.cancel();
  }

  /** Asserts that cancelling a running pipeline moves it to {@link PipelineResult.State#CANCELLED}. */
  private void testCanceledPipeline(final SparkStructuredStreamingPipelineOptions options)
      throws Exception {
    final Pipeline pipeline = getPipeline(options);

    final SparkStructuredStreamingPipelineResult result =
        (SparkStructuredStreamingPipelineResult) pipeline.run();

    result.cancel();

    assertThat(result.getState(), is(PipelineResult.State.CANCELLED));
  }

  /** Asserts that a freshly started pipeline reports {@link PipelineResult.State#RUNNING}. */
  private void testRunningPipeline(final SparkStructuredStreamingPipelineOptions options)
      throws Exception {
    final Pipeline pipeline = getPipeline(options);

    final SparkStructuredStreamingPipelineResult result =
        (SparkStructuredStreamingPipelineResult) pipeline.run();

    assertThat(result.getState(), is(PipelineResult.State.RUNNING));

    result.cancel();
  }

  @Ignore("TODO: Reactivate with streaming.")
  @Test
  public void testStreamingPipelineRunningState() throws Exception {
    testRunningPipeline(getStreamingOptions());
  }

  @Test
  public void testBatchPipelineRunningState() throws Exception {
    testRunningPipeline(getBatchOptions());
  }

  @Ignore("TODO: Reactivate with streaming.")
  @Test
  public void testStreamingPipelineCanceledState() throws Exception {
    testCanceledPipeline(getStreamingOptions());
  }

  @Test
  public void testBatchPipelineCanceledState() throws Exception {
    testCanceledPipeline(getBatchOptions());
  }

  @Ignore("TODO: Reactivate with streaming.")
  @Test
  public void testStreamingPipelineFailedState() throws Exception {
    testFailedPipeline(getStreamingOptions());
  }

  @Test
  public void testBatchPipelineFailedState() throws Exception {
    testFailedPipeline(getBatchOptions());
  }

  @Ignore("TODO: Reactivate with streaming.")
  @Test
  public void testStreamingPipelineTimeoutState() throws Exception {
    testTimeoutPipeline(getStreamingOptions());
  }

  @Test
  public void testBatchPipelineTimeoutState() throws Exception {
    testTimeoutPipeline(getBatchOptions());
  }
}