| = Section 4: Create the Event Sourced Cart entity |
| :page-supergroup-java-scala: Language |
| |
| include::ROOT:partial$include.adoc[] |
| |
| Next, we will create the `ShoppingCart` Cart entity that manages the state for each shopping cart. The architectural overview shows how the entity is related to the Cart service. The Cart entity will use Event Sourcing to persist events that capture changes to the Cart's state. The entity writes events to the event journal, which we will use later to create projections: |
| |
| image::example-entity.svg[Example entity] |
| |
| For now, we'll implement the command to add items to the Cart. In the next part of the tutorial, we will expand it to handle more commands and events. On this page you will learn how to: |
| |
| * implement an Event Sourced entity |
| * unit test the entity |
| * distribute the entities over the nodes in the Akka Cluster |
| * send requests from the gRPC service implementation to the entities |
| |
| ifdef::review[REVIEWERS: re: bullet 3 above, the Initialization section only contains a brief statement about distributing among nodes. I think we should introduce and motivate use in the overview of this page or maybe in the architectural overview to explain that this is one thing that helps us achieve Reactive?] |
| |
| If you are unfamiliar with Event Sourcing, refer to the xref:concepts:event-sourcing.adoc[Event Sourcing] section for an explanation. |
| The {akka-blog}/news/2020/01/07/akka-event-sourcing-video[Event Sourcing with Akka 2.6 video {tab-icon}, window="tab"] is also a good starting point for learning Event Sourcing. |
| |
| This example is using PostgreSQL for storing the events. An alternative is described in xref:how-to:cassandra-alternative.adoc[]. |
| |
| === Akka Workshop |
| |
| The second video of the https://info.lightbend.com/akka-platform-workshop-part-2-on-demand-recording.html[Akka Workshop Series {tab-icon}, window="tab"] covers both the cart entity and Event sourcing. It provides some solid guidance to aid you in digesting this section, and the next section of this guide. |
| |
| == Source downloads |
| |
| If you prefer to simply view and run the example, download a zip file containing the completed code: |
| |
| [.tabset] |
| Java:: |
| + |
| **** |
| * link:_attachments/1-shopping-cart-grpc-java.zip[Source] that includes all previous tutorial steps and allows you to start with the steps on this page. |
| * link:_attachments/2-shopping-cart-event-sourced-java.zip[Source] with the steps on this page completed. |
| **** |
| |
| Scala:: |
| + |
| **** |
| * link:_attachments/1-shopping-cart-grpc-scala.zip[Source] that includes all previous tutorial steps and allows you to start with the steps on this page. |
| * link:_attachments/2-shopping-cart-event-sourced-scala.zip[Source] with the steps on this page completed. |
| **** |
| |
| |
| :sectnums: |
| == Add commands and events |
| |
| Commands are the "external" API of an entity. Entity state can only be changed by commands. The results of commands are emitted as events. A command can request state changes, but different events might be generated depending on the current state of the entity. A command can also be validated and be rejected if it has invalid input or can't be handled by current state of the entity. |
| |
| To add a command and an event, follow these steps: |
| |
| . [.group-scala]#Define a `ShoppingCart` object and the `AddItem` command:# [.group-java]#Define a `ShoppingCart` class extending `EventSourcedBehaviorWithEnforcedReplies` and the `AddItem` command:# |
| + |
| [.tabset] |
| Java:: |
| + |
| .src/main/java/shopping/cart/ShoppingCart.java: |
| [source,java,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java[tags=imports;shoppingCart;commands] |
| ---- |
| |
| Scala:: |
| + |
| .src/main/scala/shopping/cart/ShoppingCart.scala: |
| [source,scala,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala[tags=obj;commands] |
| ---- |
| |
| [start=2] |
| . Add a corresponding `ItemAdded` event: |
| + |
| [.tabset] |
| Java:: |
| + |
| .src/main/java/shopping/cart/ShoppingCart.java |
| [source,java,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java[tags=shoppingCart;events] |
| ---- |
| + |
| <1> `equals` and `hashCode` are not strictly needed, aside from that it can be useful when asserting the result in tests |
| |
| Scala:: |
| + |
| .src/main/scala/shopping/cart/ShoppingCart.scala |
| [source,scala,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala[tags=obj;events] |
| ---- |
| |
| == Add state map |
| |
| Items added to the Cart are added to a `Map`. The contents of the `Map` comprise the Cart's state. Add the `Map` to the `ShoppingCart` [.group-scala]#object# [.group-java]#class# as shown: |
| |
| [.tabset] |
| Java:: |
| + |
| .src/main/java/shopping/cart/ShoppingCart.java |
| [source,java,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java[tags=shoppingCart;state] |
| ---- |
| |
| Scala:: |
| + |
| .src/main/scala/shopping/cart/ShoppingCart.scala |
| [source,scala,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala[tags=obj;state] |
| ---- |
| |
| == Implement a command handler |
| |
| The Cart entity will receive commands that request changes to Cart state. We will implement a command handler to process these commands and emit a reply. Our business logic allows only items to be added which are not in the cart yet and require a positive quantity. |
| |
| |
| [.group-scala]#Implement the Event Sourced entity with the `EventSourcedBehavior`. Define the command handlers:# |
| [.group-java]#Implement the `commandHandler` as required by `EventSourcedBehaviorWithEnforcedReplies`:# |
| |
| [.tabset] |
| Java:: |
| + |
| .src/main/java/shopping/cart/ShoppingCart.java |
| [source,java,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java[tags=shoppingCart;commandHandler] |
| ---- |
| + |
| <1> Matching the `AddItem` command. |
| <2> Persisting the `ItemAdded` event and replying to the sender. |
| |
| Scala:: |
| + |
| .src/main/scala/shopping/cart/ShoppingCart.scala |
| [source,scala,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala[tags=obj;commandHandler] |
| ---- |
| + |
| <1> Matching the `AddItem` command. |
| <2> Persisting the `ItemAdded` event and replying to the sender. |
| |
| If an `AddItem` command is accepted, the `Effect.persist` applies an event to the cart's state and makes sure this event is stored before replying to the command. The returned `ReplyEffect` reacts on the commands by deciding which effect they should have on the entity. If the validation fails we want to send back an error message. The reply can be a success or an error and that is the reason for using the `StatusReply`. |
| |
| |
| See all available effects in the {akka}/typed/persistence.html#effects-and-side-effects[Akka reference documentation {tab-icon}, window="tab"]. |
| |
| == Add the event handler |
| |
| From commands, the entity creates events that represent state changes. Aligning with the command handler above, the entity's event handler reacts to events and updates the state. The events are continuously persisted to the Event Journal datastore, while the entity state is kept in memory. Other parts of the application may listen to the events. In case of a restart, the entity recovers its latest state by replaying the events from the Event Journal. |
| |
| Notice that there are no decisions on events, they are applied without any checking. |
| |
| Add the event handler as follows: |
| |
| [.tabset] |
| Java:: |
| + |
| .src/main/java/shopping/cart/ShoppingCart.java |
| [source,java,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java[tags=shoppingCart;eventHandler] |
| ---- |
| |
| Scala:: |
| + |
| .src/main/scala/shopping/cart/ShoppingCart.scala |
| [source,scala,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala[tags=obj;eventHandler] |
| ---- |
| |
| [#initialization] |
| == Add initialization |
| |
| To glue the command handler, event handler, and state together, we need some initialization code. Our code will distribute the Cart entities over nodes in the Akka Cluster with https://doc.akka.io/docs/akka/current/cluster-sharding.html[Cluster Sharding {tab-icon}, window="tab"], enable snapshots to reduce recovery time when the entity is started, and restart with backoff in the case of failure. |
| |
| [.tabset] |
| Java:: |
| + |
| .src/main/java/shopping/cart/ShoppingCart.java |
| [source,java,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java[tags=shoppingCart;init] |
| ---- |
| + |
| <1> The entities are distributed over the nodes in the Akka Cluster with Cluster Sharding. |
| <2> An `EventSourcedBehavior` is created for the `ShoppingCart`. |
| <3> Snapshotting is an optimization to reduce recovery when the entity is started. |
| <4> Restarting with backoff in case of failures. |
| |
| Scala:: |
| + |
| .src/main/scala/shopping/cart/ShoppingCart.scala |
| [source,scala,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala[tags=obj;init] |
| ---- |
| + |
| <1> The entities are distributed over the nodes in the Akka Cluster with Cluster Sharding. |
| <2> Command and event handler are defined with the `EventSourcedBehavior`. |
| <3> Snapshotting is an optimization to reduce recovery when the entity is started. |
| <4> Restarting with backoff in case of failures. |
| |
| Then we need to call `ShoppingCart.init` from `Main`. Add the following before the gRPC `ShoppingCartServer` initialization: |
| |
| [.tabset] |
| Java:: |
| + |
| .src/main/java/shopping/cart/Main.java: |
| [source,java,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-java/src/main/java/shopping/cart/Main.java[tag=ShoppingCart] |
| ---- |
| |
| Scala:: |
| + |
| .src/main/scala/shopping/cart/Main.scala: |
| [source,scala,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-scala/src/main/scala/shopping/cart/Main.scala[tag=ShoppingCart] |
| ---- |
| |
| Verify that everything compiles with: |
| |
| [.group-scala] |
| [source,shell script] |
| ---- |
| sbt compile |
| ---- |
| |
| [.group-java] |
| [source,shell script] |
| ---- |
| mvn compile |
| ---- |
| |
| == How serialization is included |
| |
| The state, commands and events of the entity must be serializable because they are written to the datastore or sent between nodes within the Akka cluster. The template project includes built-in CBOR serialization. This section describes how serialization is implemented. You do not need to do anything specific to take advantage of CBOR, but this section explains how it is included. |
| |
| The state, commands and events are marked as `CborSerializable` which is configured to use the built-in CBOR serialization. The template project includes this marker interface `CborSerializable`: |
| |
| [.tabset] |
| Java:: |
| + |
| .src/main/java/shopping/cart/CborSerializable.java: |
| [source,java,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-java/src/main/java/shopping/cart/CborSerializable.java[] |
| ---- |
| |
| Scala:: |
| + |
| .src/main/scala/shopping/cart/CborSerializable.scala: |
| [source,scala,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-scala/src/main/scala/shopping/cart/CborSerializable.scala[] |
| ---- |
| |
| The interface is configured in the `serialization.conf` file to enable CBOR serialization. `serialization.conf` is included in `application.conf`. |
| |
| .src/main/resources/serialization.conf |
| [source,hocon] |
| ---- |
| include::example$02-shopping-cart-service-scala/src/main/resources/serialization.conf[] |
| ---- |
| |
| == Unit testing |
| |
| To test the `ShoppingCart` entity we can write a unit test using the [.group-scala]#`EventSourcedBehaviorTestKit`# [.group-java]#`TestKitJunitResource`#. |
| |
| A test for the `AddItem` command looks like this in [.group-scala]#`src/test/scala/shopping/cart/ShoppingCartSpec.scala`# [.group-java]#`src/test/java/shopping/cart/ShoppingCartTest.java`#: |
| |
| [.tabset] |
| Java:: |
| + |
| .src/test/java/shopping/cart/ShoppingCartTest.java: |
| [source,java,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-java/src/test/java/shopping/cart/ShoppingCartTest.java[] |
| ---- |
| |
| Scala:: |
| + |
| .src/test/scala/shopping/cart/ShoppingCartSpec.scala |
| [source,scala,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-scala/src/test/scala/shopping/cart/ShoppingCartSpec.scala[] |
| ---- |
| |
| Run the test with: |
| |
| [.group-scala] |
| [source,shell script] |
| ---- |
| sbt test |
| ---- |
| |
| [.group-java] |
| [source,shell script] |
| ---- |
| mvn test |
| ---- |
| |
| You can learn more about the [.group-scala]#`EventSourcedBehaviorTestKit`# [.group-java]#`TestKitJunitResource`# in the {akka}/typed/persistence-testing.html#unit-testing[Akka reference documentation {tab-icon}, window="tab"] |
| |
| == Send commands to the entities |
| |
| We want to send commands to the entities from the gRPC service implementation. In the xref:grpc-service.adoc[previous step], we wrote a dummy implementation of `addItem` in the `ShoppingCartServiceImpl`. We can now replace that and send `ShoppingCart.AddItem` commands from `ShoppingCartServiceImpl`: |
| |
| [.tabset] |
| Java:: |
| + |
| .src/main/java/shopping/cart/ShoppingCartServiceImpl.java: |
| [source,java,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCartServiceImpl.java[] |
| ---- |
| |
| Scala:: |
| + |
| .src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala: |
| [source,scala,indent=0] |
| ---- |
| include::example$02-shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala[] |
| ---- |
| |
| If the command is successful, the entity will reply with `StatusReply.Success` with the updated `ShoppingCart.Summary`. If the validation in the entity fails it will reply with `StatusReply.Error`, which will fail the [.group-scala]#`Future`# [.group-java]#`CompletionStage`# that is returned from `askWithStatus`. |
| |
| Also, we added an `ActorSystem` parameter to the constructor of `ShoppingCartServiceImpl`. Edit `Main` to include the `system` as the parameter when creating a new instance of the `ShoppingCartServiceImpl`. |
| |
| == Configure Postgres |
| |
| The events are stored in a PostgresSQL database and the template project includes configuration for that in the `src/main/resources/persistence.conf` file. We have to enable this configuration by including `persistence.conf` in `application.conf`: |
| |
| [source,hocon] |
| ---- |
| include::example$02-shopping-cart-service-scala/src/main/resources/application.conf[tag=persistenceInclude] |
| ---- |
| |
| == Run locally |
| |
| To run the service, we first need to start the PostgresSQL to persist the events. Then we can run the service: |
| |
| include::template.adoc[tag=docker] |
| |
| . Run the service with: |
| + |
| [.group-scala] |
| [source,shell script] |
| ---- |
| sbt -Dconfig.resource=local1.conf run |
| ---- |
| + |
| [.group-java] |
| [source,shell script] |
| ---- |
| # make sure to compile before running exec:exec |
| mvn compile exec:exec -DAPP_CONFIG=local1.conf |
| ---- |
| |
| === Exercise the service |
| |
| Use `https://github.com/fullstorydev/grpcurl[grpcurl]` to exercise the service: |
| |
| . Use `grpcurl` to add 3 socks to a cart: |
| + |
| [source,shell script] |
| ---- |
| grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem |
| ---- |
| |
| . Test the validation logic by trying to add the same item again, which should result in an error: |
| + |
| [source,shell script] |
| ---- |
| grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem |
| ---- |
| |
| . To verify that the events are actually saved, and the state can be recovered from the events you can stop the service with `ctrl-c` and then start it again. |
| |
| . Add 2 t-shirts to the same cart: |
| + |
| [source,shell script] |
| ---- |
| grpcurl -d '{"cartId":"cart1", "itemId":"t-shirt", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem |
| ---- |
| + |
| The returned updated cart should still contain the 3 socks. |
| |
| === Exercise with multiple service instances |
| |
| Another fun experiment is to start several instances of the service on different ports (2552, 2553) and then interact with different carts via the different gRPC servers (gRPC ports 8101, 8102, 8103). To do this, you can use the other provided configuration files: |
| |
| . In a new terminal, start a second instance with local configuration #2: |
| + |
| [.group-scala] |
| [source,shell script] |
| ---- |
| sbt -Dconfig.resource=local2.conf run |
| ---- |
| + |
| [.group-java] |
| [source,shell script] |
| ---- |
| # make sure to compile before running exec:exec |
| mvn compile exec:exec -DAPP_CONFIG=local2.conf |
| ---- |
| |
| . Within another terminal, start a third instance with local configuration #3: |
| + |
| [.group-scala] |
| [source,shell script] |
| ---- |
| sbt -Dconfig.resource=local3.conf run |
| ---- |
| + |
| [.group-java] |
| [source,shell script] |
| ---- |
| # make sure to compile before running exec:exec |
| mvn compile exec:exec -DAPP_CONFIG=local3.conf |
| ---- |
| |
| === Stop the service |
| |
| When finished, stop the service with `ctrl-c`. Leave the PostgresSQL running for the next set of steps, or stop it with: |
| |
| [source,shell script] |
| ---- |
| docker-compose down |
| ---- |
| NOTE: The following steps for cloud deployment are optional. If you are only running locally, you can skip to the next section of the tutorial. |
| [#kubernetes] |
| == Run in Kubernetes |
| |
| Before following the steps below, create Kubernetes cluster and install the Akka Operator. Used the instructions below for: |
| |
| * xref:deployment:aws-install.adoc[] |
| * xref:deployment:gcp-install.adoc[] |
| |
| === Create PostgreSQL database |
| |
| Follow the xref:deployment:jdbc.adoc[JDBC] instructions to setup a PostgresSQL database in GCP or AWS and create a secret to access it. |
| |
| Create the PostgresSQL tables using the `ddl-scripts/create_tables.sql` SQL script located at the root of the project. Follow the instructions in xref:deployment:jdbc.adoc[JDBC integration] to connect to your database instance and load the script. |
| |
| [source,shell script] |
| ---- |
| kubectl run -i rds-mgmt --image=postgres \ |
| --restart=Never --rm --env "PGPASSWORD=<password>" -- \ |
| psql -h <db hostname> -U postgres -t < ddl-scripts/create_tables.sql |
| ---- |
| |
| [NOTE] |
| ==== |
| If you created your database with the xref:deployment:aws-install-quickstart.adoc[] then you can reference the postgres username, password, and hostname using the `pulumi output` command (you must be in the Pulumi working directory for this to work), and reference the ddl script with an absolute path. |
| See the xref:deployment:aws-install-quickstart.adoc#connect-aurora-database[Connect to the Aurora RDS database {tab-icon}, window="tab"] section of the quick start for an example. |
| ==== |
| |
| === Build Docker image |
| |
| Create a Docker repository and authenticate Docker. |
| |
| [.tabset] |
| GCP:: |
| + |
| Follow the instructions in https://cloud.google.com/container-registry/docs/using-with-google-cloud-platform[Using Container Registry with Google Cloud {tab-icon}, window="tab"] to deploy Docker images on GCP's container registry. |
| |
| AWS:: |
| + |
| Follow the instructions in xref:deployment:aws-ecr.adoc[Amazon Elastic Container Registry] to deploy Docker images on AWS's container registry. |
| |
| === Additional steps for Docker and AWS |
| |
| If you are using AWS, you will also need to complete the following procedures. |
| |
| Rebuild the Docker image. |
| |
| include::partial$build-docker-for-kube.adoc[] |
| |
| === Update the deployment descriptor |
| |
| Update the `kubernetes/shopping-cart-service-cr.yml` deployment descriptor with the new image tag and the JDBC credential secret. |
| |
| .kubernetes/shopping-cart-service-cr.yml: |
| [source,yaml] |
| ---- |
| include::example$02-shopping-cart-service-scala/kubernetes/shopping-cart-service-cr.yml[] |
| ---- |
| |
| <1> Replace `<docker-registry>` with your docker registry address and update the image reference with the image tag from the output of the Docker build above, for example: `803424716218.dkr.ecr.eu-central-1.amazonaws.com/shopping-cart-service:20201209-135004-363ae2b`. |
| <2> Add a `jdbc.credentialsSecret` section pointing to the secret created in the xref:deployment:jdbc.adoc[JDBC] instructions. |
| |
| |
| === Apply to Kubernetes |
| |
| Re-apply the `shopping-cart-service-cr.yml` to Kubernetes: |
| |
| [source,shell script] |
| ---- |
| kubectl apply -f kubernetes/shopping-cart-service-cr.yml |
| ---- |
| |
| You can see progress by viewing the status: |
| |
| [source,shell script] |
| ---- |
| kubectl get akkamicroservices/shopping-cart-service |
| ---- |
| |
| See xref:deployment:troubleshooting.adoc[troubleshooting deployment status] for more details. |
| |
| === Exercise the service in Kubernetes |
| |
| include::partial$prepare-to-exercise-in-kube.adoc[] |
| |
| Use `https://github.com/fullstorydev/grpcurl[grpcurl]` to exercise the service: |
| |
| . Use `grpcurl` to add 3 socks to a cart: |
| + |
| [source,shell script] |
| ---- |
| grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem |
| ---- |
| |
| . Test the validation logic by trying to add the same item again, which should result in an error: |
| + |
| [source,shell script] |
| ---- |
| grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem |
| ---- |
| |
| . Add 2 t-shirts to the same cart: |
| + |
| [source,shell script] |
| ---- |
| grpcurl -d '{"cartId":"cart1", "itemId":"t-shirt", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem |
| ---- |
| + |
| The returned updated cart should still contain the 3 socks. |
| |
| Note the logging from the `ShoppingCartServiceImpl` in the console that is running `kubectl logs -f`. |
| |
| :!sectnums: |
| == Learn more |
| |
| * xref:concepts:event-sourcing.adoc[Event Sourcing concepts]. |
| * {akka}/typed/persistence.html[Akka Event Sourcing reference documentation {tab-icon}, window="tab"]. |
| * {akka}/typed/cluster-sharding.html[Akka Cluster Sharding reference documentation {tab-icon}, window="tab"]. |
| * xref:how-to:cassandra-alternative.adoc[] |