blob: 1f0979a63f8932236dfc711d371edd4bb1a984e2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Consumer;
import org.jetbrains.annotations.Nullable;
/**
 * Storage of WAL segment reservations: a reserved segment is protected from deletion during WAL log cleanup.
 * <p>
 * Mutable state is accessed under the monitor of this instance. Observers are notified about a change of
 * the minimum reserved index after the monitor has been released.
 */
class SegmentReservationStorage extends SegmentObservable {
    /**
     * Reservation counters keyed by absolute segment index. An entry is created with counter {@code 1} and is
     * removed when it is released while its counter equals {@code 1}. Every index that is {@code >=} some key
     * of this map is reported as reserved by {@link #reserved(long)}. Guarded by {@code this}.
     */
    private final NavigableMap<Long, Integer> reserved = new TreeMap<>();

    /**
     * Minimum bound for reservations: only indexes strictly greater than this value can be reserved.
     * Guarded by {@code this}.
     */
    private long minReserveIdx = -1;

    /**
     * Reserves the segment. The reservation is accepted only if the index is strictly greater than the
     * {@link #minReserveIdx minimum}; each accepted call increments the reservation counter of the index.
     *
     * @param absIdx Absolute segment index to reserve.
     * @return {@code True} if the segment has been reserved, {@code false} otherwise.
     */
    boolean reserve(long absIdx) {
        Long changedMin;

        synchronized (this) {
            // Indexes at or below the bound are rejected without touching the counters.
            if (absIdx <= minReserveIdx)
                return false;

            changedMin = trackingMinReservedIdx(cntrs -> cntrs.merge(absIdx, 1, Integer::sum));
        }

        // Observers are called outside of the monitor.
        if (changedMin != null)
            notifyObservers(changedMin);

        return true;
    }

    /**
     * Checks whether the segment is currently reserved (protected from deletion during WAL cleanup),
     * i.e. whether there is a reservation with an index less than or equal to the given one.
     *
     * @param absIdx Absolute segment index to check.
     * @return {@code True} if the index is reserved.
     */
    synchronized boolean reserved(long absIdx) {
        Map.Entry<Long, Integer> floor = reserved.floorEntry(absIdx);

        return floor != null;
    }

    /**
     * Releases one reservation of the segment. Does nothing if the index has no reservations.
     *
     * @param absIdx Absolute index of the reserved segment.
     */
    void release(long absIdx) {
        Long changedMin;

        synchronized (this) {
            changedMin = trackingMinReservedIdx(
                cntrs -> cntrs.computeIfPresent(absIdx, SegmentReservationStorage::decrement)
            );
        }

        // Observers are called outside of the monitor.
        if (changedMin != null)
            notifyObservers(changedMin);
    }

    /**
     * Raises the minimum segment index that can be reserved. The stored value changes only if the given
     * index is greater than the current one. The update is rejected if the segment is already reserved.
     *
     * @param absIdx Absolute segment index.
     * @return {@code True} if the update has not been rejected.
     */
    synchronized boolean minReserveIndex(long absIdx) {
        boolean notReserved = !reserved(absIdx);

        // The bound never decreases.
        if (notReserved && absIdx > minReserveIdx)
            minReserveIdx = absIdx;

        return notReserved;
    }

    /**
     * Applies the update to {@link #reserved} and reports how the minimum reserved segment has changed.
     *
     * @param updateFun {@link #reserved} update function.
     * @return New minimum reserved segment, {@code null} if the minimum has not changed,
     *      {@code -1} if no reserved segments are left after the update.
     */
    @Nullable private synchronized Long trackingMinReservedIdx(Consumer<NavigableMap<Long, Integer>> updateFun) {
        Long minBefore = keyOrNull(reserved.firstEntry());

        updateFun.accept(reserved);

        Long minAfter = keyOrNull(reserved.firstEntry());

        if (Objects.equals(minBefore, minAfter))
            return null;

        // The minimum differs from the previous one: an empty map is reported as -1.
        long res = minAfter == null ? -1L : minAfter;

        return res;
    }

    /**
     * Remapping function for releasing a single reservation.
     *
     * @param idx Absolute segment index (not used).
     * @param cnt Current reservation counter.
     * @return Decremented counter, or {@code null} (the mapping is removed) if the counter equals {@code 1}.
     */
    @Nullable private static Integer decrement(Long idx, Integer cnt) {
        if (cnt == 1)
            return null;

        return cnt - 1;
    }

    /**
     * @param e Map entry, possibly {@code null}.
     * @return Key of the entry, or {@code null} if there is no entry.
     */
    @Nullable private static Long keyOrNull(@Nullable Map.Entry<Long, Integer> e) {
        return e == null ? null : e.getKey();
    }
}