| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(smoosh_server). |
| -behaviour(gen_server). |
| -behaviour(config_listener). |
| |
| % public api. |
| -export([ |
| start_link/0, |
| suspend/0, |
| resume/0, |
| flush/0, |
| enqueue/1, |
| sync_enqueue/1, |
| handle_db_event/3, |
| status/0 |
| ]). |
| |
| % gen_server api. |
| -export([ |
| init/1, |
| handle_continue/2, |
| handle_call/3, |
| handle_cast/2, |
| handle_info/2, |
| code_change/3, |
| terminate/2 |
| ]). |
| |
| % config_listener api |
| -export([handle_config_change/5, handle_config_terminate/3]). |
| |
| % exported but for internal use. |
| -export([ |
| enqueue_request/2, |
| get_priority/2, |
| update_access/1, |
| access_cleaner/0 |
| ]). |
| |
| -ifdef(TEST). |
| -define(STALENESS_MIN, 0). |
| -define(ACCEESS_CLEAN_INTERVAL_MSEC, 300). |
| -define(RELISTEN_DELAY, 50). |
| -else. |
| -define(STALENESS_MIN, 5). |
| -define(ACCEESS_CLEAN_INTERVAL_MSEC, 3000). |
| -define(RELISTEN_DELAY, 5000). |
| -endif. |
| |
| -define(INDEX_CLEANUP, index_cleanup). |
| -define(ACCESS, smoosh_access). |
| -define(ACCESS_MAX_SIZE, 250000). |
| -define(ACCESS_NEVER, -1 bsl 58). |
| |
| % private records. |
| |
| -record(state, { |
| db_channels = [], |
| view_channels = [], |
| cleanup_channels = [], |
| event_listener, |
| waiting = #{}, |
| waiting_by_ref = #{}, |
| access_cleaner |
| }). |
| |
| -record(channel, { |
| name, |
| pid, |
| stab |
| }). |
| |
| % public functions. |
| |
| start_link() -> |
| gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). |
| |
| suspend() -> |
| gen_server:call(?MODULE, suspend, infinity). |
| |
| resume() -> |
| gen_server:call(?MODULE, resume, infinity). |
| |
| flush() -> |
| gen_server:call(?MODULE, flush, infinity). |
| |
| status() -> |
| try ets:foldl(fun get_channel_status/2, [], ?MODULE) of |
| Res -> {ok, Res} |
| catch |
| error:badarg -> |
| {ok, []} |
| end. |
| |
| enqueue(Object0) -> |
| Object = smoosh_utils:validate_arg(Object0), |
| case stale_enough(Object) of |
| true -> gen_server:cast(?MODULE, {enqueue, Object}); |
| false -> ok |
| end. |
| |
| sync_enqueue(Object0) -> |
| Object = smoosh_utils:validate_arg(Object0), |
| case stale_enough(Object) of |
| true -> gen_server:call(?MODULE, {enqueue, Object}, infinity); |
| false -> ok |
| end. |
| |
| handle_db_event(DbName, local_updated, St) -> |
| enqueue(DbName), |
| {ok, St}; |
| handle_db_event(DbName, updated, St) -> |
| enqueue(DbName), |
| {ok, St}; |
| handle_db_event(DbName, ddoc_updated, St) -> |
| enqueue({?INDEX_CLEANUP, mem3:dbname(DbName)}), |
| {ok, St}; |
| handle_db_event(DbName, {index_commit, IdxName}, St) -> |
| enqueue({DbName, IdxName}), |
| {ok, St}; |
| handle_db_event(DbName, {index_collator_upgrade, IdxName}, St) -> |
| enqueue({DbName, IdxName}), |
| {ok, St}; |
| handle_db_event(_DbName, _Event, St) -> |
| {ok, St}. |
| |
| % gen_server functions. |
| |
| init([]) -> |
| process_flag(trap_exit, true), |
| process_flag(message_queue_data, off_heap), |
| ok = config:listen_for_changes(?MODULE, nil), |
| Opts = [named_table, {read_concurrency, true}], |
| ets:new(?MODULE, Opts ++ [{keypos, #channel.name}]), |
| ets:new(?ACCESS, Opts ++ [{write_concurrency, true}, public]), |
| State = #state{ |
| access_cleaner = spawn_link(?MODULE, access_cleaner, []), |
| db_channels = smoosh_utils:db_channels(), |
| view_channels = smoosh_utils:view_channels(), |
| cleanup_channels = smoosh_utils:cleanup_channels() |
| }, |
| {ok, State, {continue, create_channels}}. |
| |
| handle_config_change("smoosh", "db_channels", _, _, _) -> |
| {ok, gen_server:cast(?MODULE, new_db_channels)}; |
| handle_config_change("smoosh", "view_channels", _, _, _) -> |
| {ok, gen_server:cast(?MODULE, new_view_channels)}; |
| handle_config_change("smoosh", "cleanup_channels", _, _, _) -> |
| {ok, gen_server:cast(?MODULE, new_cleanup_channels)}; |
| handle_config_change(_, _, _, _, _) -> |
| {ok, nil}. |
| |
| handle_config_terminate(_Server, stop, _State) -> |
| ok; |
| handle_config_terminate(_Server, _Reason, _State) -> |
| erlang:send_after( |
| ?RELISTEN_DELAY, |
| whereis(?MODULE), |
| restart_config_listener |
| ). |
| |
| handle_continue(create_channels, #state{} = State) -> |
| % Warn users about smoosh persistence misconfiguration issues. Do it once |
| % on startup to avoid continuously spamming logs with errors. |
| smoosh_persist:check_setup(), |
| State1 = create_missing_channels(State), |
| {ok, Pid} = start_event_listener(), |
| {noreply, State1#state{event_listener = Pid}}. |
| |
| handle_call({enqueue, Object}, _From, State) -> |
| {noreply, NewState} = handle_cast({enqueue, Object}, State), |
| {reply, ok, NewState}; |
| handle_call(suspend, _From, State) -> |
| Fun = fun(#channel{name = Name, pid = P}, _) -> |
| Level = smoosh_utils:log_level("compaction_log_level", "debug"), |
| couch_log:Level("Suspending ~p", [Name]), |
| smoosh_channel:suspend(P) |
| end, |
| ets:foldl(Fun, ok, ?MODULE), |
| {reply, ok, State}; |
| handle_call(resume, _From, State) -> |
| Fun = fun(#channel{name = Name, pid = P}, _) -> |
| Level = smoosh_utils:log_level("compaction_log_level", "debug"), |
| couch_log:Level("Resuming ~p", [Name]), |
| smoosh_channel:resume(P) |
| end, |
| ets:foldl(Fun, ok, ?MODULE), |
| {reply, ok, State}; |
| handle_call(flush, _From, State) -> |
| Fun = fun(#channel{pid = P}, _) -> smoosh_channel:flush(P) end, |
| ets:foldl(Fun, ok, ?MODULE), |
| {reply, ok, State}. |
| |
| handle_cast(new_db_channels, #state{} = State) -> |
| Channels = smoosh_utils:db_channels(), |
| Closed = State#state.db_channels -- Channels, |
| [smoosh_channel:close(channel_pid(C)) || C <- Closed], |
| State1 = State#state{db_channels = Channels}, |
| {noreply, create_missing_channels(State1)}; |
| handle_cast(new_view_channels, #state{} = State) -> |
| Channels = smoosh_utils:view_channels(), |
| Closed = State#state.view_channels -- Channels, |
| [smoosh_channel:close(channel_pid(C)) || C <- Closed], |
| State1 = State#state{view_channels = Channels}, |
| {noreply, create_missing_channels(State1)}; |
| handle_cast(new_cleanup_channels, #state{} = State) -> |
| Channels = smoosh_utils:cleanup_channels(), |
| Closed = State#state.cleanup_channels -- Channels, |
| [smoosh_channel:close(channel_pid(C)) || C <- Closed], |
| State1 = State#state{cleanup_channels = Channels}, |
| {noreply, create_missing_channels(State1)}; |
| handle_cast({enqueue, Object}, #state{waiting = Waiting} = State) -> |
| case is_map_key(Object, Waiting) of |
| true -> |
| {noreply, State}; |
| false -> |
| {_Pid, Ref} = spawn_monitor(?MODULE, enqueue_request, [State, Object]), |
| {noreply, add_enqueue_ref(Object, Ref, State)} |
| end. |
| |
| handle_info({'EXIT', Pid, Reason}, #state{event_listener = Pid} = State) -> |
| Level = smoosh_utils:log_level("compaction_log_level", "notice"), |
| couch_log:Level("update notifier died ~p", [Reason]), |
| {ok, Pid1} = start_event_listener(), |
| {noreply, State#state{event_listener = Pid1}}; |
| handle_info({'EXIT', Pid, Reason}, #state{access_cleaner = Pid} = State) -> |
| Level = smoosh_utils:log_level("compaction_log_level", "notice"), |
| couch_log:Level("access cleaner died ~p", [Reason]), |
| Pid1 = spawn_link(?MODULE, access_cleaner, []), |
| {noreply, State#state{access_cleaner = Pid1}}; |
| handle_info({'EXIT', Pid, Reason}, State) -> |
| Level = smoosh_utils:log_level("compaction_log_level", "notice"), |
| couch_log:Level("~p ~p died ~p", [?MODULE, Pid, Reason]), |
| case ets:match_object(?MODULE, #channel{pid = Pid, _ = '_'}) of |
| [#channel{name = Name}] -> |
| ets:delete(?MODULE, Name); |
| _ -> |
| ok |
| end, |
| {noreply, create_missing_channels(State)}; |
| handle_info({'DOWN', Ref, process, _, _}, #state{} = State) -> |
| {noreply, remove_enqueue_ref(Ref, State)}; |
| handle_info(restart_config_listener, State) -> |
| ok = config:listen_for_changes(?MODULE, nil), |
| {noreply, State}; |
| handle_info(_Msg, State) -> |
| {noreply, State}. |
| |
| terminate(_Reason, #state{access_cleaner = CPid}) -> |
| catch unlink(CPid), |
| exit(CPid, kill), |
| Fun = fun(#channel{pid = P}, _) -> |
| smoosh_channel:close(P) |
| end, |
| ets:foldl(Fun, ok, ?MODULE). |
| |
| code_change(_OldVsn, State, _Extra) -> |
| {ok, State}. |
| |
| update_access(Object) -> |
| Now = erlang:monotonic_time(second), |
| true = ets:insert(?ACCESS, {Object, Now}), |
| ok. |
| |
| % private functions. |
| |
| add_enqueue_ref(Object, Ref, #state{} = State) when is_reference(Ref) -> |
| #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State, |
| Waiting1 = Waiting#{Object => Ref}, |
| WaitingByRef1 = WaitingByRef#{Ref => Object}, |
| State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}. |
| |
| remove_enqueue_ref(Ref, #state{} = State) when is_reference(Ref) -> |
| #state{waiting = Waiting, waiting_by_ref = WaitingByRef} = State, |
| {Object, WaitingByRef1} = maps:take(Ref, WaitingByRef), |
| {Ref, Waiting1} = maps:take(Object, Waiting), |
| State#state{waiting = Waiting1, waiting_by_ref = WaitingByRef1}. |
| |
| get_channel_status(#channel{name = Name, stab = Tab}, Acc) -> |
| Status = smoosh_channel:get_status(Tab), |
| [{Name, Status} | Acc]; |
| get_channel_status(_, Acc) -> |
| Acc. |
| |
| start_event_listener() -> |
| couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]). |
| |
| enqueue_request(State, Object) -> |
| try |
| case find_channel(State, Object) of |
| false -> |
| ok; |
| {ok, Pid, Priority} -> |
| case ets:info(?ACCESS, size) of |
| Size when Size =< ?ACCESS_MAX_SIZE -> |
| ok = update_access(Object); |
| _ -> |
| ok |
| end, |
| QuantizedPriority = quantize(Priority), |
| smoosh_channel:enqueue(Pid, Object, QuantizedPriority) |
| end |
| catch |
| Tag:Exception:Stack -> |
| Args = [?MODULE, Tag, Exception, smoosh_utils:stringify(Object), Stack], |
| couch_log:warning("~s: ~p ~p for ~s : ~p", Args), |
| ok |
| end. |
| |
| find_channel(#state{} = State, {?INDEX_CLEANUP, DbName}) -> |
| find_channel(State, State#state.cleanup_channels, {?INDEX_CLEANUP, DbName}); |
| find_channel(#state{} = State, {Shard, GroupId}) when is_binary(Shard) -> |
| find_channel(State, State#state.view_channels, {Shard, GroupId}); |
| find_channel(#state{} = State, DbName) -> |
| find_channel(State, State#state.db_channels, DbName). |
| |
| find_channel(#state{} = _State, [], _Object) -> |
| false; |
| find_channel(#state{} = State, [Channel | Rest], Object) -> |
| case stale_enough(Object) of |
| true -> |
| case smoosh_utils:ignore_db(Object) of |
| true -> |
| find_channel(State, Rest, Object); |
| _ -> |
| case get_priority(Channel, Object) of |
| 0 -> |
| find_channel(State, Rest, Object); |
| Priority -> |
| {ok, channel_pid(Channel), Priority} |
| end |
| end; |
| false -> |
| find_channel(State, Rest, Object) |
| end. |
| |
| stale_enough({?INDEX_CLEANUP, _}) -> |
| true; |
| stale_enough(Object) -> |
| LastUpdatedSec = last_updated(Object), |
| Staleness = erlang:monotonic_time(second) - LastUpdatedSec, |
| Staleness >= min_staleness_sec(). |
| |
| channel_pid(Channel) -> |
| [#channel{pid = Pid}] = ets:lookup(?MODULE, Channel), |
| Pid. |
| |
| create_missing_channels(#state{} = State) -> |
| create_missing_channels_type(State#state.db_channels), |
| create_missing_channels_type(State#state.view_channels), |
| create_missing_channels_type(State#state.cleanup_channels), |
| State. |
| |
| create_missing_channels_type([]) -> |
| ok; |
| create_missing_channels_type([Channel | Rest]) -> |
| case ets:lookup(?MODULE, Channel) of |
| [] -> |
| {ok, Pid} = smoosh_channel:start_link(Channel), |
| {ok, STab} = smoosh_channel:get_status_table(Pid), |
| Chan = #channel{ |
| name = Channel, |
| pid = Pid, |
| stab = STab |
| }, |
| true = ets:insert(?MODULE, Chan); |
| _ -> |
| ok |
| end, |
| create_missing_channels_type(Rest). |
| |
| get_priority(_Channel, {?INDEX_CLEANUP, DbName}) -> |
| try mem3:local_shards(mem3:dbname(DbName)) of |
| [_ | _] -> 1; |
| [] -> 0 |
| catch |
| error:database_does_not_exist -> |
| 0 |
| end; |
| get_priority(Channel, {Shard, GroupId}) -> |
| try couch_index_server:get_index(couch_mrview_index, Shard, GroupId) of |
| {ok, Pid} -> |
| try |
| {ok, ViewInfo} = couch_index:get_info(Pid), |
| {SizeInfo} = couch_util:get_value(sizes, ViewInfo), |
| DiskSize = couch_util:get_value(file, SizeInfo), |
| ActiveSize = couch_util:get_value(active, SizeInfo), |
| NeedsUpgrade = needs_upgrade(ViewInfo), |
| get_priority(Channel, DiskSize, ActiveSize, NeedsUpgrade) |
| catch |
| exit:{timeout, _} -> |
| 0 |
| end; |
| {not_found, _Reason} -> |
| 0; |
| {database_does_not_exist, _Stack} -> |
| 0; |
| {error, Reason} -> |
| couch_log:warning( |
| "Failed to get group_pid for ~p ~p ~p: ~p", |
| [Channel, Shard, GroupId, Reason] |
| ), |
| 0 |
| catch |
| throw:{not_found, _} -> |
| 0 |
| end; |
| get_priority(Channel, DbName) when is_binary(DbName) -> |
| case couch_db:open_int(DbName, []) of |
| {ok, Db} -> |
| try |
| get_priority(Channel, Db) |
| after |
| couch_db:close(Db) |
| end; |
| {not_found, no_db_file} -> |
| % It's expected that a db might be deleted while waiting in queue |
| 0 |
| end; |
| get_priority(Channel, Db) -> |
| {ok, DocInfo} = couch_db:get_db_info(Db), |
| {SizeInfo} = couch_util:get_value(sizes, DocInfo), |
| DiskSize = couch_util:get_value(file, SizeInfo), |
| ActiveSize = couch_util:get_value(active, SizeInfo), |
| NeedsUpgrade = needs_upgrade(DocInfo), |
| case db_changed(Channel, DocInfo) of |
| true -> get_priority(Channel, DiskSize, ActiveSize, NeedsUpgrade); |
| false -> 0 |
| end. |
| |
| get_priority(Channel, DiskSize, DataSize, NeedsUpgrade) -> |
| Priority = get_priority(Channel), |
| MinSize = to_number(Channel, "min_size", "1048576"), |
| MaxSize = to_number(Channel, "max_size", "infinity"), |
| DefaultMinPriority = |
| case Priority of |
| "slack" -> "536870912"; |
| _ -> "2.0" |
| end, |
| MinPriority = to_number(Channel, "min_priority", DefaultMinPriority), |
| MaxPriority = to_number(Channel, "max_priority", "infinity"), |
| if |
| Priority =:= "upgrade", NeedsUpgrade -> |
| 1; |
| DiskSize =< MinSize -> |
| 0; |
| DiskSize > MaxSize -> |
| 0; |
| DataSize =:= 0 -> |
| MinPriority; |
| Priority =:= "ratio", DiskSize / DataSize =< MinPriority -> |
| 0; |
| Priority =:= "ratio", DiskSize / DataSize > MaxPriority -> |
| 0; |
| Priority =:= "ratio" -> |
| DiskSize / DataSize; |
| Priority =:= "slack", DiskSize - DataSize =< MinPriority -> |
| 0; |
| Priority =:= "slack", DiskSize - DataSize > MaxPriority -> |
| 0; |
| Priority =:= "slack" -> |
| DiskSize - DataSize; |
| true -> |
| 0 |
| end. |
| |
| db_changed(Channel, Info) -> |
| case couch_util:get_value(compacted_seq, Info) of |
| undefined -> |
| true; |
| CompactedSeq -> |
| MinChanges = list_to_integer( |
| smoosh_utils:get(Channel, "min_changes", "0") |
| ), |
| UpdateSeq = couch_util:get_value(update_seq, Info), |
| UpdateSeq - CompactedSeq >= MinChanges |
| end. |
| |
| to_number(Channel, Name, Default) -> |
| case smoosh_utils:get(Channel, Name, Default) of |
| "infinity" -> |
| infinity; |
| Value -> |
| try |
| list_to_float(Value) |
| catch |
| error:badarg -> |
| list_to_integer(Value) |
| end |
| end. |
| |
| get_priority("ratio_dbs") -> |
| "ratio"; |
| get_priority("ratio_views") -> |
| "ratio"; |
| get_priority("slack_dbs") -> |
| "slack"; |
| get_priority("slack_views") -> |
| "slack"; |
| get_priority("upgrade_dbs") -> |
| "upgrade"; |
| get_priority("upgrade_views") -> |
| "upgrade"; |
| get_priority(Channel) -> |
| smoosh_utils:get(Channel, "priority", "ratio"). |
| |
| needs_upgrade(Props) -> |
| db_needs_upgrade(Props) orelse view_needs_upgrade(Props). |
| |
| db_needs_upgrade(Props) -> |
| DiskVersion = couch_util:get_value(disk_format_version, Props), |
| case couch_util:get_value(engine, Props) of |
| couch_bt_engine -> |
| (couch_bt_engine_header:latest(DiskVersion) =:= false); |
| _ -> |
| false |
| end. |
| |
| view_needs_upgrade(Props) -> |
| case couch_util:get_value(collator_versions, Props) of |
| undefined -> |
| false; |
| Versions when is_list(Versions) -> |
| Enabled = couch_mrview_util:compact_on_collator_upgrade(), |
| Enabled andalso length(Versions) >= 2 |
| end. |
| |
| access_cleaner() -> |
| JitterMSec = rand:uniform(?ACCEESS_CLEAN_INTERVAL_MSEC), |
| timer:sleep(?ACCEESS_CLEAN_INTERVAL_MSEC + JitterMSec), |
| NowSec = erlang:monotonic_time(second), |
| Limit = NowSec - min_staleness_sec(), |
| Head = {'_', '$1'}, |
| Guard = {'<', '$1', Limit}, |
| ets:select_delete(?ACCESS, [{Head, [Guard], [true]}]), |
| access_cleaner(). |
| |
| min_staleness_sec() -> |
| Min = config:get_integer("smoosh", "staleness", ?STALENESS_MIN), |
| Min * 60. |
| |
| last_updated(Object) -> |
| try ets:lookup(?ACCESS, Object) of |
| [{_, AccessSec}] -> |
| AccessSec; |
| [] -> |
| ?ACCESS_NEVER |
| catch |
| error:badarg -> |
| 0 |
| end. |
| |
| quantize(Ratio) when is_integer(Ratio) -> |
| Ratio; |
| quantize(Ratio) when is_float(Ratio), Ratio >= 16 -> |
| round(Ratio); |
| quantize(Ratio) when is_float(Ratio) -> |
| round(Ratio * 16) / 16. |
| |
| -ifdef(TEST). |
| |
| -include_lib("couch/include/couch_eunit.hrl"). |
| |
| setup_all() -> |
| Ctx = setup_all_no_mock(), |
| Pid = list_to_pid("<0.0.0>"), |
| meck:expect(couch_index_server, get_index, 3, {ok, Pid}), |
| meck:expect(config, get, fun(_, _, Default) -> Default end), |
| Ctx. |
| |
| setup_all_no_mock() -> |
| Ctx = test_util:start_couch([couch_log]), |
| meck:new([config, couch_index, couch_index_server], [passthrough]), |
| meck:expect(config, get, fun(_, _, Default) -> Default end), |
| Ctx. |
| |
| teardown_all(Ctx) -> |
| meck:unload(), |
| test_util:stop_couch(Ctx). |
| |
| setup() -> |
| Shard = <<"shards/00000000-1fffffff/test.1529510412">>, |
| GroupId = <<"_design/ddoc">>, |
| {ok, Shard, GroupId}. |
| |
| teardown(_) -> |
| ok. |
| |
| config_change_test_() -> |
| { |
| "Test config updates", |
| { |
| foreach, |
| fun() -> test_util:start_couch([smoosh]) end, |
| fun test_util:stop_couch/1, |
| [ |
| ?TDEF_FE(t_restart_config_listener) |
| ] |
| } |
| }. |
| |
| get_priority_test_() -> |
| { |
| setup, |
| fun setup_all/0, |
| fun teardown_all/1, |
| { |
| foreach, |
| fun setup/0, |
| fun teardown/1, |
| [ |
| ?TDEF_FE(t_ratio_view), |
| ?TDEF_FE(t_slack_view), |
| ?TDEF_FE(t_no_data_view), |
| ?TDEF_FE(t_below_min_priority_view), |
| ?TDEF_FE(t_below_min_size_view), |
| ?TDEF_FE(t_timeout_view), |
| ?TDEF_FE(t_missing_view), |
| ?TDEF_FE(t_invalid_view) |
| ] |
| } |
| }. |
| |
| get_priority_no_mock_test_() -> |
| { |
| setup, |
| fun setup_all_no_mock/0, |
| fun teardown_all/1, |
| { |
| foreach, |
| fun setup/0, |
| fun teardown/1, |
| [ |
| ?TDEF_FE(t_missing_db) |
| ] |
| } |
| }. |
| |
| t_restart_config_listener(_) -> |
| ConfigMonitor = config_listener_mon(), |
| ?assert(is_process_alive(ConfigMonitor)), |
| test_util:stop_sync(ConfigMonitor), |
| ?assertNot(is_process_alive(ConfigMonitor)), |
| NewConfigMonitor = test_util:wait(fun() -> |
| case config_listener_mon() of |
| undefined -> wait; |
| Pid -> Pid |
| end |
| end), |
| ?assert(is_process_alive(NewConfigMonitor)). |
| |
| t_ratio_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index, get_info, fun(_) -> |
| {ok, [{sizes, {[{file, 5242880}, {active, 524288}]}}]} |
| end), |
| ?assertEqual(10.0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_slack_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index, get_info, fun(_) -> |
| {ok, [{sizes, {[{file, 1073741824}, {active, 536870911}]}}]} |
| end), |
| ?assertEqual(2.0000000037252903, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(536870913, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_no_data_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index, get_info, fun(_) -> |
| {ok, [{sizes, {[{file, 5242880}, {active, 0}]}}]} |
| end), |
| ?assertEqual(2.0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(536870912, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(2.0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_below_min_priority_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index, get_info, fun(_) -> |
| {ok, [{sizes, {[{file, 5242880}, {active, 1048576}]}}]} |
| end), |
| ?assertEqual(5.0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_below_min_size_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index, get_info, fun(_) -> |
| {ok, [{sizes, {[{file, 1048576}, {active, 512000}]}}]} |
| end), |
| ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_timeout_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index, get_info, fun(_) -> |
| exit({timeout, get_info}) |
| end), |
| ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_missing_db(_) -> |
| ShardGroup = {<<"shards/80000000-ffffffff/db2.1666895357">>, <<"x">>}, |
| ?assertEqual(0, get_priority("ratio_views", ShardGroup)), |
| ?assertEqual(0, get_priority("slack_views", ShardGroup)), |
| ?assertEqual(0, get_priority("upgrade_views", ShardGroup)). |
| |
| t_missing_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index_server, get_index, 3, {not_found, missing}), |
| ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| t_invalid_view({ok, Shard, GroupId}) -> |
| meck:expect(couch_index_server, get_index, 3, {error, undef}), |
| ?assertEqual(0, get_priority("ratio_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("slack_views", {Shard, GroupId})), |
| ?assertEqual(0, get_priority("upgrade_views", {Shard, GroupId})). |
| |
| config_listener_mon() -> |
| IsConfigMonitor = fun(P) -> |
| [M | _] = string:tokens(couch_debug:process_name(P), ":"), |
| M =:= "config_listener_mon" |
| end, |
| [{_, MonitoredBy}] = process_info(whereis(?MODULE), [monitored_by]), |
| case lists:filter(IsConfigMonitor, MonitoredBy) of |
| [Pid] -> Pid; |
| [] -> undefined |
| end. |
| |
| add_remove_enqueue_ref_test() -> |
| ObjCount = 10000, |
| ObjRefs = [{I, make_ref()} || I <- lists:seq(1, ObjCount)], |
| |
| St = lists:foldl( |
| fun({I, Ref}, #state{} = Acc) -> |
| add_enqueue_ref(I, Ref, Acc) |
| end, |
| #state{}, |
| ObjRefs |
| ), |
| |
| ?assertEqual(ObjCount, map_size(St#state.waiting)), |
| ?assertEqual(ObjCount, map_size(St#state.waiting_by_ref)), |
| |
| {_Objs, Refs} = lists:unzip(ObjRefs), |
| St1 = lists:foldl( |
| fun(Ref, #state{} = Acc) -> |
| remove_enqueue_ref(Ref, Acc) |
| end, |
| St, |
| Refs |
| ), |
| |
| % It's basically back to an initial (empty) state |
| ?assertEqual(St1, #state{}). |
| |
| quantize_test() -> |
| ?assertEqual(0, quantize(0)), |
| ?assertEqual(1, quantize(1)), |
| ?assertEqual(0.0, quantize(0.0)), |
| ?assertEqual(16, quantize(16.0)), |
| ?assertEqual(15.0, quantize(15.0)), |
| ?assertEqual(0.0, quantize(0.01)), |
| ?assertEqual(0.125, quantize(0.1)), |
| ?assertEqual(0.125, quantize(0.1042)), |
| ?assertEqual(0.125, quantize(0.125111111111)), |
| ?assertEqual(10.0, quantize(10.0002)). |
| |
| -endif. |