blob: e283b50cc72afebd76a43aae595a522feb7e53c0 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_replicator_docs).
-export([
remove_state_fields/3,
update_completed/4,
update_failed/4,
before_doc_update/3,
after_doc_read/2
]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl").
-include("couch_replicator.hrl").
-include_lib("kernel/include/logger.hrl").
-define(OWNER, <<"owner">>).
-define(CTX, {user_ctx, #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]}}).
-define(replace(L, K, V), lists:keystore(K, 1, L, {K, V})).
remove_state_fields(null, null, null) ->
ok;
remove_state_fields(DbName, DbUUID, DocId) ->
update_rep_doc(DbName, DbUUID, DocId, [
{?REPLICATION_STATE, undefined},
{?REPLICATION_STATE_TIME, undefined},
{?REPLICATION_STATE_REASON, undefined},
{?REPLICATION_ID, undefined},
{?REPLICATION_STATS, undefined}
]),
ok.
-spec update_completed(binary(), binary(), binary(), [_]) -> ok.
update_completed(null, null, _, _) ->
ok;
update_completed(DbName, DbUUID, DocId, #{} = Stats0) ->
Stats = {maps:to_list(Stats0)},
update_rep_doc(DbName, DbUUID, DocId, [
{?REPLICATION_STATE, ?ST_COMPLETED},
{?REPLICATION_STATE_REASON, undefined},
{?REPLICATION_STATS, Stats}
]),
couch_stats:increment_counter([
couch_replicator,
docs,
completed_state_updates
]),
ok.
-spec update_failed(binary(), binary(), binary(), any()) -> ok.
update_failed(null, null, null, _) ->
ok;
update_failed(DbName, DbUUID, DocId, Error) ->
Reason = error_reason(Error),
?LOG_ERROR(#{
what => replication_failed,
in => replicator,
replicator_db => DbName,
replicator_doc => DocId,
details => Reason
}),
couch_log:error(
"Error processing replication doc `~s` from `~s`: ~s",
[DocId, DbName, Reason]
),
update_rep_doc(DbName, DbUUID, DocId, [
{?REPLICATION_STATE, ?ST_FAILED},
{?REPLICATION_STATS, undefined},
{?REPLICATION_STATE_REASON, Reason}
]),
couch_stats:increment_counter([
couch_replicator,
docs,
failed_state_updates
]),
ok.
-spec before_doc_update(#doc{}, Db :: any(), couch_db:update_type()) -> #doc{}.
before_doc_update(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _, _) ->
Doc;
before_doc_update(#doc{body = {Body}} = Doc, Db, _UpdateType) ->
#user_ctx{roles = Roles, name = Name} = fabric2_db:get_user_ctx(Db),
IsReplicator = lists:member(<<"_replicator">>, Roles),
Doc1 =
case IsReplicator of
true ->
Doc;
false ->
case couch_util:get_value(?OWNER, Body) of
undefined ->
Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
Name ->
Doc;
Other ->
case (catch fabric2_db:check_is_admin(Db)) of
ok when Other =:= null ->
Doc#doc{body = {?replace(Body, ?OWNER, Name)}};
ok ->
Doc;
_ ->
throw(
{forbidden,
<<"Can't update replication",
"documents from other users.">>}
)
end
end
end,
Deleted = Doc1#doc.deleted,
IsFailed = couch_util:get_value(?REPLICATION_STATE, Body) == ?ST_FAILED,
case IsReplicator orelse Deleted orelse IsFailed of
true ->
ok;
false ->
try
couch_replicator_parse:parse_rep_doc(Doc1#doc.body)
catch
throw:{bad_rep_doc, Error} ->
throw({forbidden, Error})
end
end,
Doc1.
-spec after_doc_read(#doc{}, Db :: any()) -> #doc{}.
after_doc_read(#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>} = Doc, _Db) ->
Doc;
after_doc_read(#doc{body = {Body}} = Doc, Db) ->
#user_ctx{name = Name} = fabric2_db:get_user_ctx(Db),
case (catch fabric2_db:check_is_admin(Db)) of
ok ->
Doc;
_ ->
case couch_util:get_value(?OWNER, Body) of
Name ->
Doc;
_ ->
Source0 = couch_util:get_value(<<"source">>, Body),
Target0 = couch_util:get_value(<<"target">>, Body),
Source = strip_credentials(Source0),
Target = strip_credentials(Target0),
NewBody0 = ?replace(Body, <<"source">>, Source),
NewBody = ?replace(NewBody0, <<"target">>, Target),
#doc{revs = {Pos, [_ | Revs]}} = Doc,
NewDoc = Doc#doc{body = {NewBody}, revs = {Pos - 1, Revs}},
fabric2_db:new_revid(Db, NewDoc)
end
end.
update_rep_doc(RepDbName, RepDbUUID, RepDocId, KVs) ->
update_rep_doc(RepDbName, RepDbUUID, RepDocId, KVs, 1).
update_rep_doc(RepDbName, RepDbUUID, RepDocId, KVs, Wait) when
is_binary(RepDocId)
->
try
case open_rep_doc(RepDbName, RepDbUUID, RepDocId) of
{ok, LastRepDoc} ->
update_rep_doc(
RepDbName,
RepDbUUID,
LastRepDoc,
KVs,
Wait * 2
);
_ ->
ok
end
catch
throw:conflict ->
Delay = couch_rand:uniform(erlang:min(128, Wait)) * 100,
?LOG_ERROR(#{
what => replication_doc_conflict,
in => replicator,
replication_db => RepDbName,
replication_doc => RepDocId,
retry_delay_sec => Delay
}),
Msg = "Conflict when updating replication doc `~s`. Retrying.",
couch_log:error(Msg, [RepDocId]),
ok = timer:sleep(Delay),
update_rep_doc(RepDbName, RepDbUUID, RepDocId, KVs, Wait * 2)
end;
update_rep_doc(RepDbName, RepDbUUID, #doc{body = {RepDocBody}} = RepDoc, KVs, _Try) ->
NewRepDocBody = lists:foldl(
fun
({K, undefined}, Body) when is_binary(K) ->
lists:keydelete(K, 1, Body);
({?REPLICATION_STATE = K, State} = KV, Body) when is_binary(K) ->
case couch_util:get_value(K, Body) of
State ->
Body;
_ ->
Body1 = lists:keystore(K, 1, Body, KV),
Timestamp = couch_replicator_utils:iso8601(),
lists:keystore(
?REPLICATION_STATE_TIME,
1,
Body1,
{?REPLICATION_STATE_TIME, Timestamp}
)
end;
({K, _V} = KV, Body) when is_binary(K) ->
lists:keystore(K, 1, Body, KV)
end,
RepDocBody,
KVs
),
case NewRepDocBody of
RepDocBody ->
ok;
_ ->
% Might not succeed - when the replication doc is deleted right
% before this update (not an error, ignore).
save_rep_doc(RepDbName, RepDbUUID, RepDoc#doc{body = {NewRepDocBody}})
end.
open_rep_doc(DbName, DbUUID, DocId) when
is_binary(DbName),
is_binary(DbUUID),
is_binary(DocId)
->
try
case fabric2_db:open(DbName, [?CTX, sys_db, {uuid, DbUUID}]) of
{ok, Db} -> fabric2_db:open_doc(Db, DocId, [ejson_body]);
Else -> Else
end
catch
error:database_does_not_exist ->
{not_found, database_does_not_exist}
end.
save_rep_doc(DbName, DbUUID, Doc) when is_binary(DbName), is_binary(DbUUID) ->
try
{ok, Db} = fabric2_db:open(DbName, [?CTX, sys_db, {uuid, DbUUID}]),
fabric2_db:update_doc(Db, Doc, [])
catch
error:database_does_not_exist ->
{not_found, database_does_not_exist};
% User can accidentally write a VDU which prevents _replicator from
% updating replication documents. Avoid crashing replicator and thus
% preventing all other replication jobs on the node from running.
throw:{forbidden, Reason} ->
?LOG_ERROR(#{
what => replication_doc_update_forbidden,
in => replicator,
replication_db => DbName,
replication_doc => Doc#doc.id,
details => Reason
}),
Msg = "~p VDU or BDU function preventing doc update to ~s ~s ~p",
couch_log:error(Msg, [?MODULE, DbName, Doc#doc.id, Reason]),
{ok, forbidden}
end.
-spec strip_credentials
(undefined) -> undefined;
(binary()) -> binary();
({[_]}) -> {[_]}.
strip_credentials(undefined) ->
undefined;
strip_credentials(Url) when is_binary(Url) ->
re:replace(
Url,
"http(s)?://(?:[^:]+):[^@]+@(.*)$",
"http\\1://\\2",
[{return, binary}]
);
strip_credentials({Props0}) ->
Props1 = lists:keydelete(<<"headers">>, 1, Props0),
% Strip "auth" just like headers, for replication plugins it can be a place
% to stash credential that are not necessarily in headers
Props2 = lists:keydelete(<<"auth">>, 1, Props1),
{Props2}.
error_reason({shutdown, Error}) ->
error_reason(Error);
error_reason({bad_rep_doc, Reason}) ->
couch_util:to_binary(Reason);
error_reason(#{<<"error">> := Error, <<"reason">> := Reason}) when
is_binary(Error), is_binary(Reason)
->
couch_util:to_binary(io_list:format("~s: ~s", [Error, Reason]));
error_reason({error, {Error, Reason}}) when
is_atom(Error), is_binary(Reason)
->
couch_util:to_binary(io_lib:format("~s: ~s", [Error, Reason]));
error_reason({error, Reason}) ->
couch_util:to_binary(Reason);
error_reason(Reason) ->
couch_util:to_binary(Reason).