| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.distributedlog; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionStage; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| import org.apache.distributedlog.common.concurrent.FutureEventListener; |
| import org.apache.distributedlog.common.concurrent.FutureUtils; |
| import org.apache.distributedlog.common.util.PermitManager; |
| import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; |
| import org.apache.distributedlog.exceptions.AlreadyClosedException; |
| import org.apache.distributedlog.exceptions.LockingException; |
| import org.apache.distributedlog.exceptions.UnexpectedException; |
| import org.apache.distributedlog.exceptions.ZKException; |
| import org.apache.distributedlog.io.Abortable; |
| import org.apache.distributedlog.io.Abortables; |
| import org.apache.distributedlog.io.AsyncAbortable; |
| import org.apache.distributedlog.io.AsyncCloseable; |
| import org.apache.distributedlog.util.Utils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortable, AsyncAbortable { |
| static final Logger LOG = LoggerFactory.getLogger(BKAbstractLogWriter.class); |
| |
| protected final DistributedLogConfiguration conf; |
| private final DynamicDistributedLogConfiguration dynConf; /* supplies the retention period used when truncating */ |
| protected final BKDistributedLogManager bkDistributedLogManager; |
| |
| // States: close/abort bookkeeping plus the flags driven by the @VisibleForTesting setters |
| private CompletableFuture<Void> closePromise = null; /* assigned by closeNoThrow()/asyncAbort(); non-null means close or abort has started */ |
| private volatile boolean forceRolling = false; /* when true, the roll checks always ask for a new log segment */ |
| private boolean forceRecovery = false; /* when true, asyncGetLedgerWriter(true) closes and completes the cached segment */ |
| |
| // Truncation Related: state used by truncateLogSegmentsIfNecessary() and cancelTruncation() |
| private CompletableFuture<List<LogSegmentMetadata>> lastTruncationAttempt = null; /* result of the most recent purge request, if any */ |
| @VisibleForTesting |
| private Long minTimestampToKeepOverride = null; /* when non-null, replaces the retention-derived cut-off timestamp */ |
| |
| // Log Segment Writers: cached writers and the handler used to create and complete them |
| protected BKLogSegmentWriter segmentWriter = null; /* writer currently cached by cacheLogWriter() */ |
| protected CompletableFuture<BKLogSegmentWriter> segmentWriterFuture = null; /* FutureUtils.value(...) of segmentWriter, set together with it */ |
| protected BKLogSegmentWriter allocatedSegmentWriter = null; /* writer allocated for the next segment, held until the old one is completed */ |
| protected BKLogWriteHandler writeHandler = null; /* created on demand and cached by createAndCacheWriteHandler() */ |
| |
| /** |
|  * Creates a writer bound to the given log manager and logs the initial retention period. |
|  * |
|  * @param conf static configuration kept for subclasses |
|  * @param dynConf dynamic configuration; its retention period (in hours) is read here and on truncation |
|  * @param bkdlm log manager that owns the stream this writer appends to |
|  */ |
| BKAbstractLogWriter(DistributedLogConfiguration conf, |
|                     DynamicDistributedLogConfiguration dynConf, |
|                     BKDistributedLogManager bkdlm) { |
|     this.bkDistributedLogManager = bkdlm; |
|     this.dynConf = dynConf; |
|     this.conf = conf; |
|     // Retention is configured in hours; it is reported in milliseconds. |
|     LOG.debug("Initial retention period for {} : {}", |
|             bkdlm.getStreamName(), |
|             TimeUnit.HOURS.toMillis(dynConf.getRetentionPeriodHours())); |
| } |
| |
| // manage write handler |
| |
| /** |
|  * Returns the write handler cached on this writer, read under this writer's monitor. |
|  * |
|  * @return the cached handler, or {@code null} if none has been cached yet |
|  */ |
| protected synchronized BKLogWriteHandler getCachedWriteHandler() { |
|     return this.writeHandler; |
| } |
| |
| protected BKLogWriteHandler getWriteHandler() throws IOException { |
| BKLogWriteHandler writeHandler = createAndCacheWriteHandler(); |
| writeHandler.checkMetadataException(); |
| return writeHandler; |
| } |
| |
| protected BKLogWriteHandler createAndCacheWriteHandler() |
| throws IOException { |
| synchronized (this) { |
| if (writeHandler != null) { |
| return writeHandler; |
| } |
| } |
| // This code path will be executed when the handler is not set or has been closed |
| // due to forceRecovery during testing |
| BKLogWriteHandler newHandler = |
| Utils.ioResult(bkDistributedLogManager.asyncCreateWriteHandler(false)); |
| boolean success = false; |
| try { |
| synchronized (this) { |
| if (writeHandler == null) { |
| writeHandler = newHandler; |
| success = true; |
| } |
| return writeHandler; |
| } |
| } finally { |
| if (!success) { |
| newHandler.asyncAbort(); |
| } |
| } |
| } |
| |
| // manage log segment writers |
| |
| /** |
|  * Returns the log segment writer currently cached on this writer. |
|  * |
|  * @return the cached segment writer, or {@code null} if none is cached |
|  */ |
| protected synchronized BKLogSegmentWriter getCachedLogWriter() { |
|     return this.segmentWriter; |
| } |
| |
| /** |
|  * Returns the future stored alongside the cached log segment writer. |
|  * |
|  * @return the cached writer future, or {@code null} if no writer is cached |
|  */ |
| protected synchronized CompletableFuture<BKLogSegmentWriter> getCachedLogWriterFuture() { |
|     return this.segmentWriterFuture; |
| } |
| |
| /** |
|  * Caches the given segment writer together with a future produced by |
|  * {@code FutureUtils.value(logWriter)}. |
|  * |
|  * @param logWriter the segment writer to cache |
|  */ |
| protected synchronized void cacheLogWriter(BKLogSegmentWriter logWriter) { |
|     segmentWriter = logWriter; |
|     segmentWriterFuture = FutureUtils.value(logWriter); |
| } |
| |
| protected synchronized BKLogSegmentWriter removeCachedLogWriter() { |
| try { |
| return segmentWriter; |
| } finally { |
| segmentWriter = null; |
| segmentWriterFuture = null; |
| } |
| } |
| |
| /** |
|  * Returns the segment writer that has been allocated but not yet switched to. |
|  * |
|  * @return the allocated segment writer, or {@code null} if none is held |
|  */ |
| protected synchronized BKLogSegmentWriter getAllocatedLogWriter() { |
|     return this.allocatedSegmentWriter; |
| } |
| |
| /** |
|  * Remembers a newly allocated segment writer until the old segment has been completed. |
|  * |
|  * @param logWriter the allocated segment writer |
|  */ |
| protected synchronized void cacheAllocatedLogWriter(BKLogSegmentWriter logWriter) { |
|     allocatedSegmentWriter = logWriter; |
| } |
| |
| protected synchronized BKLogSegmentWriter removeAllocatedLogWriter() { |
| try { |
| return allocatedSegmentWriter; |
| } finally { |
| allocatedSegmentWriter = null; |
| } |
| } |
| |
| private CompletableFuture<Void> asyncCloseAndComplete(boolean shouldThrow) { |
| BKLogSegmentWriter segmentWriter = getCachedLogWriter(); |
| BKLogWriteHandler writeHandler = getCachedWriteHandler(); |
| if (null != segmentWriter && null != writeHandler) { |
| cancelTruncation(); |
| CompletableFuture<Void> completePromise = new CompletableFuture<Void>(); |
| asyncCloseAndComplete(segmentWriter, writeHandler, completePromise, shouldThrow); |
| return completePromise; |
| } else { |
| return closeNoThrow(); |
| } |
| } |
| |
| /** |
|  * Completes and closes the given log segment, then closes this writer and settles |
|  * {@code completePromise}. |
|  * |
|  * @param segmentWriter the segment writer to complete |
|  * @param writeHandler the handler used to complete the segment |
|  * @param completePromise the promise settled after {@link #closeNoThrow()} has finished |
|  * @param shouldThrow whether a completion failure is propagated to {@code completePromise} |
|  */ |
| private void asyncCloseAndComplete(final BKLogSegmentWriter segmentWriter, |
|                                    final BKLogWriteHandler writeHandler, |
|                                    final CompletableFuture<Void> completePromise, |
|                                    final boolean shouldThrow) { |
|     writeHandler.completeAndCloseLogSegment(segmentWriter) |
|         .whenComplete(new FutureEventListener<LogSegmentMetadata>() { |
|             @Override |
|             public void onSuccess(LogSegmentMetadata segment) { |
|                 // The segment is completed, so the writer is no longer the active one. |
|                 removeCachedLogWriter(); |
|                 finish(null); |
|             } |
| |
|             @Override |
|             public void onFailure(Throwable cause) { |
|                 LOG.error("Completing Log segments encountered exception", cause); |
|                 finish(cause); |
|             } |
| |
|             // Always close the writer first; only then report the outcome. |
|             private void finish(final Throwable failure) { |
|                 final boolean propagate = null != failure && shouldThrow; |
|                 FutureUtils.ensure(closeNoThrow(), () -> { |
|                     if (propagate) { |
|                         FutureUtils.completeExceptionally(completePromise, failure); |
|                     } else { |
|                         FutureUtils.complete(completePromise, null); |
|                     } |
|                 }); |
|             } |
|         }); |
| } |
| |
| @VisibleForTesting |
| void closeAndComplete() throws IOException { |
| Utils.ioResult(asyncCloseAndComplete(true)); |
| } |
| |
| protected CompletableFuture<Void> asyncCloseAndComplete() { |
| return asyncCloseAndComplete(true); |
| } |
| |
| /** |
|  * Closes this writer, resolving {@link #asyncClose()} through {@code Utils.ioResult}. |
|  * |
|  * @throws IOException if the asynchronous close fails |
|  */ |
| @Override |
| public void close() throws IOException { |
|     final CompletableFuture<Void> closing = asyncClose(); |
|     Utils.ioResult(closing); |
| } |
| |
| /** |
|  * Completes the cached log segment and closes this writer without propagating a failure |
|  * to complete the segment. |
|  * |
|  * @return a future completed once the close has finished |
|  */ |
| @Override |
| public CompletableFuture<Void> asyncClose() { |
|     final boolean propagateFailure = false; |
|     return asyncCloseAndComplete(propagateFailure); |
| } |
| |
| /** |
| * Close the writer and release all the underlying resources. |
| */ |
| protected CompletableFuture<Void> closeNoThrow() { |
| CompletableFuture<Void> closeFuture; |
| synchronized (this) { |
| if (null != closePromise) { |
| return closePromise; |
| } |
| closeFuture = closePromise = new CompletableFuture<Void>(); |
| } |
| cancelTruncation(); |
| FutureUtils.proxyTo( |
| Utils.closeSequence(bkDistributedLogManager.getScheduler(), |
| true, /** ignore close errors **/ |
| getCachedLogWriter(), |
| getAllocatedLogWriter(), |
| getCachedWriteHandler() |
| ), |
| closeFuture); |
| return closeFuture; |
| } |
| |
| /** |
|  * Aborts this writer, resolving {@link #asyncAbort()} through {@code Utils.ioResult}. |
|  * |
|  * @throws IOException if the asynchronous abort fails |
|  */ |
| @Override |
| public void abort() throws IOException { |
|     final CompletableFuture<Void> aborting = asyncAbort(); |
|     Utils.ioResult(aborting); |
| } |
| |
| /** |
|  * Aborts the cached segment writer, the allocated segment writer and the write handler. |
|  * |
|  * <p>Shares its promise with {@link #closeNoThrow()}: if a close or abort has already been |
|  * started, that promise is returned and nothing else happens. |
|  * |
|  * @return the promise tracking the abort sequence |
|  */ |
| @Override |
| public CompletableFuture<Void> asyncAbort() { |
|     final CompletableFuture<Void> promise; |
|     synchronized (this) { |
|         if (null != closePromise) { |
|             return closePromise; |
|         } |
|         promise = new CompletableFuture<>(); |
|         closePromise = promise; |
|     } |
|     cancelTruncation(); |
|     FutureUtils.proxyTo( |
|         Abortables.abortSequence( |
|             bkDistributedLogManager.getScheduler(), |
|             getCachedLogWriter(), |
|             getAllocatedLogWriter(), |
|             getCachedWriteHandler()), |
|         promise); |
|     return promise; |
| } |
| |
| // used by sync writer |
| protected BKLogSegmentWriter getLedgerWriter(final long startTxId, |
| final boolean allowMaxTxID) |
| throws IOException { |
| CompletableFuture<BKLogSegmentWriter> logSegmentWriterFuture = asyncGetLedgerWriter(true); |
| BKLogSegmentWriter logSegmentWriter = null; |
| if (null != logSegmentWriterFuture) { |
| logSegmentWriter = Utils.ioResult(logSegmentWriterFuture); |
| } |
| if (null == logSegmentWriter || (shouldStartNewSegment(logSegmentWriter) || forceRolling)) { |
| logSegmentWriter = Utils.ioResult(rollLogSegmentIfNecessary( |
| logSegmentWriter, startTxId, true /* bestEffort */, allowMaxTxID)); |
| } |
| return logSegmentWriter; |
| } |
| |
| // used by async writer |
| /** |
|  * Returns the future of the cached segment writer, optionally resetting the writer first. |
|  * |
|  * <p>When {@code resetOnError} is set and the cached writer is in error (or forced recovery |
|  * is enabled), the writer is aborted (if in error) or closed (otherwise) and removed from |
|  * the cache. The returned future then yields {@code null}. Under forced recovery the |
|  * segment is additionally completed through the write handler. |
|  * |
|  * @param resetOnError whether a writer in error should be discarded |
|  * @return {@code null} if no writer is cached; otherwise a future of the cached writer, or |
|  *         a future yielding {@code null} after the writer has been reset |
|  */ |
| protected synchronized CompletableFuture<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) { |
|     final BKLogSegmentWriter ledgerWriter = getCachedLogWriter(); |
|     final CompletableFuture<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture(); |
|     if (null == ledgerWriterFuture || null == ledgerWriter) { |
|         return null; |
|     } |
| |
|     // Handle the case where the last call to write actually caused an error in the log |
|     final boolean mustReset = (ledgerWriter.isLogSegmentInError() || forceRecovery) && resetOnError; |
|     if (!mustReset) { |
|         return ledgerWriterFuture; |
|     } |
| |
|     // Close the ledger writer so that we will recover and start a new log segment |
|     final CompletableFuture<Void> closeFuture = ledgerWriter.isLogSegmentInError() |
|         ? ledgerWriter.asyncAbort() |
|         : ledgerWriter.asyncClose(); |
|     return closeFuture.thenCompose(ignored -> { |
|         removeCachedLogWriter(); |
| |
|         if (ledgerWriter.isLogSegmentInError()) { |
|             return FutureUtils.value(null); |
|         } |
| |
|         final BKLogWriteHandler handler; |
|         try { |
|             handler = getWriteHandler(); |
|         } catch (IOException e) { |
|             return FutureUtils.exception(e); |
|         } |
|         if (null == handler || !forceRecovery) { |
|             return FutureUtils.value(null); |
|         } |
|         // Forced recovery: complete the segment, then report that no writer is cached. |
|         return handler.completeAndCloseLogSegment(ledgerWriter) |
|             .thenApply(completedLogSegment -> null); |
|     }); |
| } |
| |
| boolean shouldStartNewSegment(BKLogSegmentWriter ledgerWriter) throws IOException { |
| BKLogWriteHandler writeHandler = getWriteHandler(); |
| return null == ledgerWriter || writeHandler.shouldStartNewSegment(ledgerWriter) || forceRolling; |
| } |
| |
| private void truncateLogSegmentsIfNecessary(BKLogWriteHandler writeHandler) { |
| boolean truncationEnabled = false; |
| |
| long minTimestampToKeep = 0; |
| |
| long retentionPeriodInMillis = TimeUnit.MILLISECONDS.convert(dynConf.getRetentionPeriodHours(), TimeUnit.HOURS); |
| if (retentionPeriodInMillis > 0) { |
| minTimestampToKeep = Utils.nowInMillis() - retentionPeriodInMillis; |
| truncationEnabled = true; |
| } |
| |
| if (null != minTimestampToKeepOverride) { |
| minTimestampToKeep = minTimestampToKeepOverride; |
| truncationEnabled = true; |
| } |
| |
| // skip scheduling if there is task that's already running |
| // |
| synchronized (this) { |
| if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDone())) { |
| lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep); |
| } |
| } |
| } |
| |
| /** |
|  * Recovers incomplete log segments, starts a new log segment and caches its writer. |
|  * |
|  * @param writeHandler the handler used to recover and start segments |
|  * @param startTxId transaction id the new segment starts from |
|  * @param allowMaxTxID passed through to the write handler when starting the segment |
|  * @return a future of the newly cached segment writer |
|  */ |
| private CompletableFuture<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler, |
|                                                                       final long startTxId, |
|                                                                       final boolean allowMaxTxID) { |
|     return writeHandler.recoverIncompleteLogSegments() |
|         .thenCompose(lastTxId -> writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID)) |
|         .thenApply(startedWriter -> { |
|             cacheLogWriter(startedWriter); |
|             return startedWriter; |
|         }); |
| } |
| |
| /** |
|  * Rolls to a new log segment if the rolling permit manager allows it. |
|  * |
|  * <p>Without a permit the old writer is returned unchanged. With a permit, a lost lock or a |
|  * retryable ZooKeeper failure during the roll disables further permits and falls back to the |
|  * old writer; any other failure is propagated. The permit is released in every case. |
|  * |
|  * @param oldSegmentWriter the writer of the segment being rolled |
|  * @param writeHandler the handler used to complete the old segment and start the new one |
|  * @param startTxId transaction id the new segment starts from |
|  * @param bestEffort whether failing to allocate a new segment may fall back to the old writer |
|  * @param allowMaxTxID passed through to the write handler when starting the segment |
|  * @return a future of the writer to use after the roll attempt |
|  */ |
| private CompletableFuture<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit( |
|         final BKLogSegmentWriter oldSegmentWriter, |
|         final BKLogWriteHandler writeHandler, |
|         final long startTxId, |
|         final boolean bestEffort, |
|         final boolean allowMaxTxID) { |
|     final PermitManager.Permit switchPermit = |
|         bkDistributedLogManager.getLogSegmentRollingPermitManager().acquirePermit(); |
|     if (!switchPermit.isAllowed()) { |
|         bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(switchPermit); |
|         return FutureUtils.value(oldSegmentWriter); |
|     } |
| |
|     final CompletableFuture<BKLogSegmentWriter> rollAttempt = closeOldLogSegmentAndStartNewOne( |
|         oldSegmentWriter, writeHandler, startTxId, bestEffort, allowMaxTxID); |
|     return FutureUtils.ensure( |
|         FutureUtils.rescue( |
|             rollAttempt, |
|             // rescue function |
|             cause -> { |
|                 final boolean lostLock = cause instanceof LockingException; |
|                 final boolean retryableZk = !lostLock |
|                     && cause instanceof ZKException |
|                     && ZKException.isRetryableZKException((ZKException) cause); |
|                 if (!lostLock && !retryableZk) { |
|                     return FutureUtils.exception(cause); |
|                 } |
|                 if (lostLock) { |
|                     LOG.warn("We lost lock during completeAndClose log segment for {}." |
|                             + "Disable ledger rolling until it is recovered : ", |
|                         writeHandler.getFullyQualifiedName(), cause); |
|                 } else { |
|                     LOG.warn("Encountered zookeeper connection issues during completeAndClose " |
|                             + "log segment for {}. " |
|                             + "Disable ledger rolling until it is recovered : {}", |
|                         writeHandler.getFullyQualifiedName(), |
|                         ((ZKException) cause).getKeeperExceptionCode()); |
|                 } |
|                 // Stop rolling until the condition clears and keep writing to the old segment. |
|                 bkDistributedLogManager.getLogSegmentRollingPermitManager() |
|                     .disallowObtainPermits(switchPermit); |
|                 return FutureUtils.value(oldSegmentWriter); |
|             } |
|         ), |
|         // ensure function |
|         () -> bkDistributedLogManager.getLogSegmentRollingPermitManager() |
|             .releasePermit(switchPermit) |
|     ); |
| } |
| |
| /** |
|  * Switches from the old log segment to a new one, allocating the new segment first when |
|  * none has been allocated yet. |
|  * |
|  * @param oldSegmentWriter the writer of the segment being rolled |
|  * @param writeHandler the handler used to start the new segment |
|  * @param startTxId transaction id the new segment starts from |
|  * @param bestEffort whether a {@code null} allocation falls back to the old writer instead of |
|  *                   failing with an {@link UnexpectedException} |
|  * @param allowMaxTxID passed through to the write handler when starting the segment |
|  * @return a future of the writer to use after the switch |
|  */ |
| private CompletableFuture<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne( |
|         final BKLogSegmentWriter oldSegmentWriter, |
|         final BKLogWriteHandler writeHandler, |
|         final long startTxId, |
|         final boolean bestEffort, |
|         final boolean allowMaxTxID) { |
|     // we switch only when we could allocate a new log segment. |
|     final BKLogSegmentWriter preAllocated = getAllocatedLogWriter(); |
|     if (null != preAllocated) { |
|         return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, preAllocated); |
|     } |
| |
|     if (LOG.isDebugEnabled()) { |
|         LOG.debug("Allocating a new log segment from {} for {}.", startTxId, |
|             writeHandler.getFullyQualifiedName()); |
|     } |
|     return writeHandler.asyncStartLogSegment(startTxId, bestEffort, allowMaxTxID) |
|         .thenCompose(allocated -> { |
|             if (null == allocated) { |
|                 if (!bestEffort) { |
|                     return FutureUtils.exception( |
|                         new UnexpectedException("StartLogSegment returns " |
|                             + "null for bestEffort rolling")); |
|                 } |
|                 return FutureUtils.value(oldSegmentWriter); |
|             } |
|             cacheAllocatedLogWriter(allocated); |
|             if (LOG.isDebugEnabled()) { |
|                 LOG.debug("Allocated a new log segment from {} for {}.", startTxId, |
|                     writeHandler.getFullyQualifiedName()); |
|             } |
|             return completeOldSegmentAndCacheNewLogSegmentWriter(oldSegmentWriter, allocated); |
|         }); |
| } |
| |
| /** |
|  * Completes the old log segment and, on success, makes the new writer the cached one. |
|  * |
|  * <p>On failure the new writer stays in the allocated slot and the failure is propagated. |
|  * |
|  * @param oldSegmentWriter the writer of the segment to complete |
|  * @param newSegmentWriter the already allocated writer to switch to |
|  * @return a future of {@code newSegmentWriter}, failed if the old segment cannot be completed |
|  */ |
| private CompletableFuture<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter( |
|         BKLogSegmentWriter oldSegmentWriter, |
|         final BKLogSegmentWriter newSegmentWriter) { |
|     final CompletableFuture<BKLogSegmentWriter> completePromise = new CompletableFuture<>(); |
|     // This method may run on a callback thread, so read the shared handler through the |
|     // synchronized accessor, like every other access to it, instead of touching the field |
|     // without the monitor. |
|     final BKLogWriteHandler handler = getCachedWriteHandler(); |
|     // complete the old log segment |
|     handler.completeAndCloseLogSegment(oldSegmentWriter) |
|         .whenComplete(new FutureEventListener<LogSegmentMetadata>() { |
| |
|             @Override |
|             public void onSuccess(LogSegmentMetadata value) { |
|                 cacheLogWriter(newSegmentWriter); |
|                 removeAllocatedLogWriter(); |
|                 FutureUtils.complete(completePromise, newSegmentWriter); |
|             } |
| |
|             @Override |
|             public void onFailure(Throwable cause) { |
|                 FutureUtils.completeExceptionally(completePromise, cause); |
|             } |
|         }); |
|     return completePromise; |
| } |
| |
| /** |
|  * Rolls to a new log segment when needed and triggers truncation after a roll. |
|  * |
|  * <p>With no current writer a new segment is started. With a writer, the segment is rolled |
|  * only if the write handler asks for it or rolling is forced. Truncation is attempted only |
|  * when the resulting writer differs from the one passed in. |
|  * |
|  * @param segmentWriter the current segment writer, possibly {@code null} |
|  * @param startTxId transaction id a new segment starts from |
|  * @param bestEffort whether a failed allocation may fall back to the current writer |
|  * @param allowMaxTxID passed through to the write handler when starting a segment |
|  * @return a future of the writer to use; failed if the write handler cannot be obtained |
|  */ |
| protected synchronized CompletableFuture<BKLogSegmentWriter> rollLogSegmentIfNecessary( |
|         final BKLogSegmentWriter segmentWriter, |
|         long startTxId, |
|         boolean bestEffort, |
|         boolean allowMaxTxID) { |
|     final BKLogWriteHandler handler; |
|     try { |
|         handler = getWriteHandler(); |
|     } catch (IOException e) { |
|         return FutureUtils.exception(e); |
|     } |
| |
|     final CompletableFuture<BKLogSegmentWriter> rollPromise; |
|     if (null == segmentWriter) { |
|         rollPromise = asyncStartNewLogSegment(handler, startTxId, allowMaxTxID); |
|     } else if (handler.shouldStartNewSegment(segmentWriter) || forceRolling) { |
|         rollPromise = closeOldLogSegmentAndStartNewOneWithPermit( |
|             segmentWriter, handler, startTxId, bestEffort, allowMaxTxID); |
|     } else { |
|         rollPromise = FutureUtils.value(segmentWriter); |
|     } |
| |
|     return rollPromise.thenApply(rolledWriter -> { |
|         // Only a real switch to another writer is a reason to look at truncation. |
|         if (rolledWriter != segmentWriter) { |
|             truncateLogSegmentsIfNecessary(handler); |
|         } |
|         return rolledWriter; |
|     }); |
| } |
| |
| protected synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException { |
| if (null != closePromise) { |
| LOG.error("Executing " + operation + " on already closed Log Writer"); |
| throw new AlreadyClosedException("Executing " + operation + " on already closed Log Writer"); |
| } |
| } |
| |
| /** |
|  * Test hook: forces the roll checks to ask for a new log segment. |
|  * |
|  * @param forceRolling {@code true} to force rolling |
|  */ |
| @VisibleForTesting |
| public void setForceRolling(boolean forceRolling) { |
|     // The field is volatile, so no monitor is needed here. |
|     this.forceRolling = forceRolling; |
| } |
| |
| /** |
|  * Test hook: overrides the cut-off timestamp used when truncating old log segments. |
|  * |
|  * @param minTimestampToKeepOverride the cut-off to use, or {@code null} to fall back to the |
|  *                                   configured retention period |
|  */ |
| @VisibleForTesting |
| public synchronized void overRideMinTimeStampToKeep(Long minTimestampToKeepOverride) { |
|     this.minTimestampToKeepOverride = minTimestampToKeepOverride; |
| } |
| |
| /** |
|  * Cancels the last truncation attempt, if any, and forgets it. |
|  */ |
| protected synchronized void cancelTruncation() { |
|     if (null == lastTruncationAttempt) { |
|         return; |
|     } |
|     lastTruncationAttempt.cancel(true); |
|     lastTruncationAttempt = null; |
| } |
| |
| /** |
|  * Test hook: makes {@code asyncGetLedgerWriter(true)} close and complete the cached segment. |
|  * |
|  * @param forceRecovery {@code true} to force recovery |
|  */ |
| @VisibleForTesting |
| public synchronized void setForceRecovery(boolean forceRecovery) { |
|     this.forceRecovery = forceRecovery; |
| } |
| |
| } |