blob: 4d3ce7278ddd0379748fcef25f887aa5122eac07 [file] [log] [blame]
package org.apache.helix.common.caches;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.common.controllers.ControlContextProvider;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Caches the basic cluster data: LiveInstances, InstanceConfigs and ExternalViews.
 *
 * <p>The cache is driven by per-{@link HelixConstants.ChangeType} "changed" flags. Callers set a
 * flag through {@link #notifyDataChange(HelixConstants.ChangeType)} or
 * {@link #requireFullRefresh()}, and the next {@link #refresh(HelixDataAccessor)} reloads only
 * the data types whose flag is set.
 *
 * <p>The flags live in a {@link ConcurrentHashMap}, but {@link #refresh(HelixDataAccessor)} itself
 * is not synchronized; only {@link #clearCache(HelixConstants.ChangeType)} is.
 */
public class BasicClusterDataCache implements ControlContextProvider {
  private static final Logger LOG = LoggerFactory.getLogger(BasicClusterDataCache.class);

  // Names handed to the PropertyCache instances below.
  private static final String INSTANCE_CONFIG = "InstanceConfig";
  private static final String LIVE_INSTANCE = "LiveInstance";

  private String _clusterEventId;

  protected PropertyCache<LiveInstance> _liveInstancePropertyCache;
  protected PropertyCache<InstanceConfig> _instanceConfigPropertyCache;
  protected ExternalViewCache _externalViewCache;
  protected String _clusterName;

  // Change type -> "needs reload on next refresh". A missing entry is treated as false.
  protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap;

  /**
   * Creates an empty cache for the given cluster. No change flag is set initially, so
   * {@link #refresh(HelixDataAccessor)} loads nothing until a change is notified or
   * {@link #requireFullRefresh()} is called.
   *
   * @param clusterName name of the cluster this cache belongs to
   */
  public BasicClusterDataCache(String clusterName) {
    _propertyDataChangedMap = new ConcurrentHashMap<>();
    _externalViewCache = new ExternalViewCache(clusterName);
    // _clusterName and _clusterEventId are assigned before "this" is handed to the property
    // caches below, so the ControlContextProvider getters already return meaningful values.
    _clusterName = clusterName;
    _clusterEventId = AbstractDataCache.UNKNOWN_EVENT_ID;

    // NOTE(review): the trailing "true" flag is defined by PropertyCache - confirm its meaning
    // there before changing it.
    _liveInstancePropertyCache = new PropertyCache<>(this, LIVE_INSTANCE,
        new PropertyCache.PropertyCacheKeyFuncs<LiveInstance>() {
          @Override
          public PropertyKey getRootKey(HelixDataAccessor accessor) {
            return accessor.keyBuilder().liveInstances();
          }

          @Override
          public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName) {
            return accessor.keyBuilder().liveInstance(objName);
          }

          @Override
          public String getObjName(LiveInstance obj) {
            return obj.getInstanceName();
          }
        }, true);

    _instanceConfigPropertyCache = new PropertyCache<>(this, INSTANCE_CONFIG,
        new PropertyCache.PropertyCacheKeyFuncs<InstanceConfig>() {
          @Override
          public PropertyKey getRootKey(HelixDataAccessor accessor) {
            return accessor.keyBuilder().instanceConfigs();
          }

          @Override
          public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName) {
            return accessor.keyBuilder().instanceConfig(objName);
          }

          @Override
          public String getObjName(InstanceConfig obj) {
            return obj.getInstanceName();
          }
        }, true);
  }

  /**
   * Refreshes the cached cluster data through the given accessor, reloading only the data types
   * (ExternalView, LiveInstance, InstanceConfig) whose change flag is currently set.
   *
   * <p>Each flag is cleared before the corresponding reload starts, so a change notified while
   * the reload is running leaves the flag set for the next refresh.
   *
   * <p>If we want to support multi-threading in the future, this method needs to be synchronized.
   *
   * @param accessor data accessor used to read the cluster data
   */
  public void refresh(HelixDataAccessor accessor) {
    LOG.info("START: BasicClusterDataCache.refresh() for cluster {}", _clusterName);
    long startTime = System.currentTimeMillis();

    if (_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.EXTERNAL_VIEW, false)) {
      _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, false);
      _externalViewCache.refresh(accessor);
    }

    if (_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.LIVE_INSTANCE, false)) {
      long start = System.currentTimeMillis();
      _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, false);
      // A live-instance change also flags CURRENT_STATE as changed. This class never consumes
      // that flag itself; presumably subclasses or other caches do - verify against callers.
      _propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, true);
      _liveInstancePropertyCache.refresh(accessor);
      LOG.info("Reload LiveInstances: {}. Takes {} ms",
          _liveInstancePropertyCache.getPropertyMap().keySet(),
          System.currentTimeMillis() - start);
    }

    if (_propertyDataChangedMap.getOrDefault(HelixConstants.ChangeType.INSTANCE_CONFIG, false)) {
      long start = System.currentTimeMillis();
      _propertyDataChangedMap.put(HelixConstants.ChangeType.INSTANCE_CONFIG, false);
      _instanceConfigPropertyCache.refresh(accessor);
      LOG.info("Reload InstanceConfig: {}. Takes {} ms",
          _instanceConfigPropertyCache.getPropertyMap().keySet(),
          System.currentTimeMillis() - start);
    }

    long endTime = System.currentTimeMillis();
    LOG.info("END: BasicClusterDataCache.refresh() for cluster {}, took {} ms", _clusterName,
        endTime - startTime);

    // Guarded so the property maps are only fetched when debug output is actually wanted.
    if (LOG.isDebugEnabled()) {
      LOG.debug("LiveInstances: {}", _liveInstancePropertyCache.getPropertyMap());
      LOG.debug("ExternalViews: {}", _externalViewCache.getExternalViewMap().keySet());
      LOG.debug("InstanceConfigs: {}", _instanceConfigPropertyCache.getPropertyMap());
    }
  }

  /**
   * Retrieves the cached ExternalViews for all resources.
   *
   * @return the map held by the external view cache
   */
  public Map<String, ExternalView> getExternalViews() {
    return _externalViewCache.getExternalViewMap();
  }

  /**
   * Returns the cached LiveInstances, i.e. the instances that are currently up and running.
   *
   * @return the property map held by the live instance cache
   */
  public Map<String, LiveInstance> getLiveInstances() {
    return _liveInstancePropertyCache.getPropertyMap();
  }

  /**
   * Returns the cached instance configs.
   *
   * @return the property map held by the instance config cache
   */
  public Map<String, InstanceConfig> getInstanceConfigMap() {
    return _instanceConfigPropertyCache.getPropertyMap();
  }

  /**
   * Notifies the cache that some part of the cluster data has been changed.
   *
   * @param changeType type of data that changed
   * @param pathChanged path of the changed data; ignored by this implementation, which simply
   *                    delegates to {@link #notifyDataChange(HelixConstants.ChangeType)}
   */
  public void notifyDataChange(HelixConstants.ChangeType changeType, String pathChanged) {
    notifyDataChange(changeType);
  }

  /**
   * Notifies the cache that some part of the cluster data has been changed, so that the next
   * {@link #refresh(HelixDataAccessor)} reloads it.
   *
   * @param changeType type of data that changed
   */
  public void notifyDataChange(HelixConstants.ChangeType changeType) {
    _propertyDataChangedMap.put(changeType, true);
  }

  /**
   * Clears the cache that corresponds to the given change type. Only LIVE_INSTANCE,
   * INSTANCE_CONFIG and EXTERNAL_VIEW are handled; any other type is a no-op. The change flags
   * are not touched.
   *
   * @param changeType type of data whose cache should be emptied
   */
  public synchronized void clearCache(HelixConstants.ChangeType changeType) {
    switch (changeType) {
      case LIVE_INSTANCE:
        _liveInstancePropertyCache
            .setPropertyMap(HelixProperty.convertListToMap(Collections.emptyList()));
        break;
      case INSTANCE_CONFIG:
        _instanceConfigPropertyCache
            .setPropertyMap(HelixProperty.convertListToMap(Collections.emptyList()));
        break;
      case EXTERNAL_VIEW:
        _externalViewCache.clear();
        break;
      default:
        break;
    }
  }

  /**
   * Indicates that a full read should be done on the next refresh by setting the change flag of
   * every {@link HelixConstants.ChangeType}.
   */
  public void requireFullRefresh() {
    for (HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
      _propertyDataChangedMap.put(type, Boolean.TRUE);
    }
  }

  /**
   * Prints the data cache state: the live instance map, the external view map and the instance
   * config map, one per line.
   */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("liveInstancePropertyCache: ").append(_liveInstancePropertyCache.getPropertyMap())
        .append("\n");
    sb.append("externalViewCache: ").append(_externalViewCache.getExternalViewMap()).append("\n");
    sb.append("instanceConfigPropertyCache: ")
        .append(_instanceConfigPropertyCache.getPropertyMap()).append("\n");
    return sb.toString();
  }

  @Override
  public String getClusterName() {
    return _clusterName;
  }

  @Override
  public String getClusterEventId() {
    return _clusterEventId;
  }

  @Override
  public void setClusterEventId(String clusterEventId) {
    _clusterEventId = clusterEventId;
  }

  /**
   * This cache is not tied to a specific pipeline, so the unknown-pipeline constant is returned.
   */
  @Override
  public String getPipelineName() {
    return AbstractDataCache.UNKNOWN_PIPELINE;
  }
}