订阅关系:一个消费者组订阅一个 Topic 的某一个 Tag,这种记录被称为订阅关系。
订阅关系一致:同一个消费者组下所有消费者实例所订阅的Topic、Tag必须完全一致。如果订阅关系(消费者组名-Topic-Tag)不一致,会导致消费消息紊乱,甚至消息丢失。
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA,且订阅TopicA的Tag也都是Tag1,符合订阅关系一致原则。
正确示例代码一
C1、C2、C3的订阅关系一致,即C1、C2、C3订阅消息的代码必须完全一致,代码示例如下:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "Tag1", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicB,订阅TopicB的Tag也都是Tag2和Tag3,表示订阅TopicB中所有Tag为Tag2或Tag3的消息,且顺序一致都是Tag2||Tag3,符合订阅关系一致性原则。
正确示例代码二
C1、C2、C3的订阅关系一致,即C1、C2、C3订阅消息的代码必须完全一致,代码示例如下:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicB", "Tag2||Tag3", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA和TopicB,且订阅的TopicA都未指定Tag,即订阅TopicA中的所有消息,订阅的TopicB的Tag都是Tag2和Tag3,表示订阅TopicB中所有Tag为Tag2或Tag3的消息,且顺序一致都是Tag2||Tag3,符合订阅关系一致原则。
正确示例代码三
C1、C2、C3的订阅关系一致,即C1、C2、C3订阅消息的代码必须完全一致,代码示例如下:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_3"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } }); consumer.subscribe("TopicB", "Tag2||Tag3", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
问题描述
在使用消息队列RocketMQ版实例时,可能会出现订阅关系不一致的情况,具体的问题现象如下:
请参考以下步骤进行检查
您可在消息消息队列RocketMQ版控制台Group 详情页面查看指定Group的订阅关系是否一致。若查询结果不一致,请参见本文(3 常见订阅关系不一致问题)排查Consumer实例的消费代码。
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别订阅了TopicA、TopicB和TopicC,订阅的Topic不一致,不符合订阅关系一致性原则。
错误示例代码一
Consumer实例1-1:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
Consumer实例1-2:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicB", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
Consumer实例1-3:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_1"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicC", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
如下图所示,同一Group ID下的三个Consumer实例C1、C2和C3分别都订阅了TopicA,但是C1订阅TopicA的Tag为Tag1,C2和C3订阅的TopicA的Tag为Tag2,订阅同一Topic的Tag不一致,不符合订阅关系一致性原则。
错误示例代码二
Consumer实例2-1:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "Tag1", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
Consumer实例2-2:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "Tag2", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
Consumer实例2-3:
Properties properties = new Properties(); properties.put(PropertyKeyConst.GROUP_ID, "GID_test_2"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("TopicA", "Tag2", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });