--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local plugin = require("apisix.plugin")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local log_util = require("apisix.utils.log-util")
local path = require("pl.path")
local http = require("resty.http")
local ngx = ngx
local tostring = tostring
local pairs = pairs
local tab_concat = table.concat
local udp = ngx.socket.udp
local plugin_name = "loggly"
local batch_processor_manager = bp_manager_mod.new(plugin_name)
local severity = {
    EMERG = 0, -- system is unusable
    ALERT = 1, -- action must be taken immediately
    CRIT = 2, -- critical conditions
    ERR = 3, -- error conditions
    WARNING = 4, -- warning conditions
    NOTICE = 5, -- normal but significant condition
    INFO = 6, -- informational
    DEBUG = 7, -- debug-level messages
}

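-- the schema accepts severity names in either upper or lower case, so collect
-- both spellings of every level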
local severity_enums = {}
do
    for k, _ in pairs(severity) do
        severity_enums[#severity_enums + 1] = k
        severity_enums[#severity_enums + 1] = k:lower()
    end
end

local schema = {
    type = "object",
    properties = {
        customer_token = {type = "string"},
        severity = {
            type = "string",
            default = "INFO",
            enum = severity_enums,
            description = "base severity log level",
        },
        include_req_body = {type = "boolean", default = false},
        include_resp_body = {type = "boolean", default = false},
        include_resp_body_expr = {
            type = "array",
            minItems = 1,
            items = {
                type = "array"
            }
        },
        tags = {
            type = "array",
            minItems = 1,
            items = {
                type = "string",
                -- reject values that carry a `tag=` prefix
                pattern = "^(?!tag=)[ -~]*",
            },
            default = {"apisix"}
        },
        ssl_verify = {
            -- applicable for the https protocol
            type = "boolean",
            default = true
        },
        severity_map = {
            type = "object",
            description = "upstream response code vs syslog severity mapping",
            patternProperties = {
                ["^[1-5][0-9]{2}$"] = {
                    description = "keys are HTTP status codes, values are severity levels",
                    type = "string",
                    enum = severity_enums
                },
            },
            additionalProperties = false
        }
    },
    required = {"customer_token"}
}

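-- An illustrative route-level configuration accepted by the schema above
-- (all values below are placeholders, not real credentials):
--
--   {
--       "customer_token": "<your-loggly-customer-token>",
--       "severity": "info",
--       "tags": ["apisix", "edge"],
--       "severity_map": {"503": "err", "410": "crit"}
--   }
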
local defaults = {
    host = "logs-01.loggly.com",
    port = 514,
    protocol = "syslog",
    timeout = 5000
}

local metadata_schema = {
    type = "object",
    properties = {
        host = {
            type = "string",
            default = defaults.host
        },
        port = {
            type = "integer",
            default = defaults.port
        },
        protocol = {
            type = "string",
            default = defaults.protocol,
            -- for http and https, logs are sent to Loggly's bulk endpoint
            enum = {"syslog", "http", "https"}
        },
        timeout = {
            type = "integer",
            minimum = 1,
            default = defaults.timeout
        },
        log_format = {
            type = "object",
        }
    }
}

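-- Illustrative plugin metadata for shipping logs over HTTPS instead of
-- syslog/UDP (host and timeout here match the defaults above):
--
--   {
--       "host": "logs-01.loggly.com",
--       "protocol": "https",
--       "timeout": 5000
--   }
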
local _M = {
    version = 0.1,
    priority = 411,
    name = plugin_name,
    schema = batch_processor_manager:wrap_schema(schema),
    metadata_schema = metadata_schema
}

function _M.check_schema(conf, schema_type)
    if schema_type == core.schema.TYPE_METADATA then
        return core.schema.check(metadata_schema, conf)
    end

    local ok, err = core.schema.check(schema, conf)
    if not ok then
        return nil, err
    end
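
    -- resolve severity_map values to numeric syslog severities once at schema
    -- check time, keyed by status code, so the log phase only needs a plain
    -- table lookup per request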
    if conf.severity_map then
        local cache = {}
        for k, v in pairs(conf.severity_map) do
            cache[k] = severity[v:upper()]
        end
        conf._severity_cache = cache
    end

    return log_util.check_log_schema(conf)
end

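-- buffer the response body during the body_filter phase when
-- include_resp_body (optionally gated by include_resp_body_expr) is enabled,
-- so it can be attached to the log entry later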
function _M.body_filter(conf, ctx)
    log_util.collect_body(conf, ctx)
end

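-- Build the payload for one request: a JSON document for the http/https
-- protocols, or an RFC 5424 syslog line for the default syslog protocol.
-- A syslog line looks roughly like (token and values illustrative):
-- <14>1 2023-01-01T12:00:00.000Z example.com apisix 1234 - [TOKEN@41058 tag="apisix"] {...}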
local function generate_log_message(conf, ctx)
    local metadata = plugin.plugin_metadata(plugin_name)
    local entry
    if metadata and metadata.value.log_format
       and core.table.nkeys(metadata.value.log_format) > 0
    then
        entry = log_util.get_custom_format_log(ctx, metadata.value.log_format)
    else
        entry = log_util.get_full_log(ngx, conf)
    end

    local json_str, err = core.json.encode(entry)
    if not json_str then
        core.log.error('error occurred while encoding the data: ', err)
        return nil
    end

    -- when no plugin metadata is configured, the defaults (protocol "syslog")
    -- apply, so fall through and build a syslog event
    if metadata and metadata.value.protocol ~= "syslog" then
        return json_str
    end

    -- generate an rfc5424 compliant syslog event
    local timestamp = log_util.get_rfc3339_zulu_timestamp()
    local taglist = {}
    if conf.tags then
        for i = 1, #conf.tags do
            core.table.insert(taglist, "tag=\"" .. conf.tags[i] .. "\"")
        end
    end

    local message_severity = severity[conf.severity:upper()]
    if conf._severity_cache and conf._severity_cache[tostring(ngx.status)] then
        message_severity = conf._severity_cache[tostring(ngx.status)]
    end
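
    -- RFC 5424 header: PRIVAL = facility * 8 + severity, and with facility
    -- LOG_USER (1) that is 8 + severity. The structured-data element carries
    -- the customer token under Loggly's IANA enterprise number (41058) plus
    -- the configured tags.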
    local message = {
        -- facility LOG_USER - random user level message
        "<" .. tostring(8 + message_severity) .. ">1", -- <PRIVAL>1
        timestamp, -- timestamp
        ctx.var.host or "-", -- hostname
        "apisix", -- appname
        ctx.var.pid, -- proc-id
        "-", -- msgid
        "[" .. conf.customer_token .. "@41058 " .. tab_concat(taglist, " ") .. "]",
        json_str
    }

    return tab_concat(message, " ")
end

local function send_data_over_udp(message, metadata)
    local err_msg
    local res = true
    local sock = udp()
    local host, port = metadata.value.host, metadata.value.port
    sock:settimeout(metadata.value.timeout)

    local ok, err = sock:setpeername(host, port)
    if not ok then
        core.log.error("failed to send log: ", err)
        return false, "failed to connect to UDP server: host[" .. host
                      .. "] port[" .. tostring(port) .. "] err: " .. err
    end

    ok, err = sock:send(message)
    if not ok then
        res = false
        core.log.error("failed to send log: ", err)
        err_msg = "failed to send data to UDP server: host[" .. host
                  .. "] port[" .. tostring(port) .. "] err:" .. err
    end

    ok, err = sock:close()
    if not ok then
        core.log.error("failed to close the UDP connection, host[",
                       host, "] port[", port, "] ", err)
    end

    return res, err_msg
end
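
-- Loggly's HTTP(S) bulk endpoint has the form
-- http(s)://<host>/bulk/<customer_token>/tag/bulk and expects one JSON event
-- per line in the request body; the configured tags are also passed along in
-- the X-LOGGLY-TAG header.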
local function send_bulk_over_http(message, metadata, conf)
    local endpoint = path.join(metadata.value.host, "bulk", conf.customer_token, "tag", "bulk")
    local has_prefix = core.string.has_prefix(metadata.value.host, "http")
    if not has_prefix then
        if metadata.value.protocol == "http" then
            endpoint = "http://" .. endpoint
        else
            endpoint = "https://" .. endpoint
        end
    end

    local httpc = http.new()
    httpc:set_timeout(metadata.value.timeout)
    local res, err = httpc:request_uri(endpoint, {
        ssl_verify = conf.ssl_verify,
        method = "POST",
        body = message,
        headers = {
            ["Content-Type"] = "application/json",
            ["X-LOGGLY-TAG"] = conf.tags
        },
    })

    if not res then
        return false, "failed to write log to loggly, " .. err
    end
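
    -- include the upstream response body in the error only when Loggly
    -- returned something decodable as JSON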
    if res.status ~= 200 then
        local body = core.json.decode(res.body)
        if not body then
            return false, "failed to send log to loggly, http status code: " .. res.status
        else
            return false, "failed to send log to loggly, http status code: " .. res.status
                          .. " response body: " .. res.body
        end
    end

    return true
end

local handle_http_payload

local function handle_log(entries)
    local metadata = plugin.plugin_metadata(plugin_name)
    core.log.info("metadata: ", core.json.delay_encode(metadata))
    if not metadata then
        core.log.info("received nil metadata: using metadata defaults: ",
                      core.json.delay_encode(defaults, true))
        metadata = {}
        metadata.value = defaults
    end
    core.log.info("sending a batch of logs to ", metadata.value.host)

    if metadata.value.protocol == "syslog" then
        for i = 1, #entries do
            local ok, err = send_data_over_udp(entries[i], metadata)
            if not ok then
                return false, err, i
            end
        end
    else
        return handle_http_payload(entries, metadata)
    end

    return true
end

function _M.log(conf, ctx)
    local log_data = generate_log_message(conf, ctx)
    if not log_data then
        return
    end
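
    -- bind the HTTP bulk sender as a closure over this route's conf, since the
    -- batch processor callback (handle_log) only receives the log entries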
    handle_http_payload = function (entries, metadata)
        -- the loggly bulk endpoint expects entries concatenated with newlines ("\n")
        local message = tab_concat(entries, "\n")
        return send_bulk_over_http(message, metadata, conf)
    end

    if batch_processor_manager:add_entry(conf, log_data) then
        return
    end

    batch_processor_manager:add_entry_to_new_processor(conf, log_data, ctx, handle_log)
end

return _M