blob: 955fafed4dc4122c578a3413660d28fefadd2d75 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog;
import static org.apache.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.common.util.PermitLimiter;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.LogSegmentNotFoundException;
import org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.function.GetLastTxIdFunction;
import org.apache.distributedlog.lock.DistributedLock;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
import org.apache.distributedlog.logsegment.LogSegmentFilter;
import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
import org.apache.distributedlog.logsegment.RollingPolicy;
import org.apache.distributedlog.logsegment.SizeBasedRollingPolicy;
import org.apache.distributedlog.logsegment.TimeBasedRollingPolicy;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import org.apache.distributedlog.metadata.LogStreamMetadataStore;
import org.apache.distributedlog.metadata.MetadataUpdater;
import org.apache.distributedlog.util.Allocator;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Log Handler for Writers.
*
* <h3>Metrics</h3>
* All the metrics about log write handler are exposed under scope `segments`.
* <ul>
* <li> `segments`/open : opstats. latency characteristics on starting a new log segment.
* <li> `segments`/close : opstats. latency characteristics on completing an inprogress log segment.
* <li> `segments`/recover : opstats. latency characteristics on recovering a log segment.
* <li> `segments`/delete : opstats. latency characteristics on deleting a log segment.
* </ul>
*/
class BKLogWriteHandler extends BKLogHandler {
static final Logger LOG = LoggerFactory.getLogger(BKLogWriteHandler.class);
private static Transaction.OpListener<LogSegmentEntryWriter> NULL_OP_LISTENER =
new Transaction.OpListener<LogSegmentEntryWriter>() {
@Override
public void onCommit(LogSegmentEntryWriter r) {
// no-op
}
@Override
public void onAbort(Throwable t) {
// no-op
}
};
protected final LogMetadataForWriter logMetadataForWriter;
protected final Allocator<LogSegmentEntryWriter, Object> logSegmentAllocator;
protected final DistributedLock lock;
protected final MaxTxId maxTxId;
protected final MaxLogSegmentSequenceNo maxLogSegmentSequenceNo;
protected final boolean validateLogSegmentSequenceNumber;
protected final int regionId;
protected final RollingPolicy rollingPolicy;
protected CompletableFuture<? extends DistributedLock> lockFuture = null;
protected final PermitLimiter writeLimiter;
protected final FeatureProvider featureProvider;
protected final DynamicDistributedLogConfiguration dynConf;
protected final MetadataUpdater metadataUpdater;
// tracking the inprogress log segments
protected final LinkedList<Long> inprogressLSSNs;
// Fetch LogSegments State: write can continue without full list of log segments while truncation needs
private final CompletableFuture<Versioned<List<LogSegmentMetadata>>> fetchForWrite;
private CompletableFuture<Versioned<List<LogSegmentMetadata>>> fetchForTruncation;
// Recover Functions
private final RecoverLogSegmentFunction recoverLogSegmentFunction =
new RecoverLogSegmentFunction();
private final Function<List<LogSegmentMetadata>, CompletableFuture<Long>> recoverLogSegmentsFunction =
new Function<List<LogSegmentMetadata>, CompletableFuture<Long>>() {
@Override
public CompletableFuture<Long> apply(List<LogSegmentMetadata> segmentList) {
LOG.info("Initiating Recovery For {} : {}", getFullyQualifiedName(), segmentList);
// if lastLedgerRollingTimeMillis is not updated, we set it to now.
synchronized (BKLogWriteHandler.this) {
if (lastLedgerRollingTimeMillis < 0) {
lastLedgerRollingTimeMillis = Utils.nowInMillis();
}
}
if (validateLogSegmentSequenceNumber) {
synchronized (inprogressLSSNs) {
for (LogSegmentMetadata segment : segmentList) {
if (segment.isInProgress()) {
inprogressLSSNs.addLast(segment.getLogSegmentSequenceNumber());
}
}
}
}
return FutureUtils.processList(
segmentList,
recoverLogSegmentFunction,
scheduler
).thenApply(GetLastTxIdFunction.INSTANCE);
}
};
// Stats
private final StatsLogger perLogStatsLogger;
private final OpStatsLogger closeOpStats;
private final OpStatsLogger openOpStats;
private final OpStatsLogger recoverOpStats;
private final OpStatsLogger deleteOpStats;
/**
* Construct a Bookkeeper journal manager.
*/
BKLogWriteHandler(LogMetadataForWriter logMetadata,
DistributedLogConfiguration conf,
LogStreamMetadataStore streamMetadataStore,
LogSegmentMetadataCache metadataCache,
LogSegmentEntryStore entryStore,
OrderedScheduler scheduler,
Allocator<LogSegmentEntryWriter, Object> segmentAllocator,
StatsLogger statsLogger,
StatsLogger perLogStatsLogger,
AlertStatsLogger alertStatsLogger,
String clientId,
int regionId,
PermitLimiter writeLimiter,
FeatureProvider featureProvider,
DynamicDistributedLogConfiguration dynConf,
DistributedLock lock /** owned by handler **/) {
super(logMetadata,
conf,
streamMetadataStore,
metadataCache,
entryStore,
scheduler,
statsLogger,
alertStatsLogger,
clientId);
this.logMetadataForWriter = logMetadata;
this.logSegmentAllocator = segmentAllocator;
this.perLogStatsLogger = perLogStatsLogger;
this.writeLimiter = writeLimiter;
this.featureProvider = featureProvider;
this.dynConf = dynConf;
this.lock = lock;
this.metadataUpdater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
if (conf.getEncodeRegionIDInLogSegmentMetadata()) {
this.regionId = regionId;
} else {
this.regionId = DistributedLogConstants.LOCAL_REGION_ID;
}
this.validateLogSegmentSequenceNumber = conf.isLogSegmentSequenceNumberValidationEnabled();
// Construct the max sequence no
maxLogSegmentSequenceNo = new MaxLogSegmentSequenceNo(logMetadata.getMaxLSSNData());
inprogressLSSNs = new LinkedList<Long>();
// Construct the max txn id.
maxTxId = new MaxTxId(logMetadata.getMaxTxIdData());
// Schedule fetching log segment list in background before we access it.
// We don't need to watch the log segment list changes for writer, as it manages log segment list.
fetchForWrite = readLogSegmentsFromStore(
LogSegmentMetadata.COMPARATOR,
WRITE_HANDLE_FILTER,
null);
// Initialize other parameters.
setLastLedgerRollingTimeMillis(Utils.nowInMillis());
// Rolling Policy
if (conf.getLogSegmentRollingIntervalMinutes() > 0) {
rollingPolicy = new TimeBasedRollingPolicy(conf.getLogSegmentRollingIntervalMinutes() * 60 * 1000L);
} else {
rollingPolicy = new SizeBasedRollingPolicy(conf.getMaxLogSegmentBytes());
}
// Stats
StatsLogger segmentsStatsLogger = statsLogger.scope("segments");
openOpStats = segmentsStatsLogger.getOpStatsLogger("open");
closeOpStats = segmentsStatsLogger.getOpStatsLogger("close");
recoverOpStats = segmentsStatsLogger.getOpStatsLogger("recover");
deleteOpStats = segmentsStatsLogger.getOpStatsLogger("delete");
}
private CompletableFuture<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch(
final Comparator<LogSegmentMetadata> comparator) {
final CompletableFuture<List<LogSegmentMetadata>> promise = new CompletableFuture<List<LogSegmentMetadata>>();
fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
@Override
public void onFailure(Throwable cause) {
promise.completeExceptionally(cause);
}
@Override
public void onSuccess(Versioned<List<LogSegmentMetadata>> result) {
try {
promise.complete(getCachedLogSegments(comparator));
} catch (UnexpectedException e) {
promise.completeExceptionally(e);
}
}
});
return promise;
}
private CompletableFuture<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch(
final Comparator<LogSegmentMetadata> comparator) {
CompletableFuture<Versioned<List<LogSegmentMetadata>>> result;
synchronized (this) {
if (null == fetchForTruncation) {
fetchForTruncation = readLogSegmentsFromStore(
LogSegmentMetadata.COMPARATOR,
LogSegmentFilter.DEFAULT_FILTER,
null);
}
result = fetchForTruncation;
}
final CompletableFuture<List<LogSegmentMetadata>> promise = new CompletableFuture<List<LogSegmentMetadata>>();
result.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
@Override
public void onFailure(Throwable cause) {
FutureUtils.completeExceptionally(promise, cause);
}
@Override
public void onSuccess(Versioned<List<LogSegmentMetadata>> result) {
try {
FutureUtils.complete(promise, getCachedLogSegments(comparator));
} catch (UnexpectedException e) {
FutureUtils.completeExceptionally(promise, e);
}
}
});
return promise;
}
// Transactional operations for MaxLogSegmentSequenceNo
void storeMaxSequenceNumber(final Transaction<Object> txn,
final MaxLogSegmentSequenceNo maxSeqNo,
final long seqNo,
final boolean isInprogress) {
metadataStore.storeMaxLogSegmentSequenceNumber(txn, logMetadata, maxSeqNo.getVersionedData(seqNo),
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version version) {
if (validateLogSegmentSequenceNumber) {
synchronized (inprogressLSSNs) {
if (isInprogress) {
inprogressLSSNs.add(seqNo);
} else {
inprogressLSSNs.removeFirst();
}
}
}
maxSeqNo.update(version, seqNo);
}
@Override
public void onAbort(Throwable t) {
// no-op
}
});
}
// Transactional operations for MaxTxId
void storeMaxTxId(final Transaction<Object> txn,
final MaxTxId maxTxId,
final long txId) {
metadataStore.storeMaxTxnId(txn, logMetadataForWriter, maxTxId.getVersionedData(txId),
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version version) {
maxTxId.update(version, txId);
}
@Override
public void onAbort(Throwable t) {
// no-op
}
});
}
// Transactional operations for logsegment
void writeLogSegment(final Transaction<Object> txn,
final LogSegmentMetadata metadata) {
metadataStore.createLogSegment(txn, metadata, new Transaction.OpListener<Void>() {
@Override
public void onCommit(Void r) {
addLogSegmentToCache(metadata.getSegmentName(), metadata);
}
@Override
public void onAbort(Throwable t) {
// no-op
}
});
}
void deleteLogSegment(final Transaction<Object> txn,
final LogSegmentMetadata metadata) {
metadataStore.deleteLogSegment(txn, metadata, new Transaction.OpListener<Void>() {
@Override
public void onCommit(Void r) {
removeLogSegmentFromCache(metadata.getSegmentName());
}
@Override
public void onAbort(Throwable t) {
// no-op
}
});
}
/**
* Delete the whole log and all log segments under the log.
*/
void deleteLog() throws IOException {
lock.checkOwnershipAndReacquire();
Utils.ioResult(purgeLogSegmentsOlderThanTxnId(-1));
Utils.closeQuietly(lock);
}
/**
* The caller could call this before any actions, which to hold the lock for
* the write handler of its whole lifecycle. The lock will only be released
* when closing the write handler.
* This method is useful to prevent releasing underlying zookeeper lock during
* recovering/completing log segments. Releasing underlying zookeeper lock means
* 1) increase latency when re-lock on starting new log segment. 2) increase the
* possibility of a stream being re-acquired by other instances.
*
* @return future represents the lock result
*/
CompletableFuture<? extends DistributedLock> lockHandler() {
if (null != lockFuture) {
return lockFuture;
}
lockFuture = lock.asyncAcquire();
return lockFuture;
}
CompletableFuture<Void> unlockHandler() {
if (null != lockFuture) {
return lock.asyncClose();
} else {
return FutureUtils.Void();
}
}
/**
* Start a new log segment in a BookKeeper ledger.
* First ensure that we have the write lock for this journal.
* Then create a ledger and stream based on that ledger.
* The ledger id is written to the inprogress znode, so that in the
* case of a crash, a recovery process can find the ledger we were writing
* to when we crashed.
*
* @param txId First transaction id to be written to the stream
* @return
* @throws IOException
*/
public BKLogSegmentWriter startLogSegment(long txId) throws IOException {
return startLogSegment(txId, false, false);
}
/**
* Start a new log segment in a BookKeeper ledger.
* First ensure that we have the write lock for this journal.
* Then create a ledger and stream based on that ledger.
* The ledger id is written to the inprogress znode, so that in the
* case of a crash, a recovery process can find the ledger we were writing
* to when we crashed.
*
* @param txId First transaction id to be written to the stream
* @param bestEffort
* @param allowMaxTxID
* allow using max tx id to start log segment
* @return
* @throws IOException
*/
public BKLogSegmentWriter startLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID)
throws IOException {
Stopwatch stopwatch = Stopwatch.createStarted();
boolean success = false;
try {
BKLogSegmentWriter writer = doStartLogSegment(txId, bestEffort, allowMaxTxID);
success = true;
return writer;
} finally {
if (success) {
openOpStats.registerSuccessfulEvent(
stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
} else {
openOpStats.registerFailedEvent(
stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS);
}
}
}
protected long assignLogSegmentSequenceNumber() throws IOException {
// For any active stream we will always make sure that there is at least one
// active ledger (except when the stream first starts out). Therefore when we
// see no ledger metadata for a stream, we assume that this is the first ledger
// in the stream
long logSegmentSeqNo = DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO;
boolean logSegmentsFound = false;
if (LogSegmentMetadata.supportsLogSegmentSequenceNo(conf.getDLLedgerMetadataLayoutVersion())) {
List<LogSegmentMetadata> ledgerListDesc = getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR);
Long nextLogSegmentSeqNo = DLUtils.nextLogSegmentSequenceNumber(ledgerListDesc);
if (null == nextLogSegmentSeqNo) {
logSegmentsFound = false;
// we don't find last assigned log segment sequence number
// then we start the log segment with configured FirstLogSegmentSequenceNumber.
logSegmentSeqNo = conf.getFirstLogSegmentSequenceNumber();
} else {
logSegmentsFound = true;
// latest log segment is assigned with a sequence number, start with next sequence number
logSegmentSeqNo = nextLogSegmentSeqNo;
}
}
// We only skip log segment sequence number validation only when no log segments found &
// the maximum log segment sequence number is "UNASSIGNED".
if (!logSegmentsFound
&& (DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO
== maxLogSegmentSequenceNo.getSequenceNumber())) {
// no ledger seqno stored in /ledgers before
LOG.info("No max ledger sequence number found while creating log segment {} for {}.",
logSegmentSeqNo, getFullyQualifiedName());
} else if (maxLogSegmentSequenceNo.getSequenceNumber() + 1 != logSegmentSeqNo) {
LOG.warn("Unexpected max log segment sequence number {} for {} : list of cached segments = {}",
new Object[]{maxLogSegmentSequenceNo.getSequenceNumber(), getFullyQualifiedName(),
getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR)});
// there is max log segment number recorded there and it isn't match. throw exception.
throw new DLIllegalStateException("Unexpected max log segment sequence number "
+ maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
+ ", expected " + (logSegmentSeqNo - 1));
}
return logSegmentSeqNo;
}
protected BKLogSegmentWriter doStartLogSegment(long txId,
boolean bestEffort, boolean allowMaxTxID) throws IOException {
return Utils.ioResult(asyncStartLogSegment(txId, bestEffort, allowMaxTxID));
}
protected CompletableFuture<BKLogSegmentWriter> asyncStartLogSegment(final long txId,
final boolean bestEffort,
final boolean allowMaxTxID) {
final CompletableFuture<BKLogSegmentWriter> promise = new CompletableFuture<BKLogSegmentWriter>();
try {
lock.checkOwnershipAndReacquire();
} catch (LockingException e) {
FutureUtils.completeExceptionally(promise, e);
return promise;
}
fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
@Override
public void onFailure(Throwable cause) {
FutureUtils.completeExceptionally(promise, cause);
}
@Override
public void onSuccess(Versioned<List<LogSegmentMetadata>> list) {
doStartLogSegment(txId, bestEffort, allowMaxTxID, promise);
}
});
return promise;
}
protected void doStartLogSegment(final long txId,
final boolean bestEffort,
final boolean allowMaxTxID,
final CompletableFuture<BKLogSegmentWriter> promise) {
// validate the tx id
if ((txId < 0)
|| (!allowMaxTxID && (txId == DistributedLogConstants.MAX_TXID))) {
FutureUtils.completeExceptionally(promise, new IOException("Invalid Transaction Id " + txId));
return;
}
long highestTxIdWritten = maxTxId.get();
if (txId < highestTxIdWritten) {
if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
FutureUtils.completeExceptionally(promise,
new EndOfStreamException("Writing to a stream after it has been marked as completed"));
return;
} else {
LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten);
FutureUtils.completeExceptionally(promise,
new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
return;
}
}
try {
logSegmentAllocator.allocate();
} catch (IOException e) {
// failed to issue an allocation request
failStartLogSegment(promise, bestEffort, e);
return;
}
// start the transaction from zookeeper
final Transaction<Object> txn = streamMetadataStore.newTransaction();
// failpoint injected before creating ledger
try {
FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate);
} catch (IOException ioe) {
failStartLogSegment(promise, bestEffort, ioe);
return;
}
logSegmentAllocator.tryObtain(txn, NULL_OP_LISTENER)
.whenComplete(new FutureEventListener<LogSegmentEntryWriter>() {
@Override
public void onSuccess(LogSegmentEntryWriter entryWriter) {
// try-obtain succeed
createInprogressLogSegment(
txn,
txId,
entryWriter,
bestEffort,
promise);
}
@Override
public void onFailure(Throwable cause) {
failStartLogSegment(promise, bestEffort, cause);
}
});
}
private void failStartLogSegment(CompletableFuture<BKLogSegmentWriter> promise,
boolean bestEffort,
Throwable cause) {
if (bestEffort) {
FutureUtils.complete(promise, null);
} else {
FutureUtils.completeExceptionally(promise, cause);
}
}
// once the ledger handle is obtained from allocator, this function should guarantee
// either the transaction is executed or aborted. Otherwise, the ledger handle will
// just leak from the allocation pool - hence cause "No Ledger Allocator"
private void createInprogressLogSegment(Transaction<Object> txn,
final long txId,
final LogSegmentEntryWriter entryWriter,
boolean bestEffort,
final CompletableFuture<BKLogSegmentWriter> promise) {
final long logSegmentSeqNo;
try {
FailpointUtils.checkFailPoint(
FailpointUtils.FailPointName.FP_StartLogSegmentOnAssignLogSegmentSequenceNumber);
logSegmentSeqNo = assignLogSegmentSequenceNumber();
} catch (IOException e) {
// abort the current prepared transaction
txn.abort(e);
failStartLogSegment(promise, bestEffort, e);
return;
}
final String inprogressZnodePath = inprogressZNode(
entryWriter.getLogSegmentId(), txId, logSegmentSeqNo);
final LogSegmentMetadata l =
new LogSegmentMetadata.LogSegmentMetadataBuilder(inprogressZnodePath,
conf.getDLLedgerMetadataLayoutVersion(), entryWriter.getLogSegmentId(), txId)
.setLogSegmentSequenceNo(logSegmentSeqNo)
.setRegionId(regionId)
.setEnvelopeEntries(
LogSegmentMetadata.supportsEnvelopedEntries(conf.getDLLedgerMetadataLayoutVersion()))
.build();
// Create an inprogress segment
writeLogSegment(txn, l);
// Try storing max sequence number.
LOG.debug("Try storing max sequence number in startLogSegment {} : {}", inprogressZnodePath, logSegmentSeqNo);
storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, logSegmentSeqNo, true);
// Try storing max tx id.
LOG.debug("Try storing MaxTxId in startLogSegment {} {}", inprogressZnodePath, txId);
storeMaxTxId(txn, maxTxId, txId);
txn.execute().whenCompleteAsync(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
try {
FutureUtils.complete(promise, new BKLogSegmentWriter(
getFullyQualifiedName(),
l.getSegmentName(),
conf,
conf.getDLLedgerMetadataLayoutVersion(),
entryWriter,
lock,
txId,
logSegmentSeqNo,
scheduler,
statsLogger,
perLogStatsLogger,
alertStatsLogger,
writeLimiter,
featureProvider,
dynConf));
} catch (IOException ioe) {
failStartLogSegment(promise, false, ioe);
}
}
@Override
public void onFailure(Throwable cause) {
failStartLogSegment(promise, false, cause);
}
}, scheduler);
}
boolean shouldStartNewSegment(BKLogSegmentWriter writer) {
return rollingPolicy.shouldRollover(writer, lastLedgerRollingTimeMillis);
}
/**
* Finalize a log segment. If the journal manager is currently
* writing to a ledger, ensure that this is the ledger of the log segment
* being finalized.
* <p/>
* Otherwise this is the recovery case. In the recovery case, ensure that
* the firstTxId of the ledger matches firstTxId for the segment we are
* trying to finalize.
*/
CompletableFuture<LogSegmentMetadata> completeAndCloseLogSegment(final BKLogSegmentWriter writer) {
final CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>();
completeAndCloseLogSegment(writer, promise);
return promise;
}
private void completeAndCloseLogSegment(final BKLogSegmentWriter writer,
final CompletableFuture<LogSegmentMetadata> promise) {
writer.asyncClose().whenComplete(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
// in theory closeToFinalize should throw exception if a stream is in error.
// just in case, add another checking here to make sure
// we don't close log segment is a stream is in error.
if (writer.shouldFailCompleteLogSegment()) {
FutureUtils.completeExceptionally(promise,
new IOException("LogSegmentWriter for " + writer.getFullyQualifiedLogSegment()
+ " is already in error."));
return;
}
doCompleteAndCloseLogSegment(
inprogressZNodeName(writer.getLogSegmentId(), writer.getStartTxId(),
writer.getLogSegmentSequenceNumber()),
writer.getLogSegmentSequenceNumber(),
writer.getLogSegmentId(),
writer.getStartTxId(),
writer.getLastTxId(),
writer.getPositionWithinLogSegment(),
writer.getLastDLSN().getEntryId(),
writer.getLastDLSN().getSlotId(),
promise);
}
@Override
public void onFailure(Throwable cause) {
FutureUtils.completeExceptionally(promise, cause);
}
});
}
@VisibleForTesting
LogSegmentMetadata completeAndCloseLogSegment(long logSegmentSeqNo,
long logSegmentId,
long firstTxId,
long lastTxId,
int recordCount)
throws IOException {
return completeAndCloseLogSegment(inprogressZNodeName(logSegmentId, firstTxId, logSegmentSeqNo),
logSegmentSeqNo, logSegmentId, firstTxId, lastTxId, recordCount, -1, -1);
}
/**
* Finalize a log segment. If the journal manager is currently
* writing to a ledger, ensure that this is the ledger of the log segment
* being finalized.
* <p/>
* Otherwise this is the recovery case. In the recovery case, ensure that
* the firstTxId of the ledger matches firstTxId for the segment we are
* trying to finalize.
*/
LogSegmentMetadata completeAndCloseLogSegment(String inprogressZnodeName, long logSegmentSeqNo,
long logSegmentId, long firstTxId, long lastTxId,
int recordCount, long lastEntryId, long lastSlotId)
throws IOException {
Stopwatch stopwatch = Stopwatch.createStarted();
boolean success = false;
try {
LogSegmentMetadata completedLogSegment =
doCompleteAndCloseLogSegment(inprogressZnodeName, logSegmentSeqNo,
logSegmentId, firstTxId, lastTxId, recordCount,
lastEntryId, lastSlotId);
success = true;
return completedLogSegment;
} finally {
if (success) {
closeOpStats.registerSuccessfulEvent(
stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
TimeUnit.MICROSECONDS);
} else {
closeOpStats.registerFailedEvent(
stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
TimeUnit.MICROSECONDS);
}
}
}
protected long computeStartSequenceId(LogSegmentMetadata segment) throws IOException {
if (!segment.isInProgress()) {
return segment.getStartSequenceId();
}
long startSequenceId = DistributedLogConstants.UNASSIGNED_SEQUENCE_ID;
// we only record sequence id when both write version and logsegment's version support sequence id
if (LogSegmentMetadata.supportsSequenceId(conf.getDLLedgerMetadataLayoutVersion())
&& segment.supportsSequenceId()) {
List<LogSegmentMetadata> logSegmentDescList =
getCachedLogSegments(LogSegmentMetadata.DESC_COMPARATOR);
startSequenceId = DLUtils.computeStartSequenceId(logSegmentDescList, segment);
}
return startSequenceId;
}
/**
* Close log segment.
*
* @param inprogressZnodeName
* @param logSegmentSeqNo
* @param logSegmentId
* @param firstTxId
* @param lastTxId
* @param recordCount
* @param lastEntryId
* @param lastSlotId
* @throws IOException
*/
protected LogSegmentMetadata doCompleteAndCloseLogSegment(
String inprogressZnodeName,
long logSegmentSeqNo,
long logSegmentId,
long firstTxId,
long lastTxId,
int recordCount,
long lastEntryId,
long lastSlotId) throws IOException {
CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>();
doCompleteAndCloseLogSegment(
inprogressZnodeName,
logSegmentSeqNo,
logSegmentId,
firstTxId,
lastTxId,
recordCount,
lastEntryId,
lastSlotId,
promise);
return Utils.ioResult(promise);
}
protected void doCompleteAndCloseLogSegment(final String inprogressZnodeName,
final long logSegmentSeqNo,
final long logSegmentId,
final long firstTxId,
final long lastTxId,
final int recordCount,
final long lastEntryId,
final long lastSlotId,
final CompletableFuture<LogSegmentMetadata> promise) {
fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
@Override
public void onFailure(Throwable cause) {
FutureUtils.completeExceptionally(promise, cause);
}
@Override
public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
inprogressZnodeName,
logSegmentSeqNo,
logSegmentId,
firstTxId,
lastTxId,
recordCount,
lastEntryId,
lastSlotId,
promise);
}
});
}
private void doCompleteAndCloseLogSegmentAfterLogSegmentListFetched(
final String inprogressZnodeName,
long logSegmentSeqNo,
long logSegmentId,
long firstTxId,
long lastTxId,
int recordCount,
long lastEntryId,
long lastSlotId,
final CompletableFuture<LogSegmentMetadata> promise) {
try {
lock.checkOwnershipAndReacquire();
} catch (IOException ioe) {
FutureUtils.completeExceptionally(promise, ioe);
return;
}
LOG.debug("Completing and Closing Log Segment {} {}", firstTxId, lastTxId);
LogSegmentMetadata inprogressLogSegment = readLogSegmentFromCache(inprogressZnodeName);
// validate log segment
if (inprogressLogSegment.getLogSegmentId() != logSegmentId) {
FutureUtils.completeExceptionally(promise, new IOException(
"Active ledger has different ID to inprogress. "
+ inprogressLogSegment.getLogSegmentId() + " found, "
+ logSegmentId + " expected"));
return;
}
// validate the transaction id
if (inprogressLogSegment.getFirstTxId() != firstTxId) {
FutureUtils.completeExceptionally(promise, new IOException("Transaction id not as expected, "
+ inprogressLogSegment.getFirstTxId() + " found, " + firstTxId + " expected"));
return;
}
// validate the log sequence number
if (validateLogSegmentSequenceNumber) {
synchronized (inprogressLSSNs) {
if (inprogressLSSNs.isEmpty()) {
FutureUtils.completeExceptionally(promise, new UnexpectedException(
"Didn't find matched inprogress log segments when completing inprogress "
+ inprogressLogSegment));
return;
}
long leastInprogressLSSN = inprogressLSSNs.getFirst();
// the log segment sequence number in metadata
// {@link inprogressLogSegment.getLogSegmentSequenceNumber()}
// should be same as the sequence number we are completing (logSegmentSeqNo)
// and
// it should also be same as the least inprogress log segment sequence number
// tracked in {@link inprogressLSSNs}
if ((inprogressLogSegment.getLogSegmentSequenceNumber() != logSegmentSeqNo)
|| (leastInprogressLSSN != logSegmentSeqNo)) {
FutureUtils.completeExceptionally(promise, new UnexpectedException(
"Didn't find matched inprogress log segments when completing inprogress "
+ inprogressLogSegment));
return;
}
}
}
// store max sequence number.
long maxSeqNo = Math.max(logSegmentSeqNo, maxLogSegmentSequenceNo.getSequenceNumber());
if (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo
|| (maxLogSegmentSequenceNo.getSequenceNumber() == logSegmentSeqNo + 1)) {
// ignore the case that a new inprogress log segment is pre-allocated
// before completing current inprogress one
LOG.info("Try storing max sequence number {} in completing {}.",
new Object[] { logSegmentSeqNo, inprogressLogSegment.getZkPath() });
} else {
LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
new Object[] {
maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() });
if (validateLogSegmentSequenceNumber) {
FutureUtils.completeExceptionally(promise,
new DLIllegalStateException("Unexpected max log segment sequence number "
+ maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
+ ", expected " + (logSegmentSeqNo - 1)));
return;
}
}
// Prepare the completion
final String pathForCompletedLedger = completedLedgerZNode(firstTxId, lastTxId, logSegmentSeqNo);
long startSequenceId;
try {
startSequenceId = computeStartSequenceId(inprogressLogSegment);
} catch (IOException ioe) {
FutureUtils.completeExceptionally(promise, ioe);
return;
}
// write completed ledger znode
final LogSegmentMetadata completedLogSegment =
inprogressLogSegment.completeLogSegment(
pathForCompletedLedger,
lastTxId,
recordCount,
lastEntryId,
lastSlotId,
startSequenceId);
setLastLedgerRollingTimeMillis(completedLogSegment.getCompletionTime());
// prepare the transaction
Transaction<Object> txn = streamMetadataStore.newTransaction();
// create completed log segment
writeLogSegment(txn, completedLogSegment);
// delete inprogress log segment
deleteLogSegment(txn, inprogressLogSegment);
// store max sequence number
storeMaxSequenceNumber(txn, maxLogSegmentSequenceNo, maxSeqNo, false);
// update max txn id.
LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}", pathForCompletedLedger, lastTxId);
storeMaxTxId(txn, maxTxId, lastTxId);
txn.execute().whenCompleteAsync(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
LOG.info("Completed {} to {} for {} : {}",
new Object[] { inprogressZnodeName, completedLogSegment.getSegmentName(),
getFullyQualifiedName(), completedLogSegment });
FutureUtils.complete(promise, completedLogSegment);
}
@Override
public void onFailure(Throwable cause) {
FutureUtils.completeExceptionally(promise, cause);
}
}, scheduler);
}
public CompletableFuture<Long> recoverIncompleteLogSegments() {
try {
FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
} catch (IOException ioe) {
return FutureUtils.exception(ioe);
}
return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR)
.thenCompose(recoverLogSegmentsFunction);
}
class RecoverLogSegmentFunction implements Function<LogSegmentMetadata, CompletableFuture<LogSegmentMetadata>> {
@Override
public CompletableFuture<LogSegmentMetadata> apply(final LogSegmentMetadata l) {
if (!l.isInProgress()) {
return FutureUtils.value(l);
}
LOG.info("Recovering last record in log segment {} for {}.", l, getFullyQualifiedName());
return asyncReadLastRecord(l, true, true, true).thenCompose(
lastRecord -> completeLogSegment(l, lastRecord));
}
private CompletableFuture<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata l,
LogRecordWithDLSN lastRecord) {
LOG.info("Recovered last record in log segment {} for {}.", l, getFullyQualifiedName());
long endTxId = DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID;
int recordCount = 0;
long lastEntryId = -1;
long lastSlotId = -1;
if (null != lastRecord) {
endTxId = lastRecord.getTransactionId();
recordCount = lastRecord.getLastPositionWithinLogSegment();
lastEntryId = lastRecord.getDlsn().getEntryId();
lastSlotId = lastRecord.getDlsn().getSlotId();
}
if (endTxId == DistributedLogConstants.INVALID_TXID) {
LOG.error("Unrecoverable corruption has occurred in segment "
+ l.toString() + " at path " + l.getZkPath()
+ ". Unable to continue recovery.");
return FutureUtils.exception(new IOException("Unrecoverable corruption,"
+ " please check logs."));
} else if (endTxId == DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID) {
// TODO: Empty ledger - Ideally we should just remove it?
endTxId = l.getFirstTxId();
}
CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>();
doCompleteAndCloseLogSegment(
l.getZNodeName(),
l.getLogSegmentSequenceNumber(),
l.getLogSegmentId(),
l.getFirstTxId(),
endTxId,
recordCount,
lastEntryId,
lastSlotId,
promise);
return promise;
}
}
CompletableFuture<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
if (DLSN.InvalidDLSN == dlsn) {
List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0);
return FutureUtils.value(emptyList);
}
return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose(
logSegments -> setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn));
}
private CompletableFuture<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(
List<LogSegmentMetadata> logSegments, final DLSN dlsn) {
LOG.debug("Setting truncation status on logs older than {} from {} for {}",
new Object[]{dlsn, logSegments, getFullyQualifiedName()});
List<LogSegmentMetadata> truncateList = new ArrayList<LogSegmentMetadata>(logSegments.size());
LogSegmentMetadata partialTruncate = null;
LOG.info("{}: Truncating log segments older than {}", getFullyQualifiedName(), dlsn);
for (int i = 0; i < logSegments.size(); i++) {
LogSegmentMetadata l = logSegments.get(i);
if (!l.isInProgress()) {
if (l.getLastDLSN().compareTo(dlsn) < 0) {
LOG.debug("{}: Truncating log segment {} ", getFullyQualifiedName(), l);
truncateList.add(l);
} else if (l.getFirstDLSN().compareTo(dlsn) < 0) {
// Can be satisfied by at most one segment
if (null != partialTruncate) {
String logMsg = String.format("Potential metadata inconsistency for stream %s at segment %s",
getFullyQualifiedName(), l);
LOG.error(logMsg);
return FutureUtils.exception(new DLIllegalStateException(logMsg));
}
LOG.info("{}: Partially truncating log segment {} older than {}.",
new Object[] {getFullyQualifiedName(), l, dlsn});
partialTruncate = l;
} else {
break;
}
} else {
break;
}
}
return setLogSegmentTruncationStatus(truncateList, partialTruncate, dlsn);
}
private int getNumCandidateLogSegmentsToPurge(List<LogSegmentMetadata> logSegments) {
if (logSegments.isEmpty()) {
return 0;
} else {
// we have to keep at least one completed log segment for sequence id
int numCandidateLogSegments = 0;
for (LogSegmentMetadata segment : logSegments) {
if (segment.isInProgress()) {
break;
} else {
++numCandidateLogSegments;
}
}
return numCandidateLogSegments - 1;
}
}
CompletableFuture<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) {
if (minTimestampToKeep >= Utils.nowInMillis()) {
return FutureUtils.exception(new IllegalArgumentException(
"Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName()));
}
return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose(
new Function<List<LogSegmentMetadata>, CompletableFuture<List<LogSegmentMetadata>>>() {
@Override
public CompletableFuture<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
List<LogSegmentMetadata> purgeList = new ArrayList<LogSegmentMetadata>(logSegments.size());
int numCandidates = getNumCandidateLogSegmentsToPurge(logSegments);
for (int iterator = 0; iterator < numCandidates; iterator++) {
LogSegmentMetadata l = logSegments.get(iterator);
// When application explicitly truncates segments; timestamp based purge is
// only used to cleanup log segments that have been marked for truncation
if ((l.isTruncated() || !conf.getExplicitTruncationByApplication())
&& !l.isInProgress() && (l.getCompletionTime() < minTimestampToKeep)) {
purgeList.add(l);
} else {
// stop truncating log segments if we find either an inprogress or a partially
// truncated log segment
break;
}
}
LOG.info("Deleting log segments older than {} for {} : {}",
new Object[] { minTimestampToKeep, getFullyQualifiedName(), purgeList });
return deleteLogSegments(purgeList);
}
});
}
CompletableFuture<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) {
return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose(
logSegments -> {
int numLogSegmentsToProcess;
if (minTxIdToKeep < 0) {
// we are deleting the log, we can remove whole log segments
numLogSegmentsToProcess = logSegments.size();
} else {
numLogSegmentsToProcess = getNumCandidateLogSegmentsToPurge(logSegments);
}
List<LogSegmentMetadata> purgeList = Lists.newArrayListWithExpectedSize(numLogSegmentsToProcess);
for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) {
LogSegmentMetadata l = logSegments.get(iterator);
if ((minTxIdToKeep < 0)
|| ((l.isTruncated() || !conf.getExplicitTruncationByApplication())
&& !l.isInProgress() && (l.getLastTxId() < minTxIdToKeep))) {
purgeList.add(l);
} else {
// stop truncating log segments if we find either an inprogress or a partially
// truncated log segment
break;
}
}
return deleteLogSegments(purgeList);
});
}
private CompletableFuture<List<LogSegmentMetadata>> setLogSegmentTruncationStatus(
final List<LogSegmentMetadata> truncateList,
LogSegmentMetadata partialTruncate,
DLSN minActiveDLSN) {
final List<LogSegmentMetadata> listToTruncate = Lists.newArrayListWithCapacity(truncateList.size() + 1);
final List<LogSegmentMetadata> listAfterTruncated = Lists.newArrayListWithCapacity(truncateList.size() + 1);
Transaction<Object> updateTxn = metadataUpdater.transaction();
for (LogSegmentMetadata l : truncateList) {
if (!l.isTruncated()) {
LogSegmentMetadata newSegment = metadataUpdater.setLogSegmentTruncated(updateTxn, l);
listToTruncate.add(l);
listAfterTruncated.add(newSegment);
}
}
if (null != partialTruncate && (partialTruncate.isNonTruncated()
|| (partialTruncate.isPartiallyTruncated()
&& (partialTruncate.getMinActiveDLSN().compareTo(minActiveDLSN) < 0)))) {
LogSegmentMetadata newSegment = metadataUpdater.setLogSegmentPartiallyTruncated(
updateTxn, partialTruncate, minActiveDLSN);
listToTruncate.add(partialTruncate);
listAfterTruncated.add(newSegment);
}
return updateTxn.execute().thenApply(value -> {
for (int i = 0; i < listToTruncate.size(); i++) {
removeLogSegmentFromCache(listToTruncate.get(i).getSegmentName());
LogSegmentMetadata newSegment = listAfterTruncated.get(i);
addLogSegmentToCache(newSegment.getSegmentName(), newSegment);
}
return listAfterTruncated;
});
}
private CompletableFuture<List<LogSegmentMetadata>> deleteLogSegments(
final List<LogSegmentMetadata> logs) {
if (LOG.isTraceEnabled()) {
LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), logs);
}
return FutureUtils.processList(logs,
segment -> deleteLogSegment(segment), scheduler);
}
private CompletableFuture<LogSegmentMetadata> deleteLogSegment(
final LogSegmentMetadata ledgerMetadata) {
LOG.info("Deleting ledger {} for {}", ledgerMetadata, getFullyQualifiedName());
final CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>();
final Stopwatch stopwatch = Stopwatch.createStarted();
promise.whenComplete(new FutureEventListener<LogSegmentMetadata>() {
@Override
public void onSuccess(LogSegmentMetadata segment) {
deleteOpStats.registerSuccessfulEvent(
stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
TimeUnit.MICROSECONDS);
}
@Override
public void onFailure(Throwable cause) {
deleteOpStats.registerFailedEvent(
stopwatch.stop().elapsed(TimeUnit.MICROSECONDS),
TimeUnit.MICROSECONDS);
}
});
entryStore.deleteLogSegment(ledgerMetadata)
.whenComplete(new FutureEventListener<LogSegmentMetadata>() {
@Override
public void onFailure(Throwable cause) {
FutureUtils.completeExceptionally(promise, cause);
}
@Override
public void onSuccess(LogSegmentMetadata segment) {
deleteLogSegmentMetadata(segment, promise);
}
});
return promise;
}
private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata,
final CompletableFuture<LogSegmentMetadata> promise) {
Transaction<Object> deleteTxn = metadataStore.transaction();
metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, new Transaction.OpListener<Void>() {
@Override
public void onCommit(Void r) {
// purge log segment
removeLogSegmentFromCache(segmentMetadata.getZNodeName());
promise.complete(segmentMetadata);
}
@Override
public void onAbort(Throwable t) {
if (t instanceof LogSegmentNotFoundException) {
// purge log segment
removeLogSegmentFromCache(segmentMetadata.getZNodeName());
promise.complete(segmentMetadata);
return;
} else {
LOG.error("Couldn't purge {} for {}: with error {}",
new Object[]{ segmentMetadata, getFullyQualifiedName(), t });
promise.completeExceptionally(t);
}
}
});
deleteTxn.execute();
}
@Override
public CompletableFuture<Void> asyncClose() {
return Utils.closeSequence(scheduler,
lock,
logSegmentAllocator);
}
@Override
public CompletableFuture<Void> asyncAbort() {
return asyncClose();
}
String completedLedgerZNodeName(long firstTxId, long lastTxId, long logSegmentSeqNo) {
if (DistributedLogConstants.LOGSEGMENT_NAME_VERSION == conf.getLogSegmentNameVersion()) {
return String.format("%s_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX, logSegmentSeqNo);
} else {
return String.format("%s_%018d_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX,
firstTxId, lastTxId);
}
}
/**
* Get the znode path for a finalize ledger.
*/
String completedLedgerZNode(long firstTxId, long lastTxId, long logSegmentSeqNo) {
return String.format("%s/%s", logMetadata.getLogSegmentsPath(),
completedLedgerZNodeName(firstTxId, lastTxId, logSegmentSeqNo));
}
/**
* Get the name of the inprogress znode.
*
* @return name of the inprogress znode.
*/
String inprogressZNodeName(long logSegmentId, long firstTxId, long logSegmentSeqNo) {
if (DistributedLogConstants.LOGSEGMENT_NAME_VERSION == conf.getLogSegmentNameVersion()) {
// Lots of the problems are introduced due to different inprogress names with same ledger sequence number.
return String.format("%s_%018d", DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX, logSegmentSeqNo);
} else {
return DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX + "_" + Long.toString(firstTxId, 16);
}
}
/**
* Get the znode path for the inprogressZNode.
*/
String inprogressZNode(long logSegmentId, long firstTxId, long logSegmentSeqNo) {
return logMetadata.getLogSegmentsPath() + "/" + inprogressZNodeName(logSegmentId, firstTxId, logSegmentSeqNo);
}
String inprogressZNode(String inprogressZNodeName) {
return logMetadata.getLogSegmentsPath() + "/" + inprogressZNodeName;
}
}