blob: 4006c77e6841e9eb12f2840c26e432b8d61f34db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.slick
import java.time.Instant
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.Behaviors
import pekko.persistence.jdbc.query.scaladsl.JdbcReadJournal
import pekko.projection.eventsourced.EventEnvelope
import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
import docs.eventsourced.ShoppingCart
//#projection-imports
import org.apache.pekko
import pekko.projection.ProjectionId
import pekko.projection.slick.SlickProjection
import slick.basic.DatabaseConfig
import slick.dbio.DBIO
import slick.jdbc.H2Profile
//#projection-imports
//#handler-imports
import scala.concurrent.Future
import org.apache.pekko
import pekko.Done
import pekko.projection.slick.SlickHandler
import org.slf4j.LoggerFactory
//#handler-imports
class SlickProjectionDocExample {
// #repository
case class Order(id: String, time: Instant)
class OrderRepository(val dbConfig: DatabaseConfig[H2Profile]) {
import dbConfig.profile.api._
private class OrdersTable(tag: Tag) extends Table[Order](tag, "ORDERS") {
def id = column[String]("CART_ID", O.PrimaryKey)
def time = column[Instant]("TIME")
def * = (id, time).mapTo[Order]
}
private val ordersTable = TableQuery[OrdersTable]
def save(order: Order)(implicit ec: ExecutionContext) = {
ordersTable.insertOrUpdate(order).map(_ => Done)
}
def createTable(): Future[Unit] =
dbConfig.db.run(ordersTable.schema.createIfNotExists)
}
// #repository
// #handler
class ShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext)
extends SlickHandler[EventEnvelope[ShoppingCart.Event]] {
private val logger = LoggerFactory.getLogger(getClass)
override def process(envelope: EventEnvelope[ShoppingCart.Event]): DBIO[Done] = {
envelope.event match {
case ShoppingCart.CheckedOut(cartId, time) =>
logger.info(s"Shopping cart $cartId was checked out at $time")
repository.save(Order(cartId, time))
case otherEvent =>
logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
DBIO.successful(Done)
}
}
}
// #handler
// #grouped-handler
import scala.collection.immutable
class GroupedShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext)
extends SlickHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
private val logger = LoggerFactory.getLogger(getClass)
override def process(envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): DBIO[Done] = {
val dbios = envelopes.map(_.event).map {
case ShoppingCart.CheckedOut(cartId, time) =>
logger.info(s"Shopping cart $cartId was checked out at $time")
repository.save(Order(cartId, time))
case otherEvent =>
logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent")
DBIO.successful(Done)
}
DBIO.sequence(dbios).map(_ => Done)
}
}
// #grouped-handler
implicit val system: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "Example")
// #db-config
val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig("pekko.projection.slick", system.settings.config)
val repository = new OrderRepository(dbConfig)
// #db-config
// #sourceProvider
val sourceProvider =
EventSourcedProvider
.eventsByTag[ShoppingCart.Event](system, readJournalPluginId = JdbcReadJournal.Identifier, tag = "carts-1")
// #sourceProvider
object IllustrateExactlyOnce {
// #exactlyOnce
implicit val ec: ExecutionContext = system.executionContext
val projection =
SlickProjection.exactlyOnce(
projectionId = ProjectionId("ShoppingCarts", "carts-1"),
sourceProvider,
dbConfig,
handler = () => new ShoppingCartHandler(repository))
// #exactlyOnce
}
object IllustrateAtLeastOnce {
// #atLeastOnce
implicit val ec: ExecutionContext = system.executionContext
val projection =
SlickProjection
.atLeastOnce(
projectionId = ProjectionId("ShoppingCarts", "carts-1"),
sourceProvider,
dbConfig,
handler = () => new ShoppingCartHandler(repository))
.withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
// #atLeastOnce
}
object IllustrateGrouped {
// #grouped
implicit val ec: ExecutionContext = system.executionContext
val projection =
SlickProjection
.groupedWithin(
projectionId = ProjectionId("ShoppingCarts", "carts-1"),
sourceProvider,
dbConfig,
handler = () => new GroupedShoppingCartHandler(repository))
.withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
// #grouped
}
}