blob: c5af29ac6642e7908ad840ea662c908a15908126 [file] [log] [blame]
[[wireTap-eip]]
= Wire Tap EIP
:page-source: core/camel-core-engine/src/main/docs/eips/wireTap-eip.adoc
http://www.enterpriseintegrationpatterns.com/WireTap.html[Wire Tap]
(from the xref:enterprise-integration-patterns.adoc[EIP patterns])
allows you to route messages to a separate location while they are being
forwarded to the ultimate destination.
image::eip/WireTap.gif[image]
== Streams
If you xref:wireTap-eip.adoc[Wire Tap] a stream message body then you
should consider enabling xref:stream-caching.adoc[Stream caching] to
ensure the message body can be read at each endpoint. See more details
at xref:stream-caching.adoc[Stream caching].
== Options
// eip options: START
The Wire Tap EIP supports 11 options which are listed below:
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
| *processorRef* | Reference to a Processor to use for creating a new body as the message to use for wire tapping | | String
| *body* | Uses the expression for creating a new body as the message to use for wire tapping | | NamespaceAware Expression
| *executorServiceRef* | Uses a custom thread pool | | String
| *copy* | Uses a copy of the original exchange | true | Boolean
| *dynamicUri* | Whether the uri is dynamic or static. If the uri is dynamic then the simple language is used to evaluate a dynamic uri to use as the wire-tap destination, for each incoming message. This works similar to how the toD EIP pattern works. If static then the uri is used as-is as the wire-tap destination. | true | Boolean
| *onPrepareRef* | Uses the Processor when preparing the org.apache.camel.Exchange to be send. This can be used to deep-clone messages that should be send, or any custom logic needed before the exchange is send. | | String
| *uri* | *Required* The uri of the endpoint to send to. The uri can be dynamic computed using the org.apache.camel.language.simple.SimpleLanguage expression. | | String
| *pattern* | Sets the optional ExchangePattern used to invoke this endpoint | | ExchangePattern
| *cacheSize* | Sets the maximum size used by the org.apache.camel.spi.ConsumerCache which is used to cache and reuse producers. | | Integer
| *ignoreInvalidEndpoint* | Ignore the invalidate endpoint exception when try to create a producer with that endpoint | false | Boolean
| *allowOptimisedComponents* | Whether to allow components to optimise toD if they are org.apache.camel.spi.SendDynamicAware. | true | Boolean
|===
// eip options: END
== WireTap Thread pool
The WireTap uses a thread pool to process the
tapped messages. This thread pool will by default use the settings
detailed at xref:threading-model.adoc[Threading Model]. In particular,
when the pool is exhausted (with all threads utilized), further wiretaps
will be executed synchronously by the calling thread. To remedy this,
you can configure an explicit thread pool on the xref:wireTap-eip.adoc[Wire
Tap] having either a different rejection policy, a larger worker queue,
or more worker threads.
== WireTap Node
Camel's Wire Tap node supports two flavors when tapping an
xref:exchange.adoc[Exchange]:
- With the traditional Wire Tap, Camel will copy the original
xref:exchange.adoc[Exchange] and set its
xref:exchange-pattern.adoc[Exchange Pattern] to *`InOnly`*, as we want
the tapped xref:exchange.adoc[Exchange] to be sent in a fire and forget
style. The tapped xref:exchange.adoc[Exchange] is then sent in a
separate thread so it can run in parallel with the original. Beware that
only the Exchange is copied - Wire Tap won't do a deep clone (unless you
specify a custom processor via *`onPrepareRef`* which does that). So all
copies could share objects from the original Exchange.
- Camel also provides an option of sending a new
xref:exchange.adoc[Exchange] allowing you to populate it with new
values.
== Sending a Copy (traditional wiretap)
* Using the xref:fluent-builders.adoc[Fluent Builders]
[source,java]
----
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
// START SNIPPET: e1
from("direct:start")
.to("log:foo")
.wireTap("direct:tap")
.to("mock:result");
// END SNIPPET: e1
from("direct:tap")
.delay(1000).setBody().constant("Tapped")
.to("mock:result", "mock:tap");
from("direct:test").wireTap("direct:a").id("wiretap_1").to("mock:a");
from("direct:a").to("mock:b");
}
};
}
----
== Sending a New xref:exchange.adoc[Exchange]
*Using the xref:fluent-builders.adoc[Fluent Builders]*
Camel supports either a processor or an
xref:expression.adoc[Expression] to populate the new
xref:exchange.adoc[Exchange]. Using a processor gives you full power
over how the xref:exchange.adoc[Exchange] is populated as you can set
properties, headers, etc. An xref:expression.adoc[Expression] can only
be used to set the *`IN`* body.
The xref:expression.adoc[Expression] or
xref:processor.adoc[Processor] is pre-populated with a copy of the
original xref:exchange.adoc[Exchange], which allows you to access the
original message when you prepare a new xref:exchange.adoc[Exchange] to
be sent. You can use the *`copy`* option (enabled by default) to
indicate whether you want this.
Below is the processor variation,
where we disable *`copy`* by passing in *`false`* to create a new, empty
xref:exchange.adoc[Exchange]
[source,java]
----
public void testFireAndForgetUsingProcessor() throws Exception {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
// START SNIPPET: e1
from("direct:start")
.wireTap("direct:foo", false, new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody("Bye World");
exchange.getIn().setHeader("foo", "bar");
}
}).to("mock:result");
from("direct:foo").to("mock:foo");
// END SNIPPET: e1
}
});
}
----
== Using Dynamic URIs
For example to wire tap to a dynamic URI, then it supports the same
dynamic URIs as documented in xref:message-endpoint.adoc[Message
Endpoint]. For example to wire tap to a JMS queue where the header ID is
part of the queue name:
[source,java]
----
from("direct:start") .wireTap("jms:queue:backup-$\{header.id}")
.to("bean:doSomething");
----
== Sending a New exchange and Set Headers in DSL
If you send a new message using xref:wireTap-eip.adoc[Wire Tap], then you
could only set the message body using an
xref:expression.adoc[Expression] from the DSL. If you also need to set
headers, you would have to use a xref:processor.adoc[Processor]. From
It's possible to set headers as well using the DSL.
The following example sends a new message which has
* *`Bye World`* as message body.
* A header with key *`id`* with the value *`123`*.
* A header with key *`date`* which has current date as value.
== Java DSL
[source,java]
----
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
// START SNIPPET: e1
from("direct:start")
// tap a new message and send it to direct:tap
// the new message should be Bye World with 2 headers
.wireTap("direct:tap")
// create the new tap message body and headers
.newExchangeBody(constant("Bye World"))
.newExchangeHeader("id", constant(123))
.newExchangeHeader("date", simple("${date:now:yyyyMMdd}"))
.end()
// here we continue routing the original messages
.to("mock:result");
// this is the tapped route
from("direct:tap")
.to("mock:tap");
// END SNIPPET: e1
}
};
}
----
== Using `onPrepare` to Execute Custom Logic when Preparing Messages
See details at xref:multicast-eip.adoc[Multicast]
xref:using-this-pattern.adoc[Using This Pattern]