/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.impl.metadata;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogConstants;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
import com.twitter.distributedlog.exceptions.LockCancelledException;
import com.twitter.distributedlog.exceptions.LogExistsException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.lock.SessionLockFactory;
import com.twitter.distributedlog.lock.ZKDistributedLock;
import com.twitter.distributedlog.lock.ZKSessionLockFactory;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.metadata.LogStreamMetadataStore;
import com.twitter.distributedlog.metadata.LogMetadata;
import com.twitter.distributedlog.metadata.LogMetadataForReader;
import com.twitter.distributedlog.metadata.LogMetadataForWriter;
import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.SchedulerUtils;
import com.twitter.distributedlog.zk.LimitedPermitManager;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.PermitManager;
import com.twitter.distributedlog.util.Transaction;
import com.twitter.distributedlog.util.Utils;
import com.twitter.distributedlog.zk.ZKTransaction;
import com.twitter.util.ExceptionalFunction;
import com.twitter.util.ExceptionalFunction0;
import com.twitter.util.Future;
import com.twitter.util.Promise;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static com.twitter.distributedlog.metadata.LogMetadata.*;
/**
 * ZooKeeper-based implementation of {@link LogStreamMetadataStore}.
 *
 * <p>It keeps the metadata of a log stream (max transaction id, layout version, log segments,
 * write/read lock paths and, when the writer owns its allocator, an allocation path) as znodes
 * under the log's root path, and builds its locking primitives on top of ZooKeeper.
 */
public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
private static final Logger LOG = LoggerFactory.getLogger(ZKLogStreamMetadataStore.class);
private final String clientId;
private final DistributedLogConfiguration conf;
private final ZooKeeperClient zooKeeperClient;
private final OrderedScheduler scheduler;
private final StatsLogger statsLogger;
private final LogSegmentMetadataStore logSegmentStore;
private final LimitedPermitManager permitManager;
// lock
private SessionLockFactory lockFactory;
private OrderedScheduler lockStateExecutor;
public ZKLogStreamMetadataStore(String clientId,
DistributedLogConfiguration conf,
ZooKeeperClient zkc,
OrderedScheduler scheduler,
StatsLogger statsLogger) {
this.clientId = clientId;
this.conf = conf;
this.zooKeeperClient = zkc;
this.scheduler = scheduler;
this.statsLogger = statsLogger;
// create the log segment metadata store and the permit manager (used for log segment rolling)
this.logSegmentStore = new ZKLogSegmentMetadataStore(conf, zooKeeperClient, scheduler);
this.permitManager = new LimitedPermitManager(
conf.getLogSegmentRollingConcurrency(),
1,
TimeUnit.MINUTES,
scheduler);
this.zooKeeperClient.register(permitManager);
}
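/**
 * Lazily creates the ordered scheduler that runs lock state transitions.
 *
 * @param createIfNull whether to create the scheduler if it has not been created yet
 * @return the lock state scheduler, or null if it does not exist and {@code createIfNull} is false
 */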
private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
if (createIfNull && null == lockStateExecutor) {
StatsLogger lockStateStatsLogger = statsLogger.scope("lock_scheduler");
lockStateExecutor = OrderedScheduler.newBuilder()
.name("DLM-LockState")
.corePoolSize(conf.getNumLockStateThreads())
.statsLogger(lockStateStatsLogger)
.perExecutorStatsLogger(lockStateStatsLogger)
.traceTaskExecution(conf.getEnableTaskExecutionStats())
.traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
.build();
}
return lockStateExecutor;
}
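/**
 * Lazily creates the ZooKeeper session lock factory shared by the write and read locks.
 *
 * @param createIfNull whether to create the factory if it has not been created yet
 * @return the session lock factory, or null if it does not exist and {@code createIfNull} is false
 */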
private synchronized SessionLockFactory getLockFactory(boolean createIfNull) {
if (createIfNull && null == lockFactory) {
lockFactory = new ZKSessionLockFactory(
zooKeeperClient,
clientId,
getLockStateExecutor(createIfNull),
conf.getZKNumRetries(),
conf.getLockTimeoutMilliSeconds(),
conf.getZKRetryBackoffStartMillis(),
statsLogger);
}
return lockFactory;
}
@Override
public void close() throws IOException {
this.zooKeeperClient.unregister(permitManager);
this.permitManager.close();
this.logSegmentStore.close();
SchedulerUtils.shutdownScheduler(
getLockStateExecutor(false),
conf.getSchedulerShutdownTimeoutMs(),
TimeUnit.MILLISECONDS);
}
@Override
public LogSegmentMetadataStore getLogSegmentMetadataStore() {
return logSegmentStore;
}
@Override
public PermitManager getPermitManager() {
return this.permitManager;
}
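/**
 * Creates a new metadata transaction backed by a ZooKeeper multi operation, so that the
 * updates added to it are committed atomically.
 */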
@Override
public Transaction<Object> newTransaction() {
return new ZKTransaction(zooKeeperClient);
}
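/**
 * Checks whether the log exists by looking up its log segments path. A sync is issued before
 * the exists call so the check reflects the latest state known to the ZooKeeper ensemble rather
 * than a possibly stale local view. A minimal usage sketch, assuming {@code store} is an
 * instance of this class:
 * <pre>{@code
 * Future<Void> exists = store.logExists(uri, logName);
 * }</pre>
 */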
@Override
public Future<Void> logExists(URI uri, final String logName) {
final String logSegmentsPath = LogMetadata.getLogSegmentsPath(
uri, logName, conf.getUnpartitionedStreamName());
final Promise<Void> promise = new Promise<Void>();
try {
final ZooKeeper zk = zooKeeperClient.get();
zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int syncRc, String path, Object syncCtx) {
if (KeeperException.Code.NONODE.intValue() == syncRc) {
promise.setException(new LogNotFoundException(
String.format("Log %s does not exist or has been deleted", logName)));
return;
} else if (KeeperException.Code.OK.intValue() != syncRc) {
promise.setException(new ZKException("Error on checking log existence for " + logName,
KeeperException.create(KeeperException.Code.get(syncRc))));
return;
}
zk.exists(logSegmentsPath, false, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (KeeperException.Code.OK.intValue() == rc) {
promise.setValue(null);
} else if (KeeperException.Code.NONODE.intValue() == rc) {
promise.setException(new LogNotFoundException(
String.format("Log %s does not exist or has been deleted", logName)));
} else {
promise.setException(new ZKException("Error on checking log existence for " + logName,
KeeperException.create(KeeperException.Code.get(rc))));
}
}
}, null);
}
}, null);
} catch (InterruptedException ie) {
LOG.error("Interrupted while reading {}", logSegmentsPath, ie);
promise.setException(new DLInterruptedException("Interrupted while checking "
+ logSegmentsPath, ie));
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
promise.setException(e);
}
return promise;
}
//
// Create Write Lock
//
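/**
 * Creates (but does not acquire) the distributed write lock for the log, backed by the
 * lazily created lock factory and lock state scheduler.
 */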
@Override
public DistributedLock createWriteLock(LogMetadataForWriter metadata) {
return new ZKDistributedLock(
getLockStateExecutor(true),
getLockFactory(true),
metadata.getLockPath(),
conf.getLockTimeoutMilliSeconds(),
statsLogger);
}
//
// Create Read Lock
//
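/**
 * Ensures the read lock znode exists before a read lock is created on it. The parent path
 * (the log root path) is expected to exist already; if it does not, the log is treated as
 * not found.
 */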
private Future<Void> ensureReadLockPathExist(final LogMetadata logMetadata,
final String readLockPath) {
final Promise<Void> promise = new Promise<Void>();
promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() {
@Override
public BoxedUnit apply(Throwable t) {
FutureUtils.setException(promise, new LockCancelledException(readLockPath,
"Could not ensure read lock path", t));
return BoxedUnit.UNIT;
}
});
Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
new org.apache.zookeeper.AsyncCallback.StringCallback() {
@Override
public void processResult(final int rc, final String path, Object ctx, String name) {
if (KeeperException.Code.NONODE.intValue() == rc) {
FutureUtils.setException(promise, new LogNotFoundException(
String.format("Log %s does not exist or has been deleted",
logMetadata.getFullyQualifiedName())));
} else if (KeeperException.Code.OK.intValue() == rc) {
FutureUtils.setValue(promise, null);
LOG.trace("Created path {}.", path);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
FutureUtils.setValue(promise, null);
LOG.trace("Path {} is already existed.", path);
} else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
} else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
FutureUtils.setException(promise, new DLInterruptedException(path));
} else {
FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc)));
}
}
}, null);
return promise;
}
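/**
 * Creates (but does not acquire) the distributed read lock for the log, creating the read
 * lock znode first if needed. Constructing the lock is deferred to the store's scheduler
 * because it involves a blocking call that must not run on the ZK completion thread.
 */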
@Override
public Future<DistributedLock> createReadLock(final LogMetadataForReader metadata,
Optional<String> readerId) {
final String readLockPath = metadata.getReadLockPath(readerId);
return ensureReadLockPathExist(metadata, readLockPath).flatMap(
new ExceptionalFunction<Void, Future<DistributedLock>>() {
@Override
public Future<DistributedLock> applyE(Void value) throws Throwable {
// Unfortunately this has a blocking call which we should not execute on the
// ZK completion thread
return scheduler.apply(new ExceptionalFunction0<DistributedLock>() {
@Override
public DistributedLock applyE() throws Throwable {
return new ZKDistributedLock(
getLockStateExecutor(true),
getLockFactory(true),
readLockPath,
conf.getLockTimeoutMilliSeconds(),
statsLogger.scope("read_lock"));
}
});
}
});
}
//
// Create Log
//
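/**
 * Indexes into the list of metadata znodes fetched by
 * {@link #checkLogMetadataPaths(ZooKeeper, String, boolean)}. The order here must match the
 * order in which those paths are read and created.
 */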
static class MetadataIndex {
static final int LOG_ROOT_PARENT = 0;
static final int LOG_ROOT = 1;
static final int MAX_TXID = 2;
static final int VERSION = 3;
static final int LOCK = 4;
static final int READ_LOCK = 5;
static final int LOGSEGMENTS = 6;
static final int ALLOCATION = 7;
}
static int bytesToInt(byte[] b) {
assert b.length >= 4;
// mask each byte before shifting to avoid sign extension when rebuilding the big-endian int
return (b[0] & 0xff) << 24 | (b[1] & 0xff) << 16 | (b[2] & 0xff) << 8 | (b[3] & 0xff);
}
static byte[] intToBytes(int i) {
return new byte[]{
(byte) (i >> 24),
(byte) (i >> 16),
(byte) (i >> 8),
(byte) (i)};
}
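/**
 * Reads all metadata znodes of a log in parallel and returns them in {@link MetadataIndex}
 * order. Paths that do not exist come back as versioned values with null data, which
 * {@link #createMissingMetadata} later uses to decide what still needs to be created.
 */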
static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
String logRootPath,
boolean ownAllocator) {
// Note re. persistent lock state initialization: the read lock persistent state (path) is
// initialized here but only used in the read handler. The reason is that it's more convenient
// and less error prone to manage the whole stream structure in one place.
final String logRootParentPath = new File(logRootPath).getParent();
final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
final String lockPath = logRootPath + LOCK_PATH;
final String readLockPath = logRootPath + READ_LOCK_PATH;
final String versionPath = logRootPath + VERSION_PATH;
final String allocationPath = logRootPath + ALLOCATION_PATH;
int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1;
List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false));
checkFutures.add(Utils.zkGetData(zk, logRootPath, false));
checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false));
checkFutures.add(Utils.zkGetData(zk, versionPath, false));
checkFutures.add(Utils.zkGetData(zk, lockPath, false));
checkFutures.add(Utils.zkGetData(zk, readLockPath, false));
checkFutures.add(Utils.zkGetData(zk, logSegmentsPath, false));
if (ownAllocator) {
checkFutures.add(Utils.zkGetData(zk, allocationPath, false));
}
return Future.collect(checkFutures);
}
static boolean pathExists(Versioned<byte[]> metadata) {
return null != metadata.getValue() && null != metadata.getVersion();
}
static void ensureMetadataExist(Versioned<byte[]> metadata) {
Preconditions.checkNotNull(metadata.getValue());
Preconditions.checkNotNull(metadata.getVersion());
}
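/**
 * Creates the missing metadata znodes for the log in a single ZooKeeper multi operation, or
 * fails the promise with {@link LogNotFoundException} when creation is not allowed. On success
 * the completed metadata list mirrors {@link MetadataIndex} order, with newly created entries
 * carrying the data that was just written at version 0.
 */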
static void createMissingMetadata(final ZooKeeper zk,
final String logRootPath,
final List<Versioned<byte[]>> metadatas,
final List<ACL> acl,
final boolean ownAllocator,
final boolean createIfNotExists,
final Promise<List<Versioned<byte[]>>> promise) {
final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
CreateMode createMode = CreateMode.PERSISTENT;
// log root parent path
if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
pathsToCreate.add(null);
} else {
String logRootParentPath = new File(logRootPath).getParent();
pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
}
// log root path
if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
}
// max id
if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
pathsToCreate.add(null);
} else {
byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
pathsToCreate.add(zeroTxnIdData);
zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
}
// version
if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
pathsToCreate.add(null);
} else {
byte[] versionData = intToBytes(LAYOUT_VERSION);
pathsToCreate.add(versionData);
zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
}
// lock path
if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
}
// read lock path
if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
}
// log segments path
if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
pathsToCreate.add(null);
} else {
byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
pathsToCreate.add(logSegmentsData);
zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
}
// allocation path
if (ownAllocator) {
if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
DistributedLogConstants.EMPTY_BYTES, acl, createMode));
}
}
if (zkOps.isEmpty()) {
// nothing missed
promise.setValue(metadatas);
return;
}
if (!createIfNotExists) {
promise.setException(new LogNotFoundException("Log " + logRootPath + " not found"));
return;
}
zk.multi(zkOps, new AsyncCallback.MultiCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
if (KeeperException.Code.OK.intValue() == rc) {
List<Versioned<byte[]>> finalMetadatas =
Lists.newArrayListWithExpectedSize(metadatas.size());
for (int i = 0; i < pathsToCreate.size(); i++) {
byte[] dataCreated = pathsToCreate.get(i);
if (null == dataCreated) {
finalMetadatas.add(metadatas.get(i));
} else {
finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0)));
}
}
promise.setValue(finalMetadatas);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
promise.setException(new LogExistsException("Someone just created log "
+ logRootPath));
} else {
if (LOG.isDebugEnabled() && null != resultList && !resultList.isEmpty()) {
StringBuilder builder = new StringBuilder();
for (OpResult result : resultList) {
if (result instanceof OpResult.ErrorResult) {
OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result;
builder.append(errorResult.getErr()).append(",");
} else {
builder.append(0).append(",");
}
}
String resultCodeList = builder.substring(0, builder.length() - 1);
LOG.debug("Failed to create log, full rc list = {}", resultCodeList);
}
promise.setException(new ZKException("Failed to create log " + logRootPath,
KeeperException.Code.get(rc)));
}
}
}, null);
}
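/**
 * Validates the fetched metadata (max transaction id, layout version, lock paths and the max
 * log segment sequence number) and assembles it into a {@link LogMetadataForWriter}.
 *
 * @throws UnexpectedException if any required metadata is missing or malformed
 */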
static LogMetadataForWriter processLogMetadatas(URI uri,
String logName,
String logIdentifier,
List<Versioned<byte[]>> metadatas,
boolean ownAllocator)
throws UnexpectedException {
try {
// max id
Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID);
ensureMetadataExist(maxTxnIdData);
// version
Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION);
ensureMetadataExist(versionData);
Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue()));
// lock path
ensureMetadataExist(metadatas.get(MetadataIndex.LOCK));
// read lock path
ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK));
// max lssn
Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS);
ensureMetadataExist(maxLSSNData);
try {
DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue());
} catch (NumberFormatException nfe) {
throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe);
}
// allocation path
Versioned<byte[]> allocationData;
if (ownAllocator) {
allocationData = metadatas.get(MetadataIndex.ALLOCATION);
ensureMetadataExist(allocationData);
} else {
allocationData = new Versioned<byte[]>(null, null);
}
return new LogMetadataForWriter(uri, logName, logIdentifier,
maxLSSNData, maxTxnIdData, allocationData);
} catch (IllegalArgumentException iae) {
throw new UnexpectedException("Invalid log " + logName, iae);
} catch (NullPointerException npe) {
throw new UnexpectedException("Invalid log " + logName, npe);
}
}
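/**
 * Fetches the writer metadata of a log, creating the log's metadata znodes first when
 * {@code createIfNotExists} is set. The flow is: validate the root path, read all metadata
 * paths, create whatever is missing, then materialize a {@link LogMetadataForWriter}.
 */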
static Future<LogMetadataForWriter> getLog(final URI uri,
final String logName,
final String logIdentifier,
final ZooKeeperClient zooKeeperClient,
final boolean ownAllocator,
final boolean createIfNotExists) {
final String logRootPath = LogMetadata.getLogRootPath(uri, logName, logIdentifier);
try {
PathUtils.validatePath(logRootPath);
} catch (IllegalArgumentException e) {
LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e});
return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
}
try {
final ZooKeeper zk = zooKeeperClient.get();
return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
.flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() {
@Override
public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
Promise<List<Versioned<byte[]>>> promise =
new Promise<List<Versioned<byte[]>>>();
createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(),
ownAllocator, createIfNotExists, promise);
return promise;
}
}).map(new ExceptionalFunction<List<Versioned<byte[]>>, LogMetadataForWriter>() {
@Override
public LogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException {
return processLogMetadatas(
uri,
logName,
logIdentifier,
metadatas,
ownAllocator);
}
});
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
return Future.exception(new ZKException("Encountered zookeeper connection issue on creating log " + logName,
KeeperException.Code.CONNECTIONLOSS));
} catch (InterruptedException e) {
return Future.exception(new DLInterruptedException("Interrupted on creating log " + logName, e));
}
}
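/**
 * Fetches (and optionally creates) the writer metadata for the unpartitioned stream of the
 * given log. A minimal usage sketch, assuming {@code store} is an instance of this class and
 * the caller owns its ledger allocator:
 * <pre>{@code
 * Future<LogMetadataForWriter> metadata = store.getLog(uri, logName, true, true);
 * }</pre>
 */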
@Override
public Future<LogMetadataForWriter> getLog(final URI uri,
final String logName,
final boolean ownAllocator,
final boolean createIfNotExists) {
return getLog(
uri,
logName,
conf.getUnpartitionedStreamName(),
zooKeeperClient,
ownAllocator,
createIfNotExists);
}
//
// Delete Log
//
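/**
 * Deletes the whole stream subtree (all metadata and log segment znodes) recursively. The
 * returned future fails with a {@link ZKException} on ZooKeeper errors.
 */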
@Override
public Future<Void> deleteLog(URI uri, final String logName) {
final Promise<Void> promise = new Promise<Void>();
try {
String streamPath = LogMetadata.getLogStreamPath(uri, logName);
ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
if (KeeperException.Code.OK.intValue() != rc) {
FutureUtils.setException(promise,
new ZKException("Encountered zookeeper issue on deleting log stream "
+ logName, KeeperException.Code.get(rc)));
return;
}
FutureUtils.setValue(promise, null);
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
+ logName, KeeperException.Code.CONNECTIONLOSS));
} catch (InterruptedException e) {
FutureUtils.setException(promise, new DLInterruptedException("Interrupted while deleting log stream "
+ logName));
} catch (KeeperException e) {
FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
+ logName, e));
}
return promise;
}
}