% blob: a7e594f568619abee3971b14bd288ec771aa2e04 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_mrview_compactor).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-export([compact/3, swap_compacted/2]).
% Accumulator threaded through every btree copy performed by this module.
-record(acc, {
% Destination btree that buffered key/values are added to.
btree = nil,
% Doc id of the previous entry seen while folding the id btree; compact/1
% compares it with the current doc id to detect duplicates.
last_id = nil,
% Key/values read but not yet written, kept in reverse order (each new
% entry is consed on the front and the list is reversed before the add).
kvs = [],
% Running sum of ?term_size/1 over the entries currently held in kvs.
kvs_size = 0,
% Number of key/values written so far, across all btrees.
changes = 0,
% Expected total number of key/values; denominator of the progress
% percentage computed in update_task/2. Has no default.
total_changes
}).
%% @doc Entry point for view compaction.
%% When the atom `recompact' is a member of Opts the state is handed to
%% recompact/1; otherwise a regular compaction is run through compact/1.
%% The first argument is not used.
compact(_Db, State, Opts) ->
    WantsRecompact = lists:member(recompact, Opts),
    if
        WantsRecompact -> recompact(State);
        true -> compact(State)
    end.
%% @doc Copy every btree of the index described by State into the index's
%% compaction file and return `{ok, NewState}', where NewState carries the
%% freshly written btrees and the update sequence of the input state.
%% Progress is reported through couch_task_status. Exits with
%% `{view_duplicate_id, DocId}' if the id btree yields the same doc id twice
%% in a row.
compact(State) ->
#mrst{
db_name=DbName,
idx_name=IdxName,
sig=Sig,
update_seq=Seq,
id_btree=IdBtree,
log_btree=LogBtree,
seq_indexed=SeqIndexed,
views=Views
} = State,
% Open the compaction file and obtain a reset copy of the state bound to it,
% together with the first element of the database id-btree reduction.
% NOTE(review): that element is presumably the document count - confirm
% against the reduce function of fulldocinfo_by_id_btree.
{EmptyState, NumDocIds} = couch_util:with_db(DbName, fun(Db) ->
CompactFName = couch_mrview_util:compaction_file(DbName, Sig),
{ok, Fd} = couch_mrview_util:open_file(CompactFName),
ESt = couch_mrview_util:reset_index(Db, Fd, State),
{ok, DbReduce} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree),
Count = element(1, DbReduce),
{ESt, Count}
end),
#mrst{
id_btree = EmptyIdBtree,
log_btree = EmptyLogBtree,
views = EmptyViews
} = EmptyState,
% Estimate the total number of key/values to copy; it is only used as the
% denominator of the progress percentage. The per-document share is doubled
% when the index is sequence-indexed.
TotalChanges0 = case SeqIndexed of
true -> NumDocIds * 2;
_ -> NumDocIds
end,
% Add each view's row count, plus twice its change count when
% sequence-indexed.
TotalChanges = lists:foldl(
fun(View, Acc) ->
{ok, Kvs} = couch_mrview_util:get_row_count(View),
case SeqIndexed of
true ->
{ok, SKvs} = couch_mrview_util:get_view_changes_count(View),
Acc + Kvs + SKvs * 2;
false ->
Acc + Kvs
end
end,
TotalChanges0, Views),
couch_task_status:add_task([
{type, view_compaction},
{database, DbName},
{design_document, IdxName},
{progress, 0}
]),
% Flush threshold for the in-memory key/value buffer, compared against the
% accumulated ?term_size/1 of the buffered entries. Read from the
% "view_compaction" config section; defaults to "2097152".
BufferSize0 = couch_config:get(
"view_compaction", "keyvalue_buffer_size", "2097152"
),
BufferSize = list_to_integer(BufferSize0),
% Fold function for the id btree: buffers entries and writes them to the new
% btree in batches once the buffer reaches BufferSize.
FoldFun = fun({DocId, _} = KV, Acc) ->
#acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize, last_id = LastId} = Acc,
if DocId =:= LastId ->
% COUCHDB-999 regression test
?LOG_ERROR("Duplicate docid `~s` detected in view group `~s`"
++ ", database `~s` - This view needs to be rebuilt.",
[DocId, IdxName, DbName]
),
exit({view_duplicate_id, DocId});
true -> ok end,
KvsSize2 = KvsSize + ?term_size(KV),
case KvsSize2 >= BufferSize of
true ->
% Buffer is full: write it out (in original order) and start a new one.
{ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])),
Acc2 = update_task(Acc, 1 + length(Kvs)),
{ok, Acc2#acc{
btree = Bt2, kvs = [], kvs_size = 0, last_id = DocId}};
_ ->
{ok, Acc#acc{
kvs = [KV | Kvs], kvs_size = KvsSize2, last_id = DocId}}
end
end,
InitAcc = #acc{total_changes = TotalChanges, btree = EmptyIdBtree},
{ok, _, FinalAcc} = couch_btree:foldl(IdBtree, FoldFun, InitAcc),
% Write whatever is still buffered after the fold.
#acc{btree = Bt3, kvs = Uncopied} = FinalAcc,
{ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)),
FinalAcc2 = update_task(FinalAcc, length(Uncopied)),
% The log btree is only copied for sequence-indexed indexes; otherwise the
% new state gets `nil' in its place.
{NewLogBtree, FinalAcc3} = case SeqIndexed of
true ->
compact_log(LogBtree, BufferSize,
FinalAcc2#acc{kvs=[],
kvs_size=0,
btree=EmptyLogBtree});
_ ->
{nil, FinalAcc2}
end,
% Copy every view into its empty counterpart, threading the accumulator so
% that the progress counter keeps increasing across views.
{NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) ->
compact_view(View, EmptyView, BufferSize, Acc)
end, FinalAcc3, lists:zip(Views, EmptyViews)),
% NOTE(review): presumably open_file/1 linked the compaction fd to this
% process and the link is dropped here before the state is handed back -
% confirm against couch_mrview_util:open_file/1.
unlink(EmptyState#mrst.fd),
{ok, EmptyState#mrst{
id_btree=NewIdBtree,
log_btree=NewLogBtree,
views=NewViews,
update_seq=Seq
}}.
%% @doc Run the index updater on State in a monitored process and wait for
%% it to finish.
%%
%% The updater reports success by exiting with `{updated, Pid, NewState}';
%% in that case `{ok, NewState}' is returned. If the updater terminates for
%% any other reason this process exits with `{recompact_failed, Reason}'
%% instead of blocking forever on a 'DOWN' message that can never match.
%% The index fd is linked for the duration of the update and unlinked again
%% on both paths.
recompact(State) ->
    link(State#mrst.fd),
    {Pid, Ref} = erlang:spawn_monitor(fun() ->
        couch_index_updater:update(couch_mrview_index, State)
    end),
    receive
        {'DOWN', Ref, _, _, {updated, Pid, State2}} ->
            unlink(State#mrst.fd),
            {ok, State2};
        {'DOWN', Ref, _, _, Reason} ->
            % The updater died without producing a new state. Wrap the
            % reason so that an unexpected `normal' exit of the updater is
            % not mistaken for a clean shutdown of this process.
            unlink(State#mrst.fd),
            exit({recompact_failed, Reason})
    end.
%% @doc Copy every entry of LogBtree into the destination btree carried in
%% `Acc0#acc.btree', flushing in batches of roughly BufferSize bytes.
%% Returns `{NewLogBtree, NewAcc}' with the progress counters advanced.
%%
%% The copy is identical to the one done for view btrees, so it is delegated
%% to compact_view_btree/4 rather than duplicated. That helper starts from an
%% empty buffer, which matches what the caller in this module passes in.
compact_log(LogBtree, BufferSize, Acc0) ->
    compact_view_btree(LogBtree, Acc0#acc.btree, BufferSize, Acc0).
%% @spec compact_view(View, EmptyView, BufferSize, Acc) -> {CompactView, NewAcc}
%% @doc Copy the btrees of View into the matching empty btrees of EmptyView.
%% The row btree is always copied. The seq and key-by-seq btrees are copied
%% only when the view's `seq_indexed' field is `true'; otherwise both are
%% set to `nil' in the result. Returns the filled-in EmptyView together with
%% the updated accumulator.
compact_view(View, EmptyView, BufferSize, Acc0) ->
    {RowBt, RowAcc} = compact_view_btree(
        View#mrview.btree, EmptyView#mrview.btree, BufferSize, Acc0
    ),
    {SeqBt, KeyBySeqBt, AccOut} = compact_view_seq_btrees(
        View#mrview.seq_indexed, View, EmptyView, BufferSize, RowAcc
    ),
    Compacted = EmptyView#mrview{
        btree = RowBt,
        seq_btree = SeqBt,
        key_byseq_btree = KeyBySeqBt
    },
    {Compacted, AccOut}.

%% Copy the two sequence btrees of a view when it is indexed by sequence
%% (first argument `true'); for any other value nothing is copied.
compact_view_seq_btrees(true, View, EmptyView, BufferSize, Acc0) ->
    {SeqBt, Acc1} = compact_view_btree(
        View#mrview.seq_btree, EmptyView#mrview.seq_btree, BufferSize, Acc0
    ),
    {KeyBySeqBt, Acc2} = compact_view_btree(
        View#mrview.key_byseq_btree,
        EmptyView#mrview.key_byseq_btree,
        BufferSize,
        Acc1
    ),
    {SeqBt, KeyBySeqBt, Acc2};
compact_view_seq_btrees(_, _View, _EmptyView, _BufferSize, Acc0) ->
    {nil, nil, Acc0}.
%% @doc Copy all entries of Btree into EmptyBtree.
%% Entries are buffered in the accumulator and written with couch_btree:add/2
%% whenever the summed ?term_size/1 of the buffer reaches BufferSize; the
%% remainder is written after the fold. Any buffer carried in Acc0 is
%% discarded before the copy starts. Returns `{NewBtree, NewAcc}'.
compact_view_btree(Btree, EmptyBtree, BufferSize, Acc0) ->
    CopyFun = fun(Entry, #acc{btree = Target, kvs = Pending, kvs_size = PendingSize} = AccIn) ->
        NewSize = PendingSize + ?term_size(Entry),
        case NewSize >= BufferSize of
            true ->
                % Threshold reached: write the batch in its original order.
                Batch = lists:reverse([Entry | Pending]),
                {ok, Written} = couch_btree:add(Target, Batch),
                AccOut = update_task(AccIn, 1 + length(Pending)),
                {ok, AccOut#acc{btree = Written, kvs = [], kvs_size = 0}};
            false ->
                {ok, AccIn#acc{kvs = [Entry | Pending], kvs_size = NewSize}}
        end
    end,
    StartAcc = Acc0#acc{kvs = [], kvs_size = 0, btree = EmptyBtree},
    {ok, _, FoldedAcc} = couch_btree:foldl(Btree, CopyFun, StartAcc),
    #acc{btree = LastTarget, kvs = Leftover} = FoldedAcc,
    % Write whatever did not fill a whole batch.
    {ok, ResultBt} = couch_btree:add(LastTarget, lists:reverse(Leftover)),
    ResultAcc = update_task(FoldedAcc, length(Leftover)),
    {ResultBt, ResultAcc}.
%% @doc Add ChangesInc to the number of copied key/values and publish the
%% resulting progress percentage through couch_task_status.
%% Returns the accumulator with the updated `changes' counter.
%%
%% When the expected total is 0 (nothing to copy, e.g. the index of an empty
%% database) the progress is reported as 0 instead of dividing by zero,
%% which would otherwise crash the compaction with `badarith'.
update_task(#acc{changes = Changes, total_changes = Total} = Acc, ChangesInc) ->
    Changes2 = Changes + ChangesInc,
    Progress = case Total of
        0 -> 0;
        _ -> (Changes2 * 100) div Total
    end,
    couch_task_status:update([{progress, Progress}]),
    Acc#acc{changes = Changes2}.
%% @doc Replace the live index file with the compacted one.
%% Links to the new state's fd, deletes the current index file, renames the
%% compaction file into its place, unlinks the old fd, drops the old ref
%% counter and starts a new one for the new fd. Returns `{ok, NewState}'
%% with the new ref counter stored in `refc'. Crashes with a badmatch if the
%% delete or the rename does not return `ok'.
swap_compacted(OldState, NewState) ->
    #mrst{db_name = DbName, sig = Sig, fd = NewFd} = NewState,
    link(NewFd),
    Root = couch_index_util:root_dir(),
    LiveFile = couch_mrview_util:index_file(DbName, Sig),
    CompactedFile = couch_mrview_util:compaction_file(DbName, Sig),
    % The old file is removed before the compacted one takes over its name.
    ok = couch_file:delete(Root, LiveFile),
    ok = file:rename(CompactedFile, LiveFile),
    unlink(OldState#mrst.fd),
    couch_ref_counter:drop(OldState#mrst.refc),
    {ok, RefCounter} = couch_ref_counter:start([NewFd]),
    {ok, NewState#mrst{refc = RefCounter}}.