--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local errlog = require("ngx.errlog")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")
local timers = require("apisix.timers")
local http = require("resty.http")
local producer = require("resty.kafka.producer")
local plugin_name = "error-log-logger"
local table = core.table
local schema_def = core.schema
local ngx = ngx
local tcp = ngx.socket.tcp
local tostring = tostring
local ipairs = ipairs
local string = require("string")
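
-- cache the resolved plugin metadata keyed by its modifiedIndex so the errlog
-- filter level is only re-applied when the metadata actually changes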
local lrucache = core.lrucache.new({
ttl = 300, count = 32
})
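-- cache the kafka producer so it is only rebuilt when the metadata changes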
local kafka_prod_lrucache = core.lrucache.new({
ttl = 300, count = 32
})
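
-- the plugin is configured through plugin metadata; per the `oneOf` rule below,
-- exactly one sink (tcp, skywalking, clickhouse, kafka) or the legacy
-- top-level host/port pair must be present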
local metadata_schema = {
type = "object",
properties = {
tcp = {
type = "object",
properties = {
host = schema_def.host_def,
port = {type = "integer", minimum = 0},
tls = {type = "boolean", default = false},
tls_server_name = {type = "string"},
},
required = {"host", "port"}
},
skywalking = {
type = "object",
properties = {
endpoint_addr = {schema_def.uri_def, default = "http://127.0.0.1:12900/v3/logs"},
service_name = {type = "string", default = "APISIX"},
service_instance_name = {type = "string", default = "APISIX Service Instance"},
},
},
clickhouse = {
type = "object",
properties = {
endpoint_addr = {schema_def.uri_def, default = "http://127.0.0.1:8123"},
user = {type = "string", default = "default"},
password = {type = "string", default = ""},
database = {type = "string", default = ""},
logtable = {type = "string", default = ""},
},
required = {"endpoint_addr", "user", "password", "database", "logtable"}
},
kafka = {
type = "object",
properties = {
brokers = {
type = "array",
minItems = 1,
items = {
type = "object",
properties = {
host = {
type = "string",
description = "the host of kafka broker",
},
port = {
type = "integer",
minimum = 1,
maximum = 65535,
description = "the port of kafka broker",
},
sasl_config = {
type = "object",
description = "sasl config",
properties = {
mechanism = {
type = "string",
default = "PLAIN",
enum = {"PLAIN"},
},
user = { type = "string", description = "user" },
password = { type = "string", description = "password" },
},
required = {"user", "password"},
},
},
required = {"host", "port"},
},
uniqueItems = true,
},
kafka_topic = {type = "string"},
producer_type = {
type = "string",
default = "async",
enum = {"async", "sync"},
},
required_acks = {
type = "integer",
default = 1,
enum = { 0, 1, -1 },
},
key = {type = "string"},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
meta_refresh_interval = {type = "integer", minimum = 1, default = 30},
},
required = {"brokers", "kafka_topic"},
},
name = {type = "string", default = plugin_name},
level = {type = "string", default = "WARN", enum = {"STDERR", "EMERG", "ALERT", "CRIT",
"ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"}},
timeout = {type = "integer", minimum = 1, default = 3},
keepalive = {type = "integer", minimum = 1, default = 30},
batch_max_size = {type = "integer", minimum = 0, default = 1000},
max_retry_count = {type = "integer", minimum = 0, default = 0},
retry_delay = {type = "integer", minimum = 0, default = 1},
buffer_duration = {type = "integer", minimum = 1, default = 60},
inactive_timeout = {type = "integer", minimum = 1, default = 3},
},
oneOf = {
{required = {"skywalking"}},
{required = {"tcp"}},
{required = {"clickhouse"}},
{required = {"kafka"}},
-- for compatibility with the old schema
{required = {"host", "port"}}
},
encrypt_fields = {"clickhouse.password"},
}
local schema = {
type = "object",
}
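
-- map the level names accepted in the metadata to ngx log level constants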
local log_level = {
STDERR = ngx.STDERR,
EMERG = ngx.EMERG,
ALERT = ngx.ALERT,
CRIT = ngx.CRIT,
ERR = ngx.ERR,
ERROR = ngx.ERR,
WARN = ngx.WARN,
NOTICE = ngx.NOTICE,
INFO = ngx.INFO,
DEBUG = ngx.DEBUG
}
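
-- module-level state: the effective config resolved from plugin metadata and
-- the shared batch processor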
local config = {}
local log_buffer
local _M = {
version = 0.1,
priority = 1091,
name = plugin_name,
schema = schema,
metadata_schema = metadata_schema,
scope = "global",
}
function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
return core.schema.check(schema, conf)
end
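
-- send one batch of log lines to a raw TCP server, optionally over TLS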
local function send_to_tcp_server(data)
local sock, sock_err = tcp()
if not sock then
return false, "failed to init the socket: " .. sock_err
end
sock:settimeout(config.timeout * 1000)
local tcp_config = config.tcp
local ok, err = sock:connect(tcp_config.host, tcp_config.port)
if not ok then
return false, "failed to connect the TCP server: host[" .. tcp_config.host
.. "] port[" .. tostring(tcp_config.port) .. "] err: " .. err
end
if tcp_config.tls then
ok, err = sock:sslhandshake(false, tcp_config.tls_server_name, false)
if not ok then
sock:close()
return false, "failed to perform TLS handshake to TCP server: host["
.. tcp_config.host .. "] port[" .. tostring(tcp_config.port) .. "] err: " .. err
end
end
local bytes, err = sock:send(data)
if not bytes then
sock:close()
return false, "failed to send data to TCP server: host[" .. tcp_config.host
.. "] port[" .. tostring(tcp_config.port) .. "] err: " .. err
end
sock:setkeepalive(config.keepalive * 1000)
return true
end
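
-- POST one batch to the SkyWalking log collector as a JSON array of entries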
local function send_to_skywalking(log_message)
local err_msg
local res = true
core.log.info("sending a batch logs to ", config.skywalking.endpoint_addr)
local httpc = http.new()
httpc:set_timeout(config.timeout * 1000)
local entries = {}
local service_instance_name = config.skywalking.service_instance_name
if service_instance_name == "$hostname" then
service_instance_name = core.utils.gethostname()
end
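-- the batch arrives as (message, "\n") pairs pushed by process(), so step by 2
-- and keep only the messages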
for i = 1, #log_message, 2 do
local content = {
service = config.skywalking.service_name,
serviceInstance = service_instance_name,
endpoint = "",
body = {
text = {
text = log_message[i]
}
}
}
table.insert(entries, content)
end
local httpc_res, httpc_err = httpc:request_uri(
config.skywalking.endpoint_addr,
{
method = "POST",
body = core.json.encode(entries),
keepalive_timeout = config.keepalive * 1000,
headers = {
["Content-Type"] = "application/json",
}
}
)
if not httpc_res then
return false, "error while sending data to skywalking["
.. config.skywalking.endpoint_addr .. "] " .. httpc_err
end
-- an error occurred on the server side
if httpc_res.status >= 400 then
res = false
err_msg = string.format(
"server returned status code[%s] skywalking[%s] body[%s]",
httpc_res.status,
config.skywalking.endpoint_addr,
httpc_res.body
)
end
return res, err_msg
end
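
-- insert one batch into ClickHouse over its HTTP interface, one JSON-encoded
-- row per log message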
local function send_to_clickhouse(log_message)
local err_msg
local res = true
core.log.info("sending a batch logs to ", config.clickhouse.endpoint_addr)
local httpc = http.new()
httpc:set_timeout(config.timeout * 1000)
local entries = {}
for i = 1, #log_message, 2 do
-- TODO Here save error log as a whole string to clickhouse 'data' column.
-- We will add more columns in the future.
table.insert(entries, core.json.encode({data=log_message[i]}))
end
local httpc_res, httpc_err = httpc:request_uri(
config.clickhouse.endpoint_addr,
{
method = "POST",
body = "INSERT INTO " .. config.clickhouse.logtable .." FORMAT JSONEachRow "
.. table.concat(entries, " "),
keepalive_timeout = config.keepalive * 1000,
headers = {
["Content-Type"] = "application/json",
["X-ClickHouse-User"] = config.clickhouse.user,
["X-ClickHouse-Key"] = config.clickhouse.password,
["X-ClickHouse-Database"] = config.clickhouse.database
}
}
)
if not httpc_res then
return false, "error while sending data to clickhouse["
.. config.clickhouse.endpoint_addr .. "] " .. httpc_err
end
-- an error occurred on the server side
if httpc_res.status >= 400 then
res = false
err_msg = string.format(
"server returned status code[%s] clickhouse[%s] body[%s]",
httpc_res.status,
config.clickhouse.endpoint_addr,
httpc_res.body
)
end
return res, err_msg
end
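
-- creator callback for lrucache: (re)apply the configured filter level to
-- ngx.errlog and return the metadata value as the effective config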
local function update_filter(value)
local level = log_level[value.level]
local status, err = errlog.set_filter_level(level)
if not status then
return nil, "failed to set filter level by ngx.errlog, the error is :" .. err
else
core.log.notice("set the filter_level to ", value.level)
end
return value
end
local function create_producer(broker_list, broker_config, cluster_name)
core.log.info("create new kafka producer instance")
return producer:new(broker_list, broker_config, cluster_name)
end
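
-- send one batch to kafka; the metadata is re-read here instead of relying on
-- the module-level config, and the producer is cached across calls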
local function send_to_kafka(log_message)
-- avoid race of the global config
local metadata = plugin.plugin_metadata(plugin_name)
if not (metadata and metadata.value and metadata.modifiedIndex) then
return false, "please set the correct plugin_metadata for " .. plugin_name
end
local config, err = lrucache(plugin_name, metadata.modifiedIndex, update_filter, metadata.value)
if not config then
return false, "get config failed: " .. err
end
core.log.info("sending a batch logs to kafka brokers: ",
core.json.delay_encode(config.kafka.brokers))
local broker_config = {}
broker_config["request_timeout"] = config.timeout * 1000
broker_config["producer_type"] = config.kafka.producer_type
broker_config["required_acks"] = config.kafka.required_acks
broker_config["refresh_interval"] = config.kafka.meta_refresh_interval * 1000
-- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions of messages in kafka
local prod, err = kafka_prod_lrucache(plugin_name, metadata.modifiedIndex,
create_producer, config.kafka.brokers, broker_config,
config.kafka.cluster_name)
if not prod then
return false, "get kafka producer failed: " .. err
end
core.log.info("kafka cluster name ", config.kafka.cluster_name, ", broker_list[1] port ",
prod.client.broker_list[1].port)
local ok
for i = 1, #log_message, 2 do
ok, err = prod:send(config.kafka.kafka_topic,
config.kafka.key, core.json.encode(log_message[i]))
if not ok then
return false, "failed to send data to Kafka topic: " .. err ..
", brokers: " .. core.json.encode(config.kafka.brokers)
end
core.log.info("send data to kafka: ", core.json.delay_encode(log_message[i]))
end
return true
end
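
-- dispatch one batch to whichever sink is configured in the metadata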
local function send(data)
if config.skywalking then
return send_to_skywalking(data)
elseif config.clickhouse then
return send_to_clickhouse(data)
elseif config.kafka then
return send_to_kafka(data)
end
return send_to_tcp_server(data)
end
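
-- timer callback: refresh the config from plugin metadata, drain the logs
-- captured by ngx.errlog, and push them (each message followed by a "\n"
-- separator) into the shared batch processor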
local function process()
local metadata = plugin.plugin_metadata(plugin_name)
if not (metadata and metadata.value and metadata.modifiedIndex) then
core.log.info("please set the correct plugin_metadata for ", plugin_name)
return
else
local err
config, err = lrucache(plugin_name, metadata.modifiedIndex, update_filter, metadata.value)
if not config then
core.log.warn("set log filter failed for ", err)
return
end
if not (config.tcp or config.skywalking or config.clickhouse or config.kafka) then
config.tcp = {
host = config.host,
port = config.port,
tls = config.tls,
tls_server_name = config.tls_server_name
}
core.log.warn(
string.format("The schema is out of date. Please update to the new configuration, "
.. "for example: {\"tcp\": {\"host\": \"%s\", \"port\": \"%s\"}}",
config.host, config.port
))
end
end
local err_level = log_level[metadata.value.level]
local entries = {}
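-- errlog.get_logs(max) returns up to `max` buffered entries as a flat array of
-- (level, timestamp, message) triples, hence the step of 3 below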
local logs = errlog.get_logs(9)
while logs and #logs > 0 do
for i = 1, #logs, 3 do
-- There will be some stale error logs after the filter level changed.
-- We should avoid reporting them.
if logs[i] <= err_level then
table.insert(entries, logs[i + 2])
table.insert(entries, "\n")
end
end
logs = errlog.get_logs(9)
end
if #entries == 0 then
return
end
if log_buffer then
for _, v in ipairs(entries) do
log_buffer:push(v)
end
return
end
local config_bat = {
name = config.name,
retry_delay = config.retry_delay,
batch_max_size = config.batch_max_size,
max_retry_count = config.max_retry_count,
buffer_duration = config.buffer_duration,
inactive_timeout = config.inactive_timeout,
}
local err
log_buffer, err = batch_processor:new(send, config_bat)
if not log_buffer then
core.log.warn("error when creating the batch processor: ", err)
return
end
for _, v in ipairs(entries) do
log_buffer:push(v)
end
end
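
-- register/unregister the periodic drain on the shared plugin timer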
function _M.init()
timers.register_timer("plugin#error-log-logger", process)
end
function _M.destroy()
timers.unregister_timer("plugin#error-log-logger")
end
return _M