%% blob: 1d11e98b4d5f829a16992b689d1b3a23a752f34e [file] [log] [blame]
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-module(nouveau_index_updater).
-include_lib("couch/include/couch_db.hrl").
-include("nouveau.hrl").
%% public api
-export([outdated/1]).
%% callbacks
-export([update/1]).
-import(couch_query_servers, [get_os_process/1, ret_os_process/1, proc_prompt/2]).
-import(nouveau_util, [index_path/1]).
%% Accumulator threaded through couch_db:fold_changes/5 by load_docs/2
%% while update/1 walks the changes feed.
-record(acc, {
%% database handle the documents are opened from
db,
%% the #index{} being updated
index,
%% query server OS process obtained with get_os_process/1
proc,
%% number of changes visited so far (reported as task progress)
changes_done,
%% total reported to couch_task_status; denominator of the progress value
total_changes,
%% {Id, Rev} pairs taken from #purge_acc.exclude_list; load_docs/2 skips them
exclude_idrevs,
%% queue of ibrowse request ids whose responses have not been drained yet
reqids,
%% ibrowse worker process used for every request to the index
conn_pid,
%% sequence passed as the match seq of the next request; advanced to the
%% high_seq of each document that is sent
update_seq,
%% limit passed to nouveau_api:drain_async_responses/2 before each send
max_pipeline_size
}).
%% Accumulator threaded through couch_db:fold_purge_infos/5 by purge_index/4.
-record(purge_acc, {
%% {Id, Rev} pairs already re-sent to the index during the purge pass
exclude_list = [],
%% update sequence used as match seq; advanced when a document is re-sent
index_update_seq,
%% purge sequence used as match seq; advanced when a document is purged
index_purge_seq
}).
%% Reports whether the index lags behind its database.
%%
%% Returns `true` when either the database update sequence or the database
%% purge sequence is greater than the value recorded in the index, `false`
%% otherwise, or `{error, Reason}` when the index could not be opened or
%% created.
outdated(#index{} = Index) ->
    case open_or_create_index(Index) of
        {error, _Reason} = Error ->
            Error;
        {ok, #{} = Info} ->
            index_is_behind(Index, Info)
    end.

%% Compares the sequences stored in the index info map with the database's.
index_is_behind(Index, Info) ->
    #{<<"update_seq">> := IndexUpdateSeq, <<"purge_seq">> := IndexPurgeSeq} = Info,
    {DbUpdateSeq, DbPurgeSeq} = get_db_info(Index),
    IndexUpdateSeq < DbUpdateSeq orelse IndexPurgeSeq < DbPurgeSeq.
%% Brings the index up to date with its database.
%%
%% Opens the database, opens (or creates) the index, applies outstanding
%% purges and then folds over the changes feed. This function does not
%% return normally: the calling process exits with the result of
%% nouveau_api:set_update_seq/4, or with `{error, Reason}` when the index
%% cannot be opened. The database handle is always closed on the way out.
update(#index{dbname = DbName} = Index) ->
    {ok, Db} = couch_db:open_int(DbName, []),
    try
        case open_or_create_index(Db, Index) of
            {ok, #{} = Info} ->
                index_since(Db, Index, Info);
            {error, Reason} ->
                exit({error, Reason})
        end
    after
        couch_db:close(Db)
    end.

%% Runs the purge pass and the changes pass starting from the sequences
%% recorded in Info, then exits with the result of recording the new
%% update sequence.
index_since(Db, #index{} = Index, Info) ->
    #{<<"update_seq">> := IndexUpdateSeq, <<"purge_seq">> := IndexPurgeSeq} = Info,
    PendingChanges = couch_db:count_changes_since(Db, IndexUpdateSeq),
    PendingPurges = couch_db:get_purge_seq(Db) - IndexPurgeSeq,
    TotalChanges = PendingChanges + PendingPurges,
    register_indexer_task(Index, TotalChanges),
    {ok, ConnPid} = ibrowse:spawn_link_worker_process(nouveau_util:nouveau_url()),
    InitialPurgeAcc = #purge_acc{
        index_update_seq = IndexUpdateSeq,
        index_purge_seq = IndexPurgeSeq
    },
    {ok, PurgeAcc} = purge_index(ConnPid, Db, Index, InitialPurgeAcc),
    %% read the target sequence before folding over the changes
    NewCurSeq = couch_db:get_update_seq(Db),
    Proc = get_os_process(Index#index.def_lang),
    try
        true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
        FoldAcc = #acc{
            db = Db,
            index = Index,
            proc = Proc,
            changes_done = 0,
            total_changes = TotalChanges,
            exclude_idrevs = PurgeAcc#purge_acc.exclude_list,
            reqids = queue:new(),
            conn_pid = ConnPid,
            update_seq = PurgeAcc#purge_acc.index_update_seq,
            max_pipeline_size = nouveau_util:max_pipeline_size()
        },
        StartSeq = FoldAcc#acc.update_seq,
        {ok, Done} = couch_db:fold_changes(Db, StartSeq, fun load_docs/2, FoldAcc, []),
        %% wait for every outstanding request before recording the new seq
        nouveau_api:drain_async_responses(Done#acc.reqids, 0),
        exit(nouveau_api:set_update_seq(ConnPid, Index, Done#acc.update_seq, NewCurSeq))
    after
        ibrowse:stop_worker_process(ConnPid),
        ret_os_process(Proc)
    end.

%% Registers this process with couch_task_status as a search indexer.
register_indexer_task(#index{} = Index, TotalChanges) ->
    couch_task_status:add_task([
        {type, search_indexer},
        {database, Index#index.dbname},
        {design_document, Index#index.ddoc_id},
        {index, Index#index.name},
        {progress, 0},
        {changes_done, 0},
        {total_changes, TotalChanges}
    ]),
    %% update status every half second
    couch_task_status:set_update_frequency(500).
%% Fold callback for couch_db:fold_changes/5.
%%
%% Design documents are skipped without touching the accumulator. For any
%% other document the pipeline is drained down to max_pipeline_size, task
%% progress is reported, and - unless the {Id, Rev} pair is listed in
%% exclude_idrevs - an asynchronous update or delete request is sent to the
%% index. Always returns `{ok, Acc}` with changes_done incremented; exits
%% with `{error, Reason}` when the request could not be sent.
load_docs(#full_doc_info{id = <<"_design/", _/binary>>}, #acc{} = Acc) ->
    {ok, Acc};
load_docs(FDI, #acc{} = Acc0) ->
    %% block for responses so we stay under the max pipeline size
    ReqIds1 = nouveau_api:drain_async_responses(Acc0#acc.reqids, Acc0#acc.max_pipeline_size),
    Acc1 = Acc0#acc{reqids = ReqIds1},
    couch_task_status:update([
        {changes_done, Acc1#acc.changes_done},
        {progress, changes_progress(Acc1#acc.changes_done, Acc1#acc.total_changes)}
    ]),
    DI = couch_doc:to_doc_info(FDI),
    #doc_info{id = Id, revs = [#rev_info{rev = Rev} | _]} = DI,
    Acc2 =
        case lists:member({Id, Rev}, Acc1#acc.exclude_idrevs) of
            true ->
                Acc1;
            false ->
                index_doc_info(DI, Acc1)
        end,
    {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}.

%% Sends one document to the index and records the pending request id and
%% the document's high_seq in the accumulator.
index_doc_info(#doc_info{} = DI, #acc{} = Acc) ->
    case
        update_or_delete_index(
            Acc#acc.conn_pid,
            Acc#acc.db,
            Acc#acc.index,
            Acc#acc.update_seq,
            DI,
            Acc#acc.proc
        )
    of
        {ibrowse_req_id, ReqId} ->
            Acc#acc{
                update_seq = DI#doc_info.high_seq,
                reqids = queue:in(ReqId, Acc#acc.reqids)
            };
        {error, Reason} ->
            exit({error, Reason})
    end.

%% Percentage of changes processed. A total of 0 yields 0 instead of
%% raising badarith, matching the guard in update_task/1.
changes_progress(_Done, 0) ->
    0;
changes_progress(Done, Total) ->
    (Done * 100) div Total.
%% Sends an asynchronous request to the index for a single document.
%%
%% A deleted document, or a live one for which the index function emits no
%% fields, results in nouveau_api:delete_doc_async/5; otherwise the emitted
%% fields are sent with nouveau_api:update_doc_async/7. MatchSeq is the
%% sequence passed alongside the document's own high_seq. Returns whatever
%% the nouveau_api call returns.
update_or_delete_index(ConnPid, Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) ->
    #doc_info{id = DocId, high_seq = DocSeq, revs = [#rev_info{deleted = Deleted} | _]} = DI,
    case Deleted of
        false ->
            {ok, Doc} = couch_db:open_doc(Db, DI, []),
            DocJson = couch_doc:to_json_obj(Doc, []),
            %% only the first element of the query server reply is used
            [Fields | _] = proc_prompt(Proc, [<<"nouveau_index_doc">>, DocJson]),
            Partition = doc_partition(Db, DocId),
            case Fields =:= [] of
                true ->
                    nouveau_api:delete_doc_async(ConnPid, Index, DocId, MatchSeq, DocSeq);
                false ->
                    nouveau_api:update_doc_async(
                        ConnPid, Index, DocId, MatchSeq, DocSeq, Partition, Fields
                    )
            end;
        true ->
            nouveau_api:delete_doc_async(ConnPid, Index, DocId, MatchSeq, DocSeq)
    end.

%% Partition derived from the document id in a partitioned database,
%% `null` otherwise.
doc_partition(Db, DocId) ->
    case couch_db:is_partitioned(Db) of
        true ->
            couch_partition:from_docid(DocId);
        false ->
            null
    end.
%% Fetches the index info map, creating the index first when the lookup
%% reports `{error, {not_found, _}}`.
%%
%% Returns `{ok, Info}` or `{error, Reason}`.
open_or_create_index(#index{} = Index) ->
    case nouveau_api:index_info(Index) of
        {error, {not_found, _}} ->
            create_then_fetch_info(Index);
        {ok, #{}} = Found ->
            Found;
        {error, _Reason} = Error ->
            Error
    end.

%% Creates the index from its definition and, on success, looks up its info.
create_then_fetch_info(#index{} = Index) ->
    case nouveau_api:create_index(Index, index_definition(Index)) of
        ok ->
            nouveau_api:index_info(Index);
        {error, _Reason} = Error ->
            Error
    end.
%% Same as open_or_create_index/1, but on success also calls
%% nouveau_util:maybe_create_local_purge_doc/2 for the given database.
%% Any other result is passed through unchanged.
open_or_create_index(Db, #index{} = Index) ->
    Result = open_or_create_index(Index),
    case Result of
        {ok, #{}} ->
            nouveau_util:maybe_create_local_purge_doc(Db, Index),
            Result;
        _ ->
            Result
    end.
%% Opens the index's database and returns `{UpdateSeq, PurgeSeq}`.
%% The database handle is closed before returning, even on failure.
get_db_info(#index{dbname = DbName}) ->
    {ok, Db} = couch_db:open_int(DbName, []),
    try
        {couch_db:get_update_seq(Db), couch_db:get_purge_seq(Db)}
    after
        couch_db:close(Db)
    end.
%% Builds the definition map passed to nouveau_api:create_index/2 from the
%% analyzers configured on the index record.
index_definition(#index{default_analyzer = DefaultAnalyzer, field_analyzers = FieldAnalyzers}) ->
    #{
        <<"default_analyzer">> => DefaultAnalyzer,
        <<"field_analyzers">> => FieldAnalyzers
    }.
%% Applies the database's purge infos newer than the index purge sequence.
%%
%% For each purge info: when the document no longer exists it is purged from
%% the index; when it still exists its current revision is re-sent to the
%% index (once per {Id, Rev}) and remembered in exclude_list. Afterwards the
%% index purge sequence is set to the database purge sequence and the local
%% purge document is updated.
%%
%% Returns `{ok, #purge_acc{}}`. Exits with `{error, Reason}` when a
%% re-index request cannot be sent, mirroring load_docs/2. The query server
%% process is always returned on the way out.
purge_index(ConnPid, Db, Index, #purge_acc{} = PurgeAcc0) ->
    Proc = get_os_process(Index#index.def_lang),
    try
        true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]),
        FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, #purge_acc{} = PurgeAcc1) ->
            PurgeAcc2 =
                case couch_db:get_full_doc_info(Db, Id) of
                    not_found ->
                        ok = nouveau_api:purge_doc(
                            ConnPid, Index, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq
                        ),
                        PurgeAcc1#purge_acc{index_purge_seq = PurgeSeq};
                    FDI ->
                        reindex_purged_doc(ConnPid, Db, Index, Proc, Id, FDI, PurgeAcc1)
                end,
            update_task(1),
            {ok, PurgeAcc2}
        end,
        {ok, #purge_acc{} = PurgeAcc3} = couch_db:fold_purge_infos(
            Db, PurgeAcc0#purge_acc.index_purge_seq, FoldFun, PurgeAcc0, []
        ),
        DbPurgeSeq = couch_db:get_purge_seq(Db),
        ok = nouveau_api:set_purge_seq(
            ConnPid, Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq
        ),
        update_local_doc(Db, Index, DbPurgeSeq),
        {ok, PurgeAcc3}
    after
        ret_os_process(Proc)
    end.

%% Re-sends the current revision of a still-existing document whose older
%% revisions were purged, unless that {Id, Rev} was already sent.
reindex_purged_doc(ConnPid, Db, Index, Proc, Id, FDI, #purge_acc{} = PurgeAcc) ->
    DI = couch_doc:to_doc_info(FDI),
    %% Id is already bound: this also asserts the doc info is for the same id
    #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{rev = Rev} | _]} = DI,
    case lists:member({Id, Rev}, PurgeAcc#purge_acc.exclude_list) of
        true ->
            PurgeAcc;
        false ->
            case
                update_or_delete_index(
                    ConnPid,
                    Db,
                    Index,
                    PurgeAcc#purge_acc.index_update_seq,
                    DI,
                    Proc
                )
            of
                {error, Reason} ->
                    %% do not advance index_update_seq past a request that
                    %% was never sent
                    exit({error, Reason});
                _Sent ->
                    %% NOTE(review): the request id is not recorded here, so
                    %% the response is not awaited by this function
                    ok
            end,
            PurgeAcc#purge_acc{
                exclude_list = [{Id, Rev} | PurgeAcc#purge_acc.exclude_list],
                index_update_seq = Seq
            }
    end.
%% Adds NumChanges to the task's changes_done counter and refreshes the
%% progress percentage stored in couch_task_status.
update_task(NumChanges) ->
    [DoneBefore, Total] = couch_task_status:get([changes_done, total_changes]),
    DoneNow = DoneBefore + NumChanges,
    couch_task_status:update([
        {progress, percent_done(DoneNow, Total)},
        {changes_done, DoneNow}
    ]).

%% Integer percentage of Done out of Total; 0 when Total is 0.
percent_done(_Done, 0) ->
    0;
percent_done(Done, Total) ->
    (Done * 100) div Total.
%% Writes the local purge document for this index, recording PurgeSeq.
%% Returns the result of couch_db:update_doc/3.
update_local_doc(Db, #index{sig = Sig} = Index, PurgeSeq) ->
    LocalDocId = nouveau_util:get_local_purge_doc_id(Sig),
    LocalDoc = nouveau_util:get_local_purge_doc_body(LocalDocId, PurgeSeq, Index),
    couch_db:update_doc(Db, LocalDoc, []).