blob: 025454010cef159840cca82fae439727862b696b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.training.solutions.ridesandfares;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiFareSource;
import org.apache.flink.training.exercises.common.sources.TaxiRideSource;
import org.apache.flink.training.exercises.common.utils.ExerciseBase;
import org.apache.flink.util.Collector;
/**
* Java reference implementation for the "Stateful Enrichment" exercise of the Flink training in the docs.
*
* <p>The goal for this exercise is to enrich TaxiRides with fare information.
*
* <p>Parameters:
* -rides path-to-input-file
* -fares path-to-input-file
*/
public class RidesAndFaresSolution extends ExerciseBase {
/**
* Main method.
*
* <p>Parameters:
* -rides path-to-input-file
* -fares path-to-input-file
*
* @throws Exception which occurs during job execution.
*/
public static void main(String[] args) throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
final String ridesFile = params.get("rides", PATH_TO_RIDE_DATA);
final String faresFile = params.get("fares", PATH_TO_FARE_DATA);
final int delay = 60; // at most 60 seconds of delay
final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second
// Set up streaming execution environment, including Web UI and REST endpoint.
// Checkpointing isn't needed for the RidesAndFares exercise; this setup is for
// using the State Processor API.
Configuration conf = new Configuration();
conf.setString("state.backend", "filesystem");
conf.setString("state.savepoints.dir", "file:///tmp/savepoints");
conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setParallelism(ExerciseBase.parallelism);
env.enableCheckpointing(10000L);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStream<TaxiRide> rides = env
.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
.filter((TaxiRide ride) -> ride.isStart)
.keyBy(ride -> ride.rideId);
DataStream<TaxiFare> fares = env
.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
.keyBy(fare -> fare.rideId);
// Set a UID on the stateful flatmap operator so we can read its state using the State Processor API.
DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
.connect(fares)
.flatMap(new EnrichmentFunction())
.uid("enrichment");
printOrTest(enrichedRides);
env.execute("Join Rides with Fares (java RichCoFlatMap)");
}
public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
// keyed, managed state
private ValueState<TaxiRide> rideState;
private ValueState<TaxiFare> fareState;
@Override
public void open(Configuration config) {
rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
}
@Override
public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TaxiFare fare = fareState.value();
if (fare != null) {
fareState.clear();
out.collect(Tuple2.of(ride, fare));
} else {
rideState.update(ride);
}
}
@Override
public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
TaxiRide ride = rideState.value();
if (ride != null) {
rideState.clear();
out.collect(Tuple2.of(ride, fare));
} else {
fareState.update(fare);
}
}
}
}