| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.bookkeeper.client; |
| |
| import static com.google.common.base.Preconditions.checkState; |
| import static org.apache.bookkeeper.client.api.BKException.Code.NoBookieAvailableException; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.ArgumentMatchers.anyBoolean; |
| import static org.mockito.ArgumentMatchers.anyInt; |
| import static org.mockito.ArgumentMatchers.anyList; |
| import static org.mockito.ArgumentMatchers.anyLong; |
| import static org.mockito.ArgumentMatchers.anyMap; |
| import static org.mockito.ArgumentMatchers.anySet; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.ByteBufAllocator; |
| import io.netty.buffer.Unpooled; |
| import io.netty.buffer.UnpooledByteBufAllocator; |
| |
| import java.security.GeneralSecurityException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ConcurrentSkipListSet; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.bookkeeper.client.BKException.BKDigestMatchException; |
| import org.apache.bookkeeper.client.BKException.Code; |
| import org.apache.bookkeeper.client.api.CreateBuilder; |
| import org.apache.bookkeeper.client.api.DeleteBuilder; |
| import org.apache.bookkeeper.client.api.LedgerMetadata; |
| import org.apache.bookkeeper.client.api.OpenBuilder; |
| import org.apache.bookkeeper.common.util.OrderedExecutor; |
| import org.apache.bookkeeper.common.util.OrderedScheduler; |
| import org.apache.bookkeeper.conf.ClientConfiguration; |
| import org.apache.bookkeeper.meta.LedgerIdGenerator; |
| import org.apache.bookkeeper.meta.LedgerManager; |
| import org.apache.bookkeeper.net.BookieSocketAddress; |
| import org.apache.bookkeeper.proto.BookieClient; |
| import org.apache.bookkeeper.proto.BookieProtocol; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; |
| import org.apache.bookkeeper.proto.checksum.DigestManager; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.util.ByteBufList; |
| import org.apache.bookkeeper.versioning.LongVersion; |
| import org.apache.bookkeeper.versioning.Version; |
| import org.apache.bookkeeper.versioning.Versioned; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| import org.mockito.stubbing.Stubber; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Base class for Mock-based Client testcases. |
| */ |
| public abstract class MockBookKeeperTestCase { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(MockBookKeeperTestCase.class); |
| |
| protected OrderedScheduler scheduler; |
| protected OrderedExecutor executor; |
| protected BookKeeper bk; |
| protected BookieClient bookieClient; |
| protected LedgerManager ledgerManager; |
| protected LedgerIdGenerator ledgerIdGenerator; |
| |
| private BookieWatcher bookieWatcher; |
| |
| protected ConcurrentMap<Long, LedgerMetadata> mockLedgerMetadataRegistry; |
| protected AtomicLong mockNextLedgerId; |
| protected ConcurrentSkipListSet<Long> fencedLedgers; |
| protected ConcurrentMap<Long, Map<BookieSocketAddress, Map<Long, MockEntry>>> mockLedgerData; |
| |
| private Map<BookieSocketAddress, List<Runnable>> deferredBookieForceLedgerResponses; |
| private Set<BookieSocketAddress> suspendedBookiesForForceLedgerAcks; |
| |
| List<BookieSocketAddress> failedBookies; |
| Set<BookieSocketAddress> availableBookies; |
| private int lastIndexForBK; |
| |
| private Map<BookieSocketAddress, Map<Long, MockEntry>> getMockLedgerContents(long ledgerId) { |
| return mockLedgerData.computeIfAbsent(ledgerId, (id) -> new ConcurrentHashMap<>()); |
| } |
| |
| private Map<Long, MockEntry> getMockLedgerContentsInBookie(long ledgerId, BookieSocketAddress bookieSocketAddress) { |
| return getMockLedgerContents(ledgerId).computeIfAbsent(bookieSocketAddress, addr -> new ConcurrentHashMap<>()); |
| } |
| |
| private MockEntry getMockLedgerEntry(long ledgerId, |
| BookieSocketAddress bookieSocketAddress, long entryId) throws BKException{ |
| if (failedBookies.contains(bookieSocketAddress)) { |
| throw BKException.create(NoBookieAvailableException); |
| } |
| return getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).get(entryId); |
| } |
| |
| private static final class MockEntry { |
| |
| byte[] payload; |
| long lastAddConfirmed; |
| |
| public MockEntry(byte[] payload, long lastAddConfirmed) { |
| this.payload = payload; |
| this.lastAddConfirmed = lastAddConfirmed; |
| } |
| |
| } |
| |
| @Before |
| public void setup() throws Exception { |
| deferredBookieForceLedgerResponses = new ConcurrentHashMap<>(); |
| suspendedBookiesForForceLedgerAcks = Collections.synchronizedSet(new HashSet<>()); |
| mockLedgerMetadataRegistry = new ConcurrentHashMap<>(); |
| mockLedgerData = new ConcurrentHashMap<>(); |
| mockNextLedgerId = new AtomicLong(1); |
| fencedLedgers = new ConcurrentSkipListSet<>(); |
| scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(4).name("bk-test").build(); |
| executor = OrderedExecutor.newBuilder().build(); |
| bookieWatcher = mock(BookieWatcher.class); |
| |
| bookieClient = mock(BookieClient.class); |
| ledgerManager = mock(LedgerManager.class); |
| ledgerIdGenerator = mock(LedgerIdGenerator.class); |
| |
| bk = mock(BookKeeper.class); |
| |
| failedBookies = new ArrayList<>(); |
| availableBookies = new HashSet<>(); |
| |
| when(bk.getCloseLock()).thenReturn(new ReentrantReadWriteLock()); |
| when(bk.isClosed()).thenReturn(false); |
| when(bk.getBookieWatcher()).thenReturn(bookieWatcher); |
| when(bk.getMainWorkerPool()).thenReturn(executor); |
| when(bk.getBookieClient()).thenReturn(bookieClient); |
| when(bk.getScheduler()).thenReturn(scheduler); |
| |
| setBookKeeperConfig(new ClientConfiguration()); |
| when(bk.getStatsLogger()).thenReturn(NullStatsLogger.INSTANCE); |
| BookKeeperClientStats clientStats = BookKeeperClientStats.newInstance(NullStatsLogger.INSTANCE); |
| ClientContext clientCtx = new ClientContext() { |
| @Override |
| public ClientInternalConf getConf() { |
| return ClientInternalConf.fromConfig(bk.getConf()); |
| } |
| |
| @Override |
| public LedgerManager getLedgerManager() { |
| return ledgerManager; |
| } |
| |
| @Override |
| public BookieWatcher getBookieWatcher() { |
| return bookieWatcher; |
| } |
| |
| @Override |
| public EnsemblePlacementPolicy getPlacementPolicy() { |
| return null; |
| } |
| |
| @Override |
| public BookieClient getBookieClient() { |
| return bookieClient; |
| } |
| |
| @Override |
| public OrderedExecutor getMainWorkerPool() { |
| return scheduler; |
| } |
| |
| @Override |
| public OrderedScheduler getScheduler() { |
| return scheduler; |
| } |
| |
| @Override |
| public BookKeeperClientStats getClientStats() { |
| return clientStats; |
| } |
| |
| @Override |
| public boolean isClientClosed() { |
| return bk.isClosed(); |
| } |
| |
| @Override |
| public ByteBufAllocator getByteBufAllocator() { |
| return UnpooledByteBufAllocator.DEFAULT; |
| } |
| }; |
| when(bk.getClientCtx()).thenReturn(clientCtx); |
| when(bk.getLedgerManager()).thenReturn(ledgerManager); |
| when(bk.getLedgerIdGenerator()).thenReturn(ledgerIdGenerator); |
| when(bk.getReturnRc(anyInt())).thenAnswer(invocationOnMock -> invocationOnMock.getArgument(0)); |
| when(bookieClient.isWritable(any(), anyLong())).thenReturn(true); |
| |
| setupLedgerIdGenerator(); |
| setupCreateLedgerMetadata(); |
| setupReadLedgerMetadata(); |
| setupWriteLedgerMetadata(); |
| setupRemoveLedgerMetadata(); |
| setupRegisterLedgerMetadataListener(); |
| setupBookieWatcherForNewEnsemble(); |
| setupBookieWatcherForEnsembleChange(); |
| setupBookieClientReadEntry(); |
| setupBookieClientAddEntry(); |
| setupBookieClientForceLedger(); |
| } |
| |
| protected void setBookKeeperConfig(ClientConfiguration conf) { |
| when(bk.getConf()).thenReturn(conf); |
| } |
| |
| private DigestManager getDigestType(long ledgerId) throws GeneralSecurityException { |
| LedgerMetadata metadata = mockLedgerMetadataRegistry.get(ledgerId); |
| return DigestManager.instantiate( |
| ledgerId, |
| metadata.getPassword(), |
| org.apache.bookkeeper.client.BookKeeper.DigestType.toProtoDigestType( |
| org.apache.bookkeeper.client.BookKeeper.DigestType.fromApiDigestType( |
| metadata.getDigestType())), |
| UnpooledByteBufAllocator.DEFAULT, false); |
| } |
| |
| @After |
| public void tearDown() { |
| scheduler.shutdown(); |
| executor.shutdown(); |
| } |
| |
| protected CreateBuilder newCreateLedgerOp() { |
| return new LedgerCreateOp.CreateBuilderImpl(bk); |
| } |
| |
| protected OpenBuilder newOpenLedgerOp() { |
| return new LedgerOpenOp.OpenBuilderImpl(bk); |
| } |
| |
| protected DeleteBuilder newDeleteLedgerOp() { |
| return new LedgerDeleteOp.DeleteBuilderImpl(bk); |
| } |
| |
| protected void closeBookkeeper() { |
| when(bk.isClosed()).thenReturn(true); |
| } |
| |
| protected void killBookie(BookieSocketAddress killedBookieSocketAddress) { |
| failedBookies.add(killedBookieSocketAddress); |
| availableBookies.remove(killedBookieSocketAddress); |
| } |
| |
| protected void startKilledBookie(BookieSocketAddress killedBookieSocketAddress) { |
| checkState(failedBookies.contains(killedBookieSocketAddress)); |
| checkState(!availableBookies.contains(killedBookieSocketAddress)); |
| failedBookies.remove(killedBookieSocketAddress); |
| availableBookies.add(killedBookieSocketAddress); |
| } |
| |
| protected void suspendBookieForceLedgerAcks(BookieSocketAddress address) { |
| suspendedBookiesForForceLedgerAcks.add(address); |
| } |
| |
| protected void resumeBookieWriteAcks(BookieSocketAddress address) { |
| suspendedBookiesForForceLedgerAcks.remove(address); |
| List<Runnable> pendingResponses = deferredBookieForceLedgerResponses.remove(address); |
| if (pendingResponses != null) { |
| pendingResponses.forEach(Runnable::run); |
| } |
| } |
| |
| protected BookieSocketAddress startNewBookie() { |
| BookieSocketAddress address = generateBookieSocketAddress(lastIndexForBK++); |
| availableBookies.add(address); |
| return address; |
| } |
| |
| protected BookieSocketAddress generateBookieSocketAddress(int index) { |
| return new BookieSocketAddress("localhost", 1111 + index); |
| } |
| |
| protected ArrayList<BookieSocketAddress> generateNewEnsemble(int ensembleSize) { |
| ArrayList<BookieSocketAddress> ensemble = new ArrayList<>(ensembleSize); |
| for (int i = 0; i < ensembleSize; i++) { |
| ensemble.add(generateBookieSocketAddress(i)); |
| } |
| availableBookies.addAll(ensemble); |
| lastIndexForBK = ensembleSize; |
| return ensemble; |
| } |
| |
| private void setupBookieWatcherForNewEnsemble() throws BKException.BKNotEnoughBookiesException { |
| when(bookieWatcher.newEnsemble(anyInt(), anyInt(), anyInt(), any())) |
| .thenAnswer((Answer<ArrayList<BookieSocketAddress>>) new Answer<ArrayList<BookieSocketAddress>>() { |
| @Override |
| @SuppressWarnings("unchecked") |
| public ArrayList<BookieSocketAddress> answer(InvocationOnMock invocation) throws Throwable { |
| Object[] args = invocation.getArguments(); |
| int ensembleSize = (Integer) args[0]; |
| return generateNewEnsemble(ensembleSize); |
| } |
| }); |
| } |
| |
| private void setupBookieWatcherForEnsembleChange() throws BKException.BKNotEnoughBookiesException { |
| when(bookieWatcher.replaceBookie(anyInt(), anyInt(), anyInt(), anyMap(), anyList(), anyInt(), anySet())) |
| .thenAnswer((Answer<BookieSocketAddress>) new Answer<BookieSocketAddress>() { |
| @Override |
| @SuppressWarnings("unchecked") |
| public BookieSocketAddress answer(InvocationOnMock invocation) throws Throwable { |
| Object[] args = invocation.getArguments(); |
| List<BookieSocketAddress> existingBookies = (List<BookieSocketAddress>) args[4]; |
| Set<BookieSocketAddress> excludeBookies = (Set<BookieSocketAddress>) args[6]; |
| excludeBookies.addAll(existingBookies); |
| Set<BookieSocketAddress> remainBookies = new HashSet<BookieSocketAddress>(availableBookies); |
| remainBookies.removeAll(excludeBookies); |
| if (remainBookies.iterator().hasNext()) { |
| return remainBookies.iterator().next(); |
| } |
| throw BKException.create(BKException.Code.NotEnoughBookiesException); |
| } |
| }); |
| } |
| |
| protected void registerMockEntryForRead(long ledgerId, long entryId, BookieSocketAddress bookieSocketAddress, |
| byte[] entryData, long lastAddConfirmed) { |
| getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).put(entryId, new MockEntry(entryData, |
| lastAddConfirmed)); |
| } |
| |
| protected void registerMockLedgerMetadata(long ledgerId, LedgerMetadata ledgerMetadata) { |
| mockLedgerMetadataRegistry.put(ledgerId, ledgerMetadata); |
| } |
| |
| protected void setNewGeneratedLedgerId(long ledgerId) { |
| mockNextLedgerId.set(ledgerId); |
| setupLedgerIdGenerator(); |
| } |
| |
| protected LedgerMetadata getLedgerMetadata(long ledgerId) { |
| return mockLedgerMetadataRegistry.get(ledgerId); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private void setupReadLedgerMetadata() { |
| doAnswer(invocation -> { |
| Object[] args = invocation.getArguments(); |
| Long ledgerId = (Long) args[0]; |
| CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>(); |
| executor.executeOrdered(ledgerId, () -> { |
| LedgerMetadata ledgerMetadata = mockLedgerMetadataRegistry.get(ledgerId); |
| if (ledgerMetadata == null) { |
| promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsOnMetadataServerException()); |
| } else { |
| promise.complete(new Versioned<>(ledgerMetadata, new LongVersion(1))); |
| } |
| }); |
| return promise; |
| }).when(ledgerManager).readLedgerMetadata(anyLong()); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private void setupRemoveLedgerMetadata() { |
| doAnswer(invocation -> { |
| Object[] args = invocation.getArguments(); |
| Long ledgerId = (Long) args[0]; |
| CompletableFuture<Void> promise = new CompletableFuture<>(); |
| executor.executeOrdered(ledgerId, () -> { |
| if (mockLedgerMetadataRegistry.remove(ledgerId) != null) { |
| promise.complete(null); |
| } else { |
| promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsOnMetadataServerException()); |
| } |
| }); |
| return promise; |
| }).when(ledgerManager).removeLedgerMetadata(anyLong(), any()); |
| } |
| |
| private void setupRegisterLedgerMetadataListener() { |
| doAnswer((Answer<Void>) new Answer<Void>() { |
| @Override |
| @SuppressWarnings("unchecked") |
| public Void answer(InvocationOnMock invocation) throws Throwable { |
| return null; |
| } |
| }).when(ledgerManager).registerLedgerMetadataListener(anyLong(), any()); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private void setupLedgerIdGenerator() { |
| doAnswer(invocation -> { |
| Object[] args = invocation.getArguments(); |
| BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[0]; |
| cb.operationComplete(Code.OK, mockNextLedgerId.getAndIncrement()); |
| return null; |
| }).when(ledgerIdGenerator).generateLedgerId(any()); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private void setupCreateLedgerMetadata() { |
| doAnswer(invocation -> { |
| Object[] args = invocation.getArguments(); |
| Long ledgerId = (Long) args[0]; |
| |
| CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>(); |
| executor.executeOrdered(ledgerId, () -> { |
| |
| LedgerMetadata ledgerMetadata = (LedgerMetadata) args[1]; |
| mockLedgerMetadataRegistry.put(ledgerId, ledgerMetadata); |
| promise.complete(new Versioned<>(ledgerMetadata, new LongVersion(1))); |
| }); |
| return promise; |
| }).when(ledgerManager).createLedgerMetadata(anyLong(), any()); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private void setupWriteLedgerMetadata() { |
| doAnswer(invocation -> { |
| Object[] args = invocation.getArguments(); |
| Long ledgerId = (Long) args[0]; |
| LedgerMetadata metadata = (LedgerMetadata) args[1]; |
| Version currentVersion = (Version) args[2]; |
| CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>(); |
| executor.executeOrdered(ledgerId, () -> { |
| LedgerMetadata newMetadata = LedgerMetadataBuilder.from(metadata).build(); |
| mockLedgerMetadataRegistry.put(ledgerId, newMetadata); |
| promise.complete(new Versioned<>(newMetadata, new LongVersion(1234))); |
| }); |
| return promise; |
| }).when(ledgerManager).writeLedgerMetadata(anyLong(), any(), any()); |
| } |
| |
| @SuppressWarnings("unchecked") |
| protected void setupBookieClientReadEntry() { |
| final Stubber stub = doAnswer(invokation -> { |
| Object[] args = invokation.getArguments(); |
| BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0]; |
| long ledgerId = (Long) args[1]; |
| long entryId = (Long) args[2]; |
| BookkeeperInternalCallbacks.ReadEntryCallback callback = |
| (BookkeeperInternalCallbacks.ReadEntryCallback) args[3]; |
| boolean fenced = (((Integer) args[5]) & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING; |
| |
| executor.executeOrdered(ledgerId, () -> { |
| DigestManager macManager = null; |
| try { |
| macManager = getDigestType(ledgerId); |
| } catch (GeneralSecurityException gse){ |
| LOG.error("Initialize macManager fail", gse); |
| } |
| MockEntry mockEntry = null; |
| try { |
| mockEntry = getMockLedgerEntry(ledgerId, bookieSocketAddress, entryId); |
| } catch (BKException bke) { |
| LOG.info("readEntryAndFenceLedger - occur BKException {}@{} at {}", entryId, ledgerId, |
| bookieSocketAddress); |
| callback.readEntryComplete(bke.getCode(), ledgerId, entryId, null, args[5]); |
| } |
| |
| if (fenced) { |
| fencedLedgers.add(ledgerId); |
| } |
| |
| if (mockEntry != null) { |
| LOG.info("readEntry - found mock entry {}@{} at {}", entryId, ledgerId, bookieSocketAddress); |
| ByteBufList entry = macManager.computeDigestAndPackageForSending(entryId, |
| mockEntry.lastAddConfirmed, mockEntry.payload.length, |
| Unpooled.wrappedBuffer(mockEntry.payload)); |
| callback.readEntryComplete(BKException.Code.OK, ledgerId, entryId, ByteBufList.coalesce(entry), |
| args[4]); |
| entry.release(); |
| } else { |
| LOG.info("readEntry - no such mock entry {}@{} at {}", entryId, ledgerId, bookieSocketAddress); |
| callback.readEntryComplete(BKException.Code.NoSuchEntryException, ledgerId, entryId, null, args[4]); |
| } |
| }); |
| return null; |
| }); |
| |
| stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(), |
| any(BookkeeperInternalCallbacks.ReadEntryCallback.class), |
| any(), anyInt()); |
| |
| stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(), |
| any(BookkeeperInternalCallbacks.ReadEntryCallback.class), |
| any(), anyInt(), any()); |
| |
| stub.when(bookieClient).readEntry(any(), anyLong(), anyLong(), |
| any(BookkeeperInternalCallbacks.ReadEntryCallback.class), |
| any(), anyInt(), any(), anyBoolean()); |
| } |
| |
| private byte[] extractEntryPayload(long ledgerId, long entryId, ByteBufList toSend) |
| throws BKException.BKDigestMatchException { |
| ByteBuf toSendCopy = Unpooled.copiedBuffer(toSend.toArray()); |
| toSendCopy.resetReaderIndex(); |
| DigestManager macManager = null; |
| try { |
| macManager = getDigestType(ledgerId); |
| } catch (GeneralSecurityException gse){ |
| LOG.error("Initialize macManager fail", gse); |
| } |
| ByteBuf content = macManager.verifyDigestAndReturnData(entryId, toSendCopy); |
| byte[] entry = new byte[content.readableBytes()]; |
| content.readBytes(entry); |
| content.resetReaderIndex(); |
| content.release(); |
| return entry; |
| } |
| |
| @SuppressWarnings("unchecked") |
| protected void setupBookieClientAddEntry() { |
| final Stubber stub = doAnswer(invokation -> { |
| Object[] args = invokation.getArguments(); |
| BookkeeperInternalCallbacks.WriteCallback callback = (BookkeeperInternalCallbacks.WriteCallback) args[5]; |
| BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0]; |
| long ledgerId = (Long) args[1]; |
| long entryId = (Long) args[3]; |
| ByteBufList toSend = (ByteBufList) args[4]; |
| Object ctx = args[6]; |
| int options = (int) args[7]; |
| boolean isRecoveryAdd = |
| ((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD; |
| |
| toSend.retain(); |
| executor.executeOrdered(ledgerId, () -> { |
| byte[] entry; |
| try { |
| entry = extractEntryPayload(ledgerId, entryId, toSend); |
| } catch (BKDigestMatchException e) { |
| callback.writeComplete(Code.DigestMatchException, |
| ledgerId, entryId, bookieSocketAddress, ctx); |
| toSend.release(); |
| return; |
| } |
| boolean fenced = fencedLedgers.contains(ledgerId); |
| if (fenced && !isRecoveryAdd) { |
| callback.writeComplete(BKException.Code.LedgerFencedException, |
| ledgerId, entryId, bookieSocketAddress, ctx); |
| } else { |
| if (failedBookies.contains(bookieSocketAddress)) { |
| callback.writeComplete(NoBookieAvailableException, |
| ledgerId, entryId, bookieSocketAddress, ctx); |
| toSend.release(); |
| return; |
| } |
| if (getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).isEmpty()) { |
| registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, |
| bookieSocketAddress, new byte[0], BookieProtocol.INVALID_ENTRY_ID); |
| } |
| registerMockEntryForRead(ledgerId, entryId, bookieSocketAddress, entry, ledgerId); |
| callback.writeComplete(BKException.Code.OK, |
| ledgerId, entryId, bookieSocketAddress, ctx); |
| } |
| toSend.release(); |
| }); |
| |
| return null; |
| }); |
| |
| stub.when(bookieClient).addEntry(any(BookieSocketAddress.class), |
| anyLong(), any(byte[].class), |
| anyLong(), any(ByteBufList.class), |
| any(BookkeeperInternalCallbacks.WriteCallback.class), |
| any(), anyInt(), anyBoolean(), any(EnumSet.class)); |
| } |
| |
| @SuppressWarnings("unchecked") |
| protected void setupBookieClientForceLedger() { |
| final Stubber stub = doAnswer(invokation -> { |
| Object[] args = invokation.getArguments(); |
| BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0]; |
| long ledgerId = (Long) args[1]; |
| BookkeeperInternalCallbacks.ForceLedgerCallback callback = |
| (BookkeeperInternalCallbacks.ForceLedgerCallback) args[2]; |
| Object ctx = args[3]; |
| |
| Runnable activity = () -> { |
| executor.executeOrdered(ledgerId, () -> { |
| if (failedBookies.contains(bookieSocketAddress)) { |
| callback.forceLedgerComplete(NoBookieAvailableException, ledgerId, bookieSocketAddress, ctx); |
| return; |
| } |
| callback.forceLedgerComplete(BKException.Code.OK, ledgerId, bookieSocketAddress, ctx); |
| }); |
| }; |
| if (suspendedBookiesForForceLedgerAcks.contains(bookieSocketAddress)) { |
| List<Runnable> queue = deferredBookieForceLedgerResponses.computeIfAbsent(bookieSocketAddress, |
| (k) -> new CopyOnWriteArrayList<>()); |
| queue.add(activity); |
| } else { |
| activity.run(); |
| } |
| return null; |
| }); |
| |
| stub.when(bookieClient).forceLedger(any(BookieSocketAddress.class), |
| anyLong(), |
| any(BookkeeperInternalCallbacks.ForceLedgerCallback.class), |
| any()); |
| } |
| |
| } |