blob: e5e297a17413c9607a5bc200be24ab55996849ea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNull.nullValue;
import java.util.Collections;
import java.util.HashMap;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests for serialization and deserialization of {@link PipelineOptions} in {@link DoFnOperator}.
*/
public class FlinkPipelineOptionsTest {
/** Pipeline options. */
public interface MyOptions extends FlinkPipelineOptions {
@Description("Bla bla bla")
@Default.String("Hello")
String getTestOption();
void setTestOption(String value);
}
private static MyOptions options =
PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class);
/** These defaults should only be changed with a very good reason. */
@Test
public void testDefaults() {
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
assertThat(options.getParallelism(), is(-1));
assertThat(options.getMaxParallelism(), is(-1));
assertThat(options.getFlinkMaster(), is("[auto]"));
assertThat(options.getFilesToStage(), is(nullValue()));
assertThat(options.getLatencyTrackingInterval(), is(0L));
assertThat(options.isShutdownSourcesOnFinalWatermark(), is(false));
assertThat(options.getObjectReuse(), is(false));
assertThat(options.getCheckpointingMode(), is(CheckpointingMode.EXACTLY_ONCE.name()));
assertThat(options.getMinPauseBetweenCheckpoints(), is(-1L));
assertThat(options.getCheckpointingInterval(), is(-1L));
assertThat(options.getCheckpointTimeoutMillis(), is(-1L));
assertThat(options.getFailOnCheckpointingErrors(), is(true));
assertThat(options.getNumberOfExecutionRetries(), is(-1));
assertThat(options.getExecutionRetryDelay(), is(-1L));
assertThat(options.getRetainExternalizedCheckpointsOnCancellation(), is(false));
assertThat(options.getStateBackendFactory(), is(nullValue()));
assertThat(options.getMaxBundleSize(), is(1000L));
assertThat(options.getMaxBundleTimeMills(), is(1000L));
assertThat(options.getExecutionModeForBatch(), is(ExecutionMode.PIPELINED.name()));
assertThat(options.getSavepointPath(), is(nullValue()));
assertThat(options.getAllowNonRestoredState(), is(false));
}
@Test(expected = Exception.class)
public void parDoBaseClassPipelineOptionsNullTest() {
TupleTag<String> mainTag = new TupleTag<>("main-output");
Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
new DoFnOperator<>(
new TestDoFn(),
"stepName",
coder,
null,
Collections.emptyMap(),
mainTag,
Collections.emptyList(),
new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, coder),
WindowingStrategy.globalDefault(),
new HashMap<>(),
Collections.emptyList(),
null,
null, /* key coder */
null /* key selector */,
DoFnSchemaInformation.create(),
Collections.emptyMap());
}
/** Tests that PipelineOptions are present after serialization. */
@Test
public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
TupleTag<String> mainTag = new TupleTag<>("main-output");
Coder<WindowedValue<String>> coder = WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());
DoFnOperator<String, String> doFnOperator =
new DoFnOperator<>(
new TestDoFn(),
"stepName",
coder,
null,
Collections.emptyMap(),
mainTag,
Collections.emptyList(),
new DoFnOperator.MultiOutputOutputManagerFactory<>(mainTag, coder),
WindowingStrategy.globalDefault(),
new HashMap<>(),
Collections.emptyList(),
options,
null, /* key coder */
null /* key selector */,
DoFnSchemaInformation.create(),
Collections.emptyMap());
final byte[] serialized = SerializationUtils.serialize(doFnOperator);
@SuppressWarnings("unchecked")
DoFnOperator<Object, Object> deserialized = SerializationUtils.deserialize(serialized);
TypeInformation<WindowedValue<Object>> typeInformation =
TypeInformation.of(new TypeHint<WindowedValue<Object>>() {});
OneInputStreamOperatorTestHarness<WindowedValue<Object>, WindowedValue<Object>> testHarness =
new OneInputStreamOperatorTestHarness<>(
deserialized, typeInformation.createSerializer(new ExecutionConfig()));
testHarness.open();
// execute once to access options
testHarness.processElement(
new StreamRecord<>(
WindowedValue.of(
new Object(), Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
testHarness.close();
}
private static class TestDoFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Assert.assertNotNull(c.getPipelineOptions());
Assert.assertEquals(
options.getTestOption(), c.getPipelineOptions().as(MyOptions.class).getTestOption());
}
}
}