blob: a42129383c68df31f6d019e6d150975077662058 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= Apache Camel Streamer
== Overview
This documentation page focuses on the Apache Camel, which can also be thought of as a universal streamer because it
allows you to consume from any technology or protocol supported by Camel into an Ignite Cache.
image::images/integrations/camel-streamer.png[Camel Streamer]
With this streamer, you can ingest entries straight into an Ignite cache based on:
* Calls received on a Web Service (SOAP or REST), by extracting the body or headers.
* Listening on a TCP or UDP channel for messages.
* The content of files received via FTP or written to the local filesystem.
* Email messages received via POP3 or IMAP.
* A MongoDB tailable cursor.
* An AWS SQS queue.
* And many others.
This streamer supports two modes of ingestion: **direct ingestion** and **mediated ingestion**.
[NOTE]
====
[discrete]
=== The Ignite Camel Component
There is also the https://camel.apache.org/components/latest/ignite-summary.html[camel-ignite, window=_blank] component, if what you are looking is
to interact with Ignite Caches, Compute, Events, Messaging, etc. from within a Camel route.
====
== Maven Dependency
To make use of the `ignite-camel-ext` streamer, you need to add the following dependency:
[tabs]
--
tab:pom.xml[]
[source,xml]
----
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-camel-ext</artifactId>
<version>${ignite-camel-ext.version}</version>
</dependency>
----
--
It will also pull in `camel-core` as a transitive dependency.
== Direct Ingestion
Direct Ingestion allows you to consume from any Camel endpoint straight into Ignite, with the help of a
Tuple Extractor. We call this **direct ingestion**.
Here is a code sample:
[tabs]
--
tab:Java[]
[source,java]
----
// Start Apache Ignite.
Ignite ignite = Ignition.start();
// Create an streamer pipe which ingests into the 'mycache' cache.
IgniteDataStreamer<String, String> pipe = ignite.dataStreamer("mycache");
// Create a Camel streamer and connect it.
CamelStreamer<String, String> streamer = new CamelStreamer<>();
streamer.setIgnite(ignite);
streamer.setStreamer(pipe);
// This endpoint starts a Jetty server and consumes from all network interfaces on port 8080 and context path /ignite.
streamer.setEndpointUri("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST");
// This is the tuple extractor. We'll assume each message contains only one tuple.
// If your message contains multiple tuples, use a StreamMultipleTupleExtractor.
// The Tuple Extractor receives the Camel Exchange and returns a Map.Entry<?,?> with the key and value.
streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<Exchange, String, String>() {
@Override public Map.Entry<String, String> extract(Exchange exchange) {
String stationId = exchange.getIn().getHeader("X-StationId", String.class);
String temperature = exchange.getIn().getBody(String.class);
return new GridMapEntry<>(stationId, temperature);
}
});
// Start the streamer.
streamer.start();
----
--
== Mediated Ingestion
For more sophisticated scenarios, you can also create a Camel route that performs complex processing on incoming messages, e.g. transformations, validations, splitting, aggregating, idempotency, resequencing, enrichment, etc. and **ingest only the result into the Ignite cache**.
We call this **mediated ingestion**.
[tabs]
--
tab:Java[]
[source,java]
----
// Create a CamelContext with a custom route that will:
// (1) consume from our Jetty endpoint.
// (2) transform incoming JSON into a Java object with Jackson.
// (3) uses JSR 303 Bean Validation to validate the object.
// (4) dispatches to the direct:ignite.ingest endpoint, where the streamer is consuming from.
CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST")
.unmarshal().json(JsonLibrary.Jackson)
.to("bean-validator:validate")
.to("direct:ignite.ingest");
}
});
// Remember our Streamer is now consuming from the Direct endpoint above.
streamer.setEndpointUri("direct:ignite.ingest");
----
--
== Setting a Response
By default, the response sent back to the caller (if it is a synchronous endpoint) is simply an echo of the original request.
If you want to customize the response, set a Camel `Processor` as a `responseProcessor`:
[tabs]
--
tab:Java[]
[source,java]
----
streamer.setResponseProcessor(new Processor() {
@Override public void process(Exchange exchange) throws Exception {
exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200);
exchange.getOut().setBody("OK");
}
});
----
--