% blob: a03ffe8472514af2c97dd56f37830f7c84c599a3 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_replicator_filters).
% Public API. Arities must match the definitions in this module: fetch is
% defined (and specced) with three arguments.
-export([
    parse/1,
    fetch/3,
    view_type/2,
    ejsort/1
]).
-include_lib("couch/include/couch_db.hrl").
% Extract the replication filter specification from the options proplist.
% Returns {ok, {FilterType, ...}} or {error, ParseError}.
%
% `filter`, `doc_ids` and `selector` are mutually exclusive:
%   - none given                  -> {ok, nil}
%   - `filter` starting with "_"  -> {ok, {view, Filter, QueryParams}}
%   - any other `filter`          -> {ok, {user, {DDoc, Name}, QueryParams}}
%   - `doc_ids` only              -> {ok, {docids, DocIds}}
%   - `selector` only             -> {ok, {mango, SortedSelector}}
%
% For a `user` filter (code stored in the source database) only the filter
% name is returned here; the filter code itself is not fetched.
-spec parse([_]) ->
    {ok, nil}
    | {ok, {view, binary(), {[_]}}}
    | {ok, {user, {binary(), binary()}, {[_]}}}
    | {ok, {docids, [_]}}
    | {ok, {mango, {[_]}}}
    | {error, binary()}.
parse(Options) ->
    FilterOpt = couch_util:get_value(filter, Options),
    DocIdsOpt = couch_util:get_value(doc_ids, Options),
    SelectorOpt = couch_util:get_value(selector, Options),
    parse_filter_opts({FilterOpt, DocIdsOpt, SelectorOpt}, Options).

% Dispatch on which of {Filter, DocIds, Selector} are set. Clause order
% matters: the "nothing set" and "_"-prefixed cases must come before the
% generic user filter clause.
parse_filter_opts({undefined, undefined, undefined}, _Options) ->
    {ok, nil};
parse_filter_opts({<<"_", _/binary>> = ViewFilter, undefined, undefined}, Options) ->
    {ok, {view, ViewFilter, query_params(Options)}};
parse_filter_opts({UserFilter, undefined, undefined}, Options) ->
    case parse_user_filter(UserFilter) of
        {ok, {Doc, FilterName}} ->
            {ok, {user, {Doc, FilterName}, query_params(Options)}};
        {error, Error} ->
            {error, Error}
    end;
parse_filter_opts({undefined, DocIds, undefined}, _Options) ->
    {ok, {docids, DocIds}};
parse_filter_opts({undefined, undefined, Selector}, _Options) ->
    {ok, {mango, ejsort(mango_selector:normalize(Selector))}};
parse_filter_opts(_Conflicting, _Options) ->
    {error, <<"`selector`, `filter` and `doc_ids` are mutually exclusive">>}.
% Fetches the body of a filter function from the source database. Returns
% either {ok, Body}, where Body is the filter code as a binary, or
% {error, Reason}. Assume this function might block due to network / socket
% issues for an undetermined amount of time.
%
% The fetch runs in a separate monitored process that reports its outcome
% through its exit reason, so a failure inside it surfaces here as
% {error, Reason} rather than as an exception in the caller.
-spec fetch(binary(), binary(), binary()) ->
    {ok, binary()} | {error, binary()}.
fetch(DDocName, FilterName, Source) ->
    {Pid, Ref} = spawn_monitor(fun() ->
        try fetch_internal(DDocName, FilterName, Source) of
            Resp ->
                exit({exit_ok, Resp})
        catch
            throw:{fetch_error, Reason} ->
                exit({exit_fetch_error, Reason});
            _OtherTag:Reason ->
                exit({exit_other_error, Reason})
        end
    end),
    receive
        {'DOWN', Ref, process, Pid, {exit_ok, Resp}} ->
            {ok, Resp};
        {'DOWN', Ref, process, Pid, {exit_fetch_error, Reason}} ->
            {error, Reason};
        {'DOWN', Ref, process, Pid, {exit_other_error, Reason}} ->
            {error, couch_util:to_binary(Reason)};
        {'DOWN', Ref, process, Pid, UnexpectedReason} ->
            % The worker died without producing one of the tagged reasons
            % above (for example it was killed from outside). Report that as
            % an error instead of waiting forever for a message that will
            % never arrive.
            {error, couch_util:to_binary(UnexpectedReason)}
    end.
% Get replication type and view (if any) from replication document props.
%
% When the document's `filter` is `_view`, the `view` entry of the
% `query_params` option must have the form `ddocname/viewname`; the result is
% then {<<"view">>, #{<<"ddoc">> => <<"_design/ddocname">>,
% <<"view">> => <<"viewname">>}}. A missing or malformed `view` parameter
% yields {error, Msg}. Every other replication document yields
% {<<"db">>, #{}}, whatever the length of the options list.
-spec view_type(map(), [_]) ->
    {binary(), map()} | {error, binary()}.
view_type(#{<<"filter">> := <<"_view">>}, Options) ->
    {QP} = couch_util:get_value(query_params, Options, {[]}),
    parse_view_param(couch_util:get_value(<<"view">>, QP));
view_type(#{}, Options) when is_list(Options) ->
    {<<"db">>, #{}}.

% Split a `ddocname/viewname` parameter into the view descriptor map.
parse_view_param(undefined) ->
    % No `view` query parameter at all: report it like a malformed one
    % rather than letting re:split/2 fail with badarg.
    {error, <<"Invalid `view` parameter.">>};
parse_view_param(ViewParam) ->
    case re:split(ViewParam, <<"/">>) of
        [DName, ViewName] ->
            DDocMap = #{
                <<"ddoc">> => <<"_design/", DName/binary>>,
                <<"view">> => ViewName
            },
            {<<"view">>, DDocMap};
        _ ->
            {error, <<"Invalid `view` parameter.">>}
    end.
% Private functions
% Open the source database, read the design document `_design/DDocName` and
% return the code stored under its `filters` -> FilterName entry as a binary.
%
% Any failure is reported by throwing {fetch_error, Message}, where Message is
% a binary describing which step failed (opening the database, opening the
% design document, or extracting the filter code). Once the database has been
% opened it is always closed again, on success and on failure.
fetch_internal(DDocName, FilterName, Source) ->
    % The old-style `catch` turns errors and exits from db_open into ordinary
    % terms, so every non-{ok, _} outcome ends up in the DbError branch.
    Db =
        case (catch couch_replicator_api_wrap:db_open(Source)) of
            {ok, Db0} ->
                Db0;
            DbError ->
                DbErrorMsg = io_lib:format(
                    "Could not open source database `~s`: ~s",
                    [
                        couch_replicator_api_wrap:db_uri(Source),
                        couch_util:to_binary(DbError)
                    ]
                ),
                throw({fetch_error, iolist_to_binary(DbErrorMsg)})
        end,
    try
        Body =
            case
                (catch couch_replicator_api_wrap:open_doc(
                    Db, <<"_design/", DDocName/binary>>, [ejson_body]
                ))
            of
                {ok, #doc{body = Body0}} ->
                    Body0;
                DocError ->
                    DocErrorMsg = io_lib:format(
                        "Couldn't open document `_design/~s` from source "
                        "database `~s`: ~s",
                        [
                            DDocName,
                            couch_replicator_api_wrap:db_uri(Source),
                            couch_util:to_binary(DocError)
                        ]
                    ),
                    throw({fetch_error, iolist_to_binary(DocErrorMsg)})
            end,
        try
            Code = couch_util:get_nested_json_value(
                Body, [<<"filters">>, FilterName]
            ),
            % In an Erlang string literal `\s` is the escape for a space
            % character, so this pattern trims spaces around the code when
            % it matches; otherwise the code is returned unchanged.
            re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
        catch
            _Tag:CodeError ->
                CodeErrorMsg = io_lib:format(
                    "Couldn't parse filter code from document ~s on `~s` "
                    " Error: ~s",
                    [
                        DDocName,
                        couch_replicator_api_wrap:db_uri(Source),
                        couch_util:to_binary(CodeError)
                    ]
                ),
                % Convert to a binary like the other error paths, so callers
                % always receive {fetch_error, binary()}.
                throw({fetch_error, iolist_to_binary(CodeErrorMsg)})
        end
    after
        couch_replicator_api_wrap:db_close(Db)
    end.
% Look up `query_params` in the options proplist, passing an empty EJSON
% object as the default value.
-spec query_params([_]) -> {[_]}.
query_params(Options) ->
    EmptyObject = {[]},
    couch_util:get_value(query_params, Options, EmptyObject).
% Split a user filter reference of the form `ddocname/filtername` at its
% first "/" and return {ok, {DDocName, FilterName}} as binaries, or
% {error, Message} when the input contains no "/".
parse_user_filter(Filter) ->
    Pattern = "(.*?)/(.*)",
    CaptureOpts = [{capture, [1, 2], binary}],
    case re:run(Filter, Pattern, CaptureOpts) of
        {match, [DDocPart, NamePart]} ->
            {ok, {DDocPart, NamePart}};
        _ ->
            {error, <<"Invalid filter. Must match `ddocname/filtername`.">>}
    end.
% Recursively sort the properties of an EJSON term by key, in an attempt to
% produce a unique representation. This reduces the chance of getting
% different replication checkpoints for the same Mango selector.
%
% Objects ({Props}) get their properties sorted by key and their values
% sorted recursively; arrays keep their element order but have each element
% sorted; any other value is returned unchanged.
ejsort({Props}) ->
    % Accumulate in reverse, then apply a stable key sort.
    Converted = lists:foldl(
        fun({Key, Value}, Acc) -> [{Key, ejsort(Value)} | Acc] end,
        [],
        Props
    ),
    {lists:keysort(1, Converted)};
ejsort(Items) when is_list(Items) ->
    lists:map(fun ejsort/1, Items);
ejsort(Other) ->
    Other.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
% Scalars and empty containers must come back from ejsort/1 unchanged.
ejsort_basic_values_test() ->
    Unchanged = [0, <<"a">>, true, [], {[]}],
    lists:foreach(
        fun(Value) -> ?assertEqual(ejsort(Value), Value) end,
        Unchanged
    ).
% Arrays keep their element order, while object properties are sorted by key
% at every nesting level.
ejsort_compound_values_test() ->
    MixedArray = [2, 1, 3, <<"a">>],
    ?assertEqual(ejsort(MixedArray), MixedArray),
    Unsorted = {[{<<"a">>, 0}, {<<"c">>, 0}, {<<"b">>, 0}]},
    Sorted = {[{<<"a">>, 0}, {<<"b">>, 0}, {<<"c">>, 0}]},
    ?assertEqual(ejsort(Unsorted), Sorted),
    Nested = {[
        {<<"x">>, Unsorted},
        {<<"z">>, Unsorted},
        {<<"y">>, [Unsorted, Unsorted]}
    ]},
    NestedSorted = {[
        {<<"x">>, Sorted},
        {<<"y">>, [Sorted, Sorted]},
        {<<"z">>, Sorted}
    ]},
    ?assertEqual(ejsort(Nested), NestedSorted).
-endif.