blob: 90e623f5a54d268c084cb2c2f07865567ebf6cb4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.examples.ridesharing;
import org.apache.flink.statefun.examples.ridesharing.generated.DriverInCell;
import org.apache.flink.statefun.examples.ridesharing.generated.DriverJoinsRide;
import org.apache.flink.statefun.examples.ridesharing.generated.DriverRejectsPickup;
import org.apache.flink.statefun.examples.ridesharing.generated.GetDriver;
import org.apache.flink.statefun.examples.ridesharing.generated.PassengerJoinsRide;
import org.apache.flink.statefun.examples.ridesharing.generated.PickupPassenger;
import org.apache.flink.statefun.examples.ridesharing.generated.RideEnded;
import org.apache.flink.statefun.examples.ridesharing.generated.RideFailed;
import org.apache.flink.statefun.examples.ridesharing.generated.RideStarted;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.match.MatchBinder;
import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
import org.apache.flink.statefun.sdk.state.PersistedValue;
final class FnRide extends StatefulMatchFunction {
static final FunctionType TYPE = new FunctionType(Identifiers.NAMESPACE, "ride");
@Persisted
private final PersistedValue<PassengerJoinsRide> passenger =
PersistedValue.of("passenger", PassengerJoinsRide.class);
@Persisted
private final PersistedValue<String> driver = PersistedValue.of("driver", String.class);
public void configure(MatchBinder binder) {
binder
.predicate(PassengerJoinsRide.class, this::whenPassengerJoins)
.predicate(DriverInCell.class, this::whenGeoCellResponds)
.predicate(DriverRejectsPickup.class, this::whenDriverRejectsPickup)
.predicate(DriverJoinsRide.class, this::whenDriverJoins)
.predicate(RideStarted.class, this::whenRideHasStarted)
.predicate(RideEnded.class, this::whenRideHasEnded);
}
/**
* When a user joins a ride, we have to: 1. remember that user id 2. remember the starting
* location of that ride 3. contact the geo cell of the starting location, and ask for a free
* driver
*/
private void whenPassengerJoins(Context context, PassengerJoinsRide in) {
final String cellKey = String.valueOf(in.getStartGeoCell());
passenger.set(in);
context.send(FnGeoCell.TYPE, cellKey, GetDriver.getDefaultInstance());
}
/**
* Geo cell responds, it might respond with: - there is no driver, in that case we fail the ride -
* there is a driver, let's ask them to pickup the passenger.
*/
private void whenGeoCellResponds(Context context, DriverInCell in) {
final String driverId = in.getDriverId();
final PassengerJoinsRide rideRequest = passenger.get();
if (driverId != null && !driverId.isEmpty()) {
context.send(
FnDriver.TYPE,
driverId,
PickupPassenger.newBuilder()
.setPassengerId(rideRequest.getPassengerId())
.setPassengerStartCell(rideRequest.getStartGeoCell())
.setPassengerEndCell(rideRequest.getEndGeoCell())
.build());
return;
}
// no free drivers in this geo cell, at this example we just fail the ride
// but we can imagine that this is where we will expand our search to near geo cells
context.send(FnPassenger.TYPE, rideRequest.getPassengerId(), RideFailed.getDefaultInstance());
// by clearing our state, we essentially delete this instance of the ride actor
passenger.clear();
}
/**
* A driver might not be free, or for some other reason they can't take this ride, so we try
* another driver in that cell.
*/
@SuppressWarnings("unused")
private void whenDriverRejectsPickup(Context context, DriverRejectsPickup ignored) {
// try another driver, realistically we need to pass in a list of 'banned' drivers,
// so that the GeoCell will not offer us these drivers again, but in this example
// if a driver rejects a ride, it means that he is currently busy (and it would soon delete
// itself from the geo cell)
final int startGeoCell = passenger.get().getStartGeoCell();
String cellKey = String.valueOf(startGeoCell);
context.send(FnGeoCell.TYPE, cellKey, GetDriver.getDefaultInstance());
}
/** A driver has taken this ride. */
private void whenDriverJoins(Context context, DriverJoinsRide driverJoinRide) {
driver.set(context.caller().id());
context.send(FnPassenger.TYPE, passenger.get().getPassengerId(), driverJoinRide);
}
/** A driver has successfully picked up the passenger */
private void whenRideHasStarted(Context context, RideStarted rideStarted) {
context.send(FnPassenger.TYPE, passenger.get().getPassengerId(), rideStarted);
}
/** The driver has successfully reached the destination. */
private void whenRideHasEnded(Context context, RideEnded rideEnded) {
context.send(FnPassenger.TYPE, passenger.get().getPassengerId(), rideEnded);
passenger.clear();
driver.clear();
}
}