blob: 6c3c9d35de6e39af921564553be4f99ac21af48e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.skywalking.apm.network.logging.v3.LogTags;
import org.apache.skywalking.banyandb.v1.client.RowEntity;
import org.apache.skywalking.banyandb.v1.client.StreamQuery;
import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
import org.apache.skywalking.banyandb.v1.client.TimestampRange;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.input.TraceScopeCondition;
import org.apache.skywalking.oap.server.core.query.type.ContentType;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.Log;
import org.apache.skywalking.oap.server.core.query.type.Logs;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import static java.util.Objects.nonNull;
/**
 * {@link ILogQueryDAO} implementation that reads log records from BanyanDB.
 * {@link org.apache.skywalking.oap.server.core.analysis.manual.log.LogRecord} is a stream
 */
public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO implements ILogQueryDAO {
    /**
     * Tags projected by every log query issued from this DAO.
     */
    private static final Set<String> TAGS = ImmutableSet.of(AbstractLogRecord.SERVICE_ID,
            AbstractLogRecord.SERVICE_INSTANCE_ID,
            AbstractLogRecord.ENDPOINT_ID,
            AbstractLogRecord.TRACE_ID,
            AbstractLogRecord.TRACE_SEGMENT_ID,
            AbstractLogRecord.SPAN_ID,
            AbstractLogRecord.TIMESTAMP,
            AbstractLogRecord.CONTENT_TYPE,
            AbstractLogRecord.CONTENT,
            AbstractLogRecord.TAGS,
            AbstractLogRecord.TAGS_RAW_DATA);

    public BanyanDBLogQueryDAO(BanyanDBStorageClient client) {
        super(client);
    }

    /**
     * Query log records matching the given conditions.
     *
     * @param serviceId                  service filter, skipped when empty
     * @param serviceInstanceId          service instance filter, skipped when empty
     * @param endpointId                 endpoint filter, skipped when empty
     * @param relatedTrace               optional trace/segment/span filter, skipped when {@code null}
     * @param queryOrder                 requested ordering; NOTE(review): not applied to the query here
     * @param from                       offset of the first record to return
     * @param limit                      maximum number of records to return
     * @param duration                   optional time range; when {@code null} no explicit range is passed
     * @param tags                       search tags, all of which must be present on a record
     * @param keywordsOfContent          not used by this implementation
     * @param excludingKeywordsOfContent not used by this implementation
     * @return the matching logs, one {@link Log} per returned row
     * @throws IOException if the underlying query fails
     */
    @Override
    public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId,
                          TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit,
                          Duration duration, List<Tag> tags, List<String> keywordsOfContent,
                          List<String> excludingKeywordsOfContent) throws IOException {
        long startTB = 0;
        long endTB = 0;
        if (nonNull(duration)) {
            startTB = duration.getStartTimeBucketInSec();
            endTB = duration.getEndTimeBucketInSec();
        }

        final QueryBuilder<StreamQuery> builder = new QueryBuilder<StreamQuery>() {
            @Override
            public void apply(StreamQuery streamQuery) {
                if (StringUtil.isNotEmpty(serviceId)) {
                    streamQuery.and(eq(AbstractLogRecord.SERVICE_ID, serviceId));
                }
                if (StringUtil.isNotEmpty(serviceInstanceId)) {
                    streamQuery.and(eq(AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
                }
                if (StringUtil.isNotEmpty(endpointId)) {
                    streamQuery.and(eq(AbstractLogRecord.ENDPOINT_ID, endpointId));
                }
                if (Objects.nonNull(relatedTrace)) {
                    if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
                        streamQuery.and(eq(AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
                    }
                    if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
                        streamQuery.and(eq(AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
                    }
                    if (Objects.nonNull(relatedTrace.getSpanId())) {
                        streamQuery.and(eq(AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId()));
                    }
                }
                if (CollectionUtils.isNotEmpty(tags)) {
                    List<String> tagsConditions = new ArrayList<>(tags.size());
                    for (final Tag tag : tags) {
                        tagsConditions.add(tag.toString());
                    }
                    streamQuery.and(having(LogRecord.TAGS, tagsConditions));
                }
                // Honor the caller's pagination. Previously `from` and `limit` were
                // ignored, so every page request produced the same result set.
                // Non-positive values keep the client defaults, as before.
                if (from > 0) {
                    streamQuery.setOffset(from);
                }
                if (limit > 0) {
                    streamQuery.setLimit(limit);
                }
                // NOTE(review): queryOrder is still not applied; the order-by API of the
                // BanyanDB client is not visible from this file - confirm and wire it in.
            }
        };

        // Only pass an explicit range when both bounds were provided; otherwise
        // the range is left null and the inherited query(...) decides what to scan.
        TimestampRange tsRange = null;
        if (startTB > 0 && endTB > 0) {
            tsRange = new TimestampRange(TimeBucket.getTimestamp(startTB), TimeBucket.getTimestamp(endTB));
        }

        StreamQueryResponse resp = query(LogRecord.INDEX_NAME, TAGS, tsRange, builder);

        Logs logs = new Logs();
        for (final RowEntity rowEntity : resp.getElements()) {
            logs.getLogs().add(toLog(rowEntity));
        }
        return logs;
    }

    /**
     * Convert one returned row into a {@link Log}.
     *
     * @param rowEntity a row carrying the tags listed in {@link #TAGS}
     * @return the populated log entry
     */
    private Log toLog(RowEntity rowEntity) {
        Log log = new Log();
        log.setServiceId(rowEntity.getTagValue(AbstractLogRecord.SERVICE_ID));
        log.setServiceInstanceId(rowEntity.getTagValue(AbstractLogRecord.SERVICE_INSTANCE_ID));
        log.setEndpointId(rowEntity.getTagValue(AbstractLogRecord.ENDPOINT_ID));
        // The endpoint name is decoded from the endpoint ID rather than read from a tag.
        if (log.getEndpointId() != null) {
            log.setEndpointName(
                IDManager.EndpointID.analysisId(log.getEndpointId()).getEndpointName());
        }
        log.setTraceId(rowEntity.getTagValue(AbstractLogRecord.TRACE_ID));
        log.setTimestamp(((Number) rowEntity.getTagValue(AbstractLogRecord.TIMESTAMP)).longValue());
        log.setContentType(ContentType.instanceOf(
            ((Number) rowEntity.getTagValue(AbstractLogRecord.CONTENT_TYPE)).intValue()));
        log.setContent(rowEntity.getTagValue(AbstractLogRecord.CONTENT));
        byte[] dataBinary = rowEntity.getTagValue(AbstractLogRecord.TAGS_RAW_DATA);
        if (dataBinary != null && dataBinary.length > 0) {
            parserDataBinary(dataBinary, log.getTags());
        }
        return log;
    }

    /**
     * Parse the raw, protobuf-encoded tags and append them to {@code tags}.
     * TODO: merge default method
     *
     * @param dataBinary serialized {@link LogTags} message
     * @param tags       destination list receiving one {@link KeyValue} per parsed pair
     * @throws RuntimeException wrapping {@link InvalidProtocolBufferException} when the bytes cannot be parsed
     */
    private void parserDataBinary(byte[] dataBinary, List<KeyValue> tags) {
        try {
            LogTags logTags = LogTags.parseFrom(dataBinary);
            logTags.getDataList().forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
        } catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }
    }
}