% blob: ffe9768f07003da74b0434eff15ba77996f16d62 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_rep_writer).
-export([start_link/4]).
-include("couch_db.hrl").
%% Spawns a process linked to the caller that runs writer_loop/3 and
%% returns {ok, Pid}.
%%
%% Parent  - process that receives the stats and checkpoint messages
%%           sent by writer_loop/3.
%% Target  - handed to write_docs/2 as the destination for each batch.
%% Reader  - handed to couch_rep_reader:next/1 to obtain batches.
%% The fourth argument is accepted but not used here.
start_link(Parent, Target, Reader, _PostProps) ->
    WriterPid = spawn_link(fun() -> writer_loop(Parent, Reader, Target) end),
    {ok, WriterPid}.
%% Main loop of the writer process.
%%
%% Repeatedly asks couch_rep_reader:next/1 for work:
%%   * {complete, Seq} - sends {writer_checkpoint, Seq} to Parent and
%%     returns ok, ending the loop.
%%   * {Seq, Docs}     - writes Docs to Target with write_docs/2, reports
%%     the write statistics to Parent, sends {writer_checkpoint, Seq},
%%     calls couch_rep_att:cleanup/0 and couch_util:should_flush/0, and
%%     then loops again.
%%
%% If write_docs/2 throws {attachment_request_failed, Reason}, the failure
%% is logged at debug level and the process exits with
%% {attachment_request_failed, Reason, Docs}.
writer_loop(Parent, Reader, Target) ->
    case couch_rep_reader:next(Reader) of
        {complete, LastSeq} ->
            Parent ! {writer_checkpoint, LastSeq},
            ok;
        {BatchSeq, Batch} ->
            %% Count up front, before attempting the write.
            Total = length(Batch),
            %% Only the write_docs/2 call itself is protected by the try;
            %% the stats reporting in the 'of' section is not.
            try write_docs(Target, Batch) of
                {ok, Failures} ->
                    report_write_stats(Parent, Total, Failures)
            catch
                throw:{attachment_request_failed, Reason} ->
                    ?LOG_DEBUG("writer failed to write an attachment ~p", [Reason]),
                    exit({attachment_request_failed, Reason, Batch})
            end,
            Parent ! {writer_checkpoint, BatchSeq},
            couch_rep_att:cleanup(),
            couch_util:should_flush(),
            writer_loop(Parent, Reader, Target)
    end.

%% Sends update_stats messages to Parent for one written batch.
%% With no failures only docs_written is reported; otherwise the failure
%% count is sent first, followed by the number of docs that did get written.
report_write_stats(Parent, Total, []) ->
    Parent ! {update_stats, docs_written, Total};
report_write_stats(Parent, Total, Failures) ->
    Failed = length(Failures),
    Parent ! {update_stats, doc_write_failures, Failed},
    Parent ! {update_stats, docs_written, Total - Failed}.
%% Writes a batch of documents to the target database.
%%
%% When the target is an #http_db{} record:
%%   * every doc is encoded with couch_doc:to_json_obj/2 using the
%%     [revs, attachments] options;
%%   * a POST to "_bulk_docs" is issued through couch_rep_httpc:request/1
%%     with new_edits set to false and an "x-couch-full-commit: false"
%%     header prepended to the record's existing headers;
%%   * a response of the form {Props} makes the process exit with
%%     {target_error, Error}, where Error is the <<"error">> property;
%%   * a list response is converted entry by entry into
%%     {{Id, Rev}, {ErrId, Reason}} and returned as {ok, ErrorList}.
%%
%% For any other target the call is delegated to couch_db:update_docs/4
%% with the [delay_commit] option and the replicated_changes update type.
write_docs(#http_db{headers = Headers} = Db, Docs) ->
    Encoded = [couch_doc:to_json_obj(Doc, [revs, attachments]) || Doc <- Docs],
    BulkRequest = Db#http_db{
        resource = "_bulk_docs",
        method = post,
        body = {[{new_edits, false}, {docs, Encoded}]},
        headers = [{"x-couch-full-commit", "false"} | Headers]
    },
    case couch_rep_httpc:request(BulkRequest) of
        {FailProps} ->
            exit({target_error, proplists:get_value(<<"error">>, FailProps)});
        Results when is_list(Results) ->
            {ok, lists:map(fun bulk_error_to_tuple/1, Results)}
    end;
write_docs(Db, Docs) ->
    couch_db:update_docs(Db, Docs, [delay_commit], replicated_changes).

%% Converts one {Props} entry of a _bulk_docs response into
%% {{Id, Rev}, {ErrId, Reason}}. The <<"rev">> value goes through
%% couch_doc:parse_rev/1 and the <<"error">> value through
%% couch_util:to_existing_atom/1.
bulk_error_to_tuple({Props}) ->
    Field = fun(Key) -> proplists:get_value(Key, Props) end,
    Id = Field(<<"id">>),
    Rev = couch_doc:parse_rev(Field(<<"rev">>)),
    ErrId = couch_util:to_existing_atom(Field(<<"error">>)),
    Reason = Field(<<"reason">>),
    {{Id, Rev}, {ErrId, Reason}}.