| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.bookkeeper.client; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_HIGH_PRIORITY; |
| import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_NONE; |
| import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD; |
| |
| import com.google.common.collect.ImmutableMap; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.util.Recycler; |
| import io.netty.util.Recycler.Handle; |
| import io.netty.util.ReferenceCountUtil; |
| import java.util.EnumSet; |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency; |
| import org.apache.bookkeeper.client.api.WriteFlag; |
| import org.apache.bookkeeper.net.BookieSocketAddress; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; |
| import org.apache.bookkeeper.util.ByteBufList; |
| import org.apache.bookkeeper.util.MathUtils; |
| import org.apache.bookkeeper.util.SafeRunnable; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
/**
 * This represents a pending add operation. When it has received success from all
 * bookies, it checks whether it is at the head of the pending adds queue, and if
 * so, sends the ack back to the application. If a bookie fails, a replacement is
 * made and placed at the same position in the ensemble. The pending adds are then
 * rereplicated.
 */
| class PendingAddOp extends SafeRunnable implements WriteCallback { |
| private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class); |
| |
| ByteBuf payload; |
| ByteBufList toSend; |
| AddCallbackWithLatency cb; |
| Object ctx; |
| long entryId; |
| int entryLength; |
| |
| DistributionSchedule.AckSet ackSet; |
| boolean completed = false; |
| |
| LedgerHandle lh; |
| ClientContext clientCtx; |
| boolean isRecoveryAdd = false; |
| long requestTimeNanos; |
| long qwcLatency; // Quorum Write Completion Latency after response from quorum bookies. |
| |
| long currentLedgerLength; |
| int pendingWriteRequests; |
| boolean callbackTriggered; |
| boolean hasRun; |
| EnumSet<WriteFlag> writeFlags; |
| boolean allowFailFast = false; |
| List<BookieSocketAddress> ensemble; |
| |
| static PendingAddOp create(LedgerHandle lh, ClientContext clientCtx, |
| List<BookieSocketAddress> ensemble, |
| ByteBuf payload, EnumSet<WriteFlag> writeFlags, |
| AddCallbackWithLatency cb, Object ctx) { |
| PendingAddOp op = RECYCLER.get(); |
| op.lh = lh; |
| op.clientCtx = clientCtx; |
| op.isRecoveryAdd = false; |
| op.cb = cb; |
| op.ctx = ctx; |
| op.entryId = LedgerHandle.INVALID_ENTRY_ID; |
| op.currentLedgerLength = -1; |
| op.payload = payload; |
| op.entryLength = payload.readableBytes(); |
| |
| op.completed = false; |
| op.ensemble = ensemble; |
| op.ackSet = lh.getDistributionSchedule().getAckSet(); |
| op.pendingWriteRequests = 0; |
| op.callbackTriggered = false; |
| op.hasRun = false; |
| op.requestTimeNanos = Long.MAX_VALUE; |
| op.allowFailFast = false; |
| op.qwcLatency = 0; |
| op.writeFlags = writeFlags; |
| |
| return op; |
| } |
| |
| /** |
| * Enable the recovery add flag for this operation. |
| * @see LedgerHandle#asyncRecoveryAddEntry |
| */ |
| PendingAddOp enableRecoveryAdd() { |
| isRecoveryAdd = true; |
| return this; |
| } |
| |
| PendingAddOp allowFailFastOnUnwritableChannel() { |
| allowFailFast = true; |
| return this; |
| } |
| |
| void setEntryId(long entryId) { |
| this.entryId = entryId; |
| } |
| |
| void setLedgerLength(long ledgerLength) { |
| this.currentLedgerLength = ledgerLength; |
| } |
| |
| long getEntryId() { |
| return this.entryId; |
| } |
| |
| void sendWriteRequest(List<BookieSocketAddress> ensemble, int bookieIndex) { |
| int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE; |
| |
| clientCtx.getBookieClient().addEntry(ensemble.get(bookieIndex), |
| lh.ledgerId, lh.ledgerKey, entryId, toSend, this, bookieIndex, |
| flags, allowFailFast, lh.writeFlags); |
| ++pendingWriteRequests; |
| } |
| |
| boolean maybeTimeout() { |
| if (MathUtils.elapsedNanos(requestTimeNanos) >= clientCtx.getConf().addEntryQuorumTimeoutNanos) { |
| timeoutQuorumWait(); |
| return true; |
| } |
| return false; |
| } |
| |
| void timeoutQuorumWait() { |
| try { |
| clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, new SafeRunnable() { |
| @Override |
| public void safeRun() { |
| if (completed) { |
| return; |
| } |
| lh.handleUnrecoverableErrorDuringAdd(BKException.Code.AddEntryQuorumTimeoutException); |
| } |
| @Override |
| public String toString() { |
| return String.format("AddEntryQuorumTimeout(lid=%d, eid=%d)", lh.ledgerId, entryId); |
| } |
| }); |
| } catch (RejectedExecutionException e) { |
| LOG.warn("Timeout add entry quorum wait failed {} entry: {}", lh.ledgerId, entryId); |
| } |
| } |
| |
| void unsetSuccessAndSendWriteRequest(List<BookieSocketAddress> ensemble, int bookieIndex) { |
| // update the ensemble |
| this.ensemble = ensemble; |
| |
| if (toSend == null) { |
| // this addOp hasn't yet had its mac computed. When the mac is |
| // computed, its write requests will be sent, so no need to send it |
| // now |
| return; |
| } |
| // Suppose that unset doesn't happen on the write set of an entry. In this |
| // case we don't need to resend the write request upon an ensemble change. |
| // We do need to invoke #sendAddSuccessCallbacks() for such entries because |
| // they may have already completed, but they are just waiting for the ensemble |
| // to change. |
| // E.g. |
| // ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change |
| // happens to replace C with E. Entry k does not complete until C is |
| // replaced with E successfully. When the ensemble change completes, it tries |
| // to unset entry k. C however is not in k's write set, so no entry is written |
| // again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never |
| // completes. |
| // |
| // We call sendAddSuccessCallback when unsetting t cover this case. |
| DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId); |
| try { |
| if (!writeSet.contains(bookieIndex)) { |
| lh.sendAddSuccessCallbacks(); |
| return; |
| } |
| } finally { |
| writeSet.recycle(); |
| } |
| |
| if (callbackTriggered) { |
| return; |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Unsetting success for ledger: " + lh.ledgerId + " entry: " + entryId + " bookie index: " |
| + bookieIndex); |
| } |
| |
| // if we had already heard a success from this array index, need to |
| // increment our number of responses that are pending, since we are |
| // going to unset this success |
| if (!ackSet.removeBookieAndCheck(bookieIndex)) { |
| // unset completed if this results in loss of ack quorum |
| completed = false; |
| } |
| |
| sendWriteRequest(ensemble, bookieIndex); |
| } |
| |
| /** |
| * Initiate the add operation. |
| */ |
| public void safeRun() { |
| hasRun = true; |
| if (callbackTriggered) { |
| // this should only be true if the request was failed due |
| // to another request ahead in the pending queue, |
| // so we can just ignore this request |
| maybeRecycle(); |
| return; |
| } |
| |
| this.requestTimeNanos = MathUtils.nowInNano(); |
| checkNotNull(lh); |
| checkNotNull(lh.macManager); |
| |
| this.toSend = lh.macManager.computeDigestAndPackageForSending( |
| entryId, lh.lastAddConfirmed, currentLedgerLength, |
| payload); |
| // ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending |
| payload = null; |
| |
| // We are about to send. Check if we need to make an ensemble change |
| // becasue of delayed write errors |
| lh.maybeHandleDelayedWriteBookieFailure(); |
| |
| // Iterate over set and trigger the sendWriteRequests |
| DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId); |
| |
| try { |
| for (int i = 0; i < writeSet.size(); i++) { |
| sendWriteRequest(ensemble, writeSet.get(i)); |
| } |
| } finally { |
| writeSet.recycle(); |
| } |
| } |
| |
| @Override |
| public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { |
| int bookieIndex = (Integer) ctx; |
| --pendingWriteRequests; |
| |
| if (!ensemble.get(bookieIndex).equals(addr)) { |
| // ensemble has already changed, failure of this addr is immaterial |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Write did not succeed: " + ledgerId + ", " + entryId + ". But we have already fixed it."); |
| } |
| return; |
| } |
| |
| // must record all acks, even if complete (completion can be undone by an ensemble change) |
| boolean ackQuorum = false; |
| if (BKException.Code.OK == rc) { |
| ackQuorum = ackSet.completeBookieAndCheck(bookieIndex); |
| } |
| |
| if (completed) { |
| if (rc != BKException.Code.OK) { |
| // Got an error after satisfying AQ. This means we are under replicated at the create itself. |
| // Update the stat to reflect it. |
| clientCtx.getClientStats().getAddOpUrCounter().inc(); |
| if (!clientCtx.getConf().disableEnsembleChangeFeature.isAvailable() |
| && !clientCtx.getConf().delayEnsembleChange) { |
| lh.notifyWriteFailed(bookieIndex, addr); |
| } |
| } |
| // even the add operation is completed, but because we don't reset completed flag back to false when |
| // #unsetSuccessAndSendWriteRequest doesn't break ack quorum constraint. we still have current pending |
| // add op is completed but never callback. so do a check here to complete again. |
| // |
| // E.g. entry x is going to complete. |
| // |
| // 1) entry x + k hits a failure. lh.handleBookieFailure increases blockAddCompletions to 1, for ensemble |
| // change |
| // 2) entry x receives all responses, sets completed to true but fails to send success callback because |
| // blockAddCompletions is 1 |
| // 3) ensemble change completed. lh unset success starting from x to x+k, but since the unset doesn't break |
| // ackSet constraint. #removeBookieAndCheck doesn't set completed back to false. |
| // 4) so when the retry request on new bookie completes, it finds the pending op is already completed. |
| // we have to trigger #sendAddSuccessCallbacks |
| // |
| sendAddSuccessCallbacks(); |
| // I am already finished, ignore incoming responses. |
| // otherwise, we might hit the following error handling logic, which might cause bad things. |
| maybeRecycle(); |
| return; |
| } |
| |
| switch (rc) { |
| case BKException.Code.OK: |
| // continue |
| break; |
| case BKException.Code.ClientClosedException: |
| // bookie client is closed. |
| lh.errorOutPendingAdds(rc); |
| return; |
| case BKException.Code.IllegalOpException: |
| // illegal operation requested, like using unsupported feature in v2 protocol |
| lh.handleUnrecoverableErrorDuringAdd(rc); |
| return; |
| case BKException.Code.LedgerFencedException: |
| LOG.warn("Fencing exception on write: L{} E{} on {}", |
| ledgerId, entryId, addr); |
| lh.handleUnrecoverableErrorDuringAdd(rc); |
| return; |
| case BKException.Code.UnauthorizedAccessException: |
| LOG.warn("Unauthorized access exception on write: L{} E{} on {}", |
| ledgerId, entryId, addr); |
| lh.handleUnrecoverableErrorDuringAdd(rc); |
| return; |
| default: |
| if (clientCtx.getConf().delayEnsembleChange) { |
| if (ackSet.failBookieAndCheck(bookieIndex, addr) |
| || rc == BKException.Code.WriteOnReadOnlyBookieException) { |
| Map<Integer, BookieSocketAddress> failedBookies = ackSet.getFailedBookies(); |
| LOG.warn("Failed to write entry ({}, {}) to bookies {}, handling failures.", |
| ledgerId, entryId, failedBookies); |
| // we can't meet ack quorum requirement, trigger ensemble change. |
| lh.handleBookieFailure(failedBookies); |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Failed to write entry ({}, {}) to bookie ({}, {})," |
| + " but it didn't break ack quorum, delaying ensemble change : {}", |
| ledgerId, entryId, bookieIndex, addr, BKException.getMessage(rc)); |
| } |
| } |
| } else { |
| LOG.warn("Failed to write entry ({}, {}): {}", |
| ledgerId, entryId, BKException.getMessage(rc)); |
| lh.handleBookieFailure(ImmutableMap.of(bookieIndex, addr)); |
| } |
| return; |
| } |
| |
| if (ackQuorum && !completed) { |
| completed = true; |
| this.qwcLatency = MathUtils.elapsedNanos(requestTimeNanos); |
| |
| sendAddSuccessCallbacks(); |
| } |
| } |
| |
| void sendAddSuccessCallbacks() { |
| lh.sendAddSuccessCallbacks(); |
| } |
| |
| void submitCallback(final int rc) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(), entryId, rc); |
| } |
| |
| long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); |
| if (rc != BKException.Code.OK) { |
| clientCtx.getClientStats().getAddOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); |
| LOG.error("Write of ledger entry to quorum failed: L{} E{}", |
| lh.getId(), entryId); |
| } else { |
| clientCtx.getClientStats().getAddOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); |
| } |
| cb.addCompleteWithLatency(rc, lh, entryId, qwcLatency, ctx); |
| callbackTriggered = true; |
| |
| maybeRecycle(); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("PendingAddOp(lid:").append(lh.ledgerId) |
| .append(", eid:").append(entryId).append(", completed:") |
| .append(completed).append(")"); |
| return sb.toString(); |
| } |
| |
| @Override |
| public int hashCode() { |
| return (int) entryId; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (o instanceof PendingAddOp) { |
| return (this.entryId == ((PendingAddOp) o).entryId); |
| } |
| return (this == o); |
| } |
| |
| private final Handle<PendingAddOp> recyclerHandle; |
| private static final Recycler<PendingAddOp> RECYCLER = new Recycler<PendingAddOp>() { |
| protected PendingAddOp newObject(Recycler.Handle<PendingAddOp> handle) { |
| return new PendingAddOp(handle); |
| } |
| }; |
| |
| private PendingAddOp(Handle<PendingAddOp> recyclerHandle) { |
| this.recyclerHandle = recyclerHandle; |
| } |
| |
| |
| private void maybeRecycle() { |
| /** |
| * We have opportunity to recycle two objects here. |
| * PendingAddOp#toSend and LedgerHandle#pendingAddOp |
| * |
| * A. LedgerHandle#pendingAddOp: This can be released after all 3 conditions are met. |
| * - After the callback is run |
| * - After safeRun finished by the executor |
| * - Write responses are returned from all bookies |
| * as BookieClient Holds a reference from the point the addEntry requests are sent. |
| * |
| * B. ByteBuf can be released after 2 of the conditions are met. |
| * - After the callback is run as we will not retry the write after callback |
| * - After safeRun finished by the executor |
| * BookieClient takes and releases on this buffer immediately after sending the data. |
| * |
| * The object can only be recycled after the above conditions are met |
| * otherwise we could end up recycling twice and all |
| * joy that goes along with that. |
| */ |
| if (hasRun && callbackTriggered) { |
| ReferenceCountUtil.release(toSend); |
| toSend = null; |
| } |
| // only recycle a pending add op after it has been run. |
| if (hasRun && toSend == null && pendingWriteRequests == 0) { |
| recyclePendAddOpObject(); |
| } |
| } |
| |
| private void recyclePendAddOpObject() { |
| entryId = LedgerHandle.INVALID_ENTRY_ID; |
| currentLedgerLength = -1; |
| if (payload != null) { |
| ReferenceCountUtil.release(payload); |
| payload = null; |
| } |
| cb = null; |
| ctx = null; |
| ensemble = null; |
| ackSet.recycle(); |
| ackSet = null; |
| lh = null; |
| clientCtx = null; |
| isRecoveryAdd = false; |
| completed = false; |
| pendingWriteRequests = 0; |
| callbackTriggered = false; |
| hasRun = false; |
| allowFailFast = false; |
| writeFlags = null; |
| |
| recyclerHandle.recycle(this); |
| } |
| } |