| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import atexit |
| import functools |
| import os |
| import sys |
| import time |
| import weakref |
| |
| try: |
| import opentracing |
| import jaeger_client |
| from opentracing.ext import tags |
| from opentracing.propagation import Format |
| except ImportError: |
| raise ImportError('proton tracing requires opentracing and jaeger_client modules') |
| |
| import proton |
| from proton import Sender as ProtonSender |
| from proton.handlers import ( |
| OutgoingMessageHandler as ProtonOutgoingMessageHandler, |
| IncomingMessageHandler as ProtonIncomingMessageHandler |
| ) |
| |
| _tracer = None |
| _trace_key = proton.symbol('x-opt-qpid-tracestate') |
| |
| |
| def get_tracer(): |
| global _tracer |
| if _tracer is not None: |
| return _tracer |
| exe = sys.argv[0] if sys.argv[0] else 'interactive-session' |
| return init_tracer(os.path.basename(exe)) |
| |
| |
| def _fini_tracer(): |
| time.sleep(1) |
| c = opentracing.global_tracer().close() |
| while not c.done(): |
| time.sleep(0.5) |
| |
| |
| def init_tracer(service_name): |
| global _tracer |
| if _tracer is not None: |
| return _tracer |
| |
| config = jaeger_client.Config( |
| config={}, |
| service_name=service_name, |
| validate=True |
| ) |
| config.initialize_tracer() |
| _tracer = opentracing.global_tracer() |
| # A nasty hack to ensure enough time for the tracing data to be flushed |
| atexit.register(_fini_tracer) |
| return _tracer |
| |
| |
| class IncomingMessageHandler(ProtonIncomingMessageHandler): |
| def on_message(self, event): |
| if self.delegate is not None: |
| tracer = get_tracer() |
| message = event.message |
| receiver = event.receiver |
| connection = event.connection |
| span_tags = { |
| tags.SPAN_KIND: tags.SPAN_KIND_CONSUMER, |
| tags.MESSAGE_BUS_DESTINATION: receiver.source.address, |
| tags.PEER_ADDRESS: connection.connected_address, |
| tags.PEER_HOSTNAME: connection.hostname, |
| 'inserted_by': 'proton-message-tracing' |
| } |
| if message.annotations is not None: |
| headers = message.annotations[_trace_key] |
| span_ctx = tracer.extract(Format.TEXT_MAP, headers) |
| with tracer.start_active_span('amqp-delivery-receive', child_of=span_ctx, tags=span_tags): |
| proton._events._dispatch(self.delegate, 'on_message', event) |
| else: |
| with tracer.start_active_span('amqp-delivery-receive', ignore_active_span=True, tags=span_tags): |
| proton._events._dispatch(self.delegate, 'on_message', event) |
| |
| |
| class OutgoingMessageHandler(ProtonOutgoingMessageHandler): |
| def on_settled(self, event): |
| if self.delegate is not None: |
| delivery = event.delivery |
| state = delivery.remote_state |
| span = delivery.span |
| span.set_tag('delivery-terminal-state', state.name) |
| span.log_kv({'event': 'delivery settled', 'state': state.name}) |
| span.finish() |
| proton._events._dispatch(self.delegate, 'on_settled', event) |
| |
| |
| class Sender(ProtonSender): |
| def send(self, msg): |
| tracer = get_tracer() |
| connection = self.connection |
| span_tags = { |
| tags.SPAN_KIND: tags.SPAN_KIND_PRODUCER, |
| tags.MESSAGE_BUS_DESTINATION: self.target.address, |
| tags.PEER_ADDRESS: connection.connected_address, |
| tags.PEER_HOSTNAME: connection.hostname, |
| 'inserted_by': 'proton-message-tracing' |
| } |
| span = tracer.start_span('amqp-delivery-send', tags=span_tags) |
| headers = {} |
| tracer.inject(span, Format.TEXT_MAP, headers) |
| if msg.annotations is None: |
| msg.annotations = {_trace_key: headers} |
| else: |
| msg.annotations[_trace_key] = headers |
| delivery = ProtonSender.send(self, msg) |
| delivery.span = span |
| span.set_tag('delivery-tag', delivery.tag) |
| return delivery |
| |
| |
| # Monkey patch proton for tracing (need to patch both internal and external names) |
| proton._handlers.IncomingMessageHandler = IncomingMessageHandler |
| proton._handlers.OutgoingMessageHandler = OutgoingMessageHandler |
| proton._endpoints.Sender = Sender |
| proton.handlers.IncomingMessageHandler = IncomingMessageHandler |
| proton.handlers.OutgoingMessageHandler = OutgoingMessageHandler |
| proton.Sender = Sender |