blob: 7b59c3903d5bc2be750892c0313b97a99185ae52 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A pooled (Netty {@link Recycler}) read operation issued on behalf of a {@link ManagedCursorImpl}.
 *
 * <p>The operation passes itself as the {@link ReadEntriesCallback} to
 * {@code cursor.ledger.asyncReadEntries(...)} and accumulates the filtered entries of each read in
 * {@link #entries}. {@link #checkReadCompletion()} decides after every step whether another read is
 * scheduled or whether the accumulated entries are handed to the user supplied callback.
 *
 * <p>Instances are obtained through {@link #create} and returned to the pool by {@link #recycle()};
 * an instance must not be touched after it has been recycled.
 */
class OpReadEntry implements ReadEntriesCallback {
    ManagedCursorImpl cursor;
    PositionImpl readPosition;
    // Total number of entries requested by the caller of create().
    private int count;
    // User callback notified once the operation completes or fails.
    private ReadEntriesCallback callback;
    Object ctx;

    // Results
    private List<Entry> entries;
    private PositionImpl nextReadPosition;

    PositionImpl maxPosition;
    // Stored for use by collaborators; not consulted anywhere inside this class.
    Predicate<PositionImpl> skipCondition;

    /**
     * Obtains an instance from the recycler and initializes it for a new read.
     *
     * @param cursor          cursor the read is performed for
     * @param readPositionRef position handed to {@code cursor.ledger.startReadOperationOnLedger(...)};
     *                        the value returned by that call becomes {@link #readPosition}
     * @param count           number of entries requested
     * @param callback        callback notified when the operation completes or fails
     * @param ctx             opaque context passed back to {@code callback}
     * @param maxPosition     upper bound used by {@link #checkReadCompletion()}; {@code null} is
     *                        replaced by {@code PositionImpl.LATEST}
     * @param skipCondition   predicate stored on the operation (may be {@code null})
     * @return the initialized operation
     */
    public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
            ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition,
            Predicate<PositionImpl> skipCondition) {
        OpReadEntry op = RECYCLER.get();
        op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
        op.cursor = cursor;
        op.count = count;
        op.callback = callback;
        op.entries = new ArrayList<>();
        if (maxPosition == null) {
            maxPosition = PositionImpl.LATEST;
        }
        op.maxPosition = maxPosition;
        op.skipCondition = skipCondition;
        op.ctx = ctx;
        op.nextReadPosition = PositionImpl.get(op.readPosition);
        return op;
    }

    /**
     * Handles a successful read: updates the cursor read stats, appends the filtered entries to the
     * accumulated result, advances the read position and checks whether the operation is complete.
     *
     * @param returnedEntries entries returned by the read
     * @param ctx             unused by this method
     * @param lastPosition    position to advance from when {@code returnedEntries} is empty; it is
     *                        overwritten with the position of the last returned entry otherwise
     */
    void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, PositionImpl lastPosition) {
        int entriesCount = returnedEntries.size();
        long entriesSize = 0;
        for (Entry entry : returnedEntries) {
            entriesSize += entry.getLength();
        }
        cursor.updateReadStats(entriesCount, entriesSize);

        if (entriesCount != 0) {
            lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
        }
        // NOTE(review): readEntriesComplete() passes a null lastPosition, so an empty
        // returnedEntries list on that path would dereference null below - confirm callers
        // never deliver an empty list through that entry point.

        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Read entries succeeded batch_size={} cumulative_size={} requested_count={}",
                    cursor.ledger.getName(), cursor.getName(), returnedEntries.size(), entries.size(), count);
        }

        // Filter the returned entries for individual deleted messages
        List<Entry> filteredEntries = Collections.emptyList();
        if (entriesCount != 0) {
            filteredEntries = cursor.filterReadEntries(returnedEntries);
            entries.addAll(filteredEntries);
        }

        // if entries have been filtered out then try to skip reading of already deletedMessages in that range
        final Position nextPosition = entriesCount != filteredEntries.size()
                ? cursor.getNextAvailablePosition(lastPosition) : lastPosition.getNext();
        updateReadPosition(nextPosition);
        checkReadCompletion();
    }

    /**
     * {@inheritDoc}
     *
     * <p>Delegates to {@link #internalReadEntriesComplete} with a {@code null} last position.
     */
    @Override
    public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
        internalReadEntriesComplete(returnedEntries, ctx, null);
    }

    /**
     * {@inheritDoc}
     *
     * <p>Behaviour, in order of precedence:
     * <ol>
     *   <li>if entries were already accumulated, they are delivered to the callback as a success;</li>
     *   <li>if the cursor is configured to auto-skip non-recoverable data and the failure is a
     *       {@link NonRecoverableLedgerException}, the read position is moved past the failing
     *       range (or to the next ledger) and the operation continues, failing the callback only
     *       when no next position can be determined;</li>
     *   <li>otherwise the failure is logged and reported to the callback.</li>
     * </ol>
     */
    @Override
    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
        cursor.readOperationCompleted();

        if (!entries.isEmpty()) {
            // There were already some entries that were read before, we can return them
            cursor.ledger.getExecutor().execute(() -> {
                callback.readEntriesComplete(entries, ctx);
                recycle();
            });
        } else if (cursor.config.isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) {
            log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(),
                    readPosition, exception.getMessage());
            final ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger();
            Position nextPosition;
            Long lostLedger = null;
            if (exception instanceof ManagedLedgerException.LedgerNotExistException) {
                // try to find and move to next valid ledger
                nextPosition = cursor.getNextLedgerPosition(readPosition.getLedgerId());
                lostLedger = readPosition.getLedgerId();
            } else {
                // Skip this read operation
                nextPosition = ledger.getValidPositionAfterSkippedEntries(readPosition, count);
            }
            // fail callback if it couldn't find next valid ledger
            if (nextPosition == null) {
                failCallbackAndRecycle(exception, ctx);
                return;
            }
            updateReadPosition(nextPosition);
            if (lostLedger != null) {
                cursor.getManagedLedger().skipNonRecoverableLedger(lostLedger);
            }
            checkReadCompletion();
        } else {
            if (!(exception instanceof TooManyRequestsException)) {
                log.warn("[{}][{}] read failed from ledger at position:{}", cursor.ledger.getName(),
                        cursor.getName(), readPosition, exception);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}][{}] read throttled failed from ledger at position:{}", cursor.ledger.getName(),
                            cursor.getName(), readPosition);
                }
            }
            failCallbackAndRecycle(exception, ctx);
        }
    }

    /** Reports the failure to the callback, records the read error on the ledger mbean and recycles. */
    private void failCallbackAndRecycle(ManagedLedgerException exception, Object ctx) {
        callback.readEntriesFailed(exception, ctx);
        cursor.ledger.mbean.recordReadEntriesError();
        recycle();
    }

    /**
     * Records the position the next read should start from and propagates it to the cursor.
     *
     * @param newReadPosition the new read position; cast to {@link PositionImpl}
     */
    void updateReadPosition(Position newReadPosition) {
        nextReadPosition = (PositionImpl) newReadPosition;
        cursor.setReadPosition(nextReadPosition);
    }

    /**
     * Either schedules another read on the ledger executor or completes the operation.
     *
     * <p>Another read is scheduled only while fewer than {@code count} entries were accumulated,
     * the cursor reports more entries, and {@code maxPosition} compares greater than the current
     * {@code readPosition}. Otherwise the cursor is told the read operation completed and the
     * callback is invoked with the accumulated entries on the ledger executor.
     */
    void checkReadCompletion() {
        // Read again only while readPosition is strictly before maxPosition
        if (entries.size() < count && cursor.hasMoreEntries()
                && maxPosition.compareTo(readPosition) > 0) {
            // We still have more entries to read from the next ledger, schedule a new async operation
            cursor.ledger.getExecutor().execute(() -> {
                readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
                cursor.ledger.asyncReadEntries(OpReadEntry.this);
            });
        } else {
            // The reading was already completed, release resources and trigger callback
            try {
                cursor.readOperationCompleted();
            } finally {
                // Scheduled from finally so the callback is still notified if the call above throws
                cursor.ledger.getExecutor().execute(() -> {
                    callback.readEntriesComplete(entries, ctx);
                    recycle();
                });
            }
        }
    }

    /**
     * @return how many entries are still missing to reach the requested count
     */
    public int getNumberOfEntriesToRead() {
        return count - entries.size();
    }

    private final Handle<OpReadEntry> recyclerHandle;

    private OpReadEntry(Handle<OpReadEntry> recyclerHandle) {
        this.recyclerHandle = recyclerHandle;
    }

    private static final Recycler<OpReadEntry> RECYCLER = new Recycler<OpReadEntry>() {
        @Override
        protected OpReadEntry newObject(Recycler.Handle<OpReadEntry> recyclerHandle) {
            return new OpReadEntry(recyclerHandle);
        }
    };

    /**
     * Clears all state and returns this instance to the recycler.
     *
     * <p>Every field is reset <em>before</em> the handle is recycled: once the instance is back
     * in the pool it may be handed out again by {@link #create}, so any write performed afterwards
     * could clobber the state of the next user.
     */
    public void recycle() {
        count = 0;
        cursor = null;
        readPosition = null;
        callback = null;
        ctx = null;
        entries = null;
        nextReadPosition = null;
        maxPosition = null;
        skipCondition = null;
        // Must be the last statement: the instance is no longer ours after this call.
        recyclerHandle.recycle(this);
    }

    private static final Logger log = LoggerFactory.getLogger(OpReadEntry.class);
}