blob: 755c018c549631149ec8fc3099be256e39c000f5 [file] [log] [blame]
-module(couch_dbupdates_httpd).
-export([handle_req/1]).
-include_lib("couch/include/couch_db.hrl").
-record(st, {
resp,
feed,
heartbeat,
timeout
}).
handle_req(#httpd{method='GET'}=Req) ->
ok = couch_httpd:verify_is_server_admin(Req),
Qs = couch_httpd:qs(Req),
Feed = proplists:get_value("feed", Qs, "longpoll"),
Timeout = list_to_integer(proplists:get_value("timeout", Qs, "60000")),
Heartbeat0 = proplists:get_value("heartbeat", Qs),
Heartbeat = case {Feed, Heartbeat0} of
{"longpoll", _} -> false;
{_, "false"} -> false;
_ -> true
end,
{ok, Resp} = case Feed of
"eventsource" ->
Headers = [
{"Content-Type", "text/event-stream"},
{"Cache-Control", "no-cache"}
],
couch_httpd:start_json_response(Req, 200, Headers);
_ ->
couch_httpd:start_json_response(Req, 200)
end,
St1 = #st{
resp = Resp,
feed = Feed,
heartbeat = Heartbeat,
timeout = Timeout
},
{ok, St2} = run(St1, Timeout),
{ok, St2#st.resp};
handle_req(Req) ->
couch_httpd:send_method_not_allowed(Req, "GET").
run(St, Timeout) ->
ok = couch_event:register_all(self()),
try
loop(St, Timeout)
after
ok = couch_event:unregister(self()),
drain_events()
end.
loop(Timeout, St) ->
Event = receive
{'$couch_event', _DbName, {index_commit, _}} ->
loop(Timeout, St);
{'$couch_event', DbName, Ev} ->
{DbName, Ev}
after Timeout ->
timeout
end,
case handle_update(Event, St) of
{ok, NewSt} ->
loop(Timeout, NewSt);
{stop, NewSt} ->
{ok, NewSt}
end.
drain_events() ->
receive
{'$couch_event', _, _} ->
drain_events()
after 0 ->
ok
end.
handle_update(timeout, #st{heartbeat=true}=St) ->
{ok, Resp1} = couch_httpd:send_chunk(St#st.resp, "\n"),
{ok, St#st{resp=Resp1}};
handle_update(timeout, #st{heartbeat=false}=St) ->
{ok, Resp1} = couch_httpd:end_json_response(St#st.resp),
{stop, St#st{resp=Resp1}};
handle_update(Event, #st{feed="eventsource"}=St) ->
Chunk = ["data: ", ?JSON_ENCODE(event_obj(Event)), "\n\n"],
{ok, Resp1} = couch_httpd:send_chunk(St#st.resp, Chunk),
{ok, St#st{resp=Resp1}};
handle_update(Event, #st{feed="continuous"}=St) ->
Chunk = [?JSON_ENCODE(event_obj(Event)), "\n"],
{ok, Resp1} = couch_httpd:send_chunk(St#st.resp, Chunk),
{ok, St#st{resp=Resp1}};
handle_update(Event, #st{feed="longpoll"}=St) ->
{Props} = event_obj(Event),
JsonObj = {[{<<"ok">>, true} | Props]},
{ok, Resp1} = couch_httpd:send_chunk(St#st.resp, ?JSON_ENCODE(JsonObj)),
{ok, Resp2} = couch_httpd:end_json_response(Resp1),
{stop, St#st{resp=Resp2}}.
event_obj({DbName, Event}) when is_atom(Event) ->
{[
{<<"type">>, couch_util:to_binary(Event)},
{<<"db_name">>, couch_util:to_binary(DbName)}
]};
event_obj({DbName, {ddoc_updated, DDocId}}) ->
{[
{<<"type">>, <<"ddoc_updated">>},
{<<"db_name">>, couch_util:to_binary(DbName)},
{<<"ddoc_id">>, couch_util:to_binary(DDocId)}
]}.