-- blob: 7864a0084dddc4c41a3b2b9d9fcf47d6d907d41e [file] [log] [blame]
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local require = require
local core = require("apisix.core")
local config_util = require("apisix.core.config_util")
local enable_debug = require("apisix.debug").enable_debug
local wasm = require("apisix.wasm")
local ngx_exit = ngx.exit
local pkg_loaded = package.loaded
local sort_tab = table.sort
local pcall = pcall
local ipairs = ipairs
local pairs = pairs
local type = type
local local_plugins = core.table.new(32, 0)
local ngx = ngx
local tostring = tostring
local error = error
local is_http = ngx.config.subsystem == "http"
local local_plugins_hash = core.table.new(0, 32)
local stream_local_plugins = core.table.new(32, 0)
local stream_local_plugins_hash = core.table.new(0, 32)
-- LRU cache created with ttl = 300 and count = 512; it is called by
-- _M.merge_service_route and _M.merge_consumer_route to reuse merged
-- configurations (ttl is presumably in seconds -- confirm against
-- core.lrucache).
local merged_route = core.lrucache.new({
ttl = 300, count = 512
})
-- Local configuration table; assigned by _M.load() when it is called
-- without an argument, and read later for the wasm plugin list and the
-- apisix.enable_admin switch.
local local_conf
-- Module table returned at the end of the file.
local _M = {
version = 0.3,
-- incremented every time load() finishes for the http plugins
load_times = 0,
-- http plugins as an array (sorted by priority) and as a name -> plugin map
plugins = local_plugins,
plugins_hash = local_plugins_hash,
-- incremented every time load_stream() finishes
stream_load_times= 0,
-- stream plugins as an array (sorted by priority) and as a name -> plugin map
stream_plugins = stream_local_plugins,
stream_plugins_hash = stream_local_plugins_hash,
}
-- Read the attributes configured for one plugin.
-- Looks up the path `plugin_attr` -> `name` in the table returned by
-- core.config.local_conf().
-- @param name plugin name used as the key below `plugin_attr`
-- @return the value found by core.table.try_read_attr for that path
local function plugin_attr(name)
    -- TODO: get attr from synchronized data
    local conf = core.config.local_conf()
    return core.table.try_read_attr(conf, "plugin_attr", name)
end
_M.plugin_attr = plugin_attr
-- Comparator for table.sort: a plugin with a higher `priority` sorts first.
-- @param l left plugin object
-- @param r right plugin object
-- @return true when `l` must be placed before `r`
local function sort_plugin(l, r)
    local left_priority, right_priority = l.priority, r.priority
    return right_priority < left_priority
end
-- Plugin kinds passed as `plugin_type` to load_plugin / unload_plugin.
-- Lua plugin required from "apisix.plugins.<name>"
local PLUGIN_TYPE_HTTP = 1
-- Lua plugin required from "apisix.stream.plugins.<name>"
local PLUGIN_TYPE_STREAM = 2
-- plugin obtained through wasm.require; never unloaded by unload_plugin
local PLUGIN_TYPE_HTTP_WASM = 3
-- Forget a loaded plugin module so a later require loads it afresh.
-- If the cached module exposes a `destroy` function it is called first,
-- then the entry is removed from package.loaded. wasm plugins are left
-- untouched.
-- @param name plugin name (module path suffix)
-- @param plugin_type one of the PLUGIN_TYPE_* constants
local function unload_plugin(name, plugin_type)
    if plugin_type == PLUGIN_TYPE_HTTP_WASM then
        return
    end

    local prefix = "apisix.plugins."
    if plugin_type == PLUGIN_TYPE_STREAM then
        prefix = "apisix.stream.plugins."
    end
    local pkg_name = prefix .. name

    local loaded = pkg_loaded[pkg_name]
    if loaded and type(loaded.destroy) == "function" then
        loaded.destroy()
    end

    pkg_loaded[pkg_name] = nil
end
-- Load one plugin, validate it and append it to `plugins_list`.
--
-- A plugin is rejected (an error is logged and nothing is appended) when
-- loading fails, when the loaded value is not a table, when `priority` or
-- `version` is missing, when `schema` is not a table, or when the schema
-- already declares the reserved `disable` property.
--
-- On success the shared `disable` property is injected into the schema
-- (once per schema, tracked through the `$comment` marker), `name` and
-- `attr` are set on the plugin, the plugin is appended to `plugins_list`
-- and its optional `init` function is called.
--
-- @param name plugin name; for wasm plugins the whole attrs table
-- @param plugins_list array that receives the plugin object
-- @param plugin_type one of the PLUGIN_TYPE_* constants
local function load_plugin(name, plugins_list, plugin_type)
    local ok, plugin

    if plugin_type == PLUGIN_TYPE_HTTP_WASM then
        -- for wasm plugin, we pass the whole attrs instead of name
        ok, plugin = wasm.require(name)
        name = name.name
    else
        local pkg_name = "apisix.plugins." .. name
        if plugin_type == PLUGIN_TYPE_STREAM then
            pkg_name = "apisix.stream.plugins." .. name
        end

        ok, plugin = pcall(require, pkg_name)
    end

    if not ok then
        core.log.error("failed to load plugin [", name, "] err: ", plugin)
        return
    end

    -- `require` yields `true` for a module that returns nothing; reject any
    -- non-table value here instead of failing on the field lookups below
    if type(plugin) ~= "table" then
        core.log.error("invalid plugin [", name,
                       "], the plugin module must return a table")
        return
    end

    if not plugin.priority then
        core.log.error("invalid plugin [", name,
                       "], missing field: priority")
        return
    end

    if not plugin.version then
        core.log.error("invalid plugin [", name, "] missing field: version")
        return
    end

    if type(plugin.schema) ~= "table" then
        core.log.error("invalid plugin [", name, "] schema field")
        return
    end

    if not plugin.schema.properties then
        plugin.schema.properties = {}
    end

    local properties = plugin.schema.properties
    local plugin_injected_schema = core.schema.plugin_injected_schema

    -- the `$comment` marker tells whether this schema was already extended
    -- by a previous load
    if plugin.schema['$comment'] ~= plugin_injected_schema['$comment'] then
        if properties.disable then
            core.log.error("invalid plugin [", name,
                           "]: found forbidden 'disable' field in the schema")
            return
        end

        properties.disable = plugin_injected_schema.disable
        plugin.schema['$comment'] = plugin_injected_schema['$comment']
    end

    plugin.name = name
    plugin.attr = plugin_attr(name)
    core.table.insert(plugins_list, plugin)

    if plugin.init then
        plugin.init()
    end
end
-- Reload the http plugin set.
-- Unloads every plugin currently in `local_plugins_hash`, loads the
-- requested Lua and wasm plugins, sorts `local_plugins` by priority and
-- rebuilds `local_plugins_hash`.
-- @param plugin_names array of Lua plugin names
-- @param wasm_plugin_names array of wasm plugin attrs tables (each with `name`)
-- @return true
local function load(plugin_names, wasm_plugin_names)
    -- name -> true for Lua plugins, name -> attrs for wasm plugins;
    -- the first occurrence of a name wins
    local wanted = {}
    for _, plugin_name in ipairs(plugin_names) do
        if wanted[plugin_name] == nil then
            wanted[plugin_name] = true
        end
    end

    for _, wasm_attrs in ipairs(wasm_plugin_names) do
        if wanted[wasm_attrs.name] == nil then
            wanted[wasm_attrs.name] = wasm_attrs
        end
    end

    core.log.warn("new plugins: ", core.json.delay_encode(wanted))

    for loaded_name, loaded_plugin in pairs(local_plugins_hash) do
        if loaded_plugin.type == "wasm" then
            unload_plugin(loaded_name, PLUGIN_TYPE_HTTP_WASM)
        else
            unload_plugin(loaded_name, PLUGIN_TYPE_HTTP)
        end
    end

    core.table.clear(local_plugins)
    core.table.clear(local_plugins_hash)

    for plugin_name, marker in pairs(wanted) do
        if type(marker) == "table" then
            -- wasm plugins are loaded from their attrs table
            load_plugin(marker, local_plugins, PLUGIN_TYPE_HTTP_WASM)
        else
            load_plugin(plugin_name, local_plugins, PLUGIN_TYPE_HTTP)
        end
    end

    -- sort by plugin's priority
    if #local_plugins > 1 then
        sort_tab(local_plugins, sort_plugin)
    end

    for _, plugin_obj in ipairs(local_plugins) do
        local_plugins_hash[plugin_obj.name] = plugin_obj
        if enable_debug() then
            core.log.warn("loaded plugin and sort by priority:",
                          " ", plugin_obj.priority,
                          " name: ", plugin_obj.name)
        end
    end

    local times = _M.load_times + 1
    _M.load_times = times
    core.log.info("load plugin times: ", _M.load_times)
    return true
end
-- Reload the stream plugin set.
-- Unloads every plugin currently in `stream_local_plugins_hash`, loads the
-- requested plugins, sorts `stream_local_plugins` by priority and rebuilds
-- `stream_local_plugins_hash`.
-- @param plugin_names array of stream plugin names
-- @return true
local function load_stream(plugin_names)
    -- de-duplicated set of requested names
    local wanted = {}
    for _, plugin_name in ipairs(plugin_names) do
        if wanted[plugin_name] == nil then
            wanted[plugin_name] = true
        end
    end

    core.log.warn("new plugins: ", core.json.delay_encode(wanted))

    for loaded_name in pairs(stream_local_plugins_hash) do
        unload_plugin(loaded_name, PLUGIN_TYPE_STREAM)
    end

    core.table.clear(stream_local_plugins)
    core.table.clear(stream_local_plugins_hash)

    for plugin_name in pairs(wanted) do
        load_plugin(plugin_name, stream_local_plugins, PLUGIN_TYPE_STREAM)
    end

    -- sort by plugin's priority
    if #stream_local_plugins > 1 then
        sort_tab(stream_local_plugins, sort_plugin)
    end

    for _, plugin_obj in ipairs(stream_local_plugins) do
        stream_local_plugins_hash[plugin_obj.name] = plugin_obj
        if enable_debug() then
            core.log.warn("loaded stream plugin and sort by priority:",
                          " ", plugin_obj.priority,
                          " name: ", plugin_obj.name)
        end
    end

    local times = _M.stream_load_times + 1
    _M.stream_load_times = times

    core.log.info("stream plugins: ",
                  core.json.delay_encode(stream_local_plugins, true))
    core.log.info("load stream plugin times: ", _M.stream_load_times)
    return true
end
-- (Re)load the http and stream plugins.
--
-- Without `config` the plugin lists are read from the local configuration
-- (which is re-read and cached in the file-level `local_conf`). With
-- `config` the lists are built from `config.value`, an array of entries
-- carrying `name` and an optional `stream` flag.
--
-- http plugins are only loaded when running in the http subsystem; stream
-- plugins are loaded whenever a stream list is available.
--
-- @param config optional synchronized plugin configuration item
-- @return the array of loaded http plugins
-- Raises an error when the local configuration cannot be loaded.
function _M.load(config)
    local http_plugin_names
    local stream_plugin_names

    if not config then
        -- called during starting or hot reload in admin
        local err
        local_conf, err = core.config.local_conf(true)
        if not local_conf then
            -- the error is unrecoverable, so we need to raise it
            error("failed to load the configuration file: " .. err)
        end

        http_plugin_names = local_conf.plugins
        stream_plugin_names = local_conf.stream_plugins
    else
        -- called during synchronizing plugin data
        http_plugin_names = {}
        stream_plugin_names = {}
        local plugins_conf = config.value
        -- plugins_conf can be nil when another instance writes into etcd key "/apisix/plugins/"
        if not plugins_conf then
            return local_plugins
        end

        for _, conf in ipairs(plugins_conf) do
            if conf.stream then
                core.table.insert(stream_plugin_names, conf.name)
            else
                core.table.insert(http_plugin_names, conf.name)
            end
        end
    end

    if ngx.config.subsystem == "http" then
        if not http_plugin_names then
            core.log.error("failed to read plugin list from local file")
        else
            -- wasm plugins always come from the local configuration; fall
            -- back to an empty list when the local configuration, its
            -- `wasm` section or `wasm.plugins` is absent, so that load()
            -- never iterates over nil
            local wasm_conf = local_conf and local_conf.wasm
            local wasm_plugin_names = wasm_conf and wasm_conf.plugins or {}

            local ok, err = load(http_plugin_names, wasm_plugin_names)
            if not ok then
                core.log.error("failed to load plugins: ", err)
            end
        end
    end

    if not stream_plugin_names then
        core.log.warn("failed to read stream plugin list from local file")
    else
        local ok, err = load_stream(stream_plugin_names)
        if not ok then
            core.log.error("failed to load stream plugins: ", err)
        end
    end

    -- for test
    return local_plugins
end
-- Report which plugins were selected; does nothing unless debug is enabled.
--
-- In the http subsystem, while headers have not been sent, the plugin
-- names are recorded in `ctx.debug_headers` (or an "Apisix-Plugins: no
-- plugin" header is added when `plugins` is nil). Otherwise the same
-- information is written to the log at warn level.
--
-- @param ctx request context that receives `debug_headers`; may be nil
-- @param plugins flat array of plugin object / plugin conf pairs; may be nil
local function trace_plugins_info_for_debug(ctx, plugins)
    if not enable_debug() then
        return
    end

    if not plugins then
        if not is_http or ngx.headers_sent then
            core.log.warn("Apisix-Plugins: no plugin")
        else
            core.response.add_header("Apisix-Plugins", "no plugin")
        end
        return
    end

    -- plugin objects sit at the odd indexes, their confs at the even ones
    local names = {}
    for idx = 1, #plugins, 2 do
        core.table.insert(names, plugins[idx].name)
    end

    if not is_http or ngx.headers_sent then
        core.log.warn("Apisix-Plugins: ", core.table.concat(names, ", "))
        return
    end

    if not ctx then
        return
    end

    local debug_headers = ctx.debug_headers or core.table.new(0, 5)
    for _, plugin_name in ipairs(names) do
        debug_headers[plugin_name] = true
    end
    ctx.debug_headers = debug_headers
end
-- Select the http plugins that must run for `conf`.
--
-- Walks the loaded plugins in priority order and appends a plugin object
-- followed by its configuration to `plugins` for every plugin that is
-- configured in `conf.value.plugins` with a table that is not disabled.
-- A plugin whose `run_policy` is "prefer_route" is skipped when the route
-- carries its own enabled configuration for that plugin.
--
-- @param ctx request context, forwarded to the debug tracer
-- @param conf configuration item whose `value.plugins` is inspected
-- @param plugins optional array to fill; fetched from the table pool if nil
-- @param route_conf optional route item used for the "prefer_route" policy
-- @return the filled `plugins` array
function _M.filter(ctx, conf, plugins, route_conf)
    local user_plugin_conf = conf.value.plugins
    if user_plugin_conf == nil or
       core.table.nkeys(user_plugin_conf) == 0 then
        trace_plugins_info_for_debug(nil, nil)
        -- when 'plugins' is given, always return 'plugins' itself instead
        -- of another one
        return plugins or core.tablepool.fetch("plugins", 0, 0)
    end

    local route_plugin_conf = route_conf and route_conf.value.plugins
    if not plugins then
        plugins = core.tablepool.fetch("plugins", 32, 0)
    end

    for _, plugin_obj in ipairs(local_plugins) do
        local name = plugin_obj.name
        local plugin_conf = user_plugin_conf[name]

        if type(plugin_conf) == "table" and not plugin_conf.disable then
            -- truthy when the route has its own enabled conf for a
            -- "prefer_route" plugin
            local route_takes_over = false
            if plugin_obj.run_policy == "prefer_route"
               and route_plugin_conf ~= nil
            then
                local in_route = route_plugin_conf[name]
                route_takes_over = in_route and not in_route.disable
            end

            if not route_takes_over then
                core.table.insert(plugins, plugin_obj)
                core.table.insert(plugins, plugin_conf)
            end
        end
    end

    trace_plugins_info_for_debug(ctx, plugins)

    return plugins
end
-- Select the stream plugins that must run for `user_route`.
-- Appends a plugin object followed by its configuration to `plugins` for
-- every loaded stream plugin that is configured in
-- `user_route.value.plugins` with a table that is not disabled.
-- @param user_route stream route item
-- @param plugins optional array to fill; a new table is created if nil
-- @return the filled `plugins` array
function _M.stream_filter(user_route, plugins)
    if not plugins then
        plugins = core.table.new(#stream_local_plugins * 2, 0)
    end

    local configured = user_route.value.plugins
    if configured == nil then
        trace_plugins_info_for_debug(nil, nil)
        return plugins
    end

    for _, plugin_obj in ipairs(stream_local_plugins) do
        local plugin_conf = configured[plugin_obj.name]
        local enabled = type(plugin_conf) == "table"
                        and not plugin_conf.disable
        if enabled then
            core.table.insert(plugins, plugin_obj)
            core.table.insert(plugins, plugin_conf)
        end
    end

    trace_plugins_info_for_debug(nil, plugins)

    return plugins
end
-- Build a merged configuration from a service and a route.
--
-- Starts from a deep copy of the service and lets the route override it:
-- the copy takes the route's id and modifiedIndex (the service id is kept
-- as `service_id`), route plugins replace service plugins of the same
-- name, and the route's upstream / upstream_id / script / timeout / name /
-- hosts / host win when present.
--
-- @param service_conf service configuration item
-- @param route_conf route configuration item
-- @return the merged configuration item
local function merge_service_route(service_conf, route_conf)
    local new_conf = core.table.deepcopy(service_conf)
    local merged = new_conf.value
    local route = route_conf.value

    merged.service_id = merged.id
    merged.id = route.id
    new_conf.modifiedIndex = route_conf.modifiedIndex

    if route.plugins then
        for name, conf in pairs(route.plugins) do
            if not merged.plugins then
                merged.plugins = {}
            end
            merged.plugins[name] = conf
        end
    end

    if route.upstream then
        merged.upstream = route.upstream
        -- when route's upstream override service's upstream,
        -- the upstream.parent still point to the route
        merged.upstream_id = nil
        new_conf.has_domain = route_conf.has_domain
    end

    if route.upstream_id then
        merged.upstream_id = route.upstream_id
        new_conf.has_domain = route_conf.has_domain
    end

    -- plain "route wins when set" fields
    for _, field in ipairs({"script", "timeout", "hosts"}) do
        local override = route[field]
        if override then
            merged[field] = override
        end
    end

    -- the name always follows the route: an unset route name clears it
    merged.name = route.name or nil

    if not merged.hosts and route.host then
        merged.host = route.host
    end

    -- core.log.info("merged conf : ", core.json.delay_encode(new_conf))
    return new_conf
end
-- Return the merged service + route configuration, going through the
-- `merged_route` cache. The cache key combines the route id with the
-- modifiedIndex of both the route and the service.
-- @param service_conf service configuration item
-- @param route_conf route configuration item
-- @return the value produced by the local merge_service_route
function _M.merge_service_route(service_conf, route_conf)
    core.log.info("service conf: ", core.json.delay_encode(service_conf, true))
    core.log.info("  route conf: ", core.json.delay_encode(route_conf, true))

    local route_id = route_conf.value.id
    local route_service_key = route_id .. "#" .. route_conf.modifiedIndex
                              .. "#" .. service_conf.modifiedIndex

    return merged_route(route_service_key, service_conf,
                        merge_service_route,
                        service_conf, route_conf)
end
-- Overlay a consumer's plugins on top of a route configuration.
-- When the consumer has no plugins the route item is returned unchanged;
-- otherwise a deep copy of the route is returned in which every consumer
-- plugin replaces the route plugin of the same name.
-- @param route_conf route configuration item
-- @param consumer_conf consumer configuration carrying `plugins`
-- @return `route_conf` itself, or the merged copy
local function merge_consumer_route(route_conf, consumer_conf)
    local consumer_plugins = consumer_conf.plugins
    if not consumer_plugins or core.table.nkeys(consumer_plugins) == 0 then
        core.log.info("consumer no plugins")
        return route_conf
    end

    local new_route_conf = core.table.deepcopy(route_conf)
    local merged = new_route_conf.value

    for name, conf in pairs(consumer_plugins) do
        if not merged.plugins then
            merged.plugins = {}
        end
        merged.plugins[name] = conf
    end

    core.log.info("merged conf : ", core.json.delay_encode(new_route_conf))
    return new_route_conf
end
-- Return the route configuration merged with a consumer's plugins, going
-- through the `merged_route` cache, and tag `api_ctx` with the consumer.
-- The cache key is built from tostring() of both tables.
-- `api_ctx.conf_type`, `conf_version` and `conf_id` each get a consumer
-- suffix appended.
-- @param route_conf route configuration item
-- @param consumer_conf consumer configuration
-- @param api_ctx request context; reads consumer_ver and consumer_name
-- @return the merged configuration
-- @return true when the result is a different table than `route_conf`
function _M.merge_consumer_route(route_conf, consumer_conf, api_ctx)
    core.log.info("route conf: ", core.json.delay_encode(route_conf))
    core.log.info("consumer conf: ", core.json.delay_encode(consumer_conf))

    local cache_key = tostring(route_conf) .. tostring(consumer_conf)
    local new_conf = merged_route(cache_key, nil,
                                  merge_consumer_route,
                                  route_conf, consumer_conf)

    api_ctx.conf_type = api_ctx.conf_type .. "&consumer"
    api_ctx.conf_version = api_ctx.conf_version .. "&" ..
                           api_ctx.consumer_ver
    api_ctx.conf_id = api_ctx.conf_id .. "&" .. api_ctx.consumer_name

    local changed = new_conf ~= route_conf
    return new_conf, changed
end
-- Create the configuration object for "/plugins"; its filter reloads the
-- plugins through _M.load every time it is invoked with an item.
-- Raises an error when the configuration object cannot be created.
local init_plugins_syncer
do
    -- keeps a reference to the created configuration object
    local plugins_conf

    local function reload_plugins(item)
        -- we need to pass 'item' instead of plugins_conf because
        -- the latter one is nil at the first run
        _M.load(item)
    end

    function init_plugins_syncer()
        local syncer, err = core.config.new("/plugins", {
            automatic = true,
            item_schema = core.schema.plugins,
            single_item = true,
            filter = reload_plugins,
        })
        plugins_conf = syncer

        if not syncer then
            error("failed to create etcd instance for fetching /plugins : " .. err)
        end
    end
end
-- Per-worker initialisation: initialise the prometheus exporter (http
-- subsystem only), load the plugins from the local configuration, start
-- the "/plugins" syncer when the admin switch is off, and create the
-- "/plugin_metadata" configuration object stored in `_M.plugin_metadatas`.
-- Raises an error when the "/plugin_metadata" object cannot be created.
function _M.init_worker()
    -- some plugins need to be initialized in init* phases
    if ngx.config.subsystem == "http" then
        local exporter = require("apisix.plugins.prometheus.exporter")
        exporter.init()
    end

    _M.load()

    if local_conf then
        if not local_conf.apisix.enable_admin then
            init_plugins_syncer()
        end
    end

    local metadata_opts = {automatic = true}
    local plugin_metadatas, err = core.config.new("/plugin_metadata",
                                                  metadata_opts)
    if not plugin_metadatas then
        error("failed to create etcd instance for fetching /plugin_metadatas : "
              .. err)
    end

    _M.plugin_metadatas = plugin_metadatas
end
-- Look up the metadata item of a plugin in `_M.plugin_metadatas`, which
-- is assigned by _M.init_worker.
-- @param name plugin name
-- @return whatever the configuration object's `get` returns for `name`
function _M.plugin_metadata(name)
    local metadatas = _M.plugin_metadatas
    return metadatas:get(name)
end
-- Fetch a loaded http plugin object by name.
-- @param name plugin name
-- @return the plugin object, or nil when no such plugin is loaded
function _M.get(name)
    if not local_plugins_hash then
        return local_plugins_hash
    end

    return local_plugins_hash[name]
end
-- Collect a view of every loaded plugin.
-- Each plugin object is passed through core.table.pick together with
-- `attrs`, and the result is stored under the plugin's name.
-- @param attrs attribute selection forwarded to core.table.pick
-- @return name -> picked table for the http plugins
-- @return name -> picked table for the stream plugins
function _M.get_all(attrs)
    local function pick_each(plugins_hash)
        local picked = {}
        if plugins_hash then
            for name, plugin_obj in pairs(plugins_hash) do
                picked[name] = core.table.pick(plugin_obj, attrs)
            end
        end
        return picked
    end

    local http_plugins = pick_each(local_plugins_hash)
    local stream_plugins = pick_each(stream_local_plugins_hash)

    return http_plugins, stream_plugins
end
-- Validate a map of http plugin configurations.
--
-- Every value must be a table and, unless `skip_disabled_plugin` is set,
-- every name must belong to a loaded plugin. When the plugin provides
-- `check_schema`, it is called with the configuration from which the
-- `disable` flag has been temporarily removed; the flag is put back
-- afterwards whether the check passed or not, so the caller's table is
-- never left altered.
--
-- @param plugins_conf map of plugin name -> plugin configuration
-- @param schema_type forwarded to each plugin's check_schema
-- @param skip_disabled_plugin when truthy, names of plugins that are not
--        loaded are skipped instead of being reported
-- @return true on success; false and an error message otherwise
local function check_schema(plugins_conf, schema_type, skip_disabled_plugin)
    for name, plugin_conf in pairs(plugins_conf) do
        core.log.info("check plugin schema, name: ", name, ", configurations: ",
                      core.json.delay_encode(plugin_conf, true))
        if type(plugin_conf) ~= "table" then
            return false, "invalid plugin conf " ..
                core.json.encode(plugin_conf, true) ..
                " for plugin [" .. name .. "]"
        end

        local plugin_obj = local_plugins_hash[name]
        if not plugin_obj then
            if skip_disabled_plugin then
                goto CONTINUE
            else
                return false, "unknown plugin [" .. name .. "]"
            end
        end

        if plugin_obj.check_schema then
            local disable = plugin_conf.disable
            plugin_conf.disable = nil

            local ok, err = plugin_obj.check_schema(plugin_conf, schema_type)

            -- restore the flag before looking at the result: a failed
            -- check must not strip `disable` from the caller's conf
            plugin_conf.disable = disable

            if not ok then
                return false, "failed to check the configuration of plugin "
                              .. name .. " err: " .. err
            end
        end

        ::CONTINUE::
    end

    return true
end
_M.check_schema = check_schema
-- Validate a map of stream plugin configurations.
--
-- Every value must be a table and, unless `skip_disabled_plugin` is set,
-- every name must belong to a loaded stream plugin. When the plugin
-- provides `check_schema`, it is called with the configuration from which
-- the `disable` flag has been temporarily removed; the flag is put back
-- afterwards whether the check passed or not, so the caller's table is
-- never left altered.
--
-- @param plugins_conf map of plugin name -> plugin configuration
-- @param schema_type forwarded to each plugin's check_schema
-- @param skip_disabled_plugin when truthy, names of plugins that are not
--        loaded are skipped instead of being reported
-- @return true on success; false and an error message otherwise
local function stream_check_schema(plugins_conf, schema_type, skip_disabled_plugin)
    for name, plugin_conf in pairs(plugins_conf) do
        core.log.info("check stream plugin schema, name: ", name,
                      ": ", core.json.delay_encode(plugin_conf, true))
        if type(plugin_conf) ~= "table" then
            return false, "invalid plugin conf " ..
                core.json.encode(plugin_conf, true) ..
                " for plugin [" .. name .. "]"
        end

        local plugin_obj = stream_local_plugins_hash[name]
        if not plugin_obj then
            if skip_disabled_plugin then
                goto CONTINUE
            else
                return false, "unknown plugin [" .. name .. "]"
            end
        end

        if plugin_obj.check_schema then
            local disable = plugin_conf.disable
            plugin_conf.disable = nil

            local ok, err = plugin_obj.check_schema(plugin_conf, schema_type)

            -- restore the flag before looking at the result: a failed
            -- check must not strip `disable` from the caller's conf
            plugin_conf.disable = disable

            if not ok then
                return false, "failed to check the configuration of "
                              .. "stream plugin [" .. name .. "]: " .. err
            end
        end

        ::CONTINUE::
    end

    return true
end
_M.stream_check_schema = stream_check_schema
-- Checker for items that may carry http plugin configurations.
-- Items without `plugins` pass; otherwise the plugins are validated with
-- check_schema, skipping names of plugins that are not loaded.
-- @param item configuration value with an optional `plugins` map
-- @param schema_type forwarded to check_schema
-- @return true on success; false and an error message otherwise
function _M.plugin_checker(item, schema_type)
    local plugins_conf = item.plugins
    if not plugins_conf then
        return true
    end

    return check_schema(plugins_conf, schema_type, true)
end
-- Checker for items that may carry stream plugin configurations.
-- Items without `plugins` pass; otherwise the plugins are validated with
-- stream_check_schema (no schema type), skipping names of plugins that
-- are not loaded.
-- @param item configuration value with an optional `plugins` map
-- @return true on success; false and an error message otherwise
function _M.stream_plugin_checker(item)
    local plugins_conf = item.plugins
    if not plugins_conf then
        return true
    end

    return stream_check_schema(plugins_conf, nil, true)
end
-- Run one phase of every plugin in `plugins`.
--
-- `plugins` is a flat array of plugin object / plugin conf pairs. For the
-- "log", "header_filter" and "body_filter" phases the handlers' return
-- values are ignored. For every other phase a handler that returns a code
-- or a body ends the request: through core.response.exit in the http
-- subsystem, through ngx.exit(1) otherwise.
--
-- @param phase name of the phase handler to call on each plugin
-- @param plugins optional plugin array; defaults to `api_ctx.plugins`
-- @param api_ctx optional request context; defaults to `ngx.ctx.api_ctx`
-- @return nothing when there is no api_ctx; otherwise api_ctx, followed by
--         a boolean telling whether any handler ran (omitted when there
--         are no plugins)
function _M.run_plugin(phase, plugins, api_ctx)
    local plugin_run = false
    api_ctx = api_ctx or ngx.ctx.api_ctx
    if not api_ctx then
        return
    end

    plugins = plugins or api_ctx.plugins
    if not plugins or #plugins == 0 then
        return api_ctx
    end

    if phase ~= "log"
        and phase ~= "header_filter"
        and phase ~= "body_filter"
    then
        for i = 1, #plugins, 2 do
            local phase_func = plugins[i][phase]
            if phase_func then
                plugin_run = true
                local code, body = phase_func(plugins[i + 1], api_ctx)
                if code or body then
                    -- a handler may return only a body (code is nil) or a
                    -- non-numeric code; only compare real numbers so the
                    -- exit call below is always reached
                    -- NOTE(review): how core.response.exit treats a nil
                    -- code is not visible from this file
                    local is_error = type(code) == "number" and code >= 400

                    if is_http then
                        if is_error then
                            core.log.warn(plugins[i].name, " exits with http status code ", code)
                        end

                        core.response.exit(code, body)
                    else
                        if is_error then
                            core.log.warn(plugins[i].name, " exits with status code ", code)
                        end

                        ngx_exit(1)
                    end
                end
            end
        end
        return api_ctx, plugin_run
    end

    -- log / header_filter / body_filter: results are not inspected
    for i = 1, #plugins, 2 do
        local phase_func = plugins[i][phase]
        if phase_func then
            plugin_run = true
            phase_func(plugins[i + 1], api_ctx)
        end
    end

    return api_ctx, plugin_run
end
-- Run the plugins of every global rule.
--
-- For each rule the plugins are selected with _M.filter and executed with
-- _M.run_plugin: the "rewrite" and "access" phases when `phase_name` is
-- nil (in which case `api_ctx.global_rules` is also recorded), otherwise
-- only `phase_name`. While a rule runs, `api_ctx.conf_type`,
-- `conf_version` and `conf_id` describe that rule; the original values are
-- restored once all rules have been processed.
--
-- @param api_ctx request context
-- @param global_rules object whose `values` holds the global rules
-- @param phase_name optional single phase to run
function _M.run_global_rules(api_ctx, global_rules, phase_name)
    local has_rules = global_rules and global_rules.values
                      and #global_rules.values > 0
    if not has_rules then
        return
    end

    -- remember the caller's conf identity so it can be put back afterwards
    local saved_type = api_ctx.conf_type
    local saved_version = api_ctx.conf_version
    local saved_id = api_ctx.conf_id

    if phase_name == nil then
        api_ctx.global_rules = global_rules
    end

    local plugins = core.tablepool.fetch("plugins", 32, 0)
    local rules = global_rules.values
    local route = api_ctx.matched_route

    for _, global_rule in config_util.iterate_values(rules) do
        api_ctx.conf_type = "global_rule"
        api_ctx.conf_version = global_rule.modifiedIndex
        api_ctx.conf_id = global_rule.value.id

        -- the same pooled table is reused for every rule
        core.table.clear(plugins)
        plugins = _M.filter(api_ctx, global_rule, plugins, route)

        if phase_name ~= nil then
            _M.run_plugin(phase_name, plugins, api_ctx)
        else
            _M.run_plugin("rewrite", plugins, api_ctx)
            _M.run_plugin("access", plugins, api_ctx)
        end
    end

    core.tablepool.release("plugins", plugins)

    api_ctx.conf_type = saved_type
    api_ctx.conf_version = saved_version
    api_ctx.conf_id = saved_id
end
return _M