| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.distributedlog; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Optional; |
| import com.google.common.base.Stopwatch; |
| import com.google.common.base.Ticker; |
| import org.apache.distributedlog.exceptions.DLIllegalStateException; |
| import org.apache.distributedlog.exceptions.DLInterruptedException; |
| import org.apache.distributedlog.exceptions.EndOfStreamException; |
| import org.apache.distributedlog.exceptions.IdleReaderException; |
| import org.apache.distributedlog.exceptions.LogNotFoundException; |
| import org.apache.distributedlog.exceptions.ReadCancelledException; |
| import org.apache.distributedlog.exceptions.UnexpectedException; |
| import org.apache.distributedlog.util.OrderedScheduler; |
| import org.apache.distributedlog.util.Utils; |
| import com.twitter.util.Future; |
| import com.twitter.util.FutureEventListener; |
| import com.twitter.util.Promise; |
| import com.twitter.util.Throw; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.bookkeeper.stats.Counter; |
| import org.apache.bookkeeper.stats.OpStatsLogger; |
| import org.apache.bookkeeper.stats.StatsLogger; |
| import org.apache.bookkeeper.versioning.Versioned; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import scala.Function1; |
| import scala.runtime.AbstractFunction1; |
| import scala.runtime.BoxedUnit; |
| |
/**
 * BookKeeper based {@link AsyncLogReader} implementation.
 *
 * <h3>Metrics</h3>
 * All the metrics are exposed under `async_reader`.
 * <ul>
 * <li> `async_reader`/future_set: opstats. time spent on satisfying futures of read requests.
 * if it is high, it means that the caller takes time on processing the result of read requests.
 * The side effect is blocking subsequent reads.
 * <li> `async_reader`/schedule: opstats. time spent on scheduling next reads.
 * <li> `async_reader`/background_read: opstats. time spent on background reads.
 * <li> `async_reader`/read_next_exec: opstats. time spent on executing {@link #readNext()}.
 * <li> `async_reader`/time_between_read_next: opstats. time elapsed between two consecutive {@link #readNext()} calls.
 * if it is high, it means that the caller is slow in calling {@link #readNext()}.
 * <li> `async_reader`/delay_until_promise_satisfied: opstats. total latency for the read requests.
 * <li> `async_reader`/idle_reader_error: counter. the number of idle reader errors.
 * </ul>
 */
| class BKAsyncLogReader implements AsyncLogReader, Runnable, AsyncNotification { |
| static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReader.class); |
| |
| private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION = |
| new AbstractFunction1<List<LogRecordWithDLSN>, LogRecordWithDLSN>() { |
| @Override |
| public LogRecordWithDLSN apply(List<LogRecordWithDLSN> records) { |
| return records.get(0); |
| } |
| }; |
| |
| private final String streamName; |
| protected final BKDistributedLogManager bkDistributedLogManager; |
| protected final BKLogReadHandler readHandler; |
| private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>(); |
| private final OrderedScheduler scheduler; |
| private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>(); |
| private final Object scheduleLock = new Object(); |
| private final AtomicLong scheduleCount = new AtomicLong(0); |
| final private Stopwatch scheduleDelayStopwatch; |
| final private Stopwatch readNextDelayStopwatch; |
| private DLSN startDLSN; |
| private ReadAheadEntryReader readAheadReader = null; |
| private int lastPosition = 0; |
| private final boolean positionGapDetectionEnabled; |
| private final int idleErrorThresholdMillis; |
| final ScheduledFuture<?> idleReaderTimeoutTask; |
| private ScheduledFuture<?> backgroundScheduleTask = null; |
| // last process time |
| private final Stopwatch lastProcessTime; |
| |
| protected Promise<Void> closeFuture = null; |
| |
| private boolean lockStream = false; |
| |
| private final boolean returnEndOfStreamRecord; |
| |
| private final Runnable BACKGROUND_READ_SCHEDULER = new Runnable() { |
| @Override |
| public void run() { |
| synchronized (scheduleLock) { |
| backgroundScheduleTask = null; |
| } |
| scheduleBackgroundRead(); |
| } |
| }; |
| |
| // State |
| private Entry.Reader currentEntry = null; |
| private LogRecordWithDLSN nextRecord = null; |
| |
| // Failure Injector |
| private boolean disableProcessingReadRequests = false; |
| |
| // Stats |
| private final OpStatsLogger readNextExecTime; |
| private final OpStatsLogger delayUntilPromiseSatisfied; |
| private final OpStatsLogger timeBetweenReadNexts; |
| private final OpStatsLogger futureSetLatency; |
| private final OpStatsLogger scheduleLatency; |
| private final OpStatsLogger backgroundReaderRunTime; |
| private final Counter idleReaderCheckCount; |
| private final Counter idleReaderCheckIdleReadRequestCount; |
| private final Counter idleReaderCheckIdleReadAheadCount; |
| private final Counter idleReaderError; |
| |
| private class PendingReadRequest { |
| private final Stopwatch enqueueTime; |
| private final int numEntries; |
| private final List<LogRecordWithDLSN> records; |
| private final Promise<List<LogRecordWithDLSN>> promise; |
| private final long deadlineTime; |
| private final TimeUnit deadlineTimeUnit; |
| |
| PendingReadRequest(int numEntries, |
| long deadlineTime, |
| TimeUnit deadlineTimeUnit) { |
| this.numEntries = numEntries; |
| this.enqueueTime = Stopwatch.createStarted(); |
| // optimize the space usage for single read. |
| if (numEntries == 1) { |
| this.records = new ArrayList<LogRecordWithDLSN>(1); |
| } else { |
| this.records = new ArrayList<LogRecordWithDLSN>(); |
| } |
| this.promise = new Promise<List<LogRecordWithDLSN>>(); |
| this.deadlineTime = deadlineTime; |
| this.deadlineTimeUnit = deadlineTimeUnit; |
| } |
| |
| Promise<List<LogRecordWithDLSN>> getPromise() { |
| return promise; |
| } |
| |
| long elapsedSinceEnqueue(TimeUnit timeUnit) { |
| return enqueueTime.elapsed(timeUnit); |
| } |
| |
| void setException(Throwable throwable) { |
| Stopwatch stopwatch = Stopwatch.createStarted(); |
| if (promise.updateIfEmpty(new Throw<List<LogRecordWithDLSN>>(throwable))) { |
| futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); |
| delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS)); |
| } |
| } |
| |
| boolean hasReadRecords() { |
| return records.size() > 0; |
| } |
| |
| boolean hasReadEnoughRecords() { |
| return records.size() >= numEntries; |
| } |
| |
| long getRemainingWaitTime() { |
| if (deadlineTime <= 0L) { |
| return 0L; |
| } |
| return deadlineTime - elapsedSinceEnqueue(deadlineTimeUnit); |
| } |
| |
| void addRecord(LogRecordWithDLSN record) { |
| records.add(record); |
| } |
| |
| void complete() { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size()); |
| } |
| delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS)); |
| Stopwatch stopwatch = Stopwatch.createStarted(); |
| promise.setValue(records); |
| futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); |
| } |
| } |
| |
| BKAsyncLogReader(BKDistributedLogManager bkdlm, |
| OrderedScheduler scheduler, |
| DLSN startDLSN, |
| Optional<String> subscriberId, |
| boolean returnEndOfStreamRecord, |
| StatsLogger statsLogger) { |
| this.streamName = bkdlm.getStreamName(); |
| this.bkDistributedLogManager = bkdlm; |
| this.scheduler = scheduler; |
| this.readHandler = bkDistributedLogManager.createReadHandler(subscriberId, |
| this, true); |
| LOG.debug("Starting async reader at {}", startDLSN); |
| this.startDLSN = startDLSN; |
| this.scheduleDelayStopwatch = Stopwatch.createUnstarted(); |
| this.readNextDelayStopwatch = Stopwatch.createStarted(); |
| this.positionGapDetectionEnabled = bkdlm.getConf().getPositionGapDetectionEnabled(); |
| this.idleErrorThresholdMillis = bkdlm.getConf().getReaderIdleErrorThresholdMillis(); |
| this.returnEndOfStreamRecord = returnEndOfStreamRecord; |
| |
| // Stats |
| StatsLogger asyncReaderStatsLogger = statsLogger.scope("async_reader"); |
| futureSetLatency = asyncReaderStatsLogger.getOpStatsLogger("future_set"); |
| scheduleLatency = asyncReaderStatsLogger.getOpStatsLogger("schedule"); |
| backgroundReaderRunTime = asyncReaderStatsLogger.getOpStatsLogger("background_read"); |
| readNextExecTime = asyncReaderStatsLogger.getOpStatsLogger("read_next_exec"); |
| timeBetweenReadNexts = asyncReaderStatsLogger.getOpStatsLogger("time_between_read_next"); |
| delayUntilPromiseSatisfied = asyncReaderStatsLogger.getOpStatsLogger("delay_until_promise_satisfied"); |
| idleReaderError = asyncReaderStatsLogger.getCounter("idle_reader_error"); |
| idleReaderCheckCount = asyncReaderStatsLogger.getCounter("idle_reader_check_total"); |
| idleReaderCheckIdleReadRequestCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_read_requests"); |
| idleReaderCheckIdleReadAheadCount = asyncReaderStatsLogger.getCounter("idle_reader_check_idle_readahead"); |
| |
| // Lock the stream if requested. The lock will be released when the reader is closed. |
| this.lockStream = false; |
| this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary(); |
| this.lastProcessTime = Stopwatch.createStarted(); |
| } |
| |
| private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() { |
| if (idleErrorThresholdMillis < Integer.MAX_VALUE) { |
| // Dont run the task more than once every seconds (for sanity) |
| long period = Math.max(idleErrorThresholdMillis / 10, 1000); |
| // Except when idle reader threshold is less than a second (tests?) |
| period = Math.min(period, idleErrorThresholdMillis / 5); |
| |
| return scheduler.scheduleAtFixedRate(streamName, new Runnable() { |
| @Override |
| public void run() { |
| PendingReadRequest nextRequest = pendingRequests.peek(); |
| |
| idleReaderCheckCount.inc(); |
| if (null == nextRequest) { |
| return; |
| } |
| |
| idleReaderCheckIdleReadRequestCount.inc(); |
| if (nextRequest.elapsedSinceEnqueue(TimeUnit.MILLISECONDS) < idleErrorThresholdMillis) { |
| return; |
| } |
| |
| ReadAheadEntryReader readAheadReader = getReadAheadReader(); |
| |
| // read request has been idle |
| // - cache has records but read request are idle, |
| // that means notification was missed between readahead and reader. |
| // - cache is empty and readahead is idle (no records added for a long time) |
| idleReaderCheckIdleReadAheadCount.inc(); |
| try { |
| if (null == readAheadReader || (!hasMoreRecords() && |
| readAheadReader.isReaderIdle(idleErrorThresholdMillis, TimeUnit.MILLISECONDS))) { |
| markReaderAsIdle(); |
| return; |
| } else if (lastProcessTime.elapsed(TimeUnit.MILLISECONDS) > idleErrorThresholdMillis) { |
| markReaderAsIdle();; |
| } |
| } catch (IOException e) { |
| setLastException(e); |
| return; |
| } |
| } |
| }, period, period, TimeUnit.MILLISECONDS); |
| } |
| return null; |
| } |
| |
| synchronized ReadAheadEntryReader getReadAheadReader() { |
| return readAheadReader; |
| } |
| |
| void cancelIdleReaderTask() { |
| // Do this after we have checked that the reader was not previously closed |
| try { |
| if (null != idleReaderTimeoutTask) { |
| idleReaderTimeoutTask.cancel(true); |
| } |
| } catch (Exception exc) { |
| LOG.info("{}: Failed to cancel the background idle reader timeout task", readHandler.getFullyQualifiedName()); |
| } |
| } |
| |
| private void markReaderAsIdle() { |
| idleReaderError.inc(); |
| IdleReaderException ire = new IdleReaderException("Reader on stream " |
| + readHandler.getFullyQualifiedName() |
| + " is idle for " + idleErrorThresholdMillis +" ms"); |
| setLastException(ire); |
| // cancel all pending reads directly rather than notifying on error |
| // because idle reader could happen on idle read requests that usually means something wrong |
| // in scheduling reads |
| cancelAllPendingReads(ire); |
| } |
| |
| protected synchronized void setStartDLSN(DLSN fromDLSN) throws UnexpectedException { |
| if (null != readAheadReader) { |
| throw new UnexpectedException("Could't reset from dlsn after reader already starts reading."); |
| } |
| startDLSN = fromDLSN; |
| } |
| |
| @VisibleForTesting |
| public synchronized DLSN getStartDLSN() { |
| return startDLSN; |
| } |
| |
| public Future<Void> lockStream() { |
| this.lockStream = true; |
| return readHandler.lockStream(); |
| } |
| |
| private boolean checkClosedOrInError(String operation) { |
| if (null == lastException.get()) { |
| try { |
| if (null != readHandler && null != getReadAheadReader()) { |
| getReadAheadReader().checkLastException(); |
| } |
| |
| bkDistributedLogManager.checkClosedOrInError(operation); |
| } catch (IOException exc) { |
| setLastException(exc); |
| } |
| } |
| |
| if (lockStream) { |
| try { |
| readHandler.checkReadLock(); |
| } catch (IOException ex) { |
| setLastException(ex); |
| } |
| } |
| |
| if (null != lastException.get()) { |
| LOG.trace("Cancelling pending reads"); |
| cancelAllPendingReads(lastException.get()); |
| return true; |
| } |
| |
| return false; |
| } |
| |
| private void setLastException(IOException exc) { |
| lastException.compareAndSet(null, exc); |
| } |
| |
| @Override |
| public String getStreamName() { |
| return streamName; |
| } |
| |
| /** |
| * @return A promise that when satisfied will contain the Log Record with its DLSN. |
| */ |
| @Override |
| public synchronized Future<LogRecordWithDLSN> readNext() { |
| return readInternal(1, 0, TimeUnit.MILLISECONDS).map(READ_NEXT_MAP_FUNCTION); |
| } |
| |
| public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries) { |
| return readInternal(numEntries, 0, TimeUnit.MILLISECONDS); |
| } |
| |
| @Override |
| public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries, |
| long waitTime, |
| TimeUnit timeUnit) { |
| return readInternal(numEntries, waitTime, timeUnit); |
| } |
| |
| /** |
| * Read up to <i>numEntries</i> entries. The future will be satisfied when any number of entries are |
| * ready (1 to <i>numEntries</i>). |
| * |
| * @param numEntries |
| * num entries to read |
| * @return A promise that satisfied with a non-empty list of log records with their DLSN. |
| */ |
| private synchronized Future<List<LogRecordWithDLSN>> readInternal(int numEntries, |
| long deadlineTime, |
| TimeUnit deadlineTimeUnit) { |
| timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS)); |
| readNextDelayStopwatch.reset().start(); |
| final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit); |
| |
| if (null == readAheadReader) { |
| final ReadAheadEntryReader readAheadEntryReader = this.readAheadReader = new ReadAheadEntryReader( |
| getStreamName(), |
| getStartDLSN(), |
| bkDistributedLogManager.getConf(), |
| readHandler, |
| bkDistributedLogManager.getReaderEntryStore(), |
| bkDistributedLogManager.getScheduler(), |
| Ticker.systemTicker(), |
| bkDistributedLogManager.alertStatsLogger); |
| readHandler.checkLogStreamExists().addEventListener(new FutureEventListener<Void>() { |
| @Override |
| public void onSuccess(Void value) { |
| try { |
| readHandler.registerListener(readAheadEntryReader); |
| readHandler.asyncStartFetchLogSegments() |
| .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() { |
| @Override |
| public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) { |
| readAheadEntryReader.addStateChangeNotification(BKAsyncLogReader.this); |
| readAheadEntryReader.start(logSegments.getValue()); |
| return BoxedUnit.UNIT; |
| } |
| }); |
| } catch (Exception exc) { |
| notifyOnError(exc); |
| } |
| } |
| |
| @Override |
| public void onFailure(Throwable cause) { |
| notifyOnError(cause); |
| } |
| }); |
| } |
| |
| if (checkClosedOrInError("readNext")) { |
| readRequest.setException(lastException.get()); |
| } else { |
| boolean queueEmpty = pendingRequests.isEmpty(); |
| pendingRequests.add(readRequest); |
| |
| if (queueEmpty) { |
| scheduleBackgroundRead(); |
| } |
| } |
| |
| readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS)); |
| readNextDelayStopwatch.reset().start(); |
| |
| return readRequest.getPromise(); |
| } |
| |
| public synchronized void scheduleBackgroundRead() { |
| // if the reader is already closed, we don't need to schedule background read again. |
| if (null != closeFuture) { |
| return; |
| } |
| |
| long prevCount = scheduleCount.getAndIncrement(); |
| if (0 == prevCount) { |
| scheduleDelayStopwatch.reset().start(); |
| scheduler.submit(streamName, this); |
| } |
| } |
| |
| @Override |
| public Future<Void> asyncClose() { |
| // Cancel the idle reader timeout task, interrupting if necessary |
| ReadCancelledException exception; |
| Promise<Void> closePromise; |
| synchronized (this) { |
| if (null != closeFuture) { |
| return closeFuture; |
| } |
| closePromise = closeFuture = new Promise<Void>(); |
| exception = new ReadCancelledException(readHandler.getFullyQualifiedName(), "Reader was closed"); |
| setLastException(exception); |
| } |
| |
| // Do this after we have checked that the reader was not previously closed |
| cancelIdleReaderTask(); |
| |
| synchronized (scheduleLock) { |
| if (null != backgroundScheduleTask) { |
| backgroundScheduleTask.cancel(true); |
| } |
| } |
| |
| cancelAllPendingReads(exception); |
| |
| ReadAheadEntryReader readAheadReader = getReadAheadReader(); |
| if (null != readAheadReader) { |
| readHandler.unregisterListener(readAheadReader); |
| readAheadReader.removeStateChangeNotification(this); |
| } |
| Utils.closeSequence(bkDistributedLogManager.getScheduler(), true, |
| readAheadReader, |
| readHandler |
| ).proxyTo(closePromise); |
| return closePromise; |
| } |
| |
| private void cancelAllPendingReads(Throwable throwExc) { |
| for (PendingReadRequest promise : pendingRequests) { |
| promise.setException(throwExc); |
| } |
| pendingRequests.clear(); |
| } |
| |
| synchronized boolean hasMoreRecords() throws IOException { |
| if (null == readAheadReader) { |
| return false; |
| } |
| if (readAheadReader.getNumCachedEntries() > 0 || null != nextRecord) { |
| return true; |
| } else if (null != currentEntry) { |
| nextRecord = currentEntry.nextRecord(); |
| return null != nextRecord; |
| } |
| return false; |
| } |
| |
| private synchronized LogRecordWithDLSN readNextRecord() throws IOException { |
| if (null == readAheadReader) { |
| return null; |
| } |
| if (null == currentEntry) { |
| currentEntry = readAheadReader.getNextReadAheadEntry(0L, TimeUnit.MILLISECONDS); |
| // no entry after reading from read ahead then return null |
| if (null == currentEntry) { |
| return null; |
| } |
| } |
| |
| LogRecordWithDLSN recordToReturn; |
| if (null == nextRecord) { |
| nextRecord = currentEntry.nextRecord(); |
| // no more records in current entry |
| if (null == nextRecord) { |
| currentEntry = null; |
| return readNextRecord(); |
| } |
| } |
| |
| // found a record to return and prefetch the next one |
| recordToReturn = nextRecord; |
| nextRecord = currentEntry.nextRecord(); |
| return recordToReturn; |
| } |
| |
| @Override |
| public void run() { |
| synchronized(scheduleLock) { |
| if (scheduleDelayStopwatch.isRunning()) { |
| scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); |
| } |
| |
| Stopwatch runTime = Stopwatch.createStarted(); |
| int iterations = 0; |
| long scheduleCountLocal = scheduleCount.get(); |
| LOG.debug("{}: Scheduled Background Reader", readHandler.getFullyQualifiedName()); |
| while(true) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("{}: Executing Iteration: {}", readHandler.getFullyQualifiedName(), iterations++); |
| } |
| |
| PendingReadRequest nextRequest = null; |
| synchronized(this) { |
| nextRequest = pendingRequests.peek(); |
| |
| // Queue is empty, nothing to read, return |
| if (null == nextRequest) { |
| LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName()); |
| scheduleCount.set(0); |
| backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); |
| return; |
| } |
| |
| if (disableProcessingReadRequests) { |
| LOG.info("Reader of {} is forced to stop processing read requests", readHandler.getFullyQualifiedName()); |
| return; |
| } |
| } |
| lastProcessTime.reset().start(); |
| |
| // If the oldest pending promise is interrupted then we must mark |
| // the reader in error and abort all pending reads since we dont |
| // know the last consumed read |
| if (null == lastException.get()) { |
| if (nextRequest.getPromise().isInterrupted().isDefined()) { |
| setLastException(new DLInterruptedException("Interrupted on reading " + readHandler.getFullyQualifiedName() + " : ", |
| nextRequest.getPromise().isInterrupted().get())); |
| } |
| } |
| |
| if (checkClosedOrInError("readNext")) { |
| if (!(lastException.get().getCause() instanceof LogNotFoundException)) { |
| LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get()); |
| } |
| backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); |
| return; |
| } |
| |
| try { |
| // Fail 10% of the requests when asked to simulate errors |
| if (bkDistributedLogManager.getFailureInjector().shouldInjectErrors()) { |
| throw new IOException("Reader Simulated Exception"); |
| } |
| LogRecordWithDLSN record; |
| while (!nextRequest.hasReadEnoughRecords()) { |
| // read single record |
| do { |
| record = readNextRecord(); |
| } while (null != record && (record.isControl() || (record.getDlsn().compareTo(getStartDLSN()) < 0))); |
| if (null == record) { |
| break; |
| } else { |
| if (record.isEndOfStream() && !returnEndOfStreamRecord) { |
| setLastException(new EndOfStreamException("End of Stream Reached for " |
| + readHandler.getFullyQualifiedName())); |
| break; |
| } |
| |
| // gap detection |
| if (recordPositionsContainsGap(record, lastPosition)) { |
| bkDistributedLogManager.raiseAlert("Gap detected between records at record = {}", record); |
| if (positionGapDetectionEnabled) { |
| throw new DLIllegalStateException("Gap detected between records at record = " + record); |
| } |
| } |
| lastPosition = record.getLastPositionWithinLogSegment(); |
| |
| nextRequest.addRecord(record); |
| } |
| }; |
| } catch (IOException exc) { |
| setLastException(exc); |
| if (!(exc instanceof LogNotFoundException)) { |
| LOG.warn("{} : read with skip Exception", readHandler.getFullyQualifiedName(), lastException.get()); |
| } |
| continue; |
| } |
| |
| if (nextRequest.hasReadRecords()) { |
| long remainingWaitTime = nextRequest.getRemainingWaitTime(); |
| if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) { |
| backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); |
| scheduleDelayStopwatch.reset().start(); |
| scheduleCount.set(0); |
| // the request could still wait for more records |
| backgroundScheduleTask = scheduler.schedule( |
| streamName, |
| BACKGROUND_READ_SCHEDULER, |
| remainingWaitTime, |
| nextRequest.deadlineTimeUnit); |
| return; |
| } |
| |
| PendingReadRequest request = pendingRequests.poll(); |
| if (null != request && nextRequest == request) { |
| request.complete(); |
| if (null != backgroundScheduleTask) { |
| backgroundScheduleTask.cancel(true); |
| backgroundScheduleTask = null; |
| } |
| } else { |
| DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at dlsn = " |
| + nextRequest.records.get(0).getDlsn()); |
| nextRequest.setException(ise); |
| if (null != request) { |
| request.setException(ise); |
| } |
| // We should never get here as we should have exited the loop if |
| // pendingRequests were empty |
| bkDistributedLogManager.raiseAlert("Unexpected condition at dlsn = {}", |
| nextRequest.records.get(0).getDlsn()); |
| setLastException(ise); |
| } |
| } else { |
| if (0 == scheduleCountLocal) { |
| LOG.trace("Schedule count dropping to zero", lastException.get()); |
| backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); |
| return; |
| } |
| scheduleCountLocal = scheduleCount.decrementAndGet(); |
| } |
| } |
| } |
| } |
| |
| private boolean recordPositionsContainsGap(LogRecordWithDLSN record, long lastPosition) { |
| final boolean firstLogRecord = (1 == record.getPositionWithinLogSegment()); |
| final boolean endOfStreamRecord = record.isEndOfStream(); |
| final boolean emptyLogSegment = (0 == lastPosition); |
| final boolean positionIncreasedByOne = (record.getPositionWithinLogSegment() == (lastPosition + 1)); |
| |
| return !firstLogRecord && !endOfStreamRecord && !emptyLogSegment && |
| !positionIncreasedByOne; |
| } |
| |
| /** |
| * Triggered when the background activity encounters an exception |
| */ |
| @Override |
| public void notifyOnError(Throwable cause) { |
| if (cause instanceof IOException) { |
| setLastException((IOException) cause); |
| } else { |
| setLastException(new IOException(cause)); |
| } |
| scheduleBackgroundRead(); |
| } |
| |
| /** |
| * Triggered when the background activity completes an operation |
| */ |
| @Override |
| public void notifyOnOperationComplete() { |
| scheduleBackgroundRead(); |
| } |
| |
| @VisibleForTesting |
| void simulateErrors() { |
| bkDistributedLogManager.getFailureInjector().injectErrors(true); |
| } |
| |
| @VisibleForTesting |
| synchronized void disableReadAheadLogSegmentsNotification() { |
| readHandler.disableReadAheadLogSegmentsNotification(); |
| } |
| |
| @VisibleForTesting |
| synchronized void disableProcessingReadRequests() { |
| disableProcessingReadRequests = true; |
| } |
| } |
| |