/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kafka;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO.WriteRecords;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v20_0.com.google.common.cache.RemovalCause;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Exactly-once sink transform for Kafka. See {@link KafkaIO} for user-visible documentation and
* example usage.
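*
* <p>A minimal usage sketch, assuming an existing {@code PCollection<ProducerRecord<byte[], byte[]>> records}
* (see {@link KafkaIO} for the authoritative example):
*
* <pre>{@code
* records.apply(KafkaIO.<byte[], byte[]>writeRecords()
*     .withBootstrapServers("broker_1:9092,broker_2:9092")
*     .withTopic("results")
*     .withKeySerializer(ByteArraySerializer.class)
*     .withValueSerializer(ByteArraySerializer.class)
*     .withEOS(20, "eos-sink-group-id"));
* }</pre>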
*/
class KafkaExactlyOnceSink<K, V>
extends PTransform<PCollection<ProducerRecord<K, V>>, PCollection<Void>> {
// Dataflow ensures at-least-once processing for side effects like sinks. In order to provide
// exactly-once semantics, a sink needs to be idempotent or it should avoid writing records
// that have already been written. This sink does the latter. All the records are ordered
// across a fixed number of shards, and records in each shard are written in order. It drops
// any records that are already written and buffers those arriving out of order.
//
// The exactly-once sink involves two shuffles of the records:
// A : Assign a shard ---> B : Assign sequential ID ---> C : Write to Kafka in order
//
// Processing guarantees also require deterministic processing within user transforms.
// Here, that requires that the order of the records committed to Kafka by C is not affected by
// restarts in C and its upstream stages.
//
// A : Assigns a random shard to each message. Note that there are no ordering guarantees for
// writing user records to Kafka. Users can still control partitioning among topic
// partitions as with the regular sink (of course, there are no ordering guarantees in
// the regular Kafka sink either).
// B : Assigns an id sequentially to each message within a shard.
// C : Writes each shard to Kafka in sequential id order. In Dataflow, when C sees a record
// and id, it implies that the record and the associated id are checkpointed to persistent
// storage and this record will always have the same id, even across retries.
// Exactly-once semantics are achieved by writing records in the strict order of
// these checkpointed sequence ids.
//
// Parallelism for B and C is fixed to 'numShards', which defaults to the number of partitions
// for the topic. A few reasons for that:
// - B & C implement their functionality using per-key state. The shard id makes it independent
// of the cardinality of the user key.
// - We create one producer per shard, and its 'transactional id' is based on the shard id. This
// requires the number of shards to be finite. This also helps with batching, and avoids
// initializing producers and transactions.
// - Most importantly, each of the sharded writers stores the 'next message id' in partition
// metadata, which is committed atomically with Kafka transactions. This is critical
// for handling retries of C correctly. Initial testing showed the number of shards could be
// larger than the number of partitions for the topic.
//
// The number of shards can change across multiple runs of a pipeline (e.g. a job upgrade in Dataflow).
//
private static final Logger LOG = LoggerFactory.getLogger(KafkaExactlyOnceSink.class);
private static final String METRIC_NAMESPACE = "KafkaExactlyOnceSink";
private final WriteRecords<K, V> spec;
static void ensureEOSSupport() {
checkArgument(
ProducerSpEL.supportsTransactions(),
"%s %s",
"This version of Kafka client does not support transactions required to support",
"exactly-once semantics. Please use Kafka client version 0.11 or newer.");
}
KafkaExactlyOnceSink(WriteRecords<K, V> spec) {
this.spec = spec;
}
@Override
public PCollection<Void> expand(PCollection<ProducerRecord<K, V>> input) {
int numShards = spec.getNumShards();
if (numShards <= 0) {
try (Consumer<?, ?> consumer = openConsumer(spec)) {
numShards = consumer.partitionsFor(spec.getTopic()).size();
LOG.info(
"Using {} shards for exactly-once writer, matching number of partitions "
+ "for topic '{}'",
numShards,
spec.getTopic());
}
}
checkState(numShards > 0, "Could not set number of shards");
return input
.apply(
Window.<ProducerRecord<K, V>>into(new GlobalWindows()) // Everything into global window.
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes())
.apply(
String.format("Shuffle across %d shards", numShards),
ParDo.of(new Reshard<>(numShards)))
.apply("Persist sharding", GroupByKey.create())
.apply("Assign sequential ids", ParDo.of(new Sequencer<>()))
.apply("Persist ids", GroupByKey.create())
.apply(
String.format("Write to Kafka topic '%s'", spec.getTopic()),
ParDo.of(new ExactlyOnceWriter<>(spec, input.getCoder())));
}
/**
* Shuffles messages across shards, assigning each message to a shard in round-robin order
* starting from a random shard.
*/
private static class Reshard<K, V>
extends DoFn<ProducerRecord<K, V>, KV<Integer, TimestampedValue<ProducerRecord<K, V>>>> {
private final int numShards;
private transient int shardId;
Reshard(int numShards) {
this.numShards = numShards;
}
@Setup
public void setup() {
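// Start the round-robin assignment at a random shard so that parallel instances of this DoFn
// do not all start writing to shard 0.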
shardId = ThreadLocalRandom.current().nextInt(numShards);
}
@ProcessElement
public void processElement(ProcessContext ctx) {
shardId = (shardId + 1) % numShards; // round-robin among shards.
ctx.output(KV.of(shardId, TimestampedValue.of(ctx.element(), ctx.timestamp())));
}
}
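/**
* Assigns a sequential id to each record within its shard, keeping track of the next id to
* assign in per-shard state.
*/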
private static class Sequencer<K, V>
extends DoFn<
KV<Integer, Iterable<TimestampedValue<ProducerRecord<K, V>>>>,
KV<Integer, KV<Long, TimestampedValue<ProducerRecord<K, V>>>>> {
private static final String NEXT_ID = "nextId";
@StateId(NEXT_ID)
private final StateSpec<ValueState<Long>> nextIdSpec = StateSpecs.value();
@ProcessElement
public void processElement(@StateId(NEXT_ID) ValueState<Long> nextIdState, ProcessContext ctx) {
long nextId = MoreObjects.firstNonNull(nextIdState.read(), 0L);
int shard = ctx.element().getKey();
for (TimestampedValue<ProducerRecord<K, V>> value : ctx.element().getValue()) {
ctx.output(KV.of(shard, KV.of(nextId, value)));
nextId++;
}
nextIdState.write(nextId);
}
}
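/**
* Writes the records of a shard to Kafka in sequence-id order using a transactional producer.
* Records already committed by an earlier attempt are dropped, and records arriving out of
* order are buffered in state until their turn.
*/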
private static class ExactlyOnceWriter<K, V>
extends DoFn<KV<Integer, Iterable<KV<Long, TimestampedValue<ProducerRecord<K, V>>>>>, Void> {
private static final String NEXT_ID = "nextId";
private static final String MIN_BUFFERED_ID = "minBufferedId";
private static final String OUT_OF_ORDER_BUFFER = "outOfOrderBuffer";
private static final String WRITER_ID = "writerId";
// Not sure of a good limit. This applies only for large bundles.
private static final int MAX_RECORDS_PER_TXN = 1000;
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
@StateId(NEXT_ID)
private final StateSpec<ValueState<Long>> sequenceIdSpec = StateSpecs.value();
@StateId(MIN_BUFFERED_ID)
private final StateSpec<ValueState<Long>> minBufferedIdSpec = StateSpecs.value();
@StateId(OUT_OF_ORDER_BUFFER)
private final StateSpec<BagState<KV<Long, TimestampedValue<ProducerRecord<K, V>>>>>
outOfOrderBufferSpec;
// A random id assigned to each shard. Helps with detecting when multiple jobs are mistakenly
// started with the same groupId used for storing state on the Kafka side, including the case
// where a job is restarted with the same groupId, but the metadata from the previous run was
// not cleared. Better to be safe and error out with a clear message.
@StateId(WRITER_ID)
private final StateSpec<ValueState<String>> writerIdSpec = StateSpecs.value();
private final WriteRecords<K, V> spec;
// Metrics
private final Counter elementsWritten = SinkMetrics.elementsWritten();
// Elements buffered due to out of order arrivals.
private final Counter elementsBuffered = Metrics.counter(METRIC_NAMESPACE, "elementsBuffered");
private final Counter numTransactions = Metrics.counter(METRIC_NAMESPACE, "numTransactions");
ExactlyOnceWriter(WriteRecords<K, V> spec, Coder<ProducerRecord<K, V>> elemCoder) {
this.spec = spec;
this.outOfOrderBufferSpec =
StateSpecs.bag(KvCoder.of(BigEndianLongCoder.of(), TimestampedValueCoder.of(elemCoder)));
}
@Setup
public void setup() {
// This is on the worker. Ensure the runtime version is still compatible.
KafkaExactlyOnceSink.ensureEOSSupport();
}
// Futures ignored as exceptions will be flushed out in the commitTxn
@SuppressWarnings("FutureReturnValueIgnored")
@RequiresStableInput
@ProcessElement
public void processElement(
@StateId(NEXT_ID) ValueState<Long> nextIdState,
@StateId(MIN_BUFFERED_ID) ValueState<Long> minBufferedIdState,
@StateId(OUT_OF_ORDER_BUFFER)
BagState<KV<Long, TimestampedValue<ProducerRecord<K, V>>>> oooBufferState,
@StateId(WRITER_ID) ValueState<String> writerIdState,
ProcessContext ctx)
throws IOException {
int shard = ctx.element().getKey();
minBufferedIdState.readLater();
long nextId = MoreObjects.firstNonNull(nextIdState.read(), 0L);
long minBufferedId = MoreObjects.firstNonNull(minBufferedIdState.read(), Long.MAX_VALUE);
ShardWriterCache<K, V> cache =
(ShardWriterCache<K, V>) CACHE_BY_GROUP_ID.getUnchecked(spec.getSinkGroupId());
ShardWriter<K, V> writer = cache.removeIfPresent(shard);
if (writer == null) {
writer = initShardWriter(shard, writerIdState, nextId);
}
long committedId = writer.committedId;
if (committedId >= nextId) {
// This is a retry of an already committed batch.
LOG.info(
"{}: committed id {} is ahead of expected {}. {} records will be dropped "
+ "(these are already written).",
shard,
committedId,
nextId - 1,
committedId - nextId + 1);
nextId = committedId + 1;
}
try {
writer.beginTxn();
int txnSize = 0;
// Iterate in recordId order. The input iterator could be mostly sorted.
// There might be out of order messages buffered in earlier iterations. These
// will get merged if and when minBufferedId matches nextId.
Iterator<KV<Long, TimestampedValue<ProducerRecord<K, V>>>> iter =
ctx.element().getValue().iterator();
while (iter.hasNext()) {
KV<Long, TimestampedValue<ProducerRecord<K, V>>> kv = iter.next();
long recordId = kv.getKey();
if (recordId < nextId) {
LOG.info(
"{}: dropping older record {}. Already committed till {}",
shard,
recordId,
committedId);
continue;
}
if (recordId > nextId) {
// Out of order delivery. Should be pretty rare (what about in a batch pipeline?)
LOG.info(
"{}: Saving out of order record {}, next record id to be written is {}",
shard,
recordId,
nextId);
// checkState(recordId - nextId < 10000, "records are way out of order");
oooBufferState.add(kv);
minBufferedId = Math.min(minBufferedId, recordId);
minBufferedIdState.write(minBufferedId);
elementsBuffered.inc();
continue;
}
// recordId and nextId match. Finally write the record.
writer.sendRecord(kv.getValue(), elementsWritten);
nextId++;
if (++txnSize >= MAX_RECORDS_PER_TXN) {
writer.commitTxn(recordId, numTransactions);
txnSize = 0;
writer.beginTxn();
}
if (minBufferedId == nextId) {
// One or more of the buffered records can be committed now.
// Read all of them in to memory and sort them. Reading into memory
// might be problematic in extreme cases. Might need to improve it in future.
List<KV<Long, TimestampedValue<ProducerRecord<K, V>>>> buffered =
Lists.newArrayList(oooBufferState.read());
buffered.sort(new KV.OrderByKey<>());
LOG.info(
"{} : merging {} buffered records (min buffered id is {}).",
shard,
buffered.size(),
minBufferedId);
oooBufferState.clear();
minBufferedIdState.clear();
minBufferedId = Long.MAX_VALUE;
iter =
Iterators.mergeSorted(
ImmutableList.of(iter, buffered.iterator()), new KV.OrderByKey<>());
}
}
writer.commitTxn(nextId - 1, numTransactions);
nextIdState.write(nextId);
} catch (ProducerSpEL.UnrecoverableProducerException e) {
// Producer JavaDoc says these are not recoverable errors and the producer should be closed.
// Close the producer; a new producer will be initialized on retry.
// It is possible that a rogue worker keeps retrying and ends up fencing off
// active producers. How likely this might be or how well such a scenario is handled
// depends on the runner. For now we leave it to the upper layers; this will need to be revisited.
LOG.warn(
"{} : closing producer {} after unrecoverable error. The work might have migrated."
+ " Committed id {}, current id {}.",
writer.shard,
writer.producerName,
writer.committedId,
nextId - 1,
e);
writer.producer.close();
writer = null; // No need to cache it.
throw e;
} finally {
if (writer != null) {
cache.insert(shard, writer);
}
}
}
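/**
* Per-shard metadata stored as JSON in the consumer group's committed offset metadata on Kafka:
* the last committed sequence id and the unique writer id for the shard.
*/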
private static class ShardMetadata {
@JsonProperty("seq")
public final long sequenceId;
@JsonProperty("id")
public final String writerId;
private ShardMetadata() { // for json deserializer
sequenceId = -1;
writerId = null;
}
ShardMetadata(long sequenceId, String writerId) {
this.sequenceId = sequenceId;
this.writerId = writerId;
}
}
/** A wrapper around a Kafka producer; one for each of the shards. */
private static class ShardWriter<K, V> {
private final int shard;
private final String writerId;
private final Producer<K, V> producer;
private final String producerName;
private final WriteRecords<K, V> spec;
private long committedId;
ShardWriter(
int shard,
String writerId,
Producer<K, V> producer,
String producerName,
WriteRecords<K, V> spec,
long committedId) {
this.shard = shard;
this.writerId = writerId;
this.producer = producer;
this.producerName = producerName;
this.spec = spec;
this.committedId = committedId;
}
void beginTxn() {
ProducerSpEL.beginTransaction(producer);
}
Future<RecordMetadata> sendRecord(
TimestampedValue<ProducerRecord<K, V>> record, Counter sendCounter) {
try {
Long timestampMillis =
spec.getPublishTimestampFunction() != null
? spec.getPublishTimestampFunction()
.getTimestamp(record.getValue(), record.getTimestamp())
.getMillis()
: null;
Future<RecordMetadata> result =
producer.send(
new ProducerRecord<>(
spec.getTopic(),
null,
timestampMillis,
record.getValue().key(),
record.getValue().value()));
sendCounter.inc();
return result;
} catch (KafkaException e) {
ProducerSpEL.abortTransaction(producer);
throw e;
}
}
void commitTxn(long lastRecordId, Counter numTransactions) throws IOException {
try {
// Store id in consumer group metadata for the partition.
// NOTE: Kafka keeps this metadata for 24 hours since the last update. This limits
// how long the pipeline could be down before resuming it. It does not look like
// this TTL can be adjusted (asked about it on Kafka users list).
ProducerSpEL.sendOffsetsToTransaction(
producer,
ImmutableMap.of(
new TopicPartition(spec.getTopic(), shard),
new OffsetAndMetadata(
0L,
JSON_MAPPER.writeValueAsString(new ShardMetadata(lastRecordId, writerId)))),
spec.getSinkGroupId());
ProducerSpEL.commitTransaction(producer);
numTransactions.inc();
LOG.debug("{} : committed {} records", shard, lastRecordId - committedId);
committedId = lastRecordId;
} catch (KafkaException e) {
ProducerSpEL.abortTransaction(producer);
throw e;
}
}
}
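/**
* Initializes a ShardWriter for the given shard, checking that the sequence id and writer id
* recorded in Beam state are consistent with the shard metadata committed to Kafka.
*/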
private ShardWriter<K, V> initShardWriter(
int shard, ValueState<String> writerIdState, long nextId) throws IOException {
String producerName = String.format("producer_%d_for_%s", shard, spec.getSinkGroupId());
Producer<K, V> producer = initializeExactlyOnceProducer(spec, producerName);
// Fetch latest committed metadata for the partition (if any). Checks committed sequence ids.
try {
String writerId = writerIdState.read();
OffsetAndMetadata committed;
try (Consumer<?, ?> consumer = openConsumer(spec)) {
committed = consumer.committed(new TopicPartition(spec.getTopic(), shard));
}
long committedSeqId = -1;
if (committed == null || committed.metadata() == null || committed.metadata().isEmpty()) {
checkState(
nextId == 0 && writerId == null,
"State exists for shard %s (nextId %s, writerId '%s'), but there is no state "
+ "stored with Kafka topic '%s' group id '%s'",
shard,
nextId,
writerId,
spec.getTopic(),
spec.getSinkGroupId());
writerId =
String.format(
"%X - %s",
new Random().nextInt(Integer.MAX_VALUE),
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
.withZone(DateTimeZone.UTC)
.print(DateTimeUtils.currentTimeMillis()));
writerIdState.write(writerId);
LOG.info("Assigned writer id '{}' to shard {}", writerId, shard);
} else {
ShardMetadata metadata = JSON_MAPPER.readValue(committed.metadata(), ShardMetadata.class);
checkNotNull(metadata.writerId);
if (writerId == null) {
// a) This might be a restart of the job from scratch, in which case the metadata
// should be ignored and overwritten with a new one.
// b) This job might be started with an incorrect group id, which is an error.
// c) There is an extremely small chance that this is a retry of the first bundle
// where the metadata was committed to Kafka but the bundle results were not committed
// in Beam, in which case it should be treated as correct metadata.
// How can we tell these three cases apart? Be safe and throw an exception.
//
// We could give users an explicit option to override the existing metadata.
//
throw new IllegalStateException(
String.format(
"Kafka metadata exists for shard %s, but there is no stored state for it. "
+ "This mostly indicates groupId '%s' is used else where or in earlier runs. "
+ "Try another group id. Metadata for this shard on Kafka : '%s'",
shard, spec.getSinkGroupId(), committed.metadata()));
}
checkState(
writerId.equals(metadata.writerId),
"Writer ids don't match. This is mostly a unintended misuse of groupId('%s')."
+ "Beam '%s', Kafka '%s'",
spec.getSinkGroupId(),
writerId,
metadata.writerId);
committedSeqId = metadata.sequenceId;
checkState(
committedSeqId >= (nextId - 1),
"Committed sequence id can not be lower than %s, partition metadata : %s",
nextId - 1,
committed.metadata());
}
LOG.info(
"{} : initialized producer {} with committed sequence id {}",
shard,
producerName,
committedSeqId);
return new ShardWriter<>(shard, writerId, producer, producerName, spec, committedSeqId);
} catch (IOException e) {
producer.close();
throw e;
}
}
/**
* A wrapper around a Guava cache to provide insert()/remove() semantics. A ShardWriter will be
* closed if it stays in the cache for more than 1 minute, i.e. it is not used inside the
* KafkaExactlyOnceSink DoFn for a minute.
*/
private static class ShardWriterCache<K, V> {
static final ScheduledExecutorService SCHEDULED_CLEAN_UP_THREAD =
Executors.newSingleThreadScheduledExecutor();
static final int CLEAN_UP_CHECK_INTERVAL_MS = 10 * 1000;
static final int IDLE_TIMEOUT_MS = 60 * 1000;
private final Cache<Integer, ShardWriter<K, V>> cache;
// Exceptions arising from the cache cleanup are ignored
@SuppressWarnings("FutureReturnValueIgnored")
ShardWriterCache() {
this.cache =
CacheBuilder.newBuilder()
.expireAfterWrite(IDLE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.<Integer, ShardWriter<K, V>>removalListener(
notification -> {
if (notification.getCause() != RemovalCause.EXPLICIT) {
ShardWriter writer = notification.getValue();
LOG.info(
"{} : Closing idle shard writer {} after 1 minute of idle time.",
writer.shard,
writer.producerName);
writer.producer.close();
}
})
.build();
// run cache.cleanUp() every 10 seconds.
SCHEDULED_CLEAN_UP_THREAD.scheduleAtFixedRate(
cache::cleanUp,
CLEAN_UP_CHECK_INTERVAL_MS,
CLEAN_UP_CHECK_INTERVAL_MS,
TimeUnit.MILLISECONDS);
}
ShardWriter<K, V> removeIfPresent(int shard) {
return cache.asMap().remove(shard);
}
void insert(int shard, ShardWriter<K, V> writer) {
ShardWriter<K, V> existing = cache.asMap().putIfAbsent(shard, writer);
checkState(
existing == null, "Unexpected multiple instances of writers for shard %s", shard);
}
}
// One cache for each sink (usually there is only one sink per pipeline)
private static final LoadingCache<String, ShardWriterCache<?, ?>> CACHE_BY_GROUP_ID =
CacheBuilder.newBuilder()
.build(
new CacheLoader<String, ShardWriterCache<?, ?>>() {
@Override
public ShardWriterCache<?, ?> load(String key) throws Exception {
return new ShardWriterCache<>();
}
});
}
/**
* Opens a generic consumer that is mainly meant for metadata operations, like fetching the
* number of partitions for a topic, rather than for fetching messages.
*/
private static Consumer<?, ?> openConsumer(WriteRecords<?, ?> spec) {
return spec.getConsumerFactoryFn()
.apply(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
spec.getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
ConsumerConfig.GROUP_ID_CONFIG,
spec.getSinkGroupId(),
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class));
}
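/**
* Creates a producer configured for transactional writes: idempotence is enabled, the
* transactional id is set to the per-shard producer name, and transactions are initialized.
*/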
private static <K, V> Producer<K, V> initializeExactlyOnceProducer(
WriteRecords<K, V> spec, String producerName) {
Map<String, Object> producerConfig = new HashMap<>(spec.getProducerConfig());
producerConfig.putAll(
ImmutableMap.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
spec.getKeySerializer(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
spec.getValueSerializer(),
ProducerSpEL.ENABLE_IDEMPOTENCE_CONFIG,
true,
ProducerSpEL.TRANSACTIONAL_ID_CONFIG,
producerName));
Producer<K, V> producer =
spec.getProducerFactoryFn() != null
? spec.getProducerFactoryFn().apply(producerConfig)
: new KafkaProducer<>(producerConfig);
ProducerSpEL.initTransactions(producer);
return producer;
}
}