WIP websocket support for changes feed
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index e2de301..791ee86 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -66,7 +66,8 @@
     chunks_sent = 0,
     buffer = [],
     bufsize = 0,
-    threshold
+    threshold,
+    websocket
 }).
 
 -define(IS_ALL_DOCS(T),
@@ -149,7 +150,8 @@
             Acc0 = #cacc{
                 feed = list_to_atom(Feed),
                 mochi = Req,
-                threshold = Max
+                threshold = Max,
+                websocket = websocket_upgrade_requested(Req)
             },
             try
                 fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
@@ -161,6 +163,27 @@
             throw({bad_request, Msg})
     end.
 
+% websocket goop
+changes_callback(start, #cacc{feed = continuous, websocket = true} = Acc) ->
+    {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(
+        Acc#cacc.mochi, fun client_ws_loop/3),
+    {ok, Acc#cacc{mochi = undefined, websocket = {ws, ReentryWs, ReplyChannel}}};
+changes_callback({change, Change}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
+    chttpd_stats:incr_rows(),
+    ReplyChannel(?JSON_ENCODE(Change)),
+    {ok, Acc};
+changes_callback(timeout, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
+    ReplyChannel("\n"),
+    {ok, Acc};
+changes_callback({stop, EndSeq, Pending}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
+    Row =
+        {[
+            {<<"last_seq">>, EndSeq},
+            {<<"pending">>, Pending}
+        ]},
+    ReplyChannel(?JSON_ENCODE(Row)),
+    {stop, Acc};
+
 % callbacks for continuous feed (newline-delimited JSON Objects)
 changes_callback(start, #cacc{feed = continuous} = Acc) ->
     {ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200),
@@ -2557,6 +2580,13 @@
 bulk_get_rev_error([{Pos, RevId} = Rev]) when is_integer(Pos), is_binary(RevId) ->
     couch_doc:rev_to_str(Rev).
 
+websocket_upgrade_requested(#httpd{} = Req) ->
+    Upgrade = chttpd:header_value(Req, "Upgrade"),
+    Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".
+
+client_ws_loop(_Payload, Broadcaster, _ReplyChannel) ->
+    Broadcaster.
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").