TAJO-1488: Implement KafkaTablespace.
Closes #1044
Signed-off-by: Jinho Kim <jhkim@apache.org>
diff --git a/CHANGES b/CHANGES
index 5e7fc0f..85fccd0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -408,6 +408,8 @@
SUB TASKS
+ TAJO-1488: Implement KafkaTablespace. (Byunghwa Yun via jinho)
+
TAJO-1487: Kafka Scanner for kafka strage. (Byunghwa Yun via jinho)
TAJO-2174: Implement HiveCatalogStore#alterTable. (Lee Dongjin via jinho)
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
index 17ac3ba..a0b4280 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
@@ -31,6 +31,10 @@
"swift": {
"handler": "org.apache.tajo.storage.s3.S3TableSpace",
"default-format": "text"
+ },
+ "kafka": {
+ "handler": "org.apache.tajo.storage.kafka.KafkaTablespace",
+ "default-format": "kafka"
}
}
}
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index ce0ce85..976fb24 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -39,7 +39,7 @@
<!--- Registered Scanner Handler -->
<property>
<name>tajo.storage.scanner-handler</name>
- <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json</value>
+ <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json,kafka</value>
</property>
<!--- Fragment Class Configurations -->
@@ -60,6 +60,10 @@
<value>org.apache.tajo.storage.http.ExampleHttpFileFragment</value>
</property>
<property>
+ <name>tajo.storage.fragment.kind.kafka</name>
+ <value>org.apache.tajo.storage.kafka.KafkaFragment</value>
+ </property>
+ <property>
<name>tajo.storage.fragment.serde.file</name>
<value>org.apache.tajo.storage.fragment.FileFragmentSerde</value>
</property>
@@ -75,6 +79,10 @@
<name>tajo.storage.fragment.serde.example-http</name>
<value>org.apache.tajo.storage.http.ExampleHttpFileFragmentSerde</value>
</property>
+ <property>
+ <name>tajo.storage.fragment.serde.kafka</name>
+ <value>org.apache.tajo.storage.kafka.KafkaFragmentSerde</value>
+ </property>
<!--- Scanner Handler -->
<property>
@@ -137,6 +145,11 @@
<value>org.apache.tajo.storage.http.ExampleHttpJsonScanner</value>
</property>
+ <property>
+ <name>tajo.storage.scanner-handler.kafka.class</name>
+ <value>org.apache.tajo.storage.kafka.KafkaScanner</value>
+ </property>
+
<!--- Appender Handler -->
<property>
<name>tajo.storage.appender-handler</name>
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
index 1737e22..d6e74a3 100644
--- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -38,7 +38,7 @@
<!--- Registered Scanner Handler -->
<property>
<name>tajo.storage.scanner-handler</name>
- <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json</value>
+ <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json,kafka</value>
</property>
<!--- Fragment Class Configurations -->
@@ -131,6 +131,11 @@
<value>org.apache.tajo.storage.http.ExampleHttpJsonScanner</value>
</property>
+ <property>
+ <name>tajo.storage.scanner-handler.kafka.class</name>
+ <value>org.apache.tajo.storage.kafka.KafkaScanner</value>
+ </property>
+
<!--- Appender Handler -->
<property>
<name>tajo.storage.appender-handler</name>
diff --git a/tajo-storage/tajo-storage-kafka/pom.xml b/tajo-storage/tajo-storage-kafka/pom.xml
index 82a55ff..ab93e0b 100644
--- a/tajo-storage/tajo-storage-kafka/pom.xml
+++ b/tajo-storage/tajo-storage-kafka/pom.xml
@@ -56,8 +56,10 @@
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
- <exclude>src/main/resources/*.json</exclude>
- <exclude>src/test/resources/*.json</exclude>
+ <exclude>derby.log</exclude>
+ <exclude>src/test/resources/dataset/**</exclude>
+ <exclude>src/test/resources/queries/**</exclude>
+ <exclude>src/test/resources/results/**</exclude>
</excludes>
</configuration>
<executions>
@@ -185,6 +187,16 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-rpc-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tajo</groupId>
+ <artifactId>tajo-cluster-tests</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
@@ -213,6 +225,73 @@
<version>0.88</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>netty-all</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>netty-all</artifactId>
+ <groupId>io.netty</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<profiles>
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragment.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragment.java
index 4008a25..9f189ff 100644
--- a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragment.java
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragment.java
@@ -35,19 +35,18 @@
public KafkaFragment(URI uri, String tableName, String topicName, long startOffset, long lastOffset,
int partitionId, String leaderHost) {
- super(BuiltinFragmentKinds.KAFKA, uri, tableName, new KafkaFragmentKey(partitionId, startOffset),
- new KafkaFragmentKey(partitionId, lastOffset), lastOffset - startOffset, new String[] { leaderHost });
-
- this.topicName = topicName;
- this.last = false;
+ this(uri, tableName, topicName, startOffset, lastOffset, partitionId, leaderHost, false);
}
public KafkaFragment(URI uri, String tableName, String topicName, long startOffset, long lastOffset,
int partitionId, String leaderHost, boolean last) {
- this(uri, tableName, topicName, startOffset, lastOffset, partitionId, leaderHost);
+ super(BuiltinFragmentKinds.KAFKA, uri, tableName, new KafkaFragmentKey(partitionId, startOffset),
+ new KafkaFragmentKey(partitionId, lastOffset), lastOffset - startOffset, new String[] { leaderHost });
+ this.topicName = topicName;
this.last = last;
}
+ @Override
public Object clone() throws CloneNotSupportedException {
KafkaFragment frag = (KafkaFragment) super.clone();
frag.topicName = topicName;
@@ -111,6 +110,10 @@
this.endKey = new KafkaFragmentKey(partitionId, lastOffset);
}
+ public int getPartitionId() {
+ return this.startKey.getPartitionId();
+ }
+
public static class KafkaFragmentKey implements Comparable<KafkaFragmentKey> {
private final int partitionId;
private final long offset;
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragmentSerde.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragmentSerde.java
index 2899b7c..f79d36e 100644
--- a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragmentSerde.java
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragmentSerde.java
@@ -40,6 +40,7 @@
.setTopicName(fragment.getTopicName())
.setStartOffset(fragment.getStartKey().getOffset())
.setLastOffset(fragment.getEndKey().getOffset())
+ .setPartitionId(fragment.getPartitionId())
.setLast(fragment.isLast())
.setLength(fragment.getLength())
.setLeaderHost(fragment.getHostNames().get(0))
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java
index 496dfa6..8e2f2da 100644
--- a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java
@@ -106,7 +106,7 @@
deserializer.init();
simpleConsumerManager = new SimpleConsumerManager(fragment.getUri(), fragment.getTopicName(),
- fragment.getStartKey().getPartitionId(), fragmentSize);
+ fragment.getPartitionId(), fragmentSize);
initOffset();
}
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaTablespace.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaTablespace.java
new file mode 100644
index 0000000..9b56cf4
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaTablespace.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.exception.NotImplementedException;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.FormatProperty;
+import org.apache.tajo.storage.StorageProperty;
+import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import net.minidev.json.JSONObject;
+
+/**
+ * Tablespace for Kafka table.
+ */
+public class KafkaTablespace extends Tablespace {
+ public static final StorageProperty KAFKA_STORAGE_PROPERTIES = new StorageProperty("kafka", false, false, false,
+ false);
+
+ public static final FormatProperty KAFKA_FORMAT_PROPERTIES = new FormatProperty(false, false, false);
+
+ public KafkaTablespace(String name, URI uri, JSONObject config) {
+ super(name, uri, config);
+ }
+
+ @Override
+ protected void storageInit() throws IOException {
+ }
+
+ @Override
+ public long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
+ long totalVolume;
+ try {
+ totalVolume = getSplits("", table, false, filter.orElse(null)).stream()
+ .map(f -> f.getLength())
+ .filter(size -> size > 0) // eliminate unknown sizes (-1)
+ .reduce(0L, Long::sum);
+ } catch (TajoException e) {
+ throw new TajoRuntimeException(e);
+ } catch (Throwable ioe) {
+ throw new TajoInternalError(ioe);
+ }
+ return totalVolume;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void createTable(TableDesc tableDesc, boolean ifNotExists) throws TajoException, IOException {
+ TableStats stats = new TableStats();
+ stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
+ tableDesc.setStats(stats);
+ }
+
+ @Override
+ public void purgeTable(TableDesc tableDesc) throws IOException, TajoException {
+ }
+
+ @Override
+ public URI getTableUri(TableMeta meta, String databaseName, String tableName) {
+ return URI.create(uri.toString() + "/" + tableName);
+ }
+
+ @Override
+ public List<Fragment> getSplits(String inputSourceId,
+ TableDesc table,
+ boolean requireSorted,
+ EvalNode filterCondition)
+ throws IOException, TajoException {
+ String topic = table.getMeta().getProperty(KafkaStorageConstants.KAFKA_TOPIC);
+ int fragmentSize = Integer.parseInt(table.getMeta().getProperty(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE,
+ KafkaStorageConstants.DEFAULT_FRAGMENT_SIZE));
+ // If isn't specific partitions, scan all partition of topic.
+ String partitions = table.getMeta().getProperty(KafkaStorageConstants.KAFKA_TOPIC_PARTITION,
+ KafkaStorageConstants.DEFAULT_PARTITION);
+ List<PartitionInfo> partitionList;
+ if (partitions.equals(KafkaStorageConstants.DEFAULT_PARTITION)) {
+ partitionList = SimpleConsumerManager.getPartitions(uri, topic);
+ } else {
+ partitionList = new LinkedList<>();
+ // filter partitions.
+ List<PartitionInfo> topicPartitions = SimpleConsumerManager.getPartitions(uri, topic);
+ Map<String, PartitionInfo> topicPartitionsMap = new HashMap<>();
+ for (PartitionInfo partitionInfo : topicPartitions) {
+ topicPartitionsMap.put(Integer.toString(partitionInfo.partition()), partitionInfo);
+ }
+ for (String partitionId : partitions.split(",")) {
+ partitionList.add(topicPartitionsMap.get(partitionId));
+ }
+ }
+
+ List<Fragment> fragments = new ArrayList<Fragment>();
+ for (PartitionInfo partitionInfo : partitionList) {
+ int partitionId = partitionInfo.partition();
+ String leaderHost = partitionInfo.leader().host();
+ long lastOffset;
+ long startOffset;
+ try (SimpleConsumerManager simpleConsumerManager = new SimpleConsumerManager(uri, topic, partitionId)) {
+ lastOffset = simpleConsumerManager.getLatestOffset();
+ startOffset = simpleConsumerManager.getEarliestOffset();
+ }
+
+ long messageSize = lastOffset - startOffset;
+ if (0 == lastOffset || 0 == messageSize)
+ continue;
+
+ // If message count of partition is less than fragmentSize(message count of one fragment),
+ if (messageSize <= fragmentSize) {
+ fragments.add(new KafkaFragment(table.getUri(), inputSourceId, topic, startOffset,
+ lastOffset, partitionId, leaderHost));
+ } else { // If message count of partition is greater than fragmentSize,
+ long nextFragmentStartOffset = startOffset;
+ while (nextFragmentStartOffset < lastOffset) {
+ // partition data: 0 1 2 3 4 5 6 7 8 9 10
+ // start offset: 0
+ // last offset: 11
+ // fragment size: 3
+ // result: (0, 1, 2), (3, 4, 5), (6, 7, 8), (9, 10)
+ // 1st nextFragmentStartOffset=0, nextFragmentLastOffset=3
+ // 2st nextFragmentStartOffset=3, nextFragmentLastOffset=6
+ // 3st nextFragmentStartOffset=6, nextFragmentLastOffset=9
+ // 4st nextFragmentStartOffset=9, nextFragmentLastOffset=12
+ long nextFragmentLastOffset = nextFragmentStartOffset + fragmentSize;
+ // the offset of last part is small than fragmentSize so that Tajo gets the minimum value.
+ long fragmentLstOffset = Math.min(nextFragmentLastOffset, lastOffset);
+ fragments.add(new KafkaFragment(table.getUri(), inputSourceId, topic,
+ nextFragmentStartOffset, fragmentLstOffset, partitionId, leaderHost));
+ nextFragmentStartOffset = nextFragmentLastOffset;
+ }
+ }
+ }
+ return fragments;
+ }
+
+ @Override
+ public StorageProperty getProperty() {
+ return KAFKA_STORAGE_PROPERTIES;
+ }
+
+ @Override
+ public FormatProperty getFormatProperty(TableMeta meta) {
+ return KAFKA_FORMAT_PROPERTIES;
+ }
+
+ @Override
+ public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, Schema inputSchema,
+ SortSpec[] sortSpecs, TupleRange dataRange) throws IOException {
+ throw new TajoRuntimeException(new NotImplementedException());
+ }
+
+ @Override
+ public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws TajoException {
+ throw new TajoRuntimeException(new NotImplementedException());
+ }
+
+ @Override
+ public void prepareTable(LogicalNode node) throws IOException, TajoException {
+ }
+
+ @Override
+ public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, Schema schema,
+ TableDesc tableDesc) throws IOException {
+ throw new TajoRuntimeException(new NotImplementedException());
+ }
+
+ @Override
+ public void rollbackTable(LogicalNode node) throws IOException, TajoException {
+ throw new TajoRuntimeException(new NotImplementedException());
+ }
+
+ @Override
+ public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
+ throw new TajoRuntimeException(new UnsupportedException());
+ }
+}
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java
index dd44ea7..b4c7736 100644
--- a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java
@@ -146,8 +146,12 @@
static String extractBroker(URI uri) {
String uriStr = uri.toString();
int start = uriStr.indexOf("/") + 2;
-
- return uriStr.substring(start);
+ int end = uriStr.indexOf("/", start);
+ if (end < 0) {
+ return uriStr.substring(start);
+ } else {
+ return uriStr.substring(start, end);
+ }
}
/**
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaQuery.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaQuery.java
new file mode 100644
index 0000000..522d0cc
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaQuery.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import static org.apache.tajo.storage.kafka.KafkaTestUtil.DEFAULT_TEST_PARTITION_NUM;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.kafka.server.EmbeddedKafka;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URI;
+
+import net.minidev.json.JSONObject;
+
+public class TestKafkaQuery extends QueryTestCaseBase {
+
+ private static final String TEST_TOPIC_USER = "TEST_TOPIC_USER";
+ private static final String TEST_TOPIC_PROD = "TEST_TOPIC2_PROD";
+
+ private static EmbeddedKafka KAFKA;
+ private static URI KAFKA_SERVER_URI;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ KAFKA = EmbeddedKafka.createEmbeddedKafka(2181, 9092);
+ KAFKA.start();
+ KAFKA.createTopic(DEFAULT_TEST_PARTITION_NUM, 1, TEST_TOPIC_USER);
+ KAFKA.createTopic(DEFAULT_TEST_PARTITION_NUM, 1, TEST_TOPIC_PROD);
+ KAFKA_SERVER_URI = URI.create("kafka://" + KAFKA.getConnectString());
+
+ // Load test data.
+ try (Producer<String, String> producer = KAFKA.createProducer(KAFKA.getConnectString())) {
+ KafkaTestUtil.sendTestData(producer, TEST_TOPIC_USER, "1|user1");
+ KafkaTestUtil.sendTestData(producer, TEST_TOPIC_USER, "2|user2");
+ KafkaTestUtil.sendTestData(producer, TEST_TOPIC_USER, "3|user3");
+ KafkaTestUtil.sendTestData(producer, TEST_TOPIC_USER, "4|user4");
+ KafkaTestUtil.sendTestData(producer, TEST_TOPIC_USER, "6|user6");
+ for (int i = 0; i < 2; i++) {
+ KafkaTestUtil.sendTestData(producer, TEST_TOPIC_PROD);
+ }
+ }
+
+ JSONObject configElements = new JSONObject();
+ KafkaTablespace hBaseTablespace = new KafkaTablespace("cluster1", KAFKA_SERVER_URI, configElements);
+ hBaseTablespace.init(new TajoConf());
+ TablespaceManager.addTableSpaceForTest(hBaseTablespace);
+
+ QueryTestCaseBase.testingCluster.getMaster().refresh();
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ KAFKA.close();
+ }
+
+ @Before
+ public void prepareTables() throws TajoException {
+ String createSql = "create table user (id int4, name text) tablespace cluster1 using kafka with ('kafka.topic'='"
+ + TEST_TOPIC_USER + "')";
+ executeString(createSql);
+
+ createSql = "create table prod (id int4, prod_name text, point float4) tablespace cluster1 using kafka with ('kafka.topic'='"
+ + TEST_TOPIC_PROD + "')";
+ executeString(createSql);
+ }
+
+ @After
+ public void dropTables() throws TajoException {
+ executeString("drop table user");
+ executeString("drop table prod");
+ }
+
+ @SimpleTest
+ @Test
+ public void testSelect() throws Exception {
+ runSimpleTests();
+ }
+
+ @SimpleTest
+ @Test
+ public void testGroupby() throws Exception {
+ runSimpleTests();
+ }
+
+ @SimpleTest
+ @Test
+ public void testJoin() throws Exception {
+ runSimpleTests();
+ }
+
+ @SimpleTest
+ @Test
+ public void testLeftOuterJoin() throws Exception {
+ runSimpleTests();
+ }
+
+ @SimpleTest
+ @Test
+ public void testFullOuterJoin() throws Exception {
+ runSimpleTests();
+ }
+}
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaTablespace.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaTablespace.java
new file mode 100644
index 0000000..904492e
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaTablespace.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import static org.apache.tajo.storage.kafka.KafkaTestUtil.TOPIC_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.kafka.server.EmbeddedKafka;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+import net.minidev.json.JSONObject;
+
+public class TestKafkaTablespace {
+ private static EmbeddedKafka KAFKA;
+ private static URI KAFKA_SERVER_URI;
+
+ /**
+ * Start up EmbeddedKafka and Generate test data.
+ *
+ * @throws Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ KAFKA = EmbeddedKafka.createEmbeddedKafka(2181, 9092);
+ KAFKA.start();
+ KAFKA.createTopic(1, 1, TOPIC_NAME);
+ KAFKA_SERVER_URI = URI.create("kafka://" + KAFKA.getConnectString());
+
+ // Load test data.
+ try (Producer<String, String> producer = KAFKA.createProducer(KAFKA.getConnectString())) {
+ for (int i = 0; i < 20; i++) {
+ KafkaTestUtil.sendTestData(producer, TOPIC_NAME);
+ }
+ }
+
+ JSONObject configElements = new JSONObject();
+ KafkaTablespace hBaseTablespace = new KafkaTablespace("cluster1", KAFKA_SERVER_URI, configElements);
+ hBaseTablespace.init(new TajoConf());
+ TablespaceManager.addTableSpaceForTest(hBaseTablespace);
+ }
+
+ @Test
+ public void testTablespaceHandler() throws Exception {
+ assertTrue((TablespaceManager.getByName("cluster1")) instanceof KafkaTablespace);
+ assertTrue((TablespaceManager.get(KAFKA_SERVER_URI)) instanceof KafkaTablespace);
+ }
+
+ /**
+ * Close EmbeddedKafka.
+ *
+ * @throws Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ KAFKA.close();
+ }
+
+ /**
+ * Test for getSplit.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testGetSplit() throws Exception {
+ TableMeta meta = CatalogUtil.newTableMeta("KAFKA", new TajoConf());
+ Map<String, String> option = new java.util.HashMap<String, String>();
+ option.put(KafkaStorageConstants.KAFKA_TOPIC, TOPIC_NAME);
+ option.put(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE, "10");
+ meta.setPropertySet(new KeyValueSet(option));
+ TableDesc td = new TableDesc("test_table", null, meta, null);
+ KafkaTablespace kafkaTablespace = TablespaceManager.getByName("cluster1");
+ List<Fragment> fragmentList = kafkaTablespace.getSplits("", td, false, null);
+ long totalCount = 0;
+ for (int i = 0; i < fragmentList.size(); i++) {
+ totalCount += fragmentList.get(i).getLength();
+ }
+ assertEquals(100, totalCount);
+ }
+}
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java
index d780ac4..86c81ae 100644
--- a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java
@@ -72,6 +72,8 @@
public void testExtractBroker() {
assertEquals("host1:9092,host2:9092,host3:9092",
SimpleConsumerManager.extractBroker(URI.create("kafka://host1:9092,host2:9092,host3:9092")));
+ assertEquals("host1:9092,host2:9092,host3:9092",
+ SimpleConsumerManager.extractBroker(URI.create("kafka://host1:9092,host2:9092,host3:9092/user")));
}
/**
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/dataset/.gitkeep b/tajo-storage/tajo-storage-kafka/src/test/resources/dataset/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/dataset/.gitkeep
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testFullOuterJoin.sql b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testFullOuterJoin.sql
new file mode 100644
index 0000000..bb4ced1
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testFullOuterJoin.sql
@@ -0,0 +1 @@
+select u.id, u.name, p.id, p.prod_name, p.point from user u full outer join prod p on(u.id = p.id) order by u.id
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testGroupby.sql b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testGroupby.sql
new file mode 100644
index 0000000..10a0dc1
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testGroupby.sql
@@ -0,0 +1 @@
+select id, count(*) from prod group by id order by id;
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testJoin.sql b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testJoin.sql
new file mode 100644
index 0000000..e3ff8d2
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testJoin.sql
@@ -0,0 +1 @@
+select u.id, u.name, p.id, p.prod_name, p.point from user u join prod p on(u.id = p.id) order by u.id
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testLeftOuterJoin.sql b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testLeftOuterJoin.sql
new file mode 100644
index 0000000..8c27e26
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testLeftOuterJoin.sql
@@ -0,0 +1 @@
+select u.id, u.name, p.id, p.prod_name, p.point from user u left outer join prod p on(u.id = p.id) order by u.id
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testSelect.sql b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testSelect.sql
new file mode 100644
index 0000000..839908c
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testSelect.sql
@@ -0,0 +1 @@
+select id, name from user order by id;
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testFullOuterJoin.result b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testFullOuterJoin.result
new file mode 100644
index 0000000..d33b305
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testFullOuterJoin.result
@@ -0,0 +1,13 @@
+id,name,id,prod_name,point
+-------------------------------
+1,user1,1,abc,0.2
+1,user1,1,abc,0.2
+2,user2,2,def,0.4
+2,user2,2,def,0.4
+3,user3,3,ghi,0.6
+3,user3,3,ghi,0.6
+4,user4,4,jkl,0.8
+4,user4,4,jkl,0.8
+6,user6,null,null,null
+null,null,5,mno,1.0
+null,null,5,mno,1.0
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testGroupby.result b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testGroupby.result
new file mode 100644
index 0000000..cfb9c69
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testGroupby.result
@@ -0,0 +1,7 @@
+id,?count
+-------------------------------
+1,2
+2,2
+3,2
+4,2
+5,2
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testJoin.result b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testJoin.result
new file mode 100644
index 0000000..0af0a6a
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testJoin.result
@@ -0,0 +1,10 @@
+id,name,id,prod_name,point
+-------------------------------
+1,user1,1,abc,0.2
+1,user1,1,abc,0.2
+2,user2,2,def,0.4
+2,user2,2,def,0.4
+3,user3,3,ghi,0.6
+3,user3,3,ghi,0.6
+4,user4,4,jkl,0.8
+4,user4,4,jkl,0.8
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testLeftOuterJoin.result b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testLeftOuterJoin.result
new file mode 100644
index 0000000..51ba0fa
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testLeftOuterJoin.result
@@ -0,0 +1,11 @@
+id,name,id,prod_name,point
+-------------------------------
+1,user1,1,abc,0.2
+1,user1,1,abc,0.2
+2,user2,2,def,0.4
+2,user2,2,def,0.4
+3,user3,3,ghi,0.6
+3,user3,3,ghi,0.6
+4,user4,4,jkl,0.8
+4,user4,4,jkl,0.8
+6,user6,null,null,null
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testSelect.result b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testSelect.result
new file mode 100644
index 0000000..24a91ff
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testSelect.result
@@ -0,0 +1,7 @@
+id,name
+-------------------------------
+1,user1
+2,user2
+3,user3
+4,user4
+6,user6