| package org.apache.helix.manager.zk; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import org.I0Itec.zkclient.DataUpdater; |
| import org.I0Itec.zkclient.IZkChildListener; |
| import org.I0Itec.zkclient.IZkDataListener; |
| import org.I0Itec.zkclient.exception.ZkBadVersionException; |
| import org.I0Itec.zkclient.exception.ZkException; |
| import org.I0Itec.zkclient.exception.ZkNoNodeException; |
| import org.I0Itec.zkclient.exception.ZkNodeExistsException; |
| import org.apache.helix.AccessOption; |
| import org.apache.helix.BaseDataAccessor; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.api.exceptions.HelixMetaDataAccessException; |
| import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler; |
| import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler; |
| import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler; |
| import org.apache.helix.manager.zk.ZkAsyncCallbacks.GetDataCallbackHandler; |
| import org.apache.helix.manager.zk.ZkAsyncCallbacks.SetDataCallbackHandler; |
| import org.apache.helix.manager.zk.client.HelixZkClient; |
| import org.apache.helix.store.zk.ZNode; |
| import org.apache.helix.util.HelixUtil; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException.Code; |
| import org.apache.zookeeper.data.Stat; |
| import org.apache.zookeeper.server.DataTree; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { |
| enum RetCode { |
| OK, |
| NODE_EXISTS, |
| ERROR |
| } |
| |
| /** |
| * struct holding return information |
| */ |
| public class AccessResult { |
| RetCode _retCode; |
| List<String> _pathCreated; |
| |
| Stat _stat; |
| |
| /** |
| * used by update only |
| */ |
| T _updatedValue; |
| |
| public AccessResult() { |
| _retCode = RetCode.ERROR; |
| _pathCreated = new ArrayList<>(); |
| _stat = new Stat(); |
| _updatedValue = null; |
| } |
| } |
| |
| private static Logger LOG = LoggerFactory.getLogger(ZkBaseDataAccessor.class); |
| |
| private final HelixZkClient _zkClient; |
| |
| public ZkBaseDataAccessor(HelixZkClient zkClient) { |
| if (zkClient == null) { |
| throw new NullPointerException("zkclient is null"); |
| } |
| _zkClient = zkClient; |
| } |
| |
| /** |
| * sync create |
| */ |
| @Override |
| public boolean create(String path, T record, int options) { |
| AccessResult result = doCreate(path, record, options); |
| return result._retCode == RetCode.OK; |
| } |
| |
| /** |
| * sync create |
| */ |
| public AccessResult doCreate(String path, T record, int options) { |
| AccessResult result = new AccessResult(); |
| CreateMode mode = AccessOption.getMode(options); |
| if (mode == null) { |
| LOG.error("Invalid create mode. options: " + options); |
| result._retCode = RetCode.ERROR; |
| return result; |
| } |
| |
| boolean retry; |
| do { |
| retry = false; |
| try { |
| _zkClient.create(path, record, mode); |
| result._pathCreated.add(path); |
| |
| result._retCode = RetCode.OK; |
| return result; |
| } catch (ZkNoNodeException e) { |
| // this will happen if parent node does not exist |
| String parentPath = HelixUtil.getZkParentPath(path); |
| try { |
| AccessResult res = doCreate(parentPath, null, AccessOption.PERSISTENT); |
| result._pathCreated.addAll(res._pathCreated); |
| RetCode rc = res._retCode; |
| if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS) { |
| // if parent node created/exists, retry |
| retry = true; |
| } |
| } catch (Exception e1) { |
| LOG.error("Exception while creating path: " + parentPath, e1); |
| result._retCode = RetCode.ERROR; |
| return result; |
| } |
| } catch (ZkNodeExistsException e) { |
| LOG.warn("Node already exists. path: " + path); |
| result._retCode = RetCode.NODE_EXISTS; |
| return result; |
| } catch (Exception e) { |
| LOG.error("Exception while creating path: " + path, e); |
| result._retCode = RetCode.ERROR; |
| return result; |
| } |
| } while (retry); |
| |
| result._retCode = RetCode.OK; |
| return result; |
| } |
| |
| /** |
| * sync set |
| */ |
| @Override |
| public boolean set(String path, T record, int options) { |
| return set(path, record, -1, options); |
| } |
| |
| /** |
| * sync set |
| */ |
| @Override |
| public boolean set(String path, T record, int expectVersion, int options) { |
| AccessResult result = doSet(path, record, expectVersion, options); |
| return result._retCode == RetCode.OK; |
| } |
| |
| /** |
| * sync set |
| */ |
| public AccessResult doSet(String path, T record, int expectVersion, int options) { |
| AccessResult result = new AccessResult(); |
| |
| CreateMode mode = AccessOption.getMode(options); |
| if (mode == null) { |
| LOG.error("Invalid set mode. options: " + options); |
| result._retCode = RetCode.ERROR; |
| return result; |
| } |
| |
| boolean retry; |
| do { |
| retry = false; |
| try { |
| Stat stat = _zkClient.writeDataGetStat(path, record, expectVersion); |
| DataTree.copyStat(stat, result._stat); |
| } catch (ZkNoNodeException e) { |
| // node not exists, try create if expectedVersion == -1; in this case, stat will not be set |
| if (expectVersion != -1) { |
| LOG.error("Could not create node if expectVersion != -1, was " + expectVersion); |
| result._retCode = RetCode.ERROR; |
| return result; |
| } |
| try { |
| // may create recursively |
| AccessResult res = doCreate(path, record, options); |
| result._pathCreated.addAll(res._pathCreated); |
| RetCode rc = res._retCode; |
| switch (rc) { |
| case OK: |
| // not set stat if node is created (instead of set) |
| break; |
| case NODE_EXISTS: |
| retry = true; |
| break; |
| default: |
| LOG.error("Fail to set path by creating: " + path); |
| result._retCode = RetCode.ERROR; |
| return result; |
| } |
| } catch (Exception e1) { |
| LOG.error("Exception while setting path by creating: " + path, e); |
| result._retCode = RetCode.ERROR; |
| return result; |
| } |
| } catch (ZkBadVersionException e) { |
| LOG.debug("Exception while setting path: " + path, e); |
| throw e; |
| } catch (Exception e) { |
| LOG.error("Exception while setting path: " + path, e); |
| result._retCode = RetCode.ERROR; |
| return result; |
| } |
| } while (retry); |
| |
| result._retCode = RetCode.OK; |
| return result; |
| } |
| |
| /** |
| * sync update |
| */ |
| @Override |
| public boolean update(String path, DataUpdater<T> updater, int options) { |
| AccessResult result = doUpdate(path, updater, options); |
| return result._retCode == RetCode.OK; |
| } |
| |
| /** |
| * sync update |
| */ |
| public AccessResult doUpdate(String path, DataUpdater<T> updater, int options) { |
| AccessResult result = new AccessResult(); |
| CreateMode mode = AccessOption.getMode(options); |
| if (mode == null) { |
| LOG.error("Invalid update mode. options: " + options); |
| result._retCode = RetCode.ERROR; |
| return result; |
| } |
| |
| boolean retry; |
| T updatedData = null; |
| do { |
| retry = false; |
| try { |
| Stat readStat = new Stat(); |
| T oldData = (T) _zkClient.readData(path, readStat); |
| T newData = updater.update(oldData); |
| if (newData != null) { |
| Stat setStat = _zkClient.writeDataGetStat(path, newData, readStat.getVersion()); |
| DataTree.copyStat(setStat, result._stat); |
| } |
| updatedData = newData; |
| } catch (ZkBadVersionException e) { |
| retry = true; |
| } catch (ZkNoNodeException e) { |
| // node not exist, try create, pass null to updater |
| try { |
| T newData = updater.update(null); |
| RetCode rc; |
| if (newData != null) { |
| AccessResult res = doCreate(path, newData, options); |
| result._pathCreated.addAll(res._pathCreated); |
| rc = res._retCode; |
| } else { |
| // If update returns null, no need to create. |
| rc = RetCode.OK; |
| } |
| switch (rc) { |
| case OK: |
| updatedData = newData; |
| break; |
| case NODE_EXISTS: |
| retry = true; |
| break; |
| default: |
| LOG.error("Fail to update path by creating: " + path); |
| result._retCode = RetCode.ERROR; |
| return result; |
| } |
| } catch (Exception e1) { |
| LOG.error("Exception while updating path by creating: " + path, e1); |
| result._retCode = RetCode.ERROR; |
| return result; |
| } |
| } catch (Exception e) { |
| LOG.error("Exception while updating path: " + path, e); |
| result._retCode = RetCode.ERROR; |
| return result; |
| } |
| } while (retry); |
| |
| result._retCode = RetCode.OK; |
| result._updatedValue = updatedData; |
| return result; |
| } |
| |
| /** |
| * sync get |
| */ |
| @Override |
| public T get(String path, Stat stat, int options) { |
| T data = null; |
| try { |
| data = (T) _zkClient.readData(path, stat); |
| } catch (ZkNoNodeException e) { |
| if (AccessOption.isThrowExceptionIfNotExist(options)) { |
| throw e; |
| } |
| } |
| return data; |
| } |
| |
| /** |
| * async get |
| */ |
| @Override |
| public List<T> get(List<String> paths, List<Stat> stats, int options) { |
| boolean[] needRead = new boolean[paths.size()]; |
| Arrays.fill(needRead, true); |
| |
| return get(paths, stats, needRead, false); |
| } |
| |
| |
| @Override |
| public List<T> get(List<String> paths, List<Stat> stats, int options, |
| boolean throwException) throws HelixException { |
| boolean[] needRead = new boolean[paths.size()]; |
| Arrays.fill(needRead, true); |
| |
| return get(paths, stats, needRead, throwException); |
| } |
| |
| /** |
| * async get |
| */ |
| List<T> get(List<String> paths, List<Stat> stats, boolean[] needRead, boolean throwException) |
| throws HelixException { |
| if (paths == null || paths.size() == 0) { |
| return Collections.emptyList(); |
| } |
| |
| // init stats |
| if (stats != null) { |
| stats.clear(); |
| stats.addAll(Collections.<Stat> nCopies(paths.size(), null)); |
| } |
| |
| long startT = System.nanoTime(); |
| |
| try { |
| // issue asyn get requests |
| GetDataCallbackHandler[] cbList = new GetDataCallbackHandler[paths.size()]; |
| for (int i = 0; i < paths.size(); i++) { |
| if (!needRead[i]) |
| continue; |
| |
| String path = paths.get(i); |
| cbList[i] = new GetDataCallbackHandler(); |
| _zkClient.asyncGetData(path, cbList[i]); |
| } |
| |
| // wait for completion |
| for (int i = 0; i < cbList.length; i++) { |
| if (!needRead[i]) |
| continue; |
| |
| GetDataCallbackHandler cb = cbList[i]; |
| cb.waitForSuccess(); |
| } |
| |
| // construct return results |
| List<T> records = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null)); |
| Map<String, Integer> pathFailToRead = new HashMap<>(); |
| for (int i = 0; i < paths.size(); i++) { |
| if (!needRead[i]) |
| continue; |
| |
| GetDataCallbackHandler cb = cbList[i]; |
| if (Code.get(cb.getRc()) == Code.OK) { |
| @SuppressWarnings("unchecked") |
| T record = (T) _zkClient.deserialize(cb._data, paths.get(i)); |
| records.set(i, record); |
| if (stats != null) { |
| stats.set(i, cb._stat); |
| } |
| } else if (Code.get(cb.getRc()) != Code.NONODE && throwException) { |
| throw new HelixMetaDataAccessException(String.format("Failed to read node %s", paths.get(i))); |
| } else { |
| pathFailToRead.put(paths.get(i), cb.getRc()); |
| } |
| } |
| if (pathFailToRead.size() > 0) { |
| LOG.warn("Fail to read record for paths: " + pathFailToRead); |
| } |
| return records; |
| } catch (Exception e) { |
| throw new HelixMetaDataAccessException(String.format("Fail to read nodes for %s", paths)); |
| } finally { |
| long endT = System.nanoTime(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("getData_async, size: " + paths.size() + ", paths: " + paths.get(0) |
| + ",... time: " + (endT - startT) + " ns"); |
| } |
| } |
| } |
| |
| /** |
| * asyn getChildren |
| * The retryCount and retryInterval will be ignored. |
| */ |
| // TODO: Change the behavior of getChildren when Helix starts migrating API. |
| @Override |
| public List<T> getChildren(String parentPath, List<Stat> stats, int options) { |
| return getChildren(parentPath, stats, options, false); |
| } |
| |
| @Override |
| public List<T> getChildren(String parentPath, List<Stat> stats, int options, int retryCount, |
| int retryInterval) throws HelixException { |
| int readCount = retryCount + 1; |
| while (readCount > 0) { |
| try { |
| readCount--; |
| List<T> records = getChildren(parentPath, stats, options, true); |
| return records; |
| } catch (HelixMetaDataAccessException e) { |
| if (readCount == 0) { |
| throw new HelixMetaDataAccessException(String.format("Failed to get full list of %s", parentPath), e); |
| } |
| try { |
| Thread.sleep(retryInterval); |
| } catch (InterruptedException interruptedException) { |
| throw new HelixMetaDataAccessException("Fail to interrupt the sleep", interruptedException); |
| } |
| } |
| } |
| |
| // Impossible to reach end |
| return null; |
| } |
| |
| private List<T> getChildren(String parentPath, List<Stat> stats, int options, |
| boolean throwException) { |
| try { |
| // prepare child paths |
| List<String> childNames = getChildNames(parentPath, options); |
| if (childNames == null || childNames.size() == 0) { |
| return Collections.emptyList(); |
| } |
| |
| List<String> paths = new ArrayList<>(); |
| for (String childName : childNames) { |
| String path = parentPath + "/" + childName; |
| paths.add(path); |
| } |
| |
| // remove null record |
| List<Stat> curStats = new ArrayList<>(paths.size()); |
| boolean[] needRead = new boolean[paths.size()]; |
| Arrays.fill(needRead, true); |
| List<T> records = get(paths, curStats, needRead, throwException); |
| Iterator<T> recordIter = records.iterator(); |
| Iterator<Stat> statIter = curStats.iterator(); |
| while (statIter.hasNext()) { |
| recordIter.next(); |
| if (statIter.next() == null) { |
| statIter.remove(); |
| recordIter.remove(); |
| } |
| } |
| |
| if (stats != null) { |
| stats.clear(); |
| stats.addAll(curStats); |
| } |
| |
| return records; |
| } catch (ZkNoNodeException e) { |
| return Collections.emptyList(); |
| } |
| } |
| |
| /** |
| * sync getChildNames |
| * @return null if parentPath doesn't exist |
| */ |
| @Override |
| public List<String> getChildNames(String parentPath, int options) { |
| try { |
| List<String> childNames = _zkClient.getChildren(parentPath); |
| Collections.sort(childNames); |
| return childNames; |
| } catch (ZkNoNodeException e) { |
| return null; |
| } |
| } |
| |
| /** |
| * sync exists |
| */ |
| @Override |
| public boolean exists(String path, int options) { |
| return _zkClient.exists(path); |
| } |
| |
| /** |
| * sync getStat |
| */ |
| @Override |
| public Stat getStat(String path, int options) { |
| return _zkClient.getStat(path); |
| } |
| |
| /** |
| * Sync remove. it tries to remove the ZNode and all its descendants if any, node does not exist |
| * is regarded as success |
| */ |
| @Override |
| public boolean remove(String path, int options) { |
| try { |
| // operation will not throw exception when path successfully deleted or does not exist |
| // despite real error, operation will throw exception when path not empty, and in this |
| // case, we try to delete recursively |
| _zkClient.delete(path); |
| } catch (ZkException e) { |
| LOG.debug("Failed to delete {} with opts {}, err: {}. Try recursive delete", path, options, |
| e.getMessage()); |
| try { |
| _zkClient.deleteRecursively(path); |
| } catch (HelixException he) { |
| LOG.error("Failed to delete {} recursively with opts {}.", path, options, he); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * async create. give up on error other than NONODE |
| */ |
| CreateCallbackHandler[] create(List<String> paths, List<T> records, boolean[] needCreate, |
| List<List<String>> pathsCreated, int options) { |
| if ((records != null && records.size() != paths.size()) || needCreate.length != paths.size() |
| || (pathsCreated != null && pathsCreated.size() != paths.size())) { |
| throw new IllegalArgumentException( |
| "paths, records, needCreate, and pathsCreated should be of same size"); |
| } |
| |
| CreateCallbackHandler[] cbList = new CreateCallbackHandler[paths.size()]; |
| |
| CreateMode mode = AccessOption.getMode(options); |
| if (mode == null) { |
| LOG.error("Invalid async set mode. options: " + options); |
| return cbList; |
| } |
| |
| boolean retry; |
| do { |
| retry = false; |
| |
| for (int i = 0; i < paths.size(); i++) { |
| if (!needCreate[i]) |
| continue; |
| |
| String path = paths.get(i); |
| T record = records == null ? null : records.get(i); |
| cbList[i] = new CreateCallbackHandler(); |
| _zkClient.asyncCreate(path, record, mode, cbList[i]); |
| } |
| |
| List<String> parentPaths = |
| new ArrayList<>(Collections.<String>nCopies(paths.size(), null)); |
| boolean failOnNoNode = false; |
| |
| for (int i = 0; i < paths.size(); i++) { |
| if (!needCreate[i]) |
| continue; |
| |
| CreateCallbackHandler cb = cbList[i]; |
| cb.waitForSuccess(); |
| String path = paths.get(i); |
| |
| if (Code.get(cb.getRc()) == Code.NONODE) { |
| String parentPath = HelixUtil.getZkParentPath(path); |
| parentPaths.set(i, parentPath); |
| failOnNoNode = true; |
| } else { |
| // if create succeed or fail on error other than NONODE, |
| // give up |
| needCreate[i] = false; |
| |
| // if succeeds, record what paths we've created |
| if (Code.get(cb.getRc()) == Code.OK && pathsCreated != null) { |
| if (pathsCreated.get(i) == null) { |
| pathsCreated.set(i, new ArrayList<String>()); |
| } |
| pathsCreated.get(i).add(path); |
| } |
| } |
| } |
| |
| if (failOnNoNode) { |
| boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length); |
| |
| CreateCallbackHandler[] parentCbList = |
| create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT); |
| for (int i = 0; i < parentCbList.length; i++) { |
| CreateCallbackHandler parentCb = parentCbList[i]; |
| if (parentCb == null) |
| continue; |
| |
| Code rc = Code.get(parentCb.getRc()); |
| |
| // if parent is created, retry create child |
| if (rc == Code.OK || rc == Code.NODEEXISTS) { |
| retry = true; |
| break; |
| } |
| } |
| } |
| } while (retry); |
| |
| return cbList; |
| } |
| |
| /** |
| * async create |
| * TODO: rename to create |
| */ |
| @Override |
| public boolean[] createChildren(List<String> paths, List<T> records, int options) { |
| boolean[] success = new boolean[paths.size()]; |
| |
| CreateMode mode = AccessOption.getMode(options); |
| if (mode == null) { |
| LOG.error("Invalid async create mode. options: " + options); |
| return success; |
| } |
| |
| boolean[] needCreate = new boolean[paths.size()]; |
| Arrays.fill(needCreate, true); |
| List<List<String>> pathsCreated = |
| new ArrayList<>(Collections.<List<String>>nCopies(paths.size(), null)); |
| |
| long startT = System.nanoTime(); |
| try { |
| |
| CreateCallbackHandler[] cbList = create(paths, records, needCreate, pathsCreated, options); |
| |
| for (int i = 0; i < cbList.length; i++) { |
| CreateCallbackHandler cb = cbList[i]; |
| success[i] = (Code.get(cb.getRc()) == Code.OK); |
| } |
| |
| return success; |
| |
| } finally { |
| long endT = System.nanoTime(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("create_async, size: " + paths.size() + ", paths: " + paths.get(0) |
| + ",... time: " + (endT - startT) + " ns"); |
| } |
| } |
| } |
| |
| /** |
| * async set |
| * TODO: rename to set |
| */ |
| @Override |
| public boolean[] setChildren(List<String> paths, List<T> records, int options) { |
| return set(paths, records, null, null, options); |
| } |
| |
| /** |
| * async set, give up on error other than NoNode |
| */ |
| boolean[] set(List<String> paths, List<T> records, List<List<String>> pathsCreated, |
| List<Stat> stats, int options) { |
| if (paths == null || paths.size() == 0) { |
| return new boolean[0]; |
| } |
| |
| if ((records != null && records.size() != paths.size()) |
| || (pathsCreated != null && pathsCreated.size() != paths.size())) { |
| throw new IllegalArgumentException("paths, records, and pathsCreated should be of same size"); |
| } |
| |
| boolean[] success = new boolean[paths.size()]; |
| |
| CreateMode mode = AccessOption.getMode(options); |
| if (mode == null) { |
| LOG.error("Invalid async set mode. options: " + options); |
| return success; |
| } |
| |
| List<Stat> setStats = new ArrayList<>(Collections.<Stat>nCopies(paths.size(), null)); |
| SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()]; |
| CreateCallbackHandler[] createCbList = null; |
| boolean[] needSet = new boolean[paths.size()]; |
| Arrays.fill(needSet, true); |
| |
| long startT = System.nanoTime(); |
| |
| try { |
| boolean retry; |
| do { |
| retry = false; |
| |
| for (int i = 0; i < paths.size(); i++) { |
| if (!needSet[i]) |
| continue; |
| |
| String path = paths.get(i); |
| T record = records.get(i); |
| cbList[i] = new SetDataCallbackHandler(); |
| _zkClient.asyncSetData(path, record, -1, cbList[i]); |
| |
| } |
| |
| boolean failOnNoNode = false; |
| |
| for (int i = 0; i < cbList.length; i++) { |
| SetDataCallbackHandler cb = cbList[i]; |
| cb.waitForSuccess(); |
| Code rc = Code.get(cb.getRc()); |
| switch (rc) { |
| case OK: |
| setStats.set(i, cb.getStat()); |
| needSet[i] = false; |
| break; |
| case NONODE: |
| // if fail on NoNode, try create the node |
| failOnNoNode = true; |
| break; |
| default: |
| // if fail on error other than NoNode, give up |
| needSet[i] = false; |
| break; |
| } |
| } |
| |
| // if failOnNoNode, try create |
| if (failOnNoNode) { |
| boolean[] needCreate = Arrays.copyOf(needSet, needSet.length); |
| createCbList = create(paths, records, needCreate, pathsCreated, options); |
| for (int i = 0; i < createCbList.length; i++) { |
| CreateCallbackHandler createCb = createCbList[i]; |
| if (createCb == null) { |
| continue; |
| } |
| |
| Code rc = Code.get(createCb.getRc()); |
| switch (rc) { |
| case OK: |
| setStats.set(i, ZNode.ZERO_STAT); |
| needSet[i] = false; |
| break; |
| case NODEEXISTS: |
| retry = true; |
| break; |
| default: |
| // if creation fails on error other than NodeExists |
| // no need to retry set |
| needSet[i] = false; |
| break; |
| } |
| } |
| } |
| } while (retry); |
| |
| // construct return results |
| for (int i = 0; i < cbList.length; i++) { |
| SetDataCallbackHandler cb = cbList[i]; |
| |
| Code rc = Code.get(cb.getRc()); |
| if (rc == Code.OK) { |
| success[i] = true; |
| } else if (rc == Code.NONODE) { |
| CreateCallbackHandler createCb = createCbList[i]; |
| if (Code.get(createCb.getRc()) == Code.OK) { |
| success[i] = true; |
| } |
| } |
| } |
| |
| if (stats != null) { |
| stats.clear(); |
| stats.addAll(setStats); |
| } |
| |
| return success; |
| } finally { |
| long endT = System.nanoTime(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0) |
| + ",... time: " + (endT - startT) + " ns"); |
| } |
| } |
| } |
| |
| // TODO: rename to update |
| /** |
| * async update |
| */ |
| @Override |
| public boolean[] updateChildren(List<String> paths, List<DataUpdater<T>> updaters, int options) { |
| |
| List<T> updateData = update(paths, updaters, null, null, options); |
| boolean[] success = new boolean[paths.size()]; // init to false |
| for (int i = 0; i < paths.size(); i++) { |
| T data = updateData.get(i); |
| success[i] = (data != null); |
| } |
| return success; |
| } |
| |
| /** |
| * async update |
| * return: updatedData on success or null on fail |
| */ |
| List<T> update(List<String> paths, List<DataUpdater<T>> updaters, |
| List<List<String>> pathsCreated, List<Stat> stats, int options) { |
| if (paths == null || paths.size() == 0) { |
| LOG.error("paths is null or empty"); |
| return Collections.emptyList(); |
| } |
| |
| if (updaters.size() != paths.size() |
| || (pathsCreated != null && pathsCreated.size() != paths.size())) { |
| throw new IllegalArgumentException("paths, updaters, and pathsCreated should be of same size"); |
| } |
| |
| List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null)); |
| List<T> updateData = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null)); |
| |
| CreateMode mode = AccessOption.getMode(options); |
| if (mode == null) { |
| LOG.error("Invalid update mode. options: " + options); |
| return updateData; |
| } |
| |
| SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()]; |
| CreateCallbackHandler[] createCbList = null; |
| boolean[] needUpdate = new boolean[paths.size()]; |
| Arrays.fill(needUpdate, true); |
| |
| long startT = System.nanoTime(); |
| |
| try { |
| boolean retry; |
| do { |
| retry = false; |
| boolean[] needCreate = new boolean[paths.size()]; // init'ed with false |
| boolean failOnNoNode = false; |
| |
| // asycn read all data |
| List<Stat> curStats = new ArrayList<Stat>(); |
| List<T> curDataList = |
| get(paths, curStats, Arrays.copyOf(needUpdate, needUpdate.length), false); |
| |
| // async update |
| List<T> newDataList = new ArrayList<T>(); |
| for (int i = 0; i < paths.size(); i++) { |
| if (!needUpdate[i]) { |
| newDataList.add(null); |
| continue; |
| } |
| String path = paths.get(i); |
| DataUpdater<T> updater = updaters.get(i); |
| T newData = updater.update(curDataList.get(i)); |
| newDataList.add(newData); |
| if (newData == null) { |
| // No need to create or update if the updater does not return a new version |
| continue; |
| } |
| Stat curStat = curStats.get(i); |
| if (curStat == null) { |
| // node not exists |
| failOnNoNode = true; |
| needCreate[i] = true; |
| } else { |
| cbList[i] = new SetDataCallbackHandler(); |
| _zkClient.asyncSetData(path, newData, curStat.getVersion(), cbList[i]); |
| } |
| } |
| |
| // wait for completion |
| boolean failOnBadVersion = false; |
| |
| for (int i = 0; i < paths.size(); i++) { |
| SetDataCallbackHandler cb = cbList[i]; |
| if (cb == null) |
| continue; |
| |
| cb.waitForSuccess(); |
| |
| switch (Code.get(cb.getRc())) { |
| case OK: |
| updateData.set(i, newDataList.get(i)); |
| setStats.set(i, cb.getStat()); |
| needUpdate[i] = false; |
| break; |
| case NONODE: |
| failOnNoNode = true; |
| needCreate[i] = true; |
| break; |
| case BADVERSION: |
| failOnBadVersion = true; |
| break; |
| default: |
| // if fail on error other than NoNode or BadVersion |
| // will not retry |
| needUpdate[i] = false; |
| break; |
| } |
| } |
| |
| // if failOnNoNode, try create |
| if (failOnNoNode) { |
| createCbList = create(paths, newDataList, needCreate, pathsCreated, options); |
| for (int i = 0; i < paths.size(); i++) { |
| CreateCallbackHandler createCb = createCbList[i]; |
| if (createCb == null) { |
| continue; |
| } |
| |
| switch (Code.get(createCb.getRc())) { |
| case OK: |
| needUpdate[i] = false; |
| updateData.set(i, newDataList.get(i)); |
| setStats.set(i, ZNode.ZERO_STAT); |
| break; |
| case NODEEXISTS: |
| retry = true; |
| break; |
| default: |
| // if fail on error other than NodeExists |
| // will not retry |
| needUpdate[i] = false; |
| break; |
| } |
| } |
| } |
| |
| // if failOnBadVersion, retry |
| if (failOnBadVersion) { |
| retry = true; |
| } |
| } while (retry); |
| |
| if (stats != null) { |
| stats.clear(); |
| stats.addAll(setStats); |
| } |
| |
| return updateData; |
| } finally { |
| long endT = System.nanoTime(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0) |
| + ",... time: " + (endT - startT) + " ns"); |
| } |
| } |
| |
| } |
| |
| /** |
| * async exists |
| */ |
| @Override |
| public boolean[] exists(List<String> paths, int options) { |
| Stat[] stats = getStats(paths, options); |
| |
| boolean[] exists = new boolean[paths.size()]; |
| for (int i = 0; i < paths.size(); i++) { |
| exists[i] = (stats[i] != null); |
| } |
| |
| return exists; |
| } |
| |
| /** |
| * async getStat |
| */ |
| @Override |
| public Stat[] getStats(List<String> paths, int options) { |
| if (paths == null || paths.size() == 0) { |
| LOG.error("paths is null or empty"); |
| return new Stat[0]; |
| } |
| |
| Stat[] stats = new Stat[paths.size()]; |
| |
| long startT = System.nanoTime(); |
| |
| try { |
| ExistsCallbackHandler[] cbList = new ExistsCallbackHandler[paths.size()]; |
| for (int i = 0; i < paths.size(); i++) { |
| String path = paths.get(i); |
| cbList[i] = new ExistsCallbackHandler(); |
| _zkClient.asyncExists(path, cbList[i]); |
| } |
| |
| for (int i = 0; i < cbList.length; i++) { |
| ExistsCallbackHandler cb = cbList[i]; |
| cb.waitForSuccess(); |
| stats[i] = cb._stat; |
| } |
| |
| return stats; |
| } finally { |
| long endT = System.nanoTime(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("exists_async, size: " + paths.size() + ", paths: " + paths.get(0) |
| + ",... time: " + (endT - startT) + " ns"); |
| } |
| } |
| } |
| |
| /** |
| * async remove |
| */ |
| @Override |
| public boolean[] remove(List<String> paths, int options) { |
| if (paths == null || paths.size() == 0) { |
| return new boolean[0]; |
| } |
| |
| boolean[] success = new boolean[paths.size()]; |
| |
| DeleteCallbackHandler[] cbList = new DeleteCallbackHandler[paths.size()]; |
| |
| long startT = System.nanoTime(); |
| |
| try { |
| for (int i = 0; i < paths.size(); i++) { |
| String path = paths.get(i); |
| cbList[i] = new DeleteCallbackHandler(); |
| _zkClient.asyncDelete(path, cbList[i]); |
| } |
| |
| for (int i = 0; i < cbList.length; i++) { |
| DeleteCallbackHandler cb = cbList[i]; |
| cb.waitForSuccess(); |
| success[i] = (cb.getRc() == 0); |
| } |
| |
| return success; |
| } finally { |
| long endT = System.nanoTime(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("delete_async, size: " + paths.size() + ", paths: " + paths.get(0) |
| + ",... time: " + (endT - startT) + " ns"); |
| } |
| } |
| } |
| |
| /** |
| * Subscribe to zookeeper data changes |
| */ |
| @Override |
| public void subscribeDataChanges(String path, IZkDataListener listener) { |
| _zkClient.subscribeDataChanges(path, listener); |
| } |
| |
| /** |
| * Unsubscribe to zookeeper data changes |
| */ |
| @Override |
| public void unsubscribeDataChanges(String path, IZkDataListener dataListener) { |
| _zkClient.unsubscribeDataChanges(path, dataListener); |
| } |
| |
| /** |
| * Subscrie to zookeeper data changes |
| */ |
| @Override |
| public List<String> subscribeChildChanges(String path, IZkChildListener listener) { |
| return _zkClient.subscribeChildChanges(path, listener); |
| } |
| |
| /** |
| * Unsubscrie to zookeeper data changes |
| */ |
| @Override |
| public void unsubscribeChildChanges(String path, IZkChildListener childListener) { |
| _zkClient.unsubscribeChildChanges(path, childListener); |
| } |
| |
| /** |
| * Reset |
| */ |
| @Override |
| public void reset() { |
| // Nothing to do |
| } |
| } |