SCB-1321 Optimize alpha throughput
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 922e40b..fcf5cec 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -23,9 +23,17 @@
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import java.util.Map;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.KafkaActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.MemoryActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.RedisActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus.EventSubscribeBeanPostProcessor;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
@@ -36,6 +44,9 @@
 @ConditionalOnProperty(value = {"alpha.feature.akka.enabled"})
 public class FsmAutoConfiguration {
 
+  @Value("${alpha.feature.akka.channel.memory.size:-1}")
+  int memoryEventChannelMemorySize;
+
   @Bean
   public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
     ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment));
@@ -50,13 +61,38 @@
   }
 
   @Bean
-  public SagaEventActorEventSender sagaEventConsumer(){
-    return new SagaEventActorEventSender();
+  public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){
+    return new EventSubscribeBeanPostProcessor();
   }
 
   @Bean
-  public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){
-    return new EventSubscribeBeanPostProcessor();
+  public ActorEventSink actorEventSink(){ // sink handed to whichever ActorEventChannel bean below is created
+    return new SagaActorEventSender();
+  }
+
+  @Bean
+  @ConditionalOnMissingBean(ActorEventChannel.class)
+  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true)
+  public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink){ // also the default when the type property is not set
+    return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize); // a size <= 0 (default -1) becomes Integer.MAX_VALUE
+  }
+
+  @Bean
+  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "activemq")
+  public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink){ // placeholder: this channel's send() currently always throws
+    return new ActiveMQActorEventChannel(actorEventSink);
+  }
+
+  @Bean
+  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka")
+  public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink){ // placeholder: this channel's send() currently always throws
+    return new KafkaActorEventChannel(actorEventSink);
+  }
+
+  @Bean
+  @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis")
+  public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink){ // placeholder: this channel's send() currently always throws
+    return new RedisActorEventChannel(actorEventSink);
   }
 
 }
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
new file mode 100644
index 0000000..515f29c
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placeholder for an ActiveMQ (queue) backed channel; sending is not implemented yet.
+ */
+public class ActiveMQActorEventChannel implements ActorEventChannel {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final ActorEventSink actorEventSink;
+
+  public ActiveMQActorEventChannel(
+      ActorEventSink actorEventSink) {
+    this.actorEventSink = actorEventSink;
+  }
+
+  /**
+   * Always fails because this channel has no implementation yet.
+   *
+   * @throws UnsupportedOperationException on every call
+   */
+  @Override
+  public void send(BaseEvent event){
+    throw new UnsupportedOperationException("ActiveMQ actor event channel is not implemented");
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java
new file mode 100644
index 0000000..f026d91
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+
+public interface ActorEventChannel { // hand-over point for events that are to be delivered to an ActorEventSink
+  void send(BaseEvent event); // accepts one event; declares no checked exceptions
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
new file mode 100644
index 0000000..7539069
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Placeholder for a Kafka backed channel; sending is not implemented yet. */
+public class KafkaActorEventChannel implements ActorEventChannel {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final ActorEventSink actorEventSink;
+
+  public KafkaActorEventChannel(ActorEventSink actorEventSink) {
+    this.actorEventSink = actorEventSink;
+  }
+
+  /**
+   * Always fails because this channel has no implementation yet.
+   * @throws UnsupportedOperationException on every call
+   */
+  @Override
+  public void send(BaseEvent event){
+    throw new UnsupportedOperationException("Kafka actor event channel is not implemented");
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
new file mode 100644
index 0000000..1af2432
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** In-memory channel: send() enqueues events, one thread forwards them to the sink; size <= 0 means Integer.MAX_VALUE. */
+public class MemoryActorEventChannel implements ActorEventChannel {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final ActorEventSink actorEventSink;
+  private final LinkedBlockingQueue<BaseEvent> eventQueue;
+
+  public MemoryActorEventChannel(ActorEventSink actorEventSink, int size) {
+    this.actorEventSink = actorEventSink;
+    this.eventQueue = new LinkedBlockingQueue<>(size > 0 ? size : Integer.MAX_VALUE);
+    new Thread(new EventConsumer(),"MemoryActorEventChannel").start();
+  }
+
+  @Override
+  public void send(BaseEvent event){ // blocks while the queue is full
+    try{
+      eventQueue.put(event);
+    }catch (InterruptedException e){
+      Thread.currentThread().interrupt(); // keep the interruption visible to the caller
+      throw new RuntimeException(e);
+    }
+  }
+
+  class EventConsumer implements Runnable { // forwards queued events to the sink in FIFO order
+    @Override
+    public void run() {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          BaseEvent event = eventQueue.peek();
+          if (event != null) {
+            actorEventSink.send(event);
+            eventQueue.poll(); // skipped when send() throws, so the same event is retried
+          } else {
+            Thread.sleep(10);
+          }
+        } catch (InterruptedException ex) {
+          Thread.currentThread().interrupt(); // restore the flag so the loop condition ends the thread
+        } catch (Exception ex) {
+          LOG.error(ex.getMessage(), ex);
+        }
+      }
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
new file mode 100644
index 0000000..f055eec
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placeholder for a Redis (pub/sub) backed channel; sending is not implemented yet.
+ */
+public class RedisActorEventChannel implements ActorEventChannel {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final ActorEventSink actorEventSink;
+
+  public RedisActorEventChannel(
+      ActorEventSink actorEventSink) {
+    this.actorEventSink = actorEventSink;
+  }
+
+  /**
+   * Always fails because this channel has no implementation yet.
+   *
+   * @throws UnsupportedOperationException on every call
+   */
+  @Override
+  public void send(BaseEvent event){
+    throw new UnsupportedOperationException("Redis actor event channel is not implemented");
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
deleted file mode 100644
index 84d7914..0000000
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.pack.alpha.fsm.event.consumer;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import java.lang.invoke.MethodHandles;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class SagaEventActorEventSender {
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  @Autowired
-  ActorSystem system;
-
-  private Map<String,ActorRef> sagaCache = new ConcurrentHashMap<>();
-
-  public void send(BaseEvent event) {
-    if(LOG.isDebugEnabled()){
-      LOG.debug("send {} ", event.toString());
-    }
-    try{
-      ActorRef saga;
-      if(sagaCache.containsKey(event.getGlobalTxId())){
-        saga = sagaCache.get(event.getGlobalTxId());
-      }else{
-        saga = system.actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId());
-        sagaCache.put(event.getGlobalTxId(), saga);
-      }
-      saga.tell(event, ActorRef.noSender());
-      if(LOG.isDebugEnabled()){
-        LOG.debug("tell {} to {}", event.toString(),saga);
-      }
-    }catch (Exception ex){
-      throw ex;
-    }
-  }
-}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
index 59c1e4e..e9a5f60 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
@@ -23,6 +23,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.servicecomb.pack.alpha.core.fsm.PackSagaEvent;
 import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
@@ -38,7 +39,7 @@
   private boolean terminated;
   private SagaActorState lastState;
   private AtomicLong compensationRunningCounter = new AtomicLong();
-  private Map<String,TxEntity> txEntityMap = new HashMap<>();
+  private Map<String,TxEntity> txEntityMap = new ConcurrentHashMap<>();
   private List<BaseEvent> events = new LinkedList<>();
 
   public String getServiceName() {
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java
new file mode 100644
index 0000000..73ba220
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.sink;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+
+public interface ActorEventSink {
+  // Receives one event; implementations may throw any exception, checked or unchecked.
+  void send(BaseEvent event) throws Exception;
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
new file mode 100644
index 0000000..cdc0828
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.sink;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+public class SagaActorEventSender implements ActorEventSink {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Autowired
+  ActorSystem system;
+
+  private static final Timeout lookupTimeout = new Timeout(Duration.create(1, TimeUnit.SECONDS));
+
+  /**
+   * A SagaStartedEvent creates the saga actor named after its globalTxId; any other event goes to the
+   * actor resolved at "/user/" + globalTxId within lookupTimeout. Failures are rethrown as RuntimeException.
+   */
+  @Override
+  public void send(BaseEvent event) {
+    try{
+      LOG.debug("send {} ", event);
+      if (event instanceof SagaStartedEvent) {
+        final ActorRef saga = system
+            .actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId());
+        saga.tell(event, ActorRef.noSender());
+      } else {
+        ActorSelection actorSelection = system
+            .actorSelection("/user/" + event.getGlobalTxId());
+        final Future<ActorRef> actorRefFuture = actorSelection.resolveOne(lookupTimeout);
+        final ActorRef saga = Await.result(actorRefFuture, lookupTimeout.duration());
+        saga.tell(event, ActorRef.noSender());
+      }
+    }catch (Exception ex){
+      throw new RuntimeException(ex);
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index c1690c6..ae8d43d 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -31,6 +31,7 @@
 public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension();
+  public static boolean autoCleanSagaDataMap = true; // Only for tests: set to false before SagaDataExt is constructed to skip starting the thread that periodically clears sagaDataMap
 
   @Override
   public SagaDataExt createExtension(ExtendedActorSystem system) {
@@ -38,22 +39,24 @@
   }
 
   public static class SagaDataExt implements Extension {
-    private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
+    //private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
     private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap();
     private String lastGlobalTxId;
-    private CleanMemForTest cleanMemForTest = new CleanMemForTest(globalTxIds,sagaDataMap);
+    private CleanMemForTest cleanMemForTest = new CleanMemForTest(sagaDataMap);
 
     public SagaDataExt() {
       // Just to avoid the overflow of the OldGen for stress testing
       // Delete after SagaData persistence
-      new Thread(cleanMemForTest).start();
+      if(autoCleanSagaDataMap){
+        new Thread(cleanMemForTest).start();
+      }
     }
 
     public void putSagaData(String globalTxId, SagaData sagaData) {
-      if(!globalTxIds.contains(globalTxId)){
+      //if(!globalTxIds.contains(globalTxId)){
         lastGlobalTxId = globalTxId;
-        globalTxIds.add(globalTxId);
-      }
+      //  globalTxIds.add(globalTxId);
+      //}
       sagaDataMap.put(globalTxId, sagaData);
     }
 
@@ -71,7 +74,8 @@
 
     // Only test
     public void clearSagaData() {
-      globalTxIds.clear();
+      //globalTxIds.clear();
+      lastGlobalTxId = null;
       sagaDataMap.clear();
     }
 
@@ -81,11 +85,9 @@
   }
 
   static class CleanMemForTest implements Runnable {
-    final ConcurrentLinkedQueue<String> globalTxIds;
     final ConcurrentHashMap<String, SagaData> sagaDataMap;
 
-    public CleanMemForTest(ConcurrentLinkedQueue<String> globalTxIds, ConcurrentHashMap<String, SagaData> sagaDataMap) {
-      this.globalTxIds = globalTxIds;
+    public CleanMemForTest(ConcurrentHashMap<String, SagaData> sagaDataMap) {
       this.sagaDataMap = sagaDataMap;
     }
 
@@ -93,19 +95,12 @@
     public void run() {
       while (true){
         try{
-          if(!globalTxIds.isEmpty()){
-            int cache_size = globalTxIds.size()-5000;
-            while(cache_size>0){
-              sagaDataMap.remove(globalTxIds.poll());
-              cache_size--;
-            }
-          }
+          sagaDataMap.clear();
         }catch (Exception e){
           LOG.error(e.getMessage(),e);
         }finally {
-          LOG.info("SagaData limit cache 5000, free memory globalTxIds {}, sagaDataMap size {}",globalTxIds.size(),sagaDataMap.size());
           try {
-            Thread.sleep(60000);
+            Thread.sleep(10000);
           } catch (InterruptedException e) {
             LOG.error(e.getMessage(),e);
           }
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 1b4d84b..505d923 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -67,6 +67,7 @@
 
   @BeforeClass
   public static void setup() {
+    SagaDataExtension.autoCleanSagaDataMap=false; // do not start the thread that periodically clears sagaDataMap
     system = ActorSystem.create("SagaActorTest", ConfigFactory.parseMap(getPersistenceMemConfig()));
   }
 
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 82ae48a..69d2870 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -23,15 +23,14 @@
 import static org.junit.Assert.assertNotNull;
 
 import akka.actor.ActorSystem;
-import com.google.common.eventbus.EventBus;
 import java.util.UUID;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender;
 import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
 import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
@@ -39,6 +38,7 @@
 @SpringBootTest(classes = {SagaApplication.class},
     properties = {
         "alpha.feature.akka.enabled=true",
+        "alpha.feature.akka.channel.type=memory",
         "akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem",
         "akkaConfig.akka.persistence.journal.leveldb.dir=target/example/journal",
         "akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
@@ -50,7 +50,12 @@
   ActorSystem system;
   
   @Autowired
-  SagaEventActorEventSender sagaEventActorEventSender;
+  SagaActorEventSender sagaActorEventSender;
+
+  @BeforeClass
+  public static void setup(){
+    SagaDataExtension.autoCleanSagaDataMap=false; // do not start the thread that periodically clears sagaDataMap
+  }
 
   @Test
   public void successfulTest() {
@@ -59,7 +64,7 @@
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -79,7 +84,7 @@
     final String globalTxId = UUID.randomUUID().toString();
     final String localTxId_1 = UUID.randomUUID().toString();
     SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
 
     await().atMost(2, SECONDS).until(() -> {
@@ -99,7 +104,7 @@
     final String localTxId_1 = UUID.randomUUID().toString();
     final String localTxId_2 = UUID.randomUUID().toString();
     SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -120,7 +125,7 @@
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -142,7 +147,7 @@
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -164,7 +169,7 @@
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -186,7 +191,7 @@
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -208,7 +213,7 @@
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -231,7 +236,7 @@
     final String localTxId_3 = UUID.randomUUID().toString();
     final int timeout = 5; // second
     SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(timeout + 2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -253,7 +258,7 @@
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -275,7 +280,7 @@
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -297,7 +302,7 @@
     final String localTxId_2 = UUID.randomUUID().toString();
     final String localTxId_3 = UUID.randomUUID().toString();
     SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
-      sagaEventActorEventSender.send(event);
+      sagaActorEventSender.send(event);
     });
     await().atMost(2, SECONDS).until(() -> {
       SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
index 17589e9..d2e94b1 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
@@ -29,7 +29,7 @@
 
 import com.google.common.eventbus.EventBus;
 import org.apache.servicecomb.pack.alpha.core.*;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
 import org.apache.servicecomb.pack.alpha.server.fsm.GrpcSagaEventService;
 import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService;
 import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner;
@@ -168,11 +168,11 @@
 
   @Bean
   @ConditionalOnProperty(name= "alpha.feature.akka.enabled", havingValue = "true")
-  ServerStartable serverStartableMy(GrpcServerConfig serverConfig,
+  ServerStartable serverStartableWithAkka(GrpcServerConfig serverConfig,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
-      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, SagaEventActorEventSender sagaEventActorEventSender) throws IOException {
+      TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, ActorEventChannel actorEventChannel) throws IOException {
     ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus,
-        new GrpcSagaEventService(sagaEventActorEventSender, omegaCallbacks), grpcTccEventService);
+        new GrpcSagaEventService(actorEventChannel, omegaCallbacks), grpcTccEventService);
     new Thread(bootstrap::start).start();
     tccPendingTaskRunner.start();
     tccEventScanner.start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java
index 2e6b8e0..b7a344a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java
@@ -43,7 +43,6 @@
 @Controller
 @RequestMapping("/saga")
 @Profile("test")
-@ConditionalOnProperty(name = "alpha.feature.akka.enabled", havingValue = "false", matchIfMissing = true)
 // Only export this Controller for test
 class AlphaEventController {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 3cfb931..dcf5cf3 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -27,7 +27,7 @@
 import kamon.annotation.Trace;
 import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
 import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
 import org.apache.servicecomb.pack.common.EventType;
 import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
 import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
@@ -43,11 +43,11 @@
   private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
 
   private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
-  private final SagaEventActorEventSender sagaEventActorEventSender;
+  private final ActorEventChannel actorEventChannel;
 
-  public GrpcSagaEventService(SagaEventActorEventSender sagaEventActorEventSender,
+  public GrpcSagaEventService(ActorEventChannel actorEventChannel,
       Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
-    this.sagaEventActorEventSender = sagaEventActorEventSender;
+    this.actorEventChannel = actorEventChannel; // saga events built by this service are handed off via actorEventChannel.send(event)
     this.omegaCallbacks = omegaCallbacks;
   }
 
@@ -142,7 +142,7 @@
     }
     if (event != null) {
       event.setCreateTime(new Date());
-      sagaEventActorEventSender.send(event);
+      actorEventChannel.send(event);
     }
     responseObserver.onNext(ok ? ALLOW : REJECT);
     responseObserver.onCompleted();
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 1aa7a68..fa1b35a 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -21,6 +21,11 @@
   server:
     host: 0.0.0.0
     port: 8080
+  feature:
+    akka:
+      enabled: false
+      channel:
+        type: memory
 
 spring:
   datasource:
@@ -45,6 +50,7 @@
     metadataMap:
       servicecomb-alpha-server: ${alpha.server.host}:${alpha.server.port}
 
+
 akkaConfig:
   akka.persistence.journal.plugin: akka.persistence.journal.inmem
   akka.persistence.journal.leveldb.dir: target/example/journal
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index d44d728..d0902e5 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -68,7 +68,8 @@
 
   @BeforeClass
   public static void beforeClass() {
-    omegaEventSender.configClient(NettyChannelBuilder.forAddress("localhost", port).usePlaintext().build());
+    omegaEventSender.configClient(NettyChannelBuilder.forAddress("127.0.0.1", port).usePlaintext().build()); // loopback; 0.0.0.0 is a bind wildcard, not a portable connect target
+    SagaDataExtension.autoCleanSagaDataMap = false; // NOTE(review): presumably keeps SagaData available for later assertions - confirm; static flag is not reset in view
   }
 
   @AfterClass