| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.bookkeeper.client; |
| |
| import java.util.HashSet; |
| import java.util.Set; |
| import java.net.InetSocketAddress; |
| import org.apache.bookkeeper.client.AsyncCallback.AddCallback; |
| import org.apache.bookkeeper.proto.BookieProtocol; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; |
| import org.jboss.netty.buffer.ChannelBuffer; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This represents a pending add operation. When it has got success from all |
| * bookies, it sees if its at the head of the pending adds queue, and if yes, |
| * sends ack back to the application. If a bookie fails, a replacement is made |
| * and placed at the same position in the ensemble. The pending adds are then |
| * rereplicated. |
| * |
| * |
| */ |
| class PendingAddOp implements WriteCallback { |
| final static Logger LOG = LoggerFactory.getLogger(PendingAddOp.class); |
| |
| ChannelBuffer toSend; |
| AddCallback cb; |
| Object ctx; |
| long entryId; |
| int entryLength; |
| Set<Integer> writeSet; |
| |
| DistributionSchedule.AckSet ackSet; |
| boolean completed = false; |
| |
| LedgerHandle lh; |
| boolean isRecoveryAdd = false; |
| |
| PendingAddOp(LedgerHandle lh, AddCallback cb, Object ctx) { |
| this.lh = lh; |
| this.cb = cb; |
| this.ctx = ctx; |
| this.entryId = LedgerHandle.INVALID_ENTRY_ID; |
| |
| ackSet = lh.distributionSchedule.getAckSet(); |
| } |
| |
| /** |
| * Enable the recovery add flag for this operation. |
| * @see LedgerHandle#asyncRecoveryAddEntry |
| */ |
| PendingAddOp enableRecoveryAdd() { |
| isRecoveryAdd = true; |
| return this; |
| } |
| |
| void setEntryId(long entryId) { |
| this.entryId = entryId; |
| writeSet = new HashSet<Integer>(lh.distributionSchedule.getWriteSet(entryId)); |
| } |
| |
| void sendWriteRequest(int bookieIndex) { |
| int flags = isRecoveryAdd ? BookieProtocol.FLAG_RECOVERY_ADD : BookieProtocol.FLAG_NONE; |
| |
| lh.bk.bookieClient.addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey, entryId, toSend, |
| this, bookieIndex, flags); |
| } |
| |
| void unsetSuccessAndSendWriteRequest(int bookieIndex) { |
| if (toSend == null) { |
| // this addOp hasn't yet had its mac computed. When the mac is |
| // computed, its write requests will be sent, so no need to send it |
| // now |
| return; |
| } |
| // Suppose that unset doesn't happen on the write set of an entry. In this |
| // case we don't need to resend the write request upon an ensemble change. |
| // We do need to invoke #sendAddSuccessCallbacks() for such entries because |
| // they may have already completed, but they are just waiting for the ensemble |
| // to change. |
| // E.g. |
| // ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change |
| // happens to replace C with E. Entry k does not complete until C is |
| // replaced with E successfully. When the ensemble change completes, it tries |
| // to unset entry k. C however is not in k's write set, so no entry is written |
| // again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never |
| // completes. |
| // |
| // We call sendAddSuccessCallback when unsetting t cover this case. |
| if (!writeSet.contains(bookieIndex)) { |
| lh.sendAddSuccessCallbacks(); |
| return; |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Unsetting success for ledger: " + lh.ledgerId + " entry: " + entryId + " bookie index: " |
| + bookieIndex); |
| } |
| |
| // if we had already heard a success from this array index, need to |
| // increment our number of responses that are pending, since we are |
| // going to unset this success |
| ackSet.removeBookie(bookieIndex); |
| completed = false; |
| |
| sendWriteRequest(bookieIndex); |
| } |
| |
| void initiate(ChannelBuffer toSend, int entryLength) { |
| this.toSend = toSend; |
| this.entryLength = entryLength; |
| for (int bookieIndex : writeSet) { |
| sendWriteRequest(bookieIndex); |
| } |
| } |
| |
| @Override |
| public void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) { |
| int bookieIndex = (Integer) ctx; |
| |
| if (completed) { |
| // I am already finished, ignore incoming responses. |
| // otherwise, we might hit the following error handling logic, which might cause bad things. |
| return; |
| } |
| |
| switch (rc) { |
| case BKException.Code.OK: |
| // continue |
| break; |
| case BKException.Code.LedgerFencedException: |
| LOG.warn("Fencing exception on write: L{} E{} on {}", |
| new Object[] { ledgerId, entryId, addr }); |
| lh.handleUnrecoverableErrorDuringAdd(rc); |
| return; |
| case BKException.Code.UnauthorizedAccessException: |
| LOG.warn("Unauthorized access exception on write: L{} E{} on {}", |
| new Object[] { ledgerId, entryId, addr }); |
| lh.handleUnrecoverableErrorDuringAdd(rc); |
| return; |
| default: |
| LOG.warn("Write did not succeed: L{} E{} on {}", |
| new Object[] { ledgerId, entryId, addr }); |
| lh.handleBookieFailure(addr, bookieIndex); |
| return; |
| } |
| |
| if (!writeSet.contains(bookieIndex)) { |
| LOG.warn("Received a response for (lid:{}, eid:{}) from {}@{}, but it doesn't belong to {}.", |
| new Object[] { ledgerId, entryId, addr, bookieIndex, writeSet }); |
| return; |
| } |
| |
| if (ackSet.addBookieAndCheck(bookieIndex) && !completed) { |
| completed = true; |
| |
| LOG.debug("Complete (lid:{}, eid:{}).", ledgerId, entryId); |
| // when completed an entry, try to send success add callbacks in order |
| lh.sendAddSuccessCallbacks(); |
| } |
| } |
| |
| void submitCallback(final int rc) { |
| if (rc != BKException.Code.OK) { |
| LOG.error("Write of ledger entry to quorum failed: L{} E{}", |
| lh.getId(), entryId); |
| } |
| |
| cb.addComplete(rc, lh, entryId, ctx); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("PendingAddOp(lid:").append(lh.ledgerId) |
| .append(", eid:").append(entryId).append(", completed:") |
| .append(completed).append(")"); |
| return sb.toString(); |
| } |
| |
| } |