blob: 22c703a0ad15c8c42b77583f5fe860eac5b779b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.checkpoint;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_RELEASED;
/**
* Data class representing the state of running/scheduled checkpoint.
*/
public class CheckpointProgressImpl implements CheckpointProgress {
/** Scheduled time of checkpoint. */
private volatile long nextCpNanos;
/** Current checkpoint state. */
private volatile AtomicReference<CheckpointState> state = new AtomicReference(CheckpointState.SCHEDULED);
/** Future which would be finished when corresponds state is set. */
private final Map<CheckpointState, GridFutureAdapter> stateFutures = new ConcurrentHashMap<>();
/** Cause of fail, which has happened during the checkpoint or null if checkpoint was successful. */
private volatile Throwable failCause;
/** Flag indicates that snapshot operation will be performed after checkpoint. */
private volatile boolean nextSnapshot;
/** Snapshot operation that should be performed if {@link #nextSnapshot} set to true. */
private volatile SnapshotOperation snapshotOperation;
/** Partitions destroy queue. */
private final PartitionDestroyQueue destroyQueue = new PartitionDestroyQueue();
/** Wakeup reason. */
private String reason;
/** Counter for written checkpoint pages. Not null only if checkpoint is running. */
private volatile AtomicInteger writtenPagesCntr;
/** Counter for fsynced checkpoint pages. Not null only if checkpoint is running. */
private volatile AtomicInteger syncedPagesCntr;
/** Counter for evicted checkpoint pages. Not null only if checkpoint is running. */
private volatile AtomicInteger evictedPagesCntr;
/** Number of pages in current checkpoint at the beginning of checkpoint. */
private volatile int currCheckpointPagesCnt;
/**
* @param cpFreq Timeout until next checkpoint.
*/
public CheckpointProgressImpl(long cpFreq) {
nextCpNanos = System.nanoTime() + U.millisToNanos(cpFreq);
}
/**
* @return {@code true} If checkpoint already started but have not finished yet.
*/
@Override public boolean inProgress() {
return greaterOrEqualTo(LOCK_RELEASED) && !greaterOrEqualTo(FINISHED);
}
/**
* @param expectedState Expected state.
* @return {@code true} if current state equal to given state.
*/
public boolean greaterOrEqualTo(CheckpointState expectedState) {
return state.get().ordinal() >= expectedState.ordinal();
}
/**
* @param state State for which future should be returned.
* @return Existed or new future which corresponds to the given state.
*/
@Override public GridFutureAdapter futureFor(CheckpointState state) {
GridFutureAdapter stateFut = stateFutures.computeIfAbsent(state, (k) -> new GridFutureAdapter());
if (greaterOrEqualTo(state) && !stateFut.isDone())
stateFut.onDone(failCause);
return stateFut;
}
/**
* Mark this checkpoint execution as failed.
*
* @param error Causal error of fail.
*/
@Override public void fail(Throwable error) {
failCause = error;
transitTo(FINISHED);
}
/**
* Changing checkpoint state if order of state is correct.
*
* @param newState New checkpoint state.
*/
@Override public void transitTo(@NotNull CheckpointState newState) {
CheckpointState state = this.state.get();
if (state.ordinal() < newState.ordinal()) {
this.state.compareAndSet(state, newState);
doFinishFuturesWhichLessOrEqualTo(newState);
}
}
/**
* Finishing futures with correct result in direct state order until lastState(included).
*
* @param lastState State until which futures should be done.
*/
private void doFinishFuturesWhichLessOrEqualTo(@NotNull CheckpointState lastState) {
for (CheckpointState old : CheckpointState.values()) {
GridFutureAdapter fut = stateFutures.get(old);
if (fut != null && !fut.isDone())
fut.onDone(failCause);
if (old == lastState)
return;
}
}
/**
* @return Destroy queue.
*/
public PartitionDestroyQueue getDestroyQueue() {
return destroyQueue;
}
/**
* @return Flag indicates that snapshot operation will be performed after checkpoint.
*/
public boolean nextSnapshot() {
return nextSnapshot;
}
/**
* @return Scheduled time of checkpoint.
*/
public long nextCopyNanos() {
return nextCpNanos;
}
/**
* @param nextCpNanos New scheduled time of checkpoint.
*/
public void nextCopyNanos(long nextCpNanos) {
this.nextCpNanos = nextCpNanos;
}
/**
* @return Wakeup reason.
*/
public String reason() {
return reason;
}
/**
* @param reason New wakeup reason.
*/
public void reason(String reason) {
this.reason = reason;
}
/**
* @return Snapshot operation that should be performed if set to true.
*/
public SnapshotOperation snapshotOperation() {
return snapshotOperation;
}
/**
* @param snapshotOperation New snapshot operation that should be performed if set to true.
*/
public void snapshotOperation(SnapshotOperation snapshotOperation) {
this.snapshotOperation = snapshotOperation;
}
/**
* @param nextSnapshot New flag indicates that snapshot operation will be performed after checkpoint.
*/
public void nextSnapshot(boolean nextSnapshot) {
this.nextSnapshot = nextSnapshot;
}
/** {@inheritDoc} */
@Override public AtomicInteger writtenPagesCounter() {
return writtenPagesCntr;
}
/** {@inheritDoc} */
@Override public void updateWrittenPages(int deltha) {
A.ensure(deltha > 0, "param must be positive");
writtenPagesCntr.addAndGet(deltha);
}
/** {@inheritDoc} */
@Override public AtomicInteger syncedPagesCounter() {
return syncedPagesCntr;
}
/** {@inheritDoc} */
@Override public void updateSyncedPages(int deltha) {
A.ensure(deltha > 0, "param must be positive");
syncedPagesCntr.addAndGet(deltha);
}
/** {@inheritDoc} */
@Override public AtomicInteger evictedPagesCounter() {
return evictedPagesCntr;
}
/** {@inheritDoc} */
@Override public void updateEvictedPages(int deltha) {
A.ensure(deltha > 0, "param must be positive");
if (evictedPagesCounter() != null)
evictedPagesCounter().addAndGet(deltha);
}
/** {@inheritDoc} */
@Override public int currentCheckpointPagesCount() {
return currCheckpointPagesCnt;
}
/** {@inheritDoc} */
@Override public void currentCheckpointPagesCount(int num) {
currCheckpointPagesCnt = num;
}
/** {@inheritDoc} */
@Override public void initCounters(int pagesSize) {
currCheckpointPagesCnt = pagesSize;
writtenPagesCntr = new AtomicInteger();
syncedPagesCntr = new AtomicInteger();
evictedPagesCntr = new AtomicInteger();
}
}