blob: 11c672f1aae2e54cd9739cbb9a72863c9ca78454 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.tests.component;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.util.ExchangeHelper;
public class PerformanceTestComponent extends DefaultComponent {
public static final String HEADER_THREADS = "CamelPerfThreads";
public static final String HEADER_ITERATIONS = "CamelPerfIterations";
private static final int DEFAULT_THREADS = 8;
private static final int DEFAULT_ITERATIONS = 100;
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
Endpoint endpoint = new PerformanceTestEndpoint(uri, this);
setProperties(endpoint, parameters);
return endpoint;
}
public static int getHeaderValue(Exchange exchange, String header) {
Integer value = exchange.getContext().getTypeConverter().convertTo(Integer.class, exchange, exchange.getIn().getHeader(header));
return value != null ? value
: header.equals(HEADER_THREADS) ? DEFAULT_THREADS
: header.equals(HEADER_ITERATIONS) ? DEFAULT_ITERATIONS : 0;
}
private static final class PerformanceTestEndpoint extends DefaultEndpoint {
private PerformanceTestConsumer consumer;
protected PerformanceTestEndpoint(String uri, Component component) {
super(uri, component);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
synchronized (this) {
if (consumer != null && processor != consumer.getProcessor()) {
throw new Exception("PerformanceTestEndpoint doesn not support multiple consumers per Endpoint");
}
consumer = new PerformanceTestConsumer(this, processor);
}
return consumer;
}
@Override
public Producer createProducer() throws Exception {
return new PerformanceTestProducer(this);
}
@Override
public boolean isSingleton() {
return true;
}
public Consumer getConsumer() {
return consumer;
}
}
private static final class PerformanceTestConsumer extends DefaultConsumer {
protected PerformanceTestConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
}
}
private static final class PerformanceTestProducer extends DefaultProducer implements AsyncProcessor {
protected PerformanceTestProducer(Endpoint endpoint) {
super(endpoint);
}
@Override
public void process(final Exchange exchange) throws Exception {
final int count = getHeaderValue(exchange, HEADER_ITERATIONS);
final int threads = getHeaderValue(exchange, HEADER_THREADS);
PerformanceTestEndpoint endpoint = (PerformanceTestEndpoint)getEndpoint();
if (endpoint != null) {
final DefaultConsumer consumer = (DefaultConsumer)endpoint.getConsumer();
ExecutorService executor = exchange.getContext().getExecutorServiceManager().newFixedThreadPool(this, "perf", threads);
CompletionService<Exchange> tasks = new ExecutorCompletionService<Exchange>(executor);
// StopWatch watch = new StopWatch(); // if we want to clock how long it takes
for (int i = 0; i < count; i++) {
tasks.submit(new Callable<Exchange>() {
@Override
public Exchange call() throws Exception {
Exchange exch = ExchangeHelper.createCopy(exchange, false);
try {
consumer.getProcessor().process(exch);
} catch (final Exception e) {
exch.setException(e);
}
return exch;
}
});
}
for (int i = 0; i < count; i++) {
// Future<Exchange> result = tasks.take();
tasks.take(); // wait for all exchanges to complete
}
}
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
this.process(exchange);
} catch (Exception e) {
exchange.setException(e);
}
callback.done(true);
return true;
}
}
}