/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

import java.util.Properties;

/**
 * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x.
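 *
 * <p>A minimal usage sketch (broker address, topic name, and input stream are placeholders,
 * not part of this class):
 * <pre>{@code
 * Properties props = new Properties();
 * props.setProperty("bootstrap.servers", "localhost:9092");
 * DataStream<String> stream = ...;
 * stream.addSink(new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), props));
 * }</pre>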
*/
@PublicEvolving
public class FlinkKafkaProducer010<T> extends FlinkKafkaProducer09<T> {
private static final long serialVersionUID = 1L;
/**
* Flag controlling whether we are writing the Flink record's timestamp into Kafka.
*/
private boolean writeTimestampToKafka = false;
// ---------------------- Regular constructors ------------------
/**
 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic.
*
* <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
* the partitioner. This default partitioner maps each sink subtask to a single Kafka
* partition (i.e. all records received by a sink subtask will end up in the same
* Kafka partition).
*
* <p>To use a custom partitioner, please use
* {@link #FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*
* @param brokerList
 * Comma-separated addresses of the brokers.
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
* User defined key-less serialization schema.
*/
public FlinkKafkaProducer010(String brokerList, String topicId, SerializationSchema<T> serializationSchema) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
}
/**
 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic.
*
* <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
* the partitioner. This default partitioner maps each sink subtask to a single Kafka
* partition (i.e. all records received by a sink subtask will end up in the same
* Kafka partition).
*
* <p>To use a custom partitioner, please use
* {@link #FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
* User defined key-less serialization schema.
* @param producerConfig
* Properties with the producer configuration.
*/
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
}
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
*
 * <p>Since a key-less {@link SerializationSchema} is used, records sent to Kafka will not have an
* attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
* partitions in a round-robin fashion.
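 *
 * <p>For example, a sketch that opts into round-robin distribution by passing a {@code null}
 * partitioner (the cast disambiguates from the deprecated {@code KafkaPartitioner} overload;
 * topic and properties are placeholders):
 * <pre>{@code
 * new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), props,
 *     (FlinkKafkaPartitioner<String>) null);
 * }</pre>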
*
* @param topicId The topic to write data to
* @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required property.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
* If set to {@code null}, records will be distributed to Kafka partitions
* in a round-robin fashion.
*/
public FlinkKafkaProducer010(
String topicId,
SerializationSchema<T> serializationSchema,
Properties producerConfig,
@Nullable FlinkKafkaPartitioner<T> customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
}
// ------------------- Key/Value serialization schema constructors ----------------------
/**
 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic.
*
* <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
* the partitioner. This default partitioner maps each sink subtask to a single Kafka
* partition (i.e. all records received by a sink subtask will end up in the same
* Kafka partition).
*
* <p>To use a custom partitioner, please use
* {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*
* @param brokerList
 * Comma-separated addresses of the brokers.
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
* User defined serialization schema supporting key/value messages
*/
public FlinkKafkaProducer010(String brokerList, String topicId, KeyedSerializationSchema<T> serializationSchema) {
this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
}
/**
 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic.
*
* <p>Using this constructor, the default {@link FlinkFixedPartitioner} will be used as
* the partitioner. This default partitioner maps each sink subtask to a single Kafka
* partition (i.e. all records received by a sink subtask will end up in the same
* Kafka partition).
*
* <p>To use a custom partitioner, please use
* {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*
* @param topicId
* ID of the Kafka topic.
* @param serializationSchema
* User defined serialization schema supporting key/value messages
* @param producerConfig
* Properties with the producer configuration.
*/
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
}
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
*
* <p>If a partitioner is not provided, written records will be partitioned by the attached key of each
* record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
* have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
* will be distributed to Kafka partitions in a round-robin fashion.
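 *
 * <p>For illustration, a sketch of a keyed schema over {@code Tuple2<String, String>} records
 * (field 0 as key, field 1 as value; the record type and charset choice are placeholders):
 * <pre>{@code
 * KeyedSerializationSchema<Tuple2<String, String>> schema =
 *     new KeyedSerializationSchema<Tuple2<String, String>>() {
 *         public byte[] serializeKey(Tuple2<String, String> element) {
 *             return element.f0 == null ? null : element.f0.getBytes(StandardCharsets.UTF_8);
 *         }
 *         public byte[] serializeValue(Tuple2<String, String> element) {
 *             return element.f1.getBytes(StandardCharsets.UTF_8);
 *         }
 *         public String getTargetTopic(Tuple2<String, String> element) {
 *             return null; // null falls back to the topic given to the producer
 *         }
 *     };
 * }</pre>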
*
* @param topicId The topic to write data to
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required property.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
* If set to {@code null}, records will be partitioned by the key of each record
* (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
* are {@code null}, then records will be distributed to Kafka partitions in a
* round-robin fashion.
*/
public FlinkKafkaProducer010(
String topicId,
KeyedSerializationSchema<T> serializationSchema,
Properties producerConfig,
@Nullable FlinkKafkaPartitioner<T> customPartitioner) {
super(topicId, serializationSchema, producerConfig, customPartitioner);
}
// ------------------- User configuration ----------------------
/**
* If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
* Timestamps must be positive for Kafka to accept them.
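 *
 * <p>A sketch ({@code producer} and {@code stream} are placeholders):
 * <pre>{@code
 * producer.setWriteTimestampToKafka(true);
 * stream.addSink(producer);
 * }</pre>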
*
* @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
*/
public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
this.writeTimestampToKafka = writeTimestampToKafka;
}
// ----------------------------- Deprecated constructors / factory methods ---------------------------
/**
 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic.
*
 * <p>This method allows writing timestamps to Kafka; it follows approach (b) (see above).
*
* @param inStream The stream to write to Kafka
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined serialization schema supporting key/value messages
* @param producerConfig Properties with the producer configuration.
*
* @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
* and call {@link #setWriteTimestampToKafka(boolean)}.
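 *
 * <p>A migration sketch (stream, schema, and properties are placeholders):
 * <pre>{@code
 * FlinkKafkaProducer010<T> producer = new FlinkKafkaProducer010<>("my-topic", schema, props);
 * producer.setWriteTimestampToKafka(true);
 * stream.addSink(producer);
 * }</pre>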
*/
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
KeyedSerializationSchema<T> serializationSchema,
Properties producerConfig) {
return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
}
/**
 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic.
*
 * <p>This method allows writing timestamps to Kafka; it follows approach (b) (see above).
*
* @param inStream The stream to write to Kafka
* @param topicId ID of the Kafka topic.
* @param serializationSchema User defined (keyless) serialization schema.
* @param producerConfig Properties with the producer configuration.
*
* @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties)}
* and call {@link #setWriteTimestampToKafka(boolean)}.
*/
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
SerializationSchema<T> serializationSchema,
Properties producerConfig) {
return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FlinkFixedPartitioner<T>());
}
/**
 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic.
*
 * <p>This method allows writing timestamps to Kafka; it follows approach (b) (see above).
*
* @param inStream The stream to write to Kafka
* @param topicId The name of the target topic
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required property.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
*
* @deprecated Use {@link #FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)}
* and call {@link #setWriteTimestampToKafka(boolean)}.
*/
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
KeyedSerializationSchema<T> serializationSchema,
Properties producerConfig,
FlinkKafkaPartitioner<T> customPartitioner) {
FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner);
DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
return new FlinkKafkaProducer010Configuration<>(streamSink, inStream, kafkaProducer);
}
/**
 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic.
*
 * <p>This method allows writing timestamps to Kafka; it follows approach (b) (see above).
*
* @param inStream The stream to write to Kafka
* @param topicId The name of the target topic
* @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required property.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
*
 * @deprecated This method is deprecated since it does not correctly handle partitioning when
* producing to multiple topics. Use
* {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*/
@Deprecated
public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
String topicId,
KeyedSerializationSchema<T> serializationSchema,
Properties producerConfig,
KafkaPartitioner<T> customPartitioner) {
FlinkKafkaProducer010<T> kafkaProducer =
new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner));
DataStreamSink<T> streamSink = inStream.addSink(kafkaProducer);
return new FlinkKafkaProducer010Configuration<T>(streamSink, inStream, kafkaProducer);
}
/**
 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
* the topic.
*
* @param topicId The topic to write data to
* @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[]
 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required property.
* @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
*
 * @deprecated This constructor is deprecated since it does not correctly handle partitioning when
* producing to multiple topics. Use
* {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*/
@Deprecated
public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
}
/**
 * Creates a Kafka producer.
*
 * <p>This constructor does not allow writing timestamps to Kafka; it follows approach (a) (see above).
*
* @deprecated This is a deprecated constructor that does not correctly handle partitioning when
* producing to multiple topics. Use
* {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*/
@Deprecated
public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) {
// We create a Kafka 09 producer instance here and only "override" (by intercepting) the
// invoke call.
super(topicId, serializationSchema, producerConfig, customPartitioner);
}
// ----------------------------- Generic element processing ---------------------------
@Override
protected ProducerRecord<byte[], byte[]> buildProducerRecord(
Context context, String topic, Integer partition, byte[] keyBytes, byte[] valueBytes) {
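		// A null timestamp makes Kafka assign its own timestamp to the record. When
		// writeTimestampToKafka is enabled, the Flink record's timestamp (taken from the
		// sink context) is written into the Kafka 0.10 message timestamp field instead.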
Long timestamp = null;
if (this.writeTimestampToKafka) {
timestamp = context.timestamp();
}
return new ProducerRecord<>(topic, partition, timestamp, keyBytes, valueBytes);
}
/**
* Configuration object returned by the writeToKafkaWithTimestamps() call.
*
 * <p>This is only kept because it's part of the public API. It is no longer necessary now
 * that the {@link SinkFunction} interface provides timestamps.
 *
 * <p>To enable the settings, this fake sink must override all the public methods
 * in {@link DataStreamSink}.
*
* @deprecated This class is deprecated since the factory methods {@code writeToKafkaWithTimestamps}
* for the producer are also deprecated.
*/
@Deprecated
public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {
private final FlinkKafkaProducer010<T> producer;
private final SinkTransformation<T> transformation;
private FlinkKafkaProducer010Configuration(
DataStreamSink<T> originalSink,
DataStream<T> inputStream,
FlinkKafkaProducer010<T> producer) {
//noinspection unchecked
super(inputStream, originalSink.getTransformation().getOperator());
this.transformation = originalSink.getTransformation();
this.producer = producer;
}
/**
 * Defines whether the producer should fail on errors, or only log them.
 * If this is set to true, exceptions will only be logged; if set to false,
 * exceptions will eventually be thrown and cause the streaming program to
 * fail (and enter recovery).
*
* @param logFailuresOnly The flag to indicate logging-only on exceptions.
*/
public void setLogFailuresOnly(boolean logFailuresOnly) {
producer.setLogFailuresOnly(logFailuresOnly);
}
/**
* If set to true, the Flink producer will wait for all outstanding messages in the Kafka buffers
* to be acknowledged by the Kafka producer on a checkpoint.
* This way, the producer can guarantee that messages in the Kafka buffers are part of the checkpoint.
*
* @param flush Flag indicating the flushing mode (true = flush on checkpoint)
*/
public void setFlushOnCheckpoint(boolean flush) {
producer.setFlushOnCheckpoint(flush);
}
/**
* If set to true, Flink will write the (event time) timestamp attached to each record into Kafka.
* Timestamps must be positive for Kafka to accept them.
*
* @param writeTimestampToKafka Flag indicating if Flink's internal timestamps are written to Kafka.
*/
public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
producer.writeTimestampToKafka = writeTimestampToKafka;
}
// *************************************************************************
// Override methods to use the transformation in this class.
// *************************************************************************
@Override
public SinkTransformation<T> getTransformation() {
return transformation;
}
@Override
public DataStreamSink<T> name(String name) {
transformation.setName(name);
return this;
}
@Override
public DataStreamSink<T> uid(String uid) {
transformation.setUid(uid);
return this;
}
@Override
public DataStreamSink<T> setUidHash(String uidHash) {
transformation.setUidHash(uidHash);
return this;
}
@Override
public DataStreamSink<T> setParallelism(int parallelism) {
transformation.setParallelism(parallelism);
return this;
}
@Override
public DataStreamSink<T> disableChaining() {
this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
return this;
}
@Override
public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
}
}