blob: a521928dad228585c34e7ffd673ea7dec7159bac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
import java.io.ByteArrayInputStream;
import java.util.Locale;
import java.util.Properties;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.management.ui.menu.UIMenu;
import org.apache.skywalking.oap.server.core.management.ui.template.UITemplate;
import org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingPolicy;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO;
import org.apache.skywalking.oap.server.core.storage.management.UIMenuManagementDAO;
import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO;
import org.apache.skywalking.oap.server.core.storage.model.ModelCreator;
import org.apache.skywalking.oap.server.core.storage.profiling.continuous.IContinuousProfilingPolicyDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingScheduleDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IServiceLabelDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IBrowserLogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IHierarchyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IZipkinQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.util.MultipleFilesChangeMonitor;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsInstaller;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.NetworkAddressAliasEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AggregationQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AlarmQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.BrowserLogQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ContinuousProfilingPolicyEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.EBPFProfilingDataEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.EBPFProfilingScheduleEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.EBPFProfilingTaskEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ESEventQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.HierarchyQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.LogQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetadataQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetricsQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskLogEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileThreadSnapshotQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.RecordsQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ServiceLabelEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.SpanAttachedEventEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.UIMenuManagementEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.UITemplateManagementEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.zipkin.ZipkinQueryEsDAO;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
/**
 * The storage provider for ElasticSearch 6/7 and OpenSearch 1.x.
 *
 * <p>{@link #prepare()} normalizes the configuration, builds the {@link ElasticSearchClient} and registers
 * the ElasticSearch-backed DAO implementations. {@link #start()} connects the client and starts the
 * model installer.
 */
@Slf4j
public class StorageModuleElasticsearchProvider extends ModuleProvider {
    /** Module configuration; assigned by the creator returned from {@link #newConfigCreator()}. */
    protected StorageModuleElasticsearchConfig config;
    /** Client handed to every DAO registered in {@link #prepare()}; {@code null} before that point. */
    protected ElasticSearchClient elasticSearchClient;
    /** Model installer created in {@link #prepare()} and started in {@link #start()}. */
    protected StorageEsInstaller modelInstaller;

    /**
     * @return the provider name, {@code "elasticsearch"}.
     */
    @Override
    public String name() {
        return "elasticsearch";
    }

    /**
     * @return the module this provider implements, {@link StorageModule}.
     */
    @Override
    public Class<? extends ModuleDefine> module() {
        return StorageModule.class;
    }

    /**
     * Creates the callback object that declares the configuration type of this provider and stores the
     * initialized configuration instance into {@link #config}.
     *
     * @return a creator bound to {@link StorageModuleElasticsearchConfig}.
     */
    @Override
    public ConfigCreator newConfigCreator() {
        return new ConfigCreator<StorageModuleElasticsearchConfig>() {
            @Override
            public Class<StorageModuleElasticsearchConfig> type() {
                return StorageModuleElasticsearchConfig.class;
            }

            @Override
            public void onInitialized(final StorageModuleElasticsearchConfig initialized) {
                config = initialized;
            }
        };
    }

    /**
     * Normalizes the configuration, optionally starts watching the secrets file, builds the client and the
     * model installer, and registers every storage service implementation.
     *
     * @throws ServiceNotProvidedException propagated from the service registration calls.
     */
    @Override
    public void prepare() throws ServiceNotProvidedException {
        this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());

        normalizeNamespace();
        applyDayStepSettings();
        // Must run before the client is built: the callback may overwrite user/password/trustStorePass.
        watchSecretsFileIfConfigured();

        elasticSearchClient = new ElasticSearchClient(
            config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(),
            config.getTrustStorePass(), config.getUser(), config.getPassword(),
            indexNameConverter(config.getNamespace()), config.getConnectTimeout(),
            config.getSocketTimeout(), config.getResponseTimeout(),
            config.getNumHttpClientThread()
        );
        modelInstaller = new StorageEsInstaller(elasticSearchClient, getManager(), config);

        registerStorageServices();
    }

    /**
     * Defaults an empty namespace to {@code "sw"}, otherwise lower-cases the configured one.
     */
    private void normalizeNamespace() {
        if (StringUtil.isEmpty(config.getNamespace())) {
            config.setNamespace("sw");
        } else {
            // Locale.ROOT keeps the result independent of the JVM default locale; with e.g. a Turkish
            // default locale a plain toLowerCase() maps 'I' to the dotless 'ı', changing the index prefix.
            config.setNamespace(config.getNamespace().toLowerCase(Locale.ROOT));
        }
    }

    /**
     * Pushes the configured day steps into {@link TimeSeriesUtils}.
     *
     * <p>A day step greater than 1 is applied to both the regular and the super dataset step; a positive
     * super dataset day step then overrides the super dataset value.
     */
    private void applyDayStepSettings() {
        if (config.getDayStep() > 1) {
            TimeSeriesUtils.setDAY_STEP(config.getDayStep());
            TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getDayStep());
        }
        if (config.getSuperDatasetDayStep() > 0) {
            TimeSeriesUtils.setSUPER_DATASET_DAY_STEP(config.getSuperDatasetDayStep());
        }
    }

    /**
     * Starts a file monitor on the secrets management file and the trust store path when a secrets
     * management file is configured; does nothing otherwise.
     *
     * <p>On every notification the first monitored content (the secrets file) is parsed as
     * {@link Properties} and its {@code user}, {@code password} and {@code trustStorePass} entries are
     * copied into {@link #config}. If the client already exists, the new values are also pushed to it and
     * it is connected again.
     */
    private void watchSecretsFileIfConfigured() {
        if (StringUtil.isEmpty(config.getSecretsManagementFile())) {
            return;
        }
        // NOTE(review): the leading 10 is presumably the check period in seconds -- confirm against
        // MultipleFilesChangeMonitor.
        MultipleFilesChangeMonitor monitor = new MultipleFilesChangeMonitor(
            10, readableContents -> {
                final byte[] secretsFileContent = readableContents.get(0);
                if (secretsFileContent == null) {
                    return;
                }
                final Properties secrets = new Properties();
                secrets.load(new ByteArrayInputStream(secretsFileContent));
                config.setUser(secrets.getProperty("user"));
                config.setPassword(secrets.getProperty("password"));
                config.setTrustStorePass(secrets.getProperty("trustStorePass"));

                // During startup the client does not exist yet, so updating the config is enough.
                if (elasticSearchClient != null) {
                    // The client has connected, updates the config and connects again.
                    elasticSearchClient.setUser(config.getUser());
                    elasticSearchClient.setPassword(config.getPassword());
                    elasticSearchClient.setTrustStorePass(config.getTrustStorePass());
                    elasticSearchClient.connect();
                }
            }, config.getSecretsManagementFile(), config.getTrustStorePath());
        /*
         * Relies on the monitor's synchronous update check at startup (per the original note), so the
         * secrets are expected to be in the config before the client is constructed.
         */
        monitor.start();
    }

    /**
     * Registers the ElasticSearch implementation of every storage DAO service and applies the index
     * sharding/routing switches to {@link IndexController}. Statement order matches the original code.
     *
     * @throws ServiceNotProvidedException propagated from the service registration calls.
     */
    private void registerStorageServices() throws ServiceNotProvidedException {
        this.registerServiceImplementation(
            IBatchDAO.class,
            new BatchProcessEsDAO(
                elasticSearchClient, config.getBulkActions(), config.getFlushInterval(),
                config.getConcurrentRequests(), config.getBatchOfBytes()
            )
        );
        this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
        this.registerServiceImplementation(
            IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient));
        this.registerServiceImplementation(
            INetworkAddressAliasDAO.class, new NetworkAddressAliasEsDAO(elasticSearchClient, config));

        this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
        this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEsDAO(elasticSearchClient));
        this.registerServiceImplementation(
            ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient, config.getSegmentQueryMaxSize()));
        this.registerServiceImplementation(
            IBrowserLogQueryDAO.class, new BrowserLogQueryEsDAO(elasticSearchClient));
        this.registerServiceImplementation(
            IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient, config));
        this.registerServiceImplementation(
            IAggregationQueryDAO.class, new AggregationQueryEsDAO(elasticSearchClient));
        this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEsDAO(elasticSearchClient));
        this.registerServiceImplementation(IRecordsQueryDAO.class, new RecordsQueryEsDAO(elasticSearchClient));
        this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEsDAO(elasticSearchClient));

        this.registerServiceImplementation(
            IProfileTaskQueryDAO.class,
            new ProfileTaskQueryEsDAO(elasticSearchClient, config.getProfileTaskQueryMaxSize())
        );
        this.registerServiceImplementation(
            IProfileTaskLogQueryDAO.class,
            new ProfileTaskLogEsDAO(elasticSearchClient, config.getProfileTaskQueryMaxSize())
        );
        this.registerServiceImplementation(
            IProfileThreadSnapshotQueryDAO.class,
            new ProfileThreadSnapshotQueryEsDAO(elasticSearchClient, config.getProfileTaskQueryMaxSize())
        );

        this.registerServiceImplementation(
            UITemplateManagementDAO.class,
            new UITemplateManagementEsDAO(elasticSearchClient, new UITemplate.Builder())
        );
        this.registerServiceImplementation(
            UIMenuManagementDAO.class,
            new UIMenuManagementEsDAO(elasticSearchClient, new UIMenu.Builder())
        );
        this.registerServiceImplementation(IEventQueryDAO.class, new ESEventQueryDAO(elasticSearchClient));

        this.registerServiceImplementation(
            IEBPFProfilingTaskDAO.class,
            new EBPFProfilingTaskEsDAO(elasticSearchClient, config)
        );
        this.registerServiceImplementation(
            IEBPFProfilingScheduleDAO.class,
            new EBPFProfilingScheduleEsDAO(elasticSearchClient, config)
        );
        this.registerServiceImplementation(
            IEBPFProfilingDataDAO.class,
            new EBPFProfilingDataEsDAO(elasticSearchClient, config)
        );
        this.registerServiceImplementation(
            IContinuousProfilingPolicyDAO.class,
            new ContinuousProfilingPolicyEsDAO(elasticSearchClient, new ContinuousProfilingPolicy.Builder())
        );
        this.registerServiceImplementation(
            IServiceLabelDAO.class,
            new ServiceLabelEsDAO(elasticSearchClient, config)
        );

        this.registerServiceImplementation(
            ITagAutoCompleteQueryDAO.class, new TagAutoCompleteQueryDAO(elasticSearchClient));
        this.registerServiceImplementation(
            IZipkinQueryDAO.class, new ZipkinQueryEsDAO(elasticSearchClient));
        this.registerServiceImplementation(
            ISpanAttachedEventQueryDAO.class, new SpanAttachedEventEsDAO(elasticSearchClient, config));

        IndexController.INSTANCE.setLogicSharding(config.isLogicSharding());
        IndexController.INSTANCE.setEnableCustomRouting(config.isEnableCustomRouting());
        this.registerServiceImplementation(
            IHierarchyQueryDAO.class, new HierarchyQueryEsDAO(elasticSearchClient, config));
    }

    /**
     * Registers a health-check gauge with the client, connects it, configures and starts the model
     * installer, and subscribes the installer as a model listener.
     *
     * @throws ModuleStartException wrapping any exception raised while connecting or installing.
     */
    @Override
    public void start() throws ModuleStartException {
        MetricsCreator metricCreator = getManager().find(TelemetryModule.NAME)
                                                   .provider()
                                                   .getService(MetricsCreator.class);
        HealthCheckMetrics healthChecker = metricCreator.createHealthCheckerGauge(
            "storage_elasticsearch", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        elasticSearchClient.registerChecker(healthChecker);
        try {
            elasticSearchClient.connect();

            final ConfigService service = getManager().find(CoreModule.NAME)
                                                      .provider()
                                                      .getService(ConfigService.class);
            // Add 5s to make sure OAP has at least done persistent once.
            // By default, the persistent period is 25 seconds and ElasticSearch refreshes in every 30 seconds.
            modelInstaller.setIndexRefreshInterval(service.getPersistentPeriod() + 5);
            modelInstaller.start();

            getManager().find(CoreModule.NAME)
                        .provider()
                        .getService(ModelCreator.class)
                        .addModelListener(modelInstaller);
        } catch (Exception e) {
            // Keep the original exception as the cause so the root failure stays visible.
            throw new ModuleStartException(e.getMessage(), e);
        }
    }

    /**
     * No post-boot work is required by this provider.
     */
    @Override
    public void notifyAfterCompleted() {
    }

    /**
     * @return the names of the modules this provider declares as required: the core module only.
     */
    @Override
    public String[] requiredModules() {
        return new String[] {CoreModule.NAME};
    }

    /**
     * Builds the function that maps a logical index name to the physical one.
     *
     * @param namespace prefix to prepend; when empty per {@link StringUtil#isNotEmpty(String)} the index
     *                  name is returned unchanged.
     * @return a converter yielding {@code namespace + "_" + indexName}, or {@code indexName} as is.
     */
    public static Function<String, String> indexNameConverter(String namespace) {
        return indexName -> StringUtil.isNotEmpty(namespace) ? namespace + "_" + indexName : indexName;
    }
}