blob: 7fea652c967d7607199d141d754bd92607be8902 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.monitoring.opencensus;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Status;
import io.opencensus.trace.export.SpanData;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.Scope;
import org.apache.ignite.internal.processors.tracing.SpanTags;
import org.apache.ignite.internal.processors.tracing.TracingSpi;
import org.apache.ignite.internal.processors.tracing.configuration.TracingConfigurationCoordinates;
import org.apache.ignite.internal.processors.tracing.configuration.TracingConfigurationParameters;
import org.apache.ignite.spi.tracing.opencensus.OpenCensusTracingSpi;
import org.junit.Assert;
import org.junit.Test;
import static io.opencensus.trace.AttributeValue.stringAttributeValue;
import static org.apache.ignite.internal.processors.tracing.SpanType.COMMUNICATION_JOB_EXECUTE_REQUEST;
import static org.apache.ignite.internal.processors.tracing.SpanType.COMMUNICATION_JOB_EXECUTE_RESPONSE;
import static org.apache.ignite.internal.processors.tracing.SpanType.COMMUNICATION_REGULAR_PROCESS;
import static org.apache.ignite.internal.processors.tracing.SpanType.COMMUNICATION_SOCKET_READ;
import static org.apache.ignite.internal.processors.tracing.SpanType.COMMUNICATION_SOCKET_WRITE;
import static org.apache.ignite.internal.processors.tracing.SpanType.CUSTOM_JOB_CALL;
import static org.apache.ignite.internal.processors.tracing.SpanType.DISCOVERY_CUSTOM_EVENT;
import static org.apache.ignite.internal.processors.tracing.SpanType.DISCOVERY_NODE_JOIN_ADD;
import static org.apache.ignite.internal.processors.tracing.SpanType.DISCOVERY_NODE_JOIN_FINISH;
import static org.apache.ignite.internal.processors.tracing.SpanType.DISCOVERY_NODE_JOIN_REQUEST;
import static org.apache.ignite.internal.processors.tracing.SpanType.DISCOVERY_NODE_LEFT;
import static org.apache.ignite.internal.processors.tracing.SpanType.EXCHANGE_FUTURE;
import static org.apache.ignite.internal.processors.tracing.configuration.TracingConfigurationParameters.SAMPLING_RATE_ALWAYS;
/**
 * Tests to check correctness of OpenCensus Tracing SPI implementation.
 * <p>
 * Every test inspects the spans collected by the handler provided by the base test class and verifies
 * span names, statuses, attributes and parent-child relations.
 */
public class OpenCensusTracingSpiTest extends AbstractTracingTest {
    /** {@inheritDoc} */
    @Override protected TracingSpi getTracingSpi() {
        return new OpenCensusTracingSpi();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Additionally sets the sampling rate to {@code SAMPLING_RATE_ALWAYS} for the {@link Scope#DISCOVERY}
     * and {@link Scope#COMMUNICATION} scopes through the tracing configuration of {@code grid(0)}.
     */
    @Override public void before() throws Exception {
        super.before();

        grid(0).tracingConfiguration().set(
            new TracingConfigurationCoordinates.Builder(Scope.DISCOVERY).build(),
            new TracingConfigurationParameters.Builder().
                withSamplingRate(SAMPLING_RATE_ALWAYS).build());

        grid(0).tracingConfiguration().set(
            new TracingConfigurationCoordinates.Builder(Scope.COMMUNICATION).build(),
            new TracingConfigurationParameters.Builder().
                withSamplingRate(SAMPLING_RATE_ALWAYS).build());
    }

    /**
     * Test checks that node join process is traced correctly in positive case.
     * <p>
     * Verifies the span chain NODE_JOIN_REQUEST -> NODE_JOIN_ADD -> NODE_JOIN_FINISH for the joined node.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testNodeJoinTracing() throws Exception {
        IgniteEx joinedNode = startGrid(GRID_CNT);

        awaitPartitionMapExchange();

        // Consistent id is the same with node name.
        List<String> clusterNodeNames = grid(0).cluster().nodes()
            .stream().map(node -> (String)node.consistentId()).collect(Collectors.toList());

        handler().flush();

        String joinedNodeId = joinedNode.localNode().id().toString();

        AttributeValue joinedNodeIdAttr = stringAttributeValue(joinedNodeId);

        // Collect NODE_JOIN_REQUEST spans with OK status related to the joined node, keyed by reporting node name.
        Map<AttributeValue, SpanData> nodeJoinReqSpans = handler().allSpans()
            .filter(span -> DISCOVERY_NODE_JOIN_REQUEST.spanName().equals(span.getName()))
            .filter(span -> span.getStatus() == Status.OK)
            .filter(span -> joinedNodeIdAttr.equals(eventNodeId(span)))
            .collect(Collectors.toMap(
                OpenCensusTracingSpiTest::reportingNodeName,
                span -> span
            ));

        // NODE_JOIN_REQUEST must be processed at least on coordinator node.
        // For other nodes there is no such guarantee.
        final int crdIdx = 0;

        clusterNodeNames.stream()
            .filter(node -> node.endsWith(String.valueOf(crdIdx)))
            .forEach(nodeName ->
                Assert.assertTrue(
                    String.format(
                        "%s not found on node with name=%s, nodeJoinReqSpans=%s",
                        DISCOVERY_NODE_JOIN_REQUEST, nodeName, nodeJoinReqSpans),
                    nodeJoinReqSpans.containsKey(stringAttributeValue(nodeName)))
            );

        // Check existence of NODE_JOIN_ADD spans with OK status on every node (the joined one included):
        for (int i = 0; i <= GRID_CNT; i++) {
            List<SpanData> nodeJoinAddSpans = handler().spansReportedByNode(getTestIgniteInstanceName(i))
                .filter(span -> DISCOVERY_NODE_JOIN_ADD.spanName().equals(span.getName()))
                .filter(span -> span.getStatus() == Status.OK)
                .filter(span -> joinedNodeIdAttr.equals(eventNodeId(span)))
                .collect(Collectors.toList());

            // The spans reported by this particular node are checked, not the cluster-wide request spans.
            Assert.assertTrue(
                String.format("%s span not found, nodeId=%d",
                    DISCOVERY_NODE_JOIN_ADD, i),
                !nodeJoinAddSpans.isEmpty()
            );

            for (SpanData spanData : nodeJoinAddSpans) {
                SpanData parentSpan = handler().spanById(spanData.getParentSpanId());

                Assert.assertNotNull(
                    "Parent span doesn't exist for " + spanData,
                    parentSpan
                );

                Assert.assertEquals(
                    "Parent span name is invalid, parentSpan=" + parentSpan,
                    DISCOVERY_NODE_JOIN_REQUEST.spanName(),
                    parentSpan.getName()
                );

                Assert.assertEquals(
                    "Parent span is not related to joined node, parentSpan=" + parentSpan,
                    joinedNodeIdAttr,
                    eventNodeId(parentSpan)
                );
            }
        }

        // Check existence of NODE_JOIN_FINISH spans with OK status on every node (the joined one included):
        for (int i = 0; i <= GRID_CNT; i++) {
            List<SpanData> nodeJoinFinishSpans = handler().spansReportedByNode(getTestIgniteInstanceName(i))
                .filter(span -> DISCOVERY_NODE_JOIN_FINISH.spanName().equals(span.getName()))
                .filter(span -> span.getStatus() == Status.OK)
                .filter(span -> joinedNodeIdAttr.equals(eventNodeId(span)))
                .collect(Collectors.toList());

            // The spans reported by this particular node are checked, not the cluster-wide request spans.
            Assert.assertTrue(
                String.format("%s span not found, nodeId=%d",
                    DISCOVERY_NODE_JOIN_FINISH, i),
                !nodeJoinFinishSpans.isEmpty()
            );

            for (SpanData spanData : nodeJoinFinishSpans) {
                SpanData parentSpan = handler().spanById(spanData.getParentSpanId());

                Assert.assertNotNull(
                    "Parent span doesn't exist for " + spanData,
                    parentSpan
                );

                Assert.assertEquals(
                    "Parent span name is invalid " + parentSpan,
                    DISCOVERY_NODE_JOIN_ADD.spanName(),
                    parentSpan.getName()
                );

                Assert.assertEquals(
                    "Parent span is not related to joined node " + parentSpan,
                    joinedNodeIdAttr,
                    eventNodeId(parentSpan)
                );
            }
        }
    }

    /**
     * Test checks that node left process is traced correctly in positive case.
     * <p>
     * Expects exactly one NODE_LEFT span with OK status per server node name captured before the node is stopped.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testNodeLeftTracing() throws Exception {
        // Consistent id is the same with node name.
        List<String> clusterNodeNames = grid(0).cluster().forServers().nodes()
            .stream().map(node -> (String)node.consistentId()).collect(Collectors.toList());

        String leftNodeId = grid(GRID_CNT - 1).localNode().id().toString();

        stopGrid(GRID_CNT - 1);

        awaitPartitionMapExchange();

        handler().flush();

        AttributeValue leftNodeIdAttr = stringAttributeValue(leftNodeId);

        // Collect DISCOVERY_NODE_LEFT spans with OK status keyed by reporting node name; duplicates are an error.
        Map<AttributeValue, SpanData> nodeLeftSpans = handler().allSpans()
            .filter(span -> DISCOVERY_NODE_LEFT.spanName().equals(span.getName()))
            .filter(span -> leftNodeIdAttr.equals(eventNodeId(span)))
            .filter(span -> span.getStatus() == Status.OK)
            .collect(Collectors.toMap(
                OpenCensusTracingSpiTest::reportingNodeName,
                span -> span,
                (span1, span2) -> {
                    throw new AssertionError(String.format(
                        "More than 1 %s span handled on a node (id can be extracted from span), " +
                            "existingSpan=%s, extraSpan=%s",
                        DISCOVERY_NODE_LEFT, span1, span2
                    ));
                }
            ));

        clusterNodeNames.forEach(nodeName ->
            Assert.assertTrue(
                "Span " + DISCOVERY_NODE_LEFT + " doesn't exist on node with name=" + nodeName,
                nodeLeftSpans.containsKey(stringAttributeValue(nodeName)))
        );
    }

    /**
     * Test checks that PME process in case of node left discovery event is traced correctly in positive case.
     * <p>
     * For every remaining node exactly one exchange future span is expected; its parent must be the NODE_LEFT
     * span of the stopped node and its resulting topology version must be {@code (previous major + 1, 0)}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPartitionsMapExchangeTracing() throws Exception {
        long curTopVer = grid(0).cluster().topologyVersion();

        String leftNodeId = grid(GRID_CNT - 1).localNode().id().toString();

        stopGrid(GRID_CNT - 1);

        awaitPartitionMapExchange();

        handler().flush();

        AttributeValue leftNodeIdAttr = stringAttributeValue(leftNodeId);

        // Check PME for NODE_LEFT event on remaining nodes:
        for (int i = 0; i < GRID_CNT - 1; i++) {
            List<SpanData> exchFutSpans = handler().spansReportedByNode(getTestIgniteInstanceName(i))
                .filter(span -> EXCHANGE_FUTURE.spanName().equals(span.getName()))
                .filter(span -> span.getStatus() == Status.OK)
                .filter(span -> AttributeValue.longAttributeValue(EventType.EVT_NODE_LEFT).equals(
                    span.getAttributes().getAttributeMap().get(SpanTags.tag(SpanTags.EVENT, SpanTags.TYPE))))
                .filter(span -> leftNodeIdAttr.equals(eventNodeId(span)))
                .collect(Collectors.toList());

            Assert.assertEquals(
                String.format("%s span not found (or more than 1), nodeId=%d, exchFutSpans=%s",
                    EXCHANGE_FUTURE, i, exchFutSpans),
                1,
                exchFutSpans.size()
            );

            for (SpanData span : exchFutSpans) {
                SpanData parentSpan = handler().spanById(span.getParentSpanId());

                Assert.assertNotNull(
                    "Parent span doesn't exist for " + span,
                    parentSpan
                );

                Assert.assertEquals(
                    "Parent span name is invalid " + parentSpan,
                    DISCOVERY_NODE_LEFT.spanName(),
                    parentSpan.getName()
                );

                Assert.assertEquals(
                    "Parent span is not related to left node " + parentSpan,
                    leftNodeIdAttr,
                    eventNodeId(parentSpan)
                );

                Assert.assertEquals(
                    "Exchange future major topology version is invalid " + span,
                    AttributeValue.longAttributeValue(curTopVer + 1),
                    span.getAttributes().getAttributeMap().get(
                        SpanTags.tag(SpanTags.RESULT, SpanTags.TOPOLOGY_VERSION, SpanTags.MAJOR))
                );

                Assert.assertEquals(
                    "Exchange future minor version is invalid " + span,
                    AttributeValue.longAttributeValue(0),
                    span.getAttributes().getAttributeMap().get(
                        SpanTags.tag(SpanTags.RESULT, SpanTags.TOPOLOGY_VERSION, SpanTags.MINOR))
                );
            }
        }
    }

    /**
     * Test checks that a compute job executed on a remote node inside a custom root span produces the expected
     * set of communication spans: socket write, socket read and regular process, each for request and response.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCommunicationMessages() throws Exception {
        IgniteEx ignite = grid(0);
        IgniteEx ignite1 = grid(1);

        // Run the job on the second node while the custom root span is active.
        try (MTC.TraceSurroundings ignore = MTC.support(ignite.context().tracing().create(CUSTOM_JOB_CALL))) {
            ignite.compute(ignite.cluster().forNode(ignite1.localNode())).withNoFailover().call(() -> "");
        }

        handler().flush();

        SpanData jobSpan = handler().spanByName(CUSTOM_JOB_CALL.spanName());

        List<SpanData> data = handler().unrollByParent(jobSpan);

        List<AttributeValue> nodejobMsgTags = data.stream()
            .filter(it -> it.getAttributes().getAttributeMap().containsKey(SpanTags.MESSAGE))
            .map(it -> it.getAttributes().getAttributeMap().get(SpanTags.MESSAGE))
            .collect(Collectors.toList());

        List<String> nodejobTraces = data.stream()
            .map(SpanData::getName)
            .collect(Collectors.toList());

        // 1 root span + 2 socket writes + 2 socket reads + 2 regular processes.
        assertEquals(nodejobTraces.toString(), 7, nodejobTraces.size());

        assertEquals(1, nodejobTraces.stream().filter(it -> it.contains(CUSTOM_JOB_CALL.spanName())).count());

        //request + response
        assertEquals(2, nodejobTraces.stream().filter(it -> it.contains(COMMUNICATION_SOCKET_WRITE.spanName())).count());
        //request + response
        assertEquals(2, nodejobTraces.stream().filter(it -> it.contains(COMMUNICATION_SOCKET_READ.spanName())).count());
        //request + response
        assertEquals(2, nodejobTraces.stream().filter(it -> it.contains(COMMUNICATION_REGULAR_PROCESS.spanName())).count());

        assertTrue(nodejobMsgTags.stream().anyMatch(it -> it.equals(stringAttributeValue(COMMUNICATION_JOB_EXECUTE_REQUEST.spanName()))));
        assertTrue(nodejobMsgTags.stream().anyMatch(it -> it.equals(stringAttributeValue(COMMUNICATION_JOB_EXECUTE_RESPONSE.spanName()))));
    }

    /**
     * Test checks that {@link IgniteFeatures#TRACING} is reported as supported by the set of all features.
     */
    @Test
    public void testTracingFeatureAvailable() {
        assertTrue(IgniteFeatures.nodeSupports(IgniteFeatures.allFeatures(), IgniteFeatures.TRACING));
    }

    /**
     * Ensure that root discovery.custom.event have message.class with corresponding value.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testCustomEventContainsMessageClassTag() throws Exception {
        IgniteEx ignite = grid(0);

        ignite.createCache("New cache");

        handler().flush();

        // Only root discovery.custom.event spans have message.class tag.
        List<SpanData> rootCustomEventSpans = handler().allSpans()
            .filter(spanData -> DISCOVERY_CUSTOM_EVENT.spanName().equals(spanData.getName()))
            .filter(spanData -> spanData.getParentSpanId() == null)
            .collect(Collectors.toList());

        // Check that there's at least one discovery.custom.event span with tag "message.class"
        // and value "DynamicCacheChangeBatch"
        assertTrue(rootCustomEventSpans.stream().anyMatch(
            span -> "DynamicCacheChangeBatch".equals(
                attributeValueToString(span.getAttributes().getAttributeMap().get(SpanTags.MESSAGE_CLASS)))));
    }

    /**
     * Reads the attribute stored under the tag built from {@code SpanTags.EVENT_NODE} and {@code SpanTags.ID}.
     *
     * @param span Span to read the attribute from.
     * @return Attribute value or {@code null} if the span has no such attribute.
     */
    private static AttributeValue eventNodeId(SpanData span) {
        return span.getAttributes().getAttributeMap().get(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID));
    }

    /**
     * Reads the attribute stored under the tag built from {@code SpanTags.NODE} and {@code SpanTags.NAME}.
     *
     * @param span Span to read the attribute from.
     * @return Attribute value or {@code null} if the span has no such attribute.
     */
    private static AttributeValue reportingNodeName(SpanData span) {
        return span.getAttributes().getAttributeMap().get(SpanTags.tag(SpanTags.NODE, SpanTags.NAME));
    }
}