blob: 81212d1b70175bedc62b288af9ff159be60142ad [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.client;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.api.BKException.Code;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.OpenBuilderBase;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Mocked version of BookKeeper client that keeps all ledgers data in memory.
*
* <p>This mocked client is meant to be used in unit tests for applications using the BookKeeper API.
*/
public class MockBookKeeper extends BookKeeper {
final ExecutorService executor = Executors.newFixedThreadPool(1, new DefaultThreadFactory("mock-bookkeeper"));
final ZooKeeper zkc;
@Override
public ZooKeeper getZkHandle() {
return zkc;
}
@Override
public ClientConfiguration getConf() {
return super.getConf();
}
Map<Long, MockLedgerHandle> ledgers = new ConcurrentHashMap<Long, MockLedgerHandle>();
AtomicLong sequence = new AtomicLong(3);
AtomicBoolean stopped = new AtomicBoolean(false);
AtomicInteger stepsToFail = new AtomicInteger(-1);
int failReturnCode = BKException.Code.OK;
int nextFailReturnCode = BKException.Code.OK;
public MockBookKeeper(ZooKeeper zkc) throws Exception {
this.zkc = zkc;
}
@Override
public LedgerHandle createLedger(DigestType digestType, byte passwd[]) throws BKException {
return createLedger(3, 2, digestType, passwd);
}
@Override
public LedgerHandle createLedger(int ensSize, int qSize, DigestType digestType, byte passwd[]) throws BKException {
return createLedger(ensSize, qSize, qSize, digestType, passwd);
}
@Override
public void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, final DigestType digestType,
final byte[] passwd, final CreateCallback cb, final Object ctx, Map<String, byte[]> properties) {
if (stopped.get()) {
cb.createComplete(BKException.Code.WriteException, null, ctx);
return;
}
executor.execute(new Runnable() {
public void run() {
if (getProgrammedFailStatus()) {
if (failReturnCode != BkTimeoutOperation) {
cb.createComplete(failReturnCode, null, ctx);
}
return;
}
if (stopped.get()) {
cb.createComplete(BKException.Code.WriteException, null, ctx);
return;
}
try {
long id = sequence.getAndIncrement();
log.info("Creating ledger {}", id);
MockLedgerHandle lh = new MockLedgerHandle(MockBookKeeper.this, id, digestType, passwd);
ledgers.put(id, lh);
cb.createComplete(0, lh, ctx);
} catch (Throwable t) {
t.printStackTrace();
}
}
});
}
@Override
public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType,
byte[] passwd) throws BKException {
checkProgrammedFail();
if (stopped.get()) {
throw BKException.create(BKException.Code.WriteException);
}
try {
long id = sequence.getAndIncrement();
log.info("Creating ledger {}", id);
MockLedgerHandle lh = new MockLedgerHandle(this, id, digestType, passwd);
ledgers.put(id, lh);
return lh;
} catch (Throwable t) {
log.error("Exception:", t);
return null;
}
}
@Override
public void asyncCreateLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd, CreateCallback cb,
Object ctx) {
asyncCreateLedger(ensSize, qSize, qSize, digestType, passwd, cb, ctx, Collections.emptyMap());
}
@Override
public void asyncOpenLedger(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx) {
if (getProgrammedFailStatus()) {
if (failReturnCode != BkTimeoutOperation) {
cb.openComplete(failReturnCode, null, ctx);
}
return;
}
if (stopped.get()) {
cb.openComplete(BKException.Code.WriteException, null, ctx);
return;
}
MockLedgerHandle lh = ledgers.get(lId);
if (lh == null) {
cb.openComplete(BKException.Code.NoSuchLedgerExistsOnMetadataServerException, null, ctx);
} else if (lh.digest != digestType) {
cb.openComplete(BKException.Code.DigestMatchException, null, ctx);
} else if (!Arrays.equals(lh.passwd, passwd)) {
cb.openComplete(BKException.Code.UnauthorizedAccessException, null, ctx);
} else {
cb.openComplete(0, lh, ctx);
}
}
@Override
public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte[] passwd, OpenCallback cb, Object ctx) {
asyncOpenLedger(lId, digestType, passwd, cb, ctx);
}
@Override
public void asyncDeleteLedger(long lId, DeleteCallback cb, Object ctx) {
if (getProgrammedFailStatus()) {
if (failReturnCode != BkTimeoutOperation) {
cb.deleteComplete(failReturnCode, ctx);
}
} else if (stopped.get()) {
cb.deleteComplete(BKException.Code.WriteException, ctx);
} else if (ledgers.containsKey(lId)) {
ledgers.remove(lId);
cb.deleteComplete(0, ctx);
} else {
cb.deleteComplete(BKException.Code.NoSuchLedgerExistsOnMetadataServerException, ctx);
}
}
@Override
public void deleteLedger(long lId) throws InterruptedException, BKException {
checkProgrammedFail();
if (stopped.get()) {
throw BKException.create(BKException.Code.WriteException);
}
if (!ledgers.containsKey(lId)) {
throw BKException.create(BKException.Code.NoSuchLedgerExistsOnMetadataServerException);
}
ledgers.remove(lId);
}
@Override
public void close() throws InterruptedException, BKException {
checkProgrammedFail();
shutdown();
}
@Override
public OpenBuilder newOpenLedgerOp() {
return new OpenBuilderBase() {
@Override
public CompletableFuture<ReadHandle> execute() {
CompletableFuture<ReadHandle> promise = new CompletableFuture<ReadHandle>();
final int validateRc = validate();
if (Code.OK != validateRc) {
promise.completeExceptionally(BKException.create(validateRc));
return promise;
} else if (getProgrammedFailStatus()) {
if (failReturnCode != BkTimeoutOperation) {
promise.completeExceptionally(BKException.create(failReturnCode));
}
return promise;
} else if (stopped.get()) {
promise.completeExceptionally(new BKException.BKClientClosedException());
return promise;
}
MockLedgerHandle lh = ledgers.get(ledgerId);
if (lh == null) {
promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsOnMetadataServerException());
} else if (lh.digest != DigestType.fromApiDigestType(digestType)) {
promise.completeExceptionally(new BKException.BKDigestMatchException());
} else if (!Arrays.equals(lh.passwd, password)) {
promise.completeExceptionally(new BKException.BKUnauthorizedAccessException());
} else {
promise.complete(new MockReadHandle(MockBookKeeper.this, ledgerId,
lh.getLedgerMetadata(), lh.entries));
}
return promise;
}
};
}
public void shutdown() {
try {
super.close();
} catch (Exception e) {
}
stopped.set(true);
for (MockLedgerHandle ledger : ledgers.values()) {
ledger.entries.clear();
}
ledgers.clear();
executor.shutdownNow();
}
public boolean isStopped() {
return stopped.get();
}
public Set<Long> getLedgers() {
return ledgers.keySet();
}
void checkProgrammedFail() throws BKException {
int steps = stepsToFail.getAndDecrement();
log.debug("Steps to fail: {}", steps);
if (steps <= 0) {
if (failReturnCode != BKException.Code.OK) {
int rc = failReturnCode;
failReturnCode = nextFailReturnCode;
nextFailReturnCode = BKException.Code.OK;
throw BKException.create(rc);
}
}
}
boolean getProgrammedFailStatus() {
int steps = stepsToFail.getAndDecrement();
log.debug("Steps to fail: {}", steps);
return steps == 0;
}
public void failNow(int rc) {
failNow(rc, BKException.Code.OK);
}
public void failNow(int rc, int nextErrorCode) {
failAfter(0, rc);
}
public void failAfter(int steps, int rc) {
failAfter(steps, rc, BKException.Code.OK);
}
public void failAfter(int steps, int rc, int nextErrorCode) {
stepsToFail.set(steps);
failReturnCode = rc;
this.nextFailReturnCode = nextErrorCode;
}
public void timeoutAfter(int steps) {
stepsToFail.set(steps);
failReturnCode = BkTimeoutOperation;
}
private static final int BkTimeoutOperation = 1000;
private static final Logger log = LoggerFactory.getLogger(MockBookKeeper.class);
}