Use fabric to read/write _replicator docs if serialize_worker_startup=true

Closes https://github.com/apache/couchdb/issues/6029
diff --git a/src/couch_replicator/src/couch_replicator.app.src b/src/couch_replicator/src/couch_replicator.app.src index 4f68c56..8739ea6 100644 --- a/src/couch_replicator/src/couch_replicator.app.src +++ b/src/couch_replicator/src/couch_replicator.app.src
@@ -26,6 +26,7 @@ kernel, stdlib, couch_log, + fabric, mem3, config, couch,
diff --git a/src/couch_replicator/src/couch_replicator_docs.erl b/src/couch_replicator/src/couch_replicator_docs.erl index 8f134aa..8743733 100644 --- a/src/couch_replicator/src/couch_replicator_docs.erl +++ b/src/couch_replicator/src/couch_replicator_docs.erl
@@ -196,7 +196,28 @@
             save_rep_doc(RepDbName, RepDoc#doc{body = {NewRepDocBody}})
     end.
 
+% Open a replication document. For a shard db name ("shards/..." prefix) the
+% read goes through fabric when serialize_worker_startup() is true; every
+% other case opens the given db locally.
+open_rep_doc(<<"shards/", _/binary>> = DbName, DocId) ->
+    case serialize_worker_startup() of
+        true ->
+            open_rep_doc_fabric(DbName, DocId);
+        false ->
+            open_rep_doc_local(DbName, DocId)
+    end;
 open_rep_doc(DbName, DocId) ->
+    open_rep_doc_local(DbName, DocId).
+
+% Map the shard name to its db name with mem3:dbname/1 and read the doc with
+% fabric:open_doc/3 inside a fabric_call/1 helper process.
+open_rep_doc_fabric(<<"shards/", _/binary>> = ShardDbName, DocId) ->
+    DbName = mem3:dbname(ShardDbName),
+    fabric_call(fun() ->
+        fabric:open_doc(DbName, DocId, [?CTX])
+    end).
+
+open_rep_doc_local(DbName, DocId) ->
     ioq:maybe_set_io_priority({system, DbName}),
     case couch_db:open_int(DbName, [?CTX, sys_db]) of
         {ok, Db} ->
@@ -209,7 +230,38 @@
             Else
     end.
 
+% Save a replication document. Mirrors open_rep_doc/2: shard db names go
+% through fabric when serialize_worker_startup() is true, otherwise the
+% update is applied to the local db.
+save_rep_doc(<<"shards/", _/binary>> = DbName, Doc) ->
+    case serialize_worker_startup() of
+        true ->
+            save_rep_doc_fabric(DbName, Doc);
+        false ->
+            save_rep_doc_local(DbName, Doc)
+    end;
 save_rep_doc(DbName, Doc) ->
+    save_rep_doc_local(DbName, Doc).
+
+% Update the doc with fabric:update_doc/3 inside a fabric_call/1 helper
+% process. A {forbidden, Reason} throw is logged and turned into {ok, forbidden}.
+save_rep_doc_fabric(<<"shards/", _/binary>> = ShardDbName, Doc) ->
+    DbName = mem3:dbname(ShardDbName),
+    fabric_call(fun() ->
+        try
+            fabric:update_doc(DbName, Doc, [?CTX])
+        catch
+            % User can accidentally write a VDU which prevents _replicator from
+            % updating replication documents. Avoid crashing replicator and thus
+            % preventing all other replication jobs on the node from running.
+            throw:{forbidden, Reason} ->
+                Msg = "~p VDU function preventing doc update to ~s ~s ~p",
+                couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]),
+                {ok, forbidden}
+        end
+    end).
+
+save_rep_doc_local(DbName, Doc) ->
     ioq:maybe_set_io_priority({system, DbName}),
     {ok, Db} = couch_db:open_int(DbName, [?CTX, sys_db]),
     try
@@ -226,6 +278,43 @@
         couch_db:close(Db)
     end.
 
+% Read the "fabric" / "serialize_worker_startup" boolean, passing true as the default.
+serialize_worker_startup() ->
+    config:get_boolean("fabric", "serialize_worker_startup", true).
+
+% Run Fun in a monitored helper process and hand its outcome back to the
+% caller: a normal return value is returned, and a throw, error or exit
+% raised by Fun is re-raised here with the same class and reason.
+fabric_call(Fun) ->
+    {Pid, Ref} = spawn_monitor(fun() ->
+        try Fun() of
+            Resp ->
+                exit({exit_ok, Resp})
+        catch
+            throw:Reason ->
+                exit({exit_throw, Reason});
+            error:Reason ->
+                exit({exit_error, Reason});
+            exit:Reason ->
+                exit({exit_exit, Reason})
+        end
+    end),
+    receive
+        {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
+            Ret;
+        {'DOWN', Ref, process, Pid, {exit_throw, Reason}} ->
+            throw(Reason);
+        {'DOWN', Ref, process, Pid, {exit_error, Reason}} ->
+            error(Reason);
+        {'DOWN', Ref, process, Pid, {exit_exit, Reason}} ->
+            exit(Reason);
+        % The helper died without reporting through the tagged exits above
+        % (for example it was killed by an exit signal). Exit with that
+        % reason instead of waiting forever for a message that cannot come.
+        {'DOWN', Ref, process, Pid, Reason} ->
+            exit(Reason)
+    end.
+
 -spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}.
 before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db, _UpdateType) ->
     Doc;