blob: f89c8fd0ef741a6dfcb099de23ad570e76e3205c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A zookeeper that can handle 'recoverable' errors.
* To handle recoverable errors, developers need to realize that there are two
* classes of requests: idempotent and non-idempotent requests. Read requests
* and unconditional sets and deletes are examples of idempotent requests, they
* can be reissued with the same results.
* (Although, the delete may throw a NoNodeException on reissue its effect on
* the ZooKeeper state is the same.) Non-idempotent requests need special
* handling, application and library writers need to keep in mind that they may
* need to encode information in the data or name of znodes to detect
* retries. A simple example is a create that uses a sequence flag.
* If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
* loss exception, that process will reissue another
* create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
* getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
* that x-109 was the result of the previous create, so the process actually
* owns both x-109 and x-111. An easy way around this is to use "x-process id-"
* when doing the create. If the process is using an id of 352, before reissuing
* the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
* "x-352-109", x-333-110". The process will know that the original create
* succeeded an the znode it created is "x-352-109".
* @see "https://cwiki.apache.org/confluence/display/HADOOP2/ZooKeeper+ErrorHandling"
*/
@InterfaceAudience.Private
public class RecoverableZooKeeper {
private static final Logger LOG = LoggerFactory.getLogger(RecoverableZooKeeper.class);
// the actual ZooKeeper client instance
private ZooKeeper zk;
private final RetryCounterFactory retryCounterFactory;
// An identifier of this process in the cluster
private final String identifier;
private final byte[] id;
private final Watcher watcher;
private final int sessionTimeout;
private final String quorumServers;
private final int maxMultiSize;
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
justification="None. Its always been this way.")
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
Watcher watcher, int maxRetries, int retryIntervalMillis, int maxSleepTime, String identifier,
int maxMultiSize)
throws IOException {
// TODO: Add support for zk 'chroot'; we don't add it to the quorumServers String as we should.
this.retryCounterFactory =
new RetryCounterFactory(maxRetries+1, retryIntervalMillis, maxSleepTime);
if (identifier == null || identifier.length() == 0) {
// the identifier = processID@hostName
identifier = ManagementFactory.getRuntimeMXBean().getName();
}
LOG.info("Process identifier={} connecting to ZooKeeper ensemble={}", identifier,
quorumServers);
this.identifier = identifier;
this.id = Bytes.toBytes(identifier);
this.watcher = watcher;
this.sessionTimeout = sessionTimeout;
this.quorumServers = quorumServers;
this.maxMultiSize = maxMultiSize;
try {
checkZk();
} catch (Exception x) {
/* ignore */
}
}
/**
* Returns the maximum size (in bytes) that should be included in any single multi() call.
*
* NB: This is an approximation, so there may be variance in the msg actually sent over the
* wire. Please be sure to set this approximately, with respect to your ZK server configuration
* for jute.maxbuffer.
*/
public int getMaxMultiSizeLimit() {
return maxMultiSize;
}
/**
* Try to create a ZooKeeper connection. Turns any exception encountered into a
* KeeperException.OperationTimeoutException so it can retried.
* @return The created ZooKeeper connection object
* @throws KeeperException if a ZooKeeper operation fails
*/
protected synchronized ZooKeeper checkZk() throws KeeperException {
if (this.zk == null) {
try {
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
} catch (IOException ex) {
LOG.warn("Unable to create ZooKeeper Connection", ex);
throw new KeeperException.OperationTimeoutException();
}
}
return zk;
}
public synchronized void reconnectAfterExpiration()
throws IOException, KeeperException, InterruptedException {
if (zk != null) {
LOG.info("Closing dead ZooKeeper connection, session" +
" was: 0x"+Long.toHexString(zk.getSessionId()));
zk.close();
// reset the ZooKeeper connection
zk = null;
}
checkZk();
LOG.info("Recreated a ZooKeeper, session" +
" is: 0x"+Long.toHexString(zk.getSessionId()));
}
/**
* delete is an idempotent operation. Retry before throwing exception.
* This function will not throw NoNodeException if the path does not
* exist.
*/
public void delete(String path, int version) throws InterruptedException, KeeperException {
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.delete")) {
RetryCounter retryCounter = retryCounterFactory.create();
boolean isRetry = false; // False for first attempt, true for all retries.
while (true) {
try {
checkZk().delete(path, version);
return;
} catch (KeeperException e) {
switch (e.code()) {
case NONODE:
if (isRetry) {
LOG.debug("Node " + path + " already deleted. Assuming a " +
"previous attempt succeeded.");
return;
}
LOG.debug("Node {} already deleted, retry={}", path, isRetry);
throw e;
case CONNECTIONLOSS:
retryOrThrow(retryCounter, e, "delete");
break;
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "delete");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
isRetry = true;
}
}
}
/**
* exists is an idempotent operation. Retry before throwing exception
* @return A Stat instance
*/
public Stat exists(String path, Watcher watcher) throws KeeperException, InterruptedException {
return exists(path, watcher, null);
}
private Stat exists(String path, Watcher watcher, Boolean watch)
throws InterruptedException, KeeperException {
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.exists")) {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
Stat nodeStat;
if (watch == null) {
nodeStat = checkZk().exists(path, watcher);
} else {
nodeStat = checkZk().exists(path, watch);
}
return nodeStat;
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
retryOrThrow(retryCounter, e, "exists");
break;
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "exists");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
}
}
}
/**
* exists is an idempotent operation. Retry before throwing exception
* @return A Stat instance
*/
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException {
return exists(path, null, watch);
}
private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
String opName) throws KeeperException {
if (!retryCounter.shouldRetry()) {
LOG.error("ZooKeeper {} failed after {} attempts", opName, retryCounter.getMaxAttempts());
throw e;
}
LOG.debug("Retry, connectivity issue (JVM Pause?); quorum={},exception{}=", quorumServers, e);
}
/**
* getChildren is an idempotent operation. Retry before throwing exception
* @return List of children znodes
*/
public List<String> getChildren(String path, Watcher watcher)
throws KeeperException, InterruptedException {
return getChildren(path, watcher, null);
}
private List<String> getChildren(String path, Watcher watcher, Boolean watch)
throws InterruptedException, KeeperException {
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getChildren")) {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
List<String> children;
if (watch == null) {
children = checkZk().getChildren(path, watcher);
} else {
children = checkZk().getChildren(path, watch);
}
return children;
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
retryOrThrow(retryCounter, e, "getChildren");
break;
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "getChildren");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
}
}
}
/**
* getChildren is an idempotent operation. Retry before throwing exception
* @return List of children znodes
*/
public List<String> getChildren(String path, boolean watch)
throws KeeperException, InterruptedException {
return getChildren(path, null, watch);
}
/**
* getData is an idempotent operation. Retry before throwing exception
* @return Data
*/
public byte[] getData(String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException {
return getData(path, watcher, null, stat);
}
private byte[] getData(String path, Watcher watcher, Boolean watch, Stat stat)
throws InterruptedException, KeeperException {
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getData")) {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
byte[] revData;
if (watch == null) {
revData = checkZk().getData(path, watcher, stat);
} else {
revData = checkZk().getData(path, watch, stat);
}
return ZKMetadata.removeMetaData(revData);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
retryOrThrow(retryCounter, e, "getData");
break;
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "getData");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
}
}
}
/**
* getData is an idempotent operation. Retry before throwing exception
* @return Data
*/
public byte[] getData(String path, boolean watch, Stat stat)
throws KeeperException, InterruptedException {
return getData(path, null, watch, stat);
}
/**
* setData is NOT an idempotent operation. Retry may cause BadVersion Exception
* Adding an identifier field into the data to check whether
* badversion is caused by the result of previous correctly setData
* @return Stat instance
*/
public Stat setData(String path, byte[] data, int version)
throws KeeperException, InterruptedException {
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setData")) {
RetryCounter retryCounter = retryCounterFactory.create();
byte[] newData = ZKMetadata.appendMetaData(id, data);
boolean isRetry = false;
while (true) {
try {
return checkZk().setData(path, newData, version);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
retryOrThrow(retryCounter, e, "setData");
break;
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "setData");
break;
case BADVERSION:
if (isRetry) {
// try to verify whether the previous setData success or not
try{
Stat stat = new Stat();
byte[] revData = checkZk().getData(path, false, stat);
if(Bytes.compareTo(revData, newData) == 0) {
// the bad version is caused by previous successful setData
return stat;
}
} catch(KeeperException keeperException){
// the ZK is not reliable at this moment. just throwing exception
throw keeperException;
}
}
// throw other exceptions and verified bad version exceptions
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
isRetry = true;
}
}
}
/**
* getAcl is an idempotent operation. Retry before throwing exception
* @return list of ACLs
*/
public List<ACL> getAcl(String path, Stat stat)
throws KeeperException, InterruptedException {
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.getAcl")) {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
return checkZk().getACL(path, stat);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
retryOrThrow(retryCounter, e, "getAcl");
break;
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "getAcl");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
}
}
}
/**
* setAcl is an idempotent operation. Retry before throwing exception
* @return list of ACLs
*/
public Stat setAcl(String path, List<ACL> acls, int version)
throws KeeperException, InterruptedException {
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.setAcl")) {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
return checkZk().setACL(path, acls, version);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
retryOrThrow(retryCounter, e, "setAcl");
break;
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "setAcl");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
}
}
}
/**
* <p>
* NONSEQUENTIAL create is idempotent operation.
* Retry before throwing exceptions.
* But this function will not throw the NodeExist exception back to the
* application.
* </p>
* <p>
* But SEQUENTIAL is NOT idempotent operation. It is necessary to add
* identifier to the path to verify, whether the previous one is successful
* or not.
* </p>
*
* @return Path
*/
public String create(String path, byte[] data, List<ACL> acl,
CreateMode createMode)
throws KeeperException, InterruptedException {
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.create")) {
byte[] newData = ZKMetadata.appendMetaData(id, data);
switch (createMode) {
case EPHEMERAL:
case PERSISTENT:
return createNonSequential(path, newData, acl, createMode);
case EPHEMERAL_SEQUENTIAL:
case PERSISTENT_SEQUENTIAL:
return createSequential(path, newData, acl, createMode);
default:
throw new IllegalArgumentException("Unrecognized CreateMode: " +
createMode);
}
}
}
private String createNonSequential(String path, byte[] data, List<ACL> acl,
CreateMode createMode) throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
boolean isRetry = false; // False for first attempt, true for all retries.
while (true) {
try {
return checkZk().create(path, data, acl, createMode);
} catch (KeeperException e) {
switch (e.code()) {
case NODEEXISTS:
if (isRetry) {
// If the connection was lost, there is still a possibility that
// we have successfully created the node at our previous attempt,
// so we read the node and compare.
byte[] currentData = checkZk().getData(path, false, null);
if (currentData != null &&
Bytes.compareTo(currentData, data) == 0) {
// We successfully created a non-sequential node
return path;
}
LOG.error("Node " + path + " already exists with " +
Bytes.toStringBinary(currentData) + ", could not write " +
Bytes.toStringBinary(data));
throw e;
}
LOG.trace("Node {} already exists", path);
throw e;
case CONNECTIONLOSS:
retryOrThrow(retryCounter, e, "create");
break;
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "create");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
isRetry = true;
}
}
private String createSequential(String path, byte[] data,
List<ACL> acl, CreateMode createMode)
throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
boolean first = true;
String newPath = path+this.identifier;
while (true) {
try {
if (!first) {
// Check if we succeeded on a previous attempt
String previousResult = findPreviousSequentialNode(newPath);
if (previousResult != null) {
return previousResult;
}
}
first = false;
return checkZk().create(newPath, data, acl, createMode);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
retryOrThrow(retryCounter, e, "create");
break;
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "create");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
}
}
/**
* Convert Iterable of {@link org.apache.zookeeper.Op} we got into the ZooKeeper.Op
* instances to actually pass to multi (need to do this in order to appendMetaData).
*/
private Iterable<Op> prepareZKMulti(Iterable<Op> ops) throws UnsupportedOperationException {
if(ops == null) {
return null;
}
List<Op> preparedOps = new LinkedList<>();
for (Op op : ops) {
if (op.getType() == ZooDefs.OpCode.create) {
CreateRequest create = (CreateRequest)op.toRequestRecord();
preparedOps.add(Op.create(create.getPath(), ZKMetadata.appendMetaData(id, create.getData()),
create.getAcl(), create.getFlags()));
} else if (op.getType() == ZooDefs.OpCode.delete) {
// no need to appendMetaData for delete
preparedOps.add(op);
} else if (op.getType() == ZooDefs.OpCode.setData) {
SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
preparedOps.add(Op.setData(setData.getPath(),
ZKMetadata.appendMetaData(id, setData.getData()), setData.getVersion()));
} else {
throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
}
}
return preparedOps;
}
/**
* Run multiple operations in a transactional manner. Retry before throwing exception
*/
public List<OpResult> multi(Iterable<Op> ops)
throws KeeperException, InterruptedException {
try (TraceScope scope = TraceUtil.createTrace("RecoverableZookeeper.multi")) {
RetryCounter retryCounter = retryCounterFactory.create();
Iterable<Op> multiOps = prepareZKMulti(ops);
while (true) {
try {
return checkZk().multi(multiOps);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
retryOrThrow(retryCounter, e, "multi");
break;
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "multi");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
}
}
}
private String findPreviousSequentialNode(String path)
throws KeeperException, InterruptedException {
int lastSlashIdx = path.lastIndexOf('/');
assert(lastSlashIdx != -1);
String parent = path.substring(0, lastSlashIdx);
String nodePrefix = path.substring(lastSlashIdx+1);
List<String> nodes = checkZk().getChildren(parent, false);
List<String> matching = filterByPrefix(nodes, nodePrefix);
for (String node : matching) {
String nodePath = parent + "/" + node;
Stat stat = checkZk().exists(nodePath, false);
if (stat != null) {
return nodePath;
}
}
return null;
}
public synchronized long getSessionId() {
return zk == null ? -1 : zk.getSessionId();
}
public synchronized void close() throws InterruptedException {
if (zk != null) {
zk.close();
}
}
public synchronized States getState() {
return zk == null ? null : zk.getState();
}
public synchronized ZooKeeper getZooKeeper() {
return zk;
}
public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException {
checkZk().sync(path, cb, null);
}
/**
* Filters the given node list by the given prefixes.
* This method is all-inclusive--if any element in the node list starts
* with any of the given prefixes, then it is included in the result.
*
* @param nodes the nodes to filter
* @param prefixes the prefixes to include in the result
* @return list of every element that starts with one of the prefixes
*/
private static List<String> filterByPrefix(List<String> nodes,
String... prefixes) {
List<String> lockChildren = new ArrayList<>();
for (String child : nodes){
for (String prefix : prefixes){
if (child.startsWith(prefix)){
lockChildren.add(child);
break;
}
}
}
return lockChildren;
}
public String getIdentifier() {
return identifier;
}
}