add multi view queries support
This changes implement COUCHDB-523 with support of the multiview query
for the view changes and the replication using a view.
You can pass multiple queries to a view using the following format:
{"queries": [
{ "startkey": "a", "endkey": "c" },
....
]}
diff --git a/src/couch_mrview.erl b/src/couch_mrview.erl
index a01d812..dc6ece1 100644
--- a/src/couch_mrview.erl
+++ b/src/couch_mrview.erl
@@ -529,7 +529,7 @@
is_key_byseq(Options) ->
lists:any(fun({K, _}) ->
lists:member(K, [start_key, end_key, start_key_docid,
- end_key_docid, keys])
+ end_key_docid, keys, queries])
end, Options).
make_view_changes_args(Options) ->
diff --git a/src/couch_mrview_changes.erl b/src/couch_mrview_changes.erl
index 735ded8..0912949 100644
--- a/src/couch_mrview_changes.erl
+++ b/src/couch_mrview_changes.erl
@@ -20,6 +20,7 @@
ddoc,
view,
view_options,
+ queries,
since,
callback,
acc,
@@ -49,12 +50,14 @@
Since = proplists:get_value(since, Options, 0),
Stream = proplists:get_value(stream, Options, false),
ViewOptions = proplists:get_value(view_options, Options, []),
+ Queries = proplists:get_value(queries, Options),
Refresh = proplists:get_value(refresh, Options, false),
State0 = #vst{dbname=DbName,
ddoc=DDocId,
view=View,
view_options=ViewOptions,
+ queries=Queries,
since=Since,
callback=Fun,
acc=Acc},
@@ -142,8 +145,8 @@
{UserTimeout, Timeout, Heartbeat}.
view_changes_since(#vst{dbname=DbName, ddoc=DDocId, view=View,
- view_options=Options, since=Since,
- callback=Callback, acc=UserAcc}=State) ->
+ view_options=ViewOptions, queries=Queries,
+ since=Since, callback=Callback, acc=UserAcc}=State) ->
Wrapper = fun ({{Seq, _Key, _DocId}, _Val}=KV, {_Go, Acc2, OldSeq}) ->
LastSeq = if OldSeq < Seq -> Seq;
true -> OldSeq
@@ -154,14 +157,41 @@
end,
Acc0 = {ok, UserAcc, Since},
- case couch_mrview:view_changes_since(DbName, DDocId, View, Since,
- Wrapper, Options, Acc0) of
+ Res = case {Queries, ViewOptions} of
+ {Queries, []} when is_list(Queries) ->
+ Args = {DbName, DDocId, View, Wrapper, Since},
+ multi_view_changes(Queries, Args, Acc0);
+ {undefined, ViewOptions} when is_list(ViewOptions) ->
+ couch_mrview:view_changes_since(DbName, DDocId, View, Since,
+ Wrapper, ViewOptions, Acc0);
+ {[], []} ->
+ couch_mrview:view_changes_since(DbName, DDocId, View, Since,
+ Wrapper, [], Acc0);
+ _ ->
+ {error, badarg}
+ end,
+
+ case Res of
{ok, {Go, UserAcc2, Since2}}->
{Go, State#vst{since=Since2, acc=UserAcc2}};
Error ->
Error
end.
+multi_view_changes([], _Args, Acc) ->
+ {ok, Acc};
+multi_view_changes([Options | Rest], {DbName, DDocId, View, Wrapper, Since}=Args,
+ Acc) ->
+ case couch_mrview:view_changes_since(DbName, DDocId, View, Since,
+ Wrapper, Options, Acc) of
+ {ok, {stop, _UserAcc2, _Since2}=Acc2} ->
+ {ok, Acc2};
+ {ok, Acc2} ->
+ multi_view_changes(Rest, Args, Acc2);
+ Error ->
+ Error
+ end.
+
index_update_notifier(#db{name=DbName}, DDocId) ->
index_update_notifier(DbName, DDocId);
index_update_notifier(DbName, DDocId) ->
diff --git a/src/couch_mrview_http.erl b/src/couch_mrview_http.erl
index bd4d9c5..44153fd 100644
--- a/src/couch_mrview_http.erl
+++ b/src/couch_mrview_http.erl
@@ -36,7 +36,8 @@
req,
resp,
prepend,
- etag
+ etag,
+ should_close=false
}).
@@ -50,7 +51,7 @@
handle_reindex_req(#httpd{method='POST',
path_parts=[_, _, DName,<<"_reindex">>]}=Req,
- Db, DDoc) ->
+ Db, _DDoc) ->
ok = couch_db:check_is_admin(Db),
couch_mrview:trigger_update(Db, <<"_design/", DName/binary>>),
couch_httpd:send_json(Req, 201, {[{<<"ok">>, true}]});
@@ -75,9 +76,25 @@
design_doc_view(Req, Db, DDoc, ViewName, undefined);
handle_view_req(#httpd{method='POST'}=Req, Db, DDoc) ->
[_, _, _, _, ViewName] = Req#httpd.path_parts,
- Keys = get_view_keys(couch_httpd:json_body_obj(Req)),
- couch_stats_collector:increment({httpd, view_reads}),
- design_doc_view(Req, Db, DDoc, ViewName, Keys);
+ Props = couch_httpd:json_body_obj(Req),
+ Keys = get_view_keys(Props),
+ Queries = get_view_queries(Props),
+ case {Queries, Keys} of
+ {Queries, undefined} ->
+ [couch_stats_collector:increment({httpd, view_reads})
+ || _I <- Queries],
+ multi_query_view(Req, Db, DDoc, ViewName, Queries);
+ {undefined, Keys} ->
+ couch_stats_collector:increment({httpd, view_reads}),
+ design_doc_view(Req, Db, DDoc, ViewName, Keys);
+ {undefined, undefined} ->
+ throw({
+ bad_request,
+ "POST body must contain `keys` or `queries` field"
+ });
+ {_, _} ->
+ throw({bad_request, "`keys` and `queries` are mutually exclusive"})
+ end;
handle_view_req(Req, _Db, _DDoc) ->
couch_httpd:send_method_not_allowed(Req, "GET,POST,HEAD").
@@ -218,6 +235,37 @@
_ -> {ok, Resp}
end.
+multi_query_view(Req, Db, DDoc, ViewName, Queries) ->
+ Args0 = parse_qs(Req, undefined),
+ {ok, _, _, Args1} = couch_mrview_util:get_view(Db, DDoc, ViewName, Args0),
+ ArgQueries = lists:map(fun({Query}) ->
+ QueryArg = parse_qs(Query, undefined, Args1, true),
+ couch_mrview_util:validate_args(QueryArg)
+ end, Queries),
+ {ok, Resp2} = couch_httpd:etag_maybe(Req, fun() ->
+ VAcc0 = #vacc{db=Db, req=Req, prepend="\r\n"},
+ Etag = couch_uuids:new(),
+ Headers = [{"ETag", Etag}],
+ FirstChunk = "{\"results\":[",
+ {ok, Resp} = couch_httpd:start_json_response(Req, 200, Headers),
+ couch_httpd:send_chunk(Resp, FirstChunk),
+ VAcc1 = VAcc0#vacc{resp=Resp},
+ VAcc2 = lists:foldl(fun(Args, Acc0) ->
+ {ok, Acc1} = couch_mrview:query_view(
+ Db, DDoc, ViewName, Args,
+ fun view_cb/2, Acc0),
+ Acc1
+ end, VAcc1, ArgQueries),
+ couch_httpd:send_chunk(VAcc2#vacc.resp, "\r\n]}"),
+ {ok, Resp2} = couch_httpd:end_json_response(VAcc2#vacc.resp),
+ {ok, VAcc2#vacc{resp=Resp2}}
+ end),
+
+ case is_record(Resp2, vacc) of
+ true -> {ok, Resp2#vacc.resp};
+ _ -> {ok, Resp2}
+ end.
+
filtered_view_cb({row, Row0}, Acc) ->
Row1 = lists:map(fun({doc, null}) ->
@@ -236,6 +284,8 @@
view_cb({meta, Meta}, #vacc{resp=undefined}=Acc) ->
Headers = [{"ETag", Acc#vacc.etag}],
{ok, Resp} = couch_httpd:start_json_response(Acc#vacc.req, 200, Headers),
+ view_cb({meta, Meta}, Acc#vacc{resp=Resp, should_close=true});
+view_cb({meta, Meta}, #vacc{resp=Resp}=Acc) ->
% Map function starting
Parts = case couch_util:get_value(total, Meta) of
undefined -> [];
@@ -264,13 +314,19 @@
% Nothing in view
{ok, Resp} = couch_httpd:send_json(Acc#vacc.req, 200, {[{rows, []}]}),
{ok, Acc#vacc{resp=Resp}};
-view_cb(complete, Acc) ->
- % Finish view output
- couch_httpd:send_chunk(Acc#vacc.resp, "\r\n]}"),
- couch_httpd:end_json_response(Acc#vacc.resp),
- {ok, Acc}.
+view_cb(complete, #vacc{resp=Resp}=Acc) ->
+ % Finish view output
+ couch_httpd:send_chunk(Resp, "\r\n]}"),
+ case Acc#vacc.should_close of
+ true ->
+ {ok, Resp2} = couch_httpd:end_json_response(Resp),
+ {ok, Acc#vacc{resp=Resp2}};
+ _ ->
+ {ok, Acc#vacc{resp=Resp, prepend=",\r\n"}}
+ end.
+
row_to_json(Row) ->
Id = couch_util:get_value(id, Row),
row_to_json(Id, Row).
@@ -309,37 +365,57 @@
throw({bad_request, "`keys` member must be a array."})
end.
+get_view_queries({Props}) ->
+ case couch_util:get_value(<<"queries">>, Props) of
+ undefined ->
+ undefined;
+ Queries when is_list(Queries) ->
+ Queries;
+ _ ->
+ throw({bad_request, "`queries` member must be a array."})
+ end.
-parse_qs(Req, Keys) ->
- Args = #mrargs{keys=Keys},
+
+parse_qs(#httpd{}=Req, Keys) ->
+ parse_qs(couch_httpd:qs(Req), Keys);
+parse_qs(Props, Keys) ->
+ Args = #mrargs{},
+ parse_qs(Props, Keys, Args).
+
+parse_qs(Props, Keys, #mrargs{}=Args0) ->
+ parse_qs(Props, Keys, Args0, false).
+
+parse_qs(Props, Keys, #mrargs{}=Args0, Json) ->
+ Args = Args0#mrargs{keys=Keys},
lists:foldl(fun({K, V}, Acc) ->
- parse_qs(K, V, Acc)
- end, Args, couch_httpd:qs(Req)).
+ parse_param(K, V, Acc, Json)
+ end, Args, Props).
-
-parse_qs(Key, Val, Args) ->
+parse_param(Key, Val, Args, Json) when is_binary(Key) ->
+ parse_param(binary_to_list(Key), Val, Args, Json);
+parse_param(Key, Val, Args, Json) ->
case Key of
"" ->
Args;
"reduce" ->
Args#mrargs{reduce=parse_boolean(Val)};
"key" ->
- JsonKey = ?JSON_DECODE(Val),
+ JsonKey = parse_json(Val, Json),
Args#mrargs{start_key=JsonKey, end_key=JsonKey};
"keys" ->
- Args#mrargs{keys=?JSON_DECODE(Val)};
+ Args#mrargs{keys=parse_json(Val, Json)};
"startkey" ->
- Args#mrargs{start_key=?JSON_DECODE(Val)};
+ Args#mrargs{start_key=parse_json(Val, Json)};
"start_key" ->
- Args#mrargs{start_key=?JSON_DECODE(Val)};
+ Args#mrargs{start_key=parse_json(Val, Json)};
"startkey_docid" ->
Args#mrargs{start_key_docid=list_to_binary(Val)};
"start_key_doc_id" ->
Args#mrargs{start_key_docid=list_to_binary(Val)};
"endkey" ->
- Args#mrargs{end_key=?JSON_DECODE(Val)};
+ Args#mrargs{end_key=parse_json(Val, Json)};
"end_key" ->
- Args#mrargs{end_key=?JSON_DECODE(Val)};
+ Args#mrargs{end_key=parse_json(Val, Json)};
"endkey_docid" ->
Args#mrargs{end_key_docid=list_to_binary(Val)};
"end_key_doc_id" ->
@@ -407,6 +483,8 @@
true;
parse_boolean(false) ->
false;
+parse_boolean(Val) when is_binary(Val) ->
+ parse_boolean(binary_to_list(Val));
parse_boolean(Val) ->
case string:to_lower(Val) of
"true" -> true;
@@ -436,3 +514,8 @@
Msg = io_lib:format(Fmt, [Val]),
throw({query_parse_error, ?l2b(Msg)})
end.
+
+parse_json(V, false) when is_list(V) ->
+ ?JSON_DECODE(V);
+parse_json(V, _) ->
+ V.
diff --git a/src/couch_mrview_updater.erl b/src/couch_mrview_updater.erl
index 4d51f78..b12cf98 100644
--- a/src/couch_mrview_updater.erl
+++ b/src/couch_mrview_updater.erl
@@ -367,7 +367,8 @@
end, {[], [], []}, Log),
RemValue = {[{<<"_removed">>, true}]},
- {Log1, AddAcc, DelAcc} = walk_log(Btree, fun({DocId, VIdKeys}, {Log2, AddAcc2, DelAcc2}) ->
+ {Log1, AddAcc, DelAcc} = walk_log(Btree, fun({DocId, VIdKeys},
+ {Log2, AddAcc2, DelAcc2}) ->
{Log3, AddAcc3, DelAcc3} = lists:foldl(fun({ViewId,{Key, Seq,_Op}},
{Log4, AddAcc4, DelAcc4}) ->