commit | 979fdf5b67cb44d0b8532e4ffd116d36e1c3d374 | [log] [tgz] |
---|---|---|
author | Ni Ze <karp@apache.org> | Tue Jun 20 09:38:06 2023 +0800 |
committer | GitHub <noreply@github.com> | Tue Jun 20 09:38:06 2023 +0800 |
tree | aeab4575fa8fdf56c7026d842660e1af72b96d73 | |
parent | 14ba8f77f8c33275b74145733005496cdae0b852 [diff] | |
parent | f01fff6e02b95defbdc462f3f45682b9f0593130 [diff] |
Merge pull request #295 from Shuozeli/develop [ISSUE #281] Do not process the data from the removed MessageQueues by removing the queues from the originListener first
RocketMQ Streams is a lightweight stream processing framework, application gains the stream processing ability by depending on RocketMQ Streams as an SDK.
It offers a variety of features:
This paragraph guides you running a stream processing with RocketMQ Streams.
RocketMQ runs on all major operating systems and requires only a Java JDK version 8 or higher to be installed. To check, run java -version
:
$ java -version java version "1.8.0_121"
1) Download RocketMQ
wget https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip # Unpack the release $ unzip rocketmq-all-5.0.0-bin-release.zip $ cd rocketmq-all-5.0.0-bin-release/bin
2) Start NameServer
NameServer will be listening at 0.0.0.0:9876
, make sure that the port is not used by others on the local machine, and then do as follows.
### start Name Server $ nohup sh mqnamesrv & ### check whether Name Server is successfully started $ tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...
2) Start Broker
### start Broker $ nohup sh bin/mqbroker -n localhost:9876 & ### check whether Broker is successfully started, eg: Broker's IP is 192.168.1.2, Broker's name is broker-a $ tail -f ~/logs/rocketmqlogs/broker.log The broker[broker-a, 192.169.1.2:10911] boot success...
1) Build application in IDE
2) Add RocketMQ Streams dependency
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams</artifactId> <version>{current.version}</version> </dependency>
3) Build stream processing application
sh bin/mqadmin updateTopic -c ${clusterName} -t ${topicName} -r 8 -w 8 -n 127.0.0.1:9876
NOTE: the default clusterName is DefaultCluster in this quick-start doc, changes it with your RocketMQ cluster.
public static void main(String[] args) { StreamBuilder builder = new StreamBuilder("wordCount"); builder.source("sourceTopic", total -> { String value = new String(total, StandardCharsets.UTF_8); return new Pair<>(null, value); }) .flatMap((ValueMapperAction<String, List<String>>) value -> { String[] splits = value.toLowerCase().split("\\W+"); return Arrays.asList(splits); }) .keyBy(value -> value) .count() .toRStream() .print(); TopologyBuilder topologyBuilder = builder.build(); Properties properties = new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") { @Override public void run() { rocketMQStream.stop(); latch.countDown(); } }); try { rocketMQStream.start(); latch.await(); } catch (final Throwable e) { System.exit(1); } System.exit(0); }