| #!/usr/bin/env escript |
| %% -*- erlang -*- |
| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| % Verify that compacting databases that are being used as the source or |
| % target of a replication doesn't affect the replication and that the |
| % replication doesn't hold their reference counters forever. |
| |
| -record(user_ctx, { |
| name = null, |
| roles = [], |
| handler |
| }). |
| |
| -record(changes_args, { |
| feed = "normal", |
| dir = fwd, |
| since = 0, |
| limit = 1000000000000000, |
| style = main_only, |
| heartbeat, |
| timeout, |
| filter = "", |
| filter_fun, |
| filter_args = [], |
| include_docs = false, |
| conflicts = false, |
| db_open_options = [] |
| }). |
| |
| -record(row, { |
| id, |
| seq, |
| deleted = false |
| }). |
| |
| |
| test_db_name() -> <<"couch_test_changes">>. |
| |
| |
| main(_) -> |
| test_util:init_code_path(), |
| |
| etap:plan(43), |
| case (catch test()) of |
| ok -> |
| etap:end_tests(); |
| Other -> |
| etap:diag(io_lib:format("Test died abnormally: ~p", [Other])), |
| etap:bail(Other) |
| end, |
| ok. |
| |
| |
| test() -> |
| couch_server_sup:start_link(test_util:config_files()), |
| |
| test_by_doc_ids(), |
| test_by_doc_ids_with_since(), |
| test_by_doc_ids_continuous(), |
| test_design_docs_only(), |
| test_heartbeat(), |
| |
| couch_server_sup:stop(), |
| ok. |
| |
| |
| test_by_doc_ids() -> |
| {ok, Db} = create_db(test_db_name()), |
| |
| {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), |
| {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), |
| {ok, Rev3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), |
| {ok, _Rev4} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), |
| {ok, _Rev5} = save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}), |
| {ok, _Rev3_2} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev3}]}), |
| {ok, _Rev6} = save_doc(Db, {[{<<"_id">>, <<"doc6">>}]}), |
| {ok, _Rev7} = save_doc(Db, {[{<<"_id">>, <<"doc7">>}]}), |
| {ok, _Rev8} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}]}), |
| |
| etap:diag("Folding changes in ascending order with _doc_ids filter"), |
| ChangesArgs = #changes_args{ |
| filter = "_doc_ids" |
| }, |
| DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], |
| Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, |
| Consumer = spawn_consumer(test_db_name(), ChangesArgs, Req), |
| |
| {Rows, LastSeq} = wait_finished(Consumer), |
| {ok, Db2} = couch_db:open_int(test_db_name(), []), |
| UpSeq = couch_db:get_update_seq(Db2), |
| couch_db:close(Db2), |
| etap:is(length(Rows), 2, "Received 2 changes rows"), |
| etap:is(LastSeq, UpSeq, "LastSeq is same as database update seq number"), |
| [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows, |
| etap:is(Id1, <<"doc4">>, "First row is for doc doc4"), |
| etap:is(Seq1, 4, "First row has seq 4"), |
| etap:is(Id2, <<"doc3">>, "Second row is for doc doc3"), |
| etap:is(Seq2, 6, "Second row has seq 6"), |
| |
| stop(Consumer), |
| etap:diag("Folding changes in descending order with _doc_ids filter"), |
| ChangesArgs2 = #changes_args{ |
| filter = "_doc_ids", |
| dir = rev |
| }, |
| Consumer2 = spawn_consumer(test_db_name(), ChangesArgs2, Req), |
| |
| {Rows2, LastSeq2} = wait_finished(Consumer2), |
| etap:is(length(Rows2), 2, "Received 2 changes rows"), |
| etap:is(LastSeq2, 4, "LastSeq is 4"), |
| [#row{seq = Seq1_2, id = Id1_2}, #row{seq = Seq2_2, id = Id2_2}] = Rows2, |
| etap:is(Id1_2, <<"doc3">>, "First row is for doc doc3"), |
| etap:is(Seq1_2, 6, "First row has seq 4"), |
| etap:is(Id2_2, <<"doc4">>, "Second row is for doc doc4"), |
| etap:is(Seq2_2, 4, "Second row has seq 6"), |
| |
| stop(Consumer2), |
| delete_db(Db). |
| |
| |
| test_by_doc_ids_with_since() -> |
| {ok, Db} = create_db(test_db_name()), |
| |
| {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), |
| {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), |
| {ok, Rev3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), |
| {ok, _Rev4} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), |
| {ok, _Rev5} = save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}), |
| {ok, Rev3_2} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev3}]}), |
| {ok, _Rev6} = save_doc(Db, {[{<<"_id">>, <<"doc6">>}]}), |
| {ok, _Rev7} = save_doc(Db, {[{<<"_id">>, <<"doc7">>}]}), |
| {ok, _Rev8} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}]}), |
| |
| ChangesArgs = #changes_args{ |
| filter = "_doc_ids", |
| since = 5 |
| }, |
| DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], |
| Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, |
| Consumer = spawn_consumer(test_db_name(), ChangesArgs, Req), |
| |
| {Rows, LastSeq} = wait_finished(Consumer), |
| {ok, Db2} = couch_db:open_int(test_db_name(), []), |
| UpSeq = couch_db:get_update_seq(Db2), |
| couch_db:close(Db2), |
| etap:is(LastSeq, UpSeq, "LastSeq is same as database update seq number"), |
| etap:is(length(Rows), 1, "Received 1 changes rows"), |
| [#row{seq = Seq1, id = Id1}] = Rows, |
| etap:is(Id1, <<"doc3">>, "First row is for doc doc3"), |
| etap:is(Seq1, 6, "First row has seq 6"), |
| |
| stop(Consumer), |
| |
| ChangesArgs2 = #changes_args{ |
| filter = "_doc_ids", |
| since = 6 |
| }, |
| Consumer2 = spawn_consumer(test_db_name(), ChangesArgs2, Req), |
| |
| {Rows2, LastSeq2} = wait_finished(Consumer2), |
| {ok, Db3} = couch_db:open_int(test_db_name(), []), |
| UpSeq2 = couch_db:get_update_seq(Db3), |
| couch_db:close(Db3), |
| etap:is(LastSeq2, UpSeq2, "LastSeq is same as database update seq number"), |
| etap:is(length(Rows2), 0, "Received 0 change rows"), |
| |
| stop(Consumer2), |
| |
| {ok, _Rev3_3} = save_doc( |
| Db, |
| {[{<<"_id">>, <<"doc3">>}, {<<"_deleted">>, true}, {<<"_rev">>, Rev3_2}]}), |
| |
| ChangesArgs3 = #changes_args{ |
| filter = "_doc_ids", |
| since = 9 |
| }, |
| Consumer3 = spawn_consumer(test_db_name(), ChangesArgs3, Req), |
| |
| {Rows3, LastSeq3} = wait_finished(Consumer3), |
| {ok, Db4} = couch_db:open_int(test_db_name(), []), |
| UpSeq3 = couch_db:get_update_seq(Db4), |
| couch_db:close(Db4), |
| etap:is(LastSeq3, UpSeq3, "LastSeq is same as database update seq number"), |
| etap:is(length(Rows3), 1, "Received 1 changes rows"), |
| etap:is( |
| [#row{seq = LastSeq3, id = <<"doc3">>, deleted = true}], |
| Rows3, |
| "Received row with doc3 deleted"), |
| |
| stop(Consumer3), |
| |
| delete_db(Db). |
| |
| |
| test_by_doc_ids_continuous() -> |
| {ok, Db} = create_db(test_db_name()), |
| |
| {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), |
| {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), |
| {ok, Rev3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), |
| {ok, Rev4} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), |
| {ok, _Rev5} = save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}), |
| {ok, Rev3_2} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev3}]}), |
| {ok, _Rev6} = save_doc(Db, {[{<<"_id">>, <<"doc6">>}]}), |
| {ok, _Rev7} = save_doc(Db, {[{<<"_id">>, <<"doc7">>}]}), |
| {ok, _Rev8} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}]}), |
| |
| ChangesArgs = #changes_args{ |
| filter = "_doc_ids", |
| feed = "continuous" |
| }, |
| DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>], |
| Req = {json_req, {[{<<"doc_ids">>, DocIds}]}}, |
| Consumer = spawn_consumer(test_db_name(), ChangesArgs, Req), |
| |
| pause(Consumer), |
| Rows = get_rows(Consumer), |
| |
| etap:is(length(Rows), 2, "Received 2 changes rows"), |
| [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows, |
| etap:is(Id1, <<"doc4">>, "First row is for doc doc4"), |
| etap:is(Seq1, 4, "First row has seq 4"), |
| etap:is(Id2, <<"doc3">>, "Second row is for doc doc3"), |
| etap:is(Seq2, 6, "Second row has seq 6"), |
| |
| clear_rows(Consumer), |
| {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}), |
| {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}), |
| unpause(Consumer), |
| pause(Consumer), |
| etap:is(get_rows(Consumer), [], "No new rows"), |
| |
| {ok, Rev4_2} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}, {<<"_rev">>, Rev4}]}), |
| {ok, _Rev11} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}), |
| {ok, _Rev4_3} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}, {<<"_rev">>, Rev4_2}]}), |
| {ok, _Rev12} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}), |
| {ok, Rev3_3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev3_2}]}), |
| unpause(Consumer), |
| pause(Consumer), |
| |
| NewRows = get_rows(Consumer), |
| etap:is(length(NewRows), 2, "Received 2 new rows"), |
| [Row14, Row16] = NewRows, |
| etap:is(Row14#row.seq, 14, "First row has seq 14"), |
| etap:is(Row14#row.id, <<"doc4">>, "First row is for doc doc4"), |
| etap:is(Row16#row.seq, 16, "Second row has seq 16"), |
| etap:is(Row16#row.id, <<"doc3">>, "Second row is for doc doc3"), |
| |
| clear_rows(Consumer), |
| {ok, _Rev3_4} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}, {<<"_rev">>, Rev3_3}]}), |
| unpause(Consumer), |
| pause(Consumer), |
| etap:is(get_rows(Consumer), [#row{seq = 17, id = <<"doc3">>}], |
| "Got row for seq 17, doc doc3"), |
| |
| unpause(Consumer), |
| stop(Consumer), |
| delete_db(Db). |
| |
| |
| test_design_docs_only() -> |
| {ok, Db} = create_db(test_db_name()), |
| |
| {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), |
| {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), |
| {ok, Rev3} = save_doc(Db, {[{<<"_id">>, <<"_design/foo">>}]}), |
| |
| ChangesArgs = #changes_args{ |
| filter = "_design" |
| }, |
| Consumer = spawn_consumer(test_db_name(), ChangesArgs, {json_req, null}), |
| |
| {Rows, LastSeq} = wait_finished(Consumer), |
| {ok, Db2} = couch_db:open_int(test_db_name(), []), |
| UpSeq = couch_db:get_update_seq(Db2), |
| couch_db:close(Db2), |
| |
| etap:is(LastSeq, UpSeq, "LastSeq is same as database update seq number"), |
| etap:is(length(Rows), 1, "Received 1 changes rows"), |
| etap:is(Rows, [#row{seq = 3, id = <<"_design/foo">>}], "Received row with ddoc"), |
| |
| stop(Consumer), |
| |
| {ok, Db3} = couch_db:open_int( |
| test_db_name(), [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]), |
| {ok, _Rev3_2} = save_doc( |
| Db3, |
| {[{<<"_id">>, <<"_design/foo">>}, {<<"_rev">>, Rev3}, |
| {<<"_deleted">>, true}]}), |
| |
| Consumer2 = spawn_consumer(test_db_name(), ChangesArgs, {json_req, null}), |
| |
| {Rows2, LastSeq2} = wait_finished(Consumer2), |
| UpSeq2 = UpSeq + 1, |
| couch_db:close(Db3), |
| |
| etap:is(LastSeq2, UpSeq2, "LastSeq is same as database update seq number"), |
| etap:is(length(Rows2), 1, "Received 1 changes rows"), |
| etap:is( |
| Rows2, |
| [#row{seq = 4, id = <<"_design/foo">>, deleted = true}], |
| "Received row with deleted ddoc"), |
| |
| stop(Consumer2), |
| delete_db(Db). |
| |
| test_heartbeat() -> |
| {ok, Db} = create_db(test_db_name()), |
| |
| {ok, _} = save_doc(Db, {[ |
| {<<"_id">>, <<"_design/foo">>}, |
| {<<"language">>, <<"javascript">>}, |
| {<<"filters">>, {[ |
| {<<"foo">>, <<"function(doc) { if ((doc._id == 'doc10') || |
| (doc._id == 'doc11') || |
| (doc._id == 'doc12')) { |
| return true; |
| } else { |
| return false; |
| }}">> |
| }]}} |
| ]}), |
| |
| ChangesArgs = #changes_args{ |
| filter = "foo/foo", |
| feed = "continuous", |
| timeout = 10000, |
| heartbeat = 1000 |
| }, |
| Consumer = spawn_consumer(test_db_name(), ChangesArgs, {json_req, null}), |
| |
| {ok, _Rev1} = save_doc(Db, {[{<<"_id">>, <<"doc1">>}]}), |
| timer:sleep(200), |
| {ok, _Rev2} = save_doc(Db, {[{<<"_id">>, <<"doc2">>}]}), |
| timer:sleep(200), |
| {ok, _Rev3} = save_doc(Db, {[{<<"_id">>, <<"doc3">>}]}), |
| timer:sleep(200), |
| {ok, _Rev4} = save_doc(Db, {[{<<"_id">>, <<"doc4">>}]}), |
| timer:sleep(200), |
| {ok, _Rev5} = save_doc(Db, {[{<<"_id">>, <<"doc5">>}]}), |
| timer:sleep(200), |
| {ok, _Rev6} = save_doc(Db, {[{<<"_id">>, <<"doc6">>}]}), |
| timer:sleep(200), |
| {ok, _Rev7} = save_doc(Db, {[{<<"_id">>, <<"doc7">>}]}), |
| timer:sleep(200), |
| {ok, _Rev8} = save_doc(Db, {[{<<"_id">>, <<"doc8">>}]}), |
| timer:sleep(200), |
| {ok, _Rev9} = save_doc(Db, {[{<<"_id">>, <<"doc9">>}]}), |
| Heartbeats = get_heartbeats(Consumer), |
| etap:is(Heartbeats, 2, "Received 2 heartbeats now"), |
| {ok, _Rev10} = save_doc(Db, {[{<<"_id">>, <<"doc10">>}]}), |
| timer:sleep(200), |
| {ok, _Rev11} = save_doc(Db, {[{<<"_id">>, <<"doc11">>}]}), |
| timer:sleep(200), |
| {ok, _Rev12} = save_doc(Db, {[{<<"_id">>, <<"doc12">>}]}), |
| Heartbeats2 = get_heartbeats(Consumer), |
| etap:is(Heartbeats2, 3, "Received 3 heartbeats now"), |
| Rows = get_rows(Consumer), |
| etap:is(length(Rows), 3, "Received 3 changes rows"), |
| |
| {ok, _Rev13} = save_doc(Db, {[{<<"_id">>, <<"doc13">>}]}), |
| timer:sleep(200), |
| {ok, _Rev14} = save_doc(Db, {[{<<"_id">>, <<"doc14">>}]}), |
| timer:sleep(200), |
| Heartbeats3 = get_heartbeats(Consumer), |
| etap:is(Heartbeats3, 6, "Received 6 heartbeats now"), |
| stop(Consumer), |
| couch_db:close(Db), |
| delete_db(Db). |
| |
| |
| save_doc(Db, Json) -> |
| Doc = couch_doc:from_json_obj(Json), |
| {ok, Rev} = couch_db:update_doc(Db, Doc, []), |
| {ok, couch_doc:rev_to_str(Rev)}. |
| |
| |
| get_rows(Consumer) -> |
| Ref = make_ref(), |
| Consumer ! {get_rows, Ref}, |
| receive |
| {rows, Ref, Rows} -> |
| Rows |
| after 3000 -> |
| etap:bail("Timeout getting rows from consumer") |
| end. |
| |
| get_heartbeats(Consumer) -> |
| Ref = make_ref(), |
| Consumer ! {get_heartbeats, Ref}, |
| receive |
| {hearthbeats, Ref, HeartBeats} -> |
| HeartBeats |
| after 3000 -> |
| etap:bail("Timeout getting heartbeats from consumer") |
| end. |
| |
| |
| clear_rows(Consumer) -> |
| Ref = make_ref(), |
| Consumer ! {reset, Ref}, |
| receive |
| {ok, Ref} -> |
| ok |
| after 3000 -> |
| etap:bail("Timeout clearing consumer rows") |
| end. |
| |
| |
| stop(Consumer) -> |
| Ref = make_ref(), |
| Consumer ! {stop, Ref}, |
| receive |
| {ok, Ref} -> |
| ok |
| after 3000 -> |
| etap:bail("Timeout stopping consumer") |
| end. |
| |
| |
| pause(Consumer) -> |
| Ref = make_ref(), |
| Consumer ! {pause, Ref}, |
| receive |
| {paused, Ref} -> |
| ok |
| after 3000 -> |
| etap:bail("Timeout pausing consumer") |
| end. |
| |
| |
| unpause(Consumer) -> |
| Ref = make_ref(), |
| Consumer ! {continue, Ref}, |
| receive |
| {ok, Ref} -> |
| ok |
| after 3000 -> |
| etap:bail("Timeout unpausing consumer") |
| end. |
| |
| |
| wait_finished(_Consumer) -> |
| receive |
| {consumer_finished, Rows, LastSeq} -> |
| {Rows, LastSeq} |
| after 30000 -> |
| etap:bail("Timeout waiting for consumer to finish") |
| end. |
| |
| |
| spawn_consumer(DbName, ChangesArgs0, Req) -> |
| Parent = self(), |
| spawn(fun() -> |
| put(heartbeat_count, 0), |
| Callback = fun({change, {Change}, _}, _, Acc) -> |
| Id = couch_util:get_value(<<"id">>, Change), |
| Seq = couch_util:get_value(<<"seq">>, Change), |
| Del = couch_util:get_value(<<"deleted">>, Change, false), |
| [#row{id = Id, seq = Seq, deleted = Del} | Acc]; |
| ({stop, LastSeq}, _, Acc) -> |
| Parent ! {consumer_finished, lists:reverse(Acc), LastSeq}, |
| stop_loop(Parent, Acc); |
| (timeout, _, Acc) -> |
| put(heartbeat_count, get(heartbeat_count) + 1), |
| maybe_pause(Parent, Acc); |
| (_, _, Acc) -> |
| maybe_pause(Parent, Acc) |
| end, |
| {ok, Db} = couch_db:open_int(DbName, []), |
| ChangesArgs = case (ChangesArgs0#changes_args.timeout =:= undefined) |
| andalso (ChangesArgs0#changes_args.heartbeat =:= undefined) of |
| true -> |
| ChangesArgs0#changes_args{timeout = 10, heartbeat = 10}; |
| false -> |
| ChangesArgs0 |
| end, |
| FeedFun = couch_changes:handle_changes(ChangesArgs, Req, Db), |
| try |
| FeedFun({Callback, []}) |
| catch throw:{stop, _} -> |
| ok |
| end, |
| catch couch_db:close(Db) |
| end). |
| |
| |
| maybe_pause(Parent, Acc) -> |
| receive |
| {get_rows, Ref} -> |
| Parent ! {rows, Ref, lists:reverse(Acc)}, |
| maybe_pause(Parent, Acc); |
| {get_heartbeats, Ref} -> |
| Parent ! {hearthbeats, Ref, get(heartbeat_count)}, |
| maybe_pause(Parent, Acc); |
| {reset, Ref} -> |
| Parent ! {ok, Ref}, |
| maybe_pause(Parent, []); |
| {pause, Ref} -> |
| Parent ! {paused, Ref}, |
| pause_loop(Parent, Acc); |
| {stop, Ref} -> |
| Parent ! {ok, Ref}, |
| throw({stop, Acc}) |
| after 0 -> |
| Acc |
| end. |
| |
| |
| pause_loop(Parent, Acc) -> |
| receive |
| {stop, Ref} -> |
| Parent ! {ok, Ref}, |
| throw({stop, Acc}); |
| {reset, Ref} -> |
| Parent ! {ok, Ref}, |
| pause_loop(Parent, []); |
| {continue, Ref} -> |
| Parent ! {ok, Ref}, |
| Acc; |
| {get_rows, Ref} -> |
| Parent ! {rows, Ref, lists:reverse(Acc)}, |
| pause_loop(Parent, Acc) |
| end. |
| |
| |
| stop_loop(Parent, Acc) -> |
| receive |
| {get_rows, Ref} -> |
| Parent ! {rows, Ref, lists:reverse(Acc)}, |
| stop_loop(Parent, Acc); |
| {stop, Ref} -> |
| Parent ! {ok, Ref}, |
| Acc |
| end. |
| |
| |
| create_db(DbName) -> |
| couch_db:create( |
| DbName, |
| [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}, overwrite]). |
| |
| |
| delete_db(Db) -> |
| ok = couch_server:delete( |
| couch_db:name(Db), [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}]). |