blob: af28c11f00127f25e8c0749f008baec33d755040 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
/**
 * Handles the life-cycle of a single addEntry() operation on a managed ledger.
 *
 * <p>An instance carries the payload and the user callback from the moment the operation is created, through the
 * BookKeeper add (and optional ledger close) callbacks, until the user's {@link AddEntryCallback} is notified.
 * Instances are pooled through a Netty {@link Recycler}: obtain one with
 * {@link #create(ManagedLedgerImpl, ByteBuf, AddEntryCallback, Object)}; it is handed back to the pool once the
 * success callback has been delivered.
 */
class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {

    private static final Logger log = LoggerFactory.getLogger(OpAddEntry.class);

    // Used to atomically take-and-clear the callback, so that it is delivered at most once.
    private static final AtomicReferenceFieldUpdater<OpAddEntry, AddEntryCallback> callbackUpdater =
            AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, AddEntryCallback.class, "callback");

    private static final Recycler<OpAddEntry> RECYCLER = new Recycler<OpAddEntry>() {
        @Override
        protected OpAddEntry newObject(Recycler.Handle<OpAddEntry> recyclerHandle) {
            return new OpAddEntry(recyclerHandle);
        }
    };

    private final Handle<OpAddEntry> recyclerHandle;

    private ManagedLedgerImpl ml;
    private LedgerHandle ledger;
    private long entryId;

    // Read exclusively through callbackUpdater, hence the suppression.
    @SuppressWarnings("unused")
    private volatile AddEntryCallback callback;
    private Object ctx;
    private boolean closeWhenDone;
    private long startTime;

    // Payload of the entry; one reference is taken in create() and dropped in safeRun().
    ByteBuf data;
    private int dataLength;

    private OpAddEntry(Handle<OpAddEntry> recyclerHandle) {
        this.recyclerHandle = recyclerHandle;
    }

    /**
     * Takes an instance from the pool and initializes it for a new add operation.
     *
     * <p>The payload is retained (one extra reference), its readable size is reported to the managed ledger
     * stats and the start time used for latency reporting is recorded.
     *
     * @param ml
     *            the managed ledger the entry is being added to
     * @param data
     *            the entry payload
     * @param callback
     *            the callback to notify on completion or failure
     * @param ctx
     *            opaque context handed back to the callback
     * @return the initialized operation; no ledger is assigned yet
     */
    public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
        OpAddEntry addOp = RECYCLER.get();
        addOp.ml = ml;
        addOp.ledger = null;
        addOp.callback = callback;
        addOp.ctx = ctx;
        addOp.entryId = -1;
        addOp.closeWhenDone = false;
        addOp.data = data.retain();
        addOp.dataLength = data.readableBytes();
        addOp.startTime = System.nanoTime();
        ml.mbean.addAddEntrySample(addOp.dataLength);

        if (log.isDebugEnabled()) {
            log.debug("Created new OpAddEntry {}", addOp);
        }
        return addOp;
    }

    /**
     * Assigns the ledger handle that {@link #initiate()} will write to.
     */
    public void setLedger(LedgerHandle ledger) {
        this.ledger = ledger;
    }

    /**
     * Selects whether the ledger has to be closed once this entry has been successfully added.
     */
    public void setCloseWhenDone(boolean closeWhenDone) {
        this.closeWhenDone = closeWhenDone;
    }

    /**
     * Submits the payload to the assigned ledger; the outcome is reported through
     * {@link #addComplete(int, LedgerHandle, long, Object)}.
     */
    public void initiate() {
        // retainedDuplicate() takes one additional reference on the payload for the duration of the submission
        ByteBuf payloadView = data.retainedDuplicate();
        ledger.asyncAddEntry(payloadView, this, ctx);

        // As noted by the original author, asyncAddEntry() is refCnt neutral with respect to the passed buffer: it
        // keeps its own reference for as long as it needs it. The reference taken by retainedDuplicate() above is
        // therefore dropped right away.
        payloadView.release();
    }

    /**
     * Reports a failure to the callback, unless the callback has already been consumed, and records the error
     * in the managed ledger stats.
     *
     * @param e
     *            the failure to report
     */
    public void failed(ManagedLedgerException e) {
        AddEntryCallback pending = callbackUpdater.getAndSet(this, null);
        if (pending == null) {
            // The callback was already taken by someone else
            return;
        }

        pending.addFailed(e, ctx);
        ml.mbean.recordAddEntryError();
    }

    /**
     * Receives the result of the add operation from the ledger handle.
     *
     * <p>On success, this operation is scheduled on the executor, ordered by managed ledger name, where
     * {@link #safeRun()} completes it. On failure, the error is recorded and
     * {@code ManagedLedgerImpl.ledgerClosed()} is scheduled on the same ordered executor.
     *
     * @throws IllegalArgumentException
     *             if the acknowledged ledger or context is not the one this operation was submitted with
     */
    @Override
    public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) {
        if (ledger.getId() != lh.getId()) {
            log.warn("[{}] ledgerId {} doesn't match with acked ledgerId {}", ml.getName(), ledger.getId(), lh.getId());
        }

        checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
                lh.getId());
        checkArgument(this.ctx == ctx);

        this.entryId = entryId;
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] write-complete: ledger-id={} entry-id={} size={} rc={}", this, ml.getName(),
                    lh.getId(), entryId, dataLength, rc);
        }

        if (rc == BKException.Code.OK) {
            // Trigger addComplete callback in a thread hashed on the managed ledger name
            ml.getExecutor().submitOrdered(ml.getName(), this);
            return;
        }

        // If we get a write error, we will try to create a new ledger and re-submit the pending writes. If the
        // ledger creation fails (persistent bk failure, another instance owning the ML, ...), then the writes will
        // be marked as failed.
        ml.mbean.recordAddEntryError();
        ml.getExecutor().submitOrdered(ml.getName(), SafeRun.safeRun(() -> {
            // Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock
            // from a BK callback.
            ml.ledgerClosed(lh);
        }));
    }

    /**
     * Completes a successful add. Called in the executor hashed on the managed ledger name.
     *
     * <p>Removes this operation from the head of the pending queue, updates the managed ledger counters,
     * inserts the entry in the cache when there are active cursors, releases the payload and advances the last
     * confirmed entry. Then either starts closing the ledger (when requested through
     * {@link #setCloseWhenDone(boolean)}) or notifies the callback.
     *
     * @throws IllegalArgumentException
     *             if this operation is not the head of the pending queue
     */
    @Override
    public void safeRun() {
        // This operation is expected to be the first one in the pending queue
        OpAddEntry head = ml.pendingAddEntries.poll();
        checkArgument(this == head);

        ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml);
        ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);

        // Avoid caching entries if no cursor has been created
        if (ml.hasActiveCursors()) {
            EntryImpl cached = EntryImpl.create(ledger.getId(), entryId, data);
            // EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling
            // insert
            ml.entryCache.insert(cached);
            cached.release();
        }

        // From here on the byte buffer is not needed anymore
        data.release();

        PositionImpl confirmed = PositionImpl.get(ledger.getId(), entryId);
        ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(ml);
        ml.lastConfirmedEntry = confirmed;

        if (!closeWhenDone) {
            notifyCompletion(ledger);
            return;
        }

        // The callback is notified from closeComplete(), after the ledger has been closed
        log.info("[{}] Closing ledger {} for being full", ml.getName(), ledger.getId());
        ledger.asyncClose(this, ctx);
    }

    /**
     * Receives the result of the ledger close started by {@link #safeRun()}.
     *
     * <p>The outcome of the close is only logged: in both cases the managed ledger is told that the ledger is
     * closed and the add operation is reported as completed.
     *
     * @throws IllegalArgumentException
     *             if the closed ledger is not the one this operation was written to
     */
    @Override
    public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
        checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
                lh.getId());

        if (rc != BKException.Code.OK) {
            log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
        } else {
            log.debug("Successfuly closed ledger {}", lh.getId());
        }

        ml.ledgerClosed(lh);
        notifyCompletion(lh);
    }

    /**
     * Records the latency sample and, if the callback has not been consumed yet, reports the position of the
     * added entry to it, notifies the cursors and returns this instance to the pool.
     *
     * @param handle
     *            the ledger handle whose id is used to build the reported position
     */
    private void notifyCompletion(LedgerHandle handle) {
        updateLatency();

        AddEntryCallback pending = callbackUpdater.getAndSet(this, null);
        if (pending == null) {
            return;
        }

        pending.addComplete(PositionImpl.get(handle.getId(), entryId), ctx);
        ml.notifyCursors();
        this.recycle();
    }

    /**
     * Reports the time elapsed since {@code create()} as an add-entry latency sample.
     */
    private void updateLatency() {
        ml.mbean.addAddEntryLatencySample(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
    }

    /**
     * Resets every field and hands this instance back to the pool. The instance must not be used afterwards.
     */
    public void recycle() {
        // Drop the references first
        ml = null;
        ledger = null;
        callback = null;
        ctx = null;
        data = null;

        // Then reset the primitive state
        dataLength = -1;
        entryId = -1;
        startTime = -1;
        closeWhenDone = false;

        recyclerHandle.recycle(this);
    }
}