| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * license agreements; and to You under the Apache License, version 2.0: |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * This file is part of the Apache Pekko project, which was derived from Akka. |
| */ |
| |
| /* |
| * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com> |
| */ |
| |
| package docs.slick |
| |
| import java.time.Instant |
| |
| import scala.concurrent.ExecutionContext |
| import scala.concurrent.duration._ |
| |
| import org.apache.pekko |
| import pekko.actor.typed.ActorSystem |
| import pekko.actor.typed.scaladsl.Behaviors |
| import pekko.persistence.jdbc.query.scaladsl.JdbcReadJournal |
| import pekko.projection.eventsourced.EventEnvelope |
| import pekko.projection.eventsourced.scaladsl.EventSourcedProvider |
| import docs.eventsourced.ShoppingCart |
| |
| //#projection-imports |
| import org.apache.pekko |
| import pekko.projection.ProjectionId |
| import pekko.projection.slick.SlickProjection |
| import slick.basic.DatabaseConfig |
| import slick.dbio.DBIO |
| import slick.jdbc.H2Profile |
| |
| //#projection-imports |
| |
| //#handler-imports |
| import scala.concurrent.Future |
| |
| import org.apache.pekko |
| import pekko.Done |
| import pekko.projection.slick.SlickHandler |
| import org.slf4j.LoggerFactory |
| |
| //#handler-imports |
| |
| class SlickProjectionDocExample { |
| |
| // #repository |
| case class Order(id: String, time: Instant) |
| |
| class OrderRepository(val dbConfig: DatabaseConfig[H2Profile]) { |
| |
| import dbConfig.profile.api._ |
| |
| private class OrdersTable(tag: Tag) extends Table[Order](tag, "ORDERS") { |
| def id = column[String]("CART_ID", O.PrimaryKey) |
| |
| def time = column[Instant]("TIME") |
| |
| def * = (id, time).mapTo[Order] |
| } |
| |
| private val ordersTable = TableQuery[OrdersTable] |
| |
| def save(order: Order)(implicit ec: ExecutionContext) = { |
| ordersTable.insertOrUpdate(order).map(_ => Done) |
| } |
| |
| def createTable(): Future[Unit] = |
| dbConfig.db.run(ordersTable.schema.createIfNotExists) |
| } |
| // #repository |
| |
| // #handler |
| class ShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext) |
| extends SlickHandler[EventEnvelope[ShoppingCart.Event]] { |
| private val logger = LoggerFactory.getLogger(getClass) |
| |
| override def process(envelope: EventEnvelope[ShoppingCart.Event]): DBIO[Done] = { |
| envelope.event match { |
| case ShoppingCart.CheckedOut(cartId, time) => |
| logger.info(s"Shopping cart $cartId was checked out at $time") |
| repository.save(Order(cartId, time)) |
| |
| case otherEvent => |
| logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent") |
| DBIO.successful(Done) |
| } |
| } |
| } |
| // #handler |
| |
| // #grouped-handler |
| import scala.collection.immutable |
| |
| class GroupedShoppingCartHandler(repository: OrderRepository)(implicit ec: ExecutionContext) |
| extends SlickHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] { |
| private val logger = LoggerFactory.getLogger(getClass) |
| |
| override def process(envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): DBIO[Done] = { |
| val dbios = envelopes.map(_.event).map { |
| case ShoppingCart.CheckedOut(cartId, time) => |
| logger.info(s"Shopping cart $cartId was checked out at $time") |
| repository.save(Order(cartId, time)) |
| |
| case otherEvent => |
| logger.debug(s"Shopping cart ${otherEvent.cartId} changed by $otherEvent") |
| DBIO.successful(Done) |
| } |
| DBIO.sequence(dbios).map(_ => Done) |
| } |
| } |
| // #grouped-handler |
| |
| implicit val system: ActorSystem[Nothing] = ActorSystem[Nothing](Behaviors.empty, "Example") |
| |
| // #db-config |
| val dbConfig: DatabaseConfig[H2Profile] = DatabaseConfig.forConfig("pekko.projection.slick", system.settings.config) |
| |
| val repository = new OrderRepository(dbConfig) |
| // #db-config |
| |
| // #sourceProvider |
| val sourceProvider = |
| EventSourcedProvider |
| .eventsByTag[ShoppingCart.Event](system, readJournalPluginId = JdbcReadJournal.Identifier, tag = "carts-1") |
| // #sourceProvider |
| |
| object IllustrateExactlyOnce { |
| // #exactlyOnce |
| implicit val ec: ExecutionContext = system.executionContext |
| |
| val projection = |
| SlickProjection.exactlyOnce( |
| projectionId = ProjectionId("ShoppingCarts", "carts-1"), |
| sourceProvider, |
| dbConfig, |
| handler = () => new ShoppingCartHandler(repository)) |
| // #exactlyOnce |
| } |
| |
| object IllustrateAtLeastOnce { |
| // #atLeastOnce |
| implicit val ec: ExecutionContext = system.executionContext |
| |
| val projection = |
| SlickProjection |
| .atLeastOnce( |
| projectionId = ProjectionId("ShoppingCarts", "carts-1"), |
| sourceProvider, |
| dbConfig, |
| handler = () => new ShoppingCartHandler(repository)) |
| .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis) |
| // #atLeastOnce |
| } |
| |
| object IllustrateGrouped { |
| // #grouped |
| implicit val ec: ExecutionContext = system.executionContext |
| |
| val projection = |
| SlickProjection |
| .groupedWithin( |
| projectionId = ProjectionId("ShoppingCarts", "carts-1"), |
| sourceProvider, |
| dbConfig, |
| handler = () => new GroupedShoppingCartHandler(repository)) |
| .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis) |
| // #grouped |
| } |
| |
| } |