RocketMQ Streams is a lightweight stream processing framework, application gains the stream processing ability by depending on RocketMQ Streams as an SDK.
It offers a variety of features:
This paragraph guides you running a stream processing with RocketMQ Streams.
RocketMQ runs on all major operating systems and requires only a Java JDK version 8 or higher to be installed. To check, run java -version
:
$ java -version java version "1.8.0_121"
1) Download RocketMQ
wget https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip # Unpack the release $ unzip rocketmq-all-5.0.0-bin-release.zip $ cd rocketmq-all-5.0.0-bin-release/bin
2) Start NameServer
NameServer will be listening at 0.0.0.0:9876
, make sure that the port is not used by others on the local machine, and then do as follows.
### start Name Server $ nohup sh mqnamesrv & ### check whether Name Server is successfully started $ tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...
2) Start Broker
### start Broker $ nohup sh bin/mqbroker -n localhost:9876 & ### check whether Broker is successfully started, eg: Broker's IP is 192.168.1.2, Broker's name is broker-a $ tail -f ~/logs/rocketmqlogs/broker.log The broker[broker-a, 192.169.1.2:10911] boot success...
1) Build application in IDE
2) Add RocketMQ Streams dependency
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams</artifactId> <version>{current.version}</version> </dependency>
3) Build stream processing application
sh bin/mqadmin updateTopic -c ${clusterName} -t ${topicName} -r 8 -w 8 -n 127.0.0.1:9876
NOTE: the default clusterName is DefaultCluster in this quick-start doc, changes it with your RocketMQ cluster.
public static void main(String[] args) { StreamBuilder builder = new StreamBuilder("wordCount"); builder.source("sourceTopic", total -> { String value = new String(total, StandardCharsets.UTF_8); return new Pair<>(null, value); }) .flatMap((ValueMapperAction<String, List<String>>) value -> { String[] splits = value.toLowerCase().split("\\W+"); return Arrays.asList(splits); }) .keyBy(value -> value) .count() .toRStream() .print(); TopologyBuilder topologyBuilder = builder.build(); Properties properties = new Properties(); properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876"); RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties); final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") { @Override public void run() { rocketMQStream.stop(); latch.countDown(); } }); try { rocketMQStream.start(); latch.await(); } catch (final Throwable e) { System.exit(1); } System.exit(0); }