blob: 47b1dfa4caec2238987d7dadf38bdfd63c66ef0a [file] [log] [blame]
package org.apache.helix.store.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.helix.AccessOption;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Property store that does auto fallback to an old location.
* Assuming no concurrent updates
*/
public class AutoFallbackPropertyStore<T> extends ZkHelixPropertyStore<T> {
private static Logger LOG = LoggerFactory.getLogger(AutoFallbackPropertyStore.class);
private final ZkHelixPropertyStore<T> _fallbackStore;
public AutoFallbackPropertyStore(ZkBaseDataAccessor<T> accessor, String root, String fallbackRoot) {
super(accessor, root, null);
if (accessor.exists(fallbackRoot, 0)) {
_fallbackStore = new ZkHelixPropertyStore<T>(accessor, fallbackRoot, null);
} else {
LOG.info("fallbackRoot: " + fallbackRoot
+ " doesn't exist, skip creating fallback property store");
_fallbackStore = null;
}
}
@Override
public boolean update(String path, DataUpdater<T> updater, int options) {
if (_fallbackStore == null) {
return super.update(path, updater, options);
} else {
Stat stat = super.getStat(path, options);
if (stat == null) {
// create znode at new location with fallback-value
T fallbackValue = _fallbackStore.get(path, null, options);
boolean succeed = super.create(path, fallbackValue, AccessOption.PERSISTENT);
if (!succeed) {
LOG.error("Can't update " + path + " since there are concurrent updates");
return false;
}
}
return super.update(path, updater, options);
}
}
@Override
public boolean exists(String path, int options) {
if (_fallbackStore == null) {
return super.exists(path, options);
} else {
boolean exist = super.exists(path, options);
if (!exist) {
exist = _fallbackStore.exists(path, options);
}
return exist;
}
}
@Override
public boolean remove(String path, int options) {
if (_fallbackStore != null) {
_fallbackStore.remove(path, options);
}
return super.remove(path, options);
}
@Override
public T get(String path, Stat stat, int options) {
if (_fallbackStore == null) {
return super.get(path, stat, options);
} else {
T value = super.get(path, stat, options);
if (value == null) {
value = _fallbackStore.get(path, stat, options);
}
return value;
}
}
@Override
public Stat getStat(String path, int options) {
if (_fallbackStore == null) {
return super.getStat(path, options);
} else {
Stat stat = super.getStat(path, options);
if (stat == null) {
stat = _fallbackStore.getStat(path, options);
}
return stat;
}
}
@Override
public boolean[] updateChildren(List<String> paths, List<DataUpdater<T>> updaters, int options) {
if (_fallbackStore == null) {
return super.updateChildren(paths, updaters, options);
} else {
Stat[] stats = super.getStats(paths, options);
Map<String, Integer> fallbackMap = new HashMap<String, Integer>();
Map<String, Integer> updateMap = new HashMap<String, Integer>();
for (int i = 0; i < paths.size(); i++) {
String path = paths.get(i);
if (stats[i] == null) {
fallbackMap.put(path, i);
} else {
updateMap.put(path, i);
}
}
if (fallbackMap.size() > 0) {
List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet());
List<T> fallbackValues = _fallbackStore.get(fallbackPaths, null, options);
boolean createSucceed[] =
super.createChildren(fallbackPaths, fallbackValues, AccessOption.PERSISTENT);
for (int i = 0; i < fallbackPaths.size(); i++) {
String fallbackPath = fallbackPaths.get(i);
if (createSucceed[i]) {
updateMap.put(fallbackPath, fallbackMap.get(fallbackPath));
} else {
LOG.error("Can't update " + fallbackPath + " since there are concurrent updates");
}
}
}
boolean succeed[] = new boolean[paths.size()]; // all init'ed to false
if (updateMap.size() > 0) {
List<String> updatePaths = new ArrayList<String>(updateMap.keySet());
List<DataUpdater<T>> subUpdaters = new ArrayList<DataUpdater<T>>();
for (int i = 0; i < updatePaths.size(); i++) {
String updatePath = updatePaths.get(i);
subUpdaters.add(updaters.get(updateMap.get(updatePath)));
}
boolean updateSucceed[] = super.updateChildren(updatePaths, subUpdaters, options);
for (int i = 0; i < updatePaths.size(); i++) {
String updatePath = updatePaths.get(i);
if (updateSucceed[i]) {
succeed[updateMap.get(updatePath)] = true;
}
}
}
return succeed;
}
}
@Override
public boolean[] exists(List<String> paths, int options) {
if (_fallbackStore == null) {
return super.exists(paths, options);
} else {
boolean[] exists = super.exists(paths, options);
Map<String, Integer> fallbackMap = new HashMap<String, Integer>();
for (int i = 0; i < paths.size(); i++) {
boolean exist = exists[i];
if (!exist) {
fallbackMap.put(paths.get(i), i);
}
}
if (fallbackMap.size() > 0) {
List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet());
boolean[] fallbackExists = _fallbackStore.exists(fallbackPaths, options);
for (int i = 0; i < fallbackPaths.size(); i++) {
String fallbackPath = fallbackPaths.get(i);
int j = fallbackMap.get(fallbackPath);
exists[j] = fallbackExists[i];
}
}
return exists;
}
}
@Override
public boolean[] remove(List<String> paths, int options) {
if (_fallbackStore != null) {
_fallbackStore.remove(paths, options);
}
return super.remove(paths, options);
}
@Override
public List<T> get(List<String> paths, List<Stat> stats, int options) {
if (_fallbackStore == null) {
return super.get(paths, stats, options);
} else {
List<T> values = super.get(paths, stats, options);
Map<String, Integer> fallbackMap = new HashMap<String, Integer>();
for (int i = 0; i < paths.size(); i++) {
T value = values.get(i);
if (value == null) {
fallbackMap.put(paths.get(i), i);
}
}
if (fallbackMap.size() > 0) {
List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet());
List<Stat> fallbackStats = new ArrayList<Stat>();
List<T> fallbackValues = _fallbackStore.get(fallbackPaths, fallbackStats, options);
for (int i = 0; i < fallbackPaths.size(); i++) {
String fallbackPath = fallbackPaths.get(i);
int j = fallbackMap.get(fallbackPath);
values.set(j, fallbackValues.get(i));
if (stats != null) {
stats.set(j, fallbackStats.get(i));
}
}
}
return values;
}
}
@Override
public Stat[] getStats(List<String> paths, int options) {
if (_fallbackStore == null) {
return super.getStats(paths, options);
} else {
Stat[] stats = super.getStats(paths, options);
Map<String, Integer> fallbackMap = new HashMap<String, Integer>();
for (int i = 0; i < paths.size(); i++) {
Stat stat = stats[i];
if (stat == null) {
fallbackMap.put(paths.get(i), i);
}
}
if (fallbackMap.size() > 0) {
List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet());
Stat[] fallbackStats = _fallbackStore.getStats(fallbackPaths, options);
for (int i = 0; i < fallbackPaths.size(); i++) {
String fallbackPath = fallbackPaths.get(i);
int j = fallbackMap.get(fallbackPath);
stats[j] = fallbackStats[i];
}
}
return stats;
}
}
@Override
public List<String> getChildNames(String parentPath, int options) {
if (_fallbackStore == null) {
return super.getChildNames(parentPath, options);
} else {
List<String> childs = super.getChildNames(parentPath, options);
List<String> fallbackChilds = _fallbackStore.getChildNames(parentPath, options);
if (childs == null && fallbackChilds == null) {
return null;
}
// merge two child lists
Set<String> allChildSet = new HashSet<String>();
if (childs != null) {
allChildSet.addAll(childs);
}
if (fallbackChilds != null) {
allChildSet.addAll(fallbackChilds);
}
List<String> allChilds = new ArrayList<String>(allChildSet);
return allChilds;
}
}
@Override
public void start() {
if (_fallbackStore != null) {
_fallbackStore.start();
}
}
@Override
public void stop() {
if (_fallbackStore != null) {
_fallbackStore.stop();
}
super.stop();
}
@Override
public void reset() {
if (_fallbackStore != null) {
_fallbackStore.reset();
}
super.reset();
}
}