blob: 728bd7411e5bcbec14bd3268059213899c285194 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
/**
* Holder of actual information of latest manipulation on WAL segments.
*/
public class SegmentAware {
/** Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. */
private final SegmentReservationStorage reservationStorage = new SegmentReservationStorage();
/** Lock on segment protects from archiving segment. */
private final SegmentLockStorage segmentLockStorage = new SegmentLockStorage();
/** Manages last archived index, emulates archivation in no-archiver mode. */
private final SegmentArchivedStorage segmentArchivedStorage;
/** Storage of actual information about current index of compressed segments. */
private final SegmentCompressStorage segmentCompressStorage;
/** Storage of absolute current segment index. */
private final SegmentCurrentStateStorage segmentCurrStateStorage;
/** Storage of archive size. */
private final SegmentArchiveSizeStorage archiveSizeStorage;
/** Storage of truncated segments. */
private final SegmentTruncateStorage truncateStorage;
/**
* Constructor.
*
* @param log Logger.
* @param walSegmentsCnt Total WAL segments count.
* @param compactionEnabled Is wal compaction enabled.
* @param minWalArchiveSize Minimum size of the WAL archive in bytes
* or {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}.
* @param maxWalArchiveSize Maximum size of the WAL archive in bytes
* or {@link DataStorageConfiguration#UNLIMITED_WAL_ARCHIVE}.
*/
public SegmentAware(
IgniteLogger log,
int walSegmentsCnt,
boolean compactionEnabled,
long minWalArchiveSize,
long maxWalArchiveSize
) {
segmentArchivedStorage = new SegmentArchivedStorage(segmentLockStorage);
segmentCurrStateStorage = new SegmentCurrentStateStorage(walSegmentsCnt);
segmentCompressStorage = new SegmentCompressStorage(log, compactionEnabled);
archiveSizeStorage = new SegmentArchiveSizeStorage(
log,
minWalArchiveSize,
maxWalArchiveSize,
reservationStorage
);
truncateStorage = new SegmentTruncateStorage();
segmentArchivedStorage.addObserver(segmentCurrStateStorage::onSegmentArchived);
segmentArchivedStorage.addObserver(segmentCompressStorage::onSegmentArchived);
segmentArchivedStorage.addObserver(truncateStorage::lastArchivedIdx);
segmentLockStorage.addObserver(segmentArchivedStorage::onSegmentUnlocked);
reservationStorage.addObserver(truncateStorage::minReservedIdx);
}
/**
* Waiting until current WAL index will be greater or equal than given one.
*
* @param absSegIdx Target WAL index.
*/
public void awaitSegment(long absSegIdx) throws IgniteInterruptedCheckedException {
segmentCurrStateStorage.awaitSegment(absSegIdx);
}
/**
* Calculate next segment index or wait if needed.
*
* @return Next absolute segment index.
*/
public long nextAbsoluteSegmentIndex() throws IgniteInterruptedCheckedException {
return segmentCurrStateStorage.nextAbsoluteSegmentIndex();
}
/**
* @return Current WAL index.
*/
public long curAbsWalIdx() {
return segmentCurrStateStorage.curAbsWalIdx();
}
/**
* Waiting until archivation of next segment will be allowed.
*/
public long waitNextSegmentForArchivation() throws IgniteInterruptedCheckedException {
return segmentCurrStateStorage.waitNextSegmentForArchivation();
}
/**
* Mark segment as moved to archive under lock.
*
* @param toArchive Segment which was should be moved to archive.
* @throws IgniteInterruptedCheckedException if interrupted during waiting.
*/
public void markAsMovedToArchive(long toArchive) throws IgniteInterruptedCheckedException {
segmentArchivedStorage.markAsMovedToArchive(toArchive);
}
/**
* Method will wait activation of particular WAL segment index.
*
* @param awaitIdx absolute index {@link #lastArchivedAbsoluteIndex()} to become true.
* @throws IgniteInterruptedCheckedException if interrupted.
*/
public void awaitSegmentArchived(long awaitIdx) throws IgniteInterruptedCheckedException {
segmentArchivedStorage.awaitSegmentArchived(awaitIdx);
}
/**
* Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation. Waits if
* there's no segment to archive right now.
*/
public long waitNextSegmentToCompress() throws IgniteInterruptedCheckedException {
long idx;
while ((idx = segmentCompressStorage.nextSegmentToCompressOrWait()) <= lastTruncatedArchiveIdx())
onSegmentCompressed(idx);
return idx;
}
/**
* Callback after segment compression finish.
*
* @param compressedIdx Index of compressed segment.
*/
public void onSegmentCompressed(long compressedIdx) {
segmentCompressStorage.onSegmentCompressed(compressedIdx);
}
/**
* @return Last compressed segment.
*/
public long lastCompressedIdx() {
return segmentCompressStorage.lastCompressedIdx();
}
/**
* Update current WAL index.
*
* @param curAbsWalIdx New current WAL index.
*/
public void curAbsWalIdx(long curAbsWalIdx) {
segmentCurrStateStorage.curAbsWalIdx(curAbsWalIdx);
}
/**
* Update last truncated segment.
*
* @param absIdx Absolut segment index.
*/
public void lastTruncatedArchiveIdx(long absIdx) {
truncateStorage.lastTruncatedIdx(absIdx);
}
/**
* Getting last truncated segment.
*
* @return Absolut segment index.
*/
public long lastTruncatedArchiveIdx() {
return truncateStorage.lastTruncatedIdx();
}
/**
* @param lastAbsArchivedIdx New value of last archived segment index.
*/
public void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) {
segmentArchivedStorage.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
}
/**
* @return Last archived segment absolute index.
*/
public long lastArchivedAbsoluteIndex() {
return segmentArchivedStorage.lastArchivedAbsoluteIndex();
}
/**
* Segment reservation. It will be successful if segment is {@code >} than
* the {@link #minReserveIndex minimum}.
*
* @param absIdx Index for reservation.
* @return {@code True} if the reservation was successful.
*/
public boolean reserve(long absIdx) {
return reservationStorage.reserve(absIdx);
}
/**
* Checks if segment is currently reserved (protected from deletion during WAL cleanup).
*
* @param absIdx Index for check reservation.
* @return {@code True} if index is reserved.
*/
public boolean reserved(long absIdx) {
return reservationStorage.reserved(absIdx);
}
/**
* @param absIdx Reserved index.
*/
public void release(long absIdx) {
reservationStorage.release(absIdx);
}
/**
* Check if WAL segment locked (protected from move to archive).
*
* @param absIdx Index for check locking.
* @return {@code True} if index is locked.
*/
public boolean locked(long absIdx) {
return segmentLockStorage.locked(absIdx);
}
/**
* Segment lock. It will be successful if segment is {@code >} than
* the {@link #lastArchivedAbsoluteIndex last archived}.
*
* @param absIdx Index to lock.
* @return {@code True} if the lock was successful.
*/
public boolean lock(long absIdx) {
return segmentLockStorage.lockWorkSegment(absIdx);
}
/**
* @param absIdx Index to unlock.
*/
public void unlock(long absIdx) {
segmentLockStorage.releaseWorkSegment(absIdx);
}
/**
* Reset interrupted flag.
*/
public void reset() {
segmentArchivedStorage.reset();
segmentCompressStorage.reset();
segmentCurrStateStorage.reset();
archiveSizeStorage.reset();
truncateStorage.reset();
}
/**
* Interrupt waiting on related objects.
*/
public void interrupt() {
segmentArchivedStorage.interrupt();
segmentCompressStorage.interrupt();
segmentCurrStateStorage.interrupt();
archiveSizeStorage.interrupt();
truncateStorage.interrupt();
}
/**
* Interrupt waiting on related objects.
*/
public void forceInterrupt() {
segmentArchivedStorage.interrupt();
segmentCompressStorage.interrupt();
segmentCurrStateStorage.forceInterrupt();
archiveSizeStorage.interrupt();
truncateStorage.interrupt();
}
/**
* Increasing minimum segment index after that can be reserved.
* Value will be updated if it is greater than the current one.
* If segment is already reserved, the update will fail.
*
* @param absIdx Absolut segment index.
* @return {@code True} if update is successful.
*/
public boolean minReserveIndex(long absIdx) {
return reservationStorage.minReserveIndex(absIdx);
}
/**
* Increasing minimum segment index after that can be locked.
* Value will be updated if it is greater than the current one.
* If segment is already reserved, the update will fail.
*
* @param absIdx Absolut segment index.
* @return {@code True} if update is successful.
*/
public boolean minLockIndex(long absIdx) {
return segmentLockStorage.minLockIndex(absIdx);
}
/**
* Adding the WAL segment size in the archive.
*
* @param idx Absolut segment index.
* @param sizeChange Segment size in bytes.
*/
public void addSize(long idx, long sizeChange) {
archiveSizeStorage.changeSize(idx, sizeChange);
}
/**
* Reset the current and reserved WAL archive sizes.
*/
public void resetWalArchiveSizes() {
archiveSizeStorage.resetSizes();
}
/**
* Waiting for exceeding the maximum WAL archive size.
* To track size of WAL archive, need to use {@link #addSize}.
*
* @param max Maximum WAL archive size in bytes.
* @throws IgniteInterruptedCheckedException If it was interrupted.
*/
public void awaitExceedMaxArchiveSize(long max) throws IgniteInterruptedCheckedException {
archiveSizeStorage.awaitExceedMaxSize(max);
}
/**
* Update segment of last completed checkpoint.
* Required for binary recovery.
*
* @param absIdx Absolut segment index.
*/
public void lastCheckpointIdx(long absIdx) {
truncateStorage.lastCheckpointIdx(absIdx);
}
/**
* Waiting for segment truncation to be available. To get the number of segments available for truncation, use
* {@link #lastTruncatedArchiveIdx}, {@link #lastCheckpointIdx}, {@link #reserve} and
* {@link #lastArchivedAbsoluteIndex} (to restart the node correctly) and is calculated as
* {@code lastTruncatedArchiveIdx} - {@code min(lastCheckpointIdx, reserve, lastArchivedAbsoluteIndex)}.
*
* @return Number of segments available to truncate.
* @throws IgniteInterruptedCheckedException If it was interrupted.
*/
public long awaitAvailableTruncateArchive() throws IgniteInterruptedCheckedException {
return truncateStorage.awaitAvailableTruncate();
}
/**
* Start automatically releasing segments when reaching {@link DataStorageConfiguration#getMaxWalArchiveSize()}.
*/
public void startAutoReleaseSegments() {
archiveSizeStorage.startAutoReleaseSegments();
}
}