| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.bookkeeper.client; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Objects; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.RateLimiter; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| import java.security.GeneralSecurityException; |
| import java.security.NoSuchAlgorithmException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Enumeration; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.bookkeeper.client.AsyncCallback.AddCallback; |
| import org.apache.bookkeeper.client.AsyncCallback.AddLacCallback; |
| import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; |
| import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; |
| import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback; |
| import org.apache.bookkeeper.client.BookKeeper.DigestType; |
| import org.apache.bookkeeper.net.BookieSocketAddress; |
| import org.apache.bookkeeper.proto.BookieProtocol; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback; |
| import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State; |
| import org.apache.bookkeeper.stats.Counter; |
| import org.apache.bookkeeper.stats.Gauge; |
| import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback; |
| import org.apache.bookkeeper.util.SafeRunnable; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Ledger handle contains ledger metadata and is used to access the read and |
| * write operations to a ledger. |
| */ |
| public class LedgerHandle implements AutoCloseable { |
    final static Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);

    // Master key derived from the ledger password; sent to bookies with requests.
    final byte[] ledgerKey;
    // Current (mutable) view of the ledger metadata: ensembles, state, length.
    LedgerMetadata metadata;
    // Owning client instance; supplies config, executors, ledger manager and stats.
    final BookKeeper bk;
    final long ledgerId;
    // Highest entry id handed out to a pending add (may not be persisted yet).
    long lastAddPushed;
    // Highest entry id known to be durably acknowledged; volatile so readers
    // can observe it without taking the handle lock.
    volatile long lastAddConfirmed;

    // Total bytes added to the ledger so far (guarded by "this").
    long length;
    // Computes/validates per-entry digests for the configured digest type.
    final DigestManager macManager;
    final DistributionSchedule distributionSchedule;
    // Optional client-side add throttle; null when throttling is disabled.
    final RateLimiter throttler;
    // Per-bookie timestamp of the most recent observed failure (expiring cache).
    final LoadingCache<BookieSocketAddress, Long> bookieFailureHistory;
    final boolean enableParallelRecoveryRead;
    final int recoveryReadBatchSize;

    /**
     * Invalid entry id. This value is returned from methods which
     * should return an entry id but there is no valid entry available.
     */
    final static public long INVALID_ENTRY_ID = BookieProtocol.INVALID_ENTRY_ID;

    // Count of in-flight ensemble changes blocking add completions.
    final AtomicInteger blockAddCompletions = new AtomicInteger(0);
    final AtomicInteger numEnsembleChanges = new AtomicInteger(0);
    // Adds awaiting acknowledgement, in submission order.
    Queue<PendingAddOp> pendingAddOps;
    ExplicitLacFlushPolicy explicitLacFlushPolicy;

    final Counter ensembleChangeCounter;
    final Counter lacUpdateHitsCounter;
    final Counter lacUpdateMissesCounter;

    // This empty master key is used when an empty password is provided which is the hash of an empty string
    private final static byte[] emptyLedgerKey;
    static {
        try {
            emptyLedgerKey = MacDigestManager.genDigest("ledger", new byte[0]);
        } catch (NoSuchAlgorithmException e) {
            // Digest algorithm is required for correct operation; fail class init.
            throw new RuntimeException(e);
        }
    }
| |
    /**
     * Constructs a handle over an existing ledger.
     *
     * @param bk owning client; supplies configuration, executors and stats
     * @param ledgerId id of the ledger this handle accesses
     * @param metadata current metadata of the ledger
     * @param digestType digest algorithm used for entry checksums
     * @param password ledger password; used to derive the master key
     * @throws GeneralSecurityException if the digest manager cannot be created
     */
    LedgerHandle(BookKeeper bk, long ledgerId, LedgerMetadata metadata,
                 DigestType digestType, byte[] password)
            throws GeneralSecurityException, NumberFormatException {
        this.bk = bk;
        this.metadata = metadata;
        this.pendingAddOps = new ConcurrentLinkedQueue<PendingAddOp>();
        this.enableParallelRecoveryRead = bk.getConf().getEnableParallelRecoveryRead();
        this.recoveryReadBatchSize = bk.getConf().getRecoveryReadBatchSize();

        if (metadata.isClosed()) {
            // Closed ledger: local counters mirror the final metadata values.
            lastAddConfirmed = lastAddPushed = metadata.getLastEntryId();
            length = metadata.getLength();
        } else {
            lastAddConfirmed = lastAddPushed = INVALID_ENTRY_ID;
            length = 0;
        }

        this.ledgerId = ledgerId;

        // Throttling is enabled only for a positive configured rate.
        if (bk.getConf().getThrottleValue() > 0) {
            this.throttler = RateLimiter.create(bk.getConf().getThrottleValue());
        } else {
            this.throttler = null;
        }

        macManager = DigestManager.instantiate(ledgerId, password, digestType);

        // If the password is empty, pass the same random ledger key which is generated by the hash of the empty
        // password, so that the bookie can avoid processing the keys for each entry
        this.ledgerKey = password.length > 0 ? MacDigestManager.genDigest("ledger", password) : emptyLedgerKey;
        distributionSchedule = new RoundRobinDistributionSchedule(
                metadata.getWriteQuorumSize(), metadata.getAckQuorumSize(), metadata.getEnsembleSize());
        // Cache of last-failure timestamps per bookie; entries expire after the
        // configured interval and load as -1 (meaning "no recorded failure").
        this.bookieFailureHistory = CacheBuilder.newBuilder()
            .expireAfterWrite(bk.getConf().getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS)
            .build(new CacheLoader<BookieSocketAddress, Long>() {
                public Long load(BookieSocketAddress key) {
                    return -1L;
                }
            });

        ensembleChangeCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.ENSEMBLE_CHANGES);
        lacUpdateHitsCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_HITS);
        lacUpdateMissesCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_MISSES);
        // Expose the current number of outstanding adds as a gauge.
        bk.getStatsLogger().registerGauge(BookKeeperClientStats.PENDING_ADDS,
                                          new Gauge<Integer>() {
                                              public Integer getDefaultValue() {
                                                  return 0;
                                              }
                                              public Integer getSample() {
                                                  return pendingAddOps.size();
                                              }
                                          });
        initializeExplicitLacFlushPolicy();
    }
| |
| protected void initializeExplicitLacFlushPolicy() { |
| if (!metadata.isClosed() && bk.getExplicitLacInterval() > 0) { |
| explicitLacFlushPolicy = new ExplicitLacFlushPolicy.ExplicitLacFlushPolicyImpl(this); |
| } else { |
| explicitLacFlushPolicy = ExplicitLacFlushPolicy.VOID_EXPLICITLAC_FLUSH_POLICY; |
| } |
| } |
| |
| /** |
| * Get the id of the current ledger |
| * |
| * @return the id of the ledger |
| */ |
| public long getId() { |
| return ledgerId; |
| } |
| |
| /** |
| * Get the last confirmed entry id on this ledger. It reads |
| * the local state of the ledger handle, which is different |
| * from the readLastConfirmed call. In the case the ledger |
| * is not closed and the client is a reader, it is necessary |
| * to call readLastConfirmed to obtain an estimate of the |
| * last add operation that has been confirmed. |
| * |
| * @see #readLastConfirmed() |
| * |
| * @return the last confirmed entry id or {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID} if no entry has been confirmed |
| */ |
| public synchronized long getLastAddConfirmed() { |
| return lastAddConfirmed; |
| } |
| |
| synchronized void setLastAddConfirmed(long lac) { |
| this.lastAddConfirmed = lac; |
| } |
| |
| /** |
| * Get the entry id of the last entry that has been enqueued for addition (but |
| * may not have possibly been persited to the ledger) |
| * |
| * @return the id of the last entry pushed or {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID} if no entry has been pushed |
| */ |
| synchronized public long getLastAddPushed() { |
| return lastAddPushed; |
| } |
| |
| /** |
| * Get the Ledger's key/password. |
| * |
| * @return byte array for the ledger's key/password. |
| */ |
| public byte[] getLedgerKey() { |
| return Arrays.copyOf(ledgerKey, ledgerKey.length); |
| } |
| |
| /** |
| * Get the LedgerMetadata |
| * |
| * @return LedgerMetadata for the LedgerHandle |
| */ |
| LedgerMetadata getLedgerMetadata() { |
| return metadata; |
| } |
| |
| /** |
| * Get this ledger's customMetadata map. |
| * |
| * @return map containing user provided customMetadata. |
| */ |
| public Map<String, byte[]> getCustomMetadata() { |
| return metadata.getCustomMetadata(); |
| } |
| |
| /** |
| * Get the number of fragments that makeup this ledger |
| * |
| * @return the count of fragments |
| */ |
| synchronized public long getNumFragments() { |
| return metadata.getEnsembles().size(); |
| } |
| |
| /** |
| * Get the count of unique bookies that own part of this ledger |
| * by going over all the fragments of the ledger. |
| * |
| * @return count of unique bookies |
| */ |
| synchronized public long getNumBookies() { |
| Map<Long, ArrayList<BookieSocketAddress>> m = metadata.getEnsembles(); |
| Set<BookieSocketAddress> s = Sets.newHashSet(); |
| for (ArrayList<BookieSocketAddress> aList : m.values()) { |
| s.addAll(aList); |
| } |
| return s.size(); |
| } |
| |
| /** |
| * Get the DigestManager |
| * |
| * @return DigestManager for the LedgerHandle |
| */ |
| DigestManager getDigestManager() { |
| return macManager; |
| } |
| |
| /** |
| * Add to the length of the ledger in bytes. |
| * |
| * @param delta |
| * @return the length of the ledger after the addition |
| */ |
| synchronized long addToLength(long delta) { |
| this.length += delta; |
| return this.length; |
| } |
| |
| /** |
| * Returns the length of the ledger in bytes. |
| * |
| * @return the length of the ledger in bytes |
| */ |
| synchronized public long getLength() { |
| return this.length; |
| } |
| |
| /** |
| * Get the Distribution Schedule |
| * |
| * @return DistributionSchedule for the LedgerHandle |
| */ |
| DistributionSchedule getDistributionSchedule() { |
| return distributionSchedule; |
| } |
| |
| void writeLedgerConfig(GenericCallback<Void> writeCb) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Writing metadata to ledger manager: {}, {}", this.ledgerId, metadata.getVersion()); |
| } |
| |
| bk.getLedgerManager().writeLedgerMetadata(ledgerId, metadata, writeCb); |
| } |
| |
| /** |
| * Close this ledger synchronously. |
| * @see #asyncClose |
| */ |
| public void close() |
| throws InterruptedException, BKException { |
| CompletableFuture<Void> counter = new CompletableFuture<>(); |
| |
| asyncClose(new SyncCloseCallback(), counter); |
| |
| explicitLacFlushPolicy.stopExplicitLacFlush(); |
| |
| SynchCallbackUtils.waitForResult(counter); |
| } |
| |
| /** |
| * Asynchronous close, any adds in flight will return errors. |
| * |
| * Closing a ledger will ensure that all clients agree on what the last entry |
| * of the ledger is. This ensures that, once the ledger has been closed, all |
| * reads from the ledger will return the same set of entries. |
| * |
| * @param cb |
| * callback implementation |
| * @param ctx |
| * control object |
| */ |
| public void asyncClose(CloseCallback cb, Object ctx) { |
| asyncCloseInternal(cb, ctx, BKException.Code.LedgerClosedException); |
| } |
| |
| /** |
| * Has the ledger been closed? |
| */ |
| public synchronized boolean isClosed() { |
| return metadata.isClosed(); |
| } |
| |
| void asyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) { |
| try { |
| doAsyncCloseInternal(cb, ctx, rc); |
| } catch (RejectedExecutionException re) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Failed to close ledger {} : ", ledgerId, re); |
| } |
| errorOutPendingAdds(bk.getReturnRc(rc)); |
| cb.closeComplete(bk.getReturnRc(BKException.Code.InterruptedException), this, ctx); |
| } |
| } |
| |
    /**
     * Same as public version of asyncClose except that this one takes an
     * additional parameter which is the return code to hand to all the pending
     * add ops.
     *
     * <p>Runs ordered on the ledger's worker thread. Snapshots the previous
     * metadata state so a failed conditional update can be rolled back, drains
     * and errors out pending adds, closes the metadata locally and then writes
     * it out, retrying once on a metadata version conflict.
     *
     * @param cb close callback
     * @param ctx opaque context handed back to {@code cb}
     * @param rc return code delivered to every drained pending add
     */
    void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) {
        bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
            @Override
            public void safeRun() {
                final long prevLastEntryId;
                final long prevLength;
                final State prevState;
                List<PendingAddOp> pendingAdds;

                if (isClosed()) {
                    // TODO: make ledger metadata immutable {@link https://github.com/apache/bookkeeper/issues/281}
                    // Although the metadata is already closed, we don't need to proceed zookeeper metadata update, but
                    // we still need to error out the pending add ops.
                    //
                    // There is a race condition a pending add op is enqueued, after a close op reset ledger metadata state
                    // to unclosed to resolve metadata conflicts. If we don't error out these pending add ops, they would be
                    // leak and never callback.
                    //
                    // The race condition happen in following sequence:
                    // a) ledger L is fenced
                    // b) write entry E encountered LedgerFencedException, trigger ledger close procedure
                    // c) ledger close encountered metadata version exception and set ledger metadata back to open
                    // d) writer tries to write entry E+1, since ledger metadata is still open (reset by c))
                    // e) the close procedure in c) resolved the metadata conflicts and set ledger metadata to closed
                    // f) writing entry E+1 encountered LedgerFencedException which will enter ledger close procedure
                    // g) it would find that ledger metadata is closed, then it callbacks immediately without erroring out any pendings
                    synchronized (LedgerHandle.this) {
                        pendingAdds = drainPendingAddsToErrorOut();
                    }
                    errorOutPendingAdds(rc, pendingAdds);
                    cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
                    return;
                }

                synchronized(LedgerHandle.this) {
                    // Snapshot prior state/length/last-entry so the metadata can be
                    // restored if the conditional write below hits a version conflict.
                    prevState = metadata.getState();
                    prevLastEntryId = metadata.getLastEntryId();
                    prevLength = metadata.getLength();

                    // drain pending adds first
                    pendingAdds = drainPendingAddsToErrorOut();

                    // synchronized on LedgerHandle.this to ensure that
                    // lastAddPushed can not be updated after the metadata
                    // is closed.
                    metadata.setLength(length);
                    metadata.close(lastAddConfirmed);
                    lastAddPushed = lastAddConfirmed;
                }

                // error out all pending adds during closing, the callbacks shouldn't be
                // running under any bk locks.
                errorOutPendingAdds(rc, pendingAdds);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Closing ledger: " + ledgerId + " at entryId: "
                              + metadata.getLastEntryId() + " with this many bytes: " + metadata.getLength());
                }

                // Writes the closed metadata; on a version conflict, re-reads the
                // latest metadata, merges ensembles if compatible, and retries.
                final class CloseCb extends OrderedSafeGenericCallback<Void> {
                    CloseCb() {
                        super(bk.mainWorkerPool, ledgerId);
                    }

                    @Override
                    public void safeOperationComplete(final int rc, Void result) {
                        if (rc == BKException.Code.MetadataVersionException) {
                            rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.mainWorkerPool,
                                                                                          ledgerId) {
                                @Override
                                public void safeOperationComplete(int newrc, LedgerMetadata newMeta) {
                                    if (newrc != BKException.Code.OK) {
                                        LOG.error("Error reading new metadata from ledger {} when closing, code={}",
                                                  ledgerId, newrc);
                                        cb.closeComplete(rc, LedgerHandle.this, ctx);
                                    } else {
                                        // Roll back to the pre-close snapshot before
                                        // comparing against the freshly read metadata.
                                        metadata.setState(prevState);
                                        if (prevState.equals(State.CLOSED)) {
                                            metadata.close(prevLastEntryId);
                                        }

                                        metadata.setLength(prevLength);
                                        if (!metadata.isNewerThan(newMeta)
                                                && !metadata.isConflictWith(newMeta)) {
                                            // use the new metadata's ensemble, in case re-replication already
                                            // replaced some bookies in the ensemble.
                                            metadata.setEnsembles(newMeta.getEnsembles());
                                            metadata.setVersion(newMeta.version);
                                            metadata.setLength(length);
                                            metadata.close(getLastAddConfirmed());
                                            writeLedgerConfig(new CloseCb());
                                            return;
                                        } else {
                                            // Conflicting metadata: keep the local closed
                                            // state and report the original failure.
                                            metadata.setLength(length);
                                            metadata.close(getLastAddConfirmed());
                                            LOG.warn("Conditional update ledger metadata for ledger {} failed.", ledgerId);
                                            cb.closeComplete(rc, LedgerHandle.this, ctx);
                                        }
                                    }
                                }

                                @Override
                                public String toString() {
                                    return String.format("ReReadMetadataForClose(%d)", ledgerId);
                                }
                            });
                        } else if (rc != BKException.Code.OK) {
                            LOG.error("Error update ledger metadata for ledger {} : {}", ledgerId, rc);
                            cb.closeComplete(rc, LedgerHandle.this, ctx);
                        } else {
                            cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
                        }
                    }

                    @Override
                    public String toString() {
                        return String.format("WriteLedgerConfigForClose(%d)", ledgerId);
                    }
                }

                writeLedgerConfig(new CloseCb());

            }

            @Override
            public String toString() {
                return String.format("CloseLedgerHandle(%d)", ledgerId);
            }
        });
    }
| |
| /** |
| * Read a sequence of entries synchronously. |
| * |
| * @param firstEntry |
| * id of first entry of sequence (included) |
| * @param lastEntry |
| * id of last entry of sequence (included) |
| * |
| * @see #asyncReadEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback, java.lang.Object) |
| */ |
| public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry) |
| throws InterruptedException, BKException { |
| CompletableFuture<Enumeration<LedgerEntry>> counter = new CompletableFuture<>(); |
| |
| asyncReadEntries(firstEntry, lastEntry, new SyncReadCallback(), counter); |
| |
| return SynchCallbackUtils.waitForResult(counter); |
| } |
| |
| /** |
| * Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.<br> |
| * This is the same of |
| * {@link #asyncReadUnconfirmedEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback, java.lang.Object) } |
| * |
| * @param firstEntry |
| * id of first entry of sequence (included) |
| * @param lastEntry |
| * id of last entry of sequence (included) |
| * |
| * @see #readEntries(long, long) |
| * @see #asyncReadUnconfirmedEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback, java.lang.Object) |
| * @see #asyncReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback, java.lang.Object) |
| */ |
| public Enumeration<LedgerEntry> readUnconfirmedEntries(long firstEntry, long lastEntry) |
| throws InterruptedException, BKException { |
| CompletableFuture<Enumeration<LedgerEntry>> counter = new CompletableFuture<>(); |
| |
| asyncReadUnconfirmedEntries(firstEntry, lastEntry, new SyncReadCallback(), counter); |
| |
| return SynchCallbackUtils.waitForResult(counter); |
| } |
| |
| /** |
| * Read a sequence of entries asynchronously. |
| * |
| * @param firstEntry |
| * id of first entry of sequence |
| * @param lastEntry |
| * id of last entry of sequence |
| * @param cb |
| * object implementing read callback interface |
| * @param ctx |
| * control object |
| */ |
| public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) { |
| // Little sanity check |
| if (firstEntry < 0 || firstEntry > lastEntry) { |
| LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}", |
| new Object[] { ledgerId, firstEntry, lastEntry }); |
| cb.readComplete(BKException.Code.IncorrectParameterException, this, null, ctx); |
| return; |
| } |
| |
| if (lastEntry > lastAddConfirmed) { |
| LOG.error("ReadException on ledgerId:{} firstEntry:{} lastEntry:{}", |
| new Object[] { ledgerId, firstEntry, lastEntry }); |
| cb.readComplete(BKException.Code.ReadException, this, null, ctx); |
| return; |
| } |
| |
| asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx); |
| } |
| |
| /** |
| * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. |
| * <br>This is the same of |
| * {@link #asyncReadEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback, java.lang.Object) } |
| * but it lets the client read without checking the local value of LastAddConfirmed, so that it is possibile to |
| * read entries for which the writer has not received the acknowledge yet. <br> |
| * For entries which are within the range 0..LastAddConfirmed BookKeeper guarantees that the writer has successfully |
| * received the acknowledge.<br> |
| * For entries outside that range it is possible that the writer never received the acknowledge |
| * and so there is the risk that the reader is seeing entries before the writer and this could result in a consistency |
| * issue in some cases.<br> |
| * With this method you can even read entries before the LastAddConfirmed and entries after it with one call, |
| * the expected consistency will be as described above for each subrange of ids. |
| * |
| * @param firstEntry |
| * id of first entry of sequence |
| * @param lastEntry |
| * id of last entry of sequence |
| * @param cb |
| * object implementing read callback interface |
| * @param ctx |
| * control object |
| * |
| * @see #asyncReadEntries(long, long, org.apache.bookkeeper.client.AsyncCallback.ReadCallback, java.lang.Object) |
| * @see #asyncReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback, java.lang.Object) |
| * @see #readUnconfirmedEntries(long, long) |
| */ |
| public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) { |
| // Little sanity check |
| if (firstEntry < 0 || firstEntry > lastEntry) { |
| LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{} lastEntry:{}", |
| new Object[] { ledgerId, firstEntry, lastEntry }); |
| cb.readComplete(BKException.Code.IncorrectParameterException, this, null, ctx); |
| return; |
| } |
| |
| asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx); |
| } |
| |
| void asyncReadEntriesInternal(long firstEntry, long lastEntry, ReadCallback cb, Object ctx) { |
| new PendingReadOp(this, bk.scheduler, |
| firstEntry, lastEntry, cb, ctx).initiate(); |
| } |
| |
| /** |
| * Add entry synchronously to an open ledger. |
| * |
| * @param data |
| * array of bytes to be written to the ledger |
| * @return the entryId of the new inserted entry |
| */ |
| public long addEntry(byte[] data) throws InterruptedException, BKException { |
| return addEntry(data, 0, data.length); |
| } |
| |
| /** |
| * Add entry synchronously to an open ledger. This can be used only with |
| * {@link LedgerHandleAdv} returned through ledgers created with {@link |
| * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}. |
| * |
| * |
| * @param entryId |
| * entryId to be added |
| * @param data |
| * array of bytes to be written to the ledger |
| * @return the entryId of the new inserted entry |
| */ |
| public long addEntry(final long entryId, byte[] data) throws InterruptedException, BKException { |
| LOG.error("To use this feature Ledger must be created with createLedgerAdv interface."); |
| throw BKException.create(BKException.Code.IllegalOpException); |
| } |
| |
| /** |
| * Add entry synchronously to an open ledger. |
| * |
| * @param data |
| * array of bytes to be written to the ledger |
| * @param offset |
| * offset from which to take bytes from data |
| * @param length |
| * number of bytes to take from data |
| * @return the entryId of the new inserted entry |
| */ |
| public long addEntry(byte[] data, int offset, int length) |
| throws InterruptedException, BKException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Adding entry {}", data); |
| } |
| |
| CompletableFuture<Long> counter = new CompletableFuture<>(); |
| |
| SyncAddCallback callback = new SyncAddCallback(); |
| asyncAddEntry(data, offset, length, callback, counter); |
| |
| return SynchCallbackUtils.waitForResult(counter); |
| } |
| |
| /** |
| * Add entry synchronously to an open ledger. This can be used only with |
| * {@link LedgerHandleAdv} returned through ledgers created with {@link |
| * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}. |
| * |
| * @param entryId |
| * entryId to be added. |
| * @param data |
| * array of bytes to be written to the ledger |
| * @param offset |
| * offset from which to take bytes from data |
| * @param length |
| * number of bytes to take from data |
| * @return entryId |
| */ |
| public long addEntry(final long entryId, byte[] data, int offset, int length) throws InterruptedException, |
| BKException { |
| LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface."); |
| throw BKException.create(BKException.Code.IllegalOpException); |
| } |
| |
| /** |
| * Add entry asynchronously to an open ledger. |
| * |
| * @param data |
| * array of bytes to be written |
| * @param cb |
| * object implementing callbackinterface |
| * @param ctx |
| * some control object |
| */ |
| public void asyncAddEntry(final byte[] data, final AddCallback cb, |
| final Object ctx) { |
| asyncAddEntry(data, 0, data.length, cb, ctx); |
| } |
| |
| /** |
| * Add entry asynchronously to an open ledger. This can be used only with |
| * {@link LedgerHandleAdv} returned through ledgers created with {@link |
| * BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}. |
| * |
| * @param entryId |
| * entryId to be added |
| * @param data |
| * array of bytes to be written |
| * @param cb |
| * object implementing callbackinterface |
| * @param ctx |
| * some control object |
| */ |
| public void asyncAddEntry(final long entryId, final byte[] data, final AddCallback cb, final Object ctx) { |
| LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface."); |
| cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, ctx); |
| } |
| |
| /** |
| * Add entry asynchronously to an open ledger, using an offset and range. |
| * |
| * @param data |
| * array of bytes to be written |
| * @param offset |
| * offset from which to take bytes from data |
| * @param length |
| * number of bytes to take from data |
| * @param cb |
| * object implementing callbackinterface |
| * @param ctx |
| * some control object |
| * @throws ArrayIndexOutOfBoundsException if offset or length is negative or |
| * offset and length sum to a value higher than the length of data. |
| */ |
| public void asyncAddEntry(final byte[] data, final int offset, final int length, |
| final AddCallback cb, final Object ctx) { |
| if (offset < 0 || length < 0 |
| || (offset + length) > data.length) { |
| throw new ArrayIndexOutOfBoundsException( |
| "Invalid values for offset("+offset |
| +") or length("+length+")"); |
| } |
| |
| PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx); |
| doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb, ctx); |
| } |
| |
| public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) { |
| data.retain(); |
| PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx); |
| doAsyncAddEntry(op, data, cb, ctx); |
| } |
| |
| /** |
| * Add entry asynchronously to an open ledger, using an offset and range. |
| * This can be used only with {@link LedgerHandleAdv} returned through |
| * ledgers created with {@link BookKeeper#createLedgerAdv(int, int, int, DigestType, byte[])}. |
| * |
| * @param entryId |
| * entryId of the entry to add. |
| * @param data |
| * array of bytes to be written |
| * @param offset |
| * offset from which to take bytes from data |
| * @param length |
| * number of bytes to take from data |
| * @param cb |
| * object implementing callbackinterface |
| * @param ctx |
| * some control object |
| * @throws ArrayIndexOutOfBoundsException |
| * if offset or length is negative or offset and length sum to a |
| * value higher than the length of data. |
| */ |
| public void asyncAddEntry(final long entryId, final byte[] data, final int offset, final int length, |
| final AddCallback cb, final Object ctx) throws BKException { |
| LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface."); |
| cb.addComplete(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, ctx); |
| } |
| |
| /** |
| * Make a recovery add entry request. Recovery adds can add to a ledger even |
| * if it has been fenced. |
| * |
| * This is only valid for bookie and ledger recovery, which may need to replicate |
| * entries to a quorum of bookies to ensure data safety. |
| * |
| * Normal client should never call this method. |
| */ |
| void asyncRecoveryAddEntry(final byte[] data, final int offset, final int length, |
| final AddCallback cb, final Object ctx) { |
| PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx).enableRecoveryAdd(); |
| doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb, ctx); |
| } |
| |
    /**
     * Common path for all asynchronous adds: optionally throttles, assigns the
     * entry id under the handle lock, then packages and initiates the add on
     * the ledger's ordered worker thread. If the ledger is already closed the
     * callback is failed with {@code LedgerClosedException} from the worker
     * pool instead.
     *
     * @param op pending add operation (recovery flag may already be set)
     * @param data entry payload; ownership follows the caller's retain
     * @param cb add callback
     * @param ctx opaque context passed to the callback
     */
    protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final AddCallback cb, final Object ctx) {
        // Client-side rate limiting, if configured.
        if (throttler != null) {
            throttler.acquire();
        }

        final long entryId;
        final long currentLength;
        boolean wasClosed = false;
        synchronized(this) {
            // synchronized on this to ensure that
            // the ledger isn't closed between checking and
            // updating lastAddPushed
            if (metadata.isClosed()) {
                wasClosed = true;
                entryId = -1;
                currentLength = 0;
            } else {
                // Assign the next entry id and account for the payload size
                // before enqueueing, all under the same lock.
                entryId = ++lastAddPushed;
                currentLength = addToLength(data.readableBytes());
                op.setEntryId(entryId);
                pendingAddOps.add(op);
            }
        }

        if (wasClosed) {
            // make sure the callback is triggered in main worker pool
            try {
                bk.mainWorkerPool.submit(new SafeRunnable() {
                    @Override
                    public void safeRun() {
                        LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
                        cb.addComplete(BKException.Code.LedgerClosedException,
                                LedgerHandle.this, INVALID_ENTRY_ID, ctx);
                    }

                    @Override
                    public String toString() {
                        return String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId);
                    }
                });
            } catch (RejectedExecutionException e) {
                // Worker pool shut down; deliver the failure inline.
                cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
                        LedgerHandle.this, INVALID_ENTRY_ID, ctx);
            }
            return;
        }

        try {
            // Submit ordered by ledger id so adds for one ledger stay in sequence.
            bk.mainWorkerPool.submitOrdered(ledgerId, new SafeRunnable() {
                @Override
                public void safeRun() {
                    ByteBuf toSend = macManager.computeDigestAndPackageForSending(entryId, lastAddConfirmed,
                            currentLength, data);
                    try {
                        op.initiate(toSend, data.readableBytes());
                    } finally {
                        // The packaged buffer is owned here; release it once the
                        // op has taken what it needs.
                        toSend.release();
                    }
                }
                @Override
                public String toString() {
                    return String.format("AsyncAddEntry(lid=%d, eid=%d)", ledgerId, entryId);
                }
            });
        } catch (RejectedExecutionException e) {
            cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
                    LedgerHandle.this, INVALID_ENTRY_ID, ctx);
        }
    }
| |
| synchronized void updateLastConfirmed(long lac, long len) { |
| if (lac > lastAddConfirmed) { |
| lastAddConfirmed = lac; |
| lacUpdateHitsCounter.inc(); |
| } else { |
| lacUpdateMissesCounter.inc(); |
| } |
| lastAddPushed = Math.max(lastAddPushed, lac); |
| length = Math.max(length, len); |
| } |
| |
    /**
     * Obtains asynchronously the last confirmed write from a quorum of bookies. This
     * call obtains the last add confirmed each bookie has received for this ledger
     * and returns the maximum. If the ledger has been closed, the value returned by this
     * call may not correspond to the id of the last entry of the ledger, since it reads
     * the hint of bookies. Consequently, in the case the ledger has been closed, it may
     * return a different value than getLastAddConfirmed, which returns the local value
     * of the ledger handle.
     *
     * @see #getLastAddConfirmed()
     *
     * @param cb callback to receive the last confirmed entry id
     * @param ctx opaque context passed back to the callback
     */

    public void asyncReadLastConfirmed(final ReadLastConfirmedCallback cb, final Object ctx) {
        boolean isClosed;
        long lastEntryId;
        // Snapshot closed-state and last entry id consistently under the lock.
        synchronized (this) {
            isClosed = metadata.isClosed();
            lastEntryId = metadata.getLastEntryId();
        }
        if (isClosed) {
            // Closed ledger: the metadata already holds the definitive answer.
            cb.readLastConfirmedComplete(BKException.Code.OK, lastEntryId, ctx);
            return;
        }
        // On success, merge the quorum's answer into local state before
        // delivering it to the caller.
        ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback() {
            @Override
            public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) {
                if (rc == BKException.Code.OK) {
                    updateLastConfirmed(data.lastAddConfirmed, data.length);
                    cb.readLastConfirmedComplete(rc, data.lastAddConfirmed, ctx);
                } else {
                    cb.readLastConfirmedComplete(rc, INVALID_ENTRY_ID, ctx);
                }
            }
        };
        new ReadLastConfirmedOp(this, innercb).initiate();
    }
| |
| /** |
| * Obtains asynchronously the last confirmed write from a quorum of bookies. |
| * It is similar as |
| * {@link #asyncTryReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback, Object)}, |
| * but it doesn't wait all the responses from the quorum. It would callback |
| * immediately if it received a LAC which is larger than current LAC. |
| * |
| * @see #asyncTryReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback, Object) |
| * |
| * @param cb |
| * callback to return read last confirmed |
| * @param ctx |
| * callback context |
| */ |
| public void asyncTryReadLastConfirmed(final ReadLastConfirmedCallback cb, final Object ctx) { |
| boolean isClosed; |
| long lastEntryId; |
| synchronized (this) { |
| isClosed = metadata.isClosed(); |
| lastEntryId = metadata.getLastEntryId(); |
| } |
| if (isClosed) { |
| cb.readLastConfirmedComplete(BKException.Code.OK, lastEntryId, ctx); |
| return; |
| } |
| ReadLastConfirmedOp.LastConfirmedDataCallback innercb = new ReadLastConfirmedOp.LastConfirmedDataCallback() { |
| AtomicBoolean completed = new AtomicBoolean(false); |
| |
| @Override |
| public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData data) { |
| if (rc == BKException.Code.OK) { |
| updateLastConfirmed(data.lastAddConfirmed, data.length); |
| if (completed.compareAndSet(false, true)) { |
| cb.readLastConfirmedComplete(rc, data.lastAddConfirmed, ctx); |
| } |
| } else { |
| if (completed.compareAndSet(false, true)) { |
| cb.readLastConfirmedComplete(rc, INVALID_ENTRY_ID, ctx); |
| } |
| } |
| } |
| }; |
| new TryReadLastConfirmedOp(this, innercb, getLastAddConfirmed()).initiate(); |
| } |
| |
| |
| /** |
| * Asynchronous read next entry and the latest last add confirmed. |
| * If the next entryId is less than known last add confirmed, the call will read next entry directly. |
| * If the next entryId is ahead of known last add confirmed, the call will issue a long poll read |
| * to wait for the next entry <i>entryId</i>. |
| * |
| * The callback will return the latest last add confirmed and next entry if it is available within timeout period <i>timeOutInMillis</i>. |
| * |
| * @param entryId |
| * next entry id to read |
| * @param timeOutInMillis |
| * timeout period to wait for the entry id to be available (for long poll only) |
| * @param parallel |
| * whether to issue the long poll reads in parallel |
| * @param cb |
| * callback to return the result |
| * @param ctx |
| * callback context |
| */ |
| public void asyncReadLastConfirmedAndEntry(final long entryId, |
| final long timeOutInMillis, |
| final boolean parallel, |
| final AsyncCallback.ReadLastConfirmedAndEntryCallback cb, |
| final Object ctx) { |
| boolean isClosed; |
| long lac; |
| synchronized (this) { |
| isClosed = metadata.isClosed(); |
| lac = metadata.getLastEntryId(); |
| } |
| if (isClosed) { |
| if (entryId > lac) { |
| cb.readLastConfirmedAndEntryComplete(BKException.Code.OK, lac, null, ctx); |
| return; |
| } |
| } else { |
| lac = getLastAddConfirmed(); |
| } |
| if (entryId <= lac) { |
| asyncReadEntries(entryId, entryId, new ReadCallback() { |
| @Override |
| public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) { |
| if (BKException.Code.OK == rc) { |
| if (seq.hasMoreElements()) { |
| cb.readLastConfirmedAndEntryComplete(rc, getLastAddConfirmed(), seq.nextElement(), ctx); |
| } else { |
| cb.readLastConfirmedAndEntryComplete(rc, getLastAddConfirmed(), null, ctx); |
| } |
| } else { |
| cb.readLastConfirmedAndEntryComplete(rc, INVALID_ENTRY_ID, null, ctx); |
| } |
| } |
| }, ctx); |
| return; |
| } |
| // wait for entry <i>entryId</i> |
| ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb = new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback() { |
| AtomicBoolean completed = new AtomicBoolean(false); |
| @Override |
| public void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, LedgerEntry entry) { |
| if (rc == BKException.Code.OK) { |
| if (completed.compareAndSet(false, true)) { |
| cb.readLastConfirmedAndEntryComplete(rc, lastAddConfirmed, entry, ctx); |
| } |
| } else { |
| if (completed.compareAndSet(false, true)) { |
| cb.readLastConfirmedAndEntryComplete(rc, INVALID_ENTRY_ID, null, ctx); |
| } |
| } |
| } |
| }; |
| new ReadLastConfirmedAndEntryOp(this, innercb, entryId - 1, timeOutInMillis, bk.scheduler).parallelRead(parallel).initiate(); |
| } |
| |
| /** |
| * Context objects for synchronous call to read last confirmed. |
| */ |
| static class LastConfirmedCtx { |
| final static long ENTRY_ID_PENDING = -10; |
| long response; |
| int rc; |
| |
| LastConfirmedCtx() { |
| this.response = ENTRY_ID_PENDING; |
| } |
| |
| void setLastConfirmed(long lastConfirmed) { |
| this.response = lastConfirmed; |
| } |
| |
| long getlastConfirmed() { |
| return this.response; |
| } |
| |
| void setRC(int rc) { |
| this.rc = rc; |
| } |
| |
| int getRC() { |
| return this.rc; |
| } |
| |
| boolean ready() { |
| return (this.response != ENTRY_ID_PENDING); |
| } |
| } |
| |
| /** |
| * Obtains synchronously the last confirmed write from a quorum of bookies. This call |
| * obtains the the last add confirmed each bookie has received for this ledger |
| * and returns the maximum. If the ledger has been closed, the value returned by this |
| * call may not correspond to the id of the last entry of the ledger, since it reads |
| * the hint of bookies. Consequently, in the case the ledger has been closed, it may |
| * return a different value than getLastAddConfirmed, which returns the local value |
| * of the ledger handle. |
| * |
| * @see #getLastAddConfirmed() |
| * |
| * @return The entry id of the last confirmed write or {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID} |
| * if no entry has been confirmed |
| * @throws InterruptedException |
| * @throws BKException |
| */ |
| public long readLastConfirmed() |
| throws InterruptedException, BKException { |
| LastConfirmedCtx ctx = new LastConfirmedCtx(); |
| asyncReadLastConfirmed(new SyncReadLastConfirmedCallback(), ctx); |
| synchronized(ctx) { |
| while(!ctx.ready()) { |
| ctx.wait(); |
| } |
| } |
| |
| if(ctx.getRC() != BKException.Code.OK) throw BKException.create(ctx.getRC()); |
| return ctx.getlastConfirmed(); |
| } |
| |
| /** |
| * Obtains synchronously the last confirmed write from a quorum of bookies. |
| * It is similar as {@link #readLastConfirmed()}, but it doesn't wait all the responses |
| * from the quorum. It would callback immediately if it received a LAC which is larger |
| * than current LAC. |
| * |
| * @see #readLastConfirmed() |
| * |
| * @return The entry id of the last confirmed write or {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID} |
| * if no entry has been confirmed |
| * @throws InterruptedException |
| * @throws BKException |
| */ |
| public long tryReadLastConfirmed() throws InterruptedException, BKException { |
| LastConfirmedCtx ctx = new LastConfirmedCtx(); |
| asyncTryReadLastConfirmed(new SyncReadLastConfirmedCallback(), ctx); |
| synchronized (ctx) { |
| while (!ctx.ready()) { |
| ctx.wait(); |
| } |
| } |
| if (ctx.getRC() != BKException.Code.OK) throw BKException.create(ctx.getRC()); |
| return ctx.getlastConfirmed(); |
| } |
| |
| /** |
| * Obtains asynchronously the explicit last add confirmed from a quorum of |
| * bookies. This call obtains the the explicit last add confirmed each |
| * bookie has received for this ledger and returns the maximum. If in the |
| * write LedgerHandle, explicitLAC feature is not enabled then this will |
| * return {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID}. If the read explicit |
| * lastaddconfirmed is greater than getLastAddConfirmed, then it updates the |
| * lastAddConfirmed of this ledgerhandle. If the ledger has been closed, it |
| * returns the value of the last add confirmed from the metadata. |
| * |
| * @see #getLastAddConfirmed() |
| * |
| * @param cb |
| * callback to return read explicit last confirmed |
| * @param ctx |
| * callback context |
| */ |
| public void asyncReadExplicitLastConfirmed(final ReadLastConfirmedCallback cb, final Object ctx) { |
| boolean isClosed; |
| synchronized (this) { |
| isClosed = metadata.isClosed(); |
| if (isClosed) { |
| lastAddConfirmed = metadata.getLastEntryId(); |
| length = metadata.getLength(); |
| } |
| } |
| if (isClosed) { |
| cb.readLastConfirmedComplete(BKException.Code.OK, lastAddConfirmed, ctx); |
| return; |
| } |
| |
| PendingReadLacOp.LacCallback innercb = new PendingReadLacOp.LacCallback() { |
| |
| @Override |
| public void getLacComplete(int rc, long lac) { |
| if (rc == BKException.Code.OK) { |
| // here we are trying to update lac only but not length |
| updateLastConfirmed(lac, 0); |
| cb.readLastConfirmedComplete(rc, lac, ctx); |
| } else { |
| cb.readLastConfirmedComplete(rc, INVALID_ENTRY_ID, ctx); |
| } |
| } |
| }; |
| new PendingReadLacOp(this, innercb).initiate(); |
| } |
| |
| /** |
| * Obtains synchronously the explicit last add confirmed from a quorum of |
| * bookies. This call obtains the the explicit last add confirmed each |
| * bookie has received for this ledger and returns the maximum. If in the |
| * write LedgerHandle, explicitLAC feature is not enabled then this will |
| * return {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID}. If the read explicit |
| * lastaddconfirmed is greater than getLastAddConfirmed, then it updates the |
| * lastAddConfirmed of this ledgerhandle. If the ledger has been closed, it |
| * returns the value of the last add confirmed from the metadata. |
| * |
| * @see #getLastAddConfirmed() |
| * |
| * @return The entry id of the explicit last confirmed write or |
| * {@link #INVALID_ENTRY_ID INVALID_ENTRY_ID} if no entry has been |
| * confirmed or if explicitLAC feature is not enabled in write |
| * LedgerHandle. |
| * @throws InterruptedException |
| * @throws BKException |
| */ |
| public long readExplicitLastConfirmed() throws InterruptedException, BKException { |
| LastConfirmedCtx ctx = new LastConfirmedCtx(); |
| asyncReadExplicitLastConfirmed(new SyncReadLastConfirmedCallback(), ctx); |
| synchronized (ctx) { |
| while (!ctx.ready()) { |
| ctx.wait(); |
| } |
| } |
| if (ctx.getRC() != BKException.Code.OK) { |
| throw BKException.create(ctx.getRC()); |
| } |
| return ctx.getlastConfirmed(); |
| } |
| |
| // close the ledger and send fails to all the adds in the pipeline |
| void handleUnrecoverableErrorDuringAdd(int rc) { |
| if (metadata.isInRecovery()) { |
| // we should not close ledger if ledger is recovery mode |
| // otherwise we may lose entry. |
| errorOutPendingAdds(rc); |
| return; |
| } |
| LOG.error("Closing ledger {} due to error {}", ledgerId, rc); |
| asyncCloseInternal(NoopCloseCallback.instance, null, rc); |
| } |
| |
    // Fails every queued add operation with the given return code; draining
    // also rolls the drained entries' sizes back out of the ledger length.
    void errorOutPendingAdds(int rc) {
        errorOutPendingAdds(rc, drainPendingAddsToErrorOut());
    }
| |
| synchronized List<PendingAddOp> drainPendingAddsToErrorOut() { |
| PendingAddOp pendingAddOp; |
| List<PendingAddOp> opsDrained = new ArrayList<PendingAddOp>(pendingAddOps.size()); |
| while ((pendingAddOp = pendingAddOps.poll()) != null) { |
| addToLength(-pendingAddOp.entryLength); |
| opsDrained.add(pendingAddOp); |
| } |
| return opsDrained; |
| } |
| |
| void errorOutPendingAdds(int rc, List<PendingAddOp> ops) { |
| for (PendingAddOp op : ops) { |
| op.submitCallback(rc); |
| } |
| } |
| |
    /**
     * Walks the pending-add queue from the head, successfully completing every
     * op whose responses have all come back, advancing the local
     * last-add-confirmed as each op is removed. Stops at the first incomplete
     * op, at a gap in the entry id sequence, or while add completions are
     * blocked by an in-flight ensemble change ({@code blockAddCompletions != 0}).
     */
    void sendAddSuccessCallbacks() {
        // Start from the head of the queue and proceed while there are
        // entries that have had all their responses come back
        PendingAddOp pendingAddOp;

        while ((pendingAddOp = pendingAddOps.peek()) != null
               && blockAddCompletions.get() == 0) {
            if (!pendingAddOp.completed) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("pending add not completed: {}", pendingAddOp);
                }
                return;
            }
            // Check if it is the next entry in the sequence.
            if (pendingAddOp.entryId != 0 && pendingAddOp.entryId != lastAddConfirmed + 1) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Head of the queue entryId: {} is not lac: {} + 1", pendingAddOp.entryId,
                              lastAddConfirmed);
                }
                return;
            }

            pendingAddOps.remove();
            // note: the policy is given the LAC value *before* it is advanced below
            explicitLacFlushPolicy.updatePiggyBackedLac(lastAddConfirmed);
            lastAddConfirmed = pendingAddOp.entryId;

            pendingAddOp.submitCallback(BKException.Code.OK);
        }

    }
| |
    /**
     * Builds a replacement ensemble for the given failed bookies and records it
     * in the in-memory ledger metadata (the caller is responsible for
     * persisting the metadata afterwards).
     *
     * @param failedBookies
     *          map from ensemble index to the address of the failed bookie
     * @param ensembleChangeIdx
     *          sequence number of this ensemble change, used for logging
     * @return info about the new ensemble and which indexes were replaced
     * @throws BKException.BKNotEnoughBookiesException
     *          if not a single failed bookie could be replaced
     */
    EnsembleInfo replaceBookieInMetadata(final Map<Integer, BookieSocketAddress> failedBookies,
                                         int ensembleChangeIdx)
            throws BKException.BKNotEnoughBookiesException {
        final ArrayList<BookieSocketAddress> newEnsemble = new ArrayList<BookieSocketAddress>();
        // the new ensemble takes effect from the first unconfirmed entry
        final long newEnsembleStartEntry = getLastAddConfirmed() + 1;
        final HashSet<Integer> replacedBookies = new HashSet<Integer>();
        synchronized (metadata) {
            newEnsemble.addAll(metadata.currentEnsemble);
            for (Map.Entry<Integer, BookieSocketAddress> entry : failedBookies.entrySet()) {
                int idx = entry.getKey();
                BookieSocketAddress addr = entry.getValue();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[EnsembleChange-L{}-{}] : replacing bookie: {} index: {}",
                            new Object[]{getId(), ensembleChangeIdx, addr, idx});
                }
                if (!newEnsemble.get(idx).equals(addr)) {
                    // ensemble has already changed, failure of this addr is immaterial
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Write did not succeed to {}, bookieIndex {}, but we have already fixed it.",
                                addr, idx);
                    }
                    continue;
                }
                try {
                    // candidate replacements exclude every currently failed bookie
                    BookieSocketAddress newBookie = bk.bookieWatcher.replaceBookie(
                        metadata.getEnsembleSize(),
                        metadata.getWriteQuorumSize(),
                        metadata.getAckQuorumSize(),
                        metadata.getCustomMetadata(),
                        newEnsemble,
                        idx,
                        new HashSet<BookieSocketAddress>(failedBookies.values()));
                    newEnsemble.set(idx, newBookie);
                    replacedBookies.add(idx);
                } catch (BKException.BKNotEnoughBookiesException e) {
                    // if there is no bookie replaced, we throw not enough bookie exception
                    if (replacedBookies.size() <= 0) {
                        throw e;
                    } else {
                        // a partial replacement is still progress; keep what we have
                        break;
                    }
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("[EnsembleChange-L{}-{}] : changing ensemble from: {} to: {} starting at entry: {}," +
                        " failed bookies: {}, replaced bookies: {}",
                        new Object[] { ledgerId, ensembleChangeIdx, metadata.currentEnsemble, newEnsemble,
                                (getLastAddConfirmed() + 1), failedBookies, replacedBookies });
            }
            metadata.addEnsemble(newEnsembleStartEntry, newEnsemble);
        }
        return new EnsembleInfo(newEnsemble, failedBookies, replacedBookies);
    }
| |
    /**
     * Reacts to write failures on bookies by attempting an ensemble change.
     * Add completions are blocked (via {@code blockAddCompletions}, checked by
     * {@link #sendAddSuccessCallbacks()}) until the new ensemble is persisted.
     * If ensemble changes are disabled, the writes are simply retried on the
     * same bookies instead.
     *
     * @param failedBookies
     *          map from ensemble index to the address of the failed bookie
     */
    void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies) {
        // block add completions while the ensemble change is in flight
        int curBlockAddCompletions = blockAddCompletions.incrementAndGet();

        if (bk.disableEnsembleChangeFeature.isAvailable()) {
            // feature-flagged off: unblock and retry on the original bookies
            blockAddCompletions.decrementAndGet();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ensemble change is disabled. Retry sending to failed bookies {} for ledger {}.",
                    failedBookies, ledgerId);
            }
            unsetSuccessAndSendWriteRequest(failedBookies.keySet());
            return;
        }

        int curNumEnsembleChanges = numEnsembleChanges.incrementAndGet();

        synchronized (metadata) {
            try {
                EnsembleInfo ensembleInfo = replaceBookieInMetadata(failedBookies, curNumEnsembleChanges);
                if (ensembleInfo.replacedBookies.isEmpty()) {
                    // nothing actually changed; unblock completions and stop
                    blockAddCompletions.decrementAndGet();
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("[EnsembleChange-L{}-{}] : writing new ensemble info = {}, block add completions = {}",
                        new Object[]{getId(), curNumEnsembleChanges, ensembleInfo, curBlockAddCompletions});
                }
                // persist the new ensemble; the callback unblocks add completions
                writeLedgerConfig(new ChangeEnsembleCb(ensembleInfo, curBlockAddCompletions, curNumEnsembleChanges));
            } catch (BKException.BKNotEnoughBookiesException e) {
                LOG.error("Could not get additional bookie to remake ensemble, closing ledger: {}", ledgerId);
                handleUnrecoverableErrorDuringAdd(e.getCode());
                return;
            }
        }
    }
| |
| // Contains newly reformed ensemble, bookieIndex, failedBookieAddress |
| static final class EnsembleInfo { |
| private final ArrayList<BookieSocketAddress> newEnsemble; |
| private final Map<Integer, BookieSocketAddress> failedBookies; |
| final Set<Integer> replacedBookies; |
| |
| public EnsembleInfo(ArrayList<BookieSocketAddress> newEnsemble, |
| Map<Integer, BookieSocketAddress> failedBookies, |
| Set<Integer> replacedBookies) { |
| this.newEnsemble = newEnsemble; |
| this.failedBookies = failedBookies; |
| this.replacedBookies = replacedBookies; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("Ensemble Info : failed bookies = ").append(failedBookies) |
| .append(", replaced bookies = ").append(replacedBookies) |
| .append(", new ensemble = ").append(newEnsemble); |
| return sb.toString(); |
| } |
| } |
| |
| /** |
| * Callback which is updating the ledgerMetadata in zk with the newly |
| * reformed ensemble. On MetadataVersionException, will reread latest |
| * ledgerMetadata and act upon. |
| */ |
| private final class ChangeEnsembleCb extends OrderedSafeGenericCallback<Void> { |
| private final EnsembleInfo ensembleInfo; |
| private final int curBlockAddCompletions; |
| private final int ensembleChangeIdx; |
| |
| ChangeEnsembleCb(EnsembleInfo ensembleInfo, |
| int curBlockAddCompletions, |
| int ensembleChangeIdx) { |
| super(bk.mainWorkerPool, ledgerId); |
| this.ensembleInfo = ensembleInfo; |
| this.curBlockAddCompletions = curBlockAddCompletions; |
| this.ensembleChangeIdx = ensembleChangeIdx; |
| } |
| |
| @Override |
| public void safeOperationComplete(final int rc, Void result) { |
| if (rc == BKException.Code.MetadataVersionException) { |
| // We changed the ensemble, but got a version exception. We |
| // should still consider this as an ensemble change |
| ensembleChangeCounter.inc(); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.info("[EnsembleChange-L{}-{}] : encountered version conflicts, re-read ledger metadata.", |
| getId(), ensembleChangeIdx); |
| } |
| |
| rereadMetadata(new ReReadLedgerMetadataCb(rc, |
| ensembleInfo, curBlockAddCompletions, ensembleChangeIdx)); |
| return; |
| } else if (rc != BKException.Code.OK) { |
| LOG.error("[EnsembleChange-L{}-{}] : could not persist ledger metadata : info = {}, closing ledger : {}.", |
| new Object[] { getId(), ensembleChangeIdx, ensembleInfo, rc }); |
| handleUnrecoverableErrorDuringAdd(rc); |
| return; |
| } |
| int newBlockAddCompletions = blockAddCompletions.decrementAndGet(); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.info("[EnsembleChange-L{}-{}] : completed ensemble change, block add completion {} => {}", |
| new Object[]{getId(), ensembleChangeIdx, curBlockAddCompletions, newBlockAddCompletions}); |
| } |
| |
| // We've successfully changed an ensemble |
| ensembleChangeCounter.inc(); |
| // the failed bookie has been replaced |
| unsetSuccessAndSendWriteRequest(ensembleInfo.replacedBookies); |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("ChangeEnsemble(%d)", ledgerId); |
| } |
| } |
| |
| /** |
| * Callback which is reading the ledgerMetadata present in zk. This will try |
| * to resolve the version conflicts. |
| */ |
| private final class ReReadLedgerMetadataCb extends OrderedSafeGenericCallback<LedgerMetadata> { |
| private final int rc; |
| private final EnsembleInfo ensembleInfo; |
| private final int curBlockAddCompletions; |
| private final int ensembleChangeIdx; |
| |
| ReReadLedgerMetadataCb(int rc, |
| EnsembleInfo ensembleInfo, |
| int curBlockAddCompletions, |
| int ensembleChangeIdx) { |
| super(bk.mainWorkerPool, ledgerId); |
| this.rc = rc; |
| this.ensembleInfo = ensembleInfo; |
| this.curBlockAddCompletions = curBlockAddCompletions; |
| this.ensembleChangeIdx = ensembleChangeIdx; |
| } |
| |
| @Override |
| public void safeOperationComplete(int newrc, LedgerMetadata newMeta) { |
| if (newrc != BKException.Code.OK) { |
| LOG.error("[EnsembleChange-L{}-{}] : error re-reading metadata to address ensemble change conflicts," + |
| " code=", new Object[] { ledgerId, ensembleChangeIdx, newrc }); |
| handleUnrecoverableErrorDuringAdd(rc); |
| } else { |
| if (!resolveConflict(newMeta)) { |
| LOG.error("[EnsembleChange-L{}-{}] : could not resolve ledger metadata conflict" + |
| " while changing ensemble to: {}, local meta data is \n {} \n," + |
| " zk meta data is \n {} \n, closing ledger", |
| new Object[] { ledgerId, ensembleChangeIdx, ensembleInfo.newEnsemble, metadata, newMeta }); |
| handleUnrecoverableErrorDuringAdd(rc); |
| } |
| } |
| } |
| |
| /** |
| * Specific resolve conflicts happened when multiple bookies failures in same ensemble. |
| * <p> |
| * Resolving the version conflicts between local ledgerMetadata and zk |
| * ledgerMetadata. This will do the following: |
| * <ul> |
| * <li> |
| * check whether ledgerMetadata state matches of local and zk</li> |
| * <li> |
| * if the zk ledgerMetadata still contains the failed bookie, then |
| * update zookeeper with the newBookie otherwise send write request</li> |
| * </ul> |
| * </p> |
| */ |
| private boolean resolveConflict(LedgerMetadata newMeta) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("[EnsembleChange-L{}-{}] : resolving conflicts - local metadata = \n {} \n," + |
| " zk metadata = \n {} \n", new Object[]{ledgerId, ensembleChangeIdx, metadata, newMeta}); |
| } |
| // make sure the ledger isn't closed by other ones. |
| if (metadata.getState() != newMeta.getState()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.info("[EnsembleChange-L{}-{}] : resolving conflicts but state changed," + |
| " local metadata = \n {} \n, zk metadata = \n {} \n", |
| new Object[]{ledgerId, ensembleChangeIdx, metadata, newMeta}); |
| } |
| return false; |
| } |
| |
| // We should check number of ensembles since there are two kinds of metadata conflicts: |
| // - Case 1: Multiple bookies involved in ensemble change. |
| // Number of ensembles should be same in this case. |
| // - Case 2: Recovery (Auto/Manually) replaced ensemble and ensemble changed. |
| // The metadata changed due to ensemble change would have one more ensemble |
| // than the metadata changed by recovery. |
| int diff = newMeta.getEnsembles().size() - metadata.getEnsembles().size(); |
| if (0 != diff) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("[EnsembleChange-L{}-{}] : resolving conflicts but ensembles have {} differences," + |
| " local metadata = \n {} \n, zk metadata = \n {} \n", |
| new Object[]{ledgerId, ensembleChangeIdx, diff, metadata, newMeta}); |
| } |
| if (-1 == diff) { |
| // Case 1: metadata is changed by other ones (e.g. Recovery) |
| return updateMetadataIfPossible(newMeta); |
| } |
| return false; |
| } |
| |
| // |
| // Case 2: |
| // |
| // If the failed the bookie is still existed in the metadata (in zookeeper), it means that |
| // the ensemble change of the failed bookie is failed due to metadata conflicts. so try to |
| // update the ensemble change metadata again. Otherwise, it means that the ensemble change |
| // is already succeed, unset the success and re-adding entries. |
| if (!areFailedBookiesReplaced(newMeta, ensembleInfo)) { |
| // If the in-memory data doesn't contains the failed bookie, it means the ensemble change |
| // didn't finish, so try to resolve conflicts with the metadata read from zookeeper and |
| // update ensemble changed metadata again. |
| if (areFailedBookiesReplaced(metadata, ensembleInfo)) { |
| return updateMetadataIfPossible(newMeta); |
| } |
| } else { |
| ensembleChangeCounter.inc(); |
| // We've successfully changed an ensemble |
| // the failed bookie has been replaced |
| int newBlockAddCompletions = blockAddCompletions.decrementAndGet(); |
| unsetSuccessAndSendWriteRequest(ensembleInfo.replacedBookies); |
| if (LOG.isDebugEnabled()) { |
| LOG.info("[EnsembleChange-L{}-{}] : resolved conflicts, block add complectiosn {} => {}.", |
| new Object[]{ledgerId, ensembleChangeIdx, curBlockAddCompletions, newBlockAddCompletions}); |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Check whether all the failed bookies are replaced. |
| * |
| * @param newMeta |
| * new ledger metadata |
| * @param ensembleInfo |
| * ensemble info used for ensemble change. |
| * @return true if all failed bookies are replaced, false otherwise |
| */ |
| private boolean areFailedBookiesReplaced(LedgerMetadata newMeta, EnsembleInfo ensembleInfo) { |
| boolean replaced = true; |
| for (Integer replacedBookieIdx : ensembleInfo.replacedBookies) { |
| BookieSocketAddress failedBookieAddr = ensembleInfo.failedBookies.get(replacedBookieIdx); |
| BookieSocketAddress replacedBookieAddr = newMeta.currentEnsemble.get(replacedBookieIdx); |
| replaced &= !Objects.equal(replacedBookieAddr, failedBookieAddr); |
| } |
| return replaced; |
| } |
| |
| private boolean updateMetadataIfPossible(LedgerMetadata newMeta) { |
| // if the local metadata is newer than zookeeper metadata, it means that metadata is updated |
| // again when it was trying re-reading the metatada, re-kick the reread again |
| if (metadata.isNewerThan(newMeta)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("[EnsembleChange-L{}-{}] : reread metadata because local metadata is newer.", |
| new Object[]{ledgerId, ensembleChangeIdx}); |
| } |
| rereadMetadata(this); |
| return true; |
| } |
| // make sure the metadata doesn't changed by other ones. |
| if (metadata.isConflictWith(newMeta)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("[EnsembleChange-L{}-{}] : metadata is conflicted, local metadata = \n {} \n," + |
| " zk metadata = \n {} \n", new Object[]{ledgerId, ensembleChangeIdx, metadata, newMeta}); |
| } |
| return false; |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.info("[EnsembleChange-L{}-{}] : resolved ledger metadata conflict and writing to zookeeper," |
| + " local meta data is \n {} \n, zk meta data is \n {}.", |
| new Object[]{ledgerId, ensembleChangeIdx, metadata, newMeta}); |
| } |
| // update znode version |
| metadata.setVersion(newMeta.getVersion()); |
| // merge ensemble infos from new meta except last ensemble |
| // since they might be modified by recovery tool. |
| metadata.mergeEnsembles(newMeta.getEnsembles()); |
| writeLedgerConfig(new ChangeEnsembleCb(ensembleInfo, curBlockAddCompletions, ensembleChangeIdx)); |
| return true; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("ReReadLedgerMetadata(%d)", ledgerId); |
| } |
| } |
| |
| void unsetSuccessAndSendWriteRequest(final Set<Integer> bookies) { |
| for (PendingAddOp pendingAddOp : pendingAddOps) { |
| for (Integer bookieIndex: bookies) { |
| pendingAddOp.unsetSuccessAndSendWriteRequest(bookieIndex); |
| } |
| } |
| } |
| |
    // Re-reads this ledger's metadata from the ledger manager (e.g. zookeeper)
    // and delivers it to the given callback.
    void rereadMetadata(final GenericCallback<LedgerMetadata> cb) {
        bk.getLedgerManager().readLedgerMetadata(ledgerId, cb);
    }
| |
| void registerOperationFailureOnBookie(BookieSocketAddress bookie, long entryId) { |
| if (bk.getConf().getEnableBookieFailureTracking()) { |
| bookieFailureHistory.put(bookie, entryId); |
| } |
| } |
| |
| |
    // Recovers the ledger with no read-entry listener and without forcing
    // recovery of an already-closed ledger.
    void recover(GenericCallback<Void> finalCb) {
        recover(finalCb, null, false);
    }
| |
| /** |
| * Recover the ledger. |
| * |
| * @param finalCb |
| * callback after recovery is done. |
| * @param listener |
| * read entry listener on recovery reads. |
| * @param forceRecovery |
| * force the recovery procedure even the ledger metadata shows the ledger is closed. |
| */ |
| void recover(GenericCallback<Void> finalCb, |
| final @VisibleForTesting BookkeeperInternalCallbacks.ReadEntryListener listener, |
| final boolean forceRecovery) { |
| final GenericCallback<Void> cb = new TimedGenericCallback<Void>( |
| finalCb, |
| BKException.Code.OK, |
| bk.getRecoverOpLogger()); |
| boolean wasClosed = false; |
| boolean wasInRecovery = false; |
| |
| synchronized (this) { |
| if (metadata.isClosed()) { |
| if (forceRecovery) { |
| wasClosed = false; |
| // mark the ledger back to in recovery state, so it would proceed ledger recovery again. |
| wasInRecovery = false; |
| metadata.markLedgerInRecovery(); |
| } else { |
| lastAddConfirmed = lastAddPushed = metadata.getLastEntryId(); |
| length = metadata.getLength(); |
| wasClosed = true; |
| } |
| } else { |
| wasClosed = false; |
| if (metadata.isInRecovery()) { |
| wasInRecovery = true; |
| } else { |
| wasInRecovery = false; |
| metadata.markLedgerInRecovery(); |
| } |
| } |
| } |
| |
| if (wasClosed) { |
| // We are already closed, nothing to do |
| cb.operationComplete(BKException.Code.OK, null); |
| return; |
| } |
| |
| if (wasInRecovery) { |
| // if metadata is already in recover, dont try to write again, |
| // just do the recovery from the starting point |
| new LedgerRecoveryOp(LedgerHandle.this, cb) |
| .parallelRead(enableParallelRecoveryRead) |
| .readBatchSize(recoveryReadBatchSize) |
| .setEntryListener(listener) |
| .initiate(); |
| return; |
| } |
| |
| writeLedgerConfig(new OrderedSafeGenericCallback<Void>(bk.mainWorkerPool, ledgerId) { |
| @Override |
| public void safeOperationComplete(final int rc, Void result) { |
| if (rc == BKException.Code.MetadataVersionException) { |
| rereadMetadata(new OrderedSafeGenericCallback<LedgerMetadata>(bk.mainWorkerPool, |
| ledgerId) { |
| @Override |
| public void safeOperationComplete(int rc, LedgerMetadata newMeta) { |
| if (rc != BKException.Code.OK) { |
| cb.operationComplete(rc, null); |
| } else { |
| metadata = newMeta; |
| recover(cb, listener, forceRecovery); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("ReReadMetadataForRecover(%d)", ledgerId); |
| } |
| }); |
| } else if (rc == BKException.Code.OK) { |
| // we only could issue recovery operation after we successfully update the ledger state to in recovery |
| // otherwise, it couldn't prevent us advancing last confirmed while the other writer is closing the ledger, |
| // which will cause inconsistent last add confirmed on bookies & zookeeper metadata. |
| new LedgerRecoveryOp(LedgerHandle.this, cb) |
| .parallelRead(enableParallelRecoveryRead) |
| .readBatchSize(recoveryReadBatchSize) |
| .setEntryListener(listener) |
| .initiate(); |
| } else { |
| LOG.error("Error writing ledger config {} of ledger {}", rc, ledgerId); |
| cb.operationComplete(rc, null); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("WriteLedgerConfigForRecover(%d)", ledgerId); |
| } |
| }); |
| } |
| |
| static class NoopCloseCallback implements CloseCallback { |
| static NoopCloseCallback instance = new NoopCloseCallback(); |
| |
| @Override |
| public void closeComplete(int rc, LedgerHandle lh, Object ctx) { |
| if (rc != BKException.Code.OK) { |
| LOG.warn("Close failed: " + BKException.getMessage(rc)); |
| } |
| // noop |
| } |
| } |
| |
    static class LastAddConfirmedCallback implements AddLacCallback {
        static final LastAddConfirmedCallback INSTANCE = new LastAddConfirmedCallback();
        /**
         * Implementation of callback interface for the last-add-confirmed
         * (LAC) update operation: logs a warning on failure, a debug line on
         * success.
         *
         * @param rc
         *          return code
         * @param lh
         *          ledger handle whose LAC was updated
         * @param ctx
         *          control object
         */
        @Override
        public void addLacComplete(int rc, LedgerHandle lh, Object ctx) {
            if (rc != BKException.Code.OK) {
                LOG.warn("LastAddConfirmedUpdate failed: {} ", BKException.getMessage(rc));
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Callback LAC Updated for: {} ", lh.getId());
                }
            }
        }
    }
| |
| static class SyncReadCallback implements ReadCallback { |
| /** |
| * Implementation of callback interface for synchronous read method. |
| * |
| * @param rc |
| * return code |
| * @param lh |
| * ledger handle |
| * @param seq |
| * sequence of entries |
| * @param ctx |
| * control object |
| */ |
| @Override |
| @SuppressWarnings("unchecked") |
| public void readComplete(int rc, LedgerHandle lh, |
| Enumeration<LedgerEntry> seq, Object ctx) { |
| SynchCallbackUtils.finish(rc, seq, (CompletableFuture<Enumeration<LedgerEntry>>)ctx); |
| } |
| } |
| |
    static class SyncAddCallback implements AddCallback {

        /**
         * Implementation of callback interface for synchronous add method:
         * completes the waiting future with the entry id (or the error code).
         *
         * @param rc
         *          return code
         * @param lh
         *          ledger handle
         * @param entry
         *          entry identifier
         * @param ctx
         *          control object, expected to be a {@code CompletableFuture<Long>}
         */
        @Override
        @SuppressWarnings("unchecked")
        public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
            SynchCallbackUtils.finish(rc, entry, (CompletableFuture<Long>)ctx);
        }
    }
| |
| static class SyncReadLastConfirmedCallback implements ReadLastConfirmedCallback { |
| /** |
| * Implementation of callback interface for synchronous read last confirmed method. |
| */ |
| @Override |
| public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) { |
| LastConfirmedCtx lcCtx = (LastConfirmedCtx) ctx; |
| |
| synchronized(lcCtx) { |
| lcCtx.setRC(rc); |
| lcCtx.setLastConfirmed(lastConfirmed); |
| lcCtx.notify(); |
| } |
| } |
| } |
| |
| static class SyncCloseCallback implements CloseCallback { |
| /** |
| * Close callback method |
| * |
| * @param rc |
| * @param lh |
| * @param ctx |
| */ |
| @Override |
| @SuppressWarnings("unchecked") |
| public void closeComplete(int rc, LedgerHandle lh, Object ctx) { |
| SynchCallbackUtils.finish(rc, null, (CompletableFuture<Void>)ctx); |
| } |
| } |
| } |