HELIX-708: adding basic metrics to HelixViewAggregator

RB=1237803
BUG=HELIX-708
G=helix-reviewers
R=lxia,jjwang,jxue,erkim
A=jjwang
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
index 7afefc6..f3da9d9 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
@@ -20,6 +20,7 @@
  */
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -39,6 +40,7 @@
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.view.common.ViewAggregatorEventAttributes;
 import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.apache.helix.view.monitoring.ViewAggregatorMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +65,7 @@
   private ClusterConfig _curViewClusterConfig;
   private Timer _viewClusterRefreshTimer;
   private ViewClusterRefresher _viewClusterRefresher;
+  private ViewAggregatorMonitor _monitor;
 
   public HelixViewAggregator(String viewClusterName, String zkAddr) {
     _viewClusterName = viewClusterName;
@@ -70,11 +73,13 @@
     _viewClusterManager = HelixManagerFactory
         .getZKHelixManager(_viewClusterName, generateHelixManagerInstanceName(_viewClusterName),
             InstanceType.SPECTATOR, zkAddr);
-    _refreshViewCluster = false;
+    _refreshViewCluster = new AtomicBoolean(false);
+    _monitor = new ViewAggregatorMonitor(viewClusterName);
     _aggregator = new ClusterEventProcessor(_viewClusterName, "Aggregator") {
       @Override
       public void handleEvent(ClusterEvent event) {
         handleSourceClusterEvent(event);
+        _monitor.recordProcessedSourceEvent();
       }
     };
 
@@ -91,6 +96,8 @@
    * @throws Exception
    */
   public void start() throws Exception {
+    _monitor.register();
+
     // Start workers
     _aggregator.start();
     _viewConfigProcessor.start();
@@ -111,6 +118,8 @@
   }
 
   public void shutdown() {
+    boolean success = true;
+
     // Stop all workers
     _aggregator.interrupt();
     _viewConfigProcessor.interrupt();
@@ -129,6 +138,7 @@
       } catch (ZkInterruptedException zkintr) {
         logger.warn("ZK interrupted when disconnecting helix manager", zkintr);
       } catch (Exception e) {
+        success = false;
         logger.error(String
             .format("Failed to disconnect helix manager for view cluster %s", _viewClusterName), e);
       }
@@ -141,12 +151,17 @@
       try {
         provider.shutdown();
       } catch (Exception e) {
+        success = false;
         logger.error(String
             .format("Failed to shutdown data provider %s for view cluster %s", provider.getName(),
                 _viewClusterName), e);
       }
     }
-    logger.info("HelixViewAggregator shutdown cleanly");
+
+    logger.info("Unregistering monitor.");
+    _monitor.unregister();
+
+    logger.info("HelixViewAggregator shutdown " + (success ? "cleanly" : "with error"));
   }
 
   @Override
@@ -207,7 +222,8 @@
       action.computeAction();
 
       // If we fail to process action and should retry, re-queue event to retry
-      if (processViewClusterConfigUpdate(action)) {
+      if (!processViewClusterConfigUpdate(action)) {
+        _monitor.recordViewConfigProcessFailure();
         event.addAttribute(ViewAggregatorEventAttributes.EventProcessBackoff.name(), true);
         _viewConfigProcessor.queueEvent(event);
       } else {
@@ -244,10 +260,10 @@
    * Use SourceClusterConfigChangeAction to reset timer (RefreshViewClusterTask),
    * create/delete SourceClusterDataProvider in data provider map
    *
-   * @return true if action failed and should retry, else false
+   * @return true if success else false
    */
   private boolean processViewClusterConfigUpdate(SourceClusterConfigChangeAction action) {
-    boolean shouldRetry = false;
+    boolean success = true;
     for (ViewClusterSourceConfig source : action.getConfigsToDelete()) {
       String key = generateDataProviderMapKey(source);
       logger.info("Deleting data provider " + key);
@@ -256,7 +272,7 @@
           _dataProviderMap.get(key).shutdown();
           _dataProviderMap.remove(key);
         } catch (Exception e) {
-          shouldRetry = true;
+          success = false;
           logger.warn(String.format("Failed to shutdown data provider %s, will retry", key));
         }
       }
@@ -276,7 +292,7 @@
         provider.setup();
         _dataProviderMap.put(key, provider);
       } catch (Exception e) {
-        shouldRetry = true;
+        success = false;
         logger.warn(String.format("Failed to create data provider %s, will retry", key));
       }
     }
@@ -286,16 +302,85 @@
           "Resetting view cluster refresh timer at interval " + action.getCurrentRefreshPeriodMs());
       resetTimer(action.getCurrentRefreshPeriodMs());
     }
-    return shouldRetry;
+    return success;
   }
 
   /**
    * Use ViewClusterRefresher to refresh ViewCluster.
-   * @return true if needs retry, else false
+   * Re-arms the refresh flag on failure so the refresh can be retried in a later cycle.
    */
-  private synchronized boolean refreshViewCluster() {
-    // TODO: Implement refresh logic
-    return false;
+  private void refreshViewCluster() {
+    long startRefreshMs = System.currentTimeMillis();
+    logger.info(String.format("START RefreshViewCluster: Refresh view cluster %s at timestamp %s",
+        _viewClusterName, startRefreshMs));
+    boolean dataProviderFailure = false;
+    boolean viewClusterFailure = false;
+
+    // Snapshot the providers under the map lock so the refresh itself runs without holding it
+    // and does not block cluster config updates. If a provider is shut down while we reload
+    // caches or generate diffs, an exception is thrown and we retry in the next refresh cycle.
+    Set<SourceClusterDataProvider> providerView;
+    synchronized (_dataProviderMap) {
+      _refreshViewCluster.set(false);
+      providerView = new HashSet<>(_dataProviderMap.values());
+    }
+
+    // Refresh data providers
+    // TODO: the following steps can be parallelized
+    for (SourceClusterDataProvider provider : providerView) {
+      try {
+        provider.refreshCache();
+      } catch (Exception e) {
+        logger.warn("Caught exception when refreshing source cluster cache. Abort refresh.", e);
+        _refreshViewCluster.set(true);
+        dataProviderFailure = true;
+
+        // Skip refreshing the view cluster when source cluster caches cannot be refreshed
+        break;
+      }
+    }
+
+    // Refresh properties in view cluster
+    if (!dataProviderFailure) {
+      _viewClusterRefresher.updateProviderView(providerView);
+      for (PropertyType propertyType : ViewClusterSourceConfig.getValidPropertyTypes()) {
+        logger.info(String
+            .format("Refreshing property %s in view cluster %s", propertyType, _viewClusterName));
+        try {
+          // We try to refresh all properties with best effort, and don't break when
+          // failed to refresh a particular property
+          if (!_viewClusterRefresher.refreshPropertiesInViewCluster(propertyType)) {
+            viewClusterFailure = true;
+            _refreshViewCluster.set(true);
+          }
+        } catch (IllegalArgumentException e) {
+          // Invalid property: a code bug, so do not request a retry, but still count the failure
+          viewClusterFailure = true;
+          logger.error(String.format("Failed to refresh property in view cluster %s with exception",
+              _viewClusterName), e);
+        }
+      }
+    }
+
+    recordRefreshResults(dataProviderFailure, viewClusterFailure,
+        System.currentTimeMillis() - startRefreshMs);
+  }
+
+  /** Publishes the outcome of one refresh cycle to the monitor and logs a summary line. */
+  private void recordRefreshResults(boolean recordSourceFailure, boolean recordViewFailure,
+      long latency) {
+    // Successful only when neither the source read nor the view refresh failed
+    boolean succeeded = !(recordSourceFailure || recordViewFailure);
+    if (recordSourceFailure) {
+      _monitor.recordReadSourceFailure();
+    }
+    if (recordViewFailure) {
+      _monitor.recordViewRefreshFailure();
+    }
+    _monitor.recordRefreshViewLatency(latency);
+
+    String summary = "END RefreshViewCluster: finished refresh %s. Time spent: %s ms. Success: %s";
+    logger.info(String.format(summary, _viewClusterName, latency, succeeded));
+  }
 
   private static String generateHelixManagerInstanceName(String viewClusterName) {
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/monitoring/ViewAggregatorMonitor.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/monitoring/ViewAggregatorMonitor.java
new file mode 100644
index 0000000..bd5198a
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/monitoring/ViewAggregatorMonitor.java
@@ -0,0 +1,89 @@
+package org.apache.helix.view.monitoring;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
+/** Records view aggregator counters and refresh latency and registers them as an MBean. */
+public class ViewAggregatorMonitor extends DynamicMBeanProvider {
+  /* package */ static final String MBEAN_DOMAIN = MonitorDomainNames.ClusterStatus.name();
+  /* package */ static final String MONITOR_KEY = "ViewClusterName";
+  private static final String MBEAN_DESCRIPTION = "Monitor helix view aggregator activity";
+  private final String _clusterName;
+  private final String _sensorName;
+
+  // Metrics published by this monitor: four counters and one latency histogram
+  private final SimpleDynamicMetric<Long> _refreshViewFailureCounter;
+  private final SimpleDynamicMetric<Long> _sourceReadFailureCounter;
+  private final SimpleDynamicMetric<Long> _processViewConfigFailureCounter;
+  private final SimpleDynamicMetric<Long> _processedSourceClusterEventCounter;
+  private final HistogramDynamicMetric _viewRefreshLatencyGauge;
+
+  public ViewAggregatorMonitor(String clusterName) {
+    _clusterName = clusterName;
+    _sensorName = String.format("%s.%s.%s", MBEAN_DOMAIN, MONITOR_KEY, clusterName);
+    // Every counter starts at zero; the histogram keeps samples in a sliding time window
+    _sourceReadFailureCounter =
+        new SimpleDynamicMetric<>("SourceClusterRefreshFailureCounter", 0L);
+    _refreshViewFailureCounter =
+        new SimpleDynamicMetric<>("ViewClusterRefreshFailureCounter", 0L);
+    _processViewConfigFailureCounter =
+        new SimpleDynamicMetric<>("ProcessViewConfigFailureCounter", 0L);
+    _processedSourceClusterEventCounter =
+        new SimpleDynamicMetric<>("ProcessedSourceClusterEventCounter", 0L);
+    _viewRefreshLatencyGauge = new HistogramDynamicMetric("ViewClusterRefreshDurationGauge",
+        new Histogram(
+            new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+  }
+
+  public void recordViewRefreshFailure() {
+    increment(_refreshViewFailureCounter);
+  }
+
+  public void recordViewConfigProcessFailure() {
+    increment(_processViewConfigFailureCounter);
+  }
+
+  public void recordReadSourceFailure() {
+    increment(_sourceReadFailureCounter);
+  }
+
+  public void recordProcessedSourceEvent() {
+    increment(_processedSourceClusterEventCounter);
+  }
+
+  public void recordRefreshViewLatency(long latency) {
+    _viewRefreshLatencyGauge.updateValue(latency);
+  }
+
+  @Override
+  public String getSensorName() {
+    return _sensorName;
+  }
+
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    List<DynamicMetric<?, ?>> metrics = new ArrayList<>();
+    metrics.add(_sourceReadFailureCounter);
+    metrics.add(_refreshViewFailureCounter);
+    metrics.add(_processViewConfigFailureCounter);
+    metrics.add(_processedSourceClusterEventCounter);
+    metrics.add(_viewRefreshLatencyGauge);
+    doRegister(metrics, MBEAN_DESCRIPTION,
+        MBeanRegistrar.buildObjectName(MBEAN_DOMAIN, MONITOR_KEY, _clusterName));
+    return this;
+  }
+
+  private static void increment(SimpleDynamicMetric<Long> counter) {
+    counter.updateValue(counter.getValue() + 1);
+  }
+}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/monitoring/TestViewAggregatorMonitor.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/monitoring/TestViewAggregatorMonitor.java
new file mode 100644
index 0000000..121d4c3
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/monitoring/TestViewAggregatorMonitor.java
@@ -0,0 +1,92 @@
+package org.apache.helix.view.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/** Verifies MBean registration and metric recording of {@link ViewAggregatorMonitor}. */
+public class TestViewAggregatorMonitor {
+  private static final MBeanServer _beanServer = ManagementFactory.getPlatformMBeanServer();
+
+  @Test
+  public void testViewAggregatorMonitorMBeanRegistration() throws Exception {
+    String cluster1 = "cluster1";
+    String cluster2 = "cluster2";
+    ObjectName name1 = generateObjectName(cluster1);
+    ObjectName name2 = generateObjectName(cluster2);
+
+    ViewAggregatorMonitor monitor1 = new ViewAggregatorMonitor(cluster1);
+    monitor1.register();
+
+    ViewAggregatorMonitor monitor2 = new ViewAggregatorMonitor(cluster2);
+    monitor2.register();
+
+    Assert.assertTrue(_beanServer.isRegistered(name1));
+    Assert.assertTrue(_beanServer.isRegistered(name2));
+
+    monitor1.unregister();
+    monitor2.unregister();
+
+    Assert.assertFalse(_beanServer.isRegistered(name1));
+    Assert.assertFalse(_beanServer.isRegistered(name2));
+  }
+
+  @Test
+  public void testViewAggregatorMonitorDataRecording() throws Exception {
+    String cluster = "testViewCluster";
+    ViewAggregatorMonitor monitor = new ViewAggregatorMonitor(cluster);
+    monitor.register();
+    ObjectName name = generateObjectName(cluster);
+    // Always unregister so the MBean does not leak into other tests sharing the platform server
+    try {
+      monitor.recordProcessedSourceEvent();
+      monitor.recordViewConfigProcessFailure();
+      monitor.recordViewRefreshFailure();
+      monitor.recordReadSourceFailure();
+      monitor.recordRefreshViewLatency(100);
+
+      Assert.assertEquals((long) readAttribute(name, "ViewClusterRefreshFailureCounter"), 1);
+      Assert.assertEquals((long) readAttribute(name, "SourceClusterRefreshFailureCounter"), 1);
+      Assert.assertEquals((long) readAttribute(name, "ProcessedSourceClusterEventCounter"), 1);
+      Assert.assertEquals((long) readAttribute(name, "ProcessViewConfigFailureCounter"), 1);
+      Assert.assertEquals((long) readAttribute(name, "ViewClusterRefreshDurationGauge.Max"), 100);
+      Assert.assertEquals(readAttribute(name, "ViewClusterRefreshDurationGauge.Mean"), 100.0);
+      Assert.assertEquals(readAttribute(name, "ViewClusterRefreshDurationGauge.StdDev"), 0.0);
+    } finally {
+      monitor.unregister();
+    }
+  }
+
+  private Object readAttribute(ObjectName name, String attribute) throws JMException {
+    return _beanServer.getAttribute(name, attribute);
+  }
+
+  private ObjectName generateObjectName(String viewClusterName) throws JMException {
+    return MBeanRegistrar
+        .buildObjectName(ViewAggregatorMonitor.MBEAN_DOMAIN, ViewAggregatorMonitor.MONITOR_KEY,
+            viewClusterName);
+  }
+}