Apache rocketmq

Clone this repo:
  1. 14ba8f7 Merge pull request #293 from ni-ze/develop by Ni Ze · 2 days ago develop
  2. 900b260 fix(bug) recover state when rocketmqStreams failed and restart by 维章 · 2 days ago
  3. 82ffd2b Merge branch 'local/upstream/develop' into develop by 维章 · 2 days ago
  4. 7357c27 modify the note by 维章 · 3 weeks ago
  5. db84168 polish(code) modify log by 维章 · 3 weeks ago

RocketMQ Streams

Build Status CodeCov GitHub release License Average time to resolve an issue Percentage of issues still open Twitter Follow

RocketMQ Streams is a lightweight stream processing framework, application gains the stream processing ability by depending on RocketMQ Streams as an SDK.

It offers a variety of features:

  • Function:
    • One-to-one transform function, such as: filter, map, foreach
    • Aggregate function, such as: sum, min, max, count, aggregate
    • Generating function, such as: flatMap
  • Group by aggregate and window aggregate
  • Join stream
  • Custom serialization

Quick Start

This paragraph guides you running a stream processing with RocketMQ Streams.

Run RocketMQ 5.0 locally

RocketMQ quick-start

RocketMQ runs on all major operating systems and requires only a Java JDK version 8 or higher to be installed. To check, run java -version:

$ java -version
java version "1.8.0_121"

1) Download RocketMQ

wget https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip

# Unpack the release
$ unzip rocketmq-all-5.0.0-bin-release.zip

$ cd rocketmq-all-5.0.0-bin-release/bin

2) Start NameServer

NameServer will be listening at, make sure that the port is not used by others on the local machine, and then do as follows.

### start Name Server
$ nohup sh mqnamesrv &

### check whether Name Server is successfully started
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

2) Start Broker

### start Broker
$ nohup sh bin/mqbroker -n localhost:9876 &

### check whether Broker is successfully started, eg: Broker's IP is, Broker's name is broker-a
$ tail -f ~/logs/rocketmqlogs/broker.log
The broker[broker-a,] boot success...

Build stream processing application

1) Build application in IDE

2) Add RocketMQ Streams dependency


3) Build stream processing application

  • create topic in RocketMQ before start the stream processing.
sh bin/mqadmin updateTopic -c ${clusterName} -t ${topicName} -r 8 -w 8 -n
NOTE: the default clusterName is DefaultCluster in this quick-start doc, changes it with your RocketMQ cluster.
  • add your stream processing code, The following is an example. more examples are here.
public static void main(String[] args) {
        StreamBuilder builder = new StreamBuilder("wordCount");

        builder.source("sourceTopic",  total -> {
                    String value = new String(total, StandardCharsets.UTF_8);
                    return new Pair<>(null, value);
                .flatMap((ValueMapperAction<String, List<String>>) value -> {
                    String[] splits = value.toLowerCase().split("\\W+");
                    return Arrays.asList(splits);
                .keyBy(value -> value)

        TopologyBuilder topologyBuilder = builder.build();

        Properties properties = new Properties();
        properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "");

        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);

        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {
            public void run() {

        try {
        } catch (final Throwable e) {