| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_rep_writer). |
| |
| -export([start_link/4]). |
| |
| -include("couch_db.hrl"). |
| |
% Spawns the writer process, linked to the calling process, and returns
% {ok, Pid}. The spawned process runs writer_loop/2 with the given Parent and
% Reader. The Target and PostProps arguments are accepted but not used here.
start_link(Parent, _Target, Reader, _PostProps) ->
    WriterPid = spawn_link(fun() -> writer_loop(Parent, Reader) end),
    {ok, WriterPid}.
| |
% Main loop of the writer process.
%
% Repeatedly asks couch_rep_reader for the next item and:
%   * stops with ok on {complete, nil};
%   * sends a final {writer_checkpoint, Seq} to Parent and stops with ok on
%     {complete, Seq};
%   * otherwise treats the item as {Seq, Docs}: fetches the target db from
%     Parent, writes the docs, reports update_stats messages to Parent,
%     optionally sends a checkpoint, then loops again.
%
% If write_docs/2 throws {attachment_request_failed, Err}, the process exits
% with {attachment_request_failed, Err, Docs}.
writer_loop(Parent, Reader) ->
    case couch_rep_reader:next(Reader) of
        {complete, nil} ->
            ok;
        {complete, LastSeq} ->
            Parent ! {writer_checkpoint, LastSeq},
            ok;
        {BatchSeq, Batch} ->
            Total = length(Batch),
            {ok, TargetDb} = gen_server:call(Parent, get_target_db, infinity),
            % Only the write itself is protected by the try; the stats
            % reporting in the 'of' section is not.
            try write_docs(TargetDb, Batch) of
                {ok, []} ->
                    Parent ! {update_stats, docs_written, Total};
                {ok, Failures} ->
                    FailedCount = length(Failures),
                    Parent ! {update_stats, doc_write_failures, FailedCount},
                    Parent ! {update_stats, docs_written, Total - FailedCount}
            catch
                throw:{attachment_request_failed, Why} ->
                    ?LOG_DEBUG("writer failed to write an attachment ~p", [Why]),
                    exit({attachment_request_failed, Why, Batch})
            end,
            send_checkpoint(Parent, BatchSeq),
            couch_rep_att:cleanup(),
            couch_util:should_flush(),
            writer_loop(Parent, Reader)
    end.

% Sends {writer_checkpoint, Seq} to Parent unless the sequence is nil.
send_checkpoint(_Parent, nil) ->
    ok;
send_checkpoint(Parent, Seq) ->
    Parent ! {writer_checkpoint, Seq}.
| |
% Writes a batch of documents to the target database and returns
% {ok, Errors}.
%
% For a remote (#http_db{}) target, documents whose atts field is [] are sent
% together through write_bulk_docs/2; every other document is then sent on its
% own through write_multi_part_doc/2. For any other target the batch is handed
% to couch_db:update_docs/4 with delay_commit and replicated_changes.
write_docs(#http_db{} = Db, Docs) ->
    {PlainDocs, AttDocs} = lists:partition(
        fun
            (#doc{atts = []}) -> true;
            (_Other) -> false
        end,
        Docs
    ),
    % The bulk request is issued first; its errors seed the accumulator.
    BulkErrors = write_bulk_docs(Db, PlainDocs),
    {ok, write_att_docs(Db, AttDocs, BulkErrors)};
write_docs(Db, Docs) ->
    couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes).

% Writes each attachment-carrying document in list order, prepending the
% errors of every write to the accumulated error list.
write_att_docs(_Db, [], Errors) ->
    Errors;
write_att_docs(Db, [Doc | Rest], Errors) ->
    write_att_docs(Db, Rest, write_multi_part_doc(Db, Doc) ++ Errors).
| |
% Posts the given documents to the target's _bulk_docs resource with
% new_edits set to false and returns the per-document errors, each converted
% by write_docs_1/1.
%
% An empty document list issues no request and returns [].
% If the response is a JSON object rather than a list, the process exits with
% {target_error, Error}, where Error is the object's "error" value.
write_bulk_docs(_Db, []) ->
    [];
write_bulk_docs(#http_db{headers = BaseHeaders} = Db, Docs) ->
    Payload = {[
        {new_edits, false},
        {docs, [couch_doc:to_json_obj(D, [revs]) || D <- Docs]}
    ]},
    ReqHeaders = couch_util:proplist_apply_field(
        {"Content-Type", "application/json"},
        [{"X-Couch-Full-Commit", "false"} | BaseHeaders]
    ),
    BulkReq = Db#http_db{
        resource = "_bulk_docs",
        method = post,
        body = Payload,
        headers = ReqHeaders
    },
    Rows = case couch_rep_httpc:request(BulkReq) of
        {FailProps} ->
            exit({target_error, couch_util:get_value(<<"error">>, FailProps)});
        Items when is_list(Items) ->
            Items
    end,
    [write_docs_1(Row) || Row <- Rows].
| |
% Writes a single document together with its attachments to the remote target
% using a multipart PUT request with new_edits=false.
%
% The multipart body is produced by a linked helper process running
% streamer_fun/3 and is pulled chunk by chunk through a couch_work_queue by
% BodyFun.
%
% Returns [] when the response is anything other than an error/reason object,
% or a one-element list [{{Id, Rev}, {ErrId, Reason}}] when the target answers
% with an error. The entry has the same shape as the ones write_docs_1/1
% builds for bulk writes, so each failed document contributes exactly one
% element to the error list whose length the writer loop counts.
write_multi_part_doc(#http_db{headers = Headers} = Db, #doc{atts = Atts} = Doc) ->
    JsonBytes = ?JSON_ENCODE(
        couch_doc:to_json_obj(
            Doc,
            [follows, att_encoding_info, attachments, revs]
        )
    ),
    Boundary = couch_uuids:random(),
    {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(
        Boundary, JsonBytes, Atts, true
    ),
    StreamerPid = spawn_link(
        fun() -> streamer_fun(Boundary, JsonBytes, Atts) end
    ),
    % Body generator handed to the HTTP client. The accumulator starts as nil
    % (see the body field below); in that case the streamer is asked to start
    % and answers with the queue to read from. Afterwards the accumulator is
    % the queue itself.
    % NOTE(review): presumably the client restarts with nil when it retries
    % the request, which is why the streamer builds a fresh queue on every
    % start message - confirm against couch_rep_httpc.
    BodyFun = fun(Acc) ->
        DataQueue = case Acc of
            nil ->
                StreamerPid ! {start, self()},
                receive
                    {queue, Q} ->
                        Q
                end;
            Queue ->
                Queue
        end,
        case couch_work_queue:dequeue(DataQueue) of
            closed ->
                eof;
            {ok, Data} ->
                {ok, iolist_to_binary(Data), DataQueue}
        end
    end,
    Request = Db#http_db{
        resource = couch_util:url_encode(Doc#doc.id),
        method = put,
        qs = [{new_edits, false}],
        body = {BodyFun, nil},
        headers = [
            {"x-couch-full-commit", "false"},
            {"Content-Type", ?b2l(ContentType)},
            {"Content-Length", Len} | Headers
        ]
    },
    Result = case couch_rep_httpc:request(Request) of
        {[{<<"error">>, Error}, {<<"reason">>, Reason}]} ->
            {Pos, [RevId | _]} = Doc#doc.revs,
            ErrId = couch_util:to_existing_atom(Error),
            % A single {{Id, Rev}, Error} tuple per failed document. Returning
            % two separate tuples here would make the caller count one failed
            % document as two write failures.
            [{{Doc#doc.id, {Pos, RevId}}, {ErrId, Reason}}];
        _ ->
            []
    end,
    StreamerPid ! stop,
    Result.
| |
% Loop of the helper process that feeds a multipart document body into a work
% queue.
%
% On {start, From} it creates a new queue, sends {queue, Queue} back to From,
% pushes every chunk produced by couch_doc:doc_to_multi_part_stream/5 into
% that queue, closes the queue and waits for the next message.
% On stop it returns ok, ending the loop.
streamer_fun(Boundary, JsonBytes, Atts) ->
    receive
        {start, Requester} ->
            % a fresh queue is created for every start request so that no
            % leftovers from a previous (failed) iteration are ever served
            {ok, Queue} = couch_work_queue:new(1024 * 1024, 1000),
            Requester ! {queue, Queue},
            Enqueue = fun(Chunk) -> couch_work_queue:queue(Queue, Chunk) end,
            couch_doc:doc_to_multi_part_stream(
                Boundary, JsonBytes, Atts, Enqueue, true
            ),
            couch_work_queue:close(Queue),
            streamer_fun(Boundary, JsonBytes, Atts);
        stop ->
            ok
    end.
| |
% Converts one entry of a _bulk_docs response, given as {Props}, into
% {{Id, Rev}, {ErrId, Reason}}.
%
% Id and Reason are taken as they are from the "id" and "reason" properties,
% Rev is the "rev" property passed through couch_doc:parse_rev/1, and ErrId is
% the "error" property converted to an already existing atom.
write_docs_1({Props}) ->
    Lookup = fun(Key) -> couch_util:get_value(Key, Props) end,
    DocId = Lookup(<<"id">>),
    ParsedRev = couch_doc:parse_rev(Lookup(<<"rev">>)),
    ErrorAtom = couch_util:to_existing_atom(Lookup(<<"error">>)),
    Why = Lookup(<<"reason">>),
    {{DocId, ParsedRev}, {ErrorAtom, Why}}.