| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE |
| * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file |
| * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the |
| * License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on |
| * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations under the License. |
| */ |
| package org.apache.kafka.clients.producer; |
| |
| import java.net.InetSocketAddress; |
| import java.util.Collections; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.kafka.clients.ClientUtils; |
| import org.apache.kafka.clients.Metadata; |
| import org.apache.kafka.clients.NetworkClient; |
| import org.apache.kafka.clients.producer.internals.RecordAccumulator; |
| import org.apache.kafka.clients.producer.internals.Sender; |
| import org.apache.kafka.clients.producer.internals.ProducerInterceptors; |
| import org.apache.kafka.common.Cluster; |
| import org.apache.kafka.common.KafkaException; |
| import org.apache.kafka.common.Metric; |
| import org.apache.kafka.common.PartitionInfo; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.config.ConfigException; |
| import org.apache.kafka.common.errors.ApiException; |
| import org.apache.kafka.common.errors.InterruptException; |
| import org.apache.kafka.common.errors.RecordTooLargeException; |
| import org.apache.kafka.common.errors.SerializationException; |
| import org.apache.kafka.common.errors.TimeoutException; |
| import org.apache.kafka.common.errors.TopicAuthorizationException; |
| import org.apache.kafka.common.metrics.JmxReporter; |
| import org.apache.kafka.common.metrics.MetricConfig; |
| import org.apache.kafka.common.MetricName; |
| import org.apache.kafka.common.metrics.Metrics; |
| import org.apache.kafka.common.metrics.MetricsReporter; |
| import org.apache.kafka.common.metrics.Sensor; |
| import org.apache.kafka.common.network.Selector; |
| import org.apache.kafka.common.network.ChannelBuilder; |
| import org.apache.kafka.common.record.CompressionType; |
| import org.apache.kafka.common.record.Record; |
| import org.apache.kafka.common.record.Records; |
| import org.apache.kafka.common.serialization.Serializer; |
| import org.apache.kafka.common.utils.AppInfoParser; |
| import org.apache.kafka.common.utils.KafkaThread; |
| import org.apache.kafka.common.utils.SystemTime; |
| import org.apache.kafka.common.utils.Time; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * A Kafka client that publishes records to the Kafka cluster. |
| * <P> |
| * The producer is <i>thread safe</i> and sharing a single producer instance across threads will generally be faster than |
| * having multiple instances. |
| * <p> |
| * Here is a simple example of using the producer to send records with strings containing sequential numbers as the key/value |
| * pairs. |
| * <pre> |
| * {@code |
| * Properties props = new Properties(); |
| * props.put("bootstrap.servers", "localhost:4242"); |
| * props.put("acks", "all"); |
| * props.put("retries", 0); |
| * props.put("batch.size", 16384); |
| * props.put("linger.ms", 1); |
| * props.put("buffer.memory", 33554432); |
| * props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); |
| * props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); |
| * |
| * Producer<String, String> producer = new KafkaProducer<>(props); |
| * for(int i = 0; i < 100; i++) |
| * producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); |
| * |
| * producer.close(); |
| * }</pre> |
| * <p> |
| * The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server |
| * as well as a background I/O thread that is responsible for turning these records into requests and transmitting them |
| * to the cluster. Failure to close the producer after use will leak these resources. |
| * <p> |
| * The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends |
| * and immediately returns. This allows the producer to batch together individual records for efficiency. |
| * <p> |
| * The <code>acks</code> config controls the criteria under which requests are considered complete. The "all" setting |
| * we have specified will result in blocking on the full commit of the record, the slowest but most durable setting. |
| * <p> |
| * If the request fails, the producer can automatically retry, though since we have specified <code>retries</code> |
| * as 0 it won't. Enabling retries also opens up the possibility of duplicates (see the documentation on |
| * <a href="http://kafka.apache.org/documentation.html#semantics">message delivery semantics</a> for details). |
| * <p> |
| * The producer maintains buffers of unsent records for each partition. These buffers are of a size specified by |
| * the <code>batch.size</code> config. Making this larger can result in more batching, but requires more memory (since we will |
| * generally have one of these buffers for each active partition). |
| * <p> |
| * By default a buffer is available to send immediately even if there is additional unused space in the buffer. However if you |
| * want to reduce the number of requests you can set <code>linger.ms</code> to something greater than 0. This will |
| * instruct the producer to wait up to that number of milliseconds before sending a request in hope that more records will |
| * arrive to fill up the same batch. This is analogous to Nagle's algorithm in TCP. For example, in the code snippet above, |
| * likely all 100 records would be sent in a single request since we set our linger time to 1 millisecond. However this setting |
| * would add 1 millisecond of latency to our request waiting for more records to arrive if we didn't fill up the buffer. Note that |
| * records that arrive close together in time will generally batch together even with <code>linger.ms=0</code> so under heavy load |
| * batching will occur regardless of the linger configuration; however setting this to something larger than 0 can lead to fewer, more |
| * efficient requests when not under maximal load at the cost of a small amount of latency. |
| * <p> |
| * The <code>buffer.memory</code> controls the total amount of memory available to the producer for buffering. If records |
| * are sent faster than they can be transmitted to the server then this buffer space will be exhausted. When the buffer space is |
| * exhausted additional send calls will block. For uses where you want to avoid any blocking you can set <code>block.on.buffer.full=false</code> which |
| * will cause the send call to result in an exception. |
| * <p> |
| * The <code>key.serializer</code> and <code>value.serializer</code> instruct how to turn the key and value objects the user provides with |
| * their <code>ProducerRecord</code> into bytes. You can use the included {@link org.apache.kafka.common.serialization.ByteArraySerializer} or |
| * {@link org.apache.kafka.common.serialization.StringSerializer} for simple string or byte types. |
| */ |
| public class KafkaProducer<K, V> implements Producer<K, V> { |
| |
| private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class); |
| private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); |
| private static final String JMX_PREFIX = "kafka.producer"; |
| |
| private String clientId; |
| private final Partitioner partitioner; |
| private final int maxRequestSize; |
| private final long totalMemorySize; |
| private final Metadata metadata; |
| private final RecordAccumulator accumulator; |
| private final Sender sender; |
| private final Metrics metrics; |
| private final Thread ioThread; |
| private final CompressionType compressionType; |
| private final Sensor errors; |
| private final Time time; |
| private final Serializer<K> keySerializer; |
| private final Serializer<V> valueSerializer; |
| private final ProducerConfig producerConfig; |
| private final long maxBlockTimeMs; |
| private final int requestTimeoutMs; |
| private final ProducerInterceptors<K, V> interceptors; |
| |
| /** |
| * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings |
| * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. Values can be |
| * either strings or Objects of the appropriate type (for example a numeric configuration would accept either the |
| * string "42" or the integer 42). |
| * @param configs The producer configs |
| * |
| */ |
| public KafkaProducer(Map<String, Object> configs) { |
| this(new ProducerConfig(configs), null, null); |
| } |
| |
| /** |
| * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}. |
| * Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. |
| * Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept |
| * either the string "42" or the integer 42). |
| * @param configs The producer configs |
| * @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be |
| * called in the producer when the serializer is passed in directly. |
| * @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't |
| * be called in the producer when the serializer is passed in directly. |
| */ |
| public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) { |
| this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)), |
| keySerializer, valueSerializer); |
| } |
| |
| /** |
| * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings |
| * are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. |
| * @param properties The producer configs |
| */ |
| public KafkaProducer(Properties properties) { |
| this(new ProducerConfig(properties), null, null); |
| } |
| |
| /** |
| * A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}. |
| * Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>. |
| * @param properties The producer configs |
| * @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be |
| * called in the producer when the serializer is passed in directly. |
| * @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't |
| * be called in the producer when the serializer is passed in directly. |
| */ |
| public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) { |
| this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)), |
| keySerializer, valueSerializer); |
| } |
| |
| @SuppressWarnings({"unchecked", "deprecation"}) |
| private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) { |
| try { |
| log.trace("Starting the Kafka producer"); |
| Map<String, Object> userProvidedConfigs = config.originals(); |
| this.producerConfig = config; |
| this.time = new SystemTime(); |
| |
| clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); |
| if (clientId.length() <= 0) |
| clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); |
| Map<String, String> metricTags = new LinkedHashMap<String, String>(); |
| metricTags.put("client-id", clientId); |
| MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) |
| .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) |
| .tags(metricTags); |
| List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, |
| MetricsReporter.class); |
| reporters.add(new JmxReporter(JMX_PREFIX)); |
| this.metrics = new Metrics(metricConfig, reporters, time); |
| this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); |
| long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); |
| this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); |
| this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); |
| this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); |
| this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); |
| /* check for user defined settings. |
| * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG. |
| * This should be removed with release 0.9 when the deprecated configs are removed. |
| */ |
| if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) { |
| log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " + |
| "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); |
| boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG); |
| if (blockOnBufferFull) { |
| this.maxBlockTimeMs = Long.MAX_VALUE; |
| } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { |
| log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + |
| "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); |
| this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); |
| } else { |
| this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); |
| } |
| } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { |
| log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + |
| "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); |
| this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); |
| } else { |
| this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); |
| } |
| |
| /* check for user defined settings. |
| * If the TIME_OUT config is set use that for request timeout. |
| * This should be removed with release 0.9 |
| */ |
| if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) { |
| log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " + |
| ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); |
| this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG); |
| } else { |
| this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); |
| } |
| |
| this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), |
| this.totalMemorySize, |
| this.compressionType, |
| config.getLong(ProducerConfig.LINGER_MS_CONFIG), |
| retryBackoffMs, |
| metrics, |
| time); |
| List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); |
| this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); |
| ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); |
| NetworkClient client = new NetworkClient( |
| new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder), |
| this.metadata, |
| clientId, |
| config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), |
| config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), |
| config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), |
| config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), |
| this.requestTimeoutMs, time); |
| this.sender = new Sender(client, |
| this.metadata, |
| this.accumulator, |
| config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1, |
| config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), |
| (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), |
| config.getInt(ProducerConfig.RETRIES_CONFIG), |
| this.metrics, |
| new SystemTime(), |
| clientId, |
| this.requestTimeoutMs); |
| String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : ""); |
| this.ioThread = new KafkaThread(ioThreadName, this.sender, true); |
| this.ioThread.start(); |
| |
| this.errors = this.metrics.sensor("errors"); |
| |
| if (keySerializer == null) { |
| this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, |
| Serializer.class); |
| this.keySerializer.configure(config.originals(), true); |
| } else { |
| config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); |
| this.keySerializer = keySerializer; |
| } |
| if (valueSerializer == null) { |
| this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, |
| Serializer.class); |
| this.valueSerializer.configure(config.originals(), false); |
| } else { |
| config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); |
| this.valueSerializer = valueSerializer; |
| } |
| |
| // load interceptors and make sure they get clientId |
| userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); |
| List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, |
| ProducerInterceptor.class); |
| this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList); |
| |
| config.logUnused(); |
| AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); |
| log.debug("Kafka producer started"); |
| } catch (Throwable t) { |
| // call close methods if internal objects are already constructed |
| // this is to prevent resource leak. see KAFKA-2121 |
| close(0, TimeUnit.MILLISECONDS, true); |
| // now propagate the exception |
| throw new KafkaException("Failed to construct kafka producer", t); |
| } |
| } |
| |
| private static int parseAcks(String acksString) { |
| try { |
| return acksString.trim().toLowerCase().equals("all") ? -1 : Integer.parseInt(acksString.trim()); |
| } catch (NumberFormatException e) { |
| throw new ConfigException("Invalid configuration value for 'acks': " + acksString); |
| } |
| } |
| |
| /** |
| * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. |
| * See {@link #send(ProducerRecord, Callback)} for details. |
| */ |
| @Override |
| public Future<RecordMetadata> send(ProducerRecord<K, V> record) { |
| return send(record, null); |
| } |
| |
| /** |
| * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged. |
| * <p> |
| * The send is asynchronous and this method will return immediately once the record has been stored in the buffer of |
| * records waiting to be sent. This allows sending many records in parallel without blocking to wait for the |
| * response after each one. |
| * <p> |
| * The result of the send is a {@link RecordMetadata} specifying the partition the record was sent to, the offset |
| * it was assigned and the timestamp of the record. If |
| * {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime} is used by the topic, the timestamp |
| * will be the user provided timestamp or the record send time if the user did not specify a timestamp for the |
| * record. If {@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime} is used for the |
| * topic, the timestamp will be the Kafka broker local time when the message is appended. |
| * <p> |
| * Since the send call is asynchronous it returns a {@link java.util.concurrent.Future Future} for the |
| * {@link RecordMetadata} that will be assigned to this record. Invoking {@link java.util.concurrent.Future#get() |
| * get()} on this future will block until the associated request completes and then return the metadata for the record |
| * or throw any exception that occurred while sending the record. |
| * <p> |
| * If you want to simulate a simple blocking call you can call the <code>get()</code> method immediately: |
| * |
| * <pre> |
| * {@code |
| * byte[] key = "key".getBytes(); |
| * byte[] value = "value".getBytes(); |
| * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value) |
| * producer.send(record).get(); |
| * }</pre> |
| * <p> |
| * Fully non-blocking usage can make use of the {@link Callback} parameter to provide a callback that |
| * will be invoked when the request is complete. |
| * |
| * <pre> |
| * {@code |
| * ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); |
| * producer.send(myRecord, |
| * new Callback() { |
| * public void onCompletion(RecordMetadata metadata, Exception e) { |
| * if(e != null) |
| * e.printStackTrace(); |
| * System.out.println("The offset of the record we just sent is: " + metadata.offset()); |
| * } |
| * }); |
| * } |
| * </pre> |
| * |
| * Callbacks for records being sent to the same partition are guaranteed to execute in order. That is, in the |
| * following example <code>callback1</code> is guaranteed to execute before <code>callback2</code>: |
| * |
| * <pre> |
| * {@code |
| * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1); |
| * producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2); |
| * } |
| * </pre> |
| * <p> |
| * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or |
| * they will delay the sending of messages from other threads. If you want to execute blocking or computationally |
| * expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body |
| * to parallelize processing. |
| * |
| * @param record The record to send |
| * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null |
| * indicates no callback) |
| * |
| * @throws InterruptException If the thread is interrupted while blocked |
| * @throws SerializationException If the key or value are not valid objects given the configured serializers |
| * @throws BufferExhaustedException If <code>block.on.buffer.full=false</code> and the buffer is full. |
| * |
| */ |
| @Override |
| public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { |
| // intercept the record, which can be potentially modified; this method does not throw exceptions |
| ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); |
| return doSend(interceptedRecord, callback); |
| } |
| |
| /** |
| * Implementation of asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. |
| * See {@link #send(ProducerRecord, Callback)} for details. |
| */ |
| private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { |
| TopicPartition tp = null; |
| try { |
| // first make sure the metadata for the topic is available |
| long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs); |
| long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs); |
| byte[] serializedKey; |
| try { |
| serializedKey = keySerializer.serialize(record.topic(), record.key()); |
| } catch (ClassCastException cce) { |
| throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + |
| " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + |
| " specified in key.serializer"); |
| } |
| byte[] serializedValue; |
| try { |
| serializedValue = valueSerializer.serialize(record.topic(), record.value()); |
| } catch (ClassCastException cce) { |
| throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + |
| " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + |
| " specified in value.serializer"); |
| } |
| int partition = partition(record, serializedKey, serializedValue, metadata.fetch()); |
| int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); |
| ensureValidRecordSize(serializedSize); |
| tp = new TopicPartition(record.topic(), partition); |
| long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); |
| log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); |
| // producer callback will make sure to call both 'callback' and interceptor callback |
| Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp); |
| RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs); |
| if (result.batchIsFull || result.newBatchCreated) { |
| log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); |
| this.sender.wakeup(); |
| } |
| return result.future; |
| // handling exceptions and record the errors; |
| // for API exceptions return them in the future, |
| // for other exceptions throw directly |
| } catch (ApiException e) { |
| log.debug("Exception occurred during message send:", e); |
| if (callback != null) |
| callback.onCompletion(null, e); |
| this.errors.record(); |
| if (this.interceptors != null) |
| this.interceptors.onSendError(record, tp, e); |
| return new FutureFailure(e); |
| } catch (InterruptedException e) { |
| this.errors.record(); |
| if (this.interceptors != null) |
| this.interceptors.onSendError(record, tp, e); |
| throw new InterruptException(e); |
| } catch (BufferExhaustedException e) { |
| this.errors.record(); |
| this.metrics.sensor("buffer-exhausted-records").record(); |
| if (this.interceptors != null) |
| this.interceptors.onSendError(record, tp, e); |
| throw e; |
| } catch (KafkaException e) { |
| this.errors.record(); |
| if (this.interceptors != null) |
| this.interceptors.onSendError(record, tp, e); |
| throw e; |
| } catch (Exception e) { |
| // we notify interceptor about all exceptions, since onSend is called before anything else in this method |
| if (this.interceptors != null) |
| this.interceptors.onSendError(record, tp, e); |
| throw e; |
| } |
| } |
| |
| /** |
| * Wait for cluster metadata including partitions for the given topic to be available. |
| * @param topic The topic we want metadata for |
| * @param maxWaitMs The maximum time in ms for waiting on the metadata |
| * @return The amount of time we waited in ms |
| */ |
| private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException { |
| // add topic to metadata topic list if it is not there already. |
| if (!this.metadata.containsTopic(topic)) |
| this.metadata.add(topic); |
| |
| if (metadata.fetch().partitionsForTopic(topic) != null) |
| return 0; |
| |
| long begin = time.milliseconds(); |
| long remainingWaitMs = maxWaitMs; |
| while (metadata.fetch().partitionsForTopic(topic) == null) { |
| log.trace("Requesting metadata update for topic {}.", topic); |
| int version = metadata.requestUpdate(); |
| sender.wakeup(); |
| metadata.awaitUpdate(version, remainingWaitMs); |
| long elapsed = time.milliseconds() - begin; |
| if (elapsed >= maxWaitMs) |
| throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms."); |
| if (metadata.fetch().unauthorizedTopics().contains(topic)) |
| throw new TopicAuthorizationException(topic); |
| remainingWaitMs = maxWaitMs - elapsed; |
| } |
| return time.milliseconds() - begin; |
| } |
| |
| /** |
| * Validate that the record size isn't too large |
| */ |
| private void ensureValidRecordSize(int size) { |
| if (size > this.maxRequestSize) |
| throw new RecordTooLargeException("The message is " + size + |
| " bytes when serialized which is larger than the maximum request size you have configured with the " + |
| ProducerConfig.MAX_REQUEST_SIZE_CONFIG + |
| " configuration."); |
| if (size > this.totalMemorySize) |
| throw new RecordTooLargeException("The message is " + size + |
| " bytes when serialized which is larger than the total memory buffer you have configured with the " + |
| ProducerConfig.BUFFER_MEMORY_CONFIG + |
| " configuration."); |
| } |
| |
| /** |
| * Invoking this method makes all buffered records immediately available to send (even if <code>linger.ms</code> is |
| * greater than 0) and blocks on the completion of the requests associated with these records. The post-condition |
| * of <code>flush()</code> is that any previously sent record will have completed (e.g. <code>Future.isDone() == true</code>). |
| * A request is considered completed when it is successfully acknowledged |
| * according to the <code>acks</code> configuration you have specified or else it results in an error. |
| * <p> |
| * Other threads can continue sending records while one thread is blocked waiting for a flush call to complete, |
| * however no guarantee is made about the completion of records sent after the flush call begins. |
| * <p> |
| * This method can be useful when consuming from some input system and producing into Kafka. The <code>flush()</code> call |
| * gives a convenient way to ensure all previously sent messages have actually completed. |
| * <p> |
| * This example shows how to consume from one Kafka topic and produce to another Kafka topic: |
| * <pre> |
| * {@code |
| * for(ConsumerRecord<String, String> record: consumer.poll(100)) |
| * producer.send(new ProducerRecord("my-topic", record.key(), record.value()); |
| * producer.flush(); |
| * consumer.commit(); |
| * } |
| * </pre> |
| * |
| * Note that the above example may drop records if the produce request fails. If we want to ensure that this does not occur |
| * we need to set <code>retries=<large_number></code> in our config. |
| * |
| * @throws InterruptException If the thread is interrupted while blocked |
| */ |
| @Override |
| public void flush() { |
| log.trace("Flushing accumulated records in producer."); |
| this.accumulator.beginFlush(); |
| this.sender.wakeup(); |
| try { |
| this.accumulator.awaitFlushCompletion(); |
| } catch (InterruptedException e) { |
| throw new InterruptException("Flush interrupted.", e); |
| } |
| } |
| |
| /** |
| * Get the partition metadata for the give topic. This can be used for custom partitioning. |
| * @throws InterruptException If the thread is interrupted while blocked |
| */ |
| @Override |
| public List<PartitionInfo> partitionsFor(String topic) { |
| try { |
| waitOnMetadata(topic, this.maxBlockTimeMs); |
| } catch (InterruptedException e) { |
| throw new InterruptException(e); |
| } |
| return this.metadata.fetch().partitionsForTopic(topic); |
| } |
| |
| /** |
| * Get the full set of internal metrics maintained by the producer. |
| */ |
| @Override |
| public Map<MetricName, ? extends Metric> metrics() { |
| return Collections.unmodifiableMap(this.metrics.metrics()); |
| } |
| |
| /** |
| * Close this producer. This method blocks until all previously sent requests complete. |
| * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>. |
| * <p> |
| * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS) |
| * will be called instead. We do this because the sender thread would otherwise try to join itself and |
| * block forever.</strong> |
| * <p> |
| * |
| * @throws InterruptException If the thread is interrupted while blocked |
| */ |
| @Override |
| public void close() { |
| close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); |
| } |
| |
| /** |
| * This method waits up to <code>timeout</code> for the producer to complete the sending of all incomplete requests. |
| * <p> |
| * If the producer is unable to complete all requests before the timeout expires, this method will fail |
| * any unsent and unacknowledged records immediately. |
| * <p> |
| * If invoked from within a {@link Callback} this method will not block and will be equivalent to |
| * <code>close(0, TimeUnit.MILLISECONDS)</code>. This is done since no further sending will happen while |
| * blocking the I/O thread of the producer. |
| * |
| * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be |
| * non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete. |
| * @param timeUnit The time unit for the <code>timeout</code> |
| * @throws InterruptException If the thread is interrupted while blocked |
| * @throws IllegalArgumentException If the <code>timeout</code> is negative. |
| */ |
| @Override |
| public void close(long timeout, TimeUnit timeUnit) { |
| close(timeout, timeUnit, false); |
| } |
| |
| private void close(long timeout, TimeUnit timeUnit, boolean swallowException) { |
| if (timeout < 0) |
| throw new IllegalArgumentException("The timeout cannot be negative."); |
| |
| log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout)); |
| // this will keep track of the first encountered exception |
| AtomicReference<Throwable> firstException = new AtomicReference<Throwable>(); |
| boolean invokedFromCallback = Thread.currentThread() == this.ioThread; |
| if (timeout > 0) { |
| if (invokedFromCallback) { |
| log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " + |
| "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout); |
| } else { |
| // Try to close gracefully. |
| if (this.sender != null) |
| this.sender.initiateClose(); |
| if (this.ioThread != null) { |
| try { |
| this.ioThread.join(timeUnit.toMillis(timeout)); |
| } catch (InterruptedException t) { |
| firstException.compareAndSet(null, t); |
| log.error("Interrupted while joining ioThread", t); |
| } |
| } |
| } |
| } |
| |
| if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) { |
| log.info("Proceeding to force close the producer since pending requests could not be completed " + |
| "within timeout {} ms.", timeout); |
| this.sender.forceClose(); |
| // Only join the sender thread when not calling from callback. |
| if (!invokedFromCallback) { |
| try { |
| this.ioThread.join(); |
| } catch (InterruptedException e) { |
| firstException.compareAndSet(null, e); |
| } |
| } |
| } |
| |
| ClientUtils.closeQuietly(interceptors, "producer interceptors", firstException); |
| ClientUtils.closeQuietly(metrics, "producer metrics", firstException); |
| ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException); |
| ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException); |
| AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId); |
| log.debug("The Kafka producer has closed."); |
| if (firstException.get() != null && !swallowException) |
| throw new KafkaException("Failed to close kafka producer", firstException.get()); |
| } |
| |
| /** |
| * computes partition for given record. |
| * if the record has partition returns the value otherwise |
| * calls configured partitioner class to compute the partition. |
| */ |
| private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) { |
| Integer partition = record.partition(); |
| if (partition != null) { |
| List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic()); |
| int lastPartition = partitions.size() - 1; |
| // they have given us a partition, use it |
| if (partition < 0 || partition > lastPartition) { |
| throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition)); |
| } |
| return partition; |
| } |
| return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, |
| cluster); |
| } |
| |
| private static class FutureFailure implements Future<RecordMetadata> { |
| |
| private final ExecutionException exception; |
| |
| public FutureFailure(Exception exception) { |
| this.exception = new ExecutionException(exception); |
| } |
| |
| @Override |
| public boolean cancel(boolean interrupt) { |
| return false; |
| } |
| |
| @Override |
| public RecordMetadata get() throws ExecutionException { |
| throw this.exception; |
| } |
| |
| @Override |
| public RecordMetadata get(long timeout, TimeUnit unit) throws ExecutionException { |
| throw this.exception; |
| } |
| |
| @Override |
| public boolean isCancelled() { |
| return false; |
| } |
| |
| @Override |
| public boolean isDone() { |
| return true; |
| } |
| |
| } |
| |
| /** |
| * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and |
| * notifies producer interceptors about the request completion. |
| */ |
| private static class InterceptorCallback<K, V> implements Callback { |
| private final Callback userCallback; |
| private final ProducerInterceptors<K, V> interceptors; |
| private final TopicPartition tp; |
| |
| public InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, |
| TopicPartition tp) { |
| this.userCallback = userCallback; |
| this.interceptors = interceptors; |
| this.tp = tp; |
| } |
| |
| public void onCompletion(RecordMetadata metadata, Exception exception) { |
| if (this.interceptors != null) { |
| if (metadata == null) { |
| this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, Record.NO_TIMESTAMP, -1, -1, -1), |
| exception); |
| } else { |
| this.interceptors.onAcknowledgement(metadata, exception); |
| } |
| } |
| if (this.userCallback != null) |
| this.userCallback.onCompletion(metadata, exception); |
| } |
| } |
| } |