blob: 33e8c7d7cb3df78bac3cc38bc9f139b99a91d7a5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.base.Charsets.UTF_8;
/**
* A central place on managing open ledgers.
*/
public class LedgerHandleCache {
static final Logger LOG = LoggerFactory.getLogger(LedgerHandleCache.class);
public static Builder newBuilder() {
return new Builder();
}
public static class Builder {
private BookKeeperClient bkc;
private String digestpw;
private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
private Builder() {}
public Builder bkc(BookKeeperClient bkc) {
this.bkc = bkc;
return this;
}
public Builder conf(DistributedLogConfiguration conf) {
this.digestpw = conf.getBKDigestPW();
return this;
}
public Builder statsLogger(StatsLogger statsLogger) {
this.statsLogger = statsLogger;
return this;
}
public LedgerHandleCache build() {
Preconditions.checkNotNull(bkc, "No bookkeeper client is provided");
Preconditions.checkNotNull(digestpw, "No bookkeeper digest password is provided");
Preconditions.checkNotNull(statsLogger, "No stats logger is provided");
return new LedgerHandleCache(bkc, digestpw, statsLogger);
}
}
final ConcurrentHashMap<LedgerDescriptor, RefCountedLedgerHandle> handlesMap =
new ConcurrentHashMap<LedgerDescriptor, RefCountedLedgerHandle>();
private final BookKeeperClient bkc;
private final String digestpw;
private final OpStatsLogger openStats;
private final OpStatsLogger openNoRecoveryStats;
private LedgerHandleCache(BookKeeperClient bkc, String digestpw, StatsLogger statsLogger) {
this.bkc = bkc;
this.digestpw = digestpw;
// Stats
openStats = statsLogger.getOpStatsLogger("open_ledger");
openNoRecoveryStats = statsLogger.getOpStatsLogger("open_ledger_no_recovery");
}
/**
* Open the given ledger <i>ledgerDesc</i>.
*
* @param ledgerDesc
* ledger description
* @param callback
* open callback.
* @param ctx
* callback context
*/
private void asyncOpenLedger(LedgerDescriptor ledgerDesc, AsyncCallback.OpenCallback callback, Object ctx) {
try {
if (!ledgerDesc.isFenced()) {
bkc.get().asyncOpenLedgerNoRecovery(ledgerDesc.getLedgerId(),
BookKeeper.DigestType.CRC32, digestpw.getBytes(UTF_8), callback, ctx);
} else {
bkc.get().asyncOpenLedger(ledgerDesc.getLedgerId(),
BookKeeper.DigestType.CRC32, digestpw.getBytes(UTF_8), callback, ctx);
}
} catch (IOException ace) {
// :) when we can't get bkc, it means bookie handle not available
callback.openComplete(BKException.Code.BookieHandleNotAvailableException, null, ctx);
}
}
/**
* Open the log segment.
*
* @param metadata
* the log segment metadata
* @param fence
* whether to fence the log segment during open
* @return a future presenting the open result.
*/
public Future<LedgerDescriptor> asyncOpenLedger(LogSegmentMetadata metadata, boolean fence) {
final Stopwatch stopwatch = Stopwatch.createStarted();
final OpStatsLogger openStatsLogger = fence ? openStats : openNoRecoveryStats;
final Promise<LedgerDescriptor> promise = new Promise<LedgerDescriptor>();
final LedgerDescriptor ledgerDesc = new LedgerDescriptor(metadata.getLedgerId(), metadata.getLogSegmentSequenceNumber(), fence);
RefCountedLedgerHandle refhandle = handlesMap.get(ledgerDesc);
if (null == refhandle) {
asyncOpenLedger(ledgerDesc, new AsyncCallback.OpenCallback() {
@Override
public void openComplete(int rc, LedgerHandle lh, Object ctx) {
if (BKException.Code.OK != rc) {
promise.setException(BKException.create(rc));
return;
}
RefCountedLedgerHandle newRefHandle = new RefCountedLedgerHandle(lh);
RefCountedLedgerHandle oldRefHandle = handlesMap.putIfAbsent(ledgerDesc, newRefHandle);
if (null != oldRefHandle) {
oldRefHandle.addRef();
if (newRefHandle.removeRef()) {
newRefHandle.handle.asyncClose(new AsyncCallback.CloseCallback() {
@Override
public void closeComplete(int i, LedgerHandle ledgerHandle, Object o) {
// No action necessary
}
}, null);
}
}
promise.setValue(ledgerDesc);
}
}, null);
} else {
refhandle.addRef();
promise.setValue(ledgerDesc);
}
return promise.addEventListener(new FutureEventListener<LedgerDescriptor>() {
@Override
public void onSuccess(LedgerDescriptor value) {
openStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
}
@Override
public void onFailure(Throwable cause) {
openStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
}
});
}
/**
* Open a ledger synchronously.
*
* @param metadata
* log segment metadata
* @param fence
* whether to fence the log segment during open
* @return ledger descriptor
* @throws BKException
*/
public LedgerDescriptor openLedger(LogSegmentMetadata metadata, boolean fence) throws BKException {
return FutureUtils.bkResult(asyncOpenLedger(metadata, fence));
}
private RefCountedLedgerHandle getLedgerHandle(LedgerDescriptor ledgerDescriptor) {
return null == ledgerDescriptor ? null : handlesMap.get(ledgerDescriptor);
}
/**
* Close the ledger asynchronously.
*
* @param ledgerDesc
* ledger descriptor.
* @return future presenting the closing result.
*/
public Future<Void> asyncCloseLedger(LedgerDescriptor ledgerDesc) {
final Promise<Void> promise = new Promise<Void>();
RefCountedLedgerHandle refhandle = getLedgerHandle(ledgerDesc);
if ((null != refhandle) && (refhandle.removeRef())) {
refhandle = handlesMap.remove(ledgerDesc);
if (refhandle.getRefCount() > 0) {
// In the rare race condition that a ref count was added immediately
// after the close de-refed it and the remove was called
// Try to put the handle back in the map
handlesMap.putIfAbsent(ledgerDesc, refhandle);
// ReadOnlyLedgerHandles don't have much overhead, so lets just leave
// the handle open even if it had already been replaced
promise.setValue(null);
} else {
refhandle.handle.asyncClose(new AsyncCallback.CloseCallback() {
@Override
public void closeComplete(int rc, LedgerHandle ledgerHandle, Object ctx) {
if (BKException.Code.OK == rc) {
promise.setValue(null);
} else {
promise.setException(BKException.create(rc));
}
}
}, null);
}
} else {
promise.setValue(null);
}
return promise;
}
/**
* Close the ledger synchronously.
*
* @param ledgerDesc
* ledger descriptor.
* @throws BKException
*/
public void closeLedger(LedgerDescriptor ledgerDesc) throws BKException {
FutureUtils.bkResult(asyncCloseLedger(ledgerDesc));
}
/**
* Get the last add confirmed of <code>ledgerDesc</code>.
*
* @param ledgerDesc
* ledger descriptor.
* @return last add confirmed of <code>ledgerDesc</code>
* @throws BKException
*/
public long getLastAddConfirmed(LedgerDescriptor ledgerDesc) throws BKException {
RefCountedLedgerHandle refhandle = getLedgerHandle(ledgerDesc);
if (null == refhandle) {
LOG.error("Accessing ledger {} without opening.", ledgerDesc);
throw BKException.create(BKException.Code.UnexpectedConditionException);
}
return refhandle.handle.getLastAddConfirmed();
}
/**
* Whether a ledger is closed or not.
*
* @param ledgerDesc
* ledger descriptor.
* @return true if a ledger is closed, otherwise false.
* @throws BKException
*/
public boolean isLedgerHandleClosed(LedgerDescriptor ledgerDesc) throws BKException {
RefCountedLedgerHandle refhandle = getLedgerHandle(ledgerDesc);
if (null == refhandle) {
LOG.error("Accessing ledger {} without opening.", ledgerDesc);
throw BKException.create(BKException.Code.UnexpectedConditionException);
}
return refhandle.handle.isClosed();
}
/**
* Async try read last confirmed.
*
* @param ledgerDesc
* ledger descriptor
* @return future presenting read last confirmed result.
*/
public Future<Long> asyncTryReadLastConfirmed(LedgerDescriptor ledgerDesc) {
RefCountedLedgerHandle refHandle = handlesMap.get(ledgerDesc);
if (null == refHandle) {
LOG.error("Accessing ledger {} without opening.", ledgerDesc);
return Future.exception(BKException.create(BKException.Code.UnexpectedConditionException));
}
final Promise<Long> promise = new Promise<Long>();
refHandle.handle.asyncTryReadLastConfirmed(new AsyncCallback.ReadLastConfirmedCallback() {
@Override
public void readLastConfirmedComplete(int rc, long lastAddConfirmed, Object ctx) {
if (BKException.Code.OK == rc) {
promise.setValue(lastAddConfirmed);
} else {
promise.setException(BKException.create(rc));
}
}
}, null);
return promise;
}
/**
* Try read last confirmed.
*
* @param ledgerDesc
* ledger descriptor
* @return last confirmed
* @throws BKException
*/
public long tryReadLastConfirmed(LedgerDescriptor ledgerDesc) throws BKException {
return FutureUtils.bkResult(asyncTryReadLastConfirmed(ledgerDesc));
}
/**
* Async read last confirmed and entry
*
* @param ledgerDesc
* ledger descriptor
* @param entryId
* entry id to read
* @param timeOutInMillis
* time out if no newer entry available
* @param parallel
* whether to read from replicas in parallel
*/
public Future<Pair<Long, LedgerEntry>> asyncReadLastConfirmedAndEntry(
LedgerDescriptor ledgerDesc,
long entryId,
long timeOutInMillis,
boolean parallel) {
RefCountedLedgerHandle refHandle = handlesMap.get(ledgerDesc);
if (null == refHandle) {
LOG.error("Accessing ledger {} without opening.", ledgerDesc);
return Future.exception(BKException.create(BKException.Code.UnexpectedConditionException));
}
final Promise<Pair<Long, LedgerEntry>> promise = new Promise<Pair<Long, LedgerEntry>>();
refHandle.handle.asyncReadLastConfirmedAndEntry(entryId, timeOutInMillis, parallel,
new AsyncCallback.ReadLastConfirmedAndEntryCallback() {
@Override
public void readLastConfirmedAndEntryComplete(int rc, long lac, LedgerEntry ledgerEntry, Object ctx) {
if (BKException.Code.OK == rc) {
promise.setValue(Pair.of(lac, ledgerEntry));
} else {
promise.setException(BKException.create(rc));
}
}
}, null);
return promise;
}
/**
* Async Read Entries
*
* @param ledgerDesc
* ledger descriptor
* @param first
* first entry
* @param last
* second entry
*/
public Future<Enumeration<LedgerEntry>> asyncReadEntries(
LedgerDescriptor ledgerDesc, long first, long last) {
RefCountedLedgerHandle refHandle = handlesMap.get(ledgerDesc);
if (null == refHandle) {
LOG.error("Accessing ledger {} without opening.", ledgerDesc);
return Future.exception(BKException.create(BKException.Code.UnexpectedConditionException));
}
final Promise<Enumeration<LedgerEntry>> promise = new Promise<Enumeration<LedgerEntry>>();
refHandle.handle.asyncReadEntries(first, last, new AsyncCallback.ReadCallback() {
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
if (BKException.Code.OK == rc) {
promise.setValue(entries);
} else {
promise.setException(BKException.create(rc));
}
}
}, null);
return promise;
}
public Enumeration<LedgerEntry> readEntries(LedgerDescriptor ledgerDesc, long first, long last)
throws BKException {
return FutureUtils.bkResult(asyncReadEntries(ledgerDesc, first, last));
}
public long getLength(LedgerDescriptor ledgerDesc) throws BKException {
RefCountedLedgerHandle refhandle = getLedgerHandle(ledgerDesc);
if (null == refhandle) {
LOG.error("Accessing ledger {} without opening.", ledgerDesc);
throw BKException.create(BKException.Code.UnexpectedConditionException);
}
return refhandle.handle.getLength();
}
public void clear() {
if (null != handlesMap) {
Iterator<Map.Entry<LedgerDescriptor, RefCountedLedgerHandle>> handlesMapIter = handlesMap.entrySet().iterator();
while (handlesMapIter.hasNext()) {
Map.Entry<LedgerDescriptor, RefCountedLedgerHandle> entry = handlesMapIter.next();
// Make it inaccessible through the map
handlesMapIter.remove();
// now close the ledger
entry.getValue().forceClose();
}
}
}
static class RefCountedLedgerHandle {
public final LedgerHandle handle;
final AtomicLong refcount = new AtomicLong(0);
RefCountedLedgerHandle(LedgerHandle lh) {
this.handle = lh;
addRef();
}
long getRefCount() {
return refcount.get();
}
public void addRef() {
refcount.incrementAndGet();
}
public boolean removeRef() {
return (refcount.decrementAndGet() == 0);
}
public void forceClose() {
try {
handle.close();
} catch (BKException.BKLedgerClosedException exc) {
// Ignore
} catch (Exception exc) {
LOG.warn("Exception while closing ledger {}", handle, exc);
}
}
}
}