| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.dubbo.config.deploy; |
| |
| import org.apache.dubbo.common.URL; |
| import org.apache.dubbo.common.config.ConfigurationUtils; |
| import org.apache.dubbo.common.config.Environment; |
| import org.apache.dubbo.common.config.ReferenceCache; |
| import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; |
| import org.apache.dubbo.common.config.configcenter.DynamicConfigurationFactory; |
| import org.apache.dubbo.common.config.configcenter.wrapper.CompositeDynamicConfiguration; |
| import org.apache.dubbo.common.deploy.AbstractDeployer; |
| import org.apache.dubbo.common.deploy.ApplicationDeployListener; |
| import org.apache.dubbo.common.deploy.ApplicationDeployer; |
| import org.apache.dubbo.common.deploy.DeployListener; |
| import org.apache.dubbo.common.deploy.DeployState; |
| import org.apache.dubbo.common.deploy.ModuleDeployer; |
| import org.apache.dubbo.common.extension.ExtensionLoader; |
| import org.apache.dubbo.common.lang.ShutdownHookCallbacks; |
| import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; |
| import org.apache.dubbo.common.logger.LoggerFactory; |
| import org.apache.dubbo.common.threadpool.manager.ExecutorRepository; |
| import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository; |
| import org.apache.dubbo.common.utils.ArrayUtils; |
| import org.apache.dubbo.common.utils.CollectionUtils; |
| import org.apache.dubbo.common.utils.StringUtils; |
| import org.apache.dubbo.config.ApplicationConfig; |
| import org.apache.dubbo.config.ConfigCenterConfig; |
| import org.apache.dubbo.config.DubboShutdownHook; |
| import org.apache.dubbo.config.MetadataReportConfig; |
| import org.apache.dubbo.config.RegistryConfig; |
| import org.apache.dubbo.config.context.ConfigManager; |
| import org.apache.dubbo.config.utils.CompositeReferenceCache; |
| import org.apache.dubbo.config.utils.ConfigValidationUtils; |
| import org.apache.dubbo.metadata.report.MetadataReportFactory; |
| import org.apache.dubbo.metadata.report.MetadataReportInstance; |
| import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils; |
| import org.apache.dubbo.registry.support.RegistryManager; |
| import org.apache.dubbo.rpc.model.ApplicationModel; |
| import org.apache.dubbo.rpc.model.ModuleModel; |
| import org.apache.dubbo.rpc.model.ScopeModel; |
| import org.apache.dubbo.rpc.model.ScopeModelUtil; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.function.Supplier; |
| |
| import static java.lang.String.format; |
| import static org.apache.dubbo.common.config.ConfigurationUtils.parseProperties; |
| import static org.apache.dubbo.common.constants.CommonConstants.REGISTRY_SPLIT_PATTERN; |
| import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE; |
| import static org.apache.dubbo.common.constants.LoggerCodeConstants.CONFIG_REFRESH_INSTANCE_ERROR; |
| import static org.apache.dubbo.common.constants.LoggerCodeConstants.CONFIG_REGISTER_INSTANCE_ERROR; |
| import static org.apache.dubbo.common.utils.StringUtils.isEmpty; |
| import static org.apache.dubbo.common.utils.StringUtils.isNotEmpty; |
| import static org.apache.dubbo.metadata.MetadataConstants.DEFAULT_METADATA_PUBLISH_DELAY; |
| import static org.apache.dubbo.metadata.MetadataConstants.METADATA_PUBLISH_DELAY_KEY; |
| import static org.apache.dubbo.remoting.Constants.CLIENT_KEY; |
| |
| /** |
| * initialize and start application instance |
| */ |
| public class DefaultApplicationDeployer extends AbstractDeployer<ApplicationModel> implements ApplicationDeployer { |
| |
| private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(DefaultApplicationDeployer.class); |
| |
| private final ApplicationModel applicationModel; |
| |
| private final ConfigManager configManager; |
| |
| private final Environment environment; |
| |
| private final ReferenceCache referenceCache; |
| |
| private final FrameworkExecutorRepository frameworkExecutorRepository; |
| private final ExecutorRepository executorRepository; |
| |
| private final AtomicBoolean hasPreparedApplicationInstance = new AtomicBoolean(false); |
| private volatile boolean hasPreparedInternalModule = false; |
| |
| private ScheduledFuture<?> asyncMetadataFuture; |
| private volatile CompletableFuture<Boolean> startFuture; |
| private final DubboShutdownHook dubboShutdownHook; |
| private final Object stateLock = new Object(); |
| private final Object startLock = new Object(); |
| private final Object destroyLock = new Object(); |
| private final Object internalModuleLock = new Object(); |
| |
| public DefaultApplicationDeployer(ApplicationModel applicationModel) { |
| super(applicationModel); |
| this.applicationModel = applicationModel; |
| configManager = applicationModel.getApplicationConfigManager(); |
| environment = applicationModel.getModelEnvironment(); |
| |
| referenceCache = new CompositeReferenceCache(applicationModel); |
| frameworkExecutorRepository = applicationModel.getFrameworkModel().getBeanFactory().getBean(FrameworkExecutorRepository.class); |
| executorRepository = getExtensionLoader(ExecutorRepository.class).getDefaultExtension(); |
| dubboShutdownHook = new DubboShutdownHook(applicationModel); |
| |
| // load spi listener |
| Set<ApplicationDeployListener> deployListeners = applicationModel.getExtensionLoader(ApplicationDeployListener.class) |
| .getSupportedExtensionInstances(); |
| for (ApplicationDeployListener listener : deployListeners) { |
| this.addDeployListener(listener); |
| } |
| } |
| |
| public static ApplicationDeployer get(ScopeModel moduleOrApplicationModel) { |
| ApplicationModel applicationModel = ScopeModelUtil.getApplicationModel(moduleOrApplicationModel); |
| ApplicationDeployer applicationDeployer = applicationModel.getDeployer(); |
| if (applicationDeployer == null) { |
| applicationDeployer = applicationModel.getBeanFactory().getOrRegisterBean(DefaultApplicationDeployer.class); |
| } |
| return applicationDeployer; |
| } |
| |
| @Override |
| public ApplicationModel getApplicationModel() { |
| return applicationModel; |
| } |
| |
| private <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { |
| return applicationModel.getExtensionLoader(type); |
| } |
| |
| private void unRegisterShutdownHook() { |
| dubboShutdownHook.unregister(); |
| } |
| |
| /** |
| * Close registration of instance for pure Consumer process by setting registerConsumer to 'false' |
| * by default is true. |
| */ |
| private boolean isRegisterConsumerInstance() { |
| Boolean registerConsumer = getApplication().getRegisterConsumer(); |
| if (registerConsumer == null) { |
| return true; |
| } |
| return Boolean.TRUE.equals(registerConsumer); |
| } |
| |
| @Override |
| public ReferenceCache getReferenceCache() { |
| return referenceCache; |
| } |
| |
| /** |
| * Initialize |
| */ |
| @Override |
| public void initialize() { |
| if (initialized) { |
| return; |
| } |
| // Ensure that the initialization is completed when concurrent calls |
| synchronized (startLock) { |
| if (initialized) { |
| return; |
| } |
| // register shutdown hook |
| registerShutdownHook(); |
| |
| startConfigCenter(); |
| |
| loadApplicationConfigs(); |
| |
| initModuleDeployers(); |
| |
| // @since 2.7.8 |
| startMetadataCenter(); |
| |
| initialized = true; |
| |
| if (logger.isInfoEnabled()) { |
| logger.info(getIdentifier() + " has been initialized!"); |
| } |
| } |
| } |
| |
| private void registerShutdownHook() { |
| dubboShutdownHook.register(); |
| } |
| |
| private void initModuleDeployers() { |
| // make sure created default module |
| applicationModel.getDefaultModule(); |
| // copy modules and initialize avoid ConcurrentModificationException if add new module |
| List<ModuleModel> moduleModels = new ArrayList<>(applicationModel.getModuleModels()); |
| for (ModuleModel moduleModel : moduleModels) { |
| moduleModel.getDeployer().initialize(); |
| } |
| } |
| |
| private void loadApplicationConfigs() { |
| configManager.loadConfigs(); |
| } |
| |
| private void startConfigCenter() { |
| |
| // load application config |
| configManager.loadConfigsOfTypeFromProps(ApplicationConfig.class); |
| |
| // try set model name |
| if (StringUtils.isBlank(applicationModel.getModelName())) { |
| applicationModel.setModelName(applicationModel.tryGetApplicationName()); |
| } |
| |
| // load config centers |
| configManager.loadConfigsOfTypeFromProps(ConfigCenterConfig.class); |
| |
| useRegistryAsConfigCenterIfNecessary(); |
| |
| // check Config Center |
| Collection<ConfigCenterConfig> configCenters = configManager.getConfigCenters(); |
| if (CollectionUtils.isEmpty(configCenters)) { |
| ConfigCenterConfig configCenterConfig = new ConfigCenterConfig(); |
| configCenterConfig.setScopeModel(applicationModel); |
| configCenterConfig.refresh(); |
| ConfigValidationUtils.validateConfigCenterConfig(configCenterConfig); |
| if (configCenterConfig.isValid()) { |
| configManager.addConfigCenter(configCenterConfig); |
| configCenters = configManager.getConfigCenters(); |
| } |
| } else { |
| for (ConfigCenterConfig configCenterConfig : configCenters) { |
| configCenterConfig.refresh(); |
| ConfigValidationUtils.validateConfigCenterConfig(configCenterConfig); |
| } |
| } |
| |
| if (CollectionUtils.isNotEmpty(configCenters)) { |
| CompositeDynamicConfiguration compositeDynamicConfiguration = new CompositeDynamicConfiguration(); |
| for (ConfigCenterConfig configCenter : configCenters) { |
| // Pass config from ConfigCenterBean to environment |
| environment.updateExternalConfigMap(configCenter.getExternalConfiguration()); |
| environment.updateAppExternalConfigMap(configCenter.getAppExternalConfiguration()); |
| |
| // Fetch config from remote config center |
| compositeDynamicConfiguration.addConfiguration(prepareEnvironment(configCenter)); |
| } |
| environment.setDynamicConfiguration(compositeDynamicConfiguration); |
| } |
| } |
| |
| private void startMetadataCenter() { |
| |
| useRegistryAsMetadataCenterIfNecessary(); |
| |
| ApplicationConfig applicationConfig = getApplication(); |
| |
| String metadataType = applicationConfig.getMetadataType(); |
| // FIXME, multiple metadata config support. |
| Collection<MetadataReportConfig> metadataReportConfigs = configManager.getMetadataConfigs(); |
| if (CollectionUtils.isEmpty(metadataReportConfigs)) { |
| if (REMOTE_METADATA_STORAGE_TYPE.equals(metadataType)) { |
| throw new IllegalStateException("No MetadataConfig found, Metadata Center address is required when 'metadata=remote' is enabled."); |
| } |
| return; |
| } |
| |
| MetadataReportInstance metadataReportInstance = applicationModel.getBeanFactory().getBean(MetadataReportInstance.class); |
| List<MetadataReportConfig> validMetadataReportConfigs = new ArrayList<>(metadataReportConfigs.size()); |
| for (MetadataReportConfig metadataReportConfig : metadataReportConfigs) { |
| ConfigValidationUtils.validateMetadataConfig(metadataReportConfig); |
| validMetadataReportConfigs.add(metadataReportConfig); |
| } |
| metadataReportInstance.init(validMetadataReportConfigs); |
| if (!metadataReportInstance.inited()) { |
| throw new IllegalStateException(String.format("%s MetadataConfigs found, but none of them is valid.", metadataReportConfigs.size())); |
| } |
| } |
| |
| /** |
| * For compatibility purpose, use registry as the default config center when |
| * there's no config center specified explicitly and |
| * useAsConfigCenter of registryConfig is null or true |
| */ |
| private void useRegistryAsConfigCenterIfNecessary() { |
| // we use the loading status of DynamicConfiguration to decide whether ConfigCenter has been initiated. |
| if (environment.getDynamicConfiguration().isPresent()) { |
| return; |
| } |
| |
| if (CollectionUtils.isNotEmpty(configManager.getConfigCenters())) { |
| return; |
| } |
| |
| // load registry |
| configManager.loadConfigsOfTypeFromProps(RegistryConfig.class); |
| |
| List<RegistryConfig> defaultRegistries = configManager.getDefaultRegistries(); |
| if (defaultRegistries.size() > 0) { |
| defaultRegistries |
| .stream() |
| .filter(this::isUsedRegistryAsConfigCenter) |
| .map(this::registryAsConfigCenter) |
| .forEach(configCenter -> { |
| if (configManager.getConfigCenter(configCenter.getId()).isPresent()) { |
| return; |
| } |
| configManager.addConfigCenter(configCenter); |
| logger.info("use registry as config-center: " + configCenter); |
| |
| }); |
| } |
| } |
| |
| private boolean isUsedRegistryAsConfigCenter(RegistryConfig registryConfig) { |
| return isUsedRegistryAsCenter(registryConfig, registryConfig::getUseAsConfigCenter, "config", |
| DynamicConfigurationFactory.class); |
| } |
| |
| private ConfigCenterConfig registryAsConfigCenter(RegistryConfig registryConfig) { |
| String protocol = registryConfig.getProtocol(); |
| Integer port = registryConfig.getPort(); |
| URL url = URL.valueOf(registryConfig.getAddress(), registryConfig.getScopeModel()); |
| String id = "config-center-" + protocol + "-" + url.getHost() + "-" + port; |
| ConfigCenterConfig cc = new ConfigCenterConfig(); |
| cc.setId(id); |
| cc.setScopeModel(applicationModel); |
| if (cc.getParameters() == null) { |
| cc.setParameters(new HashMap<>()); |
| } |
| if (CollectionUtils.isNotEmptyMap(registryConfig.getParameters())) { |
| cc.getParameters().putAll(registryConfig.getParameters()); // copy the parameters |
| } |
| cc.getParameters().put(CLIENT_KEY, registryConfig.getClient()); |
| cc.setProtocol(protocol); |
| cc.setPort(port); |
| if (StringUtils.isNotEmpty(registryConfig.getGroup())) { |
| cc.setGroup(registryConfig.getGroup()); |
| } |
| cc.setAddress(getRegistryCompatibleAddress(registryConfig)); |
| cc.setNamespace(registryConfig.getGroup()); |
| cc.setUsername(registryConfig.getUsername()); |
| cc.setPassword(registryConfig.getPassword()); |
| if (registryConfig.getTimeout() != null) { |
| cc.setTimeout(registryConfig.getTimeout().longValue()); |
| } |
| cc.setHighestPriority(false); |
| return cc; |
| } |
| |
| private void useRegistryAsMetadataCenterIfNecessary() { |
| |
| Collection<MetadataReportConfig> metadataConfigs = configManager.getMetadataConfigs(); |
| |
| if (CollectionUtils.isNotEmpty(metadataConfigs)) { |
| return; |
| } |
| |
| List<RegistryConfig> defaultRegistries = configManager.getDefaultRegistries(); |
| if (defaultRegistries.size() > 0) { |
| defaultRegistries |
| .stream() |
| .filter(this::isUsedRegistryAsMetadataCenter) |
| .map(this::registryAsMetadataCenter) |
| .forEach(metadataReportConfig -> { |
| if (metadataReportConfig.getId() == null) { |
| Collection<MetadataReportConfig> metadataReportConfigs = configManager.getMetadataConfigs(); |
| if (CollectionUtils.isNotEmpty(metadataReportConfigs)) { |
| for (MetadataReportConfig existedConfig : metadataReportConfigs) { |
| if (existedConfig.getId() == null && existedConfig.getAddress().equals(metadataReportConfig.getAddress())) { |
| return; |
| } |
| } |
| } |
| configManager.addMetadataReport(metadataReportConfig); |
| } else { |
| Optional<MetadataReportConfig> configOptional = configManager.getConfig(MetadataReportConfig.class, metadataReportConfig.getId()); |
| if (configOptional.isPresent()) { |
| return; |
| } |
| configManager.addMetadataReport(metadataReportConfig); |
| } |
| logger.info("use registry as metadata-center: " + metadataReportConfig); |
| }); |
| } |
| } |
| |
| private boolean isUsedRegistryAsMetadataCenter(RegistryConfig registryConfig) { |
| return isUsedRegistryAsCenter(registryConfig, registryConfig::getUseAsMetadataCenter, "metadata", |
| MetadataReportFactory.class); |
| } |
| |
| /** |
| * Is used the specified registry as a center infrastructure |
| * |
| * @param registryConfig the {@link RegistryConfig} |
| * @param usedRegistryAsCenter the configured value on |
| * @param centerType the type name of center |
| * @param extensionClass an extension class of a center infrastructure |
| * @return |
| * @since 2.7.8 |
| */ |
| private boolean isUsedRegistryAsCenter(RegistryConfig registryConfig, Supplier<Boolean> usedRegistryAsCenter, |
| String centerType, |
| Class<?> extensionClass) { |
| final boolean supported; |
| |
| Boolean configuredValue = usedRegistryAsCenter.get(); |
| if (configuredValue != null) { // If configured, take its value. |
| supported = configuredValue.booleanValue(); |
| } else { // Or check the extension existence |
| String protocol = registryConfig.getProtocol(); |
| supported = supportsExtension(extensionClass, protocol); |
| if (logger.isInfoEnabled()) { |
| logger.info(format("No value is configured in the registry, the %s extension[name : %s] %s as the %s center" |
| , extensionClass.getSimpleName(), protocol, supported ? "supports" : "does not support", centerType)); |
| } |
| } |
| |
| if (logger.isInfoEnabled()) { |
| logger.info(format("The registry[%s] will be %s as the %s center", registryConfig, |
| supported ? "used" : "not used", centerType)); |
| } |
| return supported; |
| } |
| |
| /** |
| * Supports the extension with the specified class and name |
| * |
| * @param extensionClass the {@link Class} of extension |
| * @param name the name of extension |
| * @return if supports, return <code>true</code>, or <code>false</code> |
| * @since 2.7.8 |
| */ |
| private boolean supportsExtension(Class<?> extensionClass, String name) { |
| if (isNotEmpty(name)) { |
| ExtensionLoader extensionLoader = getExtensionLoader(extensionClass); |
| return extensionLoader.hasExtension(name); |
| } |
| return false; |
| } |
| |
| private MetadataReportConfig registryAsMetadataCenter(RegistryConfig registryConfig) { |
| String protocol = registryConfig.getProtocol(); |
| URL url = URL.valueOf(registryConfig.getAddress(), registryConfig.getScopeModel()); |
| MetadataReportConfig metadataReportConfig = new MetadataReportConfig(); |
| metadataReportConfig.setId(registryConfig.getId()); |
| metadataReportConfig.setScopeModel(applicationModel); |
| if (metadataReportConfig.getParameters() == null) { |
| metadataReportConfig.setParameters(new HashMap<>()); |
| } |
| if (CollectionUtils.isNotEmptyMap(registryConfig.getParameters())) { |
| metadataReportConfig.getParameters().putAll(registryConfig.getParameters()); // copy the parameters |
| } |
| metadataReportConfig.getParameters().put(CLIENT_KEY, registryConfig.getClient()); |
| metadataReportConfig.setGroup(registryConfig.getGroup()); |
| metadataReportConfig.setAddress(getRegistryCompatibleAddress(registryConfig)); |
| metadataReportConfig.setUsername(registryConfig.getUsername()); |
| metadataReportConfig.setPassword(registryConfig.getPassword()); |
| metadataReportConfig.setTimeout(registryConfig.getTimeout()); |
| return metadataReportConfig; |
| } |
| |
| private String getRegistryCompatibleAddress(RegistryConfig registryConfig) { |
| String registryAddress = registryConfig.getAddress(); |
| String[] addresses = REGISTRY_SPLIT_PATTERN.split(registryAddress); |
| if (ArrayUtils.isEmpty(addresses)) { |
| throw new IllegalStateException("Invalid registry address found."); |
| } |
| String address = addresses[0]; |
| // since 2.7.8 |
| // Issue : https://github.com/apache/dubbo/issues/6476 |
| StringBuilder metadataAddressBuilder = new StringBuilder(); |
| URL url = URL.valueOf(address, registryConfig.getScopeModel()); |
| String protocolFromAddress = url.getProtocol(); |
| if (isEmpty(protocolFromAddress)) { |
| // If the protocol from address is missing, is like : |
| // "dubbo.registry.address = 127.0.0.1:2181" |
| String protocolFromConfig = registryConfig.getProtocol(); |
| metadataAddressBuilder.append(protocolFromConfig).append("://"); |
| } |
| metadataAddressBuilder.append(address); |
| return metadataAddressBuilder.toString(); |
| } |
| |
| /** |
| * Start the bootstrap |
| * |
| * @return |
| */ |
| @Override |
| public Future start() { |
| synchronized (startLock) { |
| if (isStopping() || isStopped() || isFailed()) { |
| throw new IllegalStateException(getIdentifier() + " is stopping or stopped, can not start again"); |
| } |
| |
| try { |
| // maybe call start again after add new module, check if any new module |
| boolean hasPendingModule = hasPendingModule(); |
| |
| if (isStarting()) { |
| // currently, is starting, maybe both start by module and application |
| // if it has new modules, start them |
| if (hasPendingModule) { |
| startModules(); |
| } |
| // if it is starting, reuse previous startFuture |
| return startFuture; |
| } |
| |
| // if is started and no new module, just return |
| if (isStarted() && !hasPendingModule) { |
| return CompletableFuture.completedFuture(false); |
| } |
| |
| // pending -> starting : first start app |
| // started -> starting : re-start app |
| onStarting(); |
| |
| initialize(); |
| |
| doStart(); |
| } catch (Throwable e) { |
| onFailed(getIdentifier() + " start failure", e); |
| throw e; |
| } |
| |
| return startFuture; |
| } |
| } |
| |
| private boolean hasPendingModule() { |
| boolean found = false; |
| for (ModuleModel moduleModel : applicationModel.getModuleModels()) { |
| if (moduleModel.getDeployer().isPending()) { |
| found = true; |
| break; |
| } |
| } |
| return found; |
| } |
| |
| @Override |
| public Future getStartFuture() { |
| return startFuture; |
| } |
| |
| private void doStart() { |
| startModules(); |
| |
| // prepare application instance |
| // prepareApplicationInstance(); |
| |
| // Ignore checking new module after start |
| // executorRepository.getSharedExecutor().submit(() -> { |
| // try { |
| // while (isStarting()) { |
| // // notify when any module state changed |
| // synchronized (stateLock) { |
| // try { |
| // stateLock.wait(500); |
| // } catch (InterruptedException e) { |
| // // ignore |
| // } |
| // } |
| // |
| // // if has new module, do start again |
| // if (hasPendingModule()) { |
| // startModules(); |
| // } |
| // } |
| // } catch (Throwable e) { |
| // onFailed(getIdentifier() + " check start occurred an exception", e); |
| // } |
| // }); |
| } |
| |
| private void startModules() { |
| // ensure init and start internal module first |
| prepareInternalModule(); |
| |
| // filter and start pending modules, ignore new module during starting, throw exception of module start |
| for (ModuleModel moduleModel : new ArrayList<>(applicationModel.getModuleModels())) { |
| if (moduleModel.getDeployer().isPending()) { |
| moduleModel.getDeployer().start(); |
| } |
| } |
| } |
| |
| @Override |
| public void prepareApplicationInstance() { |
| if (hasPreparedApplicationInstance.get()) { |
| return; |
| } |
| |
| if (isRegisterConsumerInstance()) { |
| exportMetadataService(); |
| if (hasPreparedApplicationInstance.compareAndSet(false, true)) { |
| // register the local ServiceInstance if required |
| registerServiceInstance(); |
| } |
| } |
| } |
| |
| public void prepareInternalModule() { |
| if(hasPreparedInternalModule){ |
| return; |
| } |
| synchronized (internalModuleLock) { |
| if (hasPreparedInternalModule) { |
| return; |
| } |
| |
| // start internal module |
| ModuleDeployer internalModuleDeployer = applicationModel.getInternalModule().getDeployer(); |
| if (!internalModuleDeployer.isStarted()) { |
| Future future = internalModuleDeployer.start(); |
| // wait for internal module startup |
| try { |
| future.get(5, TimeUnit.SECONDS); |
| } catch (Exception e) { |
| logger.warn("wait for internal module startup failed: " + e.getMessage(), e); |
| } |
| } |
| } |
| } |
| |
| private boolean hasExportedServices() { |
| for (ModuleModel moduleModel : applicationModel.getModuleModels()) { |
| if (CollectionUtils.isNotEmpty(moduleModel.getConfigManager().getServices())) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean isBackground() { |
| for (ModuleModel moduleModel : applicationModel.getModuleModels()) { |
| if (moduleModel.getDeployer().isBackground()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private DynamicConfiguration prepareEnvironment(ConfigCenterConfig configCenter) { |
| if (configCenter.isValid()) { |
| if (!configCenter.checkOrUpdateInitialized(true)) { |
| return null; |
| } |
| |
| DynamicConfiguration dynamicConfiguration; |
| try { |
| dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl()); |
| } catch (Exception e) { |
| if (!configCenter.isCheck()) { |
| logger.warn("The configuration center failed to initialize", e); |
| configCenter.setInitialized(false); |
| return null; |
| } else { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| if (StringUtils.isNotEmpty(configCenter.getConfigFile())) { |
| String configContent = dynamicConfiguration.getProperties(configCenter.getConfigFile(), configCenter.getGroup()); |
| String appGroup = getApplication().getName(); |
| String appConfigContent = null; |
| if (isNotEmpty(appGroup)) { |
| appConfigContent = dynamicConfiguration.getProperties |
| (isNotEmpty(configCenter.getAppConfigFile()) ? configCenter.getAppConfigFile() : configCenter.getConfigFile(), |
| appGroup |
| ); |
| } |
| try { |
| environment.updateExternalConfigMap(parseProperties(configContent)); |
| environment.updateAppExternalConfigMap(parseProperties(appConfigContent)); |
| } catch (IOException e) { |
| throw new IllegalStateException("Failed to parse configurations from Config Center.", e); |
| } |
| } |
| return dynamicConfiguration; |
| } |
| return null; |
| } |
| |
| /** |
| * Get the instance of {@link DynamicConfiguration} by the specified connection {@link URL} of config-center |
| * |
| * @param connectionURL of config-center |
| * @return non-null |
| * @since 2.7.5 |
| */ |
| private DynamicConfiguration getDynamicConfiguration(URL connectionURL) { |
| String protocol = connectionURL.getProtocol(); |
| |
| DynamicConfigurationFactory factory = ConfigurationUtils.getDynamicConfigurationFactory(applicationModel, protocol); |
| return factory.getDynamicConfiguration(connectionURL); |
| } |
| |
| private volatile boolean registered; |
| |
| private void registerServiceInstance() { |
| try { |
| registered = true; |
| ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel); |
| } catch (Exception e) { |
| logger.error(CONFIG_REGISTER_INSTANCE_ERROR, "configuration server disconnected", "", "Register instance error.", e); |
| } |
| if (registered) { |
| // scheduled task for updating Metadata and ServiceInstance |
| asyncMetadataFuture = frameworkExecutorRepository.getSharedScheduledExecutor().scheduleWithFixedDelay(() -> { |
| |
| // ignore refresh metadata on stopping |
| if (applicationModel.isDestroyed()) { |
| return; |
| } |
| try { |
| if (!applicationModel.isDestroyed() && registered) { |
| ServiceInstanceMetadataUtils.refreshMetadataAndInstance(applicationModel); |
| } |
| } catch (Exception e) { |
| if (!applicationModel.isDestroyed()) { |
| logger.error(CONFIG_REFRESH_INSTANCE_ERROR, "", "", "Refresh instance and metadata error.", e); |
| } |
| } |
| }, 0, ConfigurationUtils.get(applicationModel, METADATA_PUBLISH_DELAY_KEY, DEFAULT_METADATA_PUBLISH_DELAY), TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| private void unregisterServiceInstance() { |
| if (registered) { |
| ServiceInstanceMetadataUtils.unregisterMetadataAndInstance(applicationModel); |
| } |
| } |
| |
| @Override |
| public void stop() { |
| applicationModel.destroy(); |
| } |
| |
| @Override |
| public void preDestroy() { |
| synchronized (destroyLock) { |
| if (isStopping() || isStopped()) { |
| return; |
| } |
| onStopping(); |
| |
| unregisterServiceInstance(); |
| destroyRegistries(); |
| destroyMetadataReports(); |
| |
| unRegisterShutdownHook(); |
| if (asyncMetadataFuture != null) { |
| asyncMetadataFuture.cancel(true); |
| } |
| } |
| } |
| |
| @Override |
| public void postDestroy() { |
| synchronized (destroyLock) { |
| // expect application model is destroyed before here |
| if (isStopped()) { |
| return; |
| } |
| try { |
| executeShutdownCallbacks(); |
| |
| // TODO should we close unused protocol server which only used by this application? |
| // protocol server will be closed on all applications of same framework are stopped currently, but no associate to application |
| // see org.apache.dubbo.config.deploy.FrameworkModelCleaner#destroyProtocols |
| // see org.apache.dubbo.config.bootstrap.DubboBootstrapMultiInstanceTest#testMultiProviderApplicationStopOneByOne |
| |
| // destroy all executor services |
| destroyExecutorRepository(); |
| |
| onStopped(); |
| } catch (Throwable ex) { |
| String msg = getIdentifier() + " an error occurred while stopping application: " + ex.getMessage(); |
| onFailed(msg, ex); |
| } |
| } |
| } |
| |
| private void executeShutdownCallbacks() { |
| ShutdownHookCallbacks shutdownHookCallbacks = applicationModel.getBeanFactory().getBean(ShutdownHookCallbacks.class); |
| shutdownHookCallbacks.callback(); |
| } |
| |
| @Override |
| public void notifyModuleChanged(ModuleModel moduleModel, DeployState state) { |
| checkState(moduleModel, state); |
| |
| // notify module state changed or module changed |
| synchronized (stateLock) { |
| stateLock.notifyAll(); |
| } |
| } |
| |
| @Override |
| public void checkState(ModuleModel moduleModel, DeployState moduleState) { |
| synchronized (stateLock) { |
| if (!moduleModel.isInternal() && moduleState == DeployState.STARTED) { |
| prepareApplicationInstance(); |
| } |
| DeployState newState = calculateState(); |
| switch (newState) { |
| case STARTED: |
| onStarted(); |
| break; |
| case STARTING: |
| onStarting(); |
| break; |
| case STOPPING: |
| onStopping(); |
| break; |
| case STOPPED: |
| onStopped(); |
| break; |
| case FAILED: |
| Throwable error = null; |
| ModuleModel errorModule = null; |
| for (ModuleModel module : applicationModel.getModuleModels()) { |
| ModuleDeployer deployer = module.getDeployer(); |
| if (deployer.isFailed() && deployer.getError() != null) { |
| error = deployer.getError(); |
| errorModule = module; |
| break; |
| } |
| } |
| onFailed(getIdentifier() + " found failed module: " + errorModule.getDesc(), error); |
| break; |
| case PENDING: |
| // cannot change to pending from other state |
| // setPending(); |
| break; |
| } |
| } |
| } |
| |
| private DeployState calculateState() { |
| DeployState newState = DeployState.UNKNOWN; |
| int pending = 0, starting = 0, started = 0, stopping = 0, stopped = 0, failed = 0; |
| for (ModuleModel moduleModel : applicationModel.getModuleModels()) { |
| ModuleDeployer deployer = moduleModel.getDeployer(); |
| if (deployer == null) { |
| pending++; |
| } else if (deployer.isPending()) { |
| pending++; |
| } else if (deployer.isStarting()) { |
| starting++; |
| } else if (deployer.isStarted()) { |
| started++; |
| } else if (deployer.isStopping()) { |
| stopping++; |
| } else if (deployer.isStopped()) { |
| stopped++; |
| } else if (deployer.isFailed()) { |
| failed++; |
| } |
| } |
| |
| if (failed > 0) { |
| newState = DeployState.FAILED; |
| } else if (started > 0) { |
| if (pending + starting + stopping + stopped == 0) { |
| // all modules have been started |
| newState = DeployState.STARTED; |
| } else if (pending + starting > 0) { |
| // some module is pending and some is started |
| newState = DeployState.STARTING; |
| } else if (stopping + stopped > 0) { |
| newState = DeployState.STOPPING; |
| } |
| } else if (starting > 0) { |
| // any module is starting |
| newState = DeployState.STARTING; |
| } else if (pending > 0) { |
| if (starting + starting + stopping + stopped == 0) { |
| // all modules have not starting or started |
| newState = DeployState.PENDING; |
| } else if (stopping + stopped > 0) { |
| // some is pending and some is stopping or stopped |
| newState = DeployState.STOPPING; |
| } |
| } else if (stopping > 0) { |
| // some is stopping and some stopped |
| newState = DeployState.STOPPING; |
| } else if (stopped > 0) { |
| // all modules are stopped |
| newState = DeployState.STOPPED; |
| } |
| return newState; |
| } |
| |
| private void exportMetadataService() { |
| if (!isStarting()) { |
| return; |
| } |
| for (DeployListener<ApplicationModel> listener : listeners) { |
| try { |
| if (listener instanceof ApplicationDeployListener) { |
| ((ApplicationDeployListener) listener).onModuleStarted(applicationModel); |
| } |
| } catch (Throwable e) { |
| logger.error(getIdentifier() + " an exception occurred when handle starting event", e); |
| } |
| } |
| } |
| |
| private void onStarting() { |
| // pending -> starting |
| // started -> starting |
| if (!(isPending() || isStarted())) { |
| return; |
| } |
| setStarting(); |
| startFuture = new CompletableFuture(); |
| if (logger.isInfoEnabled()) { |
| logger.info(getIdentifier() + " is starting."); |
| } |
| } |
| |
| private void onStarted() { |
| try { |
| // starting -> started |
| if (!isStarting()) { |
| return; |
| } |
| setStarted(); |
| if (logger.isInfoEnabled()) { |
| logger.info(getIdentifier() + " is ready."); |
| } |
| // refresh metadata |
| try { |
| if (registered) { |
| ServiceInstanceMetadataUtils.refreshMetadataAndInstance(applicationModel); |
| } |
| } catch (Exception e) { |
| logger.error(CONFIG_REFRESH_INSTANCE_ERROR, "", "", "Refresh instance and metadata error.", e); |
| } |
| } finally { |
| // complete future |
| completeStartFuture(true); |
| } |
| } |
| |
| private void completeStartFuture(boolean success) { |
| if (startFuture != null) { |
| startFuture.complete(success); |
| } |
| } |
| |
| private void onStopping() { |
| try { |
| if (isStopping() || isStopped()) { |
| return; |
| } |
| setStopping(); |
| if (logger.isInfoEnabled()) { |
| logger.info(getIdentifier() + " is stopping."); |
| } |
| } finally { |
| completeStartFuture(false); |
| } |
| } |
| |
| private void onStopped() { |
| try { |
| if (isStopped()) { |
| return; |
| } |
| setStopped(); |
| if (logger.isInfoEnabled()) { |
| logger.info(getIdentifier() + " has stopped."); |
| } |
| } finally { |
| completeStartFuture(false); |
| } |
| } |
| |
| private void onFailed(String msg, Throwable ex) { |
| try { |
| setFailed(ex); |
| logger.error(msg, ex); |
| } finally { |
| completeStartFuture(false); |
| } |
| } |
| |
| private void destroyExecutorRepository() { |
| // shutdown export/refer executor |
| executorRepository.shutdownServiceExportExecutor(); |
| executorRepository.shutdownServiceReferExecutor(); |
| getExtensionLoader(ExecutorRepository.class).getDefaultExtension().destroyAll(); |
| } |
| |
| private void destroyRegistries() { |
| RegistryManager.getInstance(applicationModel).destroyAll(); |
| } |
| |
| private void destroyServiceDiscoveries() { |
| RegistryManager.getInstance(applicationModel).getServiceDiscoveries().forEach(serviceDiscovery -> { |
| try { |
| serviceDiscovery.destroy(); |
| } catch (Throwable ignored) { |
| logger.warn(ignored.getMessage(), ignored); |
| } |
| }); |
| if (logger.isDebugEnabled()) { |
| logger.debug(getIdentifier() + "'s all ServiceDiscoveries have been destroyed."); |
| } |
| } |
| |
| private void destroyMetadataReports() { |
| // only destroy MetadataReport of this application |
| List<MetadataReportFactory> metadataReportFactories = getExtensionLoader(MetadataReportFactory.class).getLoadedExtensionInstances(); |
| for (MetadataReportFactory metadataReportFactory : metadataReportFactories) { |
| metadataReportFactory.destroy(); |
| } |
| } |
| |
| private ApplicationConfig getApplication() { |
| return configManager.getApplicationOrElseThrow(); |
| } |
| |
| } |