blob: 0bc75ff00e47841b9e7a2d9962daf29b4ece7504 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.util.future;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;
/**
* Performance tests added to compare the same functionality in .Net.
*/
public class GridFutureQueueTest {
/**
* @param args Arguments.
* @throws Exception If failed.
*/
public static void main(String[] args) throws Exception {
int threads = 64;
for (int i = 0; i < 3; i++)
new QueueTest().testQueue(30000, threads);
}
/**
*/
private static class Message {
/** */
public final long id;
/**
* @param id Message id.
*/
Message(long id) {
this.id = id;
}
}
/**
*
*/
private static class Future<T> extends GridFutureAdapter<T> {
/** */
private Message msg;
/**
* @param msg Message.
*/
Future(Message msg) {
this.msg = msg;
}
}
/**
*
*/
private static class QueueTest {
/** */
private AtomicLong qSize = new AtomicLong();
/** */
private final Deque<Future> queue = new ConcurrentLinkedDeque<>();
/** */
private volatile boolean stop;
/** */
private final Object mux = new Object();
/** */
private AtomicLong cnt = new AtomicLong();
/**
* @param time Test time.
* @param writers Number of writers threads.
* @throws Exception If failed.
*/
public void testQueue(long time, int writers) throws Exception {
System.out.println("Start test [writers=" + writers + ", time=" + time + "]");
Thread rdr = new Thread() {
@SuppressWarnings({"InfiniteLoopStatement", "unchecked"})
@Override public void run() {
try {
while (true) {
Future fut;
while ((fut = queue.poll()) != null) {
qSize.decrementAndGet();
fut.onDone(true);
}
long size = qSize.get();
if (size == 0) {
synchronized (mux) {
while (queue.isEmpty())
mux.wait();
}
}
}
}
catch (InterruptedException ignore) {
}
}
};
rdr.start();
List<Thread> putThreads = new ArrayList<>();
for (int i = 0; i < writers; i++) {
putThreads.add(new Thread() {
@SuppressWarnings("CallToNotifyInsteadOfNotifyAll")
@Override public void run() {
try {
while (!stop) {
long id = cnt.incrementAndGet();
Future<Boolean> fut = new Future<Boolean>(new Message(id));
queue.offer(fut);
long size = qSize.incrementAndGet();
if (size == 1) {
synchronized (mux) {
mux.notify();
}
}
Boolean res = fut.get();
if (!res)
System.out.println("Error");
}
}
catch (Exception e ) {
e.printStackTrace();
}
}
});
}
for (Thread t : putThreads)
t.start();
Thread.sleep(time);
stop = true;
for (Thread t : putThreads)
t.join();
rdr.interrupt();
rdr.join();
System.out.println("Total: " + cnt.get());
System.gc();
System.gc();
System.gc();
}
}
}