blob: 99a4155cef0f9c7672e30195067ac13382cbd410 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.util;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.common.functions.VoidFunctions;
import org.apache.distributedlog.io.AsyncAbortable;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
/**
* Basic Utilities.
*/
@Slf4j
public class Utils {
/**
* Current time from some arbitrary time base in the past, counting in
* nanoseconds, and not affected by settimeofday or similar system clock
* changes. This is appropriate to use when computing how much longer to
* wait for an interval to expire.
*
* @return current time in nanoseconds.
*/
public static long nowInNanos() {
return System.nanoTime();
}
/**
* Current time from some fixed base time - so useful for cross machine
* comparison
*
* @return current time in milliseconds.
*/
public static long nowInMillis() {
return System.currentTimeMillis();
}
/**
* Milliseconds elapsed since the time specified, the input is nanoTime
* the only conversion happens when computing the elapsed time
*
* @param startMsecTime the start of the interval that we are measuring
* @return elapsed time in milliseconds.
*/
public static long elapsedMSec(long startMsecTime) {
return (System.currentTimeMillis() - startMsecTime);
}
public static boolean randomPercent(double percent) {
return (Math.random() * 100.0) <= percent;
}
/**
* Synchronously create zookeeper path recursively and optimistically.
*
* @see #zkAsyncCreateFullPathOptimistic(ZooKeeperClient, String, byte[], List, CreateMode)
* @param zkc Zookeeper client
* @param path Zookeeper full path
* @param data Zookeeper data
* @param acl Acl of the zk path
* @param createMode Create mode of zk path
* @throws ZooKeeperClient.ZooKeeperConnectionException
* @throws KeeperException
* @throws InterruptedException
*/
public static void zkCreateFullPathOptimistic(
ZooKeeperClient zkc,
String path,
byte[] data,
final List<ACL> acl,
final CreateMode createMode) throws IOException, KeeperException {
try {
FutureUtils.result(zkAsyncCreateFullPathOptimistic(zkc, path, data, acl, createMode));
} catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
throw zkce;
} catch (KeeperException ke) {
throw ke;
} catch (InterruptedException ie) {
throw new DLInterruptedException("Interrupted on create zookeeper path " + path, ie);
} catch (RuntimeException rte) {
throw rte;
} catch (Exception exc) {
throw new RuntimeException("Unexpected Exception", exc);
}
}
/**
* Asynchronously create zookeeper path recursively and optimistically.
*
* @param zkc Zookeeper client
* @param pathToCreate Zookeeper full path
* @param parentPathShouldNotCreate The recursive creation should stop if this path doesn't exist
* @param data Zookeeper data
* @param acl Acl of the zk path
* @param createMode Create mode of zk path
* @param callback Callback
* @param ctx Context object
*/
public static void zkAsyncCreateFullPathOptimisticRecursive(
final ZooKeeperClient zkc,
final String pathToCreate,
final Optional<String> parentPathShouldNotCreate,
final byte[] data,
final List<ACL> acl,
final CreateMode createMode,
final AsyncCallback.StringCallback callback,
final Object ctx) {
try {
zkc.get().create(pathToCreate, data, acl, createMode, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (rc != KeeperException.Code.NONODE.intValue()) {
callback.processResult(rc, path, ctx, name);
return;
}
// Since we got a nonode, it means that my parents may not exist
// ephemeral nodes can't have children so Create mode is always
// persistent parents
int lastSlash = pathToCreate.lastIndexOf('/');
if (lastSlash <= 0) {
callback.processResult(rc, path, ctx, name);
return;
}
String parent = pathToCreate.substring(0, lastSlash);
if (parentPathShouldNotCreate.isPresent() && Objects.equal(parentPathShouldNotCreate.get(), parent)) {
// we should stop here
callback.processResult(rc, path, ctx, name);
return;
}
zkAsyncCreateFullPathOptimisticRecursive(zkc, parent, parentPathShouldNotCreate, new byte[0], acl,
CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NODEEXISTS.intValue()) {
// succeeded in creating the parent, now create the original path
zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate,
data, acl, createMode, callback, ctx);
} else {
callback.processResult(rc, path, ctx, name);
}
}
}, ctx);
}
}, ctx);
} catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
callback.processResult(DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE, zkce.getMessage(), ctx, pathToCreate);
} catch (InterruptedException ie) {
callback.processResult(DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE, ie.getMessage(), ctx, pathToCreate);
}
}
/**
* Asynchronously create zookeeper path recursively and optimistically.
*
* @param zkc Zookeeper client
* @param pathToCreate Zookeeper full path
* @param data Zookeeper data
* @param acl Acl of the zk path
* @param createMode Create mode of zk path
*/
public static CompletableFuture<Void> zkAsyncCreateFullPathOptimistic(
final ZooKeeperClient zkc,
final String pathToCreate,
final byte[] data,
final List<ACL> acl,
final CreateMode createMode) {
Optional<String> parentPathShouldNotCreate = Optional.absent();
return zkAsyncCreateFullPathOptimistic(
zkc,
pathToCreate,
parentPathShouldNotCreate,
data,
acl,
createMode);
}
/**
* Asynchronously create zookeeper path recursively and optimistically
*
* @param zkc Zookeeper client
* @param pathToCreate Zookeeper full path
* @param parentPathShouldNotCreate zookeeper parent path should not be created
* @param data Zookeeper data
* @param acl Acl of the zk path
* @param createMode Create mode of zk path
*/
public static CompletableFuture<Void> zkAsyncCreateFullPathOptimistic(
final ZooKeeperClient zkc,
final String pathToCreate,
final Optional<String> parentPathShouldNotCreate,
final byte[] data,
final List<ACL> acl,
final CreateMode createMode) {
final CompletableFuture<Void> result = new CompletableFuture<Void>();
zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate,
data, acl, createMode, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
handleKeeperExceptionCode(rc, path, result);
}
}, result);
return result;
}
/**
* Asynchronously create zookeeper path recursively and optimistically.
*
* @param zkc Zookeeper client
* @param pathToCreate Zookeeper full path
* @param data Zookeeper data
* @param acl Acl of the zk path
* @param createMode Create mode of zk path
*/
public static CompletableFuture<Void> zkAsyncCreateFullPathOptimisticAndSetData(
final ZooKeeperClient zkc,
final String pathToCreate,
final byte[] data,
final List<ACL> acl,
final CreateMode createMode) {
final CompletableFuture<Void> result = new CompletableFuture<Void>();
try {
zkc.get().setData(pathToCreate, data, -1, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (rc != KeeperException.Code.NONODE.intValue()) {
handleKeeperExceptionCode(rc, path, result);
return;
}
Optional<String> parentPathShouldNotCreate = Optional.absent();
zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate,
data, acl, createMode, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
handleKeeperExceptionCode(rc, path, result);
}
}, result);
}
}, result);
} catch (Exception exc) {
result.completeExceptionally(exc);
}
return result;
}
private static void handleKeeperExceptionCode(int rc, String pathOrMessage, CompletableFuture<Void> result) {
if (KeeperException.Code.OK.intValue() == rc) {
result.complete(null);
} else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
result.completeExceptionally(new ZooKeeperClient.ZooKeeperConnectionException(pathOrMessage));
} else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
result.completeExceptionally(new DLInterruptedException(pathOrMessage));
} else {
result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), pathOrMessage));
}
}
public static CompletableFuture<Versioned<byte[]>> zkGetData(ZooKeeperClient zkc, String path, boolean watch) {
ZooKeeper zk;
try {
zk = zkc.get();
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
return FutureUtils.exception(zkException(e, path));
} catch (InterruptedException e) {
return FutureUtils.exception(zkException(e, path));
}
return zkGetData(zk, path, watch);
}
/**
* Retrieve data from zookeeper <code>path</code>.
*
* @param path
* zookeeper path to retrieve data
* @param watch
* whether to watch the path
* @return future representing the versioned value. null version or null value means path doesn't exist.
*/
public static CompletableFuture<Versioned<byte[]>> zkGetData(ZooKeeper zk, String path, boolean watch) {
final CompletableFuture<Versioned<byte[]>> promise = new CompletableFuture<Versioned<byte[]>>();
zk.getData(path, watch, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if (KeeperException.Code.OK.intValue() == rc) {
if (null == stat) {
promise.complete(new Versioned<byte[]>(null, null));
} else {
promise.complete(new Versioned<byte[]>(data, new ZkVersion(stat.getVersion())));
}
} else if (KeeperException.Code.NONODE.intValue() == rc) {
promise.complete(new Versioned<byte[]>(null, null));
} else {
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
}
}, null);
return promise;
}
public static CompletableFuture<ZkVersion> zkSetData(ZooKeeperClient zkc, String path, byte[] data, ZkVersion version) {
ZooKeeper zk;
try {
zk = zkc.get();
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
return FutureUtils.exception(zkException(e, path));
} catch (InterruptedException e) {
return FutureUtils.exception(zkException(e, path));
}
return zkSetData(zk, path, data, version);
}
/**
* Set <code>data</code> to zookeeper <code>path</code>.
*
* @param zk
* zookeeper client
* @param path
* path to set data
* @param data
* data to set
* @param version
* version used to set data
* @return future representing the version after this operation.
*/
public static CompletableFuture<ZkVersion> zkSetData(ZooKeeper zk, String path, byte[] data, ZkVersion version) {
final CompletableFuture<ZkVersion> promise = new CompletableFuture<ZkVersion>();
zk.setData(path, data, version.getZnodeVersion(), new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (KeeperException.Code.OK.intValue() == rc) {
promise.complete(new ZkVersion(stat.getVersion()));
return;
}
promise.completeExceptionally(
KeeperException.create(KeeperException.Code.get(rc)));
return;
}
}, null);
return promise;
}
public static CompletableFuture<Void> zkDelete(ZooKeeperClient zkc, String path, ZkVersion version) {
ZooKeeper zk;
try {
zk = zkc.get();
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
return FutureUtils.exception(zkException(e, path));
} catch (InterruptedException e) {
return FutureUtils.exception(zkException(e, path));
}
return zkDelete(zk, path, version);
}
/**
* Delete the given <i>path</i> from zookeeper.
*
* @param zk
* zookeeper client
* @param path
* path to delete
* @param version
* version used to set data
* @return future representing the version after this operation.
*/
public static CompletableFuture<Void> zkDelete(ZooKeeper zk, String path, ZkVersion version) {
final CompletableFuture<Void> promise = new CompletableFuture<Void>();
zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
if (KeeperException.Code.OK.intValue() == rc) {
promise.complete(null);
return;
}
promise.completeExceptionally(
KeeperException.create(KeeperException.Code.get(rc)));
return;
}
}, null);
return promise;
}
/**
* Delete the given <i>path</i> from zookeeper.
*
* @param zkc
* zookeeper client
* @param path
* path to delete
* @param version
* version used to set data
* @return future representing if the delete is successful. Return true if the node is deleted,
* false if the node doesn't exist, otherwise future will throw exception
*
*/
public static CompletableFuture<Boolean> zkDeleteIfNotExist(ZooKeeperClient zkc, String path, ZkVersion version) {
ZooKeeper zk;
try {
zk = zkc.get();
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
return FutureUtils.exception(zkException(e, path));
} catch (InterruptedException e) {
return FutureUtils.exception(zkException(e, path));
}
final CompletableFuture<Boolean> promise = new CompletableFuture<Boolean>();
zk.delete(path, version.getZnodeVersion(), new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
if (KeeperException.Code.OK.intValue() == rc ) {
promise.complete(true);
} else if (KeeperException.Code.NONODE.intValue() == rc) {
promise.complete(false);
} else {
promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
}
}, null);
return promise;
}
public static CompletableFuture<Void> asyncClose(@Nullable AsyncCloseable closeable,
boolean swallowIOException) {
if (null == closeable) {
return FutureUtils.Void();
} else if (swallowIOException) {
return FutureUtils.ignore(closeable.asyncClose());
} else {
return closeable.asyncClose();
}
}
/**
* Sync zookeeper client on given <i>path</i>.
*
* @param zkc
* zookeeper client
* @param path
* path to sync
* @return zookeeper client after sync
* @throws IOException
*/
public static ZooKeeper sync(ZooKeeperClient zkc, String path) throws IOException {
ZooKeeper zk;
try {
zk = zkc.get();
} catch (InterruptedException e) {
throw new DLInterruptedException("Interrupted on checking if log " + path + " exists", e);
}
final CountDownLatch syncLatch = new CountDownLatch(1);
final AtomicInteger syncResult = new AtomicInteger(0);
zk.sync(path, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
syncResult.set(rc);
syncLatch.countDown();
}
}, null);
try {
syncLatch.await();
} catch (InterruptedException e) {
throw new DLInterruptedException("Interrupted on syncing zookeeper connection", e);
}
if (KeeperException.Code.OK.intValue() != syncResult.get()) {
throw new ZKException("Error syncing zookeeper connection ",
KeeperException.Code.get(syncResult.get()));
}
return zk;
}
/**
* Close a closeable.
*
* @param closeable
* closeable to close
*/
public static void close(@Nullable Closeable closeable) {
if (null == closeable) {
return;
}
try {
Closeables.close(closeable, true);
} catch (IOException e) {
// no-op. the exception is swallowed.
}
}
/**
* Close an async closeable.
*
* @param closeable
* closeable to close
*/
public static void close(@Nullable AsyncCloseable closeable)
throws IOException {
if (null == closeable) {
return;
}
Utils.ioResult(closeable.asyncClose());
}
/**
* Close an async closeable.
*
* @param closeable
* closeable to close
*/
public static void closeQuietly(@Nullable AsyncCloseable closeable) {
if (null == closeable) {
return;
}
try {
Utils.ioResult(closeable.asyncClose());
} catch (IOException e) {
// no-op. the exception is swallowed.
}
}
/**
* Close the closeables in sequence.
*
* @param closeables
* closeables to close
* @return future represents the close future
*/
public static CompletableFuture<Void> closeSequence(ExecutorService executorService,
AsyncCloseable... closeables) {
return closeSequence(executorService, false, closeables);
}
/**
* Close the closeables in sequence and ignore errors during closing.
*
* @param executorService executor to execute closeable
* @param ignoreCloseError whether to ignore errors during closing
* @param closeables list of closeables
* @return future represents the close future.
*/
public static CompletableFuture<Void> closeSequence(ExecutorService executorService,
boolean ignoreCloseError,
AsyncCloseable... closeables) {
List<AsyncCloseable> closeableList = Lists.newArrayListWithExpectedSize(closeables.length);
for (AsyncCloseable closeable : closeables) {
if (null == closeable) {
closeableList.add(AsyncCloseable.NULL);
} else {
closeableList.add(closeable);
}
}
return FutureUtils.processList(
closeableList,
ignoreCloseError ? AsyncCloseable.CLOSE_FUNC_IGNORE_ERRORS : AsyncCloseable.CLOSE_FUNC,
executorService
).thenApply(VoidFunctions.LIST_TO_VOID_FUNC);
}
/**
* Gets the parent of a path.
*
* @param path
* path to get the parent of
* @return parent of the path or null if no parent exists.
*/
public static String getParent(final String path) {
if (path == null) {
return null;
}
if (path.length() < 2) {
return null;
}
int firstIndex = path.indexOf("/");
if (firstIndex == -1) {
return null;
}
int lastIndex = path.lastIndexOf("/");
if (lastIndex == path.length() - 1) {
lastIndex = path.substring(0, path.length() - 1).lastIndexOf("/");
}
if (lastIndex == -1) {
return null;
}
if (lastIndex == 0) {
return "/";
}
return path.substring(0, lastIndex);
}
/**
* Convert the <i>throwable</i> to zookeeper related exceptions.
*
* @param throwable cause
* @param path zookeeper path
* @return zookeeper related exceptions
*/
public static Throwable zkException(Throwable throwable, String path) {
if (throwable instanceof KeeperException) {
return new ZKException("Encountered zookeeper exception on " + path, (KeeperException) throwable);
} else if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) {
return new ZKException("Encountered zookeeper connection loss on " + path,
KeeperException.Code.CONNECTIONLOSS);
} else if (throwable instanceof InterruptedException) {
return new DLInterruptedException("Interrupted on operating " + path, throwable);
} else {
return new UnexpectedException("Encountered unexpected exception on operatiing " + path, throwable);
}
}
/**
* Create transmit exception from transmit result.
*
* @param transmitResult
* transmit result (basically bk exception code)
* @return transmit exception
*/
public static BKTransmitException transmitException(int transmitResult) {
return new BKTransmitException("Failed to write to bookkeeper; Error is ("
+ transmitResult + ") "
+ BKException.getMessage(transmitResult), transmitResult);
}
/**
* A specific version of {@link FutureUtils#result(CompletableFuture)} to handle known exception issues.
*/
public static <T> T ioResult(CompletableFuture<T> result) throws IOException {
return FutureUtils.result(
result,
(cause) -> {
if (cause instanceof IOException) {
return (IOException) cause;
} else if (cause instanceof KeeperException) {
return new ZKException("Encountered zookeeper exception on waiting result",
(KeeperException) cause);
} else if (cause instanceof BKException) {
return new BKTransmitException("Encountered bookkeeper exception on waiting result",
((BKException) cause).getCode());
} else if (cause instanceof InterruptedException) {
return new DLInterruptedException("Interrupted on waiting result", cause);
} else {
return new IOException("Encountered exception on waiting result", cause);
}
});
}
/**
* A specific version of {@link FutureUtils#result(CompletableFuture, long, TimeUnit)}
* to handle known exception issues.
*/
public static <T> T ioResult(CompletableFuture<T> result, long timeout, TimeUnit timeUnit)
throws IOException, TimeoutException {
return FutureUtils.result(
result,
(cause) -> {
if (cause instanceof IOException) {
return (IOException) cause;
} else if (cause instanceof KeeperException) {
return new ZKException("Encountered zookeeper exception on waiting result",
(KeeperException) cause);
} else if (cause instanceof BKException) {
return new BKTransmitException("Encountered bookkeeper exception on waiting result",
((BKException) cause).getCode());
} else if (cause instanceof InterruptedException) {
return new DLInterruptedException("Interrupted on waiting result", cause);
} else {
return new IOException("Encountered exception on waiting result", cause);
}
},
timeout,
timeUnit);
}
/**
* Abort async <i>abortable</i>
*
* @param abortable the {@code AsyncAbortable} object to be aborted, or null, in which case this method
* does nothing.
* @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods
* @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException}
*/
public static void abort(@Nullable AsyncAbortable abortable,
boolean swallowIOException)
throws IOException {
if (null == abortable) {
return;
}
try {
ioResult(abortable.asyncAbort());
} catch (Exception ioe) {
if (swallowIOException) {
log.warn("IOException thrown while aborting Abortable {} : ", abortable, ioe);
} else {
throw ioe;
}
}
}
}