blob: d4ca777b37deb2296e44129c4353902482e39ec3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.pack.alpha.fsm;
import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER;
import static org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.SpringAkkaExtension.SPRING_EXTENSION_PROVIDER;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Map;
import org.apache.http.HttpHost;
import org.apache.servicecomb.pack.alpha.core.OmegaCallback;
import org.apache.servicecomb.pack.alpha.core.fsm.channel.ActorEventChannel;
import org.apache.servicecomb.pack.alpha.core.metrics.AlphaMetricsEndpoint;
import org.apache.servicecomb.pack.alpha.fsm.api.APIv1Controller;
import org.apache.servicecomb.pack.alpha.fsm.api.APIv1Impl;
import org.apache.servicecomb.pack.alpha.fsm.channel.AbstractEventConsumer;
import org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemoryActorEventChannel;
import org.apache.servicecomb.pack.alpha.fsm.channel.memory.MemorySagaEventConsumer;
import org.apache.servicecomb.pack.alpha.fsm.metrics.MetricsService;
import org.apache.servicecomb.pack.alpha.fsm.metrics.AlphaMetricsEndpointImpl;
import org.apache.servicecomb.pack.alpha.fsm.properties.SpecSagaAkkaProperties;
import org.apache.servicecomb.pack.alpha.fsm.repository.NoneTransactionRepository;
import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepository;
import org.apache.servicecomb.pack.alpha.fsm.repository.TransactionRepositoryChannel;
import org.apache.servicecomb.pack.alpha.fsm.repository.channel.DefaultTransactionRepositoryChannel;
import org.apache.servicecomb.pack.alpha.fsm.repository.elasticsearch.ElasticsearchTransactionRepository;
import org.apache.servicecomb.pack.alpha.fsm.spring.integration.akka.AkkaConfigPropertyAdapter;
import org.apache.servicecomb.pack.alpha.fsm.test.FsmSagaDataController;
import org.apache.servicecomb.pack.common.AlphaMetaKeys;
import org.apache.servicecomb.pack.contract.grpc.ServerMeta;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
@Configuration
@ImportAutoConfiguration({SpecSagaAkkaProperties.class})
@ConditionalOnExpression("'${alpha.spec.names}'.contains('saga-akka')")
public class AlphaSpecSagaFsmAutoConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public AlphaSpecSagaFsmAutoConfiguration() {
System.setProperty("es.set.netty.runtime.available.processors", "false");
LOG.info("Alpha Specification Saga Akka");
}
@Bean
public MetricsService metricsService() {
return new MetricsService();
}
@Bean
public ActorSystem actorSystem(ConfigurableApplicationContext applicationContext,
ConfigurableEnvironment environment, MetricsService metricsService,
TransactionRepositoryChannel repositoryChannel) {
ActorSystem system = ActorSystem
.create("alpha-cluster", akkaConfiguration(applicationContext, environment));
SPRING_EXTENSION_PROVIDER.get(system).initialize(applicationContext);
SAGA_DATA_EXTENSION_PROVIDER.get(system).setRepositoryChannel(repositoryChannel);
SAGA_DATA_EXTENSION_PROVIDER.get(system).setMetricsService(metricsService);
return system;
}
@Bean
public Config akkaConfiguration(ConfigurableApplicationContext applicationContext,
ConfigurableEnvironment environment) {
final Map<String, Object> converted = AkkaConfigPropertyAdapter.getPropertyMap(environment);
return ConfigFactory.parseMap(converted)
.withFallback(ConfigFactory.defaultReference(applicationContext.getClassLoader()));
}
@Bean(name = "sagaShardRegionActor")
public ActorRef sagaShardRegionActor(ActorSystem actorSystem) {
return actorSystem.actorOf(Props.create(SagaShardRegionActor.class));
}
@Bean
public ElasticsearchRestTemplate elasticsearchRestTemplate(
SpecSagaAkkaProperties specSagaAkkaProperties) {
HttpHost[] hosts = Arrays.stream(
specSagaAkkaProperties.getRepository().getElasticsearch().getUris()
.split(","))
.map(uri -> HttpHost.create(uri))
.toArray(HttpHost[]::new);
RestClientBuilder builder = RestClient.builder(hosts);
RestHighLevelClient client = new RestHighLevelClient(builder);
return new ElasticsearchRestTemplate(client);
}
@Bean
public TransactionRepository transactionRepository(
SpecSagaAkkaProperties specSagaAkkaProperties,
ElasticsearchRestTemplate elasticsearchRestTemplate,
MetricsService metricsService) {
if (specSagaAkkaProperties.getRepository().getName().equals("elasticsearch")) {
return new ElasticsearchTransactionRepository(specSagaAkkaProperties.getRepository()
.getElasticsearch(), elasticsearchRestTemplate, metricsService);
} else {
return new NoneTransactionRepository();
}
}
@Bean
TransactionRepositoryChannel memoryTransactionRepositoryChannel(TransactionRepository repository,
MetricsService metricsService) {
return new DefaultTransactionRepositoryChannel(repository, metricsService);
}
@Bean
@ConditionalOnMissingBean(ActorEventChannel.class)
public ActorEventChannel eventChannel(SpecSagaAkkaProperties specSagaAkkaProperties,
MetricsService metricsService) {
if (specSagaAkkaProperties.getChannel().getName().equals("memory")) {
return new MemoryActorEventChannel(metricsService,
specSagaAkkaProperties.getChannel().getMemory().getMaxLength());
} else {
return new MemoryActorEventChannel(metricsService,
specSagaAkkaProperties.getChannel().getMemory().getMaxLength());
}
}
@Bean
AbstractEventConsumer eventConsumer(SpecSagaAkkaProperties specSagaAkkaProperties,
ActorSystem actorSystem,
@Qualifier("sagaShardRegionActor") ActorRef sagaShardRegionActor,
MetricsService metricsService,
ActorEventChannel actorEventChannel) {
if (specSagaAkkaProperties.getChannel().getName().equals("memory")) {
return new MemorySagaEventConsumer(actorSystem, sagaShardRegionActor, metricsService,
(MemoryActorEventChannel) actorEventChannel);
} else {
return new MemorySagaEventConsumer(actorSystem, sagaShardRegionActor, metricsService,
(MemoryActorEventChannel) actorEventChannel);
}
}
@Bean
GrpcSagaEventService grpcSagaEventService(ActorEventChannel actorEventChannel, Map<String, Map<String, OmegaCallback>> omegaCallbacks){
ServerMeta serverMeta = ServerMeta.newBuilder()
.putMeta(AlphaMetaKeys.AkkaEnabled.name(), String.valueOf(true)).build();
return new GrpcSagaEventService(actorEventChannel, omegaCallbacks, serverMeta);
}
@Bean
APIv1Impl apIv1(){
return new APIv1Impl();
}
@Bean
APIv1Controller apIv1Controller(){
return new APIv1Controller();
}
@Bean
AlphaMetricsEndpoint alphaMetricsEndpoint(){
return new AlphaMetricsEndpointImpl();
}
@Bean
@Profile("test")
FsmSagaDataController fsmSagaDataController(){
return new FsmSagaDataController();
}
}