blob: a492f51523b14281455aaea8a7694d1a748411bb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.StreamNotReadyException;
import org.apache.distributedlog.exceptions.WriteCancelledException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.feature.CoreFeatureKeys;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* BookKeeper based {@link AsyncLogWriter} implementation.
*
* <h3>Metrics</h3>
* All the metrics are exposed under `log_writer`.
* <ul>
* <li> `log_writer/write`: opstats. latency characteristics about the time that write operations spent.
* this latency can be high when write operations are pending in the queue for a long time due to
* log segment rolling.
* <li> `log_writer/bulk_write`: opstats. latency characteristics about the time that bulk_write
* operations spent.
* <li> `log_writer/get_writer`: opstats. the time spent on getting the writer. it could spike when a
* log segment rolling happens while getting the writer. it is a good stat to look into when the latency
* is caused by queuing time.
* <li> `log_writer/pending_request_dispatch`: counter. the number of queued operations that are dispatched
* after the log segment is rolled. it is a metric measuring how many operations have been queued because of
* log segment rolling.
* </ul>
* See {@link BKLogSegmentWriter} for segment writer stats.
*/
class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
// Class-wide logger.
static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogWriter.class);
// Converts the segment list produced by a truncation into the boolean reported by truncate();
// it ignores the list and always yields true.
static Function<List<LogSegmentMetadata>, Boolean> truncationResultConverter =
segments -> true;
/**
 * A write request that is parked while a log segment roll is in progress.
 *
 * <p>The instance doubles as the completion listener of the eventual segment write: it relays
 * the outcome to {@link #promise}, and a failure additionally marks the enclosing writer as
 * having encountered an error.
 */
class PendingLogRecord implements FutureEventListener<DLSN> {

    final LogRecord record;
    final CompletableFuture<DLSN> promise;
    final boolean flush;

    PendingLogRecord(LogRecord record, boolean flush) {
        this.promise = new CompletableFuture<>();
        this.record = record;
        this.flush = flush;
    }

    /** Hand the assigned DLSN to whoever is waiting on the promise. */
    @Override
    public void onSuccess(DLSN value) {
        this.promise.complete(value);
    }

    /** Fail the promise first, then flag the writer so that later writes are rejected. */
    @Override
    public void onFailure(Throwable cause) {
        this.promise.completeExceptionally(cause);
        encounteredError = true;
    }
}
/**
 * The final record written to the current log segment. Once it is satisfied, the log segment
 * is rolled and the queued requests are issued; if it fails, the queued requests and the
 * writer are errored out.
 *
 * <p>This implementation relies on the assumption that all futures are satisfied in order by
 * the same ordered future pool.
 */
class LastPendingLogRecord extends PendingLogRecord {

    LastPendingLogRecord(LogRecord record, boolean flush) {
        super(record, flush);
    }

    @Override
    public void onSuccess(DLSN value) {
        // satisfy the caller first ...
        super.onSuccess(value);
        // ... then roll the log segment and replay everything that was queued meanwhile.
        final long firstTxIdOfNextSegment = this.record.getTransactionId();
        rollLogSegmentAndIssuePendingRequests(firstTxIdOfNextSegment);
    }

    @Override
    public void onFailure(Throwable cause) {
        // fail the caller and flag the writer ...
        super.onFailure(cause);
        // ... then fail everything that was queued behind this record.
        errorOutPendingRequestsAndWriter(cause);
    }
}
// When true, a write that arrives while a roll is in progress fails with
// StreamNotReadyException instead of being queued.
private final boolean streamFailFast;
// When false, a cached segment writer that is in error is treated as absent so that a new
// segment gets rolled.
private final boolean disableRollOnSegmentError;
// Requests queued while a roll is in progress; null when no roll is in progress.
private LinkedList<PendingLogRecord> pendingRequests = null;
// Set once a write fails or the writer is errored out; checked before every write.
private volatile boolean encounteredError = false;
// Non-null while a roll is in progress; completed with the new segment writer (or exceptionally).
private CompletableFuture<BKLogSegmentWriter> rollingFuture = null;
// Largest transaction id recorded via setLastTxId.
private long lastTxId = DistributedLogConstants.INVALID_TXID;
// Stats, all scoped under "log_writer".
private final StatsLogger statsLogger;
private final OpStatsLogger writeOpStatsLogger;
private final OpStatsLogger markEndOfStreamOpStatsLogger;
private final OpStatsLogger bulkWriteOpStatsLogger;
private final OpStatsLogger getWriterOpStatsLogger;
// Counts queued requests that were dispatched (or failed) once queueing ended.
private final Counter pendingRequestDispatch;
// Feature switch; while available, a non-null segment writer is never rolled by shouldRollLog.
private final Feature disableLogSegmentRollingFeature;
/**
 * Create an async log writer on top of the given write handler.
 *
 * @param conf static configuration; supplies the fail-fast and roll-on-error settings
 * @param dynConf dynamic configuration, forwarded to the base writer
 * @param bkdlm log manager this writer belongs to
 * @param writeHandler write handler used by this writer (the writer owns the handler)
 * @param featureProvider source of the "disable log segment rolling" feature switch
 * @param dlmStatsLogger parent stats logger; metrics are exposed under its `log_writer` scope
 */
BKAsyncLogWriter(DistributedLogConfiguration conf,
                 DynamicDistributedLogConfiguration dynConf,
                 BKDistributedLogManager bkdlm,
                 BKLogWriteHandler writeHandler, /* log writer owns the handler */
                 FeatureProvider featureProvider,
                 StatsLogger dlmStatsLogger) {
    super(conf, dynConf, bkdlm);
    this.writeHandler = writeHandler;
    this.streamFailFast = conf.getFailFastOnStreamNotReady();
    this.disableRollOnSegmentError = conf.getDisableRollingOnLogSegmentError();
    // features
    // Lower-case with Locale.ROOT: the default-locale variant would turn 'I' into a dotless
    // 'i' under e.g. a Turkish locale and produce a feature key that never matches.
    disableLogSegmentRollingFeature = featureProvider
        .getFeature(CoreFeatureKeys.DISABLE_LOGSEGMENT_ROLLING.name().toLowerCase(Locale.ROOT));
    // stats
    this.statsLogger = dlmStatsLogger.scope("log_writer");
    this.writeOpStatsLogger = statsLogger.getOpStatsLogger("write");
    this.markEndOfStreamOpStatsLogger = statsLogger.getOpStatsLogger("mark_end_of_stream");
    this.bulkWriteOpStatsLogger = statsLogger.getOpStatsLogger("bulk_write");
    this.getWriterOpStatsLogger = statsLogger.getOpStatsLogger("get_writer");
    this.pendingRequestDispatch = statsLogger.getCounter("pending_request_dispatch");
}
/**
 * Advance the last transaction id; the stored value never moves backwards.
 *
 * @param txId candidate transaction id
 */
@VisibleForTesting
synchronized void setLastTxId(long txId) {
    if (txId > lastTxId) {
        lastTxId = txId;
    }
}
/**
 * Return the largest transaction id recorded so far via {@code setLastTxId}, or
 * {@code DistributedLogConstants.INVALID_TXID} if none has been recorded.
 */
@Override
public synchronized long getLastTxId() {
return lastTxId;
}
/**
 * Mark the given record as a control record and write it.
 * The method will be used by Monitor Service to enforce a new inprogress segment.
 *
 * @param record
 *          log record; it is flagged as a control record in place
 * @return future of the write
 */
public CompletableFuture<DLSN> writeControlRecord(final LogRecord record) {
    record.setControl();
    final CompletableFuture<DLSN> writeFuture = write(record);
    return writeFuture;
}
/**
 * Look up the cached segment writer.
 *
 * @return the cached writer, or null when there is none or when it is in error and rolling
 *         on segment error is not disabled
 * @throws WriteException if this writer has already encountered an error
 */
private BKLogSegmentWriter getCachedLogSegmentWriter() throws WriteException {
    if (encounteredError) {
        throw new WriteException(bkDistributedLogManager.getStreamName(),
            "writer has been closed due to error.");
    }
    final BKLogSegmentWriter cached = getCachedLogWriter();
    if (null == cached) {
        return null;
    }
    // A broken segment writer is hidden from the caller unless rolling on error is disabled.
    if (cached.isLogSegmentInError() && !disableRollOnSegmentError) {
        return null;
    }
    return cached;
}
/**
 * Timed wrapper around {@code doGetLogSegmentWriter}; the elapsed time is reported to the
 * `get_writer` stat.
 */
private CompletableFuture<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
                                                                  boolean bestEffort,
                                                                  boolean rollLog,
                                                                  boolean allowMaxTxID) {
    final Stopwatch timer = Stopwatch.createStarted();
    final CompletableFuture<BKLogSegmentWriter> writerFuture =
        doGetLogSegmentWriter(firstTxid, bestEffort, rollLog, allowMaxTxID);
    return FutureUtils.stats(writerFuture, getWriterOpStatsLogger, timer);
}
/**
 * Resolve the segment writer to use for a write starting at {@code firstTxid}.
 *
 * <p>Fails immediately if the writer has encountered an error. If no ledger writer is
 * available a roll is attempted from scratch; if one is available it is either returned as is
 * or, when {@code rollLog} is set, passed through the roll-if-necessary check first.
 */
private CompletableFuture<BKLogSegmentWriter> doGetLogSegmentWriter(final long firstTxid,
                                                                    final boolean bestEffort,
                                                                    final boolean rollLog,
                                                                    final boolean allowMaxTxID) {
    if (encounteredError) {
        return FutureUtils.exception(new WriteException(bkDistributedLogManager.getStreamName(),
            "writer has been closed due to error."));
    }
    final CompletableFuture<BKLogSegmentWriter> currentWriter =
        asyncGetLedgerWriter(!disableRollOnSegmentError);
    if (null == currentWriter) {
        return rollLogSegmentIfNecessary(null, firstTxid, bestEffort, allowMaxTxID);
    }
    if (!rollLog) {
        return currentWriter;
    }
    return currentWriter.thenCompose(
        segmentWriter -> rollLogSegmentIfNecessary(segmentWriter, firstTxid, bestEffort, allowMaxTxID));
}
/**
 * Get the segment writer used to mark the end of the stream.
 *
 * <p>The end of stream marker is written as a record with MAX_TXID, so this is the only case
 * where using the max txid is allowed when rolling.
 */
private CompletableFuture<BKLogSegmentWriter> getLogSegmentWriterForEndOfStream() {
    final boolean bestEffort = false;
    final boolean rollLog = false;
    final boolean allowMaxTxId = true;
    return getLogSegmentWriter(DistributedLogConstants.MAX_TXID, bestEffort, rollLog, allowMaxTxId);
}
/**
 * Convenience overload of the four-argument {@code getLogSegmentWriter} that never allows the
 * max txid.
 */
private CompletableFuture<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
boolean bestEffort,
boolean rollLog) {
return getLogSegmentWriter(firstTxid, bestEffort, rollLog, false /* allow max txid */);
}
/**
 * Append a request to the pending queue.
 *
 * <p>The queue must already exist, i.e. {@code startQueueingRequests} has been called.
 *
 * @param record record to write once queueing ends
 * @param flush flush flag to use for that write
 * @return promise satisfied when the queued write completes or is errored out
 */
CompletableFuture<DLSN> queueRequest(LogRecord record, boolean flush) {
    final PendingLogRecord queued = new PendingLogRecord(record, flush);
    pendingRequests.add(queued);
    return queued.promise;
}
/**
 * Decide whether a new log segment should be started before the next write.
 *
 * @param w current segment writer, may be null
 * @return true when there is no writer, or when rolling is not disabled by the feature switch
 *         and the base writer asks for a new segment; false otherwise, including when the
 *         check itself fails
 */
boolean shouldRollLog(BKLogSegmentWriter w) {
    try {
        return null == w
            || (!disableLogSegmentRollingFeature.isAvailable()
                && shouldStartNewSegment(w));
    } catch (IOException ioe) {
        // Best effort: keep writing to the current segment, but do not hide the failure.
        LOG.warn("Failed to check whether to roll log segment for {}; continuing with current segment",
            getStreamName(), ioe);
        return false;
    }
}
/**
 * Enter queueing mode: create the rolling future and an empty pending queue.
 * Must only be called when no roll is already in progress.
 */
void startQueueingRequests() {
    assert null == pendingRequests && null == rollingFuture;
    rollingFuture = new CompletableFuture<>();
    pendingRequests = new LinkedList<>();
}
/**
 * Write a single record while holding the writer lock.
 *
 * <p>For ordering guarantee, requests are not sent to the next log segment until the previous
 * log segment is done. Three paths exist:
 * <ul>
 * <li>a roll is already in progress: fail fast with StreamNotReadyException or queue the record;
 * <li>a roll is needed: start queueing, then either write this record as the last record of
 * the current segment (the roll is triggered from its callback) or, when there is no segment
 * yet, queue it and trigger the roll right away;
 * <li>otherwise: write directly to the cached segment writer.
 * </ul>
 *
 * @param record record to write
 * @param flush flush flag forwarded to the segment writer (the last record of a segment is
 *        always written with flush set)
 * @return future of the record's DLSN; lastTxId is updated before it is satisfied
 */
private synchronized CompletableFuture<DLSN> asyncWrite(final LogRecord record,
boolean flush) {
// The writer is always looked up here, under the lock, so that a writer cached by a
// recent roll is the one that gets used.
CompletableFuture<DLSN> result = null;
BKLogSegmentWriter w;
try {
w = getCachedLogSegmentWriter();
} catch (WriteException we) {
// the writer already encountered an error: reject the write
return FutureUtils.exception(we);
}
if (null != rollingFuture) {
// a roll is in progress
if (streamFailFast) {
result = FutureUtils.exception(new StreamNotReadyException("Rolling log segment"));
} else {
result = queueRequest(record, flush);
}
} else if (shouldRollLog(w)) {
// insert a last record, so when it called back, we will trigger a log segment rolling
startQueueingRequests();
if (null != w) {
LastPendingLogRecord lastLogRecordInCurrentSegment = new LastPendingLogRecord(record, flush);
w.asyncWrite(record, true).whenComplete(lastLogRecordInCurrentSegment);
result = lastLogRecordInCurrentSegment.promise;
} else { // no log segment yet. roll the log segment and issue pending requests.
result = queueRequest(record, flush);
rollLogSegmentAndIssuePendingRequests(record.getTransactionId());
}
} else {
// w is non-null here: shouldRollLog returns true for a null writer
result = w.asyncWrite(record, flush);
}
// use thenApply here rather than a completion listener because we want lastTxId to be
// updated before satisfying the returned future
return result.thenApply(dlsn -> {
setLastTxId(record.getTransactionId());
return dlsn;
});
}
/**
 * Write the records one by one, flushing only on the last one.
 *
 * <p>Writing stops at the first record whose future is already failed; every record after it
 * gets a cancelled future so that the result always has one entry per input record.
 *
 * @param records records to write, in order
 * @return one future per record, in the same order
 */
private List<CompletableFuture<DLSN>> asyncWriteBulk(List<LogRecord> records) {
    final ArrayList<CompletableFuture<DLSN>> futures = new ArrayList<>(records.size());
    for (Iterator<LogRecord> it = records.iterator(); it.hasNext(); ) {
        final LogRecord current = it.next();
        final boolean isLastRecord = !it.hasNext();
        final CompletableFuture<DLSN> writeFuture = asyncWrite(current, isLastRecord);
        futures.add(writeFuture);
        // Abort early if an individual write has already failed
        // (completed exceptionally implies done).
        if (writeFuture.isCompletedExceptionally()) {
            break;
        }
    }
    final int notAttempted = records.size() - futures.size();
    if (notAttempted > 0) {
        appendCancelledFutures(futures, notAttempted);
    }
    return futures;
}
/**
 * Pad {@code futures} with futures failed by a single shared WriteCancelledException.
 *
 * @param futures list to append to
 * @param numToAdd number of cancelled futures to append
 */
private void appendCancelledFutures(List<CompletableFuture<DLSN>> futures, int numToAdd) {
    final WriteCancelledException cancellation = new WriteCancelledException(getStreamName());
    int remaining = numToAdd;
    while (remaining > 0) {
        final CompletableFuture<DLSN> cancelled = FutureUtils.exception(cancellation);
        futures.add(cancelled);
        remaining--;
    }
}
/**
 * Obtain the segment writer for {@code firstTxId} (best effort, rolling allowed) and, once it
 * is available, replay all queued requests onto it and leave queueing mode.
 *
 * <p>Any failure, either in obtaining the writer or an IOException raised while issuing the
 * queued requests, errors out the pending requests and the writer.
 *
 * @param firstTxId transaction id passed to the writer lookup; also used for the control
 *        record written when nothing was queued
 */
private void rollLogSegmentAndIssuePendingRequests(final long firstTxId) {
getLogSegmentWriter(firstTxId, true, true)
.whenComplete(new FutureEventListener<BKLogSegmentWriter>() {
@Override
public void onSuccess(BKLogSegmentWriter writer) {
try {
// hold the writer lock so that no new request is queued while draining
synchronized (BKAsyncLogWriter.this) {
for (PendingLogRecord pendingLogRecord : pendingRequests) {
FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_LogWriterIssuePending);
writer.asyncWrite(pendingLogRecord.record, pendingLogRecord.flush)
.whenComplete(pendingLogRecord);
}
// if there are no records in the pending queue, let's write a control record
// so that when a new log segment is rolled, a control record will be added and
// the corresponding bookies would be able to create its ledger.
if (pendingRequests.isEmpty()) {
LogRecord controlRecord = new LogRecord(firstTxId,
DistributedLogConstants.CONTROL_RECORD_CONTENT);
controlRecord.setControl();
PendingLogRecord controlReq = new PendingLogRecord(controlRecord, false);
writer.asyncWrite(controlReq.record, controlReq.flush)
.whenComplete(controlReq);
}
// hand the new writer to anyone waiting on the roll, then leave queueing mode
if (null != rollingFuture) {
FutureUtils.complete(rollingFuture, writer);
}
rollingFuture = null;
pendingRequestDispatch.add(pendingRequests.size());
pendingRequests = null;
}
} catch (IOException ioe) {
errorOutPendingRequestsAndWriter(ioe);
}
}
@Override
public void onFailure(Throwable cause) {
errorOutPendingRequestsAndWriter(cause);
}
});
}
/**
 * Fail the in-flight roll and every queued request, and leave queueing mode.
 *
 * <p>Calling this when nothing is queued is safe: the error flag and the rolling future are
 * still updated, and there are simply no requests to fail.
 *
 * @param cause failure propagated to the rolling future and to each pending request
 * @param errorOutWriter value assigned to the writer's error flag; when true, subsequent
 *        writes are rejected
 */
@VisibleForTesting
void errorOutPendingRequests(Throwable cause, boolean errorOutWriter) {
    final List<PendingLogRecord> pendingRequestsSnapshot;
    synchronized (this) {
        pendingRequestsSnapshot = pendingRequests;
        encounteredError = errorOutWriter;
        pendingRequests = null;
        if (null != rollingFuture) {
            FutureUtils.completeExceptionally(rollingFuture, cause);
        }
        rollingFuture = null;
    }
    // No queue exists when no roll was in progress or when an earlier call already drained
    // it; without this guard the code below would throw a NullPointerException.
    if (null == pendingRequestsSnapshot) {
        return;
    }
    pendingRequestDispatch.add(pendingRequestsSnapshot.size());
    // After erroring out the writer above, no more requests
    // will be enqueued to pendingRequests
    for (PendingLogRecord pendingLogRecord : pendingRequestsSnapshot) {
        pendingLogRecord.promise.completeExceptionally(cause);
    }
}
/**
 * Fail all pending requests with {@code cause} and mark the writer as errored.
 *
 * @param cause failure to propagate
 */
void errorOutPendingRequestsAndWriter(Throwable cause) {
errorOutPendingRequests(cause, true /* error out writer */);
}
/**
 * Write a log record to the stream, flushing it, and record the latency under the `write`
 * stat.
 *
 * @param record single log record
 * @return future of the DLSN assigned to the record
 */
@Override
public CompletableFuture<DLSN> write(final LogRecord record) {
    final Stopwatch timer = Stopwatch.createStarted();
    final CompletableFuture<DLSN> writeFuture = asyncWrite(record, true);
    return FutureUtils.stats(writeFuture, writeOpStatsLogger, timer);
}
/**
 * Write many log records to the stream.
 *
 * <p>The per-record futures are produced synchronously by {@code asyncWriteBulk}; the outer
 * future only wraps that list (via {@code FutureUtils.value}) to satisfy the interface's
 * return type, and is what the `bulk_write` stat is measured on.
 *
 * @param records list of records
 * @return future of the list holding one write future per record
 */
@Override
public CompletableFuture<List<CompletableFuture<DLSN>>> writeBulk(final List<LogRecord> records) {
    final Stopwatch timer = Stopwatch.createStarted();
    final List<CompletableFuture<DLSN>> perRecordFutures = asyncWriteBulk(records);
    final CompletableFuture<List<CompletableFuture<DLSN>>> wrapped = FutureUtils.value(perRecordFutures);
    return FutureUtils.stats(wrapped, bulkWriteOpStatsLogger, timer);
}
/**
 * Mark the log segments older than {@code dlsn} as truncated.
 *
 * @param dlsn position to truncate up to
 * @return future of false when {@code dlsn} equals the invalid DLSN; a failed future when the
 *         write handler cannot be obtained; otherwise the truncation outcome mapped through
 *         {@code truncationResultConverter}
 */
@Override
public CompletableFuture<Boolean> truncate(final DLSN dlsn) {
    // Compare by value: an invalid DLSN that is not the shared InvalidDLSN instance
    // (e.g. one that was deserialized) must be rejected as well.
    if (DLSN.InvalidDLSN.equals(dlsn)) {
        return FutureUtils.value(false);
    }
    // Local name chosen so that it does not shadow the writeHandler field.
    final BKLogWriteHandler handler;
    try {
        handler = getWriteHandler();
    } catch (IOException e) {
        return FutureUtils.exception(e);
    }
    return handler.setLogSegmentsOlderThanDLSNTruncated(dlsn).thenApply(truncationResultConverter);
}
/**
 * Flush and commit on the segment writer currently in use.
 *
 * <p>If a roll is in progress the operation is chained onto the rolling future, otherwise onto
 * the cached writer future. When neither exists the last transaction id is returned directly.
 *
 * @return future of the value returned by the segment writer's flushAndCommit, or of the last
 *         transaction id when there is no writer
 */
CompletableFuture<Long> flushAndCommit() {
    final CompletableFuture<BKLogSegmentWriter> targetWriter;
    synchronized (this) {
        targetWriter = (null != this.rollingFuture) ? this.rollingFuture : getCachedLogWriterFuture();
    }
    if (null != targetWriter) {
        return targetWriter.thenCompose(segmentWriter -> segmentWriter.flushAndCommit());
    }
    return FutureUtils.value(getLastTxId());
}
/**
 * Mark the end of the stream on the segment writer and record the latency under the
 * `mark_end_of_stream` stat.
 *
 * <p>If a roll is in progress the operation waits for the rolled writer; otherwise a writer
 * is obtained specifically for the end-of-stream marker.
 */
@Override
public CompletableFuture<Long> markEndOfStream() {
    final Stopwatch timer = Stopwatch.createStarted();
    CompletableFuture<BKLogSegmentWriter> writerFuture;
    synchronized (this) {
        writerFuture = this.rollingFuture;
    }
    if (null == writerFuture) {
        writerFuture = getLogSegmentWriterForEndOfStream();
    }
    final CompletableFuture<Long> marked =
        writerFuture.thenCompose(segmentWriter -> segmentWriter.markEndOfStream());
    return FutureUtils.stats(marked, markEndOfStreamOpStatsLogger, timer);
}
/**
 * Close and complete the writer, waiting for an in-progress roll to finish first.
 */
@Override
protected CompletableFuture<Void> asyncCloseAndComplete() {
    final CompletableFuture<BKLogSegmentWriter> inFlightRoll;
    synchronized (this) {
        inFlightRoll = this.rollingFuture;
    }
    if (null != inFlightRoll) {
        // the rolled writer itself is not needed here; only its completion matters
        return inFlightRoll.thenCompose(ignored -> super.asyncCloseAndComplete());
    }
    return super.asyncCloseAndComplete();
}
/**
 * Synchronous variant of {@code asyncCloseAndComplete}: the async result is resolved through
 * {@code Utils.ioResult}.
 *
 * @throws IOException if closing and completing fails
 */
@Override
void closeAndComplete() throws IOException {
Utils.ioResult(asyncCloseAndComplete());
}
/**
 * *TEMP HACK*
 * Get the name of the stream this writer writes data to.
 *
 * @return the stream name reported by the owning log manager
 */
@Override
public String getStreamName() {
return bkDistributedLogManager.getStreamName();
}
/**
 * Abort the writer and fail every request that is still queued behind a log segment roll.
 *
 * <p>The abort of the base writer is started first; the queued requests are then failed with
 * a WriteException while holding the writer lock.
 *
 * @return the future returned by the base writer's abort
 */
@Override
public CompletableFuture<Void> asyncAbort() {
    CompletableFuture<Void> result = super.asyncAbort();
    synchronized (this) {
        if (pendingRequests != null) {
            for (PendingLogRecord pendingLogRecord : pendingRequests) {
                pendingLogRecord.promise
                    .completeExceptionally(new WriteException(bkDistributedLogManager.getStreamName(),
                        "abort writing: writer has been closed due to error."));
            }
        }
    }
    return result;
}
/** Describe this writer as {@code AsyncLogWriter:<stream name>}. */
@Override
public String toString() {
    return "AsyncLogWriter:" + getStreamName();
}
}