blob: 2d74f3396e2a9340d19518405e1b7023f3888f1b [file] [log] [blame]
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local new_tracer = require("opentracing.tracer").new
local zipkin_codec = require("apisix.plugins.zipkin.codec")
local new_random_sampler = require("apisix.plugins.zipkin.random_sampler").new
local new_reporter = require("apisix.plugins.zipkin.reporter").new
local ngx = ngx
local ngx_re = require("ngx.re")
local pairs = pairs
local tonumber = tonumber
local plugin_name = "zipkin"
local ZIPKIN_SPAN_VER_1 = 1
local ZIPKIN_SPAN_VER_2 = 2
local lrucache = core.lrucache.new({
type = "plugin",
})
local schema = {
type = "object",
properties = {
endpoint = {type = "string"},
sample_ratio = {type = "number", minimum = 0.00001, maximum = 1},
service_name = {
type = "string",
description = "service name for zipkin reporter",
default = "APISIX",
},
server_addr = {
type = "string",
description = "default is $server_addr, you can specify your external ip address",
pattern = "^[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}.[0-9]{1,3}$"
},
span_version = {
enum = {ZIPKIN_SPAN_VER_1, ZIPKIN_SPAN_VER_2},
default = ZIPKIN_SPAN_VER_2,
},
},
required = {"endpoint", "sample_ratio"}
}
local _M = {
version = 0.1,
priority = 11011,
name = plugin_name,
schema = schema,
}
function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
local function create_tracer(conf,ctx)
conf.route_id = ctx.route_id
local reporter = new_reporter(conf)
reporter:init_processor()
local tracer = new_tracer(reporter, new_random_sampler(conf))
tracer:register_injector("http_headers", zipkin_codec.new_injector())
tracer:register_extractor("http_headers", zipkin_codec.new_extractor())
return tracer
end
local function parse_b3(b3)
-- See https://github.com/openzipkin/b3-propagation#single-header
if b3 == "0" then
return nil, nil, nil, "0", nil
end
local pieces, err = ngx_re.split(b3, "-", nil, nil, 4)
if not pieces then
return err
end
if not pieces[1] then
return "missing trace_id"
end
if not pieces[2] then
return "missing span_id"
end
return nil, pieces[1], pieces[2], pieces[3], pieces[4]
end
function _M.rewrite(plugin_conf, ctx)
local conf = core.table.clone(plugin_conf)
-- once the server started, server_addr and server_port won't change, so we can cache it.
conf.server_port = tonumber(ctx.var['server_port'])
if not conf.server_addr or conf.server_addr == '' then
conf.server_addr = ctx.var["server_addr"]
end
local tracer = core.lrucache.plugin_ctx(lrucache, ctx, conf.server_addr .. conf.server_port,
create_tracer, conf, ctx)
local headers = core.request.headers(ctx)
local per_req_sample_ratio
-- X-B3-Flags: if it equals '1' then it overrides sampling policy
-- We still want to warn on invalid sampled header, so do this after the above
local debug = headers["x-b3-flags"]
if debug == "1" then
per_req_sample_ratio = 1
end
local trace_id, request_span_id, sampled, parent_span_id
local b3 = headers["b3"]
if b3 then
-- don't pass b3 header by default
core.request.set_header(ctx, "b3", nil)
local err
err, trace_id, request_span_id, sampled, parent_span_id = parse_b3(b3)
if err then
core.log.error("invalid b3 header: ", b3, ", ignored: ", err)
return 400
end
if sampled == "d" then
core.request.set_header(ctx, "x-b3-flags", "1")
sampled = "1"
end
else
-- X-B3-Sampled: if the client decided to sample this request, we do too.
sampled = headers["x-b3-sampled"]
trace_id = headers["x-b3-traceid"]
parent_span_id = headers["x-b3-parentspanid"]
request_span_id = headers["x-b3-spanid"]
end
if sampled == "1" or sampled == "true" then
per_req_sample_ratio = 1
elseif sampled == "0" or sampled == "false" then
per_req_sample_ratio = 0
end
ctx.opentracing_sample = tracer.sampler:sample(per_req_sample_ratio or conf.sample_ratio)
if not ctx.opentracing_sample then
core.request.set_header(ctx, "x-b3-sampled", "0")
return
end
local zipkin_ctx = core.tablepool.fetch("zipkin_ctx", 0, 3)
zipkin_ctx.trace_id = trace_id
zipkin_ctx.parent_span_id = parent_span_id
zipkin_ctx.request_span_id = request_span_id
ctx.zipkin = zipkin_ctx
local wire_context = tracer:extract("http_headers", ctx)
local start_timestamp = ngx.req.start_time()
local request_span = tracer:start_span("apisix.request", {
child_of = wire_context,
start_timestamp = start_timestamp,
tags = {
component = "apisix",
["span.kind"] = "server",
["http.method"] = ctx.var.request_method,
["http.url"] = ctx.var.request_uri,
-- TODO: support ipv6
["peer.ipv4"] = core.request.get_remote_client_ip(ctx),
["peer.port"] = core.request.get_remote_client_port(ctx),
}
})
ctx.opentracing = {
tracer = tracer,
wire_context = wire_context,
request_span = request_span,
}
local request_span = ctx.opentracing.request_span
if conf.span_version == ZIPKIN_SPAN_VER_1 then
ctx.opentracing.rewrite_span = request_span:start_child_span("apisix.rewrite",
start_timestamp)
ctx.REWRITE_END_TIME = tracer:time()
ctx.opentracing.rewrite_span:finish(ctx.REWRITE_END_TIME)
else
ctx.opentracing.proxy_span = request_span:start_child_span("apisix.proxy",
start_timestamp)
end
end
function _M.access(conf, ctx)
if not ctx.opentracing_sample then
return
end
local opentracing = ctx.opentracing
local tracer = opentracing.tracer
if conf.span_version == ZIPKIN_SPAN_VER_1 then
opentracing.access_span = opentracing.request_span:start_child_span(
"apisix.access", ctx.REWRITE_END_TIME)
ctx.ACCESS_END_TIME = tracer:time()
opentracing.access_span:finish(ctx.ACCESS_END_TIME)
opentracing.proxy_span = opentracing.request_span:start_child_span(
"apisix.proxy", ctx.ACCESS_END_TIME)
end
-- send headers to upstream
local outgoing_headers = {}
tracer:inject(opentracing.proxy_span, "http_headers", outgoing_headers)
for k, v in pairs(outgoing_headers) do
core.request.set_header(ctx, k, v)
end
end
function _M.header_filter(conf, ctx)
if not ctx.opentracing_sample then
return
end
local opentracing = ctx.opentracing
local end_time = opentracing.tracer:time()
if conf.span_version == ZIPKIN_SPAN_VER_1 then
ctx.HEADER_FILTER_END_TIME = end_time
if opentracing.proxy_span then
opentracing.body_filter_span = opentracing.proxy_span:start_child_span(
"apisix.body_filter", ctx.HEADER_FILTER_END_TIME)
end
else
opentracing.proxy_span:finish(end_time)
opentracing.response_span = opentracing.request_span:start_child_span(
"apisix.response_span", ctx.HEADER_FILTER_END_TIME)
end
end
function _M.log(conf, ctx)
if not ctx.opentracing_sample then
return
end
local opentracing = ctx.opentracing
local log_end_time = opentracing.tracer:time()
if conf.span_version == ZIPKIN_SPAN_VER_1 then
if opentracing.body_filter_span then
opentracing.body_filter_span:finish(log_end_time)
end
if opentracing.proxy_span then
opentracing.proxy_span:finish(log_end_time)
end
else
opentracing.response_span:finish(log_end_time)
end
local upstream_status = core.response.get_upstream_status(ctx)
opentracing.request_span:set_tag("http.status_code", upstream_status)
opentracing.request_span:finish(log_end_time)
if ctx.zipkin_ctx then
core.tablepool.release("zipkin_ctx", ctx.zipkin_ctx)
end
end
return _M