blob: a302ca722c4b22c584baf40c12b790e272cd04ce [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= RocketMQ Streamer
This streamer module provides streaming from https://github.com/apache/incubator-rocketmq[Apache RocketMQ, window=_blank]
to Ignite.
To use Ignite RocketMQ Streamer module
. Import it to your Maven project. If you are using Maven to manage dependencies of your project, you can add an Ignite
RocketMQ module dependency like this (replace `${ignite-rocketmq-ext.version}` with actual Ignite RocketMQ Extension version you are interested in):
+
[tabs]
--
tab:pom.xml[]
[source,xml]
----
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
...
<dependencies>
...
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-rocketmq-ext</artifactId>
<version>${ignite-rocketmq-ext.version}</version>
</dependency>
...
</dependencies>
...
</project>
----
--
. Implement either `StreamSingleTupleExtractor` or `StreamMultipleTupleExtractor` for the streamer (shown
as `MyTupleExtractor` in the code sample below). For a simple implementation, refer to `RocketMQStreamerTest.java`.
. Initialize and start the streamer
+
[tabs]
--
tab:Java[]
[source,java]
----
IgniteDataStreamer<String, byte[]> dataStreamer = ignite.dataStreamer(MY_CACHE));
dataStreamer.allowOverwrite(true);
dataStreamer.autoFlushFrequency(10);
streamer = new RocketMQStreamer<>();
//configure.
streamer.setIgnite(ignite);
streamer.setStreamer(dataStreamer);
streamer.setNameSrvAddr(NAMESERVER_IP_PORT);
streamer.setConsumerGrp(CONSUMER_GRP);
streamer.setTopic(TOPIC_NAME);
streamer.setMultipleTupleExtractor(new MyTupleExtractor());
streamer.start();
...
// stop on shutdown
streamer.stop();
dataStreamer.close();
----
--
Refer to the Javadocs for more info on the available options.