blob: 7378a6f91063d8b9e3e6cfcb78d0e159690e9eb2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.client;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.security.GeneralSecurityException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.api.LastConfirmedAndEntry;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Mock BK {@link LedgerHandle}. Used by {@link PulsarMockBookKeeper}.
*/
public class PulsarMockLedgerHandle extends LedgerHandle {
final ArrayList<LedgerEntryImpl> entries = Lists.newArrayList();
final PulsarMockBookKeeper bk;
final long id;
final DigestType digest;
final byte[] passwd;
final ReadHandle readHandle;
long lastEntry = -1;
boolean fenced = false;
public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
DigestType digest, byte[] passwd) throws GeneralSecurityException {
super(bk.getClientCtx(), id, new Versioned<>(createMetadata(id, digest, passwd), new LongVersion(0L)),
digest, passwd, WriteFlag.NONE);
this.bk = bk;
this.id = id;
this.digest = digest;
this.passwd = Arrays.copyOf(passwd, passwd.length);
readHandle = new PulsarMockReadHandle(bk, id, getLedgerMetadata(), entries);
}
@Override
public void asyncClose(CloseCallback cb, Object ctx) {
bk.getProgrammedFailure().thenComposeAsync((res) -> {
fenced = true;
Versioned<LedgerMetadata> current = getVersionedLedgerMetadata();
Versioned<LedgerMetadata> newMetadata = new Versioned<>(
LedgerMetadataBuilder.from(current.getValue())
.withClosedState().withLastEntryId(getLastAddConfirmed())
.withLength(getLength()).build(),
new LongVersion(((LongVersion) current.getVersion()).getLongVersion() + 1));
setLedgerMetadata(current, newMetadata);
return FutureUtils.value(null);
}, bk.executor).whenCompleteAsync((res, exception) -> {
if (exception != null) {
cb.closeComplete(PulsarMockBookKeeper.getExceptionCode(exception), null, ctx);
} else {
cb.closeComplete(BKException.Code.OK, this, ctx);
}
}, bk.executor);
}
@Override
public void asyncReadEntries(final long firstEntry, final long lastEntry, final ReadCallback cb, final Object ctx) {
bk.getProgrammedFailure().thenComposeAsync((res) -> {
log.debug("readEntries: first={} last={} total={}", firstEntry, lastEntry, entries.size());
final Queue<LedgerEntry> seq = new ArrayDeque<LedgerEntry>();
long entryId = firstEntry;
while (entryId <= lastEntry && entryId < entries.size()) {
seq.add(new LedgerEntry(entries.get((int) entryId++).duplicate()));
}
log.debug("Entries read: {}", seq);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
Enumeration<LedgerEntry> entries = new Enumeration<LedgerEntry>() {
@Override
public boolean hasMoreElements() {
return !seq.isEmpty();
}
@Override
public LedgerEntry nextElement() {
return seq.remove();
}
};
return FutureUtils.value(entries);
}).whenCompleteAsync((res, exception) -> {
if (exception != null) {
cb.readComplete(PulsarMockBookKeeper.getExceptionCode(exception),
PulsarMockLedgerHandle.this, null, ctx);
} else {
cb.readComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, res, ctx);
}
}, bk.executor);
}
@Override
public long addEntry(byte[] data) throws InterruptedException, BKException {
try {
bk.checkProgrammedFail();
} catch (BKException e) {
fenced = true;
throw e;
}
if (fenced) {
throw BKException.create(BKException.Code.LedgerFencedException);
}
lastEntry = entries.size();
entries.add(LedgerEntryImpl.create(ledgerId, lastEntry, data.length, Unpooled.wrappedBuffer(data)));
return lastEntry;
}
@Override
public void asyncAddEntry(final byte[] data, final AddCallback cb, final Object ctx) {
asyncAddEntry(data, 0, data.length, cb, ctx);
}
@Override
public void asyncAddEntry(final byte[] data, final int offset, final int length, final AddCallback cb,
final Object ctx) {
asyncAddEntry(Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
}
@Override
public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object ctx) {
bk.getProgrammedFailure().thenComposeAsync((res) -> {
Long delayMillis = bk.addEntryDelaysMillis.poll();
if (delayMillis == null) {
delayMillis = 1L;
}
try {
Thread.sleep(delayMillis);
} catch (InterruptedException e) {
}
if (fenced) {
return FutureUtils.exception(new BKException.BKLedgerFencedException());
} else {
lastEntry = entries.size();
byte[] storedData = new byte[data.readableBytes()];
data.readBytes(storedData);
entries.add(LedgerEntryImpl.create(ledgerId, lastEntry,
storedData.length, Unpooled.wrappedBuffer(storedData)));
return FutureUtils.value(lastEntry);
}
}, bk.executor).whenCompleteAsync((entryId, exception) -> {
data.release();
if (exception != null) {
fenced = true;
cb.addComplete(PulsarMockBookKeeper.getExceptionCode(exception),
PulsarMockLedgerHandle.this, LedgerHandle.INVALID_ENTRY_ID, ctx);
} else {
cb.addComplete(BKException.Code.OK, PulsarMockLedgerHandle.this, entryId, ctx);
}
}, bk.executor);
}
@Override
public long getId() {
return ledgerId;
}
@Override
public long getLastAddConfirmed() {
if (bk.checkReturnEmptyLedger()) {
return -1;
} else {
return lastEntry;
}
}
@Override
public long getLength() {
long length = 0;
for (LedgerEntryImpl entry : entries) {
length += entry.getLength();
}
return length;
}
// ReadHandle interface
@Override
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
return readHandle.readAsync(firstEntry, lastEntry);
}
@Override
public CompletableFuture<LedgerEntries> readUnconfirmedAsync(long firstEntry, long lastEntry) {
return readHandle.readUnconfirmedAsync(firstEntry, lastEntry);
}
@Override
public CompletableFuture<Long> readLastAddConfirmedAsync() {
return readHandle.readLastAddConfirmedAsync();
}
@Override
public CompletableFuture<Long> tryReadLastAddConfirmedAsync() {
return readHandle.tryReadLastAddConfirmedAsync();
}
@Override
public boolean isClosed() {
return readHandle.isClosed();
}
@Override
public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntryAsync(long entryId,
long timeOutInMillis,
boolean parallel) {
return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel);
}
private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd) {
List<BookieId> ensemble = new ArrayList<>(PulsarMockBookKeeper.getMockEnsemble());
return LedgerMetadataBuilder.create()
.withDigestType(digest.toApiDigestType())
.withPassword(passwd)
.withId(id)
.newEnsembleEntry(0L, ensemble)
.build();
}
private static final Logger log = LoggerFactory.getLogger(PulsarMockLedgerHandle.class);
}