blob: 16423e85c1577dcfa57daa2e4926269518b87082 [file] [log] [blame]
-module(couch_replicator_test_helper).
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/src/couch_replicator.hrl").
-define(USERNAME, "rep_test_user").
-define(PASSWORD, "rep_test_pass").
-export([
cluster_compare_dbs/2,
cluster_compare_dbs/3,
cluster_doc_revs/1,
cluster_open_rev/3,
cluster_url/0,
cluster_db_url/1,
replicate/1,
get_pid/1,
replicate/2,
test_setup/0,
test_teardown/1,
setup_db/0,
teardown_db/1
]).
cluster_compare_dbs(Source, Target) ->
cluster_compare_dbs(Source, Target, []).
cluster_compare_dbs(Source, Target, ExceptIds) ->
?assertMatch({ok, [_ | _]}, fabric:get_db_info(Source)),
?assertMatch({ok, [_ | _]}, fabric:get_db_info(Target)),
lists:foreach(
fun({Id, Rev}) ->
SrcDoc = cluster_open_rev(Source, Id, Rev),
TgtDoc = cluster_open_rev(Target, Id, Rev),
case lists:member(Id, ExceptIds) of
true ->
?assertEqual(not_found, TgtDoc);
false ->
compare_docs(SrcDoc, TgtDoc)
end
end,
cluster_doc_revs(Source)
).
cluster_open_rev(DbName, Id, Rev) ->
{ok, [Result]} = fabric:open_revs(DbName, Id, [Rev], []),
case Result of
{ok, #doc{} = Doc} ->
Doc;
{{not_found, missing}, _} ->
not_found
end.
cluster_doc_revs(DbName) ->
Opts = [{style, all_docs}],
{ok, Acc} = fabric_util:isolate(fun() ->
fabric:changes(DbName, fun changes_callback/2, [], Opts)
end),
Acc.
changes_callback(start, Acc) ->
{ok, Acc};
changes_callback({change, {Change}}, Acc) ->
Id = proplists:get_value(id, Change),
Revs = proplists:get_value(changes, Change),
IdRevs = [{Id, couch_doc:parse_rev(R)} || {[{<<"rev">>, R}]} <- Revs],
{ok, IdRevs ++ Acc};
changes_callback(timeout, Acc) ->
{ok, Acc};
changes_callback({stop, _EndSeq, _Pending}, Acc) ->
{ok, Acc}.
compare_docs(#doc{} = Doc1, not_found) ->
error({not_found, Doc1#doc.id});
compare_docs(Doc1, Doc2) ->
?assertEqual(Doc1#doc.body, Doc2#doc.body),
#doc{atts = Atts1} = Doc1,
#doc{atts = Atts2} = Doc2,
?assertEqual(
lists:sort([couch_att:fetch(name, Att) || Att <- Atts1]),
lists:sort([couch_att:fetch(name, Att) || Att <- Atts2])
),
FunCompareAtts = fun(Att) ->
AttName = couch_att:fetch(name, Att),
{ok, AttTarget} = find_att(Atts2, AttName),
SourceMd5 = att_md5(Att),
TargetMd5 = att_md5(AttTarget),
case AttName of
<<"att1">> ->
?assertEqual(gzip, couch_att:fetch(encoding, Att)),
?assertEqual(gzip, couch_att:fetch(encoding, AttTarget)),
DecSourceMd5 = att_decoded_md5(Att),
DecTargetMd5 = att_decoded_md5(AttTarget),
?assertEqual(DecSourceMd5, DecTargetMd5);
_ ->
?assertEqual(identity, couch_att:fetch(encoding, AttTarget)),
?assertEqual(identity, couch_att:fetch(encoding, AttTarget))
end,
?assertEqual(SourceMd5, TargetMd5),
?assert(is_integer(couch_att:fetch(disk_len, Att))),
?assert(is_integer(couch_att:fetch(att_len, Att))),
?assert(is_integer(couch_att:fetch(disk_len, AttTarget))),
?assert(is_integer(couch_att:fetch(att_len, AttTarget))),
?assertEqual(
couch_att:fetch(disk_len, Att),
couch_att:fetch(disk_len, AttTarget)
),
?assertEqual(
couch_att:fetch(att_len, Att),
couch_att:fetch(att_len, AttTarget)
),
?assertEqual(
couch_att:fetch(type, Att),
couch_att:fetch(type, AttTarget)
),
?assertEqual(
couch_att:fetch(md5, Att),
couch_att:fetch(md5, AttTarget)
)
end,
lists:foreach(FunCompareAtts, Atts1).
find_att([], _Name) ->
nil;
find_att([Att | Rest], Name) ->
case couch_att:fetch(name, Att) of
Name ->
{ok, Att};
_ ->
find_att(Rest, Name)
end.
att_md5(Att) ->
Md50 = couch_att:foldl(
Att,
fun(Chunk, Acc) -> couch_hash:md5_hash_update(Acc, Chunk) end,
couch_hash:md5_hash_init()
),
couch_hash:md5_hash_final(Md50).
att_decoded_md5(Att) ->
Md50 = couch_att:foldl_decode(
Att,
fun(Chunk, Acc) -> couch_hash:md5_hash_update(Acc, Chunk) end,
couch_hash:md5_hash_init()
),
couch_hash:md5_hash_final(Md50).
cluster_url() ->
Fmt = "http://~s:~s@~s:~b",
Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
Port = mochiweb_socket_server:get(chttpd, port),
Args = [?USERNAME, ?PASSWORD, Addr, Port],
?l2b(io_lib:format(Fmt, Args)).
cluster_db_url(<<"/", _/binary>> = Path) ->
<<(cluster_url())/binary, Path/binary>>;
cluster_db_url(Path) ->
<<(cluster_url())/binary, "/", Path/binary>>.
get_pid(RepId) ->
Pid = global:whereis_name({couch_replicator_scheduler_job, RepId}),
?assert(is_pid(Pid)),
Pid.
replicate(Source, Target) ->
replicate(
{[
{<<"source">>, Source},
{<<"target">>, Target}
]}
).
replicate({[_ | _]} = RepObject) ->
{ok, Rep} = couch_replicator_utils:parse_rep_doc(RepObject, ?ADMIN_USER),
ok = couch_replicator_scheduler:add_job(Rep),
couch_replicator_scheduler:reschedule(),
Pid = get_pid(Rep#rep.id),
MonRef = erlang:monitor(process, Pid),
receive
{'DOWN', MonRef, process, Pid, _} ->
ok
end,
ok = couch_replicator_scheduler:remove_job(Rep#rep.id).
setup_db() ->
DbName = ?tempdb(),
ok = fabric:create_db(DbName, [{q, 1}, {n, 1}, ?ADMIN_CTX]),
DbName.
teardown_db(DbName) ->
try
ok = fabric:delete_db(DbName, [?ADMIN_CTX])
catch
error:database_does_not_exist ->
ok
end.
test_setup() ->
Ctx = test_util:start_couch([fabric, mem3, chttpd, couch_replicator]),
Hashed = couch_passwords:hash_admin_password(?PASSWORD),
ok = config:set("admins", ?USERNAME, ?b2l(Hashed), _Persist = false),
Source = setup_db(),
Target = setup_db(),
{Ctx, {Source, Target}}.
test_teardown({Ctx, {Source, Target}}) ->
meck:unload(),
teardown_db(Source),
teardown_db(Target),
config:delete("admins", ?USERNAME, _Persist = false),
ok = test_util:stop_couch(Ctx).