/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.monitoring.opencensus;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
import io.opencensus.trace.SpanId;
import io.opencensus.trace.Tracing;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.NoopSpan;
import org.apache.ignite.internal.processors.tracing.SpanType;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.tracing.TracingConfigurationCoordinates;
import org.apache.ignite.spi.tracing.TracingConfigurationParameters;
import org.apache.ignite.spi.tracing.TracingSpi;
import org.apache.ignite.spi.tracing.opencensus.OpenCensusTracingSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
import static java.lang.Boolean.parseBoolean;
import static java.lang.Integer.parseInt;
import static java.util.regex.Pattern.compile;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
import static org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA;
import static org.apache.ignite.internal.processors.tracing.SpanTags.CONSISTENT_ID;
import static org.apache.ignite.internal.processors.tracing.SpanTags.NAME;
import static org.apache.ignite.internal.processors.tracing.SpanTags.NODE;
import static org.apache.ignite.internal.processors.tracing.SpanTags.NODE_ID;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_CACHE_UPDATES;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_IDX_RANGE_ROWS;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_PAGE_ROWS;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_PARSER_CACHE_HIT;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_QRY_TEXT;
import static org.apache.ignite.internal.processors.tracing.SpanTags.SQL_SCHEMA;
import static org.apache.ignite.internal.processors.tracing.SpanTags.tag;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CACHE_UPDATE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CMD_QRY_EXECUTE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CURSOR_CANCEL;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CURSOR_CLOSE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_CURSOR_OPEN;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_DML_QRY_EXECUTE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_DML_QRY_EXEC_REQ;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_DML_QRY_RESP;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_FAIL_RESP;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_IDX_RANGE_REQ;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_IDX_RANGE_RESP;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_ITER_CLOSE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_ITER_OPEN;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_NEXT_PAGE_REQ;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PAGE_FETCH;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PAGE_PREPARE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PAGE_RESP;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PAGE_WAIT;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_PARTITIONS_RESERVE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY_CANCEL_REQ;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY_EXECUTE;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY_EXEC_REQ;
import static org.apache.ignite.internal.processors.tracing.SpanType.SQL_QRY_PARSE;
import static org.apache.ignite.spi.tracing.Scope.SQL;
import static org.apache.ignite.spi.tracing.TracingConfigurationParameters.SAMPLING_RATE_ALWAYS;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/**
* Tests tracing of SQL queries based on {@link OpenCensusTracingSpi}.
*/
public class OpenCensusSqlNativeTracingTest extends AbstractTracingTest {
/** Number of entries in test table. */
protected static final int TEST_TABLE_POPULATION = 100;
/** Page size for queries. */
protected static final int PAGE_SIZE = 20;
/** Test schema name. */
protected static final String TEST_SCHEMA = "TEST_SCHEMA";
/** Key counter. */
private final AtomicInteger keyCntr = new AtomicInteger();
/** {@inheritDoc} */
@Override protected TracingSpi<?> getTracingSpi() {
return new OpenCensusTracingSpi();
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setCommunicationSpi(new TestRecordingCommunicationSpi());
}
/** {@inheritDoc} */
@Override public void before() throws Exception {
super.before();
IgniteEx cli = startClientGrid(GRID_CNT);
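// Enable tracing of the SQL scope with the maximal sampling rate so that every query started by the client is traced.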
cli.tracingConfiguration().set(
new TracingConfigurationCoordinates.Builder(SQL).build(),
new TracingConfigurationParameters.Builder()
.withSamplingRate(SAMPLING_RATE_ALWAYS).build());
}
/**
* Tests tracing of an UPDATE query with the reduce phase skipped.
*
* @throws Exception If failed.
*/
@Test
public void testUpdateWithReducerSkipped() throws Exception {
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, 1);
SpanId rootSpan = executeAndCheckRootSpan(
"UPDATE " + prsnTable + " SET prsnVal = (prsnVal + 1)", TEST_SCHEMA, true, false, false);
checkChildSpan(SQL_QRY_PARSE, rootSpan);
SpanId dmlExecSpan = checkChildSpan(SQL_DML_QRY_EXECUTE, rootSpan);
List<SpanId> execReqSpans = checkSpan(SQL_DML_QRY_EXEC_REQ, dmlExecSpan, GRID_CNT, null);
int fetchedRows = 0;
int cacheUpdates = 0;
assertEquals(GRID_CNT, execReqSpans.size());
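// With the reduce phase skipped, each map node parses and executes the DML locally, so every execution request span
// is expected to contain its own parse, partition reservation, iterator, page fetch and cache update child spans.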
for (int i = 0; i < GRID_CNT; i++) {
SpanId execReqSpan = execReqSpans.get(i);
checkChildSpan(SQL_PARTITIONS_RESERVE, execReqSpan);
checkSpan(SQL_QRY_PARSE, execReqSpan, 2, null);
SpanId iterSpan = checkChildSpan(SQL_ITER_OPEN, execReqSpan);
checkChildSpan(SQL_QRY_EXECUTE, iterSpan);
fetchedRows += findChildSpans(SQL_PAGE_FETCH, execReqSpan).stream()
.mapToInt(span -> parseInt(getAttribute(span, SQL_PAGE_ROWS)))
.sum();
List<SpanId> cacheUpdateSpans = findChildSpans(SQL_CACHE_UPDATE, execReqSpan);
cacheUpdates += cacheUpdateSpans.stream()
.mapToInt(span -> parseInt(getAttribute(span, SQL_CACHE_UPDATES)))
.sum();
checkChildSpan(SQL_ITER_CLOSE, execReqSpan);
checkChildSpan(SQL_DML_QRY_RESP, execReqSpan);
}
assertEquals(TEST_TABLE_POPULATION, fetchedRows);
assertEquals(TEST_TABLE_POPULATION, cacheUpdates);
}
/**
* Tests tracing of a MERGE query that affects multiple rows.
*
* @throws Exception If failed.
*/
@Test
public void testMultipleMerge() throws Exception {
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, 1);
checkDmlQuerySpans("MERGE INTO " + prsnTable + "(_key, prsnVal) SELECT _key, 0 FROM " + prsnTable,
true, TEST_TABLE_POPULATION);
}
/**
* Tests tracing of a MERGE query that affects a single row.
*
* @throws Exception If failed.
*/
@Test
public void testSingleMerge() throws Exception {
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, 1);
checkDmlQuerySpans("MERGE INTO " + prsnTable + "(_key, prsnId, prsnVal) VALUES (" + keyCntr.get() + ", 0, 0)",
false, 1);
}
/**
* Tests tracing of an UPDATE query.
*
* @throws Exception If failed.
*/
@Test
public void testUpdate() throws Exception {
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, 1);
checkDmlQuerySpans("UPDATE " + prsnTable + " SET prsnVal = (prsnVal + 1)",
true, TEST_TABLE_POPULATION);
}
/**
* Tests tracing of a DELETE query.
*
* @throws Exception If failed.
*/
@Test
public void testDelete() throws Exception {
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, 1);
checkDmlQuerySpans("DELETE FROM " + prsnTable, true, TEST_TABLE_POPULATION);
}
/**
* Tests tracing of an INSERT query that inserts multiple rows.
*
* @throws Exception If failed.
*/
@Test
public void testMultipleInsert() throws Exception {
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, 1);
checkDmlQuerySpans("INSERT INTO " + prsnTable + "(_key, prsnId, prsnVal) VALUES" +
" (" + keyCntr.incrementAndGet() + ", 0, 0)," +
" (" + keyCntr.incrementAndGet() + ", 1, 1)", false, 2);
}
/**
* Tests tracing of an INSERT query that inserts a single row.
*
* @throws Exception If failed.
*/
@Test
public void testSingleInsert() throws Exception {
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, 1);
checkDmlQuerySpans("INSERT INTO " + prsnTable + "(_key, prsnId, prsnVal) VALUES" +
" (" + keyCntr.incrementAndGet() + ", 0, 0)", false, 1);
}
/**
* Tests tracing of a distributed join query, including all communication between the reducer and map nodes as well as
* index range requests.
*
* @throws Exception If failed.
*/
@Test
public void testDistributedJoin() throws Exception {
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, 1);
String orgTable = createTableAndPopulate(Organization.class, PARTITIONED, 1);
SpanId rootSpan = executeAndCheckRootSpan(
"SELECT * FROM " + prsnTable + " AS p JOIN " + orgTable + " AS o ON o.orgId = p.prsnId",
TEST_SCHEMA, false, true, true);
checkChildSpan(SQL_QRY_PARSE, rootSpan);
checkChildSpan(SQL_CURSOR_OPEN, rootSpan);
SpanId iterSpan = checkChildSpan(SQL_ITER_OPEN, rootSpan);
List<SpanId> execReqSpans = checkSpan(SQL_QRY_EXEC_REQ, iterSpan, GRID_CNT, null);
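// One query execution request span is expected for each map node participating in the join.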
int idxRangeReqRows = 0;
int preparedRows = 0;
int fetchedRows = 0;
for (int i = 0; i < GRID_CNT; i++) {
SpanId execReqSpan = execReqSpans.get(i);
Ignite ignite = Ignition.ignite(UUID.fromString(getAttribute(execReqSpan, NODE_ID)));
SpanId partsReserveSpan = checkChildSpan(SQL_PARTITIONS_RESERVE, execReqSpan);
List<String> partsReserveLogs = handler().spanById(partsReserveSpan).getAnnotations().getEvents().stream()
.map(e -> e.getEvent().getDescription())
.collect(Collectors.toList());
assertEquals(2, partsReserveLogs.size());
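// Two partition reservation events are expected per map node, presumably one for each cache involved in the join.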
Pattern ptrn = compile("Cache partitions were reserved \\[cache=(.+), partitions=\\[(.+)], topology=(.+)]");
partsReserveLogs.forEach(l -> {
Matcher matcher = ptrn.matcher(l);
assertTrue(matcher.matches());
Set<Integer> expParts = Arrays.stream(ignite.affinity(matcher.group(1))
.primaryPartitions(ignite.cluster().localNode())
).boxed().collect(Collectors.toSet());
Set<Integer> parts = Arrays.stream(matcher.group(2).split(","))
.map(s -> parseInt(s.trim()))
.collect(Collectors.toSet());
assertEquals(expParts, parts);
});
SpanId execSpan = checkChildSpan(SQL_QRY_EXECUTE, execReqSpan);
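// Distributed joins trigger index range requests between map nodes; sum up the rows they return.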
List<SpanId> distrLookupReqSpans = findChildSpans(SQL_IDX_RANGE_REQ, execSpan);
for (SpanId span : distrLookupReqSpans) {
idxRangeReqRows += parseInt(getAttribute(span, SQL_IDX_RANGE_ROWS));
checkChildSpan(SQL_IDX_RANGE_RESP, span);
}
preparedRows += parseInt(getAttribute(checkChildSpan(SQL_PAGE_PREPARE, execReqSpan), SQL_PAGE_ROWS));
checkChildSpan(SQL_PAGE_RESP, execReqSpan);
}
SpanId pageFetchSpan = checkChildSpan(SQL_PAGE_FETCH, iterSpan);
fetchedRows += parseInt(getAttribute(pageFetchSpan, SQL_PAGE_ROWS));
checkChildSpan(SQL_PAGE_WAIT, pageFetchSpan);
SpanId nextPageReqSpan = checkChildSpan(SQL_NEXT_PAGE_REQ, pageFetchSpan);
preparedRows += parseInt(getAttribute(checkChildSpan(SQL_PAGE_PREPARE, nextPageReqSpan), SQL_PAGE_ROWS));
checkChildSpan(SQL_PAGE_RESP, nextPageReqSpan);
List<SpanId> pageFetchSpans = findChildSpans(SQL_PAGE_FETCH, rootSpan);
for (SpanId span : pageFetchSpans) {
fetchedRows += parseInt(getAttribute(span, SQL_PAGE_ROWS));
checkChildSpan(SQL_PAGE_WAIT, span);
List<SpanId> nextPageSpans = findChildSpans(SQL_NEXT_PAGE_REQ, span);
if (!nextPageSpans.isEmpty()) {
assertEquals(1, nextPageSpans.size());
SpanId nextPageSpan = nextPageSpans.get(0);
preparedRows += parseInt(getAttribute(checkChildSpan(SQL_PAGE_PREPARE, nextPageSpan), SQL_PAGE_ROWS));
checkChildSpan(SQL_PAGE_RESP, nextPageSpan);
}
}
assertEquals(TEST_TABLE_POPULATION, fetchedRows);
assertEquals(TEST_TABLE_POPULATION, preparedRows);
assertEquals(TEST_TABLE_POPULATION, idxRangeReqRows);
checkSpan(SQL_QRY_CANCEL_REQ, rootSpan, mapNodesCount(), null);
assertFalse(findChildSpans(SQL_CURSOR_CLOSE, rootSpan).isEmpty());
}
/**
* Tests tracing of a SELECT query with query parallelism enabled.
*
* @throws Exception If failed.
*/
@Test
public void testSelectWithParallelism() throws Exception {
int qryParallelism = 2;
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, qryParallelism);
SpanId rootSpan = executeAndCheckRootSpan("SELECT * FROM " + prsnTable,
TEST_SCHEMA, false, false, true);
SpanId iterOpenSpan = checkChildSpan(SQL_ITER_OPEN, rootSpan);
List<SpanId> qryExecSpans = findChildSpans(SQL_QRY_EXEC_REQ, iterOpenSpan);
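// With query parallelism, a separate execution request span is expected per segment, GRID_CNT * qryParallelism in total.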
assertEquals(GRID_CNT * qryParallelism, qryExecSpans.size());
}
/**
* Tests tracing of an SQL query whose next page request fails.
*
* @throws Exception If failed.
*/
@Test
@SuppressWarnings("Convert2MethodRef")
public void testNextPageRequestFailure() throws Exception {
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, 1);
IgniteEx qryInitiator = reducer();
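// Block outgoing next-page requests so that the query can be cancelled before the remaining pages are fetched.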
spi(qryInitiator).blockMessages((node, msg) -> msg instanceof GridQueryNextPageRequest);
IgniteInternalFuture<?> iterFut = runAsync(() ->
executeQuery("SELECT * FROM " + prsnTable, TEST_SCHEMA, false, false, true));
spi(qryInitiator).waitForBlocked(mapNodesCount());
qryInitiator.context().query().runningQueries(-1).iterator().next().cancel();
spi(qryInitiator).stopBlock();
GridTestUtils.assertThrowsWithCause(() -> iterFut.get(), IgniteCheckedException.class);
handler().flush();
checkDroppedSpans();
SpanId rootSpan = checkChildSpan(SQL_QRY, null);
SpanId cursorCancelSpan = checkChildSpan(SQL_CURSOR_CANCEL, rootSpan);
SpanId cursorCloseSpan = checkChildSpan(SQL_CURSOR_CLOSE, cursorCancelSpan);
SpanId iterCloseSpan = checkChildSpan(SQL_ITER_CLOSE, cursorCloseSpan);
checkSpan(SQL_QRY_CANCEL_REQ, iterCloseSpan, mapNodesCount(), null);
List<SpanId> pageFetchSpans = findChildSpans(SQL_PAGE_FETCH, rootSpan);
for (SpanId pageFetchSpan : pageFetchSpans) {
List<SpanId> nextPageReqSpans = findChildSpans(SQL_NEXT_PAGE_REQ, pageFetchSpan);
if (!nextPageReqSpans.isEmpty()) {
assertEquals(1, nextPageReqSpans.size());
checkChildSpan(SQL_FAIL_RESP, nextPageReqSpans.get(0));
}
}
}
/**
* Tests tracing of the CREATE TABLE command execution.
*/
@Test
public void testCreateTable() throws Exception {
SpanId rootSpan = executeAndCheckRootSpan("CREATE TABLE test_table(id INT PRIMARY KEY, val VARCHAR)",
DFLT_SCHEMA, false, false, null);
checkChildSpan(SQL_QRY_PARSE, rootSpan);
checkChildSpan(SQL_CMD_QRY_EXECUTE, rootSpan);
}
/** Tests SQL parser cache hit tag. */
@Test
public void testParserCacheHitTag() throws Exception {
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, 1);
SpanId rootSpan = executeAndCheckRootSpan("SELECT * FROM " + prsnTable,
TEST_SCHEMA, false, false, true);
SpanId qryParseSpan = checkChildSpan(SQL_QRY_PARSE, rootSpan);
assertFalse(parseBoolean(getAttribute(qryParseSpan, SQL_PARSER_CACHE_HIT)));
handler().clearCollectedSpans();
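// Execute the same query again: this time the parsed statement should be served from the parser cache.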
rootSpan = executeAndCheckRootSpan("SELECT * FROM " + prsnTable,
TEST_SCHEMA, false, false, true);
qryParseSpan = checkChildSpan(SQL_QRY_PARSE, rootSpan);
assertTrue(parseBoolean(getAttribute(qryParseSpan, SQL_PARSER_CACHE_HIT)));
}
/**
* Tests that tracing of multiple SELECT queries produces a separate span tree for each query and does not affect
* the user thread {@link MTC#span()} value either during execution or after it.
*/
@Test
public void testSelectQueryUserThreadSpanNotAffected() throws Exception {
String prsnTable = createTableAndPopulate(Person.class, PARTITIONED, 1);
String orgTable = createTableAndPopulate(Organization.class, PARTITIONED, 1);
try (
FieldsQueryCursor<List<?>> prsnQryCursor = reducer().context().query()
.querySqlFields(new SqlFieldsQuery("SELECT * FROM " + prsnTable), false);
FieldsQueryCursor<List<?>> orgQryCursor = reducer().context().query()
.querySqlFields(new SqlFieldsQuery("SELECT * FROM " + orgTable), false)
) {
Iterator<List<?>> prsnQryIter = prsnQryCursor.iterator();
Iterator<List<?>> orgQryIter = orgQryCursor.iterator();
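// Interleave iteration over both cursors and verify that the user thread span remains NoopSpan at every step.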
while (prsnQryIter.hasNext() && orgQryIter.hasNext()) {
assertEquals(NoopSpan.INSTANCE, MTC.span());
prsnQryIter.next();
assertEquals(NoopSpan.INSTANCE, MTC.span());
orgQryIter.next();
assertEquals(NoopSpan.INSTANCE, MTC.span());
}
}
assertEquals(NoopSpan.INSTANCE, MTC.span());
handler().flush();
checkDroppedSpans();
List<SpanId> rootSpans = findRootSpans(SQL_QRY);
assertEquals(2, rootSpans.size());
for (SpanId rootSpan : rootSpans)
checkBasicSelectQuerySpanTree(rootSpan, TEST_TABLE_POPULATION);
}
/**
* Checks presence of the basic spans that are related to a SELECT SQL query and are children of the specified span.
*
* @param rootSpan Span whose children will be checked.
* @param expRows Expected number of rows returned by the SELECT query.
*/
protected void checkBasicSelectQuerySpanTree(SpanId rootSpan, int expRows) {
int fetchedRows = 0;
SpanId iterSpan = checkChildSpan(SQL_ITER_OPEN, rootSpan);
SpanId fetchSpan = checkChildSpan(SQL_PAGE_FETCH, iterSpan);
fetchedRows += parseInt(getAttribute(fetchSpan, SQL_PAGE_ROWS));
List<SpanId> pageFetchSpans = findChildSpans(SQL_PAGE_FETCH, rootSpan);
for (SpanId span : pageFetchSpans)
fetchedRows += parseInt(getAttribute(span, SQL_PAGE_ROWS));
assertEquals(expRows, fetchedRows);
assertFalse(findChildSpans(SQL_CURSOR_CLOSE, rootSpan).isEmpty());
}
/**
* Executes a DML query and checks the corresponding span tree.
*
* @param qry SQL query to execute.
* @param fetchRequired Whether the query needs to fetch data before updating the cache.
* @param expCacheUpdates Expected number of cache updates.
*/
private void checkDmlQuerySpans(String qry, boolean fetchRequired, int expCacheUpdates) throws Exception {
SpanId rootSpan = executeAndCheckRootSpan(qry, TEST_SCHEMA, false, false, false);
checkChildSpan(SQL_QRY_PARSE, rootSpan);
SpanId dmlExecSpan = checkChildSpan(SQL_DML_QRY_EXECUTE, rootSpan);
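// Queries that fetch rows before updating them (e.g. UPDATE, DELETE) are expected to produce page fetch spans
// covering all updated rows.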
if (fetchRequired) {
checkChildSpan(SQL_ITER_OPEN, dmlExecSpan);
int fetchedRows = findChildSpans(SQL_PAGE_FETCH, null).stream()
.mapToInt(span -> parseInt(getAttribute(span, SQL_PAGE_ROWS)))
.sum();
assertEquals(expCacheUpdates, fetchedRows);
}
int cacheUpdates = findChildSpans(SQL_CACHE_UPDATE, dmlExecSpan).stream()
.mapToInt(span -> parseInt(getAttribute(span, SQL_CACHE_UPDATES)))
.sum();
assertEquals(expCacheUpdates, cacheUpdates);
}
/**
* Checks that the parent span has exactly one child span of the specified type.
*
* @param type Span type.
* @param parentSpan Parent span id.
* @return Id of the child span.
*/
protected SpanId checkChildSpan(SpanType type, SpanId parentSpan) {
return checkSpan(type, parentSpan, 1, null).get(0);
}
/**
* Finds child spans with specified type and parent span.
*
* @param type Span type.
* @param parentSpanId Parent span id.
* @return Ids of the found spans.
*/
protected List<SpanId> findChildSpans(SpanType type, SpanId parentSpanId) {
return handler().allSpans()
.filter(span -> parentSpanId != null ?
parentSpanId.equals(span.getParentSpanId()) && type.spanName().equals(span.getName()) :
type.spanName().equals(span.getName()))
.map(span -> span.getContext().getSpanId())
.collect(Collectors.toList());
}
/**
* Finds root spans with specified type.
*
* @param type Span type.
* @return Ids of the found spans.
*/
protected List<SpanId> findRootSpans(SpanType type) {
return handler().allSpans()
.filter(span -> span.getParentSpanId() == null && type.spanName().equals(span.getName()))
.map(span -> span.getContext().getSpanId())
.collect(Collectors.toList());
}
/**
* Obtains the string representation of an attribute from the span with the specified id.
*
* @param spanId Id of the target span.
* @param tag Tag of the attribute.
* @return Value of the attribute.
*/
protected String getAttribute(SpanId spanId, String tag) {
return attributeValueToString(handler()
.spanById(spanId)
.getAttributes()
.getAttributeMap()
.get(tag));
}
/**
* Executes a query with the specified parameters.
*
* @param sql SQL query to execute.
* @param schema SQL query schema.
* @param skipReducerOnUpdate Whether the reduce phase should be skipped during update query execution.
* @param distributedJoins Whether distributed joins are enabled.
* @param isQry {@code True} if the query is a SELECT, {@code false} for DML queries, {@code null} for all remaining query types.
*/
protected void executeQuery(
String sql,
String schema,
boolean skipReducerOnUpdate,
boolean distributedJoins,
Boolean isQry
) {
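// SqlFieldsQueryEx is used instead of the public SqlFieldsQuery because it allows the query type flag and
// the skip-reducer-on-update hint to be set explicitly.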
SqlFieldsQuery qry = new SqlFieldsQueryEx(sql, isQry)
.setSkipReducerOnUpdate(skipReducerOnUpdate)
.setDistributedJoins(distributedJoins)
.setPageSize(PAGE_SIZE)
.setSchema(schema);
reducer().context().query().querySqlFields(qry, false).getAll();
}
/**
* Executes the specified query and checks the root span of the query execution.
*
* @param sql SQL query to execute.
* @param schema SQL query schema.
* @param skipReducerOnUpdate Whether the reduce phase should be skipped during update query execution.
* @param distributedJoins Whether distributed joins are enabled.
* @param isQry {@code True} if the query is a SELECT, {@code false} for DML queries, {@code null} for all remaining query types.
* @return Id of the root span.
*/
protected SpanId executeAndCheckRootSpan(
String sql,
String schema,
boolean skipReducerOnUpdate,
boolean distributedJoins,
Boolean isQry
) throws Exception {
executeQuery(sql, schema, skipReducerOnUpdate, distributedJoins, isQry);
handler().flush();
checkDroppedSpans();
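// The root SQL query span is expected on the query initiator node and must carry the node, query text and schema tags.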
return checkSpan(
SQL_QRY,
null,
1,
ImmutableMap.<String, String>builder()
.put(NODE_ID, reducer().localNode().id().toString())
.put(tag(NODE, CONSISTENT_ID), reducer().localNode().consistentId().toString())
.put(tag(NODE, NAME), reducer().name())
.put(SQL_QRY_TEXT, sql)
.put(SQL_SCHEMA, schema)
.build()
).get(0);
}
/**
* @param cls Value class of the created table.
* @param mode Cache mode.
* @param qryParallelism Query parallelism.
* @return Name of the table which was created.
*/
protected String createTableAndPopulate(Class<?> cls, CacheMode mode, int qryParallelism) {
IgniteCache<Integer, Object> cache = grid(0).createCache(
new CacheConfiguration<Integer, Object>(cls.getSimpleName() + "_" + mode)
.setIndexedTypes(Integer.class, cls)
.setCacheMode(mode)
.setQueryParallelism(qryParallelism)
.setSqlSchema(TEST_SCHEMA)
);
for (int i = 0; i < TEST_TABLE_POPULATION; i++)
cache.put(keyCntr.getAndIncrement(), cls == Organization.class ? new Organization(i, i) : new Person(i, i));
return TEST_SCHEMA + '.' + cls.getSimpleName();
}
/**
* Checks that no spans were dropped by OpenCensus due to exporter buffer overflow.
*/
protected void checkDroppedSpans() {
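// Read the dropped spans counter reflectively from the internal worker of the OpenCensus span exporter.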
Object worker = U.field(Tracing.getExportComponent().getSpanExporter(), "worker");
long droppedSpans = U.field(worker, "droppedSpans");
assertEquals("Some spans were dropped by OpencenCensus due to exporter buffer overflow.",
0, droppedSpans);
}
/**
* @return Reducer node.
*/
protected IgniteEx reducer() {
return ignite(GRID_CNT);
}
/**
* @return Number of nodes in the cluster, excluding the reducer.
*/
protected int mapNodesCount() {
return GRID_CNT;
}
/** */
public static class Person {
/** */
@QuerySqlField(index = true)
private int prsnId;
/** */
@QuerySqlField
private int prsnVal;
/** */
public Person(int prsnId, int prsnVal) {
this.prsnId = prsnId;
this.prsnVal = prsnVal;
}
}
/** */
public static class Organization {
/** */
@QuerySqlField(index = true)
private int orgId;
/** */
@QuerySqlField
private int orgVal;
/** */
public Organization(int orgId, int orgVal) {
this.orgId = orgId;
this.orgVal = orgVal;
}
}
}