支持rabbit
diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml
index 67fb188..c81d4d3 100644
--- a/alpha/alpha-fsm/pom.xml
+++ b/alpha/alpha-fsm/pom.xml
@@ -53,6 +53,20 @@
       <artifactId>spring-boot-autoconfigure</artifactId>
     </dependency>
 
+    <!-- spring-cloud-stream rabbit-->
+    <dependency>
+      <groupId>org.springframework.cloud</groupId>
+      <artifactId>spring-cloud-stream</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-amqp</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.cloud</groupId>
+      <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
+    </dependency>
+
     <!-- pack -->
     <dependency>
       <groupId>org.apache.servicecomb.pack</groupId>
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 2afc1a9..6facf00 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -29,6 +29,7 @@
 import javax.annotation.PostConstruct;
 import org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaChannelAutoConfiguration;
 import org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemoryChannelAutoConfiguration;
+import org.apache.servicecomb.pack.alpha.fsm.channel.rabbit.RabbitChannelAutoConfiguration;
 import org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisChannelAutoConfiguration;
 import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
 import org.apache.servicecomb.pack.alpha.fsm.repository.NoneTransactionRepository;
@@ -51,7 +52,7 @@
 @ImportAutoConfiguration({
     MemoryChannelAutoConfiguration.class,
     KafkaChannelAutoConfiguration.class,
-    RedisChannelAutoConfiguration.class})
+    RedisChannelAutoConfiguration.class, RabbitChannelAutoConfiguration.class})
 @ConditionalOnProperty(value = {"alpha.feature.akka.enabled"})
 public class FsmAutoConfiguration {
 
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitActorEventChannel.java
new file mode 100644
index 0000000..fb16288
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitActorEventChannel.java
@@ -0,0 +1,20 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+
+public class RabbitActorEventChannel extends AbstractActorEventChannel {
+
+    private RabbitMessagePublisher rabbitMqMessagePublisher;
+
+    public RabbitActorEventChannel(MetricsService metricsService, RabbitMessagePublisher rabbitMqMessagePublisher) {
+        super(metricsService);
+        this.rabbitMqMessagePublisher = rabbitMqMessagePublisher;
+    }
+
+    @Override
+    public void sendTo(BaseEvent event) {
+        rabbitMqMessagePublisher.publish(event);
+    }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitChannelAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitChannelAutoConfiguration.java
new file mode 100644
index 0000000..796fe11
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitChannelAutoConfiguration.java
@@ -0,0 +1,63 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamMessageConverter;
+import org.springframework.cloud.stream.config.BindingProperties;
+import org.springframework.cloud.stream.config.BindingServiceProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
+
+import java.util.Map;
+
+@EnableBinding({RabbitMessageChannel.class})
+@Configuration
+@EnableConfigurationProperties(BindingServiceProperties.class)
+@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "rabbit")
+public class RabbitChannelAutoConfiguration {
+
+
+    @Bean
+    @ConditionalOnMissingBean()
+    public RabbitMessagePublisher rabbitMessagePublisher(BindingServiceProperties bindingServiceProperties, RabbitMessageChannel producerMessage) {
+        Map<String, BindingProperties> bindings = bindingServiceProperties.getBindings();
+        // 分区数量,现在现在生产者与消费这都在alpha-server,所以rabbit的分区partitionCount与该数量保持一直
+        int partitionCount = bindings.get(RabbitMessageChannel.SERVICE_COMB_PACK_PRODUCER).getProducer().getPartitionCount();
+        RabbitMessagePublisher messagePublisher = new RabbitMessagePublisher(partitionCount, producerMessage);
+        return messagePublisher;
+    }
+
+//    @StreamMessageConverter
+//    public MessageConverter StreamMessageConverter() {
+//        MappingJackson2MessageConverter mappingJackson2HttpMessageConverter = new MappingJackson2MessageConverter();
+////        ObjectMapper objectMapper = new ObjectMapper();
+////        mappingJackson2HttpMessageConverter.setObjectMapper(objectMapper);
+//        return mappingJackson2HttpMessageConverter;
+//    }
+
+    @Bean
+    RabbitSagaEventConsumer sagaEventRabbitConsumer(ActorSystem actorSystem,
+                                                    @Qualifier("sagaShardRegionActor") ActorRef sagaShardRegionActor,
+                                                    MetricsService metricsService) {
+        return new RabbitSagaEventConsumer(actorSystem, sagaShardRegionActor, metricsService);
+    }
+
+    @Bean
+    @ConditionalOnMissingBean(ActorEventChannel.class)
+    public ActorEventChannel kafkaEventChannel(MetricsService metricsService,
+                                               @Lazy RabbitMessagePublisher rabbitMessagePublisher) {
+        return new org.apache.servicecomb.pack.alpha.fsm.channel.rabbit.RabbitActorEventChannel(metricsService, rabbitMessagePublisher);
+    }
+
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessageChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessageChannel.java
new file mode 100644
index 0000000..c824a55
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessageChannel.java
@@ -0,0 +1,20 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.SubscribableChannel;
+
+public interface RabbitMessageChannel {
+
+
+    String SERVICE_COMB_PACK_PRODUCER = "service-comb-pack-producer";
+    String SERVICE_COMB_PACK_CONSUMER = "service-comb-pack-consumer";
+
+
+    @Output(SERVICE_COMB_PACK_PRODUCER)
+    MessageChannel messageChannel();
+
+    @Input(SERVICE_COMB_PACK_CONSUMER)
+    SubscribableChannel input();
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessagePublisher.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessagePublisher.java
new file mode 100644
index 0000000..d5fa21a
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessagePublisher.java
@@ -0,0 +1,38 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+
+import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.integration.support.MessageBuilder;
+
+import java.lang.invoke.MethodHandles;
+
+public class RabbitMessagePublisher implements MessagePublisher<BaseEvent> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+    private RabbitMessageChannel producerMessage;
+    private int partitionCount;
+
+    public RabbitMessagePublisher(int partitionCount, RabbitMessageChannel producerMessage) {
+
+        this.partitionCount = partitionCount;
+        this.producerMessage = producerMessage;
+
+    }
+
+    @Override
+    public void publish(BaseEvent data) {
+
+        String globalTxId = data.getGlobalTxId();
+        int partitionIndex = (Math.abs(globalTxId.hashCode()))% partitionCount;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("send message [{}] to [{}]", data, partitionIndex);
+        }
+        //partitionKey 分区的名称必须与配置中的key保持一致
+        producerMessage.messageChannel().send(MessageBuilder.withPayload(data).setHeader("partitionKey", partitionIndex).build());
+
+    }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitSagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitSagaEventConsumer.java
new file mode 100644
index 0000000..ee976c7
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitSagaEventConsumer.java
@@ -0,0 +1,55 @@
+
+
+package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
+
+public class RabbitSagaEventConsumer extends AbstractEventConsumer {
+    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    public RabbitSagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor,
+                                   MetricsService metricsService) {
+        super(actorSystem, sagaShardRegionActor, metricsService);
+
+    }
+
+    @StreamListener(org.apache.servicecomb.pack.alpha.fsm.channel.rabbit.RabbitMessageChannel.SERVICE_COMB_PACK_CONSUMER)
+    public void receive(BaseEvent baseEvent) {
+        sendSagaActor(baseEvent);
+    }
+
+
+    private CompletionStage<String> sendSagaActor(BaseEvent event) {
+        try {
+            long begin = System.currentTimeMillis();
+            metricsService.metrics().doActorReceived();
+            Timeout timeout = new Timeout(Duration.create(10, "seconds"));
+            Future<Object> future = Patterns.ask(sagaShardRegionActor, event, timeout);
+            Await.result(future, timeout.duration());
+            long end = System.currentTimeMillis();
+            metricsService.metrics().doActorAccepted();
+            metricsService.metrics().doActorAvgTime(end - begin);
+            return CompletableFuture.completedFuture("OK");
+        } catch (Exception ex) {
+            LOG.error(ex.getMessage(), ex);
+            metricsService.metrics().doActorRejected();
+            throw new CompletionException(ex);
+        }
+    }
+}
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 3cb74e3..48aad85 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -148,6 +148,46 @@
 
 ---
 spring:
+  profiles: rabbit
+  cloud:
+    stream:
+      binders:
+        defaultRabbit:
+          type: rabbit
+          environment:
+            spring:
+              rabbitmq:
+                host: 127.0.0.1
+                username: servicecomb-pack
+                password: H123213PWD
+                virtual-host: servicecomb-pack
+      bindings:
+        service-comb-pack-producer:
+          destination: exchange-service-comb-pack
+          content-type: application/json
+          producer:
+            partition-count: 3
+            partition-key-expression: headers['partitionKey']
+
+        service-comb-pack-consumer:
+          group: group-pack
+          content-type: application/json
+          destination: exchange-service-comb-pack
+          consumer:
+            partitioned: true
+      instance-count: 1
+      instance-index: 0
+alpha:
+  feature:
+    akka:
+      enabled: true
+      channel:
+        type: rabbit
+
+
+
+---
+spring:
   profiles: cluster
 
 alpha:
diff --git a/pom.xml b/pom.xml
index 7777dca..64a1fd8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
     <spring.cloud.starter.consul.discovery.version>2.1.1.RELEASE</spring.cloud.starter.consul.discovery.version>
     <spring.cloud.starter.zookeeper.discovery.version>2.1.1.RELEASE</spring.cloud.starter.zookeeper.discovery.version>
     <spring.cloud.starter.alibaba.nacos.discovery.version>0.2.2.RELEASE</spring.cloud.starter.alibaba.nacos.discovery.version>
-
+    <spring-cloud-stream>2.2.1.RELEASE</spring-cloud-stream>
     <java.chassis.version>1.2.1</java.chassis.version>
     <akka.version>2.5.14</akka.version>
     <alpakka.version>1.0.5</alpakka.version>
@@ -352,6 +352,22 @@
         <version>${project.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.springframework.cloud</groupId>
+        <artifactId>spring-cloud-stream</artifactId>
+        <version>${spring-cloud-stream}</version>
+
+      </dependency>
+      <dependency>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-amqp</artifactId>
+        <version>${spring.boot.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.springframework.cloud</groupId>
+        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
+        <version>${spring-cloud-stream}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.servicecomb.pack</groupId>
         <artifactId>alpha-spring-cloud-starter-consul</artifactId>
         <version>${project.version}</version>