blob: c5249d150a7d5c3377a3be228496578577f8ce0d [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric_util).
-export([submit_jobs/3, submit_jobs/4, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1,
update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
remove_down_workers/2, doc_id_and_rev/1]).
-export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0]).
-export([stream_start/2, stream_start/4]).
-export([log_timeout/2, remove_done_workers/2]).
-compile({inline, [{doc_id_and_rev,1}]}).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("eunit/include/eunit.hrl").
%% Drop every worker whose shard lives on BadNode.
%%
%% Returns {ok, RemainingWorkers} when fabric_view:is_progress_possible/1
%% accepts the remaining workers, and the atom error otherwise.
remove_down_workers(Workers, BadNode) ->
    NotOnBadNode = fun(#shard{node = WorkerNode}, _Value) -> WorkerNode =/= BadNode end,
    Remaining = fabric_dict:filter(NotOnBadNode, Workers),
    case fabric_view:is_progress_possible(Remaining) of
        false -> error;
        true -> {ok, Remaining}
    end.
%% submit_jobs/4 using fabric_rpc as the RPC module.
submit_jobs(Shards, EndPoint, ExtraArgs) ->
    DefaultModule = fabric_rpc,
    submit_jobs(Shards, DefaultModule, EndPoint, ExtraArgs).
%% Cast {Module, EndPoint, [ShardName | ExtraArgs]} to each shard's node
%% with rexi:cast/2 and return the shards, each with its #shard.ref set to
%% the value rexi:cast/2 returned for it.
submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
    Submit = fun(#shard{node = TargetNode, name = ShardName} = Target) ->
        Call = {Module, EndPoint, [ShardName | ExtraArgs]},
        Target#shard{ref = rexi:cast(TargetNode, Call)}
    end,
    lists:map(Submit, Shards).
%% Call rexi:kill/2 for the node/ref of every #shard{} in Workers.
%% Entries that are not #shard{} records are skipped by the generator
%% pattern. Returns the list of rexi:kill/2 results.
cleanup(Workers) ->
    [rexi:kill(W#shard.node, W#shard.ref) || #shard{} = W <- Workers].
%% stream_start/4 without a start function or replacement shards.
stream_start(Workers, Keypos) ->
    NoStartFun = undefined,
    NoReplacements = undefined,
    stream_start(Workers, Keypos, NoStartFun, NoReplacements).
%% Wait, via rexi_utils:recv/6 and handle_stream_start/3, until the workers
%% have sent their stream init messages.
%%
%% Every worker starts in the `waiting` state. On {ok, Acc} this asserts that
%% progress is possible with the collected workers, acks each one by calling
%% rexi:stream_start/1 on its stored From, and returns {ok, AckedWorkers}.
%% Any other result from rexi_utils:recv/6 is returned unchanged.
stream_start(Workers0, Keypos, StartFun, Replacements) ->
    InitialAcc = #stream_acc{
        workers = fabric_dict:init(Workers0, waiting),
        start_fun = StartFun,
        replacements = Replacements
    },
    RecvResult = rexi_utils:recv(Workers0, Keypos, fun handle_stream_start/3,
        InitialAcc, request_timeout(), infinity),
    case RecvResult of
        {ok, #stream_acc{workers = Ready}} ->
            true = fabric_view:is_progress_possible(Ready),
            AckOne = fun(Worker, From, Acked) ->
                rexi:stream_start(From),
                [Worker | Acked]
            end,
            {ok, fabric_dict:fold(AckOne, [], Ready)};
        Other ->
            Other
    end.
%% Callback passed to rexi_utils:recv/6 by stream_start/4 while it waits
%% for workers to initialise their streams.
%%
%% Returns {ok, St}, {stop, St}, {new_refs, Refs, St} or {error, Reason}.
%% Any message it does not recognise exits with {invalid_stream_start, Msg}.
handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
    % A node went down: drop its workers, and fail only if
    % remove_down_workers/2 reports that progress is no longer possible.
    case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of
        {ok, Workers} ->
            {ok, St#stream_acc{workers=Workers}};
        error ->
            Reason = {nodedown, <<"progress not possible">>},
            {error, Reason}
    end;
handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
    % A worker exited: forget it, then decide whether we can continue.
    Workers = fabric_dict:erase(Worker, St#stream_acc.workers),
    Replacements = St#stream_acc.replacements,
    case {fabric_view:is_progress_possible(Workers), Reason} of
        {true, _} ->
            {ok, St#stream_acc{workers=Workers}};
        {false, {maintenance_mode, _Node}} when Replacements /= undefined ->
            % Check if we have replacements for this range
            % and start the new workers if so.
            % Replacements is a list of {Range, ReplacementShards} tuples,
            % looked up by the failed worker's #shard.range.
            case lists:keytake(Worker#shard.range, 1, Replacements) of
                {value, {_Range, WorkerReplacements}, NewReplacements} ->
                    FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
                        NewWorker = (St#stream_acc.start_fun)(Repl),
                        fabric_dict:store(NewWorker, waiting, NewWorkers)
                    end, Workers, WorkerReplacements),
                    % Assert that our replaced worker provides us
                    % the opportunity to make progress.
                    true = fabric_view:is_progress_possible(FinalWorkers),
                    % NOTE(review): presumably {new_refs, ...} tells
                    % rexi_utils:recv to also wait on the newly started
                    % workers - confirm against rexi_utils.
                    NewRefs = fabric_dict:fetch_keys(FinalWorkers),
                    {new_refs, NewRefs, St#stream_acc{
                        workers=FinalWorkers,
                        replacements=NewReplacements
                    }};
                false ->
                    % If progress isn't possible and we don't have any
                    % replacements then we're dead in the water.
                    Error = {nodedown, <<"progress not possible">>},
                    {error, Error}
            end;
        {false, _} ->
            {error, fabric_util:error_info(Reason)}
    end;
handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
    case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of
        undefined ->
            % This worker lost the race with other partition copies, terminate
            rexi:stream_cancel(From),
            {ok, St};
        waiting ->
            % Don't ack the worker yet so they don't start sending us
            % rows until we're ready. Its From is stored in place of
            % `waiting`; stream_start/4 acks it later.
            Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers),
            Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0),
            % Stop once no worker is still in the `waiting` state.
            case fabric_dict:any(waiting, Workers1) of
                true ->
                    {ok, St#stream_acc{workers=Workers1}};
                false ->
                    {stop, St#stream_acc{workers=Workers1}}
            end
    end;
handle_stream_start(Else, _, _) ->
    exit({invalid_stream_start, Else}).
%% rexi_utils:recv/6 with request_timeout() as the per-message timeout and
%% no overall (global) timeout.
recv(Workers, Keypos, Fun, Acc0) ->
    PerMessageTimeout = request_timeout(),
    rexi_utils:recv(Workers, Keypos, Fun, Acc0, PerMessageTimeout, infinity).
%% Value of [fabric] request_timeout (default "60000"), or infinity.
%% NOTE(review): presumably milliseconds - confirm with rexi_utils:recv/6.
request_timeout() ->
    Default = "60000",
    timeout("request", Default).
%% Value of [fabric] all_docs_timeout (default "10000"), or infinity.
all_docs_timeout() ->
    Default = "10000",
    timeout("all_docs", Default).
%% Value of [fabric] attachments_timeout (default "600000"), or infinity.
attachments_timeout() ->
    Default = "600000",
    timeout("attachments", Default).
%% Read "<Type>_timeout" from the [fabric] config section, falling back to
%% Default. The string "infinity" maps to the atom infinity; anything else
%% is parsed with list_to_integer/1, which crashes on a malformed value.
timeout(Type, Default) ->
    Key = Type ++ "_timeout",
    Raw = config:get("fabric", Key, Default),
    if
        Raw =:= "infinity" -> infinity;
        true -> list_to_integer(Raw)
    end.
%% Emit one "fabric_worker_timeout" error log line per worker, naming the
%% endpoint, the worker's node and its shard name. Returns the list of
%% couch_log:error/2 results.
log_timeout(Workers, EndPoint) ->
    Format = "fabric_worker_timeout ~s,~p,~p",
    LogWorker = fun(#shard{node = TargetNode, name = ShardName}) ->
        couch_log:error(Format, [EndPoint, TargetNode, ShardName])
    end,
    lists:map(LogWorker, Workers).
%% Return, in fabric_dict:to_list/1 order, the workers whose value equals
%% (==) WaitingIndicator, i.e. the ones that are not done yet.
remove_done_workers(Workers, WaitingIndicator) ->
    Keep = fun
        ({Worker, Indicator}, Acc) when Indicator == WaitingIndicator ->
            [Worker | Acc];
        (_Other, Acc) ->
            Acc
    end,
    lists:foldr(Keep, [], fabric_dict:to_list(Workers)).
%% get_db/2 with no extra open options.
get_db(DbName) ->
    NoOptions = [],
    get_db(DbName, NoOptions).
%% Open one shard of DbName, trying candidates in proximity order.
%%
%% Order: shards on this node first, then shards in the same zone, then
%% shards in other zones. Each remote group is sorted by shard name so that
%% callers don't all hammer the same remote node first. Shards on nodes not
%% currently connected to this one are skipped. The first attempt uses a
%% timeout of 100; [fabric] shard_timeout_factor (default "2") is handed to
%% get_shard/4 as the back-off factor.
get_db(DbName, Options) ->
    {Local, SameZone, DifferentZone} =
        mem3:group_by_proximity(mem3:shards(DbName)),
    SortByName = fun(Group) -> lists:keysort(#shard.name, Group) end,
    Candidates = Local ++ SortByName(SameZone) ++ SortByName(DifferentZone),
    Connected = [node() | erlang:nodes()],
    Reachable = [S || #shard{node = N} = S <- Candidates, lists:member(N, Connected)],
    Factor = list_to_integer(config:get("fabric", "shard_timeout_factor", "2")),
    get_shard(Reachable, Options, 100, Factor).
%% Try to open each shard in turn until one succeeds.
%%
%% For every candidate a rexi monitor is started on the node's rexi server,
%% and fabric_rpc:open_shard is cast with {timeout, Timeout} prepended to Opts.
%% - {ok, Db} reply: return {ok, Db}.
%% - unauthorized rexi_EXIT: throw {unauthorized, _} to the caller.
%% - any other reply: try the next shard with the same Timeout.
%% - no reply within Timeout: try the next shard with Factor * Timeout.
%% Raises {internal_server_error, _} when no candidates are left.
%%
%% Each attempt's monitor is stopped before moving on to the next shard, so
%% the recursion is a tail call and monitors do not accumulate across
%% retries. Previously the retry ran inside `try ... after`, which kept every
%% earlier monitor alive until the whole chain returned.
get_shard([], _Opts, _Timeout, _Factor) ->
    erlang:error({internal_server_error, "No DB shards could be opened."});
get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) ->
    Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
    MFA = {fabric_rpc, open_shard, [Name, [{timeout, Timeout} | Opts]]},
    Ref = rexi:cast(Node, self(), MFA, [sync]),
    Outcome =
        try
            receive
                {Ref, {ok, Db}} ->
                    {ok, Db};
                {Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} ->
                    throw(Error);
                {Ref, _Else} ->
                    {retry, Timeout}
            after Timeout ->
                {retry, Factor * Timeout}
            end
        after
            rexi_monitor:stop(Mon)
        end,
    case Outcome of
        {ok, _} = Ok ->
            Ok;
        {retry, NextTimeout} ->
            get_shard(Rest, Opts, NextTimeout, Factor)
    end.
%% Normalise an {Error, Stack} failure for reporting.
%% - {timeout, _} and {<<"reduce_overflow_error">>, _} errors are returned
%%   as-is, without the stack.
%% - Any other {Tag, Detail} error becomes {Tag, Detail, Stack}.
%% - Any other error becomes {Error, nil, Stack}.
error_info({{timeout, _} = TimeoutError, _Stack}) ->
    TimeoutError;
error_info({{<<"reduce_overflow_error">>, _} = OverflowError, _Stack}) ->
    OverflowError;
error_info({{Tag, Detail}, Stack}) ->
    {Tag, Detail, Stack};
error_info({Tag, Stack}) ->
    {Tag, nil, Stack}.
%% Add Incr to the count stored under make_key(Item) in the orddict D.
%% Entries have the form {Item, Count}. The Item that is kept is the first
%% one stored for that key. If the key is absent, {Item, Incr} is inserted.
update_counter(Item, Incr, D) ->
    Key = make_key(Item),
    Bump = fun({FirstItem, Count}) -> {FirstItem, Count + Incr} end,
    orddict:update(Key, Bump, {Item, Incr}, D).
%% Compute the key used to group identical replies.
%% - {ok, #doc{}} maps to {Id, Revs}.
%% - {ok, List} maps to the key of List.
%% - A list is keyed element by element: {ok, #doc{}} maps to
%%   {ok, {Pos, RevId}} and {{not_found, missing}, Rev} maps to
%%   {not_found, Rev}.
%% - Anything else is its own key.
%% NOTE: a list element matching neither list clause makes the remainder of
%% the list, from that element on, its own key unchanged.
make_key([]) ->
    [];
make_key([{ok, #doc{revs = {Start, [Hash | _]}}} | More]) ->
    [{ok, {Start, Hash}} | make_key(More)];
make_key([{{not_found, missing}, MissingRev} | More]) ->
    [{not_found, MissingRev} | make_key(More)];
make_key({ok, Replies}) when is_list(Replies) ->
    make_key(Replies);
make_key({ok, #doc{id = DocId, revs = DocRevs}}) ->
    {DocId, DocRevs};
make_key(Other) ->
    Other.
%% Collapse a list of {Key, {Reply, Count}} entries so that only replies
%% without a known descendant remain. The count of a dropped entry is added
%% to its first descendant with update_counter/3. The surviving entries are
%% returned in input order.
%%
%% This presumes the incoming list is sorted, i.e. shorter revlists come
%% first, so any descendant of an entry appears later in the list.
%%
%% Robustness: the descendant search now treats entries that are not
%% {ok, #doc{}} replies, and docs at a lower position than the current one,
%% as "not a descendant". Previously the first case crashed the dropwhile
%% fun with function_clause, and the second crashed lists:nthtail/2 with a
%% negative count.
remove_ancestors([], Acc) ->
    lists:reverse(Acc);
remove_ancestors([{_, {{not_found, _}, Count}} = Head | Tail], Acc) ->
    % any document is a descendant
    case lists:filter(fun({_, {{ok, #doc{}}, _}}) -> true; (_) -> false end, Tail) of
        [{_, {{ok, #doc{}} = Descendant, _}} | _] ->
            remove_ancestors(update_counter(Descendant, Count, Tail), Acc);
        [] ->
            remove_ancestors(Tail, [Head | Acc])
    end;
remove_ancestors([{_, {{ok, #doc{revs = {Pos, Revs}}}, Count}} = Head | Tail], Acc) ->
    Descendants = lists:dropwhile(fun
        ({_, {{ok, #doc{revs = {Pos2, _}}}, _}}) when Pos2 < Pos ->
            % a revision at a lower position cannot descend from Head
            true;
        ({_, {{ok, #doc{revs = {Pos2, Revs2}}}, _}}) ->
            case lists:nthtail(erlang:min(Pos2 - Pos, length(Revs2)), Revs2) of
                [] ->
                    % impossible to tell if Revs2 is a descendant - assume no
                    true;
                History ->
                    % if Revs2 is a descendant, History is a prefix of Revs
                    not lists:prefix(History, Revs)
            end;
        (_NotADocReply) ->
            % not_found / error replies are never descendants of a document
            true
    end, Tail),
    case Descendants of
        [] ->
            remove_ancestors(Tail, [Head | Acc]);
        [{Descendant, _} | _] ->
            % Descendant is already a key; make_key/1 returns it unchanged
            remove_ancestors(update_counter(Descendant, Count, Tail), Acc)
    end;
remove_ancestors([Error | Tail], Acc) ->
    remove_ancestors(Tail, [Error | Acc]).
%% Start one rexi_monitor watching the deduplicated, sorted list of
%% rexi_utils:server_pid/1 results for the nodes hosting Shards.
create_monitors(Shards) ->
    Servers = [rexi_utils:server_pid(ShardNode) || #shard{node = ShardNode} <- Shards],
    rexi_monitor:start(lists:usort(Servers)).
%% The orddict key must be built from the doc's id and revs only; body and
%% attachments must not leak into it.
update_counter_test() ->
    Reply = {ok, #doc{
        id = <<"id">>,
        revs = <<"rev">>,
        body = <<"body">>,
        atts = <<"atts">>
    }},
    Expected = [{{<<"id">>, <<"rev">>}, {Reply, 1}}],
    ?assertEqual(Expected, update_counter(Reply, 1, [])).
%% Table-driven checks of remove_ancestors/2: each case is
%% {ExpectedResult, Input}.
remove_ancestors_test() ->
    Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
    Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
    Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
    Bar2 = {not_found, {1, <<"bar">>}},
    Cases = [
        % unrelated revisions are both kept
        {[kv(Bar1, 1), kv(Foo1, 1)],
         [kv(Bar1, 1), kv(Foo1, 1)]},
        % Foo1 is an ancestor of Foo2, so its count moves to Foo2
        {[kv(Bar1, 1), kv(Foo2, 2)],
         [kv(Bar1, 1), kv(Foo1, 1), kv(Foo2, 1)]},
        % a not_found reply is absorbed by the document reply
        {[kv(Bar1, 2)],
         [kv(Bar2, 1), kv(Bar1, 1)]}
    ],
    lists:foreach(fun({Expected, Input}) ->
        ?assertEqual(Expected, remove_ancestors(Input, []))
    end, Cases).
%% Test helper: build the {Key, {Item, Count}} entry that update_counter/3
%% would store for Item.
kv(Item, Count) ->
    Key = make_key(Item),
    {Key, {Item, Count}}.
%% Return {DocId, {Pos, LatestRevId}} for a doc whose revision list is
%% non-empty; other docs fail with function_clause.
doc_id_and_rev(#doc{id = Id, revs = {Start, [LatestHash | _]}}) ->
    {Id, {Start, LatestHash}}.