blob: 7eb6ce48e78d9080f7fa5035cf8c3627a3ec4674 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import groovy.lang.Closure;
import groovy.lang.DelegatesTo;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.List;
import lombok.experimental.Delegate;
import org.apache.commons.lang3.StringUtils;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.TraceContext;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.AbstractSpec;
import org.apache.skywalking.oap.log.analyzer.dsl.spec.extractor.slowsql.SlowSqlSpec;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamilyBuilder;
import org.apache.skywalking.oap.server.analyzer.provider.trace.parser.listener.DatabaseSlowStatementBuilder;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.source.DatabaseSlowStatement;
import org.apache.skywalking.oap.server.core.source.ServiceMeta;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.nonNull;
import static org.apache.skywalking.oap.server.library.util.StringUtil.isNotBlank;
public class ExtractorSpec extends AbstractSpec {
private static final Logger LOGGER = LoggerFactory.getLogger(SlowSqlSpec.class);
private final List<MetricConvert> metricConverts;
private final SlowSqlSpec slowSql;
private final NamingControl namingControl;
private final SourceReceiver sourceReceiver;
private static final DateTimeFormatter DTF = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
public ExtractorSpec(final ModuleManager moduleManager,
final LogAnalyzerModuleConfig moduleConfig) throws ModuleStartException {
super(moduleManager, moduleConfig);
final MeterSystem meterSystem =
moduleManager.find(CoreModule.NAME).provider().getService(MeterSystem.class);
metricConverts = moduleConfig.malConfigs()
.stream()
.map(it -> new MetricConvert(it, meterSystem))
.collect(Collectors.toList());
slowSql = new SlowSqlSpec(moduleManager(), moduleConfig());
namingControl = moduleManager.find(CoreModule.NAME)
.provider()
.getService(NamingControl.class);
sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
}
@SuppressWarnings("unused")
public void service(final String service) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(service)) {
BINDING.get().log().setService(service);
}
}
@SuppressWarnings("unused")
public void instance(final String instance) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(instance)) {
BINDING.get().log().setServiceInstance(instance);
}
}
@SuppressWarnings("unused")
public void endpoint(final String endpoint) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(endpoint)) {
BINDING.get().log().setEndpoint(endpoint);
}
}
@SuppressWarnings("unused")
public void tag(final Map<String, ?> kv) {
if (BINDING.get().shouldAbort()) {
return;
}
if (CollectionUtils.isEmpty(kv)) {
return;
}
final LogData.Builder logData = BINDING.get().log();
logData.setTags(
logData.getTags()
.toBuilder()
.addAllData(
kv.entrySet()
.stream()
.filter(it -> isNotBlank(it.getKey()))
.filter(it -> nonNull(it.getValue()) &&
isNotBlank(Objects.toString(it.getValue())))
.map(it -> {
final Object val = it.getValue();
String valStr = Objects.toString(val);
if (Collection.class.isAssignableFrom(val.getClass())) {
valStr = Joiner.on(",").skipNulls().join((Collection<?>) val);
}
return KeyStringValuePair.newBuilder()
.setKey(it.getKey())
.setValue(valStr)
.build();
})
.collect(Collectors.toList())
)
);
BINDING.get().log(logData);
}
@SuppressWarnings("unused")
public void traceId(final String traceId) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(traceId)) {
final LogData.Builder logData = BINDING.get().log();
final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
traceContext.setTraceId(traceId);
logData.setTraceContext(traceContext);
}
}
@SuppressWarnings("unused")
public void segmentId(final String segmentId) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(segmentId)) {
final LogData.Builder logData = BINDING.get().log();
final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
traceContext.setTraceSegmentId(segmentId);
logData.setTraceContext(traceContext);
}
}
@SuppressWarnings("unused")
public void spanId(final String spanId) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(spanId)) {
final LogData.Builder logData = BINDING.get().log();
final TraceContext.Builder traceContext = logData.getTraceContext().toBuilder();
traceContext.setSpanId(Integer.parseInt(spanId));
logData.setTraceContext(traceContext);
}
}
@SuppressWarnings("unused")
public void timestamp(final String timestamp) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(timestamp) && StringUtils.isNumeric(timestamp)) {
BINDING.get().log().setTimestamp(Long.parseLong(timestamp));
}
}
@SuppressWarnings("unused")
public void layer(final String layer) {
if (BINDING.get().shouldAbort()) {
return;
}
if (nonNull(layer)) {
final LogData.Builder logData = BINDING.get().log();
logData.setLayer(layer);
}
}
@SuppressWarnings("unused")
public void metrics(@DelegatesTo(SampleBuilder.class) final Closure<?> cl) {
if (BINDING.get().shouldAbort()) {
return;
}
final SampleBuilder builder = new SampleBuilder();
cl.setDelegate(builder);
cl.call();
final Sample sample = builder.build();
final SampleFamily sampleFamily = SampleFamilyBuilder.newBuilder(sample).build();
final Optional<List<SampleFamily>> possibleMetricsContainer = BINDING.get().metricsContainer();
if (possibleMetricsContainer.isPresent()) {
possibleMetricsContainer.get().add(sampleFamily);
} else {
metricConverts.forEach(it -> it.toMeter(
ImmutableMap.<String, SampleFamily>builder()
.put(sample.getName(), sampleFamily)
.build()
));
}
}
@SuppressWarnings("unused")
public void slowSql(@DelegatesTo(SlowSqlSpec.class) final Closure<?> cl) {
if (BINDING.get().shouldAbort()) {
return;
}
LogData.Builder log = BINDING.get().log();
if (log.getLayer() == null
|| log.getService() == null
|| log.getTimestamp() < 1) {
LOGGER.warn("SlowSql extracts failed, maybe something is not configured.");
return;
}
DatabaseSlowStatementBuilder builder = new DatabaseSlowStatementBuilder(namingControl);
builder.setLayer(Layer.nameOf(log.getLayer()));
LocalDateTime localDateTime = Instant.ofEpochSecond(log.getTimestamp()).atZone(ZoneId.systemDefault()).toLocalDateTime();
String timeBucket = DTF.format(localDateTime);
builder.setTimeBucket(Long.parseLong(timeBucket));
builder.setServiceName(log.getService());
ServiceMeta serviceMeta = new ServiceMeta();
serviceMeta.setName(namingControl.formatServiceName(log.getService()));
serviceMeta.setLayer(builder.getLayer());
serviceMeta.setTimeBucket(builder.getTimeBucket());
BINDING.get().databaseSlowStatement(builder);
cl.setDelegate(slowSql);
cl.call();
if (builder.getId() == null
|| builder.getLatency() < 1
|| builder.getStatement() == null) {
LOGGER.warn("SlowSql extracts failed, maybe something is not configured.");
return;
}
String entityId = serviceMeta.getEntityId();
builder.prepare();
DatabaseSlowStatement databaseSlowStatement = builder.toDatabaseSlowStatement();
databaseSlowStatement.setDatabaseServiceId(entityId);
sourceReceiver.receive(databaseSlowStatement);
sourceReceiver.receive(serviceMeta);
}
public static class SampleBuilder {
@Delegate
private final Sample.SampleBuilder sampleBuilder = Sample.builder();
@SuppressWarnings("unused")
public Sample.SampleBuilder labels(final Map<String, ?> labels) {
final Map<String, String> filtered =
labels.entrySet()
.stream()
.filter(it -> isNotBlank(it.getKey()) && nonNull(it.getValue()))
.collect(
Collectors.toMap(Map.Entry::getKey,
it -> Objects.toString(it.getValue()))
);
return sampleBuilder.labels(ImmutableMap.copyOf(filtered));
}
}
}