Kafka bolt (#3324)
* update KafkaSpout
* Revert "update KafkaSpout"
This reverts commit dc64e9ad
* implement a Kafka Bolt
* add unit tests for Effective Once and At Most Once mode
diff --git a/contrib/bolts/kafka/src/java/BUILD b/contrib/bolts/kafka/src/java/BUILD
new file mode 100644
index 0000000..1df060c
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/BUILD
@@ -0,0 +1,29 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+load("//tools/rules:build_defs.bzl", "DOCLINT_HTML_AND_SYNTAX")
+load("//tools/rules:javadoc.bzl", "java_doc")
+
+java_doc(
+ name = "heron-kafka-bolt-javadoc",
+ libs = [":heron-kafka-bolt-java"],
+ pkgs = ["org/apache/heron/bolts/kafka"],
+ title = "Kafka Bolt Documentation",
+)
+
+kafka_bolt_deps = [
+ "//storm-compatibility/src/java:storm-compatibility-java-neverlink",
+ "//heron/api/src/java:api-java-low-level",
+ "//heron/common/src/java:basics-java",
+ "//heron/common/src/java:config-java",
+ "//third_party/java:logging",
+ "@org_apache_kafka_kafka_clients//jar",
+]
+
+java_library(
+ name = "heron-kafka-bolt-java",
+ srcs = glob(["org/apache/heron/bolts/kafka/**/*.java"]),
+ javacopts = DOCLINT_HTML_AND_SYNTAX,
+ deps = kafka_bolt_deps,
+)
\ No newline at end of file
diff --git a/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactory.java b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactory.java
new file mode 100644
index 0000000..3fdd223
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+
+@SuppressWarnings("unused")
+public class DefaultKafkaProducerFactory<K, V> implements KafkaProducerFactory<K, V> {
+ private static final long serialVersionUID = -2184222924531815883L;
+ private Map<String, Object> configs;
+
+ public DefaultKafkaProducerFactory(Map<String, Object> configs) {
+ this.configs = configs;
+ }
+
+ @Override
+ public Producer<K, V> create() {
+ return new KafkaProducer<>(configs);
+ }
+}
diff --git a/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaBolt.java b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaBolt.java
new file mode 100644
index 0000000..95ce3f8
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaBolt.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
+
+@SuppressWarnings("unused")
+public class KafkaBolt<K, V> extends BaseRichBolt {
+ private static final long serialVersionUID = -3301619269473733618L;
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+ private KafkaProducerFactory<K, V> kafkaProducerFactory;
+ private TupleTransformer<K, V> tupleTransformer;
+ private transient Producer<K, V> producer;
+ private Config.TopologyReliabilityMode topologyReliabilityMode;
+ private transient OutputCollector outputCollector;
+
+ @SuppressWarnings("WeakerAccess")
+ public KafkaBolt(KafkaProducerFactory<K, V> kafkaProducerFactory,
+ TupleTransformer<K, V> tupleTransformer) {
+ this.kafkaProducerFactory = kafkaProducerFactory;
+ this.tupleTransformer = tupleTransformer;
+ }
+
+ @Override
+ public void prepare(Map<String, Object> heronConf, TopologyContext context,
+ OutputCollector collector) {
+ topologyReliabilityMode = Config.TopologyReliabilityMode
+ .valueOf(heronConf.getOrDefault(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE).toString());
+ producer = kafkaProducerFactory.create();
+ outputCollector = collector;
+ }
+
+ @Override
+ public void cleanup() {
+ super.cleanup();
+ if (producer != null) {
+ producer.close();
+ }
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ ProducerRecord<K, V> producerRecord = new ProducerRecord<>(
+ tupleTransformer.getTopicName(input),
+ tupleTransformer.transformToKey(input),
+ tupleTransformer.transformToValue(input));
+ if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+ try {
+ producer.send(producerRecord).get();
+ } catch (InterruptedException e) {
+ LOG.error("interrupted while waiting for the record to be sent", e);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ LOG.error("error has occurred when sending record to Kafka", e);
+ throw new KafkaException(e);
+ }
+ } else {
+ producer.send(producerRecord, (recordMetadata, e) -> {
+ if (e != null) {
+ LOG.error("error has occurred when sending record to Kafka", e);
+ if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+ outputCollector.fail(input);
+ }
+ } else if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+ outputCollector.ack(input);
+ }
+ });
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ //Kafka bolt does not emit anything
+ }
+}
diff --git a/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaProducerFactory.java b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaProducerFactory.java
new file mode 100644
index 0000000..6a7ba65
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaProducerFactory.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.io.Serializable;
+
+import org.apache.kafka.clients.producer.Producer;
+
+public interface KafkaProducerFactory<K, V> extends Serializable {
+ Producer<K, V> create();
+}
diff --git a/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/TupleTransformer.java b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/TupleTransformer.java
new file mode 100644
index 0000000..8611aef
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/TupleTransformer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.io.Serializable;
+
+import org.apache.heron.api.tuple.Tuple;
+
+public interface TupleTransformer<K, V> extends Serializable {
+
+ default K transformToKey(Tuple tuple) {
+ if (tuple.contains("key")) {
+ //noinspection unchecked
+ return (K) tuple.getValueByField("key");
+ }
+ return null;
+ }
+
+ default V transformToValue(Tuple tuple) {
+ if (tuple.contains("value")) {
+ //noinspection unchecked
+ return (V) tuple.getValueByField("value");
+ }
+ return null;
+ }
+
+ String getTopicName(Tuple tuple);
+}
diff --git a/contrib/bolts/kafka/test/java/BUILD b/contrib/bolts/kafka/test/java/BUILD
new file mode 100644
index 0000000..e713a6c
--- /dev/null
+++ b/contrib/bolts/kafka/test/java/BUILD
@@ -0,0 +1,23 @@
+heron_kafka_bolts_test_dep = [
+ "//contrib/bolts/kafka/src/java:heron-kafka-bolt-java",
+ "//heron/api/src/java:api-java-low-level",
+ "//heron/common/src/java:basics-java",
+ "//heron/common/src/java:config-java",
+ "//third_party/java:junit4",
+ "@org_apache_kafka_kafka_clients//jar",
+ "@org_mockito_mockito_all//jar",
+]
+
+java_test(
+ name = "KafkaBoltTest",
+ srcs = ["org/apache/heron/bolts/kafka/KafkaBoltTest.java"],
+ test_class = "org.apache.heron.bolts.kafka.KafkaBoltTest",
+ deps = heron_kafka_bolts_test_dep,
+)
+
+java_test(
+ name = "DefaultKafkaProducerFactoryTest",
+ srcs = ["org/apache/heron/bolts/kafka/DefaultKafkaProducerFactoryTest.java"],
+ test_class = "org.apache.heron.bolts.kafka.DefaultKafkaProducerFactoryTest",
+ deps = heron_kafka_bolts_test_dep,
+)
diff --git a/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactoryTest.java b/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactoryTest.java
new file mode 100644
index 0000000..18a573b
--- /dev/null
+++ b/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactoryTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultKafkaProducerFactoryTest {
+
+ @Test
+ public void create() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ assertNotNull(new DefaultKafkaProducerFactory<>(configs).create());
+ }
+}
diff --git a/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/KafkaBoltTest.java b/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/KafkaBoltTest.java
new file mode 100644
index 0000000..9572b11
--- /dev/null
+++ b/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/KafkaBoltTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
+import static org.apache.heron.api.Config.TopologyReliabilityMode.EFFECTIVELY_ONCE;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaBoltTest {
+ @Mock
+ private KafkaProducerFactory<String, byte[]> kafkaProducerFactory;
+ @Mock
+ private Producer<String, byte[]> producer;
+ @Mock
+ private TupleTransformer<String, byte[]> tupleTransformer;
+ @Mock
+ private OutputCollector outputCollector;
+ @Mock
+ private Tuple tuple;
+ @Mock
+ private Future<RecordMetadata> future;
+ private KafkaBolt<String, byte[]> kafkaBolt;
+
+ @Before
+ public void setUp() {
+ when(kafkaProducerFactory.create()).thenReturn(producer);
+ kafkaBolt = new KafkaBolt<>(kafkaProducerFactory, tupleTransformer);
+ }
+
+ @Test
+ public void cleanup() {
+ kafkaBolt.prepare(Collections.emptyMap(), null, outputCollector);
+ kafkaBolt.cleanup();
+ verify(producer).close();
+ }
+
+ @Test
+ public void executeATLEASTONCE() {
+ kafkaBolt.prepare(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATLEAST_ONCE),
+ null, outputCollector);
+ when(tupleTransformer.transformToKey(tuple)).thenReturn("key");
+ byte[] value = new byte[]{1, 2, 3};
+ when(tupleTransformer.transformToValue(tuple)).thenReturn(value);
+ when(tupleTransformer.getTopicName(tuple)).thenReturn("topic");
+
+ ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>("topic", "key", value);
+ when(producer.send(eq(producerRecord), any(Callback.class)))
+ .then((Answer<Future<RecordMetadata>>) invocationOnMock -> {
+ invocationOnMock.getArgumentAt(1, Callback.class)
+ .onCompletion(new RecordMetadata(null, 0, 0, 0, null, 0, 0), null);
+ return future;
+ });
+ kafkaBolt.execute(tuple);
+ verify(outputCollector).ack(tuple);
+
+ when(producer.send(eq(producerRecord), any(Callback.class)))
+ .then((Answer<Future<RecordMetadata>>) invocationOnMock -> {
+ invocationOnMock.getArgumentAt(1, Callback.class)
+ .onCompletion(new RecordMetadata(null, 0, 0, 0, null, 0, 0), new Exception());
+ return future;
+ });
+ kafkaBolt.execute(tuple);
+ verify(outputCollector).fail(tuple);
+ }
+
+ @Test(expected = KafkaException.class)
+ public void executeEFFECTIVEONCE() throws ExecutionException, InterruptedException {
+ kafkaBolt.prepare(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, EFFECTIVELY_ONCE),
+ null, outputCollector);
+ when(tupleTransformer.transformToKey(tuple)).thenReturn("key");
+ byte[] value = new byte[]{1, 2, 3};
+ when(tupleTransformer.transformToValue(tuple)).thenReturn(value);
+ when(tupleTransformer.getTopicName(tuple)).thenReturn("topic");
+
+ ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>("topic", "key", value);
+ when(producer.send(producerRecord)).thenReturn(future);
+ kafkaBolt.execute(tuple);
+ verify(future).get();
+
+ when(future.get()).thenThrow(ExecutionException.class);
+ kafkaBolt.execute(tuple);
+ }
+
+ @Test
+ public void executeATMOSTONCE() {
+ kafkaBolt.prepare(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE),
+ null, outputCollector);
+ when(tupleTransformer.transformToKey(tuple)).thenReturn("key");
+ byte[] value = new byte[]{1, 2, 3};
+ when(tupleTransformer.transformToValue(tuple)).thenReturn(value);
+ when(tupleTransformer.getTopicName(tuple)).thenReturn("topic");
+
+ ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>("topic", "key", value);
+ kafkaBolt.execute(tuple);
+ verify(producer).send(eq(producerRecord), any(Callback.class));
+ }
+}