| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(mem3_rep). |
| |
| -export([ |
| go/2, |
| go/3, |
| make_local_id/2, |
| make_local_id/3, |
| make_purge_id/2, |
| verify_purge_checkpoint/2, |
| find_source_seq/4, |
| find_split_target_seq/4, |
| local_id_hash/1 |
| ]). |
| |
| -export([ |
| changes_enumerator/2 |
| ]). |
| |
| -include_lib("mem3/include/mem3.hrl"). |
| -include_lib("couch/include/couch_db.hrl"). |
| |
| -record(acc, { |
| batch_size, |
| batch_count, |
| seq = 0, |
| revcount = 0, |
| source, |
| targets, |
| filter, |
| db, |
| hashfun, |
| incomplete_ranges |
| }). |
| |
| -record(tgt, { |
| shard, |
| seq = 0, |
| infos = [], |
| localid, |
| purgeid, |
| history = {[]}, |
| remaining = 0 |
| }). |
| |
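| % Replicate a source shard to its target(s). Opts may contain {batch_size, N}, |
| % {batch_count, N} (or the atom all) and {filter, Fun}. A rough usage sketch |
| % (the node name is illustrative): |
| % |
| %     mem3_rep:go(<<"shards/00000000-7fffffff/db.1551893552">>, |
| %         'node2@127.0.0.1', [{batch_size, 500}, {batch_count, all}]) |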
| go(Source, Target) -> |
| go(Source, Target, []). |
| |
| go(DbName, Node, Opts) when is_binary(DbName), is_atom(Node) -> |
| go(#shard{name = DbName, node = node()}, #shard{name = DbName, node = Node}, Opts); |
| go(#shard{} = Source0, #shard{} = Target0, Opts) -> |
| Source = add_range(Source0), |
| Target = add_range(Target0), |
| case mem3:db_is_current(Source) of |
| true -> |
| go(Source, targets_map(Source, Target), Opts); |
| false -> |
| % Database could have been recreated |
| {error, missing_source} |
| end; |
| go(#shard{} = Source, #{} = Targets0, Opts) when map_size(Targets0) > 0 -> |
| Targets = maps:map(fun(_, T) -> #tgt{shard = T} end, Targets0), |
| case couch_server:exists(Source#shard.name) of |
| true -> |
| sync_security(Source, Targets), |
| BatchSize = |
| case proplists:get_value(batch_size, Opts) of |
| BS when is_integer(BS), BS > 0 -> BS; |
| _ -> 100 |
| end, |
| BatchCount = |
| case proplists:get_value(batch_count, Opts) of |
| all -> all; |
| BC when is_integer(BC), BC > 0 -> BC; |
| _ -> 1 |
| end, |
| IncompleteRanges = config:get_boolean( |
| "mem3", |
| "incomplete_ranges", |
| false |
| ), |
| Filter = proplists:get_value(filter, Opts), |
| Acc = #acc{ |
| batch_size = BatchSize, |
| batch_count = BatchCount, |
| source = Source, |
| targets = Targets, |
| filter = Filter, |
| incomplete_ranges = IncompleteRanges |
| }, |
| go(Acc); |
| false -> |
| {error, missing_source} |
| end. |
| |
| go(#acc{source = Source, batch_count = BC} = Acc) -> |
| case couch_db:open(Source#shard.name, [?ADMIN_CTX]) of |
| {ok, Db} -> |
| Resp = |
| try |
| HashFun = mem3_hash:get_hash_fun(couch_db:name(Db)), |
| repl(Acc#acc{db = Db, hashfun = HashFun}) |
| catch |
| error:{error, missing_source} -> |
| {error, missing_source}; |
| error:{not_found, no_db_file} -> |
| {error, missing_target} |
| after |
| couch_db:close(Db) |
| end, |
| case Resp of |
| {ok, P} when P > 0, BC == all -> |
| go(Acc); |
| {ok, P} when P > 0, BC > 1 -> |
| go(Acc#acc{batch_count = BC - 1}); |
| Else -> |
| Else |
| end; |
| {not_found, no_db_file} -> |
| {error, missing_source} |
| end. |
| |
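| % Checkpoint doc ids have the shape (hashes are base64url-encoded md5 of the |
| % source/target terms, typically the node names): |
| %   _local/shard-sync-<SrcHash>-<TgtHash> |
| % with an extra "-<FilterHash>" suffix when a filter fun is in use, or the |
| % given binary appended verbatim when the filter is already a binary. |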
| make_local_id(Source, Target) -> |
| make_local_id(Source, Target, undefined). |
| |
| make_local_id(#shard{node = SourceNode}, #shard{node = TargetNode}, Filter) -> |
| make_local_id(SourceNode, TargetNode, Filter); |
| make_local_id(SourceThing, TargetThing, F) when is_binary(F) -> |
| S = local_id_hash(SourceThing), |
| T = local_id_hash(TargetThing), |
| <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>; |
| make_local_id(SourceThing, TargetThing, Filter) -> |
| S = local_id_hash(SourceThing), |
| T = local_id_hash(TargetThing), |
| F = filter_hash(Filter), |
| <<"_local/shard-sync-", S/binary, "-", T/binary, F/binary>>. |
| |
| filter_hash(Filter) when is_function(Filter) -> |
| {new_uniq, Hash} = erlang:fun_info(Filter, new_uniq), |
| B = couch_util:encodeBase64Url(Hash), |
| <<"-", B/binary>>; |
| filter_hash(_) -> |
| <<>>. |
| |
| local_id_hash(Thing) -> |
| couch_util:encodeBase64Url(couch_hash:md5_hash(term_to_binary(Thing))). |
| |
| make_purge_id(SourceUUID, TargetUUID) -> |
| <<"_local/purge-mem3-", SourceUUID/binary, "-", TargetUUID/binary>>. |
| |
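| % Returns true only while both the checkpoint's source and target nodes still |
| % host a shard copy of the checkpoint's range for DbName; any error (including |
| % a deleted database) yields false. |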
| verify_purge_checkpoint(DbName, Props) -> |
| try |
| Type = couch_util:get_value(<<"type">>, Props), |
| if |
| Type =/= <<"internal_replication">> -> |
| false; |
| true -> |
| SourceBin = couch_util:get_value(<<"source">>, Props), |
| TargetBin = couch_util:get_value(<<"target">>, Props), |
| Range = couch_util:get_value(<<"range">>, Props), |
| |
| Source = binary_to_existing_atom(SourceBin, latin1), |
| Target = binary_to_existing_atom(TargetBin, latin1), |
| |
| try |
| Nodes = lists:foldl( |
| fun(Shard, Acc) -> |
| case Shard#shard.range == Range of |
| true -> [Shard#shard.node | Acc]; |
| false -> Acc |
| end |
| end, |
| [], |
| mem3:shards(DbName) |
| ), |
| lists:member(Source, Nodes) andalso lists:member(Target, Nodes) |
| catch |
| error:database_does_not_exist -> |
| false |
| end |
| end |
| catch |
| _:_ -> |
| false |
| end. |
| |
| %% @doc Find and return the largest update_seq in SourceDb |
| %% that the client has seen from TargetNode. |
| %% |
| %% When reasoning about this function it is very important to |
| %% understand the direction of replication for this comparison. |
| %% We're only interested in internal replications initiated |
| %% by this node to the node being replaced. When doing a |
| %% replacement the most important thing is that the client doesn't |
| %% miss any updates. This means we can only fast-forward as far |
| %% as they've seen updates on this node. We can detect that by |
| %% looking for our push replication history and choosing the |
| %% largest source_seq that has a target_seq =< TgtSeq. |
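| %% |
| %% Illustrative sketch: if our push history for the target records the |
| %% {source_seq, target_seq} pairs {90, 85} and {50, 51}, then for TgtSeq = 84 |
| %% this returns 50, because the {90, 85} entry has target_seq 85 > 84 and is |
| %% skipped. |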
| find_source_seq(SrcDb, TgtNode, TgtUUIDPrefix, TgtSeq) -> |
| case find_repl_doc(SrcDb, TgtUUIDPrefix) of |
| {ok, TgtUUID, Doc} -> |
| SrcNode = atom_to_binary(node(), utf8), |
| find_source_seq_int(Doc, SrcNode, TgtNode, TgtUUID, TgtSeq); |
| {not_found, _} -> |
| couch_log:warning( |
| "~p find_source_seq repl doc not_found " |
| "src_db: ~p, tgt_node: ~p, tgt_uuid_prefix: ~p, tgt_seq: ~p", |
| [?MODULE, SrcDb, TgtNode, TgtUUIDPrefix, TgtSeq] |
| ), |
| 0 |
| end. |
| |
| find_source_seq_int(#doc{body = {Props}}, SrcNode0, TgtNode0, TgtUUID, TgtSeq) -> |
| SrcNode = |
| case is_atom(SrcNode0) of |
| true -> atom_to_binary(SrcNode0, utf8); |
| false -> SrcNode0 |
| end, |
| TgtNode = |
| case is_atom(TgtNode0) of |
| true -> atom_to_binary(TgtNode0, utf8); |
| false -> TgtNode0 |
| end, |
| % This is split off purely for the ability to run unit tests |
| % against this bit of code without requiring all sorts of mocks. |
| {History} = couch_util:get_value(<<"history">>, Props, {[]}), |
| SrcHistory = couch_util:get_value(SrcNode, History, []), |
| UseableHistory = lists:filter( |
| fun({Entry}) -> |
| couch_util:get_value(<<"target_node">>, Entry) =:= TgtNode andalso |
| couch_util:get_value(<<"target_uuid">>, Entry) =:= TgtUUID andalso |
| couch_util:get_value(<<"target_seq">>, Entry) =< TgtSeq |
| end, |
| SrcHistory |
| ), |
| |
| % This relies on SrcHistory being ordered descending by source |
| % sequence. |
| case UseableHistory of |
| [{Entry} | _] -> |
| couch_util:get_value(<<"source_seq">>, Entry); |
| [] -> |
| couch_log:warning( |
| "~p find_source_seq_int nil useable history " |
| "src_node: ~p, tgt_node: ~p, tgt_uuid: ~p, tgt_seq: ~p, " |
| "src_history: ~p", |
| [?MODULE, SrcNode, TgtNode, TgtUUID, TgtSeq, SrcHistory] |
| ), |
| 0 |
| end. |
| |
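| % Given the target db, the old source node and a source uuid prefix, find the |
| % target update sequence to resume from for a given source sequence. |
| % Illustrative example: with checkpointed {source_seq, target_seq} pairs |
| % [{5, 5}, {20, 18}, {40, 39}], SrcSeq = 30 maps to 18 (the pair with the |
| % greatest source seq below 30), while SrcSeq = 3 is at or below the initial |
| % bulk copy checkpoint and maps to 3 unchanged. |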
| find_split_target_seq(TgtDb, SrcNode0, SrcUUIDPrefix, SrcSeq) -> |
| SrcNode = |
| case is_atom(SrcNode0) of |
| true -> atom_to_binary(SrcNode0, utf8); |
| false -> SrcNode0 |
| end, |
| case find_split_target_seq_int(TgtDb, SrcNode, SrcUUIDPrefix) of |
| {ok, [{BulkCopySeq, BulkCopySeq} | _]} when SrcSeq =< BulkCopySeq -> |
| % Check if the source sequence is at or below the initial bulk copy |
| % checkpointed sequence. That sequence, or anything lower, maps |
| % directly to the same value on each target. For extra safety we |
| % assert (via the pattern match above) that the initial source and |
| % target sequences are the same value. |
| SrcSeq; |
| {ok, Seqs = [{_, _} | _]} -> |
| % Pick the target sequence for the greatest source sequence that is |
| % less than `SrcSeq`. |
| case lists:takewhile(fun({Seq, _}) -> Seq < SrcSeq end, Seqs) of |
| [] -> |
| couch_log:warning( |
| "~p find_split_target_seq target seq not found " |
| "tgt_db: ~p, src_uuid_prefix: ~p, src_seq: ~p", |
| [?MODULE, couch_db:name(TgtDb), SrcUUIDPrefix, SrcSeq] |
| ), |
| 0; |
| [{_, _} | _] = Seqs1 -> |
| {_, TSeq} = lists:last(Seqs1), |
| TSeq |
| end; |
| {not_found, _} -> |
| couch_log:warning( |
| "~p find_split_target_seq target seq not found " |
| "tgt_db: ~p, src_uuid_prefix: ~p, src_seq: ~p", |
| [?MODULE, couch_db:name(TgtDb), SrcUUIDPrefix, SrcSeq] |
| ), |
| 0 |
| end. |
| |
| repl(#acc{db = Db0} = Acc0) -> |
| erlang:put(io_priority, {internal_repl, couch_db:name(Db0)}), |
| Acc1 = calculate_start_seq_multi(Acc0), |
| try |
| Acc3 = |
| case config:get_boolean("mem3", "replicate_purges", true) of |
| true -> |
| Acc2 = pull_purges_multi(Acc1), |
| push_purges_multi(Acc2); |
| false -> |
| Acc1 |
| end, |
| push_changes(Acc3) |
| catch |
| throw:{finished, Count} -> |
| {ok, Count} |
| end. |
| |
| pull_purges_multi(#acc{} = Acc0) -> |
| #acc{ |
| source = Source, |
| targets = Targets0, |
| batch_size = Count, |
| seq = UpdateSeq, |
| hashfun = HashFun |
| } = Acc0, |
| with_src_db(Acc0, fun(Db) -> |
| Targets = maps:map( |
| fun(_, #tgt{} = T) -> |
| pull_purges(Db, Count, Source, T, HashFun) |
| end, |
| reset_remaining(Targets0) |
| ), |
| Remaining = maps:fold( |
| fun(_, #tgt{remaining = R}, Sum) -> |
| Sum + R |
| end, |
| 0, |
| Targets |
| ), |
| if |
| Remaining == 0 -> |
| Acc0#acc{targets = Targets}; |
| true -> |
| PurgeSeq = couch_db:get_purge_seq(Db), |
| OldestPurgeSeq = couch_db:get_oldest_purge_seq(Db), |
| PurgesToPush = PurgeSeq - OldestPurgeSeq, |
| Changes = couch_db:count_changes_since(Db, UpdateSeq), |
| Pending = Remaining + PurgesToPush + Changes, |
| throw({finished, Pending}) |
| end |
| end). |
| |
| pull_purges(Db, Count, #shard{} = SrcShard, #tgt{} = Tgt0, HashFun) -> |
| #tgt{shard = TgtShard} = Tgt0, |
| SrcUUID = couch_db:get_uuid(Db), |
| #shard{node = TgtNode, name = TgtDbName} = TgtShard, |
| {LocalPurgeId, Infos, ThroughSeq, Remaining} = |
| mem3_rpc:load_purge_infos(TgtNode, TgtDbName, SrcUUID, Count), |
| Tgt = Tgt0#tgt{purgeid = LocalPurgeId}, |
| if |
| Infos == [] -> |
| ok; |
| true -> |
| % When shard ranges are split it's possible to pull purges from a |
| % larger target range into a smaller source range. We don't want to |
| % pull purges which don't belong on the source, so we filter them |
| % out using the same pickfun that we use when picking documents. |
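| % (e.g. if the source covers [16#00000000, 16#7fffffff] and a purged doc's |
| % id hashes into the upper half of the ring, pickfun/3 won't return the |
| % source range and that purge info is dropped). |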
| #shard{range = SrcRange} = SrcShard, |
| BelongsFun = fun({_UUID, Id, _Revs}) when is_binary(Id) -> |
| case SrcRange of |
| [B, E] when is_integer(B), is_integer(E) -> |
| mem3_reshard_job:pickfun(Id, [SrcRange], HashFun) =:= SrcRange; |
| undefined -> |
| % We may replicate node-local databases |
| % which are not associated with a shard range. In that case |
| % range will be undefined. |
| true |
| end |
| end, |
| Infos1 = lists:filter(BelongsFun, Infos), |
| {ok, _} = couch_db:purge_docs(Db, Infos1, [?REPLICATED_CHANGES]), |
| Body = purge_cp_body(SrcShard, TgtShard, ThroughSeq), |
| mem3_rpc:save_purge_checkpoint(TgtNode, TgtDbName, LocalPurgeId, Body) |
| end, |
| Tgt#tgt{remaining = max(0, Remaining)}. |
| |
| push_purges_multi(#acc{} = Acc) -> |
| #acc{ |
| source = SrcShard, |
| targets = Targets0, |
| batch_size = BatchSize, |
| seq = UpdateSeq, |
| hashfun = HashFun |
| } = Acc, |
| with_src_db(Acc, fun(Db) -> |
| Targets = maps:map( |
| fun(_, #tgt{} = T) -> |
| push_purges(Db, BatchSize, SrcShard, T, HashFun) |
| end, |
| reset_remaining(Targets0) |
| ), |
| Remaining = maps:fold( |
| fun(_, #tgt{remaining = R}, Sum) -> |
| Sum + R |
| end, |
| 0, |
| Targets |
| ), |
| if |
| Remaining == 0 -> |
| Acc#acc{targets = Targets}; |
| true -> |
| Changes = couch_db:count_changes_since(Db, UpdateSeq), |
| throw({finished, Remaining + Changes}) |
| end |
| end). |
| |
| push_purges(Db, BatchSize, SrcShard, Tgt, HashFun) -> |
| #tgt{shard = TgtShard, purgeid = LocalPurgeId} = Tgt, |
| #shard{node = TgtNode, range = TgtRange, name = TgtDbName} = TgtShard, |
| StartSeq = |
| case couch_db:open_doc(Db, LocalPurgeId, []) of |
| {ok, #doc{body = {Props}}} -> |
| couch_util:get_value(<<"purge_seq">>, Props); |
| {not_found, _} -> |
| Oldest = couch_db:get_oldest_purge_seq(Db), |
| erlang:max(0, Oldest - 1) |
| end, |
| BelongsFun = fun(Id) when is_binary(Id) -> |
| case TgtRange of |
| [B, E] when is_integer(B), is_integer(E) -> |
| mem3_reshard_job:pickfun(Id, [TgtRange], HashFun) =:= TgtRange; |
| undefined -> |
| % We may replicate node-local databases which are not associated |
| % with a shard range. In that case range will be undefined. |
| true |
| end |
| end, |
| FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) -> |
| case BelongsFun(Id) of |
| true -> |
| NewCount = Count + length(Revs), |
| NewInfos = [{UUID, Id, Revs} | Infos], |
| Status = |
| if |
| NewCount < BatchSize -> ok; |
| true -> stop |
| end, |
| {Status, {NewCount, NewInfos, PSeq}}; |
| false -> |
| % In case of split shard ranges, purges, like documents, will |
| % belong only to one target |
| {ok, {Count, Infos, PSeq}} |
| end |
| end, |
| InitAcc = {0, [], StartSeq}, |
| {ok, {_, Infos, ThroughSeq}} = |
| couch_db:fold_purge_infos(Db, StartSeq, FoldFun, InitAcc), |
| if |
| Infos == [] -> |
| ok; |
| true -> |
| ok = purge_on_target(TgtNode, TgtDbName, Infos), |
| Body = purge_cp_body(SrcShard, TgtShard, ThroughSeq), |
| Doc = #doc{id = LocalPurgeId, body = Body}, |
| {ok, _} = couch_db:update_doc(Db, Doc, []) |
| end, |
| Tgt#tgt{remaining = max(0, couch_db:get_purge_seq(Db) - ThroughSeq)}. |
| |
| calculate_start_seq_multi(#acc{} = Acc) -> |
| #acc{db = Db, targets = Targets0, filter = Filter} = Acc, |
| FilterHash = filter_hash(Filter), |
| Targets = maps:map( |
| fun(_, #tgt{} = T) -> |
| calculate_start_seq(Db, FilterHash, T) |
| end, |
| Targets0 |
| ), |
| % There will always be at least one target |
| #tgt{seq = Seq0} = hd(maps:values(Targets)), |
| Seq = maps:fold(fun(_, #tgt{seq = S}, M) -> min(S, M) end, Seq0, Targets), |
| Acc#acc{seq = Seq, targets = Targets}. |
| |
| calculate_start_seq(Db, FilterHash, #tgt{shard = TgtShard} = Tgt) -> |
| UUID = couch_db:get_uuid(Db), |
| #shard{node = Node, name = Name} = TgtShard, |
| {NewDocId, Doc} = mem3_rpc:load_checkpoint( |
| Node, |
| Name, |
| node(), |
| UUID, |
| FilterHash |
| ), |
| #doc{id = FoundId, body = {TProps}} = Doc, |
| Tgt1 = Tgt#tgt{localid = NewDocId}, |
| % NewDocId and FoundId may differ the first time this code runs |
| % after switching to the newly named internal replication |
| % checkpoints. We store NewDocId for saving checkpoints, but use |
| % FoundId so we reuse the same docid that the target used. |
| case couch_db:open_doc(Db, FoundId, [ejson_body]) of |
| {ok, #doc{body = {SProps}}} -> |
| SourceSeq = couch_util:get_value(<<"seq">>, SProps, 0), |
| TargetSeq = couch_util:get_value(<<"seq">>, TProps, 0), |
| % We resume from the lower update seq stored in the two |
| % shard copies. We also need to be sure to use the |
| % corresponding history. A difference here could result |
| % from either a write failure on one of the nodes or if |
| % either shard was truncated by an operator. |
| case SourceSeq =< TargetSeq of |
| true -> |
| Seq = SourceSeq, |
| History = couch_util:get_value(<<"history">>, SProps, {[]}); |
| false -> |
| Seq = TargetSeq, |
| History = couch_util:get_value(<<"history">>, TProps, {[]}) |
| end, |
| Tgt1#tgt{seq = Seq, history = History}; |
| {not_found, _} -> |
| compare_epochs(Db, Tgt1) |
| end. |
| |
| push_changes(#acc{} = Acc0) -> |
| #acc{ |
| db = Db0, |
| seq = Seq |
| } = Acc0, |
| |
| % Avoid needlessly rewriting the internal replication |
| % checkpoint document if nothing is replicated. |
| UpdateSeq = couch_db:get_update_seq(Db0), |
| if |
| Seq < UpdateSeq -> ok; |
| true -> throw({finished, 0}) |
| end, |
| |
| with_src_db(Acc0, fun(Db) -> |
| Acc1 = Acc0#acc{db = Db}, |
| Fun = fun ?MODULE:changes_enumerator/2, |
| {ok, Acc2} = couch_db:fold_changes(Db, Seq, Fun, Acc1), |
| {ok, #acc{seq = LastSeq}} = replicate_batch_multi(Acc2), |
| {ok, couch_db:count_changes_since(Db, LastSeq)} |
| end). |
| |
| compare_epochs(Db, #tgt{shard = TgtShard} = Tgt) -> |
| #shard{node = Node, name = Name} = TgtShard, |
| UUID = couch_db:get_uuid(Db), |
| Epochs = couch_db:get_epochs(Db), |
| Seq = mem3_rpc:find_common_seq(Node, Name, UUID, Epochs), |
| Tgt#tgt{seq = Seq, history = {[]}}. |
| |
| changes_enumerator(#doc_info{id = DocId}, #acc{db = Db} = Acc) -> |
| {ok, FDI} = couch_db:get_full_doc_info(Db, DocId), |
| changes_enumerator(FDI, Acc); |
| changes_enumerator(#full_doc_info{} = FDI, #acc{} = Acc0) -> |
| #acc{ |
| revcount = C, |
| targets = Targets0, |
| hashfun = HashFun, |
| incomplete_ranges = IncompleteRanges |
| } = Acc0, |
| #doc_info{high_seq = Seq, revs = Revs} = couch_doc:to_doc_info(FDI), |
| {Count, Targets} = |
| case filter_doc(Acc0#acc.filter, FDI) of |
| keep -> |
| NewTargets = changes_append_fdi( |
| FDI, |
| Targets0, |
| HashFun, |
| IncompleteRanges |
| ), |
| {C + length(Revs), NewTargets}; |
| discard -> |
| {C, Targets0} |
| end, |
| Acc1 = Acc0#acc{seq = Seq, revcount = Count, targets = Targets}, |
| Go = |
| if |
| Count < Acc1#acc.batch_size -> ok; |
| true -> stop |
| end, |
| {Go, Acc1}. |
| |
| changes_append_fdi( |
| #full_doc_info{id = Id} = FDI, |
| Targets, |
| HashFun, |
| IncompleteRanges |
| ) -> |
| case mem3_reshard_job:pickfun(Id, maps:keys(Targets), HashFun) of |
| not_in_range when IncompleteRanges -> |
| Targets; |
| not_in_range when not IncompleteRanges -> |
| ErrMsg = "~p : ~p not in any target ranges: ~p", |
| TShards = [TS || #tgt{shard = TS} <- maps:values(Targets)], |
| TNames = [TN || #shard{name = TN} <- TShards], |
| couch_log:error(ErrMsg, [?MODULE, Id, TNames]), |
| error({error, {Id, not_in_target_ranges}}); |
| Key -> |
| maps:update_with( |
| Key, |
| fun(#tgt{infos = Infos} = T) -> |
| T#tgt{infos = [FDI | Infos]} |
| end, |
| Targets |
| ) |
| end. |
| |
| replicate_batch_multi(#acc{targets = Targets0, seq = Seq, db = Db} = Acc) -> |
| Targets = maps:map( |
| fun(_, #tgt{} = T) -> |
| replicate_batch(T, Db, Seq) |
| end, |
| Targets0 |
| ), |
| {ok, Acc#acc{targets = Targets, revcount = 0}}. |
| |
| replicate_batch(#tgt{shard = TgtShard, infos = Infos} = Target, Db, Seq) -> |
| #shard{node = Node, name = Name} = TgtShard, |
| case find_missing_revs(Target) of |
| [] -> |
| ok; |
| Missing -> |
| lists:map( |
| fun(Chunk) -> |
| Docs = open_docs(Db, Infos, Chunk), |
| ok = save_on_target(Node, Name, Docs) |
| end, |
| chunk_revs(Missing) |
| ) |
| end, |
| update_locals(Target, Db, Seq), |
| Target#tgt{infos = []}. |
| |
| find_missing_revs(#tgt{shard = TgtShard, infos = Infos}) -> |
| #shard{node = Node, name = Name} = TgtShard, |
| IdsRevs = lists:map( |
| fun(FDI) -> |
| #doc_info{id = Id, revs = RevInfos} = couch_doc:to_doc_info(FDI), |
| {Id, [R || #rev_info{rev = R} <- RevInfos]} |
| end, |
| Infos |
| ), |
| Missing = mem3_rpc:get_missing_revs(Node, Name, IdsRevs, [ |
| {io_priority, {internal_repl, Name}}, |
| ?ADMIN_CTX |
| ]), |
| lists:filter( |
| fun |
| ({_Id, [], _Ancestors}) -> false; |
| ({_Id, _Revs, _Ancestors}) -> true |
| end, |
| Missing |
| ). |
| |
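| % Split a list of {Id, Revs, Ancestors} triples into batches holding at most |
| % Limit revs each; an entry with more revs than fit in the current batch is |
| % split across batches, and batches are returned most-recently-built first. |
| % Illustrative example (ids and revs are hypothetical): |
| %   chunk_revs([{<<"d">>, [R1, R2, R3], []}], 2) -> |
| %       [[{<<"d">>, [R3], []}], [{<<"d">>, [R1, R2], []}]] |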
| chunk_revs(Revs) -> |
| Limit = list_to_integer(config:get("mem3", "rev_chunk_size", "5000")), |
| chunk_revs(Revs, Limit). |
| |
| chunk_revs(Revs, Limit) -> |
| chunk_revs(Revs, {0, []}, [], Limit). |
| |
| chunk_revs([], {_Count, Chunk}, Chunks, _Limit) -> |
| [Chunk | Chunks]; |
| chunk_revs([{Id, R, A} | Revs], {Count, Chunk}, Chunks, Limit) when length(R) =< Limit - Count -> |
| chunk_revs( |
| Revs, |
| {Count + length(R), [{Id, R, A} | Chunk]}, |
| Chunks, |
| Limit |
| ); |
| chunk_revs([{Id, R, A} | Revs], {Count, Chunk}, Chunks, Limit) -> |
| {This, Next} = lists:split(Limit - Count, R), |
| chunk_revs( |
| [{Id, Next, A} | Revs], |
| {0, []}, |
| [[{Id, This, A} | Chunk] | Chunks], |
| Limit |
| ). |
| |
| open_docs(Db, Infos, Missing) -> |
| lists:flatmap( |
| fun({Id, Revs, _}) -> |
| FDI = lists:keyfind(Id, #full_doc_info.id, Infos), |
| #full_doc_info{rev_tree = RevTree} = FDI, |
| {FoundRevs, _} = couch_key_tree:get_key_leafs(RevTree, Revs), |
| lists:map( |
| fun({#leaf{deleted = IsDel, ptr = SummaryPtr}, FoundRevPath}) -> |
| couch_db:make_doc(Db, Id, IsDel, SummaryPtr, FoundRevPath) |
| end, |
| FoundRevs |
| ) |
| end, |
| Missing |
| ). |
| |
| save_on_target(Node, Name, Docs) -> |
| mem3_rpc:update_docs(Node, Name, Docs, [ |
| ?REPLICATED_CHANGES, |
| full_commit, |
| ?ADMIN_CTX, |
| {io_priority, {internal_repl, Name}} |
| ]), |
| ok. |
| |
| purge_on_target(Node, Name, PurgeInfos) -> |
| mem3_rpc:purge_docs(Node, Name, PurgeInfos, [ |
| ?REPLICATED_CHANGES, |
| full_commit, |
| ?ADMIN_CTX, |
| {io_priority, {internal_repl, Name}} |
| ]), |
| ok. |
| |
| update_locals(Target, Db, Seq) -> |
| #tgt{shard = TgtShard, localid = Id, history = History} = Target, |
| #shard{node = Node, name = Name} = TgtShard, |
| NewEntry = [ |
| {<<"source_node">>, atom_to_binary(node(), utf8)}, |
| {<<"source_uuid">>, couch_db:get_uuid(Db)}, |
| {<<"source_seq">>, Seq}, |
| {<<"timestamp">>, list_to_binary(mem3_util:iso8601_timestamp())} |
| ], |
| NewBody = mem3_rpc:save_checkpoint(Node, Name, Id, Seq, NewEntry, History), |
| {ok, _} = couch_db:update_doc(Db, #doc{id = Id, body = NewBody}, []). |
| |
| purge_cp_body(#shard{} = Source, #shard{} = Target, PurgeSeq) -> |
| {Mega, Secs, _} = os:timestamp(), |
| NowSecs = Mega * 1000000 + Secs, |
| {[ |
| {<<"type">>, <<"internal_replication">>}, |
| {<<"updated_on">>, NowSecs}, |
| {<<"purge_seq">>, PurgeSeq}, |
| {<<"source">>, atom_to_binary(Source#shard.node, latin1)}, |
| {<<"target">>, atom_to_binary(Target#shard.node, latin1)}, |
| {<<"range">>, Source#shard.range} |
| ]}. |
| |
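| % Find this shard's "_local/shard-sync-..." checkpoint doc whose target_uuid |
| % starts with TgtUUIDPrefix, scanning local docs from the id prefix derived |
| % from our own database uuid. |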
| find_repl_doc(SrcDb, TgtUUIDPrefix) -> |
| SrcUUID = couch_db:get_uuid(SrcDb), |
| S = local_id_hash(SrcUUID), |
| DocIdPrefix = <<"_local/shard-sync-", S/binary, "-">>, |
| FoldFun = fun(#doc{id = DocId, body = {BodyProps}} = Doc, _) -> |
| TgtUUID = couch_util:get_value(<<"target_uuid">>, BodyProps, <<>>), |
| case is_prefix(DocIdPrefix, DocId) of |
| true -> |
| case is_prefix(TgtUUIDPrefix, TgtUUID) of |
| true -> |
| {stop, {TgtUUID, Doc}}; |
| false -> |
| {ok, not_found} |
| end; |
| _ -> |
| {stop, not_found} |
| end |
| end, |
| Options = [{start_key, DocIdPrefix}], |
| case couch_db:fold_local_docs(SrcDb, FoldFun, not_found, Options) of |
| {ok, {TgtUUID, Doc}} -> |
| {ok, TgtUUID, Doc}; |
| {ok, not_found} -> |
| {not_found, missing}; |
| Else -> |
| couch_log:error("Error finding replication doc: ~w", [Else]), |
| {not_found, missing} |
| end. |
| |
| find_split_target_seq_int(TgtDb, Node, SrcUUIDPrefix) -> |
| TgtUUID = couch_db:get_uuid(TgtDb), |
| FoldFun = fun(#doc{body = {Props}}, _) -> |
| DocTgtUUID = couch_util:get_value(<<"target_uuid">>, Props, <<>>), |
| case TgtUUID == DocTgtUUID of |
| true -> |
| {History} = couch_util:get_value(<<"history">>, Props, {[]}), |
| HProps = couch_util:get_value(Node, History, []), |
| case get_target_seqs(HProps, TgtUUID, Node, SrcUUIDPrefix, []) of |
| [] -> |
| % No replication found from source to target |
| {ok, not_found}; |
| [{_, _} | _] = SeqPairs -> |
| % Found shared replicated history from source to target |
| % Return sorted list by the earliest source sequence |
| {stop, lists:sort(SeqPairs)} |
| end; |
| false -> |
| {ok, not_found} |
| end |
| end, |
| Options = [{start_key, <<"_local/shard-sync-">>}], |
| case couch_db:fold_local_docs(TgtDb, FoldFun, not_found, Options) of |
| {ok, Seqs} when is_list(Seqs) -> |
| {ok, Seqs}; |
| {ok, not_found} -> |
| {not_found, missing}; |
| Else -> |
| couch_log:error("Error finding replication doc: ~w", [Else]), |
| {not_found, missing} |
| end. |
| |
| % Get the target sequence for each checkpoint where the source replicated to the |
| % target. The "target" is the current db where the history entry was read from and |
| % "source" is another, now possibly deleted, database. |
| get_target_seqs([], _TgtUUID, _Node, _SrcUUIDPrefix, Acc) -> |
| lists:reverse(Acc); |
| get_target_seqs([{Entry} | HProps], TgtUUID, Node, SrcUUIDPrefix, Acc) -> |
| SameTgt = couch_util:get_value(<<"target_uuid">>, Entry) =:= TgtUUID, |
| SameNode = couch_util:get_value(<<"target_node">>, Entry) =:= Node, |
| SrcUUID = couch_util:get_value(<<"source_uuid">>, Entry), |
| IsPrefix = is_prefix(SrcUUIDPrefix, SrcUUID), |
| Acc1 = |
| case SameTgt andalso SameNode andalso IsPrefix of |
| true -> |
| EntrySourceSeq = couch_util:get_value(<<"source_seq">>, Entry), |
| EntryTargetSeq = couch_util:get_value(<<"target_seq">>, Entry), |
| [{EntrySourceSeq, EntryTargetSeq} | Acc]; |
| false -> |
| Acc |
| end, |
| get_target_seqs(HProps, TgtUUID, Node, SrcUUIDPrefix, Acc1). |
| |
| with_src_db(#acc{source = Source}, Fun) -> |
| case couch_db:open(Source#shard.name, [?ADMIN_CTX]) of |
| {ok, Db} -> |
| try |
| Fun(Db) |
| after |
| couch_db:close(Db) |
| end; |
| {not_found, _} -> |
| error({error, missing_source}) |
| end. |
| |
| is_prefix(Prefix, Subject) -> |
| binary:longest_common_prefix([Prefix, Subject]) == size(Prefix). |
| |
| filter_doc(Filter, FullDocInfo) when is_function(Filter) -> |
| try Filter(FullDocInfo) of |
| discard -> discard; |
| _ -> keep |
| catch |
| _:_ -> |
| keep |
| end; |
| filter_doc(_, _) -> |
| keep. |
| |
| sync_security(#shard{} = Source, #{} = Targets) -> |
| maps:map( |
| fun(_, #tgt{shard = Target}) -> |
| mem3_sync_security:maybe_sync(Source, Target) |
| end, |
| Targets |
| ). |
| |
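| % Build the map of target range -> #shard{} used by a replication job. For |
| % clustered shards we map each of the target node's overlapping ranges to its |
| % shard; for anything else (e.g. node-local dbs) the single passed-in target |
| % covers the full [0, ?RING_END] range. |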
| targets_map( |
| #shard{name = <<"shards/", _/binary>> = SrcName} = Src, |
| #shard{name = <<"shards/", _/binary>>, node = TgtNode} = Tgt |
| ) -> |
| % Parse range from name in case the passed shard is built with a name only |
| SrcRange = mem3:range(SrcName), |
| Shards0 = mem3:shards(mem3:dbname(SrcName)), |
| Shards1 = [S || S <- Shards0, not shard_eq(S, Src)], |
| Shards2 = [S || S <- Shards1, check_overlap(SrcRange, TgtNode, S)], |
| case [{R, S} || #shard{range = R} = S <- Shards2] of |
| [] -> |
| % If the target map is empty, create a target map with just |
| % that one target. This is to support tooling which may be |
| % moving / copying shards using mem3_rep:go/2,3 before the |
| % shards are present in the shard map. |
| #{mem3:range(SrcName) => Tgt}; |
| [_ | _] = TMapList -> |
| maps:from_list(TMapList) |
| end; |
| targets_map(_Src, Tgt) -> |
| #{[0, ?RING_END] => Tgt}. |
| |
| shard_eq(#shard{name = Name, node = Node}, #shard{name = Name, node = Node}) -> |
| true; |
| shard_eq(_, _) -> |
| false. |
| |
| check_overlap(SrcRange, Node, #shard{node = Node, range = TgtRange}) -> |
| mem3_util:range_overlap(SrcRange, TgtRange); |
| check_overlap([_, _], _, #shard{}) -> |
| false. |
| |
| reset_remaining(#{} = Targets) -> |
| maps:map( |
| fun(_, #tgt{} = T) -> |
| T#tgt{remaining = 0} |
| end, |
| Targets |
| ). |
| |
| add_range(#shard{name = DbName} = Shard) when is_binary(DbName) -> |
| case DbName of |
| <<"shards/", _Start:8/binary, "-", _End:8/binary, "/", _/binary>> -> |
| Shard#shard{range = mem3:range(DbName)}; |
| <<_/binary>> -> |
| % We may replicate local dbs which do not have a shard range. |
| Shard |
| end. |
| |
| -ifdef(TEST). |
| |
| -include_lib("couch/include/couch_eunit.hrl"). |
| |
| name_node_to_shard_local_db_test() -> |
| DbName = <<"foo">>, |
| Node = 'n1@bar.net', |
| Shard = add_range(#shard{name = DbName, node = Node}), |
| ?assertMatch(#shard{}, Shard), |
| ?assertEqual(DbName, Shard#shard.name), |
| ?assertEqual(Node, Shard#shard.node), |
| ?assertEqual(undefined, Shard#shard.range). |
| |
| name_node_to_shard_local_shard_test() -> |
| DbName = <<"shards/00000000-7fffffff/db.1687450595">>, |
| Node = 'n2@baz.org', |
| Shard = add_range(#shard{name = DbName, node = Node}), |
| ?assertMatch(#shard{}, Shard), |
| ?assertEqual(DbName, Shard#shard.name), |
| ?assertEqual(Node, Shard#shard.node), |
| ?assertEqual([0, 2147483647], Shard#shard.range). |
| |
| find_source_seq_int_test_() -> |
| { |
| setup, |
| fun() -> meck:expect(couch_log, warning, 2, ok) end, |
| fun(_) -> meck:unload() end, |
| with([ |
| ?TDEF(t_unknown_node), |
| ?TDEF(t_unknown_uuid), |
| ?TDEF(t_ok), |
| ?TDEF(t_old_ok), |
| ?TDEF(t_different_node) |
| ]) |
| }. |
| |
| t_unknown_node(_) -> |
| ?assertEqual( |
| find_source_seq_int(doc_(), <<"foo">>, <<"bing">>, <<"bar_uuid">>, 10), |
| 0 |
| ). |
| |
| t_unknown_uuid(_) -> |
| ?assertEqual( |
| find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"teapot">>, 10), |
| 0 |
| ). |
| |
| t_ok(_) -> |
| ?assertEqual( |
| find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 100), |
| 100 |
| ). |
| |
| t_old_ok(_) -> |
| ?assertEqual( |
| find_source_seq_int(doc_(), <<"foo">>, <<"bar">>, <<"bar_uuid">>, 84), |
| 50 |
| ). |
| |
| t_different_node(_) -> |
| ?assertEqual( |
| find_source_seq_int(doc_(), <<"foo2">>, <<"bar">>, <<"bar_uuid">>, 92), |
| 31 |
| ). |
| |
| -define(SNODE, <<"source_node">>). |
| -define(SUUID, <<"source_uuid">>). |
| -define(SSEQ, <<"source_seq">>). |
| -define(TNODE, <<"target_node">>). |
| -define(TUUID, <<"target_uuid">>). |
| -define(TSEQ, <<"target_seq">>). |
| |
| doc_() -> |
| Foo_Bar = [ |
| {[ |
| {?SNODE, <<"foo">>}, |
| {?SUUID, <<"foo_uuid">>}, |
| {?SSEQ, 100}, |
| {?TNODE, <<"bar">>}, |
| {?TUUID, <<"bar_uuid">>}, |
| {?TSEQ, 100} |
| ]}, |
| {[ |
| {?SNODE, <<"foo">>}, |
| {?SUUID, <<"foo_uuid">>}, |
| {?SSEQ, 90}, |
| {?TNODE, <<"bar">>}, |
| {?TUUID, <<"bar_uuid">>}, |
| {?TSEQ, 85} |
| ]}, |
| {[ |
| {?SNODE, <<"foo">>}, |
| {?SUUID, <<"foo_uuid">>}, |
| {?SSEQ, 50}, |
| {?TNODE, <<"bar">>}, |
| {?TUUID, <<"bar_uuid">>}, |
| {?TSEQ, 51} |
| ]}, |
| {[ |
| {?SNODE, <<"foo">>}, |
| {?SUUID, <<"foo_uuid">>}, |
| {?SSEQ, 40}, |
| {?TNODE, <<"bar">>}, |
| {?TUUID, <<"bar_uuid">>}, |
| {?TSEQ, 45} |
| ]}, |
| {[ |
| {?SNODE, <<"foo">>}, |
| {?SUUID, <<"foo_uuid">>}, |
| {?SSEQ, 2}, |
| {?TNODE, <<"bar">>}, |
| {?TUUID, <<"bar_uuid">>}, |
| {?TSEQ, 2} |
| ]} |
| ], |
| Foo2_Bar = [ |
| {[ |
| {?SNODE, <<"foo2">>}, |
| {?SUUID, <<"foo_uuid">>}, |
| {?SSEQ, 100}, |
| {?TNODE, <<"bar">>}, |
| {?TUUID, <<"bar_uuid">>}, |
| {?TSEQ, 100} |
| ]}, |
| {[ |
| {?SNODE, <<"foo2">>}, |
| {?SUUID, <<"foo_uuid">>}, |
| {?SSEQ, 92}, |
| {?TNODE, <<"bar">>}, |
| {?TUUID, <<"bar_uuid">>}, |
| {?TSEQ, 93} |
| ]}, |
| {[ |
| {?SNODE, <<"foo2">>}, |
| {?SUUID, <<"foo_uuid">>}, |
| {?SSEQ, 31}, |
| {?TNODE, <<"bar">>}, |
| {?TUUID, <<"bar_uuid">>}, |
| {?TSEQ, 30} |
| ]} |
| ], |
| History = |
| {[ |
| {<<"foo">>, Foo_Bar}, |
| {<<"foo2">>, Foo2_Bar} |
| ]}, |
| #doc{ |
| body = {[{<<"history">>, History}]} |
| }. |
| |
| targets_map_test_() -> |
| { |
| setup, |
| fun() -> meck:new(mem3, [passthrough]) end, |
| fun(_) -> meck:unload() end, |
| with([ |
| ?TDEF(target_not_a_shard), |
| ?TDEF(source_contained_in_target), |
| ?TDEF(multiple_targets), |
| ?TDEF(uneven_overlap), |
| ?TDEF(target_not_in_shard_map) |
| ]) |
| }. |
| |
| target_not_a_shard(_) -> |
| ?assertEqual(#{[0, ?RING_END] => <<"t">>}, targets_map(<<"s">>, <<"t">>)). |
| |
| source_contained_in_target(_) -> |
| R07 = [16#00000000, 16#7fffffff], |
| R8f = [16#80000000, 16#ffffffff], |
| R0f = [16#00000000, 16#ffffffff], |
| |
| Shards = [ |
| #shard{node = 'n1', range = R07}, |
| #shard{node = 'n1', range = R8f}, |
| #shard{node = 'n2', range = R07}, |
| #shard{node = 'n2', range = R8f}, |
| #shard{node = 'n3', range = R0f} |
| ], |
| meck:expect(mem3, shards, 1, Shards), |
| |
| SrcName1 = <<"shards/00000000-7fffffff/d.1551893552">>, |
| TgtName1 = <<"shards/00000000-7fffffff/d.1551893552">>, |
| |
| Src1 = #shard{name = SrcName1, node = 'n1'}, |
| Tgt1 = #shard{name = TgtName1, node = 'n2'}, |
| Map1 = targets_map(Src1, Tgt1), |
| ?assertEqual(1, map_size(Map1)), |
| ?assertMatch(#{R07 := #shard{node = 'n2'}}, Map1), |
| |
| Tgt2 = #shard{name = TgtName1, node = 'n3'}, |
| Map2 = targets_map(Src1, Tgt2), |
| ?assertEqual(1, map_size(Map2)), |
| ?assertMatch(#{R0f := #shard{node = 'n3'}}, Map2). |
| |
| multiple_targets(_) -> |
| R07 = [16#00000000, 16#7fffffff], |
| R8f = [16#80000000, 16#ffffffff], |
| R0f = [16#00000000, 16#ffffffff], |
| |
| Shards = [ |
| #shard{node = 'n1', range = R07}, |
| #shard{node = 'n1', range = R8f}, |
| #shard{node = 'n2', range = R0f} |
| ], |
| meck:expect(mem3, shards, 1, Shards), |
| |
| SrcName = <<"shards/00000000-ffffffff/d.1551893552">>, |
| TgtName = <<"shards/00000000-7fffffff/d.1551893552">>, |
| |
| Src = #shard{name = SrcName, node = 'n2'}, |
| Tgt = #shard{name = TgtName, node = 'n1'}, |
| Map = targets_map(Src, Tgt), |
| ?assertEqual(2, map_size(Map)), |
| ?assertMatch(#{R07 := #shard{node = 'n1'}}, Map), |
| ?assertMatch(#{R8f := #shard{node = 'n1'}}, Map). |
| |
| uneven_overlap(_) -> |
| R04 = [16#00000000, 16#4fffffff], |
| R26 = [16#20000000, 16#6fffffff], |
| R58 = [16#50000000, 16#8fffffff], |
| R9f = [16#90000000, 16#ffffffff], |
| Shards = [ |
| #shard{node = 'n1', range = R04}, |
| #shard{node = 'n1', range = R58}, |
| #shard{node = 'n1', range = R9f}, |
| #shard{node = 'n2', range = R26} |
| ], |
| |
| meck:expect(mem3, shards, 1, Shards), |
| |
| SrcName = <<"shards/20000000-6fffffff/d.1551893552">>, |
| TgtName = <<"shards/20000000-6fffffff/d.1551893552">>, |
| |
| Src = #shard{name = SrcName, node = 'n2'}, |
| Tgt = #shard{name = TgtName, node = 'n1'}, |
| Map = targets_map(Src, Tgt), |
| ?assertEqual(2, map_size(Map)), |
| ?assertMatch(#{R04 := #shard{node = 'n1'}}, Map), |
| ?assertMatch(#{R58 := #shard{node = 'n1'}}, Map). |
| |
| target_not_in_shard_map(_) -> |
| R0f = [16#00000000, 16#ffffffff], |
| Name = <<"shards/00000000-ffffffff/d.1551893552">>, |
| Shards = [ |
| #shard{name = Name, node = 'n1', range = R0f}, |
| #shard{name = Name, node = 'n2', range = R0f} |
| ], |
| meck:expect(mem3, shards, 1, Shards), |
| Src = #shard{name = Name, node = 'n1'}, |
| Tgt = #shard{name = Name, node = 'n3'}, |
| Map = targets_map(Src, Tgt), |
| ?assertEqual(1, map_size(Map)), |
| ?assertMatch(#{R0f := #shard{name = Name, node = 'n3'}}, Map). |
| |
| -endif. |