| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(fabric_rpc). |
| |
| -export([get_db_info/1, get_doc_count/1, get_design_doc_count/1, |
| get_update_seq/1]). |
| -export([open_doc/3, open_revs/4, get_doc_info/3, get_full_doc_info/3, |
| get_missing_revs/2, get_missing_revs/3, update_docs/3]). |
| -export([all_docs/3, changes/3, map_view/4, reduce_view/4, group_info/2]). |
| -export([create_db/1, create_db/2, delete_db/1, reset_validation_funs/1, |
| set_security/3, set_revs_limit/3, create_shard_db_doc/2, |
| delete_shard_db_doc/2, get_partition_info/2]). |
| -export([get_all_security/2, open_shard/2]). |
| -export([compact/1, compact/2]). |
| -export([get_purge_seq/2, purge_docs/3, set_purge_infos_limit/3]). |
| |
| -export([get_db_info/2, get_doc_count/2, get_design_doc_count/2, |
| get_update_seq/2, changes/4, map_view/5, reduce_view/5, |
| group_info/3, update_mrview/4]). |
| |
| -include_lib("fabric/include/fabric.hrl"). |
| -include_lib("couch/include/couch_db.hrl"). |
| -include_lib("couch_mrview/include/couch_mrview.hrl"). |
| |
| %% rpc endpoints |
| %% call to with_db will supply your M:F with a Db instance |
| %% and then remaining args |
| |
| %% @equiv changes(DbName, Args, StartSeq, []) |
| changes(DbName, Args, StartSeq) -> |
| changes(DbName, Args, StartSeq, []). |
| |
| changes(DbName, #changes_args{} = Args, StartSeq, DbOptions) -> |
| changes(DbName, [Args], StartSeq, DbOptions); |
| changes(DbName, Options, StartVector, DbOptions) -> |
| set_io_priority(DbName, DbOptions), |
| Args0 = lists:keyfind(changes_args, 1, Options), |
| #changes_args{dir=Dir, filter_fun=Filter} = Args0, |
| Args = case Filter of |
| {fetch, custom, Style, Req, {DDocId, Rev}, FName} -> |
| {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev), |
| Args0#changes_args{ |
| filter_fun={custom, Style, Req, DDoc, FName} |
| }; |
| {fetch, view, Style, {DDocId, Rev}, VName} -> |
| {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev), |
| Args0#changes_args{filter_fun={view, Style, DDoc, VName}}; |
| _ -> |
| Args0 |
| end, |
| |
| DbOpenOptions = Args#changes_args.db_open_options ++ DbOptions, |
| case get_or_create_db(DbName, DbOpenOptions) of |
| {ok, Db} -> |
| StartSeq = calculate_start_seq(Db, node(), StartVector), |
| Enum = fun changes_enumerator/2, |
| Opts = [{dir,Dir}], |
| Acc0 = #fabric_changes_acc{ |
| db = Db, |
| seq = StartSeq, |
| args = Args, |
| options = Options, |
| pending = couch_db:count_changes_since(Db, StartSeq), |
| epochs = couch_db:get_epochs(Db) |
| }, |
| try |
| {ok, #fabric_changes_acc{seq=LastSeq, pending=Pending, epochs=Epochs}} = |
| do_changes(Db, StartSeq, Enum, Acc0, Opts), |
| rexi:stream_last({complete, [ |
| {seq, {LastSeq, uuid(Db), couch_db:owner_of(Epochs, LastSeq)}}, |
| {pending, Pending} |
| ]}) |
| after |
| couch_db:close(Db) |
| end; |
| Error -> |
| rexi:stream_last(Error) |
| end. |
| |
| do_changes(Db, StartSeq, Enum, Acc0, Opts) -> |
| #fabric_changes_acc { |
| args = Args |
| } = Acc0, |
| #changes_args { |
| filter = Filter |
| } = Args, |
| case Filter of |
| "_doc_ids" -> |
| % optimised code path, we’re looking up all doc_ids in the by-id instead of filtering |
| % the entire by-seq tree to find the doc_ids one by one |
| #changes_args { |
| filter_fun = {doc_ids, Style, DocIds}, |
| dir = Dir |
| } = Args, |
| couch_changes:send_changes_doc_ids(Db, StartSeq, Dir, Enum, Acc0, {doc_ids, Style, DocIds}); |
| "_design_docs" -> |
| % optimised code path, we’re looking up all design_docs in the by-id instead of |
| % filtering the entire by-seq tree to find the design_docs one by one |
| #changes_args { |
| filter_fun = {design_docs, Style}, |
| dir = Dir |
| } = Args, |
| couch_changes:send_changes_design_docs(Db, StartSeq, Dir, Enum, Acc0, {design_docs, Style}); |
| _ -> |
| couch_db:fold_changes(Db, StartSeq, Enum, Acc0, Opts) |
| end. |
| |
| all_docs(DbName, Options, Args0) -> |
| case fabric_util:upgrade_mrargs(Args0) of |
| #mrargs{keys=undefined} = Args -> |
| set_io_priority(DbName, Options), |
| {ok, Db} = get_or_create_db(DbName, Options), |
| CB = get_view_cb(Args), |
| couch_mrview:query_all_docs(Db, Args, CB, Args) |
| end. |
| |
| update_mrview(DbName, {DDocId, Rev}, ViewName, Args0) -> |
| {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev), |
| couch_util:with_db(DbName, fun(Db) -> |
| UpdateSeq = couch_db:get_update_seq(Db), |
| {ok, Pid, _} = couch_mrview:get_view_index_pid( |
| Db, DDoc, ViewName, fabric_util:upgrade_mrargs(Args0)), |
| couch_index:get_state(Pid, UpdateSeq) |
| end). |
| |
| %% @equiv map_view(DbName, DDoc, ViewName, Args0, []) |
| map_view(DbName, DDocInfo, ViewName, Args0) -> |
| map_view(DbName, DDocInfo, ViewName, Args0, []). |
| |
| map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> |
| {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev), |
| map_view(DbName, DDoc, ViewName, Args0, DbOptions); |
| map_view(DbName, DDoc, ViewName, Args0, DbOptions) -> |
| set_io_priority(DbName, DbOptions), |
| Args = fabric_util:upgrade_mrargs(Args0), |
| {ok, Db} = get_or_create_db(DbName, DbOptions), |
| CB = get_view_cb(Args), |
| couch_mrview:query_view(Db, DDoc, ViewName, Args, CB, Args). |
| |
| %% @equiv reduce_view(DbName, DDoc, ViewName, Args0) |
| reduce_view(DbName, DDocInfo, ViewName, Args0) -> |
| reduce_view(DbName, DDocInfo, ViewName, Args0, []). |
| |
| reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) -> |
| {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev), |
| reduce_view(DbName, DDoc, ViewName, Args0, DbOptions); |
| reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) -> |
| set_io_priority(DbName, DbOptions), |
| Args = fabric_util:upgrade_mrargs(Args0), |
| {ok, Db} = get_or_create_db(DbName, DbOptions), |
| VAcc0 = #vacc{db=Db}, |
| couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0). |
| |
| create_db(DbName) -> |
| create_db(DbName, []). |
| |
| create_db(DbName, Options) -> |
| rexi:reply(case couch_server:create(DbName, Options) of |
| {ok, _} -> |
| ok; |
| Error -> |
| Error |
| end). |
| |
| create_shard_db_doc(_, Doc) -> |
| rexi:reply(mem3_util:write_db_doc(Doc)). |
| |
| delete_db(DbName) -> |
| couch_server:delete(DbName, []). |
| |
| delete_shard_db_doc(_, DocId) -> |
| rexi:reply(mem3_util:delete_db_doc(DocId)). |
| |
| %% @equiv get_db_info(DbName, []) |
| get_db_info(DbName) -> |
| get_db_info(DbName, []). |
| |
| get_db_info(DbName, DbOptions) -> |
| with_db(DbName, DbOptions, {couch_db, get_db_info, []}). |
| |
| get_partition_info(DbName, Partition) -> |
| with_db(DbName, [], {couch_db, get_partition_info, [Partition]}). |
| |
| %% equiv get_doc_count(DbName, []) |
| get_doc_count(DbName) -> |
| get_doc_count(DbName, []). |
| |
| get_doc_count(DbName, DbOptions) -> |
| with_db(DbName, DbOptions, {couch_db, get_doc_count, []}). |
| |
| %% equiv get_design_doc_count(DbName, []) |
| get_design_doc_count(DbName) -> |
| get_design_doc_count(DbName, []). |
| |
| get_design_doc_count(DbName, DbOptions) -> |
| with_db(DbName, DbOptions, {couch_db, get_design_doc_count, []}). |
| |
| %% equiv get_update_seq(DbName, []) |
| get_update_seq(DbName) -> |
| get_update_seq(DbName, []). |
| |
| get_update_seq(DbName, DbOptions) -> |
| with_db(DbName, DbOptions, {couch_db, get_update_seq, []}). |
| |
| set_security(DbName, SecObj, Options0) -> |
| Options = case lists:keyfind(io_priority, 1, Options0) of |
| false -> |
| [{io_priority, {db_meta, security}}|Options0]; |
| _ -> |
| Options0 |
| end, |
| with_db(DbName, Options, {couch_db, set_security, [SecObj]}). |
| |
| get_all_security(DbName, Options) -> |
| with_db(DbName, Options, {couch_db, get_security, []}). |
| |
| set_revs_limit(DbName, Limit, Options) -> |
| with_db(DbName, Options, {couch_db, set_revs_limit, [Limit]}). |
| |
| set_purge_infos_limit(DbName, Limit, Options) -> |
| with_db(DbName, Options, {couch_db, set_purge_infos_limit, [Limit]}). |
| |
| open_doc(DbName, DocId, Options) -> |
| with_db(DbName, Options, {couch_db, open_doc, [DocId, Options]}). |
| |
| open_revs(DbName, Id, Revs, Options) -> |
| with_db(DbName, Options, {couch_db, open_doc_revs, [Id, Revs, Options]}). |
| |
| get_full_doc_info(DbName, DocId, Options) -> |
| with_db(DbName, Options, {couch_db, get_full_doc_info, [DocId]}). |
| |
| get_doc_info(DbName, DocId, Options) -> |
| with_db(DbName, Options, {couch_db, get_doc_info, [DocId]}). |
| |
| get_missing_revs(DbName, IdRevsList) -> |
| get_missing_revs(DbName, IdRevsList, []). |
| |
| get_missing_revs(DbName, IdRevsList, Options) -> |
| % reimplement here so we get [] for Ids with no missing revs in response |
| set_io_priority(DbName, Options), |
| rexi:reply(case get_or_create_db(DbName, Options) of |
| {ok, Db} -> |
| Ids = [Id1 || {Id1, _Revs} <- IdRevsList], |
| {ok, lists:zipwith(fun({Id, Revs}, FullDocInfoResult) -> |
| case FullDocInfoResult of |
| #full_doc_info{rev_tree=RevisionTree} = FullInfo -> |
| MissingRevs = couch_key_tree:find_missing(RevisionTree, Revs), |
| {Id, MissingRevs, possible_ancestors(FullInfo, MissingRevs)}; |
| not_found -> |
| {Id, Revs, []} |
| end |
| end, IdRevsList, couch_db:get_full_doc_infos(Db, Ids))}; |
| Error -> |
| Error |
| end). |
| |
| update_docs(DbName, Docs0, Options) -> |
| {Docs1, Type} = case couch_util:get_value(read_repair, Options) of |
| NodeRevs when is_list(NodeRevs) -> |
| Filtered = read_repair_filter(DbName, Docs0, NodeRevs, Options), |
| {Filtered, replicated_changes}; |
| undefined -> |
| X = case proplists:get_value(replicated_changes, Options) of |
| true -> replicated_changes; |
| _ -> interactive_edit |
| end, |
| {Docs0, X} |
| end, |
| Docs2 = make_att_readers(Docs1), |
| with_db(DbName, Options, {couch_db, update_docs, [Docs2, Options, Type]}). |
| |
| |
| get_purge_seq(DbName, Options) -> |
| with_db(DbName, Options, {couch_db, get_purge_seq, []}). |
| |
| purge_docs(DbName, UUIdsIdsRevs, Options) -> |
| with_db(DbName, Options, {couch_db, purge_docs, [UUIdsIdsRevs, Options]}). |
| |
| %% @equiv group_info(DbName, DDocId, []) |
| group_info(DbName, DDocId) -> |
| group_info(DbName, DDocId, []). |
| |
| group_info(DbName, DDocId, DbOptions) -> |
| with_db(DbName, DbOptions, {couch_mrview, get_info, [DDocId]}). |
| |
| reset_validation_funs(DbName) -> |
| case get_or_create_db(DbName, []) of |
| {ok, Db} -> |
| couch_db:reload_validation_funs(Db); |
| _ -> |
| ok |
| end. |
| |
| open_shard(Name, Opts) -> |
| set_io_priority(Name, Opts), |
| try |
| rexi:reply(couch_db:open(Name, Opts)) |
| catch exit:{timeout, _} -> |
| couch_stats:increment_counter([fabric, open_shard, timeouts]) |
| end. |
| |
| compact(DbName) -> |
| with_db(DbName, [], {couch_db, start_compact, []}). |
| |
| compact(ShardName, DesignName) -> |
| {ok, Pid} = couch_index_server:get_index( |
| couch_mrview_index, ShardName, <<"_design/", DesignName/binary>>), |
| Ref = erlang:make_ref(), |
| Pid ! {'$gen_call', {self(), Ref}, compact}. |
| |
| %% |
| %% internal |
| %% |
| |
| with_db(DbName, Options, {M,F,A}) -> |
| set_io_priority(DbName, Options), |
| case get_or_create_db(DbName, Options) of |
| {ok, Db} -> |
| rexi:reply(try |
| apply(M, F, [Db | A]) |
| catch Exception -> |
| Exception; |
| error:Reason -> |
| couch_log:error("rpc ~p:~p/~p ~p ~p", [M, F, length(A)+1, Reason, |
| clean_stack()]), |
| {error, Reason} |
| end); |
| Error -> |
| rexi:reply(Error) |
| end. |
| |
| |
| read_repair_filter(DbName, Docs, NodeRevs, Options) -> |
| set_io_priority(DbName, Options), |
| case get_or_create_db(DbName, Options) of |
| {ok, Db} -> |
| try |
| read_repair_filter(Db, Docs, NodeRevs) |
| after |
| couch_db:close(Db) |
| end; |
| Error -> |
| rexi:reply(Error) |
| end. |
| |
| |
| % A read repair operation may have been triggered by a node |
| % that was out of sync with the local node. Thus, any time |
| % we receive a read repair request we need to check if we |
| % may have recently purged any of the given revisions and |
| % ignore them if so. |
| % |
| % This is accomplished by looking at the purge infos that we |
| % have locally that have not been replicated to the remote |
| % node. The logic here is that we may have received the purge |
| % request before the remote shard copy. So to check that we |
| % need to look at the purge infos that we have locally but |
| % have not yet sent to the remote copy. |
| % |
| % NodeRevs is a list of the {node(), [rev()]} tuples passed |
| % as the read_repair option to update_docs. |
| read_repair_filter(Db, Docs, NodeRevs) -> |
| [#doc{id = DocId} | _] = Docs, |
| NonLocalNodeRevs = [NR || {N, _} = NR <- NodeRevs, N /= node()], |
| Nodes = lists:usort([Node || {Node, _} <- NonLocalNodeRevs]), |
| NodeSeqs = get_node_seqs(Db, Nodes), |
| |
| DbPSeq = couch_db:get_purge_seq(Db), |
| Lag = config:get_integer("couchdb", "read_repair_lag", 100), |
| |
| % Filter out read-repair updates from any node that is |
| % so out of date that it would force us to scan a large |
| % number of purge infos |
| NodeFiltFun = fun({Node, _Revs}) -> |
| {Node, NodeSeq} = lists:keyfind(Node, 1, NodeSeqs), |
| NodeSeq >= DbPSeq - Lag |
| end, |
| RecentNodeRevs = lists:filter(NodeFiltFun, NonLocalNodeRevs), |
| |
| % For each node we scan the purge infos to filter out any |
| % revisions that have been locally purged since we last |
| % replicated to the remote node's shard copy. |
| AllowableRevs = lists:foldl(fun({Node, Revs}, RevAcc) -> |
| {Node, StartSeq} = lists:keyfind(Node, 1, NodeSeqs), |
| FoldFun = fun({_PSeq, _UUID, PDocId, PRevs}, InnerAcc) -> |
| if PDocId /= DocId -> {ok, InnerAcc}; true -> |
| {ok, InnerAcc -- PRevs} |
| end |
| end, |
| {ok, FiltRevs} = couch_db:fold_purge_infos(Db, StartSeq, FoldFun, Revs), |
| lists:usort(FiltRevs ++ RevAcc) |
| end, [], RecentNodeRevs), |
| |
| % Finally, filter the doc updates to only include revisions |
| % that have not been purged locally. |
| DocFiltFun = fun(#doc{revs = {Pos, [Rev | _]}}) -> |
| lists:member({Pos, Rev}, AllowableRevs) |
| end, |
| lists:filter(DocFiltFun, Docs). |
| |
| |
| get_node_seqs(Db, Nodes) -> |
| % Gather the list of {Node, PurgeSeq} pairs for all nodes |
| % that are present in our read repair group |
| FoldFun = fun(#doc{id = Id, body = {Props}}, Acc) -> |
| case Id of |
| <<?LOCAL_DOC_PREFIX, "purge-mem3-", _/binary>> -> |
| TgtNode = couch_util:get_value(<<"target_node">>, Props), |
| PurgeSeq = couch_util:get_value(<<"purge_seq">>, Props), |
| case lists:keyfind(TgtNode, 1, Acc) of |
| {_, OldSeq} -> |
| NewSeq = erlang:max(OldSeq, PurgeSeq), |
| NewEntry = {TgtNode, NewSeq}, |
| NewAcc = lists:keyreplace(TgtNode, 1, Acc, NewEntry), |
| {ok, NewAcc}; |
| false -> |
| {ok, Acc} |
| end; |
| _ -> |
| % We've processed all _local mem3 purge docs |
| {stop, Acc} |
| end |
| end, |
| InitAcc = [{list_to_binary(atom_to_list(Node)), 0} || Node <- Nodes], |
| Opts = [{start_key, <<?LOCAL_DOC_PREFIX, "purge-mem3-">>}], |
| {ok, NodeBinSeqs} = couch_db:fold_local_docs(Db, FoldFun, InitAcc, Opts), |
| [{list_to_existing_atom(binary_to_list(N)), S} || {N, S} <- NodeBinSeqs]. |
| |
| |
| |
| get_or_create_db(DbName, Options) -> |
| mem3_util:get_or_create_db(DbName, Options). |
| |
| |
| get_view_cb(#mrargs{extra = Options}) -> |
| case couch_util:get_value(callback, Options) of |
| {Mod, Fun} when is_atom(Mod), is_atom(Fun) -> |
| fun Mod:Fun/2; |
| _ -> |
| fun view_cb/2 |
| end; |
| get_view_cb(_) -> |
| fun view_cb/2. |
| |
| |
| view_cb({meta, Meta}, Acc) -> |
| % Map function starting |
| ok = rexi:stream2({meta, Meta}), |
| {ok, Acc}; |
| view_cb({row, Row}, Acc) -> |
| % Adding another row |
| ViewRow = #view_row{ |
| id = couch_util:get_value(id, Row), |
| key = couch_util:get_value(key, Row), |
| value = couch_util:get_value(value, Row), |
| doc = couch_util:get_value(doc, Row) |
| }, |
| ok = rexi:stream2(ViewRow), |
| {ok, Acc}; |
| view_cb(complete, Acc) -> |
| % Finish view output |
| ok = rexi:stream_last(complete), |
| {ok, Acc}; |
| view_cb(ok, ddoc_updated) -> |
| rexi:reply({ok, ddoc_updated}). |
| |
| |
| reduce_cb({meta, Meta}, Acc) -> |
| % Map function starting |
| ok = rexi:stream2({meta, Meta}), |
| {ok, Acc}; |
| reduce_cb({row, Row}, Acc) -> |
| % Adding another row |
| ok = rexi:stream2(#view_row{ |
| key = couch_util:get_value(key, Row), |
| value = couch_util:get_value(value, Row) |
| }), |
| {ok, Acc}; |
| reduce_cb(complete, Acc) -> |
| % Finish view output |
| ok = rexi:stream_last(complete), |
| {ok, Acc}; |
| reduce_cb(ok, ddoc_updated) -> |
| rexi:reply({ok, ddoc_updated}). |
| |
| |
| changes_enumerator(#full_doc_info{} = FDI, Acc) -> |
| changes_enumerator(couch_doc:to_doc_info(FDI), Acc); |
| changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) -> |
| {ok, Acc#fabric_changes_acc{seq = Seq, pending = Acc#fabric_changes_acc.pending-1}}; |
| changes_enumerator(DocInfo, Acc) -> |
| #fabric_changes_acc{ |
| db = Db, |
| args = #changes_args{ |
| include_docs = IncludeDocs, |
| conflicts = Conflicts, |
| filter_fun = Filter, |
| doc_options = DocOptions |
| }, |
| pending = Pending, |
| epochs = Epochs |
| } = Acc, |
| #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo, |
| case [X || X <- couch_changes:filter(Db, DocInfo, Filter), X /= null] of |
| [] -> |
| ChangesRow = {no_pass, [ |
| {pending, Pending-1}, |
| {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}} |
| ]}; |
| Results -> |
| Opts = if Conflicts -> [conflicts | DocOptions]; true -> DocOptions end, |
| ChangesRow = {change, [ |
| {pending, Pending-1}, |
| {seq, {Seq, uuid(Db), couch_db:owner_of(Epochs, Seq)}}, |
| {id, Id}, |
| {changes, Results}, |
| {deleted, Del} | |
| if IncludeDocs -> [doc_member(Db, DocInfo, Opts, Filter)]; true -> [] end |
| ]} |
| end, |
| ok = rexi:stream2(ChangesRow), |
| {ok, Acc#fabric_changes_acc{seq = Seq, pending = Pending-1}}. |
| |
| doc_member(Shard, DocInfo, Opts, Filter) -> |
| case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of |
| {ok, Doc} -> |
| {doc, maybe_filtered_json_doc(Doc, Opts, Filter)}; |
| Error -> |
| Error |
| end. |
| |
| maybe_filtered_json_doc(Doc, Opts, {selector, _Style, {_Selector, Fields}}) |
| when Fields =/= nil -> |
| mango_fields:extract(couch_doc:to_json_obj(Doc, Opts), Fields); |
| maybe_filtered_json_doc(Doc, Opts, _Filter) -> |
| couch_doc:to_json_obj(Doc, Opts). |
| |
| |
| possible_ancestors(_FullInfo, []) -> |
| []; |
| possible_ancestors(FullInfo, MissingRevs) -> |
| #doc_info{revs=RevsInfo} = couch_doc:to_doc_info(FullInfo), |
| LeafRevs = [Rev || #rev_info{rev=Rev} <- RevsInfo], |
| % Find the revs that are possible parents of this rev |
| lists:foldl(fun({LeafPos, LeafRevId}, Acc) -> |
| % this leaf is a "possible ancenstor" of the missing |
| % revs if this LeafPos lessthan any of the missing revs |
| case lists:any(fun({MissingPos, _}) -> |
| LeafPos < MissingPos end, MissingRevs) of |
| true -> |
| [{LeafPos, LeafRevId} | Acc]; |
| false -> |
| Acc |
| end |
| end, [], LeafRevs). |
| |
| make_att_readers([]) -> |
| []; |
| make_att_readers([#doc{atts=Atts0} = Doc | Rest]) -> |
| % % go through the attachments looking for 'follows' in the data, |
| % % replace with function that reads the data from MIME stream. |
| Atts = [couch_att:transform(data, fun make_att_reader/1, Att) || Att <- Atts0], |
| [Doc#doc{atts = Atts} | make_att_readers(Rest)]. |
| |
| make_att_reader({follows, Parser, Ref}) -> |
| fun() -> |
| ParserRef = case get(mp_parser_ref) of |
| undefined -> |
| PRef = erlang:monitor(process, Parser), |
| put(mp_parser_ref, PRef), |
| PRef; |
| Else -> |
| Else |
| end, |
| Parser ! {get_bytes, Ref, self()}, |
| receive |
| {bytes, Ref, Bytes} -> |
| rexi:reply(attachment_chunk_received), |
| Bytes; |
| {'DOWN', ParserRef, _, _, Reason} -> |
| throw({mp_parser_died, Reason}) |
| end |
| end; |
| make_att_reader({fabric_attachment_receiver, Middleman, Length}) -> |
| fabric_doc_atts:receiver_callback(Middleman, Length); |
| make_att_reader(Else) -> |
| Else. |
| |
| clean_stack() -> |
| lists:map(fun({M,F,A}) when is_list(A) -> {M,F,length(A)}; (X) -> X end, |
| erlang:get_stacktrace()). |
| |
| set_io_priority(DbName, Options) -> |
| case lists:keyfind(io_priority, 1, Options) of |
| {io_priority, Pri} -> |
| erlang:put(io_priority, Pri); |
| false -> |
| erlang:put(io_priority, {interactive, DbName}) |
| end, |
| case erlang:get(io_priority) of |
| {interactive, _} -> |
| case config:get("couchdb", "maintenance_mode", "false") of |
| "true" -> |
| % Done to silence error logging by rexi_server |
| rexi:reply({rexi_EXIT, {maintenance_mode, node()}}), |
| exit(normal); |
| _ -> |
| ok |
| end; |
| _ -> |
| ok |
| end. |
| |
| |
| calculate_start_seq(Db, Node, Seq) -> |
| case couch_db:calculate_start_seq(Db, Node, Seq) of |
| N when is_integer(N) -> |
| N; |
| {replace, OriginalNode, Uuid, OriginalSeq} -> |
| %% Scan history looking for an entry with |
| %% * target_node == TargetNode |
| %% * target_uuid == TargetUUID |
| %% * target_seq =< TargetSeq |
| %% If such an entry is found, stream from associated source_seq |
| mem3_rep:find_source_seq(Db, OriginalNode, Uuid, OriginalSeq) |
| end. |
| |
| |
| uuid(Db) -> |
| Uuid = couch_db:get_uuid(Db), |
| binary:part(Uuid, {0, uuid_prefix_len()}). |
| |
| uuid_prefix_len() -> |
| list_to_integer(config:get("fabric", "uuid_prefix_len", "7")). |
| |
| -ifdef(TEST). |
| -include_lib("eunit/include/eunit.hrl"). |
| |
| maybe_filtered_json_doc_no_filter_test() -> |
| Body = {[{<<"a">>, 1}]}, |
| Doc = #doc{id = <<"1">>, revs = {1, [<<"r1">>]}, body = Body}, |
| {JDocProps} = maybe_filtered_json_doc(Doc, [], x), |
| ExpectedProps = [{<<"_id">>, <<"1">>}, {<<"_rev">>, <<"1-r1">>}, {<<"a">>, 1}], |
| ?assertEqual(lists:keysort(1, JDocProps), ExpectedProps). |
| |
| maybe_filtered_json_doc_with_filter_test() -> |
| Body = {[{<<"a">>, 1}]}, |
| Doc = #doc{id = <<"1">>, revs = {1, [<<"r1">>]}, body = Body}, |
| Fields = [<<"a">>, <<"nonexistent">>], |
| Filter = {selector, main_only, {some_selector, Fields}}, |
| {JDocProps} = maybe_filtered_json_doc(Doc, [], Filter), |
| ?assertEqual(JDocProps, [{<<"a">>, 1}]). |
| |
| -endif. |