% blob: aab8e80b3edd2477d7a1b7d418f9250cef7179c5 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_replicator_filters).
-export([
parse/1,
fetch/3,
view_type/2,
ejsort/1
]).
-include_lib("couch/include/couch_db.hrl").
% Work out which kind of replication filter the options proplist asks for.
%
% The `filter`, `doc_ids` and `selector` options are mutually exclusive:
%   none of them set      -> {ok, nil}
%   filter = <<"_...">>   -> {ok, {view, Filter, QueryParams}}
%   any other filter      -> {ok, {user, {DDocName, FilterName}, QueryParams}}
%   doc_ids only          -> {ok, {docids, DocIds}}
%   selector only         -> {ok, {mango, NormalizedSortedSelector}}
%   more than one set     -> {error, Message}
%
% For a `user` filter (code stored in a design document of the source
% database) only the names are returned here; the filter code itself is not
% fetched by this function.
-spec parse([_]) ->
    {ok, nil}
    | {ok, {view, binary(), {[_]}}}
    | {ok, {user, {binary(), binary()}, {[_]}}}
    | {ok, {docids, [_]}}
    | {ok, {mango, {[_]}}}
    | {error, binary()}.
parse(Options) ->
    Filter = couch_util:get_value(filter, Options),
    DocIds = couch_util:get_value(doc_ids, Options),
    Selector = couch_util:get_value(selector, Options),
    parse_by_option(Filter, DocIds, Selector, Options).

% Dispatch on which of the three options is present. Clause order matters:
% the "nothing set" and "builtin filter" heads must come before the generic
% user filter head.
parse_by_option(undefined, undefined, undefined, _Options) ->
    {ok, nil};
parse_by_option(<<"_", _/binary>> = BuiltinFilter, undefined, undefined, Options) ->
    {ok, {view, BuiltinFilter, query_params(Options)}};
parse_by_option(UserFilter, undefined, undefined, Options) ->
    case parse_user_filter(UserFilter) of
        {ok, {Doc, FilterName}} ->
            {ok, {user, {Doc, FilterName}, query_params(Options)}};
        {error, _} = ParseError ->
            ParseError
    end;
parse_by_option(undefined, DocIds, undefined, _Options) ->
    {ok, {docids, DocIds}};
parse_by_option(undefined, undefined, Selector, _Options) ->
    Normalized = mango_selector:normalize(Selector),
    {ok, {mango, ejsort(Normalized)}};
parse_by_option(_Filter, _DocIds, _Selector, _Options) ->
    {error, <<"`selector`, `filter` and `doc_ids` are mutually exclusive">>}.
% Fetches body of filter function from source database. Guaranteed to either
% return {ok, Body} or an {error, Reason}. Also assume this function might
% block due to network / socket issues for an undetermined amount of time.
%
% The work is done by fetch_internal/3 in a separate, monitored process. That
% process always terminates through exit/1 with a tagged reason, which is
% translated back into the return value here:
%   {exit_ok, Body}            -> {ok, Body}
%   {exit_fetch_error, Reason} -> {error, Reason}
%   {exit_other_error, Reason} -> {error, couch_util:to_binary(Reason)}
% Any other exit reason (for example the worker being killed from outside
% before it could report) is also turned into {error, Binary}, so the caller
% is never left waiting for a 'DOWN' message that has already been delivered
% but did not match.
%
% Body is the binary produced by fetch_internal/3.
-spec fetch(binary(), binary(), binary()) ->
    {ok, binary()} | {error, binary()}.
fetch(DDocName, FilterName, Source) ->
    {Pid, Ref} = spawn_monitor(fun() ->
        try fetch_internal(DDocName, FilterName, Source) of
            Resp ->
                exit({exit_ok, Resp})
        catch
            throw:{fetch_error, Reason} ->
                exit({exit_fetch_error, Reason});
            _OtherTag:Reason ->
                exit({exit_other_error, Reason})
        end
    end),
    % Deliberately no `after` timeout: how long the fetch may take is left to
    % the underlying network / socket layer.
    receive
        {'DOWN', Ref, process, Pid, {exit_ok, Resp}} ->
            {ok, Resp};
        {'DOWN', Ref, process, Pid, {exit_fetch_error, Reason}} ->
            {error, Reason};
        {'DOWN', Ref, process, Pid, {exit_other_error, Reason}} ->
            {error, couch_util:to_binary(Reason)};
        {'DOWN', Ref, process, Pid, UnexpectedReason} ->
            % The worker died without producing one of its own tagged exits.
            {error, couch_util:to_binary(UnexpectedReason)}
    end.
% Get replication type and view (if any) from replication document props.
%
% When the document's `filter` property is <<"_view">>, the `view` entry of
% the `query_params` option is expected to look like <<"ddocname/viewname">>
% and the result is {view, {<<"_design/ddocname">>, <<"viewname">>}}.
% A missing, non-binary or differently shaped `view` value yields
% {error, <<"Invalid `view` parameter.">>}.
% Any other `filter` value (including none) yields {db, nil}.
-spec view_type([_], [_]) ->
    {view, {binary(), binary()}} | {db, nil} | {error, binary()}.
view_type(Props, Options) ->
    case couch_util:get_value(<<"filter">>, Props) of
        <<"_view">> ->
            {QP} = couch_util:get_value(query_params, Options, {[]}),
            split_view_param(couch_util:get_value(<<"view">>, QP));
        _ ->
            {db, nil}
    end.

% Split a `ddocname/viewname` binary into design document id and view name.
% Exactly one `/` is required. Anything that is not a binary (notably
% `undefined` when the parameter is absent) is rejected here instead of being
% passed to re:split/2, which would raise badarg.
split_view_param(ViewParam) when is_binary(ViewParam) ->
    case re:split(ViewParam, <<"/">>) of
        [DName, ViewName] ->
            {view, {<<"_design/", DName/binary>>, ViewName}};
        _ ->
            {error, <<"Invalid `view` parameter.">>}
    end;
split_view_param(_NotABinary) ->
    {error, <<"Invalid `view` parameter.">>}.
% Private functions
% Open the source database, read the design document `_design/DDocName` and
% return the value stored under its `filters` -> FilterName field as a binary.
%
% Failures while opening the database, opening the document or extracting the
% filter code are all reported by throwing {fetch_error, Message}, where
% Message is a binary. Once the database has been opened it is always closed
% again through the `after` clause, whether or not a later step fails.
fetch_internal(DDocName, FilterName, Source) ->
    Db =
        % Old-style `catch` turns a throw / error / exit from db_open into a
        % plain term, so every outcome other than {ok, Db0} lands in DbError.
        case (catch couch_replicator_api_wrap:db_open(Source)) of
            {ok, Db0} ->
                Db0;
            DbError ->
                DbErrorMsg = io_lib:format(
                    "Could not open source database `~s`: ~s",
                    [
                        couch_replicator_api_wrap:db_uri(Source),
                        couch_util:to_binary(DbError)
                    ]
                ),
                throw({fetch_error, iolist_to_binary(DbErrorMsg)})
        end,
    try
        Body =
            case
                (catch couch_replicator_api_wrap:open_doc(
                    Db, <<"_design/", DDocName/binary>>, [ejson_body]
                ))
            of
                {ok, #doc{body = Body0}} ->
                    Body0;
                DocError ->
                    DocErrorMsg = io_lib:format(
                        "Couldn't open document `_design/~s` from source "
                        "database `~s`: ~s",
                        [
                            DDocName,
                            couch_replicator_api_wrap:db_uri(Source),
                            couch_util:to_binary(DocError)
                        ]
                    ),
                    throw({fetch_error, iolist_to_binary(DocErrorMsg)})
            end,
        try
            Code = couch_util:get_nested_json_value(
                Body, [<<"filters">>, FilterName]
            ),
            % In an Erlang string literal "\s" is the space character, so the
            % pattern below is "^ *(.*?) *$": it strips leading and trailing
            % spaces only, not tabs or newlines.
            % NOTE(review): left unchanged on purpose; the trimmed code is
            % presumably used by callers to identify the filter, so altering
            % the result could affect existing replications - confirm before
            % changing.
            re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
        catch
            _Tag:CodeError ->
                CodeErrorMsg = io_lib:format(
                    "Couldn't parse filter code from document ~s on `~s` "
                    " Error: ~s",
                    [
                        DDocName,
                        couch_replicator_api_wrap:db_uri(Source),
                        couch_util:to_binary(CodeError)
                    ]
                ),
                % Flatten to a binary like the two error branches above, so
                % the thrown reason has the same type on every path.
                throw({fetch_error, iolist_to_binary(CodeErrorMsg)})
        end
    after
        couch_replicator_api_wrap:db_close(Db)
    end.
% Return the `query_params` option, or an empty EJSON object ({[]}) when the
% option is not present in the proplist.
-spec query_params([_]) -> {[_]}.
query_params(Options) ->
    EmptyObject = {[]},
    couch_util:get_value(query_params, Options, EmptyObject).
% Split a user filter reference of the form `ddocname/filtername`.
%
% The pattern is not anchored and its first group is non-greedy, so the
% design document name ends at the first `/` of the match and everything
% after that `/` (further slashes included) becomes the filter name.
% Returns {ok, {DDocName, FilterName}} with both parts as binaries, or
% {error, Message} when the input contains no `/` to match on.
parse_user_filter(Filter) ->
    Pattern = "(.*?)/(.*)",
    RunOpts = [{capture, [1, 2], binary}],
    case re:run(Filter, Pattern, RunOpts) of
        {match, [DDocPart, FilterPart]} ->
            {ok, {DDocPart, FilterPart}};
        _ ->
            {error, <<"Invalid filter. Must match `ddocname/filtername`.">>}
    end.
% Recursively order the properties of an EJSON term by key, so that two
% selectors differing only in property order get the same representation.
% This is used to reduce the chance of getting different replication
% checkpoints for the same Mango selector.
%
%   {Props}  - EJSON object: every value is sorted recursively, then the
%              properties are ordered by key
%   list     - EJSON array: every element is sorted recursively, element
%              order is kept
%   other    - returned unchanged
ejsort({Props}) ->
    % Prepending while folding leaves the pairs in reverse input order, and
    % lists:keysort/2 is stable, so properties sharing a key come out in
    % reverse input order.
    Reversed = lists:foldl(
        fun({Key, Value}, Acc) -> [{Key, ejsort(Value)} | Acc] end,
        [],
        Props
    ),
    {lists:keysort(1, Reversed)};
ejsort(Items) when is_list(Items) ->
    lists:map(fun ejsort/1, Items);
ejsort(Scalar) ->
    Scalar.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
% Scalars and empty containers must pass through ejsort/1 unchanged.
% ?assertEqual takes the expected value first, so a failure report labels
% the expected and actual values correctly.
ejsort_basic_values_test() ->
    ?assertEqual(0, ejsort(0)),
    ?assertEqual(<<"a">>, ejsort(<<"a">>)),
    ?assertEqual(true, ejsort(true)),
    ?assertEqual([], ejsort([])),
    ?assertEqual({[]}, ejsort({[]})).
% Arrays keep their element order; object properties are ordered by key at
% every nesting level, including objects nested inside arrays.
% ?assertEqual takes the expected value first, so a failure report labels
% the expected and actual values correctly.
ejsort_compound_values_test() ->
    ?assertEqual([2, 1, 3, <<"a">>], ejsort([2, 1, 3, <<"a">>])),
    Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0}, {<<"b">>, 0}]},
    Ej1s = {[{<<"a">>, 0}, {<<"b">>, 0}, {<<"c">>, 0}]},
    ?assertEqual(Ej1s, ejsort(Ej1)),
    Ej2 = {[{<<"x">>, Ej1}, {<<"z">>, Ej1}, {<<"y">>, [Ej1, Ej1]}]},
    ?assertEqual(
        {[{<<"x">>, Ej1s}, {<<"y">>, [Ej1s, Ej1s]}, {<<"z">>, Ej1s}]},
        ejsort(Ej2)
    ).
-endif.