TAJO-2146: Fragment interface cleanup.

Closes #1015
diff --git a/CHANGES b/CHANGES
index 8f6c0fe..beee5be 100644
--- a/CHANGES
+++ b/CHANGES
@@ -4,7 +4,7 @@
 
   NEW FEATURES
 
-    TAJO-1686: Allow Tajo to use Hive UDF. (jihoon)
+    TAJO-1686: Allow Tajo to use Hive UDF. (Jongyoung Park via jihoon)
 
     TAJO-2122: PullServer as an Auxiliary service of Yarn. (jihoon)
 
@@ -14,6 +14,8 @@
 
   IMPROVEMENT
 
+    TAJO-2146: Fragment interface cleanup. (jihoon)
+
     TAJO-2129: Apply new type implementation to Schema and Catalog. (hyunsik)
 
     TAJO-2071: Supporting DATE type in Parquet format. 
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index e79bc75..b42cf58 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -75,7 +75,7 @@
 
 message FragmentProto {
   required string id = 1;
-  required string data_format = 2;
+  required string kind = 2;
   required bytes contents = 3;
 }
 
diff --git a/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java b/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java
index 559bed3..294f8bb 100644
--- a/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java
+++ b/tajo-common/src/test/java/org/apache/tajo/datum/TestInt4Datum.java
@@ -18,12 +18,10 @@
 
 package org.apache.tajo.datum;
 
-import org.junit.Test;
 import org.apache.tajo.common.TajoDataTypes;
+import org.junit.Test;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class TestInt4Datum {
 
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index 8914e3b..5819179 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -555,8 +555,8 @@
     Tablespace tablespace = TablespaceManager.getByName("cluster1");
     List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
     assertEquals(1, fragments.size());
-    assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
-    assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));
+    assertEquals("021", ((HBaseFragment)fragments.get(0)).getStartKey().toString());
+    assertEquals("021" + postFix, ((HBaseFragment)fragments.get(0)).getEndKey().toString());
 
     // where rk >= '020' and rk <= '055'
     EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
@@ -569,12 +569,12 @@
     fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
     assertEquals(2, fragments.size());
     HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
-    assertEquals("020", new String(fragment1.getStartRow()));
-    assertEquals("040", new String(fragment1.getStopRow()));
+    assertEquals("020", fragment1.getStartKey().toString());
+    assertEquals("040", fragment1.getEndKey().toString());
 
     HBaseFragment fragment2 = (HBaseFragment) fragments.get(1);
-    assertEquals("040", new String(fragment2.getStartRow()));
-    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+    assertEquals("040", fragment2.getStartKey().toString());
+    assertEquals("055" + postFix, fragment2.getEndKey().toString());
 
     // where (rk >= '020' and rk <= '055') or rk = '075'
     EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
@@ -584,16 +584,16 @@
     fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, false, scanNode.getQual());
     assertEquals(3, fragments.size());
     fragment1 = (HBaseFragment) fragments.get(0);
-    assertEquals("020", new String(fragment1.getStartRow()));
-    assertEquals("040", new String(fragment1.getStopRow()));
+    assertEquals("020", fragment1.getStartKey().toString());
+    assertEquals("040", fragment1.getEndKey().toString());
 
     fragment2 = (HBaseFragment) fragments.get(1);
-    assertEquals("040", new String(fragment2.getStartRow()));
-    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+    assertEquals("040", fragment2.getStartKey().toString());
+    assertEquals("055" + postFix, fragment2.getEndKey().toString());
 
     HBaseFragment fragment3 = (HBaseFragment) fragments.get(2);
-    assertEquals("075", new String(fragment3.getStartRow()));
-    assertEquals("075" + postFix, new String(fragment3.getStopRow()));
+    assertEquals("075", fragment3.getStartKey().toString());
+    assertEquals("075" + postFix, fragment3.getEndKey().toString());
 
 
     // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
@@ -608,16 +608,16 @@
     assertEquals(3, fragments.size());
 
     fragment1 = (HBaseFragment) fragments.get(0);
-    assertEquals("020", new String(fragment1.getStartRow()));
-    assertEquals("040", new String(fragment1.getStopRow()));
+    assertEquals("020", fragment1.getStartKey().toString());
+    assertEquals("040", fragment1.getEndKey().toString());
 
     fragment2 = (HBaseFragment) fragments.get(1);
-    assertEquals("040", new String(fragment2.getStartRow()));
-    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+    assertEquals("040", fragment2.getStartKey().toString());
+    assertEquals("055" + postFix, fragment2.getEndKey().toString());
 
     fragment3 = (HBaseFragment) fragments.get(2);
-    assertEquals("072", new String(fragment3.getStartRow()));
-    assertEquals("078" + postFix, new String(fragment3.getStopRow()));
+    assertEquals("072", fragment3.getStartKey().toString());
+    assertEquals("078" + postFix, fragment3.getEndKey().toString());
 
     // where (rk >= '020' and rk <= '055') or (rk >= '057' and rk <= '059')
     evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
@@ -631,12 +631,12 @@
     assertEquals(2, fragments.size());
 
     fragment1 = (HBaseFragment) fragments.get(0);
-    assertEquals("020", new String(fragment1.getStartRow()));
-    assertEquals("040", new String(fragment1.getStopRow()));
+    assertEquals("020", fragment1.getStartKey().toString());
+    assertEquals("040", fragment1.getEndKey().toString());
 
     fragment2 = (HBaseFragment) fragments.get(1);
-    assertEquals("040", new String(fragment2.getStartRow()));
-    assertEquals("059" + postFix, new String(fragment2.getStopRow()));
+    assertEquals("040", fragment2.getStartKey().toString());
+    assertEquals("059" + postFix, fragment2.getEndKey().toString());
   }
 
   @Test
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
index 86f7d1a..cccff70 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestFileFragment.java
@@ -20,6 +20,8 @@
 
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.BuiltinFragmentKinds;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -34,10 +36,12 @@
 import static org.junit.Assert.assertTrue;
 
 public class TestFileFragment {
+  private TajoConf conf;
   private Path path;
   
   @Before
   public final void setUp() throws Exception {
+    conf = new TajoConf();
     path = CommonTestingUtil.getTestDir();
   }
 
@@ -45,7 +49,7 @@
   public final void testGetAndSetFields() {
     FileFragment fragment1 = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);
 
-    assertEquals("table1_1", fragment1.getTableName());
+    assertEquals("table1_1", fragment1.getInputSourceId());
     assertEquals(new Path(path, "table0"), fragment1.getPath());
     assertTrue(0 == fragment1.getStartKey());
     assertTrue(500 == fragment1.getLength());
@@ -55,8 +59,9 @@
   public final void testGetProtoAndRestore() {
     FileFragment fragment = new FileFragment("table1_1", new Path(path, "table0"), 0, 500);
 
-    FileFragment fragment1 = FragmentConvertor.convert(FileFragment.class, fragment.getProto());
-    assertEquals("table1_1", fragment1.getTableName());
+    FileFragment fragment1 = FragmentConvertor.convert(conf, BuiltinFragmentKinds.FILE,
+        FragmentConvertor.toFragmentProto(conf, fragment));
+    assertEquals("table1_1", fragment1.getInputSourceId());
     assertEquals(new Path(path, "table0"), fragment1.getPath());
     assertTrue(0 == fragment1.getStartKey());
     assertTrue(500 == fragment1.getLength());
@@ -73,7 +78,7 @@
     Arrays.sort(tablets);
 
     for(int i = 0; i < num; i++) {
-      assertEquals("tablet1_"+i, tablets[i].getTableName());
+      assertEquals("tablet1_"+i, tablets[i].getInputSourceId());
     }
   }
 
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
deleted file mode 100644
index 0173f59..0000000
--- a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.TpchTestBase;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaBuilder;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class TestRowFile {
-  private static final Log LOG = LogFactory.getLog(TestRowFile.class);
-
-  private TajoTestingCluster cluster;
-  private TajoConf conf;
-
-  @Before
-  public void setup() throws Exception {
-    cluster = TpchTestBase.getInstance().getTestingCluster();
-    conf = cluster.getConfiguration();
-  }
-
-  @After
-  public void teardown() throws Exception {
-  }
-
-  @Test
-  public void test() throws IOException {
-    Schema schema = SchemaBuilder.builder()
-        .add("id", Type.INT4)
-        .add("age", Type.INT8)
-        .add("description", Type.TEXT)
-        .build();
-
-    TableMeta meta = CatalogUtil.newTableMeta("ROWFILE", conf);
-
-    FileTablespace sm = (FileTablespace) TablespaceManager.get(cluster.getDefaultFileSystem().getUri());
-
-    Path tablePath = new Path("/test");
-    Path dataPath = new Path(tablePath, "test.tbl");
-    FileSystem fs = sm.getFileSystem();
-    fs.mkdirs(tablePath);
-
-    Appender appender = sm.getAppender(meta, schema, dataPath);
-    appender.enableStats();
-    appender.init();
-
-    int tupleNum = 200;
-    Tuple tuple;
-    Datum stringDatum = DatumFactory.createText("abcdefghijklmnopqrstuvwxyz");
-    Set<Integer> idSet = Sets.newHashSet();
-
-    tuple = new VTuple(3);
-    long start = System.currentTimeMillis();
-    for(int i = 0; i < tupleNum; i++) {
-      tuple.put(0, DatumFactory.createInt4(i + 1));
-      tuple.put(1, DatumFactory.createInt8(25l));
-      tuple.put(2, stringDatum);
-      appender.addTuple(tuple);
-      idSet.add(i+1);
-    }
-    appender.close();
-
-    TableStats stat = appender.getStats();
-    assertEquals(tupleNum, stat.getNumRows().longValue());
-
-    FileStatus file = fs.getFileStatus(dataPath);
-    FileFragment fragment = new FileFragment("test.tbl", dataPath, 0, file.getLen());
-
-    int tupleCnt = 0;
-    start = System.currentTimeMillis();
-    Scanner scanner = sm.getScanner(meta, schema, fragment, null);
-    scanner.init();
-    while ((tuple=scanner.next()) != null) {
-      tupleCnt++;
-    }
-    scanner.close();
-
-    assertEquals(tupleNum, tupleCnt);
-
-    tupleCnt = 0;
-    long fileStart = 0;
-    long fileLen = file.getLen()/13;
-
-    for (int i = 0; i < 13; i++) {
-      fragment = new FileFragment("test.tbl", dataPath, fileStart, fileLen);
-      scanner = new RowFile.RowFileScanner(conf, schema, meta, fragment);
-      scanner.init();
-      while ((tuple=scanner.next()) != null) {
-        if (!idSet.remove(tuple.getInt4(0)) && LOG.isDebugEnabled()) {
-          LOG.debug("duplicated! " + tuple.getInt4(0));
-        }
-        tupleCnt++;
-      }
-      scanner.close();
-      fileStart += fileLen;
-      if (i == 11) {
-        fileLen = file.getLen() - fileStart;
-      }
-    }
-    assertEquals(tupleNum, tupleCnt);
-  }
-}
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 87a6e74..fce56fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -923,13 +923,13 @@
           PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
           List<Fragment> fileFragments = new ArrayList<>();
 
-          FileTablespace space = (FileTablespace) TablespaceManager.get(scanNode.getTableDesc().getUri());
+          FileTablespace space = TablespaceManager.get(scanNode.getTableDesc().getUri());
           for (Path path : partitionedTableScanNode.getInputPaths()) {
             fileFragments.addAll(Arrays.asList(space.split(scanNode.getCanonicalName(), path)));
           }
 
           FragmentProto[] fragments =
-                FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()]));
+                FragmentConvertor.toFragmentProtoArray(conf, fileFragments.toArray(new Fragment[fileFragments.size()]));
 
           ctx.addFragments(scanNode.getCanonicalName(), fragments);
           return new PartitionMergeScanExec(ctx, scanNode, fragments);
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 3be1d36..7ab0943 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -86,7 +86,7 @@
 
     this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
 
-    Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(fragment));
+    Path indexPath = new Path(indexPrefix.toString(), IndexExecutorUtil.getIndexFileName(context.getConf(), fragment));
     this.reader = new BSTIndex(context.getConf()).
         getIndexReader(indexPath, keySchema, comparator);
   }
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 3d2de28..4ec5144 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -170,7 +170,7 @@
 
     mergedInputFragments = new ArrayList<>();
     for (CatalogProtos.FragmentProto proto : fragments) {
-      FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+      FileFragment fragment = FragmentConvertor.convert(context.getConf(), proto);
       mergedInputFragments.add(new Chunk(inSchema, fragment, scanNode.getTableDesc().getMeta()));
     }
   }
@@ -464,7 +464,7 @@
             debug(LOG, "Remove intermediate memory tuples: " + chunk.getMemoryTuples().usedMem());
           }
           chunk.getMemoryTuples().release();
-        } else if (chunk.getFragment().getTableName().contains(INTERMEDIATE_FILE_PREFIX)) {
+        } else if (chunk.getFragment().getInputSourceId().contains(INTERMEDIATE_FILE_PREFIX)) {
           localFS.delete(chunk.getFragment().getPath(), true);
           numDeletedFiles++;
 
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
index 3b8317f..df143c8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/IndexExecutorUtil.java
@@ -18,14 +18,15 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 
 public class IndexExecutorUtil {
 
-  public static String getIndexFileName(FragmentProto fragmentProto) {
-    FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, fragmentProto);
+  public static String getIndexFileName(Configuration conf, FragmentProto fragmentProto) {
+    FileFragment fileFragment = FragmentConvertor.convert(conf, fragmentProto);
     StringBuilder sb = new StringBuilder();
     sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength());
     return sb.toString();
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index d1dfe40..074d0ab 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -40,6 +40,7 @@
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 
 import java.io.IOException;
@@ -120,7 +121,7 @@
         fragments.add(fileFragment);
       }
     }
-    return FragmentConvertor.toFragmentProtoArray(fragments.toArray(new FileFragment[fragments.size()]));
+    return FragmentConvertor.toFragmentProtoArray(tajoConf, fragments.toArray(new Fragment[fragments.size()]));
   }
 
   /**
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index dc48f3f..52cb080 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -97,7 +97,7 @@
 
     Tuple partitionRow = null;
     if (fragments != null && fragments.length > 0) {
-      List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments);
+      List<FileFragment> fileFragments = FragmentConvertor.convert(context.getConf(), fragments);
 
       // Get a partition key value from a given path
       partitionRow = PartitionedTableRewriter.buildTupleFromPartitionPath(
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
index c5e1093..46f672d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
@@ -72,7 +72,7 @@
 
     TajoConf conf = context.getConf();
     Path indexPath = new Path(logicalPlan.getIndexPath().toString(),
-        IndexExecutorUtil.getIndexFileName(scanExec.getFragments()[0]));
+        IndexExecutorUtil.getIndexFileName(conf, scanExec.getFragments()[0]));
     // TODO: Create factory using reflection
     BSTIndex bst = new BSTIndex(conf);
     this.comparator = new BaseTupleComparator(keySchema, sortSpecs);
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index 871db89..e8b8b45 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -108,7 +108,9 @@
         SplitUtil.getSplits(tablespace, scanNode, scanNode.getTableDesc(), true));
 
     if (!fragments.isEmpty()) {
-      FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[fragments.size()]));
+      FragmentProto[] fragmentProtos =
+          FragmentConvertor.toFragmentProtoArray(tajoConf, fragments.toArray(new Fragment[fragments.size()]));
+
       this.taskContext = new TaskAttemptContext(
           new QueryContext(tajoConf), null,
           new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0),
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index f8b89f1..f1ad931 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -20,6 +20,7 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -281,7 +282,7 @@
         fragmentList.add(fragment.toString());
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
-        fragmentList.add("ERROR: " + eachFragment.getDataFormat() + "," + eachFragment.getId() + ": " + e.getMessage());
+        fragmentList.add("ERROR: " + eachFragment.getKind() + "," + eachFragment.getId() + ": " + e.getMessage());
       }
     }
     taskHistory.setFragments(fragmentList.toArray(new String[fragmentList.size()]));
@@ -330,25 +331,25 @@
 	}
 
   private void addDataLocation(Fragment fragment) {
-    String[] hosts = fragment.getHosts();
-    int[] diskIds = null;
+    ImmutableList<String> hosts = fragment.getHostNames();
+    Integer[] diskIds = null;
     if (fragment instanceof FileFragment) {
       diskIds = ((FileFragment)fragment).getDiskIds();
     }
-    for (int i = 0; i < hosts.length; i++) {
-      dataLocations.add(new DataLocation(hosts[i], diskIds == null ? DataLocation.UNKNOWN_VOLUME_ID : diskIds[i]));
+    for (int i = 0; i < hosts.size(); i++) {
+      dataLocations.add(new DataLocation(hosts.get(i), diskIds == null ? DataLocation.UNKNOWN_VOLUME_ID : diskIds[i]));
     }
   }
 
   public void addFragment(Fragment fragment, boolean useDataLocation) {
     Set<FragmentProto> fragmentProtos;
-    if (fragMap.containsKey(fragment.getTableName())) {
-      fragmentProtos = fragMap.get(fragment.getTableName());
+    if (fragMap.containsKey(fragment.getInputSourceId())) {
+      fragmentProtos = fragMap.get(fragment.getInputSourceId());
     } else {
       fragmentProtos = new HashSet<>();
-      fragMap.put(fragment.getTableName(), fragmentProtos);
+      fragMap.put(fragment.getInputSourceId(), fragmentProtos);
     }
-    fragmentProtos.add(fragment.getProto());
+    fragmentProtos.add(FragmentConvertor.toFragmentProto(systemConf, fragment));
     if (useDataLocation) {
       addDataLocation(fragment);
     }
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 47f1af2..1cba931 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -128,7 +128,8 @@
   @VisibleForTesting
   public TaskAttemptContext(final QueryContext queryContext, final TaskAttemptId taskAttemptId,
                             final Fragment [] fragments,  final Path workDir) {
-    this(queryContext, null, taskAttemptId, FragmentConvertor.toFragmentProtoArray(fragments), workDir);
+    this(queryContext, null, taskAttemptId, FragmentConvertor.toFragmentProtoArray(queryContext.getConf(), fragments),
+        workDir);
   }
 
   public TajoConf getConf() {
@@ -258,12 +259,12 @@
   public void updateAssignedFragments(String tableId, Fragment[] fragments) {
     fragmentMap.remove(tableId);
     for(Fragment t : fragments) {
-      if (fragmentMap.containsKey(t.getTableName())) {
-        fragmentMap.get(t.getTableName()).add(t.getProto());
+      if (fragmentMap.containsKey(t.getInputSourceId())) {
+        fragmentMap.get(t.getInputSourceId()).add(FragmentConvertor.toFragmentProto(getConf(), t));
       } else {
         List<FragmentProto> frags = new ArrayList<>();
-        frags.add(t.getProto());
-        fragmentMap.put(t.getTableName(), frags);
+        frags.add(FragmentConvertor.toFragmentProto(getConf(), t));
+        fragmentMap.put(t.getInputSourceId(), frags);
       }
     }
   }
@@ -281,7 +282,7 @@
     List<Path> paths = fragmentToPath(tableFragments);
 
     for (FragmentProto eachFragment: fragments) {
-      FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment);
+      FileFragment fileFragment = FragmentConvertor.convert(getConf(), eachFragment);
       // If current attempt already has same path, we don't need to add it to fragments.
       if (!paths.contains(fileFragment.getPath())) {
         tableFragments.add(eachFragment);
@@ -297,7 +298,7 @@
     List<Path> list = new ArrayList<>();
 
     for (FragmentProto proto : tableFragments) {
-      FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto);
+      FileFragment fragment = FragmentConvertor.convert(getConf(), proto);
       list.add(fragment.getPath());
     }
 
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 4afb383..4fcd5dc 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -244,6 +244,21 @@
   }
 
   /**
+   * Returns Scanner instance.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target The output schema
+   * @return Scanner instance
+   * @throws IOException
+   */
+  public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, Fragment fragment,
+                                                         Schema target) throws IOException {
+    return (SeekableScanner)this.getScanner(meta, schema, fragment, target);
+  }
+
+  /**
    * Returns Appender instance.
    * @param queryContext Query property.
    * @param taskAttemptId Task id.
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java
new file mode 100644
index 0000000..9c4fce5
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/BuiltinFragmentKinds.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+public class BuiltinFragmentKinds {
+  public static final String FILE = "FILE";
+  public static final String HBASE = "HBASE";
+  public static final String JDBC = "JDBC";
+}
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
index ac43197..a8de4ab 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
@@ -18,22 +18,150 @@
 
 package org.apache.tajo.storage.fragment;
 
-import org.apache.tajo.common.ProtoObject;
+import com.google.common.collect.ImmutableList;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import java.net.URI;
 
-public interface Fragment extends ProtoObject<FragmentProto> {
+/**
+ * The Fragment is similar to the split in MapReduce.
+ * For distributed processing of a a single large table,
+ * it contains the information of which part of data will be processed by each task.
+ *
+ * @param <T> type of fragment key. It should implement the Comparable interface.
+ */
+public abstract class Fragment<T extends Comparable> implements Comparable<Fragment<T>>, Cloneable {
 
-  public abstract String getTableName();
+  protected String kind;
+  protected URI uri;
+  protected String inputSourceId;
+  protected T startKey;
+  protected T endKey;
+  protected long length;
+  protected ImmutableList<String> hostNames;
+
+  protected Fragment(String kind,
+                     URI uri,
+                     String inputSourceId,
+                     T startKey,
+                     T endKey,
+                     long length,
+                     String[] hostNames) {
+    this.kind = kind;
+    this.uri = uri;
+    this.inputSourceId = inputSourceId;
+    this.startKey = startKey;
+    this.endKey = endKey;
+    this.length = length;
+    this.hostNames = hostNames == null ? ImmutableList.of() : ImmutableList.copyOf(hostNames);
+  }
+
+  /**
+   * Returns the fragment type.
+   *
+   * @return fragment type
+   */
+  public final String getKind() {
+    return kind;
+  }
+
+  /**
+   * Returns an unique URI of the input source.
+   *
+   * @return URI of the input source
+   */
+  public final URI getUri() {
+    return uri;
+  }
+
+  /**
+   * Returns a unique id of the input source.
+   *
+   * @return id of the input source
+   */
+  public final String getInputSourceId() {
+    return this.inputSourceId;
+  }
+
+  /**
+   * Returns the start key of the data range.
+   * {@link org.apache.tajo.storage.Scanner} will start reading data from the point indicated by this key.
+   *
+   * @return start key
+   */
+  public final T getStartKey() {
+    return startKey;
+  }
+
+  /**
+   * Returns the end key of the data range.
+   * {@link org.apache.tajo.storage.Scanner} will stop reading data when it reaches the point indicated by this key.
+   *
+   * @return end key
+   */
+  public final T getEndKey() {
+    return endKey;
+  }
+
+  /**
+   * Returns the length of the data range between start key and end key.
+   *
+   * @return length of the range
+   */
+  public final long getLength() {
+    return length;
+  }
+
+  /**
+   * Returns host names which have any portion of the data between start key and end key.
+   *
+   * @return host names
+   */
+  public final ImmutableList<String> getHostNames() {
+    return hostNames;
+  }
+
+  /**
+   * Indicates the fragment is empty or not.
+   * An empty fragment means that there is no data to read.
+   *
+   * @return true if the fragment is empty. Otherwise, false.
+   */
+  public boolean isEmpty() {
+    return length == 0;
+  }
+
+  /**
+   * First compares URIs of fragments, and then compares their start keys.
+   *
+   * @param t
+   * @return return 0 if two fragments are same. If not same, return -1 if this fragment is smaller than the other.
+   * Otherwise, return 1;
+   */
+  @Override
+  public final int compareTo(Fragment<T> t) {
+    int cmp = uri.compareTo(t.uri);
+    if (cmp == 0) {
+      if (startKey != null && t.startKey != null) {
+        return startKey.compareTo(t.startKey);
+      } else if (startKey == null) {  // nulls last
+        return 1;
+      } else {
+        return -1;
+      }
+    } else {
+      return cmp;
+    }
+  }
 
   @Override
-  public abstract FragmentProto getProto();
-
-  public abstract long getLength();
-
-  public abstract String getKey();
-
-  public String[] getHosts();
-
-  public abstract boolean isEmpty();
+  public Object clone() throws CloneNotSupportedException {
+    Fragment clone = (Fragment) super.clone();
+    clone.uri = this.uri;
+    clone.inputSourceId = this.inputSourceId;
+    clone.startKey = this.startKey;
+    clone.endKey = this.endKey;
+    clone.hostNames = this.hostNames;
+    clone.length = this.length;
+    return clone;
+  }
 }
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
index 4ce6928..835a714 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
@@ -20,13 +20,12 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.annotation.ThreadSafe;
 import org.apache.tajo.exception.TajoInternalError;
 
-import java.io.IOException;
-import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Map;
 
@@ -34,69 +33,47 @@
 
 @ThreadSafe
 public class FragmentConvertor {
-  /**
-   * Cache of fragment classes
-   */
-  protected static final Map<String, Class<? extends Fragment>> CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap();
-  /**
-   * Cache of constructors for each class.
-   */
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
-  /**
-   * default parameter for all constructors
-   */
-  private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class };
 
-  public static Class<? extends Fragment> getFragmentClass(Configuration conf, String dataFormat) {
-    Class<? extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(dataFormat.toLowerCase());
-    if (fragmentClass == null) {
-      fragmentClass = conf.getClass(
-          String.format("tajo.storage.fragment.%s.class", dataFormat.toLowerCase()), null, Fragment.class);
-      CACHED_FRAGMENT_CLASSES.put(dataFormat.toLowerCase(), fragmentClass);
+  private static final Map<String, FragmentSerde> SERDE_MAP = Maps.newConcurrentMap();
+
+  private static FragmentSerde getFragmentSerde(Configuration conf, String fragmentKind) {
+    fragmentKind = fragmentKind.toLowerCase();
+    FragmentSerde serde = SERDE_MAP.get(fragmentKind);
+    if (serde == null) {
+      Class<? extends FragmentSerde> serdeClass = conf.getClass(
+          String.format("tajo.storage.fragment.serde.%s", fragmentKind), null, FragmentSerde.class);
+      try {
+        serde = serdeClass.getConstructor(null).newInstance();
+      } catch (InstantiationException
+          | IllegalAccessException
+          | InvocationTargetException
+          | NoSuchMethodException e) {
+        throw new TajoInternalError(e);
+      }
+      SERDE_MAP.put(fragmentKind, serde);
     }
 
-    if (fragmentClass == null) {
-      throw new TajoInternalError("No such a fragment for " + dataFormat.toLowerCase());
+    if (serde == null) {
+      throw new TajoInternalError("No such a serde for " + fragmentKind);
     }
 
-    return fragmentClass;
+    return serde;
   }
 
-  public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto fragment) {
-    T result;
+  public static <T extends Fragment> T convert(Configuration conf, String fragmentKind, FragmentProto fragment) {
+    FragmentSerde serde = getFragmentSerde(conf, fragmentKind);
     try {
-      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
-      if (constructor == null) {
-        constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS);
-        constructor.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(clazz, constructor);
-      }
-      result = constructor.newInstance(new Object[]{fragment.getContents()});
-    } catch (Throwable e) {
+      return (T) serde.deserialize(
+          serde.newBuilder()
+              .mergeFrom(fragment.getContents())
+              .build());
+    } catch (InvalidProtocolBufferException e) {
       throw new TajoInternalError(e);
     }
-
-    return result;
   }
 
   public static <T extends Fragment> T convert(Configuration conf, FragmentProto fragment) {
-    Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, fragment.getDataFormat().toLowerCase());
-    if (fragmentClass == null) {
-      throw new TajoInternalError("No such a fragment class for " + fragment.getDataFormat());
-    }
-    return convert(fragmentClass, fragment);
-  }
-
-  public static <T extends Fragment> List<T> convert(Class<T> clazz, FragmentProto...fragments)
-      throws IOException {
-    List<T> list = Lists.newArrayList();
-    if (fragments == null) {
-      return list;
-    }
-    for (FragmentProto proto : fragments) {
-      list.add(convert(clazz, proto));
-    }
-    return list;
+    return convert(conf, fragment.getKind(), fragment);
   }
 
   public static <T extends Fragment> List<T> convert(Configuration conf, FragmentProto...fragments) {
@@ -105,24 +82,32 @@
       return list;
     }
     for (FragmentProto proto : fragments) {
-      list.add((T) convert(conf, proto));
+      list.add(convert(conf, proto));
     }
     return list;
   }
 
-  public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) {
+  public static FragmentProto toFragmentProto(Configuration conf, Fragment fragment) {
+    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
+    fragmentBuilder.setId(fragment.getInputSourceId());
+    fragmentBuilder.setKind(fragment.getKind());
+    fragmentBuilder.setContents(getFragmentSerde(conf, fragment.getKind()).serialize(fragment).toByteString());
+    return fragmentBuilder.build();
+  }
+
+  public static List<FragmentProto> toFragmentProtoList(Configuration conf, Fragment... fragments) {
     List<FragmentProto> list = Lists.newArrayList();
     if (fragments == null) {
       return list;
     }
     for (Fragment fragment : fragments) {
-      list.add(fragment.getProto());
+      list.add(toFragmentProto(conf, fragment));
     }
     return list;
   }
 
-  public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) {
-    List<FragmentProto> list = toFragmentProtoList(fragments);
+  public static FragmentProto [] toFragmentProtoArray(Configuration conf, Fragment... fragments) {
+    List<FragmentProto> list = toFragmentProtoList(conf, fragments);
     return list.toArray(new FragmentProto[list.size()]);
   }
 }
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerde.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerde.java
new file mode 100644
index 0000000..c570c9c
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/fragment/FragmentSerde.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.GeneratedMessage.Builder;
+
+/**
+ * FragmentSerde abstracts how a fragment is serialized / deserialized to / from a Protocol Buffer message.
+ *
+ * @param <F> Fragment class
+ * @param <P> Protocol Buffer Message class corresponding to the Fragment class
+ */
+public interface FragmentSerde<F extends Fragment, P extends Message> {
+
+  /**
+   * Creates a new builder of {@link P}.
+   *
+   * @return a Protocol Buffer message builder
+   */
+  Builder newBuilder();
+
+  /**
+   * Serializes a fragment into a Protocol Buffer message.
+   *
+   * @param fragment
+   * @return a serialized Protocol Buffer message
+   */
+  P serialize(F fragment);
+
+  /**
+   * Deserializes a Protocol Buffer message to a fragment.
+   *
+   * @param proto
+   * @return a deserialized fragment instance
+   */
+  F deserialize(P proto);
+}
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index 2454714..4e57204 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -44,53 +44,29 @@
 
   <!--- Fragment Class Configurations -->
   <property>
-    <name>tajo.storage.fragment.text.class</name>
+    <name>tajo.storage.fragment.kind.file</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
   <property>
-    <name>tajo.storage.fragment.json.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.raw.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.draw.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.rcfile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.row.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.parquet.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.orc.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.sequencefile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.avro.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.hbase.class</name>
+    <name>tajo.storage.fragment.kind.hbase</name>
     <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
   </property>
   <property>
-    <name>tajo.storage.fragment.jdbc.class</name>
+    <name>tajo.storage.fragment.kind.jdbc</name>
     <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
   </property>
+  <property>
+    <name>tajo.storage.fragment.serde.file</name>
+    <value>org.apache.tajo.storage.fragment.FileFragmentSerde</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.serde.hbase</name>
+    <value>org.apache.tajo.storage.hbase.HBaseFragmentSerde</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.serde.jdbc</name>
+    <value>org.apache.tajo.storage.jdbc.JdbcFragmentSerde</value>
+  </property>
 
   <!--- Scanner Handler -->
   <property>
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
index 1c4530a..5bf6b0b 100644
--- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -43,53 +43,29 @@
 
   <!--- Fragment Class Configurations -->
   <property>
-    <name>tajo.storage.fragment.text.class</name>
+    <name>tajo.storage.fragment.kind.file</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
   <property>
-    <name>tajo.storage.fragment.json.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.raw.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.draw.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.rcfile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.row.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.parquet.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.orc.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.sequencefile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.avro.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.hbase.class</name>
+    <name>tajo.storage.fragment.kind.hbase</name>
     <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
   </property>
   <property>
-    <name>tajo.storage.fragment.jdbc.class</name>
+    <name>tajo.storage.fragment.kind.jdbc</name>
     <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
   </property>
+  <property>
+    <name>tajo.storage.fragment.serde.file</name>
+    <value>org.apache.tajo.storage.fragment.FileFragmentSerde</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.serde.hbase</name>
+    <value>org.apache.tajo.storage.hbase.HBaseFragmentSerde</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.serde.jdbc</name>
+    <value>org.apache.tajo.storage.jdbc.JdbcFragmentSerde</value>
+  </property>
 
   <!--- Scanner Handler -->
   <property>
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
index 18aa515..e1026bb 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
@@ -19,112 +19,49 @@
 package org.apache.tajo.storage.hbase;
 
 import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tajo.BuiltinStorages;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.storage.fragment.BuiltinFragmentKinds;
 import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.hbase.StorageFragmentProtos.HBaseFragmentProto;
+import org.apache.tajo.storage.hbase.HBaseFragment.HBaseFragmentKey;
 
 import java.net.URI;
 
-public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Cloneable {
-  @Expose
-  private URI uri;
-  @Expose
-  private String tableName;
-  @Expose
+/**
+ * Fragment for HBase
+ */
+public class HBaseFragment extends Fragment<HBaseFragmentKey> {
   private String hbaseTableName;
-  @Expose
-  private byte[] startRow;
-  @Expose
-  private byte[] stopRow;
-  @Expose
-  private String regionLocation;
-  @Expose
   private boolean last;
-  @Expose
-  private long length;
 
   public HBaseFragment(URI uri, String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow,
                        String regionLocation) {
-    this.uri = uri;
-    this.tableName = tableName;
+    super(BuiltinFragmentKinds.HBASE, uri, tableName, new HBaseFragmentKey(startRow), new HBaseFragmentKey(stopRow),
+        TajoConstants.UNKNOWN_LENGTH, new String[]{regionLocation});
+
     this.hbaseTableName = hbaseTableName;
-    this.startRow = startRow;
-    this.stopRow = stopRow;
-    this.regionLocation = regionLocation;
     this.last = false;
   }
 
-  public HBaseFragment(ByteString raw) throws InvalidProtocolBufferException {
-    HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
-    builder.mergeFrom(raw);
-    builder.build();
-    init(builder.build());
-  }
-
-  private void init(HBaseFragmentProto proto) {
-    this.uri = URI.create(proto.getUri());
-    this.tableName = proto.getTableName();
-    this.hbaseTableName = proto.getHbaseTableName();
-    this.startRow = proto.getStartRow().toByteArray();
-    this.stopRow = proto.getStopRow().toByteArray();
-    this.regionLocation = proto.getRegionLocation();
-    this.length = proto.getLength();
-    this.last = proto.getLast();
-  }
-
-  @Override
-  public int compareTo(HBaseFragment t) {
-    return Bytes.compareTo(startRow, t.startRow);
-  }
-
-  public URI getUri() {
-    return uri;
-  }
-
-  @Override
-  public String getTableName() {
-    return tableName;
-  }
-
-  @Override
-  public String getKey() {
-    return new String(startRow);
+  public HBaseFragment(URI uri, String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow,
+                       String regionLocation, boolean last) {
+    this(uri, tableName, hbaseTableName, startRow, stopRow, regionLocation);
+    this.last = last;
   }
 
   @Override
   public boolean isEmpty() {
-    return startRow == null || stopRow == null;
-  }
-
-  @Override
-  public long getLength() {
-    return length;
+    return startKey.isEmpty() || endKey.isEmpty();
   }
 
   public void setLength(long length) {
     this.length = length;
   }
 
-  @Override
-  public String[] getHosts() {
-    return new String[] {regionLocation};
-  }
-
   public Object clone() throws CloneNotSupportedException {
     HBaseFragment frag = (HBaseFragment) super.clone();
-    frag.uri = uri;
-    frag.tableName = tableName;
     frag.hbaseTableName = hbaseTableName;
-    frag.startRow = startRow;
-    frag.stopRow = stopRow;
-    frag.regionLocation = regionLocation;
     frag.last = last;
-    frag.length = length;
     return frag;
   }
 
@@ -132,9 +69,9 @@
   public boolean equals(Object o) {
     if (o instanceof HBaseFragment) {
       HBaseFragment t = (HBaseFragment) o;
-      if (tableName.equals(t.tableName)
-          && Bytes.equals(startRow, t.startRow)
-          && Bytes.equals(stopRow, t.stopRow)) {
+      if (inputSourceId.equals(t.inputSourceId)
+          && startKey.equals(t.startKey)
+          && endKey.equals(t.endKey)) {
         return true;
       }
     }
@@ -143,51 +80,19 @@
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(tableName, hbaseTableName, startRow, stopRow);
+    return Objects.hashCode(inputSourceId, hbaseTableName, startKey, endKey);
   }
 
   @Override
   public String toString() {
     return
-        "\"fragment\": {\"uri:\"" + uri.toString() +"\", \"tableName\": \""+ tableName +
+        "\"fragment\": {\"uri:\"" + uri.toString() +"\", \"tableName\": \""+ inputSourceId +
             "\", hbaseTableName\": \"" + hbaseTableName + "\"" +
-            ", \"startRow\": \"" + new String(startRow) + "\"" +
-            ", \"stopRow\": \"" + new String(stopRow) + "\"" +
+            ", \"startRow\": \"" + new String(startKey.bytes) + "\"" +
+            ", \"stopRow\": \"" + new String(endKey.bytes) + "\"" +
             ", \"length\": \"" + length + "\"}" ;
   }
 
-  @Override
-  public FragmentProto getProto() {
-    HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
-    builder
-        .setUri(uri.toString())
-        .setTableName(tableName)
-        .setHbaseTableName(hbaseTableName)
-        .setStartRow(ByteString.copyFrom(startRow))
-        .setStopRow(ByteString.copyFrom(stopRow))
-        .setLast(last)
-        .setLength(length)
-        .setRegionLocation(regionLocation);
-
-    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
-    fragmentBuilder.setId(this.tableName);
-    fragmentBuilder.setContents(builder.buildPartial().toByteString());
-    fragmentBuilder.setDataFormat(BuiltinStorages.HBASE);
-    return fragmentBuilder.build();
-  }
-
-  public byte[] getStartRow() {
-    return startRow;
-  }
-
-  public byte[] getStopRow() {
-    return stopRow;
-  }
-
-  public String getRegionLocation() {
-    return regionLocation;
-  }
-
   public boolean isLast() {
     return last;
   }
@@ -200,15 +105,51 @@
     return hbaseTableName;
   }
 
-  public void setHbaseTableName(String hbaseTableName) {
-    this.hbaseTableName = hbaseTableName;
-  }
-
   public void setStartRow(byte[] startRow) {
-    this.startRow = startRow;
+    this.startKey = new HBaseFragmentKey(startRow);
   }
 
   public void setStopRow(byte[] stopRow) {
-    this.stopRow = stopRow;
+    this.endKey = new HBaseFragmentKey(stopRow);
+  }
+
+  public static class HBaseFragmentKey implements Comparable<HBaseFragmentKey> {
+    private final byte[] bytes;
+
+    public HBaseFragmentKey(byte[] key) {
+      this.bytes = key;
+    }
+
+    public byte[] getBytes() {
+      return bytes;
+    }
+
+    @Override
+    public int hashCode() {
+      return Bytes.hashCode(bytes);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof HBaseFragmentKey) {
+        HBaseFragmentKey other = (HBaseFragmentKey) o;
+        return Bytes.equals(bytes, other.bytes);
+      }
+      return false;
+    }
+
+    @Override
+    public int compareTo(HBaseFragmentKey o) {
+      return Bytes.compareTo(bytes, o.bytes);
+    }
+
+    @Override
+    public String toString() {
+      return new String(bytes);
+    }
+
+    public boolean isEmpty() {
+      return this.bytes == null;
+    }
   }
 }
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerde.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerde.java
new file mode 100644
index 0000000..f896f43
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragmentSerde.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage.Builder;
+import org.apache.tajo.storage.fragment.FragmentSerde;
+import org.apache.tajo.storage.hbase.StorageFragmentProtos.HBaseFragmentProto;
+
+import java.net.URI;
+
+public class HBaseFragmentSerde implements FragmentSerde<HBaseFragment, HBaseFragmentProto> {
+
+  @Override
+  public Builder newBuilder() {
+    return HBaseFragmentProto.newBuilder();
+  }
+
+  @Override
+  public HBaseFragmentProto serialize(HBaseFragment fragment) {
+    return HBaseFragmentProto.newBuilder()
+        .setUri(fragment.getUri().toASCIIString())
+        .setTableName(fragment.getInputSourceId())
+        .setHbaseTableName(fragment.getHbaseTableName())
+        .setStartRow(ByteString.copyFrom(fragment.getStartKey().getBytes()))
+        .setStopRow(ByteString.copyFrom(fragment.getEndKey().getBytes()))
+        .setLast(fragment.isLast())
+        .setLength(fragment.getLength())
+        .setRegionLocation(fragment.getHostNames().get(0))
+        .build();
+  }
+
+  @Override
+  public HBaseFragment deserialize(HBaseFragmentProto proto) {
+    return new HBaseFragment(
+        URI.create(proto.getUri()),
+        proto.getTableName(),
+        proto.getHbaseTableName(),
+        proto.getStartRow().toByteArray(),
+        proto.getStopRow().toByteArray(),
+        proto.getRegionLocation(),
+        proto.getLast());
+  }
+}
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index b2ca02d..781f911 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -36,10 +36,16 @@
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.exception.*;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.TajoInternalError;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.BytesUtils;
 
@@ -175,16 +181,16 @@
       }
     }
 
-    scan.setStartRow(fragment.getStartRow());
-    if (fragment.isLast() && fragment.getStopRow() != null &&
-        fragment.getStopRow().length > 0) {
+    scan.setStartRow(fragment.getStartKey().getBytes());
+    if (fragment.isLast() && !fragment.getEndKey().isEmpty() &&
+        fragment.getEndKey().getBytes().length > 0) {
       // last and stopRow is not empty
       if (filters == null) {
         filters = new FilterList();
       }
-      filters.addFilter(new InclusiveStopFilter(fragment.getStopRow()));
+      filters.addFilter(new InclusiveStopFilter(fragment.getEndKey().getBytes()));
     } else {
-      scan.setStopRow(fragment.getStopRow());
+      scan.setStopRow(fragment.getEndKey().getBytes());
     }
 
     if (filters != null) {
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 0cf883e..e3f7c25 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -525,10 +525,10 @@
 
           if (fragmentMap.containsKey(regionStartKey)) {
             final HBaseFragment prevFragment = fragmentMap.get(regionStartKey);
-            if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) {
+            if (Bytes.compareTo(fragmentStart, prevFragment.getStartKey().getBytes()) < 0) {
               prevFragment.setStartRow(fragmentStart);
             }
-            if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) {
+            if (Bytes.compareTo(fragmentStop, prevFragment.getEndKey().getBytes()) > 0) {
               prevFragment.setStopRow(fragmentStop);
             }
           } else {
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index f3cb9a5..17c413e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -453,8 +453,8 @@
   /**
    * Get Disk Ids by Volume Bytes
    */
-  private int[] getDiskIds(VolumeId[] volumeIds) {
-    int[] diskIds = new int[volumeIds.length];
+  private Integer[] getDiskIds(VolumeId[] volumeIds) {
+    Integer[] diskIds = new Integer[volumeIds.length];
     for (int i = 0; i < volumeIds.length; i++) {
       int diskId = -1;
       if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index 6d8443e..a6850c1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -92,7 +92,7 @@
       fis = new FileInputStream(file);
       channel = fis.getChannel();
       filePosition = startOffset = fragment.getStartKey();
-      endOffset = fragment.getStartKey() + fragment.getLength();
+      endOffset = fragment.getEndKey();
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", file size :" + channel.size()
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
deleted file mode 100644
index 958e1c6..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
+++ /dev/null
@@ -1,502 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.exception.TajoRuntimeException;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.BitArray;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Arrays;
-
-public class RowFile {
-  public static final Log LOG = LogFactory.getLog(RowFile.class);
-
-  private static final int SYNC_ESCAPE = -1;
-  private static final int SYNC_HASH_SIZE = 16;
-  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;
-  private final static int DEFAULT_BUFFER_SIZE = 65535;
-  public static int SYNC_INTERVAL;
-
-  public static class RowFileScanner extends FileScanner {
-    private FileSystem fs;
-    private FSDataInputStream in;
-    private Tuple tuple;
-
-    private byte[] sync = new byte[SYNC_HASH_SIZE];
-    private byte[] checkSync = new byte[SYNC_HASH_SIZE];
-    private long start, end;
-
-    private ByteBuffer buffer;
-    private final int tupleHeaderSize;
-    private BitArray nullFlags;
-    private long bufferStartPos;
-
-    public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment)
-        throws IOException {
-      super(conf, schema, meta, fragment);
-
-      SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
-          ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal) * SYNC_SIZE;
-
-      nullFlags = new BitArray(schema.size());
-      tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
-      this.start = this.fragment.getStartKey();
-      this.end = this.start + this.fragment.getLength();
-    }
-
-    public void init() throws IOException {
-      // set default page size.
-      fs = fragment.getPath().getFileSystem(conf);
-      in = fs.open(fragment.getPath());
-      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * schema.size());
-      buffer.flip();
-
-      readHeader();
-
-      // find the correct position from the start
-      if (this.start > in.getPos()) {
-        long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0;
-        in.seek(realStart);
-      }
-      bufferStartPos = in.getPos();
-      fillBuffer();
-
-      if (start != 0) {
-        // TODO: improve
-        boolean syncFound = false;
-        while (!syncFound) {
-          if (buffer.remaining() < SYNC_SIZE) {
-            fillBuffer();
-          }
-          buffer.mark();
-          syncFound = checkSync();
-          if (!syncFound) {
-            buffer.reset();
-            buffer.get(); // proceed one byte
-          }
-        }
-        bufferStartPos += buffer.position();
-        buffer.compact();
-        buffer.flip();
-      }
-
-      tuple = new VTuple(schema.size());
-
-      super.init();
-    }
-
-    private void readHeader() throws IOException {
-      SYNC_INTERVAL = in.readInt();
-      StorageUtil.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
-    }
-
-    /**
-     * Find the sync from the front of the buffer
-     *
-     * @return return true if it succeeds to find the sync.
-     * @throws java.io.IOException
-     */
-    private boolean checkSync() throws IOException {
-      buffer.getInt();                           // escape
-      buffer.get(checkSync, 0, SYNC_HASH_SIZE);  // sync
-      return Arrays.equals(checkSync, sync);
-    }
-
-    private int fillBuffer() throws IOException {
-      bufferStartPos += buffer.position();
-      buffer.compact();
-      int remain = buffer.remaining();
-      int read = in.read(buffer);
-      if (read == -1) {
-        buffer.flip();
-        return read;
-      } else {
-        int totalRead = read;
-        if (remain > totalRead) {
-          read = in.read(buffer);
-          totalRead += read > 0 ? read : 0;
-        }
-        buffer.flip();
-        return totalRead;
-      }
-    }
-
-    @Override
-    public Tuple next() throws IOException {
-      while (buffer.remaining() < SYNC_SIZE) {
-        if (fillBuffer() < 0) {
-          return null;
-        }
-      }
-
-      buffer.mark();
-      if (!checkSync()) {
-        buffer.reset();
-      } else {
-        if (bufferStartPos + buffer.position() > end) {
-          return null;
-        }
-      }
-
-      while (buffer.remaining() < tupleHeaderSize) {
-        if (fillBuffer() < 0) {
-          return null;
-        }
-      }
-
-      int i;
-
-      int nullFlagSize = buffer.getShort();
-      byte[] nullFlagBytes = new byte[nullFlagSize];
-      buffer.get(nullFlagBytes, 0, nullFlagSize);
-      nullFlags = new BitArray(nullFlagBytes);
-      int tupleSize = buffer.getShort();
-
-      while (buffer.remaining() < (tupleSize)) {
-        if (fillBuffer() < 0) {
-          return null;
-        }
-      }
-
-      Datum datum;
-      Column col;
-      for (i = 0; i < schema.size(); i++) {
-        if (!nullFlags.get(i)) {
-          col = schema.getColumn(i);
-          switch (col.getDataType().getType()) {
-            case BOOLEAN :
-              datum = DatumFactory.createBool(buffer.get());
-              tuple.put(i, datum);
-              break;
-
-            case BIT:
-              datum = DatumFactory.createBit(buffer.get());
-              tuple.put(i, datum );
-              break;
-
-            case CHAR :
-              int realLen = buffer.getInt();
-              byte[] buf = new byte[col.getDataType().getLength()];
-              buffer.get(buf);
-              byte[] charBuf = Arrays.copyOf(buf, realLen);
-              tuple.put(i, DatumFactory.createChar(charBuf));
-              break;
-
-            case INT2 :
-              datum = DatumFactory.createInt2(buffer.getShort());
-              tuple.put(i, datum );
-              break;
-
-            case INT4 :
-              datum = DatumFactory.createInt4(buffer.getInt());
-              tuple.put(i, datum );
-              break;
-
-            case INT8 :
-              datum = DatumFactory.createInt8(buffer.getLong());
-              tuple.put(i, datum );
-              break;
-
-            case FLOAT4 :
-              datum = DatumFactory.createFloat4(buffer.getFloat());
-              tuple.put(i, datum);
-              break;
-
-            case FLOAT8 :
-              datum = DatumFactory.createFloat8(buffer.getDouble());
-              tuple.put(i, datum);
-              break;
-
-            case TEXT:
-              short bytelen = buffer.getShort();
-              byte[] strbytes = new byte[bytelen];
-              buffer.get(strbytes, 0, bytelen);
-              datum = DatumFactory.createText(strbytes);
-              tuple.put(i, datum);
-              break;
-
-            case BLOB:
-              short bytesLen = buffer.getShort();
-              byte [] bytesBuf = new byte[bytesLen];
-              buffer.get(bytesBuf);
-              datum = DatumFactory.createBlob(bytesBuf);
-              tuple.put(i, datum);
-              break;
-
-            default:
-              break;
-          }
-        } else {
-          tuple.put(i, DatumFactory.createNullDatum());
-        }
-      }
-      return tuple;
-    }
-
-    @Override
-    public void reset() throws IOException {
-      init();
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (in != null) {
-        in.close();
-      }
-    }
-
-    @Override
-    public boolean isProjectable() {
-      return false;
-    }
-
-    @Override
-    public boolean isSelectable() {
-      return false;
-    }
-
-    @Override
-    public void setFilter(EvalNode filter) {
-      throw new TajoRuntimeException(new UnsupportedException());
-    }
-
-    @Override
-    public boolean isSplittable(){
-      return true;
-    }
-  }
-
-  public static class RowFileAppender extends FileAppender {
-    private FSDataOutputStream out;
-    private long lastSyncPos;
-    private FileSystem fs;
-    private byte[] sync;
-    private ByteBuffer buffer;
-
-    private BitArray nullFlags;
-    // statistics
-    private TableStatistics stats;
-    private ShuffleType shuffleType;
-
-    public RowFileAppender(Configuration conf, final TaskAttemptId taskAttemptId,
-                           final Schema schema, final TableMeta meta, final Path workDir)
-        throws IOException {
-      super(conf, taskAttemptId, schema, meta, workDir);
-    }
-
-    public void init() throws IOException {
-      SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
-          ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal);
-      fs = path.getFileSystem(conf);
-
-      if (!fs.exists(path.getParent())) {
-        throw new FileNotFoundException(path.toString());
-      }
-
-      if (fs.exists(path)) {
-        throw new AlreadyExistsStorageException(path);
-      }
-
-      sync = new byte[SYNC_HASH_SIZE];
-      lastSyncPos = 0;
-
-      out = fs.create(path);
-
-      MessageDigest md;
-      try {
-        md = MessageDigest.getInstance("MD5");
-        md.update((path.toString()+System.currentTimeMillis()).getBytes());
-        sync = md.digest();
-      } catch (NoSuchAlgorithmException e) {
-        LOG.error(e);
-      }
-
-      writeHeader();
-
-      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
-
-      nullFlags = new BitArray(schema.size());
-
-      if (tableStatsEnabled) {
-        this.stats = new TableStatistics(this.schema, columnStatsEnabled);
-        this.shuffleType = PlannerUtil.getShuffleType(
-            meta.getProperty(StorageConstants.SHUFFLE_TYPE,
-                PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE)));
-      }
-    }
-
-    private void writeHeader() throws IOException {
-      out.writeInt(SYNC_INTERVAL);
-      out.write(sync);
-      out.flush();
-      lastSyncPos = out.getPos();
-    }
-
-    @Override
-    public void addTuple(Tuple t) throws IOException {
-      checkAndWriteSync();
-      Column col;
-
-      buffer.clear();
-      nullFlags.clear();
-
-      for (int i = 0; i < schema.size(); i++) {
-        if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
-          // it is to calculate min/max values, and it is only used for the intermediate file.
-          stats.analyzeField(i, t);
-        }
-
-        if (t.isBlankOrNull(i)) {
-          nullFlags.set(i);
-        } else {
-          col = schema.getColumn(i);
-          switch (col.getDataType().getType()) {
-            case BOOLEAN:
-              buffer.put(t.getByte(i));
-              break;
-            case BIT:
-              buffer.put(t.getByte(i));
-              break;
-            case CHAR:
-              byte[] src = t.getBytes(i);
-              byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
-              buffer.putInt(src.length);
-              buffer.put(dst);
-              break;
-            case TEXT:
-              byte [] strbytes = t.getBytes(i);
-              buffer.putShort((short)strbytes.length);
-              buffer.put(strbytes, 0, strbytes.length);
-              break;
-            case INT2:
-              buffer.putShort(t.getInt2(i));
-              break;
-            case INT4:
-              buffer.putInt(t.getInt4(i));
-              break;
-            case INT8:
-              buffer.putLong(t.getInt8(i));
-              break;
-            case FLOAT4:
-              buffer.putFloat(t.getFloat4(i));
-              break;
-            case FLOAT8:
-              buffer.putDouble(t.getFloat8(i));
-              break;
-            case BLOB:
-              byte [] bytes = t.getBytes(i);
-              buffer.putShort((short)bytes.length);
-              buffer.put(bytes);
-              break;
-            case NULL_TYPE:
-              nullFlags.set(i);
-              break;
-            default:
-              break;
-          }
-        }
-      }
-
-      byte[] bytes = nullFlags.toArray();
-      out.writeShort(bytes.length);
-      out.write(bytes);
-
-      bytes = buffer.array();
-      int dataLen = buffer.position();
-      out.writeShort(dataLen);
-      out.write(bytes, 0, dataLen);
-
-      // Statistical section
-      if (tableStatsEnabled) {
-        stats.incrementRow();
-      }
-    }
-
-    @Override
-    public long getOffset() throws IOException {
-      return out.getPos();
-    }
-
-    @Override
-    public void flush() throws IOException {
-      out.flush();
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (out != null) {
-        if (tableStatsEnabled) {
-          stats.setNumBytes(out.getPos());
-        }
-        sync();
-        out.flush();
-        IOUtils.cleanup(LOG, out);
-      }
-    }
-
-    private void sync() throws IOException {
-      if (lastSyncPos != out.getPos()) {
-        out.writeInt(SYNC_ESCAPE);
-        out.write(sync);
-        lastSyncPos = out.getPos();
-      }
-    }
-
-    private void checkAndWriteSync() throws IOException {
-      if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
-        sync();
-      }
-    }
-
-    @Override
-    public TableStats getStats() {
-      if (tableStatsEnabled) {
-        return stats.getTableStat();
-      } else {
-        return null;
-      }
-    }
-  }
-}
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
index 8f18c7a..7bdf0cb 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
@@ -19,161 +19,61 @@
 package org.apache.tajo.storage.fragment;
 
 import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.BuiltinStorages;
-import org.apache.tajo.storage.StorageFragmentProtos.FileFragmentProto;
+import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable {
-  @Expose private String tableName; // required
-  @Expose private Path uri; // required
-  @Expose public Long startOffset; // required
-  @Expose public Long length; // required
-
-  private String[] hosts; // Datanode hostnames
-  @Expose private int[] diskIds;
-
-  public FileFragment(ByteString raw) throws InvalidProtocolBufferException {
-    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
-    builder.mergeFrom(raw);
-    builder.build();
-    init(builder.build());
-  }
+/**
+ * Fragment for file systems.
+ */
+public class FileFragment extends Fragment<Long> {
+  private Integer[] diskIds; // disk volume ids
 
   public FileFragment(String tableName, Path uri, BlockLocation blockLocation)
       throws IOException {
-    this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null);
+    this(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null);
   }
 
-  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, int[] diskIds) {
-    this.set(tableName, uri, start, length, hosts, diskIds);
-  }
-  // Non splittable
-  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) {
-    this.set(tableName, uri, start, length, hosts, null);
-  }
-
-  public FileFragment(String fragmentId, Path path, long start, long length) {
-    this.set(fragmentId, path, start, length, null, null);
-  }
-
-  public FileFragment(FileFragmentProto proto) {
-    init(proto);
-  }
-
-  private void init(FileFragmentProto proto) {
-    int[] diskIds = new int[proto.getDiskIdsList().size()];
-    int i = 0;
-    for(Integer eachValue: proto.getDiskIdsList()) {
-      diskIds[i++] = eachValue;
-    }
-    List<String> var = proto.getHostsList();
-    this.set(proto.getId(), new Path(proto.getPath()),
-        proto.getStartOffset(), proto.getLength(),
-            var.toArray(new String[var.size()]),
-        diskIds);
-  }
-
-  private void set(String tableName, Path path, long start,
-      long length, String[] hosts, int[] diskIds) {
-    this.tableName = tableName;
-    this.uri = path;
-    this.startOffset = start;
-    this.length = length;
-    this.hosts = hosts;
+  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, Integer[] diskIds) {
+    super(BuiltinFragmentKinds.FILE, uri.toUri(), tableName, start, start + length, length, hosts);
     this.diskIds = diskIds;
   }
 
+  // Non splittable
+  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) {
+    this(tableName, uri, start, length, hosts, null);
+  }
 
-  /**
-   * Get the list of hosts (hostname) hosting this block
-   */
-  public String[] getHosts() {
-    if (hosts == null) {
-      this.hosts = new String[0];
-    }
-    return hosts;
+  public FileFragment(String fragmentId, Path path, long start, long length) {
+    this(fragmentId, path, start, length, null, null);
   }
 
   /**
    * Get the list of Disk Ids
    * Unknown disk is -1. Others 0 ~ N
    */
-  public int[] getDiskIds() {
+  public Integer[] getDiskIds() {
     if (diskIds == null) {
-      this.diskIds = new int[getHosts().length];
-      Arrays.fill(this.diskIds, -1);
+      this.diskIds = new Integer[getHostNames().size()];
+      Arrays.fill(this.diskIds, DataLocation.UNKNOWN_VOLUME_ID);
     }
     return diskIds;
   }
 
-  public void setDiskIds(int[] diskIds){
+  public void setDiskIds(Integer[] diskIds){
     this.diskIds = diskIds;
   }
 
-  @Override
-  public String getTableName() {
-    return this.tableName;
-  }
-
   public Path getPath() {
-    return this.uri;
+    return new Path(uri);
   }
 
   public void setPath(Path path) {
-    this.uri = path;
-  }
-
-  public Long getStartKey() {
-    return this.startOffset;
-  }
-
-  @Override
-  public String getKey() {
-    return this.uri.toString();
-  }
-
-  @Override
-  public long getLength() {
-    return this.length;
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return this.length <= 0;
-  }
-  /**
-   * 
-   * The offset range of tablets <b>MUST NOT</b> be overlapped.
-   * 
-   * @param t
-   * @return If the table paths are not same, return -1.
-   */
-  @Override
-  public int compareTo(FileFragment t) {
-    if (getPath().equals(t.getPath())) {
-      long diff = this.getStartKey() - t.getStartKey();
-      if (diff < 0) {
-        return -1;
-      } else if (diff > 0) {
-        return 1;
-      } else {
-        return 0;
-      }
-    } else {
-      return -1;
-    }
+    this.uri = path.toUri();
   }
 
   @Override
@@ -191,48 +91,20 @@
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(tableName, uri, startOffset, length);
+    return Objects.hashCode(inputSourceId, uri, startKey, endKey, length, diskIds, hostNames);
   }
-  
+
+  @Override
   public Object clone() throws CloneNotSupportedException {
     FileFragment frag = (FileFragment) super.clone();
-    frag.tableName = tableName;
-    frag.uri = uri;
     frag.diskIds = diskIds;
-    frag.hosts = hosts;
-
     return frag;
   }
 
   @Override
   public String toString() {
-    return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": "
+    return "\"fragment\": {\"id\": \""+ inputSourceId +"\", \"path\": "
     		+getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": "
         + getLength() + "}" ;
   }
-
-  public FragmentProto getProto() {
-    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
-    builder.setId(this.tableName);
-    builder.setStartOffset(this.startOffset);
-    builder.setLength(this.length);
-    builder.setPath(this.uri.toString());
-    if(diskIds != null) {
-      List<Integer> idList = new ArrayList<>();
-      for(int eachId: diskIds) {
-        idList.add(eachId);
-      }
-      builder.addAllDiskIds(idList);
-    }
-
-    if(hosts != null) {
-      builder.addAllHosts(Arrays.asList(hosts));
-    }
-
-    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
-    fragmentBuilder.setId(this.tableName);
-    fragmentBuilder.setDataFormat(BuiltinStorages.TEXT);
-    fragmentBuilder.setContents(builder.buildPartial().toByteString());
-    return fragmentBuilder.build();
-  }
 }
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerde.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerde.java
new file mode 100644
index 0000000..926e5fe
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragmentSerde.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.fragment;
+
+import com.google.protobuf.GeneratedMessage.Builder;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.storage.StorageFragmentProtos.FileFragmentProto;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileFragmentSerde implements FragmentSerde<FileFragment, FileFragmentProto> {
+
+  @Override
+  public Builder newBuilder() {
+    return FileFragmentProto.newBuilder();
+  }
+
+  @Override
+  public FileFragmentProto serialize(FileFragment fragment) {
+    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
+    builder.setId(fragment.inputSourceId);
+    builder.setStartOffset(fragment.startKey);
+    builder.setLength(fragment.length);
+    builder.setPath(fragment.getPath().toString());
+    if(fragment.getDiskIds() != null) {
+      List<Integer> idList = new ArrayList<>();
+      for(int eachId: fragment.getDiskIds()) {
+        idList.add(eachId);
+      }
+      builder.addAllDiskIds(idList);
+    }
+
+    if(fragment.hostNames != null) {
+      builder.addAllHosts(fragment.hostNames);
+    }
+    return builder.build();
+  }
+
+  @Override
+  public FileFragment deserialize(FileFragmentProto proto) {
+    return new FileFragment(
+        proto.getId(),
+        new Path(proto.getPath()),
+        proto.getStartOffset(),
+        proto.getLength(),
+        proto.getHostsList().toArray(new String[proto.getHostsCount()]),
+        proto.getDiskIdsList().toArray(new Integer[proto.getDiskIdsCount()]));
+  }
+}
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
index 964a58a..a2688b1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -88,7 +88,7 @@
     }
 
     pos = startOffset = fragment.getStartKey();
-    end = startOffset + fragment.getLength();
+    end = fragment.getEndKey();
 
     if (codec != null) {
       fis = fs.open(fragment.getPath());
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index e112b0d..d52f46d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -291,7 +291,7 @@
       }
 
       startOffset = this.fragment.getStartKey();
-      endOffset = startOffset + fragment.getLength();
+      endOffset = this.fragment.getEndKey();
 
       errorTorrenceMaxNum =
           Integer.parseInt(meta.getProperty(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM));
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
index 58fea2b..a29e86b 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
@@ -99,7 +99,7 @@
     long rows = 0;
     long skippedRows = 0;
     long offset = fragment.getStartKey();
-    long maxOffset = fragment.getStartKey() + fragment.getLength();
+    long maxOffset = fragment.getEndKey();
     for(StripeInformation stripe: stripes) {
       long stripeStart = stripe.getOffset();
       if (offset > stripeStart) {
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
index ad45d18..42b0b23 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -159,14 +159,14 @@
       splits.addAll(space.getSplits("data", meta, schema, false, partitions.toArray(new Path[partitions.size()])));
       assertEquals(testCount, splits.size());
       // -1 is unknown volumeId
-      assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+      assertEquals(DataLocation.UNKNOWN_VOLUME_ID, ((FileFragment)splits.get(0)).getDiskIds()[0].intValue());
 
       splits.clear();
       splits.addAll(space.getSplits("data", meta, schema, false,
           partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
       assertEquals(testCount / 2, splits.size());
-      assertEquals(1, splits.get(0).getHosts().length);
-      assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+      assertEquals(1, splits.get(0).getHostNames().size());
+      assertEquals(DataLocation.UNKNOWN_VOLUME_ID, ((FileFragment)splits.get(0)).getDiskIds()[0].intValue());
 
       fs.close();
     } finally {
@@ -259,9 +259,9 @@
       splits.addAll(sm.getSplits("data", meta, schema, false, tablePath));
 
       assertEquals(testCount, splits.size());
-      assertEquals(2, splits.get(0).getHosts().length);
+      assertEquals(2, splits.get(0).getHostNames().size());
       assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
-      assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+      assertNotEquals(DataLocation.UNKNOWN_VOLUME_ID, ((FileFragment)splits.get(0)).getDiskIds()[0].intValue());
 
       fs.close();
     } finally {
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 0e6fde5..7a9b074 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
@@ -74,8 +75,8 @@
   @Parameters(name = "{index}: {0}")
   public static Collection<Object[]> generateParameters() {
     return Arrays.asList(new Object[][]{
-        {"RAW"},
-        {"TEXT"}
+        {BuiltinStorages.RAW},
+        {BuiltinStorages.TEXT}
     });
   }
 
@@ -128,7 +129,7 @@
     creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -152,7 +153,7 @@
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), keySchema, comp);
     reader.init();
     scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     for (int i = 0; i < TUPLE_NUM - 1; i++) {
@@ -233,7 +234,7 @@
         keySchema, comp);
     reader.init();
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     for (int i = 0; i < TUPLE_NUM - 1; i++) {
@@ -298,7 +299,7 @@
     creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -372,7 +373,7 @@
     creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -396,7 +397,7 @@
         keySchema, comp);
     reader.init();
     scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple result;
@@ -466,7 +467,7 @@
     creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -490,7 +491,7 @@
         keySchema, comp);
     reader.init();
     scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple result;
@@ -549,7 +550,7 @@
     creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -575,7 +576,7 @@
         keySchema, comp);
     reader.init();
     scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     tuple.put(0, DatumFactory.createInt8(0));
@@ -636,7 +637,7 @@
     creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -744,7 +745,7 @@
     creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -828,7 +829,7 @@
     creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -854,7 +855,7 @@
         keySchema, comp);
     reader.init();
     scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     for (int i = (TUPLE_NUM - 1); i > 0; i--) {
@@ -921,7 +922,7 @@
     creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -950,7 +951,7 @@
     assertEquals(comp, reader.getComparator());
 
     scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple result;
@@ -1023,7 +1024,7 @@
     creater.init();
 
     SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -1047,7 +1048,7 @@
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + dataFormat + ".idx"), keySchema, comp);
     reader.init();
     scanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat()).
-        getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        getSeekableScanner(meta, schema, tablet, schema);
     scanner.init();
 
     for (int i = 0; i < TUPLE_NUM - 1; i++) {
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 4f6b566..cca7920 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -114,7 +114,7 @@
     creater.init();
 
     SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
-        .getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        .getSeekableScanner(meta, schema, tablet, schema);
     fileScanner.init();
     Tuple keyTuple;
     long offset;
@@ -139,7 +139,7 @@
         "FindValueInCSV.idx"), keySchema, comp);
     reader.init();
     fileScanner = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
-        .getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        .getSeekableScanner(meta, schema, tablet, schema);
     fileScanner.init();
     for (int i = 0; i < TUPLE_NUM - 1; i++) {
       tuple.put(0, DatumFactory.createInt8(i));
@@ -206,7 +206,7 @@
     creater.init();
     
     SeekableScanner fileScanner  = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
-        .getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        .getSeekableScanner(meta, schema, tablet, schema);
     fileScanner.init();
     Tuple keyTuple;
     long offset;
@@ -228,7 +228,7 @@
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp);
     reader.init();
     fileScanner  = OldStorageManager.getStorageManager(conf, meta.getDataFormat())
-        .getSeekableScanner(meta, schema, tablet.getProto(), schema);
+        .getSeekableScanner(meta, schema, tablet, schema);
     fileScanner.init();
     Tuple result;
     for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
index 3283f9f..7ae58aa 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml
@@ -39,44 +39,28 @@
 
   <!--- Fragment Class Configurations -->
   <property>
-    <name>tajo.storage.fragment.text.class</name>
+    <name>tajo.storage.fragment.kind.file</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
   <property>
-    <name>tajo.storage.fragment.json.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+    <name>tajo.storage.fragment.kind.hbase</name>
+    <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
   </property>
   <property>
-    <name>tajo.storage.fragment.raw.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+    <name>tajo.storage.fragment.kind.jdbc</name>
+    <value>org.apache.tajo.storage.jdbc.JdbcFragment</value>
   </property>
   <property>
-    <name>tajo.storage.fragment.draw.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+    <name>tajo.storage.fragment.serde.file</name>
+    <value>org.apache.tajo.storage.fragment.FileFragmentSerde</value>
   </property>
   <property>
-    <name>tajo.storage.fragment.rcfile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+    <name>tajo.storage.fragment.serde.hbase</name>
+    <value>org.apache.tajo.storage.hbase.HBaseFragmentSerde</value>
   </property>
   <property>
-    <name>tajo.storage.fragment.row.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.parquet.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.orc.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.sequencefile.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
-  </property>
-  <property>
-    <name>tajo.storage.fragment.avro.class</name>
-    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+    <name>tajo.storage.fragment.serde.jdbc</name>
+    <value>org.apache.tajo.storage.jdbc.JdbcFragmentSerde</value>
   </property>
 
   <!--- Scanner Handler -->
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
index 0088504..20c1aca 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragment.java
@@ -18,90 +18,34 @@
 
 package org.apache.tajo.storage.jdbc;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.storage.fragment.BuiltinFragmentKinds;
 import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.jdbc.JdbcFragmentProtos.JdbcFragmentProto;
 
-import java.util.Arrays;
+import java.net.URI;
+import java.util.List;
 
-public class JdbcFragment implements Fragment, Comparable<JdbcFragment>, Cloneable {
-  String uri;
-  String inputSourceId;
-  String [] hostNames;
+/**
+ * Fragment for the systems which connects to Tajo via the JDBC interface.
+ */
+public class JdbcFragment extends Fragment<Long> {
 
-
-  public JdbcFragment(ByteString raw) throws InvalidProtocolBufferException {
-    JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder();
-    builder.mergeFrom(raw);
-    builder.build();
-    init(builder.build());
+  // TODO: set start and end keys properly
+  public JdbcFragment(String inputSourceId, URI uri) {
+    super(BuiltinFragmentKinds.JDBC, uri, inputSourceId, null, null, TajoConstants.UNKNOWN_LENGTH, extractHosts(uri));
   }
 
-  public JdbcFragment(String inputSourceId, String uri) {
-    this.inputSourceId = inputSourceId;
-    this.uri = uri;
-    this.hostNames = extractHosts(uri);
+  public JdbcFragment(String inputSourceId, URI uri, List<String> hostNames) {
+    super(BuiltinFragmentKinds.JDBC, uri, inputSourceId, null, null, TajoConstants.UNKNOWN_LENGTH,
+        hostNames.toArray(new String[hostNames.size()]));
   }
 
-  private void init(JdbcFragmentProto proto) {
-    this.uri = proto.getUri();
-    this.inputSourceId = proto.getInputSourceId();
-    this.hostNames = proto.getHostsList().toArray(new String [proto.getHostsCount()]);
-  }
-
-  private String [] extractHosts(String uri) {
+  private static String[] extractHosts(URI uri) {
     return new String[] {ConnectionInfo.fromURI(uri).host};
   }
 
   @Override
-  public String getTableName() {
-    return inputSourceId;
-  }
-
-  public String getUri() {
-    return uri;
-  }
-
-  @Override
-  public long getLength() {
-    return 0;
-  }
-
-  @Override
-  public String getKey() {
-    return null;
-  }
-
-  @Override
-  public String[] getHosts() {
-    return hostNames;
-  }
-
-  @Override
   public boolean isEmpty() {
     return false;
   }
-
-  @Override
-  public CatalogProtos.FragmentProto getProto() {
-    JdbcFragmentProto.Builder builder = JdbcFragmentProto.newBuilder();
-    builder.setInputSourceId(this.inputSourceId);
-    builder.setUri(this.uri);
-    if(hostNames != null) {
-      builder.addAllHosts(Arrays.asList(hostNames));
-    }
-
-    CatalogProtos.FragmentProto.Builder fragmentBuilder = CatalogProtos.FragmentProto.newBuilder();
-    fragmentBuilder.setId(this.inputSourceId);
-    fragmentBuilder.setDataFormat("JDBC");
-    fragmentBuilder.setContents(builder.buildPartial().toByteString());
-    return fragmentBuilder.build();
-  }
-
-  @Override
-  public int compareTo(JdbcFragment o) {
-    return this.uri.compareTo(o.uri);
-  }
 }
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragmentSerde.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragmentSerde.java
new file mode 100644
index 0000000..9e26d80
--- /dev/null
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcFragmentSerde.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.jdbc;
+
+import com.google.protobuf.GeneratedMessage.Builder;
+import org.apache.tajo.storage.fragment.FragmentSerde;
+import org.apache.tajo.storage.jdbc.JdbcFragmentProtos.JdbcFragmentProto;
+
+import java.net.URI;
+
+public class JdbcFragmentSerde implements FragmentSerde<JdbcFragment, JdbcFragmentProto> {
+
+  @Override
+  public Builder newBuilder() {
+    return JdbcFragmentProto.newBuilder();
+  }
+
+  @Override
+  public JdbcFragmentProto serialize(JdbcFragment fragment) {
+    return JdbcFragmentProto.newBuilder()
+        .setInputSourceId(fragment.getInputSourceId())
+        .setUri(fragment.getUri().toASCIIString())
+        .addAllHosts(fragment.getHostNames())
+        .build();
+  }
+
+  @Override
+  public JdbcFragment deserialize(JdbcFragmentProto proto) {
+    return new JdbcFragment(proto.getInputSourceId(), URI.create(proto.getUri()), proto.getHostsList());
+  }
+}
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
index a58b861..76ed3ab 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcScanner.java
@@ -258,7 +258,7 @@
   private ResultSetIterator executeQueryAndGetIter() {
     try {
       LOG.info("Generated SQL: " + generatedSql);
-      Connection conn = DriverManager.getConnection(fragment.uri, connProperties);
+      Connection conn = DriverManager.getConnection(fragment.getUri().toASCIIString(), connProperties);
       Statement statement = conn.createStatement();
       ResultSet resultset = statement.executeQuery(generatedSql);
       return new ResultSetIterator((resultset));
diff --git a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
index 7a630b2..c353831 100644
--- a/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
+++ b/tajo-storage/tajo-storage-jdbc/src/main/java/org/apache/tajo/storage/jdbc/JdbcTablespace.java
@@ -124,7 +124,7 @@
                                   TableDesc tableDesc,
                                   boolean requireSorted,
                                   @Nullable EvalNode filterCondition) throws IOException {
-    return Lists.newArrayList((Fragment)new JdbcFragment(inputSourceId, tableDesc.getUri().toASCIIString()));
+    return Lists.newArrayList((Fragment)new JdbcFragment(inputSourceId, tableDesc.getUri()));
   }
 
   @Override