TAJO-1928: Can't read parquet on hive meta.
diff --git a/CHANGES b/CHANGES
index 8863224..23ebd03 100644
--- a/CHANGES
+++ b/CHANGES
@@ -285,6 +285,8 @@
 
   BUG FIXES
 
+    TAJO-1928: Can't read parquet on hive meta. (jinho)
+
     TAJO-1926: Disable partition pruning using catalog temporarily. (jaehwa)
     
     TAJO-1924: Repair partition need to calculate partition volume. (jaehwa)
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
index 139e650..76256c5 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
@@ -27,11 +27,16 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.*;
@@ -46,6 +51,7 @@
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
 import org.apache.thrift.TException;
+import parquet.hadoop.ParquetOutputFormat;
 
 import java.io.IOException;
 import java.util.*;
@@ -54,9 +60,10 @@
   protected final Log LOG = LogFactory.getLog(getClass());
 
   private static String HIVE_WAREHOUSE_DIR_CONF_KEY = "hive.metastore.warehouse.dir";
+  private static final int CLIENT_POOL_SIZE = 2;
+  private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
 
   protected Configuration conf;
-  private static final int CLIENT_POOL_SIZE = 2;
   private final HiveCatalogStoreClientPool clientPool;
   private final String defaultTableSpaceUri;
   private final String catalogUri;
@@ -97,12 +104,27 @@
     return exist;
   }
 
+  protected org.apache.hadoop.hive.ql.metadata.Table getHiveTable(String databaseName, final String tableName)
+      throws UndefinedTableException {
+
+    HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+    try {
+      client = clientPool.getClient();
+      return HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName);
+    } catch (NoSuchObjectException nsoe) {
+      throw new UndefinedTableException(tableName);
+    } catch (Exception e) {
+      throw new TajoInternalError(e);
+    } finally {
+      if (client != null) client.release();
+    }
+  }
+
   @Override
   public final CatalogProtos.TableDescProto getTable(String databaseName, final String tableName)
       throws UndefinedTableException {
 
     org.apache.hadoop.hive.ql.metadata.Table table = null;
-    HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
     Path path = null;
     String dataFormat = null;
     org.apache.tajo.catalog.Schema schema = null;
@@ -115,21 +137,14 @@
     //////////////////////////////////
     try {
       // get hive table schema
-      try {
-        client = clientPool.getClient();
-        table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName);
-        path = table.getPath();
-      } catch (NoSuchObjectException nsoe) {
-        throw new UndefinedTableException(tableName);
-      } catch (Exception e) {
-        throw new TajoInternalError(e);
-      }
+      table = getHiveTable(databaseName, tableName);
+      path = table.getPath();
 
       // convert HiveCatalogStore field schema into tajo field schema.
       schema = new org.apache.tajo.catalog.Schema();
 
       List<FieldSchema> fieldSchemaList = table.getCols();
-      boolean isPartitionKey = false;
+      boolean isPartitionKey;
       for (FieldSchema eachField : fieldSchemaList) {
         isPartitionKey = false;
 
@@ -177,14 +192,13 @@
         }
         options.remove(serdeConstants.SERIALIZATION_NULL_FORMAT);
 
-        // set file output format
-        String fileOutputformat = properties.getProperty(hive_metastoreConstants.FILE_OUTPUT_FORMAT);
-        dataFormat = HiveCatalogUtil.getDataFormat(fileOutputformat);
 
-        if (dataFormat.equalsIgnoreCase("TEXT")) {
+        dataFormat = HiveCatalogUtil.getDataFormat(table.getSd());
+        if (BuiltinStorages.TEXT.equals(dataFormat)) {
           options.set(StorageConstants.TEXT_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter));
           options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava(nullFormat));
-        } else if (dataFormat.equals("RCFILE")) {
+
+        } else if (BuiltinStorages.RCFILE.equals(dataFormat)) {
           options.set(StorageConstants.RCFILE_NULL, StringEscapeUtils.escapeJava(nullFormat));
           String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB);
           if (LazyBinaryColumnarSerDe.class.getName().equals(serde)) {
@@ -192,7 +206,8 @@
           } else if (ColumnarSerDe.class.getName().equals(serde)) {
             options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
           }
-        } else if (dataFormat.equals("SEQUENCEFILE")) {
+
+        } else if (BuiltinStorages.SEQUENCE_FILE.equals(dataFormat)) {
           options.set(StorageConstants.SEQUENCEFILE_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter));
           options.set(StorageConstants.SEQUENCEFILE_NULL, StringEscapeUtils.escapeJava(nullFormat));
           String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB);
@@ -201,6 +216,7 @@
           } else if (LazySimpleSerDe.class.getName().equals(serde)) {
             options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
           }
+
         }
 
         // set data size
@@ -248,9 +264,8 @@
       }
     } catch (Throwable t) {
       throw new TajoInternalError(t);
-    } finally {
-      if(client != null) client.release();
     }
+
     TableMeta meta = new TableMeta(dataFormat, options);
     TableDesc tableDesc = new TableDesc(databaseName + "." + tableName, schema, meta, path.toUri());
     if (table.getTableType().equals(TableType.EXTERNAL_TABLE)) {
@@ -334,7 +349,7 @@
 
   @Override
   public void alterTablespace(CatalogProtos.AlterTablespaceProto alterProto) {
-    throw new TajoRuntimeException(new UnsupportedException("Tablespace in HiveMeta"));
+    // SKIP: HiveMeta does not support altering tablespaces; ignore the request instead of throwing.
   }
 
   @Override
@@ -435,21 +450,18 @@
       sd.getSerdeInfo().setParameters(new HashMap<String, String>());
       sd.getSerdeInfo().setName(table.getTableName());
 
-      // if tajo set location method, thrift client make exception as follows:
-      // Caused by: MetaException(message:java.lang.NullPointerException)
-      // If you want to modify table path, you have to modify on Hive cli.
-      if (tableDesc.isExternal()) {
-        table.setTableType(TableType.EXTERNAL_TABLE.name());
-        table.putToParameters("EXTERNAL", "TRUE");
+      // If the table type is MANAGED_TABLE, the location is the Hive warehouse directory,
+      // which becomes the wrong path during output commit. So tables are always registered as EXTERNAL.
+      table.setTableType(TableType.EXTERNAL_TABLE.name());
+      table.putToParameters("EXTERNAL", "TRUE");
 
-        Path tablePath = new Path(tableDesc.getUri());
-        FileSystem fs = tablePath.getFileSystem(conf);
-        if (fs.isFile(tablePath)) {
-          LOG.warn("A table path is a file, but HiveCatalogStore does not allow a file path.");
-          sd.setLocation(tablePath.getParent().toString());
-        } else {
-          sd.setLocation(tablePath.toString());
-        }
+      Path tablePath = new Path(tableDesc.getUri());
+      FileSystem fs = tablePath.getFileSystem(conf);
+      if (fs.isFile(tablePath)) {
+        LOG.warn("A table path is a file, but HiveCatalogStore does not allow a file path.");
+        sd.setLocation(tablePath.getParent().toString());
+      } else {
+        sd.setLocation(tablePath.toString());
       }
 
       // set column information
@@ -473,14 +485,15 @@
       }
 
       if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.RCFILE)) {
+        StorageFormatDescriptor descriptor = storageFormatFactory.get(IOConstants.RCFILE);
+        sd.setInputFormat(descriptor.getInputFormat());
+        sd.setOutputFormat(descriptor.getOutputFormat());
+
         String serde = tableDesc.getMeta().getOption(StorageConstants.RCFILE_SERDE);
-        sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName());
-        sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName());
         if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) {
-          sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName());
+          sd.getSerdeInfo().setSerializationLib(ColumnarSerDe.class.getName());
         } else {
-          sd.getSerdeInfo().setSerializationLib(
-              org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe.class.getName());
+          sd.getSerdeInfo().setSerializationLib(LazyBinaryColumnarSerDe.class.getName());
         }
 
         if (tableDesc.getMeta().getOptions().containsKey(StorageConstants.RCFILE_NULL)) {
@@ -488,9 +501,10 @@
               StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.RCFILE_NULL)));
         }
       } else if (tableDesc.getMeta().getDataFormat().equals(BuiltinStorages.TEXT)) {
-        sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
-        sd.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class.getName());
-        sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName());
+        // TextFileStorageFormatDescriptor refers to a deprecated output format class, so the class names are set directly.
+        sd.setInputFormat(TextInputFormat.class.getName());
+        sd.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getName());
+        sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());
 
         String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.TEXT_DELIMITER,
             StorageConstants.DEFAULT_FIELD_DELIMITER);
@@ -512,12 +526,14 @@
           table.getParameters().remove(StorageConstants.TEXT_NULL);
         }
       } else if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) {
+        StorageFormatDescriptor descriptor = storageFormatFactory.get(IOConstants.SEQUENCEFILE);
+        sd.setInputFormat(descriptor.getInputFormat());
+        sd.setOutputFormat(descriptor.getOutputFormat());
+
         String serde = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE);
-        sd.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName());
-        sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat.class.getName());
 
         if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) {
-          sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+          sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());
 
           String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_DELIMITER,
               StorageConstants.DEFAULT_FIELD_DELIMITER);
@@ -533,7 +549,7 @@
               StringEscapeUtils.unescapeJava(fieldDelimiter));
           table.getParameters().remove(StorageConstants.SEQUENCEFILE_DELIMITER);
         } else {
-          sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.class.getName());
+          sd.getSerdeInfo().setSerializationLib(LazyBinarySerDe.class.getName());
         }
 
         if (tableDesc.getMeta().containsOption(StorageConstants.SEQUENCEFILE_NULL)) {
@@ -541,14 +557,18 @@
               StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_NULL)));
           table.getParameters().remove(StorageConstants.SEQUENCEFILE_NULL);
         }
-      } else {
-        if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.PARQUET)) {
-          sd.setInputFormat(parquet.hive.DeprecatedParquetInputFormat.class.getName());
-          sd.setOutputFormat(parquet.hive.DeprecatedParquetOutputFormat.class.getName());
-          sd.getSerdeInfo().setSerializationLib(parquet.hive.serde.ParquetHiveSerDe.class.getName());
-        } else {
-          throw new UnsupportedException(tableDesc.getMeta().getDataFormat() + " in HivecatalogStore");
+      } else if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.PARQUET)) {
+        StorageFormatDescriptor descriptor = storageFormatFactory.get(IOConstants.PARQUET);
+        sd.setInputFormat(descriptor.getInputFormat());
+        sd.setOutputFormat(descriptor.getOutputFormat());
+        sd.getSerdeInfo().setSerializationLib(descriptor.getSerde());
+
+        if (tableDesc.getMeta().containsOption(ParquetOutputFormat.COMPRESSION)) {
+          table.putToParameters(ParquetOutputFormat.COMPRESSION,
+              tableDesc.getMeta().getOption(ParquetOutputFormat.COMPRESSION));
         }
+      } else {
+        throw new UnsupportedException(tableDesc.getMeta().getDataFormat() + " in HiveCatalogStore");
       }
 
       sd.setSortCols(new ArrayList<Order>());
@@ -1081,6 +1101,6 @@
 
   @Override
   public List<TablespaceProto> getTablespaces() {
-    throw new UnsupportedOperationException();
+    return Lists.newArrayList(getTablespace(TajoConstants.DEFAULT_TABLESPACE_NAME));
   }
 }
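
Note: the create-table path above now resolves input format, output format, and serde class names through Hive's StorageFormatFactory instead of hard-coding them per format. A minimal sketch of that lookup, assuming the Hive 0.14+ StorageFormatDescriptor API used in this patch; the class names in the comments are what Hive's built-in Parquet descriptor is expected to return:

    import org.apache.hadoop.hive.ql.io.IOConstants;
    import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
    import org.apache.hadoop.hive.ql.io.StorageFormatFactory;

    public class DescriptorLookup {
      public static void main(String[] args) {
        StorageFormatFactory factory = new StorageFormatFactory();

        // One descriptor per storage format; it bundles the class names that
        // createTable() writes into the Hive StorageDescriptor.
        StorageFormatDescriptor parquet = factory.get(IOConstants.PARQUET);
        System.out.println(parquet.getInputFormat());  // expected: MapredParquetInputFormat
        System.out.println(parquet.getOutputFormat()); // expected: MapredParquetOutputFormat
        System.out.println(parquet.getSerde());        // expected: ParquetHiveSerDe
      }
    }
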
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
index 9e1da2b..bbb7ade 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
@@ -20,17 +20,25 @@
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
-import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
-import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.tajo.BuiltinStorages;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.exception.*;
+import org.apache.tajo.exception.LMDNoMatchedDatatypeException;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnknownDataFormatException;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.thrift.TException;
-import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
 
 public class HiveCatalogUtil {
   public static void validateSchema(Table tblSchema) {
@@ -99,25 +107,38 @@
     }
   }
 
-  public static String getDataFormat(String fileFormat) {
-    Preconditions.checkNotNull(fileFormat);
+  public static String getDataFormat(StorageDescriptor descriptor) {
+    Preconditions.checkNotNull(descriptor);
 
-    String[] fileFormatArrary = fileFormat.split("\\.");
-    if(fileFormatArrary.length < 1) {
-      throw new TajoRuntimeException(new UnknownDataFormatException(fileFormat));
-    }
+    String serde = descriptor.getSerdeInfo().getSerializationLib();
+    String inputFormat = descriptor.getInputFormat();
 
-    String outputFormatClass = fileFormatArrary[fileFormatArrary.length-1];
-    if(outputFormatClass.equals(HiveIgnoreKeyTextOutputFormat.class.getSimpleName())) {
-      return BuiltinStorages.TEXT;
-    } else if(outputFormatClass.equals(HiveSequenceFileOutputFormat.class.getSimpleName())) {
-      return CatalogProtos.DataFormat.SEQUENCEFILE.name();
-    } else if(outputFormatClass.equals(RCFileOutputFormat.class.getSimpleName())) {
-      return CatalogProtos.DataFormat.RCFILE.name();
-    } else if(outputFormatClass.equals(DeprecatedParquetOutputFormat.class.getSimpleName())) {
-      return CatalogProtos.DataFormat.PARQUET.name();
+    if (LazySimpleSerDe.class.getName().equals(serde)) {
+      if (TextInputFormat.class.getName().equals(inputFormat)) {
+        return BuiltinStorages.TEXT;
+      } else if (SequenceFileInputFormat.class.getName().equals(inputFormat)) {
+        return BuiltinStorages.SEQUENCE_FILE;
+      } else {
+        throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
+      }
+    } else if (LazyBinarySerDe.class.getName().equals(serde)) {
+      if (SequenceFileInputFormat.class.getName().equals(inputFormat)) {
+        return BuiltinStorages.SEQUENCE_FILE;
+      } else {
+        throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
+      }
+    } else if (LazyBinaryColumnarSerDe.class.getName().equals(serde) || ColumnarSerDe.class.getName().equals(serde)) {
+      if (RCFileInputFormat.class.getName().equals(inputFormat)) {
+        return BuiltinStorages.RCFILE;
+      } else {
+        throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
+      }
+    } else if (ParquetHiveSerDe.class.getName().equals(serde)) {
+      return BuiltinStorages.PARQUET;
+    } else if (AvroSerDe.class.getName().equals(serde)) {
+      return BuiltinStorages.AVRO;
     } else {
-      throw new TajoRuntimeException(new UnknownDataFormatException(fileFormat));
+      throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
     }
   }
 
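
For reference, getDataFormat() now keys off the serde plus input format recorded in the metastore rather than parsing the output-format class name, so Parquet tables created by Hive (which register ParquetHiveSerDe) resolve correctly. A hedged probe of the new logic, assuming the Thrift-generated metastore API setters:

    import org.apache.hadoop.hive.metastore.api.SerDeInfo;
    import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
    import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
    import org.apache.tajo.catalog.store.HiveCatalogUtil;

    public class DataFormatProbe {
      public static void main(String[] args) {
        StorageDescriptor sd = new StorageDescriptor();
        SerDeInfo serdeInfo = new SerDeInfo();
        serdeInfo.setSerializationLib(ParquetHiveSerDe.class.getName());
        sd.setSerdeInfo(serdeInfo);

        // Parquet is identified by its serde alone; no input-format check is needed.
        System.out.println(HiveCatalogUtil.getDataFormat(sd)); // PARQUET
      }
    }
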
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
index af1b0b1..c695131 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
@@ -24,6 +24,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
@@ -59,9 +64,11 @@
 
   private static HiveCatalogStore store;
   private static Path warehousePath;
+  private static StorageFormatFactory formatFactory;
 
   @BeforeClass
   public static void setUp() throws Exception {
+    formatFactory = new StorageFormatFactory();
     Path testPath = CommonTestingUtil.getTestDir();
     warehousePath = new Path(testPath, "warehouse");
 
@@ -85,7 +92,7 @@
 
   @Test
   public void testTableUsingTextFile() throws Exception {
-    TableMeta meta = new TableMeta("TEXT", new KeyValueSet());
+    TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("c_custkey", TajoDataTypes.Type.INT4);
@@ -102,6 +109,12 @@
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, CUSTOMER));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.TEXTFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, CUSTOMER);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    // IgnoreKeyTextOutputFormat is deprecated; HiveIgnoreKeyTextOutputFormat is expected instead of the descriptor's output format
+    assertEquals(HiveIgnoreKeyTextOutputFormat.class.getName(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -119,7 +132,7 @@
   public void testTableUsingRCFileWithBinarySerde() throws Exception {
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
-    TableMeta meta = new TableMeta("RCFILE", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.RCFILE, options);
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
@@ -131,6 +144,11 @@
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, REGION));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.RCFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, REGION);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -148,7 +166,7 @@
   public void testTableUsingRCFileWithTextSerde() throws Exception {
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
-    TableMeta meta = new TableMeta("RCFILE", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.RCFILE, options);
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
@@ -160,6 +178,11 @@
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, REGION));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.RCFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, REGION);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -177,7 +200,7 @@
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.TEXT_DELIMITER, StringEscapeUtils.escapeJava("\u0002"));
     options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava("\u0003"));
-    TableMeta meta = new TableMeta("TEXT", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.TEXT, options);
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("s_suppkey", TajoDataTypes.Type.INT4);
@@ -194,6 +217,12 @@
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, SUPPLIER));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.TEXTFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, SUPPLIER);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    // IgnoreKeyTextOutputFormat is deprecated; HiveIgnoreKeyTextOutputFormat is expected instead of the descriptor's output format
+    assertEquals(HiveIgnoreKeyTextOutputFormat.class.getName(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, SUPPLIER));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -332,7 +361,7 @@
 
   @Test
   public void testGetAllTableNames() throws Exception{
-    TableMeta meta = new TableMeta("TEXT", new KeyValueSet());
+    TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("n_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4);
@@ -360,7 +389,7 @@
 
   @Test
   public void testDeleteTable() throws Exception {
-    TableMeta meta = new TableMeta("TEXT", new KeyValueSet());
+    TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("n_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4);
@@ -384,7 +413,7 @@
   public void testTableUsingSequenceFileWithBinarySerde() throws Exception {
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
-    TableMeta meta = new TableMeta("SEQUENCEFILE", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.SEQUENCE_FILE, options);
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
@@ -396,6 +425,11 @@
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, REGION));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.SEQUENCEFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, REGION);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -413,7 +447,7 @@
   public void testTableUsingSequenceFileWithTextSerde() throws Exception {
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
-    TableMeta meta = new TableMeta("SEQUENCEFILE", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.SEQUENCE_FILE, options);
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
@@ -425,6 +459,11 @@
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, REGION));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.SEQUENCEFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, REGION);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -457,6 +496,11 @@
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, CUSTOMER));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.PARQUET);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, CUSTOMER);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -472,7 +516,7 @@
   public void testDataTypeCompatibility() throws Exception {
     String tableName = CatalogUtil.normalizeIdentifier("testDataTypeCompatibility");
 
-    TableMeta meta = new TableMeta("TEXT", new KeyValueSet());
+    TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("col1", TajoDataTypes.Type.INT4);
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index ba0c37b..bb053cc 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -85,7 +85,7 @@
   public static final String ORC_STRIPE_SIZE = "orc.stripe.size";
   public static final String DEFAULT_ORC_STRIPE_SIZE = "67108864"; // 64MB
 
-  public static final String ORC_COMPRESSION_KIND = "orc.compression.kind";
+  public static final String ORC_COMPRESSION = "orc.compress";
   public static final String ORC_COMPRESSION_KIND_NONE = "none";
   public static final String ORC_COMPRESSION_KIND_SNAPPY = "snappy";
   public static final String ORC_COMPRESSION_KIND_LZO = "lzo";
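
The rename above makes Tajo's ORC compression option key match Hive's own "orc.compress" table property. A short usage sketch in the style of the tests earlier in this patch; the "ORC" data format name is an assumption:

    KeyValueSet options = new KeyValueSet();
    // "orc.compress" (formerly "orc.compression.kind") selects the ORC codec.
    options.set(StorageConstants.ORC_COMPRESSION, StorageConstants.ORC_COMPRESSION_KIND_SNAPPY);
    TableMeta meta = new TableMeta("ORC", options);
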
diff --git a/tajo-dist/src/main/conf/tajo-env.cmd b/tajo-dist/src/main/conf/tajo-env.cmd
index f005430..4040a4a 100644
--- a/tajo-dist/src/main/conf/tajo-env.cmd
+++ b/tajo-dist/src/main/conf/tajo-env.cmd
@@ -68,7 +68,7 @@
 @rem Tajo cluster mode. the default mode is standby mode.
 set TAJO_WORKER_STANDBY_MODE=true
 
-@rem It must be required to use HCatalogStore
+@rem Required to use HiveCatalogStore
 @rem set HIVE_HOME=
 @rem set HIVE_JDBC_DRIVER_DIR=
 
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
index 4544ed3..dbbf5a6 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
@@ -111,7 +111,7 @@
   }
 
   private CompressionKind getCompressionKind() {
-    String kindstr = meta.getOption(StorageConstants.ORC_COMPRESSION_KIND, StorageConstants.DEFAULT_ORC_COMPRESSION_KIND);
+    String kindstr = meta.getOption(StorageConstants.ORC_COMPRESSION, StorageConstants.DEFAULT_ORC_COMPRESSION_KIND);
 
     if (kindstr.equalsIgnoreCase(StorageConstants.ORC_COMPRESSION_KIND_ZIP)) {
       return CompressionKind.ZLIB;
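
The hunk above shows only the ZLIB branch of getCompressionKind(); the rest of the method is outside this diff. A plausible reconstruction of the remaining mapping, based on the ORC_COMPRESSION_KIND_* constants in StorageConstants — the branch order and the NONE fallback are assumptions, not the actual code:

    private CompressionKind getCompressionKind() {
      String kindstr = meta.getOption(StorageConstants.ORC_COMPRESSION,
          StorageConstants.DEFAULT_ORC_COMPRESSION_KIND);

      if (kindstr.equalsIgnoreCase(StorageConstants.ORC_COMPRESSION_KIND_ZIP)) {
        return CompressionKind.ZLIB;
      } else if (kindstr.equalsIgnoreCase(StorageConstants.ORC_COMPRESSION_KIND_SNAPPY)) {
        return CompressionKind.SNAPPY;
      } else if (kindstr.equalsIgnoreCase(StorageConstants.ORC_COMPRESSION_KIND_LZO)) {
        return CompressionKind.LZO;
      }
      return CompressionKind.NONE; // covers ORC_COMPRESSION_KIND_NONE and unknown values
    }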