blob: 93c81d478c78756191e1d969ab461893440da0ed [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import java.util.HashMap;
import java.util.Map;
import java.util.PrimitiveIterator.OfLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.SnapshotMap;
import org.apache.commons.lang.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Scan all entries in the entry log and rebuild the index file for one ledger.
public class InterleavedStorageRegenerateIndexOp {
private static final Logger LOG = LoggerFactory.getLogger(InterleavedStorageRegenerateIndexOp.class);
private final ServerConfiguration conf;
private final Set<Long> ledgerIds;
private final byte[] masterKey;
public InterleavedStorageRegenerateIndexOp(ServerConfiguration conf, Set<Long> ledgerIds, byte[] password)
throws NoSuchAlgorithmException {
this.conf = conf;
this.ledgerIds = ledgerIds;
this.masterKey = DigestManager.generateMasterKey(password);
static class RecoveryStats {
long firstEntry = Long.MAX_VALUE;
long lastEntry = Long.MIN_VALUE;
long numEntries = 0;
void registerEntry(long entryId) {
if (entryId < firstEntry) {
firstEntry = entryId;
if (entryId > lastEntry) {
lastEntry = entryId;
long getNumEntries() {
return numEntries;
long getFirstEntry() {
return firstEntry;
long getLastEntry() {
return lastEntry;
public void initiate(boolean dryRun) throws IOException {"Starting index rebuilding");
DiskChecker diskChecker = BookieResources.createDiskChecker(conf);
LedgerDirsManager ledgerDirsManager = BookieResources.createLedgerDirsManager(
conf, diskChecker, NullStatsLogger.INSTANCE);
LedgerDirsManager indexDirsManager = BookieResources.createIndexDirsManager(
conf, diskChecker, NullStatsLogger.INSTANCE, ledgerDirsManager);
DefaultEntryLogger entryLogger = new DefaultEntryLogger(conf, ledgerDirsManager);
final LedgerCache ledgerCache;
if (dryRun) {
ledgerCache = new DryRunLedgerCache();
} else {
ledgerCache = new LedgerCacheImpl(conf, new SnapshotMap<Long, Boolean>(),
indexDirsManager, NullStatsLogger.INSTANCE);
Set<Long> entryLogs = entryLogger.getEntryLogsSet();
int totalEntryLogs = entryLogs.size();
int completedEntryLogs = 0;
long startTime = System.nanoTime();"Scanning {} entry logs", totalEntryLogs);
Map<Long, RecoveryStats> stats = new HashMap<>();
for (long entryLogId : entryLogs) {"Scanning {}", entryLogId);
entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
long entryId = entry.getLong(8);
stats.computeIfAbsent(ledgerId, (ignore) -> new RecoveryStats()).registerEntry(entryId);
// Actual location indexed is pointing past the entry size
long location = (entryLogId << 32L) | (offset + 4);
if (LOG.isDebugEnabled()) {
LOG.debug("Rebuilding {}:{} at location {} / {}", ledgerId, entryId, location >> 32,
location & (Integer.MAX_VALUE - 1));
if (!ledgerCache.ledgerExists(ledgerId)) {
ledgerCache.setMasterKey(ledgerId, masterKey);
ledgerCache.putEntryOffset(ledgerId, entryId, location);
public boolean accept(long ledgerId) {
return ledgerIds.contains(ledgerId);
++completedEntryLogs;"Completed scanning of log {}.log -- {} / {}", Long.toHexString(entryLogId), completedEntryLogs,
}"Rebuilding indices done");
for (long ledgerId : ledgerIds) {
RecoveryStats ledgerStats = stats.get(ledgerId);
if (ledgerStats == null || ledgerStats.getNumEntries() == 0) {" {} - No entries found", ledgerId);
} else {" {} - Found {} entries, from {} to {}", ledgerId,
ledgerStats.getNumEntries(), ledgerStats.getFirstEntry(), ledgerStats.getLastEntry());
}"Total time: {}", DurationFormatUtils.formatDurationHMS(
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)));
static class DryRunLedgerCache implements LedgerCache {
public void close() {
public boolean setFenced(long ledgerId) throws IOException {
return false;
public boolean isFenced(long ledgerId) throws IOException {
throw new UnsupportedOperationException();
public void setMasterKey(long ledgerId, byte[] masterKey) throws IOException {
public byte[] readMasterKey(long ledgerId) throws IOException, BookieException {
throw new UnsupportedOperationException();
public boolean ledgerExists(long ledgerId) throws IOException {
return false;
public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
public long getEntryOffset(long ledger, long entry) throws IOException {
throw new UnsupportedOperationException();
public void flushLedger(boolean doAll) throws IOException {
public long getLastEntry(long ledgerId) throws IOException {
throw new UnsupportedOperationException();
public Long getLastAddConfirmed(long ledgerId) throws IOException {
throw new UnsupportedOperationException();
public long updateLastAddConfirmed(long ledgerId, long lac) throws IOException {
throw new UnsupportedOperationException();
public boolean waitForLastAddConfirmedUpdate(long ledgerId,
long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
throw new UnsupportedOperationException();
public void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
throw new UnsupportedOperationException();
public void deleteLedger(long ledgerId) throws IOException {
public void setExplicitLac(long ledgerId, ByteBuf lac) throws IOException {
public ByteBuf getExplicitLac(long ledgerId) {
throw new UnsupportedOperationException();
public PageEntriesIterable listEntries(long ledgerId) throws IOException {
throw new UnsupportedOperationException();
public LedgerIndexMetadata readLedgerIndexMetadata(long ledgerId) throws IOException {
throw new UnsupportedOperationException();
public OfLong getEntriesIterator(long ledgerId) throws IOException {
throw new UnsupportedOperationException();