blob: 5e2468a5fd5b72560d9c820665aeee6cd5a3a58a [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from skywalking import Layer, Component
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag
def install():
from pika.channel import Channel
_basic_publish = Channel.basic_publish
__on_deliver = Channel._on_deliver
Channel.basic_publish = _sw_basic_publish_func(_basic_publish)
Channel._on_deliver = _sw__on_deliver_func(__on_deliver)
def _sw_basic_publish_func(_basic_publish):
def _sw_basic_publish(this, exchange,
routing_key,
body,
properties=None,
mandatory=False):
peer = '%s:%s' % (this.connection.params.host, this.connection.params.port)
context = get_context()
import pika
with context.new_exit_span(op="RabbitMQ/Topic/" + exchange + "/Queue/" + routing_key + "/Producer" or "/",
peer=peer, component=Component.RabbitmqProducer) as span:
carrier = span.inject()
span.layer = Layer.MQ
properties = pika.BasicProperties() if properties is None else properties
if properties.headers is None:
properties.headers = {}
for item in carrier:
properties.headers[item.key] = item.val
res = _basic_publish(this, exchange,
routing_key,
body,
properties=properties,
mandatory=mandatory)
span.tag(Tag(key=tags.MqBroker, val=peer))
span.tag(Tag(key=tags.MqTopic, val=exchange))
span.tag(Tag(key=tags.MqQueue, val=routing_key))
return res
return _sw_basic_publish
def _sw__on_deliver_func(__on_deliver):
def _sw__on_deliver(this, method_frame, header_frame, body):
peer = '%s:%s' % (this.connection.params.host, this.connection.params.port)
context = get_context()
exchange = method_frame.method.exchange
routing_key = method_frame.method.routing_key
carrier = Carrier()
for item in carrier:
if item.key in header_frame.properties.headers:
item.val = header_frame.properties.headers[item.key]
with context.new_entry_span(op="RabbitMQ/Topic/" + exchange + "/Queue/" + routing_key
+ "/Consumer" or "", carrier=carrier) as span:
span.layer = Layer.MQ
span.component = Component.RabbitmqConsumer
__on_deliver(this, method_frame, header_frame, body)
span.tag(Tag(key=tags.MqBroker, val=peer))
span.tag(Tag(key=tags.MqTopic, val=exchange))
span.tag(Tag(key=tags.MqQueue, val=routing_key))
return _sw__on_deliver