blob: 7a6aec06fcfc76ead840e55ed4c75c9db2285a2f [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_peruser).
-behaviour(gen_server).
-behaviour(mem3_cluster).
-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/include/mem3.hrl").
% gen_server callbacks
-export([
start_link/0,
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-export([init_changes_handler/1, changes_handler/3]).
% mem3_cluster callbacks
-export([
cluster_stable/1,
cluster_unstable/1
]).
-record(changes_state, {
parent :: pid(),
db_name :: binary(),
delete_dbs :: boolean(),
changes_pid :: pid(),
changes_ref :: reference(),
q_for_peruser_db :: integer(),
peruser_dbname_prefix :: binary()
}).
-record(state, {
parent :: pid(),
db_name :: binary(),
delete_dbs :: boolean(),
states :: list(),
mem3_cluster_pid :: pid(),
cluster_stable :: boolean(),
q_for_peruser_db :: integer(),
peruser_dbname_prefix :: binary()
}).
-define(DEFAULT_USERDB_PREFIX, "userdb-").
-define(RELISTEN_DELAY, 5000).
% seconds
-define(DEFAULT_QUIET_PERIOD, 60).
% seconds
-define(DEFAULT_START_PERIOD, 5).
%%
%% Please leave in the commented-out couch_log:debug calls, thanks! — Jan
%%
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec init_state() -> #state{}.
init_state() ->
couch_log:debug("peruser: starting on node ~p in pid ~p", [node(), self()]),
case config:get_boolean("couch_peruser", "enable", false) of
false ->
couch_log:debug("peruser: disabled on node ~p", [node()]),
#state{};
true ->
couch_log:debug("peruser: enabled on node ~p", [node()]),
DbName = ?l2b(
config:get(
"couch_httpd_auth", "authentication_db", "_users"
)
),
ioq:set_io_priority({system, DbName}),
DeleteDbs = config:get_boolean("couch_peruser", "delete_dbs", false),
Q = config:get_integer("couch_peruser", "q", 1),
Prefix = config:get("couch_peruser", "database_prefix", ?DEFAULT_USERDB_PREFIX),
case couch_db:validate_dbname(Prefix) of
ok ->
ok;
Error ->
couch_log:error(
"couch_peruser can't proceed as illegal database prefix ~p.\n"
" Error: ~p",
[Prefix, Error]
),
throw(Error)
end,
% set up cluster-stable listener
Period = abs(
config:get_integer(
"couch_peruser",
"cluster_quiet_period",
?DEFAULT_QUIET_PERIOD
)
),
StartPeriod = abs(
config:get_integer(
"couch_peruser",
"cluster_start_period",
?DEFAULT_START_PERIOD
)
),
{ok, Mem3Cluster} = mem3_cluster:start_link(
?MODULE,
self(),
StartPeriod,
Period
),
#state{
parent = self(),
db_name = DbName,
delete_dbs = DeleteDbs,
mem3_cluster_pid = Mem3Cluster,
cluster_stable = false,
q_for_peruser_db = Q,
peruser_dbname_prefix = ?l2b(Prefix)
}
end.
-spec start_listening(State :: #state{}) -> #state{} | ok.
start_listening(#state{states = ChangesStates} = State) when
length(ChangesStates) > 0
->
% couch_log:debug("peruser: start_listening() already run on node ~p in pid ~p", [node(), self()]),
State;
start_listening(
#state{
db_name = DbName,
delete_dbs = DeleteDbs,
q_for_peruser_db = Q,
peruser_dbname_prefix = Prefix
} = State
) ->
% couch_log:debug("peruser: start_listening() on node ~p", [node()]),
try
States = lists:map(
fun(A) ->
S = #changes_state{
parent = State#state.parent,
db_name = A#shard.name,
delete_dbs = DeleteDbs,
q_for_peruser_db = Q,
peruser_dbname_prefix = Prefix
},
{Pid, Ref} = spawn_opt(
?MODULE, init_changes_handler, [S], [link, monitor]
),
S#changes_state{changes_pid = Pid, changes_ref = Ref}
end,
mem3:local_shards(DbName)
),
% couch_log:debug("peruser: start_listening() States ~p", [States]),
State#state{states = States, cluster_stable = true}
catch
error:database_does_not_exist ->
couch_log:warning(
"couch_peruser can't proceed as underlying database (~s) is missing, disables itself.",
[DbName]
),
config:set(
"couch_peruser",
"enable",
"false",
lists:concat([binary_to_list(DbName), " is missing"])
)
end.
-spec init_changes_handler(ChangesState :: #changes_state{}) -> ok.
init_changes_handler(#changes_state{db_name = DbName} = ChangesState) ->
% couch_log:debug("peruser: init_changes_handler() on DbName ~p", [DbName]),
ioq:set_io_priority({system, DbName}),
try
{ok, Db} = couch_db:open_int(DbName, [?ADMIN_CTX, sys_db]),
FunAcc = {fun ?MODULE:changes_handler/3, ChangesState},
(couch_changes:handle_db_changes(
#changes_args{feed = "continuous", timeout = infinity},
{json_req, null},
Db
))(
FunAcc
)
catch
error:database_does_not_exist ->
ok
end.
-type db_change() :: {atom(), tuple(), binary()}.
-spec changes_handler(
Change :: db_change(),
ResultType :: any(),
ChangesState :: #changes_state{}
) -> #changes_state{}.
changes_handler(
{change, {Doc}, _Prepend},
_ResType,
ChangesState = #changes_state{
db_name = DbName,
q_for_peruser_db = Q,
peruser_dbname_prefix = Prefix
}
) ->
% couch_log:debug("peruser: changes_handler() on DbName/Doc ~p/~p", [DbName, Doc]),
case couch_util:get_value(<<"id">>, Doc) of
<<"org.couchdb.user:", User/binary>> = DocId ->
case should_handle_doc(DbName, DocId) of
true ->
case couch_util:get_value(<<"deleted">>, Doc, false) of
false ->
UserDb = ensure_user_db(Prefix, User, Q),
ok = ensure_security(User, UserDb, fun add_user/3),
ChangesState;
true ->
case ChangesState#changes_state.delete_dbs of
true ->
_UserDb = delete_user_db(Prefix, User),
ChangesState;
false ->
UserDb = user_db_name(Prefix, User),
ok = ensure_security(User, UserDb, fun remove_user/3),
ChangesState
end
end;
false ->
ChangesState
end;
_ ->
ChangesState
end;
changes_handler(_Event, _ResType, ChangesState) ->
ChangesState.
-spec should_handle_doc(ShardName :: binary(), DocId :: binary()) -> boolean().
should_handle_doc(ShardName, DocId) ->
case is_stable() of
false ->
% when the cluster is unstable, we have already stopped all Listeners
% the next stable event will restart all listeners and pick up this
% doc change
couch_log:debug(
"peruser: skipping, cluster unstable ~s/~s",
[ShardName, DocId]
),
false;
true ->
should_handle_doc_int(ShardName, DocId)
end.
-spec should_handle_doc_int(
ShardName :: binary(),
DocId :: binary()
) -> boolean().
should_handle_doc_int(ShardName, DocId) ->
DbName = mem3:dbname(ShardName),
Live = [erlang:node() | erlang:nodes()],
Shards = mem3:shards(DbName, DocId),
Nodes = [N || #shard{node = N} <- Shards, lists:member(N, Live)],
case mem3:owner(DbName, DocId, Nodes) of
ThisNode when ThisNode =:= node() ->
couch_log:debug("peruser: handling ~s/~s", [DbName, DocId]),
% do the database action
true;
_OtherNode ->
couch_log:debug("peruser: skipping ~s/~s", [DbName, DocId]),
false
end.
-spec delete_user_db(Prefix :: binary(), User :: binary()) -> binary().
delete_user_db(Prefix, User) ->
UserDb = user_db_name(Prefix, User),
try
case fabric:delete_db(UserDb, [?ADMIN_CTX]) of
ok -> ok;
accepted -> ok
end
catch
error:database_does_not_exist ->
ok
end,
UserDb.
-spec ensure_user_db(Prefix :: binary(), User :: binary(), Q :: integer()) -> binary().
ensure_user_db(Prefix, User, Q) ->
UserDb = user_db_name(Prefix, User),
try
{ok, _DbInfo} = fabric:get_db_info(UserDb)
catch
error:database_does_not_exist ->
case fabric:create_db(UserDb, [?ADMIN_CTX, {q, integer_to_list(Q)}]) of
{error, file_exists} -> ok;
ok -> ok;
accepted -> ok
end
end,
UserDb.
-spec add_user(
User :: binary(),
Properties :: tuple(),
Acc :: tuple()
) -> tuple().
add_user(User, Prop, {Modified, SecProps}) ->
{PropValue} = couch_util:get_value(Prop, SecProps, {[]}),
Names = couch_util:get_value(<<"names">>, PropValue, []),
case lists:member(User, Names) of
true ->
{Modified, SecProps};
false ->
{true,
lists:keystore(
Prop,
1,
SecProps,
{Prop,
{lists:keystore(
<<"names">>,
1,
PropValue,
{<<"names">>, [User | Names]}
)}}
)}
end.
-spec remove_user(
User :: binary(),
Properties :: tuple(),
Acc :: tuple()
) -> tuple().
remove_user(User, Prop, {Modified, SecProps}) ->
{PropValue} = couch_util:get_value(Prop, SecProps, {[]}),
Names = couch_util:get_value(<<"names">>, PropValue, []),
case lists:member(User, Names) of
false ->
{Modified, SecProps};
true ->
{true,
lists:keystore(
Prop,
1,
SecProps,
{Prop,
{lists:keystore(
<<"names">>,
1,
PropValue,
{<<"names">>, lists:delete(User, Names)}
)}}
)}
end.
-spec ensure_security(
User :: binary(),
UserDb :: binary(),
TransformFun :: fun()
) -> ok.
ensure_security(User, UserDb, TransformFun) ->
case fabric:get_all_security(UserDb, [?ADMIN_CTX]) of
{error, no_majority} ->
% TODO: make sure this is still true: single node, ignore
ok;
{ok, Shards} ->
{_ShardInfo, {SecProps}} = hd(Shards),
% assert that shards have the same security object
true = lists:all(
fun({_, {SecProps1}}) ->
SecProps =:= SecProps1
end,
Shards
),
case
lists:foldl(
fun(Prop, SAcc) -> TransformFun(User, Prop, SAcc) end,
{false, SecProps},
[<<"admins">>, <<"members">>]
)
of
{false, _} ->
ok;
{true, SecProps1} ->
ok = fabric:set_security(UserDb, {SecProps1}, [?ADMIN_CTX])
end
end.
-spec user_db_name(Prefix :: binary(), User :: binary()) -> binary().
user_db_name(Prefix, User) ->
HexUser = list_to_binary(
[string:to_lower(integer_to_list(X, 16)) || <<X>> <= User]
),
<<Prefix/binary, HexUser/binary>>.
-spec exit_changes(State :: #state{}) -> ok.
exit_changes(State) ->
lists:foreach(
fun(ChangesState) ->
demonitor(ChangesState#changes_state.changes_ref, [flush]),
unlink(ChangesState#changes_state.changes_pid),
exit(ChangesState#changes_state.changes_pid, kill)
end,
State#state.states
).
-spec is_stable() -> true | false.
is_stable() ->
gen_server:call(?MODULE, is_stable).
-spec subscribe_for_changes() -> ok.
subscribe_for_changes() ->
config:subscribe_for_changes([
{"couch_httpd_auth", "authentication_db"},
"couch_peruser"
]).
% Mem3 cluster callbacks
% TODO: find out what type Server is
-spec cluster_unstable(Server :: any()) -> any().
cluster_unstable(Server) ->
gen_server:cast(Server, cluster_unstable),
Server.
% TODO: find out what type Server is
-spec cluster_stable(Server :: any()) -> any().
cluster_stable(Server) ->
gen_server:cast(Server, cluster_stable),
Server.
%% gen_server callbacks
-spec init(Options :: list()) -> {ok, #state{}}.
init([]) ->
ok = subscribe_for_changes(),
{ok, init_state()}.
handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
{reply, IsStable, State};
handle_call(_Msg, _From, State) ->
{reply, error, State}.
handle_cast(update_config, State) when State#state.states =/= undefined ->
exit_changes(State),
{noreply, init_state()};
handle_cast(update_config, _) ->
{noreply, init_state()};
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(cluster_unstable, State) when State#state.states =/= undefined ->
exit_changes(State),
{noreply, init_state()};
handle_cast(cluster_unstable, _) ->
{noreply, init_state()};
handle_cast(cluster_stable, State) ->
{noreply, start_listening(State)};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', _Ref, _, _, _Reason}, State) ->
{stop, normal, State};
handle_info({config_change, "couch_peruser", _, _, _}, State) ->
handle_cast(update_config, State);
handle_info(
{
config_change,
"couch_httpd_auth",
"authentication_db",
_,
_
},
State
) ->
handle_cast(update_config, State);
handle_info({gen_event_EXIT, _Handler, _Reason}, State) ->
erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener),
{noreply, State};
handle_info({'EXIT', _Pid, _Reason}, State) ->
erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener),
{noreply, State};
handle_info(restart_config_listener, State) ->
ok = subscribe_for_changes(),
{noreply, State};
handle_info(_Msg, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
%% Everything should be linked or monitored, let nature
%% take its course.
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.