blob: 198b2c827c2c117da3a4762390307304ebc8430e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.avro.generated.Address;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.types.Either;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Supplier;
/**
 * Stateful streaming job used to exercise Avro state evolution and general
 * state migration.
 *
 * <p>The pipeline consists of a stub source, a keyed stateful map function and a
 * discarding sink. Every record is keyed to the constant {@code 0}.
 */
public class StatefulStreamingJob {

    private static final String EXPECTED_DEFAULT_VALUE = "123";

    /**
     * Stub source that, until cancelled, repeatedly emits the constant {@code 1}
     * while holding the checkpoint lock and then sleeps for 100 milliseconds.
     */
    public static class MySource extends RichParallelSourceFunction<Integer> {

        private static final long serialVersionUID = 1L;

        /** Set by {@link #cancel()}; volatile so the emitting loop observes the change. */
        private volatile boolean cancel;

        /**
         * Emits records in a loop until {@link #cancel()} has been called.
         *
         * @param ctx context used to obtain the checkpoint lock and to collect records
         * @throws Exception if the sleep between two emissions is interrupted
         */
        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            while (true) {
                if (cancel) {
                    return;
                }
                // Emission happens under the checkpoint lock obtained from the context.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(1);
                }
                Thread.sleep(100L);
            }
        }

        /** Requests termination of the emitting loop in {@link #run(SourceContext)}. */
        @Override
        public void cancel() {
            cancel = true;
        }
    }

    /**
     * A stateful {@link RichMapFunction} holding one {@link ValueState} for each
     * of the state types under test:
     * <ol>
     *   <li>an Avro type ({@link Address}),</li>
     *   <li>a {@link Tuple2}, and</li>
     *   <li>an {@link Either}.</li>
     * </ol>
     */
    public static class MyStatefulFunction extends RichMapFunction<Integer, String> {

        private static final long serialVersionUID = 1L;

        private static final ValueStateDescriptor<Address> AVRO_DESCRIPTOR =
            new ValueStateDescriptor<>("test-state", Address.class);

        private static final ValueStateDescriptor<Tuple2<String, Integer>> TUPLE_DESCRIPTOR =
            new ValueStateDescriptor<>(
                "tuple-state",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        private static final ValueStateDescriptor<Either<String, Boolean>> EITHER_DESCRIPTOR =
            new ValueStateDescriptor<>(
                "either-state",
                TypeInformation.of(new TypeHint<Either<String, Boolean>>() {}));

        private transient ValueState<Address> avroState;

        private transient ValueState<Tuple2<String, Integer>> tupleState;

        private transient ValueState<Either<String, Boolean>> eitherState;

        /**
         * Looks up the three state handles from the runtime context.
         *
         * @param parameters configuration forwarded to the super class
         * @throws Exception if the super class fails to open
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            avroState = getRuntimeContext().getState(AVRO_DESCRIPTOR);
            tupleState = getRuntimeContext().getState(TUPLE_DESCRIPTOR);
            eitherState = getRuntimeContext().getState(EITHER_DESCRIPTOR);
        }

        /**
         * Touches the tuple and either state, checks the {@code appno} field of a
         * previously stored {@link Address} against the expected default value, and
         * then overwrites the Avro state with a freshly built address.
         *
         * @param value the incoming record; its value is not used
         * @return always the empty string
         * @throws Exception if accessing any of the states fails
         */
        @Override
        public String map(Integer value) throws Exception {
            touchState(tupleState, () -> Tuple2.of("19", 19));
            touchState(eitherState, () -> Either.Left("255"));

            final Address replacement = createAddress();
            final Address stored = avroState.value();

            final boolean wrongDefault =
                stored != null && !Objects.equals(stored.getAppno(), EXPECTED_DEFAULT_VALUE);
            if (wrongDefault) {
                // this is expected to fail the job, if found in the output files.
                System.out.println("Wrong Default Value.");
            }

            avroState.update(replacement);
            return "";
        }

        /** Builds the fixed {@link Address} that is written to the Avro state on every record. */
        private static Address createAddress() {
            return Address.newBuilder()
                .setCity("New York")
                .setZip("10036")
                .setStreet("555 W 42nd St")
                .setState("NY")
                .setNum(555)
                .build();
        }

        /**
         * Reads the state and writes it back, falling back to the supplied value
         * when the state currently holds {@code null}.
         *
         * @param state the state to read and update
         * @param fallback supplies the value to store when the state is empty; only
         *     invoked in that case
         * @throws IOException if reading or updating the state fails
         */
        private static <T> void touchState(ValueState<T> state, Supplier<T> fallback) throws IOException {
            final T current = state.value();
            state.update(current != null ? current : fallback.get());
        }
    }

    /**
     * Configures and runs the job.
     *
     * <p>The required argument {@code checkpoint.dir} is passed to the
     * {@link FsStateBackend}. The job runs without a restart strategy, with
     * checkpointing enabled at an interval of {@code 1000L} and with generic
     * types disabled in the execution config.
     *
     * @param args command line arguments, parsed with {@link ParameterTool}
     * @throws Exception if a required argument is missing or job execution fails
     */
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String checkpointDir = params.getRequired("checkpoint.dir");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new FsStateBackend(checkpointDir));
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.enableCheckpointing(1000L);
        env.getConfig().disableGenericTypes();

        env.addSource(new MySource())
            .uid("my-source")
            .keyBy(record -> 0)
            .map(new MyStatefulFunction())
            .uid("my-map")
            .addSink(new DiscardingSink<>())
            .uid("my-sink");

        env.execute();
    }
}