% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_httpd_multipart).

-export([
    abort_multipart_stream/1,
    decode_multipart_stream/3,
    encode_multipart_stream/5,
    length_multipart_stream/3,
    num_mp_writers/0,
    num_mp_writers/1
]).

-export([
    parse_multipart_request/3
]).

-include_lib("couch/include/couch_db.hrl").
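
%% Spawn a monitored parser process for the multipart request body and wait
%% for it to hand back the JSON document part. Returns
%% {{doc_bytes, Ref, DocBytes}, Parser, ParserRef} on success; the caller
%% (and the attachment writers) can then stream attachment data out of
%% Parser with {get_bytes, Ref, self()}.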
decode_multipart_stream(ContentType, DataFun, Ref) ->
    Parent = self(),
    NumMpWriters = num_mp_writers(),
    {Parser, ParserRef} = spawn_monitor(fun() ->
        ParentRef = erlang:monitor(process, Parent),
        put(mp_parent_ref, ParentRef),
        num_mp_writers(NumMpWriters),
        {<<"--", _/binary>>, _, _} = parse_multipart_request(
            ContentType, DataFun,
            fun(Next) -> mp_parse_doc(Next, []) end),
        unlink(Parent)
    end),
    Parser ! {get_doc_bytes, Ref, self()},
    receive
        {started_open_doc_revs, NewRef} ->
            %% FIXME: How to remove the knowledge about this message?
            {{started_open_doc_revs, NewRef}, Parser, ParserRef};
        {doc_bytes, Ref, DocBytes} ->
            {{doc_bytes, Ref, DocBytes}, Parser, ParserRef};
        {'DOWN', ParserRef, _, _, normal} ->
            ok;
        {'DOWN', ParserRef, process, Parser, {{nocatch, {Error, Msg}}, _}} ->
            couch_log:error("Multipart streamer ~p died with reason ~p",
                            [ParserRef, Msg]),
            throw({Error, Msg});
        {'DOWN', ParserRef, _, _, Reason} ->
            couch_log:error("Multipart streamer ~p died with reason ~p",
                            [ParserRef, Reason]),
            throw({error, Reason})
    end.
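
%% Parser callback for the first MIME part, which must be the JSON document.
%% Accumulates body chunks, replies to the coordinator's
%% {get_doc_bytes, Ref, From} request, then switches to attachment parsing.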
mp_parse_doc({headers, H}, []) ->
    case couch_util:get_value("content-type", H) of
        {"application/json", _} ->
            fun(Next) ->
                mp_parse_doc(Next, [])
            end;
        _ ->
            throw({bad_ctype, <<"Content-Type must be application/json">>})
    end;
mp_parse_doc({body, Bytes}, AccBytes) ->
    fun(Next) ->
        mp_parse_doc(Next, [Bytes | AccBytes])
    end;
mp_parse_doc(body_end, AccBytes) ->
    receive
        {get_doc_bytes, Ref, From} ->
            From ! {doc_bytes, Ref, lists:reverse(AccBytes)}
    end,
    fun(Next) ->
        mp_parse_atts(Next, {Ref, [], 0, orddict:new(), []})
    end.
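
%% Parser callback for attachment parts. The accumulator is
%% {Ref, Chunks, Offset, Counters, Waiting}: Chunks holds buffered body
%% chunks, Offset counts chunks already dropped from the head of that list,
%% Counters (an orddict) maps each writer pid to the chunk number it most
%% recently requested, and Waiting lists writers whose
%% {get_bytes, Ref, From} request has not been answered yet. At eof, keep
%% serving get_bytes requests until every writer has caught up, giving up
%% after an hour of inactivity.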
mp_parse_atts({headers, _}, Acc) ->
    fun(Next) -> mp_parse_atts(Next, Acc) end;
mp_parse_atts(body_end, Acc) ->
    fun(Next) -> mp_parse_atts(Next, Acc) end;
mp_parse_atts({body, Bytes}, {Ref, Chunks, Offset, Counters, Waiting}) ->
    case maybe_send_data({Ref, Chunks ++ [Bytes], Offset, Counters, Waiting}) of
        abort_parsing ->
            fun(Next) -> mp_abort_parse_atts(Next, nil) end;
        NewAcc ->
            fun(Next) -> mp_parse_atts(Next, NewAcc) end
    end;
mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
    N = num_mp_writers(),
    M = length(Counters),
    case (M == N) andalso Chunks == [] of
        true ->
            ok;
        false ->
            ParentRef = get(mp_parent_ref),
            receive
                abort_parsing ->
                    ok;
                {get_bytes, Ref, From} ->
                    C2 = orddict:update_counter(From, 1, Counters),
                    % maybe_send_data may return abort_parsing here; recursing
                    % into the eof clause with that atom would crash with
                    % function_clause, so stop cleanly instead
                    case maybe_send_data({Ref, Chunks, Offset, C2, [From | Waiting]}) of
                        abort_parsing ->
                            ok;
                        NewAcc ->
                            mp_parse_atts(eof, NewAcc)
                    end;
                {'DOWN', ParentRef, _, _, _} ->
                    exit(mp_reader_coordinator_died)
            after 3600000 ->
                ok
            end
    end.
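
%% Once parsing has been aborted, swallow the remaining parser events
%% until eof.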
mp_abort_parse_atts(eof, _) ->
    ok;
mp_abort_parse_atts(_, _) ->
    fun(Next) -> mp_abort_parse_atts(Next, nil) end.
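
%% Fan buffered chunks out to the attachment writers. A writer asks for its
%% next chunk with {get_bytes, Ref, WriterPid} and is sent
%% {bytes, Ref, Chunk}; the head chunk is dropped from the buffer once all
%% num_mp_writers() writers have requested it. Returns the updated
%% accumulator, or abort_parsing if the coordinator told us to stop.
%%
%% A writer-side loop might look like the following sketch (illustrative
%% only; write_att and WriteFun are hypothetical, the real writers live in
%% the attachment handling code):
%%
%%     write_att(Parser, Ref, WriteFun, Remaining) when Remaining > 0 ->
%%         Parser ! {get_bytes, Ref, self()},
%%         receive
%%             {bytes, Ref, Chunk} ->
%%                 WriteFun(Chunk),
%%                 write_att(Parser, Ref, WriteFun,
%%                           Remaining - iolist_size(Chunk))
%%         end;
%%     write_att(_Parser, _Ref, _WriteFun, 0) ->
%%         done.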
maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
    receive
        {get_bytes, Ref, From} ->
            NewCounters = orddict:update_counter(From, 1, Counters),
            maybe_send_data({Ref, Chunks, Offset, NewCounters, [From | Waiting]})
    after 0 ->
        % reply to as many writers as possible
        NewWaiting = lists:filter(fun(Writer) ->
            WhichChunk = orddict:fetch(Writer, Counters),
            ListIndex = WhichChunk - Offset,
            if
                ListIndex =< length(Chunks) ->
                    Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)},
                    false;
                true ->
                    true
            end
        end, Waiting),
        % check if we can drop a chunk from the head of the list
        SmallestIndex = case Counters of
            [] ->
                0;
            _ ->
                lists:min(element(2, lists:unzip(Counters)))
        end,
        Size = length(Counters),
        N = num_mp_writers(),
        {NewChunks, NewOffset} = if
            Size == N andalso SmallestIndex == (Offset + 1) ->
                {tl(Chunks), Offset + 1};
            true ->
                {Chunks, Offset}
        end,
        % wait for a writer if nobody has requested the last buffered chunk yet
        LargestIndex = lists:max([0 | element(2, lists:unzip(Counters))]),
        if
            LargestIndex >= (Offset + length(Chunks)) ->
                % some writer has requested every buffered chunk, keep moving
                {Ref, NewChunks, NewOffset, Counters, NewWaiting};
            true ->
                ParentRef = get(mp_parent_ref),
                receive
                    abort_parsing ->
                        abort_parsing;
                    {'DOWN', ParentRef, _, _, _} ->
                        exit(mp_reader_coordinator_died);
                    {get_bytes, Ref, X} ->
                        C2 = orddict:update_counter(X, 1, Counters),
                        maybe_send_data({Ref, NewChunks, NewOffset, C2, [X | NewWaiting]})
                end
        end
    end.
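
%% The writer count is stashed in the parser's process dictionary;
%% it defaults to 1 when never set.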
num_mp_writers(N) ->
    erlang:put(mp_att_writers, N).

num_mp_writers() ->
    case erlang:get(mp_att_writers) of
        undefined -> 1;
        Count -> Count
    end.
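
%% Stream a document and its attachments through WriteFun. With no
%% attachments the JSON is written bare; otherwise it becomes the first
%% part of a multipart/related body delimited by Boundary.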
encode_multipart_stream(_Boundary, JsonBytes, [], WriteFun, _AttFun) ->
    WriteFun(JsonBytes);
encode_multipart_stream(Boundary, JsonBytes, Atts, WriteFun, AttFun) ->
    WriteFun([<<"--", Boundary/binary,
                "\r\nContent-Type: application/json\r\n\r\n">>,
              JsonBytes, <<"\r\n--", Boundary/binary>>]),
    atts_to_mp(Atts, Boundary, WriteFun, AttFun).
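
%% Write each attachment as its own MIME part. The empty-list clause emits
%% the final "--" that closes the multipart body.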
atts_to_mp([], _Boundary, WriteFun, _AttFun) ->
    WriteFun(<<"--">>);
atts_to_mp([{Att, Name, Len, Type, Encoding} | RestAtts], Boundary, WriteFun,
           AttFun) ->
    LengthBin = list_to_binary(integer_to_list(Len)),
    % write headers
    WriteFun(<<"\r\nContent-Disposition: attachment; filename=\"", Name/binary, "\"">>),
    WriteFun(<<"\r\nContent-Type: ", Type/binary>>),
    WriteFun(<<"\r\nContent-Length: ", LengthBin/binary>>),
    case Encoding of
        identity ->
            ok;
        _ ->
            EncodingBin = atom_to_binary(Encoding, latin1),
            WriteFun(<<"\r\nContent-Encoding: ", EncodingBin/binary>>)
    end,
    % write data
    WriteFun(<<"\r\n\r\n">>),
    AttFun(Att, fun(Data, _) -> WriteFun(Data) end, ok),
    WriteFun(<<"\r\n--", Boundary/binary>>),
    atts_to_mp(RestAtts, Boundary, WriteFun, AttFun).
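
%% Compute the Content-Type and the exact body length that
%% encode_multipart_stream/5 will produce, so the response can carry a
%% correct Content-Length. The arithmetic below must mirror the bytes
%% written by encode_multipart_stream/5 and atts_to_mp/4.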
length_multipart_stream(Boundary, JsonBytes, Atts) ->
    AttsSize = lists:foldl(fun({_Att, Name, Len, Type, Encoding}, AccAttsSize) ->
        AccAttsSize +
        4 +  % "\r\n\r\n"
        length(integer_to_list(Len)) +
        Len +
        4 +  % "\r\n--"
        size(Boundary) +
        % attachment headers
        % (the length of the Content-Length value was already added above)
        size(Name) +
        size(Type) +
        length("\r\nContent-Disposition: attachment; filename=\"\"") +
        length("\r\nContent-Type: ") +
        length("\r\nContent-Length: ") +
        case Encoding of
            identity ->
                0;
            _ ->
                length(atom_to_list(Encoding)) +
                length("\r\nContent-Encoding: ")
        end
    end, 0, Atts),
    if AttsSize == 0 ->
        {<<"application/json">>, iolist_size(JsonBytes)};
    true ->
        {<<"multipart/related; boundary=\"", Boundary/binary, "\"">>,
         2 +  % "--"
         size(Boundary) +
         36 +  % "\r\nContent-Type: application/json\r\n\r\n"
         iolist_size(JsonBytes) +
         4 +  % "\r\n--"
         size(Boundary) +
         AttsSize +
         2  % "--"
        }
    end.
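
%% Tell the parser to stop and wait (up to a minute) for it to exit.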
abort_multipart_stream(Parser) ->
    MonRef = erlang:monitor(process, Parser),
    Parser ! abort_parsing,
    receive
        {'DOWN', MonRef, _, _, _} -> ok
    after 60000 ->
        % The one-minute timeout is deliberate. We want to keep reading
        % data so the socket stays open when possible, but we also don't
        % want to wait so long that a client whose request failed (for
        % example on a validate_doc_update error) is stuck waiting to
        % find out.
        throw(multi_part_abort_timeout)
    end.
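
%% Streaming MIME parser state: the boundary being scanned for, buffered
%% unconsumed input, a fun that yields more input, and the current user
%% callback.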
-record(mp, {boundary, buffer, data_fun, callback}).
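
%% Parse an entire multipart body, driving Callback with {headers, Headers},
%% {body, Bytes}, body_end and eof events; each invocation must return the
%% callback to use for the next event. Returns {Rest, DataFun, Callback}
%% once the closing boundary has been read.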
parse_multipart_request(ContentType, DataFun, Callback) ->
    Boundary0 = iolist_to_binary(get_boundary(ContentType)),
    Boundary = <<"\r\n--", Boundary0/binary>>,
    Mp = #mp{boundary = Boundary,
             buffer = <<>>,
             data_fun = DataFun,
             callback = Callback},
    {Mp2, _NilCallback} = read_until(Mp, <<"--", Boundary0/binary>>,
                                     fun nil_callback/1),
    #mp{buffer = Buffer, data_fun = DataFun2, callback = Callback2} =
        parse_part_header(Mp2),
    {Buffer, DataFun2, Callback2}.
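
%% nil_callback discards the preamble before the first boundary;
%% get_boundary extracts the boundary parameter from a parsed (or still
%% raw) Content-Type.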
nil_callback(_Data) ->
    fun nil_callback/1.

get_boundary({"multipart/" ++ _, Opts}) ->
    case couch_util:get_value("boundary", Opts) of
        S when is_list(S) ->
            S
    end;
get_boundary(ContentType) ->
    {"multipart/" ++ _, Opts} = mochiweb_util:parse_header(ContentType),
    get_boundary({"multipart/", Opts}).
split_header(<<>>) ->
    [];
split_header(Line) ->
    {Name, Rest} = lists:splitwith(fun(C) -> C =/= $: end,
                                   binary_to_list(Line)),
    [$: | Value] = case Rest of
        [] ->
            throw({bad_request, <<"bad part header">>});
        Res ->
            Res
    end,
    [{string:to_lower(string:strip(Name)),
      mochiweb_util:parse_header(Value)}].
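
%% Scan the stream for Pattern, feeding everything before the match through
%% Callback. couch_util:find_in_binary may report a partial match at the
%% end of the buffer, in which case more data is fetched before deciding.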
read_until(#mp{data_fun = DataFun, buffer = Buffer} = Mp, Pattern, Callback) ->
    case couch_util:find_in_binary(Pattern, Buffer) of
        not_found ->
            Callback2 = Callback(Buffer),
            {Buffer2, DataFun2} = DataFun(),
            Buffer3 = iolist_to_binary(Buffer2),
            read_until(Mp#mp{data_fun = DataFun2, buffer = Buffer3},
                       Pattern, Callback2);
        {partial, 0} ->
            {NewData, DataFun2} = DataFun(),
            read_until(Mp#mp{data_fun = DataFun2,
                             buffer = iolist_to_binary([Buffer, NewData])},
                       Pattern, Callback);
        {partial, Skip} ->
            <<DataChunk:Skip/binary, Rest/binary>> = Buffer,
            Callback2 = Callback(DataChunk),
            {NewData, DataFun2} = DataFun(),
            read_until(Mp#mp{data_fun = DataFun2,
                             buffer = iolist_to_binary([Rest | NewData])},
                       Pattern, Callback2);
        {exact, 0} ->
            PatternLen = size(Pattern),
            <<_:PatternLen/binary, Rest/binary>> = Buffer,
            {Mp#mp{buffer = Rest}, Callback};
        {exact, Skip} ->
            PatternLen = size(Pattern),
            <<DataChunk:Skip/binary, _:PatternLen/binary, Rest/binary>> = Buffer,
            Callback2 = Callback(DataChunk),
            {Mp#mp{buffer = Rest}, Callback2}
    end.
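
%% Read a part's header block (everything up to the blank line), hand the
%% parsed headers to the user callback, then parse the part body.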
parse_part_header(#mp{callback = UserCallBack} = Mp) ->
    {Mp2, AccCallback} = read_until(Mp, <<"\r\n\r\n">>,
                                    fun(Next) -> acc_callback(Next, []) end),
    HeaderData = AccCallback(get_data),
    Headers =
        lists:foldl(fun(Line, Acc) ->
            split_header(Line) ++ Acc
        end, [], re:split(HeaderData, <<"\r\n">>, [])),
    NextCallback = UserCallBack({headers, Headers}),
    parse_part_body(Mp2#mp{callback = NextCallback}).
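
%% Stream a part's body to the callback until the boundary, signal
%% body_end, then either finish with eof or move on to the next part's
%% headers.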
parse_part_body(#mp{boundary = Prefix, callback = Callback} = Mp) ->
    {Mp2, WrappedCallback} = read_until(Mp, Prefix,
                                        fun(Data) -> body_callback_wrapper(Data, Callback) end),
    Callback2 = WrappedCallback(get_callback),
    Callback3 = Callback2(body_end),
    case check_for_last(Mp2#mp{callback = Callback3}) of
        {last, #mp{callback = Callback3} = Mp3} ->
            Mp3#mp{callback = Callback3(eof)};
        {more, Mp3} ->
            parse_part_header(Mp3)
    end.
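
%% acc_callback accumulates header bytes until asked for them with
%% get_data; body_callback_wrapper forwards body chunks to the user
%% callback and hands the current callback back on get_callback.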
acc_callback(get_data, Acc) ->
    iolist_to_binary(lists:reverse(Acc));
acc_callback(Data, Acc) ->
    fun(Next) -> acc_callback(Next, [Data | Acc]) end.

body_callback_wrapper(get_callback, Callback) ->
    Callback;
body_callback_wrapper(Data, Callback) ->
    Callback2 = Callback({body, Data}),
    fun(Next) -> body_callback_wrapper(Next, Callback2) end.
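
%% After a boundary, "--" marks the final part and anything else means
%% another part follows; fetch more data if the buffer is too short to
%% tell.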
check_for_last(#mp{buffer = Buffer, data_fun = DataFun} = Mp) ->
    case Buffer of
        <<"--", _/binary>> -> {last, Mp};
        <<_, _, _/binary>> -> {more, Mp};
        _ -> % not long enough
            {Data, DataFun2} = DataFun(),
            check_for_last(Mp#mp{buffer = <<Buffer/binary, Data/binary>>,
                                 data_fun = DataFun2})
    end.