| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_mrview_compactor). |
| |
| -include_lib("couch/include/couch_db.hrl"). |
| -include_lib("couch_mrview/include/couch_mrview.hrl"). |
| |
| -export([compact/3, swap_compacted/2]). |
| |
| -record(acc, { |
| btree = nil, |
| last_id = nil, |
| kvs = [], |
| kvs_size = 0, |
| changes = 0, |
| total_changes |
| }). |
| |
| |
| compact(_Db, State, Opts) -> |
| case lists:member(recompact, Opts) of |
| false -> compact(State); |
| true -> recompact(State) |
| end. |
| |
| compact(State) -> |
| #mrst{ |
| db_name=DbName, |
| idx_name=IdxName, |
| sig=Sig, |
| update_seq=Seq, |
| id_btree=IdBtree, |
| log_btree=LogBtree, |
| seq_indexed=SeqIndexed, |
| views=Views |
| } = State, |
| |
| {EmptyState, NumDocIds} = couch_util:with_db(DbName, fun(Db) -> |
| CompactFName = couch_mrview_util:compaction_file(DbName, Sig), |
| {ok, Fd} = couch_mrview_util:open_file(CompactFName), |
| ESt = couch_mrview_util:reset_index(Db, Fd, State), |
| |
| {ok, DbReduce} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree), |
| Count = element(1, DbReduce), |
| |
| {ESt, Count} |
| end), |
| |
| #mrst{ |
| id_btree = EmptyIdBtree, |
| log_btree = EmptyLogBtree, |
| views = EmptyViews |
| } = EmptyState, |
| |
| TotalChanges0 = case SeqIndexed of |
| true -> NumDocIds * 2; |
| _ -> NumDocIds |
| end, |
| |
| TotalChanges = lists:foldl( |
| fun(View, Acc) -> |
| {ok, Kvs} = couch_mrview_util:get_row_count(View), |
| case SeqIndexed of |
| true -> |
| {ok, SKvs} = couch_mrview_util:get_view_changes_count(View), |
| Acc + Kvs + SKvs * 2; |
| false -> |
| Acc + Kvs |
| end |
| end, |
| TotalChanges0, Views), |
| |
| couch_task_status:add_task([ |
| {type, view_compaction}, |
| {database, DbName}, |
| {design_document, IdxName}, |
| {progress, 0} |
| ]), |
| |
| BufferSize0 = couch_config:get( |
| "view_compaction", "keyvalue_buffer_size", "2097152" |
| ), |
| BufferSize = list_to_integer(BufferSize0), |
| |
| FoldFun = fun({DocId, _} = KV, Acc) -> |
| #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize, last_id = LastId} = Acc, |
| if DocId =:= LastId -> |
| % COUCHDB-999 regression test |
| ?LOG_ERROR("Duplicate docid `~s` detected in view group `~s`" |
| ++ ", database `~s` - This view needs to be rebuilt.", |
| [DocId, IdxName, DbName] |
| ), |
| exit({view_duplicate_id, DocId}); |
| true -> ok end, |
| KvsSize2 = KvsSize + ?term_size(KV), |
| case KvsSize2 >= BufferSize of |
| true -> |
| {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])), |
| Acc2 = update_task(Acc, 1 + length(Kvs)), |
| {ok, Acc2#acc{ |
| btree = Bt2, kvs = [], kvs_size = 0, last_id = DocId}}; |
| _ -> |
| {ok, Acc#acc{ |
| kvs = [KV | Kvs], kvs_size = KvsSize2, last_id = DocId}} |
| end |
| end, |
| |
| InitAcc = #acc{total_changes = TotalChanges, btree = EmptyIdBtree}, |
| {ok, _, FinalAcc} = couch_btree:foldl(IdBtree, FoldFun, InitAcc), |
| #acc{btree = Bt3, kvs = Uncopied} = FinalAcc, |
| {ok, NewIdBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)), |
| FinalAcc2 = update_task(FinalAcc, length(Uncopied)), |
| |
| |
| {NewLogBtree, FinalAcc3} = case SeqIndexed of |
| true -> |
| compact_log(LogBtree, BufferSize, |
| FinalAcc2#acc{kvs=[], |
| kvs_size=0, |
| btree=EmptyLogBtree}); |
| _ -> |
| {nil, FinalAcc2} |
| end, |
| |
| {NewViews, _} = lists:mapfoldl(fun({View, EmptyView}, Acc) -> |
| compact_view(View, EmptyView, BufferSize, Acc) |
| end, FinalAcc3, lists:zip(Views, EmptyViews)), |
| |
| unlink(EmptyState#mrst.fd), |
| {ok, EmptyState#mrst{ |
| id_btree=NewIdBtree, |
| log_btree=NewLogBtree, |
| views=NewViews, |
| update_seq=Seq |
| }}. |
| |
| |
| recompact(State) -> |
| link(State#mrst.fd), |
| {Pid, Ref} = erlang:spawn_monitor(fun() -> |
| couch_index_updater:update(couch_mrview_index, State) |
| end), |
| receive |
| {'DOWN', Ref, _, _, {updated, Pid, State2}} -> |
| unlink(State#mrst.fd), |
| {ok, State2} |
| end. |
| |
| compact_log(LogBtree, BufferSize, Acc0) -> |
| FoldFun = fun({DocId, _} = KV, Acc) -> |
| #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize} = Acc, |
| KvsSize2 = KvsSize + ?term_size(KV), |
| case KvsSize2 >= BufferSize of |
| true -> |
| {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])), |
| Acc2 = update_task(Acc, 1 + length(Kvs)), |
| {ok, Acc2#acc{ |
| btree = Bt2, kvs = [], kvs_size = 0}}; |
| _ -> |
| {ok, Acc#acc{ |
| kvs = [KV | Kvs], kvs_size = KvsSize2}} |
| end |
| end, |
| |
| {ok, _, FinalAcc} = couch_btree:foldl(LogBtree, FoldFun, Acc0), |
| #acc{btree = Bt3, kvs = Uncopied} = FinalAcc, |
| {ok, NewLogBtree} = couch_btree:add(Bt3, lists:reverse(Uncopied)), |
| FinalAcc2 = update_task(FinalAcc, length(Uncopied)), |
| {NewLogBtree, FinalAcc2}. |
| |
| %% @spec compact_view(View, EmptyView, Retry, Acc) -> {CompactView, NewAcc} |
| compact_view(View, EmptyView, BufferSize, Acc0) -> |
| |
| {NewBt, Acc1} = compact_view_btree(View#mrview.btree, |
| EmptyView#mrview.btree, |
| BufferSize, Acc0), |
| |
| %% are we indexing changes by sequences? |
| {NewSeqBt, NewKeyBySeqBt, FinalAcc} = case View#mrview.seq_indexed of |
| true -> |
| {SBt, Acc2} = compact_view_btree(View#mrview.seq_btree, |
| EmptyView#mrview.seq_btree, |
| BufferSize, Acc1), |
| {KSBt, Acc3} = compact_view_btree(View#mrview.key_byseq_btree, |
| EmptyView#mrview.key_byseq_btree, |
| BufferSize, Acc2), |
| {SBt, KSBt, Acc3}; |
| _ -> |
| {nil, nil, Acc1} |
| end, |
| {EmptyView#mrview{btree=NewBt, |
| seq_btree=NewSeqBt, |
| key_byseq_btree=NewKeyBySeqBt}, FinalAcc}. |
| |
| compact_view_btree(Btree, EmptyBtree, BufferSize, Acc0) -> |
| Fun = fun(KV, #acc{btree = Bt, kvs = Kvs, kvs_size = KvsSize} = Acc) -> |
| KvsSize2 = KvsSize + ?term_size(KV), |
| if KvsSize2 >= BufferSize -> |
| {ok, Bt2} = couch_btree:add(Bt, lists:reverse([KV | Kvs])), |
| Acc2 = update_task(Acc, 1 + length(Kvs)), |
| {ok, Acc2#acc{btree = Bt2, kvs = [], kvs_size = 0}}; |
| true -> |
| {ok, Acc#acc{kvs = [KV | Kvs], kvs_size = KvsSize2}} |
| end |
| end, |
| |
| InitAcc = Acc0#acc{kvs = [], kvs_size = 0, btree = EmptyBtree}, |
| {ok, _, FinalAcc} = couch_btree:foldl(Btree, Fun, InitAcc), |
| #acc{btree = Bt3, kvs = Uncopied} = FinalAcc, |
| {ok, NewBt} = couch_btree:add(Bt3, lists:reverse(Uncopied)), |
| FinalAcc2 = update_task(FinalAcc, length(Uncopied)), |
| {NewBt, FinalAcc2}. |
| |
| update_task(#acc{changes = Changes, total_changes = Total} = Acc, ChangesInc) -> |
| Changes2 = Changes + ChangesInc, |
| couch_task_status:update([{progress, (Changes2 * 100) div Total}]), |
| Acc#acc{changes = Changes2}. |
| |
| |
| swap_compacted(OldState, NewState) -> |
| #mrst{ |
| sig=Sig, |
| db_name=DbName |
| } = NewState, |
| |
| link(NewState#mrst.fd), |
| |
| RootDir = couch_index_util:root_dir(), |
| IndexFName = couch_mrview_util:index_file(DbName, Sig), |
| CompactFName = couch_mrview_util:compaction_file(DbName, Sig), |
| ok = couch_file:delete(RootDir, IndexFName), |
| ok = file:rename(CompactFName, IndexFName), |
| |
| unlink(OldState#mrst.fd), |
| couch_ref_counter:drop(OldState#mrst.refc), |
| {ok, NewRefCounter} = couch_ref_counter:start([NewState#mrst.fd]), |
| |
| {ok, NewState#mrst{refc=NewRefCounter}}. |