---
title: Storm RocketMQ
layout: documentation
documentation: true
---
Storm/Trident integration for [RocketMQ](https://rocketmq.incubator.apache.org/). This package includes the core spout, bolt, and Trident states that allow a Storm topology to either write Storm tuples into a topic or read from topics.
## Read from Topic
This package includes a spout for reading data from a topic.
### RocketMqSpout
To use the `RocketMqSpout`, construct an instance by passing a `Properties` instance that contains the RocketMQ configuration.
`RocketMqSpout` uses the RocketMQ `MQPushConsumer` as its default implementation. The push consumer is a high-level consumer API that wraps the pulling details, so to the application it looks as if the broker pushes messages to the consumer.
`RocketMqSpout` retries a failed message 3 times by default (use `SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY` to change the value).
```java
// nameserverAddr, group and topic are values supplied by your application.
Properties properties = new Properties();
properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr);
properties.setProperty(SpoutConfig.CONSUMER_GROUP, group);
properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic);
RocketMqSpout spout = new RocketMqSpout(properties);
```
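The retry limit described above can be pictured as a per-message failure counter. The following is a minimal sketch of that idea, not the spout's actual implementation: `RetryTracker` and its methods are hypothetical names introduced here for illustration, and the limit of 3 simply mirrors the default mentioned above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of storm-rocketmq: counts failures per message ID.
class RetryTracker {
    private final int maxRetries;
    private final Map<String, Integer> failures = new HashMap<>();

    RetryTracker(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    // Records one failure and reports whether the message should be replayed.
    boolean onFail(String messageId) {
        int count = failures.merge(messageId, 1, Integer::sum);
        if (count > maxRetries) {
            failures.remove(messageId); // limit exceeded: give up and stop tracking
            return false;
        }
        return true;
    }

    // Clears the counter once the message has been processed successfully.
    void onAck(String messageId) {
        failures.remove(messageId);
    }
}
```

With `new RetryTracker(3)`, the first three failures of a message ask for a replay and the fourth does not.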
## Write into Topic
This package includes a bolt and a Trident state for writing data into a topic.
### TupleToMessageMapper
The main API for mapping a Storm tuple to a RocketMQ message is the `org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper` interface:
```java
public interface TupleToMessageMapper extends Serializable {
    String getKeyFromTuple(ITuple tuple);
    byte[] getValueFromTuple(ITuple tuple);
}
```
### FieldNameBasedTupleToMessageMapper
`storm-rocketmq` includes a general-purpose `TupleToMessageMapper` implementation called `FieldNameBasedTupleToMessageMapper`.
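If the bundled mapper does not fit, you can implement the interface yourself. The sketch below shows the general shape of such a mapper. To stay self-contained it uses stand-in types (`SimpleTuple`, `SimpleTupleToMessageMapper`) that only mirror the signatures shown above; in a real topology you would implement `TupleToMessageMapper` against Storm's `ITuple` instead. `Utf8FieldMapper` is a hypothetical name, and the UTF-8 encoding is an assumption of this example.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Stand-in for Storm's ITuple, reduced to the one method this sketch needs.
interface SimpleTuple {
    Object getValueByField(String field);
}

// Stand-in that mirrors the two methods of TupleToMessageMapper.
interface SimpleTupleToMessageMapper extends Serializable {
    String getKeyFromTuple(SimpleTuple tuple);
    byte[] getValueFromTuple(SimpleTuple tuple);
}

// Hypothetical mapper: takes the key from one field and the body from another.
class Utf8FieldMapper implements SimpleTupleToMessageMapper {
    private final String keyField;
    private final String valueField;

    Utf8FieldMapper(String keyField, String valueField) {
        this.keyField = keyField;
        this.valueField = valueField;
    }

    @Override
    public String getKeyFromTuple(SimpleTuple tuple) {
        Object key = tuple.getValueByField(keyField);
        return key == null ? null : key.toString();
    }

    @Override
    public byte[] getValueFromTuple(SimpleTuple tuple) {
        Object value = tuple.getValueByField(valueField);
        // A missing field becomes an empty body rather than a NullPointerException.
        return value == null ? new byte[0] : value.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

Because the mapper is serialized with the topology, keep its fields limited to serializable values such as the field names used here.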
### TopicSelector
The main API for selecting the topic and tag of a message is the `org.apache.storm.rocketmq.common.selector.TopicSelector` interface:
```java
public interface TopicSelector extends Serializable {
    String getTopic(ITuple tuple);
    String getTag(ITuple tuple);
}
```
### DefaultTopicSelector/FieldNameBasedTopicSelector
`storm-rocketmq` includes two general-purpose `TopicSelector` implementations, `DefaultTopicSelector` and `FieldNameBasedTopicSelector`.
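A custom selector follows the same pattern as the interface above. The following sketch routes each tuple to the topic named in one of its fields and falls back to a fixed topic when that field is absent. It is illustrative only and does not describe how the bundled selectors behave: `TupleFields` and `SimpleTopicSelector` are stand-ins that mirror `ITuple` and `TopicSelector`, and `FallbackTopicSelector` is a hypothetical name.

```java
import java.io.Serializable;

// Stand-in for Storm's ITuple, reduced to the one method this sketch needs.
interface TupleFields {
    Object getValueByField(String field);
}

// Stand-in that mirrors the two methods of TopicSelector.
interface SimpleTopicSelector extends Serializable {
    String getTopic(TupleFields tuple);
    String getTag(TupleFields tuple);
}

// Hypothetical selector: topic comes from a tuple field, with a fixed fallback.
class FallbackTopicSelector implements SimpleTopicSelector {
    private final String topicField;
    private final String defaultTopic;
    private final String tag;

    FallbackTopicSelector(String topicField, String defaultTopic, String tag) {
        this.topicField = topicField;
        this.defaultTopic = defaultTopic;
        this.tag = tag;
    }

    @Override
    public String getTopic(TupleFields tuple) {
        Object topic = tuple.getValueByField(topicField);
        return topic == null ? defaultTopic : topic.toString();
    }

    @Override
    public String getTag(TupleFields tuple) {
        // The same tag is applied to every message in this sketch.
        return tag;
    }
}
```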
### RocketMqBolt
To use the `RocketMqBolt`, construct an instance and supply `TupleToMessageMapper`, `TopicSelector`, and `Properties` instances.
`RocketMqBolt` sends messages asynchronously by default. You can change this by invoking `withAsync(false)`.
```java
TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
TopicSelector selector = new DefaultTopicSelector(topic);
Properties properties = new Properties();
properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
RocketMqBolt insertBolt = new RocketMqBolt()
        .withMapper(mapper)
        .withSelector(selector)
        .withProperties(properties);
```
### Trident State
We support a Trident persistent state that can be used with Trident topologies. To create a RocketMQ persistent Trident state, initialize it with `TupleToMessageMapper`, `TopicSelector`, and `Properties` instances. See the example below:
```java
TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count");
TopicSelector selector = new DefaultTopicSelector(topic);
Properties properties = new Properties();
properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr);
RocketMqState.Options options = new RocketMqState.Options()
        .withMapper(mapper)
        .withSelector(selector)
        .withProperties(properties);
StateFactory factory = new RocketMqStateFactory(options);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
stream.partitionPersist(factory, fields,
        new RocketMqStateUpdater(), new Fields());
```