blob: 7dbb5077b7c16d90831f800e63abbcd0ae309057 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.checkpoint;
import java.lang.ref.SoftReference;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_GRP_STATE_LAZY_STORE;
/**
 * Class represents checkpoint state: checkpoint timestamp, checkpoint ID, checkpoint mark WAL pointer and
 * per-cache-group partition counters snapshotted by the checkpoint (possibly loaded lazily from WAL).
 */
public class CheckpointEntry {
    /** Checkpoint timestamp. */
    private final long cpTs;

    /** Checkpoint mark WAL pointer; the checkpoint record is read from it when group states are loaded lazily. */
    private final WALPointer cpMark;

    /** Checkpoint ID. */
    private final UUID cpId;

    /**
     * State of groups and partitions snapshotted at the checkpoint begin. Held through a soft reference, so it may be
     * reclaimed by GC; in that case a new store is created and re-initialized from WAL on the next access.
     */
    private volatile SoftReference<GroupStateLazyStore> grpStateLazyStore;

    /**
     * Checkpoint entry constructor.
     *
     * If {@code cacheGrpStates} is null then group states will be initialized lazily from the WAL record located at
     * {@code cpMark}.
     *
     * @param cpTs Checkpoint timestamp.
     * @param cpMark Checkpoint mark pointer.
     * @param cpId Checkpoint ID.
     * @param cacheGrpStates Cache groups states, or {@code null} to load them lazily from WAL.
     */
    CheckpointEntry(
        long cpTs,
        WALPointer cpMark,
        UUID cpId,
        @Nullable Map<Integer, CacheState> cacheGrpStates
    ) {
        this.cpTs = cpTs;
        this.cpMark = cpMark;
        this.cpId = cpId;
        this.grpStateLazyStore = new SoftReference<>(new GroupStateLazyStore(cacheGrpStates));
    }

    /**
     * @return Checkpoint timestamp.
     */
    public long timestamp() {
        return cpTs;
    }

    /**
     * @return Checkpoint ID.
     */
    public UUID checkpointId() {
        return cpId;
    }

    /**
     * @return Checkpoint mark.
     */
    public WALPointer checkpointMark() {
        return cpMark;
    }

    /**
     * Gets cache group states of this checkpoint. Only partitions that were in {@code OWNING} state are included.
     *
     * @param wal Write ahead log manager.
     * @return Group id -> group state map.
     * @throws IgniteCheckedException If group states could not be loaded from WAL.
     */
    public Map<Integer, GroupState> groupState(
        IgniteWriteAheadLogManager wal
    ) throws IgniteCheckedException {
        GroupStateLazyStore store = initIfNeeded(wal);

        return store.grpStates;
    }

    /**
     * Returns the group state store, making sure it is initialized. A fresh (lazy) store is created when the soft
     * reference has been cleared or when {@code IGNITE_DISABLE_GRP_STATE_LAZY_STORE} is enabled.
     *
     * @param wal Write ahead log manager.
     * @return Initialized group lazy store.
     * @throws IgniteCheckedException If group states could not be loaded from WAL.
     */
    private GroupStateLazyStore initIfNeeded(IgniteWriteAheadLogManager wal) throws IgniteCheckedException {
        GroupStateLazyStore store = grpStateLazyStore.get();

        if (store == null || IgniteSystemProperties.getBoolean(IGNITE_DISABLE_GRP_STATE_LAZY_STORE, false)) {
            // Concurrent callers may each create their own store here; the cost is only a duplicate WAL read.
            store = new GroupStateLazyStore();

            grpStateLazyStore = new SoftReference<>(store);
        }

        store.initIfNeeded(wal, cpMark);

        return store;
    }

    /**
     * @param wal Write ahead log manager.
     * @param grpId Cache group ID.
     * @param part Partition ID.
     * @return Partition counter or {@code null} if not found.
     * @throws IgniteCheckedException If something is wrong when loading the counter from WAL history.
     */
    public Long partitionCounter(IgniteWriteAheadLogManager wal, int grpId, int part) throws IgniteCheckedException {
        GroupStateLazyStore store = initIfNeeded(wal);

        return store.partitionCounter(grpId, part);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return "CheckpointEntry [id=" + cpId + ", timestamp=" + cpTs + ", ptr=" + cpMark + "]";
    }

    /**
     * Partition counters of a single cache group, stored as two parallel arrays ordered by strictly increasing
     * partition ID, so that lookups can use binary search.
     */
    public static class GroupState {
        /** Partition ids. */
        private final int[] parts;

        /** Partition counters which correspond to partition ids. */
        private final long[] cnts;

        /** Next index to insert to parts and cnts; equals the number of stored partitions. */
        private int idx;

        /**
         * @param partsCnt Partitions count (capacity).
         */
        private GroupState(int partsCnt) {
            parts = new int[partsCnt];
            cnts = new long[partsCnt];
        }

        /**
         * Appends a partition counter. Partitions must be added in strictly increasing ID order.
         *
         * @param partId Partition ID to add.
         * @param cntr Partition counter.
         * @throws IllegalStateException If capacity is exhausted or {@code partId} breaks the ordering.
         */
        public void addPartitionCounter(int partId, long cntr) {
            if (idx == parts.length)
                throw new IllegalStateException("Failed to add new partition to the partitions state " +
                    "(no enough space reserved) [partId=" + partId + ", reserved=" + parts.length + ']');

            if (idx > 0) {
                if (parts[idx - 1] >= partId)
                    throw new IllegalStateException("Adding partition in a wrong order [prev=" + parts[idx - 1] +
                        ", cur=" + partId + ']');
            }

            parts[idx] = partId;

            cnts[idx] = cntr;

            idx++;
        }

        /**
         * Gets partition counter by partition ID.
         *
         * @param partId Partition ID.
         * @return Partition update counter (will return {@code -1} if partition is not present in the record).
         */
        public long counterByPartition(int partId) {
            int idx = indexByPartition(partId);

            return idx >= 0 ? cnts[idx] : -1;
        }

        /**
         * Returns a partition ID by its index in this group state. The index must be less than {@link #size()}.
         *
         * @param idx Partition index.
         * @return Partition id.
         */
        public int getPartitionByIndex(int idx) {
            return parts[idx];
        }

        /**
         * Returns the number of partitions stored in this group state.
         *
         * @return Number of filled entries of the internal arrays of this group state.
         */
        public int size() {
            return idx;
        }

        /**
         * @param partId Partition ID to search.
         * @return Non-negative index of partition if found or negative value if not found.
         */
        public int indexByPartition(int partId) {
            return Arrays.binarySearch(parts, 0, idx, partId);
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return "GroupState [cap=" + parts.length + ", size=" + idx + ']';
        }
    }

    /**
     * Group state lazy store. It is either created already initialized from in-memory cache group states, or is
     * initialized once from the checkpoint record in WAL by the first caller of {@code initIfNeeded}; other callers
     * wait until that initialization finishes.
     */
    public static class GroupStateLazyStore {
        /** Updater for {@link #initGuard}. */
        private static final AtomicIntegerFieldUpdater<GroupStateLazyStore> initGuardUpdater =
            AtomicIntegerFieldUpdater.newUpdater(GroupStateLazyStore.class, "initGuard");

        /** Cache states. Initialized lazily. */
        private volatile Map<Integer, GroupState> grpStates;

        /** Released once initialization has finished (successfully or not). */
        private final CountDownLatch latch;

        /** Initialization guard: {@code 0} - initialization not started, {@code 1} - started or completed. */
        @SuppressWarnings("unused")
        private volatile int initGuard;

        /**
         * Initialization exception. Not volatile: it is written before {@link #latch} is counted down and read after
         * awaiting the latch, which provides the required happens-before ordering.
         */
        private IgniteCheckedException initEx;

        /**
         * Creates a store which will be initialized lazily from WAL.
         */
        private GroupStateLazyStore() {
            this(null);
        }

        /**
         * @param cacheGrpStates Cache group state, or {@code null} to initialize the store lazily from WAL.
         */
        private GroupStateLazyStore(Map<Integer, CacheState> cacheGrpStates) {
            if (cacheGrpStates != null) {
                // Already initialized: mark the guard as taken and release the latch immediately.
                initGuard = 1;

                latch = new CountDownLatch(0);
            }
            else
                latch = new CountDownLatch(1);

            grpStates = remap(cacheGrpStates);
        }

        /**
         * Converts cache group states from a checkpoint record to group states keeping only {@code OWNING} partitions.
         *
         * @param stateRec Cache group state.
         * @return Group id -> group state map (empty if {@code stateRec} is {@code null}).
         */
        private Map<Integer, GroupState> remap(Map<Integer, CacheState> stateRec) {
            if (stateRec == null)
                return Collections.emptyMap();

            Map<Integer, GroupState> res = U.newHashMap(stateRec.size());

            for (Map.Entry<Integer, CacheState> entry : stateRec.entrySet()) {
                CacheState recState = entry.getValue();

                GroupState grpState = new GroupState(recState.size());

                for (int i = 0; i < recState.size(); i++) {
                    byte partState = recState.stateByIndex(i);

                    // Partitions in any state other than OWNING are not recorded.
                    if (GridDhtPartitionState.fromOrdinal(partState) != GridDhtPartitionState.OWNING)
                        continue;

                    grpState.addPartitionCounter(
                        recState.partitionByIndex(i),
                        recState.partitionCounterByIndex(i)
                    );
                }

                res.put(entry.getKey(), grpState);
            }

            return res;
        }

        /**
         * @param grpId Group id.
         * @param part Partition id.
         * @return Partition counter, or {@code null} if initialization failed or the partition is not found.
         */
        private Long partitionCounter(int grpId, int part) {
            assert initGuard != 0 : initGuard;

            if (initEx != null || grpStates == null)
                return null;

            GroupState state = grpStates.get(grpId);

            if (state != null) {
                long cntr = state.counterByPartition(part);

                return cntr < 0 ? null : cntr;
            }

            return null;
        }

        /**
         * Initializes group states from the checkpoint record at {@code ptr} if this has not been done yet. The thread
         * that wins the guard reads WAL; other threads wait for it and rethrow its failure, if any.
         *
         * @param wal Write ahead log manager.
         * @param ptr Checkpoint wal pointer.
         * @throws IgniteCheckedException If failed to read WAL entry or the record at {@code ptr} is not a checkpoint
         *      record.
         */
        private void initIfNeeded(
            IgniteWriteAheadLogManager wal,
            WALPointer ptr
        ) throws IgniteCheckedException {
            if (initGuardUpdater.compareAndSet(this, 0, 1)) {
                try (WALIterator it = wal.replay(ptr)) {
                    if (!it.hasNextX()) {
                        throw new IgniteCheckedException(
                            "Failed to find checkpoint record at the given WAL pointer: " + ptr);
                    }

                    IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();

                    WALRecord rec = tup.get2();

                    if (!(rec instanceof CheckpointRecord)) {
                        throw new IgniteCheckedException("Unexpected WAL record at the checkpoint mark pointer " +
                            "(expected CheckpointRecord) [ptr=" + ptr +
                            ", rec=" + (rec == null ? "null" : rec.getClass().getSimpleName()) + ']');
                    }

                    grpStates = remap(((CheckpointRecord)rec).cacheGroupStates());
                }
                catch (IgniteCheckedException e) {
                    initEx = e;

                    throw e;
                }
                catch (RuntimeException | Error e) {
                    // Record unchecked failures as well; otherwise threads waiting on the latch (and later callers)
                    // would silently observe an empty group state map instead of an error.
                    initEx = new IgniteCheckedException(
                        "Failed to initialize cache group states from WAL [ptr=" + ptr + ']', e);

                    throw e;
                }
                finally {
                    latch.countDown();
                }
            }
            else {
                U.await(latch);

                if (initEx != null)
                    throw initEx;
            }
        }
    }
}