blob: ab122dbf71ff9800fa33f67f4b814db4adecd55f [file] [log] [blame]
package org.apache.helix.view.aggregator;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyType;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.api.listeners.ClusterConfigChangeListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.common.ClusterEventProcessor;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.view.common.ViewAggregatorEventAttributes;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
import org.apache.helix.view.monitoring.ViewAggregatorMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Main logic for Helix view aggregator.
 *
 * <p>The aggregator connects to the view cluster as a spectator and listens for changes to the
 * view cluster's {@link ClusterConfig}. Config changes are handled on a dedicated worker
 * ({@code _viewConfigProcessor}). That worker creates or shuts down one
 * {@link SourceClusterDataProvider} per configured source cluster and (re)schedules a periodic
 * refresh timer.
 *
 * <p>Source cluster events and periodic refresh triggers are handled on a second worker
 * ({@code _aggregator}). It rebuilds the view cluster through {@code ViewClusterRefresher} whenever
 * the view has been marked dirty since the last refresh.
 */
public class HelixViewAggregator implements ClusterConfigChangeListener {
  private static final Logger logger = LoggerFactory.getLogger(HelixViewAggregator.class);
  private static final int PROCESS_VIEW_CONFIG_CHANGE_BACKOFF_MS = 3 * 1000;

  private final String _viewClusterName;
  private final HelixManager _viewClusterManager;
  private HelixDataAccessor _dataAccessor;

  // Worker that processes source cluster events and refresh view cluster
  private final ClusterEventProcessor _aggregator;

  // "View cluster may be stale" flag. It is set by source events, by provider removal and by
  // failed refreshes, and cleared when a refresh starts. Both workers touch it, hence atomic.
  private final AtomicBoolean _refreshViewCluster;

  // Worker that processes view cluster config change
  private final ClusterEventProcessor _viewConfigProcessor;

  // Source cluster data providers keyed by generateDataProviderMapKey(). Every structural
  // modification and every snapshot is done while holding this map's monitor, because the
  // aggregator worker copies it while the view config worker modifies it.
  private final Map<String, SourceClusterDataProvider> _dataProviderMap;
  private ClusterConfig _curViewClusterConfig;

  // Replaced by resetTimer() on the view config worker and read by shutdown() on the caller's
  // thread.
  private volatile Timer _viewClusterRefreshTimer;
  private ViewClusterRefresher _viewClusterRefresher;
  private final ViewAggregatorMonitor _monitor;

  /**
   * Creates an aggregator for the given view cluster. Nothing is started or connected until
   * {@link #start()} is called.
   *
   * @param viewClusterName name of the view cluster to maintain
   * @param zkAddr ZooKeeper address used to connect to the view cluster
   */
  public HelixViewAggregator(String viewClusterName, String zkAddr) {
    _viewClusterName = viewClusterName;
    _dataProviderMap = new HashMap<>();
    _viewClusterManager = HelixManagerFactory
        .getZKHelixManager(_viewClusterName, generateHelixManagerInstanceName(_viewClusterName),
            InstanceType.SPECTATOR, zkAddr);
    _refreshViewCluster = new AtomicBoolean(false);
    _monitor = new ViewAggregatorMonitor(viewClusterName);
    _aggregator = new ClusterEventProcessor(_viewClusterName, "Aggregator") {
      @Override
      public void handleEvent(ClusterEvent event) {
        handleSourceClusterEvent(event);
        _monitor.recordProcessedSourceEvent();
      }
    };
    _viewConfigProcessor = new ClusterEventProcessor(_viewClusterName, "ViewConfigProcessor") {
      @Override
      public void handleEvent(ClusterEvent event) {
        handleViewClusterConfigChange(event);
      }
    };
  }

  /**
   * @return the view cluster manager's instance name, suffixed with this object's hash code
   */
  public String getAggregatorInstanceName() {
    return String
        .format("%s::%s", _viewClusterManager.getInstanceName(), hashCode());
  }

  /**
   * Start controller main logic.
   *
   * <p>This registers the monitor, starts both workers, connects the view cluster manager and
   * finally subscribes to view cluster config changes.
   *
   * @throws HelixException if the manager cannot connect or the listener cannot be registered
   * @throws Exception declared for backward compatibility
   */
  public void start() throws Exception {
    _monitor.register();

    // Start workers
    _aggregator.start();
    _viewConfigProcessor.start();

    // Setup manager
    try {
      _viewClusterManager.connect();
      _dataAccessor = _viewClusterManager.getHelixDataAccessor();
    } catch (Exception e) {
      throw new HelixException("Failed to connect view cluster helix manager", e);
    }

    // Set up view cluster refresher. Like _dataAccessor, it must be initialized BEFORE the
    // listener is registered. Registration may deliver a notification right away, and the config
    // worker then reads _dataAccessor and may schedule a refresh that uses the refresher.
    _viewClusterRefresher =
        new ViewClusterRefresher(_viewClusterName, _dataAccessor, _dataProviderMap);

    try {
      _viewClusterManager.addClusterfigChangeListener(this);
    } catch (Exception e) {
      throw new HelixException("Failed to connect view cluster helix manager", e);
    }
  }

  /**
   * Best-effort shutdown.
   *
   * <p>This interrupts both workers, cancels the refresh timer, disconnects the view cluster
   * manager, shuts down every data provider and unregisters the monitor. Failures are logged and
   * do not stop the remaining steps.
   */
  public void shutdown() {
    boolean success = true;

    // Stop all workers
    _aggregator.interrupt();
    _viewConfigProcessor.interrupt();

    // Stop timer
    Timer refreshTimer = _viewClusterRefreshTimer;
    if (refreshTimer != null) {
      logger.info("Shutting down view cluster refresh timer");
      refreshTimer.cancel();
    }

    // disconnect manager
    if (_viewClusterManager != null && _viewClusterManager.isConnected()) {
      logger.info("Shutting down view cluster helix manager");
      try {
        _viewClusterManager.disconnect();
      } catch (ZkInterruptedException zkintr) {
        logger.warn("ZK interrupted when disconnecting helix manager", zkintr);
      } catch (Exception e) {
        success = false;
        logger.error(String
            .format("Failed to disconnect helix manager for view cluster %s", _viewClusterName), e);
      }
    }

    // Clean up all data providers. Iterate over a snapshot, because the view config worker has
    // only been interrupted above and may still be modifying the map.
    Set<SourceClusterDataProvider> providers;
    synchronized (_dataProviderMap) {
      providers = new HashSet<>(_dataProviderMap.values());
    }
    for (SourceClusterDataProvider provider : providers) {
      logger
          .info(String.format("Shutting data provider for source cluster %s", provider.getName()));
      try {
        provider.shutdown();
      } catch (Exception e) {
        success = false;
        logger.error(String
            .format("Failed to shutdown data provider %s for view cluster %s", provider.getName(),
                _viewClusterName), e);
      }
    }

    logger.info("Unregistering monitor.");
    _monitor.unregister();
    logger.info("HelixViewAggregator shutdown " + (success ? "cleanly" : "with error"));
  }

  /**
   * Queues a {@link ClusterEventType#ClusterConfigChange} event for the view config worker.
   *
   * <p>Notifications without a context, or with a FINALIZE context, are logged and skipped.
   */
  @Override
  @PreFetch(enabled = false)
  public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext context) {
    if (context != null && context.getType() != NotificationContext.Type.FINALIZE) {
      _viewConfigProcessor
          .queueEvent(new ClusterEvent(_viewClusterName, ClusterEventType.ClusterConfigChange));
    } else {
      logger.info(String
          .format("Skip processing view cluster config change with notification context type %s",
              context == null ? "NoContext" : context.getType().name()));
    }
  }

  /**
   * Runs on the aggregator worker.
   *
   * <p>Source change events only mark the view cluster as dirty. Periodic refresh events rebuild
   * the view cluster if it is dirty.
   */
  private void handleSourceClusterEvent(ClusterEvent event) {
    logger.info("Processing event from source cluster " + event.getClusterName());
    switch (event.getEventType()) {
      case LiveInstanceChange:
      case InstanceConfigChange:
      case ExternalViewChange:
        // Batch changes: the actual refresh happens on the next periodic trigger
        _refreshViewCluster.set(true);
        break;
      case ViewClusterPeriodicRefresh:
        if (!_refreshViewCluster.get()) {
          logger.info("Skip refresh: No event happened since last refresh, and no force refresh.");
          return;
        }
        logger.info("Refreshing cluster based on event " + event.getEventType().name());
        // refreshViewCluster() clears the dirty flag when it starts and sets it again on any
        // failure, so a failed refresh is retried on the next periodic trigger
        refreshViewCluster();
        break;
      default:
        logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
    }
  }

  /**
   * Runs on the view config worker.
   *
   * <p>Diffs the last successfully applied view cluster config against the current one in the
   * view cluster and applies the difference. On failure, the event is re-queued with a backoff
   * marker.
   */
  private synchronized void handleViewClusterConfigChange(ClusterEvent event) {
    logger.info("Processing view cluster event " + event.getEventType().name());
    switch (event.getEventType()) {
      case ClusterConfigChange:
        // TODO: when clusterEventProcessor supports delayed scheduling,
        // we should not have this head-of-line blocking but to have ClusterEventProcessor do the
        // work. Currently it's acceptable as we can endure delay in processing view cluster config
        // change
        if (event.getAttribute(ViewAggregatorEventAttributes.EventProcessBackoff.name()) != null) {
          try {
            Thread.sleep(PROCESS_VIEW_CONFIG_CHANGE_BACKOFF_MS);
          } catch (InterruptedException e) {
            logger.warn("Interrupted when backing off during process view config change retry", e);
            // shutdown() interrupts this worker. Preserve the interrupt status so the worker loop
            // can observe it, and stop processing instead of touching a possibly disconnected
            // manager.
            Thread.currentThread().interrupt();
            return;
          }
        }

        // We always compare current cluster config with most up-to-date cluster config
        ClusterConfig newClusterConfig =
            _dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig());
        SourceClusterConfigChangeAction action =
            new SourceClusterConfigChangeAction(_curViewClusterConfig, newClusterConfig);
        action.computeAction();

        // If we fail to process action and should retry, re-queue event to retry
        if (!processViewClusterConfigUpdate(action)) {
          _monitor.recordViewConfigProcessFailure();
          event.addAttribute(ViewAggregatorEventAttributes.EventProcessBackoff.name(), true);
          _viewConfigProcessor.queueEvent(event);
        } else {
          _curViewClusterConfig = newClusterConfig;
        }
        break;
      default:
        logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
    }
  }

  /** Timer task that queues a periodic refresh event for the aggregator worker. */
  private class RefreshViewClusterTask extends TimerTask {
    @Override
    public void run() {
      logger.info("Triggering view cluster refresh");
      _aggregator.queueEvent(
          new ClusterEvent(_viewClusterName, ClusterEventType.ViewClusterPeriodicRefresh));
    }
  }

  /**
   * Recreate timer that triggers RefreshViewClusterTask.
   *
   * <p>The previous timer, if any, is cancelled. The new daemon timer fires immediately and then
   * every {@code triggerIntervalMs}.
   *
   * @param triggerIntervalMs refresh period in ms; {@link Timer#scheduleAtFixedRate} rejects
   *     non-positive periods
   */
  private void resetTimer(long triggerIntervalMs) {
    if (_viewClusterRefreshTimer != null) {
      _viewClusterRefreshTimer.cancel();
    }
    RefreshViewClusterTask refreshTrigger = new RefreshViewClusterTask();
    _viewClusterRefreshTimer = new Timer(true);
    _viewClusterRefreshTimer.scheduleAtFixedRate(refreshTrigger, 0, triggerIntervalMs);
  }

  /**
   * Use SourceClusterConfigChangeAction to reset timer (RefreshViewClusterTask),
   * create/delete SourceClusterDataProvider in data provider map.
   *
   * @return true if success else false (caller retries the whole action)
   */
  private boolean processViewClusterConfigUpdate(SourceClusterConfigChangeAction action) {
    boolean success = true;
    for (ViewClusterSourceConfig source : action.getConfigsToDelete()) {
      String key = generateDataProviderMapKey(source);
      logger.info("Deleting data provider " + key);
      SourceClusterDataProvider provider = _dataProviderMap.get(key);
      if (provider != null) {
        try {
          provider.shutdown();
          synchronized (_dataProviderMap) {
            _dataProviderMap.remove(key);
            // No source event will report this removal. Mark the view dirty so the next periodic
            // refresh rebuilds the view cluster without this source.
            _refreshViewCluster.set(true);
          }
        } catch (Exception e) {
          success = false;
          logger.warn(String.format("Failed to shutdown data provider %s, will retry", key), e);
        }
      }
    }

    for (ViewClusterSourceConfig source : action.getConfigsToAdd()) {
      String key = generateDataProviderMapKey(source);
      logger.info("Creating data provider " + key);
      SourceClusterDataProvider staleProvider;
      synchronized (_dataProviderMap) {
        staleProvider = _dataProviderMap.remove(key);
      }
      if (staleProvider != null) {
        // Possibly due to a previous failure of shutting down, or a retry of a partially
        // successful update. Print a warning, release the old provider and recreate.
        logger.warn(String.format("Add data provider %s which already exists. Recreating", key));
        try {
          staleProvider.shutdown();
        } catch (Exception e) {
          // Best effort only: failing here must not block creating the replacement provider
          logger.warn(String.format("Failed to shutdown existing data provider %s", key), e);
        }
      }
      try {
        SourceClusterDataProvider provider = new SourceClusterDataProvider(source, _aggregator);
        provider.setup();
        synchronized (_dataProviderMap) {
          _dataProviderMap.put(key, provider);
        }
      } catch (Exception e) {
        success = false;
        logger.warn(String.format("Failed to create data provider %s, will retry", key), e);
      }
    }

    if (action.shouldResetTimer()) {
      logger.info(
          "Resetting view cluster refresh timer at interval " + action.getCurrentRefreshPeriodMs());
      resetTimer(action.getCurrentRefreshPeriodMs());
    }
    return success;
  }

  /**
   * Use ViewClusterRefresher to refresh ViewCluster.
   *
   * <p>Clears the dirty flag when the refresh starts. The flag is set again if any source cache
   * refresh or view cluster property refresh fails, so the next periodic trigger retries.
   */
  private void refreshViewCluster() {
    long startRefreshMs = System.currentTimeMillis();
    logger.info(String.format("START RefreshViewCluster: Refresh view cluster %s at timestamp %s",
        _viewClusterName, startRefreshMs));
    boolean dataProviderFailure = false;
    boolean viewClusterFailure = false;

    // Generate a view of providers so refresh won't block cluster config update.
    // When a data provider is shutdown while we are reloading cache / generating diff,
    // Exception will be thrown out and we retry during next refresh cycle.
    Set<SourceClusterDataProvider> providerView;
    synchronized (_dataProviderMap) {
      _refreshViewCluster.set(false);
      providerView = new HashSet<>(_dataProviderMap.values());
    }

    // Refresh data providers
    // TODO: the following steps can be parallelized
    for (SourceClusterDataProvider provider : providerView) {
      try {
        provider.refreshCache();
      } catch (Exception e) {
        logger.warn("Caught exception when refreshing source cluster cache. Abort refresh.", e);
        _refreshViewCluster.set(true);
        dataProviderFailure = true;
        // Skip refresh view cluster when we cannot successfully refresh
        // source cluster caches
        break;
      }
    }

    // Refresh properties in view cluster
    if (!dataProviderFailure) {
      _viewClusterRefresher.updateProviderView(providerView);
      for (PropertyType propertyType : ViewClusterSourceConfig.getValidPropertyTypes()) {
        logger.info(String
            .format("Refreshing property %s in view cluster %s", propertyType, _viewClusterName));
        try {
          // We try to refresh all properties with best effort, and don't break when
          // failed to refresh a particular property
          if (!_viewClusterRefresher.refreshPropertiesInViewCluster(propertyType)) {
            viewClusterFailure = true;
            _refreshViewCluster.set(true);
          }
        } catch (IllegalArgumentException e) {
          // Invalid property... not expected! Something wrong with code, should not retry
          logger.error(String.format("Failed to refresh property in view cluster %s with exception",
              _viewClusterName), e);
        }
      }
    }

    recordRefreshResults(dataProviderFailure, viewClusterFailure,
        System.currentTimeMillis() - startRefreshMs);
  }

  /** Reports the outcome and latency of one refresh round to the monitor and the log. */
  private void recordRefreshResults(boolean recordSourceFailure, boolean recordViewFailure,
      long latency) {
    if (recordSourceFailure) {
      _monitor.recordReadSourceFailure();
    }
    if (recordViewFailure) {
      _monitor.recordViewRefreshFailure();
    }
    _monitor.recordRefreshViewLatency(latency);
    logger.info(String
        .format("END RefreshViewCluster: finished refresh %s. Time spent: %s ms. Success: %s",
            _viewClusterName, latency, !recordSourceFailure && !recordViewFailure));
  }

  private static String generateHelixManagerInstanceName(String viewClusterName) {
    return String.format("HelixViewAggregator-%s", viewClusterName);
  }

  private static String generateDataProviderMapKey(ViewClusterSourceConfig config) {
    return String.format("%s-%s", config.getName(), config.getZkAddress());
  }
}