blob: 4f686fd378c565cb12ccb288449f43abf4271681 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.cassandra;
import java.time.Duration;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.projection.ProjectionContext;
import org.apache.pekko.stream.javadsl.FlowWithContext;
import jdocs.eventsourced.ShoppingCart;
// #daemon-imports
import org.apache.pekko.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import org.apache.pekko.projection.ProjectionBehavior;
// #daemon-imports
// #singleton-imports
import org.apache.pekko.cluster.typed.ClusterSingleton;
import org.apache.pekko.cluster.typed.SingletonActor;
// #singleton-imports
// #source-provider-imports
import org.apache.pekko.persistence.cassandra.query.javadsl.CassandraReadJournal;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider;
import org.apache.pekko.projection.eventsourced.EventEnvelope;
// #source-provider-imports
// #projection-imports
import org.apache.pekko.projection.cassandra.javadsl.CassandraProjection;
import org.apache.pekko.projection.Projection;
import org.apache.pekko.projection.ProjectionId;
// #projection-imports
// #handler-imports
import org.apache.pekko.projection.javadsl.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
// #handler-imports
// #withRecoveryStrategy
import org.apache.pekko.projection.HandlerRecoveryStrategy;
// #withRecoveryStrategy
// #get-offset
import org.apache.pekko.projection.javadsl.ProjectionManagement;
// #get-offset
// #update-offset
import org.apache.pekko.persistence.query.Sequence;
// #update-offset
public interface CassandraProjectionDocExample {
// #handler
public class ShoppingCartHandler extends Handler<EventEnvelope<ShoppingCart.Event>> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public CompletionStage<Done> process(EventEnvelope<ShoppingCart.Event> envelope) {
ShoppingCart.Event event = envelope.event();
if (event instanceof ShoppingCart.CheckedOut) {
ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
logger.info(
"Shopping cart {} was checked out at {}", checkedOut.cartId, checkedOut.eventTime);
return CompletableFuture.completedFuture(Done.getInstance());
} else {
logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
return CompletableFuture.completedFuture(Done.getInstance());
}
}
}
// #handler
// #grouped-handler
public class GroupedShoppingCartHandler extends Handler<List<EventEnvelope<ShoppingCart.Event>>> {
private Logger logger = LoggerFactory.getLogger(getClass());
@Override
public CompletionStage<Done> process(List<EventEnvelope<ShoppingCart.Event>> envelopes) {
envelopes.forEach(
env -> {
ShoppingCart.Event event = env.event();
if (event instanceof ShoppingCart.CheckedOut) {
ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
logger.info(
"Shopping cart {} was checked out at {}",
checkedOut.cartId,
checkedOut.eventTime);
} else {
logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
}
});
return CompletableFuture.completedFuture(Done.getInstance());
}
}
// #grouped-handler
public static void illustrateAtLeastOnce() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
// #sourceProvider
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");
// #sourceProvider
// #atLeastOnce
int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);
Projection<EventEnvelope<ShoppingCart.Event>> projection =
CassandraProjection.atLeastOnce(
ProjectionId.of("shopping-carts", "carts-1"),
sourceProvider,
() -> new ShoppingCartHandler())
.withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
// #atLeastOnce
}
public static void illustrateAtMostOnce() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");
// #atMostOnce
Projection<EventEnvelope<ShoppingCart.Event>> projection =
CassandraProjection.atMostOnce(
ProjectionId.of("shopping-carts", "carts-1"), sourceProvider, ShoppingCartHandler::new);
// #atMostOnce
}
public static void illustrateGrouped() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");
// #grouped
int groupAfterEnvelopes = 20;
Duration groupAfterDuration = Duration.ofMillis(500);
Projection<EventEnvelope<ShoppingCart.Event>> projection =
CassandraProjection.groupedWithin(
ProjectionId.of("shopping-carts", "carts-1"),
sourceProvider,
GroupedShoppingCartHandler::new)
.withGroup(groupAfterEnvelopes, groupAfterDuration);
// #grouped
}
public static void illustrateAtLeastOnceFlow() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");
// #atLeastOnceFlow
Logger logger = LoggerFactory.getLogger("example");
FlowWithContext<
EventEnvelope<ShoppingCart.Event>, ProjectionContext, Done, ProjectionContext, NotUsed>
flow =
FlowWithContext.<EventEnvelope<ShoppingCart.Event>, ProjectionContext>create()
.map(EventEnvelope::event)
.map(
event -> {
if (event instanceof ShoppingCart.CheckedOut) {
ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
logger.info(
"Shopping cart {} was checked out at {}",
checkedOut.cartId,
checkedOut.eventTime);
} else {
logger.debug("Shopping cart {} changed by {}", event.getCartId(), event);
}
return Done.getInstance();
});
int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);
Projection<EventEnvelope<ShoppingCart.Event>> projection =
CassandraProjection.atLeastOnceFlow(
ProjectionId.of("shopping-carts", "carts-1"), sourceProvider, flow)
.withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
// #atLeastOnceFlow
}
public static void illustrateRecoveryStrategy() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");
// #withRecoveryStrategy
Projection<EventEnvelope<ShoppingCart.Event>> projection =
CassandraProjection.atLeastOnce(
ProjectionId.of("shopping-carts", "carts-1"),
sourceProvider,
ShoppingCartHandler::new)
.withRecoveryStrategy(HandlerRecoveryStrategy.retryAndFail(10, Duration.ofSeconds(1)));
// #withRecoveryStrategy
}
public static void illustrateRestart() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");
// #withRestartBackoff
Duration minBackoff = Duration.ofMillis(200);
Duration maxBackoff = Duration.ofSeconds(5);
double randomFactor = 0.1;
Projection<EventEnvelope<ShoppingCart.Event>> projection =
CassandraProjection.atLeastOnce(
ProjectionId.of("shopping-carts", "carts-1"),
sourceProvider,
ShoppingCartHandler::new)
.withRestartBackoff(minBackoff, maxBackoff, randomFactor);
// #withRestartBackoff
}
static class IllustrateRunningWithShardedDaemon {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
// #running-source-provider
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider(String tag) {
return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
}
// #running-source-provider
// #running-projection
int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);
Projection<EventEnvelope<ShoppingCart.Event>> projection(String tag) {
return CassandraProjection.atLeastOnce(
ProjectionId.of("shopping-carts", tag), sourceProvider(tag), ShoppingCartHandler::new)
.withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
}
// #running-projection
public IllustrateRunningWithShardedDaemon() {
// #running-with-daemon-process
ShardedDaemonProcess.get(system)
.init(
ProjectionBehavior.Command.class,
"shopping-carts",
ShoppingCart.tags.size(),
id -> ProjectionBehavior.create(projection(ShoppingCart.tags.get((Integer) id))),
ProjectionBehavior.stopMessage());
// #running-with-daemon-process
}
}
static class IllustrateRunningWithActor {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
ActorContext<Void> context = null;
// #running-with-actor
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider(String tag) {
return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
}
Projection<EventEnvelope<ShoppingCart.Event>> projection(String tag) {
return CassandraProjection.atLeastOnce(
ProjectionId.of("shopping-carts", tag), sourceProvider(tag), ShoppingCartHandler::new);
}
Projection<EventEnvelope<ShoppingCart.Event>> projection1 = projection("carts-1");
ActorRef<ProjectionBehavior.Command> projection1Ref =
context.spawn(ProjectionBehavior.create(projection1), projection1.projectionId().id());
// #running-with-actor
}
static class IllustrateRunningWithSingleton {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
// #running-with-singleton
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider(String tag) {
return EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag);
}
Projection<EventEnvelope<ShoppingCart.Event>> projection(String tag) {
return CassandraProjection.atLeastOnce(
ProjectionId.of("shopping-carts", tag), sourceProvider(tag), ShoppingCartHandler::new);
}
Projection<EventEnvelope<ShoppingCart.Event>> projection1 = projection("carts-1");
ActorRef<ProjectionBehavior.Command> projection1Ref =
ClusterSingleton.get(system)
.init(
SingletonActor.of(
ProjectionBehavior.create(projection1), projection1.projectionId().id())
.withStopMessage(ProjectionBehavior.stopMessage()));
// #running-with-singleton
}
public static void illustrateProjectionSettings() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");
// #projection-settings
int saveOffsetAfterEnvelopes = 100;
Duration saveOffsetAfterDuration = Duration.ofMillis(500);
Projection<EventEnvelope<ShoppingCart.Event>> projection =
CassandraProjection.atLeastOnce(
ProjectionId.of("shopping-carts", "carts-1"),
sourceProvider,
ShoppingCartHandler::new)
.withRestartBackoff(
Duration.ofSeconds(10), /*minBackoff*/
Duration.ofSeconds(60), /*maxBackoff*/
0.5 /*randomFactor*/)
.withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
// #projection-settings
}
public static void illustrateGetOffset() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
// #get-offset
ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
CompletionStage<Optional<Offset>> currentOffset =
ProjectionManagement.get(system).getOffset(projectionId);
// #get-offset
}
public static void illustrateClearOffset() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
// #clear-offset
ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
CompletionStage<Done> done = ProjectionManagement.get(system).clearOffset(projectionId);
// #clear-offset
}
public static void illustrateUpdateOffset() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
// #update-offset
ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
CompletionStage<Optional<Sequence>> currentOffset =
ProjectionManagement.get(system).getOffset(projectionId);
currentOffset.thenAccept(
optionalOffset -> {
if (optionalOffset.isPresent()) {
Sequence newOffset = new Sequence(optionalOffset.get().value());
CompletionStage<Done> done =
ProjectionManagement.get(system).updateOffset(projectionId, newOffset);
}
});
// #update-offset
}
public static void illustrateIsPaused() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
// #is-paused
ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
CompletionStage<Boolean> paused =
ProjectionManagement.get(system).isPaused(projectionId);
// #is-paused
}
public static void illustratPauseResume() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
// #pause-resume
ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");
ProjectionManagement mgmt = ProjectionManagement.get(system);
CompletionStage<Done> pauseDone = mgmt.pause(projectionId);
CompletionStage<Done> migrationDone = pauseDone.thenCompose(notUsed -> someDataMigration());
CompletionStage<Done> resumeDone = migrationDone.thenCompose(notUsed -> mgmt.resume(projectionId));
// #pause-resume
}
static CompletionStage<Done> someDataMigration() {
return CompletableFuture.completedFuture(Done.getInstance());
}
}