blob: e8eabb64797035ea00a9ec56a1d71d634971e291 [file] [log] [blame]
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local config_local = require("apisix.core.config_local")
local yaml = require("tinyyaml")
local log = require("apisix.core.log")
local json = require("apisix.core.json")
local new_tab = require("table.new")
local check_schema = require("apisix.core.schema").check
local profile = require("apisix.core.profile")
local lfs = require("lfs")
local exiting = ngx.worker.exiting
local insert_tab = table.insert
local type = type
local ipairs = ipairs
local setmetatable = setmetatable
local ngx_sleep = require("apisix.core.utils").sleep
local ngx_timer_at = ngx.timer.at
local ngx_time = ngx.time
local sub_str = string.sub
local tostring = tostring
local pcall = pcall
local io = io
local ngx = ngx
local re_find = ngx.re.find
local apisix_yaml_path = profile:yaml_path("apisix")
local created_obj = {}
local _M = {
version = 0.2,
local_conf = config_local.local_conf,
clear_local_cache = config_local.clear_cache,
}
local mt = {
__index = _M,
__tostring = function(self)
return "apisix.yaml key: " .. (self.key or "")
end
}
local apisix_yaml
local apisix_yaml_ctime
local function read_apisix_yaml(premature, pre_mtime)
if premature then
return
end
local attributes, err = lfs.attributes(apisix_yaml_path)
if not attributes then
log.error("failed to fetch ", apisix_yaml_path, " attributes: ", err)
return
end
-- log.info("change: ", json.encode(attributes))
local last_change_time = attributes.change
if apisix_yaml_ctime == last_change_time then
return
end
local f, err = io.open(apisix_yaml_path, "r")
if not f then
log.error("failed to open file ", apisix_yaml_path, " : ", err)
return
end
f:seek('end', -10)
local end_flag = f:read("*a")
-- log.info("flag: ", end_flag)
local found_end_flag = re_find(end_flag, [[#END\s*$]], "jo")
if not found_end_flag then
f:close()
log.warn("missing valid end flag in file ", apisix_yaml_path)
return
end
f:seek('set')
local yaml_config = f:read("*a")
f:close()
local apisix_yaml_new = yaml.parse(yaml_config)
if not apisix_yaml_new then
log.error("failed to parse the content of file " .. apisix_yaml_path)
return
end
apisix_yaml = apisix_yaml_new
apisix_yaml_ctime = last_change_time
end
local function sync_data(self)
if not self.key then
return nil, "missing 'key' arguments"
end
if not apisix_yaml_ctime then
log.warn("wait for more time")
return nil, "failed to read local file " .. apisix_yaml_path
end
if self.conf_version == apisix_yaml_ctime then
return true
end
local items = apisix_yaml[self.key]
log.info(self.key, " items: ", json.delay_encode(items))
if not items then
self.values = new_tab(8, 0)
self.values_hash = new_tab(0, 8)
self.conf_version = apisix_yaml_ctime
return true
end
if self.values then
for _, item in ipairs(self.values) do
if item.clean_handlers then
for _, clean_handler in ipairs(item.clean_handlers) do
clean_handler(item)
end
item.clean_handlers = nil
end
end
self.values = nil
end
if self.single_item then
-- treat items as a single item
self.values = new_tab(1, 0)
self.values_hash = new_tab(0, 1)
local item = items
local conf_item = {value = item, modifiedIndex = apisix_yaml_ctime,
key = "/" .. self.key}
local data_valid = true
local err
if self.item_schema then
data_valid, err = check_schema(self.item_schema, item)
if not data_valid then
log.error("failed to check item data of [", self.key,
"] err:", err, " ,val: ", json.delay_encode(item))
end
if data_valid and self.checker then
data_valid, err = self.checker(item)
if not data_valid then
log.error("failed to check item data of [", self.key,
"] err:", err, " ,val: ", json.delay_encode(item))
end
end
end
if data_valid then
insert_tab(self.values, conf_item)
self.values_hash[self.key] = #self.values
conf_item.clean_handlers = {}
if self.filter then
self.filter(conf_item)
end
end
else
self.values = new_tab(#items, 0)
self.values_hash = new_tab(0, #items)
local err
for i, item in ipairs(items) do
local id = tostring(i)
local data_valid = true
if type(item) ~= "table" then
data_valid = false
log.error("invalid item data of [", self.key .. "/" .. id,
"], val: ", json.delay_encode(item),
", it should be an object")
end
local key = item.id or "arr_" .. i
local conf_item = {value = item, modifiedIndex = apisix_yaml_ctime,
key = "/" .. self.key .. "/" .. key}
if data_valid and self.item_schema then
data_valid, err = check_schema(self.item_schema, item)
if not data_valid then
log.error("failed to check item data of [", self.key,
"] err:", err, " ,val: ", json.delay_encode(item))
end
end
if data_valid and self.checker then
data_valid, err = self.checker(item)
if not data_valid then
log.error("failed to check item data of [", self.key,
"] err:", err, " ,val: ", json.delay_encode(item))
end
end
if data_valid then
insert_tab(self.values, conf_item)
local item_id = conf_item.value.id or self.key .. "#" .. id
item_id = tostring(item_id)
self.values_hash[item_id] = #self.values
conf_item.value.id = item_id
conf_item.clean_handlers = {}
if self.filter then
self.filter(conf_item)
end
end
end
end
self.conf_version = apisix_yaml_ctime
return true
end
function _M.get(self, key)
if not self.values_hash then
return
end
local arr_idx = self.values_hash[tostring(key)]
if not arr_idx then
return nil
end
return self.values[arr_idx]
end
local function _automatic_fetch(premature, self)
if premature then
return
end
local i = 0
while not exiting() and self.running and i <= 32 do
i = i + 1
local ok, ok2, err = pcall(sync_data, self)
if not ok then
err = ok2
log.error("failed to fetch data from local file " .. apisix_yaml_path .. ": ",
err, ", ", tostring(self))
ngx_sleep(3)
break
elseif not ok2 and err then
if err ~= "timeout" and err ~= "Key not found"
and self.last_err ~= err then
log.error("failed to fetch data from local file " .. apisix_yaml_path .. ": ",
err, ", ", tostring(self))
end
if err ~= self.last_err then
self.last_err = err
self.last_err_time = ngx_time()
else
if ngx_time() - self.last_err_time >= 30 then
self.last_err = nil
end
end
ngx_sleep(0.5)
elseif not ok2 then
ngx_sleep(0.05)
else
ngx_sleep(0.1)
end
end
if not exiting() and self.running then
ngx_timer_at(0, _automatic_fetch, self)
end
end
function _M.new(key, opts)
local local_conf, err = config_local.local_conf()
if not local_conf then
return nil, err
end
local automatic = opts and opts.automatic
local item_schema = opts and opts.item_schema
local filter_fun = opts and opts.filter
local single_item = opts and opts.single_item
local checker = opts and opts.checker
-- like /routes and /upstreams, remove first char `/`
if key then
key = sub_str(key, 2)
end
local obj = setmetatable({
automatic = automatic,
item_schema = item_schema,
checker = checker,
sync_times = 0,
running = true,
conf_version = 0,
values = nil,
routes_hash = nil,
prev_index = nil,
last_err = nil,
last_err_time = nil,
key = key,
single_item = single_item,
filter = filter_fun,
}, mt)
if automatic then
if not key then
return nil, "missing `key` argument"
end
local ok, ok2, err = pcall(sync_data, obj)
if not ok then
err = ok2
end
if err then
log.error("failed to fetch data from local file ", apisix_yaml_path, ": ",
err, ", ", key)
end
ngx_timer_at(0, _automatic_fetch, obj)
end
if key then
created_obj[key] = obj
end
return obj
end
function _M.close(self)
self.running = false
end
function _M.server_version(self)
return "apisix.yaml " .. _M.version
end
function _M.fetch_created_obj(key)
return created_obj[sub_str(key, 2)]
end
function _M.init()
read_apisix_yaml()
return true
end
function _M.init_worker()
-- sync data in each non-master process
ngx.timer.every(1, read_apisix_yaml)
end
return _M