blob: 9aa3465ba74c276419e6822ab9ee4c62fa178348 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.twitter.distributedlog.callback.LogSegmentListener;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.LogEmptyException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.MetadataException;
import com.twitter.distributedlog.exceptions.UnexpectedException;
import com.twitter.distributedlog.exceptions.ZKException;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
import com.twitter.distributedlog.io.AsyncAbortable;
import com.twitter.distributedlog.io.AsyncCloseable;
import com.twitter.distributedlog.logsegment.LogSegmentCache;
import com.twitter.distributedlog.logsegment.LogSegmentFilter;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
* The base class about log handler on managing log segments.
*
* <h3>Metrics</h3>
* The log handler is a base class on managing log segments. so all the metrics
* here are related to log segments retrieval and exposed under `logsegments`.
* These metrics are all OpStats, in the format of <code>`scope`/logsegments/`op`</code>.
* <p>
* Those operations are:
* <ul>
* <li>force_get_list: force to get the list of log segments.
* <li>get_list: get the list of the log segments. it might just retrieve from
* local log segment cache.
* <li>get_filtered_list: get the filtered list of log segments.
* <li>get_full_list: get the full list of log segments.
* <li>get_inprogress_segment: time between the inprogress log segment created and
* the handler read it.
* <li>get_completed_segment: time between a log segment is turned to completed and
* the handler read it.
* <li>negative_get_inprogress_segment: record the negative values for `get_inprogress_segment`.
* <li>negative_get_completed_segment: record the negative values for `get_completed_segment`.
* <li>recover_last_entry: recovering last entry from a log segment
* <li>recover_scanned_entries: the number of entries that are scanned during recovering.
* </ul>
* @see BKLogWriteHandler
* @see BKLogReadHandler
*/
public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbortable {
static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class);
private static final int LAYOUT_VERSION = -1;
protected final ZKLogMetadata logMetadata;
protected final DistributedLogConfiguration conf;
protected final ZooKeeperClient zooKeeperClient;
protected final BookKeeperClient bookKeeperClient;
protected final LogSegmentMetadataStore metadataStore;
protected final int firstNumEntriesPerReadLastRecordScan;
protected final int maxNumEntriesPerReadLastRecordScan;
protected volatile long lastLedgerRollingTimeMillis = -1;
protected final OrderedScheduler scheduler;
protected final StatsLogger statsLogger;
protected final AlertStatsLogger alertStatsLogger;
private final AtomicBoolean ledgerListWatchSet = new AtomicBoolean(false);
private final AtomicBoolean isFullListFetched = new AtomicBoolean(false);
protected volatile boolean reportGetSegmentStats = false;
private final String lockClientId;
protected final AtomicReference<IOException> metadataException = new AtomicReference<IOException>(null);
// listener
protected final CopyOnWriteArraySet<LogSegmentListener> listeners =
new CopyOnWriteArraySet<LogSegmentListener>();
// Maintain the list of ledgers
protected final LogSegmentCache logSegmentCache;
protected volatile SyncGetLedgersCallback firstGetLedgersTask = null;
protected final AsyncNotification notification;
// log segment filter
protected final LogSegmentFilter filter;
// zookeeper children watcher
private final Watcher getChildrenWatcher;
// trace
protected final long metadataLatencyWarnThresholdMillis;
// Stats
private final OpStatsLogger forceGetListStat;
private final OpStatsLogger getListStat;
private final OpStatsLogger getFilteredListStat;
private final OpStatsLogger getFullListStat;
private final OpStatsLogger getInprogressSegmentStat;
private final OpStatsLogger getCompletedSegmentStat;
private final OpStatsLogger negativeGetInprogressSegmentStat;
private final OpStatsLogger negativeGetCompletedSegmentStat;
private final OpStatsLogger recoverLastEntryStats;
private final OpStatsLogger recoverScannedEntriesStats;
static class SyncGetLedgersCallback implements GenericCallback<List<LogSegmentMetadata>> {
final String path;
final boolean allowEmpty;
final CountDownLatch countDownLatch = new CountDownLatch(1);
final Promise<List<LogSegmentMetadata>> promise =
new Promise<List<LogSegmentMetadata>>();
int rc = KeeperException.Code.APIERROR.intValue();
SyncGetLedgersCallback(String path, boolean allowEmpty) {
this.path = path;
this.allowEmpty = allowEmpty;
}
@Override
public void operationComplete(int rc, List<LogSegmentMetadata> logSegmentMetadatas) {
this.rc = rc;
if (KeeperException.Code.OK.intValue() == rc) {
LOG.debug("Updated ledgers list for {} : {}", path, logSegmentMetadatas);
promise.setValue(logSegmentMetadatas);
} else if (KeeperException.Code.NONODE.intValue() == rc) {
if (allowEmpty) {
promise.setValue(new ArrayList<LogSegmentMetadata>(0));
} else {
promise.setException(new LogNotFoundException("Log " + path + " is not found"));
}
} else {
promise.setException(new MetadataException("Error getting ledgers list for " + path));
}
countDownLatch.countDown();
}
void waitForFinish() throws IOException {
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new DLInterruptedException("Interrupted on getting ledgers list for " + path, e);
}
if (KeeperException.Code.OK.intValue() != rc) {
if (KeeperException.Code.NONODE.intValue() == rc) {
if (!allowEmpty) {
throw new LogNotFoundException("Log " + path + " is not found");
}
} else {
throw new MetadataException("Error getting ledgers list for " + path);
}
}
}
}
static class NOPGetLedgersCallback implements GenericCallback<List<LogSegmentMetadata>> {
final String path;
NOPGetLedgersCallback(String path) {
this.path = path;
}
@Override
public void operationComplete(int rc, List<LogSegmentMetadata> logSegmentMetadatas) {
if (KeeperException.Code.OK.intValue() == rc) {
LOG.debug("Updated ledgers list : {}", path, logSegmentMetadatas);
}
}
}
class WatcherGetLedgersCallback implements GenericCallback<List<LogSegmentMetadata>>, Runnable {
final String path;
WatcherGetLedgersCallback(String path) {
this.path = path;
}
@Override
public void operationComplete(int rc, List<LogSegmentMetadata> logSegmentMetadatas) {
if (KeeperException.Code.OK.intValue() == rc) {
LOG.debug("Updated ledgers list {} : {}", path, logSegmentMetadatas);
} else {
scheduler.schedule(this, conf.getZKRetryBackoffMaxMillis(), TimeUnit.MILLISECONDS);
}
}
@Override
public void run() {
asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR, filter, getChildrenWatcher, this);
}
}
/**
* Construct a Bookkeeper journal manager.
*/
BKLogHandler(ZKLogMetadata metadata,
DistributedLogConfiguration conf,
ZooKeeperClientBuilder zkcBuilder,
BookKeeperClientBuilder bkcBuilder,
LogSegmentMetadataStore metadataStore,
OrderedScheduler scheduler,
StatsLogger statsLogger,
AlertStatsLogger alertStatsLogger,
AsyncNotification notification,
LogSegmentFilter filter,
String lockClientId) {
Preconditions.checkNotNull(zkcBuilder);
Preconditions.checkNotNull(bkcBuilder);
this.logMetadata = metadata;
this.conf = conf;
this.scheduler = scheduler;
this.statsLogger = statsLogger;
this.alertStatsLogger = alertStatsLogger;
this.notification = notification;
this.filter = filter;
this.logSegmentCache = new LogSegmentCache(metadata.getLogName());
firstNumEntriesPerReadLastRecordScan = conf.getFirstNumEntriesPerReadLastRecordScan();
maxNumEntriesPerReadLastRecordScan = conf.getMaxNumEntriesPerReadLastRecordScan();
this.zooKeeperClient = zkcBuilder.build();
LOG.debug("Using ZK Path {}", logMetadata.getLogRootPath());
this.bookKeeperClient = bkcBuilder.build();
this.metadataStore = metadataStore;
if (lockClientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) {
this.lockClientId = getHostIpLockClientId();
} else {
this.lockClientId = lockClientId;
}
this.getChildrenWatcher = this.zooKeeperClient.getWatcherManager()
.registerChildWatcher(logMetadata.getLogSegmentsPath(), this);
// Traces
this.metadataLatencyWarnThresholdMillis = conf.getMetadataLatencyWarnThresholdMillis();
// Stats
StatsLogger segmentsLogger = statsLogger.scope("logsegments");
forceGetListStat = segmentsLogger.getOpStatsLogger("force_get_list");
getListStat = segmentsLogger.getOpStatsLogger("get_list");
getFilteredListStat = segmentsLogger.getOpStatsLogger("get_filtered_list");
getFullListStat = segmentsLogger.getOpStatsLogger("get_full_list");
getInprogressSegmentStat = segmentsLogger.getOpStatsLogger("get_inprogress_segment");
getCompletedSegmentStat = segmentsLogger.getOpStatsLogger("get_completed_segment");
negativeGetInprogressSegmentStat = segmentsLogger.getOpStatsLogger("negative_get_inprogress_segment");
negativeGetCompletedSegmentStat = segmentsLogger.getOpStatsLogger("negative_get_completed_segment");
recoverLastEntryStats = segmentsLogger.getOpStatsLogger("recover_last_entry");
recoverScannedEntriesStats = segmentsLogger.getOpStatsLogger("recover_scanned_entries");
}
BKLogHandler checkMetadataException() throws IOException {
if (null != metadataException.get()) {
throw metadataException.get();
}
return this;
}
public void reportGetSegmentStats(boolean enabled) {
this.reportGetSegmentStats = enabled;
}
public String getLockClientId() {
return lockClientId;
}
private String getHostIpLockClientId() {
try {
return InetAddress.getLocalHost().toString();
} catch(Exception ex) {
return DistributedLogConstants.UNKNOWN_CLIENT_ID;
}
}
protected void registerListener(LogSegmentListener listener) {
listeners.add(listener);
}
protected void unregisterListener(LogSegmentListener listener) {
listeners.remove(listener);
}
protected void notifyUpdatedLogSegments(List<LogSegmentMetadata> segments) {
for (LogSegmentListener listener : listeners) {
List<LogSegmentMetadata> listToReturn =
new ArrayList<LogSegmentMetadata>(segments);
Collections.sort(listToReturn, LogSegmentMetadata.DESC_COMPARATOR);
listener.onSegmentsUpdated(listToReturn);
}
}
protected void scheduleGetAllLedgersTaskIfNeeded() {
if (isFullListFetched.get()) {
return;
}
asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR, LogSegmentFilter.DEFAULT_FILTER,
null, new NOPGetLedgersCallback(getFullyQualifiedName()));
}
protected void scheduleGetLedgersTask(boolean watch, boolean allowEmpty) {
if (!watch) {
ledgerListWatchSet.set(true);
}
LOG.info("Scheduling get ledgers task for {}, watch = {}.", getFullyQualifiedName(), watch);
firstGetLedgersTask = new SyncGetLedgersCallback(getFullyQualifiedName(), allowEmpty);
asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR, filter,
watch ? getChildrenWatcher : null, firstGetLedgersTask);
LOG.info("Scheduled get ledgers task for {}, watch = {}.", getFullyQualifiedName(), watch);
}
protected void waitFirstGetLedgersTaskToFinish() throws IOException {
SyncGetLedgersCallback task = firstGetLedgersTask;
if (null != task) {
if (LOG.isTraceEnabled()) {
LOG.trace("Wait first getting ledgers task to finish for {}.", getFullyQualifiedName());
}
task.waitForFinish();
}
}
public Future<LogRecordWithDLSN> asyncGetFirstLogRecord() {
final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
asyncGetFullLedgerList(true, true).addEventListener(new FutureEventListener<List<LogSegmentMetadata>>() {
@Override
public void onSuccess(List<LogSegmentMetadata> ledgerList) {
if (ledgerList.isEmpty()) {
promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
return;
}
Future<LogRecordWithDLSN> firstRecord = null;
for (LogSegmentMetadata ledger : ledgerList) {
if (!ledger.isTruncated() && (ledger.getRecordCount() > 0 || ledger.isInProgress())) {
firstRecord = asyncReadFirstUserRecord(ledger, DLSN.InitialDLSN);
break;
}
}
if (null != firstRecord) {
promise.become(firstRecord);
} else {
promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
}
}
@Override
public void onFailure(Throwable cause) {
promise.setException(cause);
}
});
}
@Override
public void onFailure(Throwable cause) {
promise.setException(cause);
}
});
return promise;
}
public Future<LogRecordWithDLSN> getLastLogRecordAsync(final boolean recover, final boolean includeEndOfStream) {
final Promise<LogRecordWithDLSN> promise = new Promise<LogRecordWithDLSN>();
checkLogStreamExistsAsync().addEventListener(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
asyncGetFullLedgerListDesc(true, true).addEventListener(new FutureEventListener<List<LogSegmentMetadata>>() {
@Override
public void onSuccess(List<LogSegmentMetadata> ledgerList) {
if (ledgerList.isEmpty()) {
promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
return;
}
asyncGetLastLogRecord(ledgerList.iterator(), promise, recover, false, includeEndOfStream);
}
@Override
public void onFailure(Throwable cause) {
promise.setException(cause);
}
});
}
@Override
public void onFailure(Throwable cause) {
promise.setException(cause);
}
});
return promise;
}
private void asyncGetLastLogRecord(final Iterator<LogSegmentMetadata> ledgerIter,
final Promise<LogRecordWithDLSN> promise,
final boolean fence,
final boolean includeControlRecord,
final boolean includeEndOfStream) {
if (ledgerIter.hasNext()) {
LogSegmentMetadata metadata = ledgerIter.next();
asyncReadLastRecord(metadata, fence, includeControlRecord, includeEndOfStream).addEventListener(
new FutureEventListener<LogRecordWithDLSN>() {
@Override
public void onSuccess(LogRecordWithDLSN record) {
if (null == record) {
asyncGetLastLogRecord(ledgerIter, promise, fence, includeControlRecord, includeEndOfStream);
} else {
promise.setValue(record);
}
}
@Override
public void onFailure(Throwable cause) {
promise.setException(cause);
}
}
);
} else {
promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " has no records"));
}
}
public LogRecordWithDLSN getLastLogRecord(boolean recover, boolean includeEndOfStream) throws IOException {
checkLogStreamExists();
List<LogSegmentMetadata> ledgerList = getFullLedgerListDesc(true, true);
for (LogSegmentMetadata metadata: ledgerList) {
LogRecordWithDLSN record = recoverLastRecordInLedger(metadata, recover, false, includeEndOfStream);
if (null != record) {
assert(!record.isControl());
LOG.debug("{} getLastLogRecord Returned {}", getFullyQualifiedName(), record);
return record;
}
}
throw new LogEmptyException("Log " + getFullyQualifiedName() + " has no records");
}
public long getLastTxId(boolean recover,
boolean includeEndOfStream) throws IOException {
checkLogStreamExists();
return getLastLogRecord(recover, includeEndOfStream).getTransactionId();
}
public DLSN getLastDLSN(boolean recover,
boolean includeEndOfStream) throws IOException {
checkLogStreamExists();
return getLastLogRecord(recover, includeEndOfStream).getDlsn();
}
public long getLogRecordCount() throws IOException {
try {
checkLogStreamExists();
} catch (LogNotFoundException exc) {
return 0;
}
List<LogSegmentMetadata> ledgerList = getFullLedgerList(true, false);
long count = 0;
for (LogSegmentMetadata l : ledgerList) {
if (l.isInProgress()) {
LogRecord record = recoverLastRecordInLedger(l, false, false, false);
if (null != record) {
count += record.getLastPositionWithinLogSegment();
}
} else {
count += l.getRecordCount();
}
}
return count;
}
private Future<LogRecordWithDLSN> asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) {
final LedgerHandleCache handleCache =
LedgerHandleCache.newBuilder().bkc(bookKeeperClient).conf(conf).build();
return ReadUtils.asyncReadFirstUserRecord(
getFullyQualifiedName(),
ledger,
firstNumEntriesPerReadLastRecordScan,
maxNumEntriesPerReadLastRecordScan,
new AtomicInteger(0),
scheduler,
handleCache,
beginDLSN
).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
handleCache.clear();
return BoxedUnit.UNIT;
}
});
}
/**
* This is a helper method to compactly return the record count between two records, the first denoted by
* beginDLSN and the second denoted by endPosition. Its up to the caller to ensure that endPosition refers to
* position in the same ledger as beginDLSN.
*/
private Future<Long> asyncGetLogRecordCount(LogSegmentMetadata ledger, final DLSN beginDLSN, final long endPosition) {
return asyncReadFirstUserRecord(ledger, beginDLSN).map(new Function<LogRecordWithDLSN, Long>() {
public Long apply(final LogRecordWithDLSN beginRecord) {
long recordCount = 0;
if (null != beginRecord) {
recordCount = endPosition + 1 - beginRecord.getLastPositionWithinLogSegment();
}
return recordCount;
}
});
}
/**
* Ledger metadata tells us how many records are in each completed segment, but for the first and last segments
* we may have to crack open the entry and count. For the first entry, we need to do so because beginDLSN may be
* an interior entry. For the last entry, if it is inprogress, we need to recover it and find the last user
* entry.
*/
private Future<Long> asyncGetLogRecordCount(final LogSegmentMetadata ledger, final DLSN beginDLSN) {
if (ledger.isInProgress() && ledger.isDLSNinThisSegment(beginDLSN)) {
return asyncReadLastUserRecord(ledger).flatMap(new Function<LogRecordWithDLSN, Future<Long>>() {
public Future<Long> apply(final LogRecordWithDLSN endRecord) {
if (null != endRecord) {
return asyncGetLogRecordCount(ledger, beginDLSN, endRecord.getLastPositionWithinLogSegment() /* end position */);
} else {
return Future.value((long) 0);
}
}
});
} else if (ledger.isInProgress()) {
return asyncReadLastUserRecord(ledger).map(new Function<LogRecordWithDLSN, Long>() {
public Long apply(final LogRecordWithDLSN endRecord) {
if (null != endRecord) {
return (long) endRecord.getLastPositionWithinLogSegment();
} else {
return (long) 0;
}
}
});
} else if (ledger.isDLSNinThisSegment(beginDLSN)) {
return asyncGetLogRecordCount(ledger, beginDLSN, ledger.getRecordCount() /* end position */);
} else {
return Future.value((long) ledger.getRecordCount());
}
}
/**
* Get a count of records between beginDLSN and the end of the stream.
*
* @param beginDLSN dlsn marking the start of the range
* @return the count of records present in the range
*/
public Future<Long> asyncGetLogRecordCount(final DLSN beginDLSN) {
return checkLogStreamExistsAsync().flatMap(new Function<Void, Future<Long>>() {
public Future<Long> apply(Void done) {
return asyncGetFullLedgerList(true, false).flatMap(new Function<List<LogSegmentMetadata>, Future<Long>>() {
public Future<Long> apply(List<LogSegmentMetadata> ledgerList) {
List<Future<Long>> futureCounts = new ArrayList<Future<Long>>(ledgerList.size());
for (LogSegmentMetadata ledger : ledgerList) {
if (ledger.getLogSegmentSequenceNumber() >= beginDLSN.getLogSegmentSequenceNo()) {
futureCounts.add(asyncGetLogRecordCount(ledger, beginDLSN));
}
}
return Future.collect(futureCounts).map(new Function<List<Long>, Long>() {
public Long apply(List<Long> counts) {
return sum(counts);
}
});
}
});
}
});
}
private Long sum(List<Long> values) {
long sum = 0;
for (Long value : values) {
sum += value;
}
return sum;
}
public long getFirstTxId() throws IOException {
checkLogStreamExists();
List<LogSegmentMetadata> ledgerList = getFullLedgerList(true, true);
// The ledger list should at least have one element
// First TxId is populated even for in progress ledgers
return ledgerList.get(0).getFirstTxId();
}
Future<Void> checkLogStreamExistsAsync() {
final Promise<Void> promise = new Promise<Void>();
try {
final ZooKeeper zk = zooKeeperClient.get();
zk.sync(logMetadata.getLogSegmentsPath(), new AsyncCallback.VoidCallback() {
@Override
public void processResult(int syncRc, String path, Object syncCtx) {
if (KeeperException.Code.NONODE.intValue() == syncRc) {
promise.setException(new LogNotFoundException(
String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
return;
} else if (KeeperException.Code.OK.intValue() != syncRc){
promise.setException(new ZKException("Error on checking log existence for " + getFullyQualifiedName(),
KeeperException.create(KeeperException.Code.get(syncRc))));
return;
}
zk.exists(logMetadata.getLogSegmentsPath(), false, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (KeeperException.Code.OK.intValue() == rc) {
promise.setValue(null);
} else if (KeeperException.Code.NONODE.intValue() == rc) {
promise.setException(new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
} else {
promise.setException(new ZKException("Error on checking log existence for " + getFullyQualifiedName(),
KeeperException.create(KeeperException.Code.get(rc))));
}
}
}, null);
}
}, null);
} catch (InterruptedException ie) {
LOG.error("Interrupted while reading {}", logMetadata.getLogSegmentsPath(), ie);
promise.setException(new DLInterruptedException("Interrupted while checking "
+ logMetadata.getLogSegmentsPath(), ie));
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
promise.setException(e);
}
return promise;
}
private void checkLogStreamExists() throws IOException {
try {
if (null == Utils.sync(zooKeeperClient, logMetadata.getLogSegmentsPath())
.exists(logMetadata.getLogSegmentsPath(), false)) {
throw new LogNotFoundException("Log " + getFullyQualifiedName() + " doesn't exist");
}
} catch (InterruptedException ie) {
LOG.error("Interrupted while reading {}", logMetadata.getLogSegmentsPath(), ie);
throw new DLInterruptedException("Interrupted while checking "
+ logMetadata.getLogSegmentsPath(), ie);
} catch (KeeperException ke) {
LOG.error("Error checking existence for {} : ", logMetadata.getLogSegmentsPath(), ke);
throw new ZKException("Error checking existence for " + getFullyQualifiedName() + " : ", ke);
}
}
@Override
public Future<Void> asyncClose() {
// No-op
this.zooKeeperClient.getWatcherManager().unregisterChildWatcher(logMetadata.getLogSegmentsPath(), this);
return Future.Void();
}
@Override
public Future<Void> asyncAbort() {
return asyncClose();
}
/**
* Find the id of the last edit log transaction written to a edit log
* ledger.
*/
protected Pair<Long, DLSN> readLastTxIdInLedger(LogSegmentMetadata l) throws IOException {
LogRecordWithDLSN record = recoverLastRecordInLedger(l, false, false, true);
if (null == record) {
return Pair.of(DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID, DLSN.InvalidDLSN);
}
else {
return Pair.of(record.getTransactionId(), record.getDlsn());
}
}
/**
* Find the id of the last edit log transaction written to a edit log
* ledger.
*/
protected LogRecordWithDLSN recoverLastRecordInLedger(LogSegmentMetadata l,
boolean fence,
boolean includeControl,
boolean includeEndOfStream)
throws IOException {
return FutureUtils.result(asyncReadLastRecord(l, fence, includeControl, includeEndOfStream));
}
public Future<LogRecordWithDLSN> asyncReadLastUserRecord(final LogSegmentMetadata l) {
return asyncReadLastRecord(l, false, false, false);
}
public Future<LogRecordWithDLSN> asyncReadLastRecord(final LogSegmentMetadata l,
final boolean fence,
final boolean includeControl,
final boolean includeEndOfStream) {
final AtomicInteger numRecordsScanned = new AtomicInteger(0);
final Stopwatch stopwatch = Stopwatch.createStarted();
final LedgerHandleCache handleCache =
LedgerHandleCache.newBuilder().bkc(bookKeeperClient).conf(conf).build();
return ReadUtils.asyncReadLastRecord(
getFullyQualifiedName(),
l,
fence,
includeControl,
includeEndOfStream,
firstNumEntriesPerReadLastRecordScan,
maxNumEntriesPerReadLastRecordScan,
numRecordsScanned,
scheduler,
handleCache
).addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
@Override
public void onSuccess(LogRecordWithDLSN value) {
recoverLastEntryStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
recoverScannedEntriesStats.registerSuccessfulEvent(numRecordsScanned.get());
}
@Override
public void onFailure(Throwable cause) {
recoverLastEntryStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
}
}).ensure(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
handleCache.clear();
return BoxedUnit.UNIT;
}
});
}
protected void setLastLedgerRollingTimeMillis(long rollingTimeMillis) {
if (lastLedgerRollingTimeMillis < rollingTimeMillis) {
lastLedgerRollingTimeMillis = rollingTimeMillis;
}
}
public String getFullyQualifiedName() {
return logMetadata.getFullyQualifiedName();
}
// Ledgers Related Functions
// ***Note***
// Get ledger list should go through #getCachedLogSegments as we need to assign start sequence id for inprogress log
// segment so the reader could generate the right sequence id.
protected List<LogSegmentMetadata> getCachedLogSegments(Comparator<LogSegmentMetadata> comparator)
throws UnexpectedException {
try {
return logSegmentCache.getLogSegments(comparator);
} catch (UnexpectedException ue) {
// the log segments cache went wrong
LOG.error("Unexpected exception on getting log segments from the cache for stream {}",
getFullyQualifiedName(), ue);
metadataException.compareAndSet(null, ue);
throw ue;
}
}
protected List<LogSegmentMetadata> getFullLedgerList(boolean forceFetch, boolean throwOnEmpty)
throws IOException {
return getLedgerList(forceFetch, true, LogSegmentMetadata.COMPARATOR, throwOnEmpty);
}
protected List<LogSegmentMetadata> getFullLedgerListDesc(boolean forceFetch, boolean throwOnEmpty)
throws IOException {
return getLedgerList(forceFetch, true, LogSegmentMetadata.DESC_COMPARATOR, throwOnEmpty);
}
protected List<LogSegmentMetadata> getFilteredLedgerList(boolean forceFetch, boolean throwOnEmpty)
throws IOException {
return getLedgerList(forceFetch, false, LogSegmentMetadata.COMPARATOR, throwOnEmpty);
}
protected List<LogSegmentMetadata> getFilteredLedgerListDesc(boolean forceFetch, boolean throwOnEmpty)
throws IOException {
return getLedgerList(forceFetch, false, LogSegmentMetadata.DESC_COMPARATOR, throwOnEmpty);
}
protected List<LogSegmentMetadata> getLedgerList(boolean forceFetch,
boolean fetchFullList,
Comparator<LogSegmentMetadata> comparator,
boolean throwOnEmpty)
throws IOException {
Stopwatch stopwatch = Stopwatch.createStarted();
boolean success = false;
try {
List<LogSegmentMetadata> segments =
doGetLedgerList(forceFetch, fetchFullList, comparator, throwOnEmpty);
success = true;
return segments;
} finally {
OpStatsLogger statsLogger = fetchFullList ? getFullListStat : getFilteredListStat;
if (success) {
statsLogger.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
} else {
statsLogger.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
}
}
}
private List<LogSegmentMetadata> doGetLedgerList(boolean forceFetch, boolean fetchFullList,
Comparator<LogSegmentMetadata> comparator,
boolean throwOnEmpty)
throws IOException {
if (fetchFullList) {
if (forceFetch || !isFullListFetched.get()) {
return forceGetLedgerList(comparator, LogSegmentFilter.DEFAULT_FILTER, throwOnEmpty);
} else {
return getCachedLogSegments(comparator);
}
} else {
if (forceFetch) {
return forceGetLedgerList(comparator, filter, throwOnEmpty);
} else {
if(!ledgerListWatchSet.get()) {
scheduleGetLedgersTask(true, true);
}
waitFirstGetLedgersTaskToFinish();
return getCachedLogSegments(comparator);
}
}
}
/**
* Get a list of all segments in the journal.
*/
protected List<LogSegmentMetadata> forceGetLedgerList(final Comparator<LogSegmentMetadata> comparator,
final LogSegmentFilter segmentFilter,
boolean throwOnEmpty) throws IOException {
final List<LogSegmentMetadata> ledgers = new ArrayList<LogSegmentMetadata>();
final AtomicInteger result = new AtomicInteger(-1);
final CountDownLatch latch = new CountDownLatch(1);
Stopwatch stopwatch = Stopwatch.createStarted();
asyncGetLedgerListInternal(comparator, segmentFilter, null, new GenericCallback<List<LogSegmentMetadata>>() {
@Override
public void operationComplete(int rc, List<LogSegmentMetadata> logSegmentMetadatas) {
result.set(rc);
if (KeeperException.Code.OK.intValue() == rc) {
ledgers.addAll(logSegmentMetadatas);
} else {
LOG.error("Failed to get ledger list for {} : with error {}", getFullyQualifiedName(), rc);
}
latch.countDown();
}
}, new AtomicInteger(conf.getZKNumRetries()), new AtomicLong(conf.getZKRetryBackoffStartMillis()));
try {
latch.await();
} catch (InterruptedException e) {
forceGetListStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
throw new DLInterruptedException("Interrupted on reading ledger list from zkfor " + getFullyQualifiedName(), e);
}
long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
KeeperException.Code rc = KeeperException.Code.get(result.get());
if (rc == KeeperException.Code.OK) {
forceGetListStat.registerSuccessfulEvent(elapsedMicros);
} else {
forceGetListStat.registerFailedEvent(elapsedMicros);
if (KeeperException.Code.NONODE == rc) {
throw new LogNotFoundException("Log " + getFullyQualifiedName() + " is not found");
} else {
throw new IOException("ZK Exception " + rc + " reading ledger list for " + getFullyQualifiedName());
}
}
if (throwOnEmpty && ledgers.isEmpty()) {
throw new LogEmptyException("Log " + getFullyQualifiedName() + " is empty");
}
return ledgers;
}
protected Future<List<LogSegmentMetadata>> asyncGetFullLedgerList(boolean forceFetch, boolean throwOnEmpty) {
return asyncGetLedgerList(forceFetch, true, LogSegmentMetadata.COMPARATOR, throwOnEmpty);
}
protected Future<List<LogSegmentMetadata>> asyncGetFullLedgerListDesc(boolean forceFetch, boolean throwOnEmpty) {
return asyncGetLedgerList(forceFetch, true, LogSegmentMetadata.DESC_COMPARATOR, throwOnEmpty);
}
protected Future<List<LogSegmentMetadata>> asyncGetFilteredLedgerList(boolean forceFetch, boolean throwOnEmpty) {
return asyncGetLedgerList(forceFetch, false, LogSegmentMetadata.COMPARATOR, throwOnEmpty);
}
protected Future<List<LogSegmentMetadata>> asyncGetFilteredLedgerListDesc(boolean forceFetch, boolean throwOnEmpty) {
return asyncGetLedgerList(forceFetch, false, LogSegmentMetadata.DESC_COMPARATOR, throwOnEmpty);
}
protected Future<List<LogSegmentMetadata>> asyncGetLedgerList(final boolean forceFetch,
final boolean fetchFullList,
final Comparator<LogSegmentMetadata> comparator,
final boolean throwOnEmpty) {
final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>();
final Stopwatch stopwatch = Stopwatch.createStarted();
final OpStatsLogger statsLogger = fetchFullList ? getFullListStat : getFilteredListStat;
asyncDoGetLedgerList(forceFetch, fetchFullList, comparator, throwOnEmpty)
.addEventListener(new FutureEventListener<List<LogSegmentMetadata>>() {
@Override
public void onSuccess(List<LogSegmentMetadata> value) {
statsLogger.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
promise.setValue(value);
}
@Override
public void onFailure(Throwable cause) {
statsLogger.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
promise.setException(cause);
}
});
return promise;
}
private Future<List<LogSegmentMetadata>> asyncDoGetLedgerList(final boolean forceFetch,
final boolean fetchFullList,
final Comparator<LogSegmentMetadata> comparator,
final boolean throwOnEmpty) {
if (fetchFullList) {
if (forceFetch || !isFullListFetched.get()) {
return asyncForceGetLedgerList(comparator, LogSegmentFilter.DEFAULT_FILTER, throwOnEmpty);
} else {
try {
return Future.value(getCachedLogSegments(comparator));
} catch (UnexpectedException ue) {
return Future.exception(ue);
}
}
} else {
if (forceFetch) {
return asyncForceGetLedgerList(comparator, filter, throwOnEmpty);
} else {
final Promise<List<LogSegmentMetadata>> promise =
new Promise<List<LogSegmentMetadata>>();
SyncGetLedgersCallback task = firstGetLedgersTask;
task.promise.addEventListener(new FutureEventListener<List<LogSegmentMetadata>>() {
@Override
public void onSuccess(List<LogSegmentMetadata> value) {
try {
promise.setValue(getCachedLogSegments(comparator));
} catch (UnexpectedException e) {
promise.setException(e);
}
}
@Override
public void onFailure(Throwable cause) {
promise.setException(cause);
}
});
return promise;
}
}
}
protected Future<List<LogSegmentMetadata>> asyncForceGetLedgerList(final Comparator<LogSegmentMetadata> comparator,
final LogSegmentFilter segmentFilter,
final boolean throwOnEmpty) {
final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>();
final Stopwatch stopwatch = Stopwatch.createStarted();
asyncGetLedgerListWithRetries(comparator, segmentFilter, null)
.addEventListener(new FutureEventListener<List<LogSegmentMetadata>>() {
@Override
public void onSuccess(List<LogSegmentMetadata> ledgers) {
forceGetListStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
if (ledgers.isEmpty() && throwOnEmpty) {
promise.setException(new LogEmptyException("Log " + getFullyQualifiedName() + " is empty"));
} else {
promise.setValue(ledgers);
}
}
@Override
public void onFailure(Throwable cause) {
forceGetListStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
promise.setException(cause);
}
});
return promise;
}
/**
* Add the segment <i>metadata</i> for <i>name</i> in the cache.
*
* @param name
* segment znode name.
* @param metadata
* segment metadata.
*/
protected void addLogSegmentToCache(String name, LogSegmentMetadata metadata) {
logSegmentCache.add(name, metadata);
// update the last ledger rolling time
if (!metadata.isInProgress() && (lastLedgerRollingTimeMillis < metadata.getCompletionTime())) {
lastLedgerRollingTimeMillis = metadata.getCompletionTime();
}
if (reportGetSegmentStats) {
// update stats
long ts = System.currentTimeMillis();
if (metadata.isInProgress()) {
// as we used timestamp as start tx id we could take it as start time
// NOTE: it is a hack here.
long elapsedMillis = ts - metadata.getFirstTxId();
long elapsedMicroSec = TimeUnit.MILLISECONDS.toMicros(elapsedMillis);
if (elapsedMicroSec > 0) {
if (elapsedMillis > metadataLatencyWarnThresholdMillis) {
LOG.warn("{} received inprogress log segment in {} millis: {}",
new Object[] { getFullyQualifiedName(), elapsedMillis, metadata });
}
getInprogressSegmentStat.registerSuccessfulEvent(elapsedMicroSec);
} else {
negativeGetInprogressSegmentStat.registerSuccessfulEvent(-elapsedMicroSec);
}
} else {
long elapsedMillis = ts - metadata.getCompletionTime();
long elapsedMicroSec = TimeUnit.MILLISECONDS.toMicros(elapsedMillis);
if (elapsedMicroSec > 0) {
if (elapsedMillis > metadataLatencyWarnThresholdMillis) {
LOG.warn("{} received completed log segment in {} millis : {}",
new Object[] { getFullyQualifiedName(), elapsedMillis, metadata });
}
getCompletedSegmentStat.registerSuccessfulEvent(elapsedMicroSec);
} else {
negativeGetCompletedSegmentStat.registerSuccessfulEvent(-elapsedMicroSec);
}
}
}
}
protected LogSegmentMetadata readLogSegmentFromCache(String name) {
return logSegmentCache.get(name);
}
protected LogSegmentMetadata removeLogSegmentFromCache(String name) {
return logSegmentCache.remove(name);
}
public void asyncGetLedgerList(final Comparator<LogSegmentMetadata> comparator,
Watcher watcher,
final GenericCallback<List<LogSegmentMetadata>> callback) {
asyncGetLedgerListWithRetries(comparator, filter, watcher, callback);
}
protected Future<List<LogSegmentMetadata>> asyncGetLedgerListWithRetries(Comparator<LogSegmentMetadata> comparator,
LogSegmentFilter segmentFilter,
Watcher watcher) {
final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>();
asyncGetLedgerListWithRetries(comparator, segmentFilter, watcher, new GenericCallback<List<LogSegmentMetadata>>() {
@Override
public void operationComplete(int rc, List<LogSegmentMetadata> segments) {
if (KeeperException.Code.OK.intValue() == rc) {
promise.setValue(segments);
} else if (KeeperException.Code.NONODE.intValue() == rc) {
promise.setException(new LogNotFoundException("Log " + getFullyQualifiedName() + " not found"));
} else {
String errMsg = "ZK Exception " + rc + " reading ledger list for " + getFullyQualifiedName();
promise.setException(new ZKException(errMsg, KeeperException.Code.get(rc)));
}
}
});
return promise;
}
private void asyncGetLedgerListWithRetries(final Comparator<LogSegmentMetadata> comparator,
final LogSegmentFilter segmentFilter,
final Watcher watcher,
final GenericCallback<List<LogSegmentMetadata>> finalCallback) {
asyncGetLedgerListInternal(comparator, segmentFilter, watcher, finalCallback,
new AtomicInteger(conf.getZKNumRetries()), new AtomicLong(conf.getZKRetryBackoffStartMillis()));
}
private void asyncGetLedgerListInternal(final Comparator<LogSegmentMetadata> comparator,
final LogSegmentFilter segmentFilter,
final Watcher watcher,
final GenericCallback<List<LogSegmentMetadata>> finalCallback,
final AtomicInteger numAttemptsLeft,
final AtomicLong backoffMillis) {
final Stopwatch stopwatch = Stopwatch.createStarted();
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Async getting ledger list for {}.", getFullyQualifiedName());
}
final GenericCallback<List<LogSegmentMetadata>> callback = new GenericCallback<List<LogSegmentMetadata>>() {
@Override
public void operationComplete(int rc, List<LogSegmentMetadata> result) {
long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
if (KeeperException.Code.OK.intValue() != rc) {
getListStat.registerFailedEvent(elapsedMicros);
} else {
if (LogSegmentFilter.DEFAULT_FILTER == segmentFilter) {
isFullListFetched.set(true);
}
getListStat.registerSuccessfulEvent(elapsedMicros);
}
finalCallback.operationComplete(rc, result);
}
};
zooKeeperClient.get().getChildren(logMetadata.getLogSegmentsPath(), watcher, new AsyncCallback.Children2Callback() {
@Override
public void processResult(final int rc, final String path, final Object ctx, final List<String> children, final Stat stat) {
if (KeeperException.Code.OK.intValue() != rc) {
if ((KeeperException.Code.CONNECTIONLOSS.intValue() == rc ||
KeeperException.Code.SESSIONEXPIRED.intValue() == rc ||
KeeperException.Code.SESSIONMOVED.intValue() == rc) &&
numAttemptsLeft.decrementAndGet() > 0) {
long backoffMs = backoffMillis.get();
backoffMillis.set(Math.min(conf.getZKRetryBackoffMaxMillis(), 2 * backoffMs));
scheduler.schedule(new Runnable() {
@Override
public void run() {
asyncGetLedgerListInternal(comparator, segmentFilter, watcher,
finalCallback, numAttemptsLeft, backoffMillis);
}
}, backoffMs, TimeUnit.MILLISECONDS);
return;
}
callback.operationComplete(rc, null);
return;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Got ledger list from {} : {}", logMetadata.getLogSegmentsPath(), children);
}
ledgerListWatchSet.set(true);
Set<String> segmentsReceived = new HashSet<String>();
segmentsReceived.addAll(segmentFilter.filter(children));
Set<String> segmentsAdded;
final Set<String> removedSegments = Collections.synchronizedSet(new HashSet<String>());
final Map<String, LogSegmentMetadata> addedSegments =
Collections.synchronizedMap(new HashMap<String, LogSegmentMetadata>());
Pair<Set<String>, Set<String>> segmentChanges = logSegmentCache.diff(segmentsReceived);
segmentsAdded = segmentChanges.getLeft();
removedSegments.addAll(segmentChanges.getRight());
if (segmentsAdded.isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("No segments added for {}.", getFullyQualifiedName());
}
// update the cache before fetch
logSegmentCache.update(removedSegments, addedSegments);
List<LogSegmentMetadata> segmentList;
try {
segmentList = getCachedLogSegments(comparator);
} catch (UnexpectedException e) {
callback.operationComplete(KeeperException.Code.DATAINCONSISTENCY.intValue(), null);
return;
}
callback.operationComplete(KeeperException.Code.OK.intValue(), segmentList);
notifyUpdatedLogSegments(segmentList);
if (!removedSegments.isEmpty()) {
notifyOnOperationComplete();
}
return;
}
final AtomicInteger numChildren = new AtomicInteger(segmentsAdded.size());
final AtomicInteger numFailures = new AtomicInteger(0);
for (final String segment: segmentsAdded) {
metadataStore.getLogSegment(logMetadata.getLogSegmentPath(segment))
.addEventListener(new FutureEventListener<LogSegmentMetadata>() {
@Override
public void onSuccess(LogSegmentMetadata result) {
addedSegments.put(segment, result);
complete();
}
@Override
public void onFailure(Throwable cause) {
// NONODE exception is possible in two cases
// 1. A log segment was deleted by truncation between the call to getChildren and read
// attempt on the znode corresponding to the segment
// 2. In progress segment has been completed => inprogress ZNode does not exist
if (cause instanceof KeeperException &&
KeeperException.Code.NONODE == ((KeeperException) cause).code()) {
removedSegments.add(segment);
complete();
} else {
// fail fast
if (1 == numFailures.incrementAndGet()) {
int rcToReturn = KeeperException.Code.SYSTEMERROR.intValue();
if (cause instanceof KeeperException) {
rcToReturn = ((KeeperException) cause).code().intValue();
} else if (cause instanceof ZKException) {
rcToReturn = ((ZKException) cause).getKeeperExceptionCode().intValue();
}
// :( properly we need dlog related response code.
callback.operationComplete(rcToReturn, null);
return;
}
}
}
private void complete() {
if (0 == numChildren.decrementAndGet() && numFailures.get() == 0) {
// update the cache only when fetch completed
logSegmentCache.update(removedSegments, addedSegments);
List<LogSegmentMetadata> segmentList;
try {
segmentList = getCachedLogSegments(comparator);
} catch (UnexpectedException e) {
callback.operationComplete(KeeperException.Code.DATAINCONSISTENCY.intValue(), null);
return;
}
callback.operationComplete(KeeperException.Code.OK.intValue(), segmentList);
notifyUpdatedLogSegments(segmentList);
notifyOnOperationComplete();
}
}
});
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
getListStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
finalCallback.operationComplete(KeeperException.Code.CONNECTIONLOSS.intValue(), null);
} catch (InterruptedException e) {
getListStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
finalCallback.operationComplete(KeeperException.Code.CONNECTIONLOSS.intValue(), null);
}
}
@Override
public void process(WatchedEvent event) {
if (Watcher.Event.EventType.None.equals(event.getType())) {
if (event.getState() == Watcher.Event.KeeperState.Expired) {
// if the watcher is expired
scheduler.schedule(new WatcherGetLedgersCallback(getFullyQualifiedName()),
conf.getZKRetryBackoffStartMillis(), TimeUnit.MILLISECONDS);
}
} else if (Watcher.Event.EventType.NodeChildrenChanged.equals(event.getType())) {
if (LOG.isTraceEnabled()) {
LOG.trace("LogSegments Changed under {}.", getFullyQualifiedName());
}
asyncGetLedgerListWithRetries(LogSegmentMetadata.COMPARATOR, filter,
getChildrenWatcher, new WatcherGetLedgersCallback(getFullyQualifiedName()));
}
}
void notifyOnOperationComplete() {
if (null != notification) {
notification.notifyOnOperationComplete();
}
}
// ZooKeeper Watchers
Watcher registerExpirationHandler(final ZooKeeperClient.ZooKeeperSessionExpireNotifier onExpired) {
if (conf.getZKNumRetries() > 0) {
return new Watcher() {
@Override
public void process(WatchedEvent event) {
// nop
}
};
}
return zooKeeperClient.registerExpirationHandler(onExpired);
}
boolean unregister(Watcher watcher) {
return zooKeeperClient.unregister(watcher);
}
}