| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(fabric_view). |
| |
| -export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1, |
| transform_row/1, keydict/1, extract_view/4, get_shards/2, |
| remove_down_shards/2]). |
| |
| -include_lib("fabric/include/fabric.hrl"). |
| -include_lib("mem3/include/mem3.hrl"). |
| -include_lib("couch/include/couch_db.hrl"). |
| -include_lib("couch_mrview/include/couch_mrview.hrl"). |
| |
| -spec remove_down_shards(#collector{}, node()) -> |
| {ok, #collector{}} | {error, any()}. |
| remove_down_shards(Collector, BadNode) -> |
| #collector{callback=Callback, counters=Counters, user_acc=Acc} = Collector, |
| case fabric_util:remove_down_workers(Counters, BadNode) of |
| {ok, NewCounters} -> |
| {ok, Collector#collector{counters = NewCounters}}; |
| error -> |
| Reason = {nodedown, <<"progress not possible">>}, |
| Callback({error, Reason}, Acc), |
| {stop, Collector} |
| end. |
| |
| %% @doc looks for a fully covered keyrange in the list of counters |
| -spec is_progress_possible([{#shard{}, term()}]) -> boolean(). |
| is_progress_possible([]) -> |
| false; |
| is_progress_possible(Counters) -> |
| Ranges = fabric_dict:fold(fun(#shard{range=[X,Y]}, _, A) -> [{X,Y}|A] end, |
| [], Counters), |
| [{Start, Tail0} | Rest] = lists:ukeysort(1, Ranges), |
| Result = lists:foldl(fun |
| (_, fail) -> |
| % we've already declared failure |
| fail; |
| (_, complete) -> |
| % this is the success condition, we can fast-forward |
| complete; |
| ({X,_}, Tail) when X > (Tail+1) -> |
| % gap in the keyrange, we're dead |
| fail; |
| ({_,Y}, Tail) -> |
| case erlang:max(Tail, Y) of |
| End when (End+1) =:= (2 bsl 31) -> |
| complete; |
| Else -> |
| % the normal condition, adding to the tail |
| Else |
| end |
| end, if (Tail0+1) =:= (2 bsl 31) -> complete; true -> Tail0 end, Rest), |
| (Start =:= 0) andalso (Result =:= complete). |
| |
| -spec remove_overlapping_shards(#shard{}, [{#shard{}, any()}]) -> |
| [{#shard{}, any()}]. |
| remove_overlapping_shards(#shard{range=[A,B]} = Shard0, Shards) -> |
| fabric_dict:filter(fun(#shard{range=[X,Y], node=Node, ref=Ref} = Shard, _) -> |
| if Shard =:= Shard0 -> |
| % we can't remove ourselves |
| true; |
| A < B, X >= A, X < B -> |
| % lower bound is inside our range |
| rexi:kill(Node, Ref), |
| false; |
| A < B, Y > A, Y =< B -> |
| % upper bound is inside our range |
| rexi:kill(Node, Ref), |
| false; |
| B < A, X >= A orelse B < A, X < B -> |
| % target shard wraps the key range, lower bound is inside |
| rexi:kill(Node, Ref), |
| false; |
| B < A, Y > A orelse B < A, Y =< B -> |
| % target shard wraps the key range, upper bound is inside |
| rexi:kill(Node, Ref), |
| false; |
| true -> |
| true |
| end |
| end, Shards). |
| |
| maybe_send_row(#collector{limit=0} = State) -> |
| #collector{counters=Counters, user_acc=AccIn, callback=Callback} = State, |
| case fabric_dict:any(0, Counters) of |
| true -> |
| % we still need to send the total/offset header |
| {ok, State}; |
| false -> |
| {_, Acc} = Callback(complete, AccIn), |
| {stop, State#collector{user_acc=Acc}} |
| end; |
| maybe_send_row(State) -> |
| #collector{ |
| callback = Callback, |
| counters = Counters, |
| skip = Skip, |
| limit = Limit, |
| user_acc = AccIn |
| } = State, |
| case fabric_dict:any(0, Counters) of |
| true -> |
| {ok, State}; |
| false -> |
| try get_next_row(State) of |
| {_, NewState} when Skip > 0 -> |
| maybe_send_row(NewState#collector{skip=Skip-1}); |
| {Row, NewState} -> |
| case Callback(transform_row(possibly_embed_doc(NewState,Row)), AccIn) of |
| {stop, Acc} -> |
| {stop, NewState#collector{user_acc=Acc, limit=Limit-1}}; |
| {ok, Acc} -> |
| maybe_send_row(NewState#collector{user_acc=Acc, limit=Limit-1}) |
| end |
| catch complete -> |
| {_, Acc} = Callback(complete, AccIn), |
| {stop, State#collector{user_acc=Acc}} |
| end |
| end. |
| |
| %% if include_docs=true is used when keys and |
| %% the values contain "_id" then use the "_id"s |
| %% to retrieve documents and embed in result |
| possibly_embed_doc(_State, |
| #view_row{id=reduced}=Row) -> |
| Row; |
| possibly_embed_doc(_State, |
| #view_row{value=undefined}=Row) -> |
| Row; |
| possibly_embed_doc(#collector{db_name=DbName, query_args=Args}, |
| #view_row{key=_Key, id=_Id, value=Value, doc=_Doc}=Row) -> |
| #mrargs{include_docs=IncludeDocs} = Args, |
| case IncludeDocs andalso is_tuple(Value) of |
| true -> |
| {Props} = Value, |
| Rev0 = couch_util:get_value(<<"_rev">>, Props), |
| case couch_util:get_value(<<"_id">>,Props) of |
| null -> Row#view_row{doc=null}; |
| undefined -> Row; |
| IncId -> |
| % use separate process to call fabric:open_doc |
| % to not interfere with current call |
| {Pid, Ref} = spawn_monitor(fun() -> |
| exit( |
| case Rev0 of |
| undefined -> |
| case fabric:open_doc(DbName, IncId, []) of |
| {ok, NewDoc} -> |
| Row#view_row{doc=couch_doc:to_json_obj(NewDoc,[])}; |
| {not_found, _} -> |
| Row#view_row{doc=null} |
| end; |
| Rev0 -> |
| Rev = couch_doc:parse_rev(Rev0), |
| case fabric:open_revs(DbName, IncId, [Rev], []) of |
| {ok, [{ok, NewDoc}]} -> |
| Row#view_row{doc=couch_doc:to_json_obj(NewDoc,[])}; |
| {ok, [{{not_found, _}, Rev}]} -> |
| Row#view_row{doc=null} |
| end |
| end) end), |
| receive {'DOWN',Ref,process,Pid, Resp} -> |
| Resp |
| end |
| end; |
| _ -> Row |
| end. |
| |
| |
| keydict(undefined) -> |
| undefined; |
| keydict(Keys) -> |
| {Dict,_} = lists:foldl(fun(K, {D,I}) -> {dict:store(K,I,D), I+1} end, |
| {dict:new(),0}, Keys), |
| Dict. |
| |
| %% internal %% |
| |
| get_next_row(#collector{rows = []}) -> |
| throw(complete); |
| get_next_row(#collector{reducer = RedSrc} = St) when RedSrc =/= undefined -> |
| #collector{ |
| query_args = #mrargs{direction=Dir}, |
| keys = Keys, |
| rows = RowDict, |
| os_proc = Proc, |
| counters = Counters0 |
| } = St, |
| {Key, RestKeys} = find_next_key(Keys, Dir, RowDict), |
| case dict:find(Key, RowDict) of |
| {ok, Records} -> |
| NewRowDict = dict:erase(Key, RowDict), |
| Counters = lists:foldl(fun(#view_row{worker={Worker,From}}, CntrsAcc) -> |
| case From of |
| {Pid, _} when is_pid(Pid) -> |
| gen_server:reply(From, ok); |
| Pid when is_pid(Pid) -> |
| rexi:stream_ack(From) |
| end, |
| fabric_dict:update_counter(Worker, -1, CntrsAcc) |
| end, Counters0, Records), |
| Wrapped = [[V] || #view_row{value=V} <- Records], |
| {ok, [Reduced]} = couch_query_servers:rereduce(Proc, [RedSrc], Wrapped), |
| NewSt = St#collector{keys=RestKeys, rows=NewRowDict, counters=Counters}, |
| {#view_row{key=Key, id=reduced, value=Reduced}, NewSt}; |
| error -> |
| get_next_row(St#collector{keys=RestKeys}) |
| end; |
| get_next_row(State) -> |
| #collector{rows = [Row|Rest], counters = Counters0} = State, |
| {Worker, From} = Row#view_row.worker, |
| rexi:stream_ack(From), |
| Counters1 = fabric_dict:update_counter(Worker, -1, Counters0), |
| {Row, State#collector{rows = Rest, counters=Counters1}}. |
| |
| find_next_key(nil, Dir, RowDict) -> |
| case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of |
| [] -> |
| throw(complete); |
| [Key|_] -> |
| {Key, nil} |
| end; |
| find_next_key([], _, _) -> |
| throw(complete); |
| find_next_key([Key|Rest], _, _) -> |
| {Key, Rest}. |
| |
| transform_row(#view_row{key=Key, id=reduced, value=Value}) -> |
| {row, {[{key,Key}, {value,Value}]}}; |
| transform_row(#view_row{key=Key, id=undefined}) -> |
| {row, {[{key,Key}, {error,not_found}]}}; |
| transform_row(#view_row{key=Key, id=Id, value=Value, doc=undefined}) -> |
| {row, {[{id,Id}, {key,Key}, {value,Value}]}}; |
| transform_row(#view_row{key=Key, id=Id, value=Value, doc={error,Reason}}) -> |
| {row, {[{id,Id}, {key,Key}, {value,Value}, {error,Reason}]}}; |
| transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) -> |
| {row, {[{id,Id}, {key,Key}, {value,Value}, {doc,Doc}]}}. |
| |
| |
| sort_fun(fwd) -> |
| fun(A,A) -> true; (A,B) -> couch_view:less_json(A,B) end; |
| sort_fun(rev) -> |
| fun(A,A) -> true; (A,B) -> couch_view:less_json(B,A) end. |
| |
| extract_view(Pid, ViewName, [], _ViewType) -> |
| couch_log:error("missing_named_view ~p", [ViewName]), |
| exit(Pid, kill), |
| exit(missing_named_view); |
| extract_view(Pid, ViewName, [View|Rest], ViewType) -> |
| case lists:member(ViewName, view_names(View, ViewType)) of |
| true -> |
| if ViewType == reduce -> |
| {index_of(ViewName, view_names(View, reduce)), View}; |
| true -> |
| View |
| end; |
| false -> |
| extract_view(Pid, ViewName, Rest, ViewType) |
| end. |
| |
| view_names(View, Type) when Type == red_map; Type == reduce -> |
| [Name || {Name, _} <- View#mrview.reduce_funs]; |
| view_names(View, map) -> |
| View#mrview.map_names. |
| |
| index_of(X, List) -> |
| index_of(X, List, 1). |
| |
| index_of(_X, [], _I) -> |
| not_found; |
| index_of(X, [X|_Rest], I) -> |
| I; |
| index_of(X, [_|Rest], I) -> |
| index_of(X, Rest, I+1). |
| |
| get_shards(DbName, #mrargs{stale=Stale}) |
| when Stale == ok orelse Stale == update_after -> |
| mem3:ushards(DbName); |
| get_shards(DbName, #mrargs{stale=false}) -> |
| mem3:shards(DbName). |
| |
| % unit test |
| is_progress_possible_test() -> |
| EndPoint = 2 bsl 31, |
| T1 = [[0, EndPoint-1]], |
| ?assertEqual(is_progress_possible(mk_cnts(T1)),true), |
| T2 = [[0,10],[11,20],[21,EndPoint-1]], |
| ?assertEqual(is_progress_possible(mk_cnts(T2)),true), |
| % gap |
| T3 = [[0,10],[12,EndPoint-1]], |
| ?assertEqual(is_progress_possible(mk_cnts(T3)),false), |
| % outside range |
| T4 = [[1,10],[11,20],[21,EndPoint-1]], |
| ?assertEqual(is_progress_possible(mk_cnts(T4)),false), |
| % outside range |
| T5 = [[0,10],[11,20],[21,EndPoint]], |
| ?assertEqual(is_progress_possible(mk_cnts(T5)),false). |
| |
| remove_overlapping_shards_test() -> |
| meck:new(rexi), |
| meck:expect(rexi, kill, fun(_, _) -> ok end), |
| EndPoint = 2 bsl 31, |
| T1 = [[0,10],[11,20],[21,EndPoint-1]], |
| Shards = mk_cnts(T1,3), |
| ?assertEqual(orddict:size( |
| remove_overlapping_shards(#shard{name=list_to_atom("node-3"), |
| node=list_to_atom("node-3"), |
| range=[11,20]}, |
| Shards)),7), |
| meck:unload(rexi). |
| |
| mk_cnts(Ranges) -> |
| Shards = lists:map(fun(Range) -> |
| #shard{range=Range} |
| end, |
| Ranges), |
| orddict:from_list([{Shard,nil} || Shard <- Shards]). |
| |
| mk_cnts(Ranges, NoNodes) -> |
| orddict:from_list([{Shard,nil} |
| || Shard <- |
| lists:flatten(lists:map( |
| fun(Range) -> |
| mk_shards(NoNodes,Range,[]) |
| end, Ranges))] |
| ). |
| |
| mk_shards(0,_Range,Shards) -> |
| Shards; |
| mk_shards(NoNodes,Range,Shards) -> |
| NodeName = list_to_atom("node-" ++ integer_to_list(NoNodes)), |
| mk_shards(NoNodes-1,Range, |
| [#shard{name=NodeName, node=NodeName, range=Range} | Shards]). |