Storm/Trident integration for RocketMQ. This package includes the core spout, bolt and trident states that allows a storm topology to either write storm tuples into a topic or read from topics in a storm topology.
The spout included in this package for reading data from a topic.
To use the RocketMqSpout
, you construct an instance of it by specifying a Properties instance which including rocketmq configs. RocketMqSpout uses RocketMQ MQPushConsumer as the default implementation. PushConsumer is a high level consumer API, wrapping the pulling details. Looks like broker push messages to consumer. RocketMqSpout will retry 3(use SpoutConfig.DEFAULT_MESSAGES_MAX_RETRY
to change the value) times when messages are failed.
Properties properties = new Properties(); properties.setProperty(SpoutConfig.NAME_SERVER_ADDR, nameserverAddr); properties.setProperty(SpoutConfig.CONSUMER_GROUP, group); properties.setProperty(SpoutConfig.CONSUMER_TOPIC, topic); RocketMqSpout spout = new RocketMqSpout(properties);
The bolt and trident state included in this package for write data into a topic.
The main API for mapping Storm tuple to a RocketMQ Message is the org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper
interface:
public interface TupleToMessageMapper extends Serializable { String getKeyFromTuple(ITuple tuple); byte[] getValueFromTuple(ITuple tuple); }
storm-rocketmq
includes a general purpose TupleToMessageMapper
implementation called FieldNameBasedTupleToMessageMapper
.
The main API for selecting topic and tags is the org.apache.storm.rocketmq.common.selector.TopicSelector
interface:
public interface TopicSelector extends Serializable { String getTopic(ITuple tuple); String getTag(ITuple tuple); }
storm-rocketmq
includes general purpose TopicSelector
implementations called DefaultTopicSelector
and FieldNameBasedTopicSelector
.
To use the RocketMqBolt
, you construct an instance of it by specifying TupleToMessageMapper, TopicSelector and Properties instances. RocketMqBolt send messages async by default. You can change this by invoking withAsync(false)
.
TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count"); TopicSelector selector = new DefaultTopicSelector(topic); properties = new Properties(); properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr); RocketMqBolt insertBolt = new RocketMqBolt() .withMapper(mapper) .withSelector(selector) .withProperties(properties);
We support trident persistent state that can be used with trident topologies. To create a RocketMQ persistent trident state you need to initialize it with the TupleToMessageMapper, TopicSelector, Properties instances. See the example below:
TupleToMessageMapper mapper = new FieldNameBasedTupleToMessageMapper("word", "count"); TopicSelector selector = new DefaultTopicSelector(topic); Properties properties = new Properties(); properties.setProperty(RocketMqConfig.NAME_SERVER_ADDR, nameserverAddr); RocketMqState.Options options = new RocketMqState.Options() .withMapper(mapper) .withSelector(selector) .withProperties(properties); StateFactory factory = new RocketMqStateFactory(options); TridentTopology topology = new TridentTopology(); Stream stream = topology.newStream("spout1", spout); stream.partitionPersist(factory, fields, new RocketMqStateUpdater(), new Fields());