blob: d04d9bb79d07f54d390b5503caa1f76463b9320a [file] [log] [blame]
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local base_prometheus = require("prometheus")
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local control = require("apisix.control.v1")
local ipairs = ipairs
local pairs = pairs
local ngx = ngx
local re_gmatch = ngx.re.gmatch
local ffi = require("ffi")
local C = ffi.C
local pcall = pcall
local select = select
local type = type
local prometheus
local router = require("apisix.router")
local get_routes = router.http_routes
local get_ssls = router.ssls
local get_services = require("apisix.http.service").services
local get_consumers = require("apisix.consumer").consumers
local get_upstreams = require("apisix.upstream").upstreams
local get_global_rules = require("apisix.global_rules").global_rules
local get_global_rules_prev_index = require("apisix.global_rules").get_pre_index
local clear_tab = core.table.clear
local get_stream_routes = router.stream_routes
local get_protos = require("apisix.plugins.grpc-transcode.proto").protos
local service_fetch = require("apisix.http.service").get
local latency_details = require("apisix.utils.log-util").latency_details_in_ms
local xrpc = require("apisix.stream.xrpc")
local unpack = unpack
local next = next
local ngx_capture
if ngx.config.subsystem == "http" then
ngx_capture = ngx.location.capture
end
local plugin_name = "prometheus"
local default_export_uri = "/apisix/prometheus/metrics"
-- Default set of latency buckets, 1ms to 60s:
local DEFAULT_BUCKETS = {1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 30000, 60000}
local metrics = {}
-- Shared scratch array reused by gen_arr() to avoid allocating a fresh
-- table on the hot logging path.
local inner_tab_arr = {}

-- Copy every vararg into the shared array and return it.
-- NOTE: the same table is cleared and refilled on every call, so callers
-- must consume the result before invoking gen_arr() again.
local function gen_arr(...)
    clear_tab(inner_tab_arr)

    local argc = select('#', ...)
    for idx = 1, argc do
        inner_tab_arr[idx] = select(idx, ...)
    end

    return inner_tab_arr
end
-- Shared scratch array reused by extra_labels() — cleared on every call.
local extra_labels_tbl = {}

-- Collect the extra label values configured under
-- plugin_attr("prometheus").metrics[name].extra_labels.
--
-- Each configured entry is a single-pair table; its key is the label name
-- and its value is an nginx variable reference such as "$remote_addr".
-- At init time (ctx == nil) the returned values are the label NAMES,
-- which is what the metric registration calls below expect; at log time
-- the values are resolved from ctx.var (missing variables become "").
--
-- NOTE: returns the shared `extra_labels_tbl`; consume it before the
-- next extra_labels() call.
local function extra_labels(name, ctx)
    clear_tab(extra_labels_tbl)

    local attr = plugin.plugin_attr("prometheus")
    -- fix: guard against a missing plugin_attr section — the rest of this
    -- file consistently checks `attr and attr.xxx`, but this function
    -- indexed `attr` unconditionally and could raise on a nil value.
    -- Renamed from `metrics` to avoid shadowing the module-level table.
    local attr_metrics = attr and attr.metrics
    if attr_metrics and attr_metrics[name] and attr_metrics[name].extra_labels then
        local labels = attr_metrics[name].extra_labels
        for _, kv in ipairs(labels) do
            local val, var_ref = next(kv)
            if ctx then
                -- strip the leading "$" and resolve from request variables
                val = ctx.var[var_ref:sub(2)]
                if val == nil then
                    val = ""
                end
            end
            core.table.insert(extra_labels_tbl, val)
        end
    end

    return extra_labels_tbl
end
local _M = {}
-- Register the metrics used by the stream (L4) subsystem: the per-route
-- connection counter plus whatever metrics the xrpc framework registers.
-- Assumes `prometheus` has already been initialized by the caller.
local function init_stream_metrics()
metrics.stream_connection_total = prometheus:counter("stream_connection_total",
"Total number of connections handled per stream route in APISIX",
{"route"})
-- let the xrpc protocol framework register its own metrics
xrpc.init_metrics(prometheus)
end
-- Register all HTTP-subsystem metrics on the shared `prometheus` registry.
-- Must run in the init/init_worker phase; no-op otherwise.
-- When `prometheus_enabled_in_stream` is truthy, the stream metrics are
-- registered here as well so both subsystems share one registry.
function _M.http_init(prometheus_enabled_in_stream)
-- todo: support hot reload, we may need to update the lua-prometheus
-- library
if ngx.get_phase() ~= "init" and ngx.get_phase() ~= "init_worker" then
return
end
-- drop any metric objects left from a previous init
clear_tab(metrics)
-- Newly added metrics should follow the naming best practices described in
-- https://prometheus.io/docs/practices/naming/#metric-names
-- For example,
-- 1. Add unit as the suffix
-- 2. Add `_total` as the suffix if the metric type is counter
-- 3. Use base unit
-- We keep the old metric names for the compatibility.
-- across all services
-- metric name prefix; overridable via plugin_attr("prometheus").metric_prefix
local metric_prefix = "apisix_"
local attr = plugin.plugin_attr("prometheus")
if attr and attr.metric_prefix then
metric_prefix = attr.metric_prefix
end
-- optional expiry (seconds) after which idle label sets are dropped
local exptime
if attr and attr.expire then
exptime = attr.expire
end
prometheus = base_prometheus.init("prometheus-metrics", metric_prefix)
-- nginx connection states (reading/writing/waiting/...)
metrics.connections = prometheus:gauge("nginx_http_current_connections",
"Number of HTTP connections",
{"state"})
metrics.requests = prometheus:gauge("http_requests_total",
"The total number of client requests since APISIX started")
metrics.etcd_reachable = prometheus:gauge("etcd_reachable",
"Config server etcd reachable from APISIX, 0 is unreachable")
metrics.node_info = prometheus:gauge("node_info",
"Info of APISIX node",
{"hostname"})
metrics.etcd_modify_indexes = prometheus:gauge("etcd_modify_indexes",
"Etcd modify index for APISIX keys",
{"key"})
metrics.shared_dict_capacity_bytes = prometheus:gauge("shared_dict_capacity_bytes",
"The capacity of each nginx shared DICT since APISIX start",
{"name"})
metrics.shared_dict_free_space_bytes = prometheus:gauge("shared_dict_free_space_bytes",
"The free space of each nginx shared DICT since APISIX start",
{"name"})
metrics.upstream_status = prometheus:gauge("upstream_status",
"Upstream status from health check",
{"name", "ip", "port"},
exptime)
-- per service
-- The consumer label indicates the name of consumer corresponds to the
-- request to the route/service, it will be an empty string if there is
-- no consumer in request.
-- extra_labels() is called WITHOUT ctx here, so it yields the configured
-- label NAMES to append to the fixed label list.
metrics.status = prometheus:counter("http_status",
"HTTP status codes per service in APISIX",
{"code", "route", "matched_uri", "matched_host", "service", "consumer", "node",
unpack(extra_labels("http_status"))},
exptime)
-- histogram buckets; overridable via plugin_attr("prometheus").default_buckets
local buckets = DEFAULT_BUCKETS
if attr and attr.default_buckets then
buckets = attr.default_buckets
end
metrics.latency = prometheus:histogram("http_latency",
"HTTP request latency in milliseconds per service in APISIX",
{"type", "route", "service", "consumer", "node", unpack(extra_labels("http_latency"))},
buckets, exptime)
metrics.bandwidth = prometheus:counter("bandwidth",
"Total bandwidth in bytes consumed per service in APISIX",
{"type", "route", "service", "consumer", "node", unpack(extra_labels("bandwidth"))},
exptime)
if prometheus_enabled_in_stream then
init_stream_metrics()
end
end
-- Register the metrics used by the stream (L4) subsystem.
-- Must run in the init/init_worker phase, and requires the APISIX-Runtime
-- meta-lua FFI shdict symbol used by the prometheus library.
function _M.stream_init()
    local phase = ngx.get_phase()
    if phase ~= "init" and phase ~= "init_worker" then
        return
    end

    -- referencing an undeclared FFI symbol raises, hence the pcall probe
    local runtime_ok = pcall(function()
        return C.ngx_meta_lua_ffi_shdict_udata_to_zone
    end)
    if not runtime_ok then
        core.log.error("need to build APISIX-Runtime to support L4 metrics")
        return
    end

    -- drop any metric objects left from a previous init
    clear_tab(metrics)

    local attr = plugin.plugin_attr("prometheus")
    local prefix = "apisix_"
    if attr and attr.metric_prefix then
        prefix = attr.metric_prefix
    end

    prometheus = base_prometheus.init("prometheus-metrics", prefix)

    init_stream_metrics()
end
-- Per-request log hook for the HTTP subsystem: records status, latency
-- and bandwidth metrics for the matched route/service.
--
-- ORDER MATTERS in this function: gen_arr() and extra_labels() both
-- return shared, reused tables, so each result must be consumed by the
-- metric call before the next gen_arr()/extra_labels() invocation.
function _M.http_log(conf, ctx)
local vars = ctx.var
local route_id = ""
local balancer_ip = ctx.balancer_ip or ""
local service_id = ""
local consumer_name = ctx.consumer_name or ""
local matched_route = ctx.matched_route and ctx.matched_route.value
if matched_route then
route_id = matched_route.id
service_id = matched_route.service_id or ""
-- optionally report human-readable names instead of ids
if conf.prefer_name == true then
route_id = matched_route.name or route_id
if service_id ~= "" then
local service = service_fetch(service_id)
service_id = service and service.value.name or service_id
end
end
end
-- the uri/host patterns that actually matched the router
local matched_uri = ""
local matched_host = ""
if ctx.curr_req_matched then
matched_uri = ctx.curr_req_matched._path or ""
matched_host = ctx.curr_req_matched._host or ""
end
metrics.status:inc(1,
gen_arr(vars.status, route_id, matched_uri, matched_host,
service_id, consumer_name, balancer_ip,
unpack(extra_labels("http_status", ctx))))
-- request/upstream/apisix latency components in milliseconds
local latency, upstream_latency, apisix_latency = latency_details(ctx)
-- cache the shared table once; valid until the next extra_labels() call
-- (i.e. through all three observe() calls below, before "bandwidth")
local latency_extra_label_values = extra_labels("http_latency", ctx)
metrics.latency:observe(latency,
gen_arr("request", route_id, service_id, consumer_name, balancer_ip,
unpack(latency_extra_label_values)))
-- upstream latency is absent when no upstream was contacted
if upstream_latency then
metrics.latency:observe(upstream_latency,
gen_arr("upstream", route_id, service_id, consumer_name, balancer_ip,
unpack(latency_extra_label_values)))
end
metrics.latency:observe(apisix_latency,
gen_arr("apisix", route_id, service_id, consumer_name, balancer_ip,
unpack(latency_extra_label_values)))
-- this call clobbers the shared extra-labels table; all latency
-- observations above must already be done
local bandwidth_extra_label_values = extra_labels("bandwidth", ctx)
metrics.bandwidth:inc(vars.request_length,
gen_arr("ingress", route_id, service_id, consumer_name, balancer_ip,
unpack(bandwidth_extra_label_values)))
metrics.bandwidth:inc(vars.bytes_sent,
gen_arr("egress", route_id, service_id, consumer_name, balancer_ip,
unpack(bandwidth_extra_label_values)))
end
-- Per-connection log hook for the stream subsystem: bump the connection
-- counter labelled with the matched stream route.
function _M.stream_log(conf, ctx)
    local matched = ctx.matched_route and ctx.matched_route.value

    local route_label = ""
    if matched then
        route_label = matched.id
        -- prefer the human-readable route name when configured and present
        if conf.prefer_name == true and matched.name then
            route_label = matched.name
        end
    end

    metrics.stream_connection_total:inc(1, gen_arr(route_label))
end
-- Field names in the order the numbers appear in the stub_status body.
local ngx_status_items = {"active", "accepted", "handled", "total",
                          "reading", "writing", "waiting"}
-- shared label table, reused to avoid per-scrape allocation
local label_values = {}

-- Scrape the local stub_status endpoint and export the connection and
-- request gauges. Expected body layout:
--   Active connections: 2
--   server accepts handled requests
--    26 26 84
--   Reading: 0 Writing: 1 Waiting: 1
local function nginx_status()
    local res = ngx_capture("/apisix/nginx_status")
    if not res or res.status ~= 200 then
        core.log.error("failed to fetch Nginx status")
        return
    end

    -- pull the numbers out in document order; they line up one-to-one
    -- with ngx_status_items
    local iterator, err = re_gmatch(res.body, [[(\d+)]], "jmo")
    if not iterator then
        core.log.error("failed to re.gmatch Nginx status: ", err)
        return
    end

    for _, item in ipairs(ngx_status_items) do
        local matched = iterator()
        if not matched then
            break
        end

        if item == "total" then
            -- cumulative request count goes to its own gauge
            metrics.requests:set(matched[0])
        else
            label_values[1] = item
            metrics.connections:set(matched[0], label_values)
        end
    end
end
-- shared label table for the etcd_modify_indexes gauge
local key_values = {}

-- Export, under the label `key`, the largest etcd modify index found in
-- `items` (skipped entirely when `items_ver`/`items` are missing), then
-- return the running global maximum.
local function set_modify_index(key, items, items_ver, global_max_index)
    clear_tab(key_values)

    local biggest = 0
    if items_ver and items then
        for _, item in ipairs(items) do
            if type(item) == "table" then
                -- orig_modifiedIndex wins when present
                local idx = item.orig_modifiedIndex or item.modifiedIndex
                if idx > biggest then
                    biggest = idx
                end
            end
        end
    end

    key_values[1] = key
    metrics.etcd_modify_indexes:set(biggest, key_values)

    if biggest > global_max_index then
        return biggest
    end
    return global_max_index
end
-- Export one gauge sample per configuration type holding its newest etcd
-- modify index, plus "prev_index" for global rules and "max_modify_index"
-- for the overall maximum.
local function etcd_modify_index()
    clear_tab(key_values)
    local overall_max = 0

    local routes, routes_ver = get_routes()
    overall_max = set_modify_index("routes", routes, routes_ver, overall_max)

    local services, services_ver = get_services()
    overall_max = set_modify_index("services", services, services_ver, overall_max)

    local ssls, ssls_ver = get_ssls()
    overall_max = set_modify_index("ssls", ssls, ssls_ver, overall_max)

    local consumers, consumers_ver = get_consumers()
    overall_max = set_modify_index("consumers", consumers, consumers_ver, overall_max)

    local global_rules, global_rules_ver = get_global_rules()
    if global_rules then
        overall_max = set_modify_index("global_rules", global_rules,
                                       global_rules_ver, overall_max)

        -- also expose the previous index tracked for global rules
        key_values[1] = "prev_index"
        metrics.etcd_modify_indexes:set(get_global_rules_prev_index(), key_values)
    else
        -- still emit a zero sample so the series exists
        overall_max = set_modify_index("global_rules", nil, nil, overall_max)
    end

    local upstreams, upstreams_ver = get_upstreams()
    overall_max = set_modify_index("upstreams", upstreams, upstreams_ver, overall_max)

    local stream_routes, stream_routes_ver = get_stream_routes()
    overall_max = set_modify_index("stream_routes", stream_routes,
                                   stream_routes_ver, overall_max)

    local protos, protos_ver = get_protos()
    overall_max = set_modify_index("protos", protos, protos_ver, overall_max)

    key_values[1] = "max_modify_index"
    metrics.etcd_modify_indexes:set(overall_max, key_values)
end
-- Export capacity and free space for every nginx shared DICT zone.
local function shared_dict_status()
    local label = {}
    for dict_name, dict in pairs(ngx.shared) do
        label[1] = dict_name
        metrics.shared_dict_capacity_bytes:set(dict:capacity(), label)
        metrics.shared_dict_free_space_bytes:set(dict:free_space(), label)
    end
end
-- Handler for the metrics endpoint: refresh all gauge-style metrics
-- (shared dicts, nginx status, etcd indexes, node info, upstream health)
-- and render the whole registry as the Prometheus text exposition.
-- Returns (status_code, body). `stream_only` skips the etcd probing that
-- is unavailable when only the stream subsystem is enabled.
local function collect(ctx, stream_only)
if not prometheus or not metrics then
core.log.error("prometheus: plugin is not initialized, please make sure ",
" 'prometheus_metrics' shared dict is present in nginx template")
return 500, {message = "An unexpected error occurred"}
end
-- collect ngx.shared.DICT status
shared_dict_status()
-- across all services
nginx_status()
local config = core.config.new()
-- config server status
local vars = ngx.var or {}
local hostname = vars.hostname or ""
-- we can't get etcd index in metric server if only stream subsystem is enabled
if config.type == "etcd" and not stream_only then
-- etcd modify index
etcd_modify_index()
-- reachability probe: 1 when the config server answered, 0 otherwise
local version, err = config:server_version()
if version then
metrics.etcd_reachable:set(1)
else
metrics.etcd_reachable:set(0)
core.log.error("prometheus: failed to reach config server while ",
"processing metrics endpoint: ", err)
end
-- Because request any key from etcd will return the "X-Etcd-Index".
-- A non-existed key is preferred because it doesn't return too much data.
-- So use phantom key to get etcd index.
local res, _ = config:getkey("/phantomkey")
if res and res.headers then
clear_tab(key_values)
-- global max
key_values[1] = "x_etcd_index"
metrics.etcd_modify_indexes:set(res.headers["X-Etcd-Index"], key_values)
end
end
metrics.node_info:set(1, gen_arr(hostname))
-- update upstream_status metrics
-- reset() first so stale label sets from removed checkers disappear
local stats = control.get_health_checkers()
metrics.upstream_status:reset()
for _, stat in ipairs(stats) do
for _, node in ipairs(stat.nodes) do
metrics.upstream_status:set(
(node.status == "healthy" or node.status == "mostly_healthy") and 1 or 0,
gen_arr(stat.name, node.ip, node.port)
)
end
end
core.response.set_header("content_type", "text/plain")
return 200, core.table.concat(prometheus:metric_data())
end
-- exported so the export server / api router can reuse the same handler
_M.collect = collect
-- Build the descriptor for the metrics API endpoint.
-- The export uri defaults to /apisix/prometheus/metrics and can be
-- overridden via plugin_attr("prometheus").export_uri.
--
-- When `called_by_api_router` is false the bare route table is returned
-- (used by the standalone export server); when true, a list is returned
-- for the api router — empty if the dedicated export server is enabled,
-- so the route is not registered twice.
local function get_api(called_by_api_router)
    local export_uri = default_export_uri
    local attr = plugin.plugin_attr(plugin_name)
    if attr and attr.export_uri then
        export_uri = attr.export_uri
    end

    local api = {
        methods = {"GET"},
        uri = export_uri,
        handler = collect
    }

    if not called_by_api_router then
        return api
    end

    -- fix: `attr` may be nil when no plugin_attr section is configured;
    -- the unguarded `attr.enable_export_server` here could raise while
    -- every other access above already used the `attr and ...` guard
    if attr and attr.enable_export_server then
        return {}
    end

    return {api}
end
_M.get_api = get_api
-- Request handler for the standalone export server: serve the metrics
-- endpoint when uri and method match, otherwise answer 404.
function _M.export_metrics(stream_only)
    local api = get_api(false)

    -- guard clause: anything but `GET <export_uri>` is not found
    if ngx.var.uri ~= api.uri or ngx.req.get_method() ~= api.methods[1] then
        return core.response.exit(404)
    end

    local code, body = api.handler(nil, stream_only)
    if code or body then
        core.response.exit(code, body)
    end

    -- unreachable when the handler exited above; kept as a safety net
    return core.response.exit(404)
end
-- Forward the rendered metric data from the underlying prometheus
-- registry (tail call preserves all return values).
function _M.metric_data()
return prometheus:metric_data()
end
-- Accessor for the shared prometheus registry object (nil before init).
function _M.get_prometheus()
return prometheus
end
return _M