DRILL-7706: Implement Drill RDBMS Metastore

1. Fixed non-deterministic execution of batch update / delete statements: operations are now executed in the same order they were added (see the sketch after this list).
2. Abstracted common Metastore test classes so they can be reused by different Metastore implementations.
3. Added drill-metastore-override-example.conf with an example of Drill Metastore configuration.
4. Replaced the list of metadata types passed to read / write operations with a set to avoid possible duplicates.
5. Added the RDBMS Metastore implementation, README.md and unit tests.
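The ordering fix in item 1 boils down to the pattern sketched below. This is a simplified, hypothetical model, not the actual AbstractModify code (the real API works with TableMetadataUnit and Delete objects): each overwrite / delete call appends its operation immediately through a subclass hook, so execute() replays everything in call order instead of replaying all overwrites before all deletes.

import java.util.List;

// Hypothetical, reduced sketch of the ordered-operation pattern.
abstract class OrderedModify<T> {

  public final OrderedModify<T> overwrite(List<T> units) {
    addOverwrite(units); // appended right away, not batched until execute()
    return this;
  }

  public final OrderedModify<T> delete(String filter) {
    addDelete(filter); // interleaves correctly with overwrites
    return this;
  }

  // subclasses (cf. IcebergModify in this diff) transform the input into
  // concrete operations and append them to a single ordered list
  protected abstract void addOverwrite(List<T> units);

  protected abstract void addDelete(String filter);

  public abstract void execute();
}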
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 24c14a3..ddffb44 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -91,6 +91,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.drill.metastore</groupId>
+      <artifactId>drill-rdbms-metastore</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <exclusions>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index f37f195..05fd203 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -35,6 +35,7 @@
         <include>org.apache.drill.exec:drill-java-exec:jar</include>
         <include>org.apache.drill.metastore:drill-metastore-api:jar</include>
         <include>org.apache.drill.metastore:drill-iceberg-metastore:jar</include>
+        <include>org.apache.drill.metastore:drill-rdbms-metastore:jar</include>
         <include>org.apache.drill.contrib.storage-hive:drill-storage-hive-core:jar</include>
         <include>org.apache.drill.contrib.storage-hive:drill-hive-exec-shaded:jar</include>
         <include>org.apache.drill.contrib.data:tpch-sample-data:jar</include>
@@ -377,6 +378,11 @@
       <outputDirectory>conf</outputDirectory>
       <fileMode>0640</fileMode>
     </file>
+    <file>
+      <source>src/main/resources/drill-metastore-override-example.conf</source>
+      <outputDirectory>conf</outputDirectory>
+      <fileMode>0640</fileMode>
+    </file>
   </files>
 
 </component>
diff --git a/distribution/src/main/resources/drill-metastore-override-example.conf b/distribution/src/main/resources/drill-metastore-override-example.conf
new file mode 100644
index 0000000..d2742a8
--- /dev/null
+++ b/distribution/src/main/resources/drill-metastore-override-example.conf
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Configuration for the Drill Metastore.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.metastore: {
+  # For Drill Iceberg Metastore use: org.apache.drill.metastore.iceberg.IcebergMetastore
+  implementation.class: "org.apache.drill.metastore.rdbms.RdbmsMetastore",
+
+  # If the implementation class is RdbmsMetastore and no data source configuration is provided,
+  # a file-based embedded SQLite database will be used; it is suitable only for Drill in embedded mode.
+
+  # For distributed mode, provide data source details for a database accessible to all drillbits.
+  rdbms: {
+    # Connection details to the Drill Metastore database (PostgreSQL, MySQL)
+    data_source: {
+      # Note: the driver must be included in the Drill classpath before Drill starts up
+      # driver: "org.postgresql.Driver",
+      # url: "jdbc:postgresql://localhost:1234/mydb?currentSchema=drill_metastore",
+      # username: "user",
+      # password: "password",
+      properties: {
+        # List of Hikari properties: https://github.com/brettwooldridge/HikariCP
+        # maximumPoolSize: 8
+      }
+    }
+  }
+
+  iceberg: {
+    # File system config can be specified
+    config.properties: {
+      # Iceberg tables will be created under the default file system
+      # taken from the Hadoop configuration unless the property below is set
+      # fs.defaultFS: "file:///"
+    }
+
+    # Iceberg Metastore location is constructed from the
+    # combination of the base_path and relative_path config values
+    location: {
+      # Iceberg table base path; if not set, the user home directory will be used
+      # base_path: "/app",
+
+      # Iceberg table relative path, unique per cluster
+      # ${drill.exec.zk.root} value will be substituted from Drill main config
+      # relative_path: ${drill.exec.zk.root}"/metastore/iceberg"
+    }
+  }
+}
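
For reference, a filled-in data_source section for a distributed PostgreSQL setup might look as follows. The host, port, database name and credentials are placeholders, and maximumPoolSize is just one example of a HikariCP property:

drill.metastore: {
  implementation.class: "org.apache.drill.metastore.rdbms.RdbmsMetastore",
  rdbms.data_source: {
    driver: "org.postgresql.Driver",
    url: "jdbc:postgresql://db-host:5432/drill?currentSchema=drill_metastore",
    username: "drill_user",
    password: "drill_password",
    properties: {
      maximumPoolSize: 10
    }
  }
}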
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
index d720b31..1866f03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/FileMetadataInfoCollector.java
@@ -63,13 +63,13 @@
  * Implementation of {@link MetadataInfoCollector} for file-based tables.
  */
 public class FileMetadataInfoCollector implements MetadataInfoCollector {
-  private final List<MetadataInfo> allMetaToHandle;
   private final List<MetadataInfo> metadataToRemove;
 
   private final BasicTablesRequests basicRequests;
   private final TableInfo tableInfo;
   private final MetadataType metadataLevel;
 
+  private List<MetadataInfo> allMetaToHandle;
   private List<MetadataInfo> rowGroupsInfo = Collections.emptyList();
   private List<MetadataInfo> filesInfo = Collections.emptyList();
   private Multimap<Integer, MetadataInfo> segmentsInfo = ArrayListMultimap.create();
@@ -84,7 +84,6 @@
     this.basicRequests = basicRequests;
     this.tableInfo = tableInfo;
     this.metadataLevel = metadataLevel;
-    this.allMetaToHandle = new ArrayList<>();
     this.metadataToRemove = new ArrayList<>();
     init(selection, settings, tableScanSupplier, interestingColumns, segmentColumnsCount);
   }
@@ -220,14 +219,12 @@
           .collect(Collectors.toList());
 
       List<MetadataInfo> segmentsToUpdate = getMetadataInfoList(selectionRoot, scanAndRemovedFiles, MetadataType.SEGMENT, 0);
-      Streams.concat(allSegments.values().stream(), allFilesInfo.stream(), allRowGroupsInfo.stream())
+      allMetaToHandle = Streams.concat(allSegments.values().stream(), allFilesInfo.stream(), allRowGroupsInfo.stream())
           .filter(child -> segmentsToUpdate.stream().anyMatch(parent -> MetadataIdentifierUtils.isMetadataKeyParent(parent.identifier(), child.identifier())))
           .filter(parent ->
               removedFilesMetadata.stream().noneMatch(child -> MetadataIdentifierUtils.isMetadataKeyParent(parent.identifier(), child.identifier()))
                   || filesInfo.stream().anyMatch(child -> MetadataIdentifierUtils.isMetadataKeyParent(parent.identifier(), child.identifier())))
-          .forEach(allMetaToHandle::add);
-
-      allMetaToHandle.addAll(segmentsToUpdate);
+          .collect(Collectors.toList());
 
       // removed top-level segments are handled separately since their metadata is not overridden when writing to the Metastore
       List<MetadataInfo> removedTopSegments = getMetadataInfoList(selectionRoot, removedFiles, MetadataType.SEGMENT, 0).stream()
@@ -236,6 +233,10 @@
                   && allFilesInfo.stream().noneMatch(child -> MetadataIdentifierUtils.isMetadataKeyParent(parent.identifier(), child.identifier())))
           .collect(Collectors.toList());
       metadataToRemove.addAll(removedTopSegments);
+
+      segmentsToUpdate.stream()
+          .filter(segment -> !removedTopSegments.contains(segment))
+          .forEach(allMetaToHandle::add);
     } else {
       // table metadata may still be actual
       outdated = false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataControllerContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataControllerContext.java
index b6b6b80..0b5167f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataControllerContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metastore/analyze/MetadataControllerContext.java
@@ -188,7 +188,6 @@
       Objects.requireNonNull(tableInfo, "tableInfo was not set");
       Objects.requireNonNull(location, "location was not set");
       Objects.requireNonNull(segmentColumns, "segmentColumns were not set");
-      Objects.requireNonNull(metadataToHandle, "metadataToHandle was not set");
       Objects.requireNonNull(metadataToRemove, "metadataToRemove was not set");
       return new MetadataControllerContext(this);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
index 9c0e8af..1bcb1ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java
@@ -286,7 +286,7 @@
       metadataUnits.addAll(getMetadataUnits(reader, 0));
     }
 
-    if (!metadataToHandle.isEmpty()) {
+    if (metadataToHandle != null) {
       // leaves only table metadata and metadata which belongs to segments to be overridden
       metadataUnits = metadataUnits.stream()
           .filter(tableMetadataUnit ->
@@ -308,7 +308,9 @@
       metadataUnits.addAll(metadata);
     }
 
-    boolean insertDefaultSegment = metadataUnits.stream()
+    // checks that metadataUnits contains more than just table metadata before adding the default
+    // segment, to avoid the case when only table metadata should be updated and / or root segments removed
+    boolean insertDefaultSegment = metadataUnits.size() > 1 && metadataUnits.stream()
         .noneMatch(metadataUnit -> metadataUnit.metadataType().equals(MetadataType.SEGMENT.name()));
 
     if (insertDefaultSegment) {
@@ -333,6 +335,7 @@
     return tableMetadataUnit.toBuilder()
         .metadataType(MetadataType.SEGMENT.name())
         .metadataKey(MetadataInfo.DEFAULT_SEGMENT_KEY)
+        .metadataIdentifier(MetadataInfo.DEFAULT_SEGMENT_KEY)
         .owner(null)
         .tableType(null)
         .metadataStatistics(Collections.emptyList())
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
index 3ec98d4..5c03c61 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SchemaUtilites.java
@@ -218,7 +218,7 @@
 
     if (isRootSchema(schema)) {
       throw UserException.validationError()
-          .message("Root schema is immutable. Creating or dropping tables/views is not allowed in root schema." +
+          .message("Root schema is immutable. Drill does not allow creating or deleting tables or views in the root schema. " +
               "Select a schema using 'USE schema' command.")
           .build(logger);
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
index 856788c..23a6b1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/MetastoreAnalyzeTableHandler.java
@@ -229,7 +229,7 @@
 
     MetastoreTableInfo metastoreTableInfo = basicRequests.metastoreTableInfo(tableInfo);
 
-    List<MetadataInfo> allMetaToHandle = new ArrayList<>();
+    List<MetadataInfo> allMetaToHandle = null;
     List<MetadataInfo> metadataToRemove = new ArrayList<>();
 
     // Step 1: checks whether table metadata is present in the Metastore to determine
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
index 107a544..e916149 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RecordCollector.java
@@ -50,6 +50,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.drill.exec.planner.types.DrillRelDataTypeSystem.DRILL_REL_DATATYPE_SYSTEM;
@@ -387,11 +388,13 @@
 
         metadataHolder.segments().stream()
           .filter(segment -> filterEvaluator.shouldVisitTable(schemaPath, segment.getTableInfo().name(), Schema.TableType.TABLE))
+          .filter(segmentMetadata -> Objects.nonNull(segmentMetadata.getPartitionValues()))
           .map(segment -> Records.Partition.fromSegment(IS_CATALOG_NAME, schemaPath, segment))
           .forEach(records::addAll);
 
         metadataHolder.partitions().stream()
           .filter(partition -> filterEvaluator.shouldVisitTable(schemaPath, partition.getTableInfo().name(), Schema.TableType.TABLE))
+          .filter(partitionMetadata -> Objects.nonNull(partitionMetadata.getPartitionValues()))
           .map(partition -> Records.Partition.fromPartition(IS_CATALOG_NAME, schemaPath, partition))
           .forEach(records::addAll);
       }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
index b25e2b2..09d9dbf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
@@ -556,12 +556,12 @@
             .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
             .go();
 
-        List<MetadataType> emptyMetadataLevels = Arrays.stream(MetadataType.values())
+        Set<MetadataType> emptyMetadataLevels = Arrays.stream(MetadataType.values())
             .filter(metadataType -> metadataType.compareTo(analyzeLevel) > 0
                 // for the case when there are no segment metadata, default segment is present
                 && metadataType.compareTo(MetadataType.SEGMENT) > 0
                 && metadataType.compareTo(MetadataType.ALL) < 0)
-            .collect(Collectors.toList());
+            .collect(Collectors.toSet());
 
         BasicTablesRequests.RequestMetadata requestMetadata = BasicTablesRequests.RequestMetadata.builder()
             .tableInfo(tableInfo)
diff --git a/metastore/iceberg-metastore/pom.xml b/metastore/iceberg-metastore/pom.xml
index 173f681..d62dba8 100644
--- a/metastore/iceberg-metastore/pom.xml
+++ b/metastore/iceberg-metastore/pom.xml
@@ -145,6 +145,13 @@
       <version>${asm.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.drill.metastore</groupId>
+      <artifactId>drill-metastore-api</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/MetastoreContext.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastoreContext.java
similarity index 97%
rename from metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/MetastoreContext.java
rename to metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastoreContext.java
index 933910d..9bf78fc 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/MetastoreContext.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastoreContext.java
@@ -27,7 +27,7 @@
  *
  * @param <T> Metastore component unit type
  */
-public interface MetastoreContext<T> {
+public interface IcebergMetastoreContext<T> {
 
   /**
    * Returns Iceberg table implementation used as storage for Metastore component data.
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
index 860a14e..7493e82 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
@@ -26,7 +26,7 @@
 import org.apache.drill.metastore.operate.Modify;
 import org.apache.drill.metastore.operate.Read;
 import org.apache.drill.metastore.components.tables.TableMetadataUnit;
-import org.apache.drill.metastore.iceberg.MetastoreContext;
+import org.apache.drill.metastore.iceberg.IcebergMetastoreContext;
 import org.apache.drill.metastore.iceberg.operate.IcebergMetadata;
 import org.apache.drill.metastore.iceberg.schema.IcebergTableSchema;
 import org.apache.drill.metastore.iceberg.operate.IcebergModify;
@@ -42,7 +42,7 @@
  * Metastore Tables component which stores tables metadata in the corresponding Iceberg table.
  * Provides methods to read and modify tables metadata.
  */
-public class IcebergTables implements Tables, MetastoreContext<TableMetadataUnit> {
+public class IcebergTables implements Tables, IcebergMetastoreContext<TableMetadataUnit> {
 
   /**
    * Metastore Tables component partition keys, order of partitioning will be determined based
@@ -64,7 +64,7 @@
     this.expirationHandler = new ExpirationHandler(table);
   }
 
-  public MetastoreContext<TableMetadataUnit> context() {
+  public IcebergMetastoreContext<TableMetadataUnit> context() {
     return this;
   }
 
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TablesOperationTransformer.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TablesOperationTransformer.java
index e9fca18..ec76107 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TablesOperationTransformer.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TablesOperationTransformer.java
@@ -19,7 +19,7 @@
 
 import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.components.tables.TableMetadataUnit;
-import org.apache.drill.metastore.iceberg.MetastoreContext;
+import org.apache.drill.metastore.iceberg.IcebergMetastoreContext;
 import org.apache.drill.metastore.iceberg.operate.Overwrite;
 import org.apache.drill.metastore.iceberg.transform.OperationTransformer;
 import org.apache.iceberg.expressions.Expression;
@@ -36,7 +36,7 @@
  */
 public class TablesOperationTransformer extends OperationTransformer<TableMetadataUnit> {
 
-  public TablesOperationTransformer(MetastoreContext<TableMetadataUnit> context) {
+  public TablesOperationTransformer(IcebergMetastoreContext<TableMetadataUnit> context) {
     super(context);
   }
 
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TablesTransformer.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TablesTransformer.java
index cf37fad..036cc6d 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TablesTransformer.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/TablesTransformer.java
@@ -18,7 +18,7 @@
 package org.apache.drill.metastore.iceberg.components.tables;
 
 import org.apache.drill.metastore.components.tables.TableMetadataUnit;
-import org.apache.drill.metastore.iceberg.MetastoreContext;
+import org.apache.drill.metastore.iceberg.IcebergMetastoreContext;
 import org.apache.drill.metastore.iceberg.transform.InputDataTransformer;
 import org.apache.drill.metastore.iceberg.transform.OperationTransformer;
 import org.apache.drill.metastore.iceberg.transform.OutputDataTransformer;
@@ -33,9 +33,9 @@
  */
 public class TablesTransformer implements Transformer<TableMetadataUnit> {
 
-  private final MetastoreContext<TableMetadataUnit> context;
+  private final IcebergMetastoreContext<TableMetadataUnit> context;
 
-  public TablesTransformer(MetastoreContext<TableMetadataUnit> context) {
+  public TablesTransformer(IcebergMetastoreContext<TableMetadataUnit> context) {
     this.context = context;
   }
 
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
index a06f749..2e8e763 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
@@ -18,7 +18,7 @@
 package org.apache.drill.metastore.iceberg.operate;
 
 import org.apache.drill.metastore.expressions.FilterExpression;
-import org.apache.drill.metastore.iceberg.MetastoreContext;
+import org.apache.drill.metastore.iceberg.IcebergMetastoreContext;
 import org.apache.drill.metastore.iceberg.transform.OperationTransformer;
 import org.apache.drill.metastore.operate.AbstractModify;
 import org.apache.drill.metastore.operate.MetadataTypeValidator;
@@ -38,30 +38,37 @@
  */
 public class IcebergModify<T> extends AbstractModify<T> {
 
-  private final MetastoreContext<T> context;
+  private final OperationTransformer<T> transformer;
+  private final IcebergMetastoreContext<T> context;
+  private final List<IcebergOperation> operations = new ArrayList<>();
 
-  public IcebergModify(MetadataTypeValidator metadataTypeValidator, MetastoreContext<T> context) {
+  public IcebergModify(MetadataTypeValidator metadataTypeValidator, IcebergMetastoreContext<T> context) {
     super(metadataTypeValidator);
     this.context = context;
+    this.transformer = context.transformer().operation();
   }
 
   @Override
   public void execute() {
-    OperationTransformer<T> transformer = context.transformer().operation();
-    List<IcebergOperation> operations = new ArrayList<>(transformer.toOverwrite(overwriteUnits));
-    operations.addAll(transformer.toDelete(deletes));
-
     if (operations.isEmpty()) {
       return;
     }
-
     executeOperations(operations);
   }
 
   @Override
   public void purge() {
-    executeOperations(Collections.singletonList(
-      context.transformer().operation().toDelete((FilterExpression) null)));
+    executeOperations(Collections.singletonList(transformer.toDelete((FilterExpression) null)));
+  }
+
+  @Override
+  protected void addOverwrite(List<T> units) {
+    operations.addAll(transformer.toOverwrite(units));
+  }
+
+  @Override
+  protected void addDelete(org.apache.drill.metastore.operate.Delete delete) {
+    operations.add(transformer.toDelete(delete));
   }
 
   private void executeOperations(List<IcebergOperation> operations) {
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergRead.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergRead.java
index 0610ec9..1ce9a4b 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergRead.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergRead.java
@@ -18,7 +18,7 @@
 package org.apache.drill.metastore.iceberg.operate;
 
 import org.apache.drill.metastore.MetastoreColumn;
-import org.apache.drill.metastore.iceberg.MetastoreContext;
+import org.apache.drill.metastore.iceberg.IcebergMetastoreContext;
 import org.apache.drill.metastore.iceberg.transform.FilterTransformer;
 import org.apache.drill.metastore.operate.AbstractRead;
 import org.apache.drill.metastore.operate.MetadataTypeValidator;
@@ -35,13 +35,15 @@
  * Implementation of {@link Read} interface based on {@link AbstractRead} parent class.
  * Reads information from Iceberg table based on given filter expression.
  * Supports reading information for specific columns.
+ *
+ * @param <T> Metastore component unit type
  */
 public class IcebergRead<T> extends AbstractRead<T> {
 
-  private final MetastoreContext<T> context;
+  private final IcebergMetastoreContext<T> context;
   private final String[] defaultColumns;
 
-  public IcebergRead(MetadataTypeValidator metadataTypeValidator, MetastoreContext<T> context) {
+  public IcebergRead(MetadataTypeValidator metadataTypeValidator, IcebergMetastoreContext<T> context) {
     super(metadataTypeValidator);
     this.context = context;
     this.defaultColumns = context.table().schema().columns().stream()
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/FilterTransformer.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/FilterTransformer.java
index 03fa171..feb827b 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/FilterTransformer.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/FilterTransformer.java
@@ -26,6 +26,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -34,10 +35,10 @@
  */
 public class FilterTransformer {
 
-  private final FilterExpression.Visitor<Expression> visitor = FilterExpressionVisitor.get();
+  private static final FilterExpression.Visitor<Expression> FILTER_VISITOR = FilterExpressionVisitor.get();
 
   public Expression transform(FilterExpression filter) {
-    return filter == null ? Expressions.alwaysTrue() : filter.accept(visitor);
+    return filter == null ? Expressions.alwaysTrue() : filter.accept(FILTER_VISITOR);
   }
 
   public Expression transform(Map<MetastoreColumn, Object> conditions) {
@@ -57,17 +58,17 @@
       expressions.subList(2, expressions.size()).toArray(new Expression[0]));
   }
 
-  public Expression transform(List<MetadataType> metadataTypes) {
+  public Expression transform(Set<MetadataType> metadataTypes) {
     if (metadataTypes.contains(MetadataType.ALL)) {
       return Expressions.alwaysTrue();
     }
 
-    List<String> inConditionValues = metadataTypes.stream()
+    Set<String> inConditionValues = metadataTypes.stream()
       .map(Enum::name)
-      .collect(Collectors.toList());
+      .collect(Collectors.toSet());
 
     if (inConditionValues.size() == 1) {
-      return Expressions.equal(MetastoreColumn.METADATA_TYPE.columnName(), inConditionValues.get(0));
+      return Expressions.equal(MetastoreColumn.METADATA_TYPE.columnName(), inConditionValues.iterator().next());
     }
 
     return Expressions.in(MetastoreColumn.METADATA_TYPE.columnName(), inConditionValues);
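
To see the effect of item 4's List-to-Set change at a call site, here is a hypothetical usage of the updated transform(Set<MetadataType>). Direct instantiation of FilterTransformer is assumed for brevity; in the Metastore it is normally obtained via context.transformer().filter():

import org.apache.drill.metastore.iceberg.transform.FilterTransformer;
import org.apache.drill.metastore.metadata.MetadataType;
import org.apache.iceberg.expressions.Expression;

import java.util.EnumSet;

public class MetadataTypeFilterExample {

  public static void main(String[] args) {
    FilterTransformer transformer = new FilterTransformer();

    // one distinct type -> equality fast path on the metadataType column
    Expression eq = transformer.transform(EnumSet.of(MetadataType.TABLE));

    // several distinct types -> IN condition; with the old List-based signature,
    // duplicates such as [TABLE, TABLE] could inflate the IN values, while a Set
    // rules them out by construction
    Expression in = transformer.transform(
        EnumSet.of(MetadataType.TABLE, MetadataType.SEGMENT));

    System.out.println(eq);
    System.out.println(in);
  }
}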
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/OperationTransformer.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/OperationTransformer.java
index ab27771..77827df 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/OperationTransformer.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/transform/OperationTransformer.java
@@ -18,7 +18,7 @@
 package org.apache.drill.metastore.iceberg.transform;
 
 import org.apache.drill.metastore.expressions.FilterExpression;
-import org.apache.drill.metastore.iceberg.MetastoreContext;
+import org.apache.drill.metastore.iceberg.IcebergMetastoreContext;
 import org.apache.drill.metastore.iceberg.operate.Delete;
 import org.apache.drill.metastore.iceberg.operate.Overwrite;
 import org.apache.drill.metastore.iceberg.write.File;
@@ -28,7 +28,6 @@
 
 import java.util.List;
 import java.util.UUID;
-import java.util.stream.Collectors;
 
 /**
  * Base class to transform given input into
@@ -38,9 +37,9 @@
  */
 public abstract class OperationTransformer<T> {
 
-  protected final MetastoreContext<T> context;
+  protected final IcebergMetastoreContext<T> context;
 
-  protected OperationTransformer(MetastoreContext<T> context) {
+  protected OperationTransformer(IcebergMetastoreContext<T> context) {
     this.context = context;
   }
 
@@ -68,12 +67,10 @@
     return new Delete(context.transformer().filter().transform(filter));
   }
 
-  public List<Delete> toDelete(List<org.apache.drill.metastore.operate.Delete> deletes) {
+  public Delete toDelete(org.apache.drill.metastore.operate.Delete delete) {
+    // metadata types are ignored during delete since they are not part of the partition key
     FilterTransformer filterTransformer = context.transformer().filter();
-    return deletes.stream()
-      // metadata types are ignored during delete since they are not part of the partition key
-      .map(delete -> new Delete(filterTransformer.transform(delete.filter())))
-      .collect(Collectors.toList());
+    return new Delete(filterTransformer.transform(delete.filter()));
   }
 
   /**
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/IcebergBaseTest.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/IcebergBaseTest.java
index 51db136..d8740b4 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/IcebergBaseTest.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/IcebergBaseTest.java
@@ -61,7 +61,7 @@
    * @param base Iceberg Metastore base path
    * @return {@link Config} instance
    */
-  protected static Config baseIcebergConfig(File base) {
+  public static Config baseIcebergConfig(File base) {
     return DrillConfig.create()
       .withValue(IcebergConfigConstants.BASE_PATH,
         ConfigValueFactory.fromAnyRef(new Path(base.toURI().getPath()).toUri().getPath()))
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergBasicTablesRequests.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergBasicTablesRequests.java
new file mode 100644
index 0000000..b048b05
--- /dev/null
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergBasicTablesRequests.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.iceberg.components.tables;
+
+import org.apache.drill.metastore.components.tables.MetastoreTableInfo;
+import org.apache.drill.metastore.components.tables.AbstractBasicTablesRequestsTest;
+import org.apache.drill.metastore.iceberg.IcebergBaseTest;
+import org.apache.drill.metastore.iceberg.IcebergMetastore;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestIcebergBasicTablesRequests extends AbstractBasicTablesRequestsTest {
+
+  @BeforeClass
+  public static void init() throws Exception {
+    innerInit(IcebergBaseTest.baseIcebergConfig(defaultFolder.newFolder("iceberg-metastore")), IcebergMetastore.class);
+  }
+
+  @Test
+  public void testMetastoreTableInfoExistingTable() {
+    MetastoreTableInfo metastoreTableInfo = basicRequests.metastoreTableInfo(nationTableInfo);
+    assertTrue(metastoreTableInfo.isExists());
+    assertEquals(nationTableInfo, metastoreTableInfo.tableInfo());
+    assertEquals(nationTable.lastModifiedTime(), metastoreTableInfo.lastModifiedTime());
+    assertTrue(metastoreTableInfo.metastoreVersion() > 0);
+  }
+}
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergTablesMetastore.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergTablesMetastore.java
index cd01386..f483386 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergTablesMetastore.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergTablesMetastore.java
@@ -17,496 +17,15 @@
  */
 package org.apache.drill.metastore.iceberg.components.tables;
 
-import com.typesafe.config.ConfigValueFactory;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.metastore.MetastoreColumn;
-import org.apache.drill.metastore.components.tables.Tables;
-import org.apache.drill.metastore.metadata.MetadataType;
-import org.apache.drill.metastore.operate.Delete;
-import org.apache.drill.metastore.operate.Metadata;
-import org.apache.drill.metastore.Metastore;
-import org.apache.drill.metastore.components.tables.TableMetadataUnit;
-import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.components.tables.AbstractTablesMetastoreTest;
 import org.apache.drill.metastore.iceberg.IcebergBaseTest;
 import org.apache.drill.metastore.iceberg.IcebergMetastore;
-import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
-import org.apache.drill.metastore.metadata.TableInfo;
-import org.apache.iceberg.TableProperties;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.BeforeClass;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+public class TestIcebergTablesMetastore extends AbstractTablesMetastoreTest {
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-
-public class TestIcebergTablesMetastore extends IcebergBaseTest {
-
-  private static final String COMPONENTS_COMMON_PROPERTIES_PATTERN = IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES + ".%s";
-  private static final String COMPONENTS_TABLES_PROPERTIES_PATTERN = IcebergConfigConstants.COMPONENTS_TABLES_PROPERTIES + ".%s";
-
-  @Rule
-  public TemporaryFolder baseLocation = new TemporaryFolder();
-
-  @Test
-  public void testCreationWithoutProperties() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
-
-    Metastore metastore = new IcebergMetastore(config);
-    assertTrue(metastore.tables().metadata().properties().isEmpty());
-  }
-
-  @Test
-  public void testCreationWithCommonProperties() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
-      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
-        ConfigValueFactory.fromAnyRef(10))
-      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.MANIFEST_MIN_MERGE_COUNT),
-        ConfigValueFactory.fromAnyRef(2)));
-
-    Metastore metastore = new IcebergMetastore(config);
-    Map<String, String> expected = new HashMap<>();
-    expected.put(TableProperties.SPLIT_SIZE, "10");
-    expected.put(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2");
-    assertEquals(expected, metastore.tables().metadata().properties());
-  }
-
-  @Test
-  public void testCreationWithCommonAndComponentProperties() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
-      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
-        ConfigValueFactory.fromAnyRef(10))
-      .withValue(String.format(COMPONENTS_TABLES_PROPERTIES_PATTERN, TableProperties.MANIFEST_MIN_MERGE_COUNT),
-        ConfigValueFactory.fromAnyRef(2)));
-
-    Metastore metastore = new IcebergMetastore(config);
-    Map<String, String> expected = new HashMap<>();
-    expected.put(TableProperties.SPLIT_SIZE, "10");
-    expected.put(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2");
-    assertEquals(expected, metastore.tables().metadata().properties());
-  }
-
-  @Test
-  public void testCreationWithComponentPropertiesPrecedence() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
-      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
-        ConfigValueFactory.fromAnyRef(10))
-      .withValue(String.format(COMPONENTS_TABLES_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
-        ConfigValueFactory.fromAnyRef(100)));
-
-    Metastore metastore = new IcebergMetastore(config);
-    assertEquals(Collections.singletonMap(TableProperties.SPLIT_SIZE, "100"),
-      metastore.tables().metadata().properties());
-  }
-
-  @Test
-  public void testLoadWithoutProperties() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
-
-    Metastore initialMetastore = new IcebergMetastore(config);
-    assertTrue(initialMetastore.tables().metadata().properties().isEmpty());
-
-    Metastore newMetastore = new IcebergMetastore(config);
-    assertTrue(newMetastore.tables().metadata().properties().isEmpty());
-  }
-
-  @Test
-  public void testLoadWithSameProperties() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
-      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
-        ConfigValueFactory.fromAnyRef(10)));
-
-    Map<String, String> initialProperties = Collections.singletonMap(TableProperties.SPLIT_SIZE, "10");
-
-    Metastore initialMetastore = new IcebergMetastore(config);
-    assertEquals(initialProperties, initialMetastore.tables().metadata().properties());
-
-    Metastore newMetastore = new IcebergMetastore(config);
-    assertEquals(initialProperties, newMetastore.tables().metadata().properties());
-  }
-
-  @Test
-  public void testLoadWithUpdatedProperties() {
-    DrillConfig initialConfig = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
-      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
-        ConfigValueFactory.fromAnyRef(10))
-      .withValue(String.format(COMPONENTS_TABLES_PROPERTIES_PATTERN, TableProperties.MANIFEST_MIN_MERGE_COUNT),
-        ConfigValueFactory.fromAnyRef(2)));
-
-    Map<String, String> initialProperties = new HashMap<>();
-    initialProperties.put(TableProperties.SPLIT_SIZE, "10");
-    initialProperties.put(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2");
-
-    Metastore initialMetastore = new IcebergMetastore(initialConfig);
-    assertEquals(initialProperties, initialMetastore.tables().metadata().properties());
-
-    DrillConfig newConfig = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
-      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
-        ConfigValueFactory.fromAnyRef(100))
-      .withValue(String.format(COMPONENTS_TABLES_PROPERTIES_PATTERN, TableProperties.COMMIT_NUM_RETRIES),
-        ConfigValueFactory.fromAnyRef(5)));
-
-    Map<String, String> newProperties = new HashMap<>();
-    newProperties.put(TableProperties.SPLIT_SIZE, "100");
-    newProperties.put(TableProperties.COMMIT_NUM_RETRIES, "5");
-
-    Metastore newMetastore = new IcebergMetastore(newConfig);
-    assertEquals(newProperties, newMetastore.tables().metadata().properties());
-  }
-
-  @Test
-  public void testNewInstance() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
-    Metastore metastore = new IcebergMetastore(config);
-
-    assertNotSame(metastore.tables(), metastore.tables());
-  }
-
-  @Test
-  public void testVersionInitial() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
-    Metastore metastore = new IcebergMetastore(config);
-    Metadata metadata = metastore.tables().metadata();
-    assertTrue(metadata.supportsVersioning());
-    assertEquals(0, metadata.version());
-  }
-
-  @Test
-  public void testVersionUpdate() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
-
-    Tables tables = new IcebergMetastore(config).tables();
-    Metadata metadata = tables.metadata();
-
-    assertTrue(metadata.supportsVersioning());
-    assertEquals(0, metadata.version());
-
-    tables.modify()
-      .overwrite(TableMetadataUnit.builder()
-        .storagePlugin("dfs")
-        .workspace("tmp")
-        .tableName("nation")
-        .metadataKey("dir0")
-        .build())
-      .execute();
-
-    assertNotEquals(0, metadata.version());
-  }
-
-  @Test
-  public void testWriteReadAllFieldTypes() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
-    Tables tables = new IcebergMetastore(config).tables();
-
-    Map<String, String> columnStatistics = new HashMap<>();
-    columnStatistics.put("stat1", "val1");
-    columnStatistics.put("stat2", "val2");
-
-    TableInfo tableInfo = TableInfo.builder()
-      .storagePlugin("dfs")
-      .workspace("tmp")
-      .name("nation")
-      .build();
-
-    TableMetadataUnit unit = TableMetadataUnit.builder()
-      .storagePlugin(tableInfo.storagePlugin())
-      .workspace(tableInfo.workspace())
-      .tableName(tableInfo.name())
-      .metadataKey("dir0")
-      .rowGroupIndex(1)
-      .lastModifiedTime(System.currentTimeMillis())
-      .partitionValues(Collections.emptyList())
-      .locations(Arrays.asList("a", "b", "c"))
-      .hostAffinity(Collections.emptyMap())
-      .columnsStatistics(columnStatistics)
-      .build();
-
-    tables.modify()
-      .overwrite(unit)
-      .execute();
-
-    List<TableMetadataUnit> units = tables.read()
-      .metadataType(MetadataType.ALL)
-      .filter(tableInfo.toFilter())
-      .execute();
-
-    assertEquals(1, units.size());
-    assertEquals(unit, units.get(0));
-  }
-
-  @Test
-  public void testReadSelectedColumns() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
-    Tables tables = new IcebergMetastore(config).tables();
-
-    TableInfo tableInfo = TableInfo.builder()
-      .storagePlugin("dfs")
-      .workspace("tmp")
-      .name("nation")
-      .build();
-
-    TableMetadataUnit unit = TableMetadataUnit.builder()
-      .storagePlugin(tableInfo.storagePlugin())
-      .workspace(tableInfo.workspace())
-      .tableName(tableInfo.name())
-      .metadataKey("dir0")
-      .build();
-
-    tables.modify()
-      .overwrite(unit)
-      .execute();
-
-    List<TableMetadataUnit> units = tables.read()
-      .metadataType(MetadataType.ALL)
-      .filter(tableInfo.toFilter())
-      .columns(MetastoreColumn.TABLE_NAME, MetastoreColumn.METADATA_KEY)
-      .execute();
-
-    assertEquals(1, units.size());
-    assertEquals(TableMetadataUnit.builder().tableName("nation").metadataKey("dir0").build(), units.get(0));
-  }
-
-  @Test
-  public void testReadNoResult() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
-    Tables tables = new IcebergMetastore(config).tables();
-
-    List<TableMetadataUnit> units = tables.read()
-      .metadataType(MetadataType.ALL)
-      .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
-      .columns(MetastoreColumn.TABLE_NAME, MetastoreColumn.METADATA_KEY)
-      .execute();
-
-    assertTrue(units.isEmpty());
-  }
-
-  @Test
-  public void testOverwrite() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
-    Tables tables = new IcebergMetastore(config).tables();
-
-    TableInfo tableInfo = TableInfo.builder()
-      .storagePlugin("dfs")
-      .workspace("tmp")
-      .name("nation")
-      .build();
-
-    TableMetadataUnit initialUnit = TableMetadataUnit.builder()
-      .storagePlugin(tableInfo.storagePlugin())
-      .workspace(tableInfo.workspace())
-      .tableName(tableInfo.name())
-      .metadataKey("dir0")
-      .metadataType(MetadataType.TABLE.name())
-      .tableType("parquet")
-      .build();
-
-    tables.modify()
-      .overwrite(initialUnit)
-      .execute();
-
-    List<TableMetadataUnit> units = tables.read()
-      .metadataType(MetadataType.TABLE)
-      .filter(tableInfo.toFilter())
-      .execute();
-
-    assertEquals(1, units.size());
-    assertEquals(initialUnit, units.get(0));
-
-    TableMetadataUnit updatedUnit = TableMetadataUnit.builder()
-      .storagePlugin("dfs")
-      .workspace("tmp")
-      .tableName("nation")
-      .metadataKey("dir0")
-      .metadataType(MetadataType.TABLE.name())
-      .tableType("text")
-      .build();
-
-    tables.modify()
-      .overwrite(updatedUnit)
-      .execute();
-
-    List<TableMetadataUnit> updatedUnits = tables.read()
-      .metadataType(MetadataType.TABLE)
-      .filter(tableInfo.toFilter())
-      .execute();
-
-    assertEquals(1, updatedUnits.size());
-    assertEquals(updatedUnit, updatedUnits.get(0));
-  }
-
-  @Test
-  public void testDelete() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
-    Tables tables = new IcebergMetastore(config).tables();
-
-    TableInfo tableInfo = TableInfo.builder()
-      .storagePlugin("dfs")
-      .workspace("tmp")
-      .name("nation")
-      .build();
-
-    TableMetadataUnit firstUnit = TableMetadataUnit.builder()
-      .storagePlugin(tableInfo.storagePlugin())
-      .workspace(tableInfo.workspace())
-      .tableName(tableInfo.name())
-      .metadataKey("dir0")
-      .metadataType(MetadataType.SEGMENT.name())
-      .build();
-
-    TableMetadataUnit secondUnit = TableMetadataUnit.builder()
-      .storagePlugin(tableInfo.storagePlugin())
-      .workspace(tableInfo.workspace())
-      .tableName(tableInfo.name())
-      .metadataKey("dir1")
-      .metadataType(MetadataType.SEGMENT.name())
-      .build();
-
-    tables.modify()
-      .overwrite(firstUnit, secondUnit)
-      .execute();
-
-    List<TableMetadataUnit> units = tables.read()
-      .metadataType(MetadataType.SEGMENT)
-      .filter(tableInfo.toFilter())
-      .execute();
-
-    assertEquals(2, units.size());
-
-    FilterExpression deleteFilter = FilterExpression.and(
-      tableInfo.toFilter(),
-      FilterExpression.equal(MetastoreColumn.METADATA_KEY, "dir0"));
-
-    tables.modify()
-      .delete(Delete.builder()
-        .metadataType(MetadataType.SEGMENT)
-        .filter(deleteFilter)
-        .build())
-      .execute();
-
-    List<TableMetadataUnit> updatedUnits = tables.read()
-      .metadataType(MetadataType.SEGMENT)
-      .filter(tableInfo.toFilter())
-      .execute();
-
-    assertEquals(1, updatedUnits.size());
-    assertEquals(secondUnit, updatedUnits.get(0));
-  }
-
-  @Test
-  public void testOverwriteAndDeleteInOneTransaction() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
-    Tables tables = new IcebergMetastore(config).tables();
-
-    TableInfo tableInfo = TableInfo.builder()
-      .storagePlugin("dfs")
-      .workspace("tmp")
-      .name("nation")
-      .build();
-
-    TableMetadataUnit firstUnit = TableMetadataUnit.builder()
-      .storagePlugin(tableInfo.storagePlugin())
-      .workspace(tableInfo.workspace())
-      .tableName(tableInfo.name())
-      .metadataKey("dir0")
-      .metadataType(MetadataType.SEGMENT.name())
-      .tableType("parquet")
-      .build();
-
-    TableMetadataUnit secondUnit = TableMetadataUnit.builder()
-      .storagePlugin(tableInfo.storagePlugin())
-      .workspace(tableInfo.workspace())
-      .tableName(tableInfo.name())
-      .metadataKey("dir1")
-      .metadataType(MetadataType.SEGMENT.name())
-      .tableType("parquet")
-      .build();
-
-    tables.modify()
-      .overwrite(firstUnit, secondUnit)
-      .execute();
-
-    List<TableMetadataUnit> units = tables.read()
-      .metadataType(MetadataType.SEGMENT)
-      .filter(tableInfo.toFilter())
-      .execute();
-
-    assertEquals(2, units.size());
-
-    FilterExpression deleteFilter = FilterExpression.and(
-      tableInfo.toFilter(),
-      FilterExpression.equal(MetastoreColumn.METADATA_KEY, "dir0"));
-
-    TableMetadataUnit updatedUnit = TableMetadataUnit.builder()
-      .storagePlugin(tableInfo.storagePlugin())
-      .workspace(tableInfo.workspace())
-      .tableName(tableInfo.name())
-      .metadataKey("dir1")
-      .metadataType(MetadataType.SEGMENT.name())
-      .tableType("text")
-      .build();
-
-    tables.modify()
-      .delete(Delete.builder()
-        .metadataType(MetadataType.SEGMENT)
-        .filter(deleteFilter)
-        .build())
-      .overwrite(updatedUnit)
-      .execute();
-
-    List<TableMetadataUnit> updatedUnits = tables.read()
-      .metadataType(MetadataType.SEGMENT)
-      .filter(tableInfo.toFilter())
-      .execute();
-
-    assertEquals(1, updatedUnits.size());
-    assertEquals(updatedUnit, updatedUnits.get(0));
-  }
-
-  @Test
-  public void testPurge() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
-    Tables tables = new IcebergMetastore(config).tables();
-
-    TableMetadataUnit firstUnit = TableMetadataUnit.builder()
-      .storagePlugin("dfs")
-      .workspace("tmp")
-      .tableName("nation")
-      .metadataKey("dir0")
-      .tableType("parquet")
-      .build();
-
-    TableMetadataUnit secondUnit = TableMetadataUnit.builder()
-      .storagePlugin("s3")
-      .workspace("tmp")
-      .tableName("nation")
-      .metadataKey("dir0")
-      .tableType("parquet")
-      .build();
-
-    tables.modify()
-      .overwrite(firstUnit, secondUnit)
-      .execute();
-
-    List<TableMetadataUnit> initialUnits = tables.read()
-      .metadataType(MetadataType.ALL)
-      .execute();
-
-    assertEquals(2, initialUnits.size());
-
-    tables.modify()
-      .purge();
-
-    List<TableMetadataUnit> resultingUnits = tables.read()
-      .metadataType(MetadataType.ALL)
-      .execute();
-
-    assertTrue(resultingUnits.isEmpty());
+  @BeforeClass
+  public static void init() throws Exception {
+    innerInit(IcebergBaseTest.baseIcebergConfig(defaultFolder.newFolder("iceberg-metastore")), IcebergMetastore.class);
   }
 }
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergTablesMetastoreConfigAndVersion.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergTablesMetastoreConfigAndVersion.java
new file mode 100644
index 0000000..426882a
--- /dev/null
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestIcebergTablesMetastoreConfigAndVersion.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.iceberg.components.tables;
+
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.Metastore;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.components.tables.Tables;
+import org.apache.drill.metastore.iceberg.IcebergBaseTest;
+import org.apache.drill.metastore.iceberg.IcebergMetastore;
+import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
+import org.apache.drill.metastore.operate.Metadata;
+import org.apache.iceberg.TableProperties;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+public class TestIcebergTablesMetastoreConfigAndVersion extends IcebergBaseTest {
+
+  private static final String COMPONENTS_COMMON_PROPERTIES_PATTERN = IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES + ".%s";
+  private static final String COMPONENTS_TABLES_PROPERTIES_PATTERN = IcebergConfigConstants.COMPONENTS_TABLES_PROPERTIES + ".%s";
+
+  @Rule
+  public TemporaryFolder baseLocation = new TemporaryFolder();
+
+  @Test
+  public void testCreationWithoutProperties() {
+    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
+
+    Metastore metastore = new IcebergMetastore(config);
+    assertTrue(metastore.tables().metadata().properties().isEmpty());
+  }
+
+  @Test
+  public void testCreationWithCommonProperties() {
+    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
+      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
+        ConfigValueFactory.fromAnyRef(10))
+      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.MANIFEST_MIN_MERGE_COUNT),
+        ConfigValueFactory.fromAnyRef(2)));
+
+    Metastore metastore = new IcebergMetastore(config);
+    Map<String, String> expected = new HashMap<>();
+    expected.put(TableProperties.SPLIT_SIZE, "10");
+    expected.put(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2");
+    assertEquals(expected, metastore.tables().metadata().properties());
+  }
+
+  @Test
+  public void testCreationWithCommonAndComponentProperties() {
+    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
+      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
+        ConfigValueFactory.fromAnyRef(10))
+      .withValue(String.format(COMPONENTS_TABLES_PROPERTIES_PATTERN, TableProperties.MANIFEST_MIN_MERGE_COUNT),
+        ConfigValueFactory.fromAnyRef(2)));
+
+    Metastore metastore = new IcebergMetastore(config);
+    Map<String, String> expected = new HashMap<>();
+    expected.put(TableProperties.SPLIT_SIZE, "10");
+    expected.put(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2");
+    assertEquals(expected, metastore.tables().metadata().properties());
+  }
+
+  @Test
+  public void testCreationWithComponentPropertiesPrecedence() {
+    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
+      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
+        ConfigValueFactory.fromAnyRef(10))
+      .withValue(String.format(COMPONENTS_TABLES_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
+        ConfigValueFactory.fromAnyRef(100)));
+
+    Metastore metastore = new IcebergMetastore(config);
+    assertEquals(Collections.singletonMap(TableProperties.SPLIT_SIZE, "100"),
+      metastore.tables().metadata().properties());
+  }
+
+  @Test
+  public void testLoadWithoutProperties() {
+    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
+
+    Metastore initialMetastore = new IcebergMetastore(config);
+    assertTrue(initialMetastore.tables().metadata().properties().isEmpty());
+
+    Metastore newMetastore = new IcebergMetastore(config);
+    assertTrue(newMetastore.tables().metadata().properties().isEmpty());
+  }
+
+  @Test
+  public void testLoadWithSameProperties() {
+    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
+      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
+        ConfigValueFactory.fromAnyRef(10)));
+
+    Map<String, String> initialProperties = Collections.singletonMap(TableProperties.SPLIT_SIZE, "10");
+
+    Metastore initialMetastore = new IcebergMetastore(config);
+    assertEquals(initialProperties, initialMetastore.tables().metadata().properties());
+
+    Metastore newMetastore = new IcebergMetastore(config);
+    assertEquals(initialProperties, newMetastore.tables().metadata().properties());
+  }
+
+  @Test
+  public void testLoadWithUpdatedProperties() {
+    DrillConfig initialConfig = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
+      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
+        ConfigValueFactory.fromAnyRef(10))
+      .withValue(String.format(COMPONENTS_TABLES_PROPERTIES_PATTERN, TableProperties.MANIFEST_MIN_MERGE_COUNT),
+        ConfigValueFactory.fromAnyRef(2)));
+
+    Map<String, String> initialProperties = new HashMap<>();
+    initialProperties.put(TableProperties.SPLIT_SIZE, "10");
+    initialProperties.put(TableProperties.MANIFEST_MIN_MERGE_COUNT, "2");
+
+    Metastore initialMetastore = new IcebergMetastore(initialConfig);
+    assertEquals(initialProperties, initialMetastore.tables().metadata().properties());
+
+    DrillConfig newConfig = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
+      .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
+        ConfigValueFactory.fromAnyRef(100))
+      .withValue(String.format(COMPONENTS_TABLES_PROPERTIES_PATTERN, TableProperties.COMMIT_NUM_RETRIES),
+        ConfigValueFactory.fromAnyRef(5)));
+
+    Map<String, String> newProperties = new HashMap<>();
+    newProperties.put(TableProperties.SPLIT_SIZE, "100");
+    newProperties.put(TableProperties.COMMIT_NUM_RETRIES, "5");
+
+    Metastore newMetastore = new IcebergMetastore(newConfig);
+    assertEquals(newProperties, newMetastore.tables().metadata().properties());
+  }
+
+  @Test
+  public void testNewInstance() {
+    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
+    Metastore metastore = new IcebergMetastore(config);
+
+    assertNotSame(metastore.tables(), metastore.tables());
+  }
+
+  @Test
+  public void testVersionInitial() {
+    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
+    Metastore metastore = new IcebergMetastore(config);
+    Metadata metadata = metastore.tables().metadata();
+    assertTrue(metadata.supportsVersioning());
+    assertEquals(0, metadata.version());
+  }
+
+  @Test
+  public void testVersionUpdate() {
+    DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot()));
+
+    Tables tables = new IcebergMetastore(config).tables();
+    Metadata metadata = tables.metadata();
+
+    assertTrue(metadata.supportsVersioning());
+    assertEquals(0, metadata.version());
+
+    tables.modify()
+      .overwrite(TableMetadataUnit.builder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("dir0")
+        .build())
+      .execute();
+
+    assertNotEquals(0, metadata.version());
+  }
+}
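
The creation tests above pin down the property resolution rule: common properties apply to all Iceberg Metastore components, and component-scoped properties win when the same key is set in both places. Condensed, using only names from the test class above:

```
// common value is 10, tables-scoped value is 100; the tables component sees 100
DrillConfig config = new DrillConfig(baseIcebergConfig(baseLocation.getRoot())
  .withValue(String.format(COMPONENTS_COMMON_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
    ConfigValueFactory.fromAnyRef(10))
  .withValue(String.format(COMPONENTS_TABLES_PROPERTIES_PATTERN, TableProperties.SPLIT_SIZE),
    ConfigValueFactory.fromAnyRef(100)));

Map<String, String> properties = new IcebergMetastore(config).tables().metadata().properties();
assertEquals("100", properties.get(TableProperties.SPLIT_SIZE));
```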
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTablesOperationTransformer.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTablesOperationTransformer.java
index ac444fc..68cceba 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTablesOperationTransformer.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestTablesOperationTransformer.java
@@ -93,7 +93,7 @@
   }
 
   @Test
-  public void testToDeleteOperation() {
+  public void testToDeleteOperationByFilter() {
     FilterExpression filter = FilterExpression.and(
       FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
       FilterExpression.equal(MetastoreColumn.WORKSPACE, "tmp"));
@@ -108,19 +108,16 @@
   }
 
   @Test
-  public void testToDeleteOperations() {
-    org.apache.drill.metastore.operate.Delete dfs = org.apache.drill.metastore.operate.Delete.builder()
+  public void testToDeleteOperation() {
+    Expression expected = Expressions.equal(MetastoreColumn.STORAGE_PLUGIN.columnName(), "dfs");
+
+    org.apache.drill.metastore.operate.Delete delete = org.apache.drill.metastore.operate.Delete.builder()
       .metadataType(MetadataType.ALL)
       .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
       .build();
 
-    org.apache.drill.metastore.operate.Delete s3 = org.apache.drill.metastore.operate.Delete.builder()
-      .metadataType(MetadataType.ALL)
-      .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "s3"))
-      .build();
+    Delete operation = transformer.toDelete(delete);
 
-    List<Delete> operations = transformer.toDelete(Arrays.asList(dfs, s3));
-
-    assertEquals(2, operations.size());
+    assertEquals(expected.toString(), operation.filter().toString());
   }
 }
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/transform/TestFilterTransformer.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/transform/TestFilterTransformer.java
index 487bbab..633b894 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/transform/TestFilterTransformer.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/transform/TestFilterTransformer.java
@@ -21,12 +21,12 @@
 import org.apache.drill.metastore.expressions.FilterExpression;
 import org.apache.drill.metastore.iceberg.IcebergBaseTest;
 import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -238,7 +238,7 @@
     Expression expected = Expressions.alwaysTrue();
 
     Expression actual = transformer.transform(
-      Arrays.asList(MetadataType.PARTITION, MetadataType.FILE, MetadataType.ALL));
+      Sets.newHashSet(MetadataType.PARTITION, MetadataType.FILE, MetadataType.ALL));
 
     assertEquals(expected.toString(), actual.toString());
   }
@@ -247,7 +247,7 @@
   public void testToFilterMetadataTypesOne() {
     Expression expected = Expressions.equal(MetastoreColumn.METADATA_TYPE.columnName(), MetadataType.PARTITION.name());
 
-    Expression actual = transformer.transform(Collections.singletonList(MetadataType.PARTITION));
+    Expression actual = transformer.transform(Collections.singleton(MetadataType.PARTITION));
 
     assertEquals(expected.toString(), actual.toString());
   }
@@ -258,7 +258,7 @@
       MetadataType.PARTITION.name(), MetadataType.FILE.name());
 
     Expression actual = transformer.transform(
-      Arrays.asList(MetadataType.PARTITION, MetadataType.FILE));
+      Sets.newHashSet(MetadataType.PARTITION, MetadataType.FILE));
 
     assertEquals(expected.toString(), actual.toString());
   }
diff --git a/metastore/metastore-api/README.md b/metastore/metastore-api/README.md
index c7dd701..fc75d2d 100644
--- a/metastore/metastore-api/README.md
+++ b/metastore/metastore-api/README.md
@@ -352,6 +352,7 @@
 Metastore component implementation may or may not support transactions. If transactions are supported,
 all operations in one `Modify` instance will be executed fully or not executed at all.
 If the Metastore implementation does not support transactions, all operations will be executed consecutively.
+Note: operations will be executed in the same order as they were added.
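+
+For example, in the sketch below the overwrite is applied before the delete, so after
+`execute()` the freshly written unit is removed again (`unit` is illustrative and
+assumed to belong to the `dfs` plugin):
+
+```
+    metastore.tables().modify()
+        .overwrite(unit)
+        .delete(Delete.builder()
+            .metadataType(MetadataType.ALL)
+            .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
+            .build())
+        .execute();
+```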
 
 ```
     metastore.tables().modify()
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreRegistry.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreRegistry.java
index 293ab56..8d19942 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreRegistry.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/MetastoreRegistry.java
@@ -21,6 +21,8 @@
 import org.apache.drill.metastore.config.MetastoreConfigConstants;
 import org.apache.drill.metastore.config.MetastoreConfigFileInfo;
 import org.apache.drill.metastore.exceptions.MetastoreException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandle;
 import java.lang.invoke.MethodHandles;
@@ -34,11 +36,17 @@
  */
 public class MetastoreRegistry implements AutoCloseable {
 
-  private DrillConfig config;
+  private static final Logger logger = LoggerFactory.getLogger(MetastoreRegistry.class);
+
+  private final DrillConfig config;
+  // used only for testing to avoid searching overridden configuration files
+  private final boolean useProvided;
   private volatile Metastore metastore;
 
   public MetastoreRegistry(DrillConfig config) {
     this.config = config;
+    this.useProvided = config.hasPath(MetastoreConfigConstants.USE_PROVIDED_CONFIG)
+      && config.getBoolean(MetastoreConfigConstants.USE_PROVIDED_CONFIG);
   }
 
   public Metastore get() {
@@ -53,7 +61,7 @@
   }
 
   private Metastore initMetastore() {
-    DrillConfig metastoreConfig = createMetastoreConfig(config);
+    DrillConfig metastoreConfig = useProvided ? config : createMetastoreConfig(config);
     String metastoreClass = metastoreConfig.getString(MetastoreConfigConstants.IMPLEMENTATION_CLASS);
     if (metastoreClass == null) {
       throw new MetastoreException(
@@ -86,6 +94,8 @@
         String.format("Created instance of [%s] does not implement [%s] interface",
           instance.getClass().getSimpleName(), Metastore.class.getSimpleName()));
     }
+
+    logger.info("Drill Metastore is initialized using class {}", metastoreClass);
     return (Metastore) instance;
   }
 
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesRequests.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesRequests.java
index 2dee1a6..ad0118a 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesRequests.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/BasicTablesRequests.java
@@ -27,12 +27,15 @@
 import org.apache.drill.metastore.metadata.RowGroupMetadata;
 import org.apache.drill.metastore.metadata.SegmentMetadata;
 import org.apache.drill.metastore.metadata.TableInfo;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -262,9 +265,9 @@
         .map(MetadataInfo::identifier)
         .collect(Collectors.toList());
 
-    List<MetadataType> metadataTypes = metadataInfos.stream()
+    Set<MetadataType> metadataTypes = metadataInfos.stream()
         .map(MetadataInfo::type)
-        .collect(Collectors.toList());
+        .collect(Collectors.toSet());
 
     RequestMetadata requestMetadata = RequestMetadata.builder()
         .tableInfo(tableInfo)
@@ -659,17 +662,17 @@
    */
   public static class RequestMetadata {
 
-    private List<MetadataType> metadataTypes;
+    private Set<MetadataType> metadataTypes;
     private final FilterExpression filter;
     private final List<MetastoreColumn> columns;
 
-    private RequestMetadata(List<MetadataType> metadataTypes, FilterExpression filter, List<MetastoreColumn> columns) {
+    private RequestMetadata(Set<MetadataType> metadataTypes, FilterExpression filter, List<MetastoreColumn> columns) {
       this.metadataTypes = metadataTypes;
       this.filter = filter;
       this.columns = columns;
     }
 
-    public List<MetadataType> metadataTypes() {
+    public Set<MetadataType> metadataTypes() {
       return metadataTypes;
     }
 
@@ -697,7 +700,7 @@
       private List<String> paths;
       private List<String> identifiers;
       private FilterExpression customFilter;
-      private List<MetadataType> metadataTypes = new ArrayList<>();
+      private Set<MetadataType> metadataTypes = new HashSet<>();
       private final List<MetastoreColumn> requestColumns = new ArrayList<>();
 
       public RequestMetadata.Builder metadataType(MetadataType metadataType) {
@@ -706,11 +709,11 @@
       }
 
       public RequestMetadata.Builder metadataTypes(MetadataType... metadataTypes) {
-        this.metadataTypes.addAll(Arrays.asList(metadataTypes));
+        this.metadataTypes.addAll(Sets.newHashSet(metadataTypes));
         return this;
       }
 
-      public RequestMetadata.Builder metadataTypes(List<MetadataType> metadataTypes) {
+      public RequestMetadata.Builder metadataTypes(Set<MetadataType> metadataTypes) {
         this.metadataTypes.addAll(metadataTypes);
         return this;
       }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
index da5d8fc..0fc7fac 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TableMetadataUnit.java
@@ -53,6 +53,8 @@
 
   public static final Schema SCHEMA = Schema.of(TableMetadataUnit.class, Builder.class);
 
+  public static final TableMetadataUnit EMPTY_UNIT = TableMetadataUnit.builder().build();
+
   @MetastoreFieldDefinition(column = MetastoreColumn.STORAGE_PLUGIN, scopes = {ALL}) private final String storagePlugin;
   @MetastoreFieldDefinition(column = MetastoreColumn.WORKSPACE,scopes = {ALL}) private final String workspace;
   @MetastoreFieldDefinition(column = MetastoreColumn.TABLE_NAME, scopes = {ALL}) private final String tableName;
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TablesMetadataTypeValidator.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TablesMetadataTypeValidator.java
index d20467f..857d04e 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TablesMetadataTypeValidator.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/components/tables/TablesMetadataTypeValidator.java
@@ -19,9 +19,9 @@
 
 import org.apache.drill.metastore.metadata.MetadataType;
 import org.apache.drill.metastore.operate.MetadataTypeValidator;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 
-import java.util.Arrays;
-import java.util.List;
+import java.util.Set;
 
 /**
  * Implementation of {@link MetadataTypeValidator} interface which provides
@@ -31,7 +31,7 @@
 
   public static final TablesMetadataTypeValidator INSTANCE = new TablesMetadataTypeValidator();
 
-  private static final List<MetadataType> SUPPORTED_METADATA_TYPES = Arrays.asList(
+  private static final Set<MetadataType> SUPPORTED_METADATA_TYPES = ImmutableSet.of(
     MetadataType.ALL,
     MetadataType.TABLE,
     MetadataType.SEGMENT,
@@ -40,7 +40,7 @@
     MetadataType.PARTITION);
 
   @Override
-  public List<MetadataType> supportedMetadataTypes() {
+  public Set<MetadataType> supportedMetadataTypes() {
     return SUPPORTED_METADATA_TYPES;
   }
 }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/config/MetastoreConfigConstants.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/config/MetastoreConfigConstants.java
index f27eea5..8a5428d 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/config/MetastoreConfigConstants.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/config/MetastoreConfigConstants.java
@@ -44,7 +44,6 @@
    */
   String OVERRIDE_RESOURCE_FILE_NAME = "drill-metastore-override.conf";
 
-
   /**
    * Metastore configuration properties namespace.
    */
@@ -54,4 +53,10 @@
    * Indicates canonical class name of the Metastore implementation class.
    */
   String IMPLEMENTATION_CLASS = BASE + "implementation.class";
+
+  /**
+   * Indicates whether the provided Drill config should be used as-is, without searching
+   * the classpath for override / fallback resource files. Used for testing only.
+   */
+  String USE_PROVIDED_CONFIG = BASE + "config.use_provided";
 }
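
A minimal sketch of how the new flag is meant to be used in tests (baseConfig stands in for any Config that already carries the implementation-specific settings; the abstract test classes below show the real wiring):

```
DrillConfig config = new DrillConfig(baseConfig
  .withValue(MetastoreConfigConstants.USE_PROVIDED_CONFIG,
    ConfigValueFactory.fromAnyRef(true))
  .withValue(MetastoreConfigConstants.IMPLEMENTATION_CLASS,
    ConfigValueFactory.fromAnyRef(IcebergMetastore.class.getName())));

// the registry takes the supplied config as-is, skipping the lookup of
// drill-metastore-override.conf on the classpath
Metastore metastore = new MetastoreRegistry(config).get();
```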
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/MetadataInfo.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/MetadataInfo.java
index 038c599..8397e24 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/MetadataInfo.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/metadata/MetadataInfo.java
@@ -30,8 +30,21 @@
  * Class that specifies metadata type and metadata information
  * which will be used for obtaining specific metadata from metastore.
  *
- * For example, for table-level metadata, it will be
- * {@code MetadataInfo[MetadataType.TABLE, MetadataInfo.GENERAL_INFO_KEY, null]}.
+ * For example:
+ * <ul>
+ * <li>For table metadata:
+ * {@code MetadataInfo[MetadataType.TABLE, MetadataInfo.GENERAL_INFO_KEY, MetadataInfo.GENERAL_INFO_KEY]}</li>
+ * <li>For default segment metadata:
+ * {@code MetadataInfo[MetadataType.SEGMENT, MetadataInfo.DEFAULT_SEGMENT_KEY, MetadataInfo.DEFAULT_SEGMENT_KEY]}</li>
+ * <li>For top-level segment metadata:
+ * {@code MetadataInfo[MetadataType.SEGMENT, "1994", "1994"]}</li>
+ * <li>For nested segment metadata:
+ * {@code MetadataInfo[MetadataType.SEGMENT, "1994", "1994/Q1"]}</li>
+ * <li>For file metadata:
+ * {@code MetadataInfo[MetadataType.FILE, "1994", "1994/Q1/0_0_0.parquet"]}</li>
+ * <li>For row group metadata:
+ * {@code MetadataInfo[MetadataType.ROW_GROUP, "1994", "1994/Q1/0_0_0.parquet/1"]}</li>
+ * <li>For partition metadata:
+ * {@code MetadataInfo[MetadataType.PARTITION, "1994", "1994/Q1/01"]}</li>
+ * </ul>
  */
 @JsonTypeName("metadataInfo")
 @JsonDeserialize(builder = MetadataInfo.MetadataInfoBuilder.class)
@@ -41,6 +54,8 @@
   public static final String DEFAULT_SEGMENT_KEY = "DEFAULT_SEGMENT";
   public static final String DEFAULT_COLUMN_PREFIX = "_$SEGMENT_";
 
+  private static final String UNDEFINED_KEY = "UNDEFINED_KEY";
+
   private final MetadataType type;
   private final String key;
   private final String identifier;
@@ -67,9 +82,7 @@
   }
 
   public void toMetadataUnitBuilder(TableMetadataUnit.Builder builder) {
-    if (type != null) {
-      builder.metadataType(type.name());
-    }
+    builder.metadataType(type.name());
     builder.metadataKey(key);
     builder.metadataIdentifier(identifier);
   }
@@ -136,6 +149,8 @@
 
     public MetadataInfo build() {
       Objects.requireNonNull(type, "type was not set");
+      key = key == null ? UNDEFINED_KEY : key;
+      identifier = identifier == null ? key : identifier;
       return new MetadataInfo(this);
     }
   }
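
The builder defaults added here can be read off directly; a small sketch (the key()/identifier() accessors are assumed from the fields above):

```
// type is mandatory; key falls back to UNDEFINED_KEY, identifier falls back to key
MetadataInfo general = MetadataInfo.builder()
  .type(MetadataType.TABLE)
  .build();           // key() == "UNDEFINED_KEY", identifier() == "UNDEFINED_KEY"

MetadataInfo segment = MetadataInfo.builder()
  .type(MetadataType.SEGMENT)
  .key("1994")
  .build();           // identifier() defaults to "1994"
```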
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractModify.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractModify.java
index 35abdc9..80cad22 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractModify.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractModify.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.metastore.operate;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -30,9 +29,6 @@
  */
 public abstract class AbstractModify<T> implements Modify<T> {
 
-  protected final List<T> overwriteUnits = new ArrayList<>();
-  protected final List<Delete> deletes = new ArrayList<>();
-
   private final MetadataTypeValidator metadataTypeValidator;
 
   protected AbstractModify(MetadataTypeValidator metadataTypeValidator) {
@@ -40,15 +36,31 @@
   }
 
   @Override
-  public Modify<T> overwrite(List<T> units) {
-    overwriteUnits.addAll(units);
+  public final Modify<T> overwrite(List<T> units) {
+    addOverwrite(units);
     return this;
   }
 
   @Override
-  public Modify<T> delete(Delete delete) {
+  public final Modify<T> delete(Delete delete) {
     metadataTypeValidator.validate(delete.metadataTypes());
-    deletes.add(delete);
+    addDelete(delete);
     return this;
   }
+
+  /**
+   * Adds an overwrite operation to the list of pending operations.
+   * Implementations must preserve the order in which operations were added.
+   *
+   * @param units list of Metastore metadata units
+   */
+  protected abstract void addOverwrite(List<T> units);
+
+  /**
+   * Adds a delete operation to the list of pending operations.
+   * Implementations must preserve the order in which operations were added.
+   *
+   * @param delete Metastore delete operation holder
+   */
+  protected abstract void addDelete(Delete delete);
 }
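
A sketch of what a concrete implementation of the new hooks might look like (OrderedModify is hypothetical; the execute() and purge() bodies are elided):

```
public class OrderedModify<T> extends AbstractModify<T> {

  // a single queue for both operation kinds preserves overall insertion order
  private final List<Object> pendingOperations = new ArrayList<>();

  public OrderedModify(MetadataTypeValidator metadataTypeValidator) {
    super(metadataTypeValidator);
  }

  @Override
  protected void addOverwrite(List<T> units) {
    pendingOperations.add(units);
  }

  @Override
  protected void addDelete(Delete delete) {
    pendingOperations.add(delete);
  }

  @Override
  public void execute() {
    // replay pendingOperations in insertion order, in one transaction if supported
  }

  @Override
  public void purge() {
    // remove all component data
  }
}
```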
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractRead.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractRead.java
index 7fd03d6..02066bd 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractRead.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/AbstractRead.java
@@ -22,7 +22,9 @@
 import org.apache.drill.metastore.metadata.MetadataType;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Abstract implementation of {@link Read<T>} interface which contains
@@ -33,7 +35,7 @@
  */
 public abstract class AbstractRead<T> implements Read<T> {
 
-  protected final List<MetadataType> metadataTypes = new ArrayList<>();
+  protected final Set<MetadataType> metadataTypes = new HashSet<>();
   protected final List<MetastoreColumn> columns = new ArrayList<>();
   protected FilterExpression filter;
 
@@ -44,7 +46,7 @@
   }
 
   @Override
-  public Read<T> metadataTypes(List<MetadataType> metadataTypes) {
+  public Read<T> metadataTypes(Set<MetadataType> metadataTypes) {
     this.metadataTypes.addAll(metadataTypes);
     return this;
   }
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Delete.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Delete.java
index 5a53530..a879d26 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Delete.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Delete.java
@@ -19,23 +19,23 @@
 
 import org.apache.drill.metastore.expressions.FilterExpression;
 import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.StringJoiner;
 
 /**
  * Delete operation holder, it includes filter by which Metastore data will be deleted
- * and list of metadata types to which filter will be applied.
+ * and set of metadata types to which filter will be applied.
  *
- * Note: providing at list one metadata type is required.
+ * Note: providing at least one metadata type is required.
  * If delete operation should be applied to all metadata types,
  * {@link MetadataType#ALL} can be indicated.
  */
 public class Delete {
 
-  private final List<MetadataType> metadataTypes;
+  private final Set<MetadataType> metadataTypes;
   private final FilterExpression filter;
 
   private Delete(Builder builder) {
@@ -47,7 +47,7 @@
     return new Builder();
   }
 
-  public List<MetadataType> metadataTypes() {
+  public Set<MetadataType> metadataTypes() {
     return metadataTypes;
   }
 
@@ -64,16 +64,16 @@
   }
 
   public static class Builder {
-    private final List<MetadataType> metadataTypes = new ArrayList<>();
+    private final Set<MetadataType> metadataTypes = new HashSet<>();
     private FilterExpression filter;
 
-    public Builder metadataTypes(List<MetadataType> metadataTypes) {
+    public Builder metadataTypes(Set<MetadataType> metadataTypes) {
       this.metadataTypes.addAll(metadataTypes);
       return this;
     }
 
     public Builder metadataType(MetadataType... metadataTypes) {
-      return metadataTypes(Arrays.asList(metadataTypes));
+      return metadataTypes(Sets.newHashSet(metadataTypes));
     }
 
     public Builder filter(FilterExpression filter) {
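
One observable effect of the switch to a set, sketched with the builder above: duplicate metadata types now collapse.

```
Delete delete = Delete.builder()
  .metadataType(MetadataType.SEGMENT, MetadataType.SEGMENT, MetadataType.FILE)
  .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
  .build();

assertEquals(2, delete.metadataTypes().size()); // {SEGMENT, FILE}
```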
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/MetadataTypeValidator.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/MetadataTypeValidator.java
index 96ba3dd..1bc7e0f 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/MetadataTypeValidator.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/MetadataTypeValidator.java
@@ -20,7 +20,7 @@
 import org.apache.drill.metastore.exceptions.MetastoreException;
 import org.apache.drill.metastore.metadata.MetadataType;
 
-import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -30,9 +30,9 @@
 public interface MetadataTypeValidator {
 
   /**
-   * @return list of supported metadata types for concrete Metastore component unit
+   * @return set of supported metadata types for a concrete Metastore component unit
    */
-  List<MetadataType> supportedMetadataTypes();
+  Set<MetadataType> supportedMetadataTypes();
 
   /**
    * Validates if given metadata types contain at least one metadata type
@@ -42,16 +42,16 @@
    * @throws MetastoreException if no metadata types were provided
    *                            or given metadata types contain unsupported types
    */
-  default void validate(List<MetadataType> metadataTypes) {
+  default void validate(Set<MetadataType> metadataTypes) {
     if (metadataTypes == null || metadataTypes.isEmpty()) {
       throw new MetastoreException("Metadata type(s) must be indicated");
     }
 
-    List<MetadataType> supportedMetadataTypes = supportedMetadataTypes();
+    Set<MetadataType> supportedMetadataTypes = supportedMetadataTypes();
 
-    List<MetadataType> unsupportedMetadataTypes = metadataTypes.stream()
+    Set<MetadataType> unsupportedMetadataTypes = metadataTypes.stream()
       .filter(metadataType -> !supportedMetadataTypes.contains(metadataType))
-      .collect(Collectors.toList());
+      .collect(Collectors.toSet());
 
     if (!unsupportedMetadataTypes.isEmpty()) {
       throw new MetastoreException("Unsupported metadata types are detected: " + unsupportedMetadataTypes);
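
The default validate(Set) keeps its previous behavior, only over sets; against the tables validator (assuming MetadataType.NONE stays outside its supported set):

```
MetadataTypeValidator validator = TablesMetadataTypeValidator.INSTANCE;

validator.validate(Sets.newHashSet(MetadataType.TABLE, MetadataType.SEGMENT)); // passes
validator.validate(new HashSet<>());                    // throws MetastoreException
validator.validate(Sets.newHashSet(MetadataType.NONE)); // throws MetastoreException
```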
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Modify.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Modify.java
index dae8b4d..f010958 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Modify.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Modify.java
@@ -64,6 +64,7 @@
   /**
    * Executes list of provided metastore operations in one transaction if Metastore implementation
    * supports transactions, otherwise executes operations consecutively.
+   * All operations must be executed in the same order in which they were added.
    */
   void execute();
 
diff --git a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Read.java b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Read.java
index 97bd79c..ccbcd4a 100644
--- a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Read.java
+++ b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/operate/Read.java
@@ -20,9 +20,11 @@
 import org.apache.drill.metastore.MetastoreColumn;
 import org.apache.drill.metastore.expressions.FilterExpression;
 import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Drill Metastore Read interface contains methods to be implemented in order
@@ -33,17 +35,17 @@
 public interface Read<T> {
 
   /**
-   * Provides list of metadata types to be read.
+   * Provides set of metadata types to be read.
    * Note: providing at least one metadata type is required.
    * If all metadata types should be read, {@link MetadataType#ALL} can be passed.
    *
-   * @param metadataTypes list of metadata types
+   * @param metadataTypes set of metadata types
    * @return current instance of Read interface implementation
    */
-  Read<T> metadataTypes(List<MetadataType> metadataTypes);
+  Read<T> metadataTypes(Set<MetadataType> metadataTypes);
 
   default Read<T> metadataType(MetadataType... metadataType) {
-    return metadataTypes(Arrays.asList(metadataType));
+    return metadataTypes(Sets.newHashSet(metadataType));
   }
 
   /**
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/TestData.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/TestData.java
new file mode 100644
index 0000000..a638d15
--- /dev/null
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/TestData.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore;
+
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.metadata.MetadataInfo;
+import org.apache.drill.metastore.metadata.MetadataType;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+public class TestData {
+
+  /**
+   * Returns table metadata unit where all fields are filled in.
+   * Note: data in the fields may not exactly reflect reality.
+   *
+   * @return basic table metadata unit
+   */
+  public static TableMetadataUnit basicTableMetadataUnit() {
+    return TableMetadataUnit.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .tableName("test")
+      .owner("user")
+      .tableType("parquet")
+      .metadataType(MetadataType.NONE.name())
+      .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
+      .location("/tmp/nation")
+      .interestingColumns(Arrays.asList("`id`", "`name`"))
+      .schema("{\"type\":\"tuple_schema\"," +
+        "\"columns\":[{\"name\":\"id\",\"type\":\"INT\",\"mode\":\"REQUIRED\"}," +
+        "{\"name\":\"name\",\"type\":\"VARCHAR\",\"mode\":\"REQUIRED\"}]," +
+        "\"properties\":{\"drill.strict\":\"true\"}}\n")
+      .columnsStatistics(Collections.singletonMap("`name`", "{\"statistics\":[{\"statisticsValue\":\"aaa\"," +
+        "\"statisticsKind\":{\"exact\":true,\"name\":\"minValue\"}},{\"statisticsValue\":\"zzz\"," +
+        "\"statisticsKind\":{\"exact\":true,\"name\":\"maxValue\"}}],\"type\":\"VARCHAR\"}"))
+      .metadataStatistics(Collections.singletonList("{\"statisticsValue\":2.1," +
+        "\"statisticsKind\":{\"name\":\"approx_count_distinct\"}}"))
+      .lastModifiedTime(System.currentTimeMillis())
+      .partitionKeys(Collections.singletonMap("dir0", "2018"))
+      .additionalMetadata("additional test metadata")
+      .metadataIdentifier("part_int=3/part_varchar=g/0_0_0.parquet")
+      .column("`id`")
+      .locations(Arrays.asList("/tmp/nation/1", "/tmp/nation/2"))
+      .partitionValues(Arrays.asList("1", "2"))
+      .path("/tmp/nation/1/0_0_0.parquet")
+      .rowGroupIndex(0)
+      .hostAffinity(Collections.singletonMap("host1", 0.1F))
+      .build();
+  }
+}
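
The abstract test classes below derive concrete units from this template via toBuilder(), for example:

```
TableMetadataUnit segmentUnit = TestData.basicTableMetadataUnit().toBuilder()
  .metadataType(MetadataType.SEGMENT.name())
  .metadataKey("part_int=3")
  .metadataIdentifier("part_int=3/d3")
  .build();
```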
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestBasicRequests.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractBasicTablesRequestsTest.java
similarity index 72%
rename from metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestBasicRequests.java
rename to metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractBasicTablesRequestsTest.java
index 4947113..1c6cd4e 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/components/tables/TestBasicRequests.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractBasicTablesRequestsTest.java
@@ -15,18 +15,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.metastore.iceberg.components.tables;
+package org.apache.drill.metastore.components.tables;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.categories.MetastoreTest;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.metastore.MetastoreColumn;
-import org.apache.drill.metastore.components.tables.BasicTablesRequests;
-import org.apache.drill.metastore.components.tables.BasicTablesTransformer;
-import org.apache.drill.metastore.components.tables.MetastoreTableInfo;
-import org.apache.drill.metastore.components.tables.TableMetadataUnit;
-import org.apache.drill.metastore.components.tables.Tables;
+import org.apache.drill.metastore.MetastoreRegistry;
+import org.apache.drill.metastore.TestData;
+import org.apache.drill.metastore.config.MetastoreConfigConstants;
 import org.apache.drill.metastore.expressions.FilterExpression;
-import org.apache.drill.metastore.iceberg.IcebergBaseTest;
-import org.apache.drill.metastore.iceberg.IcebergMetastore;
 import org.apache.drill.metastore.metadata.BaseTableMetadata;
 import org.apache.drill.metastore.metadata.FileMetadata;
 import org.apache.drill.metastore.metadata.MetadataInfo;
@@ -35,8 +34,11 @@
 import org.apache.drill.metastore.metadata.RowGroupMetadata;
 import org.apache.drill.metastore.metadata.SegmentMetadata;
 import org.apache.drill.metastore.metadata.TableInfo;
-import org.junit.BeforeClass;
+import org.apache.drill.test.BaseTest;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -50,19 +52,26 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-public class TestBasicRequests extends IcebergBaseTest {
+@Category(MetastoreTest.class)
+public abstract class AbstractBasicTablesRequestsTest extends BaseTest {
 
-  private static Tables tables;
-  private static BasicTablesRequests basicRequests;
-  private static TableMetadataUnit nationTable;
-  private static TableInfo nationTableInfo;
+  @ClassRule
+  public static TemporaryFolder defaultFolder = new TemporaryFolder();
 
-  @BeforeClass
-  public static void init() {
-    DrillConfig config = new DrillConfig(baseIcebergConfig(defaultFolder.getRoot()));
-    tables = new IcebergMetastore(config).tables();
-    prepareData(tables);
+  protected static Tables tables;
+  protected static BasicTablesRequests basicRequests;
+  protected static TableMetadataUnit nationTable;
+  protected static TableInfo nationTableInfo;
+
+  protected static void innerInit(Config config, Class<?> implementationClass) {
+    DrillConfig drillConfig = new DrillConfig(config
+      .withValue(MetastoreConfigConstants.USE_PROVIDED_CONFIG,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(MetastoreConfigConstants.IMPLEMENTATION_CLASS,
+        ConfigValueFactory.fromAnyRef(implementationClass.getName())));
+    tables = new MetastoreRegistry(drillConfig).get().tables();
     basicRequests = tables.basicRequests();
+    prepareData(tables);
   }
 
   @Test
@@ -75,15 +84,6 @@
   }
 
   @Test
-  public void testMetastoreTableInfoExistingTable() {
-    MetastoreTableInfo metastoreTableInfo = basicRequests.metastoreTableInfo(nationTableInfo);
-    assertTrue(metastoreTableInfo.isExists());
-    assertEquals(nationTableInfo, metastoreTableInfo.tableInfo());
-    assertEquals(nationTable.lastModifiedTime(), metastoreTableInfo.lastModifiedTime());
-    assertTrue(metastoreTableInfo.metastoreVersion() > 0);
-  }
-
-  @Test
   public void testHasMetastoreTableInfoChangedFalse() {
     MetastoreTableInfo metastoreTableInfo = basicRequests.metastoreTableInfo(nationTableInfo);
     assertFalse(basicRequests.hasMetastoreTableInfoChanged(metastoreTableInfo));
@@ -181,66 +181,66 @@
   @Test
   public void testSegmentMetadataByMetadataInfosAbsent() {
     List<SegmentMetadata> segmentMetadata = basicRequests.segmentsMetadata(
-        nationTableInfo,
-        Collections.singletonList(MetadataInfo.builder()
-            .type(MetadataType.SEGMENT)
-            .key("part_int=4")
-            .identifier("part_int=4")
-            .build()));
+      nationTableInfo,
+      Collections.singletonList(MetadataInfo.builder()
+        .type(MetadataType.SEGMENT)
+        .key("part_int=4")
+        .identifier("part_int=4")
+        .build()));
     assertTrue(segmentMetadata.isEmpty());
   }
 
   @Test
   public void testSegmentMetadataByMetadataInfosExisting() {
     List<SegmentMetadata> segmentMetadata = basicRequests.segmentsMetadata(
-        nationTableInfo,
-        Arrays.asList(
-            MetadataInfo.builder()
-                .type(MetadataType.SEGMENT)
-                .key("part_int=3")
-                .identifier("part_int=3/d3")
-                .build(),
-            MetadataInfo.builder()
-                .type(MetadataType.SEGMENT)
-                .key("part_int=3")
-                .identifier("part_int=3/d4")
-                .build())
-        );
+      nationTableInfo,
+      Arrays.asList(
+        MetadataInfo.builder()
+          .type(MetadataType.SEGMENT)
+          .key("part_int=3")
+          .identifier("part_int=3/d3")
+          .build(),
+        MetadataInfo.builder()
+          .type(MetadataType.SEGMENT)
+          .key("part_int=3")
+          .identifier("part_int=3/d4")
+          .build())
+    );
     assertEquals(2, segmentMetadata.size());
   }
 
   @Test
   public void testMetadataUnitsByMetadataInfosAbsent() {
     List<TableMetadataUnit> segmentMetadata = basicRequests.metadata(
-        nationTableInfo,
-        Collections.singletonList(MetadataInfo.builder()
-            .type(MetadataType.ROW_GROUP)
-            .key("part_int=4")
-            .identifier("part_int=4")
-            .build()));
+      nationTableInfo,
+      Collections.singletonList(MetadataInfo.builder()
+        .type(MetadataType.ROW_GROUP)
+        .key("part_int=4")
+        .identifier("part_int=4")
+        .build()));
     assertTrue(segmentMetadata.isEmpty());
   }
 
   @Test
   public void testMetadataUnitsByMetadataInfosExisting() {
     List<TableMetadataUnit> segmentMetadata = basicRequests.metadata(
-        nationTableInfo,
-        Arrays.asList(
-            MetadataInfo.builder()
-                .type(MetadataType.SEGMENT)
-                .key("part_int=3")
-                .identifier("part_int=3/d3")
-                .build(),
-            MetadataInfo.builder()
-                .type(MetadataType.SEGMENT)
-                .key("part_int=3")
-                .identifier("part_int=3/d4")
-                .build(),
-            MetadataInfo.builder()
-                .type(MetadataType.PARTITION)
-                .key("part_int=3")
-                .identifier("part_int=4/d5")
-                .build())
+      nationTableInfo,
+      Arrays.asList(
+        MetadataInfo.builder()
+          .type(MetadataType.SEGMENT)
+          .key("part_int=3")
+          .identifier("part_int=3/d3")
+          .build(),
+        MetadataInfo.builder()
+          .type(MetadataType.SEGMENT)
+          .key("part_int=3")
+          .identifier("part_int=3/d4")
+          .build(),
+        MetadataInfo.builder()
+          .type(MetadataType.PARTITION)
+          .key("part_int=3")
+          .identifier("part_int=4/d5")
+          .build())
     );
     assertEquals(2, segmentMetadata.size());
   }
@@ -248,30 +248,30 @@
   @Test
   public void testFilesMetadataByMetadataInfosAbsent() {
     List<FileMetadata> segmentMetadata = basicRequests.filesMetadata(
-        nationTableInfo,
-        Collections.singletonList(MetadataInfo.builder()
-            .type(MetadataType.FILE)
-            .key("part_int=4")
-            .identifier("part_int=4/part_varchar=g/0_0_3.parquet")
-            .build()));
+      nationTableInfo,
+      Collections.singletonList(MetadataInfo.builder()
+        .type(MetadataType.FILE)
+        .key("part_int=4")
+        .identifier("part_int=4/part_varchar=g/0_0_3.parquet")
+        .build()));
     assertTrue(segmentMetadata.isEmpty());
   }
 
   @Test
   public void testFilesMetadataByMetadataInfosExisting() {
     List<FileMetadata> segmentMetadata = basicRequests.filesMetadata(
-        nationTableInfo,
-        Arrays.asList(
-            MetadataInfo.builder()
-                .type(MetadataType.FILE)
-                .key("part_int=4")
-                .identifier("part_int=4/part_varchar=g/0_0_0.parquet")
-                .build(),
-            MetadataInfo.builder()
-                .type(MetadataType.FILE)
-                .key("part_int=3")
-                .identifier("part_int=3/part_varchar=g/0_0_1.parquet")
-                .build())
+      nationTableInfo,
+      Arrays.asList(
+        MetadataInfo.builder()
+          .type(MetadataType.FILE)
+          .key("part_int=4")
+          .identifier("part_int=4/part_varchar=g/0_0_0.parquet")
+          .build(),
+        MetadataInfo.builder()
+          .type(MetadataType.FILE)
+          .key("part_int=3")
+          .identifier("part_int=3/part_varchar=g/0_0_1.parquet")
+          .build())
     );
     assertEquals(2, segmentMetadata.size());
   }
@@ -279,22 +279,22 @@
   @Test
   public void testRowGroupsMetadataByMetadataKeysAndPathsAbsent() {
     List<RowGroupMetadata> segmentMetadata = basicRequests.rowGroupsMetadata(
-        nationTableInfo,
-        Collections.singletonList("part_int=4"),
-        Collections.singletonList("/tmp/nation/part_int=4/part_varchar=g/0_0_3.parquet"));
+      nationTableInfo,
+      Collections.singletonList("part_int=4"),
+      Collections.singletonList("/tmp/nation/part_int=4/part_varchar=g/0_0_3.parquet"));
     assertTrue(segmentMetadata.isEmpty());
   }
 
   @Test
   public void testRowGroupsByMetadataKeysAndPathsExisting() {
     List<RowGroupMetadata> segmentMetadata = basicRequests.rowGroupsMetadata(
-        nationTableInfo,
-        Arrays.asList(
-            "part_int=4",
-            "part_int=3"),
-        Arrays.asList(
-            "/tmp/nation/part_int=4/part_varchar=g/0_0_0.parquet",
-            "/tmp/nation/part_int=3/part_varchar=g/0_0_1.parquet")
+      nationTableInfo,
+      Arrays.asList(
+        "part_int=4",
+        "part_int=3"),
+      Arrays.asList(
+        "/tmp/nation/part_int=4/part_varchar=g/0_0_0.parquet",
+        "/tmp/nation/part_int=3/part_varchar=g/0_0_1.parquet")
     );
     assertEquals(2, segmentMetadata.size());
   }
@@ -302,32 +302,32 @@
   @Test
   public void testRowGroupsMetadataByMetadataInfosAbsent() {
     List<RowGroupMetadata> segmentMetadata = basicRequests.rowGroupsMetadata(
-        nationTableInfo,
-        Collections.singletonList(MetadataInfo.builder()
-            .type(MetadataType.ROW_GROUP)
-            .key("part_int=4")
-            .identifier("part_int=4/part_varchar=g/0_0_3.parquet/1")
-            .build()));
+      nationTableInfo,
+      Collections.singletonList(MetadataInfo.builder()
+        .type(MetadataType.ROW_GROUP)
+        .key("part_int=4")
+        .identifier("part_int=4/part_varchar=g/0_0_3.parquet/1")
+        .build()));
     assertTrue(segmentMetadata.isEmpty());
   }
 
   @Test
   public void testRowGroupsMetadataByMetadataInfosExisting() {
-    List<RowGroupMetadata> segmentMetadata = basicRequests.rowGroupsMetadata(
-        nationTableInfo,
-        Arrays.asList(
-            MetadataInfo.builder()
-                .type(MetadataType.ROW_GROUP)
-                .key("part_int=4")
-                .identifier("part_int=4/part_varchar=g/0_0_0.parquet/1")
-                .build(),
-            MetadataInfo.builder()
-                .type(MetadataType.ROW_GROUP)
-                .key("part_int=3")
-                .identifier("part_int=3/part_varchar=g/0_0_1.parquet/1")
-                .build())
+    List<RowGroupMetadata> rowGroupMetadata = basicRequests.rowGroupsMetadata(
+      nationTableInfo,
+      Arrays.asList(
+        MetadataInfo.builder()
+          .type(MetadataType.ROW_GROUP)
+          .key("part_int=4")
+          .identifier("part_int=4/part_varchar=g/0_0_0.parquet/1")
+          .build(),
+        MetadataInfo.builder()
+          .type(MetadataType.ROW_GROUP)
+          .key("part_int=3")
+          .identifier("part_int=3/part_varchar=g/0_0_0.parquet/1")
+          .build())
     );
-    assertEquals(2, segmentMetadata.size());
+    assertEquals(2, rowGroupMetadata.size());
   }
 
   @Test
@@ -470,7 +470,7 @@
    * @param tables Drill Metastore Tables instance
    */
   private static void prepareData(Tables tables) {
-    TableMetadataUnit basicUnit = basicUnit();
+    TableMetadataUnit basicUnit = TestData.basicTableMetadataUnit();
 
     nationTable = BaseTableMetadata.builder()
       .metadataUnit(basicUnit.toBuilder()
@@ -611,7 +611,7 @@
 
     TableMetadataUnit nationRowGroup4 = basicRowGroup.toBuilder()
       .metadataKey("part_int=4")
-      .metadataIdentifier("part_int=4/part_varchar=g/0_0_0.parquet/1")
+      .metadataIdentifier("part_int=4/part_varchar=g/0_0_0.parquet/2")
       .location("/tmp/nation/part_int=4/part_varchar=g")
       .path("/tmp/nation/part_int=4/part_varchar=g/0_0_0.parquet")
       .rowGroupIndex(2)
@@ -635,43 +635,4 @@
         regionTable)
       .execute();
   }
-
-  /**
-   * Returns metadata unit where all fields are filled in.
-   * Note: data in the fields may be not exactly true to reality.
-   *
-   * @return basic metadata unit
-   */
-  private static TableMetadataUnit basicUnit() {
-    return TableMetadataUnit.builder()
-      .storagePlugin("dfs")
-      .workspace("tmp")
-      .tableName("test")
-      .owner("user")
-      .tableType("parquet")
-      .metadataType(MetadataType.NONE.name())
-      .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
-      .location("/tmp/nation")
-      .interestingColumns(Arrays.asList("`id`", "`name`"))
-      .schema("{\"type\":\"tuple_schema\"," +
-        "\"columns\":[{\"name\":\"id\",\"type\":\"INT\",\"mode\":\"REQUIRED\"}," +
-        "{\"name\":\"name\",\"type\":\"VARCHAR\",\"mode\":\"REQUIRED\"}]," +
-        "\"properties\":{\"drill.strict\":\"true\"}}\n")
-      .columnsStatistics(Collections.singletonMap("`name`", "{\"statistics\":[{\"statisticsValue\":\"aaa\"," +
-        "\"statisticsKind\":{\"exact\":true,\"name\":\"minValue\"}},{\"statisticsValue\":\"zzz\"," +
-        "\"statisticsKind\":{\"exact\":true,\"name\":\"maxValue\"}}],\"type\":\"VARCHAR\"}"))
-      .metadataStatistics(Collections.singletonList("{\"statisticsValue\":2.1," +
-        "\"statisticsKind\":{\"name\":\"approx_count_distinct\"}}"))
-      .lastModifiedTime(System.currentTimeMillis())
-      .partitionKeys(Collections.singletonMap("dir0", "2018"))
-      .additionalMetadata("additional test metadata")
-      .metadataIdentifier("part_int=3/part_varchar=g/0_0_0.parquet")
-      .column("`id`")
-      .locations(Arrays.asList("/tmp/nation/1", "/tmp/nation/2"))
-      .partitionValues(Arrays.asList("1", "2"))
-      .path("/tmp/nation/1/0_0_0.parquet")
-      .rowGroupIndex(0)
-      .hostAffinity(Collections.singletonMap("host1", 0.1F))
-      .build();
-  }
 }
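
Each Metastore implementation module is expected to subclass this class and call innerInit with its own configuration. A hypothetical Iceberg-flavoured wiring (buildIcebergConfig is a placeholder for the module's config helper):

```
public class TestIcebergBasicTablesRequests extends AbstractBasicTablesRequestsTest {

  @BeforeClass
  public static void init() {
    innerInit(buildIcebergConfig(defaultFolder.getRoot()), IcebergMetastore.class);
  }
}
```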
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractTablesMetastoreTest.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractTablesMetastoreTest.java
new file mode 100644
index 0000000..f8bae49
--- /dev/null
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/AbstractTablesMetastoreTest.java
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.components.tables;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.categories.MetastoreTest;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.MetastoreRegistry;
+import org.apache.drill.metastore.config.MetastoreConfigConstants;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.metadata.MetadataInfo;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.metadata.TableInfo;
+import org.apache.drill.metastore.operate.Delete;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.apache.drill.test.BaseTest;
+import org.junit.After;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(MetastoreTest.class)
+public abstract class AbstractTablesMetastoreTest extends BaseTest {
+
+  @ClassRule
+  public static TemporaryFolder defaultFolder = new TemporaryFolder();
+
+  protected static Tables tables;
+
+  protected static void innerInit(Config config, Class<?> implementationClass) {
+    DrillConfig drillConfig = new DrillConfig(config
+      .withValue(MetastoreConfigConstants.USE_PROVIDED_CONFIG,
+        ConfigValueFactory.fromAnyRef(true))
+      .withValue(MetastoreConfigConstants.IMPLEMENTATION_CLASS,
+        ConfigValueFactory.fromAnyRef(implementationClass.getName())));
+    tables = new MetastoreRegistry(drillConfig).get().tables();
+  }
+
+  @After
+  public void cleanUp() {
+    tables.modify().purge();
+  }
+
+  @Test
+  public void testWriteReadAllFieldTypes() {
+    TableInfo tableInfo = TableInfo.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .name("nation")
+      .build();
+
+    TableMetadataUnit tableUnit = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
+      .metadataType(MetadataType.TABLE.name())
+      .metadataStatistics(Collections.singletonList("{\"statisticsValue\":2.1," +
+        "\"statisticsKind\":{\"name\":\"approx_count_distinct\"}}"))
+      .columnsStatistics(Collections.singletonMap("`name`", "{\"statistics\":[{\"statisticsValue\":\"aaa\"," +
+        "\"statisticsKind\":{\"exact\":true,\"name\":\"minValue\"}},{\"statisticsValue\":\"zzz\"," +
+        "\"statisticsKind\":{\"exact\":true,\"name\":\"maxValue\"}}],\"type\":\"VARCHAR\"}"))
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    TableMetadataUnit rowGroupUnit = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey("1994")
+      .metadataIdentifier("1994/Q1/0_0_0.parquet/1")
+      .metadataType(MetadataType.ROW_GROUP.name())
+      .lastModifiedTime(System.currentTimeMillis())
+      .location("/tmp/nation")
+      .path("/tmp/nation/1/0_0_0.parquet")
+      .rowGroupIndex(1)
+      .hostAffinity(Collections.singletonMap("host1", 0.1F))
+      .build();
+
+    tables.modify()
+      .overwrite(tableUnit, rowGroupUnit)
+      .execute();
+
+    List<TableMetadataUnit> tableUnits = tables.read()
+      .metadataType(MetadataType.TABLE)
+      .filter(tableInfo.toFilter())
+      .execute();
+
+    assertEquals(1, tableUnits.size());
+    assertEquals(tableUnit, tableUnits.get(0));
+
+    List<TableMetadataUnit> rowGroupUnits = tables.read()
+      .metadataType(MetadataType.ROW_GROUP)
+      .filter(tableInfo.toFilter())
+      .execute();
+
+    assertEquals(1, rowGroupUnits.size());
+    assertEquals(rowGroupUnit, rowGroupUnits.get(0));
+  }
+
+  @Test
+  public void testReadSelectedColumns() {
+    TableInfo tableInfo = TableInfo.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .name("nation")
+      .build();
+
+    TableMetadataUnit unit = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
+      .metadataType(MetadataType.TABLE.name())
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    tables.modify()
+      .overwrite(unit)
+      .execute();
+
+    List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.ALL)
+      .filter(tableInfo.toFilter())
+      .columns(MetastoreColumn.TABLE_NAME, MetastoreColumn.METADATA_KEY)
+      .execute();
+
+    assertEquals(1, units.size());
+    assertEquals(TableMetadataUnit.builder()
+      .tableName("nation")
+      .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
+      .build(),
+      units.get(0));
+  }
+
+  @Test
+  public void testReadNoResult() {
+    List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.ALL)
+      .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
+      .columns(MetastoreColumn.TABLE_NAME, MetastoreColumn.METADATA_KEY)
+      .execute();
+
+    assertTrue(units.isEmpty());
+  }
+
+  @Test
+  public void testReadAbsentColumns() {
+    TableInfo tableInfo = TableInfo.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .name("nation")
+      .build();
+
+    TableMetadataUnit unit = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
+      .metadataType(MetadataType.TABLE.name())
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    tables.modify()
+      .overwrite(unit)
+      .execute();
+
+    List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.TABLE)
+      .filter(tableInfo.toFilter())
+      .columns(MetastoreColumn.TABLE_NAME, MetastoreColumn.HOST_AFFINITY)
+      .execute();
+
+    assertEquals(1, units.size());
+    assertEquals(TableMetadataUnit.builder()
+        .tableName("nation")
+        .build(),
+      units.get(0));
+
+    units = tables.read()
+      .metadataType(MetadataType.TABLE)
+      .filter(tableInfo.toFilter())
+      .columns(MetastoreColumn.HOST_AFFINITY)
+      .execute();
+
+    assertEquals(1, units.size());
+    assertEquals(TableMetadataUnit.EMPTY_UNIT, units.get(0));
+  }
+
+  @Test
+  public void testFilterByAbsentColumns() {
+    TableInfo tableInfo = TableInfo.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .name("nation")
+      .build();
+
+    TableMetadataUnit unit = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
+      .metadataType(MetadataType.TABLE.name())
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    tables.modify()
+      .overwrite(unit)
+      .execute();
+
+    List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.TABLE)
+      .filter(FilterExpression.equal(MetastoreColumn.PATH, "abc"))
+      .columns(MetastoreColumn.TABLE_NAME)
+      .execute();
+
+    assertEquals(0, units.size());
+
+    units = tables.read()
+      .metadataType(MetadataType.TABLE)
+      .filter(FilterExpression.and(
+        FilterExpression.equal(MetastoreColumn.TABLE_NAME, "nation"),
+        FilterExpression.isNull(MetastoreColumn.PARTITION_VALUES),
+        FilterExpression.notEqual(MetastoreColumn.PATH, "abc"),
+        FilterExpression.isNull(MetastoreColumn.COLUMN),
+        FilterExpression.notIn(MetastoreColumn.ROW_GROUP_INDEX, 1, 2)))
+      .columns(MetastoreColumn.TABLE_NAME)
+      .execute();
+
+    assertEquals(1, units.size());
+    assertEquals(TableMetadataUnit.builder()
+        .tableName("nation")
+        .build(),
+      units.get(0));
+  }
+
+  @Test
+  public void testOverwrite() {
+    TableInfo tableInfo = TableInfo.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .name("nation")
+      .build();
+
+    TableMetadataUnit initialUnit = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
+      .metadataType(MetadataType.TABLE.name())
+      .tableType("parquet")
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    tables.modify()
+      .overwrite(initialUnit)
+      .execute();
+
+    List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.TABLE)
+      .filter(tableInfo.toFilter())
+      .execute();
+
+    assertEquals(1, units.size());
+    assertEquals(initialUnit, units.get(0));
+
+    TableMetadataUnit updatedUnit = TableMetadataUnit.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .tableName("nation")
+      .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
+      .metadataType(MetadataType.TABLE.name())
+      .tableType("text")
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    tables.modify()
+      .overwrite(updatedUnit)
+      .execute();
+
+    List<TableMetadataUnit> updatedUnits = tables.read()
+      .metadataType(MetadataType.TABLE)
+      .filter(tableInfo.toFilter())
+      .execute();
+
+    assertEquals(1, updatedUnits.size());
+    assertEquals(updatedUnit, updatedUnits.get(0));
+  }
+
+  @Test
+  public void testOverwriteSeveralUnits() {
+    TableInfo tableInfo = TableInfo.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .name("nation")
+      .build();
+
+    TableMetadataUnit topLevelSegment = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey("1994")
+      .metadataIdentifier("1994")
+      .metadataType(MetadataType.SEGMENT.name())
+      .location("/tmp/nation/1994")
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    TableMetadataUnit firstNestedSegment = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey("1994")
+      .metadataIdentifier("1994/Q1")
+      .metadataType(MetadataType.SEGMENT.name())
+      .location("/tmp/nation/1994/Q1")
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    TableMetadataUnit secondNestedSegment = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey("1994")
+      .metadataIdentifier("1994/Q2")
+      .metadataType(MetadataType.SEGMENT.name())
+      .location("/tmp/nation/1994/Q2")
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    tables.modify()
+      .overwrite(topLevelSegment, firstNestedSegment, secondNestedSegment)
+      .execute();
+
+    List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.SEGMENT)
+      .filter(tableInfo.toFilter())
+      .execute();
+
+    assertEquals(3, units.size());
+
+    tables.modify()
+      .overwrite(topLevelSegment, firstNestedSegment)
+      .execute();
+
+    List<TableMetadataUnit> updatedUnits = tables.read()
+      .metadataType(MetadataType.SEGMENT)
+      .filter(tableInfo.toFilter())
+      .execute();
+
+    assertEquals(2, updatedUnits.size());
+
+    Set<String> metadataIdentifiers = updatedUnits.stream()
+      .map(TableMetadataUnit::metadataIdentifier)
+      .collect(Collectors.toSet());
+
+    assertEquals(Sets.newHashSet("1994", "1994/Q1"), metadataIdentifiers);
+  }
+
+  @Test
+  public void testDelete() {
+    TableInfo tableInfo = TableInfo.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .name("nation")
+      .build();
+
+    TableMetadataUnit firstUnit = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey("1994")
+      .metadataIdentifier("1994")
+      .metadataType(MetadataType.SEGMENT.name())
+      .location("/tmp/nation/1994")
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    TableMetadataUnit secondUnit = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey("1995")
+      .metadataIdentifier("1995")
+      .metadataType(MetadataType.SEGMENT.name())
+      .location("/tmp/nation/1995")
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    tables.modify()
+      .overwrite(firstUnit, secondUnit)
+      .execute();
+
+    List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.SEGMENT)
+      .filter(tableInfo.toFilter())
+      .execute();
+
+    assertEquals(2, units.size());
+
+    FilterExpression deleteFilter = FilterExpression.and(
+      tableInfo.toFilter(),
+      FilterExpression.equal(MetastoreColumn.METADATA_KEY, "1994"));
+
+    tables.modify()
+      .delete(Delete.builder()
+        .metadataType(MetadataType.SEGMENT)
+        .filter(deleteFilter)
+        .build())
+      .execute();
+
+    List<TableMetadataUnit> updatedUnits = tables.read()
+      .metadataType(MetadataType.SEGMENT)
+      .filter(tableInfo.toFilter())
+      .execute();
+
+    assertEquals(1, updatedUnits.size());
+    assertEquals(secondUnit, updatedUnits.get(0));
+  }
+
+  @Test
+  public void testOverwriteDeleteOrder() {
+    TableInfo tableInfo = TableInfo.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .name("nation")
+      .build();
+
+    TableMetadataUnit initialUnit = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey(MetadataInfo.GENERAL_INFO_KEY)
+      .metadataType(MetadataType.TABLE.name())
+      .lastModifiedTime(System.currentTimeMillis())
+      .tableType("parquet")
+      .build();
+
+    TableMetadataUnit updatedUnit = initialUnit.toBuilder()
+      .tableType("text")
+      .build();
+
+    // add initial unit
+    tables.modify()
+      .overwrite(initialUnit)
+      .execute();
+
+    // first delete, then overwrite
+    tables.modify()
+      .delete(Delete.builder()
+        .metadataType(MetadataType.TABLE)
+        .build())
+      .overwrite(updatedUnit)
+      .execute();
+
+    // check that unit is present and updated
+    List<TableMetadataUnit> resultAfterDeleteOverwrite = tables.read()
+      .metadataType(MetadataType.TABLE)
+      .execute();
+
+    assertEquals(1, resultAfterDeleteOverwrite.size());
+    assertEquals(updatedUnit, resultAfterDeleteOverwrite.get(0));
+
+    // first overwrite, then delete
+    tables.modify()
+      .overwrite(initialUnit)
+      .delete(Delete.builder()
+        .metadataType(MetadataType.TABLE)
+        .build())
+      .execute();
+
+    // check that units are absent
+    List<TableMetadataUnit> resultAfterOverwriteDelete = tables.read()
+      .metadataType(MetadataType.TABLE)
+      .execute();
+
+    assertEquals(0, resultAfterOverwriteDelete.size());
+  }
+
+  @Test
+  public void testOverwriteAndDeleteInOneTransaction() {
+    TableInfo tableInfo = TableInfo.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .name("nation")
+      .build();
+
+    TableMetadataUnit firstUnit = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey("1994")
+      .metadataIdentifier("1994")
+      .metadataType(MetadataType.SEGMENT.name())
+      .location("/tmp/nation/1994")
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    TableMetadataUnit secondUnit = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey("1995")
+      .metadataIdentifier("1995")
+      .metadataType(MetadataType.SEGMENT.name())
+      .location("/tmp/nation/1995")
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    tables.modify()
+      .overwrite(firstUnit, secondUnit)
+      .execute();
+
+    List<TableMetadataUnit> units = tables.read()
+      .metadataType(MetadataType.SEGMENT)
+      .filter(tableInfo.toFilter())
+      .execute();
+
+    assertEquals(2, units.size());
+
+    FilterExpression deleteFilter = FilterExpression.and(
+      tableInfo.toFilter(),
+      FilterExpression.equal(MetastoreColumn.METADATA_KEY, "1994"));
+
+    TableMetadataUnit updatedUnit = TableMetadataUnit.builder()
+      .storagePlugin(tableInfo.storagePlugin())
+      .workspace(tableInfo.workspace())
+      .tableName(tableInfo.name())
+      .metadataKey("1995")
+      .metadataIdentifier("1995")
+      .metadataType(MetadataType.SEGMENT.name())
+      .location("/user/nation/1995")
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    tables.modify()
+      .delete(Delete.builder()
+        .metadataType(MetadataType.SEGMENT)
+        .filter(deleteFilter)
+        .build())
+      .overwrite(updatedUnit)
+      .execute();
+
+    List<TableMetadataUnit> updatedUnits = tables.read()
+      .metadataType(MetadataType.SEGMENT)
+      .filter(tableInfo.toFilter())
+      .execute();
+
+    assertEquals(1, updatedUnits.size());
+    assertEquals(updatedUnit, updatedUnits.get(0));
+  }
+
+  @Test
+  public void testPurge() {
+    TableMetadataUnit firstUnit = TableMetadataUnit.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .tableName("nation")
+      .metadataKey("dir0")
+      .metadataType(MetadataType.TABLE.name())
+      .tableType("parquet")
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    TableMetadataUnit secondUnit = TableMetadataUnit.builder()
+      .storagePlugin("dfs")
+      .workspace("tmp")
+      .tableName("nation")
+      .metadataKey("1994")
+      .metadataIdentifier("1994")
+      .metadataType(MetadataType.SEGMENT.name())
+      .location("/tmp/nation/1994")
+      .lastModifiedTime(System.currentTimeMillis())
+      .build();
+
+    tables.modify()
+      .overwrite(firstUnit, secondUnit)
+      .execute();
+
+    List<TableMetadataUnit> initialUnits = tables.read()
+      .metadataType(MetadataType.ALL)
+      .execute();
+
+    assertEquals(2, initialUnits.size());
+
+    tables.modify()
+      .purge();
+
+    List<TableMetadataUnit> resultingUnits = tables.read()
+      .metadataType(MetadataType.ALL)
+      .execute();
+
+    assertTrue(resultingUnits.isEmpty());
+  }
+}
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequestsRequestMetadata.java
similarity index 95%
rename from metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java
rename to metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequestsRequestMetadata.java
index 1679cbe..5c13c46 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequests.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestBasicTablesRequestsRequestMetadata.java
@@ -34,7 +34,7 @@
 import static org.junit.Assert.assertTrue;
 
 @Category(MetastoreTest.class)
-public class TestBasicTablesRequests extends BaseTest {
+public class TestBasicTablesRequestsRequestMetadata extends BaseTest {
 
   @Test
   public void testRequestMetadataWithoutRequestColumns() {
@@ -150,8 +150,8 @@
   @Test
   public void testRequestMetadataWithMetadataTypes() {
     BasicTablesRequests.RequestMetadata requestMetadata = BasicTablesRequests.RequestMetadata.builder()
-      .metadataTypes(MetadataType.TABLE, MetadataType.SEGMENT)
-      .metadataTypes(Arrays.asList(MetadataType.PARTITION, MetadataType.FILE))
+      .metadataTypes(MetadataType.TABLE, MetadataType.SEGMENT, MetadataType.PARTITION)
+      .metadataTypes(MetadataType.PARTITION, MetadataType.FILE)
       .build();
 
     assertEquals(4, requestMetadata.metadataTypes().size());
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java
index 7a6212c..e487470 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTableMetadataUnitConversion.java
@@ -93,6 +93,7 @@
       .tableType(tableInfo.type())
       .metadataType(metadataInfo.type().name())
       .metadataKey(metadataInfo.key())
+      .metadataIdentifier(metadataInfo.identifier())
       .columnsStatistics(data.unitColumnsStatistics)
       .metadataStatistics(data.unitMetadataStatistics)
       .lastModifiedTime(BaseMetadata.UNDEFINED_TIME)
@@ -128,6 +129,7 @@
       .tableType(tableInfo.type())
       .metadataType(metadataInfo.type().name())
       .metadataKey(metadataInfo.key())
+      .metadataIdentifier(metadataInfo.identifier())
       .schema(data.unitSchema)
       .columnsStatistics(data.unitColumnsStatistics)
       .metadataStatistics(data.unitMetadataStatistics)
diff --git a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTablesMetadataTypeValidator.java b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTablesMetadataTypeValidator.java
index 80b1d61..97f8ca6 100644
--- a/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTablesMetadataTypeValidator.java
+++ b/metastore/metastore-api/src/test/java/org/apache/drill/metastore/components/tables/TestTablesMetadataTypeValidator.java
@@ -20,11 +20,11 @@
 import org.apache.drill.categories.MetastoreTest;
 import org.apache.drill.metastore.exceptions.MetastoreException;
 import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.drill.test.BaseTest;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.util.Arrays;
 import java.util.Collections;
 
 import static org.hamcrest.CoreMatchers.startsWith;
@@ -36,13 +36,13 @@
 
   @Test
   public void testValidType() {
-    TablesMetadataTypeValidator.INSTANCE.validate(Collections.singletonList(MetadataType.ALL));
-    TablesMetadataTypeValidator.INSTANCE.validate(Collections.singletonList(MetadataType.TABLE));
+    TablesMetadataTypeValidator.INSTANCE.validate(Collections.singleton(MetadataType.ALL));
+    TablesMetadataTypeValidator.INSTANCE.validate(Collections.singleton(MetadataType.TABLE));
   }
 
   @Test
   public void testValidTypes() {
-    TablesMetadataTypeValidator.INSTANCE.validate(Arrays.asList(
+    TablesMetadataTypeValidator.INSTANCE.validate(Sets.newHashSet(
       MetadataType.TABLE,
       MetadataType.SEGMENT,
       MetadataType.FILE,
@@ -53,7 +53,7 @@
   @Test
   public void testInvalidType() {
     try {
-      TablesMetadataTypeValidator.INSTANCE.validate(Collections.singletonList(MetadataType.NONE));
+      TablesMetadataTypeValidator.INSTANCE.validate(Collections.singleton(MetadataType.NONE));
       fail();
     } catch (MetastoreException e) {
       assertThat(e.getMessage(), startsWith("Unsupported metadata types are detected"));
@@ -63,7 +63,7 @@
   @Test
   public void testValidAndInvalidTypes() {
     try {
-      TablesMetadataTypeValidator.INSTANCE.validate(Arrays.asList(
+      TablesMetadataTypeValidator.INSTANCE.validate(Sets.newHashSet(
         MetadataType.TABLE,
         MetadataType.ALL,
         MetadataType.NONE,
diff --git a/metastore/pom.xml b/metastore/pom.xml
index 88031b1..f39e9ad 100644
--- a/metastore/pom.xml
+++ b/metastore/pom.xml
@@ -45,5 +45,6 @@
   <modules>
     <module>metastore-api</module>
     <module>iceberg-metastore</module>
+    <module>rdbms-metastore</module>
   </modules>
 </project>
diff --git a/metastore/rdbms-metastore/README.md b/metastore/rdbms-metastore/README.md
new file mode 100644
index 0000000..6b12397
--- /dev/null
+++ b/metastore/rdbms-metastore/README.md
@@ -0,0 +1,158 @@
+# RDBMS Metastore
+
+The RDBMS Metastore implementation allows you to store Drill Metastore metadata in a configured RDBMS.
+
+## Configuration
+
+Currently, the RDBMS Metastore is not the default implementation.
+To enable the RDBMS Metastore, create the `drill-metastore-override.conf` file
+in your config directory and specify the RDBMS Metastore class:
+
+```
+drill.metastore: {
+  implementation.class: "org.apache.drill.metastore.rdbms.RdbmsMetastore"
+}
+```
+
+### Connection properties
+
+Use the connection properties to specify how Drill should connect to your Metastore database.
+
+`drill.metastore.rdbms.data_source.driver` - driver class name. Required.
+Note: the driver class must be included in the Drill classpath.
+The easiest way to do that is to put the driver jar file into the `$DRILL_HOME/jars/3rdparty` folder.
+Or, to make upgrades easier, place it in your `$DRILL_SITE/jars` folder.
+Drill includes the driver for SQLite.
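+
+For example, to add the PostgreSQL driver (the jar file name below is illustrative and depends on the driver version you download):
+
+```
+cp postgresql-<version>.jar $DRILL_SITE/jars/
+```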
+
+`drill.metastore.rdbms.data_source.url` - connection url. Required.
+
+`drill.metastore.rdbms.data_source.username` - database user on whose behalf the connection is
+being made. Optional if the database does not require a user to connect.
+
+`drill.metastore.rdbms.data_source.password` - database user's password.
+Optional if the database does not require a password to connect.
+
+`drill.metastore.rdbms.data_source.properties` - specifies properties to be used
+during data source creation. See the list of available [Hikari properties](https://github.com/brettwooldridge/HikariCP)
+for more details.
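+
+For example, a sketch of tuning the connection pool through standard Hikari properties (the values here are illustrative):
+
+```
+drill.metastore.rdbms.data_source.properties: {
+  maximumPoolSize: 5,
+  idleTimeout: 600000
+}
+```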
+
+### Default configuration 
+
+Out of the box, the Drill RDBMS Metastore is configured to use the embedded file-system-based SQLite database.
+It will be created locally in the user's home directory under the `${drill.exec.zk.root}/metastore` location.
+
+The default setup can be used only in Drill embedded mode. SQLite is an embedded database; it is not distributed.
+SQLite is good for trying out the feature, for testing, for running Drill in embedded mode,
+and perhaps for a single-node Drill "cluster". It should not be used in a multi-node cluster:
+each Drillbit would have its own version of the truth, and behavior would be undefined and incorrect.
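+
+For reference, an explicit SQLite configuration would look roughly like the following (the database file path is illustrative; by default Drill derives it as described above):
+
+```
+drill.metastore: {
+  implementation.class: "org.apache.drill.metastore.rdbms.RdbmsMetastore",
+  rdbms.data_source: {
+    driver: "org.sqlite.JDBC",
+    url: "jdbc:sqlite:/path/to/drill/metastore.db"
+  }
+}
+```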
+
+### Custom configuration
+
+`drill-metastore-override.conf` is used to customize connection details to the Drill Metastore database.
+See `drill-metastore-override-example.conf` for more details.
+
+#### Example of PostgreSQL configuration
+
+```
+drill.metastore: {
+  implementation.class: "org.apache.drill.metastore.rdbms.RdbmsMetastore",
+  rdbms: {
+    data_source: {
+      driver: "org.postgresql.Driver",
+      url: "jdbc:postgresql://localhost:1234/mydb?currentSchema=drill_metastore",
+      username: "user",
+      password: "password"
+    }
+  }
+}
+```
+
+Note: as mentioned above, the PostgreSQL JDBC driver must be present in the Drill classpath.
+
+#### Example of MySQL configuration
+
+```
+drill.metastore: {
+  implementation.class: "org.apache.drill.metastore.rdbms.RdbmsMetastore",
+  rdbms: {
+    data_source: {
+      driver: "com.mysql.cj.jdbc.Driver",
+      url: "jdbc:mysql://localhost:1234/drill_metastore",
+      username: "user",
+      password: "password"
+    }
+  }
+}
+```
+
+Note: as mentioned above, the MySQL JDBC driver must be present in the Drill classpath.
+
+##### Driver version
+
+For MySQL connector version 6+, use the `com.mysql.cj.jdbc.Driver` driver class;
+for older versions, use `com.mysql.jdbc.Driver`.
+
+## Tables structure
+
+The Drill Metastore stores several types of metadata, called components. Currently, only the `tables` component is implemented.
+The `tables` component provides metadata about Drill tables, including their segments, files, row groups and partitions.
+In Drill, a `tables` component unit is represented by the `TableMetadataUnit` class, which is applicable to any metadata type.
+The `TableMetadataUnit` class holds fields for all five metadata types within the `tables` component. 
+Any fields not applicable to a particular metadata type are simply ignored and remain unset.
+
+In the RDBMS implementation of the Drill Metastore, the tables component includes five tables, one for each metadata type. 
+The five tables are: `TABLES`, `SEGMENTS`, `FILES`, `ROW_GROUPS`, and `PARTITIONS`.
+See `src/main/resources/db/changelog/changes/initial_ddls.yaml` for the schema and indexes of each table.
+
+The Drill Metastore API has the following semantics:
+* most of the time, all data about a component is accessed;
+* data is filtered by non-complex fields, like storage plugin, workspace, table name, etc.;
+* data is overwritten fully; there is no update of individual fields.
+
+Taking into account the Drill Metastore API semantics, the RDBMS Drill Metastore schema is slightly denormalized.
+A fully normalized structure would lead to unnecessary joins during selects and to index maintenance overhead during updates.
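+
+For illustration, a sketch of how the `tables` component is accessed through the Metastore API (mirroring the unit tests in this module; `metastore` is assumed to be an initialized `RdbmsMetastore` instance):
+
+```
+Tables tables = metastore.tables();
+
+// data is overwritten fully: the unit replaces any previous version
+tables.modify()
+  .overwrite(unit)
+  .execute();
+
+// reads are filtered by non-complex fields
+List<TableMetadataUnit> units = tables.read()
+  .metadataType(MetadataType.TABLE)
+  .filter(FilterExpression.equal(MetastoreColumn.TABLE_NAME, "nation"))
+  .execute();
+```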
+
+### Table creation
+
+The RDBMS Metastore uses [Liquibase](https://www.liquibase.org/documentation/core-concepts/index.html)
+to create the needed tables during the RDBMS Metastore initialization. Users should not create any tables manually.
+
+### Database schema
+
+Liquibase uses a yaml configuration file to apply changes to the database schema: `src/main/resources/db/changelog/changelog.yaml`.
+Liquibase converts the yaml specification into the DDL / DML commands required for the configured database.
+See the list of supported databases: https://www.liquibase.org/databases.html.
+
+The Drill Metastore tables are created in the database schema indicated in the connection URL.
+If no schema is specified, the database's default schema is used. Drill will not create the schema itself, however.
+Best practice is to create a schema for the Drill Metastore within your database before initializing the Metastore.
+
+Example:
+
+PostgreSQL: `jdbc:postgresql://localhost:1234/mydb?currentSchema=drill_metastore`
+
+MySQL: `jdbc:mysql://localhost:1234/drill_metastore`
+
+Since Drill will create the required tables, ensure that the database user has the following permissions in the metastore schema:
+* read and write tables;
+* create and modify database objects (tables, indexes, views, etc.).
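+
+For example, a minimal PostgreSQL sketch (schema and user names are placeholders):
+
+```
+CREATE SCHEMA drill_metastore;
+GRANT USAGE, CREATE ON SCHEMA drill_metastore TO drill_user;
+```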
+
+### Liquibase tables
+
+During Drill RDBMS Metastore initialization, Liquibase will create two internal tracking tables:
+`DATABASECHANGELOG` and `DATABASECHANGELOGLOCK`. They are needed to track schema changes and concurrent updates.
+See https://www.liquibase.org/get_started/how-lb-works.html for more details.
+
+## Query execution
+
+SQL queries issued to RDBMS Metastore tables are generated using [JOOQ](https://www.jooq.org/doc/3.13/manual/getting-started/).
+Drill uses the open-source version of JOOQ to generate the queries sent to the configured Metastore database.
+
+JOOQ generates SQL statements based on the SQL dialect determined from the database connection details.
+List of supported dialects: https://www.jooq.org/javadoc/3.13.x/org.jooq/org/jooq/SQLDialect.html.
+Note: dialects annotated with `@Pro` are not supported, since the open-source version of JOOQ is used.
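+
+For illustration only (not Drill code), a minimal sketch of how JOOQ renders a programmatic query for a chosen dialect:
+
+```
+import org.jooq.SQLDialect;
+import org.jooq.impl.DSL;
+
+public class JooqDialectDemo {
+  public static void main(String[] args) {
+    // Render the query in the PostgreSQL dialect; inline() embeds the literal value.
+    String sql = DSL.using(SQLDialect.POSTGRES)
+      .select(DSL.field("TABLE_NAME"))
+      .from(DSL.table("TABLES"))
+      .where(DSL.field("STORAGE_PLUGIN").eq(DSL.inline("dfs")))
+      .getSQL();
+    System.out.println(sql); // select TABLE_NAME from TABLES where STORAGE_PLUGIN = 'dfs'
+  }
+}
+```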
+
+## Supported databases
+
+The RDBMS Metastore was tested with `SQLite`, `PostgreSQL` and `MySQL`. Other databases should also work
+if there is Liquibase and JOOQ support for them.
diff --git a/metastore/rdbms-metastore/pom.xml b/metastore/rdbms-metastore/pom.xml
new file mode 100644
index 0000000..986d012
--- /dev/null
+++ b/metastore/rdbms-metastore/pom.xml
@@ -0,0 +1,140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>metastore-parent</artifactId>
+    <groupId>org.apache.drill.metastore</groupId>
+    <version>1.18.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-rdbms-metastore</artifactId>
+  <name>metastore/Drill RDBMS Metastore</name>
+
+  <properties>
+    <jooq.version>3.13.1</jooq.version>
+    <liquibase.version>3.8.7</liquibase.version>
+    <sqlite.version>3.30.1</sqlite.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill.metastore</groupId>
+      <artifactId>drill-metastore-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.zaxxer</groupId>
+      <artifactId>HikariCP</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.liquibase</groupId>
+      <artifactId>liquibase-core</artifactId>
+      <version>${liquibase.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.jooq</groupId>
+      <artifactId>jooq</artifactId>
+      <version>${jooq.version}</version>
+    </dependency>
+
+    <!-- Needed to allow yaml parsing in JOOQ -->
+    <dependency>
+      <groupId>org.yaml</groupId>
+      <artifactId>snakeyaml</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.xerial</groupId>
+      <artifactId>sqlite-jdbc</artifactId>
+      <version>${sqlite.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.drill.metastore</groupId>
+      <artifactId>drill-metastore-api</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- Generates JOOQ classes based on Liquibase file -->
+      <plugin>
+        <groupId>org.jooq</groupId>
+        <artifactId>jooq-codegen-maven</artifactId>
+        <version>${jooq.version}</version>
+
+        <!-- The plugin should hook into the generate goal -->
+        <executions>
+          <execution>
+            <goals>
+              <goal>generate</goal>
+            </goals>
+          </execution>
+        </executions>
+
+        <dependencies>
+          <dependency>
+            <groupId>org.jooq</groupId>
+            <artifactId>jooq-meta-extensions</artifactId>
+            <version>${jooq.version}</version>
+          </dependency>
+        </dependencies>
+
+        <configuration>
+          <generator>
+            <database>
+              <name>org.jooq.meta.extensions.liquibase.LiquibaseDatabase</name>
+              <properties>
+                <property>
+                  <key>scripts</key>
+                  <!--
+                     Replace with <value>db/changelog/changelog.yaml</value>
+                     when JOOQ 3.13.2 is released, see https://github.com/jOOQ/jOOQ/issues/9866
+                  -->
+                  <value>${project.basedir}/src/main/resources/db/changelog/changes/initial_ddls.yaml</value>
+                </property>
+                <property>
+                  <key>includeLiquibaseTables</key>
+                  <value>false</value>
+                </property>
+              </properties>
+            </database>
+          </generator>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/QueryExecutorProvider.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/QueryExecutorProvider.java
new file mode 100644
index 0000000..ef99e3d
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/QueryExecutorProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms;
+
+import com.zaxxer.hikari.HikariDataSource;
+import org.jooq.DSLContext;
+import org.jooq.SQLDialect;
+import org.jooq.impl.DSL;
+import org.jooq.tools.jdbc.JDBCUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * Provides a SQL query executor configured based on the given data source and SQL dialect.
+ */
+public class QueryExecutorProvider implements AutoCloseable {
+
+  private static final Logger logger = LoggerFactory.getLogger(QueryExecutorProvider.class);
+
+  private final HikariDataSource dataSource;
+  private final SQLDialect dialect;
+
+  public QueryExecutorProvider(HikariDataSource dataSource) {
+    this.dataSource = dataSource;
+    this.dialect = defineDialect();
+  }
+
+  /**
+   * Provides a query executor which can be used to execute various SQL statements.
+   * The executor transforms programmatically created queries into the configured SQL dialect
+   * and executes them using connections from the provided data source. It also allows
+   * executing SQL queries in a transaction.
+   * Note: always close the executor to release open connections.
+   *
+   * @return query executor
+   */
+  public DSLContext executor() {
+    return DSL.using(dataSource, dialect);
+  }
+
+  @Override
+  public void close() {
+    dataSource.close();
+  }
+
+  /**
+   * Determines the SQL dialect based on the data source connection.
+   * If unable to determine the dialect, falls back to {@link SQLDialect#DEFAULT}.
+   *
+   * @return SQL dialect
+   */
+  private SQLDialect defineDialect() {
+    SQLDialect dialect = SQLDialect.DEFAULT;
+    try (Connection connection = dataSource.getConnection()) {
+      dialect = JDBCUtils.dialect(connection);
+    } catch (SQLException e) {
+      logger.debug("Unable to connect to data source in order to define SQL dialect: {}", e.getMessage(), e);
+      // ignore exception and fallback to default dialect
+    }
+    logger.info("RDBMS Metastore is configured to use {} SQL dialect", dialect);
+    return dialect;
+  }
+}
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/RdbmsMetastore.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/RdbmsMetastore.java
new file mode 100644
index 0000000..0d19a52
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/RdbmsMetastore.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms;
+
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import liquibase.Liquibase;
+import liquibase.database.Database;
+import liquibase.database.DatabaseFactory;
+import liquibase.database.jvm.JdbcConnection;
+import liquibase.resource.ClassLoaderResourceAccessor;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.Metastore;
+import org.apache.drill.metastore.components.tables.Tables;
+import org.apache.drill.metastore.components.views.Views;
+import org.apache.drill.metastore.rdbms.components.tables.RdbmsTables;
+import org.apache.drill.metastore.rdbms.config.RdbmsConfigConstants;
+import org.apache.drill.metastore.rdbms.exception.RdbmsMetastoreException;
+import org.apache.drill.metastore.rdbms.util.DbHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.util.Properties;
+
+/**
+ * RDBMS Drill Metastore implementation that creates the necessary tables using Liquibase
+ * and initializes the data source using the provided config.
+ */
+public class RdbmsMetastore implements Metastore {
+
+  private static final Logger logger = LoggerFactory.getLogger(RdbmsMetastore.class);
+
+  private static final String LIQUIBASE_CHANGELOG_FILE = "db/changelog/changelog.yaml";
+
+  private final QueryExecutorProvider executorProvider;
+
+  public RdbmsMetastore(DrillConfig config) {
+    HikariDataSource dataSource = dataSource(config);
+    this.executorProvider = new QueryExecutorProvider(dataSource);
+    initTables(dataSource);
+  }
+
+  @Override
+  public Tables tables() {
+    return new RdbmsTables(executorProvider);
+  }
+
+  @Override
+  public Views views() {
+    throw new UnsupportedOperationException("Views metadata support is not implemented");
+  }
+
+  @Override
+  public void close() {
+    executorProvider.close();
+  }
+
+  /**
+   * Prepares the database before initializing the data source based on its type,
+   * then initializes a {@link HikariDataSource} instance and configures it based on the given
+   * Metastore configuration.
+   * Basic parameters such as driver, url, user name and password are set using setters.
+   * Other data source parameters are set dynamically through the properties. See the list
+   * of available <a href="https://github.com/brettwooldridge/HikariCP">Hikari properties</a>.
+   *
+   * @param config Metastore config
+   * @return Hikari data source instance
+   * @throws RdbmsMetastoreException if unable to configure Hikari data source
+   */
+  private HikariDataSource dataSource(DrillConfig config) {
+    DbHelper.init(config).prepareDatabase();
+    try {
+      Properties properties = new Properties();
+      if (config.hasPath(RdbmsConfigConstants.DATA_SOURCE_PROPERTIES)) {
+        Config propertiesConfig = config.getConfig(RdbmsConfigConstants.DATA_SOURCE_PROPERTIES);
+        propertiesConfig.entrySet().forEach(e -> properties.put(e.getKey(), e.getValue().unwrapped()));
+      }
+      HikariConfig hikariConfig = new HikariConfig(properties);
+      hikariConfig.setDriverClassName(config.getString(RdbmsConfigConstants.DATA_SOURCE_DRIVER));
+      hikariConfig.setJdbcUrl(config.getString(RdbmsConfigConstants.DATA_SOURCE_URL));
+      if (config.hasPath(RdbmsConfigConstants.DATA_SOURCE_USER_NAME)) {
+        hikariConfig.setUsername(config.getString(RdbmsConfigConstants.DATA_SOURCE_USER_NAME));
+      }
+      if (config.hasPath(RdbmsConfigConstants.DATA_SOURCE_PASSWORD)) {
+        hikariConfig.setPassword(config.getString(RdbmsConfigConstants.DATA_SOURCE_PASSWORD));
+      }
+      return new HikariDataSource(hikariConfig);
+    } catch (RuntimeException e) {
+      throw new RdbmsMetastoreException("Unable to init RDBMS Metastore data source: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Initializes the RDBMS Metastore tables structure based on the {@link #LIQUIBASE_CHANGELOG_FILE} file.
+   * See the <a href="https://www.liquibase.org/documentation/core-concepts/index.html">Liquibase documentation</a> for more details.
+   *
+   * @param dataSource data source
+   */
+  private void initTables(DataSource dataSource) {
+    try (Connection connection = dataSource.getConnection()) {
+      JdbcConnection jdbcConnection = new JdbcConnection(connection);
+      Database database = DatabaseFactory.getInstance().findCorrectDatabaseImplementation(jdbcConnection);
+      ClassLoaderResourceAccessor resourceAccessor = new ClassLoaderResourceAccessor();
+      try (Liquibase liquibase = new Liquibase(LIQUIBASE_CHANGELOG_FILE, resourceAccessor, database)) {
+        liquibase.update("");
+      }
+    } catch (Exception e) {
+      throw new RdbmsMetastoreException("Unable to init Metastore tables using Liquibase: " + e.getMessage(), e);
+    }
+  }
+}
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/RdbmsMetastoreContext.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/RdbmsMetastoreContext.java
new file mode 100644
index 0000000..b250aa9
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/RdbmsMetastoreContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms;
+
+import org.apache.drill.metastore.rdbms.transform.Transformer;
+
+/**
+ * Provides RDBMS Metastore component tools to transform, read or write data from / into RDBMS tables.
+ *
+ * @param <T> Metastore component unit metadata type
+ */
+public interface RdbmsMetastoreContext<T> {
+
+  /**
+   * Returns an executor provider which allows executing SQL queries
+   * using the configured data source.
+   *
+   * @return executor provider instance
+   */
+  QueryExecutorProvider executorProvider();
+
+  /**
+   * Returns a transformer which performs various transformations
+   * of data, filters and operations.
+   *
+   * @return transformer instance
+   */
+  Transformer<T> transformer();
+}
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/components/tables/RdbmsTables.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/components/tables/RdbmsTables.java
new file mode 100644
index 0000000..320178b
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/components/tables/RdbmsTables.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.components.tables;
+
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.components.tables.Tables;
+import org.apache.drill.metastore.components.tables.TablesMetadataTypeValidator;
+import org.apache.drill.metastore.operate.Metadata;
+import org.apache.drill.metastore.operate.Modify;
+import org.apache.drill.metastore.operate.Read;
+import org.apache.drill.metastore.rdbms.QueryExecutorProvider;
+import org.apache.drill.metastore.rdbms.RdbmsMetastoreContext;
+import org.apache.drill.metastore.rdbms.operate.RdbmsMetadata;
+import org.apache.drill.metastore.rdbms.operate.RdbmsModify;
+import org.apache.drill.metastore.rdbms.operate.RdbmsRead;
+import org.apache.drill.metastore.rdbms.transform.Transformer;
+
+/**
+ * Metastore Tables component which stores tables metadata in the corresponding RDBMS tables:
+ * TABLES, SEGMENTS, FILES, ROW_GROUPS, PARTITIONS.
+ * Provides methods to read and modify tables metadata.
+ */
+public class RdbmsTables implements Tables, RdbmsMetastoreContext<TableMetadataUnit> {
+
+  private final QueryExecutorProvider executorProvider;
+
+  public RdbmsTables(QueryExecutorProvider executorProvider) {
+    this.executorProvider = executorProvider;
+  }
+
+  public RdbmsMetastoreContext<TableMetadataUnit> context() {
+    return this;
+  }
+
+  @Override
+  public Metadata metadata() {
+    return new RdbmsMetadata();
+  }
+
+  @Override
+  public Read<TableMetadataUnit> read() {
+    return new RdbmsRead<>(TablesMetadataTypeValidator.INSTANCE, context());
+  }
+
+  @Override
+  public Modify<TableMetadataUnit> modify() {
+    return new RdbmsModify<>(TablesMetadataTypeValidator.INSTANCE, context());
+  }
+
+  @Override
+  public QueryExecutorProvider executorProvider() {
+    return executorProvider;
+  }
+
+  @Override
+  public Transformer<TableMetadataUnit> transformer() {
+    return TablesTransformer.get();
+  }
+}
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/components/tables/TablesMetadataMapper.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/components/tables/TablesMetadataMapper.java
new file mode 100644
index 0000000..9ecbf39
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/components/tables/TablesMetadataMapper.java
@@ -0,0 +1,606 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.components.tables;
+
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.rdbms.transform.AbstractMetadataMapper;
+import org.apache.drill.metastore.rdbms.transform.RdbmsFilterExpressionVisitor;
+import org.apache.drill.metastore.rdbms.util.ConverterUtil;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.jooq.Condition;
+import org.jooq.Field;
+import org.jooq.Record;
+import org.jooq.Table;
+import org.jooq.generated.Tables;
+import org.jooq.generated.tables.records.FilesRecord;
+import org.jooq.generated.tables.records.PartitionsRecord;
+import org.jooq.generated.tables.records.RowGroupsRecord;
+import org.jooq.generated.tables.records.SegmentsRecord;
+import org.jooq.generated.tables.records.TablesRecord;
+import org.jooq.impl.DSL;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract implementation of {@link AbstractMetadataMapper} for RDBMS Metastore tables component.
+ * Contains common code shared by the specific tables of the RDBMS Metastore tables component.
+ *
+ * @param <R> RDBMS table record type
+ */
+public abstract class TablesMetadataMapper<R extends Record> extends AbstractMetadataMapper<TableMetadataUnit, R> {
+
+  protected static final Function<TableMetadataUnit, List<String>> TABLE_PARTITION_KEY = unit ->
+    Arrays.asList(unit.storagePlugin(), unit.workspace(), unit.tableName());
+
+  protected static final Function<TableMetadataUnit, List<String>> COMPONENT_PARTITION_KEY = unit ->
+    Arrays.asList(unit.storagePlugin(), unit.workspace(), unit.tableName(), unit.metadataKey());
+
+  @Override
+  public TableMetadataUnit emptyUnit() {
+    return TableMetadataUnit.EMPTY_UNIT;
+  }
+
+  @Override
+  public List<Condition> toDeleteConditions(List<TableMetadataUnit> units) {
+    Set<List<String>> partitionValues = units.stream()
+      .collect(Collectors.groupingBy(partitionKey(), Collectors.toList()))
+      .keySet();
+
+    return partitionValues.stream()
+      .map(values -> DSL.and(toConditions(values)))
+      .collect(Collectors.toList());
+  }
+
+  /**
+   * @return function to determine the partition key for a specific table
+   */
+  protected abstract Function<TableMetadataUnit, List<String>> partitionKey();
+
+  /**
+   * Creates JOOQ conditions based on the given list of partition values.
+   * Matching is order-based.
+   *
+   * @param values partition values
+   * @return list of JOOQ conditions
+   */
+  protected abstract List<Condition> toConditions(List<String> values);
+
+  /**
+   * {@link TablesMetadataMapper} implementation for {@link Tables#TABLES} table.
+   */
+  public static class TableMapper extends TablesMetadataMapper<TablesRecord> {
+
+    private static final TableMapper INSTANCE = new TableMapper();
+
+    private static final Map<MetastoreColumn, Field<?>> COLUMNS_MAP = ImmutableMap.<MetastoreColumn, Field<?>>builder()
+      .put(MetastoreColumn.STORAGE_PLUGIN, Tables.TABLES.STORAGE_PLUGIN)
+      .put(MetastoreColumn.WORKSPACE, Tables.TABLES.WORKSPACE)
+      .put(MetastoreColumn.TABLE_NAME, Tables.TABLES.TABLE_NAME)
+      .put(MetastoreColumn.OWNER, Tables.TABLES.OWNER)
+      .put(MetastoreColumn.TABLE_TYPE, Tables.TABLES.TABLE_TYPE)
+      .put(MetastoreColumn.METADATA_KEY, Tables.TABLES.METADATA_KEY)
+      .put(MetastoreColumn.METADATA_TYPE, Tables.TABLES.METADATA_TYPE)
+      .put(MetastoreColumn.LOCATION, Tables.TABLES.LOCATION)
+      .put(MetastoreColumn.INTERESTING_COLUMNS, Tables.TABLES.INTERESTING_COLUMNS)
+      .put(MetastoreColumn.SCHEMA, Tables.TABLES.SCHEMA)
+      .put(MetastoreColumn.COLUMNS_STATISTICS, Tables.TABLES.COLUMN_STATISTICS)
+      .put(MetastoreColumn.METADATA_STATISTICS, Tables.TABLES.METADATA_STATISTICS)
+      .put(MetastoreColumn.PARTITION_KEYS, Tables.TABLES.PARTITION_KEYS)
+      .put(MetastoreColumn.LAST_MODIFIED_TIME, Tables.TABLES.LAST_MODIFIED_TIME)
+      .put(MetastoreColumn.ADDITIONAL_METADATA, Tables.TABLES.ADDITIONAL_METADATA)
+      .build();
+
+    private static final RdbmsFilterExpressionVisitor FILTER_VISITOR = new RdbmsFilterExpressionVisitor(COLUMNS_MAP);
+
+    public static TableMapper get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public Table<TablesRecord> table() {
+      return Tables.TABLES;
+    }
+
+    @Override
+    public TableMetadataUnit toUnit(Record record) {
+      TablesRecord tablesRecord = (TablesRecord) record;
+      return TableMetadataUnit.builder()
+        .storagePlugin(tablesRecord.getStoragePlugin())
+        .workspace(tablesRecord.getWorkspace())
+        .tableName(tablesRecord.getTableName())
+        .owner(tablesRecord.getOwner())
+        .tableType(tablesRecord.getTableType())
+        .metadataKey(tablesRecord.getMetadataKey())
+        .metadataType(tablesRecord.getMetadataType())
+        .location(tablesRecord.getLocation())
+        .interestingColumns(ConverterUtil.convertToListString(tablesRecord.getInterestingColumns()))
+        .schema(tablesRecord.getSchema())
+        .columnsStatistics(ConverterUtil.convertToMapStringString(tablesRecord.getColumnStatistics()))
+        .metadataStatistics(ConverterUtil.convertToListString(tablesRecord.getMetadataStatistics()))
+        .partitionKeys(ConverterUtil.convertToMapStringString(tablesRecord.getPartitionKeys()))
+        .lastModifiedTime(tablesRecord.getLastModifiedTime())
+        .additionalMetadata(tablesRecord.getAdditionalMetadata())
+        .build();
+    }
+
+    @Override
+    public TablesRecord toRecord(TableMetadataUnit unit) {
+      TablesRecord record = new TablesRecord();
+      record.setStoragePlugin(unit.storagePlugin());
+      record.setWorkspace(unit.workspace());
+      record.setTableName(unit.tableName());
+      record.setOwner(unit.owner());
+      record.setTableType(unit.tableType());
+      record.setMetadataKey(unit.metadataKey());
+      record.setMetadataType(unit.metadataType());
+      record.setLocation(unit.location());
+      record.setInterestingColumns(ConverterUtil.convertToString(unit.interestingColumns()));
+      record.setSchema(unit.schema());
+      record.setColumnStatistics(ConverterUtil.convertToString(unit.columnsStatistics()));
+      record.setMetadataStatistics(ConverterUtil.convertToString(unit.metadataStatistics()));
+      record.setPartitionKeys(ConverterUtil.convertToString(unit.partitionKeys()));
+      record.setLastModifiedTime(unit.lastModifiedTime());
+      record.setAdditionalMetadata(unit.additionalMetadata());
+      return record;
+    }
+
+    @Override
+    protected Map<MetastoreColumn, Field<?>> fieldMapper() {
+      return COLUMNS_MAP;
+    }
+
+    @Override
+    protected RdbmsFilterExpressionVisitor filterVisitor() {
+      return FILTER_VISITOR;
+    }
+
+    @Override
+    protected Function<TableMetadataUnit, List<String>> partitionKey() {
+      return TABLE_PARTITION_KEY;
+    }
+
+    @Override
+    protected List<Condition> toConditions(List<String> values) {
+      assert values.size() == 3;
+      return Arrays.asList(
+        Tables.TABLES.STORAGE_PLUGIN.eq(values.get(0)),
+        Tables.TABLES.WORKSPACE.eq(values.get(1)),
+        Tables.TABLES.TABLE_NAME.eq(values.get(2))
+      );
+    }
+  }
+
+  /**
+   * {@link TablesMetadataMapper} implementation for {@link Tables#SEGMENTS} table.
+   */
+  public static class SegmentMapper extends TablesMetadataMapper<SegmentsRecord> {
+
+    private static final SegmentMapper INSTANCE = new SegmentMapper();
+
+    private static final Map<MetastoreColumn, Field<?>> COLUMNS_MAP = ImmutableMap.<MetastoreColumn, Field<?>>builder()
+      .put(MetastoreColumn.STORAGE_PLUGIN, Tables.SEGMENTS.STORAGE_PLUGIN)
+      .put(MetastoreColumn.WORKSPACE, Tables.SEGMENTS.WORKSPACE)
+      .put(MetastoreColumn.TABLE_NAME, Tables.SEGMENTS.TABLE_NAME)
+      .put(MetastoreColumn.METADATA_KEY, Tables.SEGMENTS.METADATA_KEY)
+      .put(MetastoreColumn.METADATA_IDENTIFIER, Tables.SEGMENTS.METADATA_IDENTIFIER)
+      .put(MetastoreColumn.METADATA_TYPE, Tables.SEGMENTS.METADATA_TYPE)
+      .put(MetastoreColumn.LOCATION, Tables.SEGMENTS.LOCATION)
+      .put(MetastoreColumn.SCHEMA, Tables.SEGMENTS.SCHEMA)
+      .put(MetastoreColumn.COLUMNS_STATISTICS, Tables.SEGMENTS.COLUMN_STATISTICS)
+      .put(MetastoreColumn.METADATA_STATISTICS, Tables.SEGMENTS.METADATA_STATISTICS)
+      .put(MetastoreColumn.COLUMN, Tables.SEGMENTS.COLUMN)
+      .put(MetastoreColumn.LOCATIONS, Tables.SEGMENTS.LOCATIONS)
+      .put(MetastoreColumn.PARTITION_VALUES, Tables.SEGMENTS.PARTITION_VALUES)
+      .put(MetastoreColumn.PATH, Tables.SEGMENTS.PATH)
+      .put(MetastoreColumn.LAST_MODIFIED_TIME, Tables.SEGMENTS.LAST_MODIFIED_TIME)
+      .put(MetastoreColumn.ADDITIONAL_METADATA, Tables.SEGMENTS.ADDITIONAL_METADATA)
+      .build();
+
+    private static final RdbmsFilterExpressionVisitor FILTER_VISITOR = new RdbmsFilterExpressionVisitor(COLUMNS_MAP);
+
+    public static SegmentMapper get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public Table<SegmentsRecord> table() {
+      return Tables.SEGMENTS;
+    }
+
+    @Override
+    public TableMetadataUnit toUnit(Record record) {
+      SegmentsRecord segmentsRecord = (SegmentsRecord) record;
+      return TableMetadataUnit.builder()
+        .storagePlugin(segmentsRecord.getStoragePlugin())
+        .workspace(segmentsRecord.getWorkspace())
+        .tableName(segmentsRecord.getTableName())
+        .metadataKey(segmentsRecord.getMetadataKey())
+        .metadataIdentifier(segmentsRecord.getMetadataIdentifier())
+        .metadataType(segmentsRecord.getMetadataType())
+        .location(segmentsRecord.getLocation())
+        .schema(segmentsRecord.getSchema())
+        .columnsStatistics(ConverterUtil.convertToMapStringString(segmentsRecord.getColumnStatistics()))
+        .metadataStatistics(ConverterUtil.convertToListString(segmentsRecord.getMetadataStatistics()))
+        .column(segmentsRecord.getColumn())
+        .locations(ConverterUtil.convertToListString(segmentsRecord.getLocations()))
+        .partitionValues(ConverterUtil.convertToListString(segmentsRecord.getPartitionValues()))
+        .path(segmentsRecord.getPath())
+        .lastModifiedTime(segmentsRecord.getLastModifiedTime())
+        .additionalMetadata(segmentsRecord.getAdditionalMetadata())
+        .build();
+    }
+
+    @Override
+    public SegmentsRecord toRecord(TableMetadataUnit unit) {
+      SegmentsRecord record = new SegmentsRecord();
+      record.setStoragePlugin(unit.storagePlugin());
+      record.setWorkspace(unit.workspace());
+      record.setTableName(unit.tableName());
+      record.setMetadataKey(unit.metadataKey());
+      record.setMetadataIdentifier(unit.metadataIdentifier());
+      record.setMetadataType(unit.metadataType());
+      record.setLocation(unit.location());
+      record.setSchema(unit.schema());
+      record.setColumnStatistics(ConverterUtil.convertToString(unit.columnsStatistics()));
+      record.setMetadataStatistics(ConverterUtil.convertToString(unit.metadataStatistics()));
+      record.setColumn(unit.column());
+      record.setLocations(ConverterUtil.convertToString(unit.locations()));
+      record.setPartitionValues(ConverterUtil.convertToString(unit.partitionValues()));
+      record.setPath(unit.path());
+      record.setLastModifiedTime(unit.lastModifiedTime());
+      record.setAdditionalMetadata(unit.additionalMetadata());
+      return record;
+    }
+
+    @Override
+    protected Map<MetastoreColumn, Field<?>> fieldMapper() {
+      return COLUMNS_MAP;
+    }
+
+    @Override
+    protected RdbmsFilterExpressionVisitor filterVisitor() {
+      return FILTER_VISITOR;
+    }
+
+    @Override
+    protected Function<TableMetadataUnit, List<String>> partitionKey() {
+      return COMPONENT_PARTITION_KEY;
+    }
+
+    @Override
+    protected List<Condition> toConditions(List<String> values) {
+      assert values.size() == 4;
+      return Arrays.asList(Tables.SEGMENTS.STORAGE_PLUGIN.eq(values.get(0)),
+        Tables.SEGMENTS.WORKSPACE.eq(values.get(1)),
+        Tables.SEGMENTS.TABLE_NAME.eq(values.get(2)),
+        Tables.SEGMENTS.METADATA_KEY.eq(values.get(3)));
+    }
+  }
+
+  /**
+   * {@link TablesMetadataMapper} implementation for {@link Tables#FILES} table.
+   */
+  public static class FileMapper extends TablesMetadataMapper<FilesRecord> {
+
+    private static final FileMapper INSTANCE = new FileMapper();
+
+    private static final Map<MetastoreColumn, Field<?>> COLUMNS_MAP = ImmutableMap.<MetastoreColumn, Field<?>>builder()
+      .put(MetastoreColumn.STORAGE_PLUGIN, Tables.FILES.STORAGE_PLUGIN)
+      .put(MetastoreColumn.WORKSPACE, Tables.FILES.WORKSPACE)
+      .put(MetastoreColumn.TABLE_NAME, Tables.FILES.TABLE_NAME)
+      .put(MetastoreColumn.METADATA_KEY, Tables.FILES.METADATA_KEY)
+      .put(MetastoreColumn.METADATA_IDENTIFIER, Tables.FILES.METADATA_IDENTIFIER)
+      .put(MetastoreColumn.METADATA_TYPE, Tables.FILES.METADATA_TYPE)
+      .put(MetastoreColumn.LOCATION, Tables.FILES.LOCATION)
+      .put(MetastoreColumn.SCHEMA, Tables.FILES.SCHEMA)
+      .put(MetastoreColumn.COLUMNS_STATISTICS, Tables.FILES.COLUMN_STATISTICS)
+      .put(MetastoreColumn.METADATA_STATISTICS, Tables.FILES.METADATA_STATISTICS)
+      .put(MetastoreColumn.PATH, Tables.FILES.PATH)
+      .put(MetastoreColumn.LAST_MODIFIED_TIME, Tables.FILES.LAST_MODIFIED_TIME)
+      .put(MetastoreColumn.ADDITIONAL_METADATA, Tables.FILES.ADDITIONAL_METADATA)
+      .build();
+
+    private static final RdbmsFilterExpressionVisitor FILTER_VISITOR = new RdbmsFilterExpressionVisitor(COLUMNS_MAP);
+
+    public static FileMapper get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public Table<FilesRecord> table() {
+      return Tables.FILES;
+    }
+
+    @Override
+    public TableMetadataUnit toUnit(Record record) {
+      FilesRecord filesRecord = (FilesRecord) record;
+      return TableMetadataUnit.builder()
+        .storagePlugin(filesRecord.getStoragePlugin())
+        .workspace(filesRecord.getWorkspace())
+        .tableName(filesRecord.getTableName())
+        .metadataKey(filesRecord.getMetadataKey())
+        .metadataIdentifier(filesRecord.getMetadataIdentifier())
+        .metadataType(filesRecord.getMetadataType())
+        .location(filesRecord.getLocation())
+        .schema(filesRecord.getSchema())
+        .columnsStatistics(ConverterUtil.convertToMapStringString(filesRecord.getColumnStatistics()))
+        .metadataStatistics(ConverterUtil.convertToListString(filesRecord.getMetadataStatistics()))
+        .path(filesRecord.getPath())
+        .lastModifiedTime(filesRecord.getLastModifiedTime())
+        .additionalMetadata(filesRecord.getAdditionalMetadata())
+        .build();
+    }
+
+    @Override
+    public FilesRecord toRecord(TableMetadataUnit unit) {
+      FilesRecord record = new FilesRecord();
+      record.setStoragePlugin(unit.storagePlugin());
+      record.setWorkspace(unit.workspace());
+      record.setTableName(unit.tableName());
+      record.setMetadataKey(unit.metadataKey());
+      record.setMetadataIdentifier(unit.metadataIdentifier());
+      record.setMetadataType(unit.metadataType());
+      record.setLocation(unit.location());
+      record.setSchema(unit.schema());
+      record.setColumnStatistics(ConverterUtil.convertToString(unit.columnsStatistics()));
+      record.setMetadataStatistics(ConverterUtil.convertToString(unit.metadataStatistics()));
+      record.setPath(unit.path());
+      record.setLastModifiedTime(unit.lastModifiedTime());
+      record.setAdditionalMetadata(unit.additionalMetadata());
+      return record;
+    }
+
+    @Override
+    protected Map<MetastoreColumn, Field<?>> fieldMapper() {
+      return COLUMNS_MAP;
+    }
+
+    @Override
+    protected RdbmsFilterExpressionVisitor filterVisitor() {
+      return FILTER_VISITOR;
+    }
+
+    @Override
+    protected Function<TableMetadataUnit, List<String>> partitionKey() {
+      return COMPONENT_PARTITION_KEY;
+    }
+
+    @Override
+    protected List<Condition> toConditions(List<String> values) {
+      assert values.size() == 4;
+      return Arrays.asList(
+        Tables.FILES.STORAGE_PLUGIN.eq(values.get(0)),
+        Tables.FILES.WORKSPACE.eq(values.get(1)),
+        Tables.FILES.TABLE_NAME.eq(values.get(2)),
+        Tables.FILES.METADATA_KEY.eq(values.get(3)));
+    }
+  }
+
+  /**
+   * {@link TablesMetadataMapper} implementation for {@link Tables#ROW_GROUPS} table.
+   */
+  public static class RowGroupMapper extends TablesMetadataMapper<RowGroupsRecord> {
+
+    private static final RowGroupMapper INSTANCE = new RowGroupMapper();
+
+    private static final Map<MetastoreColumn, Field<?>> COLUMNS_MAP = ImmutableMap.<MetastoreColumn, Field<?>>builder()
+      .put(MetastoreColumn.STORAGE_PLUGIN, Tables.ROW_GROUPS.STORAGE_PLUGIN)
+      .put(MetastoreColumn.WORKSPACE, Tables.ROW_GROUPS.WORKSPACE)
+      .put(MetastoreColumn.TABLE_NAME, Tables.ROW_GROUPS.TABLE_NAME)
+      .put(MetastoreColumn.METADATA_KEY, Tables.ROW_GROUPS.METADATA_KEY)
+      .put(MetastoreColumn.METADATA_IDENTIFIER, Tables.ROW_GROUPS.METADATA_IDENTIFIER)
+      .put(MetastoreColumn.METADATA_TYPE, Tables.ROW_GROUPS.METADATA_TYPE)
+      .put(MetastoreColumn.LOCATION, Tables.ROW_GROUPS.LOCATION)
+      .put(MetastoreColumn.SCHEMA, Tables.ROW_GROUPS.SCHEMA)
+      .put(MetastoreColumn.COLUMNS_STATISTICS, Tables.ROW_GROUPS.COLUMN_STATISTICS)
+      .put(MetastoreColumn.METADATA_STATISTICS, Tables.ROW_GROUPS.METADATA_STATISTICS)
+      .put(MetastoreColumn.PATH, Tables.ROW_GROUPS.PATH)
+      .put(MetastoreColumn.ROW_GROUP_INDEX, Tables.ROW_GROUPS.ROW_GROUP_INDEX)
+      .put(MetastoreColumn.HOST_AFFINITY, Tables.ROW_GROUPS.HOST_AFFINITY)
+      .put(MetastoreColumn.LAST_MODIFIED_TIME, Tables.ROW_GROUPS.LAST_MODIFIED_TIME)
+      .put(MetastoreColumn.ADDITIONAL_METADATA, Tables.ROW_GROUPS.ADDITIONAL_METADATA)
+      .build();
+
+    private static final RdbmsFilterExpressionVisitor FILTER_VISITOR = new RdbmsFilterExpressionVisitor(COLUMNS_MAP);
+
+    public static RowGroupMapper get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public Table<RowGroupsRecord> table() {
+      return Tables.ROW_GROUPS;
+    }
+
+    @Override
+    public TableMetadataUnit toUnit(Record record) {
+      RowGroupsRecord rowGroupsRecord = (RowGroupsRecord) record;
+      return TableMetadataUnit.builder()
+        .storagePlugin(rowGroupsRecord.getStoragePlugin())
+        .workspace(rowGroupsRecord.getWorkspace())
+        .tableName(rowGroupsRecord.getTableName())
+        .metadataKey(rowGroupsRecord.getMetadataKey())
+        .metadataIdentifier(rowGroupsRecord.getMetadataIdentifier())
+        .metadataType(rowGroupsRecord.getMetadataType())
+        .location(rowGroupsRecord.getLocation())
+        .schema(rowGroupsRecord.getSchema())
+        .columnsStatistics(ConverterUtil.convertToMapStringString(rowGroupsRecord.getColumnStatistics()))
+        .metadataStatistics(ConverterUtil.convertToListString(rowGroupsRecord.getMetadataStatistics()))
+        .path(rowGroupsRecord.getPath())
+        .rowGroupIndex(rowGroupsRecord.getRowGroupIndex())
+        .hostAffinity(ConverterUtil.convertToMapStringFloat(rowGroupsRecord.getHostAffinity()))
+        .lastModifiedTime(rowGroupsRecord.getLastModifiedTime())
+        .additionalMetadata(rowGroupsRecord.getAdditionalMetadata())
+        .build();
+    }
+
+    @Override
+    public RowGroupsRecord toRecord(TableMetadataUnit unit) {
+      RowGroupsRecord record = new RowGroupsRecord();
+      record.setStoragePlugin(unit.storagePlugin());
+      record.setWorkspace(unit.workspace());
+      record.setTableName(unit.tableName());
+      record.setMetadataKey(unit.metadataKey());
+      record.setMetadataIdentifier(unit.metadataIdentifier());
+      record.setMetadataType(unit.metadataType());
+      record.setLocation(unit.location());
+      record.setSchema(unit.schema());
+      record.setColumnStatistics(ConverterUtil.convertToString(unit.columnsStatistics()));
+      record.setMetadataStatistics(ConverterUtil.convertToString(unit.metadataStatistics()));
+      record.setPath(unit.path());
+      record.setRowGroupIndex(unit.rowGroupIndex());
+      record.setHostAffinity(ConverterUtil.convertToString(unit.hostAffinity()));
+      record.setLastModifiedTime(unit.lastModifiedTime());
+      record.setAdditionalMetadata(unit.additionalMetadata());
+      return record;
+    }
+
+    @Override
+    protected Map<MetastoreColumn, Field<?>> fieldMapper() {
+      return COLUMNS_MAP;
+    }
+
+    @Override
+    protected RdbmsFilterExpressionVisitor filterVisitor() {
+      return FILTER_VISITOR;
+    }
+
+    @Override
+    protected Function<TableMetadataUnit, List<String>> partitionKey() {
+      return COMPONENT_PARTITION_KEY;
+    }
+
+    @Override
+    protected List<Condition> toConditions(List<String> values) {
+      assert values.size() == 4;
+      return Arrays.asList(
+        Tables.ROW_GROUPS.STORAGE_PLUGIN.eq(values.get(0)),
+        Tables.ROW_GROUPS.WORKSPACE.eq(values.get(1)),
+        Tables.ROW_GROUPS.TABLE_NAME.eq(values.get(2)),
+        Tables.ROW_GROUPS.METADATA_KEY.eq(values.get(3)));
+    }
+  }
+
+  /**
+   * {@link TablesMetadataMapper} implementation for {@link Tables#PARTITIONS} table.
+   */
+  public static class PartitionMapper extends TablesMetadataMapper<PartitionsRecord> {
+
+    private static final PartitionMapper INSTANCE = new PartitionMapper();
+
+    private static final Map<MetastoreColumn, Field<?>> COLUMNS_MAP = ImmutableMap.<MetastoreColumn, Field<?>>builder()
+      .put(MetastoreColumn.STORAGE_PLUGIN, Tables.PARTITIONS.STORAGE_PLUGIN)
+      .put(MetastoreColumn.WORKSPACE, Tables.PARTITIONS.WORKSPACE)
+      .put(MetastoreColumn.TABLE_NAME, Tables.PARTITIONS.TABLE_NAME)
+      .put(MetastoreColumn.METADATA_KEY, Tables.PARTITIONS.METADATA_KEY)
+      .put(MetastoreColumn.METADATA_IDENTIFIER, Tables.PARTITIONS.METADATA_IDENTIFIER)
+      .put(MetastoreColumn.METADATA_TYPE, Tables.PARTITIONS.METADATA_TYPE)
+      .put(MetastoreColumn.SCHEMA, Tables.PARTITIONS.SCHEMA)
+      .put(MetastoreColumn.COLUMNS_STATISTICS, Tables.PARTITIONS.COLUMN_STATISTICS)
+      .put(MetastoreColumn.METADATA_STATISTICS, Tables.PARTITIONS.METADATA_STATISTICS)
+      .put(MetastoreColumn.COLUMN, Tables.PARTITIONS.COLUMN)
+      .put(MetastoreColumn.LOCATIONS, Tables.PARTITIONS.LOCATIONS)
+      .put(MetastoreColumn.PARTITION_VALUES, Tables.PARTITIONS.PARTITION_VALUES)
+      .put(MetastoreColumn.LAST_MODIFIED_TIME, Tables.PARTITIONS.LAST_MODIFIED_TIME)
+      .put(MetastoreColumn.ADDITIONAL_METADATA, Tables.PARTITIONS.ADDITIONAL_METADATA)
+      .build();
+
+    private static final RdbmsFilterExpressionVisitor FILTER_VISITOR = new RdbmsFilterExpressionVisitor(COLUMNS_MAP);
+
+    public static PartitionMapper get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public Table<PartitionsRecord> table() {
+      return Tables.PARTITIONS;
+    }
+
+    @Override
+    public TableMetadataUnit toUnit(Record record) {
+      PartitionsRecord partitionsRecord = (PartitionsRecord) record;
+      return TableMetadataUnit.builder()
+        .storagePlugin(partitionsRecord.getStoragePlugin())
+        .workspace(partitionsRecord.getWorkspace())
+        .tableName(partitionsRecord.getTableName())
+        .metadataKey(partitionsRecord.getMetadataKey())
+        .metadataIdentifier(partitionsRecord.getMetadataIdentifier())
+        .metadataType(partitionsRecord.getMetadataType())
+        .schema(partitionsRecord.getSchema())
+        .columnsStatistics(ConverterUtil.convertToMapStringString(partitionsRecord.getColumnStatistics()))
+        .metadataStatistics(ConverterUtil.convertToListString(partitionsRecord.getMetadataStatistics()))
+        .column(partitionsRecord.getColumn())
+        .locations(ConverterUtil.convertToListString(partitionsRecord.getLocations()))
+        .partitionValues(ConverterUtil.convertToListString(partitionsRecord.getPartitionValues()))
+        .lastModifiedTime(partitionsRecord.getLastModifiedTime())
+        .additionalMetadata(partitionsRecord.getAdditionalMetadata())
+        .build();
+    }
+
+    @Override
+    public PartitionsRecord toRecord(TableMetadataUnit unit) {
+      PartitionsRecord record = new PartitionsRecord();
+      record.setStoragePlugin(unit.storagePlugin());
+      record.setWorkspace(unit.workspace());
+      record.setTableName(unit.tableName());
+      record.setMetadataKey(unit.metadataKey());
+      record.setMetadataIdentifier(unit.metadataIdentifier());
+      record.setMetadataType(unit.metadataType());
+      record.setSchema(unit.schema());
+      record.setColumnStatistics(ConverterUtil.convertToString(unit.columnsStatistics()));
+      record.setMetadataStatistics(ConverterUtil.convertToString(unit.metadataStatistics()));
+      record.setColumn(unit.column());
+      record.setLocations(ConverterUtil.convertToString(unit.locations()));
+      record.setPartitionValues(ConverterUtil.convertToString(unit.partitionValues()));
+      record.setLastModifiedTime(unit.lastModifiedTime());
+      record.setAdditionalMetadata(unit.additionalMetadata());
+      return record;
+    }
+
+    @Override
+    protected Map<MetastoreColumn, Field<?>> fieldMapper() {
+      return COLUMNS_MAP;
+    }
+
+    @Override
+    protected RdbmsFilterExpressionVisitor filterVisitor() {
+      return FILTER_VISITOR;
+    }
+
+    @Override
+    protected Function<TableMetadataUnit, List<String>> partitionKey() {
+      return COMPONENT_PARTITION_KEY;
+    }
+
+    @Override
+    protected List<Condition> toConditions(List<String> values) {
+      assert values.size() == 4;
+      return Arrays.asList(
+        Tables.PARTITIONS.STORAGE_PLUGIN.eq(values.get(0)),
+        Tables.PARTITIONS.WORKSPACE.eq(values.get(1)),
+        Tables.PARTITIONS.TABLE_NAME.eq(values.get(2)),
+        Tables.PARTITIONS.METADATA_KEY.eq(values.get(3)));
+    }
+  }
+}
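
Note on the mappers above: toRecord/toUnit are symmetric converters between
TableMetadataUnit and the generated jOOQ records. A minimal round-trip sketch
(the field values are illustrative only):

  TableMetadataUnit unit = TableMetadataUnit.builder()
    .storagePlugin("dfs")          // hypothetical values
    .workspace("tmp")
    .tableName("nation")
    .metadataType("TABLE")
    .build();

  // unit -> jOOQ record -> unit; list/map fields pass through ConverterUtil
  TablesRecord record = TablesMetadataMapper.TableMapper.get().toRecord(unit);
  TableMetadataUnit restored = TablesMetadataMapper.TableMapper.get().toUnit(record);
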
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/components/tables/TablesTransformer.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/components/tables/TablesTransformer.java
new file mode 100644
index 0000000..314a8cd
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/components/tables/TablesTransformer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.components.tables;
+
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.rdbms.exception.RdbmsMetastoreException;
+import org.apache.drill.metastore.rdbms.operate.RdbmsOperation;
+import org.apache.drill.metastore.rdbms.transform.AbstractTransformer;
+import org.apache.drill.metastore.rdbms.transform.MetadataMapper;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.jooq.Record;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Transformer implementation for RDBMS Metastore tables component.
+ */
+public class TablesTransformer extends AbstractTransformer<TableMetadataUnit> {
+
+  private static final TablesTransformer INSTANCE = new TablesTransformer();
+
+  private static final Map<MetadataType, MetadataMapper<TableMetadataUnit, ? extends Record>> MAPPERS = ImmutableMap.of(
+    MetadataType.TABLE, TablesMetadataMapper.TableMapper.get(),
+    MetadataType.SEGMENT, TablesMetadataMapper.SegmentMapper.get(),
+    MetadataType.FILE, TablesMetadataMapper.FileMapper.get(),
+    MetadataType.ROW_GROUP, TablesMetadataMapper.RowGroupMapper.get(),
+    MetadataType.PARTITION, TablesMetadataMapper.PartitionMapper.get()
+  );
+
+  public static TablesTransformer get() {
+    return INSTANCE;
+  }
+
+  @Override
+  public Set<MetadataMapper<TableMetadataUnit, ? extends Record>> toMappers(Set<MetadataType> metadataTypes) {
+    if (metadataTypes.contains(MetadataType.ALL)) {
+      return Sets.newHashSet(MAPPERS.values());
+    }
+
+    return metadataTypes.stream()
+      .map(this::toMapper)
+      .collect(Collectors.toSet());
+  }
+
+  @Override
+  public MetadataMapper<TableMetadataUnit, ? extends Record> toMapper(MetadataType metadataType) {
+    MetadataMapper<TableMetadataUnit, ? extends Record> mapper = MAPPERS.get(metadataType);
+    if (mapper == null) {
+      throw new RdbmsMetastoreException("Metadata mapper is absent for type: " + metadataType);
+    }
+    return mapper;
+  }
+
+  @Override
+  public List<RdbmsOperation.Overwrite> toOverwrite(List<TableMetadataUnit> units) {
+    Map<String, List<TableMetadataUnit>> unitsByTables = units.stream()
+      .collect(Collectors.groupingBy(TableMetadataUnit::metadataType));
+
+    return unitsByTables.entrySet().stream()
+      .map(entry -> toOverwrite(entry.getKey(), entry.getValue()))
+      .collect(Collectors.toList());
+  }
+}
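
Note: toOverwrite groups units by their metadata type string, so mixed input
produces one Overwrite operation per target table. A minimal sketch (the three
units are hypothetical, each with the corresponding metadataType set):

  List<TableMetadataUnit> units = Arrays.asList(tableUnit, segmentUnit, fileUnit);
  List<RdbmsOperation.Overwrite> overwrites = TablesTransformer.get().toOverwrite(units);
  // -> three operations, targeting TABLES, SEGMENTS and FILES respectively
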
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/config/RdbmsConfigConstants.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/config/RdbmsConfigConstants.java
new file mode 100644
index 0000000..74c34e6
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/config/RdbmsConfigConstants.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.config;
+
+import org.apache.drill.metastore.config.MetastoreConfigConstants;
+
+/**
+ * Constants for Drill RDBMS Metastore configuration, which is defined
+ * in the {@link MetastoreConfigConstants#MODULE_RESOURCE_FILE_NAME} file.
+ */
+public interface RdbmsConfigConstants {
+
+  /**
+   * Drill RDBMS Metastore configuration properties namespace.
+   */
+  String BASE = MetastoreConfigConstants.BASE + "rdbms.";
+
+  /**
+   * RDBMS Metastore data source namespace.
+   */
+  String DATA_SOURCE_NAMESPACE = BASE + "data_source.";
+
+  /**
+   * RDBMS Metastore data source driver property. Required.
+   */
+  String DATA_SOURCE_DRIVER = DATA_SOURCE_NAMESPACE + "driver";
+
+  /**
+   * RDBMS Metastore data source url property. Required.
+   */
+  String DATA_SOURCE_URL = DATA_SOURCE_NAMESPACE + "url";
+
+  /**
+   * RDBMS Metastore data source username property. Optional.
+   */
+  String DATA_SOURCE_USER_NAME = DATA_SOURCE_NAMESPACE + "username";
+
+  /**
+   * RDBMS Metastore data source password property. Optional.
+   */
+  String DATA_SOURCE_PASSWORD = DATA_SOURCE_NAMESPACE + "password";
+
+  /**
+   * RDBMS Metastore data source properties. Optional.
+   * Can be set based on HikariCP properties, see <a href="https://github.com/brettwooldridge/HikariCP">HikariCP</a>.
+   */
+  String DATA_SOURCE_PROPERTIES = DATA_SOURCE_NAMESPACE + "properties";
+
+  /**
+   * RDBMS Metastore database namespace.
+   */
+  String DATABASE_NAMESPACE = BASE + "database.";
+
+  /**
+   * RDBMS Metastore SQLite database namespace.
+   */
+  String SQLITE_NAMESPACE = DATABASE_NAMESPACE + "sqlite.";
+
+  /**
+   * RDBMS Metastore SQLite database path namespace.
+   */
+  String SQLITE_PATH_NAMESPACE = SQLITE_NAMESPACE + "path.";
+
+  /**
+   * RDBMS Metastore SQLite database path value.
+   */
+  String SQLITE_PATH_VALUE = SQLITE_PATH_NAMESPACE + "value";
+
+  /**
+   * Flag which indicates whether the RDBMS Metastore SQLite database path
+   * should be created prior to data source initialization.
+   * Can be set to {@code false} if the path already exists.
+   */
+  String SQLITE_PATH_CREATE = SQLITE_PATH_NAMESPACE + "create";
+}
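
Note: assuming MetastoreConfigConstants.BASE resolves to "drill.metastore.",
the constants above correspond to a HOCON block like the following (driver,
url and credentials are placeholders; "properties" accepts HikariCP settings):

  drill.metastore.rdbms: {
    data_source: {
      driver: "org.postgresql.Driver",
      url: "jdbc:postgresql://localhost:5432/drill_metastore",
      username: "drill",
      password: "drill",
      properties: {
        maximumPoolSize: 5
      }
    }
  }
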
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/exception/RdbmsMetastoreException.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/exception/RdbmsMetastoreException.java
new file mode 100644
index 0000000..74e44f9
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/exception/RdbmsMetastoreException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.exception;
+
+import org.apache.drill.metastore.exceptions.MetastoreException;
+
+/**
+ * Specific RDBMS Drill Metastore runtime exception to indicate errors thrown
+ * during RDBMS Drill Metastore code execution.
+ */
+public class RdbmsMetastoreException extends MetastoreException {
+
+  public RdbmsMetastoreException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public RdbmsMetastoreException(String message) {
+    super(message);
+  }
+}
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/operate/RdbmsMetadata.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/operate/RdbmsMetadata.java
new file mode 100644
index 0000000..59b82f9
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/operate/RdbmsMetadata.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.operate;
+
+import org.apache.drill.metastore.operate.Metadata;
+
+/**
+ * Implementation of {@link Metadata} interface.
+ * Indicates that RDBMS Metastore does not support versioning.
+ */
+public class RdbmsMetadata implements Metadata {
+
+  @Override
+  public boolean supportsVersioning() {
+    return false;
+  }
+}
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/operate/RdbmsModify.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/operate/RdbmsModify.java
new file mode 100644
index 0000000..81dccb3
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/operate/RdbmsModify.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.operate;
+
+import org.apache.drill.metastore.operate.AbstractModify;
+import org.apache.drill.metastore.operate.Delete;
+import org.apache.drill.metastore.operate.MetadataTypeValidator;
+import org.apache.drill.metastore.operate.Modify;
+import org.apache.drill.metastore.rdbms.RdbmsMetastoreContext;
+import org.apache.drill.metastore.rdbms.exception.RdbmsMetastoreException;
+import org.apache.drill.metastore.rdbms.transform.Transformer;
+import org.jooq.DSLContext;
+import org.jooq.impl.DSL;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of {@link Modify} interface based on {@link AbstractModify} parent class.
+ * Modifies information in RDBMS tables based on given overwrite or delete operations.
+ * Executes given operations in one transaction.
+ *
+ * @param <T> Metastore component unit type
+ */
+public class RdbmsModify<T> extends AbstractModify<T> {
+
+  private final RdbmsMetastoreContext<T> context;
+  private final Transformer<T> transformer;
+  private final List<RdbmsOperation> operations = new ArrayList<>();
+
+  public RdbmsModify(MetadataTypeValidator metadataTypeValidator, RdbmsMetastoreContext<T> context) {
+    super(metadataTypeValidator);
+    this.context = context;
+    this.transformer = context.transformer();
+  }
+
+  @Override
+  public void execute() {
+    executeOperations(operations);
+  }
+
+  @Override
+  public void purge() {
+    // not using truncate since we want to rollback if something goes wrong
+    List<RdbmsOperation> deletes = new ArrayList<>(transformer.toDeleteAll());
+    executeOperations(deletes);
+  }
+
+  /**
+   * Executes list of provided RDBMS operations in one transaction.
+   *
+   * @param operations list of RDBMS operations
+   */
+  private void executeOperations(List<RdbmsOperation> operations) {
+    try (DSLContext executor = context.executorProvider().executor()) {
+      executor.transaction(configuration -> {
+        DSLContext transactionalExecutor = DSL.using(configuration);
+        operations.forEach(operation -> operation.execute(transactionalExecutor));
+      });
+    } catch (RuntimeException e) {
+      throw new RdbmsMetastoreException("Error during Metastore modify operation execution: " + e.getMessage(), e);
+    }
+  }
+
+  @Override
+  protected void addOverwrite(List<T> units) {
+    operations.addAll(transformer.toOverwrite(units));
+  }
+
+  @Override
+  protected void addDelete(Delete delete) {
+    operations.addAll(transformer.toDelete(delete));
+  }
+}
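
Note: RdbmsModify only accumulates operations; no SQL runs until execute().
A minimal usage sketch, assuming the validator, context, units and delete
holder come from the surrounding Metastore component and that the Modify
interface exposes overwrite(...) and delete(...) mutators:

  RdbmsModify<TableMetadataUnit> modify = new RdbmsModify<>(validator, context);
  modify.overwrite(units);   // queued as RdbmsOperation.Overwrite
  modify.delete(delete);     // queued as RdbmsOperation.Delete
  modify.execute();          // all queued operations run in one transaction
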
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/operate/RdbmsOperation.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/operate/RdbmsOperation.java
new file mode 100644
index 0000000..cab8f3e
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/operate/RdbmsOperation.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.operate;
+
+import org.jooq.Condition;
+import org.jooq.DSLContext;
+import org.jooq.Record;
+import org.jooq.Table;
+import org.jooq.impl.DSL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * An RDBMS operation's main goal is to execute SQL code using the provided query executor.
+ */
+public interface RdbmsOperation {
+
+  void execute(DSLContext executor);
+
+  /**
+   * Executes overwrite operation steps for the given table.
+   * First deletes data based on the given delete conditions, then inserts new table records.
+   */
+  class Overwrite implements RdbmsOperation {
+
+    private static final Logger logger = LoggerFactory.getLogger(Overwrite.class);
+
+    private final Table<? extends Record> table;
+    private final List<Condition> deleteConditions;
+    private final List<? extends Record> records;
+
+    public Overwrite(Table<? extends Record> table,
+                     List<Condition> deleteConditions,
+                     List<? extends Record> records) {
+      this.table = table;
+      this.deleteConditions = deleteConditions;
+      this.records = records;
+    }
+
+    public Table<? extends Record> table() {
+      return table;
+    }
+
+    public List<Condition> deleteConditions() {
+      return deleteConditions;
+    }
+
+    public List<? extends Record> records() {
+      return records;
+    }
+
+    @Override
+    public void execute(DSLContext executor) {
+      deleteConditions.forEach(condition -> {
+        logger.debug("Deleting data from RDBMS Metastore table {} during overwrite using condition: {}", table, condition);
+        executor.deleteFrom(table)
+          .where(condition)
+          .execute();
+
+      });
+
+      records.forEach(record -> {
+        logger.debug("Inserting data into RDBMS Metastore table {}:\n{}", table, record);
+        executor.insertInto(table)
+          .set(record)
+          .execute();
+      });
+    }
+  }
+
+  /**
+   * Executes delete operation steps for the given table.
+   * Deletes data based on given delete condition.
+   */
+  class Delete implements RdbmsOperation {
+
+    private static final Logger logger = LoggerFactory.getLogger(Delete.class);
+
+    private final Table<? extends Record> table;
+    private final Condition condition;
+
+    public Delete(Table<? extends Record> table) {
+      this(table, DSL.trueCondition());
+    }
+
+    public Delete(Table<? extends Record> table, Condition condition) {
+      this.table = table;
+      this.condition = condition;
+    }
+
+    public Table<? extends Record> table() {
+      return table;
+    }
+
+    public Condition condition() {
+      return condition;
+    }
+
+    @Override
+    public void execute(DSLContext executor) {
+      logger.debug("Deleting data from RDBMS Metastore table {} using condition: {}", table, condition);
+      executor.deleteFrom(table)
+        .where(condition)
+        .execute();
+    }
+  }
+}
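
Note: an Overwrite issues one DELETE per delete condition followed by one
INSERT per record, all through the transactional DSLContext passed to
execute(). A minimal sketch (record and transactionalExecutor are assumed):

  RdbmsOperation.Overwrite overwrite = new RdbmsOperation.Overwrite(
    Tables.TABLES,
    Collections.singletonList(Tables.TABLES.TABLE_NAME.eq("nation")),
    Collections.singletonList(record));
  overwrite.execute(transactionalExecutor);
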
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/operate/RdbmsRead.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/operate/RdbmsRead.java
new file mode 100644
index 0000000..821b719
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/operate/RdbmsRead.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.operate;
+
+import org.apache.drill.metastore.operate.AbstractRead;
+import org.apache.drill.metastore.operate.MetadataTypeValidator;
+import org.apache.drill.metastore.operate.Read;
+import org.apache.drill.metastore.rdbms.RdbmsMetastoreContext;
+import org.apache.drill.metastore.rdbms.exception.RdbmsMetastoreException;
+import org.apache.drill.metastore.rdbms.transform.MetadataMapper;
+import org.jooq.Condition;
+import org.jooq.DSLContext;
+import org.jooq.Field;
+import org.jooq.Record;
+import org.jooq.Result;
+import org.jooq.exception.DataAccessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Implementation of {@link Read} interface based on {@link AbstractRead} parent class.
+ * Reads information from RDBMS tables based on given filter expression.
+ * Supports reading information for specific columns.
+ *
+ * @param <T> Metastore component unit type
+ */
+public class RdbmsRead<T> extends AbstractRead<T> {
+
+  private static final Logger logger = LoggerFactory.getLogger(RdbmsRead.class);
+
+  private final RdbmsMetastoreContext<T> context;
+
+  public RdbmsRead(MetadataTypeValidator metadataTypeValidator, RdbmsMetastoreContext<T> context) {
+    super(metadataTypeValidator);
+    this.context = context;
+  }
+
+  @Override
+  protected List<T> internalExecute() {
+    Set<MetadataMapper<T, ? extends Record>> mappers = context.transformer().toMappers(metadataTypes);
+
+    if (mappers.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    List<T> units = new ArrayList<>();
+    try (DSLContext executor = context.executorProvider().executor()) {
+      for (MetadataMapper<T, ? extends Record> mapper : mappers) {
+        Condition condition = mapper.toCondition(filter);
+        logger.debug("Query data from RDBMS Metastore table {} using condition: {}", mapper.table(), condition);
+
+        List<Field<?>> fields = columns.isEmpty()
+          ? Arrays.asList(mapper.table().fields())
+          : mapper.toFields(columns);
+
+        try {
+          if (fields.isEmpty()) {
+            units.addAll(countRecords(executor, mapper, condition));
+          } else {
+            units.addAll(queryRecords(executor, mapper, fields, condition));
+          }
+        } catch (DataAccessException e) {
+          throw new RdbmsMetastoreException("Error when reading data from Metastore: " + e.getMessage(), e);
+        }
+      }
+      return units;
+    }
+  }
+
+  /**
+   * Counts the number of records which qualify the given condition and
+   * returns the same number of empty Metastore component units.
+   * Used when none of the requested columns match fields in the table.
+   *
+   * @param executor query executor
+   * @param mapper table mapper
+   * @param condition query condition
+   * @return list of Metastore component units
+   * @throws DataAccessException if unable to query data from table
+   */
+  private List<T> countRecords(DSLContext executor,
+                               MetadataMapper<T, ? extends Record> mapper,
+                               Condition condition) throws DataAccessException {
+    Integer recordsNumber = executor
+      .selectCount()
+      .from(mapper.table())
+      .where(condition)
+      .fetchOne()
+      .value1();
+
+    return recordsNumber == null || recordsNumber == 0
+      ? Collections.emptyList()
+      : IntStream.range(0, recordsNumber)
+         .mapToObj(i -> mapper.emptyUnit())
+         .collect(Collectors.toList());
+  }
+
+  /**
+   * Programmatically constructs a query to the given table based on the filter condition
+   * and requested fields, and converts the results into a list of Metastore component units.
+   *
+   * @param executor query executor
+   * @param mapper table mapper
+   * @param fields list of requested fields
+   * @param condition query condition
+   * @return list of Metastore component units
+   * @throws DataAccessException if unable to query data from table
+   */
+  private List<T> queryRecords(DSLContext executor,
+                               MetadataMapper<T, ? extends Record> mapper,
+                               List<Field<?>> fields,
+                               Condition condition) throws DataAccessException {
+    Result<? extends Record> records = executor
+      .select(fields)
+      .from(mapper.table())
+      .where(condition)
+      .fetchInto(mapper.table());
+
+    return records.stream()
+      .map(mapper::toUnit)
+      .collect(Collectors.toList());
+  }
+}
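
Note: if a column projection is given, only the matching table fields are
queried; if none match, RdbmsRead falls back to countRecords() and returns
empty units. A minimal sketch, assuming validator and context are available
and that the Read interface exposes metadataType/filter/columns mutators:

  List<TableMetadataUnit> units = new RdbmsRead<TableMetadataUnit>(validator, context)
    .metadataType(MetadataType.TABLE)
    .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
    .columns(MetastoreColumn.TABLE_NAME, MetastoreColumn.LAST_MODIFIED_TIME)
    .execute();
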
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/AbstractMetadataMapper.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/AbstractMetadataMapper.java
new file mode 100644
index 0000000..fe11a4d
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/AbstractMetadataMapper.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.transform;
+
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.jooq.Condition;
+import org.jooq.Field;
+import org.jooq.Record;
+import org.jooq.impl.DSL;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract implementation of {@link MetadataMapper} interface which contains
+ * common code for all Metastore component metadata and RDBMS table types.
+ *
+ * @param <U> Metastore component metadata type
+ * @param <R> RDBMS table record type
+ */
+public abstract class AbstractMetadataMapper<U, R extends Record> implements MetadataMapper<U, R> {
+
+  @Override
+  public List<Field<?>> toFields(List<MetastoreColumn> columns) {
+    return columns.stream()
+      .map(column -> fieldMapper().get(column))
+      // ignore absent fields
+      .filter(Objects::nonNull)
+      .collect(Collectors.toList());
+  }
+
+  @Override
+  public Condition toCondition(FilterExpression filter) {
+    return filter == null ? DSL.noCondition() : filter.accept(filterVisitor());
+  }
+
+  /**
+   * @return mapper-specific field mapper
+   */
+  protected abstract Map<MetastoreColumn, Field<?>> fieldMapper();
+
+  /**
+   * @return mapper specific filter visitor
+   */
+  protected abstract RdbmsFilterExpressionVisitor filterVisitor();
+}
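
Note: toFields() silently drops requested columns that have no counterpart in
the target table (see the "ignore absent fields" filter above). For example,
with the FILES mapper defined earlier in this patch:

  List<Field<?>> fields = TablesMetadataMapper.FileMapper.get()
    .toFields(Arrays.asList(MetastoreColumn.PATH, MetastoreColumn.HOST_AFFINITY));
  // -> only FILES.PATH; HOST_AFFINITY has no column in the FILES table
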
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/AbstractTransformer.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/AbstractTransformer.java
new file mode 100644
index 0000000..8152d8e
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/AbstractTransformer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.transform;
+
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.rdbms.exception.RdbmsMetastoreException;
+import org.apache.drill.metastore.rdbms.operate.RdbmsOperation;
+import org.jooq.Condition;
+import org.jooq.Record;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract implementation of {@link Transformer} interface which contains
+ * common code for all Metastore component metadata types.
+ *
+ * @param <T> Metastore component metadata type
+ */
+public abstract class AbstractTransformer<T> implements Transformer<T> {
+
+  @Override
+  public List<RdbmsOperation.Delete> toDelete(org.apache.drill.metastore.operate.Delete delete) {
+    Set<MetadataMapper<T, ? extends Record>> mappers = toMappers(delete.metadataTypes());
+    return mappers.stream()
+      .map(mapper -> new RdbmsOperation.Delete(mapper.table(), mapper.toCondition(delete.filter())))
+      .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<RdbmsOperation.Delete> toDeleteAll() {
+    Set<MetadataMapper<T, ? extends Record>> mappers = toMappers(Collections.singleton(MetadataType.ALL));
+    return mappers.stream()
+      .map(MetadataMapper::table)
+      .map(RdbmsOperation.Delete::new)
+      .collect(Collectors.toList());
+  }
+
+  protected RdbmsOperation.Overwrite toOverwrite(String metadataTypeString, List<T> units) {
+    MetadataType metadataType = MetadataType.fromValue(metadataTypeString);
+    if (metadataType == null) {
+      throw new RdbmsMetastoreException("Metadata type must be specified during insert / update");
+    } else {
+      MetadataMapper<T, ? extends Record> mapper = toMapper(metadataType);
+
+      List<Condition> deleteConditions = mapper.toDeleteConditions(units);
+
+      List<? extends Record> records = units.stream()
+        .map(mapper::toRecord)
+        .collect(Collectors.toList());
+
+      return new RdbmsOperation.Overwrite(mapper.table(), deleteConditions, records);
+    }
+  }
+}
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/MetadataMapper.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/MetadataMapper.java
new file mode 100644
index 0000000..15f6cdd
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/MetadataMapper.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.transform;
+
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.jooq.Condition;
+import org.jooq.Field;
+import org.jooq.Record;
+import org.jooq.Table;
+
+import java.util.List;
+
+/**
+ * Provides various mapping and transformation methods for the given
+ * RDBMS table and Metastore component metadata unit.
+ *
+ * @param <U> Metastore component metadata type
+ * @param <R> RDBMS table record type
+ */
+public interface MetadataMapper<U, R extends Record> {
+
+  /**
+   * @return RDBMS table instance
+   */
+  Table<R> table();
+
+  /**
+   * @return Metastore component metadata unit instance with all fields set to null
+   */
+  U emptyUnit();
+
+  /**
+   * Converts RDBMS table record into Metastore component metadata unit.
+   *
+   * @param record RDBMS table record
+   * @return Metastore component metadata unit instance
+   */
+  U toUnit(Record record);
+
+  /**
+   * Converts Metastore component metadata unit into RDBMS table record.
+   *
+   * @param unit Metastore component metadata unit
+   * @return RDBMS table record instance
+   */
+  R toRecord(U unit);
+
+  /**
+   * Matches given list of Metastore columns to the available
+   * RDBMS table columns.
+   *
+   * @param columns list of Metastore columns
+   * @return list of RDBMS table fields
+   */
+  List<Field<?>> toFields(List<MetastoreColumn> columns);
+
+  /**
+   * Converts a Metastore filter expression into a JOOQ condition instance
+   * which will be used as the where clause in a SQL query.
+   *
+   * @param filter filter expression
+   * @return JOOQ condition instance
+   */
+  Condition toCondition(FilterExpression filter);
+
+  /**
+   * Since data in Metastore is deleted by partition, extracts
+   * partition values from the given list of Metastore component metadata units
+   * and creates a list of delete conditions based on them.
+   *
+   * @param units list of Metastore component metadata units
+   * @return list of JOOQ condition instances
+   */
+  List<Condition> toDeleteConditions(List<U> units);
+}
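
Note: toDeleteConditions presumably yields one AND-ed condition per distinct
partition key; for the TABLES mapper that key is (storagePlugin, workspace,
tableName), matching its toConditions implementation. A sketch (units assumed):

  List<Condition> conditions =
    TablesMetadataMapper.TableMapper.get().toDeleteConditions(units);
  // e.g. STORAGE_PLUGIN = 'dfs' AND WORKSPACE = 'tmp' AND TABLE_NAME = 'nation'
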
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/RdbmsFilterExpressionVisitor.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/RdbmsFilterExpressionVisitor.java
new file mode 100644
index 0000000..39eb72a
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/RdbmsFilterExpressionVisitor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.transform;
+
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.expressions.DoubleExpressionPredicate;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.expressions.IsPredicate;
+import org.apache.drill.metastore.expressions.ListPredicate;
+import org.apache.drill.metastore.expressions.SimplePredicate;
+import org.apache.drill.metastore.expressions.SingleExpressionPredicate;
+import org.jooq.Condition;
+import org.jooq.Field;
+import org.jooq.impl.DSL;
+
+import java.util.Map;
+
+/**
+ * Visits {@link FilterExpression} implementations and transforms them into JOOQ {@link Condition}.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class RdbmsFilterExpressionVisitor implements FilterExpression.Visitor<Condition> {
+  private final Map<MetastoreColumn, Field<?>> fields;
+
+  public RdbmsFilterExpressionVisitor(Map<MetastoreColumn, Field<?>> fields) {
+    this.fields = fields;
+  }
+
+  @Override
+  public Condition visit(SimplePredicate.Equal<?> expression) {
+    Field field = fields.get(expression.column());
+    return field == null ? DSL.falseCondition() : field.eq(expression.value());
+  }
+
+  @Override
+  public Condition visit(SimplePredicate.NotEqual<?> expression) {
+    Field field = fields.get(expression.column());
+    return field == null ? DSL.trueCondition() : field.notEqual(expression.value());
+  }
+
+  @Override
+  public Condition visit(SimplePredicate.LessThan<?> expression) {
+    Field field = fields.get(expression.column());
+    return field == null ? DSL.falseCondition() : field.lessThan(expression.value());
+  }
+
+  @Override
+  public Condition visit(SimplePredicate.LessThanOrEqual<?> expression) {
+    Field field = fields.get(expression.column());
+    return field == null ? DSL.falseCondition() : field.lessOrEqual(expression.value());
+  }
+
+  @Override
+  public Condition visit(SimplePredicate.GreaterThan<?> expression) {
+    Field field = fields.get(expression.column());
+    return field == null ? DSL.falseCondition() : field.greaterThan(expression.value());
+  }
+
+  @Override
+  public Condition visit(SimplePredicate.GreaterThanOrEqual<?> expression) {
+    Field field = fields.get(expression.column());
+    return field == null ? DSL.falseCondition() : field.greaterOrEqual(expression.value());
+  }
+
+  @Override
+  public Condition visit(ListPredicate.In<?> expression) {
+    Field field = fields.get(expression.column());
+    return field == null ? DSL.falseCondition() : field.in(expression.values());
+  }
+
+  @Override
+  public Condition visit(ListPredicate.NotIn<?> expression) {
+    Field field = fields.get(expression.column());
+    return field == null ? DSL.trueCondition() : field.notIn(expression.values());
+  }
+
+  @Override
+  public Condition visit(IsPredicate.IsNull expression) {
+    Field field = fields.get(expression.column());
+    return field == null ? DSL.trueCondition() : field.isNull();
+  }
+
+  @Override
+  public Condition visit(IsPredicate.IsNotNull expression) {
+    Field field = fields.get(expression.column());
+    return field == null ? DSL.falseCondition() : field.isNotNull();
+  }
+
+  @Override
+  public Condition visit(SingleExpressionPredicate.Not expression) {
+    return DSL.not(expression.expression().accept(this));
+  }
+
+  @Override
+  public Condition visit(DoubleExpressionPredicate.And expression) {
+    Condition left = expression.left().accept(this);
+    Condition right = expression.right().accept(this);
+    return DSL.and(left, right);
+  }
+
+  @Override
+  public Condition visit(DoubleExpressionPredicate.Or expression) {
+    Condition left = expression.left().accept(this);
+    Condition right = expression.right().accept(this);
+    return DSL.or(left, right);
+  }
+}
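
Note: unknown columns degrade to constant true/false conditions (chosen so the
predicate semantics stay safe); everything else maps onto jOOQ fields. A
minimal sketch, assuming FilterExpression exposes the usual static factories
and that a column map like the ones defined in the mappers is accessible:

  FilterExpression filter = FilterExpression.and(
    FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"),
    FilterExpression.greaterThan(MetastoreColumn.LAST_MODIFIED_TIME, 0L));

  Condition condition = filter.accept(new RdbmsFilterExpressionVisitor(columnsMap));
  // -> STORAGE_PLUGIN = 'dfs' AND LAST_MODIFIED_TIME > 0
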
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/Transformer.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/Transformer.java
new file mode 100644
index 0000000..c25d1b7
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/transform/Transformer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.transform;
+
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.operate.Delete;
+import org.apache.drill.metastore.rdbms.operate.RdbmsOperation;
+import org.jooq.Record;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Provides various methods for transforming RDBMS Metastore data, filters and operations.
+ *
+ * @param <T> Metastore component metadata type
+ */
+public interface Transformer<T> {
+
+  /**
+   * Returns set of metadata mappers corresponding to the given metadata types.
+   *
+   * @param metadataTypes set of metadata types
+   * @return set of metadata mappers
+   */
+  Set<MetadataMapper<T, ? extends Record>> toMappers(Set<MetadataType> metadataTypes);
+
+  /**
+   * Returns the metadata mapper corresponding to the given metadata type.
+   *
+   * @param metadataType metadata type
+   * @return metadata mapper
+   */
+  MetadataMapper<T, ? extends Record> toMapper(MetadataType metadataType);
+
+  /**
+   * Converts given list of Metastore component metadata units into
+   * RDBMS Metastore overwrite operations.
+   *
+   * @param units Metastore metadata units
+   * @return list of RDBMS Metastore overwrite operations
+   */
+  List<RdbmsOperation.Overwrite> toOverwrite(List<T> units);
+
+  /**
+   * Converts Metastore delete operation holder into list of
+   * RDBMS Metastore delete operations.
+   *
+   * @param delete Metastore delete operation holder
+   * @return list of RDBMS Metastore delete operations
+   */
+  List<RdbmsOperation.Delete> toDelete(Delete delete);
+
+  /**
+   * Creates list of RDBMS Metastore delete operations which will
+   * delete all data from corresponding Metastore component tables.
+   *
+   * @return list of RDBMS Metastore delete operations
+   */
+  List<RdbmsOperation.Delete> toDeleteAll();
+}
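Review note: a usage sketch of this interface under names that appear in the unit tests later in this patch (TablesTransformer, RdbmsOperation, Delete); treat it as an illustration of the contract, not a definitive API reference. Overwrites are grouped by target table, one operation per affected table, and a Delete holder with MetadataType.ALL fans out to one delete per component table.

import org.apache.drill.metastore.MetastoreColumn;
import org.apache.drill.metastore.components.tables.TableMetadataUnit;
import org.apache.drill.metastore.expressions.FilterExpression;
import org.apache.drill.metastore.metadata.MetadataType;
import org.apache.drill.metastore.operate.Delete;
import org.apache.drill.metastore.rdbms.components.tables.TablesTransformer;
import org.apache.drill.metastore.rdbms.operate.RdbmsOperation;
import org.apache.drill.metastore.rdbms.transform.Transformer;

import java.util.List;

public class TransformerUsageSketch {

  // Converts metadata units into per-table overwrite operations.
  public static List<RdbmsOperation.Overwrite> overwrites(List<TableMetadataUnit> units) {
    Transformer<TableMetadataUnit> transformer = TablesTransformer.get();
    return transformer.toOverwrite(units);
  }

  // MetadataType.ALL fans out to one delete per component table.
  public static List<RdbmsOperation.Delete> deletes() {
    Delete delete = Delete.builder()
      .metadataType(MetadataType.ALL)
      .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
      .build();
    return TablesTransformer.get().toDelete(delete);
  }
}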
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/util/ConverterUtil.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/util/ConverterUtil.java
new file mode 100644
index 0000000..e1f2ae7
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/util/ConverterUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.metastore.rdbms.exception.RdbmsMetastoreException;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility class which converts Metastore metadata objects to and from string values.
+ */
+public class ConverterUtil {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private static final TypeReference<List<String>> LIST_STRING_TYPE_REF = new TypeReference<List<String>>() {
+  };
+  private static final TypeReference<Map<String, String>> MAP_STRING_STRING_TYPE_REF = new TypeReference<Map<String, String>>() {
+  };
+  private static final TypeReference<Map<String, Float>> MAP_STRING_FLOAT_TYPE_REF = new TypeReference<Map<String, Float>>() {
+  };
+
+  public static <T> String convertToString(T value) {
+    try {
+      return MAPPER.writeValueAsString(value);
+    } catch (JsonProcessingException e) {
+      throw new RdbmsMetastoreException("Unable to convert value to String: " + value);
+    }
+  }
+
+  public static <T> T convertTo(String value, TypeReference<T> typeReference) {
+    if (value == null) {
+      return null;
+    }
+    try {
+      return MAPPER.readValue(value, typeReference);
+    } catch (IOException e) {
+      throw new RdbmsMetastoreException(String.format("Unable to convert to %s value: %s",
+        typeReference.getType().getTypeName(), value), e);
+    }
+  }
+
+  public static List<String> convertToListString(String value) {
+    return convertTo(value, LIST_STRING_TYPE_REF);
+  }
+
+  public static Map<String, String> convertToMapStringString(String value) {
+    return convertTo(value, MAP_STRING_STRING_TYPE_REF);
+  }
+
+  public static Map<String, Float> convertToMapStringFloat(String value) {
+    return convertTo(value, MAP_STRING_FLOAT_TYPE_REF);
+  }
+}
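Review note: a quick round trip through the helpers above, consistent with the behavior exercised in TestConverterUtil below. Collection-valued metadata is serialized to JSON for storage in CLOB columns and read back into typed collections:

import org.apache.drill.metastore.rdbms.util.ConverterUtil;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;

import java.util.Map;

public class ConverterRoundTripSketch {

  public static void main(String[] args) {
    // Map -> JSON string, suitable for a CLOB column.
    Map<String, String> stats = ImmutableMap.of("rowCount", "25");
    String json = ConverterUtil.convertToString(stats); // {"rowCount":"25"}

    // JSON string -> Map, restoring the original value.
    Map<String, String> restored = ConverterUtil.convertToMapStringString(json);
    assert stats.equals(restored);
  }
}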
diff --git a/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/util/DbHelper.java b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/util/DbHelper.java
new file mode 100644
index 0000000..6e9ea70
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/java/org/apache/drill/metastore/rdbms/util/DbHelper.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.util;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.config.MetastoreConfigConstants;
+import org.apache.drill.metastore.rdbms.config.RdbmsConfigConstants;
+import org.apache.drill.metastore.rdbms.exception.RdbmsMetastoreException;
+import org.jooq.SQLDialect;
+import org.jooq.tools.jdbc.JDBCUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Provides methods to configure the database prior to data source initialization.
+ */
+public interface DbHelper {
+
+  /**
+   * Initializes {@link DbHelper} implementation based on {@link SQLDialect}.
+   *
+   * @param config Metastore config
+   * @return DbHelper instance
+   */
+  static DbHelper init(DrillConfig config) {
+    SQLDialect dialect = JDBCUtils.dialect(config.getString(RdbmsConfigConstants.DATA_SOURCE_URL));
+    switch (dialect) {
+      case SQLITE:
+        return new SQLiteHelper(config);
+      default:
+        return NoOpHelper.get();
+    }
+  }
+
+  /**
+   * Prepares database prior to data source configuration.
+   */
+  void prepareDatabase();
+
+  /**
+   * No-op implementation of {@link DbHelper} for those databases that do not require
+   * any preparation before data source creation.
+   */
+  class NoOpHelper implements DbHelper {
+
+    private static final NoOpHelper INSTANCE = new NoOpHelper();
+
+    public static NoOpHelper get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void prepareDatabase() {
+      // do nothing
+    }
+  }
+
+  /**
+   * SQLite implementation of {@link DbHelper} which creates the database path if needed.
+   */
+  class SQLiteHelper implements DbHelper {
+
+    private static final Logger logger = LoggerFactory.getLogger(SQLiteHelper.class);
+
+    private final DrillConfig config;
+
+    public SQLiteHelper(DrillConfig config) {
+      this.config = config;
+    }
+
+    /**
+     * SQLite requires the database path to exist prior to database initialization.
+     * If the path creation flag is set to true and the path value is set, attempts
+     * to create the database path recursively.
+     */
+    @Override
+    public void prepareDatabase() {
+      if (config.hasPath(RdbmsConfigConstants.SQLITE_PATH_CREATE)
+        && config.hasPath(RdbmsConfigConstants.SQLITE_PATH_VALUE)
+        && config.getBoolean(RdbmsConfigConstants.SQLITE_PATH_CREATE)) {
+
+        String path = config.getString(RdbmsConfigConstants.SQLITE_PATH_VALUE);
+        try {
+          Path dbPath = Files.createDirectories(Paths.get(path));
+          logger.info("Configured SQLite database path: {}", dbPath);
+        } catch (IOException e) {
+          throw new RdbmsMetastoreException(String.format("Unable to create SQLite database path [%s]: %s. " +
+              "Optionally, path can be created manually and [%s] set to false in %s.", path, e.getMessage(),
+            RdbmsConfigConstants.SQLITE_PATH_CREATE, MetastoreConfigConstants.OVERRIDE_RESOURCE_FILE_NAME), e);
+        }
+      }
+    }
+  }
+}
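Review note: the intended call sequence, as a hedged sketch. DbHelper.init picks the dialect-specific helper from the configured JDBC URL, and prepareDatabase is a no-op for every dialect except SQLite:

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.metastore.rdbms.util.DbHelper;

public class DbHelperUsageSketch {

  // Called before the data source is created; for a jdbc:sqlite: URL this creates
  // the database directory, for other dialects it does nothing.
  public static void prepareDatabase(DrillConfig config) {
    DbHelper.init(config).prepareDatabase();
  }
}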
diff --git a/metastore/rdbms-metastore/src/main/resources/db/changelog/changelog.yaml b/metastore/rdbms-metastore/src/main/resources/db/changelog/changelog.yaml
new file mode 100644
index 0000000..0e010fd
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/resources/db/changelog/changelog.yaml
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+databaseChangeLog:
+  - include:
+      file: db/changelog/changes/initial_ddls.yaml
diff --git a/metastore/rdbms-metastore/src/main/resources/db/changelog/changes/initial_ddls.yaml b/metastore/rdbms-metastore/src/main/resources/db/changelog/changes/initial_ddls.yaml
new file mode 100644
index 0000000..4e04c39
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/resources/db/changelog/changes/initial_ddls.yaml
@@ -0,0 +1,398 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Creates 5 RDBMS Metastore component tables: TABLES, SEGMENTS, FILES, ROW_GROUPS, PARTITIONS.
+# Note: the RDBMS Drill Metastore schema is slightly denormalized; see the explanation in README.md (section Tables structure).
+databaseChangeLog:
+    # to preserve upper case naming in some DBs (ex: PostgreSQL) and quote reserved column names (ex: COLUMN)
+  - objectQuotingStrategy: QUOTE_ALL_OBJECTS
+  - changeSet:
+      id: 1
+      author: arina
+      changes:
+        - createTable:
+            tableName: TABLES
+            columns:
+              - column:
+                  name: STORAGE_PLUGIN
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: WORKSPACE
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: TABLE_NAME
+                  type: VARCHAR(500)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: OWNER
+                  type: VARCHAR(100)
+              - column:
+                  name: TABLE_TYPE
+                  type: VARCHAR(100)
+              - column:
+                  name: METADATA_KEY
+                  type: VARCHAR(100)
+                  constraints:
+                    nullable: false
+              - column:
+                  name: METADATA_TYPE
+                  type: VARCHAR(100)
+                  constraints:
+                    nullable: false
+              - column:
+                  name: LOCATION
+                  type: VARCHAR(500)
+              - column:
+                  name: INTERESTING_COLUMNS
+                  type: CLOB
+              - column:
+                  name: SCHEMA
+                  type: CLOB
+              - column:
+                  name: COLUMN_STATISTICS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: METADATA_STATISTICS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: PARTITION_KEYS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: LAST_MODIFIED_TIME
+                  type: BIGINT
+                  constraints:
+                    nullable: false
+              - column:
+                  name: ADDITIONAL_METADATA
+                  type: CLOB
+
+        - createTable:
+            tableName: SEGMENTS
+            columns:
+              - column:
+                  name: STORAGE_PLUGIN
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: WORKSPACE
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: TABLE_NAME
+                  type: VARCHAR(500)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: METADATA_KEY
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: METADATA_IDENTIFIER
+                  type: VARCHAR(500)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: METADATA_TYPE
+                  type: VARCHAR(100)
+                  constraints:
+                    nullable: false
+              - column:
+                  name: LOCATION
+                  type: VARCHAR(500)
+                  constraints:
+                    nullable: false
+              - column:
+                  name: SCHEMA
+                  type: CLOB
+              - column:
+                  name: COLUMN_STATISTICS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: METADATA_STATISTICS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: COLUMN
+                  type: VARCHAR(100)
+              - column:
+                  name: LOCATIONS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: PARTITION_VALUES
+                  type: CLOB
+              - column:
+                  name: PATH
+                  type: VARCHAR(500)
+              - column:
+                  name: LAST_MODIFIED_TIME
+                  type: BIGINT
+                  constraints:
+                    nullable: false
+              - column:
+                  name: ADDITIONAL_METADATA
+                  type: CLOB
+
+        - createTable:
+            tableName: FILES
+            columns:
+              - column:
+                  name: STORAGE_PLUGIN
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: WORKSPACE
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: TABLE_NAME
+                  type: VARCHAR(500)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: METADATA_KEY
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: METADATA_IDENTIFIER
+                  type: VARCHAR(500)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: METADATA_TYPE
+                  type: VARCHAR(100)
+                  constraints:
+                    nullable: false
+              - column:
+                  name: LOCATION
+                  type: VARCHAR(500)
+                  constraints:
+                    nullable: false
+              - column:
+                  name: SCHEMA
+                  type: CLOB
+              - column:
+                  name: COLUMN_STATISTICS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: METADATA_STATISTICS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: PATH
+                  type: VARCHAR(500)
+                  constraints:
+                    nullable: false
+              - column:
+                  name: LAST_MODIFIED_TIME
+                  type: BIGINT
+                  constraints:
+                    nullable: false
+              - column:
+                  name: ADDITIONAL_METADATA
+                  type: CLOB
+
+        - createTable:
+            tableName: ROW_GROUPS
+            columns:
+              - column:
+                  name: STORAGE_PLUGIN
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: WORKSPACE
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: TABLE_NAME
+                  type: VARCHAR(500)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: METADATA_KEY
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: METADATA_IDENTIFIER
+                  type: VARCHAR(500)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: METADATA_TYPE
+                  type: VARCHAR(100)
+                  constraints:
+                    nullable: false
+              - column:
+                  name: LOCATION
+                  type: VARCHAR(500)
+                  constraints:
+                    nullable: false
+              - column:
+                  name: SCHEMA
+                  type: CLOB
+              - column:
+                  name: COLUMN_STATISTICS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: METADATA_STATISTICS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: PATH
+                  type: VARCHAR(500)
+                  constraints:
+                    nullable: false
+              - column:
+                  name: ROW_GROUP_INDEX
+                  type: INT
+                  constraints:
+                    nullable: false
+              - column:
+                  name: HOST_AFFINITY
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: LAST_MODIFIED_TIME
+                  type: BIGINT
+                  constraints:
+                    nullable: false
+              - column:
+                  name: ADDITIONAL_METADATA
+                  type: CLOB
+
+        - createTable:
+            tableName: PARTITIONS
+            columns:
+              - column:
+                  name: STORAGE_PLUGIN
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: WORKSPACE
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: TABLE_NAME
+                  type: VARCHAR(500)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: METADATA_KEY
+                  type: VARCHAR(100)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: METADATA_IDENTIFIER
+                  type: VARCHAR(500)
+                  constraints:
+                    primaryKey: true
+                    nullable: false
+              - column:
+                  name: METADATA_TYPE
+                  type: VARCHAR(100)
+                  constraints:
+                    nullable: false
+              - column:
+                  name: SCHEMA
+                  type: CLOB
+              - column:
+                  name: COLUMN_STATISTICS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: METADATA_STATISTICS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: COLUMN
+                  type: VARCHAR(100)
+                  constraints:
+                    nullable: false
+              - column:
+                  name: LOCATIONS
+                  type: CLOB
+                  constraints:
+                    nullable: false
+              - column:
+                  name: PARTITION_VALUES
+                  type: CLOB
+              - column:
+                  name: LAST_MODIFIED_TIME
+                  type: BIGINT
+                  constraints:
+                    nullable: false
+              - column:
+                  name: ADDITIONAL_METADATA
+                  type: CLOB
diff --git a/metastore/rdbms-metastore/src/main/resources/drill-metastore-module.conf b/metastore/rdbms-metastore/src/main/resources/drill-metastore-module.conf
new file mode 100644
index 0000000..ab1ebcd
--- /dev/null
+++ b/metastore/rdbms-metastore/src/main/resources/drill-metastore-module.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+drill.metastore.rdbms: {
+  # Data source properties are used to specify connection details for the Drill Metastore database
+  data_source: {
+    # By default, an embedded file-based SQLite database is used
+    # Note: it can be used only in Drill embedded mode
+    driver: "org.sqlite.JDBC",
+    url: "jdbc:sqlite:"${drill.metastore.rdbms.database.sqlite.path.value}"/sqlite-drill-metastore.db",
+    # username: "",
+    # password: "",
+    properties: {
+      # List of Hikari properties: https://github.com/brettwooldridge/HikariCP
+      # maxIdle : 8
+    }
+  }
+
+  # Properties specific to database implementation
+  database: {
+    sqlite: {
+      path: {
+        # ${user.home} is the equivalent of System.getProperty("user.home")
+        # ${drill.exec.zk.root} value will be substituted from Drill main config
+        value: ${user.home}"/"${drill.exec.zk.root}"/metastore",
+        # The SQLite database path must exist; Drill will attempt to create the path before data source initialization.
+        # The flag can be set to false to skip the creation process if the path already exists.
+        create: true
+      }
+    }
+  }
+}
diff --git a/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/RdbmsBaseTest.java b/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/RdbmsBaseTest.java
new file mode 100644
index 0000000..0bd3a6c
--- /dev/null
+++ b/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/RdbmsBaseTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms;
+
+import org.apache.drill.categories.MetastoreTest;
+import org.apache.drill.test.BaseTest;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+@Category(MetastoreTest.class)
+public class RdbmsBaseTest extends BaseTest {
+
+  @ClassRule
+  public static TemporaryFolder defaultFolder = new TemporaryFolder();
+}
diff --git a/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/components/tables/TestRdbmsBasicTablesRequests.java b/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/components/tables/TestRdbmsBasicTablesRequests.java
new file mode 100644
index 0000000..5944714
--- /dev/null
+++ b/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/components/tables/TestRdbmsBasicTablesRequests.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.components.tables;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.components.tables.MetastoreTableInfo;
+import org.apache.drill.metastore.components.tables.AbstractBasicTablesRequestsTest;
+import org.apache.drill.metastore.operate.Metadata;
+import org.apache.drill.metastore.rdbms.RdbmsMetastore;
+import org.apache.drill.metastore.rdbms.config.RdbmsConfigConstants;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestRdbmsBasicTablesRequests extends AbstractBasicTablesRequestsTest {
+
+  @BeforeClass
+  public static void init() {
+    Config config = DrillConfig.create()
+      .withValue(RdbmsConfigConstants.DATA_SOURCE_DRIVER, ConfigValueFactory.fromAnyRef("org.sqlite.JDBC"))
+      .withValue(RdbmsConfigConstants.DATA_SOURCE_URL, ConfigValueFactory.fromAnyRef("jdbc:sqlite::memory:"));
+
+    innerInit(config, RdbmsMetastore.class);
+  }
+
+  @Test
+  public void testMetastoreTableInfoExistingTable() {
+    MetastoreTableInfo metastoreTableInfo = basicRequests.metastoreTableInfo(nationTableInfo);
+    assertTrue(metastoreTableInfo.isExists());
+    assertEquals(nationTableInfo, metastoreTableInfo.tableInfo());
+    assertEquals(nationTable.lastModifiedTime(), metastoreTableInfo.lastModifiedTime());
+    assertEquals(Metadata.UNDEFINED, metastoreTableInfo.metastoreVersion());
+  }
+}
diff --git a/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/components/tables/TestRdbmsTablesMetastore.java b/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/components/tables/TestRdbmsTablesMetastore.java
new file mode 100644
index 0000000..1e7894a
--- /dev/null
+++ b/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/components/tables/TestRdbmsTablesMetastore.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.components.tables;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.components.tables.AbstractTablesMetastoreTest;
+import org.apache.drill.metastore.rdbms.RdbmsMetastore;
+import org.apache.drill.metastore.rdbms.config.RdbmsConfigConstants;
+import org.junit.BeforeClass;
+
+public class TestRdbmsTablesMetastore extends AbstractTablesMetastoreTest {
+
+  @BeforeClass
+  public static void init() {
+    Config config = DrillConfig.create()
+      .withValue(RdbmsConfigConstants.DATA_SOURCE_DRIVER, ConfigValueFactory.fromAnyRef("org.sqlite.JDBC"))
+      .withValue(RdbmsConfigConstants.DATA_SOURCE_URL, ConfigValueFactory.fromAnyRef("jdbc:sqlite::memory:"));
+
+    innerInit(config, RdbmsMetastore.class);
+  }
+}
diff --git a/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/components/tables/TestTablesMetadataMapper.java b/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/components/tables/TestTablesMetadataMapper.java
new file mode 100644
index 0000000..56e24fb
--- /dev/null
+++ b/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/components/tables/TestTablesMetadataMapper.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.components.tables;
+
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.TestData;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.rdbms.RdbmsBaseTest;
+import org.jooq.Condition;
+import org.jooq.Field;
+import org.jooq.generated.Tables;
+import org.jooq.impl.DSL;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class TestTablesMetadataMapper extends RdbmsBaseTest {
+
+  @Test
+  public void testTable() {
+    assertEquals(Tables.TABLES, TablesMetadataMapper.TableMapper.get().table());
+    assertEquals(Tables.SEGMENTS, TablesMetadataMapper.SegmentMapper.get().table());
+    assertEquals(Tables.FILES, TablesMetadataMapper.FileMapper.get().table());
+    assertEquals(Tables.ROW_GROUPS, TablesMetadataMapper.RowGroupMapper.get().table());
+    assertEquals(Tables.PARTITIONS, TablesMetadataMapper.PartitionMapper.get().table());
+  }
+
+  @Test
+  public void testToFields() {
+    List<Field<?>> tablesFields = TablesMetadataMapper.TableMapper.get()
+      .toFields(Arrays.asList(MetastoreColumn.OWNER,
+        MetastoreColumn.TABLE_NAME,
+        MetastoreColumn.INTERESTING_COLUMNS));
+
+    assertEquals(
+      Arrays.asList(Tables.TABLES.OWNER, Tables.TABLES.TABLE_NAME, Tables.TABLES.INTERESTING_COLUMNS),
+      tablesFields);
+
+    List<Field<?>> segmentsFields = TablesMetadataMapper.SegmentMapper.get()
+      .toFields(Arrays.asList(MetastoreColumn.COLUMNS_STATISTICS,
+        MetastoreColumn.TABLE_NAME,
+        MetastoreColumn.LOCATION));
+
+    assertEquals(
+      Arrays.asList(Tables.SEGMENTS.COLUMN_STATISTICS, Tables.SEGMENTS.TABLE_NAME, Tables.SEGMENTS.LOCATION),
+      segmentsFields);
+
+    List<Field<?>> filesFields = TablesMetadataMapper.FileMapper.get()
+      .toFields(Arrays.asList(MetastoreColumn.PATH,
+        MetastoreColumn.TABLE_NAME,
+        MetastoreColumn.LOCATION));
+
+    assertEquals(
+      Arrays.asList(Tables.FILES.PATH, Tables.FILES.TABLE_NAME, Tables.FILES.LOCATION),
+      filesFields);
+
+    List<Field<?>> rowGroupsFields = TablesMetadataMapper.RowGroupMapper.get()
+      .toFields(Arrays.asList(MetastoreColumn.PATH,
+        MetastoreColumn.TABLE_NAME,
+        MetastoreColumn.HOST_AFFINITY));
+
+    assertEquals(
+      Arrays.asList(Tables.ROW_GROUPS.PATH, Tables.ROW_GROUPS.TABLE_NAME, Tables.ROW_GROUPS.HOST_AFFINITY),
+      rowGroupsFields);
+
+    List<Field<?>> partitionFields = TablesMetadataMapper.PartitionMapper.get()
+      .toFields(Arrays.asList(MetastoreColumn.PARTITION_VALUES,
+        MetastoreColumn.TABLE_NAME,
+        MetastoreColumn.LOCATIONS));
+
+    assertEquals(
+      Arrays.asList(Tables.PARTITIONS.PARTITION_VALUES, Tables.PARTITIONS.TABLE_NAME, Tables.PARTITIONS.LOCATIONS),
+      partitionFields);
+  }
+
+  @Test
+  public void testToFieldsAbsent() {
+    List<Field<?>> tableFields = TablesMetadataMapper.TableMapper.get().toFields(
+      Arrays.asList(MetastoreColumn.SCHEMA, MetastoreColumn.COLUMN, MetastoreColumn.HOST_AFFINITY));
+    assertEquals(1, tableFields.size());
+
+    List<Field<?>> segmentFields = TablesMetadataMapper.SegmentMapper.get().toFields(
+      Arrays.asList(MetastoreColumn.SCHEMA, MetastoreColumn.OWNER, MetastoreColumn.HOST_AFFINITY));
+    assertEquals(1, segmentFields.size());
+
+    List<Field<?>> fileFields = TablesMetadataMapper.FileMapper.get().toFields(
+      Arrays.asList(MetastoreColumn.SCHEMA, MetastoreColumn.OWNER, MetastoreColumn.HOST_AFFINITY));
+    assertEquals(1, fileFields.size());
+
+    List<Field<?>> rowGroupFields = TablesMetadataMapper.RowGroupMapper.get().toFields(
+      Arrays.asList(MetastoreColumn.LOCATIONS, MetastoreColumn.OWNER, MetastoreColumn.HOST_AFFINITY));
+    assertEquals(1, rowGroupFields.size());
+
+    List<Field<?>> partitionFields = TablesMetadataMapper.PartitionMapper.get().toFields(
+      Arrays.asList(MetastoreColumn.LOCATIONS, MetastoreColumn.OWNER, MetastoreColumn.HOST_AFFINITY));
+    assertEquals(1, partitionFields.size());
+  }
+
+  @Test
+  public void testToCondition() {
+    FilterExpression filterExpression = FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs");
+
+    Condition tablesCondition = TablesMetadataMapper.TableMapper.get().toCondition(filterExpression);
+    assertEquals(Tables.TABLES.STORAGE_PLUGIN.eq("dfs"), tablesCondition);
+
+    Condition segmentsCondition = TablesMetadataMapper.SegmentMapper.get().toCondition(filterExpression);
+    assertEquals(Tables.SEGMENTS.STORAGE_PLUGIN.eq("dfs"), segmentsCondition);
+
+    Condition filesCondition = TablesMetadataMapper.FileMapper.get().toCondition(filterExpression);
+    assertEquals(Tables.FILES.STORAGE_PLUGIN.eq("dfs"), filesCondition);
+
+    Condition rowGroupsCondition = TablesMetadataMapper.RowGroupMapper.get().toCondition(filterExpression);
+    assertEquals(Tables.ROW_GROUPS.STORAGE_PLUGIN.eq("dfs"), rowGroupsCondition);
+
+    Condition partitionsCondition = TablesMetadataMapper.PartitionMapper.get().toCondition(filterExpression);
+    assertEquals(Tables.PARTITIONS.STORAGE_PLUGIN.eq("dfs"), partitionsCondition);
+  }
+
+  @Test
+  public void testToConditionNull() {
+    assertEquals(DSL.noCondition().toString(),
+      TablesMetadataMapper.TableMapper.get().toCondition(null).toString());
+
+    assertEquals(DSL.noCondition().toString(),
+      TablesMetadataMapper.SegmentMapper.get().toCondition(null).toString());
+
+    assertEquals(DSL.noCondition().toString(),
+      TablesMetadataMapper.FileMapper.get().toCondition(null).toString());
+
+    assertEquals(DSL.noCondition().toString(),
+      TablesMetadataMapper.RowGroupMapper.get().toCondition(null).toString());
+
+    assertEquals(DSL.noCondition().toString(),
+      TablesMetadataMapper.PartitionMapper.get().toCondition(null).toString());
+  }
+
+  @Test
+  public void testToDeleteConditionsTables() {
+    TableMetadataUnit basicUnit = TestData.basicTableMetadataUnit();
+
+    List<TableMetadataUnit> units = Arrays.asList(
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("region")
+        .build(),
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .build()
+    );
+
+    Condition[] expectedConditions = new Condition[] {
+      DSL.and(Tables.TABLES.STORAGE_PLUGIN.eq("dfs"),
+        Tables.TABLES.WORKSPACE.eq("tmp"),
+        Tables.TABLES.TABLE_NAME.eq("region")),
+
+      DSL.and(Tables.TABLES.STORAGE_PLUGIN.eq("dfs"),
+        Tables.TABLES.WORKSPACE.eq("tmp"),
+        Tables.TABLES.TABLE_NAME.eq("nation"))
+    };
+
+    List<Condition> actualConditions = TablesMetadataMapper.TableMapper.get().toDeleteConditions(units);
+
+    assertEquals(expectedConditions.length, actualConditions.size());
+    assertThat(actualConditions, hasItems(expectedConditions));
+  }
+
+  @Test
+  public void testToDeleteConditionsSegments() {
+    TableMetadataUnit basicUnit = TestData.basicTableMetadataUnit();
+
+    List<TableMetadataUnit> units = Arrays.asList(
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("2008")
+        .metadataIdentifier("2008")
+        .build(),
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("2008")
+        .metadataIdentifier("2008/Q1")
+        .build(),
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("2009")
+        .metadataIdentifier("2009")
+        .build()
+    );
+
+    Condition[] expectedConditions = new Condition[] {
+      DSL.and(Tables.SEGMENTS.STORAGE_PLUGIN.eq("dfs"),
+        Tables.SEGMENTS.WORKSPACE.eq("tmp"),
+        Tables.SEGMENTS.TABLE_NAME.eq("nation"),
+        Tables.SEGMENTS.METADATA_KEY.eq("2008")),
+
+      DSL.and(Tables.SEGMENTS.STORAGE_PLUGIN.eq("dfs"),
+        Tables.SEGMENTS.WORKSPACE.eq("tmp"),
+        Tables.SEGMENTS.TABLE_NAME.eq("nation"),
+        Tables.SEGMENTS.METADATA_KEY.eq("2009")),
+    };
+
+    List<Condition> actualConditions = TablesMetadataMapper.SegmentMapper.get().toDeleteConditions(units);
+
+    assertEquals(expectedConditions.length, actualConditions.size());
+    assertThat(actualConditions, hasItems(expectedConditions));
+  }
+
+  @Test
+  public void testToDeleteConditionsFiles() {
+    TableMetadataUnit basicUnit = TestData.basicTableMetadataUnit();
+
+    List<TableMetadataUnit> units = Arrays.asList(
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("2008")
+        .metadataIdentifier("2008/0_0_0.parquet")
+        .build(),
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("2008")
+        .metadataIdentifier("2008/0_0_1.parquet")
+        .build(),
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("2009")
+        .metadataIdentifier("2009/0_0_0.parquet")
+        .build()
+    );
+
+    Condition[] expectedConditions = new Condition[] {
+      DSL.and(Tables.FILES.STORAGE_PLUGIN.eq("dfs"),
+        Tables.FILES.WORKSPACE.eq("tmp"),
+        Tables.FILES.TABLE_NAME.eq("nation"),
+        Tables.FILES.METADATA_KEY.eq("2008")),
+
+      DSL.and(Tables.FILES.STORAGE_PLUGIN.eq("dfs"),
+        Tables.FILES.WORKSPACE.eq("tmp"),
+        Tables.FILES.TABLE_NAME.eq("nation"),
+        Tables.FILES.METADATA_KEY.eq("2009")),
+    };
+
+    List<Condition> actualConditions = TablesMetadataMapper.FileMapper.get().toDeleteConditions(units);
+
+    assertEquals(expectedConditions.length, actualConditions.size());
+    assertThat(actualConditions, hasItems(expectedConditions));
+  }
+
+  @Test
+  public void testToDeleteConditionsRowGroups() {
+    TableMetadataUnit basicUnit = TestData.basicTableMetadataUnit();
+
+    List<TableMetadataUnit> units = Arrays.asList(
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("2008")
+        .metadataIdentifier("2008/0_0_0.parquet/1")
+        .build(),
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("2008")
+        .metadataIdentifier("2008/0_0_0.parquet/2")
+        .build(),
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("2009")
+        .metadataIdentifier("2009/0_0_0.parquet/1")
+        .build()
+    );
+
+    Condition[] expectedConditions = new Condition[] {
+      DSL.and(Tables.ROW_GROUPS.STORAGE_PLUGIN.eq("dfs"),
+        Tables.ROW_GROUPS.WORKSPACE.eq("tmp"),
+        Tables.ROW_GROUPS.TABLE_NAME.eq("nation"),
+        Tables.ROW_GROUPS.METADATA_KEY.eq("2008")),
+
+      DSL.and(Tables.ROW_GROUPS.STORAGE_PLUGIN.eq("dfs"),
+        Tables.ROW_GROUPS.WORKSPACE.eq("tmp"),
+        Tables.ROW_GROUPS.TABLE_NAME.eq("nation"),
+        Tables.ROW_GROUPS.METADATA_KEY.eq("2009")),
+    };
+
+    List<Condition> actualConditions = TablesMetadataMapper.RowGroupMapper.get().toDeleteConditions(units);
+
+    assertEquals(expectedConditions.length, actualConditions.size());
+    assertThat(actualConditions, hasItems(expectedConditions));
+  }
+
+  @Test
+  public void testToDeleteConditionsPartitions() {
+    TableMetadataUnit basicUnit = TestData.basicTableMetadataUnit();
+
+    List<TableMetadataUnit> units = Arrays.asList(
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("2008")
+        .metadataIdentifier("2008/01")
+        .build(),
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("2008")
+        .metadataIdentifier("2008/02")
+        .build(),
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("2009")
+        .metadataIdentifier("2009/01")
+        .build()
+    );
+
+    Condition[] expectedConditions = new Condition[] {
+      DSL.and(Tables.PARTITIONS.STORAGE_PLUGIN.eq("dfs"),
+        Tables.PARTITIONS.WORKSPACE.eq("tmp"),
+        Tables.PARTITIONS.TABLE_NAME.eq("nation"),
+        Tables.PARTITIONS.METADATA_KEY.eq("2008")),
+
+      DSL.and(Tables.PARTITIONS.STORAGE_PLUGIN.eq("dfs"),
+        Tables.PARTITIONS.WORKSPACE.eq("tmp"),
+        Tables.PARTITIONS.TABLE_NAME.eq("nation"),
+        Tables.PARTITIONS.METADATA_KEY.eq("2009")),
+    };
+
+    List<Condition> actualConditions = TablesMetadataMapper.PartitionMapper.get().toDeleteConditions(units);
+
+    assertEquals(expectedConditions.length, actualConditions.size());
+    assertThat(actualConditions, hasItems(expectedConditions));
+  }
+}
diff --git a/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/components/tables/TestTablesTransformer.java b/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/components/tables/TestTablesTransformer.java
new file mode 100644
index 0000000..0245c30
--- /dev/null
+++ b/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/components/tables/TestTablesTransformer.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.components.tables;
+
+import org.apache.drill.metastore.MetastoreColumn;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.TestData;
+import org.apache.drill.metastore.expressions.FilterExpression;
+import org.apache.drill.metastore.metadata.MetadataType;
+import org.apache.drill.metastore.rdbms.RdbmsBaseTest;
+import org.apache.drill.metastore.rdbms.exception.RdbmsMetastoreException;
+import org.apache.drill.metastore.rdbms.operate.RdbmsOperation;
+import org.apache.drill.metastore.rdbms.transform.MetadataMapper;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.jooq.Record;
+import org.jooq.generated.Tables;
+import org.jooq.impl.DSL;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+public class TestTablesTransformer extends RdbmsBaseTest {
+
+  private static final TablesTransformer TRANSFORMER = TablesTransformer.get();
+
+  @Test
+  public void testToMappersAll() {
+    Set<MetadataMapper<TableMetadataUnit, ? extends Record>> metadataMappers =
+      TRANSFORMER.toMappers(Sets.newHashSet(MetadataType.ALL, MetadataType.FILE));
+
+    assertEquals(5, metadataMappers.size());
+  }
+
+  @Test
+  public void testToMappersSome() {
+    Set<MetadataMapper<TableMetadataUnit, ? extends Record>> metadataMappers =
+      TRANSFORMER.toMappers(Sets.newHashSet(MetadataType.TABLE, MetadataType.FILE));
+
+    assertEquals(
+      Sets.newHashSet(TablesMetadataMapper.TableMapper.get(), TablesMetadataMapper.FileMapper.get()),
+      metadataMappers);
+  }
+
+  @Test
+  public void testToMappersAbsent() {
+    try {
+      TRANSFORMER.toMappers(Sets.newHashSet(MetadataType.TABLE, MetadataType.VIEW));
+      fail();
+    } catch (RdbmsMetastoreException e) {
+      assertThat(e.getMessage(), startsWith("Metadata mapper is absent for type"));
+    }
+  }
+
+  @Test
+  public void testToMapperExisting() {
+    MetadataMapper<TableMetadataUnit, ? extends Record> mapper = TRANSFORMER.toMapper(MetadataType.TABLE);
+    assertSame(TablesMetadataMapper.TableMapper.get(), mapper);
+  }
+
+  @Test
+  public void testToMapperAbsent() {
+    try {
+      TRANSFORMER.toMapper(MetadataType.VIEW);
+      fail();
+    } catch (RdbmsMetastoreException e) {
+      assertThat(e.getMessage(), startsWith("Metadata mapper is absent for type"));
+    }
+  }
+
+  @Test
+  public void testToMapperAll() {
+    try {
+      TRANSFORMER.toMapper(MetadataType.ALL);
+      fail();
+    } catch (RdbmsMetastoreException e) {
+      assertThat(e.getMessage(), startsWith("Metadata mapper is absent for type"));
+    }
+  }
+
+  @Test
+  public void testToOverwriteOneUnit() {
+    TableMetadataUnit basicUnit = TestData.basicTableMetadataUnit();
+
+    List<TableMetadataUnit> units = Collections.singletonList(
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataType(MetadataType.TABLE.name())
+        .build());
+
+    List<RdbmsOperation.Overwrite> overwrites = TRANSFORMER.toOverwrite(units);
+    assertEquals(1, overwrites.size());
+
+    RdbmsOperation.Overwrite overwrite = overwrites.get(0);
+    assertEquals(Tables.TABLES, overwrite.table());
+
+    assertEquals(1, overwrite.deleteConditions().size());
+    assertEquals(
+      DSL.and(Tables.TABLES.STORAGE_PLUGIN.eq("dfs"),
+        Tables.TABLES.WORKSPACE.eq("tmp"),
+        Tables.TABLES.TABLE_NAME.eq("nation")),
+      overwrite.deleteConditions().get(0));
+  }
+
+  @Test
+  public void testToOverwriteSeveralUnitsSameType() {
+    TableMetadataUnit basicUnit = TestData.basicTableMetadataUnit();
+
+    List<TableMetadataUnit> units = Arrays.asList(
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("region")
+        .metadataType(MetadataType.TABLE.name())
+        .build(),
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataType(MetadataType.TABLE.name())
+        .build()
+    );
+
+    List<RdbmsOperation.Overwrite> overwrites = TRANSFORMER.toOverwrite(units);
+    assertEquals(1, overwrites.size());
+
+    RdbmsOperation.Overwrite overwrite = overwrites.get(0);
+    assertEquals(Tables.TABLES, overwrite.table());
+
+    assertEquals(2, overwrite.deleteConditions().size());
+  }
+
+  @Test
+  public void testToOverwriteSeveralUnitsDifferentTypes() {
+    TableMetadataUnit basicUnit = TestData.basicTableMetadataUnit();
+
+    List<TableMetadataUnit> units = Arrays.asList(
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("region")
+        .metadataType(MetadataType.TABLE.name())
+        .build(),
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataType(MetadataType.TABLE.name())
+        .build(),
+      basicUnit.toBuilder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataType(MetadataType.SEGMENT.name())
+        .build()
+    );
+
+    List<RdbmsOperation.Overwrite> overwrites = TRANSFORMER.toOverwrite(units);
+    assertEquals(2, overwrites.size());
+  }
+
+  @Test
+  public void testToOverwriteAbsentMetadataType() {
+    TableMetadataUnit basicUnit = TestData.basicTableMetadataUnit();
+
+    List<TableMetadataUnit> units = Arrays.asList(
+      basicUnit.toBuilder()
+        .metadataType(MetadataType.TABLE.name())
+        .build(),
+      basicUnit.toBuilder()
+        .metadataType(MetadataType.VIEW.name())
+        .build()
+    );
+
+    try {
+      TRANSFORMER.toOverwrite(units);
+      fail();
+    } catch (RdbmsMetastoreException e) {
+      assertThat(e.getMessage(), startsWith("Metadata mapper is absent for type"));
+    }
+  }
+
+  @Test
+  public void testToDeleteOne() {
+    org.apache.drill.metastore.operate.Delete metastoreDelete = org.apache.drill.metastore.operate.Delete.builder()
+      .metadataType(MetadataType.TABLE)
+      .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
+      .build();
+
+    List<RdbmsOperation.Delete> rdbmsDeletes = TRANSFORMER.toDelete(metastoreDelete);
+
+    assertEquals(1, rdbmsDeletes.size());
+
+    RdbmsOperation.Delete rdbmsDelete = rdbmsDeletes.get(0);
+    assertEquals(Tables.TABLES, rdbmsDelete.table());
+    assertEquals(Tables.TABLES.STORAGE_PLUGIN.eq("dfs").toString(), rdbmsDelete.condition().toString());
+  }
+
+  @Test
+  public void testToDeleteSeveral() {
+    org.apache.drill.metastore.operate.Delete metastoreDelete = org.apache.drill.metastore.operate.Delete.builder()
+      .metadataType(MetadataType.ALL)
+      .filter(FilterExpression.equal(MetastoreColumn.STORAGE_PLUGIN, "dfs"))
+      .build();
+
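+    // MetadataType.ALL is expected to expand into one delete per metadata table
+    // (tables, segments, files, row groups, partitions).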
+    List<RdbmsOperation.Delete> rdbmsDeletes = TRANSFORMER.toDelete(metastoreDelete);
+
+    assertEquals(5, rdbmsDeletes.size());
+  }
+
+  @Test
+  public void testToDeleteAll() {
+    List<RdbmsOperation.Delete> rdbmsDeletes = TRANSFORMER.toDeleteAll();
+
+    assertEquals(5, rdbmsDeletes.size());
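+    // Each delete must be unconditional (jOOQ noCondition()), clearing every metadata table.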
+    rdbmsDeletes.stream()
+      .map(RdbmsOperation.Delete::condition)
+      .forEach(condition -> assertEquals(DSL.noCondition().toString(), condition.toString()));
+  }
+}
diff --git a/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/util/TestConverterUtil.java b/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/util/TestConverterUtil.java
new file mode 100644
index 0000000..0b599e0
--- /dev/null
+++ b/metastore/rdbms-metastore/src/test/java/org/apache/drill/metastore/rdbms/util/TestConverterUtil.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.rdbms.util;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.drill.metastore.rdbms.RdbmsBaseTest;
+import org.apache.drill.metastore.rdbms.exception.RdbmsMetastoreException;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestConverterUtil extends RdbmsBaseTest {
+
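+  // ConverterUtil converts values to and from the JSON string representation
+  // in which they are persisted by the RDBMS Metastore.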
+  @Test
+  public void testConvertToString() {
+    assertEquals("null", ConverterUtil.convertToString(null));
+    assertEquals("\"\"", ConverterUtil.convertToString(""));
+    assertEquals("\"abc\"", ConverterUtil.convertToString("abc"));
+    assertEquals("123", ConverterUtil.convertToString(123L));
+    assertEquals("true", ConverterUtil.convertToString(true));
+  }
+
+  @Test
+  public void testConvertToType() {
+    assertNull(ConverterUtil.convertTo(null, new TypeReference<String>() {}));
+    assertNull(ConverterUtil.convertTo("null", new TypeReference<String>() {}));
+    assertNull(ConverterUtil.convertTo("null", new TypeReference<Long>() {}));
+    assertEquals("", ConverterUtil.convertTo("\"\"", new TypeReference<String>() {}));
+    assertEquals("abc", ConverterUtil.convertTo("\"abc\"", new TypeReference<String>() {}));
+    assertEquals(Long.valueOf(123), ConverterUtil.convertTo("123", new TypeReference<Long>() {}));
+    assertTrue(ConverterUtil.convertTo("true", new TypeReference<Boolean>() {}));
+    try {
+      ConverterUtil.convertTo("abc", new TypeReference<Long>() {});
+      fail();
+    } catch (RdbmsMetastoreException e) {
+      assertThat(e.getMessage(), startsWith("Unable to convert"));
+    }
+  }
+
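+  // Collections are persisted as JSON arrays and objects; a mismatched JSON shape
+  // (e.g. an object where an array is expected) must fail the conversion.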
+  @Test
+  public void testConvertToListString() {
+    assertNull(ConverterUtil.convertToListString("null"));
+    assertEquals(Collections.<String>emptyList(), ConverterUtil.convertToListString("[]"));
+    assertEquals(Arrays.asList("a", "b", "c"), ConverterUtil.convertToListString("[\"a\",\"b\",\"c\"]"));
+    try {
+      ConverterUtil.convertToListString("{}");
+      fail();
+    } catch (RdbmsMetastoreException e) {
+      assertThat(e.getMessage(), startsWith("Unable to convert"));
+    }
+  }
+
+  @Test
+  public void testConvertToMapStringString() {
+    assertNull(ConverterUtil.convertToMapStringString("null"));
+    assertEquals(Collections.<String, String>emptyMap(), ConverterUtil.convertToMapStringString("{}"));
+    assertEquals(ImmutableMap.of("a", "b", "c", "d"),
+      ConverterUtil.convertToMapStringString("{\"a\":\"b\",\"c\":\"d\"}"));
+    try {
+      ConverterUtil.convertToMapStringString("[]");
+      fail();
+    } catch (RdbmsMetastoreException e) {
+      assertThat(e.getMessage(), startsWith("Unable to convert"));
+    }
+  }
+
+  @Test
+  public void testConvertToMapStringFloat() {
+    assertNull(ConverterUtil.convertToMapStringFloat("null"));
+    assertEquals(Collections.<String, Float>emptyMap(), ConverterUtil.convertToMapStringFloat("{}"));
+    assertEquals(ImmutableMap.of("a", 1.2F, "c", 3.4F),
+      ConverterUtil.convertToMapStringFloat("{\"a\":1.2,\"c\":3.4}"));
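+    // String values cannot be coerced to Float, so conversion should fail.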
+    try {
+      ConverterUtil.convertToMapStringFloat("{\"a\":\"b\",\"c\":\"d\"}");
+      fail();
+    } catch (RdbmsMetastoreException e) {
+      assertThat(e.getMessage(), startsWith("Unable to convert"));
+    }
+  }
+}