// blob: b7df92eb3ec11c279a0a4a0c51f8ffc3aaf80f85 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.jdbc;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.persistence.cassandra.query.javadsl.CassandraReadJournal;
import org.apache.pekko.persistence.query.Offset;
import org.apache.pekko.projection.Projection;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.eventsourced.EventEnvelope;
import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.jdbc.javadsl.JdbcHandler;
import org.apache.pekko.projection.jdbc.javadsl.JdbcProjection;
import jdocs.eventsourced.ShoppingCart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.persistence.EntityManager;
// #jdbc-session-imports
import org.apache.pekko.projection.jdbc.JdbcSession;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Connection;
// #jdbc-session-imports
import java.time.Duration;
import java.time.Instant;
import java.util.List;
@SuppressWarnings({"unused", "InnerClassMayBeStatic"})
class JdbcProjectionDocExample {
// #todo
// TODO
// #todo
// #repository
/** Immutable value describing an order: its identifier and the instant associated with it. */
class Order {
  /** Identifier of the order. */
  public final String id;

  /** Instant associated with the order. */
  public final Instant time;

  /**
   * Creates an order from the given values; both are stored as-is without validation.
   *
   * @param id identifier of the order
   * @param time instant associated with the order
   */
  public Order(String id, Instant time) {
    this.id = id;
    this.time = time;
  }
}
/** Persistence abstraction for {@code Order}s that works on a caller-supplied EntityManager. */
interface OrderRepository {
/**
* Saves the given order using the supplied entity manager.
*
* @param entityManager entity manager to perform the save with
* @param order the order to save
*/
void save(EntityManager entityManager, Order order);
}
// #repository
// Placeholder OrderRepository referenced by the example handlers in this file; its save() has an
// empty body. It is written as an anonymous class, with the Convert2Lambda inspection suppressed.
@SuppressWarnings("Convert2Lambda")
public OrderRepository orderRepository =
new OrderRepository() {
@Override
public void save(EntityManager entityManager, Order order) {}
};
// #jdbc-session
/**
 * {@code JdbcSession} backed by a single plain JDBC {@link Connection} to the H2 database at
 * {@code jdbc:h2:mem:test}, with auto-commit switched off so that commit and rollback are
 * driven explicitly through this session.
 */
class PlainJdbcSession implements JdbcSession {
  private final Connection connection;

  /**
   * Loads the H2 driver, opens the connection and disables auto-commit.
   *
   * @throws RuntimeException wrapping the {@link ClassNotFoundException} or {@link SQLException}
   *     if the driver cannot be loaded or the connection cannot be set up
   */
  public PlainJdbcSession() {
    Connection opened = null;
    try {
      Class.forName("org.h2.Driver");
      opened = DriverManager.getConnection("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1");
      opened.setAutoCommit(false);
    } catch (ClassNotFoundException | SQLException e) {
      // If the connection was opened but could not be configured, close it so it does not leak.
      if (opened != null) {
        try {
          opened.close();
        } catch (SQLException closeFailure) {
          e.addSuppressed(closeFailure);
        }
      }
      throw new RuntimeException(e);
    }
    this.connection = opened;
  }

  /**
   * Applies {@code func} to the underlying connection and returns its result.
   *
   * @param func function to run against the connection
   * @return whatever {@code func} returns
   * @throws Exception if {@code func} throws
   */
  @Override
  public <Result> Result withConnection(Function<Connection, Result> func) throws Exception {
    return func.apply(connection);
  }

  /** Commits the current transaction on the underlying connection. */
  @Override
  public void commit() throws SQLException {
    connection.commit();
  }

  /** Rolls back the current transaction on the underlying connection. */
  @Override
  public void rollback() throws SQLException {
    connection.rollback();
  }

  /** Closes the underlying connection. */
  @Override
  public void close() throws SQLException {
    connection.close();
  }
}
// #jdbc-session
// #handler
/**
 * Handler that processes one shopping cart event envelope at a time. A {@code CheckedOut} event
 * is logged and stored as an {@code Order} through {@code orderRepository}; any other event is
 * only logged at debug level.
 */
public class ShoppingCartHandler
    extends JdbcHandler<EventEnvelope<ShoppingCart.Event>, HibernateJdbcSession> {
  private final Logger logger = LoggerFactory.getLogger(getClass());

  /**
   * Handles a single envelope.
   *
   * @param session session whose {@code entityManager} is handed to the repository
   * @param envelope envelope carrying the shopping cart event
   */
  @Override
  public void process(HibernateJdbcSession session, EventEnvelope<ShoppingCart.Event> envelope)
      throws Exception {
    final ShoppingCart.Event cartEvent = envelope.event();

    if (!(cartEvent instanceof ShoppingCart.CheckedOut)) {
      logger.debug("Shopping cart {} changed by {}", cartEvent.getCartId(), cartEvent);
      return;
    }

    final ShoppingCart.CheckedOut checkout = (ShoppingCart.CheckedOut) cartEvent;
    logger.info("Shopping cart {} was checked out at {}", checkout.cartId, checkout.eventTime);

    // pass the EntityManager created by the projection
    // to the repository in order to use the same transaction
    final Order order = new Order(checkout.cartId, checkout.eventTime);
    orderRepository.save(session.entityManager, order);
  }
}
// #handler
// #grouped-handler
/**
 * Handler that receives shopping cart event envelopes as a list and processes them in list
 * order. Each {@code CheckedOut} event is logged and stored as an {@code Order} through
 * {@code orderRepository}; any other event is only logged at debug level.
 */
public class GroupedShoppingCartHandler
    extends JdbcHandler<List<EventEnvelope<ShoppingCart.Event>>, HibernateJdbcSession> {
  private final Logger logger = LoggerFactory.getLogger(getClass());

  /**
   * Handles every envelope of the group with the same session.
   *
   * @param session session whose {@code entityManager} is handed to the repository
   * @param envelopes the grouped envelopes to handle
   */
  @Override
  public void process(
      HibernateJdbcSession session, List<EventEnvelope<ShoppingCart.Event>> envelopes)
      throws Exception {
    for (EventEnvelope<ShoppingCart.Event> current : envelopes) {
      handleEvent(session, current.event());
    }
  }

  /** Logs one event and, when it is a checkout, saves the corresponding order. */
  private void handleEvent(HibernateJdbcSession session, ShoppingCart.Event cartEvent) {
    if (!(cartEvent instanceof ShoppingCart.CheckedOut)) {
      logger.debug("Shopping cart {} changed by {}", cartEvent.getCartId(), cartEvent);
      return;
    }

    final ShoppingCart.CheckedOut checkout = (ShoppingCart.CheckedOut) cartEvent;
    logger.info("Shopping cart {} was checked out at {}", checkout.cartId, checkout.eventTime);

    // pass the EntityManager created by the projection
    // to the repository in order to use the same transaction
    final Order order = new Order(checkout.cartId, checkout.eventTime);
    orderRepository.save(session.entityManager, order);
  }
}
// #grouped-handler
// Typed actor system named "Example" with an empty guardian behavior; it is passed to the source
// provider and to every projection built in this class.
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
// #sourceProvider
// Provides ShoppingCart event envelopes selected by the tag "carts-1", using the Cassandra read
// journal identifier as the read journal plugin id.
SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), "carts-1");
// #sourceProvider
{
  // #exactlyOnce
  final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
  final ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");

  // A new session and a new handler are obtained through the two method references.
  Projection<EventEnvelope<ShoppingCart.Event>> projection =
      JdbcProjection.exactlyOnce(
          projectionId,
          sourceProvider,
          sessionProvider::newInstance,
          ShoppingCartHandler::new,
          system);
  // #exactlyOnce
}
{
  // #atLeastOnce
  final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();

  // Thresholds passed to withSaveOffset: a number of envelopes and a duration.
  int saveOffsetAfterEnvelopes = 100;
  Duration saveOffsetAfterDuration = Duration.ofMillis(500);
  final ProjectionId projectionId = ProjectionId.of("shopping-carts", "carts-1");

  Projection<EventEnvelope<ShoppingCart.Event>> projection =
      JdbcProjection.atLeastOnce(
              projectionId,
              sourceProvider,
              sessionProvider::newInstance,
              ShoppingCartHandler::new,
              system)
          .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
  // #atLeastOnce
}
{
  // #grouped
  final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();

  // Grouping thresholds handed to withGroup: a number of envelopes and a duration.
  // These control how envelopes are grouped for GroupedShoppingCartHandler, so they are
  // named after grouping rather than after offset saving.
  int groupAfterEnvelopes = 100;
  Duration groupAfterDuration = Duration.ofMillis(500);

  Projection<EventEnvelope<ShoppingCart.Event>> projection =
      JdbcProjection.groupedWithin(
              ProjectionId.of("shopping-carts", "carts-1"),
              sourceProvider,
              sessionProvider::newInstance,
              GroupedShoppingCartHandler::new,
              system)
          .withGroup(groupAfterEnvelopes, groupAfterDuration);
  // #grouped
}
}