| %% |
| %% Licensed under the Apache License, Version 2.0 (the "License"); |
| %% you may not use this file except in compliance with the License. |
| %% You may obtain a copy of the License at |
| %% |
| %% http://www.apache.org/licenses/LICENSE-2.0 |
| %% |
| %% Unless required by applicable law or agreed to in writing, software |
| %% distributed under the License is distributed on an "AS IS" BASIS, |
| %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| %% See the License for the specific language governing permissions and |
| %% limitations under the License. |
| |
| %% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- |
| |
| -module(nouveau_fabric_search). |
| |
| -export([go/4]). |
| |
| -include_lib("mem3/include/mem3.hrl"). |
| -include_lib("couch/include/couch_db.hrl"). |
| |
| -record(state, { |
| limit, |
| sort, |
| counters, |
| search_results |
| }). |
| |
| go(DbName, GroupId, IndexName, QueryArgs0) when is_binary(GroupId) -> |
| {ok, DDoc} = fabric:open_doc( |
| DbName, |
| <<"_design/", GroupId/binary>>, |
| [ejson_body] |
| ), |
| go(DbName, DDoc, IndexName, QueryArgs0); |
| go(DbName, #doc{} = DDoc, IndexName, QueryArgs0) -> |
| {ok, Index} = nouveau_util:design_doc_to_index(DbName, DDoc, IndexName), |
| Shards = get_shards(DbName, QueryArgs0), |
| {PackedBookmark, #{limit := Limit, sort := Sort} = QueryArgs1} = |
| maps:take(bookmark, QueryArgs0), |
| Bookmark = nouveau_bookmark:unpack(DbName, PackedBookmark), |
| Counters0 = lists:map( |
| fun(#shard{} = Shard) -> |
| After = maps:get(Shard#shard.range, Bookmark, null), |
| Ref = rexi:cast( |
| Shard#shard.node, |
| {nouveau_rpc, search, [Shard#shard.name, Index, QueryArgs1#{'after' => After}]} |
| ), |
| Shard#shard{ref = Ref} |
| end, |
| Shards |
| ), |
| Counters = fabric_dict:init(Counters0, nil), |
| Workers = fabric_dict:fetch_keys(Counters), |
| RexiMon = fabric_util:create_monitors(Workers), |
| State = #state{ |
| limit = Limit, |
| sort = Sort, |
| counters = Counters, |
| search_results = #{} |
| }, |
| try |
| rexi_utils:recv( |
| Workers, |
| #shard.ref, |
| fun handle_message/3, |
| State, |
| fabric_util:timeout("nouveau", "infinity"), |
| fabric_util:timeout("nouveau_permsg", "3600000") |
| ) |
| of |
| {ok, SearchResults} -> |
| NewBookmark = nouveau_bookmark:update(DbName, Bookmark, SearchResults), |
| {ok, simplify_hits(SearchResults#{bookmark => NewBookmark})}; |
| {error, Reason} -> |
| {error, Reason} |
| after |
| rexi_monitor:stop(RexiMon), |
| fabric_util:cleanup(Workers) |
| end. |
| |
| handle_message({ok, Response}, Shard, State) -> |
| case fabric_dict:lookup_element(Shard, State#state.counters) of |
| undefined -> |
| %% already heard from someone else in this range |
| {ok, State}; |
| nil -> |
| SearchResults = merge_search_results( |
| State#state.search_results, Shard, Response, State |
| ), |
| Counters1 = fabric_dict:store(Shard, ok, State#state.counters), |
| Counters2 = fabric_view:remove_overlapping_shards(Shard, Counters1), |
| State1 = State#state{ |
| counters = Counters2, search_results = SearchResults |
| }, |
| case fabric_dict:any(nil, Counters2) of |
| true -> |
| {ok, State1}; |
| false -> |
| {stop, SearchResults} |
| end |
| end; |
| handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, State) -> |
| #state{counters = Counters0} = State, |
| case fabric_util:remove_down_workers(Counters0, NodeRef, []) of |
| {ok, Counters1} -> |
| {ok, Counters1}; |
| error -> |
| {error, {nodedown, <<"progress not possible">>}} |
| end; |
| handle_message({error, Reason}, _Shard, _State) -> |
| {error, Reason}; |
| handle_message(Else, _Shard, _State) -> |
| {error, Else}. |
| |
| merge_search_results(Acc, #shard{} = Shard, Response, #state{} = State) -> |
| #{ |
| <<"index_versions">> => merge_index_version( |
| maps:get(<<"index_versions">>, Acc, #{}), |
| Shard, |
| maps:get(<<"index_version">>, Response) |
| ), |
| <<"total_hits">> => merge_total_hits( |
| maps:get(<<"total_hits">>, Acc, 0), maps:get(<<"total_hits">>, Response, 0) |
| ), |
| <<"total_hits_relation">> => merge_total_hits_relation( |
| maps:get(<<"total_hits_relation">>, Acc, null), |
| maps:get(<<"total_hits_relation">>, Response, null) |
| ), |
| <<"hits">> => merge_hits( |
| maps:get(<<"hits">>, Acc, []), |
| maps:get(<<"hits">>, Response, []), |
| State#state.sort, |
| State#state.limit |
| ), |
| <<"counts">> => merge_facets( |
| maps:get(<<"counts">>, Acc, null), |
| maps:get(<<"counts">>, Response, null), |
| State#state.limit |
| ), |
| <<"ranges">> => merge_facets( |
| maps:get(<<"ranges">>, Acc, null), |
| maps:get(<<"ranges">>, Response, null), |
| State#state.limit |
| ) |
| }. |
| |
| merge_index_version(Acc, #shard{} = Shard, IndexVersion) -> |
| Range = ?l2b(io_lib:format("~16.16b-~.16b", Shard#shard.range)), |
| Acc#{Range => IndexVersion}. |
| |
| merge_total_hits(TotalHitsA, TotalHitsB) -> |
| TotalHitsA + TotalHitsB. |
| |
| merge_total_hits_relation(A, B) when |
| A == <<"GREATER_THAN_OR_EQUAL_TO">>; B == <<"GREATER_THAN_OR_EQUAL_TO">> |
| -> |
| <<"GREATER_THAN_OR_EQUAL_TO">>; |
| merge_total_hits_relation(A, B) when A == <<"EQUAL_TO">>; B == <<"EQUAL_TO">> -> |
| <<"EQUAL_TO">>; |
| merge_total_hits_relation(null, null) -> |
| %% not supported in selected Lucene version. |
| null. |
| |
| merge_hits(HitsA, HitsB, Sort, Limit) -> |
| MergedHits = lists:merge(merge_fun(Sort), HitsA, HitsB), |
| lists:sublist(MergedHits, Limit). |
| |
| simplify_hits(SearchResults) -> |
| #{<<"hits">> := Hits} = SearchResults, |
| SearchResults#{<<"hits">> => lists:map(fun simplify_hit/1, Hits)}. |
| |
| simplify_hit(#{} = Hit) -> |
| #{<<"fields">> := Fields} = Hit, |
| Hit#{<<"fields">> => simplify_fields(Fields)}. |
| |
| simplify_fields(Fields) when is_list(Fields) -> |
| Fun = fun(Field, Acc) -> |
| {Key, Value} = simplify_field(Field), |
| Acc#{Key => Value} |
| end, |
| lists:foldl(Fun, #{}, Fields). |
| |
| simplify_field(#{<<"@type">> := <<"stored">>} = Field) -> |
| #{<<"name">> := Key, <<"value">> := Value} = Field, |
| {Key, Value}. |
| |
| merge_fun(Sort) -> |
| fun(HitA, HitB) -> |
| OrderA = maps:get(<<"order">>, HitA), |
| OrderB = maps:get(<<"order">>, HitB), |
| compare_order(Sort, OrderA, OrderB) |
| end. |
| |
| %% no sort order specified |
| compare_order(null, [A | ARest], [B | BRest]) -> |
| case couch_ejson_compare:less(convert_item(A), convert_item(B)) of |
| 0 -> |
| compare_order(null, ARest, BRest); |
| Less -> |
| Less < 1 |
| end; |
| %% server-side adds _id on the end of sort order if not present |
| compare_order([], [A], [B]) -> |
| couch_ejson_compare:less(convert_item(A), convert_item(B)) < 1; |
| %% reverse order specified |
| compare_order([<<"-", _/binary>> | SortRest], [A | ARest], [B | BRest]) -> |
| case couch_ejson_compare:less(convert_item(B), convert_item(A)) of |
| 0 -> |
| compare_order(SortRest, ARest, BRest); |
| Less -> |
| Less < 1 |
| end; |
| %% forward order specified |
| compare_order([_ | SortRest], [A | ARest], [B | BRest]) -> |
| case couch_ejson_compare:less(convert_item(A), convert_item(B)) of |
| 0 -> |
| compare_order(SortRest, ARest, BRest); |
| Less -> |
| Less < 1 |
| end. |
| |
| convert_item(Item) -> |
| case maps:get(<<"@type">>, Item) of |
| <<"bytes">> -> |
| base64:decode(maps:get(<<"value">>, Item)); |
| _ -> |
| maps:get(<<"value">>, Item) |
| end. |
| |
| merge_facets(FacetsA, null, _Limit) -> |
| FacetsA; |
| merge_facets(null, FacetsB, _Limit) -> |
| FacetsB; |
| merge_facets(FacetsA, FacetsB, _Limit) -> |
| Combiner = fun(_, V1, V2) -> maps:merge_with(fun(_, V3, V4) -> V3 + V4 end, V1, V2) end, |
| maps:merge_with(Combiner, FacetsA, FacetsB). |
| |
| get_shards(DbName, #{partition := Partition}) when is_binary(Partition) -> |
| PartitionId = couch_partition:shard_key(Partition), |
| mem3:shards(DbName, PartitionId); |
| get_shards(DbName, _QueryArgs) -> |
| mem3:shards(DbName). |