% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(chttpd_db).

-compile(tuple_calls).

-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("mem3/include/mem3.hrl").

-export([
    handle_request/1,
    handle_compact_req/2,
    handle_design_req/2,
    db_req/2,
    couch_doc_open/4,
    handle_changes_req/2,
    update_doc_result_to_json/1, update_doc_result_to_json/2,
    handle_design_info_req/3,
    handle_view_cleanup_req/2,
    update_doc/4,
    http_code_from_status/1,
    handle_partition_req/2
]).

-import(
    chttpd,
    [
        send_json/2, send_json/3, send_json/4,
        send_method_not_allowed/2,
        start_json_response/2,
        send_chunk/2,
        end_json_response/1,
        start_chunked_response/3,
        absolute_uri/2,
        send/2,
        start_response_length/4
    ]
).

-record(doc_query_args, {
    options = [],
    rev = nil,
    open_revs = [],
    update_type = ?INTERACTIVE_EDIT,
    atts_since = nil
}).

% Accumulator for changes_callback function
-record(cacc, {
    etag,
    feed,
    mochi,
    prepend = "",
    responding = false,
    chunks_sent = 0,
    buffer = [],
    bufsize = 0,
    threshold,
    websocket
}).

-define(IS_ALL_DOCS(T),
    (T == <<"_all_docs">> orelse
        T == <<"_local_docs">> orelse
        T == <<"_design_docs">>)
).

-define(IS_MANGO(T),
    (T == <<"_index">> orelse
        T == <<"_find">> orelse
        T == <<"_explain">>)
).

% Database request handlers
handle_request(#httpd{path_parts = [DbName | RestParts], method = Method} = Req) ->
    case {Method, RestParts} of
        {'PUT', []} ->
            create_db_req(Req, DbName);
        {'DELETE', []} ->
            % if we get ?rev=... the user is using a faulty script where the
            % document id is empty by accident. Let them recover safely.
            case chttpd:qs_value(Req, "rev", false) of
                false ->
                    delete_db_req(Req, DbName);
                _Rev ->
                    throw(
                        {bad_request,
                            "You tried to DELETE a database with a ?=rev parameter. " ++
                                "Did you mean to DELETE a document instead?"}
                    )
            end;
        {_, []} ->
            do_db_req(Req, fun db_req/2);
        {_, [SecondPart | _]} ->
            Handler = chttpd_handlers:db_handler(SecondPart, fun db_req/2),
            do_db_req(Req, Handler)
    end.

handle_changes_req(#httpd{method = 'POST'} = Req, Db) ->
    chttpd:validate_ctype(Req, "application/json"),
    case chttpd:body_length(Req) of
        0 ->
            handle_changes_req1(Req, Db);
        _ ->
            {JsonProps} = chttpd:json_body_obj(Req),
            handle_changes_req1(Req#httpd{req_body = {JsonProps}}, Db)
    end;
handle_changes_req(#httpd{method = 'GET'} = Req, Db) ->
    handle_changes_req1(Req, Db);
handle_changes_req(#httpd{path_parts = [_, <<"_changes">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "GET,POST,HEAD").

handle_changes_req1(#httpd{} = Req, Db) ->
    #changes_args{filter = Raw, style = Style} = Args0 = parse_changes_query(Req),
    ChangesArgs = Args0#changes_args{
        filter_fun = couch_changes:configure_filter(Raw, Style, Req, Db),
        db_open_options = [{user_ctx, couch_db:get_user_ctx(Db)}]
    },
    Max = chttpd:chunked_response_buffer_size(),
    case ChangesArgs#changes_args.feed of
        "normal" ->
            T0 = os:timestamp(),
            {ok, Info} = fabric:get_db_info(Db),
            Suffix = mem3:shard_suffix(Db),
            Etag = chttpd:make_etag({Info, Suffix}),
            DeltaT = timer:now_diff(os:timestamp(), T0) / 1000,
            couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
            chttpd:etag_respond(Req, Etag, fun() ->
                Acc0 = #cacc{
                    feed = normal,
                    etag = Etag,
                    mochi = Req,
                    threshold = Max
                },
                fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
            end);
        Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource" ->
            couch_stats:increment_counter([couchdb, httpd, clients_requesting_changes]),
            Acc0 = #cacc{
                feed = list_to_atom(Feed),
                mochi = Req,
                threshold = Max,
                websocket = websocket_upgrade_requested(Req)
            },
            try
                fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
            after
                couch_stats:decrement_counter([couchdb, httpd, clients_requesting_changes])
            end;
        _ ->
            Msg = <<"Supported `feed` types: normal, continuous, live, longpoll, eventsource">>,
            throw({bad_request, Msg})
    end.

% websocket callbacks
changes_callback(start, #cacc{feed = continuous, websocket = true} = Acc) ->
    {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(
        Acc#cacc.mochi#httpd.mochi_req, fun client_ws_loop/3),
    {ok, Acc#cacc{websocket = {ws, ReentryWs, ReplyChannel}}};
changes_callback({change, Change}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
    chttpd_stats:incr_rows(),
    ReplyChannel(?JSON_ENCODE(Change)),
    {ok, Acc};
changes_callback(timeout, #cacc{websocket = {ws, _ReentryWs, _ReplyChannel}} = Acc) ->
    ws_pong(Acc),
    {ok, Acc};
changes_callback(waiting_for_updates, #cacc{websocket = {ws, _ReentryWs, _ReplyChannel}} = Acc) ->
    {ok, Acc};
changes_callback({error, Reason}, #cacc{websocket = {ws, _ReentryWs, _ReplyChannel}} = Acc) ->
    {_Code, ErrorStr, ReasonStr} = chttpd:error_info(Reason),
    Row =
        {[
            {<<"error">>, ErrorStr},
            {<<"reason">>, ReasonStr}
         ]},
    ws_close(Acc, 1011, ?JSON_ENCODE(Row));
changes_callback({stop, EndSeq, Pending}, #cacc{websocket = {ws, _ReentryWs, ReplyChannel}} = Acc) ->
    Row =
        {[
            {<<"last_seq">>, EndSeq},
            {<<"pending">>, Pending}
        ]},
    ReplyChannel(?JSON_ENCODE(Row)),
    ws_close(Acc, 1000);

% callbacks for continuous feed (newline-delimited JSON Objects)
changes_callback(start, #cacc{feed = continuous} = Acc) ->
    {ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200),
    {ok, Acc#cacc{mochi = Resp, responding = true}};
changes_callback({change, Change}, #cacc{feed = continuous} = Acc) ->
    chttpd_stats:incr_rows(),
    Data = [?JSON_ENCODE(Change) | "\n"],
    Len = iolist_size(Data),
    maybe_flush_changes_feed(Acc, Data, Len);
changes_callback({stop, EndSeq, Pending}, #cacc{feed = continuous} = Acc) ->
    #cacc{mochi = Resp, buffer = Buf} = Acc,
    Row =
        {[
            {<<"last_seq">>, EndSeq},
            {<<"pending">>, Pending}
        ]},
    Data = [Buf, ?JSON_ENCODE(Row) | "\n"],
    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Data),
    chttpd:end_delayed_json_response(Resp1);
% callbacks for eventsource feed (newline-delimited eventsource Objects)
changes_callback(start, #cacc{feed = eventsource} = Acc) ->
    #cacc{mochi = Req} = Acc,
    Headers = [
        {"Content-Type", "text/event-stream"},
        {"Cache-Control", "no-cache"}
    ],
    {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, Headers),
    {ok, Acc#cacc{mochi = Resp, responding = true}};
changes_callback({change, {ChangeProp} = Change}, #cacc{feed = eventsource} = Acc) ->
    chttpd_stats:incr_rows(),
    Seq = proplists:get_value(seq, ChangeProp),
    Chunk = [
        "data: ",
        ?JSON_ENCODE(Change),
        "\n",
        "id: ",
        ?JSON_ENCODE(Seq),
        "\n\n"
    ],
    Len = iolist_size(Chunk),
    maybe_flush_changes_feed(Acc, Chunk, Len);
changes_callback(timeout, #cacc{feed = eventsource} = Acc) ->
    #cacc{mochi = Resp, chunks_sent = ChunksSet} = Acc,
    Chunk = "event: heartbeat\ndata: \n\n",
    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
    {ok, Acc#cacc{mochi = Resp1, chunks_sent = ChunksSet + 1}};
changes_callback({stop, _EndSeq, _Pending}, #cacc{feed = eventsource} = Acc) ->
    #cacc{mochi = Resp, buffer = Buf} = Acc,
    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
    chttpd:end_delayed_json_response(Resp1);
% callbacks for longpoll and normal (single JSON Object)
changes_callback(start, #cacc{feed = normal} = Acc) ->
    #cacc{etag = Etag, mochi = Req} = Acc,
    FirstChunk = "{\"results\":[\n",
    {ok, Resp} = chttpd:start_delayed_json_response(
        Req,
        200,
        [{"ETag", Etag}],
        FirstChunk
    ),
    {ok, Acc#cacc{mochi = Resp, responding = true}};
changes_callback(start, Acc) ->
    #cacc{mochi = Req} = Acc,
    FirstChunk = "{\"results\":[\n",
    {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
    {ok, Acc#cacc{mochi = Resp, responding = true}};
changes_callback({change, Change}, Acc) ->
    chttpd_stats:incr_rows(),
    Data = [Acc#cacc.prepend, ?JSON_ENCODE(Change)],
    Len = iolist_size(Data),
    maybe_flush_changes_feed(Acc, Data, Len);
changes_callback({stop, EndSeq, Pending}, Acc) ->
    #cacc{buffer = Buf, mochi = Resp, threshold = Max} = Acc,
    Terminator = [
        "\n],\n\"last_seq\":",
        ?JSON_ENCODE(EndSeq),
        ",\"pending\":",
        ?JSON_ENCODE(Pending),
        "}\n"
    ],
    {ok, Resp1} = chttpd:close_delayed_json_object(Resp, Buf, Terminator, Max),
    chttpd:end_delayed_json_response(Resp1);
changes_callback(waiting_for_updates, #cacc{buffer = []} = Acc) ->
    #cacc{mochi = Resp, chunks_sent = ChunksSent} = Acc,
    case ChunksSent > 0 of
        true ->
            {ok, Acc};
        false ->
            {ok, Resp1} = chttpd:send_delayed_chunk(Resp, <<"\n">>),
            {ok, Acc#cacc{mochi = Resp1, chunks_sent = 1}}
    end;
changes_callback(waiting_for_updates, Acc) ->
    #cacc{buffer = Buf, mochi = Resp, chunks_sent = ChunksSent} = Acc,
    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Buf),
    {ok, Acc#cacc{
        buffer = [],
        bufsize = 0,
        mochi = Resp1,
        chunks_sent = ChunksSent + 1
    }};
changes_callback(timeout, Acc) ->
    #cacc{mochi = Resp, chunks_sent = ChunksSent} = Acc,
    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"),
    {ok, Acc#cacc{mochi = Resp1, chunks_sent = ChunksSent + 1}};
changes_callback({error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
    #cacc{mochi = Req} = Acc,
    chttpd:send_error(Req, Reason);
changes_callback({error, Reason}, #cacc{feed = normal, responding = false} = Acc) ->
    #cacc{mochi = Req} = Acc,
    chttpd:send_error(Req, Reason);
changes_callback({error, Reason}, Acc) ->
    chttpd:send_delayed_error(Acc#cacc.mochi, Reason).

maybe_flush_changes_feed(#cacc{bufsize = Size, threshold = Max} = Acc, Data, Len) when
    Size > 0 andalso (Size + Len) > Max
->
    #cacc{buffer = Buffer, mochi = Resp} = Acc,
    {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
    {ok, Acc#cacc{prepend = ",\r\n", buffer = Data, bufsize = Len, mochi = R1}};
maybe_flush_changes_feed(Acc0, Data, Len) ->
    #cacc{buffer = Buf, bufsize = Size, chunks_sent = ChunksSent} = Acc0,
    Acc = Acc0#cacc{
        prepend = ",\r\n",
        buffer = [Buf | Data],
        bufsize = Size + Len,
        chunks_sent = ChunksSent + 1
    },
    {ok, Acc}.

handle_compact_req(#httpd{method = 'POST'} = Req, Db) ->
    chttpd:validate_ctype(Req, "application/json"),
    case Req#httpd.path_parts of
        [_DbName, <<"_compact">>] ->
            ok = fabric:compact(Db),
            send_json(Req, 202, {[{ok, true}]});
        [DbName, <<"_compact">>, DesignName | _] ->
            case ddoc_cache:open(DbName, <<"_design/", DesignName/binary>>) of
                {ok, _DDoc} ->
                    ok = fabric:compact(Db, DesignName),
                    send_json(Req, 202, {[{ok, true}]});
                Error ->
                    throw(Error)
            end
    end;
handle_compact_req(Req, _Db) ->
    send_method_not_allowed(Req, "POST").

handle_view_cleanup_req(#httpd{method = 'POST'} = Req, Db) ->
    ok = fabric:cleanup_index_files_all_nodes(Db),
    send_json(Req, 202, {[{ok, true}]});
handle_view_cleanup_req(Req, _Db) ->
    send_method_not_allowed(Req, "POST").

handle_partition_req(#httpd{path_parts = [_, _]} = _Req, _Db) ->
    throw({bad_request, invalid_partition_req});
handle_partition_req(#httpd{method = 'GET', path_parts = [_, _, PartId]} = Req, Db) ->
    couch_partition:validate_partition(PartId),
    case couch_db:is_partitioned(Db) of
        true ->
            {ok, PartitionInfo} = fabric:get_partition_info(Db, PartId),
            send_json(Req, {PartitionInfo});
        false ->
            throw({bad_request, <<"database is not partitioned">>})
    end;
handle_partition_req(
    #httpd{
        method = 'POST',
        path_parts = [_, <<"_partition">>, <<"_", _/binary>>]
    },
    _Db
) ->
    Msg = <<"Partition must not start with an underscore">>,
    throw({illegal_partition, Msg});
handle_partition_req(#httpd{path_parts = [_, _, _]} = Req, _Db) ->
    send_method_not_allowed(Req, "GET");
handle_partition_req(#httpd{path_parts = [DbName, _, PartId | Rest]} = Req, Db) ->
    case couch_db:is_partitioned(Db) of
        true ->
            couch_partition:validate_partition(PartId),
            QS = chttpd:qs(Req),
            PartIdStr = ?b2l(PartId),
            QSPartIdStr = couch_util:get_value("partition", QS, PartIdStr),
            if
                QSPartIdStr == PartIdStr ->
                    ok;
                true ->
                    Msg = <<"Conflicting value for `partition` in query string">>,
                    throw({bad_request, Msg})
            end,
            NewQS = lists:ukeysort(1, [{"partition", PartIdStr} | QS]),
            NewReq = Req#httpd{
                path_parts = [DbName | Rest],
                qs = NewQS
            },
            update_partition_stats(Rest),
            case Rest of
                [OP | _] when OP == <<"_all_docs">> orelse ?IS_MANGO(OP) ->
                    case chttpd_handlers:db_handler(OP, fun db_req/2) of
                        Handler when is_function(Handler, 2) ->
                            Handler(NewReq, Db);
                        _ ->
                            chttpd:send_error(Req, not_found)
                    end;
                [<<"_design">>, _Name, <<"_", _/binary>> | _] ->
                    handle_design_req(NewReq, Db);
                _ ->
                    chttpd:send_error(Req, not_found)
            end;
        false ->
            throw({bad_request, <<"database is not partitioned">>})
    end;
handle_partition_req(Req, _Db) ->
    chttpd:send_error(Req, not_found).

update_partition_stats(PathParts) ->
    case PathParts of
        [<<"_design">> | _] ->
            couch_stats:increment_counter([couchdb, httpd, partition_view_requests]);
        [<<"_all_docs">> | _] ->
            couch_stats:increment_counter([couchdb, httpd, partition_all_docs_requests]);
        [<<"_find">> | _] ->
            couch_stats:increment_counter([couchdb, httpd, partition_find_requests]);
        [<<"_explain">> | _] ->
            couch_stats:increment_counter([couchdb, httpd, partition_explain_requests]);
        _ ->
            % ignore path that do not match
            ok
    end.

handle_design_req(
    #httpd{
        path_parts = [_DbName, _Design, Name, <<"_", _/binary>> = Action | _Rest]
    } = Req,
    Db
) ->
    DbName = mem3:dbname(couch_db:name(Db)),
    case ddoc_cache:open(DbName, <<"_design/", Name/binary>>) of
        {ok, DDoc} ->
            Handler = chttpd_handlers:design_handler(Action, fun bad_action_req/3),
            Handler(Req, Db, DDoc);
        Error ->
            throw(Error)
    end;
handle_design_req(Req, Db) ->
    db_req(Req, Db).

bad_action_req(#httpd{path_parts = [_, _, Name | FileNameParts]} = Req, Db, _DDoc) ->
    db_attachment_req(Req, Db, <<"_design/", Name/binary>>, FileNameParts).

handle_design_info_req(#httpd{method = 'GET'} = Req, Db, #doc{} = DDoc) ->
    [_, _, Name, _] = Req#httpd.path_parts,
    {ok, GroupInfoList} = fabric:get_view_group_info(Db, DDoc),
    send_json(
        Req,
        200,
        {[
            {name, Name},
            {view_index, {GroupInfoList}}
        ]}
    );
handle_design_info_req(Req, _Db, _DDoc) ->
    send_method_not_allowed(Req, "GET").

create_db_req(#httpd{} = Req, DbName) ->
    couch_httpd:verify_is_server_admin(Req),
    ShardsOpt = parse_shards_opt(Req),
    EngineOpt = parse_engine_opt(Req),
    DbProps = parse_partitioned_opt(Req),
    Options = lists:append([ShardsOpt, [{props, DbProps}], EngineOpt]),
    DocUrl = absolute_uri(Req, "/" ++ couch_util:url_encode(DbName)),
    case fabric:create_db(DbName, Options) of
        ok ->
            send_json(Req, 201, [{"Location", DocUrl}], {[{ok, true}]});
        accepted ->
            send_json(Req, 202, [{"Location", DocUrl}], {[{ok, true}]});
        {error, file_exists} ->
            chttpd:send_error(Req, file_exists);
        Error ->
            throw(Error)
    end.

delete_db_req(#httpd{} = Req, DbName) ->
    couch_httpd:verify_is_server_admin(Req),
    case fabric:delete_db(DbName, []) of
        ok ->
            send_json(Req, 200, {[{ok, true}]});
        accepted ->
            send_json(Req, 202, {[{ok, true}]});
        Error ->
            throw(Error)
    end.

do_db_req(#httpd{path_parts = [DbName | _], user_ctx = Ctx} = Req, Fun) ->
    Shard = hd(mem3:shards(DbName)),
    Props = couch_util:get_value(props, Shard#shard.opts, []),
    Opts =
        case Ctx of
            undefined ->
                [{props, Props}];
            #user_ctx{} ->
                [{user_ctx, Ctx}, {props, Props}]
        end,
    {ok, Db} = couch_db:clustered_db(DbName, Opts),
    Fun(Req, Db).

db_req(#httpd{method = 'GET', path_parts = [DbName]} = Req, _Db) ->
    % measure the time required to generate the etag, see if it's worth it
    T0 = os:timestamp(),
    {ok, DbInfo} = fabric:get_db_info(DbName),
    DeltaT = timer:now_diff(os:timestamp(), T0) / 1000,
    couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
    send_json(Req, {DbInfo});
db_req(#httpd{method = 'POST', path_parts = [DbName], user_ctx = Ctx} = Req, Db) ->
    chttpd:validate_ctype(Req, "application/json"),

    W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
    Options = [{user_ctx, Ctx}, {w, W}],

    Doc = couch_db:doc_from_json_obj_validate(Db, chttpd:json_body(Req)),
    validate_attachment_names(Doc),
    Doc2 =
        case Doc#doc.id of
            <<"">> ->
                Doc#doc{id = couch_uuids:new(), revs = {0, []}};
            _ ->
                Doc
        end,
    DocId = Doc2#doc.id,
    case chttpd:qs_value(Req, "batch") of
        "ok" ->
            % async_batching
            spawn(fun() ->
                case catch (fabric:update_doc(Db, Doc2, Options)) of
                    {ok, _} ->
                        chttpd_stats:incr_writes(),
                        ok;
                    {accepted, _} ->
                        chttpd_stats:incr_writes(),
                        ok;
                    Error ->
                        couch_log:debug("Batch doc error (~s): ~p", [DocId, Error])
                end
            end),

            send_json(
                Req,
                202,
                [],
                {[
                    {ok, true},
                    {id, DocId}
                ]}
            );
        _Normal ->
            % normal
            DocUrl = absolute_uri(Req, [
                $/,
                couch_util:url_encode(DbName),
                $/,
                couch_util:url_encode(DocId)
            ]),
            case fabric:update_doc(Db, Doc2, Options) of
                {ok, NewRev} ->
                    chttpd_stats:incr_writes(),
                    HttpCode = 201;
                {accepted, NewRev} ->
                    chttpd_stats:incr_writes(),
                    HttpCode = 202
            end,
            send_json(
                Req,
                HttpCode,
                [{"Location", DocUrl}],
                {[
                    {ok, true},
                    {id, DocId},
                    {rev, couch_doc:rev_to_str(NewRev)}
                ]}
            )
    end;
db_req(#httpd{path_parts = [_DbName]} = Req, _Db) ->
    send_method_not_allowed(Req, "DELETE,GET,HEAD,POST");
db_req(
    #httpd{
        method = 'POST',
        path_parts = [DbName, <<"_ensure_full_commit">>],
        user_ctx = Ctx
    } = Req,
    _Db
) ->
    chttpd:validate_ctype(Req, "application/json"),
    %% use fabric call to trigger a database_does_not_exist exception
    %% for missing databases that'd return error 404 from chttpd
    %% get_security used to prefer shards on the same node over other nodes
    fabric:get_security(DbName, [{user_ctx, Ctx}]),
    CreationTime = mem3:shard_creation_time(DbName),
    send_json(
        Req,
        201,
        {[
            {ok, true},
            {instance_start_time, CreationTime}
        ]}
    );
db_req(#httpd{path_parts = [_, <<"_ensure_full_commit">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "POST");
db_req(#httpd{method = 'POST', path_parts = [_, <<"_bulk_docs">>], user_ctx = Ctx} = Req, Db) ->
    couch_stats:increment_counter([couchdb, httpd, bulk_requests]),
    chttpd:validate_ctype(Req, "application/json"),
    {JsonProps} = chttpd:json_body_obj(Req),
    DocsArray =
        case couch_util:get_value(<<"docs">>, JsonProps) of
            undefined ->
                throw({bad_request, <<"POST body must include `docs` parameter.">>});
            DocsArray0 when not is_list(DocsArray0) ->
                throw({bad_request, <<"`docs` parameter must be an array.">>});
            DocsArray0 ->
                DocsArray0
        end,
    couch_stats:update_histogram([couchdb, httpd, bulk_docs], length(DocsArray)),
    W =
        case couch_util:get_value(<<"w">>, JsonProps) of
            Value when is_integer(Value) ->
                integer_to_list(Value);
            _ ->
                chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db)))
        end,
    case chttpd:header_value(Req, "X-Couch-Full-Commit") of
        "true" ->
            Options = [full_commit, {user_ctx, Ctx}, {w, W}];
        "false" ->
            Options = [delay_commit, {user_ctx, Ctx}, {w, W}];
        _ ->
            Options = [{user_ctx, Ctx}, {w, W}]
    end,
    NewEdits = couch_util:get_value(<<"new_edits">>, JsonProps, true),
    Docs = lists:map(
        fun(JsonObj) ->
            Doc = couch_db:doc_from_json_obj_validate(Db, JsonObj),
            validate_revs(Doc, NewEdits),
            validate_attachment_names(Doc),
            case Doc#doc.id of
                <<>> -> Doc#doc{id = couch_uuids:new()};
                _ -> Doc
            end
        end,
        DocsArray
    ),
    case NewEdits of
        true ->
            Options2 =
                case couch_util:get_value(<<"all_or_nothing">>, JsonProps) of
                    true -> [all_or_nothing | Options];
                    _ -> Options
                end,
            case fabric:update_docs(Db, Docs, Options2) of
                {ok, Results} ->
                    % output the results
                    chttpd_stats:incr_writes(length(Results)),
                    DocResults = lists:zipwith(
                        fun update_doc_result_to_json/2,
                        Docs,
                        Results
                    ),
                    send_json(Req, 201, DocResults);
                {accepted, Results} ->
                    % output the results
                    chttpd_stats:incr_writes(length(Results)),
                    DocResults = lists:zipwith(
                        fun update_doc_result_to_json/2,
                        Docs,
                        Results
                    ),
                    send_json(Req, 202, DocResults);
                {error, Results} ->
                    % output the results
                    chttpd_stats:incr_writes(length(Results)),
                    DocResults = lists:zipwith(
                        fun update_doc_result_to_json/2,
                        Docs,
                        Results
                    ),
                    send_json(Req, 500, DocResults);
                {aborted, Errors} ->
                    ErrorsJson =
                        lists:map(fun update_doc_result_to_json/1, Errors),
                    send_json(Req, 417, ErrorsJson)
            end;
        false ->
            case fabric:update_docs(Db, Docs, [?REPLICATED_CHANGES | Options]) of
                {ok, Errors} ->
                    chttpd_stats:incr_writes(length(Docs)),
                    ErrorsJson = lists:map(fun update_doc_result_to_json/1, Errors),
                    send_json(Req, 201, ErrorsJson);
                {accepted, Errors} ->
                    chttpd_stats:incr_writes(length(Docs)),
                    ErrorsJson = lists:map(fun update_doc_result_to_json/1, Errors),
                    send_json(Req, 202, ErrorsJson);
                {error, Errors} ->
                    chttpd_stats:incr_writes(length(Docs)),
                    ErrorsJson = lists:map(fun update_doc_result_to_json/1, Errors),
                    send_json(Req, 500, ErrorsJson)
            end;
        _ ->
            throw({bad_request, <<"`new_edits` parameter must be a boolean.">>})
    end;
db_req(#httpd{path_parts = [_, <<"_bulk_docs">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "POST");
db_req(#httpd{method = 'POST', path_parts = [_, <<"_bulk_get">>]} = Req, Db) ->
    couch_stats:increment_counter([couchdb, httpd, bulk_requests]),
    couch_httpd:validate_ctype(Req, "application/json"),
    {JsonProps} = chttpd:json_body_obj(Req),
    case couch_util:get_value(<<"docs">>, JsonProps) of
        undefined ->
            throw({bad_request, <<"Missing JSON list of 'docs'.">>});
        Docs ->
            #doc_query_args{options = Options0} = bulk_get_parse_doc_query(Req),
            Options = [{user_ctx, Req#httpd.user_ctx} | Options0],
            {ArgsRefs, ArgsMap} = bulk_get_parse_args(Db, Docs),
            ResultsMap = bulk_get_docs(Db, ArgsMap, Options),
            case bulk_get_is_multipart(Req) of
                false -> bulk_get_ret_json(Req, ArgsRefs, ResultsMap, Options);
                true -> bulk_get_ret_multipart(Req, ArgsRefs, ResultsMap, Options)
            end
    end;
db_req(#httpd{path_parts = [_, <<"_bulk_get">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "POST");
db_req(#httpd{method = 'POST', path_parts = [_, <<"_purge">>]} = Req, Db) ->
    couch_stats:increment_counter([couchdb, httpd, purge_requests]),
    chttpd:validate_ctype(Req, "application/json"),
    W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
    Options = [{user_ctx, Req#httpd.user_ctx}, {w, W}],
    {IdsRevs} = chttpd:json_body_obj(Req),
    IdsRevs2 = [{Id, couch_doc:parse_revs(Revs)} || {Id, Revs} <- IdsRevs],
    MaxIds = config:get_integer("purge", "max_document_id_number", 100),
    case length(IdsRevs2) =< MaxIds of
        false -> throw({bad_request, "Exceeded maximum number of documents."});
        true -> ok
    end,
    RevsLen = lists:foldl(
        fun({_Id, Revs}, Acc) ->
            length(Revs) + Acc
        end,
        0,
        IdsRevs2
    ),
    MaxRevs = config:get_integer("purge", "max_revisions_number", 1000),
    case RevsLen =< MaxRevs of
        false -> throw({bad_request, "Exceeded maximum number of revisions."});
        true -> ok
    end,
    couch_stats:increment_counter([couchdb, document_purges, total], length(IdsRevs2)),
    Results2 =
        case fabric:purge_docs(Db, IdsRevs2, Options) of
            {ok, Results} ->
                chttpd_stats:incr_writes(length(Results)),
                Results;
            {accepted, Results} ->
                chttpd_stats:incr_writes(length(Results)),
                Results
        end,
    {Code, Json} = purge_results_to_json(IdsRevs2, Results2),
    send_json(Req, Code, {[{<<"purge_seq">>, null}, {<<"purged">>, {Json}}]});
db_req(#httpd{path_parts = [_, <<"_purge">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "POST");
db_req(#httpd{method = 'GET', path_parts = [_, OP]} = Req, Db) when ?IS_ALL_DOCS(OP) ->
    case chttpd:qs_json_value(Req, "keys", nil) of
        Keys when is_list(Keys) ->
            all_docs_view(Req, Db, Keys, OP);
        nil ->
            all_docs_view(Req, Db, undefined, OP);
        _ ->
            throw({bad_request, "`keys` parameter must be an array."})
    end;
db_req(
    #httpd{
        method = 'POST',
        path_parts = [_, OP, <<"queries">>]
    } = Req,
    Db
) when ?IS_ALL_DOCS(OP) ->
    Props = chttpd:json_body_obj(Req),
    case couch_mrview_util:get_view_queries(Props) of
        undefined ->
            throw({bad_request, <<"POST body must include `queries` parameter.">>});
        Queries ->
            multi_all_docs_view(Req, Db, OP, Queries)
    end;
db_req(
    #httpd{path_parts = [_, OP, <<"queries">>]} = Req,
    _Db
) when ?IS_ALL_DOCS(OP) ->
    send_method_not_allowed(Req, "POST");
db_req(#httpd{method = 'POST', path_parts = [_, OP]} = Req, Db) when ?IS_ALL_DOCS(OP) ->
    chttpd:validate_ctype(Req, "application/json"),
    {Fields} = chttpd:json_body_obj(Req),
    case couch_util:get_value(<<"keys">>, Fields, nil) of
        Keys when is_list(Keys) ->
            all_docs_view(Req, Db, Keys, OP);
        nil ->
            all_docs_view(Req, Db, undefined, OP);
        _ ->
            throw({bad_request, "`keys` body member must be an array."})
    end;
db_req(#httpd{path_parts = [_, OP]} = Req, _Db) when ?IS_ALL_DOCS(OP) ->
    send_method_not_allowed(Req, "GET,HEAD,POST");
db_req(#httpd{method = 'POST', path_parts = [_, <<"_missing_revs">>]} = Req, Db) ->
    chttpd:validate_ctype(Req, "application/json"),
    {JsonDocIdRevs} = chttpd:json_body_obj(Req),
    case fabric:get_missing_revs(Db, JsonDocIdRevs) of
        {error, Reason} ->
            chttpd:send_error(Req, Reason);
        {ok, Results} ->
            Results2 = [
                {Id, couch_doc:revs_to_strs(Revs)}
             || {Id, Revs, _} <- Results
            ],
            send_json(
                Req,
                {[
                    {missing_revs, {Results2}}
                ]}
            )
    end;
db_req(#httpd{path_parts = [_, <<"_missing_revs">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "POST");
db_req(#httpd{method = 'POST', path_parts = [_, <<"_revs_diff">>]} = Req, Db) ->
    chttpd:validate_ctype(Req, "application/json"),
    {JsonDocIdRevs} = chttpd:json_body_obj(Req),
    case fabric:get_missing_revs(Db, JsonDocIdRevs) of
        {error, Reason} ->
            chttpd:send_error(Req, Reason);
        {ok, Results} ->
            Results2 =
                lists:map(
                    fun({Id, MissingRevs, PossibleAncestors}) ->
                        {Id, {
                            [{missing, couch_doc:revs_to_strs(MissingRevs)}] ++
                                if
                                    PossibleAncestors == [] ->
                                        [];
                                    true ->
                                        [
                                            {possible_ancestors,
                                                couch_doc:revs_to_strs(PossibleAncestors)}
                                        ]
                                end
                        }}
                    end,
                    Results
                ),
            send_json(Req, {Results2})
    end;
db_req(#httpd{path_parts = [_, <<"_revs_diff">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "POST");
db_req(
    #httpd{method = 'PUT', path_parts = [_, <<"_security">>], user_ctx = Ctx} = Req,
    Db
) ->
    DbName = ?b2l(couch_db:name(Db)),
    validate_security_can_be_edited(DbName),
    SecObj = chttpd:json_body(Req),
    case fabric:set_security(Db, SecObj, [{user_ctx, Ctx}]) of
        ok ->
            send_json(Req, {[{<<"ok">>, true}]});
        Else ->
            throw(Else)
    end;
db_req(#httpd{method = 'GET', path_parts = [_, <<"_security">>]} = Req, Db) ->
    send_json(Req, fabric:get_security(Db));
db_req(#httpd{path_parts = [_, <<"_security">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "PUT,GET");
db_req(
    #httpd{method = 'PUT', path_parts = [_, <<"_revs_limit">>], user_ctx = Ctx} = Req,
    Db
) ->
    Limit = chttpd:json_body(Req),
    ok = fabric:set_revs_limit(Db, Limit, [{user_ctx, Ctx}]),
    send_json(Req, {[{<<"ok">>, true}]});
db_req(#httpd{method = 'GET', path_parts = [_, <<"_revs_limit">>]} = Req, Db) ->
    send_json(Req, fabric:get_revs_limit(Db));
db_req(#httpd{path_parts = [_, <<"_revs_limit">>]} = Req, _Db) ->
    send_method_not_allowed(Req, "PUT,GET");
db_req(#httpd{method = 'PUT', path_parts = [_, <<"_purged_infos_limit">>]} = Req, Db) ->
    Options = [{user_ctx, Req#httpd.user_ctx}],
    case chttpd:json_body(Req) of
        Limit when is_integer(Limit), Limit > 0 ->
            case fabric:set_purge_infos_limit(Db, Limit, Options) of
                ok ->
                    send_json(Req, {[{<<"ok">>, true}]});
                Error ->
                    throw(Error)
            end;
        _ ->
            throw({bad_request, "`purge_infos_limit` must be positive integer"})
    end;
db_req(#httpd{method = 'GET', path_parts = [_, <<"_purged_infos_limit">>]} = Req, Db) ->
    send_json(Req, fabric:get_purge_infos_limit(Db));
% Special case to enable using an unencoded slash in the URL of design docs,
% as slashes in document IDs must otherwise be URL encoded.
db_req(
    #httpd{
        method = 'GET', mochi_req = MochiReq, path_parts = [_DbName, <<"_design/", _/binary>> | _]
    } = Req,
    _Db
) ->
    [Head | Tail] = re:split(MochiReq:get(raw_path), "_design%2F", [{return, list}, caseless]),
    chttpd:send_redirect(Req, Head ++ "_design/" ++ Tail);
db_req(#httpd{path_parts = [_DbName, <<"_design">>, Name]} = Req, Db) ->
    db_doc_req(Req, Db, <<"_design/", Name/binary>>);
db_req(#httpd{path_parts = [_DbName, <<"_design">>, Name | FileNameParts]} = Req, Db) ->
    db_attachment_req(Req, Db, <<"_design/", Name/binary>>, FileNameParts);
% Special case to allow for accessing local documents without %2F
% encoding the docid. Throws out requests that don't have the second
% path part or that specify an attachment name.
db_req(#httpd{path_parts = [_DbName, <<"_local">>]}, _Db) ->
    throw({bad_request, <<"Invalid _local document id.">>});
db_req(#httpd{path_parts = [_DbName, <<"_local/">>]}, _Db) ->
    throw({bad_request, <<"Invalid _local document id.">>});
db_req(#httpd{path_parts = [_DbName, <<"_local">>, Name]} = Req, Db) ->
    db_doc_req(Req, Db, <<"_local/", Name/binary>>);
db_req(#httpd{path_parts = [_DbName, <<"_local">> | _Rest]}, _Db) ->
    throw({bad_request, <<"_local documents do not accept attachments.">>});
db_req(#httpd{path_parts = [_, DocId]} = Req, Db) ->
    db_doc_req(Req, Db, DocId);
db_req(#httpd{method = 'DELETE', path_parts = [_, DocId | FileNameParts]} = Req, Db) ->
    chttpd:body(Req),
    db_attachment_req(Req, Db, DocId, FileNameParts);
db_req(#httpd{path_parts = [_, DocId | FileNameParts]} = Req, Db) ->
    db_attachment_req(Req, Db, DocId, FileNameParts).

multi_all_docs_view(Req, Db, OP, Queries) ->
    Args0 = couch_mrview_http:parse_params(Req, undefined),
    Args1 = Args0#mrargs{view_type = map},
    ArgQueries = lists:map(
        fun({Query}) ->
            QueryArg1 = couch_mrview_http:parse_params(
                Query,
                undefined,
                Args1,
                [decoded]
            ),
            QueryArgs2 = fabric_util:validate_all_docs_args(Db, QueryArg1),
            set_namespace(OP, QueryArgs2)
        end,
        Queries
    ),
    Options = [{user_ctx, Req#httpd.user_ctx}],
    VAcc0 = #vacc{db = Db, req = Req, prepend = "\r\n"},
    FirstChunk = "{\"results\":[",
    {ok, Resp0} = chttpd:start_delayed_json_response(
        VAcc0#vacc.req,
        200,
        [],
        FirstChunk
    ),
    VAcc1 = VAcc0#vacc{resp = Resp0},
    VAcc2 = lists:foldl(
        fun(Args, Acc0) ->
            {ok, Acc1} = fabric:all_docs(
                Db,
                Options,
                fun view_cb/2,
                Acc0,
                Args
            ),
            Acc1
        end,
        VAcc1,
        ArgQueries
    ),
    {ok, Resp1} = chttpd:send_delayed_chunk(VAcc2#vacc.resp, "\r\n]}"),
    chttpd:end_delayed_json_response(Resp1).

all_docs_view(Req, Db, Keys, OP) ->
    Args0 = couch_mrview_http:parse_body_and_query(Req, Keys),
    Args1 = Args0#mrargs{view_type = map},
    Args2 = fabric_util:validate_all_docs_args(Db, Args1),
    Args3 = set_namespace(OP, Args2),
    Args4 = set_include_sysdocs(OP, Req, Args3),
    Options = [{user_ctx, Req#httpd.user_ctx}],
    Max = chttpd:chunked_response_buffer_size(),
    VAcc = #vacc{db = Db, req = Req, threshold = Max},
    {ok, Resp} = fabric:all_docs(Db, Options, fun view_cb/2, VAcc, Args4),
    {ok, Resp#vacc.resp}.

view_cb({row, Row} = Msg, Acc) ->
    case lists:keymember(doc, 1, Row) of
        true -> chttpd_stats:incr_reads();
        false -> ok
    end,
    chttpd_stats:incr_rows(),
    couch_mrview_http:view_cb(Msg, Acc);
view_cb(Msg, Acc) ->
    couch_mrview_http:view_cb(Msg, Acc).

db_doc_req(#httpd{method = 'DELETE'} = Req, Db, DocId) ->
    % check for the existence of the doc to handle the 404 case.
    couch_doc_open(Db, DocId, nil, []),
    case chttpd:qs_value(Req, "rev") of
        undefined ->
            Body = {[{<<"_deleted">>, true}]};
        Rev ->
            Body = {[{<<"_rev">>, ?l2b(Rev)}, {<<"_deleted">>, true}]}
    end,
    Doc = couch_doc_from_req(Req, Db, DocId, Body),
    send_updated_doc(Req, Db, DocId, Doc);
db_doc_req(#httpd{method = 'GET', mochi_req = MochiReq} = Req, Db, DocId) ->
    #doc_query_args{
        rev = Rev0,
        open_revs = Revs,
        options = Options0,
        atts_since = AttsSince
    } = parse_doc_query(Req),
    Options = [{user_ctx, Req#httpd.user_ctx} | Options0],
    case Revs of
        [] ->
            Options2 =
                if
                    AttsSince /= nil ->
                        [{atts_since, AttsSince}, attachments | Options];
                    true ->
                        Options
                end,
            Rev =
                case lists:member(latest, Options) of
                    % couch_doc_open will open the winning rev despite of a rev passed
                    % https://docs.couchdb.org/en/stable/api/document/common.html?highlight=latest#get--db-docid
                    true -> nil;
                    false -> Rev0
                end,
            Doc = couch_doc_open(Db, DocId, Rev, Options2),
            send_doc(Req, Doc, Options2);
        _ ->
            case fabric:open_revs(Db, DocId, Revs, Options) of
                {ok, []} when Revs == all ->
                    chttpd:send_error(Req, {not_found, missing});
                {ok, Results} ->
                    chttpd_stats:incr_reads(length(Results)),
                    case MochiReq:accepts_content_type("multipart/mixed") of
                        false ->
                            {ok, Resp} = start_json_response(Req, 200),
                            send_chunk(Resp, "["),
                            % We loop through the docs. The first time through the separator
                            % is whitespace, then a comma on subsequent iterations.
                            lists:foldl(
                                fun(Result, AccSeparator) ->
                                    case Result of
                                        {ok, Doc} ->
                                            JsonDoc = couch_doc:to_json_obj(Doc, Options),
                                            Json = ?JSON_ENCODE({[{ok, JsonDoc}]}),
                                            send_chunk(Resp, AccSeparator ++ Json);
                                        {{not_found, missing}, RevId} ->
                                            RevStr = couch_doc:rev_to_str(RevId),
                                            Json = ?JSON_ENCODE({[{<<"missing">>, RevStr}]}),
                                            send_chunk(Resp, AccSeparator ++ Json)
                                    end,
                                    % AccSeparator now has a comma
                                    ","
                                end,
                                "",
                                Results
                            ),
                            send_chunk(Resp, "]"),
                            end_json_response(Resp);
                        true ->
                            send_docs_multipart(Req, Results, Options)
                    end;
                {error, Error} ->
                    chttpd:send_error(Req, Error)
            end
    end;
db_doc_req(#httpd{method = 'POST', user_ctx = Ctx} = Req, Db, DocId) ->
    couch_httpd:validate_referer(Req),
    couch_db:validate_docid(Db, DocId),
    chttpd:validate_ctype(Req, "multipart/form-data"),

    W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
    Options = [{user_ctx, Ctx}, {w, W}],

    Form = couch_httpd:parse_form(Req),
    case proplists:is_defined("_doc", Form) of
        true ->
            Json = ?JSON_DECODE(couch_util:get_value("_doc", Form)),
            Doc = couch_doc_from_req(Req, Db, DocId, Json);
        false ->
            Rev = couch_doc:parse_rev(list_to_binary(couch_util:get_value("_rev", Form))),
            Doc =
                case fabric:open_revs(Db, DocId, [Rev], []) of
                    {ok, [{ok, Doc0}]} ->
                        chttpd_stats:incr_reads(),
                        Doc0;
                    {error, Error} ->
                        throw(Error)
                end
    end,
    UpdatedAtts = [
        couch_att:new([
            {name, validate_attachment_name(Name)},
            {type, list_to_binary(ContentType)},
            {data, Content}
        ])
     || {Name, {ContentType, _}, Content} <-
            proplists:get_all_values("_attachments", Form)
    ],
    #doc{atts = OldAtts} = Doc,
    OldAtts2 = lists:flatmap(
        fun(Att) ->
            OldName = couch_att:fetch(name, Att),
            case [1 || A <- UpdatedAtts, couch_att:fetch(name, A) == OldName] of
                % the attachment wasn't in the UpdatedAtts, return it
                [] -> [Att];
                % the attachment was in the UpdatedAtts, drop it
                _ -> []
            end
        end,
        OldAtts
    ),
    NewDoc = Doc#doc{
        atts = UpdatedAtts ++ OldAtts2
    },
    case fabric:update_doc(Db, NewDoc, Options) of
        {ok, NewRev} ->
            chttpd_stats:incr_writes(),
            HttpCode = 201;
        {accepted, NewRev} ->
            chttpd_stats:incr_writes(),
            HttpCode = 202
    end,
    send_json(
        Req,
        HttpCode,
        [{"ETag", "\"" ++ ?b2l(couch_doc:rev_to_str(NewRev)) ++ "\""}],
        {[
            {ok, true},
            {id, DocId},
            {rev, couch_doc:rev_to_str(NewRev)}
        ]}
    );
db_doc_req(#httpd{method = 'PUT', user_ctx = Ctx} = Req, Db, DocId) ->
    #doc_query_args{
        update_type = UpdateType
    } = parse_doc_query(Req),
    DbName = couch_db:name(Db),
    couch_db:validate_docid(Db, DocId),

    W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
    Options = [{user_ctx, Ctx}, {w, W}],

    Loc = absolute_uri(Req, [
        $/,
        couch_util:url_encode(DbName),
        $/,
        couch_util:url_encode(DocId)
    ]),
    RespHeaders = [{"Location", Loc}],
    case couch_util:to_list(couch_httpd:header_value(Req, "Content-Type")) of
        ("multipart/related;" ++ _) = ContentType ->
            couch_httpd:check_max_request_length(Req),
            couch_httpd_multipart:num_mp_writers(mem3:n(mem3:dbname(DbName), DocId)),
            {ok, Doc0, WaitFun, Parser} = couch_doc:doc_from_multi_part_stream(
                ContentType,
                fun() -> receive_request_data(Req) end
            ),
            Doc = couch_doc_from_req(Req, Db, DocId, Doc0),
            validate_revs(Doc, UpdateType =:= ?INTERACTIVE_EDIT),

            try
                {HttpCode, RespHeaders1, RespBody} = update_doc_req(
                    Req,
                    Db,
                    DocId,
                    Doc,
                    RespHeaders,
                    UpdateType
                ),
                WaitFun(),
                send_json(Req, HttpCode, RespHeaders1, RespBody)
            catch
                throw:Err ->
                    % Document rejected by a validate_doc_update function.
                    couch_httpd_multipart:abort_multipart_stream(Parser),
                    throw(Err)
            end;
        _Else ->
            case chttpd:qs_value(Req, "batch") of
                "ok" ->
                    % batch
                    Doc = couch_doc_from_req(Req, Db, DocId, chttpd:json_body(Req)),
                    validate_revs(Doc, UpdateType =:= ?INTERACTIVE_EDIT),

                    spawn(fun() ->
                        case catch (fabric:update_doc(Db, Doc, Options)) of
                            {ok, _} ->
                                chttpd_stats:incr_writes(),
                                ok;
                            {accepted, _} ->
                                chttpd_stats:incr_writes(),
                                ok;
                            Error ->
                                couch_log:notice("Batch doc error (~s): ~p", [DocId, Error])
                        end
                    end),
                    send_json(
                        Req,
                        202,
                        [],
                        {[
                            {ok, true},
                            {id, DocId}
                        ]}
                    );
                _Normal ->
                    % normal
                    Body = chttpd:json_body(Req),
                    Doc = couch_doc_from_req(Req, Db, DocId, Body),
                    validate_revs(Doc, UpdateType =:= ?INTERACTIVE_EDIT),
                    send_updated_doc(Req, Db, DocId, Doc, RespHeaders, UpdateType)
            end
    end;
db_doc_req(#httpd{method = 'COPY', user_ctx = Ctx} = Req, Db, SourceDocId) ->
    SourceRev =
        case extract_header_rev(Req, chttpd:qs_value(Req, "rev")) of
            missing_rev -> nil;
            Rev -> Rev
        end,
    {TargetDocId0, TargetRevs} = couch_httpd_db:parse_copy_destination_header(Req),
    TargetDocId = list_to_binary(chttpd:unquote(TargetDocId0)),
    % open old doc
    Doc = couch_doc_open(Db, SourceDocId, SourceRev, []),
    % save new doc
    case
        fabric:update_doc(
            Db,
            Doc#doc{id = TargetDocId, revs = TargetRevs},
            [{user_ctx, Ctx}]
        )
    of
        {ok, NewTargetRev} ->
            chttpd_stats:incr_writes(),
            HttpCode = 201;
        {accepted, NewTargetRev} ->
            chttpd_stats:incr_writes(),
            HttpCode = 202
    end,
    % respond
    DbName = couch_db:name(Db),
    {PartRes} = update_doc_result_to_json(TargetDocId, {ok, NewTargetRev}),
    Loc = absolute_uri(
        Req, "/" ++ couch_util:url_encode(DbName) ++ "/" ++ couch_util:url_encode(TargetDocId)
    ),
    send_json(
        Req,
        HttpCode,
        [
            {"Location", Loc},
            {"ETag", "\"" ++ ?b2l(couch_doc:rev_to_str(NewTargetRev)) ++ "\""}
        ],
        {PartRes}
    );
db_doc_req(Req, _Db, _DocId) ->
    send_method_not_allowed(Req, "DELETE,GET,HEAD,POST,PUT,COPY").

send_doc(Req, Doc, Options) ->
    case Doc#doc.meta of
        [] ->
            DiskEtag = couch_httpd:doc_etag(Doc),
            % output etag only when we have no meta
            chttpd:etag_respond(Req, DiskEtag, fun() ->
                send_doc_efficiently(Req, Doc, [{"ETag", DiskEtag}], Options)
            end);
        _ ->
            send_doc_efficiently(Req, Doc, [], Options)
    end.

send_doc_efficiently(Req, #doc{atts = []} = Doc, Headers, Options) ->
    send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options));
send_doc_efficiently(#httpd{mochi_req = MochiReq} = Req, #doc{atts = Atts} = Doc, Headers, Options) ->
    case lists:member(attachments, Options) of
        true ->
            Refs = monitor_attachments(Atts),
            try
                case MochiReq:accepts_content_type("multipart/related") of
                    false ->
                        send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options));
                    true ->
                        Boundary = couch_uuids:random(),
                        JsonBytes = ?JSON_ENCODE(
                            couch_doc:to_json_obj(
                                Doc,
                                [attachments, follows, att_encoding_info | Options]
                            )
                        ),
                        {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
                            Boundary, JsonBytes, Atts, true
                        ),
                        CType = {"Content-Type", ContentType},
                        {ok, Resp} = start_response_length(Req, 200, [CType | Headers], Len),
                        couch_doc:doc_to_multi_part_stream(
                            Boundary,
                            JsonBytes,
                            Atts,
                            fun(Data) -> couch_httpd:send(Resp, Data) end,
                            true
                        )
                end
            after
                demonitor_refs(Refs)
            end;
        false ->
            send_json(Req, 200, Headers, couch_doc:to_json_obj(Doc, Options))
    end.

send_docs_multipart_bulk_get(Results, Options0, OuterBoundary, Resp) ->
    InnerBoundary = bulk_get_multipart_boundary(),
    Options = [attachments, follows, att_encoding_info | Options0],
    lists:foreach(
        fun
            ({ok, #doc{id = Id, revs = Revs, atts = Atts} = Doc}) ->
                Refs = monitor_attachments(Doc#doc.atts),
                try
                    JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
                    couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>),
                    case non_stubbed_attachments(Atts) of
                        [] ->
                            couch_httpd:send_chunk(
                                Resp, <<"\r\nContent-Type: application/json\r\n\r\n">>
                            );
                        _ ->
                            lists:foreach(
                                fun(Header) -> couch_httpd:send_chunk(Resp, Header) end,
                                bulk_get_multipart_headers(Revs, Id, InnerBoundary)
                            )
                    end,
                    couch_doc:doc_to_multi_part_stream(
                        InnerBoundary,
                        JsonBytes,
                        Atts,
                        fun(Data) -> couch_httpd:send_chunk(Resp, Data) end,
                        true
                    )
                after
                    demonitor_refs(Refs)
                end;
            ({{not_found, missing}, RevId}) ->
                RevStr = couch_doc:rev_to_str(RevId),
                Json = ?JSON_ENCODE(
                    {[
                        {<<"rev">>, RevStr},
                        {<<"error">>, <<"not_found">>},
                        {<<"reason">>, <<"missing">>}
                    ]}
                ),
                couch_httpd:send_chunk(
                    Resp,
                    [
                        <<"\r\n--", OuterBoundary/binary>>,
                        <<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
                        Json
                    ]
                )
        end,
        Results
    ).

send_docs_multipart(Req, Results, Options1) ->
    OuterBoundary = couch_uuids:random(),
    InnerBoundary = couch_uuids:random(),
    Options = [attachments, follows, att_encoding_info | Options1],
    CType = {"Content-Type", "multipart/mixed; boundary=\"" ++ ?b2l(OuterBoundary) ++ "\""},
    {ok, Resp} = start_chunked_response(Req, 200, [CType]),
    couch_httpd:send_chunk(Resp, <<"--", OuterBoundary/binary>>),
    lists:foreach(
        fun
            ({ok, #doc{atts = Atts} = Doc}) ->
                Refs = monitor_attachments(Doc#doc.atts),
                try
                    JsonBytes = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, Options)),
                    {ContentType, _Len} = couch_doc:len_doc_to_multi_part_stream(
                        InnerBoundary, JsonBytes, Atts, true
                    ),
                    couch_httpd:send_chunk(
                        Resp, <<"\r\nContent-Type: ", ContentType/binary, "\r\n\r\n">>
                    ),
                    couch_doc:doc_to_multi_part_stream(
                        InnerBoundary,
                        JsonBytes,
                        Atts,
                        fun(Data) -> couch_httpd:send_chunk(Resp, Data) end,
                        true
                    ),
                    couch_httpd:send_chunk(Resp, <<"\r\n--", OuterBoundary/binary>>)
                after
                    demonitor_refs(Refs)
                end;
            ({{not_found, missing}, RevId}) ->
                RevStr = couch_doc:rev_to_str(RevId),
                Json = ?JSON_ENCODE({[{<<"missing">>, RevStr}]}),
                couch_httpd:send_chunk(
                    Resp,
                    [
                        <<"\r\nContent-Type: application/json; error=\"true\"\r\n\r\n">>,
                        Json,
                        <<"\r\n--", OuterBoundary/binary>>
                    ]
                )
        end,
        Results
    ),
    couch_httpd:send_chunk(Resp, <<"--">>),
    couch_httpd:last_chunk(Resp).

bulk_get_multipart_headers({0, []}, Id, Boundary) ->
    [
        <<"\r\nX-Doc-Id: ", Id/binary>>,
        <<"\r\nContent-Type: multipart/related; boundary=", Boundary/binary, "\r\n\r\n">>
    ];
bulk_get_multipart_headers({Start, [FirstRevId | _]}, Id, Boundary) ->
    RevStr = couch_doc:rev_to_str({Start, FirstRevId}),
    [
        <<"\r\nX-Doc-Id: ", Id/binary>>,
        <<"\r\nX-Rev-Id: ", RevStr/binary>>,
        <<"\r\nContent-Type: multipart/related; boundary=", Boundary/binary, "\r\n\r\n">>
    ].

bulk_get_multipart_boundary() ->
    Unique = couch_uuids:random(),
    <<"--", Unique/binary>>.

receive_request_data(Req) ->
    receive_request_data(Req, chttpd:body_length(Req)).

receive_request_data(Req, Len) when Len == chunked ->
    Ref = make_ref(),
    ChunkFun = fun({_Length, Binary}, _State) ->
        self() ! {chunk, Ref, Binary}
    end,
    couch_httpd:recv_chunked(Req, 4096, ChunkFun, ok),
    GetChunk = fun GC() ->
        receive
            {chunk, Ref, Binary} -> {Binary, GC}
        end
    end,
    {
        receive
            {chunk, Ref, Binary} -> Binary
        end,
        GetChunk
    };
receive_request_data(Req, LenLeft) when LenLeft > 0 ->
    Len = erlang:min(4096, LenLeft),
    Data = chttpd:recv(Req, Len),
    {Data, fun() -> receive_request_data(Req, LenLeft - iolist_size(Data)) end};
receive_request_data(_Req, _) ->
    throw(<<"expected more data">>).

update_doc_result_to_json({error, _} = Error) ->
    {_Code, Err, Msg} = chttpd:error_info(Error),
    {[
        {error, Err},
        {reason, Msg}
    ]};
update_doc_result_to_json({{Id, Rev}, Error}) ->
    {_Code, Err, Msg} = chttpd:error_info(Error),
    {[
        {id, Id},
        {rev, couch_doc:rev_to_str(Rev)},
        {error, Err},
        {reason, Msg}
    ]}.

update_doc_result_to_json(#doc{id = DocId}, Result) ->
    update_doc_result_to_json(DocId, Result);
update_doc_result_to_json(DocId, {ok, NewRev}) ->
    {[{ok, true}, {id, DocId}, {rev, couch_doc:rev_to_str(NewRev)}]};
update_doc_result_to_json(DocId, {accepted, NewRev}) ->
    {[{ok, true}, {id, DocId}, {rev, couch_doc:rev_to_str(NewRev)}, {accepted, true}]};
update_doc_result_to_json(DocId, Error) ->
    {_Code, ErrorStr, Reason} = chttpd:error_info(Error),
    {[{id, DocId}, {error, ErrorStr}, {reason, Reason}]}.

purge_results_to_json([], []) ->
    {201, []};
purge_results_to_json([{DocId, _Revs} | RIn], [{ok, PRevs} | ROut]) ->
    {Code, Results} = purge_results_to_json(RIn, ROut),
    couch_stats:increment_counter([couchdb, document_purges, success]),
    {Code, [{DocId, couch_doc:revs_to_strs(PRevs)} | Results]};
purge_results_to_json([{DocId, _Revs} | RIn], [{accepted, PRevs} | ROut]) ->
    {Code, Results} = purge_results_to_json(RIn, ROut),
    couch_stats:increment_counter([couchdb, document_purges, success]),
    NewResults = [{DocId, couch_doc:revs_to_strs(PRevs)} | Results],
    {erlang:max(Code, 202), NewResults};
purge_results_to_json([{DocId, _Revs} | RIn], [Error | ROut]) ->
    {Code, Results} = purge_results_to_json(RIn, ROut),
    {NewCode, ErrorStr, Reason} = chttpd:error_info(Error),
    couch_stats:increment_counter([couchdb, document_purges, failure]),
    NewResults = [{DocId, {[{error, ErrorStr}, {reason, Reason}]}} | Results],
    {erlang:max(NewCode, Code), NewResults}.

send_updated_doc(Req, Db, DocId, Json) ->
    send_updated_doc(Req, Db, DocId, Json, []).

send_updated_doc(Req, Db, DocId, Doc, Headers) ->
    send_updated_doc(Req, Db, DocId, Doc, Headers, ?INTERACTIVE_EDIT).

send_updated_doc(Req, Db, DocId, Doc, Headers, Type) ->
    {Code, Headers1, Body} = update_doc_req(Req, Db, DocId, Doc, Headers, Type),
    send_json(Req, Code, Headers1, Body).

update_doc_req(Req, Db, DocId, Doc, Headers, UpdateType) ->
    #httpd{user_ctx = Ctx} = Req,
    W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
    Options =
        case couch_httpd:header_value(Req, "X-Couch-Full-Commit") of
            "true" ->
                [full_commit, UpdateType, {user_ctx, Ctx}, {w, W}];
            "false" ->
                [delay_commit, UpdateType, {user_ctx, Ctx}, {w, W}];
            _ ->
                [UpdateType, {user_ctx, Ctx}, {w, W}]
        end,
    {Status, {etag, Etag}, Body} = update_doc(Db, DocId, Doc, Options),
    HttpCode = http_code_from_status(Status),
    ResponseHeaders = [{"ETag", Etag} | Headers],
    {HttpCode, ResponseHeaders, Body}.

http_code_from_status(Status) ->
    case Status of
        accepted ->
            202;
        created ->
            201;
        ok ->
            200
    end.

update_doc(Db, DocId, #doc{deleted = Deleted, body = DocBody} = Doc, Options) ->
    {_, Ref} = spawn_monitor(fun() ->
        try fabric:update_doc(Db, Doc, Options) of
            Resp ->
                exit({exit_ok, Resp})
        catch
            throw:Reason ->
                exit({exit_throw, Reason});
            error:Reason ->
                exit({exit_error, Reason});
            exit:Reason ->
                exit({exit_exit, Reason})
        end
    end),
    Result =
        receive
            {'DOWN', Ref, _, _, {exit_ok, Ret}} ->
                Ret;
            {'DOWN', Ref, _, _, {exit_throw, Reason}} ->
                throw(Reason);
            {'DOWN', Ref, _, _, {exit_error, Reason}} ->
                erlang:error(Reason);
            {'DOWN', Ref, _, _, {exit_exit, Reason}} ->
                erlang:exit(Reason)
        end,

    case Result of
        {ok, NewRev} ->
            Accepted = false;
        {accepted, NewRev} ->
            Accepted = true
    end,
    Etag = couch_httpd:doc_etag(DocId, DocBody, NewRev),
    Status =
        case {Accepted, Deleted} of
            {true, _} ->
                accepted;
            {false, true} ->
                ok;
            {false, false} ->
                created
        end,
    NewRevStr = couch_doc:rev_to_str(NewRev),
    Body = {[{ok, true}, {id, DocId}, {rev, NewRevStr}]},
    {Status, {etag, Etag}, Body}.

couch_doc_from_req(Req, _Db, DocId, #doc{revs = Revs} = Doc) ->
    validate_attachment_names(Doc),
    Rev =
        case chttpd:qs_value(Req, "rev") of
            undefined ->
                undefined;
            QSRev ->
                couch_doc:parse_rev(QSRev)
        end,
    Revs2 =
        case Revs of
            {Start, [RevId | _]} ->
                if
                    Rev /= undefined andalso Rev /= {Start, RevId} ->
                        throw(
                            {bad_request,
                                "Document rev from request body and query "
                                "string have different values"}
                        );
                    true ->
                        case extract_header_rev(Req, {Start, RevId}) of
                            missing_rev -> {0, []};
                            _ -> Revs
                        end
                end;
            _ ->
                case extract_header_rev(Req, Rev) of
                    missing_rev -> {0, []};
                    {Pos, RevId2} -> {Pos, [RevId2]}
                end
        end,
    Doc#doc{id = DocId, revs = Revs2};
couch_doc_from_req(Req, Db, DocId, Json) ->
    Doc = couch_db:doc_from_json_obj_validate(Db, Json),
    couch_doc_from_req(Req, Db, DocId, Doc).

% Useful for debugging
% couch_doc_open(Db, DocId) ->
%   couch_doc_open(Db, DocId, nil, []).

couch_doc_open(Db, DocId, Rev, Options0) ->
    Options = [{user_ctx, couch_db:get_user_ctx(Db)} | Options0],
    case Rev of
        % open most recent rev
        nil ->
            case fabric:open_doc(Db, DocId, Options) of
                {ok, Doc} ->
                    chttpd_stats:incr_reads(),
                    Doc;
                Error ->
                    throw(Error)
            end;
        % open a specific rev (deletions come back as stubs)
        _ ->
            case fabric:open_revs(Db, DocId, [Rev], Options) of
                {ok, [{ok, Doc}]} ->
                    chttpd_stats:incr_reads(),
                    Doc;
                {ok, [{{not_found, missing}, Rev}]} ->
                    throw(not_found);
                {ok, [Else]} ->
                    throw(Else);
                {error, Error} ->
                    throw(Error)
            end
    end.

get_existing_attachment(Atts, FileName) ->
    % Check if attachment exists, if not throw not_found
    case [A || A <- Atts, couch_att:fetch(name, A) == FileName] of
        [] -> throw({not_found, "Document is missing attachment"});
        [Att] -> Att
    end.

% Attachment request handlers

db_attachment_req(#httpd{method = 'GET', mochi_req = MochiReq} = Req, Db, DocId, FileNameParts) ->
    FileName = list_to_binary(
        mochiweb_util:join(
            lists:map(
                fun binary_to_list/1,
                FileNameParts
            ),
            "/"
        )
    ),
    #doc_query_args{
        rev = Rev,
        options = Options
    } = parse_doc_query(Req),
    #doc{
        atts = Atts
    } = Doc = couch_doc_open(Db, DocId, Rev, Options),
    Att = get_existing_attachment(Atts, FileName),
    [Type, Enc, DiskLen, AttLen, Md5] = couch_att:fetch(
        [type, encoding, disk_len, att_len, md5], Att
    ),
    Refs = monitor_attachments(Att),
    try
        Etag =
            case Md5 of
                <<>> -> chttpd:doc_etag(Doc);
                _ -> "\"" ++ ?b2l(base64:encode(Md5)) ++ "\""
            end,
        ReqAcceptsAttEnc = lists:member(
            atom_to_list(Enc),
            couch_httpd:accepted_encodings(Req)
        ),
        Headers0 =
            [
                {"ETag", Etag},
                {"Cache-Control", "must-revalidate"},
                {"Content-Type", binary_to_list(Type)}
            ] ++
                case ReqAcceptsAttEnc of
                    true when Enc =/= identity ->
                        % RFC 2616 says that the 'identify' encoding should not be used in
                        % the Content-Encoding header
                        [{"Content-Encoding", atom_to_list(Enc)}];
                    _ ->
                        []
                end ++
                case Enc of
                    identity ->
                        [{"Accept-Ranges", "bytes"}];
                    _ ->
                        [{"Accept-Ranges", "none"}]
                end,
        Headers = chttpd_util:maybe_add_csp_header("attachments", Headers0, "sandbox"),
        Len =
            case {Enc, ReqAcceptsAttEnc} of
                {identity, _} ->
                    % stored and served in identity form
                    DiskLen;
                {_, false} when DiskLen =/= AttLen ->
                    % Stored encoded, but client doesn't accept the encoding we used,
                    % so we need to decode on the fly.  DiskLen is the identity length
                    % of the attachment.
                    DiskLen;
                {_, true} ->
                    % Stored and served encoded.  AttLen is the encoded length.
                    AttLen;
                _ ->
                    % We received an encoded attachment and stored it as such, so we
                    % don't know the identity length.  The client doesn't accept the
                    % encoding, and since we cannot serve a correct Content-Length
                    % header we'll fall back to a chunked response.
                    undefined
            end,
        AttFun =
            case ReqAcceptsAttEnc of
                false ->
                    fun couch_att:foldl_decode/3;
                true ->
                    fun couch_att:foldl/3
            end,
        chttpd:etag_respond(
            Req,
            Etag,
            fun() ->
                case Len of
                    undefined ->
                        {ok, Resp} = start_chunked_response(Req, 200, Headers),
                        AttFun(Att, fun(Seg, _) -> send_chunk(Resp, Seg) end, {ok, Resp}),
                        couch_httpd:last_chunk(Resp);
                    _ ->
                        Ranges = parse_ranges(MochiReq:get(range), Len),
                        case {Enc, Ranges} of
                            {identity, [{From, To}]} ->
                                Headers1 =
                                    [{"Content-Range", make_content_range(From, To, Len)}] ++
                                        Headers,
                                {ok, Resp} = start_response_length(
                                    Req, 206, Headers1, To - From + 1
                                ),
                                couch_att:range_foldl(
                                    Att,
                                    From,
                                    To + 1,
                                    fun(Seg, _) -> send(Resp, Seg) end,
                                    {ok, Resp}
                                );
                            {identity, Ranges} when is_list(Ranges) andalso length(Ranges) < 10 ->
                                send_ranges_multipart(Req, Type, Len, Att, Ranges);
                            _ ->
                                {ok, Resp} = start_response_length(Req, 200, Headers, Len),
                                AttFun(Att, fun(Seg, _) -> send(Resp, Seg) end, {ok, Resp})
                        end
                end
            end
        )
    after
        demonitor_refs(Refs)
    end;
db_attachment_req(#httpd{method = Method, user_ctx = Ctx} = Req, Db, DocId, FileNameParts) when
    (Method == 'PUT') or (Method == 'DELETE')
->
    FileName = validate_attachment_name(
        mochiweb_util:join(
            lists:map(
                fun binary_to_list/1,
                FileNameParts
            ),
            "/"
        )
    ),

    NewAtt =
        case Method of
            'DELETE' ->
                [];
            _ ->
                MimeType =
                    case couch_httpd:header_value(Req, "Content-Type") of
                        % We could throw an error here or guess by the FileName.
                        % Currently, just giving it a default.
                        undefined -> <<"application/octet-stream">>;
                        CType -> list_to_binary(CType)
                    end,
                Data = fabric:att_receiver(Req, couch_db:name(Db), chttpd:body_length(Req)),
                ContentLen =
                    case couch_httpd:header_value(Req, "Content-Length") of
                        undefined -> undefined;
                        Length -> list_to_integer(Length)
                    end,
                ContentEnc = string:to_lower(
                    string:strip(
                        couch_httpd:header_value(Req, "Content-Encoding", "identity")
                    )
                ),
                Encoding =
                    case ContentEnc of
                        "identity" ->
                            identity;
                        "gzip" ->
                            gzip;
                        _ ->
                            throw({
                                bad_ctype,
                                "Only gzip and identity content-encodings are supported"
                            })
                    end,
                [
                    couch_att:new([
                        {name, FileName},
                        {type, MimeType},
                        {data, Data},
                        {att_len, ContentLen},
                        {encoding, Encoding}
                    ])
                ]
        end,

    Doc =
        case extract_header_rev(Req, chttpd:qs_value(Req, "rev")) of
            % make the new doc
            missing_rev ->
                if
                    Method =/= 'DELETE' ->
                        ok;
                    true ->
                        % check for the existence of the doc and attachment
                        CurrDoc = #doc{} = couch_doc_open(Db, DocId, nil, []),
                        get_existing_attachment(CurrDoc#doc.atts, FileName)
                end,
                couch_db:validate_docid(Db, DocId),
                #doc{id = DocId};
            Rev ->
                case fabric:open_revs(Db, DocId, [Rev], [{user_ctx, Ctx}]) of
                    {ok, [{ok, Doc0}]} ->
                        chttpd_stats:incr_reads(),
                        if
                            Method =/= 'DELETE' ->
                                ok;
                            true ->
                                % check if attachment exists
                                get_existing_attachment(Doc0#doc.atts, FileName)
                        end,
                        Doc0;
                    {ok, [Error]} ->
                        throw(Error);
                    {error, Error} ->
                        throw(Error)
                end
        end,

    #doc{atts = Atts} = Doc,
    DocEdited = Doc#doc{
        atts = NewAtt ++ [A || A <- Atts, couch_att:fetch(name, A) /= FileName]
    },
    W = chttpd:qs_value(Req, "w", integer_to_list(mem3:quorum(Db))),
    case fabric:update_doc(Db, DocEdited, [{user_ctx, Ctx}, {w, W}]) of
        {ok, UpdatedRev} ->
            chttpd_stats:incr_writes(),
            HttpCode = 201;
        {accepted, UpdatedRev} ->
            chttpd_stats:incr_writes(),
            HttpCode = 202
    end,
    erlang:put(mochiweb_request_recv, true),
    DbName = couch_db:name(Db),

    {Status, Headers} =
        case Method of
            'DELETE' ->
                {200, []};
            _ ->
                {HttpCode, [
                    {"Location",
                        absolute_uri(Req, [
                            $/,
                            DbName,
                            $/,
                            couch_util:url_encode(DocId),
                            $/,
                            couch_util:url_encode(FileName)
                        ])}
                ]}
        end,
    send_json(
        Req,
        Status,
        Headers,
        {[
            {ok, true},
            {id, DocId},
            {rev, couch_doc:rev_to_str(UpdatedRev)}
        ]}
    );
db_attachment_req(Req, _Db, _DocId, _FileNameParts) ->
    send_method_not_allowed(Req, "DELETE,GET,HEAD,PUT").

send_ranges_multipart(Req, ContentType, Len, Att, Ranges) ->
    Boundary = couch_uuids:random(),
    CType = {"Content-Type", "multipart/byteranges; boundary=\"" ++ ?b2l(Boundary) ++ "\""},
    {ok, Resp} = start_chunked_response(Req, 206, [CType]),
    couch_httpd:send_chunk(Resp, <<"--", Boundary/binary>>),
    lists:foreach(
        fun({From, To}) ->
            ContentRange = make_content_range(From, To, Len),
            couch_httpd:send_chunk(
                Resp,
                <<"\r\nContent-Type: ", ContentType/binary, "\r\n", "Content-Range: ",
                    ContentRange/binary, "\r\n", "\r\n">>
            ),
            couch_att:range_foldl(
                Att,
                From,
                To + 1,
                fun(Seg, _) -> send_chunk(Resp, Seg) end,
                {ok, Resp}
            ),
            couch_httpd:send_chunk(Resp, <<"\r\n--", Boundary/binary>>)
        end,
        Ranges
    ),
    couch_httpd:send_chunk(Resp, <<"--">>),
    couch_httpd:last_chunk(Resp),
    {ok, Resp}.

parse_ranges(undefined, _Len) ->
    undefined;
parse_ranges(fail, _Len) ->
    undefined;
parse_ranges(Ranges, Len) ->
    parse_ranges(Ranges, Len, []).

parse_ranges([], _Len, Acc) ->
    lists:reverse(Acc);
parse_ranges([{0, none} | _], _Len, _Acc) ->
    undefined;
parse_ranges([{From, To} | _], _Len, _Acc) when
    is_integer(From) andalso is_integer(To) andalso To < From
->
    throw(requested_range_not_satisfiable);
parse_ranges([{From, To} | Rest], Len, Acc) when
    is_integer(To) andalso To >= Len
->
    parse_ranges([{From, Len - 1}] ++ Rest, Len, Acc);
parse_ranges([{none, To} | Rest], Len, Acc) ->
    parse_ranges([{Len - To, Len - 1}] ++ Rest, Len, Acc);
parse_ranges([{From, none} | Rest], Len, Acc) ->
    parse_ranges([{From, Len - 1}] ++ Rest, Len, Acc);
parse_ranges([{From, To} | Rest], Len, Acc) ->
    parse_ranges(Rest, Len, [{From, To}] ++ Acc).

make_content_range(From, To, Len) ->
    ?l2b(io_lib:format("bytes ~B-~B/~B", [From, To, Len])).

parse_doc_query(Req) ->
    lists:foldl(fun parse_doc_query/2, #doc_query_args{}, chttpd:qs(Req)).

parse_shards_opt(Req) ->
    [
        {n, parse_shards_opt("n", Req, config:get_integer("cluster", "n", 3))},
        {q, parse_shards_opt("q", Req, config:get_integer("cluster", "q", 2))},
        {placement,
            parse_shards_opt(
                "placement", Req, config:get("cluster", "placement")
            )}
    ].

parse_shards_opt("placement", Req, Default) ->
    Err = <<"The `placement` value should be in a format `zone:n`.">>,
    case chttpd:qs_value(Req, "placement", Default) of
        Default ->
            Default;
        [] ->
            throw({bad_request, Err});
        Val ->
            try
                true = lists:all(
                    fun(Rule) ->
                        [_, N] = string:tokens(Rule, ":"),
                        couch_util:validate_positive_int(N)
                    end,
                    string:tokens(Val, ",")
                ),
                Val
            catch
                _:_ ->
                    throw({bad_request, Err})
            end
    end;
parse_shards_opt(Param, Req, Default) ->
    Val = chttpd:qs_value(Req, Param, Default),
    Err = ?l2b(["The `", Param, "` value should be a positive integer."]),
    case couch_util:validate_positive_int(Val) of
        true -> Val;
        false -> throw({bad_request, Err})
    end.

parse_engine_opt(Req) ->
    case chttpd:qs_value(Req, "engine") of
        undefined ->
            [];
        Extension ->
            Available = couch_server:get_engine_extensions(),
            case lists:member(Extension, Available) of
                true ->
                    [{engine, iolist_to_binary(Extension)}];
                false ->
                    throw({bad_request, invalid_engine_extension})
            end
    end.

parse_partitioned_opt(Req) ->
    case chttpd:qs_value(Req, "partitioned") of
        undefined ->
            [];
        "false" ->
            [];
        "true" ->
            ok = validate_partitioned_db_enabled(Req),
            [
                {partitioned, true},
                {hash, [couch_partition, hash, []]}
            ];
        _ ->
            throw({bad_request, <<"Invalid `partitioned` parameter">>})
    end.

validate_partitioned_db_enabled(Req) ->
    case couch_flags:is_enabled(partitioned, Req) of
        true ->
            ok;
        false ->
            throw({bad_request, <<"Partitioned feature is not enabled.">>})
    end.

parse_doc_query({Key, Value}, Args) ->
    case {Key, Value} of
        {"attachments", "true"} ->
            Options = [attachments | Args#doc_query_args.options],
            Args#doc_query_args{options = Options};
        {"meta", "true"} ->
            Options = [revs_info, conflicts, deleted_conflicts | Args#doc_query_args.options],
            Args#doc_query_args{options = Options};
        {"revs", "true"} ->
            Options = [revs | Args#doc_query_args.options],
            Args#doc_query_args{options = Options};
        {"local_seq", "true"} ->
            Options = [local_seq | Args#doc_query_args.options],
            Args#doc_query_args{options = Options};
        {"revs_info", "true"} ->
            Options = [revs_info | Args#doc_query_args.options],
            Args#doc_query_args{options = Options};
        {"conflicts", "true"} ->
            Options = [conflicts | Args#doc_query_args.options],
            Args#doc_query_args{options = Options};
        {"deleted", "true"} ->
            Options = [deleted | Args#doc_query_args.options],
            Args#doc_query_args{options = Options};
        {"deleted_conflicts", "true"} ->
            Options = [deleted_conflicts | Args#doc_query_args.options],
            Args#doc_query_args{options = Options};
        {"rev", Rev} ->
            Args#doc_query_args{rev = couch_doc:parse_rev(Rev)};
        {"open_revs", "all"} ->
            Args#doc_query_args{open_revs = all};
        {"open_revs", RevsJsonStr} ->
            JsonArray = ?JSON_DECODE(RevsJsonStr),
            Args#doc_query_args{open_revs = couch_doc:parse_revs(JsonArray)};
        {"latest", "true"} ->
            Options = [latest | Args#doc_query_args.options],
            Args#doc_query_args{options = Options};
        {"atts_since", RevsJsonStr} ->
            JsonArray = ?JSON_DECODE(RevsJsonStr),
            Args#doc_query_args{atts_since = couch_doc:parse_revs(JsonArray)};
        {"new_edits", "false"} ->
            Args#doc_query_args{update_type = ?REPLICATED_CHANGES};
        {"new_edits", "true"} ->
            Args#doc_query_args{update_type = ?INTERACTIVE_EDIT};
        {"att_encoding_info", "true"} ->
            Options = [att_encoding_info | Args#doc_query_args.options],
            Args#doc_query_args{options = Options};
        {"r", R} ->
            Options = [{r, R} | Args#doc_query_args.options],
            Args#doc_query_args{options = Options};
        {"w", W} ->
            Options = [{w, W} | Args#doc_query_args.options],
            Args#doc_query_args{options = Options};
        % unknown key value pair, ignore.
        _Else ->
            Args
    end.

parse_changes_query(Req) ->
    erlang:erase(changes_seq_interval),
    ChangesArgs = lists:foldl(
        fun({Key, Value}, Args) ->
            case {string:to_lower(Key), Value} of
                {"feed", "live"} ->
                    %% sugar for continuous
                    Args#changes_args{feed = "continuous"};
                {"feed", _} ->
                    Args#changes_args{feed = Value};
                {"descending", "true"} ->
                    Args#changes_args{dir = rev};
                {"since", _} ->
                    Args#changes_args{since = Value};
                {"last-event-id", _} ->
                    Args#changes_args{since = Value};
                {"limit", _} ->
                    Args#changes_args{limit = list_to_integer(Value)};
                {"style", _} ->
                    Args#changes_args{style = list_to_existing_atom(Value)};
                {"heartbeat", "true"} ->
                    Args#changes_args{heartbeat = true};
                {"heartbeat", _} ->
                    try list_to_integer(Value) of
                        HeartbeatInteger when HeartbeatInteger > 0 ->
                            Args#changes_args{heartbeat = HeartbeatInteger};
                        _ ->
                            throw(
                                {bad_request,
                                    <<"The heartbeat value should be a positive integer (in milliseconds).">>}
                            )
                    catch
                        error:badarg ->
                            throw(
                                {bad_request,
                                    <<"Invalid heartbeat value. Expecting a positive integer value (in milliseconds).">>}
                            )
                    end;
                {"timeout", _} ->
                    Args#changes_args{timeout = list_to_integer(Value)};
                {"include_docs", "true"} ->
                    Args#changes_args{include_docs = true};
                {"conflicts", "true"} ->
                    Args#changes_args{conflicts = true};
                {"attachments", "true"} ->
                    Options = [attachments | Args#changes_args.doc_options],
                    Args#changes_args{doc_options = Options};
                {"att_encoding_info", "true"} ->
                    Options = [att_encoding_info | Args#changes_args.doc_options],
                    Args#changes_args{doc_options = Options};
                {"filter", _} ->
                    Args#changes_args{filter = Value};
                {"seq_interval", _} ->
                    try list_to_integer(Value) of
                        V when V > 0 ->
                            erlang:put(changes_seq_interval, V),
                            Args;
                        _ ->
                            throw({bad_request, invalid_seq_interval})
                    catch
                        error:badarg ->
                            throw({bad_request, invalid_seq_interval})
                    end;
                % unknown key value pair, ignore.
                _Else ->
                    Args
            end
        end,
        #changes_args{},
        chttpd:qs(Req)
    ),
    %% if it's an EventSource request with a Last-event-ID header
    %% that should override the `since` query string, since it's
    %% probably the browser reconnecting.
    case ChangesArgs#changes_args.feed of
        "eventsource" ->
            case couch_httpd:header_value(Req, "last-event-id") of
                undefined ->
                    ChangesArgs;
                Value ->
                    ChangesArgs#changes_args{since = Value}
            end;
        _ ->
            ChangesArgs
    end.

extract_header_rev(Req, ExplicitRev) when is_binary(ExplicitRev) or is_list(ExplicitRev) ->
    extract_header_rev(Req, couch_doc:parse_rev(ExplicitRev));
extract_header_rev(Req, ExplicitRev) ->
    Etag =
        case chttpd:header_value(Req, "If-Match") of
            undefined -> undefined;
            Value -> couch_doc:parse_rev(string:strip(Value, both, $"))
        end,
    case {ExplicitRev, Etag} of
        {undefined, undefined} -> missing_rev;
        {_, undefined} -> ExplicitRev;
        {undefined, _} -> Etag;
        _ when ExplicitRev == Etag -> Etag;
        _ -> throw({bad_request, "Document rev and etag have different values"})
    end.

validate_security_can_be_edited(DbName) ->
    UserDbName = config:get("chttpd_auth", "authentication_db", "_users"),
    CanEditUserSecurityObject = config:get("couchdb", "users_db_security_editable", "false"),
    case {DbName, CanEditUserSecurityObject} of
        {UserDbName, "false"} ->
            Msg = "You can't edit the security object of the user database.",
            throw({forbidden, Msg});
        {_, _} ->
            ok
    end.

validate_revs(_Doc, true) ->
    ok;
validate_revs(#doc{revs = {0, []}}, false) ->
    throw(
        {bad_request,
            ?l2b(
                "When `new_edits: false`, " ++
                    "the document needs `_rev` or `_revisions` specified"
            )}
    );
validate_revs(_Doc, false) ->
    ok.

validate_attachment_names(Doc) ->
    lists:foreach(
        fun(Att) ->
            Name = couch_att:fetch(name, Att),
            validate_attachment_name(Name)
        end,
        Doc#doc.atts
    ).

validate_attachment_name(Name) when is_list(Name) ->
    validate_attachment_name(list_to_binary(Name));
validate_attachment_name(<<"_", Rest/binary>>) ->
    throw(
        {bad_request,
            <<"Attachment name '_", Rest/binary, "' starts with prohibited character '_'">>}
    );
validate_attachment_name(Name) ->
    case couch_util:validate_utf8(Name) of
        true -> Name;
        false -> throw({bad_request, <<"Attachment name is not UTF-8 encoded">>})
    end.

-spec monitor_attachments(couch_att:att() | [couch_att:att()]) -> [reference()].
monitor_attachments(Atts) when is_list(Atts) ->
    lists:foldl(
        fun(Att, Monitors) ->
            case couch_att:fetch(data, Att) of
                {Fd, _} ->
                    [monitor(process, Fd) | Monitors];
                Else ->
                    couch_log:error("~p from couch_att:fetch(data, ~p)", [Else, Att]),
                    Monitors
            end
        end,
        [],
        non_stubbed_attachments(Atts)
    );
monitor_attachments(Att) ->
    monitor_attachments([Att]).

demonitor_refs(Refs) when is_list(Refs) ->
    [demonitor(Ref) || Ref <- Refs].

% Return attachments which are not stubs
non_stubbed_attachments(Atts) when is_list(Atts) ->
    lists:filter(
        fun(Att) ->
            couch_att:fetch(data, Att) =/= stub
        end,
        Atts
    ).

set_namespace(<<"_all_docs">>, Args) ->
    set_namespace(undefined, Args);
set_namespace(<<"_local_docs">>, Args) ->
    set_namespace(<<"_local">>, Args);
set_namespace(<<"_design_docs">>, Args) ->
    set_namespace(<<"_design">>, Args);
set_namespace(NS, #mrargs{} = Args) ->
    couch_mrview_util:set_extra(Args, namespace, NS).

set_include_sysdocs(<<"_local_docs">>, Req, Args) ->
    Val = chttpd:qs_value(Req, "include_system", "false") == "true",
    couch_mrview_util:set_extra(Args, include_system, Val);
set_include_sysdocs(_OP, _Req, Args) ->
    Args.

%% /db/_bulk_get stuff

bulk_get_is_multipart(#httpd{mochi_req = MochiReq}) ->
    Json = MochiReq:accepts_content_type("application/json"),
    Mixed = MochiReq:accepts_content_type("multipart/mixed"),
    Related = MochiReq:accepts_content_type("multipart/related"),
    not Json andalso (Mixed orelse Related).

bulk_get_docs(Db, #{} = ArgsMap, Options) ->
    % Sort args by doc ID to hopefully make querying B-trees a bit faster
    KeyFun = fun({Ref, {DocId, _, _}}) -> {DocId, Ref} end,
    CmpFun = fun(A, B) -> KeyFun(A) =< KeyFun(B) end,
    ArgsList = lists:sort(CmpFun, maps:to_list(ArgsMap)),
    % Split out known errors. Later, before returning, recombine them back into
    % the final result map.
    PartFun = fun({_Ref, {_DocId, RevsOrError, _DocOpts}}) ->
        case RevsOrError of
            L when is_list(L) -> true;
            all -> true;
            {error, _} -> false
        end
    end,
    {ValidArgs, ErrorArgs} = lists:partition(PartFun, ArgsList),
    UseBatches = config:get_boolean("chttpd", "bulk_get_use_batches", true),
    Responses =
        case UseBatches of
            true -> bulk_get_docs_batched(Db, ValidArgs, Options);
            false -> bulk_get_docs_individually(Db, ValidArgs, Options)
        end,
    MapFun = fun({Ref, {DocId, Response, _}} = RespTuple) ->
        case Response of
            [] ->
                % Remap empty reponses to errors. This is a peculiarity of the
                % _bulk_get HTTP API. If revision was not specifed, `undefined`
                % must be returned as the error revision ID.
                #{Ref := {_, Revs, _}} = ArgsMap,
                RevStr = bulk_get_rev_error(Revs),
                Error = {RevStr, <<"not_found">>, <<"missing">>},
                {Ref, {DocId, {error, Error}, []}};
            [_ | _] = DocRevisions ->
                chttpd_stats:incr_reads(length(DocRevisions)),
                RespTuple;
            _ ->
                RespTuple
        end
    end,
    % Recombine with the inital known errors and return as a map
    maps:from_list(lists:map(MapFun, Responses) ++ ErrorArgs).

bulk_get_docs_batched(Db, Args, Options) when is_list(Args) ->
    % Args is [{Ref, {DocId, Revs, DocOpts}}, ...] but fabric:open_revs/3
    % accepts [{{DocId, Revs}, DocOpts}, ...] so we need to transform them
    ArgsFun = fun({_Ref, {DocId, Revs, DocOpts}}) ->
        {{DocId, Revs}, DocOpts}
    end,
    OpenRevsArgs = lists:map(ArgsFun, Args),
    case fabric:open_revs(Db, OpenRevsArgs, Options) of
        {ok, Responses} ->
            ZipFun = fun({Ref, {DocId, _Rev, DocOpts}}, Response) ->
                {Ref, {DocId, Response, DocOpts ++ Options}}
            end,
            lists:zipwith(ZipFun, Args, Responses);
        {error, Error} ->
            % Distribute error to all request args, so it looks like they
            % individually failed with that error
            MapFun = fun({Ref, {DocId, Revs, _DocOpts}}) ->
                RevStr = bulk_get_rev_error(Revs),
                Tag = internal_fabric_error,
                % This error will be emitted as json so make sure it's rendered
                % to a string first.
                Reason = couch_util:to_binary(Error),
                {Ref, {DocId, {error, {RevStr, Tag, Reason}}, []}}
            end,
            lists:map(MapFun, Args)
    end.

bulk_get_docs_individually(Db, Args, Options) when is_list(Args) ->
    MapFun = fun({Ref, {DocId, Revs, DocOpts}}) ->
        case fabric:open_revs(Db, DocId, Revs, DocOpts ++ Options) of
            {ok, Response} ->
                {Ref, {DocId, Response, DocOpts}};
            {error, Error} ->
                RevStr = bulk_get_rev_error(Revs),
                Tag = internal_fabric_error,
                % This error will be emitted as json so make sure it's rendered
                % to a string first.
                Reason = couch_util:to_binary(Error),
                {Ref, {DocId, {error, {RevStr, Tag, Reason}}, []}}
        end
    end,
    lists:map(MapFun, Args).

bulk_get_ret_json(#httpd{} = Req, ArgsRefs, ResultsMap, Options) ->
    send_json(Req, 200, #{
        <<"results">> => lists:map(
            fun(Ref) ->
                #{Ref := {DocId, Result, DocOpts}} = ResultsMap,
                % We are about to encode the document into json and some of the
                % provided general options might affect that so we make sure to
                % combine all doc options and the general options together
                AllOptions = DocOpts ++ Options,
                #{
                    <<"id">> => DocId,
                    <<"docs">> => bulk_get_result(DocId, Result, AllOptions)
                }
            end,
            ArgsRefs
        )
    }).

bulk_get_result(DocId, {error, {Rev, Error, Reason}}, _Options) ->
    [bulk_get_json_error_map(DocId, Rev, Error, Reason)];
bulk_get_result(DocId, [_ | _] = DocRevs, Options) ->
    MapFun = fun
        ({ok, Doc}) ->
            #{<<"ok">> => couch_doc:to_json_obj(Doc, Options)};
        ({{Error, Reason}, RevId}) ->
            Rev = couch_doc:rev_to_str(RevId),
            bulk_get_json_error_map(DocId, Rev, Error, Reason)
    end,
    lists:map(MapFun, DocRevs).

bulk_get_json_error_map(DocId, Rev, Error, Reason) ->
    #{
        <<"error">> => #{
            <<"id">> => DocId,
            <<"rev">> => Rev,
            <<"error">> => Error,
            <<"reason">> => Reason
        }
    }.

bulk_get_ret_multipart(#httpd{} = Req, ArgsRefs, ResultsMap, Options) ->
    MochiReq = Req#httpd.mochi_req,
    Mixed = MochiReq:accepts_content_type("multipart/mixed"),
    MpType =
        case Mixed of
            true -> "multipart/mixed";
            false -> "multipart/related"
        end,
    Boundary = bulk_get_multipart_boundary(),
    BoundaryCType = MpType ++ "; boundary=\"" ++ ?b2l(Boundary) ++ "\"",
    CType = {"Content-Type", BoundaryCType},
    {ok, Resp} = start_chunked_response(Req, 200, [CType]),
    ForeachFun = fun(Ref) ->
        #{Ref := {DocId, Result, DocOpts}} = ResultsMap,
        case Result of
            [_ | _] = DocRevs ->
                AllOptions = DocOpts ++ Options,
                send_docs_multipart_bulk_get(DocRevs, AllOptions, Boundary, Resp);
            {error, {RevId, Error, Reason}} ->
                EJson = bulk_get_json_error_map(DocId, RevId, Error, Reason),
                Json = ?JSON_ENCODE(map_get(<<"error">>, EJson)),
                ErrCType = <<"Content-Type: application/json">>,
                Prefix = <<"\r\n", ErrCType/binary, "; error=\"true\"\r\n\r\n">>,
                ErrorChunk = [<<"\r\n--", Boundary/binary>>, Prefix, Json],
                couch_httpd:send_chunk(Resp, ErrorChunk)
        end
    end,
    lists:foreach(ForeachFun, ArgsRefs),
    case ArgsRefs of
        [] ->
            % Didn't send any docs, don't need to send a closing boundary
            ok;
        [_ | _] ->
            % Sent at least one doc response, so also send the last boundary
            EndBoundary = <<"\r\n", "--", Boundary/binary, "--\r\n">>,
            couch_httpd:send_chunk(Resp, EndBoundary)
    end,
    couch_httpd:last_chunk(Resp).

bulk_get_parse_doc_query(Req) ->
    lists:foldl(
        fun({Key, Value}, Args) ->
            ok = validate_query_param(Key),
            parse_doc_query({Key, Value}, Args)
        end,
        #doc_query_args{},
        chttpd:qs(Req)
    ).

validate_query_param("open_revs" = Key) ->
    throw_bad_query_param(Key);
validate_query_param("new_edits" = Key) ->
    throw_bad_query_param(Key);
validate_query_param("w" = Key) ->
    throw_bad_query_param(Key);
validate_query_param("rev" = Key) ->
    throw_bad_query_param(Key);
validate_query_param("atts_since" = Key) ->
    throw_bad_query_param(Key);
validate_query_param(_) ->
    ok.

throw_bad_query_param(Key) when is_list(Key) ->
    throw_bad_query_param(?l2b(Key));
throw_bad_query_param(Key) when is_binary(Key) ->
    Msg = <<"\"", Key/binary, "\" query parameter is not acceptable">>,
    throw({bad_request, Msg}).

% Parse and tag bulk_get arguments. Return a list of argument tags in the same
% order as they were provided and a map of #{tag => {DocId, RevOrError,
% DocOpts}. That list is used to return them in the response in the exact same
% order.
%
bulk_get_parse_args(Db, Docs) ->
    Fun = fun(Doc, Acc) ->
        Ref = make_ref(),
        Arg = {_DocId, _RevOrError, _DocOpts} = bulk_get_parse_arg(Db, Doc),
        {Ref, Acc#{Ref => Arg}}
    end,
    lists:mapfoldr(Fun, #{}, Docs).

bulk_get_parse_arg(Db, {[_ | _] = Props}) ->
    bulk_get_parse_doc_id(Db, Props);
bulk_get_parse_arg(_Db, _Invalid) ->
    Error = {null, bad_request, <<"document must be a JSON object">>},
    {null, {error, Error}, []}.

bulk_get_parse_doc_id(Db, [_ | _] = Props) ->
    case couch_util:get_value(<<"id">>, Props) of
        undefined ->
            Error = {null, bad_request, <<"document id missed">>},
            {null, {error, Error}, []};
        DocId ->
            try
                couch_db:validate_docid(Db, DocId),
                bulk_get_parse_revs(Props, DocId)
            catch
                throw:{Error, Reason} ->
                    {DocId, {error, {null, Error, Reason}}, []}
            end
    end.

bulk_get_parse_revs(Props, DocId) ->
    RevStr = couch_util:get_value(<<"rev">>, Props),

    case parse_field(<<"rev">>, RevStr) of
        {error, {RevStr, Error, Reason}} ->
            {DocId, {error, {RevStr, Error, Reason}}, []};
        {ok, undefined} ->
            bulk_get_parse_atts_since(Props, DocId, all);
        {ok, Rev} ->
            bulk_get_parse_atts_since(Props, DocId, [Rev])
    end.

bulk_get_parse_atts_since(Props, DocId, Revs) ->
    AttsSinceStr = couch_util:get_value(<<"atts_since">>, Props),
    case parse_field(<<"atts_since">>, AttsSinceStr) of
        {error, {BadAttsSinceRev, Error, Reason}} ->
            {DocId, {error, {BadAttsSinceRev, Error, Reason}}, []};
        {ok, []} ->
            {DocId, Revs, []};
        {ok, RevList} ->
            Options = [{atts_since, RevList}, attachments],
            {DocId, Revs, Options}
    end.

parse_field(<<"rev">>, undefined) ->
    {ok, undefined};
parse_field(<<"rev">>, Value) ->
    try
        Rev = couch_doc:parse_rev(Value),
        {ok, Rev}
    catch
        throw:{bad_request = Error, Reason} ->
            {error, {Value, Error, Reason}}
    end;
parse_field(<<"atts_since">>, undefined) ->
    {ok, []};
parse_field(<<"atts_since">>, []) ->
    {ok, []};
parse_field(<<"atts_since">>, Value) when is_list(Value) ->
    parse_atts_since(Value, []);
parse_field(<<"atts_since">>, Value) ->
    {error, {Value, bad_request, <<"att_since value must be array of revs.">>}}.

parse_atts_since([], Acc) ->
    {ok, lists:reverse(Acc)};
parse_atts_since([RevStr | Rest], Acc) ->
    case parse_field(<<"rev">>, RevStr) of
        {ok, Rev} ->
            parse_atts_since(Rest, [Rev | Acc]);
        {error, _} = Error ->
            Error
    end.

bulk_get_rev_error(all) ->
    % When revision is not defined respond with `undefined` on error in the
    % revision field.
    <<"undefined">>;
bulk_get_rev_error([{Pos, RevId} = Rev]) when is_integer(Pos), is_binary(RevId) ->
    couch_doc:rev_to_str(Rev).

websocket_upgrade_requested(#httpd{} = Req) ->
    Upgrade = chttpd:header_value(Req, "Upgrade"),
    Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".

client_ws_loop(_Payload, State, _ReplyChannel) ->
    State.

ws_pong(Req) ->
    Socket = ws_socket(Req),
    mochiweb_socket:send(Socket, <<1:1, 0:3, 10:4, 0:8>>).

ws_close(Req, Code) ->
    ws_close(Req, Code, <<>>).

ws_close(Req, Code, Payload) when is_integer(Code), is_binary(Payload)  ->
    Socket = ws_socket(Req),
    Prefix = <<1:1, 0:3, 8:4, (ws_payload_length(iolist_size(Payload) + 2))/binary>>,
    mochiweb_socket:send(Socket, [Prefix, <<Code:16>>, Payload]),
    ws_shutdown(Socket, write),
    ws_drain(Socket),
    mochiweb_socket:close(Socket),
    exit({shutdown, websocket_close}).

ws_drain(Socket) ->
    Result = gen_tcp:recv(Socket, 8192),
    case Result of
        {ok, _Data} ->
            ws_drain(socket);
        {error, closed} ->
            ok
    end.

ws_socket(#cacc{} = Acc) ->
    ws_socket(Acc#cacc.mochi#httpd.mochi_req);
ws_socket({ReqM, _} = Req) ->
    ReqM:get(socket, Req).

ws_shutdown({ssl, Socket}, How) ->
    gen_tcp:shutdown(Socket, How);
ws_shutdown(Socket, How) ->
    gen_tcp:shutdown(Socket, How).

ws_payload_length(N) ->
    case N of
      N when N =< 125 -> <<N>>;
      N when N =< 65535 -> <<126, N:16>>;
      N when N =< 9223372036854775807 -> <<127, N:64>>
    end.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

monitor_attachments_test_() ->
    {"ignore stubs", fun() ->
        Atts = [couch_att:new([{data, stub}])],
        ?_assertEqual([], monitor_attachments(Atts))
    end}.

parse_partitioned_opt_test_() ->
    {
        foreach,
        fun setup/0,
        fun teardown/1,
        [
            t_should_allow_partitioned_db(),
            t_should_throw_on_not_allowed_partitioned_db(),
            t_returns_empty_array_for_partitioned_false(),
            t_returns_empty_array_for_no_partitioned_qs()
        ]
    }.

parse_shards_opt_test_() ->
    {
        foreach,
        fun setup/0,
        fun teardown/1,
        [
            t_should_allow_valid_q(),
            t_should_default_on_missing_q(),
            t_should_throw_on_invalid_q(),
            t_should_allow_valid_n(),
            t_should_default_on_missing_n(),
            t_should_throw_on_invalid_n(),
            t_should_allow_valid_placement(),
            t_should_default_on_missing_placement(),
            t_should_throw_on_invalid_placement()
        ]
    }.

setup() ->
    meck:expect(config, get, fun(_, _, Default) -> Default end),
    ok.

teardown(_) ->
    meck:unload().

mock_request(Url) ->
    Headers = mochiweb_headers:make([{"Host", "examples.com"}]),
    MochiReq = mochiweb_request:new(nil, 'PUT', Url, {1, 1}, Headers),
    #httpd{mochi_req = MochiReq}.

t_should_allow_partitioned_db() ->
    ?_test(begin
        meck:expect(couch_flags, is_enabled, 2, true),
        Req = mock_request("/all-test21?partitioned=true"),
        [Partitioned, _] = parse_partitioned_opt(Req),
        ?assertEqual(Partitioned, {partitioned, true})
    end).

t_should_throw_on_not_allowed_partitioned_db() ->
    ?_test(begin
        meck:expect(couch_flags, is_enabled, 2, false),
        Req = mock_request("/all-test21?partitioned=true"),
        Throw = {bad_request, <<"Partitioned feature is not enabled.">>},
        ?assertThrow(Throw, parse_partitioned_opt(Req))
    end).

t_returns_empty_array_for_partitioned_false() ->
    ?_test(begin
        Req = mock_request("/all-test21?partitioned=false"),
        ?assertEqual(parse_partitioned_opt(Req), [])
    end).

t_returns_empty_array_for_no_partitioned_qs() ->
    ?_test(begin
        Req = mock_request("/all-test21"),
        ?assertEqual(parse_partitioned_opt(Req), [])
    end).

t_should_allow_valid_q() ->
    ?_test(begin
        Req = mock_request("/all-test21?q=1"),
        Opts = parse_shards_opt(Req),
        ?assertEqual("1", couch_util:get_value(q, Opts))
    end).

t_should_default_on_missing_q() ->
    ?_test(begin
        Req = mock_request("/all-test21"),
        Opts = parse_shards_opt(Req),
        ?assertEqual(2, couch_util:get_value(q, Opts))
    end).

t_should_throw_on_invalid_q() ->
    ?_test(begin
        Req = mock_request("/all-test21?q="),
        Err = <<"The `q` value should be a positive integer.">>,
        ?assertThrow({bad_request, Err}, parse_shards_opt(Req))
    end).

t_should_allow_valid_n() ->
    ?_test(begin
        Req = mock_request("/all-test21?n=1"),
        Opts = parse_shards_opt(Req),
        ?assertEqual("1", couch_util:get_value(n, Opts))
    end).

t_should_default_on_missing_n() ->
    ?_test(begin
        Req = mock_request("/all-test21"),
        Opts = parse_shards_opt(Req),
        ?assertEqual(3, couch_util:get_value(n, Opts))
    end).

t_should_throw_on_invalid_n() ->
    ?_test(begin
        Req = mock_request("/all-test21?n="),
        Err = <<"The `n` value should be a positive integer.">>,
        ?assertThrow({bad_request, Err}, parse_shards_opt(Req))
    end).

t_should_allow_valid_placement() ->
    {
        foreach,
        fun() -> ok end,
        [
            {"single zone",
                ?_test(begin
                    Req = mock_request("/all-test21?placement=az:1"),
                    Opts = parse_shards_opt(Req),
                    ?assertEqual("az:1", couch_util:get_value(placement, Opts))
                end)},
            {"multi zone",
                ?_test(begin
                    Req = mock_request("/all-test21?placement=az:1,co:3"),
                    Opts = parse_shards_opt(Req),
                    ?assertEqual(
                        "az:1,co:3",
                        couch_util:get_value(placement, Opts)
                    )
                end)}
        ]
    }.

t_should_default_on_missing_placement() ->
    ?_test(begin
        Req = mock_request("/all-test21"),
        Opts = parse_shards_opt(Req),
        ?assertEqual(undefined, couch_util:get_value(placement, Opts))
    end).

t_should_throw_on_invalid_placement() ->
    Err = <<"The `placement` value should be in a format `zone:n`.">>,
    {
        foreach,
        fun() -> ok end,
        [
            {"empty placement",
                ?_test(begin
                    Req = mock_request("/all-test21?placement="),
                    ?assertThrow({bad_request, Err}, parse_shards_opt(Req))
                end)},
            {"invalid format",
                ?_test(begin
                    Req = mock_request("/all-test21?placement=moon"),
                    ?assertThrow({bad_request, Err}, parse_shards_opt(Req))
                end)},
            {"invalid n",
                ?_test(begin
                    Req = mock_request("/all-test21?placement=moon:eagle"),
                    ?assertThrow({bad_request, Err}, parse_shards_opt(Req))
                end)},
            {"one invalid zone",
                ?_test(begin
                    Req = mock_request("/all-test21?placement=az:1,co:moon"),
                    ?assertThrow({bad_request, Err}, parse_shards_opt(Req))
                end)}
        ]
    }.

-endif.
