/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializableObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static java.util.Objects.requireNonNull;
/**
* Flink Sink to produce data into a Kafka topic.
*
* <p>Please note that this producer provides at-least-once reliability guarantees when
* checkpointing is enabled and {@code setFlushOnCheckpoint(true)} is configured. Otherwise, the
* producer doesn't provide any reliability guarantees.
*
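* <p>A minimal usage sketch (for illustration only; {@code MyKafkaProducer} stands in for a
* concrete subclass of this base, and the topic, schema, broker address, and {@code stream}
* are placeholders):
*
* <pre>{@code
* Properties props = FlinkKafkaProducerBase.getPropertiesFromBrokerList("localhost:9092");
* MyKafkaProducer<String> producer =
*         new MyKafkaProducer<>("my-topic", new SimpleStringSchema(), props);
* producer.setFlushOnCheckpoint(true); // at-least-once, requires enabled checkpointing
* producer.setLogFailuresOnly(false); // fail the job on send errors instead of only logging
* stream.addSink(producer);
* }</pre>
*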
* @param <IN> Type of the messages to write into Kafka.
*/
@Internal
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN>
implements CheckpointedFunction {
private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
private static final long serialVersionUID = 1L;
/** Configuration key for disabling the metrics reporting; set it to "true" in the producer config to skip registering the Kafka producer's metrics with Flink. */
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
/** User defined properties for the Producer. */
protected final Properties producerConfig;
/** The name of the default topic this producer is writing data to. */
protected final String defaultTopicId;
/**
* (Serializable) SerializationSchema for turning the objects used with Flink into byte[] for
* Kafka.
*/
protected final KeyedSerializationSchema<IN> schema;
/** User-provided partitioner for assigning an object to a Kafka partition for each topic. */
protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
/** Partitions of each topic. */
protected final Map<String, int[]> topicPartitionsMap;
/** Flag indicating whether to accept failures (and log them), or to fail on failures. */
protected boolean logFailuresOnly;
/**
* If true, the producer will wait until all outstanding records have been sent to the broker.
*/
protected boolean flushOnCheckpoint = true;
// -------------------------------- Runtime fields ------------------------------------------
/** KafkaProducer instance. */
protected transient KafkaProducer<byte[], byte[]> producer;
/** The callback that handles error propagation or logging of send failures. */
protected transient Callback callback;
/** Errors encountered in the async producer are stored here. */
protected transient volatile Exception asyncException;
/** Lock for accessing the pending records. */
protected final SerializableObject pendingRecordsLock = new SerializableObject();
/** Number of unacknowledged records. */
protected long pendingRecords;
/**
* The main constructor for creating a FlinkKafkaProducer.
*
* @param defaultTopicId The default topic to write data to
* @param serializationSchema A serializable serialization schema for turning user objects into
* a Kafka-consumable byte[] supporting key/value messages
* @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is
* the only required argument.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka
* partitions. Passing null will use Kafka's partitioner.
*/
public FlinkKafkaProducerBase(
String defaultTopicId,
KeyedSerializationSchema<IN> serializationSchema,
Properties producerConfig,
FlinkKafkaPartitioner<IN> customPartitioner) {
requireNonNull(defaultTopicId, "TopicID not set");
requireNonNull(serializationSchema, "serializationSchema not set");
requireNonNull(producerConfig, "producerConfig not set");
ClosureCleaner.clean(
customPartitioner, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
ClosureCleaner.ensureSerializable(serializationSchema);
this.defaultTopicId = defaultTopicId;
this.schema = serializationSchema;
this.producerConfig = producerConfig;
this.flinkKafkaPartitioner = customPartitioner;
// set the producer configuration properties for Kafka record key/value serializers.
if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
} else {
LOG.warn(
"Overwriting the '{}' is not recommended",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
} else {
LOG.warn(
"Overwriting the '{}' is not recommended",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
}
// eagerly ensure that bootstrap servers are set.
if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
throw new IllegalArgumentException(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+ " must be supplied in the producer config properties.");
}
this.topicPartitionsMap = new HashMap<>();
}
// ---------------------------------- Properties --------------------------
/**
* Defines whether the producer should fail on errors, or only log them. If this is set to true,
* exceptions will only be logged; if set to false, exceptions will eventually be thrown and
* cause the streaming program to fail (and enter recovery).
*
* @param logFailuresOnly The flag to indicate logging-only on exceptions.
*/
public void setLogFailuresOnly(boolean logFailuresOnly) {
this.logFailuresOnly = logFailuresOnly;
}
/**
* If set to true, the Flink producer will wait for all outstanding messages in the Kafka
* buffers to be acknowledged by the Kafka producer on a checkpoint. This way, the producer can
* guarantee that messages in the Kafka buffers are part of the checkpoint.
*
* @param flush Flag indicating the flushing mode (true = flush on checkpoint)
*/
public void setFlushOnCheckpoint(boolean flush) {
this.flushOnCheckpoint = flush;
}
/** Used for testing only. */
@VisibleForTesting
protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
return new KafkaProducer<>(props);
}
// ----------------------------------- Utilities --------------------------
/** Initializes the connection to Kafka. */
@Override
public void open(Configuration configuration) throws Exception {
if (schema instanceof KeyedSerializationSchemaWrapper) {
((KeyedSerializationSchemaWrapper<IN>) schema)
.getSerializationSchema()
.open(
RuntimeContextInitializationContextAdapters.serializationAdapter(
getRuntimeContext(),
metricGroup -> metricGroup.addGroup("user")));
}
producer = getKafkaProducer(this.producerConfig);
RuntimeContext ctx = getRuntimeContext();
if (null != flinkKafkaPartitioner) {
flinkKafkaPartitioner.open(
ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
}
LOG.info(
"Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}",
ctx.getIndexOfThisSubtask() + 1,
ctx.getNumberOfParallelSubtasks(),
defaultTopicId);
// register the Kafka producer's metrics as Flink gauges
if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
if (metrics == null) {
// MapR's Kafka implementation returns null here.
LOG.info("Producer implementation does not support metrics");
} else {
final MetricGroup kafkaMetricGroup =
getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
kafkaMetricGroup.gauge(
metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
}
}
}
if (flushOnCheckpoint
&& !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) {
LOG.warn(
"Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
flushOnCheckpoint = false;
}
if (logFailuresOnly) {
callback =
(metadata, e) -> {
if (e != null) {
LOG.error(
"Error while sending record to Kafka: " + e.getMessage(), e);
}
acknowledgeMessage();
};
} else {
callback =
(metadata, exception) -> {
if (exception != null && asyncException == null) {
asyncException = exception;
}
acknowledgeMessage();
};
}
}
/**
* Called when new data arrives at the sink, and forwards it to Kafka.
*
* @param next The incoming data
*/
@Override
public void invoke(IN next, Context context) throws Exception {
// propagate asynchronous errors
checkErroneous();
byte[] serializedKey = schema.serializeKey(next);
byte[] serializedValue = schema.serializeValue(next);
String targetTopic = schema.getTargetTopic(next);
if (targetTopic == null) {
targetTopic = defaultTopicId;
}
int[] partitions = this.topicPartitionsMap.get(targetTopic);
if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, producer);
this.topicPartitionsMap.put(targetTopic, partitions);
}
ProducerRecord<byte[], byte[]> record;
if (flinkKafkaPartitioner == null) {
record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue);
} else {
record =
new ProducerRecord<>(
targetTopic,
flinkKafkaPartitioner.partition(
next, serializedKey, serializedValue, targetTopic, partitions),
serializedKey,
serializedValue);
}
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
pendingRecords++;
}
}
producer.send(record, callback);
}
@Override
public void close() throws Exception {
if (producer != null) {
producer.close();
}
// make sure we propagate pending errors
checkErroneous();
}
// ------------------- Logic for handling checkpoint flushing -------------------------- //
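/**
* Marks one record as acknowledged; when flushing on checkpoint is enabled, decrements the
* pending-records counter and notifies threads waiting on {@link #pendingRecordsLock} once it
* reaches zero.
*/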
private void acknowledgeMessage() {
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
pendingRecords--;
if (pendingRecords == 0) {
pendingRecordsLock.notifyAll();
}
}
}
}
/**
* Flush pending records. Implementations are expected to block until all previously sent
* records have been acknowledged; {@link #snapshotState(FunctionSnapshotContext)} relies on
* this and fails the checkpoint if any records are still pending afterwards.
*/
protected abstract void flush();
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// nothing to do
}
@Override
public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
// check for asynchronous errors and fail the checkpoint if necessary
checkErroneous();
if (flushOnCheckpoint) {
// flushing is activated: We need to wait until pendingRecords is 0
flush();
synchronized (pendingRecordsLock) {
if (pendingRecords != 0) {
throw new IllegalStateException(
"Pending record count must be zero at this point: " + pendingRecords);
}
// if the flushed requests had errors, we should also propagate them and fail the
// checkpoint
checkErroneous();
}
}
}
// ----------------------------------- Utilities --------------------------
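/**
* Rethrows the first exception recorded by the asynchronous send callback, if any, so that
* send failures surface in the task thread (on the next {@link #invoke} or checkpoint), and
* clears it to prevent double throwing.
*/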
protected void checkErroneous() throws Exception {
Exception e = asyncException;
if (e != null) {
// prevent double throwing
asyncException = null;
throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
}
}
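/**
* Creates producer {@link Properties} from a comma-separated broker list, validating each
* {@code host:port} address and setting the list as {@code bootstrap.servers}. For example,
* {@code getPropertiesFromBrokerList("host1:9092,host2:9092")} returns properties with both
* brokers configured.
*/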
public static Properties getPropertiesFromBrokerList(String brokerList) {
String[] elements = brokerList.split(",");
// validate the broker addresses
for (String broker : elements) {
NetUtils.getCorrectHostnamePort(broker);
}
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
return props;
}
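/**
* Fetches the partition metadata for the given topic from the producer and returns the
* partition ids sorted in ascending order, so that all subtasks see the same partition list.
*/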
protected static int[] getPartitionsByTopic(
String topic, KafkaProducer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same
// across subtasks
partitionsList.sort(Comparator.comparingInt(PartitionInfo::partition));
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
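/** Returns the current number of unacknowledged records; exposed for tests. */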
@VisibleForTesting
protected long numPendingRecords() {
synchronized (pendingRecordsLock) {
return pendingRecords;
}
}
}