Merge branch '3010-port-429' into apache
COUCHDB-3010
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index f22cac8..e5f6253 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -392,6 +392,8 @@
throw({forbidden, get_value(<<"reason">>, Props)});
{412, <<"missing_stub">>} ->
throw({missing_stub, get_value(<<"reason">>, Props)});
+ {413, _} ->
+ {error, request_body_too_large};
{_, Error} ->
{error, Error}
end
@@ -448,6 +450,8 @@
{body, {BodyFun, [prefix | Docs]}}, {headers, Headers}],
fun(201, _, Results) when is_list(Results) ->
{ok, bulk_results_to_errors(DocList, Results, remote)};
+ (413, _, _) ->
+ {error, request_body_too_large};
(417, _, Results) when is_list(Results) ->
{ok, bulk_results_to_errors(DocList, Results, remote)}
end);
diff --git a/src/couch_replicator_worker.erl b/src/couch_replicator_worker.erl
index 155e11d..ee0c455 100644
--- a/src/couch_replicator_worker.erl
+++ b/src/couch_replicator_worker.erl
@@ -447,23 +447,39 @@
flush_docs(_Target, []) ->
couch_replicator_stats:new();
-
flush_docs(Target, DocList) ->
- {ok, Errors} = couch_replicator_api_wrap:update_docs(
- Target, DocList, [delay_commit], replicated_changes),
+ FlushResult = couch_replicator_api_wrap:update_docs(Target, DocList,
+ [delay_commit], replicated_changes),
+ handle_flush_docs_result(FlushResult, Target, DocList).
+
+
+handle_flush_docs_result({error, request_body_too_large}, _Target, [Doc]) ->
+ couch_log:error("Replicator: failed to write doc ~p. Too large", [Doc]),
+ couch_replicator_stats:new([{doc_write_failures, 1}]);
+handle_flush_docs_result({error, request_body_too_large}, Target, DocList) ->
+ Len = length(DocList),
+ {DocList1, DocList2} = lists:split(Len div 2, DocList),
+ couch_log:notice("Replicator: couldn't write batch of size ~p to ~p because"
+ " request body is too large. Splitting batch into 2 separate batches of"
+ " sizes ~p and ~p", [Len, couch_replicator_api_wrap:db_uri(Target),
+ length(DocList1), length(DocList2)]),
+ flush_docs(Target, DocList1),
+ flush_docs(Target, DocList2);
+handle_flush_docs_result({ok, Errors}, Target, DocList) ->
DbUri = couch_replicator_api_wrap:db_uri(Target),
lists:foreach(
fun({Props}) ->
- couch_log:error("Replicator: couldn't write document `~s`, revision `~s`,"
- " to target database `~s`. Error: `~s`, reason: `~s`.",
- [get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
- get_value(error, Props, ""), get_value(reason, Props, "")])
+ couch_log:error("Replicator: couldn't write document `~s`, revision"
+ " `~s`, to target database `~s`. Error: `~s`, reason: `~s`.", [
+ get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
+ get_value(error, Props, ""), get_value(reason, Props, "")])
end, Errors),
couch_replicator_stats:new([
{docs_written, length(DocList) - length(Errors)},
{doc_write_failures, length(Errors)}
]).
+
flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
try couch_replicator_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
{ok, _} ->
diff --git a/test/couch_replicator_small_max_request_size_target.erl b/test/couch_replicator_small_max_request_size_target.erl
new file mode 100644
index 0000000..c46619d
--- /dev/null
+++ b/test/couch_replicator_small_max_request_size_target.erl
@@ -0,0 +1,186 @@
+-module(couch_replicator_small_max_request_size_target).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-import(couch_replicator_test_helper, [
+ db_url/1,
+ replicate/1,
+ compare_dbs/3
+]).
+
+-define(TIMEOUT_EUNIT, 30).
+
+
+setup() ->
+ DbName = ?tempdb(),
+ {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+ ok = couch_db:close(Db),
+ DbName.
+
+
+setup(local) ->
+ setup();
+
+setup(remote) ->
+ {remote, setup()};
+
+setup({A, B}) ->
+ Ctx = test_util:start_couch([couch_replicator]),
+ config:set("couchdb", "max_document_size", "10000", false),
+ Source = setup(A),
+ Target = setup(B),
+ {Ctx, {Source, Target}}.
+
+
+teardown({remote, DbName}) ->
+ teardown(DbName);
+teardown(DbName) ->
+ ok = couch_server:delete(DbName, [?ADMIN_CTX]),
+ ok.
+
+teardown(_, {Ctx, {Source, Target}}) ->
+ teardown(Source),
+ teardown(Target),
+ ok = application:stop(couch_replicator),
+ ok = test_util:stop_couch(Ctx).
+
+
+reduce_max_request_size_test_() ->
+ Pairs = [{local, remote}, {remote, remote}],
+ {
+ "Replicate docs when target has a small max_document_size",
+ {
+ foreachx,
+ fun setup/1, fun teardown/2,
+ [{Pair, fun should_replicate_all_docs/2}
+ || Pair <- Pairs]
+ ++ [{Pair, fun should_replicate_one/2}
+ || Pair <- Pairs]
+ % Test below fails currently because of:
+ % https://issues.apache.org/jira/browse/COUCHDB-3174
+ % Once that is fixed, can re-enable it
+ % ++ [{Pair, fun should_replicate_one_with_attachment/2}
+ % || Pair <- Pairs]
+ }
+ }.
+
+
+% Test documents which are below max_document_size but when batched, batch size
+% will be greater than max_document_size. The replicator should automatically
+% split the batch into smaller batches and POST those separately.
+should_replicate_all_docs({From, To}, {_Ctx, {Source, Target}}) ->
+ {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
+ {inorder, [should_populate_source(Source),
+ should_replicate(Source, Target),
+ should_compare_databases(Source, Target, [])]}}.
+
+
+% If a document is too large to post as a single request, that document is
+% skipped but replication overall will make progress and not crash.
+should_replicate_one({From, To}, {_Ctx, {Source, Target}}) ->
+ {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
+ {inorder, [should_populate_source_one_large_one_small(Source),
+ should_replicate(Source, Target),
+ should_compare_databases(Source, Target, [<<"doc0">>])]}}.
+
+
+% If a document has an attachment > 64 * 1024 bytes, replicator will switch to
+% POST-ing individual documents directly and skip bulk_docs. Test that case
+% separately
+should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) ->
+ {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
+ {inorder, [should_populate_source_one_large_attachment(Source),
+ should_populate_source(Source),
+ should_replicate(Source, Target),
+ should_compare_databases(Source, Target, [<<"doc0">>])]}}.
+
+
+should_populate_source({remote, Source}) ->
+ should_populate_source(Source);
+
+should_populate_source(Source) ->
+ {timeout, ?TIMEOUT_EUNIT, ?_test(add_docs(Source, 5, 3000, 0))}.
+
+
+should_populate_source_one_large_one_small({remote, Source}) ->
+ should_populate_source_one_large_one_small(Source);
+
+should_populate_source_one_large_one_small(Source) ->
+ {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_one_small(Source, 12000, 3000))}.
+
+
+should_populate_source_one_large_attachment({remote, Source}) ->
+ should_populate_source_one_large_attachment(Source);
+
+should_populate_source_one_large_attachment(Source) ->
+ {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}.
+
+
+should_replicate({remote, Source}, Target) ->
+ should_replicate(db_url(Source), Target);
+
+should_replicate(Source, {remote, Target}) ->
+ should_replicate(Source, db_url(Target));
+
+should_replicate(Source, Target) ->
+ {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}.
+
+
+should_compare_databases({remote, Source}, Target, ExceptIds) ->
+ should_compare_databases(Source, Target, ExceptIds);
+
+should_compare_databases(Source, {remote, Target}, ExceptIds) ->
+ should_compare_databases(Source, Target, ExceptIds);
+
+should_compare_databases(Source, Target, ExceptIds) ->
+ {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target, ExceptIds))}.
+
+
+binary_chunk(Size) when is_integer(Size), Size > 0 ->
+ << <<"x">> || _ <- lists:seq(1, Size) >>.
+
+
+add_docs(DbName, DocCount, DocSize, AttSize) ->
+ [begin
+ DocId = iolist_to_binary(["doc", integer_to_list(Id)]),
+ add_doc(DbName, DocId, DocSize, AttSize)
+ end || Id <- lists:seq(1, DocCount)],
+ ok.
+
+
+one_large_one_small(DbName, Large, Small) ->
+ add_doc(DbName, <<"doc0">>, Large, 0),
+ add_doc(DbName, <<"doc1">>, Small, 0).
+
+
+one_large_attachment(DbName, Size, AttSize) ->
+ add_doc(DbName, <<"doc0">>, Size, AttSize).
+
+
+add_doc(DbName, DocId, Size, AttSize) when is_binary(DocId) ->
+ {ok, Db} = couch_db:open_int(DbName, []),
+ Doc0 = #doc{id = DocId, body = {[{<<"x">>, binary_chunk(Size)}]}},
+ Doc = Doc0#doc{atts = atts(AttSize)},
+ {ok, _} = couch_db:update_doc(Db, Doc, []),
+ couch_db:close(Db).
+
+
+atts(0) ->
+ [];
+
+atts(Size) ->
+ [couch_att:new([
+ {name, <<"att1">>},
+ {type, <<"app/binary">>},
+ {att_len, Size},
+ {data, fun(Bytes) -> binary_chunk(Bytes) end}
+ ])].
+
+
+replicate(Source, Target) ->
+ replicate({[
+ {<<"source">>, Source},
+ {<<"target">>, Target},
+        {<<"worker_processes">>, "1"} % This makes batch_size predictable
+ ]}).
diff --git a/test/couch_replicator_test_helper.erl b/test/couch_replicator_test_helper.erl
index 5b9d366..398b27b 100644
--- a/test/couch_replicator_test_helper.erl
+++ b/test/couch_replicator_test_helper.erl
@@ -3,57 +3,27 @@
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
--export([compare_dbs/2, db_url/1, replicate/2]).
+-export([compare_dbs/2, compare_dbs/3, db_url/1, replicate/1, replicate/2]).
+
compare_dbs(Source, Target) ->
+ compare_dbs(Source, Target, []).
+
+
+compare_dbs(Source, Target, ExceptIds) ->
{ok, SourceDb} = couch_db:open_int(Source, []),
{ok, TargetDb} = couch_db:open_int(Target, []),
Fun = fun(FullDocInfo, _, Acc) ->
{ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
Id = DocSource#doc.id,
-
- {ok, DocTarget} = couch_db:open_doc(TargetDb, Id),
- ?assertEqual(DocSource#doc.body, DocTarget#doc.body),
-
- #doc{atts = SourceAtts} = DocSource,
- #doc{atts = TargetAtts} = DocTarget,
- ?assertEqual(lists:sort([couch_att:fetch(name, Att) || Att <- SourceAtts]),
- lists:sort([couch_att:fetch(name, Att) || Att <- TargetAtts])),
-
- FunCompareAtts = fun(Att) ->
- AttName = couch_att:fetch(name, Att),
- {ok, AttTarget} = find_att(TargetAtts, AttName),
- SourceMd5 = att_md5(Att),
- TargetMd5 = att_md5(AttTarget),
- case AttName of
- <<"att1">> ->
- ?assertEqual(gzip, couch_att:fetch(encoding, Att)),
- ?assertEqual(gzip, couch_att:fetch(encoding, AttTarget)),
- DecSourceMd5 = att_decoded_md5(Att),
- DecTargetMd5 = att_decoded_md5(AttTarget),
- ?assertEqual(DecSourceMd5, DecTargetMd5);
- _ ->
- ?assertEqual(identity, couch_att:fetch(encoding, AttTarget)),
- ?assertEqual(identity, couch_att:fetch(encoding, AttTarget))
- end,
- ?assertEqual(SourceMd5, TargetMd5),
- ?assert(is_integer(couch_att:fetch(disk_len, Att))),
- ?assert(is_integer(couch_att:fetch(att_len, Att))),
- ?assert(is_integer(couch_att:fetch(disk_len, AttTarget))),
- ?assert(is_integer(couch_att:fetch(att_len, AttTarget))),
- ?assertEqual(couch_att:fetch(disk_len, Att),
- couch_att:fetch(disk_len, AttTarget)),
- ?assertEqual(couch_att:fetch(att_len, Att),
- couch_att:fetch(att_len, AttTarget)),
- ?assertEqual(couch_att:fetch(type, Att),
- couch_att:fetch(type, AttTarget)),
- ?assertEqual(couch_att:fetch(md5, Att),
- couch_att:fetch(md5, AttTarget))
+ case lists:member(Id, ExceptIds) of
+ true ->
+ ?assertEqual(not_found, couch_db:get_doc_info(TargetDb, Id));
+ false ->
+ {ok, TDoc} = couch_db:open_doc(TargetDb, Id),
+ compare_docs(DocSource, TDoc)
end,
-
- lists:foreach(FunCompareAtts, SourceAtts),
-
{ok, Acc}
end,
@@ -61,6 +31,46 @@
ok = couch_db:close(SourceDb),
ok = couch_db:close(TargetDb).
+
+compare_docs(Doc1, Doc2) ->
+ ?assertEqual(Doc1#doc.body, Doc2#doc.body),
+ #doc{atts = Atts1} = Doc1,
+ #doc{atts = Atts2} = Doc2,
+ ?assertEqual(lists:sort([couch_att:fetch(name, Att) || Att <- Atts1]),
+ lists:sort([couch_att:fetch(name, Att) || Att <- Atts2])),
+ FunCompareAtts = fun(Att) ->
+ AttName = couch_att:fetch(name, Att),
+ {ok, AttTarget} = find_att(Atts2, AttName),
+ SourceMd5 = att_md5(Att),
+ TargetMd5 = att_md5(AttTarget),
+ case AttName of
+ <<"att1">> ->
+ ?assertEqual(gzip, couch_att:fetch(encoding, Att)),
+ ?assertEqual(gzip, couch_att:fetch(encoding, AttTarget)),
+ DecSourceMd5 = att_decoded_md5(Att),
+ DecTargetMd5 = att_decoded_md5(AttTarget),
+ ?assertEqual(DecSourceMd5, DecTargetMd5);
+ _ ->
+ ?assertEqual(identity, couch_att:fetch(encoding, AttTarget)),
+ ?assertEqual(identity, couch_att:fetch(encoding, AttTarget))
+ end,
+ ?assertEqual(SourceMd5, TargetMd5),
+ ?assert(is_integer(couch_att:fetch(disk_len, Att))),
+ ?assert(is_integer(couch_att:fetch(att_len, Att))),
+ ?assert(is_integer(couch_att:fetch(disk_len, AttTarget))),
+ ?assert(is_integer(couch_att:fetch(att_len, AttTarget))),
+ ?assertEqual(couch_att:fetch(disk_len, Att),
+ couch_att:fetch(disk_len, AttTarget)),
+ ?assertEqual(couch_att:fetch(att_len, Att),
+ couch_att:fetch(att_len, AttTarget)),
+ ?assertEqual(couch_att:fetch(type, Att),
+ couch_att:fetch(type, AttTarget)),
+ ?assertEqual(couch_att:fetch(md5, Att),
+ couch_att:fetch(md5, AttTarget))
+ end,
+ lists:foreach(FunCompareAtts, Atts1).
+
+
find_att([], _Name) ->
nil;
find_att([Att | Rest], Name) ->
@@ -94,10 +104,12 @@
]).
replicate(Source, Target) ->
- RepObject = {[
+ replicate({[
{<<"source">>, Source},
{<<"target">>, Target}
- ]},
+ ]}).
+
+replicate({[_ | _]} = RepObject) ->
{ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
{ok, Pid} = couch_replicator:async_replicate(Rep),
MonRef = erlang:monitor(process, Pid),