| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| = Apache Camel Streamer |
| |
| == Overview |
| |
| This documentation page focuses on the Apache Camel, which can also be thought of as a universal streamer because it |
| allows you to consume from any technology or protocol supported by Camel into an Ignite Cache. |
| |
| image::images/integrations/camel-streamer.png[Camel Streamer] |
| |
| With this streamer, you can ingest entries straight into an Ignite cache based on: |
| |
| * Calls received on a Web Service (SOAP or REST), by extracting the body or headers. |
| * Listening on a TCP or UDP channel for messages. |
| * The content of files received via FTP or written to the local filesystem. |
| * Email messages received via POP3 or IMAP. |
| * A MongoDB tailable cursor. |
| * An AWS SQS queue. |
| * And many others. |
| |
| This streamer supports two modes of ingestion: **direct ingestion** and **mediated ingestion**. |
| |
| [NOTE] |
| ==== |
| [discrete] |
| === The Ignite Camel Component |
| There is also the https://camel.apache.org/components/latest/ignite-summary.html[camel-ignite, window=_blank] component, if what you are looking is |
| to interact with Ignite Caches, Compute, Events, Messaging, etc. from within a Camel route. |
| ==== |
| |
| == Maven Dependency |
| |
| To make use of the `ignite-camel-ext` streamer, you need to add the following dependency: |
| |
| [tabs] |
| -- |
| tab:pom.xml[] |
| [source,xml] |
| ---- |
| <dependency> |
| <groupId>org.apache.ignite</groupId> |
| <artifactId>ignite-camel-ext</artifactId> |
| <version>${ignite-camel-ext.version}</version> |
| </dependency> |
| ---- |
| -- |
| |
| It will also pull in `camel-core` as a transitive dependency. |
| |
| == Direct Ingestion |
| |
| Direct Ingestion allows you to consume from any Camel endpoint straight into Ignite, with the help of a |
| Tuple Extractor. We call this **direct ingestion**. |
| |
| Here is a code sample: |
| [tabs] |
| -- |
| tab:Java[] |
| [source,java] |
| ---- |
| // Start Apache Ignite. |
| Ignite ignite = Ignition.start(); |
| |
| // Create an streamer pipe which ingests into the 'mycache' cache. |
| IgniteDataStreamer<String, String> pipe = ignite.dataStreamer("mycache"); |
| |
| // Create a Camel streamer and connect it. |
| CamelStreamer<String, String> streamer = new CamelStreamer<>(); |
| streamer.setIgnite(ignite); |
| streamer.setStreamer(pipe); |
| |
| // This endpoint starts a Jetty server and consumes from all network interfaces on port 8080 and context path /ignite. |
| streamer.setEndpointUri("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST"); |
| |
| // This is the tuple extractor. We'll assume each message contains only one tuple. |
| // If your message contains multiple tuples, use a StreamMultipleTupleExtractor. |
| // The Tuple Extractor receives the Camel Exchange and returns a Map.Entry<?,?> with the key and value. |
| streamer.setSingleTupleExtractor(new StreamSingleTupleExtractor<Exchange, String, String>() { |
| @Override public Map.Entry<String, String> extract(Exchange exchange) { |
| String stationId = exchange.getIn().getHeader("X-StationId", String.class); |
| String temperature = exchange.getIn().getBody(String.class); |
| return new GridMapEntry<>(stationId, temperature); |
| } |
| }); |
| |
| // Start the streamer. |
| streamer.start(); |
| ---- |
| -- |
| |
| == Mediated Ingestion |
| |
| For more sophisticated scenarios, you can also create a Camel route that performs complex processing on incoming messages, e.g. transformations, validations, splitting, aggregating, idempotency, resequencing, enrichment, etc. and **ingest only the result into the Ignite cache**. |
| |
| We call this **mediated ingestion**. |
| |
| [tabs] |
| -- |
| tab:Java[] |
| [source,java] |
| ---- |
| // Create a CamelContext with a custom route that will: |
| // (1) consume from our Jetty endpoint. |
| // (2) transform incoming JSON into a Java object with Jackson. |
| // (3) uses JSR 303 Bean Validation to validate the object. |
| // (4) dispatches to the direct:ignite.ingest endpoint, where the streamer is consuming from. |
| CamelContext context = new DefaultCamelContext(); |
| context.addRoutes(new RouteBuilder() { |
| @Override |
| public void configure() throws Exception { |
| from("jetty:http://0.0.0.0:8080/ignite?httpMethodRestrict=POST") |
| .unmarshal().json(JsonLibrary.Jackson) |
| .to("bean-validator:validate") |
| .to("direct:ignite.ingest"); |
| } |
| }); |
| |
| // Remember our Streamer is now consuming from the Direct endpoint above. |
| streamer.setEndpointUri("direct:ignite.ingest"); |
| ---- |
| -- |
| |
| == Setting a Response |
| |
| By default, the response sent back to the caller (if it is a synchronous endpoint) is simply an echo of the original request. |
| If you want to customize the response, set a Camel `Processor` as a `responseProcessor`: |
| |
| [tabs] |
| -- |
| tab:Java[] |
| [source,java] |
| ---- |
| streamer.setResponseProcessor(new Processor() { |
| @Override public void process(Exchange exchange) throws Exception { |
| exchange.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 200); |
| exchange.getOut().setBody("OK"); |
| } |
| }); |
| ---- |
| -- |