| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.accumulo.fate.zookeeper; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static java.util.Objects.requireNonNull; |
| |
| import java.util.List; |
| |
| import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; |
| import org.apache.accumulo.core.conf.AccumuloConfiguration; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; |
| import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.Code; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Stat; |
| |
| public class ZooReaderWriter extends ZooReader { |
| public interface Mutator { |
| byte[] mutate(byte[] currentValue) throws AcceptableThriftTableOperationException; |
| } |
| |
| public ZooReaderWriter(AccumuloConfiguration conf) { |
| this(conf.get(Property.INSTANCE_ZK_HOST), |
| (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), |
| conf.get(Property.INSTANCE_SECRET)); |
| } |
| |
| private final byte[] auth; |
| |
| public ZooReaderWriter(String keepers, int timeoutInMillis, String secret) { |
| super(keepers, timeoutInMillis); |
| this.auth = ("accumulo" + ":" + secret).getBytes(UTF_8); |
| } |
| |
| @Override |
| public ZooKeeper getZooKeeper() { |
| return ZooSession.getAuthenticatedSession(keepers, timeout, "digest", auth); |
| } |
| |
| /** |
| * Retrieve the ACL list that was on the node |
| */ |
| public List<ACL> getACL(String zPath) throws KeeperException, InterruptedException { |
| return retryLoop(zk -> zk.getACL(zPath, null)); |
| } |
| |
| /** |
| * Create a persistent node with the default ACL |
| * |
| * @return true if the data was set on a new node or overwritten, and false if an existing node |
| * was skipped |
| */ |
| public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) |
| throws KeeperException, InterruptedException { |
| return putPersistentData(zPath, data, policy, ZooUtil.PUBLIC); |
| } |
| |
| /** |
| * Create a persistent node with the private ACL |
| * |
| * @return true if the data was set on a new node or overwritten, and false if an existing node |
| * was skipped |
| */ |
| public boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) |
| throws KeeperException, InterruptedException { |
| return putPersistentData(zPath, data, policy, ZooUtil.PRIVATE); |
| } |
| |
| /** |
| * Create a persistent node with the provided ACLs |
| * |
| * @return true if the data was set on a new node or overwritten, and false if an existing node |
| * was skipped |
| */ |
| public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy, |
| List<ACL> acls) throws KeeperException, InterruptedException { |
| // zk allows null ACLs, but it's probably a bug in Accumulo if we see it used in our code |
| requireNonNull(acls); |
| requireNonNull(policy); |
| return retryLoop(zk -> { |
| try { |
| zk.create(zPath, data, acls, CreateMode.PERSISTENT); |
| return true; |
| } catch (KeeperException e) { |
| if (e.code() == Code.NODEEXISTS) { |
| switch (policy) { |
| case SKIP: |
| return false; |
| case OVERWRITE: |
| zk.setData(zPath, data, -1); |
| return true; |
| case FAIL: |
| default: |
| // re-throw below |
| } |
| } |
| throw e; |
| } |
| }, |
| // if OVERWRITE policy is used, create() can fail with NODEEXISTS; |
| // then, the node can be deleted, causing setData() to fail with NONODE; |
| // if that happens, the following code ensures we retry |
| e -> e.code() == Code.NONODE && policy == NodeExistsPolicy.OVERWRITE); |
| } |
| |
| /** |
| * Create a persistent sequential node with the default ACL |
| * |
| * @return the actual path of the created node |
| */ |
| public String putPersistentSequential(String zPath, byte[] data) |
| throws KeeperException, InterruptedException { |
| return retryLoop( |
| zk -> zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL)); |
| } |
| |
| /** |
| * Create an ephemeral node with the default ACL |
| */ |
| public void putEphemeralData(String zPath, byte[] data) |
| throws KeeperException, InterruptedException { |
| retryLoop(zk -> zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL)); |
| } |
| |
| /** |
| * Create an ephemeral sequential node with the default ACL |
| * |
| * @return the actual path of the created node |
| */ |
| public String putEphemeralSequential(String zPath, byte[] data) |
| throws KeeperException, InterruptedException { |
| return retryLoop(zk -> zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL)); |
| } |
| |
| /** |
| * Recursively copy any persistent data from the source to the destination, using the default ACL |
| * to create any missing nodes and skipping over any ephemeral data. |
| */ |
| public void recursiveCopyPersistentOverwrite(String source, String destination) |
| throws KeeperException, InterruptedException { |
| var stat = new Stat(); |
| byte[] data = getData(source, stat); |
| // only copy persistent data |
| if (stat.getEphemeralOwner() != 0) { |
| return; |
| } |
| putPersistentData(destination, data, NodeExistsPolicy.OVERWRITE); |
| if (stat.getNumChildren() > 0) { |
| for (String child : getChildren(source)) { |
| recursiveCopyPersistentOverwrite(source + "/" + child, destination + "/" + child); |
| } |
| } |
| } |
| |
| /** |
| * Update an existing ZK node using the provided mutator function. If it's possible the node |
| * doesn't exist yet, use {@link #mutateOrCreate(String, byte[], Mutator)} instead. |
| * |
| * @return the value set on the node |
| */ |
| public byte[] mutateExisting(String zPath, Mutator mutator) |
| throws KeeperException, InterruptedException, AcceptableThriftTableOperationException { |
| requireNonNull(mutator); |
| return retryLoopMutator(zk -> { |
| var stat = new Stat(); |
| byte[] data = zk.getData(zPath, null, stat); |
| // this mutator can throw AcceptableThriftTableOperationException |
| data = mutator.mutate(data); |
| if (data != null) { |
| zk.setData(zPath, data, stat.getVersion()); |
| } |
| return data; |
| }, e -> e.code() == Code.BADVERSION); // always retry if bad version |
| } |
| |
| /** |
| * Create a new {@link CreateMode#PERSISTENT} ZK node with the default ACL if it does not exist. |
| * If it does already exist, then update it with the provided mutator function. If it is known to |
| * exist already, use {@link #mutateExisting(String, Mutator)} instead. |
| * |
| * @return the value set on the node |
| */ |
| public byte[] mutateOrCreate(String zPath, byte[] createValue, Mutator mutator) |
| throws KeeperException, InterruptedException, AcceptableThriftTableOperationException { |
| requireNonNull(mutator); |
| return putPersistentData(zPath, createValue, NodeExistsPolicy.SKIP) ? createValue |
| : mutateExisting(zPath, mutator); |
| } |
| |
| /** |
| * Ensure the provided path exists, using persistent nodes, empty data, and the default ACL for |
| * any missing path elements. |
| */ |
| public void mkdirs(String path) throws KeeperException, InterruptedException { |
| if (path.equals("")) { |
| // terminal condition for recursion |
| return; |
| } |
| if (!path.startsWith("/")) { |
| throw new IllegalArgumentException(path + "does not start with /"); |
| } |
| if (exists(path)) { |
| return; |
| } |
| String parent = path.substring(0, path.lastIndexOf("/")); |
| mkdirs(parent); |
| putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP); |
| } |
| |
| /** |
| * Delete the specified node, and ignore NONODE exceptions. |
| */ |
| public void delete(String path) throws KeeperException, InterruptedException { |
| retryLoop(zk -> { |
| try { |
| zk.delete(path, -1); |
| } catch (KeeperException e) { |
| // ignore the case where the node doesn't exist |
| if (e.code() != Code.NONODE) { |
| throw e; |
| } |
| } |
| return null; |
| }); |
| } |
| |
| /** |
| * This method will delete a node and all its children. |
| */ |
| public void recursiveDelete(String zPath, NodeMissingPolicy policy) |
| throws KeeperException, InterruptedException { |
| ZooUtil.recursiveDelete(getZooKeeper(), zPath, policy); |
| } |
| } |