blob: 5b49ca7706b1d06e239d5aae32ad9a5198dd06b0 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.PrimitiveIterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.stats.StatsLogger;
* A mock for running tests that require ledger storage.
public class MockLedgerStorage implements CompactableLedgerStorage {
private static class LedgerInfo {
boolean limbo = false;
boolean fenced = false;
long lac = -1;
final byte[] masterKey;
LedgerInfo(byte[] masterKey) {
this.masterKey = Arrays.copyOf(masterKey, masterKey.length);
ConcurrentHashMap<Long, ByteBuf> entries = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, LedgerInfo> ledgers = new ConcurrentHashMap<>();
private final EnumSet<StorageState> storageStateFlags = EnumSet.noneOf(StorageState.class);
private final List<EntryLocation> entryLocations = new ArrayList<>();
public void initialize(ServerConfiguration conf,
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
StatsLogger statsLogger,
ByteBufAllocator allocator)
throws IOException {}
public void setStateManager(StateManager stateManager) {}
public void setCheckpointSource(CheckpointSource checkpointSource) {}
public void setCheckpointer(Checkpointer checkpointer) {}
public void start() {}
public void shutdown() throws InterruptedException {}
public boolean ledgerExists(long ledgerId) throws IOException {
return ledgers.containsKey(ledgerId);
public boolean entryExists(long ledgerId, long entryId) throws IOException {
LedgerInfo info = ledgers.get(ledgerId);
if (info == null) {
throw new Bookie.NoLedgerException(ledgerId);
return info != null && info.entries.containsKey(entryId);
public boolean setFenced(long ledgerId) throws IOException {
AtomicBoolean ret = new AtomicBoolean(false);
LedgerInfo previous = ledgers.computeIfPresent(ledgerId, (ledgerId1, current) -> {
if (!current.fenced) {
current.fenced = true;
} else {
return current;
if (previous == null) {
throw new Bookie.NoLedgerException(ledgerId);
return ret.get();
public boolean isFenced(long ledgerId) throws IOException {
LedgerInfo info = ledgers.get(ledgerId);
if (info == null) {
throw new Bookie.NoLedgerException(ledgerId);
return info != null && info.fenced;
public void setLimboState(long ledgerId) throws IOException {
LedgerInfo previous = ledgers.computeIfPresent(ledgerId, (ledgerId1, current) -> {
current.limbo = true;
return current;
if (previous == null) {
throw new Bookie.NoLedgerException(ledgerId);
public boolean hasLimboState(long ledgerId) throws IOException {
LedgerInfo info = ledgers.get(ledgerId);
if (info == null) {
throw new Bookie.NoLedgerException(ledgerId);
return info.limbo;
public void clearLimboState(long ledgerId) throws IOException {
LedgerInfo previous = ledgers.computeIfPresent(ledgerId, (ledgerId1, current) -> {
current.limbo = false;
return current;
if (previous == null) {
throw new Bookie.NoLedgerException(ledgerId);
public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
LedgerInfo previous = ledgers.compute(ledgerId, (ledgerId1, current) -> {
if (current != null) {
return current;
return new LedgerInfo(masterKey);
if (previous != null && !Arrays.equals(masterKey, previous.masterKey)) {
throw new IOException(BookieException.create(BookieException.Code.IllegalOpException));
public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
LedgerInfo info = ledgers.get(ledgerId);
if (info == null) {
throw new Bookie.NoLedgerException(ledgerId);
return Arrays.copyOf(info.masterKey, info.masterKey.length);
public long extractLedgerId(ByteBuf entry) {
return entry.getLong(entry.readerIndex());
public long extractEntryId(ByteBuf entry) {
return entry.getLong(entry.readerIndex() + 8);
public long extractLac(ByteBuf entry) {
return entry.getLong(entry.readerIndex() + 16);
public long addEntry(ByteBuf entry) throws IOException, BookieException {
ByteBuf copy = entry.retain().duplicate();
long ledgerId = extractLedgerId(copy);
long entryId = extractEntryId(copy);
long lac = extractLac(copy);
LedgerInfo previous = ledgers.computeIfPresent(ledgerId, (ledgerId1, current) -> {
if (lac > current.lac) {
current.lac = lac;
current.entries.put(entryId, copy);
return current;
if (previous == null) {
throw new Bookie.NoLedgerException(ledgerId);
return entryId;
public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
public long getLastAddConfirmed(long ledgerId) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
public boolean waitForLastAddConfirmedUpdate(
long ledgerId,
long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
public void cancelWaitForLastAddConfirmedUpdate(
long ledgerId,
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
public void flush() throws IOException {
// this is a noop, as we dont hit disk anyhow
public void checkpoint(Checkpoint checkpoint) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
public void deleteLedger(long ledgerId) throws IOException {
public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
public void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
public ByteBuf getExplicitLac(long ledgerId) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
public LedgerStorage getUnderlyingLedgerStorage() {
return CompactableLedgerStorage.super.getUnderlyingLedgerStorage();
public void forceGC() {
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
CompactableLedgerStorage.super.forceGC(forceMajor, forceMinor);
public List<DetectedInconsistency> localConsistencyCheck(Optional<RateLimiter> rateLimiter) throws IOException {
return CompactableLedgerStorage.super.localConsistencyCheck(rateLimiter);
public boolean isInForceGC() {
return CompactableLedgerStorage.super.isInForceGC();
public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
return CompactableLedgerStorage.super.getGarbageCollectionStatus();
public PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId)
throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
public List<EntryLocation> getUpdatedLocations() {
return entryLocations;
public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
synchronized (entryLocations) {
for (EntryLocation l : locations) {
public EnumSet<StorageState> getStorageStateFlags() throws IOException {
return storageStateFlags;
public void setStorageStateFlag(StorageState flag) throws IOException {
public void clearStorageStateFlag(StorageState flag) throws IOException {
public void flushEntriesLocationsIndex() throws IOException { }