| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_httpd_multipart). |
| |
| -export([ |
| abort_multipart_stream/1, |
| decode_multipart_stream/3, |
| encode_multipart_stream/5, |
| length_multipart_stream/3, |
| num_mp_writers/0, |
| num_mp_writers/1 |
| ]). |
| |
| -include_lib("couch/include/couch_db.hrl"). |
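| |
| %% This module fans a single parsed multipart stream out to N writer |
| %% processes: one parser buffers body chunks and replies to writers |
| %% that ask for them, so each writer can consume the same attachment |
| %% data at its own pace. The message protocol, as used by the code |
| %% below: |
| %% |
| %% caller -> parser: {get_doc_bytes, Ref, CallerPid} |
| %% parser -> caller: {doc_bytes, Ref, DocBytes} |
| %% writer -> parser: {get_bytes, Ref, WriterPid} |
| %% parser -> writer: {bytes, Ref, Chunk} |
| %% any -> parser: abort_parsing |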
| |
| %% Spawn a parser for a multipart/related request body and wait for |
| %% the first message it hands back: either the JSON document bytes |
| %% or a {started_open_doc_revs, _} handoff. Returns {Msg, Parser, |
| %% ParserRef} so the caller can drive or abort the parser process. |
| decode_multipart_stream(ContentType, DataFun, Ref) -> |
| Parent = self(), |
| NumMpWriters = num_mp_writers(), |
| {Parser, ParserRef} = spawn_monitor(fun() -> |
| ParentRef = erlang:monitor(process, Parent), |
| put(mp_parent_ref, ParentRef), |
| num_mp_writers(NumMpWriters), |
| {<<"--",_/binary>>, _, _} = couch_httpd:parse_multipart_request( |
| ContentType, DataFun, |
| fun(Next) -> mp_parse_doc(Next, []) end), |
| unlink(Parent) |
| end), |
| Parser ! {get_doc_bytes, Ref, self()}, |
| receive |
| {started_open_doc_revs, NewRef} -> |
| %% FIXME: find a way to remove the parser's knowledge of this message |
| {{started_open_doc_revs, NewRef}, Parser, ParserRef}; |
| {doc_bytes, Ref, DocBytes} -> |
| {{doc_bytes, Ref, DocBytes}, Parser, ParserRef}; |
| {'DOWN', ParserRef, _, _, normal} -> |
| ok; |
| {'DOWN', ParserRef, process, Parser, {{nocatch, {Error, Msg}}, _}} -> |
| couch_log:error("Multipart streamer ~p died with reason ~p", |
| [ParserRef, Msg]), |
| throw({Error, Msg}); |
| {'DOWN', ParserRef, _, _, Reason} -> |
| couch_log:error("Multipart streamer ~p died with reason ~p", |
| [ParserRef, Reason]), |
| throw({error, Reason}) |
| end. |
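| |
| %% A minimal usage sketch (hypothetical caller; ContentType and |
| %% DataFun would come from the HTTP layer): |
| %% |
| %% Ref = make_ref(), |
| %% {{doc_bytes, Ref, DocBytes}, Parser, _ParserRef} = |
| %% decode_multipart_stream(ContentType, DataFun, Ref), |
| %% Doc = ?JSON_DECODE(DocBytes), |
| %% %% each attachment writer then pulls chunks: |
| %% Parser ! {get_bytes, Ref, self()}, |
| %% receive {bytes, Ref, Chunk} -> Chunk end |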
| |
| |
| %% First MIME part: require an application/json Content-Type, |
| %% accumulate the document body, hand the bytes to whoever asks for |
| %% them, then switch to attachment parsing. |
| mp_parse_doc({headers, H}, []) -> |
| case couch_util:get_value("content-type", H) of |
| {"application/json", _} -> |
| fun(Next) -> |
| mp_parse_doc(Next, []) |
| end; |
| _ -> |
| throw({bad_ctype, <<"Content-Type must be application/json">>}) |
| end; |
| mp_parse_doc({body, Bytes}, AccBytes) -> |
| fun(Next) -> |
| mp_parse_doc(Next, [Bytes | AccBytes]) |
| end; |
| mp_parse_doc(body_end, AccBytes) -> |
| receive {get_doc_bytes, Ref, From} -> |
| From ! {doc_bytes, Ref, lists:reverse(AccBytes)} |
| end, |
| fun(Next) -> |
| mp_parse_atts(Next, {Ref, [], 0, orddict:new(), []}) |
| end. |
| |
| %% Attachment parts: buffer body chunks and fan them out to writer |
| %% processes on demand (see maybe_send_data/1). The accumulator is |
| %% {Ref, Chunks, Offset, Counters, Waiting}. |
| mp_parse_atts({headers, _}, Acc) -> |
| fun(Next) -> mp_parse_atts(Next, Acc) end; |
| mp_parse_atts(body_end, Acc) -> |
| fun(Next) -> mp_parse_atts(Next, Acc) end; |
| mp_parse_atts({body, Bytes}, {Ref, Chunks, Offset, Counters, Waiting}) -> |
| case maybe_send_data({Ref, Chunks ++ [Bytes], Offset, Counters, Waiting}) of |
| abort_parsing -> |
| fun(Next) -> mp_abort_parse_atts(Next, nil) end; |
| NewAcc -> |
| fun(Next) -> mp_parse_atts(Next, NewAcc) end |
| end; |
| mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) -> |
| N = num_mp_writers(), |
| M = length(Counters), |
| case (M == N) andalso Chunks == [] of |
| true -> |
| ok; |
| false -> |
| ParentRef = get(mp_parent_ref), |
| receive |
| abort_parsing -> |
| ok; |
| {get_bytes, Ref, From} -> |
| C2 = update_writer(From, Counters), |
| case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}) of |
| abort_parsing -> |
| ok; |
| NewAcc -> |
| mp_parse_atts(eof, NewAcc) |
| end; |
| {'DOWN', ParentRef, _, _, _} -> |
| exit(mp_reader_coordinator_died); |
| {'DOWN', WriterRef, _, WriterPid, _} -> |
| case remove_writer(WriterPid, WriterRef, Counters) of |
| abort_parsing -> |
| ok; |
| C2 -> |
| NewAcc = {Ref, Chunks, Offset, C2, Waiting -- [WriterPid]}, |
| mp_parse_atts(eof, NewAcc) |
| end |
| after 300000 -> |
| ok |
| end |
| end. |
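| |
| %% Example of the eof bookkeeping above (illustrative): with |
| %% num_mp_writers() == 2 but only one writer ever having sent |
| %% {get_bytes, ...}, length(Counters) is 1, so the parser keeps |
| %% waiting for the second writer, an abort_parsing message, a |
| %% 'DOWN', or the five minute timeout before it finishes. |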
| |
| %% After an abort, swallow the remainder of the stream until eof. |
| mp_abort_parse_atts(eof, _) -> |
| ok; |
| mp_abort_parse_atts(_, _) -> |
| fun(Next) -> mp_abort_parse_atts(Next, nil) end. |
| |
| %% Drain pending get_bytes requests, reply to every waiting writer |
| %% whose next chunk is buffered, drop the head chunk once all |
| %% writers have consumed it, and block for more demand when the |
| %% buffer has run ahead of the writers. Returns a new accumulator |
| %% or abort_parsing. |
| maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> |
| receive {get_bytes, Ref, From} -> |
| NewCounters = update_writer(From, Counters), |
| maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]}) |
| after 0 -> |
| % reply to as many writers as possible |
| NewWaiting = lists:filter(fun(Writer) -> |
| {_, WhichChunk} = orddict:fetch(Writer, Counters), |
| ListIndex = WhichChunk - Offset, |
| if ListIndex =< length(Chunks) -> |
| Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)}, |
| false; |
| true -> |
| true |
| end |
| end, Waiting), |
| |
| % check if we can drop a chunk from the head of the list |
| SmallestIndex = case Counters of |
| [] -> |
| 0; |
| _ -> |
| lists:min([C || {_WPid, {_WRef, C}} <- Counters]) |
| end, |
| Size = length(Counters), |
| N = num_mp_writers(), |
| if Size == N andalso SmallestIndex == (Offset+1) -> |
| NewChunks = tl(Chunks), |
| NewOffset = Offset+1; |
| true -> |
| NewChunks = Chunks, |
| NewOffset = Offset |
| end, |
| |
| % wait for a writer if nobody has consumed the last buffered chunk |
| LargestIndex = lists:max([0] ++ [C || {_WPid, {_WRef, C}} <- Counters]), |
| if LargestIndex >= (Offset + length(Chunks)) -> |
| % some writer has consumed every buffered chunk; keep moving |
| {Ref, NewChunks, NewOffset, Counters, NewWaiting}; |
| true -> |
| ParentRef = get(mp_parent_ref), |
| receive |
| abort_parsing -> |
| abort_parsing; |
| {'DOWN', ParentRef, _, _, _} -> |
| exit(mp_reader_coordinator_died); |
| {'DOWN', WriterRef, _, WriterPid, _} -> |
| case remove_writer(WriterPid, WriterRef, Counters) of |
| abort_parsing -> |
| abort_parsing; |
| C2 -> |
| RestWaiting = NewWaiting -- [WriterPid], |
| NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting}, |
| maybe_send_data(NewAcc) |
| end; |
| {get_bytes, Ref, X} -> |
| C2 = update_writer(X, Counters), |
| maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]}) |
| after 300000 -> |
| abort_parsing |
| end |
| end |
| end. |
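| |
| %% Worked example of the chunk window above (illustrative numbers): |
| %% with Offset = 2 and Chunks = [C3, C4], a waiting writer whose |
| %% counter is 3 gets ListIndex = 3 - 2 = 1 and is sent C3. Once |
| %% every writer's counter reaches at least 3 (smallest counter == |
| %% Offset + 1), C3 is dropped from the head and Offset becomes 3. |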
| |
| |
| %% Bump a writer's chunk counter, starting to monitor it on first |
| %% contact. Counters maps WriterPid -> {MonitorRef, ChunkCount}. |
| update_writer(WriterPid, Counters) -> |
| UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end, |
| InitialValue = case orddict:find(WriterPid, Counters) of |
| {ok, IV} -> |
| IV; |
| error -> |
| WriterRef = erlang:monitor(process, WriterPid), |
| {WriterRef, 1} |
| end, |
| orddict:update(WriterPid, UpdateFun, InitialValue, Counters). |
| |
| |
| %% Forget a writer that went down. Abort parsing if it was the last |
| %% writer or the 'DOWN' message doesn't match what we recorded. |
| remove_writer(WriterPid, WriterRef, Counters) -> |
| case orddict:find(WriterPid, Counters) of |
| {ok, {WriterRef, _}} -> |
| case num_mp_writers() of |
| N when N > 1 -> |
| num_mp_writers(N - 1), |
| orddict:erase(WriterPid, Counters); |
| _ -> |
| abort_parsing |
| end; |
| {ok, _} -> |
| % The 'DOWN' carried a different monitor ref than the one we |
| % recorded for this writer |
| abort_parsing; |
| error -> |
| % A 'DOWN' message for a pid we never registered as a writer |
| abort_parsing |
| end. |
| |
| |
| %% The writer count lives in the parser's process dictionary and |
| %% defaults to 1 when unset. |
| num_mp_writers(N) -> |
| erlang:put(mp_att_writers, N). |
| |
| |
| num_mp_writers() -> |
| case erlang:get(mp_att_writers) of |
| undefined -> 1; |
| Count -> Count |
| end. |
| |
| %% Write a multipart/related stream: the JSON document first, then |
| %% one MIME part per attachment. With no attachments only the bare |
| %% JSON is written. |
| encode_multipart_stream(_Boundary, JsonBytes, [], WriteFun, _AttFun) -> |
| WriteFun(JsonBytes); |
| encode_multipart_stream(Boundary, JsonBytes, Atts, WriteFun, AttFun) -> |
| WriteFun([<<"--", Boundary/binary, |
| "\r\nContent-Type: application/json\r\n\r\n">>, |
| JsonBytes, <<"\r\n--", Boundary/binary>>]), |
| atts_to_mp(Atts, Boundary, WriteFun, AttFun). |
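| |
| %% Shape of the emitted stream (sketch; the boundary and attachment |
| %% values are made up for illustration): |
| %% |
| %% --abc\r\n |
| %% Content-Type: application/json\r\n\r\n |
| %% {"_id":"doc1",...}\r\n |
| %% --abc\r\n |
| %% Content-Disposition: attachment; filename="logo.png"\r\n |
| %% Content-Type: image/png\r\n |
| %% Content-Length: 1234\r\n\r\n |
| %% <1234 bytes of attachment data>\r\n |
| %% --abc-- |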
| |
| %% Emit one MIME part per attachment; the trailing "--" closes the |
| %% final boundary. |
| atts_to_mp([], _Boundary, WriteFun, _AttFun) -> |
| WriteFun(<<"--">>); |
| atts_to_mp([{Att, Name, Len, Type, Encoding} | RestAtts], Boundary, WriteFun, |
| AttFun) -> |
| LengthBin = list_to_binary(integer_to_list(Len)), |
| % write headers |
| WriteFun(<<"\r\nContent-Disposition: attachment; filename=\"", Name/binary, "\"">>), |
| WriteFun(<<"\r\nContent-Type: ", Type/binary>>), |
| WriteFun(<<"\r\nContent-Length: ", LengthBin/binary>>), |
| case Encoding of |
| identity -> |
| ok; |
| _ -> |
| EncodingBin = atom_to_binary(Encoding, latin1), |
| WriteFun(<<"\r\nContent-Encoding: ", EncodingBin/binary>>) |
| end, |
| |
| % write data |
| WriteFun(<<"\r\n\r\n">>), |
| AttFun(Att, fun(Data, _) -> WriteFun(Data) end, ok), |
| WriteFun(<<"\r\n--", Boundary/binary>>), |
| atts_to_mp(RestAtts, Boundary, WriteFun, AttFun). |
| |
| %% Compute the Content-Type header value and total byte length of |
| %% the stream encode_multipart_stream/5 would produce, without |
| %% encoding anything. |
| length_multipart_stream(Boundary, JsonBytes, Atts) -> |
| AttsSize = lists:foldl(fun({_Att, Name, Len, Type, Encoding}, AccAttsSize) -> |
| AccAttsSize + |
| 4 + % "\r\n\r\n" |
| length(integer_to_list(Len)) + |
| Len + |
| 4 + % "\r\n--" |
| size(Boundary) + |
| % attachment headers |
| % (the length of the Content-Length value is counted above) |
| size(Name) + |
| size(Type) + |
| length("\r\nContent-Disposition: attachment; filename=\"\"") + |
| length("\r\nContent-Type: ") + |
| length("\r\nContent-Length: ") + |
| case Encoding of |
| identity -> |
| 0; |
| _ -> |
| length(atom_to_list(Encoding)) + |
| length("\r\nContent-Encoding: ") |
| end |
| end, 0, Atts), |
| if AttsSize == 0 -> |
| {<<"application/json">>, iolist_size(JsonBytes)}; |
| true -> |
| {<<"multipart/related; boundary=\"", Boundary/binary, "\"">>, |
| 2 + % "--" |
| size(Boundary) + |
| 36 + % "\r\ncontent-type: application/json\r\n\r\n" |
| iolist_size(JsonBytes) + |
| 4 + % "\r\n--" |
| size(Boundary) + |
| + AttsSize + |
| 2 % "--" |
| } |
| end. |
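| |
| %% Worked example (illustrative numbers): with size(Boundary) = 3, |
| %% iolist_size(JsonBytes) = 20 and one identity-encoded attachment |
| %% with size(Name) = 8, size(Type) = 9 and Len = 100, the attachment |
| %% contributes 4 + 3 + 100 + 4 + 3 + 8 + 9 + 46 + 16 + 18 = 211 |
| %% bytes and the total is 2 + 3 + 36 + 20 + 4 + 3 + 211 + 2 = 281. |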
| |
| %% Ask the parser to stop and wait for it to exit, leaving the |
| %% request socket in a known state. |
| abort_multipart_stream(Parser) -> |
| MonRef = erlang:monitor(process, Parser), |
| Parser ! abort_parsing, |
| receive |
| {'DOWN', MonRef, _, _, _} -> ok |
| after 60000 -> |
| % The one minute timeout is deliberate: we want to keep |
| % reading data so the socket can stay open when possible, |
| % but not make callers wait ages just to learn they hit |
| % an error such as a validate_doc_update failure. |
| throw(multi_part_abort_timeout) |
| end. |