Add tests which check small values of max_document_size setting on the target

A low max_document_size setting on the target will interact with the replicator,
this commit adds a few tests to check that interaction.

There are 3 test scenarios:

 * A basic test checks that individual document sizes can be smaller than
  max_document_size yet, when batched together by the replicator they exceed,
  the maximum size. Replicator in that case should split document batches into
  halves down to individual documents, such that the replication should succeed.

 * one_large_one_small test checks that a large single document should be
  skipped such that it doesn't end on the target and it doesn't crash the
  replication job (so the small document should reach the target).

 * The third test is currently disable because of COUCHDB-3174. Once that
  issue is fixed, it will test a corner case in replicator when it
  switches from using batches and POST-ing to _bulk_docs to using individual
  PUT's with multipart/mixed Content-Type. Those PUT request can also return
  413 error code, so this tests it explicitly.

Jira: COUCHDB-3168
diff --git a/test/couch_replicator_small_max_request_size_target.erl b/test/couch_replicator_small_max_request_size_target.erl
new file mode 100644
index 0000000..c46619d
--- /dev/null
+++ b/test/couch_replicator_small_max_request_size_target.erl
@@ -0,0 +1,186 @@
+-module(couch_replicator_small_max_request_size_target).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-import(couch_replicator_test_helper, [
+    db_url/1,
+    replicate/1,
+    compare_dbs/3
+]).
+
+-define(TIMEOUT_EUNIT, 30).
+
+
+setup() ->
+    DbName = ?tempdb(),
+    {ok, Db} = couch_db:create(DbName, [?ADMIN_CTX]),
+    ok = couch_db:close(Db),
+    DbName.
+
+
+setup(local) ->
+    setup();
+
+setup(remote) ->
+    {remote, setup()};
+
+setup({A, B}) ->
+    Ctx = test_util:start_couch([couch_replicator]),
+    config:set("couchdb", "max_document_size", "10000", false),
+    Source = setup(A),
+    Target = setup(B),
+    {Ctx, {Source, Target}}.
+
+
+teardown({remote, DbName}) ->
+    teardown(DbName);
+teardown(DbName) ->
+    ok = couch_server:delete(DbName, [?ADMIN_CTX]),
+    ok.
+
+teardown(_, {Ctx, {Source, Target}}) ->
+    teardown(Source),
+    teardown(Target),
+    ok = application:stop(couch_replicator),
+    ok = test_util:stop_couch(Ctx).
+
+
+reduce_max_request_size_test_() ->
+    Pairs = [{local, remote}, {remote, remote}],
+    {
+        "Replicate docs when target has a small max_document_size",
+        {
+            foreachx,
+            fun setup/1, fun teardown/2,
+            [{Pair, fun should_replicate_all_docs/2}
+             || Pair <- Pairs]
+            ++ [{Pair, fun should_replicate_one/2}
+             || Pair <- Pairs]
+            % Test below fails currently because of:
+            %  https://issues.apache.org/jira/browse/COUCHDB-3174
+            % Once that is fixed, can re-enable it
+            % ++ [{Pair, fun should_replicate_one_with_attachment/2}
+            %  || Pair <- Pairs]
+        }
+    }.
+
+
+% Test documents which are below max_document_size but when batched, batch size
+% will be greater than max_document_size. Replicator could automatically split
+% the batch into smaller batches and POST those separately.
+should_replicate_all_docs({From, To}, {_Ctx, {Source, Target}}) ->
+    {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
+     {inorder, [should_populate_source(Source),
+                should_replicate(Source, Target),
+                should_compare_databases(Source, Target, [])]}}.
+
+
+% If a document is too large to post as a single request, that document is
+% skipped but replication overall will make progress and not crash.
+should_replicate_one({From, To}, {_Ctx, {Source, Target}}) ->
+    {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
+     {inorder, [should_populate_source_one_large_one_small(Source),
+                should_replicate(Source, Target),
+                should_compare_databases(Source, Target, [<<"doc0">>])]}}.
+
+
+% If a document has an attachment > 64 * 1024 bytes, replicator will switch to
+% POST-ing individual documents directly and skip bulk_docs. Test that case
+% separately
+should_replicate_one_with_attachment({From, To}, {_Ctx, {Source, Target}}) ->
+    {lists:flatten(io_lib:format("~p -> ~p", [From, To])),
+     {inorder, [should_populate_source_one_large_attachment(Source),
+                should_populate_source(Source),
+                should_replicate(Source, Target),
+                should_compare_databases(Source, Target, [<<"doc0">>])]}}.
+
+
+should_populate_source({remote, Source}) ->
+    should_populate_source(Source);
+
+should_populate_source(Source) ->
+    {timeout, ?TIMEOUT_EUNIT, ?_test(add_docs(Source, 5, 3000, 0))}.
+
+
+should_populate_source_one_large_one_small({remote, Source}) ->
+    should_populate_source_one_large_one_small(Source);
+
+should_populate_source_one_large_one_small(Source) ->
+    {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_one_small(Source, 12000, 3000))}.
+
+
+should_populate_source_one_large_attachment({remote, Source}) ->
+    should_populate_source_one_large_attachment(Source);
+
+should_populate_source_one_large_attachment(Source) ->
+    {timeout, ?TIMEOUT_EUNIT, ?_test(one_large_attachment(Source, 70000, 70000))}.
+
+
+should_replicate({remote, Source}, Target) ->
+    should_replicate(db_url(Source), Target);
+
+should_replicate(Source, {remote, Target}) ->
+    should_replicate(Source, db_url(Target));
+
+should_replicate(Source, Target) ->
+    {timeout, ?TIMEOUT_EUNIT, ?_test(replicate(Source, Target))}.
+
+
+should_compare_databases({remote, Source}, Target, ExceptIds) ->
+    should_compare_databases(Source, Target, ExceptIds);
+
+should_compare_databases(Source, {remote, Target}, ExceptIds) ->
+    should_compare_databases(Source, Target, ExceptIds);
+
+should_compare_databases(Source, Target, ExceptIds) ->
+    {timeout, ?TIMEOUT_EUNIT, ?_test(compare_dbs(Source, Target, ExceptIds))}.
+
+
+binary_chunk(Size) when is_integer(Size), Size > 0 ->
+    << <<"x">> || _ <- lists:seq(1, Size) >>.
+
+
+add_docs(DbName, DocCount, DocSize, AttSize) ->
+    [begin
+        DocId = iolist_to_binary(["doc", integer_to_list(Id)]),
+        add_doc(DbName, DocId, DocSize, AttSize)
+    end || Id <- lists:seq(1, DocCount)],
+    ok.
+
+
+one_large_one_small(DbName, Large, Small) ->
+    add_doc(DbName, <<"doc0">>, Large, 0),
+    add_doc(DbName, <<"doc1">>, Small, 0).
+
+
+one_large_attachment(DbName, Size, AttSize) ->
+    add_doc(DbName, <<"doc0">>, Size, AttSize).
+
+
+add_doc(DbName, DocId, Size, AttSize) when is_binary(DocId) ->
+     {ok, Db} = couch_db:open_int(DbName, []),
+     Doc0 = #doc{id = DocId, body = {[{<<"x">>, binary_chunk(Size)}]}},
+     Doc = Doc0#doc{atts = atts(AttSize)},
+     {ok, _} = couch_db:update_doc(Db, Doc, []),
+     couch_db:close(Db).
+
+
+atts(0) ->
+    [];
+
+atts(Size) ->
+    [couch_att:new([
+        {name, <<"att1">>},
+        {type, <<"app/binary">>},
+        {att_len, Size},
+        {data, fun(Bytes) -> binary_chunk(Bytes) end}
+    ])].
+
+
+replicate(Source, Target) ->
+    replicate({[
+        {<<"source">>, Source},
+        {<<"target">>, Target},
+        {<<"worker_processes">>, "1"} %  This make batch_size predictable
+    ]}).
diff --git a/test/couch_replicator_test_helper.erl b/test/couch_replicator_test_helper.erl
index 5b9d366..398b27b 100644
--- a/test/couch_replicator_test_helper.erl
+++ b/test/couch_replicator_test_helper.erl
@@ -3,57 +3,27 @@
 -include_lib("couch/include/couch_eunit.hrl").
 -include_lib("couch/include/couch_db.hrl").
 
--export([compare_dbs/2, db_url/1, replicate/2]).
+-export([compare_dbs/2, compare_dbs/3, db_url/1, replicate/1, replicate/2]).
+
 
 compare_dbs(Source, Target) ->
+    compare_dbs(Source, Target, []).
+
+
+compare_dbs(Source, Target, ExceptIds) ->
     {ok, SourceDb} = couch_db:open_int(Source, []),
     {ok, TargetDb} = couch_db:open_int(Target, []),
 
     Fun = fun(FullDocInfo, _, Acc) ->
         {ok, DocSource} = couch_db:open_doc(SourceDb, FullDocInfo),
         Id = DocSource#doc.id,
-
-        {ok, DocTarget} = couch_db:open_doc(TargetDb, Id),
-        ?assertEqual(DocSource#doc.body, DocTarget#doc.body),
-
-        #doc{atts = SourceAtts} = DocSource,
-        #doc{atts = TargetAtts} = DocTarget,
-        ?assertEqual(lists:sort([couch_att:fetch(name, Att) || Att <- SourceAtts]),
-                     lists:sort([couch_att:fetch(name, Att) || Att <- TargetAtts])),
-
-        FunCompareAtts = fun(Att) ->
-            AttName = couch_att:fetch(name, Att),
-            {ok, AttTarget} = find_att(TargetAtts, AttName),
-            SourceMd5 = att_md5(Att),
-            TargetMd5 = att_md5(AttTarget),
-            case AttName of
-                <<"att1">> ->
-                    ?assertEqual(gzip, couch_att:fetch(encoding, Att)),
-                    ?assertEqual(gzip, couch_att:fetch(encoding, AttTarget)),
-                    DecSourceMd5 = att_decoded_md5(Att),
-                    DecTargetMd5 = att_decoded_md5(AttTarget),
-                    ?assertEqual(DecSourceMd5, DecTargetMd5);
-                _ ->
-                    ?assertEqual(identity, couch_att:fetch(encoding, AttTarget)),
-                    ?assertEqual(identity, couch_att:fetch(encoding, AttTarget))
-            end,
-            ?assertEqual(SourceMd5, TargetMd5),
-            ?assert(is_integer(couch_att:fetch(disk_len, Att))),
-            ?assert(is_integer(couch_att:fetch(att_len, Att))),
-            ?assert(is_integer(couch_att:fetch(disk_len, AttTarget))),
-            ?assert(is_integer(couch_att:fetch(att_len, AttTarget))),
-            ?assertEqual(couch_att:fetch(disk_len, Att),
-                         couch_att:fetch(disk_len, AttTarget)),
-            ?assertEqual(couch_att:fetch(att_len, Att),
-                         couch_att:fetch(att_len, AttTarget)),
-            ?assertEqual(couch_att:fetch(type, Att),
-                         couch_att:fetch(type, AttTarget)),
-            ?assertEqual(couch_att:fetch(md5, Att),
-                         couch_att:fetch(md5, AttTarget))
+        case lists:member(Id, ExceptIds) of
+            true ->
+                ?assertEqual(not_found, couch_db:get_doc_info(TargetDb, Id));
+            false ->
+                {ok, TDoc} = couch_db:open_doc(TargetDb, Id),
+                compare_docs(DocSource, TDoc)
         end,
-
-        lists:foreach(FunCompareAtts, SourceAtts),
-
         {ok, Acc}
     end,
 
@@ -61,6 +31,46 @@
     ok = couch_db:close(SourceDb),
     ok = couch_db:close(TargetDb).
 
+
+compare_docs(Doc1, Doc2) ->
+    ?assertEqual(Doc1#doc.body, Doc2#doc.body),
+    #doc{atts = Atts1} = Doc1,
+    #doc{atts = Atts2} = Doc2,
+    ?assertEqual(lists:sort([couch_att:fetch(name, Att) || Att <- Atts1]),
+                 lists:sort([couch_att:fetch(name, Att) || Att <- Atts2])),
+    FunCompareAtts = fun(Att) ->
+        AttName = couch_att:fetch(name, Att),
+        {ok, AttTarget} = find_att(Atts2, AttName),
+        SourceMd5 = att_md5(Att),
+        TargetMd5 = att_md5(AttTarget),
+        case AttName of
+            <<"att1">> ->
+                ?assertEqual(gzip, couch_att:fetch(encoding, Att)),
+                ?assertEqual(gzip, couch_att:fetch(encoding, AttTarget)),
+                DecSourceMd5 = att_decoded_md5(Att),
+                DecTargetMd5 = att_decoded_md5(AttTarget),
+                ?assertEqual(DecSourceMd5, DecTargetMd5);
+            _ ->
+                ?assertEqual(identity, couch_att:fetch(encoding, AttTarget)),
+                ?assertEqual(identity, couch_att:fetch(encoding, AttTarget))
+        end,
+        ?assertEqual(SourceMd5, TargetMd5),
+        ?assert(is_integer(couch_att:fetch(disk_len, Att))),
+        ?assert(is_integer(couch_att:fetch(att_len, Att))),
+        ?assert(is_integer(couch_att:fetch(disk_len, AttTarget))),
+        ?assert(is_integer(couch_att:fetch(att_len, AttTarget))),
+        ?assertEqual(couch_att:fetch(disk_len, Att),
+                     couch_att:fetch(disk_len, AttTarget)),
+        ?assertEqual(couch_att:fetch(att_len, Att),
+                     couch_att:fetch(att_len, AttTarget)),
+        ?assertEqual(couch_att:fetch(type, Att),
+                     couch_att:fetch(type, AttTarget)),
+        ?assertEqual(couch_att:fetch(md5, Att),
+                     couch_att:fetch(md5, AttTarget))
+    end,
+    lists:foreach(FunCompareAtts, Atts1).
+
+
 find_att([], _Name) ->
     nil;
 find_att([Att | Rest], Name) ->
@@ -94,10 +104,12 @@
     ]).
 
 replicate(Source, Target) ->
-    RepObject = {[
+    replicate({[
         {<<"source">>, Source},
         {<<"target">>, Target}
-    ]},
+    ]}).
+
+replicate({[_ | _]} = RepObject) ->
     {ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
     {ok, Pid} = couch_replicator:async_replicate(Rep),
     MonRef = erlang:monitor(process, Pid),