blob: 87da8bcf86201cbe5af9bd0842a4c879e3f3a660 [file] [log] [blame]
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local require = require
require("apisix.patch").patch()
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local plugin_config = require("apisix.plugin_config")
local script = require("apisix.script")
local service_fetch = require("apisix.http.service").get
local admin_init = require("apisix.admin.init")
local get_var = require("resty.ngxvar").fetch
local router = require("apisix.router")
local apisix_upstream = require("apisix.upstream")
local set_upstream = apisix_upstream.set_by_route
local upstream_util = require("apisix.utils.upstream")
local ctxdump = require("resty.ctxdump")
local ipmatcher = require("resty.ipmatcher")
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_exit = ngx.exit
local math = math
local error = error
local ipairs = ipairs
local tostring = tostring
local ngx_now = ngx.now
local ngx_var = ngx.var
local str_byte = string.byte
local str_sub = string.sub
local tonumber = tonumber
local control_api_router
local is_http = false
if ngx.config.subsystem == "http" then
is_http = true
control_api_router = require("apisix.control.router")
end
local load_balancer
local local_conf
local ver_header = "APISIX/" .. core.version.VERSION
local _M = {version = 0.4}
function _M.http_init(args)
require("resty.core")
if require("ffi").os == "Linux" then
require("ngx.re").opt("jit_stack_size", 200 * 1024)
end
require("jit.opt").start("minstitch=2", "maxtrace=4000",
"maxrecord=8000", "sizemcode=64",
"maxmcode=4000", "maxirconst=1000")
core.resolver.init_resolver(args)
core.id.init()
local process = require("ngx.process")
local ok, err = process.enable_privileged_agent()
if not ok then
core.log.error("failed to enable privileged_agent: ", err)
end
if core.config.init then
local ok, err = core.config.init()
if not ok then
core.log.error("failed to load the configuration: ", err)
end
end
end
function _M.http_init_worker()
local seed, err = core.utils.get_seed_from_urandom()
if not seed then
core.log.warn('failed to get seed from urandom: ', err)
seed = ngx_now() * 1000 + ngx.worker.pid()
end
math.randomseed(seed)
-- for testing only
core.log.info("random test in [1, 10000]: ", math.random(1, 10000))
local we = require("resty.worker.events")
local ok, err = we.configure({shm = "worker-events", interval = 0.1})
if not ok then
error("failed to init worker event: " .. err)
end
local discovery = require("apisix.discovery.init").discovery
if discovery and discovery.init_worker then
discovery.init_worker()
end
require("apisix.balancer").init_worker()
load_balancer = require("apisix.balancer").run
require("apisix.admin.init").init_worker()
require("apisix.timers").init_worker()
plugin.init_worker()
router.http_init_worker()
require("apisix.http.service").init_worker()
plugin_config.init_worker()
require("apisix.consumer").init_worker()
if core.config == require("apisix.core.config_yaml") then
core.config.init_worker()
end
require("apisix.debug").init_worker()
apisix_upstream.init_worker()
require("apisix.plugins.ext-plugin.init").init_worker()
local_conf = core.config.local_conf()
if local_conf.apisix and local_conf.apisix.enable_server_tokens == false then
ver_header = "APISIX"
end
end
function _M.http_ssl_phase()
local ngx_ctx = ngx.ctx
local api_ctx = ngx_ctx.api_ctx
if api_ctx == nil then
api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
ngx_ctx.api_ctx = api_ctx
end
local ok, err = router.router_ssl.match_and_set(api_ctx)
if not ok then
if err then
core.log.error("failed to fetch ssl config: ", err)
end
ngx_exit(-1)
end
end
local function parse_domain_for_nodes(nodes)
local new_nodes = core.table.new(#nodes, 0)
for _, node in ipairs(nodes) do
local host = node.host
if not ipmatcher.parse_ipv4(host) and
not ipmatcher.parse_ipv6(host) then
local ip, err = core.resolver.parse_domain(host)
if ip then
local new_node = core.table.clone(node)
new_node.host = ip
new_node.domain = host
core.table.insert(new_nodes, new_node)
end
if err then
core.log.error("dns resolver domain: ", host, " error: ", err)
end
else
core.table.insert(new_nodes, node)
end
end
return new_nodes
end
local function parse_domain_in_up(up)
local nodes = up.value.nodes
local new_nodes, err = parse_domain_for_nodes(nodes)
if not new_nodes then
return nil, err
end
local ok = upstream_util.compare_upstream_node(up.dns_value, new_nodes)
if ok then
return up
end
if not up.orig_modifiedIndex then
up.orig_modifiedIndex = up.modifiedIndex
end
up.modifiedIndex = up.orig_modifiedIndex .. "#" .. ngx_now()
up.dns_value = core.table.clone(up.value)
up.dns_value.nodes = new_nodes
core.log.info("resolve upstream which contain domain: ",
core.json.delay_encode(up, true))
return up
end
local function parse_domain_in_route(route)
local nodes = route.value.upstream.nodes
local new_nodes, err = parse_domain_for_nodes(nodes)
if not new_nodes then
return nil, err
end
local up_conf = route.dns_value and route.dns_value.upstream
local ok = upstream_util.compare_upstream_node(up_conf, new_nodes)
if ok then
return route
end
-- don't modify the modifiedIndex to avoid plugin cache miss because of DNS resolve result
-- has changed
route.dns_value = core.table.deepcopy(route.value)
route.dns_value.upstream.nodes = new_nodes
core.log.info("parse route which contain domain: ",
core.json.delay_encode(route, true))
return route
end
local function set_upstream_host(api_ctx)
local pass_host = api_ctx.pass_host or "pass"
if pass_host == "pass" then
return
end
if pass_host == "rewrite" then
api_ctx.var.upstream_host = api_ctx.upstream_host
return
end
-- only support single node for `node` mode currently
local host
local up_conf = api_ctx.upstream_conf
local nodes_count = up_conf.nodes and #up_conf.nodes or 0
if nodes_count == 1 then
local node = up_conf.nodes[1]
if node.domain and #node.domain > 0 then
host = node.domain
else
host = node.host
end
end
if host then
api_ctx.var.upstream_host = host
end
end
local function get_upstream_by_id(up_id)
local upstreams = core.config.fetch_created_obj("/upstreams")
if upstreams then
local upstream = upstreams:get(tostring(up_id))
if not upstream then
core.log.error("failed to find upstream by id: " .. up_id)
if is_http then
return core.response.exit(502)
end
return ngx_exit(1)
end
if upstream.has_domain then
local err
upstream, err = parse_domain_in_up(upstream)
if err then
core.log.error("failed to get resolved upstream: ", err)
if is_http then
return core.response.exit(500)
end
return ngx_exit(1)
end
end
core.log.info("parsed upstream: ", core.json.delay_encode(upstream))
return upstream.dns_value or upstream.value
end
end
function _M.http_access_phase()
local ngx_ctx = ngx.ctx
if ngx_ctx.api_ctx and ngx_ctx.api_ctx.ssl_client_verified then
local res = ngx_var.ssl_client_verify
if res ~= "SUCCESS" then
if res == "NONE" then
core.log.error("client certificate was not present")
else
core.log.error("clent certificate verification is not passed: ", res)
end
return core.response.exit(400)
end
end
-- always fetch table from the table pool, we don't need a reused api_ctx
local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
ngx_ctx.api_ctx = api_ctx
core.ctx.set_vars_meta(api_ctx)
local uri = api_ctx.var.uri
if local_conf.apisix and local_conf.apisix.delete_uri_tail_slash then
if str_byte(uri, #uri) == str_byte("/") then
api_ctx.var.uri = str_sub(api_ctx.var.uri, 1, #uri - 1)
core.log.info("remove the end of uri '/', current uri: ",
api_ctx.var.uri)
end
end
if router.api.has_route_not_under_apisix() or
core.string.has_prefix(uri, "/apisix/")
then
local skip = local_conf and local_conf.apisix.global_rule_skip_internal_api
local matched = router.api.match(api_ctx, skip)
if matched then
return
end
end
router.router_http.match(api_ctx)
-- run global rule
plugin.run_global_rules(api_ctx, router.global_rules, nil)
local route = api_ctx.matched_route
if not route then
core.log.info("not find any matched route")
return core.response.exit(404,
{error_msg = "404 Route Not Found"})
end
core.log.info("matched route: ",
core.json.delay_encode(api_ctx.matched_route, true))
local enable_websocket = route.value.enable_websocket
if route.value.plugin_config_id then
local conf = plugin_config.get(route.value.plugin_config_id)
if not conf then
core.log.error("failed to fetch plugin config by ",
"id: ", route.value.plugin_config_id)
return core.response.exit(503)
end
route = plugin_config.merge(route, conf)
end
if route.value.service_id then
local service = service_fetch(route.value.service_id)
if not service then
core.log.error("failed to fetch service configuration by ",
"id: ", route.value.service_id)
return core.response.exit(404)
end
route = plugin.merge_service_route(service, route)
api_ctx.matched_route = route
api_ctx.conf_type = "route&service"
api_ctx.conf_version = route.modifiedIndex .. "&" .. service.modifiedIndex
api_ctx.conf_id = route.value.id .. "&" .. service.value.id
api_ctx.service_id = service.value.id
api_ctx.service_name = service.value.name
if enable_websocket == nil then
enable_websocket = service.value.enable_websocket
end
else
api_ctx.conf_type = "route"
api_ctx.conf_version = route.modifiedIndex
api_ctx.conf_id = route.value.id
end
api_ctx.route_id = route.value.id
api_ctx.route_name = route.value.name
if route.value.script then
script.load(route, api_ctx)
script.run("access", api_ctx)
else
local plugins = plugin.filter(route)
api_ctx.plugins = plugins
plugin.run_plugin("rewrite", plugins, api_ctx)
if api_ctx.consumer then
local changed
route, changed = plugin.merge_consumer_route(
route,
api_ctx.consumer,
api_ctx
)
core.log.info("find consumer ", api_ctx.consumer.username,
", config changed: ", changed)
if changed then
core.table.clear(api_ctx.plugins)
api_ctx.plugins = plugin.filter(route, api_ctx.plugins)
end
end
plugin.run_plugin("access", plugins, api_ctx)
end
local up_id = route.value.upstream_id
-- used for the traffic-split plugin
if api_ctx.upstream_id then
up_id = api_ctx.upstream_id
end
if up_id then
local upstream = get_upstream_by_id(up_id)
api_ctx.matched_upstream = upstream
if upstream and upstream.pass_host then
api_ctx.pass_host = upstream.pass_host
api_ctx.upstream_host = upstream.upstream_host
end
else
if route.has_domain then
local err
route, err = parse_domain_in_route(route)
if err then
core.log.error("failed to get resolved route: ", err)
return core.response.exit(500)
end
api_ctx.conf_version = route.modifiedIndex
api_ctx.matched_route = route
end
local route_val = route.value
if route_val.upstream and route_val.upstream.enable_websocket then
enable_websocket = true
end
if route_val.upstream and route_val.upstream.pass_host then
api_ctx.pass_host = route_val.upstream.pass_host
api_ctx.upstream_host = route_val.upstream.upstream_host
end
api_ctx.matched_upstream = (route.dns_value and
route.dns_value.upstream)
or route_val.upstream
end
if enable_websocket then
api_ctx.var.upstream_upgrade = api_ctx.var.http_upgrade
api_ctx.var.upstream_connection = api_ctx.var.http_connection
core.log.info("enabled websocket for route: ", route.value.id)
end
if route.value.service_protocol == "grpc" then
api_ctx.upstream_scheme = "grpc"
end
local code, err = set_upstream(route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)
core.response.exit(code)
end
set_upstream_host(api_ctx)
local up_scheme = api_ctx.upstream_scheme
if up_scheme == "grpcs" or up_scheme == "grpc" then
ngx_var.ctx_ref = ctxdump.stash_ngx_ctx()
return ngx.exec("@grpc_pass")
end
if api_ctx.dubbo_proxy_enabled then
ngx_var.ctx_ref = ctxdump.stash_ngx_ctx()
return ngx.exec("@dubbo_pass")
end
end
function _M.dubbo_access_phase()
ngx.ctx = ctxdump.apply_ngx_ctx(ngx_var.ctx_ref)
end
function _M.grpc_access_phase()
ngx.ctx = ctxdump.apply_ngx_ctx(ngx_var.ctx_ref)
local api_ctx = ngx.ctx.api_ctx
if not api_ctx then
return
end
local code, err = apisix_upstream.set_grpcs_upstream_param(api_ctx)
if code then
core.log.error("failed to set grpcs upstream param: ", err)
core.response.exit(code)
end
end
local function common_phase(phase_name)
local api_ctx = ngx.ctx.api_ctx
if not api_ctx then
return
end
plugin.run_global_rules(api_ctx, api_ctx.global_rules, phase_name)
if api_ctx.script_obj then
script.run(phase_name, api_ctx)
else
plugin.run_plugin(phase_name, nil, api_ctx)
end
return api_ctx
end
local function set_resp_upstream_status(up_status)
core.response.set_header("X-APISIX-Upstream-Status", up_status)
core.log.info("X-APISIX-Upstream-Status: ", up_status)
end
function _M.http_header_filter_phase()
core.response.set_header("Server", ver_header)
local up_status = get_var("upstream_status")
if up_status and #up_status == 3
and tonumber(up_status) >= 500
and tonumber(up_status) <= 599
then
set_resp_upstream_status(up_status)
elseif up_status and #up_status > 3 then
-- the up_status can be "502, 502" or "502, 502 : "
local last_status
if str_byte(up_status, -1) == str_byte(" ") then
last_status = str_sub(up_status, -6, -3)
else
last_status = str_sub(up_status, -3)
end
if tonumber(last_status) >= 500 and tonumber(last_status) <= 599 then
set_resp_upstream_status(up_status)
end
end
common_phase("header_filter")
end
function _M.http_body_filter_phase()
common_phase("body_filter")
end
local function healthcheck_passive(api_ctx)
local checker = api_ctx.up_checker
if not checker then
return
end
local up_conf = api_ctx.upstream_conf
local passive = up_conf.checks.passive
if not passive then
return
end
core.log.info("enabled healthcheck passive")
local host = up_conf.checks and up_conf.checks.active
and up_conf.checks.active.host
local port = up_conf.checks and up_conf.checks.active
and up_conf.checks.active.port
local resp_status = ngx.status
local http_statuses = passive and passive.healthy and
passive.healthy.http_statuses
core.log.info("passive.healthy.http_statuses: ",
core.json.delay_encode(http_statuses))
if http_statuses then
for i, status in ipairs(http_statuses) do
if resp_status == status then
checker:report_http_status(api_ctx.balancer_ip,
port or api_ctx.balancer_port,
host,
resp_status)
end
end
end
http_statuses = passive and passive.unhealthy and
passive.unhealthy.http_statuses
core.log.info("passive.unhealthy.http_statuses: ",
core.json.delay_encode(http_statuses))
if not http_statuses then
return
end
for i, status in ipairs(http_statuses) do
if resp_status == status then
checker:report_http_status(api_ctx.balancer_ip,
port or api_ctx.balancer_port,
host,
resp_status)
end
end
end
function _M.http_log_phase()
local api_ctx = common_phase("log")
healthcheck_passive(api_ctx)
if api_ctx.server_picker and api_ctx.server_picker.after_balance then
api_ctx.server_picker.after_balance(api_ctx, false)
end
if api_ctx.uri_parse_param then
core.tablepool.release("uri_parse_param", api_ctx.uri_parse_param)
end
core.ctx.release_vars(api_ctx)
if api_ctx.plugins and api_ctx.plugins ~= core.empty_tab then
core.tablepool.release("plugins", api_ctx.plugins)
end
if api_ctx.curr_req_matched then
core.tablepool.release("matched_route_record", api_ctx.curr_req_matched)
end
core.tablepool.release("api_ctx", api_ctx)
end
function _M.http_balancer_phase()
local api_ctx = ngx.ctx.api_ctx
if not api_ctx then
core.log.error("invalid api_ctx")
return core.response.exit(500)
end
load_balancer(api_ctx.matched_route, api_ctx)
end
local function cors_admin()
local_conf = core.config.local_conf()
if local_conf.apisix and not local_conf.apisix.enable_admin_cors then
return
end
local method = get_method()
if method == "OPTIONS" then
core.response.set_header("Access-Control-Allow-Origin", "*",
"Access-Control-Allow-Methods",
"POST, GET, PUT, OPTIONS, DELETE, PATCH",
"Access-Control-Max-Age", "3600",
"Access-Control-Allow-Headers", "*",
"Access-Control-Allow-Credentials", "true",
"Content-Length", "0",
"Content-Type", "text/plain")
ngx_exit(200)
end
core.response.set_header("Access-Control-Allow-Origin", "*",
"Access-Control-Allow-Credentials", "true",
"Access-Control-Expose-Headers", "*",
"Access-Control-Max-Age", "3600")
end
local function add_content_type()
core.response.set_header("Content-Type", "application/json")
end
do
local router
function _M.http_admin()
if not router then
router = admin_init.get()
end
-- add cors rsp header
cors_admin()
-- add content type to rsp header
add_content_type()
-- core.log.info("uri: ", get_var("uri"), " method: ", get_method())
local ok = router:dispatch(get_var("uri"), {method = get_method()})
if not ok then
ngx_exit(404)
end
end
end -- do
function _M.http_control()
local ok = control_api_router.match(get_var("uri"))
if not ok then
ngx_exit(404)
end
end
function _M.stream_init(args)
core.log.info("enter stream_init")
core.resolver.init_resolver(args)
if core.config.init then
local ok, err = core.config.init()
if not ok then
core.log.error("failed to load the configuration: ", err)
end
end
end
function _M.stream_init_worker()
core.log.info("enter stream_init_worker")
local seed, err = core.utils.get_seed_from_urandom()
if not seed then
core.log.warn('failed to get seed from urandom: ', err)
seed = ngx_now() * 1000 + ngx.worker.pid()
end
math.randomseed(seed)
-- for testing only
core.log.info("random stream test in [1, 10000]: ", math.random(1, 10000))
plugin.init_worker()
router.stream_init_worker()
apisix_upstream.init_worker()
if core.config == require("apisix.core.config_yaml") then
core.config.init_worker()
end
load_balancer = require("apisix.balancer").run
local_conf = core.config.local_conf()
end
function _M.stream_preread_phase()
core.log.info("enter stream_preread_phase")
local ngx_ctx = ngx.ctx
local api_ctx = ngx_ctx.api_ctx
if not api_ctx then
api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
ngx_ctx.api_ctx = api_ctx
end
core.ctx.set_vars_meta(api_ctx)
router.router_stream.match(api_ctx)
core.log.info("matched route: ",
core.json.delay_encode(api_ctx.matched_route, true))
local matched_route = api_ctx.matched_route
if not matched_route then
return ngx_exit(1)
end
local up_id = matched_route.value.upstream_id
if up_id then
api_ctx.matched_upstream = get_upstream_by_id(up_id)
else
api_ctx.matched_upstream = matched_route.value.upstream
end
local plugins = core.tablepool.fetch("plugins", 32, 0)
api_ctx.plugins = plugin.stream_filter(matched_route, plugins)
-- core.log.info("valid plugins: ", core.json.delay_encode(plugins, true))
api_ctx.conf_type = "stream/route"
api_ctx.conf_version = matched_route.modifiedIndex
api_ctx.conf_id = matched_route.value.id
plugin.run_plugin("preread", plugins, api_ctx)
local code, err = set_upstream(matched_route, api_ctx)
if code then
core.log.error("failed to set upstream: ", err)
return ngx_exit(1)
end
end
function _M.stream_balancer_phase()
core.log.info("enter stream_balancer_phase")
local api_ctx = ngx.ctx.api_ctx
if not api_ctx then
core.log.error("invalid api_ctx")
return ngx_exit(1)
end
load_balancer(api_ctx.matched_route, api_ctx)
end
function _M.stream_log_phase()
core.log.info("enter stream_log_phase")
-- core.ctx.release_vars(api_ctx)
plugin.run_plugin("log")
end
return _M