| defmodule ReplicationTest do |
| use CouchTestCase |
| |
  @moduledoc """
  Test CouchDB replication behavior
  This is a port of the replication.js suite
  """
| |
| # TODO: Parameterize these |
| @admin_account "adm:pass" |
| @db_pairs_prefixes [ |
| {"remote-to-remote", "http://127.0.0.1:15984/", "http://127.0.0.1:15984/"} |
| ] |
| |
| # This should probably go into `make elixir` like what |
| # happens for JavaScript tests. |
| @moduletag config: [{"replicator", "startup_jitter", "0"}] |
| |
| test "source database not found with host" do |
| name = random_db_name() |
| src_url = "http://127.0.0.1:15984/" <> name <> "_src" |
| tgt_url = "http://127.0.0.1:15984/" <> name <> "_tgt" |
| check_not_found(src_url, tgt_url) |
| end |
| |
| def check_not_found(src, tgt) do |
| body = %{:source => src, :target => tgt} |
| resp = Couch.post("/_replicate", body: body) |
| assert resp.body["error"] == "db_not_found" |
| end |
| |
  # COUCHDB-885: pushing an attachment update for an already-replicated
  # document must not produce conflicts or doc_write_failures on the target.
  test "replicating attachment without conflict - COUCHDB-885" do
    name = random_db_name()
    src_db_name = name <> "_src"
    tgt_db_name = name <> "_tgt"

    create_db(src_db_name)
    create_db(tgt_db_name)
    delete_on_exit([src_db_name, tgt_db_name])

    doc = %{"_id" => "doc1"}
    [doc] = save_docs(src_db_name, [doc])

    # First replication copies the bare document.
    repl_src = "http://127.0.0.1:15984/" <> src_db_name
    repl_tgt = "http://127.0.0.1:15984/" <> tgt_db_name
    result = replicate(repl_src, repl_tgt)
    assert result["ok"]
    assert is_list(result["history"])
    history = Enum.at(result["history"], 0)
    assert history["docs_written"] == 1
    assert history["docs_read"] == 1
    assert history["doc_write_failures"] == 0

    # Add one compressible (text/plain) and one incompressible attachment
    # to the document that already exists on the target.
    doc =
      Map.put(doc, "_attachments", %{
        "hello.txt" => %{
          "content_type" => "text/plain",
          # base64:encode("hello world")
          "data" => "aGVsbG8gd29ybGQ="
        },
        "foo.dat" => %{
          "content_type" => "not/compressible",
          # base64:encode("i am not gziped")
          "data" => "aSBhbSBub3QgZ3ppcGVk"
        }
      })

    [doc] = save_docs(src_db_name, [doc])

    # Second replication pushes only the attachment revision; this is the
    # step that regressed in COUCHDB-885.
    repl_src = "http://127.0.0.1:15984/" <> src_db_name
    repl_tgt = "http://127.0.0.1:15984/" <> tgt_db_name
    result = replicate(repl_src, repl_tgt)

    assert result["ok"]
    assert is_list(result["history"])
    assert length(result["history"]) == 2
    history = Enum.at(result["history"], 0)
    assert history["docs_written"] == 1
    assert history["docs_read"] == 1
    assert history["doc_write_failures"] == 0

    # Fetch the target copy with conflict and encoding info included.
    query = %{
      :conflicts => true,
      :deleted_conflicts => true,
      :attachments => true,
      :att_encoding_info => true
    }

    opts = [headers: [Accept: "application/json"], query: query]
    resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}", opts)
    assert HTTPotion.Response.success?(resp)
    assert is_map(resp.body)
    # The whole point of the ticket: no conflicts may have been created.
    refute Map.has_key?(resp.body, "_conflicts")
    refute Map.has_key?(resp.body, "_deleted_conflicts")

    atts = resp.body["_attachments"]

    # text/plain should have been stored gzip-encoded ...
    assert atts["hello.txt"]["content_type"] == "text/plain"
    assert atts["hello.txt"]["data"] == "aGVsbG8gd29ybGQ="
    assert atts["hello.txt"]["encoding"] == "gzip"

    # ... while the non-compressible content type stays unencoded.
    assert atts["foo.dat"]["content_type"] == "not/compressible"
    assert atts["foo.dat"]["data"] == "aSBhbSBub3QgZ3ppcGVk"
    refute Map.has_key?(atts["foo.dat"], "encoding")
  end
| |
  # Start a continuous replication, cancel it via _replicate, and verify
  # the task disappears; cancelling again must answer 404.
  test "replication cancellation" do
    name = random_db_name()
    src_db_name = name <> "_src"
    tgt_db_name = name <> "_tgt"

    create_db(src_db_name)
    create_db(tgt_db_name)
    delete_on_exit([src_db_name, tgt_db_name])

    save_docs(src_db_name, make_docs(1..6))

    repl_body = %{:continuous => true, :create_target => true}
    repl_src = "http://127.0.0.1:15984/" <> src_db_name
    repl_tgt = "http://127.0.0.1:15984/" <> tgt_db_name
    result = replicate(repl_src, repl_tgt, body: repl_body)

    # Continuous replications answer with a _local_id handle instead of
    # a completed history.
    assert result["ok"]
    assert is_binary(result["_local_id"])
    repl_id = result["_local_id"]

    # Wait up to 3s for the job to appear in _active_tasks.
    task = get_task(repl_id, 3_000)
    assert is_map(task)

    assert task["replication_id"] == repl_id

    repl_body = %{
      "replication_id" => repl_id,
      cancel: true
    }

    result = Couch.post("/_replicate", body: repl_body)
    assert result.status_code == 200

    wait_for_repl_stop(repl_id)

    # Once stopped, the task must no longer be listed (0ms = no waiting).
    assert get_task(repl_id, 0) == nil

    # Cancelling an already-cancelled replication reports 404.
    result = Couch.post("/_replicate", body: repl_body)
    assert result.status_code == 404
  end
| |
  # A non-admin session must not be able to cancel a replication that was
  # started by the (admin) test context; the admin can cancel it afterwards.
  @tag user: [name: "joe", password: "erly", roles: ["erlanger"]]
  test "unauthorized replication cancellation", ctx do
    name = random_db_name()
    src_db_name = name <> "_src"
    tgt_db_name = name <> "_tgt"

    create_db(src_db_name)
    create_db(tgt_db_name)
    delete_on_exit([src_db_name, tgt_db_name])

    save_docs(src_db_name, make_docs(1..6))

    repl_src = "http://127.0.0.1:15984/" <> src_db_name
    repl_tgt = "http://127.0.0.1:15984/" <> tgt_db_name
    repl_body = %{"continuous" => true}
    result = replicate(repl_src, repl_tgt, body: repl_body)

    assert result["ok"]
    assert is_binary(result["_local_id"])
    repl_id = result["_local_id"]

    # Wait up to 5s for the job to show up in _active_tasks.
    task = get_task(repl_id, 5_000)
    assert is_map(task)

    # Log in as the non-admin user created by the @tag above and confirm
    # the session really belongs to "joe".
    sess = Couch.login(ctx[:userinfo])
    resp = Couch.Session.get(sess, "/_session")
    assert resp.body["ok"]
    assert resp.body["userCtx"]["name"] == "joe"

    repl_body = %{
      "replication_id" => repl_id,
      cancel: true
    }

    # The non-admin cancel attempt must be rejected outright.
    resp = Couch.Session.post(sess, "/_replicate", body: repl_body)
    assert resp.status_code == 401
    assert resp.body["error"] == "unauthorized"

    assert Couch.Session.logout(sess).body["ok"]

    # The same request issued with admin credentials succeeds.
    resp = Couch.post("/_replicate", body: repl_body)
    assert resp.status_code == 200
  end
| |
| Enum.each(@db_pairs_prefixes, fn {name, src_prefix, tgt_prefix} -> |
| @src_prefix src_prefix |
| @tgt_prefix tgt_prefix |
| |
| test "simple #{name} replication - #{name}" do |
| run_simple_repl(@src_prefix, @tgt_prefix) |
| end |
| |
| test "replicate with since_seq - #{name}" do |
| run_since_seq_repl(@src_prefix, @tgt_prefix) |
| end |
| |
| test "validate_doc_update failure replications - #{name}" do |
| run_vdu_repl(@src_prefix, @tgt_prefix) |
| end |
| |
| test "create_target filter option - #{name}" do |
| run_create_target_repl(@src_prefix, @tgt_prefix) |
| end |
| |
| test "filtered replications - #{name}" do |
| run_filtered_repl(@src_prefix, @tgt_prefix) |
| end |
| |
| test "replication restarts after filter change - COUCHDB-892 - #{name}" do |
| run_filter_changed_repl(@src_prefix, @tgt_prefix) |
| end |
| |
| test "replication by doc ids - #{name}" do |
| run_by_id_repl(@src_prefix, @tgt_prefix) |
| end |
| |
| test "continuous replication - #{name}" do |
| run_continuous_repl(@src_prefix, @tgt_prefix) |
| end |
| |
| @tag config: [ |
| {"attachments", "compression_level", "8"}, |
| {"attachments", "compressible_types", "text/*"} |
| ] |
| test "compressed attachment replication - #{name}" do |
| run_compressed_att_repl(@src_prefix, @tgt_prefix) |
| end |
| |
| @tag user: [name: "joe", password: "erly", roles: ["erlanger"]] |
| test "non-admin user on target - #{name}", ctx do |
| run_non_admin_target_user_repl(@src_prefix, @tgt_prefix, ctx) |
| end |
| |
| @tag user: [name: "joe", password: "erly", roles: ["erlanger"]] |
| test "non-admin or reader user on source - #{name}", ctx do |
| run_non_admin_or_reader_source_user_repl(@src_prefix, @tgt_prefix, ctx) |
| end |
| end) |
| |
| def run_simple_repl(src_prefix, tgt_prefix) do |
| base_db_name = random_db_name() |
| src_db_name = base_db_name <> "_src" |
| tgt_db_name = base_db_name <> "_tgt" |
| |
| create_db(src_db_name) |
| create_db(tgt_db_name) |
| delete_on_exit([src_db_name, tgt_db_name]) |
| |
| att1_data = get_att1_data() |
| att2_data = get_att2_data() |
| |
| ddoc = %{ |
| "_id" => "_design/foo", |
| "language" => "javascript", |
| "value" => "ddoc" |
| } |
| |
| docs = make_docs(1..20) ++ [ddoc] |
| docs = save_docs(src_db_name, docs) |
| |
| docs = |
| for doc <- docs do |
| if doc["integer"] >= 10 and doc["integer"] < 15 do |
| add_attachment(src_db_name, doc, body: att1_data) |
| else |
| doc |
| end |
| end |
| |
| repl_src = src_prefix <> src_db_name |
| repl_tgt = tgt_prefix <> tgt_db_name |
| result = replicate(repl_src, repl_tgt) |
| assert result["ok"] |
| |
| src_info = |
| retry_until(fn -> |
| src_info = get_db_info(src_db_name) |
| tgt_info = get_db_info(tgt_db_name) |
| |
| assert src_info["doc_count"] == tgt_info["doc_count"] |
| src_info |
| end) |
| |
| assert is_binary(result["session_id"]) |
| assert is_list(result["history"]) |
| assert length(result["history"]) == 1 |
| history = Enum.at(result["history"], 0) |
| assert is_binary(history["start_time"]) |
| assert is_binary(history["end_time"]) |
| assert history["start_last_seq"] == 0 |
| assert history["missing_checked"] == src_info["doc_count"] |
| assert history["missing_found"] == src_info["doc_count"] |
| assert history["docs_read"] == src_info["doc_count"] |
| assert history["docs_written"] == src_info["doc_count"] |
| assert history["doc_write_failures"] == 0 |
| |
| for doc <- docs do |
| copy = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}").body |
| assert cmp_json(doc, copy) |
| |
| if doc["integer"] >= 10 and doc["integer"] < 15 do |
| atts = copy["_attachments"] |
| assert is_map(atts) |
| att = atts["readme.txt"] |
| assert is_map(att) |
| assert att["revpos"] == 2 |
| assert String.match?(att["content_type"], ~r/text\/plain/) |
| assert att["stub"] |
| |
| resp = Couch.get!("/#{tgt_db_name}/#{copy["_id"]}/readme.txt") |
| assert String.length(resp.body) == String.length(att1_data) |
| assert resp.body == att1_data |
| end |
| end |
| |
| # Add one more doc to source and more attachments to existing docs |
| new_doc = %{"_id" => "foo666", "value" => "d"} |
| [new_doc] = save_docs(src_db_name, [new_doc]) |
| |
| docs = |
| for doc <- docs do |
| if doc["integer"] >= 10 and doc["integer"] < 15 do |
| ctype = "application/binary" |
| opts = [name: "data.dat", body: att2_data, content_type: ctype] |
| add_attachment(src_db_name, doc, opts) |
| else |
| doc |
| end |
| end |
| |
| result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) |
| assert result["ok"] |
| |
| retry_until(fn -> |
| src_info = get_db_info(src_db_name) |
| tgt_info = get_db_info(tgt_db_name) |
| |
| assert tgt_info["doc_count"] == src_info["doc_count"] |
| end) |
| |
| assert is_binary(result["session_id"]) |
| assert is_list(result["history"]) |
| assert length(result["history"]) == 2 |
| history = Enum.at(result["history"], 0) |
| assert history["session_id"] == result["session_id"] |
| assert is_binary(history["start_time"]) |
| assert is_binary(history["end_time"]) |
| assert history["missing_checked"] == 6 |
| assert history["missing_found"] == 6 |
| assert history["docs_read"] == 6 |
| assert history["docs_written"] == 6 |
| assert history["doc_write_failures"] == 0 |
| |
| copy = Couch.get!("/#{tgt_db_name}/#{new_doc["_id"]}").body |
| assert copy["_id"] == new_doc["_id"] |
| assert copy["value"] == new_doc["value"] |
| |
| for i <- 10..14 do |
| doc = Enum.at(docs, i - 1) |
| copy = Couch.get!("/#{tgt_db_name}/#{i}").body |
| assert cmp_json(doc, copy) |
| |
| atts = copy["_attachments"] |
| assert is_map(atts) |
| att = atts["readme.txt"] |
| assert is_map(atts) |
| assert att["revpos"] == 2 |
| assert String.match?(att["content_type"], ~r/text\/plain/) |
| assert att["stub"] |
| |
| resp = Couch.get!("/#{tgt_db_name}/#{i}/readme.txt") |
| assert String.length(resp.body) == String.length(att1_data) |
| assert resp.body == att1_data |
| |
| att = atts["data.dat"] |
| assert is_map(att) |
| assert att["revpos"] == 3 |
| assert String.match?(att["content_type"], ~r/application\/binary/) |
| assert att["stub"] |
| |
| resp = Couch.get!("/#{tgt_db_name}/#{i}/data.dat") |
| assert String.length(resp.body) == String.length(att2_data) |
| assert resp.body == att2_data |
| end |
| |
| # Test deletion is replicated |
| del_doc = %{ |
| "_id" => "1", |
| "_rev" => Enum.at(docs, 0)["_rev"], |
| "_deleted" => true |
| } |
| |
| [del_doc] = save_docs(src_db_name, [del_doc]) |
| |
| result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) |
| assert result["ok"] |
| |
| retry_until(fn -> |
| src_info = get_db_info(src_db_name) |
| tgt_info = get_db_info(tgt_db_name) |
| |
| assert tgt_info["doc_count"] == src_info["doc_count"] |
| assert tgt_info["doc_del_count"] == src_info["doc_del_count"] |
| assert tgt_info["doc_del_count"] == 1 |
| end) |
| |
| assert is_list(result["history"]) |
| assert length(result["history"]) == 3 |
| history = Enum.at(result["history"], 0) |
| assert history["missing_checked"] == 1 |
| assert history["missing_found"] == 1 |
| assert history["docs_read"] == 1 |
| assert history["docs_written"] == 1 |
| assert history["doc_write_failures"] == 0 |
| |
| resp = Couch.get("/#{tgt_db_name}/#{del_doc["_id"]}") |
| assert resp.status_code == 404 |
| |
| resp = Couch.get!("/#{tgt_db_name}/_changes") |
| [change] = Enum.filter(resp.body["results"], &(&1["id"] == del_doc["_id"])) |
| assert change["id"] == del_doc["_id"] |
| assert change["deleted"] |
| |
| # Test replicating a conflict |
| doc = Couch.get!("/#{src_db_name}/2").body |
| [doc] = save_docs(src_db_name, [Map.put(doc, :value, "white")]) |
| |
| copy = Couch.get!("/#{tgt_db_name}/2").body |
| save_docs(tgt_db_name, [Map.put(copy, :value, "black")]) |
| |
| result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) |
| assert result["ok"] |
| |
| src_info = get_db_info(src_db_name) |
| tgt_info = get_db_info(tgt_db_name) |
| |
| assert tgt_info["doc_count"] == src_info["doc_count"] |
| |
| assert is_list(result["history"]) |
| assert length(result["history"]) == 4 |
| history = Enum.at(result["history"], 0) |
| assert history["missing_checked"] == 1 |
| assert history["missing_found"] == 1 |
| assert history["docs_read"] == 1 |
| assert history["docs_written"] == 1 |
| assert history["doc_write_failures"] == 0 |
| |
| copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body |
| assert String.match?(copy["_rev"], ~r/^2-/) |
| assert is_list(copy["_conflicts"]) |
| assert length(copy["_conflicts"]) == 1 |
| conflict = Enum.at(copy["_conflicts"], 0) |
| assert String.match?(conflict, ~r/^2-/) |
| |
| # Re-replicate updated conflict |
| [doc] = save_docs(src_db_name, [Map.put(doc, :value, "yellow")]) |
| |
| result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) |
| assert result["ok"] |
| |
| src_info = get_db_info(src_db_name) |
| tgt_info = get_db_info(tgt_db_name) |
| |
| assert tgt_info["doc_count"] == src_info["doc_count"] |
| |
| assert is_list(result["history"]) |
| assert length(result["history"]) == 5 |
| history = Enum.at(result["history"], 0) |
| assert history["missing_checked"] == 1 |
| assert history["missing_found"] == 1 |
| assert history["docs_read"] == 1 |
| assert history["docs_written"] == 1 |
| assert history["doc_write_failures"] == 0 |
| |
| copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body |
| assert String.match?(copy["_rev"], ~r/^3-/) |
| assert is_list(copy["_conflicts"]) |
| assert length(copy["_conflicts"]) == 1 |
| conflict = Enum.at(copy["_conflicts"], 0) |
| assert String.match?(conflict, ~r/^2-/) |
| |
| # Resolve the conflict and re-replicate new revision |
| resolve_doc = %{"_id" => "2", "_rev" => conflict, "_deleted" => true} |
| save_docs(tgt_db_name, [resolve_doc]) |
| save_docs(src_db_name, [Map.put(doc, :value, "rainbow")]) |
| |
| result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) |
| assert result["ok"] |
| |
| src_info = get_db_info(src_db_name) |
| tgt_info = get_db_info(tgt_db_name) |
| |
| assert tgt_info["doc_count"] == src_info["doc_count"] |
| |
| assert is_list(result["history"]) |
| assert length(result["history"]) == 6 |
| history = Enum.at(result["history"], 0) |
| assert history["missing_checked"] == 1 |
| assert history["missing_found"] == 1 |
| assert history["docs_read"] == 1 |
| assert history["docs_written"] == 1 |
| assert history["doc_write_failures"] == 0 |
| |
| copy = Couch.get!("/#{tgt_db_name}/2", query: %{:conflicts => true}).body |
| |
| assert String.match?(copy["_rev"], ~r/^4-/) |
| assert not Map.has_key?(copy, "_conflicts") |
| |
| # Test that existing revisions are not replicated |
| src_docs = [ |
| %{"_id" => "foo1", "value" => 111}, |
| %{"_id" => "foo2", "value" => 222}, |
| %{"_id" => "foo3", "value" => 333} |
| ] |
| |
| save_docs(src_db_name, src_docs) |
| save_docs(tgt_db_name, Enum.filter(src_docs, &(&1["_id"] != "foo2"))) |
| |
| result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) |
| assert result["ok"] |
| |
| src_info = get_db_info(src_db_name) |
| tgt_info = get_db_info(tgt_db_name) |
| |
| assert tgt_info["doc_count"] == src_info["doc_count"] |
| |
| assert is_list(result["history"]) |
| assert length(result["history"]) == 7 |
| history = Enum.at(result["history"], 0) |
| assert history["missing_checked"] == 3 |
| assert history["missing_found"] == 1 |
| assert history["docs_read"] == 1 |
| assert history["docs_written"] == 1 |
| assert history["doc_write_failures"] == 0 |
| |
| docs = [ |
| %{"_id" => "foo4", "value" => 444}, |
| %{"_id" => "foo5", "value" => 555} |
| ] |
| |
| save_docs(src_db_name, docs) |
| save_docs(tgt_db_name, docs) |
| |
| result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) |
| assert result["ok"] |
| |
| src_info = get_db_info(src_db_name) |
| tgt_info = get_db_info(tgt_db_name) |
| |
| assert tgt_info["doc_count"] == src_info["doc_count"] |
| |
| assert is_list(result["history"]) |
| assert length(result["history"]) == 8 |
| history = Enum.at(result["history"], 0) |
| assert history["missing_checked"] == 2 |
| assert history["missing_found"] == 0 |
| assert history["docs_read"] == 0 |
| assert history["docs_written"] == 0 |
| assert history["doc_write_failures"] == 0 |
| |
| # Test nothing to replicate |
| result = replicate(src_prefix <> src_db_name, tgt_prefix <> tgt_db_name) |
| assert result["ok"] |
| assert result["no_changes"] |
| end |
| |
  # Replicate starting from a since_seq taken from the middle of the
  # source changes feed; only documents changed after that sequence may
  # appear on the target.
  def run_since_seq_repl(src_prefix, tgt_prefix) do
    base_db_name = random_db_name()
    src_db_name = base_db_name <> "_src"
    tgt_db_name = base_db_name <> "_tgt"
    repl_src = src_prefix <> src_db_name
    repl_tgt = tgt_prefix <> tgt_db_name

    create_db(src_db_name)
    create_db(tgt_db_name)
    delete_on_exit([src_db_name, tgt_db_name])

    docs = make_docs(1..5)
    docs = save_docs(src_db_name, docs)

    # Use the third change's seq as the checkpoint; the remaining two
    # changes are what the replication should pick up.
    changes = get_db_changes(src_db_name)["results"]
    since_seq = Enum.at(changes, 2)["seq"]

    # TODO: In JS we re-fetch _changes with since_seq, is that
    # really necessary?
    expected_ids =
      for change <- Enum.drop(changes, 3) do
        change["id"]
      end

    assert length(expected_ids) == 2

    # Clear any leftover job before and after the one-shot replication.
    cancel_replication(repl_src, repl_tgt)
    result = replicate(repl_src, repl_tgt, body: %{:since_seq => since_seq})
    cancel_replication(repl_src, repl_tgt)

    assert result["ok"]
    assert is_list(result["history"])
    history = Enum.at(result["history"], 0)
    assert history["missing_checked"] == 2
    assert history["missing_found"] == 2
    assert history["docs_read"] == 2
    assert history["docs_written"] == 2
    assert history["doc_write_failures"] == 0

    # Docs changed at or before since_seq must be absent from the target.
    Enum.each(docs, fn doc ->
      result = Couch.get("/#{tgt_db_name}/#{doc["_id"]}")

      if Enum.member?(expected_ids, doc["_id"]) do
        assert result.status_code < 300
        assert cmp_json(doc, result.body)
      else
        assert result.status_code == 404
      end
    end)
  end
| |
  # A validate_doc_update function on the TARGET rejects odd-numbered
  # docs; rejected writes must show up as doc_write_failures, not errors.
  def run_vdu_repl(src_prefix, tgt_prefix) do
    base_db_name = random_db_name()
    src_db_name = base_db_name <> "_src"
    tgt_db_name = base_db_name <> "_tgt"
    repl_src = src_prefix <> src_db_name
    repl_tgt = tgt_prefix <> tgt_db_name

    create_db(src_db_name)
    create_db(tgt_db_name)
    delete_on_exit([src_db_name, tgt_db_name])

    docs = make_docs(1..7)

    # Give one passing doc (integer 2) an attachment so the VDU path is
    # also exercised for attachment-carrying writes.
    docs =
      for doc <- docs do
        if doc["integer"] == 2 do
          Map.put(doc, "_attachments", %{
            "hello.txt" => %{
              :content_type => "text/plain",
              # base64:encode("hello world")
              :data => "aGVsbG8gd29ybGQ="
            }
          })
        else
          doc
        end
      end

    docs = save_docs(src_db_name, docs)

    ddoc = %{
      "_id" => "_design/test",
      "language" => "javascript",
      "validate_doc_update" => """
      function(newDoc, oldDoc, userCtx, secObj) {
        if((newDoc.integer % 2) !== 0) {
          throw {forbidden: "I only like multiples of 2."};
        }
      }
      """
    }

    # The VDU ddoc goes to the target so it filters incoming writes.
    [_] = save_docs(tgt_db_name, [ddoc])

    result = replicate(repl_src, repl_tgt)
    assert result["ok"]

    assert is_list(result["history"])
    history = Enum.at(result["history"], 0)
    # All 7 docs are read, but only 2/4/6 pass the VDU; 1/3/5/7 fail.
    assert history["missing_checked"] == 7
    assert history["missing_found"] == 7
    assert history["docs_read"] == 7
    assert history["docs_written"] == 3
    assert history["doc_write_failures"] == 4

    for doc <- docs do
      result = Couch.get("/#{tgt_db_name}/#{doc["_id"]}")

      if rem(doc["integer"], 2) == 0 do
        assert result.status_code < 300
        assert result.body["integer"] == doc["integer"]
      else
        assert result.status_code == 404
      end
    end
  end
| |
  # With create_target the replication itself creates the target database
  # and it must end up with the same shard layout (compared via the shard
  # count derived from update_seq) as the source.
  def run_create_target_repl(src_prefix, tgt_prefix) do
    base_db_name = random_db_name()
    src_db_name = base_db_name <> "_src"
    tgt_db_name = base_db_name <> "_tgt"
    repl_src = src_prefix <> src_db_name
    repl_tgt = tgt_prefix <> tgt_db_name

    create_db(src_db_name)
    delete_on_exit([src_db_name, tgt_db_name])
    # tgt_db_name is created by the replication

    docs = make_docs(1..2)
    save_docs(src_db_name, docs)

    replicate(repl_src, repl_tgt, body: %{:create_target => true})

    retry_until(fn ->
      src_info = get_db_info(src_db_name)
      tgt_info = get_db_info(tgt_db_name)

      assert tgt_info["doc_count"] == src_info["doc_count"]

      src_shards = seq_to_shards(src_info["update_seq"])
      tgt_shards = seq_to_shards(tgt_info["update_seq"])
      assert tgt_shards == src_shards
    end)
  end
| |
  # Replicate through a JS changes filter (even integers OR string == "7")
  # with query_params, then verify both an initial and an incremental run
  # copy exactly the passing documents.
  def run_filtered_repl(src_prefix, tgt_prefix) do
    base_db_name = random_db_name()
    src_db_name = base_db_name <> "_src"
    tgt_db_name = base_db_name <> "_tgt"
    repl_src = src_prefix <> src_db_name
    repl_tgt = tgt_prefix <> tgt_db_name

    create_db(src_db_name)
    create_db(tgt_db_name)
    delete_on_exit([src_db_name, tgt_db_name])

    docs = make_docs(1..30)

    ddoc = %{
      "_id" => "_design/mydesign",
      "language" => "javascript",
      "filters" => %{
        "myfilter" => """
        function(doc, req) {
          var modulus = Number(req.query.modulus);
          var special = req.query.special;
          return (doc.integer % modulus === 0) || (doc.string === special);
        }
        """
      }
    }

    # Save the ddoc alongside the docs; keep only the plain docs for the
    # per-document assertions below.
    [_ | docs] = save_docs(src_db_name, [ddoc | docs])

    repl_body = %{
      "filter" => "mydesign/myfilter",
      "query_params" => %{
        "modulus" => "2",
        "special" => "7"
      }
    }

    result = replicate(repl_src, repl_tgt, body: repl_body)
    assert result["ok"]

    Enum.each(docs, fn doc ->
      resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")

      if rem(doc["integer"], 2) == 0 || doc["string"] == "7" do
        assert resp.status_code < 300
        assert cmp_json(doc, resp.body)
      else
        assert resp.status_code == 404
      end
    end)

    assert is_list(result["history"])
    assert length(result["history"]) == 1
    history = Enum.at(result["history"], 0)

    # NOTE: no update-sequence assertions here on purpose. We
    # (incorrectly) don't record update sequences for changes that don't
    # pass the filter, and in a cluster the sequence is hash-based and a
    # value can even appear twice (for n>1), so comparing against a fixed
    # seq such as 29/30 is a lottery. Historically the last passing doc
    # was at seq 30; a later sort added to avoid _bulk_docs conflicts from
    # duplicate IDs moved it to doc "8" at seq 29. A future fix would be
    # to checkpoint the database's own update sequence (BigCouch has
    # related work for the clustered case, where few passing docs force
    # rescanning much of the database). Instead we verified above that
    # excluded docs did NOT make it across in any way.

    # 16 => 15 docs with even integer field + 1 doc with string field "7"
    assert history["missing_checked"] == 16
    assert history["missing_found"] == 16
    assert history["docs_read"] == 16
    assert history["docs_written"] == 16
    assert history["doc_write_failures"] == 0

    # Incremental run: docs 50..55 add three passing docs (50, 52, 54).
    new_docs = make_docs(50..55)
    new_docs = save_docs(src_db_name, new_docs)

    result = replicate(repl_src, repl_tgt, body: repl_body)
    assert result["ok"]

    Enum.each(new_docs, fn doc ->
      resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")

      if rem(doc["integer"], 2) == 0 do
        assert resp.status_code < 300
        assert cmp_json(doc, resp.body)
      else
        assert resp.status_code == 404
      end
    end)

    assert is_list(result["history"])
    assert length(result["history"]) == 2
    history = Enum.at(result["history"], 0)

    assert history["missing_checked"] == 3
    assert history["missing_found"] == 3
    assert history["docs_read"] == 3
    assert history["docs_written"] == 3
    assert history["doc_write_failures"] == 0
  end
| |
  # COUCHDB-892: changing a filter function's code must restart the
  # replication from scratch (new replication id), so docs previously
  # excluded by the old filter get replicated by the new one.
  def run_filter_changed_repl(src_prefix, tgt_prefix) do
    base_db_name = random_db_name()
    src_db_name = base_db_name <> "_src"
    tgt_db_name = base_db_name <> "_tgt"
    repl_src = src_prefix <> src_db_name
    repl_tgt = tgt_prefix <> tgt_db_name

    create_db(src_db_name)
    create_db(tgt_db_name)
    delete_on_exit([src_db_name, tgt_db_name])

    # First filter: pass only docs with value < maxvalue (query param).
    filter_fun_1 = """
    function(doc, req) {
      if(doc.value < Number(req.query.maxvalue)) {
        return true;
      } else {
        return false;
      }
    }
    """

    # Second filter: pass everything.
    filter_fun_2 = """
    function(doc, req) {
      return true;
    }
    """

    docs = [
      %{"_id" => "foo1", "value" => 1},
      %{"_id" => "foo2", "value" => 2},
      %{"_id" => "foo3", :value => 3},
      %{"_id" => "foo4", :value => 4}
    ]

    ddoc = %{
      "_id" => "_design/mydesign",
      :language => "javascript",
      :filters => %{
        :myfilter => filter_fun_1
      }
    }

    # Keep the saved ddoc (with its _rev) so it can be updated later.
    [ddoc | _] = save_docs(src_db_name, [ddoc | docs])

    repl_body = %{
      :filter => "mydesign/myfilter",
      :query_params => %{
        :maxvalue => "3"
      }
    }

    result = replicate(repl_src, repl_tgt, body: repl_body)
    assert result["ok"]

    # With maxvalue=3, only foo1 and foo2 pass the filter.
    assert is_list(result["history"])
    assert length(result["history"]) == 1
    history = Enum.at(result["history"], 0)
    assert history["docs_read"] == 2
    assert history["docs_written"] == 2
    assert history["doc_write_failures"] == 0

    resp = Couch.get!("/#{tgt_db_name}/foo1")
    assert HTTPotion.Response.success?(resp)
    assert resp.body["value"] == 1

    resp = Couch.get!("/#{tgt_db_name}/foo2")
    assert HTTPotion.Response.success?(resp)
    assert resp.body["value"] == 2

    resp = Couch.get!("/#{tgt_db_name}/foo3")
    assert resp.status_code == 404

    resp = Couch.get!("/#{tgt_db_name}/foo4")
    assert resp.status_code == 404

    # Replication should start from scratch after the filter's code changed
    ddoc = Map.put(ddoc, :filters, %{:myfilter => filter_fun_2})
    [_] = save_docs(src_db_name, [ddoc])

    result = replicate(repl_src, repl_tgt, body: repl_body)
    assert result["ok"]

    # A fresh history (length 1) shows a new replication id was used; the
    # pass-all filter now copies foo3, foo4 and the updated ddoc.
    assert is_list(result["history"])
    assert length(result["history"]) == 1
    history = Enum.at(result["history"], 0)
    assert history["docs_read"] == 3
    assert history["docs_written"] == 3
    assert history["doc_write_failures"] == 0

    resp = Couch.get!("/#{tgt_db_name}/foo1")
    assert HTTPotion.Response.success?(resp)
    assert resp.body["value"] == 1

    resp = Couch.get!("/#{tgt_db_name}/foo2")
    assert HTTPotion.Response.success?(resp)
    assert resp.body["value"] == 2

    resp = Couch.get!("/#{tgt_db_name}/foo3")
    assert HTTPotion.Response.success?(resp)
    assert resp.body["value"] == 3

    resp = Couch.get!("/#{tgt_db_name}/foo4")
    assert HTTPotion.Response.success?(resp)
    assert resp.body["value"] == 4

    resp = Couch.get!("/#{tgt_db_name}/_design/mydesign")
    assert HTTPotion.Response.success?(resp)
  end
| |
| def run_by_id_repl(src_prefix, tgt_prefix) do |
| target_doc_ids = [ |
| %{ |
| :initial => ["1", "2", "10"], |
| :after => [], |
| :conflict_id => "2" |
| }, |
| %{ |
| :initial => ["1", "2"], |
| :after => ["7"], |
| :conflict_id => "1" |
| }, |
| %{ |
| :initial => ["1", "foo_666", "10"], |
| :after => ["7"], |
| :conflict_id => "10" |
| }, |
| %{ |
| :initial => ["_design/foo", "8"], |
| :after => ["foo_5"], |
| :conflict_id => "8" |
| }, |
| %{ |
| :initial => ["_design%2Ffoo", "8"], |
| :after => ["foo_5"], |
| :conflict_id => "8" |
| }, |
| %{ |
| :initial => [], |
| :after => ["foo_1000", "_design/foo", "1"], |
| :conflict_id => "1" |
| } |
| ] |
| |
| Enum.each(target_doc_ids, fn test_data -> |
| run_by_id_repl_impl(src_prefix, tgt_prefix, test_data) |
| end) |
| end |
| |
| def run_by_id_repl_impl(src_prefix, tgt_prefix, test_data) do |
| base_db_name = random_db_name() |
| src_db_name = base_db_name <> "_src" |
| tgt_db_name = base_db_name <> "_tgt" |
| repl_src = src_prefix <> src_db_name |
| repl_tgt = tgt_prefix <> tgt_db_name |
| |
| retry_until(fn -> |
| create_db(src_db_name) |
| create_db(tgt_db_name) |
| end) |
| |
| delete_on_exit([src_db_name, tgt_db_name]) |
| |
| docs = make_docs(1..10) |
| |
| ddoc = %{ |
| "_id" => "_design/foo", |
| :language => "javascript", |
| "integer" => 1 |
| } |
| |
| doc_ids = test_data[:initial] |
| |
| num_missing = |
| Enum.count(doc_ids, fn doc_id -> |
| String.starts_with?(doc_id, "foo_") |
| end) |
| |
| total_replicated = length(doc_ids) - num_missing |
| |
| [_ | docs] = save_docs(src_db_name, [ddoc | docs]) |
| |
| repl_body = %{:doc_ids => doc_ids} |
| result = replicate(repl_src, repl_tgt, body: repl_body) |
| assert result["ok"] |
| |
| if total_replicated == 0 do |
| assert result["no_changes"] |
| else |
| assert is_binary(result["start_time"]) |
| assert is_binary(result["end_time"]) |
| assert result["docs_read"] == total_replicated |
| assert result["docs_written"] == total_replicated |
| assert result["doc_write_failures"] == 0 |
| end |
| |
| Enum.each(doc_ids, fn doc_id -> |
| doc_id = URI.decode(doc_id) |
| orig = Couch.get!("/#{src_db_name}/#{doc_id}") |
| copy = Couch.get!("/#{tgt_db_name}/#{doc_id}") |
| |
| if String.starts_with?(doc_id, "foo_") do |
| assert orig.status_code == 404 |
| assert copy.status_code == 404 |
| else |
| assert HTTPotion.Response.success?(orig) |
| assert HTTPotion.Response.success?(copy) |
| assert cmp_json(orig.body, copy.body) |
| end |
| end) |
| |
| # Be absolutely sure that other docs were not replicated |
| Enum.each(docs, fn doc -> |
| encoded_id = URI.encode_www_form(doc["_id"]) |
| copy = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") |
| is_doc_id = &Enum.member?(doc_ids, &1) |
| |
| if is_doc_id.(doc["_id"]) or is_doc_id.(encoded_id) do |
| assert HTTPotion.Response.success?(copy) |
| else |
| assert copy.status_code == 404 |
| end |
| end) |
| |
| retry_until(fn -> |
| tgt_info = get_db_info(tgt_db_name) |
| assert tgt_info["doc_count"] == total_replicated |
| end) |
| |
| doc_ids_after = test_data[:after] |
| |
| num_missing_after = |
| Enum.count(doc_ids_after, fn doc_id -> |
| String.starts_with?(doc_id, "foo_") |
| end) |
| |
| repl_body = %{:doc_ids => doc_ids_after} |
| result = replicate(repl_src, repl_tgt, body: repl_body) |
| assert result["ok"] |
| |
| total_replicated_after = length(doc_ids_after) - num_missing_after |
| |
| if total_replicated_after == 0 do |
| assert result["no_changes"] |
| else |
| assert is_binary(result["start_time"]) |
| assert is_binary(result["end_time"]) |
| assert result["docs_read"] == total_replicated_after |
| assert result["docs_written"] == total_replicated_after |
| assert result["doc_write_failures"] == 0 |
| end |
| |
| Enum.each(doc_ids_after, fn doc_id -> |
| orig = Couch.get!("/#{src_db_name}/#{doc_id}") |
| copy = Couch.get!("/#{tgt_db_name}/#{doc_id}") |
| |
| if String.starts_with?(doc_id, "foo_") do |
| assert orig.status_code == 404 |
| assert copy.status_code == 404 |
| else |
| assert HTTPotion.Response.success?(orig) |
| assert HTTPotion.Response.success?(copy) |
| assert cmp_json(orig.body, copy.body) |
| end |
| end) |
| |
| # Be absolutely sure that other docs were not replicated |
| all_doc_ids = doc_ids ++ doc_ids_after |
| |
| Enum.each(docs, fn doc -> |
| encoded_id = URI.encode_www_form(doc["_id"]) |
| copy = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}") |
| is_doc_id = &Enum.member?(all_doc_ids, &1) |
| |
| if is_doc_id.(doc["_id"]) or is_doc_id.(encoded_id) do |
| assert HTTPotion.Response.success?(copy) |
| else |
| assert copy.status_code == 404 |
| end |
| end) |
| |
| retry_until(fn -> |
| tgt_info = get_db_info(tgt_db_name) |
| |
| assert tgt_info["doc_count"] == total_replicated + total_replicated_after, |
| "#{inspect(test_data)}" |
| end) |
| |
| # Update a source document and re-replicate (no conflict introduced) |
| conflict_id = test_data[:conflict_id] |
| doc = Couch.get!("/#{src_db_name}/#{conflict_id}").body |
| assert is_map(doc) |
| doc = Map.put(doc, "integer", 666) |
| [doc] = save_docs(src_db_name, [doc]) |
| |
| att1 = [ |
| name: "readme.txt", |
| body: get_att1_data(), |
| content_type: "text/plain" |
| ] |
| |
| att2 = [ |
| name: "data.dat", |
| body: get_att2_data(), |
| content_type: "application/binary" |
| ] |
| |
| doc = add_attachment(src_db_name, doc, att1) |
| doc = add_attachment(src_db_name, doc, att2) |
| |
| repl_body = %{:doc_ids => [conflict_id]} |
| result = replicate(repl_src, repl_tgt, body: repl_body) |
| assert result["ok"] |
| |
| assert result["docs_read"] == 1 |
| assert result["docs_written"] == 1 |
| assert result["doc_write_failures"] == 0 |
| |
| query = %{"conflicts" => "true"} |
| copy = Couch.get!("/#{tgt_db_name}/#{conflict_id}", query: query) |
| assert HTTPotion.Response.success?(copy) |
| assert copy.body["integer"] == 666 |
| assert String.starts_with?(copy.body["_rev"], "4-") |
| assert not Map.has_key?(doc, "_conflicts") |
| |
| atts = copy.body["_attachments"] |
| assert is_map(atts) |
| assert is_map(atts["readme.txt"]) |
| assert atts["readme.txt"]["revpos"] == 3 |
| assert String.match?(atts["readme.txt"]["content_type"], ~r/text\/plain/) |
| assert atts["readme.txt"]["stub"] |
| |
| att1_data = Couch.get!("/#{tgt_db_name}/#{conflict_id}/readme.txt").body |
| assert String.length(att1_data) == String.length(att1[:body]) |
| assert att1_data == att1[:body] |
| |
| assert is_map(atts["data.dat"]) |
| assert atts["data.dat"]["revpos"] == 4 |
| ct_re = ~r/application\/binary/ |
| assert String.match?(atts["data.dat"]["content_type"], ct_re) |
| assert atts["data.dat"]["stub"] |
| |
| att2_data = Couch.get!("/#{tgt_db_name}/#{conflict_id}/data.dat").body |
| assert String.length(att2_data) == String.length(att2[:body]) |
| assert att2_data == att2[:body] |
| |
| # Generate a conflict using replication by doc ids |
| orig = Couch.get!("/#{src_db_name}/#{conflict_id}").body |
| orig = Map.update!(orig, "integer", &(&1 + 100)) |
| [_] = save_docs(src_db_name, [orig]) |
| |
| copy = Couch.get!("/#{tgt_db_name}/#{conflict_id}").body |
| copy = Map.update!(copy, "integer", &(&1 + 1)) |
| [_] = save_docs(tgt_db_name, [copy]) |
| |
| result = replicate(repl_src, repl_tgt, body: repl_body) |
| assert result["ok"] |
| assert result["docs_read"] == 1 |
| assert result["docs_written"] == 1 |
| assert result["doc_write_failures"] == 0 |
| |
| retry_until(fn -> |
| copy = Couch.get!("/#{tgt_db_name}/#{conflict_id}", query: query).body |
| assert String.match?(copy["_rev"], ~r/^5-/) |
| assert is_list(copy["_conflicts"]) |
| assert length(copy["_conflicts"]) == 1 |
| conflict_rev = Enum.at(copy["_conflicts"], 0) |
| assert String.match?(conflict_rev, ~r/^5-/) |
| end) |
| end |
| |
# Drives a continuous (live) replication end-to-end: initial docs and
# attachments must appear on the target, then incremental changes (new
# attachments, new docs, deletions) must each be picked up while the
# replication task keeps running, and a cancelled replication must stop
# propagating writes.
#
# FIX: the second verification loop matched on `N` (uppercase), which
# Elixir parses as the alias atom `:"Elixir.N"`; that pattern never
# matches an integer (or nil), so every assertion in the clause was dead
# code. It now uses lowercase `n`, mirroring the earlier loop.
def run_continuous_repl(src_prefix, tgt_prefix) do
  base_db_name = random_db_name()
  src_db_name = base_db_name <> "_src"
  tgt_db_name = base_db_name <> "_tgt"
  repl_src = src_prefix <> src_db_name
  repl_tgt = tgt_prefix <> tgt_db_name

  create_db(src_db_name)
  create_db(tgt_db_name)
  delete_on_exit([src_db_name, tgt_db_name])

  ddoc = %{
    "_id" => "_design/mydesign",
    "language" => "javascript",
    "filters" => %{
      "myfilter" => "function(doc, req) { return true; }"
    }
  }

  docs = make_docs(1..25)
  # The ddoc is appended last on purpose: List.last/1 further down relies
  # on it being the final element.
  docs = save_docs(src_db_name, docs ++ [ddoc])

  att1_data = get_att1_data()

  docs =
    for doc <- docs do
      if doc["integer"] >= 10 and doc["integer"] < 15 do
        add_attachment(src_db_name, doc)
      else
        doc
      end
    end

  repl_body = %{:continuous => true}
  result = replicate(repl_src, repl_tgt, body: repl_body)

  assert result["ok"]
  assert is_binary(result["_local_id"])

  repl_id = result["_local_id"]
  task = get_task(repl_id, 30_000)
  assert is_map(task), "Error waiting for replication to start"

  # 25 docs + 1 design doc
  wait_for_repl(src_db_name, repl_id, 26)

  Enum.each(docs, fn doc ->
    resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")
    assert resp.status_code < 300
    assert cmp_json(doc, resp.body)

    if doc["integer"] >= 10 and doc["integer"] < 15 do
      atts = resp.body["_attachments"]
      assert is_map(atts)
      att = atts["readme.txt"]
      assert is_map(att)
      assert att["revpos"] == 2
      assert String.match?(att["content_type"], ~r/text\/plain/)
      assert att["stub"]

      resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/readme.txt")
      assert String.length(resp.body) == String.length("some text")
      assert resp.body == "some text"
    end
  end)

  src_info = get_db_info(src_db_name)
  tgt_info = get_db_info(tgt_db_name)

  assert tgt_info["doc_count"] == src_info["doc_count"]

  # Add attachments to more source docs
  docs =
    for doc <- docs do
      is_ddoc = String.starts_with?(doc["_id"], "_design/")

      case doc["integer"] do
        n when n >= 10 and n < 15 ->
          ctype = "application/binary"
          opts = [name: "data.dat", body: att1_data, content_type: ctype]
          add_attachment(src_db_name, doc, opts)

        _ when is_ddoc ->
          add_attachment(src_db_name, doc)

        _ ->
          doc
      end
    end

  wait_for_repl(src_db_name, repl_id, 32)

  Enum.each(docs, fn doc ->
    is_ddoc = String.starts_with?(doc["_id"], "_design/")

    case doc["integer"] do
      # Was `N` (an alias atom, never matched) — see function header.
      n when (n >= 10 and n < 15) or is_ddoc ->
        resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")
        atts = resp.body["_attachments"]
        assert is_map(atts)
        att = atts["readme.txt"]
        assert is_map(att)
        assert att["revpos"] == 2
        assert String.match?(att["content_type"], ~r/text\/plain/)
        assert att["stub"]

        resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/readme.txt")
        assert String.length(resp.body) == String.length("some text")
        assert resp.body == "some text"

        if not is_ddoc do
          att = atts["data.dat"]
          assert is_map(att)
          assert att["revpos"] == 3
          assert String.match?(att["content_type"], ~r/application\/binary/)
          assert att["stub"]

          resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}/data.dat")
          assert String.length(resp.body) == String.length(att1_data)
          assert resp.body == att1_data
        end

      _ ->
        :ok
    end
  end)

  src_info = get_db_info(src_db_name)
  tgt_info = get_db_info(tgt_db_name)

  assert tgt_info["doc_count"] == src_info["doc_count"]

  # Attach to the design doc as well (it was saved last, so List.last/1).
  ddoc = List.last(docs)
  ctype = "application/binary"
  opts = [name: "data.dat", body: att1_data, content_type: ctype]
  add_attachment(src_db_name, ddoc, opts)

  wait_for_repl(src_db_name, repl_id, 33)

  resp = Couch.get("/#{tgt_db_name}/#{ddoc["_id"]}")
  atts = resp.body["_attachments"]
  assert is_map(atts)
  att = atts["readme.txt"]
  assert is_map(att)
  assert att["revpos"] == 2
  assert String.match?(att["content_type"], ~r/text\/plain/)
  assert att["stub"]

  resp = Couch.get!("/#{tgt_db_name}/#{ddoc["_id"]}/readme.txt")
  assert String.length(resp.body) == String.length("some text")
  assert resp.body == "some text"

  att = atts["data.dat"]
  assert is_map(att)
  assert att["revpos"] == 3
  assert String.match?(att["content_type"], ~r/application\/binary/)
  assert att["stub"]

  resp = Couch.get!("/#{tgt_db_name}/#{ddoc["_id"]}/data.dat")
  assert String.length(resp.body) == String.length(att1_data)
  assert resp.body == att1_data

  src_info = get_db_info(src_db_name)
  tgt_info = get_db_info(tgt_db_name)

  assert tgt_info["doc_count"] == src_info["doc_count"]

  # Check creating new normal documents
  new_docs = make_docs(26..35)
  new_docs = save_docs(src_db_name, new_docs)

  wait_for_repl(src_db_name, repl_id, 43)

  Enum.each(new_docs, fn doc ->
    resp = Couch.get!("/#{tgt_db_name}/#{doc["_id"]}")
    assert resp.status_code < 300
    assert cmp_json(doc, resp.body)
  end)

  src_info = get_db_info(src_db_name)
  tgt_info = get_db_info(tgt_db_name)

  assert tgt_info["doc_count"] == src_info["doc_count"]

  # Delete docs from the source

  doc1 = Enum.at(new_docs, 0)
  query = %{:rev => doc1["_rev"]}
  Couch.delete!("/#{src_db_name}/#{doc1["_id"]}", query: query)

  doc2 = Enum.at(new_docs, 6)
  query = %{:rev => doc2["_rev"]}
  Couch.delete!("/#{src_db_name}/#{doc2["_id"]}", query: query)

  wait_for_repl(src_db_name, repl_id, 45)

  resp = Couch.get("/#{tgt_db_name}/#{doc1["_id"]}")
  assert resp.status_code == 404
  resp = Couch.get("/#{tgt_db_name}/#{doc2["_id"]}")
  assert resp.status_code == 404

  changes = get_db_changes(tgt_db_name, %{:since => tgt_info["update_seq"]})
  # quite unfortunately, there is no way of relying on ordering in a cluster
  # but we can assume a length of 2
  changes =
    for change <- changes["results"] do
      {change["id"], change["deleted"]}
    end

  assert Enum.sort(changes) == [{doc1["_id"], true}, {doc2["_id"], true}]

  # Cancel the replication
  repl_body = %{:continuous => true, :cancel => true}
  resp = replicate(repl_src, repl_tgt, body: repl_body)
  assert resp["ok"]
  assert resp["_local_id"] == repl_id

  # A write made after cancellation must never reach the target.
  doc = %{"_id" => "foobar", "value" => 666}
  [doc] = save_docs(src_db_name, [doc])

  wait_for_repl_stop(repl_id, 30_000)

  resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}")
  assert resp.status_code == 404
end
| |
# Replicates a document carrying a large (>128 KiB) attachment after
# server-side attachment compression has been disabled, then asserts the
# target copy still reports a gzip encoding whose encoded_length is
# smaller than the raw length — i.e. the attachment arrives in its
# already-encoded form rather than being re-encoded.
def run_compressed_att_repl(src_prefix, tgt_prefix) do
  base_db_name = random_db_name()
  src_db_name = base_db_name <> "_src"
  tgt_db_name = base_db_name <> "_tgt"
  repl_src = src_prefix <> src_db_name
  repl_tgt = tgt_prefix <> tgt_db_name

  create_db(src_db_name)
  create_db(tgt_db_name)
  delete_on_exit([src_db_name, tgt_db_name])

  doc = %{"_id" => "foobar"}
  [doc] = save_docs(src_db_name, [doc])

  att1_data = get_att1_data()
  # Enough repetitions of the fixture text to cross the 128 KiB mark.
  num_copies = 1 + round(128 * 1024 / String.length(att1_data))

  big_att =
    List.foldl(Enum.to_list(1..num_copies), "", fn _, acc ->
      acc <> att1_data
    end)

  doc = add_attachment(src_db_name, doc, body: big_att)

  # Disable attachment compression
  set_config_raw("attachments", "compression_level", "0")

  # A single one-shot replication of the lone document.
  result = replicate(repl_src, repl_tgt)
  assert result["ok"]
  assert is_list(result["history"])
  assert length(result["history"]) == 1
  history = Enum.at(result["history"], 0)
  assert history["missing_checked"] == 1
  assert history["missing_found"] == 1
  assert history["docs_read"] == 1
  assert history["docs_written"] == 1
  assert history["doc_write_failures"] == 0

  # Random token in the query string to defeat any caching of this read.
  token = Enum.random(1..1_000_000)
  query = %{att_encoding_info: true, bypass_cache: token}
  resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}", query: query)
  assert resp.status_code < 300
  assert is_map(resp.body["_attachments"])
  att = resp.body["_attachments"]["readme.txt"]
  # The target must still advertise the gzip encoding even though
  # compression was switched off before the replication ran.
  assert att["encoding"] == "gzip"
  assert is_integer(att["length"])
  assert is_integer(att["encoded_length"])
  assert att["encoded_length"] < att["length"]
end
| |
# Replication into a target database whose _security names someone else
# as admin: the logged-in user ("joe") can write normal documents but the
# design-doc write must fail, producing exactly one doc_write_failure.
def run_non_admin_target_user_repl(src_prefix, tgt_prefix, ctx) do
  base_db_name = random_db_name()
  src_db_name = base_db_name <> "_src"
  tgt_db_name = base_db_name <> "_tgt"
  repl_src = src_prefix <> src_db_name
  repl_tgt = tgt_prefix <> tgt_db_name

  create_db(src_db_name)
  create_db(tgt_db_name)
  delete_on_exit([src_db_name, tgt_db_name])

  # Only "superman" / role "god" are target admins — not our test user.
  set_security(tgt_db_name, %{
    :admins => %{
      :names => ["superman"],
      :roles => ["god"]
    }
  })

  docs = make_docs(1..6)
  ddoc = %{"_id" => "_design/foo", "language" => "javascript"}
  docs = save_docs(src_db_name, [ddoc | docs])

  # Authenticate as the non-admin user supplied by the test context.
  sess = Couch.login(ctx[:userinfo])
  resp = Couch.Session.get(sess, "/_session")
  assert resp.body["ok"]
  assert resp.body["userCtx"]["name"] == "joe"

  opts = [
    userinfo: ctx[:userinfo],
    headers: [cookie: sess.cookie]
  ]

  result = replicate(repl_src, repl_tgt, opts)

  assert Couch.Session.logout(sess).body["ok"]

  assert result["ok"]
  history = Enum.at(result["history"], 0)
  assert history["docs_read"] == length(docs)
  # ddoc write failed
  assert history["docs_written"] == length(docs) - 1
  # ddoc write failed
  assert history["doc_write_failures"] == 1

  # The design doc must be absent on the target; all others copied over.
  Enum.each(docs, fn doc ->
    resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}")

    if String.starts_with?(doc["_id"], "_design/") do
      assert resp.status_code == 404
    else
      assert HTTPotion.Response.success?(resp)
      assert cmp_json(doc, resp.body)
    end
  end)
end
| |
# Replication attempted by a user who is neither an admin nor a listed
# reader of the secured database: replicate/3 asserts on HTTP success, so
# the rejected request surfaces as an ExUnit.AssertionError, and nothing
# may appear on the target afterwards.
# NOTE(review): despite "source user" in the name, the _security object
# below is applied to the *target* database — confirm intent against the
# original JS suite.
def run_non_admin_or_reader_source_user_repl(src_prefix, tgt_prefix, ctx) do
  base_db_name = random_db_name()
  src_db_name = base_db_name <> "_src"
  tgt_db_name = base_db_name <> "_tgt"
  repl_src = src_prefix <> src_db_name
  repl_tgt = tgt_prefix <> tgt_db_name

  create_db(src_db_name)
  create_db(tgt_db_name)
  delete_on_exit([src_db_name, tgt_db_name])

  # Membership restricted to other users/roles; "joe" is not in either.
  set_security(tgt_db_name, %{
    :admins => %{
      :names => ["superman"],
      :roles => ["god"]
    },
    :readers => %{
      :names => ["john"],
      :roles => ["secret"]
    }
  })

  docs = make_docs(1..6)
  ddoc = %{"_id" => "_design/foo", "language" => "javascript"}
  docs = save_docs(src_db_name, [ddoc | docs])

  sess = Couch.login(ctx[:userinfo])
  resp = Couch.Session.get(sess, "/_session")
  assert resp.body["ok"]
  assert resp.body["userCtx"]["name"] == "joe"

  opts = [
    userinfo: ctx[:userinfo],
    headers: [cookie: sess.cookie]
  ]

  # replicate/3 asserts a successful response, so a rejected replication
  # manifests as an assertion error here.
  assert_raise(ExUnit.AssertionError, fn ->
    replicate(repl_src, repl_tgt, opts)
  end)

  assert Couch.Session.logout(sess).body["ok"]

  # Nothing at all may have been written to the target.
  Enum.each(docs, fn doc ->
    resp = Couch.get("/#{tgt_db_name}/#{doc["_id"]}")
    assert resp.status_code == 404
  end)
end
| |
# Fetches the database info object (GET /db), asserting the request
# succeeded, and returns the parsed body.
def get_db_info(db_name) do
  response = Couch.get("/#{db_name}")
  assert HTTPotion.Response.success?(response)
  response.body
end
| |
# Triggers a replication via POST /_replicate and returns the parsed
# response body, asserting the request succeeded.
#
# Options: `:userinfo` ("user:pass", defaults to the admin account) is
# embedded into src/tgt URLs; remaining options are merged over defaults
# (empty headers, empty body map, 30s timeout) and `source`/`target` are
# injected into the body.
def replicate(src, tgt, options \\ []) do
  {userinfo, options} = Keyword.pop(options, :userinfo)
  userinfo = if is_nil(userinfo), do: @admin_account, else: userinfo

  opts =
    [headers: [], body: %{}, timeout: 30_000]
    |> Keyword.merge(options)
    |> Map.new()

  body =
    Enum.into(
      [source: set_user(src, userinfo), target: set_user(tgt, userinfo)],
      opts.body
    )

  resp = Couch.post("/_replicate", Map.to_list(%{opts | body: body}))
  assert HTTPotion.Response.success?(resp), "#{inspect(resp)}"
  resp.body
end
| |
# Best-effort cancellation of a replication between src and tgt; a
# failing /_replicate response (raised as an assertion error inside
# replicate/3) is swallowed and mapped to :ok.
def cancel_replication(src, tgt) do
  try do
    replicate(src, tgt, body: %{:cancel => true})
  rescue
    ExUnit.AssertionError -> :ok
  end
end
| |
# Reads the database's _changes feed with the given query parameters and
# returns the parsed body, asserting the request succeeded.
def get_db_changes(db_name, query \\ %{}) do
  response = Couch.get("/#{db_name}/_changes", query: query)
  failure_msg = "#{inspect(response)} #{inspect(query)}"
  assert HTTPotion.Response.success?(response), failure_msg
  response.body
end
| |
# Bulk-saves `docs` (with write quorum w=3), asserts every individual
# write succeeded, and returns the docs with their new "_rev" filled in.
def save_docs(db_name, docs) do
  resp =
    Couch.post("/#{db_name}/_bulk_docs", query: %{w: 3}, body: %{docs: docs})

  assert HTTPotion.Response.success?(resp)

  docs
  |> Enum.zip(resp.body)
  |> Enum.map(fn {doc, result} ->
    assert result["ok"], "Error saving doc: #{doc["_id"]}"
    Map.put(doc, "_rev", result["rev"])
  end)
end
| |
# Writes the database's _security object (JSON-encoded via :jiffy) and
# asserts the server acknowledged it.
def set_security(db_name, sec_props) do
  response = Couch.put("/#{db_name}/_security", body: :jiffy.encode(sec_props))
  assert HTTPotion.Response.success?(response)
  assert response.body["ok"]
end
| |
# PUTs an attachment onto `doc` (default: "readme.txt" containing
# "some text"), retrying until the write succeeds, and returns the doc
# with its updated "_rev".
def add_attachment(db_name, doc, att \\ []) do
  att =
    [name: "readme.txt", body: "some text", content_type: "text/plain"]
    |> Keyword.merge(att)
    |> Map.new()

  uri = "/#{db_name}/#{URI.encode(doc["_id"])}/#{att.name}"
  headers = ["Content-Type": att.content_type]

  # Include the current revision when the doc already has one.
  params = if doc["_rev"], do: %{:w => 3, :rev => doc["_rev"]}, else: %{:w => 3}

  retry_until(fn ->
    resp = Couch.put(uri, headers: headers, query: params, body: att.body)
    assert HTTPotion.Response.success?(resp)
    Map.put(doc, "_rev", resp.body["rev"])
  end)
end
| |
# Entry point: wait for the replication identified by `repl_id` to have
# checked `expect_revs_checked` revisions, with a 30s overall budget.
def wait_for_repl(src_db_name, repl_id, expect_revs_checked) do
  wait_for_repl(src_db_name, repl_id, expect_revs_checked, 30_000)
end

# Budget exhausted: fail the test instead of polling forever.
def wait_for_repl(_, _, _, wait_left) when wait_left <= 0 do
  assert false, "Timeout waiting for replication"
end
| |
# Polls every 500ms until the source changes feed past the task's
# through_seq is empty AND the task reports at least
# `expect_revs_checked` revisions checked. Returns the task map from the
# final, satisfied poll.
#
# FIX: the original recursed but discarded the recursive result and fell
# through to return the stale pre-sleep `task`; the recursion's value is
# now returned. Callers currently ignore the return value, so this is
# backward compatible.
def wait_for_repl(src_db_name, repl_id, expect_revs_checked, wait_left) do
  task = get_task(repl_id, 0)
  through_seq = task["through_seq"] || "0"
  # NOTE(review): may be nil when the task lacks "revisions_checked"; in
  # Erlang term order `nil < integer` is false, so a nil value stops the
  # wait as soon as the changes feed is drained — behavior preserved.
  revs_checked = task["revisions_checked"]
  changes = get_db_changes(src_db_name, %{:since => through_seq})

  if changes["results"] != [] or revs_checked < expect_revs_checked do
    :timer.sleep(500)
    wait_for_repl(src_db_name, repl_id, expect_revs_checked, wait_left - 500)
  else
    task
  end
end
| |
# Entry point: wait up to 30s for the replication task to disappear from
# /_active_tasks.
def wait_for_repl_stop(repl_id) do
  wait_for_repl_stop(repl_id, 30_000)
end

# Budget exhausted: the task is still listed — fail the test.
def wait_for_repl_stop(repl_id, wait_left) when wait_left <= 0 do
  assert false, "Timeout waiting for replication task to stop: #{repl_id}"
end
| |
# Polls every 500ms while the task is still listed; the timeout clause
# above fails the test once the budget is spent.
def wait_for_repl_stop(repl_id, wait_left) do
  case get_task(repl_id, 0) do
    task when is_map(task) ->
      :timer.sleep(500)
      wait_for_repl_stop(repl_id, wait_left - 500)

    _ ->
      nil
  end
end
| |
# Returns the database's current last_seq by asking _changes for
# everything since "now" (an empty feed that still reports last_seq).
def get_last_seq(db_name) do
  db_name
  |> get_db_changes(%{:since => "now"})
  |> Map.get("last_seq")
end
| |
# Looks up the active replication task for `repl_id`, retrying every
# 500ms until `delay` milliseconds are spent; returns the task map or
# nil if it never appeared.
def get_task(repl_id, delay) when delay <= 0 do
  try_get_task(repl_id)
end

def get_task(repl_id, delay) do
  found = try_get_task(repl_id)

  if is_map(found) do
    found
  else
    :timer.sleep(500)
    get_task(repl_id, delay - 500)
  end
end
| |
# Single scan of /_active_tasks for the task whose replication_id
# matches; returns the task map or nil.
def try_get_task(repl_id) do
  resp = Couch.get("/_active_tasks")
  assert HTTPotion.Response.success?(resp)
  assert is_list(resp.body)

  Enum.find(resp.body, &(&1["replication_id"] == repl_id))
end
| |
# Embeds `userinfo` ("user:pass") into an absolute URI that does not
# already carry credentials; relative db names and URIs with existing
# credentials are returned untouched.
def set_user(uri, userinfo) do
  parsed = URI.parse(uri)

  cond do
    is_nil(parsed.scheme) -> uri
    is_nil(parsed.userinfo) -> URI.to_string(%{parsed | userinfo: userinfo})
    true -> uri
  end
end
| |
# Fixture payloads used as attachment bodies, read from the data files
# shipped next to this test.
def get_att1_data do
  "data/lorem.txt" |> Path.expand(__DIR__) |> File.read!()
end

def get_att2_data do
  "data/lorem_b64.txt" |> Path.expand(__DIR__) |> File.read!()
end
| |
# Structural subset comparison: every key of `lhs` must exist in `rhs`
# with a recursively equal value (extra keys in `rhs` are ignored).
# Logs the first mismatching pair and returns false; non-map values fall
# through to plain ==.
def cmp_json(lhs, rhs) when is_map(lhs) and is_map(rhs) do
  Enum.all?(lhs, fn {key, lhs_val} ->
    equal? = Map.has_key?(rhs, key) and cmp_json(lhs_val, rhs[key])

    if not equal? do
      Logger.error("#{inspect(lhs)} != #{inspect(rhs)}")
    end

    equal?
  end)
end

def cmp_json(lhs, rhs), do: lhs == rhs
| |
# Expands an opaque clustered update sequence into {range, update_seq}
# pairs, one per shard.
def seq_to_shards(seq) do
  seq
  |> decode_seq()
  |> Enum.map(fn {_node, range, update_seq} -> {range, update_seq} end)
end

# Strips the leading "N-" counter from a clustered sequence string and
# decodes the remaining base64url payload back into its Erlang term.
# NOTE(review): :erlang.binary_to_term/1 without [:safe] is acceptable
# here only because the input comes from our own test server.
def decode_seq(seq) do
  seq
  |> String.replace(~r/\d+-/, "", global: false)
  |> Base.url_decode64!(padding: false)
  |> :erlang.binary_to_term()
end
| |
# Registers an ExUnit on_exit callback that deletes each database in
# `db_names` once the test finishes.
def delete_on_exit(db_names) when is_list(db_names) do
  on_exit(fn ->
    Enum.each(db_names, &delete_db/1)
  end)
end
| end |