| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| %% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- |
| |
| -module(dreyfus_fabric_search). |
| |
| -include("dreyfus.hrl"). |
| -include_lib("mem3/include/mem3.hrl"). |
| -include_lib("couch/include/couch_db.hrl"). |
| |
| -export([go/4]). |
| |
| -record(state, { |
| limit, |
| sort, |
| top_docs, |
| counters, |
| start_args, |
| replacements, |
| ring_opts, |
| await_time = 0 |
| }). |
| |
| go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) -> |
| {ok, DDoc} = fabric:open_doc( |
| DbName, |
| <<"_design/", GroupId/binary>>, |
| [ejson_body] |
| ), |
| dreyfus_util:maybe_deny_index(DbName, GroupId, IndexName), |
| go(DbName, DDoc, IndexName, QueryArgs); |
| go(DbName, DDoc, IndexName, #index_query_args{bookmark = nil} = QueryArgs) -> |
| DesignName = dreyfus_util:get_design_docid(DDoc), |
| dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName), |
| Shards = dreyfus_util:get_shards(DbName, QueryArgs), |
| RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards), |
| Workers = fabric_util:submit_jobs( |
| Shards, |
| dreyfus_rpc, |
| search, |
| [DDoc, IndexName, dreyfus_util:export(QueryArgs)] |
| ), |
| Counters = fabric_dict:init(Workers, nil), |
| go(DbName, DDoc, IndexName, QueryArgs, Counters, Counters, RingOpts); |
| go(DbName, DDoc, IndexName, #index_query_args{} = QueryArgs) -> |
| Bookmark0 = |
| try |
| dreyfus_bookmark:unpack(DbName, QueryArgs) |
| catch |
| _:_ -> |
| throw({bad_request, "Invalid bookmark parameter supplied"}) |
| end, |
| Shards = dreyfus_util:get_shards(DbName, QueryArgs), |
| LiveNodes = [node() | nodes()], |
| LiveShards = [S || #shard{node = Node} = S <- Shards, lists:member(Node, LiveNodes)], |
| Bookmark1 = dreyfus_bookmark:add_missing_shards(Bookmark0, LiveShards), |
| Counters0 = lists:flatmap( |
| fun({#shard{name = Name, node = N} = Shard, After}) -> |
| QueryArgs1 = dreyfus_util:export(QueryArgs#index_query_args{ |
| bookmark = After |
| }), |
| case lists:member(Shard, LiveShards) of |
| true -> |
| Ref = rexi:cast(N, {dreyfus_rpc, search, [Name, DDoc, IndexName, QueryArgs1]}), |
| [Shard#shard{ref = Ref}]; |
| false -> |
| lists:map( |
| fun(#shard{name = Name2, node = N2} = NewShard) -> |
| Ref = rexi:cast( |
| N2, {dreyfus_rpc, search, [Name2, DDoc, IndexName, QueryArgs1]} |
| ), |
| NewShard#shard{ref = Ref} |
| end, |
| find_replacement_shards(Shard, LiveShards) |
| ) |
| end |
| end, |
| Bookmark1 |
| ), |
| Counters = fabric_dict:init(Counters0, nil), |
| WorkerShards = fabric_dict:fetch_keys(Counters), |
| RingOpts = dreyfus_util:get_ring_opts(QueryArgs, WorkerShards), |
| QueryArgs2 = QueryArgs#index_query_args{ |
| bookmark = Bookmark1 |
| }, |
| go(DbName, DDoc, IndexName, QueryArgs2, Counters, Bookmark1, RingOpts); |
| go(DbName, DDoc, IndexName, OldArgs) -> |
| go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)). |
| |
| go(DbName, DDoc, IndexName, QueryArgs, Counters, Bookmark, RingOpts) -> |
| {Workers, _} = lists:unzip(Counters), |
| #index_query_args{ |
| limit = Limit, |
| sort = Sort, |
| raw_bookmark = RawBookmark |
| } = QueryArgs, |
| Replacements = fabric_view:get_shard_replacements(DbName, Workers), |
| State = #state{ |
| limit = Limit, |
| sort = Sort, |
| top_docs = #top_docs{total_hits = 0, hits = []}, |
| counters = Counters, |
| start_args = [DDoc, IndexName, QueryArgs], |
| replacements = Replacements, |
| ring_opts = RingOpts |
| }, |
| RexiMon = fabric_util:create_monitors(Workers), |
| try |
| rexi_utils:recv( |
| Workers, |
| #shard.ref, |
| fun handle_message/3, |
| State, |
| infinity, |
| 1000 * 60 * 60 |
| ) |
| of |
| {ok, Result} -> |
| #state{top_docs = TopDocs, await_time = AwaitTime} = Result, |
| #top_docs{ |
| total_hits = TotalHits, |
| hits = Hits, |
| counts = Counts, |
| ranges = Ranges |
| } = TopDocs, |
| case RawBookmark of |
| true -> |
| {ok, Bookmark, TotalHits, Hits, Counts, Ranges, AwaitTime}; |
| false -> |
| Bookmark1 = dreyfus_bookmark:update(Sort, Bookmark, Hits), |
| Hits1 = remove_sortable(Hits), |
| {ok, Bookmark1, TotalHits, Hits1, Counts, Ranges, AwaitTime} |
| end; |
| {error, Reason} -> |
| {error, Reason} |
| after |
| rexi_monitor:stop(RexiMon), |
| fabric_util:cleanup(Workers) |
| end. |
| |
| handle_message({ok, #top_docs{} = NewTopDocs}, Shard, State0) -> |
| State = upgrade_state(State0), |
| #state{top_docs = TopDocs, limit = Limit, sort = Sort} = State, |
| case fabric_dict:lookup_element(Shard, State#state.counters) of |
| undefined -> |
| %% already heard from someone else in this range |
| {ok, State}; |
| nil -> |
| C1 = fabric_dict:store(Shard, ok, State#state.counters), |
| C2 = fabric_view:remove_overlapping_shards(Shard, C1), |
| Sortable = make_sortable(Shard, NewTopDocs), |
| MergedTopDocs = merge_top_docs(TopDocs, Sortable, Limit, Sort), |
| State1 = State#state{ |
| counters = C2, |
| top_docs = MergedTopDocs |
| }, |
| case fabric_dict:any(nil, C2) of |
| true -> |
| {ok, State1}; |
| false -> |
| {stop, State1} |
| end |
| end; |
| % upgrade clause |
| handle_message({ok, {top_docs, UpdateSeq, TotalHits, Hits}}, Shard, State) -> |
| TopDocs = #top_docs{ |
| update_seq = UpdateSeq, |
| total_hits = TotalHits, |
| hits = Hits |
| }, |
| handle_message({ok, TopDocs}, Shard, State); |
| handle_message({await_time, Time}, Shard, State) -> |
| case fabric_dict:lookup_element(Shard, State#state.counters) of |
| undefined -> |
| %% already heard from someone else in this range |
| {ok, State}; |
| nil -> |
| State1 = State#state{ |
| await_time = max(Time, State#state.await_time) |
| }, |
| {ok, State1} |
| end; |
| handle_message(Error, Worker, State0) -> |
| State = upgrade_state(State0), |
| case |
| dreyfus_fabric:handle_error_message( |
| Error, |
| Worker, |
| State#state.counters, |
| State#state.replacements, |
| search, |
| State#state.start_args, |
| State#state.ring_opts |
| ) |
| of |
| {ok, Counters} -> |
| {ok, State#state{counters = Counters}}; |
| {new_refs, NewRefs, NewCounters, NewReplacements} -> |
| NewState = State#state{ |
| counters = NewCounters, |
| replacements = NewReplacements |
| }, |
| {new_refs, NewRefs, NewState}; |
| Else -> |
| Else |
| end. |
| |
| find_replacement_shards(#shard{range = Range}, AllShards) -> |
| [Shard || Shard <- AllShards, Shard#shard.range =:= Range]. |
| |
| make_sortable(Shard, #top_docs{} = TopDocs) -> |
| Hits = make_sortable(Shard, TopDocs#top_docs.hits), |
| TopDocs#top_docs{hits = Hits}; |
| make_sortable(Shard, List) when is_list(List) -> |
| make_sortable(Shard, List, []). |
| |
| make_sortable(_, [], Acc) -> |
| lists:reverse(Acc); |
| make_sortable(Shard, [#hit{} = Hit | Rest], Acc) -> |
| make_sortable(Shard, Rest, [#sortable{item = Hit, order = Hit#hit.order, shard = Shard} | Acc]). |
| |
| remove_sortable(List) -> |
| remove_sortable(List, []). |
| |
| remove_sortable([], Acc) -> |
| lists:reverse(Acc); |
| remove_sortable([#sortable{item = Item} | Rest], Acc) -> |
| remove_sortable(Rest, [Item | Acc]). |
| |
| merge_top_docs(#top_docs{} = TopDocsA, #top_docs{} = TopDocsB, Limit, Sort) -> |
| MergedTotal = sum_element(#top_docs.total_hits, TopDocsA, TopDocsB), |
| MergedHits = lists:sublist( |
| dreyfus_util:sort( |
| Sort, |
| TopDocsA#top_docs.hits ++ TopDocsB#top_docs.hits |
| ), |
| Limit |
| ), |
| MergedCounts = merge_facets(TopDocsA#top_docs.counts, TopDocsB#top_docs.counts), |
| MergedRanges = merge_facets(TopDocsA#top_docs.ranges, TopDocsB#top_docs.ranges), |
| #top_docs{ |
| total_hits = MergedTotal, |
| hits = MergedHits, |
| counts = MergedCounts, |
| ranges = MergedRanges |
| }. |
| |
| merge_facets(undefined, undefined) -> |
| undefined; |
| merge_facets(undefined, Facets) -> |
| sort_facets(Facets); |
| merge_facets(Facets, undefined) -> |
| sort_facets(Facets); |
| merge_facets(FacetsA, FacetsB) -> |
| merge_facets_int(sort_facets(FacetsA), sort_facets(FacetsB)). |
| |
| merge_facets_int([], []) -> |
| []; |
| merge_facets_int(FacetsA, []) -> |
| FacetsA; |
| merge_facets_int([], FacetsB) -> |
| FacetsB; |
| merge_facets_int([{KA, _, _} = A | RA], [{KB, _, _} | _] = FB) when KA < KB -> |
| [A | merge_facets_int(RA, FB)]; |
| merge_facets_int([{KA, VA, CA} | RA], [{KB, VB, CB} | RB]) when KA =:= KB -> |
| [{KA, VA + VB, merge_facets_int(CA, CB)} | merge_facets_int(RA, RB)]; |
| merge_facets_int([{KA, _, _} | _] = FA, [{KB, _, _} = B | RB]) when KA > KB -> |
| [B | merge_facets_int(FA, RB)]. |
| |
| sort_facets([]) -> |
| []; |
| sort_facets(Facets) -> |
| lists:sort( |
| lists:map( |
| fun({K, V, C}) -> {K, V, sort_facets(C)} end, |
| Facets |
| ) |
| ). |
| |
| sum_element(N, T1, T2) -> |
| element(N, T1) + element(N, T2). |
| |
| upgrade_state({state, Limit, Sort, TopDocs, Counters}) -> |
| #state{ |
| limit = Limit, |
| sort = Sort, |
| top_docs = TopDocs, |
| counters = Counters, |
| replacements = [] |
| }; |
| upgrade_state(#state{} = State) -> |
| State. |
| |
| -ifdef(TEST). |
| -include_lib("eunit/include/eunit.hrl"). |
| |
| merge_facets_test() -> |
| % empty list is a no-op |
| ?assertEqual([{foo, 1.0, []}], merge_facets([{foo, 1.0, []}], [])), |
| |
| % one level, one key |
| ?assertEqual( |
| [{foo, 3.0, []}], |
| merge_facets( |
| [{foo, 1.0, []}], |
| [{foo, 2.0, []}] |
| ) |
| ), |
| |
| % one level, two keys |
| ?assertEqual( |
| [{bar, 6.0, []}, {foo, 9.0, []}], |
| merge_facets( |
| [{foo, 1.0, []}, {bar, 2.0, []}], |
| [{bar, 4.0, []}, {foo, 8.0, []}] |
| ) |
| ), |
| |
| % multi level, multi keys |
| ?assertEqual( |
| [{foo, 2.0, [{bar, 2.0, []}]}], |
| merge_facets( |
| [{foo, 1.0, [{bar, 1.0, []}]}], |
| [{foo, 1.0, [{bar, 1.0, []}]}] |
| ) |
| ), |
| |
| ?assertEqual( |
| [{foo, 5.0, [{bar, 7.0, [{bar, 1.0, []}, {baz, 3.0, []}, {foo, 6.5, []}]}]}], |
| merge_facets( |
| [{foo, 1.0, [{bar, 2.0, [{baz, 3.0, []}, {foo, 0.5, []}]}]}], |
| [{foo, 4.0, [{bar, 5.0, [{foo, 6.0, []}, {bar, 1.0, []}]}]}] |
| ) |
| ). |
| |
| -endif. |