support cluster-local source/target WIP
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 7f0c7ee..ee91b5d 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -639,12 +639,15 @@
init_state(Rep) ->
#rep{
id = {BaseId, _Ext},
- source = Src0, target = Tgt,
+ source = Src0, target = Tgt0,
options = Options, user_ctx = UserCtx,
type = Type, view = View
} = Rep,
+ % note if fabric should be used
+ Src1 = maybe_fabric(Rep, Src0),
+ Tgt = maybe_fabric(Rep, Tgt0),
% Adjust minimum number of http source connections to 2 to avoid deadlock
- Src = adjust_maxconn(Src0, BaseId),
+ Src = adjust_maxconn(Src1, BaseId),
{ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
{ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
get_value(create_target, Options, false)),
@@ -693,6 +696,14 @@
},
State#rep_state{timer = start_timer(State)}.
+%% annotate "local" dbname with fabric tuple if
+%% from a clustered database.
+maybe_fabric(#rep{}, #httpdb{} = HttpDb) ->
+ HttpDb;
+maybe_fabric(#rep{db_name = <<"shards/", _/binary>>}, DbName) ->
+ {fabric, DbName};
+maybe_fabric(#rep{}, DbName) ->
+ DbName.
find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index ff6b00c..11dedc6 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -1,4 +1,4 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
@@ -62,10 +62,12 @@
db_uri(#db{name = Name}) ->
db_uri(Name);
+db_uri(#fabricdb{name = Name}) ->
+ db_uri(Name);
+
db_uri(DbName) ->
?b2l(DbName).
-
db_open(Db, Options) ->
db_open(Db, Options, false).
@@ -121,9 +123,9 @@
true ->
ok = couch_httpd:verify_is_server_admin(
get_value(user_ctx, Options)),
- couch_db:create(DbName, Options)
+ create(DbName, Options)
end,
- case couch_db:open(DbName, Options) of
+ case open(DbName, Options) of
{error, {illegal_database_name, _}} ->
throw({db_not_found, DbName});
{not_found, _Reason} ->
@@ -148,6 +150,9 @@
fun(200, _, {Props}) ->
{ok, Props}
end);
+get_db_info(#fabricdb{name = DbName, user_ctx = UserCtx}) ->
+ fabric:get_security(DbName, [{user_ctx, UserCtx}]),
+ fabric:get_db_info(DbName);
get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
{ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
{ok, Info} = couch_db:get_db_info(Db),
@@ -171,6 +176,9 @@
send_req(Db, Options, fun(200, _, {Props}) ->
{ok, couch_util:get_value(<<"pending">>, Props, null)}
end);
+get_pending_count(#fabricdb{} = Db, Seq) ->
+ Args = #changes_args{since=Seq, limit=0, db_open_options=[{user_ctx, Db#fabricdb.user_ctx}]},
+ with_fabric(changes, [Db#fabricdb.name, fun pending_callback/2, nil, Args]);
get_pending_count(#db{name=DbName}=Db, Seq) when is_number(Seq) ->
{ok, CountDb} = couch_db:open(DbName, [{user_ctx, Db#db.user_ctx}]),
Pending = couch_db:count_changes_since(CountDb, Seq),
@@ -340,6 +348,13 @@
(_, _, {Props}) ->
{error, get_value(<<"error">>, Props)}
end);
+open_doc(#fabricdb{} = Db, Id, Options) ->
+ case with_fabric(open_doc, [Db#fabricdb.name, Id, Options]) of
+ {ok, _} = Ok ->
+ Ok;
+ {not_found, _Reason} ->
+ {error, <<"not_found">>}
+ end;
open_doc(Db, Id, Options) ->
case couch_db:open_doc(Db, Id, Options) of
{ok, _} = Ok ->
@@ -972,3 +987,50 @@
_ ->
Default
end.
+
+%% fabric bits
+
+create({fabric, DbName}, Options) ->
+ with_fabric(create_db, [DbName, Options]);
+create(DbName, Options) ->
+ couch_db:create(DbName, Options).
+
+open({fabric, DbName}, Options) ->
+ with_fabric(get_security, [DbName, Options]),
+ UserCtx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
+ {ok, #fabricdb{name=DbName, user_ctx=UserCtx}};
+open(DbName, Options) ->
+ couch_db:open(DbName, Options).
+
+handle_db_changes(Args, Req, Db) ->
+ couch_changes:handle_db_changes(Args, Req, Db).
+
+pending_callback(start, Acc) ->
+ {ok, Acc};
+pending_callback({stop, _Seq, Pending}, _Acc) ->
+ {ok, Pending}.
+
+with_fabric(F, A) ->
+ {Pid, Ref} = spawn_monitor(fun() ->
+ try apply(fabric, F, A) of
+ Resp ->
+ exit({exit_ok, Resp})
+ catch
+ throw:Reason ->
+ exit({exit_throw, Reason});
+ error:Reason ->
+ exit({exit_error, Reason});
+ exit:Reason ->
+ exit({exit_exit, Reason})
+ end
+ end),
+ receive
+ {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
+ Ret;
+ {'DOWN', Ref, process, Pid, {exit_throw, Reason}} ->
+ throw(Reason);
+ {'DOWN', Ref, process, Pid, {exit_error, Reason}} ->
+ erlang:error(Reason);
+ {'DOWN', Ref, process, Pid, {exit_exit, Reason}} ->
+ erlang:exit(Reason)
+ end.
diff --git a/src/couch_replicator_api_wrap.hrl b/src/couch_replicator_api_wrap.hrl
index eee04da..a29e6a1 100644
--- a/src/couch_replicator_api_wrap.hrl
+++ b/src/couch_replicator_api_wrap.hrl
@@ -27,6 +27,8 @@
http_connections
}).
+-record(fabricdb, {name, user_ctx}).
+
-record(oauth, {
consumer_key,
token,
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 953b1bf..f973cf0 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -514,7 +514,6 @@
}
end.
-
maybe_start_replication(State, DbName, DocId, RepDoc) ->
#rep{id = {BaseId, _} = RepId} = Rep0 = parse_rep_doc(RepDoc),
Rep = Rep0#rep{db_name = DbName},
@@ -567,7 +566,6 @@
end,
Rep.
-
maybe_tag_rep_doc(DbName, DocId, {RepProps}, RepId) ->
case get_json_value(<<"_replication_id">>, RepProps) of
RepId ->