blob: 14c03294ddf99772163938418d7da65a5bf268c7 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric2_server).
-behaviour(gen_server).
-vsn(1).
-export([
start_link/0,
fetch/2,
store/1,
maybe_update/1,
remove/1,
maybe_remove/1,
fdb_directory/0,
fdb_cluster/0,
get_retry_limit/0
]).
-export([
init/1,
terminate/2,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3
]).
-include_lib("couch/include/couch_db.hrl").
-include_lib("kernel/include/file.hrl").
-include_lib("kernel/include/logger.hrl").
-include("fabric2.hrl").
-define(CLUSTER_FILE_MACOS, "/usr/local/etc/foundationdb/fdb.cluster").
-define(CLUSTER_FILE_LINUX, "/etc/foundationdb/fdb.cluster").
-define(CLUSTER_FILE_WIN32, "C:/ProgramData/foundationdb/fdb.cluster").
-define(FDB_DIRECTORY, fdb_directory).
-define(FDB_CLUSTER, fdb_cluster).
-define(DEFAULT_FDB_DIRECTORY, <<"couchdb">>).
-define(TX_OPTIONS_SECTION, "fdb_tx_options").
-define(RELISTEN_DELAY, 1000).
-define(DEFAULT_TIMEOUT_MSEC, "60000").
-define(DEFAULT_RETRY_LIMIT, "100").
-define(TX_OPTIONS, #{
machine_id => {binary, undefined},
datacenter_id => {binary, undefined},
transaction_logging_max_field_length => {integer, undefined},
timeout => {integer, ?DEFAULT_TIMEOUT_MSEC},
retry_limit => {integer, ?DEFAULT_RETRY_LIMIT},
max_retry_delay => {integer, undefined},
size_limit => {integer, undefined}
}).
-define(CONFIG_LIMITS, [
{"couchdb", "max_document_size", ?DOCUMENT_SIZE_LIMIT_BYTES},
{"couchdb", "max_attachment_size", ?ATT_SIZE_LIMIT_BYTES},
{"couchdb", "max_document_id_length", ?DOC_ID_LIMIT_BYTES},
{"fabric", "binary_chunk_size", ?DEFAULT_BINARY_CHUNK_SIZE_BYTES}
]).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
fetch(DbName, UUID) when is_binary(DbName) ->
case {UUID, ets:lookup(?MODULE, DbName)} of
{_, []} -> undefined;
{undefined, [{DbName, _UUID, _, #{} = Db}]} -> Db;
{<<_/binary>>, [{DbName, UUID, _, #{} = Db}]} -> Db;
{<<_/binary>>, [{DbName, _UUID, _, #{} = _Db}]} -> undefined
end.
store(#{name := DbName} = Db0) when is_binary(DbName) ->
#{
uuid := UUID,
md_version := MDVer
} = Db0,
Db1 = sanitize(Db0),
case ets:insert_new(?MODULE, {DbName, UUID, MDVer, Db1}) of
true -> ok;
false -> maybe_update(Db1)
end,
ok.
maybe_update(#{name := DbName} = Db0) when is_binary(DbName) ->
#{
uuid := UUID,
md_version := MDVer
} = Db0,
Db1 = sanitize(Db0),
Head = {DbName, UUID, '$1', '_'},
Guard = {'=<', '$1', MDVer},
Body = {DbName, UUID, MDVer, {const, Db1}},
try
1 =:= ets:select_replace(?MODULE, [{Head, [Guard], [{Body}]}])
catch
error:badarg ->
false
end.
remove(DbName) when is_binary(DbName) ->
true = ets:delete(?MODULE, DbName),
ok.
maybe_remove(#{name := DbName} = Db) when is_binary(DbName) ->
#{
uuid := UUID,
md_version := MDVer
} = Db,
Head = {DbName, UUID, '$1', '_'},
Guard = {'=<', '$1', MDVer},
1 =:= ets:select_delete(?MODULE, [{Head, [Guard], [true]}]).
init(_) ->
ets:new(?MODULE, [
public,
named_table,
{read_concurrency, true},
{write_concurrency, true}
]),
{Cluster, Db} = get_db_and_cluster([empty]),
application:set_env(fabric, ?FDB_CLUSTER, Cluster),
application:set_env(fabric, db, Db),
Dir =
case config:get("fabric", "fdb_directory") of
Val when is_list(Val), length(Val) > 0 ->
[?l2b(Val)];
_ ->
[?DEFAULT_FDB_DIRECTORY]
end,
application:set_env(fabric, ?FDB_DIRECTORY, Dir),
config:subscribe_for_changes([?TX_OPTIONS_SECTION]),
check_config_limits(),
{ok, nil}.
check_config_limits() ->
lists:foreach(
fun({Sect, Key, Limit}) ->
ConfigVal = config:get_integer(Sect, Key, Limit),
case ConfigVal > Limit of
true ->
LogMsg = "Config value of ~p for [~s] ~s is greater than the limit: ~p",
couch_log:warning(LogMsg, [ConfigVal, Sect, Key, Limit]);
false ->
ok
end
end,
?CONFIG_LIMITS
).
terminate(_, _St) ->
ok.
handle_call(Msg, _From, St) ->
{stop, {bad_call, Msg}, {bad_call, Msg}, St}.
handle_cast(Msg, St) ->
{stop, {bad_cast, Msg}, St}.
handle_info({config_change, ?TX_OPTIONS_SECTION, _K, deleted, _}, St) ->
% Since we don't know the exact default values to reset the options
% to we recreate the db handle instead which will start with a default
% handle and re-apply all the options
{_Cluster, NewDb} = get_db_and_cluster([]),
application:set_env(fabric, db, NewDb),
{noreply, St};
handle_info({config_change, ?TX_OPTIONS_SECTION, K, V, _}, St) ->
{ok, Db} = application:get_env(fabric, db),
apply_tx_options(Db, [{K, V}]),
{noreply, St};
handle_info({gen_event_EXIT, _Handler, _Reason}, St) ->
erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener),
{noreply, St};
handle_info(restart_config_listener, St) ->
config:subscribe_for_changes([?TX_OPTIONS_SECTION]),
{noreply, St};
handle_info(Msg, St) ->
{stop, {bad_info, Msg}, St}.
code_change(_OldVsn, St, _Extra) ->
{ok, St}.
fdb_directory() ->
get_env(?FDB_DIRECTORY).
fdb_cluster() ->
get_env(?FDB_CLUSTER).
get_retry_limit() ->
Default = list_to_integer(?DEFAULT_RETRY_LIMIT),
config:get_integer(?TX_OPTIONS_SECTION, "retry_limit", Default).
get_env(Key) ->
case get(Key) of
undefined ->
case application:get_env(fabric, Key) of
undefined ->
erlang:error(fabric_application_not_started);
{ok, Value} ->
put(Key, Value),
Value
end;
Value ->
Value
end.
get_db_and_cluster(EunitDbOpts) ->
{Cluster, Db} =
case application:get_env(fabric, eunit_run) of
{ok, true} ->
{<<"eunit_test">>, erlfdb_util:get_test_db(EunitDbOpts)};
undefined ->
ClusterFileStr = get_cluster_file_path(),
{ok, ConnectionStr} = file:read_file(ClusterFileStr),
DbHandle = erlfdb:open(iolist_to_binary(ClusterFileStr)),
{string:trim(ConnectionStr), DbHandle}
end,
apply_tx_options(Db, config:get(?TX_OPTIONS_SECTION)),
{Cluster, Db}.
get_cluster_file_path() ->
Locations =
[
{custom, config:get("erlfdb", "cluster_file")},
{custom, os:getenv("FDB_CLUSTER_FILE", undefined)}
] ++ default_locations(os:type()),
case find_cluster_file(Locations) of
{ok, Location} ->
Location;
{error, Reason} ->
erlang:error(Reason)
end.
default_locations({unix, _}) ->
[
{default, ?CLUSTER_FILE_MACOS},
{default, ?CLUSTER_FILE_LINUX}
];
default_locations({win32, _}) ->
[
{default, ?CLUSTER_FILE_WIN32}
].
find_cluster_file([]) ->
{error, cluster_file_missing};
find_cluster_file([{custom, undefined} | Rest]) ->
find_cluster_file(Rest);
find_cluster_file([{Type, Location} | Rest]) ->
Msg = #{
what => fdb_connection_setup,
configuration_type => Type,
cluster_file => Location
},
case file:read_file_info(Location, [posix]) of
{ok, #file_info{access = read_write}} ->
?LOG_INFO(Msg#{status => ok}),
couch_log:info(
"Using ~s FDB cluster file: ~s",
[Type, Location]
),
{ok, Location};
{ok, #file_info{access = read}} ->
?LOG_WARNING(Msg#{
status => read_only_file,
details =>
"If coordinators are changed without updating this "
"file CouchDB may be unable to connect to the FDB cluster!"
}),
couch_log:warning(
"Using read-only ~s FDB cluster file: ~s -- if coordinators "
"are changed without updating this file CouchDB may be unable "
"to connect to the FDB cluster!",
[Type, Location]
),
{ok, Location};
{ok, _} ->
?LOG_ERROR(Msg#{
status => permissions_error,
details => "CouchDB needs read/write access to FDB cluster file"
}),
couch_log:error(
"CouchDB needs read/write access to FDB cluster file: ~s",
[Location]
),
{error, cluster_file_permissions};
{error, Reason} when Type =:= custom ->
?LOG_ERROR(Msg#{
status => Reason,
details => file:format_error(Reason)
}),
couch_log:error(
"Encountered ~p error looking for FDB cluster file: ~s",
[Reason, Location]
),
{error, Reason};
{error, enoent} when Type =:= default ->
?LOG_INFO(Msg#{
status => enoent,
details => file:format_error(enoent)
}),
couch_log:info(
"No FDB cluster file found at ~s",
[Location]
),
find_cluster_file(Rest);
{error, Reason} when Type =:= default ->
?LOG_WARNING(Msg#{
status => Reason,
details => file:format_error(Reason)
}),
couch_log:warning(
"Encountered ~p error looking for FDB cluster file: ~s",
[Reason, Location]
),
find_cluster_file(Rest)
end.
apply_tx_options(Db, Cfg) ->
maps:map(
fun(Option, {Type, Default}) ->
case lists:keyfind(atom_to_list(Option), 1, Cfg) of
false ->
case Default of
undefined -> ok;
_Defined -> apply_tx_option(Db, Option, Default, Type)
end;
{_K, Val} ->
apply_tx_option(Db, Option, Val, Type)
end
end,
?TX_OPTIONS
).
apply_tx_option(Db, Option, Val, integer) ->
try
set_option(Db, Option, list_to_integer(Val))
catch
error:badarg ->
?LOG_ERROR(#{
what => invalid_transaction_option_value,
option => Option,
value => Val
}),
Msg = "~p : Invalid integer tx option ~p = ~p",
couch_log:error(Msg, [?MODULE, Option, Val])
end;
apply_tx_option(Db, Option, Val, binary) ->
BinVal = list_to_binary(Val),
case size(BinVal) < 16 of
true ->
set_option(Db, Option, BinVal);
false ->
?LOG_ERROR(#{
what => invalid_transaction_option_value,
option => Option,
value => Val,
details => "string transaction option must be less than 16 bytes"
}),
Msg = "~p : String tx option ~p is larger than 16 bytes",
couch_log:error(Msg, [?MODULE, Option])
end.
set_option(Db, Option, Val) ->
try
erlfdb:set_option(Db, Option, Val)
catch
% This could happen if the option is not supported by erlfdb or
% fdbsever.
error:badarg ->
?LOG_ERROR(#{
what => transaction_option_error,
option => Option,
value => Val
}),
Msg = "~p : Could not set fdb tx option ~p = ~p",
couch_log:error(Msg, [?MODULE, Option, Val])
end.
sanitize(#{} = Db) ->
Db#{
tx := undefined,
user_ctx := #user_ctx{},
security_fun := undefined,
interactive := false
}.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
setup() ->
meck:new(file, [unstick, passthrough]),
meck:expect(file, read_file_info, fun
("ok.cluster", _) ->
{ok, #file_info{access = read_write}};
("readonly.cluster", _) ->
{ok, #file_info{access = read}};
("noaccess.cluster", _) ->
{ok, #file_info{access = none}};
("missing.cluster", _) ->
{error, enoent};
(Path, Options) ->
meck:passthrough([Path, Options])
end).
teardown(_) ->
meck:unload().
find_cluster_file_test_() ->
{setup, fun setup/0, fun teardown/1, [
{"ignore unspecified config",
?_assertEqual(
{ok, "ok.cluster"},
find_cluster_file([
{custom, undefined},
{custom, "ok.cluster"}
])
)},
{"allow read-only file",
?_assertEqual(
{ok, "readonly.cluster"},
find_cluster_file([
{custom, "readonly.cluster"}
])
)},
{"fail if no access to configured cluster file",
?_assertEqual(
{error, cluster_file_permissions},
find_cluster_file([
{custom, "noaccess.cluster"}
])
)},
{"fail if configured cluster file is missing",
?_assertEqual(
{error, enoent},
find_cluster_file([
{custom, "missing.cluster"},
{default, "ok.cluster"}
])
)},
{"check multiple default locations",
?_assertEqual(
{ok, "ok.cluster"},
find_cluster_file([
{default, "missing.cluster"},
{default, "ok.cluster"}
])
)}
]}.
-endif.