blob: 38ac7451e288dbfe0309c0efc94899bfd33def93 [file] [log] [blame]
package org.apache.helix.common.caches;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.common.controllers.ControlContextProvider;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.LiveInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Cache to hold all CurrentStates of a cluster.
*/
public class CurrentStateCache extends AbstractDataCache<CurrentState> {
private static final Logger LOG = LoggerFactory.getLogger(CurrentStateCache.class.getName());
private Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap;
private Map<PropertyKey, CurrentState> _currentStateCache = Maps.newHashMap();
// If the cache is already refreshed with current state data.
private boolean _initialized = false;
private CurrentStateSnapshot _snapshot;
public CurrentStateCache(String clusterName) {
this(createDefaultControlContextProvider(clusterName));
}
public CurrentStateCache(ControlContextProvider contextProvider) {
super(contextProvider);
_currentStateMap = Collections.emptyMap();
_snapshot = new CurrentStateSnapshot(_currentStateCache);
}
/**
* This refreshes the CurrentStates data by re-fetching the data from zookeeper in an efficient
* way
*
* @param accessor
* @param liveInstanceMap map of all liveInstances in cluster
*
* @return
*/
public boolean refresh(HelixDataAccessor accessor,
Map<String, LiveInstance> liveInstanceMap) {
long startTime = System.currentTimeMillis();
refreshCurrentStatesCache(accessor, liveInstanceMap);
Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap = new HashMap<>();
for (PropertyKey key : _currentStateCache.keySet()) {
CurrentState currentState = _currentStateCache.get(key);
String[] params = key.getParams();
if (currentState != null && params.length >= 4) {
String instanceName = params[1];
String sessionId = params[2];
String stateName = params[3];
Map<String, Map<String, CurrentState>> instanceCurStateMap =
allCurStateMap.get(instanceName);
if (instanceCurStateMap == null) {
instanceCurStateMap = Maps.newHashMap();
allCurStateMap.put(instanceName, instanceCurStateMap);
}
Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId);
if (sessionCurStateMap == null) {
sessionCurStateMap = Maps.newHashMap();
instanceCurStateMap.put(sessionId, sessionCurStateMap);
}
sessionCurStateMap.put(stateName, currentState);
}
}
for (String instance : allCurStateMap.keySet()) {
allCurStateMap.put(instance, Collections.unmodifiableMap(allCurStateMap.get(instance)));
}
_currentStateMap = Collections.unmodifiableMap(allCurStateMap);
long endTime = System.currentTimeMillis();
LogUtil.logInfo(LOG, genEventInfo(),
"END: CurrentStateCache.refresh() for cluster " + _controlContextProvider.getClusterName()
+ ", started at : " + startTime + ", took " + (endTime - startTime) + " ms");
if (LOG.isDebugEnabled()) {
LogUtil.logDebug(LOG, genEventInfo(),
String.format("Current State refreshed : %s", _currentStateMap.toString()));
}
return true;
}
// reload current states that has been changed from zk to local cache.
private void refreshCurrentStatesCache(HelixDataAccessor accessor,
Map<String, LiveInstance> liveInstanceMap) {
long start = System.currentTimeMillis();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
Set<PropertyKey> currentStateKeys = new HashSet<>();
for (String instanceName : liveInstanceMap.keySet()) {
LiveInstance liveInstance = liveInstanceMap.get(instanceName);
String sessionId = liveInstance.getSessionId();
List<String> currentStateNames =
accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId));
for (String currentStateName : currentStateNames) {
currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName));
}
}
// All new entries from zk not cached locally yet should be read from ZK.
Set<PropertyKey> reloadKeys = new HashSet<>(currentStateKeys);
reloadKeys.removeAll(_currentStateCache.keySet());
Set<PropertyKey> cachedKeys = new HashSet<>(_currentStateCache.keySet());
cachedKeys.retainAll(currentStateKeys);
Map<PropertyKey, CurrentState> newStateCache = Collections.unmodifiableMap(
refreshProperties(accessor, new ArrayList<>(reloadKeys), new ArrayList<>(cachedKeys),
_currentStateCache));
// if the cache was not initialized, the previous state should not be included in the snapshot
if (_initialized) {
_snapshot = new CurrentStateSnapshot(newStateCache, _currentStateCache, reloadKeys);
} else {
_snapshot = new CurrentStateSnapshot(newStateCache);
_initialized = true;
}
_currentStateCache = newStateCache;
if (LOG.isDebugEnabled()) {
LogUtil.logDebug(LOG, genEventInfo(),
"# of CurrentStates reload: " + reloadKeys.size() + ", skipped:" + (
currentStateKeys.size() - reloadKeys.size()) + ". took " + (System.currentTimeMillis()
- start) + " ms to reload new current states for cluster: " + _controlContextProvider
.getClusterName());
}
}
/**
* Return CurrentStates map for all instances.
*
* @return
*/
public Map<String, Map<String, Map<String, CurrentState>>> getCurrentStatesMap() {
return Collections.unmodifiableMap(_currentStateMap);
}
/**
* Return all CurrentState on the given instance.
*
* @param instance
*
* @return
*/
public Map<String, Map<String, CurrentState>> getCurrentStates(String instance) {
if (!_currentStateMap.containsKey(instance)) {
return Collections.emptyMap();
}
return Collections.unmodifiableMap(_currentStateMap.get(instance));
}
/**
* Provides the current state of the node for a given session id, the sessionid can be got from
* LiveInstance
*
* @param instance
* @param clientSessionId
*
* @return
*/
public Map<String, CurrentState> getCurrentState(String instance, String clientSessionId) {
if (!_currentStateMap.containsKey(instance) || !_currentStateMap.get(instance)
.containsKey(clientSessionId)) {
return Collections.emptyMap();
}
return Collections.unmodifiableMap(_currentStateMap.get(instance).get(clientSessionId));
}
@Override
public CurrentStateSnapshot getSnapshot() {
return _snapshot;
}
}