WIP websocket support for changes feed
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index e2de301..791ee86 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -66,7 +66,8 @@
chunks_sent = 0,
buffer = [],
bufsize = 0,
- threshold
+ threshold,
+ websocket
}).
-define(IS_ALL_DOCS(T),
@@ -149,7 +150,8 @@
Acc0 = #cacc{
feed = list_to_atom(Feed),
mochi = Req,
- threshold = Max
+ threshold = Max,
+ websocket = websocket_upgrade_requested(Req)
},
try
fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
@@ -161,6 +163,27 @@
throw({bad_request, Msg})
end.
+% websocket goop
+changes_callback(start, #cacc{feed = continuous, websocket = true} = Acc) ->
+ {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(
+ Acc#cacc.mochi, fun client_ws_loop/3),
+ {ok, Acc#cacc{mochi = undefined, websocket = {ws, ReentryWs, ReplyChannel}}};
+changes_callback({change, Change}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
+ chttpd_stats:incr_rows(),
+ ReplyChannel(?JSON_ENCODE(Change)),
+ {ok, Acc};
+changes_callback(timeout, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
+ ReplyChannel("\n"),
+ {ok, Acc};
+changes_callback({stop, EndSeq, Pending}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
+ Row =
+ {[
+ {<<"last_seq">>, EndSeq},
+ {<<"pending">>, Pending}
+ ]},
+ ReplyChannel(?JSON_ENCODE(Row)),
+ {stop, Acc};
+
% callbacks for continuous feed (newline-delimited JSON Objects)
changes_callback(start, #cacc{feed = continuous} = Acc) ->
{ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200),
@@ -2557,6 +2580,13 @@
bulk_get_rev_error([{Pos, RevId} = Rev]) when is_integer(Pos), is_binary(RevId) ->
couch_doc:rev_to_str(Rev).
+websocket_upgrade_requested(#httpd{} = Req) ->
+ Upgrade = chttpd:header_value(Req, "Upgrade"),
+ Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
+
+client_ws_loop(_Payload, Broadcaster, _ReplyChannel) ->
+ Broadcaster.
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").