| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * license agreements; and to You under the Apache License, version 2.0: |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * This file is part of the Apache Pekko project, which was derived from Akka. |
| */ |
| |
| /* |
| * Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com> |
| */ |
| |
| package docs.cassandra |
| |
| import java.util.UUID |
| |
| import scala.concurrent.Await |
| import scala.concurrent.ExecutionContext |
| import scala.concurrent.duration._ |
| |
| import org.apache.pekko |
| import pekko.Done |
| import pekko.actor.testkit.typed.scaladsl.LogCapturing |
| import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit |
| import pekko.projection.Projection |
| import pekko.projection.ProjectionId |
| import pekko.projection.cassandra.ContainerSessionProvider |
| import pekko.projection.cassandra.scaladsl.CassandraProjection |
| import pekko.projection.testkit.scaladsl.ProjectionTestKit |
| import pekko.stream.connectors.cassandra.scaladsl.CassandraSessionRegistry |
| import docs.cassandra.WordCountDocExample._ |
| import org.scalatest.wordspec.AnyWordSpecLike |
| |
| class WordCountDocExampleSpec |
| extends ScalaTestWithActorTestKit(ContainerSessionProvider.Config) |
| with AnyWordSpecLike |
| with LogCapturing { |
| |
| private implicit val ec: ExecutionContext = system.executionContext |
| private val session = CassandraSessionRegistry(system).sessionFor("pekko.projection.cassandra.session-config") |
| private val repository = new CassandraWordCountRepository(session) |
| private val projectionTestKit = ProjectionTestKit(system) |
| |
| override protected def beforeAll(): Unit = { |
| super.beforeAll() |
| |
| // don't use futureValue (patience) here because it can take a while to start the test container |
| Await.result(ContainerSessionProvider.started, 30.seconds) |
| |
| Await.result(for { |
| _ <- CassandraProjection.createTablesIfNotExists() |
| _ <- repository.createKeyspaceAndTable() |
| } yield Done, 30.seconds) |
| } |
| |
| override protected def afterAll(): Unit = { |
| Await.ready(for { |
| _ <- session.executeDDL(s"DROP keyspace pekko_projection.offset_store") |
| _ <- session.executeDDL(s"DROP keyspace ${repository.keyspace}") |
| } yield Done, 30.seconds) |
| super.afterAll() |
| } |
| |
| private def genRandomProjectionId() = |
| ProjectionId(UUID.randomUUID().toString, UUID.randomUUID().toString) |
| |
| private def runAndAssert(projection: Projection[WordEnvelope]): Unit = { |
| val projectionId = projection.projectionId |
| val expected = Map("abc" -> 2, "def" -> 1, "ghi" -> 1) |
| |
| projectionTestKit.run(projection) { |
| withClue("check - all values words were counted") { |
| val savedState = repository.loadAll(projectionId.id).futureValue |
| savedState shouldBe expected |
| } |
| } |
| } |
| |
| "WordCount example" must { |
| |
| "be able to load initial state and manage updated state" in { |
| import IllustrateStatefulHandlerLoadingInitialState._ |
| |
| val projectionId = genRandomProjectionId() |
| |
| // #projection |
| val projection = |
| CassandraProjection |
| .atLeastOnce[Long, WordEnvelope]( |
| projectionId, |
| sourceProvider = new WordSource, |
| handler = () => new WordCountHandler(projectionId, repository)) |
| // #projection |
| |
| runAndAssert(projection) |
| } |
| |
| "be able to load state on demand and manage updated state" in { |
| import IllustrateStatefulHandlerLoadingStateOnDemand._ |
| |
| val projectionId = genRandomProjectionId() |
| |
| val projection = |
| CassandraProjection |
| .atLeastOnce[Long, WordEnvelope]( |
| projectionId, |
| sourceProvider = new WordSource, |
| handler = () => new WordCountHandler(projectionId, repository)) |
| |
| runAndAssert(projection) |
| } |
| |
| "have support for actor Behavior as handler - loading initial state" in { |
| import IllstrateActorLoadingInitialState._ |
| |
| val projectionId = genRandomProjectionId() |
| |
| // #actorHandlerProjection |
| val projection = |
| CassandraProjection |
| .atLeastOnce[Long, WordEnvelope]( |
| projectionId, |
| sourceProvider = new WordSource, |
| handler = () => new WordCountActorHandler(WordCountProcessor(projectionId, repository))) |
| // #actorHandlerProjection |
| |
| runAndAssert(projection) |
| } |
| |
| "have support for actor Behavior as handler - loading state on demand" in { |
| import IllstrateActorLoadingStateOnDemand._ |
| |
| val projectionId = genRandomProjectionId() |
| |
| val projection = |
| CassandraProjection |
| .atLeastOnce[Long, WordEnvelope]( |
| projectionId, |
| new WordSource, |
| () => new WordCountActorHandler(WordCountProcessor(projectionId, repository))) |
| |
| runAndAssert(projection) |
| } |
| |
| } |
| |
| } |