blob: 9016a720e884e7b16efd4722c2b28e2ea0347307 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use super::{log_exception, Plugin};
use crate::{
component::COMPONENT_AMQP_PRODUCER_ID,
context::{RequestContext, SW_HEADER},
execute::{get_this_mut, validate_num_args, AfterExecuteHook, BeforeExecuteHook},
tag::{TAG_MQ_BROKER, TAG_MQ_QUEUE, TAG_MQ_TOPIC},
};
use anyhow::Context;
use phper::{
arrays::ZArray,
classes::ClassEntry,
functions::call,
objects::{ZObj, ZObject},
values::{ExecuteData, ZVal},
};
use skywalking::{skywalking_proto::v3::SpanLayer, trace::span::Span};
/// Plugin marker type for the `PhpAmqpLib\Channel\AMQPChannel` class.
///
/// It carries no state; all behavior lives in its `impl` blocks.
#[derive(Clone, Default)]
pub struct AmqplibPlugin;
impl Plugin for AmqplibPlugin {
fn class_names(&self) -> Option<&'static [&'static str]> {
Some(&["PhpAmqpLib\\Channel\\AMQPChannel"])
}
fn function_name_prefix(&self) -> Option<&'static str> {
None
}
fn hook(
&self, class_name: Option<&str>, function_name: &str,
) -> Option<(
Box<crate::execute::BeforeExecuteHook>,
Box<crate::execute::AfterExecuteHook>,
)> {
match (class_name, function_name) {
(
Some(class_name @ "PhpAmqpLib\\Channel\\AMQPChannel"),
function_name @ "basic_publish",
) => Some(self.hook_channel_basic_publish(class_name, function_name)),
_ => None,
}
}
}
impl AmqplibPlugin {
fn hook_channel_basic_publish(
&self, class_name: &str, function_name: &str,
) -> (Box<BeforeExecuteHook>, Box<AfterExecuteHook>) {
let class_name = class_name.to_owned();
let function_name = function_name.to_owned();
(
Box::new(move |request_id, execute_data| {
validate_num_args(execute_data, 3)?;
let this = get_this_mut(execute_data)?;
let peer = Self::get_peer(this);
let exchange = execute_data
.get_parameter(1)
.as_z_str()
.and_then(|s| s.to_str().ok())
.map(ToOwned::to_owned)
.unwrap_or_else(|| "unknown".to_owned());
let routing_key = execute_data
.get_parameter(2)
.as_z_str()
.and_then(|s| s.to_str().ok())
.map(ToOwned::to_owned)
.unwrap_or_else(|| "unknown".to_owned());
let span = Self::create_exit_span(
request_id,
&class_name,
&function_name,
&peer,
&exchange,
&routing_key,
)?;
Self::inject_sw_header(request_id, execute_data)?;
Ok(Box::new(span))
}),
Box::new(move |_, span, _, _| {
let mut span = span.downcast::<Span>().unwrap();
log_exception(&mut span);
Ok(())
}),
)
}
fn get_peer(this: &mut ZObj) -> String {
let Some(io) = this.get_property("connection").as_z_obj().and_then(|connection| connection.get_property("io").as_z_obj()) else {
return "unknown:0".to_owned();
};
let host = io
.get_property("host")
.as_z_str()
.and_then(|s| s.to_str().ok())
.unwrap_or("unknown");
let port = io.get_property("port").as_long().unwrap_or_default();
format!("{}:{}", host, port)
}
fn create_exit_span(
request_id: Option<i64>, class_name: &str, function_name: &str, peer: &str, exchange: &str,
routing_key: &str,
) -> crate::Result<Span> {
let mut span = RequestContext::try_with_global_ctx(request_id, |ctx| {
Ok(ctx.create_exit_span(&format!("{}->{}", class_name, function_name), peer))
})?;
let mut span_object = span.span_object_mut();
span_object.set_span_layer(SpanLayer::Mq);
span_object.component_id = COMPONENT_AMQP_PRODUCER_ID;
span_object.add_tag(TAG_MQ_BROKER, peer);
span_object.add_tag(TAG_MQ_TOPIC, exchange);
span_object.add_tag(TAG_MQ_QUEUE, routing_key);
drop(span_object);
Ok(span)
}
fn inject_sw_header(
request_id: Option<i64>, execute_data: &mut ExecuteData,
) -> crate::Result<()> {
const HEADER_NAME: &str = "application_headers";
let sw_header = RequestContext::try_get_sw_header(request_id)?;
let message = execute_data
.get_mut_parameter(0)
.as_mut_z_obj()
.context("message isn't object")?;
let has = message
.call("has", [ZVal::from(HEADER_NAME)])?
.expect_bool()?;
if has {
let mut headers = message.call("get", [ZVal::from(HEADER_NAME)])?;
let headers = headers.expect_mut_z_obj()?;
headers.call("set", [ZVal::from(SW_HEADER), ZVal::from(sw_header)])?;
} else {
let headers = Self::new_sw_headers(&sw_header)?;
message.call("set", [ZVal::from(HEADER_NAME), ZVal::from(headers)])?;
}
Ok(())
}
fn new_sw_headers(sw_header: &str) -> crate::Result<ZObject> {
let mut arr = ZArray::new();
arr.insert(SW_HEADER, sw_header);
let class_name = "PhpAmqpLib\\Wire\\AMQPTable";
let exists = call("class_exists", [ZVal::from(class_name), ZVal::from(true)])?;
if !exists.as_bool().unwrap_or_default() {
return Err(format!("Class {} not exists", class_name).into());
}
let obj = ClassEntry::from_globals(class_name)?.new_object([ZVal::from(arr)])?;
Ok(obj)
}
}