HELIX-708: adding basic metrics to HelixViewAggregator
RB=1237803
BUG=HELIX-708
G=helix-reviewers
R=lxia,jjwang,jxue,erkim
A=jjwang
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
index 7afefc6..f3da9d9 100644
--- a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
@@ -20,6 +20,7 @@
*/
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -39,6 +40,7 @@
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.view.common.ViewAggregatorEventAttributes;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.apache.helix.view.monitoring.ViewAggregatorMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +65,7 @@
private ClusterConfig _curViewClusterConfig;
private Timer _viewClusterRefreshTimer;
private ViewClusterRefresher _viewClusterRefresher;
+ private ViewAggregatorMonitor _monitor;
public HelixViewAggregator(String viewClusterName, String zkAddr) {
_viewClusterName = viewClusterName;
@@ -70,11 +73,13 @@
_viewClusterManager = HelixManagerFactory
.getZKHelixManager(_viewClusterName, generateHelixManagerInstanceName(_viewClusterName),
InstanceType.SPECTATOR, zkAddr);
- _refreshViewCluster = false;
+ _refreshViewCluster = new AtomicBoolean(false);
+ _monitor = new ViewAggregatorMonitor(viewClusterName);
_aggregator = new ClusterEventProcessor(_viewClusterName, "Aggregator") {
@Override
public void handleEvent(ClusterEvent event) {
handleSourceClusterEvent(event);
+ _monitor.recordProcessedSourceEvent();
}
};
@@ -91,6 +96,8 @@
* @throws Exception
*/
public void start() throws Exception {
+ _monitor.register();
+
// Start workers
_aggregator.start();
_viewConfigProcessor.start();
@@ -111,6 +118,8 @@
}
public void shutdown() {
+ boolean success = true;
+
// Stop all workers
_aggregator.interrupt();
_viewConfigProcessor.interrupt();
@@ -129,6 +138,7 @@
} catch (ZkInterruptedException zkintr) {
logger.warn("ZK interrupted when disconnecting helix manager", zkintr);
} catch (Exception e) {
+ success = false;
logger.error(String
.format("Failed to disconnect helix manager for view cluster %s", _viewClusterName), e);
}
@@ -141,12 +151,17 @@
try {
provider.shutdown();
} catch (Exception e) {
+ success = false;
logger.error(String
.format("Failed to shutdown data provider %s for view cluster %s", provider.getName(),
_viewClusterName), e);
}
}
- logger.info("HelixViewAggregator shutdown cleanly");
+
+ logger.info("Unregistering monitor.");
+ _monitor.unregister();
+
+ logger.info("HelixViewAggregator shutdown " + (success ? "cleanly" : "with error"));
}
@Override
@@ -207,7 +222,8 @@
action.computeAction();
// If we fail to process action and should retry, re-queue event to retry
- if (processViewClusterConfigUpdate(action)) {
+ if (!processViewClusterConfigUpdate(action)) {
+ _monitor.recordViewConfigProcessFailure();
event.addAttribute(ViewAggregatorEventAttributes.EventProcessBackoff.name(), true);
_viewConfigProcessor.queueEvent(event);
} else {
@@ -244,10 +260,10 @@
* Use SourceClusterConfigChangeAction to reset timer (RefreshViewClusterTask),
* create/delete SourceClusterDataProvider in data provider map
*
- * @return true if action failed and should retry, else false
+ * @return true if success else false
*/
private boolean processViewClusterConfigUpdate(SourceClusterConfigChangeAction action) {
- boolean shouldRetry = false;
+ boolean success = true;
for (ViewClusterSourceConfig source : action.getConfigsToDelete()) {
String key = generateDataProviderMapKey(source);
logger.info("Deleting data provider " + key);
@@ -256,7 +272,7 @@
_dataProviderMap.get(key).shutdown();
_dataProviderMap.remove(key);
} catch (Exception e) {
- shouldRetry = true;
+ success = false;
logger.warn(String.format("Failed to shutdown data provider %s, will retry", key));
}
}
@@ -276,7 +292,7 @@
provider.setup();
_dataProviderMap.put(key, provider);
} catch (Exception e) {
- shouldRetry = true;
+ success = false;
logger.warn(String.format("Failed to create data provider %s, will retry", key));
}
}
@@ -286,16 +302,85 @@
"Resetting view cluster refresh timer at interval " + action.getCurrentRefreshPeriodMs());
resetTimer(action.getCurrentRefreshPeriodMs());
}
- return shouldRetry;
+ return success;
}
/**
* Use ViewClusterRefresher to refresh ViewCluster.
* @return true if needs retry, else false
*/
- private synchronized boolean refreshViewCluster() {
- // TODO: Implement refresh logic
- return false;
+ private void refreshViewCluster() {
+ long startRefreshMs = System.currentTimeMillis();
+ logger.info(String.format("START RefreshViewCluster: Refresh view cluster %s at timestamp %s",
+ _viewClusterName, startRefreshMs));
+ boolean dataProviderFailure = false;
+ boolean viewClusterFailure = false;
+
+ // Generate a view of providers so refresh won't block cluster config update
+ // When a data provider is shutdown while we are reloading cache / generating diff,
+ // Exception will be thrown out and we retry during next refresh cycle
+ Set<SourceClusterDataProvider> providerView;
+ synchronized (_dataProviderMap) {
+ _refreshViewCluster.set(false);
+ providerView = new HashSet<>(_dataProviderMap.values());
+ }
+
+ // Refresh data providers
+ // TODO: the following steps can be parallelized
+ for (SourceClusterDataProvider provider : providerView) {
+ try {
+ provider.refreshCache();
+ } catch (Exception e) {
+ logger.warn("Caught exception when refreshing source cluster cache. Abort refresh.", e);
+ _refreshViewCluster.set(true);
+ dataProviderFailure = true;
+
+ // Skip refresh view cluster when we cannot successfully refresh
+ // source cluster caches
+ break;
+ }
+ }
+
+ // Refresh properties in view cluster
+ if (!dataProviderFailure) {
+ _viewClusterRefresher.updateProviderView(providerView);
+ for (PropertyType propertyType : ViewClusterSourceConfig.getValidPropertyTypes()) {
+ logger.info(String
+ .format("Refreshing property %s in view cluster %s", propertyType, _viewClusterName));
+ try {
+ // We try to refresh all properties with best effort, and don't break when
+ // failed to refresh a particular property
+ if (!_viewClusterRefresher.refreshPropertiesInViewCluster(propertyType)) {
+ viewClusterFailure = true;
+ _refreshViewCluster.set(true);
+ }
+ } catch (IllegalArgumentException e) {
+ // Invalid property... not expected! Something wrong with code, should not retry
+ logger.error(String.format("Failed to refresh property in view cluster %s with exception",
+ _viewClusterName), e);
+ }
+ }
+ }
+
+ recordRefreshResults(dataProviderFailure, viewClusterFailure,
+ System.currentTimeMillis() - startRefreshMs);
+ }
+
+ private void recordRefreshResults(boolean recordSourceFailure, boolean recordViewFailure,
+ long latency) {
+ if (recordSourceFailure) {
+ _monitor.recordReadSourceFailure();
+ }
+
+ if (recordViewFailure) {
+ _monitor.recordViewRefreshFailure();
+ }
+
+ _monitor.recordRefreshViewLatency(latency);
+
+ logger.info(String
+ .format("END RefreshViewCluster: finished refresh %s. Time spent: %s ms. Success: %s",
+ _viewClusterName, latency, !recordSourceFailure && !recordViewFailure));
}
private static String generateHelixManagerInstanceName(String viewClusterName) {
diff --git a/helix-view-aggregator/src/main/java/org/apache/helix/view/monitoring/ViewAggregatorMonitor.java b/helix-view-aggregator/src/main/java/org/apache/helix/view/monitoring/ViewAggregatorMonitor.java
new file mode 100644
index 0000000..bd5198a
--- /dev/null
+++ b/helix-view-aggregator/src/main/java/org/apache/helix/view/monitoring/ViewAggregatorMonitor.java
@@ -0,0 +1,89 @@
+package org.apache.helix.view.monitoring;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
+import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
+public class ViewAggregatorMonitor extends DynamicMBeanProvider {
+ /* package */ static final String MBEAN_DOMAIN = MonitorDomainNames.ClusterStatus.name();
+ /* package */ static final String MONITOR_KEY = "ViewClusterName";
+ private static final String MBEAN_DESCRIPTION = "Monitor helix view aggregator activity";
+ private final String _clusterName;
+ private final String _sensorName;
+
+ // Counters
+ private final SimpleDynamicMetric<Long> _refreshViewFailureCounter;
+ private final SimpleDynamicMetric<Long> _sourceReadFailureCounter;
+ private final SimpleDynamicMetric<Long> _processViewConfigFailureCounter;
+ private final SimpleDynamicMetric<Long> _processedSourceClusterEventCounter;
+
+ // Gauges
+ private final HistogramDynamicMetric _viewRefreshLatencyGauge;
+
+ public ViewAggregatorMonitor(String clusterName) {
+ _clusterName = clusterName;
+ _sensorName = String.format("%s.%s.%s", MBEAN_DOMAIN, MONITOR_KEY, clusterName);
+
+ // Initialize metrics
+ _refreshViewFailureCounter =
+ new SimpleDynamicMetric<>("ViewClusterRefreshFailureCounter", 0L);
+ _sourceReadFailureCounter =
+ new SimpleDynamicMetric<>("SourceClusterRefreshFailureCounter", 0L);
+ _processedSourceClusterEventCounter =
+ new SimpleDynamicMetric<>("ProcessedSourceClusterEventCounter", 0L);
+ _processViewConfigFailureCounter =
+ new SimpleDynamicMetric<>("ProcessViewConfigFailureCounter", 0L);
+ _viewRefreshLatencyGauge = new HistogramDynamicMetric("ViewClusterRefreshDurationGauge",
+ new Histogram(
+ new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+ }
+
+ public void recordViewRefreshFailure() {
+ _refreshViewFailureCounter.updateValue(_refreshViewFailureCounter.getValue() + 1);
+ }
+
+ public void recordViewConfigProcessFailure() {
+ _processViewConfigFailureCounter.updateValue(_processViewConfigFailureCounter.getValue() + 1);
+ }
+
+ public void recordReadSourceFailure() {
+ _sourceReadFailureCounter.updateValue(_sourceReadFailureCounter.getValue() + 1);
+ }
+
+ public void recordProcessedSourceEvent() {
+ _processedSourceClusterEventCounter
+ .updateValue(_processedSourceClusterEventCounter.getValue() + 1);
+ }
+
+ public void recordRefreshViewLatency(long latency) {
+ _viewRefreshLatencyGauge.updateValue(latency);
+ }
+
+ @Override
+ public String getSensorName() {
+ return _sensorName;
+ }
+
+ @Override
+ public DynamicMBeanProvider register() throws JMException {
+ List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+ attributeList.add(_sourceReadFailureCounter);
+ attributeList.add(_refreshViewFailureCounter);
+ attributeList.add(_processViewConfigFailureCounter);
+ attributeList.add(_processedSourceClusterEventCounter);
+ attributeList.add(_viewRefreshLatencyGauge);
+
+ doRegister(attributeList, MBEAN_DESCRIPTION, MBeanRegistrar
+ .buildObjectName(MBEAN_DOMAIN, MONITOR_KEY, _clusterName));
+ return this;
+ }
+}
diff --git a/helix-view-aggregator/src/test/java/org/apache/helix/view/monitoring/TestViewAggregatorMonitor.java b/helix-view-aggregator/src/test/java/org/apache/helix/view/monitoring/TestViewAggregatorMonitor.java
new file mode 100644
index 0000000..121d4c3
--- /dev/null
+++ b/helix-view-aggregator/src/test/java/org/apache/helix/view/monitoring/TestViewAggregatorMonitor.java
@@ -0,0 +1,92 @@
+package org.apache.helix.view.monitoring;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestViewAggregatorMonitor {
+ private static final MBeanServer _beanServer = ManagementFactory.getPlatformMBeanServer();
+
+ @Test
+ public void testViewAggregatorMonitorMBeanRegistration() throws Exception {
+ String cluster1 = "cluster1";
+ String cluster2 = "cluster2";
+ ObjectName name1 = generateObjectName(cluster1);
+ ObjectName name2 = generateObjectName(cluster2);
+
+ ViewAggregatorMonitor monitor1 = new ViewAggregatorMonitor(cluster1);
+ monitor1.register();
+
+ ViewAggregatorMonitor monitor2 = new ViewAggregatorMonitor(cluster2);
+ monitor2.register();
+
+ Assert.assertTrue(_beanServer.isRegistered(name1));
+ Assert.assertTrue(_beanServer.isRegistered(name2));
+
+ monitor1.unregister();
+ monitor2.unregister();
+
+ Assert.assertFalse(_beanServer.isRegistered(name1));
+ Assert.assertFalse(_beanServer.isRegistered(name2));
+ }
+
+ @Test
+ public void testViewAggregatorMonitorDataRecording() throws Exception {
+ String cluster = "testViewCluster";
+ ViewAggregatorMonitor monitor = new ViewAggregatorMonitor(cluster);
+ monitor.register();
+ ObjectName objectName = generateObjectName(cluster);
+
+ monitor.recordProcessedSourceEvent();
+ monitor.recordViewConfigProcessFailure();
+ monitor.recordViewRefreshFailure();
+ monitor.recordReadSourceFailure();
+ monitor.recordRefreshViewLatency(100);
+
+ Assert.assertEquals(
+ (long) _beanServer.getAttribute(objectName, "ViewClusterRefreshFailureCounter"), 1);
+ Assert.assertEquals(
+ (long) _beanServer.getAttribute(objectName, "SourceClusterRefreshFailureCounter"), 1);
+ Assert.assertEquals(
+ (long) _beanServer.getAttribute(objectName, "ProcessedSourceClusterEventCounter"), 1);
+ Assert.assertEquals(
+ (long) _beanServer.getAttribute(objectName, "ProcessViewConfigFailureCounter"), 1);
+ Assert.assertEquals(
+ (long) _beanServer.getAttribute(objectName, "ViewClusterRefreshDurationGauge.Max"), 100);
+ Assert
+ .assertEquals(_beanServer.getAttribute(objectName, "ViewClusterRefreshDurationGauge.Mean"),
+ 100.0);
+ Assert.assertEquals(
+ _beanServer.getAttribute(objectName, "ViewClusterRefreshDurationGauge.StdDev"), 0.0);
+ }
+
+ private ObjectName generateObjectName(String viewClusterName) throws JMException {
+ return MBeanRegistrar
+ .buildObjectName(ViewAggregatorMonitor.MBEAN_DOMAIN, ViewAggregatorMonitor.MONITOR_KEY,
+ viewClusterName);
+ }
+
+}