blob: e3983437f2bee4c8d2bc0d0864665e9a977a7885 [file] [log] [blame]
package sample.distributeddata
import java.util.Optional
import scala.concurrent.duration._
import org.apache.pekko.actor.testkit.typed.scaladsl.TestProbe
import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.actor.typed.scaladsl.adapter._
import org.apache.pekko.cluster.ddata.Replicator
import org.apache.pekko.cluster.ddata.typed.scaladsl.DistributedData
import org.apache.pekko.cluster.ddata.typed.scaladsl.Replicator.{ GetReplicaCount, ReplicaCount }
import org.apache.pekko.cluster.typed.{ Cluster, Join }
import org.apache.pekko.remote.testconductor.RoleName
import org.apache.pekko.remote.testkit.MultiNodeConfig
import org.apache.pekko.remote.testkit.MultiNodeSpec
import com.typesafe.config.ConfigFactory
object ReplicatedCacheSpec extends MultiNodeConfig {
val node1 = role("node-1")
val node2 = role("node-2")
val node3 = role("node-3")
commonConfig(ConfigFactory.parseString("""
pekko.loglevel = INFO
pekko.actor.provider = "cluster"
pekko.log-dead-letters-during-shutdown = off
"""))
}
class ReplicatedCacheSpecMultiJvmNode1 extends ReplicatedCacheSpec
class ReplicatedCacheSpecMultiJvmNode2 extends ReplicatedCacheSpec
class ReplicatedCacheSpecMultiJvmNode3 extends ReplicatedCacheSpec
class ReplicatedCacheSpec extends MultiNodeSpec(ReplicatedCacheSpec) with STMultiNodeSpec {
import ReplicatedCacheSpec._
import ReplicatedCache._
override def initialParticipants = roles.size
implicit val typedSystem: ActorSystem[Nothing] = system.toTyped
val cluster = Cluster(typedSystem)
val replicatedCache = system.spawnAnonymous(ReplicatedCache.create())
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster.manager ! Join(node(to).address)
}
enterBarrier(from.name + "-joined")
}
"Demo of a replicated cache" must {
"join cluster" in within(20.seconds) {
join(node1, node1)
join(node2, node1)
join(node3, node1)
awaitAssert {
val probe = TestProbe[ReplicaCount]()
DistributedData(typedSystem).replicator ! GetReplicaCount(probe.ref)
probe.expectMessage(Replicator.ReplicaCount(roles.size))
}
enterBarrier("after-1")
}
"replicate cached entry" in within(10.seconds) {
runOn(node1) {
replicatedCache ! new PutInCache("key1", "A")
}
awaitAssert {
val probe = TestProbe[Cached]()
replicatedCache.tell(new GetFromCache("key1", probe.ref))
probe.expectMessage(new Cached("key1", Optional.of("A")))
}
enterBarrier("after-2")
}
"replicate many cached entries" in within(10.seconds) {
runOn(node1) {
for (i <- 100 to 200)
replicatedCache ! new PutInCache("key" + i, "entry-" + i)
}
awaitAssert {
val probe = TestProbe[Cached]()
for (i <- 100 to 200) {
replicatedCache.tell(new GetFromCache("key" + i, probe.ref))
probe.expectMessage(new Cached("key" + i, Optional.of("entry-" + i)))
}
}
enterBarrier("after-3")
}
"replicate evicted entry" in within(15.seconds) {
runOn(node1) {
replicatedCache ! new PutInCache("key2", "B")
}
awaitAssert {
val probe = TestProbe[Cached]()
replicatedCache.tell(new GetFromCache("key2", probe.ref))
probe.expectMessage(new Cached("key2", Optional.of("B")))
}
enterBarrier("key2-replicated")
runOn(node3) {
replicatedCache ! new Evict("key2")
}
awaitAssert {
val probe = TestProbe[Cached]()
replicatedCache.tell(new GetFromCache("key2", probe.ref))
probe.expectMessage(new Cached("key2", Optional.empty()))
}
enterBarrier("after-4")
}
"replicate updated cached entry" in within(10.seconds) {
runOn(node2) {
replicatedCache ! new PutInCache("key1", "A2")
replicatedCache ! new PutInCache("key1", "A3")
}
awaitAssert {
val probe = TestProbe[Cached]()
replicatedCache.tell(new GetFromCache("key1", probe.ref))
probe.expectMessage(new Cached("key1", Optional.of("A3")))
}
enterBarrier("after-5")
}
}
}