| -- |
| -- Licensed to the Apache Software Foundation (ASF) under one or more |
| -- contributor license agreements. See the NOTICE file distributed with |
| -- this work for additional information regarding copyright ownership. |
| -- The ASF licenses this file to You under the Apache License, Version 2.0 |
| -- (the "License"); you may not use this file except in compliance with |
| -- the License. You may obtain a copy of the License at |
| -- |
| -- http://www.apache.org/licenses/LICENSE-2.0 |
| -- |
| -- Unless required by applicable law or agreed to in writing, software |
| -- distributed under the License is distributed on an "AS IS" BASIS, |
| -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| -- See the License for the specific language governing permissions and |
| -- limitations under the License. |
| -- |
| |
| local local_conf = require("apisix.core.config_local").local_conf() |
| local http = require("resty.http") |
| local core = require("apisix.core") |
| local ipmatcher = require("resty.ipmatcher") |
| local ipairs = ipairs |
| local tostring = tostring |
| local type = type |
| local math_random = math.random |
| local ngx = ngx |
| local ngx_timer_at = ngx.timer.at |
| local ngx_timer_every = ngx.timer.every |
| local string_sub = string.sub |
| local str_find = core.string.find |
| local log = core.log |
| |
-- Module-level state shared by the functions in this file.
--   default_weight: fallback node weight, assigned in _M.init_worker().
--   applications:   most recent map of application name -> node list,
--                   assigned by fetch_full_registry(); nil until a fetch
--                   has completed successfully.
local default_weight, applications


-- Public module table returned at the end of the file.
local _M = {version = 0.1}
| |
| |
-- Pick one of the configured Eureka servers at random and build the base
-- URL used for registry requests.
--
-- A host entry may embed credentials ("http://user:pass@host:port"). They
-- are removed from the URL and converted into the value of an HTTP Basic
-- "Authorization" header.
--
-- Returns:
--   url        (string) base URL, always terminated with "/"
--   basic_auth (string or nil) "Basic <base64>" when credentials were found
-- Returns nothing (after logging an error) when no usable host is configured.
local function service_info()
    local eureka = local_conf.discovery and local_conf.discovery.eureka
    local host = eureka and eureka.host
    if not host then
        log.error("do not set eureka.host")
        return
    end

    -- math.random(0) raises "interval is empty"; reject an empty or
    -- non-array host list here instead of crashing the timer callback.
    if type(host) ~= "table" or #host == 0 then
        log.error("eureka.host must be a non-empty array")
        return
    end

    local basic_auth
    -- TODO Add health check to get healthy nodes.
    local url = host[math_random(#host)]
    local auth_idx = str_find(url, "@")
    if auth_idx then
        local protocol_idx = str_find(url, "://")
        if not protocol_idx then
            -- Do not log the URL itself: it contains the credentials.
            log.error("invalid eureka.host entry: credentials given without a scheme")
            return
        end

        -- "<scheme>://" .. "<host and the rest>", with "user:pass@" cut out
        local protocol = string_sub(url, 1, protocol_idx + 2)
        local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1)
        local other = string_sub(url, auth_idx + 1)
        url = protocol .. other
        basic_auth = "Basic " .. ngx.encode_base64(user_and_password)
    end

    if eureka.prefix then
        url = url .. eureka.prefix
    end
    -- request() appends a relative path, so the base must end with "/".
    if string_sub(url, #url) ~= "/" then
        url = url .. "/"
    end

    return url, basic_auth
end
| |
| |
-- Issue a single HTTP request against a Eureka server.
--
-- Parameters:
--   request_uri (string) base URL as produced by service_info()
--   basic_auth  (string or nil) value for the "Authorization" header
--   method      (string) HTTP method
--   path        (string) path appended verbatim to request_uri
--   query       passed through as the "query" option of request_uri
--   body        request body; a table is JSON-encoded and sent with
--               "Content-Type: application/json"
--
-- Returns whatever httpc:request_uri() returns, or nil plus an error
-- message when a table body cannot be JSON-encoded.
local function request(request_uri, basic_auth, method, path, query, body)
    log.info("eureka uri:", request_uri, ".")
    local target = request_uri .. path

    local req_headers = core.table.new(0, 5)
    req_headers['Connection'] = 'Keep-Alive'
    req_headers['Accept'] = 'application/json'
    -- assigning nil leaves the header unset when there are no credentials
    req_headers['Authorization'] = basic_auth

    if type(body) == 'table' then
        local encoded, err = core.json.encode(body)
        if not encoded then
            return nil, 'invalid body : ' .. err
        end
        body = encoded
        req_headers['Content-Type'] = 'application/json'
    end

    local httpc = http.new()

    -- Per-phase timeouts passed to set_timeouts(); defaults apply when
    -- discovery.eureka.timeout or one of its fields is not configured.
    local conf_timeout = local_conf.discovery.eureka.timeout or {}
    local connect_timeout = conf_timeout.connect or 2000
    local send_timeout = conf_timeout.send or 2000
    local read_timeout = conf_timeout.read or 5000
    log.info("connect_timeout:", connect_timeout, ", send_timeout:", send_timeout,
             ", read_timeout:", read_timeout, ".")
    httpc:set_timeouts(connect_timeout, send_timeout, read_timeout)

    local opts = {
        version = 1.1,
        method = method,
        headers = req_headers,
        query = query,
        body = body,
        -- NOTE(review): TLS certificate verification is disabled here.
        ssl_verify = false,
    }
    return httpc:request_uri(target, opts)
end
| |
| |
-- Return the port number carried by a Eureka port entry
-- ({["@enabled"] = ..., ["$"] = ...}) when that entry is enabled,
-- otherwise nil. Missing or malformed entries yield nil.
local function enabled_port(entry)
    if type(entry) ~= "table" then
        return nil
    end
    if tostring(entry["@enabled"]) == "true" and entry["$"] then
        return entry["$"]
    end
    return nil
end


-- Extract the address of one Eureka instance record.
--
-- Only instances whose effective status is "UP" are accepted. The effective
-- status is the overridden status when it is present and not "UNKNOWN",
-- otherwise the plain status.
--
-- Returns:
--   ip       (string) instance.ipAddr, validated as IPv4 or IPv6
--   port     secure port if enabled, else plain port if enabled, else nil
--   metadata instance.metadata, unchanged
-- Returns nothing when the instance is not UP or its IP is invalid.
local function parse_instance(instance)
    local status = instance.status
    local overridden_status = instance.overriddenstatus or instance.overriddenStatus
    if overridden_status and overridden_status ~= "UNKNOWN" then
        status = overridden_status
    end

    if status ~= "UP" then
        return
    end

    -- The secure port wins when both are enabled. A record lacking either
    -- entry no longer raises and aborts the whole registry refresh.
    local port = enabled_port(instance.securePort) or enabled_port(instance.port)

    local ip = instance.ipAddr
    if type(ip) ~= "string"
       or (not ipmatcher.parse_ipv4(ip) and not ipmatcher.parse_ipv6(ip)) then
        log.error(instance.app, " service ", instance.hostName, " node IP ", ip,
                  " is invalid(must be IPv4 or IPv6).")
        return
    end
    return ip, port, instance.metadata
end
| |
| |
-- Timer callback: download the full registry from Eureka ("GET apps") and
-- replace the module-level `applications` map with the UP instances found.
--
-- Parameters:
--   premature  truthy when the timer fires early; the function then
--              returns without doing anything
--
-- On any failure (no host, HTTP error, non-200 status, undecodable or
-- malformed body) an error is logged and the previous `applications`
-- value is kept.
local function fetch_full_registry(premature)
    if premature then
        return
    end

    local request_uri, basic_auth = service_info()
    if not request_uri then
        return
    end

    local res, err = request(request_uri, basic_auth, "GET", "apps")
    if not res then
        log.error("failed to fetch registry: ", err)
        return
    end

    if not res.body or res.status ~= 200 then
        log.error("failed to fetch registry, status = ", res.status)
        return
    end

    local json_str = res.body
    local data, decode_err = core.json.decode(json_str)
    if not data then
        log.error("invalid response body: ", json_str, " err: ", decode_err)
        return
    end

    -- Validate the envelope instead of raising on an unexpected payload.
    local registry = type(data) == "table" and data.applications
    local apps = type(registry) == "table" and registry.application
    if type(apps) ~= "table" then
        log.error("invalid registry payload: applications.application is missing")
        return
    end

    local up_apps = core.table.new(0, #apps)
    for _, app in ipairs(apps) do
        local instances = type(app) == "table" and app.instance
        if type(instances) == "table" then
            for _, instance in ipairs(instances) do
                local ip, port, metadata = parse_instance(instance)
                if ip and port then
                    local nodes = up_apps[app.name]
                    if not nodes then
                        nodes = core.table.new(#instances, 0)
                        up_apps[app.name] = nodes
                    end

                    -- Only index metadata when it really is a table.
                    local has_meta = type(metadata) == "table"
                    core.table.insert(nodes, {
                        host = ip,
                        port = port,
                        weight = has_meta and metadata.weight or default_weight,
                        metadata = metadata,
                    })
                    if has_meta then
                        -- remove useless data
                        metadata.weight = nil
                    end
                end
            end
        end
    end

    -- Swap in the new snapshot only after it has been built completely.
    applications = up_apps
end
| |
| |
-- Look up the node list of a service in the most recently fetched registry.
--
-- Parameters:
--   service_name  key into the `applications` map
--
-- Returns applications[service_name] (nil when the service is unknown).
-- Returns nothing and logs an error while no registry has been stored yet.
function _M.nodes(service_name)
    if applications then
        return applications[service_name]
    end

    log.error("failed to fetch nodes for : ", service_name)
end
| |
| |
-- Worker initialisation: read the default node weight and the fetch interval
-- from discovery.eureka (defaults: 100 and 30), then schedule
-- fetch_full_registry once immediately and repeatedly at that interval.
--
-- Timer creation failures are logged instead of being silently ignored,
-- because without these timers `applications` is never populated.
function _M.init_worker()
    default_weight = local_conf.discovery.eureka.weight or 100
    log.info("default_weight:", default_weight, ".")
    local fetch_interval = local_conf.discovery.eureka.fetch_interval or 30
    log.info("fetch_interval:", fetch_interval, ".")

    local ok, err = ngx_timer_at(0, fetch_full_registry)
    if not ok then
        log.error("failed to create timer for the initial eureka fetch: ", err)
    end

    ok, err = ngx_timer_every(fetch_interval, fetch_full_registry)
    if not ok then
        log.error("failed to create timer for the periodic eureka fetch: ", err)
    end
end
| |
| |
-- Return a table describing the current state of this module:
--   config    the discovery.eureka section of the local configuration
--   services  the current `applications` map, or an empty table when
--             nothing has been stored yet
function _M.dump_data()
    local snapshot = {
        config = local_conf.discovery.eureka,
        services = applications or {},
    }
    return snapshot
end
| |
| |
| return _M |