% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric_util).
-export([
submit_jobs/3, submit_jobs/4,
cleanup/1,
recv/4,
get_db/1, get_db/2,
error_info/1,
update_counter/3,
remove_ancestors/2,
create_monitors/1,
kv/2,
remove_down_workers/2, remove_down_workers/3,
doc_id_and_rev/1
]).
-export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0, view_timeout/1]).
-export([log_timeout/2, remove_done_workers/2]).
-export([is_users_db/1, is_replicator_db/1]).
-export([open_cluster_db/1, open_cluster_db/2]).
-export([is_partitioned/1]).
-export([validate_all_docs_args/2, validate_args/3]).
-export([upgrade_mrargs/1]).
-export([worker_ranges/1]).
-export([get_uuid_prefix_len/0]).
-export([isolate/1, isolate/2]).
-compile({inline, [{doc_id_and_rev, 1}]}).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("eunit/include/eunit.hrl").
remove_down_workers(Workers, BadNode) ->
remove_down_workers(Workers, BadNode, []).
remove_down_workers(Workers, BadNode, RingOpts) ->
Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end,
NewWorkers = fabric_dict:filter(Filter, Workers),
case fabric_ring:is_progress_possible(NewWorkers, RingOpts) of
true ->
{ok, NewWorkers};
false ->
error
end.
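% A minimal usage sketch (eunit, hypothetical atom node names n1/n2): one
% full-ring [0, 2^32-1] copy per node, so dropping one node still leaves a
% complete ring, while dropping the only node makes progress impossible.
remove_down_workers_test() ->
    FullRing = fun(Node) -> #shard{node = Node, range = [0, 4294967295]} end,
    Workers = fabric_dict:init([FullRing(n1), FullRing(n2)], nil),
    ?assertMatch({ok, _}, remove_down_workers(Workers, n2)),
    ?assertEqual(error, remove_down_workers(fabric_dict:init([FullRing(n1)], nil), n1)).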
submit_jobs(Shards, EndPoint, ExtraArgs) ->
submit_jobs(Shards, fabric_rpc, EndPoint, ExtraArgs).
submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
lists:map(
fun(#shard{node = Node, name = ShardName} = Shard) ->
Ref = rexi:cast(Node, {Module, EndPoint, [ShardName | ExtraArgs]}),
Shard#shard{ref = Ref}
end,
Shards
).
cleanup(Workers) ->
rexi:kill_all([{Node, Ref} || #shard{node = Node, ref = Ref} <- Workers]).
recv(Workers, Keypos, Fun, Acc0) ->
rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity).
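% The timeouts below are read from the [fabric] config section; the defaults
% are the ones passed to timeout/2, e.g.:
%
%   [fabric]
%   request_timeout = 60000
%   all_docs_timeout = 10000
%   attachments_timeout = 600000
%   view_timeout = infinity
%   partition_view_timeout = infinity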
request_timeout() ->
timeout("request", "60000").
all_docs_timeout() ->
timeout("all_docs", "10000").
attachments_timeout() ->
timeout("attachments", "600000").
view_timeout(Args) ->
PartitionQuery = couch_mrview_util:get_extra(Args, partition, false),
case PartitionQuery of
false -> timeout("view", "infinity");
_ -> timeout("partition_view", "infinity")
end.
timeout(Type, Default) ->
case config:get("fabric", Type ++ "_timeout", Default) of
"infinity" -> infinity;
N -> list_to_integer(N)
end.
log_timeout(Workers, EndPoint) ->
CounterKey = [fabric, worker, timeouts],
couch_stats:increment_counter(CounterKey),
lists:map(
fun(#shard{node = Dest, name = Name}) ->
Fmt = "fabric_worker_timeout ~s,~p,~p",
couch_log:error(Fmt, [EndPoint, Dest, Name])
end,
Workers
).
remove_done_workers(Workers, WaitingIndicator) ->
[W || {W, WI} <- fabric_dict:to_list(Workers), WI == WaitingIndicator].
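% A minimal sketch (eunit, hypothetical atom workers instead of #shard{}
% records): only workers whose entry still equals the waiting indicator are
% returned.
remove_done_workers_test() ->
    W0 = fabric_dict:init([w1, w2, w3], waiting),
    W1 = fabric_dict:store(w1, done, W0),
    ?assertEqual([w2, w3], remove_done_workers(W1, waiting)).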
get_db(DbName) ->
get_db(DbName, []).
get_db(DbName, Options) ->
{Local, SameZone, DifferentZone} = mem3:group_by_proximity(mem3:shards(DbName)),
% Prefer shards on the same node over other nodes, prefer shards in the same zone over other
% zones, and sort each remote list by name so that we don't repeatedly try the same node.
Shards =
Local ++ lists:keysort(#shard.name, SameZone) ++ lists:keysort(#shard.name, DifferentZone),
% suppress shards from down nodes
Nodes = [node() | erlang:nodes()],
Live = [S || #shard{node = N} = S <- Shards, lists:member(N, Nodes)],
% Only accept factors > 1, otherwise our math breaks further down
Factor = max(2, config:get_integer("fabric", "shard_timeout_factor", 2)),
MinTimeout = config:get_integer("fabric", "shard_timeout_min_msec", 100),
MaxTimeout = request_timeout(),
Timeout = get_db_timeout(length(Live), Factor, MinTimeout, MaxTimeout),
get_shard(Live, Options, Timeout, Factor).
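% Walk the live shards in order. Each attempt gets the current Timeout; when a
% shard times out, the timeout is multiplied by Factor before trying the next
% one, mirroring the geometric series that get_db_timeout/4 solves for, so the
% whole walk stays close to request_timeout() overall.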
get_shard([], _Opts, _Timeout, _Factor) ->
erlang:error({internal_server_error, "No DB shards could be opened."});
get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) ->
Mon = rexi_monitor:start([rexi_utils:server_pid(Node)]),
MFA = {fabric_rpc, open_shard, [Name, [{timeout, Timeout} | Opts]]},
Ref = rexi:cast(Node, self(), MFA, [sync]),
try
receive
{Ref, {ok, Db}, {cost, Cost}} ->
couch_cost:accumulate_costs(Cost),
{ok, Db};
{Ref, {'rexi_EXIT', {{unauthorized, _} = Error, _}}} ->
throw(Error);
{Ref, {'rexi_EXIT', {{forbidden, _} = Error, _}}} ->
throw(Error);
{Ref, Reason} ->
couch_log:debug("Failed to open shard ~p because: ~p", [Name, Reason]),
get_shard(Rest, Opts, Timeout, Factor);
Other ->
io:format("GOT UNEXPECTED MESSAGE FORMAT: ~p~n", [Other]),
erlang:error(Other)
after Timeout ->
couch_log:debug("Failed to open shard ~p after: ~p", [Name, Timeout]),
get_shard(Rest, Opts, Factor * Timeout, Factor)
end
after
rexi_monitor:stop(Mon)
end.
get_db_timeout(N, Factor, MinTimeout, infinity) ->
% MaxTimeout may be infinity, so we substitute a very large integer (2^59) to
% avoid blowing up the arithmetic
get_db_timeout(N, Factor, MinTimeout, 1 bsl 59);
get_db_timeout(N, Factor, MinTimeout, MaxTimeout) ->
%
% The progression of timeouts forms a geometric series:
%
% MaxTimeout = T + T*F + T*F^2 + T*F^3 ...
%
% Where T is the initial timeout and F is the factor. The formula for
% the sum is:
%
% Sum[T * F^I, I <- 0..N] = T * (1 - F^(N + 1)) / (1 - F)
%
% Then, for a given sum and factor we can calculate the initial timeout T:
%
% T = Sum / ((1 - F^(N+1)) / (1 - F))
%
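% For example, with MaxTimeout = 60000, Factor = 2 and N = 6 live shards
% (Q=2 with 3 replicas): T = 60000 / ((1 - 2^7) / (1 - 2)) = 60000 / 127,
% which truncates to the 472 msec asserted in get_db_timeout_test/0 below.
%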
Timeout = MaxTimeout / ((1 - math:pow(Factor, N + 1)) / (1 - Factor)),
% Apply a minimum timeout value
max(MinTimeout, trunc(Timeout)).
error_info({{timeout, _} = Error, _Stack}) ->
Error;
error_info({{Error, Reason}, Stack}) ->
{Error, Reason, Stack};
error_info({Error, Stack}) ->
{Error, nil, Stack}.
update_counter(Item, Incr, D) ->
UpdateFun = fun({Old, Count}) -> {Old, Count + Incr} end,
orddict:update(make_key(Item), UpdateFun, {Item, Incr}, D).
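% Normalize a reply into a key that depends only on the document id and
% revision(s), so that identical replies coming back from different shard
% copies are merged into a single counter entry.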
make_key({ok, L}) when is_list(L) ->
make_key(L);
make_key([]) ->
[];
make_key([{ok, #doc{revs = {Pos, [RevId | _]}}} | Rest]) ->
[{ok, {Pos, RevId}} | make_key(Rest)];
make_key([{{not_found, missing}, Rev} | Rest]) ->
[{not_found, Rev} | make_key(Rest)];
make_key({ok, #doc{id = Id, revs = Revs}}) ->
{Id, Revs};
make_key(Else) ->
Else.
% this presumes the incoming list is sorted, i.e. shorter revlists come first
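% Walks the {Key, {Reply, Count}} pairs and drops any reply whose revision is
% an ancestor of a later reply, folding its count into that descendant.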
remove_ancestors([], Acc) ->
lists:reverse(Acc);
remove_ancestors([{_, {{not_found, _}, Count}} = Head | Tail], Acc) ->
% any {ok, #doc{}} reply counts as a descendant of a not_found revision
case
lists:filter(
fun
({_, {{ok, #doc{}}, _}}) -> true;
(_) -> false
end,
Tail
)
of
[{_, {{ok, #doc{}} = Descendant, _}} | _] ->
remove_ancestors(update_counter(Descendant, Count, Tail), Acc);
[] ->
remove_ancestors(Tail, [Head | Acc])
end;
remove_ancestors([{_, {{ok, #doc{revs = {Pos, Revs}}}, Count}} = Head | Tail], Acc) ->
Descendants = lists:dropwhile(
fun({_, {{ok, #doc{revs = {Pos2, Revs2}}}, _}}) ->
case lists:nthtail(erlang:min(Pos2 - Pos, length(Revs2)), Revs2) of
[] ->
% impossible to tell if Revs2 is a descendant - assume no
true;
History ->
% if Revs2 is a descendant, History is a prefix of Revs
not lists:prefix(History, Revs)
end
end,
Tail
),
case Descendants of
[] ->
remove_ancestors(Tail, [Head | Acc]);
[{Descendant, _} | _] ->
remove_ancestors(update_counter(Descendant, Count, Tail), Acc)
end;
remove_ancestors([Error | Tail], Acc) ->
remove_ancestors(Tail, [Error | Acc]).
create_monitors(Shards) ->
MonRefs = lists:usort([rexi_utils:server_pid(N) || #shard{node = N} <- Shards]),
rexi_monitor:start(MonRefs).
%% verify only id and rev are used in key.
update_counter_test() ->
Reply =
{ok, #doc{
id = <<"id">>,
revs = <<"rev">>,
body = <<"body">>,
atts = <<"atts">>
}},
?assertEqual(
[{{<<"id">>, <<"rev">>}, {Reply, 1}}],
update_counter(Reply, 1, [])
).
remove_ancestors_test() ->
Foo1 = {ok, #doc{revs = {1, [<<"foo">>]}}},
Foo2 = {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}},
Bar1 = {ok, #doc{revs = {1, [<<"bar">>]}}},
Bar2 = {not_found, {1, <<"bar">>}},
?assertEqual(
[kv(Bar1, 1), kv(Foo1, 1)],
remove_ancestors([kv(Bar1, 1), kv(Foo1, 1)], [])
),
?assertEqual(
[kv(Bar1, 1), kv(Foo2, 2)],
remove_ancestors([kv(Bar1, 1), kv(Foo1, 1), kv(Foo2, 1)], [])
),
?assertEqual(
[kv(Bar1, 2)],
remove_ancestors([kv(Bar2, 1), kv(Bar1, 1)], [])
).
is_replicator_db(DbName) ->
path_ends_with(DbName, <<"_replicator">>).
is_users_db(DbName) ->
ConfigName = list_to_binary(
config:get(
"chttpd_auth", "authentication_db", "_users"
)
),
DbName == ConfigName orelse path_ends_with(DbName, <<"_users">>).
path_ends_with(Path, Suffix) ->
Suffix =:= couch_db:dbname_suffix(Path).
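% A minimal sketch (eunit): only the database name suffix matters, both for a
% bare name and for a clustered path.
is_replicator_db_test() ->
    ?assert(is_replicator_db(<<"_replicator">>)),
    ?assert(is_replicator_db(<<"foo/_replicator">>)),
    ?assertNot(is_replicator_db(<<"foo">>)).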
open_cluster_db(#shard{dbname = DbName, opts = Options}) ->
case couch_util:get_value(props, Options) of
Props when is_list(Props) ->
{ok, Db} = couch_db:clustered_db(DbName, [{props, Props}]),
Db;
_ ->
{ok, Db} = couch_db:clustered_db(DbName, []),
Db
end.
open_cluster_db(DbName, Opts) ->
% as admin
{SecProps} = fabric:get_security(DbName),
UserCtx = couch_util:get_value(user_ctx, Opts, #user_ctx{}),
{ok, Db} = couch_db:clustered_db(DbName, UserCtx, SecProps),
Db.
%% test helper: builds a {Key, {Reply, Count}} entry as stored by update_counter/3
kv(Item, Count) ->
{make_key(Item), {Item, Count}}.
doc_id_and_rev(#doc{id = DocId, revs = {RevNum, [RevHash | _]}}) ->
{DocId, {RevNum, RevHash}}.
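% A minimal sketch (eunit): only the leaf revision of the rev list is kept.
doc_id_and_rev_test() ->
    Doc = #doc{id = <<"docid">>, revs = {2, [<<"b">>, <<"a">>]}},
    ?assertEqual({<<"docid">>, {2, <<"b">>}}, doc_id_and_rev(Doc)).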
is_partitioned(DbName0) when is_binary(DbName0) ->
Shards = mem3:shards(fabric:dbname(DbName0)),
is_partitioned(open_cluster_db(hd(Shards)));
is_partitioned(Db) ->
couch_db:is_partitioned(Db).
validate_all_docs_args(DbName, Args) when is_binary(DbName) ->
Shards = mem3:shards(fabric:dbname(DbName)),
Db = open_cluster_db(hd(Shards)),
validate_all_docs_args(Db, Args);
validate_all_docs_args(Db, Args) ->
true = couch_db:is_clustered(Db),
couch_mrview_util:validate_all_docs_args(Db, Args).
validate_args(DbName, DDoc, Args) when is_binary(DbName) ->
Shards = mem3:shards(fabric:dbname(DbName)),
Db = open_cluster_db(hd(Shards)),
validate_args(Db, DDoc, Args);
validate_args(Db, DDoc, Args) ->
true = couch_db:is_clustered(Db),
couch_mrview_util:validate_args(Db, DDoc, Args).
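% Upgrade an #mrargs tuple whose record still carried a `stale` field (e.g.
% one sent by a node running an older release) into the current record,
% mapping stale=ok and stale=update_after onto the newer stable/update fields.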
upgrade_mrargs(#mrargs{} = Args) ->
Args;
upgrade_mrargs(
{mrargs, ViewType, Reduce, PreflightFun, StartKey, StartKeyDocId, EndKey, EndKeyDocId, Keys,
Direction, Limit, Skip, GroupLevel, Group, Stale, MultiGet, InclusiveEnd, IncludeDocs,
DocOptions, UpdateSeq, Conflicts, Callback, Sorted, Extra}
) ->
{Stable, Update} =
case Stale of
ok -> {true, false};
update_after -> {true, lazy};
_ -> {false, true}
end,
#mrargs{
view_type = ViewType,
reduce = Reduce,
preflight_fun = PreflightFun,
start_key = StartKey,
start_key_docid = StartKeyDocId,
end_key = EndKey,
end_key_docid = EndKeyDocId,
keys = Keys,
direction = Direction,
limit = Limit,
skip = Skip,
group_level = GroupLevel,
group = Group,
stable = Stable,
update = Update,
multi_get = MultiGet,
inclusive_end = InclusiveEnd,
include_docs = IncludeDocs,
doc_options = DocOptions,
update_seq = UpdateSeq,
conflicts = Conflicts,
callback = Callback,
sorted = Sorted,
extra = Extra
}.
worker_ranges(Workers) ->
Ranges = fabric_dict:fold(
fun(#shard{range = [X, Y]}, _, Acc) ->
[{X, Y} | Acc]
end,
[],
Workers
),
lists:usort(Ranges).
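% A minimal sketch (eunit, hypothetical atom node names): duplicate copies of
% a range collapse into a single sorted {Begin, End} entry.
worker_ranges_test() ->
    Shards = [
        #shard{node = n1, range = [0, 10]},
        #shard{node = n2, range = [0, 10]},
        #shard{node = n1, range = [11, 20]}
    ],
    Workers = fabric_dict:init(Shards, nil),
    ?assertEqual([{0, 10}, {11, 20}], worker_ranges(Workers)).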
get_uuid_prefix_len() ->
config:get_integer("fabric", "uuid_prefix_len", 7).
% If we issue multiple fabric calls from the same process, we have to isolate
% them so that, in case of an error, they don't pollute the process dictionary
% or the mailbox
isolate(Fun) ->
isolate(Fun, infinity).
isolate(Fun, Timeout) ->
{Pid, Ref} = erlang:spawn_monitor(fun() -> exit(do_isolate(Fun)) end),
receive
{'DOWN', Ref, _, _, {'$isolres', Res}} ->
Res;
{'DOWN', Ref, _, _, {'$isolerr', Tag, Reason, Stack}} ->
erlang:raise(Tag, Reason, Stack)
after Timeout ->
erlang:demonitor(Ref, [flush]),
exit(Pid, kill),
erlang:error(timeout)
end.
do_isolate(Fun) ->
try
{'$isolres', Fun()}
catch
Tag:Reason:Stack ->
{'$isolerr', Tag, Reason, Stack}
end.
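% A minimal sketch (eunit): the fun runs in a throwaway process, its result is
% handed back to the caller, and errors are re-raised with the original stack.
isolate_test() ->
    ?assertEqual(42, isolate(fun() -> 42 end)),
    ?assertError(badarith, isolate(fun() -> 1 / 0 end, 1000)).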
get_db_timeout_test() ->
% Q=1, N=1
?assertEqual(20000, get_db_timeout(1, 2, 100, 60000)),
% Q=2, N=1
?assertEqual(8571, get_db_timeout(2, 2, 100, 60000)),
% Q=2, N=3 (default)
?assertEqual(472, get_db_timeout(2 * 3, 2, 100, 60000)),
% Q=3, N=3
?assertEqual(100, get_db_timeout(3 * 3, 2, 100, 60000)),
% Q=4, N=1
?assertEqual(1935, get_db_timeout(4, 2, 100, 60000)),
% Q=8, N=1
?assertEqual(117, get_db_timeout(8, 2, 100, 60000)),
% Q=8, N=3 (default in 2.x)
?assertEqual(100, get_db_timeout(8 * 3, 2, 100, 60000)),
% Q=256, N=3
?assertEqual(100, get_db_timeout(256 * 3, 2, 100, 60000)),
% Large factor = 100
?assertEqual(100, get_db_timeout(2 * 3, 100, 100, 60000)),
% Small total request timeout = 1 sec
?assertEqual(100, get_db_timeout(2 * 3, 2, 100, 1000)),
% Large total request timeout
?assertEqual(28346, get_db_timeout(2 * 3, 2, 100, 3600000)),
% No shards at all
?assertEqual(60000, get_db_timeout(0, 2, 100, 60000)),
% request_timeout was set to infinity; with enough shards the exponential
% logic still bottoms out at the 100 msec minimum timeout
?assertEqual(100, get_db_timeout(64, 2, 100, infinity)).