| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.bookkeeper.proto; |
| |
| import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD; |
| import static org.apache.bookkeeper.util.SafeRunnable.safeRun; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import org.apache.bookkeeper.client.BKException; |
| import org.apache.bookkeeper.client.api.WriteFlag; |
| import org.apache.bookkeeper.common.concurrent.FutureUtils; |
| import org.apache.bookkeeper.common.util.OrderedExecutor; |
| import org.apache.bookkeeper.net.BookieId; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.FutureGetListOfEntriesOfLedger; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback; |
| import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger; |
| import org.apache.bookkeeper.util.ByteBufList; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| /** |
| * Mock implementation of BookieClient. |
| */ |
| public class MockBookieClient implements BookieClient { |
| static final Logger LOG = LoggerFactory.getLogger(MockBookieClient.class); |
| |
| final OrderedExecutor executor; |
| final MockBookies mockBookies; |
| final Set<BookieId> errorBookies = |
| Collections.newSetFromMap(new ConcurrentHashMap<>()); |
| |
| /** |
| * Runs before or after an operation. Can stall the operation or error it. |
| */ |
| public interface Hook { |
| CompletableFuture<Void> runHook(BookieId bookie, long ledgerId, long entryId); |
| } |
| |
| private Hook preReadHook = (bookie, ledgerId, entryId) -> FutureUtils.value(null); |
| private Hook postReadHook = (bookie, ledgerId, entryId) -> FutureUtils.value(null); |
| private Hook preWriteHook = (bookie, ledgerId, entryId) -> FutureUtils.value(null); |
| private Hook postWriteHook = (bookie, ledgerId, entryId) -> FutureUtils.value(null); |
| |
| public MockBookieClient(OrderedExecutor executor) { |
| this.executor = executor; |
| this.mockBookies = new MockBookies(); |
| } |
| |
| public MockBookieClient(OrderedExecutor executor, |
| MockBookies mockBookies) { |
| this.executor = executor; |
| this.mockBookies = mockBookies; |
| } |
| |
| public void setPreReadHook(Hook hook) { |
| this.preReadHook = hook; |
| } |
| |
| public void setPostReadHook(Hook hook) { |
| this.postReadHook = hook; |
| } |
| |
| public void setPreWriteHook(Hook hook) { |
| this.preWriteHook = hook; |
| } |
| |
| public void setPostWriteHook(Hook hook) { |
| this.postWriteHook = hook; |
| } |
| |
| public void errorBookies(BookieId... bookies) { |
| errorBookies.addAll(Arrays.asList(bookies)); |
| } |
| |
| public void removeErrors(BookieId... bookies) { |
| for (BookieId b : bookies) { |
| errorBookies.remove(b); |
| } |
| } |
| |
| public boolean isErrored(BookieId bookieId) { |
| return errorBookies.contains(bookieId); |
| } |
| |
| public MockBookies getMockBookies() { |
| return mockBookies; |
| } |
| |
| @Override |
| public List<BookieId> getFaultyBookies() { |
| return Collections.emptyList(); |
| } |
| |
| @Override |
| public boolean isWritable(BookieId address, long ledgerId) { |
| return true; |
| } |
| |
| @Override |
| public long getNumPendingRequests(BookieId address, long ledgerId) { |
| return 0; |
| } |
| |
| @Override |
| public void forceLedger(BookieId addr, long ledgerId, |
| ForceLedgerCallback cb, Object ctx) { |
| executor.executeOrdered(ledgerId, |
| safeRun(() -> { |
| cb.forceLedgerComplete(BKException.Code.IllegalOpException, |
| ledgerId, addr, ctx); |
| })); |
| } |
| |
| @Override |
| public void writeLac(BookieId addr, long ledgerId, byte[] masterKey, |
| long lac, ByteBufList toSend, WriteLacCallback cb, Object ctx) { |
| executor.executeOrdered(ledgerId, |
| safeRun(() -> { |
| cb.writeLacComplete(BKException.Code.IllegalOpException, |
| ledgerId, addr, ctx); |
| })); |
| } |
| |
| @Override |
| public void addEntry(BookieId addr, long ledgerId, byte[] masterKey, |
| long entryId, ByteBufList toSend, WriteCallback cb, Object ctx, |
| int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags) { |
| toSend.retain(); |
| preWriteHook.runHook(addr, ledgerId, entryId) |
| .thenComposeAsync( |
| (ignore) -> { |
| LOG.info("[{};L{}] write entry {}", addr, ledgerId, entryId); |
| if (isErrored(addr)) { |
| LOG.warn("[{};L{}] erroring write {}", addr, ledgerId, entryId); |
| return FutureUtils.exception(new BKException.BKWriteException()); |
| } |
| |
| try { |
| if ((options & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD) { |
| mockBookies.recoveryAddEntry(addr, ledgerId, entryId, copyData(toSend)); |
| } else { |
| mockBookies.addEntry(addr, ledgerId, entryId, copyData(toSend)); |
| } |
| } catch (BKException bke) { |
| return FutureUtils.exception(bke); |
| } finally { |
| toSend.release(); |
| } |
| |
| return FutureUtils.value(null); |
| }, executor.chooseThread(ledgerId)) |
| .thenCompose((res) -> postWriteHook.runHook(addr, ledgerId, entryId)) |
| .whenCompleteAsync((res, ex) -> { |
| if (ex != null) { |
| cb.writeComplete(BKException.getExceptionCode(ex, BKException.Code.WriteException), |
| ledgerId, entryId, addr, ctx); |
| } else { |
| cb.writeComplete(BKException.Code.OK, ledgerId, entryId, addr, ctx); |
| } |
| }, executor.chooseThread(ledgerId)); |
| } |
| |
| @Override |
| public void readLac(BookieId addr, long ledgerId, ReadLacCallback cb, Object ctx) { |
| executor.executeOrdered(ledgerId, |
| safeRun(() -> { |
| cb.readLacComplete(BKException.Code.IllegalOpException, |
| ledgerId, null, null, ctx); |
| })); |
| } |
| |
| @Override |
| public void readEntry(BookieId addr, long ledgerId, long entryId, |
| ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey, |
| boolean allowFastFail) { |
| preReadHook.runHook(addr, ledgerId, entryId) |
| .thenComposeAsync((res) -> { |
| LOG.info("[{};L{}] read entry {}", addr, ledgerId, entryId); |
| if (isErrored(addr)) { |
| LOG.warn("[{};L{}] erroring read {}", addr, ledgerId, entryId); |
| return FutureUtils.exception(new BKException.BKReadException()); |
| } |
| |
| try { |
| ByteBuf entry = mockBookies.readEntry(addr, flags, ledgerId, entryId); |
| return FutureUtils.value(entry); |
| } catch (BKException bke) { |
| return FutureUtils.exception(bke); |
| } |
| }, executor.chooseThread(ledgerId)) |
| .thenCompose((buf) -> postReadHook.runHook(addr, ledgerId, entryId).thenApply((res) -> buf)) |
| .whenCompleteAsync((res, ex) -> { |
| if (ex != null) { |
| cb.readEntryComplete(BKException.getExceptionCode(ex, BKException.Code.ReadException), |
| ledgerId, entryId, null, ctx); |
| } else { |
| cb.readEntryComplete(BKException.Code.OK, |
| ledgerId, entryId, res.slice(), ctx); |
| } |
| }, executor.chooseThread(ledgerId)); |
| } |
| |
| @Override |
| public void readEntryWaitForLACUpdate(BookieId addr, |
| long ledgerId, |
| long entryId, |
| long previousLAC, |
| long timeOutInMillis, |
| boolean piggyBackEntry, |
| ReadEntryCallback cb, |
| Object ctx) { |
| executor.executeOrdered(ledgerId, |
| safeRun(() -> { |
| cb.readEntryComplete(BKException.Code.IllegalOpException, |
| ledgerId, entryId, null, ctx); |
| })); |
| } |
| |
| @Override |
| public void getBookieInfo(BookieId addr, long requested, |
| GetBookieInfoCallback cb, Object ctx) { |
| executor.executeOrdered(addr, |
| safeRun(() -> { |
| cb.getBookieInfoComplete(BKException.Code.IllegalOpException, |
| null, ctx); |
| })); |
| } |
| |
| @Override |
| public CompletableFuture<AvailabilityOfEntriesOfLedger> getListOfEntriesOfLedger(BookieId address, |
| long ledgerId) { |
| FutureGetListOfEntriesOfLedger futureResult = new FutureGetListOfEntriesOfLedger(ledgerId); |
| executor.executeOrdered(address, safeRun(() -> { |
| futureResult |
| .completeExceptionally(BKException.create(BKException.Code.IllegalOpException).fillInStackTrace()); |
| })); |
| return futureResult; |
| } |
| |
| @Override |
| public boolean isClosed() { |
| return false; |
| } |
| |
| @Override |
| public void close() { |
| } |
| |
| private static ByteBuf copyData(ByteBufList list) { |
| ByteBuf buf = Unpooled.buffer(list.readableBytes()); |
| for (int i = 0; i < list.size(); i++) { |
| buf.writeBytes(list.getBuffer(i).slice()); |
| } |
| return buf; |
| } |
| } |