support Websocket protocol for continuous response
This isn't full Websocket protocol as mochiweb doesn't have
the plumbing but it's still useful.
The endpoint will not return a pong if the client sends a ping,
but if the heartbeat parameter is used the server will send pongs
at that rate as a keep-alive.
If the server encounters an error it sends a Close message with
the reason before closing the socket.
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 791ee86..1681ee2 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -163,18 +163,28 @@
throw({bad_request, Msg})
end.
-% websocket goop
+% websocket callbacks
changes_callback(start, #cacc{feed = continuous, websocket = true} = Acc) ->
{ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(
- Acc#cacc.mochi, fun client_ws_loop/3),
- {ok, Acc#cacc{mochi = undefined, websocket = {ws, ReentryWs, ReplyChannel}}};
+ Acc#cacc.mochi#httpd.mochi_req, fun client_ws_loop/3),
+ {ok, Acc#cacc{websocket = {ws, ReentryWs, ReplyChannel}}};
changes_callback({change, Change}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
chttpd_stats:incr_rows(),
ReplyChannel(?JSON_ENCODE(Change)),
{ok, Acc};
-changes_callback(timeout, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
- ReplyChannel("\n"),
+changes_callback(timeout, #cacc{websocket = {ws, _ReentryWs, _ReplyChannel}} = Acc) ->
+ ws_pong(Acc),
{ok, Acc};
+changes_callback(waiting_for_updates, #cacc{websocket = {ws, _ReentryWs, _ReplyChannel}} = Acc) ->
+ {ok, Acc};
+changes_callback({error, Reason}, #cacc{websocket = {ws, _ReentryWs, _ReplyChannel}} = Acc) ->
+ {_Code, ErrorStr, ReasonStr} = chttpd:error_info(Reason),
+ Row =
+ {[
+ {<<"error">>, ErrorStr},
+ {<<"reason">>, ReasonStr}
+ ]},
+ ws_close(Acc, 1011, ?JSON_ENCODE(Row));
changes_callback({stop, EndSeq, Pending}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
Row =
{[
@@ -182,7 +192,7 @@
{<<"pending">>, Pending}
]},
ReplyChannel(?JSON_ENCODE(Row)),
- {stop, Acc};
+ ws_close(Acc, 1000);
% callbacks for continuous feed (newline-delimited JSON Objects)
changes_callback(start, #cacc{feed = continuous} = Acc) ->
@@ -2584,8 +2594,50 @@
Upgrade = chttpd:header_value(Req, "Upgrade"),
Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
-client_ws_loop(_Payload, Broadcaster, _ReplyChannel) ->
- Broadcaster.
+%% Frame handler handed to mochiweb_websocket:upgrade_connection/2.
+%% Incoming client frames are ignored; the loop state is passed through
+%% unchanged.
+client_ws_loop(_Frames, LoopState, _Reply) ->
+    LoopState.
+
+%% Writes an empty Pong frame straight to the socket behind Req.
+%% Returns whatever mochiweb_socket:send/2 returns.
+ws_pong(Req) ->
+    %% 16#8A = FIN bit set, reserved bits clear, opcode 10 (pong);
+    %% the second byte (0) is the payload length.
+    mochiweb_socket:send(ws_socket(Req), <<16#8A, 0>>).
+
+%% Closes the websocket with status Code and an empty reason payload.
+%% Delegates to ws_close/3.
+ws_close(Req, Code) ->
+    NoReason = <<>>,
+    ws_close(Req, Code, NoReason).
+
+%% Sends a Close frame carrying the 16-bit status Code followed by Payload,
+%% shuts down the write side, discards whatever the peer still sends until
+%% it closes, closes the socket and finally terminates the calling process
+%% with exit reason {shutdown, websocket_close}. This function never returns.
+ws_close(Req, Code, Payload) when is_integer(Code), is_binary(Payload) ->
+    Sock = ws_socket(Req),
+    %% The frame payload is the 2-byte status code plus the reason bytes.
+    LenField = ws_payload_length(iolist_size(Payload) + 2),
+    %% 16#88 = FIN bit set, reserved bits clear, opcode 8 (close).
+    Header = <<16#88, LenField/binary>>,
+    mochiweb_socket:send(Sock, [Header, <<Code:16>>, Payload]),
+    ws_shutdown(Sock, write),
+    ws_drain(Sock),
+    mochiweb_socket:close(Sock),
+    exit({shutdown, websocket_close}).
+
+%% Reads and discards data from Socket until the peer closes the connection
+%% or the read fails, then returns ok.
+%%
+%% Reads go through mochiweb_socket:recv/3 so that both plain sockets and
+%% mochiweb's {ssl, SslSocket} wrappers are handled. A length of 0 asks for
+%% whatever is currently available instead of waiting for a full 8192-byte
+%% chunk. Any error ends the drain: the caller closes the socket right
+%% after, so there is nothing further to do with it.
+ws_drain(Socket) ->
+    case mochiweb_socket:recv(Socket, 0, infinity) of
+        {ok, _Data} ->
+            %% Recurse on the socket variable; the previous code passed the
+            %% atom 'socket' here.
+            ws_drain(Socket);
+        {error, _Reason} ->
+            ok
+    end.
+
+%% Returns the underlying socket for either a #cacc{} changes accumulator
+%% (via the mochiweb request stored in its #httpd{} record) or a mochiweb
+%% request tuple.
+ws_socket(#cacc{} = Acc) ->
+    Httpd = Acc#cacc.mochi,
+    ws_socket(Httpd#httpd.mochi_req);
+ws_socket({Mod, _} = MochiReq) ->
+    Mod:get(socket, MochiReq).
+
+%% Shuts down one or both directions of a mochiweb socket.
+%% How is passed through unchanged (ws_close/3 uses 'write').
+%% TLS connections arrive wrapped as {ssl, SslSocket} and must be shut down
+%% through the ssl application; gen_tcp:shutdown/2 only accepts plain TCP
+%% sockets.
+ws_shutdown({ssl, SslSocket}, How) ->
+    ssl:shutdown(SslSocket, How);
+ws_shutdown(Socket, How) ->
+    gen_tcp:shutdown(Socket, How).
+
+%% Encodes a frame payload length as the websocket length field:
+%%   up to 125       -> one byte holding the length
+%%   up to 65535     -> marker byte 126 followed by a 16-bit length
+%%   up to 2^63 - 1  -> marker byte 127 followed by a 64-bit length
+ws_payload_length(Len) ->
+    case Len of
+        _ when Len =< 125 -> <<Len:8>>;
+        _ when Len =< 16#FFFF -> <<126:8, Len:16>>;
+        _ when Len =< 16#7FFFFFFFFFFFFFFF -> <<127:8, Len:64>>
+    end.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").