TAJO-1488: Implement KafkaTablespace.

Closes #1044

Signed-off-by: Jinho Kim <jhkim@apache.org>
diff --git a/CHANGES b/CHANGES
index 5e7fc0f..85fccd0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -408,6 +408,8 @@
 
   SUB TASKS
 
+    TAJO-1488: Implement KafkaTablespace. (Byunghwa Yun via jinho)
+
     TAJO-1487: Kafka Scanner for kafka strage. (Byunghwa Yun via jinho)
 
     TAJO-2174: Implement HiveCatalogStore#alterTable. (Lee Dongjin via jinho)
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
index 17ac3ba..a0b4280 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
@@ -31,6 +31,10 @@
     "swift": {
       "handler": "org.apache.tajo.storage.s3.S3TableSpace",
       "default-format": "text"
+    },
+    "kafka": {
+      "handler": "org.apache.tajo.storage.kafka.KafkaTablespace",
+      "default-format": "kafka"
     }
   }
 }
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index ce0ce85..976fb24 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -39,7 +39,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json</value>
+    <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json,kafka</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -60,6 +60,10 @@
     <value>org.apache.tajo.storage.http.ExampleHttpFileFragment</value>
   </property>
   <property>
+    <name>tajo.storage.fragment.kind.kafka</name>
+    <value>org.apache.tajo.storage.kafka.KafkaFragment</value>
+  </property>
+  <property>
     <name>tajo.storage.fragment.serde.file</name>
     <value>org.apache.tajo.storage.fragment.FileFragmentSerde</value>
   </property>
@@ -75,6 +79,10 @@
     <name>tajo.storage.fragment.serde.example-http</name>
     <value>org.apache.tajo.storage.http.ExampleHttpFileFragmentSerde</value>
   </property>
+  <property>
+    <name>tajo.storage.fragment.serde.kafka</name>
+    <value>org.apache.tajo.storage.kafka.KafkaFragmentSerde</value>
+  </property>
 
   <!--- Scanner Handler -->
   <property>
@@ -137,6 +145,11 @@
     <value>org.apache.tajo.storage.http.ExampleHttpJsonScanner</value>
   </property>
 
+  <property>
+    <name>tajo.storage.scanner-handler.kafka.class</name>
+    <value>org.apache.tajo.storage.kafka.KafkaScanner</value>
+  </property>
+
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
index 1737e22..d6e74a3 100644
--- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -38,7 +38,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json</value>
+    <value>text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json,kafka</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -131,6 +131,11 @@
     <value>org.apache.tajo.storage.http.ExampleHttpJsonScanner</value>
   </property>
 
+  <property>
+    <name>tajo.storage.scanner-handler.kafka.class</name>
+    <value>org.apache.tajo.storage.kafka.KafkaScanner</value>
+  </property>
+
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>
diff --git a/tajo-storage/tajo-storage-kafka/pom.xml b/tajo-storage/tajo-storage-kafka/pom.xml
index 82a55ff..ab93e0b 100644
--- a/tajo-storage/tajo-storage-kafka/pom.xml
+++ b/tajo-storage/tajo-storage-kafka/pom.xml
@@ -56,8 +56,10 @@
         <artifactId>apache-rat-plugin</artifactId>
         <configuration>
           <excludes>
-            <exclude>src/main/resources/*.json</exclude>
-            <exclude>src/test/resources/*.json</exclude>
+            <exclude>derby.log</exclude>
+            <exclude>src/test/resources/dataset/**</exclude>
+            <exclude>src/test/resources/queries/**</exclude>
+            <exclude>src/test/resources/results/**</exclude>
           </excludes>
         </configuration>
         <executions>
@@ -185,6 +187,16 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-rpc-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-cluster-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.version}</version>
@@ -213,6 +225,73 @@
       <version>0.88</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragment.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragment.java
index 4008a25..9f189ff 100644
--- a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragment.java
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragment.java
@@ -35,19 +35,18 @@
 
   public KafkaFragment(URI uri, String tableName, String topicName, long startOffset, long lastOffset,
       int partitionId, String leaderHost) {
-    super(BuiltinFragmentKinds.KAFKA, uri, tableName, new KafkaFragmentKey(partitionId, startOffset),
-        new KafkaFragmentKey(partitionId, lastOffset), lastOffset - startOffset, new String[] { leaderHost });
-
-    this.topicName = topicName;
-    this.last = false;
+    this(uri, tableName, topicName, startOffset, lastOffset, partitionId, leaderHost, false);
   }
 
   public KafkaFragment(URI uri, String tableName, String topicName, long startOffset, long lastOffset,
       int partitionId, String leaderHost, boolean last) {
-    this(uri, tableName, topicName, startOffset, lastOffset, partitionId, leaderHost);
+    super(BuiltinFragmentKinds.KAFKA, uri, tableName, new KafkaFragmentKey(partitionId, startOffset),
+        new KafkaFragmentKey(partitionId, lastOffset), lastOffset - startOffset, new String[] { leaderHost });
+    this.topicName = topicName;
     this.last = last;
   }
 
+  @Override
   public Object clone() throws CloneNotSupportedException {
     KafkaFragment frag = (KafkaFragment) super.clone();
     frag.topicName = topicName;
@@ -111,6 +110,10 @@
     this.endKey = new KafkaFragmentKey(partitionId, lastOffset);
   }
 
+  public int getPartitionId() {
+    return this.startKey.getPartitionId();
+  }
+
   public static class KafkaFragmentKey implements Comparable<KafkaFragmentKey> {
     private final int partitionId;
     private final long offset;
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragmentSerde.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragmentSerde.java
index 2899b7c..f79d36e 100644
--- a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragmentSerde.java
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaFragmentSerde.java
@@ -40,6 +40,7 @@
         .setTopicName(fragment.getTopicName())
         .setStartOffset(fragment.getStartKey().getOffset())
         .setLastOffset(fragment.getEndKey().getOffset())
+        .setPartitionId(fragment.getPartitionId())
         .setLast(fragment.isLast())
         .setLength(fragment.getLength())
         .setLeaderHost(fragment.getHostNames().get(0))
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java
index 496dfa6..8e2f2da 100644
--- a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaScanner.java
@@ -106,7 +106,7 @@
     deserializer.init();
 
     simpleConsumerManager = new SimpleConsumerManager(fragment.getUri(), fragment.getTopicName(),
-        fragment.getStartKey().getPartitionId(), fragmentSize);
+        fragment.getPartitionId(), fragmentSize);
 
     initOffset();
   }
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaTablespace.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaTablespace.java
new file mode 100644
index 0000000..9b56cf4
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/KafkaTablespace.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.exception.NotImplementedException;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.storage.FormatProperty;
+import org.apache.tajo.storage.StorageProperty;
+import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.TupleRange;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import net.minidev.json.JSONObject;
+
+/**
+ * Tablespace for Kafka table.
+ */
+public class KafkaTablespace extends Tablespace {
+  public static final StorageProperty KAFKA_STORAGE_PROPERTIES = new StorageProperty("kafka", false, false, false,
+      false);
+
+  public static final FormatProperty KAFKA_FORMAT_PROPERTIES = new FormatProperty(false, false, false);
+
+  public KafkaTablespace(String name, URI uri, JSONObject config) {
+    super(name, uri, config);
+  }
+
+  @Override
+  protected void storageInit() throws IOException {
+  }
+
+  /** Sums the positive lengths of all fragments that getSplits produces for the given table. */
+  @Override
+  public long getTableVolume(TableDesc table, Optional<EvalNode> filter) {
+    try {
+      long volume = 0L;
+      for (Fragment split : getSplits("", table, false, filter.orElse(null))) {
+        volume += Math.max(split.getLength(), 0L); // unknown sizes (-1) contribute nothing
+      }
+      return volume;
+    } catch (TajoException e) {
+      throw new TajoRuntimeException(e);
+    } catch (Throwable unexpected) {
+      throw new TajoInternalError(unexpected);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws TajoException, IOException {
+    TableStats stats = new TableStats();
+    stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
+    tableDesc.setStats(stats);
+  }
+
+  @Override
+  public void purgeTable(TableDesc tableDesc) throws IOException, TajoException {
+  }
+
+  @Override
+  public URI getTableUri(TableMeta meta, String databaseName, String tableName) {
+    return URI.create(uri.toString() + "/" + tableName);
+  }
+
+  /**
+   * Creates fragments of at most the configured fragment size (in messages) for each selected partition.
+   */
+  @Override
+  public List<Fragment> getSplits(String inputSourceId, TableDesc table, boolean requireSorted,
+      EvalNode filterCondition) throws IOException, TajoException {
+    String topic = table.getMeta().getProperty(KafkaStorageConstants.KAFKA_TOPIC);
+    int fragmentSize = Integer.parseInt(table.getMeta().getProperty(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE,
+        KafkaStorageConstants.DEFAULT_FRAGMENT_SIZE));
+    // If no specific partitions are configured, scan all partitions of the topic.
+    String partitions = table.getMeta().getProperty(KafkaStorageConstants.KAFKA_TOPIC_PARTITION,
+        KafkaStorageConstants.DEFAULT_PARTITION);
+    List<PartitionInfo> partitionList;
+    if (partitions.equals(KafkaStorageConstants.DEFAULT_PARTITION)) {
+      partitionList = SimpleConsumerManager.getPartitions(uri, topic);
+    } else {
+      partitionList = new LinkedList<>();
+      // Keep only the partitions listed in the table property.
+      Map<String, PartitionInfo> topicPartitionsMap = new HashMap<>();
+      for (PartitionInfo partitionInfo : SimpleConsumerManager.getPartitions(uri, topic)) {
+        topicPartitionsMap.put(Integer.toString(partitionInfo.partition()), partitionInfo);
+      }
+      for (String partitionId : partitions.split(",")) {
+        PartitionInfo selected = topicPartitionsMap.get(partitionId.trim());
+        if (selected == null) { // Unknown id: fail clearly here instead of with an NPE in the loop below.
+          throw new IOException("Partition '" + partitionId.trim() + "' does not exist in topic '" + topic + "'");
+        }
+        partitionList.add(selected);
+      }
+    }
+
+    List<Fragment> fragments = new ArrayList<Fragment>();
+    for (PartitionInfo partitionInfo : partitionList) {
+      int partitionId = partitionInfo.partition();
+      String leaderHost = partitionInfo.leader().host();
+      long lastOffset;
+      long startOffset;
+      try (SimpleConsumerManager simpleConsumerManager = new SimpleConsumerManager(uri, topic, partitionId)) {
+        lastOffset = simpleConsumerManager.getLatestOffset();
+        startOffset = simpleConsumerManager.getEarliestOffset();
+      }
+
+      long messageSize = lastOffset - startOffset;
+      if (0 == lastOffset || 0 == messageSize)
+        continue;
+
+      // If the partition holds no more messages than fragmentSize (messages per fragment), use one fragment.
+      if (messageSize <= fragmentSize) {
+        fragments.add(new KafkaFragment(table.getUri(), inputSourceId, topic, startOffset,
+            lastOffset, partitionId, leaderHost));
+      } else { // If message count of partition is greater than fragmentSize,
+        long nextFragmentStartOffset = startOffset;
+        while (nextFragmentStartOffset < lastOffset) {
+          // partition data: 0 1 2 3 4 5 6 7 8 9 10 (start offset: 0, last offset: 11), fragment size: 3
+          // result: (0, 1, 2), (3, 4, 5), (6, 7, 8), (9, 10)
+          // 1st nextFragmentStartOffset=0, nextFragmentLastOffset=3
+          // 2nd nextFragmentStartOffset=3, nextFragmentLastOffset=6
+          // 3rd nextFragmentStartOffset=6, nextFragmentLastOffset=9
+          // 4th nextFragmentStartOffset=9, nextFragmentLastOffset=12 (clamped to 11 below)
+          long nextFragmentLastOffset = nextFragmentStartOffset + fragmentSize;
+          // The last fragment may hold fewer than fragmentSize messages, so clamp its end to lastOffset.
+          long fragmentLstOffset = Math.min(nextFragmentLastOffset, lastOffset);
+          fragments.add(new KafkaFragment(table.getUri(), inputSourceId, topic,
+              nextFragmentStartOffset, fragmentLstOffset, partitionId, leaderHost));
+          nextFragmentStartOffset = nextFragmentLastOffset;
+        }
+      }
+    }
+    return fragments;
+  }
+
+  @Override
+  public StorageProperty getProperty() {
+    return KAFKA_STORAGE_PROPERTIES;
+  }
+
+  @Override
+  public FormatProperty getFormatProperty(TableMeta meta) {
+    return KAFKA_FORMAT_PROPERTIES;
+  }
+
+  @Override
+  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, Schema inputSchema,
+      SortSpec[] sortSpecs, TupleRange dataRange) throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws TajoException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void prepareTable(LogicalNode node) throws IOException, TajoException {
+  }
+
+  @Override
+  public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan, Schema schema,
+      TableDesc tableDesc) throws IOException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public void rollbackTable(LogicalNode node) throws IOException, TajoException {
+    throw new TajoRuntimeException(new NotImplementedException());
+  }
+
+  @Override
+  public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+}
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java
index dd44ea7..b4c7736 100644
--- a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java
@@ -146,8 +146,12 @@
   static String extractBroker(URI uri) {
     String uriStr = uri.toString();
     int start = uriStr.indexOf("/") + 2;
-
-    return uriStr.substring(start);
+    int end = uriStr.indexOf("/", start);
+    if (end < 0) {
+      return uriStr.substring(start);
+    } else {
+      return uriStr.substring(start, end);
+    }
   }
 
   /**
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaQuery.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaQuery.java
new file mode 100644
index 0000000..522d0cc
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaQuery.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import static org.apache.tajo.storage.kafka.KafkaTestUtil.DEFAULT_TEST_PARTITION_NUM;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.kafka.server.EmbeddedKafka;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URI;
+
+import net.minidev.json.JSONObject;
+
+public class TestKafkaQuery extends QueryTestCaseBase {
+
+  private static final String TEST_TOPIC_USER = "TEST_TOPIC_USER";
+  private static final String TEST_TOPIC_PROD = "TEST_TOPIC2_PROD";
+
+  private static EmbeddedKafka KAFKA;
+  private static URI KAFKA_SERVER_URI;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    KAFKA = EmbeddedKafka.createEmbeddedKafka(2181, 9092);
+    KAFKA.start();
+    KAFKA.createTopic(DEFAULT_TEST_PARTITION_NUM, 1, TEST_TOPIC_USER);
+    KAFKA.createTopic(DEFAULT_TEST_PARTITION_NUM, 1, TEST_TOPIC_PROD);
+    KAFKA_SERVER_URI = URI.create("kafka://" + KAFKA.getConnectString());
+
+    // Load test data: five explicit rows for the user topic, two no-payload sends for the prod topic.
+    try (Producer<String, String> producer = KAFKA.createProducer(KAFKA.getConnectString())) {
+      KafkaTestUtil.sendTestData(producer, TEST_TOPIC_USER, "1|user1");
+      KafkaTestUtil.sendTestData(producer, TEST_TOPIC_USER, "2|user2");
+      KafkaTestUtil.sendTestData(producer, TEST_TOPIC_USER, "3|user3");
+      KafkaTestUtil.sendTestData(producer, TEST_TOPIC_USER, "4|user4");
+      KafkaTestUtil.sendTestData(producer, TEST_TOPIC_USER, "6|user6");
+      for (int i = 0; i < 2; i++) {
+        KafkaTestUtil.sendTestData(producer, TEST_TOPIC_PROD);
+      }
+    }
+
+    // Register the tablespace as "cluster1", the name the CREATE TABLE statements in this test refer to.
+    KafkaTablespace kafkaTablespace = new KafkaTablespace("cluster1", KAFKA_SERVER_URI, new JSONObject());
+    kafkaTablespace.init(new TajoConf());
+    TablespaceManager.addTableSpaceForTest(kafkaTablespace);
+
+    QueryTestCaseBase.testingCluster.getMaster().refresh();
+  }
+
+  @AfterClass
+  public static void teardown() throws Exception {
+    KAFKA.close();
+  }
+
+  @Before
+  public void prepareTables() throws TajoException {
+    String createSql = "create table user (id int4, name text) tablespace cluster1 using kafka with ('kafka.topic'='"
+      + TEST_TOPIC_USER + "')";
+    executeString(createSql);
+
+    createSql = "create table prod (id int4, prod_name text, point float4) tablespace cluster1 using kafka with ('kafka.topic'='"
+      + TEST_TOPIC_PROD + "')";
+    executeString(createSql);
+  }
+
+  @After
+  public void dropTables() throws TajoException {
+    executeString("drop table user");
+    executeString("drop table prod");
+  }
+
+  @SimpleTest
+  @Test
+  public void testSelect() throws Exception {
+    runSimpleTests();
+  }
+
+  @SimpleTest
+  @Test
+  public void testGroupby() throws Exception {
+    runSimpleTests();
+  }
+
+  @SimpleTest
+  @Test
+  public void testJoin() throws Exception {
+    runSimpleTests();
+  }
+
+  @SimpleTest
+  @Test
+  public void testLeftOuterJoin() throws Exception {
+    runSimpleTests();
+  }
+
+  @SimpleTest
+  @Test
+  public void testFullOuterJoin() throws Exception {
+    runSimpleTests();
+  }
+}
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaTablespace.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaTablespace.java
new file mode 100644
index 0000000..904492e
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestKafkaTablespace.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import static org.apache.tajo.storage.kafka.KafkaTestUtil.TOPIC_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.kafka.server.EmbeddedKafka;
+import org.apache.tajo.util.KeyValueSet;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+import net.minidev.json.JSONObject;
+
+public class TestKafkaTablespace {
+  private static EmbeddedKafka KAFKA;
+  private static URI KAFKA_SERVER_URI;
+
+  /**
+   * Start up EmbeddedKafka and Generate test data.
+   *
+   * @throws Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    KAFKA = EmbeddedKafka.createEmbeddedKafka(2181, 9092);
+    KAFKA.start();
+    KAFKA.createTopic(1, 1, TOPIC_NAME);
+    KAFKA_SERVER_URI = URI.create("kafka://" + KAFKA.getConnectString());
+
+    // Load test data.
+    try (Producer<String, String> producer = KAFKA.createProducer(KAFKA.getConnectString())) {
+      for (int i = 0; i < 20; i++) {
+        KafkaTestUtil.sendTestData(producer, TOPIC_NAME);
+      }
+    }
+
+    // Register the tablespace as "cluster1" so the tests can look it up by name and by URI.
+    KafkaTablespace kafkaTablespace = new KafkaTablespace("cluster1", KAFKA_SERVER_URI, new JSONObject());
+    kafkaTablespace.init(new TajoConf());
+    TablespaceManager.addTableSpaceForTest(kafkaTablespace);
+  }
+
+  @Test
+  public void testTablespaceHandler() throws Exception {
+    assertTrue((TablespaceManager.getByName("cluster1")) instanceof KafkaTablespace);
+    assertTrue((TablespaceManager.get(KAFKA_SERVER_URI)) instanceof KafkaTablespace);
+  }
+
+  /**
+   * Close EmbeddedKafka.
+   *
+   * @throws Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    KAFKA.close();
+  }
+
+  /**
+   * Test for getSplit.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testGetSplit() throws Exception {
+    Map<String, String> tableOptions = new java.util.HashMap<>();
+    tableOptions.put(KafkaStorageConstants.KAFKA_TOPIC, TOPIC_NAME);
+    tableOptions.put(KafkaStorageConstants.KAFKA_FRAGMENT_SIZE, "10"); // 10 messages per fragment
+    TableMeta tableMeta = CatalogUtil.newTableMeta("KAFKA", new TajoConf());
+    tableMeta.setPropertySet(new KeyValueSet(tableOptions));
+    TableDesc tableDesc = new TableDesc("test_table", null, tableMeta, null);
+    KafkaTablespace kafkaTablespace = TablespaceManager.getByName("cluster1");
+    List<Fragment> splits = kafkaTablespace.getSplits("", tableDesc, false, null);
+    long totalCount = 0;
+    for (Fragment split : splits) {
+      totalCount += split.getLength();
+    }
+    assertEquals(100, totalCount);
+  }
+}
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java
index d780ac4..86c81ae 100644
--- a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java
@@ -72,6 +72,8 @@
   public void testExtractBroker() {
     assertEquals("host1:9092,host2:9092,host3:9092",
         SimpleConsumerManager.extractBroker(URI.create("kafka://host1:9092,host2:9092,host3:9092")));
+    assertEquals("host1:9092,host2:9092,host3:9092",
+        SimpleConsumerManager.extractBroker(URI.create("kafka://host1:9092,host2:9092,host3:9092/user")));
   }
 
   /**
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/dataset/.gitkeep b/tajo-storage/tajo-storage-kafka/src/test/resources/dataset/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/dataset/.gitkeep
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testFullOuterJoin.sql b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testFullOuterJoin.sql
new file mode 100644
index 0000000..bb4ced1
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testFullOuterJoin.sql
@@ -0,0 +1 @@
+select u.id, u.name, p.id, p.prod_name, p.point from user u full outer join prod p on(u.id = p.id) order by u.id
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testGroupby.sql b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testGroupby.sql
new file mode 100644
index 0000000..10a0dc1
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testGroupby.sql
@@ -0,0 +1 @@
+select id, count(*) from prod group by id order by id;
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testJoin.sql b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testJoin.sql
new file mode 100644
index 0000000..e3ff8d2
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testJoin.sql
@@ -0,0 +1 @@
+select u.id, u.name, p.id, p.prod_name, p.point from user u join prod p on(u.id = p.id) order by u.id
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testLeftOuterJoin.sql b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testLeftOuterJoin.sql
new file mode 100644
index 0000000..8c27e26
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testLeftOuterJoin.sql
@@ -0,0 +1 @@
+select u.id, u.name, p.id, p.prod_name, p.point from user u left outer join prod p on(u.id = p.id) order by u.id
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testSelect.sql b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testSelect.sql
new file mode 100644
index 0000000..839908c
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/queries/TestKafkaQuery/testSelect.sql
@@ -0,0 +1 @@
+select id, name from user order by id;
\ No newline at end of file
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testFullOuterJoin.result b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testFullOuterJoin.result
new file mode 100644
index 0000000..d33b305
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testFullOuterJoin.result
@@ -0,0 +1,13 @@
+id,name,id,prod_name,point
+-------------------------------
+1,user1,1,abc,0.2
+1,user1,1,abc,0.2
+2,user2,2,def,0.4
+2,user2,2,def,0.4
+3,user3,3,ghi,0.6
+3,user3,3,ghi,0.6
+4,user4,4,jkl,0.8
+4,user4,4,jkl,0.8
+6,user6,null,null,null
+null,null,5,mno,1.0
+null,null,5,mno,1.0
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testGroupby.result b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testGroupby.result
new file mode 100644
index 0000000..cfb9c69
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testGroupby.result
@@ -0,0 +1,7 @@
+id,?count
+-------------------------------
+1,2
+2,2
+3,2
+4,2
+5,2
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testJoin.result b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testJoin.result
new file mode 100644
index 0000000..0af0a6a
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testJoin.result
@@ -0,0 +1,10 @@
+id,name,id,prod_name,point
+-------------------------------
+1,user1,1,abc,0.2
+1,user1,1,abc,0.2
+2,user2,2,def,0.4
+2,user2,2,def,0.4
+3,user3,3,ghi,0.6
+3,user3,3,ghi,0.6
+4,user4,4,jkl,0.8
+4,user4,4,jkl,0.8
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testLeftOuterJoin.result b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testLeftOuterJoin.result
new file mode 100644
index 0000000..51ba0fa
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testLeftOuterJoin.result
@@ -0,0 +1,11 @@
+id,name,id,prod_name,point
+-------------------------------
+1,user1,1,abc,0.2
+1,user1,1,abc,0.2
+2,user2,2,def,0.4
+2,user2,2,def,0.4
+3,user3,3,ghi,0.6
+3,user3,3,ghi,0.6
+4,user4,4,jkl,0.8
+4,user4,4,jkl,0.8
+6,user6,null,null,null
diff --git a/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testSelect.result b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testSelect.result
new file mode 100644
index 0000000..24a91ff
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/resources/results/TestKafkaQuery/testSelect.result
@@ -0,0 +1,7 @@
+id,name
+-------------------------------
+1,user1
+2,user2
+3,user3
+4,user4
+6,user6