blob: 9a5f1b263aed11f732c0f8ea68948dc335c17fee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
/**
* Update counter implementation used for cache groups in persistent mode for both tx and atomic caches.
* <p>
* Implements the partition update counter flow to avoid situations when:
* <ol>
* <li>update counter could be incremented and persisted while corresponding update is not recorded to WAL.</li>
* <li>update counter could be updated out of order.</li>
* </ol>
* Both situations can lead to partition desync if a node fails under load.
* <p>
* The main idea is to track updates received out-of-order to ensure valid state of the update counter for rebalancing.
* <p>
* Below is a short description of the new flow:
* <ol>
* <li>Update counter is <i>reserved</i> for each update in partition on tx prepare phase (which always happens
* on primary partition owner). Reservation causes HWM increment.</li>
* <li>Reserved counter values are propagated on backup nodes and stored in backup transactions.</li>
* <li>On commit reserved counters are assigned to cache entries.</li>
* <li>LWM is incremented ONLY after corresponding WAL data record for each entry was written.</li>
* <li>In case of rollback (manual or during tx recovery on node failure) reserved updates are also applied and
* logged to WAL using {@link RollbackRecord} for further recovery purposes.</li>
* </ol>
*/
public class PartitionUpdateCounterTrackingImpl implements PartitionUpdateCounter {
    /**
     * Max allowed missed updates. Overflow will trigger critical failure handler to prevent OOM.
     */
    public static final int MAX_MISSED_UPDATES = 10_000;

    /** Counter updates serialization version. */
    private static final byte VERSION = 1;

    /**
     * Queue of applied out of order counter updates (gaps above LWM), keyed by {@link Item#start}.
     * Adjacent items are merged on insertion by {@link #update(long, long)}.
     */
    private NavigableMap<Long, Item> queue = new TreeMap<>();

    /** LWM. Advanced by sequentially applied updates. */
    private final AtomicLong cntr = new AtomicLong();

    /** HWM. Advanced by {@link #reserve(long)}. */
    protected final AtomicLong reserveCntr = new AtomicLong();

    /** {@code true} until the first {@link #update(long)} call, which discards gaps left after recovery. */
    private boolean first = true;

    /** Cache group this counter belongs to. */
    protected final CacheGroupContext grp;

    /**
     * Initial counter points to last sequential update after WAL recovery.
     * @deprecated TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11794
     */
    @Deprecated private volatile long initCntr;

    /**
     * @param grp Group.
     */
    public PartitionUpdateCounterTrackingImpl(CacheGroupContext grp) {
        this.grp = grp;
    }

    /** {@inheritDoc} */
    @Override public void init(long initUpdCntr, @Nullable byte[] cntrUpdData) {
        cntr.set(initUpdCntr);

        // LWM, HWM and the initial counter all start from the same value.
        reserveCntr.set(initCntr = initUpdCntr);

        // Restore gaps persisted by getBytes(); an absent blob means no gaps.
        queue = fromBytes(cntrUpdData);
    }

    /** {@inheritDoc} */
    @Override public long initial() {
        return initCntr;
    }

    /** {@inheritDoc} */
    @Override public long get() {
        return cntr.get();
    }

    /**
     * @return End of the highest tracked out of order range, or LWM if there are no gaps.
     */
    protected synchronized long highestAppliedCounter() {
        return queue.isEmpty() ? cntr.get() : queue.lastEntry().getValue().absolute();
    }

    /**
     * Increments LWM and moves HWM to the same value.
     *
     * @return Next update counter. For tx mode called by {@link DataStreamerImpl} IsolatedUpdater.
     */
    @Override public long next() {
        long next = cntr.incrementAndGet();

        reserveCntr.set(next);

        return next;
    }

    /** {@inheritDoc} */
    @Override public synchronized void update(long val) throws IgniteCheckedException {
        // Reserved update counter is updated only on exchange.
        long cur = get();

        // Always set reserved counter equal to max known counter.
        long max = Math.max(val, cur);

        if (reserveCntr.get() < max)
            reserveCntr.set(max);

        // Outdated counter (txs are possible before current topology future is finished if primary is not changed).
        if (val < cur)
            return;

        // Absolute counter should be not less than last applied update.
        // Otherwise supplier doesn't contain some updates and rebalancing couldn't restore consistency.
        // Best behavior is to stop node by failure handler in such a case.
        if (val < highestAppliedCounter())
            throw new IgniteCheckedException("Failed to update the counter [newVal=" + val + ", curState=" + this + ']');

        cntr.set(val);

        // If some holes are present at this point, that means some updates were missed on recovery and will be
        // restored during rebalance. All gaps are safe to "forget".
        // Should only do it for the first PME (later missed updates on node left are reset in
        // finalizeUpdateCounters).
        if (first) {
            queue.clear();

            first = false;
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Applies the updates in range {@code (start, start + delta]}. A sequential update advances LWM;
     * an out of order update is tracked as a gap item. A rejected update ({@code false} result) leaves
     * the tracked gaps unchanged.
     */
    @Override public synchronized boolean update(long start, long delta) {
        long cur = cntr.get();

        if (cur > start)
            return false;
        else if (cur < start) {
            // Out of order update: try merge with adjacent gaps in sequence.
            long next = start + delta;

            // Merge with next. The removed item is put back if this update is rejected below,
            // so a rejected update never drops an already tracked gap.
            Item nextItem = queue.remove(next);

            if (nextItem != null)
                delta += nextItem.delta;

            // Merge with previous, possibly modifying previous.
            Map.Entry<Long, Item> prev = queue.lowerEntry(start);

            if (prev != null) {
                Item prevItem = prev.getValue();

                if (prevItem.absolute() == start) {
                    prevItem.delta += delta;

                    return true;
                }
                else if (prevItem.within(next - 1)) {
                    // The whole range is already covered by the previous gap.
                    if (nextItem != null)
                        queue.put(next, nextItem);

                    return false;
                }
            }

            if (queue.size() >= MAX_MISSED_UPDATES) { // Should trigger failure handler.
                if (nextItem != null)
                    queue.put(next, nextItem);

                throw new IgniteException("Too many gaps [cntr=" + this + ']');
            }

            if (queue.putIfAbsent(start, new Item(start, delta)) != null) {
                // A gap starting at the same position is already tracked: reject the update.
                if (nextItem != null)
                    queue.put(next, nextItem);

                return false;
            }

            return true;
        }
        else { // cur == start
            long next = start + delta;

            // There is only one next sequential item possible, all other items will be merged.
            Item nextItem = queue.remove(next);

            if (nextItem != null)
                next += nextItem.delta;

            boolean res = cntr.compareAndSet(cur, next);

            assert res;

            return true;
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Applies the update, then sets the initial counter to the resulting LWM and raises HWM to it if needed.
     * NOTE(review): only the inner {@link #update(long, long)} call is synchronized, not the whole method.
     */
    @Override public void updateInitial(long start, long delta) {
        update(start, delta);

        initCntr = get();

        if (reserveCntr.get() < initCntr)
            reserveCntr.set(initCntr);
    }

    /**
     * Closes all tracked gaps by moving LWM to the end of each tracked range, then sets HWM to LWM.
     *
     * @return Pairs {@code (start, end)} of missing counter ranges, flattened into one list,
     *      or {@code null} if there were no gaps.
     */
    @Override public synchronized GridLongList finalizeUpdateCounters() {
        Map.Entry<Long, Item> item = queue.pollFirstEntry();

        GridLongList gaps = null;

        while (item != null) {
            if (gaps == null)
                gaps = new GridLongList((queue.size() + 1) * 2);

            long start = cntr.get() + 1;
            long end = item.getValue().start;

            gaps.add(start);
            gaps.add(end);

            // Close pending ranges.
            cntr.set(item.getValue().absolute());

            item = queue.pollFirstEntry();
        }

        reserveCntr.set(get());

        return gaps;
    }

    /**
     * Advances HWM by {@code delta}.
     *
     * @return HWM value before the reservation.
     */
    @Override public synchronized long reserve(long delta) {
        long lwm = get();

        long reserved = reserveCntr.getAndAdd(delta);

        assert reserved >= lwm : "LWM after HWM: lwm=" + lwm + ", hwm=" + reserved;

        return reserved;
    }

    /**
     * Advances LWM by {@code delta} without touching HWM.
     *
     * @return LWM value before the increment.
     */
    @Override public long next(long delta) {
        return cntr.getAndAdd(delta);
    }

    /** {@inheritDoc} */
    @Override public synchronized boolean sequential() {
        return queue.isEmpty();
    }

    /**
     * Serializes tracked gaps as: version byte, item count, then {@code (start, delta)} longs per item.
     *
     * @return Serialized gaps or {@code null} if there are none.
     */
    @Override public synchronized @Nullable byte[] getBytes() {
        if (queue.isEmpty())
            return null;

        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();

            DataOutputStream dos = new DataOutputStream(bos);

            dos.writeByte(VERSION);

            int size = queue.size();

            dos.writeInt(size);

            for (Item item : queue.values()) {
                dos.writeLong(item.start);
                dos.writeLong(item.delta);
            }

            bos.close();

            return bos.toByteArray();
        }
        catch (IOException e) {
            throw new IgniteException(e);
        }
    }

    /**
     * Deserializes gaps written by {@link #getBytes()}. The version byte is read but not validated.
     *
     * @param raw Raw bytes.
     * @return Restored gaps; an empty map if {@code raw} is {@code null}. Never {@code null}.
     */
    private NavigableMap<Long, Item> fromBytes(@Nullable byte[] raw) {
        NavigableMap<Long, Item> ret = new TreeMap<>();

        if (raw == null)
            return ret;

        try {
            ByteArrayInputStream bis = new ByteArrayInputStream(raw);

            DataInputStream dis = new DataInputStream(bis);

            dis.readByte(); // Version.

            int cnt = dis.readInt(); // Holes count.

            while (cnt-- > 0) {
                Item item = new Item(dis.readLong(), dis.readLong());

                ret.put(item.start, item);
            }

            return ret;
        }
        catch (IOException e) {
            throw new IgniteException(e);
        }
    }

    /** {@inheritDoc} */
    @Override public synchronized void reset() {
        initCntr = 0;

        cntr.set(0);

        reserveCntr.set(0);

        queue.clear();
    }

    /** {@inheritDoc} */
    @Override public void resetInitialCounter() {
        initCntr = 0;
    }

    /**
     * Update counter task. Update from start value by delta value: covers counters {@code (start, start + delta]}.
     */
    private static class Item {
        /** Range start (exclusive). */
        private final long start;

        /** Number of updates in the range; grows when adjacent ranges are merged. */
        private long delta;

        /**
         * @param start Start value.
         * @param delta Delta value.
         */
        private Item(long start, long delta) {
            this.start = start;
            this.delta = delta;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return "Item [" +
                "start=" + start +
                ", delta=" + delta +
                ']';
        }

        /** */
        public long start() {
            return start;
        }

        /** */
        public long delta() {
            return delta;
        }

        /**
         * @return Range end (inclusive), i.e. {@code start + delta}.
         */
        public long absolute() {
            return start + delta;
        }

        /**
         * @param cntr Counter.
         * @return {@code true} if {@code start <= cntr < start + delta}.
         */
        public boolean within(long cntr) {
            return cntr >= start && cntr - start < delta;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            Item item = (Item)o;

            if (start != item.start)
                return false;

            return delta == item.delta;
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            // Must agree with equals(): both fields participate.
            return 31 * Long.hashCode(start) + Long.hashCode(delta);
        }
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (this == o)
            return true;

        if (o == null || getClass() != o.getClass())
            return false;

        PartitionUpdateCounterTrackingImpl other = (PartitionUpdateCounterTrackingImpl)o;

        if (!queue.equals(other.queue))
            return false;

        return cntr.get() == other.cntr.get();
    }

    /** {@inheritDoc} */
    @Override public long reserved() {
        return reserveCntr.get();
    }

    /** {@inheritDoc} */
    @Override public synchronized boolean empty() {
        return get() == 0 && sequential();
    }

    /**
     * Iterates tracked gaps as {@code [start, delta]} pairs.
     * NOTE(review): iterates the live queue without taking the monitor.
     */
    @Override public Iterator<long[]> iterator() {
        return F.iterator(queue.values().iterator(), item -> new long[] {item.start, item.delta}, true);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return "Counter [lwm=" + get() + ", holes=" + queue +
            ", maxApplied=" + highestAppliedCounter() + ", hwm=" + reserveCntr.get() + ']';
    }

    /** {@inheritDoc} */
    @Override public CacheGroupContext context() {
        return grp;
    }
}