| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_bt_engine). |
| -behavior(couch_db_engine). |
| |
| -export([ |
| exists/1, |
| |
| delete/3, |
| delete_compaction_files/3, |
| |
| init/2, |
| terminate/2, |
| handle_db_updater_call/2, |
| handle_db_updater_info/2, |
| |
| incref/1, |
| decref/1, |
| monitored_by/1, |
| |
| last_activity/1, |
| |
| get_compacted_seq/1, |
| get_del_doc_count/1, |
| get_disk_version/1, |
| get_doc_count/1, |
| get_epochs/1, |
| get_last_purged/1, |
| get_purge_seq/1, |
| get_revs_limit/1, |
| get_security/1, |
| get_size_info/1, |
| get_update_seq/1, |
| get_uuid/1, |
| |
| set_revs_limit/2, |
| set_security/2, |
| |
| open_docs/2, |
| open_local_docs/2, |
| read_doc_body/2, |
| |
| serialize_doc/2, |
| write_doc_body/2, |
| write_doc_infos/4, |
| |
| commit_data/1, |
| |
| open_write_stream/2, |
| open_read_stream/2, |
| is_active_stream/2, |
| |
| fold_docs/4, |
| fold_local_docs/4, |
| fold_changes/5, |
| count_changes_since/2, |
| |
| start_compaction/4, |
| finish_compaction/4 |
| ]). |
| |
| |
| -export([ |
| init_state/4 |
| ]). |
| |
| |
| -export([ |
| id_tree_split/1, |
| id_tree_join/2, |
| id_tree_reduce/2, |
| |
| seq_tree_split/1, |
| seq_tree_join/2, |
| seq_tree_reduce/2, |
| |
| local_tree_split/1, |
| local_tree_join/2 |
| ]). |
| |
| |
| % Used by the compactor |
| -export([ |
| set_update_seq/2, |
| update_header/2, |
| copy_security/2 |
| ]). |
| |
| |
| -include_lib("couch/include/couch_db.hrl"). |
| -include("couch_bt_engine.hrl"). |
| |
| |
% Returns true when either the database file itself or a leftover
% ".compact" file next to it is present.
exists(FilePath) ->
    filelib:is_file(FilePath)
        orelse filelib:is_file(FilePath ++ ".compact").
| |
| |
% Deletes the database file at FilePath.
%
% Leftover compaction files are removed first; otherwise a later
% request for this DB would try to open them as a recovery source.
delete(RootDir, FilePath, Async) ->
    CompactionOpts = [{context, compaction}],
    delete_compaction_files(RootDir, FilePath, CompactionOpts),
    % Now remove the actual database file.
    couch_file:delete(RootDir, FilePath, Async).
| |
| |
% Deletes the ".compact", ".compact.data" and ".compact.meta" files
% that belong to FilePath, passing DelOpts to couch_file:delete/3.
delete_compaction_files(RootDir, FilePath, DelOpts) ->
    Suffixes = [".compact", ".compact.data", ".compact.meta"],
    DeleteOne = fun(Suffix) ->
        couch_file:delete(RootDir, FilePath ++ Suffix, DelOpts)
    end,
    lists:foreach(DeleteOne, Suffixes).
| |
| |
% Opens (or creates) the database file and builds the engine state.
%
% A new header is written when `create` is in Options or when the file
% holds no valid header. In both cases stale compaction files are
% deleted first. Returns {ok, State}.
init(FilePath, Options) ->
    {ok, Fd} = open_db_file(FilePath, Options),
    WriteFreshHeader = fun() ->
        delete_compaction_files(FilePath),
        Fresh = couch_bt_engine_header:new(),
        ok = couch_file:write_header(Fd, Fresh),
        Fresh
    end,
    Header = case lists:member(create, Options) of
        true ->
            WriteFreshHeader();
        false ->
            case couch_file:read_header(Fd) of
                {ok, Existing} -> Existing;
                no_valid_header -> WriteFreshHeader()
            end
    end,
    {ok, init_state(FilePath, Fd, Header, Options)}.
| |
| |
% Closes the file descriptor process on shutdown.
terminate(_Reason, St) ->
    case St#st.fd_monitor of
        closed ->
            % We died because the fd disappeared, so there is
            % no need to try closing it again.
            ok;
        Ref ->
            ok = couch_file:close(St#st.fd),
            % Wait at most 500ms for the monitor's 'DOWN' message.
            receive
                {'DOWN', Ref, _, _, _} -> ok
            after 500 ->
                ok
            end
    end,
    couch_util:shutdown_sync(St#st.fd),
    ok.
| |
| |
% This engine accepts no custom calls: every message stops the
% updater with an {invalid_call, Msg} reason and reply.
handle_db_updater_call(Msg, St) ->
    Invalid = {invalid_call, Msg},
    {stop, Invalid, Invalid, St}.
| |
| |
% Handles the 'DOWN' message for the monitored fd: the fd is cleared,
% the monitor is marked as closed and the updater stops normally.
handle_db_updater_info(
    {'DOWN', Ref, _Type, _Object, _Info},
    #st{fd_monitor = Ref} = St
) ->
    ClosedSt = St#st{fd = undefined, fd_monitor = closed},
    {stop, normal, ClosedSt}.
| |
| |
% Monitors the fd process from the calling process and returns a
% state that carries the new monitor reference.
incref(St) ->
    Ref = erlang:monitor(process, St#st.fd),
    {ok, St#st{fd_monitor = Ref}}.
| |
| |
% Drops the fd monitor held in St, flushing any pending 'DOWN'
% message for it.
decref(St) ->
    Ref = St#st.fd_monitor,
    true = erlang:demonitor(Ref, [flush]),
    ok.
| |
| |
% Lists the processes monitoring the fd process, or [] when
% process_info/2 returns anything other than a monitored_by tuple.
monitored_by(St) ->
    case erlang:process_info(St#st.fd, monitored_by) of
        {monitored_by, Monitors} -> Monitors;
        _NoInfo -> []
    end.
| |
| |
% Reports the last read time as tracked by couch_file for our fd.
last_activity(#st{} = St) ->
    couch_file:last_read(St#st.fd).
| |
| |
% Reads the compacted_seq field from the header.
get_compacted_seq(#st{} = St) ->
    couch_bt_engine_header:get(St#st.header, compacted_seq).
| |
| |
% Number of deleted documents: the second element of the id tree's
% full reduction.
get_del_doc_count(#st{id_tree = IdTree}) ->
    {ok, Reduction} = couch_btree:full_reduce(IdTree),
    element(2, Reduction).
| |
| |
% Reads the disk_version field from the header.
get_disk_version(#st{} = St) ->
    couch_bt_engine_header:get(St#st.header, disk_version).
| |
| |
% Number of non-deleted documents: the first element of the id
% tree's full reduction.
get_doc_count(#st{id_tree = IdTree}) ->
    {ok, Reduction} = couch_btree:full_reduce(IdTree),
    element(1, Reduction).
| |
| |
% Reads the epochs field from the header.
get_epochs(#st{} = St) ->
    couch_bt_engine_header:get(St#st.header, epochs).
| |
| |
% Returns the term stored at the header's purged_docs pointer, or []
% when that pointer is nil.
get_last_purged(#st{header = Header, fd = Fd}) ->
    case couch_bt_engine_header:get(Header, purged_docs) of
        nil ->
            [];
        PurgedPtr ->
            {ok, Purged} = couch_file:pread_term(Fd, PurgedPtr),
            Purged
    end.
| |
| |
% Reads the purge_seq field from the header.
get_purge_seq(#st{} = St) ->
    couch_bt_engine_header:get(St#st.header, purge_seq).
| |
| |
% Reads the revs_limit field from the header.
get_revs_limit(#st{} = St) ->
    couch_bt_engine_header:get(St#st.header, revs_limit).
| |
| |
% Returns a proplist with the active, external and file sizes.
%
% The size part of the id tree reduction may be a #size_info{}
% record, an {Active, External} pair or a bare active size; all three
% are normalised to a #size_info{} before use.
get_size_info(#st{} = St) ->
    {ok, FileSize} = couch_file:bytes(St#st.fd),
    {ok, DbReduction} = couch_btree:full_reduce(St#st.id_tree),
    SizeInfo = case element(3, DbReduction) of
        #size_info{} = Rec ->
            Rec;
        {Active, External} ->
            #size_info{active = Active, external = External};
        Active ->
            #size_info{active = Active}
    end,
    [
        {active, active_size(St, SizeInfo)},
        {external, SizeInfo#size_info.external},
        {file, FileSize}
    ].
| |
| |
% Returns the security properties stored at the header's security_ptr,
% or [] when that pointer is undefined.
get_security(#st{header = Header, fd = Fd}) ->
    case couch_bt_engine_header:get(Header, security_ptr) of
        undefined ->
            [];
        SecPtr ->
            {ok, SecProps} = couch_file:pread_term(Fd, SecPtr),
            SecProps
    end.
| |
| |
% Reads the update_seq field from the header.
get_update_seq(#st{} = St) ->
    couch_bt_engine_header:get(St#st.header, update_seq).
| |
| |
% Reads the uuid field from the header.
get_uuid(#st{} = St) ->
    couch_bt_engine_header:get(St#st.header, uuid).
| |
| |
% Stores RevsLimit in the header, flags the state as needing a commit
% and bumps the update sequence.
set_revs_limit(#st{header = Header} = St, RevsLimit) ->
    NewHeader = couch_bt_engine_header:set(Header, [
        {revs_limit, RevsLimit}
    ]),
    Updated = St#st{header = NewHeader, needs_commit = true},
    {ok, increment_update_seq(Updated)}.
| |
| |
% Appends NewSecurity to the file, points the header's security_ptr at
% it, flags the state as needing a commit and bumps the update
% sequence.
set_security(#st{header = Header} = St, NewSecurity) ->
    AppendOpts = [{compression, St#st.compression}],
    {ok, SecPtr, _} = couch_file:append_term(St#st.fd, NewSecurity, AppendOpts),
    NewHeader = couch_bt_engine_header:set(Header, [
        {security_ptr, SecPtr}
    ]),
    Updated = St#st{header = NewHeader, needs_commit = true},
    {ok, increment_update_seq(Updated)}.
| |
| |
% Looks up DocIds in the id tree. Each result is either the stored
% value or the atom not_found, in lookup order.
open_docs(#st{} = St, DocIds) ->
    Lookups = couch_btree:lookup(St#st.id_tree, DocIds),
    Unwrap = fun
        ({ok, FDI}) -> FDI;
        (not_found) -> not_found
    end,
    lists:map(Unwrap, Lookups).
| |
| |
% Looks up DocIds in the local tree. Each result is either the stored
% doc or the atom not_found, in lookup order.
open_local_docs(#st{} = St, DocIds) ->
    Lookups = couch_btree:lookup(St#st.local_tree, DocIds),
    Unwrap = fun
        ({ok, Doc}) -> Doc;
        (not_found) -> not_found
    end,
    lists:map(Unwrap, Lookups).
| |
| |
% Replaces the pointer held in the doc's body field with the
% {Body, Atts} term read from that position in the file.
read_doc_body(#st{fd = Fd}, #doc{body = BodyPtr} = Doc) ->
    {ok, {Body, Atts}} = couch_file:pread_term(Fd, BodyPtr),
    Doc#doc{body = Body, atts = Atts}.
| |
| |
% Turns the doc's body and attachments into a file chunk ready to be
% appended. Both parts are compressed unless already compressed with
% the state's compression method.
serialize_doc(#st{compression = Method}, #doc{} = Doc) ->
    EnsureCompressed = fun(Term) ->
        case couch_compress:is_compressed(Term, Method) of
            true -> Term;
            false -> couch_compress:compress(Term, Method)
        end
    end,
    Body = EnsureCompressed(Doc#doc.body),
    Atts = EnsureCompressed(Doc#doc.atts),
    SummaryBin = ?term_to_bin({Body, Atts}),
    Checksum = couch_hash:md5_hash(SummaryBin),
    Chunk = couch_file:assemble_file_chunk(SummaryBin, Checksum),
    % TODO: This is a terrible hack to get around the issues
    % in COUCHDB-3255. We'll need to come back and figure
    % out a better approach to handling the case when we
    % need to generate a new revision id after the doc
    % has been serialized.
    Doc#doc{
        body = Chunk,
        meta = [{comp_body, Body} | Doc#doc.meta]
    }.
| |
| |
% Appends the serialized body chunk to the file and returns the doc
% with its body replaced by the resulting pointer, together with the
% Written value reported by couch_file.
write_doc_body(St, #doc{body = Chunk} = Doc) ->
    #st{fd = Fd} = St,
    {ok, BodyPtr, Written} = couch_file:append_raw_chunk(Fd, Chunk),
    {ok, Doc#doc{body = BodyPtr}, Written}.
| |
| |
% Applies a batch of document changes to the id, seq and local trees
% and updates the header's update_seq (and purge info if any).
%
% Pairs is a list of {OldFDI, NewFDI} where either side may be
% not_found. LocalDocs are added to, or removed from, the local tree
% depending on their deleted flag. A non-empty PurgedIdRevs is appended
% to the file and recorded in the header.
write_doc_infos(#st{} = St, Pairs, LocalDocs, PurgedIdRevs) ->
    #st{
        id_tree = IdTree,
        seq_tree = SeqTree,
        local_tree = LocalTree
    } = St,

    % Sort each pair into: infos to add, ids to remove, seqs to remove.
    Classify = fun({OldFDI, NewFDI}, Acc) ->
        {Adds, DropIds, DropSeqs} = Acc,
        case {OldFDI, NewFDI} of
            {not_found, #full_doc_info{}} ->
                {[NewFDI | Adds], DropIds, DropSeqs};
            {#full_doc_info{id = Id}, #full_doc_info{id = Id}} ->
                OldSeq = OldFDI#full_doc_info.update_seq,
                {[NewFDI | Adds], DropIds, [OldSeq | DropSeqs]};
            {#full_doc_info{id = Id}, not_found} ->
                OldSeq = OldFDI#full_doc_info.update_seq,
                {Adds, [Id | DropIds], [OldSeq | DropSeqs]}
        end
    end,
    {Add, RemIds, RemSeqs} = lists:foldl(Classify, {[], [], []}, Pairs),

    {ok, IdTree2} = couch_btree:add_remove(IdTree, Add, RemIds),
    {ok, SeqTree2} = couch_btree:add_remove(SeqTree, Add, RemSeqs),

    SplitLocal = fun(Doc, {Keep, Drop}) ->
        case Doc#doc.deleted of
            true -> {Keep, [Doc#doc.id | Drop]};
            false -> {[Doc | Keep], Drop}
        end
    end,
    {AddLDocs, RemLDocIds} = lists:foldl(SplitLocal, {[], []}, LocalDocs),
    {ok, LocalTree2} = couch_btree:add_remove(LocalTree, AddLDocs, RemLDocIds),

    HighestSeq = fun(#full_doc_info{update_seq = Seq}, Max) ->
        erlang:max(Seq, Max)
    end,
    NewUpdateSeq = lists:foldl(HighestSeq, get_update_seq(St), Add),

    HeaderUpdates = case PurgedIdRevs of
        [] ->
            [{update_seq, NewUpdateSeq}];
        _ ->
            {ok, Ptr, _} = couch_file:append_term(St#st.fd, PurgedIdRevs),
            OldPurgeSeq = couch_bt_engine_header:get(St#st.header, purge_seq),
            % We bump NewUpdateSeq because we have to ensure that
            % indexers see that they need to process the new purge
            % information.
            [
                {update_seq, NewUpdateSeq + 1},
                {purge_seq, OldPurgeSeq + 1},
                {purged_docs, Ptr}
            ]
    end,
    NewHeader = couch_bt_engine_header:set(St#st.header, HeaderUpdates),

    {ok, St#st{
        header = NewHeader,
        id_tree = IdTree2,
        seq_tree = SeqTree2,
        local_tree = LocalTree2,
        needs_commit = true
    }}.
| |
| |
% Writes the header to disk if it differs from the stored one once the
% current tree states are folded in, or if needs_commit is set.
% The fsync_options before_header / after_header control whether the
% file is synced around the header write.
commit_data(St) ->
    #st{
        fd = Fd,
        fsync_options = FsyncOptions,
        header = OldHeader,
        needs_commit = NeedsCommit
    } = St,

    NewHeader = update_header(St, OldHeader),
    HeaderChanged = NewHeader /= OldHeader,

    case HeaderChanged orelse NeedsCommit of
        false ->
            {ok, St};
        true ->
            SyncBefore = lists:member(before_header, FsyncOptions),
            SyncAfter = lists:member(after_header, FsyncOptions),
            MaybeSync = fun
                (true) -> couch_file:sync(Fd);
                (false) -> ok
            end,

            MaybeSync(SyncBefore),
            ok = couch_file:write_header(Fd, NewHeader),
            MaybeSync(SyncAfter),

            {ok, St#st{header = NewHeader, needs_commit = false}}
    end.
| |
| |
% Opens a couch_stream backed by couch_bt_engine_stream on our fd,
% starting with an empty stream state.
open_write_stream(#st{fd = Fd}, Options) ->
    StreamEngine = {couch_bt_engine_stream, {Fd, []}},
    couch_stream:open(StreamEngine, Options).
| |
| |
% Builds a read stream descriptor pairing our fd with StreamSt.
open_read_stream(#st{fd = Fd}, StreamSt) ->
    StreamEngine = {couch_bt_engine_stream, {Fd, StreamSt}},
    {ok, StreamEngine}.
| |
| |
% A stream is active when it is a couch_bt_engine_stream whose fd
% equals the fd of this state; anything else is not.
is_active_stream(#st{fd = OurFd}, {couch_bt_engine_stream, {StreamFd, _}}) ->
    OurFd == StreamFd;
is_active_stream(_St, _Stream) ->
    false.
| |
| |
% Folds over the documents stored in the id tree.
fold_docs(St, UserFun, UserAcc, Options) ->
    IdTree = St#st.id_tree,
    fold_docs_int(St, IdTree, UserFun, UserAcc, Options).
| |
| |
% Folds over the documents stored in the local tree. When the fold
% yields reductions they are replaced by null in the result.
fold_local_docs(St, UserFun, UserAcc, Options) ->
    LocalTree = St#st.local_tree,
    Result = fold_docs_int(St, LocalTree, UserFun, UserAcc, Options),
    case Result of
        {ok, _Reds, FinalAcc} -> {ok, null, FinalAcc};
        {ok, FinalAcc} -> {ok, FinalAcc}
    end.
| |
| |
% Folds UserFun over the seq tree, starting at key SinceSeq + 1.
% Reductions are not passed on to UserFun.
fold_changes(St, SinceSeq, UserFun, UserAcc, Options) ->
    FoldOpts = [{start_key, SinceSeq + 1} | Options],
    Wrapped = {UserFun, UserAcc},
    {ok, _, {_, FinalUserAcc}} =
        couch_btree:fold(St#st.seq_tree, fun drop_reductions/4, Wrapped, FoldOpts),
    {ok, FinalUserAcc}.
| |
| |
% Counts the seq tree entries from key SinceSeq + 1 onwards by
% reducing over that range.
count_changes_since(St, SinceSeq) ->
    SeqTree = St#st.seq_tree,
    Finalize = fun(_SeqStart, PartialReds, 0) ->
        {ok, couch_btree:final_reduce(SeqTree, PartialReds)}
    end,
    RangeOpts = [{start_key, SinceSeq + 1}],
    {ok, Count} = couch_btree:fold_reduce(SeqTree, Finalize, 0, RangeOpts),
    Count.
| |
| |
% Spawns a linked process running couch_bt_engine_compactor:start/4
% and returns its pid along with the unchanged state.
start_compaction(St, DbName, Options, Parent) ->
    CompactorPid = spawn_link(
        couch_bt_engine_compactor,
        start,
        [St, DbName, Options, Parent]
    ),
    {ok, St, CompactorPid}.
| |
| |
% Swaps in the compacted file when it has caught up with the main
% file. If the update sequences differ, the compaction state is
% released and a new compaction run is started instead.
finish_compaction(OldState, DbName, Options, CompactFilePath) ->
    {ok, NewState1} = ?MODULE:init(CompactFilePath, Options),
    OldSeq = get_update_seq(OldState),
    NewSeq = get_update_seq(NewState1),
    if
        OldSeq == NewSeq ->
            finish_compaction_int(OldState, NewState1);
        true ->
            couch_log:info("Compaction file still behind main file "
                           "(update seq=~p. compact update seq=~p). Retrying.",
                           [OldSeq, NewSeq]),
            ok = decref(NewState1),
            start_compaction(OldState, DbName, Options, self())
    end.
| |
| |
% Converts a #full_doc_info{} into the {Key, Value} pair stored in
% the id tree, keyed by document id.
id_tree_split(#full_doc_info{
    id = Id,
    update_seq = Seq,
    deleted = Deleted,
    sizes = SizeInfo,
    rev_tree = Tree
}) ->
    Value = {Seq, ?b2i(Deleted), split_sizes(SizeInfo), disk_tree(Tree)},
    {Id, Value}.
| |
| |
% Rebuilds a #full_doc_info{} from an id tree key and value.
id_tree_join(Id, {HighSeq, Deleted, DiskTree}) ->
    % Old format from before sizes were stored: use default sizes.
    id_tree_join(Id, {HighSeq, Deleted, #size_info{}, DiskTree});

id_tree_join(Id, {HighSeq, Deleted, Sizes, DiskTree}) ->
    IsDeleted = ?i2b(Deleted),
    UpgradedSizes = couch_db_updater:upgrade_sizes(Sizes),
    RevTree = rev_tree(DiskTree),
    #full_doc_info{
        id = Id,
        update_seq = HighSeq,
        deleted = IsDeleted,
        sizes = UpgradedSizes,
        rev_tree = RevTree
    }.
| |
| |
% Reduction for the id tree: {NotDeletedCount, DeletedCount, Sizes}.
id_tree_reduce(reduce, FullDocInfos) ->
    Tally = fun(Info, {Live, Dead, SizeAcc}) ->
        NewSizeAcc = reduce_sizes(SizeAcc, Info#full_doc_info.sizes),
        case Info#full_doc_info.deleted of
            true -> {Live, Dead + 1, NewSizeAcc};
            false -> {Live + 1, Dead, NewSizeAcc}
        end
    end,
    lists:foldl(Tally, {0, 0, #size_info{}}, FullDocInfos);
id_tree_reduce(rereduce, Reds) ->
    Merge = fun
        ({Live, Dead}, {LiveAcc, DeadAcc, _SizeAcc}) ->
            % pre 1.2 format, will be upgraded on compaction
            {LiveAcc + Live, DeadAcc + Dead, nil};
        ({Live, Dead, Sizes}, {LiveAcc, DeadAcc, SizeAcc}) ->
            NewSizeAcc = reduce_sizes(SizeAcc, Sizes),
            {LiveAcc + Live, DeadAcc + Dead, NewSizeAcc}
    end,
    lists:foldl(Merge, {0, 0, #size_info{}}, Reds).
| |
| |
% Converts a #full_doc_info{} into the {Key, Value} pair stored in
% the seq tree, keyed by update sequence.
seq_tree_split(#full_doc_info{
    id = Id,
    update_seq = Seq,
    deleted = Del,
    sizes = SizeInfo,
    rev_tree = Tree
}) ->
    Value = {Id, ?b2i(Del), split_sizes(SizeInfo), disk_tree(Tree)},
    {Seq, Value}.
| |
| |
% Rebuilds a record from a seq tree key and value.
seq_tree_join(Seq, {Id, Del, DiskTree}) when is_integer(Del) ->
    % Value without sizes: fill in zero sizes.
    seq_tree_join(Seq, {Id, Del, {0, 0}, DiskTree});

seq_tree_join(Seq, {Id, Del, Sizes, DiskTree}) when is_integer(Del) ->
    IsDeleted = ?i2b(Del),
    SizeInfo = join_sizes(Sizes),
    RevTree = rev_tree(DiskTree),
    #full_doc_info{
        id = Id,
        update_seq = Seq,
        deleted = IsDeleted,
        sizes = SizeInfo,
        rev_tree = RevTree
    };

seq_tree_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
    % Older versions stored #doc_info records in the seq_tree.
    % Compact to upgrade.
    ToRevInfos = fun(Infos, IsDeleted) ->
        lists:map(fun({Rev, Seq, Bp}) ->
            #rev_info{rev = Rev, seq = Seq, deleted = IsDeleted, body_sp = Bp}
        end, Infos)
    end,
    LiveRevs = ToRevInfos(RevInfos, false),
    DeadRevs = ToRevInfos(DeletedRevInfos, true),
    #doc_info{
        id = Id,
        high_seq = KeySeq,
        revs = LiveRevs ++ DeadRevs
    }.
| |
| |
% Reduction for the seq tree: the number of entries.
% reduce counts the given entries; rereduce adds up partial counts.
seq_tree_reduce(reduce, DocInfos) -> length(DocInfos);
seq_tree_reduce(rereduce, PartialCounts) -> lists:sum(PartialCounts).
| |
| |
% Converts a local #doc{} into the {Id, {Rev, Body}} pair stored in
% the local tree. The single revision may be a binary or an integer;
% it is always stored as an integer.
local_tree_split(#doc{id = Id, body = BodyData, revs = {0, [Rev]}})
        when is_binary(Rev) ->
    {Id, {binary_to_integer(Rev), BodyData}};

local_tree_split(#doc{id = Id, body = BodyData, revs = {0, [Rev]}})
        when is_integer(Rev) ->
    {Id, {Rev, BodyData}}.
| |
| |
% Rebuilds a local #doc{} from a local tree key and value. The stored
% integer revision is turned back into a binary.
local_tree_join(Id, {Rev, BodyData}) when is_integer(Rev) ->
    RevBin = integer_to_binary(Rev),
    #doc{id = Id, revs = {0, [RevBin]}, body = BodyData}.
| |
| |
% Sets the header's update_seq to UpdateSeq and flags the state as
% needing a commit. Used by the compactor.
set_update_seq(#st{header = Header} = St, UpdateSeq) ->
    NewHeader = couch_bt_engine_header:set(Header, [
        {update_seq, UpdateSeq}
    ]),
    {ok, St#st{header = NewHeader, needs_commit = true}}.
| |
| |
% Appends SecProps to the file and points the header's security_ptr
% at it. Unlike set_security/2 the update sequence is left untouched.
copy_security(#st{header = Header} = St, SecProps) ->
    AppendOpts = [{compression, St#st.compression}],
    {ok, SecPtr, _} = couch_file:append_term(St#st.fd, SecProps, AppendOpts),
    NewHeader = couch_bt_engine_header:set(Header, [
        {security_ptr, SecPtr}
    ]),
    {ok, St#st{header = NewHeader, needs_commit = true}}.
| |
| |
% Opens the database file at FilePath with the given couch_file
% options and returns {ok, Fd}.
%
% When the file does not exist, a leftover FilePath ++ ".compact" file
% is promoted to FilePath and used instead. Throws
% {not_found, no_db_file} when neither file exists, and throws any
% other result returned by couch_file:open.
open_db_file(FilePath, Options) ->
    case couch_file:open(FilePath, Options) of
        {ok, Fd} ->
            {ok, Fd};
        {error, enoent} ->
            % Couldn't find file. Is there a compact version? This can
            % happen (rarely) if we crashed during the file switch.
            open_compact_as_db_file(FilePath);
        Error ->
            throw(Error)
    end.


% Opens FilePath ++ ".compact", renames it to FilePath and returns
% {ok, Fd} for it.
open_compact_as_db_file(FilePath) ->
    case couch_file:open(FilePath ++ ".compact", [nologifmissing]) of
        {ok, Fd} ->
            Fmt = "Recovering from compaction file: ~s~s",
            couch_log:info(Fmt, [FilePath, ".compact"]),
            ok = file:rename(FilePath ++ ".compact", FilePath),
            ok = couch_file:sync(Fd),
            {ok, Fd};
        {error, enoent} ->
            throw({not_found, no_db_file});
        Error ->
            % Any other open failure is thrown just like a failed open
            % of the main file, instead of crashing with a case_clause.
            throw(Error)
    end.
| |
| |
% Builds the #st{} engine state for an open file and its header.
%
% Reads the fsync options from config, optionally syncs the file,
% upgrades the header, makes sure a security object exists, opens the
% id, seq and local btrees and monitors the fd. If any of that changed
% the header, the state is committed before being returned.
init_state(FilePath, Fd, Header0, Options) ->
    FsyncDefault = "[before_header, after_header, on_file_open]",
    FsyncStr = config:get("couchdb", "fsync_options", FsyncDefault),
    {ok, FsyncOptions} = couch_util:parse_term(FsyncStr),

    case lists:member(on_file_open, FsyncOptions) of
        true -> ok = couch_file:sync(Fd);
        _ -> ok
    end,

    Compression = couch_compress:get_compression_method(),

    Upgraded = couch_bt_engine_header:upgrade(Header0),
    Header = set_default_security_object(Fd, Upgraded, Compression, Options),

    % Every tree shares the fd and compression; only callbacks differ.
    OpenTree = fun(TreeState, Callbacks) ->
        TreeOpts = Callbacks ++ [{compression, Compression}],
        {ok, Tree} = couch_btree:open(TreeState, Fd, TreeOpts),
        Tree
    end,

    IdTree = OpenTree(couch_bt_engine_header:id_tree_state(Header), [
        {split, fun ?MODULE:id_tree_split/1},
        {join, fun ?MODULE:id_tree_join/2},
        {reduce, fun ?MODULE:id_tree_reduce/2}
    ]),

    SeqTree = OpenTree(couch_bt_engine_header:seq_tree_state(Header), [
        {split, fun ?MODULE:seq_tree_split/1},
        {join, fun ?MODULE:seq_tree_join/2},
        {reduce, fun ?MODULE:seq_tree_reduce/2}
    ]),

    LocalTree = OpenTree(couch_bt_engine_header:local_tree_state(Header), [
        {split, fun ?MODULE:local_tree_split/1},
        {join, fun ?MODULE:local_tree_join/2}
    ]),

    ok = couch_file:set_db_pid(Fd, self()),
    FdMonitor = erlang:monitor(process, Fd),

    St = #st{
        filepath = FilePath,
        fd = Fd,
        fd_monitor = FdMonitor,
        fsync_options = FsyncOptions,
        header = Header,
        needs_commit = false,
        id_tree = IdTree,
        seq_tree = SeqTree,
        local_tree = LocalTree,
        compression = Compression
    },

    % If this is a new database we've just created a
    % new UUID and default security object which need
    % to be written to disk.
    if
        Header /= Header0 ->
            {ok, Committed} = commit_data(St),
            Committed;
        true ->
            St
    end.
| |
| |
% Returns Header with the current states of the seq, id and local
% trees from St stored in it.
update_header(St, Header) ->
    SeqState = couch_btree:get_state(St#st.seq_tree),
    IdState = couch_btree:get_state(St#st.id_tree),
    LocalState = couch_btree:get_state(St#st.local_tree),
    couch_bt_engine_header:set(Header, [
        {seq_tree_state, SeqState},
        {id_tree_state, IdState},
        {local_tree_state, LocalState}
    ]).
| |
| |
% Returns St with the header's update_seq increased by one.
increment_update_seq(#st{header = Header} = St) ->
    CurrentSeq = couch_bt_engine_header:get(Header, update_seq),
    NewHeader = couch_bt_engine_header:set(Header, [
        {update_seq, CurrentSeq + 1}
    ]),
    St#st{header = NewHeader}.
| |
| |
% Makes sure the header has a security pointer. When security_ptr is
% not an integer, the default_security_object from Options is appended
% to the file and the header is pointed at it.
set_default_security_object(Fd, Header, Compression, Options) ->
    ExistingPtr = couch_bt_engine_header:get(Header, security_ptr),
    case is_integer(ExistingPtr) of
        true ->
            Header;
        false ->
            Default = couch_util:get_value(default_security_object, Options),
            {ok, NewPtr, _} =
                couch_file:append_term(Fd, Default, [{compression, Compression}]),
            couch_bt_engine_header:set(Header, security_ptr, NewPtr)
    end.
| |
| |
% Deletes the compaction files of FilePath, using the configured
% database_dir (default ".") as root and the compaction context.
delete_compaction_files(FilePath) ->
    DbDir = config:get("couchdb", "database_dir", "."),
    delete_compaction_files(DbDir, FilePath, [{context, compaction}]).
| |
| |
% Converts an on-disk revision tree into one with #leaf{} values.
% Disk values with 3, 4 or 5 elements are all accepted; missing
% revisions are passed through unchanged.
rev_tree(DiskTree) ->
    ToLeaf = fun
        (_RevId, {Del, Ptr, Seq}) ->
            #leaf{
                deleted = ?i2b(Del),
                ptr = Ptr,
                seq = Seq
            };
        (_RevId, {Del, Ptr, Seq, Size}) ->
            #leaf{
                deleted = ?i2b(Del),
                ptr = Ptr,
                seq = Seq,
                sizes = couch_db_updater:upgrade_sizes(Size)
            };
        (_RevId, {Del, Ptr, Seq, Sizes, Atts}) ->
            #leaf{
                deleted = ?i2b(Del),
                ptr = Ptr,
                seq = Seq,
                sizes = couch_db_updater:upgrade_sizes(Sizes),
                atts = Atts
            };
        (_RevId, ?REV_MISSING) ->
            ?REV_MISSING
    end,
    couch_key_tree:map(ToLeaf, DiskTree).
| |
| |
% Converts a revision tree with #leaf{} values into its on-disk form,
% where each leaf becomes a 5-tuple. Missing revisions are passed
% through unchanged.
disk_tree(RevTree) ->
    ToDisk = fun
        (_RevId, ?REV_MISSING) ->
            ?REV_MISSING;
        (_RevId, #leaf{
            deleted = Del,
            ptr = Ptr,
            seq = Seq,
            sizes = Sizes,
            atts = Atts
        }) ->
            {?b2i(Del), Ptr, Seq, split_sizes(Sizes), Atts}
    end,
    couch_key_tree:map(ToDisk, RevTree).
| |
| |
% Flattens a #size_info{} into an {Active, External} pair.
split_sizes(#size_info{active = Active, external = External}) ->
    {Active, External}.
| |
| |
% Builds a #size_info{} from an {Active, External} pair of integers.
join_sizes({Active, External})
        when is_integer(Active) andalso is_integer(External) ->
    #size_info{active = Active, external = External}.
| |
| |
% Adds two size values. nil on either side yields nil. Anything that
% is not already a #size_info{} is upgraded first.
reduce_sizes(nil, _) ->
    nil;
reduce_sizes(_, nil) ->
    nil;
reduce_sizes(#size_info{} = Left, #size_info{} = Right) ->
    ActiveSum = Left#size_info.active + Right#size_info.active,
    ExternalSum = Left#size_info.external + Right#size_info.external,
    #size_info{active = ActiveSum, external = ExternalSum};
reduce_sizes(Left, Right) ->
    UpgradedLeft = couch_db_updater:upgrade_sizes(Left),
    UpgradedRight = couch_db_updater:upgrade_sizes(Right),
    reduce_sizes(UpgradedLeft, UpgradedRight).
| |
| |
% Adds the sizes of the id, seq and local trees to the active size in
% SI. The result is null as soon as any tree reports a nil size.
active_size(#st{} = St, #size_info{} = SI) ->
    AddTreeSize = fun(Tree, Acc) ->
        TreeSize = couch_btree:size(Tree),
        if
            Acc == null -> null;
            TreeSize =:= nil -> null;
            true -> Acc + TreeSize
        end
    end,
    Trees = [St#st.id_tree, St#st.seq_tree, St#st.local_tree],
    lists:foldl(AddTreeSize, SI#size_info.active, Trees).
| |
| |
% Shared fold over the id tree or the local tree.
%
% Options may contain include_deleted (also visit deleted docs) and
% include_reductions (pass reductions to UserFun and return a count).
% Returns {ok, Acc}, or {ok, Count, Acc} with include_reductions.
fold_docs_int(St, Tree, UserFun, UserAcc, Options) ->
    DeletedFilter = case lists:member(include_deleted, Options) of
        true -> fun include_deleted/4;
        false -> fun skip_deleted/4
    end,
    WantReductions = lists:member(include_reductions, Options),
    RedFun = case WantReductions of
        true -> fun include_reductions/4;
        false -> fun drop_reductions/4
    end,
    % The filter calls RedFun, which in turn calls UserFun.
    Wrapped = {RedFun, {UserFun, UserAcc}},
    {ok, Reds, {_, {_, FinalUserAcc}}} =
        couch_btree:fold(Tree, DeletedFilter, Wrapped, Options),
    case WantReductions of
        true when Tree == St#st.id_tree ->
            {ok, fold_docs_reduce_to_count(Reds), FinalUserAcc};
        true when Tree == St#st.local_tree ->
            {ok, 0, FinalUserAcc};
        false ->
            {ok, FinalUserAcc}
    end.
| |
| |
% Fold callback that forwards every call to the wrapped fun and
% threads its accumulator through.
include_deleted(Case, Entry, Reds, {InnerFun, InnerAcc}) ->
    {Go, NextAcc} = InnerFun(Case, Entry, Reds, InnerAcc),
    {Go, {InnerFun, NextAcc}}.
| |
| |
% Fold callback that hides deleted documents from the wrapped fun.
%
% The first element of the reductions is the total number of
% undeleted documents, so a subtree with a zero count is skipped.
skip_deleted(traverse, _Entry, {0, _, _}, Acc) ->
    {skip, Acc};
skip_deleted(visit, #full_doc_info{deleted = true}, _Reds, Acc) ->
    {ok, Acc};
skip_deleted(Case, Entry, Reds, {InnerFun, InnerAcc}) ->
    {Go, NextAcc} = InnerFun(Case, Entry, Reds, InnerAcc),
    {Go, {InnerFun, NextAcc}}.
| |
| |
% On visit, calls UserFun with the entry, the reductions and the user
% accumulator. Every other fold step continues with Acc untouched.
include_reductions(visit, FDI, Reds, {UserFun, UserAcc}) ->
    {Go, NextUserAcc} = UserFun(FDI, Reds, UserAcc),
    {Go, {UserFun, NextUserAcc}};
include_reductions(_Case, _Entry, _Reds, Acc) ->
    {ok, Acc}.
| |
| |
% On visit, calls UserFun with the entry and the user accumulator,
% leaving the reductions out. Every other fold step continues with
% Acc untouched.
drop_reductions(visit, FDI, _Reds, {UserFun, UserAcc}) ->
    {Go, NextUserAcc} = UserFun(FDI, UserAcc),
    {Go, {UserFun, NextUserAcc}};
drop_reductions(_Case, _Entry, _Reds, Acc) ->
    {ok, Acc}.
| |
| |
% Finalises partial id tree reductions and returns the first element
% of the result, i.e. the count of non-deleted documents.
fold_docs_reduce_to_count(Reds) ->
    Final = couch_btree:final_reduce(fun id_tree_reduce/2, Reds),
    element(1, Final).
| |
| |
% Replaces the database file with the compacted one.
%
% Local docs are copied into the new file, the new header is
% committed, and then the files are swapped on disk. Returns
% {ok, NewState, undefined} with NewState pointing at FilePath.
finish_compaction_int(#st{} = OldSt, #st{} = NewSt1) ->
    #st{
        filepath = FilePath,
        local_tree = OldLocal
    } = OldSt,
    #st{
        filepath = CompactDataPath,
        header = Header,
        local_tree = NewLocal1
    } = NewSt1,

    % Load all local docs into memory and write them to the new db.
    Collect = fun(Value, _Offset, Acc) ->
        {ok, [Value | Acc]}
    end,
    {ok, _, LocalDocs} = couch_btree:foldl(OldLocal, Collect, []),
    {ok, NewLocal2} = couch_btree:add(NewLocal1, LocalDocs),

    % Carry the old update seq (as compacted_seq) and revs limit over.
    CarriedOver = [
        {compacted_seq, get_update_seq(OldSt)},
        {revs_limit, get_revs_limit(OldSt)}
    ],
    NewHeader = couch_bt_engine_header:set(Header, CarriedOver),
    {ok, NewSt2} = commit_data(NewSt1#st{
        header = NewHeader,
        local_tree = NewLocal2
    }),

    % Rename our *.compact.data file to *.compact so that if we
    % die between deleting the old file and renaming *.compact
    % we can recover correctly.
    CompactPath = FilePath ++ ".compact",
    ok = file:rename(CompactDataPath, CompactPath),

    % Remove the uncompacted database file.
    RootDir = config:get("couchdb", "database_dir", "."),
    couch_file:delete(RootDir, FilePath),

    % Move the compacted file into its final location.
    ok = file:rename(CompactPath, FilePath),

    % The meta compaction file is only deleted after the compaction
    % file has been promoted.
    couch_file:delete(RootDir, FilePath ++ ".compact.meta"),

    % We're finished with our old state.
    decref(OldSt),

    {ok, NewSt2#st{filepath = FilePath}, undefined}.