blob: 5b49ca7706b1d06e239d5aae32ad9a5198dd06b0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.bookie;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.PrimitiveIterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.stats.StatsLogger;
/**
* A mock for running tests that require ledger storage.
*/
public class MockLedgerStorage implements CompactableLedgerStorage {
private static class LedgerInfo {
boolean limbo = false;
boolean fenced = false;
long lac = -1;
final byte[] masterKey;
LedgerInfo(byte[] masterKey) {
this.masterKey = Arrays.copyOf(masterKey, masterKey.length);
}
ConcurrentHashMap<Long, ByteBuf> entries = new ConcurrentHashMap<>();
}
private final ConcurrentHashMap<Long, LedgerInfo> ledgers = new ConcurrentHashMap<>();
private final EnumSet<StorageState> storageStateFlags = EnumSet.noneOf(StorageState.class);
private final List<EntryLocation> entryLocations = new ArrayList<>();
@Override
public void initialize(ServerConfiguration conf,
LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager,
LedgerDirsManager indexDirsManager,
StatsLogger statsLogger,
ByteBufAllocator allocator)
throws IOException {}
@Override
public void setStateManager(StateManager stateManager) {}
@Override
public void setCheckpointSource(CheckpointSource checkpointSource) {}
@Override
public void setCheckpointer(Checkpointer checkpointer) {}
@Override
public void start() {}
@Override
public void shutdown() throws InterruptedException {}
@Override
public boolean ledgerExists(long ledgerId) throws IOException {
return ledgers.containsKey(ledgerId);
}
@Override
public boolean entryExists(long ledgerId, long entryId) throws IOException {
LedgerInfo info = ledgers.get(ledgerId);
if (info == null) {
throw new Bookie.NoLedgerException(ledgerId);
}
return info != null && info.entries.containsKey(entryId);
}
@Override
public boolean setFenced(long ledgerId) throws IOException {
AtomicBoolean ret = new AtomicBoolean(false);
LedgerInfo previous = ledgers.computeIfPresent(ledgerId, (ledgerId1, current) -> {
if (!current.fenced) {
current.fenced = true;
ret.set(true);
} else {
ret.set(false);
}
return current;
});
if (previous == null) {
throw new Bookie.NoLedgerException(ledgerId);
}
return ret.get();
}
@Override
public boolean isFenced(long ledgerId) throws IOException {
LedgerInfo info = ledgers.get(ledgerId);
if (info == null) {
throw new Bookie.NoLedgerException(ledgerId);
}
return info != null && info.fenced;
}
@Override
public void setLimboState(long ledgerId) throws IOException {
LedgerInfo previous = ledgers.computeIfPresent(ledgerId, (ledgerId1, current) -> {
current.limbo = true;
return current;
});
if (previous == null) {
throw new Bookie.NoLedgerException(ledgerId);
}
}
@Override
public boolean hasLimboState(long ledgerId) throws IOException {
LedgerInfo info = ledgers.get(ledgerId);
if (info == null) {
throw new Bookie.NoLedgerException(ledgerId);
}
return info.limbo;
}
@Override
public void clearLimboState(long ledgerId) throws IOException {
LedgerInfo previous = ledgers.computeIfPresent(ledgerId, (ledgerId1, current) -> {
current.limbo = false;
return current;
});
if (previous == null) {
throw new Bookie.NoLedgerException(ledgerId);
}
}
@Override
public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
LedgerInfo previous = ledgers.compute(ledgerId, (ledgerId1, current) -> {
if (current != null) {
return current;
}
return new LedgerInfo(masterKey);
});
if (previous != null && !Arrays.equals(masterKey, previous.masterKey)) {
throw new IOException(BookieException.create(BookieException.Code.IllegalOpException));
}
}
@Override
public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
LedgerInfo info = ledgers.get(ledgerId);
if (info == null) {
throw new Bookie.NoLedgerException(ledgerId);
}
return Arrays.copyOf(info.masterKey, info.masterKey.length);
}
public long extractLedgerId(ByteBuf entry) {
return entry.getLong(entry.readerIndex());
}
public long extractEntryId(ByteBuf entry) {
return entry.getLong(entry.readerIndex() + 8);
}
public long extractLac(ByteBuf entry) {
return entry.getLong(entry.readerIndex() + 16);
}
@Override
public long addEntry(ByteBuf entry) throws IOException, BookieException {
ByteBuf copy = entry.retain().duplicate();
long ledgerId = extractLedgerId(copy);
long entryId = extractEntryId(copy);
long lac = extractLac(copy);
LedgerInfo previous = ledgers.computeIfPresent(ledgerId, (ledgerId1, current) -> {
if (lac > current.lac) {
current.lac = lac;
}
current.entries.put(entryId, copy);
return current;
});
if (previous == null) {
throw new Bookie.NoLedgerException(ledgerId);
}
return entryId;
}
@Override
public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
}
@Override
public long getLastAddConfirmed(long ledgerId) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
}
@Override
public boolean waitForLastAddConfirmedUpdate(
long ledgerId,
long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
}
@Override
public void cancelWaitForLastAddConfirmedUpdate(
long ledgerId,
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
}
@Override
public void flush() throws IOException {
// this is a noop, as we dont hit disk anyhow
}
@Override
public void checkpoint(Checkpoint checkpoint) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
}
@Override
public void deleteLedger(long ledgerId) throws IOException {
ledgers.remove(ledgerId);
}
@Override
public void registerLedgerDeletionListener(LedgerDeletionListener listener) {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
}
@Override
public void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
}
@Override
public ByteBuf getExplicitLac(long ledgerId) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
}
@Override
public LedgerStorage getUnderlyingLedgerStorage() {
return CompactableLedgerStorage.super.getUnderlyingLedgerStorage();
}
@Override
public void forceGC() {
CompactableLedgerStorage.super.forceGC();
}
@Override
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
CompactableLedgerStorage.super.forceGC(forceMajor, forceMinor);
}
@Override
public List<DetectedInconsistency> localConsistencyCheck(Optional<RateLimiter> rateLimiter) throws IOException {
return CompactableLedgerStorage.super.localConsistencyCheck(rateLimiter);
}
@Override
public boolean isInForceGC() {
return CompactableLedgerStorage.super.isInForceGC();
}
@Override
public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
return CompactableLedgerStorage.super.getGarbageCollectionStatus();
}
@Override
public PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
}
@Override
public Iterable<Long> getActiveLedgersInRange(long firstLedgerId, long lastLedgerId)
throws IOException {
throw new UnsupportedOperationException("Not supported in mock, implement if you need it");
}
public List<EntryLocation> getUpdatedLocations() {
return entryLocations;
}
@Override
public void updateEntriesLocations(Iterable<EntryLocation> locations) throws IOException {
synchronized (entryLocations) {
for (EntryLocation l : locations) {
entryLocations.add(l);
}
}
}
@Override
public EnumSet<StorageState> getStorageStateFlags() throws IOException {
return storageStateFlags;
}
@Override
public void setStorageStateFlag(StorageState flag) throws IOException {
storageStateFlags.add(flag);
}
@Override
public void clearStorageStateFlag(StorageState flag) throws IOException {
storageStateFlags.remove(flag);
}
@Override
public void flushEntriesLocationsIndex() throws IOException { }
}