| -- |
| -- Licensed to the Apache Software Foundation (ASF) under one or more |
| -- contributor license agreements. See the NOTICE file distributed with |
| -- this work for additional information regarding copyright ownership. |
| -- The ASF licenses this file to You under the Apache License, Version 2.0 |
| -- (the "License"); you may not use this file except in compliance with |
| -- the License. You may obtain a copy of the License at |
| -- |
| -- http://www.apache.org/licenses/LICENSE-2.0 |
| -- |
| -- Unless required by applicable law or agreed to in writing, software |
| -- distributed under the License is distributed on an "AS IS" BASIS, |
| -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| -- See the License for the specific language governing permissions and |
| -- limitations under the License. |
| -- |
| local core = require("apisix.core") |
| local new_tracer = require("opentracing.tracer").new |
| local zipkin_codec = require("apisix.plugins.zipkin.codec") |
| local new_random_sampler = require("apisix.plugins.zipkin.random_sampler").new |
| local new_reporter = require("apisix.plugins.zipkin.reporter").new |
| local ngx = ngx |
| local pairs = pairs |
| local tonumber = tonumber |
| |
| local plugin_name = "zipkin" |
| |
| |
| local lrucache = core.lrucache.new({ |
| type = "plugin", |
| }) |
| |
| local schema = { |
| type = "object", |
| properties = { |
| endpoint = {type = "string"}, |
| sample_ratio = {type = "number", minimum = 0.00001, maximum = 1}, |
| service_name = { |
| type = "string", |
| description = "service name for zipkin reporter", |
| default = "APISIX", |
| }, |
| server_addr = { |
| type = "string", |
| description = "default is $server_addr, you can speific your external ip address", |
| pattern = "^[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}$" |
| }, |
| }, |
| required = {"endpoint", "sample_ratio"} |
| } |
| |
| |
| local _M = { |
| version = 0.1, |
| priority = 11011, |
| name = plugin_name, |
| schema = schema, |
| } |
| |
| |
| function _M.check_schema(conf) |
| return core.schema.check(schema, conf) |
| end |
| |
| |
| local function create_tracer(conf,ctx) |
| |
| local headers = core.request.headers(ctx) |
| |
| -- X-B3-Sampled: if an upstream decided to sample this request, we do too. |
| local sample = headers["x-b3-sampled"] |
| if sample == "1" or sample == "true" then |
| conf.sample_ratio = 1 |
| elseif sample == "0" or sample == "false" then |
| conf.sample_ratio = 0 |
| end |
| |
| -- X-B3-Flags: if it equals '1' then it overrides sampling policy |
| -- We still want to warn on invalid sample header, so do this after the above |
| local debug = headers["x-b3-flags"] |
| if debug == "1" then |
| conf.sample_ratio = 1 |
| end |
| |
| local reporter = new_reporter(conf) |
| reporter:init_processor() |
| local tracer = new_tracer(reporter, new_random_sampler(conf)) |
| tracer:register_injector("http_headers", zipkin_codec.new_injector()) |
| tracer:register_extractor("http_headers", zipkin_codec.new_extractor()) |
| return tracer |
| end |
| |
| function _M.rewrite(plugin_conf, ctx) |
| local conf = core.table.clone(plugin_conf) |
| -- once the server started, server_addr and server_port won't change, so we can cache it. |
| conf.server_port = tonumber(ctx.var['server_port']) |
| |
| if not conf.server_addr or conf.server_addr == '' then |
| conf.server_addr = ctx.var["server_addr"] |
| end |
| |
| local tracer = core.lrucache.plugin_ctx(lrucache, ctx, conf.server_addr, |
| create_tracer, conf, ctx) |
| |
| ctx.opentracing_sample = tracer.sampler:sample() |
| if not ctx.opentracing_sample then |
| return |
| end |
| |
| local wire_context = tracer:extract("http_headers", |
| core.request.headers(ctx)) |
| |
| local start_timestamp = ngx.req.start_time() |
| local request_span = tracer:start_span("apisix.request", { |
| child_of = wire_context, |
| start_timestamp = start_timestamp, |
| tags = { |
| component = "apisix", |
| ["span.kind"] = "server", |
| ["http.method"] = ctx.var.request_method, |
| ["http.url"] = ctx.var.request_uri, |
| -- TODO: support ipv6 |
| ["peer.ipv4"] = core.request.get_remote_client_ip(ctx), |
| ["peer.port"] = core.request.get_remote_client_port(ctx), |
| } |
| }) |
| |
| ctx.opentracing = { |
| tracer = tracer, |
| wire_context = wire_context, |
| request_span = request_span, |
| rewrite_span = nil, |
| access_span = nil, |
| proxy_span = nil, |
| } |
| |
| local request_span = ctx.opentracing.request_span |
| ctx.opentracing.rewrite_span = request_span:start_child_span( |
| "apisix.rewrite", start_timestamp) |
| |
| ctx.REWRITE_END_TIME = tracer:time() |
| ctx.opentracing.rewrite_span:finish(ctx.REWRITE_END_TIME) |
| end |
| |
| function _M.access(conf, ctx) |
| if not ctx.opentracing_sample then |
| return |
| end |
| |
| local opentracing = ctx.opentracing |
| |
| opentracing.access_span = opentracing.request_span:start_child_span( |
| "apisix.access", ctx.REWRITE_END_TIME) |
| |
| local tracer = opentracing.tracer |
| |
| ctx.ACCESS_END_TIME = tracer:time() |
| opentracing.access_span:finish(ctx.ACCESS_END_TIME) |
| |
| opentracing.proxy_span = opentracing.request_span:start_child_span( |
| "apisix.proxy", ctx.ACCESS_END_TIME) |
| |
| -- send headers to upstream |
| local outgoing_headers = {} |
| tracer:inject(opentracing.proxy_span, "http_headers", outgoing_headers) |
| for k, v in pairs(outgoing_headers) do |
| core.request.set_header(k, v) |
| end |
| end |
| |
| |
| function _M.header_filter(conf, ctx) |
| if not ctx.opentracing_sample then |
| return |
| end |
| |
| local opentracing = ctx.opentracing |
| |
| ctx.HEADER_FILTER_END_TIME = opentracing.tracer:time() |
| if opentracing.proxy_span then |
| opentracing.body_filter_span = opentracing.proxy_span:start_child_span( |
| "apisix.body_filter", ctx.HEADER_FILTER_END_TIME) |
| end |
| end |
| |
| |
| function _M.log(conf, ctx) |
| if not ctx.opentracing_sample then |
| return |
| end |
| |
| local opentracing = ctx.opentracing |
| |
| local log_end_time = opentracing.tracer:time() |
| if opentracing.body_filter_span then |
| opentracing.body_filter_span:finish(log_end_time) |
| end |
| |
| local upstream_status = core.response.get_upstream_status(ctx) |
| opentracing.request_span:set_tag("http.status_code", upstream_status) |
| if opentracing.proxy_span then |
| opentracing.proxy_span:finish(log_end_time) |
| end |
| |
| opentracing.request_span:finish(log_end_time) |
| end |
| |
| return _M |