Implement Mango selectors for change feeds
API is modeled after _doc_ids filter for change feeds.
User POSTs to {db}/_changes with `filter=_selector`.
Document body should have a "selector" field, with
a Mango selector object as value.
For example:
```
http http://.../d1/_changes?filter=_selector
{
"selector": {"z" : {"$gte" : 1} }
}
```
Jira: COUCHDB-2988
diff --git a/src/couch_changes.erl b/src/couch_changes.erl
index 248ba3d..75ec1cd 100644
--- a/src/couch_changes.erl
+++ b/src/couch_changes.erl
@@ -196,6 +196,8 @@
configure_filter("_doc_ids", Style, Req, _Db) ->
{doc_ids, Style, get_doc_ids(Req)};
+configure_filter("_selector", Style, Req, _Db) ->
+ {selector, Style, get_selector(Req)};
configure_filter("_design", Style, _Req, _Db) ->
{design_docs, Style};
configure_filter("_view", Style, Req, Db) ->
@@ -267,6 +269,11 @@
false ->
[]
end;
+filter(Db, DocInfo, {selector, Style, Selector}) ->
+ Docs = open_revs(Db, DocInfo, Style),
+ Passes = [mango_selector:match(Selector, couch_doc:to_json_obj(Doc, []))
+ || Doc <- Docs],
+ filter_revs(Passes, Docs);
filter(_Db, DocInfo, {design_docs, Style}) ->
case DocInfo#doc_info.id of
<<"_design", _/binary>> ->
@@ -336,6 +343,15 @@
throw({bad_request, no_doc_ids_provided}).
+get_selector({json_req, {Props}}) ->
+ check_selector(couch_util:get_value(<<"selector">>, Props));
+get_selector(#httpd{method='POST'}=Req) ->
+ couch_httpd:validate_ctype(Req, "application/json"),
+ get_selector({json_req, couch_httpd:json_body_obj(Req)});
+get_selector(_) ->
+ throw({bad_request, "Selector must be specified in POST payload"}).
+
+
check_docids(DocIds) when is_list(DocIds) ->
lists:foreach(fun
(DocId) when not is_binary(DocId) ->
@@ -349,6 +365,18 @@
throw({bad_request, Msg}).
+check_selector(Selector={_}) ->
+ try
+ mango_selector:normalize(Selector)
+ catch
+ {mango_error, Mod, Reason0} ->
+ {_StatusCode, _Error, Reason} = mango_error:info(Mod, Reason0),
+ throw({bad_request, Reason})
+ end;
+check_selector(_Selector) ->
+ throw({bad_request, "Selector error: expected a JSON object"}).
+
+
open_ddoc(#db{name=DbName, id_tree=undefined}, DDocId) ->
case ddoc_cache:open_doc(mem3:dbname(DbName), DDocId) of
{ok, _} = Resp -> Resp;
diff --git a/test/couch_changes_tests.erl b/test/couch_changes_tests.erl
index f3dcf6e..52eff8a 100644
--- a/test/couch_changes_tests.erl
+++ b/test/couch_changes_tests.erl
@@ -60,6 +60,7 @@
setup,
fun test_util:start_couch/0, fun test_util:stop_couch/1,
[
+ filter_by_selector(),
filter_by_doc_id(),
filter_by_design(),
continuous_feed()
@@ -84,6 +85,24 @@
}
}.
+filter_by_selector() ->
+ {
+ "Filter _selector",
+ {
+ foreach,
+ fun setup/0, fun teardown/1,
+ [
+ fun should_select_basic/1,
+ fun should_select_with_since/1,
+ fun should_select_when_no_result/1,
+ fun should_select_with_deleted_docs/1,
+ fun should_select_with_continuous/1,
+ fun should_stop_selector_when_db_deleted/1
+ ]
+ }
+ }.
+
+
filter_by_design() ->
{
"Filter _design",
@@ -317,7 +336,7 @@
should_end_changes_when_db_deleted({DbName, _Revs}) ->
?_test(begin
- {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, _Db} = couch_db:open_int(DbName, []),
ChangesArgs = #changes_args{
filter = "_doc_ids",
feed = "continuous"
@@ -333,6 +352,140 @@
ok
end).
+
+should_select_basic({DbName, _}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{filter = "_selector"},
+ Selector = {[{<<"_id">>, <<"doc3">>}]},
+ Req = {json_req, {[{<<"selector">>, Selector}]}},
+ Consumer = spawn_consumer(DbName, ChArgs, Req),
+ {Rows, LastSeq} = wait_finished(Consumer),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ UpSeq = couch_db:get_update_seq(Db),
+ couch_db:close(Db),
+ stop_consumer(Consumer),
+ ?assertEqual(1, length(Rows)),
+ [#row{seq = Seq, id = Id}] = Rows,
+ ?assertEqual(<<"doc3">>, Id),
+ ?assertEqual(6, Seq),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+should_select_with_since({DbName, _}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{filter = "_selector", since = 9},
+ GteDoc2 = {[{<<"$gte">>, <<"doc1">>}]},
+ Selector = {[{<<"_id">>, GteDoc2}]},
+ Req = {json_req, {[{<<"selector">>, Selector}]}},
+ Consumer = spawn_consumer(DbName, ChArgs, Req),
+ {Rows, LastSeq} = wait_finished(Consumer),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ UpSeq = couch_db:get_update_seq(Db),
+ couch_db:close(Db),
+ stop_consumer(Consumer),
+ ?assertEqual(1, length(Rows)),
+ [#row{seq = Seq, id = Id}] = Rows,
+ ?assertEqual(<<"doc8">>, Id),
+ ?assertEqual(10, Seq),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+should_select_when_no_result({DbName, _}) ->
+ ?_test(
+ begin
+ ChArgs = #changes_args{filter = "_selector"},
+ Selector = {[{<<"_id">>, <<"nopers">>}]},
+ Req = {json_req, {[{<<"selector">>, Selector}]}},
+ Consumer = spawn_consumer(DbName, ChArgs, Req),
+ {Rows, LastSeq} = wait_finished(Consumer),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ UpSeq = couch_db:get_update_seq(Db),
+ couch_db:close(Db),
+ stop_consumer(Consumer),
+ ?assertEqual(0, length(Rows)),
+ ?assertEqual(UpSeq, LastSeq)
+ end).
+
+should_select_with_deleted_docs({DbName, Revs}) ->
+ ?_test(
+ begin
+ Rev3_2 = element(6, Revs),
+ {ok, Db} = couch_db:open_int(DbName, []),
+ {ok, _} = save_doc(
+ Db,
+ {[{<<"_id">>, <<"doc3">>},
+ {<<"_deleted">>, true},
+ {<<"_rev">>, Rev3_2}]}),
+ ChArgs = #changes_args{filter = "_selector"},
+ Selector = {[{<<"_id">>, <<"doc3">>}]},
+ Req = {json_req, {[{<<"selector">>, Selector}]}},
+ Consumer = spawn_consumer(DbName, ChArgs, Req),
+ {Rows, LastSeq} = wait_finished(Consumer),
+ couch_db:close(Db),
+ stop_consumer(Consumer),
+ ?assertMatch(
+ [#row{seq = LastSeq, id = <<"doc3">>, deleted = true}],
+ Rows
+ ),
+ ?assertEqual(11, LastSeq)
+ end).
+
+should_select_with_continuous({DbName, Revs}) ->
+ ?_test(
+ begin
+ {ok, Db} = couch_db:open_int(DbName, []),
+ ChArgs = #changes_args{filter = "_selector", feed = "continuous"},
+ GteDoc8 = {[{<<"$gte">>, <<"doc8">>}]},
+ Selector = {[{<<"_id">>, GteDoc8}]},
+ Req = {json_req, {[{<<"selector">>, Selector}]}},
+ Consumer = spawn_consumer(DbName, ChArgs, Req),
+ ok = pause(Consumer),
+ Rows = get_rows(Consumer),
+ ?assertMatch(
+ [#row{seq = 10, id = <<"doc8">>, deleted = false}],
+ Rows
+ ),
+ clear_rows(Consumer),
+ {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc01">>}]}),
+ ok = unpause(Consumer),
+ timer:sleep(100),
+ ok = pause(Consumer),
+ ?assertEqual([], get_rows(Consumer)),
+ Rev4 = element(4, Revs),
+ Rev8 = element(10, Revs),
+ {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc8">>},
+ {<<"_rev">>, Rev8}]}),
+ {ok, _} = save_doc(Db, {[{<<"_id">>, <<"doc4">>},
+ {<<"_rev">>, Rev4}]}),
+ ok = unpause(Consumer),
+ timer:sleep(100),
+ ok = pause(Consumer),
+ NewRows = get_rows(Consumer),
+ ?assertMatch(
+ [#row{seq = _, id = <<"doc8">>, deleted = false}],
+ NewRows
+ )
+ end).
+
+should_stop_selector_when_db_deleted({DbName, _Revs}) ->
+ ?_test(
+ begin
+ {ok, _Db} = couch_db:open_int(DbName, []),
+ ChArgs = #changes_args{filter = "_selector", feed = "continuous"},
+ Selector = {[{<<"_id">>, <<"doc3">>}]},
+ Req = {json_req, {[{<<"selector">>, Selector}]}},
+ Consumer = spawn_consumer(DbName, ChArgs, Req),
+ ok = pause(Consumer),
+ ok = couch_server:delete(DbName, [?ADMIN_CTX]),
+ ok = unpause(Consumer),
+ {_Rows, _LastSeq} = wait_finished(Consumer),
+ stop_consumer(Consumer),
+ ok
+ end).
+
+
should_emit_only_design_documents({DbName, Revs}) ->
?_test(
begin