IMPALA-9664: Support hive replication

This patch makes some improvements to the INSERT
event generated by Impala. Specifically, the INSERT event
will now include new file information when Impala inserts
into a table. This information can be used by external
tools like Hive Replication to replicate the changes made
by Impala in their source databases.

Additionally, this patch modifies the truncate table
execution so that it uses HMS API to truncate the table
instead of deleting the files directly on the filesystem.

Following changes were made.
1. Fires insert events for insert overwrite.
2. Has the names of the new files in the events. In case of insert
overwrite, this is just a list of files which were added by
the insert overwrite operation.
3. In case of ACID tables, fires transactional notification API
for all the partitions in which data is inserted.
4. For tables which have replication enabled, the truncate table
operation now uses a HMS API to truncate the table. This is
necessary since HMS API moves the files to a replication change
manager location if needed. Additionally, it generates
ALTER_TABLE events with truncate flag set to true.

TODO:
1. For external tables, replication does not seem to work in the
dev environment. This will be done as a followup.

Testing:
1. Created a new test in test_events_processing.py which inserts
into managed tables which are being replicated. It makes sure that
hive replication detects the new rows which are added into the
tables. The test also exercises insert overwrite and truncate
statements and makes sure that the table is replicated correctly.

Change-Id: Icaf3fe0adff755ff853960f270ceb45b11a84f0a
Reviewed-on: http://gerrit.cloudera.org:8080/16439
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Vihang Karajgaonkar <vihang@cloudera.com>
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 60acc3c..84c250b 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1241,6 +1241,7 @@
       catalog_update.is_overwrite = finalize_params.is_overwrite;
       if (InTransaction()) {
         catalog_update.__set_transaction_id(finalize_params.transaction_id);
+        catalog_update.__set_write_id(finalize_params.write_id);
       }
 
       Status cnxn_status;
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index de3cd4f..5acdb7f 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -215,7 +215,10 @@
   7: required bool is_overwrite;
 
   // ACID transaction ID for transactional inserts.
-  8: optional i64 transaction_id;
+  8: optional i64 transaction_id
+
+  // ACID write ID for transactional inserts.
+  9: optional i64 write_id
 }
 
 // Response from a TUpdateCatalogRequest
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index dd41715..de7c586 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -25,8 +25,10 @@
 import static org.apache.impala.service.MetadataOp.TABLE_TYPE_VIEW;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableMap;
 
+import com.google.common.collect.Iterables;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -35,6 +37,7 @@
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -74,6 +77,7 @@
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
@@ -102,7 +106,7 @@
 import org.apache.impala.thrift.TValidWriteIdList;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.AcidUtils.TblTransaction;
-import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
+import org.apache.impala.util.MetaStoreUtil.TableInsertEventInfo;
 import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
@@ -1061,60 +1065,102 @@
    * In case of any exception, we just log the failure of firing insert events.
    */
   public static List<Long> fireInsertEvents(MetaStoreClient msClient,
-      List<InsertEventInfo> insertEventInfos, String dbName, String tableName) {
+      TableInsertEventInfo insertEventInfo, String dbName, String tableName) {
+    Stopwatch sw = Stopwatch.createStarted();
     try {
-      return fireInsertEventHelper(msClient.getHiveClient(), insertEventInfos, dbName, tableName);
+      if (insertEventInfo.isTransactional()) {
+        // if the table is transactional we use a different API to fire these
+        // events. Note that we don't really need the event ids here for self-event
+        // detection since these events are not fetched by the EventsProcessor later
+        // These events are mostly required for incremental replication in Hive
+        fireInsertTransactionalEventHelper(msClient.getHiveClient(),
+            insertEventInfo, dbName, tableName);
+      } else {
+        return fireInsertEventHelper(msClient.getHiveClient(),
+            insertEventInfo.getInsertEventReqData(), dbName,
+            tableName);
+      }
     } catch (Exception e) {
       LOG.error("Failed to fire insert event. Some tables might not be"
               + " refreshed on other impala clusters.", e);
     } finally {
+      LOG.info("Time taken to fire insert events on table {}.{}: {} msec", dbName,
+          tableName, sw.stop().elapsed(TimeUnit.MILLISECONDS));
       msClient.close();
     }
     return Collections.emptyList();
   }
 
   /**
+   * Fires a listener event of the type ACID_WRITE on a transactional table in metastore.
+   * This event is polled by other external systems to detect insert operations into
+   * ACID tables.
+   * @throws TException in case of errors during HMS API call.
+   */
+  private static void fireInsertTransactionalEventHelper(
+      IMetaStoreClient hiveClient, TableInsertEventInfo insertEventInfo, String dbName,
+      String tableName) throws TException {
+    for (InsertEventRequestData insertData : insertEventInfo.getInsertEventReqData()) {
+      // TODO(Vihang) unfortunately there is no bulk insert event API for transactional
+      // tables. It is possible that this may take long time here if there are lots of
+      // partitions which were inserted.
+      if (LOG.isDebugEnabled()) {
+        String msg =
+            "Firing write notification log request for table " + dbName + "." + tableName
+                + (insertData.isSetPartitionVal() ? " on partition " + insertData
+                .getPartitionVal() : "");
+        LOG.debug(msg);
+      }
+      WriteNotificationLogRequest rqst = new WriteNotificationLogRequest(
+          insertEventInfo.getTxnId(), insertEventInfo.getWriteId(), dbName, tableName,
+          insertData);
+      if (insertData.isSetPartitionVal()) {
+        rqst.setPartitionVals(insertData.getPartitionVal());
+      }
+      // TODO(Vihang) metastore should return the event id here so that we can get rid
+      // of firing INSERT event types for transactional tables.
+      hiveClient.addWriteNotificationLog(rqst);
+    }
+  }
+
+  /**
    *  Fires an insert event to HMS notification log. In Hive-3 for partitioned table,
    *  all partition insert events will be fired by a bulk API.
    *
    * @param msClient Metastore client,
-   * @param insertEventInfos A list of insert event encapsulating the information needed
-   * to fire insert
+   * @param insertEventDataList A list of insert event info encapsulating the information
+   *                            needed to fire insert events.
    * @param dbName
    * @param tableName
    * @return a list of eventIds for the insert events
    */
   @VisibleForTesting
   public static List<Long> fireInsertEventHelper(IMetaStoreClient msClient,
-      List<InsertEventInfo> insertEventInfos, String dbName, String tableName)
+      List<InsertEventRequestData> insertEventDataList, String dbName, String tableName)
       throws TException {
     Preconditions.checkNotNull(msClient);
     Preconditions.checkNotNull(dbName);
     Preconditions.checkNotNull(tableName);
-    Preconditions.checkState(!insertEventInfos.isEmpty(), "Atleast one insert event "
+    Preconditions.checkState(!insertEventDataList.isEmpty(), "Atleast one insert event "
         + "info must be provided.");
     LOG.debug(String.format(
-        "Firing %s insert event for %s", insertEventInfos.size(), tableName));
+        "Firing %s insert event(s) for %s.%s", insertEventDataList.size(), dbName,
+        tableName));
     FireEventRequestData data = new FireEventRequestData();
     FireEventRequest rqst = new FireEventRequest(true, data);
     rqst.setDbName(dbName);
     rqst.setTableName(tableName);
-    List<InsertEventRequestData> insertDatas = new ArrayList<>();
-    for (InsertEventInfo info : insertEventInfos) {
-      InsertEventRequestData insertData = new InsertEventRequestData();
-      Preconditions.checkNotNull(info.getNewFiles());
-      insertData.setFilesAdded(new ArrayList<>(info.getNewFiles()));
-      insertData.setReplace(info.isOverwrite());
-      if (info.getPartVals() != null) insertData.setPartitionVal(info.getPartVals());
-      insertDatas.add(insertData);
-    }
-    if (insertDatas.size() == 1) {
-      if (insertEventInfos.get(0).getPartVals() != null) {
-        rqst.setPartitionVals(insertEventInfos.get(0).getPartVals());
+    if (insertEventDataList.size() == 1) {
+      InsertEventRequestData insertEventData = Iterables
+          .getOnlyElement(insertEventDataList);
+      if (insertEventData.getPartitionVal() != null) {
+        rqst.setPartitionVals(insertEventData.getPartitionVal());
       }
-      data.setInsertData(insertDatas.get(0));
+      // single insert event API
+      data.setInsertData(insertEventData);
     } else {
-      data.setInsertDatas(insertDatas);
+      // use bulk insert API
+      data.setInsertDatas(insertEventDataList);
     }
     FireEventResponse response = msClient.fireListenerEvent(rqst);
     if (!response.isSetEventIds()) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index f4e562a..0400d3a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -735,11 +735,13 @@
   }
 
   public FileSystem getFileSystem() throws CatalogException {
+    FileSystem tableFs;
     try {
-      return (new Path(getLocation())).getFileSystem(CONF);
+      tableFs = (new Path(getLocation())).getFileSystem(CONF);
     } catch (IOException e) {
       throw new CatalogException("Invalid table path for table: " + getFullName(), e);
     }
+    return tableFs;
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/HiveStorageDescriptorFactory.java b/fe/src/main/java/org/apache/impala/catalog/HiveStorageDescriptorFactory.java
index c649808..c645cfc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HiveStorageDescriptorFactory.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HiveStorageDescriptorFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
@@ -47,6 +48,8 @@
     sd.setInputFormat(hdfsFileFormat.inputFormat());
     sd.setOutputFormat(hdfsFileFormat.outputFormat());
     sd.getSerdeInfo().setSerializationLib(hdfsFileFormat.serializationLib());
+    sd.setBucketCols(new ArrayList<>(0));
+    sd.setSortCols(new ArrayList<>(0));
     setSerdeInfo(rowFormat, sd.getSerdeInfo());
     return sd;
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 8b9bba3..2e38d4c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -233,7 +233,7 @@
   private final long pollingFrequencyInSec_;
 
   // catalog service instance to be used while processing events
-  private final CatalogServiceCatalog catalog_;
+  protected final CatalogServiceCatalog catalog_;
 
   // scheduler daemon thread executor for processing events at given frequency
   private final ScheduledExecutorService scheduler_ = Executors
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index e8270bb..88801a5 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -19,9 +19,15 @@
 
 import static org.apache.impala.analysis.Analyzer.ACCESSTYPE_READWRITE;
 
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterables;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -36,8 +42,8 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -54,6 +60,7 @@
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
@@ -61,7 +68,9 @@
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.TruncateTableRequest;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.impala.analysis.AlterTableSortByStmt;
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.TableName;
@@ -100,8 +109,8 @@
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
-import org.apache.impala.catalog.monitor.CatalogOperationMetrics;
 import org.apache.impala.catalog.monitor.CatalogMonitor;
+import org.apache.impala.catalog.monitor.CatalogOperationMetrics;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -141,6 +150,7 @@
 import org.apache.impala.thrift.TColumnType;
 import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TCommentOnParams;
+import org.apache.impala.thrift.TCopyTestCaseReq;
 import org.apache.impala.thrift.TCreateDataSourceParams;
 import org.apache.impala.thrift.TCreateDbParams;
 import org.apache.impala.thrift.TCreateDropRoleParams;
@@ -163,7 +173,6 @@
 import org.apache.impala.thrift.TGrantRevokeRoleParams;
 import org.apache.impala.thrift.THdfsCachingOp;
 import org.apache.impala.thrift.THdfsFileFormat;
-import org.apache.impala.thrift.TCopyTestCaseReq;
 import org.apache.impala.thrift.TPartitionDef;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPartitionStats;
@@ -183,25 +192,16 @@
 import org.apache.impala.thrift.TTruncateParams;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TUpdateCatalogResponse;
-import org.apache.impala.util.CompressionUtil;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.AcidUtils.TblTransaction;
+import org.apache.impala.util.CompressionUtil;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.MetaStoreUtil;
-import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
-import org.slf4j.Logger;
+import org.apache.impala.util.MetaStoreUtil.TableInsertEventInfo;
 import org.apache.thrift.TException;
-
-import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
@@ -2080,33 +2080,45 @@
         catalog_.getLock().writeLock().unlock();
         TblTransaction tblTxn = MetastoreShim.createTblTransaction(hmsClient,
             table.getMetaStoreTable(), txn.getId());
-        HdfsTable hdfsTable = (HdfsTable)table;
-        Collection<? extends FeFsPartition> parts =
-            FeCatalogUtils.loadAllPartitions(hdfsTable);
-        createEmptyBaseDirectories(parts, tblTxn.writeId);
-        // Currently Impala cannot update the statistics properly. So instead of
-        // writing correct stats, let's just remove COLUMN_STATS_ACCURATE parameter from
-        // each partition.
-        // TODO(IMPALA-8883): properly update statistics
-        List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitions =
-            Lists.newArrayListWithCapacity(parts.size());
-        if (table.getNumClusteringCols() > 0) {
-          for (FeFsPartition part: parts) {
-            org.apache.hadoop.hive.metastore.api.Partition hmsPart =
-                ((HdfsPartition)part).toHmsPartition();
-            Preconditions.checkNotNull(hmsPart);
-            if (hmsPart.getParameters() != null) {
-              hmsPart.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
-              hmsPartitions.add(hmsPart);
+        HdfsTable hdfsTable = (HdfsTable) table;
+        // if the table is replicated we should use the HMS API to truncate it so that
+        // if moves the files into in replication change manager location which is later
+        // used for replication.
+        if (isTableBeingReplicated(hmsClient, hdfsTable)) {
+          Stopwatch sw = Stopwatch.createStarted();
+          String dbName = Preconditions.checkNotNull(hdfsTable.getDb()).getName();
+          hmsClient.truncateTable(dbName, hdfsTable.getName(), null, tblTxn.validWriteIds,
+              tblTxn.writeId);
+          LOG.debug("Time taken to truncate table {} using HMS API: {} msec",
+              hdfsTable.getFullName(), sw.stop().elapsed(TimeUnit.MILLISECONDS));
+        } else {
+          Collection<? extends FeFsPartition> parts =
+              FeCatalogUtils.loadAllPartitions(hdfsTable);
+          createEmptyBaseDirectories(parts, tblTxn.writeId);
+          // Currently Impala cannot update the statistics properly. So instead of
+          // writing correct stats, let's just remove COLUMN_STATS_ACCURATE parameter from
+          // each partition.
+          // TODO(IMPALA-8883): properly update statistics
+          List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitions =
+              Lists.newArrayListWithCapacity(parts.size());
+          if (table.getNumClusteringCols() > 0) {
+            for (FeFsPartition part : parts) {
+              org.apache.hadoop.hive.metastore.api.Partition hmsPart =
+                  ((HdfsPartition) part).toHmsPartition();
+              Preconditions.checkNotNull(hmsPart);
+              if (hmsPart.getParameters() != null) {
+                hmsPart.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
+                hmsPartitions.add(hmsPart);
+              }
             }
           }
+          // For partitioned tables we need to alter all the partitions in HMS.
+          if (!hmsPartitions.isEmpty()) {
+            unsetPartitionsColStats(table.getMetaStoreTable(), hmsPartitions, tblTxn);
+          }
+          // Remove COLUMN_STATS_ACCURATE property from the table.
+          unsetTableColStats(table.getMetaStoreTable(), tblTxn);
         }
-        // For partitioned tables we need to alter all the partitions in HMS.
-        if (!hmsPartitions.isEmpty()) {
-          unsetPartitionsColStats(table.getMetaStoreTable(), hmsPartitions, tblTxn);
-        }
-        // Remove COLUMN_STATS_ACCURATE property from the table.
-        unsetTableColStats(table.getMetaStoreTable(), tblTxn);
         txn.commit();
       }
       return newCatalogVersion;
@@ -2116,6 +2128,27 @@
     }
   }
 
+  /**
+   * Helper method to check if the database which table belongs to is a source of
+   * Hive replication. We cannot rely on the Db object here due to eventual nature of
+   * cache updates.
+   */
+  private boolean isTableBeingReplicated(IMetaStoreClient metastoreClient,
+      HdfsTable tbl) throws CatalogException {
+    Preconditions.checkState(tbl.getLock().isHeldByCurrentThread());
+    String dbName = tbl.getDb().getName();
+    try {
+      Database db = metastoreClient.getDatabase(dbName);
+      if (!db.isSetParameters()) return false;
+      return org.apache.commons.lang.StringUtils
+          .isNotEmpty(db.getParameters().get("repl.source.for"));
+    } catch (TException tException) {
+      throw new CatalogException(
+          String.format("Could not determine if the table %s is a replication source",
+          tbl.getFullName()), tException);
+    }
+  }
+
   private long truncateNonTransactionalTable(TTruncateParams params, Table table)
       throws Exception {
     Preconditions.checkState(table.getLock().isHeldByCurrentThread());
@@ -2123,6 +2156,18 @@
     long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
     catalog_.getLock().writeLock().unlock();
     HdfsTable hdfsTable = (HdfsTable) table;
+    // if the table is being replicated we issue the HMS API to truncate the table
+    // since it generates additional events which are used by Hive Replication.
+    try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
+      if (isTableBeingReplicated(metaStoreClient.getHiveClient(), hdfsTable)) {
+        String dbName = Preconditions.checkNotNull(hdfsTable.getDb()).getName();
+        Stopwatch sw = Stopwatch.createStarted();
+        metaStoreClient.getHiveClient().truncateTable(dbName, hdfsTable.getName(), null);
+        LOG.debug("Time taken to truncate table {} using HMS API: {} msec",
+            hdfsTable.getFullName(), sw.stop().elapsed(TimeUnit.MILLISECONDS));
+        return newCatalogVersion;
+      }
+    }
     Collection<? extends FeFsPartition> parts = FeCatalogUtils
         .loadAllPartitions(hdfsTable);
     for (FeFsPartition part : parts) {
@@ -4516,16 +4561,37 @@
       // Commit transactional inserts on success. We don't abort the transaction
       // here in case of failures, because the client, i.e. query coordinator, is
       // always responsible for aborting transactions when queries hit errors.
+      boolean skipTransactionalInsertEvent = false;
       if (update.isSetTransaction_id()) {
         if (response.getResult().getStatus().getStatus_code() == TErrorCode.OK) {
           commitTransaction(update.getTransaction_id());
+        } else {
+          // do not generate the insert event if the transaction was aborted.
+          skipTransactionalInsertEvent = true;
         }
       }
+      // the load operation below changes the partitions in-place (this was later
+      // changed in upstream). If the partitions are loaded in-place, we cannot keep track
+      // of files before the load and hence the firing of insert events later below
+      // cannot find the new files.
+      Map<String, Set<String>> filesBeforeInsert = Maps.newHashMap();
+      if (shouldGenerateInsertEvents()) {
+        filesBeforeInsert = getPartitionNameFilesMap(affectedExistingPartitions);
+      }
       loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata,
           "INSERT");
       // After loading metadata, fire insert events if external event processing is
       // enabled.
-      createInsertEvents(table, affectedExistingPartitions, update.is_overwrite);
+      if (!skipTransactionalInsertEvent) {
+        // new parts are only created in case the table is partitioned.
+        Set<String> newPartsCreated = table.getNumClusteringCols() > 0 ? Sets
+            .difference(Preconditions.checkNotNull(partsToLoadMetadata),
+                filesBeforeInsert.keySet()) : Sets.newHashSetWithExpectedSize(0);
+        long txnId = tblTxn == null ? -1 : tblTxn.txnId;
+        long writeId = tblTxn == null ? -1: tblTxn.writeId;
+        createInsertEvents(table, filesBeforeInsert, newPartsCreated,
+            update.is_overwrite, txnId, writeId);
+      }
       addTableToCatalogUpdate(table, response.result);
     } finally {
       context.stop();
@@ -4544,89 +4610,91 @@
    * Populates insert event data and calls fireInsertEvents() if external event
    * processing is enabled. This is no-op if event processing is disabled or there are
    * no existing partitions affected by this insert.
-   *
-   * @param affectedExistingPartitions List of existing partitions touched by the insert.
-   * @param isInsertOverwrite indicates if the operation was an insert overwrite. If it
-   *     is not, all the new files added by this insert is calculated.
+   *  @param partitionFilesMapBeforeInsert List of files for each partition in which data
+   *                                     was inserted.
+   * @param newPartsCreated represents all the partitions names which got created by
+   *                       the insert operation.
+   * @param isInsertOverwrite indicates if the operation was an insert overwrite.
    */
   private void createInsertEvents(Table table,
-      List<FeFsPartition> affectedExistingPartitions, boolean isInsertOverwrite) {
-    boolean isInsertEventsEnabled = BackendConfig.INSTANCE.isInsertEventsEnabled();
-    if (!catalog_.isEventProcessingActive() || !isInsertEventsEnabled
-        || affectedExistingPartitions.size() == 0) {
+      Map<String, Set<String>> partitionFilesMapBeforeInsert,
+      Set<String> newPartsCreated, boolean isInsertOverwrite,
+      long txnId, long writeId) throws CatalogException {
+    if (!shouldGenerateInsertEvents()) {
       return;
     }
 
     // List of all insert events that we call HMS fireInsertEvent() on.
-    List<InsertEventInfo> insertEventInfos = new ArrayList<>();
+    List<InsertEventRequestData> insertEventReqDatas = new ArrayList<>();
     // List of all partitions that we insert into
     List<HdfsPartition> partitions = new ArrayList<>();
-
-    // Map of partition names to file names of all existing partitions touched by the
-    // insert.
-    Map<String, Set<String>> partitionFilesMapBeforeInsert = new HashMap<>();
-    if (!isInsertOverwrite) {
-      partitionFilesMapBeforeInsert =
-          getPartitionNameFilesMap(affectedExistingPartitions);
-    }
-    // If table is partitioned, we add all existing partitions touched by this insert
-    // to the insert event.
     Collection<? extends FeFsPartition> partsPostInsert;
     partsPostInsert =
-        ((HdfsTable) table).getPartitionsForNames(
-            partitionFilesMapBeforeInsert.keySet());
+        ((HdfsTable) table).getPartitionsForNames(partitionFilesMapBeforeInsert.keySet());
 
-    // If it is not an insert overwrite operation, we find new files added by this insert.
-    Map<String, Set<String>> partitionFilesMapPostInsert = new HashMap<>();
-    if (!isInsertOverwrite) {
-      partitionFilesMapPostInsert =
-          getPartitionNameFilesMap(partsPostInsert);
-    }
+    Map<String, Set<String>> partitionFilesMapPostInsert = getPartitionNameFilesMap(
+        partsPostInsert);
 
     for (FeFsPartition part : partsPostInsert) {
       // Find the delta of the files added by the insert if it is not an overwrite
       // operation. HMS fireListenerEvent() expects an empty list if no new files are
       // added or if the operation is an insert overwrite.
       HdfsPartition hdfsPartition = (HdfsPartition) part;
-      Set<String> deltaFiles = new HashSet<>();
+      // newFiles keeps track of newly added files by this insert operation.
+      Set<String> newFiles;
       List<String> partVals = null;
-      if (!isInsertOverwrite) {
-        String partitionName = hdfsPartition.getPartitionName() + "/";
-        Set<String> filesPostInsert =
-            partitionFilesMapPostInsert.get(partitionName);
-        if (table.getNumClusteringCols() > 0) {
-          Set<String> filesBeforeInsert =
-              partitionFilesMapBeforeInsert.get(partitionName);
-          deltaFiles = Sets.difference(filesBeforeInsert, filesPostInsert);
-          partVals = hdfsPartition.getPartitionValuesAsStrings(true);
-        } else {
-          Map.Entry<String, Set<String>> entry =
-              partitionFilesMapBeforeInsert.entrySet().iterator().next();
-          deltaFiles = Sets.difference(entry.getValue(), filesPostInsert);
-        }
-        LOG.info("{} new files detected for table {} partition {}.",
-            filesPostInsert.size(), table.getTableName(),
-            hdfsPartition.getPartitionName());
-      }
-      if (deltaFiles != null || isInsertOverwrite) {
+      String partitionName = hdfsPartition.getPartitionName() + "/";
+      Set<String> filesPostInsert =
+          partitionFilesMapPostInsert.get(partitionName);
+      Set<String> filesBeforeInsert =
+          partitionFilesMapBeforeInsert.get(partitionName);
+      newFiles = isInsertOverwrite ? filesPostInsert
+          : Sets.difference(filesPostInsert, filesBeforeInsert);
+      // if the table is unpartitioned partVals will be empty
+      partVals = hdfsPartition.getPartitionValuesAsStrings(true);
+      LOG.info(String.format("%s new files detected for table %s%s", newFiles.size(),
+          table.getFullName(),
+          ((HdfsTable) table).isPartitioned() ? " partition " + hdfsPartition
+              .getPartitionName() : ""));
+      if (!newFiles.isEmpty()) {
         // Collect all the insert events.
-        insertEventInfos.add(
-            new InsertEventInfo(partVals, deltaFiles, isInsertOverwrite));
-        if (partVals != null) {
+        insertEventReqDatas.add(
+            makeInsertEventData((HdfsTable) table, partVals, newFiles,
+                isInsertOverwrite));
+        if (!partVals.isEmpty()) {
           // insert into partition
           partitions.add(hdfsPartition);
         }
-      } else {
-        LOG.info("No new files were created, and is not a replace. Skipping "
-            + "generating INSERT event.");
       }
     }
 
-    if (insertEventInfos.isEmpty()) return;
+    // if the table is transaction table we should generate a transactional
+    // insert event type.
+    boolean isTransactional = AcidUtils.isTransactionalTable(table.getMetaStoreTable()
+        .getParameters());
+    Preconditions.checkState(!isTransactional || txnId > 0, String
+        .format("Invalid transaction id %s for generating insert events on table %s",
+            txnId, table.getFullName()));
+    Preconditions.checkState(!isTransactional || writeId > 0,
+        String.format("Invalid write id %s for generating insert events on table %s",
+            writeId, table.getFullName()));
+    // if the table is transactional we fire the event for new partitions as well.
+    if (isTransactional) {
+      for (HdfsPartition newPart : ((HdfsTable) table)
+          .getPartitionsForNames(newPartsCreated)) {
+        insertEventReqDatas.add(makeInsertEventData((HdfsTable) table,
+            newPart.getPartitionValuesAsStrings(true), newPart.getFileNames(),
+            isInsertOverwrite));
+      }
+    }
+
+    if (insertEventReqDatas.isEmpty()) return;
 
     MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient();
+    TableInsertEventInfo insertEventInfo = new TableInsertEventInfo(
+        insertEventReqDatas, isTransactional, txnId, writeId);
     List<Long> eventIds = MetastoreShim.fireInsertEvents(metaStoreClient,
-        insertEventInfos, table.getDb().getName(), table.getName());
+        insertEventInfo, table.getDb().getName(), table.getName());
     if (!eventIds.isEmpty()) {
       if (partitions.size() == 0) { // insert into table
         catalog_.addVersionsForInflightEvents(true, table, eventIds.get(0));
@@ -4639,14 +4707,58 @@
     }
   }
 
+  private boolean shouldGenerateInsertEvents() {
+    return catalog_.isEventProcessingActive() && BackendConfig.INSTANCE
+        .isInsertEventsEnabled();
+  }
+
+  private InsertEventRequestData makeInsertEventData(HdfsTable tbl, List<String> partVals,
+      Set<String> newFiles, boolean isInsertOverwrite) throws CatalogException {
+    Preconditions.checkNotNull(newFiles);
+    Preconditions.checkNotNull(partVals);
+    InsertEventRequestData insertEventRequestData = new InsertEventRequestData(
+        Lists.newArrayListWithCapacity(
+            newFiles.size()));
+    boolean isTransactional = AcidUtils
+        .isTransactionalTable(tbl.getMetaStoreTable().getParameters());
+    // in case of unpartitioned table, partVals will be empty
+    if (!partVals.isEmpty()) insertEventRequestData.setPartitionVal(partVals);
+    FileSystem fs = tbl.getFileSystem();
+    for (String file : newFiles) {
+      try {
+        Path filePath = new Path(file);
+        FileChecksum checkSum = fs.getFileChecksum(filePath);
+        String checksumStr = checkSum == null ? ""
+            : StringUtils.byteToHexString(checkSum.getBytes(), 0, checkSum.getLength());
+        insertEventRequestData.addToFilesAdded(file);
+        insertEventRequestData.addToFilesAddedChecksum(checksumStr);
+        if (isTransactional) {
+          String acidDirPath = AcidUtils.getFirstLevelAcidDirPath(filePath, fs);
+          if (acidDirPath != null) {
+            insertEventRequestData.addToSubDirectoryList(acidDirPath);
+          }
+        }
+        insertEventRequestData.setReplace(isInsertOverwrite);
+      } catch (IOException e) {
+        throw new CatalogException("Could not get the file checksum for file " + file, e);
+      }
+    }
+    return insertEventRequestData;
+  }
+
   /**
    * Util method to return a map of partition names to list of files for that partition.
    */
   private static Map<String, Set<String>> getPartitionNameFilesMap(Collection<?
       extends FeFsPartition> partitions) {
-        return partitions.stream().collect(
-            Collectors.toMap(p -> p.getPartitionName() + "/",
-                p -> ((HdfsPartition) p).getFileNames()));
+    Map<String, Set<String>> partitionFilePaths = new HashMap<>();
+    for (FeFsPartition partition : partitions) {
+      String key = partition.getPartitionName() + "/";
+      Set<String> filenames = ((HdfsPartition) partition).getFileNames();
+      partitionFilePaths.putIfAbsent(key, new HashSet<>(filenames.size()));
+      partitionFilePaths.get(key).addAll(filenames);
+    }
+    return partitionFilePaths;
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index 7671318..b9fef23 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -22,7 +22,9 @@
 import com.google.common.collect.Range;
 import com.google.errorprone.annotations.Immutable;
 
+import java.io.IOException;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -173,6 +175,45 @@
   }
 
   /**
+   * This method is copied from Hive's org.apache.hadoop.hive.ql.io.AcidUtils.java
+   * (commit hash 17ac1d9f230b8d663c09c22016753012a9b91edf). It is used to generate
+   * the ACID directory names to be added to the insert events on the transactional
+   * tables.
+   */
+  //Get the first level acid directory (if any) from a given path
+  public static String getFirstLevelAcidDirPath(Path dataPath, FileSystem fileSystem)
+      throws IOException {
+    if (dataPath == null) {
+      return null;
+    }
+    String firstLevelAcidDir = getAcidSubDir(dataPath);
+    if (firstLevelAcidDir != null) {
+      return firstLevelAcidDir;
+    }
+
+    String acidDirPath = getFirstLevelAcidDirPath(dataPath.getParent(), fileSystem);
+    if (acidDirPath == null) {
+      return null;
+    }
+
+    // We need the path for directory so no need to append file name
+    if (fileSystem.isDirectory(dataPath)) {
+      return acidDirPath + Path.SEPARATOR + dataPath.getName();
+    }
+    return acidDirPath;
+  }
+
+  private static String getAcidSubDir(Path dataPath) {
+    String dataDir = dataPath.getName();
+    if (dataDir.startsWith("base_")
+        || dataDir.startsWith("delta_")
+        || dataDir.startsWith("delete_delta_")) {
+      return dataDir;
+    }
+    return null;
+  }
+
+  /**
    * Predicate that checks if the file or directory is relevant for a given WriteId list.
    * The class does not implement a Predicate interface since we want to support a strict
    * mode which throw exception in certain cases.
diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
index 81b62f6..db5655b 100644
--- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java
@@ -25,11 +25,14 @@
 import java.util.Map;
 
 import java.util.Collection;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
@@ -321,33 +324,39 @@
     return msTbl.getSd().getNumBuckets() > 0;
   }
 
-  /**
-   * A helper class that encapsulates all the information needed to fire and insert event
-   * with HMS.
-   */
-  public static class InsertEventInfo {
-    // List of partition values corresponding to the partition keys in
-    // a partitioned table. This is null for non-partitioned table.
-    private List<String> partVals;
+  public static class TableInsertEventInfo {
+    // list of partition level insert event info
+    private final List<InsertEventRequestData> insertEventRequestData_;
+    // transaction id in case this represents a transactional table.
+    private final long txnId_;
+    // writeId in case this is for a transactional table.
+    private final long writeId_;
+    // true in case this is for transactional table.
+    private final boolean isTransactional_;
 
-    // Set of all the 'new' files added by this insert. This is empty in
-    // case of insert overwrite.
-    private Collection<String> newFiles;
-
-    // If true, sets the 'replace' flag to true indicating that the
-    // operation was an insert overwrite in the notification log. Will set the same to
-    // false otherwise.
-    private boolean isOverwrite;
-
-    public InsertEventInfo(
-        List<String> partVals, Collection<String> newFiles, boolean isOverwrite) {
-      this.partVals = partVals;
-      this.newFiles = newFiles;
-      this.isOverwrite = isOverwrite;
+    public TableInsertEventInfo(
+        List<InsertEventRequestData> insertEventInfos_, boolean isTransactional,
+        long txnId, long writeId) {
+      this.insertEventRequestData_ = insertEventInfos_;
+      this.txnId_ = txnId;
+      this.writeId_ = writeId;
+      this.isTransactional_ = isTransactional;
     }
 
-    public List<String> getPartVals() { return this.partVals; }
-    public Collection<String> getNewFiles() { return this.newFiles; }
-    public boolean isOverwrite() { return this.isOverwrite; }
+    public boolean isTransactional() {
+      return isTransactional_;
+    }
+
+    public List<InsertEventRequestData> getInsertEventReqData() {
+      return insertEventRequestData_;
+    }
+
+    public long getTxnId() {
+      return txnId_;
+    }
+
+    public long getWriteId() {
+      return writeId_;
+    }
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 338d6ff..27e1ab1 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -33,6 +33,7 @@
 import com.google.common.collect.Iterables;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -46,12 +47,16 @@
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -59,6 +64,9 @@
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.HdfsUri;
 import org.apache.impala.authorization.NoopAuthorizationFactory;
@@ -80,6 +88,7 @@
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
 import org.apache.impala.catalog.events.MetastoreEvents.AlterTableEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.InsertEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType;
@@ -114,6 +123,7 @@
 import org.apache.impala.thrift.TColumnType;
 import org.apache.impala.thrift.TCreateDbParams;
 import org.apache.impala.thrift.TCreateFunctionParams;
+import org.apache.impala.thrift.TCreateTableLikeParams;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.TDdlExecRequest;
 import org.apache.impala.thrift.TDdlType;
@@ -138,7 +148,6 @@
 import org.apache.impala.thrift.TTypeNodeType;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
-import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -720,50 +729,148 @@
    * should be treated as self-event.
    */
   @Test
-  public void testInsertFromImpala() throws ImpalaException {
+  public void testInsertFromImpala() throws Exception {
     Assume.assumeTrue("Skipping this test because it only works with Hive-3 or greater",
         TestUtils.getHiveMajorVersion() >= 3);
     // Test insert into multiple partitions
     createDatabaseFromImpala(TEST_DB_NAME, null);
     String tableToInsertPart = "tbl_with_mul_part";
-    createTableFromImpala(TEST_DB_NAME, tableToInsertPart, true);
     String tableToInsertMulPart = "tbl_to_insert_mul_part";
-    createTableFromImpala(TEST_DB_NAME, tableToInsertMulPart, true);
-    // add first partition
-    TPartitionDef partitionDef = new TPartitionDef();
-    partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "1"));
-    partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "100"));
-    alterTableAddPartition(TEST_DB_NAME, tableToInsertPart, partitionDef);
-    alterTableAddPartition(TEST_DB_NAME, tableToInsertMulPart, partitionDef);
-    // add second partition
-    partitionDef = new TPartitionDef();
-    partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "1"));
-    partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "200"));
-    alterTableAddPartition(TEST_DB_NAME, tableToInsertPart, partitionDef);
-    alterTableAddPartition(TEST_DB_NAME, tableToInsertMulPart, partitionDef);
-    eventsProcessor_.processEvents();
+    createInsertTestTbls(tableToInsertPart, tableToInsertMulPart);
     // count self event from here, numberOfSelfEventsBefore=4 as we have 4 ADD PARTITION
     // events
     long numberOfSelfEventsBefore =
         eventsProcessor_.getMetrics()
             .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS)
             .getCount();
+    runInsertTest(tableToInsertPart, tableToInsertMulPart, numberOfSelfEventsBefore,
+        false);
+  }
+
+  @Test
+  public void testInsertOverwriteFromImpala() throws Exception {
+    Assume.assumeTrue("Skipping this test because it only works with Hive-3 or greater",
+        TestUtils.getHiveMajorVersion() >= 3);
+    // Test insert into multiple partitions
+    createDatabaseFromImpala(TEST_DB_NAME, null);
+    String tableToInsertPart = "tbl_with_mul_part";
+    String tableToInsertMulPart = "tbl_to_insert_mul_part";
+    createInsertTestTbls(tableToInsertPart, tableToInsertMulPart);
+    // count self event from here, numberOfSelfEventsBefore=4 as we have 4 ADD PARTITION
+    // events
+    long numberOfSelfEventsBefore =
+        eventsProcessor_.getMetrics()
+            .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS)
+            .getCount();
+    runInsertTest(tableToInsertPart, tableToInsertMulPart, numberOfSelfEventsBefore,
+        true);
+  }
+
+  private void createInsertTestTbls(String tableToInsertPart, String tableToInsertMulPart)
+      throws Exception {
+    createTableLike("functional", "alltypes", TEST_DB_NAME, tableToInsertPart);
+    createTableLike("functional", "alltypes", TEST_DB_NAME, tableToInsertMulPart);
+    // add first partition
+    TPartitionDef partitionDef = new TPartitionDef();
+    partitionDef.addToPartition_spec(new TPartitionKeyValue("year", "2009"));
+    partitionDef.addToPartition_spec(new TPartitionKeyValue("month", "1"));
+    alterTableAddPartition(TEST_DB_NAME, tableToInsertPart, partitionDef);
+    alterTableAddPartition(TEST_DB_NAME, tableToInsertMulPart, partitionDef);
+    // add second partition
+    partitionDef = new TPartitionDef();
+    partitionDef.addToPartition_spec(new TPartitionKeyValue("year", "2009"));
+    partitionDef.addToPartition_spec(new TPartitionKeyValue("month", "2"));
+    alterTableAddPartition(TEST_DB_NAME, tableToInsertPart, partitionDef);
+    alterTableAddPartition(TEST_DB_NAME, tableToInsertMulPart, partitionDef);
+
+    HdfsTable allTypes = (HdfsTable) catalog_
+        .getOrLoadTable("functional", "alltypes", "test", null);
+    HdfsTable insertTbl = (HdfsTable) catalog_
+        .getOrLoadTable(TEST_DB_NAME, tableToInsertPart, "test", null);
+    HdfsTable multiInsertTbl = (HdfsTable) catalog_
+        .getOrLoadTable(TEST_DB_NAME, tableToInsertMulPart, "test", null);
+    // copy files from the source tables so that we have some data
+    copyFiles(allTypes.getFileSystem(),
+        new Path(allTypes.getHdfsBaseDir() + "/year=2009/month=1"),
+        insertTbl.getFileSystem(),
+        new Path(insertTbl.getHdfsBaseDir() + "/year=2009/month=1"), true, null);
+    copyFiles(allTypes.getFileSystem(),
+        new Path(allTypes.getHdfsBaseDir() + "/year=2009/month=2"),
+        insertTbl.getFileSystem(),
+        new Path(insertTbl.getHdfsBaseDir() + "/year=2009/month=2"), true, null);
+    copyFiles(allTypes.getFileSystem(),
+        new Path(allTypes.getHdfsBaseDir() + "/year=2009/month=1"),
+        multiInsertTbl.getFileSystem(),
+        new Path(multiInsertTbl.getHdfsBaseDir() + "/year=2009/month=1"), true, null);
+    copyFiles(allTypes.getFileSystem(),
+        new Path(allTypes.getHdfsBaseDir() + "/year=2009/month=2"),
+        multiInsertTbl.getFileSystem(),
+        new Path(multiInsertTbl.getHdfsBaseDir() + "/year=2009/month=2"), true, null);
+    // load the created tables
+    catalog_.reloadTable(multiInsertTbl, "test");
+    catalog_.reloadTable(insertTbl, "test");
+    eventsProcessor_.processEvents();
+  }
+
+  private void runInsertTest(String tableToInsertPart, String tableToInsertMulPart,
+      long numberOfSelfEventsBefore, boolean overwrite) throws Exception {
     // insert into partition
-    insertFromImpala(tableToInsertPart, true, "1", "100");
-    insertFromImpala(tableToInsertPart, true, "1", "200");
+    HdfsTable allTypes = (HdfsTable) catalog_
+        .getOrLoadTable("functional", "alltypes", "test", null);
+    HdfsTable insertTbl = (HdfsTable) catalog_
+        .getOrLoadTable(TEST_DB_NAME, tableToInsertPart, "test", null);
+    HdfsTable multiInsertTbl = (HdfsTable) catalog_
+        .getOrLoadTable(TEST_DB_NAME, tableToInsertMulPart, "test", null);
+    // we copy files from the src tbl and then issue a insert catalogOp to simulate a
+    // insert operation
+    List<String> tbl1Part1Files = copyFiles(allTypes.getFileSystem(),
+        new Path(allTypes.getHdfsBaseDir() + "/year=2009/month=1"),
+        insertTbl.getFileSystem(),
+        new Path(insertTbl.getHdfsBaseDir() + "/year=2009/month=1"), overwrite, "copy_");
+    List<String> tbl1Part2Files = copyFiles(allTypes.getFileSystem(),
+        new Path(allTypes.getHdfsBaseDir() + "/year=2009/month=2"),
+        insertTbl.getFileSystem(),
+        new Path(insertTbl.getHdfsBaseDir() + "/year=2009/month=2"), overwrite, "copy_");
+    List<String> tbl2Part1Files = copyFiles(allTypes.getFileSystem(),
+        new Path(allTypes.getHdfsBaseDir() + "/year=2009/month=1"),
+        multiInsertTbl.getFileSystem(),
+        new Path(multiInsertTbl.getHdfsBaseDir() + "/year=2009/month=1"), overwrite,
+        "copy_");
+    List<String> tbl2Part2Files = copyFiles(allTypes.getFileSystem(),
+        new Path(allTypes.getHdfsBaseDir() + "/year=2009/month=2"),
+        multiInsertTbl.getFileSystem(),
+        new Path(multiInsertTbl.getHdfsBaseDir() + "/year=2009/month=2"), overwrite,
+        "copy_");
+    insertFromImpala(tableToInsertPart, true, "year=2009", "month=1", overwrite);
+    insertFromImpala(tableToInsertPart, true, "year=2009", "month=2", overwrite);
     // insert into multiple partition
     Set<String> created_partitions = new HashSet<String>();
-    String partition1 = "p1=1/p2=100/";
-    String partition2 = "p1=1/p2=200/";
+    String partition1 = "year=2009/month=1/";
+    String partition2 = "year=2009/month=2/";
     created_partitions.add(partition1);
     created_partitions.add(partition2);
-    insertMulPartFromImpala(tableToInsertMulPart, tableToInsertPart, created_partitions);
+    insertMulPartFromImpala(tableToInsertMulPart, tableToInsertPart, created_partitions,
+        overwrite);
+    // we expect 3 INSERT events (2 for the insertTbl and 1 for multiInsertTbl)
+    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    assertEquals(4, events.size());
+    assertEquals(tbl1Part1Files, getFilesFromEvent(events.get(0)));
+    assertEquals(tbl1Part2Files, getFilesFromEvent(events.get(1)));
+    assertEquals(tbl2Part1Files, getFilesFromEvent(events.get(2)));
+    assertEquals(tbl2Part2Files, getFilesFromEvent(events.get(3)));
     eventsProcessor_.processEvents();
 
     // Test insert into table
-    String tableToInsert = "tbl_to_insert";
-    createTableFromImpala(TEST_DB_NAME, tableToInsert, false);
-    insertFromImpala(tableToInsert, false, "", "");
+    String unpartitionedTbl = "tbl_to_insert";
+    createTableLike("functional", "tinytable", TEST_DB_NAME, unpartitionedTbl);
+    HdfsTable tinyTable = (HdfsTable) catalog_
+        .getOrLoadTable("functional", "tinytable", "test", null);
+    HdfsTable unpartTable =
+        (HdfsTable) catalog_.getOrLoadTable(TEST_DB_NAME, unpartitionedTbl, "test", null);
+    copyFiles(tinyTable.getFileSystem(), new Path(tinyTable.getHdfsBaseDir()),
+        unpartTable.getFileSystem(), new Path(unpartTable.getHdfsBaseDir()), overwrite,
+        "copy_");
+    insertFromImpala(unpartitionedTbl, false, "", "", overwrite);
     eventsProcessor_.processEvents();
 
     long selfEventsCountAfter =
@@ -776,6 +883,50 @@
         numberOfSelfEventsBefore + 5, selfEventsCountAfter);
   }
 
+  private List<String> getFilesFromEvent(NotificationEvent event) {
+    assertEquals("INSERT", event.getEventType());
+    List<String> files= new ArrayList<>();
+    for (String f : MetastoreEventsProcessor.getMessageDeserializer()
+        .getInsertMessage(event.getMessage()).getFiles()) {
+      // Metastore InsertMessage appends "###" for some reason. Ignoring that bit in the
+      // comparison below.
+      files.add(f.replaceAll("###", ""));
+    }
+    return files;
+  }
+
+  private static final Configuration CONF = new Configuration();
+
+  private static List<String> copyFiles(FileSystem srcFs, Path src, FileSystem destFs,
+      Path dest, boolean overwrite, String prefix) throws Exception {
+    FSDataOutputStream out = null;
+    try {
+      if (srcFs.isDirectory(src)) {
+        if (!destFs.exists(dest)) {
+          destFs.mkdirs(dest);
+        } else if(overwrite) {
+          destFs.delete(dest, true);
+          destFs.mkdirs(dest);
+        }
+      }
+      List<String> filesCopied = new ArrayList<>();
+      RemoteIterator<? extends FileStatus> it = FileSystemUtil
+          .listStatus(srcFs, src, true);
+      while (it.hasNext()) {
+        FileStatus status = it.next();
+        if (status.isDirectory()) continue;
+        InputStream in = srcFs.open(status.getPath());
+        String copyFileName = (prefix == null ? "" : prefix) + status.getPath().getName();
+        out = destFs.create(new Path(dest, copyFileName), false);
+        IOUtils.copyBytes(in, out, CONF, true);
+        filesCopied.add(new Path(dest, copyFileName).toString());
+      }
+      return filesCopied;
+    } catch (IOException ex) {
+      IOUtils.closeStream(out);
+      throw ex;
+    }
+  }
   /**
    * Test generates a sequence of create_table, insert and drop_table in the event stream
    * to make sure when the insert event is processed on a removed table, it doesn't cause
@@ -900,10 +1051,14 @@
     List <String> newFiles = addFilesToDirectory(parentPath, "testFile.",
         totalNumberOfFilesToAdd, isOverwrite);
     try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
-      List<InsertEventInfo> insertEventInfos = new ArrayList<>();
-      insertEventInfos.add(new InsertEventInfo(null, newFiles, isOverwrite));
+      List<InsertEventRequestData> partitionInsertEventInfos = new ArrayList<>();
+      InsertEventRequestData insertEventRequestData = new InsertEventRequestData();
+      insertEventRequestData.setFilesAdded(newFiles);
+      insertEventRequestData.setReplace(isOverwrite);
+      partitionInsertEventInfos
+          .add(insertEventRequestData);
       MetastoreShim.fireInsertEventHelper(metaStoreClient.getHiveClient(),
-          insertEventInfos, msTbl.getDbName(), msTbl.getTableName());
+          partitionInsertEventInfos, msTbl.getDbName(), msTbl.getTableName());
     }
   }
 
@@ -2436,6 +2591,19 @@
     catalogOpExecutor_.execDdlRequest(req);
   }
 
+  private void createTableLike(String srcDb, String srcTbl, String destDb, String destTbl)
+      throws Exception {
+    HdfsTable table = (HdfsTable) catalog_.getOrLoadTable(srcDb, srcTbl, "Test", null);
+    TCreateTableLikeParams createTableLikeParams = new TCreateTableLikeParams();
+    createTableLikeParams.setSrc_table_name(new TTableName(srcDb, srcTbl));
+    createTableLikeParams.setTable_name(new TTableName(destDb, destTbl));
+    createTableLikeParams.setIs_external(false);
+    TDdlExecRequest req = new TDdlExecRequest();
+    req.setDdl_type(TDdlType.CREATE_TABLE_LIKE);
+    req.create_table_like_params = createTableLikeParams;
+    catalogOpExecutor_.execDdlRequest(req);
+  }
+
   private void createTableFromImpala(String dbName, String tblName, boolean isPartitioned)
       throws ImpalaException {
     createTableFromImpala(dbName, tblName, null, isPartitioned);
@@ -2781,11 +2949,11 @@
    * Insert multiple partitions into table from Impala
    */
   private void insertMulPartFromImpala(String tblName1, String tblName2,
-      Set<String> created_partitions) throws ImpalaException {
+      Set<String> created_partitions, boolean overwrite) throws ImpalaException {
     String insert_mul_part = String.format(
         "insert into table %s partition(p1, p2) select * from %s", tblName1, tblName2);
     TUpdateCatalogRequest testInsertRequest = createTestTUpdateCatalogRequest(
-        TEST_DB_NAME, tblName1, insert_mul_part, created_partitions);
+        TEST_DB_NAME, tblName1, insert_mul_part, created_partitions, overwrite);
     catalogOpExecutor_.updateCatalog(testInsertRequest);
   }
 
@@ -2796,16 +2964,16 @@
    * @return
    */
   private void insertFromImpala(String tblName, boolean isPartitioned, String p1val,
-      String p2val) throws ImpalaException {
-    String partition = String.format("partition (p1=%s, p2='%s')", p1val, p2val);
+      String p2val, boolean isOverwrite) throws ImpalaException {
+    String partition = String.format("partition (%s, %s)", p1val, p2val);
     String test_insert_tbl = String.format("insert into table %s %s values ('a','aa') ",
         tblName, isPartitioned ? partition : "");
-    Set<String> created_partitions = new HashSet<String>();
+    Set<String> created_partitions = new HashSet<>();
     String created_part_str =
-        isPartitioned ? String.format("p1=%s/p2=%s/", p1val, p2val) : "";
+        isPartitioned ? String.format("%s/%s/", p1val, p2val) : "";
     created_partitions.add(created_part_str);
     TUpdateCatalogRequest testInsertRequest = createTestTUpdateCatalogRequest(
-        TEST_DB_NAME, tblName, test_insert_tbl, created_partitions);
+        TEST_DB_NAME, tblName, test_insert_tbl, created_partitions, isOverwrite);
     catalogOpExecutor_.updateCatalog(testInsertRequest);
   }
 
@@ -2818,13 +2986,15 @@
    * @return
    */
   private TUpdateCatalogRequest createTestTUpdateCatalogRequest(String dBName,
-      String tableName, String redacted_sql_stmt, Set<String> created_partitions) {
+      String tableName, String redacted_sql_stmt, Set<String> created_partitions,
+      boolean isOverwrite) {
     TUpdateCatalogRequest tUpdateCatalogRequest = new TUpdateCatalogRequest();
     tUpdateCatalogRequest.setDb_name(dBName);
     tUpdateCatalogRequest.setTarget_table(tableName);
     tUpdateCatalogRequest.setCreated_partitions((created_partitions));
     tUpdateCatalogRequest.setHeader(new TCatalogServiceRequestHeader());
     tUpdateCatalogRequest.getHeader().setRedacted_sql_stmt(redacted_sql_stmt);
+    if (isOverwrite) tUpdateCatalogRequest.setIs_overwrite(true);
     return tUpdateCatalogRequest;
   }
 
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
index f71dfc6..b8c4474 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/SynchronousHMSEventProcessorForTests.java
@@ -17,8 +17,15 @@
 
 package org.apache.impala.catalog.events;
 
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.thrift.TException;
 
 /**
  * A test MetastoreEventProcessor which executes in the same thread. Useful for testing
@@ -35,4 +42,29 @@
   public void startScheduler() {
     // nothing to do here; there is no background thread for this processor
   }
+
+  public List<NotificationEvent> getNextNotificationEvents(long eventId)
+      throws MetastoreNotificationFetchException {
+    try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      // fetch the current notification event id. We assume that the polling interval
+      // is small enough that most of these polling operations result in zero new
+      // events. In such a case, fetching current notification event id is much faster
+      // (and cheaper on HMS side) instead of polling for events directly
+      CurrentNotificationEventId currentNotificationEventId =
+          msClient.getHiveClient().getCurrentNotificationEventId();
+      long currentEventId = currentNotificationEventId.getEventId();
+
+      // no new events since we last polled
+      if (currentEventId <= eventId) {
+        return Collections.emptyList();
+      }
+
+      NotificationEventResponse response = msClient.getHiveClient()
+          .getNextNotification(eventId, 1000, null);
+      return response.getEvents();
+    } catch (TException e) {
+      throw new MetastoreNotificationFetchException(
+          "Unable to fetch notifications from metastore", e);
+    }
+  }
 }
diff --git a/fe/src/test/resources/hive-site.xml.py b/fe/src/test/resources/hive-site.xml.py
index 3fc6a80..c14f982 100644
--- a/fe/src/test/resources/hive-site.xml.py
+++ b/fe/src/test/resources/hive-site.xml.py
@@ -56,6 +56,12 @@
   # TODO(todd): should we be enabling stats autogather?
   'hive.stats.autogather': 'false',
   'hive.support.concurrency': 'true',
+  # There are tests which issue Hive's replication command. The repl
+  # dump command will wait in case there are any open transactions.
+  # The default value of this configuration is 1h which will block
+  # the test for long time. Overriding this configuration helps with
+  # the runtime of the test.
+  'hive.repl.bootstrap.dump.open.txn.timeout': '120s'
 })
 
 if variant == 'changed_external_dir':
diff --git a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py
index d88e388..47510ae 100644
--- a/tests/custom_cluster/test_event_processing.py
+++ b/tests/custom_cluster/test_event_processing.py
@@ -187,6 +187,179 @@
     self.__run_self_events_test(unique_database, True)
     self.__run_self_events_test(unique_database, False)
 
+  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
+  def test_event_based_replication(self):
+    self.__run_event_based_replication_tests()
+
+  def __run_event_based_replication_tests(self, transactional=True):
+    """Hive Replication relies on the insert events generated on the tables.
+    This test issues some basic replication commands from Hive and makes sure
+    that the replicated table has correct data."""
+    TBLPROPERTIES = self.__get_transactional_tblproperties(transactional)
+    source_db = self.__get_random_name("repl_source_")
+    target_db = self.__get_random_name("repl_target_")
+    unpartitioned_tbl = "unpart_tbl"
+    partitioned_tbl = "part_tbl"
+    try:
+      self.run_stmt_in_hive("create database {0}".format(source_db))
+      self.run_stmt_in_hive(
+        "alter database {0} set dbproperties ('repl.source.for'='xyz')".format(source_db))
+      # explicit create table command since create table like doesn't allow tblproperties
+      self.client.execute("create table {0}.{1} (a string, b string) stored as parquet"
+        " {2}".format(source_db, unpartitioned_tbl, TBLPROPERTIES))
+      EventProcessorUtils.wait_for_event_processing(self)
+      self.client.execute(
+        "create table {0}.{1} (id int, bool_col boolean, tinyint_col tinyint, "
+        "smallint_col smallint, int_col int, bigint_col bigint, float_col float, "
+        "double_col double, date_string string, string_col string, "
+        "timestamp_col timestamp) partitioned by (year int, month int) stored as parquet"
+        " {2}".format(source_db, partitioned_tbl, TBLPROPERTIES))
+
+      # case I: insert
+      # load the table with some data from impala, this also creates new partitions.
+      self.client.execute("insert into {0}.{1}"
+        " select * from functional.tinytable".format(source_db,
+          unpartitioned_tbl))
+      self.client.execute("insert into {0}.{1} partition(year,month)"
+        " select * from functional_parquet.alltypessmall".format(
+          source_db, partitioned_tbl))
+      rows_in_unpart_tbl = int(self.execute_scalar(
+        "select count(*) from {0}.{1}".format(source_db, unpartitioned_tbl)).split('\t')[
+        0])
+      rows_in_part_tbl = int(self.execute_scalar(
+        "select count(*) from {0}.{1}".format(source_db, partitioned_tbl)).split('\t')[0])
+      assert rows_in_unpart_tbl > 0
+      assert rows_in_part_tbl > 0
+      # bootstrap the replication
+      self.run_stmt_in_hive("repl dump {0}".format(source_db))
+      # create a target database where tables will be replicated
+      self.client.execute("create database {0}".format(target_db))
+      # replicate the table from source to target
+      self.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
+      EventProcessorUtils.wait_for_event_processing(self)
+      assert unpartitioned_tbl in self.client.execute(
+        "show tables in {0}".format(target_db)).get_data()
+      assert partitioned_tbl in self.client.execute(
+        "show tables in {0}".format(target_db)).get_data()
+      # confirm the number of rows in target match with the source table.
+      rows_in_unpart_tbl_target = int(self.execute_scalar(
+        "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
+          .split('\t')[0])
+      rows_in_part_tbl_target = int(self.execute_scalar(
+        "select count(*) from {0}.{1}".format(target_db, partitioned_tbl))
+          .split('\t')[0])
+      assert rows_in_unpart_tbl == rows_in_unpart_tbl_target
+      assert rows_in_part_tbl == rows_in_part_tbl_target
+
+      # case II: insert into existing partitions.
+      self.client.execute("insert into {0}.{1}"
+        " select * from functional.tinytable".format(
+          source_db, unpartitioned_tbl))
+      self.client.execute("insert into {0}.{1} partition(year,month)"
+        " select * from functional_parquet.alltypessmall".format(
+          source_db, partitioned_tbl))
+      self.run_stmt_in_hive("repl dump {0}".format(source_db))
+      # replicate the table from source to target
+      self.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
+      # we wait until the events catch up in case repl command above did some HMS
+      # operations.
+      EventProcessorUtils.wait_for_event_processing(self)
+      # confirm the number of rows in target match with the source table.
+      rows_in_unpart_tbl_target = int(self.execute_scalar(
+        "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
+          .split('\t')[0])
+      rows_in_part_tbl_target = int(self.execute_scalar(
+        "select count(*) from {0}.{1}".format(target_db, partitioned_tbl)).split('\t')[0])
+      assert 2 * rows_in_unpart_tbl == rows_in_unpart_tbl_target
+      assert 2 * rows_in_part_tbl == rows_in_part_tbl_target
+
+      # Case III: insert overwrite
+      # impala does a insert overwrite of the tables.
+      self.client.execute("insert overwrite table {0}.{1}"
+        " select * from functional.tinytable".format(
+          source_db, unpartitioned_tbl))
+      self.client.execute("insert overwrite table {0}.{1} partition(year,month)"
+        " select * from functional_parquet.alltypessmall".format(
+          source_db, partitioned_tbl))
+      self.run_stmt_in_hive("repl dump {0}".format(source_db))
+      # replicate the table from source to target
+      self.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
+      # we wait until the events catch up in case repl command above did some HMS
+      # operations.
+      EventProcessorUtils.wait_for_event_processing(self)
+      # confirm the number of rows in target match with the source table.
+      rows_in_unpart_tbl_target = int(self.execute_scalar(
+        "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
+          .split('\t')[0])
+      rows_in_part_tbl_target = int(self.execute_scalar(
+        "select count(*) from {0}.{1}".format(target_db, partitioned_tbl)).split('\t')[0])
+      assert rows_in_unpart_tbl == rows_in_unpart_tbl_target
+      assert rows_in_part_tbl == rows_in_part_tbl_target
+
+      # Case IV: CTAS which creates a transactional table.
+      self.client.execute(
+        "create table {0}.insertonly_nopart_ctas {1} as "
+        "select * from {0}.{2}".format(source_db, TBLPROPERTIES, unpartitioned_tbl))
+      self.client.execute(
+        "create table {0}.insertonly_part_ctas partitioned by (year, month) {1}"
+        " as select * from {0}.{2}".format(source_db, TBLPROPERTIES, partitioned_tbl))
+      self.run_stmt_in_hive("repl dump {0}".format(source_db))
+      # replicate the table from source to target
+      self.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
+      # we wait until the events catch up in case repl command above did some HMS
+      # operations.
+      EventProcessorUtils.wait_for_event_processing(self)
+      # confirm the number of rows in target match with the source table.
+      rows_in_unpart_tbl_source = int(self.execute_scalar("select count(*) from "
+        "{0}.insertonly_nopart_ctas".format(source_db)).split('\t')[0])
+      rows_in_unpart_tbl_target = int(self.execute_scalar("select count(*) from "
+          "{0}.insertonly_nopart_ctas".format(target_db)).split('\t')[0])
+      assert rows_in_unpart_tbl_source == rows_in_unpart_tbl_target
+      rows_in_unpart_tbl_source = int(self.execute_scalar("select count(*) from "
+        "{0}.insertonly_part_ctas".format(source_db)).split('\t')[0])
+      rows_in_unpart_tbl_target = int(self.execute_scalar("select count(*) from "
+        "{0}.insertonly_part_ctas".format(target_db)).split('\t')[0])
+      assert rows_in_unpart_tbl_source == rows_in_unpart_tbl_target
+
+      # Case V: truncate table
+      # impala truncates both the tables. Make sure replication sees that.
+      self.client.execute("truncate table {0}.{1}".format(source_db, unpartitioned_tbl))
+      self.client.execute("truncate table {0}.{1}".format(source_db, partitioned_tbl))
+      self.run_stmt_in_hive("repl dump {0}".format(source_db))
+      # replicate the table from source to target
+      self.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
+      # we wait until the events catch up in case repl command above did some HMS
+      # operations.
+      EventProcessorUtils.wait_for_event_processing(self)
+      # confirm the number of rows in target match with the source table.
+      rows_in_unpart_tbl_target = int(self.execute_scalar(
+        "select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
+          .split('\t')[0])
+      rows_in_part_tbl_target = int(self.execute_scalar(
+        "select count(*) from {0}.{1}".format(target_db, partitioned_tbl)).split('\t')[0])
+      assert rows_in_unpart_tbl_target == 0
+      assert rows_in_part_tbl_target == 0
+    finally:
+      src_db = self.__get_db_nothrow(source_db)
+      target_db_obj = self.__get_db_nothrow(target_db)
+      if src_db is not None:
+        self.run_stmt_in_hive(
+          "alter database {0} set dbproperties ('repl.source.for'='')".format(source_db))
+        self.run_stmt_in_hive("drop database if exists {0} cascade".format(source_db))
+      if target_db_obj is not None:
+        self.run_stmt_in_hive("drop database if exists {0} cascade".format(target_db))
+      # workaround for HIVE-24135. the managed db location doesn't get cleaned up
+      if src_db is not None and src_db.managedLocationUri is not None:
+        self.filesystem_client.delete_file_dir(src_db.managedLocationUri, True)
+      if target_db_obj is not None and target_db_obj.managedLocationUri is not None:
+        self.filesystem_client.delete_file_dir(target_db.src_db.managedLocationUri, True)
+
+  def __get_db_nothrow(self, name):
+    try:
+      return self.hive_client.get_database(name)
+    except:
+      return None
+
   def _run_test_empty_partition_events(self, unique_database, is_transactional):
     test_tbl = unique_database + ".test_events"
     TBLPROPERTIES = self.__get_transactional_tblproperties(is_transactional)
@@ -321,6 +494,7 @@
           # drop non-existing partition; essentially this is a no-op
           "alter table {0}.{1} drop if exists partition (year=2100, month=1)".format(
             db_name, tbl_name),
+          # empty table case where no insert events are generated
           "insert overwrite {0}.{1} select * from {0}.{1}".format(
             db_name, empty_unpartitioned_tbl),
           "insert overwrite {0}.{1} partition(part) select * from {0}.{1}".format(
@@ -328,10 +502,14 @@
       ]
     }
     if HIVE_MAJOR_VERSION >= 3:
-      # insert into a existing partition; generates INSERT events
+      # insert into a existing partition; generates INSERT self-event
       self_event_test_queries[True].append("insert into table {0}.{1} partition "
           "(year, month) select * from functional.alltypessmall where year=2009 "
           "and month=1".format(db_name, tbl2))
+      # insert overwrite query from Impala also generates a INSERT self-event
+      self_event_test_queries[True].append("insert overwrite table {0}.{1} partition "
+         "(year, month) select * from functional.alltypessmall where year=2009 "
+         "and month=1".format(db_name, tbl2))
     return self_event_test_queries
 
   def __get_hive_test_queries(self, db_name, recover_tbl_name):