DRILL-7388: Kafka improvements

1. Upgraded Kafka libraries to 2.3.1 (DRILL-6739).
2. Added new options to support the same features as the native JSON reader (see the usage example after this list):
  a. store.kafka.reader.skip_invalid_records, default: false (DRILL-6723);
  b. store.kafka.reader.allow_nan_inf, default: true;
  c. store.kafka.reader.allow_escape_any_char, default: false.
3. Fixed an issue when a Kafka topic contains only one message (DRILL-7388).
4. Replaced the Gson parser with Jackson to parse JSON in the same manner as the Drill native JSON reader.
5. Performance improvements: Kafka consumers are now closed asynchronously, fixed a resource leak (DRILL-7290), and moved unnecessary info-level logging to debug.
6. Updated bootstrap-storage-plugins.json to reflect actual Kafka connection properties.
7. Added unit tests.
8. Refactoring and code cleanup.
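
For reference, a minimal usage sketch of the new reader options (illustrative only; the option names are taken from item 2 above, and test.topic is a placeholder topic name):

  -- enable the new JSON reader options for the current session
  ALTER SESSION SET `store.kafka.reader.skip_invalid_records` = true;
  ALTER SESSION SET `store.kafka.reader.allow_nan_inf` = true;
  ALTER SESSION SET `store.kafka.reader.allow_escape_any_char` = true;

  -- query a Kafka topic through the kafka storage plugin
  SELECT * FROM kafka.`test.topic` LIMIT 10;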

closes #1901
diff --git a/contrib/storage-kafka/README.md b/contrib/storage-kafka/README.md
index a63731f..a26c6e1 100644
--- a/contrib/storage-kafka/README.md
+++ b/contrib/storage-kafka/README.md
@@ -211,18 +211,22 @@
 
 - store.kafka.record.reader system option can be used for setting record reader and default is org.apache.drill.exec.store.kafka.decoders.JsonMessageReader
 - Default store.kafka.poll.timeout is set to 200, user has to set this accordingly
-- Custom record reader can be implemented by extending org.apache.drill.exec.store.kafka.decoders.MessageReader and setting store.kafka.record.reader accordinlgy
+- Custom record reader can be implemented by extending org.apache.drill.exec.store.kafka.decoders.MessageReader and setting store.kafka.record.reader accordingly
 
 
-In case of JSON message format, following system options can be used accordingly. More details can be found in [Drill Json Model](https://drill.apache.org/docs/json-data-model/) and in [Drill system options configurations](https://drill.apache.org/docs/configuration-options-introduction/)
+In case of JSON message format, following system / session options can be used accordingly. More details can be found in [Drill Json Model](https://drill.apache.org/docs/json-data
+-model/) and in [Drill system options configurations](https://drill.apache.org/docs/configuration-options-introduction/)
 
-<ui>
+<ul>
   <li>ALTER SESSION SET `store.kafka.record.reader` = 'org.apache.drill.exec.store.kafka.decoders.JsonMessageReader';</li>
   <li>ALTER SESSION SET `store.kafka.poll.timeout` = 200;</li>
   <li>ALTER SESSION SET `exec.enable_union_type` = true; </li>
   <li>ALTER SESSION SET `store.kafka.all_text_mode` = true;</li>
   <li>ALTER SESSION SET `store.kafka.read_numbers_as_double` = true;</li>
-</ui>
+  <li>ALTER SESSION SET `store.kafka.skip_invalid_records` = true;</li>
+  <li>ALTER SESSION SET `store.kafka.allow_nan_inf` = true;</li>
+  <li>ALTER SESSION SET `store.kafka.allow_escape_any_char` = true;</li>
+</ul>
 
 <h4 id="RoadMap">RoadMap</h4>
  <ul>
diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
index 83a1a29..0e2f3d6 100644
--- a/contrib/storage-kafka/pom.xml
+++ b/contrib/storage-kafka/pom.xml
@@ -31,7 +31,7 @@
   <name>contrib/kafka-storage-plugin</name>
 
   <properties>
-    <kafka.version>0.11.0.1</kafka.version>
+    <kafka.version>2.3.1</kafka.version>
     <kafka.TestSuite>**/TestKafkaSuit.class</kafka.TestSuite>
   </properties>
 
@@ -64,7 +64,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.11</artifactId>
+      <artifactId>kafka_2.12</artifactId>
       <version>${kafka.version}</version>
       <exclusions>
         <exclusion>
@@ -78,7 +78,7 @@
       </exclusions>
     </dependency>
 
-    <!-- Test dependencie -->
+    <!-- Test dependencies -->
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
       <artifactId>drill-java-exec</artifactId>
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaAsyncCloser.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaAsyncCloser.java
new file mode 100644
index 0000000..c8d74bf
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaAsyncCloser.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Asynchronously closes Kafka resources whose close result the caller does not depend on,
+ * in order to improve query execution performance.
+ * For example, {@link org.apache.kafka.clients.consumer.KafkaConsumer}.
+ */
+public class KafkaAsyncCloser implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaAsyncCloser.class);
+
+  private volatile ExecutorService executorService;
+
+  /**
+   * Closes given resource in separate thread using thread executor.
+   *
+   * @param autoCloseable resource to close
+   */
+  public void close(AutoCloseable autoCloseable) {
+    if (autoCloseable != null) {
+      ExecutorService executorService = executorService();
+      executorService.submit(() -> {
+        try {
+          autoCloseable.close();
+          logger.debug("Closing {} resource", autoCloseable.getClass().getCanonicalName());
+        } catch (Exception e) {
+          logger.debug("Resource {} failed to close: {}", autoCloseable.getClass().getCanonicalName(), e.getMessage());
+        }
+      });
+    }
+  }
+
+  @Override
+  public void close() {
+    if (executorService != null) {
+      logger.trace("Closing Kafka async closer: {}", executorService);
+      executorService.shutdownNow();
+    }
+  }
+
+  /**
+   * Initializes executor service instance using DCL.
+   * Created thread executor instance allows to execute only one thread at a time
+   * but unlike single thread executor does not keep this thread in the pool.
+   * Custom thread factory is used to define Kafka specific thread names.
+   *
+   * @return executor service instance
+   */
+  private ExecutorService executorService() {
+    if (executorService == null) {
+      synchronized (this) {
+        if (executorService == null) {
+          this.executorService = new ThreadPoolExecutor(0, 1, 0L,
+            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new KafkaThreadFactory());
+        }
+      }
+    }
+    return executorService;
+  }
+
+  /**
+   * Wraps default thread factory and adds Kafka closer prefix to the original thread name.
+   * Is used to uniquely identify Kafka closer threads.
+   * Example: drill-kafka-closer-pool-1-thread-1
+   */
+  private static class KafkaThreadFactory implements ThreadFactory {
+
+    private static final String THREAD_PREFIX = "drill-kafka-closer-";
+    private final ThreadFactory delegate = Executors.defaultThreadFactory();
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+      Thread thread = delegate.newThread(runnable);
+      thread.setName(THREAD_PREFIX + thread.getName());
+      return thread;
+    }
+  }
+}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
index dded560..e4f255c 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -17,12 +17,14 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
@@ -155,25 +157,28 @@
   private void init() {
     partitionWorkMap = Maps.newHashMap();
     Collection<DrillbitEndpoint> endpoints = kafkaStoragePlugin.getContext().getBits();
-    Map<String, DrillbitEndpoint> endpointMap = Maps.newHashMap();
-    for (DrillbitEndpoint endpoint : endpoints) {
-      endpointMap.put(endpoint.getAddress(), endpoint);
-    }
+    Map<String, DrillbitEndpoint> endpointMap = endpoints.stream()
+      .collect(Collectors.toMap(
+        DrillbitEndpoint::getAddress,
+        Function.identity(),
+        (o, n) -> n));
 
     Map<TopicPartition, Long> startOffsetsMap = Maps.newHashMap();
     Map<TopicPartition, Long> endOffsetsMap = Maps.newHashMap();
-    List<PartitionInfo> topicPartitions = null;
+    List<PartitionInfo> topicPartitions;
     String topicName = kafkaScanSpec.getTopicName();
 
-    try (KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(kafkaStoragePlugin.getConfig().getKafkaConsumerProps(),
-        new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
-      if (!kafkaConsumer.listTopics().keySet().contains(topicName)) {
+    KafkaConsumer<?, ?> kafkaConsumer = null;
+    try {
+      kafkaConsumer = new KafkaConsumer<>(kafkaStoragePlugin.getConfig().getKafkaConsumerProps(),
+        new ByteArrayDeserializer(), new ByteArrayDeserializer());
+      if (!kafkaConsumer.listTopics().containsKey(topicName)) {
         throw UserException.dataReadError()
             .message("Table '%s' does not exist", topicName)
             .build(logger);
       }
 
-      kafkaConsumer.subscribe(Arrays.asList(topicName));
+      kafkaConsumer.subscribe(Collections.singletonList(topicName));
       // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions
       // evaluates lazily, seeking to the first/last offset in all partitions only
       // when poll(long) or
@@ -194,8 +199,12 @@
         endOffsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition));
       }
     } catch (Exception e) {
-      throw UserException.dataReadError(e).message("Failed to fetch start/end offsets of the topic  %s", topicName)
-          .addContext(e.getMessage()).build(logger);
+      throw UserException.dataReadError(e)
+        .message("Failed to fetch start/end offsets of the topic %s", topicName)
+        .addContext(e.getMessage())
+        .build(logger);
+    } finally {
+      kafkaStoragePlugin.registerToClose(kafkaConsumer);
     }
 
     // computes work for each end point
@@ -227,11 +236,10 @@
   @Override
   public KafkaSubScan getSpecificScan(int minorFragmentId) {
     List<PartitionScanWork> workList = assignments.get(minorFragmentId);
-    List<KafkaPartitionScanSpec> scanSpecList = Lists.newArrayList();
 
-    for (PartitionScanWork work : workList) {
-      scanSpecList.add(work.partitionScanSpec);
-    }
+    List<KafkaPartitionScanSpec> scanSpecList = workList.stream()
+      .map(PartitionScanWork::getPartitionScanSpec)
+      .collect(Collectors.toList());
 
     return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, scanSpecList);
   }
@@ -256,7 +264,7 @@
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
     return new KafkaGroupScan(this);
   }
@@ -286,7 +294,7 @@
     KafkaGroupScan clone = new KafkaGroupScan(this);
     HashSet<TopicPartition> partitionsInSpec = Sets.newHashSet();
 
-    for(KafkaPartitionScanSpec scanSpec : partitionScanSpecList) {
+    for (KafkaPartitionScanSpec scanSpec : partitionScanSpecList) {
       TopicPartition tp = new TopicPartition(scanSpec.getTopicName(), scanSpec.getPartitionId());
       partitionsInSpec.add(tp);
 
@@ -327,10 +335,8 @@
 
   @JsonIgnore
   public List<KafkaPartitionScanSpec> getPartitionScanSpecList() {
-    List<KafkaPartitionScanSpec> partitionScanSpecList = Lists.newArrayList();
-    for (PartitionScanWork work : partitionWorkMap.values()) {
-      partitionScanSpecList.add(work.partitionScanSpec.clone());
-    }
-    return partitionScanSpecList;
+    return partitionWorkMap.values().stream()
+      .map(work -> work.partitionScanSpec.clone())
+      .collect(Collectors.toList());
   }
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
index 92488a2..c0ce1c2 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
@@ -180,7 +180,6 @@
                                           .put("less_than", "greater_than")
                                           .build();
   }
-
 }
 
 
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
index 713f62e..eaef410 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
@@ -57,21 +57,21 @@
     switch (functionName) {
       case "booleanAnd":
         //Reduce the scan range
-        if(startOffset < scanSpec.startOffset) {
+        if (startOffset < scanSpec.startOffset) {
           startOffset = scanSpec.startOffset;
         }
 
-        if(endOffset > scanSpec.endOffset) {
+        if (endOffset > scanSpec.endOffset) {
           endOffset = scanSpec.endOffset;
         }
         break;
       case "booleanOr":
         //Increase the scan range
-        if(scanSpec.startOffset < startOffset) {
+        if (scanSpec.startOffset < startOffset) {
           startOffset = scanSpec.startOffset;
         }
 
-        if(scanSpec.endOffset > endOffset) {
+        if (scanSpec.endOffset > endOffset) {
           endOffset = scanSpec.endOffset;
         }
         break;
@@ -80,7 +80,7 @@
 
   @Override
   public boolean equals(Object obj) {
-    if(obj instanceof KafkaPartitionScanSpec) {
+    if (obj instanceof KafkaPartitionScanSpec) {
       KafkaPartitionScanSpec that = ((KafkaPartitionScanSpec)obj);
       return this.topicName.equals(that.topicName) && this.partitionId == that.partitionId
                  && this.startOffset == that.startOffset && this.endOffset == that.endOffset;
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
index 9fa987a..2734861 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
@@ -17,44 +17,44 @@
  */
 package org.apache.drill.exec.store.kafka;
 
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.apache.drill.common.expression.BooleanOperator;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
-public class KafkaPartitionScanSpecBuilder extends
-    AbstractExprVisitor<List<KafkaPartitionScanSpec>,Void,RuntimeException> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaPartitionScanSpecBuilder.class);
+public class KafkaPartitionScanSpecBuilder
+  extends AbstractExprVisitor<List<KafkaPartitionScanSpec>, Void, RuntimeException>
+  implements AutoCloseable {
+
   private final LogicalExpression le;
   private final KafkaGroupScan groupScan;
   private final KafkaConsumer<?, ?> kafkaConsumer;
   private ImmutableMap<TopicPartition, KafkaPartitionScanSpec> fullScanSpec;
-  private static final long CLOSE_TIMEOUT_MS = 200;
 
   public KafkaPartitionScanSpecBuilder(KafkaGroupScan groupScan, LogicalExpression conditionExp) {
     this.groupScan = groupScan;
-    kafkaConsumer = new KafkaConsumer<>(groupScan.getKafkaStoragePluginConfig().getKafkaConsumerProps(),
+    this.kafkaConsumer = new KafkaConsumer<>(groupScan.getKafkaStoragePluginConfig().getKafkaConsumerProps(),
         new ByteArrayDeserializer(), new ByteArrayDeserializer());
-    le = conditionExp;
+    this.le = conditionExp;
   }
 
   public List<KafkaPartitionScanSpec> parseTree() {
     ImmutableMap.Builder<TopicPartition, KafkaPartitionScanSpec> builder = ImmutableMap.builder();
-    for(KafkaPartitionScanSpec scanSpec : groupScan.getPartitionScanSpecList()) {
+    for (KafkaPartitionScanSpec scanSpec : groupScan.getPartitionScanSpecList()) {
       builder.put(new TopicPartition(scanSpec.getTopicName(), scanSpec.getPartitionId()), scanSpec);
     }
     fullScanSpec = builder.build();
@@ -65,7 +65,7 @@
     This results in a "ScanBatch" with no reader. DRILL currently requires
     at least one reader to be present in a scan batch.
      */
-    if(pushdownSpec != null && pushdownSpec.isEmpty()) {
+    if (pushdownSpec != null && pushdownSpec.isEmpty()) {
       TopicPartition firstPartition = new TopicPartition(groupScan.getKafkaScanSpec().getTopicName(), 0);
       KafkaPartitionScanSpec emptySpec =
           new KafkaPartitionScanSpec(firstPartition.topic(),firstPartition.partition(),
@@ -76,29 +76,26 @@
   }
 
   @Override
-  public List<KafkaPartitionScanSpec> visitUnknown(LogicalExpression e, Void value)
-      throws RuntimeException {
+  public List<KafkaPartitionScanSpec> visitUnknown(LogicalExpression e, Void value) {
     return null;
   }
 
   @Override
-  public List<KafkaPartitionScanSpec> visitBooleanOperator(BooleanOperator op, Void value)
-      throws RuntimeException {
-
+  public List<KafkaPartitionScanSpec> visitBooleanOperator(BooleanOperator op, Void value) {
     Map<TopicPartition, KafkaPartitionScanSpec> specMap = Maps.newHashMap();
     ImmutableList<LogicalExpression> args = op.args;
-    if(op.getName().equals("booleanOr")) {
+    if (op.getName().equals("booleanOr")) {
 
-      for(LogicalExpression expr : args) {
+      for (LogicalExpression expr : args) {
         List<KafkaPartitionScanSpec> parsedSpec = expr.accept(this, null);
         //parsedSpec is null if expression cannot be pushed down
-        if(parsedSpec != null) {
+        if (parsedSpec != null) {
           for(KafkaPartitionScanSpec newSpec : parsedSpec) {
             TopicPartition tp = new TopicPartition(newSpec.getTopicName(), newSpec.getPartitionId());
             KafkaPartitionScanSpec existingSpec = specMap.get(tp);
 
             //If existing spec does not contain topic-partition
-            if(existingSpec == null) {
+            if (existingSpec == null) {
               specMap.put(tp, newSpec); //Add topic-partition to spec for OR
             } else {
               existingSpec.mergeScanSpec(op.getName(), newSpec);
@@ -111,11 +108,11 @@
       }
     } else { //booleanAnd
       specMap.putAll(fullScanSpec);
-      for(LogicalExpression expr : args) {
+      for (LogicalExpression expr : args) {
         List<KafkaPartitionScanSpec> parsedSpec = expr.accept(this, null);
 
         //parsedSpec is null if expression cannot be pushed down
-        if(parsedSpec != null) {
+        if (parsedSpec != null) {
           Set<TopicPartition> partitionsInNewSpec = Sets.newHashSet(); //Store topic-partitions returned from new spec.
 
           for (KafkaPartitionScanSpec newSpec : parsedSpec) {
@@ -143,14 +140,12 @@
   }
 
   @Override
-  public List<KafkaPartitionScanSpec> visitFunctionCall(FunctionCall call, Void value)
-      throws RuntimeException {
-
+  public List<KafkaPartitionScanSpec> visitFunctionCall(FunctionCall call, Void value) {
     String functionName = call.getName();
-    if(KafkaNodeProcessor.isPushdownFunction(functionName)) {
+    if (KafkaNodeProcessor.isPushdownFunction(functionName)) {
 
       KafkaNodeProcessor kafkaNodeProcessor = KafkaNodeProcessor.process(call);
-      if(kafkaNodeProcessor.isSuccess()) {
+      if (kafkaNodeProcessor.isSuccess()) {
         switch (kafkaNodeProcessor.getPath()) {
           case "kafkaMsgTimestamp":
             return createScanSpecForTimestamp(kafkaNodeProcessor.getFunctionName(),
@@ -168,22 +163,21 @@
   }
 
 
-  private List<KafkaPartitionScanSpec> createScanSpecForTimestamp(String functionName,
-                                                                  Long fieldValue) {
+  private List<KafkaPartitionScanSpec> createScanSpecForTimestamp(String functionName, Long fieldValue) {
     List<KafkaPartitionScanSpec> scanSpec = Lists.newArrayList();
     Map<TopicPartition, Long> timesValMap = Maps.newHashMap();
     ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
 
-    for(TopicPartition partitions : topicPartitions) {
+    for (TopicPartition partitions : topicPartitions) {
       timesValMap.put(partitions, functionName.equals("greater_than") ? fieldValue+1 : fieldValue);
     }
 
     Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = kafkaConsumer.offsetsForTimes(timesValMap);
 
-    for(TopicPartition tp : topicPartitions) {
+    for (TopicPartition tp : topicPartitions) {
       OffsetAndTimestamp value = offsetAndTimestamp.get(tp);
       //OffsetAndTimestamp is null if there is no offset greater or equal to requested timestamp
-      if(value == null) {
+      if (value == null) {
         scanSpec.add(
             new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
                 fullScanSpec.get(tp).getEndOffset(), fullScanSpec.get(tp).getEndOffset()));
@@ -197,8 +191,7 @@
     return scanSpec;
   }
 
-  private List<KafkaPartitionScanSpec> createScanSpecForOffset(String functionName,
-                                                               Long fieldValue) {
+  private List<KafkaPartitionScanSpec> createScanSpecForOffset(String functionName, Long fieldValue) {
     List<KafkaPartitionScanSpec> scanSpec = Lists.newArrayList();
     ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
 
@@ -210,8 +203,8 @@
 
     switch (functionName) {
       case "equal":
-        for(TopicPartition tp : topicPartitions) {
-          if(fieldValue < fullScanSpec.get(tp).getStartOffset()) {
+        for (TopicPartition tp : topicPartitions) {
+          if (fieldValue < fullScanSpec.get(tp).getStartOffset()) {
             //Offset does not exist
             scanSpec.add(
                 new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
@@ -224,7 +217,7 @@
         }
         break;
       case "greater_than_or_equal_to":
-        for(TopicPartition tp : topicPartitions) {
+        for (TopicPartition tp : topicPartitions) {
           //Ensure scan range is between startOffset and endOffset,
           long val = bindOffsetToRange(tp, fieldValue);
           scanSpec.add(
@@ -233,16 +226,16 @@
         }
         break;
       case "greater_than":
-        for(TopicPartition tp : topicPartitions) {
+        for (TopicPartition tp : topicPartitions) {
           //Ensure scan range is between startOffset and endOffset,
-          long val = bindOffsetToRange(tp, fieldValue+1);
+          long val = bindOffsetToRange(tp, fieldValue + 1);
           scanSpec.add(
               new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
                   val, fullScanSpec.get(tp).getEndOffset()));
         }
         break;
       case "less_than_or_equal_to":
-        for(TopicPartition tp : topicPartitions) {
+        for (TopicPartition tp : topicPartitions) {
           //Ensure scan range is between startOffset and endOffset,
           long val = bindOffsetToRange(tp, fieldValue+1);
 
@@ -252,7 +245,7 @@
         }
         break;
       case "less_than":
-        for(TopicPartition tp : topicPartitions) {
+        for (TopicPartition tp : topicPartitions) {
           //Ensure scan range is between startOffset and endOffset,
           long val = bindOffsetToRange(tp, fieldValue);
 
@@ -265,15 +258,14 @@
     return scanSpec;
   }
 
-  private List<KafkaPartitionScanSpec> createScanSpecForPartition(String functionName,
-                                                                  Long fieldValue) {
+  private List<KafkaPartitionScanSpec> createScanSpecForPartition(String functionName, Long fieldValue) {
     List<KafkaPartitionScanSpec> scanSpecList = Lists.newArrayList();
     ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
 
     switch (functionName) {
       case "equal":
-        for(TopicPartition tp : topicPartitions) {
-          if(tp.partition() == fieldValue) {
+        for (TopicPartition tp : topicPartitions) {
+          if (tp.partition() == fieldValue) {
             scanSpecList.add(
                 new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
                     fullScanSpec.get(tp).getStartOffset(),
@@ -282,8 +274,8 @@
         }
         break;
       case "not_equal":
-        for(TopicPartition tp : topicPartitions) {
-          if(tp.partition() != fieldValue) {
+        for (TopicPartition tp : topicPartitions) {
+          if (tp.partition() != fieldValue) {
             scanSpecList.add(
                 new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
                     fullScanSpec.get(tp).getStartOffset(),
@@ -292,8 +284,8 @@
         }
         break;
       case "greater_than_or_equal_to":
-        for(TopicPartition tp : topicPartitions) {
-          if(tp.partition() >= fieldValue) {
+        for (TopicPartition tp : topicPartitions) {
+          if (tp.partition() >= fieldValue) {
             scanSpecList.add(
                 new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
                     fullScanSpec.get(tp).getStartOffset(),
@@ -302,8 +294,8 @@
         }
         break;
       case "greater_than":
-        for(TopicPartition tp : topicPartitions) {
-          if(tp.partition() > fieldValue) {
+        for (TopicPartition tp : topicPartitions) {
+          if (tp.partition() > fieldValue) {
             scanSpecList.add(
                 new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
                     fullScanSpec.get(tp).getStartOffset(),
@@ -312,8 +304,8 @@
         }
         break;
       case "less_than_or_equal_to":
-        for(TopicPartition tp : topicPartitions) {
-          if(tp.partition() <= fieldValue) {
+        for (TopicPartition tp : topicPartitions) {
+          if (tp.partition() <= fieldValue) {
             scanSpecList.add(
                 new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
                     fullScanSpec.get(tp).getStartOffset(),
@@ -322,8 +314,8 @@
         }
         break;
       case "less_than":
-        for(TopicPartition tp : topicPartitions) {
-          if(tp.partition() < fieldValue) {
+        for (TopicPartition tp : topicPartitions) {
+          if (tp.partition() < fieldValue) {
             scanSpecList.add(
                 new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
                     fullScanSpec.get(tp).getStartOffset(),
@@ -335,8 +327,9 @@
     return scanSpecList;
   }
 
-  void close() {
-    kafkaConsumer.close(CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+  @Override
+  public void close() {
+    groupScan.getStoragePlugin().registerToClose(kafkaConsumer);
   }
 
   private long bindOffsetToRange(TopicPartition tp, long offset) {
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
index 002d043..443650e 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
@@ -30,10 +30,14 @@
 import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 
 public class KafkaPushDownFilterIntoScan extends StoragePluginOptimizerRule {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaPushDownFilterIntoScan.class);
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaPushDownFilterIntoScan.class);
 
   public static final StoragePluginOptimizerRule INSTANCE =
       new KafkaPushDownFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
@@ -53,18 +57,21 @@
         DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
 
     KafkaGroupScan groupScan = (KafkaGroupScan) scan.getGroupScan();
-    logger.info("Partitions ScanSpec before pushdown: " + groupScan.getPartitionScanSpecList());
-    KafkaPartitionScanSpecBuilder builder = new KafkaPartitionScanSpecBuilder(groupScan, conditionExp);
-    List<KafkaPartitionScanSpec> newScanSpec = null;
-    newScanSpec = builder.parseTree();
-    builder.close(); //Close consumer
+    if (logger.isDebugEnabled()) {
+      logger.debug("Partitions ScanSpec before push down: {}", groupScan.getPartitionScanSpecList());
+    }
+
+    List<KafkaPartitionScanSpec> newScanSpec;
+    try (KafkaPartitionScanSpecBuilder builder = new KafkaPartitionScanSpecBuilder(groupScan, conditionExp)) {
+      newScanSpec = builder.parseTree();
+    }
 
     //No pushdown
-    if(newScanSpec == null) {
+    if (newScanSpec == null) {
       return;
     }
 
-    logger.info("Partitions ScanSpec after pushdown: " + newScanSpec);
+    logger.debug("Partitions ScanSpec after pushdown: {}", newScanSpec);
     GroupScan newGroupScan = groupScan.cloneWithNewSpec(newScanSpec);
     final ScanPrel newScanPrel =
       new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
@@ -73,7 +80,7 @@
 
   @Override
   public boolean matches(RelOptRuleCall call) {
-    final ScanPrel scan = (ScanPrel) call.rel(1);
+    final ScanPrel scan = call.rel(1);
     if (scan.getGroupScan() instanceof KafkaGroupScan) {
       return super.matches(call);
     }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index 62e588c..5218c3b 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -17,19 +17,18 @@
  */
 package org.apache.drill.exec.store.kafka;
 
+import java.io.IOException;
 import java.util.Collection;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.kafka.decoders.MessageReader;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
@@ -40,61 +39,48 @@
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 public class KafkaRecordReader extends AbstractRecordReader {
   private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
-  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private final ReadOptions readOptions;
+  private final KafkaStoragePlugin plugin;
+  private final KafkaPartitionScanSpec subScanSpec;
 
   private VectorContainerWriter writer;
   private MessageReader messageReader;
 
-  private final boolean unionEnabled;
-  private final KafkaStoragePlugin plugin;
-  private final KafkaPartitionScanSpec subScanSpec;
-  private final long kafkaPollTimeOut;
-
   private long currentOffset;
   private MessageIterator msgItr;
-
-  private final boolean enableAllTextMode;
-  private final boolean readNumbersAsDouble;
-  private final String kafkaMsgReader;
   private int currentMessageCount;
 
   public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
       FragmentContext context, KafkaStoragePlugin plugin) {
     setColumns(projectedColumns);
-    final OptionManager optionManager = context.getOptions();
-    this.enableAllTextMode = optionManager.getBoolean(ExecConstants.KAFKA_ALL_TEXT_MODE);
-    this.readNumbersAsDouble = optionManager.getBoolean(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE);
-    this.unionEnabled = optionManager.getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
-    this.kafkaMsgReader = optionManager.getString(ExecConstants.KAFKA_RECORD_READER);
-    this.kafkaPollTimeOut = optionManager.getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
+    this.readOptions = new ReadOptions(context.getOptions());
     this.plugin = plugin;
     this.subScanSpec = subScanSpec;
   }
 
   @Override
   protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
-    Set<SchemaPath> transformed = Sets.newLinkedHashSet();
-    if (!isStarQuery()) {
-      for (SchemaPath column : projectedColumns) {
-        transformed.add(column);
-      }
-    } else {
+    Set<SchemaPath> transformed = new LinkedHashSet<>();
+    if (isStarQuery()) {
       transformed.add(SchemaPath.STAR_COLUMN);
+    } else {
+      transformed.addAll(projectedColumns);
     }
     return transformed;
   }
 
   @Override
-  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
-    this.writer = new VectorContainerWriter(output, unionEnabled);
-    messageReader = MessageReaderFactory.getMessageReader(kafkaMsgReader);
-    messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), this.writer,
-        this.enableAllTextMode, this.readNumbersAsDouble);
-    msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, kafkaPollTimeOut);
+  public void setup(OperatorContext context, OutputMutator output) {
+    this.writer = new VectorContainerWriter(output, readOptions.isEnableUnionType());
+    messageReader = MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
+    messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), writer, readOptions);
+    msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, readOptions.getPollTimeOut());
   }
 
   /**
@@ -105,17 +91,20 @@
   public int next() {
     writer.allocate();
     writer.reset();
-    Stopwatch watch = Stopwatch.createStarted();
+    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     currentMessageCount = 0;
 
     try {
-      while (currentOffset < subScanSpec.getEndOffset() - 1 && msgItr.hasNext()) {
+      while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
         ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
         currentOffset = consumerRecord.offset();
         writer.setPosition(currentMessageCount);
-        messageReader.readMessage(consumerRecord);
-        if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
-          break;
+        boolean status = messageReader.readMessage(consumerRecord);
+        // increment record count only if message was read successfully
+        if (status) {
+          if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
+            break;
+          }
         }
       }
 
@@ -123,33 +112,35 @@
         messageReader.ensureAtLeastOneField();
       }
       writer.setValueCount(currentMessageCount);
-      logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
+      if (watch != null) {
+        logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
+      }
       logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
           currentOffset);
       return currentMessageCount;
     } catch (Exception e) {
-      String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (currentMessageCount + 1);
-      throw UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger);
+      String msg = "Failure while reading messages from kafka. Record reader was at record: " + (currentMessageCount + 1);
+      throw UserException.dataReadError(e)
+        .message(msg)
+        .addContext(e.getMessage())
+        .build(logger);
     }
   }
 
   @Override
-  public void close() throws Exception {
-    logger.info("Last offset processed for {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
+  public void close() throws IOException {
+    logger.debug("Last offset processed for {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
         currentOffset);
-    logger.info("Total time to fetch messages from {}:{} is - {} milliseconds", subScanSpec.getTopicName(),
+    logger.debug("Total time to fetch messages from {}:{} is - {} milliseconds", subScanSpec.getTopicName(),
         subScanSpec.getPartitionId(), msgItr.getTotalFetchTime());
+    plugin.registerToClose(msgItr);
     messageReader.close();
   }
 
   @Override
   public String toString() {
-    return "KafkaRecordReader[messageReader=" + messageReader
-        + ", kafkaPollTimeOut=" + kafkaPollTimeOut
+    return "KafkaRecordReader[readOptions=" + readOptions
         + ", currentOffset=" + currentOffset
-        + ", enableAllTextMode=" + enableAllTextMode
-        + ", readNumbersAsDouble=" + readNumbersAsDouble
-        + ", kafkaMsgReader=" + kafkaMsgReader
         + ", currentMessageCount=" + currentMessageCount
         + "]";
   }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
index ce71531..8083e33 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -17,8 +17,8 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.util.LinkedList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -35,7 +35,7 @@
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
-  static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class);
+  private static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class);
 
   @Override
   public CloseableRecordBatch getBatch(ExecutorFragmentContext context, KafkaSubScan subScan, List<RecordBatch> children)
@@ -43,13 +43,11 @@
     Preconditions.checkArgument(children.isEmpty());
     List<SchemaPath> columns = subScan.getColumns() != null ? subScan.getColumns() : GroupScan.ALL_COLUMNS;
 
-    List<RecordReader> readers = new LinkedList<>();
-    for (KafkaPartitionScanSpec scanSpec : subScan.getPartitionSubScanSpecList()) {
-      readers.add(new KafkaRecordReader(scanSpec, columns, context, subScan.getKafkaStoragePlugin()));
-    }
+    List<RecordReader> readers = subScan.getPartitionSubScanSpecList().stream()
+      .map(scanSpec -> new KafkaRecordReader(scanSpec, columns, context, subScan.getKafkaStoragePlugin()))
+      .collect(Collectors.toList());
 
-    logger.info("Number of record readers initialized : {}", readers.size());
+    logger.debug("Number of record readers initialized : {}", readers.size());
     return new ScanBatch(subScan, context, readers);
   }
-
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
index 91c8fdf..d059099 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
@@ -21,7 +21,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 public class KafkaScanSpec {
-  private String topicName;
+  private final String topicName;
 
   @JsonCreator
   public KafkaScanSpec(@JsonProperty("topicName") String topicName) {
@@ -36,5 +36,4 @@
   public String toString() {
     return "KafkaScanSpec [topicName=" + topicName + "]";
   }
-
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
index 2d45b89..257c0bf 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
@@ -22,7 +22,6 @@
 
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.JSONOptions;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -30,28 +29,26 @@
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.kafka.schema.KafkaSchemaFactory;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
-import org.apache.drill.shaded.guava.com.google.common.io.Closer;
 
 public class KafkaStoragePlugin extends AbstractStoragePlugin {
 
   private static final Logger logger = LoggerFactory.getLogger(KafkaStoragePlugin.class);
   private final KafkaSchemaFactory kafkaSchemaFactory;
   private final KafkaStoragePluginConfig config;
-  private final Closer closer = Closer.create();
+  private final KafkaAsyncCloser closer;
 
-  public KafkaStoragePlugin(KafkaStoragePluginConfig config, DrillbitContext context, String name)
-      throws ExecutionSetupException {
+  public KafkaStoragePlugin(KafkaStoragePluginConfig config, DrillbitContext context, String name) {
     super(context, name);
     logger.debug("Initializing {}", KafkaStoragePlugin.class.getName());
     this.config = config;
     this.kafkaSchemaFactory = new KafkaSchemaFactory(this, name);
+    this.closer = new KafkaAsyncCloser();
   }
 
   @Override
@@ -65,7 +62,7 @@
   }
 
   @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
     this.kafkaSchemaFactory.registerSchemas(schemaConfig, parent);
   }
 
@@ -75,21 +72,19 @@
   }
 
   @Override
-  public AbstractGroupScan getPhysicalScan(String userName,
-      JSONOptions selection) throws IOException {
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
     KafkaScanSpec kafkaScanSpec = selection.getListWith(new ObjectMapper(),
         new TypeReference<KafkaScanSpec>() {
         });
     return new KafkaGroupScan(this, kafkaScanSpec, null);
   }
 
-  public KafkaConsumer<byte[], byte[]> registerConsumer(KafkaConsumer<byte[], byte[]> consumer) {
-    return closer.register(consumer);
+  public void registerToClose(AutoCloseable autoCloseable) {
+    closer.close(autoCloseable);
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     closer.close();
   }
-
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
index 94afa5f..7feb4d9 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
@@ -18,7 +18,9 @@
 package org.apache.drill.exec.store.kafka;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
+import java.util.StringJoiner;
 
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.slf4j.Logger;
@@ -33,7 +35,8 @@
 
   private static final Logger logger = LoggerFactory.getLogger(KafkaStoragePluginConfig.class);
   public static final String NAME = "kafka";
-  private Properties kafkaConsumerProps;
+
+  private final Properties kafkaConsumerProps;
 
   @JsonCreator
   public KafkaStoragePluginConfig(@JsonProperty("kafkaConsumerProps") Map<String, String> kafkaConsumerProps) {
@@ -48,31 +51,25 @@
 
   @Override
   public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((kafkaConsumerProps == null) ? 0 : kafkaConsumerProps.hashCode());
-    return result;
+    return Objects.hash(kafkaConsumerProps);
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
+  public boolean equals(Object o) {
+    if (this == o) {
       return true;
     }
-    if (obj == null) {
+    if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    KafkaStoragePluginConfig other = (KafkaStoragePluginConfig) obj;
-    if (kafkaConsumerProps == null && other.kafkaConsumerProps == null) {
-      return true;
-    }
-    if (kafkaConsumerProps == null || other.kafkaConsumerProps == null) {
-      return false;
-    }
-    return kafkaConsumerProps.equals(other.kafkaConsumerProps);
+    KafkaStoragePluginConfig that = (KafkaStoragePluginConfig) o;
+    return kafkaConsumerProps.equals(that.kafkaConsumerProps);
   }
 
+  @Override
+  public String toString() {
+    return new StringJoiner(", ", KafkaStoragePluginConfig.class.getSimpleName() + "[", "]")
+      .add("kafkaConsumerProps=" + kafkaConsumerProps)
+      .toString();
+  }
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
index 67350df..6ea9e1d 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
@@ -74,7 +74,7 @@
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
     return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, partitionSubScanSpecList);
   }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
index 7373e2c..68855ce 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.kafka;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -31,11 +32,10 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 import kafka.common.KafkaException;
 
-public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>>, AutoCloseable {
 
   private static final Logger logger = LoggerFactory.getLogger(MessageIterator.class);
   private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
@@ -50,11 +50,11 @@
     this.kafkaConsumer = kafkaConsumer;
     this.kafkaPollTimeOut = kafkaPollTimeOut;
 
-    List<TopicPartition> partitions = Lists.newArrayListWithCapacity(1);
+    List<TopicPartition> partitions = new ArrayList<>(1);
     topicPartition = new TopicPartition(subScanSpec.getTopicName(), subScanSpec.getPartitionId());
     partitions.add(topicPartition);
     this.kafkaConsumer.assign(partitions);
-    logger.info("Start offset of {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
+    logger.debug("Start offset of {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
         subScanSpec.getStartOffset());
     this.kafkaConsumer.seek(topicPartition, subScanSpec.getStartOffset());
     this.endOffset = subScanSpec.getEndOffset();
@@ -76,38 +76,54 @@
       return false;
     }
 
-    ConsumerRecords<byte[], byte[]> consumerRecords = null;
-    Stopwatch stopwatch = Stopwatch.createStarted();
+    ConsumerRecords<byte[], byte[]> consumerRecords;
+    Stopwatch stopwatch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     try {
       consumerRecords = kafkaConsumer.poll(kafkaPollTimeOut);
     } catch (KafkaException ke) {
-      logger.error(ke.getMessage(), ke);
       throw UserException.dataReadError(ke).message(ke.getMessage()).build(logger);
+    } finally {
+      if (stopwatch != null) {
+        stopwatch.stop();
+      }
     }
-    stopwatch.stop();
 
     if (consumerRecords.isEmpty()) {
-      String errorMsg = new StringBuilder().append("Failed to fetch messages within ").append(kafkaPollTimeOut)
-          .append(" milliseconds. Consider increasing the value of the property : ")
-          .append(ExecConstants.KAFKA_POLL_TIMEOUT).toString();
-      throw UserException.dataReadError().message(errorMsg).build(logger);
+      throw UserException.dataReadError()
+        .message("Failed to fetch messages within %s milliseconds. " +
+          "Consider increasing the value of the property: %s",
+          kafkaPollTimeOut, ExecConstants.KAFKA_POLL_TIMEOUT)
+        .build(logger);
     }
 
-    long lastFetchTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
-    logger.debug("Total number of messages fetched : {}", consumerRecords.count());
-    logger.debug("Time taken to fetch : {} milliseconds", lastFetchTime);
-    totalFetchTime += lastFetchTime;
+    if (stopwatch != null) {
+      long lastFetchTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+      logger.debug("Time taken to fetch : {} milliseconds", lastFetchTime);
+      totalFetchTime += lastFetchTime;
+      logger.debug("Total number of messages fetched : {}", consumerRecords.count());
+    }
 
     recordIter = consumerRecords.iterator();
     return recordIter.hasNext();
   }
 
+  /**
+   * Returns total fetch time of the messages from topic.
+   * Only applicable if debug log level is enabled.
+   *
+   * @return calculated total fetch time if debug log level is enabled, 0 otherwise
+   */
   public long getTotalFetchTime() {
-    return this.totalFetchTime;
+    return totalFetchTime;
   }
 
   @Override
   public ConsumerRecord<byte[], byte[]> next() {
     return recordIter.next();
   }
+
+  @Override
+  public void close() {
+    kafkaConsumer.close();
+  }
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
index cdaee9b..af3e163 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
@@ -22,12 +22,16 @@
  * It is expected that one should not modify the fieldName of each constant as it breaks the compatibility.
  */
 public enum MetaDataField {
-  KAFKA_TOPIC("kafkaTopic"), KAFKA_PARTITION_ID("kafkaPartitionId"), KAFKA_OFFSET("kafkaMsgOffset"), KAFKA_TIMESTAMP(
-      "kafkaMsgTimestamp"), KAFKA_MSG_KEY("kafkaMsgKey");
+
+  KAFKA_TOPIC("kafkaTopic"),
+  KAFKA_PARTITION_ID("kafkaPartitionId"),
+  KAFKA_OFFSET("kafkaMsgOffset"),
+  KAFKA_TIMESTAMP("kafkaMsgTimestamp"),
+  KAFKA_MSG_KEY("kafkaMsgKey");
 
   private final String fieldName;
 
-  private MetaDataField(final String fieldName) {
+  MetaDataField(final String fieldName) {
     this.fieldName = fieldName;
   }
 
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/ReadOptions.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/ReadOptions.java
new file mode 100644
index 0000000..ea9ed0d
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/ReadOptions.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.OptionManager;
+
+import java.util.StringJoiner;
+
+/**
+ * Holds all system / session options that are used when reading data from Kafka.
+ */
+public class ReadOptions {
+
+  private final String messageReader;
+  private final long pollTimeOut;
+  private final boolean allTextMode;
+  private final boolean readNumbersAsDouble;
+  private final boolean enableUnionType;
+  private final boolean skipInvalidRecords;
+  private final boolean allowNanInf;
+  private final boolean allowEscapeAnyChar;
+
+  public ReadOptions(OptionManager optionManager) {
+    this.messageReader = optionManager.getString(ExecConstants.KAFKA_RECORD_READER);
+    this.pollTimeOut = optionManager.getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
+    this.allTextMode = optionManager.getBoolean(ExecConstants.KAFKA_ALL_TEXT_MODE);
+    this.readNumbersAsDouble = optionManager.getBoolean(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE);
+    this.enableUnionType = optionManager.getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
+    this.skipInvalidRecords = optionManager.getBoolean(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS);
+    this.allowNanInf = optionManager.getBoolean(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS);
+    this.allowEscapeAnyChar = optionManager.getBoolean(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR);
+  }
+
+  public String getMessageReader() {
+    return messageReader;
+  }
+
+  public long getPollTimeOut() {
+    return pollTimeOut;
+  }
+
+  public boolean isAllTextMode() {
+    return allTextMode;
+  }
+
+  public boolean isReadNumbersAsDouble() {
+    return readNumbersAsDouble;
+  }
+
+  public boolean isEnableUnionType() {
+    return enableUnionType;
+  }
+
+  public boolean isSkipInvalidRecords() {
+    return skipInvalidRecords;
+  }
+
+  public boolean isAllowNanInf() {
+    return allowNanInf;
+  }
+
+  public boolean isAllowEscapeAnyChar() {
+    return allowEscapeAnyChar;
+  }
+
+  @Override
+  public String toString() {
+    return new StringJoiner(", ", ReadOptions.class.getSimpleName() + "[", "]")
+      .add("messageReader='" + messageReader + "'")
+      .add("pollTimeOut=" + pollTimeOut)
+      .add("allTextMode=" + allTextMode)
+      .add("readNumbersAsDouble=" + readNumbersAsDouble)
+      .add("enableUnionType=" + enableUnionType)
+      .add("skipInvalidRecords=" + skipInvalidRecords)
+      .add("allowNanInf=" + allowNanInf)
+      .add("allowEscapeAnyChar=" + allowEscapeAnyChar)
+      .toString();
+  }
+}
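
A minimal usage sketch for this holder, assuming the record reader obtains an OptionManager from its fragment context; `context`, `messageReader`, `buffer`, `columns`, `writer` and `logger` are illustrative placeholders, while the `ReadOptions` constructor and the `MessageReader.init(...)` signature come from this change:

    // Hypothetical call site: build the options once per reader and pass them on.
    ReadOptions readOptions = new ReadOptions(context.getOptions());
    logger.debug("Kafka read options: {}", readOptions);
    messageReader.init(buffer, columns, writer, readOptions);
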
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index 40e9e12..eb503aa 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -26,9 +26,16 @@
 import java.io.IOException;
 import java.util.List;
 
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.easy.json.JsonProcessor;
+import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
 import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.ReadOptions;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -38,49 +45,65 @@
 import org.slf4j.LoggerFactory;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
 
 import io.netty.buffer.DrillBuf;
 
 /**
  * MessageReader class which will convert ConsumerRecord into JSON and writes to
  * VectorContainerWriter of JsonReader
- *
  */
 public class JsonMessageReader implements MessageReader {
 
   private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class);
   private JsonReader jsonReader;
   private VectorContainerWriter writer;
+  private ObjectMapper objectMapper;
 
   @Override
-  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode,
-      boolean readNumbersAsDouble) {
+  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, ReadOptions readOptions) {
     // set skipOuterList to false as it isn't applicable for JSON records; it only applies to JSON files.
     this.jsonReader = new JsonReader.Builder(buf)
-            .schemaPathColumns(columns)
-            .allTextMode(allTextMode)
-            .readNumbersAsDouble(readNumbersAsDouble)
-            .build();
+      .schemaPathColumns(columns)
+      .allTextMode(readOptions.isAllTextMode())
+      .readNumbersAsDouble(readOptions.isReadNumbersAsDouble())
+      .enableNanInf(readOptions.isAllowNanInf())
+      .enableEscapeAnyChar(readOptions.isAllowEscapeAnyChar())
+      .build();
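+    // Honour store.kafka.reader.skip_invalid_records: parse errors can then be skipped per record.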
+    jsonReader.setIgnoreJSONParseErrors(readOptions.isSkipInvalidRecords());
     this.writer = writer;
+    this.objectMapper = BaseJsonProcessor.getDefaultMapper()
+      .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, readOptions.isAllowNanInf())
+      .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, readOptions.isAllowEscapeAnyChar());
   }
 
   @Override
-  public void readMessage(ConsumerRecord<?, ?> record) {
+  public boolean readMessage(ConsumerRecord<?, ?> record) {
+    byte[] recordArray = (byte[]) record.value();
+    String data = new String(recordArray, Charsets.UTF_8);
     try {
-      byte[] recordArray = (byte[]) record.value();
-      JsonObject jsonObj = (new JsonParser()).parse(new String(recordArray, Charsets.UTF_8)).getAsJsonObject();
-      jsonObj.addProperty(KAFKA_TOPIC.getFieldName(), record.topic());
-      jsonObj.addProperty(KAFKA_PARTITION_ID.getFieldName(), record.partition());
-      jsonObj.addProperty(KAFKA_OFFSET.getFieldName(), record.offset());
-      jsonObj.addProperty(KAFKA_TIMESTAMP.getFieldName(), record.timestamp());
-      jsonObj.addProperty(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? record.key().toString() : null);
-      jsonReader.setSource(jsonObj.toString().getBytes(Charsets.UTF_8));
-      jsonReader.write(writer);
-    } catch (IOException e) {
-      throw UserException.dataReadError(e).message(e.getMessage())
-          .addContext("MessageReader", JsonMessageReader.class.getName()).build(logger);
+      JsonNode jsonNode = objectMapper.readTree(data);
+      if (jsonNode != null && jsonNode.isObject()) {
+        ObjectNode objectNode = (ObjectNode) jsonNode;
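+        // Enrich the record with Kafka metadata columns: topic, partition, offset, timestamp and message key.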
+        objectNode.put(KAFKA_TOPIC.getFieldName(), record.topic());
+        objectNode.put(KAFKA_PARTITION_ID.getFieldName(), record.partition());
+        objectNode.put(KAFKA_OFFSET.getFieldName(), record.offset());
+        objectNode.put(KAFKA_TIMESTAMP.getFieldName(), record.timestamp());
+        objectNode.put(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? record.key().toString() : null);
+      } else {
+        throw new IOException("Unsupported node type: " + (jsonNode == null ? "NO CONTENT" : jsonNode.getNodeType()));
+      }
+      jsonReader.setSource(jsonNode);
+      return convertJsonReadState(jsonReader.write(writer));
+    } catch (IOException | IllegalArgumentException e) {
+      String message = String.format("JSON record %s: %s", data, e.getMessage());
+      if (jsonReader.ignoreJSONParseError()) {
+        logger.debug("Skipping {}", message, e);
+        return false;
+      }
+      throw UserException.dataReadError(e)
+        .message("Failed to read " + message)
+        .addContext("MessageReader", JsonMessageReader.class.getName())
+        .build(logger);
     }
   }
 
@@ -91,17 +114,17 @@
 
   @Override
   public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin) {
-    return plugin.registerConsumer(new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(),
-        new ByteArrayDeserializer(), new ByteArrayDeserializer()));
+    return new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(),
+      new ByteArrayDeserializer(), new ByteArrayDeserializer());
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     this.writer.clear();
     try {
       this.writer.close();
     } catch (Exception e) {
-      logger.warn("Error while closing JsonMessageReader", e);
+      logger.warn("Error while closing JsonMessageReader: {}", e.getMessage());
     }
   }
 
@@ -109,4 +132,24 @@
   public String toString() {
     return "JsonMessageReader[jsonReader=" + jsonReader + "]";
   }
+
+  /**
+   * Converts {@link JsonProcessor.ReadState} into a true / false result.
+   *
+   * @param jsonReadState JSON reader read state
+   * @return true if the read was successful, false otherwise
+   * @throws IllegalArgumentException if an unexpected read state is encountered
+   */
+  private boolean convertJsonReadState(JsonProcessor.ReadState jsonReadState) {
+    switch (jsonReadState) {
+      case WRITE_SUCCEED:
+      case END_OF_STREAM:
+        return true;
+      case JSON_RECORD_PARSE_ERROR:
+      case JSON_RECORD_PARSE_EOF_ERROR:
+        return false;
+      default:
+        throw new IllegalArgumentException("Unexpected JSON read state: " + jsonReadState);
+    }
+  }
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
index 510a520..f925fce 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
@@ -22,6 +22,7 @@
 
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.ReadOptions;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -34,12 +35,11 @@
  */
 public interface MessageReader extends Closeable {
 
-  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode,
-      boolean readNumbersAsDouble);
+  void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, ReadOptions readOptions);
 
-  public void readMessage(ConsumerRecord<?, ?> message);
+  boolean readMessage(ConsumerRecord<?, ?> message);
 
-  public void ensureAtLeastOneField();
+  void ensureAtLeastOneField();
 
-  public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin);
+  KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin);
 }
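
For illustration, a skeleton of a custom reader written against the updated interface is sketched below. The class name and its no-op decoding logic are hypothetical; only the method signatures and the consumer construction mirror the interface and JsonMessageReader above.

    package org.apache.drill.exec.store.kafka.decoders;

    import java.util.List;

    import io.netty.buffer.DrillBuf;
    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
    import org.apache.drill.exec.store.kafka.ReadOptions;
    import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Hypothetical example only: shows which methods a custom reader must provide.
    public class NoOpMessageReader implements MessageReader {

      private VectorContainerWriter writer;

      @Override
      public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, ReadOptions readOptions) {
        this.writer = writer;
      }

      @Override
      public boolean readMessage(ConsumerRecord<?, ?> message) {
        // Decode message.value() and write it through this.writer; return false to skip the record.
        return true;
      }

      @Override
      public void ensureAtLeastOneField() {
        // Create a default column when no field was written for the batch.
      }

      @Override
      public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin) {
        return new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(),
            new ByteArrayDeserializer(), new ByteArrayDeserializer());
      }

      @Override
      public void close() {
        // Nothing to release in this sketch.
      }
    }
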
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
index cd83f96..3c1cc78 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
@@ -26,7 +26,7 @@
   private static final Logger logger = LoggerFactory.getLogger(MessageReaderFactory.class);
 
   /**
-   * Initialize kafka message reader beased on store.kafka.record.reader session
+   * Initialize kafka message reader based on store.kafka.record.reader session
    * property
    *
    * @param messageReaderKlass
@@ -47,7 +47,7 @@
       Class<?> klass = Class.forName(messageReaderKlass);
       if (MessageReader.class.isAssignableFrom(klass)) {
         messageReader = (MessageReader) klass.newInstance();
-        logger.info("Initialized Message Reader : {}", messageReader);
+        logger.debug("Initialized Message Reader : {}", messageReader);
       }
     } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
       throw UserException.validationError().message("Failed to initialize message reader : %s", messageReaderKlass)
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
index 034927a..3fac096 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.kafka.schema;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -33,18 +34,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-
 public class KafkaMessageSchema extends AbstractSchema {
 
   private static final Logger logger = LoggerFactory.getLogger(KafkaMessageSchema.class);
   private final KafkaStoragePlugin plugin;
-  private final Map<String, DrillTable> drillTables = Maps.newHashMap();
+  private final Map<String, DrillTable> drillTables = new HashMap<>();
   private Set<String> tableNames;
 
   public KafkaMessageSchema(final KafkaStoragePlugin plugin, final String name) {
-    super(ImmutableList.of(), name);
+    super(Collections.emptyList(), name);
     this.plugin = plugin;
   }
 
@@ -73,11 +71,15 @@
   @Override
   public Set<String> getTableNames() {
     if (tableNames == null) {
-      try (KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps())) {
+      KafkaConsumer<?, ?> kafkaConsumer = null;
+      try {
+        kafkaConsumer = new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps());
         tableNames = kafkaConsumer.listTopics().keySet();
       } catch (Exception e) {
         logger.warn("Failure while loading table names for database '{}': {}", getName(), e.getMessage(), e.getCause());
         return Collections.emptySet();
+      } finally {
+        plugin.registerToClose(kafkaConsumer);
       }
     }
     return tableNames;
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
index 86ef095..2ee5e88 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.store.kafka.schema;
 
-import java.io.IOException;
-
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.exec.store.AbstractSchemaFactory;
 import org.apache.drill.exec.store.SchemaConfig;
@@ -34,10 +32,9 @@
   }
 
   @Override
-  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
     KafkaMessageSchema schema = new KafkaMessageSchema(plugin, getName());
     SchemaPlus hPlus = parent.add(getName(), schema);
     schema.setHolder(hPlus);
   }
-
 }
diff --git a/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
index 3c3142f..199332f 100644
--- a/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
@@ -2,7 +2,15 @@
   "storage":{
     "kafka" : {
       "type":"kafka",
-      "kafkaConsumerProps": {"bootstrap.servers":"localhost:9092", "group.id" : "drill-consumer"},
+      "kafkaConsumerProps": {
+        "bootstrap.servers": "localhost:9092",
+        "group.id": "drill-query-consumer",
+        "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+        "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+        "session.timeout.ms": "30000",
+        "enable.auto.commit": "true",
+        "auto.offset.reset": "earliest"
+      },
       "enabled": false
     }
   }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
index 8e8319b..4bfd5d4 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
@@ -27,7 +27,7 @@
 
 import static org.apache.drill.exec.store.kafka.TestKafkaSuit.NUM_JSON_MSG;
 import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 @Category({KafkaStorageTest.class, SlowTest.class})
 public class KafkaFilterPushdownTest extends KafkaTestBase {
@@ -43,13 +43,11 @@
     generator.populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_JSON_MSG);
     String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_PUSHDOWN_TOPIC);
     //Ensure messages are present
-    assertTrue("Kafka server does not have expected number of messages",
-        testSql(query) == NUM_PARTITIONS * NUM_JSON_MSG);
+    assertEquals("Kafka server does not have expected number of messages", NUM_PARTITIONS * NUM_JSON_MSG, testSql(query));
   }
 
   /**
    * Test filter pushdown with condition on kafkaMsgOffset.
-   * @throws Exception
    */
   @Test
   public void testPushdownOnOffset() throws Exception {
@@ -67,7 +65,6 @@
 
   /**
    * Test filter pushdown with condition on kafkaPartitionId.
-   * @throws Exception
    */
   @Test
   public void testPushdownOnPartition() throws Exception {
@@ -84,7 +81,6 @@
 
   /**
    * Test filter pushdown with condition on kafkaPartitionId.
-   * @throws Exception
    */
   @Test
   public void testPushdownOnTimestamp() throws Exception {
@@ -101,7 +97,6 @@
 
   /**
    * Test filter pushdown when timestamp is not ordered.
-   * @throws Exception
    */
   @Test
   public void testPushdownUnorderedTimestamp() throws Exception {
@@ -119,7 +114,6 @@
 
   /**
    * Test filter pushdown when timestamp value specified does not exist.
-   * @throws Exception
    */
   @Test
   public void testPushdownWhenTimestampDoesNotExist() throws Exception {
@@ -136,7 +130,6 @@
 
   /**
    * Test filter pushdown when partition value specified does not exist.
-   * @throws Exception
    */
   @Test
   public void testPushdownWhenPartitionDoesNotExist() throws Exception {
@@ -153,7 +146,6 @@
 
   /**
    * Test filter pushdown when timestamp exist but partition does not exist.
-   * @throws Exception
    */
   @Test
   public void testPushdownForEmptyScanSpec() throws Exception {
@@ -172,7 +164,6 @@
   /**
    * Test filter pushdown on kafkaMsgOffset with boundary conditions.
    * In every case, the number of records returned is 0.
-   * @throws Exception
    */
   @Test
   public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws Exception {
@@ -230,7 +221,6 @@
   /**
    * Test filter pushdown on kafkaMsgOffset with boundary conditions.
    * In every case, the number of records returned is 5 (1 per topic-partition).
-   * @throws Exception
    */
   @Test
   public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws Exception {
@@ -264,7 +254,6 @@
   /**
    * Test filter pushdown with OR.
    * Pushdown is possible if all the predicates are on metadata fields.
-   * @throws Exception
    */
   @Test
   public void testPushdownWithOr() throws Exception {
@@ -282,7 +271,6 @@
 
   /**
    * Test filter pushdown with OR on kafkaMsgTimestamp and kafkaMsgOffset.
-   * @throws Exception
    */
   @Test
   public void testPushdownWithOr1() throws Exception {
@@ -301,8 +289,6 @@
 
   /**
    * Test pushdown for a combination of AND and OR.
-   *
-   * @throws Exception
    */
   @Test
   public void testPushdownWithAndOrCombo() throws Exception {
@@ -321,7 +307,6 @@
 
   /**
    * Test pushdown for a combination of AND and OR.
-   * @throws Exception
    */
   @Test
   public void testPushdownWithAndOrCombo2() throws Exception {
@@ -343,7 +328,6 @@
   /**
    * Test pushdown for predicate1 AND predicate2.
    * Where predicate1 is on a metadata field and predicate2 is on user fields.
-   * @throws Exception
    */
   @Test
   public void testPushdownTimestampWithNonMetaField() throws Exception {
@@ -363,7 +347,6 @@
   /**
    * Tests that pushdown does not happen for predicates such as
    * non-metadata-field = val1 OR (kafkaMsgTimestamp > val2 AND kafkaMsgTimestamp < val4)
-   * @throws Exception
    */
   @Test
   public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception {
@@ -380,5 +363,4 @@
     PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
         new String[] {String.format(EXPECTED_PATTERN, expectedRowCountInPlan)}, null);
   }
-
 }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
index d094531..745edb5 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -30,7 +30,6 @@
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -67,39 +66,38 @@
   }
 
   public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException {
-    KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(producerProperties);
-    Schema.Parser parser = new Schema.Parser();
-    Schema schema = parser.parse(Resources.getResource("drill-avro-test.avsc").openStream());
-    GenericRecordBuilder builder = new GenericRecordBuilder(schema);
-    Random rand = new Random();
-    for (int i = 0; i < numMsg; ++i) {
-      builder.set("key1", UUID.randomUUID().toString());
-      builder.set("key2", rand.nextInt());
-      builder.set("key3", rand.nextBoolean());
+    try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(producerProperties)) {
+      Schema.Parser parser = new Schema.Parser();
+      Schema schema = parser.parse(Resources.getResource("drill-avro-test.avsc").openStream());
+      GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+      Random rand = new Random();
+      for (int i = 0; i < numMsg; ++i) {
+        builder.set("key1", UUID.randomUUID().toString());
+        builder.set("key2", rand.nextInt());
+        builder.set("key3", rand.nextBoolean());
 
-      List<Integer> list = Lists.newArrayList();
-      list.add(rand.nextInt(100));
-      list.add(rand.nextInt(100));
-      list.add(rand.nextInt(100));
-      builder.set("key5", list);
+        List<Integer> list = Lists.newArrayList();
+        list.add(rand.nextInt(100));
+        list.add(rand.nextInt(100));
+        list.add(rand.nextInt(100));
+        builder.set("key5", list);
 
-      Map<String, Double> map = Maps.newHashMap();
-      map.put("key61", rand.nextDouble());
-      map.put("key62", rand.nextDouble());
-      builder.set("key6", map);
+        Map<String, Double> map = Maps.newHashMap();
+        map.put("key61", rand.nextDouble());
+        map.put("key62", rand.nextDouble());
+        builder.set("key6", map);
 
-      Record producerRecord = builder.build();
+        Record producerRecord = builder.build();
 
-      ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>(topic, producerRecord);
-      producer.send(record);
+        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, producerRecord);
+        producer.send(record);
+      }
     }
-    producer.close();
   }
 
-  public void populateJsonMsgIntoKafka(String topic, int numMsg) throws InterruptedException, ExecutionException {
-    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProperties);
-    Random rand = new Random();
-    try {
+  public void populateJsonMsgIntoKafka(String topic, int numMsg) throws ExecutionException, InterruptedException {
+    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
+      Random rand = new Random();
       for (int i = 0; i < numMsg; ++i) {
         JsonObject object = new JsonObject();
         object.addProperty("key1", UUID.randomUUID().toString());
@@ -118,29 +116,19 @@
         element3.addProperty("key62", rand.nextDouble());
         object.add("key6", element3);
 
-        ProducerRecord<String, String> message = new ProducerRecord<String, String>(topic, object.toString());
+        ProducerRecord<String, String> message = new ProducerRecord<>(topic, object.toString());
         logger.info("Publishing message : {}", message);
         Future<RecordMetadata> future = producer.send(message);
         logger.info("Committed offset of the message : {}", future.get().offset());
       }
-    } catch (Throwable th) {
-      logger.error(th.getMessage(), th);
-      throw new DrillRuntimeException(th.getMessage(), th);
-    } finally {
-      if (producer != null) {
-        producer.close();
-      }
     }
   }
 
-  public void populateJsonMsgWithTimestamps(String topic, int numMsg) {
-    KafkaProducer<String, String> producer = null;
-    Random rand = new Random();
-    try {
-      producer = new KafkaProducer<String, String>(producerProperties);
+  public void populateJsonMsgWithTimestamps(String topic, int numMsg) throws ExecutionException, InterruptedException {
+    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
       int halfCount = numMsg / 2;
 
-      for(PartitionInfo tpInfo : producer.partitionsFor(topic)) {
+      for (PartitionInfo tpInfo : producer.partitionsFor(topic)) {
         for (int i = 1; i <= numMsg; ++i) {
           JsonObject object = new JsonObject();
           object.addProperty("stringKey", UUID.randomUUID().toString());
@@ -148,22 +136,23 @@
           object.addProperty("boolKey", i % 2 == 0);
 
           long timestamp = i < halfCount ? (halfCount - i) : i;
-          ProducerRecord<String, String> message =
-              new ProducerRecord<String, String>(tpInfo.topic(), tpInfo.partition(), timestamp, "key"+i, object.toString());
+          ProducerRecord<String, String> message = new ProducerRecord<>(tpInfo.topic(), tpInfo.partition(), timestamp, "key" + i, object.toString());
           logger.info("Publishing message : {}", message);
           Future<RecordMetadata> future = producer.send(message);
           logger.info("Committed offset of the message : {}", future.get().offset());
         }
-
-      }
-    } catch (Throwable th) {
-      logger.error(th.getMessage(), th);
-      throw new DrillRuntimeException(th.getMessage(), th);
-    } finally {
-      if (producer != null) {
-        producer.close();
       }
     }
   }
 
+  public void populateMessages(String topic, String... messages) throws ExecutionException, InterruptedException {
+    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
+      for (String content : messages) {
+        ProducerRecord<String, String> message = new ProducerRecord<>(topic, content);
+        logger.info("Publishing message : {}", message);
+        Future<RecordMetadata> future = producer.send(message);
+        logger.info("Committed offset of the message : {}", future.get().offset());
+      }
+    }
+  }
 }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index 6dc1b3e..62d1b66 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -17,22 +17,28 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.drill.categories.KafkaStorageTest;
 import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.rpc.RpcException;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.Assert;
 import org.junit.Test;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import org.junit.experimental.categories.Category;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster;
+import static org.junit.Assert.fail;
+
 @Category({KafkaStorageTest.class, SlowTest.class})
 public class KafkaQueriesTest extends KafkaTestBase {
 
@@ -40,9 +46,12 @@
   public void testSqlQueryOnInvalidTopic() throws Exception {
     String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.INVALID_TOPIC);
     try {
-      testBuilder().sqlQuery(queryString).unOrdered().baselineRecords(Collections.<Map<String, Object>> emptyList())
-          .build().run();
-      Assert.fail("Test passed though topic does not exist.");
+      testBuilder()
+        .sqlQuery(queryString)
+        .unOrdered()
+        .baselineRecords(Collections.emptyList())
+        .go();
+      fail("Test passed even though the topic does not exist.");
     } catch (RpcException re) {
       Assert.assertTrue(re.getMessage().contains("DATA_READ ERROR: Table 'invalid-topic' does not exist"));
     }
@@ -60,8 +69,12 @@
     Map<TopicPartition, Long> startOffsetsMap = fetchOffsets(-2);
 
     String queryString = String.format(TestQueryConstants.MIN_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC);
-    testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("minOffset")
-        .baselineValues(startOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))).go();
+    testBuilder()
+      .sqlQuery(queryString)
+      .unOrdered()
+      .baselineColumns("minOffset")
+      .baselineValues(startOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0)))
+      .go();
   }
 
   @Test
@@ -70,8 +83,12 @@
     Map<TopicPartition, Long> endOffsetsMap = fetchOffsets(-1);
 
     String queryString = String.format(TestQueryConstants.MAX_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC);
-    testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("maxOffset")
-        .baselineValues(endOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))-1).go();
+    testBuilder()
+      .sqlQuery(queryString)
+      .unOrdered()
+      .baselineColumns("maxOffset")
+      .baselineValues(endOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0)) - 1)
+      .go();
   }
 
   @Test
@@ -81,19 +98,20 @@
   }
 
   private Map<TopicPartition, Long> fetchOffsets(int flag) {
-    KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(),
+    Consumer<byte[], byte[]> kafkaConsumer = null;
+    try {
+      kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(),
         new ByteArrayDeserializer(), new ByteArrayDeserializer());
 
-    Map<TopicPartition, Long> offsetsMap = Maps.newHashMap();
-    kafkaConsumer.subscribe(Collections.singletonList(TestQueryConstants.JSON_TOPIC));
-    // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions
-    // evaluates lazily, seeking to the
-    // first/last offset in all partitions only when poll(long) or
-    // position(TopicPartition) are called
-    kafkaConsumer.poll(0);
-    Set<TopicPartition> assignments = kafkaConsumer.assignment();
+      Map<TopicPartition, Long> offsetsMap = new HashMap<>();
+      kafkaConsumer.subscribe(Collections.singletonList(TestQueryConstants.JSON_TOPIC));
+      // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions
+      // evaluates lazily, seeking to the
+      // first/last offset in all partitions only when poll(long) or
+      // position(TopicPartition) are called
+      kafkaConsumer.poll(0);
+      Set<TopicPartition> assignments = kafkaConsumer.assignment();
 
-    try {
       if (flag == -2) {
         // fetch start offsets for each topicPartition
         kafkaConsumer.seekToBeginning(assignments);
@@ -109,10 +127,10 @@
       } else {
         throw new RuntimeException(String.format("Unsupported flag %d", flag));
       }
+      return offsetsMap;
     } finally {
-      kafkaConsumer.close();
+      embeddedKafkaCluster.registerToClose(kafkaConsumer);
     }
-    return offsetsMap;
   }
 
   @Test
@@ -121,4 +139,109 @@
     testPhysicalPlanExecutionBasedOnQuery(query);
   }
 
+  @Test
+  public void testOneMessageTopic() throws Exception {
+    String topicName = "topicWithOneMessage";
+    TestKafkaSuit.createTopicHelper(topicName, 1);
+    KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
+    generator.populateMessages(topicName, "{\"index\": 1}");
+
+    testBuilder()
+      .sqlQuery("select index from kafka.`%s`", topicName)
+      .unOrdered()
+      .baselineColumns("index")
+      .baselineValues(1L)
+      .go();
+  }
+
+  @Test
+  public void testMalformedRecords() throws Exception {
+    String topicName = "topicWithMalFormedMessages";
+    TestKafkaSuit.createTopicHelper(topicName, 1);
+    try {
+      KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
+      generator.populateMessages(topicName, "Test");
+
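+      // With skipping disabled, the malformed record should fail the query.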
+      alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, false);
+      try {
+        test("select * from kafka.`%s`", topicName);
+        fail();
+      } catch (UserException e) {
+        // expected
+      }
+
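+      // With skipping enabled, malformed and blank messages are dropped and only valid JSON records are returned.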
+      alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, true);
+      testBuilder()
+        .sqlQuery("select * from kafka.`%s`", topicName)
+        .expectsEmptyResultSet()
+        .go();
+
+      generator.populateMessages(topicName, "{\"index\": 1}", "", "   ", "{Invalid}", "{\"index\": 2}");
+
+      testBuilder()
+        .sqlQuery("select index from kafka.`%s`", topicName)
+        .unOrdered()
+        .baselineColumns("index")
+        .baselineValues(1L)
+        .baselineValues(2L)
+        .go();
+    } finally {
+      resetSessionOption(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS);
+    }
+  }
+
+  @Test
+  public void testNanInf() throws Exception {
+    String topicName = "topicWithNanInf";
+    TestKafkaSuit.createTopicHelper(topicName, 1);
+    try {
+      KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
+      generator.populateMessages(topicName, "{\"nan_col\":NaN, \"inf_col\":Infinity}");
+
+      alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, false);
+      try {
+        test("select nan_col, inf_col from kafka.`%s`", topicName);
+        fail();
+      } catch (UserException e) {
+        // expected
+      }
+
+      alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, true);
+      testBuilder()
+        .sqlQuery("select nan_col, inf_col from kafka.`%s`", topicName)
+        .unOrdered()
+        .baselineColumns("nan_col", "inf_col")
+        .baselineValues(Double.NaN, Double.POSITIVE_INFINITY)
+        .go();
+    } finally {
+      resetSessionOption(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS);
+    }
+  }
+
+  @Test
+  public void testEscapeAnyChar() throws Exception {
+    String topicName = "topicWithEscapeAnyChar";
+    TestKafkaSuit.createTopicHelper(topicName, 1);
+    try {
+      KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
+      generator.populateMessages(topicName, "{\"name\": \"AB\\\"\\C\"}");
+
+      alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, false);
+      try {
+        test("select name from kafka.`%s`", topicName);
+        fail();
+      } catch (UserException e) {
+        // expected
+      }
+
+      alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, true);
+      testBuilder()
+        .sqlQuery("select name from kafka.`%s`", topicName)
+        .unOrdered()
+        .baselineColumns("name")
+        .baselineValues("AB\"C")
+        .go();
+    } finally {
+      resetSessionOption(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR);
+    }
+  }
 }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
index b1742d7..effff77 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.List;
 import java.util.Map;
 
@@ -77,18 +75,10 @@
     }
   }
 
-  public void testHelper(String query, String expectedExprInPlan, int expectedRecordCount) throws Exception {
-    testPhysicalPlan(query, expectedExprInPlan);
-    int actualRecordCount = testSql(query);
-    assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%s",
-        expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
-  }
-
   @AfterClass
-  public static void tearDownKafkaTestBase() throws Exception {
+  public static void tearDownKafkaTestBase() {
     if (TestKafkaSuit.isRunningSuite()) {
       TestKafkaSuit.tearDownCluster();
     }
   }
-
 }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
index 4347167..12db4c3 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
@@ -55,6 +55,7 @@
   public void cleanUp() {
     if (kafkaConsumer != null) {
       kafkaConsumer.close();
+      kafkaConsumer = null;
     }
   }
 
@@ -67,7 +68,7 @@
     } catch (UserException ue) {
       Assert.assertEquals(ErrorType.DATA_READ, ue.getErrorType());
       Assert.assertTrue(ue.getMessage().contains(
-          "DATA_READ ERROR: Failed to fetch messages within 1 milliseconds. Consider increasing the value of the property : store.kafka.poll.timeout"));
+          "DATA_READ ERROR: Failed to fetch messages within 1 milliseconds. Consider increasing the value of the property: store.kafka.poll.timeout"));
     }
   }
 
@@ -89,7 +90,7 @@
     Assert.assertNotNull(iterator.next());
     try {
       iterator.next();
-      Assert.fail("Kafak fetched more messages than configured.");
+      Assert.fail("Kafka fetched more messages than configured.");
     } catch (NoSuchElementException nse) {
       // Expected
     }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index 784eb4e..b586d7d 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -17,20 +17,20 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import kafka.utils.ZKStringSerializer$;
 import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.drill.categories.KafkaStorageTest;
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.exec.ZookeeperTestUtil;
 import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.security.JaasUtils;
-
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.experimental.categories.Category;
@@ -40,29 +40,34 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @Category({KafkaStorageTest.class, SlowTest.class})
 @RunWith(Suite.class)
-@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class,
-    KafkaFilterPushdownTest.class })
+@SuiteClasses({KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class, KafkaFilterPushdownTest.class})
 public class TestKafkaSuit {
   private static final Logger logger = LoggerFactory.getLogger(TestKafkaSuit.class);
+
   private static final String LOGIN_CONF_RESOURCE_PATHNAME = "login.conf";
 
   public static EmbeddedKafkaCluster embeddedKafkaCluster;
+
   private static ZkClient zkClient;
 
   private static volatile AtomicInteger initCount = new AtomicInteger(0);
-  static final int NUM_JSON_MSG = 10;
-  static final int CONN_TIMEOUT = 8 * 1000;
-  static final int SESSION_TIMEOUT = 10 * 1000;
 
-  static String kafkaBroker;
-  private static volatile boolean runningSuite = false;
+  static final int NUM_JSON_MSG = 10;
+
+  private static final int CONN_TIMEOUT = 8 * 1000;
+
+  private static final int SESSION_TIMEOUT = 10 * 1000;
+
+  private static volatile boolean runningSuite = true;
 
   @BeforeClass
   public static void initKafka() throws Exception {
@@ -71,17 +76,9 @@
         ZookeeperTestUtil.setZookeeperSaslTestConfigProps();
         System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, ClassLoader.getSystemResource(LOGIN_CONF_RESOURCE_PATHNAME).getFile());
         embeddedKafkaCluster = new EmbeddedKafkaCluster();
-        Properties topicProps = new Properties();
         zkClient = new ZkClient(embeddedKafkaCluster.getZkServer().getConnectionString(), SESSION_TIMEOUT, CONN_TIMEOUT, ZKStringSerializer$.MODULE$);
-        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
-        AdminUtils.createTopic(zkUtils, TestQueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode.Disabled$.MODULE$);
-
-        org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk = AdminUtils
-            .fetchTopicMetadataFromZk(TestQueryConstants.JSON_TOPIC, zkUtils);
-        logger.info("Topic Metadata: " + fetchTopicMetadataFromZk);
-
-        KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(),
-            StringSerializer.class);
+        createTopicHelper(TestQueryConstants.JSON_TOPIC, 1);
+        KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
         generator.populateJsonMsgIntoKafka(TestQueryConstants.JSON_TOPIC, NUM_JSON_MSG);
       }
       initCount.incrementAndGet();
@@ -95,32 +92,36 @@
   }
 
   @AfterClass
-  public static void tearDownCluster() throws Exception {
+  public static void tearDownCluster() {
     synchronized (TestKafkaSuit.class) {
       if (initCount.decrementAndGet() == 0) {
         if (zkClient != null) {
           zkClient.close();
+          zkClient = null;
         }
         if (embeddedKafkaCluster != null && !embeddedKafkaCluster.getBrokers().isEmpty()) {
           embeddedKafkaCluster.shutDownCluster();
+          embeddedKafkaCluster = null;
         }
       }
     }
   }
 
-  public static void createTopicHelper(final String topicName, final int partitions) {
-
-    Properties topicProps = new Properties();
-    topicProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
-    topicProps.put(TopicConfig.RETENTION_MS_CONFIG, "-1");
-    ZkUtils zkUtils = new ZkUtils(zkClient,
-        new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
-    AdminUtils.createTopic(zkUtils, topicName, partitions, 1,
-        topicProps, RackAwareMode.Disabled$.MODULE$);
-
-    org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk =
-        AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
-    logger.info("Topic Metadata: " + fetchTopicMetadataFromZk);
+  public static void createTopicHelper(String topicName, int partitions) throws ExecutionException, InterruptedException {
+    try (AdminClient adminClient = initAdminClient()) {
+      NewTopic newTopic = new NewTopic(topicName, partitions, (short) 1);
+      Map<String, String> topicConfigs = new HashMap<>();
+      topicConfigs.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
+      topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, "-1");
+      newTopic.configs(topicConfigs);
+      CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
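+      // Wait until the broker has actually created the topic before tests publish to it.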
+      result.all().get();
+    }
   }
 
+  private static AdminClient initAdminClient() {
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaCluster.getKafkaBrokerList());
+    return AdminClient.create(props);
+  }
 }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
index 663e0e4..eccc17a 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
@@ -25,10 +25,8 @@
 import java.util.Properties;
 
 import org.apache.drill.exec.ZookeeperHelper;
-import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
+import org.apache.drill.exec.store.kafka.KafkaAsyncCloser;
 import org.apache.drill.exec.store.kafka.TestQueryConstants;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,9 +34,11 @@
 import kafka.server.KafkaServerStartable;
 
 public class EmbeddedKafkaCluster implements TestQueryConstants {
+
   private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
   private List<KafkaServerStartable> brokers;
-  private final ZookeeperHelper zkHelper;
+  private ZookeeperHelper zkHelper;
+  private KafkaAsyncCloser closer;
   private final Properties props;
 
   public EmbeddedKafkaCluster() throws IOException {
@@ -49,9 +49,9 @@
     this(props, 1);
   }
 
-  public EmbeddedKafkaCluster(Properties basePorps, int numberOfBrokers) throws IOException {
+  public EmbeddedKafkaCluster(Properties baseProps, int numberOfBrokers) throws IOException {
     this.props = new Properties();
-    props.putAll(basePorps);
+    props.putAll(baseProps);
     this.zkHelper = new ZookeeperHelper();
     zkHelper.startZookeeper(1);
     this.brokers = new ArrayList<>(numberOfBrokers);
@@ -62,13 +62,14 @@
         sb.append(BROKER_DELIM);
       }
       int ephemeralBrokerPort = getEphemeralPort();
-      sb.append(LOCAL_HOST + ":" + ephemeralBrokerPort);
+      sb.append(LOCAL_HOST).append(":").append(ephemeralBrokerPort);
       addBroker(props, i, ephemeralBrokerPort);
     }
 
     this.props.put("metadata.broker.list", sb.toString());
     this.props.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString());
     logger.info("Initialized Kafka Server");
+    this.closer = new KafkaAsyncCloser();
   }
 
   private void addBroker(Properties props, int brokerID, int ephemeralBrokerPort) {
@@ -79,13 +80,14 @@
     properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(1));
     properties.put(KafkaConfig.DefaultReplicationFactorProp(), String.valueOf(1));
     properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp(), String.valueOf(100));
-    properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.TRUE);
+    properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.FALSE);
     properties.put(KafkaConfig.ZkConnectProp(), zkHelper.getConnectionString());
     properties.put(KafkaConfig.BrokerIdProp(), String.valueOf(brokerID + 1));
-    properties.put(KafkaConfig.HostNameProp(), String.valueOf(LOCAL_HOST));
-    properties.put(KafkaConfig.AdvertisedHostNameProp(), String.valueOf(LOCAL_HOST));
+    properties.put(KafkaConfig.HostNameProp(), LOCAL_HOST);
+    properties.put(KafkaConfig.AdvertisedHostNameProp(), LOCAL_HOST);
     properties.put(KafkaConfig.PortProp(), String.valueOf(ephemeralBrokerPort));
-    properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.FALSE);
+    properties.put(KafkaConfig.AdvertisedPortProp(), String.valueOf(ephemeralBrokerPort));
+    properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.TRUE);
     properties.put(KafkaConfig.LogDirsProp(), getTemporaryDir().getAbsolutePath());
     properties.put(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
     brokers.add(getBroker(properties));
@@ -97,23 +99,25 @@
     return broker;
   }
 
-  public void shutDownCluster() throws IOException {
-    // set Kafka log level to ERROR
-    Level level = LogManager.getLogger(KafkaStoragePluginConfig.NAME).getLevel();
-    LogManager.getLogger(KafkaStoragePluginConfig.NAME).setLevel(Level.ERROR);
+  public void shutDownCluster() {
+    closer.close();
+    closer = null;
 
-    for (KafkaServerStartable broker : brokers) {
-      broker.shutdown();
+    if (brokers != null) {
+      for (KafkaServerStartable broker : brokers) {
+        broker.shutdown();
+      }
+      brokers = null;
     }
-
-    // revert back the level
-    LogManager.getLogger(KafkaStoragePluginConfig.NAME).setLevel(level);
-    zkHelper.stopZookeeper();
+    if (zkHelper != null) {
+      zkHelper.stopZookeeper();
+      zkHelper = null;
+    }
   }
 
   public void shutDownBroker(int brokerId) {
     for (KafkaServerStartable broker : brokers) {
-      if (Integer.valueOf(broker.serverConfig().getString(KafkaConfig.BrokerIdProp())) == brokerId) {
+      if (Integer.parseInt(broker.staticServerConfig().getString(KafkaConfig.BrokerIdProp())) == brokerId) {
         broker.shutdown();
         return;
       }
@@ -141,13 +145,17 @@
   public String getKafkaBrokerList() {
     StringBuilder sb = new StringBuilder();
     for (KafkaServerStartable broker : brokers) {
-      KafkaConfig serverConfig = broker.serverConfig();
-      sb.append(serverConfig.hostName() + ":" + serverConfig.port());
+      KafkaConfig serverConfig = broker.staticServerConfig();
+      sb.append(serverConfig.hostName()).append(":").append(serverConfig.port());
       sb.append(",");
     }
     return sb.toString().substring(0, sb.toString().length() - 1);
   }
 
+  public void registerToClose(AutoCloseable autoCloseable) {
+    closer.close(autoCloseable);
+  }
+
   private int getEphemeralPort() throws IOException {
     try (ServerSocket socket = new ServerSocket(0)) {
       return socket.getLocalPort();
@@ -162,5 +170,4 @@
     }
     return file;
   }
-
 }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
index 796c63a..1b8aa11 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
@@ -33,7 +33,7 @@
       MessageReaderFactory.getMessageReader(null);
       Assert.fail("Message reader initialization succeeded even though it is null");
     } catch (UserException ue) {
-      Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION);
+      Assert.assertSame(ErrorType.VALIDATION, ue.getErrorType());
       Assert.assertTrue(ue.getMessage().contains(
           "VALIDATION ERROR: Please configure message reader implementation using the property 'store.kafka.record.reader'"));
     }
@@ -45,7 +45,7 @@
       MessageReaderFactory.getMessageReader(MessageReaderFactoryTest.class.getName());
       Assert.fail("Message reader initialization succeeded even though class does not implement message reader interface");
     } catch (UserException ue) {
-      Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION);
+      Assert.assertSame(ue.getErrorType(), ErrorType.VALIDATION);
       Assert.assertTrue(ue.getMessage().contains(
           "VALIDATION ERROR: Message reader configured 'org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest' does not implement 'org.apache.drill.exec.store.kafka.decoders.MessageReader'"));
     }
@@ -57,7 +57,7 @@
       MessageReaderFactory.getMessageReader("a.b.c.d");
       Assert.fail("Message reader initialization succeeded even though class does not exist");
     } catch (UserException ue) {
-      Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION);
+      Assert.assertSame(ue.getErrorType(), ErrorType.VALIDATION);
       Assert.assertTrue(ue.getMessage().contains("VALIDATION ERROR: Failed to initialize message reader : a.b.c.d"));
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 91f8803..22a6cb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -539,6 +539,16 @@
   public static final String KAFKA_POLL_TIMEOUT = "store.kafka.poll.timeout";
   public static final PositiveLongValidator KAFKA_POLL_TIMEOUT_VALIDATOR = new PositiveLongValidator(KAFKA_POLL_TIMEOUT, Long.MAX_VALUE,
       new OptionDescription("Amount of time in milliseconds allotted to the Kafka client to fetch messages from the Kafka cluster; default value is 200."));
+  public static final String KAFKA_READER_SKIP_INVALID_RECORDS = "store.kafka.reader.skip_invalid_records";
+  public static final BooleanValidator KAFKA_SKIP_MALFORMED_RECORDS_VALIDATOR = new BooleanValidator(KAFKA_READER_SKIP_INVALID_RECORDS,
+    new OptionDescription("Enables the Kafka JSON record reader in Drill to skip malformed records instead of failing the query. Default is false. (Drill 1.17+)"));
+  public static final String KAFKA_READER_NAN_INF_NUMBERS = "store.kafka.reader.allow_nan_inf";
+  public static final BooleanValidator KAFKA_READER_NAN_INF_NUMBERS_VALIDATOR = new BooleanValidator(KAFKA_READER_NAN_INF_NUMBERS,
+    new OptionDescription("Enables the Kafka JSON record reader in Drill to read `NaN` and `Infinity` tokens in JSON data as numbers. Default is true. (Drill 1.17+)"));
+  public static final String KAFKA_READER_ESCAPE_ANY_CHAR = "store.kafka.reader.allow_escape_any_char";
+  public static final BooleanValidator KAFKA_READER_ESCAPE_ANY_CHAR_VALIDATOR = new BooleanValidator(KAFKA_READER_ESCAPE_ANY_CHAR,
+    new OptionDescription("Enables the Kafka JSON record reader in Drill to accept a backslash escape before any character. Default is false. (Drill 1.17+)"));
+
 
   // TODO: We need to add a feature that enables storage plugins to add their own options. Currently we have to declare
   // in core which is not right. Move this option and above two mongo plugin related options once we have the feature.
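
As a rough sketch of how these constants surface at read time (mirroring the ReadOptions constructor added earlier in this change; `optionManager` stands in for any session/system OptionManager):

    boolean skipInvalidRecords = optionManager.getBoolean(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS);
    boolean allowNanInf = optionManager.getBoolean(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS);
    boolean allowEscapeAnyChar = optionManager.getBoolean(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR);
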
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 28fa3e3..e3ed2f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -202,6 +202,9 @@
       new OptionDefinition(ExecConstants.KAFKA_RECORD_READER_VALIDATOR),
       new OptionDefinition(ExecConstants.KAFKA_POLL_TIMEOUT_VALIDATOR),
       new OptionDefinition(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR),
+      new OptionDefinition(ExecConstants.KAFKA_SKIP_MALFORMED_RECORDS_VALIDATOR),
+      new OptionDefinition(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS_VALIDATOR),
+      new OptionDefinition(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR_VALIDATOR),
       new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR),
       new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER_VALIDATOR),
       new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4e33c82..021e297 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -670,6 +670,9 @@
     store.kafka.read_numbers_as_double: false,
     store.kafka.record.reader: "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader",
     store.kafka.poll.timeout: 200,
+    store.kafka.reader.skip_invalid_records: false,
+    store.kafka.reader.allow_nan_inf: true,
+    store.kafka.reader.allow_escape_any_char: false,
     web.logs.max_lines: 10000,
     web.display_format.timestamp: "",
     web.display_format.date: "",