// blob: 2496d697cf2e2ccd3e1d1ba405698fdf30564554 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Arrays;
import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Implements a ledger inside a bookie. In particular, it implements operations
 * to write entries to a ledger and read entries from a ledger.
 *
 * <p>Every storage operation is delegated to the {@link LedgerStorage} handed to the
 * constructor, keyed by this descriptor's ledger id. The only state kept here is the
 * master key used for access checks and the bookkeeping needed to record a fence
 * marker in the journal.
 */
public class LedgerDescriptorImpl extends LedgerDescriptor {
    private static final Logger LOG = LoggerFactory.getLogger(LedgerDescriptorImpl.class);

    /** Storage backend that all read/write/fence operations are delegated to. */
    final LedgerStorage ledgerStorage;
    /** Id of the ledger this descriptor represents; fixed at construction. */
    private final long ledgerId;
    /** Master key that {@link #checkAccess(byte[])} compares requests against. */
    final byte[] masterKey;

    /** Set to {@code true} once the journal callback reports the fence entry with rc 0. */
    private final AtomicBoolean fenceEntryPersisted = new AtomicBoolean();
    /**
     * Future of the most recent attempt to log the fence entry in the journal, or
     * {@code null} if no attempt was made through this descriptor. Written under
     * the monitor of {@code this}.
     */
    private CompletableFuture<Boolean> logFenceResult = null;

    /**
     * Creates a descriptor for one ledger.
     *
     * @param masterKey     master key later compared in {@link #checkAccess(byte[])}
     * @param ledgerId      id of the ledger
     * @param ledgerStorage storage that backs every operation of this descriptor
     */
    LedgerDescriptorImpl(byte[] masterKey,
                         long ledgerId,
                         LedgerStorage ledgerStorage) {
        this.masterKey = masterKey;
        this.ledgerId = ledgerId;
        this.ledgerStorage = ledgerStorage;
    }

    /**
     * Verifies that the supplied master key equals the one held by this descriptor.
     *
     * @param masterKey master key presented by the caller
     * @throws BookieException with code {@code UnauthorizedAccessException} when the keys differ
     * @throws IOException declared by the overridden method; not thrown by this implementation
     */
    @Override
    void checkAccess(byte[] masterKey) throws BookieException, IOException {
        if (!Arrays.equals(this.masterKey, masterKey)) {
            // Deliberately log only the ledger id: the master key is credential material and
            // must not be written to the log, neither the requested nor the cached value.
            LOG.error("[{}] Requested master key does not match the cached master key", this.ledgerId);
            throw BookieException.create(BookieException.Code.UnauthorizedAccessException);
        }
    }

    /**
     * @return the id of the ledger this descriptor represents
     */
    @Override
    public long getLedgerId() {
        return ledgerId;
    }

    /**
     * Marks the ledger as fenced in the ledger storage.
     *
     * @return the value returned by {@link LedgerStorage#setFenced(long)}; the caller in
     *         {@link #fenceAndLogInJournal(Journal)} treats {@code true} as "fenced for the first time"
     * @throws IOException if the storage call fails
     */
    @Override
    boolean setFenced() throws IOException {
        return ledgerStorage.setFenced(ledgerId);
    }

    /**
     * @return whether the ledger storage reports this ledger as fenced
     * @throws IOException if the storage call fails
     */
    @Override
    boolean isFenced() throws IOException {
        return ledgerStorage.isFenced(ledgerId);
    }

    /**
     * Stores the explicit LAC buffer for this ledger in the ledger storage.
     *
     * @param lac buffer passed through unchanged to the storage
     * @throws IOException if the storage call fails
     */
    @Override
    void setExplicitLac(ByteBuf lac) throws IOException {
        ledgerStorage.setExplicitLac(ledgerId, lac);
    }

    /**
     * @return the explicit LAC buffer the ledger storage holds for this ledger
     * @throws IOException if the storage call fails
     */
    @Override
    ByteBuf getExplicitLac() throws IOException {
        return ledgerStorage.getExplicitLac(ledgerId);
    }

    /**
     * Fences the ledger and makes sure a fence entry is (being) recorded in the journal.
     *
     * @param journal journal to record the fence entry in
     * @return a future for the outcome of recording the fence entry; already completed with
     *         {@code true} when nothing is left to do
     * @throws IOException if setting the fenced state in the ledger storage fails
     */
    @Override
    synchronized CompletableFuture<Boolean> fenceAndLogInJournal(Journal journal) throws IOException {
        if (this.setFenced()) {
            // fenced for first time, we should add the key to journal ensure we can rebuild.
            return logFenceEntryInJournal(journal);
        }

        // If we reach here, it means the fence state in FileInfo has been set (may not be persisted yet).
        // However, writing the fence log entry to the journal might still be in progress. This can happen
        // when a bookie receives two fence requests almost at the same time. The subsequent logic is used
        // to check the fencing progress.
        if (logFenceResult == null || fenceEntryPersisted.get()) {
            // Either ledger's fenced state is recovered from Journal
            // Or Log fence entry in Journal succeed
            CompletableFuture<Boolean> result = FutureUtils.createFuture();
            result.complete(true);
            return result;
        }
        if (logFenceResult.isDone()) {
            // The previous attempt finished without the persisted flag being set, i.e. we
            // failed to log fence entry in Journal, try again.
            return logFenceEntryInJournal(journal);
        }
        // Fencing is in progress
        return logFenceResult;
    }

    /**
     * Log the fence ledger entry in Journal so that we can rebuild the state.
     *
     * <p>The returned future is also remembered in {@code logFenceResult} so that a concurrent
     * fence request can observe the progress of this attempt.
     *
     * @param journal log the fence entry in the Journal
     * @return A future which will be satisfied when add entry to journal complete: {@code true}
     *         for rc 0, {@code false} for any other rc, or exceptionally if the calling thread
     *         is interrupted while handing the entry to the journal
     */
    private CompletableFuture<Boolean> logFenceEntryInJournal(Journal journal) {
        CompletableFuture<Boolean> result;
        synchronized (this) {
            result = logFenceResult = FutureUtils.createFuture();
        }
        ByteBuf entry = createLedgerFenceEntry(ledgerId);
        try {
            journal.logAddEntry(entry, false /* ackBeforeSync */, (rc, loggedLedgerId, entryId, addr, ctx) -> {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Record fenced state for ledger {} in journal with rc {}",
                            loggedLedgerId, BKException.codeLogger(rc));
                }
                if (rc == 0) {
                    // Flip the flag before completing the future so that anyone woken up by the
                    // completion already sees the entry as persisted.
                    fenceEntryPersisted.compareAndSet(false, true);
                    result.complete(true);
                } else {
                    result.complete(false);
                }
            }, null);
        } catch (InterruptedException e) {
            // Restore the interrupt status and surface the failure through the future.
            Thread.currentThread().interrupt();
            result.completeExceptionally(e);
        }
        return result;
    }

    /**
     * Adds an entry to this ledger.
     *
     * @param entry entry buffer; the long at its reader index must be this ledger's id
     *              (it is read without moving the reader index)
     * @return the value returned by {@link LedgerStorage#addEntry(ByteBuf)}
     * @throws IOException if the entry is addressed to a different ledger or the storage call fails
     * @throws BookieException if the storage rejects the entry
     */
    @Override
    long addEntry(ByteBuf entry) throws IOException, BookieException {
        long entryLedgerId = entry.getLong(entry.readerIndex());
        if (entryLedgerId != this.ledgerId) {
            throw new IOException("Entry for ledger " + entryLedgerId + " was sent to " + this.ledgerId);
        }
        return ledgerStorage.addEntry(entry);
    }

    /**
     * @param entryId id of the entry to read
     * @return the entry buffer the ledger storage returns for this ledger and {@code entryId}
     * @throws IOException if the storage call fails
     */
    @Override
    ByteBuf readEntry(long entryId) throws IOException {
        return ledgerStorage.getEntry(ledgerId, entryId);
    }

    /**
     * @return the last-add-confirmed value the ledger storage reports for this ledger
     * @throws IOException if the storage call fails
     */
    @Override
    long getLastAddConfirmed() throws IOException {
        return ledgerStorage.getLastAddConfirmed(ledgerId);
    }

    /**
     * Registers a watcher for last-add-confirmed updates of this ledger with the ledger storage.
     *
     * @param previousLAC LAC value passed through to the storage
     * @param watcher     watcher passed through to the storage
     * @return the value returned by the storage call
     * @throws IOException if the storage call fails
     */
    @Override
    boolean waitForLastAddConfirmedUpdate(long previousLAC,
                                          Watcher<LastAddConfirmedUpdateNotification> watcher)
            throws IOException {
        return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
    }

    /**
     * Cancels a watcher previously handed to the ledger storage for this ledger.
     *
     * @param watcher watcher to cancel
     * @throws IOException if the storage call fails
     */
    @Override
    void cancelWaitForLastAddConfirmedUpdate(Watcher<LastAddConfirmedUpdateNotification> watcher)
            throws IOException {
        ledgerStorage.cancelWaitForLastAddConfirmedUpdate(ledgerId, watcher);
    }

    /**
     * Lists the entries of a ledger as reported by the ledger storage.
     *
     * <p>NOTE(review): the lookup uses the {@code ledgerId} argument, which shadows this
     * descriptor's own ledger id; no check is made that the two match.
     *
     * @param ledgerId id of the ledger to list
     * @return iterator returned by {@link LedgerStorage#getListOfEntriesOfLedger(long)}
     * @throws IOException if the storage call fails
     */
    @Override
    OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
        return ledgerStorage.getListOfEntriesOfLedger(ledgerId);
    }
}