blob: 61fc7d3a1163c3e279080b66c565b2bce00c3b87 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.impl;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
/**
* Template (named like Spring's TransactionTemplate & JmsTemplate
* et al) for working with Camel and sending {@link Message} instances in an
* {@link Exchange} to an {@link Endpoint}.
*
* @version
*/
public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate {
private final CamelContext context;
private volatile ProducerCache producerCache;
private volatile ExecutorService executor;
private Endpoint defaultEndpoint;
private int maximumCacheSize;
public DefaultProducerTemplate(CamelContext context) {
this.context = context;
}
public DefaultProducerTemplate(CamelContext context, ExecutorService executor) {
this.context = context;
this.executor = executor;
}
public DefaultProducerTemplate(CamelContext context, Endpoint defaultEndpoint) {
this(context);
this.defaultEndpoint = defaultEndpoint;
}
public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) {
Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri);
return new DefaultProducerTemplate(camelContext, endpoint);
}
public int getMaximumCacheSize() {
return maximumCacheSize;
}
public void setMaximumCacheSize(int maximumCacheSize) {
this.maximumCacheSize = maximumCacheSize;
}
public int getCurrentCacheSize() {
if (producerCache == null) {
return 0;
}
return producerCache.size();
}
public Exchange send(String endpointUri, Exchange exchange) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
return send(endpoint, exchange);
}
public Exchange send(String endpointUri, Processor processor) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
return send(endpoint, processor);
}
public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
return send(endpoint, pattern, processor);
}
public Exchange send(Endpoint endpoint, Exchange exchange) {
getProducerCache().send(endpoint, exchange);
return exchange;
}
public Exchange send(Endpoint endpoint, Processor processor) {
return getProducerCache().send(endpoint, processor);
}
public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
return getProducerCache().send(endpoint, pattern, processor);
}
public Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body) {
Exchange result = send(endpoint, pattern, createSetBodyProcessor(body));
return extractResultBody(result, pattern);
}
public void sendBody(Endpoint endpoint, Object body) throws CamelExecutionException {
Exchange result = send(endpoint, createSetBodyProcessor(body));
// must invoke extract result body in case of exception to be rethrown
extractResultBody(result);
}
public void sendBody(String endpointUri, Object body) throws CamelExecutionException {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
sendBody(endpoint, body);
}
public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) throws CamelExecutionException {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
Object result = sendBody(endpoint, pattern, body);
if (pattern.isOutCapable()) {
return result;
} else {
// return null if not OUT capable
return null;
}
}
public void sendBodyAndHeader(String endpointUri, final Object body, final String header, final Object headerValue) throws CamelExecutionException {
sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
}
public void sendBodyAndHeader(Endpoint endpoint, final Object body, final String header, final Object headerValue) throws CamelExecutionException {
Exchange result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
// must invoke extract result body in case of exception to be rethrown
extractResultBody(result);
}
public Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, final Object body,
final String header, final Object headerValue) throws CamelExecutionException {
Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
Object result = extractResultBody(exchange, pattern);
if (pattern.isOutCapable()) {
return result;
} else {
// return null if not OUT capable
return null;
}
}
public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body,
final String header, final Object headerValue) throws CamelExecutionException {
Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
Object result = extractResultBody(exchange, pattern);
if (pattern.isOutCapable()) {
return result;
} else {
// return null if not OUT capable
return null;
}
}
public void sendBodyAndProperty(String endpointUri, final Object body,
final String property, final Object propertyValue) throws CamelExecutionException {
sendBodyAndProperty(resolveMandatoryEndpoint(endpointUri), body, property, propertyValue);
}
public void sendBodyAndProperty(Endpoint endpoint, final Object body,
final String property, final Object propertyValue) throws CamelExecutionException {
Exchange result = send(endpoint, createBodyAndPropertyProcessor(body, property, propertyValue));
// must invoke extract result body in case of exception to be rethrown
extractResultBody(result);
}
public Object sendBodyAndProperty(Endpoint endpoint, ExchangePattern pattern, final Object body,
final String property, final Object propertyValue) throws CamelExecutionException {
Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
Object result = extractResultBody(exchange, pattern);
if (pattern.isOutCapable()) {
return result;
} else {
// return null if not OUT capable
return null;
}
}
public Object sendBodyAndProperty(String endpoint, ExchangePattern pattern, final Object body,
final String property, final Object propertyValue) throws CamelExecutionException {
Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
Object result = extractResultBody(exchange, pattern);
if (pattern.isOutCapable()) {
return result;
} else {
// return null if not OUT capable
return null;
}
}
public void sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
}
public void sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
Exchange result = send(endpoint, new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
for (Map.Entry<String, Object> header : headers.entrySet()) {
in.setHeader(header.getKey(), header.getValue());
}
in.setBody(body);
}
});
// must invoke extract result body in case of exception to be rethrown
extractResultBody(result);
}
public Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers) throws CamelExecutionException {
return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), pattern, body, headers);
}
public Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
Exchange exchange = send(endpoint, pattern, new Processor() {
public void process(Exchange exchange) throws Exception {
Message in = exchange.getIn();
for (Map.Entry<String, Object> header : headers.entrySet()) {
in.setHeader(header.getKey(), header.getValue());
}
in.setBody(body);
}
});
Object result = extractResultBody(exchange, pattern);
if (pattern.isOutCapable()) {
return result;
} else {
// return null if not OUT capable
return null;
}
}
// Methods using an InOut ExchangePattern
// -----------------------------------------------------------------------
public Exchange request(Endpoint endpoint, Processor processor) {
return send(endpoint, ExchangePattern.InOut, processor);
}
public Object requestBody(Object body) throws CamelExecutionException {
return sendBody(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body);
}
public Object requestBody(Endpoint endpoint, Object body) throws CamelExecutionException {
return sendBody(endpoint, ExchangePattern.InOut, body);
}
public Object requestBodyAndHeader(Object body, String header, Object headerValue) throws CamelExecutionException {
return sendBodyAndHeader(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body, header, headerValue);
}
public Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue) throws CamelExecutionException {
return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
}
public Exchange request(String endpoint, Processor processor) throws CamelExecutionException {
return send(endpoint, ExchangePattern.InOut, processor);
}
public Object requestBody(String endpoint, Object body) throws CamelExecutionException {
return sendBody(endpoint, ExchangePattern.InOut, body);
}
public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) throws CamelExecutionException {
return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
}
public Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers) {
return requestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
}
public Object requestBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
return sendBodyAndHeaders(endpoint, ExchangePattern.InOut, body, headers);
}
public Object requestBodyAndHeaders(final Object body, final Map<String, Object> headers) {
return sendBodyAndHeaders(getDefaultEndpoint(), ExchangePattern.InOut, body, headers);
}
public <T> T requestBody(Object body, Class<T> type) {
Object answer = requestBody(body);
return context.getTypeConverter().convertTo(type, answer);
}
public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) {
Object answer = requestBody(endpoint, body);
return context.getTypeConverter().convertTo(type, answer);
}
public <T> T requestBody(String endpointUri, Object body, Class<T> type) {
Object answer = requestBody(endpointUri, body);
return context.getTypeConverter().convertTo(type, answer);
}
public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) {
Object answer = requestBodyAndHeader(endpoint, body, header, headerValue);
return context.getTypeConverter().convertTo(type, answer);
}
public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) {
Object answer = requestBodyAndHeader(endpointUri, body, header, headerValue);
return context.getTypeConverter().convertTo(type, answer);
}
public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) {
Object answer = requestBodyAndHeaders(endpointUri, body, headers);
return context.getTypeConverter().convertTo(type, answer);
}
public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) {
Object answer = requestBodyAndHeaders(endpoint, body, headers);
return context.getTypeConverter().convertTo(type, answer);
}
// Methods using the default endpoint
// -----------------------------------------------------------------------
public void sendBody(Object body) {
sendBody(getMandatoryDefaultEndpoint(), body);
}
public Exchange send(Exchange exchange) {
return send(getMandatoryDefaultEndpoint(), exchange);
}
public Exchange send(Processor processor) {
return send(getMandatoryDefaultEndpoint(), processor);
}
public void sendBodyAndHeader(Object body, String header, Object headerValue) {
sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
}
public void sendBodyAndProperty(Object body, String property, Object propertyValue) {
sendBodyAndProperty(getMandatoryDefaultEndpoint(), body, property, propertyValue);
}
public void sendBodyAndHeaders(Object body, Map<String, Object> headers) {
sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
}
// Properties
// -----------------------------------------------------------------------
public CamelContext getContext() {
return context;
}
public Endpoint getDefaultEndpoint() {
return defaultEndpoint;
}
public void setDefaultEndpoint(Endpoint defaultEndpoint) {
this.defaultEndpoint = defaultEndpoint;
}
/**
* Sets the default endpoint to use if none is specified
*/
public void setDefaultEndpointUri(String endpointUri) {
setDefaultEndpoint(getContext().getEndpoint(endpointUri));
}
public <T extends Endpoint> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
return context.getEndpoint(endpointUri, expectedClass);
}
// Implementation methods
// -----------------------------------------------------------------------
protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) {
return new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
in.setHeader(header, headerValue);
in.setBody(body);
}
};
}
protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) {
return new Processor() {
public void process(Exchange exchange) {
exchange.setProperty(property, propertyValue);
Message in = exchange.getIn();
in.setBody(body);
}
};
}
protected Processor createSetBodyProcessor(final Object body) {
return new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
in.setBody(body);
}
};
}
protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
Endpoint endpoint = context.getEndpoint(endpointUri);
if (endpoint == null) {
throw new NoSuchEndpointException(endpointUri);
}
return endpoint;
}
protected Endpoint getMandatoryDefaultEndpoint() {
Endpoint answer = getDefaultEndpoint();
ObjectHelper.notNull(answer, "defaultEndpoint");
return answer;
}
protected Object extractResultBody(Exchange result) {
return extractResultBody(result, null);
}
protected Object extractResultBody(Exchange result, ExchangePattern pattern) {
return ExchangeHelper.extractResultBody(result, pattern);
}
public void setExecutorService(ExecutorService executorService) {
this.executor = executorService;
}
public Future<Exchange> asyncSend(final String uri, final Exchange exchange) {
return asyncSend(resolveMandatoryEndpoint(uri), exchange);
}
public Future<Exchange> asyncSend(final String uri, final Processor processor) {
return asyncSend(resolveMandatoryEndpoint(uri), processor);
}
public Future<Object> asyncSendBody(final String uri, final Object body) {
return asyncSendBody(resolveMandatoryEndpoint(uri), body);
}
public Future<Object> asyncRequestBody(final String uri, final Object body) {
return asyncRequestBody(resolveMandatoryEndpoint(uri), body);
}
public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
return asyncRequestBody(resolveMandatoryEndpoint(uri), body, type);
}
public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) {
return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
}
public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue, type);
}
public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) {
return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
}
public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers, type);
}
public <T> T extractFutureBody(Future<Object> future, Class<T> type) {
return ExchangeHelper.extractFutureBody(context, future, type);
}
public <T> T extractFutureBody(Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type);
}
public Future<Object> asyncCallbackSendBody(String uri, Object body, Synchronization onCompletion) {
return asyncCallbackSendBody(resolveMandatoryEndpoint(uri), body, onCompletion);
}
public Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
return asyncCallback(endpoint, ExchangePattern.InOnly, body, onCompletion);
}
public Future<Object> asyncCallbackRequestBody(String uri, Object body, Synchronization onCompletion) {
return asyncCallbackRequestBody(resolveMandatoryEndpoint(uri), body, onCompletion);
}
public Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
return asyncCallback(endpoint, ExchangePattern.InOut, body, onCompletion);
}
public Future<Exchange> asyncCallback(String uri, Exchange exchange, Synchronization onCompletion) {
return asyncCallback(resolveMandatoryEndpoint(uri), exchange, onCompletion);
}
public Future<Exchange> asyncCallback(String uri, Processor processor, Synchronization onCompletion) {
return asyncCallback(resolveMandatoryEndpoint(uri), processor, onCompletion);
}
public Future<Object> asyncRequestBody(final Endpoint endpoint, final Object body) {
Callable<Object> task = new Callable<Object>() {
public Object call() throws Exception {
return requestBody(endpoint, body);
}
};
return getExecutorService().submit(task);
}
public <T> Future<T> asyncRequestBody(final Endpoint endpoint, final Object body, final Class<T> type) {
Callable<T> task = new Callable<T>() {
public T call() throws Exception {
return requestBody(endpoint, body, type);
}
};
return getExecutorService().submit(task);
}
public Future<Object> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header,
final Object headerValue) {
Callable<Object> task = new Callable<Object>() {
public Object call() throws Exception {
return requestBodyAndHeader(endpoint, body, header, headerValue);
}
};
return getExecutorService().submit(task);
}
public <T> Future<T> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header,
final Object headerValue, final Class<T> type) {
Callable<T> task = new Callable<T>() {
public T call() throws Exception {
return requestBodyAndHeader(endpoint, body, header, headerValue, type);
}
};
return getExecutorService().submit(task);
}
public Future<Object> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body,
final Map<String, Object> headers) {
Callable<Object> task = new Callable<Object>() {
public Object call() throws Exception {
return requestBodyAndHeaders(endpoint, body, headers);
}
};
return getExecutorService().submit(task);
}
public <T> Future<T> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body,
final Map<String, Object> headers, final Class<T> type) {
Callable<T> task = new Callable<T>() {
public T call() throws Exception {
return requestBodyAndHeaders(endpoint, body, headers, type);
}
};
return getExecutorService().submit(task);
}
public Future<Exchange> asyncSend(final Endpoint endpoint, final Exchange exchange) {
Callable<Exchange> task = new Callable<Exchange>() {
public Exchange call() throws Exception {
return send(endpoint, exchange);
}
};
return getExecutorService().submit(task);
}
public Future<Exchange> asyncSend(final Endpoint endpoint, final Processor processor) {
Callable<Exchange> task = new Callable<Exchange>() {
public Exchange call() throws Exception {
return send(endpoint, processor);
}
};
return getExecutorService().submit(task);
}
public Future<Object> asyncSendBody(final Endpoint endpoint, final Object body) {
Callable<Object> task = new Callable<Object>() {
public Object call() throws Exception {
sendBody(endpoint, body);
// its InOnly, so no body to return
return null;
}
};
return getExecutorService().submit(task);
}
private Future<Object> asyncCallback(final Endpoint endpoint, final ExchangePattern pattern, final Object body, final Synchronization onCompletion) {
Callable<Object> task = new Callable<Object>() {
public Object call() throws Exception {
Exchange answer = send(endpoint, pattern, createSetBodyProcessor(body));
// invoke callback before returning answer
// as it allows callback to be used without UnitOfWorkProcessor invoking it
// and thus it works directly from a producer template as well, as opposed
// to the UnitOfWorkProcessor that is injected in routes
if (answer.isFailed()) {
onCompletion.onFailure(answer);
} else {
onCompletion.onComplete(answer);
}
Object result = extractResultBody(answer, pattern);
if (pattern.isOutCapable()) {
return result;
} else {
// return null if not OUT capable
return null;
}
}
};
return getExecutorService().submit(task);
}
public Future<Exchange> asyncCallback(final Endpoint endpoint, final Exchange exchange, final Synchronization onCompletion) {
Callable<Exchange> task = new Callable<Exchange>() {
public Exchange call() throws Exception {
// process the exchange, any exception occurring will be caught and set on the exchange
send(endpoint, exchange);
// invoke callback before returning answer
// as it allows callback to be used without UnitOfWorkProcessor invoking it
// and thus it works directly from a producer template as well, as opposed
// to the UnitOfWorkProcessor that is injected in routes
if (exchange.isFailed()) {
onCompletion.onFailure(exchange);
} else {
onCompletion.onComplete(exchange);
}
return exchange;
}
};
return getExecutorService().submit(task);
}
public Future<Exchange> asyncCallback(final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) {
Callable<Exchange> task = new Callable<Exchange>() {
public Exchange call() throws Exception {
// process the exchange, any exception occurring will be caught and set on the exchange
Exchange answer = send(endpoint, processor);
// invoke callback before returning answer
// as it allows callback to be used without UnitOfWorkProcessor invoking it
// and thus it works directly from a producer template as well, as opposed
// to the UnitOfWorkProcessor that is injected in routes
if (answer.isFailed()) {
onCompletion.onFailure(answer);
} else {
onCompletion.onComplete(answer);
}
return answer;
}
};
return getExecutorService().submit(task);
}
private ProducerCache getProducerCache() {
if (!isStarted()) {
throw new IllegalStateException("ProducerTemplate has not been started");
}
return producerCache;
}
private ExecutorService getExecutorService() {
if (!isStarted()) {
throw new IllegalStateException("ProducerTemplate has not been started");
}
if (executor != null) {
return executor;
}
// create a default executor which must be synchronized
synchronized (this) {
if (executor != null) {
return executor;
}
executor = context.getExecutorServiceStrategy().newDefaultThreadPool(this, "ProducerTemplate");
}
ObjectHelper.notNull(executor, "ExecutorService");
return executor;
}
protected void doStart() throws Exception {
if (producerCache == null) {
if (maximumCacheSize > 0) {
producerCache = new ProducerCache(this, context, maximumCacheSize);
} else {
producerCache = new ProducerCache(this, context);
}
}
ServiceHelper.startService(producerCache);
}
protected void doStop() throws Exception {
ServiceHelper.stopService(producerCache);
producerCache = null;
if (executor != null) {
context.getExecutorServiceStrategy().shutdownNow(executor);
executor = null;
}
}
}