blob: 3abbca27799bbfe9e0886e2d0c7e40f00d70a369 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.skywalking.oap.server.ai.pipeline.AIPipelineModule;
import org.apache.skywalking.oap.server.ai.pipeline.services.api.HttpUriRecognition;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.core.analysis.ApdexThresholdConfig;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.metrics.ApdexMetrics;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.OAPNodeChecker;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.command.CommandService;
import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
import org.apache.skywalking.oap.server.core.config.HierarchyDefinitionService;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
import org.apache.skywalking.oap.server.core.config.group.EndpointNameGroupingRuleWatcher;
import org.apache.skywalking.oap.server.core.config.group.openapi.EndpointNameGroupingRule4OpenapiWatcher;
import org.apache.skywalking.oap.server.core.hierarchy.HierarchyService;
import org.apache.skywalking.oap.server.core.logging.LoggingConfigWatcher;
import org.apache.skywalking.oap.server.core.management.ui.menu.UIMenuInitializer;
import org.apache.skywalking.oap.server.core.management.ui.menu.UIMenuManagementService;
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplateInitializer;
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplateManagementService;
import org.apache.skywalking.oap.server.core.oal.rt.DisableOALDefine;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
import org.apache.skywalking.oap.server.core.profiling.continuous.ContinuousProfilingMutationService;
import org.apache.skywalking.oap.server.core.profiling.continuous.ContinuousProfilingQueryService;
import org.apache.skywalking.oap.server.core.profiling.ebpf.EBPFProfilingMutationService;
import org.apache.skywalking.oap.server.core.profiling.ebpf.EBPFProfilingQueryService;
import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskMutationService;
import org.apache.skywalking.oap.server.core.profiling.trace.ProfileTaskQueryService;
import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
import org.apache.skywalking.oap.server.core.query.BrowserLogQueryService;
import org.apache.skywalking.oap.server.core.query.EventQueryService;
import org.apache.skywalking.oap.server.core.query.HierarchyQueryService;
import org.apache.skywalking.oap.server.core.query.LogQueryService;
import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricsMetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricsQueryService;
import org.apache.skywalking.oap.server.core.query.RecordQueryService;
import org.apache.skywalking.oap.server.core.query.TagAutoCompleteQueryService;
import org.apache.skywalking.oap.server.core.query.TopNRecordsQueryService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.status.ServerStatusService;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.IModelManager;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.model.ModelManipulator;
import org.apache.skywalking.oap.server.core.storage.model.StorageModels;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.core.worker.WorkerInstancesService;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.http.HTTPServer;
import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
/**
* Core module provider includes the recommended and default implementations of {@link CoreModule#services()}. All
* services with these default implementations are widely used including data receiver, data analysis, streaming
* process, storage and query.
* <p>
* NOTICE: In our experience, no one should re-implement the core module service implementations, unless they are very
* familiar with all mechanisms of SkyWalking.
*/
public class CoreModuleProvider extends ModuleProvider {
private CoreModuleConfig moduleConfig;
private GRPCServer grpcServer;
private HTTPServer httpServer;
private RemoteClientManager remoteClientManager;
private final AnnotationScan annotationScan;
private final StorageModels storageModels;
private final SourceReceiverImpl receiver;
private ApdexThresholdConfig apdexThresholdConfig;
private EndpointNameGroupingRuleWatcher endpointNameGroupingRuleWatcher;
private OALEngineLoaderService oalEngineLoaderService;
private LoggingConfigWatcher loggingConfigWatcher;
private EndpointNameGroupingRule4OpenapiWatcher endpointNameGroupingRule4OpenapiWatcher;
private EndpointNameGrouping endpointNameGrouping;
private HierarchyService hierarchyService;
public CoreModuleProvider() {
super();
this.annotationScan = new AnnotationScan();
this.storageModels = new StorageModels();
this.receiver = new SourceReceiverImpl();
}
@Override
public String name() {
return "default";
}
@Override
public Class<? extends ModuleDefine> module() {
return CoreModule.class;
}
@Override
public ConfigCreator newConfigCreator() {
return new ConfigCreator<CoreModuleConfig>() {
@Override
public Class type() {
return CoreModuleConfig.class;
}
@Override
public void onInitialized(final CoreModuleConfig initialized) {
moduleConfig = initialized;
}
};
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
if (moduleConfig.isActiveExtraModelColumns()) {
DefaultScopeDefine.activeExtraModelColumns();
}
endpointNameGrouping = new EndpointNameGrouping();
final NamingControl namingControl = new NamingControl(
moduleConfig.getServiceNameMaxLength(),
moduleConfig.getInstanceNameMaxLength(),
moduleConfig.getEndpointNameMaxLength(),
endpointNameGrouping
);
this.registerServiceImplementation(NamingControl.class, namingControl);
MeterEntity.setNamingControl(namingControl);
try {
endpointNameGroupingRuleWatcher = new EndpointNameGroupingRuleWatcher(
this, endpointNameGrouping);
if (moduleConfig.isEnableEndpointNameGroupingByOpenapi()) {
endpointNameGroupingRule4OpenapiWatcher = new EndpointNameGroupingRule4OpenapiWatcher(
this, endpointNameGrouping);
}
} catch (FileNotFoundException e) {
throw new ModuleStartException(e.getMessage(), e);
}
AnnotationScan scopeScan = new AnnotationScan();
scopeScan.registerListener(new DefaultScopeDefine.Listener());
try {
scopeScan.scan();
} catch (Exception e) {
throw new ModuleStartException(e.getMessage(), e);
}
this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager()));
AnnotationScan oalDisable = new AnnotationScan();
oalDisable.registerListener(DisableRegister.INSTANCE);
oalDisable.registerListener(new DisableRegister.SingleDisableScanListener());
try {
oalDisable.scan();
} catch (IOException | StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
if (moduleConfig.isGRPCSslEnabled()) {
grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(),
moduleConfig.getGRPCSslCertChainPath(),
moduleConfig.getGRPCSslKeyPath(),
null
);
} else {
grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
}
setBootingParameter("oap.internal.comm.host", moduleConfig.getGRPCHost());
setBootingParameter("oap.internal.comm.port", moduleConfig.getGRPCPort());
setBootingParameter("oap.external.grpc.host", moduleConfig.getGRPCHost());
setBootingParameter("oap.external.grpc.port", moduleConfig.getGRPCPort());
if (moduleConfig.getMaxConcurrentCallsPerConnection() > 0) {
grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getMaxConcurrentCallsPerConnection());
}
if (moduleConfig.getMaxMessageSize() > 0) {
grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize());
}
if (moduleConfig.getGRPCThreadPoolSize() > 0) {
grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
}
grpcServer.initialize();
HTTPServerConfig httpServerConfig = HTTPServerConfig.builder()
.host(moduleConfig.getRestHost())
.port(moduleConfig.getRestPort())
.contextPath(moduleConfig.getRestContextPath())
.idleTimeOut(moduleConfig.getRestIdleTimeOut())
.maxThreads(moduleConfig.getRestMaxThreads())
.acceptQueueSize(
moduleConfig.getRestAcceptQueueSize())
.maxRequestHeaderSize(
moduleConfig.getHttpMaxRequestHeaderSize())
.build();
setBootingParameter("oap.external.http.host", moduleConfig.getRestHost());
setBootingParameter("oap.external.http.port", moduleConfig.getRestPort());
httpServer = new HTTPServer(httpServerConfig);
httpServer.initialize();
this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig, this));
this.registerServiceImplementation(ServerStatusService.class, new ServerStatusService(getManager(), moduleConfig));
this.registerServiceImplementation(HierarchyDefinitionService.class, new HierarchyDefinitionService(moduleConfig));
hierarchyService = new HierarchyService(getManager(), moduleConfig);
this.registerServiceImplementation(HierarchyService.class, hierarchyService);
this.registerServiceImplementation(
DownSamplingConfigService.class, new DownSamplingConfigService(moduleConfig.getDownsampling()));
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
this.registerServiceImplementation(HTTPHandlerRegister.class, new HTTPHandlerRegisterImpl(httpServer));
this.registerServiceImplementation(
IComponentLibraryCatalogService.class,
ComponentLibraryCatalogUtil.hold(new ComponentLibraryCatalogService())
);
this.registerServiceImplementation(SourceReceiver.class, receiver);
WorkerInstancesService instancesService = new WorkerInstancesService();
this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(ModelCreator.class, storageModels);
this.registerServiceImplementation(IModelManager.class, storageModels);
this.registerServiceImplementation(ModelManipulator.class, storageModels);
this.registerServiceImplementation(
NetworkAddressAliasCache.class, new NetworkAddressAliasCache(moduleConfig));
this.registerServiceImplementation(
TopologyQueryService.class, new TopologyQueryService(getManager(), storageModels));
this.registerServiceImplementation(MetricsMetadataQueryService.class, new MetricsMetadataQueryService());
this.registerServiceImplementation(MetricsQueryService.class, new MetricsQueryService(getManager()));
this.registerServiceImplementation(TraceQueryService.class, new TraceQueryService(getManager()));
this.registerServiceImplementation(BrowserLogQueryService.class, new BrowserLogQueryService(getManager()));
this.registerServiceImplementation(LogQueryService.class, new LogQueryService(getManager()));
this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager(), moduleConfig));
this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager()));
this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
this.registerServiceImplementation(EventQueryService.class, new EventQueryService(getManager()));
this.registerServiceImplementation(
TagAutoCompleteQueryService.class, new TagAutoCompleteQueryService(getManager(), moduleConfig));
this.registerServiceImplementation(RecordQueryService.class, new RecordQueryService(getManager()));
this.registerServiceImplementation(HierarchyQueryService.class, new HierarchyQueryService(getManager(), moduleConfig));
// add profile service implementations
this.registerServiceImplementation(
ProfileTaskMutationService.class, new ProfileTaskMutationService(getManager()));
this.registerServiceImplementation(
ProfileTaskQueryService.class, new ProfileTaskQueryService(getManager(), moduleConfig));
this.registerServiceImplementation(ProfileTaskCache.class, new ProfileTaskCache(getManager(), moduleConfig));
this.registerServiceImplementation(
EBPFProfilingMutationService.class, new EBPFProfilingMutationService(getManager()));
this.registerServiceImplementation(
EBPFProfilingQueryService.class,
new EBPFProfilingQueryService(getManager(), moduleConfig, this.storageModels)
);
this.registerServiceImplementation(
ContinuousProfilingMutationService.class, new ContinuousProfilingMutationService(getManager()));
this.registerServiceImplementation(
ContinuousProfilingQueryService.class, new ContinuousProfilingQueryService(getManager()));
this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));
// add oal engine loader service implementations
oalEngineLoaderService = new OALEngineLoaderService(getManager());
this.registerServiceImplementation(OALEngineLoaderService.class, oalEngineLoaderService);
annotationScan.registerListener(new StreamAnnotationListener(getManager()));
if (moduleConfig.isGRPCSslEnabled()) {
this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout(),
moduleConfig.getGRPCSslTrustedCAPath()
);
} else {
this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout());
}
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
// Management
this.registerServiceImplementation(
UITemplateManagementService.class, new UITemplateManagementService(getManager()));
this.registerServiceImplementation(
UIMenuManagementService.class, new UIMenuManagementService(getManager(), moduleConfig));
if (moduleConfig.getMetricsDataTTL() < 2) {
throw new ModuleStartException(
"Metric TTL should be at least 2 days, current value is " + moduleConfig.getMetricsDataTTL());
}
setBootingParameter("TTL.metrics", moduleConfig.getMetricsDataTTL());
if (moduleConfig.getRecordDataTTL() < 2) {
throw new ModuleStartException(
"Record TTL should be at least 2 days, current value is " + moduleConfig.getRecordDataTTL());
}
setBootingParameter("TTL.record", moduleConfig.getRecordDataTTL());
final MetricsStreamProcessor metricsStreamProcessor = MetricsStreamProcessor.getInstance();
metricsStreamProcessor.setL1FlushPeriod(moduleConfig.getL1FlushPeriod());
metricsStreamProcessor.setStorageSessionTimeout(moduleConfig.getStorageSessionTimeout());
metricsStreamProcessor.setMetricsDataTTL(moduleConfig.getMetricsDataTTL());
TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
apdexThresholdConfig = new ApdexThresholdConfig(this);
ApdexMetrics.setDICT(apdexThresholdConfig);
loggingConfigWatcher = new LoggingConfigWatcher(this);
}
@Override
public void start() throws ModuleStartException {
grpcServer.addHandler(new RemoteServiceHandler(getManager()));
grpcServer.addHandler(new HealthCheckServiceHandler());
endpointNameGrouping.startHttpUriRecognitionSvr(
getManager()
.find(AIPipelineModule.NAME)
.provider()
.getService(HttpUriRecognition.class),
getService(MetadataQueryService.class),
moduleConfig.getSyncPeriodHttpUriRecognitionPattern(),
moduleConfig.getTrainingPeriodHttpUriRecognitionPattern(),
moduleConfig.getMaxHttpUrisNumberPerService()
);
// Disable OAL script has higher priority
oalEngineLoaderService.load(DisableOALDefine.INSTANCE);
try {
receiver.scan();
annotationScan.scan();
} catch (IOException | IllegalAccessException | InstantiationException | StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
Address gRPCServerInstanceAddress = new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
TelemetryRelatedContext.INSTANCE.setId(gRPCServerInstanceAddress.toString());
ClusterCoordinator coordinator = this.getManager()
.find(ClusterModule.NAME)
.provider()
.getService(ClusterCoordinator.class);
coordinator.registerWatcher(remoteClientManager);
coordinator.start();
if (CoreModuleConfig.Role.Mixed.name()
.equalsIgnoreCase(
moduleConfig.getRole())
|| CoreModuleConfig.Role.Aggregator.name()
.equalsIgnoreCase(
moduleConfig.getRole())) {
RemoteInstance gRPCServerInstance = new RemoteInstance(gRPCServerInstanceAddress);
coordinator.registerRemote(gRPCServerInstance);
}
OAPNodeChecker.setROLE(CoreModuleConfig.Role.fromName(moduleConfig.getRole()));
DynamicConfigurationService dynamicConfigurationService = getManager().find(ConfigurationModule.NAME)
.provider()
.getService(
DynamicConfigurationService.class);
dynamicConfigurationService.registerConfigChangeWatcher(apdexThresholdConfig);
dynamicConfigurationService.registerConfigChangeWatcher(endpointNameGroupingRuleWatcher);
dynamicConfigurationService.registerConfigChangeWatcher(loggingConfigWatcher);
if (moduleConfig.isEnableEndpointNameGroupingByOpenapi()) {
dynamicConfigurationService.registerConfigChangeWatcher(endpointNameGroupingRule4OpenapiWatcher);
}
dynamicConfigurationService.registerConfigChangeWatcher(moduleConfig.getSearchableTracesTagsWatcher());
}
@Override
public void notifyAfterCompleted() throws ModuleStartException {
try {
if (!RunningMode.isInitMode()) {
grpcServer.start();
httpServer.start();
remoteClientManager.start();
}
} catch (ServerException e) {
throw new ModuleStartException(e.getMessage(), e);
}
PersistenceTimer.INSTANCE.start(getManager(), moduleConfig);
if (moduleConfig.isEnableDataKeeperExecutor()) {
DataTTLKeeperTimer.INSTANCE.start(getManager(), moduleConfig);
}
CacheUpdateTimer.INSTANCE.start(getManager(), moduleConfig.getMetricsDataTTL());
try {
new UITemplateInitializer(getManager()).initAll();
} catch (IOException e) {
throw new ModuleStartException(e.getMessage(), e);
}
try {
new UIMenuInitializer(getManager()).init();
} catch (IOException e) {
throw new ModuleStartException(e.getMessage(), e);
}
hierarchyService.startAutoMatchingServiceHierarchy();
}
@Override
public String[] requiredModules() {
return new String[] {
TelemetryModule.NAME,
ConfigurationModule.NAME,
AIPipelineModule.NAME
};
}
}