blob: 7c4d38c9373d0161ccfff851e9060f38c562fec6 [file] [log] [blame]
#!/usr/bin/env escript
%% -*- erlang -*-
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
% Verify that compacting databases that are being used as the source or
% target of a replication doesn't affect the replication and that the
% replication doesn't hold their reference counters forever.
-define(b2l(B), binary_to_list(B)).
-record(user_ctx, {
name = null,
roles = [],
handler
}).
-record(db, {
main_pid = nil,
update_pid = nil,
compactor_pid = nil,
instance_start_time, % number of microsecs since jan 1 1970 as a binary string
fd,
updater_fd,
fd_ref_counter,
header = nil,
committed_update_seq,
fulldocinfo_by_id_btree,
docinfo_by_seq_btree,
local_docs_btree,
update_seq,
name,
filepath,
validate_doc_funs = [],
security = [],
security_ptr = nil,
user_ctx = #user_ctx{},
waiting_delayed_commit = nil,
revs_limit = 1000,
fsync_options = [],
options = [],
compression,
before_doc_update,
after_doc_read
}).
-record(rep, {
id,
source,
target,
options,
user_ctx,
doc_id
}).
source_db_name() -> <<"couch_test_rep_db_a">>.
target_db_name() -> <<"couch_test_rep_db_b">>.
main(_) ->
test_util:init_code_path(),
etap:plan(376),
case (catch test()) of
ok ->
etap:end_tests();
Other ->
etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
etap:bail(Other)
end,
ok.
test() ->
couch_server_sup:start_link(test_util:config_files()),
ibrowse:start(),
Pairs = [
{source_db_name(), target_db_name()},
{{remote, source_db_name()}, target_db_name()},
{source_db_name(), {remote, target_db_name()}},
{{remote, source_db_name()}, {remote, (target_db_name())}}
],
lists:foreach(
fun({Source, Target}) ->
{ok, SourceDb} = create_db(source_db_name()),
etap:is(couch_db:is_idle(SourceDb), true,
"Source database is idle before starting replication"),
{ok, TargetDb} = create_db(target_db_name()),
etap:is(couch_db:is_idle(TargetDb), true,
"Target database is idle before starting replication"),
{ok, RepPid, RepId} = replicate(Source, Target),
check_active_tasks(RepPid, RepId, Source, Target),
{ok, DocsWritten} = populate_and_compact_test(
RepPid, SourceDb, TargetDb),
wait_target_in_sync(DocsWritten, TargetDb),
check_active_tasks(RepPid, RepId, Source, Target),
cancel_replication(RepId, RepPid),
compare_dbs(SourceDb, TargetDb),
delete_db(SourceDb),
delete_db(TargetDb),
couch_server_sup:stop(),
ok = timer:sleep(1000),
couch_server_sup:start_link(test_util:config_files())
end,
Pairs),
couch_server_sup:stop(),
ok.
populate_and_compact_test(RepPid, SourceDb0, TargetDb0) ->
etap:is(is_process_alive(RepPid), true, "Replication process is alive"),
check_db_alive("source", SourceDb0),
check_db_alive("target", TargetDb0),
Writer = spawn_writer(SourceDb0),
lists:foldl(
fun(_, {SourceDb, TargetDb, DocCount}) ->
pause_writer(Writer),
compact_db("source", SourceDb),
etap:is(is_process_alive(RepPid), true,
"Replication process is alive after source database compaction"),
check_db_alive("source", SourceDb),
check_ref_counter("source", SourceDb),
compact_db("target", TargetDb),
etap:is(is_process_alive(RepPid), true,
"Replication process is alive after target database compaction"),
check_db_alive("target", TargetDb),
check_ref_counter("target", TargetDb),
{ok, SourceDb2} = reopen_db(SourceDb),
{ok, TargetDb2} = reopen_db(TargetDb),
resume_writer(Writer),
wait_writer(Writer, DocCount),
compact_db("source", SourceDb2),
etap:is(is_process_alive(RepPid), true,
"Replication process is alive after source database compaction"),
check_db_alive("source", SourceDb2),
pause_writer(Writer),
check_ref_counter("source", SourceDb2),
resume_writer(Writer),
compact_db("target", TargetDb2),
etap:is(is_process_alive(RepPid), true,
"Replication process is alive after target database compaction"),
check_db_alive("target", TargetDb2),
pause_writer(Writer),
check_ref_counter("target", TargetDb2),
resume_writer(Writer),
{ok, SourceDb3} = reopen_db(SourceDb2),
{ok, TargetDb3} = reopen_db(TargetDb2),
{SourceDb3, TargetDb3, DocCount + 50}
end,
{SourceDb0, TargetDb0, 50}, lists:seq(1, 5)),
DocsWritten = stop_writer(Writer),
{ok, DocsWritten}.
check_db_alive(Type, #db{main_pid = Pid}) ->
etap:is(is_process_alive(Pid), true,
"Local " ++ Type ++ " database main pid is alive").
compact_db(Type, #db{name = Name}) ->
{ok, Db} = couch_db:open_int(Name, []),
{ok, CompactPid} = couch_db:start_compact(Db),
MonRef = erlang:monitor(process, CompactPid),
receive
{'DOWN', MonRef, process, CompactPid, normal} ->
ok;
{'DOWN', MonRef, process, CompactPid, Reason} ->
etap:bail("Error compacting " ++ Type ++ " database " ++ ?b2l(Name) ++
": " ++ couch_util:to_list(Reason))
after 30000 ->
etap:bail("Compaction for " ++ Type ++ " database " ++ ?b2l(Name) ++
" didn't finish")
end,
ok = couch_db:close(Db).
check_ref_counter(Type, #db{name = Name, fd_ref_counter = OldRefCounter}) ->
MonRef = erlang:monitor(process, OldRefCounter),
receive
{'DOWN', MonRef, process, OldRefCounter, _} ->
etap:diag("Old " ++ Type ++ " database ref counter terminated")
after 30000 ->
etap:bail("Old " ++ Type ++ " database ref counter didn't terminate")
end,
{ok, #db{fd_ref_counter = NewRefCounter} = Db} = couch_db:open_int(Name, []),
ok = couch_db:close(Db),
etap:isnt(
NewRefCounter, OldRefCounter, Type ++ " database has new ref counter").
reopen_db(#db{name = Name}) ->
{ok, Db} = couch_db:open_int(Name, []),
ok = couch_db:close(Db),
{ok, Db}.
wait_target_in_sync(DocCount, #db{name = TargetName}) ->
wait_target_in_sync_loop(DocCount, TargetName, 300).
wait_target_in_sync_loop(_DocCount, _TargetName, 0) ->
etap:bail("Could not get source and target databases in sync");
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft) ->
{ok, Target} = couch_db:open_int(TargetName, []),
{ok, TargetInfo} = couch_db:get_db_info(Target),
ok = couch_db:close(Target),
TargetDocCount = couch_util:get_value(doc_count, TargetInfo),
case TargetDocCount == DocCount of
true ->
etap:diag("Source and target databases are in sync");
false ->
ok = timer:sleep(100),
wait_target_in_sync_loop(DocCount, TargetName, RetriesLeft - 1)
end.
compare_dbs(#db{name = SourceName}, #db{name = TargetName}) ->
{ok, SourceDb} = couch_db:open_int(SourceName, []),
{ok, TargetDb} = couch_db:open_int(TargetName, []),
Fun = fun(FullDocInfo, _, Acc) ->
{ok, Doc} = couch_db:open_doc(SourceDb, FullDocInfo),
{Props} = DocJson = couch_doc:to_json_obj(Doc, [attachments]),
DocId = couch_util:get_value(<<"_id">>, Props),
DocTarget = case couch_db:open_doc(TargetDb, DocId) of
{ok, DocT} ->
DocT;
Error ->
etap:bail("Error opening document '" ++ ?b2l(DocId) ++
"' from target: " ++ couch_util:to_list(Error))
end,
DocTargetJson = couch_doc:to_json_obj(DocTarget, [attachments]),
case DocTargetJson of
DocJson ->
ok;
_ ->
etap:bail("Content from document '" ++ ?b2l(DocId) ++
"' differs in target database")
end,
{ok, Acc}
end,
{ok, _, _} = couch_db:enum_docs(SourceDb, Fun, [], []),
etap:diag("Target database has the same documents as the source database"),
ok = couch_db:close(SourceDb),
ok = couch_db:close(TargetDb).
check_active_tasks(RepPid, {BaseId, Ext} = _RepId, Src, Tgt) ->
Source = case Src of
{remote, NameSrc} ->
<<(db_url(NameSrc))/binary, $/>>;
_ ->
Src
end,
Target = case Tgt of
{remote, NameTgt} ->
<<(db_url(NameTgt))/binary, $/>>;
_ ->
Tgt
end,
FullRepId = list_to_binary(BaseId ++ Ext),
Pid = list_to_binary(pid_to_list(RepPid)),
[RepTask] = couch_task_status:all(),
etap:is(couch_util:get_value(pid, RepTask), Pid,
"_active_tasks entry has correct pid property"),
etap:is(couch_util:get_value(replication_id, RepTask), FullRepId,
"_active_tasks entry has right replication id"),
etap:is(couch_util:get_value(continuous, RepTask), true,
"_active_tasks entry has continuous property set to true"),
etap:is(couch_util:get_value(source, RepTask), Source,
"_active_tasks entry has correct source property"),
etap:is(couch_util:get_value(target, RepTask), Target,
"_active_tasks entry has correct target property"),
etap:is(is_integer(couch_util:get_value(docs_read, RepTask)), true,
"_active_tasks entry has integer docs_read property"),
etap:is(is_integer(couch_util:get_value(docs_written, RepTask)), true,
"_active_tasks entry has integer docs_written property"),
etap:is(is_integer(couch_util:get_value(doc_write_failures, RepTask)), true,
"_active_tasks entry has integer doc_write_failures property"),
etap:is(is_integer(couch_util:get_value(revisions_checked, RepTask)), true,
"_active_tasks entry has integer revisions_checked property"),
etap:is(is_integer(couch_util:get_value(missing_revisions_found, RepTask)), true,
"_active_tasks entry has integer missing_revisions_found property"),
etap:is(is_integer(couch_util:get_value(checkpointed_source_seq, RepTask)), true,
"_active_tasks entry has integer checkpointed_source_seq property"),
etap:is(is_integer(couch_util:get_value(source_seq, RepTask)), true,
"_active_tasks entry has integer source_seq property"),
Progress = couch_util:get_value(progress, RepTask),
etap:is(is_integer(Progress), true,
"_active_tasks entry has an integer progress property"),
etap:is(Progress =< 100, true, "Progress is not greater than 100%").
wait_writer(Pid, NumDocs) ->
case get_writer_num_docs_written(Pid) of
N when N >= NumDocs ->
ok;
_ ->
wait_writer(Pid, NumDocs)
end.
spawn_writer(Db) ->
Parent = self(),
Pid = spawn(fun() -> writer_loop(Db, Parent, 0) end),
etap:diag("Started source database writer"),
Pid.
pause_writer(Pid) ->
Ref = make_ref(),
Pid ! {pause, Ref},
receive
{paused, Ref} ->
ok
after 30000 ->
etap:bail("Failed to pause source database writer")
end.
resume_writer(Pid) ->
Ref = make_ref(),
Pid ! {continue, Ref},
receive
{ok, Ref} ->
ok
after 30000 ->
etap:bail("Failed to unpause source database writer")
end.
get_writer_num_docs_written(Pid) ->
Ref = make_ref(),
Pid ! {get_count, Ref},
receive
{count, Ref, Count} ->
Count
after 30000 ->
etap:bail("Timeout getting number of documents written from "
"source database writer")
end.
stop_writer(Pid) ->
Ref = make_ref(),
Pid ! {stop, Ref},
receive
{stopped, Ref, DocsWritten} ->
MonRef = erlang:monitor(process, Pid),
receive
{'DOWN', MonRef, process, Pid, _Reason} ->
etap:diag("Stopped source database writer"),
DocsWritten
after 30000 ->
etap:bail("Timeout stopping source database writer")
end
after 30000 ->
etap:bail("Timeout stopping source database writer")
end.
writer_loop(#db{name = DbName}, Parent, Counter) ->
maybe_pause(Parent, Counter),
Doc = couch_doc:from_json_obj({[
{<<"_id">>, list_to_binary(integer_to_list(Counter + 1))},
{<<"value">>, Counter + 1},
{<<"_attachments">>, {[
{<<"icon1.png">>, {[
{<<"data">>, base64:encode(att_data())},
{<<"content_type">>, <<"image/png">>}
]}},
{<<"icon2.png">>, {[
{<<"data">>, base64:encode(iolist_to_binary(
[att_data(), att_data()]))},
{<<"content_type">>, <<"image/png">>}
]}}
]}}
]}),
maybe_pause(Parent, Counter),
{ok, Db} = couch_db:open_int(DbName, []),
{ok, _} = couch_db:update_doc(Db, Doc, []),
ok = couch_db:close(Db),
receive
{get_count, Ref} ->
Parent ! {count, Ref, Counter + 1},
writer_loop(Db, Parent, Counter + 1);
{stop, Ref} ->
Parent ! {stopped, Ref, Counter + 1}
after 0 ->
ok = timer:sleep(500),
writer_loop(Db, Parent, Counter + 1)
end.
maybe_pause(Parent, Counter) ->
receive
{get_count, Ref} ->
Parent ! {count, Ref, Counter};
{pause, Ref} ->
Parent ! {paused, Ref},
receive {continue, Ref2} -> Parent ! {ok, Ref2} end
after 0 ->
ok
end.
db_url(DbName) ->
iolist_to_binary([
"http://", couch_config:get("httpd", "bind_address", "127.0.0.1"),
":", integer_to_list(mochiweb_socket_server:get(couch_httpd, port)),
"/", DbName
]).
create_db(DbName) ->
{ok, Db} = couch_db:create(
DbName,
[{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]),
couch_db:close(Db),
{ok, Db}.
delete_db(#db{name = DbName, main_pid = Pid}) ->
ok = couch_server:delete(
DbName, [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]),
MonRef = erlang:monitor(process, Pid),
receive
{'DOWN', MonRef, process, Pid, _Reason} ->
ok
after 30000 ->
etap:bail("Timeout deleting database")
end.
replicate({remote, Db}, Target) ->
replicate(db_url(Db), Target);
replicate(Source, {remote, Db}) ->
replicate(Source, db_url(Db));
replicate(Source, Target) ->
RepObject = {[
{<<"source">>, Source},
{<<"target">>, Target},
{<<"continuous">>, true}
]},
{ok, Rep} = couch_replicator_utils:parse_rep_doc(
RepObject, #user_ctx{roles = [<<"_admin">>]}),
{ok, Pid} = couch_replicator:async_replicate(Rep),
{ok, Pid, Rep#rep.id}.
cancel_replication(RepId, RepPid) ->
{ok, _} = couch_replicator:cancel_replication(RepId),
etap:is(is_process_alive(RepPid), false,
"Replication process is no longer alive after cancel").
att_data() ->
{ok, Data} = file:read_file(
test_util:source_file("share/www/image/logo.png")),
Data.