blob: fd015f92e90a8f6a634f18dc06443bfdcd3b13d0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.impl.engine;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Message;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.ProcessorFactory;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.concurrent.SynchronousExecutorService;
/**
* Template (named like Spring's TransactionTemplate & JmsTemplate et al) for working with Camel and sending
* {@link Message} instances in an {@link Exchange} to an {@link Endpoint}.
*/
public class DefaultProducerTemplate extends ServiceSupport implements ProducerTemplate {
private final CamelContext camelContext;
private final ProcessorFactory processorFactory;
private volatile ProducerCache producerCache;
private volatile ExecutorService executor;
private Endpoint defaultEndpoint;
private int maximumCacheSize;
private boolean eventNotifierEnabled = true;
private volatile boolean threadedAsyncMode = true;
public DefaultProducerTemplate(CamelContext camelContext) {
this.camelContext = camelContext;
this.processorFactory = camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory();
}
public DefaultProducerTemplate(CamelContext camelContext, ExecutorService executor) {
this.camelContext = camelContext;
this.processorFactory = camelContext.adapt(ExtendedCamelContext.class).getProcessorFactory();
this.executor = executor;
}
public DefaultProducerTemplate(CamelContext camelContext, Endpoint defaultEndpoint) {
this(camelContext);
this.defaultEndpoint = defaultEndpoint;
}
public static DefaultProducerTemplate newInstance(CamelContext camelContext, String defaultEndpointUri) {
Endpoint endpoint = CamelContextHelper.getMandatoryEndpoint(camelContext, defaultEndpointUri);
return new DefaultProducerTemplate(camelContext, endpoint);
}
@Override
public int getMaximumCacheSize() {
return maximumCacheSize;
}
@Override
public void setMaximumCacheSize(int maximumCacheSize) {
this.maximumCacheSize = maximumCacheSize;
}
@Override
public boolean isThreadedAsyncMode() {
return threadedAsyncMode;
}
@Override
public void setThreadedAsyncMode(boolean useExecutor) {
this.threadedAsyncMode = useExecutor;
}
@Override
public int getCurrentCacheSize() {
if (producerCache == null) {
return 0;
}
return producerCache.size();
}
@Override
public boolean isEventNotifierEnabled() {
return eventNotifierEnabled;
}
@Override
public void cleanUp() {
if (producerCache != null) {
producerCache.cleanUp();
}
}
@Override
public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
this.eventNotifierEnabled = eventNotifierEnabled;
// if we already created the cache then adjust its setting as well
if (producerCache != null) {
producerCache.setEventNotifierEnabled(eventNotifierEnabled);
}
}
@Override
public Exchange send(String endpointUri, Exchange exchange) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
return send(endpoint, exchange);
}
@Override
public Exchange send(String endpointUri, Processor processor) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
return send(endpoint, null, processor, null);
}
@Override
public Exchange send(String endpointUri, ExchangePattern pattern, Processor processor) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
return send(endpoint, pattern, processor, null);
}
@Override
public Exchange send(Endpoint endpoint, Exchange exchange) {
return send(endpoint, exchange, null);
}
@Override
public Exchange send(Endpoint endpoint, Processor processor) {
return send(endpoint, null, processor, null);
}
@Override
public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) {
return send(endpoint, pattern, processor, null);
}
@Override
public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor) {
Exchange exchange = pattern != null ? endpoint.createExchange(pattern) : endpoint.createExchange();
if (processor != null) {
try {
processor.process(exchange);
} catch (Exception e) {
exchange.setException(e);
return exchange;
}
}
return send(endpoint, exchange, resultProcessor);
}
public Exchange send(Endpoint endpoint, Exchange exchange, Processor resultProcessor) {
return getProducerCache().send(endpoint, exchange, resultProcessor);
}
@Override
public Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body) {
Exchange result = send(endpoint, pattern, createSetBodyProcessor(body));
return extractResultBody(result, pattern);
}
@Override
public void sendBody(Endpoint endpoint, Object body) throws CamelExecutionException {
Exchange result = send(endpoint, createSetBodyProcessor(body));
// must invoke extract result body in case of exception to be rethrown
extractResultBody(result);
}
@Override
public void sendBody(String endpointUri, Object body) throws CamelExecutionException {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
sendBody(endpoint, body);
}
@Override
public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) throws CamelExecutionException {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
Object result = sendBody(endpoint, pattern, body);
if (pattern == ExchangePattern.InOnly) {
// return null if not OUT capable
return null;
} else {
return result;
}
}
@Override
public void sendBodyAndHeader(String endpointUri, final Object body, final String header, final Object headerValue)
throws CamelExecutionException {
sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
}
@Override
public void sendBodyAndHeader(Endpoint endpoint, final Object body, final String header, final Object headerValue)
throws CamelExecutionException {
Exchange result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
// must invoke extract result body in case of exception to be rethrown
extractResultBody(result);
}
@Override
public Object sendBodyAndHeader(
Endpoint endpoint, ExchangePattern pattern, final Object body,
final String header, final Object headerValue)
throws CamelExecutionException {
Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
Object result = extractResultBody(exchange, pattern);
if (pattern == ExchangePattern.InOnly) {
// return null if not OUT capable
return null;
} else {
return result;
}
}
@Override
public Object sendBodyAndHeader(
String endpoint, ExchangePattern pattern, final Object body,
final String header, final Object headerValue)
throws CamelExecutionException {
Exchange exchange = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
Object result = extractResultBody(exchange, pattern);
if (pattern == ExchangePattern.InOnly) {
// return null if not OUT capable
return null;
} else {
return result;
}
}
@Override
public void sendBodyAndProperty(
String endpointUri, final Object body,
final String property, final Object propertyValue)
throws CamelExecutionException {
sendBodyAndProperty(resolveMandatoryEndpoint(endpointUri), body, property, propertyValue);
}
@Override
public void sendBodyAndProperty(
Endpoint endpoint, final Object body,
final String property, final Object propertyValue)
throws CamelExecutionException {
Exchange result = send(endpoint, createBodyAndPropertyProcessor(body, property, propertyValue));
// must invoke extract result body in case of exception to be rethrown
extractResultBody(result);
}
@Override
public Object sendBodyAndProperty(
Endpoint endpoint, ExchangePattern pattern, final Object body,
final String property, final Object propertyValue)
throws CamelExecutionException {
Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
Object result = extractResultBody(exchange, pattern);
if (pattern == ExchangePattern.InOnly) {
// return null if not OUT capable
return null;
} else {
return result;
}
}
@Override
public Object sendBodyAndProperty(
String endpoint, ExchangePattern pattern, final Object body,
final String property, final Object propertyValue)
throws CamelExecutionException {
Exchange exchange = send(endpoint, pattern, createBodyAndPropertyProcessor(body, property, propertyValue));
Object result = extractResultBody(exchange, pattern);
if (pattern == ExchangePattern.InOnly) {
// return null if not OUT capable
return null;
} else {
return result;
}
}
@Override
public void sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers)
throws CamelExecutionException {
sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
}
@Override
public void sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers)
throws CamelExecutionException {
Exchange result = send(endpoint, createBodyAndHeaders(body, headers));
// must invoke extract result body in case of exception to be rethrown
extractResultBody(result);
}
@Override
public Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map<String, Object> headers)
throws CamelExecutionException {
return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), pattern, body, headers);
}
@Override
public Object sendBodyAndHeaders(
Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers)
throws CamelExecutionException {
Exchange exchange = send(endpoint, pattern, createBodyAndHeaders(body, headers));
Object result = extractResultBody(exchange, pattern);
if (pattern == ExchangePattern.InOnly) {
// return null if not OUT capable
return null;
} else {
return result;
}
}
// Methods using an InOut ExchangePattern
// -----------------------------------------------------------------------
@Override
public Exchange request(Endpoint endpoint, Processor processor) {
return send(endpoint, ExchangePattern.InOut, processor);
}
@Override
public Object requestBody(Object body) throws CamelExecutionException {
return sendBody(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body);
}
@Override
public Object requestBody(Endpoint endpoint, Object body) throws CamelExecutionException {
return sendBody(endpoint, ExchangePattern.InOut, body);
}
@Override
public Object requestBodyAndHeader(Object body, String header, Object headerValue) throws CamelExecutionException {
return sendBodyAndHeader(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body, header, headerValue);
}
@Override
public Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue)
throws CamelExecutionException {
return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
}
@Override
public Exchange request(String endpointUri, Processor processor) throws CamelExecutionException {
return send(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, processor, null);
}
@Override
public Object requestBody(String endpointUri, Object body) throws CamelExecutionException {
return sendBody(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, body);
}
@Override
public Object requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue)
throws CamelExecutionException {
return sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, body, header, headerValue);
}
@Override
public Object requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers) {
return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), ExchangePattern.InOut, body, headers);
}
@Override
public Object requestBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
return sendBodyAndHeaders(endpoint, ExchangePattern.InOut, body, headers);
}
@Override
public Object requestBodyAndHeaders(final Object body, final Map<String, Object> headers) {
return sendBodyAndHeaders(getMandatoryDefaultEndpoint(), ExchangePattern.InOut, body, headers);
}
@Override
public <T> T requestBody(Object body, Class<T> type) {
return requestBody(getMandatoryDefaultEndpoint(), body, type);
}
@Override
public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) {
Exchange exchange
= send(endpoint, ExchangePattern.InOut, createSetBodyProcessor(body), createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer);
}
@Override
public <T> T requestBody(String endpointUri, Object body, Class<T> type) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
Exchange exchange
= send(endpoint, ExchangePattern.InOut, createSetBodyProcessor(body), createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer);
}
@Override
public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type) {
Exchange exchange = send(endpoint, ExchangePattern.InOut, createBodyAndHeaderProcessor(body, header, headerValue),
createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer);
}
@Override
public <T> T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
Exchange exchange = send(endpoint, ExchangePattern.InOut, createBodyAndHeaderProcessor(body, header, headerValue),
createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer);
}
@Override
public <T> T requestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
Exchange exchange
= send(endpoint, ExchangePattern.InOut, createBodyAndHeaders(body, headers), createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer);
}
@Override
public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type) {
Exchange exchange
= send(endpoint, ExchangePattern.InOut, createBodyAndHeaders(body, headers), createConvertBodyProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer);
}
// Methods using the default endpoint
// -----------------------------------------------------------------------
@Override
public void sendBody(Object body) {
sendBody(getMandatoryDefaultEndpoint(), body);
}
@Override
public Exchange send(Exchange exchange) {
return send(getMandatoryDefaultEndpoint(), exchange);
}
@Override
public Exchange send(Processor processor) {
return send(getMandatoryDefaultEndpoint(), processor);
}
@Override
public void sendBodyAndHeader(Object body, String header, Object headerValue) {
sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
}
@Override
public void sendBodyAndProperty(Object body, String property, Object propertyValue) {
sendBodyAndProperty(getMandatoryDefaultEndpoint(), body, property, propertyValue);
}
@Override
public void sendBodyAndHeaders(Object body, Map<String, Object> headers) {
sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
}
// Properties
// -----------------------------------------------------------------------
@Override
public CamelContext getCamelContext() {
return camelContext;
}
@Override
public Endpoint getDefaultEndpoint() {
return defaultEndpoint;
}
@Override
public void setDefaultEndpoint(Endpoint defaultEndpoint) {
this.defaultEndpoint = defaultEndpoint;
}
/**
* Sets the default endpoint to use if none is specified
*/
@Override
public void setDefaultEndpointUri(String endpointUri) {
setDefaultEndpoint(getCamelContext().getEndpoint(endpointUri));
}
// Implementation methods
// -----------------------------------------------------------------------
protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) {
return new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
in.setHeader(header, headerValue);
in.setBody(body);
}
};
}
protected Processor createBodyAndHeaders(final Object body, final Map<String, Object> headers) {
return new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
if (headers != null) {
for (Map.Entry<String, Object> header : headers.entrySet()) {
in.setHeader(header.getKey(), header.getValue());
}
}
in.setBody(body);
}
};
}
protected Processor createBodyAndPropertyProcessor(final Object body, final String property, final Object propertyValue) {
return new Processor() {
public void process(Exchange exchange) {
exchange.setProperty(property, propertyValue);
Message in = exchange.getIn();
in.setBody(body);
}
};
}
protected Processor createSetBodyProcessor(final Object body) {
return new Processor() {
public void process(Exchange exchange) {
Message in = exchange.getIn();
in.setBody(body);
}
};
}
protected Processor createConvertBodyProcessor(final Class<?> type) {
try {
return processorFactory.createProcessor(camelContext, "ConvertBodyProcessor", new Object[] { type });
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeException(e);
}
}
protected Function<Exchange, Exchange> createCompletionFunction(Synchronization onCompletion) {
return answer -> {
// invoke callback before returning answer
// as it allows callback to be used without unit of work invoking it
// and thus it works directly from a producer template as well, as opposed
// to the unit of work that is injected in routes
if (answer.isFailed()) {
onCompletion.onFailure(answer);
} else {
onCompletion.onComplete(answer);
}
return answer;
};
}
protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
Endpoint endpoint = camelContext.getEndpoint(endpointUri);
if (endpoint == null) {
throw new NoSuchEndpointException(endpointUri);
}
return endpoint;
}
protected Endpoint getMandatoryDefaultEndpoint() {
Endpoint answer = getDefaultEndpoint();
ObjectHelper.notNull(answer, "defaultEndpoint");
return answer;
}
protected Object extractResultBody(Exchange result) {
return extractResultBody(result, null);
}
protected Object extractResultBody(Exchange result, ExchangePattern pattern) {
return ExchangeHelper.extractResultBody(result, pattern);
}
@Override
public void setExecutorService(ExecutorService executorService) {
this.executor = executorService;
}
@Override
public CompletableFuture<Exchange> asyncSend(final String uri, final Exchange exchange) {
return asyncSend(resolveMandatoryEndpoint(uri), exchange);
}
@Override
public CompletableFuture<Exchange> asyncSend(final String uri, final Processor processor) {
return asyncSend(resolveMandatoryEndpoint(uri), processor);
}
@Override
public CompletableFuture<Object> asyncSendBody(final String uri, final Object body) {
return asyncSendBody(resolveMandatoryEndpoint(uri), body);
}
@Override
public CompletableFuture<Object> asyncRequestBody(final String uri, final Object body) {
return asyncRequestBody(resolveMandatoryEndpoint(uri), body);
}
@Override
public <T> CompletableFuture<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) {
return asyncRequestBody(resolveMandatoryEndpoint(uri), createSetBodyProcessor(body), type);
}
@Override
public CompletableFuture<Object> asyncRequestBodyAndHeader(
final String endpointUri, final Object body, final String header, final Object headerValue) {
return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
}
@Override
public <T> CompletableFuture<T> asyncRequestBodyAndHeader(
final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) {
return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue, type);
}
@Override
public CompletableFuture<Object> asyncRequestBodyAndHeaders(
final String endpointUri, final Object body, final Map<String, Object> headers) {
return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
}
@Override
public <T> CompletableFuture<T> asyncRequestBodyAndHeaders(
final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) {
return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers, type);
}
@Override
public <T> T extractFutureBody(Future<?> future, Class<T> type) {
return ExchangeHelper.extractFutureBody(camelContext, future, type);
}
@Override
public <T> T extractFutureBody(Future<?> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
return ExchangeHelper.extractFutureBody(camelContext, future, timeout, unit, type);
}
@Override
public CompletableFuture<Object> asyncCallbackSendBody(String uri, Object body, Synchronization onCompletion) {
return asyncCallbackSendBody(resolveMandatoryEndpoint(uri), body, onCompletion);
}
@Override
public CompletableFuture<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
return asyncCallback(endpoint, ExchangePattern.InOnly, body, onCompletion);
}
@Override
public CompletableFuture<Object> asyncCallbackRequestBody(String uri, Object body, Synchronization onCompletion) {
return asyncCallbackRequestBody(resolveMandatoryEndpoint(uri), body, onCompletion);
}
@Override
public CompletableFuture<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) {
return asyncCallback(endpoint, ExchangePattern.InOut, body, onCompletion);
}
@Override
public CompletableFuture<Exchange> asyncCallback(String uri, Exchange exchange, Synchronization onCompletion) {
return asyncCallback(resolveMandatoryEndpoint(uri), exchange, onCompletion);
}
@Override
public CompletableFuture<Exchange> asyncCallback(String uri, Processor processor, Synchronization onCompletion) {
return asyncCallback(resolveMandatoryEndpoint(uri), processor, onCompletion);
}
@Override
public CompletableFuture<Object> asyncRequestBody(final Endpoint endpoint, final Object body) {
return asyncRequestBody(endpoint, createSetBodyProcessor(body));
}
@Override
public <T> CompletableFuture<T> asyncRequestBody(Endpoint endpoint, Object body, Class<T> type) {
return asyncRequestBody(endpoint, createSetBodyProcessor(body), type);
}
@Override
public CompletableFuture<Object> asyncRequestBodyAndHeader(
final Endpoint endpoint, final Object body, final String header,
final Object headerValue) {
return asyncRequestBody(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
}
protected <T> CompletableFuture<T> asyncRequestBody(final Endpoint endpoint, Processor processor, final Class<T> type) {
return asyncRequestBody(endpoint, processor, createConvertBodyProcessor(type))
.thenApply(answer -> camelContext.getTypeConverter().convertTo(type, answer));
}
@Override
public <T> CompletableFuture<T> asyncRequestBodyAndHeader(
final Endpoint endpoint, final Object body, final String header,
final Object headerValue, final Class<T> type) {
return asyncRequestBody(endpoint, createBodyAndHeaderProcessor(body, header, headerValue), type);
}
@Override
public CompletableFuture<Object> asyncRequestBodyAndHeaders(
final Endpoint endpoint, final Object body,
final Map<String, Object> headers) {
return asyncRequestBody(endpoint, createBodyAndHeaders(body, headers));
}
@Override
public <T> CompletableFuture<T> asyncRequestBodyAndHeaders(
final Endpoint endpoint, final Object body,
final Map<String, Object> headers, final Class<T> type) {
return asyncRequestBody(endpoint, createBodyAndHeaders(body, headers), type);
}
@Override
public CompletableFuture<Exchange> asyncSend(final Endpoint endpoint, final Exchange exchange) {
return asyncSendExchange(endpoint, null, null, null, exchange);
}
@Override
public CompletableFuture<Exchange> asyncSend(final Endpoint endpoint, final Processor processor) {
return asyncSend(endpoint, null, processor, null);
}
@Override
public CompletableFuture<Object> asyncSendBody(final Endpoint endpoint, final Object body) {
return asyncSend(endpoint, createSetBodyProcessor(body))
.thenApply(this::extractResultBody);
}
@Override
public CompletableFuture<Exchange> asyncCallback(
final Endpoint endpoint, final Exchange exchange, final Synchronization onCompletion) {
return asyncSend(endpoint, exchange).thenApply(createCompletionFunction(onCompletion));
}
@Override
public CompletableFuture<Exchange> asyncCallback(
final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) {
return asyncSend(endpoint, processor).thenApply(createCompletionFunction(onCompletion));
}
protected CompletableFuture<Object> asyncRequestBody(final Endpoint endpoint, Processor processor) {
return asyncRequestBody(endpoint, processor, (Processor) null);
}
protected CompletableFuture<Object> asyncRequestBody(
final Endpoint endpoint, Processor processor, Processor resultProcessor) {
return asyncRequest(endpoint, processor, resultProcessor)
.thenApply(e -> extractResultBody(e, ExchangePattern.InOut));
}
protected CompletableFuture<Exchange> asyncRequest(
Endpoint endpoint, Processor processor,
Processor resultProcessor) {
return asyncSend(endpoint, ExchangePattern.InOut, processor, resultProcessor);
}
protected CompletableFuture<Exchange> asyncSend(
Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor) {
return asyncSendExchange(endpoint, pattern, processor, resultProcessor, null);
}
protected CompletableFuture<Exchange> asyncSendExchange(
Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor,
Exchange inExchange) {
CompletableFuture<Exchange> exchangeFuture = new CompletableFuture<>();
getExecutorService().submit(() -> getProducerCache().asyncSendExchange(endpoint, pattern, processor,
resultProcessor, inExchange, exchangeFuture));
return exchangeFuture;
}
protected CompletableFuture<Object> asyncCallback(
final Endpoint endpoint, final ExchangePattern pattern,
final Object body, final Synchronization onCompletion) {
return asyncSend(endpoint, pattern, createSetBodyProcessor(body), null)
.thenApply(createCompletionFunction(onCompletion))
.thenApply(answer -> {
Object result = extractResultBody(answer, pattern);
if (pattern == ExchangePattern.InOnly) {
// return null if not OUT capable
return null;
} else {
return result;
}
});
}
private org.apache.camel.spi.ProducerCache getProducerCache() {
if (!isStarted()) {
throw new IllegalStateException("ProducerTemplate has not been started");
}
return producerCache;
}
private ExecutorService getExecutorService() {
if (!isStarted()) {
throw new IllegalStateException("ProducerTemplate has not been started");
}
if (executor == null) {
// create a default executor which must be synchronized
synchronized (lock) {
if (executor == null) {
if (threadedAsyncMode) {
executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "ProducerTemplate");
} else {
executor = new SynchronousExecutorService();
}
}
}
}
return executor;
}
@Override
protected void doInit() throws Exception {
// need to lookup default endpoint as it may have been intercepted
if (defaultEndpoint != null) {
defaultEndpoint = camelContext.getEndpoint(defaultEndpoint.getEndpointUri());
}
producerCache = new DefaultProducerCache(this, camelContext, maximumCacheSize);
producerCache.setEventNotifierEnabled(isEventNotifierEnabled());
ServiceHelper.initService(producerCache);
}
@Override
protected void doStart() throws Exception {
ServiceHelper.startService(producerCache);
}
@Override
protected void doStop() throws Exception {
ServiceHelper.stopService(producerCache);
if (executor != null) {
camelContext.getExecutorServiceManager().shutdownNow(executor);
executor = null;
}
}
@Override
protected void doShutdown() throws Exception {
ServiceHelper.stopAndShutdownService(producerCache);
producerCache = null;
}
}