| -- |
| -- Licensed to the Apache Software Foundation (ASF) under one or more |
| -- contributor license agreements. See the NOTICE file distributed with |
| -- this work for additional information regarding copyright ownership. |
| -- The ASF licenses this file to You under the Apache License, Version 2.0 |
| -- (the "License"); you may not use this file except in compliance with |
| -- the License. You may obtain a copy of the License at |
| -- |
| -- http://www.apache.org/licenses/LICENSE-2.0 |
| -- |
| -- Unless required by applicable law or agreed to in writing, software |
| -- distributed under the License is distributed on an "AS IS" BASIS, |
| -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| -- See the License for the specific language governing permissions and |
| -- limitations under the License. |
| -- |
| local require = require |
| local core = require("apisix.core") |
| local config_util = require("apisix.core.config_util") |
| local pkg_loaded = package.loaded |
| local sort_tab = table.sort |
| local pcall = pcall |
| local ipairs = ipairs |
| local pairs = pairs |
| local type = type |
| local local_plugins = core.table.new(32, 0) |
| local ngx = ngx |
| local tostring = tostring |
| local error = error |
| local local_plugins_hash = core.table.new(0, 32) |
| local stream_local_plugins = core.table.new(32, 0) |
| local stream_local_plugins_hash = core.table.new(0, 32) |
| |
| |
-- Cache used by _M.merge_service_route and _M.merge_consumer_route to keep
-- already merged configurations around instead of rebuilding them.
local merged_route = core.lrucache.new({
    count = 512,
    ttl = 300,
})
-- Local configuration as read by _M.load() when it is called without a
-- `config` argument; nil before that first call.
local local_conf
| |
| |
-- Module table. Exposes, for the http and the stream side alike, the ordered
-- plugin list, the name -> plugin lookup table and a counter that is bumped
-- each time the corresponding list is actually reloaded.
local _M = {
    version = 0.3,

    -- http plugins
    plugins = local_plugins,
    plugins_hash = local_plugins_hash,
    load_times = 0,

    -- stream plugins
    stream_plugins = stream_local_plugins,
    stream_plugins_hash = stream_local_plugins_hash,
    stream_load_times = 0,
}
| |
| |
--- Look up the attributes configured for a plugin in the local configuration.
-- @param name plugin name, used as the key below `plugin_attr`
-- @return the result of core.table.try_read_attr for `plugin_attr.<name>`
local function plugin_attr(name)
    -- TODO: get attr from synchronized data
    -- read into its own local so the file-level `local_conf` upvalue is
    -- neither shadowed nor modified
    local conf = core.config.local_conf()
    return core.table.try_read_attr(conf, "plugin_attr", name)
end
_M.plugin_attr = plugin_attr
| |
| |
--- Comparator for table.sort that orders plugins by descending `priority`.
-- @param l left-hand plugin object
-- @param r right-hand plugin object
-- @return true when `l` has to be placed before `r`
local function sort_plugin(l, r)
    return r.priority < l.priority
end
| |
| |
--- Drop a plugin module from `package.loaded` so that the next `require`
-- loads it afresh.
--
-- When the loaded module exposes a teardown hook it is invoked first. The
-- correctly spelled `destroy` is preferred; the historical misspelling
-- `destory` is still honoured so plugins written against it keep working.
--
-- @param name             plugin name (module name without package prefix)
-- @param is_stream_plugin truthy to address "apisix.stream.plugins.<name>"
--                         instead of "apisix.plugins.<name>"
local function unload_plugin(name, is_stream_plugin)
    local pkg_name = "apisix.plugins." .. name
    if is_stream_plugin then
        pkg_name = "apisix.stream.plugins." .. name
    end

    local old_plugin = pkg_loaded[pkg_name]
    -- a module without a return value is recorded as `true`, which cannot
    -- be indexed; only tables can carry a teardown hook
    if type(old_plugin) == "table" then
        local teardown = old_plugin.destroy
        if type(teardown) ~= "function" then
            teardown = old_plugin.destory
        end

        if type(teardown) == "function" then
            teardown()
        end
    end

    pkg_loaded[pkg_name] = nil
end
| |
| |
--- Require one plugin module, validate it and append it to `plugins_list`.
--
-- A plugin that cannot be required, is not a table, or lacks `priority` or
-- `version` is reported through core.log.error and skipped. An accepted
-- plugin gets its `name` and `attr` fields set, is appended to the list and
-- then has its optional `init` hook called.
--
-- @param name             plugin name (module name without package prefix)
-- @param plugins_list     array the accepted plugin object is appended to
-- @param is_stream_plugin truthy to load from "apisix.stream.plugins."
--                         instead of "apisix.plugins."
local function load_plugin(name, plugins_list, is_stream_plugin)
    local pkg_name = "apisix.plugins." .. name
    if is_stream_plugin then
        pkg_name = "apisix.stream.plugins." .. name
    end

    local ok, plugin = pcall(require, pkg_name)
    if not ok then
        core.log.error("failed to load plugin [", name, "] err: ", plugin)
        return
    end

    -- `require` yields `true` for a module that returns nothing; indexing
    -- such a value below would raise and abort the whole load
    if type(plugin) ~= "table" then
        core.log.error("invalid plugin [", name,
                       "], module should return a table, got: ",
                       type(plugin))
        return
    end

    if not plugin.priority then
        core.log.error("invalid plugin [", name,
                       "], missing field: priority")
        return
    end

    if not plugin.version then
        core.log.error("invalid plugin [", name, "] missing field: version")
        return
    end

    -- an object schema without any declared property falls back to the
    -- shared schema that only describes how to disable the plugin
    if plugin.schema and plugin.schema.type == "object" then
        if not plugin.schema.properties or
           core.table.nkeys(plugin.schema.properties) == 0
        then
            plugin.schema.properties = core.schema.plugin_disable_schema
        end
    end

    plugin.name = name
    plugin.attr = plugin_attr(name)
    core.table.insert(plugins_list, plugin)

    if plugin.init then
        plugin.init()
    end
end
| |
| |
--- Tell whether the loaded plugins still match the requested plugin names.
-- @param old name -> plugin object table of the currently loaded plugins
-- @param new table keyed by the requested plugin names
-- @return false when core.table.set_eq reports a difference between `old`
--         and `new`, or when the `attr` stored on a loaded plugin no longer
--         deep-equals the current plugin_attr(name); true otherwise
local function plugins_eq(old, new)
    if not core.table.set_eq(old, new) then
        core.log.info("plugin list changed")
        return false
    end

    for name, plugin in pairs(old) do
        local current_attr = plugin_attr(name)
        if not core.table.deep_eq(plugin.attr, current_attr) then
            core.log.info("plugin_attr of ", name, " changed")
            return false
        end
    end

    return true
end
| |
| |
--- (Re)load the http plugins listed in `plugin_names`.
--
-- Nothing happens when plugins_eq() reports that the loaded plugins already
-- match. Otherwise every loaded plugin is unloaded, the requested ones are
-- loaded, sorted with sort_plugin and indexed by name, and `_M.load_times`
-- is incremented.
--
-- @param plugin_names array of plugin names, duplicates allowed
-- @return true
local function load(plugin_names)
    -- collapse the array into a set so a repeated name is handled once
    local wanted = {}
    for _, name in ipairs(plugin_names) do
        wanted[name] = true
    end

    -- the same configure may be synchronized more than one
    if plugins_eq(local_plugins_hash, wanted) then
        core.log.info("plugins not changed")
        return true
    end

    core.log.warn("new plugins: ", core.json.delay_encode(wanted))

    for name in pairs(local_plugins_hash) do
        unload_plugin(name)
    end

    core.table.clear(local_plugins)
    core.table.clear(local_plugins_hash)

    for name in pairs(wanted) do
        load_plugin(name, local_plugins)
    end

    -- sort by plugin's priority
    if #local_plugins > 1 then
        sort_tab(local_plugins, sort_plugin)
    end

    -- the flag cannot change while the loop below runs, so read it once
    local debug_enabled = local_conf and local_conf.apisix
                          and local_conf.apisix.enable_debug
    for _, plugin in ipairs(local_plugins) do
        local_plugins_hash[plugin.name] = plugin
        if debug_enabled then
            core.log.warn("loaded plugin and sort by priority:",
                          " ", plugin.priority,
                          " name: ", plugin.name)
        end
    end

    _M.load_times = _M.load_times + 1
    core.log.info("load plugin times: ", _M.load_times)
    return true
end
| |
| |
--- (Re)load the stream plugins listed in `plugin_names`.
--
-- Nothing happens when plugins_eq() reports that the loaded stream plugins
-- already match. Otherwise every loaded stream plugin is unloaded, the
-- requested ones are loaded, sorted with sort_plugin and indexed by name,
-- and `_M.stream_load_times` is incremented.
--
-- @param plugin_names array of stream plugin names, duplicates allowed
-- @return true
local function load_stream(plugin_names)
    -- collapse the array into a set so a repeated name is handled once
    local wanted = {}
    for _, name in ipairs(plugin_names) do
        wanted[name] = true
    end

    -- the same configure may be synchronized more than one
    if plugins_eq(stream_local_plugins_hash, wanted) then
        core.log.info("plugins not changed")
        return true
    end

    core.log.warn("new plugins: ", core.json.delay_encode(wanted))

    for name in pairs(stream_local_plugins_hash) do
        unload_plugin(name, true)
    end

    core.table.clear(stream_local_plugins)
    core.table.clear(stream_local_plugins_hash)

    for name in pairs(wanted) do
        load_plugin(name, stream_local_plugins, true)
    end

    -- sort by plugin's priority
    if #stream_local_plugins > 1 then
        sort_tab(stream_local_plugins, sort_plugin)
    end

    -- the flag cannot change while the loop below runs, so read it once
    local debug_enabled = local_conf and local_conf.apisix
                          and local_conf.apisix.enable_debug
    for _, plugin in ipairs(stream_local_plugins) do
        stream_local_plugins_hash[plugin.name] = plugin
        if debug_enabled then
            core.log.warn("loaded stream plugin and sort by priority:",
                          " ", plugin.priority,
                          " name: ", plugin.name)
        end
    end

    _M.stream_load_times = _M.stream_load_times + 1
    core.log.info("stream plugins: ",
                  core.json.delay_encode(stream_local_plugins, true))
    core.log.info("load stream plugin times: ", _M.stream_load_times)
    return true
end
| |
| |
--- Load the http and stream plugins.
--
-- Without `config` the plugin names come from the local configuration
-- (`plugins` / `stream_plugins`), which is re-read and remembered in the
-- file-level `local_conf`. With `config` the names are collected from the
-- items in `config.values`; an entry with a truthy `stream` field goes to
-- the stream list, every other entry to the http list.
--
-- The http list is only loaded when ngx.config.subsystem is "http"; the
-- stream list is loaded whenever it is available.
--
-- @param config optional configuration object carrying a `values` field
-- @return the http plugin list (for test)
function _M.load(config)
    local http_plugin_names, stream_plugin_names

    if config then
        -- called during synchronizing plugin data
        http_plugin_names = {}
        stream_plugin_names = {}
        for _, conf_value in config_util.iterate_values(config.values) do
            for _, conf in ipairs(conf_value.value) do
                -- both candidates are tables, so `and ... or` is safe here
                local target = conf.stream and stream_plugin_names
                               or http_plugin_names
                core.table.insert(target, conf.name)
            end
        end
    else
        -- called during starting or hot reload in admin
        local_conf = core.config.local_conf(true)
        http_plugin_names = local_conf.plugins
        stream_plugin_names = local_conf.stream_plugins
    end

    if ngx.config.subsystem == "http" then
        if http_plugin_names then
            local ok, err = load(http_plugin_names)
            if not ok then
                core.log.error("failed to load plugins: ", err)
            end
        else
            core.log.error("failed to read plugin list from local file")
        end
    end

    if stream_plugin_names then
        local ok, err = load_stream(stream_plugin_names)
        if not ok then
            core.log.error("failed to load stream plugins: ", err)
        end
    else
        core.log.warn("failed to read stream plugin list from local file")
    end

    -- for test
    return local_plugins
end
| |
| |
--- Collect the loaded http plugins that a route enables.
--
-- The result is a flat array of pairs: a plugin object followed by the
-- route's configuration for it. Pairs follow the order of the loaded plugin
-- list; a plugin is skipped when the route has no table configuration for
-- it or that configuration has a truthy `disable` field.
--
-- When debugging is enabled in the local configuration, the names of the
-- selected plugins (or "no plugin") are sent in the "Apisix-Plugins"
-- response header.
--
-- @param user_route route object; `user_route.value.plugins` maps plugin
--                   names to their configuration
-- @param plugins    optional array to append to; when omitted one is
--                   fetched from the "plugins" tablepool
-- @return core.empty_tab when the route configures no plugin, otherwise
--         the filled `plugins` array
function _M.filter(user_route, plugins)
    -- `local_conf` stays nil until _M.load() has read the local
    -- configuration, so resolve the flag once with a nil-safe lookup
    -- instead of dereferencing it unguarded further down
    local enable_debug = local_conf and local_conf.apisix
                         and local_conf.apisix.enable_debug

    local user_plugin_conf = user_route.value.plugins
    if user_plugin_conf == nil or
       core.table.nkeys(user_plugin_conf) == 0 then
        if enable_debug then
            core.response.set_header("Apisix-Plugins", "no plugin")
        end
        return core.empty_tab
    end

    plugins = plugins or core.tablepool.fetch("plugins", 32, 0)
    for _, plugin_obj in ipairs(local_plugins) do
        local name = plugin_obj.name
        local plugin_conf = user_plugin_conf[name]

        if type(plugin_conf) == "table" and not plugin_conf.disable then
            core.table.insert(plugins, plugin_obj)
            core.table.insert(plugins, plugin_conf)
        end
    end

    if enable_debug then
        -- plugin objects sit at the odd indexes of the pair array
        local t = {}
        for i = 1, #plugins, 2 do
            core.table.insert(t, plugins[i].name)
        end
        core.response.set_header("Apisix-Plugins", core.table.concat(t, ", "))
    end

    return plugins
end
| |
| |
--- Collect the loaded stream plugins that a stream route enables.
--
-- The result is a flat array of pairs: a plugin object followed by the
-- route's configuration for it. Pairs follow the order of the loaded stream
-- plugin list; a plugin is skipped when the route has no table
-- configuration for it or that configuration has a truthy `disable` field.
--
-- When debugging is enabled in the local configuration, the names of the
-- selected plugins (or "no plugin") are passed to core.response.set_header
-- as "Apisix-Plugins".
--
-- @param user_route route object; `user_route.value.plugins` maps plugin
--                   names to their configuration
-- @param plugins    optional array to append to; when omitted a new table
--                   sized for every loaded stream plugin is created
-- @return the `plugins` array, left as passed in or newly created when the
--         route has no `plugins` field
function _M.stream_filter(user_route, plugins)
    -- `local_conf` stays nil until _M.load() has read the local
    -- configuration, so resolve the flag once with a nil-safe lookup
    -- instead of dereferencing it unguarded further down
    local enable_debug = local_conf and local_conf.apisix
                         and local_conf.apisix.enable_debug

    plugins = plugins or core.table.new(#stream_local_plugins * 2, 0)
    local user_plugin_conf = user_route.value.plugins
    if user_plugin_conf == nil then
        if enable_debug then
            core.response.set_header("Apisix-Plugins", "no plugin")
        end
        return plugins
    end

    for _, plugin_obj in ipairs(stream_local_plugins) do
        local name = plugin_obj.name
        local plugin_conf = user_plugin_conf[name]

        if type(plugin_conf) == "table" and not plugin_conf.disable then
            core.table.insert(plugins, plugin_obj)
            core.table.insert(plugins, plugin_conf)
        end
    end

    if enable_debug then
        -- plugin objects sit at the odd indexes of the pair array
        local t = {}
        for i = 1, #plugins, 2 do
            core.table.insert(t, plugins[i].name)
        end
        core.response.set_header("Apisix-Plugins", core.table.concat(t, ", "))
    end

    return plugins
end
| |
| |
--- Build the effective configuration of a route that is bound to a service.
--
-- Works on a deep copy of `service_conf`. The copy takes over the route's id
-- and modifiedIndex, keeps the service id as `value.service_id`, and lets
-- the route's plugins, upstream and upstream_id override the service's.
--
-- @param service_conf service configuration object (not modified)
-- @param route_conf   route configuration object; its upstream gets a
--                     `parent` field pointing back at `route_conf` when the
--                     upstream has `checks`
-- @return the merged configuration object
local function merge_service_route(service_conf, route_conf)
    local new_conf = core.table.deepcopy(service_conf)
    local merged = new_conf.value
    local route = route_conf.value

    merged.service_id = merged.id
    merged.id = route.id
    new_conf.modifiedIndex = route_conf.modifiedIndex

    local route_plugins = route.plugins
    if route_plugins then
        for name, conf in pairs(route_plugins) do
            -- created lazily: an empty route plugin table adds nothing
            if not merged.plugins then
                merged.plugins = {}
            end

            merged.plugins[name] = conf
        end
    end

    local route_upstream = route.upstream
    if route_upstream then
        merged.upstream = route_upstream

        if route_upstream.checks then
            route_upstream.parent = route_conf
        end

        merged.upstream_id = nil
        new_conf.has_domain = route_conf.has_domain
    end

    local route_upstream_id = route.upstream_id
    if route_upstream_id then
        merged.upstream_id = route_upstream_id
        new_conf.has_domain = route_conf.has_domain
    end

    -- core.log.info("merged conf : ", core.json.delay_encode(new_conf))
    return new_conf
end
| |
| |
--- Cached front end of the service/route merge.
--
-- The cache key is made of the route id, the route's modifiedIndex and the
-- service's modifiedIndex; `service_conf` itself is handed to the cache as
-- its second argument.
--
-- @param service_conf service configuration object
-- @param route_conf   route configuration object
-- @return whatever the `merged_route` cache returns for that key
function _M.merge_service_route(service_conf, route_conf)
    core.log.info("service conf: ", core.json.delay_encode(service_conf))
    core.log.info(" route conf: ", core.json.delay_encode(route_conf))

    local route_part = route_conf.value.id .. "#" .. route_conf.modifiedIndex
    local cache_key = route_part .. "#" .. service_conf.modifiedIndex

    return merged_route(cache_key, service_conf, merge_service_route,
                        service_conf, route_conf)
end
| |
| |
--- Overlay a consumer's plugins onto a route configuration.
-- @param route_conf    route configuration object (not modified)
-- @param consumer_conf consumer configuration with an optional `plugins`
--                      table
-- @return `route_conf` itself when the consumer has no plugins, otherwise a
--         deep copy in which each consumer plugin replaces or extends the
--         route's plugin of the same name
local function merge_consumer_route(route_conf, consumer_conf)
    local consumer_plugins = consumer_conf.plugins
    if not consumer_plugins or core.table.nkeys(consumer_plugins) == 0 then
        core.log.info("consumer no plugins")
        return route_conf
    end

    local new_route_conf = core.table.deepcopy(route_conf)
    local merged = new_route_conf.value

    -- the consumer has at least one plugin here, so the target table is
    -- needed in any case
    if not merged.plugins then
        merged.plugins = {}
    end

    for name, conf in pairs(consumer_plugins) do
        merged.plugins[name] = conf
    end

    core.log.info("merged conf : ", core.json.delay_encode(new_route_conf))
    return new_route_conf
end
| |
| |
--- Cached front end of the consumer/route merge.
--
-- The cache key is the concatenation of tostring(route_conf) and
-- tostring(consumer_conf). In addition `conf_type`, `conf_version` and
-- `conf_id` of `api_ctx` get a consumer specific suffix appended.
--
-- @param route_conf    route configuration object
-- @param consumer_conf consumer configuration object
-- @param api_ctx       request context; `consumer_ver` and `consumer_name`
--                      are read from it
-- @return the configuration from the cache, and a boolean telling whether
--         it is a different object than `route_conf`
function _M.merge_consumer_route(route_conf, consumer_conf, api_ctx)
    core.log.info("route conf: ", core.json.delay_encode(route_conf))
    core.log.info("consumer conf: ", core.json.delay_encode(consumer_conf))

    local cache_key = tostring(route_conf) .. tostring(consumer_conf)
    local new_conf = merged_route(cache_key, nil, merge_consumer_route,
                                  route_conf, consumer_conf)

    api_ctx.conf_type = api_ctx.conf_type .. "&consumer"
    api_ctx.conf_version = api_ctx.conf_version .. "&" .. api_ctx.consumer_ver
    api_ctx.conf_id = api_ctx.conf_id .. "&" .. api_ctx.consumer_name

    local is_merged = new_conf ~= route_conf
    return new_conf, is_merged
end
| |
| |
local init_plugins_syncer
do
    -- configuration object for "/plugins"; an upvalue so that the filter
    -- callback sees it once core.config.new has returned
    local plugins_conf

    -- filter callback: reload the plugins from the "/plugins" data
    local function reload_plugins()
        _M.load(plugins_conf)
    end

    --- Create the "/plugins" configuration object whose filter callback
    -- reloads the plugins. Raises an error when it cannot be created.
    function init_plugins_syncer()
        local options = {
            automatic = true,
            item_schema = core.schema.plugins,
            single_item = true,
            filter = reload_plugins,
        }

        local err
        plugins_conf, err = core.config.new("/plugins", options)
        if not plugins_conf then
            error("failed to create etcd instance for fetching /plugins : "
                  .. err)
        end
    end
end
| |
| |
--- Worker initialisation.
--
-- Loads the plugins from the local configuration, initialises the
-- prometheus exporter in the http subsystem, starts the "/plugins" syncer
-- when the admin API is not enabled, and creates the "/plugin_metadata"
-- configuration object stored as `_M.plugin_metadatas`. Raises an error
-- when that object cannot be created.
function _M.init_worker()
    _M.load()

    -- some plugins need to be initialized in init* phases
    if ngx.config.subsystem == "http" then
        local exporter = require("apisix.plugins.prometheus.exporter")
        exporter.init()
    end

    if local_conf and not local_conf.apisix.enable_admin then
        init_plugins_syncer()
    end

    local metadata_options = {automatic = true}
    local plugin_metadatas, err = core.config.new("/plugin_metadata",
                                                  metadata_options)
    if not plugin_metadatas then
        error("failed to create etcd instance for fetching /plugin_metadatas : "
              .. err)
    end

    _M.plugin_metadatas = plugin_metadatas
end
| |
| |
--- Fetch the metadata entry of a plugin.
-- Relies on `_M.plugin_metadatas`, which _M.init_worker() assigns.
-- @param name plugin name
-- @return the result of `get(name)` on the plugin metadata configuration
function _M.plugin_metadata(name)
    local metadatas = _M.plugin_metadatas
    return metadatas:get(name)
end
| |
| |
--- Look up a loaded http plugin by name.
-- @param name plugin name
-- @return the plugin object, or nil when no plugin of that name is loaded
function _M.get(name)
    if not local_plugins_hash then
        return local_plugins_hash
    end

    return local_plugins_hash[name]
end
| |
| |
| return _M |