Merge branch '3010-port-429' into apache

Handle 413 (Request Entity Too Large) responses from the target when
replicating: split an oversized _bulk_docs batch in half and retry, and
skip a single document that is still too large, counting it as a doc
write failure.

COUCHDB-3010
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index f22cac8..e5f6253 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -392,6 +392,10 @@
                     throw({forbidden, get_value(<<"reason">>, Props)});
                 {412, <<"missing_stub">>} ->
                     throw({missing_stub, get_value(<<"reason">>, Props)});
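+                % The target rejected the request body as too large
+                % (HTTP 413); report it to the caller as an error.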
+                {413, _} ->
+                    {error, request_body_too_large};
                 {_, Error} ->
                     {error, Error}
                 end
@@ -448,6 +452,10 @@
             {body, {BodyFun, [prefix | Docs]}}, {headers, Headers}],
         fun(201, _, Results) when is_list(Results) ->
                 {ok, bulk_results_to_errors(DocList, Results, remote)};
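+           % The whole batch was rejected as too large (HTTP 413);
+           % flush_docs/2 reacts by splitting the batch and retrying.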
+           (413, _, _) ->
+                {error, request_body_too_large};
            (417, _, Results) when is_list(Results) ->
                 {ok, bulk_results_to_errors(DocList, Results, remote)}
         end);
diff --git a/src/couch_replicator_worker.erl b/src/couch_replicator_worker.erl
index 155e11d..ee0c455 100644
--- a/src/couch_replicator_worker.erl
+++ b/src/couch_replicator_worker.erl
@@ -447,23 +447,44 @@
 
 flush_docs(_Target, []) ->
     couch_replicator_stats:new();
-
 flush_docs(Target, DocList) ->
-    {ok, Errors} = couch_replicator_api_wrap:update_docs(
-        Target, DocList, [delay_commit], replicated_changes),
+    FlushResult = couch_replicator_api_wrap:update_docs(Target, DocList,
+        [delay_commit], replicated_changes),
+    handle_flush_docs_result(FlushResult, Target, DocList).
+
+
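+% A single document that still exceeds the target's limit cannot be split
+% any further, so log it and count it as a write failure.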
+handle_flush_docs_result({error, request_body_too_large}, _Target, [Doc]) ->
+    couch_log:error("Replicator: failed to write doc `~s`. Too large",
+        [Doc#doc.id]),
+    couch_replicator_stats:new([{doc_write_failures, 1}]);
+handle_flush_docs_result({error, request_body_too_large}, Target, DocList) ->
+    Len = length(DocList),
+    {DocList1, DocList2} = lists:split(Len div 2, DocList),
+    couch_log:notice("Replicator: couldn't write batch of size ~p to ~p because"
+        " the request body is too large. Splitting batch into 2 separate batches"
+        " of sizes ~p and ~p", [Len, couch_replicator_api_wrap:db_uri(Target),
+        length(DocList1), length(DocList2)]),
+    Stats1 = flush_docs(Target, DocList1),
+    Stats2 = flush_docs(Target, DocList2),
+    % Combine the stats so successes and failures from both halves count
+    couch_replicator_stats:sum_stats(Stats1, Stats2);
+handle_flush_docs_result({ok, Errors}, Target, DocList) ->
     DbUri = couch_replicator_api_wrap:db_uri(Target),
     lists:foreach(
         fun({Props}) ->
-            couch_log:error("Replicator: couldn't write document `~s`, revision `~s`,"
-                " to target database `~s`. Error: `~s`, reason: `~s`.",
-                [get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
-                    get_value(error, Props, ""), get_value(reason, Props, "")])
+            couch_log:error("Replicator: couldn't write document `~s`, revision"
+                " `~s`, to target database `~s`. Error: `~s`, reason: `~s`.", [
+                get_value(id, Props, ""), get_value(rev, Props, ""), DbUri,
+                get_value(error, Props, ""), get_value(reason, Props, "")])
         end, Errors),
     couch_replicator_stats:new([
         {docs_written, length(DocList) - length(Errors)},
         {doc_write_failures, length(Errors)}
     ]).
 
+
 flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
     try couch_replicator_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
     {ok, _} ->
diff --git a/test/couch_replicator_small_max_request_size_target.erl b/test/couch_replicator_small_max_request_size_target.erl
new file mode 100644
index 0000000..c46619d
--- /dev/null
+++ b/test/couch_replicator_small_max_request_size_target.erl
@@ -0,0 +1,190 @@
+-module(couch_replicator_small_max_request_size_target).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-import(couch_replicator_test_helper, [
+    db_url/1,
+    replicate/1,
+    compare_dbs/3
+]).
+
+-define(TIMEOUT_EUNIT, 30).
+
+
+setup() ->
+    DbName = ?tempdb(),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+    ok = couch_db:close(Db),
+    DbName.
+
+
+setup(local) ->
+    setup();
+
+setup(remote) ->
+    {remote, setup()};
+
+setup({A, B}) ->
+    Ctx = test_util:start_couch([couch_replicator]),
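+    % With a 10000 byte limit, the document batches written by these tests
+    % are large enough to draw 413 responses from the target.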
+    config:set("couchdb", "max_document_size", "10000", false),
+    Source = setup(A),
+    Target = setup(B),
+    {Ctx, {Source, Target}}.
+
+
+teardown({remote, DbName}) ->
+    teardown(DbName);
+teardown(DbName) ->
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
+    ok.
+
+teardown(_, {Ctx, {Source, Target}}) ->
+    teardown(Source),
+    teardown(Target),
+    ok = application:stop(couch_replicator),
+    ok = test_util:stop_couch(Ctx).
+
+
+reduce_max_request_size_test_() ->
+    Pairs = [{local, remote}, {remote, remote}],
+    {
+        "Replicate docs when target has a small max_document_size",
+        {
+            foreachx,
+            fun setup/1, fun teardown/2,
+            [{Pair, fun should_replicate_all_docs/2}
+             || Pair <- Pairs]
+            ++ [{Pair, fun should_replicate_one/2}
+             || Pair <- Pairs]
+            % The test below currently fails because of:
+            %  https://issues.apache.org/jira/browse/COUCHDB-3174
+            % Once that is fixed, it can be re-enabled.
+            % ++ [{Pair, fun should_replicate_one_with_attachment/2}
+            %  || Pair <- Pairs]
+        }
+    }.
+
+
+% Test documents which individually are below max_document_size, but whose
+% batch exceeds max_document_size. The replicator should automatically split
+% the batch into smaller batches and POST those separately.
+should_replicate_all_docs({From, To}, {_Ctx, {Source, Target}}) ->
+    {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
+     {inorder, [should_populate_source(Source),
+                should_replicate(Source, Target),
+                should_compare_databases(Source, Target, [])]}}.
+
+
+% If a document is too large to POST as a single request, that document is
+% skipped, but replication overall makes progress and does not crash.
+should_replicate_one({From, To}, {_Ctx, {Source, Target}}) ->
+    {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
+     {inorder, [should_populate_source_one_large_one_small(Source),
+                should_replicate(Source, Target),
+                should_compare_databases(Source, Target, [<<"doc0">>])]}}.
+
+
+% If a document has an attachment larger than 64 * 1024 bytes, the replicator
+% switches to POST-ing individual documents directly, skipping _bulk_docs.
+% Test that case separately.
+should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) ->
+    {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
+     {inorder, [should_populate_source_one_large_attachment(Source),
+                should_populate_source(Source),
+                should_replicate(Source, Target),
+                should_compare_databases(Source, Target, [<<"doc0">>])]}}.
+
+
+should_populate_source({remote, Source}) ->
+    should_populate_source(Source);
+
+should_populate_source(Source) ->
+    {timeout, ?TIMEOUT_EUNIT, ?_test(add_docs(Source, 5, 3000, 0))}.
+
+
+should_populate_source_one_large_one_small({remote, Source}) ->
+    should_populate_source_one_large_one_small(Source);
+
+should_populate_source_one_large_one_small(Source) ->
+    {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_one_small(Source, 12000, 3000))}.
+
+
+should_populate_source_one_large_attachment({remote, Source}) ->
+    should_populate_source_one_large_attachment(Source);
+
+should_populate_source_one_large_attachment(Source) ->
+    {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}.
+
+
+should_replicate({remote, Source}, Target) ->
+    should_replicate(db_url(Source), Target);
+
+should_replicate(Source, {remote, Target}) ->
+    should_replicate(Source, db_url(Target));
+
+should_replicate(Source, Target) ->
+    {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}.
+
+
+should_compare_databases({remote, Source}, Target, ExceptIds) ->
+    should_compare_databases(Source, Target, ExceptIds);
+
+should_compare_databases(Source, {remote, Target}, ExceptIds) ->
+    should_compare_databases(Source, Target, ExceptIds);
+
+should_compare_databases(Source, Target, ExceptIds) ->
+    {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target, ExceptIds))}.
+
+
+binary_chunk(Size) when is_integer(Size), Size > 0 ->
+    << <<"x">> || _ <- lists:seq(1, Size) >>.
+
+
+add_docs(DbName, DocCount, DocSize, AttSize) ->
+    [begin
+        DocId = iolist_to_binary(["doc", integer_to_list(Id)]),
+        add_doc(DbName, DocId, DocSize, AttSize)
+    end || Id <- lists:seq(1, DocCount)],
+    ok.
+
+
+one_large_one_small(DbName, Large, Small) ->
+    add_doc(DbName, <<"doc0">>, Large, 0),
+    add_doc(DbName, <<"doc1">>, Small, 0).
+
+
+one_large_attachment(DbName, Size, AttSize) ->
+    add_doc(DbName, <<"doc0">>, Size, AttSize).
+
+
+add_doc(DbName, DocId, Size, AttSize) when is_binary(DocId) ->
+    {ok, Db} = couch_db:open_int(DbName, []),
+    Doc0 = #doc{id = DocId, body = {[{<<"x">>, binary_chunk(Size)}]}},
+    Doc = Doc0#doc{atts = atts(AttSize)},
+    {ok, _} = couch_db:update_doc(Db, Doc, []),
+    couch_db:close(Db).
+
+
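+% Return a list with one attachment of the given size, or no attachments
+% when Size is 0.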
+atts(0) ->
+    [];
+
+atts(Size) ->
+    [couch_att:new([
+        {name, <<"att1">>},
+        {type, <<"app/binary">>},
+        {att_len, Size},
+        {data, fun(Bytes) -> binary_chunk(Bytes) end}
+    ])].
+
+
+replicate(Source, Target) ->
+    replicate({[
+        {<<"source">>, Source},
+        {<<"target">>, Target},
+        {<<"worker_processes">>, "1"} %  This make batch_size predictable
+    ]}).
diff --git a/test/couch_replicator_test_helper.erl b/test/couch_replicator_test_helper.erl
index 5b9d366..398b27b 100644
--- a/test/couch_replicator_test_helper.erl
+++ b/test/couch_replicator_test_helper.erl
@@ -3,57 +3,27 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
--export([compare_dbs/2, db_url/1, replicate/2]).
+-export([compare_dbs/2, compare_dbs/3, db_url/1, replicate/1, replicate/2]).
+
 
 compare_dbs(Source, Target) ->
+    compare_dbs(Source, Target, []).
+
+
+compare_dbs(Source, Target, ExceptIds) ->
     {ok, SourceDb} = couch_db:open_int(Source, []),
     {ok, TargetDb} = couch_db:open_int(Target, []),
 
     Fun = fun(FullDocInfo, _, Acc) ->
         {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
         Id = DocSource#doc.id,
-
-        {ok, DocTarget} = couch_db:open_doc(TargetDb, Id),
-        ?assertEqual(DocSource#doc.body, DocTarget#doc.body),
-
-        #doc{atts = SourceAtts} = DocSource,
-        #doc{atts = TargetAtts} = DocTarget,
-        ?assertEqual(lists:sort([couch_att:fetch(name, Att) || Att <- SourceAtts]),
-                     lists:sort([couch_att:fetch(name, Att) || Att <- TargetAtts])),
-
-        FunCompareAtts = fun(Att) ->
-            AttName = couch_att:fetch(name, Att),
-            {ok, AttTarget} = find_att(TargetAtts, AttName),
-            SourceMd5 = att_md5(Att),
-            TargetMd5 = att_md5(AttTarget),
-            case AttName of
-                <<"att1">> ->
-                    ?assertEqual(gzip, couch_att:fetch(encoding, Att)),
-                    ?assertEqual(gzip, couch_att:fetch(encoding, AttTarget)),
-                    DecSourceMd5 = att_decoded_md5(Att),
-                    DecTargetMd5 = att_decoded_md5(AttTarget),
-                    ?assertEqual(DecSourceMd5, DecTargetMd5);
-                _ ->
-                    ?assertEqual(identity, couch_att:fetch(encoding, AttTarget)),
-                    ?assertEqual(identity, couch_att:fetch(encoding, AttTarget))
-            end,
-            ?assertEqual(SourceMd5, TargetMd5),
-            ?assert(is_integer(couch_att:fetch(disk_len, Att))),
-            ?assert(is_integer(couch_att:fetch(att_len, Att))),
-            ?assert(is_integer(couch_att:fetch(disk_len, AttTarget))),
-            ?assert(is_integer(couch_att:fetch(att_len, AttTarget))),
-            ?assertEqual(couch_att:fetch(disk_len, Att),
-                         couch_att:fetch(disk_len, AttTarget)),
-            ?assertEqual(couch_att:fetch(att_len, Att),
-                         couch_att:fetch(att_len, AttTarget)),
-            ?assertEqual(couch_att:fetch(type, Att),
-                         couch_att:fetch(type, AttTarget)),
-            ?assertEqual(couch_att:fetch(md5, Att),
-                         couch_att:fetch(md5, AttTarget))
+        case lists:member(Id, ExceptIds) of
+            true ->
+                ?assertEqual(not_found, couch_db:get_doc_info(TargetDb, Id));
+            false ->
+                {ok, TDoc} = couch_db:open_doc(TargetDb, Id),
+                compare_docs(DocSource, TDoc)
         end,
-
-        lists:foreach(FunCompareAtts, SourceAtts),
-
         {ok, Acc}
     end,
 
@@ -61,6 +31,47 @@
     ok = couch_db:close(SourceDb),
     ok = couch_db:close(TargetDb).
 
+
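+% Assert that two documents have equal bodies and equivalent attachments.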
+compare_docs(Doc1, Doc2) ->
+    ?assertEqual(Doc1#doc.body, Doc2#doc.body),
+    #doc{atts = Atts1} = Doc1,
+    #doc{atts = Atts2} = Doc2,
+    ?assertEqual(lists:sort([couch_att:fetch(name, Att) || Att <- Atts1]),
+                 lists:sort([couch_att:fetch(name, Att) || Att <- Atts2])),
+    FunCompareAtts = fun(Att) ->
+        AttName = couch_att:fetch(name, Att),
+        {ok, AttTarget} = find_att(Atts2, AttName),
+        SourceMd5 = att_md5(Att),
+        TargetMd5 = att_md5(AttTarget),
+        case AttName of
+            <<"att1">> ->
+                ?assertEqual(gzip, couch_att:fetch(encoding, Att)),
+                ?assertEqual(gzip, couch_att:fetch(encoding, AttTarget)),
+                DecSourceMd5 = att_decoded_md5(Att),
+                DecTargetMd5 = att_decoded_md5(AttTarget),
+                ?assertEqual(DecSourceMd5, DecTargetMd5);
+            _ ->
+                ?assertEqual(identity, couch_att:fetch(encoding, Att)),
+                ?assertEqual(identity, couch_att:fetch(encoding, AttTarget))
+        end,
+        ?assertEqual(SourceMd5, TargetMd5),
+        ?assert(is_integer(couch_att:fetch(disk_len, Att))),
+        ?assert(is_integer(couch_att:fetch(att_len, Att))),
+        ?assert(is_integer(couch_att:fetch(disk_len, AttTarget))),
+        ?assert(is_integer(couch_att:fetch(att_len, AttTarget))),
+        ?assertEqual(couch_att:fetch(disk_len, Att),
+                     couch_att:fetch(disk_len, AttTarget)),
+        ?assertEqual(couch_att:fetch(att_len, Att),
+                     couch_att:fetch(att_len, AttTarget)),
+        ?assertEqual(couch_att:fetch(type, Att),
+                     couch_att:fetch(type, AttTarget)),
+        ?assertEqual(couch_att:fetch(md5, Att),
+                     couch_att:fetch(md5, AttTarget))
+    end,
+    lists:foreach(FunCompareAtts, Atts1).
+
+
 find_att([], _Name) ->
     nil;
 find_att([Att | Rest], Name) ->
@@ -94,10 +105,14 @@
     ]).
 
 replicate(Source, Target) ->
-    RepObject = {[
+    replicate({[
         {<<"source">>, Source},
         {<<"target">>, Target}
-    ]},
+    ]}).
+
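+% Variant taking a full replication object, so tests can pass extra
+% options such as worker_processes.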
+replicate({[_ | _]} = RepObject) ->
     {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
     {ok, Pid} = couch_replicator:async_replicate(Rep),
     MonRef = erlang:monitor(process, Pid),