支持rabbit
diff --git a/alpha/alpha-fsm/pom.xml b/alpha/alpha-fsm/pom.xml
index 67fb188..c81d4d3 100644
--- a/alpha/alpha-fsm/pom.xml
+++ b/alpha/alpha-fsm/pom.xml
@@ -53,6 +53,20 @@
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
+ <!-- spring-cloud-stream rabbit-->
+ <dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-stream</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-amqp</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
+ </dependency>
+
<!-- pack -->
<dependency>
<groupId>org.apache.servicecomb.pack</groupId>
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 2afc1a9..6facf00 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -29,6 +29,7 @@
import javax.annotation.PostConstruct;
import org.apache.servicecomb.pack.alpha.fsm.channel.kafka.KafkaChannelAutoConfiguration;
import org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemoryChannelAutoConfiguration;
+import org.apache.servicecomb.pack.alpha.fsm.channel.rabbit.RabbitChannelAutoConfiguration;
import org.apache.servicecomb.pack.alpha.fsm.channel.redis.RedisChannelAutoConfiguration;
import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.repository.NoneTransactionRepository;
@@ -51,7 +52,7 @@
@ImportAutoConfiguration({
MemoryChannelAutoConfiguration.class,
KafkaChannelAutoConfiguration.class,
- RedisChannelAutoConfiguration.class})
+ RedisChannelAutoConfiguration.class, RabbitChannelAutoConfiguration.class})
@ConditionalOnProperty(value = {"alpha.feature.akka.enabled"})
public class FsmAutoConfiguration {
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitActorEventChannel.java
new file mode 100644
index 0000000..fb16288
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitActorEventChannel.java
@@ -0,0 +1,20 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+
+public class RabbitActorEventChannel extends AbstractActorEventChannel {
+
+ private RabbitMessagePublisher rabbitMqMessagePublisher;
+
+ public RabbitActorEventChannel(MetricsService metricsService, RabbitMessagePublisher rabbitMqMessagePublisher) {
+ super(metricsService);
+ this.rabbitMqMessagePublisher = rabbitMqMessagePublisher;
+ }
+
+ @Override
+ public void sendTo(BaseEvent event) {
+ rabbitMqMessagePublisher.publish(event);
+ }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitChannelAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitChannelAutoConfiguration.java
new file mode 100644
index 0000000..796fe11
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitChannelAutoConfiguration.java
@@ -0,0 +1,63 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.StreamMessageConverter;
+import org.springframework.cloud.stream.config.BindingProperties;
+import org.springframework.cloud.stream.config.BindingServiceProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.messaging.converter.MessageConverter;
+
+import java.util.Map;
+
+@EnableBinding({RabbitMessageChannel.class})
+@Configuration
+@EnableConfigurationProperties(BindingServiceProperties.class)
+@ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "rabbit")
+public class RabbitChannelAutoConfiguration {
+
+
+ @Bean
+ @ConditionalOnMissingBean()
+ public RabbitMessagePublisher rabbitMessagePublisher(BindingServiceProperties bindingServiceProperties, RabbitMessageChannel producerMessage) {
+ Map<String, BindingProperties> bindings = bindingServiceProperties.getBindings();
+ // 分区数量,现在现在生产者与消费这都在alpha-server,所以rabbit的分区partitionCount与该数量保持一直
+ int partitionCount = bindings.get(RabbitMessageChannel.SERVICE_COMB_PACK_PRODUCER).getProducer().getPartitionCount();
+ RabbitMessagePublisher messagePublisher = new RabbitMessagePublisher(partitionCount, producerMessage);
+ return messagePublisher;
+ }
+
+// @StreamMessageConverter
+// public MessageConverter StreamMessageConverter() {
+// MappingJackson2MessageConverter mappingJackson2HttpMessageConverter = new MappingJackson2MessageConverter();
+//// ObjectMapper objectMapper = new ObjectMapper();
+//// mappingJackson2HttpMessageConverter.setObjectMapper(objectMapper);
+// return mappingJackson2HttpMessageConverter;
+// }
+
+ @Bean
+ RabbitSagaEventConsumer sagaEventRabbitConsumer(ActorSystem actorSystem,
+ @Qualifier("sagaShardRegionActor") ActorRef sagaShardRegionActor,
+ MetricsService metricsService) {
+ return new RabbitSagaEventConsumer(actorSystem, sagaShardRegionActor, metricsService);
+ }
+
+ @Bean
+ @ConditionalOnMissingBean(ActorEventChannel.class)
+ public ActorEventChannel kafkaEventChannel(MetricsService metricsService,
+ @Lazy RabbitMessagePublisher rabbitMessagePublisher) {
+ return new org.apache.servicecomb.pack.alpha.fsm.channel.rabbit.RabbitActorEventChannel(metricsService, rabbitMessagePublisher);
+ }
+
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessageChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessageChannel.java
new file mode 100644
index 0000000..c824a55
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessageChannel.java
@@ -0,0 +1,20 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.Output;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.SubscribableChannel;
+
+public interface RabbitMessageChannel {
+
+
+ String SERVICE_COMB_PACK_PRODUCER = "service-comb-pack-producer";
+ String SERVICE_COMB_PACK_CONSUMER = "service-comb-pack-consumer";
+
+
+ @Output(SERVICE_COMB_PACK_PRODUCER)
+ MessageChannel messageChannel();
+
+ @Input(SERVICE_COMB_PACK_CONSUMER)
+ SubscribableChannel input();
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessagePublisher.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessagePublisher.java
new file mode 100644
index 0000000..d5fa21a
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitMessagePublisher.java
@@ -0,0 +1,38 @@
+package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+
+import org.apache.servicecomb.pack.alpha.core.fsm.channel.MessagePublisher;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.integration.support.MessageBuilder;
+
+import java.lang.invoke.MethodHandles;
+
+public class RabbitMessagePublisher implements MessagePublisher<BaseEvent> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+ private RabbitMessageChannel producerMessage;
+ private int partitionCount;
+
+ public RabbitMessagePublisher(int partitionCount, RabbitMessageChannel producerMessage) {
+
+ this.partitionCount = partitionCount;
+ this.producerMessage = producerMessage;
+
+ }
+
+ @Override
+ public void publish(BaseEvent data) {
+
+ String globalTxId = data.getGlobalTxId();
+ int partitionIndex = (Math.abs(globalTxId.hashCode()))% partitionCount;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("send message [{}] to [{}]", data, partitionIndex);
+ }
+ //partitionKey 分区的名称必须与配置中的key保持一致
+ producerMessage.messageChannel().send(MessageBuilder.withPayload(data).setHeader("partitionKey", partitionIndex).build());
+
+ }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitSagaEventConsumer.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitSagaEventConsumer.java
new file mode 100644
index 0000000..ee976c7
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/rabbit/RabbitSagaEventConsumer.java
@@ -0,0 +1,55 @@
+
+
+package org.apache.servicecomb.pack.alpha.fsm.channel.rabbit;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
+import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
+
+public class RabbitSagaEventConsumer extends AbstractEventConsumer {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public RabbitSagaEventConsumer(ActorSystem actorSystem, ActorRef sagaShardRegionActor,
+ MetricsService metricsService) {
+ super(actorSystem, sagaShardRegionActor, metricsService);
+
+ }
+
+ @StreamListener(org.apache.servicecomb.pack.alpha.fsm.channel.rabbit.RabbitMessageChannel.SERVICE_COMB_PACK_CONSUMER)
+ public void receive(BaseEvent baseEvent) {
+ sendSagaActor(baseEvent);
+ }
+
+
+ private CompletionStage<String> sendSagaActor(BaseEvent event) {
+ try {
+ long begin = System.currentTimeMillis();
+ metricsService.metrics().doActorReceived();
+ Timeout timeout = new Timeout(Duration.create(10, "seconds"));
+ Future<Object> future = Patterns.ask(sagaShardRegionActor, event, timeout);
+ Await.result(future, timeout.duration());
+ long end = System.currentTimeMillis();
+ metricsService.metrics().doActorAccepted();
+ metricsService.metrics().doActorAvgTime(end - begin);
+ return CompletableFuture.completedFuture("OK");
+ } catch (Exception ex) {
+ LOG.error(ex.getMessage(), ex);
+ metricsService.metrics().doActorRejected();
+ throw new CompletionException(ex);
+ }
+ }
+}
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 3cb74e3..48aad85 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -148,6 +148,46 @@
---
spring:
+ profiles: rabbit
+ cloud:
+ stream:
+ binders:
+ defaultRabbit:
+ type: rabbit
+ environment:
+ spring:
+ rabbitmq:
+ host: 127.0.0.1
+ username: servicecomb-pack
+ password: H123213PWD
+ virtual-host: servicecomb-pack
+ bindings:
+ service-comb-pack-producer:
+ destination: exchange-service-comb-pack
+ content-type: application/json
+ producer:
+ partition-count: 3
+ partition-key-expression: headers['partitionKey']
+
+ service-comb-pack-consumer:
+ group: group-pack
+ content-type: application/json
+ destination: exchange-service-comb-pack
+ consumer:
+ partitioned: true
+ instance-count: 1
+ instance-index: 0
+alpha:
+ feature:
+ akka:
+ enabled: true
+ channel:
+ type: rabbit
+
+
+
+---
+spring:
profiles: cluster
alpha:
diff --git a/pom.xml b/pom.xml
index 7777dca..64a1fd8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,7 +54,7 @@
<spring.cloud.starter.consul.discovery.version>2.1.1.RELEASE</spring.cloud.starter.consul.discovery.version>
<spring.cloud.starter.zookeeper.discovery.version>2.1.1.RELEASE</spring.cloud.starter.zookeeper.discovery.version>
<spring.cloud.starter.alibaba.nacos.discovery.version>0.2.2.RELEASE</spring.cloud.starter.alibaba.nacos.discovery.version>
-
+ <spring-cloud-stream>2.2.1.RELEASE</spring-cloud-stream>
<java.chassis.version>1.2.1</java.chassis.version>
<akka.version>2.5.14</akka.version>
<alpakka.version>1.0.5</alpakka.version>
@@ -352,6 +352,22 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-stream</artifactId>
+ <version>${spring-cloud-stream}</version>
+
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-amqp</artifactId>
+ <version>${spring.boot.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.cloud</groupId>
+ <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
+ <version>${spring-cloud-stream}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.servicecomb.pack</groupId>
<artifactId>alpha-spring-cloud-starter-consul</artifactId>
<version>${project.version}</version>