blob: fe16254d6601487061cd43e157301ec146bd22d9 [file] [log] [blame]
package sample.persistence;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.SupervisorStrategy;
import org.apache.pekko.pattern.StatusReply;
import org.apache.pekko.persistence.typed.PersistenceId;
import org.apache.pekko.persistence.typed.javadsl.CommandHandler;
import org.apache.pekko.persistence.typed.javadsl.CommandHandlerBuilder;
import org.apache.pekko.persistence.typed.javadsl.Effect;
import org.apache.pekko.persistence.typed.javadsl.EventHandler;
import org.apache.pekko.persistence.typed.javadsl.EventSourcedBehavior;
import org.apache.pekko.persistence.typed.javadsl.RetentionCriteria;
import com.fasterxml.jackson.annotation.JsonCreator;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
* This is an event sourced actor. It has a state, {@link ShoppingCart.State}, which
* stores the current shopping cart items and whether it's checked out.
*
* Event sourced actors are interacted with by sending them commands,
* see classes implementing {@link ShoppingCart.Command}.
*
* Commands get translated to events, see classes implementing {@link ShoppingCart.Event}.
* It's the events that get persisted by the entity. Each event will have an event handler
* registered for it, and an event handler updates the current state based on the event.
* This will be done when the event is first created, and it will also be done when the entity is
* loaded from the database - each event will be replayed to recreate the state
* of the entity.
*/
public class ShoppingCart
extends EventSourcedBehavior<ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> {
/**
* The state for the {@link ShoppingCart} entity.
*/
public final class State implements CborSerializable {
private Map<String, Integer> items = new HashMap<>();
private Optional<Instant> checkoutDate = Optional.empty();
public boolean isCheckedOut() {
return checkoutDate.isPresent();
}
public Optional<Instant> getCheckoutDate() {
return checkoutDate;
}
public boolean isEmpty() {
return items.isEmpty();
}
public boolean hasItem(String itemId) {
return items.containsKey(itemId);
}
public State updateItem(String itemId, int quantity) {
if (quantity == 0) {
items.remove(itemId);
} else {
items.put(itemId, quantity);
}
return this;
}
public State removeItem(String itemId) {
items.remove(itemId);
return this;
}
public State checkout(Instant now) {
checkoutDate = Optional.of(now);
return this;
}
public Summary toSummary() {
return new Summary(items, isCheckedOut());
}
}
/**
* This interface defines all the commands that the ShoppingCart persistent actor supports.
*/
public interface Command extends CborSerializable {
}
/**
* A command to add an item to the cart.
*
* It can reply with `StatusReply<Summary>`, which is sent back to the caller when
* all the events emitted by this command are successfully persisted.
*/
public static class AddItem implements Command {
public final String itemId;
public final int quantity;
public final ActorRef<StatusReply<Summary>> replyTo;
public AddItem(String itemId, int quantity, ActorRef<StatusReply<Summary>> replyTo) {
this.itemId = itemId;
this.quantity = quantity;
this.replyTo = replyTo;
}
}
/**
* A command to remove an item from the cart.
*/
public static class RemoveItem implements Command {
public final String itemId;
public final ActorRef<StatusReply<Summary>> replyTo;
@JsonCreator
public RemoveItem(String itemId, ActorRef<StatusReply<Summary>> replyTo) {
this.itemId = itemId;
this.replyTo = replyTo;
}
}
/**
* A command to adjust the quantity of an item in the cart.
*/
public static class AdjustItemQuantity implements Command {
public final String itemId;
public final int quantity;
public final ActorRef<StatusReply<Summary>> replyTo;
public AdjustItemQuantity(String itemId, int quantity, ActorRef<StatusReply<Summary>> replyTo) {
this.itemId = itemId;
this.quantity = quantity;
this.replyTo = replyTo;
}
}
/**
* A command to get the current state of the shopping cart.
*
* The reply type is the {@link Summary}
*/
public static class Get implements Command {
public final ActorRef<Summary> replyTo;
@JsonCreator
public Get(ActorRef<Summary> replyTo) {
this.replyTo = replyTo;
}
}
/**
* A command to checkout the shopping cart.
*
* The reply type is the {@link StatusReply<Summary>}, which will be returned when the events have been
* emitted.
*/
public static class Checkout implements Command {
public final ActorRef<StatusReply<Summary>> replyTo;
@JsonCreator
public Checkout(ActorRef<StatusReply<Summary>> replyTo) {
this.replyTo = replyTo;
}
}
/**
* Summary of the shopping cart state, used in reply messages.
*/
public static final class Summary implements CborSerializable {
public final Map<String, Integer> items;
public final boolean checkedOut;
public Summary(Map<String, Integer> items, boolean checkedOut) {
// Summary is included in messages and should therefore be immutable
this.items = Collections.unmodifiableMap(new HashMap<>(items));
this.checkedOut = checkedOut;
}
}
public interface Event extends CborSerializable {
}
public static final class ItemAdded implements Event {
public final String cartId;
public final String itemId;
public final int quantity;
public ItemAdded(String cartId, String itemId, int quantity) {
this.cartId = cartId;
this.itemId = itemId;
this.quantity = quantity;
}
@Override
public String toString() {
return "ItemAdded(" + cartId + "," + itemId + "," + quantity + ")";
}
}
public static final class ItemRemoved implements Event {
public final String cartId;
public final String itemId;
public ItemRemoved(String cartId, String itemId) {
this.cartId = cartId;
this.itemId = itemId;
}
@Override
public String toString() {
return "ItemRemoved(" + cartId + "," + itemId + ")";
}
}
public static final class ItemQuantityAdjusted implements Event {
public final String cartId;
public final String itemId;
public final int quantity;
public ItemQuantityAdjusted(String cartId, String itemId, int quantity) {
this.cartId = cartId;
this.itemId = itemId;
this.quantity = quantity;
}
@Override
public String toString() {
return "ItemQuantityAdjusted(" + cartId + "," + itemId + "," + quantity + ")";
}
}
public static class CheckedOut implements Event {
public final String cartId;
public final Instant eventTime;
public CheckedOut(String cartId, Instant eventTime) {
this.cartId = cartId;
this.eventTime = eventTime;
}
@Override
public String toString() {
return "CheckedOut(" + cartId + "," + eventTime + ")";
}
}
public static Behavior<Command> create(String cartId) {
return new ShoppingCart(cartId);
}
private final String cartId;
private ShoppingCart(String cartId) {
super(PersistenceId.of("ShoppingCart", cartId),
SupervisorStrategy.restartWithBackoff(Duration.ofMillis(200), Duration.ofSeconds(5), 0.1));
this.cartId = cartId;
}
@Override
public State emptyState() {
return new State();
}
private final CheckedOutCommandHandlers checkedOutCommandHandlers = new CheckedOutCommandHandlers();
private final OpenShoppingCartCommandHandlers openShoppingCartCommandHandlers = new OpenShoppingCartCommandHandlers();
@Override
public CommandHandler<Command, Event, State> commandHandler() {
CommandHandlerBuilder<Command, Event, State> b =
newCommandHandlerBuilder();
b.forState(state -> !state.isCheckedOut())
.onCommand(AddItem.class, openShoppingCartCommandHandlers::onAddItem)
.onCommand(RemoveItem.class, openShoppingCartCommandHandlers::onRemoveItem)
.onCommand(AdjustItemQuantity.class, openShoppingCartCommandHandlers::onAdjustItemQuantity)
.onCommand(Checkout.class, openShoppingCartCommandHandlers::onCheckout);
b.forState(state -> state.isCheckedOut())
.onCommand(AddItem.class, checkedOutCommandHandlers::onAddItem)
.onCommand(RemoveItem.class, checkedOutCommandHandlers::onRemoveItem)
.onCommand(AdjustItemQuantity.class, checkedOutCommandHandlers::onAdjustItemQuantity)
.onCommand(Checkout.class, checkedOutCommandHandlers::onCheckout);
b.forAnyState()
.onCommand(Get.class, this::onGet);
return b.build();
}
private Effect<Event, State> onGet(State state, Get cmd) {
cmd.replyTo.tell(state.toSummary());
return Effect().none();
}
private class OpenShoppingCartCommandHandlers {
Effect<Event, State> onAddItem(State state, AddItem cmd) {
if (state.hasItem(cmd.itemId)) {
cmd.replyTo.tell(StatusReply.error(
"Item '" + cmd.itemId + "' was already added to this shopping cart"));
return Effect().none();
} else if (cmd.quantity <= 0) {
cmd.replyTo.tell(StatusReply.error("Quantity must be greater than zero"));
return Effect().none();
} else {
return Effect().persist(new ItemAdded(cartId, cmd.itemId, cmd.quantity))
.thenRun(updatedCart -> cmd.replyTo.tell(StatusReply.success(updatedCart.toSummary())));
}
}
Effect<Event, State> onRemoveItem(State state, RemoveItem cmd) {
if (state.hasItem(cmd.itemId)) {
return Effect().persist(new ItemRemoved(cartId, cmd.itemId))
.thenRun(updatedCart -> cmd.replyTo.tell(StatusReply.success(updatedCart.toSummary())));
} else {
cmd.replyTo.tell(StatusReply.success(state.toSummary()));
return Effect().none();
}
}
Effect<Event, State> onAdjustItemQuantity(State state, AdjustItemQuantity cmd) {
if (cmd.quantity <= 0) {
cmd.replyTo.tell(StatusReply.error("Quantity must be greater than zero"));
return Effect().none();
} else if (state.hasItem(cmd.itemId)) {
return Effect().persist(new ItemQuantityAdjusted(cartId, cmd.itemId, cmd.quantity))
.thenRun(updatedCart -> cmd.replyTo.tell(StatusReply.success(updatedCart.toSummary())));
} else {
cmd.replyTo.tell(StatusReply.error(
"Cannot adjust quantity for item '" + cmd.itemId + "'. Item not present on cart"));
return Effect().none();
}
}
Effect<Event, State> onCheckout(State state, Checkout cmd) {
if (state.isEmpty()) {
cmd.replyTo.tell(StatusReply.error("Cannot checkout an empty shopping cart"));
return Effect().none();
} else {
return Effect().persist(new CheckedOut(cartId, Instant.now()))
.thenRun(updatedCart -> cmd.replyTo.tell(StatusReply.success(updatedCart.toSummary())));
}
}
}
private class CheckedOutCommandHandlers {
Effect<Event, State> onAddItem(AddItem cmd) {
cmd.replyTo.tell(StatusReply.error("Can't add an item to an already checked out shopping cart"));
return Effect().none();
}
Effect<Event, State> onRemoveItem(RemoveItem cmd) {
cmd.replyTo.tell(StatusReply.error("Can't remove an item from an already checked out shopping cart"));
return Effect().none();
}
Effect<Event, State> onAdjustItemQuantity(AdjustItemQuantity cmd) {
cmd.replyTo.tell(StatusReply.error("Can't adjust item on an already checked out shopping cart"));
return Effect().none();
}
Effect<Event, State> onCheckout(Checkout cmd) {
cmd.replyTo.tell(StatusReply.error("Can't checkout already checked out shopping cart"));
return Effect().none();
}
}
@Override
public EventHandler<State, Event> eventHandler() {
return newEventHandlerBuilder().forAnyState()
.onEvent(ItemAdded.class, (state, event) -> state.updateItem(event.itemId, event.quantity))
.onEvent(ItemRemoved.class, (state, event) -> state.removeItem(event.itemId))
.onEvent(ItemQuantityAdjusted.class, (state, event) -> state.updateItem(event.itemId, event.quantity))
.onEvent(CheckedOut.class, (state, event) -> state.checkout(event.eventTime))
.build();
}
@Override
public RetentionCriteria retentionCriteria() {
// enable snapshotting
return RetentionCriteria.snapshotEvery(100, 3);
}
}