blob: f1e20f581a129353de06e8e12d3bbdd4730d8c89 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.tracing.opentracing;
import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.ANNOTATION_KEY;
import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.COMPONENT;
import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.DELIVERY_SETTLED;
import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.ERROR_EVENT;
import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.MESSAGE_EXPIRED;
import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.ONMESSAGE_SPAN_NAME;
import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.RECEIVE_SPAN_NAME;
import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.REDELIVERIES_EXCEEDED;
import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.SEND_SPAN_NAME;
import static org.apache.qpid.jms.tracing.opentracing.OpenTracingTracer.STATE;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.jms.tracing.JmsTracer;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.log.Fields;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockSpan.LogEntry;
import io.opentracing.mock.MockSpan.MockContext;
import io.opentracing.mock.MockTracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapAdapter;
import io.opentracing.tag.Tags;
public class OpenTracingIntegrationTest extends QpidJmsTestCase {
/**
 * Mockito captor for {@code Map<String, String>} arguments, populated by
 * {@code MockitoAnnotations.initMocks(this)} in {@link #setUp()}.
 * NOTE(review): not referenced by the tests visible in this part of the file; confirm it is still used.
 */
@Captor
private ArgumentCaptor<Map<String, String>> annotationMapCaptor;
/**
 * Runs before each test and lets Mockito initialise the annotated fields of this
 * test instance (for example the {@code @Captor} field).
 */
@Before
public void setUp() {
// Processes the Mockito annotations declared on this instance.
MockitoAnnotations.initMocks(this);
}
/**
 * Sends a single text message over a connection that has a tracer installed and checks
 * the one finished send span: it has no parent, carries the producer/destination/component
 * tags, has exactly one delivery-settled log entry, and the span context found in the
 * message annotation received by the peer has the same span id.
 */
@Test(timeout = 20000)
public void testSend() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String destinationName = "myQueue";
        final String payload = "myTracedMessageContent";
        final Symbol traceAnnotationKey = Symbol.valueOf(ANNOTATION_KEY);

        MockTracer mockTracer = new MockTracer();
        JmsConnectionFactory connectionFactory = new JmsConnectionFactory(createPeerURI(peer));
        connectionFactory.setTracer(OpenTracingTracerFactory.create(mockTracer));

        peer.expectSaslAnonymous();
        peer.expectOpen();
        peer.expectBegin();
        Connection connection = connectionFactory.createConnection();
        connection.start();

        peer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = session.createQueue(destinationName);

        peer.expectSenderAttach();
        MessageProducer producer = session.createProducer(destination);

        // The incoming transfer must contain the tracing annotation; its value is examined further down.
        MessageAnnotationsSectionMatcher annotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        annotationsMatcher.withEntry(traceAnnotationKey, Matchers.any(Map.class));

        TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        payloadMatcher.setMessageAnnotationsMatcher(annotationsMatcher);
        payloadMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        payloadMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(payload));
        peer.expectTransfer(payloadMatcher);

        producer.send(session.createTextMessage(payload));
        peer.waitForAllHandlersToComplete(2000);

        // Exactly one span should have been finished by the send
        List<MockSpan> finished = mockTracer.finishedSpans();
        assertEquals("Expected 1 finished span: " + finished, 1, finished.size());
        MockSpan span = finished.get(0);
        assertEquals("Unexpected span class", MockSpan.class, span.getClass());
        assertEquals("Expected span to have no parent", 0, span.parentId());
        assertEquals("Unexpected span operation name", SEND_SPAN_NAME, span.operationName());

        // Tags recorded on the span
        Map<String, Object> tags = span.tags();
        assertFalse("Expected some tags", tags.isEmpty());
        assertFalse("Expected error tag not to be set", tags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_PRODUCER, tags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(destinationName, tags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, tags.get(Tags.COMPONENT.getKey()));

        // Log entries recorded on the span
        List<LogEntry> logs = span.logEntries();
        assertEquals("Expected 1 log entry: " + logs, 1, logs.size());
        Map<String, ?> logFields = logs.get(0).fields();
        assertFalse("Expected some log entry fields", logFields.isEmpty());
        assertNotNull("Expected a state description", logFields.get(STATE));
        assertEquals(DELIVERY_SETTLED, logFields.get(Fields.EVENT));

        // The context that travelled in the message annotation must identify the same span
        Object annotationValue = annotationsMatcher.getReceivedAnnotation(traceAnnotationKey);
        assertTrue("annotation was not a map", annotationValue instanceof Map);
        @SuppressWarnings("unchecked")
        Map<String, String> carrier = (Map<String, String>) annotationValue;
        assertFalse("Expected some content in map", carrier.isEmpty());

        SpanContext extracted = mockTracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(carrier));
        assertEquals("Unexpected context class", MockContext.class, extracted.getClass());
        assertEquals("Extracted context spanId did not match original", span.context().spanId(), ((MockContext) extracted).spanId());

        peer.expectClose();
        connection.close();
        peer.waitForAllHandlersToComplete(2000);
    }
}
/**
 * Sends a single text message with the {@code jms.presettlePolicy.presettleProducers=true}
 * option, expecting a settled sender link and a settled transfer at the peer. It then waits for
 * the send span to finish and checks its tags, its single delivery-settled log entry, and that
 * the span context received in the message annotation has the same span id.
 */
@Test(timeout = 20000)
public void testSendPreSettled() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String destinationName = "myQueue";
        final String payload = "myTracedMessageContent";
        final Symbol traceAnnotationKey = Symbol.valueOf(ANNOTATION_KEY);

        MockTracer mockTracer = new MockTracer();
        JmsConnectionFactory connectionFactory =
                new JmsConnectionFactory(createPeerURI(peer, "jms.presettlePolicy.presettleProducers=true"));
        connectionFactory.setTracer(OpenTracingTracerFactory.create(mockTracer));

        peer.expectSaslAnonymous();
        peer.expectOpen();
        peer.expectBegin();
        Connection connection = connectionFactory.createConnection();
        connection.start();

        peer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = session.createQueue(destinationName);

        peer.expectSettledSenderAttach();
        MessageProducer producer = session.createProducer(destination);

        // The incoming transfer must contain the tracing annotation; its value is examined further down.
        MessageAnnotationsSectionMatcher annotationsMatcher = new MessageAnnotationsSectionMatcher(true);
        annotationsMatcher.withEntry(traceAnnotationKey, Matchers.any(Map.class));

        TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
        payloadMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
        payloadMatcher.setMessageAnnotationsMatcher(annotationsMatcher);
        payloadMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
        payloadMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(payload));

        // The transfer is expected to arrive already settled
        peer.expectTransfer(payloadMatcher, Matchers.nullValue(), true, false, null, false);

        producer.send(session.createTextMessage(payload));

        // Let the peer finish handling the transfer (its details are read below), then wait for the span to finish.
        peer.waitForAllHandlersToComplete(2000);
        assertTrue("Did not get finished span after send",
                Wait.waitFor(() -> !mockTracer.finishedSpans().isEmpty(), 3000, 10));

        List<MockSpan> finished = mockTracer.finishedSpans();
        assertEquals("Expected 1 finished span: " + finished, 1, finished.size());
        MockSpan span = finished.get(0);
        assertEquals("Unexpected span class", MockSpan.class, span.getClass());
        assertEquals("Expected span to have no parent", 0, span.parentId());
        assertEquals("Unexpected span operation name", SEND_SPAN_NAME, span.operationName());

        // Tags recorded on the span
        Map<String, Object> tags = span.tags();
        assertFalse("Expected some tags", tags.isEmpty());
        assertFalse("Expected error tag not to be set", tags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_PRODUCER, tags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(destinationName, tags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, tags.get(Tags.COMPONENT.getKey()));

        // Log entries recorded on the span
        List<LogEntry> logs = span.logEntries();
        assertEquals("Expected 1 log entry: " + logs, 1, logs.size());
        Map<String, ?> logFields = logs.get(0).fields();
        assertFalse("Expected some log entry fields", logFields.isEmpty());
        assertNotNull("Expected a state description", logFields.get(STATE));
        assertEquals(DELIVERY_SETTLED, logFields.get(Fields.EVENT));

        // The context that travelled in the message annotation must identify the same span
        Object annotationValue = annotationsMatcher.getReceivedAnnotation(traceAnnotationKey);
        assertTrue("annotation was not a map", annotationValue instanceof Map);
        @SuppressWarnings("unchecked")
        Map<String, String> carrier = (Map<String, String>) annotationValue;
        assertFalse("Expected some content in map", carrier.isEmpty());

        SpanContext extracted = mockTracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(carrier));
        assertEquals("Unexpected context class", MockContext.class, extracted.getClass());
        assertEquals("Extracted context spanId did not match original", span.context().spanId(), ((MockContext) extracted).spanId());

        peer.expectClose();
        connection.close();
        peer.waitForAllHandlersToComplete(2000);
    }
}
/**
 * Receives one message whose annotations carry the injected context of a send span, and
 * checks the single finished receive span: it is a child of that send span, has the
 * consumer/destination/component tags, and has no log entries. Also checks that no span is
 * left active on the tracer after the receive call.
 */
@Test(timeout = 20000)
public void testReceive() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String destinationName = "myQueue";

        MockTracer mockTracer = new MockTracer();
        JmsConnectionFactory connectionFactory = new JmsConnectionFactory(createPeerURI(peer));
        connectionFactory.setTracer(OpenTracingTracerFactory.create(mockTracer));

        peer.expectSaslAnonymous();
        peer.expectOpen();
        peer.expectBegin();
        Connection connection = connectionFactory.createConnection();
        connection.start();

        peer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = session.createQueue(destinationName);

        // Build the incoming message: its annotations hold the injected context of an upstream send span
        Map<String, String> carrier = new HashMap<>();
        MockSpan upstreamSpan = mockTracer.buildSpan(SEND_SPAN_NAME).start();
        mockTracer.inject(upstreamSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(carrier));
        assertFalse("Expected inject to add values", carrier.isEmpty());

        MessageAnnotationsDescribedType annotations = new MessageAnnotationsDescribedType();
        annotations.setSymbolKeyedAnnotation(ANNOTATION_KEY, carrier);
        DescribedType body = new AmqpValueDescribedType("myContent");

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, annotations, null, null, body);
        peer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer consumer = session.createConsumer(destination);
        Message received = consumer.receive(2000);
        assertNotNull("Did not receive message as expected", received);
        assertNull("expected no active span", mockTracer.activeSpan());

        assertTrue("Did not get finished span after receive",
                Wait.waitFor(() -> !mockTracer.finishedSpans().isEmpty(), 3000, 10));

        List<MockSpan> finished = mockTracer.finishedSpans();
        assertEquals("Expected 1 finished span: " + finished, 1, finished.size());
        MockSpan receiveSpan = finished.get(0);
        assertEquals("Unexpected span class", MockSpan.class, receiveSpan.getClass());
        assertEquals("Expected span to be child of the send span", upstreamSpan.context().spanId(), receiveSpan.parentId());
        assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, receiveSpan.operationName());

        // Tags recorded on the span
        Map<String, Object> tags = receiveSpan.tags();
        assertFalse("Expected some tags", tags.isEmpty());
        assertFalse("Expected error tag not to be set", tags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, tags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(destinationName, tags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, tags.get(Tags.COMPONENT.getKey()));

        // A normally delivered message should leave no log entries on its span
        List<LogEntry> logs = receiveSpan.logEntries();
        assertTrue("Expected no log entry: " + logs, logs.isEmpty());

        peer.expectClose();
        connection.close();
        peer.waitForAllHandlersToComplete(2000);
    }
}
/**
 * Uses a {@link JMSContext} and {@code JMSConsumer.receiveBody} to consume one message whose
 * annotations carry the injected context of a send span. Checks the body returned and the
 * single finished receive span: child of the send span, consumer/destination/component tags,
 * and no log entries.
 */
@Test(timeout = 20000)
public void testReceiveBody() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String destinationName = "myQueue";
        final String payload = "myContent";

        MockTracer mockTracer = new MockTracer();
        JmsConnectionFactory connectionFactory = new JmsConnectionFactory(createPeerURI(peer));
        connectionFactory.setTracer(OpenTracingTracerFactory.create(mockTracer));

        peer.expectSaslAnonymous();
        peer.expectOpen();
        peer.expectBegin();
        JMSContext jmsContext = connectionFactory.createContext();
        jmsContext.start();

        peer.expectBegin();
        Queue destination = jmsContext.createQueue(destinationName);

        // Build the incoming message: its annotations hold the injected context of an upstream send span
        Map<String, String> carrier = new HashMap<>();
        MockSpan upstreamSpan = mockTracer.buildSpan(SEND_SPAN_NAME).start();
        mockTracer.inject(upstreamSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(carrier));
        assertFalse("Expected inject to add values", carrier.isEmpty());

        MessageAnnotationsDescribedType annotations = new MessageAnnotationsDescribedType();
        annotations.setSymbolKeyedAnnotation(ANNOTATION_KEY, carrier);
        DescribedType body = new AmqpValueDescribedType(payload);

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, annotations, null, null, body);
        peer.expectDispositionThatIsAcceptedAndSettled();

        JMSConsumer jmsConsumer = jmsContext.createConsumer(destination);
        String receivedBody = jmsConsumer.receiveBody(String.class, 2000);
        assertEquals("Did not receive message body as expected", payload, receivedBody);
        assertNull("expected no active span", mockTracer.activeSpan());

        assertTrue("Did not get finished span after receiveBody",
                Wait.waitFor(() -> !mockTracer.finishedSpans().isEmpty(), 3000, 10));

        List<MockSpan> finished = mockTracer.finishedSpans();
        assertEquals("Expected 1 finished span: " + finished, 1, finished.size());
        MockSpan receiveSpan = finished.get(0);
        assertEquals("Unexpected span class", MockSpan.class, receiveSpan.getClass());
        assertEquals("Expected span to be child of the send span", upstreamSpan.context().spanId(), receiveSpan.parentId());
        assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, receiveSpan.operationName());

        // Tags recorded on the span
        Map<String, Object> tags = receiveSpan.tags();
        assertFalse("Expected some tags", tags.isEmpty());
        assertFalse("Expected error tag not to be set", tags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, tags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(destinationName, tags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, tags.get(Tags.COMPONENT.getKey()));

        // A normally delivered message should leave no log entries on its span
        List<LogEntry> logs = receiveSpan.logEntries();
        assertTrue("Expected no log entry: " + logs, logs.isEmpty());

        peer.expectEnd();
        peer.expectClose();
        jmsContext.close();
        peer.waitForAllHandlersToComplete(2000);
    }
}
/**
 * Receives one message that carries no message annotations (and therefore no trace context)
 * and checks that a receive span is still finished for it: the span has no parent, has the
 * consumer/destination/component tags, and has no log entries.
 */
@Test(timeout = 20000)
public void testReceiveWithoutTraceInfo() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String destinationName = "myQueue";

        MockTracer mockTracer = new MockTracer();
        JmsConnectionFactory connectionFactory = new JmsConnectionFactory(createPeerURI(peer));
        connectionFactory.setTracer(OpenTracingTracerFactory.create(mockTracer));

        peer.expectSaslAnonymous();
        peer.expectOpen();
        peer.expectBegin();
        Connection connection = connectionFactory.createConnection();
        connection.start();

        peer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = session.createQueue(destinationName);

        // The incoming message has a body only; no annotations means no tracing details
        DescribedType body = new AmqpValueDescribedType("myContent");

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, null, null, null, body);
        peer.expectDispositionThatIsAcceptedAndSettled();

        MessageConsumer consumer = session.createConsumer(destination);
        Message received = consumer.receive(2000);
        assertNotNull("Did not receive message as expected", received);
        assertNull("expected no active span", mockTracer.activeSpan());

        assertTrue("Did not get finished span after receive",
                Wait.waitFor(() -> !mockTracer.finishedSpans().isEmpty(), 3000, 10));

        List<MockSpan> finished = mockTracer.finishedSpans();
        assertEquals("Expected 1 finished span: " + finished, 1, finished.size());
        MockSpan receiveSpan = finished.get(0);
        assertEquals("Unexpected span class", MockSpan.class, receiveSpan.getClass());
        assertEquals("Expected span to have no parent as incoming message had no context", 0, receiveSpan.parentId());
        assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, receiveSpan.operationName());

        // Tags recorded on the span
        Map<String, Object> tags = receiveSpan.tags();
        assertFalse("Expected some tags", tags.isEmpty());
        assertFalse("Expected error tag not to be set", tags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, tags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(destinationName, tags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, tags.get(Tags.COMPONENT.getKey()));

        // A normally delivered message should leave no log entries on its span
        List<LogEntry> logs = receiveSpan.logEntries();
        assertTrue("Expected no log entry: " + logs, logs.isEmpty());

        peer.expectClose();
        connection.close();
        peer.waitForAllHandlersToComplete(2000);
    }
}
/**
 * Has the peer deliver two traced messages: one whose absolute expiry time is already in the
 * past, then one with no expiry set. The test expects a modified (delivery-failed,
 * undeliverable-here) disposition for delivery 1 and an accepted one for delivery 2, and
 * expects {@code receive} to return the second message. Two receive spans must finish, in
 * order: the expired one (child of the first send span, one {@code MESSAGE_EXPIRED} log
 * entry) and the delivered one (child of the second send span, no log entries). Finishing the
 * two send spans at the end brings the total to four.
 */
@Test(timeout = 20000)
public void testReceiveWithExpiredMessage() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String destinationName = "myQueue";

        MockTracer mockTracer = new MockTracer();
        JmsConnectionFactory connectionFactory = new JmsConnectionFactory(createPeerURI(peer));
        connectionFactory.setTracer(OpenTracingTracerFactory.create(mockTracer));

        peer.expectSaslAnonymous();
        peer.expectOpen();
        peer.expectBegin();
        Connection connection = connectionFactory.createConnection();
        connection.start();

        peer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = session.createQueue(destinationName);

        // First message: has tracing details, but its expiry time has already passed
        Map<String, String> expiredCarrier = new HashMap<>();
        MockSpan expiredSendSpan = mockTracer.buildSpan(SEND_SPAN_NAME).start();
        mockTracer.inject(expiredSendSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(expiredCarrier));
        assertFalse("Expected inject to add values", expiredCarrier.isEmpty());

        MessageAnnotationsDescribedType expiredAnnotations = new MessageAnnotationsDescribedType();
        expiredAnnotations.setSymbolKeyedAnnotation(ANNOTATION_KEY, expiredCarrier);

        PropertiesDescribedType expiredProperties = new PropertiesDescribedType();
        expiredProperties.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));

        String expiredPayload = "already-expired";

        // Second message: has tracing details and no expiry set
        String livePayload = "still-active";

        Map<String, String> liveCarrier = new HashMap<>();
        MockSpan liveSendSpan = mockTracer.buildSpan(SEND_SPAN_NAME).start();
        mockTracer.inject(liveSendSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(liveCarrier));
        assertFalse("Expected inject to add values", liveCarrier.isEmpty());

        MessageAnnotationsDescribedType liveAnnotations = new MessageAnnotationsDescribedType();
        liveAnnotations.setSymbolKeyedAnnotation(ANNOTATION_KEY, liveCarrier);

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(null, expiredAnnotations, expiredProperties, null, new AmqpValueDescribedType(expiredPayload));
        peer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, liveAnnotations, null, null, new AmqpValueDescribedType(livePayload), 2);

        // Delivery 1 is expected to be modified as failed/undeliverable-here, delivery 2 accepted
        ModifiedMatcher failedUndeliverable = new ModifiedMatcher();
        failedUndeliverable.withDeliveryFailed(equalTo(true));
        failedUndeliverable.withUndeliverableHere(equalTo(true));
        peer.expectDisposition(true, failedUndeliverable, 1, 1);
        peer.expectDisposition(true, new AcceptedMatcher(), 2, 2);

        MessageConsumer consumer = session.createConsumer(destination);
        Message received = consumer.receive(3000);
        assertNotNull("Message should have been received", received);
        assertTrue(received instanceof TextMessage);
        assertEquals("Unexpected message content", livePayload, ((TextMessage) received).getText());
        assertNotEquals(expiredPayload, livePayload);
        assertNull("expected no active span", mockTracer.activeSpan());

        assertTrue("Did not get finished spans after receive",
                Wait.waitFor(() -> mockTracer.finishedSpans().size() == 2, 3000, 10));

        List<MockSpan> finished = mockTracer.finishedSpans();
        assertEquals("Expected 2 finished spans: " + finished, 2, finished.size());

        // --- Span for the expired message ---
        MockSpan expiredSpan = finished.get(0);
        assertEquals("Unexpected span class", MockSpan.class, expiredSpan.getClass());
        assertEquals("Expected expired message span to be child of the first send span", expiredSendSpan.context().spanId(), expiredSpan.parentId());
        assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, expiredSpan.operationName());

        Map<String, Object> expiredTags = expiredSpan.tags();
        assertFalse("Expected some tags", expiredTags.isEmpty());
        assertFalse("Expected error tag not to be set", expiredTags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, expiredTags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(destinationName, expiredTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, expiredTags.get(Tags.COMPONENT.getKey()));

        // It should carry a single log entry reporting the expiry
        List<LogEntry> expiredLogs = expiredSpan.logEntries();
        assertEquals("Expected 1 log entry: " + expiredLogs, 1, expiredLogs.size());
        Map<String, ?> expiredLogFields = expiredLogs.get(0).fields();
        assertFalse("Expected some log entry fields", expiredLogFields.isEmpty());
        assertEquals(MESSAGE_EXPIRED, expiredLogFields.get(Fields.EVENT));

        // --- Span for the delivered message ---
        MockSpan deliveredSpan = finished.get(1);
        assertEquals("Unexpected span class", MockSpan.class, deliveredSpan.getClass());
        assertEquals("Expected delivery span to be child of the second send span", liveSendSpan.context().spanId(), deliveredSpan.parentId());
        assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, deliveredSpan.operationName());

        Map<String, Object> deliveredTags = deliveredSpan.tags();
        assertFalse("Expected some tags", deliveredTags.isEmpty());
        assertFalse("Expected error tag not to be set", deliveredTags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, deliveredTags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(destinationName, deliveredTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, deliveredTags.get(Tags.COMPONENT.getKey()));

        // No log entries are expected for the delivered message
        List<LogEntry> deliveredLogs = deliveredSpan.logEntries();
        assertTrue("Expected no log entry: " + deliveredLogs, deliveredLogs.isEmpty());

        peer.expectClose();
        connection.close();
        peer.waitForAllHandlersToComplete(2000);

        // Finishing the two manually started send spans brings the finished total to four
        expiredSendSpan.finish();
        liveSendSpan.finish();
        List<MockSpan> allFinished = mockTracer.finishedSpans();
        assertEquals("Expected 4 finished spans: " + allFinished, 4, allFinished.size());
    }
}
/**
 * Connects with {@code jms.redeliveryPolicy.maxRedeliveries=1} and has the peer deliver two
 * traced messages: one whose header reports a delivery count of 2, then one with no header.
 * The test expects a modified (delivery-failed, undeliverable-here) disposition for delivery 1
 * and an accepted one for delivery 2, and expects {@code receive} to return the second message.
 * Two receive spans must finish, in order: the rejected one (child of the first send span, one
 * {@code REDELIVERIES_EXCEEDED} log entry) and the delivered one (child of the second send
 * span, no log entries). Finishing the two send spans at the end brings the total to four.
 */
@Test(timeout = 20000)
public void testReceiveWithRedeliveryPolicy() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String destinationName = "myQueue";

        MockTracer mockTracer = new MockTracer();
        JmsConnectionFactory connectionFactory =
                new JmsConnectionFactory(createPeerURI(peer, "jms.redeliveryPolicy.maxRedeliveries=1"));
        connectionFactory.setTracer(OpenTracingTracerFactory.create(mockTracer));

        peer.expectSaslAnonymous();
        peer.expectOpen();
        peer.expectBegin();
        Connection connection = connectionFactory.createConnection();
        connection.start();

        peer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue destination = session.createQueue(destinationName);

        // First message: has tracing details, but its delivery count already exceeds the redelivery policy
        Map<String, String> exceededCarrier = new HashMap<>();
        MockSpan exceededSendSpan = mockTracer.buildSpan(SEND_SPAN_NAME).start();
        mockTracer.inject(exceededSendSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(exceededCarrier));
        assertFalse("Expected inject to add values", exceededCarrier.isEmpty());

        MessageAnnotationsDescribedType exceededAnnotations = new MessageAnnotationsDescribedType();
        exceededAnnotations.setSymbolKeyedAnnotation(ANNOTATION_KEY, exceededCarrier);

        HeaderDescribedType exceededHeader = new HeaderDescribedType();
        exceededHeader.setDeliveryCount(UnsignedInteger.valueOf(2));

        String exceededPayload = "already-exceeded-redelivery-policy";

        // Second message: has tracing details and no delivery count set
        String livePayload = "still-active";

        Map<String, String> liveCarrier = new HashMap<>();
        MockSpan liveSendSpan = mockTracer.buildSpan(SEND_SPAN_NAME).start();
        mockTracer.inject(liveSendSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(liveCarrier));
        assertFalse("Expected inject to add values", liveCarrier.isEmpty());

        MessageAnnotationsDescribedType liveAnnotations = new MessageAnnotationsDescribedType();
        liveAnnotations.setSymbolKeyedAnnotation(ANNOTATION_KEY, liveCarrier);

        peer.expectReceiverAttach();
        peer.expectLinkFlowRespondWithTransfer(exceededHeader, exceededAnnotations, null, null, new AmqpValueDescribedType(exceededPayload));
        peer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, liveAnnotations, null, null, new AmqpValueDescribedType(livePayload), 2);

        // Delivery 1 is expected to be modified as failed/undeliverable-here, delivery 2 accepted
        ModifiedMatcher failedUndeliverable = new ModifiedMatcher();
        failedUndeliverable.withDeliveryFailed(equalTo(true));
        failedUndeliverable.withUndeliverableHere(equalTo(true));
        peer.expectDisposition(true, failedUndeliverable, 1, 1);
        peer.expectDisposition(true, new AcceptedMatcher(), 2, 2);

        MessageConsumer consumer = session.createConsumer(destination);
        Message received = consumer.receive(3000);
        assertNotNull("Message should have been received", received);
        assertTrue(received instanceof TextMessage);
        assertEquals("Unexpected message content", livePayload, ((TextMessage) received).getText());
        assertNotEquals(exceededPayload, livePayload);
        assertNull("expected no active span", mockTracer.activeSpan());

        assertTrue("Did not get finished spans after receive",
                Wait.waitFor(() -> mockTracer.finishedSpans().size() == 2, 3000, 10));

        List<MockSpan> finished = mockTracer.finishedSpans();
        assertEquals("Expected 2 finished spans: " + finished, 2, finished.size());

        // --- Span for the message that exceeded the redelivery policy ---
        MockSpan exceededSpan = finished.get(0);
        assertEquals("Unexpected span class", MockSpan.class, exceededSpan.getClass());
        assertEquals("Expected redelivered message span to be child of the first send span", exceededSendSpan.context().spanId(), exceededSpan.parentId());
        assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, exceededSpan.operationName());

        Map<String, Object> exceededTags = exceededSpan.tags();
        assertFalse("Expected some tags", exceededTags.isEmpty());
        assertFalse("Expected error tag not to be set", exceededTags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, exceededTags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(destinationName, exceededTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, exceededTags.get(Tags.COMPONENT.getKey()));

        // It should carry a single log entry reporting that redeliveries were exceeded
        List<LogEntry> exceededLogs = exceededSpan.logEntries();
        assertEquals("Expected 1 log entry: " + exceededLogs, 1, exceededLogs.size());
        Map<String, ?> exceededLogFields = exceededLogs.get(0).fields();
        assertFalse("Expected some log entry fields", exceededLogFields.isEmpty());
        assertEquals(REDELIVERIES_EXCEEDED, exceededLogFields.get(Fields.EVENT));

        // --- Span for the delivered message ---
        MockSpan deliveredSpan = finished.get(1);
        assertEquals("Unexpected span class", MockSpan.class, deliveredSpan.getClass());
        assertEquals("Expected delivery span to be child of the second send span", liveSendSpan.context().spanId(), deliveredSpan.parentId());
        assertEquals("Unexpected span operation name", RECEIVE_SPAN_NAME, deliveredSpan.operationName());

        Map<String, Object> deliveredTags = deliveredSpan.tags();
        assertFalse("Expected some tags", deliveredTags.isEmpty());
        assertFalse("Expected error tag not to be set", deliveredTags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, deliveredTags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(destinationName, deliveredTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, deliveredTags.get(Tags.COMPONENT.getKey()));

        // No log entries are expected for the delivered message
        List<LogEntry> deliveredLogs = deliveredSpan.logEntries();
        assertTrue("Expected no log entry: " + deliveredLogs, deliveredLogs.isEmpty());

        peer.expectClose();
        connection.close();
        peer.waitForAllHandlersToComplete(2000);

        // Finishing the two manually started send spans brings the finished total to four
        exceededSendSpan.finish();
        liveSendSpan.finish();
        List<MockSpan> allFinished = mockTracer.finishedSpans();
        assertEquals("Expected 4 finished spans: " + allFinished, 4, allFinished.size());
    }
}
/**
 * Verifies asynchronous delivery of a message that carries injected trace context.
 *
 * The listener records the span that is active while {@code onMessage} runs. The test
 * then checks that this span is the single finished span, that it is a child of the
 * send span whose context was placed in the message annotations, that it uses the
 * onMessage operation name, that it carries the consumer/destination/component tags
 * without an error tag, and that it has no log entries.
 *
 * @throws Exception if the test peer or the JMS client fails unexpectedly
 */
@Test(timeout = 20000)
public void testOnMessage() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));

        MockTracer mockTracer = new MockTracer();
        JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
        factory.setTracer(tracer);

        // Script the peer before each client action that triggers the matching frames.
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();

        Connection connection = factory.createConnection();
        connection.start();

        testPeer.expectBegin();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        Queue queue = session.createQueue(queueName);

        // Prepare an arriving message with tracing info
        Map<String,String> injected = new HashMap<>();
        MockSpan sendSpan = mockTracer.buildSpan(SEND_SPAN_NAME).start();
        mockTracer.inject(sendSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected));
        assertFalse("Expected inject to add values", injected.isEmpty());

        MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
        msgAnnotations.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected);

        String msgContent = "myContent";
        DescribedType amqpValueContent = new AmqpValueDescribedType(msgContent);

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, null, amqpValueContent);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        AtomicReference<Span> activeSpanRef = new AtomicReference<>();
        AtomicReference<Throwable> throwableRef = new AtomicReference<>();
        CountDownLatch deliveryRun = new CountDownLatch(1);

        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    // Capture whatever span the tracer considers active during delivery.
                    activeSpanRef.set(mockTracer.activeSpan());
                    deliveryRun.countDown();
                } catch (Throwable t) {
                    throwableRef.set(t);
                }
            }
        });

        // Await first and build the failure message afterwards: method arguments are
        // evaluated left to right, so concatenating throwableRef.get() inside the
        // assertTrue call would read it before the wait and never show a captured error.
        boolean deliveryRan = deliveryRun.await(3000, TimeUnit.MILLISECONDS);
        assertTrue("onMessage did not run in timely fashion: " + throwableRef.get(), deliveryRan);

        Span deliverySpan = activeSpanRef.get();
        assertNotNull("expected an active span during onMessage", deliverySpan);
        assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
        MockSpan deliveryMockSpan = (MockSpan) deliverySpan;

        // The span is finished after the listener returns, so poll for it.
        boolean finishedSpanFound = Wait.waitFor(() -> !(mockTracer.finishedSpans().isEmpty()), 3000, 10);
        assertTrue("Did not get finished span after onMessage", finishedSpanFound);

        List<MockSpan> finishedSpans = mockTracer.finishedSpans();
        assertEquals("Expected 1 finished span: " + finishedSpans, 1, finishedSpans.size());
        assertEquals("Unexpected finished span", deliverySpan, finishedSpans.get(0));
        assertEquals("Expected span to be child of the send span", sendSpan.context().spanId(), deliveryMockSpan.parentId());
        assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, deliveryMockSpan.operationName());

        // Verify tags set on the completed span
        Map<String, Object> spanTags = deliveryMockSpan.tags();
        assertFalse("Expected some tags", spanTags.isEmpty());
        assertFalse("Expected error tag not to be set", spanTags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, spanTags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(queueName, spanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, spanTags.get(Tags.COMPONENT.getKey()));

        // Verify no log set on the completed span
        List<LogEntry> logEntries = deliveryMockSpan.logEntries();
        assertTrue("Expected no log entry: " + logEntries, logEntries.isEmpty());

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(2000);

        // Finishing the locally created send span accounts for the second finished span.
        sendSpan.finish();
        finishedSpans = mockTracer.finishedSpans();
        assertEquals("Expected 2 finished spans: " + finishedSpans, 2, finishedSpans.size());

        assertNull("Unexpected error during onMessage", throwableRef.get());
    }
}
/**
 * Verifies asynchronous delivery of a message that carries no trace context.
 *
 * The transfer is sent without message annotations. The test checks that a span is
 * still active during {@code onMessage}, that it is the single finished span, that its
 * parent id is 0 (no parent), that it uses the onMessage operation name, that it
 * carries the consumer/destination/component tags without an error tag, and that it
 * has no log entries.
 *
 * @throws Exception if the test peer or the JMS client fails unexpectedly
 */
@Test(timeout = 20000)
public void testOnMessageWithoutTraceInfo() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));

        MockTracer mockTracer = new MockTracer();
        JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
        factory.setTracer(tracer);

        // Script the peer before each client action that triggers the matching frames.
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();

        Connection connection = factory.createConnection();
        connection.start();

        testPeer.expectBegin();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        Queue queue = session.createQueue(queueName);

        // Prepare an arriving message without tracing info
        String msgContent = "myContent";
        DescribedType amqpValueContent = new AmqpValueDescribedType(msgContent);

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueContent);
        testPeer.expectDispositionThatIsAcceptedAndSettled();

        AtomicReference<Span> activeSpanRef = new AtomicReference<>();
        AtomicReference<Throwable> throwableRef = new AtomicReference<>();
        CountDownLatch deliveryRun = new CountDownLatch(1);

        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    // Capture whatever span the tracer considers active during delivery.
                    activeSpanRef.set(mockTracer.activeSpan());
                    deliveryRun.countDown();
                } catch (Throwable t) {
                    throwableRef.set(t);
                }
            }
        });

        // Await first and build the failure message afterwards: method arguments are
        // evaluated left to right, so concatenating throwableRef.get() inside the
        // assertTrue call would read it before the wait and never show a captured error.
        boolean deliveryRan = deliveryRun.await(3000, TimeUnit.MILLISECONDS);
        assertTrue("onMessage did not run in timely fashion: " + throwableRef.get(), deliveryRan);

        Span deliverySpan = activeSpanRef.get();
        assertNotNull("expected an active span during onMessage", deliverySpan);
        assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
        MockSpan deliveryMockSpan = (MockSpan) deliverySpan;

        // The span is finished after the listener returns, so poll for it.
        boolean finishedSpanFound = Wait.waitFor(() -> !(mockTracer.finishedSpans().isEmpty()), 3000, 10);
        assertTrue("Did not get finished span after onMessage", finishedSpanFound);

        List<MockSpan> finishedSpans = mockTracer.finishedSpans();
        assertEquals("Expected 1 finished span: " + finishedSpans, 1, finishedSpans.size());
        assertEquals("Unexpected finished span", deliverySpan, finishedSpans.get(0));
        assertEquals("Expected span to have no parent as incoming message had no context", 0, deliveryMockSpan.parentId());
        assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, deliveryMockSpan.operationName());

        // Verify tags set on the completed span
        Map<String, Object> spanTags = deliveryMockSpan.tags();
        assertFalse("Expected some tags", spanTags.isEmpty());
        assertFalse("Expected error tag not to be set", spanTags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, spanTags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(queueName, spanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, spanTags.get(Tags.COMPONENT.getKey()));

        // Verify no log set on the completed span
        List<LogEntry> logEntries = deliveryMockSpan.logEntries();
        assertTrue("Expected no log entry: " + logEntries, logEntries.isEmpty());

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(2000);

        assertNull("Unexpected error during onMessage", throwableRef.get());
    }
}
/**
 * Verifies the span produced when a {@code MessageListener} throws from {@code onMessage}.
 *
 * The listener records the active span and then throws a RuntimeException. The peer is
 * scripted to expect a released-and-settled disposition. The test checks that the
 * finished span is a child of the send span, has the error tag set to true alongside
 * the consumer/destination/component tags, and carries exactly one log entry holding
 * the error event, a message mentioning "thrown from onMessage", and the thrown
 * exception as the error object.
 *
 * @throws Exception if the test peer or the JMS client fails unexpectedly
 */
@Test(timeout = 20000)
public void testOnMessageThrowingException() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));

        MockTracer mockTracer = new MockTracer();
        JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
        factory.setTracer(tracer);

        // Script the peer before each client action that triggers the matching frames.
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();

        Connection connection = factory.createConnection();
        connection.start();

        testPeer.expectBegin();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        Queue queue = session.createQueue(queueName);

        // Prepare an arriving message with tracing info
        Map<String,String> injected = new HashMap<>();
        MockSpan sendSpan = mockTracer.buildSpan(SEND_SPAN_NAME).start();
        mockTracer.inject(sendSpan.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected));
        assertFalse("Expected inject to add values", injected.isEmpty());

        MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
        msgAnnotations.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected);

        String msgContent = "myContent";
        DescribedType amqpValueContent = new AmqpValueDescribedType(msgContent);

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, null, amqpValueContent);
        testPeer.expectDispositionThatIsReleasedAndSettled();

        AtomicReference<Span> activeSpanRef = new AtomicReference<>();
        AtomicReference<Throwable> throwableRef = new AtomicReference<>();
        CountDownLatch deliveryRun = new CountDownLatch(1);
        String exceptionMessage = "not-supposed-to-throw-from-onMessage";

        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    // Capture whatever span the tracer considers active during delivery.
                    activeSpanRef.set(mockTracer.activeSpan());
                    deliveryRun.countDown();
                } catch (Throwable t) {
                    throwableRef.set(t);
                }

                // Deliberate failure: this is the exception the span is expected to record.
                throw new RuntimeException(exceptionMessage);
            }
        });

        // Await first and build the failure message afterwards: method arguments are
        // evaluated left to right, so concatenating throwableRef.get() inside the
        // assertTrue call would read it before the wait and never show a captured error.
        boolean deliveryRan = deliveryRun.await(3000, TimeUnit.MILLISECONDS);
        assertTrue("onMessage did not run in timely fashion: " + throwableRef.get(), deliveryRan);

        Span deliverySpan = activeSpanRef.get();
        assertNotNull("expected an active span during onMessage", deliverySpan);
        assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
        MockSpan deliveryMockSpan = (MockSpan) deliverySpan;

        // The span is finished after the listener returns, so poll for it.
        boolean finishedSpanFound = Wait.waitFor(() -> !(mockTracer.finishedSpans().isEmpty()), 3000, 10);
        assertTrue("Did not get finished span after onMessage", finishedSpanFound);

        List<MockSpan> finishedSpans = mockTracer.finishedSpans();
        assertEquals("Expected 1 finished span: " + finishedSpans, 1, finishedSpans.size());
        assertEquals("Unexpected finished span", deliveryMockSpan, finishedSpans.get(0));
        assertEquals("Expected span to be child of the send span", sendSpan.context().spanId(), deliveryMockSpan.parentId());
        assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, deliveryMockSpan.operationName());

        // Verify tags set on the completed span
        Map<String, Object> spanTags = deliveryMockSpan.tags();
        assertFalse("Expected some tags", spanTags.isEmpty());
        // Compare against Boolean.TRUE rather than unboxing the map value, so a missing
        // tag produces an assertion failure instead of a NullPointerException.
        assertEquals("Expected error tag to be true", Boolean.TRUE, spanTags.get(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, spanTags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(queueName, spanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, spanTags.get(Tags.COMPONENT.getKey()));

        // Verify log set on the completed span
        List<LogEntry> logEntries = deliveryMockSpan.logEntries();
        assertEquals("Expected 1 log entry: " + logEntries, 1, logEntries.size());

        Map<String, ?> entryFields = logEntries.get(0).fields();
        assertFalse("Expected some log entry fields", entryFields.isEmpty());
        assertEquals(ERROR_EVENT, entryFields.get(Fields.EVENT));

        Object messageDesc = entryFields.get(Fields.MESSAGE);
        assertTrue(messageDesc instanceof String);
        assertTrue(((String) messageDesc).contains("thrown from onMessage"));

        Object t = entryFields.get(Fields.ERROR_OBJECT);
        assertNotNull("Expected error object to be set", t);
        assertTrue(t instanceof RuntimeException);
        // assertEquals reports both values on mismatch, unlike assertTrue(a.equals(b)).
        assertEquals("Unexpected exception message", exceptionMessage, ((RuntimeException) t).getMessage());

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(2000);

        // Finishing the locally created send span accounts for the second finished span.
        sendSpan.finish();
        finishedSpans = mockTracer.finishedSpans();
        assertEquals("Expected 2 finished spans: " + finishedSpans, 2, finishedSpans.size());

        // Only failures inside the listener's try block land here; the deliberate
        // RuntimeException is thrown outside it.
        assertNull("Unexpected error during onMessage", throwableRef.get());
    }
}
/**
 * Verifies span handling for asynchronous delivery when an already-expired message
 * arrives ahead of a live one.
 *
 * Two transfers are scripted, each carrying the context of its own send span. The
 * first has an absolute expiry time 100ms in the past. The listener records the active
 * span on its first invocation only. The test checks that two spans are finished: the
 * first is a child of the first send span and carries a single log entry with the
 * message-expired event, and the second is the span seen by the listener, a child of
 * the second send span with no log entries. Both carry the consumer tags and no error
 * tag.
 *
 * @throws Exception if the test peer or the JMS client fails unexpectedly
 */
@Test(timeout = 20000)
public void testOnMessageWithExpiredMessage() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer));

        MockTracer mockTracer = new MockTracer();
        JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
        factory.setTracer(tracer);

        // Script the peer before each client action that triggers the matching frames.
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();

        Connection connection = factory.createConnection();
        connection.start();

        testPeer.expectBegin();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String queueName = "myQueue";
        Queue queue = session.createQueue(queueName);

        // Prepare an arriving message with tracing info, but which has also already expired
        Map<String,String> injected1 = new HashMap<>();
        MockSpan sendSpan1 = mockTracer.buildSpan(SEND_SPAN_NAME).start();
        mockTracer.inject(sendSpan1.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected1));
        assertFalse("Expected inject to add values", injected1.isEmpty());

        MessageAnnotationsDescribedType msgAnnotations1 = new MessageAnnotationsDescribedType();
        msgAnnotations1.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected1);

        PropertiesDescribedType props = new PropertiesDescribedType();
        props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));

        String expiredMsgContent = "already-expired";

        // Also prepare a message which is not expired yet.
        String liveMsgContent = "still-active";

        Map<String,String> injected2 = new HashMap<>();
        MockSpan sendSpan2 = mockTracer.buildSpan(SEND_SPAN_NAME).start();
        mockTracer.inject(sendSpan2.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected2));
        assertFalse("Expected inject to add values", injected2.isEmpty());

        MessageAnnotationsDescribedType msgAnnotations2 = new MessageAnnotationsDescribedType();
        msgAnnotations2.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected2);

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations1, props, null, new AmqpValueDescribedType(expiredMsgContent));
        testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, msgAnnotations2, null, null, new AmqpValueDescribedType(liveMsgContent), 2);

        // Expect a modified (delivery-failed, undeliverable-here) disposition followed
        // by an accepted one; presumably for the expired and the live delivery respectively.
        ModifiedMatcher modified = new ModifiedMatcher();
        modified.withDeliveryFailed(equalTo(true));
        modified.withUndeliverableHere(equalTo(true));

        testPeer.expectDisposition(true, modified, 1, 1);
        testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);

        AtomicReference<Span> activeSpanRef = new AtomicReference<>();
        AtomicReference<Throwable> throwableRef = new AtomicReference<>();
        CountDownLatch deliveryRun = new CountDownLatch(1);

        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    // Only the first invocation's active span is retained.
                    activeSpanRef.compareAndSet(null, mockTracer.activeSpan());
                    deliveryRun.countDown();
                } catch (Throwable t) {
                    throwableRef.set(t);
                }
            }
        });

        // Await first and build the failure message afterwards: method arguments are
        // evaluated left to right, so concatenating throwableRef.get() inside the
        // assertTrue call would read it before the wait and never show a captured error.
        boolean deliveryRan = deliveryRun.await(3000, TimeUnit.MILLISECONDS);
        assertTrue("onMessage did not run in timely fashion: " + throwableRef.get(), deliveryRan);

        boolean finishedSpansFound = Wait.waitFor(() -> (mockTracer.finishedSpans().size() == 2), 3000, 10);
        assertTrue("Did not get finished spans after onMessage", finishedSpansFound);

        Span deliverySpan = activeSpanRef.get();
        assertNotNull("expected an active span during onMessage", deliverySpan);
        assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
        MockSpan deliveryMockSpan = (MockSpan) deliverySpan;

        List<MockSpan> finishedSpans = mockTracer.finishedSpans();
        assertEquals("Expected 2 finished spans: " + finishedSpans, 2, finishedSpans.size());

        // First finished span: the expired message
        Span expiredSpan = finishedSpans.get(0);
        assertEquals("Unexpected span class", MockSpan.class, expiredSpan.getClass());
        MockSpan expiredMockSpan = (MockSpan) expiredSpan;
        assertEquals("Expected expired message span to be child of the first send span", sendSpan1.context().spanId(), expiredMockSpan.parentId());
        assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, expiredMockSpan.operationName());

        // Verify tags on the span for expired message
        Map<String, Object> expiredSpanTags = expiredMockSpan.tags();
        assertFalse("Expected some tags", expiredSpanTags.isEmpty());
        assertFalse("Expected error tag not to be set", expiredSpanTags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, expiredSpanTags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(queueName, expiredSpanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, expiredSpanTags.get(Tags.COMPONENT.getKey()));

        // Verify log on the span for expired message
        List<LogEntry> expiredLogEntries = expiredMockSpan.logEntries();
        assertEquals("Expected 1 log entry: " + expiredLogEntries, 1, expiredLogEntries.size());

        Map<String, ?> entryFields = expiredLogEntries.get(0).fields();
        assertFalse("Expected some log entry fields", entryFields.isEmpty());
        assertEquals(MESSAGE_EXPIRED, entryFields.get(Fields.EVENT));

        // Second finished span: the live message, as observed by the listener
        assertEquals("Unexpected second finished span", deliveryMockSpan, finishedSpans.get(1));
        assertEquals("Expected delivery span to be child of the second send span", sendSpan2.context().spanId(), deliveryMockSpan.parentId());
        assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, deliveryMockSpan.operationName());

        // Verify tags on the span for delivered message
        Map<String, Object> deliveredSpanTags = deliveryMockSpan.tags();
        assertFalse("Expected some tags", deliveredSpanTags.isEmpty());
        assertFalse("Expected error tag not to be set", deliveredSpanTags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, deliveredSpanTags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(queueName, deliveredSpanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, deliveredSpanTags.get(Tags.COMPONENT.getKey()));

        // Verify no log on the span for delivered message
        List<LogEntry> deliveredLogEntries = deliveryMockSpan.logEntries();
        assertTrue("Expected no log entry: " + deliveredLogEntries, deliveredLogEntries.isEmpty());

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(2000);

        // Finishing the two locally created send spans brings the total to four.
        sendSpan1.finish();
        sendSpan2.finish();
        finishedSpans = mockTracer.finishedSpans();
        assertEquals("Expected 4 finished spans: " + finishedSpans, 4, finishedSpans.size());

        assertNull("Unexpected error during onMessage", throwableRef.get());
    }
}
/**
 * Verifies span handling for asynchronous delivery when a message that has exceeded
 * the redelivery policy arrives ahead of a live one.
 *
 * The connection URI sets {@code jms.redeliveryPolicy.maxRedeliveries=1}. Two transfers
 * are scripted, each carrying the context of its own send span. The first has a header
 * delivery count of 2. The listener records the active span on its first invocation
 * only. The test checks that two spans are finished: the first is a child of the first
 * send span and carries a single log entry with the redeliveries-exceeded event, and
 * the second is the span seen by the listener, a child of the second send span with no
 * log entries. Both carry the consumer tags and no error tag.
 *
 * @throws Exception if the test peer or the JMS client fails unexpectedly
 */
@Test(timeout = 20000)
public void testOnMessageWithRedeliveryPolicy() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        JmsConnectionFactory factory = new JmsConnectionFactory(createPeerURI(testPeer, "jms.redeliveryPolicy.maxRedeliveries=1"));

        MockTracer mockTracer = new MockTracer();
        JmsTracer tracer = OpenTracingTracerFactory.create(mockTracer);
        factory.setTracer(tracer);

        // Script the peer before each client action that triggers the matching frames.
        testPeer.expectSaslAnonymous();
        testPeer.expectOpen();
        testPeer.expectBegin();

        Connection connection = factory.createConnection();
        connection.start();

        testPeer.expectBegin();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        String topicName = "myTopic";
        Topic topic = session.createTopic(topicName);

        // Prepare an arriving message with tracing info, but which has also already exceeded the redelivery-policy
        Map<String,String> injected1 = new HashMap<>();
        MockSpan sendSpan1 = mockTracer.buildSpan(SEND_SPAN_NAME).start();
        mockTracer.inject(sendSpan1.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected1));
        assertFalse("Expected inject to add values", injected1.isEmpty());

        MessageAnnotationsDescribedType msgAnnotations1 = new MessageAnnotationsDescribedType();
        msgAnnotations1.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected1);

        HeaderDescribedType header = new HeaderDescribedType();
        header.setDeliveryCount(UnsignedInteger.valueOf(2));

        String redeliveredMsgContent = "already-exceeded-redelivery-policy";

        // Also prepare a message which has not exceeded the redelivery policy yet.
        String liveMsgContent = "still-active";

        Map<String,String> injected2 = new HashMap<>();
        MockSpan sendSpan2 = mockTracer.buildSpan(SEND_SPAN_NAME).start();
        mockTracer.inject(sendSpan2.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(injected2));
        assertFalse("Expected inject to add values", injected2.isEmpty());

        MessageAnnotationsDescribedType msgAnnotations2 = new MessageAnnotationsDescribedType();
        msgAnnotations2.setSymbolKeyedAnnotation(ANNOTATION_KEY, injected2);

        testPeer.expectReceiverAttach();
        testPeer.expectLinkFlowRespondWithTransfer(header, msgAnnotations1, null, null, new AmqpValueDescribedType(redeliveredMsgContent));
        testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, msgAnnotations2, null, null, new AmqpValueDescribedType(liveMsgContent), 2);

        // Expect a modified (delivery-failed, undeliverable-here) disposition followed
        // by an accepted one; presumably for the rejected and the live delivery respectively.
        ModifiedMatcher modified = new ModifiedMatcher();
        modified.withDeliveryFailed(equalTo(true));
        modified.withUndeliverableHere(equalTo(true));

        testPeer.expectDisposition(true, modified, 1, 1);
        testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);

        AtomicReference<Span> activeSpanRef = new AtomicReference<>();
        AtomicReference<Throwable> throwableRef = new AtomicReference<>();
        CountDownLatch deliveryRun = new CountDownLatch(1);

        MessageConsumer messageConsumer = session.createConsumer(topic);
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    // Only the first invocation's active span is retained.
                    activeSpanRef.compareAndSet(null, mockTracer.activeSpan());
                    deliveryRun.countDown();
                } catch (Throwable t) {
                    throwableRef.set(t);
                }
            }
        });

        // Await first and build the failure message afterwards: method arguments are
        // evaluated left to right, so concatenating throwableRef.get() inside the
        // assertTrue call would read it before the wait and never show a captured error.
        boolean deliveryRan = deliveryRun.await(3000, TimeUnit.MILLISECONDS);
        assertTrue("onMessage did not run in timely fashion: " + throwableRef.get(), deliveryRan);

        boolean finishedSpansFound = Wait.waitFor(() -> (mockTracer.finishedSpans().size() == 2), 3000, 10);
        assertTrue("Did not get finished spans after onMessage", finishedSpansFound);

        Span deliverySpan = activeSpanRef.get();
        assertNotNull("expected an active span during onMessage", deliverySpan);
        assertEquals("Unexpected span class", MockSpan.class, deliverySpan.getClass());
        MockSpan deliveryMockSpan = (MockSpan) deliverySpan;

        List<MockSpan> finishedSpans = mockTracer.finishedSpans();
        assertEquals("Expected 2 finished spans: " + finishedSpans, 2, finishedSpans.size());

        // First finished span: the message that exceeded the redelivery policy
        Span redeliveredSpan = finishedSpans.get(0);
        assertEquals("Unexpected span class", MockSpan.class, redeliveredSpan.getClass());
        MockSpan redeliveredMockSpan = (MockSpan) redeliveredSpan;
        assertEquals("Expected redelivered message span to be child of the first send span", sendSpan1.context().spanId(), redeliveredMockSpan.parentId());
        assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, redeliveredMockSpan.operationName());

        // Verify tags on the span for redelivered message
        Map<String, Object> redeliveredSpanTags = redeliveredMockSpan.tags();
        assertFalse("Expected some tags", redeliveredSpanTags.isEmpty());
        assertFalse("Expected error tag not to be set", redeliveredSpanTags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, redeliveredSpanTags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(topicName, redeliveredSpanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, redeliveredSpanTags.get(Tags.COMPONENT.getKey()));

        // Verify log on the span for redelivered message
        List<LogEntry> redeliveredLogEntries = redeliveredMockSpan.logEntries();
        assertEquals("Expected 1 log entry: " + redeliveredLogEntries, 1, redeliveredLogEntries.size());

        Map<String, ?> entryFields = redeliveredLogEntries.get(0).fields();
        assertFalse("Expected some log entry fields", entryFields.isEmpty());
        assertEquals(REDELIVERIES_EXCEEDED, entryFields.get(Fields.EVENT));

        // Second finished span: the live message, as observed by the listener
        assertEquals("Unexpected second finished span", deliveryMockSpan, finishedSpans.get(1));
        assertEquals("Expected delivery span to be child of the second send span", sendSpan2.context().spanId(), deliveryMockSpan.parentId());
        assertEquals("Unexpected span operation name", ONMESSAGE_SPAN_NAME, deliveryMockSpan.operationName());

        // Verify tags on the span for delivered message
        Map<String, Object> deliveredSpanTags = deliveryMockSpan.tags();
        assertFalse("Expected some tags", deliveredSpanTags.isEmpty());
        assertFalse("Expected error tag not to be set", deliveredSpanTags.containsKey(Tags.ERROR.getKey()));
        assertEquals(Tags.SPAN_KIND_CONSUMER, deliveredSpanTags.get(Tags.SPAN_KIND.getKey()));
        assertEquals(topicName, deliveredSpanTags.get(Tags.MESSAGE_BUS_DESTINATION.getKey()));
        assertEquals(COMPONENT, deliveredSpanTags.get(Tags.COMPONENT.getKey()));

        // Verify no log on the span for delivered message
        List<LogEntry> deliveredLogEntries = deliveryMockSpan.logEntries();
        assertTrue("Expected no log entry: " + deliveredLogEntries, deliveredLogEntries.isEmpty());

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(2000);

        // Finishing the two locally created send spans brings the total to four.
        sendSpan1.finish();
        sendSpan2.finish();
        finishedSpans = mockTracer.finishedSpans();
        assertEquals("Expected 4 finished spans: " + finishedSpans, 4, finishedSpans.size());

        assertNull("Unexpected error during onMessage", throwableRef.get());
    }
}
/**
 * Builds the connection URI for the given test peer with no query options.
 *
 * @param peer the test peer whose server port is used in the URI
 * @return the URI produced by the two-argument overload when given null params
 */
private String createPeerURI(TestAmqpPeer peer) {
    final String noParams = null;
    return createPeerURI(peer, noParams);
}
/**
 * Builds the connection URI for the given test peer, optionally with a query string.
 *
 * @param peer the test peer whose server port is used in the URI
 * @param params query options to append after a '?', or null for none
 * @return "amqp://localhost:" followed by the peer port and, when params is
 *         non-null, "?" and the params
 */
private String createPeerURI(TestAmqpPeer peer, String params) {
    StringBuilder uri = new StringBuilder("amqp://localhost:");
    uri.append(peer.getServerPort());

    if (params != null) {
        uri.append("?").append(params);
    }

    return uri.toString();
}
}