blob: 2bf560f37193d36eb1b97fbfdbe96f9cbf58401b [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
%% A dreyfus_index gen_server is linked to its clouseau twin.
-module(dreyfus_index).
-behaviour(gen_server).
-vsn(1).
-include_lib("couch/include/couch_db.hrl").
-include("dreyfus.hrl").
% public api.
-export([start_link/2, design_doc_to_index/2, await/2, search/2, info/1,
group1/2, group2/2,
design_doc_to_indexes/1]).
% gen_server api.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
% private definitions.
-record(state, {
dbname,
index,
updater_pid=nil,
index_pid=nil,
waiting_list=[]
}).
% exported for callback.
-export([search_int/2, group1_int/2, group2_int/2, info_int/1]).
% public functions.
start_link(DbName, Index) ->
proc_lib:start_link(?MODULE, init, [{DbName, Index}]).
await(Pid, MinSeq) ->
MFA = {gen_server, call, [Pid, {await, MinSeq}, infinity]},
dreyfus_util:time([index, await], MFA).
search(Pid0, QueryArgs) ->
Pid = to_index_pid(Pid0),
MFA = {?MODULE, search_int, [Pid, QueryArgs]},
dreyfus_util:time([index, search], MFA).
group1(Pid0, QueryArgs) ->
Pid = to_index_pid(Pid0),
MFA = {?MODULE, group1_int, [Pid, QueryArgs]},
dreyfus_util:time([index, group1], MFA).
group2(Pid0, QueryArgs) ->
Pid = to_index_pid(Pid0),
MFA = {?MODULE, group2_int, [Pid, QueryArgs]},
dreyfus_util:time([index, group2], MFA).
info(Pid0) ->
Pid = to_index_pid(Pid0),
MFA = {?MODULE, info_int, [Pid]},
dreyfus_util:time([index, info], MFA).
%% We either have a dreyfus_index gen_server pid or the remote
%% clouseau pid.
to_index_pid(Pid) ->
case node(Pid) == node() of
true -> gen_server:call(Pid, get_index_pid, infinity);
false -> Pid
end.
design_doc_to_indexes(#doc{body={Fields}}=Doc) ->
RawIndexes = couch_util:get_value(<<"indexes">>, Fields, {[]}),
case RawIndexes of
{IndexList} when is_list(IndexList) ->
{IndexNames, _} = lists:unzip(IndexList),
lists:flatmap(
fun(IndexName) ->
case (catch design_doc_to_index(Doc, IndexName)) of
{ok, #index{}=Index} -> [Index];
_ -> []
end
end,
IndexNames);
_ -> []
end.
% gen_server functions.
init({DbName, Index}) ->
process_flag(trap_exit, true),
case open_index(DbName, Index) of
{ok, Pid, Seq} ->
State=#state{
dbname=DbName,
index=Index#index{current_seq=Seq, dbname=DbName},
index_pid=Pid
},
case couch_db:open_int(DbName, []) of
{ok, Db} ->
try couch_db:monitor(Db) after couch_db:close(Db) end,
dreyfus_util:maybe_create_local_purge_doc(Db, Pid, Index),
proc_lib:init_ack({ok, self()}),
gen_server:enter_loop(?MODULE, [], State);
Error ->
proc_lib:init_ack(Error)
end;
Error ->
proc_lib:init_ack(Error)
end.
handle_call({await, RequestSeq}, From,
#state{
index=#index{dbname=DbName,name=IdxName,ddoc_id=DDocId,current_seq=Seq}=Index,
index_pid=IndexPid,
updater_pid=nil,
waiting_list=WaitList
}=State) when RequestSeq > Seq ->
DbName2 = mem3:dbname(DbName),
<<"_design/", GroupId/binary>> = DDocId,
NewState = case dreyfus_util:in_black_list(DbName2, GroupId, IdxName) of
false ->
UpPid = spawn_link(fun() ->
dreyfus_index_updater:update(IndexPid,Index)
end),
State#state{
updater_pid=UpPid,
waiting_list=[{From,RequestSeq}|WaitList]
};
_ ->
couch_log:notice("Index Blocked from Updating - db: ~p,"
" ddocid: ~p name: ~p", [DbName, DDocId, IdxName]),
State
end,
{noreply, NewState};
handle_call({await, RequestSeq}, _From,
#state{index=#index{current_seq=Seq}}=State) when RequestSeq =< Seq ->
{reply, {ok, State#state.index_pid, Seq}, State};
handle_call({await, RequestSeq}, From, #state{waiting_list=WaitList}=State) ->
{noreply, State#state{
waiting_list=[{From,RequestSeq}|WaitList]
}};
handle_call(get_index_pid, _From, State) -> % upgrade
{reply, State#state.index_pid, State};
handle_call({search, QueryArgs0}, _From, State) -> % obsolete
Reply = search_int(State#state.index_pid, QueryArgs0),
{reply, Reply, State};
handle_call({group1, QueryArgs0}, _From, State) -> % obsolete
Reply = group1_int(State#state.index_pid, QueryArgs0),
{reply, Reply, State};
handle_call({group2, QueryArgs0}, _From, State) -> % obsolete
Reply = group2_int(State#state.index_pid, QueryArgs0),
{reply, Reply, State};
handle_call(info, _From, State) -> % obsolete
Reply = info_int(State#state.index_pid),
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'EXIT', FromPid, {updated, NewSeq}},
#state{
index=#index{dbname=DbName,name=IdxName,ddoc_id=DDocId}=Index0,
index_pid=IndexPid,
updater_pid=UpPid,
waiting_list=WaitList
}=State) when UpPid == FromPid ->
Index = Index0#index{current_seq=NewSeq},
case reply_with_index(IndexPid, Index, WaitList) of
[] ->
{noreply, State#state{index=Index,
updater_pid=nil,
waiting_list=[]
}};
StillWaiting ->
DbName2 = mem3:dbname(DbName),
<<"_design/", GroupId/binary>> = DDocId,
Pid = case dreyfus_util:in_black_list(DbName2, GroupId, IdxName) of
true ->
couch_log:notice("Index Blocked from Updating - db: ~p, ddocid: ~p"
" name: ~p", [DbName, GroupId, IdxName]),
nil;
false ->
spawn_link(fun() ->
dreyfus_index_updater:update(IndexPid, Index)
end)
end,
{noreply, State#state{index=Index,
updater_pid=Pid,
waiting_list=StillWaiting
}}
end;
handle_info({'EXIT', _, {updated, _}}, State) ->
{noreply, State};
handle_info({'EXIT', FromPid, Reason}, #state{
index=Index,
index_pid=IndexPid,
waiting_list=WaitList
}=State) when FromPid == IndexPid ->
couch_log:notice(
"index for ~p closed with reason ~p", [index_name(Index), Reason]),
[gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList],
{stop, normal, State};
handle_info({'EXIT', FromPid, Reason}, #state{
index=Index,
updater_pid=UpPid,
waiting_list=WaitList
}=State) when FromPid == UpPid ->
couch_log:info("Shutting down index server ~p, updater ~p closing w/ reason ~w",
[index_name(Index), UpPid, Reason]),
[gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList],
{stop, normal, State};
handle_info({'EXIT', Pid, Reason}, State) ->
% probably dreyfus_index_manager.
couch_log:notice("Unknown pid ~p closed with reason ~p", [Pid, Reason]),
{stop, normal, State};
handle_info({'DOWN',_,_,Pid,Reason}, #state{
index=Index,
waiting_list=WaitList
}=State) ->
couch_log:info("Shutting down index server ~p, db ~p closing w/ reason ~w",
[index_name(Index), Pid, Reason]),
[gen_server:reply(P, {error, Reason}) || {P, _} <- WaitList],
{stop, normal, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
% private functions.
open_index(DbName, #index{analyzer=Analyzer, sig=Sig}) ->
Path = <<DbName/binary,"/",Sig/binary>>,
case clouseau_rpc:open_index(self(), Path, Analyzer) of
{ok, Pid} ->
case clouseau_rpc:get_update_seq(Pid) of
{ok, Seq} ->
{ok, Pid, Seq};
Error ->
Error
end;
Error ->
Error
end.
design_doc_to_index(#doc{id=Id,body={Fields}}, IndexName) ->
Language = couch_util:get_value(<<"language">>, Fields, <<"javascript">>),
{RawIndexes} = couch_util:get_value(<<"indexes">>, Fields, {[]}),
InvalidDDocError = {invalid_design_doc,
<<"index `", IndexName/binary, "` must have parameter `index`">>},
case lists:keyfind(IndexName, 1, RawIndexes) of
false ->
{error, {not_found, <<IndexName/binary, " not found.">>}};
{IndexName, {Index}} ->
Analyzer = couch_util:get_value(<<"analyzer">>, Index, <<"standard">>),
case couch_util:get_value(<<"index">>, Index) of
undefined ->
{error, InvalidDDocError};
Def ->
Sig = ?l2b(couch_util:to_hex(couch_hash:md5_hash(
term_to_binary({Analyzer, Def})))),
{ok, #index{
analyzer=Analyzer,
ddoc_id=Id,
def=Def,
def_lang=Language,
name=IndexName,
sig=Sig}}
end;
_ ->
{error, InvalidDDocError}
end.
reply_with_index(IndexPid, Index, WaitList) ->
reply_with_index(IndexPid, Index, WaitList, []).
reply_with_index(_IndexPid, _Index, [], Acc) ->
Acc;
reply_with_index(IndexPid, #index{current_seq=IndexSeq}=Index, [{Pid, Seq}|Rest], Acc) when Seq =< IndexSeq ->
gen_server:reply(Pid, {ok, IndexPid, IndexSeq}),
reply_with_index(IndexPid, Index, Rest, Acc);
reply_with_index(IndexPid, Index, [{Pid, Seq}|Rest], Acc) ->
reply_with_index(IndexPid, Index, Rest, [{Pid, Seq}|Acc]).
index_name(#index{dbname=DbName,ddoc_id=DDocId,name=IndexName}) ->
<<DbName/binary, " ", DDocId/binary, " ", IndexName/binary>>.
args_to_proplist(#index_query_args{} = Args) ->
[
{'query', Args#index_query_args.q},
{partition, Args#index_query_args.partition},
{limit, Args#index_query_args.limit},
{refresh, Args#index_query_args.stale =:= false},
{'after', Args#index_query_args.bookmark},
{sort, Args#index_query_args.sort},
{include_fields, Args#index_query_args.include_fields},
{counts, Args#index_query_args.counts},
{ranges, Args#index_query_args.ranges},
{drilldown, Args#index_query_args.drilldown},
{highlight_fields, Args#index_query_args.highlight_fields},
{highlight_pre_tag, Args#index_query_args.highlight_pre_tag},
{highlight_post_tag, Args#index_query_args.highlight_post_tag},
{highlight_number, Args#index_query_args.highlight_number},
{highlight_size, Args#index_query_args.highlight_size}
].
args_to_proplist2(#index_query_args{} = Args) ->
[
{'query', Args#index_query_args.q},
{field, Args#index_query_args.grouping#grouping.by},
{refresh, Args#index_query_args.stale =:= false},
{groups, Args#index_query_args.grouping#grouping.groups},
{group_sort, Args#index_query_args.grouping#grouping.sort},
{sort, Args#index_query_args.sort},
{limit, Args#index_query_args.limit},
{include_fields, Args#index_query_args.include_fields},
{highlight_fields, Args#index_query_args.highlight_fields},
{highlight_pre_tag, Args#index_query_args.highlight_pre_tag},
{highlight_post_tag, Args#index_query_args.highlight_post_tag},
{highlight_number, Args#index_query_args.highlight_number},
{highlight_size, Args#index_query_args.highlight_size}
].
search_int(Pid, QueryArgs0) ->
QueryArgs = dreyfus_util:upgrade(QueryArgs0),
Props = args_to_proplist(QueryArgs),
clouseau_rpc:search(Pid, Props).
group1_int(Pid, QueryArgs0) ->
QueryArgs = dreyfus_util:upgrade(QueryArgs0),
#index_query_args{
q = Query,
stale = Stale,
grouping = #grouping{
by = GroupBy,
offset = Offset,
limit = Limit,
sort = Sort
}
} = QueryArgs,
clouseau_rpc:group1(Pid, Query, GroupBy, Stale =:= false, Sort,
Offset, Limit).
group2_int(Pid, QueryArgs0) ->
QueryArgs = dreyfus_util:upgrade(QueryArgs0),
Props = args_to_proplist2(QueryArgs),
clouseau_rpc:group2(Pid, Props).
info_int(Pid) ->
clouseau_rpc:info(Pid).