// blob: 4a44579cfa7281703b285dd330792d5bf8ae4bc1 [file] [log] [blame]
package org.apache.helix.view.aggregator;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyType;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.api.listeners.ClusterConfigChangeListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.common.DedupEventProcessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.view.common.ClusterViewEvent;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
import org.apache.helix.view.monitoring.ViewAggregatorMonitor;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Main logic for Helix view aggregator
*/
public class HelixViewAggregator implements ClusterConfigChangeListener {
private static final Logger logger = LoggerFactory.getLogger(HelixViewAggregator.class);
private static final long DEFAULT_INITIAL_EVENT_PROCESS_BACKOFF = 10;
private static final long DEFAULT_MAX_EVENT_PROCESS_BACKOFF = 5 * 1000;
private final String _viewClusterName;
private final HelixManager _viewClusterManager;
private final Map<String, SourceClusterDataProvider> _dataProviderMap;
// Worker that processes source cluster events and refreshes the view cluster
private final DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> _aggregator;
private final AtomicBoolean _refreshViewCluster;
// Worker that processes view cluster config change
private final DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent> _viewConfigProcessor;
private final ViewAggregatorMonitor _monitor;
private ClusterConfig _curViewClusterConfig;
private Timer _viewClusterRefreshTimer;
private ViewClusterRefresher _viewClusterRefresher;
private HelixDataAccessor _dataAccessor;
/**
 * Creates an aggregator for one view cluster. This only wires objects together; connecting
 * the Helix manager and starting the two event workers is done in {@link #start()}.
 *
 * @param viewClusterName name of the view cluster this aggregator maintains
 * @param zkAddr ZooKeeper address handed to the view cluster's Helix manager
 */
public HelixViewAggregator(String viewClusterName, String zkAddr) {
  _viewClusterName = viewClusterName;
  _dataProviderMap = new ConcurrentHashMap<>();

  // The aggregator joins the view cluster as a spectator
  String managerInstanceName = generateHelixManagerInstanceName(_viewClusterName);
  _viewClusterManager = HelixManagerFactory.getZKHelixManager(_viewClusterName,
      managerInstanceName, InstanceType.SPECTATOR, zkAddr);

  // Starts as true so the first periodic refresh is never skipped
  _refreshViewCluster = new AtomicBoolean(true);
  _monitor = new ViewAggregatorMonitor(viewClusterName);

  // Worker for source cluster events; every handled event is counted in the monitor
  _aggregator = new DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent>(
      _viewClusterName, "Aggregator") {
    @Override
    public void handleEvent(ClusterViewEvent sourceEvent) {
      handleSourceClusterEvent(sourceEvent);
      _monitor.recordProcessedSourceEvent();
    }
  };

  // Worker for config changes of the view cluster itself
  _viewConfigProcessor = new DedupEventProcessor<ClusterViewEvent.Type, ClusterViewEvent>(
      _viewClusterName, "ViewConfigProcessor") {
    @Override
    public void handleEvent(ClusterViewEvent configEvent) {
      handleViewClusterConfigChange(configEvent);
    }
  };
}
/**
 * @return the view cluster manager's instance name and this object's hash code,
 *         joined by "::"
 */
public String getAggregatorInstanceName() {
  String managerInstanceName = _viewClusterManager.getInstanceName();
  return managerInstanceName + "::" + hashCode();
}
/**
 * Start controller main logic: registers the monitor, starts both event workers, connects
 * the view cluster Helix manager, creates the view cluster refresher and finally subscribes
 * to view cluster config changes.
 *
 * @throws Exception when HelixViewAggregator fails to start. Will try to shut it down before
 * exception is thrown out
 */
public void start() throws Exception {
  _monitor.register();
  // Start workers
  _aggregator.start();
  _viewConfigProcessor.start();
  // Setup manager
  try {
    _viewClusterManager.connect();
    _dataAccessor = _viewClusterManager.getHelixDataAccessor();
    // Set up view cluster refresher BEFORE registering the listener. A config change
    // callback leads to resetTimer(), whose task fires with zero delay and makes the
    // aggregator worker call into _viewClusterRefresher; if the refresher were assigned
    // after registration, that worker could observe it as null.
    _viewClusterRefresher =
        new ViewClusterRefresher(_viewClusterName, _viewClusterManager.getHelixDataAccessor());
    // Method name is spelled this way on HelixManager
    _viewClusterManager.addClusterfigChangeListener(this);
  } catch (Exception e) {
    shutdown();
    throw new HelixException("Failed to connect view cluster helix manager", e);
  }
}
/**
 * Stops the aggregator. In order: interrupts both event workers, cancels the refresh timer
 * (if one was created), disconnects the view cluster Helix manager (if connected), shuts
 * down every source cluster data provider and unregisters the monitor. Failures are logged
 * and do not stop the remaining steps; the final log line reports whether any step failed.
 */
public void shutdown() {
  boolean cleanShutdown = true;

  // Stop all workers
  _aggregator.interrupt();
  _viewConfigProcessor.interrupt();

  // Stop timer
  if (_viewClusterRefreshTimer != null) {
    logger.info("Shutting down view cluster refresh timer");
    _viewClusterRefreshTimer.cancel();
  }

  // Disconnect manager
  boolean managerConnected = _viewClusterManager != null && _viewClusterManager.isConnected();
  if (managerConnected) {
    logger.info("Shutting down view cluster helix manager");
    try {
      _viewClusterManager.disconnect();
    } catch (ZkInterruptedException zkInterrupted) {
      // An interrupt while disconnecting is logged but not counted as a shutdown error
      logger.warn("ZK interrupted when disconnecting helix manager", zkInterrupted);
    } catch (Exception disconnectError) {
      cleanShutdown = false;
      logger.error(
          String.format("Failed to disconnect helix manager for view cluster %s", _viewClusterName),
          disconnectError);
    }
  }

  // Clean up all data providers; one failing provider does not prevent the others
  // from being shut down
  for (SourceClusterDataProvider sourceProvider : _dataProviderMap.values()) {
    logger.info(String.format("Shutting down data provider for source cluster %s",
        sourceProvider.getName()));
    try {
      sourceProvider.shutdown();
    } catch (Exception providerError) {
      cleanShutdown = false;
      logger.error(String.format("Failed to shutdown data provider %s for view cluster %s",
          sourceProvider.getName(), _viewClusterName), providerError);
    }
  }

  logger.info("Unregistering monitor.");
  _monitor.unregister();

  String outcome = cleanShutdown ? "cleanly" : "with error";
  logger.info("HelixViewAggregator shutdown " + outcome);
}
/**
 * Callback for view cluster config changes. The config payload is not used here (pre-fetch
 * is disabled); a ConfigChange event is queued and the config is read when the event is
 * handled. Notifications without a context, or of type FINALIZE, are only logged.
 *
 * @param clusterConfig ignored
 * @param context notification context; may be null
 */
@Override
@PreFetch(enabled = false)
public void onClusterConfigChange(ClusterConfig clusterConfig, NotificationContext context) {
  boolean skip = context == null || context.getType() == NotificationContext.Type.FINALIZE;
  if (skip) {
    String contextType = (context == null) ? "NoContext" : context.getType().name();
    logger.info(String.format(
        "Skip processing view cluster config change with notification context type %s",
        contextType));
    return;
  }

  ClusterViewEvent.Type eventType = ClusterViewEvent.Type.ConfigChange;
  _viewConfigProcessor.queueEvent(eventType, new ClusterViewEvent(_viewClusterName, eventType));
}
/**
 * Handles one event taken from the aggregator worker's queue.
 * <ul>
 * <li>Source cluster change events only raise the "refresh needed" flag.</li>
 * <li>A periodic refresh event reloads the local view cluster data cache and then refreshes
 * the view cluster, unless neither the cache changed nor the flag is set.</li>
 * </ul>
 *
 * @param event the event to process
 */
private void handleSourceClusterEvent(ClusterViewEvent event) {
  final ClusterViewEvent.Type eventType = event.getEventType();
  logger.info(String.format("Processing event %s from source cluster %s.", eventType.name(),
      event.getClusterName()));

  switch (eventType) {
    case ExternalViewChange:
    case InstanceConfigChange:
    case LiveInstanceChange:
      // Just remember that something changed; the actual work happens on the next
      // periodic refresh
      _refreshViewCluster.set(true);
      break;
    case PeriodicViewRefresh:
      // Refresh local view cluster data cache
      if (_viewClusterRefresher.refreshViewClusterDataCache()) {
        logger.info("Detected change in view cluster data, trigger a refresh");
      } else {
        boolean refreshRequested = _refreshViewCluster.get();
        if (!refreshRequested) {
          logger.info("Skip refresh: No event happened since last refresh, and no force refresh.");
          return;
        }
      }
      logger.info("Refreshing cluster based on event " + eventType.name());
      refreshViewCluster();
      // Reload the cache once more after the refresh; the result is intentionally ignored.
      // NOTE(review): presumably so that the writes just made are not seen as a
      // "change" on the next cycle - confirm against ViewClusterRefresher.
      _viewClusterRefresher.refreshViewClusterDataCache();
      break;
    default:
      logger.error(String.format("Unrecognized event type: %s", eventType));
  }
}
/**
 * Handles one event taken from the view config worker's queue. For a ConfigChange event it
 * waits for the event's backoff, reads the latest view cluster config, computes the
 * difference to the last successfully applied config and applies it. On failure the event is
 * re-queued with an increased backoff; on success the new config becomes the current one.
 *
 * If the backoff sleep is interrupted, the interrupt flag is restored and the event is
 * dropped without touching ZooKeeper or the data providers.
 *
 * @param event the event to process; any type other than ConfigChange is logged as an error
 */
private void handleViewClusterConfigChange(ClusterViewEvent event) {
  logger.info(String.format("Processing event %s for view cluster %s", event.getEventType().name(),
      _viewClusterName));
  if (event.getEventType() != ClusterViewEvent.Type.ConfigChange) {
    logger.error(String.format("Unrecognized event type: %s", event.getEventType()));
    return;
  }

  // TODO: when DedupEventProcessor supports delayed scheduling,
  // we should not have this head-of-line blocking but to have DedupEventProcessor do the work.
  // Currently it's acceptable as we can endure delay in processing view cluster config change
  try {
    Thread.sleep(event.getEventProcessBackoff());
  } catch (InterruptedException e) {
    logger.warn("Interrupted when backing off during process view config change retry", e);
    Thread.currentThread().interrupt();
    // shutdown() interrupts this worker; do not go on to read the config, create or
    // delete data providers, or re-queue the event on an interrupted thread.
    return;
  }

  // We always compare current cluster config with most up-to-date cluster config
  boolean success;
  ClusterConfig newClusterConfig =
      _dataAccessor.getProperty(_dataAccessor.keyBuilder().clusterConfig());
  if (newClusterConfig == null) {
    logger.warn("Failed to read view cluster config");
    success = false;
  } else {
    SourceClusterConfigChangeAction action =
        new SourceClusterConfigChangeAction(_curViewClusterConfig, newClusterConfig);
    action.computeAction();
    success = processViewClusterConfigUpdate(action);
  }

  if (success) {
    _curViewClusterConfig = newClusterConfig;
    return;
  }

  // If we fail to process action and should retry, re-queue event to retry.
  // _curViewClusterConfig is left untouched so the retry diffs against the same baseline.
  _monitor.recordViewConfigProcessFailure();
  long backoff = computeNextEventProcessBackoff(event.getEventProcessBackoff());
  logger.info("Failed to process view cluster config change. Will retry in {} ms", backoff);
  event.setEventProcessBackoff(backoff);
  _viewConfigProcessor.queueEvent(event.getEventType(), event);
}
/**
 * Computes the backoff to use for the next retry: exponential (doubling) with a ceiling.
 *
 * @param currentBackoff backoff in milliseconds used for the attempt that just failed
 * @return {@code DEFAULT_INITIAL_EVENT_PROCESS_BACKOFF} if {@code currentBackoff} is not
 *         positive, otherwise twice {@code currentBackoff} capped at
 *         {@code DEFAULT_MAX_EVENT_PROCESS_BACKOFF}
 */
private long computeNextEventProcessBackoff(long currentBackoff) {
  if (currentBackoff <= 0) {
    return DEFAULT_INITIAL_EVENT_PROCESS_BACKOFF;
  }
  // Compare against half of the ceiling instead of doubling first: doubling a very large
  // value would overflow long and yield a negative backoff. For every value whose double
  // does not overflow, this gives exactly the same result as min(2 * current, max).
  if (currentBackoff > DEFAULT_MAX_EVENT_PROCESS_BACKOFF / 2) {
    return DEFAULT_MAX_EVENT_PROCESS_BACKOFF;
  }
  return currentBackoff * 2;
}
/**
 * Timer task that asks the aggregator worker to perform a periodic view cluster refresh.
 * It only queues a PeriodicViewRefresh event; the refresh itself is done when the worker
 * handles that event.
 */
private class RefreshViewClusterTask extends TimerTask {
  @Override
  public void run() {
    logger.info("Triggering view cluster refresh");
    ClusterViewEvent.Type refreshType = ClusterViewEvent.Type.PeriodicViewRefresh;
    ClusterViewEvent refreshEvent = new ClusterViewEvent(_viewClusterName, refreshType);
    _aggregator.queueEvent(refreshType, refreshEvent);
  }
}
/**
 * Replaces the refresh timer: cancels the existing one (if any) and creates a new daemon
 * timer that runs a RefreshViewClusterTask immediately and then at a fixed rate.
 *
 * @param triggerIntervalMs period between refresh triggers, in milliseconds
 */
private void resetTimer(long triggerIntervalMs) {
  Timer previousTimer = _viewClusterRefreshTimer;
  if (previousTimer != null) {
    previousTimer.cancel();
  }
  // Daemon timer thread, first trigger with zero delay
  _viewClusterRefreshTimer = new Timer(true);
  _viewClusterRefreshTimer.scheduleAtFixedRate(new RefreshViewClusterTask(), 0,
      triggerIntervalMs);
}
/**
 * Use SourceClusterConfigChangeAction to reset timer (RefreshViewClusterTask),
 * create/delete SourceClusterDataProvider in data provider map.
 *
 * Deletions and additions are each attempted for every listed source config; a failure on
 * one does not stop the others. The timer is reset whenever the action asks for it,
 * regardless of earlier failures.
 *
 * @param action the computed difference between the previous and the new view cluster config
 * @return true if success else false
 */
private boolean processViewClusterConfigUpdate(SourceClusterConfigChangeAction action) {
  boolean success = true;

  for (ViewClusterSourceConfig source : action.getConfigsToDelete()) {
    String key = generateDataProviderMapKey(source);
    logger.info("Deleting data provider " + key);
    SourceClusterDataProvider existing = _dataProviderMap.get(key);
    if (existing == null) {
      continue;
    }
    try {
      existing.shutdown();
      synchronized (_dataProviderMap) {
        _dataProviderMap.remove(key);
        // upon successful removal of data provider, set refresh view cluster to true
        // or if no event from source cluster happened before next refresh cycle, this
        // removal will be missed.
        _refreshViewCluster.set(true);
      }
    } catch (Exception e) {
      success = false;
      // Keep the cause in the log; the provider stays in the map so the delete is retried
      logger.warn(String.format("Failed to shutdown data provider %s, will retry", key), e);
    }
  }

  for (ViewClusterSourceConfig source : action.getConfigsToAdd()) {
    String key = generateDataProviderMapKey(source);
    logger.info("Creating data provider " + key);
    if (_dataProviderMap.remove(key) != null) {
      // possibly due to a previous failure of shutting down, print warning and recreate for now
      // NOTE(review): the replaced provider is dropped without shutdown(); confirm whether
      // it can still hold resources at this point.
      logger.warn(String.format("Add data provider %s which already exists. Recreating", key));
    }
    try {
      SourceClusterDataProvider provider = new SourceClusterDataProvider(source, _aggregator);
      provider.setup();
      // Only register the provider once setup has succeeded
      _dataProviderMap.put(key, provider);
    } catch (Exception e) {
      success = false;
      logger.warn(String.format("Failed to create data provider %s, will retry", key), e);
    }
  }

  if (action.shouldResetTimer()) {
    logger.info(
        "Resetting view cluster refresh timer at interval " + action.getCurrentRefreshPeriodMs());
    resetTimer(action.getCurrentRefreshPeriodMs());
  }
  return success;
}
/**
 * Use ViewClusterRefresher to refresh ViewCluster.
 *
 * Steps: snapshot the current data providers, refresh each provider's cache, then refresh
 * every valid property type in the view cluster. Whenever a step fails, the "refresh needed"
 * flag is raised again so the next periodic refresh retries. Failures and latency are
 * reported to the monitor.
 */
private void refreshViewCluster() {
  final long refreshStartMs = System.currentTimeMillis();
  logger.info(String.format("START RefreshViewCluster: Refresh view cluster %s at timestamp %s",
      _viewClusterName, refreshStartMs));

  // Work on a snapshot of the providers so refresh won't block cluster config update.
  // If a provider is shut down while we are reloading cache / generating diff, an
  // exception is thrown and we retry during the next refresh cycle.
  // The flag is cleared under the same lock that provider removal uses when setting it.
  final Set<SourceClusterDataProvider> providerSnapshot;
  synchronized (_dataProviderMap) {
    _refreshViewCluster.set(false);
    providerSnapshot = new HashSet<>(_dataProviderMap.values());
  }

  // Step 1: refresh source cluster caches, stop at the first failure
  // TODO: the following steps can be parallelized
  boolean sourceReadFailed = false;
  for (SourceClusterDataProvider sourceProvider : providerSnapshot) {
    try {
      sourceProvider.refreshCache();
    } catch (Exception e) {
      logger.warn("Caught exception when refreshing source cluster cache. Abort refresh.", e);
      _refreshViewCluster.set(true);
      sourceReadFailed = true;
      break;
    }
  }

  // Step 2: refresh view cluster properties, only when every source cache was refreshed
  boolean viewWriteFailed = false;
  if (!sourceReadFailed) {
    _viewClusterRefresher.updateProviderView(providerSnapshot);
    for (PropertyType propertyType : ViewClusterSourceConfig.getValidPropertyTypes()) {
      logger.info(String.format("Refreshing property %s in view cluster %s", propertyType,
          _viewClusterName));
      try {
        // Best effort: keep going with the remaining properties when one fails
        boolean refreshed = _viewClusterRefresher.refreshPropertiesInViewCluster(propertyType);
        if (!refreshed) {
          viewWriteFailed = true;
          _refreshViewCluster.set(true);
        }
      } catch (IllegalArgumentException e) {
        // Invalid property... not expected! Something wrong with code, should not retry
        logger.error(String.format("Failed to refresh property in view cluster %s with exception",
            _viewClusterName), e);
      }
    }
  }

  long elapsedMs = System.currentTimeMillis() - refreshStartMs;
  recordRefreshResults(sourceReadFailed, viewWriteFailed, elapsedMs);
}
/**
 * Reports the outcome of one view cluster refresh to the monitor and logs a summary line.
 *
 * @param recordSourceFailure whether reading from a source cluster failed
 * @param recordViewFailure whether refreshing the view cluster failed
 * @param latency time spent on the refresh, in milliseconds
 */
private void recordRefreshResults(boolean recordSourceFailure, boolean recordViewFailure,
    long latency) {
  if (recordSourceFailure) {
    _monitor.recordReadSourceFailure();
  }
  if (recordViewFailure) {
    _monitor.recordViewRefreshFailure();
  }
  // Latency is recorded for failed refreshes as well
  _monitor.recordRefreshViewLatency(latency);

  boolean refreshSucceeded = !(recordSourceFailure || recordViewFailure);
  String summary = String.format(
      "END RefreshViewCluster: finished refresh %s. Time spent: %s ms. Success: %s",
      _viewClusterName, latency, refreshSucceeded);
  logger.info(summary);
}
/**
 * @param viewClusterName name of the view cluster
 * @return the Helix manager instance name for this aggregator:
 *         "HelixViewAggregator-" followed by the view cluster name
 */
private static String generateHelixManagerInstanceName(String viewClusterName) {
  return "HelixViewAggregator-" + viewClusterName;
}
/**
 * @param config source cluster config
 * @return key used in the data provider map: the source's name and ZooKeeper address,
 *         joined by "-"
 */
private static String generateDataProviderMapKey(ViewClusterSourceConfig config) {
  return config.getName() + "-" + config.getZkAddress();
}
}