1. A request from chttpd goes to dreyfus_httpd.
2. dreyfus_httpd parses and validates the request in parse_index_params and validate_index_query, then passes it on to dreyfus_fabric.
3. dreyfus_fabric gets the shards to search with Shards = dreyfus_util:get_shards(DbName, QueryArgs), starts a dreyfus_rpc search job on every shard with Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, search, [DDoc, IndexName, dreyfus_util:export(QueryArgs)]), and collects the replies with rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State, infinity, 1000 * 60 * 60).
4. dreyfus_rpc calls dreyfus_index_manager:get_index(DbName, Index) to get the index. If the index doesn't exist, dreyfus_index_manager spawns a process that creates it. dreyfus_rpc then calls dreyfus_index:await(Pid, MinSeq) to wait until the index is at least at MinSeq, and finally calls dreyfus_index:Fun(Pid, QueryArgs) with the corresponding search request.
5. dreyfus_index passes the search request to clouseau_rpc:search.
6. clouseau_rpc calls ioq:call(Ref, Msg, erlang:get(io_priority)) to run the search on the clouseau nodes using Lucene.
7. Lucene returns top_docs.
8. The top_docs are passed to dreyfus_index.
9. The top_docs are passed to dreyfus_rpc.
10. Each dreyfus_rpc process sends its own top_docs as a reply, rexi:reply(Result), to the dreyfus_fabric process that spawned it.
11. dreyfus_fabric merges the documents from all shards with MergedTopDocs = merge_top_docs(TopDocs, Sortable, Limit, Sort) and returns the results to dreyfus_httpd.
12. dreyfus_httpd returns the formatted results to chttpd through send_json(...).
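The fan-out to shards and the final merge described above follow a scatter/gather pattern. The following is a minimal, self-contained sketch of that pattern in plain Erlang, not dreyfus code. The module name fanout_sketch is hypothetical; each shard is modelled as a local fun returning {Score, DocId} hits instead of a remote dreyfus_rpc worker; the timeout is a simple per-reply timeout; and merge_top_docs/2 here only sorts by descending score, ignoring the Sortable and Sort arguments of the real merge_top_docs/4.

```erlang
-module(fanout_sketch).
-export([search/3, merge_top_docs/2]).

%% Hypothetical sketch: scatter a search to every "shard" and gather the
%% replies. A shard is a zero-arity fun returning [{Score, DocId}].
search(ShardFuns, Limit, Timeout) ->
    Parent = self(),
    Refs = [spawn_shard(Parent, Fun) || Fun <- ShardFuns],
    collect(Refs, [], Limit, Timeout).

%% Scatter: run one shard search in its own process, which sends
%% {Ref, Hits} back to the caller (the role rexi:reply/1 plays in dreyfus).
spawn_shard(Parent, Fun) ->
    Ref = make_ref(),
    _ = spawn(fun() -> Parent ! {Ref, Fun()} end),
    Ref.

%% Gather: wait for one reply per Ref, then merge everything.
collect([], Acc, Limit, _Timeout) ->
    {ok, merge_top_docs(Acc, Limit)};
collect([Ref | Rest], Acc, Limit, Timeout) ->
    receive
        {Ref, Hits} -> collect(Rest, [Hits | Acc], Limit, Timeout)
    after Timeout ->
        {error, timeout}
    end.

%% Merge per-shard hit lists: highest score first, keep the top Limit.
merge_top_docs(HitLists, Limit) ->
    All = lists:append(HitLists),
    Sorted = lists:sort(fun({S1, _}, {S2, _}) -> S1 >= S2 end, All),
    lists:sublist(Sorted, Limit).
```

For example, fanout_sketch:search([fun() -> [{0.9, <<"a">>}, {0.1, <<"b">>}] end, fun() -> [{0.7, <<"c">>}] end], 2, 1000) returns {ok, [{0.9, <<"a">>}, {0.7, <<"c">>}]}: each shard contributes its hits, and only the two best-scoring documents across all shards survive the merge.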
During a search request, before calling dreyfus_index:search, dreyfus_rpc first triggers an update of the Lucene index. It does so as follows:
The sequence numbers are calculated with {_LastSeq, MinSeq} = calculate_seqs(Db, Stale), where LastSeq is the number of the last change in the database and MinSeq is the sequence the index must have reached before the query can be answered. For stale queries (queries that don't need to reflect recent changes in the database), MinSeq is 0, so they don't trigger an index update before returning results: 0 means "wait until the index is at least at update_seq 0", which is true even for an empty index.
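A minimal sketch of the MinSeq decision, under clearly stated assumptions: seqs_sketch is a hypothetical module, the database is reduced to its update sequence (an integer), the Stale values ok and update_after are assumed to be the ones treated as stale, and requiring MinSeq = LastSeq for non-stale queries is an assumption rather than something stated above.

```erlang
-module(seqs_sketch).
-export([calculate_seqs/2, needs_update/2]).

%% Hypothetical sketch of the idea behind calculate_seqs/2. LastSeq stands
%% in for the database's update sequence; the real function takes a Db.
calculate_seqs(LastSeq, Stale) when Stale =:= ok; Stale =:= update_after ->
    %% Stale query: any index state is acceptable, so only seq 0 is required.
    {LastSeq, 0};
calculate_seqs(LastSeq, _Stale) ->
    %% Assumed rule for fresh queries: the index must catch up to LastSeq.
    {LastSeq, LastSeq}.

%% An index update is needed only when MinSeq is ahead of the index.
needs_update(IndexSeq, MinSeq) ->
    MinSeq > IndexSeq.
```

With MinSeq = 0, needs_update/2 is false for any non-negative index sequence, which is why stale queries never wait for an update, even on an empty index.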
The call dreyfus_index:design_doc_to_index(DDoc, IndexName) returns a record representation of the index: #index{analyzer=Analyzer, ddoc_id=Id, def=Def, def_lang=Language, name=IndexName, sig=Sig}. Here Sig is a hash of the index function (a JavaScript function in the design document) and the analyzer. Sig is used to detect whether the index definition has changed and the index needs to be rebuilt.
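The following sketch shows how a signature like Sig can be derived so that any change to the index function or the analyzer yields a different value. It assumes an MD5 digest of term_to_binary({Def, Analyzer}) rendered as lowercase hex; the module name sig_sketch is hypothetical, and the exact hashing dreyfus uses is not stated here.

```erlang
-module(sig_sketch).
-export([sig/2]).

%% Hypothetical sketch: hash the index definition together with the
%% analyzer. MD5 over term_to_binary/1 is an assumption.
sig(Def, Analyzer) ->
    Digest = erlang:md5(term_to_binary({Def, Analyzer})),
    %% Render the 16-byte digest as a 32-character lowercase hex binary.
    list_to_binary([io_lib:format("~2.16.0b", [B]) || <<B>> <= Digest]).
```

Because the same definition and analyzer always hash to the same Sig, an unchanged design document maps to the existing index, while editing either part produces a new Sig and therefore a new index.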
The call dreyfus_index_manager:get_index(DbName, Index) returns the Pid of the dreyfus_index process for this index. dreyfus_index_manager keeps the dreyfus_index processes of all indexes in an ets table: ets:new(?BY_SIG, [set, private, named_table]). If a dreyfus_index process for the given index exists in ?BY_SIG, it is returned; otherwise a new dreyfus_index process is spawned.

To avoid blocking the gen_server, handle_call({get_index, ...}, ...) in dreyfus_index_manager returns {noreply, State} and hands the creation of the new index process to a spawned process, spawn_link(fun() -> new_index(DbName, Index) end), while recording the caller's Pid in ?BY_SIG. new_index creates the new index process and sends an open_ok message to the dreyfus_index_manager gen_server. handle_call({open_ok, ...}, ...) retrieves the Pid of the original caller (From) and replies to it with the Pid of the created index process (NewPid). Calling add_to_ets(NewPid, DbName, Sig) then updates both ets tables, ?BY_SIG and ?BY_PID.
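The deferred-reply technique described above can be illustrated with a small standalone gen_server. This is a hypothetical sketch, not dreyfus_index_manager itself: the module name index_manager_sketch, the table names and the stub index process are made up, indexes are keyed by Sig alone rather than by DbName and Sig, and queuing callers that ask for an index while it is still being opened is an assumption about how such requests are handled. For a missing index the server returns {noreply, State} and answers later with gen_server:reply/2, once the opener process calls back with open_ok.

```erlang
-module(index_manager_sketch).
-behaviour(gen_server).
-export([start_link/0, get_index/1]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Returns {ok, Pid} for the index with signature Sig, opening it if needed.
get_index(Sig) ->
    gen_server:call(?MODULE, {get_index, Sig}).

init([]) ->
    %% BySig: Sig -> Pid or {waiting, Callers}. ByPid: Pid -> Sig.
    BySig = ets:new(sketch_by_sig, [set, private]),
    ByPid = ets:new(sketch_by_pid, [set, private]),
    {ok, {BySig, ByPid}}.

handle_call({get_index, Sig}, From, {BySig, _ByPid} = State) ->
    case ets:lookup(BySig, Sig) of
        [{Sig, Pid}] when is_pid(Pid) ->
            %% Index already open: reply straight away.
            {reply, {ok, Pid}, State};
        [{Sig, {waiting, Callers}}] ->
            %% Index is being opened: queue this caller as well.
            true = ets:insert(BySig, {Sig, {waiting, [From | Callers]}}),
            {noreply, State};
        [] ->
            %% Open the index in a separate process so the gen_server
            %% is not blocked, and remember who is waiting for it.
            spawn_link(fun() -> new_index(Sig) end),
            true = ets:insert(BySig, {Sig, {waiting, [From]}}),
            {noreply, State}
    end;
handle_call({open_ok, Sig, NewPid}, _Opener, {BySig, ByPid} = State) ->
    %% Reply to every waiting caller, then record the index in both tables.
    [{Sig, {waiting, Callers}}] = ets:lookup(BySig, Sig),
    lists:foreach(fun(C) -> gen_server:reply(C, {ok, NewPid}) end, Callers),
    true = ets:insert(BySig, {Sig, NewPid}),
    true = ets:insert(ByPid, {NewPid, Sig}),
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Runs in the spawned opener. The "index" is a stub process that idles.
new_index(Sig) ->
    NewPid = spawn(fun() -> receive stop -> ok end end),
    ok = gen_server:call(?MODULE, {open_ok, Sig, NewPid}).
```

The first get_index/1 call for a Sig blocks only the caller, not the server; later calls for the same Sig return the cached Pid immediately.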
dreyfus_index:await(Pid, MinSeq) initiates an update of the index if the requested MinSeq is greater than the current Seq stored in the index. It does this by calling dreyfus_index_updater:update(IndexPid, Index). dreyfus_index_updater loads all documents modified since the last seq stored in the dreyfus index and, for every document, calls either clouseau_rpc:delete to delete it from the Java Lucene index or clouseau_rpc:update to update it in the Java Lucene index.
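The two decisions in this paragraph, whether await must trigger an update and whether each changed document becomes a delete or an update, can be sketched as pure functions. This is a hypothetical illustration: the module updater_sketch, the atoms trigger_update and reply_now, and the {Seq, DocId, Deleted} change tuples are assumptions; the real updater reads changes from the database and calls clouseau_rpc:delete or clouseau_rpc:update instead of returning a list.

```erlang
-module(updater_sketch).
-export([await_action/2, index_actions/2]).

%% Does a caller asking for MinSeq need an index update first?
await_action(IndexSeq, MinSeq) when MinSeq > IndexSeq -> trigger_update;
await_action(_IndexSeq, _MinSeq) -> reply_now.

%% Turn the changes made after IndexSeq into Lucene operations:
%% deleted documents become deletes, all others become updates.
index_actions(IndexSeq, Changes) ->
    [case Deleted of
         true -> {delete, DocId};
         false -> {update, DocId}
     end
     || {Seq, DocId, Deleted} <- Changes, Seq > IndexSeq].
```

Only changes with a sequence number above the one stored in the index are turned into operations, so documents already indexed are not sent to clouseau again.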