% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(cassim_metadata_cache).
-behaviour(gen_server).


-export([
    start_link/0
]).

-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

-export([
    load_meta/1,
    load_meta/2,
    load_meta/3,
    metadata_db/0,
    metadata_db_exists/0,
    cleanup_old_docs/1
]).

-export([
    security_meta_id/1
]).

-export([
    cache_timeout/0
]).


-record(st, {
    changes_pid,
    last_seq="0",
    global_db
}).


-include_lib("couch/include/couch_db.hrl").


-define(META_TABLE, metadata_cache).
-define(TABLE_OPTS, [set, protected, named_table]).


metadata_db() ->
    config:get("couchdb", "metadata_db", "_metadata").


cache_timeout() ->
    case config:get("cassim", "cache_timeout", "5000") of
        "infinity" -> infinity;
        N -> list_to_integer(N)
    end.


metadata_db_exists() ->
    try mem3:shards(metadata_db()) of
        _Shards ->
            true
    catch error:database_does_not_exist ->
        false
    end.


security_meta_id(DbName) ->
    Suffix = list_to_binary(mem3:shard_suffix(DbName)),
    <<"db/", DbName/binary, "/_security", Suffix/binary>>.


start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).


init([]) ->
    ets:new(?META_TABLE, ?TABLE_OPTS),
    GlobalDb = metadata_db(),
    {Pid, _} = spawn_monitor(fun() -> listen_for_changes("0") end),
    {ok, #st{changes_pid = Pid, global_db=GlobalDb}}.


handle_call(get_seq, _From, State) ->
    {reply, State#st.last_seq, State};
handle_call({fetch_meta, MetaId}, _From, State) ->
    {reply, load_meta_from_db(State#st.global_db, MetaId), State};
handle_call({fetch_meta, Db, MetaId}, _From, State) ->
    {reply, load_meta_from_db(Db, MetaId), State};
handle_call(Call, _From, State) ->
    couch_log:error("Unknown cassim_metadata_cache call: ~p~n", [Call]),
    {noreply, State}.


handle_cast({insert_cached_meta, {MetaId, Props}}, State) ->
    ets:insert(?META_TABLE, {MetaId, Props}),
    {noreply, State};
handle_cast({delete_meta, MetaId}, State) ->
    ets:delete(?META_TABLE, MetaId),
    {noreply, State};
handle_cast(_Msg, State) ->
    {noreply, State}.


handle_info({'DOWN', _, _, Pid, Reason}, #st{changes_pid=Pid} = State) ->
    {Delay, Seq} = case Reason of
        {seq, EndSeq} ->
            {5 * 1000, EndSeq};
        {error, database_does_not_exist} ->
            couch_log:error("Metadata db ~p does not exist", [metadata_db()]),
            {5 * 60 * 1000, State#st.last_seq};
        _ ->
            couch_log:notice("~p changes listener died ~p", [?MODULE, Reason]),
            {5 * 1000, State#st.last_seq}
    end,
    timer:send_after(Delay, {start_listener, Seq}),
    {noreply, State#st{last_seq=Seq, changes_pid=undefined}};
handle_info({start_listener, Seq}, #st{changes_pid=undefined} = State) ->
    {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
    {noreply, State#st{changes_pid=NewPid}};
handle_info(_Msg, State) ->
    {noreply, State}.


terminate(_Reason, #st{changes_pid = undefined}) ->
    ok;
terminate(_Reason, #st{changes_pid = Pid}) ->
    exit(Pid, kill),
    ok.


code_change(_OldVsn, #st{}=State, _Extra) ->
    {ok, State}.


%% internal functions


listen_for_changes(Since) ->
    DbName = metadata_db(),
    Args = #changes_args{
        feed = "continuous",
        since = Since,
        heartbeat = true,
        include_docs = true
    },
    fabric:changes(DbName, fun changes_callback/2, Since, Args).


changes_callback(start, Since) ->
    {ok, Since};
changes_callback({stop, EndSeq}, _) ->
    exit({seq, EndSeq});
changes_callback({change, {Change}}, _) ->
    Id = couch_util:get_value(id, Change),
    case couch_util:get_value(deleted, Change, false) of
        true ->
            gen_server:cast(?MODULE, {delete_meta, Id});
        false ->
            case couch_util:get_value(doc, Change) of
                {error, Reason} ->
                    couch_log:warning(
                        "could not retrieve metadata doc ~s: ~p", [Id, Reason]);
                Props ->
                    gen_server:cast(?MODULE, {insert_cached_meta, {Id, Props}})
            end
    end,
    {ok, couch_util:get_value(seq, Change)};
changes_callback(timeout, EndSeq) ->
    exit({seq, EndSeq});
changes_callback({error, database_does_not_exist}, _EndSeq) ->
    exit({error, database_does_not_exist});
changes_callback({error, _}, EndSeq) ->
    exit({seq, EndSeq}).


%% internal metadata functions


load_meta_from_db(DbName, MetaId) ->
    Args = [DbName, MetaId, []],
    Timeout = cache_timeout(),
    case couch_util:with_proc(fabric, open_doc, Args, Timeout) of
        {ok, {ok, Doc}} ->
            couch_doc:to_json_obj(Doc, []);
        {ok, {not_found, missing}} ->
            undefined;
        {error, {database_does_not_exist, _}} ->
            undefined;
        {error, timeout} ->
            couch_log:notice("timeout retrieving metadata doc ~s", [MetaId]),
            {error, timeout};
        {error, Error} ->
            {error, Error}
     end.


load_meta(MetaId) ->
    load_meta(MetaId, true).


load_meta(MetaId, _UseCache=true) ->
    case fetch_cached_meta(MetaId) of
        undefined ->
            load_meta(MetaId, false);
        Props ->
            Props
    end;
load_meta(MetaId, _UseCache=false) ->
    gen_server:call(?MODULE, {fetch_meta, MetaId}).


load_meta(MetaId, _UseCache=false, Db) ->
    gen_server:call(?MODULE, {fetch_meta, Db, MetaId}).


fetch_cached_meta(MetaId) ->
    try ets:lookup(?META_TABLE, MetaId) of
        [{MetaId, Props}] ->
            Props;
        [] ->
            couch_log:notice("cache miss on metadata ~s", [MetaId]),
            undefined
        catch error:badarg ->
            couch_log:notice("cache miss on metadata ~s", [MetaId]),
            undefined
    end.


cleanup_old_docs(MetaId) ->
    MetaGroupId = filename:rootname(binary_to_list(MetaId)),
    {ok, {_MetaId, Docs}} = fabric:all_docs(
        metadata_db(),
        fun cleanup_old_docs_callback/2,
        {MetaId, []},
        [
            {start_key, list_to_binary(MetaGroupId)},
            {end_key, list_to_binary(MetaGroupId ++ "Z")},
            {include_docs, true}
        ]
    ),
    {ok, _Revs} = delete_meta_docs(Docs),
    ok.


cleanup_old_docs_callback({meta, _}, {MetaId, Acc}) ->
    {ok, {MetaId, Acc}};
cleanup_old_docs_callback({row, Row}, {MetaId, Acc}) ->
    Id = couch_util:get_value(id, Row),
    case MetaId == Id of
        true ->
            {ok, {MetaId, Acc}};
        false ->
            Doc = couch_doc:from_json_obj(couch_util:get_value(doc, Row)),
            {ok, {MetaId, [Doc|Acc]}}
    end;
cleanup_old_docs_callback(complete, {MetaId, Acc}) ->
    {ok, {MetaId, Acc}};
cleanup_old_docs_callback({error, Reason}, {MetaId, Acc}) ->
    couch_log:error("Unable clean up all old docs on ~p: ~p", [MetaId, Reason]),
    {ok, {MetaId, Acc}}.


delete_meta_docs(Docs0) ->
    Docs = [D#doc{deleted=true, body={[]}} || D <- Docs0],
    DbName = metadata_db(),
    fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).
