blob: d6bcafad11f48bd3ffa8ee7ccccc1e4e9f246e9d [file] [log] [blame]
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
--- Extensible framework to support publish-and-subscribe scenarios
--
-- @module core.pubsub
local log = require("apisix.core.log")
local ws_server = require("resty.websocket.server")
local protoc = require("protoc")
local pb = require("pb")
local setmetatable = setmetatable
local pcall = pcall
local pairs = pairs
local _M = { version = 0.1 }
local mt = { __index = _M }
local pb_state
local function init_pb_state()
-- clear current pb state
local old_pb_state = pb.state(nil)
-- set int64 rule for pubsub module
pb.option("int64_as_string")
-- initialize protoc compiler
protoc.reload()
local pubsub_protoc = protoc.new()
pubsub_protoc:addpath("apisix/include/apisix/model")
local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
if not ok then
pubsub_protoc:reset()
pb.state(old_pb_state)
return "failed to load pubsub protocol: " .. err
end
pb_state = pb.state(old_pb_state)
end
-- parse command name and parameters from client message
local function get_cmd(data)
for key, value in pairs(data) do
-- There are sequence and command properties in the data,
-- select the handler according to the command value.
if key ~= "sequence" then
return key, value
end
end
end
-- send generic response to client
local function send_resp(ws, sequence, data)
data.sequence = sequence
local ok, encoded = pcall(pb.encode, "PubSubResp", data)
if not ok or not encoded then
log.error("failed to encode response message, err: ", encoded)
return
end
local _, err = ws:send_binary(encoded)
if err then
log.error("failed to send response to client, err: ", err)
end
end
-- send error response to client
local function send_error(ws, sequence, err_msg)
return send_resp(ws, sequence, {
error_resp = {
code = 0,
message = err_msg,
},
})
end
---
-- Create pubsub module instance
--
-- @function core.pubsub.new
-- @treturn pubsub module instance
-- @treturn string|nil error message if present
-- @usage
-- local pubsub, err = core.pubsub.new()
function _M.new()
if not pb_state then
local err = init_pb_state()
if err then
return nil, err
end
end
local ws, err = ws_server:new()
if not ws then
return nil, err
end
local obj = setmetatable({
ws_server = ws,
cmd_handler = {},
}, mt)
-- add default ping handler
obj:on("cmd_ping", function (params)
return { pong_resp = params }
end)
return obj
end
---
-- Add command callbacks to pubsub module instances
--
-- The callback function prototype: function (params)
-- The params in the parameters contain the data defined in the requested command.
-- Its first return value is the data, which needs to contain the data needed for
-- the particular resp, returns nil if an error exists.
-- Its second return value is a string type error message, no need to return when
-- no error exists.
--
-- @function core.pubsub.on
-- @tparam string command The command to add callback.
-- @tparam func handler The callback function on receipt of command.
-- @usage
-- pubsub:on(command, function (params)
-- return data, err
-- end)
function _M.on(self, command, handler)
self.cmd_handler[command] = handler
end
---
-- Put the pubsub instance into an event loop, waiting to process client commands
--
-- @function core.pubsub.wait
-- @usage
-- local err = pubsub:wait()
function _M.wait(self)
local fatal_err
local ws = self.ws_server
while true do
-- read raw data frames from websocket connection
local raw_data, raw_type, err = ws:recv_frame()
if err then
-- terminate the event loop when a fatal error occurs
if ws.fatal then
fatal_err = err
break
end
-- skip this loop for non-fatal errors
log.error("failed to receive websocket frame: ", err)
goto continue
end
-- handle client close connection
if raw_type == "close" then
break
end
-- the pubsub messages use binary, if the message is not
-- binary, skip this message
if raw_type ~= "binary" then
log.warn("pubsub server receive non-binary data, type: ",
raw_type, ", data: ", raw_data)
goto continue
end
-- recovery of stored pb_store
pb.state(pb_state)
local data, err = pb.decode("PubSubReq", raw_data)
if not data then
log.error("pubsub server receives undecodable data, err: ", err)
send_error(ws, 0, "wrong command")
goto continue
end
-- command sequence code
local sequence = data.sequence
local cmd, params = get_cmd(data)
if not cmd and not params then
log.warn("pubsub server receives empty command")
goto continue
end
-- find the handler for the current command
local handler = self.cmd_handler[cmd]
if not handler then
log.error("pubsub callback handler not registered for the",
" command, command: ", cmd)
send_error(ws, sequence, "unknown command")
goto continue
end
-- call command handler to generate response data
local resp, err = handler(params)
if not resp then
send_error(ws, sequence, err)
goto continue
end
send_resp(ws, sequence, resp)
::continue::
end
if fatal_err then
log.error("fatal error in pubsub websocket server, err: ", fatal_err)
end
ws:send_close()
end
return _M