blob: ed1fcf98277f4921bd540834c727687f0219c636 [file] [log] [blame]
package sample.distributeddata;
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.cluster.ddata.Key;
import org.apache.pekko.cluster.ddata.LWWMap;
import org.apache.pekko.cluster.ddata.LWWMapKey;
import org.apache.pekko.cluster.ddata.ReplicatedData;
import org.apache.pekko.cluster.ddata.SelfUniqueAddress;
import org.apache.pekko.cluster.ddata.typed.javadsl.DistributedData;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.GetFailure;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.GetResponse;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.GetSuccess;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.NotFound;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.ReadConsistency;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.ReadMajority;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.Update;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.UpdateFailure;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.UpdateResponse;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.UpdateSuccess;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.UpdateTimeout;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.WriteConsistency;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.WriteMajority;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator.Get;
import org.apache.pekko.cluster.ddata.typed.javadsl.Replicator;
import org.apache.pekko.cluster.ddata.typed.javadsl.ReplicatorMessageAdapter;
public class ShoppingCart {
//#read-write-majority
private final WriteConsistency writeMajority =
new WriteMajority(Duration.ofSeconds(3));
private final static ReadConsistency readMajority =
new ReadMajority(Duration.ofSeconds(3));
//#read-write-majority
public interface Command {}
public static final class GetCart implements Command {
public final ActorRef<Cart> replyTo;
public GetCart(ActorRef<Cart> replyTo) {
this.replyTo = replyTo;
}
}
public static class AddItem implements Command {
public final LineItem item;
public AddItem(LineItem item) {
this.item = item;
}
}
public static class RemoveItem implements Command {
public final String productId;
public RemoveItem(String productId) {
this.productId = productId;
}
}
public static class Cart {
public final Set<LineItem> items;
public Cart(Set<LineItem> items) {
this.items = items;
}
}
public static class LineItem {
public final String productId;
public final String title;
public final int quantity;
public LineItem(String productId, String title, int quantity) {
this.productId = productId;
this.title = title;
this.quantity = quantity;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((productId == null) ? 0 : productId.hashCode());
result = prime * result + quantity;
result = prime * result + ((title == null) ? 0 : title.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
LineItem other = (LineItem) obj;
if (productId == null) {
if (other.productId != null)
return false;
} else if (!productId.equals(other.productId))
return false;
if (quantity != other.quantity)
return false;
if (title == null) {
if (other.title != null)
return false;
} else if (!title.equals(other.title))
return false;
return true;
}
@Override
public String toString() {
return "LineItem [productId=" + productId + ", title=" + title + ", quantity=" + quantity + "]";
}
}
private interface InternalCommand extends Command {}
private static class InternalGetResponse implements InternalCommand {
public final GetResponse<LWWMap<String, LineItem>> rsp;
public final ActorRef<Cart> replyTo;
private InternalGetResponse(GetResponse<LWWMap<String, LineItem>> rsp, ActorRef<Cart> replyTo) {
this.rsp = rsp;
this.replyTo = replyTo;
}
}
private static class InternalUpdateResponse<A extends ReplicatedData> implements InternalCommand {
public final UpdateResponse<A> rsp;
private InternalUpdateResponse(UpdateResponse<A> rsp) {
this.rsp = rsp;
}
}
private static class InternalRemoveItem implements InternalCommand {
public final String productId;
public final GetResponse<LWWMap<String, LineItem>> rsp;
private InternalRemoveItem(String productId, GetResponse<LWWMap<String, LineItem>> rsp) {
this.productId = productId;
this.rsp = rsp;
}
}
public static Behavior<Command> create(String userId) {
return Behaviors.setup(context ->
DistributedData.withReplicatorMessageAdapter(
(ReplicatorMessageAdapter<Command, LWWMap<String, LineItem>> replicator) ->
new ShoppingCart(context, replicator, userId).createBehavior()));
}
private final ReplicatorMessageAdapter<Command, LWWMap<String, LineItem>> replicator;
private final Key<LWWMap<String, LineItem>> dataKey;
private final SelfUniqueAddress node;
public ShoppingCart(
ActorContext<Command> context,
ReplicatorMessageAdapter<Command, LWWMap<String, LineItem>> replicator,
String userId
) {
this.replicator = replicator;
this.dataKey = LWWMapKey.create("cart-" + userId);
node = DistributedData.get(context.getSystem()).selfUniqueAddress();
}
public Behavior<Command> createBehavior() {
return Behaviors
.receive(Command.class)
.onMessage(GetCart.class, this::onGetCart)
.onMessage(InternalGetResponse.class, this::onInternalGetResponse)
.onMessage(AddItem.class, this::onAddItem)
.onMessage(RemoveItem.class, this::onRemoveItem)
.onMessage(InternalRemoveItem.class, this::onInternalRemoveItem)
.onMessage(InternalUpdateResponse.class, this::onInternalUpdateResponse)
.build();
}
//#get-cart
private Behavior<Command> onGetCart(GetCart command) {
replicator.askGet(
askReplyTo -> new Get<>(dataKey, readMajority, askReplyTo),
rsp -> new InternalGetResponse(rsp, command.replyTo));
return Behaviors.same();
}
private Behavior<Command> onInternalGetResponse(InternalGetResponse msg) {
if (msg.rsp instanceof GetSuccess) {
LWWMap<String, LineItem> data = ((GetSuccess<LWWMap<String, LineItem>>) msg.rsp).get(dataKey);
msg.replyTo.tell(new Cart(new HashSet<>(data.getEntries().values())));
} else if (msg.rsp instanceof NotFound) {
msg.replyTo.tell(new Cart(new HashSet<>()));
} else if (msg.rsp instanceof GetFailure) {
// ReadMajority failure, try again with local read
replicator.askGet(
askReplyTo -> new Get<>(dataKey, Replicator.readLocal(), askReplyTo),
rsp -> new InternalGetResponse(rsp, msg.replyTo)
);
}
return Behaviors.same();
}
//#get-cart
//#add-item
private Behavior<Command> onAddItem(AddItem command) {
replicator.askUpdate(
askReplyTo ->
new Update<>(
dataKey,
LWWMap.empty(),
writeMajority,
askReplyTo,
cart -> updateCart(cart, command.item)
),
InternalUpdateResponse::new);
return Behaviors.same();
}
//#add-item
private LWWMap<String, LineItem> updateCart(LWWMap<String, LineItem> data, LineItem item) {
if (data.contains(item.productId)) {
LineItem existingItem = data.get(item.productId).get();
int newQuantity = existingItem.quantity + item.quantity;
LineItem newItem = new LineItem(item.productId, item.title, newQuantity);
return data.put(node, item.productId, newItem);
} else {
return data.put(node, item.productId, item);
}
}
//#remove-item
private Behavior<Command> onRemoveItem(RemoveItem command) {
// Try to fetch latest from a majority of nodes first, since ORMap
// remove must have seen the item to be able to remove it.
replicator.askGet(
askReplyTo -> new Get<>(dataKey, readMajority, askReplyTo),
rsp -> new InternalRemoveItem(command.productId, rsp));
return Behaviors.same();
}
private Behavior<Command> onInternalRemoveItem(InternalRemoveItem msg) {
if (msg.rsp instanceof GetSuccess) {
removeItem(msg.productId);
} else if (msg.rsp instanceof NotFound) {
/* nothing to remove */
} else if (msg.rsp instanceof GetFailure) {
// ReadMajority failed, fall back to best effort local value
removeItem(msg.productId);
}
return Behaviors.same();
}
private void removeItem(String productId) {
replicator.askUpdate(
askReplyTo ->
new Update<>(
dataKey,
LWWMap.empty(),
writeMajority,
askReplyTo,
cart -> cart.remove(node, productId)
),
InternalUpdateResponse::new);
}
//#remove-item
private Behavior<Command> onInternalUpdateResponse(InternalUpdateResponse<?> msg) {
if (msg.rsp instanceof UpdateSuccess) {
// ok
} else if (msg.rsp instanceof UpdateTimeout) {
// will eventually be replicated
} else if (msg.rsp instanceof UpdateFailure) {
throw new IllegalStateException("Unexpected failure: " + msg.rsp);
}
return Behaviors.same();
}
}