This plugin is published to the Maven Central repository under the following coordinates:
```xml
<dependency>
  <groupId>org.apache.pekko</groupId>
  <artifactId>pekko-persistence-dynamodb_2.13</artifactId>
  <version>1.1.0</version>
</dependency>
```
or for sbt users:
```
libraryDependencies += "org.apache.pekko" %% "pekko-persistence-dynamodb" % "1.1.0"
```
Snapshot versions are available from the Apache snapshots repository at https://repository.apache.org/content/groups/snapshots. sbt users can add it with:

```
resolvers += Resolver.ApacheMavenSnapshotsRepo
```
```
pekko.persistence.journal.plugin = "my-dynamodb-journal"

my-dynamodb-journal = ${dynamodb-journal}   # include the default settings
my-dynamodb-journal {                       # and add some overrides
  journal-table = <the name of the table to be used>
  journal-name = <prefix to be used for all keys stored by this plugin>
  aws-access-key-id = <your key>
  aws-secret-access-key = <your secret>
  endpoint = "https://dynamodb.us-east-1.amazonaws.com" # or where your deployment is
}
```
For details on the endpoint URL, please refer to the DynamoDB documentation. There are many more settings for fine-tuning and for adapting this journal plugin to your use-case; please refer to the reference.conf file.
Before you can use these settings you will have to create a table, e.g. using the AWS console, with the following schema:
```
par : S : HashKey
num : N : RangeKey
```
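If you prefer to create the table programmatically, the following is a minimal sketch using the AWS Java SDK v1; the table name and the provisioned throughput values are illustrative assumptions, so adjust them to match your `journal-table` setting and your capacity planning:

```scala
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model._

object CreateJournalTable extends App {
  val client = AmazonDynamoDBClientBuilder.standard().build()

  client.createTable(
    new CreateTableRequest()
      .withTableName("pekko-persistence")           // assumption: must match journal-table
      .withKeySchema(
        new KeySchemaElement("par", KeyType.HASH),  // partition (hash) key
        new KeySchemaElement("num", KeyType.RANGE)) // sort (range) key
      .withAttributeDefinitions(
        new AttributeDefinition("par", ScalarAttributeType.S),
        new AttributeDefinition("num", ScalarAttributeType.N))
      .withProvisionedThroughput(
        new ProvisionedThroughput(10L, 10L)))       // illustrative read/write capacity
}
```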
*Contributed by @joost-de-vries.*
```
pekko.persistence.snapshot-store.plugin = "my-dynamodb-snapshot-store"

my-dynamodb-snapshot-store = ${dynamodb-snapshot-store}  # include the default settings
my-dynamodb-snapshot-store {                             # and add some overrides
  snapshot-table = <the name of the table to be used>
  journal-name = <prefix to be used for all keys stored by this plugin>
  aws-access-key-id = <your key, default is the same as journal>
  aws-secret-access-key = <your secret, default is the same as journal>
  endpoint = "https://dynamodb.us-east-1.amazonaws.com" # or where your deployment is, default is the same as journal
}
```
The table to create for snapshot storage has the schema:
```
par : S : HashKey
seq : N : RangeKey
ts  : N
```

plus an index named `ts-idx` on the combination of `par` and `ts`.
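The same kind of sketch works for the snapshot table; it assumes `ts-idx` is a local secondary index (same hash key `par`, alternate range key `ts`), and the table name and throughput are again illustrative:

```scala
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model._

object CreateSnapshotTable extends App {
  val client = AmazonDynamoDBClientBuilder.standard().build()

  client.createTable(
    new CreateTableRequest()
      .withTableName("pekko-persistence-snapshot")  // assumption: must match snapshot-table
      .withKeySchema(
        new KeySchemaElement("par", KeyType.HASH),
        new KeySchemaElement("seq", KeyType.RANGE))
      .withAttributeDefinitions(
        new AttributeDefinition("par", ScalarAttributeType.S),
        new AttributeDefinition("seq", ScalarAttributeType.N),
        new AttributeDefinition("ts", ScalarAttributeType.N))
      .withLocalSecondaryIndexes(
        new LocalSecondaryIndex()
          .withIndexName("ts-idx")                  // index on the combination of par and ts
          .withKeySchema(
            new KeySchemaElement("par", KeyType.HASH),
            new KeySchemaElement("ts", KeyType.RANGE))
          .withProjection(new Projection().withProjectionType(ProjectionType.ALL)))
      .withProvisionedThroughput(new ProvisionedThroughput(10L, 10L)))
}
```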
The DynamoDB item of a snapshot can be at most 400 kB. Using a binary serialisation format like Protobuf or Kryo will use that space most effectively.
*Contributed by @joost-de-vries.*
See `CreatePersistenceIdsIndex.createPersistenceIdsIndexRequest` for how to create the Global Secondary Index that is required to query `currentPersistenceIds`.
```
dynamodb-read-journal {
  # The name of the Global Secondary Index that is used to query currentPersistenceIds,
  # see CreatePersistenceIdsIndex.createPersistenceIdsIndexRequest
  # persistence-ids-index-name: "persistence-ids-idx"
}
```
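As a hedged usage sketch, once the index exists the read journal can be obtained through Pekko's standard `PersistenceQuery` extension, using the `"dynamodb-read-journal"` identifier from the configuration block above:

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.persistence.query.PersistenceQuery
import org.apache.pekko.persistence.query.scaladsl.CurrentPersistenceIdsQuery

object ListPersistenceIds extends App {
  implicit val system: ActorSystem = ActorSystem("example")

  // Look up the read journal by its configuration path and use the generic
  // query interface; the journal streams each persistence id once.
  val readJournal = PersistenceQuery(system)
    .readJournalFor[CurrentPersistenceIdsQuery]("dynamodb-read-journal")

  readJournal.currentPersistenceIds().runForeach(println)
}
```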
DynamoDB only offers consistency guarantees for a single storage item, which corresponds to one event in the case of this Pekko Persistence plugin. This means that any single event is either written to the journal (and thereby visible to later replays) or it is not. This plugin nevertheless supports atomic multi-event batches, by marking the contained events such that partial replay can be avoided (see the `idx` and `cnt` attributes in the storage format description below). Consider the following actions of a PersistentActor:
```scala
val events = List(<some events>)
if (atomic) {
  persistAll(events)(handler)
} else {
  for (event <- events) persist(event)(handler)
}
```
In the first case a recovery will only ever see all of the events or none of them. This is also true if recovery is requested with an upper limit on the sequence number to recover to, or with a limit on the number of events to replay; the event count limit is applied before removing incomplete batch writes, which means that the actual number of events the actor receives may be lower than the requested limit even if further events are available.
In the second case each event is treated in isolation and may or may not be replayed depending on whether it was persisted successfully or not.
This plugin uses the AWS Java SDK, which means that the number of requests that can be made concurrently is limited by the number of connections to DynamoDB and by the number of threads in the thread-pool used by the AWS HTTP client. The default setting of 50 connections allows roughly 5000 requests per second for a deployment that is used from the same EC2 region (where every persisted event batch is roughly one request). If a single ActorSystem needs to persist more than this number of events per second, you may want to tune the parameter
```
my-dynamodb-journal.aws-client-config.max-connections = <your value here>
```
Changing this number changes both the number of concurrent connections and the used thread-pool size.
This plugin uses exponential backoff when plausibly retriable errors from DynamoDB occur. This includes network glitches (5xx; since Pekko 1.1.0) and throughput exceptions (400; extended in Pekko 1.1.0).
The backoff strategy is very simple.
The last waiting time is thus about half a second and, if responses were immediate, the whole retry sequence would take about one second. In practice, response times are of course greater than zero.
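For intuition only: those totals match, for example, ten attempts whose delays double starting at one millisecond. The parameters below are an assumption chosen to reproduce the quoted numbers, not the plugin's actual code:

```scala
// Assumption for illustration: 10 retries with delays doubling from 1 ms.
val delaysMs = Seq.iterate(1L, 10)(_ * 2) // 1, 2, 4, ..., 512
println(delaysMs.last)                    // 512 ms: about half a second before the last retry
println(delaysMs.sum)                     // 1023 ms: about one second in total
```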
pekko-persistence-dynamodb is derived from akka-persistence-dynamodb v1.3.0.
Anyone migrating from akka-persistence-dynamodb should first upgrade to akka-persistence-dynamodb v1.3.0.
Run

```
docker-compose up
```

to download and start Localstack. Then source the test environment and start sbt:

```
source .env.test
sbt
```

When you are done, stop Localstack again with

```
docker-compose down
```

Please also read the CONTRIBUTING.md file.
The structure for journal storage in DynamoDB has evolved over iterations of performance tuning. Most of these lessons were learned in creating the eventsourced DynamoDB journal, but they apply here as well.

When initially modelling journal storage in DynamoDB, it seems natural to use a simple structure similar to this:

```
persistenceId : S : HashKey
sequenceNr    : N : RangeKey
payload       : B
```
This maps very well to the operations a journal needs to perform:

```
writeMessage   -> PutItem
deleteMessage  -> DeleteItem
replayMessages -> Query by persistenceId, conditions and ordered by sequenceNr, ascending
highCounter    -> Query by persistenceId, conditions and ordered by sequenceNr, descending, limit 1
```
However, this layout suffers from scalability problems. Since the hash key is used to locate the data storage node, all writes for a single processor will go to the same DynamoDB node, which limits throughput and invites throttling no matter the level of throughput provisioned for a table: the hash key just gets too hot. This layout also limits replay throughput, since you have to step through a sequence of queries, using the last processed item of query N to start query N+1.
With the following abbreviations:

```
P  -> PersistentRepr
SH -> SequenceHigh
SL -> SequenceLow
```
we model PersistentRepr storage as
```
par = <journalName>-P-<persistenceId>-<sequenceNr / 100> : S : HashKey
num = <sequenceNr % 100>                                 : N : RangeKey
pay = <payload>                                          : B
idx = <atomic write batch index>                         : N (possibly absent)
cnt = <atomic write batch max index>                     : N (possibly absent)
```
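To make the key derivation concrete, here is an illustrative helper (not part of the plugin's API) that computes the hash and range key for an event under this layout:

```scala
// Illustrative only: derives the P-entry keys per the layout above (partition size 100).
def journalKeys(journalName: String, persistenceId: String, sequenceNr: Long): (String, Long) = {
  val par = s"$journalName-P-$persistenceId-${sequenceNr / 100}" // hash key
  val num = sequenceNr % 100                                     // range key
  (par, num)
}

// e.g. journalKeys("myJournal", "user-42", 250) == ("myJournal-P-user-42-2", 50)
```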
High Sequence Numbers
```
par = <journalName>-SH-<persistenceId>-<(sequenceNr / 100) % sequenceShards> : S : HashKey
num = 0                                                                      : N : RangeKey
seq = <sequenceNr rounded down to nearest multiple of 100>                   : N
```
Low Sequence Numbers
```
par = <journalName>-SL-<persistenceId>-<(sequenceNr / 100) % sequenceShards> : S : HashKey
num = 0                                                                      : N : RangeKey
seq = <sequenceNr, not rounded>                                              : N
```
This is somewhat more difficult to code, but offers higher throughput possibilities. Notice that the items that hold the high and low sequence are sharded, rather than using a single item to store the counter. If we only used a single item, we would suffer from the same hot key problems as our first structure.
When writing an item we typically do not touch the high sequence number storage; this is only done when writing an item with sort key 0. This implies that reading the highest sequence number needs to first query the sequence shards for the highest multiple of 100 and then send a Query for the corresponding P entry's hash key to find the highest stored sort key number.
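Sketched in code, that two-step read looks roughly like this; `readShardMax` and `readPartitionMaxNum` are hypothetical stand-ins for the actual DynamoDB queries:

```scala
// Hedged sketch of the two-step high-sequence-number read described above.
def readHighestSequenceNr(
    journalName: String,
    persistenceId: String,
    sequenceShards: Int,
    readShardMax: String => Long,        // hypothetical: highest `seq` stored in one SH item
    readPartitionMaxNum: String => Long  // hypothetical: highest `num` stored in a P partition
): Long = {
  // Step 1: query the SH shards for the highest multiple of 100.
  val base = (0 until sequenceShards)
    .map(shard => readShardMax(s"$journalName-SH-$persistenceId-$shard"))
    .max
  // Step 2: query the corresponding P partition for the highest stored sort key.
  base + readPartitionMaxNum(s"$journalName-P-$persistenceId-${base / 100}")
}
```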
- Change directory to the directory where you installed the source (you should have a file called `build.sbt` in this directory)
- `sbt compile` compiles the main source for the project's default version of Scala (2.13)
- `sbt +compile` will compile for all supported versions of Scala
- `sbt test` will compile the code and run the unit tests
- `sbt testQuick` is similar to `test` but, when repeated in shell mode, will only run failing tests
- `sbt package` will build the jar; it will appear in the `target` directory
- `sbt publishLocal` will push the jars to your local Apache Ivy repository
- `sbt publishM2` will push the jars to your local Apache Maven repository
- `sbt doc` will build the Javadocs for all the modules and load them to one place (may require Graphviz, see Prerequisites above); the `index.html` file will appear in `target/api/`
- `sbt sourceDistGenerate` will generate a source release to `target/dist/`
- To set the version explicitly when running sbt, use e.g. `sbt "set ThisBuild / version := \"1.0.0\"; sourceDistGenerate"`, or add a file called `version.sbt` to the same directory that has the `build.sbt`, containing something like `ThisBuild / version := "1.0.0"`
There are several ways to interact with the Pekko community:
Apache Pekko is governed by the Apache code of conduct. By participating in this project you agree to abide by its terms.
Apache Pekko DynamoDB Persistence Plugin is available under the Apache License, version 2.0. See the LICENSE file for details.