// blob: cab77867215c09ed6a113f595524914ae85f1dfe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.transactions;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
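/**
 * Tests partition update counter state in a topology with one primary and one backup node when transaction
 * prepares and commits are applied out of order and the primary node is stopped before all commits are applied.
 */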
public class TxPartitionCounterStateOnePrimaryOneBackupTest extends TxPartitionCounterStateAbstractTest {
/** Transaction indexes in the order in which their prepares are released. */
private static final int[] PREPARE_ORDER = {0, 1, 2};

/** Transaction indexes in the order in which their commits are released on the primary node. */
private static final int[] PRIMARY_COMMIT_ORDER = {2, 1, 0};

/** Transaction indexes in the order in which their commits are released on the backup node. */
private static final int[] BACKUP_COMMIT_ORDER = {1, 2, 0};

/** Sizes of the tested transactions, indexed by transaction index. */
private static final int[] SIZES = {5, 7, 3};

/** Expected total: sum of all transaction sizes plus the preloaded keys count. */
private static final int TOTAL = IntStream.of(SIZES).sum() + PRELOAD_KEYS_CNT;

/** Id of the partition the scenario is run on. */
private static final int PARTITION_ID = 0;

/** Backups count passed to the scenario runner. */
private static final int BACKUPS = 1;

/** Server nodes count passed to the scenario runner and awaited in topology. */
private static final int SERVERS_CNT = 2;
/**
 * Runs {@link #doTestPrepareCommitReorder(boolean)} with {@code skipCheckpoint} set to {@code false}.
 *
 * @throws Exception If failed.
 */
@Test
public void testPrepareCommitReorder() throws Exception {
doTestPrepareCommitReorder(false);
}
/**
 * Runs {@link #doTestPrepareCommitReorder(boolean)} with {@code skipCheckpoint} set to {@code true}.
 *
 * @throws Exception If failed.
 */
@Test
public void testPrepareCommitReorderSkipCheckpoint() throws Exception {
doTestPrepareCommitReorder(true);
}
/**
 * Runs {@link #doTestPrepareCommitReorder_2(boolean)} (primary restart in the middle of rebalance) with
 * {@code skipCheckpoint} set to {@code false}.
 *
 * @throws Exception If failed.
 */
@Test
public void testPrepareCommitReorderFailRebalance() throws Exception {
doTestPrepareCommitReorder_2(false);
}
/**
 * Runs {@link #doTestPrepareCommitReorder_2(boolean)} (primary restart in the middle of rebalance) with
 * {@code skipCheckpoint} set to {@code true}.
 *
 * @throws Exception If failed.
 */
@Test
public void testPrepareCommitReorderFailRebalanceSkipCheckpoint() throws Exception {
doTestPrepareCommitReorder_2(true);
}
/**
 * Reproduces the problem: if a partition was marked for clearing on rebalance, it must never be selected for
 * historical rebalance, otherwise only the partial data defined by the counter range will be loaded.
 * <p>
 * After the scenario of {@link #runTest(boolean)} the stopped primary is restarted and partitions are verified,
 * then the backup is restarted after additional updates and partitions are verified once more.
 *
 * @param skipCheckpoint Skip checkpoint.
 * @throws Exception If failed.
 */
private void doTestPrepareCommitReorder(boolean skipCheckpoint) throws Exception {
    Map<Integer, T2<Ignite, List<Ignite>>> topByPart = runTest(skipCheckpoint);

    T2<Ignite, List<Ignite>> nodes = topByPart.get(PARTITION_ID);

    waitForTopology(SERVERS_CNT);

    awaitPartitionMapExchange();

    IgniteEx clientNode = grid(CLIENT_GRID_NAME);

    assertEquals("Primary has not all committed transactions", TOTAL, clientNode.cache(DEFAULT_CACHE_NAME).size());

    TestRecordingCommunicationSpi.stopBlockAll();

    String primaryName = nodes.get1().name();
    String backupName = nodes.get2().get(0).name();

    IgniteEx restartedPrimary = startGrid(primaryName);

    awaitPartitionMapExchange();

    assertPartitionsSame(idleVerify(clientNode, DEFAULT_CACHE_NAME));

    // Check if gaps are closed on rebalance.
    PartitionUpdateCounter partCntr = counter(PARTITION_ID, restartedPrimary.name());

    assertNotNull(partCntr);
    assertTrue(partCntr.sequential());
    assertEquals(TOTAL, partCntr.get());

    stopGrid(backupName);

    awaitPartitionMapExchange();

    partCntr = counter(PARTITION_ID, restartedPrimary.name());

    assertNotNull(partCntr);
    assertEquals(TOTAL, partCntr.reserved());

    // Make update to advance a counter.
    int extraKeys = 10;

    loadDataToPartition(PARTITION_ID, primaryName, DEFAULT_CACHE_NAME, extraKeys, TOTAL);

    // TODO https://issues.apache.org/jira/browse/IGNITE-11607
    // Historical rebalance is not possible from history containing rebalanced entries.
    // Next rebalance will be full.
    IgniteEx restartedBackup = startGrid(backupName);

    awaitPartitionMapExchange();

    partCntr = counter(PARTITION_ID, restartedBackup.name());

    assertNotNull(partCntr);
    assertEquals(TOTAL + extraKeys, partCntr.get());
    assertEquals(TOTAL + extraKeys, partCntr.reserved());

    assertPartitionsSame(idleVerify(clientNode, DEFAULT_CACHE_NAME));
}
/**
 * Adds primary restart in the middle of rebalance.
 * <p>
 * After the scenario of {@link #runTest(boolean)} supply messages for the default cache group sent by the backup
 * are blocked. The primary is started to trigger rebalance, then stopped and started again from a separate thread
 * once a supply message is blocked. Finally partitions are verified to be the same.
 *
 * @param skipCheckpoint Skip checkpoint.
 * @throws Exception If failed.
 */
private void doTestPrepareCommitReorder_2(boolean skipCheckpoint) throws Exception {
    Map<Integer, T2<Ignite, List<Ignite>>> txTops = runTest(skipCheckpoint);

    T2<Ignite, List<Ignite>> txTop = txTops.get(PARTITION_ID);

    String primaryName = txTop.get1().name();
    String backupName = txTop.get2().get(0).name();

    IgniteEx client = grid(CLIENT_GRID_NAME);

    waitForTopology(SERVERS_CNT);

    // If not wait for PME backup might not switch to primary.
    awaitPartitionMapExchange();

    PartitionUpdateCounter cntr2 = counter(PARTITION_ID, backupName);

    assertNotNull(cntr2);
    assertEquals(TOTAL, cntr2.get());
    assertEquals(TOTAL, cntr2.reserved());
    assertTrue(cntr2.sequential());

    assertEquals("Backup has not all committed transactions", TOTAL, client.cache(DEFAULT_CACHE_NAME).size());

    TestRecordingCommunicationSpi.stopBlockAll();

    TestRecordingCommunicationSpi backupSpi = TestRecordingCommunicationSpi.spi(grid(backupName));

    // Block only supply messages belonging to the default cache group.
    backupSpi.blockMessages((node, msg) -> {
        if (msg instanceof GridDhtPartitionSupplyMessage) {
            GridDhtPartitionSupplyMessage m0 = (GridDhtPartitionSupplyMessage)msg;

            return m0.groupId() == CU.cacheId(DEFAULT_CACHE_NAME);
        }

        return false;
    });

    // Restart primary during rebalance.
    IgniteInternalFuture<?> fut = multithreadedAsync(() -> {
        try {
            backupSpi.waitForBlocked();
        }
        catch (InterruptedException e) {
            // Keep the cause so that the failure is diagnosable.
            throw new AssertionError("Unexpected interruption", e);
        }

        stopGrid(skipCheckpoint, primaryName);

        backupSpi.stopBlock();

        try {
            startGrid(primaryName);

            awaitPartitionMapExchange();
        }
        catch (Exception e) {
            // Keep the cause so that the failure is diagnosable.
            throw new AssertionError("Failed to restart primary node [name=" + primaryName + ']', e);
        }
    }, 1);

    // Trigger rebalance.
    startGrid(primaryName);

    // Wait for restart.
    fut.get();

    assertPartitionsSame(idleVerify(client, DEFAULT_CACHE_NAME));
}
/**
 * Callback which enforces the configured order of prepares and commits of the tracked transactions.
 * <p>
 * Each intercepted step stores its "proceed" future in a map keyed by the near transaction version. The futures
 * are then completed one at a time in the order given by the corresponding queue of transaction indexes.
 */
protected class OnePhaseCommitTxCallbackAdapter extends TxCallbackAdapter {
    /** Transaction indexes in the order in which prepares are released. */
    private final Queue<Integer> prepOrder;

    /** Transaction indexes in the order in which futures from {@link #primFinishFuts} are released. */
    private final Queue<Integer> primCommitOrder;

    /** Transaction indexes in the order in which futures from {@link #backupFinishFuts} are released. */
    private final Queue<Integer> backupCommitOrder;

    /** Futures stored in {@link #beforePrimaryPrepare}, by near tx version. */
    private final Map<IgniteUuid, GridFutureAdapter<?>> prepFuts = new ConcurrentHashMap<>();

    /** Futures stored in {@link #afterBackupPrepare}, by near tx version. */
    private final Map<IgniteUuid, GridFutureAdapter<?>> primFinishFuts = new ConcurrentHashMap<>();

    /** Futures stored in {@link #beforeBackupPrepare}, by near tx version. */
    private final Map<IgniteUuid, GridFutureAdapter<?>> backupFinishFuts = new ConcurrentHashMap<>();

    /** Number of tracked transactions (length of the prepare order). */
    private final int txCnt;

    /**
     * @param prepOrd Prepare order.
     * @param primCommitOrder Commit order on primary.
     * @param backupCommitOrder Commit order on backup.
     */
    public OnePhaseCommitTxCallbackAdapter(int[] prepOrd, int[] primCommitOrder, int[] backupCommitOrder) {
        this.txCnt = prepOrd.length;

        prepOrder = new ConcurrentLinkedQueue<>();

        for (int aPrepOrd : prepOrd)
            prepOrder.add(aPrepOrd);

        this.primCommitOrder = new ConcurrentLinkedQueue<>();

        for (int aCommitOrd : primCommitOrder)
            this.primCommitOrder.add(aCommitOrd);

        this.backupCommitOrder = new ConcurrentLinkedQueue<>();

        for (int aCommitOrd : backupCommitOrder)
            this.backupCommitOrder.add(aCommitOrd);
    }

    /**
     * Called for each transaction from {@link #beforeBackupPrepare}. The returned value is ignored by this class.
     *
     * @param primary Primary node.
     * @param tx Primary transaction.
     * @param idx Index.
     * @return {@code false} in this implementation.
     */
    protected boolean onPrimaryPrepared(IgniteEx primary, IgniteInternalTx tx, int idx) {
        log.info("TX: prepared on primary [name=" + primary.name() + ", txId=" + idx + ", tx=" + CU.txString(tx) + ']');

        return false;
    }

    /**
     * @param primary Primary node.
     */
    protected void onAllPrimaryPrepared(IgniteEx primary) {
        log.info("TX: all primary prepared [name=" + primary.name() + ']');
    }

    /**
     * @param primary Primary node.
     * @param idx Index.
     * @return {@code true} to stop releasing further commits on primary.
     */
    protected boolean onPrimaryCommitted(IgniteEx primary, int idx) {
        log.info("TX: primary committed [name=" + primary.name() + ", txId=" + idx + ']');

        return false;
    }

    /**
     * @param backup Backup node.
     * @param idx Index.
     * @return {@code true} to stop releasing further commits on backup.
     */
    protected boolean onBackupCommitted(IgniteEx backup, int idx) {
        log.info("TX: backup committed [name=" + backup.name() + ", txId=" + idx + ']');

        return false;
    }

    /**
     * @param primary Primary node.
     */
    protected void onAllPrimaryCommitted(IgniteEx primary) {
        log.info("TX: all primary committed [name=" + primary.name() + ']');
    }

    /**
     * @param backup Backup node.
     */
    protected void onAllBackupCommitted(IgniteEx backup) {
        log.info("TX: all backup committed [name=" + backup.name() + ']');
    }

    /**
     * Stores the proceed future and, once futures of all transactions are collected, releases the first prepare
     * according to the prepare order.
     *
     * {@inheritDoc}
     */
    @Override public boolean beforePrimaryPrepare(IgniteEx primary, IgniteUuid nearXidVer,
        GridFutureAdapter<?> proceedFut) {
        runAsync(() -> {
            prepFuts.put(nearXidVer, proceedFut);

            // Order prepares.
            if (prepFuts.size() == txCnt) { // Wait until all prep requests queued and force prepare order.
                prepFuts.remove(version(prepOrder.poll())).onDone();
            }
        });

        return true;
    }

    /**
     * Stores the proceed future. When no prepares are left and futures of all transactions are collected, releases
     * the first future according to the backup commit order, otherwise releases the next prepare.
     *
     * {@inheritDoc}
     */
    @Override public boolean beforeBackupPrepare(IgniteEx primary, IgniteEx backup, IgniteInternalTx primaryTx,
        GridFutureAdapter<?> proceedFut) {
        runAsync(() -> {
            IgniteUuid nearXidVer = primaryTx.nearXidVersion().asIgniteUuid();

            onPrimaryPrepared(primary, primaryTx, order(nearXidVer));

            backupFinishFuts.put(nearXidVer, proceedFut);

            if (prepOrder.isEmpty() && backupFinishFuts.size() == txCnt) {
                onAllPrimaryPrepared(primary);

                assertEquals(txCnt, backupFinishFuts.size());

                backupFinishFuts.remove(version(backupCommitOrder.poll())).onDone();

                return;
            }

            prepFuts.remove(version(prepOrder.poll())).onDone();
        });

        return true;
    }

    /**
     * Stores the proceed future and notifies {@link #onBackupCommitted}. When no backup commits are left and futures
     * of all transactions are collected, releases the first future according to the primary commit order, otherwise
     * releases the next backup commit.
     *
     * {@inheritDoc}
     */
    @Override public boolean afterBackupPrepare(IgniteEx primary, IgniteEx backup, IgniteInternalTx backupTx, IgniteUuid nearXidVer,
        GridFutureAdapter<?> proceedFut) {
        runAsync(() -> {
            primFinishFuts.put(nearXidVer, proceedFut);

            if (onBackupCommitted(backup, order(nearXidVer)))
                return;

            if (backupCommitOrder.isEmpty() && primFinishFuts.size() == txCnt) {
                // The hook expects the backup node, not the primary one.
                onAllBackupCommitted(backup);

                assertEquals(txCnt, primFinishFuts.size());

                primFinishFuts.remove(version(primCommitOrder.poll())).onDone();

                return;
            }

            backupFinishFuts.remove(version(backupCommitOrder.poll())).onDone();
        });

        return true;
    }

    /**
     * Notifies {@link #onPrimaryCommitted} and, unless it stops the chain, releases the next future according to
     * the primary commit order or reports that all transactions are committed on primary.
     *
     * {@inheritDoc}
     */
    @Override public boolean afterPrimaryPrepare(IgniteEx primary, @Nullable IgniteInternalTx tx, IgniteUuid nearXidVer,
        GridFutureAdapter<?> proceedFut) {
        runAsync(() -> {
            if (onPrimaryCommitted(primary, order(nearXidVer)))
                return;

            if (primCommitOrder.isEmpty()) {
                onAllPrimaryCommitted(primary);

                return;
            }

            primFinishFuts.remove(version(primCommitOrder.poll())).onDone();
        });

        return false;
    }
}
/**
 * Runs the following scenario on partition {@link #PARTITION_ID}:
 * <ol>
 * <li>Assign counters for tx0, tx1, tx2 out of order.</li>
 * <li>Commit all txs on backup.</li>
 * <li>Commit only tx2 on primary.</li>
 * <li>Stop primary node.</li>
 * </ol>
 * Pass condition: after primary start partitions are in sync.
 *
 * @param skipCheckpoint Skip checkpoint.
 * @return Result of {@code runOnPartition}: a map whose entry for a partition holds the primary node and the list
 *      of backup nodes.
 * @throws Exception If failed.
 */
private Map<Integer, T2<Ignite, List<Ignite>>> runTest(boolean skipCheckpoint) throws Exception {
    IgniteClosure<Map<Integer, T2<Ignite, List<Ignite>>>, TxCallback> cbFactory =
        new IgniteClosure<Map<Integer, T2<Ignite, List<Ignite>>>, TxCallback>() {
            /** Topology map received in {@link #apply}. */
            private Map<Integer, T2<Ignite, List<Ignite>>> topology;

            /** {@inheritDoc} */
            @Override public TxCallback apply(Map<Integer, T2<Ignite, List<Ignite>>> map) {
                topology = map;

                return new OnePhaseCommitTxCallbackAdapter(PREPARE_ORDER, PRIMARY_COMMIT_ORDER, BACKUP_COMMIT_ORDER) {
                    /** {@inheritDoc} */
                    @Override protected boolean onPrimaryCommitted(IgniteEx primary, int idx) {
                        // Only the first transaction in the primary commit order is allowed to commit.
                        if (idx != PRIMARY_COMMIT_ORDER[0])
                            throw new IgniteException("Should not commit other transactions");

                        PartitionUpdateCounter primaryCntr = counter(PARTITION_ID, primary.name());

                        assertNotNull(primaryCntr);
                        assertEquals(TOTAL, primaryCntr.reserved());
                        assertFalse(primaryCntr.sequential());

                        // First out-of-order update: [start, delta].
                        long[] firstGap = primaryCntr.iterator().next();

                        int expStart =
                            PRELOAD_KEYS_CNT + SIZES[PRIMARY_COMMIT_ORDER[1]] + SIZES[PRIMARY_COMMIT_ORDER[2]];

                        assertEquals(primaryCntr.toString(), expStart, firstGap[0]);

                        int expDelta = SIZES[PRIMARY_COMMIT_ORDER[0]];

                        assertEquals(primaryCntr.toString(), expDelta, firstGap[1]);

                        // Check backup counter.
                        String backupName = topology.get(PARTITION_ID).get2().get(0).name();

                        PartitionUpdateCounter backupCntr = counter(PARTITION_ID, backupName);

                        assertNotNull(backupCntr);

                        assertFalse(primary.name().equals(backupName));

                        assertEquals(TOTAL, backupCntr.get());
                        assertEquals(0, backupCntr.reserved());
                        assertTrue(backupCntr.sequential());

                        assertEquals(TOTAL, grid(backupName).cache(DEFAULT_CACHE_NAME).localSize(CachePeekMode.BACKUP));

                        // Will stop primary node before all remaining commits are applied.
                        stopGrid(skipCheckpoint, primary.name());

                        return true; // Stop commit chain.
                    }
                };
            }
        };

    return super.runOnPartition(PARTITION_ID, null, BACKUPS, SERVERS_CNT, cbFactory, SIZES);
}
}