Implement Mango selectors for replication

Replication document should have a "selector"
field with a Mango selector JSON object
as the value.

For example:
```
{
    "_id": "r",
    "continuous": true,
    "selector": {
        "_id": {
            "$gte": "d2"
        }
    },
    "source": "http://adm:pass@localhost:15984/a",
    "target": "http://adm:pass@localhost:15984/b"
}
```

This feature underneath uses the _changes feed
Mango selectors capability.

Replicator docs js validation function has been
updated to return an error if it notices user has
specified both `doc_ids` and `selector`. Or
they specified `filter` and either of the other
two.

Replication options parsing also checks for those
mutually exclusive fields, as replications can be
started from the `_replicate` endpoint not just
via the docs in `*_replicator` dbs.

When generating a replication id, Mango selector
object is normalized and sorted (JSON fields
are sorted inside objects only). That is done in order
to reduce the chance of creating two different
replication checkpoints for same Mango selector.

Jira: COUCHDB-2988
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index b7d3bb6..ff6b00c 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -469,11 +469,16 @@
         {"timeout", integer_to_list(Timeout)}
            ],
     DocIds = get_value(doc_ids, Options),
-    {QArgs, Method, Body, Headers} = case DocIds of
-    undefined ->
+    Selector = get_value(selector, Options),
+    {QArgs, Method, Body, Headers} = case {DocIds, Selector} of
+    {undefined, undefined} ->
         QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options),
         {QArgs1, get, [], Headers1};
-    _ when is_list(DocIds) ->
+    {undefined, _} when is_tuple(Selector) ->
+        Headers2 = [{"Content-Type", "application/json"} | Headers1],
+        JsonSelector = ?JSON_ENCODE({[{<<"selector">>, Selector}]}),
+        {[{"filter", "_selector"} | BaseQArgs], post, JsonSelector, Headers2};
+    {_, undefined} when is_list(DocIds) ->
         Headers2 = [{"Content-Type", "application/json"} | Headers1],
         JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
         {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
@@ -506,11 +511,15 @@
                     end)
         end);
 changes_since(Db, Style, StartSeq, UserFun, Options) ->
-    Filter = case get_value(doc_ids, Options) of
-    undefined ->
+    DocIds = get_value(doc_ids, Options),
+    Selector = get_value(selector, Options),
+    Filter = case {DocIds, Selector} of
+    {undefined, undefined} ->
         ?b2l(get_value(filter, Options, <<>>));
-    _DocIds ->
-        "_doc_ids"
+    {_, undefined} ->
+        "_doc_ids";
+    {undefined, _} ->
+        "_selector"
     end,
     Args = #changes_args{
         style = Style,
@@ -580,6 +589,8 @@
     {[]};
 changes_json_req(_Db, "_doc_ids", _QueryParams, Options) ->
     {[{<<"doc_ids">>, get_value(doc_ids, Options)}]};
+changes_json_req(_Db, "_selector", _QueryParams, Options) ->
+    {[{<<"selector">>, get_value(selector, Options)}]};
 changes_json_req(Db, FilterName, {QueryParams}, _Options) ->
     {ok, Info} = couch_db:get_db_info(Db),
     % simulate a request to db_name/_changes
diff --git a/src/couch_replicator_js_functions.hrl b/src/couch_replicator_js_functions.hrl
index 3f1db7c..f3f7ab6 100644
--- a/src/couch_replicator_js_functions.hrl
+++ b/src/couch_replicator_js_functions.hrl
@@ -81,12 +81,31 @@
                 reportError('The `doc_ids\\' field must be an array of strings.');
             }
 
+            if ((typeof newDoc.selector !== 'undefined') &&
+                (typeof newDoc.selector !== 'object')) {
+
+                reportError('The `selector\\' field must be an object.');
+            }
+
             if ((typeof newDoc.filter !== 'undefined') &&
                 ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
 
                 reportError('The `filter\\' field must be a non-empty string.');
             }
 
+            if ((typeof newDoc.doc_ids !== 'undefined') &&
+                (typeof newDoc.selector !== 'undefined')) {
+
+                reportError('`doc_ids\\' field is incompatible with `selector\\'.');
+            }
+
+            if ( ((typeof newDoc.doc_ids !== 'undefined') ||
+                  (typeof newDoc.selector !== 'undefined')) &&
+                 (typeof newDoc.filter !== 'undefined') ) {
+
+                reportError('`filter\\' field is incompatible with `selector\\' and `doc_ids\\'.');
+            }
+
             if ((typeof newDoc.query_params !== 'undefined') &&
                 ((typeof newDoc.query_params !== 'object') ||
                     newDoc.query_params === null)) {
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index dde30f6..76bc8e1 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -115,20 +115,24 @@
 
 maybe_append_filters(Base,
         #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
+    Filter = get_value(filter, Options),
+    DocIds = get_value(doc_ids, Options),
+    Selector = get_value(selector, Options),
     Base2 = Base ++
-        case get_value(filter, Options) of
-        undefined ->
-            case get_value(doc_ids, Options) of
-            undefined ->
-                [];
-            DocIds ->
-                [DocIds]
-            end;
-        <<"_", _/binary>> = Filter ->
-                [Filter, get_value(query_params, Options, {[]})];
-        Filter ->
+        case {Filter, DocIds, Selector} of
+        {undefined, undefined, undefined} ->
+            [];
+        {<<"_", _/binary>>, undefined, undefined} ->
+            [Filter, get_value(query_params, Options, {[]})];
+        {_, undefined, undefined} ->
             [filter_code(Filter, Source, UserCtx),
-                get_value(query_params, Options, {[]})]
+                get_value(query_params, Options, {[]})];
+        {undefined, _, undefined} ->
+            [DocIds];
+        {undefined, undefined, _} ->
+            [ejsort(mango_selector:normalize(Selector))];
+        _ ->
+            throw({error, <<"`selector`, `filter` and `doc_ids` fields are mutually exclusive">>})
         end,
     couch_util:to_hex(couch_crypto:hash(md5, term_to_binary(Base2))).
 
@@ -248,7 +252,8 @@
 
 
 make_options(Props) ->
-    Options = lists:ukeysort(1, convert_options(Props)),
+    Options0 = lists:ukeysort(1, convert_options(Props)),
+    Options = check_options(Options0),
     DefWorkers = config:get("replicator", "worker_processes", "4"),
     DefBatchSize = config:get("replicator", "worker_batch_size", "500"),
     DefConns = config:get("replicator", "http_connections", "20"),
@@ -296,6 +301,10 @@
     % encoded doc IDs.
     DocIds = [?l2b(couch_httpd:unquote(Id)) || Id <- V],
     [{doc_ids, DocIds} | convert_options(R)];
+convert_options([{<<"selector">>, V} | _R]) when not is_tuple(V) ->
+    throw({bad_request, <<"parameter `selector` must be a JSON object">>});
+convert_options([{<<"selector">>, V} | R]) ->
+    [{selector, V} | convert_options(R)];
 convert_options([{<<"worker_processes">>, V} | R]) ->
     [{worker_processes, couch_util:to_integer(V)} | convert_options(R)];
 convert_options([{<<"worker_batch_size">>, V} | R]) ->
@@ -318,6 +327,19 @@
 convert_options([_ | R]) -> % skip unknown option
     convert_options(R).
 
+check_options(Options) ->
+    DocIds = lists:keyfind(doc_ids, 1, Options),
+    Filter = lists:keyfind(filter, 1, Options),
+    Selector = lists:keyfind(selector, 1, Options),
+    case {DocIds, Filter, Selector} of
+        {false, false, false} -> Options;
+        {false, false, _} -> Options;
+        {false, _, false} -> Options;
+        {_, false, false} -> Options;
+        _ ->
+            throw({bad_request, "`doc_ids`, `filter`, `selector` are mutually exclusive options"})
+    end.
+
 
 parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
     parse_proxy_params(?b2l(ProxyUrl));
@@ -458,3 +480,65 @@
     Else ->
         Else
     end.
+
+
+% Sort an EJSON object's properties to attempt
+% to generate a unique representation. This is used
+% to reduce the chance of getting different
+% replication checkpoints for the same Mango selector
+ejsort({V})->
+    ejsort_props(V, []);
+ejsort(V) when is_list(V) ->
+    ejsort_array(V, []);
+ejsort(V) ->
+    V.
+
+ejsort_props([], Acc)->
+    {lists:keysort(1, Acc)};
+ejsort_props([{K, V}| R], Acc) ->
+    ejsort_props(R, [{K, ejsort(V)} | Acc]).
+
+ejsort_array([], Acc)->
+    lists:reverse(Acc);
+ejsort_array([V | R], Acc) ->
+    ejsort_array(R, [ejsort(V) | Acc]).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+ejsort_basic_values_test() ->
+    ?assertEqual(ejsort(0), 0),
+    ?assertEqual(ejsort(<<"a">>), <<"a">>),
+    ?assertEqual(ejsort(true), true),
+    ?assertEqual(ejsort([]), []),
+    ?assertEqual(ejsort({[]}), {[]}).
+
+ejsort_compound_values_test() ->
+    ?assertEqual(ejsort([2, 1, 3 ,<<"a">>]), [2, 1, 3, <<"a">>]),
+    Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0},  {<<"b">>, 0}]},
+    Ej1s =  {[{<<"a">>, 0}, {<<"b">>, 0}, {<<"c">>, 0}]},
+    ?assertEqual(ejsort(Ej1), Ej1s),
+    Ej2 = {[{<<"x">>, Ej1}, {<<"z">>, Ej1}, {<<"y">>, [Ej1, Ej1]}]},
+    ?assertEqual(ejsort(Ej2),
+        {[{<<"x">>, Ej1s}, {<<"y">>, [Ej1s, Ej1s]}, {<<"z">>, Ej1s}]}).
+
+check_options_pass_values_test() ->
+    ?assertEqual(check_options([]), []),
+    ?assertEqual(check_options([baz, {other,fiz}]), [baz, {other, fiz}]),
+    ?assertEqual(check_options([{doc_ids, x}]), [{doc_ids, x}]),
+    ?assertEqual(check_options([{filter, x}]), [{filter, x}]),
+    ?assertEqual(check_options([{selector, x}]), [{selector, x}]).
+
+check_options_fail_values_test() ->
+    ?assertThrow({bad_request, _},
+        check_options([{doc_ids, x}, {filter, y}])),
+    ?assertThrow({bad_request, _},
+        check_options([{doc_ids, x}, {selector, y}])),
+    ?assertThrow({bad_request, _},
+        check_options([{filter, x}, {selector, y}])),
+    ?assertThrow({bad_request, _},
+        check_options([{doc_ids, x}, {selector, y}, {filter, z}])).
+
+-endif.
diff --git a/test/couch_replicator_selector_tests.erl b/test/couch_replicator_selector_tests.erl
new file mode 100644
index 0000000..98c6099
--- /dev/null
+++ b/test/couch_replicator_selector_tests.erl
@@ -0,0 +1,121 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_selector_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_replicator/src/couch_replicator.hrl").
+
+
+setup(_) ->
+    Ctx = test_util:start_couch([couch_replicator]),
+    Source = create_db(),
+    create_docs(Source),
+    Target = create_db(),
+    {Ctx, {Source, Target}}.
+
+teardown(_, {Ctx, {Source, Target}}) ->
+    delete_db(Source),
+    delete_db(Target),
+    ok = application:stop(couch_replicator),
+    ok = test_util:stop_couch(Ctx).
+
+selector_replication_test_() ->
+    Pairs = [{local, local}, {local, remote},
+             {remote, local}, {remote, remote}],
+    {
+        "Selector filtered replication tests",
+        {
+            foreachx,
+            fun setup/1, fun teardown/2,
+            [{Pair, fun should_succeed/2} || Pair <- Pairs]
+        }
+    }.
+
+should_succeed({From, To}, {_Ctx, {Source, Target}}) ->
+    RepObject = {[
+        {<<"source">>, db_url(From, Source)},
+        {<<"target">>, db_url(To, Target)},
+        {<<"selector">>, {[{<<"_id">>, <<"doc2">>}]}}
+    ]},
+    {ok, _} = couch_replicator:replicate(RepObject, ?ADMIN_USER),
+    %% FilteredFun is an Erlang version of following mango selector
+    FilterFun = fun(_DocId, {Props}) ->
+        couch_util:get_value(<<"_id">>, Props) == <<"doc2">>
+    end,
+    {ok, TargetDbInfo, AllReplies} = compare_dbs(Source, Target, FilterFun),
+    {lists:flatten(io_lib:format("~p -> ~p", [From, To])), [
+        {"Target DB has proper number of docs",
+        ?_assertEqual(1, proplists:get_value(doc_count, TargetDbInfo))},
+        {"All the docs selected as expected",
+        ?_assert(lists:all(fun(Valid) -> Valid end, AllReplies))}
+    ]}.
+
+compare_dbs(Source, Target, FilterFun) ->
+    {ok, SourceDb} = couch_db:open_int(Source, []),
+    {ok, TargetDb} = couch_db:open_int(Target, []),
+    {ok, TargetDbInfo} = couch_db:get_db_info(TargetDb),
+    Fun = fun(FullDocInfo, _, Acc) ->
+        {ok, DocId, SourceDoc} = read_doc(SourceDb, FullDocInfo),
+        TargetReply = read_doc(TargetDb, DocId),
+        case FilterFun(DocId, SourceDoc) of
+            true ->
+                ValidReply = {ok, DocId, SourceDoc} == TargetReply,
+                {ok, [ValidReply|Acc]};
+            false ->
+                ValidReply = {not_found, missing} == TargetReply,
+                {ok, [ValidReply|Acc]}
+        end
+    end,
+    {ok, _, AllReplies} = couch_db:enum_docs(SourceDb, Fun, [], []),
+    ok = couch_db:close(SourceDb),
+    ok = couch_db:close(TargetDb),
+    {ok, TargetDbInfo, AllReplies}.
+
+read_doc(Db, DocIdOrInfo) ->
+    case couch_db:open_doc(Db, DocIdOrInfo) of
+        {ok, Doc} ->
+            {Props} = couch_doc:to_json_obj(Doc, [attachments]),
+            DocId = couch_util:get_value(<<"_id">>, Props),
+            {ok, DocId, {Props}};
+        Error ->
+            Error
+    end.
+
+create_db() ->
+    DbName = ?tempdb(),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+    ok = couch_db:close(Db),
+    DbName.
+
+create_docs(DbName) ->
+    {ok, Db} = couch_db:open(DbName, [?ADMIN_CTX]),
+    Doc1 = couch_doc:from_json_obj({[
+        {<<"_id">>, <<"doc1">>}
+    ]}),
+    Doc2 = couch_doc:from_json_obj({[
+        {<<"_id">>, <<"doc2">>}
+    ]}),
+    {ok, _} = couch_db:update_docs(Db, [Doc1, Doc2]),
+    couch_db:ensure_full_commit(Db),
+    couch_db:close(Db).
+
+delete_db(DbName) ->
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]).
+
+db_url(local, DbName) ->
+    DbName;
+db_url(remote, DbName) ->
+    Addr = config:get("httpd", "bind_address", "127.0.0.1"),
+    Port = mochiweb_socket_server:get(couch_httpd, port),
+    ?l2b(io_lib:format("http://~s:~b/~s", [Addr, Port, DbName])).