DRILL-7388: Kafka improvements
1. Upgraded Kafka libraries to 2.3.1 (DRILL-6739).
2. Added new options to support the same features as the native JSON reader:
a. store.kafka.reader.skip_invalid_records, default: false (DRILL-6723);
b. store.kafka.reader.allow_nan_inf, default: true;
c. store.kafka.reader.allow_escape_any_char, default: false.
3. Fixed an issue that occurs when a Kafka topic contains only one message (DRILL-7388).
4. Replaced the Gson parser with Jackson to parse JSON in the same manner as the Drill native JSON reader (an illustrative sketch follows below).
5. Performance improvements: Kafka consumers are now closed asynchronously (a usage sketch follows the new KafkaAsyncCloser below), a resource leak was fixed (DRILL-7290), and unnecessary info logging was moved to debug level.
6. Updated bootstrap-storage-plugins.json to reflect the actual Kafka connection properties.
7. Added unit tests.
8. Refactoring and code cleanup.
closes #1901
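
For item 4, a minimal, hypothetical sketch of the Jackson-based parsing (illustration only, not part of this patch): a plain ObjectMapper stands in for the mapper that JsonMessageReader actually obtains from BaseJsonProcessor.getDefaultMapper(), and the topic/offset values are invented.

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class JacksonParseSketch {
  public static void main(String[] args) throws Exception {
    // The two parser features correspond to the new allow_nan_inf and
    // allow_escape_any_char reader options listed in item 2.
    ObjectMapper mapper = new ObjectMapper()
        .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true)
        .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true);

    JsonNode node = mapper.readTree("{\"price\": NaN, \"symbol\": \"AAPL\"}");
    if (node != null && node.isObject()) {
      // Kafka metadata columns are attached to each record, as JsonMessageReader.readMessage() does.
      ObjectNode obj = (ObjectNode) node;
      obj.put("kafkaTopic", "clicks");   // hypothetical topic name
      obj.put("kafkaPartitionId", 0);
      obj.put("kafkaMsgOffset", 42L);
    }
    System.out.println(node);
  }
}
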
diff --git a/contrib/storage-kafka/README.md b/contrib/storage-kafka/README.md
index a63731f..a26c6e1 100644
--- a/contrib/storage-kafka/README.md
+++ b/contrib/storage-kafka/README.md
@@ -211,18 +211,22 @@
- store.kafka.record.reader system option can be used for setting record reader and default is org.apache.drill.exec.store.kafka.decoders.JsonMessageReader
- Default store.kafka.poll.timeout is set to 200, user has to set this accordingly
-- Custom record reader can be implemented by extending org.apache.drill.exec.store.kafka.decoders.MessageReader and setting store.kafka.record.reader accordinlgy
+- Custom record reader can be implemented by extending org.apache.drill.exec.store.kafka.decoders.MessageReader and setting store.kafka.record.reader accordingly
-In case of JSON message format, following system options can be used accordingly. More details can be found in [Drill Json Model](https://drill.apache.org/docs/json-data-model/) and in [Drill system options configurations](https://drill.apache.org/docs/configuration-options-introduction/)
+In case of JSON message format, following system / session options can be used accordingly. More details can be found in [Drill Json Model](https://drill.apache.org/docs/json-data-model/) and in [Drill system options configurations](https://drill.apache.org/docs/configuration-options-introduction/)
-<ui>
+<ul>
<li>ALTER SESSION SET `store.kafka.record.reader` = 'org.apache.drill.exec.store.kafka.decoders.JsonMessageReader';</li>
<li>ALTER SESSION SET `store.kafka.poll.timeout` = 200;</li>
<li>ALTER SESSION SET `exec.enable_union_type` = true; </li>
<li>ALTER SESSION SET `store.kafka.all_text_mode` = true;</li>
<li>ALTER SESSION SET `store.kafka.read_numbers_as_double` = true;</li>
-</ui>
+ <li>ALTER SESSION SET `store.kafka.skip_invalid_records` = true;</li>
+ <li>ALTER SESSION SET `store.kafka.allow_nan_inf` = true;</li>
+ <li>ALTER SESSION SET `store.kafka.allow_escape_any_char` = true;</li>
+</ul>
<h4 id="RoadMap">RoadMap</h4>
<ul>
diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml
index 83a1a29..0e2f3d6 100644
--- a/contrib/storage-kafka/pom.xml
+++ b/contrib/storage-kafka/pom.xml
@@ -31,7 +31,7 @@
<name>contrib/kafka-storage-plugin</name>
<properties>
- <kafka.version>0.11.0.1</kafka.version>
+ <kafka.version>2.3.1</kafka.version>
<kafka.TestSuite>**/TestKafkaSuit.class</kafka.TestSuite>
</properties>
@@ -64,7 +64,7 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
+ <artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
@@ -78,7 +78,7 @@
</exclusions>
</dependency>
- <!-- Test dependencie -->
+ <!-- Test dependencies -->
<dependency>
<groupId>org.apache.drill.exec</groupId>
<artifactId>drill-java-exec</artifactId>
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaAsyncCloser.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaAsyncCloser.java
new file mode 100644
index 0000000..c8d74bf
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaAsyncCloser.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Asynchronously closes Kafka resources whose close result the query does not depend on,
+ * in order to improve query execution performance.
+ * For example, {@link org.apache.kafka.clients.consumer.KafkaConsumer}.
+ */
+public class KafkaAsyncCloser implements AutoCloseable {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaAsyncCloser.class);
+
+ private volatile ExecutorService executorService;
+
+ /**
+ * Closes given resource in separate thread using thread executor.
+ *
+ * @param autoCloseable resource to close
+ */
+ public void close(AutoCloseable autoCloseable) {
+ if (autoCloseable != null) {
+ ExecutorService executorService = executorService();
+ executorService.submit(() -> {
+ try {
+ autoCloseable.close();
+ logger.debug("Closing {} resource", autoCloseable.getClass().getCanonicalName());
+ } catch (Exception e) {
+ logger.debug("Resource {} failed to close: {}", autoCloseable.getClass().getCanonicalName(), e.getMessage());
+ }
+ });
+ }
+ }
+
+ @Override
+ public void close() {
+ if (executorService != null) {
+ logger.trace("Closing Kafka async closer: {}", executorService);
+ executorService.shutdownNow();
+ }
+ }
+
+ /**
+   * Initializes the executor service instance using double-checked locking (DCL).
+   * The created thread executor allows only one thread to execute at a time
+   * but, unlike a single thread executor, does not keep this thread in the pool.
+   * A custom thread factory is used to define Kafka-specific thread names.
+ *
+ * @return executor service instance
+ */
+ private ExecutorService executorService() {
+ if (executorService == null) {
+ synchronized (this) {
+ if (executorService == null) {
+ this.executorService = new ThreadPoolExecutor(0, 1, 0L,
+ TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new KafkaThreadFactory());
+ }
+ }
+ }
+ return executorService;
+ }
+
+ /**
+   * Wraps the default thread factory and adds a Kafka closer prefix to the original thread name.
+   * It is used to uniquely identify Kafka closer threads.
+ * Example: drill-kafka-closer-pool-1-thread-1
+ */
+ private static class KafkaThreadFactory implements ThreadFactory {
+
+ private static final String THREAD_PREFIX = "drill-kafka-closer-";
+ private final ThreadFactory delegate = Executors.defaultThreadFactory();
+
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread thread = delegate.newThread(runnable);
+ thread.setName(THREAD_PREFIX + thread.getName());
+ return thread;
+ }
+ }
+}
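
A small usage sketch of the async-close pattern above (illustration only, not part of the patch): it assumes the KafkaAsyncCloser class above is on the classpath, and the AutoCloseable lambda is a stand-in for a real KafkaConsumer.

import org.apache.drill.exec.store.kafka.KafkaAsyncCloser;

public class AsyncCloseSketch {
  public static void main(String[] args) throws Exception {
    KafkaAsyncCloser closer = new KafkaAsyncCloser();

    // Stand-in resource; in the plugin this would be a KafkaConsumer passed to registerToClose().
    AutoCloseable consumer = () -> System.out.println("consumer closed in background");

    // Returns immediately; the actual close runs on a lazily created
    // "drill-kafka-closer-" thread (see executorService() above).
    closer.close(consumer);

    Thread.sleep(100); // give the background close a chance to run in this demo
    closer.close();    // shutting down the plugin stops the closer's executor
  }
}
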
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
index dded560..e4f255c 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -17,12 +17,14 @@
*/
package org.apache.drill.exec.store.kafka;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
@@ -155,25 +157,28 @@
private void init() {
partitionWorkMap = Maps.newHashMap();
Collection<DrillbitEndpoint> endpoints = kafkaStoragePlugin.getContext().getBits();
- Map<String, DrillbitEndpoint> endpointMap = Maps.newHashMap();
- for (DrillbitEndpoint endpoint : endpoints) {
- endpointMap.put(endpoint.getAddress(), endpoint);
- }
+ Map<String, DrillbitEndpoint> endpointMap = endpoints.stream()
+ .collect(Collectors.toMap(
+ DrillbitEndpoint::getAddress,
+ Function.identity(),
+ (o, n) -> n));
Map<TopicPartition, Long> startOffsetsMap = Maps.newHashMap();
Map<TopicPartition, Long> endOffsetsMap = Maps.newHashMap();
- List<PartitionInfo> topicPartitions = null;
+ List<PartitionInfo> topicPartitions;
String topicName = kafkaScanSpec.getTopicName();
- try (KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(kafkaStoragePlugin.getConfig().getKafkaConsumerProps(),
- new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
- if (!kafkaConsumer.listTopics().keySet().contains(topicName)) {
+ KafkaConsumer<?, ?> kafkaConsumer = null;
+ try {
+ kafkaConsumer = new KafkaConsumer<>(kafkaStoragePlugin.getConfig().getKafkaConsumerProps(),
+ new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ if (!kafkaConsumer.listTopics().containsKey(topicName)) {
throw UserException.dataReadError()
.message("Table '%s' does not exist", topicName)
.build(logger);
}
- kafkaConsumer.subscribe(Arrays.asList(topicName));
+ kafkaConsumer.subscribe(Collections.singletonList(topicName));
// based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions
// evaluates lazily, seeking to the first/last offset in all partitions only
// when poll(long) or
@@ -194,8 +199,12 @@
endOffsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition));
}
} catch (Exception e) {
- throw UserException.dataReadError(e).message("Failed to fetch start/end offsets of the topic %s", topicName)
- .addContext(e.getMessage()).build(logger);
+ throw UserException.dataReadError(e)
+ .message("Failed to fetch start/end offsets of the topic %s", topicName)
+ .addContext(e.getMessage())
+ .build(logger);
+ } finally {
+ kafkaStoragePlugin.registerToClose(kafkaConsumer);
}
// computes work for each end point
@@ -227,11 +236,10 @@
@Override
public KafkaSubScan getSpecificScan(int minorFragmentId) {
List<PartitionScanWork> workList = assignments.get(minorFragmentId);
- List<KafkaPartitionScanSpec> scanSpecList = Lists.newArrayList();
- for (PartitionScanWork work : workList) {
- scanSpecList.add(work.partitionScanSpec);
- }
+ List<KafkaPartitionScanSpec> scanSpecList = workList.stream()
+ .map(PartitionScanWork::getPartitionScanSpec)
+ .collect(Collectors.toList());
return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, scanSpecList);
}
@@ -256,7 +264,7 @@
}
@Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new KafkaGroupScan(this);
}
@@ -286,7 +294,7 @@
KafkaGroupScan clone = new KafkaGroupScan(this);
HashSet<TopicPartition> partitionsInSpec = Sets.newHashSet();
- for(KafkaPartitionScanSpec scanSpec : partitionScanSpecList) {
+ for (KafkaPartitionScanSpec scanSpec : partitionScanSpecList) {
TopicPartition tp = new TopicPartition(scanSpec.getTopicName(), scanSpec.getPartitionId());
partitionsInSpec.add(tp);
@@ -327,10 +335,8 @@
@JsonIgnore
public List<KafkaPartitionScanSpec> getPartitionScanSpecList() {
- List<KafkaPartitionScanSpec> partitionScanSpecList = Lists.newArrayList();
- for (PartitionScanWork work : partitionWorkMap.values()) {
- partitionScanSpecList.add(work.partitionScanSpec.clone());
- }
- return partitionScanSpecList;
+ return partitionWorkMap.values().stream()
+ .map(work -> work.partitionScanSpec.clone())
+ .collect(Collectors.toList());
}
}
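
A side note on the Collectors.toMap call introduced in init() above: its third argument is a merge function, and (o, n) -> n keeps the later entry when two Drillbit endpoints report the same address. A minimal, self-contained illustration with hypothetical strings instead of Drill types:

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ToMapMergeSketch {
  public static void main(String[] args) {
    // Keys collide on the first letter; the merge function (o, n) -> n keeps the newer value,
    // mirroring how KafkaGroupScan.init() resolves duplicate endpoint addresses.
    Map<String, String> byFirstLetter = Arrays.asList("alpha", "apex", "beta").stream()
        .collect(Collectors.toMap(s -> s.substring(0, 1), Function.identity(), (o, n) -> n));
    System.out.println(byFirstLetter); // prints {a=apex, b=beta}
  }
}
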
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
index 92488a2..c0ce1c2 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
@@ -180,7 +180,6 @@
.put("less_than", "greater_than")
.build();
}
-
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
index 713f62e..eaef410 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
@@ -57,21 +57,21 @@
switch (functionName) {
case "booleanAnd":
//Reduce the scan range
- if(startOffset < scanSpec.startOffset) {
+ if (startOffset < scanSpec.startOffset) {
startOffset = scanSpec.startOffset;
}
- if(endOffset > scanSpec.endOffset) {
+ if (endOffset > scanSpec.endOffset) {
endOffset = scanSpec.endOffset;
}
break;
case "booleanOr":
//Increase the scan range
- if(scanSpec.startOffset < startOffset) {
+ if (scanSpec.startOffset < startOffset) {
startOffset = scanSpec.startOffset;
}
- if(scanSpec.endOffset > endOffset) {
+ if (scanSpec.endOffset > endOffset) {
endOffset = scanSpec.endOffset;
}
break;
@@ -80,7 +80,7 @@
@Override
public boolean equals(Object obj) {
- if(obj instanceof KafkaPartitionScanSpec) {
+ if (obj instanceof KafkaPartitionScanSpec) {
KafkaPartitionScanSpec that = ((KafkaPartitionScanSpec)obj);
return this.topicName.equals(that.topicName) && this.partitionId == that.partitionId
&& this.startOffset == that.startOffset && this.endOffset == that.endOffset;
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
index 9fa987a..2734861 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
@@ -17,44 +17,44 @@
*/
package org.apache.drill.exec.store.kafka;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.apache.drill.common.expression.BooleanOperator;
-import org.apache.drill.common.expression.FunctionCall;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
-public class KafkaPartitionScanSpecBuilder extends
- AbstractExprVisitor<List<KafkaPartitionScanSpec>,Void,RuntimeException> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaPartitionScanSpecBuilder.class);
+public class KafkaPartitionScanSpecBuilder
+ extends AbstractExprVisitor<List<KafkaPartitionScanSpec>, Void, RuntimeException>
+ implements AutoCloseable {
+
private final LogicalExpression le;
private final KafkaGroupScan groupScan;
private final KafkaConsumer<?, ?> kafkaConsumer;
private ImmutableMap<TopicPartition, KafkaPartitionScanSpec> fullScanSpec;
- private static final long CLOSE_TIMEOUT_MS = 200;
public KafkaPartitionScanSpecBuilder(KafkaGroupScan groupScan, LogicalExpression conditionExp) {
this.groupScan = groupScan;
- kafkaConsumer = new KafkaConsumer<>(groupScan.getKafkaStoragePluginConfig().getKafkaConsumerProps(),
+ this.kafkaConsumer = new KafkaConsumer<>(groupScan.getKafkaStoragePluginConfig().getKafkaConsumerProps(),
new ByteArrayDeserializer(), new ByteArrayDeserializer());
- le = conditionExp;
+ this.le = conditionExp;
}
public List<KafkaPartitionScanSpec> parseTree() {
ImmutableMap.Builder<TopicPartition, KafkaPartitionScanSpec> builder = ImmutableMap.builder();
- for(KafkaPartitionScanSpec scanSpec : groupScan.getPartitionScanSpecList()) {
+ for (KafkaPartitionScanSpec scanSpec : groupScan.getPartitionScanSpecList()) {
builder.put(new TopicPartition(scanSpec.getTopicName(), scanSpec.getPartitionId()), scanSpec);
}
fullScanSpec = builder.build();
@@ -65,7 +65,7 @@
This results in a "ScanBatch" with no reader. DRILL currently requires
at least one reader to be present in a scan batch.
*/
- if(pushdownSpec != null && pushdownSpec.isEmpty()) {
+ if (pushdownSpec != null && pushdownSpec.isEmpty()) {
TopicPartition firstPartition = new TopicPartition(groupScan.getKafkaScanSpec().getTopicName(), 0);
KafkaPartitionScanSpec emptySpec =
new KafkaPartitionScanSpec(firstPartition.topic(),firstPartition.partition(),
@@ -76,29 +76,26 @@
}
@Override
- public List<KafkaPartitionScanSpec> visitUnknown(LogicalExpression e, Void value)
- throws RuntimeException {
+ public List<KafkaPartitionScanSpec> visitUnknown(LogicalExpression e, Void value) {
return null;
}
@Override
- public List<KafkaPartitionScanSpec> visitBooleanOperator(BooleanOperator op, Void value)
- throws RuntimeException {
-
+ public List<KafkaPartitionScanSpec> visitBooleanOperator(BooleanOperator op, Void value) {
Map<TopicPartition, KafkaPartitionScanSpec> specMap = Maps.newHashMap();
ImmutableList<LogicalExpression> args = op.args;
- if(op.getName().equals("booleanOr")) {
+ if (op.getName().equals("booleanOr")) {
- for(LogicalExpression expr : args) {
+ for (LogicalExpression expr : args) {
List<KafkaPartitionScanSpec> parsedSpec = expr.accept(this, null);
//parsedSpec is null if expression cannot be pushed down
- if(parsedSpec != null) {
+ if (parsedSpec != null) {
for(KafkaPartitionScanSpec newSpec : parsedSpec) {
TopicPartition tp = new TopicPartition(newSpec.getTopicName(), newSpec.getPartitionId());
KafkaPartitionScanSpec existingSpec = specMap.get(tp);
//If existing spec does not contain topic-partition
- if(existingSpec == null) {
+ if (existingSpec == null) {
specMap.put(tp, newSpec); //Add topic-partition to spec for OR
} else {
existingSpec.mergeScanSpec(op.getName(), newSpec);
@@ -111,11 +108,11 @@
}
} else { //booleanAnd
specMap.putAll(fullScanSpec);
- for(LogicalExpression expr : args) {
+ for (LogicalExpression expr : args) {
List<KafkaPartitionScanSpec> parsedSpec = expr.accept(this, null);
//parsedSpec is null if expression cannot be pushed down
- if(parsedSpec != null) {
+ if (parsedSpec != null) {
Set<TopicPartition> partitionsInNewSpec = Sets.newHashSet(); //Store topic-partitions returned from new spec.
for (KafkaPartitionScanSpec newSpec : parsedSpec) {
@@ -143,14 +140,12 @@
}
@Override
- public List<KafkaPartitionScanSpec> visitFunctionCall(FunctionCall call, Void value)
- throws RuntimeException {
-
+ public List<KafkaPartitionScanSpec> visitFunctionCall(FunctionCall call, Void value) {
String functionName = call.getName();
- if(KafkaNodeProcessor.isPushdownFunction(functionName)) {
+ if (KafkaNodeProcessor.isPushdownFunction(functionName)) {
KafkaNodeProcessor kafkaNodeProcessor = KafkaNodeProcessor.process(call);
- if(kafkaNodeProcessor.isSuccess()) {
+ if (kafkaNodeProcessor.isSuccess()) {
switch (kafkaNodeProcessor.getPath()) {
case "kafkaMsgTimestamp":
return createScanSpecForTimestamp(kafkaNodeProcessor.getFunctionName(),
@@ -168,22 +163,21 @@
}
- private List<KafkaPartitionScanSpec> createScanSpecForTimestamp(String functionName,
- Long fieldValue) {
+ private List<KafkaPartitionScanSpec> createScanSpecForTimestamp(String functionName, Long fieldValue) {
List<KafkaPartitionScanSpec> scanSpec = Lists.newArrayList();
Map<TopicPartition, Long> timesValMap = Maps.newHashMap();
ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
- for(TopicPartition partitions : topicPartitions) {
+ for (TopicPartition partitions : topicPartitions) {
timesValMap.put(partitions, functionName.equals("greater_than") ? fieldValue+1 : fieldValue);
}
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = kafkaConsumer.offsetsForTimes(timesValMap);
- for(TopicPartition tp : topicPartitions) {
+ for (TopicPartition tp : topicPartitions) {
OffsetAndTimestamp value = offsetAndTimestamp.get(tp);
//OffsetAndTimestamp is null if there is no offset greater or equal to requested timestamp
- if(value == null) {
+ if (value == null) {
scanSpec.add(
new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
fullScanSpec.get(tp).getEndOffset(), fullScanSpec.get(tp).getEndOffset()));
@@ -197,8 +191,7 @@
return scanSpec;
}
- private List<KafkaPartitionScanSpec> createScanSpecForOffset(String functionName,
- Long fieldValue) {
+ private List<KafkaPartitionScanSpec> createScanSpecForOffset(String functionName, Long fieldValue) {
List<KafkaPartitionScanSpec> scanSpec = Lists.newArrayList();
ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
@@ -210,8 +203,8 @@
switch (functionName) {
case "equal":
- for(TopicPartition tp : topicPartitions) {
- if(fieldValue < fullScanSpec.get(tp).getStartOffset()) {
+ for (TopicPartition tp : topicPartitions) {
+ if (fieldValue < fullScanSpec.get(tp).getStartOffset()) {
//Offset does not exist
scanSpec.add(
new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
@@ -224,7 +217,7 @@
}
break;
case "greater_than_or_equal_to":
- for(TopicPartition tp : topicPartitions) {
+ for (TopicPartition tp : topicPartitions) {
//Ensure scan range is between startOffset and endOffset,
long val = bindOffsetToRange(tp, fieldValue);
scanSpec.add(
@@ -233,16 +226,16 @@
}
break;
case "greater_than":
- for(TopicPartition tp : topicPartitions) {
+ for (TopicPartition tp : topicPartitions) {
//Ensure scan range is between startOffset and endOffset,
- long val = bindOffsetToRange(tp, fieldValue+1);
+ long val = bindOffsetToRange(tp, fieldValue + 1);
scanSpec.add(
new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
val, fullScanSpec.get(tp).getEndOffset()));
}
break;
case "less_than_or_equal_to":
- for(TopicPartition tp : topicPartitions) {
+ for (TopicPartition tp : topicPartitions) {
//Ensure scan range is between startOffset and endOffset,
long val = bindOffsetToRange(tp, fieldValue+1);
@@ -252,7 +245,7 @@
}
break;
case "less_than":
- for(TopicPartition tp : topicPartitions) {
+ for (TopicPartition tp : topicPartitions) {
//Ensure scan range is between startOffset and endOffset,
long val = bindOffsetToRange(tp, fieldValue);
@@ -265,15 +258,14 @@
return scanSpec;
}
- private List<KafkaPartitionScanSpec> createScanSpecForPartition(String functionName,
- Long fieldValue) {
+ private List<KafkaPartitionScanSpec> createScanSpecForPartition(String functionName, Long fieldValue) {
List<KafkaPartitionScanSpec> scanSpecList = Lists.newArrayList();
ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
switch (functionName) {
case "equal":
- for(TopicPartition tp : topicPartitions) {
- if(tp.partition() == fieldValue) {
+ for (TopicPartition tp : topicPartitions) {
+ if (tp.partition() == fieldValue) {
scanSpecList.add(
new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
fullScanSpec.get(tp).getStartOffset(),
@@ -282,8 +274,8 @@
}
break;
case "not_equal":
- for(TopicPartition tp : topicPartitions) {
- if(tp.partition() != fieldValue) {
+ for (TopicPartition tp : topicPartitions) {
+ if (tp.partition() != fieldValue) {
scanSpecList.add(
new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
fullScanSpec.get(tp).getStartOffset(),
@@ -292,8 +284,8 @@
}
break;
case "greater_than_or_equal_to":
- for(TopicPartition tp : topicPartitions) {
- if(tp.partition() >= fieldValue) {
+ for (TopicPartition tp : topicPartitions) {
+ if (tp.partition() >= fieldValue) {
scanSpecList.add(
new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
fullScanSpec.get(tp).getStartOffset(),
@@ -302,8 +294,8 @@
}
break;
case "greater_than":
- for(TopicPartition tp : topicPartitions) {
- if(tp.partition() > fieldValue) {
+ for (TopicPartition tp : topicPartitions) {
+ if (tp.partition() > fieldValue) {
scanSpecList.add(
new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
fullScanSpec.get(tp).getStartOffset(),
@@ -312,8 +304,8 @@
}
break;
case "less_than_or_equal_to":
- for(TopicPartition tp : topicPartitions) {
- if(tp.partition() <= fieldValue) {
+ for (TopicPartition tp : topicPartitions) {
+ if (tp.partition() <= fieldValue) {
scanSpecList.add(
new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
fullScanSpec.get(tp).getStartOffset(),
@@ -322,8 +314,8 @@
}
break;
case "less_than":
- for(TopicPartition tp : topicPartitions) {
- if(tp.partition() < fieldValue) {
+ for (TopicPartition tp : topicPartitions) {
+ if (tp.partition() < fieldValue) {
scanSpecList.add(
new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
fullScanSpec.get(tp).getStartOffset(),
@@ -335,8 +327,9 @@
return scanSpecList;
}
- void close() {
- kafkaConsumer.close(CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ @Override
+ public void close() {
+ groupScan.getStoragePlugin().registerToClose(kafkaConsumer);
}
private long bindOffsetToRange(TopicPartition tp, long offset) {
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
index 002d043..443650e 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
@@ -30,10 +30,14 @@
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
public class KafkaPushDownFilterIntoScan extends StoragePluginOptimizerRule {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaPushDownFilterIntoScan.class);
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaPushDownFilterIntoScan.class);
public static final StoragePluginOptimizerRule INSTANCE =
new KafkaPushDownFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
@@ -53,18 +57,21 @@
DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
KafkaGroupScan groupScan = (KafkaGroupScan) scan.getGroupScan();
- logger.info("Partitions ScanSpec before pushdown: " + groupScan.getPartitionScanSpecList());
- KafkaPartitionScanSpecBuilder builder = new KafkaPartitionScanSpecBuilder(groupScan, conditionExp);
- List<KafkaPartitionScanSpec> newScanSpec = null;
- newScanSpec = builder.parseTree();
- builder.close(); //Close consumer
+ if (logger.isDebugEnabled()) {
+ logger.debug("Partitions ScanSpec before push down: {}", groupScan.getPartitionScanSpecList());
+ }
+
+ List<KafkaPartitionScanSpec> newScanSpec;
+ try (KafkaPartitionScanSpecBuilder builder = new KafkaPartitionScanSpecBuilder(groupScan, conditionExp)) {
+ newScanSpec = builder.parseTree();
+ }
//No pushdown
- if(newScanSpec == null) {
+ if (newScanSpec == null) {
return;
}
- logger.info("Partitions ScanSpec after pushdown: " + newScanSpec);
+ logger.debug("Partitions ScanSpec after pushdown: {}", newScanSpec);
GroupScan newGroupScan = groupScan.cloneWithNewSpec(newScanSpec);
final ScanPrel newScanPrel =
new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
@@ -73,7 +80,7 @@
@Override
public boolean matches(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel) call.rel(1);
+ final ScanPrel scan = call.rel(1);
if (scan.getGroupScan() instanceof KafkaGroupScan) {
return super.matches(call);
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index 62e588c..5218c3b 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -17,19 +17,18 @@
*/
package org.apache.drill.exec.store.kafka;
+import java.io.IOException;
import java.util.Collection;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.kafka.decoders.MessageReader;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
@@ -40,61 +39,48 @@
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
public class KafkaRecordReader extends AbstractRecordReader {
private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
- public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+ private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+ private final ReadOptions readOptions;
+ private final KafkaStoragePlugin plugin;
+ private final KafkaPartitionScanSpec subScanSpec;
private VectorContainerWriter writer;
private MessageReader messageReader;
- private final boolean unionEnabled;
- private final KafkaStoragePlugin plugin;
- private final KafkaPartitionScanSpec subScanSpec;
- private final long kafkaPollTimeOut;
-
private long currentOffset;
private MessageIterator msgItr;
-
- private final boolean enableAllTextMode;
- private final boolean readNumbersAsDouble;
- private final String kafkaMsgReader;
private int currentMessageCount;
public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
FragmentContext context, KafkaStoragePlugin plugin) {
setColumns(projectedColumns);
- final OptionManager optionManager = context.getOptions();
- this.enableAllTextMode = optionManager.getBoolean(ExecConstants.KAFKA_ALL_TEXT_MODE);
- this.readNumbersAsDouble = optionManager.getBoolean(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE);
- this.unionEnabled = optionManager.getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
- this.kafkaMsgReader = optionManager.getString(ExecConstants.KAFKA_RECORD_READER);
- this.kafkaPollTimeOut = optionManager.getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
+ this.readOptions = new ReadOptions(context.getOptions());
this.plugin = plugin;
this.subScanSpec = subScanSpec;
}
@Override
protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
- Set<SchemaPath> transformed = Sets.newLinkedHashSet();
- if (!isStarQuery()) {
- for (SchemaPath column : projectedColumns) {
- transformed.add(column);
- }
- } else {
+ Set<SchemaPath> transformed = new LinkedHashSet<>();
+ if (isStarQuery()) {
transformed.add(SchemaPath.STAR_COLUMN);
+ } else {
+ transformed.addAll(projectedColumns);
}
return transformed;
}
@Override
- public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
- this.writer = new VectorContainerWriter(output, unionEnabled);
- messageReader = MessageReaderFactory.getMessageReader(kafkaMsgReader);
- messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), this.writer,
- this.enableAllTextMode, this.readNumbersAsDouble);
- msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, kafkaPollTimeOut);
+ public void setup(OperatorContext context, OutputMutator output) {
+ this.writer = new VectorContainerWriter(output, readOptions.isEnableUnionType());
+ messageReader = MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
+ messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), writer, readOptions);
+ msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, readOptions.getPollTimeOut());
}
/**
@@ -105,17 +91,20 @@
public int next() {
writer.allocate();
writer.reset();
- Stopwatch watch = Stopwatch.createStarted();
+ Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
currentMessageCount = 0;
try {
- while (currentOffset < subScanSpec.getEndOffset() - 1 && msgItr.hasNext()) {
+ while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
currentOffset = consumerRecord.offset();
writer.setPosition(currentMessageCount);
- messageReader.readMessage(consumerRecord);
- if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
- break;
+ boolean status = messageReader.readMessage(consumerRecord);
+ // increment record count only if message was read successfully
+ if (status) {
+ if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
+ break;
+ }
}
}
@@ -123,33 +112,35 @@
messageReader.ensureAtLeastOneField();
}
writer.setValueCount(currentMessageCount);
- logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
+ if (watch != null) {
+ logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
+ }
logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
currentOffset);
return currentMessageCount;
} catch (Exception e) {
- String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (currentMessageCount + 1);
- throw UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger);
+ String msg = "Failure while reading messages from kafka. Record reader was at record: " + (currentMessageCount + 1);
+ throw UserException.dataReadError(e)
+ .message(msg)
+ .addContext(e.getMessage())
+ .build(logger);
}
}
@Override
- public void close() throws Exception {
- logger.info("Last offset processed for {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
+ public void close() throws IOException {
+ logger.debug("Last offset processed for {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
currentOffset);
- logger.info("Total time to fetch messages from {}:{} is - {} milliseconds", subScanSpec.getTopicName(),
+ logger.debug("Total time to fetch messages from {}:{} is - {} milliseconds", subScanSpec.getTopicName(),
subScanSpec.getPartitionId(), msgItr.getTotalFetchTime());
+ plugin.registerToClose(msgItr);
messageReader.close();
}
@Override
public String toString() {
- return "KafkaRecordReader[messageReader=" + messageReader
- + ", kafkaPollTimeOut=" + kafkaPollTimeOut
+ return "KafkaRecordReader[readOptions=" + readOptions
+ ", currentOffset=" + currentOffset
- + ", enableAllTextMode=" + enableAllTextMode
- + ", readNumbersAsDouble=" + readNumbersAsDouble
- + ", kafkaMsgReader=" + kafkaMsgReader
+ ", currentMessageCount=" + currentMessageCount
+ "]";
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
index ce71531..8083e33 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -17,8 +17,8 @@
*/
package org.apache.drill.exec.store.kafka;
-import java.util.LinkedList;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -35,7 +35,7 @@
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
- static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class);
+ private static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class);
@Override
public CloseableRecordBatch getBatch(ExecutorFragmentContext context, KafkaSubScan subScan, List<RecordBatch> children)
@@ -43,13 +43,11 @@
Preconditions.checkArgument(children.isEmpty());
List<SchemaPath> columns = subScan.getColumns() != null ? subScan.getColumns() : GroupScan.ALL_COLUMNS;
- List<RecordReader> readers = new LinkedList<>();
- for (KafkaPartitionScanSpec scanSpec : subScan.getPartitionSubScanSpecList()) {
- readers.add(new KafkaRecordReader(scanSpec, columns, context, subScan.getKafkaStoragePlugin()));
- }
+ List<RecordReader> readers = subScan.getPartitionSubScanSpecList().stream()
+ .map(scanSpec -> new KafkaRecordReader(scanSpec, columns, context, subScan.getKafkaStoragePlugin()))
+ .collect(Collectors.toList());
- logger.info("Number of record readers initialized : {}", readers.size());
+ logger.debug("Number of record readers initialized : {}", readers.size());
return new ScanBatch(subScan, context, readers);
}
-
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
index 91c8fdf..d059099 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java
@@ -21,7 +21,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
public class KafkaScanSpec {
- private String topicName;
+ private final String topicName;
@JsonCreator
public KafkaScanSpec(@JsonProperty("topicName") String topicName) {
@@ -36,5 +36,4 @@
public String toString() {
return "KafkaScanSpec [topicName=" + topicName + "]";
}
-
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
index 2d45b89..257c0bf 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
@@ -22,7 +22,6 @@
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.JSONOptions;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.server.DrillbitContext;
@@ -30,28 +29,26 @@
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.kafka.schema.KafkaSchemaFactory;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
-import org.apache.drill.shaded.guava.com.google.common.io.Closer;
public class KafkaStoragePlugin extends AbstractStoragePlugin {
private static final Logger logger = LoggerFactory.getLogger(KafkaStoragePlugin.class);
private final KafkaSchemaFactory kafkaSchemaFactory;
private final KafkaStoragePluginConfig config;
- private final Closer closer = Closer.create();
+ private final KafkaAsyncCloser closer;
- public KafkaStoragePlugin(KafkaStoragePluginConfig config, DrillbitContext context, String name)
- throws ExecutionSetupException {
+ public KafkaStoragePlugin(KafkaStoragePluginConfig config, DrillbitContext context, String name) {
super(context, name);
logger.debug("Initializing {}", KafkaStoragePlugin.class.getName());
this.config = config;
this.kafkaSchemaFactory = new KafkaSchemaFactory(this, name);
+ this.closer = new KafkaAsyncCloser();
}
@Override
@@ -65,7 +62,7 @@
}
@Override
- public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
this.kafkaSchemaFactory.registerSchemas(schemaConfig, parent);
}
@@ -75,21 +72,19 @@
}
@Override
- public AbstractGroupScan getPhysicalScan(String userName,
- JSONOptions selection) throws IOException {
+ public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
KafkaScanSpec kafkaScanSpec = selection.getListWith(new ObjectMapper(),
new TypeReference<KafkaScanSpec>() {
});
return new KafkaGroupScan(this, kafkaScanSpec, null);
}
- public KafkaConsumer<byte[], byte[]> registerConsumer(KafkaConsumer<byte[], byte[]> consumer) {
- return closer.register(consumer);
+ public void registerToClose(AutoCloseable autoCloseable) {
+ closer.close(autoCloseable);
}
@Override
- public void close() throws IOException {
+ public void close() {
closer.close();
}
-
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
index 94afa5f..7feb4d9 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
@@ -18,7 +18,9 @@
package org.apache.drill.exec.store.kafka;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
+import java.util.StringJoiner;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.slf4j.Logger;
@@ -33,7 +35,8 @@
private static final Logger logger = LoggerFactory.getLogger(KafkaStoragePluginConfig.class);
public static final String NAME = "kafka";
- private Properties kafkaConsumerProps;
+
+ private final Properties kafkaConsumerProps;
@JsonCreator
public KafkaStoragePluginConfig(@JsonProperty("kafkaConsumerProps") Map<String, String> kafkaConsumerProps) {
@@ -48,31 +51,25 @@
@Override
public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((kafkaConsumerProps == null) ? 0 : kafkaConsumerProps.hashCode());
- return result;
+ return Objects.hash(kafkaConsumerProps);
}
@Override
- public boolean equals(Object obj) {
- if (this == obj) {
+ public boolean equals(Object o) {
+ if (this == o) {
return true;
}
- if (obj == null) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
- if (getClass() != obj.getClass()) {
- return false;
- }
- KafkaStoragePluginConfig other = (KafkaStoragePluginConfig) obj;
- if (kafkaConsumerProps == null && other.kafkaConsumerProps == null) {
- return true;
- }
- if (kafkaConsumerProps == null || other.kafkaConsumerProps == null) {
- return false;
- }
- return kafkaConsumerProps.equals(other.kafkaConsumerProps);
+ KafkaStoragePluginConfig that = (KafkaStoragePluginConfig) o;
+ return kafkaConsumerProps.equals(that.kafkaConsumerProps);
}
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", KafkaStoragePluginConfig.class.getSimpleName() + "[", "]")
+ .add("kafkaConsumerProps=" + kafkaConsumerProps)
+ .toString();
+ }
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
index 67350df..6ea9e1d 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
@@ -74,7 +74,7 @@
}
@Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, partitionSubScanSpecList);
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
index 7373e2c..68855ce 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.kafka;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -31,11 +32,10 @@
import org.slf4j.LoggerFactory;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import kafka.common.KafkaException;
-public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>>, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(MessageIterator.class);
private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
@@ -50,11 +50,11 @@
this.kafkaConsumer = kafkaConsumer;
this.kafkaPollTimeOut = kafkaPollTimeOut;
- List<TopicPartition> partitions = Lists.newArrayListWithCapacity(1);
+ List<TopicPartition> partitions = new ArrayList<>(1);
topicPartition = new TopicPartition(subScanSpec.getTopicName(), subScanSpec.getPartitionId());
partitions.add(topicPartition);
this.kafkaConsumer.assign(partitions);
- logger.info("Start offset of {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
+ logger.debug("Start offset of {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
subScanSpec.getStartOffset());
this.kafkaConsumer.seek(topicPartition, subScanSpec.getStartOffset());
this.endOffset = subScanSpec.getEndOffset();
@@ -76,38 +76,54 @@
return false;
}
- ConsumerRecords<byte[], byte[]> consumerRecords = null;
- Stopwatch stopwatch = Stopwatch.createStarted();
+ ConsumerRecords<byte[], byte[]> consumerRecords;
+ Stopwatch stopwatch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
try {
consumerRecords = kafkaConsumer.poll(kafkaPollTimeOut);
} catch (KafkaException ke) {
- logger.error(ke.getMessage(), ke);
throw UserException.dataReadError(ke).message(ke.getMessage()).build(logger);
+ } finally {
+ if (stopwatch != null) {
+ stopwatch.stop();
+ }
}
- stopwatch.stop();
if (consumerRecords.isEmpty()) {
- String errorMsg = new StringBuilder().append("Failed to fetch messages within ").append(kafkaPollTimeOut)
- .append(" milliseconds. Consider increasing the value of the property : ")
- .append(ExecConstants.KAFKA_POLL_TIMEOUT).toString();
- throw UserException.dataReadError().message(errorMsg).build(logger);
+ throw UserException.dataReadError()
+ .message("Failed to fetch messages within %s milliseconds. " +
+ "Consider increasing the value of the property: %s",
+ kafkaPollTimeOut, ExecConstants.KAFKA_POLL_TIMEOUT)
+ .build(logger);
}
- long lastFetchTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
- logger.debug("Total number of messages fetched : {}", consumerRecords.count());
- logger.debug("Time taken to fetch : {} milliseconds", lastFetchTime);
- totalFetchTime += lastFetchTime;
+ if (stopwatch != null) {
+ long lastFetchTime = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ logger.debug("Time taken to fetch : {} milliseconds", lastFetchTime);
+ totalFetchTime += lastFetchTime;
+ logger.debug("Total number of messages fetched : {}", consumerRecords.count());
+ }
recordIter = consumerRecords.iterator();
return recordIter.hasNext();
}
+ /**
+ * Returns total fetch time of the messages from topic.
+ * Only applicable if debug log level is enabled.
+ *
+ * @return calculated total fetch time if debug log level is enabled, 0 otherwise
+ */
public long getTotalFetchTime() {
- return this.totalFetchTime;
+ return totalFetchTime;
}
@Override
public ConsumerRecord<byte[], byte[]> next() {
return recordIter.next();
}
+
+ @Override
+ public void close() {
+ kafkaConsumer.close();
+ }
}
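
The timing changes above all follow one pattern: the Stopwatch is created only when debug logging is enabled, so the measurement cost disappears at the default log level. A minimal sketch of that pattern (illustration only; it assumes plain Guava and SLF4J on the classpath rather than Drill's shaded Guava):

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Stopwatch;

public class DebugTimingSketch {
  private static final Logger logger = LoggerFactory.getLogger(DebugTimingSketch.class);

  public static void main(String[] args) {
    // Only pay for timing when someone can actually see the result.
    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;

    pollAndRead(); // placeholder for the consumer poll / message read loop

    if (watch != null) {
      logger.debug("Took {} ms to process the batch", watch.elapsed(TimeUnit.MILLISECONDS));
    }
  }

  private static void pollAndRead() {
    // no-op in this sketch
  }
}
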
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
index cdaee9b..af3e163 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java
@@ -22,12 +22,16 @@
* It is expected that one should not modify the fieldName of each constant as it breaks the compatibility.
*/
public enum MetaDataField {
- KAFKA_TOPIC("kafkaTopic"), KAFKA_PARTITION_ID("kafkaPartitionId"), KAFKA_OFFSET("kafkaMsgOffset"), KAFKA_TIMESTAMP(
- "kafkaMsgTimestamp"), KAFKA_MSG_KEY("kafkaMsgKey");
+
+ KAFKA_TOPIC("kafkaTopic"),
+ KAFKA_PARTITION_ID("kafkaPartitionId"),
+ KAFKA_OFFSET("kafkaMsgOffset"),
+ KAFKA_TIMESTAMP("kafkaMsgTimestamp"),
+ KAFKA_MSG_KEY("kafkaMsgKey");
private final String fieldName;
- private MetaDataField(final String fieldName) {
+ MetaDataField(final String fieldName) {
this.fieldName = fieldName;
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/ReadOptions.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/ReadOptions.java
new file mode 100644
index 0000000..ea9ed0d
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/ReadOptions.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.OptionManager;
+
+import java.util.StringJoiner;
+
+/**
+ * Holds all system / session options that are used when reading data from Kafka.
+ */
+public class ReadOptions {
+
+ private final String messageReader;
+ private final long pollTimeOut;
+ private final boolean allTextMode;
+ private final boolean readNumbersAsDouble;
+ private final boolean enableUnionType;
+ private final boolean skipInvalidRecords;
+ private final boolean allowNanInf;
+ private final boolean allowEscapeAnyChar;
+
+ public ReadOptions(OptionManager optionManager) {
+ this.messageReader = optionManager.getString(ExecConstants.KAFKA_RECORD_READER);
+ this.pollTimeOut = optionManager.getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
+ this.allTextMode = optionManager.getBoolean(ExecConstants.KAFKA_ALL_TEXT_MODE);
+ this.readNumbersAsDouble = optionManager.getBoolean(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE);
+ this.enableUnionType = optionManager.getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
+ this.skipInvalidRecords = optionManager.getBoolean(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS);
+ this.allowNanInf = optionManager.getBoolean(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS);
+ this.allowEscapeAnyChar = optionManager.getBoolean(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR);
+ }
+
+ public String getMessageReader() {
+ return messageReader;
+ }
+
+ public long getPollTimeOut() {
+ return pollTimeOut;
+ }
+
+ public boolean isAllTextMode() {
+ return allTextMode;
+ }
+
+ public boolean isReadNumbersAsDouble() {
+ return readNumbersAsDouble;
+ }
+
+ public boolean isEnableUnionType() {
+ return enableUnionType;
+ }
+
+ public boolean isSkipInvalidRecords() {
+ return skipInvalidRecords;
+ }
+
+ public boolean isAllowNanInf() {
+ return allowNanInf;
+ }
+
+ public boolean isAllowEscapeAnyChar() {
+ return allowEscapeAnyChar;
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", ReadOptions.class.getSimpleName() + "[", "]")
+ .add("messageReader='" + messageReader + "'")
+ .add("pollTimeOut=" + pollTimeOut)
+ .add("allTextMode=" + allTextMode)
+ .add("readNumbersAsDouble=" + readNumbersAsDouble)
+ .add("enableUnionType=" + enableUnionType)
+ .add("skipInvalidRecords=" + skipInvalidRecords)
+ .add("allowNanInf=" + allowNanInf)
+ .add("allowEscapeAnyChar=" + allowEscapeAnyChar)
+ .toString();
+ }
+}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index 40e9e12..eb503aa 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -26,9 +26,16 @@
import java.io.IOException;
import java.util.List;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.easy.json.JsonProcessor;
+import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.ReadOptions;
import org.apache.drill.exec.vector.complex.fn.JsonReader;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -38,49 +45,65 @@
import org.slf4j.LoggerFactory;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
import io.netty.buffer.DrillBuf;
/**
* MessageReader class which will convert ConsumerRecord into JSON and writes to
* VectorContainerWriter of JsonReader
- *
*/
public class JsonMessageReader implements MessageReader {
private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class);
private JsonReader jsonReader;
private VectorContainerWriter writer;
+ private ObjectMapper objectMapper;
@Override
- public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode,
- boolean readNumbersAsDouble) {
+ public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, ReadOptions readOptions) {
// set skipOuterList to false as it doesn't applicable for JSON records and it's only applicable for JSON files.
this.jsonReader = new JsonReader.Builder(buf)
- .schemaPathColumns(columns)
- .allTextMode(allTextMode)
- .readNumbersAsDouble(readNumbersAsDouble)
- .build();
+ .schemaPathColumns(columns)
+ .allTextMode(readOptions.isAllTextMode())
+ .readNumbersAsDouble(readOptions.isReadNumbersAsDouble())
+ .enableNanInf(readOptions.isAllowNanInf())
+ .enableEscapeAnyChar(readOptions.isAllowEscapeAnyChar())
+ .build();
+ jsonReader.setIgnoreJSONParseErrors(readOptions.isSkipInvalidRecords());
this.writer = writer;
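+ // configure Jackson with the same NaN/Infinity and escape-any-char behavior as the JsonReader above so metadata injection parses records consistently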
+ this.objectMapper = BaseJsonProcessor.getDefaultMapper()
+ .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, readOptions.isAllowNanInf())
+ .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, readOptions.isAllowEscapeAnyChar());
}
@Override
- public void readMessage(ConsumerRecord<?, ?> record) {
+ public boolean readMessage(ConsumerRecord<?, ?> record) {
+ byte[] recordArray = (byte[]) record.value();
+ String data = new String(recordArray, Charsets.UTF_8);
try {
- byte[] recordArray = (byte[]) record.value();
- JsonObject jsonObj = (new JsonParser()).parse(new String(recordArray, Charsets.UTF_8)).getAsJsonObject();
- jsonObj.addProperty(KAFKA_TOPIC.getFieldName(), record.topic());
- jsonObj.addProperty(KAFKA_PARTITION_ID.getFieldName(), record.partition());
- jsonObj.addProperty(KAFKA_OFFSET.getFieldName(), record.offset());
- jsonObj.addProperty(KAFKA_TIMESTAMP.getFieldName(), record.timestamp());
- jsonObj.addProperty(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? record.key().toString() : null);
- jsonReader.setSource(jsonObj.toString().getBytes(Charsets.UTF_8));
- jsonReader.write(writer);
- } catch (IOException e) {
- throw UserException.dataReadError(e).message(e.getMessage())
- .addContext("MessageReader", JsonMessageReader.class.getName()).build(logger);
+ JsonNode jsonNode = objectMapper.readTree(data);
+ if (jsonNode != null && jsonNode.isObject()) {
+ ObjectNode objectNode = (ObjectNode) jsonNode;
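+ // inject Kafka metadata (topic, partition, offset, timestamp, message key) as additional fields of the JSON record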
+ objectNode.put(KAFKA_TOPIC.getFieldName(), record.topic());
+ objectNode.put(KAFKA_PARTITION_ID.getFieldName(), record.partition());
+ objectNode.put(KAFKA_OFFSET.getFieldName(), record.offset());
+ objectNode.put(KAFKA_TIMESTAMP.getFieldName(), record.timestamp());
+ objectNode.put(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? record.key().toString() : null);
+ } else {
+ throw new IOException("Unsupported node type: " + (jsonNode == null ? "NO CONTENT" : jsonNode.getNodeType()));
+ }
+ jsonReader.setSource(jsonNode);
+ return convertJsonReadState(jsonReader.write(writer));
+ } catch (IOException | IllegalArgumentException e) {
+ String message = String.format("JSON record %s: %s", data, e.getMessage());
+ if (jsonReader.ignoreJSONParseError()) {
+ logger.debug("Skipping {}", message, e);
+ return false;
+ }
+ throw UserException.dataReadError(e)
+ .message("Failed to read " + message)
+ .addContext("MessageReader", JsonMessageReader.class.getName())
+ .build(logger);
}
}
@@ -91,17 +114,17 @@
@Override
public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin) {
- return plugin.registerConsumer(new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(),
- new ByteArrayDeserializer(), new ByteArrayDeserializer()));
+ return new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(),
+ new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
@Override
- public void close() throws IOException {
+ public void close() {
this.writer.clear();
try {
this.writer.close();
} catch (Exception e) {
- logger.warn("Error while closing JsonMessageReader", e);
+ logger.warn("Error while closing JsonMessageReader: {}", e.getMessage());
}
}
@@ -109,4 +132,24 @@
public String toString() {
return "JsonMessageReader[jsonReader=" + jsonReader + "]";
}
+
+ /**
+ * Converts {@link JsonProcessor.ReadState} into true / false result.
+ *
+ * @param jsonReadState JSON reader read state
+ * @return true if read was successful, false otherwise
+ * @throws IllegalArgumentException if unexpected read state was encountered
+ */
+ private boolean convertJsonReadState(JsonProcessor.ReadState jsonReadState) {
+ switch (jsonReadState) {
+ case WRITE_SUCCEED:
+ case END_OF_STREAM:
+ return true;
+ case JSON_RECORD_PARSE_ERROR:
+ case JSON_RECORD_PARSE_EOF_ERROR:
+ return false;
+ default:
+ throw new IllegalArgumentException("Unexpected JSON read state: " + jsonReadState);
+ }
+ }
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
index 510a520..f925fce 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
@@ -22,6 +22,7 @@
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.ReadOptions;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -34,12 +35,11 @@
*/
public interface MessageReader extends Closeable {
- public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode,
- boolean readNumbersAsDouble);
+ void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, ReadOptions readOptions);
- public void readMessage(ConsumerRecord<?, ?> message);
+ boolean readMessage(ConsumerRecord<?, ?> message);
- public void ensureAtLeastOneField();
+ void ensureAtLeastOneField();
- public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin);
+ KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin);
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
index cd83f96..3c1cc78 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
@@ -26,7 +26,7 @@
private static final Logger logger = LoggerFactory.getLogger(MessageReaderFactory.class);
/**
- * Initialize kafka message reader beased on store.kafka.record.reader session
+ * Initialize kafka message reader based on store.kafka.record.reader session
* property
*
* @param messageReaderKlass
@@ -47,7 +47,7 @@
Class<?> klass = Class.forName(messageReaderKlass);
if (MessageReader.class.isAssignableFrom(klass)) {
messageReader = (MessageReader) klass.newInstance();
- logger.info("Initialized Message Reader : {}", messageReader);
+ logger.debug("Initialized Message Reader : {}", messageReader);
}
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw UserException.validationError().message("Failed to initialize message reader : %s", messageReaderKlass)
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
index 034927a..3fac096 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.kafka.schema;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -33,18 +34,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-
public class KafkaMessageSchema extends AbstractSchema {
private static final Logger logger = LoggerFactory.getLogger(KafkaMessageSchema.class);
private final KafkaStoragePlugin plugin;
- private final Map<String, DrillTable> drillTables = Maps.newHashMap();
+ private final Map<String, DrillTable> drillTables = new HashMap<>();
private Set<String> tableNames;
public KafkaMessageSchema(final KafkaStoragePlugin plugin, final String name) {
- super(ImmutableList.of(), name);
+ super(Collections.emptyList(), name);
this.plugin = plugin;
}
@@ -73,11 +71,15 @@
@Override
public Set<String> getTableNames() {
if (tableNames == null) {
- try (KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps())) {
+ KafkaConsumer<?, ?> kafkaConsumer = null;
+ try {
+ kafkaConsumer = new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps());
tableNames = kafkaConsumer.listTopics().keySet();
} catch (Exception e) {
logger.warn("Failure while loading table names for database '{}': {}", getName(), e.getMessage(), e.getCause());
return Collections.emptySet();
+ } finally {
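+ // hand the consumer to the plugin's async closer instead of closing it inline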
+ plugin.registerToClose(kafkaConsumer);
}
}
return tableNames;
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
index 86ef095..2ee5e88 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.store.kafka.schema;
-import java.io.IOException;
-
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.exec.store.AbstractSchemaFactory;
import org.apache.drill.exec.store.SchemaConfig;
@@ -34,10 +32,9 @@
}
@Override
- public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException {
+ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
KafkaMessageSchema schema = new KafkaMessageSchema(plugin, getName());
SchemaPlus hPlus = parent.add(getName(), schema);
schema.setHolder(hPlus);
}
-
}
diff --git a/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
index 3c3142f..199332f 100644
--- a/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
+++ b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
@@ -2,7 +2,15 @@
"storage":{
"kafka" : {
"type":"kafka",
- "kafkaConsumerProps": {"bootstrap.servers":"localhost:9092", "group.id" : "drill-consumer"},
+ "kafkaConsumerProps": {
+ "bootstrap.servers": "localhost:9092",
+ "group.id": "drill-query-consumer",
+ "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+ "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+ "session.timeout.ms": "30000",
+ "enable.auto.commit": "true",
+ "auto.offset.reset": "earliest"
+ },
"enabled": false
}
}
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
index 8e8319b..4bfd5d4 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
@@ -27,7 +27,7 @@
import static org.apache.drill.exec.store.kafka.TestKafkaSuit.NUM_JSON_MSG;
import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
@Category({KafkaStorageTest.class, SlowTest.class})
public class KafkaFilterPushdownTest extends KafkaTestBase {
@@ -43,13 +43,11 @@
generator.populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_JSON_MSG);
String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_PUSHDOWN_TOPIC);
//Ensure messages are present
- assertTrue("Kafka server does not have expected number of messages",
- testSql(query) == NUM_PARTITIONS * NUM_JSON_MSG);
+ assertEquals("Kafka server does not have expected number of messages", NUM_PARTITIONS * NUM_JSON_MSG, testSql(query));
}
/**
* Test filter pushdown with condition on kafkaMsgOffset.
- * @throws Exception
*/
@Test
public void testPushdownOnOffset() throws Exception {
@@ -67,7 +65,6 @@
/**
* Test filter pushdown with condition on kafkaPartitionId.
- * @throws Exception
*/
@Test
public void testPushdownOnPartition() throws Exception {
@@ -84,7 +81,6 @@
/**
* Test filter pushdown with condition on kafkaMsgTimestamp.
- * @throws Exception
*/
@Test
public void testPushdownOnTimestamp() throws Exception {
@@ -101,7 +97,6 @@
/**
* Test filter pushdown when timestamp is not ordered.
- * @throws Exception
*/
@Test
public void testPushdownUnorderedTimestamp() throws Exception {
@@ -119,7 +114,6 @@
/**
* Test filter pushdown when timestamp value specified does not exist.
- * @throws Exception
*/
@Test
public void testPushdownWhenTimestampDoesNotExist() throws Exception {
@@ -136,7 +130,6 @@
/**
* Test filter pushdown when partition value specified does not exist.
- * @throws Exception
*/
@Test
public void testPushdownWhenPartitionDoesNotExist() throws Exception {
@@ -153,7 +146,6 @@
/**
* Test filter pushdown when timestamp exists but partition does not exist.
- * @throws Exception
*/
@Test
public void testPushdownForEmptyScanSpec() throws Exception {
@@ -172,7 +164,6 @@
/**
* Test filter pushdown on kafkaMsgOffset with boundary conditions.
* In every case, the number of records returned is 0.
- * @throws Exception
*/
@Test
public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws Exception {
@@ -230,7 +221,6 @@
/**
* Test filter pushdown on kafkaMsgOffset with boundary conditions.
* In every case, the number of records returned is 5 (1 per topic-partition).
- * @throws Exception
*/
@Test
public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws Exception {
@@ -264,7 +254,6 @@
/**
* Test filter pushdown with OR.
* Pushdown is possible if all the predicates are on metadata fields.
- * @throws Exception
*/
@Test
public void testPushdownWithOr() throws Exception {
@@ -282,7 +271,6 @@
/**
* Test filter pushdown with OR on kafkaMsgTimestamp and kafkaMsgOffset.
- * @throws Exception
*/
@Test
public void testPushdownWithOr1() throws Exception {
@@ -301,8 +289,6 @@
/**
* Test pushdown for a combination of AND and OR.
- *
- * @throws Exception
*/
@Test
public void testPushdownWithAndOrCombo() throws Exception {
@@ -321,7 +307,6 @@
/**
* Test pushdown for a combination of AND and OR.
- * @throws Exception
*/
@Test
public void testPushdownWithAndOrCombo2() throws Exception {
@@ -343,7 +328,6 @@
/**
* Test pushdown for predicate1 AND predicate2.
* Where predicate1 is on a metadata field and predicate2 is on user fields.
- * @throws Exception
*/
@Test
public void testPushdownTimestampWithNonMetaField() throws Exception {
@@ -363,7 +347,6 @@
/**
* Tests that pushdown does not happen for predicates such as
* non-metadata-field = val1 OR (kafkaMsgTimestamp > val2 AND kafkaMsgTimestamp < val4)
- * @throws Exception
*/
@Test
public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception {
@@ -380,5 +363,4 @@
PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT,
new String[] {String.format(EXPECTED_PATTERN, expectedRowCountInPlan)}, null);
}
-
}
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
index d094531..745edb5 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -30,7 +30,6 @@
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -67,39 +66,38 @@
}
public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException {
- KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(producerProperties);
- Schema.Parser parser = new Schema.Parser();
- Schema schema = parser.parse(Resources.getResource("drill-avro-test.avsc").openStream());
- GenericRecordBuilder builder = new GenericRecordBuilder(schema);
- Random rand = new Random();
- for (int i = 0; i < numMsg; ++i) {
- builder.set("key1", UUID.randomUUID().toString());
- builder.set("key2", rand.nextInt());
- builder.set("key3", rand.nextBoolean());
+ try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(producerProperties)) {
+ Schema.Parser parser = new Schema.Parser();
+ Schema schema = parser.parse(Resources.getResource("drill-avro-test.avsc").openStream());
+ GenericRecordBuilder builder = new GenericRecordBuilder(schema);
+ Random rand = new Random();
+ for (int i = 0; i < numMsg; ++i) {
+ builder.set("key1", UUID.randomUUID().toString());
+ builder.set("key2", rand.nextInt());
+ builder.set("key3", rand.nextBoolean());
- List<Integer> list = Lists.newArrayList();
- list.add(rand.nextInt(100));
- list.add(rand.nextInt(100));
- list.add(rand.nextInt(100));
- builder.set("key5", list);
+ List<Integer> list = Lists.newArrayList();
+ list.add(rand.nextInt(100));
+ list.add(rand.nextInt(100));
+ list.add(rand.nextInt(100));
+ builder.set("key5", list);
- Map<String, Double> map = Maps.newHashMap();
- map.put("key61", rand.nextDouble());
- map.put("key62", rand.nextDouble());
- builder.set("key6", map);
+ Map<String, Double> map = Maps.newHashMap();
+ map.put("key61", rand.nextDouble());
+ map.put("key62", rand.nextDouble());
+ builder.set("key6", map);
- Record producerRecord = builder.build();
+ Record producerRecord = builder.build();
- ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>(topic, producerRecord);
- producer.send(record);
+ ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, producerRecord);
+ producer.send(record);
+ }
}
- producer.close();
}
- public void populateJsonMsgIntoKafka(String topic, int numMsg) throws InterruptedException, ExecutionException {
- KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProperties);
- Random rand = new Random();
- try {
+ public void populateJsonMsgIntoKafka(String topic, int numMsg) throws ExecutionException, InterruptedException {
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
+ Random rand = new Random();
for (int i = 0; i < numMsg; ++i) {
JsonObject object = new JsonObject();
object.addProperty("key1", UUID.randomUUID().toString());
@@ -118,29 +116,19 @@
element3.addProperty("key62", rand.nextDouble());
object.add("key6", element3);
- ProducerRecord<String, String> message = new ProducerRecord<String, String>(topic, object.toString());
+ ProducerRecord<String, String> message = new ProducerRecord<>(topic, object.toString());
logger.info("Publishing message : {}", message);
Future<RecordMetadata> future = producer.send(message);
logger.info("Committed offset of the message : {}", future.get().offset());
}
- } catch (Throwable th) {
- logger.error(th.getMessage(), th);
- throw new DrillRuntimeException(th.getMessage(), th);
- } finally {
- if (producer != null) {
- producer.close();
- }
}
}
- public void populateJsonMsgWithTimestamps(String topic, int numMsg) {
- KafkaProducer<String, String> producer = null;
- Random rand = new Random();
- try {
- producer = new KafkaProducer<String, String>(producerProperties);
+ public void populateJsonMsgWithTimestamps(String topic, int numMsg) throws ExecutionException, InterruptedException {
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
int halfCount = numMsg / 2;
- for(PartitionInfo tpInfo : producer.partitionsFor(topic)) {
+ for (PartitionInfo tpInfo : producer.partitionsFor(topic)) {
for (int i = 1; i <= numMsg; ++i) {
JsonObject object = new JsonObject();
object.addProperty("stringKey", UUID.randomUUID().toString());
@@ -148,22 +136,23 @@
object.addProperty("boolKey", i % 2 == 0);
long timestamp = i < halfCount ? (halfCount - i) : i;
- ProducerRecord<String, String> message =
- new ProducerRecord<String, String>(tpInfo.topic(), tpInfo.partition(), timestamp, "key"+i, object.toString());
+ ProducerRecord<String, String> message = new ProducerRecord<>(tpInfo.topic(), tpInfo.partition(), timestamp, "key" + i, object.toString());
logger.info("Publishing message : {}", message);
Future<RecordMetadata> future = producer.send(message);
logger.info("Committed offset of the message : {}", future.get().offset());
}
-
- }
- } catch (Throwable th) {
- logger.error(th.getMessage(), th);
- throw new DrillRuntimeException(th.getMessage(), th);
- } finally {
- if (producer != null) {
- producer.close();
}
}
}
+ public void populateMessages(String topic, String... messages) throws ExecutionException, InterruptedException {
+ try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
+ for (String content : messages) {
+ ProducerRecord<String, String> message = new ProducerRecord<>(topic, content);
+ logger.info("Publishing message : {}", message);
+ Future<RecordMetadata> future = producer.send(message);
+ logger.info("Committed offset of the message : {}", future.get().offset());
+ }
+ }
+ }
}
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index 6dc1b3e..62d1b66 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -17,22 +17,28 @@
*/
package org.apache.drill.exec.store.kafka;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.rpc.RpcException;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.Test;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.junit.experimental.categories.Category;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster;
+import static org.junit.Assert.fail;
+
@Category({KafkaStorageTest.class, SlowTest.class})
public class KafkaQueriesTest extends KafkaTestBase {
@@ -40,9 +46,12 @@
public void testSqlQueryOnInvalidTopic() throws Exception {
String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.INVALID_TOPIC);
try {
- testBuilder().sqlQuery(queryString).unOrdered().baselineRecords(Collections.<Map<String, Object>> emptyList())
- .build().run();
- Assert.fail("Test passed though topic does not exist.");
+ testBuilder()
+ .sqlQuery(queryString)
+ .unOrdered()
+ .baselineRecords(Collections.emptyList())
+ .go();
+ fail("Test passed though topic does not exist.");
} catch (RpcException re) {
Assert.assertTrue(re.getMessage().contains("DATA_READ ERROR: Table 'invalid-topic' does not exist"));
}
@@ -60,8 +69,12 @@
Map<TopicPartition, Long> startOffsetsMap = fetchOffsets(-2);
String queryString = String.format(TestQueryConstants.MIN_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC);
- testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("minOffset")
- .baselineValues(startOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))).go();
+ testBuilder()
+ .sqlQuery(queryString)
+ .unOrdered()
+ .baselineColumns("minOffset")
+ .baselineValues(startOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0)))
+ .go();
}
@Test
@@ -70,8 +83,12 @@
Map<TopicPartition, Long> endOffsetsMap = fetchOffsets(-1);
String queryString = String.format(TestQueryConstants.MAX_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC);
- testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("maxOffset")
- .baselineValues(endOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))-1).go();
+ testBuilder()
+ .sqlQuery(queryString)
+ .unOrdered()
+ .baselineColumns("maxOffset")
+ .baselineValues(endOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0)) - 1)
+ .go();
}
@Test
@@ -81,19 +98,20 @@
}
private Map<TopicPartition, Long> fetchOffsets(int flag) {
- KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(),
+ Consumer<byte[], byte[]> kafkaConsumer = null;
+ try {
+ kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(),
new ByteArrayDeserializer(), new ByteArrayDeserializer());
- Map<TopicPartition, Long> offsetsMap = Maps.newHashMap();
- kafkaConsumer.subscribe(Collections.singletonList(TestQueryConstants.JSON_TOPIC));
- // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions
- // evaluates lazily, seeking to the
- // first/last offset in all partitions only when poll(long) or
- // position(TopicPartition) are called
- kafkaConsumer.poll(0);
- Set<TopicPartition> assignments = kafkaConsumer.assignment();
+ Map<TopicPartition, Long> offsetsMap = new HashMap<>();
+ kafkaConsumer.subscribe(Collections.singletonList(TestQueryConstants.JSON_TOPIC));
+ // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd evaluate lazily,
+ // seeking to the first/last offset in all partitions only when poll(long)
+ // or position(TopicPartition) are called
+ kafkaConsumer.poll(0);
+ Set<TopicPartition> assignments = kafkaConsumer.assignment();
- try {
if (flag == -2) {
// fetch start offsets for each topicPartition
kafkaConsumer.seekToBeginning(assignments);
@@ -109,10 +127,10 @@
} else {
throw new RuntimeException(String.format("Unsupported flag %d", flag));
}
+ return offsetsMap;
} finally {
- kafkaConsumer.close();
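+ // register the consumer with the embedded cluster's async closer instead of closing it inline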
+ embeddedKafkaCluster.registerToClose(kafkaConsumer);
}
- return offsetsMap;
}
@Test
@@ -121,4 +139,109 @@
testPhysicalPlanExecutionBasedOnQuery(query);
}
+ @Test
+ public void testOneMessageTopic() throws Exception {
+ String topicName = "topicWithOneMessage";
+ TestKafkaSuit.createTopicHelper(topicName, 1);
+ KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
+ generator.populateMessages(topicName, "{\"index\": 1}");
+
+ testBuilder()
+ .sqlQuery("select index from kafka.`%s`", topicName)
+ .unOrdered()
+ .baselineColumns("index")
+ .baselineValues(1L)
+ .go();
+ }
+
+ @Test
+ public void testMalformedRecords() throws Exception {
+ String topicName = "topicWithMalFormedMessages";
+ TestKafkaSuit.createTopicHelper(topicName, 1);
+ try {
+ KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
+ generator.populateMessages(topicName, "Test");
+
+ alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, false);
+ try {
+ test("select * from kafka.`%s`", topicName);
+ fail();
+ } catch (UserException e) {
+ // expected
+ }
+
+ alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, true);
+ testBuilder()
+ .sqlQuery("select * from kafka.`%s`", topicName)
+ .expectsEmptyResultSet()
+ .go();
+
+ generator.populateMessages(topicName, "{\"index\": 1}", "", " ", "{Invalid}", "{\"index\": 2}");
+
+ testBuilder()
+ .sqlQuery("select index from kafka.`%s`", topicName)
+ .unOrdered()
+ .baselineColumns("index")
+ .baselineValues(1L)
+ .baselineValues(2L)
+ .go();
+ } finally {
+ resetSessionOption(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS);
+ }
+ }
+
+ @Test
+ public void testNanInf() throws Exception {
+ String topicName = "topicWithNanInf";
+ TestKafkaSuit.createTopicHelper(topicName, 1);
+ try {
+ KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
+ generator.populateMessages(topicName, "{\"nan_col\":NaN, \"inf_col\":Infinity}");
+
+ alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, false);
+ try {
+ test("select nan_col, inf_col from kafka.`%s`", topicName);
+ fail();
+ } catch (UserException e) {
+ // expected
+ }
+
+ alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, true);
+ testBuilder()
+ .sqlQuery("select nan_col, inf_col from kafka.`%s`", topicName)
+ .unOrdered()
+ .baselineColumns("nan_col", "inf_col")
+ .baselineValues(Double.NaN, Double.POSITIVE_INFINITY)
+ .go();
+ } finally {
+ resetSessionOption(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS);
+ }
+ }
+
+ @Test
+ public void testEscapeAnyChar() throws Exception {
+ String topicName = "topicWithEscapeAnyChar";
+ TestKafkaSuit.createTopicHelper(topicName, 1);
+ try {
+ KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
+ generator.populateMessages(topicName, "{\"name\": \"AB\\\"\\C\"}");
+
+ alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, false);
+ try {
+ test("select name from kafka.`%s`", topicName);
+ fail();
+ } catch (UserException e) {
+ // expected
+ }
+
+ alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, true);
+ testBuilder()
+ .sqlQuery("select name from kafka.`%s`", topicName)
+ .unOrdered()
+ .baselineColumns("name")
+ .baselineValues("AB\"C")
+ .go();
+ } finally {
+ resetSessionOption(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR);
+ }
+ }
}
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
index b1742d7..effff77 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.store.kafka;
-import static org.junit.Assert.assertEquals;
-
import java.util.List;
import java.util.Map;
@@ -77,18 +75,10 @@
}
}
- public void testHelper(String query, String expectedExprInPlan, int expectedRecordCount) throws Exception {
- testPhysicalPlan(query, expectedExprInPlan);
- int actualRecordCount = testSql(query);
- assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%s",
- expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount);
- }
-
@AfterClass
- public static void tearDownKafkaTestBase() throws Exception {
+ public static void tearDownKafkaTestBase() {
if (TestKafkaSuit.isRunningSuite()) {
TestKafkaSuit.tearDownCluster();
}
}
-
}
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
index 4347167..12db4c3 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
@@ -55,6 +55,7 @@
public void cleanUp() {
if (kafkaConsumer != null) {
kafkaConsumer.close();
+ kafkaConsumer = null;
}
}
@@ -67,7 +68,7 @@
} catch (UserException ue) {
Assert.assertEquals(ErrorType.DATA_READ, ue.getErrorType());
Assert.assertTrue(ue.getMessage().contains(
- "DATA_READ ERROR: Failed to fetch messages within 1 milliseconds. Consider increasing the value of the property : store.kafka.poll.timeout"));
+ "DATA_READ ERROR: Failed to fetch messages within 1 milliseconds. Consider increasing the value of the property: store.kafka.poll.timeout"));
}
}
@@ -89,7 +90,7 @@
Assert.assertNotNull(iterator.next());
try {
iterator.next();
- Assert.fail("Kafak fetched more messages than configured.");
+ Assert.fail("Kafka fetched more messages than configured.");
} catch (NoSuchElementException nse) {
// Expected
}
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index 784eb4e..b586d7d 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -17,20 +17,20 @@
*/
package org.apache.drill.exec.store.kafka;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.ZookeeperTestUtil;
import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.security.JaasUtils;
-
+import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
@@ -40,29 +40,34 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
@Category({KafkaStorageTest.class, SlowTest.class})
@RunWith(Suite.class)
-@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class,
- KafkaFilterPushdownTest.class })
+@SuiteClasses({KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class, KafkaFilterPushdownTest.class})
public class TestKafkaSuit {
private static final Logger logger = LoggerFactory.getLogger(LoggerFactory.class);
+
private static final String LOGIN_CONF_RESOURCE_PATHNAME = "login.conf";
public static EmbeddedKafkaCluster embeddedKafkaCluster;
+
private static ZkClient zkClient;
private static volatile AtomicInteger initCount = new AtomicInteger(0);
- static final int NUM_JSON_MSG = 10;
- static final int CONN_TIMEOUT = 8 * 1000;
- static final int SESSION_TIMEOUT = 10 * 1000;
- static String kafkaBroker;
- private static volatile boolean runningSuite = false;
+ static final int NUM_JSON_MSG = 10;
+
+ private static final int CONN_TIMEOUT = 8 * 1000;
+
+ private static final int SESSION_TIMEOUT = 10 * 1000;
+
+ private static volatile boolean runningSuite = true;
@BeforeClass
public static void initKafka() throws Exception {
@@ -71,17 +76,9 @@
ZookeeperTestUtil.setZookeeperSaslTestConfigProps();
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, ClassLoader.getSystemResource(LOGIN_CONF_RESOURCE_PATHNAME).getFile());
embeddedKafkaCluster = new EmbeddedKafkaCluster();
- Properties topicProps = new Properties();
zkClient = new ZkClient(embeddedKafkaCluster.getZkServer().getConnectionString(), SESSION_TIMEOUT, CONN_TIMEOUT, ZKStringSerializer$.MODULE$);
- ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
- AdminUtils.createTopic(zkUtils, TestQueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode.Disabled$.MODULE$);
-
- org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk = AdminUtils
- .fetchTopicMetadataFromZk(TestQueryConstants.JSON_TOPIC, zkUtils);
- logger.info("Topic Metadata: " + fetchTopicMetadataFromZk);
-
- KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(),
- StringSerializer.class);
+ createTopicHelper(TestQueryConstants.JSON_TOPIC, 1);
+ KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
generator.populateJsonMsgIntoKafka(TestQueryConstants.JSON_TOPIC, NUM_JSON_MSG);
}
initCount.incrementAndGet();
@@ -95,32 +92,36 @@
}
@AfterClass
- public static void tearDownCluster() throws Exception {
+ public static void tearDownCluster() {
synchronized (TestKafkaSuit.class) {
if (initCount.decrementAndGet() == 0) {
if (zkClient != null) {
zkClient.close();
+ zkClient = null;
}
if (embeddedKafkaCluster != null && !embeddedKafkaCluster.getBrokers().isEmpty()) {
embeddedKafkaCluster.shutDownCluster();
+ embeddedKafkaCluster = null;
}
}
}
}
- public static void createTopicHelper(final String topicName, final int partitions) {
-
- Properties topicProps = new Properties();
- topicProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
- topicProps.put(TopicConfig.RETENTION_MS_CONFIG, "-1");
- ZkUtils zkUtils = new ZkUtils(zkClient,
- new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
- AdminUtils.createTopic(zkUtils, topicName, partitions, 1,
- topicProps, RackAwareMode.Disabled$.MODULE$);
-
- org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk =
- AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
- logger.info("Topic Metadata: " + fetchTopicMetadataFromZk);
+ public static void createTopicHelper(String topicName, int partitions) throws ExecutionException, InterruptedException {
+ try (AdminClient adminClient = initAdminClient()) {
+ NewTopic newTopic = new NewTopic(topicName, partitions, (short) 1);
+ Map<String, String> topicConfigs = new HashMap<>();
+ topicConfigs.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
+ topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, "-1");
+ newTopic.configs(topicConfigs);
+ CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
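+ // block until topic creation completes so tests can publish to the new topic right away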
+ result.all().get();
+ }
}
+ private static AdminClient initAdminClient() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaCluster.getKafkaBrokerList());
+ return AdminClient.create(props);
+ }
}
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
index 663e0e4..eccc17a 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java
@@ -25,10 +25,8 @@
import java.util.Properties;
import org.apache.drill.exec.ZookeeperHelper;
-import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
+import org.apache.drill.exec.store.kafka.KafkaAsyncCloser;
import org.apache.drill.exec.store.kafka.TestQueryConstants;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,9 +34,11 @@
import kafka.server.KafkaServerStartable;
public class EmbeddedKafkaCluster implements TestQueryConstants {
+
private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
private List<KafkaServerStartable> brokers;
- private final ZookeeperHelper zkHelper;
+ private ZookeeperHelper zkHelper;
+ private KafkaAsyncCloser closer;
private final Properties props;
public EmbeddedKafkaCluster() throws IOException {
@@ -49,9 +49,9 @@
this(props, 1);
}
- public EmbeddedKafkaCluster(Properties basePorps, int numberOfBrokers) throws IOException {
+ public EmbeddedKafkaCluster(Properties baseProps, int numberOfBrokers) throws IOException {
this.props = new Properties();
- props.putAll(basePorps);
+ props.putAll(baseProps);
this.zkHelper = new ZookeeperHelper();
zkHelper.startZookeeper(1);
this.brokers = new ArrayList<>(numberOfBrokers);
@@ -62,13 +62,14 @@
sb.append(BROKER_DELIM);
}
int ephemeralBrokerPort = getEphemeralPort();
- sb.append(LOCAL_HOST + ":" + ephemeralBrokerPort);
+ sb.append(LOCAL_HOST).append(":").append(ephemeralBrokerPort);
addBroker(props, i, ephemeralBrokerPort);
}
this.props.put("metadata.broker.list", sb.toString());
this.props.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString());
logger.info("Initialized Kafka Server");
+ this.closer = new KafkaAsyncCloser();
}
private void addBroker(Properties props, int brokerID, int ephemeralBrokerPort) {
@@ -79,13 +80,14 @@
properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(1));
properties.put(KafkaConfig.DefaultReplicationFactorProp(), String.valueOf(1));
properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp(), String.valueOf(100));
- properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.TRUE);
+ properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.FALSE);
properties.put(KafkaConfig.ZkConnectProp(), zkHelper.getConnectionString());
properties.put(KafkaConfig.BrokerIdProp(), String.valueOf(brokerID + 1));
- properties.put(KafkaConfig.HostNameProp(), String.valueOf(LOCAL_HOST));
- properties.put(KafkaConfig.AdvertisedHostNameProp(), String.valueOf(LOCAL_HOST));
+ properties.put(KafkaConfig.HostNameProp(), LOCAL_HOST);
+ properties.put(KafkaConfig.AdvertisedHostNameProp(), LOCAL_HOST);
properties.put(KafkaConfig.PortProp(), String.valueOf(ephemeralBrokerPort));
- properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.FALSE);
+ properties.put(KafkaConfig.AdvertisedPortProp(), String.valueOf(ephemeralBrokerPort));
+ properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.TRUE);
properties.put(KafkaConfig.LogDirsProp(), getTemporaryDir().getAbsolutePath());
properties.put(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1));
brokers.add(getBroker(properties));
@@ -97,23 +99,25 @@
return broker;
}
- public void shutDownCluster() throws IOException {
- // set Kafka log level to ERROR
- Level level = LogManager.getLogger(KafkaStoragePluginConfig.NAME).getLevel();
- LogManager.getLogger(KafkaStoragePluginConfig.NAME).setLevel(Level.ERROR);
+ public void shutDownCluster() {
+ closer.close();
+ closer = null;
- for (KafkaServerStartable broker : brokers) {
- broker.shutdown();
+ if (brokers != null) {
+ for (KafkaServerStartable broker : brokers) {
+ broker.shutdown();
+ }
+ brokers = null;
}
-
- // revert back the level
- LogManager.getLogger(KafkaStoragePluginConfig.NAME).setLevel(level);
- zkHelper.stopZookeeper();
+ if (zkHelper != null) {
+ zkHelper.stopZookeeper();
+ zkHelper = null;
+ }
}
public void shutDownBroker(int brokerId) {
for (KafkaServerStartable broker : brokers) {
- if (Integer.valueOf(broker.serverConfig().getString(KafkaConfig.BrokerIdProp())) == brokerId) {
+ if (Integer.parseInt(broker.staticServerConfig().getString(KafkaConfig.BrokerIdProp())) == brokerId) {
broker.shutdown();
return;
}
@@ -141,13 +145,17 @@
public String getKafkaBrokerList() {
StringBuilder sb = new StringBuilder();
for (KafkaServerStartable broker : brokers) {
- KafkaConfig serverConfig = broker.serverConfig();
- sb.append(serverConfig.hostName() + ":" + serverConfig.port());
+ KafkaConfig serverConfig = broker.staticServerConfig();
+ sb.append(serverConfig.hostName()).append(":").append(serverConfig.port());
sb.append(",");
}
return sb.toString().substring(0, sb.toString().length() - 1);
}
+ public void registerToClose(AutoCloseable autoCloseable) {
+ closer.close(autoCloseable);
+ }
+
private int getEphemeralPort() throws IOException {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
@@ -162,5 +170,4 @@
}
return file;
}
-
}
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
index 796c63a..1b8aa11 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
@@ -33,7 +33,7 @@
MessageReaderFactory.getMessageReader(null);
Assert.fail("Message reader initialization succeeded even though it is null");
} catch (UserException ue) {
- Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION);
+ Assert.assertSame(ErrorType.VALIDATION, ue.getErrorType());
Assert.assertTrue(ue.getMessage().contains(
"VALIDATION ERROR: Please configure message reader implementation using the property 'store.kafka.record.reader'"));
}
@@ -45,7 +45,7 @@
MessageReaderFactory.getMessageReader(MessageReaderFactoryTest.class.getName());
Assert.fail("Message reader initialization succeeded even though class does not implement message reader interface");
} catch (UserException ue) {
- Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION);
+ Assert.assertSame(ErrorType.VALIDATION, ue.getErrorType());
Assert.assertTrue(ue.getMessage().contains(
"VALIDATION ERROR: Message reader configured 'org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest' does not implement 'org.apache.drill.exec.store.kafka.decoders.MessageReader'"));
}
@@ -57,7 +57,7 @@
MessageReaderFactory.getMessageReader("a.b.c.d");
Assert.fail("Message reader initialization succeeded even though class does not exist");
} catch (UserException ue) {
- Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION);
+ Assert.assertSame(ErrorType.VALIDATION, ue.getErrorType());
Assert.assertTrue(ue.getMessage().contains("VALIDATION ERROR: Failed to initialize message reader : a.b.c.d"));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 91f8803..22a6cb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -539,6 +539,16 @@
public static final String KAFKA_POLL_TIMEOUT = "store.kafka.poll.timeout";
public static final PositiveLongValidator KAFKA_POLL_TIMEOUT_VALIDATOR = new PositiveLongValidator(KAFKA_POLL_TIMEOUT, Long.MAX_VALUE,
new OptionDescription("Amount of time in milliseconds allotted to the Kafka client to fetch messages from the Kafka cluster; default value is 200."));
+ public static final String KAFKA_READER_SKIP_INVALID_RECORDS = "store.kafka.reader.skip_invalid_records";
+ public static final BooleanValidator KAFKA_SKIP_MALFORMED_RECORDS_VALIDATOR = new BooleanValidator(KAFKA_READER_SKIP_INVALID_RECORDS,
+ new OptionDescription("Allows queries to progress when the JSON record reader skips bad records in JSON files. Default is false. (Drill 1.17+)"));
+ public static final String KAFKA_READER_NAN_INF_NUMBERS = "store.kafka.reader.allow_nan_inf";
+ public static final BooleanValidator KAFKA_READER_NAN_INF_NUMBERS_VALIDATOR = new BooleanValidator(KAFKA_READER_NAN_INF_NUMBERS,
+ new OptionDescription("Enables the Kafka JSON record reader in Drill to read `NaN` and `Infinity` tokens in JSON data as numbers. Default is true. (Drill 1.17+)"));
+ public static final String KAFKA_READER_ESCAPE_ANY_CHAR = "store.kafka.reader.allow_escape_any_char";
+ public static final BooleanValidator KAFKA_READER_ESCAPE_ANY_CHAR_VALIDATOR = new BooleanValidator(KAFKA_READER_ESCAPE_ANY_CHAR,
+ new OptionDescription("Enables the Kafka JSON record reader in Drill to escape any character. Default is false. (Drill 1.17+)"));
+
// TODO: We need to add a feature that enables storage plugins to add their own options. Currently we have to declare
// in core which is not right. Move this option and above two mongo plugin related options once we have the feature.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 28fa3e3..e3ed2f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -202,6 +202,9 @@
new OptionDefinition(ExecConstants.KAFKA_RECORD_READER_VALIDATOR),
new OptionDefinition(ExecConstants.KAFKA_POLL_TIMEOUT_VALIDATOR),
new OptionDefinition(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR),
+ new OptionDefinition(ExecConstants.KAFKA_SKIP_MALFORMED_RECORDS_VALIDATOR),
+ new OptionDefinition(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS_VALIDATOR),
+ new OptionDefinition(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR_VALIDATOR),
new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR),
new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER_VALIDATOR),
new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4e33c82..021e297 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -670,6 +670,9 @@
store.kafka.read_numbers_as_double: false,
store.kafka.record.reader: "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader",
store.kafka.poll.timeout: 200,
+ store.kafka.reader.skip_invalid_records: false,
+ store.kafka.reader.allow_nan_inf: true,
+ store.kafka.reader.allow_escape_any_char: false,
web.logs.max_lines: 10000,
web.display_format.timestamp: "",
web.display_format.date: "",