blob: 1de712f910e61aad5c2ae69754c6a8c1c9a88f70 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.impl.metadata;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.twitter.distributedlog.DistributedLogConstants;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
import com.twitter.distributedlog.exceptions.LogExistsException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.ExceptionalFunction;
import com.twitter.util.Future;
import com.twitter.util.Promise;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import java.io.File;
import java.net.URI;
import java.util.List;
/**
* Log Metadata for writer
*/
public class ZKLogMetadataForWriter extends ZKLogMetadata {
static final Logger LOG = LoggerFactory.getLogger(ZKLogMetadataForWriter.class);
static class MetadataIndex {
static final int LOG_ROOT_PARENT = 0;
static final int LOG_ROOT = 1;
static final int MAX_TXID = 2;
static final int VERSION = 3;
static final int LOCK = 4;
static final int READ_LOCK = 5;
static final int LOGSEGMENTS = 6;
static final int ALLOCATION = 7;
}
static int bytesToInt(byte[] b) {
assert b.length >= 4;
return b[0] << 24 | b[1] << 16 | b[2] << 8 | b[3];
}
static byte[] intToBytes(int i) {
return new byte[]{
(byte) (i >> 24),
(byte) (i >> 16),
(byte) (i >> 8),
(byte) (i)};
}
static boolean pathExists(Versioned<byte[]> metadata) {
return null != metadata.getValue() && null != metadata.getVersion();
}
static void ensureMetadataExist(Versioned<byte[]> metadata) {
Preconditions.checkNotNull(metadata.getValue());
Preconditions.checkNotNull(metadata.getVersion());
}
public static Future<ZKLogMetadataForWriter> of(
final URI uri,
final String logName,
final String logIdentifier,
final ZooKeeper zk,
final List<ACL> acl,
final boolean ownAllocator,
final boolean createIfNotExists) {
final String logRootPath = ZKLogMetadata.getLogRootPath(uri, logName, logIdentifier);
try {
PathUtils.validatePath(logRootPath);
} catch (IllegalArgumentException e) {
LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e});
return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
}
return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
.flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() {
@Override
public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
Promise<List<Versioned<byte[]>>> promise =
new Promise<List<Versioned<byte[]>>>();
createMissingMetadata(zk, logRootPath, metadatas, acl,
ownAllocator, createIfNotExists, promise);
return promise;
}
}).map(new ExceptionalFunction<List<Versioned<byte[]>>, ZKLogMetadataForWriter>() {
@Override
public ZKLogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException {
return processLogMetadatas(uri, logName, logIdentifier, metadatas, ownAllocator);
}
});
}
static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
String logRootPath,
boolean ownAllocator) {
// Note re. persistent lock state initialization: the read lock persistent state (path) is
// initialized here but only used in the read handler. The reason is its more convenient and
// less error prone to manage all stream structure in one place.
final String logRootParentPath = new File(logRootPath).getParent();
final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
final String lockPath = logRootPath + LOCK_PATH;
final String readLockPath = logRootPath + READ_LOCK_PATH;
final String versionPath = logRootPath + VERSION_PATH;
final String allocationPath = logRootPath + ALLOCATION_PATH;
int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1;
List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false));
checkFutures.add(Utils.zkGetData(zk, logRootPath, false));
checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false));
checkFutures.add(Utils.zkGetData(zk, versionPath, false));
checkFutures.add(Utils.zkGetData(zk, lockPath, false));
checkFutures.add(Utils.zkGetData(zk, readLockPath, false));
checkFutures.add(Utils.zkGetData(zk, logSegmentsPath, false));
if (ownAllocator) {
checkFutures.add(Utils.zkGetData(zk, allocationPath, false));
}
return Future.collect(checkFutures);
}
static void createMissingMetadata(final ZooKeeper zk,
final String logRootPath,
final List<Versioned<byte[]>> metadatas,
final List<ACL> acl,
final boolean ownAllocator,
final boolean createIfNotExists,
final Promise<List<Versioned<byte[]>>> promise) {
final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
CreateMode createMode = CreateMode.PERSISTENT;
// log root parent path
if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
pathsToCreate.add(null);
} else {
String logRootParentPath = new File(logRootPath).getParent();
pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
}
// log root path
if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
}
// max id
if (pathExists(metadatas.get(MetadataIndex.MAX_TXID))) {
pathsToCreate.add(null);
} else {
byte[] zeroTxnIdData = DLUtils.serializeTransactionId(0L);
pathsToCreate.add(zeroTxnIdData);
zkOps.add(Op.create(logRootPath + MAX_TXID_PATH, zeroTxnIdData, acl, createMode));
}
// version
if (pathExists(metadatas.get(MetadataIndex.VERSION))) {
pathsToCreate.add(null);
} else {
byte[] versionData = intToBytes(LAYOUT_VERSION);
pathsToCreate.add(versionData);
zkOps.add(Op.create(logRootPath + VERSION_PATH, versionData, acl, createMode));
}
// lock path
if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
}
// read lock path
if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
}
// log segments path
if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
pathsToCreate.add(null);
} else {
byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
pathsToCreate.add(logSegmentsData);
zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
}
// allocation path
if (ownAllocator) {
if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
pathsToCreate.add(null);
} else {
pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
DistributedLogConstants.EMPTY_BYTES, acl, createMode));
}
}
if (zkOps.isEmpty()) {
// nothing missed
promise.setValue(metadatas);
return;
}
if (!createIfNotExists) {
promise.setException(new LogNotFoundException("Log " + logRootPath + " not found"));
return;
}
zk.multi(zkOps, new AsyncCallback.MultiCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
if (KeeperException.Code.OK.intValue() == rc) {
List<Versioned<byte[]>> finalMetadatas =
Lists.newArrayListWithExpectedSize(metadatas.size());
for (int i = 0; i < pathsToCreate.size(); i++) {
byte[] dataCreated = pathsToCreate.get(i);
if (null == dataCreated) {
finalMetadatas.add(metadatas.get(i));
} else {
finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0)));
}
}
promise.setValue(finalMetadatas);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
promise.setException(new LogExistsException("Someone just created log "
+ logRootPath));
} else {
if (LOG.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
for (OpResult result : resultList) {
if (result instanceof OpResult.ErrorResult) {
OpResult.ErrorResult errorResult = (OpResult.ErrorResult) result;
builder.append(errorResult.getErr()).append(",");
} else {
builder.append(0).append(",");
}
}
String resultCodeList = builder.substring(0, builder.length() - 1);
LOG.debug("Failed to create log, full rc list = {}", resultCodeList);
}
promise.setException(new ZKException("Failed to create log " + logRootPath,
KeeperException.Code.get(rc)));
}
}
}, null);
}
static ZKLogMetadataForWriter processLogMetadatas(URI uri,
String logName,
String logIdentifier,
List<Versioned<byte[]>> metadatas,
boolean ownAllocator)
throws UnexpectedException {
try {
// max id
Versioned<byte[]> maxTxnIdData = metadatas.get(MetadataIndex.MAX_TXID);
ensureMetadataExist(maxTxnIdData);
// version
Versioned<byte[]> versionData = metadatas.get(MetadataIndex.VERSION);
ensureMetadataExist(maxTxnIdData);
Preconditions.checkArgument(LAYOUT_VERSION == bytesToInt(versionData.getValue()));
// lock path
ensureMetadataExist(metadatas.get(MetadataIndex.LOCK));
// read lock path
ensureMetadataExist(metadatas.get(MetadataIndex.READ_LOCK));
// max lssn
Versioned<byte[]> maxLSSNData = metadatas.get(MetadataIndex.LOGSEGMENTS);
ensureMetadataExist(maxLSSNData);
try {
DLUtils.deserializeLogSegmentSequenceNumber(maxLSSNData.getValue());
} catch (NumberFormatException nfe) {
throw new UnexpectedException("Invalid max sequence number found in log " + logName, nfe);
}
// allocation path
Versioned<byte[]> allocationData;
if (ownAllocator) {
allocationData = metadatas.get(MetadataIndex.ALLOCATION);
ensureMetadataExist(allocationData);
} else {
allocationData = new Versioned<byte[]>(null, null);
}
return new ZKLogMetadataForWriter(uri, logName, logIdentifier,
maxLSSNData, maxTxnIdData, allocationData);
} catch (IllegalArgumentException iae) {
throw new UnexpectedException("Invalid log " + logName, iae);
} catch (NullPointerException npe) {
throw new UnexpectedException("Invalid log " + logName, npe);
}
}
private final Versioned<byte[]> maxLSSNData;
private final Versioned<byte[]> maxTxIdData;
private final Versioned<byte[]> allocationData;
/**
* metadata representation of a log
*
* @param uri namespace to store the log
* @param logName name of the log
* @param logIdentifier identifier of the log
*/
private ZKLogMetadataForWriter(URI uri,
String logName,
String logIdentifier,
Versioned<byte[]> maxLSSNData,
Versioned<byte[]> maxTxIdData,
Versioned<byte[]> allocationData) {
super(uri, logName, logIdentifier);
this.maxLSSNData = maxLSSNData;
this.maxTxIdData = maxTxIdData;
this.allocationData = allocationData;
}
public Versioned<byte[]> getMaxLSSNData() {
return maxLSSNData;
}
public Versioned<byte[]> getMaxTxIdData() {
return maxTxIdData;
}
public Versioned<byte[]> getAllocationData() {
return allocationData;
}
}