Merge branch 'NIFI-1676'
diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index 7c90342..46891a4 100644
--- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -169,8 +169,18 @@
 
     @Override
     public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        final int flowFilesBinned = binFlowFiles(context, sessionFactory);
-        getLogger().debug("Binned {} FlowFiles", new Object[]{flowFilesBinned});
+        final int totalBinCount = binManager.getBinCount() + readyBins.size();
+        final int maxBinCount = context.getProperty(MAX_BIN_COUNT).asInteger();
+        final int flowFilesBinned;
+
+        if (totalBinCount < maxBinCount) {
+            flowFilesBinned = binFlowFiles(context, sessionFactory);
+            getLogger().debug("Binned {} FlowFiles", new Object[] {flowFilesBinned});
+        } else {
+            flowFilesBinned = 0;
+            getLogger().debug("Will not bin any FlowFiles because {} bins already exist;"
+                + "will wait until bins have been emptied before any more are created", new Object[] {totalBinCount});
+        }
 
         if (!isScheduled()) {
             return;
@@ -194,7 +204,7 @@
         // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
         // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
         // bins. So we may as well expire it now.
-        if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
+        if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
             final Bin bin = binManager.removeOldestBin();
             if (bin != null) {
                 added++;
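
The rule the two hunks above establish can be summarized in a small, self-contained sketch; the helper class and method names below are illustrative only and are not part of BinFiles:

    final class BinLimitSketch {

        // True while BinFiles may still create bins: bins being filled plus bins
        // already queued as ready both count against the configured maximum.
        static boolean mayBinMoreFlowFiles(int openBins, int readyBins, int maxBinCount) {
            return openBins + readyBins < maxBinCount;
        }

        // True when the oldest open bin should be force-evicted: nothing could be
        // binned this round and the combined bin count has reached the maximum,
        // so waiting for bin expiration would only delay processing.
        static boolean shouldEvictOldestBin(int flowFilesBinned, int openBins, int readyBins, int maxBinCount) {
            return flowFilesBinned == 0 && openBins + readyBins >= maxBinCount;
        }

        public static void main(String[] args) {
            System.out.println(mayBinMoreFlowFiles(3, 2, 5));     // false: already at the limit
            System.out.println(shouldEvictOldestBin(0, 3, 2, 5)); // true: evict and process now
        }
    }
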
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 7057dff..e06befb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -323,9 +323,6 @@
     @OnScheduled
     public void schedule(ProcessContext context) {
         this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
-        if (this.executor == null || this.executor.isShutdown()) {
-            this.executor = Executors.newCachedThreadPool();
-        }
     }
 
     @Override
@@ -335,6 +332,9 @@
          * of onTrigger. Will be reset to 'false' in the event of exception
          */
         synchronized (this.consumerStreamsReady) {
+            if (this.executor == null || this.executor.isShutdown()) {
+                this.executor = Executors.newCachedThreadPool();
+            }
             if (!this.consumerStreamsReady.get()) {
                 Future<Void> f = this.executor.submit(new Callable<Void>() {
                     @Override
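
The two GetKafka hunks above move executor creation out of @OnScheduled and into the synchronized consumer-setup path, so a processor that was stopped (and had its pool shut down) gets a fresh pool the next time work arrives. A minimal, self-contained sketch of that lazy re-creation pattern; the class and member names are illustrative, not GetKafka's actual fields:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    final class LazyExecutorSketch {
        private final Object lock = new Object();
        private volatile ExecutorService executor;

        // Called from the processing path rather than at scheduling time: if a
        // previous stop cycle shut the pool down, a new one is created before
        // any task is submitted.
        ExecutorService acquireExecutor() {
            synchronized (lock) {
                if (executor == null || executor.isShutdown()) {
                    executor = Executors.newCachedThreadPool();
                }
                return executor;
            }
        }

        // Mirrors the shutdown performed on stop.
        void shutdown() {
            synchronized (lock) {
                if (executor != null) {
                    executor.shutdown();
                }
            }
        }
    }
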
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
new file mode 100644
index 0000000..e116978
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.producer.Partitioner;
+
+/**
+ * Wrapper over {@link KafkaProducer} to assist {@link PutKafka} processor with
+ * sending content of {@link FlowFile}s to Kafka.
+ */
+public class KafkaPublisher implements AutoCloseable {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
+
+    private final KafkaProducer<byte[], byte[]> producer;
+
+    private final Partitioner partitioner;
+
+    private final long ackWaitTime;
+
+    private ProcessorLog processLog;
+
+    /**
+     * Creates an instance of this class as well as the instance of the
+     * corresponding Kafka {@link KafkaProducer} using provided Kafka
+     * configuration properties.
+     */
+    KafkaPublisher(Properties kafkaProperties) {
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        this.producer = new KafkaProducer<byte[], byte[]>(kafkaProperties);
+        this.ackWaitTime = Long.parseLong(kafkaProperties.getProperty(ProducerConfig.TIMEOUT_CONFIG)) * 2;
+        try {
+            if (kafkaProperties.containsKey("partitioner.class")){
+                this.partitioner = (Partitioner) Class.forName(kafkaProperties.getProperty("partitioner.class")).newInstance();
+            } else {
+                this.partitioner = null;
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to create partitioner", e);
+        }
+    }
+
+    /**
+     * Sets the {@link ProcessorLog} to use for reporting publish failures in
+     * addition to the standard SLF4J logger.
+     */
+    void setProcessLog(ProcessorLog processLog) {
+        this.processLog = processLog;
+    }
+
+    /**
+     * Publishes messages to Kafka topic. It supports three publishing
+     * mechanisms.
+     * <ul>
+     * <li>Sending the entire content stream as a single Kafka message.</li>
+     * <li>Splitting the incoming content stream into chunks and sending
+     * individual chunks as separate Kafka messages.</li>
+     * <li>Splitting the incoming content stream into chunks and sending only
+     * the chunks that failed previously (see
+     * {@link SplittableMessageContext#getFailedSegments()}).</li>
+     * </ul>
+     * This method assumes content stream affinity, i.e. the content stream
+     * that represents the same Kafka message(s) is expected to remain the
+     * same across possible retries. This is required specifically for cases
+     * where a delimiter is used and a single content stream may represent
+     * multiple Kafka messages. The failed segment list keeps the index of
+     * each content stream segment that failed to be sent to Kafka, so upon
+     * retry only the failed segments are sent.
+     *
+     * @param messageContext
+     *            instance of {@link SplittableMessageContext} which holds
+     *            context information about the message to be sent
+     * @param contentStream
+     *            open {@link InputStream} carrying the content of the
+     *            message(s) to be sent to Kafka
+     * @param partitionKey
+     *            the value of the partition key. Only relevant if the user
+     *            wishes to provide a custom partition key instead of relying
+     *            on one of the provided {@link Partitioner}s
+     * @return a {@link BitSet} whose set bits are the indexes of the segments
+     *         that failed to be sent to Kafka
+     */
+    BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey) {
+        List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
+        BitSet prevFailedSegmentIndexes = messageContext.getFailedSegments();
+        int segmentCounter = 0;
+        StreamScanner scanner = new StreamScanner(contentStream, messageContext.getDelimiterPattern());
+
+        while (scanner.hasNext()) {
+            byte[] content = scanner.next();
+            if (content.length > 0){
+                byte[] key = messageContext.getKeyBytes();
+                String topicName = messageContext.getTopicName();
+                if (partitionKey == null && key != null) {
+                    partitionKey = this.getPartition(key, topicName);
+                }
+                if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.get(segmentCounter)) {
+                    ProducerRecord<byte[], byte[]> message = new ProducerRecord<byte[], byte[]>(topicName, partitionKey, key, content);
+                    sendFutures.add(this.toKafka(message));
+                }
+                segmentCounter++;
+            }
+        }
+        scanner.close();
+        return this.processAcks(sendFutures);
+    }
+
+    /**
+     * Waits up to {@code ackWaitTime} for each send {@link Future} and records
+     * the index of every segment whose acknowledgment was not received.
+     */
+    private BitSet processAcks(List<Future<RecordMetadata>> sendFutures) {
+        int segmentCounter = 0;
+        BitSet failedSegments = new BitSet();
+        for (Future<RecordMetadata> future : sendFutures) {
+            try {
+                future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                failedSegments.set(segmentCounter);
+                Thread.currentThread().interrupt();
+                logger.warn("Interrupted while waiting for acks from Kafka");
+                if (this.processLog != null) {
+                    this.processLog.warn("Interrupted while waiting for acks from Kafka");
+                }
+            } catch (ExecutionException e) {
+                failedSegments.set(segmentCounter);
+                logger.error("Failed while waiting for acks from Kafka", e);
+                if (this.processLog != null) {
+                    this.processLog.error("Failed while waiting for acks from Kafka", e);
+                }
+            } catch (TimeoutException e) {
+                failedSegments.set(segmentCounter);
+                logger.warn("Timed out while waiting for acks from Kafka");
+                if (this.processLog != null) {
+                    this.processLog.warn("Timed out while waiting for acks from Kafka");
+                }
+            }
+            segmentCounter++;
+        }
+        return failedSegments;
+    }
+
+    /**
+     * Determines the partition for the given key using the configured
+     * {@link Partitioner} and the topic's current partition count.
+     */
+    private int getPartition(Object key, String topicName) {
+        int partSize = this.producer.partitionsFor(topicName).size();
+        return this.partitioner.partition(key, partSize);
+    }
+
+    /**
+     * Closes {@link KafkaProducer}
+     */
+    @Override
+    public void close() throws Exception {
+        this.producer.close();
+    }
+
+    /**
+     * Sends the provided {@link ProducerRecord} to Kafka asynchronously,
+     * immediately returning the resulting {@link Future}.
+     */
+    private Future<RecordMetadata> toKafka(ProducerRecord<byte[], byte[]> message) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Publishing message to '" + message.topic() + "' topic.");
+        }
+        return this.producer.send(message);
+    }
+}
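
A sketch of how the class above is intended to be driven (compare PutKafka.onTrigger later in this diff). It assumes the caller lives in the same package, since the constructor and publish(...) are package-private; the broker address and topic are placeholders, and SplittableMessageContext is the companion class from this bundle, whose constructor is assumed to take (topic, key bytes, delimiter pattern) as used by PutKafka below:

    package org.apache.nifi.processors.kafka;

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.BitSet;
    import java.util.Properties;

    final class KafkaPublisherSketch {

        static void sendDelimitedContent() throws Exception {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.setProperty("timeout.ms", "30000");                 // required: ackWaitTime is derived from it

            try (KafkaPublisher publisher = new KafkaPublisher(props)) {
                // null key, "\n" delimiter: the stream below becomes three Kafka messages
                SplittableMessageContext ctx = new SplittableMessageContext("my-topic", null, "\n");
                try (InputStream content = new ByteArrayInputStream(
                        "one\ntwo\nthree".getBytes(StandardCharsets.UTF_8))) {
                    BitSet failed = publisher.publish(ctx, content, null);
                    // Set bits are the indexes of unacknowledged segments; PutKafka
                    // stamps them onto the FlowFile and resends only those on retry.
                    System.out.println("failed segments: " + failed);
                }
            }
        }
    }
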
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
new file mode 100644
index 0000000..2a851a4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.util.Random;
+
+import kafka.producer.Partitioner;
+
+/**
+ * Collection of implementations of common Kafka {@link Partitioner}s.
+ */
+public final class Partitioners {
+
+    private Partitioners() {
+    }
+
+    /**
+     * {@link Partitioner} that implements 'round-robin' mechanism which evenly
+     * distributes load between all available partitions.
+     */
+    public static class RoundRobinPartitioner implements Partitioner {
+        private volatile int index;
+
+        @Override
+        public int partition(Object key, int numberOfPartitions) {
+            int partitionIndex = this.next(numberOfPartitions);
+            return partitionIndex;
+        }
+
+        private int next(int numberOfPartitions) {
+            if (index >= numberOfPartitions) {
+                index = 0;
+            }
+            int indexToReturn = index++;
+            return indexToReturn;
+        }
+    }
+
+    /**
+     * {@link Partitioner} that implements 'random' mechanism which randomly
+     * distributes the load between all available partitions.
+     */
+    public static class RandomPartitioner implements Partitioner {
+        private final Random random;
+
+        public RandomPartitioner() {
+            this.random = new Random();
+        }
+
+        @Override
+        public int partition(Object key, int numberOfPartitions) {
+            return this.random.nextInt(numberOfPartitions);
+        }
+    }
+
+    /**
+     * {@link Partitioner} that implements 'key hash' mechanism which
+     * distributes the load between all available partitions based on hashing
+     * the value of the key.
+     */
+    public static class HashPartitioner implements Partitioner {
+
+        @Override
+        public int partition(Object key, int numberOfPartitions) {
+            if (key != null) {
+                return (key.hashCode() & Integer.MAX_VALUE) % numberOfPartitions;
+            }
+            return 0;
+        }
+    }
+}
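
A small, self-contained illustration of the three strategies defined above; the partition count and key are arbitrary:

    package org.apache.nifi.processors.kafka;

    final class PartitionersSketch {

        public static void main(String[] args) {
            int numberOfPartitions = 3;

            Partitioners.RoundRobinPartitioner roundRobin = new Partitioners.RoundRobinPartitioner();
            for (int i = 0; i < 5; i++) {
                // prints 0, 1, 2, 0, 1 for a three-partition topic; the key is ignored
                System.out.println("round-robin: " + roundRobin.partition(null, numberOfPartitions));
            }

            // a uniformly random partition in [0, numberOfPartitions)
            System.out.println("random: " + new Partitioners.RandomPartitioner().partition(null, numberOfPartitions));

            // the same key always maps to the same partition; a null key falls back to 0
            System.out.println("hash: " + new Partitioners.HashPartitioner().partition("some-key", numberOfPartitions));
        }
    }
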
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index f91099e..4510038 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -20,42 +20,23 @@
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
-import org.apache.kafka.clients.producer.BufferExhaustedException;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -65,326 +46,419 @@
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.stream.io.ByteCountingInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
-import org.apache.nifi.util.LongHolder;
+
+import kafka.producer.DefaultPartitioner;
 
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
+@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
 @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a "
-    + "user-specified delimiter, such as a new-line.")
-@TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed
+        + "user-specified delimiter, such as a new-line.")
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
-            description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+                 description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
         + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
         + " overriden with warning message describing the override."
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
-public class PutKafka extends AbstractSessionFactoryProcessor {
+public class PutKafka extends AbstractProcessor {
 
     private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
+
     private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
 
-    public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", "FlowFile will be routed to"
-        + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
-    public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed"
-        + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than"
-        + " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
-    public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after"
-        + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result"
-        + " in data loss.");
-
+    public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
+            "FlowFile will be routed to"
+                    + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
+    public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
+            "FlowFile will be routed"
+                    + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than"
+                    + " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
+    public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
+            "FlowFile will be routed to success after"
+                    + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result"
+                    + " in data loss.");
 
     /**
      * AllowableValue for sending messages to Kafka without compression
      */
-    public static final AllowableValue COMPRESSION_CODEC_NONE = new AllowableValue("none", "None", "Compression will not be used for any topic.");
+    public static final AllowableValue COMPRESSION_CODEC_NONE = new AllowableValue("none", "None",
+            "Compression will not be used for any topic.");
 
     /**
      * AllowableValue for sending messages to Kafka with GZIP compression
      */
-    public static final AllowableValue COMPRESSION_CODEC_GZIP = new AllowableValue("gzip", "GZIP", "Compress messages using GZIP");
+    public static final AllowableValue COMPRESSION_CODEC_GZIP = new AllowableValue("gzip", "GZIP",
+            "Compress messages using GZIP");
 
     /**
      * AllowableValue for sending messages to Kafka with Snappy compression
      */
-    public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy");
+    public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy",
+            "Compress messages using Snappy");
 
     static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin",
-        "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, the next Partition to Partition 2, and so on, wrapping as necessary.");
+            "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
+                    + "the next Partition to Partition 2, and so on, wrapping as necessary.");
     static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random",
-        "Messages will be assigned to random partitions.");
+            "Messages will be assigned to random partitions.");
     static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined",
-        "The <Partition> property will be used to determine the partition. All messages within the same FlowFile will be assigned to the same partition.");
-
+            "The <Partition> property will be used to determine the partition. All messages within the same FlowFile will be "
+                    + "assigned to the same partition.");
 
     public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
-        .name("Known Brokers")
-        .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
-        .required(true)
-        .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
-        .expressionLanguageSupported(false)
-        .build();
+            .name("Known Brokers")
+            .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
+            .required(true)
+            .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
+            .expressionLanguageSupported(false)
+            .build();
     public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
-        .name("Topic Name")
-        .description("The Kafka Topic of interest")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(true)
-        .build();
+            .name("Topic Name")
+            .description("The Kafka Topic of interest")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
     static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder()
-        .name("Partition Strategy")
-        .description("Specifies how messages should be partitioned when sent to Kafka")
-        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING)
-        .defaultValue(ROUND_ROBIN_PARTITIONING.getValue())
-        .required(true)
-        .build();
+            .name("Partition Strategy")
+            .description("Specifies how messages should be partitioned when sent to Kafka")
+            .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING)
+            .defaultValue(ROUND_ROBIN_PARTITIONING.getValue())
+            .required(true)
+            .build();
     public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
-        .name("Partition")
-        .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages in the same FlowFile will be sent to the same partition. "
-            + "If a partition is specified but is not valid, then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.")
-        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-        .expressionLanguageSupported(true)
-        .required(false)
-        .build();
+            .name("Partition")
+            .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages "
+                            + "in the same FlowFile will be sent to the same partition. If a partition is specified but is not valid, "
+                            + "then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(false)
+            .build();
     public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
-        .name("Kafka Key")
-        .description("The Key to use for the Message")
-        .required(false)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(true)
-        .build();
+            .name("Kafka Key")
+            .description("The Key to use for the Message")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
     public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
-        .name("Delivery Guarantee")
-        .description("Specifies the requirement for guaranteeing that a message is sent to Kafka")
-        .required(true)
-        .expressionLanguageSupported(false)
-        .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
-        .defaultValue(DELIVERY_BEST_EFFORT.getValue())
-        .build();
+            .name("Delivery Guarantee")
+            .description("Specifies the requirement for guaranteeing that a message is sent to Kafka").required(true)
+            .expressionLanguageSupported(false)
+            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+            .build();
     public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
-        .name("Message Delimiter")
-        .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
-            + "If not specified, the entire content of the FlowFile will be used as a single message. "
-            + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
-            + "sent as a separate Kafka message. Note that if messages are delimited and some messages for a given FlowFile "
-            + "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those "
-            + "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' "
-            + "relationship.")
-        .required(false)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(true)
-        .build();
+            .name("Message Delimiter")
+            .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+                            + "If not specified, the entire content of the FlowFile will be used as a single message. If specified, "
+                            + "the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka "
+                            + "message. Note that if messages are delimited and some messages for a given FlowFile are transferred "
+                            + "successfully while others are not, the messages will be split into individual FlowFiles, such that those "
+                            + "messages that were successfully sent are routed to the 'success' relationship while other messages are "
+                            + "sent to the 'failure' relationship.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
     public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
-        .name("Max Buffer Size")
-        .description("The maximum amount of data to buffer in memory before sending to Kafka")
-        .required(true)
-        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .defaultValue("5 MB")
-        .build();
+            .name("Max Buffer Size")
+            .description("The maximum amount of data to buffer in memory before sending to Kafka")
+            .required(true)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("5 MB")
+            .build();
     static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder()
-        .name("Max Record Size")
-        .description("The maximum size that any individual record can be.")
-        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-        .required(true)
-        .defaultValue("1 MB")
-        .build();
+            .name("Max Record Size")
+            .description("The maximum size that any individual record can be.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .required(true)
+            .defaultValue("1 MB")
+            .build();
     public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
-        .name("Communications Timeout")
-        .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
-        .required(true)
-        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .defaultValue("30 secs")
-        .build();
+            .name("Communications Timeout")
+            .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .defaultValue("30 secs").build();
     public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
-        .name("Client Name")
-        .description("Client Name to use when communicating with Kafka")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .build();
+            .name("Client Name")
+            .description("Client Name to use when communicating with Kafka")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
     public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
-        .name("Async Batch Size")
-        .displayName("Batch Size")
-        .description("The number of messages to send in one batch. The producer will wait until either this number of messages are ready"
-            + " to send or \"Queue Buffering Max Time\" is reached.")
-        .required(true)
-        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-        .defaultValue("200")
-        .build();
+            .name("Async Batch Size").displayName("Batch Size")
+            .description("The number of messages to send in one batch. The producer will wait until either this number of messages are ready "
+                            + "to send or \"Queue Buffering Max Time\" is reached. NOTE: This property will be ignored unless the 'Message Delimiter' "
+                            + "property is specified.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("200")
+            .build();
     public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder()
-        .name("Queue Buffering Max Time")
-        .description("Maximum time to buffer data before sending to Kafka. For example a setting of 100 ms"
-            + " will try to batch together 100 milliseconds' worth of messages to send at once. This will improve"
-            + " throughput but adds message delivery latency due to the buffering.")
-        .required(true)
-        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-        .defaultValue("5 secs")
-        .build();
+            .name("Queue Buffering Max Time")
+            .description("Maximum time to buffer data before sending to Kafka. For example a setting of 100 ms"
+                    + " will try to batch together 100 milliseconds' worth of messages to send at once. This will improve"
+                    + " throughput but adds message delivery latency due to the buffering.")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("5 secs")
+            .build();
     public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
-        .name("Compression Codec")
-        .description("This parameter allows you to specify the compression codec for all"
-            + " data generated by this producer.")
-        .required(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY)
-        .defaultValue(COMPRESSION_CODEC_NONE.getValue())
-        .build();
-
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
-        .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-        .name("failure")
-        .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
-        .build();
-
-    private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
-    private final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();
-    private final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
-
-    private final ConcurrentMap<String, AtomicLong> partitionIndexMap = new ConcurrentHashMap<>();
-
-    private volatile Producer<byte[], byte[]> producer;
-
-    private volatile ExecutorService executor;
-    private volatile long deadlockTimeout;
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
-            .fromPropertyDescriptor(CLIENT_NAME)
-            .defaultValue("NiFi-" + getIdentifier())
+            .name("Compression Codec")
+            .description("This parameter allows you to specify the compression codec for all"
+                    + " data generated by this producer.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY)
+            .defaultValue(COMPRESSION_CODEC_NONE.getValue())
             .build();
 
-        final List<PropertyDescriptor> props = new ArrayList<>();
-        props.add(SEED_BROKERS);
-        props.add(TOPIC);
-        props.add(PARTITION_STRATEGY);
-        props.add(PARTITION);
-        props.add(KEY);
-        props.add(DELIVERY_GUARANTEE);
-        props.add(MESSAGE_DELIMITER);
-        props.add(MAX_BUFFER_SIZE);
-        props.add(MAX_RECORD_SIZE);
-        props.add(TIMEOUT);
-        props.add(BATCH_NUM_MESSAGES);
-        props.add(QUEUE_BUFFERING_MAX);
-        props.add(COMPRESSION_CODEC);
-        props.add(clientName);
-        return props;
+    // Relationships
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+            .build();
+
+    protected static final String ATTR_PROC_ID = "PROC_ID";
+
+    protected static final String ATTR_FAILED_SEGMENTS = "FS";
+
+    protected static final String ATTR_TOPIC = "TOPIC";
+
+    protected static final String ATTR_KEY = "KEY";
+
+    protected static final String ATTR_DELIMITER = "DELIMITER";
+
+    private volatile KafkaPublisher kafkaPublisher;
+
+    private static final List<PropertyDescriptor> propertyDescriptors;
+
+    private static final Set<Relationship> relationships;
+
+    static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.add(SEED_BROKERS);
+        _propertyDescriptors.add(TOPIC);
+        _propertyDescriptors.add(PARTITION_STRATEGY);
+        _propertyDescriptors.add(PARTITION);
+        _propertyDescriptors.add(KEY);
+        _propertyDescriptors.add(DELIVERY_GUARANTEE);
+        _propertyDescriptors.add(MESSAGE_DELIMITER);
+        _propertyDescriptors.add(MAX_BUFFER_SIZE);
+        _propertyDescriptors.add(MAX_RECORD_SIZE);
+        _propertyDescriptors.add(TIMEOUT);
+        _propertyDescriptors.add(BATCH_NUM_MESSAGES);
+        _propertyDescriptors.add(QUEUE_BUFFERING_MAX);
+        _propertyDescriptors.add(COMPRESSION_CODEC);
+        _propertyDescriptors.add(CLIENT_NAME);
+        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
+
+        Set<Relationship> _relationships = new HashSet<>();
+        _relationships.add(REL_SUCCESS);
+        _relationships.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(_relationships);
     }
 
+    /**
+     * Instantiates the {@link KafkaPublisher} using the Kafka configuration
+     * properties derived from this processor's configuration.
+     */
+    @OnScheduled
+    public void createKafkaPublisher(ProcessContext context) {
+        this.kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context));
+        this.kafkaPublisher.setProcessLog(this.getLogger());
+    }
+
+    /**
+     * Builds a {@link SplittableMessageContext} for the incoming FlowFile,
+     * delegates publishing to the {@link KafkaPublisher} and routes the
+     * FlowFile to 'success' or 'failure' depending on whether any segments
+     * failed to be sent.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile != null) {
+            final SplittableMessageContext messageContext = this.buildMessageContext(flowFile, context, session);
+            final Integer partitionKey = this.determinePartition(messageContext, context, flowFile);
+            final AtomicReference<BitSet> failedSegmentsRef = new AtomicReference<BitSet>();
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(InputStream contentStream) throws IOException {
+                    failedSegmentsRef.set(kafkaPublisher.publish(messageContext, contentStream, partitionKey));
+                }
+            });
+
+            if (failedSegmentsRef.get().isEmpty()) {
+                session.getProvenanceReporter().send(flowFile, context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName());
+                flowFile = this.cleanUpFlowFileIfNecessary(flowFile, session);
+                session.transfer(flowFile, REL_SUCCESS);
+            } else {
+                flowFile = session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(failedSegmentsRef.get(), messageContext));
+                session.transfer(flowFile, REL_FAILURE);
+            }
+
+        } else {
+            context.yield();
+        }
+    }
+
+    @OnStopped
+    public void cleanup() {
+        try {
+            this.kafkaPublisher.close();
+        } catch (Exception e) {
+            getLogger().warn("Failed while closing KafkaPublisher", e);
+        }
+    }
 
     @Override
     public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>(1);
-        relationships.add(REL_SUCCESS);
-        relationships.add(REL_FAILURE);
         return relationships;
     }
 
     @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
+                .build();
+    }
+
+    @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
         final List<ValidationResult> results = new ArrayList<>();
 
         final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue();
-        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) && !validationContext.getProperty(PARTITION).isSet()) {
-            results.add(new ValidationResult.Builder().subject("Partition").valid(false).explanation(
-                "The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy").build());
+        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())
+                && !validationContext.getProperty(PARTITION).isSet()) {
+            results.add(new ValidationResult.Builder().subject("Partition").valid(false)
+                    .explanation("The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy")
+                    .build());
         }
-
         return results;
     }
 
-    protected Producer<byte[], byte[]> getProducer() {
-        return producer;
+    /**
+     * Removes the failure-tracking attributes from a FlowFile that is about to
+     * be routed to 'success' after a successful (re)send.
+     */
+    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) {
+        if (flowFile.getAttribute(ATTR_FAILED_SEGMENTS) != null) {
+            flowFile = session.removeAttribute(flowFile, ATTR_FAILED_SEGMENTS);
+            flowFile = session.removeAttribute(flowFile, ATTR_KEY);
+            flowFile = session.removeAttribute(flowFile, ATTR_TOPIC);
+            flowFile = session.removeAttribute(flowFile, ATTR_DELIMITER);
+            flowFile = session.removeAttribute(flowFile, ATTR_PROC_ID);
+        }
+        return flowFile;
     }
 
-    @OnStopped
-    public void cleanup() {
-        final Producer<byte[], byte[]> producer = getProducer();
-        if (producer != null) {
-            producer.close();
-        }
-
-        for (final FlowFileMessageBatch batch : activeBatches) {
-            batch.cancelOrComplete();
-        }
-        if (this.executor != null) {
-            this.executor.shutdown();
-            try {
-                if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) {
-                    this.executor.shutdownNow();
-                    getLogger().warn("Executor did not stop in 30 sec. Terminated.");
-                }
-                this.executor = null;
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+    /**
+     * Resolves the explicit partition when the 'User-Defined' partition
+     * strategy is configured; returns null otherwise, or when the <Partition>
+     * property evaluates to null.
+     */
+    private Integer determinePartition(SplittableMessageContext messageContext, ProcessContext context, FlowFile flowFile) {
+        String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
+        Integer partitionValue = null;
+        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
+            String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+            if (pv != null) {
+                partitionValue = Integer.parseInt(pv);
             }
         }
+        return partitionValue;
     }
 
-    @OnScheduled
-    public void createProducer(final ProcessContext context) {
-        this.deadlockTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
-        if (this.executor == null || this.executor.isShutdown()) {
-            this.executor = Executors.newCachedThreadPool();
+    /**
+     * Builds the attribute map stamped on a FlowFile routed to 'failure',
+     * allowing a subsequent retry to resend only the failed segments.
+     */
+    private Map<String, String> buildFailedFlowFileAttributes(BitSet failedSegments, SplittableMessageContext messageContext) {
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put(ATTR_PROC_ID, this.getIdentifier());
+        attributes.put(ATTR_FAILED_SEGMENTS, new String(failedSegments.toByteArray(), StandardCharsets.UTF_8));
+        attributes.put(ATTR_TOPIC, messageContext.getTopicName());
+        attributes.put(ATTR_KEY, messageContext.getKeyBytesAsString());
+        attributes.put(ATTR_DELIMITER, messageContext.getDelimiterPattern());
+        return attributes;
+    }
+
+    /**
+     * Derives the {@link SplittableMessageContext} either from the failure
+     * attributes of a FlowFile previously routed to 'failure' by this
+     * processor, or from the processor properties on the first attempt.
+     */
+    private SplittableMessageContext buildMessageContext(FlowFile flowFile, ProcessContext context, ProcessSession session) {
+        String topicName;
+        byte[] key;
+        String delimiterPattern;
+
+        String failedSegmentsString = flowFile.getAttribute(ATTR_FAILED_SEGMENTS);
+        if (flowFile.getAttribute(ATTR_PROC_ID) != null && flowFile.getAttribute(ATTR_PROC_ID).equals(this.getIdentifier()) && failedSegmentsString != null) {
+            topicName = flowFile.getAttribute(ATTR_TOPIC);
+            String keyAttr = flowFile.getAttribute(ATTR_KEY);
+            key = keyAttr == null ? null : keyAttr.getBytes(StandardCharsets.UTF_8);
+            delimiterPattern = flowFile.getAttribute(ATTR_DELIMITER);
+        } else {
+            failedSegmentsString = null;
+            topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+            String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+            key = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8);
+            delimiterPattern = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
         }
-        producer = new KafkaProducer<byte[], byte[]>(createConfig(context), new ByteArraySerializer(), new ByteArraySerializer());
+        SplittableMessageContext messageContext = new SplittableMessageContext(topicName, key, delimiterPattern);
+        if (failedSegmentsString != null) {
+            messageContext.setFailedSegmentsAsByteArray(failedSegmentsString.getBytes(StandardCharsets.UTF_8));
+        }
+        return messageContext;
     }
 
-    protected int getActiveMessageBatchCount() {
-        return activeBatches.size();
-    }
-
-    protected int getCompleteMessageBatchCount() {
-        return completeBatches.size();
-    }
-
-    protected Properties createConfig(final ProcessContext context) {
-        final String brokers = context.getProperty(SEED_BROKERS).getValue();
-
-        final Properties properties = new Properties();
-        properties.setProperty("bootstrap.servers", brokers);
+    /**
+     * Translates this processor's configuration, including dynamic properties,
+     * into Kafka producer {@link Properties}.
+     */
+    private Properties buildKafkaConfigProperties(final ProcessContext context) {
+        Properties properties = new Properties();
+        String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
+        properties.setProperty("bootstrap.servers", context.getProperty(SEED_BROKERS).getValue());
         properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
+        properties.setProperty("buffer.memory", String.valueOf(context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue()));
+        properties.setProperty("compression.type", context.getProperty(COMPRESSION_CODEC).getValue());
+        if (context.getProperty(MESSAGE_DELIMITER).isSet()) {
+            properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue());
+        } else {
+            properties.setProperty("batch.size", "1");
+        }
+
         properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
-
-        final String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-        properties.setProperty("timeout.ms", timeout);
-        properties.setProperty("metadata.fetch.timeout.ms", timeout);
-
-        properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue());
-        properties.setProperty("max.request.size", String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue()));
-
-        final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
-        properties.setProperty("buffer.memory", String.valueOf(maxBufferSize));
-
-        final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue();
-        properties.setProperty("compression.type", compressionCodec);
-
-        final Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
+        Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
         if (queueBufferingMillis != null) {
             properties.setProperty("linger.ms", String.valueOf(queueBufferingMillis));
         }
+        properties.setProperty("max.request.size", String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue()));
+        properties.setProperty("timeout.ms", timeout);
+        properties.setProperty("metadata.fetch.timeout.ms", timeout);
 
-        properties.setProperty("retries", "0");
-        properties.setProperty("block.on.buffer.full", "false");
+        String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
+        String partitionerClass = null;
+        if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
+            partitionerClass = Partitioners.RoundRobinPartitioner.class.getName();
+        } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
+            partitionerClass = DefaultPartitioner.class.getName();
+        }
+        properties.setProperty("partitioner.class", partitionerClass);
 
+        // Set Dynamic Properties
         for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
             PropertyDescriptor descriptor = entry.getKey();
             if (descriptor.isDynamic()) {
@@ -396,494 +470,6 @@
                 properties.setProperty(descriptor.getName(), entry.getValue());
             }
         }
-
         return properties;
     }
-
-    private Integer getPartition(final ProcessContext context, final FlowFile flowFile, final String topic) {
-        final long unnormalizedIndex;
-
-        final String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
-        if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
-            AtomicLong partitionIndex = partitionIndexMap.get(topic);
-            if (partitionIndex == null) {
-                partitionIndex = new AtomicLong(0L);
-                final AtomicLong existing = partitionIndexMap.putIfAbsent(topic, partitionIndex);
-                if (existing != null) {
-                    partitionIndex = existing;
-                }
-            }
-
-            unnormalizedIndex = partitionIndex.getAndIncrement();
-        } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
-            return null;
-        } else {
-            if (context.getProperty(PARTITION).isSet()) {
-                final String partitionValue = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
-
-                if (NUMBER_PATTERN.matcher(partitionValue).matches()) {
-                    // Subtract 1 because if the partition is "3" then we want to get index 2 into the List of partitions.
-                    unnormalizedIndex = Long.parseLong(partitionValue) - 1;
-                } else {
-                    unnormalizedIndex = partitionValue.hashCode();
-                }
-            } else {
-                return null;
-            }
-        }
-
-        final Producer<byte[], byte[]> producer = getProducer();
-        final List<PartitionInfo> partitionInfos = producer.partitionsFor(topic);
-        final int partitionIdx = (int) (unnormalizedIndex % partitionInfos.size());
-        return partitionInfos.get(partitionIdx).partition();
-    }
-
-    @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
-                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
-                .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
-                .build();
-    }
-
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        FlowFileMessageBatch batch;
-        while ((batch = completeBatches.poll()) != null) {
-            batch.completeSession();
-        }
-
-        final ProcessSession session = sessionFactory.createSession();
-        final FlowFile flowFile = session.get();
-        if (flowFile != null){
-            Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                    doOnTrigger(context, session, flowFile);
-                    return null;
-                }
-            });
-            try {
-                consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                consumptionFuture.cancel(true);
-                Thread.currentThread().interrupt();
-                getLogger().warn("Interrupted while sending messages", e);
-            } catch (ExecutionException e) {
-                throw new IllegalStateException(e);
-            } catch (TimeoutException e) {
-                consumptionFuture.cancel(true);
-                getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while sending messages", e);
-            }
-        } else {
-            context.yield();
-        }
-    }
-
-    private void doOnTrigger(final ProcessContext context, ProcessSession session, final FlowFile flowFile) throws ProcessException {
-        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-        final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
-        String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
-        if (delimiter != null) {
-            delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
-        }
-
-        final Producer<byte[], byte[]> producer = getProducer();
-
-        if (delimiter == null) {
-            // Send the entire FlowFile as a single message.
-            final byte[] value = new byte[(int) flowFile.getSize()];
-            session.read(flowFile, new InputStreamCallback() {
-                @Override
-                public void process(final InputStream in) throws IOException {
-                    StreamUtils.fillBuffer(in, value);
-                }
-            });
-
-            final Integer partition;
-            try {
-                partition = getPartition(context, flowFile, topic);
-            } catch (final Exception e) {
-                getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e});
-                session.transfer(session.penalize(flowFile), REL_FAILURE);
-                session.commit();
-                return;
-            }
-
-            final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, value);
-
-            final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic);
-            messageBatch.setNumMessages(1);
-            activeBatches.add(messageBatch);
-
-            try {
-                producer.send(producerRecord, new Callback() {
-                    @Override
-                    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                        if (exception == null) {
-                            // record was successfully sent.
-                            messageBatch.addSuccessfulRange(0L, flowFile.getSize(), metadata.offset());
-                        } else {
-                            messageBatch.addFailedRange(0L, flowFile.getSize(), exception);
-                        }
-                    }
-                });
-            } catch (final BufferExhaustedException bee) {
-                messageBatch.addFailedRange(0L, flowFile.getSize(), bee);
-                context.yield();
-                return;
-            }
-        } else {
-            final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
-
-            // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see
-            // if it matches some pattern. We can use this to search for the delimiter as we read through
-            // the stream of bytes in the FlowFile
-            final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
-
-            final LongHolder messagesSent = new LongHolder(0L);
-            final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic);
-            activeBatches.add(messageBatch);
-
-            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-                session.read(flowFile, new InputStreamCallback() {
-                    @Override
-                    public void process(final InputStream rawIn) throws IOException {
-                        byte[] data = null; // contents of a single message
-
-                        boolean streamFinished = false;
-
-                        int nextByte;
-                        try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
-                            final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
-
-                            long messageStartOffset = in.getBytesConsumed();
-
-                            // read until we're out of data.
-                            while (!streamFinished) {
-                                nextByte = in.read();
-
-                                if (nextByte > -1) {
-                                    baos.write(nextByte);
-                                }
-
-                                if (nextByte == -1) {
-                                    // we ran out of data. This message is complete.
-                                    data = baos.toByteArray();
-                                    streamFinished = true;
-                                } else if (buffer.addAndCompare((byte) nextByte)) {
-                                    // we matched our delimiter. This message is complete. We want all of the bytes from the
-                                    // underlying BAOS exception for the last 'delimiterBytes.length' bytes because we don't want
-                                    // the delimiter itself to be sent.
-                                    data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length);
-                                }
-
-                                if (data != null) {
-                                    final long messageEndOffset = in.getBytesConsumed();
-
-                                    // If the message has no data, ignore it.
-                                    if (data.length != 0) {
-                                        final Integer partition;
-                                        try {
-                                            partition = getPartition(context, flowFile, topic);
-                                        } catch (final Exception e) {
-                                            messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e);
-                                            getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e});
-                                            continue;
-                                        }
-
-
-                                        final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data);
-                                        final long rangeStart = messageStartOffset;
-
-                                        try {
-                                            producer.send(producerRecord, new Callback() {
-                                                @Override
-                                                public void onCompletion(final RecordMetadata metadata, final Exception exception) {
-                                                    if (exception == null) {
-                                                        // record was successfully sent.
-                                                        messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, metadata.offset());
-                                                    } else {
-                                                        messageBatch.addFailedRange(rangeStart, messageEndOffset, exception);
-                                                    }
-                                                }
-                                            });
-
-                                            messagesSent.incrementAndGet();
-                                        } catch (final BufferExhaustedException bee) {
-                                            // Not enough room in the buffer. Add from the beginning of this message to end of FlowFile as a failed range
-                                            messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee);
-                                            context.yield();
-                                            return;
-                                        }
-
-                                    }
-
-                                    // reset BAOS so that we can start a new message.
-                                    baos.reset();
-                                    data = null;
-                                    messageStartOffset = in.getBytesConsumed();
-                                }
-                            }
-                        }
-                    }
-                });
-
-                messageBatch.setNumMessages(messagesSent.get());
-            }
-        }
-    }
-
-
-    private static class Range {
-        private final long start;
-        private final long end;
-        private final Long kafkaOffset;
-
-        public Range(final long start, final long end, final Long kafkaOffset) {
-            this.start = start;
-            this.end = end;
-            this.kafkaOffset = kafkaOffset;
-        }
-
-        public long getStart() {
-            return start;
-        }
-
-        public long getEnd() {
-            return end;
-        }
-
-        public Long getKafkaOffset() {
-            return kafkaOffset;
-        }
-
-        @Override
-        public String toString() {
-            return "Range[" + start + "-" + end + "]";
-        }
-    }
-
-    private class FlowFileMessageBatch {
-        private final ProcessSession session;
-        private final FlowFile flowFile;
-        private final String topic;
-        private final long startTime = System.nanoTime();
-
-        private final List<Range> successfulRanges = new ArrayList<>();
-        private final List<Range> failedRanges = new ArrayList<>();
-
-        private Exception lastFailureReason;
-        private long numMessages = -1L;
-        private long completeTime = 0L;
-        private boolean canceled = false;
-
-        public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile, final String topic) {
-            this.session = session;
-            this.flowFile = flowFile;
-            this.topic = topic;
-        }
-
-        public synchronized void cancelOrComplete() {
-            if (isComplete()) {
-                completeSession();
-                return;
-            }
-
-            this.canceled = true;
-
-            session.rollback();
-            successfulRanges.clear();
-            failedRanges.clear();
-        }
-
-        public synchronized void addSuccessfulRange(final long start, final long end, final long kafkaOffset) {
-            if (canceled) {
-                return;
-            }
-
-            successfulRanges.add(new Range(start, end, kafkaOffset));
-
-            if (isComplete()) {
-                activeBatches.remove(this);
-                completeBatches.add(this);
-                completeTime = System.nanoTime();
-            }
-        }
-
-        public synchronized void addFailedRange(final long start, final long end, final Exception e) {
-            if (canceled) {
-                return;
-            }
-
-            failedRanges.add(new Range(start, end, null));
-            lastFailureReason = e;
-
-            if (isComplete()) {
-                activeBatches.remove(this);
-                completeBatches.add(this);
-                completeTime = System.nanoTime();
-            }
-        }
-
-        private boolean isComplete() {
-            return !canceled && (numMessages > -1) && (successfulRanges.size() + failedRanges.size() >= numMessages);
-        }
-
-        public synchronized void setNumMessages(final long msgCount) {
-            this.numMessages = msgCount;
-
-            if (isComplete()) {
-                activeBatches.remove(this);
-                completeBatches.add(this);
-                completeTime = System.nanoTime();
-            }
-        }
-
-        private Long getMin(final Long a, final Long b) {
-            if (a == null && b == null) {
-                return null;
-            }
-
-            if (a == null) {
-                return b;
-            }
-
-            if (b == null) {
-                return a;
-            }
-
-            return Math.min(a, b);
-        }
-
-        private Long getMax(final Long a, final Long b) {
-            if (a == null && b == null) {
-                return null;
-            }
-
-            if (a == null) {
-                return b;
-            }
-
-            if (b == null) {
-                return a;
-            }
-
-            return Math.max(a, b);
-        }
-
-        private void transferRanges(final List<Range> ranges, final Relationship relationship) {
-            Collections.sort(ranges, new Comparator<Range>() {
-                @Override
-                public int compare(final Range o1, final Range o2) {
-                    return Long.compare(o1.getStart(), o2.getStart());
-                }
-            });
-
-            for (int i = 0; i < ranges.size(); i++) {
-                Range range = ranges.get(i);
-                int count = 1;
-                Long smallestKafkaOffset = range.getKafkaOffset();
-                Long largestKafkaOffset = range.getKafkaOffset();
-
-                while (i + 1 < ranges.size()) {
-                    // Check if the next range in the List continues where this one left off.
-                    final Range nextRange = ranges.get(i + 1);
-
-                    if (nextRange.getStart() == range.getEnd()) {
-                        // We have two ranges in a row that are contiguous; combine them into a single Range.
-                        range = new Range(range.getStart(), nextRange.getEnd(), null);
-
-                        smallestKafkaOffset = getMin(smallestKafkaOffset, nextRange.getKafkaOffset());
-                        largestKafkaOffset = getMax(largestKafkaOffset, nextRange.getKafkaOffset());
-                        count++;
-                        i++;
-                    } else {
-                        break;
-                    }
-                }
-
-                // Create a FlowFile for this range.
-                FlowFile child = session.clone(flowFile, range.getStart(), range.getEnd() - range.getStart());
-                if (relationship == REL_SUCCESS) {
-                    session.getProvenanceReporter().send(child, getTransitUri(), "Sent " + count + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset);
-                    session.transfer(child, relationship);
-                } else {
-                    session.transfer(session.penalize(child), relationship);
-                }
-            }
-        }
-
-        private String getTransitUri() {
-            final List<PartitionInfo> partitions = getProducer().partitionsFor(topic);
-            if (partitions.isEmpty()) {
-                return "kafka://unknown-host" + "/topics/" + topic;
-            }
-
-            final PartitionInfo info = partitions.get(0);
-            final Node leader = info.leader();
-            final String host = leader.host();
-            final int port = leader.port();
-
-            return "kafka://" + host + ":" + port + "/topics/" + topic;
-        }
-
-        public synchronized void completeSession() {
-            if (canceled) {
-                return;
-            }
-
-            if (successfulRanges.isEmpty() && failedRanges.isEmpty()) {
-                getLogger().info("Completed processing {} but sent 0 FlowFiles to Kafka", new Object[] {flowFile});
-                session.transfer(flowFile, REL_SUCCESS);
-                session.commit();
-                return;
-            }
-
-            if (successfulRanges.isEmpty()) {
-                getLogger().error("Failed to send {} to Kafka; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason});
-                session.transfer(session.penalize(flowFile), REL_FAILURE);
-                session.commit();
-                return;
-            }
-
-            if (failedRanges.isEmpty()) {
-                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
-
-                if (successfulRanges.size() == 1) {
-                    final Long kafkaOffset = successfulRanges.get(0).getKafkaOffset();
-                    final String msg = "Sent 1 message" + ((kafkaOffset == null) ? "" : ("; Kafka offset = " + kafkaOffset));
-                    session.getProvenanceReporter().send(flowFile, getTransitUri(), msg);
-                } else {
-                    long smallestKafkaOffset = successfulRanges.get(0).getKafkaOffset();
-                    long largestKafkaOffset = successfulRanges.get(0).getKafkaOffset();
-
-                    for (final Range range : successfulRanges) {
-                        smallestKafkaOffset = Math.min(smallestKafkaOffset, range.getKafkaOffset());
-                        largestKafkaOffset = Math.max(largestKafkaOffset, range.getKafkaOffset());
-                    }
-
-                    session.getProvenanceReporter().send(flowFile, getTransitUri(),
-                        "Sent " + successfulRanges.size() + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset);
-                }
-
-                session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
-                session.commit();
-                return;
-            }
-
-            // At this point, the successful ranges is not empty and the failed ranges is not empty. This indicates that some messages made their way to Kafka
-            // successfully and some failed. We will address this by splitting apart the source FlowFile into children and sending the successful messages to 'success'
-            // and the failed messages to 'failure'.
-            transferRanges(successfulRanges, REL_SUCCESS);
-            transferRanges(failedRanges, REL_FAILURE);
-            session.remove(flowFile);
-            getLogger().error("Successfully sent {} messages to Kafka but failed to send {} messages; the last error received was {}",
-                new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason});
-            session.commit();
-        }
-    }
 }
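
Note: the PutKafka hunks above drop the processor-side getPartition() logic and hand partition selection to the producer via the "partitioner.class" property. For reference, a minimal sketch of the per-topic round-robin index selection the removed code performed (an AtomicLong counter per topic, taken modulo the partition count); the class and method names below are illustrative only and are not part of PutKafka.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative only: mirrors the round-robin bookkeeping of the removed getPartition().
    final class RoundRobinIndexSketch {
        private final ConcurrentMap<String, AtomicLong> partitionIndexMap = new ConcurrentHashMap<>();

        int nextPartitionIndex(final String topic, final int partitionCount) {
            AtomicLong partitionIndex = partitionIndexMap.get(topic);
            if (partitionIndex == null) {
                partitionIndex = new AtomicLong(0L);
                final AtomicLong existing = partitionIndexMap.putIfAbsent(topic, partitionIndex);
                if (existing != null) {
                    partitionIndex = existing;
                }
            }
            // The counter increments monotonically from 0, so the modulo stays non-negative.
            return (int) (partitionIndex.getAndIncrement() % partitionCount);
        }
    }
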
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java
new file mode 100644
index 0000000..9967404
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.util.BitSet;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+/**
+ * Context object that serves as a bridge between the content of a FlowFile and
+ * Kafka message(s). It contains all necessary information to allow
+ * {@link KafkaPublisher} to determine how the content of each
+ * {@link FlowFile} must be sent to Kafka.
+ */
+final class SplittableMessageContext {
+    private final String topicName;
+
+    private final String delimiterPattern;
+
+    private final byte[] keyBytes;
+
+    private volatile BitSet failedSegments;
+
+    /**
+     * @param topicName
+     *            the name of the Kafka topic
+     * @param keyBytes
+     *            the instance of byte[] representing the key. Can be null.
+     * @param delimiterPattern
+     *            the string representing the delimiter regex pattern. Can be
+     *            null; when it is null, the EOF pattern "(\\W)\\Z" will be
+     *            used.
+     */
+    SplittableMessageContext(String topicName, byte[] keyBytes, String delimiterPattern) {
+        this.topicName = topicName;
+        this.keyBytes = keyBytes;
+        this.delimiterPattern = delimiterPattern != null ? delimiterPattern : "(\\W)\\Z";
+    }
+
+    /**
+     * Provides a human-readable summary (topic and delimiter) for logging.
+     */
+    @Override
+    public String toString() {
+        return "topic: '" + topicName + "'; delimiter: '" + delimiterPattern + "'";
+    }
+
+    /**
+     * Marks the given segment indexes as failed so that only those segments are re-sent.
+     */
+    void setFailedSegments(int... failedSegments) {
+        this.failedSegments = new BitSet();
+        for (int failedSegment : failedSegments) {
+            this.failedSegments.set(failedSegment);
+        }
+    }
+
+    /**
+     * Sets the failed segments from a byte array produced by {@link BitSet#toByteArray()}.
+     */
+    void setFailedSegmentsAsByteArray(byte[] failedSegments) {
+        this.failedSegments = BitSet.valueOf(failedSegments);
+    }
+
+    /**
+     * Returns a BitSet in which each set bit marks a segment (chunk) of the
+     * delimited content stream that failed to be sent to the Kafka topic.
+     */
+    BitSet getFailedSegments() {
+        return this.failedSegments;
+    }
+
+    /**
+     * Returns the name of the Kafka topic
+     */
+    String getTopicName() {
+        return this.topicName;
+    }
+
+    /**
+     * Returns the value of the delimiter regex pattern.
+     */
+    String getDelimiterPattern() {
+        return this.delimiterPattern;
+    }
+
+    /**
+     * Returns the key bytes as a String, or null if no key was provided
+     */
+    String getKeyBytesAsString() {
+        return this.keyBytes == null ? null : new String(this.keyBytes);
+    }
+
+    /**
+     * Returns the key bytes
+     */
+    byte[] getKeyBytes() {
+        return this.keyBytes;
+    }
+}
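
A short sketch, using only the JDK, of how the failed-segment bookkeeping above round-trips through a BitSet: setFailedSegments(int...) records the chunk indexes, toByteArray() is a form that could travel on a FlowFile attribute, and setFailedSegmentsAsByteArray(byte[]) restores it via BitSet.valueOf(). The attribute handling itself lives in the processor and is not shown here; this class is illustrative only.

    import java.util.BitSet;

    final class FailedSegmentsSketch {
        public static void main(final String[] args) {
            final BitSet failed = new BitSet();
            failed.set(1); // second delimited chunk failed
            failed.set(3); // fourth delimited chunk failed

            final byte[] asBytes = failed.toByteArray();     // form suitable for a FlowFile attribute
            final BitSet restored = BitSet.valueOf(asBytes); // what setFailedSegmentsAsByteArray() rebuilds

            for (int i = restored.nextSetBit(0); i >= 0; i = restored.nextSetBit(i + 1)) {
                System.out.println("segment " + i + " must be re-sent");
            }
        }
    }
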
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
new file mode 100644
index 0000000..e959fdd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
+
+/**
+ * Scans an InputStream and splits it into byte[] chunks separated by the supplied delimiter.
+ */
+class StreamScanner {
+
+    private final InputStream is;
+
+    private final byte[] delimiter;
+
+    private final NonThreadSafeCircularBuffer buffer;
+
+    private final ByteArrayOutputStream baos;
+
+    private byte[] data;
+
+    private boolean eos;
+
+    /**
+     * Constructs a scanner over the given InputStream using the supplied delimiter string.
+     */
+    StreamScanner(InputStream is, String delimiter) {
+        this.is = new BufferedInputStream(is);
+        this.delimiter = delimiter.getBytes();
+        buffer = new NonThreadSafeCircularBuffer(this.delimiter);
+        baos = new ByteArrayOutputStream();
+    }
+
+    /**
+     * Reads the next delimited chunk from the stream; returns true if a chunk is available via {@link #next()}.
+     */
+    boolean hasNext() {
+        this.data = null;
+        if (!this.eos) {
+            try {
+                boolean keepReading = true;
+                while (keepReading) {
+                    int b = this.is.read();
+                    if (b > -1) {
+                        baos.write(b);
+                        if (buffer.addAndCompare((byte) b)) {
+                            this.data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiter.length);
+                            keepReading = false;
+                        }
+                    } else {
+                        this.data = baos.toByteArray();
+                        keepReading = false;
+                        this.eos = true;
+                    }
+                }
+                baos.reset();
+            } catch (Exception e) {
+                throw new IllegalStateException("Failed while reading InputStream", e);
+            }
+        }
+        return this.data != null;
+    }
+
+    /**
+     * Returns the bytes of the chunk read by the last successful {@link #hasNext()} call, without the trailing delimiter.
+     */
+    byte[] next() {
+        return this.data;
+    }
+
+    void close() {
+        this.baos.close();
+    }
+}
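
A hypothetical usage of the StreamScanner above (it is package-private, so a caller would sit in org.apache.nifi.processors.kafka); the delimiter is matched literally as bytes, one chunk per hasNext()/next() pair. The class name here is illustrative only.

    package org.apache.nifi.processors.kafka;

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    public class StreamScannerUsageSketch {
        public static void main(final String[] args) {
            final InputStream in = new ByteArrayInputStream("foo|bar|baz".getBytes(StandardCharsets.UTF_8));
            final StreamScanner scanner = new StreamScanner(in, "|");
            while (scanner.hasNext()) {
                // next() returns the bytes of one chunk, without the trailing delimiter
                System.out.println(new String(scanner.next(), StandardCharsets.UTF_8));
            }
            scanner.close();
        }
    }
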
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
new file mode 100644
index 0000000..92a6307
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+public class KafkaPublisherTest {
+
+    private static final String sampleData = "The true sign of intelligence is not knowledge but imagination.\n"
+            + "It's not that I'm so smart, it's just that I stay with problems longer.\n"
+            + "The only source of knowledge is experience.\n"
+            + "Only two things are infinite, the universe and human stupidity, and I'm not sure about the former.\n";
+
+    private static final String sampleData2 = "foo|bar|baz";
+
+    private static EmbeddedKafka kafkaLocal;
+
+    private static EmbeddedKafkaProducerHelper producerHelper;
+
+    @BeforeClass
+    public static void beforeClass() {
+        kafkaLocal = new EmbeddedKafka();
+        kafkaLocal.start();
+        producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producerHelper.close();
+        kafkaLocal.stop();
+    }
+
+    String test = "Khalid El Bakraoui rented an apartment in Brussels that was raided last week and both are suspected of having ties to "
+            + "the terror attacks in Paris in November, the source said. While Belgian officials say both brothers were suicide bombers, a U.S. "
+            + "official briefed earlier on preliminary evidence from the investigation says authorities are looking at the possibility that one of "
+            + "the airport explosions may have been caused by a bomb inside a suitcase and the other was a suicide bombing. But identifying the brothers "
+            + "should help spring the investigation forward, says Cedric Leighton, a CNN military analyst and the former deputy director for the Joint Chiefs of Staff.";
+
+    @Test
+    public void validateSuccessfulSendAsWhole() throws Exception {
+        InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsWhole";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, null);
+
+        publisher.publish(messageContext, fis, null);
+
+        fis.close();
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+        } catch (ConsumerTimeoutException e) {
+            // expected: ConsumerTimeoutException is how the consumer unblocks when no more messages arrive
+        }
+    }
+
+    @Test
+    public void validateSuccessfulSendAsDelimited() throws Exception {
+        InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsDelimited";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n");
+
+        publisher.publish(messageContext, fis, null);
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // expected: ConsumerTimeoutException is how the consumer unblocks when no more messages arrive
+        }
+    }
+
+    @Test
+    public void validateSuccessfulSendAsDelimited2() throws Exception {
+        InputStream fis = new ByteArrayInputStream(sampleData2.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsDelimited2";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "|");
+
+        publisher.publish(messageContext, fis, null);
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // expected: ConsumerTimeoutException is how the consumer unblocks when no more messages arrive
+        }
+    }
+
+    @Test
+    public void validateSuccessfulReSendOfFailedSegments() throws Exception {
+        InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulReSendOfFailedSegments";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n");
+        messageContext.setFailedSegments(1, 3);
+
+        publisher.publish(messageContext, fis, null);
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        String m1 = new String(iter.next().message());
+        String m2 = new String(iter.next().message());
+        assertEquals("It's not that I'm so smart, it's just that I stay with problems longer.", m1);
+        assertEquals("Only two things are infinite, the universe and human stupidity, and I'm not sure about the former.", m2);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // expected: ConsumerTimeoutException is how the consumer unblocks when no more messages arrive
+        }
+    }
+
+    private Properties buildProducerProperties() {
+        Properties kafkaProperties = new Properties();
+        kafkaProperties.setProperty("bootstrap.servers", "0.0.0.0:" + kafkaLocal.getKafkaPort());
+        kafkaProperties.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
+        kafkaProperties.setProperty("acks", "1");
+        kafkaProperties.put("auto.create.topics.enable", "true");
+        kafkaProperties.setProperty("partitioner.class", "org.apache.nifi.processors.kafka.Partitioners$RoundRobinPartitioner");
+        kafkaProperties.setProperty("timeout.ms", "5000");
+        return kafkaProperties;
+    }
+
+    private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
+        Properties props = new Properties();
+        props.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort());
+        props.put("group.id", "test");
+        props.put("consumer.timeout.ms", "5000");
+        props.put("auto.offset.reset", "smallest");
+        ConsumerConfig consumerConfig = new ConsumerConfig(props);
+        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+        Map<String, Integer> topicCountMap = new HashMap<>(1);
+        topicCountMap.put(topic, 1);
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+        ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
+        return iter;
+    }
+}
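
The tests above all rely on the same pattern: consumer.timeout.ms is set, so ConsumerIterator.next() throws ConsumerTimeoutException once no further message arrives, and the tests treat that as "no more messages". A hypothetical helper capturing that pattern (not part of the test class in this commit):

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.ConsumerTimeoutException;

    final class ConsumerAssertions {
        static void assertNoMoreMessages(final ConsumerIterator<byte[], byte[]> iter) {
            try {
                iter.next();
                throw new AssertionError("expected no further messages on the topic");
            } catch (final ConsumerTimeoutException e) {
                // expected: the consumer timed out waiting for another message
            }
        }
    }
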
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 2f5da5c..3ed0549 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -17,462 +17,189 @@
 package org.apache.nifi.processors.kafka;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Future;
+import java.util.Properties;
 
-import org.apache.kafka.clients.producer.BufferExhaustedException;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.provenance.ProvenanceEventRecord;
-import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
-import kafka.common.FailedToSendMessageException;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
 
 
 public class TestPutKafka {
 
-    @Test
-    public void testMultipleKeyValuePerFlowFile() {
-        final TestableProcessor proc = new TestableProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+    private static EmbeddedKafka kafkaLocal;
 
-        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
-        runner.run(2); // we have to run twice because the first iteration will result in data being added to a queue in the processor; the second onTrigger call will transfer FlowFiles.
+    private static EmbeddedKafkaProducerHelper producerHelper;
+
+    @BeforeClass
+    public static void beforeClass() {
+        kafkaLocal = new EmbeddedKafka();
+        kafkaLocal.start();
+        producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producerHelper.close();
+        kafkaLocal.stop();
+    }
+
+    @Test
+    public void testDelimitedMessagesWithKey() {
+        String topicName = "testDelimitedMessagesWithKey";
+        PutKafka putKafka = new PutKafka();
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PutKafka.TOPIC, topicName);
+        runner.setProperty(PutKafka.CLIENT_NAME, "foo");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
+
+        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes());
+        runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+        ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
+        assertEquals("Hello World", new String(consumer.next().message()));
+        assertEquals("Goodbye", new String(consumer.next().message()));
+        assertEquals("1", new String(consumer.next().message()));
+        assertEquals("2", new String(consumer.next().message()));
+        assertEquals("3", new String(consumer.next().message()));
+        assertEquals("4", new String(consumer.next().message()));
+        assertEquals("5", new String(consumer.next().message()));
 
-        final List<ProducerRecord<byte[], byte[]>> messages = ((MockProducer) proc.getProducer()).getMessages();
-        assertEquals(11, messages.size());
-
-        assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0).value()));
-        assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1).value()));
-
-        for (int i = 1; i <= 9; i++) {
-            assertTrue(Arrays.equals(String.valueOf(i).getBytes(StandardCharsets.UTF_8), messages.get(i + 1).value()));
-        }
+        runner.shutdown();
     }
 
     @Test
-    public void testWithImmediateFailure() {
-        final TestableProcessor proc = new TestableProcessor(0);
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
+    @Ignore
+    public void testWithFailureAndPartialResend() throws Exception {
+        String topicName = "testWithImmediateFailure";
+        PutKafka putKafka = new PutKafka();
+        final TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PutKafka.TOPIC, topicName);
+        runner.setProperty(PutKafka.CLIENT_NAME, "foo");
         runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        runner.setProperty(PutKafka.SEED_BROKERS, "0.0.0.0:" + kafkaLocal.getKafkaPort());
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
 
-        final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
+        final String text = "Hello World\nGoodbye\n1\n2";
         runner.enqueue(text.getBytes());
-        runner.run(2);
+        afterClass(); // kill Kafka right before send to ensure producer fails
+        runner.run(1, false);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
-        final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
-        mff.assertContentEquals(text);
-    }
+        MockFlowFile ff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+        String failedSegmentsStr = ff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS);
+        BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes());
+        assertTrue(fs.get(0));
+        assertTrue(fs.get(1));
+        assertTrue(fs.get(2));
+        assertTrue(fs.get(3));
+        String delimiter = ff.getAttribute(PutKafka.ATTR_DELIMITER);
+        assertEquals("\n", delimiter);
+        String key = ff.getAttribute(PutKafka.ATTR_KEY);
+        assertEquals("key1", key);
+        String topic = ff.getAttribute(PutKafka.ATTR_TOPIC);
+        assertEquals(topicName, topic);
 
-    @Test
-    public void testPartialFailure() {
-        final TestableProcessor proc = new TestableProcessor(2); // fail after sending 2 messages.
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-        runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
+        beforeClass();
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
+        Map<String, String> attr = new HashMap<>(ff.getAttributes());
+        /*
+         * Here we emulate partial success: even though all 4 messages failed to
+         * be sent, by changing the ATTR_FAILED_SEGMENTS value we state that only
+         * two failed and need to be re-sent.
+         */
+        BitSet _fs = new BitSet();
+        _fs.set(1);
+        _fs.set(3);
+        attr.put(PutKafka.ATTR_FAILED_SEGMENTS, new String(_fs.toByteArray(), StandardCharsets.UTF_8));
+        ff.putAttributes(attr);
+        runner.enqueue(ff);
+        runner.run(1, false);
+        MockFlowFile sff = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+        assertNull(sff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS));
+        assertNull(sff.getAttribute(PutKafka.ATTR_TOPIC));
+        assertNull(sff.getAttribute(PutKafka.ATTR_KEY));
+        assertNull(sff.getAttribute(PutKafka.ATTR_DELIMITER));
 
-        final byte[] bytes = "1\n2\n3\n4".getBytes();
-        runner.enqueue(bytes);
-        runner.run(2);
+        ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
 
-        runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
-        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
-
-        final MockFlowFile successFF = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
-        successFF.assertContentEquals("1\n2\n");
-
-        final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
-        failureFF.assertContentEquals("3\n4");
-    }
-
-    @Test
-    public void testPartialFailureWithSuccessBeforeAndAfter() {
-        final TestableProcessor proc = new TestableProcessor(2, 4); // fail after sending 2 messages, then stop failing after 4
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-        runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
-
-        final byte[] bytes = "1\n2\n3\n4\n5\n6".getBytes();
-        runner.enqueue(bytes);
-        runner.run(2);
-
-        runner.assertTransferCount(PutKafka.REL_SUCCESS, 2);
-        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
-
-        final List<MockFlowFile> success = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
-        for (final MockFlowFile successFF : success) {
-            if ('1' == successFF.toByteArray()[0]) {
-                successFF.assertContentEquals("1\n2\n");
-            } else if ('5' == successFF.toByteArray()[0]) {
-                successFF.assertContentEquals("5\n6");
-            } else {
-                Assert.fail("Wrong content for FlowFile; contained " + new String(successFF.toByteArray()));
-            }
+        assertEquals("Goodbye", new String(consumer.next().message()));
+        assertEquals("2", new String(consumer.next().message()));
+        try {
+            consumer.next();
+            fail();
+        } catch (Exception e) {
+            // ignore
         }
-
-        final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
-        failureFF.assertContentEquals("3\n4\n");
     }
 
-
     @Test
     public void testWithEmptyMessages() {
-        final TestableProcessor proc = new TestableProcessor();
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
+        String topicName = "testWithEmptyMessages";
+        PutKafka putKafka = new PutKafka();
+        final TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PutKafka.TOPIC, topicName);
         runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        runner.setProperty(PutKafka.CLIENT_NAME, "foo");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
 
         final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
         runner.enqueue(bytes);
-        runner.run(2);
+        runner.run(1);
 
         runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
 
-        final List<ProducerRecord<byte[], byte[]>> msgs = ((MockProducer) proc.getProducer()).getMessages();
-        assertEquals(4, msgs.size());
-
-        for (int i = 1; i <= 4; i++) {
-            assertTrue(Arrays.equals(String.valueOf(i).getBytes(), msgs.get(i - 1).value()));
+        ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
+        assertNotNull(consumer.next());
+        assertNotNull(consumer.next());
+        assertNotNull(consumer.next());
+        assertNotNull(consumer.next());
+        try {
+            consumer.next();
+            fail();
+        } catch (Exception e) {
+            // ignore
         }
     }
 
-    @Test
-    public void testProvenanceReporterMessagesCount() {
-        final TestableProcessor processor = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(processor);
-
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
-
-        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
-        runner.enqueue(bytes);
-        runner.run(2);
-
-        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
-        assertEquals(1, events.size());
-        final ProvenanceEventRecord event = events.get(0);
-        assertEquals(ProvenanceEventType.SEND, event.getEventType());
-        assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri());
-        assertTrue(event.getDetails().startsWith("Sent 4 messages"));
-    }
-
-    @Test
-    public void testProvenanceReporterWithoutDelimiterMessagesCount() {
-        final TestableProcessor processor = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(processor);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-
-        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
-        runner.enqueue(bytes);
-        runner.run(2);
-
-        final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
-        assertEquals(1, events.size());
-        final ProvenanceEventRecord event = events.get(0);
-        assertEquals(ProvenanceEventType.SEND, event.getEventType());
-        assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri());
-    }
-
-    @Test
-    public void testRoundRobinAcrossMultipleMessages() {
-        final TestableProcessor proc = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING);
-
-        runner.enqueue("hello".getBytes());
-        runner.enqueue("there".getBytes());
-        runner.enqueue("how are you".getBytes());
-        runner.enqueue("today".getBytes());
-
-        runner.run(5);
-
-        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
-
-        final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
-        for (int i = 0; i < 3; i++) {
-            assertEquals(i + 1, records.get(i).partition().intValue());
-        }
-
-        assertEquals(1, records.get(3).partition().intValue());
-    }
-
-    @Test
-    public void testRoundRobinAcrossMultipleMessagesInSameFlowFile() {
-        final TestableProcessor proc = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING);
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
-
-        runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes());
-
-        runner.run(2);
-
-        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
-
-        final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
-        for (int i = 0; i < 3; i++) {
-            assertEquals(i + 1, records.get(i).partition().intValue());
-        }
-
-        assertEquals(1, records.get(3).partition().intValue());
-    }
-
-
-    @Test
-    public void testUserDefinedPartition() {
-        final TestableProcessor proc = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING);
-        runner.setProperty(PutKafka.PARTITION, "${part}");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
-
-        final Map<String, String> attrs = new HashMap<>();
-        attrs.put("part", "3");
-        runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs);
-
-        runner.run(2);
-
-        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
-
-        final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
-        for (int i = 0; i < 4; i++) {
-            assertEquals(3, records.get(i).partition().intValue());
-        }
-    }
-
-
-
-    @Test
-    public void testUserDefinedPartitionWithInvalidValue() {
-        final TestableProcessor proc = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING);
-        runner.setProperty(PutKafka.PARTITION, "${part}");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
-
-        final Map<String, String> attrs = new HashMap<>();
-        attrs.put("part", "bogus");
-        runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs);
-
-        runner.run(2);
-
-        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
-
-        final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
-        // the attribute value is not a valid partition, so all messages should simply land on the same partition, whichever one that is
-        final int partition = records.get(0).partition().intValue();
-
-        for (int i = 0; i < 4; i++) {
-            assertEquals(partition, records.get(i).partition().intValue());
-        }
-    }
-
-
-    @Test
-    public void testFullBuffer() {
-        final TestableProcessor proc = new TestableProcessor();
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutKafka.TOPIC, "topic1");
-        runner.setProperty(PutKafka.KEY, "key1");
-        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
-        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
-        runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "5 B");
-        proc.setMaxQueueSize(10L); // each message takes 4 bytes for the key ("key1") plus 1 byte for the value, so only two of the four messages fit
-
-        runner.enqueue("1\n2\n3\n4\n".getBytes());
-        runner.run(2);
-
-        runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
-        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
-
-        runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0).assertContentEquals("1\n2\n");
-        runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0).assertContentEquals("3\n4\n");
-    }
-
-
-    /**
-     * Overrides {@link #getProducer()} so that the tests use our {@link MockProducer} instead of a real Kafka producer.
-     */
-    private static class TestableProcessor extends PutKafka {
-        private final MockProducer producer;
-
-        public TestableProcessor() {
-            this(null);
-        }
-
-        public TestableProcessor(final Integer failAfter) {
-            this(failAfter, null);
-        }
-
-        public TestableProcessor(final Integer failAfter, final Integer stopFailingAfter) {
-            producer = new MockProducer();
-            producer.setFailAfter(failAfter);
-            producer.setStopFailingAfter(stopFailingAfter);
-        }
-
-        @Override
-        protected Producer<byte[], byte[]> getProducer() {
-            return producer;
-        }
-
-        public void setMaxQueueSize(final long bytes) {
-            producer.setMaxQueueSize(bytes);
-        }
-    }
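For illustration only (not part of the original test class): because TestableProcessor exposes the failAfter hook, a failure-path test could be sketched roughly as below. The routing assumption, that the successfully sent prefix goes to REL_SUCCESS and the remainder to REL_FAILURE, mirrors testFullBuffer above but is not verified here.

    @Test
    public void testFailAfterFirstMessageSketch() { // hypothetical test, not in the patch
        final TestableProcessor proc = new TestableProcessor(1); // mock fails after 1 successful send

        final TestRunner runner = TestRunners.newTestRunner(proc);
        runner.setProperty(PutKafka.TOPIC, "topic1");
        runner.setProperty(PutKafka.KEY, "key1");
        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");

        runner.enqueue("1\n2\n3\n".getBytes());
        runner.run();

        // assumption: the sent prefix is routed to success, the unsent remainder to failure
        runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
    }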
-
-
-    /**
-     * Our own mock Producer, very similar to the one that Kafka supplies. The Kafka-supplied mock, however, can only
-     * be told to fail on the next message, not to fail after X successful messages. Because the Processor sends
-     * multiple messages in a single onTrigger call, that is not sufficient to exercise the failure paths, so this
-     * mock adds the failAfter / stopFailingAfter hooks.
-     */
-    private static class MockProducer implements Producer<byte[], byte[]> {
-
-        private int sendCount = 0;
-        private Integer failAfter;
-        private Integer stopFailingAfter;
-        private long queueSize = 0L;
-        private long maxQueueSize = Long.MAX_VALUE;
-
-        private final List<ProducerRecord<byte[], byte[]>> messages = new ArrayList<>();
-
-        public MockProducer() {
-        }
-
-        public void setMaxQueueSize(final long bytes) {
-            this.maxQueueSize = bytes;
-        }
-
-        public List<ProducerRecord<byte[], byte[]>> getMessages() {
-            return messages;
-        }
-
-        public void setFailAfter(final Integer successCount) {
-            failAfter = successCount;
-        }
-
-        public void setStopFailingAfter(final Integer stopFailingAfter) {
-            this.stopFailingAfter = stopFailingAfter;
-        }
-
-        @Override
-        public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record) {
-            return send(record, null);
-        }
-
-        @Override
-        public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
-            sendCount++;
-
-            final ByteArraySerializer serializer = new ByteArraySerializer();
-            final int keyBytes = serializer.serialize(record.topic(), record.key()).length;
-            final int valueBytes = serializer.serialize(record.topic(), record.value()).length;
-            if (maxQueueSize - queueSize < keyBytes + valueBytes) {
-                throw new BufferExhaustedException("Queue already holds " + queueSize + " of " + maxQueueSize + " bytes but serialized message requires " + (keyBytes + valueBytes));
-            }
-
-            queueSize += keyBytes + valueBytes;
-
-            if (failAfter != null && sendCount > failAfter && ((stopFailingAfter == null) || (sendCount < stopFailingAfter + 1))) {
-                final Exception e = new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
-                callback.onCompletion(null, e);
-            } else {
-                messages.add(record);
-                final RecordMetadata meta = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 1 : record.partition()), 0L, 0L);
-                callback.onCompletion(meta, null);
-            }
-
-            // we don't actually look at the Future in the processor, so we can just return null
-            return null;
-        }
-
-        @Override
-        public List<PartitionInfo> partitionsFor(String topic) {
-            final Node leader = new Node(1, "localhost", 1111);
-            final Node node2 = new Node(2, "localhost-2", 2222);
-            final Node node3 = new Node(3, "localhost-3", 3333);
-
-            final PartitionInfo partInfo1 = new PartitionInfo(topic, 1, leader, new Node[] {node2, node3}, new Node[0]);
-            final PartitionInfo partInfo2 = new PartitionInfo(topic, 2, leader, new Node[] {node2, node3}, new Node[0]);
-            final PartitionInfo partInfo3 = new PartitionInfo(topic, 3, leader, new Node[] {node2, node3}, new Node[0]);
-
-            final List<PartitionInfo> infos = new ArrayList<>(3);
-            infos.add(partInfo1);
-            infos.add(partInfo2);
-            infos.add(partInfo3);
-            return infos;
-        }
-
-        @Override
-        public Map<MetricName, ? extends Metric> metrics() {
-            return Collections.emptyMap();
-        }
-
-        @Override
-        public void close() {
-        }
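As a reading aid (not code from the patch), the failure window implemented in send() boils down to the predicate below; for example, with failAfter = 2 and stopFailingAfter = 4, sends 3 and 4 report a failure and every other send succeeds.

    // standalone restatement of the condition in send(); parameter names mirror the fields above
    static boolean shouldFail(final int sendCount, final Integer failAfter, final Integer stopFailingAfter) {
        return failAfter != null
                && sendCount > failAfter
                && (stopFailingAfter == null || sendCount < stopFailingAfter + 1);
    }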
+    private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
+        Properties props = new Properties();
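+        // connect to the ZooKeeper instance embedded alongside the test's local Kafka broker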
+        props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort());
+        props.put("group.id", "test");
+        props.put("consumer.timeout.ms", "5000");
+        props.put("auto.offset.reset", "smallest");
+        ConsumerConfig consumerConfig = new ConsumerConfig(props);
+        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+        Map<String, Integer> topicCountMap = new HashMap<>(1);
+        topicCountMap.put(topic, 1);
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+        return streams.get(0).iterator();
     }
 }
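The buildConsumer helper above hands back a raw ConsumerIterator, so an integration test that publishes through the processor against the embedded broker could drain a topic roughly as follows. This is a sketch, not part of the patch; it assumes kafka.consumer.ConsumerTimeoutException and java.nio.charset.StandardCharsets are imported, and relies on the consumer.timeout.ms setting above to bound how long hasNext() blocks.

    private List<String> drainTopic(final String topic) { // hypothetical helper, not in the patch
        final List<String> received = new ArrayList<>();
        final ConsumerIterator<byte[], byte[]> iter = buildConsumer(topic);
        try {
            while (iter.hasNext()) {
                received.add(new String(iter.next().message(), StandardCharsets.UTF_8));
            }
        } catch (ConsumerTimeoutException e) {
            // consumer.timeout.ms (5000 ms above) elapsed with no further messages; the topic is drained
        }
        return received;
    }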
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties
index 35778d8..8e37bb9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties
@@ -12,12 +12,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootCategory=WARN, stdout
+log4j.rootCategory=INFO, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n
 
-log4j.category.org.apache.nifi.processors.kafka=INFO
-log4j.category.kafka=ERROR
-#log4j.category.org.apache.nifi.startup=INFO
+log4j.category.org.apache.nifi.processors.kafka=DEBUG