Operation FAQ

1 RocketMQ's mqadmin command error.

Problem: Sometimes after deploying the RocketMQ cluster, when you try to execute some commands of “mqadmin”, the following exception will appear:

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed

Solution: Execute export NAMESRV_ADDR=ip:9876 (ip refers to the address of NameServer deployed in the cluster) on the VM that deploys the RocketMQ cluster.Then you will execute commands of “mqadmin” successfully.

2 The inconsistent version of RocketMQ between the producer and consumer leads to the problem that message can't be consumed normally.

Problem: The same producer sends a message, consumer A can consume, but consumer B can't consume, and the RocketMQ Console appears:

Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message

Solution: The jar package of RocketMQ, such as rocketmq-client, should be the same version on the consumer and producer.

3 When adding a new topic consumer group, historical messages can't be consumed.

Problem: When a new consumer group of the same topic is started, the consumed message is the current offset message, and the historical message is not obtained.

Solution: The default policy of rocketmq is to start from the end of the message queue and skip the historical message. If you want to consume historical message, you need to set:

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#setConsumeFromWhere

There are three common configurations:

  • By default, a new subscription group starts to consume from the end of the queue for the first time, and then restarts and continue to consume from the last consume position, that is, to skip the historical message.
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  • A new subscription group starts to consume from the head of the queue for the first time, and then restarts and continue to consume from the last consume position, that is, to consume the historical message that is not expired on Broker.
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • A new subscription group starts to consume from the specified time point for the first time, and then restarts and continue to consume from the last consume position. It is used together with consumer.setConsumeTimestamp(). The default is half an hour ago.
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);

4 How to enable reading data from Slave

In some cases, the Consumer needs to reset the consume position to 1-2 days ago. At this time, on the Master Broker with limited memory, the CommitLog will carry a relatively heavy IO pressure, affecting the reading and writing of other messages on that Broker. You can enable slaveReadEnable=true. When Master Broker finds that the difference between the Consumer‘s consume position and the latest value of CommitLog exceeds the percentage of machine’s memory (accessMessageInMemoryMaxRatio=40%), it will recommend Consumer to read from Slave Broker and relieve Master Broker's IO.

5 Performance

Asynchronous flush disk is recommended to use spin lock.

Synchronous flush disk is recommended to use reentrant lock. Adjust the Broker configuration item useReentrantLockWhenPutMessage, and the default value is false.

Asynchronous flush disk is recommended to open TransientStorePoolEnable and close transferMsgByHeap to improve the efficiency of pulling message;

Synchronous flush disk is recommended to increase the sendMessageThreadPoolNums appropriately. The specific configuration needs to be tested.

6 The meaning and difference between msgId and offsetMsgId in RocketMQ

After sending message with RocketMQ, you will usually see the following log:

SendResult [sendStatus=SEND_OK, msgId=0A42333A0DC818B4AAC246C290FD0000, offsetMsgId=0A42333A00002A9F000000000134F1F5, messageQueue=MessageQueue [topic=topicTest1, BrokerName=mac.local, queueId=3], queueOffset=4]
  • msgId,for the client, the msgId is generated by the producer instance. Specifically, the method MessageClientIDSetter.createUniqIDBuffer() is called to generate a unique Id.
  • offsetMsgId, offsetMsgId is generated by the Broker server when writing a message ( string concating “IP address + port” and “CommitLog's physical offset address”), and offsetMsgId is the messageId used to query in the RocketMQ console.