/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.readahead;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import com.twitter.distributedlog.AsyncNotification;
import com.twitter.distributedlog.BKLogHandler;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.LedgerDescriptor;
import com.twitter.distributedlog.LedgerHandleCache;
import com.twitter.distributedlog.LedgerReadPosition;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.LogReadException;
import com.twitter.distributedlog.LogSegmentMetadata;
import com.twitter.distributedlog.ReadAheadCache;
import com.twitter.distributedlog.callback.LogSegmentListener;
import com.twitter.distributedlog.callback.ReadAheadCallback;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
import com.twitter.distributedlog.injector.AsyncFailureInjector;
import com.twitter.distributedlog.io.AsyncCloseable;
import com.twitter.distributedlog.logsegment.LogSegmentFilter;
import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* The ReadAhead worker processes readahead requests asynchronously. The whole readahead process is
* chained into different phases:
*
* <p>
* ScheduleReadAheadPhase: Schedule the readahead request based on previous state (e.g. whether to back off).
* After the readahead request is scheduled, the worker enters the ReadAhead phase.
* </p>
* <p>
* ReadAhead Phase: This phase is divided into several sub-phases, all chained into the
* execution flow. If errors happen during execution, the worker enters the Exceptions Handling phase.
* <br>
* CheckInProgressChangedPhase: check whether the in-progress log segments have changed and update the metadata.
* <br>
* OpenLedgerPhase: open ledgers if necessary for subsequent read requests.
* <br>
* ReadEntriesPhase: read entries from bookkeeper and fill the readahead cache.
* <br>
* After that, the worker goes back to the Schedule phase to schedule the next readahead request.
* </p>
* <p>
* Exceptions Handling Phase: Handle all the exceptions and properly schedule the next readahead request.
* </p>
*/
public class ReadAheadWorker implements ReadAheadCallback, Runnable, AsyncCloseable, LogSegmentListener {
private static final Logger LOG = LoggerFactory.getLogger(ReadAheadWorker.class);
private static final int BKC_ZK_EXCEPTION_THRESHOLD_IN_SECONDS = 30;
private static final int BKC_UNEXPECTED_EXCEPTION_THRESHOLD = 3;
// Stream information
private final String fullyQualifiedName;
private final DistributedLogConfiguration conf;
private final DynamicDistributedLogConfiguration dynConf;
private final ZKLogMetadataForReader logMetadata;
private final BKLogHandler bkLedgerManager;
private final boolean isHandleForReading;
// Notification to notify readahead status
protected final AsyncNotification notification;
// resources
protected final OrderedScheduler scheduler;
private final LedgerHandleCache handleCache;
private final ReadAheadCache readAheadCache;
// ReadAhead Status
volatile boolean running = true;
Promise<Void> stopPromise = null;
private volatile boolean isCatchingUp = true;
private volatile boolean logDeleted = false;
private volatile boolean readAheadError = false;
private volatile boolean readAheadInterrupted = false;
private volatile boolean readingFromTruncated = false;
// Exceptions Handling
volatile boolean encounteredException = false;
private final AtomicInteger bkcZkExceptions = new AtomicInteger(0);
private final AtomicInteger bkcUnExpectedExceptions = new AtomicInteger(0);
private final int noLedgerExceptionOnReadLACThreshold;
private final AtomicInteger bkcNoLedgerExceptionsOnReadLAC = new AtomicInteger(0);
// Read Ahead Positions
private final LedgerReadPosition startReadPosition;
protected LedgerReadPosition nextReadAheadPosition;
//
// LogSegments & Metadata Notification
//
// variables related to zookeeper watcher notification to interrupt long poll waits
final Object notificationLock = new Object();
AsyncNotification metadataNotification = null;
volatile long metadataNotificationTimeMillis = -1L;
// variables related to log segments
private volatile boolean reInitializeMetadata = true;
private volatile boolean forceReadLogSegments = false;
volatile boolean inProgressChanged = false;
private LogSegmentMetadata currentMetadata = null;
private int currentMetadataIndex;
protected LedgerDescriptor currentLH;
private volatile List<LogSegmentMetadata> logSegmentListNotified;
private volatile List<LogSegmentMetadata> logSegmentList;
//
// ReadAhead Phases
//
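// The readahead phases are chained as follows: StoppablePhase -> CheckInProgressChangedPhase
// -> OpenLedgerPhase -> ReadEntriesPhase -> ScheduleReadAheadPhase, while ExceptionHandlePhase
// feeds back into ScheduleReadAheadPhase whenever a phase reports an error.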
final Phase schedulePhase = new ScheduleReadAheadPhase();
final Phase exceptionHandler = new ExceptionHandlePhase(schedulePhase);
final Phase readAheadPhase =
new StoppablePhase(
new CheckInProgressChangedPhase(
new OpenLedgerPhase(
new ReadEntriesPhase(schedulePhase))));
//
// Stats, Tracing and Failure Injection
//
// failure injector
private final AsyncFailureInjector failureInjector;
// trace
protected final long metadataLatencyWarnThresholdMillis;
final ReadAheadTracker tracker;
final Stopwatch resumeStopWatch;
final Stopwatch LACNotAdvancedStopWatch = Stopwatch.createUnstarted();
// Misc
private final boolean readAheadSkipBrokenEntries;
// Stats
private final AlertStatsLogger alertStatsLogger;
private final StatsLogger readAheadPerStreamStatsLogger;
private final Counter readAheadWorkerWaits;
private final Counter readAheadEntryPiggyBackHits;
private final Counter readAheadEntryPiggyBackMisses;
private final Counter readAheadReadLACAndEntryCounter;
private final Counter readAheadCacheFullCounter;
private final Counter readAheadSkippedBrokenEntries;
private final Counter idleReaderWarn;
private final OpStatsLogger readAheadReadEntriesStat;
private final OpStatsLogger readAheadCacheResumeStat;
private final OpStatsLogger readAheadLacLagStats;
private final OpStatsLogger longPollInterruptionStat;
private final OpStatsLogger metadataReinitializationStat;
private final OpStatsLogger notificationExecutionStat;
private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
public ReadAheadWorker(DistributedLogConfiguration conf,
DynamicDistributedLogConfiguration dynConf,
ZKLogMetadataForReader logMetadata,
BKLogHandler ledgerManager,
OrderedScheduler scheduler,
LedgerHandleCache handleCache,
LedgerReadPosition startPosition,
ReadAheadCache readAheadCache,
boolean isHandleForReading,
ReadAheadExceptionsLogger readAheadExceptionsLogger,
StatsLogger handlerStatsLogger,
StatsLogger readAheadPerStreamStatsLogger,
AlertStatsLogger alertStatsLogger,
AsyncFailureInjector failureInjector,
AsyncNotification notification) {
// Log information
this.fullyQualifiedName = logMetadata.getFullyQualifiedName();
this.conf = conf;
this.dynConf = dynConf;
this.logMetadata = logMetadata;
this.bkLedgerManager = ledgerManager;
this.isHandleForReading = isHandleForReading;
this.notification = notification;
// Resources
this.scheduler = scheduler;
this.handleCache = handleCache;
this.readAheadCache = readAheadCache;
// Readahead status
this.startReadPosition = new LedgerReadPosition(startPosition);
this.nextReadAheadPosition = new LedgerReadPosition(startPosition);
// LogSegments
// Failure Detection
this.failureInjector = failureInjector;
// Tracing
this.metadataLatencyWarnThresholdMillis = conf.getMetadataLatencyWarnThresholdMillis();
this.noLedgerExceptionOnReadLACThreshold =
conf.getReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis() / conf.getReadAheadWaitTime();
this.tracker = new ReadAheadTracker(logMetadata.getLogName(), readAheadCache,
ReadAheadPhase.SCHEDULE_READAHEAD, readAheadPerStreamStatsLogger);
this.resumeStopWatch = Stopwatch.createUnstarted();
// Misc
this.readAheadSkipBrokenEntries = conf.getReadAheadSkipBrokenEntries();
// Stats
this.alertStatsLogger = alertStatsLogger;
this.readAheadPerStreamStatsLogger = readAheadPerStreamStatsLogger;
StatsLogger readAheadStatsLogger = handlerStatsLogger.scope("readahead_worker");
readAheadWorkerWaits = readAheadStatsLogger.getCounter("wait");
readAheadEntryPiggyBackHits = readAheadStatsLogger.getCounter("entry_piggy_back_hits");
readAheadEntryPiggyBackMisses = readAheadStatsLogger.getCounter("entry_piggy_back_misses");
readAheadReadEntriesStat = readAheadStatsLogger.getOpStatsLogger("read_entries");
readAheadReadLACAndEntryCounter = readAheadStatsLogger.getCounter("read_lac_and_entry_counter");
readAheadCacheFullCounter = readAheadStatsLogger.getCounter("cache_full");
readAheadSkippedBrokenEntries = readAheadStatsLogger.getCounter("skipped_broken_entries");
readAheadCacheResumeStat = readAheadStatsLogger.getOpStatsLogger("resume");
readAheadLacLagStats = readAheadStatsLogger.getOpStatsLogger("read_lac_lag");
longPollInterruptionStat = readAheadStatsLogger.getOpStatsLogger("long_poll_interruption");
notificationExecutionStat = readAheadStatsLogger.getOpStatsLogger("notification_execution");
metadataReinitializationStat = readAheadStatsLogger.getOpStatsLogger("metadata_reinitialization");
idleReaderWarn = readAheadStatsLogger.getCounter("idle_reader_warn");
this.readAheadExceptionsLogger = readAheadExceptionsLogger;
}
@VisibleForTesting
public LedgerReadPosition getNextReadAheadPosition() {
return nextReadAheadPosition;
}
public LedgerDescriptor getCurrentLedgerDescriptor() {
return currentLH;
}
//
// ReadAhead Status
//
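/**
* Mark the readahead as errored: record the ERROR phase, notify the reader of the failure
* and complete the stop promise so {@link #asyncClose()} does not block.
*/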
void setReadAheadError(ReadAheadTracker tracker, Throwable cause) {
LOG.error("Read Ahead for {} is set to error.", logMetadata.getFullyQualifiedName());
readAheadError = true;
tracker.enterPhase(ReadAheadPhase.ERROR);
if (null != notification) {
notification.notifyOnError(cause);
}
if (null != stopPromise) {
FutureUtils.setValue(stopPromise, null);
}
}
void setReadAheadInterrupted(ReadAheadTracker tracker) {
readAheadInterrupted = true;
tracker.enterPhase(ReadAheadPhase.INTERRUPTED);
if (null != notification) {
notification.notifyOnError(new DLInterruptedException("ReadAhead worker for "
+ bkLedgerManager.getFullyQualifiedName() + " is interrupted."));
}
if (null != stopPromise) {
FutureUtils.setValue(stopPromise, null);
}
}
void setReadingFromTruncated(ReadAheadTracker tracker) {
readingFromTruncated = true;
tracker.enterPhase(ReadAheadPhase.TRUNCATED);
if (null != notification) {
notification.notifyOnError(
new AlreadyTruncatedTransactionException(logMetadata.getFullyQualifiedName()
+ ": Trying to position read ahead to a segment that is marked truncated"));
}
if (null != stopPromise) {
FutureUtils.setValue(stopPromise, null);
}
}
private void setReadAheadStopped() {
tracker.enterPhase(ReadAheadPhase.STOPPED);
if (null != stopPromise) {
FutureUtils.setValue(stopPromise, null);
}
LOG.info("Stopped ReadAheadWorker for {}", fullyQualifiedName);
}
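/**
* Throw the exception corresponding to the current readahead failure state, if any:
* log deleted, reading from a truncated segment, interrupted, or a generic read error.
*/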
public void checkClosedOrInError()
throws LogNotFoundException, LogReadException, DLInterruptedException,
AlreadyTruncatedTransactionException {
if (logDeleted) {
throw new LogNotFoundException(logMetadata.getFullyQualifiedName() + " is already deleted.");
} else if (readingFromTruncated) {
throw new AlreadyTruncatedTransactionException(
String.format("%s: Trying to position read ahead a segment that is marked truncated",
logMetadata.getFullyQualifiedName()));
} else if (readAheadInterrupted) {
throw new DLInterruptedException(String.format("%s: ReadAhead Thread was interrupted",
logMetadata.getFullyQualifiedName()));
} else if (readAheadError) {
throw new LogReadException(String.format("%s: ReadAhead Thread encountered exceptions",
logMetadata.getFullyQualifiedName()));
}
}
public boolean isCaughtUp() {
return !isCatchingUp;
}
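/**
* Start the readahead worker with the initial list of log segments and kick off the
* first schedule phase.
*/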
public void start(List<LogSegmentMetadata> segmentList) {
LOG.debug("Starting ReadAhead Worker for {} : segments = {}",
fullyQualifiedName, segmentList);
running = true;
logSegmentListNotified = segmentList;
schedulePhase.process(BKException.Code.OK);
}
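// Stop the worker: unregister gauges, interrupt any outstanding long poll via the pending
// metadata notification, and wait (bounded by twice the readahead wait time) for the worker
// to settle on the stop promise.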
@Override
public Future<Void> asyncClose() {
LOG.info("Stopping Readahead worker for {}", fullyQualifiedName);
running = false;
// Unregister associated gauges to prevent GC spiral
this.tracker.unregisterGauge();
// Aside from unfortunate naming of variables, this allows
// the currently active long poll to be interrupted and completed
AsyncNotification notification;
synchronized (notificationLock) {
notification = metadataNotification;
metadataNotification = null;
}
if (null != notification) {
notification.notifyOnOperationComplete();
}
if (null == stopPromise) {
return Future.Void();
}
return FutureUtils.ignore(FutureUtils.within(
stopPromise,
2 * conf.getReadAheadWaitTime(),
TimeUnit.MILLISECONDS,
new TimeoutException("Timeout on waiting for ReadAhead worker to stop " + fullyQualifiedName),
scheduler,
fullyQualifiedName));
}
@Override
public String toString() {
return "Running:" + running +
", NextReadAheadPosition:" + nextReadAheadPosition +
", BKZKExceptions:" + bkcZkExceptions.get() +
", BKUnexpectedExceptions:" + bkcUnExpectedExceptions.get() +
", EncounteredException:" + encounteredException +
", readAheadError:" + readAheadError +
", readAheadInterrupted:" + readAheadInterrupted +
", CurrentMetadata:" + ((null != currentMetadata) ? currentMetadata : "NONE") +
", FailureInjector:" + failureInjector;
}
@Override
public void resumeReadAhead() {
try {
long cacheResumeLatency = resumeStopWatch.stop().elapsed(TimeUnit.MICROSECONDS);
readAheadCacheResumeStat.registerSuccessfulEvent(cacheResumeLatency);
} catch (IllegalStateException ise) {
LOG.error("Encountered illegal state when stopping resume stop watch for {} : ",
logMetadata.getFullyQualifiedName(), ise);
}
submit(this);
}
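// Wrap a runnable so that any RuntimeException thrown during readahead marks the worker
// as errored before the exception propagates to the scheduler.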
Runnable addRTEHandler(final Runnable runnable) {
return new Runnable() {
@Override
public void run() {
try {
runnable.run();
} catch (RuntimeException rte) {
LOG.error("ReadAhead on stream {} encountered runtime exception",
logMetadata.getFullyQualifiedName(), rte);
setReadAheadError(tracker, rte);
throw rte;
}
}
};
}
<T> Function1<T, BoxedUnit> submit(final Function1<T, BoxedUnit> function) {
return new AbstractFunction1<T, BoxedUnit>() {
@Override
public BoxedUnit apply(final T input) {
submit(new Runnable() {
@Override
public void run() {
function.apply(input);
}
});
return BoxedUnit.UNIT;
}
};
}
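// Submit a readahead task to the scheduler, honoring the failure injector: injected stops
// silently drop the task (stalling readahead), injected delays reschedule it, and a rejected
// execution marks the worker as errored.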
void submit(Runnable runnable) {
if (failureInjector.shouldInjectStops()) {
LOG.warn("Error injected: read ahead for stream {} is going to stall.",
logMetadata.getFullyQualifiedName());
return;
}
if (failureInjector.shouldInjectDelays()) {
int delayMs = failureInjector.getInjectedDelayMs();
schedule(runnable, delayMs);
return;
}
try {
scheduler.submit(addRTEHandler(runnable));
} catch (RejectedExecutionException ree) {
setReadAheadError(tracker, ree);
}
}
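// Schedule a readahead task with a delay, registering it as the pending metadata notification
// so a log segment change can wake it up early. If metadata already needs reinitialization,
// the task is executed immediately instead.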
private void schedule(Runnable runnable, long timeInMillis) {
try {
InterruptibleScheduledRunnable task = new InterruptibleScheduledRunnable(runnable);
boolean executeImmediately = setMetadataNotification(task);
if (executeImmediately) {
scheduler.submit(addRTEHandler(task));
return;
}
scheduler.schedule(addRTEHandler(task), timeInMillis, TimeUnit.MILLISECONDS);
readAheadWorkerWaits.inc();
} catch (RejectedExecutionException ree) {
setReadAheadError(tracker, ree);
}
}
private void handleException(ReadAheadPhase phase, int returnCode) {
readAheadExceptionsLogger.getBKExceptionStatsLogger(phase.name()).getExceptionCounter(returnCode).inc();
exceptionHandler.process(returnCode);
}
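/**
* Close the ledger handle currently held by the worker.
*
* @return true if there was no handle to close or it was closed successfully; false if a
*         BKException occurred, in which case the exception handling phase takes over.
*/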
private boolean closeCurrentLedgerHandle() {
if (currentLH == null) {
return true;
}
boolean retVal = false;
LedgerDescriptor ld = currentLH;
try {
handleCache.closeLedger(ld);
currentLH = null;
retVal = true;
} catch (BKException bke) {
LOG.debug("BK Exception during closing {} : ", ld, bke);
handleException(ReadAheadPhase.CLOSE_LEDGER, bke.getCode());
}
return retVal;
}
abstract class Phase {
final Phase next;
Phase(Phase next) {
this.next = next;
}
abstract void process(int rc);
}
/**
* Schedule the next readahead request. If we need to back off, schedule with a backoff delay.
*/
final class ScheduleReadAheadPhase extends Phase {
ScheduleReadAheadPhase() {
super(null);
}
@Override
void process(int rc) {
if (!running) {
setReadAheadStopped();
return;
}
tracker.enterPhase(ReadAheadPhase.SCHEDULE_READAHEAD);
boolean injectErrors = failureInjector.shouldInjectErrors();
if (encounteredException || injectErrors) {
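// The worker backs off readAheadWaitTime / 4 ms between retries, so this threshold is the
// number of retries that fit into BKC_ZK_EXCEPTION_THRESHOLD_IN_SECONDS.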
int zkErrorThreshold = BKC_ZK_EXCEPTION_THRESHOLD_IN_SECONDS * 1000 * 4 / conf.getReadAheadWaitTime();
if ((bkcZkExceptions.get() > zkErrorThreshold) || injectErrors) {
LOG.error("{} : BookKeeper Client used by the ReadAhead Thread has encountered {} zookeeper exceptions : simulate = {}",
new Object[] { fullyQualifiedName, bkcZkExceptions.get(), injectErrors });
running = false;
setReadAheadError(tracker, new LogReadException(
"Encountered too many zookeeper issues on read ahead for " + bkLedgerManager.getFullyQualifiedName()));
} else if (bkcUnExpectedExceptions.get() > BKC_UNEXPECTED_EXCEPTION_THRESHOLD) {
LOG.error("{} : ReadAhead Thread has encountered {} unexpected BK exceptions.",
fullyQualifiedName, bkcUnExpectedExceptions.get());
running = false;
setReadAheadError(tracker, new LogReadException(
"Encountered too many unexpected bookkeeper issues on read ahead for " + bkLedgerManager.getFullyQualifiedName()));
} else {
// We must always reinitialize metadata if the last attempt to read failed.
reInitializeMetadata = true;
encounteredException = false;
// Backoff before resuming
if (LOG.isTraceEnabled()) {
LOG.trace("Scheduling read ahead for {} after {} ms.", fullyQualifiedName, conf.getReadAheadWaitTime() / 4);
}
schedule(ReadAheadWorker.this, conf.getReadAheadWaitTime() / 4);
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Scheduling read ahead for {} now.", fullyQualifiedName);
}
submit(ReadAheadWorker.this);
}
}
}
/**
* Phase on handling exceptions.
*/
final class ExceptionHandlePhase extends Phase {
ExceptionHandlePhase(Phase next) {
super(next);
}
@Override
void process(int rc) {
tracker.enterPhase(ReadAheadPhase.EXCEPTION_HANDLING);
if (BKException.Code.InterruptedException == rc) {
LOG.trace("ReadAhead Worker for {} is interrupted.", fullyQualifiedName);
running = false;
setReadAheadInterrupted(tracker);
return;
} else if (BKException.Code.ZKException == rc) {
encounteredException = true;
int numExceptions = bkcZkExceptions.incrementAndGet();
LOG.debug("ReadAhead Worker for {} encountered zookeeper exception : total exceptions are {}.",
fullyQualifiedName, numExceptions);
} else if (BKException.Code.OK != rc) {
encounteredException = true;
switch(rc) {
case BKException.Code.NoSuchEntryException:
case BKException.Code.LedgerRecoveryException:
case BKException.Code.NoSuchLedgerExistsException:
break;
default:
bkcUnExpectedExceptions.incrementAndGet();
}
LOG.info("ReadAhead Worker for {} encountered exception : {}",
fullyQualifiedName, BKException.create(rc));
}
// schedule next read ahead
next.process(BKException.Code.OK);
}
}
/**
* A phase that can be stopped via the stop promise.
*/
final class StoppablePhase extends Phase {
StoppablePhase(Phase next) {
super(next);
}
@Override
void process(int rc) {
if (!running) {
setReadAheadStopped();
return;
}
if (null == stopPromise) {
stopPromise = new Promise<Void>();
}
// proceed with the readahead request
next.process(BKException.Code.OK);
}
}
/**
* Phase on checking in progress changed.
*/
final class CheckInProgressChangedPhase extends Phase
implements FutureEventListener<Versioned<List<LogSegmentMetadata>>> {
CheckInProgressChangedPhase(Phase next) {
super(next);
}
void processLogSegments(final List<LogSegmentMetadata> segments) {
// submit callback execution to dlg executor to avoid deadlock.
submit(new Runnable() {
@Override
public void run() {
logSegmentList = segments;
boolean isInitialPositioning = nextReadAheadPosition.definitelyLessThanOrEqualTo(startReadPosition);
for (int i = 0; i < logSegmentList.size(); i++) {
LogSegmentMetadata l = logSegmentList.get(i);
// By default we should skip truncated segments during initial positioning
if (l.isTruncated() &&
isInitialPositioning &&
!conf.getIgnoreTruncationStatus()) {
continue;
}
DLSN nextReadDLSN = new DLSN(nextReadAheadPosition.getLogSegmentSequenceNumber(),
nextReadAheadPosition.getEntryId(), -1);
// next read position still inside a log segment
final boolean hasDataToRead = (l.getLastDLSN().compareTo(nextReadDLSN) >= 0);
// either there is data to read in the current log segment, or we are moving over a log segment that
// is still in progress or was in progress; we have to check (or maybe close) this log segment.
final boolean checkOrCloseLedger = hasDataToRead ||
// next read position move over a log segment, if l is still inprogress or it was inprogress
((l.isInProgress() || (null != currentMetadata && currentMetadata.isInProgress())) &&
l.getLogSegmentSequenceNumber() == nextReadAheadPosition.getLogSegmentSequenceNumber());
// If we are positioning on a partially truncated log segment then the truncation point should
// be before the nextReadPosition
if (l.isPartiallyTruncated() &&
!isInitialPositioning &&
(l.getMinActiveDLSN().compareTo(nextReadDLSN) > 0)) {
if (conf.getAlertWhenPositioningOnTruncated()) {
alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated",
nextReadAheadPosition, l);
}
if (!conf.getIgnoreTruncationStatus()) {
LOG.error("{}: Trying to position reader on {} when {} is marked partially truncated",
new Object[]{ logMetadata.getFullyQualifiedName(), nextReadAheadPosition, l});
setReadingFromTruncated(tracker);
return;
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("CheckLogSegment : newMetadata = {}, currentMetadata = {}, nextReadAheadPosition = {}",
new Object[] { l, currentMetadata, nextReadAheadPosition});
}
if (checkOrCloseLedger) {
long startBKEntry = 0;
if (l.isPartiallyTruncated() && !conf.getIgnoreTruncationStatus()) {
startBKEntry = l.getMinActiveDLSN().getEntryId();
}
if (l.getLogSegmentSequenceNumber() == nextReadAheadPosition.getLogSegmentSequenceNumber()) {
startBKEntry = Math.max(startBKEntry, nextReadAheadPosition.getEntryId());
if (currentMetadata != null) {
inProgressChanged = currentMetadata.isInProgress() && !l.isInProgress();
}
} else {
// We are positioning on a new ledger => reset the current ledger handle
LOG.trace("Positioning {} on a new ledger {}", fullyQualifiedName, l);
if (!closeCurrentLedgerHandle()) {
return;
}
}
nextReadAheadPosition = new LedgerReadPosition(l.getLedgerId(), l.getLogSegmentSequenceNumber(), startBKEntry);
if (conf.getTraceReadAheadMetadataChanges()) {
LOG.info("Moved read position to {} for stream {} at {}.",
new Object[] {nextReadAheadPosition, logMetadata.getFullyQualifiedName(), System.currentTimeMillis() });
}
if (l.isTruncated()) {
if (conf.getAlertWhenPositioningOnTruncated()) {
alertStatsLogger.raise("Trying to position reader on {} when {} is marked truncated",
nextReadAheadPosition, l);
}
if (!conf.getIgnoreTruncationStatus()) {
LOG.error("{}: Trying to position reader on {} when {} is marked truncated",
new Object[]{ logMetadata.getFullyQualifiedName(), nextReadAheadPosition, l});
setReadingFromTruncated(tracker);
return;
}
}
currentMetadata = l;
currentMetadataIndex = i;
break;
}
// Handle multiple in-progress segments => stop at the first in-progress segment
if (l.isInProgress()) {
break;
}
}
if (null == currentMetadata) {
if (isCatchingUp) {
isCatchingUp = false;
if (isHandleForReading) {
LOG.info("{} caught up at {}: position = {} and no log segment to position on at this point.",
new Object[] { fullyQualifiedName, System.currentTimeMillis(), nextReadAheadPosition });
}
}
schedule(ReadAheadWorker.this, conf.getReadAheadWaitTimeOnEndOfStream());
if (LOG.isDebugEnabled()) {
LOG.debug("No log segment to position on for {}. Backing off for {} millseconds",
fullyQualifiedName, conf.getReadAheadWaitTimeOnEndOfStream());
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Initialized metadata for {}, starting reading ahead from {} : {}.",
new Object[] { fullyQualifiedName, currentMetadataIndex, currentMetadata });
}
next.process(BKException.Code.OK);
}
// Once we have read the ledger list for the first time, subsequent segments
// should be read in a streaming manner and we should get the new ledgers as
// they close in ZK through watchers.
// So let's start observing the latency
bkLedgerManager.reportGetSegmentStats(true);
}
});
}
@Override
void process(int rc) {
if (!running) {
setReadAheadStopped();
return;
}
tracker.enterPhase(ReadAheadPhase.GET_LEDGERS);
inProgressChanged = false;
if (LOG.isTraceEnabled()) {
LOG.trace("Checking {} if InProgress changed.", fullyQualifiedName);
}
if (reInitializeMetadata || null == currentMetadata) {
reInitializeMetadata = false;
if (LOG.isTraceEnabled()) {
LOG.trace("Reinitializing metadata for {}.", fullyQualifiedName);
}
if (metadataNotificationTimeMillis > 0) {
long metadataReinitializeTimeMillis = System.currentTimeMillis();
long elapsedMillisSinceMetadataChanged = metadataReinitializeTimeMillis - metadataNotificationTimeMillis;
if (elapsedMillisSinceMetadataChanged >= metadataLatencyWarnThresholdMillis) {
LOG.warn("{} reinitialize metadata at {}, which is {} millis after receiving notification at {}.",
new Object[] { logMetadata.getFullyQualifiedName(), metadataReinitializeTimeMillis,
elapsedMillisSinceMetadataChanged, metadataNotificationTimeMillis});
}
metadataReinitializationStat.registerSuccessfulEvent(
TimeUnit.MILLISECONDS.toMicros(elapsedMillisSinceMetadataChanged));
metadataNotificationTimeMillis = -1L;
}
if (forceReadLogSegments) {
forceReadLogSegments = false;
bkLedgerManager.readLogSegmentsFromStore(
LogSegmentMetadata.COMPARATOR,
LogSegmentFilter.DEFAULT_FILTER,
null
).addEventListener(this);
} else {
processLogSegments(logSegmentListNotified);
}
} else {
next.process(BKException.Code.OK);
}
}
@Override
public void onSuccess(Versioned<List<LogSegmentMetadata>> segments) {
processLogSegments(segments.getValue());
}
@Override
public void onFailure(Throwable cause) {
LOG.info("Encountered metadata exception while reading log segments of {} : {}. Retrying ...",
bkLedgerManager.getFullyQualifiedName(), cause.getMessage());
reInitializeMetadata = true;
forceReadLogSegments = true;
handleException(ReadAheadPhase.GET_LEDGERS, BKException.Code.ZKException);
}
}
final class OpenLedgerPhase extends Phase
implements BookkeeperInternalCallbacks.GenericCallback<LedgerDescriptor>,
AsyncCallback.ReadLastConfirmedAndEntryCallback {
OpenLedgerPhase(Phase next) {
super(next);
}
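// Issue a long poll read of the last add confirmed (and the next entry, if it can be
// piggybacked) on the current ledger, registering the callback as the pending metadata
// notification so a log segment change can complete it early.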
private void issueReadLastConfirmedAndEntry(final boolean parallel,
final long lastAddConfirmed) {
final String ctx = String.format("ReadLastConfirmedAndEntry(%s, %d)", parallel ? "Parallel" : "Sequential", lastAddConfirmed);
final ReadLastConfirmedAndEntryCallbackWithNotification callback =
new ReadLastConfirmedAndEntryCallbackWithNotification(lastAddConfirmed, this, ctx);
boolean callbackImmediately = setMetadataNotification(callback);
handleCache.asyncReadLastConfirmedAndEntry(
currentLH,
nextReadAheadPosition.getEntryId(),
conf.getReadLACLongPollTimeout(),
parallel
).addEventListener(new FutureEventListener<Pair<Long, LedgerEntry>>() {
@Override
public void onSuccess(Pair<Long, LedgerEntry> lacAndEntry) {
callback.readLastConfirmedAndEntryComplete(
BKException.Code.OK,
lacAndEntry.getLeft(),
lacAndEntry.getRight(),
ctx);
}
@Override
public void onFailure(Throwable cause) {
callback.readLastConfirmedAndEntryComplete(
FutureUtils.bkResultCode(cause),
lastAddConfirmed,
null,
ctx);
}
});
callback.callbackImmediately(callbackImmediately);
readAheadReadLACAndEntryCounter.inc();
}
@Override
void process(int rc) {
if (!running) {
setReadAheadStopped();
return;
}
tracker.enterPhase(ReadAheadPhase.OPEN_LEDGER);
if (currentMetadata.isInProgress()) { // we don't want to fence the current journal
if (null == currentLH) {
if (conf.getTraceReadAheadMetadataChanges()) {
LOG.info("Opening inprogress ledger of {} for {} at {}.",
new Object[] { currentMetadata, fullyQualifiedName, System.currentTimeMillis() });
}
handleCache.asyncOpenLedger(currentMetadata, false)
.addEventListener(new FutureEventListener<LedgerDescriptor>() {
@Override
public void onSuccess(LedgerDescriptor ld) {
operationComplete(BKException.Code.OK, ld);
}
@Override
public void onFailure(Throwable cause) {
operationComplete(FutureUtils.bkResultCode(cause), null);
}
});
} else {
final long lastAddConfirmed;
try {
lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH);
} catch (BKException ie) {
// Exception is thrown due to no ledger handle
handleException(ReadAheadPhase.OPEN_LEDGER, ie.getCode());
return;
}
if (lastAddConfirmed < nextReadAheadPosition.getEntryId()) {
// This indicates that the currentMetadata is still marked in
// progress while we have already read all the entries. It might
// indicate a failure to detect a metadata change, so we
// should probably force reading log segments if the reader has
// been idle for a while.
if (LACNotAdvancedStopWatch.isRunning()) {
if (LACNotAdvancedStopWatch.elapsed(TimeUnit.MILLISECONDS)
> conf.getReaderIdleWarnThresholdMillis()) {
idleReaderWarn.inc();
LOG.info("{} Ledger {} for inprogress segment {}, reader has been idle for warn threshold {}",
new Object[] { fullyQualifiedName, currentMetadata, currentLH, conf.getReaderIdleWarnThresholdMillis() });
reInitializeMetadata = true;
forceReadLogSegments = true;
}
} else {
LACNotAdvancedStopWatch.reset().start();
if (conf.getTraceReadAheadMetadataChanges()) {
LOG.info("{} Ledger {} for inprogress segment {} closed",
new Object[] { fullyQualifiedName, currentMetadata, currentLH });
}
}
tracker.enterPhase(ReadAheadPhase.READ_LAST_CONFIRMED);
// the readahead is caught up if the current ledger is in progress and the read position has moved past the last add confirmed
if (isCatchingUp) {
isCatchingUp = false;
if (isHandleForReading) {
LOG.info("{} caught up at {}: lac = {}, position = {}.",
new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition });
}
}
LOG.trace("Reading last add confirmed of {} for {}, as read poistion has moved over {} : {}",
new Object[] { currentMetadata, fullyQualifiedName, lastAddConfirmed, nextReadAheadPosition });
if (nextReadAheadPosition.getEntryId() == 0 && conf.getTraceReadAheadMetadataChanges()) {
// we are waiting for first entry to arrive
LOG.info("Reading last add confirmed for {} at {}: lac = {}, position = {}.",
new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition});
} else {
LOG.trace("Reading last add confirmed for {} at {}: lac = {}, position = {}.",
new Object[] { fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition});
}
issueReadLastConfirmedAndEntry(false, lastAddConfirmed);
} else {
next.process(BKException.Code.OK);
}
}
} else {
LACNotAdvancedStopWatch.reset();
if (null != currentLH) {
try {
if (inProgressChanged) {
LOG.trace("Closing completed ledger of {} for {}.", currentMetadata, fullyQualifiedName);
if (!closeCurrentLedgerHandle()) {
return;
}
inProgressChanged = false;
} else {
long lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH);
if (nextReadAheadPosition.getEntryId() > lastAddConfirmed) {
// It's possible that the last entryId does not account for the control
// log record, but the lastAddConfirmed should never fall short of the
// last entry id; otherwise we may be missing an entry
boolean gapDetected = false;
if (lastAddConfirmed < currentMetadata.getLastEntryId()) {
alertStatsLogger.raise("Unexpected last entry id during read ahead; {} , {}",
currentMetadata, lastAddConfirmed);
gapDetected = true;
}
if (conf.getPositionGapDetectionEnabled() && gapDetected) {
setReadAheadError(tracker, new UnexpectedException(
"Unexpected last entry id during read ahead : " + currentMetadata
+ ", lac = " + lastAddConfirmed));
} else {
// This disconnect will only surface during repositioning and
// will not silently miss records; therefore it's safe not to halt
// reading, but we should print a warning for easy diagnosis
if (conf.getTraceReadAheadMetadataChanges() && lastAddConfirmed > (currentMetadata.getLastEntryId() + 1)) {
LOG.warn("Potential Metadata Corruption {} for stream {}, lastAddConfirmed {}",
new Object[] {currentMetadata, logMetadata.getFullyQualifiedName(), lastAddConfirmed});
}
LOG.trace("Past the last Add Confirmed {} in ledger {} for {}",
new Object[] { lastAddConfirmed, currentMetadata, fullyQualifiedName });
if (!closeCurrentLedgerHandle()) {
return;
}
LogSegmentMetadata oldMetadata = currentMetadata;
currentMetadata = null;
if (currentMetadataIndex + 1 < logSegmentList.size()) {
currentMetadata = logSegmentList.get(++currentMetadataIndex);
if (currentMetadata.getLogSegmentSequenceNumber() != (oldMetadata.getLogSegmentSequenceNumber() + 1)) {
// We should never get here: consecutive log segments are expected
// to have consecutive sequence numbers
alertStatsLogger.raise("Unexpected condition during read ahead; {} , {}",
currentMetadata, oldMetadata);
setReadAheadError(tracker, new UnexpectedException(
"Unexpected condition during read ahead : current metadata "
+ currentMetadata + ", old metadata " + oldMetadata));
} else {
if (currentMetadata.isTruncated()) {
if (conf.getAlertWhenPositioningOnTruncated()) {
alertStatsLogger.raise("Trying to position reader on the log segment that is marked truncated : {}",
currentMetadata);
}
if (!conf.getIgnoreTruncationStatus()) {
LOG.error("{}: Trying to position reader on the log segment that is marked truncated : {}",
logMetadata.getFullyQualifiedName(), currentMetadata);
setReadingFromTruncated(tracker);
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Moving read position to a new ledger {} for {}.",
currentMetadata, fullyQualifiedName);
}
nextReadAheadPosition.positionOnNewLogSegment(currentMetadata.getLedgerId(), currentMetadata.getLogSegmentSequenceNumber());
}
}
}
}
}
}
if (!readAheadError) {
next.process(BKException.Code.OK);
}
} catch (BKException bke) {
LOG.debug("Exception while repositioning", bke);
handleException(ReadAheadPhase.CLOSE_LEDGER, bke.getCode());
}
} else {
LOG.trace("Opening completed ledger of {} for {}.", currentMetadata, fullyQualifiedName);
handleCache.asyncOpenLedger(currentMetadata, true)
.addEventListener(new FutureEventListener<LedgerDescriptor>() {
@Override
public void onSuccess(LedgerDescriptor ld) {
operationComplete(BKException.Code.OK, ld);
}
@Override
public void onFailure(Throwable cause) {
operationComplete(FutureUtils.bkResultCode(cause), null);
}
});
}
}
}
@Override
public void operationComplete(final int rc, final LedgerDescriptor result) {
// submit callback execution to dlg executor to avoid deadlock.
submit(new Runnable() {
@Override
public void run() {
if (BKException.Code.OK != rc) {
LOG.debug("BK Exception {} while opening ledger", rc);
handleException(ReadAheadPhase.OPEN_LEDGER, rc);
return;
}
currentLH = result;
if (conf.getTraceReadAheadMetadataChanges()) {
LOG.info("Opened ledger of {} for {} at {}.",
new Object[]{currentMetadata, fullyQualifiedName, System.currentTimeMillis()});
}
bkcZkExceptions.set(0);
bkcUnExpectedExceptions.set(0);
bkcNoLedgerExceptionsOnReadLAC.set(0);
next.process(rc);
}
});
}
/**
* Handle the result of reading last add confirmed.
*
* @param rc
* result of reading last add confirmed
*/
private void handleReadLastConfirmedError(int rc) {
if (BKException.Code.NoSuchLedgerExistsException == rc) {
if (bkcNoLedgerExceptionsOnReadLAC.incrementAndGet() > noLedgerExceptionOnReadLACThreshold) {
LOG.info("{} No entries published to ledger {} yet for {} millis.",
new Object[] { fullyQualifiedName, currentLH, conf.getReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis() });
bkcNoLedgerExceptionsOnReadLAC.set(0);
// set the current ledger handle to null, so it will be re-opened again.
// if the ledger has actually disappeared, re-opening will trigger re-initializing
// log segments when handling openLedger exceptions
if (closeCurrentLedgerHandle()) {
next.process(BKException.Code.OK);
}
return;
} else {
if (LOG.isTraceEnabled()) {
LOG.info("{} No entries published to ledger {} yet. Backoff reading ahead for {} ms.",
new Object[]{fullyQualifiedName, currentLH, conf.getReadAheadWaitTime()});
}
// Backoff before resuming
schedule(ReadAheadWorker.this, conf.getReadAheadWaitTime());
return;
}
} else if (BKException.Code.OK != rc) {
handleException(ReadAheadPhase.READ_LAST_CONFIRMED, rc);
return;
}
}
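// Completion of the long poll read: on success, reset the exception counters and, if an entry
// was piggybacked at the expected read position, cache it and advance the position (a hit);
// otherwise count a miss and fall through to the next phase.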
public void readLastConfirmedAndEntryComplete(final int rc, final long lastConfirmed, final LedgerEntry entry,
final Object ctx) {
// submit callback execution to dlg executor to avoid deadlock.
submit(new Runnable() {
@Override
public void run() {
if (BKException.Code.OK != rc) {
handleReadLastConfirmedError(rc);
return;
}
bkcZkExceptions.set(0);
bkcUnExpectedExceptions.set(0);
bkcNoLedgerExceptionsOnReadLAC.set(0);
if (LOG.isTraceEnabled()) {
try {
LOG.trace("Advancing Last Add Confirmed of {} for {} : {}, {}",
new Object[] { currentMetadata, fullyQualifiedName, lastConfirmed,
handleCache.getLastAddConfirmed(currentLH) });
} catch (BKException exc) {
// Ignore
}
}
if ((null != entry)
&& (nextReadAheadPosition.getEntryId() == entry.getEntryId())
&& (nextReadAheadPosition.getLedgerId() == entry.getLedgerId())) {
if (lastConfirmed <= 4 && conf.getTraceReadAheadMetadataChanges()) {
LOG.info("Hit readLastConfirmedAndEntry for {} at {} : entry = {}, lac = {}, position = {}.",
new Object[] { fullyQualifiedName, System.currentTimeMillis(),
entry.getEntryId(), lastConfirmed, nextReadAheadPosition });
}
if (!isCatchingUp) {
long lacLag = lastConfirmed - nextReadAheadPosition.getEntryId();
if (lacLag > 0) {
readAheadLacLagStats.registerSuccessfulEvent(lacLag);
}
}
nextReadAheadPosition.advance();
readAheadCache.set(new LedgerReadPosition(entry.getLedgerId(), currentLH.getLogSegmentSequenceNo(), entry.getEntryId()),
entry, null != ctx ? ctx.toString() : "",
currentMetadata.getEnvelopeEntries(), currentMetadata.getStartSequenceId());
if (LOG.isTraceEnabled()) {
LOG.trace("Reading the value received {} for {} : entryId {}",
new Object[] { currentMetadata, fullyQualifiedName, entry.getEntryId() });
}
readAheadEntryPiggyBackHits.inc();
} else {
if (lastConfirmed > nextReadAheadPosition.getEntryId()) {
LOG.info("{} : entry {} isn't piggybacked but last add confirmed already moves to {}.",
new Object[] { logMetadata.getFullyQualifiedName(), nextReadAheadPosition.getEntryId(), lastConfirmed });
}
readAheadEntryPiggyBackMisses.inc();
}
next.process(rc);
}
});
}
}
final class ReadEntriesPhase extends Phase implements Runnable {
boolean cacheFull = false;
long lastAddConfirmed = -1;
ReadEntriesPhase(Phase next) {
super(next);
}
@Override
void process(int rc) {
if (!running) {
setReadAheadStopped();
return;
}
tracker.enterPhase(ReadAheadPhase.READ_ENTRIES);
cacheFull = false;
lastAddConfirmed = -1;
if (null != currentLH) {
try {
lastAddConfirmed = handleCache.getLastAddConfirmed(currentLH);
} catch (BKException bke) {
handleException(ReadAheadPhase.READ_LAST_CONFIRMED, bke.getCode());
return;
}
read();
} else {
complete();
}
}
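// Read the next batch of entries between nextReadAheadPosition and the last add confirmed,
// bounded by the dynamic readahead batch size; completes when the position catches up to
// the last add confirmed or the cache fills up.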
private void read() {
if (lastAddConfirmed < nextReadAheadPosition.getEntryId()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Nothing to read for {} of {} : lastAddConfirmed = {}, nextReadAheadPosition = {}",
new Object[] { currentMetadata, fullyQualifiedName, lastAddConfirmed, nextReadAheadPosition});
}
complete();
return;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Reading entry {} for {} of {}.",
new Object[] {nextReadAheadPosition, currentMetadata, fullyQualifiedName });
}
int readAheadBatchSize = dynConf.getReadAheadBatchSize();
final long startEntryId = nextReadAheadPosition.getEntryId();
final long endEntryId = Math.min(lastAddConfirmed, (nextReadAheadPosition.getEntryId() + readAheadBatchSize - 1));
if (endEntryId <= readAheadBatchSize && conf.getTraceReadAheadMetadataChanges()) {
// trace first read batch
LOG.info("Reading entries ({} - {}) for {} at {} : lac = {}, nextReadAheadPosition = {}.",
new Object[] { startEntryId, endEntryId, fullyQualifiedName, System.currentTimeMillis(), lastAddConfirmed, nextReadAheadPosition});
}
final String readCtx = String.format("ReadEntries(%d-%d)", startEntryId, endEntryId);
handleCache.asyncReadEntries(currentLH, startEntryId, endEntryId)
.addEventListener(new FutureEventListener<Enumeration<LedgerEntry>>() {
@Override
public void onSuccess(Enumeration<LedgerEntry> entries) {
int rc = BKException.Code.OK;
// If the range includes an entry id that is a multiple of 10, simulate corruption.
if (failureInjector.shouldInjectCorruption() && rangeContainsSimulatedBrokenEntry(startEntryId, endEntryId)) {
rc = BKException.Code.DigestMatchException;
}
readComplete(rc, null, entries, readCtx, startEntryId, endEntryId);
}
@Override
public void onFailure(Throwable cause) {
readComplete(FutureUtils.bkResultCode(cause), null, null, readCtx, startEntryId, endEntryId);
}
});
}
private boolean rangeContainsSimulatedBrokenEntry(long start, long end) {
for (long i = start; i <= end; i++) {
if (i % 10 == 0) {
return true;
}
}
return false;
}
public void readComplete(final int rc, final LedgerHandle lh,
final Enumeration<LedgerEntry> seq, final Object ctx,
final long startEntryId, final long endEntryId) {
// submit callback execution to dlg executor to avoid deadlock.
submit(new Runnable() {
@Override
public void run() {
// If readAheadSkipBrokenEntries is enabled and we hit a corrupt entry, log and
// stat the issue and move forward.
if (BKException.Code.DigestMatchException == rc && readAheadSkipBrokenEntries) {
readAheadReadEntriesStat.registerFailedEvent(0);
LOG.error("BK DigestMatchException while reading entries {}-{} in stream {}, entry {} discarded",
new Object[] { startEntryId, endEntryId, fullyQualifiedName, startEntryId });
bkcZkExceptions.set(0);
bkcUnExpectedExceptions.set(0);
readAheadSkippedBrokenEntries.inc();
nextReadAheadPosition.advance();
} else if (BKException.Code.OK != rc) {
readAheadReadEntriesStat.registerFailedEvent(0);
LOG.debug("BK Exception {} while reading entry", rc);
handleException(ReadAheadPhase.READ_ENTRIES, rc);
return;
} else {
int numReads = 0;
while (seq.hasMoreElements()) {
bkcZkExceptions.set(0);
bkcUnExpectedExceptions.set(0);
nextReadAheadPosition.advance();
LedgerEntry e = seq.nextElement();
LedgerReadPosition readPosition = new LedgerReadPosition(e.getLedgerId(), currentMetadata.getLogSegmentSequenceNumber(), e.getEntryId());
readAheadCache.set(readPosition, e, null != ctx ? ctx.toString() : "",
currentMetadata.getEnvelopeEntries(), currentMetadata.getStartSequenceId());
++numReads;
if (LOG.isDebugEnabled()) {
LOG.debug("Read entry {} of {}.", readPosition, fullyQualifiedName);
}
}
readAheadReadEntriesStat.registerSuccessfulEvent(numReads);
}
if (readAheadCache.isCacheFull()) {
cacheFull = true;
complete();
} else {
read();
}
}
});
}
private void complete() {
if (cacheFull) {
LOG.trace("Cache for {} is full. Backoff reading until notified", fullyQualifiedName);
readAheadCacheFullCounter.inc();
resumeStopWatch.reset().start();
stopPromise = null;
readAheadCache.setReadAheadCallback(ReadAheadWorker.this);
} else {
run();
}
}
@Override
public void run() {
next.process(BKException.Code.OK);
}
}
@Override
public void run() {
if (!running) {
setReadAheadStopped();
return;
}
readAheadPhase.process(BKException.Code.OK);
}
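// Log segment listener callback: remember the new segment list, request metadata
// reinitialization and wake up any pending long poll so the change is picked up promptly.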
@Override
public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
AsyncNotification notification;
synchronized (notificationLock) {
logSegmentListNotified = segments;
reInitializeMetadata = true;
LOG.debug("{} Read ahead node changed", fullyQualifiedName);
notification = metadataNotification;
metadataNotification = null;
}
metadataNotificationTimeMillis = System.currentTimeMillis();
if (null != notification) {
notification.notifyOnOperationComplete();
}
}
@Override
public void onLogStreamDeleted() {
logDeleted = true;
setReadAheadError(tracker, new LogNotFoundException("Log stream "
+ bkLedgerManager.getFullyQualifiedName() + " is deleted."));
}
/**
* Set the metadata notification and return a flag indicating whether metadata needs to be
* reinitialized.
*
* @param notification
*          metadata notification
* @return true if metadata needs to be reinitialized, in which case the caller should execute
*         the notification immediately rather than wait for it to be triggered.
*/
private boolean setMetadataNotification(AsyncNotification notification) {
synchronized (notificationLock) {
this.metadataNotification = notification;
return reInitializeMetadata;
}
}
@VisibleForTesting
public AsyncNotification getMetadataNotification() {
synchronized (notificationLock) {
return metadataNotification;
}
}
/**
* A scheduled runnable that can be woken up and executed immediately when a notification arrives.
*
* E.g.
* <p>
* The reader reaches the end of the stream and backs off, scheduling the next read in 2 seconds.
* <br/>
* If a new log segment is created, the reader would otherwise have to wait the full 2 seconds
* before reading entries in the new log segment, so the delivery latency of those entries could
* be up to 2 seconds. With this runnable, the task is executed immediately when the notification
* arrives, waking the reader up from its backoff and reducing the delivery latency.
* </p>
*/
class InterruptibleScheduledRunnable implements AsyncNotification, Runnable {
final Runnable task;
final AtomicBoolean called = new AtomicBoolean(false);
final long startNanos;
InterruptibleScheduledRunnable(Runnable task) {
this.task = task;
this.startNanos = MathUtils.nowInNano();
}
@Override
public void notifyOnError(Throwable t) {
longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startNanos));
execute();
}
@Override
public void notifyOnOperationComplete() {
longPollInterruptionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startNanos));
execute();
}
@Override
public void run() {
if (called.compareAndSet(false, true)) {
task.run();
}
}
void execute() {
if (called.compareAndSet(false, true)) {
submit(task);
}
}
}
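/**
* Base notification used to complete a pending long poll: records execution stats and
* completes the underlying callback, either when a notification arrives or immediately
* if the caller requested so.
*/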
abstract class LongPollNotification<T> implements AsyncNotification {
final long lac;
final T cb;
final Object ctx;
final AtomicBoolean called = new AtomicBoolean(false);
final long startNanos;
LongPollNotification(long lac, T cb, Object ctx) {
this.lac = lac;
this.cb = cb;
this.ctx = ctx;
this.startNanos = MathUtils.nowInNano();
}
void complete(boolean success) {
long startTime = MathUtils.nowInNano();
doComplete(success);
if (success) {
notificationExecutionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startTime));
} else {
notificationExecutionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startTime));
}
}
abstract void doComplete(boolean success);
@Override
public void notifyOnError(Throwable cause) {
longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(startNanos));
complete(false);
}
@Override
public void notifyOnOperationComplete() {
longPollInterruptionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(startNanos));
complete(true);
}
void callbackImmediately(boolean immediate) {
if (immediate) {
complete(true);
}
}
}
class ReadLastConfirmedAndEntryCallbackWithNotification
extends LongPollNotification<AsyncCallback.ReadLastConfirmedAndEntryCallback>
implements AsyncCallback.ReadLastConfirmedAndEntryCallback {
ReadLastConfirmedAndEntryCallbackWithNotification(
long lac, AsyncCallback.ReadLastConfirmedAndEntryCallback cb, Object ctx) {
super(lac, cb, ctx);
}
@Override
public void readLastConfirmedAndEntryComplete(int rc, long lac, LedgerEntry entry, Object ctx) {
if (called.compareAndSet(false, true)) {
// clear the notification when the callback is invoked
synchronized (notificationLock) {
metadataNotification = null;
}
this.cb.readLastConfirmedAndEntryComplete(rc, lac, entry, ctx);
}
}
@Override
void doComplete(boolean success) {
readLastConfirmedAndEntryComplete(BKException.Code.OK, lac, null, ctx);
}
}
}