Apache RocketMQ Spring Integration

Clone this repo:
  1. 1bea434 Merge pull request #191 from RongtongJin/delete_JacksonAutoConfiguration by von gosling · 36 hours ago master
  2. e40bf99 chore(autoconfig):pass checkstyle by rongtongjin · 2 days ago
  3. 075f41d chore(autoconfig):delete useless code by rongtongjin · 2 days ago
  4. bac2c69 enhancement(autoconfig):delete useless code and ambiguous deprecated annotation about JacksonFallbackConfiguration by rongtongjin · 2 days ago
  5. cb1435d Merge pull request #188 from RongtongJin/fix_186 by von gosling · 2 days ago

RocketMQ-Spring Build Status Coverage Status

Maven Central GitHub release License

中文

Introduction

This project aims to help developers quickly integrate RocketMQ with Spring Boot.

How To Contribute

We are always very happy to have contributions, whether for trivial cleanups or big new features. Please see the RocketMQ main website to read details

Prerequisites

  • JDK 1.8 and above
  • Maven 3.0 and above

Build and Install with local maven repository

  mvn clean install

Features:

  • [x] synchronous transmission
  • [x] synchronous ordered transmission
  • [x] synchronous batch transmission
  • [x] asynchronous transmission
  • [x] asynchronous ordered transmission
  • [x] orderly consume
  • [x] concurrently consume(broadcasting/clustering)
  • [x] one-way transmission
  • [x] transaction transmission
  • [x] message trace
  • [x] ACL
  • [ ] pull consume

Quick Start

Please see the complete sample rocketmq-spring-boot-samples

Note: Current RELEASE.VERSION=2.0.4

<!--add dependency in pom.xml-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${RELEASE.VERSION}</version>
</dependency>

Produce Message

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

Note:

Maybe you need change 127.0.0.1:9876 with your real NameServer address for RocketMQ

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    
    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }
    
    public void run(String... args) throws Exception {
        rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
        rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
        rocketMQTemplate.convertAndSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")));
        
//        rocketMQTemplate.destroy(); // notes:  once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate
    }
    
    @Data
    @AllArgsConstructor
    public class OrderPaidEvent implements Serializable{
        private String orderId;
        
        private BigDecimal paidMoney;
    }
}

More relevant configurations for producing:

rocketmq.producer.send-message-timeout=300000
rocketmq.producer.compress-message-body-threshold=4096
rocketmq.producer.max-message-size=4194304
rocketmq.producer.retry-times-when-send-async-failed=0
rocketmq.producer.retry-next-server=true
rocketmq.producer.retry-times-when-send-failed=2

Send message in transaction and implement local check Listener

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }

    public void run(String... args) throws Exception {
        try {
            // Build a SpringMessage for sending in transaction
            Message msg = MessageBuilder.withPayload(..)...;
            // In sendMessageInTransaction(), the first parameter transaction name ("test")
            // must be same with the @RocketMQTransactionListener's member field 'transName'
            rocketMQTemplate.sendMessageInTransaction("test", "test-topic", msg, null);
        } catch (MQClientException e) {
            e.printStackTrace(System.out);
        }
    }

    // Define transaction listener with the annotation @RocketMQTransactionListener
    @RocketMQTransactionListener(transName="test")
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
          @Override
          public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // ... local transaction process, return bollback, commit or unknown
            return RocketMQLocalTransactionState.UNKNOWN;
          }

          @Override
          public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // ... check transaction status and return bollback, commit or unknown
            return RocketMQLocalTransactionState.COMMIT;
          }
    }
}

Consume Message

## application.properties
rocketmq.name-server=127.0.0.1:9876

Note:

Maybe you need change 127.0.0.1:9876 with your real NameServer address for RocketMQ

@SpringBootApplication
public class ConsumerApplication{
    
    public static void main(String[] args){
        SpringApplication.run(ConsumerApplication.class, args);
    }
    
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public class MyConsumer1 implements RocketMQListener<String>{
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    }
    
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
    public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{
        public void onMessage(OrderPaidEvent orderPaidEvent) {
            log.info("received orderPaidEvent: {}", orderPaidEvent);
        }
    }
}

More relevant configurations for consuming:

see: RocketMQMessageListener

Message Trace

We need 2 more configurations for support message trace in producer.

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=my-trace-topic

The message trace in consumer should configure in @RocketMQMessageListener.

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    enableMsgTrace = true,
    customizedTraceTopic = "my-trace-topic"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}

Note:

Maybe you need change 127.0.0.1:9876 with your real NameServer address for RocketMQ

By default, the message track feature of Producer and Consumer is turned on and the trace-topic is RMQ_SYS_TRACE_TOPIC The topic of message trace can be configured with rocketmq.consumer.customized-trace-topic configuration item, not required to be configured in each @RocketMQTransactionListener

ACL

We need 2 more configurations for support ACL in producer.

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK

Transaction Message should configure AK/SK in @RocketMQTransactionListener.

@RocketMQTransactionListener(
    txProducerGroup = "test,
    accessKey = "AK",
    secretKey = "SK"
)
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    ...
}

Note:

You do not need to configure AK/SK for each @RocketMQTransactionListener, you could configure rocketmq.producer.access-key and rocketmq.producer.secret-key as default value

The ACL feature in consumer should configure AK/SK in @RocketMQMessageListener.

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    accessKey = "AK",
    secretKey = "SK"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}

Note:

You do not need to configure AK/SK for each @RocketMQMessageListener, you could configure rocketmq.consumer.access-key and rocketmq.consumer.secret-key as default value

FAQ

  1. How to connected many nameserver on production environment?

    rocketmq.name-server support the configuration of multiple nameserver, separated by ;. For example: 172.19.0.1:9876; 172.19.0.2:9876

  2. When was rocketMQTemplate destroyed?

    Developers do not need to manually execute the rocketMQTemplate.destroy () method when using rocketMQTemplate to send a message in the project, androcketMQTemplate will be destroyed automatically when the spring container is destroyed.

  3. start exception:Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please

    RocketMQ in the design do not want a consumer to deal with multiple types of messages at the same time, so the same consumerGroup consumer responsibility should be the same, do not do different things (that is, consumption of multiple topics). Suggested consumerGroup andtopic one correspondence.

  4. How is the message content body being serialized and deserialized?

    RocketMQ's message body is stored as byte []. When the business system message content body if it is java.lang.String type, unified in accordance withutf-8 code into byte []; If the business system message content is not java.lang.String Type, then use jackson-databind serialized into the JSON format string, and then unified in accordance withutf-8 code into byte [].

  5. How do I specify the tags for topic?

    RocketMQ best practice recommended: an application as much as possible with one Topic, the message sub-type with tags to identify,tags can be set by the application free.

    When you use rocketMQTemplate to send a message, set the destination of the message by setting thedestination parameter of the send method. The destination format is topicName:tagName, : Precedes the name of the topic, followed by the tags name.

    Note:

    tags looks a complex, but when sending a message , the destination can only specify one topic under a tag, can not specify multiple.

  6. How do I set the message's key when sending a message?

    You can send a message by overloading method like xxxSend(String destination, Message<?> msg, ...), setting headers of msg. for example:

    Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build();
    rocketMQTemplate.send("topic-test", message);
    

    Similarly, you can also set the message FLAG,WAIT_STORE_MSG_OK and some other user-defined other header information according to the above method.

    Note:

    In the case of converting Spring‘s Message to RocketMQ’s Message, to prevent the header information from conflicting with RocketMQ's system properties, the prefix USERS_ was added in front of all header names. So if you want to get a custom message header when consuming, please pass through the key at the beginning of USERS_ in the header.

  7. When consume message, in addition to get the message payload, but also want to get RocketMQ message of other system attributes, how to do?

    Consumers in the realization of RocketMQListener interface, only need to be generic for theMessageExt can, so in the onMessage method will receive RocketMQ native 'MessageExt` message.

    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public class MyConsumer2 implements RocketMQListener<MessageExt>{
        public void onMessage(MessageExt messageExt) {
            log.info("received messageExt: {}", messageExt);
        }
    }
    
  8. How do I specify where consumers start consuming messages?

    The default consume offset please refer: RocketMQ FAQ. To customize the consumer's starting location, simply add a RocketMQPushConsumerLifecycleListener interface implementation to the consumer class. Examples are as follows:

    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
        @Override
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    
        @Override
        public void prepareStart(final DefaultMQPushConsumer consumer) {
            // set consumer consume message from now
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        }
    }
    

    Similarly, any other configuration on DefaultMQPushConsumer can be done in the same way as above.

  9. How do I send transactional messages? It needs two steps on client side:

    a) Define a class which is annotated with @RocketMQTransactionListener and implements RocketMQLocalTransactionListener interface, in which, the executeLocalTransaction() and checkLocalTransaction() methods are implemented;

    b) Invoke the sendMessageInTransaction() method with the RocketMQTemplate API. Note: The first parameter of this method is correlated with the txProducerGroup attribute of @RocketMQTransactionListener. It can be null if using the default transaction producer group.

  10. How do I create more than one RocketMQTemplate with a different name-server or other specific properties?

    // Step1. Define an extra RocketMQTemplate with required properties, note, the 'nameServer' property must be different from the value of global
    // Spring configuration 'rocketmq.name-server', other properties are optionally defined, they will use the global configuration 
    // definition by default.  
    
    // The RocketMQTemplate's Spring Bean name is 'extRocketMQTemplate', same with the simplified class name (Initials lowercase)
    @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
       , ... // override other specific properties if needed
    )
    public class ExtRocketMQTemplate extends RocketMQTemplate {
      // keep the body empty
    }
    
    
    // Step2. Use the extra RocketMQTemplate. e.g.
    @Resource(name = "extRocketMQTemplate") // Must define the name to qualify to extra-defined RocketMQTemplate bean.
    private RocketMQTemplate extRocketMQTemplate;
    // you can use the template as normal.
    
    
  11. How do I create a consumer Listener with different name-server other than the global Spring configuration ‘rocketmq.name-server’ ?

    @Service
    @RocketMQMessageListener(
       nameServer = "NEW-NAMESERVER-LIST", // define new nameServer list
       topic = "test-topic-1", 
       consumerGroup = "my-consumer_test-topic-1",
       enableMsgTrace = true,
       customizedTraceTopic = "my-trace-topic"
    )
    public class MyNameServerConsumer implements RocketMQListener<String> {
       ...
    }