% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_db_split).
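% This module implements the storage-level part of shard splitting: it copies
% a source shard's documents, attachments, purge infos, internal replicator
% checkpoints and metadata (revs limit, security, purge infos limit) into two
% or more target shards, routing each document through a caller-supplied pick
% function.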
-export([
split/3,
copy_local_docs/3,
cleanup_target/2
]).
-include_lib("couch/include/couch_db.hrl").
-define(DEFAULT_BUFFER_SIZE, 16777216).
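% Overall split state: the source db handle and uuid, the targets map, the
% pick and hash functions used to route documents, and the maximum per-target
% buffer size before a flush is forced.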
-record(state, {
source_db,
source_uuid,
targets,
pickfun,
max_buffer_size = ?DEFAULT_BUFFER_SIZE,
hashfun
}).
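% Per-target state: the target db handle and uuid plus a buffer of pending
% items (docs or purge infos) and its approximate size in bytes.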
-record(target, {
db,
uuid,
buffer = [],
buffer_size = 0
}).
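% Accumulator used while rewriting a document's revision tree: the doc id,
% the source and target db handles, the running active and external sizes,
% and the attachment size entries seen so far.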
-record(racc, {
id,
source_db,
target_db,
active = 0,
external = 0,
atts = []
}).
% Public API
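% Split the source shard into the given targets. Targets maps at least two
% target keys to target shard names; PickFun(DocId, TargetKeys, HashFun) must
% return the key of the target a document belongs to. Returns {ok, UpdateSeq}
% with the source update sequence captured for the copy, or {error, Reason}.
%
% Illustrative sketch only; the target keys, shard names and pick fun below
% are hypothetical and in practice are constructed by the mem3 resharding
% logic (SourceShardName stands in for the source shard's name):
%
%     Targets = #{
%         t1 => <<"shards/00000000-7fffffff/db.1234">>,
%         t2 => <<"shards/80000000-ffffffff/db.1234">>
%     },
%     PickFun = fun(DocId, Keys, _HashFun) ->
%         lists:nth(1 + erlang:phash2(DocId, length(Keys)), lists:sort(Keys))
%     end,
%     {ok, UpdateSeq} = couch_db_split:split(SourceShardName, Targets, PickFun).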
split(Source, #{} = Targets, PickFun) when
map_size(Targets) >= 2, is_function(PickFun, 3) ->
case couch_db:open_int(Source, [?ADMIN_CTX]) of
{ok, SourceDb} ->
Engine = get_engine(SourceDb),
Partitioned = couch_db:is_partitioned(SourceDb),
HashFun = mem3_hash:get_hash_fun(couch_db:name(SourceDb)),
try
split(SourceDb, Partitioned, Engine, Targets, PickFun, HashFun)
catch
throw:{target_create_error, DbName, Error, TargetDbs} ->
cleanup_targets(TargetDbs, Engine),
{error, {target_create_error, DbName, Error}}
after
couch_db:close(SourceDb)
end;
{not_found, _} ->
{error, missing_source}
end.
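% Copy local documents from the source to the already-created targets. This
% runs as a top-off step right before the shards are switched; internal
% replicator ("shard-sync-") and purge checkpoints are skipped since those
% are handled by copy_checkpoints/1 during the main split.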
copy_local_docs(Source, #{} = Targets0, PickFun) when
is_binary(Source), is_function(PickFun, 3) ->
case couch_db:open_int(Source, [?ADMIN_CTX]) of
{ok, SourceDb} ->
try
Targets = maps:map(fun(_, DbName) ->
{ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX]),
#target{db = Db, uuid = couch_db:get_uuid(Db)}
end, Targets0),
SourceName = couch_db:name(SourceDb),
try
State = #state{
source_db = SourceDb,
source_uuid = couch_db:get_uuid(SourceDb),
targets = Targets,
pickfun = PickFun,
hashfun = mem3_hash:get_hash_fun(SourceName)
},
copy_local_docs(State),
ok
after
maps:map(fun(_, #target{db = Db} = T) ->
couch_db:close(Db),
T#target{db = undefined}
end, Targets)
end
after
couch_db:close(SourceDb)
end;
{not_found, _} ->
{error, missing_source}
end.
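% Delete a target shard file, for example after a failed or abandoned split.
% The source db is opened only to find out which storage engine, and
% therefore which file path, to use for the deletion.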
cleanup_target(Source, Target) when is_binary(Source), is_binary(Target) ->
case couch_db:open_int(Source, [?ADMIN_CTX]) of
{ok, SourceDb} ->
try
delete_target(Target, get_engine(SourceDb))
after
couch_db:close(SourceDb)
end;
{not_found, _} ->
{error, missing_source}
end.
% Private Functions
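% Create the target shard files, then run the copy pipeline: documents,
% internal checkpoints, metadata, purge infos and finally the update
% sequence. If creating any target fails, a target_create_error is thrown
% with the targets created so far so split/3 can clean them up.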
split(SourceDb, Partitioned, Engine, Targets0, PickFun, {M, F, A} = HashFun) ->
Targets = maps:fold(fun(Key, DbName, Map) ->
case couch_db:validate_dbname(DbName) of
ok ->
ok;
{error, E} ->
throw({target_create_error, DbName, E, Map})
end,
{ok, Filepath} = couch_server:get_engine_path(DbName, Engine),
Opts = [create, ?ADMIN_CTX] ++ case Partitioned of
true -> [{props, [{partitioned, true}, {hash, [M, F, A]}]}];
false -> []
end,
case couch_db:start_link(Engine, DbName, Filepath, Opts) of
{ok, Db} ->
Map#{Key => #target{db = Db}};
{error, Error} ->
throw({target_create_error, DbName, Error, Map})
end
end, #{}, Targets0),
Seq = couch_db:get_update_seq(SourceDb),
State1 = #state{
source_db = SourceDb,
targets = Targets,
pickfun = PickFun,
hashfun = HashFun,
max_buffer_size = get_max_buffer_size()
},
State2 = copy_docs(State1),
State3 = copy_checkpoints(State2),
State4 = copy_meta(State3),
State5 = copy_purge_info(State4),
State6 = set_targets_update_seq(State5),
stop_targets(State6#state.targets),
{ok, Seq}.
cleanup_targets(#{} = Targets, Engine) ->
maps:map(fun(_, #target{db = Db} = T) ->
ok = stop_target_db(Db),
delete_target(couch_db:name(Db), Engine),
T
end, Targets).
stop_targets(#{} = Targets) ->
maps:map(fun(_, #target{db = Db} = T) ->
{ok, Db1} = couch_db_engine:commit_data(Db),
ok = stop_target_db(Db1),
T
end, Targets).
stop_target_db(Db) ->
couch_db:close(Db),
Pid = couch_db:get_pid(Db),
catch unlink(Pid),
catch exit(Pid, kill),
ok.
delete_target(DbName, Engine) ->
RootDir = config:get("couchdb", "database_dir", "."),
{ok, Filepath} = couch_server:get_engine_path(DbName, Engine),
DelOpt = [{context, compaction}, sync],
couch_db_engine:delete(Engine, RootDir, Filepath, DelOpt).
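% Route a document id to one of the targets using the caller-supplied pick
% function.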
pick_target(DocId, #state{} = State, #{} = Targets) ->
#state{pickfun = PickFun, hashfun = HashFun} = State,
Key = PickFun(DocId, maps:keys(Targets), HashFun),
{Key, maps:get(Key, Targets)}.
set_targets_update_seq(#state{targets = Targets} = State) ->
Seq = couch_db:get_update_seq(State#state.source_db),
Targets1 = maps:map(fun(_, #target{db = Db} = Target) ->
{ok, Db1} = couch_db_engine:set_update_seq(Db, Seq),
Target#target{db = Db1}
end, Targets),
State#state{targets = Targets1}.
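% Fold over the source's local docs and copy only the internal replicator and
% purge checkpoints into the targets; the remaining local docs are deferred
% to the copy_local_docs/1 top-off.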
copy_checkpoints(#state{} = State) ->
#state{source_db = Db, source_uuid = SrcUUID, targets = Targets} = State,
FoldFun = fun(#doc{id = Id} = Doc, Acc) ->
UpdatedAcc = case Id of
<<?LOCAL_DOC_PREFIX, "shard-sync-", _/binary>> ->
% Transform mem3 internal replicator checkpoints to avoid
% rewinding the changes feed when it sees the new shards
maps:map(fun(_, #target{uuid = TgtUUID, buffer = Docs} = T) ->
Doc1 = update_checkpoint_doc(SrcUUID, TgtUUID, Doc),
T#target{buffer = [Doc1 | Docs]}
end, Acc);
<<?LOCAL_DOC_PREFIX, "purge-", _/binary>> ->
% Copy purge checkpoints to all shards
maps:map(fun(_, #target{buffer = Docs} = T) ->
T#target{buffer = [Doc | Docs]}
end, Acc);
<<?LOCAL_DOC_PREFIX, _/binary>> ->
% Skip the rest of the local docs here; they are copied during the
% local docs top-off right before the shards are switched
Acc
end,
{ok, UpdatedAcc}
end,
{ok, Targets1} = couch_db_engine:fold_local_docs(Db, FoldFun, Targets, []),
Targets2 = maps:map(fun(_, #target{db = TDb, buffer = Docs} = T) ->
case Docs of
[] ->
T;
[_ | _] ->
Docs1 = lists:reverse(Docs),
{ok, TDb1} = couch_db_engine:write_doc_infos(TDb, [], Docs1),
{ok, TDb2} = couch_db_engine:commit_data(TDb1),
T#target{db = TDb2, buffer = []}
end
end, Targets1),
State#state{targets = Targets2}.
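% Replace the old (source) shard uuid with the new (target) shard uuid in a
% shard-sync checkpoint doc: in the body, whichever of target_uuid or
% source_uuid matches is rewritten, and the hashed suffix of the doc id is
% updated to match.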
update_checkpoint_doc(Old, New, #doc{body = {Props}} = Doc) ->
NewProps = case couch_util:get_value(<<"target_uuid">>, Props) of
Old ->
replace_kv(Props, {<<"target_uuid">>, Old, New});
Other when is_binary(Other) ->
replace_kv(Props, {<<"source_uuid">>, Old, New})
end,
NewId = update_checkpoint_id(Doc#doc.id, Old, New),
Doc#doc{id = NewId, body = {NewProps}}.
update_checkpoint_id(Id, Old, New) ->
OldHash = mem3_rep:local_id_hash(Old),
NewHash = mem3_rep:local_id_hash(New),
binary:replace(Id, OldHash, NewHash).
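% Recursively walk an EJSON term and replace any {Key, OldValue} pair that
% matches the replacement spec with {Key, NewValue}; everything else is
% returned unchanged.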
replace_kv({[]}, _) ->
{[]};
replace_kv({KVs}, Replacement) ->
{[replace_kv(KV, Replacement) || KV <- KVs]};
replace_kv([], _) ->
[];
replace_kv(List, Replacement) when is_list(List) ->
[replace_kv(V, Replacement) || V <- List];
replace_kv({K, V}, {K, V, NewV}) ->
{K, NewV};
replace_kv({K, V}, Replacement) ->
{K, replace_kv(V, Replacement)};
replace_kv(V, _) ->
V.
copy_meta(#state{source_db = SourceDb, targets = Targets} = State) ->
RevsLimit = couch_db:get_revs_limit(SourceDb),
{SecProps} = couch_db:get_security(SourceDb),
PurgeLimit = couch_db:get_purge_infos_limit(SourceDb),
Targets1 = maps:map(fun(_, #target{db = Db} = T) ->
{ok, Db1} = couch_db_engine:set_revs_limit(Db, RevsLimit),
{ok, Db2} = couch_db_engine:set_security(Db1, SecProps),
{ok, Db3} = couch_db_engine:set_purge_infos_limit(Db2, PurgeLimit),
T#target{db = Db3}
end, Targets),
State#state{targets = Targets1}.
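% Fold the source's purge infos from the beginning, routing each one to its
% target; buffers are flushed whenever they exceed the maximum size and once
% more at the end.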
copy_purge_info(#state{source_db = Db} = State) ->
{ok, NewState} = couch_db:fold_purge_infos(Db, 0, fun purge_cb/2, State),
Targets = maps:map(fun(_, #target{} = T) ->
commit_purge_infos(T)
end, NewState#state.targets),
NewState#state{targets = Targets}.
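% Append an item to the target's buffer and flush it with the given callback
% once the accumulated term size exceeds the maximum buffer size.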
acc_and_flush(Item, #target{} = Target, MaxBuffer, FlushCb) ->
#target{buffer = Buffer, buffer_size = BSize} = Target,
BSize1 = BSize + ?term_size(Item),
Target1 = Target#target{buffer = [Item | Buffer], buffer_size = BSize1},
case BSize1 > MaxBuffer of
true -> FlushCb(Target1);
false -> Target1
end.
purge_cb({_PSeq, _UUID, Id, _Revs} = PI, #state{targets = Targets} = State) ->
{Key, Target} = pick_target(Id, State, Targets),
MaxBuffer = State#state.max_buffer_size,
Target1 = acc_and_flush(PI, Target, MaxBuffer, fun commit_purge_infos/1),
{ok, State#state{targets = Targets#{Key => Target1}}}.
commit_purge_infos(#target{buffer = [], db = Db} = Target) ->
Target#target{db = Db};
commit_purge_infos(#target{buffer = PIs0, db = Db} = Target) ->
PIs = lists:reverse(PIs0),
{ok, Db1} = couch_db_engine:copy_purge_infos(Db, PIs),
{ok, Db2} = couch_db_engine:commit_data(Db1),
Target#target{buffer = [], buffer_size = 0, db = Db2}.
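% Fold the source's changes feed from sequence 0, routing each document's
% #full_doc_info to its target and copying doc bodies and attachments along
% the way, then commit whatever is left in the buffers.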
copy_docs(#state{source_db = Db} = State) ->
{ok, NewState} = couch_db:fold_changes(Db, 0, fun changes_cb/2, State),
CommitTargets = maps:map(fun(_, #target{} = T) ->
commit_docs(T)
end, NewState#state.targets),
NewState#state{targets = CommitTargets}.
% Backwards compatibility clause. Seq trees used to hold #doc_info records at one time
changes_cb(#doc_info{id = Id}, #state{source_db = Db} = State) ->
[FDI = #full_doc_info{}] = couch_db_engine:open_docs(Db, [Id]),
changes_cb(FDI, State);
changes_cb(#full_doc_info{id = Id} = FDI, #state{} = State) ->
#state{source_db = SourceDb, targets = Targets} = State,
{Key, Target} = pick_target(Id, State, Targets),
FDI1 = process_fdi(FDI, SourceDb, Target#target.db),
MaxBuffer = State#state.max_buffer_size,
Target1 = acc_and_flush(FDI1, Target, MaxBuffer, fun commit_docs/1),
{ok, State#state{targets = Targets#{Key => Target1}}}.
commit_docs(#target{buffer = [], db = Db} = Target) ->
Target#target{db = Db};
commit_docs(#target{buffer = FDIs, db = Db} = Target) ->
Pairs = [{not_found, FDI} || FDI <- lists:reverse(FDIs)],
{ok, Db1} = couch_db_engine:write_doc_infos(Db, Pairs, []),
{ok, Db2} = couch_db_engine:commit_data(Db1),
Target#target{buffer = [], buffer_size = 0, db = Db2}.
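% Rewrite a #full_doc_info for the target: copy each leaf revision's body and
% attachments into the target file and recompute the active and external
% size info.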
process_fdi(FDI, SourceDb, TargetDb) ->
#full_doc_info{id = Id, rev_tree = RTree} = FDI,
Acc = #racc{id = Id, source_db = SourceDb, target_db = TargetDb},
{NewRTree, NewAcc} = couch_key_tree:mapfold(fun revtree_cb/4, Acc, RTree),
{Active, External} = total_sizes(NewAcc),
FDI#full_doc_info{
rev_tree = NewRTree,
sizes = #size_info{active = Active, external = External}
}.
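% Only leaf revisions carry bodies to copy; branch (non-leaf) nodes are
% mapped to [] and contribute nothing to the copied sizes.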
revtree_cb(_Rev, _Leaf, branch, Acc) ->
{[], Acc};
revtree_cb({Pos, RevId}, Leaf, leaf, Acc) ->
#racc{id = Id, source_db = SourceDb, target_db = TargetDb} = Acc,
#leaf{deleted = Deleted, ptr = Ptr, sizes = LeafSizes} = Leaf,
Doc0 = #doc{
id = Id,
revs = {Pos, [RevId]},
deleted = Deleted,
body = Ptr
},
Doc1 = couch_db_engine:read_doc_body(SourceDb, Doc0),
#doc{body = Body, atts = AttInfos0} = Doc1,
External = case LeafSizes#size_info.external of
0 when is_binary(Body) ->
couch_compress:uncompressed_size(Body);
0 ->
couch_ejson_size:encoded_size(Body);
N -> N
end,
AttInfos = if not is_binary(AttInfos0) -> AttInfos0; true ->
couch_compress:decompress(AttInfos0)
end,
Atts = [process_attachment(Att, SourceDb, TargetDb) || Att <- AttInfos],
Doc2 = Doc1#doc{atts = Atts},
Doc3 = couch_db_engine:serialize_doc(TargetDb, Doc2),
{ok, Doc4, Active} = couch_db_engine:write_doc_body(TargetDb, Doc3),
% element(3, ...) and element(4, ...) are the stream pointer and size
% respectively (see couch_att.erl). They are accessed by numeric position
% for compatibility with older formats.
AttSizes = [{element(3, A), element(4, A)} || A <- Atts],
NewLeaf = Leaf#leaf{
ptr = Doc4#doc.body,
sizes = #size_info{active = Active, external = External},
atts = AttSizes
},
{NewLeaf, add_sizes(Active, External, AttSizes, Acc)}.
% This is copied almost verbatim from the compactor
process_attachment({Name, Type, BinSp, AttLen, RevPos, ExpectedMd5}, SourceDb,
TargetDb) ->
% 010 upgrade code
{ok, SrcStream} = couch_db_engine:open_read_stream(SourceDb, BinSp),
{ok, DstStream} = couch_db_engine:open_write_stream(TargetDb, []),
ok = couch_stream:copy(SrcStream, DstStream),
{NewStream, AttLen, AttLen, ActualMd5, _IdentityMd5} =
couch_stream:close(DstStream),
{ok, NewBinSp} = couch_stream:to_disk_term(NewStream),
couch_util:check_md5(ExpectedMd5, ActualMd5),
{Name, Type, NewBinSp, AttLen, AttLen, RevPos, ExpectedMd5, identity};
process_attachment({Name, Type, BinSp, AttLen, DiskLen, RevPos, ExpectedMd5,
Enc1}, SourceDb, TargetDb) ->
{ok, SrcStream} = couch_db_engine:open_read_stream(SourceDb, BinSp),
{ok, DstStream} = couch_db_engine:open_write_stream(TargetDb, []),
ok = couch_stream:copy(SrcStream, DstStream),
{NewStream, AttLen, _, ActualMd5, _IdentityMd5} =
couch_stream:close(DstStream),
{ok, NewBinSp} = couch_stream:to_disk_term(NewStream),
couch_util:check_md5(ExpectedMd5, ActualMd5),
Enc = case Enc1 of
true -> gzip; % 0110 upgrade code
false -> identity; % 0110 upgrade code
_ -> Enc1
end,
{Name, Type, NewBinSp, AttLen, DiskLen, RevPos, ExpectedMd5, Enc}.
get_engine(Db) ->
{ok, DbInfoProps} = couch_db:get_db_info(Db),
proplists:get_value(engine, DbInfoProps).
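% Accumulate sizes for one copied leaf. Attachment size entries are merged
% with lists:umerge/2 so entries shared between leaves are deduplicated
% before being summed in total_sizes/1.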
add_sizes(Active, External, Atts, #racc{} = Acc) ->
#racc{active = ActiveAcc, external = ExternalAcc, atts = AttsAcc} = Acc,
NewActiveAcc = ActiveAcc + Active,
NewExternalAcc = ExternalAcc + External,
NewAttsAcc = lists:umerge(Atts, AttsAcc),
Acc#racc{
active = NewActiveAcc,
external = NewExternalAcc,
atts = NewAttsAcc
}.
total_sizes(#racc{active = Active, external = External, atts = Atts}) ->
TotalAtts = lists:foldl(fun({_, S}, A) -> S + A end, 0, Atts),
{Active + TotalAtts, External + TotalAtts}.
get_max_buffer_size() ->
config:get_integer("reshard", "split_buffer_size", ?DEFAULT_BUFFER_SIZE).
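% Copy the remaining local docs (user docs and replicator checkpoints) to the
% targets. Unlike the main copy, the targets here are regular couch_db
% handles, so writes go through couch_db:update_docs/2.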
copy_local_docs(#state{source_db = Db, targets = Targets} = State) ->
FoldFun = fun(#doc{id = Id} = Doc, Acc) ->
UpdatedAcc = case Id of
<<?LOCAL_DOC_PREFIX, "shard-sync-", _/binary>> ->
Acc;
<<?LOCAL_DOC_PREFIX, "purge-", _/binary>> ->
Acc;
<<?LOCAL_DOC_PREFIX, _/binary>> ->
% Users' local docs and the replicator app's checkpoints go to their
% respective target shards based on the general hashing algorithm
{Key, Target} = pick_target(Id, State, Acc),
#target{buffer = Docs} = Target,
Acc#{Key => Target#target{buffer = [Doc | Docs]}}
end,
{ok, UpdatedAcc}
end,
{ok, Targets1} = couch_db:fold_local_docs(Db, FoldFun, Targets, []),
Targets2 = maps:map(fun(_, #target{db = TDb, buffer = Docs} = T) ->
case Docs of
[] ->
T;
[_ | _] ->
Docs1 = lists:reverse(Docs),
{ok, _} = couch_db:update_docs(TDb, Docs1),
T#target{buffer = []}
end
end, Targets1),
State#state{targets = Targets2}.