SCB-1321 Optimize alpha throughput
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
index 922e40b..fcf5cec 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/FsmAutoConfiguration.java
@@ -23,9 +23,17 @@
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.Map;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActiveMQActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.KafkaActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.MemoryActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.channel.RedisActorEventChannel;
+import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.eventbus.EventSubscribeBeanPostProcessor;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
@@ -36,6 +44,9 @@
@ConditionalOnProperty(value = {"alpha.feature.akka.enabled"})
public class FsmAutoConfiguration {
+ @Value("${alpha.feature.akka.channel.memory.size:-1}")
+ int memoryEventChannelMemorySize; // capacity passed to MemoryActorEventChannel; defaults to -1, and that class maps any value <= 0 to Integer.MAX_VALUE
+
@Bean
public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext, ConfigurableEnvironment environment) {
ActorSystem system = ActorSystem.create("alpha-akka", akkaConfiguration(applicationContext,environment));
@@ -50,13 +61,38 @@
}
@Bean
- public SagaEventActorEventSender sagaEventConsumer(){
- return new SagaEventActorEventSender();
+ public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){
+ return new EventSubscribeBeanPostProcessor();
}
@Bean
- public EventSubscribeBeanPostProcessor eventSubscribeBeanPostProcessor(){
- return new EventSubscribeBeanPostProcessor();
+ public ActorEventSink actorEventSink(){ // sink bean injected into whichever ActorEventChannel bean below is active
+ return new SagaActorEventSender();
+ }
+ // Default channel: registered when alpha.feature.akka.channel.type is "memory" or unset, and only if no ActorEventChannel bean already exists.
+ @Bean
+ @ConditionalOnMissingBean(ActorEventChannel.class)
+ @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "memory", matchIfMissing = true)
+ public ActorEventChannel memoryEventChannel(ActorEventSink actorEventSink){
+ return new MemoryActorEventChannel(actorEventSink, memoryEventChannelMemorySize);
+ }
+ // Registered only when alpha.feature.akka.channel.type=activemq. NOTE(review): ActiveMQActorEventChannel.send() currently always throws.
+ @Bean
+ @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "activemq")
+ public ActorEventChannel activeMqEventChannel(ActorEventSink actorEventSink){
+ return new ActiveMQActorEventChannel(actorEventSink);
+ }
+ // Registered only when alpha.feature.akka.channel.type=kafka. NOTE(review): KafkaActorEventChannel.send() currently always throws.
+ @Bean
+ @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "kafka")
+ public ActorEventChannel kafkaEventChannel(ActorEventSink actorEventSink){
+ return new KafkaActorEventChannel(actorEventSink);
+ }
+ // Registered only when alpha.feature.akka.channel.type=redis. NOTE(review): RedisActorEventChannel.send() currently always throws.
+ @Bean
+ @ConditionalOnProperty(value = "alpha.feature.akka.channel.type", havingValue = "redis")
+ public ActorEventChannel redisEventChannel(ActorEventSink actorEventSink){
+ return new RedisActorEventChannel(actorEventSink);
}
}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
new file mode 100644
index 0000000..515f29c
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActiveMQActorEventChannel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stub {@link ActorEventChannel} for ActiveMQ (queue semantics, per the original note).
+ * Not implemented yet: every call to {@link #send(BaseEvent)} fails.
+ */
+public class ActiveMQActorEventChannel implements ActorEventChannel {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  // Kept for the future implementation; nothing reads it yet.
+  private final ActorEventSink actorEventSink;
+
+  /**
+   * @param actorEventSink sink stored for later use; not used by the current stub
+   */
+  public ActiveMQActorEventChannel(ActorEventSink actorEventSink) {
+    this.actorEventSink = actorEventSink;
+  }
+
+  /** Always throws {@link UnsupportedOperationException}; thrown directly so the cause is not hidden. */
+  @Override
+  public void send(BaseEvent event) {
+    throw new UnsupportedOperationException("ActiveMQ actor event channel is not implemented yet");
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java
new file mode 100644
index 0000000..f026d91
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/ActorEventChannel.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+/** Accepts events for delivery; when and how an event is delivered is up to the implementation. */
+public interface ActorEventChannel {
+ void send(BaseEvent event);
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
new file mode 100644
index 0000000..7539069
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/KafkaActorEventChannel.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/** Stub {@link ActorEventChannel} for Kafka; not implemented yet, so {@link #send(BaseEvent)} always fails. */
+public class KafkaActorEventChannel implements ActorEventChannel {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  // Kept for the future implementation; nothing reads it yet.
+  private final ActorEventSink actorEventSink;
+
+  public KafkaActorEventChannel(ActorEventSink actorEventSink) {
+    this.actorEventSink = actorEventSink;
+  }
+
+  /**
+   * Rejects the event; thrown directly so the cause is not hidden behind a wrapper.
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void send(BaseEvent event) {
+    throw new UnsupportedOperationException("Kafka actor event channel is not implemented yet");
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
new file mode 100644
index 0000000..1af2432
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/MemoryActorEventChannel.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * In-memory {@link ActorEventChannel}: callers enqueue events and a single background thread
+ * forwards them, in FIFO order, to the configured {@link ActorEventSink}.
+ */
+public class MemoryActorEventChannel implements ActorEventChannel {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  /** Pause between polls while the queue is empty. */
+  private static final long IDLE_SLEEP_MILLIS = 10;
+  /** Pause before retrying an event the sink failed on, so a failing sink cannot cause a busy loop. */
+  private static final long RETRY_DELAY_MILLIS = 100;
+
+  private final ActorEventSink actorEventSink;
+  private final LinkedBlockingQueue<BaseEvent> eventQueue;
+  private final int size;
+
+  /**
+   * Creates the channel and immediately starts its consumer thread.
+   *
+   * @param actorEventSink destination of every queued event
+   * @param size queue capacity; any value {@code <= 0} selects {@link Integer#MAX_VALUE}
+   */
+  public MemoryActorEventChannel(ActorEventSink actorEventSink, int size) {
+    this.size = size > 0 ? size : Integer.MAX_VALUE;
+    this.eventQueue = new LinkedBlockingQueue<>(this.size);
+    this.actorEventSink = actorEventSink;
+    new Thread(new EventConsumer(), "MemoryActorEventChannel").start();
+  }
+
+  /**
+   * Appends the event to the queue, blocking while the queue is full.
+   *
+   * @throws RuntimeException if the calling thread is interrupted while waiting for space
+   */
+  @Override
+  public void send(BaseEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      // Restore the flag so code further up the stack can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Consumer loop: an event leaves the queue only after the sink accepted it. */
+  class EventConsumer implements Runnable {
+
+    @Override
+    public void run() {
+      try {
+        while (!Thread.currentThread().isInterrupted()) {
+          BaseEvent event = eventQueue.peek();
+          if (event == null) {
+            Thread.sleep(IDLE_SLEEP_MILLIS);
+          } else if (deliver(event)) {
+            eventQueue.poll();
+          } else {
+            // NOTE(review): the failed event stays at the head and is retried indefinitely,
+            // exactly as before; only the delay between attempts is new.
+            Thread.sleep(RETRY_DELAY_MILLIS);
+          }
+        }
+      } catch (InterruptedException ex) {
+        // Interruption is treated as the stop signal: keep the flag set and let the thread end.
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    /** Hands one event to the sink; returns {@code false} (after logging) if the sink failed. */
+    private boolean deliver(BaseEvent event) throws InterruptedException {
+      try {
+        actorEventSink.send(event);
+        return true;
+      } catch (InterruptedException ex) {
+        throw ex;
+      } catch (Exception ex) {
+        LOG.error(ex.getMessage(), ex);
+        return false;
+      }
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
new file mode 100644
index 0000000..f055eec
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/channel/RedisActorEventChannel.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.channel;
+
+import java.lang.invoke.MethodHandles;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.apache.servicecomb.pack.alpha.fsm.sink.ActorEventSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stub {@link ActorEventChannel} for Redis (pub/sub semantics, per the original note).
+ * Not implemented yet: every call to {@link #send(BaseEvent)} fails.
+ */
+public class RedisActorEventChannel implements ActorEventChannel {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  // Kept for the future implementation; nothing reads it yet.
+  private final ActorEventSink actorEventSink;
+
+  /**
+   * @param actorEventSink sink stored for later use; not used by the current stub
+   */
+  public RedisActorEventChannel(ActorEventSink actorEventSink) {
+    this.actorEventSink = actorEventSink;
+  }
+
+  /** Always throws {@link UnsupportedOperationException}; thrown directly so the cause is not hidden. */
+  @Override
+  public void send(BaseEvent event) {
+    throw new UnsupportedOperationException("Redis actor event channel is not implemented yet");
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
deleted file mode 100644
index 84d7914..0000000
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/event/consumer/SagaEventActorEventSender.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.pack.alpha.fsm.event.consumer;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import java.lang.invoke.MethodHandles;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
-import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class SagaEventActorEventSender {
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- @Autowired
- ActorSystem system;
-
- private Map<String,ActorRef> sagaCache = new ConcurrentHashMap<>();
-
- public void send(BaseEvent event) {
- if(LOG.isDebugEnabled()){
- LOG.debug("send {} ", event.toString());
- }
- try{
- ActorRef saga;
- if(sagaCache.containsKey(event.getGlobalTxId())){
- saga = sagaCache.get(event.getGlobalTxId());
- }else{
- saga = system.actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId());
- sagaCache.put(event.getGlobalTxId(), saga);
- }
- saga.tell(event, ActorRef.noSender());
- if(LOG.isDebugEnabled()){
- LOG.debug("tell {} to {}", event.toString(),saga);
- }
- }catch (Exception ex){
- throw ex;
- }
- }
-}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
index 59c1e4e..e9a5f60 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/model/SagaData.java
@@ -23,6 +23,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.servicecomb.pack.alpha.core.fsm.PackSagaEvent;
import org.apache.servicecomb.pack.alpha.fsm.SagaActorState;
@@ -38,7 +39,7 @@
private boolean terminated;
private SagaActorState lastState;
private AtomicLong compensationRunningCounter = new AtomicLong();
- private Map<String,TxEntity> txEntityMap = new HashMap<>();
+ private Map<String,TxEntity> txEntityMap = new ConcurrentHashMap<>();
private List<BaseEvent> events = new LinkedList<>();
public String getServiceName() {
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java
new file mode 100644
index 0000000..73ba220
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/ActorEventSink.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.sink;
+
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+
+public interface ActorEventSink {
+ // Consumes one event; the signature lets implementations fail with any checked or unchecked exception.
+ void send(BaseEvent event) throws Exception;
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
new file mode 100644
index 0000000..cdc0828
--- /dev/null
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/sink/SagaActorEventSender.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.pack.alpha.fsm.sink;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+import org.apache.servicecomb.pack.alpha.fsm.SagaActor;
+import org.apache.servicecomb.pack.alpha.fsm.event.SagaStartedEvent;
+import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+/** {@link ActorEventSink} that tells each event to the saga actor named after the event's global transaction id. */
+public class SagaActorEventSender implements ActorEventSink {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  @Autowired
+  ActorSystem system;
+
+  private static final Timeout lookupTimeout = new Timeout(Duration.create(1, TimeUnit.SECONDS));
+
+  /**
+   * Creates the saga actor for a {@code SagaStartedEvent}; for any other event resolves the existing
+   * {@code /user/<globalTxId>} actor, waiting up to the lookup timeout. The event is then told to it.
+   *
+   * @throws RuntimeException wrapping any failure, e.g. the actor could not be resolved in time
+   */
+  @Override
+  public void send(BaseEvent event) {
+    try {
+      LOG.debug("send {} ", event); // one debug line per event; toString() runs only when debug is on
+      final ActorRef saga;
+      if (event instanceof SagaStartedEvent) {
+        saga = system.actorOf(SagaActor.props(event.getGlobalTxId()), event.getGlobalTxId());
+      } else {
+        ActorSelection actorSelection = system.actorSelection("/user/" + event.getGlobalTxId());
+        final Future<ActorRef> actorRefFuture = actorSelection.resolveOne(lookupTimeout);
+        saga = Await.result(actorRefFuture, lookupTimeout.duration());
+      }
+      saga.tell(event, ActorRef.noSender());
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+}
diff --git a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
index c1690c6..ae8d43d 100644
--- a/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
+++ b/alpha/alpha-fsm/src/main/java/org/apache/servicecomb/pack/alpha/fsm/spring/integration/akka/SagaDataExtension.java
@@ -31,6 +31,7 @@
public class SagaDataExtension extends AbstractExtensionId<SagaDataExt> {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final SagaDataExtension SAGA_DATA_EXTENSION_PROVIDER = new SagaDataExtension();
+ public static boolean autoCleanSagaDataMap = true; // Only for Test
@Override
public SagaDataExt createExtension(ExtendedActorSystem system) {
@@ -38,22 +39,24 @@
}
public static class SagaDataExt implements Extension {
- private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
+ //private final ConcurrentLinkedQueue<String> globalTxIds = new ConcurrentLinkedQueue<>();
private final ConcurrentHashMap<String, SagaData> sagaDataMap = new ConcurrentHashMap();
private String lastGlobalTxId;
- private CleanMemForTest cleanMemForTest = new CleanMemForTest(globalTxIds,sagaDataMap);
+ private CleanMemForTest cleanMemForTest = new CleanMemForTest(sagaDataMap);
public SagaDataExt() {
// Just to avoid the overflow of the OldGen for stress testing
// Delete after SagaData persistence
- new Thread(cleanMemForTest).start();
+ if(autoCleanSagaDataMap){
+ new Thread(cleanMemForTest).start();
+ }
}
public void putSagaData(String globalTxId, SagaData sagaData) {
- if(!globalTxIds.contains(globalTxId)){
+ //if(!globalTxIds.contains(globalTxId)){
lastGlobalTxId = globalTxId;
- globalTxIds.add(globalTxId);
- }
+ // globalTxIds.add(globalTxId);
+ //}
sagaDataMap.put(globalTxId, sagaData);
}
@@ -71,7 +74,8 @@
// Only test
public void clearSagaData() {
- globalTxIds.clear();
+ //globalTxIds.clear();
+ lastGlobalTxId = null;
sagaDataMap.clear();
}
@@ -81,11 +85,9 @@
}
static class CleanMemForTest implements Runnable {
- final ConcurrentLinkedQueue<String> globalTxIds;
final ConcurrentHashMap<String, SagaData> sagaDataMap;
- public CleanMemForTest(ConcurrentLinkedQueue<String> globalTxIds, ConcurrentHashMap<String, SagaData> sagaDataMap) {
- this.globalTxIds = globalTxIds;
+ public CleanMemForTest(ConcurrentHashMap<String, SagaData> sagaDataMap) {
this.sagaDataMap = sagaDataMap;
}
@@ -93,19 +95,12 @@
public void run() {
while (true){
try{
- if(!globalTxIds.isEmpty()){
- int cache_size = globalTxIds.size()-5000;
- while(cache_size>0){
- sagaDataMap.remove(globalTxIds.poll());
- cache_size--;
- }
- }
+ sagaDataMap.clear();
}catch (Exception e){
LOG.error(e.getMessage(),e);
}finally {
- LOG.info("SagaData limit cache 5000, free memory globalTxIds {}, sagaDataMap size {}",globalTxIds.size(),sagaDataMap.size());
try {
- Thread.sleep(60000);
+ Thread.sleep(10000);
} catch (InterruptedException e) {
LOG.error(e.getMessage(),e);
}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
index 1b4d84b..505d923 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaActorTest.java
@@ -67,6 +67,7 @@
@BeforeClass
public static void setup() {
+ SagaDataExtension.autoCleanSagaDataMap=false;
system = ActorSystem.create("SagaActorTest", ConfigFactory.parseMap(getPersistenceMemConfig()));
}
diff --git a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
index 82ae48a..69d2870 100644
--- a/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
+++ b/alpha/alpha-fsm/src/test/java/org/apache/servicecomb/pack/alpha/fsm/SagaIntegrationTest.java
@@ -23,15 +23,14 @@
import static org.junit.Assert.assertNotNull;
import akka.actor.ActorSystem;
-import com.google.common.eventbus.EventBus;
import java.util.UUID;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.sink.SagaActorEventSender;
import org.apache.servicecomb.pack.alpha.fsm.model.SagaData;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@@ -39,6 +38,7 @@
@SpringBootTest(classes = {SagaApplication.class},
properties = {
"alpha.feature.akka.enabled=true",
+ "alpha.feature.akka.channel.type=memory",
"akkaConfig.akka.persistence.journal.plugin=akka.persistence.journal.inmem",
"akkaConfig.akka.persistence.journal.leveldb.dir=target/example/journal",
"akkaConfig.akka.persistence.snapshot-store.plugin=akka.persistence.snapshot-store.local",
@@ -50,7 +50,12 @@
ActorSystem system;
@Autowired
- SagaEventActorEventSender sagaEventActorEventSender;
+ SagaActorEventSender sagaActorEventSender;
+
+ @BeforeClass
+ public static void setup(){
+ SagaDataExtension.autoCleanSagaDataMap=false;
+ }
@Test
public void successfulTest() {
@@ -59,7 +64,7 @@
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.successfulEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -79,7 +84,7 @@
final String globalTxId = UUID.randomUUID().toString();
final String localTxId_1 = UUID.randomUUID().toString();
SagaEventSender.firstTxAbortedEvents(globalTxId, localTxId_1).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
@@ -99,7 +104,7 @@
final String localTxId_1 = UUID.randomUUID().toString();
final String localTxId_2 = UUID.randomUUID().toString();
SagaEventSender.middleTxAbortedEvents(globalTxId, localTxId_1, localTxId_2).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -120,7 +125,7 @@
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.lastTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -142,7 +147,7 @@
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.sagaAbortedEventBeforeTxComponsitedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -164,7 +169,7 @@
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.receivedRemainingEventAfterFirstTxAbortedEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -186,7 +191,7 @@
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.sagaAbortedEventAfterAllTxEndedsEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -208,7 +213,7 @@
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.omegaSendSagaTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -231,7 +236,7 @@
final String localTxId_3 = UUID.randomUUID().toString();
final int timeout = 5; // second
SagaEventSender.sagaActorTriggerTimeoutEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3, timeout).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(timeout + 2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -253,7 +258,7 @@
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.successfulWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -275,7 +280,7 @@
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.successfulWithTxConcurrentCrossEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
@@ -297,7 +302,7 @@
final String localTxId_2 = UUID.randomUUID().toString();
final String localTxId_3 = UUID.randomUUID().toString();
SagaEventSender.lastTxAbortedEventWithTxConcurrentEvents(globalTxId, localTxId_1, localTxId_2, localTxId_3).stream().forEach( event -> {
- sagaEventActorEventSender.send(event);
+ sagaActorEventSender.send(event);
});
await().atMost(2, SECONDS).until(() -> {
SagaData sagaData = SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(system).getSagaData(globalTxId);
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
index 17589e9..d2e94b1 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaConfig.java
@@ -29,7 +29,7 @@
import com.google.common.eventbus.EventBus;
import org.apache.servicecomb.pack.alpha.core.*;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
import org.apache.servicecomb.pack.alpha.server.fsm.GrpcSagaEventService;
import org.apache.servicecomb.pack.alpha.server.tcc.GrpcTccEventService;
import org.apache.servicecomb.pack.alpha.server.tcc.callback.TccPendingTaskRunner;
@@ -168,11 +168,11 @@
@Bean
@ConditionalOnProperty(name= "alpha.feature.akka.enabled", havingValue = "true")
- ServerStartable serverStartableMy(GrpcServerConfig serverConfig,
+ ServerStartable serverStartableWithAkka(GrpcServerConfig serverConfig,
Map<String, Map<String, OmegaCallback>> omegaCallbacks, GrpcTccEventService grpcTccEventService,
- TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, SagaEventActorEventSender sagaEventActorEventSender) throws IOException {
+ TccPendingTaskRunner tccPendingTaskRunner, TccEventScanner tccEventScanner, @Qualifier("alphaEventBus") EventBus eventBus, ActorEventChannel actorEventChannel) throws IOException {
ServerStartable bootstrap = new GrpcStartable(serverConfig, eventBus,
- new GrpcSagaEventService(sagaEventActorEventSender, omegaCallbacks), grpcTccEventService);
+ new GrpcSagaEventService(actorEventChannel, omegaCallbacks), grpcTccEventService);
new Thread(bootstrap::start).start();
tccPendingTaskRunner.start();
tccEventScanner.start();
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java
index 2e6b8e0..b7a344a 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/AlphaEventController.java
@@ -43,7 +43,6 @@
@Controller
@RequestMapping("/saga")
@Profile("test")
-@ConditionalOnProperty(name = "alpha.feature.akka.enabled", havingValue = "false", matchIfMissing = true)
// Only export this Controller for test
class AlphaEventController {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
index 3cfb931..dcf5cf3 100644
--- a/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
+++ b/alpha/alpha-server/src/main/java/org/apache/servicecomb/pack/alpha/server/fsm/GrpcSagaEventService.java
@@ -27,7 +27,7 @@
import kamon.annotation.Trace;
import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
import org.apache.servicecomb.pack.alpha.fsm.event.base.BaseEvent;
-import org.apache.servicecomb.pack.alpha.fsm.event.consumer.SagaEventActorEventSender;
+import org.apache.servicecomb.pack.alpha.fsm.channel.ActorEventChannel;
import org.apache.servicecomb.pack.common.EventType;
import org.apache.servicecomb.pack.contract.grpc.GrpcAck;
import org.apache.servicecomb.pack.contract.grpc.GrpcCompensateCommand;
@@ -43,11 +43,11 @@
private static final GrpcAck REJECT = GrpcAck.newBuilder().setAborted(true).build();
private final Map<String, Map<String, OmegaCallback>> omegaCallbacks;
- private final SagaEventActorEventSender sagaEventActorEventSender;
+ private final ActorEventChannel actorEventChannel;
- public GrpcSagaEventService(SagaEventActorEventSender sagaEventActorEventSender,
+ public GrpcSagaEventService(ActorEventChannel actorEventChannel,
Map<String, Map<String, OmegaCallback>> omegaCallbacks) {
- this.sagaEventActorEventSender = sagaEventActorEventSender;
+ this.actorEventChannel = actorEventChannel;
this.omegaCallbacks = omegaCallbacks;
}
@@ -142,7 +142,7 @@
}
if (event != null) {
event.setCreateTime(new Date());
- sagaEventActorEventSender.send(event);
+ actorEventChannel.send(event);
}
responseObserver.onNext(ok ? ALLOW : REJECT);
responseObserver.onCompleted();
diff --git a/alpha/alpha-server/src/main/resources/application.yaml b/alpha/alpha-server/src/main/resources/application.yaml
index 1aa7a68..fa1b35a 100644
--- a/alpha/alpha-server/src/main/resources/application.yaml
+++ b/alpha/alpha-server/src/main/resources/application.yaml
@@ -21,6 +21,11 @@
server:
host: 0.0.0.0
port: 8080
+ feature:
+ akka:
+ enabled: false
+ channel:
+ type: memory
spring:
datasource:
@@ -45,6 +50,7 @@
metadataMap:
servicecomb-alpha-server: ${alpha.server.host}:${alpha.server.port}
+
akkaConfig:
akka.persistence.journal.plugin: akka.persistence.journal.inmem
akka.persistence.journal.leveldb.dir: target/example/journal
diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
index d44d728..d0902e5 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/fsm/AlphaIntegrationFsmTest.java
@@ -68,7 +68,8 @@
@BeforeClass
public static void beforeClass() {
- omegaEventSender.configClient(NettyChannelBuilder.forAddress("localhost", port).usePlaintext().build());
+ omegaEventSender.configClient(NettyChannelBuilder.forAddress("127.0.0.1", port).usePlaintext().build());
+ SagaDataExtension.autoCleanSagaDataMap = false;
}
@AfterClass