blob: d8adcaa6701910b346ed2a1be12717b43d262247 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.monitoring.opencensus;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import io.opencensus.trace.SpanId;
import org.apache.ignite.client.Config;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.spi.tracing.TracingConfigurationCoordinates;
import org.apache.ignite.spi.tracing.TracingConfigurationParameters;
import org.junit.Test;
import static java.sql.DriverManager.getConnection;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_PAGE_ROWS;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_BATCH_PROCESS;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CMD_QRY_EXECUTE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CURSOR_CLOSE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CURSOR_OPEN;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_ITER_CLOSE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_ITER_OPEN;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PAGE_FETCH;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY_EXECUTE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY_PARSE;
import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
import static org.apache.ignite.spi.tracing.Scope.SQL;
import static org.apache.ignite.spi.tracing.TracingConfigurationParameters.SAMPLING_RATE_ALWAYS;
import static org.apache.ignite.spi.tracing.TracingConfigurationParameters.SAMPLING_RATE_NEVER;
/**
* Tests tracing of SQL quries execution via JDBC.
*/
public class OpenCensusSqlJdbcTracingTest extends OpenCensusSqlNativeTracingTest {
/** JDBC URL prefix. */
private static final String JDBC_URL_PREFIX = "jdbc:ignite:thin://";
/** CSV file for bulk-load testing. */
private static final String BULKLOAD_FILE = Objects.requireNonNull(resolveIgnitePath(
"/modules/clients/src/test/resources/bulkload2.csv")).getAbsolutePath();
/** Number of bulk-load entries. */
private static final int BULKLOAD_ENTRIES = 2;
/**
* Tests tracing of local SQL SELECT query.
*
* @throws Exception If failed.
*/
@Test
public void testSelectLocal() throws Exception {
String orgTable = createTableAndPopulate(Organization.class, REPLICATED, 1);
SpanId rootSpan = executeAndCheckRootSpan("SELECT orgVal FROM " + orgTable, TEST_SCHEMA, false, false, true);
checkChildSpan(SQL_QRY_PARSE, rootSpan);
checkChildSpan(SQL_CURSOR_OPEN, rootSpan);
checkChildSpan(SQL_ITER_OPEN, rootSpan);
SpanId iterSpan = checkChildSpan(SQL_ITER_OPEN, rootSpan);
checkChildSpan(SQL_QRY_EXECUTE, iterSpan);
int fetchedRows = findChildSpans(SQL_PAGE_FETCH, rootSpan).stream()
.mapToInt(span -> Integer.parseInt(getAttribute(span, SQL_PAGE_ROWS)))
.sum();
assertEquals(TEST_TABLE_POPULATION, fetchedRows);
checkChildSpan(SQL_ITER_CLOSE, rootSpan);
assertFalse(findChildSpans(SQL_CURSOR_CLOSE, rootSpan).isEmpty());
}
/**
* Test SQL bulk load query tracing.
*/
@Test
public void testCopy() throws Exception {
ignite(0).tracingConfiguration().set(
new TracingConfigurationCoordinates.Builder(SQL).build(),
new TracingConfigurationParameters.Builder()
.withSamplingRate(SAMPLING_RATE_NEVER).build());
String table = "test_table";
executeQuery(
"CREATE TABLE " + table + "(id LONG PRIMARY KEY, first_name VARCHAR, last_name VARCHAR, age LONG)",
DFLT_SCHEMA, false, false, null);
ignite(0).tracingConfiguration().set(
new TracingConfigurationCoordinates.Builder(SQL).build(),
new TracingConfigurationParameters.Builder()
.withSamplingRate(SAMPLING_RATE_ALWAYS).build());
SpanId rootSpan = executeAndCheckRootSpan(
"COPY FROM '" + BULKLOAD_FILE + "' INTO " + table + "(id, age, first_name, last_name) FORMAT csv",
DFLT_SCHEMA, false, false, false);
checkChildSpan(SQL_QRY_PARSE, rootSpan);
checkChildSpan(SQL_CMD_QRY_EXECUTE, rootSpan);
checkSpan(SQL_BATCH_PROCESS, rootSpan, BULKLOAD_ENTRIES, null);
}
/** {@inheritDoc} */
@Override public void testSelectQueryUserThreadSpanNotAffected() throws Exception {
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, 1);
String orgTable = createTableAndPopulate(Organization.class, PARTITIONED, 1);
String url = JDBC_URL_PREFIX + Config.SERVER + '/' + TEST_SCHEMA;
try (
Connection prsntConn = getConnection(url);
Connection orgConn = getConnection(url);
PreparedStatement prsntStmt = prsntConn.prepareStatement("SELECT * FROM " + prsnTable);
PreparedStatement orgStmt = orgConn.prepareStatement("SELECT * FROM " + orgTable)
) {
prsntStmt.executeQuery();
orgStmt.executeQuery();
try (
ResultSet prsnResultSet = prsntStmt.getResultSet();
ResultSet orgResultSet = orgStmt.getResultSet()
) {
while (prsnResultSet.next() && orgResultSet.next()) {
// No-op.
}
}
}
catch (SQLException e) {
throw new RuntimeException(e);
}
handler().flush();
checkDroppedSpans();
List<SpanId> rootSpans = findRootSpans(SQL_QRY);
assertEquals(2, rootSpans.size());
for (SpanId rootSpan : rootSpans)
checkBasicSelectQuerySpanTree(rootSpan, TEST_TABLE_POPULATION);
}
/** {@inheritDoc} */
@SuppressWarnings("StatementWithEmptyBody")
@Override protected void executeQuery(
String sql,
String schema,
boolean skipReduceOnUpdate,
boolean distributedJoins,
Boolean isQry
) {
String url = JDBC_URL_PREFIX + Config.SERVER + '/' + schema +
"?skipReducerOnUpdate=" + skipReduceOnUpdate + "&distributedJoins=" + distributedJoins;
try (
Connection conn = getConnection(url);
PreparedStatement stmt = conn.prepareStatement(sql)
) {
stmt.setFetchSize(PAGE_SIZE);
if (isQry == null)
stmt.execute();
else if (!isQry)
stmt.executeUpdate();
else {
stmt.executeQuery();
try (ResultSet resultSet = stmt.getResultSet()) {
while (resultSet.next()) {
// No-op.
}
}
}
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}
/** {@inheritDoc} */
@Override protected IgniteEx reducer() {
return ignite(0);
}
/** {@inheritDoc} */
@Override protected int mapNodesCount() {
return GRID_CNT - 1;
}
}