package org.apache.flink.streaming.connectors.kafka.v2;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.connectors.kafka.v2.common.Syncable;
import org.apache.flink.streaming.connectors.kafka.v2.common.TupleRichOutputFormat;
import org.apache.flink.types.Row;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializableObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static java.util.Objects.requireNonNull;
/** Kafka OutputFormat base class. */
public abstract class KafkaBaseOutputFormat extends TupleRichOutputFormat implements Syncable {
private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
private static final long serialVersionUID = 1L;
* Configuration key for disabling the metrics reporting.
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
* User defined properties for the Producer.
protected final Properties producerConfig;
* The name of the default topic this producer is writing data to.
protected final String defaultTopicId;
* (Serializable) SerializationSchema for turning objects used with Flink into.
* byte[] for Kafka.
protected final KafkaConverter<Row> schema;
* Partitions of each topic.
protected final Map<String, int[]> topicPartitionsMap;
* Flag indicating whether to accept failures (and log them), or to fail on failures.
protected boolean logFailuresOnly;
* If true, the producer will wait until all outstanding records have been send to the broker.
protected boolean flushOnCheckpoint;
// -------------------------------- Runtime fields ------------------------------------------
/** KafkaProducer instance. */
protected transient KafkaProducer<byte[], byte[]> producer;
/** The callback than handles error propagation or logging callbacks. */
protected transient Callback callback;
/** Errors encountered in the async producer are stored here. */
protected transient volatile Exception asyncException;
/** Lock for accessing the pending records. */
protected final SerializableObject pendingRecordsLock = new SerializableObject();
/** Number of unacknowledged records. */
protected long pendingRecords;
public KafkaBaseOutputFormat(
String defaultTopicId, KafkaConverter serializationSchema, Properties
producerConfig) {
requireNonNull(defaultTopicId, "TopicID not set");
requireNonNull(serializationSchema, "serializationSchema not set");
requireNonNull(producerConfig, "producerConfig not set");
this.defaultTopicId = defaultTopicId;
this.schema = serializationSchema;
this.producerConfig = producerConfig;
// set the producer configuration properties for kafka record key value serializers.
if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
} else {
LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
} else {
LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
// eagerly ensure that bootstrap servers are set.
if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
throw new IllegalArgumentException(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
this.topicPartitionsMap = new HashMap<>();
// ---------------------------------- Properties --------------------------
* Defines whether the producer should fail on errors, or only log them.
* If this is set to true, then exceptions will be only logged, if set to false,
* exceptions will be eventually thrown and cause the streaming program to
* fail (and enter recovery).
* @param logFailuresOnly The flag to indicate logging-only on exceptions.
public void setLogFailuresOnly(boolean logFailuresOnly) {
this.logFailuresOnly = logFailuresOnly;
* If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
* to be acknowledged by the Kafka producer on a checkpoint.
* This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
* @param flush Flag indicating the flushing mode (true = flush on checkpoint)
public void setFlushOnCheckpoint(boolean flush) {
this.flushOnCheckpoint = flush;
* Used for testing only.
protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
return createKafkaProducer(props);
// load KafkaProducer via system classloader instead of application classloader, otherwise we will hit classloading
// issue in scala-shell scenario where kafka jar is not shipped with JobGraph, but shipped when starting yarn
// container.
private static KafkaProducer createKafkaProducer(Properties kafkaProperties) {
try {
return new KafkaProducer<>(kafkaProperties);
} catch (KafkaException e) {
ClassLoader original = Thread.currentThread().getContextClassLoader();
try {
return new KafkaProducer<>(kafkaProperties);
} finally {
public void open(int taskNumber, int numTasks) throws IOException {, numTasks);
producer = getKafkaProducer(this.producerConfig);
RuntimeContext ctx = getRuntimeContext();;"Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}",
ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
// register Kafka metrics to Flink accumulators
if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
if (metrics == null) {
// MapR's Kafka implementation returns null here."Producer implementation does not support metrics");
} else {
final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
if (logFailuresOnly) {
callback = new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
} else {
callback = new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null && asyncException == null) {
asyncException = exception;
public void close() throws IOException {
// ------------------- Logic for handling checkpoint flushing -------------------------- //
private void acknowledgeMessage() {
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
if (pendingRecords == 0) {
/**
 * Flushes records buffered by the underlying Kafka producer; implemented by subclasses.
 *
 * <p>NOTE(review): presumably expected to block until all previously sent records have completed,
 * so that the pending-record count can reach zero on checkpoint; confirm against implementations.
 */
protected abstract void flush();
public void writeAddRecord(Row row) throws IOException {
// propagate asynchronous errors
String targetTopic = schema.getTargetTopic(row);
if (targetTopic == null) {
targetTopic = defaultTopicId;
int[] partitions = this.topicPartitionsMap.get(targetTopic);
if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, producer);
this.topicPartitionsMap.put(targetTopic, partitions);
ProducerRecord<byte[], byte[]> record;
record = schema.convert(row, targetTopic, partitions);
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
if (record != null) {
producer.send(record, callback);
public void writeDeleteRecord(Row row) throws IOException {
public void sync() throws IOException {
// check for asynchronous errors and fail the checkpoint if necessary
if (flushOnCheckpoint) {
// flushing is activated: We need to wait until pendingRecords is 0
synchronized (pendingRecordsLock) {
if (pendingRecords != 0) {
throw new IllegalStateException(
"Pending record count must be zero at this point: " + pendingRecords);
// if the flushed requests has errors, we should propagate it also and fail the checkpoint
public String getName() {
return null;
public void configure(Configuration parameters) {
// ----------------------------------- Utilities --------------------------
protected void checkErroneous() {
Exception e = asyncException;
if (e != null) {
// prevent double throwing
asyncException = null;
throw new RuntimeException("Failed to send data to Kafka: " + e.getMessage(), e);
public static Properties getPropertiesFromBrokerList(String brokerList) {
String[] elements = brokerList.split(",");
// validate the broker addresses
for (String broker : elements) {
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
return props;
protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
public int compare(PartitionInfo o1, PartitionInfo o2) {
return, o2.partition());
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
return partitions;
protected long numPendingRecords() {
synchronized (pendingRecordsLock) {
return pendingRecords;