| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.accumulo.fate.zookeeper; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| |
| import java.util.Arrays; |
| import java.util.List; |
| |
| import org.apache.accumulo.core.conf.AccumuloConfiguration; |
| import org.apache.accumulo.core.conf.Property; |
| import org.apache.accumulo.fate.util.Retry; |
| import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; |
| import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; |
| import org.apache.accumulo.fate.zookeeper.ZooUtil.ZooKeeperConnectionInfo; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.Code; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class ZooReaderWriter extends ZooReader implements IZooReaderWriter { |
| private static final Logger log = LoggerFactory.getLogger(ZooReaderWriter.class); |
| private static final String SCHEME = "digest"; |
| private static final String USER = "accumulo"; |
| |
| private static ZooReaderWriter instance = null; |
| private final String scheme; |
| private final byte[] auth; |
| private final ZooKeeperConnectionInfo info; |
| |
| @Override |
| public ZooKeeper getZooKeeper() { |
| return getSession(keepers, timeout, scheme, auth); |
| } |
| |
| public ZooReaderWriter(AccumuloConfiguration conf) { |
| this(conf.get(Property.INSTANCE_ZK_HOST), |
| (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT), |
| conf.get(Property.INSTANCE_SECRET)); |
| } |
| |
| public ZooReaderWriter(String string, int timeInMillis, String secret) { |
| this(string, timeInMillis, SCHEME, (USER + ":" + secret).getBytes(UTF_8)); |
| } |
| |
| public ZooReaderWriter(String string, int timeInMillis, String scheme, byte[] auth) { |
| super(string, timeInMillis); |
| this.scheme = scheme; |
| this.auth = Arrays.copyOf(auth, auth.length); |
| this.info = new ZooKeeperConnectionInfo(string, timeInMillis, scheme, this.auth); |
| } |
| |
| @Override |
| public void recursiveDelete(String zPath, NodeMissingPolicy policy) |
| throws KeeperException, InterruptedException { |
| ZooUtil.recursiveDelete(info, zPath, policy); |
| } |
| |
| @Override |
| public boolean putPersistentData(String zPath, byte[] data, NodeExistsPolicy policy) |
| throws KeeperException, InterruptedException { |
| return ZooUtil.putPersistentData(info, zPath, data, policy); |
| } |
| |
| @Override |
| public boolean putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy, |
| List<ACL> acls) throws KeeperException, InterruptedException { |
| return ZooUtil.putPersistentData(info, zPath, data, version, policy, acls); |
| } |
| |
| @Override |
| public boolean putPrivatePersistentData(String zPath, byte[] data, NodeExistsPolicy policy) |
| throws KeeperException, InterruptedException { |
| return ZooUtil.putPrivatePersistentData(info, zPath, data, policy); |
| } |
| |
| @Override |
| public void putPersistentData(String zPath, byte[] data, int version, NodeExistsPolicy policy) |
| throws KeeperException, InterruptedException { |
| ZooUtil.putPersistentData(info, zPath, data, version, policy); |
| } |
| |
| @Override |
| public String putPersistentSequential(String zPath, byte[] data) |
| throws KeeperException, InterruptedException { |
| return ZooUtil.putPersistentSequential(info, zPath, data); |
| } |
| |
| @Override |
| public String putEphemeralData(String zPath, byte[] data) |
| throws KeeperException, InterruptedException { |
| return ZooUtil.putEphemeralData(info, zPath, data); |
| } |
| |
| @Override |
| public String putEphemeralSequential(String zPath, byte[] data) |
| throws KeeperException, InterruptedException { |
| return ZooUtil.putEphemeralSequential(info, zPath, data); |
| } |
| |
| @Override |
| public void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) |
| throws KeeperException, InterruptedException { |
| ZooUtil.recursiveCopyPersistent(info, source, destination, policy); |
| } |
| |
| @Override |
| public void delete(String path, int version) throws InterruptedException, KeeperException { |
| final Retry retry = getRetryFactory().createRetry(); |
| while (true) { |
| try { |
| getZooKeeper().delete(path, version); |
| return; |
| } catch (KeeperException e) { |
| final Code code = e.code(); |
| if (code == Code.NONODE) { |
| if (retry.hasRetried()) { |
| // A retried delete could have deleted the node, assume that was the case |
| log.debug("Delete saw no node on a retry. Assuming node was deleted"); |
| return; |
| } |
| |
| throw e; |
| } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT |
| || code == Code.SESSIONEXPIRED) { |
| // retry if we have more attempts to do so |
| retryOrThrow(retry, e); |
| } else { |
| throw e; |
| } |
| } |
| |
| retry.waitForNextAttempt(); |
| } |
| } |
| |
| @Override |
| public byte[] mutate(String zPath, byte[] createValue, List<ACL> acl, Mutator mutator) |
| throws Exception { |
| if (createValue != null) { |
| while (true) { |
| final Retry retry = getRetryFactory().createRetry(); |
| try { |
| getZooKeeper().create(zPath, createValue, acl, CreateMode.PERSISTENT); |
| return createValue; |
| } catch (KeeperException ex) { |
| final Code code = ex.code(); |
| if (code == Code.NODEEXISTS) { |
| // expected |
| break; |
| } else if (code == Code.OPERATIONTIMEOUT || code == Code.CONNECTIONLOSS |
| || code == Code.SESSIONEXPIRED) { |
| retryOrThrow(retry, ex); |
| } else { |
| throw ex; |
| } |
| } |
| |
| retry.waitForNextAttempt(); |
| } |
| } |
| do { |
| final Retry retry = getRetryFactory().createRetry(); |
| Stat stat = new Stat(); |
| byte[] data = getData(zPath, false, stat); |
| data = mutator.mutate(data); |
| if (data == null) |
| return data; |
| try { |
| getZooKeeper().setData(zPath, data, stat.getVersion()); |
| return data; |
| } catch (KeeperException ex) { |
| final Code code = ex.code(); |
| if (code == Code.BADVERSION) { |
| // Retry, but don't increment. This makes it backwards compatible with the infinite |
| // loop that previously happened. I'm not sure if that's really desirable though. |
| } else if (code == Code.OPERATIONTIMEOUT || code == Code.CONNECTIONLOSS |
| || code == Code.SESSIONEXPIRED) { |
| retryOrThrow(retry, ex); |
| retry.waitForNextAttempt(); |
| } else { |
| throw ex; |
| } |
| } |
| } while (true); |
| } |
| |
| public static synchronized ZooReaderWriter getInstance(String zookeepers, int timeInMillis, |
| String scheme, byte[] auth) { |
| if (instance == null) |
| instance = new ZooReaderWriter(zookeepers, timeInMillis, scheme, auth); |
| return instance; |
| } |
| |
| @Override |
| public boolean isLockHeld(ZooUtil.LockID lockID) throws KeeperException, InterruptedException { |
| return ZooUtil.isLockHeld(info, lockID); |
| } |
| |
| @Override |
| public void mkdirs(String path) throws KeeperException, InterruptedException { |
| if (path.equals("")) |
| return; |
| if (!path.startsWith("/")) |
| throw new IllegalArgumentException(path + "does not start with /"); |
| if (exists(path)) |
| return; |
| String parent = path.substring(0, path.lastIndexOf("/")); |
| mkdirs(parent); |
| putPersistentData(path, new byte[] {}, NodeExistsPolicy.SKIP); |
| } |
| |
| } |