blob: c6df9087626e8cd0c764603655be09007ba719de [file] [log] [blame]
package org.apache.helix.view.aggregator;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.model.ExternalView;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
import org.apache.helix.view.dataprovider.ViewClusterDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class contains logics to refresh view cluster based on information from source cluster data
* providers.
* This class assumes SourceClusterDataProviders have its caches refreshed already.
*/
public class ViewClusterRefresher {
private static final Logger logger = LoggerFactory.getLogger(ViewClusterRefresher.class);
private final String _viewClusterName;
private final HelixDataAccessor _viewClusterDataAccessor;
private final ViewClusterDataCache _viewClusterDataCache;
private Set<SourceClusterDataProvider> _dataProviderView;
public ViewClusterRefresher(String viewClusterName, HelixDataAccessor viewClusterDataAccessor) {
_viewClusterName = viewClusterName;
_viewClusterDataAccessor = viewClusterDataAccessor;
_viewClusterDataCache = new ViewClusterDataCache(viewClusterName);
}
private static class ClusterPropertyDiff {
/**
* List of names of objects to set (create or modify)
*/
List<String> _keysToSet;
/**
* List of actual objects represented by _keysToSet
*/
List<HelixProperty> _propertiesToSet;
/**
* List of names of objects to delete
*/
List<String> _keysToDelete;
public ClusterPropertyDiff() {
_keysToSet = new ArrayList<>();
_propertiesToSet = new ArrayList<>();
_keysToDelete = new ArrayList<>();
}
public void addPropertyToSet(String key, HelixProperty obj) {
// batch setChildren API needs keys and objects to have corresponding order
_keysToSet.add(key);
_propertiesToSet.add(obj);
}
public void addPropertiesToDelete(Collection<? extends String> keys) {
_keysToDelete.addAll(keys);
}
public List<String> getKeysToSet() {
return Collections.unmodifiableList(_keysToSet);
}
public List<HelixProperty> getPropertiesToSet() {
return Collections.unmodifiableList(_propertiesToSet);
}
public List<String> getKeysToDelete() {
return Collections.unmodifiableList(_keysToDelete);
}
}
public void updateProviderView(Set<SourceClusterDataProvider> dataProviderView) {
_dataProviderView = dataProviderView;
}
/**
* Create / update / delete property of given type in view cluster, based on data change from
* source clusters.
*
* @param propertyType type of property to refresh in view cluster
* @return true if successfully refreshed all instances of the given property else false
* @throws IllegalArgumentException throws exception when give type is not supported
*/
public boolean refreshPropertiesInViewCluster(PropertyType propertyType)
throws IllegalArgumentException {
boolean ok = false;
Set<String> listedNamesInView;
Set<String> listedNamesInSource = new HashSet<>();
Map<String, HelixProperty> sourceProperties = new HashMap<>();
Map<String, HelixProperty> viewClusterPropertyCache =
(Map<String, HelixProperty>) getViewClusterPropertyCache(propertyType);
if (viewClusterPropertyCache == null) {
throw new IllegalArgumentException(
"Cannot find view cluster property cache. Property: " + propertyType.name());
}
try {
listedNamesInView =
new HashSet<>(_viewClusterDataAccessor.getChildNames(getPropertyKey(propertyType, null)));
// Prepare data
for (SourceClusterDataProvider provider : _dataProviderView) {
if (!provider.getPropertiesToAggregate().contains(propertyType)) {
logger.info(String
.format("SourceCluster %s does not need to aggregate %s, skip.", provider.getName(),
propertyType.name()));
continue;
}
switch (propertyType) {
case INSTANCES:
listedNamesInSource.addAll(provider.getInstanceConfigNames());
sourceProperties.putAll(provider.getInstanceConfigMap());
break;
case LIVEINSTANCES:
listedNamesInSource.addAll(provider.getLiveInstanceNames());
sourceProperties.putAll(provider.getLiveInstances());
break;
case EXTERNALVIEW:
listedNamesInSource.addAll(provider.getExternalViewNames());
for (Map.Entry<String, ExternalView> entry : provider.getExternalViews().entrySet()) {
String resourceName = entry.getKey();
if (!sourceProperties.containsKey(resourceName)) {
sourceProperties.put(resourceName, new ExternalView(resourceName));
}
mergeExternalViews((ExternalView) sourceProperties.get(resourceName), entry.getValue());
}
break;
default:
// Will NOT come here as for unsupported property type, exception will be thrown out
// earlier
break;
}
}
// Perform refresh
ok = doRefresh(propertyType, listedNamesInView, listedNamesInSource, sourceProperties, viewClusterPropertyCache);
} catch (Exception e) {
logger.warn(String
.format("Caught exception during refreshing %s for view cluster %s", propertyType.name(),
_viewClusterName), e);
}
logRefreshResult(propertyType, ok);
return ok;
}
/**
* Merge external view "toMerge" into external view "source":
* - if partition in toMerge does not exist in source, we add it into source
* - if partition exist in both external views, we add all map fields from toMerge to source
*
* @param source
* @param toMerge
*/
private static void mergeExternalViews(ExternalView source, ExternalView toMerge)
throws IllegalArgumentException {
if (!source.getId().equals(toMerge.getId())) {
throw new IllegalArgumentException(String
.format("Cannot merge ExternalViews with different ID. SourceID: %s; ToMergeID: %s",
source.getId(), toMerge.getId()));
}
for (String partitionName : toMerge.getPartitionSet()) {
// Deep copying state map to avoid modifying source cache
if (!source.getPartitionSet().contains(partitionName)) {
source.setStateMap(partitionName, new TreeMap<String, String>());
}
source.getStateMap(partitionName).putAll(toMerge.getStateMap(partitionName));
}
}
/**
* Based on property names in view cluster, property names in source clusters, and all cached
* properties in source clusters, generate ClusterPropertyDiff that contains information about
* what to add / update or delete
*
* @param viewPropertyNames names of all properties (i.e. liveInstances) in view cluster
* @param sourcePropertyNames names of all properties (i.e. liveInstances) in all source clusters
* @param cachedSourceProperties all cached properties from source clusters
* @param viewClusterPropertyCache all properties that are previously set successfully to view cluster
* @return ClusterPropertyDiff object contains diff information
*/
private ClusterPropertyDiff calculatePropertyDiff(
Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
Map<String, HelixProperty> cachedSourceProperties, Map<String, HelixProperty> viewClusterPropertyCache) {
ClusterPropertyDiff diff = new ClusterPropertyDiff();
// items whose names are in view cluster but not in source should be removed for sure
Set<String> toDelete = new HashSet<>(viewPropertyNames);
toDelete.removeAll(sourcePropertyNames);
diff.addPropertiesToDelete(toDelete);
for (Map.Entry<String, HelixProperty> sourceProperty : cachedSourceProperties.entrySet()) {
String name = sourceProperty.getKey();
HelixProperty property = sourceProperty.getValue();
// cache refresh happens earlier than list curNames, so if cache is still in curNames,
// we confirm that this is a valid live instance. This is necessary because ZK
// can possibly not return all children content in a refresh, but list child names
// will reliably return all children names.
//
// Else, either this child is already deleted, or we fail to retrieve information
// from a cache refresh. either way, we will leave it to next ViewClusterRefresh cycle
// to confirm state
if (property != null && sourcePropertyNames.contains(name) && (
!viewClusterPropertyCache.containsKey(name) || !viewClusterPropertyCache.get(name)
.getRecord().equals(property.getRecord()))) {
diff.addPropertyToSet(name, property);
}
}
return diff;
}
/**
* Refresh view cluster regarding a particular property based given source of truths.
* Steps are:
* - Calculate diff based on propertyNamesInView, propertyNamesInSource and
* cachedSourceProperties
* - Generate property keys for properties to set / delete
* - Delete properties
* - Set properties
*
* @param propertyType type of property to refresh
* @param viewPropertyNames all names of the target properties in view cluster
* @param sourcePropertyNames all names of the target properties in source clusters
* @param cachedSourceProperties all up-to-date cached properties in source cluster
* @param viewClusterPropertyCache view cluster cache
* @return true if all required refreshes are successful, else false
*/
private boolean doRefresh(PropertyType propertyType,
Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
Map<String, HelixProperty> cachedSourceProperties, Map<String, HelixProperty> viewClusterPropertyCache) {
boolean ok = true;
// Calculate diff
ClusterPropertyDiff diff =
calculatePropertyDiff(viewPropertyNames, sourcePropertyNames, cachedSourceProperties, viewClusterPropertyCache);
// Generate property keys
List<PropertyKey> keysToSet = new ArrayList<>();
List<PropertyKey> keysToDelete = new ArrayList<>();
for (String name : diff.getKeysToSet()) {
PropertyKey key = getPropertyKey(propertyType, name);
if (key != null) {
keysToSet.add(key);
}
}
for (String name : diff.getKeysToDelete()) {
PropertyKey key = getPropertyKey(propertyType, name);
if (key != null) {
keysToDelete.add(key);
}
}
// Delete outdated properties
if (!deleteProperties(keysToDelete)) {
ok = false;
}
// Add or update changed properties
if (!addOrUpdateProperties(keysToSet, diff.getPropertiesToSet())) {
ok = false;
}
return ok;
}
/**
* Based on type and property name, generate property key
* @param propertyType type of the property
* @param propertyName name of the property. If null, return key of the parent of
* all properties of given type
* @return property key
*/
private PropertyKey getPropertyKey(PropertyType propertyType, String propertyName) {
switch (propertyType) {
case INSTANCES:
return propertyName == null
? _viewClusterDataAccessor.keyBuilder().instanceConfigs()
: _viewClusterDataAccessor.keyBuilder().instanceConfig(propertyName);
case LIVEINSTANCES:
return propertyName == null
? _viewClusterDataAccessor.keyBuilder().liveInstances()
: _viewClusterDataAccessor.keyBuilder().liveInstance(propertyName);
case EXTERNALVIEW:
return propertyName == null
? _viewClusterDataAccessor.keyBuilder().externalViews()
: _viewClusterDataAccessor.keyBuilder().externalView(propertyName);
default:
return null;
}
}
/**
* Refresh view cluster data cache and return true if there is data update.
* @return true if new change is fetched from remote
*/
boolean refreshViewClusterDataCache() {
return _viewClusterDataCache.updateCache(_viewClusterDataAccessor);
}
private Map<String, ? extends HelixProperty> getViewClusterPropertyCache(PropertyType propertyType) {
switch (propertyType) {
case INSTANCES:
return _viewClusterDataCache.getInstanceConfigMap();
case LIVEINSTANCES:
return _viewClusterDataCache.getLiveInstances();
case EXTERNALVIEW:
return _viewClusterDataCache.getExternalViews();
default:
return null;
}
}
/**
* Create or Update properties in ZK specified by a list of property keys. Update the given cache
* for the objects that got successfully created or updated in ZK
* @param keysToAddOrUpdate
* @param objects
* @param <T> HelixProperty
*
* @return true if all objects are successfully created or updated, else false
*/
private <T extends HelixProperty> boolean addOrUpdateProperties(
List<PropertyKey> keysToAddOrUpdate, List<HelixProperty> objects) {
boolean ok = true;
logger.info(
String.format("AddOrUpdate %s objects: %s", keysToAddOrUpdate.size(), keysToAddOrUpdate));
boolean[] addOrUpdateResults = _viewClusterDataAccessor.setChildren(keysToAddOrUpdate, objects);
for (int i = 0; i < addOrUpdateResults.length; i++) {
if (!addOrUpdateResults[i]) {
logger.warn(String.format("Failed to create or update live instance %s, will retry later",
keysToAddOrUpdate.get(i).getPath()));
ok = false;
}
}
return ok;
}
/**
* Delete properties in ZK specified by a list of property keys. Update the given cache
* for the objects that got successfully deleted in ZK
* @param keysToDelete
* @param <T> HelixProperty
* @return true if all objects got successfully deleted else false
*/
private <T extends HelixProperty> boolean deleteProperties(List<PropertyKey> keysToDelete) {
boolean ok = true;
logger.info(String.format("Deleting %s objects: %s", keysToDelete.size(), keysToDelete));
for (PropertyKey key : keysToDelete) {
if (!_viewClusterDataAccessor.removeProperty(key)) {
ok = false;
logger.warn(String.format("Failed to create or update live instance %s, will retry later",
key.getPath()));
}
}
return ok;
}
private void logRefreshResult(PropertyType type, boolean ok) {
if (!ok) {
logger.warn(String
.format("Failed to refresh all %s for view cluster %s, will retry",
type.name(), _viewClusterName));
} else {
logger.info(String.format("Successfully refreshed all %s for view cluster %s",
type.name(), _viewClusterName));
}
}
}