commit | b9fe94d05649f7ed9e9384c880eec6817e1b5a27 | [log] [tgz] |
---|---|---|
author | Ni Ze <unclerangoo@gmail.com> | Wed Feb 01 16:33:09 2023 +0800 |
committer | GitHub <noreply@github.com> | Wed Feb 01 16:33:09 2023 +0800 |
tree | c8f2a695efb0ea07694efb41e9d1b7acada1ba3f | |
parent | 86f18192360411b52b22654e9b4982c885f5a02e [diff] | |
parent | 1dd13c3c644e622f31cf42ae2f7963142f83b742 [diff] |
Merge pull request #262 from ni-ze/develop [ISSUE #263]The idle window does not trigger
RocketMQ Streams is a lightweight stream processing framework, application gains the stream processing ability by depending on RocketMQ Streams as an SDK.
It offers a variety of features:
This paragraph guides you running a stream processing with RocketMQ Streams.
RocketMQ runs on all major operating systems and requires only a Java JDK version 8 or higher to be installed. To check, run java -version
:
$ java -version java version "1.8.0_121"
1) Download RocketMQ
wget https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip # Unpack the release $ unzip rocketmq-all-5.0.0-bin-release.zip $ cd rocketmq-all-5.0.0-bin-release/bin
2) Start NameServer
NameServer will be listening at 0.0.0.0:9876
, make sure that the port is not used by others on the local machine, and then do as follows.
### start Name Server $ nohup sh mqnamesrv & ### check whether Name Server is successfully started $ tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...
2) Start Broker
### start Broker $ nohup sh bin/mqbroker -n localhost:9876 & ### check whether Broker is successfully started, eg: Broker's IP is 192.168.1.2, Broker's name is broker-a $ tail -f ~/logs/rocketmqlogs/broker.log The broker[broker-a, 192.169.1.2:10911] boot success...
1) Build application in IDE
2) Add RocketMQ Streams dependency
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams</artifactId> <version>{current.version}</version> </dependency>
3) Build stream processing application
sh bin/mqadmin updateTopic -c ${clusterName} -t ${topicName} -r 8 -w 8 -n 127.0.0.1:9876
NOTE: the default clusterName is DefaultCluster in this quick-start doc, changes it with your RocketMQ cluster.
public static void main(String[] args) { StreamBuilder builder = new StreamBuilder("wordCount"); builder.source("sourceTopic", total -> { String value = new String(total, StandardCharsets.UTF_8); return new Pair<>(null, value); }) .flatMap((ValueMapperAction<String, List<String>>) value -> { String[] splits = value.toLowerCase().split("\\W+"); return Arrays.asList(splits); }) .keyBy(value -> value) .count() .toRStream() .print(); TopologyBuilder topologyBuilder = builder.build(); Properties properties = new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") { @Override public void run() { rocketMQStream.stop(); latch.countDown(); } }); try { rocketMQStream.start(); latch.await(); } catch (final Throwable e) { System.exit(1); } System.exit(0); }