blob: 51356c99a6b26e5d67856b4d96de8c892f0ac694 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.streamingdispatch;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.client.impl.Backoff;
/**
* Entry reader that fulfill read request by streamline the read instead of reading with micro batch.
*/
@Slf4j
@RequiredArgsConstructor
public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, WaitingEntryCallBack {

    // Maximum number of retries for a failing read request before the whole streaming read is aborted.
    private final int maxRetry = 3;
    // Queue for read request issued yet waiting for complete from managed ledger.
    private ConcurrentLinkedQueue<PendingReadEntryRequest> issuedReads = new ConcurrentLinkedQueue<>();
    // Queue for read request that's wait for new entries from managed ledger.
    private ConcurrentLinkedQueue<PendingReadEntryRequest> pendingReads = new ConcurrentLinkedQueue<>();
    private final ManagedCursorImpl cursor;
    private final StreamingDispatcher dispatcher;
    private final PersistentTopic topic;
    // Bytes delivered to the dispatcher so far for the current streaming read; compared to maxReadSizeByte.
    private AtomicInteger currentReadSizeByte = new AtomicInteger(0);
    private volatile State state;
    private static final AtomicReferenceFieldUpdater<StreamingEntryReader, State> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(StreamingEntryReader.class, State.class, "state");
    private volatile int maxReadSizeByte;
    // Backoff applied between read retries; reduced to half after every successful read completion.
    private final Backoff readFailureBackoff = new Backoff(10, TimeUnit.MILLISECONDS,
            1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);

    /**
     * Read entries in streaming way, that said instead of reading with micro batch and send entries to consumer after
     * all entries in the batch are read from ledger, this method will fire numEntriesToRead requests to managedLedger
     * and send entry to consumer whenever it is read && all entries before it have been sent to consumer.
     * @param numEntriesToRead number of entry to read from ledger.
     * @param maxReadSizeByte maximum byte will be read from ledger.
     * @param ctx Context send along with read request.
     */
    public synchronized void asyncReadEntries(int numEntriesToRead, int maxReadSizeByte, Object ctx) {
        // Complete an in-flight cancellation before scheduling a new read.
        if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
            internalCancelReadRequests();
        }
        if (!issuedReads.isEmpty() || !pendingReads.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] There's pending streaming read not completed yet. Not scheduling next read request.",
                        cursor.getName());
            }
            return;
        }
        PositionImpl nextReadPosition = (PositionImpl) cursor.getReadPosition();
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
        // Edge case, when a old ledger is full and new ledger is not yet opened, position can point to next
        // position of the last confirmed position, but it'll be an invalid position. So try to update the position.
        if (!managedLedger.isValidPosition(nextReadPosition)) {
            nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition);
        }
        boolean hasEntriesToRead = managedLedger.hasMoreEntries(nextReadPosition);
        currentReadSizeByte.set(0);
        STATE_UPDATER.set(this, State.Issued);
        this.maxReadSizeByte = maxReadSizeByte;
        for (int c = 0; c < numEntriesToRead; c++) {
            PendingReadEntryRequest pendingReadEntryRequest = PendingReadEntryRequest.create(ctx, nextReadPosition);
            // Make sure once we start putting request into pending requests queue, we won't put any following request
            // to issued requests queue in order to guarantee the order.
            if (hasEntriesToRead && managedLedger.hasMoreEntries(nextReadPosition)) {
                issuedReads.offer(pendingReadEntryRequest);
            } else {
                // Latch: new entries may arrive concurrently and make hasMoreEntries() true again for a later
                // position; without clearing the flag a later request could land in issuedReads after an earlier
                // one went to pendingReads, delivering entries out of order.
                hasEntriesToRead = false;
                pendingReads.offer(pendingReadEntryRequest);
            }
            nextReadPosition = managedLedger.getNextValidPosition(nextReadPosition);
        }
        // Issue requests.
        for (PendingReadEntryRequest request : issuedReads) {
            managedLedger.asyncReadEntry(request.position, this, request);
        }
        if (!pendingReads.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Streaming entry reader has {} pending read requests waiting on new entry."
                        , cursor.getName(), pendingReads.size());
            }
            // If new entries are available after we put request into pending queue, fire read.
            // Else register callback with managed ledger to get notify when new entries are available.
            if (managedLedger.hasMoreEntries(pendingReads.peek().position)) {
                entriesAvailable();
            } else if (managedLedger.isTerminated()) {
                // Topic is terminated: no further entries will ever arrive; drop the waiters and
                // let consumers know they reached the end of the topic.
                dispatcher.notifyConsumersEndOfTopic();
                cleanQueue(pendingReads);
                if (issuedReads.size() == 0) {
                    dispatcher.canReadMoreEntries(true);
                }
            } else {
                managedLedger.addWaitingEntryCallBack(this);
            }
        }
    }

    @Override
    public void readEntryComplete(Entry entry, Object ctx) {
        // Don't block caller thread, complete read entry with dispatcher dedicated thread.
        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
            internalReadEntryComplete(entry, ctx);
        }));
    }

    /**
     * Handles a completed read on the dispatcher thread. Entries are handed to the dispatcher strictly
     * in issue order: delivery only proceeds while the head of {@code issuedReads} has its entry.
     */
    private void internalReadEntryComplete(Entry entry, Object ctx) {
        PendingReadEntryRequest pendingReadEntryRequest = (PendingReadEntryRequest) ctx;
        pendingReadEntryRequest.entry = entry;
        readFailureBackoff.reduceToHalf();
        Entry readEntry;
        // If we have entry to send to dispatcher.
        if (!issuedReads.isEmpty() && issuedReads.peek() == pendingReadEntryRequest) {
            // Drain every contiguous completed request from the head of the queue.
            while (!issuedReads.isEmpty() && issuedReads.peek().entry != null) {
                PendingReadEntryRequest firstPendingReadEntryRequest = issuedReads.poll();
                readEntry = firstPendingReadEntryRequest.entry;
                currentReadSizeByte.addAndGet(readEntry.getLength());
                //Cancel remaining requests and reset cursor if maxReadSizeByte exceeded.
                if (currentReadSizeByte.get() > maxReadSizeByte) {
                    cancelReadRequests(readEntry.getPosition());
                    dispatcher.canReadMoreEntries(false);
                    STATE_UPDATER.set(this, State.Completed);
                    return;
                } else {
                    // All request has been completed, mark returned entry as last.
                    if (issuedReads.isEmpty() && pendingReads.isEmpty()) {
                        firstPendingReadEntryRequest.isLast = true;
                        STATE_UPDATER.set(this, State.Completed);
                    }
                    dispatcher.readEntryComplete(readEntry, firstPendingReadEntryRequest);
                }
            }
        } else if (!issuedReads.isEmpty() && issuedReads.peek().retry > maxRetry) {
            // Head request exhausted its retries: abort the whole streaming read and let the
            // dispatcher schedule a fresh one.
            cancelReadRequests(issuedReads.peek().position);
            dispatcher.canReadMoreEntries(true);
            STATE_UPDATER.set(this, State.Completed);
        }
    }

    @Override
    public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
        // Don't block caller thread, complete read entry fail with dispatcher dedicated thread.
        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
            internalReadEntryFailed(exception, ctx);
        }));
    }

    /**
     * Handles a failed read on the dispatcher thread: schedules a retry with backoff, or cancels the
     * whole streaming read once the head request has exceeded {@code maxRetry}.
     */
    private void internalReadEntryFailed(ManagedLedgerException exception, Object ctx) {
        PendingReadEntryRequest pendingReadEntryRequest = (PendingReadEntryRequest) ctx;
        PositionImpl readPosition = pendingReadEntryRequest.position;
        pendingReadEntryRequest.retry++;
        long waitTimeMillis = readFailureBackoff.next();
        if (exception.getCause() instanceof TransactionNotSealedException) {
            // Transaction not sealed yet is transient; retry almost immediately.
            waitTimeMillis = 1;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds",
                        cursor.getName(), exception.getMessage(), waitTimeMillis / 1000.0);
            }
        } else if (!(exception instanceof ManagedLedgerException.TooManyRequestsException)) {
            log.error("[{}] Error reading entries at {} : {} - Retrying to read in {} seconds", cursor.getName(),
                    readPosition, exception.getMessage(), waitTimeMillis / 1000.0);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds",
                        cursor.getName(), readPosition, exception.getMessage(), waitTimeMillis / 1000.0);
            }
        }
        if (!issuedReads.isEmpty()) {
            if (issuedReads.peek().retry > maxRetry) {
                cancelReadRequests(issuedReads.peek().position);
                dispatcher.canReadMoreEntries(true);
                STATE_UPDATER.set(this, State.Completed);
                return;
            }
            if (pendingReadEntryRequest.retry <= maxRetry) {
                retryReadRequest(pendingReadEntryRequest, waitTimeMillis);
            }
        }
    }

    // Cancel all issued and pending request and update cursor's read position.
    private void cancelReadRequests(Position position) {
        if (!issuedReads.isEmpty()) {
            cleanQueue(issuedReads);
            // Rewind the cursor so the cancelled range will be re-read by the next streaming read.
            cursor.seek(position);
        }
        if (!pendingReads.isEmpty()) {
            cleanQueue(pendingReads);
        }
    }

    /**
     * Cancels outstanding requests using the earliest outstanding position as the cursor reset point.
     * Both queues may have been drained concurrently by the time this runs on the ordered executor,
     * so guard against empty queues instead of unconditionally dereferencing {@code peek()}.
     */
    private void internalCancelReadRequests() {
        PendingReadEntryRequest firstIssued = issuedReads.peek();
        PendingReadEntryRequest firstPending = pendingReads.peek();
        Position readPosition = firstIssued != null ? firstIssued.position
                : (firstPending != null ? firstPending.position : null);
        if (readPosition != null) {
            cancelReadRequests(readPosition);
        }
    }

    /**
     * Requests cancellation of the current streaming read.
     *
     * @return true if this call initiated the cancellation (state moved Issued -> Canceling),
     *         false if there was no in-flight read to cancel.
     */
    public boolean cancelReadRequests() {
        if (STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
            // Don't block caller thread, complete cancel read with dispatcher dedicated thread.
            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(), SafeRun.safeRun(() -> {
                synchronized (StreamingEntryReader.this) {
                    // Inside the lambda, `this` still refers to the enclosing StreamingEntryReader.
                    if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
                        internalCancelReadRequests();
                    }
                }
            }));
            return true;
        }
        return false;
    }

    // Drains the queue, releasing any entry already read.
    // NOTE(review): requests without an entry are dropped without recycle(); presumably they may still be
    // referenced by an in-flight managed-ledger callback — confirm before recycling them here.
    private void cleanQueue(Queue<PendingReadEntryRequest> queue) {
        while (!queue.isEmpty()) {
            PendingReadEntryRequest pendingReadEntryRequest = queue.poll();
            if (pendingReadEntryRequest.entry != null) {
                pendingReadEntryRequest.entry.release();
                pendingReadEntryRequest.recycle();
            }
        }
    }

    // Re-issues a failed read request after the given delay, hopping back onto the dispatcher thread.
    private void retryReadRequest(PendingReadEntryRequest pendingReadEntryRequest, long delay) {
        topic.getBrokerService().executor().schedule(() -> {
            // Jump again into dispatcher dedicated thread
            topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(),
                    SafeRun.safeRun(() -> {
                        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
                        managedLedger.asyncReadEntry(pendingReadEntryRequest.position, this, pendingReadEntryRequest);
                    }));
        }, delay, TimeUnit.MILLISECONDS);
    }

    @Override
    public void entriesAvailable() {
        // Callback from the managed ledger when new entries are written; process on the dispatcher thread.
        topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
            internalEntriesAvailable();
        }));
    }

    /**
     * Moves pending requests whose position now has entries into the issued queue (preserving order)
     * and fires the corresponding reads; re-registers with the managed ledger if some remain pending.
     */
    private synchronized void internalEntriesAvailable() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Streaming entry reader get notification of newly added entries from managed ledger,"
                    + " trying to issued pending read requests.", cursor.getName());
        }
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
        List<PendingReadEntryRequest> newlyIssuedRequests = new ArrayList<>();
        if (!pendingReads.isEmpty()) {
            // Edge case, when a old ledger is full and new ledger is not yet opened, position can point to next
            // position of the last confirmed position, but it'll be an invalid position. So try to update the position.
            if (!managedLedger.isValidPosition(pendingReads.peek().position)) {
                pendingReads.peek().position = managedLedger.getNextValidPosition(pendingReads.peek().position);
            }
            while (!pendingReads.isEmpty() && managedLedger.hasMoreEntries(pendingReads.peek().position)) {
                PendingReadEntryRequest next = pendingReads.poll();
                issuedReads.offer(next);
                newlyIssuedRequests.add(next);
                // Need to update the position because when the PendingReadEntryRequest is created, the position could
                // be all set to managed ledger's last confirmed position.
                if (!pendingReads.isEmpty()) {
                    pendingReads.peek().position = managedLedger.getNextValidPosition(next.position);
                }
            }
            for (PendingReadEntryRequest request : newlyIssuedRequests) {
                managedLedger.asyncReadEntry(request.position, this, request);
            }
            if (!pendingReads.isEmpty()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Streaming entry reader has {} pending read requests waiting on new entry."
                            , cursor.getName(), pendingReads.size());
                }
                if (managedLedger.hasMoreEntries(pendingReads.peek().position)) {
                    entriesAvailable();
                } else {
                    managedLedger.addWaitingEntryCallBack(this);
                }
            }
        }
    }

    /** @return current state of the streaming read (Issued/Canceling/Canceled/Completed). */
    protected State getState() {
        return STATE_UPDATER.get(this);
    }

    // Lifecycle of a streaming read.
    enum State {
        Issued, Canceling, Canceled, Completed;
    }
}