% Size-accounting helpers that are callable from outside this module.
-export([add_sizes/3, upgrade_sizes/1]).
% Default for the "couchdb" / "idle_check_timeout" config value, in
% milliseconds (see update_idle_limit_from_config/0 and the timeout clause
% of handle_info/2, which compares it against elapsed milliseconds).
-define(IDLE_LIMIT_DEFAULT, 61000).
% Start-up routine for the updater process of one database.
% Stores the io_priority in the process dictionary, prepends the default
% security object to the options, initialises the storage engine and builds
% the #db{} state. It then acknowledges the starter with
% proc_lib:init_ack/1 and enters the gen_server loop directly, using
% idle_limit() as the initial timeout.
init({Engine, DbName, FilePath, Options0}) ->
erlang:put(io_priority, {db_update, DbName}),
DefaultSecObj = default_security_object(DbName),
Options = [{default_security_object, DefaultSecObj} | Options0],
{ok, EngineState} = couch_db_engine:init(Engine, FilePath, Options),
Db = init_db(DbName, FilePath, EngineState, Options),
% Only databases opened without the sys_db option are registered with the
% open_databases stat tracker.
case lists:member(sys_db, Options) of
false ->
couch_stats_process_tracker:track([couchdb, open_databases]);
true ->
% Don't load validation funs here because the fabric query is
% liable to race conditions. Instead see
% couch_db:validate_doc_update, which loads them lazily.
% The state handed back to the starter records this process as main_pid.
NewDb = Db#db{main_pid = self()},
proc_lib:init_ack({ok, NewDb}),
gen_server:enter_loop(?MODULE, [], NewDb, idle_limit())
throw:InitError ->
% gen_server terminate callback: forwards the shutdown reason and the
% current #db{} state to couch_db_engine:terminate/2.
terminate(Reason, Db) ->
couch_db_engine:terminate(Reason, Db),
% Synchronous requests to the database updater. Every reply visible here
% carries idle_limit() as the gen_server timeout. Clauses that change the
% #db{} state also push the new state to couch_server with a
% {db_updated, Db} call before replying.

% get_db: reply with the current #db{} state.
handle_call(get_db, _From, Db) ->
{reply, {ok, Db}, Db, idle_limit()};
% full_commit: commit any data that is waiting on a delayed commit.
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db, idle_limit()}; % no data waiting, return ok immediately
handle_call(full_commit, _From, Db) ->
{reply, ok, commit_data(Db), idle_limit()};
% {full_commit, RequiredSeq}: nothing to do when the committed update
% sequence has already reached RequiredSeq.
handle_call({full_commit, RequiredSeq}, _From, Db)
when RequiredSeq =< Db#db.committed_update_seq ->
{reply, ok, Db, idle_limit()};
handle_call({full_commit, _}, _, Db) ->
{reply, ok, commit_data(Db), idle_limit()}; % commit the data and return ok
% start_compact: reuse the cast handler, then reply with the compactor pid
% stored in the resulting state.
handle_call(start_compact, _From, Db) ->
{noreply, NewDb, _Timeout} = handle_cast(start_compact, Db),
{reply, {ok, NewDb#db.compactor_pid}, NewDb, idle_limit()};
% compactor_pid: reply with the stored compactor pid (nil when none is set).
handle_call(compactor_pid, _From, #db{compactor_pid = Pid} = Db) ->
{reply, Pid, Db, idle_limit()};
% cancel_compact: a no-op when no compactor pid is stored; otherwise kill
% the compactor process and clear the pid.
handle_call(cancel_compact, _From, #db{compactor_pid = nil} = Db) ->
{reply, ok, Db, idle_limit()};
handle_call(cancel_compact, _From, #db{compactor_pid = Pid} = Db) ->
exit(Pid, kill),
Db2 = Db#db{compactor_pid = nil},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{reply, ok, Db2, idle_limit()};
% {set_security, NewSec}: store the security object through the engine,
% mirror it in the #db{} record and commit.
handle_call({set_security, NewSec}, _From, #db{} = Db) ->
{ok, NewDb} = couch_db_engine:set_security(Db, NewSec),
NewSecDb = commit_data(NewDb#db{
security = NewSec
ok = gen_server:call(couch_server, {db_updated, NewSecDb}, infinity),
{reply, ok, NewSecDb, idle_limit()};
% {set_revs_limit, Limit}: store the new limit through the engine and commit.
handle_call({set_revs_limit, Limit}, _From, Db) ->
{ok, Db2} = couch_db_engine:set_revs_limit(Db, Limit),
Db3 = commit_data(Db2),
ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
{reply, ok, Db3, idle_limit()};
% {purge_docs, IdRevs}: refused with an error while a compactor pid is set.
handle_call({purge_docs, _IdRevs}, _From,
#db{compactor_pid=Pid}=Db) when Pid /= nil ->
{reply, {error, purge_during_compaction}, Db, idle_limit()};
% {purge_docs, IdRevs}: IdRevs is a list of {DocId, Revs} pairs. The
% requested revisions are removed from each document's rev tree, the
% changed documents are written back, and the reply is
% {ok, PurgeSeq, PurgedIdRevs}.
handle_call({purge_docs, IdRevs}, _From, Db) ->
DocIds = [Id || {Id, _Revs} <- IdRevs],
OldDocInfos = couch_db_engine:open_docs(Db, DocIds),
% Build {OldFDI, NewFDI, RemovedRevs} for every document whose tree
% actually lost at least one of the requested leaf revisions.
NewDocInfos = lists:flatmap(fun
({{Id, Revs}, #full_doc_info{id = Id, rev_tree = Tree} = FDI}) ->
case couch_key_tree:remove_leafs(Tree, Revs) of
{_, [] = _RemovedRevs} -> % no change
{NewTree, RemovedRevs} ->
NewFDI = FDI#full_doc_info{rev_tree = NewTree},
[{FDI, NewFDI, RemovedRevs}]
({_, not_found}) ->
end, lists:zip(IdRevs, OldDocInfos)),
% Accumulator: {running update seq, updated FDIs, purged {Id, Revs} pairs}.
InitUpdateSeq = couch_db_engine:get_update_seq(Db),
InitAcc = {InitUpdateSeq, [], []},
FinalAcc = lists:foldl(fun({_, #full_doc_info{} = OldFDI, RemRevs}, Acc) ->
id = Id,
rev_tree = OldTree
} = OldFDI,
{SeqAcc0, FDIAcc, IdRevsAcc} = Acc,
{NewFDIAcc, NewSeqAcc} = case OldTree of
[] ->
% If we purged every #leaf{} in the doc record
% then we're removing it completely from the
% database.
_ ->
% It's possible to purge the #leaf{} that contains
% the update_seq where this doc sits in the update_seq
% sequence. Rather than do a bunch of complicated checks
% we just re-label every #leaf{} and reinsert it into
% the update_seq sequence.
{NewTree, SeqAcc1} = couch_key_tree:mapfold(fun
(_RevId, Leaf, leaf, InnerSeqAcc) ->
{Leaf#leaf{seq = InnerSeqAcc + 1}, InnerSeqAcc + 1};
(_RevId, Value, _Type, InnerSeqAcc) ->
{Value, InnerSeqAcc}
end, SeqAcc0, OldTree),
NewFDI = OldFDI#full_doc_info{
update_seq = SeqAcc1,
rev_tree = NewTree
{[NewFDI | FDIAcc], SeqAcc1}
NewIdRevsAcc = [{Id, RemRevs} | IdRevsAcc],
{NewSeqAcc, NewFDIAcc, NewIdRevsAcc}
end, InitAcc, NewDocInfos),
{_FinalSeq, FDIs, PurgedIdRevs} = FinalAcc,
% We need to only use the list of #full_doc_info{} records
% that we have actually changed due to a purge.
PreviousFDIs = [PrevFDI || {PrevFDI, _, _} <- NewDocInfos],
Pairs = pair_purge_info(PreviousFDIs, FDIs),
{ok, Db2} = couch_db_engine:write_doc_infos(Db, Pairs, [], PurgedIdRevs),
Db3 = commit_data(Db2),
ok = gen_server:call(couch_server, {db_updated, Db3}, infinity),
couch_event:notify(, updated),
PurgeSeq = couch_db_engine:get_purge_seq(Db3),
{reply, {ok, PurgeSeq, PurgedIdRevs}, Db3, idle_limit()};
% Any other request is offered to the storage engine; a three-element
% reply from the engine gets the idle timeout appended.
handle_call(Msg, From, Db) ->
case couch_db_engine:handle_db_updater_call(Msg, From, Db) of
{reply, Resp, NewDb} ->
{reply, Resp, NewDb, idle_limit()};
Else ->
% Asynchronous requests to the database updater.

% {load_validation_funs, Funs}: store the supplied validation funs in the
% #db{} state and publish the new state to couch_server.
handle_cast({load_validation_funs, ValidationFuns}, Db) ->
Db2 = Db#db{validate_doc_funs = ValidationFuns},
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{noreply, Db2, idle_limit()};
% start_compact: start a compaction through the storage engine unless a
% compactor pid is already stored. handle_call(start_compact, ...) relies
% on this clause returning a {noreply, Db, Timeout} triple.
handle_cast(start_compact, Db) ->
case Db#db.compactor_pid of
nil ->
% For now we only support compacting to the same
% storage engine. After the first round of patches
% we'll add a field that sets the target engine
% type to compact to with a new copy compactor.
UpdateSeq = couch_db_engine:get_update_seq(Db),
Args = [, UpdateSeq],
couch_log:info("Starting compaction for db \"~s\" at ~p", Args),
{ok, Db2} = couch_db_engine:start_compaction(Db),
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{noreply, Db2, idle_limit()};
_ ->
% compact currently running, this is a no-op
{noreply, Db, idle_limit()}
%% {compact_done, Engine, CompactInfo}: hand the compaction result to the
%% storage engine, which returns the #db{} state to continue with.
handle_cast({compact_done, _Engine, CompactInfo}, #db{} = OldDb) ->
    {ok, NewDb} = couch_db_engine:finish_compaction(OldDb, CompactInfo),
    % Return the idle timeout like every other non-stopping clause. Without
    % it the gen_server waits indefinitely and no `timeout` message is
    % delivered, so the idle check in handle_info(timeout, ...) would not
    % run again until some unrelated message happened to arrive.
    {noreply, NewDb, idle_limit()};
%% wakeup: sent by the timeout handler to itself before hibernating; it only
%% re-arms the idle timeout.
handle_cast(wakeup, Db) ->
    {noreply, Db, idle_limit()};
%% Anything else is unexpected: log it and stop the updater with the
%% offending message as the exit reason.
handle_cast(Msg, #db{name = Name} = Db) ->
    couch_log:error("Database `~s` updater received unexpected cast: ~p",
                    [Name, Msg]),
    {stop, Msg, Db}.
% Plain messages delivered to the database updater.

% {update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts, FullCommit}:
% a write request. GroupedDocs is a list of per-document groups and
% NonRepDocs holds the docs that update_docs_int/5 treats as local docs.
handle_info({update_docs, Client, GroupedDocs, NonRepDocs, MergeConflicts,
FullCommit}, Db) ->
GroupedDocs2 = sort_and_tag_grouped_docs(Client, GroupedDocs),
% Only requests without local docs are batched together with other
% update_docs messages already waiting in the mailbox.
if NonRepDocs == [] ->
{GroupedDocs3, Clients, FullCommit2} = collect_updates(GroupedDocs2,
[Client], MergeConflicts, FullCommit);
true ->
GroupedDocs3 = GroupedDocs2,
FullCommit2 = FullCommit,
Clients = [Client]
NonRepDocs2 = [{Client, NRDoc} || NRDoc <- NonRepDocs],
try update_docs_int(Db, GroupedDocs3, NonRepDocs2, MergeConflicts,
FullCommit2) of
{ok, Db2, UpdatedDDocIds} ->
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
% Emit an `updated` event only when the update sequence moved.
case {couch_db:get_update_seq(Db), couch_db:get_update_seq(Db2)} of
{Seq, Seq} -> ok;
_ -> couch_event:notify(, updated)
if NonRepDocs2 /= [] ->
couch_event:notify(, local_updated);
true -> ok
% Tell every batched client that its request has been processed.
[catch(ClientPid ! {done, self()}) || ClientPid <- Clients],
Db3 = case length(UpdatedDDocIds) > 0 of
true ->
% Ken and ddoc_cache are the only things that
% use the unspecified ddoc_updated message. We
% should update them to use the new message per
% ddoc.
lists:foreach(fun(DDocId) ->
couch_event:notify(, {ddoc_updated, DDocId})
end, UpdatedDDocIds),
couch_event:notify(, ddoc_updated),
ddoc_cache:refresh(, UpdatedDDocIds),
false ->
{noreply, Db3, hibernate_if_no_idle_limit()}
% A thrown `retry` is passed on to every client; the state is unchanged.
throw: retry ->
[catch(ClientPid ! {retry, self()}) || ClientPid <- Clients],
{noreply, Db, hibernate_if_no_idle_limit()}
% delayed_commit: fired by the timer armed in commit_data/2.
handle_info(delayed_commit, #db{waiting_delayed_commit=nil}=Db) ->
%no outstanding delayed commits, ignore
{noreply, Db, idle_limit()};
handle_info(delayed_commit, Db) ->
% couch_server is only notified when the commit changed the state.
case commit_data(Db) of
Db ->
{noreply, Db, idle_limit()};
Db2 ->
ok = gen_server:call(couch_server, {db_updated, Db2}, infinity),
{noreply, Db2, idle_limit()}
% 'EXIT' signals: a normal exit is ignored, any other reason stops the
% updater with that reason.
handle_info({'EXIT', _Pid, normal}, Db) ->
{noreply, Db, idle_limit()};
handle_info({'EXIT', _Pid, Reason}, Db) ->
{stop, Reason, Db};
% timeout: the gen_server idle timeout fired. Re-read the limit from
% config and, if the database is idle and has seen no activity for longer
% than the limit, ask couch_server to close it.
handle_info(timeout, #db{name=DbName} = Db) ->
IdleLimitMSec = update_idle_limit_from_config(),
case couch_db:is_idle(Db) of
true ->
LastActivity = couch_db_engine:last_activity(Db),
% timer:now_diff/2 returns microseconds; convert to milliseconds.
DtMSec = timer:now_diff(os:timestamp(), LastActivity) div 1000,
MSecSinceLastActivity = max(0, DtMSec),
case MSecSinceLastActivity > IdleLimitMSec of
true ->
ok = couch_server:close_db_if_idle(DbName);
false ->
false ->
% Send a message to wake up and then hibernate. Hibernation here is done to
% force a thorough garbage collection.
gen_server:cast(self(), wakeup),
{noreply, Db, hibernate};
% Any other message is offered to the storage engine; a two-element
% noreply from the engine gets the idle timeout appended.
handle_info(Msg, Db) ->
case couch_db_engine:handle_db_updater_info(Msg, Db) of
{noreply, NewDb} ->
{noreply, NewDb, idle_limit()};
Else ->
%% gen_server code_change callback. No state migration is performed: the
%% state passed in is handed back unchanged.
code_change(_PreviousVersion, CurrentState, _ExtraData) ->
    {ok, CurrentState}.
%% Orders the incoming document groups by the id of their first document and
%% wraps every document as {Client, TaggedDoc}, where TaggedDoc comes from
%% maybe_tag_doc/1.
%%
%% Clients are expected to send the groups already sorted, but some do not.
%% merge_updates/2 fails on unsorted input and the database can end up with
%% duplicate documents, so the groups are sorted again here as a sanity
%% check. See COUCHDB-2735.
sort_and_tag_grouped_docs(Client, GroupedDocs) ->
    FirstIdBefore = fun([#doc{id = IdA} | _], [#doc{id = IdB} | _]) ->
        IdA < IdB
    end,
    SortedGroups = lists:sort(FirstIdBefore, GroupedDocs),
    [[{Client, maybe_tag_doc(Doc)} || Doc <- Group] || Group <- SortedGroups].
%% Makes sure a document carries a {ref, Key} entry in its meta list.
%% A document that already has a `ref` entry is returned untouched;
%% otherwise a key built from the doc id and its parent revision path,
%% {Id, {Pos - 1, PrevRevs}}, is prepended. doc_tag/1 reads the entry back
%% when results are sent to clients.
%% Only matches documents whose revision list is non-empty.
maybe_tag_doc(#doc{id=Id, revs={Pos,[_Rev|PrevRevs]}, meta=Meta0}=Doc) ->
    case lists:keymember(ref, 1, Meta0) of
        true ->
            Doc;
        false ->
            Key = {Id, {Pos-1, PrevRevs}},
            Doc#doc{meta=[{ref, Key} | Meta0]}
    end.
%% Merges two lists of document groups, each ordered by document id, into a
%% single ordered list. Every group is a non-empty list of {Client, #doc{}}
%% pairs for one id. Groups with the same id are concatenated (left group
%% first); otherwise the group with the smaller id is emitted first. When
%% either side runs out, the remainder of the other side is returned as is.
merge_updates([[{_,#doc{id=X}}|_]=A|RestA], [[{_,#doc{id=X}}|_]=B|RestB]) ->
    [A++B | merge_updates(RestA, RestB)];
merge_updates([[{_,#doc{id=X}}|_]|_]=A, [[{_,#doc{id=Y}}|_]|_]=B) when X < Y ->
    [hd(A) | merge_updates(tl(A), B)];
merge_updates([[{_,#doc{id=X}}|_]|_]=A, [[{_,#doc{id=Y}}|_]|_]=B) when X > Y ->
    [hd(B) | merge_updates(A, tl(B))];
merge_updates([], RestB) ->
    RestB;
merge_updates(RestA, []) ->
    RestA.
%% Drains further update_docs requests that are already waiting in the
%% mailbox and folds them into the current batch.
%% Returns {GroupedDocs, Clients, FullCommit}: the merged document groups,
%% the clients whose requests were absorbed (most recent first), and the
%% logical OR of all their FullCommit flags.
collect_updates(GroupedDocsAcc, ClientsAcc, MergeConflicts, FullCommit) ->
    receive
        % Only collect updates with the same MergeConflicts flag and without
        % local docs. It's easier to just avoid multiple _local doc
        % updaters than deal with their possible conflicts, and local docs
        % writes are relatively rare. Can be optimized later if really needed.
        {update_docs, Client, GroupedDocs, [], MergeConflicts, FullCommit2} ->
            GroupedDocs2 = sort_and_tag_grouped_docs(Client, GroupedDocs),
            GroupedDocsAcc2 =
                merge_updates(GroupedDocsAcc, GroupedDocs2),
            collect_updates(GroupedDocsAcc2, [Client | ClientsAcc],
                    MergeConflicts, (FullCommit or FullCommit2))
    after 0 ->
        % Nothing (more) suitable is queued: never block waiting for work.
        {GroupedDocsAcc, ClientsAcc, FullCommit}
    end.
%% Builds the initial #db{} state for a freshly opened database.
%% The `create` option is dropped from the stored options. The
%% before_doc_update and after_doc_read hooks are taken from the options
%% (nil when absent). committed_update_seq and security are read back from
%% the storage engine once the base record exists.
init_db(DbName, FilePath, EngineState, Options) ->
    % convert start time tuple to microsecs and store as a binary string
    {MegaSecs, Secs, MicroSecs} = os:timestamp(),
    StartTime = ?l2b(io_lib:format("~p",
            [(MegaSecs*1000000*1000000) + (Secs*1000000) + MicroSecs])),

    BDU = couch_util:get_value(before_doc_update, Options, nil),
    ADR = couch_util:get_value(after_doc_read, Options, nil),

    CleanedOpts = [Opt || Opt <- Options, Opt /= create],

    InitDb = #db{
        name = DbName,
        filepath = FilePath,
        engine = EngineState,
        instance_start_time = StartTime,
        options = CleanedOpts,
        before_doc_update = BDU,
        after_doc_read = ADR
    },

    % The engine calls take the record itself, so it has to be constructed
    % first and completed afterwards.
    InitDb#db{
        committed_update_seq = couch_db_engine:get_update_seq(InitDb),
        security = couch_db_engine:get_security(InitDb)
    }.
% Recomputes the validation funs of a database.
% For databases whose name starts with "shards/" the work is handed to a
% spawned fabric:reset_validation_funs/1 call on the mem3 database name and
% the local field is reset to undefined.
refresh_validate_doc_funs(#db{name = <<"shards/", _/binary>> = Name} = Db) ->
spawn(fabric, reset_validation_funs, [mem3:dbname(Name)]),
Db#db{validate_doc_funs = undefined};
% For any other database the design docs are opened with an admin user
% context and every validate_doc_update fun they define is collected
% (design docs without one contribute nothing).
refresh_validate_doc_funs(Db0) ->
Db = Db0#db{user_ctx=?ADMIN_USER},
{ok, DesignDocs} = couch_db:get_design_docs(Db),
ProcessDocFuns = lists:flatmap(
fun(DesignDocInfo) ->
{ok, DesignDoc} = couch_db:open_doc_int(
Db, DesignDocInfo, [ejson_body]),
case couch_doc:get_validate_doc_fun(DesignDoc) of
nil -> [];
Fun -> [Fun]
end, DesignDocs),
% rev tree functions
%% Writes the pending document bodies of each #full_doc_info{} to the
%% storage engine and returns {ok, FlushedInfos} in the original order.
%%
%% Every #doc{} node in a revision tree is replaced by a #leaf{} that points
%% at the written body. While walking the tree the sizes of all #doc{} and
%% #leaf{} nodes are summed with add_sizes/3; the totals, plus the combined
%% size of the referenced attachments, become the document's #size_info{}.
flush_trees(_Db, [], AccFlushedTrees) ->
    {ok, lists:reverse(AccFlushedTrees)};
flush_trees(#db{} = Db,
        [InfoUnflushed | RestUnflushed], AccFlushed) ->
    #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
    {Flushed, FinalAcc} = couch_key_tree:mapfold(
        fun(_Rev, Value, Type, SizesAcc) ->
            case Value of
                % This node is a document summary that needs to be
                % flushed to disk.
                #doc{} = Doc ->
                    check_doc_atts(Db, Doc),
                    ExternalSize = get_meta_body_size(Value#doc.meta),
                    {size_info, AttSizeInfo} =
                            lists:keyfind(size_info, 1, Doc#doc.meta),
                    {ok, NewDoc, WrittenSize} =
                            couch_db_engine:write_doc_body(Db, Doc),
                    Leaf = #leaf{
                        deleted = Doc#doc.deleted,
                        ptr = NewDoc#doc.body,
                        seq = UpdateSeq,
                        sizes = #size_info{
                            active = WrittenSize,
                            external = ExternalSize
                        },
                        atts = AttSizeInfo
                    },
                    {Leaf, add_sizes(Type, Leaf, SizesAcc)};
                % Already flushed: keep the node, but still count its sizes.
                #leaf{} ->
                    {Value, add_sizes(Type, Value, SizesAcc)};
                _ ->
                    {Value, SizesAcc}
            end
        end, {0, 0, []}, Unflushed),
    {FinalAS, FinalES, FinalAtts} = FinalAcc,
    % FinalAtts is a list of two-tuples whose second element is a size.
    TotalAttSize = lists:foldl(fun({_, S}, A) -> S + A end, 0, FinalAtts),
    NewInfo = InfoUnflushed#full_doc_info{
        rev_tree = Flushed,
        sizes = #size_info{
            active = FinalAS + TotalAttSize,
            external = FinalES + TotalAttSize
        }
    },
    flush_trees(Db, RestUnflushed, [NewInfo | AccFlushed]).
% Checks the attachment stream recorded in a document's meta list before
% the document is flushed. The {atts_stream, Stream} entry must be present
% (the match fails otherwise). A nil stream needs no check; any other
% stream is tested with couch_db:is_active_stream/2.
check_doc_atts(Db, Doc) ->
{atts_stream, Stream} = lists:keyfind(atts_stream, 1, Doc#doc.meta),
% Make sure that the attachments were written to the currently
% active attachment stream. If compaction swaps during a write
% request we may have to rewrite our attachment bodies.
if Stream == nil -> ok; true ->
case couch_db:is_active_stream(Db, Stream) of
true ->
false ->
% Stream where the attachments were written to is
% no longer the current attachment stream. This
% can happen when a database is switched at
% compaction time.
couch_log:debug("Stream where the attachments were"
" written has changed."
" Possibly retrying.", []),
%% Adds the sizes of one #leaf{} to a running {ActiveSize, ExternalSize,
%% AttSizes} accumulator and returns the new accumulator.
%%   * the active size is always added;
%%   * the external size is added only when Type is `leaf`;
%%   * the leaf's attachment size entries are merged into the accumulated
%%     ones with lists:umerge/2, so an entry present on both sides is kept
%%     once (umerge expects both lists to be sorted -- presumably
%%     guaranteed by the writers of #leaf.atts; verify against callers).
add_sizes(Type, #leaf{sizes=Sizes, atts=AttSizes}, Acc) ->
    % Maybe upgrade from disk_size only
    #size_info{
        active = ActiveSize,
        external = ExternalSize
    } = upgrade_sizes(Sizes),
    {ASAcc, ESAcc, AttsAcc} = Acc,
    NewASAcc = ActiveSize + ASAcc,
    NewESAcc = ESAcc + if Type == leaf -> ExternalSize; true -> 0 end,
    NewAttsAcc = lists:umerge(AttSizes, AttsAcc),
    {NewASAcc, NewESAcc, NewAttsAcc}.
%% Normalises the accepted size representations to a #size_info{} record:
%%   * a #size_info{} record is returned unchanged;
%%   * a {Active, External} pair is copied field by field;
%%   * a bare integer is taken as the active size, with external set to 0.
upgrade_sizes(#size_info{}=SI) ->
    SI;
upgrade_sizes({D, E}) ->
    #size_info{active=D, external=E};
upgrade_sizes(S) when is_integer(S) ->
    #size_info{active=S, external=0}.
%% Delivers the outcome of one document update to the requesting client as
%% {result, UpdaterPid, {Tag, NewResult}}, where Tag is looked up with
%% doc_tag/1. The tag lookup and the send both stay inside `catch`, so an
%% exception from either is turned into a return value, not propagated.
send_result(Client, Doc, NewResult) ->
    catch erlang:send(Client, {result, self(), {doc_tag(Doc), NewResult}}).
%% Returns the reference stored under `ref` in the document's meta list
%% (see maybe_tag_doc/1).
%% Throws `no_doc_tag` when there is no such entry, and
%% {invalid_doc_tag, Entry} when the entry found is not a {ref, Ref} pair.
doc_tag(#doc{meta=Meta}) ->
    case lists:keyfind(ref, 1, Meta) of
        {ref, Ref} -> Ref;
        false -> throw(no_doc_tag);
        Else -> throw({invalid_doc_tag, Else})
    end.
% Merges each group of new documents into the matching existing
% #full_doc_info{}. The two input lists are walked in lock step.
% Returns {ok, NewInfos, RemoveSeqs, Seq}: the infos that changed (in input
% order), the previous non-zero update_seq values of those infos, and the
% last update sequence handed out.
merge_rev_trees(_Limit, _Merge, [], [], AccNewInfos, AccRemoveSeqs, AccSeq) ->
{ok, lists:reverse(AccNewInfos), AccRemoveSeqs, AccSeq};
merge_rev_trees(Limit, MergeConflicts, [NewDocs|RestDocsList],
[OldDocInfo|RestOldInfo], AccNewInfos, AccRemoveSeqs, AccSeq) ->
erlang:put(last_id_merged,, % for debugging
% Fold every new revision for this document into its info record.
NewDocInfo0 = lists:foldl(fun({Client, NewDoc}, OldInfoAcc) ->
merge_rev_tree(OldInfoAcc, NewDoc, Client, MergeConflicts)
end, OldDocInfo, NewDocs),
NewDocInfo1 = maybe_stem_full_doc_info(NewDocInfo0, Limit),
% When MergeConflicts is false, we updated #full_doc_info.deleted on every
% iteration of merge_rev_tree. However, merge_rev_tree does not update
% #full_doc_info.deleted when MergeConflicts is true, since we don't need
% to know whether the doc is deleted between iterations. Since we still
% need to know if the doc is deleted after the merge happens, we have to
% set it here.
NewDocInfo2 = case MergeConflicts of
true ->
deleted = couch_doc:is_deleted(NewDocInfo1)
false ->
if NewDocInfo2 == OldDocInfo ->
% nothing changed
merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
AccNewInfos, AccRemoveSeqs, AccSeq);
true ->
% We have updated the document, give it a new update_seq. It's
% important to note that the update_seq on OldDocInfo should
% be identical to the value on NewDocInfo1.
OldSeq = OldDocInfo#full_doc_info.update_seq,
NewDocInfo3 = NewDocInfo2#full_doc_info{
update_seq = AccSeq + 1
% An old update_seq of 0 is not added to the list of sequences to remove.
RemoveSeqs = case OldSeq of
0 -> AccRemoveSeqs;
_ -> [OldSeq | AccRemoveSeqs]
merge_rev_trees(Limit, MergeConflicts, RestDocsList, RestOldInfo,
[NewDocInfo3|AccNewInfos], RemoveSeqs, AccSeq+1)
% Merges a single new document revision into an existing
% #full_doc_info{}. The fourth argument is the MergeConflicts flag:
%   * false with a deleted document -- possible recreation of the document;
%   * false otherwise               -- the edit must extend a branch,
%                                      anything else is a conflict;
%   * true                          -- merge unconditionally.
% With the flag set to false the client is told about conflicts (and about
% a changed revision id) through send_result/3.
merge_rev_tree(OldInfo, NewDoc, Client, false)
when OldInfo#full_doc_info.deleted ->
% We're recreating a document that was previously
% deleted. To check that this is a recreation from
% the root we assert that the new document has a
% revision depth of 1 (this is to avoid recreating a
% doc from a previous internal revision) and is also
% not deleted. To avoid expanding the revision tree
% unnecessarily we create a new revision based on
% the winning deleted revision.
{RevDepth, _} = NewDoc#doc.revs,
NewDeleted = NewDoc#doc.deleted,
case RevDepth == 1 andalso not NewDeleted of
true ->
% Update the new doc based on revisions in OldInfo
#doc_info{revs=[WinningRev | _]} = couch_doc:to_doc_info(OldInfo),
#rev_info{rev={OldPos, OldRev}} = WinningRev,
Body = case couch_util:get_value(comp_body, NewDoc#doc.meta) of
CompBody when is_binary(CompBody) ->
_ ->
% The new revision id is computed from a copy of the doc that is
% re-parented onto the winning revision.
RevIdDoc = NewDoc#doc{
revs = {OldPos, [OldRev]},
body = Body
NewRevId = couch_db:new_revid(RevIdDoc),
NewDoc2 = NewDoc#doc{revs={OldPos + 1, [NewRevId, OldRev]}},
% Merge our modified new doc into the tree
#full_doc_info{rev_tree=OldTree} = OldInfo,
NewTree0 = couch_doc:to_path(NewDoc2),
case couch_key_tree:merge(OldTree, NewTree0) of
{NewTree1, new_leaf} ->
% We changed the revision id so inform the caller
send_result(Client, NewDoc, {ok, {OldPos+1, NewRevId}}),
rev_tree = NewTree1,
deleted = false
_ ->
_ ->
send_result(Client, NewDoc, conflict),
merge_rev_tree(OldInfo, NewDoc, Client, false) ->
% We're attempting to merge a new revision into an
% undeleted document. To not be a conflict we require
% that the merge results in extending a branch.
OldTree = OldInfo#full_doc_info.rev_tree,
NewTree0 = couch_doc:to_path(NewDoc),
NewDeleted = NewDoc#doc.deleted,
case couch_key_tree:merge(OldTree, NewTree0) of
{NewTree, new_leaf} when not NewDeleted ->
rev_tree = NewTree,
deleted = false
{NewTree, new_leaf} when NewDeleted ->
% We have to check if we just deleted this
% document completely or if it was a conflict
% resolution.
rev_tree = NewTree,
deleted = couch_doc:is_deleted(NewTree)
_ ->
send_result(Client, NewDoc, conflict),
merge_rev_tree(OldInfo, NewDoc, _Client, true) ->
% We're merging in revisions without caring about
% conflicts. Most likely this is a replication update.
OldTree = OldInfo#full_doc_info.rev_tree,
NewTree0 = couch_doc:to_path(NewDoc),
% The merge outcome tag is ignored and the client is not notified.
{NewTree, _} = couch_key_tree:merge(OldTree, NewTree0),
OldInfo#full_doc_info{rev_tree = NewTree}.
%% Stems the revision tree of a #full_doc_info{} when the
%% "couchdb" / "stem_interactive_updates" config flag is true (the default).
%% Stemming passes the tree through couch_key_tree:stem/2 with Limit; with
%% the flag set to false the record is returned unchanged.
maybe_stem_full_doc_info(#full_doc_info{rev_tree = Tree} = Info, Limit) ->
    case config:get_boolean("couchdb", "stem_interactive_updates", true) of
        true ->
            Stemmed = couch_key_tree:stem(Tree, Limit),
            Info#full_doc_info{rev_tree = Stemmed};
        false ->
            Info
    end.
% Applies one batch of document updates.
%   DocsList       - document groups, each a non-empty list of
%                    {Client, #doc{}} pairs for one document id
%   LocalDocs      - {Client, #doc{}} pairs handled by
%                    update_local_doc_revs/1
%   MergeConflicts - passed through to merge_rev_trees/7
%   FullCommit     - when true the data is committed immediately, otherwise
%                    the commit is delayed (see commit_data/2)
% Returns {ok, NewDb, UpdatedDDocIds}, where UpdatedDDocIds are the ids in
% the batch that start with "_design/".
update_docs_int(Db, DocsList, LocalDocs, MergeConflicts, FullCommit) ->
UpdateSeq = couch_db_engine:get_update_seq(Db),
RevsLimit = couch_db_engine:get_revs_limit(Db),
Ids = [Id || [{_Client, #doc{id=Id}}|_] <- DocsList],
% lookup up the old documents, if they exist.
OldDocLookups = couch_db_engine:open_docs(Db, Ids),
OldDocInfos = lists:zipwith(fun
(_Id, #full_doc_info{} = FDI) ->
(Id, not_found) ->
end, Ids, OldDocLookups),
% Merge the new docs into the revision trees.
{ok, NewFullDocInfos, RemSeqs, _} = merge_rev_trees(RevsLimit,
MergeConflicts, DocsList, OldDocInfos, [], [], UpdateSeq),
% Write out the document summaries (the bodies are stored in the nodes of
% the trees, the attachments are already written to disk)
{ok, IndexFDIs} = flush_trees(Db, NewFullDocInfos, []),
Pairs = pair_write_info(OldDocLookups, IndexFDIs),
LocalDocs2 = update_local_doc_revs(LocalDocs),
{ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, LocalDocs2, []),
% Stats: every written info counts as a write; those that did not replace
% an earlier update sequence are additionally counted as inserts.
WriteCount = length(IndexFDIs),
couch_stats:increment_counter([couchdb, document_inserts],
WriteCount - length(RemSeqs)),
couch_stats:increment_counter([couchdb, document_writes], WriteCount),
[couchdb, local_document_writes],
% Check if we just updated any design documents, and update the validation
% funs if we did.
UpdatedDDocIds = lists:flatmap(fun
(<<"_design/", _/binary>> = Id) -> [Id];
(_) -> []
end, Ids),
{ok, commit_data(Db1, not FullCommit), UpdatedDDocIds}.
% Assigns the next revision to each local document in a list of
% {Client, #doc{}} pairs and reports it to the client via send_result/3.
% Local doc revisions have position 0. The previous revision is read from
% the head of the revs list as a binary holding an integer, or taken as 0
% when the list is empty. A doc that is not deleted gets the previous
% revision plus one.
update_local_doc_revs(Docs) ->
lists:map(fun({Client, NewDoc}) ->
deleted = Delete,
revs = {0, PrevRevs}
} = NewDoc,
case PrevRevs of
[RevStr | _] ->
PrevRev = binary_to_integer(RevStr);
[] ->
PrevRev = 0
NewRev = case Delete of
false ->
PrevRev + 1;
true ->
% The client receives the new revision as a binary; the doc keeps it
% as an integer.
send_result(Client, NewDoc, {ok, {0, integer_to_binary(NewRev)}}),
revs = {0, [NewRev]}
end, Docs).
%% commit_data(Db) commits immediately; it is commit_data(Db, false).
commit_data(Db) ->
    commit_data(Db, false).

%% commit_data(Db, Delay)
%%   Delay =:= true  - do not commit now. If no delayed commit is pending,
%%                     arm a timer that sends `delayed_commit` to this
%%                     process after 1000 ms and remember its reference;
%%                     if one is already pending the state is returned
%%                     unchanged.
%%   otherwise       - cancel any pending delayed-commit timer, commit
%%                     through the storage engine, and record the update
%%                     sequence as committed.
commit_data(#db{waiting_delayed_commit=nil} = Db, true) ->
    TRef = erlang:send_after(1000,self(),delayed_commit),
    Db#db{waiting_delayed_commit=TRef};
commit_data(Db, true) ->
    Db;
commit_data(Db, _) ->
    #db{
        waiting_delayed_commit = Timer
    } = Db,
    if is_reference(Timer) -> erlang:cancel_timer(Timer); true -> ok end,
    {ok, Db1} = couch_db_engine:commit_data(Db),
    Db1#db{
        waiting_delayed_commit = nil,
        committed_update_seq = couch_db_engine:get_update_seq(Db)
    }.
%% Pairs every new #full_doc_info{} with the previous record for the same
%% document id, giving a list of {OldFDI | not_found, NewFDI} in the order
%% of New. Old is the raw lookup result and may contain `not_found` atoms;
%% lists:keyfind/3 skips elements that are not tuples, so those never match.
pair_write_info(Old, New) ->
    lists:map(fun(FDI) ->
        case lists:keyfind(FDI#full_doc_info.id, #full_doc_info.id, Old) of
            #full_doc_info{} = OldFDI -> {OldFDI, FDI};
            false -> {not_found, FDI}
        end
    end, New).
%% Pairs every purged document's previous #full_doc_info{} with its
%% replacement, giving a list of {OldFDI, NewFDI | not_found} in the order
%% of Old. `not_found` marks a document that has no record left in New.
pair_purge_info(Old, New) ->
    lists:map(fun(OldFDI) ->
        case lists:keyfind(OldFDI#full_doc_info.id, #full_doc_info.id, New) of
            #full_doc_info{} = NewFDI -> {OldFDI, NewFDI};
            false -> {OldFDI, not_found}
        end
    end, Old).
%% Returns the value stored under `ejson_size` in a document's meta list.
%% The entry is mandatory: a meta list without it fails with badmatch.
get_meta_body_size(Meta) ->
    {ejson_size, ExternalSize} = lists:keyfind(ejson_size, 1, Meta),
    ExternalSize.
% Picks the default security object for a new database from the
% "couchdb" / "default_security" config value (default "everyone").
% The first clause applies to database names starting with "shards/":
% there "admin_local" is grouped with "everyone" and only "admin_only" is
% treated separately.
default_security_object(<<"shards/", _/binary>>) ->
case config:get("couchdb", "default_security", "everyone") of
"admin_only" ->
Everyone when Everyone == "everyone"; Everyone == "admin_local" ->
% For every other database name "admin_local" is grouped with "admin_only".
default_security_object(_DbName) ->
case config:get("couchdb", "default_security", "everyone") of
Admin when Admin == "admin_only"; Admin == "admin_local" ->
"everyone" ->
% These functions rely on using the process dictionary. This is
% usually frowned upon however in this case it is done to avoid
% changing to a different server state record. Once PSE (Pluggable
% Storage Engine) code lands this should be moved to the #db{} record.
%% Re-reads the "couchdb" / "idle_check_timeout" config value, caches it in
%% the process dictionary under `idle_limit`, and returns it.
%% The value is either the atom `infinity` or an integer number of
%% milliseconds; ?IDLE_LIMIT_DEFAULT is used when the setting is absent.
update_idle_limit_from_config() ->
    Default = integer_to_list(?IDLE_LIMIT_DEFAULT),
    IdleLimit = case config:get("couchdb", "idle_check_timeout", Default) of
        "infinity" ->
            infinity;
        Milliseconds ->
            list_to_integer(Milliseconds)
    end,
    put(idle_limit, IdleLimit),
    IdleLimit.
%% Returns the idle limit cached in the process dictionary by
%% update_idle_limit_from_config/0: `infinity` or an integer number of
%% milliseconds (`undefined` if nothing has been cached yet).
idle_limit() ->
    get(idle_limit).
hibernate_if_no_idle_limit() ->
case idle_limit() of
infinity ->
Timeout when is_integer(Timeout) ->