// blob: 58933e556f0681f8a14de95c64c359dd1341bc3d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.twitter.distributedlog.callback.ReadAheadCallback;
import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
import com.twitter.distributedlog.exceptions.LogReadException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * In-memory cache of the records that readahead has parsed out of ledger entries.
 *
 * <p>Entries are handed in through {@link #set}, which splits them into records and appends the
 * user records to an internal queue. Readers drain the queue with {@link #getNextReadAheadRecord()}.
 * When the number of queued records reaches {@code maxCachedRecords} the cache reports itself as
 * full; a {@link ReadAheadCallback} registered through {@link #setReadAheadCallback} is invoked
 * (once) as soon as the cache is observed to have space again.
 *
 * <p>NOTE(review): {@code lastReadAheadDLSN} and {@code lastReadAheadUserDLSN} are plain
 * (non-volatile) fields that are written while processing entries; presumably a single readahead
 * thread calls {@link #set} -- confirm against the callers.
 */
public class ReadAheadCache {

    static final Logger LOG = LoggerFactory.getLogger(ReadAheadCache.class);

    private final String streamName;
    private final LinkedBlockingQueue<LogRecordWithDLSN> readAheadRecords;
    private final int maxCachedRecords;
    // records whose DLSN is smaller than this value are dropped instead of being cached
    private final AtomicReference<DLSN> minActiveDLSN = new AtomicReference<DLSN>(DLSN.NonInclusiveLowerBound);
    // DLSN of the last record seen (control records included); used for the ordering check
    private DLSN lastReadAheadDLSN = DLSN.InvalidDLSN;
    // DLSN of the last non-control record seen
    private DLSN lastReadAheadUserDLSN = DLSN.InvalidDLSN;
    // last failure hit while processing an entry; once set it is rethrown on every poll
    private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
    private final boolean deserializeRecordSet;

    // callbacks
    private final AsyncNotification notification;
    private ReadAheadCallback readAheadCallback = null;

    // variables for idle reader detection
    private final Stopwatch lastEntryProcessTime;

    // Stats
    private final AtomicLong cacheBytes = new AtomicLong(0);
    private final AlertStatsLogger alertStatsLogger;
    private final StatsLogger statsLogger;
    private final OpStatsLogger readAheadDeliveryLatencyStat;
    private final OpStatsLogger negativeReadAheadDeliveryLatencyStat;

    // Flags on controlling delivery latency stats collection
    private final boolean traceDeliveryLatencyEnabled;
    private volatile boolean suppressDeliveryLatency = true;
    private final long deliveryLatencyWarnThresholdMillis;

    /**
     * Create a readahead cache.
     *
     * @param streamName
     *          name of the stream (used in logs, alerts and {@link #toString()})
     * @param statsLogger
     *          stats logger; stats are registered under its "readahead" scope
     * @param alertStatsLogger
     *          alert logger raised when an invalid enveloped entry is found
     * @param notification
     *          notified after each entry handed to {@link #set} has been processed; may be null
     * @param maxCachedRecords
     *          number of queued records at which the cache is considered full
     * @param deserializeRecordSet
     *          passed through to the entry reader
     * @param traceDeliveryLatencyEnabled
     *          whether delivery latency is recorded (also requires suppression to be turned off)
     * @param deliveryLatencyWarnThresholdMillis
     *          delivery latency above which a warning is logged
     * @param ticker
     *          time source of the idle detection stopwatch
     */
    public ReadAheadCache(String streamName,
                          StatsLogger statsLogger,
                          AlertStatsLogger alertStatsLogger,
                          AsyncNotification notification,
                          int maxCachedRecords,
                          boolean deserializeRecordSet,
                          boolean traceDeliveryLatencyEnabled,
                          long deliveryLatencyWarnThresholdMillis,
                          Ticker ticker) {
        this.streamName = streamName;
        this.maxCachedRecords = maxCachedRecords;
        this.notification = notification;
        this.deserializeRecordSet = deserializeRecordSet;

        // create the readahead queue
        this.readAheadRecords = new LinkedBlockingQueue<LogRecordWithDLSN>();

        // start the idle reader detection
        this.lastEntryProcessTime = Stopwatch.createStarted(ticker);

        // Flags to control delivery latency tracing
        this.traceDeliveryLatencyEnabled = traceDeliveryLatencyEnabled;
        this.deliveryLatencyWarnThresholdMillis = deliveryLatencyWarnThresholdMillis;

        // Stats
        StatsLogger readAheadStatsLogger = statsLogger.scope("readahead");
        this.statsLogger = readAheadStatsLogger;
        this.alertStatsLogger = alertStatsLogger;
        this.readAheadDeliveryLatencyStat =
                readAheadStatsLogger.getOpStatsLogger("delivery_latency");
        this.negativeReadAheadDeliveryLatencyStat =
                readAheadStatsLogger.getOpStatsLogger("negative_delivery_latency");
    }

    /**
     * Return the DLSN of the last non-control record that was processed.
     *
     * @return DLSN of the last user record seen, {@code DLSN.InvalidDLSN} if none was seen yet.
     */
    DLSN getLastReadAheadUserDLSN() {
        return lastReadAheadUserDLSN;
    }

    /**
     * Trigger the registered readahead callback, if any. A callback is invoked at most once per
     * registration.
     */
    private synchronized void invokeReadAheadCallback() {
        final ReadAheadCallback callback = readAheadCallback;
        if (null == callback) {
            return;
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Cache has space, schedule the read ahead");
        }
        // Deregister BEFORE invoking: the monitor is reentrant, so a callback that registers a new
        // callback from within resumeReadAhead() must not have that registration wiped afterwards.
        readAheadCallback = null;
        try {
            callback.resumeReadAhead();
        } catch (RuntimeException re) {
            // a callback whose invocation failed stays registered, unless it has been replaced
            if (null == readAheadCallback) {
                readAheadCallback = callback;
            }
            throw re;
        }
    }

    /**
     * Register a readahead callback. If the cache currently has space, the callback is invoked
     * immediately; otherwise it is kept until a record is polled and the cache has space.
     *
     * @param readAheadCallback
     *          read ahead callback
     */
    public synchronized void setReadAheadCallback(ReadAheadCallback readAheadCallback) {
        this.readAheadCallback = readAheadCallback;
        if (!isCacheFull()) {
            invokeReadAheadCallback();
        }
    }

    private void setLastException(IOException exc) {
        lastException.set(exc);
    }

    /**
     * Poll next record from the readahead queue.
     *
     * @return next record from readahead queue. null if no records available in the queue.
     * @throws IOException
     *          the last exception encountered while processing ledger entries, if any
     */
    public LogRecordWithDLSN getNextReadAheadRecord() throws IOException {
        // read the reference once so the value checked is the value thrown
        final IOException failure = lastException.get();
        if (null != failure) {
            throw failure;
        }

        final LogRecordWithDLSN record = readAheadRecords.poll();
        if (null == record) {
            return null;
        }
        cacheBytes.addAndGet(-record.getPayload().length);
        if (!isCacheFull()) {
            invokeReadAheadCallback();
        }
        return record;
    }

    /**
     * Check whether the readahead has stalled, i.e. no ledger entry has been processed for longer
     * than the given threshold.
     *
     * @param idleReaderErrorThreshold
     *          idle reader error threshold
     * @param timeUnit
     *          time unit of the idle reader error threshold
     * @return true if the readahead has stalled, otherwise false.
     */
    public boolean isReadAheadIdle(int idleReaderErrorThreshold, TimeUnit timeUnit) {
        return lastEntryProcessTime.elapsed(timeUnit) > idleReaderErrorThreshold;
    }

    /**
     * Add a ledger entry to the readahead cache, restart the idle timer and fire the notification.
     *
     * @param key
     *          read position of the entry
     * @param entry
     *          the ledger entry
     * @param reason
     *          the reason to add the entry to readahead (for logging)
     * @param envelopeEntries
     *          whether this entry is an enveloped entry or not
     * @param startSequenceId
     *          the start sequence id
     */
    public void set(LedgerReadPosition key,
                    LedgerEntry entry,
                    String reason,
                    boolean envelopeEntries,
                    long startSequenceId) {
        processNewLedgerEntry(key, entry, reason, envelopeEntries, startSequenceId);
        lastEntryProcessTime.reset().start();
        if (null != notification) {
            notification.notifyOnOperationComplete();
        }
    }

    /**
     * Check whether the cache holds at least {@code maxCachedRecords} records.
     *
     * @return true if the cache is full, otherwise false.
     */
    public boolean isCacheFull() {
        return getNumCachedRecords() >= maxCachedRecords;
    }

    /**
     * Return the number of cached records.
     *
     * @return number of cached records.
     */
    public int getNumCachedRecords() {
        return readAheadRecords.size();
    }

    /**
     * Return the number of cached bytes (sum of the payload lengths of the cached records).
     *
     * @return number of cached bytes.
     */
    public long getNumCachedBytes() {
        return cacheBytes.get();
    }

    /**
     * Turn the suppression of delivery latency stats on or off.
     *
     * @param suppressed
     *          true to stop recording delivery latency, false to allow it
     */
    public void setSuppressDeliveryLatency(boolean suppressed) {
        this.suppressDeliveryLatency = suppressed;
    }

    /**
     * Set the minimum active DLSN; records with a smaller DLSN are not added to the cache.
     *
     * @param minActiveDLSN
     *          minimum active DLSN
     */
    public void setMinActiveDLSN(DLSN minActiveDLSN) {
        this.minActiveDLSN.set(minActiveDLSN);
    }

    /**
     * Process the new ledger entry and propagate the records into readahead queue.
     *
     * <p>Control records and records below the minimum active DLSN are skipped. Any
     * {@link IOException} raised while reading the entry (including out of order records) is
     * remembered and rethrown to the reader by {@link #getNextReadAheadRecord()}.
     *
     * @param readPosition
     *          position of the ledger entry
     * @param ledgerEntry
     *          ledger entry
     * @param reason
     *          reason to add this ledger entry
     * @param envelopeEntries
     *          whether this entry is enveloped
     * @param startSequenceId
     *          the start sequence id of this log segment
     */
    private void processNewLedgerEntry(final LedgerReadPosition readPosition,
                                       final LedgerEntry ledgerEntry,
                                       final String reason,
                                       boolean envelopeEntries,
                                       long startSequenceId) {
        try {
            Entry.Reader reader = Entry.newBuilder()
                    .setLogSegmentInfo(readPosition.getLogSegmentSequenceNumber(), startSequenceId)
                    .setEntryId(ledgerEntry.getEntryId())
                    .setEnvelopeEntry(envelopeEntries)
                    .deserializeRecordSet(deserializeRecordSet)
                    .setInputStream(ledgerEntry.getEntryInputStream())
                    .buildReader();

            LogRecordWithDLSN record;
            while (null != (record = reader.nextRecord())) {
                final DLSN dlsn = record.getDlsn();

                // DLSNs must be strictly increasing across all records, control records included
                if (lastReadAheadDLSN.compareTo(dlsn) >= 0) {
                    LOG.error("Out of order reads last {} : curr {}", lastReadAheadDLSN, dlsn);
                    throw new LogReadException("Out of order reads");
                }
                lastReadAheadDLSN = dlsn;

                if (record.isControl()) {
                    continue;
                }
                lastReadAheadUserDLSN = lastReadAheadDLSN;

                if (minActiveDLSN.get().compareTo(dlsn) > 0) {
                    continue;
                }

                if (traceDeliveryLatencyEnabled && !suppressDeliveryLatency) {
                    traceDeliveryLatency(record, reason);
                }

                readAheadRecords.add(record);
                cacheBytes.addAndGet(record.getPayload().length);
            }
        } catch (InvalidEnvelopedEntryException ieee) {
            alertStatsLogger.raise("Found invalid enveloped entry on stream {} : ", streamName, ieee);
            setLastException(ieee);
        } catch (IOException exc) {
            setLastException(exc);
        }
    }

    /**
     * Record the delivery latency of a record (current wall clock time minus the record's
     * transaction id) and warn if it exceeds the configured threshold.
     */
    private void traceDeliveryLatency(LogRecordWithDLSN record, String reason) {
        long currentMs = System.currentTimeMillis();
        long deliveryMs = currentMs - record.getTransactionId();
        if (deliveryMs >= 0) {
            readAheadDeliveryLatencyStat.registerSuccessfulEvent(deliveryMs);
        } else {
            // negative latencies are tracked separately, as a positive magnitude
            negativeReadAheadDeliveryLatencyStat.registerSuccessfulEvent(-deliveryMs);
        }
        if (deliveryMs > deliveryLatencyWarnThresholdMillis) {
            LOG.warn("Record {} for stream {} took long time to deliver : publish time = {}, available time = {}, delivery time = {}, reason = {}.",
                     new Object[] { record.getDlsn(), streamName, record.getTransactionId(), currentMs, deliveryMs, reason });
        }
    }

    /**
     * Drop all cached records and reset the cached bytes counter.
     */
    public void clear() {
        readAheadRecords.clear();
        cacheBytes.set(0L);
    }

    @Override
    public String toString() {
        return String.format("%s: Cache Bytes: %d, Num Cached Records: %d",
                streamName, cacheBytes.get(), getNumCachedRecords());
    }
}