# blob: a663bdfee5ba2e5ac2ac2521607979e640ce6f1b [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import atexit
import functools
import os
import sys
import time
import weakref
try:
import opentracing
import jaeger_client
from opentracing.ext import tags
from opentracing.propagation import Format
except ImportError:
raise ImportError('proton tracing requires opentracing and jaeger_client modules')
import proton
from proton import Sender as ProtonSender
from proton.handlers import (
OutgoingMessageHandler as ProtonOutgoingMessageHandler,
IncomingMessageHandler as ProtonIncomingMessageHandler
)
# Process-wide tracer instance; stays None until init_tracer() assigns it.
_tracer = None
# Message-annotation key under which Sender.send stores the injected trace
# headers and from which IncomingMessageHandler.on_message reads them back.
_trace_key = proton.symbol('x-opt-qpid-tracestate')
def get_tracer():
    """Return the module-level tracer, creating it on first use.

    If no tracer has been set up yet, one is initialised through
    ``init_tracer`` using the basename of ``sys.argv[0]`` as the service
    name, or ``'interactive-session'`` when ``sys.argv[0]`` is empty.
    """
    if _tracer is None:
        # An empty argv[0] means there is no script name to borrow.
        program = sys.argv[0] or 'interactive-session'
        return init_tracer(os.path.basename(program))
    return _tracer
def _fini_tracer():
    """Close the global tracer and block until the close has completed.

    Registered as an ``atexit`` hook by ``init_tracer``.
    """
    # Initial pause before asking the tracer to close.
    time.sleep(1)
    tracer = opentracing.global_tracer()
    closing = tracer.close()
    # Poll the object returned by close() until it reports completion.
    while True:
        if closing.done():
            break
        time.sleep(0.5)
def init_tracer(service_name):
    """Set up the Jaeger tracer once and return the global tracer.

    The first call builds a ``jaeger_client.Config`` for *service_name*,
    initialises it, caches ``opentracing.global_tracer()`` in the module and
    registers ``_fini_tracer`` to run at interpreter exit. Later calls return
    the cached tracer and ignore *service_name*.
    """
    global _tracer
    if _tracer is None:
        jaeger_config = jaeger_client.Config(
            config={},
            service_name=service_name,
            validate=True,
        )
        jaeger_config.initialize_tracer()
        _tracer = opentracing.global_tracer()
        # A nasty hack to ensure enough time for the tracing data to be flushed
        atexit.register(_fini_tracer)
    return _tracer
class IncomingMessageHandler(ProtonIncomingMessageHandler):
    """Incoming message handler that runs the delegate inside a trace span."""

    def on_message(self, event):
        """Dispatch ``on_message`` to the delegate within a consumer span.

        If the message annotations carry trace headers under ``_trace_key``,
        the span is started as a child of the extracted context. Otherwise
        (no annotations, or annotations without a trace entry) a new span is
        started that ignores any currently active span.

        Nothing happens when the handler has no delegate.
        """
        if self.delegate is None:
            return

        tracer = get_tracer()
        message = event.message
        receiver = event.receiver
        connection = event.connection
        span_tags = {
            tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER,
            tags.MESSAGE_BUS_DESTINATION: receiver.source.address,
            tags.PEER_ADDRESS: connection.connected_address,
            tags.PEER_HOSTNAME: connection.hostname,
            'inserted_by': 'proton-message-tracing'
        }

        # A message may carry annotations without any trace entry (e.g. it
        # came from a sender that does not use tracing); indexing the key
        # unconditionally would raise KeyError and drop the message.
        annotations = message.annotations
        headers = None
        if annotations is not None and _trace_key in annotations:
            headers = annotations[_trace_key]

        if headers is not None:
            span_ctx = tracer.extract(Format.TEXT_MAP, headers)
            scope = tracer.start_active_span(
                'amqp-delivery-receive', child_of=span_ctx, tags=span_tags)
        else:
            scope = tracer.start_active_span(
                'amqp-delivery-receive', ignore_active_span=True, tags=span_tags)

        with scope:
            proton._events._dispatch(self.delegate, 'on_message', event)
class OutgoingMessageHandler(ProtonOutgoingMessageHandler):
    """Outgoing message handler that finishes the send span on settlement."""

    def on_settled(self, event):
        """Finish the delivery's span, then dispatch ``on_settled``.

        The span is the one attached to the delivery by the tracing
        ``Sender.send``. It is tagged and logged with the delivery's remote
        state before being finished. Deliveries without an attached span are
        passed to the delegate untouched.

        Nothing happens when the handler has no delegate.
        """
        if self.delegate is None:
            return

        delivery = event.delivery
        # Only deliveries created through the tracing Sender.send carry a
        # span; tolerate others instead of failing with AttributeError and
        # never notifying the delegate.
        span = getattr(delivery, 'span', None)
        if span is not None:
            state = delivery.remote_state
            # NOTE(review): remote_state is assumed to normally expose
            # ``.name``; fall back to str() in case it does not - confirm
            # against the proton Delivery API.
            state_name = getattr(state, 'name', str(state))
            span.set_tag('delivery-terminal-state', state_name)
            span.log_kv({'event': 'delivery settled', 'state': state_name})
            span.finish()
        proton._events._dispatch(self.delegate, 'on_settled', event)
class Sender(ProtonSender):
    """Sender that starts a producer span for every message it sends."""

    def send(self, msg, tag=None):
        """Send *msg* with trace headers attached and return the delivery.

        A span named ``'amqp-delivery-send'`` is started, its context is
        injected into a dict stored in the message annotations under
        ``_trace_key``, and the message is sent through ``ProtonSender.send``.
        The span is attached to the returned delivery as ``delivery.span``
        and is left open; ``OutgoingMessageHandler.on_settled`` finishes it.

        :param msg: message to send; its ``annotations`` are modified.
        :param tag: optional delivery tag, forwarded to ``ProtonSender.send``
            only when given.
        :return: the delivery returned by ``ProtonSender.send``.
        :raises: whatever injection or the underlying send raises; in that
            case the span is marked as an error and finished first.
        """
        tracer = get_tracer()
        connection = self.connection
        span_tags = {
            tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER,
            tags.MESSAGE_BUS_DESTINATION: self.target.address,
            tags.PEER_ADDRESS: connection.connected_address,
            tags.PEER_HOSTNAME: connection.hostname,
            'inserted_by': 'proton-message-tracing'
        }
        span = tracer.start_span('amqp-delivery-send', tags=span_tags)
        try:
            headers = {}
            tracer.inject(span, Format.TEXT_MAP, headers)
            if msg.annotations is None:
                msg.annotations = {_trace_key: headers}
            else:
                msg.annotations[_trace_key] = headers
            # Forward the tag only when supplied so the call made for
            # existing callers is exactly the one made before.
            if tag is None:
                delivery = ProtonSender.send(self, msg)
            else:
                delivery = ProtonSender.send(self, msg, tag=tag)
        except Exception:
            # No delivery will ever settle this span, so close it here
            # rather than leaving it open forever.
            span.set_tag(tags.ERROR, True)
            span.finish()
            raise
        delivery.span = span
        span.set_tag('delivery-tag', delivery.tag)
        return delivery
# Monkey patch proton for tracing (need to patch both internal and external names).
# These assignments run at import time: simply importing this module replaces
# proton's handler and sender classes with the tracing subclasses defined in
# this file, under both the private module names and the public aliases.
proton._handlers.IncomingMessageHandler = IncomingMessageHandler
proton._handlers.OutgoingMessageHandler = OutgoingMessageHandler
proton._endpoints.Sender = Sender
proton.handlers.IncomingMessageHandler = IncomingMessageHandler
proton.handlers.OutgoingMessageHandler = OutgoingMessageHandler
proton.Sender = Sender