blob: c495fc7816fcdd42c962a5b3a3d8f9b4ae6c985d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.cassandra
import org.apache.pekko
import pekko.actor.typed.scaladsl.LoggerOps
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.Behaviors
import pekko.projection.ProjectionContext
import pekko.projection.eventsourced.EventEnvelope
import pekko.stream.scaladsl.FlowWithContext
//#daemon-imports
import org.apache.pekko
import pekko.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import pekko.projection.ProjectionBehavior
//#daemon-imports
//#source-provider-imports
import org.apache.pekko
import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
import docs.eventsourced.ShoppingCart
//#source-provider-imports
//#projection-imports
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.cassandra.scaladsl.CassandraProjection
//#projection-imports
//#handler-imports
import scala.concurrent.duration._
import scala.concurrent.Future
import org.apache.pekko
import pekko.Done
import pekko.projection.scaladsl.Handler
import org.slf4j.LoggerFactory
//#handler-imports
object CassandraProjectionDocExample {
private val system = ActorSystem[Nothing](Behaviors.empty, "Example")
// #handler
class ShoppingCartHandler extends Handler[EventEnvelope[ShoppingCart.Event]] {
private val logger = LoggerFactory.getLogger(getClass)
override def process(envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
envelope.event match {
case ShoppingCart.CheckedOut(cartId, time) =>
logger.info2("Shopping cart {} was checked out at {}", cartId, time)
Future.successful(Done)
case otherEvent =>
logger.debug2("Shopping cart {} changed by {}", otherEvent.cartId, otherEvent)
Future.successful(Done)
}
}
}
// #handler
// #grouped-handler
import scala.collection.immutable
class GroupedShoppingCartHandler extends Handler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
private val logger = LoggerFactory.getLogger(getClass)
override def process(envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): Future[Done] = {
envelopes.map(_.event).foreach {
case ShoppingCart.CheckedOut(cartId, time) =>
logger.info2("Shopping cart {} was checked out at {}", cartId, time)
case otherEvent =>
logger.debug2("Shopping cart {} changed by {}", otherEvent.cartId, otherEvent)
}
Future.successful(Done)
}
}
// #grouped-handler
// #sourceProvider
val sourceProvider =
EventSourcedProvider
.eventsByTag[ShoppingCart.Event](system, readJournalPluginId = CassandraReadJournal.Identifier, tag = "carts-1")
// #sourceProvider
object IllustrateAtLeastOnce {
// #atLeastOnce
val projection =
CassandraProjection
.atLeastOnce(
projectionId = ProjectionId("shopping-carts", "carts-1"),
sourceProvider,
handler = () => new ShoppingCartHandler)
.withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
// #atLeastOnce
}
object IllustrateAtMostOnce {
// #atMostOnce
val projection =
CassandraProjection.atMostOnce(
projectionId = ProjectionId("shopping-carts", "carts-1"),
sourceProvider,
handler = () => new ShoppingCartHandler)
// #atMostOnce
}
object IllustrateGrouped {
// #grouped
val projection =
CassandraProjection
.groupedWithin(
projectionId = ProjectionId("shopping-carts", "carts-1"),
sourceProvider,
handler = () => new GroupedShoppingCartHandler)
.withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
// #grouped
}
object IllustrateAtLeastOnceFlow {
// #atLeastOnceFlow
val logger = LoggerFactory.getLogger(getClass)
val flow = FlowWithContext[EventEnvelope[ShoppingCart.Event], ProjectionContext]
.map(envelope => envelope.event)
.map {
case ShoppingCart.CheckedOut(cartId, time) =>
logger.info2("Shopping cart {} was checked out at {}", cartId, time)
Done
case otherEvent =>
logger.debug2("Shopping cart {} changed by {}", otherEvent.cartId, otherEvent)
Done
}
val projection =
CassandraProjection
.atLeastOnceFlow(projectionId = ProjectionId("shopping-carts", "carts-1"), sourceProvider, handler = flow)
.withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
// #atLeastOnceFlow
}
object IllustrateRecoveryStrategy {
// #withRecoveryStrategy
import org.apache.pekko.projection.HandlerRecoveryStrategy
val projection =
CassandraProjection
.atLeastOnce(
projectionId = ProjectionId("shopping-carts", "carts-1"),
sourceProvider,
handler = () => new ShoppingCartHandler)
.withRecoveryStrategy(HandlerRecoveryStrategy.retryAndFail(retries = 10, delay = 1.second))
// #withRecoveryStrategy
}
object IllustrateRestart {
// #withRestartBackoff
val projection =
CassandraProjection
.atLeastOnce(
projectionId = ProjectionId("shopping-carts", "carts-1"),
sourceProvider,
handler = () => new ShoppingCartHandler)
.withRestartBackoff(minBackoff = 200.millis, maxBackoff = 5.seconds, randomFactor = 0.1)
// #withRestartBackoff
}
object IllustrateRunningWithShardedDaemon {
// #running-source-provider
def sourceProvider(tag: String) =
EventSourcedProvider
.eventsByTag[ShoppingCart.Event](
system = system,
readJournalPluginId = CassandraReadJournal.Identifier,
tag = tag)
// #running-source-provider
// #running-projection
def projection(tag: String) =
CassandraProjection
.atLeastOnce(
projectionId = ProjectionId("shopping-carts", tag),
sourceProvider(tag),
handler = () => new ShoppingCartHandler)
.withSaveOffset(100, 500.millis)
// #running-projection
// #running-with-daemon-process
ShardedDaemonProcess(system).init[ProjectionBehavior.Command](
name = "shopping-carts",
numberOfInstances = ShoppingCart.tags.size,
behaviorFactory = (i: Int) => ProjectionBehavior(projection(ShoppingCart.tags(i))),
stopMessage = ProjectionBehavior.Stop)
// #running-with-daemon-process
}
object IllustrateRunningWithActor {
Behaviors.setup[String] { context =>
// #running-with-actor
def sourceProvider(tag: String) =
EventSourcedProvider
.eventsByTag[ShoppingCart.Event](
system = system,
readJournalPluginId = CassandraReadJournal.Identifier,
tag = tag)
def projection(tag: String) =
CassandraProjection
.atLeastOnce(
projectionId = ProjectionId("shopping-carts", tag),
sourceProvider(tag),
handler = () => new ShoppingCartHandler)
val projection1 = projection("carts-1")
context.spawn(ProjectionBehavior(projection1), projection1.projectionId.id)
// #running-with-actor
Behaviors.empty
}
}
object IllustrateRunningWithSingleton {
// #running-with-singleton
import org.apache.pekko
import pekko.cluster.typed.ClusterSingleton
import pekko.cluster.typed.SingletonActor
def sourceProvider(tag: String) =
EventSourcedProvider
.eventsByTag[ShoppingCart.Event](
system = system,
readJournalPluginId = CassandraReadJournal.Identifier,
tag = tag)
def projection(tag: String) =
CassandraProjection
.atLeastOnce(
projectionId = ProjectionId("shopping-carts", tag),
sourceProvider(tag),
handler = () => new ShoppingCartHandler)
val projection1 = projection("carts-1")
ClusterSingleton(system).init(
SingletonActor(ProjectionBehavior(projection1), projection1.projectionId.id)
.withStopMessage(ProjectionBehavior.Stop))
// #running-with-singleton
}
object IllustrateProjectionSettings {
// #projection-settings
val projection =
CassandraProjection
.atLeastOnce(
projectionId = ProjectionId("shopping-carts", "carts-1"),
sourceProvider,
handler = () => new ShoppingCartHandler)
.withRestartBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.5)
.withSaveOffset(100, 500.millis)
// #projection-settings
}
object IllustrateGetOffset {
// #get-offset
import org.apache.pekko
import pekko.projection.scaladsl.ProjectionManagement
import pekko.persistence.query.Offset
import pekko.projection.ProjectionId
val projectionId = ProjectionId("shopping-carts", "carts-1")
val currentOffset: Future[Option[Offset]] = ProjectionManagement(system).getOffset[Offset](projectionId)
// #get-offset
}
object IllustrateClearOffset {
import org.apache.pekko.projection.scaladsl.ProjectionManagement
// #clear-offset
val projectionId = ProjectionId("shopping-carts", "carts-1")
val done: Future[Done] = ProjectionManagement(system).clearOffset(projectionId)
// #clear-offset
}
object IllustrateUpdateOffset {
import org.apache.pekko
import pekko.projection.scaladsl.ProjectionManagement
import system.executionContext
// #update-offset
import org.apache.pekko.persistence.query.Sequence
val projectionId = ProjectionId("shopping-carts", "carts-1")
val currentOffset: Future[Option[Sequence]] = ProjectionManagement(system).getOffset[Sequence](projectionId)
currentOffset.foreach {
case Some(s) => ProjectionManagement(system).updateOffset[Sequence](projectionId, Sequence(s.value + 1))
case None => // already removed
}
// #update-offset
}
object IllustratePauseResume {
import system.executionContext
def someDataMigration() = Future.successful(Done)
// #pause-resume
import org.apache.pekko
import pekko.projection.scaladsl.ProjectionManagement
import pekko.projection.ProjectionId
val projectionId = ProjectionId("shopping-carts", "carts-1")
val mgmt = ProjectionManagement(system)
val done = for {
_ <- mgmt.pause(projectionId)
_ <- someDataMigration()
_ <- mgmt.resume(projectionId)
} yield Done
// #pause-resume
}
object IllustrateIsPaused {
import org.apache.pekko
import pekko.projection.scaladsl.ProjectionManagement
import pekko.projection.ProjectionId
// #is-paused
val projectionId = ProjectionId("shopping-carts", "carts-1")
val paused: Future[Boolean] = ProjectionManagement(system).isPaused(projectionId)
// #is-paused
}
}