/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2020-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.projection.MergeableOffset;
import org.apache.pekko.projection.Projection;
import org.apache.pekko.projection.ProjectionId;
import org.apache.pekko.projection.javadsl.ExactlyOnceProjection;
import org.apache.pekko.projection.javadsl.Handler;
import org.apache.pekko.projection.javadsl.SourceProvider;
import org.apache.pekko.projection.jdbc.javadsl.JdbcHandler;
import org.apache.pekko.projection.jdbc.javadsl.JdbcProjection;
import org.apache.pekko.projection.kafka.javadsl.KafkaSourceProvider;
import org.apache.pekko.stream.javadsl.Source;
import jdocs.jdbc.HibernateSessionFactory;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// #imports
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
// #imports
// #imports-producer
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pekko.kafka.ProducerSettings;
// #imports-producer
// #sendProducer
import org.apache.pekko.kafka.javadsl.SendProducer;
// #sendProducer
// #producerFlow
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pekko.kafka.ProducerMessage;
import org.apache.pekko.kafka.javadsl.Producer;
import org.apache.pekko.stream.javadsl.FlowWithContext;
import org.apache.pekko.projection.ProjectionContext;
// #producerFlow
import jdocs.jdbc.HibernateJdbcSession;
import javax.persistence.EntityManager;

public interface KafkaDocExample {
// #todo
// TODO
// #todo
// #handler
public class WordCountHandler
extends JdbcHandler<ConsumerRecord<String, String>, HibernateJdbcSession> {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final ProjectionId projectionId;
private final Map<String, Integer> state = new HashMap<>();
public WordCountHandler(ProjectionId projectionId) {
this.projectionId = projectionId;
}
@Override
public void process(HibernateJdbcSession session, ConsumerRecord<String, String> envelope) {
String word = envelope.value();
int newCount = state.getOrDefault(word, 0) + 1;
logger.info(
"{} consumed from topic/partition {}/{}. Word count for [{}] is {}",
projectionId,
envelope.topic(),
envelope.partition(),
word,
newCount);
state.put(word, newCount);
}
}
// #handler
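
  // Note: WordCountHandler keeps its counts in memory only, so the state is lost
  // whenever the projection is restarted or rebalanced to another node; it
  // illustrates the handler API rather than a production-ready handler.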
// #wordSource
public class WordEnvelope {
public final Long offset;
public final String word;
public WordEnvelope(Long offset, String word) {
this.offset = offset;
this.word = word;
}
}
class WordSource extends SourceProvider<Long, WordEnvelope> {
private final Source<WordEnvelope, NotUsed> src =
Source.from(
Arrays.asList(
new WordEnvelope(1L, "abc"),
new WordEnvelope(2L, "def"),
new WordEnvelope(3L, "ghi"),
new WordEnvelope(4L, "abc")));
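    // Resume after the last stored offset by dropping everything that was
    // already processed; the throttle just slows the example down so the log
    // output stays readable.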
@Override
public CompletionStage<Source<WordEnvelope, NotUsed>> source(
Supplier<CompletionStage<Optional<Long>>> offset) {
return offset
.get()
.thenApply(
o -> {
if (o.isPresent())
return src.dropWhile(envelope -> envelope.offset <= o.get())
.throttle(1, Duration.ofSeconds(1));
else return src.throttle(1, Duration.ofSeconds(1));
});
}
@Override
public Long extractOffset(WordEnvelope envelope) {
return envelope.offset;
}
@Override
public long extractCreationTime(WordEnvelope envelope) {
return 0L;
}
}
// #wordSource
// #wordPublisher
class WordPublisher extends Handler<WordEnvelope> {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final String topic;
private final SendProducer<String, String> sendProducer;
public WordPublisher(String topic, SendProducer<String, String> sendProducer) {
this.topic = topic;
this.sendProducer = sendProducer;
}
@Override
public CompletionStage<Done> process(WordEnvelope envelope) {
String word = envelope.word;
      // Use the word as the key; the `DefaultPartitioner` selects the partition
      // from the key, so the same word always ends up in the same partition.
String key = word;
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, word);
      CompletionStage<RecordMetadata> result = sendProducer.send(producerRecord);
      return result.thenApply(
          recordMetadata -> {
            logger.info(
                "Published word [{}] to topic/partition {}/{}",
                word,
                topic,
                recordMetadata.partition());
            return Done.getInstance();
          });
}
}
// #wordPublisher
static SourceProvider<MergeableOffset<Long>, ConsumerRecord<String, String>>
illustrateSourceProvider() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
// #sourceProvider
String bootstrapServers = "localhost:9092";
String groupId = "group-wordcount";
String topicName = "words";
ConsumerSettings<String, String> consumerSettings =
ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
.withBootstrapServers(bootstrapServers)
.withGroupId(groupId)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
SourceProvider<MergeableOffset<Long>, ConsumerRecord<String, String>> sourceProvider =
KafkaSourceProvider.create(system, consumerSettings, Collections.singleton(topicName));
// #sourceProvider
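    // The KafkaSourceProvider tracks a MergeableOffset: one offset per topic
    // partition, merged into the projection's offset store when offsets are saved.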
return sourceProvider;
}
static void illustrateExactlyOnce() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
SourceProvider<MergeableOffset<Long>, ConsumerRecord<String, String>> sourceProvider =
illustrateSourceProvider();
    WordRepository wordRepository = null; // a real application would provide an implementation
// #exactlyOnce
final HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
ProjectionId projectionId = ProjectionId.of("WordCount", "wordcount-1");
ExactlyOnceProjection<MergeableOffset<Long>, ConsumerRecord<String, String>> projection =
JdbcProjection.exactlyOnce(
projectionId,
sourceProvider,
sessionProvider::newInstance,
() -> new WordCountJdbcHandler(wordRepository),
system);
// #exactlyOnce
}
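
  // A minimal sketch (not one of the documented snippets) of how the projection
  // could be run: wrap it in a ProjectionBehavior and spawn it as an actor.
  // Fully qualified names are used so the snippet-marked import sections above
  // stay untouched; the actor name is an arbitrary choice.
  static void illustrateRunningProjection(
      ActorSystem<Void> system,
      ExactlyOnceProjection<MergeableOffset<Long>, ConsumerRecord<String, String>> projection) {
    // ProjectionBehavior runs the projection and stops it gracefully when it
    // receives ProjectionBehavior.stopMessage() or when the actor is stopped.
    system.systemActorOf(
        org.apache.pekko.projection.ProjectionBehavior.create(projection),
        "wordCountProjection",
        org.apache.pekko.actor.typed.Props.empty());
  }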
// #exactly-once-jdbc-handler
public class WordCountJdbcHandler
extends JdbcHandler<ConsumerRecord<String, String>, HibernateJdbcSession> {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final WordRepository wordRepository;
public WordCountJdbcHandler(WordRepository wordRepository) {
this.wordRepository = wordRepository;
}
@Override
public void process(HibernateJdbcSession session, ConsumerRecord<String, String> envelope)
throws Exception {
String word = envelope.value();
wordRepository.increment(session.entityManager, word);
}
}
// #exactly-once-jdbc-handler
// #repository
interface WordRepository {
void increment(EntityManager entityManager, String word);
}
// #repository
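
  // A minimal sketch of a repository implementation, assuming a `word_count`
  // table with columns `word` and `count`; the class name, the table, and the
  // PostgreSQL-style upsert are illustrative assumptions, not part of the
  // Pekko examples.
  public class ExampleWordRepository implements WordRepository {
    @Override
    public void increment(EntityManager entityManager, String word) {
      // Runs in the same transaction as the offset storage, which is what
      // gives the exactly-once projection its semantics.
      entityManager
          .createNativeQuery(
              "INSERT INTO word_count (word, count) VALUES (?1, 1) "
                  + "ON CONFLICT (word) DO UPDATE SET count = word_count.count + 1")
          .setParameter(1, word)
          .executeUpdate();
    }
  }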
  static void illustrateSendingToKafka() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
// #sendProducer
String bootstrapServers = "localhost:9092";
String topicName = "words";
ProducerSettings<String, String> producerSettings =
ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
.withBootstrapServers(bootstrapServers);
    // FIXME classicSystem might not be needed in a later Pekko Connectors Kafka version
SendProducer<String, String> sendProducer =
new SendProducer<>(producerSettings, system.classicSystem());
// #sendProducer
// #sendToKafkaProjection
WordSource sourceProvider = new WordSource();
HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
ProjectionId projectionId = ProjectionId.of("PublishWords", "words");
Projection<WordEnvelope> projection =
JdbcProjection.atLeastOnceAsync(
projectionId,
sourceProvider,
sessionProvider::newInstance,
() -> new WordPublisher(topicName, sendProducer),
system);
// #sendToKafkaProjection
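    // With atLeastOnceAsync the offset is stored after the handler has completed,
    // so after a failure the same word may be published to Kafka again; downstream
    // consumers should therefore tolerate duplicates.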
}
  static void illustrateSendingToKafkaFlow() {
ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
// #producerFlow
String bootstrapServers = "localhost:9092";
String topicName = "words";
ProducerSettings<String, String> producerSettings =
ProducerSettings.create(system, new StringSerializer(), new StringSerializer())
.withBootstrapServers(bootstrapServers);
FlowWithContext<WordEnvelope, ProjectionContext, Done, ProjectionContext, NotUsed>
producerFlow =
FlowWithContext.<WordEnvelope, ProjectionContext>create()
.map(
wordEnv ->
ProducerMessage.single(
new ProducerRecord<String, String>(
topicName, wordEnv.word, wordEnv.word)))
.via(Producer.flowWithContext(producerSettings))
.map(__ -> Done.getInstance());
// #producerFlow
// #sendToKafkaProjectionFlow
WordSource sourceProvider = new WordSource();
HibernateSessionFactory sessionProvider = new HibernateSessionFactory();
ProjectionId projectionId = ProjectionId.of("PublishWords", "words");
Projection<WordEnvelope> projection =
JdbcProjection.atLeastOnceFlow(
projectionId, sourceProvider, sessionProvider::newInstance, producerFlow, system);
// #sendToKafkaProjectionFlow
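    // With atLeastOnceFlow the producer flow must emit exactly one element per
    // envelope and must not filter or reorder envelopes, otherwise offsets could
    // be stored for envelopes that were never actually published.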
}
}