/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.statemachine;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.statemachine.dfa.State;
import org.apache.flink.streaming.examples.statemachine.event.Alert;
import org.apache.flink.streaming.examples.statemachine.event.Event;
import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource;
import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializationSchema;
import org.apache.flink.util.Collector;

/**
* Main class of the state machine example. This class implements the streaming application that
* receives the stream of events and evaluates a state machine (per originating address) to validate
* that the events follow the state machine's rules.
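 *
 * <p>Example invocation with the built-in generator (the flag values and the checkpoint
 * path below are illustrative only, not recommendations):
 *
 * <pre>{@code
 * StateMachineExample --error-rate 0.05 --sleep 5 --backend rocks \
 *     --checkpoint-dir file:///tmp/state-machine-checkpoints --incremental-checkpoints true
 * }</pre>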
*/
public class StateMachineExample {

/**
* Main entry point for the program.
*
* @param args The command line arguments.
*/
public static void main(String[] args) throws Exception {
// ---- print some usage help ----
System.out.println(
"Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]");
System.out.println(
"Usage with Kafka: StateMachineExample --kafka-topic <topic> [--brokers <brokers>]");
System.out.println("Options for both the above setups: ");
System.out.println("\t[--backend <hashmap|rocks>]");
System.out.println("\t[--checkpoint-dir <filepath>]");
System.out.println("\t[--incremental-checkpoints <true|false>]");
System.out.println("\t[--output <filepath> OR null for stdout]");
System.out.println();
// ---- determine whether to use the built-in source, or read from Kafka ----
final DataStream<Event> events;
final ParameterTool params = ParameterTool.fromArgs(args);
// create the environment to create streams and configure execution
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
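        // take a checkpoint of the keyed state every 2 seconds so the per-address
        // state machines can be restored after a failure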
env.enableCheckpointing(2000L);
final String stateBackend = params.get("backend", "memory");
if ("hashmap".equals(stateBackend)) {
final String checkpointDir = params.get("checkpoint-dir");
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
} else if ("rocks".equals(stateBackend)) {
final String checkpointDir = params.get("checkpoint-dir");
boolean incrementalCheckpoints = params.getBoolean("incremental-checkpoints", false);
env.setStateBackend(new EmbeddedRocksDBStateBackend(incrementalCheckpoints));
env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
}
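        // any other value (including the default "memory") falls through both branches
        // and keeps the environment's default state backend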
if (params.has("kafka-topic")) {
// set up the Kafka reader
String kafkaTopic = params.get("kafka-topic");
String brokers = params.get("brokers", "localhost:9092");
            System.out.printf("Reading from Kafka topic %s @ %s\n", kafkaTopic, brokers);
System.out.println();
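            // build a KafkaSource that deserializes only the record value into an Event;
            // OffsetsInitializer.latest() means the job reads newly arriving records only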
KafkaSource<Event> source =
KafkaSource.<Event>builder()
.setBootstrapServers(brokers)
.setGroupId("stateMachineExample")
.setTopics(kafkaTopic)
.setDeserializer(
KafkaRecordDeserializationSchema.valueOnly(
new EventDeSerializationSchema()))
.setStartingOffsets(OffsetsInitializer.latest())
.build();
events =
env.fromSource(
source, WatermarkStrategy.noWatermarks(), "StateMachineExampleSource");
} else {
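            // use the built-in data generator; error-rate is the probability that a
            // generated event triggers an invalid state transition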
double errorRate = params.getDouble("error-rate", 0.0);
int sleep = params.getInt("sleep", 1);
            System.out.printf(
                    "Using standalone source with error rate %f and sleep delay %d millis\n",
                    errorRate, sleep);
System.out.println();
events = env.addSource(new EventsGeneratorSource(errorRate, sleep));
        }

// ---- main program ----
final String outputFile = params.get("output");
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
DataStream<Alert> alerts =
events
// partition on the address to make sure equal addresses
// end up in the same state machine flatMap function
.keyBy(Event::sourceAddress)
// the function that evaluates the state machine over the sequence of events
.flatMap(new StateMachineMapper());
        // print the alerts to stdout, or write them to the given output file
if (outputFile == null) {
alerts.print();
} else {
alerts.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
}
// trigger program execution
env.execute("State machine job");
    }

    // ------------------------------------------------------------------------

/**
* The function that maintains the per-IP-address state machines and verifies that the events
* are consistent with the current state of the state machine. If the event is not consistent
* with the current state, the function produces an alert.
*/
@SuppressWarnings("serial")
static class StateMachineMapper extends RichFlatMapFunction<Event, Alert> {
        /** The per-key state: Flink maintains one value per key (source address). */
        private ValueState<State> currentState;

@Override
public void open(Configuration conf) {
// get access to the state object
currentState =
getRuntimeContext().getState(new ValueStateDescriptor<>("state", State.class));
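            // ("state" is the name under which this piece of state is registered
            // and identified in checkpoints and savepoints)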
        }

@Override
public void flatMap(Event evt, Collector<Alert> out) throws Exception {
// get the current state for the key (source address)
            // if no state exists yet, the state must be the state machine's initial state
State state = currentState.value();
if (state == null) {
state = State.Initial;
}
// ask the state machine what state we should go to based on the given event
State nextState = state.transition(evt.type());
if (nextState == State.InvalidTransition) {
// the current event resulted in an invalid transition
// raise an alert!
out.collect(new Alert(evt.sourceAddress(), state, evt.type()));
} else if (nextState.isTerminal()) {
// we reached a terminal state, clean up the current state
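                // clearing removes this key's entry entirely, so state size stays
                // bounded as state machines reach their terminal state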
currentState.clear();
} else {
// remember the new state
currentState.update(nextState);
}
}
}
}