| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.bookkeeper.client; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| import java.security.GeneralSecurityException; |
| import java.util.ArrayDeque; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Enumeration; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.atomic.AtomicLong; |
| import lombok.CustomLog; |
| import lombok.Getter; |
| import org.apache.bookkeeper.client.AsyncCallback.AddCallback; |
| import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; |
| import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; |
| import org.apache.bookkeeper.client.BookKeeper.DigestType; |
| import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; |
| import org.apache.bookkeeper.client.api.LedgerEntries; |
| import org.apache.bookkeeper.client.api.LedgerMetadata; |
| import org.apache.bookkeeper.client.api.ReadHandle; |
| import org.apache.bookkeeper.client.api.WriteFlag; |
| import org.apache.bookkeeper.client.impl.LedgerEntryImpl; |
| import org.apache.bookkeeper.common.concurrent.FutureUtils; |
| import org.apache.bookkeeper.net.BookieId; |
| import org.apache.bookkeeper.versioning.LongVersion; |
| import org.apache.bookkeeper.versioning.Versioned; |
| |
| /** |
| * Mock BK {@link LedgerHandle}. Used by {@link PulsarMockBookKeeper}. |
| */ |
| @CustomLog |
| public class PulsarMockLedgerHandle extends LedgerHandle { |
| |
| final List<LedgerEntryImpl> entries = Collections.synchronizedList(new ArrayList<>()); |
| final PulsarMockBookKeeper bk; |
| final long id; |
| final DigestType digest; |
| final byte[] passwd; |
| final ReadHandle readHandle; |
| long lastEntry = -1; |
| @VisibleForTesting |
| @Getter |
| boolean fenced = false; |
| // Count for total length of the entries |
| final AtomicLong totalLengthCounter = new AtomicLong(0); |
| |
| public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, |
| DigestType digest, byte[] passwd) throws GeneralSecurityException { |
| this(bk, id, digest, passwd, Collections.emptyMap()); |
| } |
| |
| public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, |
| DigestType digest, byte[] passwd, |
| Map<String, byte[]> customMetadata) throws GeneralSecurityException { |
| super(bk.getClientCtx(), id, |
| new Versioned<>(createMetadata(id, digest, passwd, customMetadata), new LongVersion(0L)), |
| digest, passwd, WriteFlag.NONE); |
| this.bk = bk; |
| this.id = id; |
| this.digest = digest; |
| this.passwd = Arrays.copyOf(passwd, passwd.length); |
| |
| readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries, |
| bk::getReadHandleInterceptor, totalLengthCounter); |
| } |
| |
| @Override |
| public void asyncClose(CloseCallback cb, Object ctx) { |
| bk.getProgrammedFailure().thenComposeAsync((res) -> { |
| fenced = true; |
| |
| Versioned<LedgerMetadata> current = getVersionedLedgerMetadata(); |
| Versioned<LedgerMetadata> newMetadata = new Versioned<>( |
| LedgerMetadataBuilder.from(current.getValue()) |
| .withClosedState().withLastEntryId(getLastAddConfirmed()) |
| .withLength(getLength()).build(), |
| new LongVersion(((LongVersion) current.getVersion()).getLongVersion() + 1)); |
| setLedgerMetadata(current, newMetadata); |
| return FutureUtils.value(null); |
| }, bk.executor).whenCompleteAsync((res, exception) -> { |
| if (exception != null) { |
| cb.closeComplete(PulsarMockBookKeeper.getExceptionCode(exception), null, ctx); |
| } else { |
| cb.closeComplete(BKException.Code.OK, this, ctx); |
| } |
| }, bk.executor); |
| } |
| |
| @Override |
| public void asyncReadEntries(final long firstEntry, final long lastEntry, final ReadCallback cb, final Object ctx) { |
| bk.getProgrammedFailure().thenComposeAsync((res) -> { |
| log.debug().attr("first", firstEntry).attr("last", lastEntry) |
| .attr("total", entries.size()).log("readEntries"); |
| final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>(); |
| long entryId = firstEntry; |
| while (entryId <= lastEntry && entryId < entries.size()) { |
| seq.add(new LedgerEntry(entries.get((int) entryId++).duplicate())); |
| } |
| |
| log.debug().attr("entries", seq).log("Entries read"); |
| |
| long readEntriesDelay = bk.getReadEntriesDelayMillis(); |
| if (readEntriesDelay > 0) { |
| try { |
| Thread.sleep(readEntriesDelay); |
| } catch (InterruptedException e) { |
| // ignore |
| } |
| } |
| |
| Enumeration<LedgerEntry> entries = new Enumeration<LedgerEntry>() { |
| @Override |
| public boolean hasMoreElements() { |
| return !seq.isEmpty(); |
| } |
| |
| @Override |
| public LedgerEntry nextElement() { |
| return seq.remove(); |
| } |
| }; |
| return FutureUtils.value(entries); |
| }).whenCompleteAsync((res, exception) -> { |
| if (exception != null) { |
| cb.readComplete(PulsarMockBookKeeper.getExceptionCode(exception), |
| PulsarMockLedgerHandle.this, null, ctx); |
| } else { |
| cb.readComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, res, ctx); |
| } |
| }, bk.executor); |
| } |
| |
| @Override |
| public long addEntry(byte[] data) throws InterruptedException, BKException { |
| try { |
| bk.checkProgrammedFail(); |
| } catch (BKException e) { |
| fenced = true; |
| throw e; |
| } |
| |
| if (fenced) { |
| throw BKException.create(BKException.Code.LedgerFencedException); |
| } |
| |
| lastEntry = entries.size(); |
| totalLengthCounter.addAndGet(data.length); |
| entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length, Unpooled.wrappedBuffer(data))); |
| return lastEntry; |
| } |
| |
| @Override |
| public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) { |
| asyncAddEntry(data, 0, data.length, cb, ctx); |
| } |
| |
| @Override |
| public void asyncAddEntry(final byte[] data, final int offset, final int length, final AddCallback cb, |
| final Object ctx) { |
| asyncAddEntry(Unpooled.wrappedBuffer(data, offset, length), cb, ctx); |
| } |
| |
| @Override |
| public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object ctx) { |
| bk.getAddEntryFailure().thenComposeAsync((res) -> { |
| long delayMillis = bk.getNextAddEntryDelayMillis(); |
| if (delayMillis > 0) { |
| try { |
| Thread.sleep(delayMillis); |
| } catch (InterruptedException e) { |
| } |
| } |
| |
| if (fenced) { |
| return FutureUtils.exception(new BKException.BKLedgerFencedException()); |
| } else { |
| lastEntry = entries.size(); |
| byte[] storedData = new byte[data.readableBytes()]; |
| data.readBytes(storedData); |
| totalLengthCounter.addAndGet(storedData.length); |
| entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, |
| storedData.length, Unpooled.wrappedBuffer(storedData))); |
| return FutureUtils.value(lastEntry); |
| } |
| |
| }, bk.executor).whenCompleteAsync((entryId, exception) -> { |
| data.release(); |
| if (exception != null) { |
| fenced = true; |
| cb.addComplete(PulsarMockBookKeeper.getExceptionCode(exception), |
| PulsarMockLedgerHandle.this, LedgerHandle.INVALID_ENTRY_ID, ctx); |
| } else { |
| long responseDelayMillis = bk.getNextAddEntryResponseDelayMillis(); |
| if (responseDelayMillis > 0) { |
| try { |
| Thread.sleep(responseDelayMillis); |
| } catch (InterruptedException e) { |
| } |
| } |
| cb.addComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, entryId, ctx); |
| } |
| }, bk.executor); |
| } |
| |
| @Override |
| public long getId() { |
| return ledgerId; |
| } |
| |
| @Override |
| public long getLastAddConfirmed() { |
| if (bk.checkReturnEmptyLedger()) { |
| return -1; |
| } else { |
| return lastEntry; |
| } |
| } |
| |
| @Override |
| public long getLength() { |
| return totalLengthCounter.get(); |
| } |
| |
| |
| // ReadHandle interface |
| @Override |
| public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) { |
| return readHandle.readAsync(firstEntry, lastEntry); |
| } |
| |
| @Override |
| public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) { |
| return readHandle.readUnconfirmedAsync(firstEntry, lastEntry); |
| } |
| |
| @Override |
| public CompletableFuture<Long> readLastAddConfirmedAsync() { |
| return readHandle.readLastAddConfirmedAsync(); |
| } |
| |
| @Override |
| public CompletableFuture<Long> tryReadLastAddConfirmedAsync() { |
| return readHandle.tryReadLastAddConfirmedAsync(); |
| } |
| |
| @Override |
| public boolean isClosed() { |
| return readHandle.isClosed(); |
| } |
| |
| @Override |
| public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId, |
| long timeOutInMillis, |
| boolean parallel) { |
| return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel); |
| } |
| |
| private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd, |
| Map<String, byte[]> customMetadata) { |
| List<BookieId> ensemble = new ArrayList<>(PulsarMockBookKeeper.getMockEnsemble()); |
| LedgerMetadataBuilder builder = LedgerMetadataBuilder.create() |
| .withDigestType(digest.toApiDigestType()) |
| .withPassword(passwd) |
| .withId(id) |
| .newEnsembleEntry(0L, ensemble); |
| if (customMetadata != null && !customMetadata.isEmpty()) { |
| builder.withCustomMetadata(customMetadata); |
| } |
| return builder.build(); |
| } |
| } |