blob: bd61e17e271321d11edf78886ebdd73263685b65 [file] [log] [blame]
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local require = require
local local_conf = require("apisix.core.config_local").local_conf()
local core = require("apisix.core")
local resty_consul = require('resty.consul')
local cjson = require('cjson')
local http = require('resty.http')
local util = require("apisix.cli.util")
local ipairs = ipairs
local error = error
local ngx = ngx
local unpack = unpack
local ngx_re_match = ngx.re.match
local tonumber = tonumber
local pairs = pairs
local ipairs = ipairs
local ngx_timer_at = ngx.timer.at
local ngx_timer_every = ngx.timer.every
local log = core.log
local ngx_decode_base64 = ngx.decode_base64
local json_delay_encode = core.json.delay_encode
local cjson_null = cjson.null
local applications = core.table.new(0, 5)
local default_service
local default_weight
local default_prefix_rule
local skip_keys_map = core.table.new(0, 1)
local dump_params
local events
local events_list
local consul_apps
local schema = {
type = "object",
properties = {
servers = {
type = "array",
minItems = 1,
items = {
type = "string",
}
},
fetch_interval = {type = "integer", minimum = 1, default = 3},
keepalive = {
type = "boolean",
default = true
},
prefix = {type = "string", default = "upstreams"},
weight = {type = "integer", minimum = 1, default = 1},
timeout = {
type = "object",
properties = {
connect = {type = "integer", minimum = 1, default = 2000},
read = {type = "integer", minimum = 1, default = 2000},
wait = {type = "integer", minimum = 1, default = 60}
},
default = {
connect = 2000,
read = 2000,
wait = 60,
}
},
skip_keys = {
type = "array",
minItems = 1,
items = {
type = "string",
}
},
dump = {
type = "object",
properties = {
path = {type = "string", minLength = 1},
load_on_init = {type = "boolean", default = true},
expire = {type = "integer", default = 0},
},
required = {"path"},
},
default_service = {
type = "object",
properties = {
host = {type = "string"},
port = {type = "integer"},
metadata = {
type = "object",
properties = {
fail_timeout = {type = "integer", default = 1},
weigth = {type = "integer", default = 1},
max_fails = {type = "integer", default = 1}
},
default = {
fail_timeout = 1,
weigth = 1,
max_fails = 1
}
}
}
}
},
required = {"servers"}
}
local _M = {
version = 0.3,
}
local function discovery_consul_callback(data, event, source, pid)
applications = data
log.notice("update local variable application, event is: ", event,
"source: ", source, "server pid:", pid,
", application: ", core.json.encode(applications, true))
end
function _M.all_nodes()
return applications
end
function _M.nodes(service_name)
if not applications then
log.error("application is nil, failed to fetch nodes for : ", service_name)
return
end
local resp_list = applications[service_name]
if not resp_list then
log.error("fetch nodes failed by ", service_name, ", return default service")
return default_service and {default_service}
end
log.info("process id: ", ngx.worker.id(), ", applications[", service_name, "] = ",
json_delay_encode(resp_list, true))
return resp_list
end
local function parse_instance(node, server_name_prefix)
local key = node.Key
if key == cjson_null or not key or #key == 0 then
log.error("consul_key_empty, server_name_prefix: ", server_name_prefix,
", node: ", json_delay_encode(node, true))
return false
end
local result = ngx_re_match(key, default_prefix_rule, "jo")
if not result then
log.error("server name parse error, server_name_prefix: ", server_name_prefix,
", node: ", json_delay_encode(node, true))
return false
end
local sn, host, port = result[1], result[2], result[3]
-- if exist, skip special kesy
if sn and skip_keys_map[sn] then
return false
end
-- base64 value = "IHsid2VpZ2h0IjogMTIwLCAibWF4X2ZhaWxzIjogMiwgImZhaWxfdGltZW91dCI6IDJ9"
-- ori value = "{"weight": 120, "max_fails": 2, "fail_timeout": 2}"
local metadataBase64 = node.Value
if metadataBase64 == cjson_null or not metadataBase64 or #metadataBase64 == 0 then
log.error("error: consul_value_empty, server_name_prefix: ", server_name_prefix,
", node: ", json_delay_encode(node, true))
return false
end
local metadata, err = core.json.decode(ngx_decode_base64(metadataBase64))
if err then
log.error("invalid upstream value, server_name_prefix: ", server_name_prefix,
",err: ", err, ", node: ", json_delay_encode(node, true))
return false
elseif metadata.check_status == false or metadata.check_status == "false" then
log.error("server node unhealthy, server_name_prefix: ", server_name_prefix,
", node: ", json_delay_encode(node, true))
return false
end
return true, host, tonumber(port), metadata, sn
end
local function update_application(server_name_prefix, data)
local sn
local up_apps = core.table.new(0, #data)
local weight = default_weight
for _, node in ipairs(data) do
local succ, ip, port, metadata, server_name = parse_instance(node, server_name_prefix)
if succ then
sn = server_name_prefix .. server_name
local nodes = up_apps[sn]
if not nodes then
nodes = core.table.new(1, 0)
up_apps[sn] = nodes
end
core.table.insert(nodes, {
host = ip,
port = port,
weight = metadata and metadata.weight or weight,
})
end
end
-- clean old unused data
local old_apps = consul_apps[server_name_prefix] or {}
for k, _ in pairs(old_apps) do
applications[k] = nil
end
core.table.clear(old_apps)
for k, v in pairs(up_apps) do
applications[k] = v
end
consul_apps[server_name_prefix] = up_apps
log.info("update applications: ", core.json.encode(applications))
end
local function read_dump_srvs()
local data, err = util.read_file(dump_params.path)
if not data then
log.notice("read dump file get error: ", err)
return
end
log.info("read dump file: ", data)
data = util.trim(data)
if #data == 0 then
log.error("dump file is empty")
return
end
local entity, err = core.json.decode(data)
if err then
log.error("decoded dump data got error: ", err, ", file content: ", data)
return
end
if not entity.services or not entity.last_update then
log.warn("decoded dump data miss fields, file content: ", data)
return
end
local now_time = ngx.time()
log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
dump_params.expire, ", now_time: ", now_time)
if dump_params.expire ~= 0 and (entity.last_update + dump_params.expire) < now_time then
log.warn("dump file: ", dump_params.path, " had expired, ignored it")
return
end
applications = entity.services
log.info("load dump file into memory success")
end
local function write_dump_srvs()
local entity = {
services = applications,
last_update = ngx.time(),
expire = dump_params.expire, -- later need handle it
}
local data = core.json.encode(entity)
local succ, err = util.write_file(dump_params.path, data)
if not succ then
log.error("write dump into file got error: ", err)
end
end
local function show_dump_file()
if not dump_params then
return 503, "dump params is nil"
end
local data, err = util.read_file(dump_params.path)
if not data then
return 503, err
end
return 200, data
end
function _M.connect(premature, consul_server)
if premature then
return
end
local consul_client = resty_consul:new({
host = consul_server.host,
port = consul_server.port,
connect_timeout = consul_server.connect_timeout,
read_timeout = consul_server.read_timeout,
default_args = consul_server.default_args,
})
log.info("consul_server: ", json_delay_encode(consul_server, true))
local result, err = consul_client:get(consul_server.consul_key)
local error_info = (err ~= nil and err)
or ((result ~= nil and result.status ~= 200)
and result.status)
if error_info then
log.error("connect consul: ", consul_server.server_name_key,
" by key: ", consul_server.consul_key,
", got result: ", json_delay_encode(result, true),
", with error: ", error_info)
goto ERR
end
log.info("connect consul: ", consul_server.server_name_key,
", result status: ", result.status,
", result.headers.index: ", result.headers['X-Consul-Index'],
", result body: ", json_delay_encode(result.body))
-- if current index different last index then update application
if consul_server.index ~= result.headers['X-Consul-Index'] then
consul_server.index = result.headers['X-Consul-Index']
-- only long connect type use index
if consul_server.keepalive then
consul_server.default_args.index = result.headers['X-Consul-Index']
end
-- decode body, decode json, update application, error handling
if result.body and #result.body ~= 0 then
log.notice("server_name: ", consul_server.server_name_key,
", header: ", core.json.encode(result.headers, true),
", body: ", core.json.encode(result.body, true))
update_application(consul_server.server_name_key, result.body)
--update events
local ok, err = events.post(events_list._source, events_list.updating, applications)
if not ok then
log.error("post_event failure with ", events_list._source,
", update application error: ", err)
end
if dump_params then
ngx_timer_at(0, write_dump_srvs)
end
end
end
:: ERR ::
local keepalive = consul_server.keepalive
if keepalive then
local ok, err = ngx_timer_at(0, _M.connect, consul_server)
if not ok then
log.error("create ngx_timer_at got error: ", err)
return
end
end
end
local function format_consul_params(consul_conf)
local consul_server_list = core.table.new(0, #consul_conf.servers)
local args
if consul_conf.keepalive == false then
args = {
recurse = true,
}
elseif consul_conf.keepalive then
args = {
recurse = true,
wait = consul_conf.timeout.wait, --blocked wait!=0; unblocked by wait=0
index = 0,
}
end
for _, v in pairs(consul_conf.servers) do
local scheme, host, port, path = unpack(http.parse_uri(nil, v))
if scheme ~= "http" then
return nil, "only support consul http schema address, eg: http://address:port"
elseif path ~= "/" or core.string.has_suffix(v, '/') then
return nil, "invalid consul server address, the valid format: http://address:port"
end
core.table.insert(consul_server_list, {
host = host,
port = port,
connect_timeout = consul_conf.timeout.connect,
read_timeout = consul_conf.timeout.read,
consul_key = "/kv/" .. consul_conf.prefix,
server_name_key = v .. "/v1/kv/",
weight = consul_conf.weight,
keepalive = consul_conf.keepalive,
default_args = args,
index = 0,
fetch_interval = consul_conf.fetch_interval -- fetch interval to next connect consul
})
end
return consul_server_list
end
function _M.init_worker()
local consul_conf = local_conf.discovery.consul_kv
if not consul_conf
or not consul_conf.servers
or #consul_conf.servers == 0 then
error("do not set consul_kv correctly !")
return
end
local ok, err = core.schema.check(schema, consul_conf)
if not ok then
error("invalid consul_kv configuration: " .. err)
return
end
if consul_conf.dump then
local dump = consul_conf.dump
dump_params = dump
if dump.load_on_init then
read_dump_srvs()
end
end
events = require("resty.worker.events")
events_list = events.event_list(
"discovery_consul_update_application",
"updating"
)
if 0 ~= ngx.worker.id() then
events.register(discovery_consul_callback, events_list._source, events_list.updating)
return
end
log.notice("consul_conf: ", core.json.encode(consul_conf))
-- set default service, used when the server node cannot be found
if consul_conf.default_service then
default_service = consul_conf.default_service
end
default_weight = consul_conf.weight
default_prefix_rule = "(" .. consul_conf.prefix .. "/.*/)([a-zA-Z0-9.]+):([0-9]+)"
log.info("default params, default_weight: ", default_weight,
", default_prefix_rule: ", default_prefix_rule)
if consul_conf.skip_keys then
skip_keys_map = core.table.new(0, #consul_conf.skip_keys)
for _, v in ipairs(consul_conf.skip_keys) do
skip_keys_map[v] = true
end
end
local consul_servers_list, err = format_consul_params(consul_conf)
if err then
error(err)
return
end
log.info("consul_server_list: ", core.json.encode(consul_servers_list))
consul_apps = core.table.new(0, 1)
-- success or failure
for _, server in ipairs(consul_servers_list) do
local ok, err = ngx_timer_at(0, _M.connect, server)
if not ok then
error("create consul_kv got error: " .. err)
return
end
if server.keepalive == false then
ngx_timer_every(server.fetch_interval, _M.connect, server)
end
end
end
function _M.dump_data()
return {config = local_conf.discovery.consul_kv, services = applications}
end
function _M.control_api()
return {
{
methods = {"GET"},
uris = {"/show_dump_file"},
handler = show_dump_file,
}
}
end
return _M