blob: 6172a75ab0cfed083aa646ce039d5aea8e197a2b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentArchivedStorage.buildArchivedStorage;
import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentCompressStorage.buildCompressStorage;
import static org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentCurrentStateStorage.buildCurrentStateStorage;
/**
* Holder of actual information of latest manipulation on WAL segments.
*/
public class SegmentAware {
/** Latest truncated segment. */
private volatile long lastTruncatedArchiveIdx = -1L;
/** Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. */
private final SegmentReservationStorage reservationStorage = new SegmentReservationStorage();
/** Lock on segment protects from archiving segment. */
private final SegmentLockStorage segmentLockStorage = new SegmentLockStorage();
/** Manages last archived index, emulates archivation in no-archiver mode. */
private final SegmentArchivedStorage segmentArchivedStorage = buildArchivedStorage(segmentLockStorage);
/** Storage of actual information about current index of compressed segments. */
private final SegmentCompressStorage segmentCompressStorage = buildCompressStorage(segmentArchivedStorage);
/** Storage of absolute current segment index. */
private final SegmentCurrentStateStorage segmentCurrStateStorage;
/**
* @param walSegmentsCnt Total WAL segments count.
*/
public SegmentAware(int walSegmentsCnt) {
segmentCurrStateStorage = buildCurrentStateStorage(walSegmentsCnt, segmentArchivedStorage);
}
/**
* Waiting until current WAL index will be greater or equal than given one.
*
* @param absSegIdx Target WAL index.
*/
public void awaitSegment(long absSegIdx) throws IgniteInterruptedCheckedException {
segmentCurrStateStorage.awaitSegment(absSegIdx);
}
/**
* Calculate next segment index or wait if needed.
*
* @return Next absolute segment index.
*/
public long nextAbsoluteSegmentIndex() throws IgniteInterruptedCheckedException {
return segmentCurrStateStorage.nextAbsoluteSegmentIndex();
}
/**
* @return Current WAL index.
*/
public long curAbsWalIdx() {
return segmentCurrStateStorage.curAbsWalIdx();
}
/**
* Waiting until archivation of next segment will be allowed.
*/
public long waitNextSegmentForArchivation() throws IgniteInterruptedCheckedException {
return segmentCurrStateStorage.waitNextSegmentForArchivation();
}
/**
* Mark segment as moved to archive under lock.
*
* @param toArchive Segment which was should be moved to archive.
* @throws IgniteInterruptedCheckedException if interrupted during waiting.
*/
public void markAsMovedToArchive(long toArchive) throws IgniteInterruptedCheckedException {
segmentArchivedStorage.markAsMovedToArchive(toArchive);
}
/**
* Method will wait activation of particular WAL segment index.
*
* @param awaitIdx absolute index {@link #lastArchivedAbsoluteIndex()} to become true.
* @throws IgniteInterruptedCheckedException if interrupted.
*/
public void awaitSegmentArchived(long awaitIdx) throws IgniteInterruptedCheckedException {
segmentArchivedStorage.awaitSegmentArchived(awaitIdx);
}
/**
* Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation. Waits if
* there's no segment to archive right now.
*/
public long waitNextSegmentToCompress() throws IgniteInterruptedCheckedException {
return Math.max(segmentCompressStorage.nextSegmentToCompressOrWait(), lastTruncatedArchiveIdx + 1);
}
/**
* Force set last compressed segment.
*
* @param lastCompressedIdx Segment which was last compressed.
*/
public void lastCompressedIdx(long lastCompressedIdx) {
segmentCompressStorage.lastCompressedIdx(lastCompressedIdx);
}
/**
* @return Last compressed segment.
*/
public long lastCompressedIdx() {
return segmentCompressStorage.lastCompressedIdx();
}
/**
* Update current WAL index.
*
* @param curAbsWalIdx New current WAL index.
*/
public void curAbsWalIdx(long curAbsWalIdx) {
segmentCurrStateStorage.curAbsWalIdx(curAbsWalIdx);
}
/**
* @param lastTruncatedArchiveIdx Last truncated segment;
*/
public void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) {
this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx;
}
/**
* @return Last truncated segment.
*/
public long lastTruncatedArchiveIdx() {
return lastTruncatedArchiveIdx;
}
/**
* @param lastAbsArchivedIdx New value of last archived segment index.
*/
public void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) {
segmentArchivedStorage.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
}
/**
* @return Last archived segment absolute index.
*/
public long lastArchivedAbsoluteIndex() {
return segmentArchivedStorage.lastArchivedAbsoluteIndex();
}
/**
* @param absIdx Index for reservation.
*/
public void reserve(long absIdx) {
reservationStorage.reserve(absIdx);
}
/**
* Checks if segment is currently reserved (protected from deletion during WAL cleanup).
*
* @param absIdx Index for check reservation.
* @return {@code True} if index is reserved.
*/
public boolean reserved(long absIdx) {
return reservationStorage.reserved(absIdx);
}
/**
* @param absIdx Reserved index.
*/
public void release(long absIdx) {
reservationStorage.release(absIdx);
}
/**
* Check if WAL segment locked (protected from move to archive)
*
* @param absIdx Index for check reservation.
* @return {@code True} if index is locked.
*/
public boolean locked(long absIdx) {
return segmentLockStorage.locked(absIdx);
}
/**
* @param absIdx Segment absolute index.
* @return <ul><li>{@code True} if can read, no lock is held, </li><li>{@code false} if work segment, need release
* segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
*/
public boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
return lastArchivedAbsoluteIndex() >= absIdx || segmentLockStorage.lockWorkSegment(absIdx);
}
/**
* Visible for test.
*
* @param absIdx Segment absolute index. segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
*/
void lockWorkSegment(long absIdx) {
segmentLockStorage.lockWorkSegment(absIdx);
}
/**
* @param absIdx Segment absolute index.
*/
public void releaseWorkSegment(long absIdx) {
segmentLockStorage.releaseWorkSegment(absIdx);
}
/**
* Interrupt waiting on related objects.
*/
public void interrupt() {
segmentArchivedStorage.interrupt();
segmentCompressStorage.interrupt();
segmentCurrStateStorage.interrupt();
}
/**
* Interrupt waiting on related objects.
*/
public void forceInterrupt() {
segmentArchivedStorage.interrupt();
segmentCompressStorage.interrupt();
segmentCurrStateStorage.forceInterrupt();
}
}