blob: 2167486788951594d6a3f1a0a9fd2dcaca4a2793 [file] [log] [blame]
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-module(nouveau_fabric_search).
-export([go/4]).
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
%% Accumulator threaded through handle_message/3 while shard responses
%% are being collected by go/4.
-record(state, {
%% `limit` taken from the query args; caps the merged hit list
limit,
%% `sort` taken from the query args; drives the ordering of merged hits
sort,
%% fabric_dict keyed by shard worker; initialised to nil for every worker
%% and set to ok once that worker's response has been merged
counters,
%% map of merged results accumulated so far (starts out as an empty map)
search_results
}).
%% Runs a nouveau search for IndexName across the shards of DbName and
%% merges the per-shard responses into a single result map.
%%
%% The second argument is either the design document name as a binary
%% (without the "_design/" prefix), in which case the design document is
%% opened first, or an already opened #doc{}.
%%
%% QueryArgs0 is a map that must contain the keys `bookmark`, `limit` and
%% `sort`; a binary `partition` key restricts the search to the shards of
%% that partition.
%%
%% Returns {ok, Results}, where Results is the merged result map with
%% simplified hits and an updated `bookmark`, or {error, Reason}. When the
%% wait for shard responses times out, {error, timeout} is returned.
go(DbName, GroupId, IndexName, QueryArgs0) when is_binary(GroupId) ->
    {ok, DDoc} = fabric:open_doc(
        DbName,
        <<"_design/", GroupId/binary>>,
        [ejson_body]
    ),
    go(DbName, DDoc, IndexName, QueryArgs0);
go(DbName, #doc{} = DDoc, IndexName, QueryArgs0) ->
    {ok, Index} = nouveau_util:design_doc_to_index(DbName, DDoc, IndexName),
    Shards = get_shards(DbName, QueryArgs0),
    %% The packed bookmark is removed from the args sent to the workers;
    %% each worker receives its own 'after' value instead.
    {PackedBookmark, #{limit := Limit, sort := Sort} = QueryArgs1} =
        maps:take(bookmark, QueryArgs0),
    Bookmark = nouveau_bookmark:unpack(DbName, PackedBookmark),
    %% Start one search worker per shard and remember its rexi reference.
    Counters0 = lists:map(
        fun(#shard{} = Shard) ->
            After = maps:get(Shard#shard.range, Bookmark, null),
            Ref = rexi:cast(
                Shard#shard.node,
                {nouveau_rpc, search, [Shard#shard.name, Index, QueryArgs1#{'after' => After}]}
            ),
            Shard#shard{ref = Ref}
        end,
        Shards
    ),
    Counters = fabric_dict:init(Counters0, nil),
    Workers = fabric_dict:fetch_keys(Counters),
    RexiMon = fabric_util:create_monitors(Workers),
    State = #state{
        limit = Limit,
        sort = Sort,
        counters = Counters,
        search_results = #{}
    },
    try
        rexi_utils:recv(
            Workers,
            #shard.ref,
            fun handle_message/3,
            State,
            fabric_util:timeout("nouveau", "infinity"),
            fabric_util:timeout("nouveau_permsg", "3600000")
        )
    of
        {ok, SearchResults} ->
            NewBookmark = nouveau_bookmark:update(DbName, Bookmark, SearchResults),
            {ok, simplify_hits(SearchResults#{bookmark => NewBookmark})};
        {timeout, _State} ->
            %% rexi_utils:recv/6 reports an expired global or per-message
            %% timeout as {timeout, Acc}; surface it as an error tuple
            %% instead of crashing with try_clause.
            {error, timeout};
        {error, Reason} ->
            {error, Reason}
    after
        %% Always release the monitor and stop any workers still running.
        rexi_monitor:stop(RexiMon),
        fabric_util:cleanup(Workers)
    end.
%% Callback passed to rexi_utils:recv/6 by go/4; called once per message
%% received from a shard worker.
%%
%% Returns {ok, State} to keep waiting, {stop, SearchResults} once no
%% worker is still pending, or {error, Reason} to abort the search.
handle_message({ok, Response}, Shard, State) ->
    case fabric_dict:lookup_element(Shard, State#state.counters) of
        undefined ->
            %% already heard from someone else in this range
            {ok, State};
        nil ->
            SearchResults = merge_search_results(
                State#state.search_results, Shard, Response, State
            ),
            %% Mark this worker as done and drop the workers that cover
            %% the same range, so their responses are ignored above.
            Counters1 = fabric_dict:store(Shard, ok, State#state.counters),
            Counters2 = fabric_view:remove_overlapping_shards(Shard, Counters1),
            State1 = State#state{
                counters = Counters2, search_results = SearchResults
            },
            case fabric_dict:any(nil, Counters2) of
                true ->
                    {ok, State1};
                false ->
                    {stop, SearchResults}
            end
    end;
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Shard, State) ->
    #state{counters = Counters0} = State,
    case fabric_util:remove_down_workers(Counters0, NodeRef, []) of
        {ok, Counters1} ->
            %% Keep the accumulator a #state{} record: returning the bare
            %% counters here would discard limit, sort and the results
            %% merged so far, and break the next State#state.counters access.
            {ok, State#state{counters = Counters1}};
        error ->
            {error, {nodedown, <<"progress not possible">>}}
    end;
handle_message({error, Reason}, _Shard, _State) ->
    {error, Reason};
handle_message(Else, _Shard, _State) ->
    %% Any other message is reported to the caller as the error reason.
    {error, Else}.
%% Folds one shard Response into the accumulated result map Acc and returns
%% the new accumulator. Keys missing from Acc or Response fall back to a
%% neutral default, except <<"index_version">>, which Response must contain.
merge_search_results(Acc, #shard{} = Shard, Response, #state{} = State) ->
    #state{sort = Sort, limit = Limit} = State,
    Prev = fun(Key, Default) -> maps:get(Key, Acc, Default) end,
    Next = fun(Key, Default) -> maps:get(Key, Response, Default) end,
    IndexVersions = merge_index_version(
        Prev(<<"index_versions">>, #{}),
        Shard,
        maps:get(<<"index_version">>, Response)
    ),
    TotalHits = merge_total_hits(
        Prev(<<"total_hits">>, 0), Next(<<"total_hits">>, 0)
    ),
    Relation = merge_total_hits_relation(
        Prev(<<"total_hits_relation">>, null),
        Next(<<"total_hits_relation">>, null)
    ),
    Hits = merge_hits(Prev(<<"hits">>, []), Next(<<"hits">>, []), Sort, Limit),
    Counts = merge_facets(
        Prev(<<"counts">>, null), Next(<<"counts">>, null), Limit
    ),
    Ranges = merge_facets(
        Prev(<<"ranges">>, null), Next(<<"ranges">>, null), Limit
    ),
    #{
        <<"index_versions">> => IndexVersions,
        <<"total_hits">> => TotalHits,
        <<"total_hits_relation">> => Relation,
        <<"hits">> => Hits,
        <<"counts">> => Counts,
        <<"ranges">> => Ranges
    }.
%% Records IndexVersion in Acc under a key derived from the shard's range,
%% formatted as two hexadecimal numbers joined by "-".
%% NOTE(review): the two halves of the format string use different width
%% specifiers ("~16.16b" vs "~.16b") - confirm this is the intended key shape.
merge_index_version(Acc, #shard{range = ShardRange}, IndexVersion) ->
    RangeKey = ?l2b(io_lib:format("~16.16b-~.16b", ShardRange)),
    maps:put(RangeKey, IndexVersion, Acc).
%% Adds the hit count reported by one shard to the running total.
merge_total_hits(RunningTotal, ShardTotal) ->
    RunningTotal + ShardTotal.
%% Combines two total-hits relations. "GREATER_THAN_OR_EQUAL_TO" on either
%% side wins; otherwise "EQUAL_TO" on either side wins; two nulls stay null.
%% Any other combination fails with function_clause.
merge_total_hits_relation(<<"GREATER_THAN_OR_EQUAL_TO">>, _) ->
    <<"GREATER_THAN_OR_EQUAL_TO">>;
merge_total_hits_relation(_, <<"GREATER_THAN_OR_EQUAL_TO">>) ->
    <<"GREATER_THAN_OR_EQUAL_TO">>;
merge_total_hits_relation(<<"EQUAL_TO">>, _) ->
    <<"EQUAL_TO">>;
merge_total_hits_relation(_, <<"EQUAL_TO">>) ->
    <<"EQUAL_TO">>;
merge_total_hits_relation(null, null) ->
    %% not supported in selected Lucene version.
    null.
%% Merges two hit lists using the ordering derived from Sort and keeps at
%% most Limit hits from the front of the merged list.
merge_hits(HitsA, HitsB, Sort, Limit) ->
    lists:sublist(lists:merge(merge_fun(Sort), HitsA, HitsB), Limit).
%% Replaces every entry of the <<"hits">> list with its simplified form.
simplify_hits(SearchResults) ->
    #{<<"hits">> := RawHits} = SearchResults,
    SimpleHits = lists:map(fun simplify_hit/1, RawHits),
    SearchResults#{<<"hits">> := SimpleHits}.
%% Rewrites a hit's <<"fields">> list into a name => value map.
simplify_hit(#{} = Hit) ->
    #{<<"fields">> := FieldList} = Hit,
    FieldMap = simplify_fields(FieldList),
    Hit#{<<"fields">> := FieldMap}.
%% Turns a list of stored-field maps into a single name => value map.
%% When a name occurs more than once, the last occurrence wins.
simplify_fields(Fields) when is_list(Fields) ->
    maps:from_list(lists:map(fun simplify_field/1, Fields)).
%% Extracts the {Name, Value} pair from a field map whose <<"@type">> is
%% <<"stored">>; other field types are not accepted.
simplify_field(#{<<"@type">> := <<"stored">>} = StoredField) ->
    #{<<"name">> := Name} = StoredField,
    #{<<"value">> := Value} = StoredField,
    {Name, Value}.
%% Builds the two-argument ordering fun used to merge hit lists: it compares
%% the <<"order">> values of two hits according to Sort.
merge_fun(Sort) ->
    OrderOf = fun(Hit) -> maps:get(<<"order">>, Hit) end,
    fun(HitA, HitB) ->
        compare_order(Sort, OrderOf(HitA), OrderOf(HitB))
    end.
%% Compares the order lists of two hits element by element, following the
%% sort specification. Returns true when the first hit should come first.
%% A comparison result of 0 moves on to the next element; otherwise the
%% answer is whether the result is below 1.

%% no sort order specified
compare_order(null, [A | ARest], [B | BRest]) ->
    decide_order(compare_items(A, B), null, ARest, BRest);
%% server-side adds _id on the end of sort order if not present
compare_order([], [A], [B]) ->
    compare_items(A, B) < 1;
%% reverse order specified
compare_order([<<"-", _/binary>> | SortRest], [A | ARest], [B | BRest]) ->
    decide_order(compare_items(B, A), SortRest, ARest, BRest);
%% forward order specified
compare_order([_ | SortRest], [A | ARest], [B | BRest]) ->
    decide_order(compare_items(A, B), SortRest, ARest, BRest).

%% Compares two order items after converting them to comparable values.
compare_items(First, Second) ->
    couch_ejson_compare:less(convert_item(First), convert_item(Second)).

%% Acts on one comparison result: a tie defers to the remaining elements,
%% anything else settles the ordering.
decide_order(0, SortRest, ARest, BRest) ->
    compare_order(SortRest, ARest, BRest);
decide_order(Cmp, _SortRest, _ARest, _BRest) ->
    Cmp < 1.
%% Returns the comparable value of an order item: the base64-decoded
%% <<"value">> when <<"@type">> is <<"bytes">>, the raw <<"value">> otherwise.
convert_item(Item) ->
    Type = maps:get(<<"@type">>, Item),
    Value = maps:get(<<"value">>, Item),
    if
        Type =:= <<"bytes">> -> base64:decode(Value);
        true -> Value
    end.
%% Merges two facet maps of the shape #{Dimension => #{Label => Count}} by
%% summing the counts of labels present on both sides. A null side yields
%% the other side unchanged. The third argument is currently unused.
merge_facets(FacetsA, null, _Limit) ->
    FacetsA;
merge_facets(null, FacetsB, _Limit) ->
    FacetsB;
merge_facets(FacetsA, FacetsB, _Limit) ->
    SumCounts = fun(_Label, CountA, CountB) -> CountA + CountB end,
    MergeLabels = fun(_Dimension, LabelsA, LabelsB) ->
        maps:merge_with(SumCounts, LabelsA, LabelsB)
    end,
    maps:merge_with(MergeLabels, FacetsA, FacetsB).
%% Selects the shards to query: when the query args carry a binary
%% `partition`, only the shards for that partition's shard key; otherwise
%% all shards of the database.
get_shards(DbName, QueryArgs) ->
    case QueryArgs of
        #{partition := Partition} when is_binary(Partition) ->
            mem3:shards(DbName, couch_partition:shard_key(Partition));
        _ ->
            mem3:shards(DbName)
    end.