| package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit; |
| |
| import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher; |
| import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.integration.support.MessageBuilder; |
| |
| import java.lang.invoke.MethodHandles; |
| |
| public class RabbitMessagePublisher implements MessagePublisher<BaseEvent> { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| |
| private RabbitMessageChannel producerMessage; |
| private int partitionCount; |
| |
| public RabbitMessagePublisher(int partitionCount, RabbitMessageChannel producerMessage) { |
| |
| this.partitionCount = partitionCount; |
| this.producerMessage = producerMessage; |
| |
| } |
| |
| @Override |
| public void publish(BaseEvent data) { |
| |
| String globalTxId = data.getGlobalTxId(); |
| int partitionIndex = (Math.abs(globalTxId.hashCode()))% partitionCount; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("send message [{}] to [{}]", data, partitionIndex); |
| } |
| //partitionKey 分区的名称必须与配置中的key保持一致 |
| producerMessage.messageChannel().send(MessageBuilder.withPayload(data).setHeader("partitionKey", partitionIndex).build()); |
| |
| } |
| } |