| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao; |
| |
| import java.io.IOException; |
| import java.sql.Connection; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import org.apache.skywalking.oap.server.core.analysis.manual.relation.endpoint.EndpointRelationServerSideMetrics; |
| import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationClientSideMetrics; |
| import org.apache.skywalking.oap.server.core.analysis.manual.relation.instance.ServiceInstanceRelationServerSideMetrics; |
| import org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationClientSideMetrics; |
| import org.apache.skywalking.oap.server.core.analysis.manual.relation.process.ProcessRelationServerSideMetrics; |
| import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationClientSideMetrics; |
| import org.apache.skywalking.oap.server.core.analysis.manual.relation.service.ServiceRelationServerSideMetrics; |
| import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; |
| import org.apache.skywalking.oap.server.core.query.input.Duration; |
| import org.apache.skywalking.oap.server.core.query.type.Call; |
| import org.apache.skywalking.oap.server.core.source.DetectPoint; |
| import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO; |
| import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient; |
| |
| public class H2TopologyQueryDAO implements ITopologyQueryDAO { |
| private JDBCHikariCPClient h2Client; |
| |
| public H2TopologyQueryDAO(JDBCHikariCPClient h2Client) { |
| this.h2Client = h2Client; |
| } |
| |
| @Override |
| public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(Duration duration, |
| List<String> serviceIds) throws IOException { |
| return loadServiceCalls( |
| ServiceRelationServerSideMetrics.INDEX_NAME, duration, |
| ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, |
| ServiceRelationServerSideMetrics.DEST_SERVICE_ID, serviceIds, DetectPoint.SERVER |
| ); |
| } |
| |
| @Override |
| public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(Duration duration, |
| List<String> serviceIds) throws IOException { |
| return loadServiceCalls( |
| ServiceRelationClientSideMetrics.INDEX_NAME, duration, |
| ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID, |
| ServiceRelationClientSideMetrics.DEST_SERVICE_ID, serviceIds, DetectPoint.CLIENT |
| ); |
| } |
| |
| @Override |
| public List<Call.CallDetail> loadServiceRelationsDetectedAtServerSide(Duration duration) throws IOException { |
| return loadServiceCalls( |
| ServiceRelationServerSideMetrics.INDEX_NAME, duration, |
| ServiceRelationServerSideMetrics.SOURCE_SERVICE_ID, |
| ServiceRelationServerSideMetrics.DEST_SERVICE_ID, new ArrayList<>(0), DetectPoint.SERVER |
| ); |
| } |
| |
| @Override |
| public List<Call.CallDetail> loadServiceRelationDetectedAtClientSide(Duration duration) throws IOException { |
| return loadServiceCalls( |
| ServiceRelationClientSideMetrics.INDEX_NAME, duration, |
| ServiceRelationClientSideMetrics.SOURCE_SERVICE_ID, |
| ServiceRelationClientSideMetrics.DEST_SERVICE_ID, new ArrayList<>(0), DetectPoint.CLIENT |
| ); |
| } |
| |
| @Override |
| public List<Call.CallDetail> loadInstanceRelationDetectedAtServerSide(String clientServiceId, |
| String serverServiceId, |
| Duration duration) throws IOException { |
| return loadServiceInstanceCalls( |
| ServiceInstanceRelationServerSideMetrics.INDEX_NAME, duration, |
| ServiceInstanceRelationServerSideMetrics.SOURCE_SERVICE_ID, |
| ServiceInstanceRelationServerSideMetrics.DEST_SERVICE_ID, clientServiceId, serverServiceId, |
| DetectPoint.SERVER |
| ); |
| } |
| |
| @Override |
| public List<Call.CallDetail> loadInstanceRelationDetectedAtClientSide(String clientServiceId, |
| String serverServiceId, |
| Duration duration) throws IOException { |
| return loadServiceInstanceCalls( |
| ServiceInstanceRelationClientSideMetrics.INDEX_NAME, duration, |
| ServiceInstanceRelationClientSideMetrics.SOURCE_SERVICE_ID, |
| ServiceInstanceRelationClientSideMetrics.DEST_SERVICE_ID, clientServiceId, serverServiceId, |
| DetectPoint.CLIENT |
| ); |
| } |
| |
| @Override |
| public List<Call.CallDetail> loadEndpointRelation(Duration duration, |
| String destEndpointId) throws IOException { |
| List<Call.CallDetail> calls = loadEndpointFromSide( |
| EndpointRelationServerSideMetrics.INDEX_NAME, duration, |
| EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, |
| EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId, false |
| ); |
| calls.addAll( |
| loadEndpointFromSide(EndpointRelationServerSideMetrics.INDEX_NAME, duration, |
| EndpointRelationServerSideMetrics.SOURCE_ENDPOINT, |
| EndpointRelationServerSideMetrics.DEST_ENDPOINT, destEndpointId, true |
| )); |
| return calls; |
| } |
| |
| @Override |
| public List<Call.CallDetail> loadProcessRelationDetectedAtClientSide(String serviceInstanceId, Duration duration) throws IOException { |
| return loadProcessFromSide(duration, serviceInstanceId, DetectPoint.CLIENT); |
| } |
| |
| @Override |
| public List<Call.CallDetail> loadProcessRelationDetectedAtServerSide(String serviceInstanceId, Duration duration) throws IOException { |
| return loadProcessFromSide(duration, serviceInstanceId, DetectPoint.SERVER); |
| } |
| |
| private List<Call.CallDetail> loadServiceCalls(String tableName, |
| Duration duration, |
| String sourceCName, |
| String destCName, |
| List<String> serviceIds, |
| DetectPoint detectPoint) throws IOException { |
| Object[] conditions = new Object[serviceIds.size() * 2 + 2]; |
| conditions[0] = duration.getStartTimeBucket(); |
| conditions[1] = duration.getEndTimeBucket(); |
| StringBuilder serviceIdMatchSql = new StringBuilder(); |
| if (serviceIds.size() > 0) { |
| serviceIdMatchSql.append("and ("); |
| for (int i = 0; i < serviceIds.size(); i++) { |
| serviceIdMatchSql.append(sourceCName + "=? or " + destCName + "=? "); |
| conditions[i * 2 + 2] = serviceIds.get(i); |
| conditions[i * 2 + 1 + 2] = serviceIds.get(i); |
| if (i != serviceIds.size() - 1) { |
| serviceIdMatchSql.append("or "); |
| } |
| } |
| serviceIdMatchSql.append(")"); |
| } |
| List<Call.CallDetail> calls = new ArrayList<>(); |
| try (Connection connection = h2Client.getConnection()) { |
| try (ResultSet resultSet = h2Client.executeQuery( |
| connection, |
| "select " + Metrics.ENTITY_ID + ", " + ServiceRelationServerSideMetrics.COMPONENT_ID |
| + " from " + tableName + " where " + Metrics.TIME_BUCKET + ">= ? and " |
| + Metrics.TIME_BUCKET + "<=? " + serviceIdMatchSql |
| .toString() + |
| " group by " + Metrics.ENTITY_ID + "," + ServiceRelationServerSideMetrics.COMPONENT_ID, conditions |
| )) { |
| buildServiceCalls(resultSet, calls, detectPoint); |
| } |
| } catch (SQLException e) { |
| throw new IOException(e); |
| } |
| return calls; |
| } |
| |
| private List<Call.CallDetail> loadServiceInstanceCalls(String tableName, |
| Duration duration, |
| String sourceCName, |
| String descCName, |
| String sourceServiceId, |
| String destServiceId, |
| DetectPoint detectPoint) throws IOException { |
| Object[] conditions = new Object[] { |
| duration.getStartTimeBucket(), |
| duration.getEndTimeBucket(), |
| sourceServiceId, |
| destServiceId, |
| destServiceId, |
| sourceServiceId |
| }; |
| StringBuilder serviceIdMatchSql = new StringBuilder("and ((").append(sourceCName) |
| .append("=? and ") |
| .append(descCName) |
| .append("=?") |
| .append(") or (") |
| .append(sourceCName) |
| .append("=? and ") |
| .append(descCName) |
| .append("=?") |
| .append("))"); |
| List<Call.CallDetail> calls = new ArrayList<>(); |
| try (Connection connection = h2Client.getConnection()) { |
| try (ResultSet resultSet = h2Client.executeQuery( |
| connection, |
| "select " + Metrics.ENTITY_ID + ", " + ServiceInstanceRelationServerSideMetrics.COMPONENT_ID |
| + " from " + tableName + " where " + Metrics.TIME_BUCKET + ">= ? and " + Metrics.TIME_BUCKET + "<=? " + serviceIdMatchSql |
| .toString() + " group by " + Metrics.ENTITY_ID + ", " + ServiceInstanceRelationServerSideMetrics.COMPONENT_ID, |
| conditions |
| )) { |
| buildInstanceCalls(resultSet, calls, detectPoint); |
| } |
| } catch (SQLException e) { |
| throw new IOException(e); |
| } |
| return calls; |
| } |
| |
| private List<Call.CallDetail> loadEndpointFromSide(String tableName, |
| Duration duration, |
| String sourceCName, |
| String destCName, |
| String id, |
| boolean isSourceId) throws IOException { |
| Object[] conditions = new Object[3]; |
| conditions[0] = duration.getStartTimeBucket(); |
| conditions[1] = duration.getEndTimeBucket(); |
| conditions[2] = id; |
| List<Call.CallDetail> calls = new ArrayList<>(); |
| try (Connection connection = h2Client.getConnection()) { |
| try (ResultSet resultSet = h2Client.executeQuery( |
| connection, |
| "select " + Metrics.ENTITY_ID + " from " + tableName |
| + " where " + Metrics.TIME_BUCKET + ">= ? and " + Metrics.TIME_BUCKET + "<=? and " |
| + (isSourceId ? sourceCName : destCName) + "=?" |
| + " group by " + Metrics.ENTITY_ID, |
| conditions |
| )) { |
| buildEndpointCalls(resultSet, calls, DetectPoint.SERVER); |
| } |
| } catch (SQLException e) { |
| throw new IOException(e); |
| } |
| return calls; |
| } |
| |
| private List<Call.CallDetail> loadProcessFromSide(Duration duration, |
| String instanceId, |
| DetectPoint detectPoint) throws IOException { |
| Object[] conditions = new Object[3]; |
| conditions[0] = duration.getStartTimeBucket(); |
| conditions[1] = duration.getEndTimeBucket(); |
| conditions[2] = instanceId; |
| List<Call.CallDetail> calls = new ArrayList<>(); |
| try (Connection connection = h2Client.getConnection()) { |
| try (ResultSet resultSet = h2Client.executeQuery( |
| connection, |
| "select " + Metrics.ENTITY_ID + ", " + ProcessRelationServerSideMetrics.COMPONENT_ID |
| + " from " + (detectPoint == DetectPoint.SERVER ? ProcessRelationServerSideMetrics.INDEX_NAME : ProcessRelationClientSideMetrics.INDEX_NAME) |
| + " where " + Metrics.TIME_BUCKET + ">= ? and " + Metrics.TIME_BUCKET + "<=? and " |
| + ProcessRelationClientSideMetrics.SERVICE_INSTANCE_ID + "=?" |
| + " group by " + Metrics.ENTITY_ID + ", " + ProcessRelationServerSideMetrics.COMPONENT_ID, |
| conditions |
| )) { |
| buildProcessCalls(resultSet, calls, detectPoint); |
| } |
| } catch (SQLException e) { |
| throw new IOException(e); |
| } |
| return calls; |
| } |
| |
| private void buildServiceCalls(ResultSet resultSet, List<Call.CallDetail> calls, |
| DetectPoint detectPoint) throws SQLException { |
| while (resultSet.next()) { |
| Call.CallDetail call = new Call.CallDetail(); |
| String entityId = resultSet.getString(Metrics.ENTITY_ID); |
| final int componentId = resultSet.getInt(ServiceRelationServerSideMetrics.COMPONENT_ID); |
| call.buildFromServiceRelation(entityId, componentId, detectPoint); |
| calls.add(call); |
| } |
| } |
| |
| private void buildInstanceCalls(ResultSet resultSet, List<Call.CallDetail> calls, |
| DetectPoint detectPoint) throws SQLException { |
| while (resultSet.next()) { |
| Call.CallDetail call = new Call.CallDetail(); |
| String entityId = resultSet.getString(Metrics.ENTITY_ID); |
| final int componentId = resultSet.getInt(ServiceRelationServerSideMetrics.COMPONENT_ID); |
| call.buildFromInstanceRelation(entityId, componentId, detectPoint); |
| calls.add(call); |
| } |
| } |
| |
| private void buildEndpointCalls(ResultSet resultSet, List<Call.CallDetail> calls, |
| DetectPoint detectPoint) throws SQLException { |
| while (resultSet.next()) { |
| Call.CallDetail call = new Call.CallDetail(); |
| String entityId = resultSet.getString(Metrics.ENTITY_ID); |
| call.buildFromEndpointRelation(entityId, detectPoint); |
| calls.add(call); |
| } |
| } |
| |
| private void buildProcessCalls(ResultSet resultSet, List<Call.CallDetail> calls, |
| DetectPoint detectPoint) throws SQLException { |
| while (resultSet.next()) { |
| Call.CallDetail call = new Call.CallDetail(); |
| String entityId = resultSet.getString(Metrics.ENTITY_ID); |
| int componentId = resultSet.getInt(ProcessRelationServerSideMetrics.COMPONENT_ID); |
| call.buildProcessRelation(entityId, componentId, detectPoint); |
| calls.add(call); |
| } |
| } |
| } |