blob: a571fe8cba66e8ee8788630f72f503297e9af40f [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <opentelemetry/context/propagation/global_propagator.h>
#include <opentelemetry/nostd/unique_ptr.h>
#include <opentelemetry/sdk/trace/simple_processor.h>
#include <opentelemetry/sdk/trace/tracer_provider.h>
#include <opentelemetry/sdk/version/version.h>
#include <opentelemetry/trace/context.h>
#include <opentelemetry/trace/propagation/http_trace_context.h>
#include <opentelemetry/trace/provider.h>
#include <opentelemetry/trace/span.h>
#include <opentelemetry/trace/tracer.h>
#include <proton/messaging_handler.hpp>
#include <proton/annotation_key.hpp>
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/receiver.hpp>
#include <proton/sender.hpp>
#include <proton/source.hpp>
#include <proton/target.hpp>
#include <proton/tracing.hpp>
#include <proton/tracker.hpp>
#include <proton/transfer.hpp>
#include "proton/link.hpp"
#include <proton/link.h>
#include "tracing_private.hpp"
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
namespace proton
{
// Short aliases for OpenTelemetry namespaces.
namespace nostd = opentelemetry::nostd;
namespace sdktrace = opentelemetry::sdk::trace;
// Message-annotation key under which AMQPMapCarrier stores the nested map of
// propagated trace-context fields.
const std::string kContextKey = "x-opt-qpid-tracestate";
// TODO: Have a delivery context to do the work, instead of having a map to associate the spans with the delivery tags.
// Maps a delivery tag to the span started when that delivery was encoded, so that the
// span can be ended once the delivery is settled. Entries are erased after the span is
// ended so that the map does not keep growing.
// NOTE(review): the map is keyed by the delivery tag alone and is accessed without any
// synchronization visible in this file -- confirm that tags cannot collide across links
// and that all accesses happen on one thread.
std::unordered_map<binary, nostd::shared_ptr<opentelemetry::trace::Span>> tag_span;
class AMQPMapCarrier : public opentelemetry::context::propagation::TextMapCarrier {
public:
AMQPMapCarrier(const proton::map<annotation_key, value>* message_annotations) : message_annotations_(message_annotations) {}
virtual nostd::string_view Get(nostd::string_view key) const noexcept override {
std::string key_to_compare = key.data();
proton::value v_tracing_map = message_annotations_->get(annotation_key(kContextKey));
proton::map<proton::annotation_key, proton::value> tracing_map;
if (!v_tracing_map.empty())
get(v_tracing_map, tracing_map);
if (tracing_map.exists(annotation_key(key_to_compare))) {
value extracted_value = tracing_map.get(annotation_key(key_to_compare));
std::string extracted_string = to_string(extracted_value);
extracted_strings.push_back(extracted_string);
nostd::string_view final_extracted_string = nostd::string_view(extracted_strings.back());
return final_extracted_string;
}
return "";
}
virtual void Set(nostd::string_view key,
nostd::string_view val) noexcept override {
proton::value v_tracing_map = message_annotations_->get(annotation_key(kContextKey));
proton::map<proton::annotation_key, proton::value> tracing_map;
if (!v_tracing_map.empty())
get(v_tracing_map, tracing_map);
tracing_map.put(annotation_key(std::string(key)), value(std::string(val)));
((proton::map<proton::annotation_key, proton::value>*)message_annotations_)->put(annotation_key(kContextKey), tracing_map);
}
const proton::map<annotation_key, value>* message_annotations_;
mutable std::vector<std::string> extracted_strings;
};
// Looks up the "qpid-tracer" tracer on the globally registered tracer provider.
// The provider is queried on every call, so a provider installed later is picked up.
nostd::shared_ptr<opentelemetry::trace::Tracer> get_tracer() {
    return opentelemetry::trace::Provider::GetTracerProvider()
        ->GetTracer("qpid-tracer", OPENTELEMETRY_SDK_VERSION);
}
class OpentelemetryTracing : public Tracing {
public:
void message_encode(const message& message, std::vector<char>& buf, const binary& tag, const tracker& track) override {
proton::message message_cp = message;
opentelemetry::trace::StartSpanOptions options;
options.kind = opentelemetry::trace::SpanKind::kProducer;
opentelemetry::context::Context ctx = opentelemetry::context::RuntimeContext::GetCurrent();
options.parent = opentelemetry::trace::GetSpan(ctx)->GetContext();
std::string tag_in_string = std::string(tag);
std::stringstream ss;
for (int i = 0; i < (int)tag_in_string.length(); ++i)
ss << std::hex << (int)tag[i];
std::string delivery_tag = ss.str();
sender s = track.sender();
target t = s.target();
std::string t_addr = t.address();
std::string delivery_state = to_string(track.state());
nostd::shared_ptr<opentelemetry::trace::Span> span = get_tracer()->StartSpan(
"amqp-delivery-send",
{{"Delivery_tag", delivery_tag}, {"Destination_address", t_addr}},
options);
opentelemetry::trace::Scope scope = proton::get_tracer()->WithActiveSpan(span);
// Inject current context into AMQP message annotations
opentelemetry::context::Context current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
AMQPMapCarrier carrier(&message_cp.message_annotations());
nostd::shared_ptr<opentelemetry::context::propagation::TextMapPropagator> prop =
opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
prop->Inject(carrier, current_ctx);
tag_span[tag] = span;
message_cp.encode(buf);
}
void on_message_handler(messaging_handler& h, delivery& d, message& message) override {
opentelemetry::trace::StartSpanOptions options;
options.kind = opentelemetry::trace::SpanKind::kConsumer;
// Extract context from AMQP message annotations
const AMQPMapCarrier carrier(&message.message_annotations());
nostd::shared_ptr<opentelemetry::context::propagation::TextMapPropagator> prop =
opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
opentelemetry::context::Context current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
opentelemetry::context::Context new_context = prop->Extract(carrier, current_ctx);
options.parent = opentelemetry::trace::GetSpan(new_context)->GetContext();
binary tag_in_binary = d.tag();
std::string tag_in_string = std::string(d.tag());
std::stringstream ss;
for (int i = 0; i < (int)tag_in_string.length(); ++i)
ss << std::hex << (int)tag_in_binary[i];
std::string delivery_tag = ss.str();
receiver r = d.receiver();
source s = r.source();
std::string s_addr = s.address();
transfer tt(d);
std::string delivery_state = to_string(tt.state());
nostd::shared_ptr<opentelemetry::trace::Span> span = get_tracer()->StartSpan(
"amqp-message-received",
{{"Delivery_tag", delivery_tag}, {"Source_address", s_addr}},
options);
opentelemetry::trace::Scope scope = get_tracer()->WithActiveSpan(span);
h.on_message(d, message);
span->End();
}
void on_settled_span(tracker& track) override {
binary tag = track.tag();
nostd::shared_ptr<opentelemetry::trace::Span> span = tag_span[tag];
std::string delivery_state = to_string(track.state());
span->AddEvent("delivery state: " + delivery_state);
span->End();
// Delete map entries.
tag_span.erase(tag);
}
};
static OpentelemetryTracing otel;
void initOpenTelemetryTracer()
{
Tracing::activate(otel);
// Set global propagator
opentelemetry::context::propagation::GlobalTextMapPropagator::
SetGlobalPropagator(
nostd::shared_ptr<
opentelemetry::context::propagation::TextMapPropagator>(
new opentelemetry::trace::propagation::HttpTraceContext()));
}
} // namespace proton