Add new metrics to record ZNRecord compression count. (#1943)

This PR determines whether a ZK write request is compressed by calling GZipCompressionUtil.isCompressed() on the serialized bytes. This check is indirect and can be inaccurate, so the approach is a deliberate trade-off between accuracy and implementation cost.

Alternatively, the ZkClientMonitor could be passed into the serializer class, which would then report compressed writes internally. However, this would require multiple changes to the serializer interfaces.
Because the serializer interfaces are defined in multiple layers (PathBasedZkSerializer, ZkSerializer), implementing the alternative would be very costly without major refactoring.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
index 67638a6..fcc94e9 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java
@@ -19,12 +19,15 @@
  * under the License.
  */
 
+import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
@@ -32,10 +35,13 @@
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
+import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.util.GZipCompressionUtil;
 import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor;
 import org.apache.helix.zookeeper.zkclient.serialize.BytesPushThroughSerializer;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -133,6 +139,18 @@
     Assert.assertTrue(compressedPaths.contains(idealstatePath));
     Assert.assertTrue(compressedPaths.contains(externalViewPath));
 
+    // Validate that the compressed ZK write count is consistent with the number of writes to the external view node
+    MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
+    ObjectName name =
+        MBeanRegistrar.buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
+            InstanceType.CONTROLLER.name(), ZkClientMonitor.MONITOR_KEY,
+            clusterName + "." + controller.getInstanceName());
+    // The controller ZkClient only writes one compressed node, which is the External View node.
+    long compressCount = (long) beanServer.getAttribute(name, "CompressedZnodeWriteCounter");
+    // Note that since the external view node is updated in every controller pipeline, there can be multiple compressed writes.
+    // However, the total count won't exceed the number of writes to the external view node, i.e. its version + 1 (the version starts from 0).
+    Assert.assertTrue(compressCount >= 1 && compressCount <= zkClient.getStat(externalViewPath).getVersion() + 1);
+
     // clean up
     controller.syncStop();
     for (int i = 0; i < 5; i++) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index cbbdbbd..be72920 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -33,12 +33,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.management.JMException;
-
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
 import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
 import org.apache.helix.zookeeper.datamodel.SessionAwareZNRecord;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
+import org.apache.helix.zookeeper.util.GZipCompressionUtil;
 import org.apache.helix.zookeeper.util.ZNRecordUtil;
 import org.apache.helix.zookeeper.zkclient.annotation.PreFetchChangedData;
 import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallMonitorContext;
@@ -1907,12 +1907,8 @@
     try {
       final byte[] data = serialize(datat, path);
       checkDataSizeLimit(path, data);
-      final Stat stat = (Stat) retryUntilConnected(new Callable<Object>() {
-        @Override
-        public Object call() throws Exception {
-          return getConnection().writeDataReturnStat(path, data, expectedVersion);
-        }
-      });
+      final Stat stat = (Stat) retryUntilConnected(
+          (Callable<Object>) () -> getConnection().writeDataReturnStat(path, data, expectedVersion));
       record(path, data, startT, ZkClientMonitor.AccessType.WRITE);
       return stat;
     } catch (Exception e) {
@@ -1945,19 +1941,17 @@
   }
 
   private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode,
-      final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb,
-      final String expectedSessionId) {
+      final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb, final String expectedSessionId) {
     try {
       retryUntilConnected(() -> {
-        getExpectedZookeeper(expectedSessionId)
-            .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, cb,
-                new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false) {
-                  @Override
-                  protected void doRetry() {
-                    doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb,
-                        expectedSessionId);
-                  }
-                });
+        getExpectedZookeeper(expectedSessionId).create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, cb,
+            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false,
+                GZipCompressionUtil.isCompressed(data)) {
+              @Override
+              protected void doRetry() {
+                doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb, expectedSessionId);
+              }
+            });
         return null;
       });
     } catch (RuntimeException e) {
@@ -1988,12 +1982,11 @@
     try {
       retryUntilConnected(() -> {
         getExpectedZookeeper(expectedSessionId).setData(path, data, version, cb,
-            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT,
-                data == null ? 0 : data.length, false) {
+            new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, data == null ? 0 : data.length,
+                false, GZipCompressionUtil.isCompressed(data)) {
               @Override
               protected void doRetry() {
-                doAsyncSetData(path, data, version, System.currentTimeMillis(), cb,
-                    expectedSessionId);
+                doAsyncSetData(path, data, version, System.currentTimeMillis(), cb, expectedSessionId);
               }
             });
         return null;
@@ -2447,6 +2440,10 @@
     if (_monitor != null) {
       int dataSize = (data != null) ? data.length : 0;
       _monitor.record(path, dataSize, startTimeMilliSec, accessType);
+
+      if (GZipCompressionUtil.isCompressed(data)) {
+        _monitor.increaseZnodeCompressCounter();
+      }
     }
   }
 
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
index ce691e9..22ae047 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncCallMonitorContext.java
@@ -25,6 +25,7 @@
   private final long _startTimeMilliSec;
   private final ZkClientMonitor _monitor;
   private final boolean _isRead;
+  private final boolean _isCompressed;
   private int _bytes;
 
   /**
@@ -32,15 +33,23 @@
    * @param startTimeMilliSec Operation initialization time.
    * @param bytes             The data size in bytes that is involved in the operation.
    * @param isRead            True if the operation is readonly.
+   * @param isCompressed      True if the data is compressed.
    */
   public ZkAsyncCallMonitorContext(final ZkClientMonitor monitor, long startTimeMilliSec, int bytes,
-      boolean isRead) {
+      boolean isRead, boolean isCompressed) {
     _monitor = monitor;
     _startTimeMilliSec = startTimeMilliSec;
     _bytes = bytes;
     _isRead = isRead;
+    _isCompressed = isCompressed;
   }
 
+  public ZkAsyncCallMonitorContext(final ZkClientMonitor monitor, long startTimeMilliSec, int bytes,
+      boolean isRead) {
+    this(monitor, startTimeMilliSec, bytes, isRead, false);
+  }
+
+
   /**
    * Update the operated data size in bytes.
    * @param bytes
@@ -59,6 +68,9 @@
         _monitor.recordAsync(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.READ);
       } else {
         _monitor.recordAsync(path, _bytes, _startTimeMilliSec, ZkClientMonitor.AccessType.WRITE);
+        if (_isCompressed) {
+          _monitor.increaseZnodeCompressCounter();
+        }
       }
     }
   }
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java
index d444dda..6ac6726 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/callback/ZkAsyncRetryCallContext.java
@@ -36,15 +36,22 @@
    * @param startTimeMilliSec Operation initialization time.
    * @param bytes             The data size in bytes that is involved in the operation.
    * @param isRead            True if the operation is readonly.
+   * @param isCompressed      True if the data is compressed.
    */
   public ZkAsyncRetryCallContext(final ZkAsyncRetryThread retryThread,
       final CancellableZkAsyncCallback callback, final ZkClientMonitor monitor,
-      long startTimeMilliSec, int bytes, boolean isRead) {
-    super(monitor, startTimeMilliSec, bytes, isRead);
+      long startTimeMilliSec, int bytes, boolean isRead, boolean isCompressed) {
+    super(monitor, startTimeMilliSec, bytes, isRead, isCompressed);
     _retryThread = retryThread;
     _cancellableCallback = callback;
   }
 
+  public ZkAsyncRetryCallContext(final ZkAsyncRetryThread retryThread,
+      final CancellableZkAsyncCallback callback, final ZkClientMonitor monitor,
+      long startTimeMilliSec, int bytes, boolean isRead) {
+    this(retryThread, callback, monitor, startTimeMilliSec, bytes, isRead, false);
+  }
+
   /**
    * Request a retry.
    *
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
index 4ce298f..1f71e42 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/metric/ZkClientMonitor.java
@@ -38,8 +38,6 @@
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 import org.apache.helix.monitoring.mbeans.exception.MetricException;
 import org.apache.helix.zookeeper.zkclient.ZkEventThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class ZkClientMonitor extends DynamicMBeanProvider {
@@ -62,6 +60,7 @@
   private SimpleDynamicMetric<Long> _expiredSessionCounter;
   private SimpleDynamicMetric<Long> _dataChangeEventCounter;
   private SimpleDynamicMetric<Long> _outstandingRequestGauge;
+  private SimpleDynamicMetric<Long> _znodeCompressCounter;
 
   private ZkThreadMetric _zkEventThreadMetric;
 
@@ -86,6 +85,7 @@
     _expiredSessionCounter = new SimpleDynamicMetric("ExpiredSessionCounter", 0l);
     _dataChangeEventCounter = new SimpleDynamicMetric("DataChangeEventCounter", 0l);
     _outstandingRequestGauge = new SimpleDynamicMetric("OutstandingRequestGauge", 0l);
+    _znodeCompressCounter = new SimpleDynamicMetric("CompressedZnodeWriteCounter", 0l);
 
     if (zkEventThread != null) {
       boolean result = setAndInitZkEventThreadMonitor(zkEventThread);
@@ -128,6 +128,7 @@
     attributeList.add(_outstandingRequestGauge);
     attributeList.add(_stateChangeEventCounter);
     attributeList.add(_expiredSessionCounter);
+    attributeList.add(_znodeCompressCounter);
     if (_zkEventThreadMetric != null) {
       attributeList.add(_zkEventThreadMetric);
     }
@@ -190,6 +191,12 @@
     }
   }
 
+  public void increaseZnodeCompressCounter() {
+    synchronized (_znodeCompressCounter) {
+      _znodeCompressCounter.updateValue(_znodeCompressCounter.getValue() + 1);
+    }
+  }
+
   public void recordDataPropagationLatency(String path, long latencyMilliSec) {
     if (null == path) {
       return;