blob: 26f0a23472c60cdcef8fba1262826ea3b9445e96 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.performancestatistics;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.ClientType.CLIENT;
import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.ClientType.SERVER;
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET;
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_ALL;
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_PUT;
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_GET_AND_REMOVE;
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_INVOKE;
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_INVOKE_ALL;
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_LOCK;
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT;
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_PUT_ALL;
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE;
import static org.apache.ignite.internal.processors.performancestatistics.OperationType.CACHE_REMOVE_ALL;
/**
* Tests performance statistics.
*/
@RunWith(Parameterized.class)
@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
public class PerformanceStatisticsSelfTest extends AbstractPerformanceStatisticsTest {
/** Nodes count. */
private static final int NODES_CNT = 2;
/** Cache entry count. */
private static final int ENTRY_COUNT = 100;
/** Test entry processor. */
private static final EntryProcessor<Object, Object, Object> ENTRY_PROC =
new EntryProcessor<Object, Object, Object>() {
@Override public Object process(MutableEntry<Object, Object> entry, Object... arguments)
throws EntryProcessorException {
return null;
}
};
/** Test cache entry processor. */
private static final CacheEntryProcessor<Object, Object, Object> CACHE_ENTRY_PROC =
new CacheEntryProcessor<Object, Object, Object>() {
@Override public Object process(MutableEntry<Object, Object> entry, Object... arguments)
throws EntryProcessorException {
return null;
}
};
/** Client type to run operations from. */
@Parameterized.Parameter
public ClientType clientType;
/** @return Test parameters. */
@Parameterized.Parameters(name = "clientType={0}")
public static Collection<?> parameters() {
return Arrays.asList(new Object[][] {{SERVER}, {CLIENT}});
}
/** Ignite. */
private static IgniteEx srv;
/** Ignite node to run load from. */
private static IgniteEx node;
/** Test cache. */
private static IgniteCache<Object, Object> cache;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setCacheConfiguration(defaultCacheConfiguration());
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
srv = startGrid(NODES_CNT - 1);
IgniteEx client = startClientGrid(NODES_CNT);
node = clientType == SERVER ? srv : client;
cache = node.cache(DEFAULT_CACHE_NAME);
for (int i = 0; i < ENTRY_COUNT; i++)
cache.put(i, i);
}
/** @throws Exception If failed. */
@Test
public void testCompute() throws Exception {
String testTaskName = "testTask";
int executions = 5;
long startTime = U.currentTimeMillis();
startCollectStatistics();
IgniteRunnable task = new IgniteRunnable() {
@Override public void run() {
// No-op.
}
};
for (int i = 0; i < executions; i++)
node.compute().withName(testTaskName).run(task);
HashMap<IgniteUuid, Integer> sessions = new HashMap<>();
AtomicInteger tasks = new AtomicInteger();
AtomicInteger jobs = new AtomicInteger();
stopCollectStatisticsAndRead(new TestHandler() {
@Override public void task(UUID nodeId, IgniteUuid sesId, String taskName, long taskStartTime,
long duration, int affPartId) {
sessions.compute(sesId, (uuid, cnt) -> cnt == null ? 1 : ++cnt);
tasks.incrementAndGet();
assertEquals(node.context().localNodeId(), nodeId);
assertEquals(testTaskName, taskName);
assertTrue(taskStartTime >= startTime);
assertTrue(duration >= 0);
assertEquals(-1, affPartId);
}
@Override public void job(UUID nodeId, IgniteUuid sesId, long queuedTime, long jobStartTime, long duration,
boolean timedOut) {
sessions.compute(sesId, (uuid, cnt) -> cnt == null ? 1 : ++cnt);
jobs.incrementAndGet();
assertEquals(srv.context().localNodeId(), nodeId);
assertTrue(queuedTime >= 0);
assertTrue(jobStartTime >= startTime);
assertTrue(duration >= 0);
assertFalse(timedOut);
}
});
assertEquals(executions, tasks.get());
assertEquals(executions, jobs.get());
Collection<Integer> vals = sessions.values();
assertEquals(executions, vals.size());
assertTrue("Invalid sessions: " + sessions, vals.stream().allMatch(cnt -> cnt == NODES_CNT));
}
/** @throws Exception If failed. */
@Test
public void testCacheOperation() throws Exception {
checkCacheOperation(CACHE_PUT, cache -> cache.put(1, 1));
checkCacheOperation(CACHE_PUT, cache -> cache.putAsync(2, 2).get());
checkCacheOperation(CACHE_PUT_ALL, cache -> cache.putAll(Collections.singletonMap(3, 3)));
checkCacheOperation(CACHE_PUT_ALL, cache -> cache.putAllAsync(Collections.singletonMap(4, 4)).get());
checkCacheOperation(CACHE_GET, cache -> cache.get(1));
checkCacheOperation(CACHE_GET, cache -> cache.getAsync(2).get());
checkCacheOperation(CACHE_GET_AND_PUT, cache -> cache.getAndPut(1, 1));
checkCacheOperation(CACHE_GET_AND_PUT, cache -> cache.getAndPutAsync(2, 2).get());
checkCacheOperation(CACHE_GET_ALL, cache -> cache.getAll(Collections.singleton(1)));
checkCacheOperation(CACHE_GET_ALL, cache -> cache.getAllAsync(Collections.singleton(2)).get());
checkCacheOperation(CACHE_GET_ALL, cache -> cache.getAllOutTx(Collections.singleton(1)));
checkCacheOperation(CACHE_GET_ALL, cache -> cache.getAllOutTxAsync(Collections.singleton(2)).get());
checkCacheOperation(CACHE_REMOVE, cache -> cache.remove(1));
checkCacheOperation(CACHE_REMOVE, cache -> cache.removeAsync(2).get());
checkCacheOperation(CACHE_REMOVE_ALL, cache -> cache.removeAll(Collections.singleton(3)));
checkCacheOperation(CACHE_REMOVE_ALL, cache -> cache.removeAllAsync(Collections.singleton(4)).get());
checkCacheOperation(CACHE_GET_AND_REMOVE, cache -> cache.getAndRemove(5));
checkCacheOperation(CACHE_GET_AND_REMOVE, cache -> cache.getAndRemoveAsync(6).get());
checkCacheOperation(CACHE_LOCK, cache -> {
Lock lock = cache.lock(7);
lock.lock();
lock.unlock();
});
checkCacheOperation(CACHE_LOCK, cache -> {
Lock lock = cache.lockAll(Collections.singleton(8));
lock.lock();
lock.unlock();
});
checkCacheOperation(CACHE_INVOKE, cache -> cache.invoke(10, ENTRY_PROC));
checkCacheOperation(CACHE_INVOKE, cache -> cache.invokeAsync(10, ENTRY_PROC).get());
checkCacheOperation(CACHE_INVOKE, cache -> cache.invoke(10, CACHE_ENTRY_PROC));
checkCacheOperation(CACHE_INVOKE, cache -> cache.invokeAsync(10, CACHE_ENTRY_PROC).get());
checkCacheOperation(CACHE_INVOKE_ALL, cache -> cache.invokeAll(Collections.singleton(10), ENTRY_PROC));
checkCacheOperation(CACHE_INVOKE_ALL,
cache -> cache.invokeAllAsync(Collections.singleton(10), ENTRY_PROC).get());
checkCacheOperation(CACHE_INVOKE_ALL, cache -> cache.invokeAll(Collections.singleton(10), CACHE_ENTRY_PROC));
checkCacheOperation(CACHE_INVOKE_ALL,
cache -> cache.invokeAllAsync(Collections.singleton(10), CACHE_ENTRY_PROC).get());
}
/** Checks cache operation. */
private void checkCacheOperation(OperationType op, Consumer<IgniteCache<Object, Object>> clo) throws Exception {
long startTime = U.currentTimeMillis();
cleanPerformanceStatisticsDir();
startCollectStatistics();
clo.accept(cache);
AtomicInteger ops = new AtomicInteger();
stopCollectStatisticsAndRead(new TestHandler() {
@Override public void cacheOperation(UUID nodeId, OperationType type, int cacheId, long opStartTime,
long duration) {
ops.incrementAndGet();
assertEquals(node.context().localNodeId(), nodeId);
assertEquals(op, type);
assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), cacheId);
assertTrue(opStartTime >= startTime);
assertTrue(duration >= 0);
}
});
assertEquals(1, ops.get());
}
/** @throws Exception If failed. */
@Test
public void testTransaction() throws Exception {
checkTx(true);
checkTx(false);
}
/** @param commited {@code True} if check transaction commited. */
private void checkTx(boolean commited) throws Exception {
long startTime = U.currentTimeMillis();
cleanPerformanceStatisticsDir();
startCollectStatistics();
try (Transaction tx = node.transactions().txStart()) {
for (int i = 0; i < 10; i++)
cache.put(i, i * 2);
if (commited)
tx.commit();
else
tx.rollback();
}
AtomicInteger txs = new AtomicInteger();
stopCollectStatisticsAndRead(new TestHandler() {
@Override public void transaction(UUID nodeId, GridIntList cacheIds, long txStartTime, long duration,
boolean txCommited) {
txs.incrementAndGet();
assertEquals(node.context().localNodeId(), nodeId);
assertEquals(1, cacheIds.size());
assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), cacheIds.get(0));
assertTrue(txStartTime >= startTime);
assertTrue(duration >= 0);
assertEquals(commited, txCommited);
}
});
assertEquals(1, txs.get());
}
}