blob: 0141d6bfd15b8d676c576d229569af8910ca7b78 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_views_indexer).
-export([
spawn_link/0
]).
-export([
init/0,
map_docs/2,
write_docs/4
]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-include("couch_views.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("fabric/include/fabric2.hrl").
-include_lib("kernel/include/logger.hrl").
-define(KEY_SIZE_LIMIT, 8000).
-define(VALUE_SIZE_LIMIT, 64000).
-define(DEFAULT_TX_RETRY_LIMIT, 5).
% These are all of the errors that we can fix by using
% a smaller batch size.
-define(IS_RECOVERABLE_ERROR(Code),
((Code == ?ERLFDB_TIMED_OUT) orelse
(Code == ?ERLFDB_TRANSACTION_TOO_OLD) orelse
(Code == ?ERLFDB_TRANSACTION_TIMED_OUT) orelse
(Code == ?ERLFDB_TRANSACTION_TOO_LARGE))
).
%% Spawn a linked indexer worker process. The new process immediately
%% calls init/0 to accept and run a single view indexing job.
spawn_link() ->
    proc_lib:spawn_link(?MODULE, init, []).
%% Process entry point (spawned by spawn_link/0). Accepts one indexing
%% job from couch_jobs and drives the view build via update/3.
%%
%% Failure handling:
%%   - deleted db/ddoc or changed view signature: the job is failed and
%%     the process exits normally (fail_job/4 never returns);
%%   - any other error: the job is re-reported with an incremented retry
%%     count while under retry_limit(), otherwise failed permanently.
init() ->
    Opts = #{no_schedule => true},
    {ok, Job, Data0} = couch_jobs:accept(?INDEX_JOB_TYPE, Opts),
    couch_views_server:accepted(self()),
    % Fill in fields that jobs enqueued by older releases may be missing
    Data = upgrade_data(Data0),
    #{
        <<"db_name">> := DbName,
        <<"db_uuid">> := DbUUID,
        <<"ddoc_id">> := DDocId,
        <<"sig">> := JobSig,
        <<"retries">> := Retries
    } = Data,
    {ok, Db} =
        try
            fabric2_db:open(DbName, [?ADMIN_CTX, {uuid, DbUUID}])
        catch
            error:database_does_not_exist ->
                % fail_job/4 exits the process, so the {ok, _} match is
                % never evaluated on this path
                fail_job(Job, Data, db_deleted, "Database was deleted")
        end,
    {ok, DDoc} =
        case fabric2_db:open_doc(Db, DDocId) of
            {ok, DDoc0} ->
                {ok, DDoc0};
            {not_found, _} ->
                fail_job(Job, Data, ddoc_deleted, "Design document was deleted")
        end,
    {ok, Mrst} = couch_views_util:ddoc_to_mrst(DbName, DDoc),
    HexSig = fabric2_util:to_hex(Mrst#mrst.sig),
    % The job is keyed to a specific view signature; if the ddoc changed
    % after the job was enqueued this particular build is obsolete
    if
        HexSig == JobSig -> ok;
        true -> fail_job(Job, Data, sig_changed, "Design document was modified")
    end,
    % Snapshot read of the current update seq: this is the sequence the
    % build targets, and the read must not create txn conflicts
    DbSeq = fabric2_fdb:transactional(Db, fun(TxDb) ->
        fabric2_fdb:with_snapshot(TxDb, fun(SSDb) ->
            fabric2_db:get_update_seq(SSDb)
        end)
    end),
    % Accumulator threaded through the whole update loop
    State = #{
        tx_db => undefined,
        db_uuid => DbUUID,
        db_seq => DbSeq,
        view_seq => undefined,
        last_seq => undefined,
        view_vs => undefined,
        job => Job,
        job_data => Data,
        rows_processed => 0,
        count => 0,
        changes_done => 0,
        doc_acc => [],
        design_opts => Mrst#mrst.design_opts,
        update_stats => #{},
        tx_retry_limit => tx_retry_limit(),
        db_read_vsn => ?VIEW_CURRENT_VSN,
        view_read_vsn => ?VIEW_CURRENT_VSN
    },
    try
        update(Db, Mrst, State)
    catch
        exit:normal ->
            ok;
        error:database_does_not_exist ->
            fail_job(Job, Data, db_deleted, "Database was deleted");
        Error:Reason:Stack ->
            ?LOG_ERROR(#{
                what => view_update_failure,
                db => DbName,
                ddoc => DDocId,
                tag => Error,
                details => Reason,
                stacktrace => Stack
            }),
            Fmt = "Error building view for ddoc ~s in ~s: ~p:~p ~p",
            % Bug fix: the format string reads "ddoc ~s in ~s", so the
            % ddoc id must come before the db name in the args list
            couch_log:error(Fmt, [DDocId, DbName, Error, Reason, Stack]),
            NewRetry = Retries + 1,
            RetryLimit = retry_limit(),
            case should_retry(NewRetry, RetryLimit, Reason) of
                true ->
                    DataErr = Data#{<<"retries">> := NewRetry},
                    % Set the last_seq to 0 so that it doesn't trigger a
                    % successful view build for anyone listening to the
                    % couch_views_jobs:wait_for_job
                    % Note this won't cause the view to rebuild from 0 again
                    StateErr = State#{job_data := DataErr, last_seq := <<"0">>},
                    report_progress(StateErr, update);
                false ->
                    fail_job(Job, Data, Error, Reason)
            end
    end.
%% Upgrade job data written by an older release: fill in any missing
%% default fields, then attach (empty) active-task info.
upgrade_data(Data) ->
    Defaults = [
        {<<"retries">>, 0},
        {<<"db_uuid">>, undefined}
    ],
    % Bug fix: the folded map was previously discarded, so a job record
    % missing <<"retries">> or <<"db_uuid">> would crash init/0's
    % pattern match on the job data. Bind the result and pass it on.
    Data1 = lists:foldl(
        fun({Key, Default}, Acc) ->
            case maps:is_key(Key, Acc) of
                true -> Acc;
                false -> maps:put(Key, Default, Acc)
            end
        end,
        Data,
        Defaults
    ),
    % initialize active task
    fabric2_active_tasks:update_active_task_info(Data1, #{}).
%% Decide whether a failed indexing batch should be retried. A
%% transaction-too-large error cannot be fixed by simply re-running the
%% job, so it is rejected outright regardless of the retry count;
%% everything else is retried while under the configured limit.
should_retry(_Retries, _RetryLimit, {erlfdb_error, ?ERLFDB_TRANSACTION_TOO_LARGE}) ->
    false;
should_retry(Retries, RetryLimit, _Reason) ->
    Retries < RetryLimit.
%% Record an error tag/reason pair in the job data for reporting via
%% couch_jobs. FoundationDB errors are rendered as "<code>-<descr>";
%% anything else is stringified as-is.
add_error(error, {erlfdb_error, Code}, Data) ->
    Descr = erlfdb:get_error_string(Code),
    Reason = list_to_binary([couch_util:to_binary(Code), <<"-">>, Descr]),
    Data#{
        error => foundationdb_error,
        reason => Reason
    };
add_error(Error, Reason, Data) ->
    Data#{
        error => couch_util:to_binary(Error),
        reason => couch_util:to_binary(Reason)
    }.
%% Drive the indexing loop one batch at a time. Each pass asks
%% couch_views_batch for a row limit, runs one transactional batch via
%% do_update/3 and either recurses for the next batch or stops once the
%% build has finished. Recoverable FDB errors (timeouts, txn too
%% old/large) shrink the batch size and restart the pass.
update(#{} = Db, Mrst0, State0) ->
    Limit = couch_views_batch:start(Mrst0),
    Outcome =
        try
            do_update(Db, Mrst0, State0#{limit => Limit})
        catch
            error:{erlfdb_error, Code} when ?IS_RECOVERABLE_ERROR(Code) ->
                % Batch was too big for a single transaction; record the
                % failure so the next batch is smaller, then start over
                couch_views_batch:failure(Mrst0),
                update(Db, Mrst0, State0)
        end,
    case Outcome of
        ok ->
            % Already finished and released map context
            ok;
        {Mrst1, finished} ->
            couch_eval:release_map_context(Mrst1#mrst.qserver);
        {Mrst1, State1} ->
            #{update_stats := UpdateStats} = State1,
            % Feed batch stats back so the next limit can adapt
            couch_views_batch:success(Mrst1, UpdateStats),
            update(Db, Mrst1, State1)
    end.
%% Run a single transactional batch of the build: fold up to `limit`
%% rows from the changes feed, fetch and map the docs, and write the
%% resulting KVs into the view trees. Returns {Mrst, State} when more
%% batches are needed, or {Mrst, finished} after finalization.
do_update(Db, Mrst0, State0) ->
    TxOpts = #{retry_limit => maps:get(tx_retry_limit, State0)},
    TxResult = fabric2_fdb:transactional(Db, TxOpts, fun(TxDb) ->
        #{
            tx := Tx
        } = TxDb,
        % Changes and doc bodies are read through a snapshot so these
        % reads do not create conflicts with concurrent db writers
        Snapshot = TxDb#{tx := erlfdb:snapshot(Tx)},
        State1 = get_update_start_state(TxDb, Mrst0, State0),
        Mrst1 = couch_views_trees:open(TxDb, Mrst0),
        {ok, State2} = fold_changes(Snapshot, State1),
        #{
            doc_acc := DocAcc,
            last_seq := LastSeq,
            changes_done := ChangesDone0,
            design_opts := DesignOpts
        } = State2,
        DocAcc1 = fetch_docs(Snapshot, DesignOpts, DocAcc),
        {Mrst2, MappedDocs} = map_docs(Mrst0, DocAcc1),
        TotalKVs = write_docs(TxDb, Mrst1, MappedDocs, State2),
        ChangesDone = ChangesDone0 + length(DocAcc),
        % Stats consumed by couch_views_batch to tune the batch size
        UpdateStats = #{
            docs_read => length(DocAcc),
            tx_size => erlfdb:wait(erlfdb:get_approximate_size(Tx)),
            total_kvs => TotalKVs
        },
        case is_update_finished(State2) of
            true ->
                State3 = State2#{changes_done := ChangesDone},
                % We must call report_progress/2 (which, in turn calls
                % couch_jobs:update/3) in every transaction where indexing data
                % is updated, otherwise we risk another indexer taking over and
                % clobbering the indexing data
                State4 = report_progress(State3, update),
                {Mrst2, finished, State4#{
                    db_read_vsn := erlfdb:wait(erlfdb:get_read_version(Tx))
                }};
            false ->
                State3 = report_progress(State2, update),
                % Reset the per-batch accumulator fields for the next pass
                {Mrst2, continue, State3#{
                    tx_db := undefined,
                    count := 0,
                    doc_acc := [],
                    changes_done := ChangesDone,
                    view_seq := LastSeq,
                    update_stats := UpdateStats
                }}
        end
    end),
    case TxResult of
        {Mrst, continue, State} ->
            {Mrst, State};
        {Mrst, finished, State} ->
            % Finalization runs in its own transaction after the last
            % indexing transaction has committed
            do_finalize(Mrst, State),
            {Mrst, finished}
    end.
%% After the final batch commits, mark the view build as ready and
%% finish the job in a fresh transaction, pinned (when possible) to the
%% commit version of the last indexing transaction.
do_finalize(Mrst, State) ->
    #{tx_db := OldDb} = State,
    ViewReadVsn = erlfdb:get_committed_version(maps:get(tx, OldDb)),
    fabric2_fdb:transactional(OldDb#{tx := undefined}, fun(TxDb) ->
        % Use the recent committed version as the read
        % version. However, if transaction retries due to an error,
        % let it acquire its own version to avoid spinning
        % continuously due to conflicts or other errors.
        case erlfdb:get_last_error() of
            undefined ->
                erlfdb:set_read_version(maps:get(tx, TxDb), ViewReadVsn);
            ErrorCode when is_integer(ErrorCode) ->
                ok
        end,
        State1 = State#{
            tx_db := TxDb,
            view_read_vsn := ViewReadVsn
        },
        ViewVS = maps:get(view_vs, State1),
        maybe_set_build_status(TxDb, Mrst, ViewVS, ?INDEX_READY),
        report_progress(State1, finished)
    end).
%% The update loop is finished once the last processed sequence has
%% caught up with the db's update seq, or with the sequence derived from
%% the versionstamp the view build was requested at (when one exists).
is_update_finished(#{db_seq := DbSeq, last_seq := LastSeq, view_vs := ViewVs}) ->
    CaughtUpToDb = LastSeq == DbSeq,
    CaughtUpToVs =
        case ViewVs of
            not_found -> false;
            _ -> LastSeq == fabric2_fdb:vs_to_seq(ViewVs)
        end,
    CaughtUpToDb orelse CaughtUpToVs.
%% Persist the view build status only when the view has a creation
%% versionstamp; a view without one (not_found) has no status to track.
maybe_set_build_status(_TxDb, _Mrst, not_found, _BuildStatus) ->
    ok;
maybe_set_build_status(TxDb, Mrst, _ViewVS, BuildStatus) ->
    couch_views_fdb:set_build_status(TxDb, Mrst, BuildStatus).
%% On the first pass of the update loop (view_seq still undefined) read
%% the view's persisted versionstamp and sequence and seed the state
%% with them; on subsequent passes only the transaction handle needs
%% refreshing.
get_update_start_state(TxDb, Mrst, #{view_seq := undefined} = State) ->
    ViewState = couch_views_fdb:get_view_state(TxDb, Mrst),
    #{view_vs := ViewVS, view_seq := ViewSeq} = ViewState,
    State#{
        tx_db := TxDb,
        view_vs := ViewVS,
        view_seq := ViewSeq,
        last_seq := ViewSeq
    };
get_update_start_state(TxDb, _Mrst, State) ->
    State#{tx_db := TxDb}.
%% Fold the changes feed from the view's current sequence up to DbSeq,
%% running each row through process_changes/2. A fold that processes
%% zero rows under a positive limit means we are already caught up, so
%% last_seq is forced to DbSeq.
fold_changes(Snapshot, State) ->
    #{
        view_seq := SinceSeq,
        db_seq := DbSeq,
        limit := Limit
    } = State,
    Opts = [
        {end_key, fabric2_fdb:seq_to_vs(DbSeq)},
        {limit, Limit},
        % The whole batch must stay within one transaction
        {restart_tx, false}
    ],
    Acc0 = State#{rows_processed := 0},
    FoldResult = fabric2_db:fold_changes(
        Snapshot, SinceSeq, fun process_changes/2, Acc0, Opts
    ),
    case FoldResult of
        {ok, #{rows_processed := 0} = Acc1} when Limit > 0 ->
            % Zero rows read with a non-zero limit: caught up to DbSeq
            {ok, Acc1#{last_seq := DbSeq}};
        Other ->
            Other
    end.
%% fold_changes/2 callback: accumulate the change, bump the bookkeeping
%% counters and remember its sequence. Design documents are counted but
%% not accumulated unless the view options request "include_design".
%% Asks maybe_stop_at_vs/2 whether to halt once the change's
%% versionstamp reaches the view's creation versionstamp.
process_changes(Change, Acc) ->
    #{
        doc_acc := DocAcc,
        rows_processed := RowsProcessed,
        count := Count,
        design_opts := DesignOpts,
        view_vs := ViewVS
    } = Acc,
    #{id := Id, sequence := LastSeq} = Change,
    SkipDDoc =
        case Id of
            <<?DESIGN_DOC_PREFIX, _/binary>> ->
                not lists:keymember(<<"include_design">>, 1, DesignOpts);
            _ ->
                false
        end,
    Acc1 =
        case SkipDDoc of
            true ->
                % Counted for progress purposes, but not indexed
                Acc#{
                    rows_processed := RowsProcessed + 1,
                    count := Count + 1,
                    last_seq := LastSeq
                };
            false ->
                Acc#{
                    doc_acc := DocAcc ++ [Change],
                    rows_processed := RowsProcessed + 1,
                    count := Count + 1,
                    last_seq := LastSeq
                }
        end,
    DocVS = fabric2_fdb:seq_to_vs(LastSeq),
    {maybe_stop_at_vs(ViewVS, DocVS), Acc1}.
%% Halt the changes fold once the current doc's versionstamp has caught
%% up with the versionstamp the view build was requested at; in every
%% other case keep folding.
maybe_stop_at_vs({versionstamp, _} = ViewVS, DocVS) ->
    case DocVS >= ViewVS of
        true -> stop;
        false -> ok
    end;
maybe_stop_at_vs(_ViewVS, _DocVS) ->
    ok.
%% Run the non-deleted docs through the view engine's map functions.
%% Deleted docs get an empty result list per view (their existing rows
%% are cleared at write time). Returns the possibly-updated #mrst{}
%% (a query server may have been lazily acquired) plus the changes with
%% a `results` key added, re-sorted into changes-feed order.
map_docs(Mrst, []) ->
    {Mrst, []};
map_docs(Mrst, Docs) ->
    % Run all the non deleted docs through the view engine and
    Mrst1 = start_query_server(Mrst),
    QServer = Mrst1#mrst.qserver,
    {Deleted0, NotDeleted0} = lists:partition(
        fun(Doc) ->
            #{deleted := Deleted} = Doc,
            Deleted
        end,
        Docs
    ),
    % A deleted doc emits nothing for any view
    Deleted1 = lists:map(
        fun(Doc) ->
            Doc#{results => [[] || _ <- Mrst1#mrst.views]}
        end,
        Deleted0
    ),
    DocsToMap = lists:map(
        fun(Doc) ->
            #{doc := DocRec} = Doc,
            DocRec
        end,
        NotDeleted0
    ),
    {ok, AllResults} = couch_eval:map_docs(QServer, DocsToMap),
    % The expanded function head here is making an assertion
    % that the results match the given doc
    NotDeleted1 = lists:zipwith(
        fun(#{id := DocId} = Doc, {DocId, Results}) ->
            Doc#{results => Results}
        end,
        NotDeleted0,
        AllResults
    ),
    % I'm being a bit careful here resorting the docs
    % in order of the changes feed. Theoretically this is
    % unnecessary since we're inside a single transaction.
    % However, I'm concerned if we ever split this up
    % into multiple transactions that this detail might
    % be important but forgotten.
    MappedDocs = lists:sort(
        fun(A, B) ->
            #{sequence := ASeq} = A,
            #{sequence := BSeq} = B,
            ASeq =< BSeq
        end,
        Deleted1 ++ NotDeleted1
    ),
    {Mrst1, MappedDocs}.
%% Enforce per-row key/value size limits on each mapped doc, write the
%% results into the view trees and persist the new view update
%% sequence. Returns the total number of KVs written so the batch
%% sizing can adapt.
write_docs(TxDb, Mrst, Docs0, State) ->
    #mrst{sig = Sig} = Mrst,
    #{last_seq := LastSeq} = State,
    KeyLimit = key_size_limit(),
    ValLimit = value_size_limit(),
    CheckDoc = fun(Doc0, KVCount) ->
        Doc1 = check_kv_size_limit(Mrst, Doc0, KeyLimit, ValLimit),
        {Doc1, KVCount + count_kvs(Doc1)}
    end,
    {Docs1, TotalKVCount} = lists:mapfoldl(CheckDoc, 0, Docs0),
    couch_views_trees:update_views(TxDb, Mrst, Docs1),
    case LastSeq of
        false -> ok;
        _ -> couch_views_fdb:set_update_seq(TxDb, Sig, LastSeq)
    end,
    TotalKVCount.
%% Fetch the winning doc body (and conflict revs when the doc is
%% branched) for every non-deleted change. Reads are pipelined: all
%% rev-info futures are issued first, then all body futures. Deleted
%% changes pass through untouched and keep no `doc` key.
fetch_docs(Db, DesignOpts, Changes) ->
    {Deleted, NotDeleted} = lists:partition(
        fun(Doc) ->
            #{deleted := Deleted} = Doc,
            Deleted
        end,
        Changes
    ),
    % Issue all winning-rev lookups at once; map future -> change
    RevState = lists:foldl(
        fun(Change, Acc) ->
            #{id := Id} = Change,
            RevFuture = fabric2_fdb:get_winning_revs_future(Db, Id, 1),
            Acc#{
                RevFuture => {Id, Change}
            }
        end,
        #{},
        NotDeleted
    ),
    RevFutures = maps:keys(RevState),
    % As each rev future resolves, immediately issue the body read
    BodyState = lists:foldl(
        fun(RevFuture, Acc) ->
            {Id, Change} = maps:get(RevFuture, RevState),
            Revs = fabric2_fdb:get_revs_wait(Db, RevFuture),
            % I'm assuming that in this changes transaction that the winning
            % doc body exists since it is listed in the changes feed as not deleted
            #{winner := true} = RevInfo = lists:last(Revs),
            BodyFuture = fabric2_fdb:get_doc_body_future(Db, Id, RevInfo),
            Acc#{
                BodyFuture => {Id, RevInfo, Change}
            }
        end,
        #{},
        erlfdb:wait_for_all(RevFutures)
    ),
    AddLocalSeq = fabric2_util:get_value(<<"local_seq">>, DesignOpts, false),
    BodyFutures = maps:keys(BodyState),
    ChangesWithDocs = lists:map(
        fun(BodyFuture) ->
            {Id, RevInfo, Change} = maps:get(BodyFuture, BodyState),
            Doc = fabric2_fdb:get_doc_body_wait(Db, Id, RevInfo, BodyFuture),
            % Single-branch docs only need the local_seq treatment (when
            % requested); branched docs additionally load conflict revs
            Doc1 =
                case maps:get(branch_count, RevInfo, 1) of
                    1 when AddLocalSeq ->
                        {ok, DocWithLocalSeq} = fabric2_db:apply_open_doc_opts(
                            Doc,
                            [RevInfo],
                            [local_seq]
                        ),
                        DocWithLocalSeq;
                    1 ->
                        Doc;
                    _ ->
                        RevConflicts = fabric2_fdb:get_all_revs(Db, Id),
                        DocOpts =
                            if
                                not AddLocalSeq -> [];
                                true -> [local_seq]
                            end,
                        {ok, DocWithConflicts} = fabric2_db:apply_open_doc_opts(
                            Doc,
                            RevConflicts,
                            [conflicts | DocOpts]
                        ),
                        DocWithConflicts
                end,
            Change#{doc => Doc1}
        end,
        erlfdb:wait_for_all(BodyFutures)
    ),
    % This combines the deleted changes with the changes that contain docs
    % Important to note that this is now unsorted. Which is fine for now
    % But later could be an issue if we split this across transactions
    Deleted ++ ChangesWithDocs.
%% Lazily acquire a map context from the eval layer for this view
%% group. If the #mrst{} already carries a query server it is returned
%% untouched; acquisition failures are raised to the caller.
start_query_server(#mrst{qserver = nil} = Mrst) ->
    #mrst{
        db_name = DbName,
        idx_name = DDocId,
        language = Language,
        sig = Sig,
        lib = Lib,
        views = Views
    } = Mrst,
    MapFunSrcs = [View#mrview.def || View <- Views],
    Acquired = couch_eval:acquire_map_context(
        DbName,
        DDocId,
        Language,
        Sig,
        Lib,
        MapFunSrcs
    ),
    case Acquired of
        {ok, QServer} -> Mrst#mrst{qserver = QServer};
        {error, Error} -> error(Error)
    end;
start_query_server(#mrst{} = Mrst) ->
    Mrst.
%% Verify every emitted key/value for Doc stays within the configured
%% encoded-size limits. An offending doc is logged and converted into a
%% deletion with empty results, so it is excluded from the index rather
%% than failing the entire build.
check_kv_size_limit(Mrst, Doc, KeyLimit, ValLimit) ->
    #mrst{db_name = DbName, idx_name = IdxName} = Mrst,
    #{results := Results} = Doc,
    CheckRow = fun({Key, Val}) ->
        case couch_ejson_size:encoded_size(Key) =< KeyLimit of
            true -> ok;
            false -> throw({size_error, key})
        end,
        case couch_ejson_size:encoded_size(Val) =< ValLimit of
            true -> ok;
            false -> throw({size_error, value})
        end
    end,
    try
        lists:foreach(
            fun(ViewRows) -> lists:foreach(CheckRow, ViewRows) end,
            Results
        ),
        Doc
    catch
        throw:{size_error, Type} ->
            #{id := DocId} = Doc,
            ?LOG_ERROR(#{
                what => lists:concat(["oversized_", Type]),
                db => DbName,
                docid => DocId,
                index => IdxName
            }),
            Fmt =
                "View ~s size error for docid `~s`, excluded from indexing "
                "in db `~s` for design doc `~s`",
            couch_log:error(Fmt, [Type, DocId, DbName, IdxName]),
            % Treat the doc as deleted so its rows are purged from the view
            Doc#{
                deleted := true,
                results := [[] || _ <- Mrst#mrst.views],
                kv_sizes => []
            }
    end.
%% Total number of emitted key/value pairs across all views for one
%% mapped document.
count_kvs(#{results := Results}) ->
    lists:sum([length(ViewRows) || ViewRows <- Results]).
%% Persist job progress (UpdateType == update) or completion
%% (UpdateType == finished) through couch_jobs, refreshing the
%% active-task info. If couch_jobs reports `halt` the job has been
%% taken away from this indexer, so the process exits normally.
report_progress(State, UpdateType) ->
    #{
        tx_db := TxDb,
        job := Job1,
        job_data := JobData,
        last_seq := LastSeq,
        db_seq := DBSeq,
        changes_done := ChangesDone,
        db_read_vsn := DbReadVsn,
        view_read_vsn := ViewReadVsn
    } = State,
    #{
        <<"db_name">> := DbName,
        <<"db_uuid">> := DbUUID,
        <<"ddoc_id">> := DDocId,
        <<"sig">> := Sig,
        <<"retries">> := Retries
    } = JobData,
    ActiveTasks = fabric2_active_tasks:get_active_task_info(JobData),
    % changes_done in State only covers the current run; add in the
    % total carried over from any previous runs of this job
    TotalDone =
        case maps:get(<<"changes_done">>, ActiveTasks, 0) of
            0 -> ChangesDone;
            N -> N + ChangesDone
        end,
    NewActiveTasks = couch_views_util:active_tasks_info(
        TotalDone,
        DbName,
        DDocId,
        LastSeq,
        DBSeq
    ),
    % Reconstruct from scratch to remove any
    % possible existing error state.
    NewData0 = #{
        <<"db_name">> => DbName,
        <<"db_uuid">> => DbUUID,
        <<"ddoc_id">> => DDocId,
        <<"sig">> => Sig,
        <<"view_seq">> => LastSeq,
        <<"retries">> => Retries,
        <<"db_read_vsn">> => DbReadVsn,
        <<"view_read_vsn">> => ViewReadVsn
    },
    NewData = fabric2_active_tasks:update_active_task_info(
        NewData0,
        NewActiveTasks
    ),
    case UpdateType of
        update ->
            case couch_jobs:update(TxDb, Job1, NewData) of
                {ok, Job2} ->
                    % Keep the refreshed job handle for later updates
                    State#{job := Job2};
                {error, halt} ->
                    ?LOG_ERROR(#{what => job_halted, job => Job1}),
                    couch_log:error("~s job halted :: ~w", [?MODULE, Job1]),
                    exit(normal)
            end;
        finished ->
            case couch_jobs:finish(TxDb, Job1, NewData) of
                ok ->
                    State;
                {error, halt} ->
                    ?LOG_ERROR(#{what => job_halted, job => Job1}),
                    couch_log:error("~s job halted :: ~w", [?MODULE, Job1]),
                    exit(normal)
            end
    end.
%% Permanently fail the job, recording Error/Reason in the job data,
%% then terminate this indexer process. Never returns.
fail_job(Job, Data, Error, Reason) ->
    couch_jobs:finish(undefined, Job, add_error(Error, Reason, Data)),
    exit(normal).
%% Maximum number of times an indexing job is rescheduled after an
%% error before being failed permanently.
retry_limit() ->
    Default = 3,
    config:get_integer("couch_views", "retry_limit", Default).
%% Largest allowed encoded size, in bytes, for an emitted view key.
key_size_limit() ->
    Default = ?KEY_SIZE_LIMIT,
    config:get_integer("couch_views", "key_size_limit", Default).
%% Largest allowed encoded size, in bytes, for an emitted view value.
value_size_limit() ->
    Default = ?VALUE_SIZE_LIMIT,
    config:get_integer("couch_views", "value_size_limit", Default).
%% Retry limit handed to fabric2_fdb:transactional/3 for each indexing
%% transaction.
tx_retry_limit() ->
    Default = ?DEFAULT_TX_RETRY_LIMIT,
    config:get_integer("couch_views", "indexer_tx_retry_limit", Default).