blob: b0a3628c4ae3f296d521dddbbeb5eec29baaa7fc [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric_view_changes).
-export([go/5, pack_seqs/1, unpack_seqs/2]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(fabric_db_update_listener, [wait_db_updated/1]).
go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse
Feed == "longpoll" ->
Args = make_changes_args(Options),
Since = get_start_seq(DbName, Args),
case validate_start_seq(DbName, Since) of
ok ->
{ok, Acc} = Callback(start, Acc0),
{Timeout, _} = couch_changes:get_changes_timeout(Args, Callback),
Ref = make_ref(),
Parent = self(),
UpdateListener = {spawn_link(fabric_db_update_listener, go,
[Parent, Ref, DbName, Timeout]),
Ref},
try
keep_sending_changes(
DbName,
Args,
Callback,
Since,
Acc,
Timeout,
UpdateListener,
os:timestamp()
)
after
fabric_db_update_listener:stop(UpdateListener)
end;
Error ->
Callback(Error, Acc0)
end;
go(DbName, "normal", Options, Callback, Acc0) ->
Args = make_changes_args(Options),
Since = get_start_seq(DbName, Args),
case validate_start_seq(DbName, Since) of
ok ->
{ok, Acc} = Callback(start, Acc0),
{ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
DbName,
Args,
Callback,
Since,
Acc,
5000
),
Callback({stop, pack_seqs(Seqs)}, AccOut);
Error ->
Callback(Error, Acc0)
end.
keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) ->
#changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
{ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout),
#collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
LastSeq = pack_seqs(NewSeqs),
if Limit > Limit2, Feed == "longpoll" ->
Callback({stop, LastSeq}, AccOut);
true ->
WaitForUpdate = wait_db_updated(UpListen),
AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000,
Max = case config:get("fabric", "changes_duration") of
undefined ->
infinity;
MaxStr ->
list_to_integer(MaxStr)
end,
case {Heartbeat, AccumulatedTime > Max, WaitForUpdate} of
{undefined, _, timeout} ->
Callback({stop, LastSeq}, AccOut);
{_, true, timeout} ->
Callback({stop, LastSeq}, AccOut);
_ ->
{ok, AccTimeout} = Callback(timeout, AccOut),
keep_sending_changes(
DbName,
Args#changes_args{limit=Limit2},
Callback,
LastSeq,
AccTimeout,
Timeout,
UpListen,
T0
)
end
end.
send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
LiveNodes = [node() | nodes()],
AllLiveShards = mem3:live_shards(DbName, LiveNodes),
Seqs = lists:flatmap(fun({#shard{name=Name, node=N} = Shard, Seq}) ->
case lists:member(Shard, AllLiveShards) of
true ->
Ref = rexi:cast(N, {fabric_rpc, changes, [Name,ChangesArgs,Seq]}),
[{Shard#shard{ref = Ref}, Seq}];
false ->
% Find some replacement shards to cover the missing range
% TODO It's possible in rare cases of shard merging to end up
% with overlapping shard ranges from this technique
lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}),
{NewShard#shard{ref = Ref}, 0}
end, find_replacement_shards(Shard, AllLiveShards))
end
end, unpack_seqs(PackedSeqs, DbName)),
{Workers, _} = lists:unzip(Seqs),
RexiMon = fabric_util:create_monitors(Workers),
State = #collector{
query_args = ChangesArgs,
callback = Callback,
counters = orddict:from_list(Seqs),
user_acc = AccIn,
limit = ChangesArgs#changes_args.limit,
rows = Seqs % store sequence positions instead
},
%% TODO: errors need to be handled here
try
receive_results(Workers, State, Timeout, Callback)
after
rexi_monitor:stop(RexiMon),
fabric_util:cleanup(Workers)
end.
receive_results(Workers, State, Timeout, Callback) ->
case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State,
infinity, Timeout) of
{timeout, NewState0} ->
{ok, AccOut} = Callback(timeout, NewState0#collector.user_acc),
NewState = NewState0#collector{user_acc = AccOut},
receive_results(Workers, NewState, Timeout, Callback);
{_, NewState} ->
{ok, NewState}
end.
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, nil, State) ->
fabric_view:remove_down_shards(State, NodeRef);
handle_message({rexi_EXIT, Reason}, Worker, State) ->
#collector{
callback=Callback,
counters=Counters0,
rows = Seqs0,
user_acc=Acc
} = State,
Counters = fabric_dict:erase(Worker, Counters0),
Seqs = fabric_dict:erase(Worker, Seqs0),
case fabric_view:is_progress_possible(Counters) of
true ->
{ok, State#collector{counters = Counters, rows=Seqs}};
false ->
{ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
{error, Resp}
end;
handle_message(_, _, #collector{limit=0} = State) ->
{stop, State};
handle_message(#change{key=Seq} = Row0, {Worker, From}, St) ->
#collector{
query_args = #changes_args{include_docs=IncludeDocs},
callback = Callback,
counters = S0,
limit = Limit,
user_acc = AccIn
} = St,
case fabric_dict:lookup_element(Worker, S0) of
undefined ->
% this worker lost the race with other partition copies, terminate it
gen_server:reply(From, stop),
{ok, St};
_ ->
S1 = fabric_dict:store(Worker, Seq, S0),
S2 = fabric_view:remove_overlapping_shards(Worker, S1),
% this check should not be necessary at all, as holes in the ranges
% created from DOWN messages would have led to errors
case fabric_view:is_progress_possible(S2) of
true ->
Row = Row0#change{key = pack_seqs(S2)},
{Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
gen_server:reply(From, Go),
{Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}};
false ->
Reason = {range_not_covered, <<"progress not possible">>},
Callback({error, Reason}, AccIn),
gen_server:reply(From, stop),
{stop, St#collector{counters=S2}}
end
end;
handle_message({complete, EndSeq}, Worker, State) ->
#collector{
callback = Callback,
counters = S0,
total_rows = Completed, % override
user_acc = Acc
} = State,
case fabric_dict:lookup_element(Worker, S0) of
undefined ->
{ok, State};
_ ->
S1 = fabric_dict:store(Worker, EndSeq, S0),
% unlikely to have overlaps here, but possible w/ filters
S2 = fabric_view:remove_overlapping_shards(Worker, S1),
NewState = State#collector{counters=S2, total_rows=Completed+1},
case fabric_dict:size(S2) =:= (Completed+1) of
true ->
% check ranges are covered, again this should not be neccessary
% as any holes in the ranges due to DOWN messages would have errored
% out sooner
case fabric_view:is_progress_possible(S2) of
true ->
{stop, NewState};
false ->
Reason = {range_not_covered, <<"progress not possible">>},
Callback({error, Reason}, Acc),
{stop, NewState}
end;
false ->
{ok, NewState}
end
end.
make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
Args#changes_args{filter = Style};
make_changes_args(Args) ->
Args.
get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
Since;
get_start_seq(DbName, #changes_args{dir=rev}) ->
Shards = mem3:shards(DbName),
Workers = fabric_util:submit_jobs(Shards, get_update_seq, []),
{ok, Since} = fabric_util:recv(Workers, #shard.ref,
fun collect_update_seqs/3, fabric_dict:init(Workers, -1)),
Since.
collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
case fabric_dict:lookup_element(Shard, Counters) of
undefined ->
% already heard from someone else in this range
{ok, Counters};
-1 ->
C1 = fabric_dict:store(Shard, Seq, Counters),
C2 = fabric_view:remove_overlapping_shards(Shard, C1),
case fabric_dict:any(-1, C2) of
true ->
{ok, C2};
false ->
{stop, pack_seqs(C2)}
end
end.
pack_seqs(Workers) ->
SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
SeqSum = lists:sum(element(2, lists:unzip(Workers))),
Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
[SeqSum, Opaque].
unpack_seqs(0, DbName) ->
fabric_dict:init(mem3:shards(DbName), 0);
unpack_seqs("0", DbName) ->
fabric_dict:init(mem3:shards(DbName), 0);
unpack_seqs([_SeqNum, Opaque], DbName) ->
do_unpack_seqs(Opaque, DbName);
unpack_seqs(Packed, DbName) ->
NewPattern = "^\\[[0-9]+\s*,\s*\"(?<opaque>.*)\"\\]$",
OldPattern = "^\"?([0-9]+-)?(?<opaque>.*?)\"?$",
Options = [{capture, [opaque], binary}],
Opaque = case re:run(Packed, NewPattern, Options) of
{match, Match} ->
Match;
nomatch ->
{match, Match} = re:run(Packed, OldPattern, Options),
Match
end,
do_unpack_seqs(Opaque, DbName).
do_unpack_seqs(Opaque, DbName) ->
% A preventative fix for FB 13533 to remove duplicate shards.
% This just picks each unique shard and keeps the largest seq
% value recorded.
Decoded = binary_to_term(couch_util:decodeBase64Url(Opaque)),
DedupDict = lists:foldl(fun({Node, [A, B], Seq}, Acc) ->
dict:append({Node, [A, B]}, Seq, Acc)
end, dict:new(), Decoded),
Deduped = lists:map(fun({{Node, [A, B]}, SeqList}) ->
{Node, [A, B], lists:max(SeqList)}
end, dict:to_list(DedupDict)),
% Create a fabric_dict of {Shard, Seq} entries
% TODO relies on internal structure of fabric_dict as keylist
Unpacked = lists:flatmap(fun({Node, [A,B], Seq}) ->
case mem3:get_shard(DbName, Node, [A,B]) of
{ok, Shard} ->
[{Shard, Seq}];
{error, not_found} ->
[]
end
end, Deduped),
% Fill holes in the since sequence. If/when we ever start
% using overlapping shard ranges this will need to be updated
% to not include shard ranges that overlap entries in Upacked.
% A quick and dirty approach would be like such:
%
% lists:foldl(fun(S, Acc) ->
% fabric_view:remove_overlapping_shards(S, Acc)
% end, mem3:shards(DbName), Unpacked)
%
% Unfortunately remove_overlapping_shards isn't reusable because
% of its calls to rexi:kill/2. When we get to overlapping
% shard ranges and have to rewrite shard range management
% we can revisit this simpler algorithm.
case fabric_view:is_progress_possible(Unpacked) of
true ->
Unpacked;
false ->
Ranges = lists:usort([R || #shard{range=R} <- Unpacked]),
Filter = fun(S) -> not lists:member(S#shard.range, Ranges) end,
Replacements = lists:filter(Filter, mem3:shards(DbName)),
Unpacked ++ [{R, 0} || R <- Replacements]
end.
changes_row(#change{key=Seq, id=Id, value=Value, deleted=true, doc=Doc}, true) ->
{change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, Doc}]}};
changes_row(#change{key=Seq, id=Id, value=Value, deleted=true}, false) ->
{change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
changes_row(#change{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) ->
{change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
changes_row(#change{key=Seq, id=Id, value=Value, doc=Doc}, true) ->
{change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
changes_row(#change{key=Seq, id=Id, value=Value}, false) ->
{change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
find_replacement_shards(#shard{range=Range}, AllShards) ->
% TODO make this moar betta -- we might have split or merged the partition
[Shard || Shard <- AllShards, Shard#shard.range =:= Range].
validate_start_seq(DbName, Seq) ->
try unpack_seqs(Seq, DbName) of _Any ->
ok
catch
error:database_does_not_exist ->
{error, database_does_not_exist};
_:_ ->
Reason = <<"Malformed sequence supplied in 'since' parameter.">>,
{error, {bad_request, Reason}}
end.
unpack_seqs_test() ->
meck:new(mem3),
meck:new(fabric_view),
meck:expect(mem3, get_shard, fun(_, _, _) -> {ok, #shard{}} end),
meck:expect(fabric_view, is_progress_possible, fun(_) -> true end),
% BigCouch 0.3 style.
assert_shards("23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA"),
% BigCouch 0.4 style.
assert_shards([23423,<<"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA">>]),
% BigCouch 0.4 style (as string).
assert_shards("[23423,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
assert_shards("[23423 ,\"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
assert_shards("[23423, \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
assert_shards("[23423 , \"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\"]"),
% with internal hypen
assert_shards("651-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
"5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
"VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"),
assert_shards([651,"g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwNDLXMwBCwxygOFMiQ"
"5L8____sxJTcalIUgCSSfZgReE4FTmAFMWDFYXgVJQAUlQPVuSKS1EeC5BkaABSQHXz8"
"VgJUbgAonB_VqIPfoUHIArvE7T6AUQh0I1-WQAzp1XB"]),
% CouchDB 1.2 style
assert_shards("\"23423-g1AAAAE7eJzLYWBg4MhgTmHgS0ktM3QwND"
"LXMwBCwxygOFMiQ5L8____sxIZcKlIUgCSSfZgRUw4FTmAFMWDFTHiVJQAUlSPX1Ee"
"C5BkaABSQHXzsxKZ8StcAFG4H4_bIAoPQBTeJ2j1A4hCUJBkAQC7U1NA\""),
meck:unload(fabric_view),
meck:unload(mem3).
assert_shards(Packed) ->
?assertMatch([{#shard{},_}|_], unpack_seqs(Packed, <<"foo">>)).