support Websocket protocol for continuous response

This isn't the full WebSocket protocol, as mochiweb doesn't have
the plumbing, but it's still useful.

The endpoint will not return a pong if the client sends a ping,
but if the heartbeat parameter is used the server will send pongs
at that rate as a keep-alive.

If the server encounters an error it sends a Close message with
the reason before closing the socket.
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 791ee86..1681ee2 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -163,18 +163,28 @@
             throw({bad_request, Msg})
     end.
 
-% websocket goop
+% websocket callbacks
 changes_callback(start, #cacc{feed = continuous, websocket = true} = Acc) ->
     {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(
-        Acc#cacc.mochi, fun client_ws_loop/3),
-    {ok, Acc#cacc{mochi = undefined, websocket = {ws, ReentryWs, ReplyChannel}}};
+        Acc#cacc.mochi#httpd.mochi_req, fun client_ws_loop/3),
+    {ok, Acc#cacc{websocket = {ws, ReentryWs, ReplyChannel}}};
 changes_callback({change, Change}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
     chttpd_stats:incr_rows(),
     ReplyChannel(?JSON_ENCODE(Change)),
     {ok, Acc};
-changes_callback(timeout, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
-    ReplyChannel("\n"),
+changes_callback(timeout, #cacc{websocket = {ws, _ReentryWs, _ReplyChannel}} = Acc) ->
+    ws_pong(Acc),
     {ok, Acc};
+changes_callback(waiting_for_updates, #cacc{websocket = {ws, _ReentryWs, _ReplyChannel}} = Acc) ->
+    {ok, Acc};
+changes_callback({error, Reason}, #cacc{websocket = {ws, _ReentryWs, _ReplyChannel}} = Acc) ->
+    {_Code, ErrorStr, ReasonStr} = chttpd:error_info(Reason),
+    Row =
+        {[
+            {<<"error">>, ErrorStr},
+            {<<"reason">>, ReasonStr}
+         ]},
+    ws_close(Acc, 1011, ?JSON_ENCODE(Row));
 changes_callback({stop, EndSeq, Pending}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
     Row =
         {[
@@ -182,7 +192,7 @@
             {<<"pending">>, Pending}
         ]},
     ReplyChannel(?JSON_ENCODE(Row)),
-    {stop, Acc};
+    ws_close(Acc, 1000);
 
 % callbacks for continuous feed (newline-delimited JSON Objects)
 changes_callback(start, #cacc{feed = continuous} = Acc) ->
@@ -2584,8 +2594,50 @@
     Upgrade = chttpd:header_value(Req, "Upgrade"),
     Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
 
-client_ws_loop(_Payload, Broadcaster, _ReplyChannel) ->
-    Broadcaster.
+client_ws_loop(_Frame, LoopState, _Reply) -> % client payloads are ignored; the state passes through untouched
+    LoopState.
+
+ws_pong(Req) ->
+    PongFrame = <<16#8A, 16#00>>, % FIN=1, RSV=0, opcode 10 (pong); MASK=0 with a zero-length payload
+    mochiweb_socket:send(ws_socket(Req), PongFrame).
+
+ws_close(Req, Code) -> % close with a status code and an empty reason
+    ws_close(Req, Code, <<>>).
+
+ws_close(Req, Code, Payload) when is_integer(Code), is_binary(Payload) ->
+    Sock = ws_socket(Req),
+    Header = [<<1:1, 0:3, 8:4>>, ws_payload_length(byte_size(Payload) + 2)], % FIN + opcode 8; +2 covers the status code
+    mochiweb_socket:send(Sock, [Header, <<Code:16>>, Payload]),
+    ws_shutdown(Sock, write), % half-close our side, then read until the peer is done
+    ws_drain(Sock),
+    mochiweb_socket:close(Sock),
+    exit({shutdown, websocket_close}). % terminates the calling process; this function never returns
+
+ws_drain(Socket) ->
+    % Discard input until the peer closes, the socket errors, or nothing arrives for 5 seconds.
+    case mochiweb_socket:recv(Socket, 0, 5000) of
+        {ok, _Data} ->
+            ws_drain(Socket);
+        {error, _Reason} ->
+            ok
+    end.
+
+ws_socket(#cacc{} = ChangesAcc) -> % unwrap the request stored in the changes accumulator
+    ws_socket(ChangesAcc#cacc.mochi#httpd.mochi_req);
+ws_socket({Mod, _} = MochiReq) -> % request tuple whose first element is its callback module
+    Mod:get(socket, MochiReq).
+
+ws_shutdown({ssl, Socket}, How) -> % TLS sockets must be shut down through the ssl module
+    ssl:shutdown(Socket, How);
+ws_shutdown(Socket, How) -> % plain TCP socket
+    gen_tcp:shutdown(Socket, How).
+
+ws_payload_length(N) when is_integer(N), N >= 0, N =< 125 ->
+    <<N>>; % fits the 7-bit length field; the MASK bit stays clear
+ws_payload_length(N) when is_integer(N), N > 125, N =< 16#FFFF ->
+    <<126, N:16>>; % marker 126 followed by a 16-bit length
+ws_payload_length(N) when is_integer(N), N > 16#FFFF, N =< 16#7FFFFFFFFFFFFFFF ->
+    <<127, N:64>>. % marker 127 followed by a 64-bit length whose top bit is 0
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").