Kafka bolt (#3324)

* update KafkaSpout

* Revert "update KafkaSpout"

This reverts commit dc64e9ad

* implement a Kafka Bolt

* add unit tests for Effective Once and At Most Once mode
diff --git a/contrib/bolts/kafka/src/java/BUILD b/contrib/bolts/kafka/src/java/BUILD
new file mode 100644
index 0000000..1df060c
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/BUILD
@@ -0,0 +1,29 @@
+licenses(["notice"])
+
+package(default_visibility = ["//visibility:public"])
+
+load("//tools/rules:build_defs.bzl", "DOCLINT_HTML_AND_SYNTAX")
+load("//tools/rules:javadoc.bzl", "java_doc")
+
+java_doc(
+    name = "heron-kafka-bolt-javadoc",
+    libs = [":heron-kafka-bolt-java"],
+    pkgs = ["org/apache/heron/bolts/kafka"],
+    title = "Kafka Bolt Documentation",
+)
+
+kafka_bolt_deps = [
+    "//storm-compatibility/src/java:storm-compatibility-java-neverlink",
+    "//heron/api/src/java:api-java-low-level",
+    "//heron/common/src/java:basics-java",
+    "//heron/common/src/java:config-java",
+    "//third_party/java:logging",
+    "@org_apache_kafka_kafka_clients//jar",
+]
+
+java_library(
+    name = "heron-kafka-bolt-java",
+    srcs = glob(["org/apache/heron/bolts/kafka/**/*.java"]),
+    javacopts = DOCLINT_HTML_AND_SYNTAX,
+    deps = kafka_bolt_deps,
+)
\ No newline at end of file
diff --git a/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactory.java b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactory.java
new file mode 100644
index 0000000..3fdd223
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactory.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+
+@SuppressWarnings("unused")
+public class DefaultKafkaProducerFactory<K, V> implements KafkaProducerFactory<K, V> {
+  private static final long serialVersionUID = -2184222924531815883L;
+  private Map<String, Object> configs;
+
+  public DefaultKafkaProducerFactory(Map<String, Object> configs) {
+    this.configs = configs;
+  }
+
+  @Override
+  public Producer<K, V> create() {
+    return new KafkaProducer<>(configs);
+  }
+}
diff --git a/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaBolt.java b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaBolt.java
new file mode 100644
index 0000000..95ce3f8
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaBolt.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.BaseRichBolt;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.topology.OutputFieldsDeclarer;
+import org.apache.heron.api.topology.TopologyContext;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
+
+@SuppressWarnings("unused")
+public class KafkaBolt<K, V> extends BaseRichBolt {
+  private static final long serialVersionUID = -3301619269473733618L;
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+  private KafkaProducerFactory<K, V> kafkaProducerFactory;
+  private TupleTransformer<K, V> tupleTransformer;
+  private transient Producer<K, V> producer;
+  private Config.TopologyReliabilityMode topologyReliabilityMode;
+  private transient OutputCollector outputCollector;
+
+  @SuppressWarnings("WeakerAccess")
+  public KafkaBolt(KafkaProducerFactory<K, V> kafkaProducerFactory,
+                   TupleTransformer<K, V> tupleTransformer) {
+    this.kafkaProducerFactory = kafkaProducerFactory;
+    this.tupleTransformer = tupleTransformer;
+  }
+
+  @Override
+  public void prepare(Map<String, Object> heronConf, TopologyContext context,
+                      OutputCollector collector) {
+    topologyReliabilityMode = Config.TopologyReliabilityMode
+        .valueOf(heronConf.getOrDefault(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE).toString());
+    producer = kafkaProducerFactory.create();
+    outputCollector = collector;
+  }
+
+  @Override
+  public void cleanup() {
+    super.cleanup();
+    if (producer != null) {
+      producer.close();
+    }
+  }
+
+  @Override
+  public void execute(Tuple input) {
+    ProducerRecord<K, V> producerRecord = new ProducerRecord<>(
+        tupleTransformer.getTopicName(input),
+        tupleTransformer.transformToKey(input),
+        tupleTransformer.transformToValue(input));
+    if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
+      try {
+        producer.send(producerRecord).get();
+      } catch (InterruptedException e) {
+        LOG.error("interrupted while waiting for the record to be sent", e);
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        LOG.error("error has occurred when sending record to Kafka", e);
+        throw new KafkaException(e);
+      }
+    } else {
+      producer.send(producerRecord, (recordMetadata, e) -> {
+        if (e != null) {
+          LOG.error("error has occurred when sending record to Kafka", e);
+          if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+            outputCollector.fail(input);
+          }
+        } else if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
+          outputCollector.ack(input);
+        }
+      });
+    }
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    //Kafka bolt does not emit anything
+  }
+}
diff --git a/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaProducerFactory.java b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaProducerFactory.java
new file mode 100644
index 0000000..6a7ba65
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/KafkaProducerFactory.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.io.Serializable;
+
+import org.apache.kafka.clients.producer.Producer;
+
+public interface KafkaProducerFactory<K, V> extends Serializable {
+  Producer<K, V> create();
+}
diff --git a/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/TupleTransformer.java b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/TupleTransformer.java
new file mode 100644
index 0000000..8611aef
--- /dev/null
+++ b/contrib/bolts/kafka/src/java/org/apache/heron/bolts/kafka/TupleTransformer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.io.Serializable;
+
+import org.apache.heron.api.tuple.Tuple;
+
+public interface TupleTransformer<K, V> extends Serializable {
+
+  default K transformToKey(Tuple tuple) {
+    if (tuple.contains("key")) {
+      //noinspection unchecked
+      return (K) tuple.getValueByField("key");
+    }
+    return null;
+  }
+
+  default V transformToValue(Tuple tuple) {
+    if (tuple.contains("value")) {
+      //noinspection unchecked
+      return (V) tuple.getValueByField("value");
+    }
+    return null;
+  }
+
+  String getTopicName(Tuple tuple);
+}
diff --git a/contrib/bolts/kafka/test/java/BUILD b/contrib/bolts/kafka/test/java/BUILD
new file mode 100644
index 0000000..e713a6c
--- /dev/null
+++ b/contrib/bolts/kafka/test/java/BUILD
@@ -0,0 +1,23 @@
+heron_kafka_bolts_test_dep = [
+    "//contrib/bolts/kafka/src/java:heron-kafka-bolt-java",
+    "//heron/api/src/java:api-java-low-level",
+    "//heron/common/src/java:basics-java",
+    "//heron/common/src/java:config-java",
+    "//third_party/java:junit4",
+    "@org_apache_kafka_kafka_clients//jar",
+    "@org_mockito_mockito_all//jar",
+]
+
+java_test(
+    name = "KafkaBoltTest",
+    srcs = ["org/apache/heron/bolts/kafka/KafkaBoltTest.java"],
+    test_class = "org.apache.heron.bolts.kafka.KafkaBoltTest",
+    deps = heron_kafka_bolts_test_dep,
+)
+
+java_test(
+    name = "DefaultKafkaProducerFactoryTest",
+    srcs = ["org/apache/heron/bolts/kafka/DefaultKafkaProducerFactoryTest.java"],
+    test_class = "org.apache.heron.bolts.kafka.DefaultKafkaProducerFactoryTest",
+    deps = heron_kafka_bolts_test_dep,
+)
diff --git a/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactoryTest.java b/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactoryTest.java
new file mode 100644
index 0000000..18a573b
--- /dev/null
+++ b/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/DefaultKafkaProducerFactoryTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import static org.junit.Assert.assertNotNull;
+
+public class DefaultKafkaProducerFactoryTest {
+
+  @Test
+  public void create() {
+    Map<String, Object> configs = new HashMap<>();
+    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    assertNotNull(new DefaultKafkaProducerFactory<>(configs).create());
+  }
+}
diff --git a/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/KafkaBoltTest.java b/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/KafkaBoltTest.java
new file mode 100644
index 0000000..9572b11
--- /dev/null
+++ b/contrib/bolts/kafka/test/java/org/apache/heron/bolts/kafka/KafkaBoltTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.heron.bolts.kafka;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import org.apache.heron.api.Config;
+import org.apache.heron.api.bolt.OutputCollector;
+import org.apache.heron.api.tuple.Tuple;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATLEAST_ONCE;
+import static org.apache.heron.api.Config.TopologyReliabilityMode.ATMOST_ONCE;
+import static org.apache.heron.api.Config.TopologyReliabilityMode.EFFECTIVELY_ONCE;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaBoltTest {
+  @Mock
+  private KafkaProducerFactory<String, byte[]> kafkaProducerFactory;
+  @Mock
+  private Producer<String, byte[]> producer;
+  @Mock
+  private TupleTransformer<String, byte[]> tupleTransformer;
+  @Mock
+  private OutputCollector outputCollector;
+  @Mock
+  private Tuple tuple;
+  @Mock
+  private Future<RecordMetadata> future;
+  private KafkaBolt<String, byte[]> kafkaBolt;
+
+  @Before
+  public void setUp() {
+    when(kafkaProducerFactory.create()).thenReturn(producer);
+    kafkaBolt = new KafkaBolt<>(kafkaProducerFactory, tupleTransformer);
+  }
+
+  @Test
+  public void cleanup() {
+    kafkaBolt.prepare(Collections.emptyMap(), null, outputCollector);
+    kafkaBolt.cleanup();
+    verify(producer).close();
+  }
+
+  @Test
+  public void executeATLEASTONCE() {
+    kafkaBolt.prepare(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATLEAST_ONCE),
+        null, outputCollector);
+    when(tupleTransformer.transformToKey(tuple)).thenReturn("key");
+    byte[] value = new byte[]{1, 2, 3};
+    when(tupleTransformer.transformToValue(tuple)).thenReturn(value);
+    when(tupleTransformer.getTopicName(tuple)).thenReturn("topic");
+
+    ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>("topic", "key", value);
+    when(producer.send(eq(producerRecord), any(Callback.class)))
+        .then((Answer<Future<RecordMetadata>>) invocationOnMock -> {
+          invocationOnMock.getArgumentAt(1, Callback.class)
+              .onCompletion(new RecordMetadata(null, 0, 0, 0, null, 0, 0), null);
+          return future;
+        });
+    kafkaBolt.execute(tuple);
+    verify(outputCollector).ack(tuple);
+
+    when(producer.send(eq(producerRecord), any(Callback.class)))
+        .then((Answer<Future<RecordMetadata>>) invocationOnMock -> {
+          invocationOnMock.getArgumentAt(1, Callback.class)
+              .onCompletion(new RecordMetadata(null, 0, 0, 0, null, 0, 0), new Exception());
+          return future;
+        });
+    kafkaBolt.execute(tuple);
+    verify(outputCollector).fail(tuple);
+  }
+
+  @Test(expected = KafkaException.class)
+  public void executeEFFECTIVEONCE() throws ExecutionException, InterruptedException {
+    kafkaBolt.prepare(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, EFFECTIVELY_ONCE),
+        null, outputCollector);
+    when(tupleTransformer.transformToKey(tuple)).thenReturn("key");
+    byte[] value = new byte[]{1, 2, 3};
+    when(tupleTransformer.transformToValue(tuple)).thenReturn(value);
+    when(tupleTransformer.getTopicName(tuple)).thenReturn("topic");
+
+    ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>("topic", "key", value);
+    when(producer.send(producerRecord)).thenReturn(future);
+    kafkaBolt.execute(tuple);
+    verify(future).get();
+
+    when(future.get()).thenThrow(ExecutionException.class);
+    kafkaBolt.execute(tuple);
+  }
+
+  @Test
+  public void executeATMOSTONCE() {
+    kafkaBolt.prepare(Collections.singletonMap(Config.TOPOLOGY_RELIABILITY_MODE, ATMOST_ONCE),
+        null, outputCollector);
+    when(tupleTransformer.transformToKey(tuple)).thenReturn("key");
+    byte[] value = new byte[]{1, 2, 3};
+    when(tupleTransformer.transformToValue(tuple)).thenReturn(value);
+    when(tupleTransformer.getTopicName(tuple)).thenReturn("topic");
+
+    ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>("topic", "key", value);
+    kafkaBolt.execute(tuple);
+    verify(producer).send(eq(producerRecord), any(Callback.class));
+  }
+}