blob: 27f4c5099a393c6e27fadbabaf1ba213134c76e9 [file] [log] [blame]
package org.apache.helix.controller.changedetector;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixProperty;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.model.ClusterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from
* Helix's main resource pipeline cache (DataProvider) and the computation results of change
* detection.
* WARNING: the methods of this class are not thread-safe.
*/
public class ResourceChangeDetector implements ChangeDetector {
private static final Logger LOG = LoggerFactory.getLogger(ResourceChangeDetector.class.getName());
private final boolean _ignoreControllerGeneratedFields;
private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run
private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run
// The following caches the computation results
private Map<HelixConstants.ChangeType, Collection<String>> _changedItems = new HashMap<>();
private Map<HelixConstants.ChangeType, Collection<String>> _addedItems = new HashMap<>();
private Map<HelixConstants.ChangeType, Collection<String>> _removedItems = new HashMap<>();
public ResourceChangeDetector(boolean ignoreControllerGeneratedFields) {
_newSnapshot = new ResourceChangeSnapshot();
_ignoreControllerGeneratedFields = ignoreControllerGeneratedFields;
}
public ResourceChangeDetector() {
this(false);
}
/**
* Compare the underlying HelixProperty objects and produce a collection of names of changed
* properties.
* @return
*/
private Collection<String> getChangedItems(Map<String, ? extends HelixProperty> oldPropertyMap,
Map<String, ? extends HelixProperty> newPropertyMap) {
Collection<String> changedItems = new HashSet<>();
oldPropertyMap.forEach((name, property) -> {
if (newPropertyMap.containsKey(name)
&& !property.getRecord().equals(newPropertyMap.get(name).getRecord())) {
changedItems.add(name);
}
});
return changedItems;
}
/**
* Return a collection of names that are newly added.
* @return
*/
private Collection<String> getAddedItems(Map<String, ? extends HelixProperty> oldPropertyMap,
Map<String, ? extends HelixProperty> newPropertyMap) {
return Sets.difference(newPropertyMap.keySet(), oldPropertyMap.keySet());
}
/**
* Return a collection of names that were removed.
* @return
*/
private Collection<String> getRemovedItems(Map<String, ? extends HelixProperty> oldPropertyMap,
Map<String, ? extends HelixProperty> newPropertyMap) {
return Sets.difference(oldPropertyMap.keySet(), newPropertyMap.keySet());
}
private void clearCachedComputation() {
_changedItems.clear();
_addedItems.clear();
_removedItems.clear();
}
/**
* Based on the change type given and propertyMap type, call the right getters for propertyMap.
* @param changeType
* @param snapshot
* @return
*/
private Map<String, ? extends HelixProperty> determinePropertyMapByType(
HelixConstants.ChangeType changeType, ResourceChangeSnapshot snapshot) {
switch (changeType) {
case INSTANCE_CONFIG:
return snapshot.getInstanceConfigMap();
case IDEAL_STATE:
return snapshot.getIdealStateMap();
case RESOURCE_CONFIG:
return snapshot.getResourceConfigMap();
case LIVE_INSTANCE:
return snapshot.getLiveInstances();
case CLUSTER_CONFIG:
ClusterConfig config = snapshot.getClusterConfig();
if (config == null) {
return Collections.emptyMap();
} else {
return Collections.singletonMap(config.getClusterName(), config);
}
default:
LOG.warn(
"ResourceChangeDetector cannot determine propertyMap for the given ChangeType: {}. Returning an empty map.",
changeType);
return Collections.emptyMap();
}
}
/**
* Makes the current newSnapshot the oldSnapshot and reads in the up-to-date snapshot for change
* computation. To be called in the controller pipeline.
* @param dataProvider newly refreshed DataProvider (cache)
*/
public synchronized void updateSnapshots(ResourceControllerDataProvider dataProvider) {
// If there are changes, update internal states
_oldSnapshot = new ResourceChangeSnapshot(_newSnapshot);
_newSnapshot = new ResourceChangeSnapshot(dataProvider, _ignoreControllerGeneratedFields);
dataProvider.clearRefreshedChangeTypes();
// Invalidate cached computation
clearCachedComputation();
}
public synchronized void resetSnapshots() {
_newSnapshot = new ResourceChangeSnapshot();
clearCachedComputation();
}
@Override
public synchronized Collection<HelixConstants.ChangeType> getChangeTypes() {
return Collections.unmodifiableSet(_newSnapshot.getChangedTypes());
}
@Override
public synchronized Collection<String> getChangesByType(HelixConstants.ChangeType changeType) {
return _changedItems.computeIfAbsent(changeType,
changedItems -> getChangedItems(determinePropertyMapByType(changeType, _oldSnapshot),
determinePropertyMapByType(changeType, _newSnapshot)));
}
@Override
public synchronized Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType) {
return _addedItems.computeIfAbsent(changeType,
changedItems -> getAddedItems(determinePropertyMapByType(changeType, _oldSnapshot),
determinePropertyMapByType(changeType, _newSnapshot)));
}
@Override
public synchronized Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType) {
return _removedItems.computeIfAbsent(changeType,
changedItems -> getRemovedItems(determinePropertyMapByType(changeType, _oldSnapshot),
determinePropertyMapByType(changeType, _newSnapshot)));
}
/**
* @return A map contains all the changed items that are categorized by the change types.
*/
public Map<HelixConstants.ChangeType, Set<String>> getAllChanges() {
return getChangeTypes().stream()
.collect(Collectors.toMap(changeType -> changeType, changeType -> {
Set<String> itemKeys = new HashSet<>();
itemKeys.addAll(getAdditionsByType(changeType));
itemKeys.addAll(getChangesByType(changeType));
itemKeys.addAll(getRemovalsByType(changeType));
return itemKeys;
})).entrySet().stream().filter(changeEntry -> !changeEntry.getValue().isEmpty()).collect(
Collectors
.toMap(changeEntry -> changeEntry.getKey(), changeEntry -> changeEntry.getValue()));
}
}