// blob: 2cbee19e3408e9818b967927bb9885ea61169a65 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package org.apache.storm.perf.queuetest;
import java.util.Collections;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.WaitStrategyPark;
import org.apache.storm.utils.JCQueue;
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class JCQueuePerfTest {
// Usage: Let it and then explicitly terminate.
// Metrics will be printed when application is terminated.
public static void main(String[] args) throws Exception {
// oneProducer1Consumer(1000); // -- measurement 1
// twoProducer1Consumer(1000); // -- measurement 2
// threeProducer1Consumer(1); // -- measurement 3
// oneProducer2Consumers(); // -- measurement 4
// producerFwdConsumer(); // -- measurement 5
// ackingProducerSimulation(); // -- measurement 6
while (true) {
Thread.sleep(1000);
}
}
private static void ackingProducerSimulation() {
WaitStrategyPark ws = new WaitStrategyPark(100);
StormMetricRegistry registry = new StormMetricRegistry();
JCQueue spoutQ = new JCQueue("spoutQ", "spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
JCQueue ackQ = new JCQueue("ackQ", "ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
final Acker acker = new Acker(ackQ, spoutQ);
runAllThds(ackingProducer, acker);
}
private static void producerFwdConsumer(int prodBatchSz) {
WaitStrategyPark ws = new WaitStrategyPark(100);
StormMetricRegistry registry = new StormMetricRegistry();
JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test",
Collections.singletonList(1000), 1000, registry);
JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
final Producer prod = new Producer(q1);
final Forwarder fwd = new Forwarder(q1, q2);
final Consumer cons = new Consumer(q2);
runAllThds(prod, fwd, cons);
}
private static void oneProducer1Consumer(int prodBatchSz) {
JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
Collections.singletonList(1000), 1000, new StormMetricRegistry());
final Producer prod1 = new Producer(q1);
final Consumer cons1 = new Consumer(q1);
runAllThds(prod1, cons1);
}
private static void twoProducer1Consumer(int prodBatchSz) {
JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
Collections.singletonList(1000), 1000, new StormMetricRegistry());
final Producer prod1 = new Producer(q1);
final Producer prod2 = new Producer(q1);
final Consumer cons1 = new Consumer(q1);
runAllThds(prod1, prod2, cons1);
}
private static void threeProducer1Consumer(int prodBatchSz) {
JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
Collections.singletonList(1000), 1000, new StormMetricRegistry());
final Producer prod1 = new Producer(q1);
final Producer prod2 = new Producer(q1);
final Producer prod3 = new Producer(q1);
final Consumer cons1 = new Consumer(q1);
runAllThds(prod1, prod2, prod3, cons1);
}
private static void oneProducer2Consumers(int prodBatchSz) {
WaitStrategyPark ws = new WaitStrategyPark(100);
StormMetricRegistry registry = new StormMetricRegistry();
JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
final Producer2 prod1 = new Producer2(q1, q2);
final Consumer cons1 = new Consumer(q1);
final Consumer cons2 = new Consumer(q2);
runAllThds(prod1, cons1, cons2);
}
public static void runAllThds(MyThread... threads) {
for (Thread thread : threads) {
thread.start();
}
addShutdownHooks(threads);
}
public static void addShutdownHooks(MyThread... threads) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
System.err.println("Stopping");
for (Thread thread : threads) {
thread.interrupt();
}
for (Thread thread : threads) {
System.err.println("Waiting for " + thread.getName());
thread.join();
}
for (MyThread thread : threads) {
System.err.printf("%s : %d, Throughput: %,d \n", thread.getName(), thread.count, thread.throughput());
}
} catch (InterruptedException e) {
return;
}
}));
}
}