blob: 0a4299ba84a9efb75e826dcca57d0b16d135ad49 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(chttpd_db).
-compile(tuple_calls).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("mem3/include/mem3.hrl").
-export([
handle_request/1,
handle_compact_req/2,
handle_design_req/2,
db_req/2,
couch_doc_open/4,
handle_changes_req/2,
update_doc_result_to_json/1, update_doc_result_to_json/2,
handle_design_info_req/3,
handle_view_cleanup_req/2,
update_doc/4,
http_code_from_status/1,
handle_partition_req/2
]).
-import(
chttpd,
[
send_json/2, send_json/3, send_json/4,
send_method_not_allowed/2,
start_json_response/2,
send_chunk/2,
end_json_response/1,
start_chunked_response/3,
absolute_uri/2,
send/2,
start_response_length/4
]
).
% Parsed query arguments for single-document requests. In this module the
% record is obtained from parse_doc_query/1 and bulk_get_parse_doc_query/1.
-record(doc_query_args, {
% extra options prepended with {user_ctx, ...} before documents are opened
options = [],
% revision passed to couch_doc_open/4; nil by default
rev = nil,
% revisions handed to fabric:open_revs/4; an empty list selects the
% single-revision code path in db_doc_req/3
open_revs = [],
% compared against ?INTERACTIVE_EDIT when validating revs on PUT
update_type = ?INTERACTIVE_EDIT,
% when not nil, {atts_since, Value} and `attachments` are added to the
% options used to open the document
atts_since = nil
}).
% Accumulator for changes_callback function
-record(cacc, {
% ETag sent with the response header of the `normal` feed
etag,
% feed type as an atom: normal | continuous | longpoll | eventsource
feed,
% the #httpd{} request until the `start` callback runs, the delayed
% response handle afterwards
mochi,
% separator emitted in front of each row of the JSON-object feeds; empty
% for the first row, ",\r\n" once a row has been buffered
prepend = "",
% set to true by the `start` callbacks
responding = false,
% counter consulted by the waiting_for_updates callback to decide whether a
% newline still has to be sent
chunks_sent = 0,
% iolist of rows not yet written to the client
buffer = [],
% byte size of `buffer`
bufsize = 0,
% buffer size above which the buffer is flushed; taken from
% chttpd:chunked_response_buffer_size/0
threshold
}).
% True when T is the path segment of one of the "all documents" style
% endpoints. Built from == and orelse only, so it is usable in guards.
-define(IS_ALL_DOCS(T),
(T == <<"_all_docs">> orelse
T == <<"_local_docs">> orelse
T == <<"_design_docs">>)
).
% True when T is the path segment of one of the Mango endpoints. Also
% guard-safe; used when routing partitioned requests.
-define(IS_MANGO(T),
(T == <<"_index">> orelse
T == <<"_find">> orelse
T == <<"_explain">>)
).
% Database request handlers
% Entry point for every request whose first path segment is a database name.
%
% PUT and DELETE on the bare database are handled here (create / delete).
% Everything else is routed through do_db_req/2, either to db_req/2 directly
% or to the handler registered for the second path segment.
handle_request(#httpd{path_parts = [DbName | RestParts], method = Method} = Req) ->
    case RestParts of
        [] when Method =:= 'PUT' ->
            create_db_req(Req, DbName);
        [] when Method =:= 'DELETE' ->
            % A `rev` parameter on a database DELETE almost always comes from
            % a faulty script that left the document id empty by accident.
            % Refuse the request so the database survives the mistake.
            case chttpd:qs_value(Req, "rev", false) =:= false of
                true ->
                    delete_db_req(Req, DbName);
                false ->
                    throw(
                        {bad_request,
                            "You tried to DELETE a database with a ?=rev parameter. " ++
                                "Did you mean to DELETE a document instead?"}
                    )
            end;
        [] ->
            do_db_req(Req, fun db_req/2);
        [SecondPart | _] ->
            % db_req/2 is the fallback when no dedicated handler is registered
            do_db_req(Req, chttpd_handlers:db_handler(SecondPart, fun db_req/2))
    end.
% Handles /db/_changes.
%
% GET is served directly. POST must be application/json; a non-empty body is
% decoded (it has to be a JSON object) and stored in the request record before
% the feed is started. Any other method on /db/_changes is rejected.
handle_changes_req(#httpd{method = 'GET'} = Req, Db) ->
    handle_changes_req1(Req, Db);
handle_changes_req(#httpd{method = 'POST'} = Req, Db) ->
    chttpd:validate_ctype(Req, "application/json"),
    Req1 =
        case chttpd:body_length(Req) of
            0 ->
                Req;
            _ ->
                % the match asserts the decoded body is an EJSON object
                {_Props} = Body = chttpd:json_body_obj(Req),
                Req#httpd{req_body = Body}
        end,
    handle_changes_req1(Req1, Db);
handle_changes_req(#httpd{path_parts = [_, <<"_changes">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "GET,POST,HEAD").
% Starts a changes feed for Db.
%
% Parses the query into #changes_args{}, attaches the configured filter
% function and the caller's user context, then streams the feed through
% changes_callback/2 with a #cacc{} accumulator.
%
% * "normal" feeds get an ETag derived from the db info and shard suffix and
%   are wrapped in chttpd:etag_respond/3.
% * "continuous", "longpoll" and "eventsource" feeds are counted in the
%   clients_requesting_changes stat for as long as they run.
% * any other feed value throws {bad_request, Msg}.
handle_changes_req1(#httpd{} = Req, Db) ->
    #changes_args{filter = Raw, style = Style} = Args0 = parse_changes_query(Req),
    ChangesArgs = Args0#changes_args{
        filter_fun = couch_changes:configure_filter(Raw, Style, Req, Db),
        db_open_options = [{user_ctx, couch_db:get_user_ctx(Db)}]
    },
    Max = chttpd:chunked_response_buffer_size(),
    case ChangesArgs#changes_args.feed of
        "normal" ->
            % Durations must come from the monotonic clock: wall-clock time
            % can jump (or go backwards) when the OS clock is adjusted, which
            % would feed bogus samples into the histogram.
            T0 = erlang:monotonic_time(microsecond),
            {ok, Info} = fabric:get_db_info(Db),
            Suffix = mem3:shard_suffix(Db),
            Etag = chttpd:make_etag({Info, Suffix}),
            % microseconds -> milliseconds (float)
            DeltaT = (erlang:monotonic_time(microsecond) - T0) / 1000,
            couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
            chttpd:etag_respond(Req, Etag, fun() ->
                Acc0 = #cacc{
                    feed = normal,
                    etag = Etag,
                    mochi = Req,
                    threshold = Max
                },
                fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
            end);
        Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource" ->
            couch_stats:increment_counter([couchdb, httpd, clients_requesting_changes]),
            Acc0 = #cacc{
                % safe: the guard restricts Feed to three fixed strings, so
                % no atom is ever created from arbitrary client input
                feed = list_to_atom(Feed),
                mochi = Req,
                threshold = Max
            },
            try
                fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
            after
                % always undo the increment, even when the feed crashes
                couch_stats:decrement_counter([couchdb, httpd, clients_requesting_changes])
            end;
        _ ->
            Msg = <<"Supported `feed` types: normal, continuous, live, longpoll, eventsource">>,
            throw({bad_request, Msg})
    end.
% Callback handed to fabric:changes/4 by handle_changes_req1/2.
%
% The first argument is one of: start, {change, Change},
% {stop, EndSeq, Pending}, waiting_for_updates, timeout or {error, Reason}.
% The second argument is the #cacc{} accumulator.
%
% Clause order matters: the feed-specific clauses for `continuous` and
% `eventsource` come first; every clause without a feed pattern below them
% serves whatever was not matched above (the single-JSON-object feeds, plus
% waiting_for_updates/timeout for the continuous feed).
%
% Most clauses return {ok, NewAcc}. The stop clauses return the result of
% chttpd:end_delayed_json_response/1 and the error clauses return the result
% of the chttpd error helper they call.
%
% callbacks for continuous feed (newline-delimited JSON Objects)
changes_callback(start, #cacc{feed = continuous} = Acc) ->
{ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200),
% from here on `mochi` holds the response handle instead of the request
{ok, Acc#cacc{mochi = Resp, responding = true}};
changes_callback({change, Change}, #cacc{feed = continuous} = Acc) ->
chttpd_stats:incr_rows(),
% one JSON object per line
Data = [?JSON_ENCODE(Change) | "\n"],
Len = iolist_size(Data),
maybe_flush_changes_feed(Acc, Data, Len);
changes_callback({stop, EndSeq, Pending}, #cacc{feed = continuous} = Acc) ->
#cacc{mochi = Resp, buffer = Buf} = Acc,
Row =
{[
{<<"last_seq">>, EndSeq},
{<<"pending">>, Pending}
]},
% flush whatever is still buffered together with the final row
Data = [Buf, ?JSON_ENCODE(Row) | "\n"],
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Data),
chttpd:end_delayed_json_response(Resp1);
% callbacks for eventsource feed (newline-delimited eventsource Objects)
changes_callback(start, #cacc{feed = eventsource} = Acc) ->
#cacc{mochi = Req} = Acc,
Headers = [
{"Content-Type", "text/event-stream"},
{"Cache-Control", "no-cache"}
],
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200, Headers),
{ok, Acc#cacc{mochi = Resp, responding = true}};
changes_callback({change, {ChangeProp} = Change}, #cacc{feed = eventsource} = Acc) ->
chttpd_stats:incr_rows(),
% the change's sequence doubles as the event id
Seq = proplists:get_value(seq, ChangeProp),
Chunk = [
"data: ",
?JSON_ENCODE(Change),
"\n",
"id: ",
?JSON_ENCODE(Seq),
"\n\n"
],
Len = iolist_size(Chunk),
maybe_flush_changes_feed(Acc, Chunk, Len);
changes_callback(timeout, #cacc{feed = eventsource} = Acc) ->
#cacc{mochi = Resp, chunks_sent = ChunksSet} = Acc,
% heartbeat event sent on timeout
Chunk = "event: heartbeat\ndata: \n\n",
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
{ok, Acc#cacc{mochi = Resp1, chunks_sent = ChunksSet + 1}};
changes_callback({stop, _EndSeq, _Pending}, #cacc{feed = eventsource} = Acc) ->
#cacc{mochi = Resp, buffer = Buf} = Acc,
% only the buffered events are flushed; no last_seq row is written here
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
chttpd:end_delayed_json_response(Resp1);
% callbacks for longpoll and normal (single JSON Object)
changes_callback(start, #cacc{feed = normal} = Acc) ->
#cacc{etag = Etag, mochi = Req} = Acc,
FirstChunk = "{\"results\":[\n",
% only the normal feed carries an ETag header
{ok, Resp} = chttpd:start_delayed_json_response(
Req,
200,
[{"ETag", Etag}],
FirstChunk
),
{ok, Acc#cacc{mochi = Resp, responding = true}};
changes_callback(start, Acc) ->
#cacc{mochi = Req} = Acc,
FirstChunk = "{\"results\":[\n",
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
{ok, Acc#cacc{mochi = Resp, responding = true}};
changes_callback({change, Change}, Acc) ->
chttpd_stats:incr_rows(),
% `prepend` is "" for the first row and ",\r\n" after a row was buffered
Data = [Acc#cacc.prepend, ?JSON_ENCODE(Change)],
Len = iolist_size(Data),
maybe_flush_changes_feed(Acc, Data, Len);
changes_callback({stop, EndSeq, Pending}, Acc) ->
#cacc{buffer = Buf, mochi = Resp, threshold = Max} = Acc,
% closes the "results" array and appends last_seq / pending
Terminator = [
"\n],\n\"last_seq\":",
?JSON_ENCODE(EndSeq),
",\"pending\":",
?JSON_ENCODE(Pending),
"}\n"
],
{ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, Terminator, Max),
chttpd:end_delayed_json_response(Resp1);
changes_callback(waiting_for_updates, #cacc{buffer = []} = Acc) ->
#cacc{mochi = Resp, chunks_sent = ChunksSent} = Acc,
% nothing buffered: send a single newline unless chunks_sent is already
% positive
case ChunksSent > 0 of
true ->
{ok, Acc};
false ->
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, <<"\n">>),
{ok, Acc#cacc{mochi = Resp1, chunks_sent = 1}}
end;
changes_callback(waiting_for_updates, Acc) ->
#cacc{buffer = Buf, mochi = Resp, chunks_sent = ChunksSent} = Acc,
% flush the buffered rows before going idle
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
{ok, Acc#cacc{
buffer = [],
bufsize = 0,
mochi = Resp1,
chunks_sent = ChunksSent + 1
}};
changes_callback(timeout, Acc) ->
#cacc{mochi = Resp, chunks_sent = ChunksSent} = Acc,
% a bare newline is sent on timeout
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"),
{ok, Acc#cacc{mochi = Resp1, chunks_sent = ChunksSent + 1}};
% `mochi` is still the original request, i.e. no start clause has replaced it
% with a response handle yet, so a regular error response can be sent.
changes_callback({error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
#cacc{mochi = Req} = Acc,
chttpd:send_error(Req, Reason);
changes_callback({error, Reason}, #cacc{feed = normal, responding = false} = Acc) ->
#cacc{mochi = Req} = Acc,
chttpd:send_error(Req, Reason);
% a delayed response is already in progress
changes_callback({error, Reason}, Acc) ->
chttpd:send_delayed_error(Acc#cacc.mochi, Reason).
% Buffers one encoded change row, writing out the current buffer first when
% adding the row would push it over the threshold.
%
% Data is the encoded row (iolist) and Len its byte size. Returns {ok, Acc}.
%
% First clause: the buffer is non-empty and would overflow, so it is sent to
% the client and replaced by the new row. Second clause: the row is appended
% to the buffer and chunks_sent is incremented.
maybe_flush_changes_feed(#cacc{bufsize = Buffered, threshold = Limit} = Acc, Data, Len) when
    Buffered > 0 andalso (Buffered + Len) > Limit
->
    {ok, Resp} = chttpd:send_delayed_chunk(Acc#cacc.mochi, Acc#cacc.buffer),
    Flushed = Acc#cacc{
        prepend = ",\r\n",
        buffer = Data,
        bufsize = Len,
        mochi = Resp
    },
    {ok, Flushed};
maybe_flush_changes_feed(Acc, Data, Len) ->
    #cacc{buffer = Pending, bufsize = Buffered, chunks_sent = Sent} = Acc,
    {ok, Acc#cacc{
        prepend = ",\r\n",
        buffer = [Pending | Data],
        bufsize = Buffered + Len,
        chunks_sent = Sent + 1
    }}.
% Handles POST /db/_compact and POST /db/_compact/DesignName.
%
% The request must be application/json. For the design-document form the
% design doc has to be openable through ddoc_cache; any other result of the
% lookup is thrown. Replies 202 {"ok": true} once compaction was requested.
% Every other method is rejected.
handle_compact_req(#httpd{method = 'POST', path_parts = PathParts} = Req, Db) ->
    chttpd:validate_ctype(Req, "application/json"),
    case PathParts of
        [_DbName, <<"_compact">>] ->
            ok = fabric:compact(Db);
        [DbName, <<"_compact">>, DesignName | _] ->
            case ddoc_cache:open(DbName, <<"_design/", DesignName/binary>>) of
                {ok, _DDoc} ->
                    ok = fabric:compact(Db, DesignName);
                Error ->
                    throw(Error)
            end
    end,
    send_json(Req, 202, {[{ok, true}]});
handle_compact_req(Req, _Db) ->
    send_method_not_allowed(Req, "POST").
% Handles POST /db/_view_cleanup: asks fabric to clean up index files on all
% nodes and replies 202 {"ok": true}. Other methods are rejected.
handle_view_cleanup_req(#httpd{method = 'POST'} = Req, Db) ->
    ok = fabric:cleanup_index_files_all_nodes(Db),
    Reply = {[{ok, true}]},
    send_json(Req, 202, Reply);
handle_view_cleanup_req(Req, _Db) ->
    send_method_not_allowed(Req, "POST").
% Handles requests below /db/_partition.
%
% Clauses, in matching order:
%   1. /db/_partition without a partition id      -> bad_request
%   2. GET /db/_partition/PartId                  -> partition info
%   3. POST to a partition id starting with "_"   -> illegal_partition
%   4. any other method on /db/_partition/PartId  -> method not allowed
%   5. /db/_partition/PartId/Rest...              -> re-dispatched with the
%      partition segments removed from the path and `partition` forced into
%      the query string
%   6. anything else                              -> not_found
handle_partition_req(#httpd{path_parts = [_, _]} = _Req, _Db) ->
    throw({bad_request, invalid_partition_req});
handle_partition_req(#httpd{method = 'GET', path_parts = [_, _, PartId]} = Req, Db) ->
    couch_partition:validate_partition(PartId),
    case couch_db:is_partitioned(Db) of
        false ->
            throw({bad_request, <<"database is not partitioned">>});
        true ->
            {ok, Info} = fabric:get_partition_info(Db, PartId),
            send_json(Req, {Info})
    end;
handle_partition_req(
    #httpd{
        method = 'POST',
        path_parts = [_, <<"_partition">>, <<"_", _/binary>>]
    },
    _Db
) ->
    throw({illegal_partition, <<"Partition must not start with an underscore">>});
handle_partition_req(#httpd{path_parts = [_, _, _]} = Req, _Db) ->
    send_method_not_allowed(Req, "GET");
handle_partition_req(#httpd{path_parts = [DbName, _, PartId | Rest]} = Req, Db) ->
    case couch_db:is_partitioned(Db) of
        false ->
            throw({bad_request, <<"database is not partitioned">>});
        true ->
            couch_partition:validate_partition(PartId),
            QS = chttpd:qs(Req),
            PartIdStr = ?b2l(PartId),
            % a `partition` query parameter is tolerated only when it agrees
            % with the partition named in the path
            case couch_util:get_value("partition", QS, PartIdStr) == PartIdStr of
                true ->
                    ok;
                false ->
                    throw(
                        {bad_request, <<"Conflicting value for `partition` in query string">>}
                    )
            end,
            % strip "_partition/PartId" from the path and make sure the query
            % string carries exactly one `partition` entry
            NewReq = Req#httpd{
                path_parts = [DbName | Rest],
                qs = lists:ukeysort(1, [{"partition", PartIdStr} | QS])
            },
            update_partition_stats(Rest),
            case Rest of
                [OP | _] when OP == <<"_all_docs">> orelse ?IS_MANGO(OP) ->
                    case chttpd_handlers:db_handler(OP, fun db_req/2) of
                        Handler when is_function(Handler, 2) ->
                            Handler(NewReq, Db);
                        _ ->
                            chttpd:send_error(Req, not_found)
                    end;
                [<<"_design">>, _Name, <<"_", _/binary>> | _] ->
                    handle_design_req(NewReq, Db);
                _ ->
                    chttpd:send_error(Req, not_found)
            end
    end;
handle_partition_req(Req, _Db) ->
    chttpd:send_error(Req, not_found).
% Bumps the per-endpoint counter for a partitioned request. PathParts is the
% request path after the partition id; its first segment selects the counter.
% Paths that match none of the known endpoints are ignored and yield `ok`.
update_partition_stats([<<"_design">> | _]) ->
    couch_stats:increment_counter([couchdb, httpd, partition_view_requests]);
update_partition_stats([<<"_all_docs">> | _]) ->
    couch_stats:increment_counter([couchdb, httpd, partition_all_docs_requests]);
update_partition_stats([<<"_find">> | _]) ->
    couch_stats:increment_counter([couchdb, httpd, partition_find_requests]);
update_partition_stats([<<"_explain">> | _]) ->
    couch_stats:increment_counter([couchdb, httpd, partition_explain_requests]);
update_partition_stats(_PathParts) ->
    ok.
% Handles requests below /db/_design/Name.
%
% When the segment after the design doc name starts with an underscore it is
% treated as an action: the design doc is loaded through ddoc_cache (any
% result other than {ok, DDoc} is thrown) and the request is passed to the
% handler registered for that action, with bad_action_req/3 as the fallback.
% All other requests fall through to db_req/2.
handle_design_req(
    #httpd{
        path_parts = [_DbName, _Design, Name, <<"_", _/binary>> = Action | _Rest]
    } = Req,
    Db
) ->
    ClusterDbName = mem3:dbname(couch_db:name(Db)),
    DDoc =
        case ddoc_cache:open(ClusterDbName, <<"_design/", Name/binary>>) of
            {ok, Opened} -> Opened;
            Error -> throw(Error)
        end,
    Handler = chttpd_handlers:design_handler(Action, fun bad_action_req/3),
    Handler(Req, Db, DDoc);
handle_design_req(Req, Db) ->
    db_req(Req, Db).
% Fallback design handler: no action is registered for the underscore
% segment, so the remaining path is treated as an attachment name on the
% design document.
bad_action_req(#httpd{path_parts = [_, _, Name | FileNameParts]} = Req, Db, _DDoc) ->
    DocId = <<"_design/", Name/binary>>,
    db_attachment_req(Req, Db, DocId, FileNameParts).
% Handles GET on a design document's info endpoint: replies 200 with the
% design doc name (third path segment) and its view group info as reported by
% fabric. Other methods are rejected.
handle_design_info_req(#httpd{method = 'GET'} = Req, Db, #doc{} = DDoc) ->
    % matched in the body on purpose: a path of a different length must
    % crash here rather than fall through to the method-not-allowed clause
    #httpd{path_parts = [_, _, Name, _]} = Req,
    {ok, GroupInfoList} = fabric:get_view_group_info(Db, DDoc),
    Body = {[{name, Name}, {view_index, {GroupInfoList}}]},
    send_json(Req, 200, Body);
handle_design_info_req(Req, _Db, _DDoc) ->
    send_method_not_allowed(Req, "GET").
% Handles PUT /db. Requires a server admin.
%
% Shard, engine and partitioning options are parsed from the request and
% passed to fabric:create_db/2. Replies 201 (ok) or 202 (accepted) with a
% Location header pointing at the new database, sends a file_exists error
% when the database already exists, and throws any other result.
create_db_req(#httpd{} = Req, DbName) ->
    couch_httpd:verify_is_server_admin(Req),
    ShardsOpt = parse_shards_opt(Req),
    EngineOpt = parse_engine_opt(Req),
    PropsOpt = [{props, parse_partitioned_opt(Req)}],
    Options = ShardsOpt ++ PropsOpt ++ EngineOpt,
    DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)),
    Reply = fun(Code) ->
        send_json(Req, Code, [{"Location", DocUrl}], {[{ok, true}]})
    end,
    case fabric:create_db(DbName, Options) of
        ok -> Reply(201);
        accepted -> Reply(202);
        {error, file_exists} -> chttpd:send_error(Req, file_exists);
        Error -> throw(Error)
    end.
% Handles DELETE /db. Requires a server admin. Replies 200 when fabric
% reports `ok`, 202 when it reports `accepted`, and throws any other result.
delete_db_req(#httpd{} = Req, DbName) ->
    couch_httpd:verify_is_server_admin(Req),
    Code =
        case fabric:delete_db(DbName, []) of
            ok -> 200;
            accepted -> 202;
            Error -> throw(Error)
        end,
    send_json(Req, Code, {[{ok, true}]}).
% Builds a clustered db handle for the database named in the request path and
% invokes Fun(Req, Db).
%
% The db props are read from the options of the first shard returned by
% mem3:shards/1. The user context is added to the open options only when the
% request carries one.
do_db_req(#httpd{path_parts = [DbName | _], user_ctx = Ctx} = Req, Fun) ->
    FirstShard = hd(mem3:shards(DbName)),
    PropsOpt = {props, couch_util:get_value(props, FirstShard#shard.opts, [])},
    Opts =
        case Ctx of
            #user_ctx{} -> [{user_ctx, Ctx}, PropsOpt];
            undefined -> [PropsOpt]
        end,
    {ok, Db} = couch_db:clustered_db(DbName, Opts),
    Fun(Req, Db).
% GET /db: replies with the database info object reported by fabric.
db_req(#httpd{method = 'GET', path_parts = [DbName]} = Req, _Db) ->
    % Record how long the cluster-wide info lookup takes (in milliseconds).
    % The monotonic clock is used because wall-clock time can jump or run
    % backwards when the OS clock is adjusted, which would corrupt the
    % histogram with bogus or negative samples.
    T0 = erlang:monotonic_time(microsecond),
    {ok, DbInfo} = fabric:get_db_info(DbName),
    DeltaT = (erlang:monotonic_time(microsecond) - T0) / 1000,
    couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
    send_json(Req, {DbInfo});
% POST /db: creates a document from the JSON request body.
%
% A document without an id gets a freshly generated uuid. With ?batch=ok the
% write happens in a background process and 202 is returned immediately;
% failures of that write are only logged. Otherwise the write is performed
% inline and the reply is 201 (ok) or 202 (accepted) with a Location header.
db_req(#httpd{method = 'POST', path_parts = [DbName], user_ctx = Ctx} = Req, Db) ->
    chttpd:validate_ctype(Req, "application/json"),
    W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
    Options = [{user_ctx, Ctx}, {w, W}],
    Doc0 = couch_db:doc_from_json_obj_validate(Db, chttpd:json_body(Req)),
    validate_attachment_names(Doc0),
    Doc =
        case Doc0#doc.id of
            <<>> -> Doc0#doc{id = couch_uuids:new(), revs = {0, []}};
            _ -> Doc0
        end,
    DocId = Doc#doc.id,
    case chttpd:qs_value(Req, "batch") of
        "ok" ->
            % async_batching: fire and forget, the client is answered before
            % the write completes
            spawn(fun() ->
                case catch (fabric:update_doc(Db, Doc, Options)) of
                    {Outcome, _} when Outcome =:= ok; Outcome =:= accepted ->
                        chttpd_stats:incr_writes(),
                        ok;
                    Error ->
                        couch_log:debug("Batch doc error (~s): ~p", [DocId, Error])
                end
            end),
            send_json(Req, 202, [], {[{ok, true}, {id, DocId}]});
        _Normal ->
            DocUrl = absolute_uri(Req, [
                $/,
                couch_util:url_encode(DbName),
                $/,
                couch_util:url_encode(DocId)
            ]),
            {HttpCode, NewRev} =
                case fabric:update_doc(Db, Doc, Options) of
                    {ok, Rev} ->
                        chttpd_stats:incr_writes(),
                        {201, Rev};
                    {accepted, Rev} ->
                        chttpd_stats:incr_writes(),
                        {202, Rev}
                end,
            Body = {[
                {ok, true},
                {id, DocId},
                {rev, couch_doc:rev_to_str(NewRev)}
            ]},
            send_json(Req, HttpCode, [{"Location", DocUrl}], Body)
    end;
% any other method on the bare database path
db_req(#httpd{path_parts = [_DbName]} = Req, _Db) ->
    send_method_not_allowed(Req, "DELETE,GET,HEAD,POST");
% POST /db/_ensure_full_commit: replies 201 with the shard creation time as
% `instance_start_time`. Other methods are rejected.
db_req(
    #httpd{
        method = 'POST',
        path_parts = [DbName, <<"_ensure_full_commit">>],
        user_ctx = Ctx
    } = Req,
    _Db
) ->
    chttpd:validate_ctype(Req, "application/json"),
    %% use fabric call to trigger a database_does_not_exist exception
    %% for missing databases that'd return error 404 from chttpd
    %% get_security used to prefer shards on the same node over other nodes
    fabric:get_security(DbName, [{user_ctx, Ctx}]),
    Body = {[
        {ok, true},
        {instance_start_time, mem3:shard_creation_time(DbName)}
    ]},
    send_json(Req, 201, Body);
db_req(#httpd{path_parts = [_, <<"_ensure_full_commit">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "POST");
% POST /db/_bulk_docs: writes a batch of documents.
%
% Body members read here: `docs` (required, array), `w`, `new_edits`
% (defaults to true) and `all_or_nothing`. The X-Couch-Full-Commit header
% selects full_commit / delay_commit. Documents without an id get a generated
% uuid.
%
% With new_edits = true the reply lists one result per document and the
% status is 201 (ok), 202 (accepted), 500 (error) or 417 (aborted). With
% new_edits = false the write is done as replicated changes and only the
% reported errors are listed. Any other new_edits value is a bad_request.
db_req(#httpd{method = 'POST', path_parts = [_, <<"_bulk_docs">>], user_ctx = Ctx} = Req, Db) ->
    couch_stats:increment_counter([couchdb, httpd, bulk_requests]),
    chttpd:validate_ctype(Req, "application/json"),
    {JsonProps} = chttpd:json_body_obj(Req),
    DocsArray =
        case couch_util:get_value(<<"docs">>, JsonProps) of
            undefined ->
                throw({bad_request, <<"POST body must include `docs` parameter.">>});
            DocsList when is_list(DocsList) ->
                DocsList;
            _ ->
                throw({bad_request, <<"`docs` parameter must be an array.">>})
        end,
    couch_stats:update_histogram([couchdb, httpd, bulk_docs], length(DocsArray)),
    % an integer `w` in the body wins over the query string / quorum default
    W =
        case couch_util:get_value(<<"w">>, JsonProps) of
            Value when is_integer(Value) ->
                integer_to_list(Value);
            _ ->
                chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db)))
        end,
    BaseOptions = [{user_ctx, Ctx}, {w, W}],
    Options =
        case chttpd:header_value(Req, "X-Couch-Full-Commit") of
            "true" -> [full_commit | BaseOptions];
            "false" -> [delay_commit | BaseOptions];
            _ -> BaseOptions
        end,
    NewEdits = couch_util:get_value(<<"new_edits">>, JsonProps, true),
    ToDoc = fun(JsonObj) ->
        Doc = couch_db:doc_from_json_obj_validate(Db, JsonObj),
        validate_revs(Doc, NewEdits),
        validate_attachment_names(Doc),
        case Doc#doc.id of
            <<>> -> Doc#doc{id = couch_uuids:new()};
            _ -> Doc
        end
    end,
    Docs = lists:map(ToDoc, DocsArray),
    % HTTP status for each non-aborted outcome of fabric:update_docs/3
    StatusCode = fun
        (ok) -> 201;
        (accepted) -> 202;
        (error) -> 500
    end,
    case NewEdits of
        true ->
            Options2 =
                case couch_util:get_value(<<"all_or_nothing">>, JsonProps) of
                    true -> [all_or_nothing | Options];
                    _ -> Options
                end,
            case fabric:update_docs(Db, Docs, Options2) of
                {aborted, Errors} ->
                    ErrorsJson = lists:map(fun update_doc_result_to_json/1, Errors),
                    send_json(Req, 417, ErrorsJson);
                {Status, Results} when
                    Status =:= ok; Status =:= accepted; Status =:= error
                ->
                    % output the results, pairing each doc with its result
                    chttpd_stats:incr_writes(length(Results)),
                    DocResults = lists:zipwith(
                        fun update_doc_result_to_json/2,
                        Docs,
                        Results
                    ),
                    send_json(Req, StatusCode(Status), DocResults)
            end;
        false ->
            case fabric:update_docs(Db, Docs, [?REPLICATED_CHANGES | Options]) of
                {Status, Errors} when
                    Status =:= ok; Status =:= accepted; Status =:= error
                ->
                    chttpd_stats:incr_writes(length(Docs)),
                    ErrorsJson = lists:map(fun update_doc_result_to_json/1, Errors),
                    send_json(Req, StatusCode(Status), ErrorsJson)
            end;
        _ ->
            throw({bad_request, <<"`new_edits` parameter must be a boolean.">>})
    end;
db_req(#httpd{path_parts = [_, <<"_bulk_docs">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "POST");
% POST /db/_bulk_get: fetches several documents in one request.
%
% The JSON body must contain a `docs` member, otherwise a bad_request is
% thrown. The reply is JSON or multipart depending on
% bulk_get_is_multipart/1. Other methods are rejected.
db_req(#httpd{method = 'POST', path_parts = [_, <<"_bulk_get">>]} = Req, Db) ->
    couch_stats:increment_counter([couchdb, httpd, bulk_requests]),
    % validated through chttpd like every other endpoint in this module
    chttpd:validate_ctype(Req, "application/json"),
    {JsonProps} = chttpd:json_body_obj(Req),
    case couch_util:get_value(<<"docs">>, JsonProps) of
        undefined ->
            throw({bad_request, <<"Missing JSON list of 'docs'.">>});
        Docs ->
            #doc_query_args{options = Options0} = bulk_get_parse_doc_query(Req),
            Options = [{user_ctx, Req#httpd.user_ctx} | Options0],
            {ArgsRefs, ArgsMap} = bulk_get_parse_args(Db, Docs),
            ResultsMap = bulk_get_docs(Db, ArgsMap, Options),
            case bulk_get_is_multipart(Req) of
                false -> bulk_get_ret_json(Req, ArgsRefs, ResultsMap, Options);
                true -> bulk_get_ret_multipart(Req, ArgsRefs, ResultsMap, Options)
            end
    end;
db_req(#httpd{path_parts = [_, <<"_bulk_get">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "POST");
% POST /db/_purge: purges the given revisions.
%
% The body is a JSON object mapping document ids to revision lists. The
% number of ids and the total number of revisions are capped by the
% [purge] max_document_id_number (default 100) and max_revisions_number
% (default 1000) settings; exceeding either is a bad_request. Other methods
% are rejected.
db_req(#httpd{method = 'POST', path_parts = [_, <<"_purge">>]} = Req, Db) ->
    couch_stats:increment_counter([couchdb, httpd, purge_requests]),
    chttpd:validate_ctype(Req, "application/json"),
    W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
    Options = [{user_ctx, Req#httpd.user_ctx}, {w, W}],
    {IdsRevs} = chttpd:json_body_obj(Req),
    IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
    MaxIds = config:get_integer("purge", "max_document_id_number", 100),
    case length(IdsRevs2) > MaxIds of
        true -> throw({bad_request, "Exceeded maximum number of documents."});
        false -> ok
    end,
    % total number of revisions across all documents
    RevsLen = lists:sum([length(Revs) || {_Id, Revs} <- IdsRevs2]),
    MaxRevs = config:get_integer("purge", "max_revisions_number", 1000),
    case RevsLen > MaxRevs of
        true -> throw({bad_request, "Exceeded maximum number of revisions."});
        false -> ok
    end,
    couch_stats:increment_counter([couchdb, document_purges, total], length(IdsRevs2)),
    Results =
        case fabric:purge_docs(Db, IdsRevs2, Options) of
            {Status, Purged} when Status =:= ok; Status =:= accepted ->
                chttpd_stats:incr_writes(length(Purged)),
                Purged
        end,
    {Code, Json} = purge_results_to_json(IdsRevs2, Results),
    send_json(Req, Code, {[{<<"purge_seq">>, null}, {<<"purged">>, {Json}}]});
db_req(#httpd{path_parts = [_, <<"_purge">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "POST");
% GET /db/_purged_infos: lists the purged documents reported by fabric as
% {"id": ..., "revs": [...]} objects. Other methods are rejected.
db_req(#httpd{method = 'GET', path_parts = [_, <<"_purged_infos">>]} = Req, Db) ->
    {ok, RawInfos} = fabric:get_purged_infos(Db),
    Infos = [
        {[
            {id, DocId},
            {revs, [couch_doc:rev_to_str(R) || R <- DocRevs]}
        ]}
     || {DocId, DocRevs} <- RawInfos
    ],
    Body = {[{purged_infos, Infos}]},
    send_json(Req, Body);
db_req(#httpd{path_parts = [_, <<"_purged_infos">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "GET");
% _all_docs, _local_docs and _design_docs endpoints.
%
%   GET  /db/OP          keys come from the `keys` query parameter (JSON)
%   POST /db/OP/queries  body must carry a `queries` member
%   POST /db/OP          keys come from the `keys` body member
%
% In both single-query forms `keys` is optional but has to be an array when
% present. Unsupported methods are rejected per endpoint.
db_req(#httpd{method = 'GET', path_parts = [_, OP]} = Req, Db) when ?IS_ALL_DOCS(OP) ->
    Keys =
        case chttpd:qs_json_value(Req, "keys", nil) of
            nil -> undefined;
            KeyList when is_list(KeyList) -> KeyList;
            _ -> throw({bad_request, "`keys` parameter must be an array."})
        end,
    all_docs_view(Req, Db, Keys, OP);
db_req(
    #httpd{
        method = 'POST',
        path_parts = [_, OP, <<"queries">>]
    } = Req,
    Db
) when ?IS_ALL_DOCS(OP) ->
    Body = chttpd:json_body_obj(Req),
    Queries =
        case couch_mrview_util:get_view_queries(Body) of
            undefined ->
                throw({bad_request, <<"POST body must include `queries` parameter.">>});
            Found ->
                Found
        end,
    multi_all_docs_view(Req, Db, OP, Queries);
db_req(
    #httpd{path_parts = [_, OP, <<"queries">>]} = Req,
    _Db
) when ?IS_ALL_DOCS(OP) ->
    send_method_not_allowed(Req, "POST");
db_req(#httpd{method = 'POST', path_parts = [_, OP]} = Req, Db) when ?IS_ALL_DOCS(OP) ->
    chttpd:validate_ctype(Req, "application/json"),
    {Fields} = chttpd:json_body_obj(Req),
    Keys =
        case couch_util:get_value(<<"keys">>, Fields, nil) of
            nil -> undefined;
            KeyList when is_list(KeyList) -> KeyList;
            _ -> throw({bad_request, "`keys` body member must be an array."})
        end,
    all_docs_view(Req, Db, Keys, OP);
db_req(#httpd{path_parts = [_, OP]} = Req, _Db) when ?IS_ALL_DOCS(OP) ->
    send_method_not_allowed(Req, "GET,HEAD,POST");
% POST /db/_missing_revs: replies with {"missing_revs": {Id: [Rev, ...]}} for
% the id/revision pairs in the JSON body. An {error, Reason} from fabric is
% sent as an error response. Other methods are rejected.
db_req(#httpd{method = 'POST', path_parts = [_, <<"_missing_revs">>]} = Req, Db) ->
    chttpd:validate_ctype(Req, "application/json"),
    {JsonDocIdRevs} = chttpd:json_body_obj(Req),
    case fabric:get_missing_revs(Db, JsonDocIdRevs) of
        {ok, Results} ->
            % possible ancestors (third element) are not part of this reply
            Missing = [
                {Id, couch_doc:revs_to_strs(Revs)}
             || {Id, Revs, _} <- Results
            ],
            send_json(Req, {[{missing_revs, {Missing}}]});
        {error, Reason} ->
            chttpd:send_error(Req, Reason)
    end;
db_req(#httpd{path_parts = [_, <<"_missing_revs">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "POST");
% POST /db/_revs_diff: like _missing_revs, but each document entry is an
% object with a `missing` list and, when there are any, a
% `possible_ancestors` list. Other methods are rejected.
db_req(#httpd{method = 'POST', path_parts = [_, <<"_revs_diff">>]} = Req, Db) ->
    chttpd:validate_ctype(Req, "application/json"),
    {JsonDocIdRevs} = chttpd:json_body_obj(Req),
    ToJson = fun({Id, MissingRevs, PossibleAncestors}) ->
        Missing = [{missing, couch_doc:revs_to_strs(MissingRevs)}],
        % the member is omitted entirely when there are no ancestors
        Ancestors =
            case PossibleAncestors of
                [] ->
                    [];
                _ ->
                    [{possible_ancestors, couch_doc:revs_to_strs(PossibleAncestors)}]
            end,
        {Id, {Missing ++ Ancestors}}
    end,
    case fabric:get_missing_revs(Db, JsonDocIdRevs) of
        {ok, Results} ->
            send_json(Req, {lists:map(ToJson, Results)});
        {error, Reason} ->
            chttpd:send_error(Req, Reason)
    end;
db_req(#httpd{path_parts = [_, <<"_revs_diff">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "POST");
% /db/_security
%   PUT  stores the JSON body as the security object, after checking that the
%        database's security may be edited; a non-ok result is thrown
%   GET  replies with the current security object
% Other methods are rejected.
db_req(
    #httpd{method = 'PUT', path_parts = [_, <<"_security">>], user_ctx = Ctx} = Req,
    Db
) ->
    validate_security_can_be_edited(?b2l(couch_db:name(Db))),
    SecObj = chttpd:json_body(Req),
    case fabric:set_security(Db, SecObj, [{user_ctx, Ctx}]) of
        ok -> send_json(Req, {[{<<"ok">>, true}]});
        Else -> throw(Else)
    end;
db_req(#httpd{method = 'GET', path_parts = [_, <<"_security">>]} = Req, Db) ->
    SecObj = fabric:get_security(Db),
    send_json(Req, SecObj);
db_req(#httpd{path_parts = [_, <<"_security">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "PUT,GET");
% /db/_revs_limit
%   PUT  sets the revision limit. The body must be a positive integer;
%        anything else is reported as a bad_request instead of being passed
%        to fabric unvalidated. A non-ok result from fabric is thrown so the
%        regular error handling can turn it into a response, the same way the
%        _purged_infos_limit endpoint does.
%   GET  replies with the current limit
% Other methods are rejected.
db_req(
    #httpd{method = 'PUT', path_parts = [_, <<"_revs_limit">>], user_ctx = Ctx} = Req,
    Db
) ->
    case chttpd:json_body(Req) of
        Limit when is_integer(Limit), Limit > 0 ->
            case fabric:set_revs_limit(Db, Limit, [{user_ctx, Ctx}]) of
                ok ->
                    send_json(Req, {[{<<"ok">>, true}]});
                Error ->
                    throw(Error)
            end;
        _ ->
            throw({bad_request, "`revs_limit` must be positive integer"})
    end;
db_req(#httpd{method = 'GET', path_parts = [_, <<"_revs_limit">>]} = Req, Db) ->
    send_json(Req, fabric:get_revs_limit(Db));
db_req(#httpd{path_parts = [_, <<"_revs_limit">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "PUT,GET");
% /db/_purged_infos_limit
%   PUT  sets the limit; the body must be a positive integer, otherwise a
%        bad_request is thrown. A non-ok result from fabric is thrown.
%   GET  replies with the current limit
% Other methods are rejected.
db_req(#httpd{method = 'PUT', path_parts = [_, <<"_purged_infos_limit">>]} = Req, Db) ->
    Options = [{user_ctx, Req#httpd.user_ctx}],
    Limit = chttpd:json_body(Req),
    case is_integer(Limit) andalso Limit > 0 of
        false ->
            throw({bad_request, "`purged_infos_limit` must be positive integer"});
        true ->
            case fabric:set_purge_infos_limit(Db, Limit, Options) of
                ok -> send_json(Req, {[{<<"ok">>, true}]});
                Error -> throw(Error)
            end
    end;
db_req(#httpd{method = 'GET', path_parts = [_, <<"_purged_infos_limit">>]} = Req, Db) ->
    Limit = fabric:get_purge_infos_limit(Db),
    send_json(Req, Limit);
db_req(#httpd{path_parts = [_, <<"_purged_infos_limit">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "GET,PUT");
% Design documents may be addressed with an unencoded slash
% (/db/_design/name), although slashes in document ids normally have to be
% URL encoded.
%
% A GET whose second path segment still contains the slash (i.e. it was sent
% as "_design%2F...") is redirected to the same raw path with the first
% "_design%2F" (matched case-insensitively) replaced by "_design/".
db_req(
    #httpd{
        method = 'GET', mochi_req = MochiReq, path_parts = [_DbName, <<"_design/", _/binary>> | _]
    } = Req,
    _Db
) ->
    RawPath = MochiReq:get(raw_path),
    [Prefix | Suffix] = re:split(RawPath, "_design%2F", [{return, list}, caseless]),
    chttpd:send_redirect(Req, Prefix ++ "_design/" ++ Suffix);
% /db/_design/Name is the design document itself ...
db_req(#httpd{path_parts = [_DbName, <<"_design">>, Name]} = Req, Db) ->
    DocId = <<"_design/", Name/binary>>,
    db_doc_req(Req, Db, DocId);
% ... and anything below it is one of its attachments.
db_req(#httpd{path_parts = [_DbName, <<"_design">>, Name | FileNameParts]} = Req, Db) ->
    DocId = <<"_design/", Name/binary>>,
    db_attachment_req(Req, Db, DocId, FileNameParts);
% Local documents can be addressed as /db/_local/name without encoding the
% slash as %2F. Requests that carry no document name, or that name an
% attachment, are refused with a bad_request.
db_req(#httpd{path_parts = [_DbName, Local]}, _Db) when
    Local =:= <<"_local">>; Local =:= <<"_local/">>
->
    throw({bad_request, <<"Invalid _local document id.">>});
db_req(#httpd{path_parts = [_DbName, <<"_local">>, Name]} = Req, Db) ->
    DocId = <<"_local/", Name/binary>>,
    db_doc_req(Req, Db, DocId);
db_req(#httpd{path_parts = [_DbName, <<"_local">> | _Rest]}, _Db) ->
    throw({bad_request, <<"_local documents do not accept attachments.">>});
% Everything that was not matched above is a regular document request:
% /db/DocId addresses the document, /db/DocId/Name... one of its attachments.
db_req(#httpd{path_parts = [_, DocId]} = Req, Db) ->
    db_doc_req(Req, Db, DocId);
db_req(#httpd{method = Method, path_parts = [_, DocId | FileNameParts]} = Req, Db) ->
    case Method of
        'DELETE' ->
            % read the request body before handling an attachment DELETE
            chttpd:body(Req);
        _ ->
            ok
    end,
    db_attachment_req(Req, Db, DocId, FileNameParts).
% Runs several _all_docs style queries in one request and streams all of
% their output into a single {"results":[ ... ]} JSON response.
%
% Req     - the request; its query string provides the base view arguments
% Db      - database handle
% OP      - endpoint name, used to select the namespace
% Queries - list of EJSON objects, one per query
multi_all_docs_view(Req, Db, OP, Queries) ->
    BaseArgs0 = couch_mrview_http:parse_params(Req, undefined),
    BaseArgs = BaseArgs0#mrargs{view_type = map},
    % every query starts from the base arguments and overrides them with its
    % own (already decoded) members
    ParseQuery = fun({Query}) ->
        Parsed = couch_mrview_http:parse_params(Query, undefined, BaseArgs, [decoded]),
        Validated = fabric_util:validate_all_docs_args(Db, Parsed),
        set_namespace(OP, Validated)
    end,
    ArgQueries = lists:map(ParseQuery, Queries),
    Options = [{user_ctx, Req#httpd.user_ctx}],
    {ok, Resp0} = chttpd:start_delayed_json_response(Req, 200, [], "{\"results\":["),
    VAcc0 = #vacc{db = Db, req = Req, prepend = "\r\n", resp = Resp0},
    % thread one accumulator through all queries so they share the response
    RunQuery = fun(Args, Acc) ->
        {ok, NextAcc} = fabric:all_docs(Db, Options, fun view_cb/2, Acc, Args),
        NextAcc
    end,
    VAcc1 = lists:foldl(RunQuery, VAcc0, ArgQueries),
    {ok, Resp1} = chttpd:send_delayed_chunk(VAcc1#vacc.resp, "\r\n]}"),
    chttpd:end_delayed_json_response(Resp1).
% Runs a single _all_docs style query and streams the result.
%
% Keys is the list of requested keys or `undefined`; OP is the endpoint name
% used to select the namespace. Returns {ok, Resp} where Resp is the response
% stored in the view accumulator.
all_docs_view(Req, Db, Keys, OP) ->
    Parsed = couch_mrview_http:parse_body_and_query(Req, Keys),
    Validated = fabric_util:validate_all_docs_args(Db, Parsed#mrargs{view_type = map}),
    Args = set_include_sysdocs(OP, Req, set_namespace(OP, Validated)),
    Options = [{user_ctx, Req#httpd.user_ctx}],
    VAcc0 = #vacc{
        db = Db,
        req = Req,
        threshold = chttpd:chunked_response_buffer_size()
    },
    {ok, VAcc} = fabric:all_docs(Db, Options, fun view_cb/2, VAcc0, Args),
    {ok, VAcc#vacc.resp}.
% View callback that updates the request statistics and then delegates to
% couch_mrview_http:view_cb/2. Every row counts as a row; rows that carry a
% `doc` member also count as a document read.
view_cb({row, Row} = Msg, Acc) ->
    lists:keymember(doc, 1, Row) andalso chttpd_stats:incr_reads(),
    chttpd_stats:incr_rows(),
    couch_mrview_http:view_cb(Msg, Acc);
view_cb(Msg, Acc) ->
    couch_mrview_http:view_cb(Msg, Acc).
% DELETE /db/DocId: deletes the document by writing a `_deleted` stub, using
% the `rev` query parameter as the revision when one is given.
db_doc_req(#httpd{method = 'DELETE'} = Req, Db, DocId) ->
    % check for the existence of the doc to handle the 404 case.
    couch_doc_open(Db, DocId, nil, []),
    Deleted = {<<"_deleted">>, true},
    Props =
        case chttpd:qs_value(Req, "rev") of
            undefined -> [Deleted];
            Rev -> [{<<"_rev">>, ?l2b(Rev)}, Deleted]
        end,
    Doc = couch_doc_from_req(Req, Db, DocId, {Props}),
    send_updated_doc(Req, Db, DocId, Doc);
% GET /db/DocId: returns a document.
%
% Without open_revs a single revision is opened and sent with send_doc/3.
% With open_revs the requested revisions are fetched through
% fabric:open_revs/4 and returned either as a JSON array (streamed chunk by
% chunk) or as multipart/mixed when the client accepts that content type.
db_doc_req(#httpd{method = 'GET', mochi_req = MochiReq} = Req, Db, DocId) ->
#doc_query_args{
rev = Rev0,
open_revs = Revs,
options = Options0,
atts_since = AttsSince
} = parse_doc_query(Req),
Options = [{user_ctx, Req#httpd.user_ctx} | Options0],
case Revs of
[] ->
% atts_since implies that attachments are included
Options2 =
if
AttsSince /= nil ->
[{atts_since, AttsSince}, attachments | Options];
true ->
Options
end,
Rev =
case lists:member(latest, Options) of
% couch_doc_open will open the winning rev despite of a rev passed
% https://docs.couchdb.org/en/stable/api/document/common.html?highlight=latest#get--db-docid
true -> nil;
false -> Rev0
end,
Doc = couch_doc_open(Db, DocId, Rev, Options2),
send_doc(Req, Doc, Options2);
_ ->
case fabric:open_revs(Db, DocId, Revs, Options) of
% open_revs=all on a document without any revision: report it as missing
{ok, []} when Revs == all ->
chttpd:send_error(Req, {not_found, missing});
{ok, Results} ->
chttpd_stats:incr_reads(length(Results)),
case MochiReq:accepts_content_type("multipart/mixed") of
false ->
{ok, Resp} = start_json_response(Req, 200),
send_chunk(Resp, "["),
% We loop through the docs. The first time through the separator
% is whitespace, then a comma on subsequent iterations.
lists:foldl(
fun(Result, AccSeparator) ->
case Result of
{ok, Doc} ->
JsonDoc = couch_doc:to_json_obj(Doc, Options),
Json = ?JSON_ENCODE({[{ok, JsonDoc}]}),
send_chunk(Resp, AccSeparator ++ Json);
% a requested revision that does not exist is listed as "missing"
{{not_found, missing}, RevId} ->
RevStr = couch_doc:rev_to_str(RevId),
Json = ?JSON_ENCODE({[{<<"missing">>, RevStr}]}),
send_chunk(Resp, AccSeparator ++ Json)
end,
% AccSeparator now has a comma
","
end,
"",
Results
),
send_chunk(Resp, "]"),
end_json_response(Resp);
true ->
send_docs_multipart(Req, Results, Options)
end;
{error, Error} ->
chttpd:send_error(Req, Error)
end
end;
% POST /db/DocId with a multipart/form-data body: uploads attachments through
% an HTML form.
%
% The document is taken from the `_doc` form field when present; otherwise
% the revision named by the `_rev` field is opened. Attachments from the
% `_attachments` fields replace existing attachments of the same name, all
% other existing attachments are kept. Replies 201 (ok) or 202 (accepted)
% with the new revision as ETag.
db_doc_req(#httpd{method = 'POST', user_ctx = Ctx} = Req, Db, DocId) ->
couch_httpd:validate_referer(Req),
couch_db:validate_docid(Db, DocId),
chttpd:validate_ctype(Req, "multipart/form-data"),
W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
Options = [{user_ctx, Ctx}, {w, W}],
Form = couch_httpd:parse_form(Req),
case proplists:is_defined("_doc", Form) of
true ->
Json = ?JSON_DECODE(couch_util:get_value("_doc", Form)),
Doc = couch_doc_from_req(Req, Db, DocId, Json);
false ->
% NOTE(review): a form with neither `_doc` nor `_rev` makes get_value
% return undefined here and list_to_binary/1 fail with badarg - confirm
% whether that should be reported as a bad request instead
Rev = couch_doc:parse_rev(list_to_binary(couch_util:get_value("_rev", Form))),
Doc =
% NOTE(review): only a found revision and {error, _} are matched; any
% other open_revs result ends in a case_clause error - confirm
case fabric:open_revs(Db, DocId, [Rev], []) of
{ok, [{ok, Doc0}]} ->
chttpd_stats:incr_reads(),
Doc0;
{error, Error} ->
throw(Error)
end
end,
% one attachment per `_attachments` form field
UpdatedAtts = [
couch_att:new([
{name, validate_attachment_name(Name)},
{type, list_to_binary(ContentType)},
{data, Content}
])
|| {Name, {ContentType, _}, Content} <-
proplists:get_all_values("_attachments", Form)
],
#doc{atts = OldAtts} = Doc,
% keep only the existing attachments that are not being replaced
OldAtts2 = lists:flatmap(
fun(Att) ->
OldName = couch_att:fetch(name, Att),
case [1 || A <- UpdatedAtts, couch_att:fetch(name, A) == OldName] of
% the attachment wasn't in the UpdatedAtts, return it
[] -> [Att];
% the attachment was in the UpdatedAtts, drop it
_ -> []
end
end,
OldAtts
),
NewDoc = Doc#doc{
atts = UpdatedAtts ++ OldAtts2
},
case fabric:update_doc(Db, NewDoc, Options) of
{ok, NewRev} ->
chttpd_stats:incr_writes(),
HttpCode = 201;
{accepted, NewRev} ->
chttpd_stats:incr_writes(),
HttpCode = 202
end,
send_json(
Req,
HttpCode,
% the ETag is the new revision wrapped in double quotes
[{"ETag", "\"" ++ ?b2l(couch_doc:rev_to_str(NewRev)) ++ "\""}],
{[
{ok, true},
{id, DocId},
{rev, couch_doc:rev_to_str(NewRev)}
]}
);
db_doc_req(#httpd{method = 'PUT', user_ctx = Ctx} = Req, Db, DocId) ->
#doc_query_args{
update_type = UpdateType
} = parse_doc_query(Req),
DbName = couch_db:name(Db),
couch_db:validate_docid(Db, DocId),
W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
Options = [{user_ctx, Ctx}, {w, W}],
Loc = absolute_uri(Req, [
$/,
couch_util:url_encode(DbName),
$/,
couch_util:url_encode(DocId)
]),
RespHeaders = [{"Location", Loc}],
case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of
("multipart/related;" ++ _) = ContentType ->
couch_httpd:check_max_request_length(Req),
couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(DbName), DocId)),
{ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(
ContentType,
fun() -> receive_request_data(Req) end
),
Doc = couch_doc_from_req(Req, Db, DocId, Doc0),
validate_revs(Doc, UpdateType =:= ?INTERACTIVE_EDIT),
try
{HttpCode, RespHeaders1, RespBody} = update_doc_req(
Req,
Db,
DocId,
Doc,
RespHeaders,
UpdateType
),
WaitFun(),
send_json(Req, HttpCode, RespHeaders1, RespBody)
catch
throw:Err ->
% Document rejected by a validate_doc_update function.
couch_httpd_multipart:abort_multipart_stream(Parser),
throw(Err)
end;
_Else ->
case chttpd:qs_value(Req, "batch") of
"ok" ->
% batch
Doc = couch_doc_from_req(Req, Db, DocId, chttpd:json_body(Req)),
validate_revs(Doc, UpdateType =:= ?INTERACTIVE_EDIT),
spawn(fun() ->
case catch (fabric:update_doc(Db, Doc, Options)) of
{ok, _} ->
chttpd_stats:incr_writes(),
ok;
{accepted, _} ->
chttpd_stats:incr_writes(),
ok;
Error ->
couch_log:notice("Batch doc error (~s): ~p", [DocId, Error])
end
end),
send_json(
Req,
202,
[],
{[
{ok, true},
{id, DocId}
]}
);
_Normal ->
% normal
Body = chttpd:json_body(Req),
Doc = couch_doc_from_req(Req, Db, DocId, Body),
validate_revs(Doc, UpdateType =:= ?INTERACTIVE_EDIT),
send_updated_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType)
end
end;
db_doc_req(#httpd{method = 'COPY', user_ctx = Ctx} = Req, Db, SourceDocId) ->
SourceRev =
case extract_header_rev(Req, chttpd:qs_value(Req, "rev")) of
missing_rev -> nil;
Rev -> Rev
end,
{TargetDocId0, TargetRevs} = couch_httpd_db:parse_copy_destination_header(Req),
TargetDocId = list_to_binary(chttpd:unquote(TargetDocId0)),
% open old doc
Doc = couch_doc_open(Db, SourceDocId, SourceRev, []),
% save new doc
case
fabric:update_doc(
Db,
Doc#doc{id = TargetDocId, revs = TargetRevs},
[{user_ctx, Ctx}]
)
of
{ok, NewTargetRev} ->
chttpd_stats:incr_writes(),
HttpCode = 201;
{accepted, NewTargetRev} ->
chttpd_stats:incr_writes(),
HttpCode = 202
end,
% respond
DbName = couch_db:name(Db),
{PartRes} = update_doc_result_to_json(TargetDocId, {ok, NewTargetRev}),
Loc = absolute_uri(
Req, "/" ++ couch_util:url_encode(DbName) ++ "/" ++ couch_util:url_encode(TargetDocId)
),
send_json(
Req,
HttpCode,
[
{"Location", Loc},
{"ETag", "\"" ++ ?b2l(couch_doc:rev_to_str(NewTargetRev)) ++ "\""}
],
{PartRes}
);
db_doc_req(Req, _Db, _DocId) ->
send_method_not_allowed(Req, "DELETE,GET,HEAD,POST,PUT,COPY").
% Send a single document as the response body.
%
% When the document carries no meta information, an ETag computed by
% couch_httpd:doc_etag/1 is attached and the response is routed through
% chttpd:etag_respond/3. When meta is present no ETag header is emitted.
send_doc(Req, Doc, Options) ->
    HasMeta = Doc#doc.meta =/= [],
    case HasMeta of
        true ->
            send_doc_efficiently(Req, Doc, [], Options);
        false ->
            Etag = couch_httpd:doc_etag(Doc),
            Respond = fun() ->
                send_doc_efficiently(Req, Doc, [{"ETag", Etag}], Options)
            end,
            chttpd:etag_respond(Req, Etag, Respond)
    end.
% Send a document with status 200, choosing the response format:
%   * no attachments on the doc, or `attachments` not in Options, or the
%     client does not accept multipart/related -> plain JSON;
%   * otherwise -> a multipart/related stream with a known content length.
% Attachment data processes are monitored for the duration of the multipart
% decision/stream and always demonitored afterwards.
send_doc_efficiently(Req, #doc{atts = []} = Doc, Headers, Options) ->
    send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options));
send_doc_efficiently(#httpd{mochi_req = MochiReq} = Req, #doc{atts = Atts} = Doc, Headers, Options) ->
    SendAsJson = fun() ->
        send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options))
    end,
    case lists:member(attachments, Options) of
        false ->
            SendAsJson();
        true ->
            Monitors = monitor_attachments(Atts),
            try
                case MochiReq:accepts_content_type("multipart/related") of
                    true ->
                        Boundary = couch_uuids:random(),
                        MultipartOpts = [attachments, follows, att_encoding_info | Options],
                        JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, MultipartOpts)),
                        {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
                            Boundary, JsonBytes, Atts, true
                        ),
                        RespHeaders = [{"Content-Type", ContentType} | Headers],
                        {ok, Resp} = start_response_length(Req, 200, RespHeaders, Len),
                        Write = fun(Data) -> couch_httpd:send(Resp, Data) end,
                        couch_doc:doc_to_multi_part_stream(
                            Boundary, JsonBytes, Atts, Write, true
                        );
                    false ->
                        SendAsJson()
                end
            after
                demonitor_refs(Monitors)
            end
    end.
% Stream the results for one _bulk_get document as parts of an already
% started chunked multipart response (Resp).
%
% Each found revision is emitted after an outer-boundary marker; documents
% that have non-stub attachments get multipart/related headers, others a plain
% application/json header. Missing revisions are emitted as a JSON error part.
send_docs_multipart_bulk_get(Results, Options0, OuterBoundary, Resp) ->
    InnerBoundary = bulk_get_multipart_boundary(),
    Options = [attachments, follows, att_encoding_info | Options0],
    Emit = fun(Data) -> couch_httpd:send_chunk(Resp, Data) end,
    SendResult = fun
        ({ok, #doc{id = Id, revs = Revs, atts = Atts} = Doc}) ->
            Monitors = monitor_attachments(Atts),
            try
                JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
                Emit(<<"\r\n--", OuterBoundary/binary>>),
                case non_stubbed_attachments(Atts) of
                    [] ->
                        Emit(<<"\r\nContent-Type: application/json\r\n\r\n">>);
                    _ ->
                        PartHeaders = bulk_get_multipart_headers(Revs, Id, InnerBoundary),
                        lists:foreach(Emit, PartHeaders)
                end,
                couch_doc:doc_to_multi_part_stream(
                    InnerBoundary, JsonBytes, Atts, Emit, true
                )
            after
                demonitor_refs(Monitors)
            end;
        ({{not_found, missing}, RevId}) ->
            RevStr = couch_doc:rev_to_str(RevId),
            ErrorJson = ?JSON_ENCODE(
                {[
                    {<<"rev">>, RevStr},
                    {<<"error">>, <<"not_found">>},
                    {<<"reason">>, <<"missing">>}
                ]}
            ),
            Emit([
                <<"\r\n--", OuterBoundary/binary>>,
                <<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
                ErrorJson
            ])
    end,
    lists:foreach(SendResult, Results).
% Respond to an open_revs request with a chunked multipart/mixed body.
%
% Every found revision becomes one part (itself a multipart stream delimited
% by InnerBoundary); every missing revision becomes a JSON part flagged with
% error="true". Parts are separated by OuterBoundary and the stream is closed
% with the terminating "--".
send_docs_multipart(Req, Results, Options1) ->
    OuterBoundary = couch_uuids:random(),
    InnerBoundary = couch_uuids:random(),
    Options = [attachments, follows, att_encoding_info | Options1],
    CType = {"Content-Type", "multipart/mixed; boundary=\"" ++ ?b2l(OuterBoundary) ++ "\""},
    {ok, Resp} = start_chunked_response(Req, 200, [CType]),
    Emit = fun(Data) -> couch_httpd:send_chunk(Resp, Data) end,
    Emit(<<"--", OuterBoundary/binary>>),
    SendResult = fun
        ({ok, #doc{atts = Atts} = Doc}) ->
            Monitors = monitor_attachments(Atts),
            try
                JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
                % Only the content type is needed here; the length is unused
                % because the outer response is chunked.
                {ContentType, _Len} = couch_doc:len_doc_to_multi_part_stream(
                    InnerBoundary, JsonBytes, Atts, true
                ),
                Emit(<<"\r\nContent-Type: ", ContentType/binary, "\r\n\r\n">>),
                couch_doc:doc_to_multi_part_stream(
                    InnerBoundary, JsonBytes, Atts, Emit, true
                ),
                Emit(<<"\r\n--", OuterBoundary/binary>>)
            after
                demonitor_refs(Monitors)
            end;
        ({{not_found, missing}, RevId}) ->
            RevStr = couch_doc:rev_to_str(RevId),
            MissingJson = ?JSON_ENCODE({[{<<"missing">>, RevStr}]}),
            Emit([
                <<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
                MissingJson,
                <<"\r\n--", OuterBoundary/binary>>
            ])
    end,
    lists:foreach(SendResult, Results),
    Emit(<<"--">>),
    couch_httpd:last_chunk(Resp).
% Build the per-document part headers used by multipart _bulk_get responses.
% Returns a list of binaries: always an X-Doc-Id header and a
% multipart/related Content-Type header carrying Boundary; an X-Rev-Id header
% is included when the document has at least one revision id.
bulk_get_multipart_headers({0, []}, Id, Boundary) ->
    DocIdHeader = <<"\r\nX-Doc-Id: ", Id/binary>>,
    CTypeHeader =
        <<"\r\nContent-Type: multipart/related; boundary=", Boundary/binary, "\r\n\r\n">>,
    [DocIdHeader, CTypeHeader];
bulk_get_multipart_headers({Start, [FirstRevId | _]}, Id, Boundary) ->
    RevStr = couch_doc:rev_to_str({Start, FirstRevId}),
    DocIdHeader = <<"\r\nX-Doc-Id: ", Id/binary>>,
    RevIdHeader = <<"\r\nX-Rev-Id: ", RevStr/binary>>,
    CTypeHeader =
        <<"\r\nContent-Type: multipart/related; boundary=", Boundary/binary, "\r\n\r\n">>,
    [DocIdHeader, RevIdHeader, CTypeHeader].
% Produce a fresh boundary binary: "--" followed by a random UUID.
bulk_get_multipart_boundary() ->
    <<"--", (couch_uuids:random())/binary>>.
% Start reading the request body; dispatches on the body length reported by
% chttpd:body_length/1 (either `chunked` or a byte count).
receive_request_data(Req) ->
    BodyLength = chttpd:body_length(Req),
    receive_request_data(Req, BodyLength).
% Read request body data and return `{Data, NextFun}` where NextFun/0 yields
% the next `{Data, NextFun}` pair.
%
% Chunked bodies: couch_httpd:recv_chunked/4 is run with a callback that
% re-posts every chunk to this process as `{chunk, Ref, Binary}`; the chunks
% are then handed out one at a time from the mailbox. Ref is unique to this
% call so only its own chunk messages are selected.
% NOTE(review): neither receive has an `after` clause, so asking for more
% chunks than were posted would block -- confirm callers stop in time.
%
% Known-length bodies: up to 4096 bytes are read per step and the remaining
% length is decremented by the size of what was actually received.
%
% Any other length (nothing left to read) throws <<"expected more data">>.
receive_request_data(Req, Len) when Len == chunked ->
Ref = make_ref(),
ChunkFun = fun({_Length, Binary}, _State) ->
self() ! {chunk, Ref, Binary}
end,
couch_httpd:recv_chunked(Req, 4096, ChunkFun, ok),
% Continuation: take the next queued chunk and return itself as the follow-up.
GetChunk = fun GC() ->
receive
{chunk, Ref, Binary} -> {Binary, GC}
end
end,
{
receive
{chunk, Ref, Binary} -> Binary
end,
GetChunk
};
receive_request_data(Req, LenLeft) when LenLeft > 0 ->
Len = erlang:min(4096, LenLeft),
Data = chttpd:recv(Req, Len),
{Data, fun() -> receive_request_data(Req, LenLeft - iolist_size(Data)) end};
receive_request_data(_Req, _) ->
throw(<<"expected more data">>).
% Render a failed update result as an EJSON object.
%   {error, _}          -> {error, reason}
%   {{Id, Rev}, Error}  -> {id, rev, error, reason}
% The error/reason strings come from chttpd:error_info/1; its status code is
% ignored here.
update_doc_result_to_json({error, _} = Failure) ->
    {_Code, ErrorStr, Reason} = chttpd:error_info(Failure),
    {[{error, ErrorStr}, {reason, Reason}]};
update_doc_result_to_json({{Id, Rev}, Failure}) ->
    {_Code, ErrorStr, Reason} = chttpd:error_info(Failure),
    RevStr = couch_doc:rev_to_str(Rev),
    {[{id, Id}, {rev, RevStr}, {error, ErrorStr}, {reason, Reason}]}.
% Render the outcome of updating one document as an EJSON object.
% The first argument may be a #doc{} record (its id is used) or a doc id.
%   {ok, Rev}        -> ok/id/rev
%   {accepted, Rev}  -> ok/id/rev plus accepted = true
%   anything else    -> id/error/reason via chttpd:error_info/1
update_doc_result_to_json(#doc{id = DocId}, Result) ->
    update_doc_result_to_json(DocId, Result);
update_doc_result_to_json(DocId, {ok, NewRev}) ->
    RevStr = couch_doc:rev_to_str(NewRev),
    {[{ok, true}, {id, DocId}, {rev, RevStr}]};
update_doc_result_to_json(DocId, {accepted, NewRev}) ->
    RevStr = couch_doc:rev_to_str(NewRev),
    {[{ok, true}, {id, DocId}, {rev, RevStr}, {accepted, true}]};
update_doc_result_to_json(DocId, Failure) ->
    {_Code, ErrorStr, Reason} = chttpd:error_info(Failure),
    {[{id, DocId}, {error, ErrorStr}, {reason, Reason}]}.
% Pair each purge request `{DocId, Revs}` with its result and build
% `{HttpCode, JsonProps}`.
%
% The code starts at 201 and is raised to 202 if any result was only
% accepted, or to the error's own code (from chttpd:error_info/1) when that
% is higher. A success/failure purge counter is bumped for every entry.
% Both lists must have the same length.
purge_results_to_json([], []) ->
    {201, []};
purge_results_to_json([{DocId, _Revs} | ReqRest], [{ok, Purged} | ResRest]) ->
    {Code, Tail} = purge_results_to_json(ReqRest, ResRest),
    couch_stats:increment_counter([couchdb, document_purges, success]),
    Entry = {DocId, couch_doc:revs_to_strs(Purged)},
    {Code, [Entry | Tail]};
purge_results_to_json([{DocId, _Revs} | ReqRest], [{accepted, Purged} | ResRest]) ->
    {Code, Tail} = purge_results_to_json(ReqRest, ResRest),
    couch_stats:increment_counter([couchdb, document_purges, success]),
    Entry = {DocId, couch_doc:revs_to_strs(Purged)},
    {erlang:max(Code, 202), [Entry | Tail]};
purge_results_to_json([{DocId, _Revs} | ReqRest], [Failure | ResRest]) ->
    {Code, Tail} = purge_results_to_json(ReqRest, ResRest),
    {FailureCode, ErrorStr, Reason} = chttpd:error_info(Failure),
    couch_stats:increment_counter([couchdb, document_purges, failure]),
    Entry = {DocId, {[{error, ErrorStr}, {reason, Reason}]}},
    {erlang:max(FailureCode, Code), [Entry | Tail]}.
% Update a document and send the JSON result, with no extra response headers.
send_updated_doc(Req, Db, DocId, Json) ->
    NoHeaders = [],
    send_updated_doc(Req, Db, DocId, Json, NoHeaders).
% Update a document as an interactive edit and send the JSON result together
% with the given response headers.
send_updated_doc(Req, Db, DocId, Doc, Headers) ->
    UpdateType = ?INTERACTIVE_EDIT,
    send_updated_doc(Req, Db, DocId, Doc, Headers, UpdateType).
% Perform the update via update_doc_req/6 and send its status code, headers
% and body as a JSON response.
send_updated_doc(Req, Db, DocId, Doc, Headers, Type) ->
    {HttpCode, RespHeaders, RespBody} = update_doc_req(Req, Db, DocId, Doc, Headers, Type),
    send_json(Req, HttpCode, RespHeaders, RespBody).
% Run a document update on behalf of an HTTP request.
%
% Update options are built from the request: the user context, the `w` query
% parameter (defaulting to the database quorum), the update type, and an
% optional full_commit/delay_commit flag taken from the X-Couch-Full-Commit
% header. Returns `{HttpCode, ResponseHeaders, Body}` with an ETag header
% prepended to Headers.
update_doc_req(Req, Db, DocId, Doc, Headers, UpdateType) ->
    #httpd{user_ctx = Ctx} = Req,
    W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
    BaseOptions = [UpdateType, {user_ctx, Ctx}, {w, W}],
    Options =
        case couch_httpd:header_value(Req, "X-Couch-Full-Commit") of
            "true" -> [full_commit | BaseOptions];
            "false" -> [delay_commit | BaseOptions];
            _ -> BaseOptions
        end,
    {Status, {etag, Etag}, Body} = update_doc(Db, DocId, Doc, Options),
    {http_code_from_status(Status), [{"ETag", Etag} | Headers], Body}.
% Map an update status atom to its HTTP status code:
% accepted -> 202, created -> 201, ok -> 200.
http_code_from_status(accepted) -> 202;
http_code_from_status(created) -> 201;
http_code_from_status(ok) -> 200.
% Update a document through fabric in a separate, monitored process and
% translate the outcome into `{Status, {etag, Etag}, Body}`.
%
% The worker wraps fabric:update_doc/3 so that its return value or any
% throw/error/exit is reported back through the exit reason and re-raised
% with the same class in the calling process.
%
% Status is `accepted` when fabric returned {accepted, Rev}; otherwise `ok`
% for a deleted document and `created` for any other. Body is the usual
% ok/id/rev EJSON object.
update_doc(Db, DocId, #doc{deleted = Deleted, body = DocBody} = Doc, Options) ->
    {_, Ref} = spawn_monitor(fun() ->
        try fabric:update_doc(Db, Doc, Options) of
            Resp ->
                exit({exit_ok, Resp})
        catch
            throw:Reason ->
                exit({exit_throw, Reason});
            error:Reason ->
                exit({exit_error, Reason});
            exit:Reason ->
                exit({exit_exit, Reason})
        end
    end),
    Result =
        receive
            {'DOWN', Ref, _, _, {exit_ok, Ret}} ->
                Ret;
            {'DOWN', Ref, _, _, {exit_throw, Reason}} ->
                throw(Reason);
            {'DOWN', Ref, _, _, {exit_error, Reason}} ->
                erlang:error(Reason);
            {'DOWN', Ref, _, _, {exit_exit, Reason}} ->
                erlang:exit(Reason);
            {'DOWN', Ref, _, _, Reason} ->
                % The worker terminated without going through its own
                % try/catch (for example it was killed). Without this clause
                % the caller would wait forever for a tagged reason.
                erlang:exit(Reason)
        end,
    case Result of
        {ok, NewRev} ->
            Accepted = false;
        {accepted, NewRev} ->
            Accepted = true
    end,
    Etag = couch_httpd:doc_etag(DocId, DocBody, NewRev),
    Status =
        case {Accepted, Deleted} of
            {true, _} ->
                accepted;
            {false, true} ->
                ok;
            {false, false} ->
                created
        end,
    NewRevStr = couch_doc:rev_to_str(NewRev),
    Body = {[{ok, true}, {id, DocId}, {rev, NewRevStr}]},
    {Status, {etag, Etag}, Body}.
% Build the #doc{} to be written for a request.
%
% Accepts either an already parsed #doc{} or an EJSON body (which is first
% converted and validated through couch_db:doc_from_json_obj_validate/2).
% Attachment names are validated, the doc id is forced to DocId, and the
% revision is reconciled between the body, the `rev` query parameter and the
% request headers (see extract_header_rev/2):
%   * body rev and query rev both present but different -> bad_request;
%   * no revision available from any source -> revs = {0, []}.
couch_doc_from_req(Req, _Db, DocId, #doc{revs = Revs} = Doc) ->
    validate_attachment_names(Doc),
    QueryRev =
        case chttpd:qs_value(Req, "rev") of
            undefined -> undefined;
            RevStr -> couch_doc:parse_rev(RevStr)
        end,
    NewRevs =
        case Revs of
            {Start, [RevId | _]} ->
                BodyRev = {Start, RevId},
                Mismatch = QueryRev /= undefined andalso QueryRev /= BodyRev,
                case Mismatch of
                    true ->
                        throw(
                            {bad_request,
                                "Document rev from request body and query "
                                "string have different values"}
                        );
                    false ->
                        % Also cross-checks the body rev against the headers.
                        case extract_header_rev(Req, BodyRev) of
                            missing_rev -> {0, []};
                            _ -> Revs
                        end
                end;
            _ ->
                case extract_header_rev(Req, QueryRev) of
                    missing_rev -> {0, []};
                    {Pos, HeaderRevId} -> {Pos, [HeaderRevId]}
                end
        end,
    Doc#doc{id = DocId, revs = NewRevs};
couch_doc_from_req(Req, Db, DocId, Json) ->
    Parsed = couch_db:doc_from_json_obj_validate(Db, Json),
    couch_doc_from_req(Req, Db, DocId, Parsed).
% Useful for debugging
% couch_doc_open(Db, DocId) ->
% couch_doc_open(Db, DocId, nil, []).
% Open a document and return the #doc{}, throwing on any failure.
%
% Rev = nil opens the most recent revision through fabric:open_doc/3; any
% other Rev opens exactly that revision through fabric:open_revs/4 (deletions
% come back as stubs). The database's user context is always added to the
% options. A successful open bumps the read counter.
couch_doc_open(Db, DocId, nil, Options0) ->
    Options = [{user_ctx, couch_db:get_user_ctx(Db)} | Options0],
    case fabric:open_doc(Db, DocId, Options) of
        {ok, Doc} ->
            chttpd_stats:incr_reads(),
            Doc;
        Failure ->
            throw(Failure)
    end;
couch_doc_open(Db, DocId, Rev, Options0) ->
    Options = [{user_ctx, couch_db:get_user_ctx(Db)} | Options0],
    case fabric:open_revs(Db, DocId, [Rev], Options) of
        {ok, [{ok, Doc}]} ->
            chttpd_stats:incr_reads(),
            Doc;
        {ok, [{{not_found, missing}, Rev}]} ->
            throw(not_found);
        {ok, [Other]} ->
            throw(Other);
        {error, Reason} ->
            throw(Reason)
    end.
% Return the attachment named FileName from Atts, or throw
% {not_found, ...} when no attachment has that name.
get_existing_attachment(Atts, FileName) ->
    HasName = fun(Att) -> couch_att:fetch(name, Att) == FileName end,
    case lists:filter(HasName, Atts) of
        [Att] -> Att;
        [] -> throw({not_found, "Document is missing attachment"})
    end.
% Attachment request handlers
% Handle requests on a single attachment (/db/doc/att/name...).
%
% GET: opens the document (honouring the doc query args), locates the
% attachment and streams it. The ETag is the base64 MD5 of the attachment
% when one is stored, otherwise the document ETag. Depending on the stored
% encoding and the encodings the client accepts, the body is served as
% stored, decoded on the fly, or as a chunked response when no correct
% Content-Length can be given. Range requests are honoured only for
% identity-encoded attachments (single range -> 206 with Content-Range,
% fewer than 10 ranges -> multipart/byteranges).
%
% PUT / DELETE: builds the new attachment (PUT) or none (DELETE), loads the
% target document revision (or starts a new doc when no rev is given),
% replaces/removes the attachment by name and writes the doc. DELETE requires
% the attachment to exist. PUT answers with 201/202 and a Location header;
% DELETE answers with 200.
%
% Any other method -> 405.
db_attachment_req(#httpd{method = 'GET', mochi_req = MochiReq} = Req, Db, DocId, FileNameParts) ->
    FileName = list_to_binary(
        mochiweb_util:join(
            lists:map(
                fun binary_to_list/1,
                FileNameParts
            ),
            "/"
        )
    ),
    #doc_query_args{
        rev = Rev,
        options = Options
    } = parse_doc_query(Req),
    #doc{
        atts = Atts
    } = Doc = couch_doc_open(Db, DocId, Rev, Options),
    Att = get_existing_attachment(Atts, FileName),
    [Type, Enc, DiskLen, AttLen, Md5] = couch_att:fetch(
        [type, encoding, disk_len, att_len, md5], Att
    ),
    Refs = monitor_attachments(Att),
    try
        Etag =
            case Md5 of
                <<>> -> chttpd:doc_etag(Doc);
                _ -> "\"" ++ ?b2l(base64:encode(Md5)) ++ "\""
            end,
        ReqAcceptsAttEnc = lists:member(
            atom_to_list(Enc),
            couch_httpd:accepted_encodings(Req)
        ),
        Headers0 =
            [
                {"ETag", Etag},
                {"Cache-Control", "must-revalidate"},
                {"Content-Type", binary_to_list(Type)}
            ] ++
                case ReqAcceptsAttEnc of
                    true when Enc =/= identity ->
                        % RFC 2616 says that the 'identity' encoding should not be used in
                        % the Content-Encoding header
                        [{"Content-Encoding", atom_to_list(Enc)}];
                    _ ->
                        []
                end ++
                case Enc of
                    identity ->
                        [{"Accept-Ranges", "bytes"}];
                    _ ->
                        [{"Accept-Ranges", "none"}]
                end,
        Headers = chttpd_util:maybe_add_csp_header("attachments", Headers0, "sandbox"),
        Len =
            case {Enc, ReqAcceptsAttEnc} of
                {identity, _} ->
                    % stored and served in identity form
                    DiskLen;
                {_, false} when DiskLen =/= AttLen ->
                    % Stored encoded, but client doesn't accept the encoding we used,
                    % so we need to decode on the fly. DiskLen is the identity length
                    % of the attachment.
                    DiskLen;
                {_, true} ->
                    % Stored and served encoded. AttLen is the encoded length.
                    AttLen;
                _ ->
                    % We received an encoded attachment and stored it as such, so we
                    % don't know the identity length. The client doesn't accept the
                    % encoding, and since we cannot serve a correct Content-Length
                    % header we'll fall back to a chunked response.
                    undefined
            end,
        AttFun =
            case ReqAcceptsAttEnc of
                false ->
                    fun couch_att:foldl_decode/3;
                true ->
                    fun couch_att:foldl/3
            end,
        chttpd:etag_respond(
            Req,
            Etag,
            fun() ->
                case Len of
                    undefined ->
                        {ok, Resp} = start_chunked_response(Req, 200, Headers),
                        AttFun(Att, fun(Seg, _) -> send_chunk(Resp, Seg) end, {ok, Resp}),
                        couch_httpd:last_chunk(Resp);
                    _ ->
                        Ranges = parse_ranges(MochiReq:get(range), Len),
                        case {Enc, Ranges} of
                            {identity, [{From, To}]} ->
                                Headers1 =
                                    [{"Content-Range", make_content_range(From, To, Len)}] ++
                                        Headers,
                                {ok, Resp} = start_response_length(
                                    Req, 206, Headers1, To - From + 1
                                ),
                                % range_foldl takes an exclusive end offset.
                                couch_att:range_foldl(
                                    Att,
                                    From,
                                    To + 1,
                                    fun(Seg, _) -> send(Resp, Seg) end,
                                    {ok, Resp}
                                );
                            {identity, Ranges} when is_list(Ranges) andalso length(Ranges) < 10 ->
                                send_ranges_multipart(Req, Type, Len, Att, Ranges);
                            _ ->
                                {ok, Resp} = start_response_length(Req, 200, Headers, Len),
                                AttFun(Att, fun(Seg, _) -> send(Resp, Seg) end, {ok, Resp})
                        end
                end
            end
        )
    after
        demonitor_refs(Refs)
    end;
db_attachment_req(#httpd{method = Method, user_ctx = Ctx} = Req, Db, DocId, FileNameParts) when
    (Method == 'PUT') or (Method == 'DELETE')
->
    FileName = validate_attachment_name(
        mochiweb_util:join(
            lists:map(
                fun binary_to_list/1,
                FileNameParts
            ),
            "/"
        )
    ),
    NewAtt =
        case Method of
            'DELETE' ->
                [];
            _ ->
                MimeType =
                    case couch_httpd:header_value(Req, "Content-Type") of
                        % We could throw an error here or guess by the FileName.
                        % Currently, just giving it a default.
                        undefined -> <<"application/octet-stream">>;
                        CType -> list_to_binary(CType)
                    end,
                Data = fabric:att_receiver(Req, couch_db:name(Db), chttpd:body_length(Req)),
                ContentLen =
                    case couch_httpd:header_value(Req, "Content-Length") of
                        undefined -> undefined;
                        Length -> list_to_integer(Length)
                    end,
                ContentEnc = string:to_lower(
                    string:strip(
                        couch_httpd:header_value(Req, "Content-Encoding", "identity")
                    )
                ),
                Encoding =
                    case ContentEnc of
                        "identity" ->
                            identity;
                        "gzip" ->
                            gzip;
                        _ ->
                            throw({
                                bad_ctype,
                                "Only gzip and identity content-encodings are supported"
                            })
                    end,
                [
                    couch_att:new([
                        {name, FileName},
                        {type, MimeType},
                        {data, Data},
                        {att_len, ContentLen},
                        {encoding, Encoding}
                    ])
                ]
        end,
    Doc =
        case extract_header_rev(Req, chttpd:qs_value(Req, "rev")) of
            % make the new doc
            missing_rev ->
                if
                    Method =/= 'DELETE' ->
                        ok;
                    true ->
                        % check for the existence of the doc and attachment
                        CurrDoc = #doc{} = couch_doc_open(Db, DocId, nil, []),
                        get_existing_attachment(CurrDoc#doc.atts, FileName)
                end,
                couch_db:validate_docid(Db, DocId),
                #doc{id = DocId};
            Rev ->
                case fabric:open_revs(Db, DocId, [Rev], [{user_ctx, Ctx}]) of
                    {ok, [{ok, Doc0}]} ->
                        chttpd_stats:incr_reads(),
                        if
                            Method =/= 'DELETE' ->
                                ok;
                            true ->
                                % check if attachment exists
                                get_existing_attachment(Doc0#doc.atts, FileName)
                        end,
                        Doc0;
                    {ok, [Error]} ->
                        throw(Error);
                    {error, Error} ->
                        throw(Error)
                end
        end,
    #doc{atts = Atts} = Doc,
    % Drop any existing attachment with the same name, then add the new one
    % (NewAtt is [] for DELETE).
    DocEdited = Doc#doc{
        atts = NewAtt ++ [A || A <- Atts, couch_att:fetch(name, A) /= FileName]
    },
    W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
    case fabric:update_doc(Db, DocEdited, [{user_ctx, Ctx}, {w, W}]) of
        {ok, UpdatedRev} ->
            chttpd_stats:incr_writes(),
            HttpCode = 201;
        {accepted, UpdatedRev} ->
            chttpd_stats:incr_writes(),
            HttpCode = 202
    end,
    erlang:put(mochiweb_request_recv, true),
    DbName = couch_db:name(Db),
    {Status, Headers} =
        case Method of
            'DELETE' ->
                {200, []};
            _ ->
                {HttpCode, [
                    {"Location",
                        absolute_uri(Req, [
                            $/,
                            % Encode the database name like the document PUT
                            % handler does, so names containing "/" yield a
                            % valid Location.
                            couch_util:url_encode(DbName),
                            $/,
                            couch_util:url_encode(DocId),
                            $/,
                            couch_util:url_encode(FileName)
                        ])}
                ]}
        end,
    send_json(
        Req,
        Status,
        Headers,
        {[
            {ok, true},
            {id, DocId},
            {rev, couch_doc:rev_to_str(UpdatedRev)}
        ]}
    );
db_attachment_req(Req, _Db, _DocId, _FileNameParts) ->
    send_method_not_allowed(Req, "DELETE,GET,HEAD,PUT").
% Answer a multi-range attachment request with a chunked 206
% multipart/byteranges response. Each `{From, To}` range becomes one part
% with its own Content-Type and Content-Range headers. Returns {ok, Resp}.
send_ranges_multipart(Req, ContentType, Len, Att, Ranges) ->
    Boundary = couch_uuids:random(),
    CType = {"Content-Type", "multipart/byteranges; boundary=\"" ++ ?b2l(Boundary) ++ "\""},
    {ok, Resp} = start_chunked_response(Req, 206, [CType]),
    couch_httpd:send_chunk(Resp, <<"--", Boundary/binary>>),
    SendSegment = fun(Seg, _) -> send_chunk(Resp, Seg) end,
    SendRange = fun({From, To}) ->
        ContentRange = make_content_range(From, To, Len),
        PartHeaders =
            <<"\r\nContent-Type: ", ContentType/binary, "\r\n", "Content-Range: ",
                ContentRange/binary, "\r\n", "\r\n">>,
        couch_httpd:send_chunk(Resp, PartHeaders),
        % range_foldl takes an exclusive end offset.
        couch_att:range_foldl(Att, From, To + 1, SendSegment, {ok, Resp}),
        couch_httpd:send_chunk(Resp, <<"\r\n--", Boundary/binary>>)
    end,
    lists:foreach(SendRange, Ranges),
    couch_httpd:send_chunk(Resp, <<"--">>),
    couch_httpd:last_chunk(Resp),
    {ok, Resp}.
% Normalise the parsed Range header for an entity of Len bytes.
% A missing header (`undefined`) or an unparsable one (`fail`) yields
% `undefined`; otherwise the range list is resolved by parse_ranges/3.
parse_ranges(Header, _Len) when Header =:= undefined; Header =:= fail ->
    undefined;
parse_ranges(Ranges, Len) ->
    parse_ranges(Ranges, Len, []).
% Resolve a list of `{From, To}` range specs against an entity of Len bytes.
%
% Returns the list of absolute, inclusive `{From, To}` pairs in request
% order, `undefined` when a range asks for the whole entity (`{0, none}`), or
% throws `requested_range_not_satisfiable` when a range ends before it
% starts (which includes ranges starting at or beyond Len).
%
% Spec forms:
%   {none, N}   - suffix range: the last N bytes. When N is larger than the
%                 entity the whole entity (from byte 0) is selected. This
%                 clause must come before the generic clamping clause,
%                 otherwise N >= Len would be clamped as if it were an end
%                 offset and the first byte would be dropped.
%   {From, none} - from From to the end.
%   {From, To}   - explicit range; To is clamped to Len - 1.
parse_ranges([], _Len, Acc) ->
    lists:reverse(Acc);
parse_ranges([{0, none} | _], _Len, _Acc) ->
    undefined;
parse_ranges([{none, Suffix} | Rest], Len, Acc) ->
    From = erlang:max(0, Len - Suffix),
    parse_ranges([{From, Len - 1} | Rest], Len, Acc);
parse_ranges([{From, To} | _], _Len, _Acc) when
    is_integer(From) andalso is_integer(To) andalso To < From
->
    throw(requested_range_not_satisfiable);
parse_ranges([{From, To} | Rest], Len, Acc) when
    is_integer(To) andalso To >= Len
->
    parse_ranges([{From, Len - 1} | Rest], Len, Acc);
parse_ranges([{From, none} | Rest], Len, Acc) ->
    parse_ranges([{From, Len - 1} | Rest], Len, Acc);
parse_ranges([{From, To} | Rest], Len, Acc) ->
    parse_ranges(Rest, Len, [{From, To} | Acc]).
% Format a Content-Range header value, e.g. <<"bytes 0-9/100">>, as a binary.
make_content_range(From, To, Len) ->
    Formatted = io_lib:format("bytes ~B-~B/~B", [From, To, Len]),
    ?l2b(Formatted).
% Fold all query string parameters of the request into a #doc_query_args{}
% record, starting from the record defaults.
parse_doc_query(Req) ->
    QueryParams = chttpd:qs(Req),
    lists:foldl(fun parse_doc_query/2, #doc_query_args{}, QueryParams).
% Collect the shard layout options `n`, `q` and `placement` for a request,
% falling back to the values in the "cluster" config section (n = 3 and
% q = 2 when not configured).
parse_shards_opt(Req) ->
    N = parse_shards_opt("n", Req, config:get_integer("cluster", "n", 3)),
    Q = parse_shards_opt("q", Req, config:get_integer("cluster", "q", 2)),
    Placement = parse_shards_opt("placement", Req, config:get("cluster", "placement")),
    [{n, N}, {q, Q}, {placement, Placement}].
% Read and validate one shard option from the query string.
%
% "placement": the value must be a comma separated list of `zone:n` rules
% where each n is a positive integer; the default is returned untouched when
% the parameter is absent. Any other Param: the value must be a positive
% integer. Invalid values throw {bad_request, Message}.
parse_shards_opt("placement", Req, Default) ->
    Err = <<"The `placement` value should be in a format `zone:n`.">>,
    case chttpd:qs_value(Req, "placement", Default) of
        Default ->
            Default;
        [] ->
            throw({bad_request, Err});
        Val ->
            IsValidRule = fun(Rule) ->
                [_Zone, Count] = string:tokens(Rule, ":"),
                couch_util:validate_positive_int(Count)
            end,
            % Any malformed rule (wrong shape or bad count) is reported as
            % the same bad_request.
            try lists:all(IsValidRule, string:tokens(Val, ",")) of
                true -> Val;
                false -> throw({bad_request, Err})
            catch
                _:_ ->
                    throw({bad_request, Err})
            end
    end;
parse_shards_opt(Param, Req, Default) ->
    Val = chttpd:qs_value(Req, Param, Default),
    Msg = ?l2b(["The `", Param, "` value should be a positive integer."]),
    case couch_util:validate_positive_int(Val) of
        false -> throw({bad_request, Msg});
        true -> Val
    end.
% Translate the optional `engine` query parameter into an option list.
% Returns [] when absent, [{engine, Binary}] when it names one of the
% extensions known to couch_server, and throws bad_request otherwise.
parse_engine_opt(Req) ->
    case chttpd:qs_value(Req, "engine") of
        undefined ->
            [];
        Extension ->
            IsKnown = lists:member(Extension, couch_server:get_engine_extensions()),
            case IsKnown of
                false -> throw({bad_request, invalid_engine_extension});
                true -> [{engine, iolist_to_binary(Extension)}]
            end
    end.
% Translate the optional `partitioned` query parameter into database
% creation options. Absent or "false" -> []; "true" -> partitioned options
% (after checking the feature is enabled); anything else -> bad_request.
parse_partitioned_opt(Req) ->
    case chttpd:qs_value(Req, "partitioned") of
        Off when Off =:= undefined; Off =:= "false" ->
            [];
        "true" ->
            ok = validate_partitioned_db_enabled(Req),
            [{partitioned, true}, {hash, [couch_partition, hash, []]}];
        _ ->
            throw({bad_request, <<"Invalid `partitioned` parameter">>})
    end.
% Return ok when the `partitioned` feature flag is enabled for this request,
% otherwise throw bad_request.
validate_partitioned_db_enabled(Req) ->
    Enabled = couch_flags:is_enabled(partitioned, Req),
    case Enabled of
        false -> throw({bad_request, <<"Partitioned feature is not enabled.">>});
        true -> ok
    end.
% Apply one query string `{Key, Value}` pair to a #doc_query_args{} record.
%
% Boolean flags are only honoured when their value is "true" and are added
% to the front of the options list. `rev`, `open_revs` and `atts_since` are
% parsed into revision terms, `new_edits` selects the update type, and `r` /
% `w` are passed through as options. Unknown pairs leave Args unchanged.
parse_doc_query({Key, Value}, Args) ->
    % Prepend Extra to the options accumulated so far.
    AddOptions = fun(Extra) ->
        Args#doc_query_args{options = Extra ++ Args#doc_query_args.options}
    end,
    case {Key, Value} of
        {"attachments", "true"} ->
            AddOptions([attachments]);
        {"meta", "true"} ->
            AddOptions([revs_info, conflicts, deleted_conflicts]);
        {"revs", "true"} ->
            AddOptions([revs]);
        {"local_seq", "true"} ->
            AddOptions([local_seq]);
        {"revs_info", "true"} ->
            AddOptions([revs_info]);
        {"conflicts", "true"} ->
            AddOptions([conflicts]);
        {"deleted", "true"} ->
            AddOptions([deleted]);
        {"deleted_conflicts", "true"} ->
            AddOptions([deleted_conflicts]);
        {"rev", Rev} ->
            Args#doc_query_args{rev = couch_doc:parse_rev(Rev)};
        {"open_revs", "all"} ->
            Args#doc_query_args{open_revs = all};
        {"open_revs", RevsJsonStr} ->
            OpenRevs = couch_doc:parse_revs(?JSON_DECODE(RevsJsonStr)),
            Args#doc_query_args{open_revs = OpenRevs};
        {"latest", "true"} ->
            AddOptions([latest]);
        {"atts_since", RevsJsonStr} ->
            AttsSince = couch_doc:parse_revs(?JSON_DECODE(RevsJsonStr)),
            Args#doc_query_args{atts_since = AttsSince};
        {"new_edits", "false"} ->
            Args#doc_query_args{update_type = ?REPLICATED_CHANGES};
        {"new_edits", "true"} ->
            Args#doc_query_args{update_type = ?INTERACTIVE_EDIT};
        {"att_encoding_info", "true"} ->
            AddOptions([att_encoding_info]);
        {"r", R} ->
            AddOptions([{r, R}]);
        {"w", W} ->
            AddOptions([{w, W}]);
        % unknown key value pair, ignore.
        _Else ->
            Args
    end.
% Build a #changes_args{} record from the request's query string.
%
% Keys are matched case-insensitively (they are lower-cased first); values
% are used as given. Unknown parameters are ignored. `heartbeat` and
% `seq_interval` must be positive integers, otherwise a bad_request is
% thrown. `seq_interval` is not stored in the record: it is kept in the
% process dictionary under `changes_seq_interval`, which is cleared at the
% start of every call so a value from an earlier call is not reused.
%
% For "eventsource" feeds a Last-Event-ID request header, when present,
% replaces whatever `since` value came from the query string.
parse_changes_query(Req) ->
erlang:erase(changes_seq_interval),
ChangesArgs = lists:foldl(
fun({Key, Value}, Args) ->
case {string:to_lower(Key), Value} of
{"feed", "live"} ->
%% sugar for continuous
Args#changes_args{feed = "continuous"};
{"feed", _} ->
Args#changes_args{feed = Value};
{"descending", "true"} ->
Args#changes_args{dir = rev};
{"since", _} ->
Args#changes_args{since = Value};
{"last-event-id", _} ->
Args#changes_args{since = Value};
{"limit", _} ->
Args#changes_args{limit = list_to_integer(Value)};
{"style", _} ->
% list_to_existing_atom/1 never creates new atoms from the query string.
Args#changes_args{style = list_to_existing_atom(Value)};
{"heartbeat", "true"} ->
Args#changes_args{heartbeat = true};
{"heartbeat", _} ->
try list_to_integer(Value) of
HeartbeatInteger when HeartbeatInteger > 0 ->
Args#changes_args{heartbeat = HeartbeatInteger};
_ ->
throw(
{bad_request,
<<"The heartbeat value should be a positive integer (in milliseconds).">>}
)
catch
error:badarg ->
throw(
{bad_request,
<<"Invalid heartbeat value. Expecting a positive integer value (in milliseconds).">>}
)
end;
{"timeout", _} ->
Args#changes_args{timeout = list_to_integer(Value)};
{"include_docs", "true"} ->
Args#changes_args{include_docs = true};
{"conflicts", "true"} ->
Args#changes_args{conflicts = true};
{"attachments", "true"} ->
Options = [attachments | Args#changes_args.doc_options],
Args#changes_args{doc_options = Options};
{"att_encoding_info", "true"} ->
Options = [att_encoding_info | Args#changes_args.doc_options],
Args#changes_args{doc_options = Options};
{"filter", _} ->
Args#changes_args{filter = Value};
{"seq_interval", _} ->
try list_to_integer(Value) of
V when V > 0 ->
% Stored in the process dictionary rather than in the record.
erlang:put(changes_seq_interval, V),
Args;
_ ->
throw({bad_request, invalid_seq_interval})
catch
error:badarg ->
throw({bad_request, invalid_seq_interval})
end;
% unknown key value pair, ignore.
_Else ->
Args
end
end,
#changes_args{},
chttpd:qs(Req)
),
%% if it's an EventSource request with a Last-event-ID header
%% that should override the `since` query string, since it's
%% probably the browser reconnecting.
case ChangesArgs#changes_args.feed of
"eventsource" ->
case couch_httpd:header_value(Req, "last-event-id") of
undefined ->
ChangesArgs;
Value ->
ChangesArgs#changes_args{since = Value}
end;
_ ->
ChangesArgs
end.
% Reconcile an explicitly supplied revision with the If-Match header.
%
% ExplicitRev may be `undefined`, an already parsed revision, or a
% string/binary that is parsed first. Returns `missing_rev` when neither
% source gives a revision, the single revision when only one source gives
% one, and the common value when both agree. Disagreement throws
% bad_request.
extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev); is_list(ExplicitRev) ->
    extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev));
extract_header_rev(Req, ExplicitRev) ->
    IfMatchRev =
        case chttpd:header_value(Req, "If-Match") of
            undefined ->
                undefined;
            Quoted ->
                % The header value is quoted like an ETag; strip the quotes.
                couch_doc:parse_rev(string:strip(Quoted, both, $"))
        end,
    case {ExplicitRev, IfMatchRev} of
        {undefined, undefined} ->
            missing_rev;
        {_, undefined} ->
            ExplicitRev;
        {undefined, _} ->
            IfMatchRev;
        _ when ExplicitRev == IfMatchRev ->
            IfMatchRev;
        _ ->
            throw({bad_request, "Document rev and etag have different values"})
    end.
% Refuse edits to the security object of the configured authentication
% database unless "couchdb"/"users_db_security_editable" is set to something
% other than "false". Returns ok or throws {forbidden, Msg}.
validate_security_can_be_edited(DbName) ->
    UsersDbName = config:get("chttpd_auth", "authentication_db", "_users"),
    Editable = config:get("couchdb", "users_db_security_editable", "false"),
    Locked = DbName =:= UsersDbName andalso Editable =:= "false",
    case Locked of
        true ->
            throw({forbidden, "You can't edit the security object of the user database."});
        false ->
            ok
    end.
% Check that a document carries revision information when required.
% The second argument is true for interactive edits (nothing to check) and
% false for `new_edits: false` writes, which must come with `_rev` or
% `_revisions`; otherwise bad_request is thrown.
validate_revs(#doc{revs = {0, []}}, false) ->
    Msg = ?l2b(
        "When `new_edits: false`, " ++
            "the document needs `_rev` or `_revisions` specified"
    ),
    throw({bad_request, Msg});
validate_revs(_Doc, IsInteractive) when is_boolean(IsInteractive) ->
    ok.
% Validate the name of every attachment on the document, in order.
% Returns ok, or throws on the first invalid name.
validate_attachment_names(Doc) ->
    _ = [validate_attachment_name(couch_att:fetch(name, Att)) || Att <- Doc#doc.atts],
    ok.
% Validate an attachment name and return it as a binary.
% Names given as strings are converted first. Names starting with "_" and
% names that are not valid UTF-8 are rejected with bad_request.
validate_attachment_name(Name) when is_list(Name) ->
    validate_attachment_name(list_to_binary(Name));
validate_attachment_name(<<$_, Rest/binary>>) ->
    Msg = <<"Attachment name '_", Rest/binary, "' starts with prohibited character '_'">>,
    throw({bad_request, Msg});
validate_attachment_name(Name) ->
    case couch_util:validate_utf8(Name) of
        false -> throw({bad_request, <<"Attachment name is not UTF-8 encoded">>});
        true -> Name
    end.
% Monitor the data process of every non-stub attachment and return the
% monitor references (most recently created first). Accepts a single
% attachment or a list. Attachments whose data is not an `{Fd, _}` pair are
% logged and skipped.
-spec monitor_attachments(couch_att:att() | [couch_att:att()]) -> [reference()].
monitor_attachments(Atts) when is_list(Atts) ->
    MonitorOne = fun(Att) ->
        case couch_att:fetch(data, Att) of
            {Fd, _} ->
                [monitor(process, Fd)];
            Else ->
                couch_log:error("~p from couch_att:fetch(data, ~p)", [Else, Att]),
                []
        end
    end,
    lists:reverse(lists:flatmap(MonitorOne, non_stubbed_attachments(Atts)));
monitor_attachments(Att) ->
    monitor_attachments([Att]).
% Remove every monitor in Refs; returns the list of demonitor/1 results.
demonitor_refs(Refs) when is_list(Refs) ->
    lists:map(fun erlang:demonitor/1, Refs).
% Keep only the attachments whose data is not the atom `stub`.
non_stubbed_attachments(Atts) when is_list(Atts) ->
    [Att || Att <- Atts, couch_att:fetch(data, Att) =/= stub].
% Store the document namespace for an _all_docs style endpoint in the view
% arguments: _all_docs -> undefined, _local_docs -> <<"_local">>,
% _design_docs -> <<"_design">>; any other value is stored as given.
set_namespace(Endpoint, #mrargs{} = Args) ->
    Namespace =
        case Endpoint of
            <<"_all_docs">> -> undefined;
            <<"_local_docs">> -> <<"_local">>;
            <<"_design_docs">> -> <<"_design">>;
            Other -> Other
        end,
    couch_mrview_util:set_extra(Args, namespace, Namespace).
% For _local_docs requests, record whether `include_system=true` was passed
% in the query string; other endpoints leave Args unchanged.
set_include_sysdocs(<<"_local_docs">>, Req, Args) ->
    Requested = chttpd:qs_value(Req, "include_system", "false"),
    IncludeSystem = Requested == "true",
    couch_mrview_util:set_extra(Args, include_system, IncludeSystem);
set_include_sysdocs(_OP, _Req, Args) ->
    Args.
%% /db/_bulk_get stuff
% Decide whether a _bulk_get response should be multipart: true only when
% the client does not accept application/json but accepts multipart/mixed or
% multipart/related.
bulk_get_is_multipart(#httpd{mochi_req = MochiReq}) ->
    AcceptsJson = MochiReq:accepts_content_type("application/json"),
    AcceptsMixed = MochiReq:accepts_content_type("multipart/mixed"),
    AcceptsRelated = MochiReq:accepts_content_type("multipart/related"),
    case AcceptsJson of
        true -> false;
        false -> AcceptsMixed orelse AcceptsRelated
    end.
% Fetch all documents requested by a _bulk_get call.
%
% ArgsMap maps a request reference to `{DocId, RevsOrError, DocOpts}`. The
% result is a map with the same references, where each value holds either
% the fetched revisions or an `{error, ...}` term.
bulk_get_docs(Db, #{} = ArgsMap, Options) ->
    % Query in doc id order, which may make the B-tree lookups a bit faster.
    SortKey = fun({Ref, {DocId, _, _}}) -> {DocId, Ref} end,
    InOrder = fun(A, B) -> SortKey(A) =< SortKey(B) end,
    ArgsList = lists:sort(InOrder, maps:to_list(ArgsMap)),
    % Requests already known to be errors are set aside and merged back into
    % the result at the end.
    IsFetchable = fun({_Ref, {_DocId, RevsOrError, _DocOpts}}) ->
        case RevsOrError of
            L when is_list(L) -> true;
            all -> true;
            {error, _} -> false
        end
    end,
    {ValidArgs, ErrorArgs} = lists:partition(IsFetchable, ArgsList),
    Responses =
        case config:get_boolean("chttpd", "bulk_get_use_batches", true) of
            true -> bulk_get_docs_batched(Db, ValidArgs, Options);
            false -> bulk_get_docs_individually(Db, ValidArgs, Options)
        end,
    Remap = fun
        ({Ref, {DocId, [], _}}) ->
            % An empty response is reported as a not_found error. This is a
            % peculiarity of the _bulk_get HTTP API: when no revision was
            % specified, `undefined` must be returned as the error revision.
            #{Ref := {_, Revs, _}} = ArgsMap,
            Error = {bulk_get_rev_error(Revs), <<"not_found">>, <<"missing">>},
            {Ref, {DocId, {error, Error}, []}};
        ({_Ref, {_DocId, [_ | _] = DocRevisions, _}} = RespTuple) ->
            chttpd_stats:incr_reads(length(DocRevisions)),
            RespTuple;
        ({_Ref, {_DocId, _Other, _}} = RespTuple) ->
            RespTuple
    end,
    % Recombine with the initially known errors and return as a map.
    maps:from_list(lists:map(Remap, Responses) ++ ErrorArgs).
% Fetch all requested documents with a single fabric:open_revs/3 call.
%
% Args is a list of {Ref, {DocId, Revs, DocOpts}} and the result is a list of
% {Ref, {DocId, ResponseOrError, DocOpts}} in the same order. Only the per-doc
% options are returned, same as in bulk_get_docs_individually/3: the response
% renderers combine them with the general Options themselves, so adding
% Options here as well would only duplicate them.
bulk_get_docs_batched(Db, Args, Options) when is_list(Args) ->
    % Args is [{Ref, {DocId, Revs, DocOpts}}, ...] but fabric:open_revs/3
    % accepts [{{DocId, Revs}, DocOpts}, ...] so we need to transform them
    ArgsFun = fun({_Ref, {DocId, Revs, DocOpts}}) ->
        {{DocId, Revs}, DocOpts}
    end,
    OpenRevsArgs = lists:map(ArgsFun, Args),
    case fabric:open_revs(Db, OpenRevsArgs, Options) of
        {ok, Responses} ->
            % Responses are matched to the request args by position
            ZipFun = fun({Ref, {DocId, _Rev, DocOpts}}, Response) ->
                {Ref, {DocId, Response, DocOpts}}
            end,
            lists:zipwith(ZipFun, Args, Responses);
        {error, Error} ->
            % Distribute error to all request args, so it looks like they
            % individually failed with that error
            MapFun = fun({Ref, {DocId, Revs, _DocOpts}}) ->
                RevStr = bulk_get_rev_error(Revs),
                Tag = internal_fabric_error,
                % This error will be emitted as json so make sure it's rendered
                % to a string first.
                Reason = couch_util:to_binary(Error),
                {Ref, {DocId, {error, {RevStr, Tag, Reason}}, []}}
            end,
            lists:map(MapFun, Args)
    end.
% Fetch the requested documents one at a time, with a separate
% fabric:open_revs/4 call per request. Args is a list of
% {Ref, {DocId, Revs, DocOpts}}; the result has the same order with Revs
% replaced by the response or by an error tuple.
bulk_get_docs_individually(Db, Args, Options) when is_list(Args) ->
    OpenOne = fun({Ref, {DocId, Revs, DocOpts}}) ->
        Outcome =
            case fabric:open_revs(Db, DocId, Revs, DocOpts ++ Options) of
                {ok, Response} ->
                    {DocId, Response, DocOpts};
                {error, Error} ->
                    RevStr = bulk_get_rev_error(Revs),
                    % The error ends up in the json response so it has to be
                    % rendered to a string first.
                    Reason = couch_util:to_binary(Error),
                    {DocId, {error, {RevStr, internal_fabric_error, Reason}}, []}
            end,
        {Ref, Outcome}
    end,
    [OpenOne(Arg) || Arg <- Args].
% Send the _bulk_get results as a single JSON response with status 200. The
% entries of the "results" array follow the order of ArgsRefs.
bulk_get_ret_json(#httpd{} = Req, ArgsRefs, ResultsMap, Options) ->
    ToResult = fun(Ref) ->
        #{Ref := {DocId, Result, DocOpts}} = ResultsMap,
        % Some of the general options might affect how the document is
        % encoded into json, so they are combined with the doc options
        Docs = bulk_get_result(DocId, Result, DocOpts ++ Options),
        #{<<"id">> => DocId, <<"docs">> => Docs}
    end,
    Results = [ToResult(Ref) || Ref <- ArgsRefs],
    send_json(Req, 200, #{<<"results">> => Results}).
% Build the "docs" list of one _bulk_get result. A request-level error turns
% into a single error object; otherwise every fetched revision becomes either
% an "ok" object holding the document json or an error object.
bulk_get_result(DocId, {error, {Rev, Error, Reason}}, _Options) ->
    [bulk_get_json_error_map(DocId, Rev, Error, Reason)];
bulk_get_result(DocId, [_ | _] = DocRevs, Options) ->
    ToJson = fun
        ({{Error, Reason}, RevId}) ->
            RevStr = couch_doc:rev_to_str(RevId),
            bulk_get_json_error_map(DocId, RevStr, Error, Reason);
        ({ok, Doc}) ->
            #{<<"ok">> => couch_doc:to_json_obj(Doc, Options)}
    end,
    [ToJson(DocRev) || DocRev <- DocRevs].
% Build the error object of a _bulk_get result: a map with a single "error"
% key whose value holds the doc id, revision, error and reason.
bulk_get_json_error_map(DocId, Rev, Error, Reason) ->
    Details = #{
        <<"id">> => DocId,
        <<"rev">> => Rev,
        <<"error">> => Error,
        <<"reason">> => Reason
    },
    #{<<"error">> => Details}.
% Send the _bulk_get results as a chunked multipart response. The multipart
% type is "multipart/mixed" if the client accepts it and "multipart/related"
% otherwise. Parts are sent in the order of ArgsRefs; request-level errors are
% sent as json parts flagged with error="true".
bulk_get_ret_multipart(#httpd{mochi_req = MochiReq} = Req, ArgsRefs, ResultsMap, Options) ->
    MpType =
        case MochiReq:accepts_content_type("multipart/mixed") of
            true -> "multipart/mixed";
            false -> "multipart/related"
        end,
    Boundary = bulk_get_multipart_boundary(),
    CTypeValue = lists:append([MpType, "; boundary=\"", ?b2l(Boundary), "\""]),
    {ok, Resp} = start_chunked_response(Req, 200, [{"Content-Type", CTypeValue}]),
    SendError = fun(DocId, {RevId, Error, Reason}) ->
        EJson = bulk_get_json_error_map(DocId, RevId, Error, Reason),
        % Only the inner error object is sent, without the "error" wrapper
        Json = ?JSON_ENCODE(map_get(<<"error">>, EJson)),
        ErrCType = <<"Content-Type: application/json">>,
        Prefix = <<"\r\n", ErrCType/binary, "; error=\"true\"\r\n\r\n">>,
        couch_httpd:send_chunk(Resp, [<<"\r\n--", Boundary/binary>>, Prefix, Json])
    end,
    SendPart = fun(Ref) ->
        #{Ref := {DocId, Result, DocOpts}} = ResultsMap,
        case Result of
            [_ | _] = DocRevs ->
                send_docs_multipart_bulk_get(DocRevs, DocOpts ++ Options, Boundary, Resp);
            {error, ErrorInfo} ->
                SendError(DocId, ErrorInfo)
        end
    end,
    lists:foreach(SendPart, ArgsRefs),
    case ArgsRefs of
        [_ | _] ->
            % At least one part was sent, so the closing boundary is needed
            couch_httpd:send_chunk(Resp, <<"\r\n", "--", Boundary/binary, "--\r\n">>);
        [] ->
            % Nothing was sent, so there is no need for a closing boundary
            ok
    end,
    couch_httpd:last_chunk(Resp).
% Parse the query string of a _bulk_get request into a #doc_query_args{}
% record. Every parameter name is checked with validate_query_param/1 before
% it is parsed.
bulk_get_parse_doc_query(Req) ->
    ParseParam = fun({Key, _Value} = Param, QueryArgs) ->
        ok = validate_query_param(Key),
        parse_doc_query(Param, QueryArgs)
    end,
    lists:foldl(ParseParam, #doc_query_args{}, chttpd:qs(Req)).
% Check a _bulk_get query string parameter name. Throws a bad_request for the
% parameters which are not acceptable in the query string and returns ok for
% everything else.
validate_query_param(Key) ->
    Rejected = ["open_revs", "new_edits", "w", "rev", "atts_since"],
    case lists:member(Key, Rejected) of
        true -> throw_bad_query_param(Key);
        false -> ok
    end.
% Throw a bad_request error naming the offending query parameter. The
% parameter name can be given either as a binary or as a string.
throw_bad_query_param(Key) when is_binary(Key) ->
    throw({bad_request, <<"\"", Key/binary, "\" query parameter is not acceptable">>});
throw_bad_query_param(Key) when is_list(Key) ->
    throw_bad_query_param(?l2b(Key)).
% Parse and tag the _bulk_get arguments. Returns {Refs, ArgsMap} where Refs is
% the list of argument tags in the same order as the documents were provided
% and ArgsMap is a map of #{Ref => {DocId, RevOrError, DocOpts}}. The list is
% what allows returning the results in the exact same order as requested.
%
bulk_get_parse_args(Db, Docs) ->
    TagDoc = fun(Doc, {Refs, ArgsMap}) ->
        Ref = make_ref(),
        {_DocId, _RevOrError, _DocOpts} = Arg = bulk_get_parse_arg(Db, Doc),
        {[Ref | Refs], ArgsMap#{Ref => Arg}}
    end,
    % Folding from the right keeps the refs in the order of Docs
    lists:foldr(TagDoc, {[], #{}}, Docs).
% Parse a single entry of the "docs" array. Only EJSON objects with at least
% one property are parsed further; anything else is a bad_request error with
% a null doc id.
bulk_get_parse_arg(Db, Doc) ->
    case Doc of
        {[_ | _] = Props} ->
            bulk_get_parse_doc_id(Db, Props);
        _Invalid ->
            {null, {error, {null, bad_request, <<"document must be a JSON object">>}}, []}
    end.
% Extract and validate the "id" property of a _bulk_get entry and continue
% with parsing its revisions. A missing id is a bad_request error; a
% {Tag, Reason} pair thrown during validation or revision parsing becomes an
% error for that doc id.
bulk_get_parse_doc_id(Db, [_ | _] = Props) ->
    MaybeDocId = couch_util:get_value(<<"id">>, Props),
    case MaybeDocId of
        undefined ->
            {null, {error, {null, bad_request, <<"document id missed">>}}, []};
        DocId ->
            try
                couch_db:validate_docid(Db, DocId),
                bulk_get_parse_revs(Props, DocId)
            catch
                throw:{Tag, Why} ->
                    {DocId, {error, {null, Tag, Why}}, []}
            end
    end.
% Parse the optional "rev" property of a _bulk_get entry. Without a rev all
% revisions are requested (`all`), otherwise a single element list with the
% parsed revision. An invalid rev turns the entry into an error.
bulk_get_parse_revs(Props, DocId) ->
    RevStr = couch_util:get_value(<<"rev">>, Props),
    case parse_field(<<"rev">>, RevStr) of
        {error, {RevStr, _Error, _Reason} = RevError} ->
            {DocId, {error, RevError}, []};
        {ok, undefined} ->
            bulk_get_parse_atts_since(Props, DocId, all);
        {ok, ParsedRev} ->
            bulk_get_parse_atts_since(Props, DocId, [ParsedRev])
    end.
% Parse the optional "atts_since" property of a _bulk_get entry and finish
% building the {DocId, Revs, DocOpts} argument. A non-empty list of revisions
% adds the atts_since and attachments doc options.
bulk_get_parse_atts_since(Props, DocId, Revs) ->
    Raw = couch_util:get_value(<<"atts_since">>, Props),
    case parse_field(<<"atts_since">>, Raw) of
        {error, {_BadRev, _Error, _Reason} = AttsError} ->
            {DocId, {error, AttsError}, []};
        {ok, []} ->
            {DocId, Revs, []};
        {ok, SinceRevs} ->
            {DocId, Revs, [{atts_since, SinceRevs}, attachments]}
    end.
% Parse the value of a "rev" or "atts_since" field of a _bulk_get entry.
% Returns {ok, Parsed} or {error, {BadValue, bad_request, Reason}}. A missing
% rev parses to `undefined`; a missing or empty atts_since parses to [].
parse_field(<<"rev">>, undefined) ->
    {ok, undefined};
parse_field(<<"rev">>, RevStr) ->
    try couch_doc:parse_rev(RevStr) of
        Rev -> {ok, Rev}
    catch
        throw:{bad_request, Reason} ->
            {error, {RevStr, bad_request, Reason}}
    end;
parse_field(<<"atts_since">>, Empty) when Empty =:= undefined; Empty =:= [] ->
    {ok, []};
parse_field(<<"atts_since">>, RevStrs) when is_list(RevStrs) ->
    parse_atts_since(RevStrs, []);
parse_field(<<"atts_since">>, Invalid) ->
    {error, {Invalid, bad_request, <<"att_since value must be array of revs.">>}}.
% Parse a list of revision strings, accumulating the parsed revisions in
% reverse. Returns {ok, Revs} in the original order, or the error of the
% first revision which failed to parse.
parse_atts_since([RevStr | Remaining], Parsed) ->
    case parse_field(<<"rev">>, RevStr) of
        {error, _} = Failure ->
            Failure;
        {ok, Rev} ->
            parse_atts_since(Remaining, [Rev | Parsed])
    end;
parse_atts_since([], Parsed) ->
    {ok, lists:reverse(Parsed)}.
% Return the revision string to report in a _bulk_get error: the string form
% of the single requested revision, or `undefined` if no revision was
% specified in the request.
bulk_get_rev_error([{Pos, RevId} = Rev]) when is_integer(Pos) andalso is_binary(RevId) ->
    couch_doc:rev_to_str(Rev);
bulk_get_rev_error(all) ->
    <<"undefined">>.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
% Stub attachments have nothing to monitor, so no monitors are expected.
monitor_attachments_test_() ->
    {"ignore stubs", fun() ->
        Atts = [couch_att:new([{data, stub}])],
        % This runs inside a plain test fun, so the assertion has to be
        % executed right here. The underscore-prefixed ?_assertEqual would
        % only build a test object which is then thrown away, and the test
        % would pass no matter what monitor_attachments/1 returned.
        ?assertEqual([], monitor_attachments(Atts))
    end}.
% Test generator for parse_partitioned_opt/1. setup/0 and teardown/1 are run
% around every test in the list.
parse_partitioned_opt_test_() ->
    Tests = [
        t_should_allow_partitioned_db(),
        t_should_throw_on_not_allowed_partitioned_db(),
        t_returns_empty_array_for_partitioned_false(),
        t_returns_empty_array_for_no_partitioned_qs()
    ],
    {foreach, fun setup/0, fun teardown/1, Tests}.
% Test generator for parse_shards_opt/1 covering the q, n and placement
% parameters. setup/0 and teardown/1 are run around every test in the list.
parse_shards_opt_test_() ->
    QTests = [
        t_should_allow_valid_q(),
        t_should_default_on_missing_q(),
        t_should_throw_on_invalid_q()
    ],
    NTests = [
        t_should_allow_valid_n(),
        t_should_default_on_missing_n(),
        t_should_throw_on_invalid_n()
    ],
    PlacementTests = [
        t_should_allow_valid_placement(),
        t_should_default_on_missing_placement(),
        t_should_throw_on_invalid_placement()
    ],
    {foreach, fun setup/0, fun teardown/1, QTests ++ NTests ++ PlacementTests}.
% Test fixture setup: mock the config module so that the boolean, float and
% integer getters always return the default value they were called with.
setup() ->
    test_util:mock(config),
    ReturnDefault = fun(_, _, Default) -> Default end,
    lists:foreach(
        fun(Getter) -> meck:expect(config, Getter, ReturnDefault) end,
        [get_boolean, get_float, get_integer]
    ),
    ok.
% Test fixture cleanup: unload all the mocked modules. The value returned by
% setup/0 is not needed.
teardown(_SetupResult) ->
    meck:unload().
% Build an #httpd{} record wrapping a mochiweb 'PUT' request for the given
% URL. The request has no socket (nil) and only carries a "Host" header.
mock_request(Url) ->
    HostHeader = {"Host", "examples.com"},
    #httpd{
        mochi_req = mochiweb_request:new(
            nil, 'PUT', Url, {1, 1}, mochiweb_headers:make([HostHeader])
        )
    }.
% With the feature flag check mocked to return true, "partitioned=true" must
% yield a two element option list starting with {partitioned, true}.
t_should_allow_partitioned_db() ->
    ?_test(begin
        meck:expect(couch_flags, is_enabled, 2, true),
        Req = mock_request("/all-test21?partitioned=true"),
        [Partitioned, _] = parse_partitioned_opt(Req),
        % ?assertEqual takes the expected value first; this keeps the
        % expected/actual labels of a failure report correct.
        ?assertEqual({partitioned, true}, Partitioned)
    end).
% With the feature flag check mocked to return false, "partitioned=true" must
% be rejected with a bad_request.
t_should_throw_on_not_allowed_partitioned_db() ->
    Expected = {bad_request, <<"Partitioned feature is not enabled.">>},
    ?_test(begin
        % The mock has to be installed when the test runs, not when the test
        % object is built
        meck:expect(couch_flags, is_enabled, 2, false),
        ?assertThrow(
            Expected,
            parse_partitioned_opt(mock_request("/all-test21?partitioned=true"))
        )
    end).
% "partitioned=false" must not produce any options.
t_returns_empty_array_for_partitioned_false() ->
    ?_test(begin
        Req = mock_request("/all-test21?partitioned=false"),
        % ?assertEqual takes the expected value first; this keeps the
        % expected/actual labels of a failure report correct.
        ?assertEqual([], parse_partitioned_opt(Req))
    end).
% A request without the "partitioned" parameter must not produce any options.
t_returns_empty_array_for_no_partitioned_qs() ->
    ?_test(begin
        Req = mock_request("/all-test21"),
        % ?assertEqual takes the expected value first; this keeps the
        % expected/actual labels of a failure report correct.
        ?assertEqual([], parse_partitioned_opt(Req))
    end).
% "q=1" must end up in the options as the string "1".
t_should_allow_valid_q() ->
    ?_assertEqual(
        "1",
        couch_util:get_value(q, parse_shards_opt(mock_request("/all-test21?q=1")))
    ).
% Without a "q" parameter the q option is expected to be 2.
t_should_default_on_missing_q() ->
    ?_assertEqual(
        2,
        couch_util:get_value(q, parse_shards_opt(mock_request("/all-test21")))
    ).
% An empty "q" parameter must be rejected with a bad_request.
t_should_throw_on_invalid_q() ->
    Expected = {bad_request, <<"The `q` value should be a positive integer.">>},
    ?_assertThrow(Expected, parse_shards_opt(mock_request("/all-test21?q="))).
% "n=1" must end up in the options as the string "1".
t_should_allow_valid_n() ->
    ?_assertEqual(
        "1",
        couch_util:get_value(n, parse_shards_opt(mock_request("/all-test21?n=1")))
    ).
% Without an "n" parameter the n option is expected to be 3.
t_should_default_on_missing_n() ->
    ?_assertEqual(
        3,
        couch_util:get_value(n, parse_shards_opt(mock_request("/all-test21")))
    ).
% An empty "n" parameter must be rejected with a bad_request.
t_should_throw_on_invalid_n() ->
    Expected = {bad_request, <<"The `n` value should be a positive integer.">>},
    ?_assertThrow(Expected, parse_shards_opt(mock_request("/all-test21?n="))).
% Valid "placement" values, with one or with several zones, must end up in
% the options unchanged.
t_should_allow_valid_placement() ->
    Cases = [
        {"single zone", "az:1"},
        {"multi zone", "az:1,co:3"}
    ],
    Tests = [
        {Title,
            ?_assertEqual(
                Placement,
                couch_util:get_value(
                    placement,
                    parse_shards_opt(mock_request("/all-test21?placement=" ++ Placement))
                )
            )}
     || {Title, Placement} <- Cases
    ],
    {foreach, fun() -> ok end, Tests}.
% Without a "placement" parameter the placement option must be undefined.
t_should_default_on_missing_placement() ->
    ?_assertEqual(
        undefined,
        couch_util:get_value(placement, parse_shards_opt(mock_request("/all-test21")))
    ).
% Malformed "placement" values must all be rejected with the same
% bad_request error.
t_should_throw_on_invalid_placement() ->
    Err = <<"The `placement` value should be in a format `zone:n`.">>,
    Cases = [
        {"empty placement", "/all-test21?placement="},
        {"invalid format", "/all-test21?placement=moon"},
        {"invalid n", "/all-test21?placement=moon:eagle"},
        {"one invalid zone", "/all-test21?placement=az:1,co:moon"}
    ],
    Tests = [
        {Title, ?_assertThrow({bad_request, Err}, parse_shards_opt(mock_request(Url)))}
     || {Title, Url} <- Cases
    ],
    {foreach, fun() -> ok end, Tests}.
-endif.