blob: c046fc620392a94b340814f8fd1488762772a18c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.impl.metadata;
import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.distributedlog.DistributedLogConstants.EMPTY_BYTES;
import static org.apache.distributedlog.DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO;
import static org.apache.distributedlog.metadata.LogMetadata.*;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.concurrent.FutureEventListener;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClient.ZooKeeperConnectionException;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.common.util.PermitManager;
import org.apache.distributedlog.common.util.SchedulerUtils;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.LockCancelledException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.LogExistsException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore;
import org.apache.distributedlog.lock.DistributedLock;
import org.apache.distributedlog.lock.SessionLockFactory;
import org.apache.distributedlog.lock.ZKDistributedLock;
import org.apache.distributedlog.lock.ZKSessionLockFactory;
import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
import org.apache.distributedlog.metadata.LogMetadata;
import org.apache.distributedlog.metadata.LogMetadataForReader;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.metadata.LogStreamMetadataStore;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Utils;
import org.apache.distributedlog.zk.LimitedPermitManager;
import org.apache.distributedlog.zk.ZKTransaction;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.Op.Create;
import org.apache.zookeeper.Op.Delete;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* zookeeper based {@link LogStreamMetadataStore}.
*/
public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
private static final Logger LOG = LoggerFactory.getLogger(ZKLogStreamMetadataStore.class);
private final String clientId;
private final DistributedLogConfiguration conf;
private final ZooKeeperClient zooKeeperClient;
private final OrderedScheduler scheduler;
private final StatsLogger statsLogger;
private final LogSegmentMetadataStore logSegmentStore;
private final LimitedPermitManager permitManager;
// lock
private SessionLockFactory lockFactory;
private OrderedScheduler lockStateExecutor;
public ZKLogStreamMetadataStore(String clientId,
DistributedLogConfiguration conf,
ZooKeeperClient zkc,
OrderedScheduler scheduler,
StatsLogger statsLogger) {
this.clientId = clientId;
this.conf = conf;
this.zooKeeperClient = zkc;
this.scheduler = scheduler;
this.statsLogger = statsLogger;
// create the log segment metadata store and the permit manager (used for log segment rolling)
this.logSegmentStore = new ZKLogSegmentMetadataStore(conf, zooKeeperClient, scheduler);
this.permitManager = new LimitedPermitManager(
conf.getLogSegmentRollingConcurrency(),
1,
TimeUnit.MINUTES,
scheduler);
this.zooKeeperClient.register(permitManager);
}
private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
if (createIfNull && null == lockStateExecutor) {
lockStateExecutor = OrderedScheduler.newBuilder()
.name("DLM-LockState")
.corePoolSize(conf.getNumLockStateThreads())
.build();
}
return lockStateExecutor;
}
private synchronized SessionLockFactory getLockFactory(boolean createIfNull) {
if (createIfNull && null == lockFactory) {
lockFactory = new ZKSessionLockFactory(
zooKeeperClient,
clientId,
getLockStateExecutor(createIfNull),
conf.getZKNumRetries(),
conf.getLockTimeoutMilliSeconds(),
conf.getZKRetryBackoffStartMillis(),
statsLogger);
}
return lockFactory;
}
@Override
public void close() throws IOException {
this.zooKeeperClient.unregister(permitManager);
this.permitManager.close();
this.logSegmentStore.close();
SchedulerUtils.shutdownScheduler(
getLockStateExecutor(false),
conf.getSchedulerShutdownTimeoutMs(),
TimeUnit.MILLISECONDS);
}
@Override
public LogSegmentMetadataStore getLogSegmentMetadataStore() {
return logSegmentStore;
}
@Override
public PermitManager getPermitManager() {
return this.permitManager;
}
@Override
public Transaction<Object> newTransaction() {
return new ZKTransaction(zooKeeperClient);
}
@Override
public CompletableFuture<Void> logExists(URI uri, final String logName) {
final String logSegmentsPath = LogMetadata.getLogSegmentsPath(
uri, logName, conf.getUnpartitionedStreamName());
final CompletableFuture<Void> promise = new CompletableFuture<Void>();
try {
final ZooKeeper zk = zooKeeperClient.get();
zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int syncRc, String path, Object syncCtx) {
if (KeeperException.Code.NONODE.intValue() == syncRc) {
promise.completeExceptionally(new LogNotFoundException(
String.format("Log %s does not exist or has been deleted", logName)));
return;
} else if (KeeperException.Code.OK.intValue() != syncRc){
promise.completeExceptionally(new ZKException("Error on checking log existence for " + logName,
KeeperException.create(KeeperException.Code.get(syncRc))));
return;
}
zk.exists(logSegmentsPath, false, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (KeeperException.Code.OK.intValue() == rc) {
promise.complete(null);
} else if (KeeperException.Code.NONODE.intValue() == rc) {
promise.completeExceptionally(new LogNotFoundException(
String.format("Log %s does not exist or has been deleted", logName)));
} else {
promise.completeExceptionally(
new ZKException("Error on checking log existence for "
+ logName, KeeperException.create(KeeperException.Code.get(rc))));
}
}
}, null);
}
}, null);
} catch (InterruptedException ie) {
LOG.error("Interrupted while reading {}", logSegmentsPath, ie);
promise.completeExceptionally(new DLInterruptedException("Interrupted while checking "
+ logSegmentsPath, ie));
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
promise.completeExceptionally(e);
}
return promise;
}
//
// Create Write Lock
//
@Override
public DistributedLock createWriteLock(LogMetadataForWriter metadata) {
return new ZKDistributedLock(
getLockStateExecutor(true),
getLockFactory(true),
metadata.getLockPath(),
conf.getLockTimeoutMilliSeconds(),
statsLogger);
}
//
// Create Read Lock
//
private CompletableFuture<Void> ensureReadLockPathExist(final LogMetadata logMetadata,
final String readLockPath) {
final CompletableFuture<Void> promise = new CompletableFuture<Void>();
promise.whenComplete((value, cause) -> {
if (cause instanceof CancellationException) {
FutureUtils.completeExceptionally(promise, new LockCancelledException(readLockPath,
"Could not ensure read lock path", cause));
}
});
Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
Utils.zkAsyncCreateFullPathOptimisticRecursive(zooKeeperClient, readLockPath, parentPathShouldNotCreate,
new byte[0], zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT,
new org.apache.zookeeper.AsyncCallback.StringCallback() {
@Override
public void processResult(final int rc, final String path, Object ctx, String name) {
if (KeeperException.Code.NONODE.intValue() == rc) {
FutureUtils.completeExceptionally(promise, new LogNotFoundException(
String.format("Log %s does not exist or has been deleted",
logMetadata.getFullyQualifiedName())));
} else if (KeeperException.Code.OK.intValue() == rc) {
FutureUtils.complete(promise, null);
LOG.trace("Created path {}.", path);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
FutureUtils.complete(promise, null);
LOG.trace("Path {} is already existed.", path);
} else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
FutureUtils.completeExceptionally(promise,
new ZooKeeperClient.ZooKeeperConnectionException(path));
} else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
FutureUtils.completeExceptionally(promise, new DLInterruptedException(path));
} else {
FutureUtils.completeExceptionally(promise,
KeeperException.create(KeeperException.Code.get(rc)));
}
}
}, null);
return promise;
}
@Override
public CompletableFuture<DistributedLock> createReadLock(final LogMetadataForReader metadata,
Optional<String> readerId) {
final String readLockPath = metadata.getReadLockPath(readerId);
return ensureReadLockPathExist(metadata, readLockPath)
.thenApplyAsync((value) -> {
DistributedLock lock = new ZKDistributedLock(
getLockStateExecutor(true),
getLockFactory(true),
readLockPath,
conf.getLockTimeoutMilliSeconds(),
statsLogger.scope("read_lock"));
return lock;
}, scheduler.chooseExecutor(readLockPath));
}
//
// Create Log
//
static class MetadataIndex {
static final int LOG_ROOT_PARENT = 0;
static final int LOG_ROOT = 1;
static final int MAX_TXID = 2;
static final int VERSION = 3;
static final int LOCK = 4;
static final int READ_LOCK = 5;
static final int LOGSEGMENTS = 6;
static final int ALLOCATION = 7;
}
static int bytesToInt(byte[] b) {
assert b.length >= 4;
return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
}
static byte[] intToBytes(int i) {
return new byte[]{
(byte) (i >> 24),
(byte) (i >> 16),
(byte) (i >> 8),
(byte) (i)};
}
static CompletableFuture<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
String logRootPath,
boolean ownAllocator) {
// Note re. persistent lock state initialization: the read lock persistent state (path) is
// initialized here but only used in the read handler. The reason is its more convenient and
// less error prone to manage all stream structure in one place.
final String logRootParentPath = Utils.getParent(logRootPath);
final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
final String lockPath = logRootPath + LOCK_PATH;
final String readLockPath = logRootPath + READ_LOCK_PATH;
final String versionPath = logRootPath + VERSION_PATH;
final String allocationPath = logRootPath + ALLOCATION_PATH;
int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1;
List<CompletableFuture<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false));
checkFutures.add(Utils.zkGetData(zk, logRootPath, false));
checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false));
checkFutures.add(Utils.zkGetData(zk, versionPath, false));
checkFutures.add(Utils.zkGetData(zk, lockPath, false));
checkFutures.add(Utils.zkGetData(zk, readLockPath, false));
checkFutures.add(Utils.zkGetData(zk, logSegmentsPath, false));
if (ownAllocator) {
checkFutures.add(Utils.zkGetData(zk, allocationPath, false));
}
return FutureUtils.collect(checkFutures);
}
static boolean pathExists(Versioned<byte[]> metadata) {
return null != metadata.getValue() && null != metadata.getVersion();
}
static void ensureMetadataExist(Versioned<byte[]> metadata) {
checkNotNull(metadata.getValue());
checkNotNull(metadata.getVersion());
}
static void createMissingMetadata(final ZooKeeper zk,
final String basePath,
final String logRootPath,
final List<Versioned<byte[]>> metadatas,
final List<ACL> acl,
final boolean ownAllocator,
final boolean createIfNotExists,
final CompletableFuture<List<Versioned<byte[]>>> promise) {
final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
CreateMode createMode = CreateMode.PERSISTENT;
// log root parent path
String logRootParentPath = Utils.getParent(logRootPath);
if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode));
}
// log root path
if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootPath, EMPTY_BYTES, acl, createMode));
}
// max id
if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
pathsToCreate.add(null);
} else {
byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
pathsToCreate.add(zeroTxnIdData);
zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
}
// version
if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
pathsToCreate.add(null);
} else {
byte[] versionData = intToBytes(LAYOUT_VERSION);
pathsToCreate.add(versionData);
zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
}
// lock path
if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + LOCK_PATH, EMPTY_BYTES, acl, createMode));
}
// read lock path
if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, EMPTY_BYTES, acl, createMode));
}
// log segments path
if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
pathsToCreate.add(null);
} else {
byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
pathsToCreate.add(logSegmentsData);
zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
}
// allocation path
if (ownAllocator) {
if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
EMPTY_BYTES, acl, createMode));
}
}
if (zkOps.isEmpty()) {
// nothing missed
promise.complete(metadatas);
return;
}
if (!createIfNotExists) {
promise.completeExceptionally(new LogNotFoundException("Log " + logRootPath + " not found"));
return;
}
getMissingPaths(zk, basePath, Utils.getParent(logRootParentPath))
.whenComplete(new FutureEventListener<List<String>>() {
@Override
public void onSuccess(List<String> paths) {
for (String path : paths) {
pathsToCreate.add(EMPTY_BYTES);
zkOps.add(
0, Op.create(path, EMPTY_BYTES, acl, createMode));
}
executeCreateMissingPathTxn(
zk,
zkOps,
pathsToCreate,
metadatas,
logRootPath,
promise
);
}
@Override
public void onFailure(Throwable cause) {
promise.completeExceptionally(cause);
return;
}
});
}
private static void executeCreateMissingPathTxn(ZooKeeper zk,
List<Op> zkOps,
List<byte[]> pathsToCreate,
List<Versioned<byte[]>> metadatas,
String logRootPath,
CompletableFuture<List<Versioned<byte[]>>> promise) {
zk.multi(zkOps, new AsyncCallback.MultiCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
if (KeeperException.Code.OK.intValue() == rc) {
List<Versioned<byte[]>> finalMetadatas =
Lists.newArrayListWithExpectedSize(metadatas.size());
for (int i = 0; i < pathsToCreate.size(); i++) {
byte[] dataCreated = pathsToCreate.get(i);
if (null == dataCreated) {
finalMetadatas.add(metadatas.get(i));
} else {
finalMetadatas.add(new Versioned<byte[]>(dataCreated, new LongVersion(0)));
}
}
promise.complete(finalMetadatas);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
promise.completeExceptionally(new LogExistsException("Someone just created log "
+ logRootPath));
} else {
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
for (OpResult result : resultList) {
if (result instanceof OpResult.ErrorResult) {
OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result;
builder.append(errorResult.getErr()).append(",");
} else {
builder.append(0).append(",");
}
}
String resultCodeList = builder.substring(0, builder.length() - 1);
LOG.debug("Failed to create log, full rc list = {}", resultCodeList);
}
promise.completeExceptionally(new ZKException("Failed to create log " + logRootPath,
KeeperException.Code.get(rc)));
}
}
}, null);
}
static LogMetadataForWriter processLogMetadatas(URI uri,
String logName,
String logIdentifier,
List<Versioned<byte[]>> metadatas,
boolean ownAllocator)
throws UnexpectedException {
try {
// max id
Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID);
ensureMetadataExist(maxTxnIdData);
// version
Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION);
ensureMetadataExist(maxTxnIdData);
checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue()));
// lock path
ensureMetadataExist(metadatas.get(MetadataIndex.LOCK));
// read lock path
ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK));
// max lssn
Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS);
ensureMetadataExist(maxLSSNData);
try {
DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue());
} catch (NumberFormatException nfe) {
throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe);
}
// allocation path
Versioned<byte[]> allocationData;
if (ownAllocator) {
allocationData = metadatas.get(MetadataIndex.ALLOCATION);
ensureMetadataExist(allocationData);
} else {
allocationData = new Versioned<byte[]>(null, null);
}
return new LogMetadataForWriter(uri, logName, logIdentifier,
maxLSSNData, maxTxnIdData, allocationData);
} catch (IllegalArgumentException iae) {
throw new UnexpectedException("Invalid log " + logName, iae);
} catch (NullPointerException npe) {
throw new UnexpectedException("Invalid log " + logName, npe);
}
}
static CompletableFuture<LogMetadataForWriter> getLog(final URI uri,
final String logName,
final String logIdentifier,
final ZooKeeperClient zooKeeperClient,
final boolean ownAllocator,
final boolean createIfNotExists) {
final String logRootPath = LogMetadata.getLogRootPath(uri, logName, logIdentifier);
try {
PathUtils.validatePath(logRootPath);
} catch (IllegalArgumentException e) {
LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e});
return FutureUtils.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
}
try {
final ZooKeeper zk = zooKeeperClient.get();
return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
.thenCompose(metadatas -> {
CompletableFuture<List<Versioned<byte[]>>> promise =
new CompletableFuture<List<Versioned<byte[]>>>();
createMissingMetadata(
zk,
uri.getPath(),
logRootPath,
metadatas,
zooKeeperClient.getDefaultACL(),
ownAllocator,
createIfNotExists,
promise);
return promise;
}).thenCompose(metadatas -> {
try {
return FutureUtils.value(
processLogMetadatas(
uri,
logName,
logIdentifier,
metadatas,
ownAllocator));
} catch (UnexpectedException e) {
return FutureUtils.exception(e);
}
});
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
return FutureUtils.exception(
new ZKException("Encountered zookeeper connection issue on creating log "
+ logName, KeeperException.Code.CONNECTIONLOSS));
} catch (InterruptedException e) {
return FutureUtils.exception(new DLInterruptedException("Interrupted on creating log " + logName, e));
}
}
@Override
public CompletableFuture<LogMetadataForWriter> getLog(final URI uri,
final String logName,
final boolean ownAllocator,
final boolean createIfNotExists) {
return getLog(
uri,
logName,
conf.getUnpartitionedStreamName(),
zooKeeperClient,
ownAllocator,
createIfNotExists);
}
//
// Delete Log
//
@Override
public CompletableFuture<Void> deleteLog(URI uri, final String logName) {
final CompletableFuture<Void> promise = new CompletableFuture<Void>();
try {
String streamPath = LogMetadata.getLogStreamPath(uri, logName);
ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
if (KeeperException.Code.OK.intValue() != rc) {
FutureUtils.completeExceptionally(promise,
new ZKException("Encountered zookeeper issue on deleting log stream "
+ logName, KeeperException.Code.get(rc)));
return;
}
FutureUtils.complete(promise, null);
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
FutureUtils.completeExceptionally(promise,
new ZKException("Encountered zookeeper issue on deleting log stream "
+ logName, KeeperException.Code.CONNECTIONLOSS));
} catch (InterruptedException e) {
FutureUtils.completeExceptionally(promise,
new DLInterruptedException("Interrupted while deleting log stream " + logName));
} catch (KeeperException e) {
FutureUtils.completeExceptionally(promise,
new ZKException("Encountered zookeeper issue on deleting log stream "
+ logName, e));
}
return promise;
}
//
// Rename Log
//
@Override
public CompletableFuture<Void> renameLog(URI uri, String oldStreamName, String newStreamName) {
return getLog(
uri,
oldStreamName,
true,
false
).thenCompose(metadata -> renameLogMetadata(uri, metadata, newStreamName));
}
private CompletableFuture<Void> renameLogMetadata(URI uri,
LogMetadataForWriter oldMetadata,
String newStreamName) {
final LinkedList<Op> createOps = Lists.newLinkedList();
final LinkedList<Op> deleteOps = Lists.newLinkedList();
List<ACL> acls = zooKeeperClient.getDefaultACL();
// get the root path
String oldRootPath = oldMetadata.getLogRootPath();
String newRootPath = LogMetadata.getLogRootPath(
uri, newStreamName, conf.getUnpartitionedStreamName());
// 0. the log path
deleteOps.addFirst(Op.delete(
LogMetadata.getLogStreamPath(uri, oldMetadata.getLogName()), -1));
// 1. the root path
createOps.addLast(Op.create(
newRootPath, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
deleteOps.addFirst(Op.delete(
oldRootPath, -1));
// 2. max id
Versioned<byte[]> maxTxIdData = oldMetadata.getMaxTxIdData();
deleteOldPathAndCreateNewPath(
oldRootPath, MAX_TXID_PATH, maxTxIdData,
newRootPath, DLUtils.serializeTransactionId(0L), acls,
createOps, deleteOps
);
// 3. version
createOps.addLast(Op.create(
newRootPath + VERSION_PATH, intToBytes(LAYOUT_VERSION), acls, CreateMode.PERSISTENT));
deleteOps.addFirst(Op.delete(
oldRootPath + VERSION_PATH, -1));
// 4. lock path (NOTE: if the stream is locked by a writer, then the delete will fail as you can not
// delete the lock path if children is not empty.
createOps.addLast(Op.create(
newRootPath + LOCK_PATH, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
deleteOps.addFirst(Op.delete(
oldRootPath + LOCK_PATH, -1));
// 5. read lock path (NOTE: same reason as the write lock)
createOps.addLast(Op.create(
newRootPath + READ_LOCK_PATH, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
deleteOps.addFirst(Op.delete(
oldRootPath + READ_LOCK_PATH, -1));
// 6. allocation path
Versioned<byte[]> allocationData = oldMetadata.getAllocationData();
deleteOldPathAndCreateNewPath(
oldRootPath, ALLOCATION_PATH, allocationData,
newRootPath, EMPTY_BYTES, acls,
createOps, deleteOps);
// 7. log segments
Versioned<byte[]> maxLSSNData = oldMetadata.getMaxLSSNData();
deleteOldPathAndCreateNewPath(
oldRootPath, LOGSEGMENTS_PATH, maxLSSNData,
newRootPath, DLUtils.serializeLogSegmentSequenceNumber(UNASSIGNED_LOGSEGMENT_SEQNO), acls,
createOps, deleteOps);
// 8. copy the log segments
CompletableFuture<List<LogSegmentMetadata>> segmentsFuture;
if (pathExists(maxLSSNData)) {
segmentsFuture = getLogSegments(zooKeeperClient, oldRootPath + LOGSEGMENTS_PATH);
} else {
segmentsFuture = FutureUtils.value(Collections.emptyList());
}
return segmentsFuture
// copy the segments
.thenApply(segments -> {
for (LogSegmentMetadata segment : segments) {
deleteOldSegmentAndCreateNewSegment(
segment,
newRootPath + LOGSEGMENTS_PATH,
acls,
createOps,
deleteOps);
}
return null;
})
// get the missing paths
.thenCompose(ignored ->
getMissingPaths(zooKeeperClient, uri, newStreamName)
)
// create the missing paths and execute the rename transaction
.thenCompose(paths -> {
for (String path : paths) {
createOps.addFirst(Op.create(
path, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
}
return executeRenameTxn(oldRootPath, newRootPath, createOps, deleteOps);
});
}
@VisibleForTesting
static CompletableFuture<List<String>> getMissingPaths(ZooKeeperClient zkc, URI uri, String logName) {
ZooKeeper zk;
try {
zk = zkc.get();
} catch (ZooKeeperConnectionException | InterruptedException e) {
return FutureUtils.exception(e);
}
String basePath = uri.getPath();
String logStreamPath = LogMetadata.getLogStreamPath(uri, logName);
return getMissingPaths(zk, basePath, logStreamPath);
}
@VisibleForTesting
static CompletableFuture<List<String>> getMissingPaths(ZooKeeper zk, String basePath, String logStreamPath) {
LinkedList<String> missingPaths = Lists.newLinkedList();
CompletableFuture<List<String>> future = FutureUtils.createFuture();
existPath(zk, logStreamPath, basePath, missingPaths, future);
return future;
}
private static void existPath(ZooKeeper zk,
String path,
String basePath,
LinkedList<String> missingPaths,
CompletableFuture<List<String>> future) {
if (basePath.equals(path)) {
future.complete(missingPaths);
return;
}
zk.exists(path, false, (rc, path1, ctx, stat) -> {
if (Code.OK.intValue() != rc && Code.NONODE.intValue() != rc) {
future.completeExceptionally(new ZKException("Failed to check existence of path " + path1,
Code.get(rc)));
return;
}
if (Code.OK.intValue() == rc) {
future.complete(missingPaths);
return;
}
missingPaths.addLast(path);
String parentPath = Utils.getParent(path);
existPath(zk, parentPath, basePath, missingPaths, future);
}, null);
}
private CompletableFuture<Void> executeRenameTxn(String oldLogPath,
String newLogPath,
LinkedList<Op> createOps,
LinkedList<Op> deleteOps) {
CompletableFuture<Void> future = FutureUtils.createFuture();
List<Op> zkOps = Lists.newArrayListWithExpectedSize(createOps.size() + deleteOps.size());
zkOps.addAll(createOps);
zkOps.addAll(deleteOps);
if (LOG.isDebugEnabled()) {
for (Op op : zkOps) {
if (op instanceof Create) {
Create create = (Create) op;
LOG.debug("op : create {}", create.getPath());
} else if (op instanceof Delete) {
Delete delete = (Delete) op;
LOG.debug("op : delete {}, record = {}", delete.getPath(), op.toRequestRecord());
} else {
LOG.debug("op : {}", op);
}
}
}
try {
zooKeeperClient.get().multi(zkOps, (rc, path, ctx, opResults) -> {
if (Code.OK.intValue() == rc) {
future.complete(null);
} else if (Code.NODEEXISTS.intValue() == rc) {
future.completeExceptionally(new LogExistsException("Someone just created new log " + newLogPath));
} else if (Code.NOTEMPTY.intValue() == rc) {
future.completeExceptionally(new LockingException(oldLogPath + LOCK_PATH,
"Someone is holding a lock on log " + oldLogPath));
} else {
future.completeExceptionally(new ZKException("Failed to rename log "
+ oldLogPath + " to " + newLogPath + " at path " + path, Code.get(rc)));
}
}, null);
} catch (ZooKeeperConnectionException e) {
future.completeExceptionally(e);
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
return future;
}
private static void deleteOldSegmentAndCreateNewSegment(LogSegmentMetadata oldMetadata,
String newSegmentsPath,
List<ACL> acls,
LinkedList<Op> createOps,
LinkedList<Op> deleteOps) {
createOps.addLast(Op.create(
newSegmentsPath + "/" + oldMetadata.getZNodeName(),
oldMetadata.getFinalisedData().getBytes(UTF_8),
acls,
CreateMode.PERSISTENT));
deleteOps.addFirst(Op.delete(
oldMetadata.getZkPath(),
-1));
}
private static void deleteOldPathAndCreateNewPath(String oldRootPath,
String nodePath,
Versioned<byte[]> pathData,
String newRootPath,
byte[] initData,
List<ACL> acls,
LinkedList<Op> createOps,
LinkedList<Op> deleteOps) {
if (pathExists(pathData)) {
createOps.addLast(Op.create(
newRootPath + nodePath, pathData.getValue(), acls, CreateMode.PERSISTENT));
deleteOps.addFirst(Op.delete(
oldRootPath + nodePath, (int) ((LongVersion) pathData.getVersion()).getLongVersion()));
} else {
createOps.addLast(Op.create(
newRootPath + nodePath, initData, acls, CreateMode.PERSISTENT));
}
}
@VisibleForTesting
static CompletableFuture<List<LogSegmentMetadata>> getLogSegments(ZooKeeperClient zk,
String logSegmentsPath) {
CompletableFuture<List<LogSegmentMetadata>> future = FutureUtils.createFuture();
try {
zk.get().getChildren(logSegmentsPath, false, (rc, path, ctx, children, stat) -> {
if (Code.OK.intValue() != rc) {
if (Code.NONODE.intValue() == rc) {
future.completeExceptionally(new LogNotFoundException("Log " + path + " not found"));
} else {
future.completeExceptionally(new ZKException("Failed to get log segments from " + path,
Code.get(rc)));
}
return;
}
// get all the segments
List<CompletableFuture<LogSegmentMetadata>> futures =
Lists.newArrayListWithExpectedSize(children.size());
for (String child : children) {
futures.add(LogSegmentMetadata.read(zk, logSegmentsPath + "/" + child));
}
FutureUtils.proxyTo(
FutureUtils.collect(futures),
future);
}, null);
} catch (ZooKeeperConnectionException e) {
future.completeExceptionally(e);
} catch (InterruptedException e) {
future.completeExceptionally(e);
}
return future;
}
}