blob: e960dcefe05ce8328e4449fadd8a17acbda64157 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(smoosh_persist).
-export([
unpersist/1,
persist/3,
check_setup/0
]).
-include_lib("kernel/include/file.hrl").
-include_lib("couch/include/couch_db.hrl").
-define(SUFFIX, ".smooshq").
% Public API
unpersist(Name) ->
Enabled = config:get_boolean("smoosh", "persist", false),
Capacity = smoosh_utils:capacity(Name),
unpersist(Enabled, Name, Capacity).
persist(Waiting, #{} = Active, #{} = Starting) ->
Enabled = config:get_boolean("smoosh", "persist", false),
persist(Enabled, Waiting, Active, Starting).
% Validate peristence setup for read/write/delete access. Emit warnings if
% there are any failures. Call this function once during startup. During
% runtime errors are ignored. Smoosh persistence is opportunistic, so if we
% cannot read or write we just move on.
%
check_setup() ->
Enabled = config:get_boolean("smoosh", "persist", false),
try
check_setup(Enabled)
catch
throw:{fail, Msg, Error} ->
LogMsg = "~s : " ++ Msg ++ " failed in directory ~p : ~p",
Args = [?MODULE, state_dir(), Error],
couch_log:warning(LogMsg, Args),
{error, {Msg, Error}}
end.
% Private functions
unpersist(false, Name, _Capacity) ->
smoosh_priority_queue:new(Name);
unpersist(true, Name, Capacity) ->
Path = file_path(Name),
case read(Path) of
{ok, Map} -> smoosh_priority_queue:from_map(Name, Capacity, Map);
{error, _} -> smoosh_priority_queue:new(Name)
end.
persist(false, _Waiting, _Active, _Starting) ->
ok;
persist(true, Waiting, Active, Starting) ->
Name = smoosh_priority_queue:name(Waiting),
WMap = smoosh_priority_queue:to_map(Waiting),
% Starting and active jobs are at priority level `infinity` as they are
% already running. We want them to be the first ones to continue after
% restart. We're relying on infinity sorting higher than float and integer
% numeric values here.
AMap = maps:map(fun(_, _) -> infinity end, Active),
SMap = maps:from_list([{K, infinity} || K <- maps:values(Starting)]),
Path = file_path(Name),
write(maps:merge(WMap, maps:merge(AMap, SMap)), Path).
check_setup(false) ->
disabled;
check_setup(true) ->
StateDir = state_dir(),
Path = filename:join(StateDir, "smooshq.test"),
Data = #{<<"test">> => 1},
case file:read_file_info(StateDir) of
{ok, #file_info{access = A}} when A == read; A == read_write ->
ok;
{ok, #file_info{access = Invalid}} ->
throw({fail, "read access", Invalid});
{error, Error1} ->
throw({fail, "read", Error1})
end,
case write(Data, Path) of
ok -> ok;
{error, Error2} -> throw({fail, "write", Error2})
end,
file:delete(Path, [raw]).
write(#{} = QData, Path) when is_list(Path), map_size(QData) == 0 ->
% Save a few bytes by deleting the persisted queue data if
% there are no waiting/starting or active jobs
file:delete(Path, [raw]);
write(#{} = QData, Path) when is_list(Path) ->
Bin = term_to_binary(QData, [compressed, {minor_version, 2}]),
TmpPath = tmp_path(Path),
case file:write_file(TmpPath, Bin, [raw]) of
ok -> file:rename(TmpPath, Path);
{error, _} = Error -> Error
end.
read(Path) ->
case file:read_file(Path) of
{ok, Bin} ->
try binary_to_term(Bin, [safe]) of
#{} = QData -> {ok, QData};
_ -> {error, term_not_a_map}
catch
_:_ ->
{error, invalid_term}
end;
{error, _} = Error ->
Error
end.
tmp_path(Path) ->
Time = abs(erlang:system_time()),
Path ++ "." ++ integer_to_list(Time) ++ ".tmp".
file_path(Name) ->
StateDir = filename:absname(state_dir()),
filename:join(StateDir, Name ++ ?SUFFIX).
state_dir() ->
Dir = config:get("smoosh", "state_dir", "."),
filename:absname(Dir).
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
persist_unpersist_test_() ->
{
foreach,
fun() ->
test_util:mock(config)
end,
fun(_) ->
meck:unload()
end,
[
?TDEF_FE(t_write_read_delete),
?TDEF_FE(t_fail_write_read_delete),
?TDEF_FE(t_corrupted_read),
?TDEF_FE(t_check_setup),
?TDEF_FE(t_persist_unpersist_disabled),
?TDEF_FE(t_persist_unpersist_enabled),
?TDEF_FE(t_persist_unpersist_errors)
]
}.
t_write_read_delete(_) ->
Path = file_path("foochan"),
Data = #{<<"a">> => 1},
?assertEqual(ok, write(Data, Path)),
?assertMatch({ok, _}, file:read_file_info(Path)),
ReadRes = read(Path),
?assertMatch({ok, #{}}, ReadRes),
{ok, ReadData} = ReadRes,
?assertEqual(Data, ReadData),
?assertEqual(ok, write(#{}, Path)),
?assertEqual({error, enoent}, file:read_file_info(Path)).
t_fail_write_read_delete(_) ->
meck:expect(config, get, fun("smoosh", "state_dir", _) -> "./x" end),
Path = file_path("foochan"),
?assertEqual({error, enoent}, write(#{<<"a">> => 1}, Path)),
?assertEqual({error, enoent}, read(Path)),
?assertEqual({error, enoent}, write(#{}, Path)).
t_corrupted_read(_) ->
Path = file_path("foochan"),
?assertEqual(ok, write(#{<<"a">> => 1}, Path)),
ok = file:write_file(Path, ?term_to_bin(foo), [raw]),
?assertEqual({error, term_not_a_map}, read(Path)),
ok = file:write_file(Path, <<"42">>, [raw]),
?assertEqual({error, invalid_term}, read(Path)),
?assertEqual(ok, write(#{}, Path)),
?assertEqual({error, enoent}, file:read_file_info(Path)).
t_check_setup(_) ->
?assertEqual(disabled, check_setup()),
meck:expect(config, get_boolean, fun("smoosh", "persist", _) -> true end),
?assertEqual(ok, check_setup()),
TDir = ?tempfile(),
meck:expect(config, get, fun("smoosh", "state_dir", _) -> TDir end),
?assertEqual({error, {"read", enoent}}, check_setup()),
Dir = state_dir(),
ok = file:make_dir(Dir),
% Can't write, only read
ok = file:change_mode(Dir, 8#500),
?assertEqual({error, {"write", eacces}}, check_setup()),
% Can't read, only write
ok = file:change_mode(Dir, 8#300),
?assertEqual({error, {"read access", write}}, check_setup()),
ok = file:del_dir_r(Dir).
t_persist_unpersist_disabled(_) ->
Name = "chan1",
Q = smoosh_priority_queue:new(Name),
[K1, K2, K3] = [<<"x">>, {<<"x">>, <<"_design/y">>}, {index_cleanup, <<"z">>}],
Active = #{K1 => self()},
Starting = #{make_ref() => K2},
Q1 = smoosh_priority_queue:in(K3, 1.0, 9999, Q),
?assertEqual(ok, persist(Q1, Active, Starting)),
Q2 = unpersist(Name),
?assertEqual(Name, smoosh_priority_queue:name(Q2)),
?assertEqual(#{max => 0, min => 0, size => 0}, smoosh_priority_queue:info(Q2)).
t_persist_unpersist_enabled(_) ->
Name = "chan2",
Q = smoosh_priority_queue:new(Name),
Keys = [K1, K2, K3] = [<<"x">>, {<<"x">>, <<"y">>}, {index_cleanup, <<"z">>}],
Active = #{K1 => self()},
Starting = #{make_ref() => K2},
Q1 = smoosh_priority_queue:in(K3, 1.0, 9999, Q),
meck:expect(config, get_boolean, fun("smoosh", "persist", _) -> true end),
?assertEqual(ok, persist(Q1, Active, Starting)),
Q2 = unpersist(Name),
?assertEqual(Name, smoosh_priority_queue:name(Q2)),
Info2 = smoosh_priority_queue:info(Q2),
?assertEqual(#{max => infinity, min => 1.0, size => 3}, Info2),
?assertEqual(Keys, drain_q(Q2)),
% Try to persist the already unpersisted queue
?assertEqual(ok, persist(Q2, #{}, #{})),
Q3 = unpersist(Name),
?assertEqual(Name, smoosh_priority_queue:name(Q3)),
Info3 = smoosh_priority_queue:info(Q2),
?assertEqual(#{max => infinity, min => 1.0, size => 3}, Info3),
?assertEqual(Keys, drain_q(Q3)).
t_persist_unpersist_errors(_) ->
Name = "chan3",
Q = smoosh_priority_queue:new(Name),
Q1 = smoosh_priority_queue:in(<<"x">>, 1.0, 9999, Q),
meck:expect(config, get_boolean, fun("smoosh", "persist", _) -> true end),
TDir = ?tempfile(),
meck:expect(config, get, fun
("smoosh", "state_dir", _) -> TDir;
(_, _, Default) -> Default
end),
Q2 = unpersist(Name),
?assertEqual(Name, smoosh_priority_queue:name(Q2)),
?assertEqual(#{max => 0, min => 0, size => 0}, smoosh_priority_queue:info(Q2)),
Dir = state_dir(),
ok = file:make_dir(Dir),
% Can't write, only read
ok = file:change_mode(Dir, 8#500),
?assertEqual({error, eacces}, persist(Q1, #{}, #{})),
Q3 = unpersist(Name),
?assertEqual(Name, smoosh_priority_queue:name(Q3)),
?assertEqual(#{max => 0, min => 0, size => 0}, smoosh_priority_queue:info(Q3)),
ok = file:del_dir_r(Dir).
drain_q(Q) ->
lists:reverse(drain_q(Q, [])).
drain_q(Q, Acc) ->
case smoosh_priority_queue:out(Q) of
false -> Acc;
{Key, Q1} -> drain_q(Q1, [Key | Acc])
end.
-endif.