blob: b2cf68c57b3b9f2ec7ce2c9e490d69bf25c4fa33 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.analysis.manual.RelationDefineUtil;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.storage.model.ModelName;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
/**
* @author wusheng
*/
public class H2TopologyQueryDAO implements ITopologyQueryDAO {
private JDBCHikariCPClient h2Client;
public H2TopologyQueryDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
}
@Override public List<Call.CallDetail> loadSpecifiedServerSideServiceRelations(Downsampling downsampling, long startTB, long endTB,
List<Integer> serviceIds) throws IOException {
String tableName = ModelName.build(downsampling, ServiceRelationServerSideMetrics.INDEX_NAME);
return loadServiceCalls(tableName, startTB, endTB, ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, ServiceRelationServerSideMetrics.DEST_SERVICE_ID, serviceIds, false);
}
@Override public List<Call.CallDetail> loadSpecifiedClientSideServiceRelations(Downsampling downsampling, long startTB, long endTB,
List<Integer> serviceIds) throws IOException {
String tableName = ModelName.build(downsampling, ServiceRelationClientSideMetrics.INDEX_NAME);
return loadServiceCalls(tableName, startTB, endTB, ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, ServiceRelationServerSideMetrics.DEST_SERVICE_ID, serviceIds, true);
}
@Override public List<Call.CallDetail> loadServerSideServiceRelations(Downsampling downsampling, long startTB, long endTB) throws IOException {
String tableName = ModelName.build(downsampling, ServiceRelationServerSideMetrics.INDEX_NAME);
return loadServiceCalls(tableName, startTB, endTB, ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, ServiceRelationServerSideMetrics.DEST_SERVICE_ID, new ArrayList<>(0), false);
}
@Override public List<Call.CallDetail> loadClientSideServiceRelations(Downsampling downsampling, long startTB, long endTB) throws IOException {
String tableName = ModelName.build(downsampling, ServiceRelationClientSideMetrics.INDEX_NAME);
return loadServiceCalls(tableName, startTB, endTB, ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, ServiceRelationServerSideMetrics.DEST_SERVICE_ID, new ArrayList<>(0), true);
}
@Override
public List<Call.CallDetail> loadServerSideServiceInstanceRelations(int clientServiceId, int serverServiceId, Downsampling downsampling, long startTB, long endTB) throws IOException {
String tableName = ModelName.build(downsampling, ServiceInstanceRelationServerSideMetrics.INDEX_NAME);
return loadServiceInstanceCalls(tableName, startTB, endTB,
ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID,
ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID,
clientServiceId, serverServiceId, false);
}
@Override
public List<Call.CallDetail> loadClientSideServiceInstanceRelations(int clientServiceId, int serverServiceId, Downsampling downsampling, long startTB, long endTB) throws IOException {
String tableName = ModelName.build(downsampling, ServiceInstanceRelationClientSideMetrics.INDEX_NAME);
return loadServiceInstanceCalls(tableName, startTB, endTB,
ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID,
ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID,
clientServiceId, serverServiceId, true);
}
@Override
public List<Call.CallDetail> loadSpecifiedDestOfServerSideEndpointRelations(Downsampling downsampling, long startTB, long endTB, int destEndpointId) throws IOException {
String tableName = ModelName.build(downsampling, EndpointRelationServerSideMetrics.INDEX_NAME);
List<Call.CallDetail> calls = loadEndpointFromSide(tableName, startTB, endTB, EndpointRelationServerSideMetrics.SOURCE_ENDPOINT_ID, EndpointRelationServerSideMetrics.DEST_ENDPOINT_ID, destEndpointId, false);
calls.addAll(loadEndpointFromSide(tableName, startTB, endTB, EndpointRelationServerSideMetrics.SOURCE_ENDPOINT_ID, EndpointRelationServerSideMetrics.DEST_ENDPOINT_ID, destEndpointId, true));
return calls;
}
private List<Call.CallDetail> loadServiceCalls(String tableName, long startTB, long endTB, String sourceCName,
String destCName, List<Integer> serviceIds, boolean isClientSide) throws IOException {
Object[] conditions = new Object[serviceIds.size() * 2 + 2];
conditions[0] = startTB;
conditions[1] = endTB;
StringBuilder serviceIdMatchSql = new StringBuilder();
if (serviceIds.size() > 0) {
serviceIdMatchSql.append("and (");
for (int i = 0; i < serviceIds.size(); i++) {
serviceIdMatchSql.append(sourceCName + "=? or " + destCName + "=? ");
conditions[i * 2 + 2] = serviceIds.get(i);
conditions[i * 2 + 1 + 2] = serviceIds.get(i);
if (i != serviceIds.size() - 1) {
serviceIdMatchSql.append("or ");
}
}
serviceIdMatchSql.append(")");
}
List<Call.CallDetail> calls = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, "select "
+ Metrics.ENTITY_ID
+ " from " + tableName + " where "
+ Metrics.TIME_BUCKET + ">= ? and " + Metrics.TIME_BUCKET + "<=? "
+ serviceIdMatchSql.toString()
+ " group by " + Metrics.ENTITY_ID,
conditions)) {
buildCalls(resultSet, calls, isClientSide);
}
} catch (SQLException e) {
throw new IOException(e);
}
return calls;
}
private List<Call.CallDetail> loadServiceInstanceCalls(String tableName, long startTB, long endTB, String sourceCName,
String descCName, int sourceServiceId, int destServiceId, boolean isClientSide) throws IOException {
Object[] conditions = new Object[]{startTB, endTB, sourceServiceId, destServiceId, destServiceId, sourceServiceId};
StringBuilder serviceIdMatchSql = new StringBuilder("and ((")
.append(sourceCName).append("=? and ").append(descCName).append("=?").append(") or (")
.append(sourceCName).append("=? and ").append(descCName).append("=?").append("))");
List<Call.CallDetail> calls = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, "select "
+ Metrics.ENTITY_ID
+ " from " + tableName + " where "
+ Metrics.TIME_BUCKET + ">= ? and " + Metrics.TIME_BUCKET + "<=? "
+ serviceIdMatchSql.toString()
+ " group by " + Metrics.ENTITY_ID,
conditions)) {
buildCalls(resultSet, calls, isClientSide);
}
} catch (SQLException e) {
throw new IOException(e);
}
return calls;
}
private List<Call.CallDetail> loadEndpointFromSide(String tableName, long startTB, long endTB, String sourceCName,
String destCName, int id, boolean isSourceId) throws IOException {
Object[] conditions = new Object[3];
conditions[0] = startTB;
conditions[1] = endTB;
conditions[2] = id;
List<Call.CallDetail> calls = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
try (ResultSet resultSet = h2Client.executeQuery(connection, "select "
+ Metrics.ENTITY_ID
+ " from " + tableName + " where "
+ Metrics.TIME_BUCKET + ">= ? and " + Metrics.TIME_BUCKET + "<=? and "
+ (isSourceId ? sourceCName : destCName) + "=?"
+ " group by " + Metrics.ENTITY_ID,
conditions)) {
buildCalls(resultSet, calls, isSourceId);
}
} catch (SQLException e) {
throw new IOException(e);
}
return calls;
}
private void buildCalls(ResultSet resultSet, List<Call.CallDetail> calls,
boolean isClientSide) throws SQLException {
while (resultSet.next()) {
Call.CallDetail call = new Call.CallDetail();
String entityId = resultSet.getString(Metrics.ENTITY_ID);
RelationDefineUtil.RelationDefine relationDefine = RelationDefineUtil.splitEntityId(entityId);
call.setSource(relationDefine.getSource());
call.setTarget(relationDefine.getDest());
call.setComponentId(relationDefine.getComponentId());
if (isClientSide) {
call.setDetectPoint(DetectPoint.CLIENT);
} else {
call.setDetectPoint(DetectPoint.SERVER);
}
call.generateID();
calls.add(call);
}
}
}