/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.flink.streaming.connectors.kafka.v2;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.connectors.kafka.v2.common.Syncable;
import org.apache.flink.streaming.connectors.kafka.v2.common.TupleRichOutputFormat;
import org.apache.flink.types.Row;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializableObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static java.util.Objects.requireNonNull;
/**
 * Kafka OutputFormat base class: writes {@link Row} records to Kafka via a {@link KafkaConverter},
 * optionally flushing all pending records on checkpoint through {@link #sync()}.
 */
public abstract class KafkaBaseOutputFormat extends TupleRichOutputFormat implements Syncable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaBaseOutputFormat.class);
private static final long serialVersionUID = 1L;
/**
* Configuration key for disabling the metrics reporting.
*/
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
/**
* User defined properties for the Producer.
*/
protected final Properties producerConfig;
/**
* The name of the default topic this producer is writing data to.
*/
protected final String defaultTopicId;
/**
* (Serializable) SerializationSchema for turning objects used with Flink into
* byte[] for Kafka.
*/
protected final KafkaConverter<Row> schema;
/**
* Partitions of each topic.
*/
protected final Map<String, int[]> topicPartitionsMap;
/**
* Flag indicating whether to accept failures (and log them), or to fail on failures.
*/
protected boolean logFailuresOnly;
/**
* If true, the producer will wait until all outstanding records have been sent to the broker.
*/
protected boolean flushOnCheckpoint;
// -------------------------------- Runtime fields ------------------------------------------
/** KafkaProducer instance. */
protected transient KafkaProducer<byte[], byte[]> producer;
/** The callback that handles error propagation or failure logging, depending on the configuration. */
protected transient Callback callback;
/** Errors encountered in the async producer are stored here. */
protected transient volatile Exception asyncException;
/** Lock for accessing the pending records. */
protected final SerializableObject pendingRecordsLock = new SerializableObject();
/** Number of unacknowledged records. */
protected long pendingRecords;
public KafkaBaseOutputFormat(
String defaultTopicId, KafkaConverter<Row> serializationSchema, Properties producerConfig) {
requireNonNull(defaultTopicId, "TopicID not set");
requireNonNull(serializationSchema, "serializationSchema not set");
requireNonNull(producerConfig, "producerConfig not set");
ClosureCleaner.ensureSerializable(serializationSchema);
this.defaultTopicId = defaultTopicId;
this.schema = serializationSchema;
this.producerConfig = producerConfig;
// set the producer configuration properties for kafka record key value serializers.
if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getCanonicalName());
} else {
LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
}
if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
this.producerConfig.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getCanonicalName());
} else {
LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
}
// eagerly ensure that bootstrap servers are set.
if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
throw new IllegalArgumentException(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties.");
}
this.topicPartitionsMap = new HashMap<>();
}
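// A hedged construction sketch for reference; the concrete subclass and converter names below
// are illustrative only and not part of this class:
//   Properties props = KafkaBaseOutputFormat.getPropertiesFromBrokerList("broker1:9092,broker2:9092");
//   KafkaBaseOutputFormat format = new MyKafkaOutputFormat("my-topic", new MyRowConverter(), props);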
// ---------------------------------- Properties --------------------------
/**
* Defines whether the producer should fail on errors, or only log them.
* If this is set to true, exceptions will only be logged; if set to false,
* exceptions will eventually be thrown and cause the streaming program to
* fail (and enter recovery).
*
* @param logFailuresOnly The flag to indicate logging-only on exceptions.
*/
public void setLogFailuresOnly(boolean logFailuresOnly) {
this.logFailuresOnly = logFailuresOnly;
}
/**
* If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
* to be acknowledged by the Kafka producer on a checkpoint.
* This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
*
* @param flush Flag indicating the flushing mode (true = flush on checkpoint)
*/
public void setFlushOnCheckpoint(boolean flush) {
this.flushOnCheckpoint = flush;
}
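// Illustrative usage of the two flags above (caller code, not part of this class):
//   format.setLogFailuresOnly(false);  // propagate send errors and fail the job
//   format.setFlushOnCheckpoint(true); // sync() then waits until Kafka has acked all pending records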
/**
* Used for testing only.
*/
@VisibleForTesting
protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
return createKafkaProducer(props);
}
// load KafkaProducer via the system classloader instead of the application classloader, otherwise we will hit a
// classloading issue in the scala-shell scenario where the kafka jar is not shipped with the JobGraph, but only
// shipped when starting the yarn container.
private static KafkaProducer createKafkaProducer(Properties kafkaProperties) {
try {
return new KafkaProducer<>(kafkaProperties);
} catch (KafkaException e) {
ClassLoader original = Thread.currentThread().getContextClassLoader();
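// retry with the context classloader cleared so that the Kafka client falls back to the
// classloader that loaded the Kafka classes (see the note above this method)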
try {
Thread.currentThread().setContextClassLoader(null);
return new KafkaProducer<>(kafkaProperties);
} finally {
Thread.currentThread().setContextClassLoader(original);
}
}
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
super.open(taskNumber, numTasks);
producer = getKafkaProducer(this.producerConfig);
RuntimeContext ctx = getRuntimeContext();
schema.open(ctx);
LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}",
ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId);
// register Kafka's producer metrics as gauges in Flink's metric group
if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
if (metrics == null) {
// MapR's Kafka implementation returns null here.
LOG.info("Producer implementation does not support metrics");
} else {
final MetricGroup kafkaMetricGroup = getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
}
}
}
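// Set up the send callback: either only log failures, or capture the first asynchronous
// exception so that checkErroneous() can rethrow it; both variants decrement the pending-record count.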
if (logFailuresOnly) {
callback = new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
}
acknowledgeMessage();
}
};
} else {
callback = new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null && asyncException == null) {
asyncException = exception;
}
acknowledgeMessage();
}
};
}
}
@Override
public void close() throws IOException {
schema.close();
}
// ------------------- Logic for handling checkpoint flushing -------------------------- //
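/**
 * Decrements the pending-record counter and notifies any thread waiting on
 * {@code pendingRecordsLock} (for example a subclass {@code flush()} implementation) once it reaches zero.
 */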
private void acknowledgeMessage() {
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
pendingRecords--;
if (pendingRecords == 0) {
pendingRecordsLock.notifyAll();
}
}
}
}
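/**
 * Flushes the producer. Implementations are expected to block until every record counted in
 * {@code pendingRecords} has been acknowledged, since {@link #sync()} fails if any records
 * are still pending after this call returns.
 */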
protected abstract void flush();
@Override
public void writeAddRecord(Row row) throws IOException {
// propagate asynchronous errors
checkErroneous();
String targetTopic = schema.getTargetTopic(row);
if (targetTopic == null) {
targetTopic = defaultTopicId;
}
int[] partitions = this.topicPartitionsMap.get(targetTopic);
if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, producer);
this.topicPartitionsMap.put(targetTopic, partitions);
}
ProducerRecord<byte[], byte[]> record = schema.convert(row, targetTopic, partitions);
if (record != null) {
// only count records that are actually handed to the producer; counting a null record would
// leave pendingRecords permanently above zero and make sync() fail
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
pendingRecords++;
}
}
producer.send(record, callback);
}
}
@Override
public void writeDeleteRecord(Row row) throws IOException {
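// delete/retraction records are ignored; this output format only appends records to Kafka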
}
@Override
public void sync() throws IOException {
// check for asynchronous errors and fail the checkpoint if necessary
checkErroneous();
if (flushOnCheckpoint) {
// flushing is activated: We need to wait until pendingRecords is 0
flush();
synchronized (pendingRecordsLock) {
if (pendingRecords != 0) {
throw new IllegalStateException(
"Pending record count must be zero at this point: " + pendingRecords);
}
// if the flushed requests had errors, we should propagate them as well and fail the checkpoint
checkErroneous();
}
}
}
@Override
public String getName() {
return null;
}
@Override
public void configure(Configuration parameters) {
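// nothing to configure here; all settings are supplied through the constructor and setters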
}
// ----------------------------------- Utilities --------------------------
protected void checkErroneous() {
Exception e = asyncException;
if (e != null) {
// prevent double throwing
asyncException = null;
throw new RuntimeException("Failed to send data to Kafka: " + e.getMessage(), e);
}
}
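/**
 * Builds a producer {@link Properties} object from a comma-separated list of "host:port"
 * broker addresses, validating each address and setting the list as {@code bootstrap.servers}.
 */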
public static Properties getPropertiesFromBrokerList(String brokerList) {
String[] elements = brokerList.split(",");
// validate the broker addresses
for (String broker : elements) {
NetUtils.getCorrectHostnamePort(broker);
}
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
return props;
}
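/**
 * Fetches the partition ids of the given topic and returns them sorted by partition id,
 * so that every subtask observes the partitions in the same order.
 */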
protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
// the fetched list is immutable, so we're creating a mutable copy in order to sort it
List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic));
// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
@Override
public int compare(PartitionInfo o1, PartitionInfo o2) {
return Integer.compare(o1.partition(), o2.partition());
}
});
int[] partitions = new int[partitionsList.size()];
for (int i = 0; i < partitions.length; i++) {
partitions[i] = partitionsList.get(i).partition();
}
return partitions;
}
@VisibleForTesting
protected long numPendingRecords() {
synchronized (pendingRecordsLock) {
return pendingRecords;
}
}
}