--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements.  See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License.  You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

--- Extensible framework to support publish-and-subscribe scenarios
--
-- @module core.pubsub

local log          = require("apisix.core.log")
local ws_server    = require("resty.websocket.server")
local protoc       = require("protoc")
local pb           = require("pb")
local setmetatable = setmetatable
local pcall        = pcall
local pairs        = pairs


local _M = { version = 0.1 }
local mt = { __index = _M }

local pb_state
local function init_pb_state()
    -- clear current pb state
    local old_pb_state = pb.state(nil)

    -- set int64 rule for pubsub module
    pb.option("int64_as_string")

    -- initialize protoc compiler
    protoc.reload()
    local pubsub_protoc = protoc.new()
    pubsub_protoc:addpath("apisix/include/apisix/model")
    local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
    if not ok then
        pubsub_protoc:reset()
        pb.state(old_pb_state)
        return "failed to load pubsub protocol: " .. err
    end

    pb_state = pb.state(old_pb_state)
end


-- parse command name and parameters from client message
local function get_cmd(data)
    for key, value in pairs(data) do
        -- There are sequence and command properties in the data,
        -- select the handler according to the command value.
        if key ~= "sequence" then
            return key, value
        end
    end
end


-- send generic response to client
local function send_resp(ws, sequence, data)
    data.sequence = sequence
    local ok, encoded = pcall(pb.encode, "PubSubResp", data)
    if not ok or not encoded then
        log.error("failed to encode response message, err: ", encoded)
        return
    end

    local _, err = ws:send_binary(encoded)
    if err then
        log.error("failed to send response to client, err: ", err)
    end
end


-- send error response to client
local function send_error(ws, sequence, err_msg)
    return send_resp(ws, sequence, {
        error_resp = {
            code = 0,
            message = err_msg,
        },
    })
end


---
-- Create pubsub module instance
--
-- @function core.pubsub.new
-- @treturn pubsub module instance
-- @treturn string|nil error message if present
-- @usage
-- local pubsub, err = core.pubsub.new()
function _M.new()
    if not pb_state then
        local err = init_pb_state()
        if err then
            return nil, err
        end
    end

    local ws, err = ws_server:new()
    if not ws then
        return nil, err
    end

    local obj = setmetatable({
        ws_server = ws,
        cmd_handler = {},
    }, mt)

    -- add default ping handler
    obj:on("cmd_ping", function (params)
        return { pong_resp = params }
    end)

    return obj
end


---
-- Add command callbacks to pubsub module instances
--
-- The callback function prototype: function (params)
-- The params in the parameters contain the data defined in the requested command.
-- Its first return value is the data, which needs to contain the data needed for
-- the particular resp, returns nil if an error exists.
-- Its second return value is a string type error message, no need to return when
-- no error exists.
--
-- @function core.pubsub.on
-- @tparam string command The command to add callback.
-- @tparam func handler The callback function on receipt of command.
-- @usage
-- pubsub:on(command, function (params)
--     return data, err
-- end)
function _M.on(self, command, handler)
    self.cmd_handler[command] = handler
end


---
-- Put the pubsub instance into an event loop, waiting to process client commands
--
-- @function core.pubsub.wait
-- @usage
-- local err = pubsub:wait()
function _M.wait(self)
    local fatal_err
    local ws = self.ws_server
    while true do
        -- read raw data frames from websocket connection
        local raw_data, raw_type, err = ws:recv_frame()
        if err then
            -- terminate the event loop when a fatal error occurs
            if ws.fatal then
                fatal_err = err
                break
            end

            -- skip this loop for non-fatal errors
            log.error("failed to receive websocket frame: ", err)
            goto continue
        end

        -- handle client close connection
        if raw_type == "close" then
            break
        end

        -- the pubsub messages use binary, if the message is not
        -- binary, skip this message
        if raw_type ~= "binary" then
            log.warn("pubsub server receive non-binary data, type: ",
                raw_type, ", data: ", raw_data)
            goto continue
        end

        -- recovery of stored pb_store
        pb.state(pb_state)

        local data, err = pb.decode("PubSubReq", raw_data)
        if not data then
            log.error("pubsub server receives undecodable data, err: ", err)
            send_error(ws, 0, "wrong command")
            goto continue
        end

        -- command sequence code
        local sequence = data.sequence

        local cmd, params = get_cmd(data)
        if not cmd and not params then
            log.warn("pubsub server receives empty command")
            goto continue
        end

        -- find the handler for the current command
        local handler = self.cmd_handler[cmd]
        if not handler then
            log.error("pubsub callback handler not registered for the",
                " command, command: ", cmd)
            send_error(ws, sequence, "unknown command")
            goto continue
        end

        -- call command handler to generate response data
        local resp, err = handler(params)
        if not resp then
            send_error(ws, sequence, err)
            goto continue
        end
        send_resp(ws, sequence, resp)

        ::continue::
    end

    if fatal_err then
        log.error("fatal error in pubsub websocket server, err: ", fatal_err)
    end
    ws:send_close()
end


return _M
