commit | cbf6a92050e79e166b3a5b35a8fe9cef513e4b47 | [log] [tgz] |
---|---|---|
author | elenacliu <44845151+elenacliu@users.noreply.github.com> | Fri Jul 07 23:25:04 2023 +0800 |
committer | GitHub <noreply@github.com> | Fri Jul 07 23:25:04 2023 +0800 |
tree | e7f73adcaf8f061e4c844e7715b332ddbc892b4c | |
parent | af419fdbec853ac311e7b49809e551f2959ecf85 [diff] | |
parent | 4bd58c82c58f31484741f4e61a9b4185c435d005 [diff] |
Merge branch 'apache:develop' into hopping
RocketMQ Streams is a lightweight stream processing framework, application gains the stream processing ability by depending on RocketMQ Streams as an SDK.
It offers a variety of features:
This paragraph guides you running a stream processing with RocketMQ Streams.
RocketMQ runs on all major operating systems and requires only a Java JDK version 8 or higher to be installed. To check, run java -version
:
$ java -version java version "1.8.0_121"
1) Download RocketMQ
wget https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip # Unpack the release $ unzip rocketmq-all-5.0.0-bin-release.zip $ cd rocketmq-all-5.0.0-bin-release/bin
2) Start NameServer
NameServer will be listening at 0.0.0.0:9876
, make sure that the port is not used by others on the local machine, and then do as follows.
### start Name Server $ nohup sh mqnamesrv & ### check whether Name Server is successfully started $ tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...
2) Start Broker
### start Broker $ nohup sh bin/mqbroker -n localhost:9876 & ### check whether Broker is successfully started, eg: Broker's IP is 192.168.1.2, Broker's name is broker-a $ tail -f ~/logs/rocketmqlogs/broker.log The broker[broker-a, 192.169.1.2:10911] boot success...
1) Build application in IDE
2) Add RocketMQ Streams dependency
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams</artifactId> <version>{current.version}</version> </dependency>
3) Build stream processing application
sh bin/mqadmin updateTopic -c ${clusterName} -t ${topicName} -r 8 -w 8 -n 127.0.0.1:9876
NOTE: the default clusterName is DefaultCluster in this quick-start doc, changes it with your RocketMQ cluster.
public static void main(String[] args) { StreamBuilder builder = new StreamBuilder("wordCount"); builder.source("sourceTopic", total -> { String value = new String(total, StandardCharsets.UTF_8); return new Pair<>(null, value); }) .flatMap((ValueMapperAction<String, List<String>>) value -> { String[] splits = value.toLowerCase().split("\\W+"); return Arrays.asList(splits); }) .keyBy(value -> value) .count() .toRStream() .print(); TopologyBuilder topologyBuilder = builder.build(); Properties properties = new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") { @Override public void run() { rocketMQStream.stop(); latch.countDown(); } }); try { rocketMQStream.start(); latch.await(); } catch (final Throwable e) { System.exit(1); } System.exit(0); }