| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.bookkeeper.client; |
| |
| import com.google.common.collect.Lists; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| |
| import java.security.GeneralSecurityException; |
| import java.util.ArrayDeque; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Enumeration; |
| import java.util.List; |
| import java.util.Queue; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.RejectedExecutionException; |
| |
| import org.apache.bookkeeper.client.AsyncCallback.AddCallback; |
| import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; |
| import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; |
| import org.apache.bookkeeper.client.BookKeeper.DigestType; |
| import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; |
| import org.apache.bookkeeper.client.api.LedgerEntries; |
| import org.apache.bookkeeper.client.api.LedgerMetadata; |
| import org.apache.bookkeeper.client.api.ReadHandle; |
| import org.apache.bookkeeper.client.api.WriteFlag; |
| import org.apache.bookkeeper.client.impl.LedgerEntryImpl; |
| import org.apache.bookkeeper.net.BookieSocketAddress; |
| import org.apache.bookkeeper.versioning.LongVersion; |
| import org.apache.bookkeeper.versioning.Versioned; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Mock BK {@link LedgerHandle}. Used by {@link MockBookKeeper}. |
| */ |
| public class MockLedgerHandle extends LedgerHandle { |
| |
| final ArrayList<LedgerEntryImpl> entries = Lists.newArrayList(); |
| final MockBookKeeper bk; |
| final long id; |
| final DigestType digest; |
| final byte[] passwd; |
| final ReadHandle readHandle; |
| long lastEntry = -1; |
| boolean fenced = false; |
| |
| MockLedgerHandle(MockBookKeeper bk, long id, DigestType digest, byte[] passwd) throws GeneralSecurityException { |
| super(bk.getClientCtx(), id, |
| new Versioned<>(createMetadata(digest, passwd), new LongVersion(0L)), |
| digest, passwd, WriteFlag.NONE); |
| this.bk = bk; |
| this.id = id; |
| this.digest = digest; |
| this.passwd = Arrays.copyOf(passwd, passwd.length); |
| |
| readHandle = new MockReadHandle(bk, id, getLedgerMetadata(), entries); |
| } |
| |
| @Override |
| public void asyncClose(CloseCallback cb, Object ctx) { |
| if (bk.getProgrammedFailStatus()) { |
| cb.closeComplete(bk.failReturnCode, this, ctx); |
| return; |
| } |
| |
| fenced = true; |
| try { |
| bk.executor.execute(() -> cb.closeComplete(0, this, ctx)); |
| } catch (RejectedExecutionException e) { |
| cb.closeComplete(0, this, ctx); |
| } |
| |
| } |
| |
| @Override |
| public void asyncReadEntries(final long firstEntry, final long lastEntry, final ReadCallback cb, final Object ctx) { |
| if (bk.isStopped()) { |
| cb.readComplete(-1, MockLedgerHandle.this, null, ctx); |
| return; |
| } |
| |
| bk.executor.execute(new Runnable() { |
| public void run() { |
| if (bk.getProgrammedFailStatus()) { |
| cb.readComplete(bk.failReturnCode, MockLedgerHandle.this, null, ctx); |
| return; |
| } else if (bk.isStopped()) { |
| log.debug("Bookkeeper is closed!"); |
| cb.readComplete(-1, MockLedgerHandle.this, null, ctx); |
| return; |
| } |
| |
| log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size()); |
| final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>(); |
| long entryId = firstEntry; |
| while (entryId <= lastEntry && entryId < entries.size()) { |
| seq.add(new LedgerEntry(entries.get((int) entryId++).duplicate())); |
| } |
| |
| log.debug("Entries read: {}", seq); |
| |
| try { |
| Thread.sleep(1); |
| } catch (InterruptedException e) { |
| } |
| |
| cb.readComplete(0, MockLedgerHandle.this, new Enumeration<LedgerEntry>() { |
| public boolean hasMoreElements() { |
| return !seq.isEmpty(); |
| } |
| |
| public LedgerEntry nextElement() { |
| return seq.remove(); |
| } |
| |
| }, ctx); |
| } |
| }); |
| } |
| |
| @Override |
| public long addEntry(byte[] data) throws InterruptedException, BKException { |
| try { |
| bk.checkProgrammedFail(); |
| } catch (BKException e) { |
| fenced = true; |
| throw e; |
| } |
| |
| if (fenced) { |
| throw BKException.create(BKException.Code.LedgerFencedException); |
| } |
| |
| if (bk.isStopped()) { |
| throw BKException.create(BKException.Code.NoBookieAvailableException); |
| } |
| |
| lastEntry = entries.size(); |
| entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length, Unpooled.wrappedBuffer(data))); |
| return lastEntry; |
| } |
| |
| @Override |
| public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) { |
| asyncAddEntry(data, 0, data.length, cb, ctx); |
| } |
| |
| @Override |
| public void asyncAddEntry(final byte[] data, final int offset, final int length, final AddCallback cb, |
| final Object ctx) { |
| asyncAddEntry(Unpooled.wrappedBuffer(data, offset, length), cb, ctx); |
| } |
| |
| @Override |
| public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object ctx) { |
| if (bk.isStopped()) { |
| cb.addComplete(-1, MockLedgerHandle.this, INVALID_ENTRY_ID, ctx); |
| return; |
| } |
| |
| data.retain(); |
| bk.executor.execute(new Runnable() { |
| public void run() { |
| if (bk.getProgrammedFailStatus()) { |
| fenced = true; |
| data.release(); |
| cb.addComplete(bk.failReturnCode, MockLedgerHandle.this, INVALID_ENTRY_ID, ctx); |
| return; |
| } |
| if (bk.isStopped()) { |
| data.release(); |
| cb.addComplete(-1, MockLedgerHandle.this, INVALID_ENTRY_ID, ctx); |
| return; |
| } |
| |
| try { |
| Thread.sleep(1); |
| } catch (InterruptedException e) { |
| } |
| |
| if (fenced) { |
| data.release(); |
| cb.addComplete(BKException.Code.LedgerFencedException, MockLedgerHandle.this, |
| LedgerHandle.INVALID_ENTRY_ID, ctx); |
| } else { |
| lastEntry = entries.size(); |
| byte[] storedData = new byte[data.readableBytes()]; |
| data.readBytes(storedData); |
| entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, |
| storedData.length, Unpooled.wrappedBuffer(storedData))); |
| data.release(); |
| cb.addComplete(0, MockLedgerHandle.this, lastEntry, ctx); |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public long getId() { |
| return ledgerId; |
| } |
| |
| @Override |
| public long getLastAddConfirmed() { |
| return lastEntry; |
| } |
| |
| @Override |
| public long getLength() { |
| long length = 0; |
| for (LedgerEntryImpl entry : entries) { |
| length += entry.getLength(); |
| } |
| |
| return length; |
| } |
| |
| |
| // ReadHandle interface |
| @Override |
| public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) { |
| return readHandle.readAsync(firstEntry, lastEntry); |
| } |
| |
| @Override |
| public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) { |
| return readHandle.readUnconfirmedAsync(firstEntry, lastEntry); |
| } |
| |
| @Override |
| public CompletableFuture<Long> readLastAddConfirmedAsync() { |
| return readHandle.readLastAddConfirmedAsync(); |
| } |
| |
| @Override |
| public CompletableFuture<Long> tryReadLastAddConfirmedAsync() { |
| return readHandle.tryReadLastAddConfirmedAsync(); |
| } |
| |
| @Override |
| public boolean isClosed() { |
| return readHandle.isClosed(); |
| } |
| |
| @Override |
| public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, |
| long timeOutInMillis, |
| boolean parallel) { |
| return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); |
| } |
| |
| private static LedgerMetadata createMetadata(DigestType digest, byte[] passwd) { |
| List<BookieSocketAddress> ensemble = Lists.newArrayList( |
| new BookieSocketAddress("192.0.2.1", 1234), |
| new BookieSocketAddress("192.0.2.2", 1234), |
| new BookieSocketAddress("192.0.2.3", 1234)); |
| return LedgerMetadataBuilder.create() |
| .withDigestType(digest.toApiDigestType()) |
| .withPassword(passwd) |
| .newEnsembleEntry(0L, ensemble) |
| .build(); |
| } |
| |
| private static final Logger log = LoggerFactory.getLogger(MockLedgerHandle.class); |
| |
| } |