blob: b07615067d7c91f18d65eaf15cd4debb4842b61c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache;
import java.util.Arrays;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionMetrics;
import org.junit.Test;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
/**
* Transactional cache metrics test.
*/
public abstract class GridCacheTransactionalAbstractMetricsSelfTest extends GridCacheAbstractMetricsSelfTest {
/** */
private static final int TX_CNT = 3;
/** Transaction timeout. */
private static final long TX_TIMEOUT = 500L;
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticReadCommittedCommits() throws Exception {
testCommits(OPTIMISTIC, READ_COMMITTED, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticReadCommittedCommitsNoData() throws Exception {
testCommits(OPTIMISTIC, READ_COMMITTED, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticRepeatableReadCommits() throws Exception {
testCommits(OPTIMISTIC, REPEATABLE_READ, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticRepeatableReadCommitsNoData() throws Exception {
testCommits(OPTIMISTIC, REPEATABLE_READ, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticSerializableCommits() throws Exception {
testCommits(OPTIMISTIC, SERIALIZABLE, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticSerializableCommitsNoData() throws Exception {
testCommits(OPTIMISTIC, SERIALIZABLE, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticReadCommittedCommits() throws Exception {
testCommits(PESSIMISTIC, READ_COMMITTED, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticReadCommittedCommitsNoData() throws Exception {
testCommits(PESSIMISTIC, READ_COMMITTED, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticRepeatableReadCommits() throws Exception {
testCommits(PESSIMISTIC, REPEATABLE_READ, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticRepeatableReadCommitsNoData() throws Exception {
testCommits(PESSIMISTIC, REPEATABLE_READ, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticSerializableCommits() throws Exception {
testCommits(PESSIMISTIC, SERIALIZABLE, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticSerializableCommitsNoData() throws Exception {
testCommits(PESSIMISTIC, SERIALIZABLE, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticReadCommittedRollbacks() throws Exception {
testRollbacks(OPTIMISTIC, READ_COMMITTED, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticReadCommittedRollbacksNoData() throws Exception {
testRollbacks(OPTIMISTIC, READ_COMMITTED, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticRepeatableReadRollbacks() throws Exception {
testRollbacks(OPTIMISTIC, REPEATABLE_READ, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticRepeatableReadRollbacksNoData() throws Exception {
testRollbacks(OPTIMISTIC, REPEATABLE_READ, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticSerializableRollbacks() throws Exception {
testRollbacks(OPTIMISTIC, SERIALIZABLE, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticSerializableRollbacksNoData() throws Exception {
testRollbacks(OPTIMISTIC, SERIALIZABLE, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticReadCommittedRollbacks() throws Exception {
testRollbacks(PESSIMISTIC, READ_COMMITTED, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticReadCommittedRollbacksNoData() throws Exception {
testRollbacks(PESSIMISTIC, READ_COMMITTED, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticRepeatableReadRollbacks() throws Exception {
testRollbacks(PESSIMISTIC, REPEATABLE_READ, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticRepeatableReadRollbacksNoData() throws Exception {
testRollbacks(PESSIMISTIC, REPEATABLE_READ, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticSerializableRollbacks() throws Exception {
testRollbacks(PESSIMISTIC, SERIALIZABLE, true);
}
/**
* @throws Exception If failed.
*/
@Test
public void testPessimisticSerializableRollbacksNoData() throws Exception {
testRollbacks(PESSIMISTIC, SERIALIZABLE, false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticSuspendedReadCommittedTxTimeoutRollbacks() throws Exception {
doTestSuspendedTxTimeoutRollbacks(OPTIMISTIC, READ_COMMITTED);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticSuspendedRepeatableReadTxTimeoutRollbacks() throws Exception {
doTestSuspendedTxTimeoutRollbacks(OPTIMISTIC, REPEATABLE_READ);
}
/**
* @throws Exception If failed.
*/
@Test
public void testOptimisticSuspendedSerializableTxTimeoutRollbacks() throws Exception {
doTestSuspendedTxTimeoutRollbacks(OPTIMISTIC, SERIALIZABLE);
}
/** */
@Test
public void testCommitTime() {
IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
HistogramMetricImpl m = metric("CommitTime");
assertTrue(Arrays.stream(m.value()).allMatch(v -> v == 0));
try (Transaction tx = grid(0).transactions().txStart()) {
cache.put(1, 1);
tx.commit();
}
assertEquals(1, Arrays.stream(m.value()).filter(v -> v == 1).count());
}
/** */
@Test
public void testRollbackTime() {
IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
HistogramMetricImpl m = metric("RollbackTime");
assertTrue(Arrays.stream(m.value()).allMatch(v -> v == 0));
try (Transaction tx = grid(0).transactions().txStart()) {
cache.put(1, 1);
tx.rollback();
}
assertEquals(1, Arrays.stream(m.value()).filter(v -> v == 1).count());
}
/**
* @param concurrency Concurrency control.
* @param isolation Isolation level.
* @param put Put some data if {@code true}.
* @throws Exception If failed.
*/
private void testCommits(TransactionConcurrency concurrency, TransactionIsolation isolation, boolean put)
throws Exception {
IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < TX_CNT; i++) {
Transaction tx = grid(0).transactions().txStart(concurrency, isolation);
if (put)
for (int j = 0; j < keyCount(); j++)
cache.put(j, j);
// Waiting 30 ms for metrics. U.currentTimeMillis() method has 10 ms discretization.
U.sleep(30);
tx.commit();
}
for (int i = 0; i < gridCount(); i++) {
TransactionMetrics metrics = grid(i).transactions().metrics();
CacheMetrics cacheMetrics = grid(i).cache(DEFAULT_CACHE_NAME).localMetrics();
if (i == 0) {
assertEquals(TX_CNT, metrics.txCommits());
if (put) {
assertEquals(TX_CNT, cacheMetrics.getCacheTxCommits());
// Expected metric value should be in microseconds.
assert cacheMetrics.getAverageTxCommitTime() > 1000 : cacheMetrics.getAverageTxCommitTime();
}
}
else {
assertEquals(0, metrics.txCommits());
assertEquals(0, cacheMetrics.getCacheTxCommits());
}
assertEquals(0, metrics.txRollbacks());
assertEquals(0, cacheMetrics.getCacheTxRollbacks());
}
}
/**
* @param concurrency Concurrency control.
* @param isolation Isolation level.
* @param put Put some data if {@code true}.
* @throws Exception If failed.
*/
private void testRollbacks(TransactionConcurrency concurrency, TransactionIsolation isolation,
boolean put) throws Exception {
IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < TX_CNT; i++) {
Transaction tx = grid(0).transactions().txStart(concurrency, isolation);
if (put)
for (int j = 0; j < keyCount(); j++)
cache.put(j, j);
// Waiting 30 ms for metrics. U.currentTimeMillis() method has 10 ms discretization.
U.sleep(30);
tx.rollback();
}
for (int i = 0; i < gridCount(); i++) {
TransactionMetrics metrics = grid(i).transactions().metrics();
CacheMetrics cacheMetrics = grid(i).cache(DEFAULT_CACHE_NAME).localMetrics();
assertEquals(0, metrics.txCommits());
assertEquals(0, cacheMetrics.getCacheTxCommits());
if (i == 0) {
assertEquals(TX_CNT, metrics.txRollbacks());
if (put) {
assertEquals(TX_CNT, cacheMetrics.getCacheTxRollbacks());
// Expected metric value should be in microseconds.
assert cacheMetrics.getAverageTxRollbackTime() > 1000 : cacheMetrics.getAverageTxRollbackTime();
}
}
else {
assertEquals(0, metrics.txRollbacks());
assertEquals(0, cacheMetrics.getCacheTxRollbacks());
}
}
}
/**
* Metrics test for transaction timeout rollback.
*
* @param concurrency Concurrency control.
* @param isolation Isolation level.
* @throws Exception If failed.
*/
private void doTestSuspendedTxTimeoutRollbacks(TransactionConcurrency concurrency, TransactionIsolation isolation)
throws Exception {
IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);
IgniteTransactions transactions = grid(0).transactions();
for (int i = 0; i < TX_CNT; i++) {
Transaction tx = transactions.txStart(concurrency, isolation, TX_TIMEOUT, 0);
cache.put(1, 1);
tx.suspend();
boolean res = GridTestUtils.waitForCondition(() -> tx.state() == ROLLED_BACK, TX_TIMEOUT * 10);
assertTrue(res);
tx.close();
}
TransactionMetrics txMetrics = transactions.metrics();
CacheMetrics cacheMetrics = cache.localMetrics();
assertEquals(0, txMetrics.txCommits());
assertEquals(0, cacheMetrics.getCacheTxCommits());
assertEquals(TX_CNT, txMetrics.txRollbacks());
assertEquals(TX_CNT, cacheMetrics.getCacheTxRollbacks());
assertTrue(cacheMetrics.getAverageTxRollbackTime() > 0);
for (int i = 1; i < gridCount(); i++) {
txMetrics = grid(i).transactions().metrics();
cacheMetrics = grid(i).cache(DEFAULT_CACHE_NAME).localMetrics();
assertEquals(0, txMetrics.txCommits());
assertEquals(0, cacheMetrics.getCacheTxCommits());
assertEquals(0, txMetrics.txRollbacks());
assertEquals(0, cacheMetrics.getCacheTxRollbacks());
}
}
}