blob: 17e9611a6eaf7dbd6088d2d7dbcb9337c563e6f3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.loadtests.cache;
import java.util.Arrays;
import java.util.concurrent.Callable;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteAtomicLong;
import org.apache.ignite.IgniteAtomicReference;
import org.apache.ignite.IgniteAtomicSequence;
import org.apache.ignite.IgniteAtomicStamped;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
/**
* Cache data structures load test.
*/
public final class GridCacheDataStructuresLoadTest extends GridCacheAbstractLoadTest {
/** Atomic long name. */
private static final String TEST_LONG_NAME = "test-atomic-long";
/** Atomic reference name. */
private static final String TEST_REF_NAME = "test-atomic-ref";
/** Atomic sequence name. */
private static final String TEST_SEQ_NAME = "test-atomic-seq";
/** Atomic stamped name. */
private static final String TEST_STAMP_NAME = "test-atomic-stamp";
/** Queue name. */
private static final String TEST_QUEUE_NAME = "test-queue";
/** Count down latch name. */
private static final String TEST_LATCH_NAME = "test-latch";
/** Semaphore name. */
private static final String TEST_SEMAPHORE_NAME = "test-semaphore";
/** Reentrant lock name. */
private static final String TEST_REENTRANT_LOCK_NAME = "test-reentrant-lock";
/** */
private static final CollectionConfiguration colCfg = new CollectionConfiguration();
/** Maximum added value. */
private static final int MAX_INT = 1000;
/** Count down latch initial count. */
private static final int LATCH_INIT_CNT = 1000;
/** Semaphore initial count. */
private static final int SEMAPHORE_INIT_CNT = 1000;
/** */
private static final boolean LONG = false;
/** */
private static final boolean REF = false;
/** */
private static final boolean SEQ = false;
/** */
private static final boolean STAMP = false;
/** */
private static final boolean QUEUE = false;
/** */
private static final boolean LATCH = true;
/** */
private static final boolean SEMAPHORE = true;
/** */
private static final boolean REENTRANTLOCK = true;
/** */
private GridCacheDataStructuresLoadTest() {
// No-op
}
/** Atomic long write closure. */
private final CIX1<Ignite> longWriteClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
for (int i = 0; i < operationsPerTx; i++) {
al.addAndGet(RAND.nextInt(MAX_INT));
long cnt = writes.incrementAndGet();
if (cnt % WRITE_LOG_MOD == 0)
info("Performed " + cnt + " writes.");
}
}
};
/** Atomic long read closure. */
private final CIX1<Ignite> longReadClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteAtomicLong al = ignite.atomicLong(TEST_LONG_NAME, 0, true);
for (int i = 0; i < operationsPerTx; i++) {
al.get();
long cnt = reads.incrementAndGet();
if (cnt % READ_LOG_MOD == 0)
info("Performed " + cnt + " reads.");
}
}
};
/** Atomic reference write closure. */
private final CIX1<Ignite> refWriteClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME,
null, true);
for (int i = 0; i < operationsPerTx; i++) {
ar.set(RAND.nextInt(MAX_INT));
long cnt = writes.incrementAndGet();
if (cnt % WRITE_LOG_MOD == 0)
info("Performed " + cnt + " writes.");
}
}
};
/** Atomic reference read closure. */
private final CIX1<Ignite> refReadClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteAtomicReference<Integer> ar = ignite.atomicReference(TEST_REF_NAME, null,
true);
for (int i = 0; i < operationsPerTx; i++) {
ar.get();
long cnt = reads.incrementAndGet();
if (cnt % READ_LOG_MOD == 0)
info("Performed " + cnt + " reads.");
}
}
};
/** Atomic sequence write closure. */
private final CIX1<Ignite> seqWriteClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
for (int i = 0; i < operationsPerTx; i++) {
as.addAndGet(RAND.nextInt(MAX_INT) + 1);
long cnt = writes.incrementAndGet();
if (cnt % WRITE_LOG_MOD == 0)
info("Performed " + cnt + " writes.");
}
}
};
/** Atomic sequence read closure. */
private final CIX1<Ignite> seqReadClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteAtomicSequence as = ignite.atomicSequence(TEST_SEQ_NAME, 0, true);
for (int i = 0; i < operationsPerTx; i++) {
as.get();
long cnt = reads.incrementAndGet();
if (cnt % READ_LOG_MOD == 0)
info("Performed " + cnt + " reads.");
}
}
};
/** Atomic stamped write closure. */
private final CIX1<Ignite> stampWriteClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
0, 0, true);
for (int i = 0; i < operationsPerTx; i++) {
as.set(RAND.nextInt(MAX_INT), RAND.nextInt(MAX_INT));
long cnt = writes.incrementAndGet();
if (cnt % WRITE_LOG_MOD == 0)
info("Performed " + cnt + " writes.");
}
}
};
/** Atomic stamped read closure. */
private final CIX1<Ignite> stampReadClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteAtomicStamped<Integer, Integer> as = ignite.atomicStamped(TEST_STAMP_NAME,
0, 0, true);
for (int i = 0; i < operationsPerTx; i++) {
as.get();
long cnt = reads.incrementAndGet();
if (cnt % READ_LOG_MOD == 0)
info("Performed " + cnt + " reads.");
}
}
};
/** Queue write closure. */
private final CIX1<Ignite> queueWriteClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
for (int i = 0; i < operationsPerTx; i++) {
q.put(RAND.nextInt(MAX_INT));
long cnt = writes.incrementAndGet();
if (cnt % WRITE_LOG_MOD == 0)
info("Performed " + cnt + " writes.");
}
}
};
/** Queue read closure. */
private final CIX1<Ignite> queueReadClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteQueue<Integer> q = ignite.queue(TEST_QUEUE_NAME, 0, colCfg);
for (int i = 0; i < operationsPerTx; i++) {
q.peek();
long cnt = reads.incrementAndGet();
if (cnt % READ_LOG_MOD == 0)
info("Performed " + cnt + " reads.");
}
}
};
/** Count down latch write closure. */
private final CIX1<Ignite> latchWriteClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
for (int i = 0; i < operationsPerTx; i++) {
l.countDown();
long cnt = writes.incrementAndGet();
if (cnt % WRITE_LOG_MOD == 0)
info("Performed " + cnt + " writes.");
}
}
};
/** Count down latch read closure. */
private final CIX1<Ignite> latchReadClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteCountDownLatch l = ignite.countDownLatch(TEST_LATCH_NAME, LATCH_INIT_CNT, true, true);
for (int i = 0; i < operationsPerTx; i++) {
l.count();
long cnt = reads.incrementAndGet();
if (cnt % READ_LOG_MOD == 0)
info("Performed " + cnt + " reads.");
}
}
};
/** Semaphore write closure. */
private final CIX1<Ignite> semaphoreWriteClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteSemaphore s = ignite.semaphore(TEST_SEMAPHORE_NAME, SEMAPHORE_INIT_CNT, false, true);
for (int i = 0; i < operationsPerTx; i++) {
if ((i % 2) == 0)
s.release();
else
s.acquire();
long cnt = writes.incrementAndGet();
if (cnt % WRITE_LOG_MOD == 0)
info("Performed " + cnt + " writes.");
}
}
};
/** Semaphore read closure. */
private final CIX1<Ignite> semaphoreReadClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteSemaphore s = ignite.semaphore(TEST_SEMAPHORE_NAME, SEMAPHORE_INIT_CNT, false, true);
for (int i = 0; i < operationsPerTx; i++) {
s.availablePermits();
long cnt = reads.incrementAndGet();
if (cnt % READ_LOG_MOD == 0)
info("Performed " + cnt + " reads.");
}
}
};
/** Reentrant lock read closure. */
private final CIX1<Ignite> reentrantLockReadClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteLock r = ignite.reentrantLock(TEST_REENTRANT_LOCK_NAME, true, false, true);
for (int i = 0; i < operationsPerTx; i++) {
r.isLocked();
long cnt = reads.incrementAndGet();
if (cnt % READ_LOG_MOD == 0)
info("Performed " + cnt + " reads.");
}
}
};
/** Reentrant lock write closure. */
private final CIX1<Ignite> reentrantLockWriteClos =
new CIX1<Ignite>() {
@Override public void applyx(Ignite ignite) {
IgniteLock r = ignite.reentrantLock(TEST_REENTRANT_LOCK_NAME, true, false, true);
for (int i = 0; i < operationsPerTx; i++) {
if ((i % 2) == 0)
r.lock();
else
r.unlock();
long cnt = writes.incrementAndGet();
if (cnt % WRITE_LOG_MOD == 0)
info("Performed " + cnt + " writes.");
}
}
};
/**
* @param args Arguments.
* @throws IgniteCheckedException In case of error.
*/
public static void main(String[] args) throws IgniteCheckedException {
System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "false");
System.out.println("Starting master node [params=" + Arrays.toString(args) + ']');
String cfg = args.length >= 1 ? args[0] : CONFIG_FILE;
String log = args.length >= 2 ? args[1] : LOG_FILE;
final GridCacheDataStructuresLoadTest test = new GridCacheDataStructuresLoadTest();
try (Ignite g = Ignition.start(test.configuration(cfg, log))) {
System.gc();
if (LONG) {
info("Testing atomic long...");
test.loadTestIgnite(test.longWriteClos, test.longReadClos);
}
System.gc();
if (REF) {
info("Testing atomic reference...");
test.loadTestIgnite(test.refWriteClos, test.refReadClos);
}
System.gc();
if (SEQ) {
info("Testing atomic sequence...");
test.loadTestIgnite(test.seqWriteClos, test.seqReadClos);
}
System.gc();
if (STAMP) {
info("Testing atomic stamped...");
test.loadTestIgnite(test.stampWriteClos, test.stampReadClos);
}
System.gc();
if (QUEUE) {
info("Testing queue...");
test.loadTestIgnite(test.queueWriteClos, test.queueReadClos);
}
System.gc();
if (LATCH) {
info("Testing count down latch...");
test.loadTestIgnite(test.latchWriteClos, test.latchReadClos);
}
System.gc();
if (SEMAPHORE) {
info("Testing semaphore...");
test.loadTestIgnite(test.semaphoreWriteClos, test.semaphoreReadClos);
}
System.gc();
if (REENTRANTLOCK) {
info("Testing reentrant lock...");
test.loadTestIgnite(test.reentrantLockWriteClos, test.reentrantLockReadClos);
}
}
}
/**
* @param writeClos Write closure.
* @param readClos ReadClosure.
*/
protected void loadTestIgnite(final CIX1<Ignite> writeClos, final CIX1<Ignite> readClos) {
info("Read threads: " + readThreads());
info("Write threads: " + writeThreads());
info("Test duration (ms): " + testDuration);
final Ignite ignite = G.ignite();
final IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
assert cache != null;
try {
IgniteInternalFuture<?> f1 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
long start = System.currentTimeMillis();
while (!done.get()) {
if (tx) {
try (Transaction tx = ignite.transactions().txStart()) {
writeClos.apply(ignite);
tx.commit();
}
}
else
writeClos.apply(ignite);
}
writeTime.addAndGet(System.currentTimeMillis() - start);
return null;
}
}, writeThreads(), "cache-load-test-worker");
IgniteInternalFuture<?> f2 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
@Nullable @Override public Object call() throws Exception {
long start = System.currentTimeMillis();
while (!done.get()) {
if (tx) {
try (Transaction tx = ignite.transactions().txStart()) {
readClos.apply(ignite);
tx.commit();
}
}
else
readClos.apply(ignite);
}
readTime.addAndGet(System.currentTimeMillis() - start);
return null;
}
}, readThreads(), "cache-load-test-worker");
Thread.sleep(testDuration);
done.set(true);
f1.get();
f2.get();
info("Test stats: ");
info(" total-threads = " + threads);
info(" write-ratio = " + writeRatio);
info(" total-runs = " + (reads.get() + writes.get()));
info(" total-reads = " + reads);
info(" total-writes = " + writes);
info(" read-time (ms) = " + readTime);
info(" write-time (ms) = " + writeTime);
info(" avg-read-time (ms) = " + ((double)readTime.get() / reads.get()));
info(" avg-write-time (ms) = " + ((double)writeTime.get() / writes.get()));
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}