DRILL-7357: Expose Drill Metastore data through information_schema

1. Add additional columns to TABLES and COLUMNS tables.
2. Add PARTITIONS table.
3. General refactoring to support information_schema data retrieval from multiple sources.

closes #1860
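
For illustration, a sketch of how the new metadata surfaces through plain queries once the Metastore holds entries for a table. The schema and table names ('dfs.tmp', 'nation') are illustrative, not part of this patch; column names follow the constants added to InfoSchemaConstants below:

public class InfoSchemaMetastoreQueries {
  // New TABLES columns exposing Metastore data.
  public static final String TABLES_QUERY =
      "select TABLE_NAME, TABLE_SOURCE, LOCATION, NUM_ROWS, LAST_MODIFIED_TIME " +
      "from INFORMATION_SCHEMA.`TABLES` where TABLE_SCHEMA = 'dfs.tmp'";

  // The new PARTITIONS table.
  public static final String PARTITIONS_QUERY =
      "select METADATA_KEY, METADATA_TYPE, METADATA_IDENTIFIER, PARTITION_COLUMN, " +
      "PARTITION_VALUE, LOCATION, LAST_MODIFIED_TIME " +
      "from INFORMATION_SCHEMA.`PARTITIONS` where TABLE_NAME = 'nation'";
}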
diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java
index cd619d1..9179c05 100644
--- a/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -93,6 +93,44 @@
     }
   }
 
+  public static boolean isDateTimeType(MajorType type) {
+    if (type.getMode() == REPEATED) {
+      return false;
+    }
+    return isDateTimeType(type.getMinorType());
+  }
+
+  public static boolean isDateTimeType(MinorType type) {
+    switch (type) {
+      case TIME:
+      case TIMETZ:
+      case DATE:
+      case TIMESTAMP:
+      case TIMESTAMPTZ:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  public static boolean isIntervalType(MajorType type) {
+    if (type.getMode() == REPEATED) {
+      return false;
+    }
+    return isIntervalType(type.getMinorType());
+  }
+
+  public static boolean isIntervalType(MinorType type) {
+    switch (type) {
+      case INTERVAL:
+      case INTERVALDAY:
+      case INTERVALYEAR:
+        return true;
+      default:
+        return false;
+    }
+  }
+
   /**
    * Returns true if specified type is decimal data type.
    *
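A minimal usage sketch for the new predicates (the REPEATED checks mirror the existing Types helpers; assumes the standard Types/TypeProtos factory methods):

import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;

public class TypesPredicatesSketch {
  public static void main(String[] args) {
    // MinorType overloads inspect the type alone.
    System.out.println(Types.isDateTimeType(MinorType.TIMESTAMP));   // true
    System.out.println(Types.isIntervalType(MinorType.INTERVALDAY)); // true
    System.out.println(Types.isIntervalType(MinorType.VARCHAR));     // false

    // MajorType overloads additionally return false for REPEATED (array) types.
    MajorType repeatedDate = Types.repeated(MinorType.DATE);
    System.out.println(Types.isDateTimeType(repeatedDate));          // false
  }
}
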
diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java b/common/src/test/java/org/apache/drill/test/DrillTest.java
index 2ea4932..54506e3 100644
--- a/common/src/test/java/org/apache/drill/test/DrillTest.java
+++ b/common/src/test/java/org/apache/drill/test/DrillTest.java
@@ -23,6 +23,7 @@
 import java.util.List;
 
 import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.common.util.GuavaPatcher;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -39,7 +40,9 @@
 public class DrillTest {
 
   protected static final ObjectMapper objectMapper;
+
   static {
+    GuavaPatcher.patch();
     System.setProperty("line.separator", "\n");
     objectMapper = new ObjectMapper();
   }
@@ -65,12 +68,12 @@
   @Rule public final ExpectedException thrownException = ExpectedException.none();
 
   @BeforeClass
-  public static void initDrillTest() throws Exception {
+  public static void initDrillTest() {
     memWatcher = new MemWatcher();
   }
 
   @AfterClass
-  public static void finiDrillTest() throws InterruptedException{
+  public static void finishDrillTest() {
     testReporter.info(String.format("Test Class done (%s): %s.", memWatcher.getMemString(true), className));
     // Clear interrupts for next test
     Thread.interrupted();
diff --git a/contrib/format-maprdb/pom.xml b/contrib/format-maprdb/pom.xml
index bff7b69..18aae77 100644
--- a/contrib/format-maprdb/pom.xml
+++ b/contrib/format-maprdb/pom.xml
@@ -44,20 +44,20 @@
         <version>${mapr-format-plugin.hbase.version}</version>
         <exclusions>
           <exclusion>
-              <groupId>io.netty</groupId>
-              <artifactId>netty</artifactId>
+            <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
           </exclusion>
           <exclusion>
-              <groupId>io.netty</groupId>
-              <artifactId>netty-all</artifactId>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
           </exclusion>
           <exclusion>
-              <groupId>log4j</groupId>
-              <artifactId>log4j</artifactId>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
           </exclusion>
           <exclusion>
-              <groupId>commons-logging</groupId>
-              <artifactId>commons-logging</artifactId>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
           </exclusion>
 
         </exclusions>
@@ -167,12 +167,12 @@
           <groupId>org.slf4j</groupId>
         </exclusion>
         <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
         </exclusion>
         <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
@@ -221,12 +221,12 @@
       <scope>test</scope>
       <exclusions>
         <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
         </exclusion>
         <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
@@ -242,12 +242,12 @@
           <groupId>org.slf4j</groupId>
         </exclusion>
         <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
         </exclusion>
         <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
@@ -259,12 +259,12 @@
       <scope>test</scope>
       <exclusions>
         <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty</artifactId>
+          <groupId>io.netty</groupId>
+          <artifactId>netty</artifactId>
         </exclusion>
         <exclusion>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-all</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
@@ -286,14 +286,14 @@
       <activation>
         <property><name>!tests</name></property>
       </activation>
+      <properties>
+        <excludedGroups>com.mapr.tests.annotations.ClusterTest</excludedGroups>
+      </properties>
       <build>
         <plugins>
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <excludedGroups>com.mapr.tests.annotations.ClusterTest</excludedGroups>
-            </configuration>
           </plugin>
         </plugins>
       </build>
@@ -304,14 +304,14 @@
       <activation>
         <property><name>tests</name><value>simple</value></property>
       </activation>
+      <properties>
+        <excludedGroups>com.mapr.tests.annotations.ClusterTest</excludedGroups>
+      </properties>
       <build>
         <plugins>
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <excludedGroups>com.mapr.tests.annotations.ClusterTest</excludedGroups>
-            </configuration>
           </plugin>
         </plugins>
       </build>
@@ -322,14 +322,14 @@
       <activation>
         <property><name>tests</name><value>cluster</value></property>
       </activation>
+      <properties>
+        <excludedGroups>com.mapr.tests.annotations.StressTest</excludedGroups>
+      </properties>
       <build>
         <plugins>
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <excludedGroups>com.mapr.tests.annotations.StressTest</excludedGroups>
-            </configuration>
           </plugin>
         </plugins>
       </build>
@@ -340,14 +340,14 @@
       <activation>
         <property><name>tests</name><value>stress</value></property>
       </activation>
+      <properties>
+        <excludedGroups>com.mapr.tests.annotations.IntegrationTest</excludedGroups>
+      </properties>
       <build>
         <plugins>
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <excludedGroups>com.mapr.tests.annotations.IntegrationTest</excludedGroups>
-            </configuration>
           </plugin>
         </plugins>
       </build>
@@ -376,14 +376,14 @@
              This must be the last profile to override the surefire configuration. -->
         <property><name>test</name></property>
       </activation>
+      <properties>
+        <excludedGroups>com.mapr.tests.annotations.AlwaysExclude</excludedGroups>
+      </properties>
       <build>
         <plugins>
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-surefire-plugin</artifactId>
-            <configuration>
-              <excludedGroups>com.mapr.tests.annotations.AlwaysExclude</excludedGroups>
-            </configuration>
           </plugin>
         </plugins>
       </build>
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index ea75fd2..a34a05a 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -238,7 +238,7 @@
 
   @Test // DRILL-4577
   public void showInfoSchema() throws Exception {
-    final String query = "select * \n" +
+    final String query = "select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE \n" +
         "from INFORMATION_SCHEMA.`TABLES` \n" +
         "where TABLE_SCHEMA like 'hive%'";
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/FilterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/FilterEvaluator.java
new file mode 100644
index 0000000..dc93734
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/FilterEvaluator.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.ischema;
+
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_ROOT_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_WORKSPACE_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TBLS_COL_TABLE_TYPE;
+
+/**
+ * Evaluates which information_schema data should be visited for the given condition.
+ * Evaluation can be done with or without a filter.
+ */
+public interface FilterEvaluator {
+
+  /**
+   * Visit the catalog. Drill has only one catalog.
+   *
+   * @return whether to continue exploring the contents of the catalog or not.
+   *         Contents are the schema tree.
+   */
+  boolean shouldVisitCatalog();
+
+  /**
+   * Visit the given schema.
+   *
+   * @param schemaName name of the schema
+   * @param schema schema object
+   * @return whether to continue exploring the contents of the schema or not.
+   *         Contents are tables within the schema.
+   */
+  boolean shouldVisitSchema(String schemaName, SchemaPlus schema);
+
+  /**
+   * Visit the given table in the schema.
+   *
+   * @param schemaName name of the schema
+   * @param tableName name of the table
+   * @param tableType type of the table
+   * @return whether to continue exploring the contents of the table or not.
+   *         Contents are table attributes and columns.
+   */
+  boolean shouldVisitTable(String schemaName, String tableName, Schema.TableType tableType);
+
+  /**
+   * Visit the given column in the schema and table.
+   *
+   * @param schemaName name of the schema
+   * @param tableName name of the table
+   * @param columnName name of the column
+   * @return whether to continue exploring the contents of the column or not.
+   *         Contents are column attributes.
+   */
+  boolean shouldVisitColumn(String schemaName, String tableName, String columnName);
+
+  /**
+   * Visit the files in the given schema.
+   *
+   * @param schemaName name of the schema
+   * @param schema schema object
+   * @return whether to continue exploring the files in the schema
+   */
+  boolean shouldVisitFiles(String schemaName, SchemaPlus schema);
+
+
+  /**
+   * Evaluates the necessity to visit certain types of information_schema data
+   * based on the schema type only, without applying any filter.
+   */
+  class NoFilterEvaluator implements FilterEvaluator {
+
+    public static final FilterEvaluator INSTANCE = new NoFilterEvaluator();
+
+    @Override
+    public boolean shouldVisitCatalog() {
+      return true;
+    }
+
+    @Override
+    public boolean shouldVisitSchema(String schemaName, SchemaPlus schema) {
+      if (schemaName == null || schemaName.isEmpty()) {
+        return false;
+      }
+
+      try {
+        AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+        return drillSchema.showInInformationSchema();
+      } catch (ClassCastException e) {
+        return false;
+      }
+    }
+
+    @Override
+    public boolean shouldVisitTable(String schemaName, String tableName, Schema.TableType tableType) {
+      return true;
+    }
+
+    @Override
+    public boolean shouldVisitColumn(String schemaName, String tableName, String columnName) {
+      return true;
+    }
+
+    @Override
+    public boolean shouldVisitFiles(String schemaName, SchemaPlus schema) {
+      try {
+        AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+        return drillSchema instanceof WorkspaceSchemaFactory.WorkspaceSchema;
+      } catch (ClassCastException e) {
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Evaluates the necessity to visit certain types of information_schema data using the provided filter.
+   */
+  class InfoSchemaFilterEvaluator extends NoFilterEvaluator {
+
+    private final InfoSchemaFilter filter;
+
+    public InfoSchemaFilterEvaluator(InfoSchemaFilter filter) {
+      this.filter = filter;
+    }
+
+    @Override
+    public boolean shouldVisitCatalog() {
+      Map<String, String> recordValues = ImmutableMap.of(CATS_COL_CATALOG_NAME, IS_CATALOG_NAME);
+
+      return filter.evaluate(recordValues) != InfoSchemaFilter.Result.FALSE;
+    }
+
+    @Override
+    public boolean shouldVisitSchema(String schemaName, SchemaPlus schema) {
+      if (!super.shouldVisitSchema(schemaName, schema)) {
+        return false;
+      }
+
+      Map<String, String> recordValues = ImmutableMap.of(
+        CATS_COL_CATALOG_NAME, IS_CATALOG_NAME,
+        SHRD_COL_TABLE_SCHEMA, schemaName,
+        SCHS_COL_SCHEMA_NAME, schemaName);
+
+      return filter.evaluate(recordValues) != InfoSchemaFilter.Result.FALSE;
+    }
+
+    @Override
+    public boolean shouldVisitTable(String schemaName, String tableName, Schema.TableType tableType) {
+      Map<String, String> recordValues = ImmutableMap.of(
+        CATS_COL_CATALOG_NAME, IS_CATALOG_NAME,
+        SHRD_COL_TABLE_SCHEMA, schemaName,
+        SCHS_COL_SCHEMA_NAME, schemaName,
+        SHRD_COL_TABLE_NAME, tableName,
+        TBLS_COL_TABLE_TYPE, tableType.toString());
+
+      return filter.evaluate(recordValues) != InfoSchemaFilter.Result.FALSE;
+    }
+
+    @Override
+    public boolean shouldVisitColumn(String schemaName, String tableName, String columnName) {
+      Map<String, String> recordValues = ImmutableMap.of(
+        CATS_COL_CATALOG_NAME, IS_CATALOG_NAME,
+        SHRD_COL_TABLE_SCHEMA, schemaName,
+        SCHS_COL_SCHEMA_NAME, schemaName,
+        SHRD_COL_TABLE_NAME, tableName,
+        COLS_COL_COLUMN_NAME, columnName);
+
+      return filter.evaluate(recordValues) != InfoSchemaFilter.Result.FALSE;
+    }
+
+    @Override
+    public boolean shouldVisitFiles(String schemaName, SchemaPlus schema) {
+      if (!super.shouldVisitFiles(schemaName, schema)) {
+        return false;
+      }
+
+      AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+      WorkspaceSchemaFactory.WorkspaceSchema wsSchema = (WorkspaceSchemaFactory.WorkspaceSchema) drillSchema;
+
+      Map<String, String> recordValues = ImmutableMap.of(
+        FILES_COL_SCHEMA_NAME, schemaName,
+        FILES_COL_ROOT_SCHEMA_NAME, wsSchema.getSchemaPath().get(0),
+        FILES_COL_WORKSPACE_NAME, wsSchema.getName());
+
+      return filter.evaluate(recordValues) != InfoSchemaFilter.Result.FALSE;
+    }
+  }
+}
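A sketch of the intended call pattern (an assumption about the caller, not code from this patch): when no filter was pushed down, the shared no-op instance is used; otherwise the pushed InfoSchemaFilter drives the visit decisions:

import org.apache.drill.exec.store.ischema.FilterEvaluator;
import org.apache.drill.exec.store.ischema.InfoSchemaFilter;

public class FilterEvaluatorSketch {
  // Choose an evaluator for a scan; filter is null when nothing was pushed down.
  public static FilterEvaluator evaluatorFor(InfoSchemaFilter filter) {
    return filter == null
        ? FilterEvaluator.NoFilterEvaluator.INSTANCE
        : new FilterEvaluator.InfoSchemaFilterEvaluator(filter);
  }
}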
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
index d116d54..400fb61 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
@@ -17,9 +17,6 @@
  */
 package org.apache.drill.exec.store.ischema;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -27,11 +24,16 @@
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.RecordReader;
 
+import java.util.Collections;
+import java.util.List;
+
 public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan> {
+
   @Override
   public ScanBatch getBatch(ExecutorFragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children)
       throws ExecutionSetupException {
-    RecordReader rr = config.getTable().getRecordReader(context.getFullRootSchema(), config.getFilter(), context.getOptions());
+    RecordReader rr = config.getTable().getRecordReader(context.getFullRootSchema(), config.getFilter(),
+      context.getOptions(), context.getMetastoreRegistry());
     return new ScanBatch(config, context, Collections.singletonList(rr));
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
index 317e608..035787e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConfig.java
@@ -34,5 +34,4 @@
   public boolean equals(Object o){
     return o instanceof InfoSchemaConfig;
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
index 4c07f30..f4370f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaConstants.java
@@ -19,87 +19,97 @@
 
 public interface InfoSchemaConstants {
 
-  /** Name of catalog containing information schema. */
+  /**
+   * Name of catalog containing information schema.
+   */
   String IS_CATALOG_NAME = "DRILL";
 
-  /** Catalog description */
+  /**
+   * Catalog description
+   */
   String IS_CATALOG_DESCRIPTION = "The internal metadata used by Drill";
 
-  /** Catalog connect string. Currently empty */
-   String IS_CATALOG_CONNECT = "";
+  /**
+   * Catalog connect string. Currently empty
+   */
+  String IS_CATALOG_CONNECT = "";
 
-  /** Name of information schema. */
-   String IS_SCHEMA_NAME = "information_schema";
+  /**
+   * Name of information schema.
+   */
+  String IS_SCHEMA_NAME = "information_schema";
 
   // CATALOGS column names:
-   String CATS_COL_CATALOG_CONNECT = "CATALOG_CONNECT";
-   String CATS_COL_CATALOG_DESCRIPTION = "CATALOG_DESCRIPTION";
-   String CATS_COL_CATALOG_NAME = "CATALOG_NAME";
+  String CATS_COL_CATALOG_CONNECT = "CATALOG_CONNECT";
+  String CATS_COL_CATALOG_DESCRIPTION = "CATALOG_DESCRIPTION";
+  String CATS_COL_CATALOG_NAME = "CATALOG_NAME";
 
   // SCHEMATA column names:
-   String SCHS_COL_CATALOG_NAME = "CATALOG_NAME";
-   String SCHS_COL_SCHEMA_NAME = "SCHEMA_NAME";
-   String SCHS_COL_SCHEMA_OWNER = "SCHEMA_OWNER";
-   String SCHS_COL_TYPE = "TYPE";
-   String SCHS_COL_IS_MUTABLE = "IS_MUTABLE";
+  String SCHS_COL_CATALOG_NAME = "CATALOG_NAME";
+  String SCHS_COL_SCHEMA_NAME = "SCHEMA_NAME";
+  String SCHS_COL_SCHEMA_OWNER = "SCHEMA_OWNER";
+  String SCHS_COL_TYPE = "TYPE";
+  String SCHS_COL_IS_MUTABLE = "IS_MUTABLE";
 
   // Common TABLES / VIEWS / COLUMNS columns names:
-   String SHRD_COL_TABLE_CATALOG = "TABLE_CATALOG";
-   String SHRD_COL_TABLE_SCHEMA = "TABLE_SCHEMA";
-   String SHRD_COL_TABLE_NAME = "TABLE_NAME";
+  String SHRD_COL_TABLE_CATALOG = "TABLE_CATALOG";
+  String SHRD_COL_TABLE_SCHEMA = "TABLE_SCHEMA";
+  String SHRD_COL_TABLE_NAME = "TABLE_NAME";
 
   // Remaining TABLES column names:
-   String TBLS_COL_TABLE_TYPE = "TABLE_TYPE";
+  String TBLS_COL_TABLE_TYPE = "TABLE_TYPE";
+  String TBLS_COL_TABLE_SOURCE = "TABLE_SOURCE";
+  String TBLS_COL_LOCATION = "LOCATION";
+  String TBLS_COL_NUM_ROWS = "NUM_ROWS";
+  String TBLS_COL_LAST_MODIFIED_TIME = "LAST_MODIFIED_TIME";
 
   // Remaining VIEWS column names:
-   String VIEWS_COL_VIEW_DEFINITION = "VIEW_DEFINITION";
-
-  // COLUMNS columns, from SQL standard:
-  // 1. TABLE_CATALOG
-  // 2. TABLE_SCHEMA
-  // 3. TABLE_NAME
-  // 4. COLUMN_NAME
-  // 5. ORDINAL_POSITION
-  // 6. COLUMN_DEFAULT
-  // 7. IS_NULLABLE
-  // 8. DATA_TYPE
-  // 9. CHARACTER_MAXIMUM_LENGTH
-  // 10. CHARACTER_OCTET_LENGTH
-  // 11. NUMERIC_PRECISION
-  // 12. NUMERIC_PRECISION_RADIX
-  // 13. NUMERIC_SCALE
-  // 14. DATETIME_PRECISION
-  // 15. INTERVAL_TYPE
-  // 16. INTERVAL_PRECISION
-  // 17. CHARACTER_SET_CATALOG ...
+  String VIEWS_COL_VIEW_DEFINITION = "VIEW_DEFINITION";
 
   // Remaining COLUMNS column names:
-   String COLS_COL_COLUMN_NAME = "COLUMN_NAME";
-   String COLS_COL_ORDINAL_POSITION = "ORDINAL_POSITION";
-   String COLS_COL_COLUMN_DEFAULT = "COLUMN_DEFAULT";
-   String COLS_COL_IS_NULLABLE = "IS_NULLABLE";
-   String COLS_COL_DATA_TYPE = "DATA_TYPE";
-   String COLS_COL_CHARACTER_MAXIMUM_LENGTH = "CHARACTER_MAXIMUM_LENGTH";
-   String COLS_COL_CHARACTER_OCTET_LENGTH = "CHARACTER_OCTET_LENGTH";
-   String COLS_COL_NUMERIC_PRECISION = "NUMERIC_PRECISION";
-   String COLS_COL_NUMERIC_PRECISION_RADIX = "NUMERIC_PRECISION_RADIX";
-   String COLS_COL_NUMERIC_SCALE = "NUMERIC_SCALE";
-   String COLS_COL_DATETIME_PRECISION = "DATETIME_PRECISION";
-   String COLS_COL_INTERVAL_TYPE = "INTERVAL_TYPE";
-   String COLS_COL_INTERVAL_PRECISION = "INTERVAL_PRECISION";
+  String COLS_COL_COLUMN_NAME = "COLUMN_NAME";
+  String COLS_COL_ORDINAL_POSITION = "ORDINAL_POSITION";
+  String COLS_COL_COLUMN_DEFAULT = "COLUMN_DEFAULT";
+  String COLS_COL_IS_NULLABLE = "IS_NULLABLE";
+  String COLS_COL_DATA_TYPE = "DATA_TYPE";
+  String COLS_COL_COLUMN_SIZE = "COLUMN_SIZE";
+  String COLS_COL_CHARACTER_MAXIMUM_LENGTH = "CHARACTER_MAXIMUM_LENGTH";
+  String COLS_COL_CHARACTER_OCTET_LENGTH = "CHARACTER_OCTET_LENGTH";
+  String COLS_COL_NUMERIC_PRECISION = "NUMERIC_PRECISION";
+  String COLS_COL_NUMERIC_PRECISION_RADIX = "NUMERIC_PRECISION_RADIX";
+  String COLS_COL_NUMERIC_SCALE = "NUMERIC_SCALE";
+  String COLS_COL_DATETIME_PRECISION = "DATETIME_PRECISION";
+  String COLS_COL_INTERVAL_TYPE = "INTERVAL_TYPE";
+  String COLS_COL_INTERVAL_PRECISION = "INTERVAL_PRECISION";
+  String COLS_COL_COLUMN_FORMAT = "COLUMN_FORMAT";
+  String COLS_COL_NUM_NULLS = "NUM_NULLS";
+  String COLS_COL_MIN_VAL = "MIN_VAL";
+  String COLS_COL_MAX_VAL = "MAX_VAL";
+  String COLS_COL_NDV = "NDV";
+  String COLS_COL_EST_NUM_NON_NULLS = "EST_NUM_NON_NULLS";
+  String COLS_COL_IS_NESTED = "IS_NESTED";
 
   // FILES column names:
-   String FILES_COL_SCHEMA_NAME = SCHS_COL_SCHEMA_NAME;
-   String FILES_COL_ROOT_SCHEMA_NAME = "ROOT_SCHEMA_NAME";
-   String FILES_COL_WORKSPACE_NAME = "WORKSPACE_NAME";
-   String FILES_COL_FILE_NAME = "FILE_NAME";
-   String FILES_COL_RELATIVE_PATH = "RELATIVE_PATH";
-   String FILES_COL_IS_DIRECTORY = "IS_DIRECTORY";
-   String FILES_COL_IS_FILE = "IS_FILE";
-   String FILES_COL_LENGTH = "LENGTH";
-   String FILES_COL_OWNER = "OWNER";
-   String FILES_COL_GROUP = "GROUP";
-   String FILES_COL_PERMISSION = "PERMISSION";
-   String FILES_COL_ACCESS_TIME = "ACCESS_TIME";
-   String FILES_COL_MODIFICATION_TIME = "MODIFICATION_TIME";
+  String FILES_COL_SCHEMA_NAME = SCHS_COL_SCHEMA_NAME;
+  String FILES_COL_ROOT_SCHEMA_NAME = "ROOT_SCHEMA_NAME";
+  String FILES_COL_WORKSPACE_NAME = "WORKSPACE_NAME";
+  String FILES_COL_FILE_NAME = "FILE_NAME";
+  String FILES_COL_RELATIVE_PATH = "RELATIVE_PATH";
+  String FILES_COL_IS_DIRECTORY = "IS_DIRECTORY";
+  String FILES_COL_IS_FILE = "IS_FILE";
+  String FILES_COL_LENGTH = "LENGTH";
+  String FILES_COL_OWNER = "OWNER";
+  String FILES_COL_GROUP = "GROUP";
+  String FILES_COL_PERMISSION = "PERMISSION";
+  String FILES_COL_ACCESS_TIME = "ACCESS_TIME";
+  String FILES_COL_MODIFICATION_TIME = "MODIFICATION_TIME";
+
+  // Remaining PARTITIONS column names:
+  String PARTITIONS_COL_METADATA_KEY = "METADATA_KEY";
+  String PARTITIONS_COL_METADATA_TYPE = "METADATA_TYPE";
+  String PARTITIONS_COL_METADATA_IDENTIFIER = "METADATA_IDENTIFIER";
+  String PARTITIONS_COL_PARTITION_COLUMN = "PARTITION_COLUMN";
+  String PARTITIONS_COL_PARTITION_VALUE = "PARTITION_VALUE";
+  String PARTITIONS_COL_LOCATION = "LOCATION";
+  String PARTITIONS_COL_LAST_MODIFIED_TIME = "LAST_MODIFIED_TIME";
 }
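As orientation, the full PARTITIONS column set can be listed from the constants; that PARTITIONS also carries the shared TABLE_CATALOG/TABLE_SCHEMA/TABLE_NAME columns is an assumption read from the "Remaining ... column names" convention above:

import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.*;

public class PartitionsColumnsSketch {
  // Shared columns first, then the PARTITIONS-specific ones declared above.
  public static final String[] PARTITIONS_COLUMNS = {
      SHRD_COL_TABLE_CATALOG, SHRD_COL_TABLE_SCHEMA, SHRD_COL_TABLE_NAME,
      PARTITIONS_COL_METADATA_KEY, PARTITIONS_COL_METADATA_TYPE,
      PARTITIONS_COL_METADATA_IDENTIFIER, PARTITIONS_COL_PARTITION_COLUMN,
      PARTITIONS_COL_PARTITION_VALUE, PARTITIONS_COL_LOCATION,
      PARTITIONS_COL_LAST_MODIFIED_TIME
  };
}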
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaDrillTable.java
index 5aa31a7..bf84345 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaDrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaDrillTable.java
@@ -25,7 +25,6 @@
 import org.apache.drill.exec.util.ImpersonationUtil;
 
 public class InfoSchemaDrillTable extends DrillTable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaDrillTable.class);
 
   private final InfoSchemaTableType table;
 
@@ -38,5 +37,4 @@
   public RelDataType getRowType(RelDataTypeFactory typeFactory) {
     return table.getRowType(typeFactory);
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java
index 95d1336..25bbc7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilter.java
@@ -17,19 +17,18 @@
  */
 package org.apache.drill.exec.store.ischema;
 
-import static org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.store.ischema.InfoSchemaFilter.ExprNode.Type;
+import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
 
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 
-import org.apache.drill.exec.store.ischema.InfoSchemaFilter.ExprNode.Type;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
+import static org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike;
 
 @JsonTypeName("info-schema-filter")
 public class InfoSchemaFilter {
@@ -121,13 +120,14 @@
   public enum Result {
     TRUE,
     FALSE,
-    INCONCLUSIVE;
+    INCONCLUSIVE
   }
 
   /**
    * Evaluate the filter for given <COLUMN NAME, VALUE> pairs.
-   * @param recordValues
-   * @return
+   *
+   * @param recordValues map of field names and their values
+   * @return evaluation result
    */
   @JsonIgnore
   public Result evaluate(Map<String, String> recordValues) {
@@ -144,12 +144,12 @@
   }
 
   private Result evaluateHelperFunction(Map<String, String> recordValues, FunctionExprNode exprNode) {
-    switch(exprNode.function) {
+    switch (exprNode.function) {
       case "like": {
         FieldExprNode col = (FieldExprNode) exprNode.args.get(0);
         ConstantExprNode pattern = (ConstantExprNode) exprNode.args.get(1);
         ConstantExprNode escape = exprNode.args.size() > 2 ? (ConstantExprNode) exprNode.args.get(2) : null;
-        final String fieldValue = recordValues.get(col.field.toString());
+        final String fieldValue = recordValues.get(col.field);
         if (fieldValue != null) {
           if (escape == null) {
             return Pattern.matches(sqlToRegexLike(pattern.value).getJavaPatternString(), fieldValue) ?
@@ -169,7 +169,7 @@
         FieldExprNode arg0 = (FieldExprNode) exprNode.args.get(0);
         ConstantExprNode arg1 = (ConstantExprNode) exprNode.args.get(1);
 
-        final String value = recordValues.get(arg0.field.toString());
+        final String value = recordValues.get(arg0.field);
         if (value != null) {
           if (exprNode.function.equals("equal")) {
             return arg1.value.equals(value) ? Result.TRUE : Result.FALSE;
@@ -220,7 +220,7 @@
       case "in": {
         FieldExprNode col = (FieldExprNode) exprNode.args.get(0);
         List<ExprNode> args = exprNode.args.subList(1, exprNode.args.size());
-        final String fieldValue = recordValues.get(col.field.toString());
+        final String fieldValue = recordValues.get(col.field);
         if (fieldValue != null) {
           for(ExprNode arg: args) {
             if (fieldValue.equals(((ConstantExprNode) arg).value)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
index d9ae339..7624788 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaFilterBuilder.java
@@ -17,16 +17,6 @@
  */
 package org.apache.drill.exec.store.ischema;
 
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_ROOT_SCHEMA_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_WORKSPACE_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
-
 import org.apache.drill.common.expression.BooleanOperator;
 import org.apache.drill.common.expression.CastExpression;
 import org.apache.drill.common.expression.FieldReference;
@@ -39,17 +29,27 @@
 import org.apache.drill.exec.store.ischema.InfoSchemaFilter.ExprNode;
 import org.apache.drill.exec.store.ischema.InfoSchemaFilter.FieldExprNode;
 import org.apache.drill.exec.store.ischema.InfoSchemaFilter.FunctionExprNode;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 import java.util.List;
 
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_ROOT_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_WORKSPACE_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
+
 /**
  * Builds a InfoSchemaFilter out of the Filter condition. Currently we look only for certain conditions. Mainly
  * conditions involving columns "CATALOG_NAME, "TABLE_NAME", "SCHEMA_NAME", "TABLE_SCHEMA" and "COLUMN_NAME", and
  * functions EQUAL, NOT EQUAL, LIKE, OR and AND.
  */
 public class InfoSchemaFilterBuilder extends AbstractExprVisitor<ExprNode, Void, RuntimeException> {
-  private final LogicalExpression filter;
 
+  private final LogicalExpression filter;
   private boolean isAllExpressionsConverted = true;
 
   public InfoSchemaFilterBuilder(LogicalExpression filter) {
@@ -72,7 +72,7 @@
   @Override
   public ExprNode visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
     final String funcName = call.getName().toLowerCase();
-    switch(funcName) {
+    switch (funcName) {
       case "equal":
       case "not equal":
       case "notequal":
@@ -103,7 +103,7 @@
         List<ExprNode> args = Lists.newArrayList();
         for(LogicalExpression arg : call.args) {
           ExprNode exprNode = arg.accept(this, value);
-          if (exprNode != null && exprNode instanceof FunctionExprNode) {
+          if (exprNode instanceof FunctionExprNode) {
             args.add(exprNode);
           }
         }
@@ -118,7 +118,7 @@
         List<ExprNode> args = Lists.newArrayList();
         for(LogicalExpression arg : call.args) {
           ExprNode exprNode = arg.accept(this, value);
-          if (exprNode != null && exprNode instanceof FunctionExprNode) {
+          if (exprNode instanceof FunctionExprNode) {
             args.add(exprNode);
           } else {
             return visitUnknown(call, value);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
index c18b7f3..7b44a4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
@@ -17,12 +17,11 @@
  */
 package org.apache.drill.exec.store.ischema;
 
-import java.util.List;
-
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -30,15 +29,12 @@
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.physical.base.SubScan;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
+import java.util.List;
+
 @JsonTypeName("info-schema")
-public class InfoSchemaGroupScan extends AbstractGroupScan{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaGroupScan.class);
+public class InfoSchemaGroupScan extends AbstractGroupScan {
 
   private final InfoSchemaTableType table;
   private final InfoSchemaFilter filter;
@@ -52,7 +48,7 @@
   @JsonCreator
   public InfoSchemaGroupScan(@JsonProperty("table") InfoSchemaTableType table,
                              @JsonProperty("filter") InfoSchemaFilter filter) {
-    super((String)null);
+    super((String) null);
     this.table = table;
     this.filter = filter;
   }
@@ -75,12 +71,12 @@
   }
 
   @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
     Preconditions.checkArgument(endpoints.size() == 1);
   }
 
   @Override
-  public SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException {
+  public SubScan getSpecificScan(int minorFragmentId) {
     Preconditions.checkArgument(minorFragmentId == 0);
     return new InfoSchemaSubScan(table, filter);
   }
@@ -101,8 +97,8 @@
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
-    return new InfoSchemaGroupScan (this);
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    return new InfoSchemaGroupScan(this);
   }
 
   @Override
@@ -112,8 +108,7 @@
 
   @Override
   public GroupScan clone(List<SchemaPath> columns) {
-    InfoSchemaGroupScan  newScan = new InfoSchemaGroupScan (this);
-    return newScan;
+    return new InfoSchemaGroupScan(this);
   }
 
   public void setFilterPushedDown(boolean status) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaPushFilterIntoRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaPushFilterIntoRecordGenerator.java
index 44d2394..2ce0cc1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaPushFilterIntoRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaPushFilterIntoRecordGenerator.java
@@ -17,6 +17,10 @@
  */
 package org.apache.drill.exec.store.ischema;
 
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
@@ -27,11 +31,6 @@
 import org.apache.drill.exec.planner.physical.ProjectPrel;
 import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptRuleOperand;
-import org.apache.calcite.rex.RexNode;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 
 public abstract class InfoSchemaPushFilterIntoRecordGenerator extends StoragePluginOptimizerRule {
@@ -43,15 +42,15 @@
 
         @Override
         public boolean matches(RelOptRuleCall call) {
-          final ScanPrel scan = (ScanPrel) call.rel(2);
+          final ScanPrel scan = call.rel(2);
           GroupScan groupScan = scan.getGroupScan();
           return groupScan instanceof InfoSchemaGroupScan;
         }
 
         @Override
         public void onMatch(RelOptRuleCall call) {
-          final FilterPrel filterRel = (FilterPrel) call.rel(0);
-          final ProjectPrel projectRel = (ProjectPrel) call.rel(1);
+          final FilterPrel filterRel = call.rel(0);
+          final ProjectPrel projectRel = call.rel(1);
           final ScanPrel scanRel = call.rel(2);
           doMatch(call, scanRel, projectRel, filterRel);
         }
@@ -63,15 +62,15 @@
 
         @Override
         public boolean matches(RelOptRuleCall call) {
-          final ScanPrel scan = (ScanPrel) call.rel(1);
+          final ScanPrel scan = call.rel(1);
           GroupScan groupScan = scan.getGroupScan();
           return groupScan instanceof InfoSchemaGroupScan;
         }
 
         @Override
         public void onMatch(RelOptRuleCall call) {
-          final FilterPrel filterRel = (FilterPrel) call.rel(0);
-          final ScanPrel scanRel = (ScanPrel) call.rel(1);
+          final FilterPrel filterRel = call.rel(0);
+          final ScanPrel scanRel = call.rel(1);
           doMatch(call, scanRel, null, filterRel);
         }
       };
@@ -83,7 +82,7 @@
   protected void doMatch(RelOptRuleCall call, ScanPrel scan, ProjectPrel project, FilterPrel filter) {
     final RexNode condition = filter.getCondition();
 
-    InfoSchemaGroupScan groupScan = (InfoSchemaGroupScan)scan.getGroupScan();
+    InfoSchemaGroupScan groupScan = (InfoSchemaGroupScan) scan.getGroupScan();
     if (groupScan.isFilterPushedDown()) {
       return;
     }
@@ -93,7 +92,7 @@
     InfoSchemaFilterBuilder filterBuilder = new InfoSchemaFilterBuilder(conditionExp);
     InfoSchemaFilter infoSchemaFilter = filterBuilder.build();
     if (infoSchemaFilter == null) {
-      return; //no filter pushdown ==> No transformation.
+      return; // no filter push down => no transformation.
     }
 
     final InfoSchemaGroupScan newGroupsScan = new InfoSchemaGroupScan(groupScan.getTable(), infoSchemaFilter);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
index 2197f1a..70e110a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaRecordGenerator.java
@@ -17,220 +17,37 @@
  */
 package org.apache.drill.exec.store.ischema;
 
-import static org.apache.drill.exec.planner.types.DrillRelDataTypeSystem.DRILL_REL_DATATYPE_SYSTEM;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_ROOT_SCHEMA_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_SCHEMA_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_WORKSPACE_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_CONNECT;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCRIPTION;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TBLS_COL_TABLE_TYPE;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.schema.Schema.TableType;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Table;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.planner.logical.DrillViewInfoProvider;
-import org.apache.drill.exec.server.options.OptionManager;
-import org.apache.drill.exec.store.AbstractSchema;
-import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
-import org.apache.drill.exec.store.ischema.InfoSchemaFilter.Result;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.exec.util.FileSystemUtil;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
- * Generates records for POJO RecordReader by scanning the given schema. At every level (catalog, schema, table, field),
+ * Generates records for POJO RecordReader by scanning the given schema. At every level (catalog, schema, table, field, partition, file),
  * level specific object is visited and decision is taken to visit the contents of the object. Object here is catalog,
- * schema, table or field.
+ * schema, table, field, partition or file.
  */
 public abstract class InfoSchemaRecordGenerator<S> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaRecordGenerator.class);
-  protected InfoSchemaFilter filter;
 
-  protected OptionManager optionManager;
-  public InfoSchemaRecordGenerator(OptionManager optionManager) {
-    this.optionManager = optionManager;
+  protected List<S> records = new ArrayList<>();
+
+  private final FilterEvaluator filterEvaluator;
+  private final List<RecordCollector> recordCollectors = new ArrayList<>();
+
+  public InfoSchemaRecordGenerator(FilterEvaluator filterEvaluator) {
+    this.filterEvaluator = filterEvaluator;
   }
 
-  public void setInfoSchemaFilter(InfoSchemaFilter filter) {
-    this.filter = filter;
+  public void registerRecordCollector(RecordCollector recordCollector) {
+    recordCollectors.add(recordCollector);
   }
 
-  /**
-   * Visit the catalog. Drill has only one catalog.
-   *
-   * @return Whether to continue exploring the contents of the catalog or not. Contents are schema/schema tree.
-   */
-  public boolean visitCatalog() {
-    return true;
-  }
-
-  /**
-   * Visit the given schema.
-   *
-   * @param schemaName Name of the schema
-   * @param schema Schema object
-   * @return Whether to continue exploring the contents of the schema or not. Contents are tables within the schema.
-   */
-  public boolean visitSchema(String schemaName, SchemaPlus schema) {
-    return true;
-  }
-
-  /**
-   * Visit the given table.
-   *
-   * @param schemaName Name of the schema where the table is present
-   * @param tableName Name of the table
-   * @param table Table object
-   * @return Whether to continue exploring the contents of the table or not. Contents are fields within the table.
-   */
-  public boolean visitTable(String schemaName, String tableName, Table table) {
-    return true;
-  }
-
-  /**
-   * Visit the given field.
-   *
-   * @param schemaName Schema where the table of the field is present
-   * @param tableName Table name
-   * @param field Field object
-   */
-  public void visitField(String schemaName, String tableName, RelDataTypeField field) {
-  }
-
-  public void visitFiles(String schemaName, SchemaPlus schema) {
-  }
-
-  protected boolean shouldVisitCatalog() {
-    if (filter == null) {
-      return true;
-    }
-
-    final Map<String, String> recordValues = ImmutableMap.of(CATS_COL_CATALOG_NAME, IS_CATALOG_NAME);
-
-    // If the filter evaluates to false then we don't need to visit the catalog.
-    // For other two results (TRUE, INCONCLUSIVE) continue to visit the catalog.
-    return filter.evaluate(recordValues) != Result.FALSE;
-  }
-
-  protected boolean shouldVisitSchema(String schemaName, SchemaPlus schema) {
-    try {
-      // if the schema path is null or empty (try for root schema)
-      if (schemaName == null || schemaName.isEmpty()) {
-        return false;
-      }
-
-      AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
-      if (!drillSchema.showInInformationSchema()) {
-        return false;
-      }
-
-      if (filter == null) {
-        return true;
-      }
-
-      final Map<String, String> recordValues =
-          ImmutableMap.of(
-              CATS_COL_CATALOG_NAME, IS_CATALOG_NAME,
-              SHRD_COL_TABLE_SCHEMA, schemaName,
-              SCHS_COL_SCHEMA_NAME, schemaName);
-
-      // If the filter evaluates to false then we don't need to visit the schema.
-      // For other two results (TRUE, INCONCLUSIVE) continue to visit the schema.
-      return filter.evaluate(recordValues) != Result.FALSE;
-    } catch(ClassCastException e) {
-      // ignore and return true as this is not a Drill schema
-    }
-    return true;
-  }
-
-  protected boolean shouldVisitTable(String schemaName, String tableName, TableType tableType) {
-    if (filter == null) {
-      return true;
-    }
-
-    final Map<String, String> recordValues =
-        ImmutableMap.of(
-            CATS_COL_CATALOG_NAME, IS_CATALOG_NAME,
-            SHRD_COL_TABLE_SCHEMA, schemaName,
-            SCHS_COL_SCHEMA_NAME, schemaName,
-            SHRD_COL_TABLE_NAME, tableName,
-            TBLS_COL_TABLE_TYPE, tableType.toString());
-
-    // If the filter evaluates to false then we don't need to visit the table.
-    // For other two results (TRUE, INCONCLUSIVE) continue to visit the table.
-    return filter.evaluate(recordValues) != Result.FALSE;
-  }
-
-  protected boolean shouldVisitColumn(String schemaName, String tableName, String columnName) {
-    if (filter == null) {
-      return true;
-    }
-
-    final Map<String, String> recordValues =
-        ImmutableMap.of(
-            CATS_COL_CATALOG_NAME, IS_CATALOG_NAME,
-            SHRD_COL_TABLE_SCHEMA, schemaName,
-            SCHS_COL_SCHEMA_NAME, schemaName,
-            SHRD_COL_TABLE_NAME, tableName,
-            COLS_COL_COLUMN_NAME, columnName);
-
-    // If the filter evaluates to false then we don't need to visit the column.
-    // For other two results (TRUE, INCONCLUSIVE) continue to visit the column.
-    return filter.evaluate(recordValues) != Result.FALSE;
-  }
-
-  protected boolean shouldVisitFiles(String schemaName, SchemaPlus schemaPlus) {
-    if (filter == null) {
-      return true;
-    }
-
-    AbstractSchema schema;
-    try {
-      schema = schemaPlus.unwrap(AbstractSchema.class);
-    } catch (ClassCastException e) {
-      return false;
-    }
-
-    if (!(schema instanceof WorkspaceSchemaFactory.WorkspaceSchema)) {
-      return false;
-    }
-
-    WorkspaceSchemaFactory.WorkspaceSchema wsSchema = (WorkspaceSchemaFactory.WorkspaceSchema) schema;
-
-    Map<String, String> recordValues = new HashMap<>();
-    recordValues.put(FILES_COL_SCHEMA_NAME, schemaName);
-    recordValues.put(FILES_COL_ROOT_SCHEMA_NAME, wsSchema.getSchemaPath().get(0));
-    recordValues.put(FILES_COL_WORKSPACE_NAME, wsSchema.getName());
-
-    return filter.evaluate(recordValues) != Result.FALSE;
-  }
-
-  public abstract PojoRecordReader<S> getRecordReader();
-
   public void scanSchema(SchemaPlus root) {
-    if (shouldVisitCatalog() && visitCatalog()) {
+    records = new ArrayList<>(); // reset on new scan
+    if (filterEvaluator.shouldVisitCatalog()) {
       scanSchema(root.getName(), root);
     }
   }
@@ -240,54 +57,34 @@
    * @param  schemaPath  the path to the given schema, so far
    * @param  schema  the given schema
    */
-  private void scanSchema(String schemaPath, SchemaPlus schema) {
-
-    // Recursively scan any subschema.
+  protected void scanSchema(String schemaPath, SchemaPlus schema) {
+    // Recursively scan any sub-schema
     for (String name: schema.getSubSchemaNames()) {
       scanSchema(schemaPath +
-          ("".equals(schemaPath) ? "" : ".") + // If we have an empty schema path, then don't insert a leading dot.
-          name, schema.getSubSchema(name));
+        ("".equals(schemaPath) ? "" : ".") + // If we have an empty schema path, then don't insert a leading dot.
+        name, schema.getSubSchema(name));
     }
 
-    // Visit this schema and if requested ...
-    if (shouldVisitSchema(schemaPath, schema) && visitSchema(schemaPath, schema)) {
-      visitTables(schemaPath, schema);
-    }
-
-    if (shouldVisitFiles(schemaPath, schema)) {
-      visitFiles(schemaPath, schema);
+    if (filterEvaluator.shouldVisitSchema(schemaPath, schema)) {
+      visit(schemaPath, schema);
     }
   }
 
-  /**
-   * Visit the tables in the given schema. The
-   * @param  schemaPath  the path to the given schema
-   * @param  schema  the given schema
-   */
-  public void visitTables(String schemaPath, SchemaPlus schema) {
-    final AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
-    for (Pair<String, ? extends Table> tableNameToTable : drillSchema.getTablesByNames(schema.getTableNames())) {
-      final String tableName = tableNameToTable.getKey();
-      final Table table = tableNameToTable.getValue();
-      final TableType tableType = table.getJdbcTableType();
-      // Visit the table, and if requested ...
-      if(shouldVisitTable(schemaPath, tableName, tableType) && visitTable(schemaPath, tableName, table)) {
-        // ... do for each of the table's fields.
-        final RelDataType tableRow = table.getRowType(new JavaTypeFactoryImpl(DRILL_REL_DATATYPE_SYSTEM));
-        for (RelDataTypeField field: tableRow.getFieldList()) {
-          if (shouldVisitColumn(schemaPath, tableName, field.getName())) {
-            visitField(schemaPath, tableName, field);
-          }
-        }
-      }
-    }
+  protected final void visit(String schemaPath, SchemaPlus schema) {
+    records.addAll(recordCollectors.parallelStream()
+      .map(recordCollector -> collect(recordCollector, schemaPath, schema))
+      .flatMap(Collection::stream)
+      .collect(Collectors.toList()));
   }
 
+  public abstract PojoRecordReader<S> getRecordReader();
+
+  protected abstract List<S> collect(RecordCollector recordCollector, String schemaPath, SchemaPlus schema);
+
   public static class Catalogs extends InfoSchemaRecordGenerator<Records.Catalog> {
-    List<Records.Catalog> records = ImmutableList.of();
 
-    public Catalogs(OptionManager optionManager) {
-      super(optionManager);
+    public Catalogs(FilterEvaluator filterEvaluator) {
+      super(filterEvaluator);
     }
 
     @Override
@@ -296,17 +93,20 @@
     }
 
     @Override
-    public boolean visitCatalog() {
-      records = ImmutableList.of(new Records.Catalog(IS_CATALOG_NAME, IS_CATALOG_DESCRIPTION, IS_CATALOG_CONNECT));
-      return false;
+    protected List<Records.Catalog> collect(RecordCollector recordCollector, String schemaPath, SchemaPlus schema) {
+      return recordCollector.catalogs(schemaPath, schema);
+    }
+
+    @Override
+    protected void scanSchema(String schemaPath, SchemaPlus schema) {
+      visit(schemaPath, schema);
     }
   }
 
   public static class Schemata extends InfoSchemaRecordGenerator<Records.Schema> {
-    List<Records.Schema> records = Lists.newArrayList();
 
-    public Schemata(OptionManager optionManager) {
-      super(optionManager);
+    public Schemata(FilterEvaluator filterEvaluator) {
+      super(filterEvaluator);
     }
 
     @Override
@@ -315,19 +115,15 @@
     }
 
     @Override
-    public boolean visitSchema(String schemaName, SchemaPlus schema) {
-      AbstractSchema as = schema.unwrap(AbstractSchema.class);
-      records.add(new Records.Schema(IS_CATALOG_NAME, schemaName, "<owner>",
-                                     as.getTypeName(), as.isMutable()));
-      return false;
+    protected List<Records.Schema> collect(RecordCollector recordCollector, String schemaPath, SchemaPlus schema) {
+      return recordCollector.schemas(schemaPath, schema);
     }
   }
 
   public static class Tables extends InfoSchemaRecordGenerator<Records.Table> {
-    List<Records.Table> records = Lists.newArrayList();
 
-    public Tables(OptionManager optionManager) {
-      super(optionManager);
+    public Tables(FilterEvaluator filterEvaluator) {
+      super(filterEvaluator);
     }
 
     @Override
@@ -336,38 +132,15 @@
     }
 
     @Override
-    public void visitTables(String schemaPath, SchemaPlus schema) {
-      schema.unwrap(AbstractSchema.class).getTableNamesAndTypes()
-          .forEach(nameAndType -> attemptVisitTableWithType(schemaPath, nameAndType.getKey(), nameAndType.getValue()));
-    }
-
-    private void attemptVisitTableWithType(final String schemaName, final String tableName,
-                                           final TableType type) {
-      // Visit the table if requested ...
-      if (shouldVisitTable(schemaName, tableName, type)) {
-        records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName, type.toString()));
-      }
-    }
-
-    @Override
-    public boolean visitTable(String schemaName, String tableName, Table table) {
-      Preconditions.checkNotNull(table, "Error. Table %s.%s provided is null.", schemaName, tableName);
-
-      // skip over unknown table types
-      if (table.getJdbcTableType() != null) {
-        records.add(new Records.Table(IS_CATALOG_NAME, schemaName, tableName,
-            table.getJdbcTableType().toString()));
-      }
-
-      return false;
+    protected List<Records.Table> collect(RecordCollector recordCollector, String schemaPath, SchemaPlus schema) {
+      return recordCollector.tables(schemaPath, schema);
     }
   }
 
   public static class Views extends InfoSchemaRecordGenerator<Records.View> {
-    List<Records.View> records = Lists.newArrayList();
 
-    public Views(OptionManager optionManager) {
-      super(optionManager);
+    public Views(FilterEvaluator filterEvaluator) {
+      super(filterEvaluator);
     }
 
     @Override
@@ -376,20 +149,15 @@
     }
 
     @Override
-    public boolean visitTable(String schemaName, String tableName, Table table) {
-      if (table.getJdbcTableType() == TableType.VIEW) {
-        // View's SQL may not be available for some non-Drill views, for example, JDBC view
-        records.add(new Records.View(IS_CATALOG_NAME, schemaName, tableName,
-            table instanceof DrillViewInfoProvider ? ((DrillViewInfoProvider) table).getViewSql() : ""));
-      }
-      return false;
+    protected List<Records.View> collect(RecordCollector recordCollector, String schemaPath, SchemaPlus schema) {
+      return recordCollector.views(schemaPath, schema);
     }
   }
 
   public static class Columns extends InfoSchemaRecordGenerator<Records.Column> {
-    List<Records.Column> records = Lists.newArrayList();
-    public Columns(OptionManager optionManager) {
-      super(optionManager);
+
+    public Columns(FilterEvaluator filterEvaluator) {
+      super(filterEvaluator);
     }
 
     @Override
@@ -398,17 +166,32 @@
     }
 
     @Override
-    public void visitField(String schemaName, String tableName, RelDataTypeField field) {
-      records.add(new Records.Column(IS_CATALOG_NAME, schemaName, tableName, field));
+    protected List<Records.Column> collect(RecordCollector recordCollector, String schemaPath, SchemaPlus schema) {
+      return recordCollector.columns(schemaPath, schema);
+    }
+  }
+
+  public static class Partitions extends InfoSchemaRecordGenerator<Records.Partition> {
+
+    public Partitions(FilterEvaluator filterEvaluator) {
+      super(filterEvaluator);
+    }
+
+    @Override
+    public PojoRecordReader<Records.Partition> getRecordReader() {
+      return new PojoRecordReader<>(Records.Partition.class, records);
+    }
+
+    @Override
+    protected List<Records.Partition> collect(RecordCollector recordCollector, String schemaPath, SchemaPlus schema) {
+      return recordCollector.partitions(schemaPath, schema);
     }
   }
 
   public static class Files extends InfoSchemaRecordGenerator<Records.File> {
 
-    List<Records.File> records = new ArrayList<>();
-
-    public Files(OptionManager optionManager) {
-      super(optionManager);
+    public Files(FilterEvaluator filterEvaluator) {
+      super(filterEvaluator);
     }
 
     @Override
@@ -417,23 +200,8 @@
     }
 
     @Override
-    public void visitFiles(String schemaName, SchemaPlus schemaPlus) {
-      try {
-        AbstractSchema schema = schemaPlus.unwrap(AbstractSchema.class);
-        if (schema instanceof WorkspaceSchemaFactory.WorkspaceSchema) {
-          WorkspaceSchemaFactory.WorkspaceSchema wsSchema = (WorkspaceSchemaFactory.WorkspaceSchema) schema;
-          String defaultLocation = wsSchema.getDefaultLocation();
-          FileSystem fs = wsSchema.getFS();
-          boolean recursive = optionManager.getBoolean(ExecConstants.LIST_FILES_RECURSIVELY);
-          // add URI to the path to ensure that directory objects are skipped (see S3AFileSystem.listStatus method)
-          FileSystemUtil.listAllSafe(fs, new Path(fs.getUri().toString(), defaultLocation), recursive).forEach(
-              fileStatus -> records.add(new Records.File(schemaName, wsSchema, fileStatus))
-          );
-        }
-      } catch (ClassCastException | UnsupportedOperationException e) {
-        // ignore the exception since either this is not a Drill schema or schema does not support files listing
-      }
+    protected List<Records.File> collect(RecordCollector recordCollector, String schemaPath, SchemaPlus schema) {
+      return recordCollector.files(schemaPath, schema);
     }
   }
-
 }
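The collector-based design above replaces the old visitor overrides: each generator subclass only picks which RecordCollector method feeds it, while the base class merges per-schema results from every registered collector. A minimal self-contained sketch of that pattern — Collector and Generator below are illustrative stand-ins, not the Drill classes:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.stream.Collectors;

    // Illustrative stand-in for RecordCollector: one source of records per schema.
    interface Collector<S> {
      List<S> collect(String schemaPath);
    }

    // Illustrative stand-in for InfoSchemaRecordGenerator.
    abstract class Generator<S> {
      private final List<Collector<S>> collectors = new ArrayList<>();
      protected final List<S> records = new ArrayList<>();

      void register(Collector<S> collector) {
        collectors.add(collector);
      }

      // Mirrors visit(schemaPath, schema): every registered collector runs for
      // the same schema path (in parallel) and the results are merged.
      protected void visit(String schemaPath) {
        records.addAll(collectors.parallelStream()
            .map(collector -> collector.collect(schemaPath))
            .flatMap(Collection::stream)
            .collect(Collectors.toList()));
      }
    }

With this split, adding a new data source (such as the Metastore) means registering one more collector rather than touching each generator subclass.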
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index 08608a6..7fbb3ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -17,18 +17,8 @@
  */
 package org.apache.drill.exec.store.ischema;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.schema.Table;
-
-import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
-
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -37,10 +27,19 @@
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
-
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.SystemPlugin;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_SCHEMA_NAME;
 
 @SystemPlugin
 public class InfoSchemaStoragePlugin extends AbstractStoragePlugin {
@@ -52,7 +51,7 @@
     this(InfoSchemaConfig.INSTANCE, context, InfoSchemaConstants.IS_SCHEMA_NAME);
   }
 
-  public InfoSchemaStoragePlugin(InfoSchemaConfig config, DrillbitContext context, String name){
+  public InfoSchemaStoragePlugin(InfoSchemaConfig config, DrillbitContext context, String name) {
     super(context, name);
     this.config = config;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
index 6aa762f..4a0bba5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaSubScan.java
@@ -17,14 +17,12 @@
  */
 package org.apache.drill.exec.store.ischema;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.exec.physical.base.AbstractSubScan;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public class InfoSchemaSubScan extends AbstractSubScan{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaSubScan.class);
+public class InfoSchemaSubScan extends AbstractSubScan {
 
   private final InfoSchemaTableType table;
   private final InfoSchemaFilter filter;
@@ -51,5 +49,4 @@
   public int getOperatorType() {
     return CoreOperatorType.INFO_SCHEMA_SUB_SCAN_VALUE;
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
index e09942e..f2228ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTable.java
@@ -17,22 +17,41 @@
  */
 package org.apache.drill.exec.store.ischema;
 
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_CONNECT;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_DESCRIPTION;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.CATS_COL_CATALOG_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_CHARACTER_MAXIMUM_LENGTH;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_CHARACTER_OCTET_LENGTH;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_DEFAULT;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_FORMAT;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_COLUMN_SIZE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_DATA_TYPE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_DATETIME_PRECISION;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_EST_NUM_NON_NULLS;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_INTERVAL_PRECISION;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_INTERVAL_TYPE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_IS_NULLABLE;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_MAX_VAL;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_MIN_VAL;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_NDV;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_NUMERIC_PRECISION;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_NUMERIC_PRECISION_RADIX;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_NUMERIC_SCALE;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_NUM_NULLS;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_ORDINAL_POSITION;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.COLS_COL_IS_NESTED;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_ACCESS_TIME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_FILE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_GROUP;
@@ -46,6 +65,13 @@
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_ROOT_SCHEMA_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_SCHEMA_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.FILES_COL_WORKSPACE_NAME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.PARTITIONS_COL_LAST_MODIFIED_TIME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.PARTITIONS_COL_LOCATION;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.PARTITIONS_COL_METADATA_IDENTIFIER;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.PARTITIONS_COL_METADATA_KEY;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.PARTITIONS_COL_METADATA_TYPE;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.PARTITIONS_COL_PARTITION_COLUMN;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.PARTITIONS_COL_PARTITION_VALUE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_CATALOG_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_IS_MUTABLE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SCHS_COL_SCHEMA_NAME;
@@ -54,44 +80,24 @@
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_CATALOG;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_NAME;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TBLS_COL_LAST_MODIFIED_TIME;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TBLS_COL_LOCATION;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TBLS_COL_NUM_ROWS;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TBLS_COL_TABLE_SOURCE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.TBLS_COL_TABLE_TYPE;
 import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.VIEWS_COL_VIEW_DEFINITION;
 
-import java.util.List;
-
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.server.options.OptionManager;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
 /**
- * Base class for tables in INFORMATION_SCHEMA.  Defines the table (fields and types).
+ * Base class for tables in INFORMATION_SCHEMA. Defines the table (fields and types).
  */
 public abstract class InfoSchemaTable<S> {
 
-  public static class Field {
-    public String name;
-    public MajorType type;
-
-    public static Field create(String name, MajorType type) {
-      Field field = new Field();
-      field.name = name;
-      field.type = type;
-      return field;
-    }
-  }
-
   public static final MajorType INT = Types.required(MinorType.INT);
   public static final MajorType BIGINT = Types.required(MinorType.BIGINT);
   public static final MajorType VARCHAR = Types.required(MinorType.VARCHAR);
   public static final MajorType BIT = Types.required(MinorType.BIT);
   public static final MajorType TIMESTAMP = Types.required(MinorType.TIMESTAMP);
+  public static final MajorType FLOAT8 = Types.required(MinorType.FLOAT8);
 
   private final List<Field> fields;
 
@@ -100,13 +106,12 @@
   }
 
   public RelDataType getRowType(RelDataTypeFactory typeFactory) {
-
-    // Convert the array of Drill types to an array of Optiq types
-    List<RelDataType> relTypes = Lists.newArrayList();
-    List<String> fieldNames = Lists.newArrayList();
+    // Convert the list of Drill types to a list of Optiq types
+    List<RelDataType> relTypes = new ArrayList<>();
+    List<String> fieldNames = new ArrayList<>();
     for (Field field : fields) {
-      relTypes.add(getRelDataType(typeFactory, field.type));
-      fieldNames.add(field.name);
+      relTypes.add(getRelDataType(typeFactory, field.getType()));
+      fieldNames.add(field.getName());
     }
 
     return typeFactory.createStructType(relTypes, fieldNames);
@@ -125,173 +130,222 @@
         return typeFactory.createSqlType(SqlTypeName.BOOLEAN);
       case TIMESTAMP:
         return typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+      case FLOAT8:
+        return typeFactory.createSqlType(SqlTypeName.FLOAT);
       default:
-        throw new UnsupportedOperationException("Only INT, BIGINT, VARCHAR, BOOLEAN and TIMESTAMP types are supported in " +
+        throw new UnsupportedOperationException("Only INT, BIGINT, VARCHAR, BOOLEAN, TIMESTAMP and DOUBLE types are supported in " +
             InfoSchemaConstants.IS_SCHEMA_NAME);
     }
   }
 
-  public abstract InfoSchemaRecordGenerator<S> getRecordGenerator(OptionManager optionManager);
+  public abstract InfoSchemaRecordGenerator<S> getRecordGenerator(FilterEvaluator filterEvaluator);
 
-  /** Layout for the CATALOGS table. */
+  /**
+   * Layout for the CATALOGS table.
+   */
   public static class Catalogs extends InfoSchemaTable<Records.Catalog> {
-    // NOTE:  Nothing seems to verify that the types here (apparently used
-    // by SQL validation) match the types of the fields in Records.Catalogs).
-    private static final List<Field> fields = ImmutableList.of(
-        Field.create(CATS_COL_CATALOG_NAME, VARCHAR),
-        Field.create(CATS_COL_CATALOG_DESCRIPTION, VARCHAR),
-        Field.create(CATS_COL_CATALOG_CONNECT, VARCHAR));
+
+    private static final List<Field> fields = Arrays.asList(
+      Field.create(CATS_COL_CATALOG_NAME, VARCHAR),
+      Field.create(CATS_COL_CATALOG_DESCRIPTION, VARCHAR),
+      Field.create(CATS_COL_CATALOG_CONNECT, VARCHAR));
 
     public Catalogs() {
       super(fields);
     }
 
     @Override
-    public InfoSchemaRecordGenerator<Records.Catalog> getRecordGenerator(OptionManager optionManager) {
-      return new InfoSchemaRecordGenerator.Catalogs(optionManager);
+    public InfoSchemaRecordGenerator<Records.Catalog> getRecordGenerator(FilterEvaluator filterEvaluator) {
+      return new InfoSchemaRecordGenerator.Catalogs(filterEvaluator);
     }
   }
 
-  /** Layout for the SCHEMATA table. */
+  /**
+   * Layout for the SCHEMATA table.
+   */
   public static class Schemata extends InfoSchemaTable<Records.Schema> {
-    // NOTE:  Nothing seems to verify that the types here (apparently used
-    // by SQL validation) match the types of the fields in Records.Schemata).
-    private static final List<Field> fields = ImmutableList.of(
-        Field.create(SCHS_COL_CATALOG_NAME, VARCHAR),
-        Field.create(SCHS_COL_SCHEMA_NAME, VARCHAR),
-        Field.create(SCHS_COL_SCHEMA_OWNER, VARCHAR),
-        Field.create(SCHS_COL_TYPE, VARCHAR),
-        Field.create(SCHS_COL_IS_MUTABLE, VARCHAR));
+
+    private static final List<Field> fields = Arrays.asList(
+      Field.create(SCHS_COL_CATALOG_NAME, VARCHAR),
+      Field.create(SCHS_COL_SCHEMA_NAME, VARCHAR),
+      Field.create(SCHS_COL_SCHEMA_OWNER, VARCHAR),
+      Field.create(SCHS_COL_TYPE, VARCHAR),
+      Field.create(SCHS_COL_IS_MUTABLE, VARCHAR));
 
     public Schemata() {
       super(fields);
     }
 
     @Override
-    public InfoSchemaRecordGenerator<Records.Schema> getRecordGenerator(OptionManager optionManager) {
-      return new InfoSchemaRecordGenerator.Schemata(optionManager);
+    public InfoSchemaRecordGenerator<Records.Schema> getRecordGenerator(FilterEvaluator filterEvaluator) {
+      return new InfoSchemaRecordGenerator.Schemata(filterEvaluator);
     }
   }
 
-  /** Layout for the TABLES table. */
+  /**
+   * Layout for the TABLES table.
+   */
   public static class Tables extends InfoSchemaTable<Records.Table> {
-    // NOTE:  Nothing seems to verify that the types here (apparently used
-    // by SQL validation) match the types of the fields in Records.Tables).
-    private static final List<Field> fields = ImmutableList.of(
-        Field.create(SHRD_COL_TABLE_CATALOG, VARCHAR),
-        Field.create(SHRD_COL_TABLE_SCHEMA, VARCHAR),
-        Field.create(SHRD_COL_TABLE_NAME, VARCHAR),
-        Field.create(TBLS_COL_TABLE_TYPE, VARCHAR));
+
+    private static final List<Field> fields = Arrays.asList(
+      Field.create(SHRD_COL_TABLE_CATALOG, VARCHAR),
+      Field.create(SHRD_COL_TABLE_SCHEMA, VARCHAR),
+      Field.create(SHRD_COL_TABLE_NAME, VARCHAR),
+      Field.create(TBLS_COL_TABLE_TYPE, VARCHAR),
+      Field.create(TBLS_COL_TABLE_SOURCE, VARCHAR),
+      Field.create(TBLS_COL_LOCATION, VARCHAR),
+      Field.create(TBLS_COL_NUM_ROWS, BIGINT),
+      Field.create(TBLS_COL_LAST_MODIFIED_TIME, TIMESTAMP));
 
     public Tables() {
       super(fields);
     }
 
     @Override
-    public InfoSchemaRecordGenerator<Records.Table> getRecordGenerator(OptionManager optionManager) {
-      return new InfoSchemaRecordGenerator.Tables(optionManager);
+    public InfoSchemaRecordGenerator<Records.Table> getRecordGenerator(FilterEvaluator filterEvaluator) {
+      return new InfoSchemaRecordGenerator.Tables(filterEvaluator);
     }
   }
 
-  /** Layout for the VIEWS table. */
+  /**
+   * Layout for the VIEWS table.
+   */
   public static class Views extends InfoSchemaTable<Records.View> {
-    // NOTE:  Nothing seems to verify that the types here (apparently used
-    // by SQL validation) match the types of the fields in Records.Views).
-    private static final List<Field> fields = ImmutableList.of(
-        Field.create(SHRD_COL_TABLE_CATALOG, VARCHAR),
-        Field.create(SHRD_COL_TABLE_SCHEMA, VARCHAR),
-        Field.create(SHRD_COL_TABLE_NAME, VARCHAR),
-        Field.create(VIEWS_COL_VIEW_DEFINITION, VARCHAR));
+
+    private static final List<Field> fields = Arrays.asList(
+      Field.create(SHRD_COL_TABLE_CATALOG, VARCHAR),
+      Field.create(SHRD_COL_TABLE_SCHEMA, VARCHAR),
+      Field.create(SHRD_COL_TABLE_NAME, VARCHAR),
+      Field.create(VIEWS_COL_VIEW_DEFINITION, VARCHAR));
 
     public Views() {
       super(fields);
     }
 
     @Override
-    public InfoSchemaRecordGenerator<Records.View> getRecordGenerator(OptionManager optionManager) {
-      return new InfoSchemaRecordGenerator.Views(optionManager);
+    public InfoSchemaRecordGenerator<Records.View> getRecordGenerator(FilterEvaluator filterEvaluator) {
+      return new InfoSchemaRecordGenerator.Views(filterEvaluator);
     }
   }
 
-  /** Layout for the COLUMNS table. */
+  /**
+   * Layout for the COLUMNS table.
+   */
   public static class Columns extends InfoSchemaTable<Records.Column> {
-    // COLUMNS columns, from SQL standard:
-    // 1. TABLE_CATALOG
-    // 2. TABLE_SCHEMA
-    // 3. TABLE_NAME
-    // 4. COLUMN_NAME
-    // 5. ORDINAL_POSITION
-    // 6. COLUMN_DEFAULT
-    // 7. IS_NULLABLE
-    // 8. DATA_TYPE
-    // 9. CHARACTER_MAXIMUM_LENGTH
-    // 10. CHARACTER_OCTET_LENGTH
-    // 11. NUMERIC_PRECISION
-    // 12. NUMERIC_PRECISION_RADIX
-    // 13. NUMERIC_SCALE
-    // 14. DATETIME_PRECISION
-    // 15. INTERVAL_TYPE
-    // 16. INTERVAL_PRECISION
-    // 17. CHARACTER_SET_CATALOG ...
 
-    // NOTE:  Nothing seems to verify that the types here (apparently used
-    // by SQL validation) match the types of the fields in Records.Columns).
-    private static final List<Field> fields = ImmutableList.of(
-        Field.create(SHRD_COL_TABLE_CATALOG, VARCHAR),
-        Field.create(SHRD_COL_TABLE_SCHEMA, VARCHAR),
-        Field.create(SHRD_COL_TABLE_NAME, VARCHAR),
-        Field.create(COLS_COL_COLUMN_NAME, VARCHAR),
-        Field.create(COLS_COL_ORDINAL_POSITION, INT),
-        Field.create(COLS_COL_COLUMN_DEFAULT, VARCHAR),
-        Field.create(COLS_COL_IS_NULLABLE, VARCHAR),
-        Field.create(COLS_COL_DATA_TYPE, VARCHAR),
-        Field.create(COLS_COL_CHARACTER_MAXIMUM_LENGTH, INT),
-        Field.create(COLS_COL_CHARACTER_OCTET_LENGTH, INT),
-        Field.create(COLS_COL_NUMERIC_PRECISION, INT),
-        Field.create(COLS_COL_NUMERIC_PRECISION_RADIX, INT),
-        Field.create(COLS_COL_NUMERIC_SCALE, INT),
-        Field.create(COLS_COL_DATETIME_PRECISION, INT),
-        Field.create(COLS_COL_INTERVAL_TYPE, VARCHAR),
-        Field.create(COLS_COL_INTERVAL_PRECISION, INT)
-        );
+    private static final List<Field> fields = Arrays.asList(
+      Field.create(SHRD_COL_TABLE_CATALOG, VARCHAR),
+      Field.create(SHRD_COL_TABLE_SCHEMA, VARCHAR),
+      Field.create(SHRD_COL_TABLE_NAME, VARCHAR),
+      Field.create(COLS_COL_COLUMN_NAME, VARCHAR),
+      Field.create(COLS_COL_ORDINAL_POSITION, INT),
+      Field.create(COLS_COL_COLUMN_DEFAULT, VARCHAR),
+      Field.create(COLS_COL_IS_NULLABLE, VARCHAR),
+      Field.create(COLS_COL_DATA_TYPE, VARCHAR),
+      Field.create(COLS_COL_CHARACTER_MAXIMUM_LENGTH, INT),
+      Field.create(COLS_COL_CHARACTER_OCTET_LENGTH, INT),
+      Field.create(COLS_COL_NUMERIC_PRECISION, INT),
+      Field.create(COLS_COL_NUMERIC_PRECISION_RADIX, INT),
+      Field.create(COLS_COL_NUMERIC_SCALE, INT),
+      Field.create(COLS_COL_DATETIME_PRECISION, INT),
+      Field.create(COLS_COL_INTERVAL_TYPE, VARCHAR),
+      Field.create(COLS_COL_INTERVAL_PRECISION, INT),
+      Field.create(COLS_COL_COLUMN_SIZE, INT),
+      Field.create(COLS_COL_COLUMN_FORMAT, VARCHAR),
+      Field.create(COLS_COL_NUM_NULLS, BIGINT),
+      Field.create(COLS_COL_MIN_VAL, VARCHAR),
+      Field.create(COLS_COL_MAX_VAL, VARCHAR),
+      Field.create(COLS_COL_NDV, FLOAT8),
+      Field.create(COLS_COL_EST_NUM_NON_NULLS, FLOAT8),
+      Field.create(COLS_COL_IS_NESTED, BIT));
 
     public Columns() {
       super(fields);
     }
 
     @Override
-    public InfoSchemaRecordGenerator<Records.Column> getRecordGenerator(OptionManager optionManager) {
-      return new InfoSchemaRecordGenerator.Columns(optionManager);
+    public InfoSchemaRecordGenerator<Records.Column> getRecordGenerator(FilterEvaluator filterEvaluator) {
+      return new InfoSchemaRecordGenerator.Columns(filterEvaluator);
     }
   }
 
-  /** Layout for the FILES table. */
+  /**
+   * Layout for the PARTITIONS table.
+   */
+  public static class Partitions extends InfoSchemaTable<Records.Partition> {
+
+    private static final List<Field> fields = Arrays.asList(
+      Field.create(SHRD_COL_TABLE_CATALOG, VARCHAR),
+      Field.create(SHRD_COL_TABLE_SCHEMA, VARCHAR),
+      Field.create(SHRD_COL_TABLE_NAME, VARCHAR),
+      Field.create(PARTITIONS_COL_METADATA_KEY, VARCHAR),
+      Field.create(PARTITIONS_COL_METADATA_TYPE, VARCHAR),
+      Field.create(PARTITIONS_COL_METADATA_IDENTIFIER, VARCHAR),
+      Field.create(PARTITIONS_COL_PARTITION_COLUMN, VARCHAR),
+      Field.create(PARTITIONS_COL_PARTITION_VALUE, VARCHAR),
+      Field.create(PARTITIONS_COL_LOCATION, VARCHAR),
+      Field.create(PARTITIONS_COL_LAST_MODIFIED_TIME, TIMESTAMP));
+
+    public Partitions() {
+      super(fields);
+    }
+
+    @Override
+    public InfoSchemaRecordGenerator<Records.Partition> getRecordGenerator(FilterEvaluator filterEvaluator) {
+      return new InfoSchemaRecordGenerator.Partitions(filterEvaluator);
+    }
+  }
+
+  /**
+   * Layout for the FILES table.
+   */
   public static class Files extends InfoSchemaTable<Records.File> {
 
-    private static final List<Field> fields = ImmutableList.of(
-        Field.create(FILES_COL_SCHEMA_NAME, VARCHAR),
-        Field.create(FILES_COL_ROOT_SCHEMA_NAME, VARCHAR),
-        Field.create(FILES_COL_WORKSPACE_NAME, VARCHAR),
-        Field.create(FILES_COL_FILE_NAME, VARCHAR),
-        Field.create(FILES_COL_RELATIVE_PATH, VARCHAR),
-        Field.create(FILES_COL_IS_DIRECTORY, BIT),
-        Field.create(FILES_COL_IS_FILE, BIT),
-        Field.create(FILES_COL_LENGTH, BIGINT),
-        Field.create(FILES_COL_OWNER, VARCHAR),
-        Field.create(FILES_COL_GROUP, VARCHAR),
-        Field.create(FILES_COL_PERMISSION, VARCHAR),
-        Field.create(FILES_COL_ACCESS_TIME, TIMESTAMP),
-        Field.create(FILES_COL_MODIFICATION_TIME, TIMESTAMP)
-    );
+    private static final List<Field> fields = Arrays.asList(
+      Field.create(FILES_COL_SCHEMA_NAME, VARCHAR),
+      Field.create(FILES_COL_ROOT_SCHEMA_NAME, VARCHAR),
+      Field.create(FILES_COL_WORKSPACE_NAME, VARCHAR),
+      Field.create(FILES_COL_FILE_NAME, VARCHAR),
+      Field.create(FILES_COL_RELATIVE_PATH, VARCHAR),
+      Field.create(FILES_COL_IS_DIRECTORY, BIT),
+      Field.create(FILES_COL_IS_FILE, BIT),
+      Field.create(FILES_COL_LENGTH, BIGINT),
+      Field.create(FILES_COL_OWNER, VARCHAR),
+      Field.create(FILES_COL_GROUP, VARCHAR),
+      Field.create(FILES_COL_PERMISSION, VARCHAR),
+      Field.create(FILES_COL_ACCESS_TIME, TIMESTAMP),
+      Field.create(FILES_COL_MODIFICATION_TIME, TIMESTAMP));
 
     public Files() {
       super(fields);
     }
 
     @Override
-    public InfoSchemaRecordGenerator<Records.File> getRecordGenerator(OptionManager optionManager) {
-      return new InfoSchemaRecordGenerator.Files(optionManager);
+    public InfoSchemaRecordGenerator<Records.File> getRecordGenerator(FilterEvaluator filterEvaluator) {
+      return new InfoSchemaRecordGenerator.Files(filterEvaluator);
     }
   }
 
+  public static class Field {
+
+    private final String name;
+    private final MajorType type;
+
+    private Field(String name, MajorType type) {
+      this.name = name;
+      this.type = type;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public MajorType getType() {
+      return type;
+    }
+
+    public static Field create(String name, MajorType type) {
+      return new Field(name, type);
+    }
+  }
 }
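Because the PARTITIONS layout above is exposed like any other INFORMATION_SCHEMA table, it can be read over plain JDBC once this patch is in place. A hedged usage sketch — the connection URL and the backticks around the reserved word PARTITIONS are assumptions to adapt to your setup:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class PartitionsQuery {
      public static void main(String[] args) throws Exception {
        // Connection URL is an assumption; point it at your Drillbit.
        try (Connection connection = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
             Statement statement = connection.createStatement();
             // Column names follow the PARTITIONS layout defined above.
             ResultSet resultSet = statement.executeQuery(
                 "SELECT TABLE_SCHEMA, TABLE_NAME, METADATA_KEY, METADATA_TYPE, "
                     + "PARTITION_COLUMN, PARTITION_VALUE, LOCATION, LAST_MODIFIED_TIME "
                     + "FROM INFORMATION_SCHEMA.`PARTITIONS`")) {
          while (resultSet.next()) {
            System.out.println(resultSet.getString("TABLE_NAME")
                + " -> " + resultSet.getString("METADATA_KEY"));
          }
        }
      }
    }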
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java
index 961b90d..3e38fb2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaTableType.java
@@ -20,17 +20,25 @@
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Catalogs;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Columns;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Files;
+import org.apache.drill.exec.store.ischema.InfoSchemaTable.Partitions;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Schemata;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Tables;
 import org.apache.drill.exec.store.ischema.InfoSchemaTable.Views;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;
+import org.apache.drill.metastore.Metastore;
+import org.apache.drill.metastore.MetastoreRegistry;
+import org.apache.drill.metastore.exceptions.MetastoreException;
+import org.slf4j.Logger;
+
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * The set of tables/views in INFORMATION_SCHEMA.
+ * The set of tables / views in INFORMATION_SCHEMA.
  */
 public enum InfoSchemaTableType {
 
@@ -39,22 +47,43 @@
   VIEWS(new Views()),
   COLUMNS(new Columns()),
   TABLES(new Tables()),
+  PARTITIONS(new Partitions()),
   FILES(new Files());
 
+  private static final Logger logger = getLogger(InfoSchemaTableType.class);
+
   private final InfoSchemaTable<?> tableDef;
 
   /**
-   * ...
+   * Constructor to initialize {@link InfoSchemaTableType} instance.
+   *
    * @param  tableDef  the definition (columns and data generator) of the table
    */
   InfoSchemaTableType(InfoSchemaTable<?> tableDef) {
     this.tableDef = tableDef;
   }
 
-  public <S> PojoRecordReader<S> getRecordReader(SchemaPlus rootSchema, InfoSchemaFilter filter, OptionManager optionManager) {
-    @SuppressWarnings("unchecked")
-    InfoSchemaRecordGenerator<S> recordGenerator = (InfoSchemaRecordGenerator<S>) tableDef.getRecordGenerator(optionManager);
-    recordGenerator.setInfoSchemaFilter(filter);
+  public <S> PojoRecordReader<S> getRecordReader(SchemaPlus rootSchema,
+                                                 InfoSchemaFilter filter,
+                                                 OptionManager optionManager,
+                                                 MetastoreRegistry metastoreRegistry) {
+    FilterEvaluator filterEvaluator = filter == null
+      ? FilterEvaluator.NoFilterEvaluator.INSTANCE
+      : new FilterEvaluator.InfoSchemaFilterEvaluator(filter);
+
+    @SuppressWarnings("unchecked") InfoSchemaRecordGenerator<S> recordGenerator = (InfoSchemaRecordGenerator<S>) tableDef.getRecordGenerator(filterEvaluator);
+
+    recordGenerator.registerRecordCollector(new RecordCollector.BasicRecordCollector(filterEvaluator, optionManager));
+
+    if (optionManager.getBoolean(ExecConstants.METASTORE_ENABLED)) {
+      try {
+        Metastore metastore = metastoreRegistry.get();
+        recordGenerator.registerRecordCollector(new RecordCollector.MetastoreRecordCollector(metastore, filterEvaluator));
+      } catch (MetastoreException e) {
+        logger.warn("Unable to init Drill Metastore: {}", e.getMessage(), e);
+      }
+    }
+
     recordGenerator.scanSchema(rootSchema);
     return recordGenerator.getRecordReader();
   }
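getRecordReader now wires collectors conditionally: the basic collector is always registered, and the Metastore-backed one only when the option behind ExecConstants.METASTORE_ENABLED is on (the SQL-visible name `metastore.enabled` is an assumption here) and initialization succeeds. A self-contained sketch of that fallback wiring, using illustrative stand-in types rather than the Drill classes:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Supplier;

    // Illustrative stand-ins, not the Drill classes.
    class WiringSketch {
      interface Collector {}
      static class BasicCollector implements Collector {}
      static class MetastoreCollector implements Collector {}

      static List<Collector> wire(boolean metastoreEnabled, Supplier<MetastoreCollector> metastore) {
        List<Collector> collectors = new ArrayList<>();
        collectors.add(new BasicCollector()); // always reads the live schema tree
        if (metastoreEnabled) {
          try {
            collectors.add(metastore.get()); // may fail if the Metastore cannot initialize
          } catch (RuntimeException e) {
            // mirror the patch: warn and keep serving from the basic collector only
            System.err.println("Unable to init Drill Metastore: " + e.getMessage());
          }
        }
        return collectors;
      }
    }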
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
new file mode 100644
index 0000000..92f1d14
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.ischema;
+
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.logical.DrillViewInfoProvider;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
+import org.apache.drill.exec.util.FileSystemUtil;
+import org.apache.drill.metastore.Metastore;
+import org.apache.drill.metastore.components.tables.BasicTablesTransformer;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.metadata.BaseTableMetadata;
+import org.apache.drill.metastore.metadata.MetadataInfo;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.metadata.TableInfo;
+import org.apache.drill.metastore.statistics.ColumnStatistics;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.drill.exec.planner.types.DrillRelDataTypeSystem.DRILL_REL_DATATYPE_SYSTEM;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_CONNECT;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_DESCRIPTION;
+import static org.apache.drill.exec.store.ischema.InfoSchemaConstants.IS_CATALOG_NAME;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Provides methods to collect various information_schema data.
+ */
+public interface RecordCollector {
+
+  /**
+   * Collects catalogs data for information_schema.
+   *
+   * @param schemaPath schema name
+   * @param schema schema instance
+   * @return list of catalog records
+   */
+  List<Records.Catalog> catalogs(String schemaPath, SchemaPlus schema);
+
+  /**
+   * Collects schemas data for information_schema.
+   *
+   * @param schemaPath schema name
+   * @param schema schema instance
+   * @return list of schema records
+   */
+  List<Records.Schema> schemas(String schemaPath, SchemaPlus schema);
+
+  /**
+   * Collects tables data for information_schema.
+   *
+   * @param schemaPath schema name
+   * @param schema schema instance
+   * @return list of table records
+   */
+  List<Records.Table> tables(String schemaPath, SchemaPlus schema);
+
+  /**
+   * Collects views data for information_schema.
+   *
+   * @param schemaPath schema name
+   * @param schema schema instance
+   * @return list of view records
+   */
+  List<Records.View> views(String schemaPath, SchemaPlus schema);
+
+  /**
+   * Collects columns data for information_schema.
+   *
+   * @param schemaPath schema name
+   * @param schema schema instance
+   * @return list of column records
+   */
+  List<Records.Column> columns(String schemaPath, SchemaPlus schema);
+
+  /**
+   * Collects partitions data for information_schema.
+   *
+   * @param schemaPath schema name
+   * @param schema schema instance
+   * @return list of partition records
+   */
+  List<Records.Partition> partitions(String schemaPath, SchemaPlus schema);
+
+  /**
+   * Collects files data for information_schema.
+   *
+   * @param schemaPath schema name
+   * @param schema schema instance
+   * @return list of file records
+   */
+  List<Records.File> files(String schemaPath, SchemaPlus schema);
+
+  /**
+   * Provides information_schema data based on information stored in {@link AbstractSchema}.
+   */
+  class BasicRecordCollector implements RecordCollector {
+
+    private static final String DEFAULT_OWNER = "<owner>";
+
+    private final FilterEvaluator filterEvaluator;
+    private final OptionManager optionManager;
+
+    public BasicRecordCollector(FilterEvaluator filterEvaluator, OptionManager optionManager) {
+      this.filterEvaluator = filterEvaluator;
+      this.optionManager = optionManager;
+    }
+
+    @Override
+    public List<Records.Catalog> catalogs(String schemaPath, SchemaPlus schema) {
+      return Collections.singletonList(new Records.Catalog(IS_CATALOG_NAME, IS_CATALOG_DESCRIPTION, IS_CATALOG_CONNECT));
+    }
+
+    @Override
+    public List<Records.Schema> schemas(String schemaPath, SchemaPlus schema) {
+      AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+
+      return Collections.singletonList(new Records.Schema(IS_CATALOG_NAME, schemaPath,
+        DEFAULT_OWNER, drillSchema.getTypeName(), drillSchema.isMutable()));
+    }
+
+    @Override
+    public List<Records.Table> tables(String schemaPath, SchemaPlus schema) {
+      AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+
+      return drillSchema.getTableNamesAndTypes().stream()
+        .filter(entry -> filterEvaluator.shouldVisitTable(schemaPath, entry.getKey(), entry.getValue()))
+        .map(entry -> new Records.Table(IS_CATALOG_NAME, schemaPath, entry.getKey(), entry.getValue().toString()))
+        .collect(Collectors.toList());
+    }
+
+    @Override
+    public List<Records.View> views(String schemaPath, SchemaPlus schema) {
+      AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+
+      return drillSchema.getTablesByNames(schema.getTableNames()).stream()
+        .filter(pair -> pair.getValue().getJdbcTableType() == Schema.TableType.VIEW)
+        .filter(pair -> filterEvaluator.shouldVisitTable(schemaPath, pair.getKey(), pair.getValue().getJdbcTableType()))
+        .map(pair -> new Records.View(IS_CATALOG_NAME, schemaPath, pair.getKey(),
+          // View's SQL may not be available for some non-Drill views, for example, JDBC view
+          pair.getValue() instanceof DrillViewInfoProvider ? ((DrillViewInfoProvider) pair.getValue()).getViewSql() : ""))
+        .collect(Collectors.toList());
+    }
+
+    @Override
+    public List<Records.Column> columns(String schemaPath, SchemaPlus schema) {
+      AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+      List<Records.Column> records = new ArrayList<>();
+      for (Pair<String, ? extends Table> tableNameToTable : drillSchema.getTablesByNames(schema.getTableNames())) {
+        String tableName = tableNameToTable.getKey();
+        Table table = tableNameToTable.getValue();
+        Schema.TableType tableType = table.getJdbcTableType();
+
+        if (filterEvaluator.shouldVisitTable(schemaPath, tableName, tableType)) {
+          RelDataType tableRow = table.getRowType(new JavaTypeFactoryImpl(DRILL_REL_DATATYPE_SYSTEM));
+          for (RelDataTypeField field : tableRow.getFieldList()) {
+            if (filterEvaluator.shouldVisitColumn(schemaPath, tableName, field.getName())) {
+              records.add(new Records.Column(IS_CATALOG_NAME, schemaPath, tableName, field));
+            }
+          }
+        }
+      }
+      return records;
+    }
+
+    @Override
+    public List<Records.Partition> partitions(String schemaPath, SchemaPlus schema) {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public List<Records.File> files(String schemaPath, SchemaPlus schema) {
+      if (filterEvaluator.shouldVisitFiles(schemaPath, schema)) {
+        try {
+          AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+          if (drillSchema instanceof WorkspaceSchemaFactory.WorkspaceSchema) {
+            WorkspaceSchemaFactory.WorkspaceSchema wsSchema = (WorkspaceSchemaFactory.WorkspaceSchema) drillSchema;
+            String defaultLocation = wsSchema.getDefaultLocation();
+            FileSystem fs = wsSchema.getFS();
+            boolean recursive = optionManager.getBoolean(ExecConstants.LIST_FILES_RECURSIVELY);
+            // add URI to the path to ensure that directory objects are skipped (see S3AFileSystem.listStatus method)
+            return FileSystemUtil.listAllSafe(fs, new Path(fs.getUri().toString(), defaultLocation), recursive).stream()
+              .map(fileStatus -> new Records.File(schemaPath, wsSchema, fileStatus))
+              .collect(Collectors.toList());
+          }
+        } catch (ClassCastException | UnsupportedOperationException e) {
+          // ignore the exception since either this is not a Drill schema or schema does not support files listing
+        }
+      }
+      return Collections.emptyList();
+    }
+  }
+
+  /**
+   * Provides information_schema data based on information stored in Drill Metastore.
+   */
+  class MetastoreRecordCollector implements RecordCollector {
+
+    private static final Logger logger = getLogger(MetastoreRecordCollector.class);
+
+    public static final int UNDEFINED_INDEX = -1;
+    public static final String SCHEMA = "schema";
+
+    private final Metastore metastore;
+    private final FilterEvaluator filterEvaluator;
+
+    public MetastoreRecordCollector(Metastore metastore, FilterEvaluator filterEvaluator) {
+      this.metastore = metastore;
+      this.filterEvaluator = filterEvaluator;
+    }
+
+    @Override
+    public List<Records.Catalog> catalogs(String schemaPath, SchemaPlus schema) {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public List<Records.Schema> schemas(String schemaPath, SchemaPlus schema) {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public List<Records.Table> tables(String schemaPath, SchemaPlus schema) {
+      AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+      List<Records.Table> records = new ArrayList<>();
+      List<BaseTableMetadata> baseTableMetadata;
+      if (shouldVisitSchema(drillSchema)) {
+        try {
+          baseTableMetadata = metastore.tables().basicRequests()
+            .tablesMetadata(FilterExpression.and(
+              FilterExpression.equal(TableInfo.STORAGE_PLUGIN, drillSchema.getSchemaPath().get(0)),
+              FilterExpression.equal(TableInfo.WORKSPACE, drillSchema.getSchemaPath().get(1))));
+        } catch (Exception e) {
+          // ignore all exceptions related to Metastore data retrieval, return empty result
+          logger.warn("Error while retrieving Metastore table data: {}", e.getMessage());
+          logger.debug(e.getMessage(), e);
+          return records;
+        }
+
+        baseTableMetadata.stream()
+          .filter(table -> filterEvaluator.shouldVisitTable(schemaPath, table.getTableInfo().name(), Schema.TableType.TABLE))
+          .map(table -> new Records.Table(IS_CATALOG_NAME, schemaPath, Schema.TableType.TABLE.toString(), table))
+          .forEach(records::add);
+      }
+      return records;
+    }
+
+    @Override
+    public List<Records.View> views(String schemaPath, SchemaPlus schema) {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public List<Records.Column> columns(String schemaPath, SchemaPlus schema) {
+      AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+      List<Records.Column> records = new ArrayList<>();
+      if (shouldVisitSchema(drillSchema)) {
+        List<BaseTableMetadata> baseTableMetadata;
+        try {
+          baseTableMetadata = metastore.tables().basicRequests()
+            .tablesMetadata(FilterExpression.and(
+              FilterExpression.equal(TableInfo.STORAGE_PLUGIN, drillSchema.getSchemaPath().get(0)),
+              FilterExpression.equal(TableInfo.WORKSPACE, drillSchema.getSchemaPath().get(1)),
+              // exclude tables without schema
+              FilterExpression.isNotNull(SCHEMA)));
+        } catch (Exception e) {
+          // ignore all exceptions related to Metastore data retrieval, return empty result
+          logger.warn("Error while retrieving Metastore table data: {}", e.getMessage());
+          logger.debug(e.getMessage(), e);
+          return records;
+        }
+
+        baseTableMetadata.stream()
+          .filter(table -> filterEvaluator.shouldVisitTable(schemaPath, table.getTableInfo().name(), Schema.TableType.TABLE))
+          .map(table -> columns(schemaPath, table, table.getSchema(), null, UNDEFINED_INDEX, false))
+          .forEach(records::addAll);
+      }
+      return records;
+    }
+
+    /**
+     * Recursively scans the given table schema and provides a list of column records.
+     * Recursion is used to scan map / struct columns which have nested columns.
+     *
+     * @param schemaPath schema name
+     * @param table table instance
+     * @param schema table or column schema
+     * @param parentColumnName parent column name if any
+     * @param columnIndex column index if any
+     * @param isNested indicates if column is nested
+     * @return list of column records
+     */
+    private List<Records.Column> columns(String schemaPath,
+                                         BaseTableMetadata table,
+                                         TupleMetadata schema,
+                                         String parentColumnName,
+                                         int columnIndex,
+                                         boolean isNested) {
+      List<Records.Column> records = new ArrayList<>();
+      schema.toMetadataList().forEach(
+        column -> {
+          // concat parent column name to use full column name, i.e. struct_col.nested_col
+          String columnName = parentColumnName == null ? column.name() : parentColumnName + "." + column.name();
+          // nested columns have the same index as their parent
+          int currentIndex = columnIndex == UNDEFINED_INDEX ? schema.index(column.name()) : columnIndex;
+          // if column is a map / struct, recursively scan nested columns
+          if (column.isMap()) {
+            List<Records.Column> mapRecords =
+              columns(schemaPath, table, column.mapSchema(), columnName, currentIndex, true);
+            records.addAll(mapRecords);
+          }
+
+          String tableName = table.getTableInfo().name();
+          if (filterEvaluator.shouldVisitColumn(schemaPath, tableName, columnName)) {
+            ColumnStatistics columnStatistics =
+              table.getColumnStatistics(SchemaPath.parseFromString(columnName));
+            records.add(new Records.Column(IS_CATALOG_NAME, schemaPath, tableName, columnName,
+              column, columnStatistics, currentIndex, isNested));
+          }
+        });
+      return records;
+    }
+
+    @Override
+    public List<Records.Partition> partitions(String schemaPath, SchemaPlus schema) {
+      AbstractSchema drillSchema = schema.unwrap(AbstractSchema.class);
+      List<Records.Partition> records = new ArrayList<>();
+      if (shouldVisitSchema(drillSchema)) {
+
+        BasicTablesTransformer.MetadataHolder metadataHolder;
+        try {
+          List<TableMetadataUnit> units = metastore.tables().read()
+            .filter(FilterExpression.and(
+              FilterExpression.equal(TableInfo.STORAGE_PLUGIN, drillSchema.getSchemaPath().get(0)),
+              FilterExpression.equal(TableInfo.WORKSPACE, drillSchema.getSchemaPath().get(1)),
+              // include SEGMENT and PARTITION data only
+              FilterExpression.in(MetadataInfo.METADATA_TYPE, MetadataType.SEGMENT.name(), MetadataType.PARTITION.name()),
+              // exclude DEFAULT_SEGMENT (used only for non-partitioned tables)
+              FilterExpression.notEqual(MetadataInfo.METADATA_KEY, MetadataInfo.DEFAULT_SEGMENT_KEY)))
+            .execute();
+
+          metadataHolder = BasicTablesTransformer.all(units);
+        } catch (Exception e) {
+          // ignore all exceptions related to Metastore data retrieval, return empty result
+          logger.warn("Error while retrieving Metastore segment / partition data: {}", e.getMessage());
+          logger.debug(e.getMessage(), e);
+          return records;
+        }
+
+        metadataHolder.segments().stream()
+          .filter(segment -> filterEvaluator.shouldVisitTable(schemaPath, segment.getTableInfo().name(), Schema.TableType.TABLE))
+          .map(segment -> Records.Partition.fromSegment(IS_CATALOG_NAME, schemaPath, segment))
+          .forEach(records::addAll);
+
+        metadataHolder.partitions().stream()
+          .filter(partition -> filterEvaluator.shouldVisitTable(schemaPath, partition.getTableInfo().name(), Schema.TableType.TABLE))
+          .map(partition -> Records.Partition.fromPartition(IS_CATALOG_NAME, schemaPath, partition))
+          .forEach(records::addAll);
+      }
+      return records;
+    }
+
+    @Override
+    public List<Records.File> files(String schemaPath, SchemaPlus schema) {
+      return Collections.emptyList();
+    }
+
+    /**
+     * Checks if the given schema should be searched in Drill Metastore.
+     * Schema must have two parts: the parent corresponds to the storage plugin,
+     * the actual name corresponds to the workspace name.
+     *
+     * @param schema schema instance
+     * @return true if schema should be visited, false otherwise
+     */
+    private boolean shouldVisitSchema(AbstractSchema schema) {
+      return schema.getSchemaPath().size() == 2;
+    }
+  }
+}
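The dotted-name recursion in MetastoreRecordCollector.columns is the piece most easily misread: a map / struct column is reported itself and also expands into child records that reuse the parent's ordinal. A self-contained sketch of the flattening, with Node standing in for ColumnMetadata / TupleMetadata (illustrative only):

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Node is an illustrative stand-in for a column with optional nested columns.
    class ColumnFlattenSketch {
      static class Node {
        final Map<String, Node> children = new LinkedHashMap<>(); // empty => leaf
      }

      // Mirrors the recursion above: child names are prefixed with the parent
      // name and a dot; children are emitted before the parent itself.
      static void flatten(String parent, Map<String, Node> schema, List<String> out) {
        for (Map.Entry<String, Node> entry : schema.entrySet()) {
          String name = parent == null ? entry.getKey() : parent + "." + entry.getKey();
          if (!entry.getValue().children.isEmpty()) {
            flatten(name, entry.getValue().children, out); // recurse into map / struct
          }
          out.add(name); // nested entries would carry IS_NESTED = true
        }
      }

      public static void main(String[] args) {
        Node struct = new Node();
        struct.children.put("nested_col", new Node());
        Map<String, Node> schema = new LinkedHashMap<>();
        schema.put("struct_col", struct);
        List<String> names = new ArrayList<>();
        flatten(null, schema, names);
        System.out.println(names); // [struct_col.nested_col, struct_col]
      }
    }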
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/Records.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/Records.java
index 5608560..09f8cee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/Records.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/Records.java
@@ -17,48 +17,106 @@
  */
 package org.apache.drill.exec.store.ischema;
 
-import static org.slf4j.LoggerFactory.getLogger;
-
 import org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.planner.types.DrillRelDataTypeSystem;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
+import org.apache.drill.metastore.metadata.BaseTableMetadata;
+import org.apache.drill.metastore.metadata.PartitionMetadata;
+import org.apache.drill.metastore.metadata.SegmentMetadata;
+import org.apache.drill.metastore.statistics.ColumnStatistics;
+import org.apache.drill.metastore.statistics.ColumnStatisticsKind;
+import org.apache.drill.metastore.statistics.Statistic;
+import org.apache.drill.metastore.statistics.TableStatisticsKind;
+import org.apache.drill.shaded.guava.com.google.common.base.MoreObjects;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 
-import org.apache.drill.shaded.guava.com.google.common.base.MoreObjects;
-
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZoneOffset;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.slf4j.LoggerFactory.getLogger;
 
 public class Records {
 
-  /** Pojo object for a record in INFORMATION_SCHEMA.TABLES */
+  public static final String YES = "YES";
+  public static final String NO = "NO";
+
+  /**
+   * Converts boolean value into its String representation.
+   *
+   * @param value boolean value
+   * @return boolean value String representation
+   */
+  public static String convertToString(boolean value) {
+    return value ? YES : NO;
+  }
+
+  /**
+   * Converts given time millis into {@link Timestamp} instance.
+   *
+   * @param millis time millis
+   * @return {@link Timestamp} instance
+   */
+  public static Timestamp convertToTimestamp(long millis) {
+    return millis == BaseTableMetadata.UNDEFINED_TIME
+      ? null
+      : Timestamp.from(Instant.ofEpochMilli(millis));
+  }
+
+  /**
+   * Pojo object for a record in INFORMATION_SCHEMA.TABLES
+   */
   public static class Table {
+
     public final String TABLE_CATALOG;
     public final String TABLE_SCHEMA;
     public final String TABLE_NAME;
     public final String TABLE_TYPE;
+    public final String TABLE_SOURCE;
+    public final String LOCATION;
+    public final Long NUM_ROWS;
+    public final Timestamp LAST_MODIFIED_TIME;
 
     public Table(String catalog, String schema, String name, String type) {
       this.TABLE_CATALOG = catalog;
       this.TABLE_SCHEMA = schema;
       this.TABLE_NAME = name;
       this.TABLE_TYPE = type;
+      this.TABLE_SOURCE = null;
+      this.LOCATION = null;
+      this.NUM_ROWS = null;
+      this.LAST_MODIFIED_TIME = null;
+    }
+
+    public Table(String catalog, String schema, String type, BaseTableMetadata tableMetadata) {
+      this.TABLE_CATALOG = catalog;
+      this.TABLE_SCHEMA = schema;
+      this.TABLE_NAME = tableMetadata.getTableInfo().name();
+      this.TABLE_TYPE = type;
+      this.TABLE_SOURCE = tableMetadata.getTableInfo().type();
+      this.LOCATION = tableMetadata.getLocation().toString();
+      this.NUM_ROWS = tableMetadata.getStatistic(TableStatisticsKind.ROW_COUNT);
+      this.LAST_MODIFIED_TIME = convertToTimestamp(tableMetadata.getLastModifiedTime());
     }
   }
 
-  /** Pojo object for a record in INFORMATION_SCHEMA.COLUMNS */
+  /**
+   * Pojo object for a record in INFORMATION_SCHEMA.COLUMNS
+   */
   public static class Column {
-    private static final Logger logger = getLogger( Column.class );
 
-    // TODO:  Resolve:  Do we have such a constant elsewhere?  If so, use it.
-    // If not, where should this really live?:
+    private static final Logger logger = getLogger(Column.class);
     private static final int MAX_UTF8_BYTES_PER_CHARACTER = 4;
 
     public final String TABLE_CATALOG;
@@ -69,7 +127,6 @@
     public final String COLUMN_DEFAULT;
     public final String IS_NULLABLE;
     public final String DATA_TYPE;
-    public final Integer COLUMN_SIZE;
     public final Integer CHARACTER_MAXIMUM_LENGTH;
     public final Integer CHARACTER_OCTET_LENGTH;
     public final Integer NUMERIC_PRECISION;
@@ -78,6 +135,14 @@
     public final Integer DATETIME_PRECISION;
     public final String INTERVAL_TYPE;
     public final Integer INTERVAL_PRECISION;
+    public final Integer COLUMN_SIZE;
+    public final String COLUMN_FORMAT;
+    public final Long NUM_NULLS;
+    public final String MIN_VAL;
+    public final String MAX_VAL;
+    public final Double NDV;
+    public final Double EST_NUM_NON_NULLS;
+    public final boolean IS_NESTED;
 
     // See:
     // - ISO/IEC 9075-11:2011(E) 5.21 COLUMNS view
@@ -97,9 +162,10 @@
       this.ORDINAL_POSITION = 1 + field.getIndex();
 
       this.COLUMN_DEFAULT = null;
-      this.IS_NULLABLE = relDataType.isNullable() ? "YES" : "NO";
+      this.IS_NULLABLE = convertToString(relDataType.isNullable());
+      this.IS_NESTED = false;
 
-      switch ( sqlTypeName ) {
+      switch (sqlTypeName) {
         // 1. SqlTypeName enumerators whose names (currently) match SQL's values
         //    for DATA_TYPE (those which have been seen in tests and verified):
         case BOOLEAN:
@@ -164,20 +230,20 @@
           break;
         // 3:  SqlTypeName enumerators not yet seen and confirmed or handled.
         default:
-          logger.warn( "Type not handled explicitly (code needs review): "
-                       + sqlTypeName );
+          logger.warn("Type not handled explicitly (code needs review): "
+                       + sqlTypeName);
           this.DATA_TYPE = sqlTypeName.toString();
           break;
       }
 
       // Note: The branches are in the same order as SQL constraint
       // DATA_TYPE_DESCRIPTOR_DATA_TYPE_CHECK_COMBINATIONS.
-      switch ( sqlTypeName ) {
+      switch (sqlTypeName) {
         case CHAR:
         case VARCHAR:
           this.CHARACTER_MAXIMUM_LENGTH = relDataType.getPrecision();
-          if ( this.CHARACTER_MAXIMUM_LENGTH
-              < Integer.MAX_VALUE / MAX_UTF8_BYTES_PER_CHARACTER ) {
+          if (this.CHARACTER_MAXIMUM_LENGTH
+              < Integer.MAX_VALUE / MAX_UTF8_BYTES_PER_CHARACTER) {
             this.CHARACTER_OCTET_LENGTH =
                 this.CHARACTER_MAXIMUM_LENGTH * MAX_UTF8_BYTES_PER_CHARACTER;
           }
@@ -227,7 +293,7 @@
           this.CHARACTER_MAXIMUM_LENGTH = null;
           this.CHARACTER_OCTET_LENGTH = null;
           // This NUMERIC_PRECISION is in bits since NUMERIC_PRECISION_RADIX is 2.
-          switch ( sqlTypeName ) {
+          switch (sqlTypeName) {
             case TINYINT:
               NUMERIC_PRECISION = 8;
               break;
@@ -276,7 +342,7 @@
           this.CHARACTER_MAXIMUM_LENGTH = null;
           this.CHARACTER_OCTET_LENGTH = null;
           // This NUMERIC_PRECISION is in bits since NUMERIC_PRECISION_RADIX is 2.
-          switch ( sqlTypeName ) {
+          switch (sqlTypeName) {
             case REAL:
               NUMERIC_PRECISION = 24;
               break;
@@ -314,7 +380,7 @@
           this.DATETIME_PRECISION = relDataType.getPrecision();
           this.INTERVAL_TYPE = null;
           this.INTERVAL_PRECISION = null;
-          switch(sqlTypeName) {
+          switch (sqlTypeName) {
           case DATE:
             this.COLUMN_SIZE = 10;
             break;// yyyy-MM-dd
@@ -352,7 +418,7 @@
           this.NUMERIC_PRECISION = null;
           this.NUMERIC_PRECISION_RADIX = null;
           this.NUMERIC_SCALE = null;
-          switch ( sqlTypeName ) {
+          switch (sqlTypeName) {
             case INTERVAL_YEAR:
             case INTERVAL_YEAR_MONTH:
             case INTERVAL_MONTH:
@@ -375,12 +441,11 @@
                   relDataType
                   .getIntervalQualifier()
                   .getFractionalSecondPrecision(
-                      DrillRelDataTypeSystem.DRILL_REL_DATATYPE_SYSTEM );
+                      DrillRelDataTypeSystem.DRILL_REL_DATATYPE_SYSTEM);
               break;
             default:
               throw new AssertionError(
-                  "Unexpected type " + sqlTypeName + " in interval-types branch" );
-              //break;
+                "Unexpected type " + sqlTypeName + " in interval-types branch");
           }
           this.INTERVAL_PRECISION =
               relDataType
@@ -390,7 +455,7 @@
             final TimeUnit start = relDataType.getIntervalQualifier().getStartUnit();
             // NOTE: getEndUnit() returns null instead of YEAR for "INTERVAL YEAR".
             final TimeUnit end = MoreObjects.firstNonNull(relDataType.getIntervalQualifier().getEndUnit(), start);
-            if ( start == end ) {
+            if (start == end) {
               this.INTERVAL_TYPE = start.name();
             }
             else {
@@ -402,7 +467,7 @@
               ? DATETIME_PRECISION + 1 // add 1 for decimal point
               : 0;
 
-            switch(start) {
+            switch (start) {
             case YEAR:
               switch(end) {
               case YEAR:
@@ -417,7 +482,7 @@
               break;
 
             case MONTH:
-              switch(end) {
+              switch (end) {
               case MONTH:
                 this.COLUMN_SIZE = this.INTERVAL_PRECISION + 2;
                 break; // P..M
@@ -427,7 +492,7 @@
               break;
 
             case DAY:
-              switch(end) {
+              switch (end) {
               case DAY:
                 this.COLUMN_SIZE = this.INTERVAL_PRECISION + 2;
                 break; // P..D
@@ -446,7 +511,7 @@
               break;
 
             case HOUR:
-              switch(end) {
+              switch (end) {
               case HOUR:
                 this.COLUMN_SIZE = this.INTERVAL_PRECISION + 3;
                 break; // PT..H
@@ -462,7 +527,7 @@
               break;
 
             case MINUTE:
-              switch(end) {
+              switch (end) {
               case MINUTE:
                 this.COLUMN_SIZE = this.INTERVAL_PRECISION + 3;
                 break; // PT...M
@@ -476,7 +541,7 @@
 
 
             case SECOND:
-              switch(end) {
+              switch (end) {
               case SECOND:
                 this.COLUMN_SIZE = this.INTERVAL_PRECISION + 3 + extraSecondIntervalSize;
                 break; // PT....S
@@ -503,12 +568,97 @@
           this.COLUMN_SIZE = null;
         break;
       }
+      this.COLUMN_FORMAT = null;
+      this.NUM_NULLS = null;
+      this.MIN_VAL = null;
+      this.MAX_VAL = null;
+      this.NDV = null;
+      this.EST_NUM_NON_NULLS = null;
+    }
 
+    public Column(String catalog, String schemaName, String tableName, String columnName,
+                  ColumnMetadata columnMetadata, ColumnStatistics columnStatistics, int index,
+                  boolean isNested) {
+      this.TABLE_CATALOG = catalog;
+      this.TABLE_SCHEMA = schemaName;
+      this.TABLE_NAME = tableName;
+      this.COLUMN_NAME = columnName;
+      this.ORDINAL_POSITION = index + 1;
+      this.COLUMN_DEFAULT = columnMetadata.defaultValue();
+      this.IS_NULLABLE = convertToString(columnMetadata.isNullable());
+      this.COLUMN_FORMAT = columnMetadata.format();
+      this.IS_NESTED = isNested;
+
+      TypeProtos.MajorType type = columnMetadata.majorType();
+      switch (type.getMinorType()) {
+        case INTERVAL:
+        case INTERVALDAY:
+        case INTERVALYEAR:
+          this.DATA_TYPE = TypeProtos.MinorType.INTERVAL.name();
+          break;
+        default:
+          this.DATA_TYPE = Types.getSqlTypeName(type);
+      }
+
+      int columnSize = Types.getJdbcDisplaySize(type);
+      this.COLUMN_SIZE = columnSize == 0 && Types.isScalarStringType(type) ? Types.MAX_VARCHAR_LENGTH : columnSize;
+
+      if (Types.isScalarStringType(type)) {
+        this.CHARACTER_MAXIMUM_LENGTH = COLUMN_SIZE;
+        this.CHARACTER_OCTET_LENGTH = COLUMN_SIZE;
+      } else {
+        this.CHARACTER_MAXIMUM_LENGTH = null;
+        this.CHARACTER_OCTET_LENGTH = null;
+      }
+
+      if (Types.isNumericType(type)) {
+        this.NUMERIC_PRECISION = type.getPrecision();
+        this.NUMERIC_PRECISION_RADIX = Types.isDecimalType(type) ? 10 : 2;
+        this.NUMERIC_SCALE = type.getScale();
+      } else {
+        this.NUMERIC_PRECISION = null;
+        this.NUMERIC_PRECISION_RADIX = null;
+        this.NUMERIC_SCALE = null;
+      }
+
+      if (Types.isDateTimeType(type)) {
+        this.DATETIME_PRECISION = COLUMN_SIZE;
+      } else {
+        this.DATETIME_PRECISION = null;
+      }
+
+      if (Types.isIntervalType(type)) {
+        this.INTERVAL_TYPE = Types.getSqlTypeName(type);
+        this.INTERVAL_PRECISION = 0;
+      } else {
+        this.INTERVAL_TYPE = null;
+        this.INTERVAL_PRECISION = null;
+      }
+
+      if (columnStatistics == null) {
+        this.NUM_NULLS = null;
+        this.MIN_VAL = null;
+        this.MAX_VAL = null;
+        this.NDV = null;
+        this.EST_NUM_NON_NULLS = null;
+      } else {
+        Long numNulls = ColumnStatisticsKind.NULLS_COUNT.getFrom(columnStatistics);
+        this.NUM_NULLS = numNulls == Statistic.NO_COLUMN_STATS ? null : numNulls;
+        Object minVal = ColumnStatisticsKind.MIN_VALUE.getFrom(columnStatistics);
+        this.MIN_VAL = minVal == null ? null : minVal.toString();
+        Object maxVal = ColumnStatisticsKind.MAX_VALUE.getFrom(columnStatistics);
+        this.MAX_VAL = maxVal == null ? null : maxVal.toString();
+        this.NDV = ColumnStatisticsKind.NDV.getFrom(columnStatistics);
+        this.EST_NUM_NON_NULLS = ColumnStatisticsKind.NON_NULL_COUNT.getFrom(columnStatistics);
+      }
     }
   }
 
-  /** Pojo object for a record in INFORMATION_SCHEMA.VIEWS */
+  /**
+   * Pojo object for a record in INFORMATION_SCHEMA.VIEWS
+   */
   public static class View {
+
     public final String TABLE_CATALOG;
     public final String TABLE_SCHEMA;
     public final String TABLE_NAME;
@@ -522,8 +672,11 @@
     }
   }
 
-  /** Pojo object for a record in INFORMATION_SCHEMA.CATALOGS */
+  /**
+   * Pojo object for a record in INFORMATION_SCHEMA.CATALOGS
+   */
   public static class Catalog {
+
     public final String CATALOG_NAME;
     public final String CATALOG_DESCRIPTION;
     public final String CATALOG_CONNECT;
@@ -535,8 +688,11 @@
     }
   }
 
-  /** Pojo object for a record in INFORMATION_SCHEMA.SCHEMATA */
+  /**
+   * Pojo object for a record in INFORMATION_SCHEMA.SCHEMATA
+   */
   public static class Schema {
+
     public final String CATALOG_NAME;
     public final String SCHEMA_NAME;
     public final String SCHEMA_OWNER;
@@ -548,11 +704,68 @@
       this.SCHEMA_NAME = name;
       this.SCHEMA_OWNER = owner;
       this.TYPE = type;
-      this.IS_MUTABLE = isMutable ? "YES" : "NO";
+      this.IS_MUTABLE = convertToString(isMutable);
     }
   }
 
-  /** Pojo object for a record in INFORMATION_SCHEMA.FILES */
+  /**
+   * Pojo object for a record in INFORMATION_SCHEMA.PARTITIONS
+   */
+  public static class Partition {
+
+    public final String TABLE_CATALOG;
+    public final String TABLE_SCHEMA;
+    public final String TABLE_NAME;
+    public final String METADATA_KEY;
+    public final String METADATA_TYPE;
+    public final String METADATA_IDENTIFIER;
+    public final String PARTITION_COLUMN;
+    public final String PARTITION_VALUE;
+    public final String LOCATION;
+    public final Timestamp LAST_MODIFIED_TIME;
+
+    public Partition(String catalog, String schemaName, String value, SegmentMetadata segmentMetadata) {
+      this.TABLE_CATALOG = catalog;
+      this.TABLE_SCHEMA = schemaName;
+      this.TABLE_NAME = segmentMetadata.getTableInfo().name();
+      this.METADATA_KEY = segmentMetadata.getMetadataInfo().key();
+      this.METADATA_TYPE = segmentMetadata.getMetadataInfo().type().name();
+      this.METADATA_IDENTIFIER = segmentMetadata.getMetadataInfo().identifier();
+      this.PARTITION_COLUMN = segmentMetadata.getColumn().toString();
+      this.PARTITION_VALUE = value;
+      this.LOCATION = segmentMetadata.getLocation().toString();
+      this.LAST_MODIFIED_TIME = convertToTimestamp(segmentMetadata.getLastModifiedTime());
+    }
+
+    public Partition(String catalog, String schemaName, String value, PartitionMetadata partitionMetadata) {
+      this.TABLE_CATALOG = catalog;
+      this.TABLE_SCHEMA = schemaName;
+      this.TABLE_NAME = partitionMetadata.getTableInfo().name();
+      this.METADATA_KEY = partitionMetadata.getMetadataInfo().key();
+      this.METADATA_TYPE = partitionMetadata.getMetadataInfo().type().name();
+      this.METADATA_IDENTIFIER = partitionMetadata.getMetadataInfo().identifier();
+      this.PARTITION_COLUMN = partitionMetadata.getColumn().toString();
+      this.PARTITION_VALUE = value;
+      this.LOCATION = null;
+      this.LAST_MODIFIED_TIME = convertToTimestamp(partitionMetadata.getLastModifiedTime());
+    }
+
+    public static List<Partition> fromSegment(String catalog, String schemaName, SegmentMetadata segmentMetadata) {
+      return segmentMetadata.getPartitionValues().stream()
+        .map(value -> new Partition(catalog, schemaName, value, segmentMetadata))
+        .collect(Collectors.toList());
+    }
+
+    public static List<Partition> fromPartition(String catalog, String schemaName, PartitionMetadata partitionMetadata) {
+      return partitionMetadata.getPartitionValues().stream()
+        .map(value -> new Partition(catalog, schemaName, value, partitionMetadata))
+        .collect(Collectors.toList());
+    }
+  }
+
+  /**
+   * Pojo object for a record in INFORMATION_SCHEMA.FILES
+   */
   public static class File {
 
     public final String SCHEMA_NAME;
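
The new TABLES columns above (TABLE_SOURCE, LOCATION, NUM_ROWS,
LAST_MODIFIED_TIME) surface Metastore data through plain information_schema
queries. A hedged sketch of reading them over JDBC (the connection URL and the
target schema are assumptions, not part of the patch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class InfoSchemaTablesExample {
      public static void main(String[] args) throws Exception {
        // Assumed Drillbit address; adjust to the target cluster.
        try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT TABLE_NAME, TABLE_SOURCE, LOCATION, NUM_ROWS, LAST_MODIFIED_TIME " +
                 "FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_SCHEMA = 'dfs.tmp'")) {
          while (rs.next()) {
            // The new columns are null for tables that have no Metastore entry.
            System.out.printf("%s source=%s location=%s rows=%s modified=%s%n",
                rs.getString("TABLE_NAME"), rs.getString("TABLE_SOURCE"),
                rs.getString("LOCATION"), rs.getObject("NUM_ROWS"),
                rs.getTimestamp("LAST_MODIFIED_TIME"));
          }
        }
      }
    }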
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/package-info.java
deleted file mode 100644
index bdbf96a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- *
- */
-package org.apache.drill.exec.store.ischema;
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
index 75ab417..ff23958 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/metadata/MetadataProvider.java
@@ -73,6 +73,7 @@
 import org.apache.drill.exec.store.ischema.Records.Table;
 import org.apache.drill.exec.store.pojo.PojoRecordReader;
 
+import org.apache.drill.metastore.MetastoreRegistry;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.ComparisonChain;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
@@ -153,6 +154,10 @@
     public DrillConfig getConfig() {
       return dContext.getConfig();
     }
+
+    public MetastoreRegistry getMetastoreRegistry() {
+      return dContext.getMetastoreRegistry();
+    }
   }
 
   /**
@@ -182,7 +187,7 @@
 
       try {
         final PojoRecordReader<Catalog> records =
-            getPojoRecordReader(CATALOGS, filter, getConfig(), schemaProvider, session);
+            getPojoRecordReader(CATALOGS, filter, getConfig(), schemaProvider, session, getMetastoreRegistry());
 
         List<CatalogMetadata> metadata = new ArrayList<>();
         for(Catalog c : records) {
@@ -238,7 +243,7 @@
 
       try {
         final PojoRecordReader<Schema> records =
-            getPojoRecordReader(SCHEMATA, filter, getConfig(), schemaProvider, session);
+            getPojoRecordReader(SCHEMATA, filter, getConfig(), schemaProvider, session, getMetastoreRegistry());
 
         List<SchemaMetadata> metadata = new ArrayList<>();
         for(Schema s : records) {
@@ -298,7 +303,7 @@
 
       try {
         final PojoRecordReader<Table> records =
-            getPojoRecordReader(TABLES, filter, getConfig(), schemaProvider, session);
+            getPojoRecordReader(TABLES, filter, getConfig(), schemaProvider, session, getMetastoreRegistry());
 
         List<TableMetadata> metadata = new ArrayList<>();
         for(Table t : records) {
@@ -359,7 +364,7 @@
 
       try {
         final PojoRecordReader<Column> records =
-            getPojoRecordReader(COLUMNS, filter, getConfig(), schemaProvider, session);
+            getPojoRecordReader(COLUMNS, filter, getConfig(), schemaProvider, session, getMetastoreRegistry());
 
         List<ColumnMetadata> metadata = new ArrayList<>();
         for(Column c : records) {
@@ -559,10 +564,10 @@
    * @return
    */
   private static <S> PojoRecordReader<S> getPojoRecordReader(final InfoSchemaTableType tableType, final InfoSchemaFilter filter, final DrillConfig config,
-      final SchemaTreeProvider provider, final UserSession userSession) {
+      final SchemaTreeProvider provider, final UserSession userSession, final MetastoreRegistry metastoreRegistry) {
     final SchemaPlus rootSchema =
         provider.createFullRootSchema(userSession.getCredentials().getUserName(), newSchemaConfigInfoProvider(config, userSession, provider));
-    return tableType.getRecordReader(rootSchema, filter, userSession.getOptions());
+    return tableType.getRecordReader(rootSchema, filter, userSession.getOptions(), metastoreRegistry);
   }
 
   /**
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
index 1ccd9b4..e3bdcfa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
@@ -32,7 +32,6 @@
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.SystemOptionManager;
 import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
-import org.apache.drill.common.util.GuavaPatcher;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.DrillTest;
 import org.apache.hadoop.conf.Configuration;
@@ -56,9 +55,6 @@
   public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
 
   protected static SystemOptionManager optionManager;
-  static {
-    GuavaPatcher.patch();
-  }
 
   protected static final DrillConfig c = DrillConfig.create();
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
index f437776..02efcee 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchema.java
@@ -52,7 +52,7 @@
  */
 @Category(SqlTest.class)
 public class TestInfoSchema extends BaseTestQuery {
-  public static final String TEST_SUB_DIR = "testSubDir";
+  private static final String TEST_SUB_DIR = "testSubDir";
   private static final ObjectMapper mapper = new ObjectMapper().enable(INDENT_OUTPUT);
 
   @BeforeClass
@@ -69,6 +69,7 @@
     test("select * from INFORMATION_SCHEMA.`TABLES`");
     test("select * from INFORMATION_SCHEMA.COLUMNS");
     test("select * from INFORMATION_SCHEMA.`FILES`");
+    test("select * from INFORMATION_SCHEMA.`PARTITIONS`");
   }
 
   @Test
@@ -83,28 +84,29 @@
 
   @Test
   public void showTablesFromDb() throws Exception{
-    final List<String[]> expected = Arrays.asList(
+    List<String[]> expected = Arrays.asList(
         new String[]{"information_schema", "VIEWS"},
         new String[]{"information_schema", "COLUMNS"},
         new String[]{"information_schema", "TABLES"},
         new String[]{"information_schema", "CATALOGS"},
         new String[]{"information_schema", "SCHEMATA"},
-        new String[]{"information_schema", "FILES"});
+        new String[]{"information_schema", "FILES"},
+        new String[]{"information_schema", "PARTITIONS"});
 
-    final TestBuilder t1 = testBuilder()
+    TestBuilder t1 = testBuilder()
         .sqlQuery("SHOW TABLES FROM INFORMATION_SCHEMA")
         .unOrdered()
         .baselineColumns("TABLE_SCHEMA", "TABLE_NAME");
-    for(String[] expectedRow : expected) {
+    for (String[] expectedRow : expected) {
       t1.baselineValues(expectedRow);
     }
     t1.go();
 
-    final TestBuilder t2 = testBuilder()
+    TestBuilder t2 = testBuilder()
         .sqlQuery("SHOW TABLES IN INFORMATION_SCHEMA")
         .unOrdered()
         .baselineColumns("TABLE_SCHEMA", "TABLE_NAME");
-    for(String[] expectedRow : expected) {
+    for (String[] expectedRow : expected) {
       t2.baselineValues(expectedRow);
     }
     t2.go();
@@ -203,6 +205,10 @@
         .baselineValues("TABLE_SCHEMA", "CHARACTER VARYING", "NO")
         .baselineValues("TABLE_NAME", "CHARACTER VARYING", "NO")
         .baselineValues("TABLE_TYPE", "CHARACTER VARYING", "NO")
+        .baselineValues("TABLE_SOURCE", "CHARACTER VARYING", "NO")
+        .baselineValues("LOCATION", "CHARACTER VARYING", "NO")
+        .baselineValues("NUM_ROWS", "BIGINT", "NO")
+        .baselineValues("LAST_MODIFIED_TIME", "TIMESTAMP", "NO")
         .go();
   }
 
@@ -237,6 +243,10 @@
           .baselineValues("TABLE_SCHEMA", "CHARACTER VARYING", "NO")
           .baselineValues("TABLE_NAME", "CHARACTER VARYING", "NO")
           .baselineValues("TABLE_TYPE", "CHARACTER VARYING", "NO")
+          .baselineValues("TABLE_SOURCE", "CHARACTER VARYING", "NO")
+          .baselineValues("LOCATION", "CHARACTER VARYING", "NO")
+          .baselineValues("NUM_ROWS", "BIGINT", "NO")
+          .baselineValues("LAST_MODIFIED_TIME", "TIMESTAMP", "NO")
           .go();
     } finally {
       test("DROP VIEW IF EXISTS dfs.tmp.`TABLES`");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchemaWithMetastore.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchemaWithMetastore.java
new file mode 100644
index 0000000..6ce32e4
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestInfoSchemaWithMetastore.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.sql;
+
+import org.apache.drill.categories.MetastoreTest;
+import org.apache.drill.categories.SqlTest;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.ischema.InfoSchemaConstants;
+import org.apache.drill.metastore.Metastore;
+import org.apache.drill.metastore.MetastoreRegistry;
+import org.apache.drill.metastore.metadata.BaseTableMetadata;
+import org.apache.drill.metastore.metadata.MetadataInfo;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.metadata.PartitionMetadata;
+import org.apache.drill.metastore.metadata.SegmentMetadata;
+import org.apache.drill.metastore.metadata.TableInfo;
+import org.apache.drill.metastore.statistics.ColumnStatistics;
+import org.apache.drill.metastore.statistics.ColumnStatisticsKind;
+import org.apache.drill.metastore.statistics.StatisticsHolder;
+import org.apache.drill.metastore.statistics.TableStatisticsKind;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Category({SqlTest.class, MetastoreTest.class, UnlikelyTest.class})
+public class TestInfoSchemaWithMetastore extends ClusterTest {
+
+  private static final List<String> TABLES_COLUMNS = Arrays.asList(
+    InfoSchemaConstants.SHRD_COL_TABLE_CATALOG,
+    InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA,
+    InfoSchemaConstants.SHRD_COL_TABLE_NAME,
+    InfoSchemaConstants.TBLS_COL_TABLE_TYPE,
+    InfoSchemaConstants.TBLS_COL_TABLE_SOURCE,
+    InfoSchemaConstants.TBLS_COL_LOCATION,
+    InfoSchemaConstants.TBLS_COL_NUM_ROWS,
+    InfoSchemaConstants.TBLS_COL_LAST_MODIFIED_TIME);
+
+  @ClassRule
+  public static TemporaryFolder root = new TemporaryFolder();
+
+  private static Metastore metastore;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    builder.configProperty(ExecConstants.ZK_ROOT, root.getRoot().toString());
+    builder.sessionOption(ExecConstants.METASTORE_ENABLED, true);
+    startCluster(builder);
+    MetastoreRegistry metastoreRegistry = client.cluster().drillbit().getContext().getMetastoreRegistry();
+    metastore = metastoreRegistry.get();
+  }
+
+  @Test
+  public void testTableNoStats() throws Exception {
+    String tableName = "table_no_stats";
+    BaseTableMetadata table = BaseTableMetadata.builder()
+      .tableInfo(TableInfo.builder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .name(tableName)
+        .type("PARQUET")
+        .build())
+      .metadataInfo(MetadataInfo.builder()
+        .type(MetadataType.TABLE)
+        .key(MetadataInfo.GENERAL_INFO_KEY)
+        .build())
+      .location(new Path("/tmp", tableName))
+      .metadataStatistics(Collections.emptyList())
+      .columnsStatistics(Collections.emptyMap())
+      .partitionKeys(Collections.emptyMap())
+      .build();
+
+    metastore.tables().modify()
+      .overwrite(table.toMetadataUnit())
+      .execute();
+
+    client.testBuilder()
+      .sqlQuery("select %s from information_schema.`tables` where table_name = '%s'",
+        String.join(", ", TABLES_COLUMNS), tableName)
+      .unOrdered()
+      .baselineColumns(TABLES_COLUMNS.toArray(new String[0]))
+      .baselineValues("DRILL", "dfs.tmp", tableName, "TABLE", table.getTableInfo().type(),
+        table.getLocation().toUri().toString(), null, null)
+      .go();
+  }
+
+  @Test
+  public void testTableWithStats() throws Exception {
+    ZonedDateTime currentTime = currentUtcTime();
+
+    String tableName = "table_with_stats";
+    BaseTableMetadata table = BaseTableMetadata.builder()
+      .tableInfo(TableInfo.builder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .name(tableName)
+        .type("PARQUET")
+        .build())
+      .metadataInfo(MetadataInfo.builder()
+        .type(MetadataType.TABLE)
+        .key(MetadataInfo.GENERAL_INFO_KEY)
+        .build())
+      .location(new Path("/tmp", tableName))
+      .metadataStatistics(Collections.singletonList(
+        new StatisticsHolder<>(100L, TableStatisticsKind.ROW_COUNT)))
+      .columnsStatistics(Collections.emptyMap())
+      .partitionKeys(Collections.emptyMap())
+      .lastModifiedTime(currentTime.toInstant().toEpochMilli())
+      .build();
+
+    metastore.tables().modify()
+      .overwrite(table.toMetadataUnit())
+      .execute();
+
+    client.testBuilder()
+      .sqlQuery("select %s from information_schema.`tables` where table_name = '%s'",
+        String.join(", ", TABLES_COLUMNS), tableName)
+      .unOrdered()
+      .baselineColumns(TABLES_COLUMNS.toArray(new String[0]))
+      .baselineValues("DRILL", "dfs.tmp", tableName, "TABLE", table.getTableInfo().type(),
+        table.getLocation().toUri().toString(), 100L, currentTime.toLocalDateTime())
+      .go();
+  }
+
+  @Test
+  public void testColumns() throws Exception {
+    BaseTableMetadata tableNoSchema = BaseTableMetadata.builder()
+      .tableInfo(TableInfo.builder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .name("table_no_schema")
+        .type("PARQUET")
+        .build())
+      .metadataInfo(MetadataInfo.builder()
+        .type(MetadataType.TABLE)
+        .key(MetadataInfo.GENERAL_INFO_KEY)
+        .build())
+      .location(new Path("/tmp", "table_no_schema"))
+      .metadataStatistics(Collections.emptyList())
+      .columnsStatistics(Collections.emptyMap())
+      .partitionKeys(Collections.emptyMap())
+      .build();
+
+    TupleMetadata schema = new SchemaBuilder()
+      .addNullable("bigint_col", TypeProtos.MinorType.BIGINT)
+      .addDecimal("decimal_col", TypeProtos.MinorType.VARDECIMAL, TypeProtos.DataMode.OPTIONAL, 10, 2)
+      .add("interval_col", TypeProtos.MinorType.INTERVALYEAR)
+      .addArray("array_col", TypeProtos.MinorType.BIT)
+      .addMap("struct_col")
+        .addNullable("struct_bigint", TypeProtos.MinorType.BIGINT)
+        .add("struct_varchar", TypeProtos.MinorType.VARCHAR)
+        .addMap("nested_struct")
+          .addNullable("nested_struct_boolean", TypeProtos.MinorType.BIT)
+          .add("nested_struct_varchar", TypeProtos.MinorType.VARCHAR)
+          .resumeMap()
+        .resumeSchema()
+      .buildSchema();
+
+    PrimitiveColumnMetadata varcharCol = new PrimitiveColumnMetadata("varchar_col",
+      TypeProtos.MajorType.newBuilder()
+        .setMinorType(TypeProtos.MinorType.VARCHAR)
+        .setMode(TypeProtos.DataMode.REQUIRED)
+        .build());
+    varcharCol.setDefaultValue("ABC");
+
+    PrimitiveColumnMetadata timestampColumn = new PrimitiveColumnMetadata("timestamp_col",
+      TypeProtos.MajorType.newBuilder()
+        .setMinorType(TypeProtos.MinorType.TIMESTAMP)
+        .setMode(TypeProtos.DataMode.REQUIRED)
+        .build());
+    timestampColumn.setFormat("yyyy-MM-dd HH:mm:ss");
+
+    schema.addColumn(varcharCol);
+    schema.addColumn(timestampColumn);
+
+    Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
+    columnsStatistics.put(SchemaPath.parseFromString("varchar_col"),
+      new ColumnStatistics(Arrays.asList(
+        new StatisticsHolder<>("aaa", ColumnStatisticsKind.MIN_VALUE),
+        new StatisticsHolder<>("zzz", ColumnStatisticsKind.MAX_VALUE))));
+    columnsStatistics.put(SchemaPath.parseFromString("struct_col.nested_struct.nested_struct_varchar"),
+      new ColumnStatistics(Arrays.asList(
+        new StatisticsHolder<>("bbb", ColumnStatisticsKind.MIN_VALUE),
+        new StatisticsHolder<>("ccc", ColumnStatisticsKind.MAX_VALUE))));
+    columnsStatistics.put(SchemaPath.parseFromString("bigint_col"),
+      new ColumnStatistics(Arrays.asList(
+        new StatisticsHolder<>(100L, ColumnStatisticsKind.NULLS_COUNT),
+        new StatisticsHolder<>(10.5D, ColumnStatisticsKind.NDV))));
+    columnsStatistics.put(SchemaPath.parseFromString("struct_col.struct_bigint"),
+      new ColumnStatistics(Collections.singletonList(
+        new StatisticsHolder<>(10.5D, ColumnStatisticsKind.NON_NULL_COUNT))));
+
+    ZonedDateTime currentTime = currentUtcTime();
+
+    String tableName = "table_with_schema";
+    BaseTableMetadata tableWithSchema = BaseTableMetadata.builder()
+      .tableInfo(TableInfo.builder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .name(tableName)
+        .type("PARQUET")
+        .build())
+      .metadataInfo(MetadataInfo.builder()
+        .type(MetadataType.TABLE)
+        .key(MetadataInfo.GENERAL_INFO_KEY)
+        .build())
+      .location(new Path("/tmp", tableName))
+      .schema(schema)
+      .metadataStatistics(Collections.emptyList())
+      .columnsStatistics(columnsStatistics)
+      .partitionKeys(Collections.emptyMap())
+      .lastModifiedTime(currentTime.toInstant().toEpochMilli())
+      .build();
+
+    metastore.tables().modify()
+      .overwrite(tableNoSchema.toMetadataUnit(), tableWithSchema.toMetadataUnit())
+      .execute();
+
+    List<String> columns = Arrays.asList(
+      InfoSchemaConstants.SHRD_COL_TABLE_CATALOG,
+      InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA,
+      InfoSchemaConstants.SHRD_COL_TABLE_NAME,
+      InfoSchemaConstants.COLS_COL_COLUMN_NAME,
+      InfoSchemaConstants.COLS_COL_ORDINAL_POSITION,
+      InfoSchemaConstants.COLS_COL_COLUMN_DEFAULT,
+      InfoSchemaConstants.COLS_COL_IS_NULLABLE,
+      InfoSchemaConstants.COLS_COL_DATA_TYPE,
+      InfoSchemaConstants.COLS_COL_CHARACTER_MAXIMUM_LENGTH,
+      InfoSchemaConstants.COLS_COL_CHARACTER_OCTET_LENGTH,
+      InfoSchemaConstants.COLS_COL_NUMERIC_PRECISION,
+      InfoSchemaConstants.COLS_COL_NUMERIC_PRECISION_RADIX,
+      InfoSchemaConstants.COLS_COL_NUMERIC_SCALE,
+      InfoSchemaConstants.COLS_COL_DATETIME_PRECISION,
+      InfoSchemaConstants.COLS_COL_INTERVAL_TYPE,
+      InfoSchemaConstants.COLS_COL_INTERVAL_PRECISION,
+      InfoSchemaConstants.COLS_COL_COLUMN_SIZE,
+      InfoSchemaConstants.COLS_COL_COLUMN_FORMAT,
+      InfoSchemaConstants.COLS_COL_NUM_NULLS,
+      InfoSchemaConstants.COLS_COL_MIN_VAL,
+      InfoSchemaConstants.COLS_COL_MAX_VAL,
+      InfoSchemaConstants.COLS_COL_NDV,
+      InfoSchemaConstants.COLS_COL_EST_NUM_NON_NULLS,
+      InfoSchemaConstants.COLS_COL_IS_NESTED);
+
+    client.testBuilder()
+      .sqlQuery("select %s from information_schema.`columns` where table_name " +
+        "in ('%s', '%s')", String.join(", ", columns), tableNoSchema.getTableInfo().name(), tableName)
+      .unOrdered()
+      .baselineColumns(columns.toArray(new String[0]))
+      .baselineValues("DRILL", "dfs.tmp", tableName, "bigint_col", 1, null, "YES", "BIGINT", null, null,
+        0, 2, 0, null, null, null, 20, null, 100L, null, null, 10.5D, null, false)
+      .baselineValues("DRILL", "dfs.tmp", tableName, "decimal_col", 2, null, "YES", "DECIMAL", null, null,
+        10, 10, 2, null, null, null, 12, null, null, null, null, null, null, false)
+      .baselineValues("DRILL", "dfs.tmp", tableName, "interval_col", 3, null, "NO", "INTERVAL", null, null,
+        null, null, null, null, "INTERVAL YEAR TO MONTH", 0, 9, null, null, null, null, null, null, false)
+      .baselineValues("DRILL", "dfs.tmp", tableName, "array_col", 4, null, "NO", "ARRAY", null, null,
+        null, null, null, null, null, null, 0, null, null, null, null, null, null, false)
+      .baselineValues("DRILL", "dfs.tmp", tableName, "struct_col", 5, null, "NO", "STRUCT", null, null,
+        null, null, null, null, null, null, 0, null, null, null, null, null, null, false)
+      .baselineValues("DRILL", "dfs.tmp", tableName, "struct_col.struct_bigint", 5, null, "YES", "BIGINT", null, null,
+        0, 2, 0, null, null, null, 20, null, null, null, null, null, 10.5D, true)
+      .baselineValues("DRILL", "dfs.tmp", tableName, "struct_col.struct_varchar", 5, null, "NO", "CHARACTER VARYING", 65535, 65535,
+        null, null, null, null, null, null, 65535, null, null, null, null, null, null, true)
+      .baselineValues("DRILL", "dfs.tmp", tableName, "struct_col.nested_struct", 5, null, "NO", "STRUCT", null, null,
+        null, null, null, null, null, null, 0, null, null, null, null, null, null, true)
+      .baselineValues("DRILL", "dfs.tmp", tableName, "struct_col.nested_struct.nested_struct_boolean", 5, null, "YES", "BOOLEAN", null, null,
+        null, null, null, null, null, null, 1, null, null, null, null, null, null, true)
+      .baselineValues("DRILL", "dfs.tmp", tableName, "struct_col.nested_struct.nested_struct_varchar", 5, null, "NO", "CHARACTER VARYING", 65535, 65535,
+        null, null, null, null, null, null, 65535, null, null, "bbb", "ccc", null, null, true)
+      .baselineValues("DRILL", "dfs.tmp", tableName, "varchar_col", 6, "ABC", "NO", "CHARACTER VARYING", 65535, 65535,
+        null, null, null, null, null, null, 65535, null, null, "aaa", "zzz", null, null, false)
+      .baselineValues("DRILL", "dfs.tmp", tableName, "timestamp_col", 7, null, "NO", "TIMESTAMP", null, null,
+        null, null, null, 19, null, null, 19, "yyyy-MM-dd HH:mm:ss", null, null, null, null, null, false)
+      .go();
+  }
+
+  @Test
+  public void testPartitions() throws Exception {
+    String tableName = "table_with_partitions";
+    ZonedDateTime currentTime = currentUtcTime();
+
+    TableInfo tableInfo = TableInfo.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .name(tableName)
+      .type("PARQUET")
+      .build();
+
+    SegmentMetadata defaultSegment = SegmentMetadata.builder()
+      .tableInfo(tableInfo)
+      .metadataInfo(MetadataInfo.builder()
+        .type(MetadataType.SEGMENT)
+        .key(MetadataInfo.DEFAULT_SEGMENT_KEY)
+        .build())
+      .path(new Path("/tmp", tableName))
+      .locations(Collections.emptySet())
+      .metadataStatistics(Collections.emptyList())
+      .columnsStatistics(Collections.emptyMap())
+      .lastModifiedTime(currentTime.toInstant().toEpochMilli())
+      .build();
+
+    SegmentMetadata segment = SegmentMetadata.builder()
+      .tableInfo(tableInfo)
+      .metadataInfo(MetadataInfo.builder()
+        .type(MetadataType.SEGMENT)
+        .key("part_int=3")
+        .identifier("part_int=3")
+        .build())
+      .column(SchemaPath.parseFromString("dir0"))
+      .partitionValues(Collections.singletonList("part_int=3"))
+      .path(new Path(String.format("/tmp/%s/part_int=3", tableName)))
+      .locations(Collections.emptySet())
+      .metadataStatistics(Collections.emptyList())
+      .columnsStatistics(Collections.emptyMap())
+      .lastModifiedTime(currentTime.toInstant().toEpochMilli())
+      .build();
+
+    PartitionMetadata partition = PartitionMetadata.builder()
+      .tableInfo(tableInfo)
+      .metadataInfo(MetadataInfo.builder()
+        .type(MetadataType.PARTITION)
+        .key("part_int=3")
+        .identifier("part_int=3/part_varchar=g")
+        .build())
+      .column(SchemaPath.parseFromString("part_varchar"))
+      .partitionValues(Collections.singletonList("g"))
+      .locations(Collections.emptySet())
+      .metadataStatistics(Collections.emptyList())
+      .columnsStatistics(Collections.emptyMap())
+      .lastModifiedTime(currentTime.toInstant().toEpochMilli())
+      .build();
+
+    metastore.tables().modify()
+      .overwrite(defaultSegment.toMetadataUnit(), segment.toMetadataUnit(), partition.toMetadataUnit())
+      .execute();
+
+    List<String> columns = Arrays.asList(
+      InfoSchemaConstants.SHRD_COL_TABLE_CATALOG,
+      InfoSchemaConstants.SHRD_COL_TABLE_SCHEMA,
+      InfoSchemaConstants.SHRD_COL_TABLE_NAME,
+      InfoSchemaConstants.PARTITIONS_COL_METADATA_KEY,
+      InfoSchemaConstants.PARTITIONS_COL_METADATA_TYPE,
+      InfoSchemaConstants.PARTITIONS_COL_METADATA_IDENTIFIER,
+      InfoSchemaConstants.PARTITIONS_COL_PARTITION_COLUMN,
+      InfoSchemaConstants.PARTITIONS_COL_PARTITION_VALUE,
+      InfoSchemaConstants.PARTITIONS_COL_LOCATION,
+      InfoSchemaConstants.PARTITIONS_COL_LAST_MODIFIED_TIME);
+
+    client.testBuilder()
+      .sqlQuery("select %s from information_schema.`partitions` where table_name = '%s'",
+        String.join(", ", columns), tableName)
+      .unOrdered()
+      .baselineColumns(columns.toArray(new String[0]))
+      .baselineValues("DRILL", "dfs.tmp", tableName, "part_int=3", MetadataType.SEGMENT.name(),
+        "part_int=3", "`dir0`", "part_int=3", "/tmp/table_with_partitions/part_int=3", currentTime.toLocalDateTime())
+      .baselineValues("DRILL", "dfs.tmp", tableName, "part_int=3", MetadataType.PARTITION.name(),
+        "part_int=3/part_varchar=g", "`part_varchar`", "g", null, currentTime.toLocalDateTime())
+      .go();
+  }
+
+  private ZonedDateTime currentUtcTime() {
+    ZonedDateTime currentTime = ZonedDateTime.of(LocalDateTime.now(), ZoneId.systemDefault());
+    return currentTime.withZoneSameInstant(ZoneId.of("UTC"));
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestSchemaCaseInsensitivity.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestSchemaCaseInsensitivity.java
index c274e0b..05b7f3c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestSchemaCaseInsensitivity.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestSchemaCaseInsensitivity.java
@@ -17,16 +17,19 @@
  */
 package org.apache.drill.exec.sql;
 
+import org.apache.drill.categories.SqlTest;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterFixtureBuilder;
 import org.apache.drill.test.ClusterTest;
 import org.apache.drill.test.QueryBuilder;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+@Category(SqlTest.class)
 public class TestSchemaCaseInsensitivity extends ClusterTest {
 
   @BeforeClass
@@ -57,7 +60,7 @@
 
   @Test
   public void testDescribeTable() throws Exception {
-    checkRecordCount(4, "describe Information_Schema.`Tables`");
+    checkRecordCount(8, "describe Information_Schema.`Tables`");
     checkRecordCount(1, "describe Information_Schema.`Tables` Table_Catalog");
     checkRecordCount(1, "describe Information_Schema.`Tables` '%Catalog'");
     checkRecordCount(6, "describe SyS.Version");
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
index bda294d..af29f06 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestViewSupport.java
@@ -428,7 +428,8 @@
 
       // Test record in INFORMATION_SCHEMA.TABLES
       testBuilder()
-          .sqlQuery("SELECT * FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME = '%s'", viewName)
+          .sqlQuery("SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE" +
+            " FROM INFORMATION_SCHEMA.`TABLES` WHERE TABLE_NAME = '%s'", viewName)
           .unOrdered()
           .baselineColumns("TABLE_CATALOG", "TABLE_SCHEMA", "TABLE_NAME", "TABLE_TYPE")
           .baselineValues("DRILL", DFS_TMP_SCHEMA, viewName, "VIEW")
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
index 2e785b7..21503de 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java
@@ -149,13 +149,14 @@
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<TableMetadata> tables = resp.getTablesList();
-    assertEquals(19, tables.size());
+    assertEquals(20, tables.size());
 
     verifyTable("information_schema", "CATALOGS", tables);
     verifyTable("information_schema", "COLUMNS", tables);
     verifyTable("information_schema", "SCHEMATA", tables);
     verifyTable("information_schema", "TABLES", tables);
     verifyTable("information_schema", "VIEWS", tables);
+    verifyTable("information_schema", "PARTITIONS", tables);
     verifyTable("information_schema", "FILES", tables);
     //Verify System Tables
     for (SystemTable sysTbl : SystemTable.values()) {
@@ -182,13 +183,14 @@
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<TableMetadata> tables = resp.getTablesList();
-    assertEquals(19, tables.size());
+    assertEquals(20, tables.size());
 
     verifyTable("information_schema", "CATALOGS", tables);
     verifyTable("information_schema", "COLUMNS", tables);
     verifyTable("information_schema", "SCHEMATA", tables);
     verifyTable("information_schema", "TABLES", tables);
     verifyTable("information_schema", "VIEWS", tables);
+    verifyTable("information_schema", "PARTITIONS", tables);
     verifyTable("information_schema", "FILES", tables);
     //Verify System Tables
     for (SystemTable sysTbl : SystemTable.values()) {
@@ -240,7 +242,7 @@
 
     assertEquals(RequestStatus.OK, resp.getStatus());
     List<ColumnMetadata> columns = resp.getColumnsList();
-    assertEquals(140, columns.size());
+    assertEquals(162, columns.size());
     // too many records to verify the output.
   }
 
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestInformationSchemaColumns.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestInformationSchemaColumns.java
index 58b7de6..c9b497a 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestInformationSchemaColumns.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestInformationSchemaColumns.java
@@ -406,7 +406,7 @@
 
   @Test
   public void testMetadataHasInterimNumberOfColumns() throws SQLException {
-    assertThat( "column count", rowsMetadata.getColumnCount(), equalTo( 16 ) );
+    assertThat( "column count", rowsMetadata.getColumnCount(), equalTo( 24 ) );
   }
 
 
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
index ab854aa..8acedbc 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
@@ -129,8 +129,10 @@
       .sql("SELECT TABLE_NAME, COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS " +
         "WHERE TABLE_NAME NOT LIKE 'C%' AND COLUMN_NAME LIKE 'TABLE_%E'")
       .returns(
+        "TABLE_NAME=PARTITIONS; COLUMN_NAME=TABLE_NAME\n" +
         "TABLE_NAME=TABLES; COLUMN_NAME=TABLE_NAME\n" +
         "TABLE_NAME=TABLES; COLUMN_NAME=TABLE_TYPE\n" +
+        "TABLE_NAME=TABLES; COLUMN_NAME=TABLE_SOURCE\n" +
         "TABLE_NAME=VIEWS; COLUMN_NAME=TABLE_NAME\n"
       );
   }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesRequests.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesRequests.java
index 6cc84fc..e12f33c 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesRequests.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesRequests.java
@@ -42,10 +42,8 @@
 
   public static final String LAST_MODIFIED_TIME = "lastModifiedTime";
   public static final String PATH = "path";
-  public static final String METADATA_KEY = "metadataKey";
   public static final String LOCATION = "location";
   public static final String COLUMN = "column";
-  public static final String METADATA_TYPE = "metadataType";
   public static final String INTERESTING_COLUMNS = "interestingColumns";
   public static final String PARTITION_KEYS = "partitionKeys";
 
@@ -415,7 +413,7 @@
       .tableInfo(tableInfo)
       .locations(locations)
       .metadataType(MetadataType.SEGMENT.name())
-      .requestColumns(METADATA_KEY, LAST_MODIFIED_TIME)
+      .requestColumns(MetadataInfo.METADATA_KEY, LAST_MODIFIED_TIME)
       .build();
 
     return request(requestMetadata).stream()
@@ -604,10 +602,10 @@
         addFilter(LOCATION, location, filters);
         addFilter(LOCATION, locations, filters);
         addFilter(COLUMN, column, filters);
-        addFilter(METADATA_TYPE, metadataType, filters);
-        addFilter(METADATA_TYPE, metadataTypes, filters);
-        addFilter(METADATA_KEY, metadataKey, filters);
-        addFilter(METADATA_KEY, metadataKeys, filters);
+        addFilter(MetadataInfo.METADATA_TYPE, metadataType, filters);
+        addFilter(MetadataInfo.METADATA_TYPE, metadataTypes, filters);
+        addFilter(MetadataInfo.METADATA_KEY, metadataKey, filters);
+        addFilter(MetadataInfo.METADATA_KEY, metadataKeys, filters);
         addFilter(PATH, path, filters);
         addFilter(PATH, paths, filters);
         if (customFilter != null) {
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/MetadataInfo.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/MetadataInfo.java
index 2eeb8e1..344cf43 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/MetadataInfo.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/MetadataInfo.java
@@ -34,6 +34,8 @@
   public static final String GENERAL_INFO_KEY = "GENERAL_INFO";
   public static final String DEFAULT_SEGMENT_KEY = "DEFAULT_SEGMENT";
   public static final String DEFAULT_COLUMN_PREFIX = "_$SEGMENT_";
+  public static final String METADATA_TYPE = "metadataType";
+  public static final String METADATA_KEY = "metadataKey";
 
   private final MetadataType type;
   private final String key;
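
With METADATA_TYPE and METADATA_KEY now owned by MetadataInfo, callers build
metadata filters against the shared constants, as the updated tests below do.
A small sketch under the same API (the key values are hypothetical):

    import java.util.Arrays;

    import org.apache.drill.metastore.expressions.FilterExpression;
    import org.apache.drill.metastore.metadata.MetadataInfo;
    import org.apache.drill.metastore.metadata.MetadataType;

    public class MetadataFilterExample {
      public static void main(String[] args) {
        // Match segment-level metadata for two hypothetical partition keys.
        FilterExpression filter = FilterExpression.and(
            FilterExpression.equal(MetadataInfo.METADATA_TYPE, MetadataType.SEGMENT.name()),
            FilterExpression.in(MetadataInfo.METADATA_KEY,
                Arrays.asList("part_int=3", "part_int=4")));
        System.out.println(filter);
      }
    }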
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java
index 9c6c45c..2d95255 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java
@@ -19,6 +19,7 @@
 
 import org.apache.drill.categories.MetastoreTest;
 import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.metadata.MetadataInfo;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -109,7 +110,7 @@
 
     FilterExpression expected = FilterExpression.and(
       FilterExpression.in(BasicTablesRequests.LOCATION, locations),
-      FilterExpression.in(BasicTablesRequests.METADATA_KEY, metadataKeys));
+      FilterExpression.in(MetadataInfo.METADATA_KEY, metadataKeys));
 
     assertEquals(expected.toString(), requestMetadata.filter().toString());
   }
@@ -128,7 +129,7 @@
 
     FilterExpression expected = FilterExpression.and(
       FilterExpression.equal(BasicTablesRequests.COLUMN, column),
-      FilterExpression.in(BasicTablesRequests.METADATA_KEY, metadataKeys),
+      FilterExpression.in(MetadataInfo.METADATA_KEY, metadataKeys),
       customFilter);
 
     assertEquals(expected.toString(), requestMetadata.filter().toString());
diff --git a/pom.xml b/pom.xml
index e6fa5a4..36684cc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -744,7 +744,28 @@
             <execution>
               <id>default-test</id>
               <phase>test</phase>
-              <goals><goal>test</goal></goals>
+              <goals>
+                <goal>test</goal>
+              </goals>
+              <!-- TODO: Remove excludedGroups after DRILL-7393 is fixed. -->
+              <configuration>
+                <excludedGroups>org.apache.drill.categories.MetastoreTest,${excludedGroups}</excludedGroups>
+              </configuration>
+            </execution>
+            <!--
+              All Metastore tests must run in a separate JVM to ensure
+              that the Guava Preconditions class is patched before execution.
+              TODO: Remove the execution block for metastore-test after DRILL-7393 is fixed.
+            -->
+            <execution>
+              <id>metastore-test</id>
+              <phase>test</phase>
+              <goals>
+                <goal>test</goal>
+              </goals>
+              <configuration>
+                <groups>org.apache.drill.categories.MetastoreTest</groups>
+              </configuration>
             </execution>
           </executions>
           <dependencies>