blob: 8a93a5e3c30abd1e30f58e50d41f93fc14651058 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import com.google.common.base.Strings;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.query.type.BasicTrace;
import org.apache.skywalking.oap.server.core.query.type.QueryOrder;
import org.apache.skywalking.oap.server.core.query.type.Span;
import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import static org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TableInstaller.ID_COLUMN;
public class H2TraceQueryDAO implements ITraceQueryDAO {
private ModuleManager manager;
private JDBCHikariCPClient h2Client;
private List<String> searchableTagKeys;
public H2TraceQueryDAO(ModuleManager manager,
JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
this.manager = manager;
}
@Override
public TraceBrief queryBasicTraces(long startSecondTB,
long endSecondTB,
long minDuration,
long maxDuration,
String serviceId,
String serviceInstanceId,
String endpointId,
String traceId,
int limit,
int from,
TraceState traceState,
QueryOrder queryOrder,
final List<Tag> tags) throws IOException {
if (searchableTagKeys == null) {
final ConfigService configService = manager.find(CoreModule.NAME)
.provider()
.getService(ConfigService.class);
searchableTagKeys = Arrays.asList(configService.getSearchableTracesTags().split(Const.COMMA));
}
StringBuilder sql = new StringBuilder();
List<Object> parameters = new ArrayList<>(10);
sql.append("from ").append(SegmentRecord.INDEX_NAME);
/**
* This is an AdditionalEntity feature, see:
* {@link org.apache.skywalking.oap.server.core.storage.annotation.SQLDatabase.AdditionalEntity}
*/
if (!CollectionUtils.isEmpty(tags)) {
for (int i = 0; i < tags.size(); i++) {
sql.append(" inner join ").append(SegmentRecord.ADDITIONAL_TAG_TABLE).append(" ");
sql.append(SegmentRecord.ADDITIONAL_TAG_TABLE + i);
sql.append(" on ").append(SegmentRecord.INDEX_NAME).append(".").append(ID_COLUMN).append(" = ");
sql.append(SegmentRecord.ADDITIONAL_TAG_TABLE + i).append(".").append(ID_COLUMN);
}
}
sql.append(" where ");
sql.append(" 1=1 ");
if (startSecondTB != 0 && endSecondTB != 0) {
sql.append(" and ").append(SegmentRecord.INDEX_NAME).append(".").append(SegmentRecord.TIME_BUCKET).append(" >= ?");
parameters.add(startSecondTB);
sql.append(" and ").append(SegmentRecord.INDEX_NAME).append(".").append(SegmentRecord.TIME_BUCKET).append(" <= ?");
parameters.add(endSecondTB);
}
if (minDuration != 0) {
sql.append(" and ").append(SegmentRecord.LATENCY).append(" >= ?");
parameters.add(minDuration);
}
if (maxDuration != 0) {
sql.append(" and ").append(SegmentRecord.LATENCY).append(" <= ?");
parameters.add(maxDuration);
}
if (StringUtil.isNotEmpty(serviceId)) {
sql.append(" and ").append(SegmentRecord.SERVICE_ID).append(" = ?");
parameters.add(serviceId);
}
if (StringUtil.isNotEmpty(serviceInstanceId)) {
sql.append(" and ").append(SegmentRecord.SERVICE_INSTANCE_ID).append(" = ?");
parameters.add(serviceInstanceId);
}
if (!Strings.isNullOrEmpty(endpointId)) {
sql.append(" and ").append(SegmentRecord.ENDPOINT_ID).append(" = ?");
parameters.add(endpointId);
}
if (!Strings.isNullOrEmpty(traceId)) {
sql.append(" and ").append(SegmentRecord.TRACE_ID).append(" = ?");
parameters.add(traceId);
}
if (CollectionUtils.isNotEmpty(tags)) {
for (int i = 0; i < tags.size(); i++) {
final int foundIdx = searchableTagKeys.indexOf(tags.get(i).getKey());
if (foundIdx > -1) {
sql.append(" and ").append(SegmentRecord.ADDITIONAL_TAG_TABLE + i).append(".");
sql.append(SegmentRecord.TAGS).append(" = ?");
parameters.add(tags.get(i).toString());
} else {
//If the tag is not searchable, but is required, then don't need to run the real query.
return new TraceBrief();
}
}
}
switch (traceState) {
case ERROR:
sql.append(" and ").append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.TRUE);
break;
case SUCCESS:
sql.append(" and ").append(SegmentRecord.IS_ERROR).append(" = ").append(BooleanUtils.FALSE);
break;
}
switch (queryOrder) {
case BY_START_TIME:
sql.append(" order by ").append(SegmentRecord.START_TIME).append(" ").append("desc");
break;
case BY_DURATION:
sql.append(" order by ").append(SegmentRecord.LATENCY).append(" ").append("desc");
break;
}
TraceBrief traceBrief = new TraceBrief();
try (Connection connection = h2Client.getConnection()) {
buildLimit(sql, from, limit);
try (ResultSet resultSet = h2Client.executeQuery(
connection, "select " +
SegmentRecord.SEGMENT_ID + ", " +
SegmentRecord.START_TIME + ", " +
SegmentRecord.ENDPOINT_ID + ", " +
SegmentRecord.LATENCY + ", " +
SegmentRecord.IS_ERROR + ", " +
SegmentRecord.TRACE_ID + " " + sql, parameters.toArray(new Object[0]))) {
while (resultSet.next()) {
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId(resultSet.getString(SegmentRecord.SEGMENT_ID));
basicTrace.setStart(resultSet.getString(SegmentRecord.START_TIME));
basicTrace.getEndpointNames().add(
IDManager.EndpointID.analysisId(resultSet.getString(SegmentRecord.ENDPOINT_ID))
.getEndpointName()
);
basicTrace.setDuration(resultSet.getInt(SegmentRecord.LATENCY));
basicTrace.setError(BooleanUtils.valueToBoolean(resultSet.getInt(SegmentRecord.IS_ERROR)));
String traceIds = resultSet.getString(SegmentRecord.TRACE_ID);
basicTrace.getTraceIds().add(traceIds);
traceBrief.getTraces().add(basicTrace);
}
}
} catch (SQLException e) {
throw new IOException(e);
}
return traceBrief;
}
protected void buildLimit(StringBuilder sql, int from, int limit) {
sql.append(" LIMIT ").append(limit);
sql.append(" OFFSET ").append(from);
}
@Override
public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
List<SegmentRecord> segmentRecords = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(
connection, "select " + SegmentRecord.SEGMENT_ID + ", " +
SegmentRecord.TRACE_ID + ", " +
SegmentRecord.SERVICE_ID + ", " +
SegmentRecord.SERVICE_INSTANCE_ID + ", " +
SegmentRecord.START_TIME + ", " +
SegmentRecord.LATENCY + ", " +
SegmentRecord.IS_ERROR + ", " +
SegmentRecord.DATA_BINARY + " from " +
SegmentRecord.INDEX_NAME + " where " + SegmentRecord.TRACE_ID + " = ?", traceId
)) {
while (resultSet.next()) {
SegmentRecord segmentRecord = new SegmentRecord();
segmentRecord.setSegmentId(resultSet.getString(SegmentRecord.SEGMENT_ID));
segmentRecord.setTraceId(resultSet.getString(SegmentRecord.TRACE_ID));
segmentRecord.setServiceId(resultSet.getString(SegmentRecord.SERVICE_ID));
segmentRecord.setServiceInstanceId(resultSet.getString(SegmentRecord.SERVICE_INSTANCE_ID));
segmentRecord.setStartTime(resultSet.getLong(SegmentRecord.START_TIME));
segmentRecord.setLatency(resultSet.getInt(SegmentRecord.LATENCY));
segmentRecord.setIsError(resultSet.getInt(SegmentRecord.IS_ERROR));
String dataBinaryBase64 = resultSet.getString(SegmentRecord.DATA_BINARY);
if (!Strings.isNullOrEmpty(dataBinaryBase64)) {
segmentRecord.setDataBinary(Base64.getDecoder().decode(dataBinaryBase64));
}
segmentRecords.add(segmentRecord);
}
}
} catch (SQLException e) {
throw new IOException(e);
}
return segmentRecords;
}
@Override
public List<Span> doFlexibleTraceQuery(String traceId) {
return Collections.emptyList();
}
}