% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_db_updater).
-behaviour(gen_server).
-vsn(1).
-export([btree_by_id_split/1, btree_by_id_join/2, btree_by_id_reduce/2]).
-export([btree_by_seq_split/1, btree_by_seq_join/2, btree_by_seq_reduce/2]).
-export([make_doc_summary/2]).
-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
-include_lib("couch/include/couch_db.hrl").
-record(comp_header, {
db_header,
meta_state
}).
-record(merge_st, {
id_tree,
seq_tree,
curr,
rem_seqs,
infos
}).
init({DbName, Filepath, Fd, Options}) ->
erlang:put(io_priority, {db_update, DbName}),
case lists:member(create, Options) of
true ->
% create a new header and write it to the file
Header = couch_db_header:new(),
ok = couch_file:write_header(Fd, Header),
% delete any old compaction files that might be hanging around
RootDir = config:get("couchdb", "database_dir", "."),
couch_file:delete(RootDir, Filepath ++ ".compact"),
couch_file:delete(RootDir, Filepath ++ ".compact.data"),
couch_file:delete(RootDir, Filepath ++ ".compact.meta");
false ->
case couch_file:read_header(Fd) of
{ok, Header} ->
ok;
no_valid_header ->
% create a new header and write it to the file
Header = couch_db_header:new(),
ok = couch_file:write_header(Fd, Header),
% delete any old compaction files that might be hanging around
file:delete(Filepath ++ ".compact"),
file:delete(Filepath ++ ".compact.data"),
file:delete(Filepath ++ ".compact.meta")
end
end,
Db = init_db(DbName, Filepath, Fd, Header, Options),
case lists:member(sys_db, Options) of
false ->
couch_stats_process_tracker:track([couchdb, open_databases]);
true ->
ok
end,
% We don't load validation funs here because the fabric query is prone to
% race conditions. Instead, see couch_db:validate_doc_update, which loads
% them lazily.
{ok, Db#db{main_pid = self()}}.
terminate(_Reason, Db) ->
% If the reason we died is that our fd disappeared,
% we don't need to try closing it again.
if Db#db.fd_monitor == closed -> ok; true ->
ok = couch_file:close(Db#db.fd)
end,
couch_util:shutdown_sync(Db#db.compactor_pid),
couch_util:shutdown_sync(Db#db.fd),
ok.
handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db};
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db}; % no data waiting, return ok immediately
handle_call(full_commit, _From, Db) ->
{reply, ok, commit_data(Db)};
handle_call({full_commit, RequiredSeq}, _From, Db)
when RequiredSeq =< Db#db.committed_update_seq ->
{reply, ok, Db};
handle_call({full_commit, _}, _, Db) ->
{reply, ok, commit_data(Db)}; % commit the data and return ok
handle_call(start_compact, _From, Db) ->
{noreply, NewDb} = handle_cast(start_compact, Db),
{reply, {ok, NewDb#db.compactor_pid}, NewDb};
handle_call(compactor_pid, _From, #db{compactor_pid = Pid} = Db) ->
{reply, Pid, Db};
handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) ->
{reply, ok, Db};
handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
unlink(Pid),
exit(Pid, kill),
RootDir = config:get("couchdb", "database_dir", "."),
ok = couch_file:delete(RootDir, Db#db.filepath ++ ".compact"),
Db2 = Db#db{compactor_pid = nil},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
couch_event:notify(Db#db.name, updated),
{reply, {ok, Db2#db.update_seq}, Db2};
handle_call({set_security, NewSec}, _From, #db{compression = Comp} = Db) ->
{ok, Ptr, _} = couch_file:append_term(
Db#db.fd, NewSec, [{compression, Comp}]),
Db2 = commit_data(Db#db{security=NewSec, security_ptr=Ptr,
update_seq=Db#db.update_seq+1}),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
handle_call({set_revs_limit, Limit}, _From, Db) ->
Db2 = commit_data(Db#db{revs_limit=Limit,
update_seq=Db#db.update_seq+1}),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2};
handle_call({purge_docs, _IdRevs}, _From,
#db{compactor_pid=Pid}=Db) when Pid /= nil ->
{reply, {error, purge_during_compaction}, Db};
handle_call({purge_docs, IdRevs}, _From, Db) ->
#db{
fd = Fd,
id_tree = DocInfoByIdBTree,
seq_tree = DocInfoBySeqBTree,
update_seq = LastSeq,
header = Header,
compression = Comp
} = Db,
DocLookups = couch_btree:lookup(DocInfoByIdBTree,
[Id || {Id, _Revs} <- IdRevs]),
NewDocInfos = lists:zipwith(
fun({_Id, Revs}, {ok, #full_doc_info{rev_tree=Tree}=FullDocInfo}) ->
case couch_key_tree:remove_leafs(Tree, Revs) of
{_, []=_RemovedRevs} -> % no change
nil;
{NewTree, RemovedRevs} ->
{FullDocInfo#full_doc_info{rev_tree=NewTree},RemovedRevs}
end;
(_, not_found) ->
nil
end,
IdRevs, DocLookups),
SeqsToRemove = [Seq
|| {#full_doc_info{update_seq=Seq},_} <- NewDocInfos],
FullDocInfoToUpdate = [FullInfo
|| {#full_doc_info{rev_tree=Tree}=FullInfo,_}
<- NewDocInfos, Tree /= []],
IdRevsPurged = [{Id, Revs}
|| {#full_doc_info{id=Id}, Revs} <- NewDocInfos],
{DocInfoToUpdate, NewSeq} = lists:mapfoldl(
fun(#full_doc_info{rev_tree=Tree}=FullInfo, SeqAcc) ->
Tree2 = couch_key_tree:map_leafs(
fun(_RevId, Leaf) ->
Leaf#leaf{seq=SeqAcc+1}
end, Tree),
{FullInfo#full_doc_info{rev_tree=Tree2}, SeqAcc + 1}
end, LastSeq, FullDocInfoToUpdate),
IdsToRemove = [Id || {#full_doc_info{id=Id,rev_tree=[]},_}
<- NewDocInfos],
{ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree,
DocInfoToUpdate, SeqsToRemove),
{ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree,
FullDocInfoToUpdate, IdsToRemove),
{ok, Pointer, _} = couch_file:append_term(
Fd, IdRevsPurged, [{compression, Comp}]),
NewHeader = couch_db_header:set(Header, [
{purge_seq, couch_db_header:purge_seq(Header) + 1},
{purged_docs, Pointer}
]),
Db2 = commit_data(
Db#db{
id_tree = DocInfoByIdBTree2,
seq_tree = DocInfoBySeqBTree2,
update_seq = NewSeq + 1,
header=NewHeader}),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
couch_event:notify(Db#db.name, updated),
{reply, {ok, couch_db_header:purge_seq(NewHeader), IdRevsPurged}, Db2}.
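% Summary of the purge path above (descriptive only, derived from the code):
% the requested leaf revisions are removed from each doc's rev tree, docs
% that still have revisions left are re-stamped with fresh update_seqs so
% the change is visible in the seq_tree, docs whose trees became empty are
% dropped from the id tree, and the purged {Id, Revs} pairs are appended to
% the file and referenced from the header next to an incremented purge_seq.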
handle_cast({load_validation_funs, ValidationFuns}, Db) ->
Db2 = Db#db{validate_doc_funs = ValidationFuns},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{noreply, Db2};
handle_cast(start_compact, Db) ->
case Db#db.compactor_pid of
nil ->
couch_log:info("Starting compaction for db \"~s\"", [Db#db.name]),
Pid = spawn_link(fun() -> start_copy_compact(Db) end),
Db2 = Db#db{compactor_pid=Pid},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{noreply, Db2};
_ ->
% a compaction is already running, so this is a no-op
{noreply, Db}
end;
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{ok, NewFd} = couch_file:open(CompactFilepath),
{ok, NewHeader0} = couch_file:read_header(NewFd),
NewHeader = couch_db_header:set(NewHeader0, [
{compacted_seq, Db#db.update_seq}
]),
#db{update_seq=NewSeq} = NewDb =
init_db(Db#db.name, Filepath, NewFd, NewHeader, Db#db.options),
unlink(NewFd),
case Db#db.update_seq == NewSeq of
true ->
% suck up all the local docs into memory and write them to the new db
{ok, _, LocalDocs} = couch_btree:foldl(Db#db.local_tree,
fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
{ok, NewLocalBtree} = couch_btree:add(NewDb#db.local_tree, LocalDocs),
NewDb2 = commit_data(NewDb#db{
local_tree = NewLocalBtree,
main_pid = self(),
filepath = Filepath,
instance_start_time = Db#db.instance_start_time,
revs_limit = Db#db.revs_limit
}),
couch_log:debug("CouchDB swapping files ~s and ~s.",
[Filepath, CompactFilepath]),
ok = file:rename(CompactFilepath, Filepath ++ ".compact"),
RootDir = config:get("couchdb", "database_dir", "."),
couch_file:delete(RootDir, Filepath),
ok = file:rename(Filepath ++ ".compact", Filepath),
% Delete the old meta compaction file after promoting
% the compaction file.
couch_file:delete(RootDir, Filepath ++ ".compact.meta"),
close_db(Db),
NewDb3 = refresh_validate_doc_funs(NewDb2),
ok = gen_server:call(couch_server, {db_updated, NewDb3}, infinity),
couch_event:notify(NewDb3#db.name, compacted),
couch_log:info("Compaction for db \"~s\" completed.", [Db#db.name]),
{noreply, NewDb3#db{compactor_pid=nil}};
false ->
couch_log:info("Compaction file still behind main file "
"(update seq=~p. compact update seq=~p). Retrying.",
[Db#db.update_seq, NewSeq]),
close_db(NewDb),
Pid = spawn_link(fun() -> start_copy_compact(Db) end),
Db2 = Db#db{compactor_pid=Pid},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{noreply, Db2}
end;
handle_cast(Msg, #db{name = Name} = Db) ->
couch_log:error("Database `~s` updater received unexpected cast: ~p",
[Name, Msg]),
{stop, Msg, Db}.
handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
FullCommit}, Db) ->
GroupedDocs2 = sort_and_tag_grouped_docs(Client, GroupedDocs),
if NonRepDocs == [] ->
{GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
[Client], MergeConflicts, FullCommit);
true ->
GroupedDocs3 = GroupedDocs2,
FullCommit2 = FullCommit,
Clients = [Client]
end,
NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
FullCommit2) of
{ok, Db2, UpdatedDDocIds} ->
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
if Db2#db.update_seq /= Db#db.update_seq ->
couch_event:notify(Db2#db.name, updated);
true -> ok
end,
if NonRepDocs2 /= [] ->
couch_event:notify(Db2#db.name, local_updated);
true -> ok
end,
[catch(ClientPid ! {done, self()}) || ClientPid <- Clients],
Db3 = case length(UpdatedDDocIds) > 0 of
true ->
% Ken and ddoc_cache are the only things that
% use the unspecified ddoc_updated message. We
% should update them to use the new message per
% ddoc.
lists:foreach(fun(DDocId) ->
couch_event:notify(Db2#db.name, {ddoc_updated, DDocId})
end, UpdatedDDocIds),
couch_event:notify(Db2#db.name, ddoc_updated),
ddoc_cache:evict(Db2#db.name, UpdatedDDocIds),
refresh_validate_doc_funs(Db2);
false ->
Db2
end,
{noreply, Db3, hibernate}
catch
throw: retry ->
[catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
{noreply, Db, hibernate}
end;
handle_info(delayed_commit, #db{waiting_delayed_commit=nil}=Db) ->
% no outstanding delayed commits, ignore
{noreply, Db};
handle_info(delayed_commit, Db) ->
case commit_data(Db) of
Db ->
{noreply, Db};
Db2 ->
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{noreply, Db2}
end;
handle_info({'EXIT', _Pid, normal}, Db) ->
{noreply, Db};
handle_info({'EXIT', _Pid, Reason}, Db) ->
{stop, Reason, Db};
handle_info({'DOWN', Ref, _, _, Reason}, #db{fd_monitor=Ref, name=Name} = Db) ->
couch_log:error("DB ~s shutting down - Fd ~p", [Name, Reason]),
{stop, normal, Db#db{fd=undefined, fd_monitor=closed}}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
sort_and_tag_grouped_docs(Client, GroupedDocs) ->
% These groups should already be sorted but sometimes clients misbehave.
% The merge_updates function will fail and the database can end up with
% duplicate documents if the incoming groups are not sorted, so as a sanity
% check we sort them again here. See COUCHDB-2735.
Cmp = fun([#doc{id=A}|_], [#doc{id=B}|_]) -> A < B end,
lists:map(fun(DocGroup) ->
[{Client, maybe_tag_doc(D)} || D <- DocGroup]
end, lists:sort(Cmp, GroupedDocs)).
maybe_tag_doc(#doc{id=Id, revs={Pos,[_Rev|PrevRevs]}, meta=Meta0}=Doc) ->
case lists:keymember(ref, 1, Meta0) of
true ->
Doc;
false ->
Key = {Id, {Pos-1, PrevRevs}},
Doc#doc{meta=[{ref, Key} | Meta0]}
end.
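% Rough sketch of the tagging above (values are illustrative): a doc posted
% with revs {2, [<<"b">>, <<"a">>]} is tagged with the key of the revision
% it extends, so doc_tag/1 and send_result/3 can later route the result for
% exactly this request back to the client:
%
%     Doc0 = #doc{id = <<"foo">>, revs = {2, [<<"b">>, <<"a">>]}, meta = []},
%     #doc{meta = [{ref, {<<"foo">>, {1, [<<"a">>]}}}]} = maybe_tag_doc(Doc0).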
merge_updates([[{_,#doc{id=X}}|_]=A|RestA], [[{_,#doc{id=X}}|_]=B|RestB]) ->
[A++B | merge_updates(RestA, RestB)];
merge_updates([[{_,#doc{id=X}}|_]|_]=A, [[{_,#doc{id=Y}}|_]|_]=B) when X < Y ->
[hd(A) | merge_updates(tl(A), B)];
merge_updates([[{_,#doc{id=X}}|_]|_]=A, [[{_,#doc{id=Y}}|_]|_]=B) when X > Y ->
[hd(B) | merge_updates(A, tl(B))];
merge_updates([], RestB) ->
RestB;
merge_updates(RestA, []) ->
RestA.
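% For illustration only (client pids abbreviated as c1/c2): merge_updates/2
% zips two id-sorted lists of update groups, concatenating groups that target
% the same document id so one merge_rev_trees pass sees every edit for it:
%
%     A = [[{c1, #doc{id = <<"a">>}}], [{c1, #doc{id = <<"c">>}}]],
%     B = [[{c2, #doc{id = <<"a">>}}], [{c2, #doc{id = <<"b">>}}]],
%     [[{c1, _}, {c2, _}], [{c2, _}], [{c1, _}]] = merge_updates(A, B).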
collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
receive
% Only collect updates with the same MergeConflicts flag and without
% local docs. It's easier to just avoid multiple _local doc
% updaters than to deal with their possible conflicts, and local doc
% writes are relatively rare. This can be optimized later if really needed.
{update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} ->
GroupedDocs2 = sort_and_tag_grouped_docs(Client, GroupedDocs),
GroupedDocsAcc2 =
merge_updates(GroupedDocsAcc, GroupedDocs2),
collect_updates(GroupedDocsAcc2, [Client | ClientsAcc],
MergeConflicts, (FullCommit or FullCommit2))
after 0 ->
{GroupedDocsAcc, ClientsAcc, FullCommit}
end.
rev_tree(DiskTree) ->
couch_key_tree:map(fun
(_RevId, {Del, Ptr, Seq}) ->
#leaf{
deleted = ?i2b(Del),
ptr = Ptr,
seq = Seq
};
(_RevId, {Del, Ptr, Seq, Size}) ->
#leaf{
deleted = ?i2b(Del),
ptr = Ptr,
seq = Seq,
sizes = upgrade_sizes(Size)
};
(_RevId, {Del, Ptr, Seq, Sizes, Atts}) ->
#leaf{
deleted = ?i2b(Del),
ptr = Ptr,
seq = Seq,
sizes = upgrade_sizes(Sizes),
atts = Atts
};
(_RevId, ?REV_MISSING) ->
?REV_MISSING
end, DiskTree).
disk_tree(RevTree) ->
couch_key_tree:map(fun
(_RevId, ?REV_MISSING) ->
?REV_MISSING;
(_RevId, #leaf{} = Leaf) ->
#leaf{
deleted = Del,
ptr = Ptr,
seq = Seq,
sizes = Sizes,
atts = Atts
} = Leaf,
{?b2i(Del), Ptr, Seq, split_sizes(Sizes), Atts}
end, RevTree).
upgrade_sizes(#size_info{}=SI) ->
SI;
upgrade_sizes({D, E}) ->
#size_info{active=D, external=E};
upgrade_sizes(S) when is_integer(S) ->
#size_info{active=S, external=0}.
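% upgrade_sizes/1 normalizes every historical on-disk size format to a
% #size_info{} record (numbers below are made up):
%
%     upgrade_sizes(#size_info{active = 1, external = 2})  % returned as-is
%     upgrade_sizes({1, 2})  % -> #size_info{active = 1, external = 2}
%     upgrade_sizes(3)       % -> #size_info{active = 3, external = 0}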
split_sizes(#size_info{}=SI) ->
{SI#size_info.active, SI#size_info.external}.
join_sizes({Active, External}) when is_integer(Active), is_integer(External) ->
#size_info{active=Active, external=External}.
btree_by_seq_split(#full_doc_info{}=Info) ->
#full_doc_info{
id = Id,
update_seq = Seq,
deleted = Del,
sizes = SizeInfo,
rev_tree = Tree
} = Info,
{Seq, {Id, ?b2i(Del), split_sizes(SizeInfo), disk_tree(Tree)}}.
btree_by_seq_join(Seq, {Id, Del, DiskTree}) when is_integer(Del) ->
btree_by_seq_join(Seq, {Id, Del, {0, 0}, DiskTree});
btree_by_seq_join(Seq, {Id, Del, Sizes, DiskTree}) when is_integer(Del) ->
#full_doc_info{
id = Id,
update_seq = Seq,
deleted = ?i2b(Del),
sizes = join_sizes(Sizes),
rev_tree = rev_tree(DiskTree)
};
btree_by_seq_join(KeySeq, {Id, RevInfos, DeletedRevInfos}) ->
% Older versions stored #doc_info records in the seq_tree.
% Compact to upgrade.
#doc_info{
id = Id,
high_seq=KeySeq,
revs =
[#rev_info{rev=Rev,seq=Seq,deleted=false,body_sp = Bp} ||
{Rev, Seq, Bp} <- RevInfos] ++
[#rev_info{rev=Rev,seq=Seq,deleted=true,body_sp = Bp} ||
{Rev, Seq, Bp} <- DeletedRevInfos]}.
btree_by_id_split(#full_doc_info{}=Info) ->
#full_doc_info{
id = Id,
update_seq = Seq,
deleted = Deleted,
sizes = SizeInfo,
rev_tree = Tree
} = Info,
{Id, {Seq, ?b2i(Deleted), split_sizes(SizeInfo), disk_tree(Tree)}}.
% Handle old formats before data_size was added
btree_by_id_join(Id, {HighSeq, Deleted, DiskTree}) ->
btree_by_id_join(Id, {HighSeq, Deleted, #size_info{}, DiskTree});
btree_by_id_join(Id, {HighSeq, Deleted, Sizes, DiskTree}) ->
#full_doc_info{
id = Id,
update_seq = HighSeq,
deleted = ?i2b(Deleted),
sizes = upgrade_sizes(Sizes),
rev_tree = rev_tree(DiskTree)
}.
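% Illustrative by_id values (seqs, sizes and DiskTree are made up/elided):
% the 3-tuple is the pre-data_size on-disk format and is padded with a
% default #size_info{} on read, while the 4-tuple is what the current
% btree_by_id_split/1 writes:
%
%     btree_by_id_join(<<"doc">>, {10, 0, DiskTree})
%     btree_by_id_join(<<"doc">>, {10, 0, {400, 200}, DiskTree})
%
% Both return a #full_doc_info{} with deleted = false and the rev tree
% rebuilt from its disk representation via rev_tree/1.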
btree_by_id_reduce(reduce, FullDocInfos) ->
lists:foldl(
fun(Info, {NotDeleted, Deleted, Sizes}) ->
Sizes2 = reduce_sizes(Sizes, Info#full_doc_info.sizes),
case Info#full_doc_info.deleted of
true ->
{NotDeleted, Deleted + 1, Sizes2};
false ->
{NotDeleted + 1, Deleted, Sizes2}
end
end,
{0, 0, #size_info{}}, FullDocInfos);
btree_by_id_reduce(rereduce, Reds) ->
lists:foldl(
fun({NotDeleted, Deleted}, {AccNotDeleted, AccDeleted, _AccSizes}) ->
% pre 1.2 format, will be upgraded on compaction
{AccNotDeleted + NotDeleted, AccDeleted + Deleted, nil};
({NotDeleted, Deleted, Sizes}, {AccNotDeleted, AccDeleted, AccSizes}) ->
AccSizes2 = reduce_sizes(AccSizes, Sizes),
{AccNotDeleted + NotDeleted, AccDeleted + Deleted, AccSizes2}
end,
{0, 0, #size_info{}}, Reds).
reduce_sizes(nil, _) ->
nil;
reduce_sizes(_, nil) ->
nil;
reduce_sizes(#size_info{}=S1, #size_info{}=S2) ->
#size_info{
active = S1#size_info.active + S2#size_info.active,
external = S1#size_info.external + S2#size_info.external
};
reduce_sizes(S1, S2) ->
reduce_sizes(upgrade_sizes(S1), upgrade_sizes(S2)).
btree_by_seq_reduce(reduce, DocInfos) ->
% count the number of documents
length(DocInfos);
btree_by_seq_reduce(rereduce, Reds) ->
lists:sum(Reds).
init_db(DbName, Filepath, Fd, Header0, Options) ->
Header = couch_db_header:upgrade(Header0),
{ok, FsyncOptions} = couch_util:parse_term(
config:get("couchdb", "fsync_options",
"[before_header, after_header, on_file_open]")),
case lists:member(on_file_open, FsyncOptions) of
true -> ok = couch_file:sync(Fd);
_ -> ok
end,
Compression = couch_compress:get_compression_method(),
IdTreeState = couch_db_header:id_tree_state(Header),
SeqTreeState = couch_db_header:seq_tree_state(Header),
LocalTreeState = couch_db_header:local_tree_state(Header),
{ok, IdBtree} = couch_btree:open(IdTreeState, Fd,
[{split, fun ?MODULE:btree_by_id_split/1},
{join, fun ?MODULE:btree_by_id_join/2},
{reduce, fun ?MODULE:btree_by_id_reduce/2},
{compression, Compression}]),
{ok, SeqBtree} = couch_btree:open(SeqTreeState, Fd,
[{split, fun ?MODULE:btree_by_seq_split/1},
{join, fun ?MODULE:btree_by_seq_join/2},
{reduce, fun ?MODULE:btree_by_seq_reduce/2},
{compression, Compression}]),
{ok, LocalDocsBtree} = couch_btree:open(LocalTreeState, Fd,
[{compression, Compression}]),
case couch_db_header:security_ptr(Header) of
nil ->
Security = default_security_object(DbName),
SecurityPtr = nil;
SecurityPtr ->
{ok, Security} = couch_file:pread_term(Fd, SecurityPtr)
end,
% convert start time tuple to microsecs and store as a binary string
{MegaSecs, Secs, MicroSecs} = os:timestamp(),
StartTime = ?l2b(io_lib:format("~p",
[(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),
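% For example (made-up timestamp): {1500, 123456, 789} from os:timestamp()
% becomes the binary <<"1500123456000789">>, i.e. microseconds since the
% Unix epoch rendered as a decimal string.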
ok = couch_file:set_db_pid(Fd, self()),
Db = #db{
fd=Fd,
fd_monitor = erlang:monitor(process, Fd),
header=Header,
id_tree = IdBtree,
seq_tree = SeqBtree,
local_tree = LocalDocsBtree,
committed_update_seq = couch_db_header:update_seq(Header),
update_seq = couch_db_header:update_seq(Header),
name = DbName,
filepath = Filepath,
security = Security,
security_ptr = SecurityPtr,
instance_start_time = StartTime,
revs_limit = couch_db_header:revs_limit(Header),
fsync_options = FsyncOptions,
options = Options,
compression = Compression,
before_doc_update = couch_util:get_value(before_doc_update, Options, nil),
after_doc_read = couch_util:get_value(after_doc_read, Options, nil)
},
% If we just created a new UUID while upgrading a
% database, we want to flush that to disk. Otherwise
% we risk sending out the uuid and then having the db
% crash, which would result in it generating a new
% uuid each time it was reopened.
case Header /= Header0 of
true ->
sync_header(Db, Header);
false ->
Db
end.
close_db(#db{fd_monitor = Ref}) ->
erlang:demonitor(Ref).
refresh_validate_doc_funs(#db{name = <<"shards/", _/binary>> = Name} = Db) ->
spawn(fabric, reset_validation_funs, [mem3:dbname(Name)]),
Db#db{validate_doc_funs = undefined};
refresh_validate_doc_funs(Db0) ->
Db = Db0#db{user_ctx=?ADMIN_USER},
{ok, DesignDocs} = couch_db:get_design_docs(Db),
ProcessDocFuns = lists:flatmap(
fun(DesignDocInfo) ->
{ok, DesignDoc} = couch_db:open_doc_int(
Db, DesignDocInfo, [ejson_body]),
case couch_doc:get_validate_doc_fun(DesignDoc) of
nil -> [];
Fun -> [Fun]
end
end, DesignDocs),
Db#db{validate_doc_funs=ProcessDocFuns}.
% rev tree functions
flush_trees(_Db, [], AccFlushedTrees) ->
{ok, lists:reverse(AccFlushedTrees)};
flush_trees(#db{fd = Fd} = Db,
[InfoUnflushed | RestUnflushed], AccFlushed) ->
#full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
{Flushed, FinalAcc} = couch_key_tree:mapfold(
fun(_Rev, Value, Type, SizesAcc) ->
case Value of
#doc{deleted = IsDeleted, body = {summary, _, _, _} = DocSummary} ->
{summary, Summary, AttSizeInfo, AttsFd} = DocSummary,
% This node value is actually an unwritten document summary,
% so write it to disk.
% Make sure the Fd in the written bins is the same Fd we are
% using, and convert the bins, removing the Fd.
% All bins should have been written to disk already.
case {AttsFd, Fd} of
{nil, _} ->
ok;
{SameFd, SameFd} ->
ok;
_ ->
% The Fd the attachments were written to is not the same
% as our Fd. This can happen when a database is being
% switched out during a compaction.
couch_log:debug("File where the attachments are written has"
" changed. Possibly retrying.", []),
throw(retry)
end,
ExternalSize = ?term_size(Summary),
{ok, NewSummaryPointer, SummarySize} =
couch_file:append_raw_chunk(Fd, Summary),
Leaf = #leaf{
deleted = IsDeleted,
ptr = NewSummaryPointer,
seq = UpdateSeq,
sizes = #size_info{
active = SummarySize,
external = ExternalSize
},
atts = AttSizeInfo
},
{Leaf, add_sizes(Type, Leaf, SizesAcc)};
#leaf{} ->
{Value, add_sizes(Type, Value, SizesAcc)};
_ ->
{Value, SizesAcc}
end
end, {0, 0, []}, Unflushed),
{FinalAS, FinalES, FinalAtts} = FinalAcc,
TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
NewInfo = InfoUnflushed#full_doc_info{
rev_tree = Flushed,
sizes = #size_info{
active = FinalAS + TotalAttSize,
external = FinalES + TotalAttSize
}
},
flush_trees(Db, RestUnflushed, [NewInfo | AccFlushed]).
add_sizes(Type, #leaf{sizes=Sizes, atts=AttSizes}, Acc) ->
% Maybe upgrade from disk_size only
#size_info{
active = ActiveSize,
external = ExternalSize
} = upgrade_sizes(Sizes),
{ASAcc, ESAcc, AttsAcc} = Acc,
NewASAcc = ActiveSize + ASAcc,
NewESAcc = ESAcc + if Type == leaf -> ExternalSize; true -> 0 end,
NewAttsAcc = lists:umerge(AttSizes, AttsAcc),
{NewASAcc, NewESAcc, NewAttsAcc}.
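% A minimal sketch of the sizes accumulator threaded through flush_trees/3
% and copy_docs/4 (numbers are made up): {ActiveAcc, ExternalAcc, AttSizes},
% where AttSizes stays a sorted, deduplicated list of attachment size pairs
% so shared attachments are only counted once:
%
%     add_sizes(leaf, #leaf{sizes = {10, 25}, atts = [{Ptr1, 7}]}, {0, 0, []})
%     % -> {10, 25, [{Ptr1, 7}]}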
send_result(Client, Doc, NewResult) ->
% used to send a result to the client
catch(Client ! {result, self(), {doc_tag(Doc), NewResult}}).
doc_tag(#doc{meta=Meta}) ->
case lists:keyfind(ref, 1, Meta) of
{ref, Ref} -> Ref;
false -> throw(no_doc_tag);
Else -> throw({invalid_doc_tag, Else})
end.
merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
{ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
[OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
erlang:put(last_id_merged, OldDocInfo#full_doc_info.id), % for debugging
NewDocInfo0 = lists:foldl(fun({Client, NewDoc}, OldInfoAcc) ->
merge_rev_tree(OldInfoAcc, NewDoc, Client, Limit, MergeConflicts)
end, OldDocInfo, NewDocs),
% When MergeConflicts is false, we update #full_doc_info.deleted on every
% iteration of merge_rev_tree. However, merge_rev_tree does not update
% #full_doc_info.deleted when MergeConflicts is true, since we don't need
% to know whether the doc is deleted between iterations. Since we still
% need to know if the doc is deleted after the merge happens, we have to
% set it here.
NewDocInfo1 = case MergeConflicts of
true ->
NewDocInfo0#full_doc_info{
deleted = couch_doc:is_deleted(NewDocInfo0)
};
false ->
NewDocInfo0
end,
if NewDocInfo1 == OldDocInfo ->
% nothing changed
merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
AccNewInfos, AccRemoveSeqs, AccSeq);
true ->
% We have updated the document, so give it a new update_seq. It's
% important to note that the update_seq on OldDocInfo should
% be identical to the value on NewDocInfo1.
OldSeq = OldDocInfo#full_doc_info.update_seq,
NewDocInfo2 = NewDocInfo1#full_doc_info{
update_seq = AccSeq + 1
},
RemoveSeqs = case OldSeq of
0 -> AccRemoveSeqs;
_ -> [OldSeq | AccRemoveSeqs]
end,
merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
[NewDocInfo2|AccNewInfos], RemoveSeqs, AccSeq+1)
end.
merge_rev_tree(OldInfo, NewDoc, Client, Limit, false)
when OldInfo#full_doc_info.deleted ->
% We're recreating a document that was previously
% deleted. To check that this is a recreation from
% the root we assert that the new document has a
% revision depth of 1 (this is to avoid recreating a
% doc from a previous internal revision) and is also
% not deleted. To avoid expanding the revision tree
% unnecessarily we create a new revision based on
% the winning deleted revision.
{RevDepth, _} = NewDoc#doc.revs,
NewDeleted = NewDoc#doc.deleted,
case RevDepth == 1 andalso not NewDeleted of
true ->
% Update the new doc based on revisions in OldInfo
#doc_info{revs=[WinningRev | _]} = couch_doc:to_doc_info(OldInfo),
#rev_info{rev={OldPos, OldRev}} = WinningRev,
NewRevId = couch_db:new_revid(NewDoc#doc{revs={OldPos, [OldRev]}}),
NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
% Merge our modified new doc into the tree
#full_doc_info{rev_tree=OldTree} = OldInfo,
NewTree0 = couch_doc:to_path(NewDoc2),
case couch_key_tree:merge(OldTree, NewTree0, Limit) of
{NewTree1, new_leaf} ->
% We changed the revision id so inform the caller
send_result(Client, NewDoc, {ok, {OldPos+1, NewRevId}}),
OldInfo#full_doc_info{
rev_tree = NewTree1,
deleted = false
};
_ ->
throw(doc_recreation_failed)
end;
_ ->
send_result(Client, NewDoc, conflict),
OldInfo
end;
merge_rev_tree(OldInfo, NewDoc, Client, Limit, false) ->
% We're attempting to merge a new revision into an
% undeleted document. To not be a conflict we require
% that the merge results in extending a branch.
OldTree = OldInfo#full_doc_info.rev_tree,
NewTree0 = couch_doc:to_path(NewDoc),
NewDeleted = NewDoc#doc.deleted,
case couch_key_tree:merge(OldTree, NewTree0, Limit) of
{NewTree, new_leaf} when not NewDeleted ->
OldInfo#full_doc_info{
rev_tree = NewTree,
deleted = false
};
{NewTree, new_leaf} when NewDeleted ->
% We have to check if we just deleted this
% document completely or if it was a conflict
% resolution.
OldInfo#full_doc_info{
rev_tree = NewTree,
deleted = couch_doc:is_deleted(NewTree)
};
_ ->
send_result(Client, NewDoc, conflict),
OldInfo
end;
merge_rev_tree(OldInfo, NewDoc, _Client, Limit, true) ->
% We're merging in revisions without caring about
% conflicts. Most likely this is a replication update.
OldTree = OldInfo#full_doc_info.rev_tree,
NewTree0 = couch_doc:to_path(NewDoc),
{NewTree, _} = couch_key_tree:merge(OldTree, NewTree0, Limit),
OldInfo#full_doc_info{rev_tree = NewTree}.
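% Illustrative walk-through of the recreation path (first clause above, rev
% ids shortened): if the winning revision of a deleted doc is {3, <<"d">>}
% and a client posts a fresh, undeleted doc (which arrives here as a
% depth-1 revision), the update is rewritten as {4, [NewRevId, <<"d">>]} so
% it extends the winning deleted branch instead of starting a conflicting
% root, and {ok, {4, NewRevId}} is sent back to the client.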
stem_full_doc_infos(#db{revs_limit=Limit}, DocInfos) ->
[Info#full_doc_info{rev_tree=couch_key_tree:stem(Tree, Limit)} ||
#full_doc_info{rev_tree=Tree}=Info <- DocInfos].
update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
#db{
id_tree = DocInfoByIdBTree,
seq_tree = DocInfoBySeqBTree,
update_seq = LastSeq,
revs_limit = RevsLimit
} = Db,
Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
% look up the old documents, if they exist.
OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
OldDocInfos = lists:zipwith(
fun(_Id, {ok, FullDocInfo}) ->
FullDocInfo;
(Id, not_found) ->
#full_doc_info{id=Id}
end,
Ids, OldDocLookups),
% Merge the new docs into the revision trees.
{ok, NewFullDocInfos, RemoveSeqs, NewSeq} = merge_rev_trees(RevsLimit,
MergeConflicts, DocsList, OldDocInfos, [], [], LastSeq),
% All documents are now ready to write.
{ok, Db2} = update_local_docs(Db, NonRepDocs),
% Write out the document summaries (the bodies are stored in the nodes of
% the trees, the attachments are already written to disk)
{ok, IndexFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
% and the indexes
{ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, IndexFullDocInfos, []),
{ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, IndexFullDocInfos, RemoveSeqs),
WriteCount = length(IndexFullDocInfos),
couch_stats:increment_counter([couchdb, document_inserts],
WriteCount - length(RemoveSeqs)),
couch_stats:increment_counter([couchdb, document_writes], WriteCount),
couch_stats:increment_counter(
[couchdb, local_document_writes],
length(NonRepDocs)
),
Db3 = Db2#db{
id_tree = DocInfoByIdBTree2,
seq_tree = DocInfoBySeqBTree2,
update_seq = NewSeq},
% Check if we just updated any design documents, and update the validation
% funs if we did.
UpdatedDDocIds = lists:flatmap(fun
(<<"_design/", _/binary>> = Id) -> [Id];
(_) -> []
end, Ids),
Db4 = case length(UpdatedDDocIds) > 0 of
true ->
couch_event:notify(Db3#db.name, ddoc_updated),
ddoc_cache:evict(Db3#db.name, UpdatedDDocIds),
refresh_validate_doc_funs(Db3);
false ->
Db3
end,
{ok, commit_data(Db4, not FullCommit), UpdatedDDocIds}.
update_local_docs(Db, []) ->
{ok, Db};
update_local_docs(#db{local_tree=Btree}=Db, Docs) ->
BtreeEntries = lists:map(
fun({Client, NewDoc}) ->
#doc{
id = Id,
deleted = Delete,
revs = {0, PrevRevs},
body = Body
} = NewDoc,
case PrevRevs of
[RevStr|_] ->
PrevRev = list_to_integer(?b2l(RevStr));
[] ->
PrevRev = 0
end,
case Delete of
false ->
send_result(Client, NewDoc, {ok,
{0, ?l2b(integer_to_list(PrevRev + 1))}}),
{update, {Id, {PrevRev + 1, Body}}};
true ->
send_result(Client, NewDoc,
{ok, {0, <<"0">>}}),
{remove, Id}
end
end, Docs),
BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
BtreeIdsUpdate = [{Key, Val} || {update, {Key, Val}} <- BtreeEntries],
{ok, Btree2} =
couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
{ok, Db#db{local_tree = Btree2}}.
db_to_header(Db, Header) ->
couch_db_header:set(Header, [
{update_seq, Db#db.update_seq},
{seq_tree_state, couch_btree:get_state(Db#db.seq_tree)},
{id_tree_state, couch_btree:get_state(Db#db.id_tree)},
{local_tree_state, couch_btree:get_state(Db#db.local_tree)},
{security_ptr, Db#db.security_ptr},
{revs_limit, Db#db.revs_limit}
]).
commit_data(Db) ->
commit_data(Db, false).
commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
TRef = erlang:send_after(1000,self(),delayed_commit),
Db#db{waiting_delayed_commit=TRef};
commit_data(Db, true) ->
Db;
commit_data(Db, _) ->
#db{
header = OldHeader,
waiting_delayed_commit = Timer
} = Db,
if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
case db_to_header(Db, OldHeader) of
OldHeader -> Db#db{waiting_delayed_commit=nil};
NewHeader -> sync_header(Db, NewHeader)
end.
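% In short (derived from the clauses above): commit_data(Db, true) only arms
% a one-second delayed_commit timer if none is pending, while
% commit_data(Db, false) cancels any pending timer and, if the header
% actually changed, fsyncs and writes a new header via sync_header/2.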
sync_header(Db, NewHeader) ->
#db{
fd = Fd,
filepath = FilePath,
fsync_options = FsyncOptions,
waiting_delayed_commit = Timer
} = Db,
if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
Before = lists:member(before_header, FsyncOptions),
After = lists:member(after_header, FsyncOptions),
if Before -> couch_file:sync(FilePath); true -> ok end,
ok = couch_file:write_header(Fd, NewHeader),
if After -> couch_file:sync(FilePath); true -> ok end,
Db#db{
header=NewHeader,
committed_update_seq=Db#db.update_seq,
waiting_delayed_commit=nil
}.
copy_doc_attachments(#db{fd = SrcFd} = SrcDb, SrcSp, DestFd) ->
{ok, {BodyData, BinInfos0}} = couch_db:read_doc(SrcDb, SrcSp),
BinInfos = case BinInfos0 of
_ when is_binary(BinInfos0) ->
couch_compress:decompress(BinInfos0);
_ when is_list(BinInfos0) ->
% pre 1.2 file format
BinInfos0
end,
% copy the bin values
NewBinInfos = lists:map(
fun({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}) ->
% 010 UPGRADE CODE
{NewBinSp, AttLen, AttLen, ActualMd5, _IdentityMd5} =
couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
check_md5(ExpectedMd5, ActualMd5),
{Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc1}) ->
{NewBinSp, AttLen, _, ActualMd5, _IdentityMd5} =
couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
check_md5(ExpectedMd5, ActualMd5),
Enc = case Enc1 of
true ->
% 0110 UPGRADE CODE
gzip;
false ->
% 0110 UPGRADE CODE
identity;
_ ->
Enc1
end,
{Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}
end, BinInfos),
{BodyData, NewBinInfos}.
merge_lookups(Infos, []) ->
Infos;
merge_lookups([], _) ->
[];
merge_lookups([#doc_info{}=DI | RestInfos], [{ok, FDI} | RestLookups]) ->
% Assert we've matched our lookups
if DI#doc_info.id == FDI#full_doc_info.id -> ok; true ->
erlang:error({mismatched_doc_infos, DI#doc_info.id})
end,
[FDI | merge_lookups(RestInfos, RestLookups)];
merge_lookups([FDI | RestInfos], Lookups) ->
[FDI | merge_lookups(RestInfos, Lookups)].
check_md5(Md5, Md5) -> ok;
check_md5(_, _) -> throw(md5_mismatch).
copy_docs(Db, #db{fd = DestFd} = NewDb, MixedInfos, Retry) ->
DocInfoIds = [Id || #doc_info{id=Id} <- MixedInfos],
LookupResults = couch_btree:lookup(Db#db.id_tree, DocInfoIds),
% COUCHDB-968, make sure we prune duplicates during compaction
NewInfos0 = lists:usort(fun(#full_doc_info{id=A}, #full_doc_info{id=B}) ->
A =< B
end, merge_lookups(MixedInfos, LookupResults)),
NewInfos1 = lists:map(fun(Info) ->
{NewRevTree, FinalAcc} = couch_key_tree:mapfold(fun
(_Rev, #leaf{ptr=Sp}=Leaf, leaf, SizesAcc) ->
{Body, AttInfos} = copy_doc_attachments(Db, Sp, DestFd),
SummaryChunk = make_doc_summary(NewDb, {Body, AttInfos}),
ExternalSize = ?term_size(SummaryChunk),
{ok, Pos, SummarySize} = couch_file:append_raw_chunk(
DestFd, SummaryChunk),
AttSizes = [{element(3,A), element(4,A)} || A <- AttInfos],
NewLeaf = Leaf#leaf{
ptr = Pos,
sizes = #size_info{
active = SummarySize,
external = ExternalSize
},
atts = AttSizes
},
{NewLeaf, add_sizes(leaf, NewLeaf, SizesAcc)};
(_Rev, _Leaf, branch, SizesAcc) ->
{?REV_MISSING, SizesAcc}
end, {0, 0, []}, Info#full_doc_info.rev_tree),
{FinalAS, FinalES, FinalAtts} = FinalAcc,
TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
NewActiveSize = FinalAS + TotalAttSize,
NewExternalSize = FinalES + TotalAttSize,
Info#full_doc_info{
rev_tree = NewRevTree,
sizes = #size_info{
active = NewActiveSize,
external = NewExternalSize
}
}
end, NewInfos0),
NewInfos = stem_full_doc_infos(Db, NewInfos1),
RemoveSeqs =
case Retry of
nil ->
[];
OldDocIdTree ->
% Compaction is being rerun to catch up to writes during the
% first pass. This means we may have docs that already exist
% in the seq_tree in the .data file. Here we look up any old
% update_seqs so that they can be removed.
Ids = [Id || #full_doc_info{id=Id} <- NewInfos],
Existing = couch_btree:lookup(OldDocIdTree, Ids),
[Seq || {ok, #full_doc_info{update_seq=Seq}} <- Existing]
end,
{ok, SeqTree} = couch_btree:add_remove(
NewDb#db.seq_tree, NewInfos, RemoveSeqs),
FDIKVs = lists:map(fun(#full_doc_info{id=Id, update_seq=Seq}=FDI) ->
{{Id, Seq}, FDI}
end, NewInfos),
{ok, IdEms} = couch_emsort:add(NewDb#db.id_tree, FDIKVs),
update_compact_task(length(NewInfos)),
NewDb#db{id_tree=IdEms, seq_tree=SeqTree}.
copy_compact(Db, NewDb0, Retry) ->
Compression = couch_compress:get_compression_method(),
NewDb = NewDb0#db{compression=Compression},
TotalChanges = couch_db:count_changes_since(Db, NewDb#db.update_seq),
BufferSize = list_to_integer(
config:get("database_compaction", "doc_buffer_size", "524288")),
CheckpointAfter = couch_util:to_integer(
config:get("database_compaction", "checkpoint_after",
BufferSize * 10)),
EnumBySeqFun =
fun(DocInfo, _Offset,
{AccNewDb, AccUncopied, AccUncopiedSize, AccCopiedSize}) ->
Seq = case DocInfo of
#full_doc_info{} -> DocInfo#full_doc_info.update_seq;
#doc_info{} -> DocInfo#doc_info.high_seq
end,
AccUncopiedSize2 = AccUncopiedSize + ?term_size(DocInfo),
if AccUncopiedSize2 >= BufferSize ->
NewDb2 = copy_docs(
Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
AccCopiedSize2 = AccCopiedSize + AccUncopiedSize2,
if AccCopiedSize2 >= CheckpointAfter ->
CommNewDb2 = commit_compaction_data(NewDb2#db{update_seq=Seq}),
{ok, {CommNewDb2, [], 0, 0}};
true ->
{ok, {NewDb2#db{update_seq = Seq}, [], 0, AccCopiedSize2}}
end;
true ->
{ok, {AccNewDb, [DocInfo | AccUncopied], AccUncopiedSize2,
AccCopiedSize}}
end
end,
TaskProps0 = [
{type, database_compaction},
{database, Db#db.name},
{progress, 0},
{changes_done, 0},
{total_changes, TotalChanges}
],
case (Retry =/= nil) and couch_task_status:is_task_added() of
true ->
couch_task_status:update([
{retry, true},
{progress, 0},
{changes_done, 0},
{total_changes, TotalChanges}
]);
false ->
couch_task_status:add_task(TaskProps0),
couch_task_status:set_update_frequency(500)
end,
{ok, _, {NewDb2, Uncopied, _, _}} =
couch_btree:foldl(Db#db.seq_tree, EnumBySeqFun,
{NewDb, [], 0, 0},
[{start_key, NewDb#db.update_seq + 1}]),
NewDb3 = copy_docs(Db, NewDb2, lists:reverse(Uncopied), Retry),
% copy misc header values
if NewDb3#db.security /= Db#db.security ->
{ok, Ptr, _} = couch_file:append_term(
NewDb3#db.fd, Db#db.security,
[{compression, NewDb3#db.compression}]),
NewDb4 = NewDb3#db{security=Db#db.security, security_ptr=Ptr};
true ->
NewDb4 = NewDb3
end,
commit_compaction_data(NewDb4#db{update_seq=Db#db.update_seq}).
start_copy_compact(#db{}=Db) ->
erlang:put(io_priority, {db_compact, Db#db.name}),
#db{name=Name, filepath=Filepath, options=Options, header=Header} = Db,
couch_log:debug("Compaction process spawned for db \"~s\"", [Name]),
{ok, NewDb, DName, DFd, MFd, Retry} =
open_compaction_files(Name, Header, Filepath, Options),
erlang:monitor(process, MFd),
% This is a bit worrisome. init_db/5 will monitor the data fd
% but it doesn't know about the meta fd. For now I'll maintain
% that the data fd is the old normal fd and the meta fd is special,
% and hope everything works out for the best.
unlink(DFd),
NewDb1 = copy_purge_info(Db, NewDb),
NewDb2 = copy_compact(Db, NewDb1, Retry),
NewDb3 = sort_meta_data(NewDb2),
NewDb4 = commit_compaction_data(NewDb3),
NewDb5 = copy_meta_data(NewDb4),
NewDb6 = sync_header(NewDb5, db_to_header(NewDb5, NewDb5#db.header)),
close_db(NewDb6),
ok = couch_file:close(MFd),
gen_server:cast(Db#db.main_pid, {compact_done, DName}).
open_compaction_files(DbName, SrcHdr, DbFilePath, Options) ->
DataFile = DbFilePath ++ ".compact.data",
MetaFile = DbFilePath ++ ".compact.meta",
{ok, DataFd, DataHdr} = open_compaction_file(DataFile),
{ok, MetaFd, MetaHdr} = open_compaction_file(MetaFile),
DataHdrIsDbHdr = couch_db_header:is_header(DataHdr),
case {DataHdr, MetaHdr} of
{#comp_header{}=A, #comp_header{}=A} ->
DbHeader = A#comp_header.db_header,
Db0 = init_db(DbName, DataFile, DataFd, DbHeader, Options),
Db1 = bind_emsort(Db0, MetaFd, A#comp_header.meta_state),
{ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
_ when DataHdrIsDbHdr ->
ok = reset_compaction_file(MetaFd, couch_db_header:from(SrcHdr)),
Db0 = init_db(DbName, DataFile, DataFd, DataHdr, Options),
Db1 = bind_emsort(Db0, MetaFd, nil),
{ok, Db1, DataFile, DataFd, MetaFd, Db0#db.id_tree};
_ ->
Header = couch_db_header:from(SrcHdr),
ok = reset_compaction_file(DataFd, Header),
ok = reset_compaction_file(MetaFd, Header),
Db0 = init_db(DbName, DataFile, DataFd, Header, Options),
Db1 = bind_emsort(Db0, MetaFd, nil),
{ok, Db1, DataFile, DataFd, MetaFd, nil}
end.
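% A rough summary of the three cases above (not normative): if both
% .compact.data and .compact.meta carry the same #comp_header{}, the previous
% compaction is resumed with its saved emsort state; if the data file has a
% plain db header, the copied data is kept but the meta file is reset and the
% id-tree merge is redone; otherwise both files are reset and compaction
% starts from scratch. In the first two cases the compact data file's id tree
% is also returned as the Retry marker so copy_docs/4 can find and remove
% update_seqs it already copied.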
open_compaction_file(FilePath) ->
case couch_file:open(FilePath, [nologifmissing]) of
{ok, Fd} ->
case couch_file:read_header(Fd) of
{ok, Header} -> {ok, Fd, Header};
no_valid_header -> {ok, Fd, nil}
end;
{error, enoent} ->
{ok, Fd} = couch_file:open(FilePath, [create]),
{ok, Fd, nil}
end.
reset_compaction_file(Fd, Header) ->
ok = couch_file:truncate(Fd, 0),
ok = couch_file:write_header(Fd, Header).
copy_purge_info(OldDb, NewDb) ->
OldHdr = OldDb#db.header,
NewHdr = NewDb#db.header,
OldPurgeSeq = couch_db_header:purge_seq(OldHdr),
if OldPurgeSeq > 0 ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(OldDb),
Opts = [{compression, NewDb#db.compression}],
{ok, Ptr, _} = couch_file:append_term(NewDb#db.fd, PurgedIdsRevs, Opts),
NewNewHdr = couch_db_header:set(NewHdr, [
{purge_seq, OldPurgeSeq},
{purged_docs, Ptr}
]),
NewDb#db{header = NewNewHdr};
true ->
NewDb
end.
commit_compaction_data(#db{}=Db) ->
% Compaction needs to write headers to both the data file
% and the meta file so if we need to restart we can pick
% back up from where we left off.
commit_compaction_data(Db, couch_emsort:get_fd(Db#db.id_tree)),
commit_compaction_data(Db, Db#db.fd).
commit_compaction_data(#db{header=OldHeader}=Db0, Fd) ->
% Mostly copied from commit_data/2, but with the logic
% changed to commit and fsync to a specific fd instead
% of the Filepath handling that commit_data/2 does.
DataState = couch_db_header:id_tree_state(OldHeader),
MetaFd = couch_emsort:get_fd(Db0#db.id_tree),
MetaState = couch_emsort:get_state(Db0#db.id_tree),
Db1 = bind_id_tree(Db0, Db0#db.fd, DataState),
Header = db_to_header(Db1, OldHeader),
CompHeader = #comp_header{
db_header = Header,
meta_state = MetaState
},
ok = couch_file:sync(Fd),
ok = couch_file:write_header(Fd, CompHeader),
Db2 = Db1#db{
waiting_delayed_commit=nil,
header=Header,
committed_update_seq=Db1#db.update_seq
},
bind_emsort(Db2, MetaFd, MetaState).
bind_emsort(Db, Fd, nil) ->
{ok, Ems} = couch_emsort:open(Fd),
Db#db{id_tree=Ems};
bind_emsort(Db, Fd, State) ->
{ok, Ems} = couch_emsort:open(Fd, [{root, State}]),
Db#db{id_tree=Ems}.
bind_id_tree(Db, Fd, State) ->
{ok, IdBtree} = couch_btree:open(State, Fd, [
{split, fun ?MODULE:btree_by_id_split/1},
{join, fun ?MODULE:btree_by_id_join/2},
{reduce, fun ?MODULE:btree_by_id_reduce/2}
]),
Db#db{id_tree=IdBtree}.
sort_meta_data(Db0) ->
{ok, Ems} = couch_emsort:merge(Db0#db.id_tree),
Db0#db{id_tree=Ems}.
copy_meta_data(#db{fd=Fd, header=Header}=Db) ->
Src = Db#db.id_tree,
DstState = couch_db_header:id_tree_state(Header),
{ok, IdTree0} = couch_btree:open(DstState, Fd, [
{split, fun ?MODULE:btree_by_id_split/1},
{join, fun ?MODULE:btree_by_id_join/2},
{reduce, fun ?MODULE:btree_by_id_reduce/2}
]),
{ok, Iter} = couch_emsort:iter(Src),
Acc0 = #merge_st{
id_tree=IdTree0,
seq_tree=Db#db.seq_tree,
rem_seqs=[],
infos=[]
},
Acc = merge_docids(Iter, Acc0),
{ok, IdTree} = couch_btree:add(Acc#merge_st.id_tree, Acc#merge_st.infos),
{ok, SeqTree} = couch_btree:add_remove(
Acc#merge_st.seq_tree, [], Acc#merge_st.rem_seqs
),
Db#db{id_tree=IdTree, seq_tree=SeqTree}.
merge_docids(Iter, #merge_st{infos=Infos}=Acc) when length(Infos) > 1000 ->
#merge_st{
id_tree=IdTree0,
seq_tree=SeqTree0,
rem_seqs=RemSeqs
} = Acc,
{ok, IdTree1} = couch_btree:add(IdTree0, Infos),
{ok, SeqTree1} = couch_btree:add_remove(SeqTree0, [], RemSeqs),
Acc1 = Acc#merge_st{
id_tree=IdTree1,
seq_tree=SeqTree1,
rem_seqs=[],
infos=[]
},
merge_docids(Iter, Acc1);
merge_docids(Iter, #merge_st{curr=Curr}=Acc) ->
case next_info(Iter, Curr, []) of
{NextIter, NewCurr, FDI, Seqs} ->
Acc1 = Acc#merge_st{
infos = [FDI | Acc#merge_st.infos],
rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
curr = NewCurr
},
merge_docids(NextIter, Acc1);
{finished, FDI, Seqs} ->
Acc#merge_st{
infos = [FDI | Acc#merge_st.infos],
rem_seqs = Seqs ++ Acc#merge_st.rem_seqs,
curr = undefined
};
empty ->
Acc
end.
next_info(Iter, undefined, []) ->
case couch_emsort:next(Iter) of
{ok, {{Id, Seq}, FDI}, NextIter} ->
next_info(NextIter, {Id, Seq, FDI}, []);
finished ->
empty
end;
next_info(Iter, {Id, Seq, FDI}, Seqs) ->
case couch_emsort:next(Iter) of
{ok, {{Id, NSeq}, NFDI}, NextIter} ->
next_info(NextIter, {Id, NSeq, NFDI}, [Seq | Seqs]);
{ok, {{NId, NSeq}, NFDI}, NextIter} ->
{NextIter, {NId, NSeq, NFDI}, FDI, Seqs};
finished ->
{finished, FDI, Seqs}
end.
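% next_info/3 walks the {Id, Seq}-sorted emsort iterator and collapses runs
% of entries that share a document id: the last entry emitted for an id is
% kept as the #full_doc_info{} to write, and the superseded seqs are handed
% back so merge_docids/2 can remove them from the seq_tree.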
update_compact_task(NumChanges) ->
[Changes, Total] = couch_task_status:get([changes_done, total_changes]),
Changes2 = Changes + NumChanges,
Progress = case Total of
0 ->
0;
_ ->
(Changes2 * 100) div Total
end,
couch_task_status:update([{changes_done, Changes2}, {progress, Progress}]).
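% Worked example (illustrative numbers): with changes_done = 200,
% total_changes = 1000 and NumChanges = 50, the task is updated to
% changes_done = 250 and progress = (250 * 100) div 1000 = 25.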
make_doc_summary(#db{compression = Comp}, {Body0, Atts0}) ->
Body = case couch_compress:is_compressed(Body0, Comp) of
true ->
Body0;
false ->
% pre 1.2 database file format
couch_compress:compress(Body0, Comp)
end,
Atts = case couch_compress:is_compressed(Atts0, Comp) of
true ->
Atts0;
false ->
couch_compress:compress(Atts0, Comp)
end,
SummaryBin = ?term_to_bin({Body, Atts}),
couch_file:assemble_file_chunk(SummaryBin, couch_crypto:hash(md5, SummaryBin)).
default_security_object(<<"shards/", _/binary>>) ->
case config:get("couchdb", "default_security", "everyone") of
"admin_only" ->
[{<<"members">>,{[{<<"roles">>,[<<"_admin">>]}]}},
{<<"admins">>,{[{<<"roles">>,[<<"_admin">>]}]}}];
Everyone when Everyone == "everyone"; Everyone == "admin_local" ->
[]
end;
default_security_object(_DbName) ->
case config:get("couchdb", "default_security", "everyone") of
Admin when Admin == "admin_only"; Admin == "admin_local" ->
[{<<"members">>,{[{<<"roles">>,[<<"_admin">>]}]}},
{<<"admins">>,{[{<<"roles">>,[<<"_admin">>]}]}}];
"everyone" ->
[]
end.
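% A hypothetical local.ini snippet driving the clauses above (the section
% and key come from the config:get/3 calls; the file name is illustrative):
%
%     [couchdb]
%     default_security = admin_only
%
% With "admin_only" both clustered ("shards/...") and other databases get an
% _admin-only security object; with "admin_local" only non-shard databases
% are restricted; with the default "everyone" the object starts out empty.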