blob: 630d626317be0e2759a623a4156e9de345637458 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.twitter.distributedlog.bk.LedgerAllocator;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.DLIllegalStateException;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.EndOfStreamException;
import com.twitter.distributedlog.exceptions.LockingException;
import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.function.GetLastTxIdFunction;
import com.twitter.distributedlog.impl.BKLogSegmentEntryWriter;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.logsegment.RollingPolicy;
import com.twitter.distributedlog.logsegment.SizeBasedRollingPolicy;
import com.twitter.distributedlog.logsegment.TimeBasedRollingPolicy;
import com.twitter.distributedlog.metadata.MetadataUpdater;
import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.FailpointUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.FutureUtils.FutureEventListenerRunnable;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.Transaction;
import com.twitter.distributedlog.util.PermitLimiter;
import com.twitter.distributedlog.util.Utils;
import com.twitter.distributedlog.zk.ZKOp;
import com.twitter.distributedlog.zk.ZKTransaction;
import com.twitter.distributedlog.zk.ZKVersionedSetOp;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.Version;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Charsets.UTF_8;
import static com.twitter.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER;
/**
* Log Handler for Writers.
*
* <h3>Metrics</h3>
* All the metrics about log write handler are exposed under scope `segments`.
* <ul>
* <li> `segments`/open : opstats. latency characteristics on starting a new log segment.
* <li> `segments`/close : opstats. latency characteristics on completing an inprogress log segment.
* <li> `segments`/recover : opstats. latency characteristics on recovering a log segment.
* <li> `segments`/delete : opstats. latency characteristics on deleting a log segment.
* </ul>
*/
class BKLogWriteHandler extends BKLogHandler {
static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);
protected final DistributedLock lock;
protected final int ensembleSize;
protected final int writeQuorumSize;
protected final int ackQuorumSize;
protected final LedgerAllocator ledgerAllocator;
protected final MaxTxId maxTxId;
protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo;
protected final boolean sanityCheckTxnId;
protected final boolean validateLogSegmentSequenceNumber;
protected final int regionId;
protected volatile boolean closed = false;
protected final RollingPolicy rollingPolicy;
protected Future<DistributedLock> lockFuture = null;
protected final PermitLimiter writeLimiter;
protected final FeatureProvider featureProvider;
protected final DynamicDistributedLogConfiguration dynConf;
protected final MetadataUpdater metadataUpdater;
// tracking the inprogress log segments
protected final LinkedList<Long> inprogressLSSNs;
// Recover Functions
private final RecoverLogSegmentFunction recoverLogSegmentFunction =
new RecoverLogSegmentFunction();
private final AbstractFunction1<List<LogSegmentMetadata>, Future<Long>> recoverLogSegmentsFunction =
new AbstractFunction1<List<LogSegmentMetadata>, Future<Long>>() {
@Override
public Future<Long> apply(List<LogSegmentMetadata> segmentList) {
LOG.info("Initiating Recovery For {} : {}", getFullyQualifiedName(), segmentList);
// if lastLedgerRollingTimeMillis is not updated, we set it to now.
synchronized (BKLogWriteHandler.this) {
if (lastLedgerRollingTimeMillis < 0) {
lastLedgerRollingTimeMillis = Utils.nowInMillis();
}
}
if (validateLogSegmentSequenceNumber) {
synchronized (inprogressLSSNs) {
for (LogSegmentMetadata segment : segmentList) {
if (segment.isInProgress()) {
inprogressLSSNs.addLast(segment.getLogSegmentSequenceNumber());
}
}
}
}
return FutureUtils.processList(segmentList, recoverLogSegmentFunction, scheduler).map(
GetLastTxIdFunction.INSTANCE);
}
};
// Stats
private final StatsLogger perLogStatsLogger;
private final OpStatsLogger closeOpStats;
private final OpStatsLogger openOpStats;
private final OpStatsLogger recoverOpStats;
private final OpStatsLogger deleteOpStats;
/**
* Construct a Bookkeeper journal manager.
*/
BKLogWriteHandler(ZKLogMetadataForWriter logMetadata,
DistributedLogConfiguration conf,
ZooKeeperClientBuilder zkcBuilder,
BookKeeperClientBuilder bkcBuilder,
LogSegmentMetadataStore metadataStore,
OrderedScheduler scheduler,
LedgerAllocator allocator,
StatsLogger statsLogger,
StatsLogger perLogStatsLogger,
AlertStatsLogger alertStatsLogger,
String clientId,
int regionId,
PermitLimiter writeLimiter,
FeatureProvider featureProvider,
DynamicDistributedLogConfiguration dynConf,
DistributedLock lock /** owned by handler **/) {
super(logMetadata, conf, zkcBuilder, bkcBuilder, metadataStore,
scheduler, statsLogger, alertStatsLogger, null, WRITE_HANDLE_FILTER, clientId);
this.perLogStatsLogger = perLogStatsLogger;
this.writeLimiter = writeLimiter;
this.featureProvider = featureProvider;
this.dynConf = dynConf;
this.ledgerAllocator = allocator;
this.lock = lock;
this.metadataUpdater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
ensembleSize = conf.getEnsembleSize();
if (ensembleSize < conf.getWriteQuorumSize()) {
writeQuorumSize = ensembleSize;
LOG.warn("Setting write quorum size {} greater than ensemble size {}",
conf.getWriteQuorumSize(), ensembleSize);
} else {
writeQuorumSize = conf.getWriteQuorumSize();
}
if (writeQuorumSize < conf.getAckQuorumSize()) {
ackQuorumSize = writeQuorumSize;
LOG.warn("Setting write ack quorum size {} greater than write quorum size {}",
conf.getAckQuorumSize(), writeQuorumSize);
} else {
ackQuorumSize = conf.getAckQuorumSize();
}
if (conf.getEncodeRegionIDInLogSegmentMetadata()) {
this.regionId = regionId;
} else {
this.regionId = DistributedLogConstants.LOCAL_REGION_ID;
}
this.sanityCheckTxnId = conf.getSanityCheckTxnID();
this.validateLogSegmentSequenceNumber = conf.isLogSegmentSequenceNumberValidationEnabled();
// Construct the max sequence no
maxLogSegmentSequenceNo = new MaxLogSegmentSequenceNo(logMetadata.getMaxLSSNData());
inprogressLSSNs = new LinkedList<Long>();
// Construct the max txn id.
maxTxId = new MaxTxId(zooKeeperClient, logMetadata.getMaxTxIdPath(),
conf.getSanityCheckTxnID(), logMetadata.getMaxTxIdData());
// Schedule fetching ledgers list in background before we access it.
// We don't need to watch the ledgers list changes for writer, as it manages ledgers list.
scheduleGetLedgersTask(false, true);
// Initialize other parameters.
setLastLedgerRollingTimeMillis(Utils.nowInMillis());
// Rolling Policy
if (conf.getLogSegmentRollingIntervalMinutes() > 0) {
rollingPolicy = new TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000);
} else {
rollingPolicy = new SizeBasedRollingPolicy(conf.getMaxLogSegmentBytes());
}
// Stats
StatsLogger segmentsStatsLogger = statsLogger.scope("segments");
openOpStats = segmentsStatsLogger.getOpStatsLogger("open");
closeOpStats = segmentsStatsLogger.getOpStatsLogger("close");
recoverOpStats = segmentsStatsLogger.getOpStatsLogger("recover");
deleteOpStats = segmentsStatsLogger.getOpStatsLogger("delete");
}
// Transactional operations for MaxLogSegmentSequenceNo
void storeMaxSequenceNumber(final Transaction txn,
final MaxLogSegmentSequenceNo maxSeqNo,
final long seqNo,
final boolean isInprogress) {
byte[] data = DLUtils.serializeLogSegmentSequenceNumber(seqNo);
Op zkOp = Op.setData(logMetadata.getLogSegmentsPath(), data, maxSeqNo.getZkVersion());
txn.addOp(new ZKVersionedSetOp(zkOp, new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version version) {
if (validateLogSegmentSequenceNumber) {
synchronized (inprogressLSSNs) {
if (isInprogress) {
inprogressLSSNs.add(seqNo);
} else {
inprogressLSSNs.removeFirst();
}
}
}
maxSeqNo.update((ZkVersion) version, seqNo);
}
@Override
public void onAbort(Throwable t) {
// no-op
}
}));
}
// Transactional operations for MaxTxId
void storeMaxTxId(final ZKTransaction txn,
final MaxTxId maxTxId,
final long txId) {
byte[] data = maxTxId.couldStore(txId);
if (null != data) {
Op zkOp = Op.setData(maxTxId.getZkPath(), data, -1);
txn.addOp(new ZKVersionedSetOp(zkOp, new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version version) {
maxTxId.setMaxTxId(txId);
}
@Override
public void onAbort(Throwable t) {
}
}));
}
}
// Transactional operations for logsegment
void writeLogSegment(final ZKTransaction txn,
final List<ACL> acl,
final String inprogressSegmentName,
final LogSegmentMetadata metadata,
final String path) {
byte[] finalisedData = metadata.getFinalisedData().getBytes(UTF_8);
Op zkOp = Op.create(path, finalisedData, acl, CreateMode.PERSISTENT);
txn.addOp(new ZKOp(zkOp) {
@Override
protected void commitOpResult(OpResult opResult) {
addLogSegmentToCache(inprogressSegmentName, metadata);
}
@Override
protected void abortOpResult(Throwable t, OpResult opResult) {
// no-op
}
});
}
void deleteLogSegment(final ZKTransaction txn,
final String logSegmentName,
final String logSegmentPath) {
Op zkOp = Op.delete(logSegmentPath, -1);
txn.addOp(new ZKOp(zkOp) {
@Override
protected void commitOpResult(OpResult opResult) {
removeLogSegmentFromCache(logSegmentName);
}
@Override
protected void abortOpResult(Throwable t, OpResult opResult) {
// no-op
}
});
}
/**
* The caller could call this before any actions, which to hold the lock for
* the write handler of its whole lifecycle. The lock will only be released
* when closing the write handler.
*
* This method is useful to prevent releasing underlying zookeeper lock during
* recovering/completing log segments. Releasing underlying zookeeper lock means
* 1) increase latency when re-lock on starting new log segment. 2) increase the
* possibility of a stream being re-acquired by other instances.
*
* @return future represents the lock result
*/
Future<DistributedLock> lockHandler() {
if (null != lockFuture) {
return lockFuture;
}
lockFuture = lock.asyncAcquire();
return lockFuture;
}
Future<Void> unlockHandler() {
if (null != lockFuture) {
return lock.asyncClose();
} else {
return Future.Void();
}
}
void register(Watcher watcher) {
this.zooKeeperClient.register(watcher);
}
/**
* Start a new log segment in a BookKeeper ledger.
* First ensure that we have the write lock for this journal.
* Then create a ledger and stream based on that ledger.
* The ledger id is written to the inprogress znode, so that in the
* case of a crash, a recovery process can find the ledger we were writing
* to when we crashed.
*
* @param txId First transaction id to be written to the stream
* @return
* @throws IOException
*/
public BKLogSegmentWriter startLogSegment(long txId) throws IOException {
return startLogSegment(txId, false, false);
}
/**
* Start a new log segment in a BookKeeper ledger.
* First ensure that we have the write lock for this journal.
* Then create a ledger and stream based on that ledger.
* The ledger id is written to the inprogress znode, so that in the
* case of a crash, a recovery process can find the ledger we were writing
* to when we crashed.
*
* @param txId First transaction id to be written to the stream
* @param bestEffort
* @param allowMaxTxID
* allow using max tx id to start log segment
* @return
* @throws IOException
*/
public BKLogSegmentWriter startLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID)
throws IOException {
Stopwatch stopwatch = Stopwatch.createStarted();
boolean success = false;
try {
BKLogSegmentWriter writer = doStartLogSegment(txId, bestEffort, allowMaxTxID);
success = true;
return writer;
} finally {
if (success) {
openOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
} else {
openOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
}
}
}
protected long assignLogSegmentSequenceNumber() throws IOException {
// For any active stream we will always make sure that there is at least one
// active ledger (except when the stream first starts out). Therefore when we
// see no ledger metadata for a stream, we assume that this is the first ledger
// in the stream
long logSegmentSeqNo = DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO;
boolean logSegmentsFound = false;
if (LogSegmentMetadata.supportsLogSegmentSequenceNo(conf.getDLLedgerMetadataLayoutVersion())) {
List<LogSegmentMetadata> ledgerListDesc = getFilteredLedgerListDesc(false, false);
Long nextLogSegmentSeqNo = DLUtils.nextLogSegmentSequenceNumber(ledgerListDesc);
if (null == nextLogSegmentSeqNo) {
logSegmentsFound = false;
// we don't find last assigned log segment sequence number
// then we start the log segment with configured FirstLogSegmentSequenceNumber.
logSegmentSeqNo = conf.getFirstLogSegmentSequenceNumber();
} else {
logSegmentsFound = true;
// latest log segment is assigned with a sequence number, start with next sequence number
logSegmentSeqNo = nextLogSegmentSeqNo;
}
}
// We only skip log segment sequence number validation only when no log segments found &
// the maximum log segment sequence number is "UNASSIGNED".
if (!logSegmentsFound &&
(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO == maxLogSegmentSequenceNo.getSequenceNumber())) {
// no ledger seqno stored in /ledgers before
LOG.info("No max ledger sequence number found while creating log segment {} for {}.",
logSegmentSeqNo, getFullyQualifiedName());
} else if (maxLogSegmentSequenceNo.getSequenceNumber() + 1 != logSegmentSeqNo) {
LOG.warn("Unexpected max log segment sequence number {} for {} : list of cached segments = {}",
new Object[]{maxLogSegmentSequenceNo.getSequenceNumber(), getFullyQualifiedName(),
getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR)});
// there is max log segment number recorded there and it isn't match. throw exception.
throw new DLIllegalStateException("Unexpected max log segment sequence number "
+ maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
+ ", expected " + (logSegmentSeqNo - 1));
}
return logSegmentSeqNo;
}
protected BKLogSegmentWriter doStartLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID) throws IOException {
return FutureUtils.result(asyncStartLogSegment(txId, bestEffort, allowMaxTxID));
}
protected Future<BKLogSegmentWriter> asyncStartLogSegment(long txId,
boolean bestEffort,
boolean allowMaxTxID) {
Promise<BKLogSegmentWriter> promise = new Promise<BKLogSegmentWriter>();
try {
lock.checkOwnershipAndReacquire();
} catch (LockingException e) {
FutureUtils.setException(promise, e);
return promise;
}
doStartLogSegment(txId, bestEffort, allowMaxTxID, promise);
return promise;
}
protected void doStartLogSegment(final long txId,
final boolean bestEffort,
final boolean allowMaxTxID,
final Promise<BKLogSegmentWriter> promise) {
// validate the tx id
if ((txId < 0) ||
(!allowMaxTxID && (txId == DistributedLogConstants.MAX_TXID))) {
FutureUtils.setException(promise, new IOException("Invalid Transaction Id " + txId));
return;
}
if (this.sanityCheckTxnId) {
long highestTxIdWritten = maxTxId.get();
if (txId < highestTxIdWritten) {
if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
return;
}
else {
LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten);
FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
return;
}
}
}
try {
ledgerAllocator.allocate();
} catch (IOException e) {
// failed to issue an allocation request
failStartLogSegment(promise, bestEffort, e);
return;
}
// start the transaction from zookeeper
final ZKTransaction txn = new ZKTransaction(zooKeeperClient);
// failpoint injected before creating ledger
try {
FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate);
} catch (IOException ioe) {
failStartLogSegment(promise, bestEffort, ioe);
return;
}
ledgerAllocator.tryObtain(txn, new Transaction.OpListener<LedgerHandle>() {
@Override
public void onCommit(LedgerHandle lh) {
// no-op
}
@Override
public void onAbort(Throwable t) {
// no-op
}
}).addEventListener(new FutureEventListener<LedgerHandle>() {
@Override
public void onSuccess(LedgerHandle lh) {
// try-obtain succeed
createInprogressLogSegment(
txn,
txId,
lh,
bestEffort,
promise);
}
@Override
public void onFailure(Throwable cause) {
failStartLogSegment(promise, bestEffort, cause);
}
});
}
private void failStartLogSegment(Promise<BKLogSegmentWriter> promise,
boolean bestEffort,
Throwable cause) {
if (bestEffort) {
FutureUtils.setValue(promise, null);
} else {
FutureUtils.setException(promise, cause);
}
}
// once the ledger handle is obtained from allocator, this function should guarantee
// either the transaction is executed or aborted. Otherwise, the ledger handle will
// just leak from the allocation pool - hence cause "No Ledger Allocator"
private void createInprogressLogSegment(ZKTransaction txn,
final long txId,
final LedgerHandle lh,
boolean bestEffort,
final Promise<BKLogSegmentWriter> promise) {
final long logSegmentSeqNo;
try {
FailpointUtils.checkFailPoint(
FailpointUtils.FailPointName.FP_StartLogSegmentOnAssignLogSegmentSequenceNumber);
logSegmentSeqNo = assignLogSegmentSequenceNumber();
} catch (IOException e) {
// abort the current prepared transaction
txn.abort(e);
failStartLogSegment(promise, bestEffort, e);
return;
}
final String inprogressZnodeName = inprogressZNodeName(lh.getId(), txId, logSegmentSeqNo);
final String inprogressZnodePath = inprogressZNode(lh.getId(), txId, logSegmentSeqNo);
final LogSegmentMetadata l =
new LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressZnodePath,
conf.getDLLedgerMetadataLayoutVersion(), lh.getId(), txId)
.setLogSegmentSequenceNo(logSegmentSeqNo)
.setRegionId(regionId)
.setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(conf.getDLLedgerMetadataLayoutVersion()))
.build();
// Create an inprogress segment
writeLogSegment(
txn,
zooKeeperClient.getDefaultACL(),
inprogressZnodeName,
l,
inprogressZnodePath);
// Try storing max sequence number.
LOG.debug("Try storing max sequence number in startLogSegment {} : {}", inprogressZnodePath, logSegmentSeqNo);
storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, logSegmentSeqNo, true);
// Try storing max tx id.
LOG.debug("Try storing MaxTxId in startLogSegment {} {}", inprogressZnodePath, txId);
storeMaxTxId(txn, maxTxId, txId);
txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
try {
FutureUtils.setValue(promise, new BKLogSegmentWriter(
getFullyQualifiedName(),
inprogressZnodeName,
conf,
conf.getDLLedgerMetadataLayoutVersion(),
new BKLogSegmentEntryWriter(lh),
lock,
txId,
logSegmentSeqNo,
scheduler,
statsLogger,
perLogStatsLogger,
alertStatsLogger,
writeLimiter,
featureProvider,
dynConf));
} catch (IOException ioe) {
failStartLogSegment(promise, false, ioe);
}
}
@Override
public void onFailure(Throwable cause) {
failStartLogSegment(promise, false, cause);
}
}, scheduler));
}
boolean shouldStartNewSegment(BKLogSegmentWriter writer) {
return rollingPolicy.shouldRollover(writer, lastLedgerRollingTimeMillis);
}
/**
* Finalize a log segment. If the journal manager is currently
* writing to a ledger, ensure that this is the ledger of the log segment
* being finalized.
* <p/>
* Otherwise this is the recovery case. In the recovery case, ensure that
* the firstTxId of the ledger matches firstTxId for the segment we are
* trying to finalize.
*/
Future<LogSegmentMetadata> completeAndCloseLogSegment(final BKLogSegmentWriter writer) {
final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
completeAndCloseLogSegment(writer, promise);
return promise;
}
private void completeAndCloseLogSegment(final BKLogSegmentWriter writer,
final Promise<LogSegmentMetadata> promise) {
writer.asyncClose().addEventListener(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
// in theory closeToFinalize should throw exception if a stream is in error.
// just in case, add another checking here to make sure we don't close log segment is a stream is in error.
if (writer.shouldFailCompleteLogSegment()) {
FutureUtils.setException(promise,
new IOException("LogSegmentWriter for " + writer.getFullyQualifiedLogSegment() + " is already in error."));
return;
}
doCompleteAndCloseLogSegment(
inprogressZNodeName(writer.getLogSegmentId(), writer.getStartTxId(), writer.getLogSegmentSequenceNumber()),
writer.getLogSegmentSequenceNumber(),
writer.getLogSegmentId(),
writer.getStartTxId(),
writer.getLastTxId(),
writer.getPositionWithinLogSegment(),
writer.getLastDLSN().getEntryId(),
writer.getLastDLSN().getSlotId(),
promise);
}
@Override
public void onFailure(Throwable cause) {
FutureUtils.setException(promise, cause);
}
});
}
@VisibleForTesting
LogSegmentMetadata completeAndCloseLogSegment(long logSegmentSeqNo,
long ledgerId,
long firstTxId,
long lastTxId,
int recordCount)
throws IOException {
return completeAndCloseLogSegment(inprogressZNodeName(ledgerId, firstTxId, logSegmentSeqNo), logSegmentSeqNo,
ledgerId, firstTxId, lastTxId, recordCount, -1, -1);
}
/**
* Finalize a log segment. If the journal manager is currently
* writing to a ledger, ensure that this is the ledger of the log segment
* being finalized.
* <p/>
* Otherwise this is the recovery case. In the recovery case, ensure that
* the firstTxId of the ledger matches firstTxId for the segment we are
* trying to finalize.
*/
LogSegmentMetadata completeAndCloseLogSegment(String inprogressZnodeName, long logSegmentSeqNo,
long ledgerId, long firstTxId, long lastTxId,
int recordCount, long lastEntryId, long lastSlotId)
throws IOException {
Stopwatch stopwatch = Stopwatch.createStarted();
boolean success = false;
try {
LogSegmentMetadata completedLogSegment =
doCompleteAndCloseLogSegment(inprogressZnodeName, logSegmentSeqNo,
ledgerId, firstTxId, lastTxId, recordCount,
lastEntryId, lastSlotId);
success = true;
return completedLogSegment;
} finally {
if (success) {
closeOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
} else {
closeOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
}
}
}
protected long computeStartSequenceId(LogSegmentMetadata segment) throws IOException {
if (!segment.isInProgress()) {
return segment.getStartSequenceId();
}
long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
// we only record sequence id when both write version and logsegment's version support sequence id
if (LogSegmentMetadata.supportsSequenceId(conf.getDLLedgerMetadataLayoutVersion())
&& segment.supportsSequenceId()) {
List<LogSegmentMetadata> logSegmentDescList = getFilteredLedgerListDesc(false, false);
startSequenceId = DLUtils.computeStartSequenceId(logSegmentDescList, segment);
}
return startSequenceId;
}
/**
* Close log segment
*
* @param inprogressZnodeName
* @param logSegmentSeqNo
* @param ledgerId
* @param firstTxId
* @param lastTxId
* @param recordCount
* @param lastEntryId
* @param lastSlotId
* @throws IOException
*/
protected LogSegmentMetadata doCompleteAndCloseLogSegment(
String inprogressZnodeName,
long logSegmentSeqNo,
long ledgerId,
long firstTxId,
long lastTxId,
int recordCount,
long lastEntryId,
long lastSlotId) throws IOException {
Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
doCompleteAndCloseLogSegment(
inprogressZnodeName,
logSegmentSeqNo,
ledgerId,
firstTxId,
lastTxId,
recordCount,
lastEntryId,
lastSlotId,
promise);
return FutureUtils.result(promise);
}
protected void doCompleteAndCloseLogSegment(final String inprogressZnodeName,
long logSegmentSeqNo,
long ledgerId,
long firstTxId,
long lastTxId,
int recordCount,
long lastEntryId,
long lastSlotId,
final Promise<LogSegmentMetadata> promise) {
try {
lock.checkOwnershipAndReacquire();
} catch (IOException ioe) {
FutureUtils.setException(promise, ioe);
return;
}
LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, lastTxId);
final String inprogressZnodePath = inprogressZNode(inprogressZnodeName);
LogSegmentMetadata inprogressLogSegment = readLogSegmentFromCache(inprogressZnodeName);
// validate log segment
if (inprogressLogSegment.getLedgerId() != ledgerId) {
FutureUtils.setException(promise, new IOException(
"Active ledger has different ID to inprogress. "
+ inprogressLogSegment.getLedgerId() + " found, "
+ ledgerId + " expected"));
return;
}
// validate the transaction id
if (inprogressLogSegment.getFirstTxId() != firstTxId) {
FutureUtils.setException(promise, new IOException("Transaction id not as expected, "
+ inprogressLogSegment.getFirstTxId() + " found, " + firstTxId + " expected"));
return;
}
// validate the log sequence number
if (validateLogSegmentSequenceNumber) {
synchronized (inprogressLSSNs) {
if (inprogressLSSNs.isEmpty()) {
FutureUtils.setException(promise, new UnexpectedException(
"Didn't find matched inprogress log segments when completing inprogress "
+ inprogressLogSegment));
return;
}
long leastInprogressLSSN = inprogressLSSNs.getFirst();
// the log segment sequence number in metadata {@link inprogressLogSegment.getLogSegmentSequenceNumber()}
// should be same as the sequence number we are completing (logSegmentSeqNo)
// and
// it should also be same as the least inprogress log segment sequence number tracked in {@link inprogressLSSNs}
if ((inprogressLogSegment.getLogSegmentSequenceNumber() != logSegmentSeqNo) ||
(leastInprogressLSSN != logSegmentSeqNo)) {
FutureUtils.setException(promise, new UnexpectedException(
"Didn't find matched inprogress log segments when completing inprogress "
+ inprogressLogSegment));
return;
}
}
}
// store max sequence number.
long maxSeqNo= Math.max(logSegmentSeqNo, maxLogSegmentSequenceNo.getSequenceNumber());
if (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo ||
(maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo + 1)) {
// ignore the case that a new inprogress log segment is pre-allocated
// before completing current inprogress one
LOG.info("Try storing max sequence number {} in completing {}.",
new Object[] { logSegmentSeqNo, inprogressZnodePath });
} else {
LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
new Object[] { maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() });
if (validateLogSegmentSequenceNumber) {
FutureUtils.setException(promise, new DLIllegalStateException("Unexpected max log segment sequence number "
+ maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
+ ", expected " + (logSegmentSeqNo - 1)));
return;
}
}
// Prepare the completion
final String nameForCompletedLedger = completedLedgerZNodeName(firstTxId, lastTxId, logSegmentSeqNo);
final String pathForCompletedLedger = completedLedgerZNode(firstTxId, lastTxId, logSegmentSeqNo);
long startSequenceId;
try {
startSequenceId = computeStartSequenceId(inprogressLogSegment);
} catch (IOException ioe) {
FutureUtils.setException(promise, ioe);
return;
}
// write completed ledger znode
final LogSegmentMetadata completedLogSegment =
inprogressLogSegment.completeLogSegment(
pathForCompletedLedger,
lastTxId,
recordCount,
lastEntryId,
lastSlotId,
startSequenceId);
setLastLedgerRollingTimeMillis(completedLogSegment.getCompletionTime());
// prepare the transaction
ZKTransaction txn = new ZKTransaction(zooKeeperClient);
// create completed log segment
writeLogSegment(
txn,
zooKeeperClient.getDefaultACL(),
nameForCompletedLedger,
completedLogSegment,
pathForCompletedLedger);
// delete inprogress log segment
deleteLogSegment(txn, inprogressZnodeName, inprogressZnodePath);
// store max sequence number
storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false);
// update max txn id.
LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}", pathForCompletedLedger, lastTxId);
storeMaxTxId(txn, maxTxId, lastTxId);
txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
LOG.info("Completed {} to {} for {} : {}",
new Object[] { inprogressZnodeName, nameForCompletedLedger, getFullyQualifiedName(), completedLogSegment });
FutureUtils.setValue(promise, completedLogSegment);
}
@Override
public void onFailure(Throwable cause) {
FutureUtils.setException(promise, cause);
}
}, scheduler));
}
public Future<Long> recoverIncompleteLogSegments() {
try {
FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
} catch (IOException ioe) {
return Future.exception(ioe);
}
return asyncGetFilteredLedgerList(false, false).flatMap(recoverLogSegmentsFunction);
}
class RecoverLogSegmentFunction extends Function<LogSegmentMetadata, Future<LogSegmentMetadata>> {
@Override
public Future<LogSegmentMetadata> apply(final LogSegmentMetadata l) {
if (!l.isInProgress()) {
return Future.value(l);
}
LOG.info("Recovering last record in log segment {} for {}.", l, getFullyQualifiedName());
return asyncReadLastRecord(l, true, true, true).flatMap(
new AbstractFunction1<LogRecordWithDLSN, Future<LogSegmentMetadata>>() {
@Override
public Future<LogSegmentMetadata> apply(LogRecordWithDLSN lastRecord) {
return completeLogSegment(l, lastRecord);
}
});
}
private Future<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata l,
LogRecordWithDLSN lastRecord) {
LOG.info("Recovered last record in log segment {} for {}.", l, getFullyQualifiedName());
long endTxId = DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID;
int recordCount = 0;
long lastEntryId = -1;
long lastSlotId = -1;
if (null != lastRecord) {
endTxId = lastRecord.getTransactionId();
recordCount = lastRecord.getLastPositionWithinLogSegment();
lastEntryId = lastRecord.getDlsn().getEntryId();
lastSlotId = lastRecord.getDlsn().getSlotId();
}
if (endTxId == DistributedLogConstants.INVALID_TXID) {
LOG.error("Unrecoverable corruption has occurred in segment "
+ l.toString() + " at path " + l.getZkPath()
+ ". Unable to continue recovery.");
return Future.exception(new IOException("Unrecoverable corruption,"
+ " please check logs."));
} else if (endTxId == DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID) {
// TODO: Empty ledger - Ideally we should just remove it?
endTxId = l.getFirstTxId();
}
Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
doCompleteAndCloseLogSegment(
l.getZNodeName(),
l.getLogSegmentSequenceNumber(),
l.getLedgerId(),
l.getFirstTxId(),
endTxId,
recordCount,
lastEntryId,
lastSlotId,
promise);
return promise;
}
}
public void deleteLog() throws IOException {
lock.checkOwnershipAndReacquire();
FutureUtils.result(purgeLogSegmentsOlderThanTxnId(-1));
try {
Utils.closeQuietly(lock);
zooKeeperClient.get().exists(logMetadata.getLogSegmentsPath(), false);
zooKeeperClient.get().exists(logMetadata.getMaxTxIdPath(), false);
if (logMetadata.getLogRootPath().toLowerCase().contains("distributedlog")) {
ZKUtil.deleteRecursive(zooKeeperClient.get(), logMetadata.getLogRootPath());
} else {
LOG.warn("Skip deletion of unrecognized ZK Path {}", logMetadata.getLogRootPath());
}
} catch (InterruptedException ie) {
LOG.error("Interrupted while deleting log znodes", ie);
throw new DLInterruptedException("Interrupted while deleting " + logMetadata.getLogRootPath(), ie);
} catch (KeeperException ke) {
LOG.error("Error deleting" + logMetadata.getLogRootPath() + " in zookeeper", ke);
}
}
Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
if (DLSN.InvalidDLSN == dlsn) {
List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0);
return Future.value(emptyList);
}
scheduleGetAllLedgersTaskIfNeeded();
return asyncGetFullLedgerList(false, false).flatMap(
new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
@Override
public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
return setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn);
}
});
}
private Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> logSegments,
final DLSN dlsn) {
LOG.debug("Setting truncation status on logs older than {} from {} for {}",
new Object[]{dlsn, logSegments, getFullyQualifiedName()});
List<LogSegmentMetadata> truncateList = new ArrayList<LogSegmentMetadata>(logSegments.size());
LogSegmentMetadata partialTruncate = null;
LOG.info("{}: Truncating log segments older than {}", getFullyQualifiedName(), dlsn);
int numCandidates = getNumCandidateLogSegmentsToTruncate(logSegments);
for (int i = 0; i < numCandidates; i++) {
LogSegmentMetadata l = logSegments.get(i);
if (!l.isInProgress()) {
if (l.getLastDLSN().compareTo(dlsn) < 0) {
LOG.debug("{}: Truncating log segment {} ", getFullyQualifiedName(), l);
truncateList.add(l);
} else if (l.getFirstDLSN().compareTo(dlsn) < 0) {
// Can be satisfied by at most one segment
if (null != partialTruncate) {
String logMsg = String.format("Potential metadata inconsistency for stream %s at segment %s", getFullyQualifiedName(), l);
LOG.error(logMsg);
return Future.exception(new DLIllegalStateException(logMsg));
}
LOG.info("{}: Partially truncating log segment {} older than {}.", new Object[] {getFullyQualifiedName(), l, dlsn});
partialTruncate = l;
} else {
break;
}
} else {
break;
}
}
return setLogSegmentTruncationStatus(truncateList, partialTruncate, dlsn);
}
private int getNumCandidateLogSegmentsToTruncate(List<LogSegmentMetadata> logSegments) {
if (logSegments.isEmpty()) {
return 0;
} else {
// we have to keep at least one completed log segment for sequence id
int numCandidateLogSegments = 0;
for (LogSegmentMetadata segment : logSegments) {
if (segment.isInProgress()) {
break;
} else {
++numCandidateLogSegments;
}
}
return numCandidateLogSegments - 1;
}
}
Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) {
if (minTimestampToKeep >= Utils.nowInMillis()) {
return Future.exception(new IllegalArgumentException(
"Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName()));
}
return asyncGetFullLedgerList(false, false).flatMap(
new Function<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
@Override
public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
List<LogSegmentMetadata> purgeList = new ArrayList<LogSegmentMetadata>(logSegments.size());
int numCandidates = getNumCandidateLogSegmentsToTruncate(logSegments);
for (int iterator = 0; iterator < numCandidates; iterator++) {
LogSegmentMetadata l = logSegments.get(iterator);
// When application explicitly truncates segments; timestamp based purge is
// only used to cleanup log segments that have been marked for truncation
if ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
!l.isInProgress() && (l.getCompletionTime() < minTimestampToKeep)) {
purgeList.add(l);
} else {
// stop truncating log segments if we find either an inprogress or a partially
// truncated log segment
break;
}
}
LOG.info("Deleting log segments older than {} for {} : {}",
new Object[] { minTimestampToKeep, getFullyQualifiedName(), purgeList });
return deleteLogSegments(purgeList);
}
});
}
Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) {
return asyncGetFullLedgerList(true, false).flatMap(
new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
@Override
public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
int numLogSegmentsToProcess;
if (minTxIdToKeep < 0) {
// we are deleting the log, we can remove whole log segments
numLogSegmentsToProcess = logSegments.size();
} else {
numLogSegmentsToProcess = getNumCandidateLogSegmentsToTruncate(logSegments);
}
List<LogSegmentMetadata> purgeList = Lists.newArrayListWithExpectedSize(numLogSegmentsToProcess);
for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) {
LogSegmentMetadata l = logSegments.get(iterator);
if ((minTxIdToKeep < 0) ||
((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
!l.isInProgress() && (l.getLastTxId() < minTxIdToKeep))) {
purgeList.add(l);
} else {
// stop truncating log segments if we find either an inprogress or a partially
// truncated log segment
break;
}
}
return deleteLogSegments(purgeList);
}
});
}
private Future<List<LogSegmentMetadata>> setLogSegmentTruncationStatus(
final List<LogSegmentMetadata> truncateList,
LogSegmentMetadata partialTruncate,
DLSN minActiveDLSN) {
final List<LogSegmentMetadata> listToTruncate = Lists.newArrayListWithCapacity(truncateList.size() + 1);
final List<LogSegmentMetadata> listAfterTruncated = Lists.newArrayListWithCapacity(truncateList.size() + 1);
Transaction<Object> updateTxn = metadataUpdater.transaction();
for(LogSegmentMetadata l : truncateList) {
if (!l.isTruncated()) {
LogSegmentMetadata newSegment = metadataUpdater.setLogSegmentTruncated(updateTxn, l);
listToTruncate.add(l);
listAfterTruncated.add(newSegment);
}
}
if (null != partialTruncate && (partialTruncate.isNonTruncated() ||
(partialTruncate.isPartiallyTruncated() && (partialTruncate.getMinActiveDLSN().compareTo(minActiveDLSN) < 0)))) {
LogSegmentMetadata newSegment = metadataUpdater.setLogSegmentPartiallyTruncated(
updateTxn, partialTruncate, minActiveDLSN);
listToTruncate.add(partialTruncate);
listAfterTruncated.add(newSegment);
}
return updateTxn.execute().map(new AbstractFunction1<Void, List<LogSegmentMetadata>>() {
@Override
public List<LogSegmentMetadata> apply(Void value) {
for (int i = 0; i < listToTruncate.size(); i++) {
removeLogSegmentFromCache(listToTruncate.get(i).getSegmentName());
LogSegmentMetadata newSegment = listAfterTruncated.get(i);
addLogSegmentToCache(newSegment.getSegmentName(), newSegment);
}
return listAfterTruncated;
}
});
}
private Future<List<LogSegmentMetadata>> deleteLogSegments(
final List<LogSegmentMetadata> logs) {
if (LOG.isTraceEnabled()) {
LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), logs);
}
return FutureUtils.processList(logs,
new Function<LogSegmentMetadata, Future<LogSegmentMetadata>>() {
@Override
public Future<LogSegmentMetadata> apply(LogSegmentMetadata segment) {
return deleteLogSegment(segment);
}
}, scheduler);
}
private Future<LogSegmentMetadata> deleteLogSegment(
final LogSegmentMetadata ledgerMetadata) {
LOG.info("Deleting ledger {} for {}", ledgerMetadata, getFullyQualifiedName());
final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
final Stopwatch stopwatch = Stopwatch.createStarted();
promise.addEventListener(new FutureEventListener<LogSegmentMetadata>() {
@Override
public void onSuccess(LogSegmentMetadata segment) {
deleteOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
}
@Override
public void onFailure(Throwable cause) {
deleteOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
}
});
try {
bookKeeperClient.get().asyncDeleteLedger(ledgerMetadata.getLedgerId(), new AsyncCallback.DeleteCallback() {
@Override
public void deleteComplete(int rc, Object ctx) {
if (BKException.Code.NoSuchLedgerExistsException == rc) {
LOG.warn("No ledger {} found to delete for {} : {}.",
new Object[]{ledgerMetadata.getLedgerId(), getFullyQualifiedName(),
ledgerMetadata});
} else if (BKException.Code.OK != rc) {
BKException bke = BKException.create(rc);
LOG.error("Couldn't delete ledger {} from bookkeeper for {} : ",
new Object[]{ledgerMetadata.getLedgerId(), getFullyQualifiedName(), bke});
promise.setException(bke);
return;
}
// after the ledger is deleted, we delete the metadata znode
scheduler.submit(new Runnable() {
@Override
public void run() {
deleteLogSegmentMetadata(ledgerMetadata, promise);
}
});
}
}, null);
} catch (IOException e) {
promise.setException(BKException.create(BKException.Code.BookieHandleNotAvailableException));
}
return promise;
}
private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata,
final Promise<LogSegmentMetadata> promise) {
Transaction<Object> deleteTxn = metadataStore.transaction();
metadataStore.deleteLogSegment(deleteTxn, segmentMetadata);
deleteTxn.execute().addEventListener(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void result) {
// purge log segment
removeLogSegmentFromCache(segmentMetadata.getZNodeName());
promise.setValue(segmentMetadata);
}
@Override
public void onFailure(Throwable cause) {
if (cause instanceof ZKException) {
ZKException zke = (ZKException) cause;
if (KeeperException.Code.NONODE == zke.getKeeperExceptionCode()) {
LOG.error("No log segment {} found for {}.",
segmentMetadata, getFullyQualifiedName());
// purge log segment
removeLogSegmentFromCache(segmentMetadata.getZNodeName());
promise.setValue(segmentMetadata);
return;
}
}
LOG.error("Couldn't purge {} for {}: with error {}",
new Object[]{ segmentMetadata, getFullyQualifiedName(), cause });
promise.setException(cause);
}
});
}
@Override
public Future<Void> asyncClose() {
return Utils.closeSequence(scheduler,
lock,
ledgerAllocator
).flatMap(new AbstractFunction1<Void, Future<Void>>() {
@Override
public Future<Void> apply(Void result) {
return BKLogWriteHandler.super.asyncClose();
}
});
}
@Override
public Future<Void> asyncAbort() {
return asyncClose();
}
String completedLedgerZNodeName(long firstTxId, long lastTxId, long logSegmentSeqNo) {
if (DistributedLogConstants.LOGSEGMENT_NAME_VERSION == conf.getLogSegmentNameVersion()) {
return String.format("%s_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX, logSegmentSeqNo);
} else {
return String.format("%s_%018d_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX,
firstTxId, lastTxId);
}
}
/**
* Get the znode path for a finalize ledger
*/
String completedLedgerZNode(long firstTxId, long lastTxId, long logSegmentSeqNo) {
return String.format("%s/%s", logMetadata.getLogSegmentsPath(),
completedLedgerZNodeName(firstTxId, lastTxId, logSegmentSeqNo));
}
/**
* Get the name of the inprogress znode.
*
* @return name of the inprogress znode.
*/
String inprogressZNodeName(long ledgerId, long firstTxId, long logSegmentSeqNo) {
if (DistributedLogConstants.LOGSEGMENT_NAME_VERSION == conf.getLogSegmentNameVersion()) {
// Lots of the problems are introduced due to different inprogress names with same ledger sequence number.
return String.format("%s_%018d", DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX, logSegmentSeqNo);
} else {
return DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX + "_" + Long.toString(firstTxId, 16);
}
}
/**
* Get the znode path for the inprogressZNode
*/
String inprogressZNode(long ledgerId, long firstTxId, long logSegmentSeqNo) {
return logMetadata.getLogSegmentsPath() + "/" + inprogressZNodeName(ledgerId, firstTxId, logSegmentSeqNo);
}
String inprogressZNode(String inprogressZNodeName) {
return logMetadata.getLogSegmentsPath() + "/" + inprogressZNodeName;
}
}