blob: 57de204f2d9592ee683e42d182bb0fa41c860510 [file] [log] [blame]
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
-module(nouveau_api).
-include("nouveau.hrl").
-export([
analyze/2,
index_info/1,
create_index/2,
delete_path/1,
delete_path/2,
delete_doc_async/5,
purge_doc/5,
update_doc_async/7,
search/2,
set_purge_seq/4,
set_update_seq/4,
drain_async_responses/2,
jaxrs_error/2
]).
-define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}).
analyze(Text, Analyzer) when
is_binary(Text), is_binary(Analyzer)
->
ReqBody = {[{<<"text">>, Text}, {<<"analyzer">>, Analyzer}]},
Resp = send_if_enabled(
nouveau_util:nouveau_url() ++ "/analyze",
[?JSON_CONTENT_TYPE],
post,
jiffy:encode(ReqBody)
),
case Resp of
{ok, "200", _, RespBody} ->
Json = jiffy:decode(RespBody, [return_maps]),
{ok, maps:get(<<"tokens">>, Json)};
{ok, StatusCode, _, RespBody} ->
{error, jaxrs_error(StatusCode, RespBody)};
{error, Reason} ->
send_error(Reason)
end;
analyze(_, _) ->
{error, {bad_request, <<"'text' and 'analyzer' fields must be non-empty strings">>}}.
index_info(#index{} = Index) ->
Resp = send_if_enabled(index_url(Index), [], get),
case Resp of
{ok, "200", _, RespBody} ->
{ok, jiffy:decode(RespBody, [return_maps])};
{ok, StatusCode, _, RespBody} ->
{error, jaxrs_error(StatusCode, RespBody)};
{error, Reason} ->
send_error(Reason)
end.
create_index(#index{} = Index, IndexDefinition) ->
Resp = send_if_enabled(
index_url(Index), [?JSON_CONTENT_TYPE], put, jiffy:encode(IndexDefinition)
),
case Resp of
{ok, "204", _, _} ->
ok;
{ok, StatusCode, _, RespBody} ->
{error, jaxrs_error(StatusCode, RespBody)};
{error, Reason} ->
send_error(Reason)
end.
delete_path(Path) ->
delete_path(Path, []).
delete_path(Path, Exclusions) when
is_binary(Path), is_list(Exclusions)
->
Resp = send_if_enabled(
index_path(Path), [?JSON_CONTENT_TYPE], delete, jiffy:encode(Exclusions)
),
case Resp of
{ok, "204", _, _} ->
ok;
{ok, StatusCode, _, RespBody} ->
{error, jaxrs_error(StatusCode, RespBody)};
{error, Reason} ->
send_error(Reason)
end.
delete_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq) when
is_pid(ConnPid),
is_binary(DocId),
is_integer(MatchSeq),
MatchSeq >= 0,
is_integer(UpdateSeq),
UpdateSeq > 0
->
ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false},
send_direct_if_enabled(
ConnPid,
doc_url(Index, DocId),
[?JSON_CONTENT_TYPE],
delete,
jiffy:encode(ReqBody),
[
{stream_to, self()}
]
).
purge_doc(ConnPid, #index{} = Index, DocId, MatchSeq, PurgeSeq) when
is_pid(ConnPid),
is_binary(DocId),
is_integer(MatchSeq),
MatchSeq >= 0,
is_integer(PurgeSeq),
PurgeSeq > 0
->
ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true},
Resp = send_direct_if_enabled(
ConnPid, doc_url(Index, DocId), [?JSON_CONTENT_TYPE], delete, jiffy:encode(ReqBody), []
),
case Resp of
{ok, "204", _, _} ->
ok;
{ok, StatusCode, _, RespBody} ->
{error, jaxrs_error(StatusCode, RespBody)};
{error, Reason} ->
send_error(Reason)
end.
update_doc_async(ConnPid, #index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when
is_pid(ConnPid),
is_binary(DocId),
is_integer(MatchSeq),
MatchSeq >= 0,
is_integer(UpdateSeq),
UpdateSeq > 0,
(is_binary(Partition) orelse Partition == null),
is_list(Fields)
->
ReqBody = #{
match_seq => MatchSeq,
seq => UpdateSeq,
partition => Partition,
fields => Fields
},
send_direct_if_enabled(
ConnPid,
doc_url(Index, DocId),
[?JSON_CONTENT_TYPE],
put,
jiffy:encode(ReqBody),
[
{stream_to, self()}
]
).
search(#index{} = Index, QueryArgs) ->
Resp = send_if_enabled(
search_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(QueryArgs)
),
case Resp of
{ok, "200", _, RespBody} ->
{ok, jiffy:decode(RespBody, [return_maps])};
{ok, StatusCode, _, RespBody} ->
{error, jaxrs_error(StatusCode, RespBody)};
{error, Reason} ->
send_error(Reason)
end.
set_update_seq(ConnPid, #index{} = Index, MatchSeq, UpdateSeq) ->
ReqBody = #{
match_update_seq => MatchSeq,
update_seq => UpdateSeq
},
set_seq(ConnPid, Index, ReqBody).
set_purge_seq(ConnPid, #index{} = Index, MatchSeq, PurgeSeq) ->
ReqBody = #{
match_purge_seq => MatchSeq,
purge_seq => PurgeSeq
},
set_seq(ConnPid, Index, ReqBody).
set_seq(ConnPid, #index{} = Index, ReqBody) ->
Resp = send_direct_if_enabled(
ConnPid, index_url(Index), [?JSON_CONTENT_TYPE], post, jiffy:encode(ReqBody), []
),
case Resp of
{ok, "204", _, _} ->
ok;
{ok, StatusCode, _, RespBody} ->
{error, jaxrs_error(StatusCode, RespBody)};
{error, Reason} ->
send_error(Reason)
end.
%% wait for enough async responses to reduce the Queue to Min length.
drain_async_responses(Queue0, Min) when Min >= 0 ->
case queue:len(Queue0) > Min of
true ->
{{value, ReqId}, Queue1} = queue:out(Queue0),
wait_for_response(ReqId),
drain_async_responses(Queue1, Min);
false ->
Queue0
end.
wait_for_response(ReqId) ->
case drain_async_response(ReqId) of
{ok, "204", _Headers, _Body} ->
ok;
{ok, StatusCode, _Headers, RespBody} ->
exit({error, jaxrs_error(StatusCode, RespBody)})
end.
drain_async_response(ReqId) ->
drain_async_response(ReqId, undefined, undefined, undefined).
drain_async_response(ReqId, Code0, Headers0, Body0) ->
receive
{ibrowse_async_headers, ReqId, Code1, Headers1} ->
drain_async_response(ReqId, Code1, Headers1, Body0);
{ibrowse_async_response, ReqId, Body1} ->
drain_async_response(ReqId, Code0, Headers0, Body1);
{ibrowse_async_response_end, ReqId} ->
{ok, Code0, Headers0, Body0}
end.
%% private functions
index_path(Path) ->
lists:flatten(
io_lib:format(
"~s/index/~s",
[
nouveau_util:nouveau_url(),
couch_util:url_encode(Path)
]
)
).
index_url(#index{} = Index) ->
lists:flatten(
io_lib:format(
"~s/index/~s",
[
nouveau_util:nouveau_url(),
couch_util:url_encode(nouveau_util:index_name(Index))
]
)
).
doc_url(#index{} = Index, DocId) ->
lists:flatten(
io_lib:format(
"~s/index/~s/doc/~s",
[
nouveau_util:nouveau_url(),
couch_util:url_encode(nouveau_util:index_name(Index)),
couch_util:url_encode(DocId)
]
)
).
search_url(IndexName) ->
index_url(IndexName) ++ "/search".
jaxrs_error("400", Body) ->
{bad_request, message(Body)};
jaxrs_error("404", Body) ->
{not_found, message(Body)};
jaxrs_error("405", Body) ->
{method_not_allowed, message(Body)};
jaxrs_error("409", Body) ->
{conflict, message(Body)};
jaxrs_error("417", Body) ->
{expectation_failed, message(Body)};
jaxrs_error("422", Body) ->
{bad_request, lists:join(" and ", errors(Body))};
jaxrs_error("500", Body) ->
{internal_server_error, message(Body)}.
send_error({conn_failed, _}) ->
{error, {service_unavailable, <<"Search service unavailable.">>}};
send_error(Reason) ->
{error, Reason}.
message(Body) ->
Json = jiffy:decode(Body, [return_maps]),
maps:get(<<"message">>, Json).
errors(Body) ->
Json = jiffy:decode(Body, [return_maps]),
maps:get(<<"errors">>, Json).
send_if_enabled(Url, Header, Method) ->
send_if_enabled(Url, Header, Method, []).
send_if_enabled(Url, Header, Method, Body) ->
send_if_enabled(Url, Header, Method, Body, []).
send_if_enabled(Url, Header, Method, Body, Options) ->
case nouveau:enabled() of
true ->
ibrowse:send_req(Url, Header, Method, Body, Options);
false ->
{error, nouveau_not_enabled}
end.
send_direct_if_enabled(ConnPid, Url, Header, Method, Body, Options) ->
case nouveau:enabled() of
true ->
ibrowse:send_req_direct(ConnPid, Url, Header, Method, Body, Options);
false ->
{error, nouveau_not_enabled}
end.