| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.bookkeeper.client; |
| |
| import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID; |
| import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BYTES_READ; |
| import static org.apache.bookkeeper.replication.ReplicationStats.NUM_BYTES_WRITTEN; |
| import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_READ; |
| import static org.apache.bookkeeper.replication.ReplicationStats.NUM_ENTRIES_WRITTEN; |
| import static org.apache.bookkeeper.replication.ReplicationStats.READ_DATA_LATENCY; |
| import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WORKER_SCOPE; |
| import static org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY; |
| |
| import com.google.common.util.concurrent.RateLimiter; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| import io.netty.util.ReferenceCounted; |
| import java.util.Enumeration; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.BiConsumer; |
| import java.util.stream.Collectors; |
| import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; |
| import org.apache.bookkeeper.client.api.WriteFlag; |
| import org.apache.bookkeeper.common.util.MathUtils; |
| import org.apache.bookkeeper.conf.ClientConfiguration; |
| import org.apache.bookkeeper.meta.LedgerManager; |
| import org.apache.bookkeeper.net.BookieId; |
| import org.apache.bookkeeper.proto.BookieProtocol; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; |
| import org.apache.bookkeeper.stats.Counter; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.stats.OpStatsLogger; |
| import org.apache.bookkeeper.stats.StatsLogger; |
| import org.apache.bookkeeper.stats.annotations.StatsDoc; |
| import org.apache.bookkeeper.util.ByteBufList; |
| import org.apache.zookeeper.AsyncCallback; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This is the helper class for replicating the fragments from one bookie to |
| * another. |
| */ |
| @StatsDoc( |
| name = REPLICATION_WORKER_SCOPE, |
| help = "Ledger fragment replicator related stats" |
| ) |
| public class LedgerFragmentReplicator { |
| |
| // BookKeeper instance |
| private BookKeeper bkc; |
| private StatsLogger statsLogger; |
| @StatsDoc( |
| name = NUM_ENTRIES_READ, |
| help = "Number of entries read by the replicator" |
| ) |
| private final Counter numEntriesRead; |
| @StatsDoc( |
| name = NUM_BYTES_READ, |
| help = "The distribution of size of entries read by the replicator" |
| ) |
| private final OpStatsLogger numBytesRead; |
| @StatsDoc( |
| name = NUM_ENTRIES_WRITTEN, |
| help = "Number of entries written by the replicator" |
| ) |
| private final Counter numEntriesWritten; |
| @StatsDoc( |
| name = NUM_BYTES_WRITTEN, |
| help = "The distribution of size of entries written by the replicator" |
| ) |
| private final OpStatsLogger numBytesWritten; |
| @StatsDoc( |
| name = READ_DATA_LATENCY, |
| help = "The distribution of latency of read entries by the replicator" |
| ) |
| private final OpStatsLogger readDataLatency; |
| @StatsDoc( |
| name = WRITE_DATA_LATENCY, |
| help = "The distribution of latency of write entries by the replicator" |
| ) |
| private final OpStatsLogger writeDataLatency; |
| |
    // Optional byte-rate limiter applied to replication traffic; null when
    // replication-by-bytes throttling is disabled (configured rate <= 0).
    protected Throttler replicationThrottle = null;

    // Exponential moving average of the serialized entry size, used to
    // pre-charge the throttle before an entry has actually been read.
    private AtomicInteger averageEntrySize;

    // Seed for averageEntrySize before any entry has been observed.
    private static final int INITIAL_AVERAGE_ENTRY_SIZE = 1024;
    // Weight of the previous average when folding in a newly observed size
    // (see updateAverageEntrySize).
    private static final double AVERAGE_ENTRY_SIZE_RATIO = 0.8;
    private ClientConfiguration conf;
| |
| public LedgerFragmentReplicator(BookKeeper bkc, StatsLogger statsLogger, ClientConfiguration conf) { |
| this.bkc = bkc; |
| this.statsLogger = statsLogger; |
| numEntriesRead = this.statsLogger.getCounter(NUM_ENTRIES_READ); |
| numBytesRead = this.statsLogger.getOpStatsLogger(NUM_BYTES_READ); |
| numEntriesWritten = this.statsLogger.getCounter(NUM_ENTRIES_WRITTEN); |
| numBytesWritten = this.statsLogger.getOpStatsLogger(NUM_BYTES_WRITTEN); |
| readDataLatency = this.statsLogger.getOpStatsLogger(READ_DATA_LATENCY); |
| writeDataLatency = this.statsLogger.getOpStatsLogger(WRITE_DATA_LATENCY); |
| if (conf.getReplicationRateByBytes() > 0) { |
| this.replicationThrottle = new Throttler(conf.getReplicationRateByBytes()); |
| } |
| averageEntrySize = new AtomicInteger(INITIAL_AVERAGE_ENTRY_SIZE); |
| this.conf = conf; |
| } |
| |
    /**
     * Convenience constructor that records no stats: delegates to the primary
     * constructor with {@link NullStatsLogger#INSTANCE}.
     */
    public LedgerFragmentReplicator(BookKeeper bkc, ClientConfiguration conf) {
        this(bkc, NullStatsLogger.INSTANCE, conf);
    }
| |
| private static final Logger LOG = LoggerFactory |
| .getLogger(LedgerFragmentReplicator.class); |
| |
    /**
     * Replicates a single closed ledger fragment to the given set of new
     * bookies. Either dispatches one batch read covering the whole fragment
     * (when batch recovery is enabled and applicable) or one read per entry.
     * {@code ledgerFragmentMcb} is completed once every entry of the fragment
     * has been processed, or immediately on a precondition failure.
     *
     * @param lh
     *            handle of the ledger being replicated
     * @param lf
     *            the fragment to replicate; must be closed
     * @param ledgerFragmentMcb
     *            callback invoked once with the overall result for this fragment
     * @param newBookies
     *            target bookies that should receive the replicated entries
     * @param onReadEntryFailureCallback
     *            invoked with (ledgerId, entryId) when a source read fails
     * @throws InterruptedException
     *             if the calling thread is interrupted
     */
    private void replicateFragmentInternal(final LedgerHandle lh,
                                           final LedgerFragment lf,
                                           final AsyncCallback.VoidCallback ledgerFragmentMcb,
                                           final Set<BookieId> newBookies,
                                           final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException {
        if (!lf.isClosed()) {
            // Replicating an unclosed fragment could copy a moving target;
            // fail the fragment immediately.
            LOG.error("Trying to replicate an unclosed fragment;"
                    + " This is not safe {}", lf);
            ledgerFragmentMcb.processResult(BKException.Code.UnclosedFragmentException,
                    null, null);
            return;
        }
        Long startEntryId = lf.getFirstStoredEntryId();
        Long endEntryId = lf.getLastStoredEntryId();

        /*
         * if startEntryId is INVALID_ENTRY_ID then endEntryId should be
         * INVALID_ENTRY_ID and viceversa.
         */
        if (startEntryId == INVALID_ENTRY_ID ^ endEntryId == INVALID_ENTRY_ID) {
            LOG.error("For LedgerFragment: {}, seeing inconsistent firstStoredEntryId: {} and lastStoredEntryId: {}",
                    lf, startEntryId, endEntryId);
            // NOTE(review): with assertions disabled execution falls through to
            // the range checks below rather than aborting here.
            assert false;
        }

        if (startEntryId > endEntryId || endEntryId <= INVALID_ENTRY_ID) {
            // for open ledger which there is no entry, the start entry id is 0,
            // the end entry id is -1.
            // we can return immediately to trigger forward read
            ledgerFragmentMcb.processResult(BKException.Code.OK, null, null);
            return;
        }

        /*
         * Now asynchronously replicate all of the entries for the ledger
         * fragment that were on the dead bookie.
         */
        int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1);
        // The multi-callback fires ledgerFragmentMcb once all per-entry
        // results have arrived; any failure maps to LedgerRecoveryException.
        MultiCallback ledgerFragmentEntryMcb = new MultiCallback(
                entriesToReplicateCnt, ledgerFragmentMcb, null, BKException.Code.OK,
                BKException.Code.LedgerRecoveryException);
        if (this.replicationThrottle != null) {
            // Re-apply the configured rate in case it changed since the
            // throttle was created.
            this.replicationThrottle.resetRate(this.conf.getReplicationRateByBytes());
        }

        // Batch recovery requires the V2 wire protocol with batch reads and an
        // ensemble where ensembleSize == writeQuorumSize (every bookie holds
        // every entry — presumably why a single batch read is safe here).
        if (conf.isRecoveryBatchReadEnabled()
                && conf.getUseV2WireProtocol()
                && conf.isBatchReadEnabled()
                && lh.getLedgerMetadata().getEnsembleSize() == lh.getLedgerMetadata().getWriteQuorumSize()) {
            batchRecoverLedgerFragmentEntry(startEntryId, endEntryId, lh, ledgerFragmentEntryMcb,
                    newBookies, onReadEntryFailureCallback);

        } else {
            /*
             * Add all the entries to entriesToReplicate list from
             * firstStoredEntryId to lastStoredEntryID.
             */
            List<Long> entriesToReplicate = new LinkedList<Long>();
            long lastStoredEntryId = lf.getLastStoredEntryId();
            for (long i = lf.getFirstStoredEntryId(); i <= lastStoredEntryId; i++) {
                entriesToReplicate.add(i);
            }
            // One independent read-then-write pipeline per entry; each reports
            // to the shared multi-callback.
            for (final Long entryId : entriesToReplicate) {
                recoverLedgerFragmentEntry(entryId, lh, ledgerFragmentEntryMcb,
                        newBookies, onReadEntryFailureCallback);
            }
        }

    }
| |
| /** |
| * This method replicate a ledger fragment which is a contiguous portion of |
| * a ledger that was stored in an ensemble that included the failed bookie. |
| * It will Splits the fragment into multiple sub fragments by keeping the |
| * max entries up to the configured value of rereplicationEntryBatchSize and |
| * then it re-replicates that batched entry fragments one by one. After |
| * re-replication of all batched entry fragments, it will update the |
| * ensemble info with new Bookie once |
| * |
| * @param lh |
| * LedgerHandle for the ledger |
| * @param lf |
| * LedgerFragment to replicate |
| * @param ledgerFragmentMcb |
| * MultiCallback to invoke once we've recovered the current |
| * ledger fragment. |
| * @param targetBookieAddresses |
| * New bookies we want to use to recover and replicate the ledger |
| * entries that were stored on the failed bookie. |
| */ |
| void replicate(final LedgerHandle lh, final LedgerFragment lf, |
| final AsyncCallback.VoidCallback ledgerFragmentMcb, |
| final Set<BookieId> targetBookieAddresses, |
| final BiConsumer<Long, Long> onReadEntryFailureCallback) |
| throws InterruptedException { |
| Set<LedgerFragment> partionedFragments = splitIntoSubFragments(lh, lf, |
| bkc.getConf().getRereplicationEntryBatchSize()); |
| LOG.info("Replicating fragment {} in {} sub fragments.", |
| lf, partionedFragments.size()); |
| replicateNextBatch(lh, partionedFragments.iterator(), |
| ledgerFragmentMcb, targetBookieAddresses, onReadEntryFailureCallback); |
| } |
| |
| /** |
| * Replicate the batched entry fragments one after other. |
| */ |
| private void replicateNextBatch(final LedgerHandle lh, |
| final Iterator<LedgerFragment> fragments, |
| final AsyncCallback.VoidCallback ledgerFragmentMcb, |
| final Set<BookieId> targetBookieAddresses, |
| final BiConsumer<Long, Long> onReadEntryFailureCallback) { |
| if (fragments.hasNext()) { |
| try { |
| replicateFragmentInternal(lh, fragments.next(), |
| new AsyncCallback.VoidCallback() { |
| @Override |
| public void processResult(int rc, String v, Object ctx) { |
| if (rc != BKException.Code.OK) { |
| ledgerFragmentMcb.processResult(rc, null, |
| null); |
| } else { |
| replicateNextBatch(lh, fragments, |
| ledgerFragmentMcb, |
| targetBookieAddresses, |
| onReadEntryFailureCallback); |
| } |
| } |
| |
| }, targetBookieAddresses, onReadEntryFailureCallback); |
| } catch (InterruptedException e) { |
| ledgerFragmentMcb.processResult( |
| BKException.Code.InterruptedException, null, null); |
| Thread.currentThread().interrupt(); |
| } |
| } else { |
| ledgerFragmentMcb.processResult(BKException.Code.OK, null, null); |
| } |
| } |
| |
| /** |
| * Split the full fragment into batched entry fragments by keeping |
| * rereplicationEntryBatchSize of entries in each one and can treat them as |
| * sub fragments. |
| */ |
| static Set<LedgerFragment> splitIntoSubFragments(LedgerHandle lh, |
| LedgerFragment ledgerFragment, long rereplicationEntryBatchSize) { |
| Set<LedgerFragment> fragments = new HashSet<LedgerFragment>(); |
| if (rereplicationEntryBatchSize <= 0) { |
| // rereplicationEntryBatchSize can not be 0 or less than 0, |
| // returning with the current fragment |
| fragments.add(ledgerFragment); |
| return fragments; |
| } |
| |
| long firstEntryId = ledgerFragment.getFirstStoredEntryId(); |
| long lastEntryId = ledgerFragment.getLastStoredEntryId(); |
| |
| /* |
| * if firstEntryId is INVALID_ENTRY_ID then lastEntryId should be |
| * INVALID_ENTRY_ID and viceversa. |
| */ |
| if (firstEntryId == INVALID_ENTRY_ID ^ lastEntryId == INVALID_ENTRY_ID) { |
| LOG.error("For LedgerFragment: {}, seeing inconsistent firstStoredEntryId: {} and lastStoredEntryId: {}", |
| ledgerFragment, firstEntryId, lastEntryId); |
| assert false; |
| } |
| |
| long numberOfEntriesToReplicate = firstEntryId == INVALID_ENTRY_ID ? 0 : (lastEntryId - firstEntryId) + 1; |
| long splitsWithFullEntries = numberOfEntriesToReplicate |
| / rereplicationEntryBatchSize; |
| |
| if (splitsWithFullEntries == 0) {// only one fragment |
| fragments.add(ledgerFragment); |
| return fragments; |
| } |
| |
| long fragmentSplitLastEntry = 0; |
| for (int i = 0; i < splitsWithFullEntries; i++) { |
| fragmentSplitLastEntry = (firstEntryId + rereplicationEntryBatchSize) - 1; |
| fragments.add(new LedgerFragment(lh, firstEntryId, |
| fragmentSplitLastEntry, ledgerFragment.getBookiesIndexes())); |
| firstEntryId = fragmentSplitLastEntry + 1; |
| } |
| |
| long lastSplitWithPartialEntries = numberOfEntriesToReplicate |
| % rereplicationEntryBatchSize; |
| if (lastSplitWithPartialEntries > 0) { |
| fragments.add(new LedgerFragment(lh, firstEntryId, firstEntryId |
| + lastSplitWithPartialEntries - 1, ledgerFragment |
| .getBookiesIndexes())); |
| } |
| return fragments; |
| } |
| |
| /** |
| * This method asynchronously recovers a specific ledger entry by reading |
| * the values via the BookKeeper Client (which would read it from the other |
| * replicas) and then writing it to the chosen new bookie. |
| * |
| * @param entryId |
| * Ledger Entry ID to recover. |
| * @param lh |
| * LedgerHandle for the ledger |
| * @param ledgerFragmentEntryMcb |
| * MultiCallback to invoke once we've recovered the current |
| * ledger entry. |
| * @param newBookies |
| * New bookies we want to use to recover and replicate the ledger |
| * entries that were stored on the failed bookie. |
| */ |
| void recoverLedgerFragmentEntry(final Long entryId, |
| final LedgerHandle lh, |
| final AsyncCallback.VoidCallback ledgerFragmentEntryMcb, |
| final Set<BookieId> newBookies, |
| final BiConsumer<Long, Long> onReadEntryFailureCallback) throws InterruptedException { |
| final long ledgerId = lh.getId(); |
| final AtomicInteger numCompleted = new AtomicInteger(0); |
| final AtomicBoolean completed = new AtomicBoolean(false); |
| |
| if (replicationThrottle != null) { |
| replicationThrottle.acquire(averageEntrySize.get()); |
| } |
| |
| final WriteCallback multiWriteCallback = new WriteCallback() { |
| @Override |
| public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) { |
| if (rc != BKException.Code.OK) { |
| LOG.error("BK error writing entry for ledgerId: {}, entryId: {}, bookie: {}", |
| ledgerId, entryId, addr, BKException.create(rc)); |
| if (completed.compareAndSet(false, true)) { |
| ledgerFragmentEntryMcb.processResult(rc, null, null); |
| } |
| } else { |
| numEntriesWritten.inc(); |
| if (ctx instanceof Long) { |
| numBytesWritten.registerSuccessfulValue((Long) ctx); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Success writing ledger id {}, entry id {} to a new bookie {}!", |
| ledgerId, entryId, addr); |
| } |
| if (numCompleted.incrementAndGet() == newBookies.size() && completed.compareAndSet(false, true)) { |
| ledgerFragmentEntryMcb.processResult(rc, null, null); |
| } |
| } |
| } |
| }; |
| |
| long startReadEntryTime = MathUtils.nowInNano(); |
| /* |
| * Read the ledger entry using the LedgerHandle. This will allow us to |
| * read the entry from one of the other replicated bookies other than |
| * the dead one. |
| */ |
| lh.asyncReadEntries(entryId, entryId, new ReadCallback() { |
| @Override |
| public void readComplete(int rc, LedgerHandle lh, |
| Enumeration<LedgerEntry> seq, Object ctx) { |
| if (rc != BKException.Code.OK) { |
| LOG.error("BK error reading ledger entry: " + entryId, |
| BKException.create(rc)); |
| onReadEntryFailureCallback.accept(ledgerId, entryId); |
| ledgerFragmentEntryMcb.processResult(rc, null, null); |
| return; |
| } |
| |
| readDataLatency.registerSuccessfulEvent(MathUtils.elapsedNanos(startReadEntryTime), |
| TimeUnit.NANOSECONDS); |
| |
| /* |
| * Now that we've read the ledger entry, write it to the new |
| * bookie we've selected. |
| */ |
| LedgerEntry entry = seq.nextElement(); |
| byte[] data = entry.getEntry(); |
| final long dataLength = data.length; |
| numEntriesRead.inc(); |
| numBytesRead.registerSuccessfulValue(dataLength); |
| |
| ReferenceCounted toSend = lh.getDigestManager() |
| .computeDigestAndPackageForSending(entryId, |
| lh.getLastAddConfirmed(), entry.getLength(), |
| Unpooled.wrappedBuffer(data, 0, data.length), |
| lh.getLedgerKey(), |
| BookieProtocol.FLAG_RECOVERY_ADD |
| ); |
| if (replicationThrottle != null) { |
| if (toSend instanceof ByteBuf) { |
| updateAverageEntrySize(((ByteBuf) toSend).readableBytes()); |
| } else if (toSend instanceof ByteBufList) { |
| updateAverageEntrySize(((ByteBufList) toSend).readableBytes()); |
| } |
| } |
| for (BookieId newBookie : newBookies) { |
| long startWriteEntryTime = MathUtils.nowInNano(); |
| bkc.getBookieClient().addEntry(newBookie, lh.getId(), |
| lh.getLedgerKey(), entryId, toSend, |
| multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD, |
| false, WriteFlag.NONE); |
| writeDataLatency.registerSuccessfulEvent( |
| MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS); |
| } |
| toSend.release(); |
| } |
| }, null); |
| } |
| |
| void batchRecoverLedgerFragmentEntry(final long startEntryId, |
| final long endEntryId, |
| final LedgerHandle lh, |
| final AsyncCallback.VoidCallback ledgerFragmentMcb, |
| final Set<BookieId> newBookies, |
| final BiConsumer<Long, Long> onReadEntryFailureCallback) |
| throws InterruptedException { |
| int entriesToReplicateCnt = (int) (endEntryId - startEntryId + 1); |
| int maxBytesToReplicate = conf.getReplicationRateByBytes(); |
| if (replicationThrottle != null) { |
| if (maxBytesToReplicate != -1 && maxBytesToReplicate > averageEntrySize.get() * entriesToReplicateCnt) { |
| maxBytesToReplicate = averageEntrySize.get() * entriesToReplicateCnt; |
| } |
| replicationThrottle.acquire(maxBytesToReplicate); |
| } |
| |
| lh.asyncBatchReadEntries(startEntryId, entriesToReplicateCnt, maxBytesToReplicate, |
| new ReadCallback() { |
| @Override |
| public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) { |
| if (rc != BKException.Code.OK) { |
| LOG.error("BK error reading ledger entries: {} - {}", |
| startEntryId, endEntryId, BKException.create(rc)); |
| onReadEntryFailureCallback.accept(lh.getId(), startEntryId); |
| for (int i = 0; i < entriesToReplicateCnt; i++) { |
| ledgerFragmentMcb.processResult(rc, null, null); |
| } |
| return; |
| } |
| long lastEntryId = startEntryId; |
| while (seq.hasMoreElements()) { |
| LedgerEntry entry = seq.nextElement(); |
| lastEntryId = entry.getEntryId(); |
| byte[] data = entry.getEntry(); |
| final long dataLength = data.length; |
| numEntriesRead.inc(); |
| numBytesRead.registerSuccessfulValue(dataLength); |
| |
| ReferenceCounted toSend = lh.getDigestManager() |
| .computeDigestAndPackageForSending(entry.getEntryId(), |
| lh.getLastAddConfirmed(), entry.getLength(), |
| Unpooled.wrappedBuffer(data, 0, data.length), |
| lh.getLedgerKey(), |
| BookieProtocol.FLAG_RECOVERY_ADD); |
| if (replicationThrottle != null) { |
| if (toSend instanceof ByteBuf) { |
| updateAverageEntrySize(((ByteBuf) toSend).readableBytes()); |
| } else if (toSend instanceof ByteBufList) { |
| updateAverageEntrySize(((ByteBufList) toSend).readableBytes()); |
| } |
| } |
| AtomicInteger numCompleted = new AtomicInteger(0); |
| AtomicBoolean completed = new AtomicBoolean(false); |
| |
| WriteCallback multiWriteCallback = new WriteCallback() { |
| @Override |
| public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) { |
| if (rc != BKException.Code.OK) { |
| LOG.error("BK error writing entry for ledgerId: {}, entryId: {}, bookie: {}", |
| ledgerId, entryId, addr, BKException.create(rc)); |
| if (completed.compareAndSet(false, true)) { |
| ledgerFragmentMcb.processResult(rc, null, null); |
| } |
| } else { |
| numEntriesWritten.inc(); |
| if (ctx instanceof Long) { |
| numBytesWritten.registerSuccessfulValue((Long) ctx); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Success writing ledger id {}, entry id {} to a new bookie {}!", |
| ledgerId, entryId, addr); |
| } |
| if (numCompleted.incrementAndGet() == newBookies.size() |
| && completed.compareAndSet(false, true)) { |
| ledgerFragmentMcb.processResult(rc, null, null); |
| } |
| } |
| } |
| }; |
| |
| for (BookieId newBookie : newBookies) { |
| long startWriteEntryTime = MathUtils.nowInNano(); |
| bkc.getBookieClient().addEntry(newBookie, lh.getId(), |
| lh.getLedgerKey(), entry.getEntryId(), toSend, |
| multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD, |
| false, WriteFlag.NONE); |
| writeDataLatency.registerSuccessfulEvent( |
| MathUtils.elapsedNanos(startWriteEntryTime), TimeUnit.NANOSECONDS); |
| } |
| toSend.release(); |
| } |
| if (lastEntryId != endEntryId) { |
| try { |
| batchRecoverLedgerFragmentEntry(lastEntryId + 1, endEntryId, lh, |
| ledgerFragmentMcb, newBookies, onReadEntryFailureCallback); |
| } catch (InterruptedException e) { |
| int remainingEntries = (int) (endEntryId - lastEntryId); |
| for (int i = 0; i < remainingEntries; i++) { |
| ledgerFragmentMcb.processResult(BKException.Code.InterruptedException, null, null); |
| } |
| } |
| } |
| } |
| }, null); |
| } |
| |
| private void updateAverageEntrySize(int toSendSize) { |
| averageEntrySize.updateAndGet(value -> (int) (value * AVERAGE_ENTRY_SIZE_RATIO |
| + (1 - AVERAGE_ENTRY_SIZE_RATIO) * toSendSize)); |
| } |
| |
| /** |
| * Callback for recovery of a single ledger fragment. Once the fragment has |
| * had all entries replicated, update the ensemble in zookeeper. Once |
| * finished propogate callback up to ledgerFragmentsMcb which should be a |
| * multicallback responsible for all fragments in a single ledger |
| */ |
| static class SingleFragmentCallback implements AsyncCallback.VoidCallback { |
| final AsyncCallback.VoidCallback ledgerFragmentsMcb; |
| final LedgerHandle lh; |
| final LedgerManager ledgerManager; |
| final long fragmentStartId; |
| final Map<BookieId, BookieId> oldBookie2NewBookie; |
| |
| SingleFragmentCallback(AsyncCallback.VoidCallback ledgerFragmentsMcb, |
| LedgerHandle lh, LedgerManager ledgerManager, long fragmentStartId, |
| Map<BookieId, BookieId> oldBookie2NewBookie) { |
| this.ledgerFragmentsMcb = ledgerFragmentsMcb; |
| this.lh = lh; |
| this.ledgerManager = ledgerManager; |
| this.fragmentStartId = fragmentStartId; |
| this.oldBookie2NewBookie = oldBookie2NewBookie; |
| } |
| |
| @Override |
| public void processResult(int rc, String path, Object ctx) { |
| if (rc != BKException.Code.OK) { |
| LOG.error("BK error replicating ledger fragments for ledger: " |
| + lh.getId(), BKException.create(rc)); |
| ledgerFragmentsMcb.processResult(rc, null, null); |
| return; |
| } |
| updateEnsembleInfo(ledgerManager, ledgerFragmentsMcb, fragmentStartId, lh, oldBookie2NewBookie); |
| } |
| } |
| |
| /** |
| * Updates the ensemble with newBookie and notify the ensembleUpdatedCb. |
| */ |
| private static void updateEnsembleInfo( |
| LedgerManager ledgerManager, AsyncCallback.VoidCallback ensembleUpdatedCb, long fragmentStartId, |
| LedgerHandle lh, Map<BookieId, BookieId> oldBookie2NewBookie) { |
| |
| MetadataUpdateLoop updateLoop = new MetadataUpdateLoop( |
| ledgerManager, |
| lh.getId(), |
| lh::getVersionedLedgerMetadata, |
| (metadata) -> { |
| // returns true if any of old bookies exist in ensemble |
| List<BookieId> ensemble = metadata.getAllEnsembles().get(fragmentStartId); |
| return oldBookie2NewBookie.keySet().stream().anyMatch(ensemble::contains); |
| }, |
| (currentMetadata) -> { |
| // replace all old bookies with new bookies in ensemble |
| List<BookieId> newEnsemble = currentMetadata.getAllEnsembles().get(fragmentStartId) |
| .stream().map((bookie) -> oldBookie2NewBookie.getOrDefault(bookie, bookie)) |
| .collect(Collectors.toList()); |
| return LedgerMetadataBuilder.from(currentMetadata) |
| .replaceEnsembleEntry(fragmentStartId, newEnsemble).build(); |
| }, |
| lh::setLedgerMetadata); |
| |
| updateLoop.run().whenComplete((result, ex) -> { |
| if (ex == null) { |
| LOG.info("Updated ZK to point ledger fragments" |
| + " from old bookies to new bookies: {}", oldBookie2NewBookie); |
| |
| ensembleUpdatedCb.processResult(BKException.Code.OK, null, null); |
| } else { |
| LOG.error("Error updating ledger config metadata for ledgerId {}", lh.getId(), ex); |
| |
| ensembleUpdatedCb.processResult( |
| BKException.getExceptionCode(ex, BKException.Code.UnexpectedConditionException), |
| null, null); |
| } |
| }); |
| } |
| |
| static class Throttler { |
| private final RateLimiter rateLimiter; |
| |
| Throttler(int throttleBytes) { |
| this.rateLimiter = RateLimiter.create(throttleBytes); |
| } |
| |
| // reset rate of limiter before compact one entry log file |
| void resetRate(int throttleBytes) { |
| this.rateLimiter.setRate(throttleBytes); |
| } |
| |
| // get rate of limiter for unit test |
| double getRate() { |
| return this.rateLimiter.getRate(); |
| } |
| |
| // acquire. if bybytes: bytes of this entry; if byentries: 1. |
| void acquire(int permits) { |
| rateLimiter.acquire(permits); |
| } |
| } |
| } |