| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.twill.internal.kafka.client; |
| |
| import com.google.common.base.Objects; |
| import com.google.common.collect.Lists; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import kafka.javaapi.producer.Producer; |
| import kafka.producer.KeyedMessage; |
| import kafka.producer.ProducerConfig; |
| import org.apache.twill.common.Cancellable; |
| import org.apache.twill.common.Threads; |
| import org.apache.twill.kafka.client.BrokerService; |
| import org.apache.twill.kafka.client.Compression; |
| import org.apache.twill.kafka.client.KafkaPublisher; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.nio.ByteBuffer; |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| /** |
| * Implementation of {@link KafkaPublisher} using the kafka scala-java api. |
| */ |
| final class SimpleKafkaPublisher implements KafkaPublisher { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaPublisher.class); |
| private static final int MAX_MESSAGE_BYTES = 1024 * 1024 * 10; |
| |
| private final BrokerService brokerService; |
| private final Ack ack; |
| private final Compression compression; |
| private final AtomicReference<Producer<Integer, ByteBuffer>> producer; |
| private final AtomicBoolean listenerCancelled; |
| |
| SimpleKafkaPublisher(BrokerService brokerService, Ack ack, Compression compression) { |
| this.brokerService = brokerService; |
| this.ack = ack; |
| this.compression = compression; |
| this.producer = new AtomicReference<>(); |
| this.listenerCancelled = new AtomicBoolean(false); |
| } |
| |
| /** |
| * Start the publisher. This method must be called before other methods. This method is only to be called |
| * by KafkaClientService who own this object. |
| * @return A Cancellable for closing this publish. |
| */ |
| Cancellable start() { |
| ExecutorService listenerExecutor |
| = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("kafka-publisher")); |
| |
| // Listen to changes in broker list |
| final BrokerListChangeListener listener = new BrokerListChangeListener(listenerCancelled, producer, |
| ack, compression); |
| Cancellable cancelChangeListener = brokerService.addChangeListener(listener, listenerExecutor); |
| |
| // Invoke the change listener at least once. Since every call to the listener is through the single thread |
| // executor, there is no race and for sure the listener always see the latest change, either through this call |
| // or from the BrokerService callback. |
| Future<?> completion = listenerExecutor.submit(new Runnable() { |
| @Override |
| public void run() { |
| listener.changed(brokerService); |
| } |
| }); |
| |
| Futures.getUnchecked(completion); |
| return new ProducerCancellable(listenerExecutor, listenerCancelled, cancelChangeListener, producer); |
| } |
| |
| @Override |
| public Preparer prepare(String topic) { |
| return new SimplePreparer(topic); |
| } |
| |
| private final class SimplePreparer implements Preparer { |
| |
| private final String topic; |
| private final List<KeyedMessage<Integer, ByteBuffer>> messages; |
| |
| private SimplePreparer(String topic) { |
| this.topic = topic; |
| this.messages = Lists.newLinkedList(); |
| } |
| |
| @Override |
| public Preparer add(ByteBuffer message, Object partitionKey) { |
| messages.add(new KeyedMessage<>(topic, Math.abs(partitionKey.hashCode()), message)); |
| return this; |
| } |
| |
| @Override |
| public ListenableFuture<Integer> send() { |
| try { |
| int size = messages.size(); |
| Producer<Integer, ByteBuffer> kafkaProducer = producer.get(); |
| if (kafkaProducer == null) { |
| return Futures.immediateFailedFuture(new IllegalStateException("No kafka producer available.")); |
| } |
| kafkaProducer.send(messages); |
| return Futures.immediateFuture(size); |
| } catch (Exception e) { |
| return Futures.immediateFailedFuture(e); |
| } finally { |
| messages.clear(); |
| } |
| } |
| } |
| |
| /** |
| * Listener for watching for changes in broker list. |
| * This needs to be a static class so that no reference to the publisher instance is held in order for |
| * weak reference inside ZKBrokerService to publish works and be able to GC the Publisher instance and hence |
| * closing the underlying kafka producer. |
| */ |
| private static final class BrokerListChangeListener extends BrokerService.BrokerChangeListener { |
| |
| private final AtomicBoolean listenerCancelled; |
| private final AtomicReference<Producer<Integer, ByteBuffer>> producer; |
| private final Ack ack; |
| private final Compression compression; |
| private String brokerList; |
| |
| private BrokerListChangeListener(AtomicBoolean listenerCancelled, |
| AtomicReference<Producer<Integer, ByteBuffer>> producer, |
| Ack ack, Compression compression) { |
| this.listenerCancelled = listenerCancelled; |
| this.producer = producer; |
| this.ack = ack; |
| this.compression = compression; |
| } |
| |
| @Override |
| public void changed(BrokerService brokerService) { |
| if (listenerCancelled.get()) { |
| return; |
| } |
| |
| String newBrokerList = brokerService.getBrokerList(); |
| |
| // If there is no change, whether it is empty or not, just return |
| if (Objects.equal(brokerList, newBrokerList)) { |
| return; |
| } |
| |
| Producer<Integer, ByteBuffer> newProducer = null; |
| if (!newBrokerList.isEmpty()) { |
| Properties props = new Properties(); |
| props.put("metadata.broker.list", newBrokerList); |
| props.put("serializer.class", ByteBufferEncoder.class.getName()); |
| props.put("key.serializer.class", IntegerEncoder.class.getName()); |
| props.put("request.required.acks", Integer.toString(ack.getAck())); |
| props.put("compression.codec", compression.getCodec()); |
| props.put("message.max.bytes", Integer.toString(MAX_MESSAGE_BYTES)); |
| |
| ProducerConfig config = new ProducerConfig(props); |
| newProducer = new Producer<>(config); |
| } |
| |
| // If the broker list is empty, the producer will be set to null |
| Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(newProducer); |
| if (oldProducer != null) { |
| oldProducer.close(); |
| } |
| |
| if (newBrokerList.isEmpty()) { |
| LOG.warn("Empty Kafka producer broker list, publish will fail."); |
| } else { |
| LOG.info("Updated Kafka producer broker list: {}", newBrokerList); |
| } |
| brokerList = newBrokerList; |
| } |
| } |
| |
| /** |
| * For stopping and releasing resource for the publisher. This class shouldn't hold any references to the |
| * Publisher class. |
| */ |
| private static final class ProducerCancellable implements Cancellable, Runnable { |
| private final ExecutorService executor; |
| private final AtomicBoolean listenerCancelled; |
| private final Cancellable cancelChangeListener; |
| private final AtomicReference<Producer<Integer, ByteBuffer>> producer; |
| |
| private ProducerCancellable(ExecutorService executor, AtomicBoolean listenerCancelled, |
| Cancellable cancelChangeListener, |
| AtomicReference<Producer<Integer, ByteBuffer>> producer) { |
| this.executor = executor; |
| this.listenerCancelled = listenerCancelled; |
| this.cancelChangeListener = cancelChangeListener; |
| this.producer = producer; |
| } |
| |
| @Override |
| public void cancel() { |
| if (listenerCancelled.compareAndSet(false, true)) { |
| executor.execute(this); |
| } |
| } |
| |
| @Override |
| public void run() { |
| // Call from cancel() through executor only. |
| cancelChangeListener.cancel(); |
| Producer<Integer, ByteBuffer> kafkaProducer = producer.get(); |
| kafkaProducer.close(); |
| executor.shutdownNow(); |
| } |
| } |
| } |