blob: 5b4cb2525d9238f07dfc56a6e16896fac0502c8e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.examples.ridesharing;
import org.apache.flink.statefun.examples.ridesharing.generated.DriverJoinsRide;
import org.apache.flink.statefun.examples.ridesharing.generated.DriverRejectsPickup;
import org.apache.flink.statefun.examples.ridesharing.generated.InboundDriverMessage;
import org.apache.flink.statefun.examples.ridesharing.generated.JoinCell;
import org.apache.flink.statefun.examples.ridesharing.generated.LeaveCell;
import org.apache.flink.statefun.examples.ridesharing.generated.OutboundDriverMessage;
import org.apache.flink.statefun.examples.ridesharing.generated.PickupPassenger;
import org.apache.flink.statefun.examples.ridesharing.generated.RideEnded;
import org.apache.flink.statefun.examples.ridesharing.generated.RideStarted;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.match.MatchBinder;
import org.apache.flink.statefun.sdk.match.StatefulMatchFunction;
import org.apache.flink.statefun.sdk.state.PersistedValue;
public class FnDriver extends StatefulMatchFunction {
static final FunctionType TYPE = new FunctionType(Identifiers.NAMESPACE, "driver");
@Persisted
private final PersistedValue<String> currentRideId = PersistedValue.of("ride", String.class);
@Persisted
private final PersistedValue<Integer> location = PersistedValue.of("location", Integer.class);
@Override
public void configure(MatchBinder binder) {
binder
.predicate(PickupPassenger.class, this::whenPickupIsNeeded)
.predicate(
InboundDriverMessage.class,
InboundDriverMessage::hasRideStarted,
this::whenRideHasStarted)
.predicate(
InboundDriverMessage.class, InboundDriverMessage::hasRideEnded, this::whenRideHasEnded)
.predicate(
InboundDriverMessage.class,
InboundDriverMessage::hasLocationUpdate,
this::whenLocationIsUpdated);
}
private void whenPickupIsNeeded(Context context, PickupPassenger pickupPassenger) {
if (isTaken()) {
// this driver is currently in a ride, and therefore can't take any more
// passengers.
context.reply(
DriverRejectsPickup.newBuilder()
.setDriverId(context.self().id())
.setRideId(context.caller().id())
.build());
return;
}
// We are called by the ride function, so we remember it's id for future communication.
currentRideId.set(context.caller().id());
// We also need to unregister ourselves from the current geo cell we belong to.
final int currentLocation =
location.getOrDefault(0); // drivers should have a location at this point.
context.send(FnGeoCell.TYPE, String.valueOf(currentLocation), LeaveCell.getDefaultInstance());
// reply to the ride, saying we are taking this passenger
context.reply(
DriverJoinsRide.newBuilder()
.setDriverId(context.self().id())
.setDriverLocation(currentLocation)
.build());
// also send a command to the physical driver to pickup the passenger
context.send(
Identifiers.TO_OUTBOUND_DRIVER,
OutboundDriverMessage.newBuilder()
.setDriverId(context.self().id())
.setPickupPassenger(
OutboundDriverMessage.PickupPassenger.newBuilder()
.setRideId(pickupPassenger.getPassengerId())
.setStartGeoLocation(pickupPassenger.getPassengerStartCell())
.setEndGeoLocation(pickupPassenger.getPassengerEndCell())
.build())
.build());
}
private void whenRideHasStarted(Context context, InboundDriverMessage ignored) {
context.send(
FnRide.TYPE,
currentRideId.get(),
RideStarted.newBuilder()
.setDriverId(context.self().id())
.setDriverGeoCell(location.get())
.build());
}
private void whenRideHasEnded(Context context, InboundDriverMessage ignored) {
context.send(FnRide.TYPE, currentRideId.get(), RideEnded.getDefaultInstance());
currentRideId.clear();
// register at the current location as free driver.
Integer currentLocation = location.get();
context.send(FnGeoCell.TYPE, String.valueOf(currentLocation), JoinCell.getDefaultInstance());
}
private void whenLocationIsUpdated(Context context, InboundDriverMessage locationUpdate) {
final int updated = locationUpdate.getLocationUpdate().getCurrentGeoCell();
final int last = location.getOrDefault(-1);
if (last == -1) {
// this is the first time this driver gets a location update.
// so we notify the relevant geo cell function.
location.set(updated);
context.send(FnGeoCell.TYPE, String.valueOf(updated), JoinCell.getDefaultInstance());
return;
}
if (last == updated) {
return;
}
location.set(updated);
}
private boolean isTaken() {
String rideId = currentRideId.get();
return rideId != null;
}
}