blob: 8a866ef688049ada3b9f5afbeddc3e1555683b35 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.log.analyzer.provider.log.listener;
import com.google.protobuf.Message;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
import org.apache.skywalking.apm.network.logging.v3.TraceContext;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
import org.apache.skywalking.oap.server.core.source.TagAutocomplete;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.log.analyzer.provider.LogAnalyzerModuleConfig;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.query.type.ContentType;
import org.apache.skywalking.oap.server.core.source.Log;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import static org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
/**
* RecordSinkListener forwards the log data to the persistence layer with the query required conditions.
*/
@RequiredArgsConstructor
public class RecordSinkListener implements LogSinkListener {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
private final List<String> searchableTagKeys;
@Getter
private final Log log = new Log();
@Override
public void build() {
sourceReceiver.receive(log);
addAutocompleteTags();
}
@Override
@SneakyThrows
public LogSinkListener parse(final LogData.Builder logData,
final Message extraLog) {
LogDataBody body = logData.getBody();
log.setUniqueId(UUID.randomUUID().toString().replace("-", ""));
// timestamp
log.setTimestamp(logData.getTimestamp());
log.setTimeBucket(TimeBucket.getRecordTimeBucket(logData.getTimestamp()));
// service
String serviceName = namingControl.formatServiceName(logData.getService());
String serviceId = IDManager.ServiceID.buildId(serviceName, true);
log.setServiceId(serviceId);
// service instance
if (StringUtil.isNotEmpty(logData.getServiceInstance())) {
log.setServiceInstanceId(IDManager.ServiceInstanceID.buildId(
serviceId,
namingControl.formatInstanceName(logData.getServiceInstance())
));
}
// endpoint
if (StringUtil.isNotEmpty(logData.getEndpoint())) {
String endpointName = namingControl.formatEndpointName(serviceName, logData.getEndpoint());
log.setEndpointId(IDManager.EndpointID.buildId(serviceId, endpointName));
}
// trace
TraceContext traceContext = logData.getTraceContext();
if (StringUtil.isNotEmpty(traceContext.getTraceId())) {
log.setTraceId(traceContext.getTraceId());
}
if (StringUtil.isNotEmpty(traceContext.getTraceSegmentId())) {
log.setTraceSegmentId(traceContext.getTraceSegmentId());
log.setSpanId(traceContext.getSpanId());
}
// content
if (body.hasText()) {
log.setContentType(ContentType.TEXT);
log.setContent(body.getText().getText());
} else if (body.hasYaml()) {
log.setContentType(ContentType.YAML);
log.setContent(body.getYaml().getYaml());
} else if (body.hasJson()) {
log.setContentType(ContentType.JSON);
log.setContent(body.getJson().getJson());
} else if (extraLog != null) {
log.setContentType(ContentType.JSON);
log.setContent(toJSON(extraLog));
}
if (logData.getTags().getDataCount() > 0) {
log.setTagsRawData(logData.getTags().toByteArray());
}
log.getTags().addAll(appendSearchableTags(logData));
return this;
}
private Collection<Tag> appendSearchableTags(LogData.Builder logData) {
HashSet<Tag> logTags = new HashSet<>();
logData.getTags().getDataList().forEach(tag -> {
if (searchableTagKeys.contains(tag.getKey())) {
final Tag logTag = new Tag(tag.getKey(), tag.getValue());
logTags.add(logTag);
}
});
return logTags;
}
private void addAutocompleteTags() {
log.getTags().forEach(tag -> {
TagAutocomplete tagAutocomplete = new TagAutocomplete();
tagAutocomplete.setTagKey(tag.getKey());
tagAutocomplete.setTagValue(tag.getValue());
tagAutocomplete.setTagType(TagType.LOG);
tagAutocomplete.setTimeBucket(TimeBucket.getMinuteTimeBucket(log.getTimestamp()));
sourceReceiver.receive(tagAutocomplete);
});
}
public static class Factory implements LogSinkListenerFactory {
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
private final List<String> searchableTagKeys;
public Factory(ModuleManager moduleManager, LogAnalyzerModuleConfig moduleConfig) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME)
.provider()
.getService(SourceReceiver.class);
this.namingControl = moduleManager.find(CoreModule.NAME)
.provider()
.getService(NamingControl.class);
ConfigService configService = moduleManager.find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
this.searchableTagKeys = Arrays.asList(configService.getSearchableLogsTags().split(Const.COMMA));
}
@Override
public RecordSinkListener create() {
return new RecordSinkListener(sourceReceiver, namingControl, searchableTagKeys);
}
}
}