| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_compaction_daemon). |
| -behaviour(gen_server). |
| -vsn(1). |
| -behaviour(config_listener). |
| |
| % public API |
| -export([start_link/0, in_progress/0]). |
| |
| % gen_server callbacks |
| -export([init/1, handle_call/3, handle_info/2, handle_cast/2]). |
| -export([code_change/3, terminate/2]). |
| |
| % config_listener api |
| -export([handle_config_change/5, handle_config_terminate/3]). |
| |
| -include_lib("couch/include/couch_db.hrl"). |
| -include_lib("kernel/include/file.hrl"). |
| |
| -define(CONFIG_ETS, couch_compaction_daemon_config). |
| |
| -define(RELISTEN_DELAY, 5000). |
| |
| -record(state, { |
| loop_pid, |
| in_progress = [] |
| }). |
| |
| -record(config, { |
| db_frag = nil, |
| view_frag = nil, |
| period = nil, |
| cancel = false, |
| parallel_view_compact = false |
| }). |
| |
| -record(period, { |
| from = nil, |
| to = nil |
| }). |
| |
| |
| start_link() -> |
| gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). |
| |
| in_progress() -> |
| gen_server:call(?MODULE, in_progress). |
| |
| init(_) -> |
| process_flag(trap_exit, true), |
| ?CONFIG_ETS = ets:new(?CONFIG_ETS, [named_table, set, protected]), |
| ok = config:listen_for_changes(?MODULE, nil), |
| load_config(), |
| Server = self(), |
| Loop = spawn_link(fun() -> compact_loop(Server) end), |
| {ok, #state{loop_pid = Loop}}. |
| |
| |
| handle_cast({config_update, DbName, deleted}, State) -> |
| true = ets:delete(?CONFIG_ETS, ?l2b(DbName)), |
| {noreply, State}; |
| |
| handle_cast({config_update, DbName, Config}, #state{loop_pid = Loop} = State) -> |
| case parse_config(DbName, Config) of |
| {ok, NewConfig} -> |
| WasEmpty = (ets:info(?CONFIG_ETS, size) =:= 0), |
| true = ets:insert(?CONFIG_ETS, {?l2b(DbName), NewConfig}), |
| case WasEmpty of |
| true -> |
| Loop ! {self(), have_config}; |
| false -> |
| ok |
| end; |
| error -> |
| ok |
| end, |
| {noreply, State}. |
| |
| |
| handle_call({start, DbName}, {Pid, _}, |
| #state{loop_pid = Pid, in_progress = InProgress} = State) -> |
| {reply, ok, State#state{in_progress = [DbName|InProgress]}}; |
| handle_call({stop, DbName}, {Pid, _}, |
| #state{loop_pid = Pid, in_progress = InProgress} = State) -> |
| {reply, ok, State#state{in_progress = InProgress -- [DbName]}}; |
| handle_call(in_progress, _From, #state{in_progress = InProgress} = State) -> |
| {reply, InProgress, State}; |
| handle_call(Msg, _From, State) -> |
| {stop, {unexpected_call, Msg}, State}. |
| |
| |
| handle_info({'EXIT', Pid, Reason}, #state{loop_pid = Pid} = State) -> |
| {stop, {compaction_loop_died, Reason}, State}; |
| handle_info(restart_config_listener, State) -> |
| ok = config:listen_for_changes(?MODULE, nil), |
| {noreply, State}. |
| |
| |
| terminate(_Reason, _State) -> |
| true = ets:delete(?CONFIG_ETS). |
| |
| |
| code_change(_OldVsn, State, _Extra) -> |
| {ok, State}. |
| |
| |
| handle_config_change("compactions", DbName, Value, _, _) -> |
| {ok, gen_server:cast(?MODULE, {config_update, DbName, Value})}; |
| handle_config_change(_, _, _, _, _) -> |
| {ok, nil}. |
| |
| handle_config_terminate(_, stop, _) -> |
| ok; |
| handle_config_terminate(_Server, _Reason, _State) -> |
| erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener). |
| |
| compact_loop(Parent) -> |
| SnoozePeriod = config:get_integer("compaction_daemon", "snooze_period", 3), |
| {ok, _} = couch_server:all_databases( |
| fun(DbName, Acc) -> |
| case ets:info(?CONFIG_ETS, size) =:= 0 of |
| true -> |
| {stop, Acc}; |
| false -> |
| case get_db_config(DbName) of |
| nil -> |
| ok; |
| {ok, Config} -> |
| case check_period(Config) of |
| true -> |
| maybe_compact_db(Parent, DbName, Config), |
| ok = timer:sleep(SnoozePeriod * 1000); |
| false -> |
| ok |
| end |
| end, |
| {ok, Acc} |
| end |
| end, ok), |
| case ets:info(?CONFIG_ETS, size) =:= 0 of |
| true -> |
| receive {Parent, have_config} -> ok end; |
| false -> |
| PausePeriod = config:get_integer("compaction_daemon", "check_interval", 3600), |
| ok = timer:sleep(PausePeriod * 1000) |
| end, |
| compact_loop(Parent). |
| |
| |
| maybe_compact_db(Parent, DbName, Config) -> |
| case (catch couch_db:open_int(DbName, [?ADMIN_CTX])) of |
| {ok, Db} -> |
| DDocNames = db_ddoc_names(Db), |
| case can_db_compact(Config, Db) of |
| true -> |
| gen_server:call(Parent, {start, DbName}), |
| {ok, _} = couch_db:start_compact(Db), |
| TimeLeft = compact_time_left(Config), |
| case Config#config.parallel_view_compact of |
| true -> |
| ViewsCompactPid = spawn_link(fun() -> |
| maybe_compact_views(DbName, DDocNames, Config) |
| end), |
| ViewsMonRef = erlang:monitor(process, ViewsCompactPid); |
| false -> |
| ViewsCompactPid = nil, |
| ViewsMonRef = nil |
| end, |
| case couch_db:wait_for_compaction(Db, TimeLeft) of |
| ok -> |
| couch_db:close(Db), |
| case Config#config.parallel_view_compact of |
| true -> ok; |
| false -> maybe_compact_views(DbName, DDocNames, Config) |
| end; |
| {error, timeout} -> |
| couch_log:info("Compaction daemon - canceling compaction " |
| "for databaes `~s` because exceeded the allowed time.", |
| [DbName]), |
| ok = couch_db:cancel_compact(Db), |
| couch_db:close(Db); |
| {error, Reason} -> |
| couch_db:close(Db), |
| couch_log:error("Compaction daemon - an error ocurred while" |
| " compacting the database `~s`: ~p", [DbName, Reason]) |
| end, |
| case ViewsMonRef of |
| nil -> |
| ok; |
| _ -> |
| receive |
| {'DOWN', ViewsMonRef, process, _, _Reason} -> |
| ok |
| after TimeLeft + 1000 -> |
| % Under normal circunstances, the view compaction process |
| % should have finished already. |
| erlang:demonitor(ViewsMonRef, [flush]), |
| unlink(ViewsCompactPid), |
| exit(ViewsCompactPid, kill) |
| end |
| end, |
| gen_server:call(Parent, {stop, DbName}); |
| false -> |
| couch_db:close(Db), |
| maybe_compact_views(DbName, DDocNames, Config) |
| end; |
| _ -> |
| ok |
| end. |
| |
| |
| maybe_compact_views(_DbName, [], _Config) -> |
| ok; |
| maybe_compact_views(DbName, [DDocName | Rest], Config) -> |
| case check_period(Config) of |
| true -> |
| case maybe_compact_view(DbName, DDocName, Config) of |
| ok -> |
| maybe_compact_views(DbName, Rest, Config); |
| timeout -> |
| ok |
| end, |
| SnoozePeriod = config:get_integer("compaction_daemon", "snooze_period", 3), |
| ok = timer:sleep(SnoozePeriod * 1000); |
| false -> |
| ok |
| end. |
| |
| |
| db_ddoc_names(Db) -> |
| FoldFun = fun ddoc_name/2, |
| Opts = [{start_key, <<"_design/">>}], |
| {ok, DDocNames} = couch_db:fold_docs(Db, FoldFun, [], Opts), |
| DDocNames. |
| |
| ddoc_name(#full_doc_info{id = <<"_design/", _/binary>>, deleted = true}, Acc) -> |
| {ok, Acc}; |
| ddoc_name(#full_doc_info{id = <<"_design/", Id/binary>>}, Acc) -> |
| {ok, [Id | Acc]}; |
| ddoc_name(_, Acc) -> |
| {stop, Acc}. |
| |
| |
| maybe_compact_view(DbName, GroupId, Config) -> |
| DDocId = <<"_design/", GroupId/binary>>, |
| case (catch couch_mrview:get_info(DbName, DDocId)) of |
| {ok, GroupInfo} -> |
| case can_view_compact(Config, DbName, GroupId, GroupInfo) of |
| true -> |
| {ok, MonRef} = couch_mrview:compact(DbName, DDocId, [monitor]), |
| TimeLeft = compact_time_left(Config), |
| receive |
| {'DOWN', MonRef, process, _, normal} -> |
| ok; |
| {'DOWN', MonRef, process, _, Reason} -> |
| couch_log:error("Compaction daemon - an error ocurred" |
| " while compacting the view group `~s` from database " |
| "`~s`: ~p", [GroupId, DbName, Reason]), |
| ok |
| after TimeLeft -> |
| couch_log:info("Compaction daemon - canceling the compaction" |
| " for the view group `~s` of the database `~s` because it's" |
| " exceeding the allowed period.", [GroupId, DbName]), |
| erlang:demonitor(MonRef, [flush]), |
| ok = couch_mrview:cancel_compaction(DbName, DDocId), |
| timeout |
| end; |
| false -> |
| ok |
| end; |
| Error -> |
| couch_log:error("Error opening view group `~s` from database `~s`: ~p", |
| [GroupId, DbName, Error]), |
| ok |
| end. |
| |
| |
| compact_time_left(#config{cancel = false}) -> |
| infinity; |
| compact_time_left(#config{period = nil}) -> |
| infinity; |
| compact_time_left(#config{period = #period{to = {ToH, ToM} = To}}) -> |
| {H, M, _} = time(), |
| case To > {H, M} of |
| true -> |
| ((ToH - H) * 60 * 60 * 1000) + (abs(ToM - M) * 60 * 1000); |
| false -> |
| ((24 - H + ToH) * 60 * 60 * 1000) + (abs(ToM - M) * 60 * 1000) |
| end. |
| |
| |
| get_db_config(ShardName) -> |
| case ets:lookup(?CONFIG_ETS, ShardName) of |
| [] -> |
| DbName = mem3:dbname(ShardName), |
| case ets:lookup(?CONFIG_ETS, DbName) of |
| [] -> |
| case ets:lookup(?CONFIG_ETS, <<"_default">>) of |
| [] -> |
| nil; |
| [{<<"_default">>, Config}] -> |
| {ok, Config} |
| end; |
| [{DbName, Config}] -> |
| {ok, Config} |
| end; |
| [{ShardName, Config}] -> |
| {ok, Config} |
| end. |
| |
| |
| can_db_compact(#config{db_frag = Threshold} = Config, Db) -> |
| case check_period(Config) of |
| false -> |
| false; |
| true -> |
| {ok, DbInfo} = couch_db:get_db_info(Db), |
| {Frag, SpaceRequired} = frag(DbInfo), |
| couch_log:debug("Fragmentation for database `~s` is ~p%, estimated" |
| " space for compaction is ~p bytes.", |
| [couch_db:name(Db), Frag, SpaceRequired]), |
| case check_frag(Threshold, Frag) of |
| false -> |
| false; |
| true -> |
| Free = free_space(config:get("couchdb", "database_dir")), |
| case Free >= SpaceRequired of |
| true -> |
| true; |
| false -> |
| couch_log:warning("Compaction daemon - skipping database `~s` " |
| "compaction: the estimated necessary disk space is about ~p" |
| " bytes but the currently available disk space is ~p bytes.", |
| [couch_db:name(Db), SpaceRequired, Free]), |
| false |
| end |
| end |
| end. |
| |
| can_view_compact(Config, DbName, GroupId, GroupInfo) -> |
| case check_period(Config) of |
| false -> |
| false; |
| true -> |
| case couch_util:get_value(updater_running, GroupInfo) of |
| true -> |
| false; |
| false -> |
| {Frag, SpaceRequired} = frag(GroupInfo), |
| couch_log:debug("Fragmentation for view group `~s` (database `~s`)" |
| " is ~p%, estimated space for compaction is ~p bytes.", |
| [GroupId, DbName, Frag, SpaceRequired]), |
| case check_frag(Config#config.view_frag, Frag) of |
| false -> |
| false; |
| true -> |
| Free = free_space(couch_index_util:root_dir()), |
| case Free >= SpaceRequired of |
| true -> |
| true; |
| false -> |
| couch_log:warning("Compaction daemon - skipping view group" |
| " `~s` compaction (database `~s`): the estimated" |
| " necessary disk space is about ~p bytes" |
| " but the currently available disk space is ~p bytes.", |
| [GroupId, DbName, SpaceRequired, Free]), |
| false |
| end |
| end |
| end |
| end. |
| |
| |
| check_period(#config{period = nil}) -> |
| true; |
| check_period(#config{period = #period{from = From, to = To}}) -> |
| {HH, MM, _} = erlang:time(), |
| case From < To of |
| true -> |
| ({HH, MM} >= From) andalso ({HH, MM} < To); |
| false -> |
| ({HH, MM} >= From) orelse ({HH, MM} < To) |
| end. |
| |
| |
| check_frag(nil, _) -> |
| true; |
| check_frag(Threshold, Frag) -> |
| Frag >= Threshold. |
| |
| |
| frag(Props) -> |
| {Sizes} = couch_util:get_value(sizes, Props), |
| FileSize = couch_util:get_value(file, Sizes), |
| MinFileSize = list_to_integer( |
| config:get("compaction_daemon", "min_file_size", "131072")), |
| case FileSize < MinFileSize of |
| true -> |
| {0, FileSize}; |
| false -> |
| case couch_util:get_value(active, Sizes) of |
| 0 -> |
| {0, FileSize}; |
| DataSize when is_integer(DataSize), DataSize > 0 -> |
| Frag = round(((FileSize - DataSize) / FileSize * 100)), |
| {Frag, space_required(DataSize)}; |
| _ -> |
| {100, FileSize} |
| end |
| end. |
| |
| % Rough, and pessimistic, estimation of necessary disk space to compact a |
| % database or view index. |
| space_required(DataSize) -> |
| round(DataSize * 2.0). |
| |
| |
| load_config() -> |
| lists:foreach( |
| fun({DbName, ConfigString}) -> |
| case parse_config(DbName, ConfigString) of |
| {ok, Config} -> |
| true = ets:insert(?CONFIG_ETS, {?l2b(DbName), Config}); |
| error -> |
| ok |
| end |
| end, |
| config:get("compactions")). |
| |
| parse_config(DbName, ConfigString) -> |
| case (catch do_parse_config(ConfigString)) of |
| {ok, Conf} -> |
| {ok, Conf}; |
| incomplete_period -> |
| couch_log:error("Incomplete period ('to' or 'from' missing)" |
| " in the compaction configuration for database `~s`", |
| [DbName]), |
| error; |
| _ -> |
| couch_log:error("Invalid compaction configuration for database " |
| "`~s`: `~s`", [DbName, ConfigString]), |
| error |
| end. |
| |
| do_parse_config(ConfigString) -> |
| {ok, ConfProps} = couch_util:parse_term(ConfigString), |
| {ok, #config{period = Period} = Conf} = config_record(ConfProps, #config{}), |
| case Period of |
| nil -> |
| {ok, Conf}; |
| #period{from = From, to = To} when From =/= nil, To =/= nil -> |
| {ok, Conf}; |
| #period{} -> |
| incomplete_period |
| end. |
| |
| config_record([], Config) -> |
| {ok, Config}; |
| |
| config_record([{db_fragmentation, V} | Rest], Config) -> |
| [Frag] = string:tokens(V, "%"), |
| config_record(Rest, Config#config{db_frag = list_to_integer(Frag)}); |
| |
| config_record([{view_fragmentation, V} | Rest], Config) -> |
| [Frag] = string:tokens(V, "%"), |
| config_record(Rest, Config#config{view_frag = list_to_integer(Frag)}); |
| |
| config_record([{from, V} | Rest], #config{period = Period0} = Config) -> |
| Time = parse_time(V), |
| Period = case Period0 of |
| nil -> |
| #period{from = Time}; |
| #period{} -> |
| Period0#period{from = Time} |
| end, |
| config_record(Rest, Config#config{period = Period}); |
| |
| config_record([{to, V} | Rest], #config{period = Period0} = Config) -> |
| Time = parse_time(V), |
| Period = case Period0 of |
| nil -> |
| #period{to = Time}; |
| #period{} -> |
| Period0#period{to = Time} |
| end, |
| config_record(Rest, Config#config{period = Period}); |
| |
| config_record([{strict_window, true} | Rest], Config) -> |
| config_record(Rest, Config#config{cancel = true}); |
| |
| config_record([{strict_window, false} | Rest], Config) -> |
| config_record(Rest, Config#config{cancel = false}); |
| |
| config_record([{parallel_view_compaction, true} | Rest], Config) -> |
| config_record(Rest, Config#config{parallel_view_compact = true}); |
| |
| config_record([{parallel_view_compaction, false} | Rest], Config) -> |
| config_record(Rest, Config#config{parallel_view_compact = false}). |
| |
| |
| parse_time(String) -> |
| [HH, MM] = string:tokens(String, ":"), |
| {list_to_integer(HH), list_to_integer(MM)}. |
| |
| |
| free_space(Path) -> |
| DiskData = lists:sort( |
| fun({PathA, _, _}, {PathB, _, _}) -> |
| length(filename:split(PathA)) > length(filename:split(PathB)) |
| end, |
| disksup:get_disk_data()), |
| {ok, AbsPath} = abs_path(Path), |
| free_space_rec(AbsPath, DiskData). |
| |
| free_space_rec(_Path, []) -> |
| undefined; |
| free_space_rec(Path0, [{MountPoint, Total, Usage} | Rest]) -> |
| case abs_path(Path0) of |
| {ok, Path} -> |
| case MountPoint =:= string:substr(Path, 1, length(MountPoint)) of |
| false -> |
| free_space_rec(Path, Rest); |
| true -> |
| trunc(Total - (Total * (Usage / 100))) * 1024 |
| end; |
| {error, Reason} -> |
| couch_log:debug("Compaction daemon - unable to calculate free space" |
| " for `~s`: `~s` for mount mount `~p`", |
| [Path0, Reason, MountPoint]), |
| free_space_rec(Path0, Rest) |
| end. |
| |
| abs_path(Path0) -> |
| case file:read_link_info(Path0) of |
| {ok, Info} -> |
| case Info#file_info.type of |
| symlink -> |
| {ok, Path} = file:read_link(Path0), |
| abs_path(Path); |
| _ -> |
| abs_path2(Path0) |
| end; |
| {error, Reason} -> |
| {error, Reason} |
| end. |
| |
| abs_path2(Path0) -> |
| Path = filename:absname(Path0), |
| case lists:last(Path) of |
| $/ -> |
| {ok, Path}; |
| _ -> |
| {ok, Path ++ "/"} |
| end. |
| |
| |
| -ifdef(TEST). |
| -include_lib("eunit/include/eunit.hrl"). |
| |
| free_space_rec_test() -> |
| ?assertEqual(undefined, free_space_rec("", [])), |
| ?assertEqual(51200, free_space_rec("/tmp/", [{"/", 100, 50}])), |
| ?assertEqual(51200, free_space_rec("/tmp/", [ |
| {"/floop", 200, 25}, |
| {"/run", 0, 0}, |
| {"/", 100, 50} |
| ])), |
| ?assertEqual(undefined, free_space_rec("/flopp/", [{"/", 300, 75}])), |
| ?assertEqual(undefined, free_space_rec("/flopp/", [ |
| {"/floop", 200, 25}, |
| {"/run", 0, 0}, |
| {"/", 100, 50} |
| ])), |
| ok. |
| |
| abs_path2_test() -> |
| ?assertEqual({ok, "/a/"}, abs_path2("/a")), |
| ?assertEqual({ok, "/a/"}, abs_path2("/a/")), |
| |
| ?assertEqual({ok, "/a/b/"}, abs_path2("/a/b")), |
| ?assertEqual({ok, "/a/b/"}, abs_path2("/a/b")), |
| ok. |
| |
| -endif. |