| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(smoosh_server). |
| -behaviour(gen_server). |
| -vsn(4). |
| -behaviour(config_listener). |
| -include_lib("couch/include/couch_db.hrl"). |
| |
| % public api. |
| -export([ |
| start_link/0, |
| suspend/0, |
| resume/0, |
| enqueue/1, |
| sync_enqueue/1, |
| sync_enqueue/2, |
| handle_db_event/3, |
| status/0 |
| ]). |
| |
| -define(SECONDS_PER_MINUTE, 60). |
| |
| % gen_server api. |
| -export([ |
| init/1, |
| handle_call/3, |
| handle_cast/2, |
| handle_info/2, |
| code_change/3, |
| terminate/2 |
| ]). |
| |
| % config_listener api |
| -export([handle_config_change/5, handle_config_terminate/3]). |
| |
| % exported but for internal use. |
| -export([enqueue_request/2]). |
| -export([get_priority/2]). |
| |
| % exported for testing and debugging |
| -export([get_channel/1]). |
| |
| -ifdef(TEST). |
| -define(RELISTEN_DELAY, 50). |
| -else. |
| -define(RELISTEN_DELAY, 5000). |
| -endif. |
| |
| % private records. |
| |
| -record(state, { |
| db_channels = [], |
| view_channels = [], |
| tab, |
| event_listener, |
| waiting = #{}, |
| waiting_by_ref = #{} |
| }). |
| |
| -record(channel, { |
| name, |
| pid |
| }). |
| |
| % public functions. |
| |
| start_link() -> |
| gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). |
| |
| suspend() -> |
| gen_server:call(?MODULE, suspend). |
| |
| resume() -> |
| gen_server:call(?MODULE, resume). |
| |
| status() -> |
| gen_server:call(?MODULE, status). |
| |
| enqueue(Object) -> |
| gen_server:cast(?MODULE, {enqueue, Object}). |
| |
| sync_enqueue(Object) -> |
| gen_server:call(?MODULE, {enqueue, Object}). |
| |
| sync_enqueue(Object, Timeout) -> |
| gen_server:call(?MODULE, {enqueue, Object}, Timeout). |
| |
| handle_db_event(DbName, local_updated, St) -> |
| enqueue(DbName), |
| {ok, St}; |
| handle_db_event(DbName, updated, St) -> |
| enqueue(DbName), |
| {ok, St}; |
| handle_db_event(DbName, {index_commit, IdxName}, St) -> |
| enqueue({DbName, IdxName}), |
| {ok, St}; |
| handle_db_event(DbName, {index_collator_upgrade, IdxName}, St) -> |
| enqueue({DbName, IdxName}), |
| {ok, St}; |
| handle_db_event(_DbName, _Event, St) -> |
| {ok, St}. |
| |
| % for testing and debugging only |
| get_channel(ChannelName) -> |
| gen_server:call(?MODULE, {get_channel, ChannelName}). |
| |
| % gen_server functions. |
| |
| init([]) -> |
| process_flag(trap_exit, true), |
| ok = config:listen_for_changes(?MODULE, nil), |
| {ok, Pid} = start_event_listener(), |
| DbChannels = smoosh_utils:split( |
| config:get("smoosh", "db_channels", "upgrade_dbs,ratio_dbs,slack_dbs") |
| ), |
| ViewChannels = smoosh_utils:split( |
| config:get("smoosh", "view_channels", "upgrade_views,ratio_views,slack_views") |
| ), |
| Tab = ets:new(channels, [{keypos, #channel.name}]), |
| {ok, |
| create_missing_channels(#state{ |
| db_channels = DbChannels, |
| view_channels = ViewChannels, |
| event_listener = Pid, |
| tab = Tab |
| })}. |
| |
| handle_config_change("smoosh", "db_channels", L, _, _) -> |
| {ok, gen_server:cast(?MODULE, {new_db_channels, smoosh_utils:split(L)})}; |
| handle_config_change("smoosh", "view_channels", L, _, _) -> |
| {ok, gen_server:cast(?MODULE, {new_view_channels, smoosh_utils:split(L)})}; |
| handle_config_change(_, _, _, _, _) -> |
| {ok, nil}. |
| |
| handle_config_terminate(_Server, stop, _State) -> |
| ok; |
| handle_config_terminate(_Server, _Reason, _State) -> |
| erlang:send_after( |
| ?RELISTEN_DELAY, |
| whereis(?MODULE), |
| restart_config_listener |
| ). |
| |
| handle_call(status, _From, State) -> |
| Acc = ets:foldl(fun get_channel_status/2, [], State#state.tab), |
| {reply, {ok, Acc}, State}; |
| handle_call({enqueue, Object}, _From, State) -> |
| {noreply, NewState} = handle_cast({enqueue, Object}, State), |
| {reply, ok, NewState}; |
| handle_call(suspend, _From, State) -> |
| ets:foldl( |
| fun(#channel{name = Name, pid = P}, _) -> |
| Level = smoosh_utils:log_level("compaction_log_level", "debug"), |
| couch_log:Level("Suspending ~p", [Name]), |
| smoosh_channel:suspend(P) |
| end, |
| 0, |
| State#state.tab |
| ), |
| {reply, ok, State}; |
| handle_call(resume, _From, State) -> |
| ets:foldl( |
| fun(#channel{name = Name, pid = P}, _) -> |
| Level = smoosh_utils:log_level("compaction_log_level", "debug"), |
| couch_log:Level("Resuming ~p", [Name]), |
| smoosh_channel:resume(P) |
| end, |
| 0, |
| State#state.tab |
| ), |
| {reply, ok, State}; |
| handle_call({get_channel, ChannelName}, _From, #state{tab = Tab} = State) -> |
| {reply, {ok, channel_pid(Tab, ChannelName)}, State}. |
| |
| handle_cast({new_db_channels, Channels}, State) -> |
| [ |
| smoosh_channel:close(channel_pid(State#state.tab, C)) |
| || C <- State#state.db_channels -- Channels |
| ], |
| {noreply, create_missing_channels(State#state{db_channels = Channels})}; |
| handle_cast({new_view_channels, Channels}, State) -> |
| [ |
| smoosh_channel:close(channel_pid(State#state.tab, C)) |
| || C <- State#state.view_channels -- Channels |
| ], |
| {noreply, create_missing_channels(State#state{view_channels = Channels})}; |
| handle_cast({enqueue, Object}, #state{waiting = Waiting} = State) -> |
| case maps:is_key(Object, Waiting) of |
| true -> |
| {noreply, State}; |
| false -> |
| {_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]), |
| {noreply, add_enqueue_ref(Object, Ref, State)} |
| end. |
| |
| handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) -> |
| Level = smoosh_utils:log_level("compaction_log_level", "notice"), |
| couch_log:Level("update notifier died ~p", [Reason]), |
| {ok, Pid1} = start_event_listener(), |
| {noreply, State#state{event_listener = Pid1}}; |
| handle_info({'EXIT', Pid, Reason}, State) -> |
| Level = smoosh_utils:log_level("compaction_log_level", "notice"), |
| couch_log:Level("~p ~p died ~p", [?MODULE, Pid, Reason]), |
| case ets:match_object(State#state.tab, #channel{pid = Pid, _ = '_'}) of |
| [#channel{name = Name}] -> |
| ets:delete(State#state.tab, Name); |
| _ -> |
| ok |
| end, |
| {noreply, create_missing_channels(State)}; |
| handle_info({'DOWN', Ref, process, _, _}, #state{} = State) -> |
| {noreply, remove_enqueue_ref(Ref, State)}; |
| handle_info(restart_config_listener, State) -> |
| ok = config:listen_for_changes(?MODULE, nil), |
| {noreply, State}; |
| handle_info(_Msg, State) -> |
| {noreply, State}. |
| |
| terminate(_Reason, State) -> |
| ets:foldl( |
| fun(#channel{pid = P}, _) -> smoosh_channel:close(P) end, |
| 0, |
| State#state.tab |
| ), |
| ok. |
| |
| code_change(_OldVsn, State, _Extra) -> |
| {ok, State}. |
| |
| % private functions. |
| |
| add_enqueue_ref(Object, Ref, #state{} = State) when is_reference(Ref) -> |
| #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State, |
| Waiting1 = Waiting#{Object => Ref}, |
| WaitingByRef1 = WaitingByRef#{Ref => Object}, |
| State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}. |
| |
| remove_enqueue_ref(Ref, #state{} = State) when is_reference(Ref) -> |
| #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State, |
| {Object, WaitingByRef1} = maps:take(Ref, WaitingByRef), |
| {Ref, Waiting1} = maps:take(Object, Waiting), |
| State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}. |
| |
| get_channel_status(#channel{name = Name, pid = P}, Acc0) when is_pid(P) -> |
| try gen_server:call(P, status) of |
| {ok, Status} -> |
| [{Name, Status} | Acc0]; |
| _ -> |
| Acc0 |
| catch |
| _:_ -> |
| Acc0 |
| end; |
| get_channel_status(_, Acc0) -> |
| Acc0. |
| |
| start_event_listener() -> |
| couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]). |
| |
| enqueue_request(State, Object) -> |
| try |
| case find_channel(State, Object) of |
| false -> |
| ok; |
| {ok, Pid, Priority} -> |
| smoosh_channel:enqueue(Pid, Object, Priority) |
| end |
| catch |
| Class:Exception:Stack -> |
| couch_log:warning( |
| "~s: ~p ~p for ~s : ~p", |
| [ |
| ?MODULE, |
| Class, |
| Exception, |
| smoosh_utils:stringify(Object), |
| Stack |
| ] |
| ) |
| end. |
| |
| find_channel(#state{} = State, {Shard, GroupId}) -> |
| find_channel(State#state.tab, State#state.view_channels, {Shard, GroupId}); |
| find_channel(#state{} = State, DbName) -> |
| find_channel(State#state.tab, State#state.db_channels, DbName). |
| |
| find_channel(_Tab, [], _Object) -> |
| false; |
| find_channel(Tab, [Channel | Rest], Object) -> |
| Pid = channel_pid(Tab, Channel), |
| LastUpdated = smoosh_channel:last_updated(Pid, Object), |
| StalenessInSec = |
| config:get_integer("smoosh", "staleness", 5) * |
| ?SECONDS_PER_MINUTE, |
| Staleness = erlang:convert_time_unit(StalenessInSec, seconds, native), |
| Now = erlang:monotonic_time(), |
| Activated = smoosh_channel:is_activated(Pid), |
| StaleEnough = LastUpdated =:= false orelse Now - LastUpdated > Staleness, |
| case Activated andalso StaleEnough of |
| true -> |
| case smoosh_utils:ignore_db(Object) of |
| true -> |
| find_channel(Tab, Rest, Object); |
| _ -> |
| case get_priority(Channel, Object) of |
| 0 -> |
| find_channel(Tab, Rest, Object); |
| Priority -> |
| {ok, Pid, Priority} |
| end |
| end; |
| false -> |
| find_channel(Tab, Rest, Object) |
| end. |
| |
| channel_pid(Tab, Channel) -> |
| [#channel{pid = Pid}] = ets:lookup(Tab, Channel), |
| Pid. |
| |
| create_missing_channels(State) -> |
| create_missing_channels(State#state.tab, State#state.db_channels), |
| create_missing_channels(State#state.tab, State#state.view_channels), |
| State. |
| |
| create_missing_channels(_Tab, []) -> |
| ok; |
| create_missing_channels(Tab, [Channel | Rest]) -> |
| case ets:lookup(Tab, Channel) of |
| [] -> |
| {ok, Pid} = smoosh_channel:start_link(Channel), |
| true = ets:insert(Tab, [#channel{name = Channel, pid = Pid}]); |
| _ -> |
| ok |
| end, |
| create_missing_channels(Tab, Rest). |
| |
| get_priority(Channel, {Shard, GroupId}) -> |
| case couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of |
| {ok, Pid} -> |
| try |
| {ok, ViewInfo} = couch_index:get_info(Pid), |
| {SizeInfo} = couch_util:get_value(sizes, ViewInfo), |
| DiskSize = couch_util:get_value(file, SizeInfo), |
| ActiveSize = couch_util:get_value(active, SizeInfo), |
| NeedsUpgrade = needs_upgrade(ViewInfo), |
| get_priority(Channel, DiskSize, ActiveSize, NeedsUpgrade) |
| catch |
| exit:{timeout, _} -> |
| 0 |
| end; |
| {not_found, _Reason} -> |
| 0; |
| {database_does_not_exist, _Stack} -> |
| 0; |
| {error, Reason} -> |
| couch_log:warning( |
| "Failed to get group_pid for ~p ~p ~p: ~p", |
| [Channel, Shard, GroupId, Reason] |
| ), |
| 0 |
| end; |
| get_priority(Channel, DbName) when is_list(DbName) -> |
| get_priority(Channel, ?l2b(DbName)); |
| get_priority(Channel, DbName) when is_binary(DbName) -> |
| case couch_db:open_int(DbName, []) of |
| {ok, Db} -> |
| try |
| get_priority(Channel, Db) |
| after |
| couch_db:close(Db) |
| end; |
| Error = {not_found, no_db_file} -> |
| couch_log:warning( |
| "~p: Error getting priority for ~p: ~p", |
| [Channel, DbName, Error] |
| ), |
| 0 |
| end; |
| get_priority(Channel, Db) -> |
| {ok, DocInfo} = couch_db:get_db_info(Db), |
| {SizeInfo} = couch_util:get_value(sizes, DocInfo), |
| DiskSize = couch_util:get_value(file, SizeInfo), |
| ActiveSize = couch_util:get_value(active, SizeInfo), |
| NeedsUpgrade = needs_upgrade(DocInfo), |
| case db_changed(Channel, DocInfo) of |
| true -> get_priority(Channel, DiskSize, ActiveSize, NeedsUpgrade); |
| false -> 0 |
| end. |
| |
| get_priority(Channel, DiskSize, DataSize, NeedsUpgrade) -> |
| Priority = get_priority(Channel), |
| MinSize = to_number(Channel, "min_size", "1048576"), |
| MaxSize = to_number(Channel, "max_size", "infinity"), |
| DefaultMinPriority = |
| case Priority of |
| "slack" -> "536870912"; |
| _ -> "2.0" |
| end, |
| MinPriority = to_number(Channel, "min_priority", DefaultMinPriority), |
| MaxPriority = to_number(Channel, "max_priority", "infinity"), |
| if |
| Priority =:= "upgrade", NeedsUpgrade -> |
| 1; |
| DiskSize =< MinSize -> |
| 0; |
| DiskSize > MaxSize -> |
| 0; |
| DataSize =:= 0 -> |
| MinPriority; |
| Priority =:= "ratio", DiskSize / DataSize =< MinPriority -> |
| 0; |
| Priority =:= "ratio", DiskSize / DataSize > MaxPriority -> |
| 0; |
| Priority =:= "ratio" -> |
| DiskSize / DataSize; |
| Priority =:= "slack", DiskSize - DataSize =< MinPriority -> |
| 0; |
| Priority =:= "slack", DiskSize - DataSize > MaxPriority -> |
| 0; |
| Priority =:= "slack" -> |
| DiskSize - DataSize; |
| true -> |
| 0 |
| end. |
| |
| db_changed(Channel, Info) -> |
| case couch_util:get_value(compacted_seq, Info) of |
| undefined -> |
| true; |
| CompactedSeq -> |
| MinChanges = list_to_integer( |
| smoosh_utils:get(Channel, "min_changes", "0") |
| ), |
| UpdateSeq = couch_util:get_value(update_seq, Info), |
| UpdateSeq - CompactedSeq >= MinChanges |
| end. |
| |
| to_number(Channel, Name, Default) -> |
| case smoosh_utils:get(Channel, Name, Default) of |
| "infinity" -> |
| infinity; |
| Value -> |
| try |
| list_to_float(Value) |
| catch |
| error:badarg -> |
| list_to_integer(Value) |
| end |
| end. |
| |
| get_priority("ratio_dbs") -> |
| "ratio"; |
| get_priority("ratio_views") -> |
| "ratio"; |
| get_priority("slack_dbs") -> |
| "slack"; |
| get_priority("slack_views") -> |
| "slack"; |
| get_priority("upgrade_dbs") -> |
| "upgrade"; |
| get_priority("upgrade_views") -> |
| "upgrade"; |
| get_priority(Channel) -> |
| smoosh_utils:get(Channel, "priority", "ratio"). |
| |
| needs_upgrade(Props) -> |
| db_needs_upgrade(Props) orelse view_needs_upgrade(Props). |
| |
| db_needs_upgrade(Props) -> |
| DiskVersion = couch_util:get_value(disk_format_version, Props), |
| case couch_util:get_value(engine, Props) of |
| couch_bt_engine -> |
| (couch_bt_engine_header:latest(DiskVersion) =:= false); |
| _ -> |
| false |
| end. |
| |
| view_needs_upgrade(Props) -> |
| case couch_util:get_value(collator_versions, Props) of |
| undefined -> |
| false; |
| Versions when is_list(Versions) -> |
| Enabled = couch_mrview_util:compact_on_collator_upgrade(), |
| Enabled andalso length(Versions) >= 2 |
| end. |
| |
| -ifdef(TEST). |
| |
| -include_lib("couch/include/couch_eunit.hrl"). |
| |
| setup_all() -> |
| Ctx = setup_all_no_mock(), |
| Pid = list_to_pid("<0.0.0>"), |
| meck:expect(couch_index_server, get_index, 3, {ok, Pid}), |
| meck:expect(config, get, fun(_, _, Default) -> Default end), |
| Ctx. |
| |
| setup_all_no_mock() -> |
| Ctx = test_util:start_couch([couch_log]), |
| meck:new([config, couch_index, couch_index_server], [passthrough]), |
| meck:expect(config, get, fun(_, _, Default) -> Default end), |
| Ctx. |
| |
| teardown_all(Ctx) -> |
| meck:unload(), |
| test_util:stop_couch(Ctx). |
| |
| setup() -> |
| Shard = <<"shards/00000000-1fffffff/test.1529510412">>, |
| GroupId = <<"_design/ddoc">>, |
| {ok, Shard, GroupId}. |
| |
| teardown(_) -> |
| ok. |
| |
| config_change_test_() -> |
| { |
| "Test config updates", |
| { |
| foreach, |
| fun() -> test_util:start_couch([smoosh]) end, |
| fun test_util:stop_couch/1, |
| [ |
| ?TDEF_FE(t_restart_config_listener) |
| ] |
| } |
| }. |
| |
| get_priority_test_() -> |
| { |
| setup, |
| fun setup_all/0, |
| fun teardown_all/1, |
| { |
| foreach, |
| fun setup/0, |
| fun teardown/1, |
| [ |
| ?TDEF_FE(t_ratio_view), |
| ?TDEF_FE(t_slack_view), |
| ?TDEF_FE(t_no_data_view), |
| ?TDEF_FE(t_below_min_priority_view), |
| ?TDEF_FE(t_below_min_size_view), |
| ?TDEF_FE(t_timeout_view), |
| ?TDEF_FE(t_missing_view), |
| ?TDEF_FE(t_invalid_view) |
| ] |
| } |
| }. |
| |
| get_priority_no_mock_test_() -> |
| { |
| setup, |
| fun setup_all_no_mock/0, |
| fun teardown_all/1, |
| { |
| foreach, |
| fun setup/0, |
| fun teardown/1, |
| [ |
| ?TDEF_FE(t_missing_db) |
| ] |
| } |
| }. |
| |
| t_restart_config_listener(_) -> |
| ConfigMonitor = config_listener_mon(), |
| ?assert(is_process_alive(ConfigMonitor)), |
| test_util:stop_sync(ConfigMonitor), |
| ?assertNot(is_process_alive(ConfigMonitor)), |
| NewConfigMonitor = test_util:wait(fun() -> |
| case config_listener_mon() of |
| undefined -> wait; |
| Pid -> Pid |
| end |
| end), |
| ?assert(is_process_alive(NewConfigMonitor)). |
| |
| t_ratio_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index, get_info, fun(_) -> |
| {ok, [{sizes, {[{file, 5242880}, {active, 524288}]}}]} |
| end), |
| ?assertEqual(10.0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_slack_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index, get_info, fun(_) -> |
| {ok, [{sizes, {[{file, 1073741824}, {active, 536870911}]}}]} |
| end), |
| ?assertEqual(2.0000000037252903, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(536870913, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_no_data_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index, get_info, fun(_) -> |
| {ok, [{sizes, {[{file, 5242880}, {active, 0}]}}]} |
| end), |
| ?assertEqual(2.0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(536870912, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(2.0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_below_min_priority_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index, get_info, fun(_) -> |
| {ok, [{sizes, {[{file, 5242880}, {active, 1048576}]}}]} |
| end), |
| ?assertEqual(5.0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_below_min_size_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index, get_info, fun(_) -> |
| {ok, [{sizes, {[{file, 1048576}, {active, 512000}]}}]} |
| end), |
| ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_timeout_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index, get_info, fun(_) -> |
| exit({timeout, get_info}) |
| end), |
| ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_missing_db(_) -> |
| ShardGroup = {<<"shards/80000000-ffffffff/db2.1666895357">>, <<"x">>}, |
| ?assertEqual(0, get_priority("ratio_views", ShardGroup)), |
| ?assertEqual(0, get_priority("slack_views", ShardGroup)), |
| ?assertEqual(0, get_priority("upgrade_views", ShardGroup)). |
| |
| t_missing_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index_server, get_index, 3, {not_found, missing}), |
| ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_invalid_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index_server, get_index, 3, {error, undef}), |
| ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| config_listener_mon() -> |
| IsConfigMonitor = fun(P) -> |
| [M | _] = string:tokens(couch_debug:process_name(P), ":"), |
| M =:= "config_listener_mon" |
| end, |
| [{_, MonitoredBy}] = process_info(whereis(?MODULE), [monitored_by]), |
| case lists:filter(IsConfigMonitor, MonitoredBy) of |
| [Pid] -> Pid; |
| [] -> undefined |
| end. |
| |
| add_remove_enqueue_ref_test() -> |
| ObjCount = 10000, |
| ObjRefs = [{I, make_ref()} || I <- lists:seq(1, ObjCount)], |
| |
| St = lists:foldl( |
| fun({I, Ref}, #state{} = Acc) -> |
| add_enqueue_ref(I, Ref, Acc) |
| end, |
| #state{}, |
| ObjRefs |
| ), |
| |
| ?assertEqual(ObjCount, map_size(St#state.waiting)), |
| ?assertEqual(ObjCount, map_size(St#state.waiting_by_ref)), |
| |
| {_Objs, Refs} = lists:unzip(ObjRefs), |
| St1 = lists:foldl( |
| fun(Ref, #state{} = Acc) -> |
| remove_enqueue_ref(Ref, Acc) |
| end, |
| St, |
| Refs |
| ), |
| |
| % It's basically back to an initial (empty) state |
| ?assertEqual(St1, #state{}). |
| |
| -endif. |