IMPALA-10076: Reduce partition level update logs

Catalogd and the legacy mode coordinator log catalog topic items one per
line. A table can have more than 100K partitions. It's too verbose to
log all partition level updates. This patch aggregates the partition
level updates of each table and only logs the aggregated info in a line.
For instance, here is an aggregated log:

Collected 24 partition update(s):
1:HDFS_PARTITION:test.tbl:(p=1,p=2,...,p=24), version=1451, original
size=(avg=646, min=646, max=648, sum=15516), compressed size=(avg=478,
min=475, max=485, sum=11487)

If there are more than 3 partitions, we only show the lexicographically
smallest, the second smallest and the largest partition names. If there
is only one partition update, the log format is the same as before:

Collected 1 partition update(s): 1:HDFS_PARTITION:test.tbl:p=0,
version=1451, original size=648, compressed size=475

To support this, this patch implements a helper class,
PartitionMetaSummary, to aggregate this information and produce a summary
for logging. The compressed size is calculated by BE, so the JNI method
for NativeAddPendingTopicItem is extended to return the actual
compressed size.

Tests:
 - Added unit tests for PartitionMetaSummary.
 - Start the cluster in mixed catalog mode and manually verified the
   logs.

Change-Id: Ic48946b2f8b0be1e73988092d03a004836f1b368
Reviewed-on: http://gerrit.cloudera.org:8080/16375
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index b67172b..0148745 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -723,7 +723,7 @@
   }
 }
 
-bool CatalogServer::AddPendingTopicItem(std::string key, int64_t version,
+int CatalogServer::AddPendingTopicItem(std::string key, int64_t version,
     const uint8_t* item_data, uint32_t size, bool deleted) {
   pending_topic_updates_.emplace_back();
   TTopicItem& item = pending_topic_updates_.back();
@@ -732,7 +732,7 @@
     if (!status.ok()) {
       pending_topic_updates_.pop_back();
       LOG(ERROR) << "Error compressing topic item: " << status.GetDetail();
-      return false;
+      return -1;
     }
   } else {
     item.value.assign(reinterpret_cast<const char*>(item_data),
@@ -740,11 +740,13 @@
   }
   item.key = std::move(key);
   item.deleted = deleted;
+  // Skip logging partition items since FE will log their summary (IMPALA-10076).
+  if (item.key.find("HDFS_PARTITION") != string::npos) return item.value.size();
   VLOG(1) << "Collected " << (deleted ? "deletion: " : "update: ") << item.key
           << ", version=" << version << ", original size=" << size
           << (FLAGS_compact_catalog_topic ?
               Substitute(", compressed size=$0", item.value.size()) : string());
-  return true;
+  return item.value.size();
 }
 
 void CatalogServer::MarkServiceAsStarted() { service_started_ = true; }
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 5cb3a2f..959d444 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -76,9 +76,9 @@
   }
   Catalog* catalog() const { return catalog_.get(); }
 
-  /// Add a topic item to pending_topic_updates_. Caller must hold catalog_lock_.
-  /// The return value is true if the operation succeeds and false otherwise.
-  bool AddPendingTopicItem(std::string key, int64_t version, const uint8_t* item_data,
+  /// Adds a topic item to pending_topic_updates_. Caller must hold catalog_lock_.
+  /// Returns the actual size of the item data. Returns a negative value for failures.
+  int AddPendingTopicItem(std::string key, int64_t version, const uint8_t* item_data,
       uint32_t size, bool deleted);
 
   /// Mark service as started. Should be called only after the thrift server hosting this
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 3dd2a0b..0a25997 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -464,7 +464,7 @@
 
 // Add a catalog update to pending_topic_updates_.
 extern "C"
-JNIEXPORT jboolean JNICALL
+JNIEXPORT jint JNICALL
 Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem(JNIEnv* env,
     jclass fe_support_class, jlong native_catalog_server_ptr, jstring key, jlong version,
     jbyteArray serialized_object, jboolean deleted) {
@@ -472,18 +472,18 @@
   {
     JniUtfCharGuard key_str;
     if (!JniUtfCharGuard::create(env, key, &key_str).ok()) {
-      return static_cast<jboolean>(false);
+      return static_cast<jint>(-1);
     }
     key_string.assign(key_str.get());
   }
   JniScopedArrayCritical obj_buf;
   if (!JniScopedArrayCritical::Create(env, serialized_object, &obj_buf)) {
-    return static_cast<jboolean>(false);
+    return static_cast<jint>(-1);
   }
-  reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)->
+  int res = reinterpret_cast<CatalogServer*>(native_catalog_server_ptr)->
       AddPendingTopicItem(std::move(key_string), version, obj_buf.get(),
       static_cast<uint32_t>(obj_buf.size()), deleted);
-  return static_cast<jboolean>(true);
+  return static_cast<jint>(res);
 }
 
 // Get the next catalog update pointed by 'callback_ctx'.
@@ -732,7 +732,7 @@
   },
   {
       const_cast<char*>("NativeAddPendingTopicItem"),
-      const_cast<char*>("(JLjava/lang/String;J[BZ)Z"),
+      const_cast<char*>("(JLjava/lang/String;J[BZ)I"),
       (void*)::Java_org_apache_impala_service_FeSupport_NativeAddPendingTopicItem
   },
   {
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 556192d..8401b3b 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -85,6 +85,7 @@
 DECLARE_int32(num_check_authorization_threads);
 DECLARE_bool(use_customized_user_groups_mapper_for_ranger);
 DECLARE_bool(enable_column_masking);
+DECLARE_bool(compact_catalog_topic);
 
 namespace impala {
 
@@ -172,6 +173,7 @@
   cfg.__set_use_customized_user_groups_mapper_for_ranger(
       FLAGS_use_customized_user_groups_mapper_for_ranger);
   cfg.__set_enable_column_masking(FLAGS_enable_column_masking);
+  cfg.__set_compact_catalog_topic(FLAGS_compact_catalog_topic);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 6d5b548..48ac997 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -157,4 +157,6 @@
   66: required bool enable_column_masking
 
   67: required bool enable_insert_events
+
+  68: required bool compact_catalog_topic
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 116b8dd..0c22e6b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -685,6 +685,11 @@
     }
 
     void addCatalogObject(TCatalogObject obj, boolean delete) throws TException {
+      addCatalogObject(obj, delete, null);
+    }
+
+    void addCatalogObject(TCatalogObject obj, boolean delete,
+        PartitionMetaSummary summary) throws TException {
       String key = Catalog.toCatalogObjectKey(obj);
       if (obj.type != TCatalogObjectType.CATALOG) {
         topicUpdateLog_.add(key,
@@ -696,10 +701,14 @@
       if (topicMode_ == TopicMode.FULL || topicMode_ == TopicMode.MIXED) {
         String v1Key = CatalogServiceConstants.CATALOG_TOPIC_V1_PREFIX + key;
         byte[] data = serializer.serialize(obj);
-        if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, v1Key,
-            obj.catalog_version, data, delete)) {
+        int actualSize = FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr,
+            v1Key, obj.catalog_version, data, delete);
+        if (actualSize < 0) {
           LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v1Key + ", delete="
               + delete + ", data_size=" + data.length);
+        } else if (summary != null && obj.type == HDFS_PARTITION) {
+          summary.update(true, delete, obj.hdfs_partition.partition_name,
+              obj.catalog_version, data.length, actualSize);
         }
       }
 
@@ -711,10 +720,14 @@
         if (minimalObject != null) {
           byte[] data = serializer.serialize(minimalObject);
           String v2Key = CatalogServiceConstants.CATALOG_TOPIC_V2_PREFIX + key;
-          if (!FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr, v2Key,
-              obj.catalog_version, data, delete)) {
+          int actualSize = FeSupport.NativeAddPendingTopicItem(nativeCatalogServerPtr,
+              v2Key, obj.catalog_version, data, delete);
+          if (actualSize < 0) {
             LOG.error("NativeAddPendingTopicItem failed in BE. key=" + v2Key + ", delete="
                 + delete + ", data_size=" + data.length);
+          } else if (summary != null && obj.type == HDFS_PARTITION) {
+            summary.update(false, delete, obj.hdfs_partition.partition_name,
+                obj.catalog_version, data.length, actualSize);
           }
         }
       }
@@ -791,6 +804,15 @@
   }
 
   /**
+   * Creates a partition meta summary for the given table name.
+   */
+  private PartitionMetaSummary createPartitionMetaSummary(String tableName) {
+    return new PartitionMetaSummary(tableName, /*inCatalogd*/true,
+        topicMode_ == TopicMode.FULL || topicMode_ == TopicMode.MIXED,
+        topicMode_ == TopicMode.MINIMAL || topicMode_ == TopicMode.MIXED);
+  }
+
+  /**
    * Identifies the catalog objects that were added/modified/deleted in the catalog with
    * versions > 'fromVersion'. It operates on a snaphsot of the catalog without holding
    * the catalog lock which means that other concurrent metadata operations can still make
@@ -847,6 +869,9 @@
         Preconditions.checkState(
             !hdfsTable.has_full_partitions && hdfsTable.has_partition_names,
             /*errorMessage*/hdfsTable);
+        String tblName = removedObject.getTable().db_name + "."
+            + removedObject.getTable().tbl_name;
+        PartitionMetaSummary deleteSummary = createPartitionMetaSummary(tblName);
         for (THdfsPartition part : hdfsTable.partitions.values()) {
           Preconditions.checkState(part.id >= HdfsPartition.INITIAL_PARTITION_ID
               && part.db_name != null
@@ -857,9 +882,10 @@
           removedPart.setHdfs_partition(part);
           if (!ctx.updatedCatalogObjects.contains(
               Catalog.toCatalogObjectKey(removedPart))) {
-            ctx.addCatalogObject(removedPart, true);
+            ctx.addCatalogObject(removedPart, true, deleteSummary);
           }
         }
+        LOG.info(deleteSummary.toString());
       }
     }
     // Each topic update should contain a single "TCatalog" object which is used to
@@ -1318,11 +1344,14 @@
     // statestored restarts).
     if (ctx.isFullUpdate()) hdfsTable.resetMaxSentPartitionId();
 
+    PartitionMetaSummary updateSummary = createPartitionMetaSummary(
+        hdfsTable.getFullName());
+
     // Add updates for new partitions.
     long maxSentId = hdfsTable.getMaxSentPartitionId();
     for (TCatalogObject catalogPart : hdfsTable.getNewPartitionsSinceLastUpdate()) {
       maxSentId = Math.max(maxSentId, catalogPart.getHdfs_partition().getId());
-      ctx.addCatalogObject(catalogPart, false);
+      ctx.addCatalogObject(catalogPart, false, updateSummary);
     }
     hdfsTable.setMaxSentPartitionId(maxSentId);
 
@@ -1330,10 +1359,12 @@
       TCatalogObject removedPart = part.toMinimalTCatalogObject();
       if (!ctx.updatedCatalogObjects.contains(
           Catalog.toCatalogObjectKey(removedPart))) {
-        ctx.addCatalogObject(removedPart, true);
+        ctx.addCatalogObject(removedPart, true, updateSummary);
       }
     }
     hdfsTable.resetDroppedPartitions();
+
+    LOG.info(updateSummary.toString());
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
index e3fa783..54ea1ca 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ImpaladCatalog.java
@@ -206,6 +206,7 @@
     // Maps that group incremental partition updates by table names so we can apply them
     // when updating the table.
     Map<TableName, List<THdfsPartition>> newPartitionsByTable = new HashMap<>();
+    Map<TableName, PartitionMetaSummary> partUpdates = new HashMap<>();
     long newCatalogVersion = lastSyncedCatalogVersion_.get();
     Pair<Boolean, ByteBuffer> update;
     while ((update = FeSupport.NativeGetNextCatalogObjectUpdate(req.native_iterator_ptr))
@@ -219,22 +220,34 @@
         LOG.info("Received large catalog object(>100mb): " + key + " is " + len +
             "bytes");
       }
-      LOG.info((isDelete ? "Deleting: " : "Adding: ") + key + " version: "
-          + obj.catalog_version + " size: " + len);
+      if (!key.contains("HDFS_PARTITION")) {
+        LOG.info((isDelete ? "Deleting: " : "Adding: ") + key + " version: "
+            + obj.catalog_version + " size: " + len);
+      }
       // For statestore updates, the service ID and updated version is wrapped in a
       // CATALOG catalog object.
       if (obj.type == TCatalogObjectType.CATALOG) {
         setCatalogServiceId(obj.catalog.catalog_service_id);
         newCatalogVersion = obj.catalog_version;
-      } else if (obj.type == TCatalogObjectType.HDFS_PARTITION && !isDelete) {
+      } else if (obj.type == TCatalogObjectType.HDFS_PARTITION) {
         TableName tblName = new TableName(obj.getHdfs_partition().db_name,
             obj.getHdfs_partition().tbl_name);
-        newPartitionsByTable.computeIfAbsent(tblName, (s) -> new ArrayList<>())
-            .add(obj.getHdfs_partition());
+        partUpdates.computeIfAbsent(tblName,
+            (s) -> new PartitionMetaSummary(tblName.toString(), /*inCatalogd*/false,
+                /*hasV1Updates*/true, /*hasV2Updates*/false))
+            .update(/*isV1Key*/true, isDelete, obj.hdfs_partition.partition_name,
+                obj.catalog_version, len, /*unused*/-1);
+        if (!isDelete) {
+          newPartitionsByTable.computeIfAbsent(tblName, (s) -> new ArrayList<>())
+              .add(obj.getHdfs_partition());
+        }
       } else {
         sequencer.add(obj, isDelete);
       }
     }
+    for (PartitionMetaSummary summary : partUpdates.values()) {
+      LOG.info(summary.toString());
+    }
 
     for (TCatalogObject catalogObject: sequencer.getUpdatedObjects()) {
       try {
diff --git a/fe/src/main/java/org/apache/impala/catalog/PartitionMetaSummary.java b/fe/src/main/java/org/apache/impala/catalog/PartitionMetaSummary.java
new file mode 100644
index 0000000..aac98e0
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/PartitionMetaSummary.java
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.collect.Iterables;
+import org.apache.impala.service.BackendConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Helper class to get a summary for partition updates of a table when logging topic
+ * updates. We log each topic item except partition items since they are too many.
+ * Catalogd and impalad use this to log a summary of the partition items. See some
+ * examples in PartitionMetaSummaryTest.
+ */
+public class PartitionMetaSummary {
+  private static final String ORIGINAL_SIZE_METRIC = "original-size";
+  private static final String ACTUAL_SIZE_METRIC = "actual-size";
+  private static final String V1 = "v1";
+  private static final String V2 = "v2";
+
+  // Whether we are used in catalogd. If true, toString() will return info about catalog
+  // modes (v1/v2).
+  private final boolean inCatalogd_;
+  private final String fullTableName_;
+
+  // Whether we have v1 updates. Set in the constructor.
+  private final boolean hasV1Updates_;
+  // Whether we have v2 updates. Set in the constructor.
+  private final boolean hasV2Updates_;
+
+  // Number of total updates. If there are both v1 and v2 updates, only counts on v1.
+  private int numUpdatedParts_;
+  // Number of total deletes. If there are both v1 and v2 deletes, only counts on v1.
+  private int numDeletedParts_;
+  // Stores the lexicographically smallest, second smallest and the largest partition
+  // names. Number of the updated partitions could be more than 100K, so we just keep
+  // three of them. For easily picking up the three values together, we use an array with
+  // 3 items instead of 3 individual vars.
+  private final String[] updatedPartNames_ = new String[3];
+  // Same as above but for deleted partitions.
+  private final String[] deletedPartNames_ = new String[3];
+  // All update versions we've seen. In case the impalad is processing multiple updates of
+  // a table at once (e.g. after restart), there could be several versions.
+  private final Set<Long> updateVersions_ = new TreeSet<>();
+  // Same as above but for deletions.
+  private final Set<Long> deleteVersions_ = new TreeSet<>();
+
+  // Aggregated metrics of sizes of partitions, i.e. min, max, avg, sum of the original
+  // and compressed sizes.
+  private final Map<String, Map<String, Metrics>> updateMetrics_ = new HashMap<>();
+  private final Map<String, Map<String, Metrics>> deleteMetrics_ = new HashMap<>();
+
+  public PartitionMetaSummary(String fullTableName, boolean inCatalogd,
+      boolean hasV1Updates, boolean hasV2Updates) {
+    fullTableName_ = fullTableName;
+    inCatalogd_ = inCatalogd;
+    hasV1Updates_ = hasV1Updates;
+    hasV2Updates_ = hasV2Updates;
+    updateMetrics_.put(V1, newMetrics());
+    updateMetrics_.put(V2, newMetrics());
+    deleteMetrics_.put(V1, newMetrics());
+    deleteMetrics_.put(V2, newMetrics());
+  }
+
+  private Map<String, Metrics> newMetrics() {
+    Map<String, Metrics> res = new HashMap<>();
+    res.put(ORIGINAL_SIZE_METRIC, new Metrics());
+    res.put(ACTUAL_SIZE_METRIC, new Metrics());
+    return res;
+  }
+
+  /**
+   * Collects basic info of a partition level topic item.
+   */
+  public void update(boolean isV1Key, boolean delete, String partName, long version,
+      int originalSize, int actualSize) {
+    if (delete) {
+      deleteVersions_.add(version);
+    } else {
+      updateVersions_.add(version);
+    }
+
+    // Update metrics
+    Map<String, Metrics> metrics = delete ?
+        deleteMetrics_.get(isV1Key ? V1 : V2) :
+        updateMetrics_.get(isV1Key ? V1 : V2);
+    metrics.get(ORIGINAL_SIZE_METRIC).update(originalSize);
+    metrics.get(ACTUAL_SIZE_METRIC).update(actualSize);
+
+    // Processing the partition name. Skip processing v2 partition names if we are
+    // processing v1 keys to avoid duplication.
+    if (!isV1Key && hasV1Updates_) return;
+    boolean isFirst;
+    String[] partNames;
+    if (delete) {
+      isFirst = (numDeletedParts_ == 0);
+      numDeletedParts_++;
+      partNames = deletedPartNames_;
+    } else {
+      isFirst = (numUpdatedParts_ == 0);
+      numUpdatedParts_++;
+      partNames = updatedPartNames_;
+    }
+    // Updates the partition names array. partNames[0] is the lexicographically smallest
+    // one. partNames[1] is the second smallest one. partNames[2] is the largest one.
+    if (isFirst) {
+      // Init the lexicographically smallest and largest one as the first partition name.
+      partNames[0] = partName;
+      partNames[1] = null;
+      partNames[2] = partName;
+    } else {
+      // Update the lexicographically smallest and second smallest partition name.
+      if (partName.compareTo(partNames[0]) < 0) {
+        // 'partName' is smaller than the current smallest one. Shift partNames[0] to
+        // partNames[1] and update partNames[0].
+        partNames[1] = partNames[0];
+        partNames[0] = partName;
+      } else if (partNames[1] == null || partName.compareTo(partNames[1]) < 0) {
+        partNames[1] = partName;
+      }
+      // Update the lexicographically largest partition name.
+      if (partNames[2].compareTo(partName) < 0) partNames[2] = partName;
+    }
+  }
+
+  private void appendSummary(String mode, boolean isDelete, StringBuilder res) {
+    int numParts;
+    String[] partNames;
+    Map<String, Metrics> metrics;
+    Set<Long> versions;
+    if (isDelete) {
+      numParts = numDeletedParts_;
+      partNames = deletedPartNames_;
+      metrics = deleteMetrics_.get(mode);
+      versions = deleteVersions_;
+    } else {
+      numParts = numUpdatedParts_;
+      partNames = updatedPartNames_;
+      metrics = updateMetrics_.get(mode);
+      versions = updateVersions_;
+    }
+    if (numParts == 0) return;
+    if (res.length() > 0) res.append("\n");
+    if (inCatalogd_) {
+      res.append(String.format("Collected %d partition %s(s): ",
+          numParts, isDelete ? "deletion" : "update"));
+      res.append(V1.equals(mode) ? "1:" : "2:");
+    } else {
+      res.append(isDelete ? "Deleting " : "Adding ").append(numParts)
+          .append(" partition(s): ");
+    }
+    res.append("HDFS_PARTITION:").append(fullTableName_).append(":");
+    if (numParts > 1) res.append("(");
+    res.append(partNames[0]);
+    if (numParts > 1) res.append(",").append(partNames[1]);
+    if (numParts > 3) res.append(",...");
+    if (numParts > 2) res.append(",").append(partNames[2]);
+    if (numParts > 1) res.append(")");
+    if (versions.size() == 1) {
+      res.append(", version=").append(Iterables.getOnlyElement(versions));
+    } else {
+      res.append(", versions=").append(versions);
+    }
+    res.append(inCatalogd_ ? ", original size=" : ", size=")
+        .append(metrics.get(ORIGINAL_SIZE_METRIC));
+    // Compressed sizes are not collected in impalad.
+    if (inCatalogd_ && BackendConfig.INSTANCE.isCompactCatalogTopic()) {
+      res.append(", compressed size=").append(metrics.get(ACTUAL_SIZE_METRIC));
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder res = new StringBuilder();
+    if (hasV1Updates_) appendSummary(V1, false, res);
+    if (hasV2Updates_) appendSummary(V2, false, res);
+    if (hasV1Updates_) appendSummary(V1, true, res);
+    if (hasV2Updates_) appendSummary(V2, true, res);
+    return res.toString();
+  }
+
+  /**
+   * Metrics that are used in logging. Only aggregated values are stored.
+   */
+  static class Metrics {
+    private long min, max, sum, count;
+
+    public void update(int value) {
+      if (count == 0) {
+        min = max = sum = value;
+      } else {
+        sum += value;
+        max = Math.max(max, value);
+        min = Math.min(min, value);
+      }
+      count++;
+    }
+
+    public long getMean() { return (long) (sum * 1.0 / count); }
+    public long getMin() { return min; }
+    public long getMax() { return max; }
+    public long getSum() { return sum; }
+    public long getCount() { return count; }
+
+    @Override
+    public String toString() {
+      if (count == 0) return "";
+      if (count == 1) return Long.toString(sum);
+      return String.format("(avg=%d, min=%d, max=%d, sum=%d)", getMean(), min,
+          max, sum);
+    }
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index eafc39a..675040c 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -210,6 +210,8 @@
 
   public boolean isColumnMaskingEnabled() { return backendCfg_.enable_column_masking; }
 
+  public boolean isCompactCatalogTopic() { return backendCfg_.compact_catalog_topic; }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index 95ce25b..bbca6bd 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -86,8 +86,8 @@
 
   // Adds a topic item to the backend's pending metadata-topic update.
   // 'serializationBuffer' is a serialized TCatalogObject.
-  // The return value is true if the operation succeeds and false otherwise.
-  public native static boolean NativeAddPendingTopicItem(long nativeCatalogServerPtr,
+  // Returns the actual value size, or -1 if the operation fails.
+  public native static int NativeAddPendingTopicItem(long nativeCatalogServerPtr,
       String key, long version, byte[] serializationBuffer, boolean deleted);
 
   // Get a catalog object update from the backend. A pair of isDeletion flag and
diff --git a/fe/src/test/java/org/apache/impala/catalog/PartitionMetaSummaryTest.java b/fe/src/test/java/org/apache/impala/catalog/PartitionMetaSummaryTest.java
new file mode 100644
index 0000000..b606148
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/PartitionMetaSummaryTest.java
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import org.apache.impala.service.FeSupport;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PartitionMetaSummaryTest {
+
+  @BeforeClass
+  public static void setUp() {
+    FeSupport.loadLibrary();
+  }
+
+  @Test
+  public void testSortingInCatalogd() {
+    PartitionMetaSummary summary = new PartitionMetaSummary("test_db.test_tbl", true,
+        true, false);
+    summary.update(true, false, "p=1", 123, 100, 10);
+    summary.update(true, false, "p=4", 123, 100, 10);
+    summary.update(true, false, "p=3", 123, 100, 10);
+    summary.update(true, false, "p=5", 123, 100, 10);
+    summary.update(true, false, "p=2", 123, 100, 10);
+    assertEquals("Collected 5 partition update(s): " +
+        "1:HDFS_PARTITION:test_db.test_tbl:(p=1,p=2,...,p=5), version=123, " +
+        "original size=(avg=100, min=100, max=100, sum=500), " +
+        "compressed size=(avg=10, min=10, max=10, sum=50)", summary.toString());
+  }
+
+  @Test
+  public void testSorting2InCatalogd() {
+    PartitionMetaSummary summary = new PartitionMetaSummary("test_db.test_tbl", true,
+        false, true);
+    summary.update(false, false, "p=5", 123, 100, 10);
+    summary.update(false, false, "p=4", 123, 100, 10);
+    summary.update(false, false, "p=3", 123, 100, 10);
+    summary.update(false, false, "p=2", 123, 100, 10);
+    summary.update(false, false, "p=1", 123, 100, 10);
+    assertEquals("Collected 5 partition update(s): " +
+        "2:HDFS_PARTITION:test_db.test_tbl:(p=1,p=2,...,p=5), version=123, " +
+        "original size=(avg=100, min=100, max=100, sum=500), " +
+        "compressed size=(avg=10, min=10, max=10, sum=50)", summary.toString());
+  }
+
+  @Test
+  public void testMixModeInCatalogd() {
+    PartitionMetaSummary summary = new PartitionMetaSummary("test_db.test_tbl", true,
+        true, true);
+    summary.update(true, false, "p=2", 123, 100, 10);
+    summary.update(false, false, "p=2", 123, 10, 10);
+    summary.update(true, false, "p=4", 123, 100, 10);
+    summary.update(false, false, "p=4", 123, 10, 10);
+    summary.update(true, false, "p=3", 123, 100, 10);
+    summary.update(false, false, "p=3", 123, 10, 10);
+    summary.update(true, true, "p=0", 121, 100, 10);
+    summary.update(false, true, "p=0", 121, 10, 10);
+    summary.update(true, true, "p=1", 122, 100, 10);
+    summary.update(false, true, "p=1", 122, 10, 10);
+    assertEquals("Collected 3 partition update(s): " +
+        "1:HDFS_PARTITION:test_db.test_tbl:(p=2,p=3,p=4), version=123, " +
+        "original size=(avg=100, min=100, max=100, sum=300), " +
+        "compressed size=(avg=10, min=10, max=10, sum=30)\n" +
+        "Collected 3 partition update(s): " +
+        "2:HDFS_PARTITION:test_db.test_tbl:(p=2,p=3,p=4), version=123, " +
+        "original size=(avg=10, min=10, max=10, sum=30), " +
+        "compressed size=(avg=10, min=10, max=10, sum=30)\n" +
+        "Collected 2 partition deletion(s): " +
+        "1:HDFS_PARTITION:test_db.test_tbl:(p=0,p=1), versions=[121, 122], " +
+        "original size=(avg=100, min=100, max=100, sum=200), " +
+        "compressed size=(avg=10, min=10, max=10, sum=20)\n" +
+        "Collected 2 partition deletion(s): " +
+        "2:HDFS_PARTITION:test_db.test_tbl:(p=0,p=1), versions=[121, 122], " +
+        "original size=(avg=10, min=10, max=10, sum=20), " +
+        "compressed size=(avg=10, min=10, max=10, sum=20)", summary.toString());
+  }
+
+  @Test
+  public void testUpdatesAndDeletesInCatalogd() {
+    PartitionMetaSummary summary = new PartitionMetaSummary("test_db.test_tbl", true,
+        true, false);
+    summary.update(true, false, "p=1", 123, 100, 10);
+    summary.update(true, false, "p=2", 123, 100, 10);
+    summary.update(true, false, "p=3", 123, 100, 10);
+    summary.update(true, false, "p=4", 123, 100, 10);
+    summary.update(true, false, "p=5", 123, 100, 10);
+    summary.update(true, true, "p=0", 200, 100, 10);
+    assertEquals("Collected 5 partition update(s): " +
+        "1:HDFS_PARTITION:test_db.test_tbl:(p=1,p=2,...,p=5), version=123, " +
+        "original size=(avg=100, min=100, max=100, sum=500), " +
+        "compressed size=(avg=10, min=10, max=10, sum=50)\n" +
+        "Collected 1 partition deletion(s): 1:HDFS_PARTITION:test_db.test_tbl:p=0, " +
+        "version=200, original size=100, compressed size=10", summary.toString());
+  }
+
+  @Test
+  public void testSortingInImpalad() {
+    PartitionMetaSummary summary = new PartitionMetaSummary("test_db.test_tbl", false,
+        true, false);
+    summary.update(true, false, "p=1", 123, 100, 10);
+    summary.update(true, false, "p=2", 123, 100, 10);
+    summary.update(true, false, "p=4", 123, 100, 10);
+    summary.update(true, false, "p=3", 123, 100, 10);
+    summary.update(true, true, "p=0", 200, 100, 10);
+    assertEquals("Adding 4 partition(s): " +
+        "HDFS_PARTITION:test_db.test_tbl:(p=1,p=2,...,p=4), version=123, " +
+        "size=(avg=100, min=100, max=100, sum=400)\n" +
+        "Deleting 1 partition(s): HDFS_PARTITION:test_db.test_tbl:p=0, " +
+        "version=200, size=100", summary.toString());
+  }
+}