| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_replicator). |
| |
| -export([ |
| replicate/2, |
| |
| jobs/0, |
| job/1, |
| docs/2, |
| doc/2, |
| |
| after_db_create/2, |
| after_db_delete/2, |
| after_doc_write/6, |
| |
| ensure_rep_db_exists/0, |
| |
| rescan_jobs/0, |
| rescan_jobs/1, |
| reenqueue_jobs/0, |
| reenqueue_jobs/1, |
| remove_jobs/0, |
| get_job_ids/0 |
| ]). |
| |
| -include_lib("ibrowse/include/ibrowse.hrl"). |
| -include_lib("couch/include/couch_db.hrl"). |
| -include("couch_replicator.hrl"). |
| -include_lib("kernel/include/logger.hrl"). |
| |
-spec replicate({[_]}, any()) ->
    {ok, {continuous, binary()}}
    | {ok, #{}}
    | {ok, {cancelled, binary()}}
    | {error, any()}
    | no_return().
%% Start, or cancel, a transient (request-driven) replication.
%%
%% The request body is parsed into a replication map. The job id is the one
%% already registered for the parsed replication id if there is one, else
%% the parsed id itself. With `"cancel": true` the job is removed after an
%% authorization check. Otherwise the job is started and the call blocks:
%% - for continuous replications until the job is running, returning its
%%   replication id;
%% - for one-shot replications until the job completes, returning its
%%   checkpoint history.
%% A failed job yields `{error, StateInfo}`.
replicate(Body, #user_ctx{name = User} = UserCtx) ->
    {ok, Id, Rep} = couch_replicator_parse:parse_transient_rep(Body, User),
    #{?OPTIONS := Options} = Rep,
    JobId =
        case couch_replicator_jobs:get_job_id(undefined, Id) of
            {error, not_found} -> Id;
            {ok, ExistingJobId} -> ExistingJobId
        end,
    case maps:get(<<"cancel">>, Options, false) of
        false ->
            % `not_found` is acceptable here because the job is about to be
            % created. An unauthorized caller is rejected by a throw from
            % check_authorization/2, so its return value is not needed.
            _ = check_authorization(JobId, UserCtx),
            ok = start_transient_job(JobId, Rep),
            case maps:get(<<"continuous">>, Options, false) of
                false -> await_one_shot_result(JobId);
                true -> await_continuous_start(JobId)
            end;
        true ->
            case check_authorization(JobId, UserCtx) of
                not_found -> {error, not_found};
                ok -> cancel_replication(JobId)
            end
    end.

%% Block until a continuous transient job is running, then return its
%% replication id. If the job fails instead, return its state info.
await_continuous_start(JobId) ->
    case couch_replicator_jobs:wait_running(JobId) of
        {ok, #{?STATE := ?ST_RUNNING} = Running} ->
            {ok, {continuous, maps:get(?REP_ID, Running)}};
        {ok, #{?STATE := ?ST_FAILED} = Failed} ->
            {error, maps:get(?STATE_INFO, Failed)};
        {error, _Reason} = WaitError ->
            WaitError
    end.

%% Block until a one-shot transient job finishes, then return its checkpoint
%% history. If the job fails instead, return its state info.
await_one_shot_result(JobId) ->
    case couch_replicator_jobs:wait_result(JobId) of
        {ok, #{?STATE := ?ST_COMPLETED} = Completed} ->
            {ok, maps:get(?CHECKPOINT_HISTORY, Completed)};
        {ok, #{?STATE := ?ST_FAILED} = Failed} ->
            {error, maps:get(?STATE_INFO, Failed)};
        {error, _Reason} = WaitError ->
            WaitError
    end.
| |
%% Return the EJSON representation of every job that couch_jobs still
%% reports as pending or running. Finished jobs are skipped.
jobs() ->
    CollectActive = fun(_JTx, _JobId, JobState, JobData, Acc) ->
        case JobState of
            finished ->
                Acc;
            Active when Active =:= pending orelse Active =:= running ->
                [job_ejson(JobData) | Acc]
        end
    end,
    couch_replicator_jobs:fold_jobs(undefined, CollectActive, []).
| |
%% Look up a single job and render it as EJSON.
%%
%% The id is first normalized with couch_replicator_ids:convert/1. If a job
%% is registered for that id, its job id is used; otherwise the converted id
%% is tried directly as a job id. Returns `{error, not_found}` when no job
%% data exists.
job(Id0) when is_binary(Id0) ->
    Id = couch_replicator_ids:convert(Id0),
    JobId =
        case couch_replicator_jobs:get_job_id(undefined, Id) of
            {error, not_found} -> Id;
            {ok, Registered} -> Registered
        end,
    case couch_replicator_jobs:get_job_data(undefined, JobId) of
        {error, not_found} = NotFound -> NotFound;
        {ok, #{} = JobData} -> {ok, job_ejson(JobData)}
    end.
| |
%% Render every job whose `?DB_NAME` is this database as document EJSON.
%% `States` filters on the job's `?STATE`; an empty list means no filtering.
docs(#{} = Db, States) when is_list(States) ->
    DbName = fabric2_db:name(Db),
    AnyState = States =:= [],
    Collect = fun(_JTx, _JobId, _CouchJobsState, JobData, Acc) ->
        case JobData of
            #{?DB_NAME := DbName, ?STATE := State} ->
                case AnyState orelse lists:member(State, States) of
                    true -> [doc_ejson(JobData) | Acc];
                    false -> Acc
                end;
            #{} ->
                % The job belongs to another database (or is transient).
                Acc
        end
    end,
    couch_replicator_jobs:fold_jobs(undefined, Collect, []).
| |
%% Render the job for replication document `DocId` in `Db` as EJSON.
%% The job id is derived from the database UUID and the doc id.
%% Returns `{error, not_found}` when no such job exists.
doc(#{} = Db, DocId) when is_binary(DocId) ->
    JobId = couch_replicator_ids:job_id(fabric2_db:get_uuid(Db), DocId),
    case couch_replicator_jobs:get_job_data(undefined, JobId) of
        {error, not_found} = NotFound -> NotFound;
        {ok, #{} = JobData} -> {ok, doc_ejson(JobData)}
    end.
| |
%% When a replicator database is created, bump the stats counter and create
%% jobs for all of its documents inside a transaction. If the database has
%% already disappeared when it is opened, do nothing. Non-replicator
%% databases are ignored.
after_db_create(DbName, DbUUID) when ?IS_REP_DB(DbName) ->
    couch_stats:increment_counter([couch_replicator, docs, dbs_created]),
    OpenOpts = [{uuid, DbUUID}, ?ADMIN_CTX],
    try fabric2_db:open(DbName, OpenOpts) of
        {ok, Db} ->
            AddAll = fun(TxDb) -> ok = add_jobs_from_db(TxDb) end,
            fabric2_fdb:transactional(Db, AddAll)
    catch
        error:database_does_not_exist -> ok
    end;
after_db_create(_DbName, _DbUUID) ->
    ok.
| |
%% When a replicator database is deleted, bump the stats counter and remove
%% every job whose `?DB_UUID` equals the deleted database's UUID. Each
%% removal must succeed. Non-replicator databases are ignored.
after_db_delete(DbName, DbUUID) when ?IS_REP_DB(DbName) ->
    couch_stats:increment_counter([couch_replicator, docs, dbs_deleted]),
    RemoveOwned = fun(JTx, JobId, _CouchJobsState, JobData, ok) ->
        case JobData of
            #{?DB_UUID := DbUUID} ->
                % The job was created from a document of the deleted db.
                ok = couch_replicator_jobs:remove_job(JTx, JobId);
            #{} ->
                ok
        end
    end,
    couch_replicator_jobs:fold_jobs(undefined, RemoveOwned, ok);
after_db_delete(_DbName, _DbUUID) ->
    ok.
| |
%% Document-write hook (presumably invoked by fabric2 after each write).
%%
%% For replicator databases it bumps the `db_changes` counter and
%% reconciles the document's job via process_change/2. Documents whose
%% replication state field is already completed or failed are skipped.
%% Writes to other databases are ignored.
after_doc_write(
    #{name := DbName} = Db,
    #doc{} = Doc,
    _NewWinner,
    _OldWinner,
    _NewRevId,
    _Seq
) when ?IS_REP_DB(DbName) ->
    couch_stats:increment_counter([couch_replicator, docs, db_changes]),
    #doc{body = {Props}} = Doc,
    RepState = couch_util:get_value(?REPLICATION_STATE, Props),
    IsTerminal = RepState =:= ?ST_COMPLETED orelse RepState =:= ?ST_FAILED,
    case IsTerminal of
        true -> ok;
        false -> process_change(Db, Doc)
    end;
after_doc_write(_Db, _Doc, _NewWinner, _OldWinner, _NewRevId, _Seq) ->
    ok.
| |
% Used as a supervisor child start function. It must return `ignore`, which
% tells the supervisor that no child process was started.
-spec ensure_rep_db_exists() -> ignore.
ensure_rep_db_exists() ->
    couch_replicator_jobs:set_timeout(),
    case config:get_boolean("replicator", "create_replicator_db", false) of
        false -> ok;
        true -> ok = create_rep_db()
    end,
    ignore.

%% Create the default replicator database as a system db with an
%% _admin/_replicator user context. An already-existing database is fine.
create_rep_db() ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
    case fabric2_db:create(?REP_DB_NAME, [{user_ctx, AdminCtx}, sys_db]) of
        {ok, _Db} -> ok;
        {error, file_exists} -> ok
    end.
| |
| % Testing and debug functions |
| |
%% Re-run job creation for the default replicator database.
rescan_jobs() ->
    rescan_jobs(?REP_DB_NAME).

%% Re-run job creation for a replicator database, as if it had just been
%% created. Returns `database_does_not_exist` if it cannot be opened.
rescan_jobs(DbName) when is_binary(DbName), ?IS_REP_DB(DbName) ->
    try fabric2_db:open(DbName, [?ADMIN_CTX]) of
        {ok, Db} ->
            DbUUID = fabric2_db:get_uuid(Db),
            after_db_create(DbName, DbUUID)
    catch
        error:database_does_not_exist -> database_does_not_exist
    end.
| |
%% Drop and recreate all jobs of the default replicator database.
reenqueue_jobs() ->
    reenqueue_jobs(?REP_DB_NAME).

%% Drop every job belonging to the database, then recreate them from its
%% documents. Both steps must return `ok`. Returns `database_does_not_exist`
%% if the database cannot be opened.
reenqueue_jobs(DbName) when is_binary(DbName), ?IS_REP_DB(DbName) ->
    try fabric2_db:open(DbName, [?ADMIN_CTX]) of
        {ok, Db} ->
            DbUUID = fabric2_db:get_uuid(Db),
            Steps = [fun after_db_delete/2, fun after_db_create/2],
            lists:foreach(fun(Step) -> ok = Step(DbName, DbUUID) end, Steps)
    catch
        error:database_does_not_exist -> database_does_not_exist
    end.
| |
%% Remove every replication job.
remove_jobs() ->
    % Remove jobs in batches. One transaction per job would be slow when
    % there are many jobs, and a single transaction for all of them could
    % time out.
    BatchLimit = 250,
    FoldFun = fun
        (_JTx, JobId, _CouchJobsState, _JobData, Batch) when length(Batch) > BatchLimit ->
            % Flush the accumulated batch plus this job. Whatever
            % remove_jobs/2 returns becomes the new accumulator.
            couch_replicator_jobs:remove_jobs(undefined, [JobId | Batch]);
        (_JTx, JobId, _CouchJobsState, _JobData, Batch) ->
            [JobId | Batch]
    end,
    Remaining = couch_replicator_jobs:fold_jobs(undefined, FoldFun, []),
    % Flush the final partial batch; nothing may be left over.
    [] = couch_replicator_jobs:remove_jobs(undefined, Remaining),
    ok.
| |
%% Debug helper: return the job ids reported by
%% couch_replicator_jobs:get_job_ids/1 (called with `undefined` in place of
%% a transaction).
get_job_ids() ->
    Tx = undefined,
    couch_replicator_jobs:get_job_ids(Tx).
| |
| % Private functions |
| |
-spec start_transient_job(binary(), #{}) -> ok.
%% Add (or replace) the pending transient job `JobId` for `Rep` in a single
%% couch_jobs transaction.
%%
%% The job is created with null db name, db UUID and doc id, since it is not
%% backed by a replicator document. If an existing job has an equivalent rep
%% and is still pending or running, the request is a no-op. This keeps
%% repeated POSTs of the same replication idempotent: the running job is not
%% stopped and restarted.
start_transient_job(JobId, #{} = Rep) ->
    NewJobData = couch_replicator_jobs:new_job(Rep, null, null, null, ?ST_PENDING, null, null),
    AddOrKeep = fun(JTx) ->
        case couch_replicator_jobs:get_job_data(JTx, JobId) of
            {error, not_found} ->
                ok = couch_replicator_jobs:add_job(JTx, JobId, NewJobData);
            {ok, #{?REP := OldRep, ?STATE := State}} ->
                Unchanged = couch_replicator_utils:compare_reps(Rep, OldRep),
                StillActive = lists:member(State, [?ST_PENDING, ?ST_RUNNING]),
                case Unchanged andalso StillActive of
                    true ->
                        % A job with the same parameters is already active;
                        % leave it alone.
                        ok;
                    false ->
                        couch_replicator_jobs:add_job(JTx, JobId, NewJobData)
                end
        end
    end,
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), AddOrKeep).
| |
-spec cancel_replication(job_id()) ->
    {ok, {cancelled, binary()}} | {error, not_found}.
%% Remove job `JobId` in a couch_jobs transaction and report which
%% replication was cancelled.
cancel_replication(JobId) when is_binary(JobId) ->
    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(), fun(JTx) ->
        ReportedId = cancelled_rep_id(JTx, JobId),
        ?LOG_NOTICE(#{what => cancel_replication, in => replicator, id => ReportedId}),
        couch_log:notice("Canceling replication '~s'", [ReportedId]),
        case couch_replicator_jobs:remove_job(JTx, JobId) of
            ok -> {ok, {cancelled, ReportedId}};
            {error, not_found} = NotFound -> NotFound
        end
    end).

%% Id reported for a cancellation: the job's binary `?REP_ID` when present,
%% otherwise the job id itself.
cancelled_rep_id(JTx, JobId) ->
    case couch_replicator_jobs:get_job_data(JTx, JobId) of
        {ok, #{?REP_ID := RepId}} when is_binary(RepId) -> RepId;
        _NoRepId -> JobId
    end.
| |
%% Reconcile the replication job for one replicator-database document.
%%
%% - Design documents are ignored.
%% - A deleted document removes its job.
%% - A live document is parsed. If parsing throws `{bad_rep_doc, Reason}`,
%%   a failed job carrying the error (and a null rep) is recorded. Otherwise
%%   a pending job is built.
%%
%% The stored job is then updated in a couch_jobs transaction obtained from
%% `Db`, unless nothing relevant changed: either the same parse error is
%% recorded again, or an equivalent rep is still pending or running.
process_change(_Db, #doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) ->
    ok;
process_change(#{} = Db, #doc{deleted = true} = Doc) ->
    DbUUID = fabric2_db:get_uuid(Db),
    JobId = couch_replicator_ids:job_id(DbUUID, Doc#doc.id),
    couch_replicator_jobs:remove_job(undefined, JobId);
process_change(#{} = Db, #doc{deleted = false} = Doc) ->
    #doc{id = DocId, body = {Props} = Body} = Doc,
    DbName = fabric2_db:name(Db),
    DbUUID = fabric2_db:get_uuid(Db),
    {Rep, DocState, Error} =
        try
            Rep0 = couch_replicator_parse:parse_rep_doc(Body),
            DocState0 = couch_util:get_value(?REPLICATION_STATE, Props, null),
            {Rep0, DocState0, null}
        catch
            throw:{bad_rep_doc, Reason} ->
                {null, null, couch_replicator_utils:rep_error_to_binary(Reason)}
        end,
    JobId = couch_replicator_ids:job_id(DbUUID, DocId),
    JobData =
        case Rep of
            null ->
                % Invalid document: record a failed job carrying the error.
                % (This call previously named the misspelled module
                % `couch_relicator_jobs` and crashed with `undef`.)
                couch_replicator_jobs:new_job(
                    Rep,
                    DbName,
                    DbUUID,
                    DocId,
                    ?ST_FAILED,
                    Error,
                    null
                );
            #{} ->
                couch_replicator_jobs:new_job(
                    Rep,
                    DbName,
                    DbUUID,
                    DocId,
                    ?ST_PENDING,
                    null,
                    DocState
                )
        end,

    ?LOG_NOTICE(#{
        what => replication_update,
        db => DbName,
        docid => DocId,
        job_id => JobId,
        job_state => DocState
    }),
    LogMsg = "~p : replication doc update db:~s doc:~s job_id:~s doc_state:~s",
    couch_log:notice(LogMsg, [?MODULE, DbName, DocId, JobId, DocState]),

    couch_jobs_fdb:tx(couch_jobs_fdb:get_jtx(Db), fun(JTx) ->
        case couch_replicator_jobs:get_job_data(JTx, JobId) of
            {ok, #{?REP := null, ?STATE_INFO := Error}} when Rep =:= null ->
                % Same error as before occurred, don't bother updating the job
                ok;
            {ok, #{?REP := null}} when Rep =:= null ->
                % New error so the job is updated
                couch_replicator_jobs:add_job(JTx, JobId, JobData);
            {ok, #{?REP := #{} = OldRep, ?STATE := State}} when is_map(Rep) ->
                SameRep = couch_replicator_utils:compare_reps(Rep, OldRep),
                Active = State =:= ?ST_PENDING orelse State =:= ?ST_RUNNING,
                case SameRep andalso Active of
                    true ->
                        % Document was changed but none of the parameters
                        % relevant for the replication job have changed, so
                        % make it a no-op
                        ok;
                    false ->
                        couch_replicator_jobs:add_job(JTx, JobId, JobData)
                end;
            {ok, #{}} ->
                % The document switched between invalid and valid (exactly
                % one of the old and new reps is null). Previously this case
                % had no clause and crashed with `case_clause`; replace the
                % job instead.
                couch_replicator_jobs:add_job(JTx, JobId, JobData);
            {error, not_found} ->
                couch_replicator_jobs:add_job(JTx, JobId, JobData)
        end
    end).
| |
-spec add_jobs_from_db(#{}) -> ok.
%% Fold over every document in the database and run process_change/2 on
%% each one. Every call must return `ok`. Folding uses `restart_tx`.
%% process_change/2 receives a copy of the db handle with `tx` set to
%% `undefined`, while documents are read through the original handle.
add_jobs_from_db(#{} = TxDb) ->
    OnEvent = fun
        ({row, Row}, ok) ->
            NoTxDb = TxDb#{tx := undefined},
            RowDoc = get_doc(TxDb, Row),
            ok = process_change(NoTxDb, RowDoc),
            {ok, ok};
        ({meta, _Meta}, ok) ->
            {ok, ok};
        (complete, ok) ->
            {ok, ok}
    end,
    {ok, ok} = fabric2_db:fold_docs(TxDb, OnEvent, ok, [{restart_tx, true}]),
    ok.
| |
-spec get_doc(#{}, list()) -> #doc{}.
%% Open the document named by the `id` entry of a fold row. The document
%% must exist and must not be deleted, otherwise this crashes with badmatch.
get_doc(TxDb, Row) ->
    {id, DocId} = lists:keyfind(id, 1, Row),
    OpenResult = fabric2_db:open_doc(TxDb, DocId, []),
    {ok, #doc{deleted = false} = Doc} = OpenResult,
    Doc.
| |
%% Render a replicator-document job as the EJSON map returned by docs/2 and
%% doc/2.
%%
%% While the job is pending or running, `info` carries its replication
%% stats; otherwise it carries `?STATE_INFO`. URLs pass through
%% ejson_url/1, which masks credentials. Jobs recorded for documents that
%% failed to parse are stored with `?REP => null`. For those, the
%% source/target URLs, proxies and start time are reported as null instead
%% of crashing the whole listing.
doc_ejson(#{} = JobData) ->
    #{
        ?REP := Rep,
        ?REP_ID := RepId,
        ?DB_NAME := DbName,
        ?DOC_ID := DocId,
        ?STATE := State,
        ?STATE_INFO := Info0,
        ?ERROR_COUNT := ErrorCount,
        ?LAST_UPDATED := LastUpdatedSec,
        ?REP_NODE := Node,
        ?REP_PID := Pid,
        ?REP_STATS := Stats
    } = JobData,

    {Source, Target, SourceProxy, TargetProxy, StartISO8601} =
        case Rep of
            #{
                ?SOURCE := #{<<"url">> := Src, <<"proxy_url">> := SrcProxy},
                ?TARGET := #{<<"url">> := Tgt, <<"proxy_url">> := TgtProxy},
                ?START_TIME := StartSec
            } ->
                {Src, Tgt, SrcProxy, TgtProxy, couch_replicator_utils:iso8601(StartSec)};
            null ->
                % Invalid replication document: there is no parsed rep.
                {null, null, null, null, null}
        end,

    LastUpdatedISO8601 = couch_replicator_utils:iso8601(LastUpdatedSec),

    Info =
        case State of
            ?ST_RUNNING -> Stats;
            ?ST_PENDING -> Stats;
            _Other -> Info0
        end,

    #{
        <<"id">> => RepId,
        <<"database">> => DbName,
        <<"doc_id">> => DocId,
        <<"source">> => ejson_url(Source),
        <<"target">> => ejson_url(Target),
        <<"source_proxy">> => ejson_url(SourceProxy),
        <<"target_proxy">> => ejson_url(TargetProxy),
        <<"state">> => State,
        <<"info">> => Info,
        <<"error_count">> => ErrorCount,
        <<"last_updated">> => LastUpdatedISO8601,
        <<"start_time">> => StartISO8601,
        <<"node">> => Node,
        <<"pid">> => Pid
    }.
| |
%% Render a job as the EJSON map returned by jobs/0 and job/1.
%%
%% History event timestamps are converted to ISO 8601. While the job is
%% pending or running, `info` carries its replication stats; otherwise it
%% carries `?STATE_INFO`. Job data can carry `?REP => null` (jobs recorded
%% for replication documents that failed to parse), and job/1 can fetch
%% such a job directly. For those, source, target, user and start time are
%% reported as null instead of crashing.
job_ejson(#{} = JobData) ->
    #{
        ?REP := Rep,
        ?REP_ID := RepId,
        ?DB_NAME := DbName,
        ?DOC_ID := DocId,
        ?STATE := State,
        ?STATE_INFO := Info0,
        ?JOB_HISTORY := History,
        ?REP_STATS := Stats,
        ?REP_NODE := Node,
        ?REP_PID := Pid
    } = JobData,

    {Source, Target, User, StartISO8601} =
        case Rep of
            #{
                ?SOURCE := #{<<"url">> := Src},
                ?TARGET := #{<<"url">> := Tgt},
                ?REP_USER := RepUser,
                ?START_TIME := StartSec
            } ->
                {Src, Tgt, RepUser, couch_replicator_utils:iso8601(StartSec)};
            null ->
                % Invalid replication document: there is no parsed rep.
                {null, null, null, null}
        end,

    History1 = lists:map(
        fun(#{?HIST_TIMESTAMP := Ts} = Evt) ->
            Evt#{?HIST_TIMESTAMP := couch_replicator_utils:iso8601(Ts)}
        end,
        History
    ),

    Info =
        case State of
            ?ST_RUNNING -> Stats;
            ?ST_PENDING -> Stats;
            _Other -> Info0
        end,

    #{
        <<"id">> => RepId,
        <<"database">> => DbName,
        <<"doc_id">> => DocId,
        <<"source">> => ejson_url(Source),
        <<"target">> => ejson_url(Target),
        <<"state">> => State,
        <<"info">> => Info,
        <<"user">> => User,
        <<"history">> => History1,
        <<"start_time">> => StartISO8601,
        <<"node">> => Node,
        <<"pid">> => Pid
    }.
| |
%% Render an endpoint URL for EJSON output. A missing (null) URL stays null;
%% a binary URL has its credentials masked by strip_url_creds/1.
ejson_url(null) ->
    null;
ejson_url(Url) when is_binary(Url) ->
    strip_url_creds(Url).
| |
-spec strip_url_creds(binary()) -> binary() | null.
%% Mask the password in `Url`.
%%
%% The URL is first validated with ibrowse. Any input that does not parse
%% into a `#url{}` record, or that raises an `error` while being converted
%% or stripped, yields `null`. Nothing from a malformed URL is echoed back,
%% because it might contain a password.
strip_url_creds(Url) ->
    try
        #url{} = ibrowse_lib:parse_url(binary_to_list(Url)),
        iolist_to_binary(couch_util:url_strip_password(Url))
    catch
        error:_ ->
            null
    end.
| |
-spec check_authorization(rep_id(), #user_ctx{}) -> ok | not_found.
%% Decide whether the caller `Ctx` may act on transient job `JobId`.
%%
%% - No such job: `not_found`.
%% - The job belongs to a replicator database (binary `?DB_NAME`): throws
%%   `{unauthorized, <<"Persistent replication collision">>}`.
%% - The caller's name matches the job's `?REP_USER`: `ok`.
%% - Otherwise: delegate to couch_httpd:verify_is_server_admin/1.
check_authorization(JobId, #user_ctx{} = Ctx) when is_binary(JobId) ->
    authorize_job(couch_replicator_jobs:get_job_data(undefined, JobId), Ctx).

authorize_job({error, not_found}, _Ctx) ->
    not_found;
authorize_job({ok, #{?DB_NAME := DbName}}, _Ctx) when is_binary(DbName) ->
    throw({unauthorized, <<"Persistent replication collision">>});
authorize_job({ok, #{?REP := #{?REP_USER := Owner}}}, #user_ctx{name = Owner}) ->
    ok;
authorize_job({ok, #{}}, Ctx) ->
    couch_httpd:verify_is_server_admin(Ctx).
| |
| -ifdef(TEST). |
| |
| -include_lib("eunit/include/eunit.hrl"). |
| -include_lib("fabric/test/fabric2_test.hrl"). |
| |
%% check_authorization/2 tests. Each test installs its own mock and all
%% mocks are unloaded afterwards.
authorization_test_() ->
    Setup = fun() -> ok end,
    Cleanup = fun(_) -> meck:unload() end,
    Tests = [
        ?TDEF_FE(t_admin_is_always_authorized),
        ?TDEF_FE(t_username_must_match),
        ?TDEF_FE(t_replication_not_found)
    ],
    {foreach, Setup, Cleanup, Tests}.
| |
t_admin_is_always_authorized(_) ->
    % The job is owned by another user, yet an _admin caller is allowed.
    expect_job_data({ok, #{?REP => #{?REP_USER => <<"someuser">>}}}),
    AdminCtx = #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]},
    Result = check_authorization(<<"RepId">>, AdminCtx),
    ?assertEqual(ok, Result).
| |
t_username_must_match(_) ->
    % A non-admin caller is accepted only when it owns the job.
    expect_job_data({ok, #{?REP => #{?REP_USER => <<"user1">>}}}),
    Owner = #user_ctx{name = <<"user1">>, roles = [<<"somerole">>]},
    Stranger = #user_ctx{name = <<"other">>, roles = [<<"somerole">>]},
    ?assertEqual(ok, check_authorization(<<"RepId">>, Owner)),
    ?assertThrow({unauthorized, _}, check_authorization(<<"RepId">>, Stranger)).
| |
t_replication_not_found(_) ->
    % A missing job is reported as not_found regardless of the caller's roles.
    expect_job_data({error, not_found}),
    Callers = [
        #user_ctx{name = <<"user">>, roles = [<<"somerole">>]},
        #user_ctx{name = <<"adm">>, roles = [<<"_admin">>]}
    ],
    lists:foreach(
        fun(Ctx) -> ?assertEqual(not_found, check_authorization(<<"RepId">>, Ctx)) end,
        Callers
    ).
| |
%% Mock couch_replicator_jobs:get_job_data/2 to always return `Result`.
expect_job_data(Result) ->
    meck:expect(couch_replicator_jobs, get_job_data, 2, Result).
| |
%% strip_url_creds/1 tests. config:get/3 is stubbed to always return the
%% supplied default, and the mocks are unloaded afterwards.
strip_url_creds_test_() ->
    Setup = fun() ->
        meck:expect(config, get, fun(_Section, _Key, Default) -> Default end)
    end,
    Cleanup = fun(_) -> meck:unload() end,
    Tests = with([
        ?TDEF(t_strip_http_basic_creds),
        ?TDEF(t_strip_url_creds_errors)
    ]),
    {setup, Setup, Cleanup, Tests}.
| |
t_strip_http_basic_creds(_) ->
    % {Input, Expected}: only the password is masked; scheme, user, host,
    % port, path and query are preserved.
    Cases = [
        {<<"http://adm:pass@host/db/">>, <<"http://adm:*****@host/db/">>},
        {<<"https://adm:pass@host/db/">>, <<"https://adm:*****@host/db/">>},
        {<<"http://adm:pass@host:80/db/">>, <<"http://adm:*****@host:80/db/">>},
        {<<"http://adm:pass@host/db?a=b&c=d">>, <<"http://adm:*****@host/db?a=b&c=d">>}
    ],
    lists:foreach(
        fun({Url, Expected}) -> ?assertEqual(Expected, strip_url_creds(Url)) end,
        Cases
    ).
| |
t_strip_url_creds_errors(_) ->
    % Malformed URLs and non-binary inputs must all render as null, so no
    % fragment of a possibly credential-bearing URL is exposed. (The former
    % `Bad5` case was a copy-paste duplicate of the first URL and is dropped.)
    BadInputs = [
        <<"http://adm:pass/bad">>,
        <<"more garbage">>,
        <<"http://a:b:c">>,
        <<"http://adm:pass:pass/bad">>,
        null,
        42,
        [<<"a">>, <<"b">>]
    ],
    lists:foreach(
        fun(Bad) -> ?assertEqual(null, strip_url_creds(Bad)) end,
        BadInputs
    ).
| |
| -endif. |