| -- |
| -- Licensed to the Apache Software Foundation (ASF) under one or more |
| -- contributor license agreements. See the NOTICE file distributed with |
| -- this work for additional information regarding copyright ownership. |
| -- The ASF licenses this file to You under the Apache License, Version 2.0 |
| -- (the "License"); you may not use this file except in compliance with |
| -- the License. You may obtain a copy of the License at |
| -- |
| -- http://www.apache.org/licenses/LICENSE-2.0 |
| -- |
| -- Unless required by applicable law or agreed to in writing, software |
| -- distributed under the License is distributed on an "AS IS" BASIS, |
| -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| -- See the License for the specific language governing permissions and |
| -- limitations under the License. |
| -- |
| local base_prometheus = require("prometheus") |
| local core = require("apisix.core") |
| local plugin = require("apisix.plugin") |
| local control = require("apisix.control.v1") |
| local ipairs = ipairs |
| local pairs = pairs |
| local ngx = ngx |
| local re_gmatch = ngx.re.gmatch |
| local ffi = require("ffi") |
| local C = ffi.C |
| local pcall = pcall |
| local select = select |
| local type = type |
| local prometheus |
| local router = require("apisix.router") |
| local get_routes = router.http_routes |
| local get_ssls = router.ssls |
| local get_services = require("apisix.http.service").services |
| local get_consumers = require("apisix.consumer").consumers |
| local get_upstreams = require("apisix.upstream").upstreams |
| local get_global_rules = require("apisix.global_rules").global_rules |
| local get_global_rules_prev_index = require("apisix.global_rules").get_pre_index |
| local clear_tab = core.table.clear |
| local get_stream_routes = router.stream_routes |
| local get_protos = require("apisix.plugins.grpc-transcode.proto").protos |
| local service_fetch = require("apisix.http.service").get |
| local latency_details = require("apisix.utils.log-util").latency_details_in_ms |
| local xrpc = require("apisix.stream.xrpc") |
| local unpack = unpack |
| local next = next |
| |
| |
-- ngx.location.capture only exists in the http subsystem; in the stream
-- subsystem this stays nil and nginx_status() is never reachable.
local ngx_capture
if ngx.config.subsystem == "http" then
    ngx_capture = ngx.location.capture
end


local plugin_name = "prometheus"
local default_export_uri = "/apisix/prometheus/metrics"
-- Default set of latency buckets, 1ms to 60s:
local DEFAULT_BUCKETS = {1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 30000, 60000}

-- Registry of all metric objects created in http_init/stream_init; cleared
-- and repopulated on (re)initialization.
local metrics = {}
| |
-- Scratch table reused across gen_arr() calls to avoid per-request
-- allocations. Callers must consume the result before calling gen_arr again.
local inner_tab_arr = {}

-- Pack the varargs into the shared scratch array and return it.
-- select('#', ...) is used so trailing nils are still counted.
local function gen_arr(...)
    clear_tab(inner_tab_arr)
    local argc = select('#', ...)
    for idx = 1, argc do
        inner_tab_arr[idx] = select(idx, ...)
    end
    return inner_tab_arr
end
| |
-- Scratch table reused across extra_labels() calls; consumers must use the
-- result before the next call.
local extra_labels_tbl = {}

-- Resolve the user-configured extra labels for metric `name`.
-- Without `ctx` (init time) it returns the configured label NAMES, used to
-- register the metric; with `ctx` (log time) it returns the label VALUES
-- resolved from nginx variables (a missing variable resolves to "").
local function extra_labels(name, ctx)
    clear_tab(extra_labels_tbl)

    local attr = plugin.plugin_attr("prometheus")
    -- plugin_attr() may return nil when no plugin attribute is configured;
    -- guard before indexing to avoid a runtime error. Also avoid shadowing
    -- the module-level `metrics` table with the attribute field.
    local metrics_attr = attr and attr.metrics

    if metrics_attr and metrics_attr[name] and metrics_attr[name].extra_labels then
        local labels = metrics_attr[name].extra_labels
        for _, kv in ipairs(labels) do
            -- each entry is a single-pair table: {label_name = "$var_name"}
            local val, v = next(kv)
            if ctx then
                -- strip the leading "$" and look up the nginx variable
                val = ctx.var[v:sub(2)]
                if val == nil then
                    val = ""
                end
            end
            core.table.insert(extra_labels_tbl, val)
        end
    end

    return extra_labels_tbl
end
| |
| |
local _M = {}


-- Register the metrics that belong to the stream (L4) subsystem.
local function init_stream_metrics()
    local conn_counter = prometheus:counter("stream_connection_total",
            "Total number of connections handled per stream route in APISIX",
            {"route"})
    metrics.stream_connection_total = conn_counter

    -- let the xrpc framework register its protocol-specific metrics
    xrpc.init_metrics(prometheus)
end
| |
| |
-- Initialize the prometheus registry and register all HTTP-subsystem
-- metrics. Must run in the init/init_worker phase; no-ops elsewhere.
-- When `prometheus_enabled_in_stream` is truthy, stream metrics are
-- registered too so this worker can export both.
function _M.http_init(prometheus_enabled_in_stream)
    -- todo: support hot reload, we may need to update the lua-prometheus
    -- library
    if ngx.get_phase() ~= "init" and ngx.get_phase() ~= "init_worker" then
        return
    end

    -- drop any previously-registered metric objects before re-registering
    clear_tab(metrics)

    -- Newly added metrics should follow the naming best practices described in
    -- https://prometheus.io/docs/practices/naming/#metric-names
    -- For example,
    -- 1. Add unit as the suffix
    -- 2. Add `_total` as the suffix if the metric type is counter
    -- 3. Use base unit
    -- We keep the old metric names for the compatibility.

    -- across all services
    local metric_prefix = "apisix_"
    local attr = plugin.plugin_attr("prometheus")
    if attr and attr.metric_prefix then
        metric_prefix = attr.metric_prefix
    end

    -- optional per-metric expiry (seconds) for counter/gauge label sets
    local exptime
    if attr and attr.expire then
        exptime = attr.expire
    end

    prometheus = base_prometheus.init("prometheus-metrics", metric_prefix)

    metrics.connections = prometheus:gauge("nginx_http_current_connections",
            "Number of HTTP connections",
            {"state"})

    metrics.requests = prometheus:gauge("http_requests_total",
            "The total number of client requests since APISIX started")

    metrics.etcd_reachable = prometheus:gauge("etcd_reachable",
            "Config server etcd reachable from APISIX, 0 is unreachable")

    metrics.node_info = prometheus:gauge("node_info",
            "Info of APISIX node",
            {"hostname"})

    metrics.etcd_modify_indexes = prometheus:gauge("etcd_modify_indexes",
            "Etcd modify index for APISIX keys",
            {"key"})

    metrics.shared_dict_capacity_bytes = prometheus:gauge("shared_dict_capacity_bytes",
            "The capacity of each nginx shared DICT since APISIX start",
            {"name"})

    metrics.shared_dict_free_space_bytes = prometheus:gauge("shared_dict_free_space_bytes",
            "The free space of each nginx shared DICT since APISIX start",
            {"name"})

    metrics.upstream_status = prometheus:gauge("upstream_status",
            "Upstream status from health check",
            {"name", "ip", "port"},
            exptime)

    -- per service

    -- The consumer label indicates the name of consumer corresponds to the
    -- request to the route/service, it will be an empty string if there is
    -- no consumer in request.
    -- extra_labels() is called WITHOUT ctx here, so it yields the configured
    -- extra label NAMES to register alongside the built-in ones.
    metrics.status = prometheus:counter("http_status",
            "HTTP status codes per service in APISIX",
            {"code", "route", "matched_uri", "matched_host", "service", "consumer", "node",
            unpack(extra_labels("http_status"))},
            exptime)

    local buckets = DEFAULT_BUCKETS
    if attr and attr.default_buckets then
        buckets = attr.default_buckets
    end

    metrics.latency = prometheus:histogram("http_latency",
        "HTTP request latency in milliseconds per service in APISIX",
        {"type", "route", "service", "consumer", "node", unpack(extra_labels("http_latency"))},
        buckets, exptime)

    metrics.bandwidth = prometheus:counter("bandwidth",
            "Total bandwidth in bytes consumed per service in APISIX",
            {"type", "route", "service", "consumer", "node", unpack(extra_labels("bandwidth"))},
            exptime)

    if prometheus_enabled_in_stream then
        init_stream_metrics()
    end
end
| |
| |
-- Initialize the prometheus registry for the stream (L4) subsystem.
-- Must run in the init/init_worker phase; no-ops elsewhere.
function _M.stream_init()
    local phase = ngx.get_phase()
    if phase ~= "init" and phase ~= "init_worker" then
        return
    end

    -- the FFI symbol only exists in APISIX-Runtime builds; probe for it
    -- instead of crashing on a missing C declaration
    local has_shdict_ffi = pcall(function()
        return C.ngx_meta_lua_ffi_shdict_udata_to_zone
    end)
    if not has_shdict_ffi then
        core.log.error("need to build APISIX-Runtime to support L4 metrics")
        return
    end

    clear_tab(metrics)

    local prefix = "apisix_"
    local attr = plugin.plugin_attr("prometheus")
    if attr and attr.metric_prefix then
        prefix = attr.metric_prefix
    end

    prometheus = base_prometheus.init("prometheus-metrics", prefix)

    init_stream_metrics()
end
| |
| |
-- Log-phase handler: record status, latency and bandwidth metrics for the
-- finished request. `conf.prefer_name` swaps route/service ids for their
-- names when available.
-- NOTE: gen_arr() and extra_labels() both return shared scratch tables, so
-- each result must be consumed by the metric call before the next use.
function _M.http_log(conf, ctx)
    local vars = ctx.var

    local route_id = ""
    local balancer_ip = ctx.balancer_ip or ""
    local service_id = ""
    local consumer_name = ctx.consumer_name or ""

    local matched_route = ctx.matched_route and ctx.matched_route.value
    if matched_route then
        route_id = matched_route.id
        service_id = matched_route.service_id or ""
        if conf.prefer_name == true then
            -- fall back to the id when no name was configured
            route_id = matched_route.name or route_id
            if service_id ~= "" then
                local service = service_fetch(service_id)
                service_id = service and service.value.name or service_id
            end
        end
    end

    -- matched uri/host of the winning route rule, when the router recorded it
    local matched_uri = ""
    local matched_host = ""
    if ctx.curr_req_matched then
        matched_uri = ctx.curr_req_matched._path or ""
        matched_host = ctx.curr_req_matched._host or ""
    end

    metrics.status:inc(1,
        gen_arr(vars.status, route_id, matched_uri, matched_host,
                service_id, consumer_name, balancer_ip,
                unpack(extra_labels("http_status", ctx))))

    -- request/upstream/apisix latency components in milliseconds
    local latency, upstream_latency, apisix_latency = latency_details(ctx)
    local latency_extra_label_values = extra_labels("http_latency", ctx)

    metrics.latency:observe(latency,
        gen_arr("request", route_id, service_id, consumer_name, balancer_ip,
        unpack(latency_extra_label_values)))

    -- upstream latency may be nil (e.g. the request never reached upstream)
    if upstream_latency then
        metrics.latency:observe(upstream_latency,
            gen_arr("upstream", route_id, service_id, consumer_name, balancer_ip,
            unpack(latency_extra_label_values)))
    end

    metrics.latency:observe(apisix_latency,
        gen_arr("apisix", route_id, service_id, consumer_name, balancer_ip,
        unpack(latency_extra_label_values)))

    local bandwidth_extra_label_values = extra_labels("bandwidth", ctx)

    metrics.bandwidth:inc(vars.request_length,
        gen_arr("ingress", route_id, service_id, consumer_name, balancer_ip,
        unpack(bandwidth_extra_label_values)))

    metrics.bandwidth:inc(vars.bytes_sent,
        gen_arr("egress", route_id, service_id, consumer_name, balancer_ip,
        unpack(bandwidth_extra_label_values)))
end
| |
| |
-- Log-phase handler for the stream subsystem: count one finished
-- connection against the matched stream route.
function _M.stream_log(conf, ctx)
    local matched = ctx.matched_route and ctx.matched_route.value
    local route_id = ""
    if matched then
        if conf.prefer_name == true and matched.name then
            route_id = matched.name
        else
            route_id = matched.id
        end
    end

    metrics.stream_connection_total:inc(1, gen_arr(route_id))
end
| |
| |
-- Order matches the order the numbers appear in the stub_status body below.
local ngx_status_items = {"active", "accepted", "handled", "total",
                          "reading", "writing", "waiting"}
-- single-slot label table reused for the connections gauge
local label_values = {}

-- Scrape the internal /apisix/nginx_status location and export the
-- connection/request counters it reports.
local function nginx_status()
    local res = ngx_capture("/apisix/nginx_status")
    if not res or res.status ~= 200 then
        core.log.error("failed to fetch Nginx status")
        return
    end

    -- sample body:
    -- Active connections: 2
    -- server accepts handled requests
    --   26 26 84
    -- Reading: 0 Writing: 1 Waiting: 1

    -- pull every integer out of the body, in order
    local iterator, err = re_gmatch(res.body, [[(\d+)]], "jmo")
    if not iterator then
        core.log.error("failed to re.gmatch Nginx status: ", err)
        return
    end

    for i = 1, #ngx_status_items do
        local match = iterator()
        if not match then
            break
        end

        local item = ngx_status_items[i]
        if item == "total" then
            -- cumulative request count goes to its own gauge
            metrics.requests:set(match[0])
        else
            label_values[1] = item
            metrics.connections:set(match[0], label_values)
        end
    end
end
| |
| |
-- single-slot label table shared by the etcd index gauges
local key_values = {}

-- Export the largest etcd modifiedIndex among `items` under label `key`,
-- and return the running global maximum across all resource kinds.
local function set_modify_index(key, items, items_ver, global_max_index)
    clear_tab(key_values)

    local max_idx = 0
    if items and items_ver then
        for _, item in ipairs(items) do
            if type(item) == "table" then
                -- orig_modifiedIndex survives in-place updates; prefer it
                local idx = item.orig_modifiedIndex or item.modifiedIndex
                if idx > max_idx then
                    max_idx = idx
                end
            end
        end
    end

    key_values[1] = key
    metrics.etcd_modify_indexes:set(max_idx, key_values)

    if max_idx > global_max_index then
        return max_idx
    end
    return global_max_index
end
| |
| |
-- Export the per-resource etcd modify indexes plus the overall maximum,
-- so drift between the local cache and etcd can be observed.
local function etcd_modify_index()
    clear_tab(key_values)
    local global_max_idx = 0

    -- routes
    local routes, routes_ver = get_routes()
    global_max_idx = set_modify_index("routes", routes, routes_ver, global_max_idx)

    -- services
    local services, services_ver = get_services()
    global_max_idx = set_modify_index("services", services, services_ver, global_max_idx)

    -- ssls
    local ssls, ssls_ver = get_ssls()
    global_max_idx = set_modify_index("ssls", ssls, ssls_ver, global_max_idx)

    -- consumers
    local consumers, consumers_ver = get_consumers()
    global_max_idx = set_modify_index("consumers", consumers, consumers_ver, global_max_idx)

    -- global_rules
    local global_rules, global_rules_ver = get_global_rules()
    if global_rules then
        global_max_idx = set_modify_index("global_rules", global_rules,
            global_rules_ver, global_max_idx)

        -- prev_index
        -- key_values is the shared label table; slot 1 selects the gauge label
        key_values[1] = "prev_index"
        local prev_index = get_global_rules_prev_index()
        metrics.etcd_modify_indexes:set(prev_index, key_values)

    else
        -- still export a zero value so the label set stays present
        global_max_idx = set_modify_index("global_rules", nil, nil, global_max_idx)
    end

    -- upstreams
    local upstreams, upstreams_ver = get_upstreams()
    global_max_idx = set_modify_index("upstreams", upstreams, upstreams_ver, global_max_idx)

    -- stream_routes
    local stream_routes, stream_routes_ver = get_stream_routes()
    global_max_idx = set_modify_index("stream_routes", stream_routes,
        stream_routes_ver, global_max_idx)

    -- proto
    local protos, protos_ver = get_protos()
    global_max_idx = set_modify_index("protos", protos, protos_ver, global_max_idx)

    -- global max
    key_values[1] = "max_modify_index"
    metrics.etcd_modify_indexes:set(global_max_idx, key_values)

end
| |
| |
-- Export capacity and free space for every nginx shared dict.
local function shared_dict_status()
    local label = {}
    for dict_name, dict in pairs(ngx.shared) do
        label[1] = dict_name
        metrics.shared_dict_capacity_bytes:set(dict:capacity(), label)
        metrics.shared_dict_free_space_bytes:set(dict:free_space(), label)
    end
end
| |
| |
-- Handler for the metrics endpoint: refresh the gauge-style metrics that
-- are sampled at scrape time, then render the full exposition text.
-- Returns (status_code, body). `stream_only` skips the etcd-index section,
-- which is unavailable when only the stream subsystem is enabled.
local function collect(ctx, stream_only)
    if not prometheus or not metrics then
        core.log.error("prometheus: plugin is not initialized, please make sure ",
                     " 'prometheus_metrics' shared dict is present in nginx template")
        return 500, {message = "An unexpected error occurred"}
    end

    -- collect ngx.shared.DICT status
    shared_dict_status()

    -- across all services
    nginx_status()

    local config = core.config.new()

    -- config server status
    local vars = ngx.var or {}
    local hostname = vars.hostname or ""

    -- we can't get etcd index in metric server if only stream subsystem is enabled
    if config.type == "etcd" and not stream_only then
        -- etcd modify index
        etcd_modify_index()

        local version, err = config:server_version()
        if version then
            metrics.etcd_reachable:set(1)

        else
            metrics.etcd_reachable:set(0)
            core.log.error("prometheus: failed to reach config server while ",
                           "processing metrics endpoint: ", err)
        end

        -- Because request any key from etcd will return the "X-Etcd-Index".
        -- A non-existed key is preferred because it doesn't return too much data.
        -- So use phantom key to get etcd index.
        local res, _ = config:getkey("/phantomkey")
        if res and res.headers then
            clear_tab(key_values)
            -- global max
            key_values[1] = "x_etcd_index"
            metrics.etcd_modify_indexes:set(res.headers["X-Etcd-Index"], key_values)
        end
    end

    metrics.node_info:set(1, gen_arr(hostname))

    -- update upstream_status metrics
    -- reset first so nodes removed since the last scrape disappear
    local stats = control.get_health_checkers()
    metrics.upstream_status:reset()
    for _, stat in ipairs(stats) do
        for _, node in ipairs(stat.nodes) do
            metrics.upstream_status:set(
                (node.status == "healthy" or node.status == "mostly_healthy") and 1 or 0,
                gen_arr(stat.name, node.ip, node.port)
            )
        end
    end

    core.response.set_header("content_type", "text/plain")
    return 200, core.table.concat(prometheus:metric_data())
end
_M.collect = collect
| |
| |
-- Build the API descriptor for the metrics endpoint. When called by the
-- api router, returns a LIST of APIs (empty when the standalone export
-- server handles the endpoint instead); otherwise returns the single API.
local function get_api(called_by_api_router)
    local export_uri = default_export_uri
    local attr = plugin.plugin_attr(plugin_name)
    if attr and attr.export_uri then
        export_uri = attr.export_uri
    end

    local api = {
        methods = {"GET"},
        uri = export_uri,
        handler = collect
    }

    if not called_by_api_router then
        return api
    end

    -- attr may be nil when no plugin attribute is configured; the original
    -- unguarded `attr.enable_export_server` would raise here.
    if attr and attr.enable_export_server then
        return {}
    end

    return {api}
end
_M.get_api = get_api
| |
| |
-- Entry point for the standalone export server: serve the metrics page
-- when the request matches the configured uri/method, else respond 404.
function _M.export_metrics(stream_only)
    local api = get_api(false)
    local req_uri = ngx.var.uri
    local req_method = ngx.req.get_method()

    -- anything other than "GET <export_uri>" is not ours
    if req_uri ~= api.uri or req_method ~= api.methods[1] then
        return core.response.exit(404)
    end

    local code, body = api.handler(nil, stream_only)
    if code or body then
        core.response.exit(code, body)
    end

    return core.response.exit(404)
end
| |
| |
-- Render the raw exposition lines from the underlying prometheus registry.
function _M.metric_data()
    return prometheus:metric_data()
end
| |
-- Expose the shared prometheus registry (nil before http_init/stream_init).
function _M.get_prometheus()
    return prometheus
end

return _M