blob: a953b6a38fcc4939c92a6ae2fe32b0c62e54c053 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-module(dreyfus_fabric).
-export([get_json_docs/2, handle_error_message/6]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").
-include("dreyfus.hrl").
get_json_docs(DbName, DocIds) ->
fabric:all_docs(DbName, fun callback/2, [], [{keys, DocIds}, {include_docs, true}]).
callback({meta,_}, Acc) ->
{ok, Acc};
callback({error, Reason}, _Acc) ->
{error, Reason};
callback({row, Row}, Acc) ->
{id, Id} = lists:keyfind(id, 1, Row),
{ok, [{Id, lists:keyfind(doc, 1, Row)}|Acc]};
callback(complete, Acc) ->
{ok, lists:reverse(Acc)};
callback(timeout, _Acc) ->
{error, timeout}.
handle_error_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker,
Counters, _Replacements, _StartFun, _StartArgs) ->
case fabric_util:remove_down_workers(Counters, NodeRef) of
{ok, NewCounters} ->
{ok, NewCounters};
error ->
{error, {nodedown, <<"progress not possible">>}}
end;
handle_error_message({rexi_EXIT, {maintenance_mode, _}}, Worker,
Counters, Replacements, StartFun, StartArgs) ->
handle_replacement(Worker, Counters, Replacements, StartFun, StartArgs);
handle_error_message({rexi_EXIT, Reason}, Worker,
Counters, _Replacements, _StartFun, _StartArgs) ->
handle_error(Reason, Worker, Counters);
handle_error_message({error, Reason}, Worker,
Counters, _Replacements, _StartFun, _StartArgs) ->
handle_error(Reason, Worker, Counters);
handle_error_message({'EXIT', Reason}, Worker,
Counters, _Replacements, _StartFun, _StartArgs) ->
handle_error({exit, Reason}, Worker, Counters);
handle_error_message(Reason, Worker, Counters,
_Replacements, _StartFun, _StartArgs) ->
couch_log:error("Unexpected error during request: ~p", [Reason]),
handle_error(Reason, Worker, Counters).
handle_error(Reason, Worker, Counters0) ->
Counters = fabric_dict:erase(Worker, Counters0),
case fabric_view:is_progress_possible(Counters) of
true ->
{ok, Counters};
false ->
{error, Reason}
end.
handle_replacement(Worker, OldCntrs0, OldReplacements, StartFun, StartArgs) ->
OldCounters = lists:filter(fun({#shard{ref=R}, _}) ->
R /= Worker#shard.ref
end, OldCntrs0),
case lists:keytake(Worker#shard.range, 1, OldReplacements) of
{value, {_Range, Replacements}, NewReplacements} ->
NewCounters = lists:foldl(fun(Repl, CounterAcc) ->
NewCounter = start_replacement(StartFun, StartArgs, Repl),
fabric_dict:store(NewCounter, nil, CounterAcc)
end, OldCounters, Replacements),
true = fabric_view:is_progress_possible(NewCounters),
NewRefs = fabric_dict:fetch_keys(NewCounters),
{new_refs, NewRefs, NewCounters, NewReplacements};
false ->
handle_error({nodedown, <<"progress not possible">>},
Worker, OldCounters)
end.
start_replacement(StartFun, StartArgs, Shard) ->
[DDoc, IndexName, QueryArgs] = StartArgs,
After = case QueryArgs#index_query_args.bookmark of
Bookmark when is_list(Bookmark) ->
lists:foldl(fun({#shard{range=R0}, After0}, Acc) ->
case R0 == Shard#shard.range of
true -> After0;
false -> Acc
end
end, nil, Bookmark);
_ ->
nil
end,
QueryArgs1 = QueryArgs#index_query_args{bookmark=After},
StartArgs1 = [DDoc, IndexName, QueryArgs1],
Ref = rexi:cast(Shard#shard.node,
{dreyfus_rpc, StartFun,
[Shard#shard.name|StartArgs1]}),
Shard#shard{ref = Ref}.