% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_filters).

-export([parse/1, fetch/4, view_type/2]).
-export([ejsort/1]).

-include_lib("couch/include/couch_db.hrl").

-import(couch_util, [
    get_value/2,
    get_value/3
]).


% Parse the filter from replication options proplist.
% Return {ok, {FilterType,...}} | {error, ParseError}.
% For `user` filter, i.e. filters specified as user code
% in source database, this code doesn't fetch the filter
% code, but only returns the name of the filter.
-spec parse([_]) ->
  {ok, nil} |
  {ok, {view, binary(), {[_]}}} |
  {ok, {user, {binary(), binary()}, {[_]}}} |
  {ok, {docids, [_]}} |
  {ok, {mango, {[_]}}} |
  {error, binary()}.
parse(Options) ->
    Filter = get_value(filter, Options),
    DocIds = get_value(doc_ids, Options),
    Selector = get_value(selector, Options),
    case {Filter, DocIds, Selector} of
        {undefined, undefined, undefined} ->
            {ok, nil};
        {<<"_", _/binary>>, undefined, undefined} ->
            {ok, {view, Filter, query_params(Options)}};
        {_, undefined, undefined} ->
            case parse_user_filter(Filter) of
                {ok, {Doc, FilterName}} ->
                    {ok, {user, {Doc, FilterName}, query_params(Options)}};
                {error, Error} ->
                    {error, Error}
            end;
        {undefined, _, undefined} ->
            {ok, {docids, DocIds}};
        {undefined, undefined, _} ->
            {ok, {mango, ejsort(mango_selector:normalize(Selector))}};
        _ ->
            Err = "`selector`, `filter` and `doc_ids` are mutually exclusive",
            {error, list_to_binary(Err)}
    end.


% Fetches body of filter function from source database. Guaranteed to either
% return {ok, Body} or an {error, Reason}. Also assume this function might
% block due to network / socket issues for an undeterminted amount of time.
-spec fetch(binary(), binary(), binary(), #user_ctx{}) ->
    {ok, {[_]}} | {error, binary()}.
fetch(DDocName, FilterName, Source, UserCtx) ->
    {Pid, Ref} = spawn_monitor(fun() ->
        try fetch_internal(DDocName, FilterName, Source, UserCtx) of
            Resp ->
                exit({exit_ok, Resp})
        catch
            throw:{fetch_error, Reason} ->
                exit({exit_fetch_error, Reason});
            _OtherTag:Reason ->
                exit({exit_other_error, Reason})
        end
    end),
    receive
        {'DOWN', Ref, process, Pid, {exit_ok, Resp}} ->
            {ok, Resp};
        {'DOWN', Ref, process, Pid, {exit_fetch_error, Reason}} ->
            {error, Reason};
        {'DOWN', Ref, process, Pid, {exit_other_error, Reason}} ->
            {error, couch_util:to_binary(Reason)}
    end.


% Get replication type and view (if any) from replication document props
-spec view_type([_], [_]) ->
    {view, {binary(), binary()}} | {db, nil} | {error, binary()}.
view_type(Props, Options) ->
    case get_value(<<"filter">>, Props) of
        <<"_view">> ->
            {QP}  = get_value(query_params, Options, {[]}),
            ViewParam = get_value(<<"view">>, QP),
            case re:split(ViewParam, <<"/">>) of
                [DName, ViewName] ->
                    {view, {<< "_design/", DName/binary >>, ViewName}};
                _ ->
                    {error, <<"Invalid `view` parameter.">>}
            end;
        _ ->
            {db, nil}
    end.


% Private functions

fetch_internal(DDocName, FilterName, Source, UserCtx) ->
    Db = case (catch couch_replicator_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of
    {ok, Db0} ->
        Db0;
    DbError ->
        DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
           [couch_replicator_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]),
        throw({fetch_error, iolist_to_binary(DbErrorMsg)})
    end,
    try
        Body = case (catch couch_replicator_api_wrap:open_doc(
            Db, <<"_design/", DDocName/binary>>, [ejson_body])) of
        {ok, #doc{body = Body0}} ->
            Body0;
        DocError ->
            DocErrorMsg = io_lib:format(
                "Couldn't open document `_design/~s` from source "
                "database `~s`: ~s", [DDocName, couch_replicator_api_wrap:db_uri(Source),
                    couch_util:to_binary(DocError)]),
            throw({fetch_error, iolist_to_binary(DocErrorMsg)})
        end,
        try
            Code = couch_util:get_nested_json_value(
                     Body, [<<"filters">>, FilterName]),
            re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
         catch
             _Tag:CodeError ->
                 CodeErrorMsg = io_lib:format(
                     "Couldn't parse filter code from document ~s on `~s` "
                     " Error: ~s", [DDocName, couch_replicator_api_wrap:db_uri(Source),
                         couch_util:to_binary(CodeError)]),
                 throw({fetch_error, CodeErrorMsg})
         end
    after
        couch_replicator_api_wrap:db_close(Db)
    end.



-spec query_params([_]) -> {[_]}.
query_params(Options)->
    get_value(query_params, Options, {[]}).



parse_user_filter(Filter) ->
    case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
        {match, [DDocName0, FilterName0]} ->
            {ok, {DDocName0, FilterName0}};
        _ ->
            {error, <<"Invalid filter. Must match `ddocname/filtername`.">>}
    end.


% Sort an EJSON object's properties to attempt
% to generate a unique representation. This is used
% to reduce the chance of getting different
% replication checkpoints for the same Mango selector
ejsort({V})->
    ejsort_props(V, []);
ejsort(V) when is_list(V) ->
    ejsort_array(V, []);
ejsort(V) ->
    V.

ejsort_props([], Acc)->
    {lists:keysort(1, Acc)};
ejsort_props([{K, V}| R], Acc) ->
    ejsort_props(R, [{K, ejsort(V)} | Acc]).

ejsort_array([], Acc)->
    lists:reverse(Acc);
ejsort_array([V | R], Acc) ->
    ejsort_array(R, [ejsort(V) | Acc]).


-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").

ejsort_basic_values_test() ->
    ?assertEqual(ejsort(0), 0),
    ?assertEqual(ejsort(<<"a">>), <<"a">>),
    ?assertEqual(ejsort(true), true),
    ?assertEqual(ejsort([]), []),
    ?assertEqual(ejsort({[]}), {[]}).

ejsort_compound_values_test() ->
    ?assertEqual(ejsort([2, 1, 3 ,<<"a">>]), [2, 1, 3, <<"a">>]),
    Ej1 = {[{<<"a">>, 0}, {<<"c">>, 0},  {<<"b">>, 0}]},
    Ej1s =  {[{<<"a">>, 0}, {<<"b">>, 0}, {<<"c">>, 0}]},
    ?assertEqual(ejsort(Ej1), Ej1s),
    Ej2 = {[{<<"x">>, Ej1}, {<<"z">>, Ej1}, {<<"y">>, [Ej1, Ej1]}]},
    ?assertEqual(ejsort(Ej2),
        {[{<<"x">>, Ej1s}, {<<"y">>, [Ej1s, Ej1s]}, {<<"z">>, Ej1s}]}).

-endif.
