% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_db).
-export([open/2,open_int/2,close/1,create/2,get_db_info/1,get_design_docs/1]).
-export([start_compact/1, cancel_compact/1]).
-export([wait_for_compaction/1, wait_for_compaction/2]).
-export([is_idle/1,monitor/1,count_changes_since/2]).
-export([update_doc/3,update_doc/4,update_docs/4,update_docs/2,update_docs/3,delete_doc/3]).
-export([get_doc_info/2,get_full_doc_info/2,get_full_doc_infos/2]).
-export([open_doc/2,open_doc/3,open_doc_revs/4]).
-export([set_revs_limit/2,get_revs_limit/1]).
-export([get_missing_revs/2,name/1,get_update_seq/1,get_committed_update_seq/1]).
-export([get_uuid/1, get_epochs/1, get_compacted_seq/1]).
-export([enum_docs/4,enum_docs_since/5]).
-export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
-export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
-export([start_link/3,open_doc_int/3,ensure_full_commit/1,ensure_full_commit/2]).
-export([set_security/2,get_security/1]).
-export([changes_since/4,changes_since/5,read_doc/2,new_revid/1]).
-export([check_is_admin/1, check_is_member/1, get_doc_count/1]).
-export([reopen/1, is_system_db/1, compression/1, make_doc/5]).
-export([load_validation_funs/1]).
-export([check_md5/2, with_stream/3]).
-export([monitored_by/1]).
-export([normalize_dbname/1]).
-include_lib("couch/include/couch_db.hrl").
-define(VALID_DB_NAME, "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$").
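% Illustrative matches for the pattern above: database names must start with a
% lowercase letter and may contain lowercase letters, digits and the
% characters _, $, (, ), +, - and /. For example, <<"db-1">> and
% <<"users/2015">> match, while <<"1db">> and <<"DB">> do not.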
start_link(DbName, Filepath, Options) ->
case open_db_file(Filepath, Options) of
{ok, Fd} ->
{ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {DbName,
Filepath, Fd, Options}, []),
unlink(Fd),
gen_server:call(UpdaterPid, get_db);
Else ->
Else
end.
open_db_file(Filepath, Options) ->
case couch_file:open(Filepath, Options) of
{ok, Fd} ->
{ok, Fd};
{error, enoent} ->
% Couldn't find the file. Is there a compact version? This can happen if
% the server crashed during the file switch.
case couch_file:open(Filepath ++ ".compact", [nologifmissing]) of
{ok, Fd} ->
couch_log:info("Found ~s~s compaction file, using as primary"
" storage.", [Filepath, ".compact"]),
ok = file:rename(Filepath ++ ".compact", Filepath),
ok = couch_file:sync(Fd),
{ok, Fd};
{error, enoent} ->
{not_found, no_db_file}
end;
Error ->
Error
end.
create(DbName, Options) ->
couch_server:create(DbName, Options).
% This is for opening a database for internal purposes such as the replicator
% or the view indexer. It never throws a reader error.
open_int(DbName, Options) ->
couch_server:open(DbName, Options).
% This should be called any time an HTTP request opens the database.
% It ensures that the HTTP userCtx is a valid reader.
open(DbName, Options) ->
case couch_server:open(DbName, Options) of
{ok, Db} ->
try
check_is_member(Db),
{ok, Db}
catch
throw:Error ->
close(Db),
throw(Error)
end;
Else -> Else
end.
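% Illustrative sketch (names hypothetical): open a db with a request's user
% context and make sure it is always closed again:
%
%   UserCtx = #user_ctx{name = <<"bob">>, roles = []},
%   case couch_db:open(<<"mydb">>, [{user_ctx, UserCtx}]) of
%       {ok, Db} ->
%           try couch_db:get_db_info(Db) after couch_db:close(Db) end;
%       Error ->
%           Error
%   end.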
reopen(#db{main_pid = Pid, fd = Fd, fd_monitor = OldRef, user_ctx = UserCtx}) ->
{ok, #db{fd = NewFd} = NewDb} = gen_server:call(Pid, get_db, infinity),
case NewFd =:= Fd of
true ->
{ok, NewDb#db{user_ctx = UserCtx}};
false ->
erlang:demonitor(OldRef, [flush]),
NewRef = erlang:monitor(process, NewFd),
{ok, NewDb#db{user_ctx = UserCtx, fd_monitor = NewRef}}
end.
is_system_db(#db{options = Options}) ->
lists:member(sys_db, Options).
ensure_full_commit(#db{main_pid=Pid, instance_start_time=StartTime}) ->
ok = gen_server:call(Pid, full_commit, infinity),
{ok, StartTime}.
ensure_full_commit(Db, RequiredSeq) ->
#db{main_pid=Pid, instance_start_time=StartTime} = Db,
ok = gen_server:call(Pid, {full_commit, RequiredSeq}, infinity),
{ok, StartTime}.
close(#db{fd_monitor=Ref}) ->
erlang:demonitor(Ref, [flush]),
ok.
is_idle(#db{compactor_pid=nil, waiting_delayed_commit=nil} = Db) ->
monitored_by(Db) == [];
is_idle(_Db) ->
false.
monitored_by(Db) ->
case erlang:process_info(Db#db.fd, monitored_by) of
undefined ->
[];
{monitored_by, Pids} ->
PidTracker = whereis(couch_stats_process_tracker),
Pids -- [Db#db.main_pid, PidTracker]
end.
monitor(#db{main_pid=MainPid}) ->
erlang:monitor(process, MainPid).
start_compact(#db{main_pid=Pid}) ->
gen_server:call(Pid, start_compact).
cancel_compact(#db{main_pid=Pid}) ->
gen_server:call(Pid, cancel_compact).
wait_for_compaction(Db) ->
wait_for_compaction(Db, infinity).
wait_for_compaction(#db{main_pid=Pid}=Db, Timeout) ->
Start = os:timestamp(),
case gen_server:call(Pid, compactor_pid) of
CPid when is_pid(CPid) ->
Ref = erlang:monitor(process, CPid),
receive
{'DOWN', Ref, _, _, normal} when Timeout == infinity ->
wait_for_compaction(Db, Timeout);
{'DOWN', Ref, _, _, normal} ->
Elapsed = timer:now_diff(os:timestamp(), Start) div 1000,
wait_for_compaction(Db, Timeout - Elapsed);
{'DOWN', Ref, _, _, Reason} ->
{error, Reason}
after Timeout ->
erlang:demonitor(Ref, [flush]),
{error, Timeout}
end;
_ ->
ok
end.
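% Illustrative sketch: trigger compaction and wait for it to finish, giving
% up after five minutes (timeout value chosen arbitrarily):
%
%   couch_db:start_compact(Db),
%   case couch_db:wait_for_compaction(Db, 300000) of
%       ok ->
%           compaction_finished;
%       {error, Reason} ->
%           % Reason is the timeout value if we gave up waiting, or the
%           % compactor's exit reason if it crashed
%           {compaction_not_finished, Reason}
%   end.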
delete_doc(Db, Id, Revisions) ->
DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
{ok, [Result]} = update_docs(Db, DeletedDocs, []),
{ok, Result}.
open_doc(Db, IdOrDocInfo) ->
open_doc(Db, IdOrDocInfo, []).
open_doc(Db, Id, Options) ->
increment_stat(Db, [couchdb, database_reads]),
case open_doc_int(Db, Id, Options) of
{ok, #doc{deleted=true}=Doc} ->
case lists:member(deleted, Options) of
true ->
apply_open_options({ok, Doc},Options);
false ->
{not_found, deleted}
end;
Else ->
apply_open_options(Else,Options)
end.
apply_open_options({ok, Doc},Options) ->
apply_open_options2(Doc,Options);
apply_open_options(Else,_Options) ->
Else.
apply_open_options2(Doc,[]) ->
{ok, Doc};
apply_open_options2(#doc{atts=Atts0,revs=Revs}=Doc,
[{atts_since, PossibleAncestors}|Rest]) ->
RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors),
Atts = lists:map(fun(Att) ->
[AttPos, Data] = couch_att:fetch([revpos, data], Att),
if AttPos > RevPos -> couch_att:store(data, Data, Att);
true -> couch_att:store(data, stub, Att)
end
end, Atts0),
apply_open_options2(Doc#doc{atts=Atts}, Rest);
apply_open_options2(Doc, [ejson_body | Rest]) ->
apply_open_options2(couch_doc:with_ejson_body(Doc), Rest);
apply_open_options2(Doc,[_|Rest]) ->
apply_open_options2(Doc,Rest).
find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
0;
find_ancestor_rev_pos(_DocRevs, []) ->
0;
find_ancestor_rev_pos({RevPos, [RevId|Rest]}, AttsSinceRevs) ->
case lists:member({RevPos, RevId}, AttsSinceRevs) of
true ->
RevPos;
false ->
find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
end.
open_doc_revs(Db, Id, Revs, Options) ->
increment_stat(Db, [couchdb, database_reads]),
[{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
{ok, [apply_open_options(Result, Options) || Result <- Results]}.
% The result is a list of tuples:
% {Id, MissingRevs, PossibleAncestors}
% If no revs are missing for an id, that id is omitted from the results.
get_missing_revs(Db, IdRevsList) ->
Results = get_full_doc_infos(Db, [Id1 || {Id1, _Revs} <- IdRevsList]),
{ok, find_missing(IdRevsList, Results)}.
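% Illustrative sketch of the result shape (ids and revision hashes are
% hypothetical):
%
%   {ok, Missing} = couch_db:get_missing_revs(Db, [
%       {<<"doc1">>, [{2, <<"abc">>}]},
%       {<<"doc2">>, [{1, <<"def">>}]}
%   ]),
%   % Missing might be
%   %   [{<<"doc1">>, [{2, <<"abc">>}], [{1, <<"aaa">>}]}]
%   % meaning doc2's rev is already known, while doc1 is missing rev 2-abc
%   % and rev 1-aaa is a possible ancestor worth sending along.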
find_missing([], []) ->
[];
find_missing([{Id, Revs}|RestIdRevs], [{ok, FullInfo} | RestLookupInfo]) ->
case couch_key_tree:find_missing(FullInfo#full_doc_info.rev_tree, Revs) of
[] ->
find_missing(RestIdRevs, RestLookupInfo);
MissingRevs ->
#doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo),
LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo],
% Find the revs that are possible parents of this rev
PossibleAncestors =
lists:foldl(fun({LeafPos, LeafRevId}, Acc) ->
% this leaf is a "possible ancestor" of the missing
% revs if this LeafPos is less than any of the missing revs
case lists:any(fun({MissingPos, _}) ->
LeafPos < MissingPos end, MissingRevs) of
true ->
[{LeafPos, LeafRevId} | Acc];
false ->
Acc
end
end, [], LeafRevs),
[{Id, MissingRevs, PossibleAncestors} |
find_missing(RestIdRevs, RestLookupInfo)]
end;
find_missing([{Id, Revs}|RestIdRevs], [not_found | RestLookupInfo]) ->
[{Id, Revs, []} | find_missing(RestIdRevs, RestLookupInfo)].
get_doc_info(Db, Id) ->
case get_full_doc_info(Db, Id) of
{ok, DocInfo} ->
{ok, couch_doc:to_doc_info(DocInfo)};
Else ->
Else
end.
% returns {ok, FullDocInfo} or not_found
get_full_doc_info(Db, Id) ->
[Result] = get_full_doc_infos(Db, [Id]),
Result.
get_full_doc_infos(Db, Ids) ->
couch_btree:lookup(Db#db.id_tree, Ids).
increment_update_seq(#db{main_pid=Pid}) ->
gen_server:call(Pid, increment_update_seq).
purge_docs(#db{main_pid=Pid}, IdsRevs) ->
gen_server:call(Pid, {purge_docs, IdsRevs}).
get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
get_update_seq(#db{update_seq=Seq})->
Seq.
get_purge_seq(#db{}=Db) ->
couch_db_header:purge_seq(Db#db.header).
get_last_purged(#db{}=Db) ->
case couch_db_header:purged_docs(Db#db.header) of
nil ->
{ok, []};
Pointer ->
couch_file:pread_term(Db#db.fd, Pointer)
end.
get_doc_count(Db) ->
{ok, {Count, _, _}} = couch_btree:full_reduce(Db#db.id_tree),
{ok, Count}.
get_uuid(#db{}=Db) ->
couch_db_header:uuid(Db#db.header).
get_epochs(#db{}=Db) ->
couch_db_header:epochs(Db#db.header).
get_compacted_seq(#db{}=Db) ->
couch_db_header:compacted_seq(Db#db.header).
get_db_info(Db) ->
#db{fd=Fd,
header=Header,
compactor_pid=Compactor,
update_seq=SeqNum,
name=Name,
instance_start_time=StartTime,
committed_update_seq=CommittedUpdateSeq,
id_tree = IdBtree
} = Db,
{ok, FileSize} = couch_file:bytes(Fd),
{ok, DbReduction} = couch_btree:full_reduce(IdBtree),
SizeInfo0 = element(3, DbReduction),
SizeInfo = if is_record(SizeInfo0, size_info) -> SizeInfo0; true ->
#size_info{active=SizeInfo0}
end,
ActiveSize = active_size(Db, SizeInfo),
DiskVersion = couch_db_header:disk_version(Header),
Uuid = case get_uuid(Db) of
undefined -> null;
Uuid0 -> Uuid0
end,
CompactedSeq = case get_compacted_seq(Db) of
undefined -> null;
Else1 -> Else1
end,
InfoList = [
{db_name, Name},
{doc_count, element(1, DbReduction)},
{doc_del_count, element(2, DbReduction)},
{update_seq, SeqNum},
{purge_seq, couch_db:get_purge_seq(Db)},
{compact_running, Compactor/=nil},
{disk_size, FileSize}, % legacy
{other, {[{data_size, SizeInfo#size_info.external}]}}, % legacy
{data_size, ActiveSize}, % legacy
{sizes, {[
{file, FileSize},
{active, ActiveSize},
{external, SizeInfo#size_info.external}
]}},
{instance_start_time, StartTime},
{disk_format_version, DiskVersion},
{committed_update_seq, CommittedUpdateSeq},
{compacted_seq, CompactedSeq},
{uuid, Uuid}
],
{ok, InfoList}.
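% Illustrative sketch: pulling a couple of fields back out of the info list
% built above (values hypothetical):
%
%   {ok, Info} = couch_db:get_db_info(Db),
%   DocCount = couch_util:get_value(doc_count, Info),
%   {Sizes} = couch_util:get_value(sizes, Info),
%   FileSize = couch_util:get_value(file, Sizes).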
active_size(#db{}=Db, Size) when is_integer(Size) ->
active_size(Db, #size_info{active=Size});
active_size(#db{}=Db, #size_info{}=SI) ->
Trees = [
Db#db.id_tree,
Db#db.seq_tree,
Db#db.local_tree
],
lists:foldl(fun(T, Acc) ->
case couch_btree:size(T) of
_ when Acc == null ->
null;
undefined ->
null;
Size ->
Acc + Size
end
end, SI#size_info.active, Trees).
get_design_docs(#db{name = <<"shards/", _:18/binary, DbName/binary>>}) ->
{_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
receive {'DOWN', Ref, _, _, Response} ->
Response
end;
get_design_docs(#db{id_tree = IdBtree}) ->
FoldFun = pipe([fun skip_deleted/4], fun
(#full_doc_info{deleted = true}, _Reds, Acc) ->
{ok, Acc};
(#full_doc_info{id= <<"_design/",_/binary>>}=FullDocInfo, _Reds, Acc) ->
{ok, [FullDocInfo | Acc]};
(_, _Reds, Acc) ->
{stop, Acc}
end),
KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
{ok, _, Docs} = couch_btree:fold(IdBtree, FoldFun, [], KeyOpts),
{ok, Docs}.
check_is_admin(#db{user_ctx=#user_ctx{name=Name,roles=Roles}}=Db) ->
{Admins} = get_admins(Db),
AdminRoles = [<<"_admin">> | couch_util:get_value(<<"roles">>, Admins, [])],
AdminNames = couch_util:get_value(<<"names">>, Admins,[]),
case AdminRoles -- Roles of
AdminRoles -> % same list, not an admin role
case AdminNames -- [Name] of
AdminNames -> % same names, not an admin
throw({unauthorized, <<"You are not a db or server admin.">>});
_ ->
ok
end;
_ ->
ok
end.
check_is_member(#db{user_ctx=#user_ctx{name=Name,roles=Roles}=UserCtx}=Db) ->
case (catch check_is_admin(Db)) of
ok -> ok;
_ ->
{Members} = get_members(Db),
ReaderRoles = couch_util:get_value(<<"roles">>, Members,[]),
WithAdminRoles = [<<"_admin">> | ReaderRoles],
ReaderNames = couch_util:get_value(<<"names">>, Members,[]),
case ReaderRoles ++ ReaderNames of
[] -> ok; % no readers == public access
_Else ->
case WithAdminRoles -- Roles of
WithAdminRoles -> % same list, not a reader role
case ReaderNames -- [Name] of
ReaderNames -> % same names, not a reader
couch_log:debug("Not a reader: UserCtx ~p"
" vs Names ~p Roles ~p",
[UserCtx, ReaderNames, WithAdminRoles]),
throw({unauthorized, <<"You are not authorized to access this db.">>});
_ ->
ok
end;
_ ->
ok
end
end
end.
get_admins(#db{security=SecProps}) ->
couch_util:get_value(<<"admins">>, SecProps, {[]}).
get_members(#db{security=SecProps}) ->
% we fall back to readers here for backwards compatibility
couch_util:get_value(<<"members">>, SecProps,
couch_util:get_value(<<"readers">>, SecProps, {[]})).
get_security(#db{security=SecProps}) ->
{SecProps}.
set_security(#db{main_pid=Pid}=Db, {NewSecProps}) when is_list(NewSecProps) ->
check_is_admin(Db),
ok = validate_security_object(NewSecProps),
ok = gen_server:call(Pid, {set_security, NewSecProps}, infinity),
{ok, _} = ensure_full_commit(Db),
ok;
set_security(_, _) ->
throw(bad_request).
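% Illustrative sketch of a security object accepted by set_security/2 and
% checked by validate_security_object/1 below (names and roles hypothetical;
% the caller's user_ctx must be a db admin):
%
%   SecObj = {[
%       {<<"admins">>,  {[{<<"names">>, [<<"alice">>]},
%                         {<<"roles">>, [<<"ops">>]}]}},
%       {<<"members">>, {[{<<"names">>, []},
%                         {<<"roles">>, [<<"staff">>]}]}}
%   ]},
%   ok = couch_db:set_security(Db, SecObj).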
validate_security_object(SecProps) ->
Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}),
% we fall back to readers here for backwards compatibility
Members = couch_util:get_value(<<"members">>, SecProps,
couch_util:get_value(<<"readers">>, SecProps, {[]})),
ok = validate_names_and_roles(Admins),
ok = validate_names_and_roles(Members),
ok.
% validate user input
validate_names_and_roles({Props}) when is_list(Props) ->
case couch_util:get_value(<<"names">>,Props,[]) of
Ns when is_list(Ns) ->
[throw("names must be a JSON list of strings") ||N <- Ns, not is_binary(N)],
Ns;
_ -> throw("names must be a JSON list of strings")
end,
case couch_util:get_value(<<"roles">>,Props,[]) of
Rs when is_list(Rs) ->
[throw("roles must be a JSON list of strings") ||R <- Rs, not is_binary(R)],
Rs;
_ -> throw("roles must be a JSON list of strings")
end,
ok.
get_revs_limit(#db{revs_limit=Limit}) ->
Limit.
set_revs_limit(#db{main_pid=Pid}=Db, Limit) when Limit > 0 ->
check_is_admin(Db),
gen_server:call(Pid, {set_revs_limit, Limit}, infinity);
set_revs_limit(_Db, _Limit) ->
throw(invalid_revs_limit).
name(#db{name=Name}) ->
Name.
compression(#db{compression=Compression}) ->
Compression.
update_doc(Db, Doc, Options) ->
update_doc(Db, Doc, Options, interactive_edit).
update_doc(Db, Doc, Options, UpdateType) ->
case update_docs(Db, [Doc], Options, UpdateType) of
{ok, [{ok, NewRev}]} ->
{ok, NewRev};
{ok, [{{_Id, _Rev}, Error}]} ->
throw(Error);
{ok, [Error]} ->
throw(Error);
{ok, []} ->
% replication success
{Pos, [RevId | _]} = Doc#doc.revs,
{ok, {Pos, RevId}}
end.
update_docs(Db, Docs) ->
update_docs(Db, Docs, []).
% group_alike_docs groups the sorted documents into sublist buckets, by id.
% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
group_alike_docs(Docs) ->
% Here we make the doc sort stable by including each doc's original
% position in the sort key, so that if we have duplicate docids we don't
% have to rely on the behavior of lists:sort/2, which isn't documented
% anywhere as being stable.
WithPos = lists:zip(Docs, lists:seq(1, length(Docs))),
SortFun = fun({D1, P1}, {D2, P2}) -> {D1#doc.id, P1} =< {D2#doc.id, P2} end,
SortedDocs = [D || {D, _} <- lists:sort(SortFun, WithPos)],
group_alike_docs(SortedDocs, []).
group_alike_docs([], Buckets) ->
lists:reverse(lists:map(fun lists:reverse/1, Buckets));
group_alike_docs([Doc|Rest], []) ->
group_alike_docs(Rest, [[Doc]]);
group_alike_docs([Doc|Rest], [Bucket|RestBuckets]) ->
[#doc{id=BucketId}|_] = Bucket,
case Doc#doc.id == BucketId of
true ->
% add to existing bucket
group_alike_docs(Rest, [[Doc|Bucket]|RestBuckets]);
false ->
% add to new bucket
group_alike_docs(Rest, [[Doc]|[Bucket|RestBuckets]])
end.
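% Illustrative example of the grouping above, with hypothetical docs:
%
%   group_alike_docs([#doc{id = <<"a">>}, #doc{id = <<"b">>},
%                     #doc{id = <<"a">>}])
%   %=> [[#doc{id = <<"a">>}, #doc{id = <<"a">>}], [#doc{id = <<"b">>}]]
%
% Duplicate ids land in the same bucket, in submission order, because the
% original position is part of the sort key.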
validate_doc_update(#db{}=Db, #doc{id= <<"_design/",_/binary>>}=Doc, _GetDiskDocFun) ->
case catch check_is_admin(Db) of
ok -> validate_ddoc(Db#db.name, Doc);
Error -> Error
end;
validate_doc_update(#db{validate_doc_funs = undefined} = Db, Doc, Fun) ->
ValidationFuns = load_validation_funs(Db),
validate_doc_update(Db#db{validate_doc_funs=ValidationFuns}, Doc, Fun);
validate_doc_update(#db{validate_doc_funs=[]}, _Doc, _GetDiskDocFun) ->
ok;
validate_doc_update(_Db, #doc{id= <<"_local/",_/binary>>}, _GetDiskDocFun) ->
ok;
validate_doc_update(Db, Doc, GetDiskDocFun) ->
case get(io_priority) of
{internal_repl, _} ->
ok;
_ ->
validate_doc_update_int(Db, Doc, GetDiskDocFun)
end.
validate_ddoc(DbName, DDoc) ->
try
couch_index_server:validate(DbName, couch_doc:with_ejson_body(DDoc))
catch
throw:{invalid_design_doc, Reason} ->
{bad_request, invalid_design_doc, Reason};
throw:Error ->
Error
end.
validate_doc_update_int(Db, Doc, GetDiskDocFun) ->
Fun = fun() ->
DiskDoc = GetDiskDocFun(),
JsonCtx = couch_util:json_user_ctx(Db),
SecObj = get_security(Db),
try
[case Fun(Doc, DiskDoc, JsonCtx, SecObj) of
ok -> ok;
Error -> throw(Error)
end || Fun <- Db#db.validate_doc_funs],
ok
catch
throw:Error ->
Error
end
end,
couch_stats:update_histogram([couchdb, query_server, vdu_process_time],
Fun).
% to be safe, spawn a middleman here
load_validation_funs(#db{main_pid=Pid, name = <<"shards/", _/binary>>}=Db) ->
{_, Ref} = spawn_monitor(fun() ->
exit(ddoc_cache:open(mem3:dbname(Db#db.name), validation_funs))
end),
receive
{'DOWN', Ref, _, _, {ok, Funs}} ->
gen_server:cast(Pid, {load_validation_funs, Funs}),
Funs;
{'DOWN', Ref, _, _, Reason} ->
couch_log:error("could not load validation funs ~p", [Reason]),
throw(internal_server_error)
end;
load_validation_funs(#db{main_pid=Pid}=Db) ->
{ok, DDocInfos} = get_design_docs(Db),
OpenDocs = fun
(#full_doc_info{}=D) ->
{ok, Doc} = open_doc_int(Db, D, [ejson_body]),
Doc
end,
DDocs = lists:map(OpenDocs, DDocInfos),
Funs = lists:flatmap(fun(DDoc) ->
case couch_doc:get_validate_doc_fun(DDoc) of
nil -> [];
Fun -> [Fun]
end
end, DDocs),
gen_server:cast(Pid, {load_validation_funs, Funs}),
Funs.
prep_and_validate_update(Db, #doc{id=Id,revs={RevStart, Revs}}=Doc,
OldFullDocInfo, LeafRevsDict, AllowConflict) ->
case Revs of
[PrevRev|_] ->
case dict:find({RevStart, PrevRev}, LeafRevsDict) of
{ok, {#leaf{deleted=Deleted, ptr=DiskSp}, DiskRevs}} ->
case couch_doc:has_stubs(Doc) of
true ->
DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs),
Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
{validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2};
false ->
LoadDiskDoc = fun() -> make_doc(Db,Id,Deleted,DiskSp,DiskRevs) end,
{validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
end;
error when AllowConflict ->
couch_doc:merge_stubs(Doc, #doc{}), % will generate error if
% there are stubs
{validate_doc_update(Db, Doc, fun() -> nil end), Doc};
error ->
{conflict, Doc}
end;
[] ->
% new doc, and we have existing revs.
% reuse existing deleted doc
if OldFullDocInfo#full_doc_info.deleted orelse AllowConflict ->
{validate_doc_update(Db, Doc, fun() -> nil end), Doc};
true ->
{conflict, Doc}
end
end.
prep_and_validate_updates(_Db, [], [], _AllowConflict, AccPrepped,
AccFatalErrors) ->
AccPrepped2 = lists:reverse(lists:map(fun lists:reverse/1, AccPrepped)),
{AccPrepped2, AccFatalErrors};
prep_and_validate_updates(Db, [DocBucket|RestBuckets], [not_found|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
% no existing revs are known,
{PreppedBucket, AccErrors3} = lists:foldl(
fun(#doc{revs=Revs}=Doc, {AccBucket, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
false -> ok
end,
case Revs of
{0, []} ->
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
{[Doc | AccBucket], AccErrors2};
Error ->
{AccBucket, [{doc_tag(Doc), Error} | AccErrors2]}
end;
_ ->
% old revs specified but none exist, a conflict
{AccBucket, [{doc_tag(Doc), conflict} | AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
[PreppedBucket | AccPrepped], AccErrors3);
prep_and_validate_updates(Db, [DocBucket|RestBuckets],
[{ok, #full_doc_info{rev_tree=OldRevTree}=OldFullDocInfo}|RestLookups],
AllowConflict, AccPrepped, AccErrors) ->
Leafs = couch_key_tree:get_all_leafs(OldRevTree),
LeafRevsDict = dict:from_list([
{{Start, RevId}, {Leaf, Revs}} ||
{Leaf, {Start, [RevId | _]} = Revs} <- Leafs
]),
{PreppedBucket, AccErrors3} = lists:foldl(
fun(Doc, {Docs2Acc, AccErrors2}) ->
case prep_and_validate_update(Db, Doc, OldFullDocInfo,
LeafRevsDict, AllowConflict) of
{ok, Doc2} ->
{[Doc2 | Docs2Acc], AccErrors2};
{Error, _} ->
% Record the error
{Docs2Acc, [{doc_tag(Doc), Error} |AccErrors2]}
end
end,
{[], AccErrors}, DocBucket),
prep_and_validate_updates(Db, RestBuckets, RestLookups, AllowConflict,
[PreppedBucket | AccPrepped], AccErrors3).
update_docs(Db, Docs, Options) ->
update_docs(Db, Docs, Options, interactive_edit).
prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
Errors2 = [{{Id, {Pos, Rev}}, Error} ||
{#doc{id=Id,revs={Pos,[Rev|_]}}, Error} <- AccErrors],
AccPrepped2 = lists:reverse(lists:map(fun lists:reverse/1, AccPrepped)),
{AccPrepped2, lists:reverse(Errors2)};
prep_and_validate_replicated_updates(Db, [Bucket|RestBuckets], [OldInfo|RestOldInfo], AccPrepped, AccErrors) ->
case OldInfo of
not_found ->
{ValidatedBucket, AccErrors3} = lists:foldl(
fun(Doc, {AccPrepped2, AccErrors2}) ->
case couch_doc:has_stubs(Doc) of
true ->
couch_doc:merge_stubs(Doc, #doc{}); % will throw exception
false -> ok
end,
case validate_doc_update(Db, Doc, fun() -> nil end) of
ok ->
{[Doc | AccPrepped2], AccErrors2};
Error ->
{AccPrepped2, [{Doc, Error} | AccErrors2]}
end
end,
{[], AccErrors}, Bucket),
prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3);
{ok, #full_doc_info{rev_tree=OldTree}} ->
OldLeafs = couch_key_tree:get_all_leafs_full(OldTree),
OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _}|_]} <- OldLeafs],
NewRevTree = lists:foldl(
fun(NewDoc, AccTree) ->
{NewTree, _} = couch_key_tree:merge(AccTree,
couch_doc:to_path(NewDoc), Db#db.revs_limit),
NewTree
end,
OldTree, Bucket),
Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
LeafRevsFullDict = dict:from_list( [{{Start, RevId}, FullPath} || {Start, [{RevId, _}|_]}=FullPath <- Leafs]),
{ValidatedBucket, AccErrors3} =
lists:foldl(
fun(#doc{id=Id,revs={Pos, [RevId|_]}}=Doc, {AccValidated, AccErrors2}) ->
IsOldLeaf = lists:member({Pos, RevId}, OldLeafsLU),
case dict:find({Pos, RevId}, LeafRevsFullDict) of
{ok, {Start, Path}} when not IsOldLeaf ->
% our unflushed doc is a leaf node. Go back on the path
% to find the previous rev that's on disk.
LoadPrevRevFun = fun() ->
make_first_doc_on_disk(Db,Id,Start-1, tl(Path))
end,
case couch_doc:has_stubs(Doc) of
true ->
DiskDoc = case LoadPrevRevFun() of
#doc{} = DiskDoc0 ->
DiskDoc0;
_ ->
% Force a missing_stub exception
couch_doc:merge_stubs(Doc, #doc{})
end,
Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
GetDiskDocFun = fun() -> DiskDoc end;
false ->
Doc2 = Doc,
GetDiskDocFun = LoadPrevRevFun
end,
case validate_doc_update(Db, Doc2, GetDiskDocFun) of
ok ->
{[Doc2 | AccValidated], AccErrors2};
Error ->
{AccValidated, [{Doc, Error} | AccErrors2]}
end;
_ ->
% this doc isn't a leaf or already exists in the tree.
% ignore but consider it a success.
{AccValidated, AccErrors2}
end
end,
{[], AccErrors}, Bucket),
prep_and_validate_replicated_updates(Db, RestBuckets, RestOldInfo,
[ValidatedBucket | AccPrepped], AccErrors3)
end.
new_revid(#doc{body=Body, revs={OldStart,OldRevs}, atts=Atts, deleted=Deleted}) ->
DigestedAtts = lists:foldl(fun(Att, Acc) ->
[N, T, M] = couch_att:fetch([name, type, md5], Att),
case M == <<>> of
true -> Acc;
false -> [{N, T, M} | Acc]
end
end, [], Atts),
case DigestedAtts of
Atts2 when length(Atts) =/= length(Atts2) ->
% We must have old style non-md5 attachments
?l2b(integer_to_list(couch_util:rand32()));
Atts2 ->
OldRev = case OldRevs of [] -> 0; [OldRev0|_] -> OldRev0 end,
couch_util:md5(term_to_binary([Deleted, OldStart, OldRev, Body, Atts2]))
end.
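% The new rev id is a deterministic md5 over the deleted flag, the previous
% revision, the body and the attachment digests (except for old-style
% attachments without md5s, where a random id is generated). Illustrative
% sketch:
%
%   RevId = new_revid(#doc{id = <<"x">>, revs = {1, [<<"abc">>]},
%                          body = {[{<<"k">>, 1}]}, atts = []}),
%   % RevId is a 16-byte binary; new_revs/3 turns it into the full
%   % revision {2, RevId}.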
new_revs([], OutBuckets, IdRevsAcc) ->
{lists:reverse(OutBuckets), IdRevsAcc};
new_revs([Bucket|RestBuckets], OutBuckets, IdRevsAcc) ->
{NewBucket, IdRevsAcc3} = lists:mapfoldl(
fun(#doc{revs={Start, RevIds}}=Doc, IdRevsAcc2)->
NewRevId = new_revid(Doc),
{Doc#doc{revs={Start+1, [NewRevId | RevIds]}},
[{doc_tag(Doc), {ok, {Start+1, NewRevId}}} | IdRevsAcc2]}
end, IdRevsAcc, Bucket),
new_revs(RestBuckets, [NewBucket|OutBuckets], IdRevsAcc3).
check_dup_atts(#doc{atts=Atts}=Doc) ->
lists:foldl(fun(Att, Names) ->
Name = couch_att:fetch(name, Att),
case ordsets:is_element(Name, Names) of
true -> throw({bad_request, <<"Duplicate attachments">>});
false -> ordsets:add_element(Name, Names)
end
end, ordsets:new(), Atts),
Doc.
tag_docs([]) ->
[];
tag_docs([#doc{meta=Meta}=Doc | Rest]) ->
[Doc#doc{meta=[{ref, make_ref()} | Meta]} | tag_docs(Rest)].
doc_tag(#doc{meta=Meta}) ->
case lists:keyfind(ref, 1, Meta) of
{ref, Ref} when is_reference(Ref) -> Ref;
false -> throw(doc_not_tagged);
Else -> throw({invalid_doc_tag, Else})
end.
update_docs(Db, Docs0, Options, replicated_changes) ->
increment_stat(Db, [couchdb, database_writes]),
Docs = tag_docs(Docs0),
DocBuckets = before_docs_update(Db, group_alike_docs(Docs)),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
(#doc{atts=Atts}) ->
Atts /= []
end, Docs) of
true ->
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
ExistingDocs = get_full_doc_infos(Db, Ids),
{DocBuckets2, DocErrors} =
prep_and_validate_replicated_updates(Db, DocBuckets, ExistingDocs, [], []),
DocBuckets3 = [Bucket || [_|_]=Bucket <- DocBuckets2]; % remove empty buckets
false ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
|| Doc <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
update_docs(Db, Docs0, Options, interactive_edit) ->
increment_stat(Db, [couchdb, database_writes]),
AllOrNothing = lists:member(all_or_nothing, Options),
Docs = tag_docs(Docs0),
% Separate _local docs from normal docs
IsLocal = fun
(#doc{id= <<?LOCAL_DOC_PREFIX, _/binary>>}) -> true;
(_) -> false
end,
{NonRepDocs, Docs2} = lists:partition(IsLocal, Docs),
DocBuckets = before_docs_update(Db, group_alike_docs(Docs2)),
case (Db#db.validate_doc_funs /= []) orelse
lists:any(
fun(#doc{id= <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
true;
(#doc{atts=Atts}) ->
Atts /= []
end, Docs2) of
true ->
% look up the doc by id and get the most recent
Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
ExistingDocInfos = get_full_doc_infos(Db, Ids),
{DocBucketsPrepped, PreCommitFailures} = prep_and_validate_updates(Db,
DocBuckets, ExistingDocInfos, AllOrNothing, [], []),
% strip out any empty buckets
DocBuckets2 = [Bucket || [_|_] = Bucket <- DocBucketsPrepped];
false ->
PreCommitFailures = [],
DocBuckets2 = DocBuckets
end,
if (AllOrNothing) and (PreCommitFailures /= []) ->
RefErrorDict = dict:from_list([{doc_tag(Doc), Doc} || Doc <- Docs]),
{aborted, lists:map(fun({Ref, Error}) ->
#doc{id=Id,revs={Start,RevIds}} = dict:fetch(Ref, RefErrorDict),
case {Start, RevIds} of
{Pos, [RevId | _]} -> {{Id, {Pos, RevId}}, Error};
{0, []} -> {{Id, {0, <<>>}}, Error}
end
end, PreCommitFailures)};
true ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
DocBuckets3 = [[
doc_flush_atts(set_new_att_revpos(
check_dup_atts(Doc)), Db#db.fd)
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
{ok, CommitResults} = write_and_commit(Db, DocBuckets4, NonRepDocs, Options2),
ResultsDict = lists:foldl(fun({Key, Resp}, ResultsAcc) ->
dict:store(Key, Resp, ResultsAcc)
end, dict:from_list(IdRevs), CommitResults ++ PreCommitFailures),
{ok, lists:map(fun(Doc) ->
dict:fetch(doc_tag(Doc), ResultsDict)
end, Docs)}
end.
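% Illustrative sketch of the interactive-edit path (id and value hypothetical):
%
%   Doc = couch_doc:from_json_obj({[{<<"_id">>, <<"doc1">>},
%                                   {<<"value">>, 42}]}),
%   {ok, {Pos, RevId}} = couch_db:update_doc(Db, Doc, []).
%
% update_doc/3,4 throw per-document errors such as conflict; update_docs/2,3
% return them in the per-document result list instead.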
% Returns the first available document on disk. Input list is a full rev path
% for the doc.
make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
nil;
make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #doc{}} | RestPath]) ->
make_first_doc_on_disk(Db, Id, Pos-1, RestPath);
make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING}|RestPath]) ->
make_first_doc_on_disk(Db, Id, Pos - 1, RestPath);
make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #leaf{deleted=IsDel, ptr=Sp}} |_]=DocPath) ->
Revs = [Rev || {Rev, _} <- DocPath],
make_doc(Db, Id, IsDel, Sp, {Pos, Revs}).
set_commit_option(Options) ->
CommitSettings = {
[true || O <- Options, O==full_commit orelse O==delay_commit],
config:get("couchdb", "delayed_commits", "false")
},
case CommitSettings of
{[true], _} ->
Options; % user requested explicit commit setting, do not change it
{_, "true"} ->
Options; % delayed commits are enabled, do nothing
{_, "false"} ->
[full_commit|Options];
{_, Else} ->
couch_log:error("[couchdb] delayed_commits setting must be true/false,"
" not ~p", [Else]),
[full_commit|Options]
end.
collect_results_with_metrics(Pid, MRef, []) ->
Begin = os:timestamp(),
try
collect_results(Pid, MRef, [])
after
ResultsTime = timer:now_diff(os:timestamp(), Begin) div 1000,
couch_stats:update_histogram(
[couchdb, collect_results_time],
ResultsTime
)
end.
collect_results(Pid, MRef, ResultsAcc) ->
receive
{result, Pid, Result} ->
collect_results(Pid, MRef, [Result | ResultsAcc]);
{done, Pid} ->
{ok, ResultsAcc};
{retry, Pid} ->
retry;
{'DOWN', MRef, _, _, Reason} ->
exit(Reason)
end.
write_and_commit(#db{main_pid=Pid, user_ctx=Ctx}=Db, DocBuckets1,
NonRepDocs, Options0) ->
DocBuckets = prepare_doc_summaries(Db, DocBuckets1),
Options = set_commit_option(Options0),
MergeConflicts = lists:member(merge_conflicts, Options),
FullCommit = lists:member(full_commit, Options),
MRef = erlang:monitor(process, Pid),
try
Pid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
case collect_results_with_metrics(Pid, MRef, []) of
{ok, Results} -> {ok, Results};
retry ->
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file.
{ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
DocBuckets2 = [
[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] ||
Bucket <- DocBuckets1
],
% We only retry once
DocBuckets3 = prepare_doc_summaries(Db2, DocBuckets2),
close(Db2),
Pid ! {update_docs, self(), DocBuckets3, NonRepDocs, MergeConflicts, FullCommit},
case collect_results_with_metrics(Pid, MRef, []) of
{ok, Results} -> {ok, Results};
retry -> throw({update_error, compaction_retry})
end
end
after
erlang:demonitor(MRef, [flush])
end.
prepare_doc_summaries(Db, BucketList) ->
[lists:map(
fun(#doc{body = Body, atts = Atts} = Doc) ->
DiskAtts = [couch_att:to_disk_term(Att) || Att <- Atts],
{ok, SizeInfo} = couch_att:size_info(Atts),
AttsFd = case Atts of
[Att | _] ->
{Fd, _} = couch_att:fetch(data, Att),
Fd;
[] ->
nil
end,
SummaryChunk = couch_db_updater:make_doc_summary(Db, {Body, DiskAtts}),
Doc#doc{body = {summary, SummaryChunk, SizeInfo, AttsFd}}
end,
Bucket) || Bucket <- BucketList].
before_docs_update(#db{before_doc_update = nil}, BucketList) ->
BucketList;
before_docs_update(#db{before_doc_update = Fun} = Db, BucketList) ->
[lists:map(
fun(Doc) ->
Fun(couch_doc:with_ejson_body(Doc), Db)
end,
Bucket) || Bucket <- BucketList].
set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts0}=Doc) ->
Atts = lists:map(
fun(Att) ->
case couch_att:fetch(data, Att) of
{_Fd, _Sp} -> Att; % already committed to disk, don't set new rev
_ -> couch_att:store(revpos, RevPos+1, Att)
end
end, Atts0),
Doc#doc{atts = Atts}.
doc_flush_atts(Doc, Fd) ->
Doc#doc{atts=[couch_att:flush(Fd, Att) || Att <- Doc#doc.atts]}.
check_md5(_NewSig, <<>>) -> ok;
check_md5(Sig, Sig) -> ok;
check_md5(_, _) -> throw(md5_mismatch).
compressible_att_type(MimeType) when is_binary(MimeType) ->
compressible_att_type(?b2l(MimeType));
compressible_att_type(MimeType) ->
TypeExpList = re:split(
config:get("attachments", "compressible_types", ""),
"\\s*,\\s*",
[{return, list}]
),
lists:any(
fun(TypeExp) ->
Regexp = ["^\\s*", re:replace(TypeExp, "\\*", ".*"),
"(?:\\s*;.*?)?\\s*", $$],
re:run(MimeType, Regexp, [caseless]) =/= nomatch
end,
[T || T <- TypeExpList, T /= []]
).
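% Illustrative config sketch (a hypothetical setting, not the shipped default):
%
%   [attachments]
%   compressible_types = text/*, application/javascript
%
% With that value, compressible_att_type(<<"text/plain;charset=utf-8">>)
% matches the "text/*" pattern (the "*" is rewritten to ".*" and parameters
% after ";" are ignored), while <<"image/png">> does not match.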
% From RFC 2616 3.6.1 - Chunked Transfer Coding
%
% In other words, the origin server is willing to accept
% the possibility that the trailer fields might be silently
% discarded along the path to the client.
%
% I take this to mean that if "Trailers: Content-MD5\r\n"
% is present in the request, but there is no Content-MD5
% trailer, we're free to ignore this inconsistency and
% pretend that no Content-MD5 exists.
with_stream(Fd, Att, Fun) ->
[InMd5, Type, Enc] = couch_att:fetch([md5, type, encoding], Att),
BufferSize = list_to_integer(
config:get("couchdb", "attachment_stream_buffer_size", "4096")),
{ok, OutputStream} = case (Enc =:= identity) andalso
compressible_att_type(Type) of
true ->
CompLevel = list_to_integer(
config:get("attachments", "compression_level", "0")
),
couch_stream:open(Fd, [{buffer_size, BufferSize},
{encoding, gzip}, {compression_level, CompLevel}]);
_ ->
couch_stream:open(Fd, [{buffer_size, BufferSize}])
end,
ReqMd5 = case Fun(OutputStream) of
{md5, FooterMd5} ->
case InMd5 of
md5_in_footer -> FooterMd5;
_ -> InMd5
end;
_ ->
InMd5
end,
{StreamInfo, Len, IdentityLen, Md5, IdentityMd5} =
couch_stream:close(OutputStream),
check_md5(IdentityMd5, ReqMd5),
{AttLen, DiskLen, NewEnc} = case Enc of
identity ->
case {Md5, IdentityMd5} of
{Same, Same} ->
{Len, IdentityLen, identity};
_ ->
{Len, IdentityLen, gzip}
end;
gzip ->
case couch_att:fetch([att_len, disk_len], Att) of
[AL, DL] when AL =:= undefined orelse DL =:= undefined ->
% Compressed attachment uploaded through the standalone API.
{Len, Len, gzip};
[AL, DL] ->
% This case is used for efficient push replication, where a
% compressed attachment is located in the body of a multipart
% content-type request.
{AL, DL, gzip}
end
end,
couch_att:store([
{data, {Fd,StreamInfo}},
{att_len, AttLen},
{disk_len, DiskLen},
{md5, Md5},
{encoding, NewEnc}
], Att).
enum_docs_since_reduce_to_count(Reds) ->
couch_btree:final_reduce(
fun couch_db_updater:btree_by_seq_reduce/2, Reds).
enum_docs_reduce_to_count(Reds) ->
FinalRed = couch_btree:final_reduce(
fun couch_db_updater:btree_by_id_reduce/2, Reds),
element(1, FinalRed).
changes_since(Db, StartSeq, Fun, Acc) ->
changes_since(Db, StartSeq, Fun, [], Acc).
changes_since(Db, StartSeq, Fun, Options, Acc) when is_record(Db, db) ->
changes_since(Db#db.seq_tree, StartSeq, Fun, Options, Acc);
changes_since(SeqTree, StartSeq, Fun, Options, Acc) ->
Wrapper = fun(FullDocInfo, _Offset, Acc2) ->
DocInfo = case FullDocInfo of
#full_doc_info{} ->
couch_doc:to_doc_info(FullDocInfo);
#doc_info{} ->
FullDocInfo
end,
Fun(DocInfo, Acc2)
end,
{ok, _LastReduction, AccOut} = couch_btree:fold(SeqTree,
Wrapper, Acc, [{start_key, StartSeq + 1}] ++ Options),
{ok, AccOut}.
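% Illustrative sketch: collect the ids of all docs changed since Seq:
%
%   FoldFun = fun(#doc_info{id = Id}, Acc) -> {ok, [Id | Acc]} end,
%   {ok, Ids} = couch_db:changes_since(Db, Seq, FoldFun, []),
%   lists:reverse(Ids).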
count_changes_since(Db, SinceSeq) ->
BTree = Db#db.seq_tree,
{ok, Changes} =
couch_btree:fold_reduce(BTree,
fun(_SeqStart, PartialReds, 0) ->
{ok, couch_btree:final_reduce(BTree, PartialReds)}
end,
0, [{start_key, SinceSeq + 1}]),
Changes.
enum_docs_since(Db, SinceSeq, InFun, Acc, Options) ->
{ok, LastReduction, AccOut} = couch_btree:fold(
Db#db.seq_tree, InFun, Acc,
[{start_key, SinceSeq + 1} | Options]),
{ok, enum_docs_since_reduce_to_count(LastReduction), AccOut}.
enum_docs(Db, InFun, InAcc, Options0) ->
{NS, Options} = extract_namespace(Options0),
enum_docs(Db, NS, InFun, InAcc, Options).
enum_docs(Db, undefined, InFun, InAcc, Options) ->
FoldFun = pipe([fun skip_deleted/4], InFun),
{ok, LastReduce, OutAcc} = couch_btree:fold(
Db#db.id_tree, FoldFun, InAcc, Options),
{ok, enum_docs_reduce_to_count(LastReduce), OutAcc};
enum_docs(Db, <<"_local">>, InFun, InAcc, Options) ->
FoldFun = pipe([fun skip_deleted/4], InFun),
{ok, _LastReduce, OutAcc} = couch_btree:fold(
Db#db.local_tree, FoldFun, InAcc, Options),
{ok, 0, OutAcc};
enum_docs(Db, NS, InFun, InAcc, Options0) ->
FoldFun = pipe([
fun skip_deleted/4,
stop_on_leaving_namespace(NS)], InFun),
Options = set_namespace_range(Options0, NS),
{ok, LastReduce, OutAcc} = couch_btree:fold(
Db#db.id_tree, FoldFun, InAcc, Options),
{ok, enum_docs_reduce_to_count(LastReduce), OutAcc}.
extract_namespace(Options0) ->
case proplists:split(Options0, [namespace]) of
{[[{namespace, NS}]], Options} ->
{NS, Options};
{_, Options} ->
{undefined, Options}
end.
%%% Internal functions %%%
open_doc_revs_int(Db, IdRevs, Options) ->
Ids = [Id || {Id, _Revs} <- IdRevs],
LookupResults = get_full_doc_infos(Db, Ids),
lists:zipwith(
fun({Id, Revs}, Lookup) ->
case Lookup of
{ok, #full_doc_info{rev_tree=RevTree}} ->
{FoundRevs, MissingRevs} =
case Revs of
all ->
{couch_key_tree:get_all_leafs(RevTree), []};
_ ->
case lists:member(latest, Options) of
true ->
couch_key_tree:get_key_leafs(RevTree, Revs);
false ->
couch_key_tree:get(RevTree, Revs)
end
end,
FoundResults =
lists:map(fun({Value, {Pos, [Rev|_]}=FoundRevPath}) ->
case Value of
?REV_MISSING ->
% we have the rev in our list but know nothing about it
{{not_found, missing}, {Pos, Rev}};
#leaf{deleted=IsDeleted, ptr=SummaryPtr} ->
{ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
end
end, FoundRevs),
Results = FoundResults ++ [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
{ok, Results};
not_found when Revs == all ->
{ok, []};
not_found ->
{ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
end
end,
IdRevs, LookupResults).
open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
case couch_btree:lookup(Db#db.local_tree, [Id]) of
[{ok, {_, {Rev, BodyData}}}] ->
Doc = #doc{id=Id, revs={0, [?l2b(integer_to_list(Rev))]}, body=BodyData},
apply_open_options({ok, Doc}, Options);
[not_found] ->
{not_found, missing}
end;
open_doc_int(Db, #doc_info{id=Id,revs=[RevInfo|_]}=DocInfo, Options) ->
#rev_info{deleted=IsDeleted,rev={Pos,RevId},body_sp=Bp} = RevInfo,
Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos,[RevId]}),
apply_open_options(
{ok, Doc#doc{meta=doc_meta_info(DocInfo, [], Options)}}, Options);
open_doc_int(Db, #full_doc_info{id=Id,rev_tree=RevTree}=FullDocInfo, Options) ->
#doc_info{revs=[#rev_info{deleted=IsDeleted,rev=Rev,body_sp=Bp}|_]} =
DocInfo = couch_doc:to_doc_info(FullDocInfo),
{[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
apply_open_options(
{ok, Doc#doc{meta=doc_meta_info(DocInfo, RevTree, Options)}}, Options);
open_doc_int(Db, Id, Options) ->
case get_full_doc_info(Db, Id) of
{ok, FullDocInfo} ->
open_doc_int(Db, FullDocInfo, Options);
not_found ->
{not_found, missing}
end.
doc_meta_info(#doc_info{high_seq=Seq,revs=[#rev_info{rev=Rev}|RestInfo]}, RevTree, Options) ->
case lists:member(revs_info, Options) of
false -> [];
true ->
{[{Pos, RevPath}],[]} =
couch_key_tree:get_full_key_paths(RevTree, [Rev]),
[{revs_info, Pos, lists:map(
fun({Rev1, ?REV_MISSING}) ->
{Rev1, missing};
({Rev1, Leaf}) ->
case Leaf#leaf.deleted of
true ->
{Rev1, deleted};
false ->
{Rev1, available}
end
end, RevPath)}]
end ++
case lists:member(conflicts, Options) of
false -> [];
true ->
case [Rev1 || #rev_info{rev=Rev1,deleted=false} <- RestInfo] of
[] -> [];
ConflictRevs -> [{conflicts, ConflictRevs}]
end
end ++
case lists:member(deleted_conflicts, Options) of
false -> [];
true ->
case [Rev1 || #rev_info{rev=Rev1,deleted=true} <- RestInfo] of
[] -> [];
DelConflictRevs -> [{deleted_conflicts, DelConflictRevs}]
end
end ++
case lists:member(local_seq, Options) of
false -> [];
true -> [{local_seq, Seq}]
end.
read_doc(#db{fd=Fd}, Pos) ->
couch_file:pread_term(Fd, Pos).
make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
#doc{
id = Id,
revs = RevisionPath,
body = [],
atts = [],
deleted = Deleted
};
make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
{BodyData, Atts0} = case Bp of
nil ->
{[], []};
_ ->
case read_doc(Db, Bp) of
{ok, {BodyData0, Atts1}} when is_binary(Atts1) ->
{BodyData0, couch_compress:decompress(Atts1)};
{ok, {BodyData0, Atts1}} when is_list(Atts1) ->
% pre 1.2 format
{BodyData0, Atts1}
end
end,
Atts = [couch_att:from_disk_term(Fd, T) || T <- Atts0],
Doc = #doc{
id = Id,
revs = RevisionPath,
body = BodyData,
atts = Atts,
deleted = Deleted
},
after_doc_read(Db, Doc).
after_doc_read(#db{after_doc_read = nil}, Doc) ->
Doc;
after_doc_read(#db{after_doc_read = Fun} = Db, Doc) ->
Fun(couch_doc:with_ejson_body(Doc), Db).
increment_stat(#db{options = Options}, Stat) ->
case lists:member(sys_db, Options) of
true ->
ok;
false ->
couch_stats:increment_counter(Stat)
end.
skip_deleted(traverse, LK, {Undeleted, _, _} = Reds, Acc) when Undeleted == 0 ->
{skip, LK, Reds, Acc};
skip_deleted(Case, A, B, C) ->
{Case, A, B, C}.
stop_on_leaving_namespace(NS) ->
fun
(visit, #full_doc_info{id = Key} = FullInfo, Reds, Acc) ->
case has_prefix(Key, NS) of
true ->
{visit, FullInfo, Reds, Acc};
false ->
{stop, FullInfo, Reds, Acc}
end;
(Case, KV, Reds, Acc) ->
{Case, KV, Reds, Acc}
end.
has_prefix(Bin, Prefix) ->
S = byte_size(Prefix),
case Bin of
<<Prefix:S/binary, "/", _/binary>> ->
true;
_Else ->
false
end.
pipe(Filters, Final) ->
Wrap =
fun
(visit, KV, Reds, Acc) ->
Final(KV, Reds, Acc);
(skip, _KV, _Reds, Acc) ->
{skip, Acc};
(stop, _KV, _Reds, Acc) ->
{stop, Acc};
(traverse, _, _, Acc) ->
{ok, Acc}
end,
do_pipe(Filters, Wrap).
do_pipe([], Fun) -> Fun;
do_pipe([Filter|Rest], F0) ->
F1 = fun(C0, KV0, Reds0, Acc0) ->
{C, KV, Reds, Acc} = Filter(C0, KV0, Reds0, Acc0),
F0(C, KV, Reds, Acc)
end,
do_pipe(Rest, F1).
set_namespace_range(Options, undefined) -> Options;
set_namespace_range(Options, NS) ->
%% FIXME depending on order we might need to swap keys
SK = select_gt(
proplists:get_value(start_key, Options, <<"">>),
<<NS/binary, "/">>),
EK = select_lt(
proplists:get_value(end_key, Options, <<NS/binary, "0">>),
<<NS/binary, "0">>),
[{start_key, SK}, {end_key_gt, EK}].
select_gt(V1, V2) when V1 < V2 -> V2;
select_gt(V1, _V2) -> V1.
select_lt(V1, V2) when V1 > V2 -> V2;
select_lt(V1, _V2) -> V1.
normalize_dbname(<<"shards/", _/binary>> = Path) ->
lists:last(binary:split(mem3:dbname(Path), <<"/">>, [global]));
normalize_dbname(DbName) ->
DbName.
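% Illustrative example (shard range and suffix hypothetical): a clustered
% shard path such as <<"shards/00000000-1fffffff/mydb.1415960794">> normalizes
% to <<"mydb">>, assuming mem3:dbname/1 strips the shard prefix and the
% timestamp suffix; plain database names are returned unchanged.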