blob: f4f28b4721538191528854aab26263b43c641538 [file] [log] [blame]
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local errlog = require("ngx.errlog")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")
local timers = require("apisix.timers")
local http = require("resty.http")
local plugin_name = "error-log-logger"
local table = core.table
local schema_def = core.schema
local ngx = ngx
local tcp = ngx.socket.tcp
local tostring = tostring
local ipairs = ipairs
local string = require("string")
local lrucache = core.lrucache.new({
ttl = 300, count = 32
})
local metadata_schema = {
type = "object",
properties = {
tcp = {
type = "object",
properties = {
host = schema_def.host_def,
port = {type = "integer", minimum = 0},
tls = {type = "boolean", default = false},
tls_server_name = {type = "string"},
},
required = {"host", "port"}
},
skywalking = {
type = "object",
properties = {
endpoint_addr = {schema_def.uri, default = "http://127.0.0.1:12900/v3/logs"},
service_name = {type = "string", default = "APISIX"},
service_instance_name = {type="string", default = "APISIX Service Instance"},
},
},
clickhouse = {
type = "object",
properties = {
endpoint_addr = {schema_def.uri_def, default="http://127.0.0.1:8123"},
user = {type = "string", default = "default"},
password = {type = "string", default = ""},
database = {type = "string", default = ""},
logtable = {type = "string", default = ""},
},
required = {"endpoint_addr", "user", "password", "database", "logtable"}
},
host = {schema_def.host_def, description = "Deprecated, use `tcp.host` instead."},
port = {type = "integer", minimum = 0, description = "Deprecated, use `tcp.port` instead."},
tls = {type = "boolean", default = false,
description = "Deprecated, use `tcp.tls` instead."},
tls_server_name = {type = "string",
description = "Deprecated, use `tcp.tls_server_name` instead."},
name = {type = "string", default = plugin_name},
level = {type = "string", default = "WARN", enum = {"STDERR", "EMERG", "ALERT", "CRIT",
"ERR", "ERROR", "WARN", "NOTICE", "INFO", "DEBUG"}},
timeout = {type = "integer", minimum = 1, default = 3},
keepalive = {type = "integer", minimum = 1, default = 30},
batch_max_size = {type = "integer", minimum = 0, default = 1000},
max_retry_count = {type = "integer", minimum = 0, default = 0},
retry_delay = {type = "integer", minimum = 0, default = 1},
buffer_duration = {type = "integer", minimum = 1, default = 60},
inactive_timeout = {type = "integer", minimum = 1, default = 3},
},
oneOf = {
{required = {"skywalking"}},
{required = {"tcp"}},
{required = {"clickhouse"}},
-- for compatible with old schema
{required = {"host", "port"}}
}
}
local schema = {
type = "object",
}
local log_level = {
STDERR = ngx.STDERR,
EMERG = ngx.EMERG,
ALERT = ngx.ALERT,
CRIT = ngx.CRIT,
ERR = ngx.ERR,
ERROR = ngx.ERR,
WARN = ngx.WARN,
NOTICE = ngx.NOTICE,
INFO = ngx.INFO,
DEBUG = ngx.DEBUG
}
local config = {}
local log_buffer
local _M = {
version = 0.1,
priority = 1091,
name = plugin_name,
schema = schema,
metadata_schema = metadata_schema,
scope = "global",
}
function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
return core.schema.check(schema, conf)
end
local function send_to_tcp_server(data)
local sock, soc_err = tcp()
if not sock then
return false, "failed to init the socket " .. soc_err
end
sock:settimeout(config.timeout * 1000)
local tcp_config = config.tcp
local ok, err = sock:connect(tcp_config.host, tcp_config.port)
if not ok then
return false, "failed to connect the TCP server: host[" .. tcp_config.host
.. "] port[" .. tostring(tcp_config.port) .. "] err: " .. err
end
if tcp_config.tls then
ok, err = sock:sslhandshake(false, tcp_config.tls_server_name, false)
if not ok then
sock:close()
return false, "failed to perform TLS handshake to TCP server: host["
.. tcp_config.host .. "] port[" .. tostring(tcp_config.port) .. "] err: " .. err
end
end
local bytes, err = sock:send(data)
if not bytes then
sock:close()
return false, "failed to send data to TCP server: host[" .. tcp_config.host
.. "] port[" .. tostring(tcp_config.port) .. "] err: " .. err
end
sock:setkeepalive(config.keepalive * 1000)
return true
end
local function send_to_skywalking(log_message)
local err_msg
local res = true
core.log.info("sending a batch logs to ", config.skywalking.endpoint_addr)
local httpc = http.new()
httpc:set_timeout(config.timeout * 1000)
local entries = {}
for i = 1, #log_message, 2 do
local content = {
service = config.skywalking.service_name,
serviceInstance = config.skywalking.service_instance_name,
endpoint = "",
body = {
text = {
text = log_message[i]
}
}
}
table.insert(entries, content)
end
local httpc_res, httpc_err = httpc:request_uri(
config.skywalking.endpoint_addr,
{
method = "POST",
body = core.json.encode(entries),
keepalive_timeout = config.keepalive * 1000,
headers = {
["Content-Type"] = "application/json",
}
}
)
if not httpc_res then
return false, "error while sending data to skywalking["
.. config.skywalking.endpoint_addr .. "] " .. httpc_err
end
-- some error occurred in the server
if httpc_res.status >= 400 then
res = false
err_msg = string.format(
"server returned status code[%s] skywalking[%s] body[%s]",
httpc_res.status,
config.skywalking.endpoint_addr.endpoint_addr,
httpc_res:read_body()
)
end
return res, err_msg
end
local function send_to_clickhouse(log_message)
local err_msg
local res = true
core.log.info("sending a batch logs to ", config.clickhouse.endpoint_addr)
local httpc = http.new()
httpc:set_timeout(config.timeout * 1000)
local entries = {}
for i = 1, #log_message, 2 do
-- TODO Here save error log as a whole string to clickhouse 'data' column.
-- We will add more columns in the future.
table.insert(entries, core.json.encode({data=log_message[i]}))
end
local httpc_res, httpc_err = httpc:request_uri(
config.clickhouse.endpoint_addr,
{
method = "POST",
body = "INSERT INTO " .. config.clickhouse.logtable .." FORMAT JSONEachRow "
.. table.concat(entries, " "),
keepalive_timeout = config.keepalive * 1000,
headers = {
["Content-Type"] = "application/json",
["X-ClickHouse-User"] = config.clickhouse.user,
["X-ClickHouse-Key"] = config.clickhouse.password,
["X-ClickHouse-Database"] = config.clickhouse.database
}
}
)
if not httpc_res then
return false, "error while sending data to clickhouse["
.. config.clickhouse.endpoint_addr .. "] " .. httpc_err
end
-- some error occurred in the server
if httpc_res.status >= 400 then
res = false
err_msg = string.format(
"server returned status code[%s] clickhouse[%s] body[%s]",
httpc_res.status,
config.clickhouse.endpoint_addr.endpoint_addr,
httpc_res:read_body()
)
end
return res, err_msg
end
local function update_filter(value)
local level = log_level[value.level]
local status, err = errlog.set_filter_level(level)
if not status then
return nil, "failed to set filter level by ngx.errlog, the error is :" .. err
else
core.log.notice("set the filter_level to ", value.level)
end
return value
end
local function send(data)
if config.skywalking then
return send_to_skywalking(data)
elseif config.clickhouse then
return send_to_clickhouse(data)
end
return send_to_tcp_server(data)
end
local function process()
local metadata = plugin.plugin_metadata(plugin_name)
if not (metadata and metadata.value and metadata.modifiedIndex) then
core.log.info("please set the correct plugin_metadata for ", plugin_name)
return
else
local err
config, err = lrucache(plugin_name, metadata.modifiedIndex, update_filter, metadata.value)
if not config then
core.log.warn("set log filter failed for ", err)
return
end
if not (config.tcp or config.skywalking or config.clickhouse) then
config.tcp = {
host = config.host,
port = config.port,
tls = config.tls,
tls_server_name = config.tls_server_name
}
core.log.warn(
string.format("The schema is out of date. Please update to the new configuration, "
.. "for example: {\"tcp\": {\"host\": \"%s\", \"port\": \"%s\"}}",
config.host, config.port
))
end
end
local err_level = log_level[metadata.value.level]
local entries = {}
local logs = errlog.get_logs(9)
while ( logs and #logs>0 ) do
for i = 1, #logs, 3 do
-- There will be some stale error logs after the filter level changed.
-- We should avoid reporting them.
if logs[i] <= err_level then
table.insert(entries, logs[i + 2])
table.insert(entries, "\n")
end
end
logs = errlog.get_logs(9)
end
if #entries == 0 then
return
end
if log_buffer then
for _, v in ipairs(entries) do
log_buffer:push(v)
end
return
end
local config_bat = {
name = config.name,
retry_delay = config.retry_delay,
batch_max_size = config.batch_max_size,
max_retry_count = config.max_retry_count,
buffer_duration = config.buffer_duration,
inactive_timeout = config.inactive_timeout,
}
local err
log_buffer, err = batch_processor:new(send, config_bat)
if not log_buffer then
core.log.warn("error when creating the batch processor: ", err)
return
end
for _, v in ipairs(entries) do
log_buffer:push(v)
end
end
function _M.init()
timers.register_timer("plugin#error-log-logger", process)
end
function _M.destroy()
timers.unregister_timer("plugin#error-log-logger")
end
return _M