blob: fc300fd7197584444a3007cfe809eee920a8dc7d [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_file).
-behaviour(gen_server).
-vsn(2).
-include_lib("couch/include/couch_db.hrl").
-define(INITIAL_WAIT, 60000).
-define(MONITOR_CHECK, 10000).
% 4 KiB
-define(SIZE_BLOCK, 16#1000).
-define(ENCRYPTION_HEADER_SIZE, 16#800).
-define(IS_OLD_STATE(S), is_pid(S#file.db_monitor)).
-define(PREFIX_SIZE, 5).
-define(DEFAULT_READ_COUNT, 1024).
-define(ENCRYPTED_HEADER_MARKER, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15).
%% Database encryption design details
%% On file creation, if an encryption manager is configured, the
%% manager is asked to generate a new data encryption key (DEK). The
%% manager either returns a new DEK along with a wrapped form of the
%% same value that is safe to store in plain view (WEK) or it returns
%% `dont_encrypt` to indicate that the database should not be
%% encrypted.
%% When opening an encrypted file the encryption manager is asked to
%% unwrap the WEK retrieved from encryption header. The manager can
%% return just the unwrapped DEK or can return the unwrapped DEK and a
%% new WEK (wrapping the same DEK value) and a new KeyID. This
%% supports efficient key rotation.
%% The first 4096 bytes of an encrypted file contain two copies of
%% the encrypted header, to protect against partial writes. If the two
%% copies are valid but different the first copy is used.
%% All bytes, excepting the first 4096 bytes, are encrypted with AES
%% in Counter Mode. A random initialisation vector is generated at
%% file creation and stored in the encryption header. This
%% initialisation vector is re-randomised if the file is truncated.
-type block_id() :: non_neg_integer().
-type location() :: non_neg_integer().
-type header_size() :: non_neg_integer().
%% gen_server state for one open couch file.
-record(file, {
% raw file descriptor from file:open/2; set to nil once the file is closed
fd,
% true when the file was opened with the sys_db option
is_sys,
% tracked end-of-file position (next append offset), in bytes
eof = 0,
% monitor reference for the db process set via set_db_pid/2
% (a pid in the pre-upgrade state, see upgrade_state/1)
db_monitor,
% upper bound checked by get_pread_locnum/3; integer or the atom infinity
pread_limit = 0,
% AES-CTR initialisation vector as an integer (see init_crypto_file/3)
iv,
% data encryption key; undefined when the file is not encrypted
dek
}).
%% AES-CTR helpers. Both macros evaluate to the transformed binary.
%%
%% Fallback definitions for releases without crypto:crypto_one_time/5.
%% crypto:stream_encrypt/2 and crypto:stream_decrypt/2 return
%% {NewState, Data}; only Data is wanted here, hence element(2, ...).
-define(encrypt_ctr(File, Pos, Data),
    element(
        2,
        crypto:stream_encrypt(
            crypto:stream_init(aes_ctr, File#file.dek, aes_ctr(File#file.iv, Pos)), Data
        )
    )
).
-define(decrypt_ctr(File, Pos, Data),
    element(
        2,
        crypto:stream_decrypt(
            crypto:stream_init(aes_ctr, File#file.dek, aes_ctr(File#file.iv, Pos)), Data
        )
    )
).
-ifdef(OTP_RELEASE).
-if(?OTP_RELEASE >= 22).
%% OTP 22+ provides the one-shot API, which returns the binary directly.
-undef(encrypt_ctr).
-define(encrypt_ctr(File, Pos, Data),
    crypto:crypto_one_time(aes_256_ctr, File#file.dek, aes_ctr(File#file.iv, Pos), Data, true)
).
-undef(decrypt_ctr).
-define(decrypt_ctr(File, Pos, Data),
    crypto:crypto_one_time(aes_256_ctr, File#file.dek, aes_ctr(File#file.iv, Pos), Data, false)
).
-endif.
-endif.
% public API
-export([open/1, open/2, close/1, bytes/1, sync/1, truncate/2, set_db_pid/2]).
-export([pread_term/2, pread_iolist/2, pread_binary/2]).
-export([append_binary/2]).
-export([append_raw_chunk/2, assemble_file_chunk/2]).
-export([append_term/2, append_term/3]).
-export([pread_terms/2]).
-export([append_terms/2, append_terms/3]).
-export([write_header/2, read_header/1]).
-export([delete/2, delete/3, nuke_dir/2, init_delete_dir/1]).
-export([last_read/1]).
% gen_server callbacks
-export([init/1, terminate/2, code_change/3, format_status/2]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
%% helper functions
-export([process_info/1]).
%%----------------------------------------------------------------------
%% Args: Valid Options are [create] and [create,overwrite].
%% Files are opened in read/write mode.
%% Returns: On success, {ok, Fd}
%% or {error, Reason} if the file could not be opened.
%%----------------------------------------------------------------------
open(Filepath) ->
    open(Filepath, []).

open(Filepath, Options) ->
    Ref = make_ref(),
    InitArgs = {Filepath, Options, self(), Ref},
    case gen_server:start_link(couch_file, InitArgs, []) of
        {ok, Fd} ->
            {ok, Fd};
        ignore ->
            % init/1 sends the real error before returning ignore
            receive
                {Ref, Pid, {error, Reason} = Error} ->
                    % when trapping exits, drain the EXIT of the short-lived server
                    case erlang:process_info(self(), trap_exit) of
                        {trap_exit, false} ->
                            ok;
                        {trap_exit, true} ->
                            receive
                                {'EXIT', Pid, _} -> ok
                            end
                    end,
                    Quiet = lists:member(nologifmissing, Options) andalso Reason =:= enoent,
                    case Quiet of
                        true ->
                            ok;
                        false ->
                            couch_log:error(
                                "Could not open file ~s: ~s",
                                [Filepath, file:format_error(Reason)]
                            )
                    end,
                    Error
            end;
        Other ->
            % Could be any kind of error; let it bubble up so an enclosing
            % component can report it. It will likely be in the SASL log too.
            Other
    end.
%% Ask the file server to monitor Pid as its owning db process.
set_db_pid(Fd, Pid) ->
    Request = {set_db_pid, Pid},
    gen_server:call(Fd, Request).
%%----------------------------------------------------------------------
%% Purpose: To append an Erlang term to the end of the file.
%% Args: Erlang term to serialize and append to the file.
%% Returns: {ok, Pos, NumBytesWritten} where Pos is the file offset to
%% the beginning of the serialized term. Use pread_term to read the term
%% back.
%% or {error, Reason}.
%%----------------------------------------------------------------------
append_term(Fd, Term) ->
    append_term(Fd, Term, []).

append_term(Fd, Term, Options) ->
    % compression method comes from Options, else the project default
    Method = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
    Compressed = couch_compress:compress(Term, Method),
    append_binary(Fd, Compressed).
%%----------------------------------------------------------------------
%% Purpose: To append an Erlang binary to the end of the file.
%% Args: Erlang binary (or iolist) to append to the file.
%% Returns: {ok, Pos, NumBytesWritten} where Pos is the file offset to the
%% beginning of the written chunk. Use pread_binary to read the binary back.
%% or {error, Reason}.
%%----------------------------------------------------------------------
append_binary(Fd, Bin) ->
    % prefix the payload with its length header, then append it as a raw chunk
    Chunk = assemble_file_chunk(Bin),
    ioq:call(Fd, {append_bin, Chunk}, erlang:get(io_priority)).
%% Append a chunk that was already framed by assemble_file_chunk/1,2.
append_raw_chunk(Fd, Chunk) ->
    Priority = erlang:get(io_priority),
    ioq:call(Fd, {append_bin, Chunk}, Priority).
%% Frame a payload with a 4-byte header: 1 flag bit (0 = no MD5) followed
%% by the 31-bit payload size.
assemble_file_chunk(Bin) ->
    Size = iolist_size(Bin),
    [<<0:1/integer, Size:31/integer>>, Bin].

%% Same framing with the flag bit set to 1 and the MD5 placed between the
%% header and the payload.
assemble_file_chunk(Bin, Md5) ->
    Size = iolist_size(Bin),
    [<<1:1/integer, Size:31/integer>>, Md5, Bin].
%%----------------------------------------------------------------------
%% Purpose: Reads a term from a file that was written with append_term
%% Args: Pos, the offset into the file where the term is serialized.
%% Returns: {ok, Term}
%% or {error, Reason}.
%%----------------------------------------------------------------------
pread_term(Fd, Pos) ->
    {ok, Compressed} = pread_binary(Fd, Pos),
    Term = couch_compress:decompress(Compressed),
    {ok, Term}.
%%----------------------------------------------------------------------
%% Purpose: Reads a binary from a file that was written with append_binary
%% Args: Pos, the offset into the file where the binary is stored.
%% Returns: {ok, Binary}
%% or {error, Reason}.
%%----------------------------------------------------------------------
pread_binary(Fd, Pos) ->
    {ok, IoList} = pread_iolist(Fd, Pos),
    Bin = iolist_to_binary(IoList),
    {ok, Bin}.
%% Read the chunk at Pos as an iolist, verifying its MD5 when one was stored.
pread_iolist(Fd, Pos) ->
    Priority = erlang:get(io_priority),
    case ioq:call(Fd, {pread_iolist, Pos}, Priority) of
        {ok, IoList, Md5} ->
            Verified = verify_md5(Fd, Pos, IoList, Md5),
            {ok, Verified};
        Other ->
            Other
    end.
%% Read and decompress the terms stored at each position in PosList.
pread_terms(Fd, PosList) ->
    {ok, Bins} = pread_binaries(Fd, PosList),
    {ok, [couch_compress:decompress(Bin) || Bin <- Bins]}.
%% Like pread_iolists/2 but flattens every result to a binary.
pread_binaries(Fd, PosList) ->
    {ok, IoLists} = pread_iolists(Fd, PosList),
    {ok, [erlang:iolist_to_binary(IoList) || IoList <- IoLists]}.
%% Read several chunks in one server call and verify each one's MD5.
pread_iolists(Fd, PosList) ->
    Priority = erlang:get(io_priority),
    case ioq:call(Fd, {pread_iolists, PosList}, Priority) of
        {ok, DataMd5s} ->
            Verify = fun(Pos, {IoList, Md5}) -> verify_md5(Fd, Pos, IoList, Md5) end,
            {ok, lists:zipwith(Verify, PosList, DataMd5s)};
        Other ->
            Other
    end.
append_terms(Fd, Terms) ->
    append_terms(Fd, Terms, []).

%% Compress every term with the configured method and append them in one call.
append_terms(Fd, Terms, Options) ->
    Method = couch_util:get_value(compression, Options, ?DEFAULT_COMPRESSION),
    Compressed = [couch_compress:compress(Term, Method) || Term <- Terms],
    append_binaries(Fd, Compressed).
%% Frame each binary (no MD5) and hand the batch to the file server.
append_binaries(Fd, Bins) ->
    Chunks = [assemble_file_chunk(Bin) || Bin <- Bins],
    ioq:call(Fd, {append_bins, Chunks}, erlang:get(io_priority)).
%%----------------------------------------------------------------------
%% Purpose: The length of a file, in bytes.
%% Returns: {ok, Bytes}
%% or {error, Reason}.
%%----------------------------------------------------------------------
% length in bytes
bytes(Fd) ->
    % no timeout: the server may be busy with a long I/O request
    Timeout = infinity,
    gen_server:call(Fd, bytes, Timeout).
%%----------------------------------------------------------------------
%% Purpose: Truncate a file to the number of bytes.
%% Returns: ok
%% or {error, Reason}.
%%----------------------------------------------------------------------
truncate(Fd, Pos) ->
    Request = {truncate, Pos},
    gen_server:call(Fd, Request, infinity).
%%----------------------------------------------------------------------
%% Purpose: Ensure all bytes written to the file are flushed to disk.
%% Returns: ok
%% or {error, Reason}.
%%----------------------------------------------------------------------
%% Given a path: open the file directly, fsync it and close it again.
%% Given a couch_file pid: ask the server to fsync its descriptor.
%% Either way an fsync failure is raised as {fsync_error, Reason}.
sync(Filepath) when is_list(Filepath) ->
    case file:open(Filepath, [append, raw]) of
        {error, OpenError} ->
            erlang:error(OpenError);
        {ok, Fd} ->
            try
                case file:sync(Fd) of
                    {error, Reason} ->
                        erlang:error({fsync_error, Reason});
                    ok ->
                        ok
                end
            after
                % always release the descriptor, even when fsync raised
                ok = file:close(Fd)
            end
    end;
sync(Fd) ->
    case gen_server:call(Fd, sync, infinity) of
        {error, Reason} ->
            erlang:error({fsync_error, Reason});
        ok ->
            ok
    end.
%%----------------------------------------------------------------------
%% Purpose: Close the file.
%% Returns: ok
%%----------------------------------------------------------------------
close(Fd) ->
    Timeout = infinity,
    gen_server:call(Fd, close, Timeout).
delete(RootDir, Filepath) ->
    delete(RootDir, Filepath, []).

%% Delete FullFilePath. With context = delete and database recovery enabled
%% the file is only renamed; otherwise it is moved into the .delete directory
%% and (unless delete_after_rename is disabled) removed from there.
delete(RootDir, FullFilePath, Options) ->
    RecoveryEnabled = config:get_boolean("couchdb", "enable_database_recovery", false),
    Async = not lists:member(sync, Options),
    Context = couch_util:get_value(context, Options, compaction),
    KeepForRecovery = Context =:= delete andalso RecoveryEnabled,
    case KeepForRecovery of
        false ->
            DeleteAfterRename = config:get_boolean("couchdb", "delete_after_rename", true),
            delete_file(RootDir, FullFilePath, Async, DeleteAfterRename);
        true ->
            rename_file(FullFilePath)
    end.
%% Move Filepath to a uniquely named entry under RootDir/.delete and, when
%% DeleteAfterRename is true, remove it (in a spawned process when Async).
delete_file(RootDir, Filepath, Async, DeleteAfterRename) ->
    Target = filename:join([RootDir, ".delete", ?b2l(couch_uuids:random())]),
    case file:rename(Filepath, Target) of
        ok when DeleteAfterRename ->
            case Async of
                true ->
                    spawn(file, delete, [Target]),
                    ok;
                _ ->
                    file:delete(Target)
            end;
        RenameResult ->
            % rename failed, or succeeded with deletion disabled
            RenameResult
    end.
%% Rename Original to its ".deleted" name and stamp it with the current time.
rename_file(Original) ->
    Renamed = deleted_filename(Original),
    Timestamp = calendar:local_time(),
    case file:rename(Original, Renamed) of
        ok ->
            file:change_time(Renamed, Timestamp);
        Failure ->
            Failure
    end.
%% Build the name a file is renamed to when it is kept for recovery:
%%   <rootname>.<YYYYMMDD>.<HHMMSS>.deleted<extension>
%% using the current UTC time.
deleted_filename(Original) ->
    {{Y, Mon, D}, {H, Min, S}} = calendar:universal_time(),
    % Only the timestamp goes through io_lib:format/2. The extension is
    % appended afterwards so that a "~" in it is never treated as a
    % format directive.
    Stamp = io_lib:format(
        ".~w~2.10.0B~2.10.0B.~2.10.0B~2.10.0B~2.10.0B.deleted",
        [Y, Mon, D, H, Min, S]
    ),
    filename:rootname(Original) ++ lists:flatten(Stamp) ++ filename:extension(Original).
%% Remove a directory tree, or only rename it when database recovery is on.
nuke_dir(RootDelDir, Dir) ->
    case config:get_boolean("couchdb", "enable_database_recovery", false) of
        false ->
            delete_dir(RootDelDir, Dir);
        true ->
            rename_file(Dir)
    end.
%% Recursively delete the contents of Dir and then Dir itself.
%% A directory that does not exist counts as success.
delete_dir(RootDelDir, Dir) ->
    DeleteAfterRename = config:get_boolean("couchdb", "delete_after_rename", true),
    RemoveEntry = fun(Name) ->
        Path = Dir ++ "/" ++ Name,
        case filelib:is_dir(Path) of
            false ->
                delete_file(RootDelDir, Path, false, DeleteAfterRename);
            true ->
                ok = nuke_dir(RootDelDir, Path),
                file:del_dir(Path)
        end
    end,
    case file:list_dir(Dir) of
        {error, enoent} ->
            ok;
        {ok, Names} ->
            lists:foreach(RemoveEntry, Names),
            ok = file:del_dir(Dir)
    end.
%% Make sure RootDir/.delete exists and purge leftovers from it in the
%% background.
init_delete_dir(RootDir) ->
    DelDir = filename:join(RootDir, ".delete"),
    % ensure_dir/1 requires an actual filename component, hence the "foo".
    filelib:ensure_dir(filename:join(DelDir, "foo")),
    Purge = fun(Filename, _Acc) -> ok = file:delete(Filename) end,
    spawn(fun() -> filelib:fold_files(DelDir, ".*", true, Purge, ok) end),
    ok.
%% Find the newest valid header in the file and decode it.
read_header(Fd) ->
    Priority = erlang:get(io_priority),
    case ioq:call(Fd, find_header, Priority) of
        {ok, HeaderBin} ->
            {ok, binary_to_term(HeaderBin)};
        Other ->
            Other
    end.
%% Serialize Data, prefix it with its MD5 and append it as a header block.
write_header(Fd, Data) ->
    Serialized = term_to_binary(Data),
    Digest = couch_hash:md5_hash(Serialized),
    % on-disk header payload is the MD5 followed by the serialized term
    Payload = <<Digest/binary, Serialized/binary>>,
    ioq:call(Fd, {write_header, Payload}, erlang:get(io_priority)).
%% Report an init failure to the caller of open/2, then tell gen_server to
%% abandon the start by returning ignore.
init_status_error(ReturnPid, Ref, Error) ->
    Message = {Ref, self(), Error},
    ReturnPid ! Message,
    ignore.
%% Timestamp of the last read served by Fd, read from its process
%% dictionary; the current time is passed as the default.
last_read(Fd) when is_pid(Fd) ->
    Default = os:timestamp(),
    couch_util:process_dict_get(Fd, read_timestamp, Default).
% server functions
%% gen_server init. Opens (or creates) Filepath and sets up encryption.
%% On success returns {ok, #file{}}. On any failure the error is sent to
%% ReturnPid tagged with Ref and `ignore` is returned, so that open/2 can
%% hand the real reason back to its caller.
init({Filepath, Options, ReturnPid, Ref}) ->
    OpenOptions = file_open_options(Options),
    Limit = get_pread_limit(),
    IsSys = lists:member(sys_db, Options),
    update_read_timestamp(),
    File0 = #file{is_sys = IsSys, pread_limit = Limit},
    Result =
        case lists:member(create, Options) of
            true -> init_create(Filepath, OpenOptions, Options, File0);
            false -> init_open(Filepath, OpenOptions, Options, File0)
        end,
    case Result of
        {ok, File} ->
            {ok, File};
        Error ->
            init_status_error(ReturnPid, Ref, Error)
    end.

%% Open Filepath with the create option: a non-empty file is truncated when
%% overwrite is given, otherwise rejected with {error, eexist}.
init_create(Filepath, OpenOptions, Options, File0) ->
    filelib:ensure_dir(Filepath),
    case file:open(Filepath, OpenOptions) of
        {ok, Fd} ->
            %% Save Fd in process dictionary for debugging purposes
            put(couch_file_fd, {Fd, Filepath}),
            {ok, Length} = file:position(Fd, eof),
            % FYI: empty files and non-existent files are not differentiated.
            case {Length > 0, lists:member(overwrite, Options)} of
                {true, true} ->
                    {ok, 0} = file:position(Fd, 0),
                    ok = file:truncate(Fd),
                    ok = file:sync(Fd),
                    init_finish(Filepath, File0#file{fd = Fd}, Options);
                {true, false} ->
                    ok = file:close(Fd),
                    {error, eexist};
                {false, _} ->
                    init_finish(Filepath, File0#file{fd = Fd}, Options)
            end;
        Error ->
            Error
    end.

%% Open an existing file; never creates it.
init_open(Filepath, OpenOptions, Options, File0) ->
    % open in read mode first, so we don't create the file if it doesn't exist.
    case file:open(Filepath, [read, raw]) of
        {ok, FdRead} ->
            case file:open(Filepath, OpenOptions) of
                {ok, Fd} ->
                    %% Save Fd in process dictionary for debugging purposes
                    put(couch_file_fd, {Fd, Filepath}),
                    ok = file:close(FdRead),
                    {ok, Eof} = file:position(Fd, eof),
                    init_finish(Filepath, File0#file{fd = Fd, eof = Eof}, Options);
                Error ->
                    % do not leave the probe descriptor open on failure
                    _ = file:close(FdRead),
                    Error
            end;
        Error ->
            Error
    end.

%% Steps shared by every successful open: register with the stats tracker,
%% schedule the first idle check and initialise encryption.
init_finish(Filepath, #file{} = File, Options) ->
    maybe_track_open_os_files(Options),
    erlang:send_after(?INITIAL_WAIT, self(), maybe_close),
    init_crypto(Filepath, File, Options).
%% Modes passed to file:open/2: always raw binary reads, plus append
%% unless the caller asked for read_only.
file_open_options(Options) ->
    WriteModes =
        case lists:member(read_only, Options) of
            false -> [append];
            true -> []
        end,
    [read, raw, binary] ++ WriteModes.
%% Count this process in the open_os_files metric, except for system dbs.
maybe_track_open_os_files(Options) ->
    case lists:member(sys_db, Options) of
        false ->
            couch_stats_process_tracker:track([couchdb, open_os_files]);
        true ->
            ok
    end.
%% Close the descriptor on shutdown unless it was already released (nil).
terminate(_Reason, #file{fd = nil}) ->
    ok;
terminate(_Reason, #file{fd = OpenFd}) ->
    ok = file:close(OpenFd).
%% Synchronous requests served by the file process.
handle_call(Msg, From, File) when ?IS_OLD_STATE(File) ->
    % state from before the monitor-based design: upgrade, then retry
    handle_call(Msg, From, upgrade_state(File));
handle_call(close, _From, #file{fd = Fd} = File) ->
    {stop, normal, file:close(Fd), File#file{fd = nil}};
handle_call({pread_iolist, Pos}, _From, File) ->
    update_read_timestamp(),
    % every chunk starts with a 4-byte header: 1 flag bit + 31-bit length
    {LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4),
    case iolist_to_binary(LenIolist) of
        % an MD5-prefixed term
        <<1:1/integer, Len:31/integer>> ->
            {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len + 16),
            {Md5, IoList} = extract_md5(Md5AndIoList),
            {reply, {ok, IoList, Md5}, File};
        <<0:1/integer, Len:31/integer>> ->
            {Iolist, _} = read_raw_iolist_int(File, NextPos, Len),
            {reply, {ok, Iolist, <<>>}, File}
    end;
handle_call({pread_iolists, PosL}, _From, File) ->
    update_read_timestamp(),
    % first pass: read all 4-byte chunk headers
    LocNums1 = [{Pos, 4} || Pos <- PosL],
    DataSizes = read_multi_raw_iolists_int(File, LocNums1),
    LocNums2 = lists:map(
        fun({LenIoList, NextPos}) ->
            case iolist_to_binary(LenIoList) of
                % an MD5-prefixed term
                <<1:1/integer, Len:31/integer>> ->
                    {NextPos, Len + 16};
                <<0:1/integer, Len:31/integer>> ->
                    {NextPos, Len}
            end
        end,
        DataSizes
    ),
    % second pass: read the payloads (and MD5s where present)
    Resps = read_multi_raw_iolists_int(File, LocNums2),
    Extracted = lists:zipwith(
        fun({LenIoList, _}, {FullIoList, _}) ->
            case iolist_to_binary(LenIoList) of
                <<1:1/integer, _:31/integer>> ->
                    % FullIoList still carries the MD5 prefix. It must be
                    % bound to a different name than the stripped payload,
                    % or this match fails with badmatch.
                    {Md5, IoList} = extract_md5(FullIoList),
                    {IoList, Md5};
                <<0:1/integer, _:31/integer>> ->
                    {FullIoList, <<>>}
            end
        end,
        DataSizes,
        Resps
    ),
    {reply, {ok, Extracted}, File};
handle_call(bytes, _From, #file{fd = Fd} = File) ->
    {reply, file:position(Fd, eof), File};
handle_call({set_db_pid, Pid}, _From, #file{db_monitor = OldRef} = File) ->
    case is_reference(OldRef) of
        true -> demonitor(OldRef, [flush]);
        false -> ok
    end,
    Ref = monitor(process, Pid),
    {reply, ok, File#file{db_monitor = Ref}};
handle_call(sync, _From, #file{fd = Fd} = File) ->
    case file:sync(Fd) of
        ok ->
            {reply, ok, File};
        {error, _} = Error ->
            % We're intentionally dropping all knowledge
            % of this Fd so that we don't accidentally
            % recover in some whacky edge case that I
            % can't fathom.
            {stop, Error, Error, #file{fd = nil}}
    end;
handle_call({truncate, Pos}, _From, #file{fd = Fd, dek = DEK} = File) when
    DEK == undefined orelse Pos >= ?SIZE_BLOCK
->
    {ok, Pos} = file:position(Fd, Pos),
    case file:truncate(Fd) of
        ok ->
            {reply, ok, File#file{eof = Pos}};
        Error ->
            {reply, Error, File}
    end;
%% truncating an encrypted file earlier than the end of the encryption header.
%% reuse the wrapped key with a new iv.
handle_call({truncate, Pos}, _From, #file{fd = Fd, dek = DEK} = File0) when
    DEK /= undefined andalso Pos < ?SIZE_BLOCK
->
    case read_encryption_header(File0) of
        {ok, {KeyID, WEK, _IV}} ->
            {ok, 0} = file:position(Fd, 0),
            case file:truncate(Fd) of
                ok ->
                    File1 = File0#file{eof = 0},
                    IV = new_aes_iv(),
                    case write_encryption_header(File1, KeyID, WEK, IV) of
                        {ok, File2} ->
                            ok = file:sync(File2#file.fd),
                            {reply, ok, init_crypto_file(File2, DEK, IV)};
                        {error, Reason} ->
                            {reply, {error, Reason}, File1}
                    end;
                Error ->
                    {reply, Error, File0}
            end;
        {error, Reason} ->
            {reply, {error, Reason}, File0}
    end;
handle_call({append_bin, Bin}, _From, #file{eof = Pos} = File) ->
    Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
    Size = iolist_size(Blocks),
    case encrypted_write(File, Blocks) of
        ok ->
            {reply, {ok, Pos, Size}, File#file{eof = Pos + Size}};
        Error ->
            % the write may have partially succeeded; re-read the real eof
            {reply, Error, reset_eof(File)}
    end;
handle_call({append_bins, Bins}, _From, #file{eof = Pos} = File) ->
    {BlockResps, FinalPos} = lists:mapfoldl(
        fun(Bin, PosAcc) ->
            Blocks = make_blocks(PosAcc rem ?SIZE_BLOCK, Bin),
            Size = iolist_size(Blocks),
            {{Blocks, {PosAcc, Size}}, PosAcc + Size}
        end,
        Pos,
        Bins
    ),
    {AllBlocks, Resps} = lists:unzip(BlockResps),
    case encrypted_write(File, AllBlocks) of
        ok ->
            {reply, {ok, Resps}, File#file{eof = FinalPos}};
        Error ->
            {reply, Error, reset_eof(File)}
    end;
handle_call({write_header, Bin}, _From, #file{eof = Pos} = File) ->
    BinSize = byte_size(Bin),
    % headers always start on a block boundary; zero-fill up to the next one
    Padding =
        case Pos rem ?SIZE_BLOCK of
            0 ->
                <<>>;
            BlockOffset ->
                <<0:(8 * (?SIZE_BLOCK - BlockOffset))>>
        end,
    FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(5, [Bin])],
    case encrypted_write(File, FinalBin) of
        ok ->
            {reply, ok, File#file{eof = Pos + iolist_size(FinalBin)}};
        Error ->
            {reply, Error, reset_eof(File)}
    end;
handle_call(find_header, _From, #file{eof = Pos} = File) ->
    {reply, find_header(File, Pos div ?SIZE_BLOCK), File}.
%% Asynchronous close: stop the server, keeping the state for terminate/2.
handle_cast(close, State) ->
    {stop, normal, State}.
%% The state is carried over a code change unmodified.
code_change(_FromVsn, CurrentState, _ExtraArgs) ->
    {ok, CurrentState}.
%% maybe_close is the periodic idle check; 'DOWN' arrives when the
%% monitored db process exits. Both stop the server once it is idle.
handle_info(Msg, File) when ?IS_OLD_STATE(File) ->
    handle_info(Msg, upgrade_state(File));
handle_info(maybe_close, File) ->
    case is_idle(File) of
        false ->
            % still in use: check again later
            erlang:send_after(?MONITOR_CHECK, self(), maybe_close),
            {noreply, File};
        true ->
            {stop, normal, File}
    end;
handle_info({'DOWN', Ref, process, _Pid, _Info}, #file{db_monitor = Ref} = File) ->
    case is_idle(File) of
        false -> {noreply, File};
        true -> {stop, normal, File}
    end.
%% Show the state and the path recorded in the process dictionary at open.
format_status(_Opt, [PDict, #file{} = File]) ->
    {_Fd, InitialPath} = couch_util:get_value(couch_file_fd, PDict),
    Items = [{"State", File}, {"InitialFilePath", InitialPath}],
    [{data, Items}].
%% Locate the newest valid header, starting at Block and walking backwards.
%% The block at Block is tried first. If it does not hold a valid header,
%% earlier blocks are scanned in batches of find_header_read_count.
find_header(#file{} = File, Block) ->
    case (catch load_header(File, Block)) of
        {ok, Bin} ->
            {ok, Bin};
        _Error ->
            % A configured count of zero or less would make the batch scan
            % build an empty block list and crash, so clamp it to 1.
            ReadCount = max(
                1,
                config:get_integer(
                    "couchdb", "find_header_read_count", ?DEFAULT_READ_COUNT
                )
            ),
            find_header(File, Block - 1, ReadCount)
    end.
%% Load the header that starts at block number Block. Crashes (badmatch)
%% when the block is not a header block or the MD5 does not verify.
load_header(#file{} = File, Block) ->
    BlockPos = Block * ?SIZE_BLOCK,
    {ok, <<1, HeaderLen:32/integer, RestBlock/binary>>} =
        encrypted_pread(File, BlockPos, ?SIZE_BLOCK),
    load_header(File, BlockPos, HeaderLen, RestBlock).

%% Load a header of known length at byte position Pos, with nothing pre-read.
load_header(#file{} = File, Pos, HeaderLen) ->
    load_header(File, Pos, HeaderLen, <<>>).

%% RestBlock holds the bytes already read after the 5-byte header prefix.
load_header(#file{} = File, Pos, HeaderLen, RestBlock) ->
    TotalBytes = calculate_total_read_len(?PREFIX_SIZE, HeaderLen),
    Available = byte_size(RestBlock),
    RawBin =
        if
            TotalBytes =< Available ->
                % everything needed is already in hand
                binary:part(RestBlock, 0, TotalBytes);
            true ->
                ReadStart = Pos + ?PREFIX_SIZE + Available,
                ReadLen = TotalBytes - Available,
                {ok, Missing} = encrypted_pread(File, ReadStart, ReadLen),
                <<RestBlock/binary, Missing/binary>>
        end,
    Unprefixed = iolist_to_binary(remove_block_prefixes(?PREFIX_SIZE, RawBin)),
    <<Md5Sig:16/binary, HeaderBin/binary>> = Unprefixed,
    % assert integrity: the stored MD5 must equal the computed one
    Md5Sig = couch_hash:md5_hash(HeaderBin),
    {ok, HeaderBin}.
%% Read multiple block locations using a single file:pread/2.
%% Scans up to ReadCount blocks ending at Block for header prefixes and
%% returns the newest header that loads and verifies. When none does, the
%% scan continues with the preceding batch until block 0 has been checked.
-spec find_header(#file{}, integer(), pos_integer()) ->
    {ok, binary()} | no_valid_header.
find_header(_File, Block, _ReadCount) when Block < 0 ->
    no_valid_header;
find_header(#file{} = File, Block, ReadCount) ->
    FirstBlock = max(0, Block - ReadCount + 1),
    BlockLocations = [?SIZE_BLOCK * B || B <- lists:seq(FirstBlock, Block)],
    {ok, DataL} = encrypted_pread(File, [{L, ?PREFIX_SIZE} || L <- BlockLocations]),
    %% BlockLocations are ordered from oldest to newest; reverse so the
    %% candidates are tried newest first. Blocks that do not start with a
    %% header prefix are skipped by the generator pattern.
    HeaderLocations = [
        {Loc, HeaderSize}
     || {Loc, <<1, HeaderSize:32/integer>>} <- lists:reverse(lists:zip(BlockLocations, DataL))
    ],
    case find_newest_header(File, HeaderLocations) of
        {ok, _Location, HeaderBin} ->
            {ok, HeaderBin};
        _ ->
            % this batch held no usable header; tell the OS we are done with it
            ok = file:advise(
                File#file.fd, ?SIZE_BLOCK * FirstBlock, ReadCount * ?SIZE_BLOCK, dont_need
            ),
            find_header(File, FirstBlock - 1, ReadCount)
    end.
%% Try each {Location, Size} candidate in order and return the first header
%% that loads and verifies. Candidates that fail to load are skipped.
-spec find_newest_header(#file{}, [{location(), header_size()}]) ->
    {ok, location(), binary()} | not_found.
find_newest_header(_File, []) ->
    not_found;
find_newest_header(#file{} = File, [{Location, Size} | LocationSizes]) ->
    case (catch load_header(File, Location, Size)) of
        {ok, HeaderBin} ->
            {ok, Location, HeaderBin};
        _Error ->
            find_newest_header(File, LocationSizes)
    end.
-spec read_raw_iolist_int(#file{}, Pos :: non_neg_integer(), Len :: non_neg_integer()) ->
    {Data :: iolist(), CurPos :: non_neg_integer()}.
% 0110 UPGRADE CODE
read_raw_iolist_int(Fd, {Pos, _Size}, Len) ->
    read_raw_iolist_int(Fd, Pos, Len);
read_raw_iolist_int(#file{} = File, Pos, Len) ->
    % TotalBytes includes the block prefix bytes interleaved with the data
    {Pos, TotalBytes} = get_pread_locnum(File, Pos, Len),
    case catch encrypted_pread(File, Pos, TotalBytes) of
        {ok, <<RawBin:TotalBytes/binary>>} ->
            Data = remove_block_prefixes(Pos rem ?SIZE_BLOCK, RawBin),
            {Data, Pos + TotalBytes};
        Unexpected ->
            % This clause matches when the file we are working with got
            % truncated outside of CouchDB after we opened it. The file path
            % is included so affected files can be identified.
            {_Fd, Filepath} = get(couch_file_fd),
            throw({file_truncate_error, Unexpected, Filepath})
    end.
% used in couch_bt_engine_compactor.erl via pread_terms/2
% Reads several {Pos, Len} ranges in one pread and strips block prefixes.
read_multi_raw_iolists_int(#file{} = File, PosLens) ->
    LocNums = [get_pread_locnum(File, Pos, Len) || {Pos, Len} <- PosLens],
    {ok, Bins} = encrypted_pread(File, LocNums),
    Strip = fun({Pos, TotalBytes}, Bin) ->
        % assert that exactly the requested number of bytes came back
        <<RawBin:TotalBytes/binary>> = Bin,
        {remove_block_prefixes(Pos rem ?SIZE_BLOCK, RawBin), Pos + TotalBytes}
    end,
    lists:zipwith(Strip, LocNums, Bins).
%% Translate a logical read of Len bytes at Pos into {Pos, RawByteCount},
%% throwing when the read would go past eof or past the pread limit.
get_pread_locnum(File, Pos, Len) ->
    BlockOffset = Pos rem ?SIZE_BLOCK,
    TotalBytes = calculate_total_read_len(BlockOffset, Len),
    ReadEnd = Pos + TotalBytes,
    if
        ReadEnd > File#file.eof ->
            couch_stats:increment_counter([pread, exceed_eof]),
            {_Fd, Filepath} = get(couch_file_fd),
            throw({read_beyond_eof, Filepath});
        ReadEnd > File#file.pread_limit ->
            couch_stats:increment_counter([pread, exceed_limit]),
            {_Fd, Filepath} = get(couch_file_fd),
            throw({exceed_pread_limit, Filepath, File#file.pread_limit});
        true ->
            {Pos, TotalBytes}
    end.
%% Split the leading 16-byte MD5 off an iolist.
-spec extract_md5(iolist()) -> {binary(), iolist()}.
extract_md5(FullIoList) ->
    {Md5Part, Payload} = split_iolist(FullIoList, 16, []),
    Md5 = iolist_to_binary(Md5Part),
    {Md5, Payload}.
%% Number of raw bytes to read to obtain FinalLen data bytes starting at
%% BlockOffset within a block, counting the 1-byte prefix that begins
%% every block.
calculate_total_read_len(0, FinalLen) ->
    % starting on a block boundary: account for that block's prefix byte
    calculate_total_read_len(1, FinalLen) + 1;
calculate_total_read_len(BlockOffset, FinalLen) ->
    BlockLeft = ?SIZE_BLOCK - BlockOffset,
    if
        BlockLeft >= FinalLen ->
            FinalLen;
        true ->
            Overflow = FinalLen - BlockLeft,
            DataPerBlock = ?SIZE_BLOCK - 1,
            FullBlocks = Overflow div DataPerBlock,
            PartialBlock =
                case Overflow rem DataPerBlock of
                    0 -> 0;
                    _ -> 1
                end,
            FinalLen + FullBlocks + PartialBlock
    end.
%% Strip the 1-byte prefix found at the start of every block from raw file
%% data that begins BlockOffset bytes into a block. Returns an iolist.
remove_block_prefixes(_BlockOffset, <<>>) ->
    [];
remove_block_prefixes(0, <<_BlockPrefix, Rest/binary>>) ->
    remove_block_prefixes(1, Rest);
remove_block_prefixes(BlockOffset, Bin) ->
    Available = ?SIZE_BLOCK - BlockOffset,
    case byte_size(Bin) > Available of
        true ->
            <<DataBlock:Available/binary, Rest/binary>> = Bin,
            [DataBlock | remove_block_prefixes(0, Rest)];
        false ->
            [Bin]
    end.
%% Insert a <<0>> prefix byte at every block boundary of IoList, given that
%% writing starts BlockOffset bytes into the current block.
make_blocks(_BlockOffset, []) ->
    [];
make_blocks(0, IoList) ->
    [<<0>> | make_blocks(1, IoList)];
make_blocks(BlockOffset, IoList) ->
    RoomInBlock = ?SIZE_BLOCK - BlockOffset,
    case split_iolist(IoList, RoomInBlock, []) of
        {ThisBlock, Remainder} ->
            [ThisBlock | make_blocks(0, Remainder)];
        _BytesShort ->
            % everything fits in the current block
            IoList
    end.
%% @doc Splits an iolist after its first SplitAt bytes, returning
%% {Leading, Tail}. If the iolist holds fewer than SplitAt bytes, the number
%% of bytes still missing is returned instead.
-spec split_iolist(IoList :: iolist(), SplitAt :: non_neg_integer(), Acc :: list()) ->
    {iolist(), iolist()} | non_neg_integer().
split_iolist(List, 0, Acc) ->
    {lists:reverse(Acc), List};
split_iolist([], Missing, _Acc) ->
    Missing;
split_iolist([Bin | Rest], SplitAt, Acc) when is_binary(Bin), SplitAt > byte_size(Bin) ->
    % the whole binary belongs to the leading part
    split_iolist(Rest, SplitAt - byte_size(Bin), [Bin | Acc]);
split_iolist([Bin | Rest], SplitAt, Acc) when is_binary(Bin) ->
    % the split point falls inside (or at the end of) this binary
    <<Head:SplitAt/binary, Tail/binary>> = Bin,
    {lists:reverse([Head | Acc]), [Tail | Rest]};
split_iolist([Sublist | Rest], SplitAt, Acc) when is_list(Sublist) ->
    case split_iolist(Sublist, SplitAt, Acc) of
        {Leading, Tail} ->
            {Leading, [Tail | Rest]};
        StillMissing ->
            % the sublist was consumed entirely; keep it as a single element
            split_iolist(Rest, StillMissing, [Sublist | Acc])
    end;
split_iolist([Byte | Rest], SplitAt, Acc) when is_integer(Byte) ->
    split_iolist(Rest, SplitAt - 1, [Byte | Acc]).
%% Pids currently monitoring this process; non-pid entries are dropped.
monitored_by_pids() ->
    {monitored_by, Monitors} = erlang:process_info(self(), monitored_by),
    [Pid || Pid <- Monitors, is_pid(Pid)].
%% Return IoList when its MD5 matches the stored one; an empty stored MD5
%% means there is nothing to verify. A mismatch is reported as corruption.
verify_md5(_Fd, _Pos, IoList, <<>>) ->
    IoList;
verify_md5(Fd, Pos, IoList, Md5) ->
    case couch_hash:md5_hash(IoList) =:= Md5 of
        true -> IoList;
        false -> report_md5_error(Fd, Pos)
    end.
%% Log the corruption at emergency level and exit the calling process.
report_md5_error(Fd, Pos) ->
    LogArgs = [Fd, Pos],
    couch_log:emergency("File corruption in ~p at position ~B", LogArgs),
    exit({file_corruption, <<"file corruption">>}).
% System dbs aren't monitored by couch_stats_process_tracker, so they are
% idle when nothing monitors them. Other files are idle when nothing, or
% only the stats tracker, monitors them.
is_idle(#file{is_sys = true}) ->
    monitored_by_pids() =:= [];
is_idle(#file{is_sys = false}) ->
    TrackerPid = whereis(couch_stats_process_tracker),
    case monitored_by_pids() of
        [] -> true;
        [TrackerPid] -> true;
        % a single monitor that is not the tracker: tracking must have failed
        [_Other] -> exit(tracker_monitoring_failed);
        [_, _ | _] -> false
    end.
%% Fetch the {Fd, FilePath} pair a couch_file process stored in its
%% process dictionary when it opened the file.
-spec process_info(CouchFilePid :: pid()) ->
    {Fd :: pid() | tuple(), FilePath :: string()} | undefined.
process_info(CouchFilePid) ->
    couch_util:process_dict_get(CouchFilePid, couch_file_fd).
%% Record the time of the latest read in the process dictionary.
update_read_timestamp() ->
    Now = os:timestamp(),
    put(read_timestamp, Now).
%% Convert a state that still links to the db pid into one that monitors
%% it; any other state is returned unchanged.
upgrade_state(#file{db_monitor = LinkedPid} = File) when is_pid(LinkedPid) ->
    unlink(LinkedPid),
    MonitorRef = monitor(process, LinkedPid),
    File#file{db_monitor = MonitorRef};
upgrade_state(Current) ->
    Current.
%% Configured max_pread_size, or infinity when it is unset or not positive.
get_pread_limit() ->
    Configured = config:get_integer("couchdb", "max_pread_size", 0),
    if
        Configured > 0 -> Configured;
        true -> infinity
    end.
%% In the event of a partially successful write, re-read the real end of
%% file from the descriptor and store it in the state.
reset_eof(#file{fd = Fd} = File) ->
    {ok, ActualEof} = file:position(Fd, eof),
    File#file{eof = ActualEof}.
%% Set up encryption state for a freshly opened file.
%% Returns {ok, #file{}} or {error, Reason}.
%%
%% new file: ask the encryption manager for a key (only when a db_name
%% option is present) and write the encryption header.
init_crypto(_Filepath, #file{eof = 0, dek = undefined} = File0, Options) ->
    case lists:keyfind(db_name, 1, Options) of
        {db_name, DbName} ->
            case couch_encryption_manager:new_dek(DbName) of
                {ok, KeyID, DEK, WEK} ->
                    IV = new_aes_iv(),
                    case write_encryption_header(File0, KeyID, WEK, IV) of
                        {ok, File1} ->
                            ok = file:sync(File1#file.fd),
                            {ok, init_crypto_file(File1, DEK, IV)};
                        {error, Reason} ->
                            {error, Reason}
                    end;
                dont_encrypt ->
                    {ok, File0};
                {error, Reason} ->
                    {error, Reason}
            end;
        false ->
            {ok, File0}
    end;
%% we're opening an existing file and need to unwrap the key if file is encrypted.
init_crypto(Filepath, #file{eof = Eof, dek = undefined} = File, _Options) when Eof >= ?SIZE_BLOCK ->
    case read_encryption_header(File) of
        {ok, {KeyID, WEK, IV}} ->
            case couch_encryption_manager:unwrap_dek(KeyID, WEK) of
                {ok, DEK} ->
                    {ok, init_crypto_file(File, DEK, IV)};
                {ok, NewKeyID, DEK, NewWEK} ->
                    %% manager has rewrapped the DEK with a new key, update our header.
                    case rewrite_encryption_header(Filepath, NewKeyID, NewWEK, IV) of
                        ok ->
                            {ok, init_crypto_file(File, DEK, IV)};
                        {error, Reason} ->
                            {error, Reason}
                    end;
                {error, Reason} ->
                    {error, Reason}
            end;
        not_encrypted ->
            {ok, File};
        {error, Reason} ->
            {error, Reason}
    end;
%% existing file shorter than one block. An encrypted file always starts
%% with a full ?SIZE_BLOCK encryption header, so a shorter file cannot be
%% encrypted. Without this clause such a file fails with function_clause.
init_crypto(_Filepath, #file{dek = undefined} = File, _Options) ->
    {ok, File}.

%% Overwrite the encryption header of Filepath in place, through a
%% separate descriptor opened in write (not append) mode.
%% Returns ok or {error, Reason}.
rewrite_encryption_header(Filepath, KeyID, WEK, IV) ->
    case file:open(Filepath, [read, write, raw]) of
        {ok, Fd} ->
            case write_encryption_header(#file{fd = Fd}, KeyID, WEK, IV) of
                {ok, _File} ->
                    ok = file:sync(Fd),
                    ok = file:close(Fd);
                {error, Reason} ->
                    ok = file:close(Fd),
                    {error, Reason}
            end;
        {error, Reason} ->
            {error, Reason}
    end.
%% Store the key and the IV (converted to an integer so that counter
%% arithmetic can be done on it) in the state.
init_crypto_file(#file{} = File, DEK, IV) when is_binary(DEK), is_binary(IV) ->
    IVInt = crypto:bytes_to_integer(IV),
    File#file{iv = IVInt, dek = DEK}.
%% Write two identical copies of the encryption header to a file whose
%% tracked eof is 0. Each copy is ?ENCRYPTION_HEADER_SIZE bytes:
%% marker, key id length, key id, IV, wrapped key, zero padding and a
%% trailing SHA-256 digest of everything before it.
write_encryption_header(#file{eof = 0} = File, KeyID, WrappedKey, IV) when
    bit_size(KeyID) =< 128, bit_size(WrappedKey) == 512, bit_size(IV) == 128
->
    KeyIDLen = byte_size(KeyID),
    Fields = [<<?ENCRYPTED_HEADER_MARKER>>, <<KeyIDLen:16>>, KeyID, IV, WrappedKey],
    % leave room for the 32-byte digest at the end of the copy
    PadBits = (?ENCRYPTION_HEADER_SIZE - iolist_size(Fields) - 32) * 8,
    Padded = [Fields, <<0:PadBits>>],
    Copy = [Padded, crypto:hash(sha256, Padded)],
    ?ENCRYPTION_HEADER_SIZE = iolist_size(Copy),
    case file:write(File#file.fd, [Copy, Copy]) of
        {error, Reason} ->
            {error, Reason};
        ok ->
            {ok, File#file{eof = ?SIZE_BLOCK}}
    end.
%% Read and validate the two encryption header copies stored in the first
%% ?SIZE_BLOCK bytes of the file.
%% Returns {ok, {KeyID, WrappedKey, IV}}, not_encrypted when the block does
%% not carry the header markers (or the file is empty), or {error, Reason}.
read_encryption_header(#file{} = File) ->
    case file:pread(File#file.fd, 0, ?SIZE_BLOCK) of
        {ok,
            <<?ENCRYPTED_HEADER_MARKER, Bin1:(?ENCRYPTION_HEADER_SIZE - 16)/binary,
                ?ENCRYPTED_HEADER_MARKER, Bin2/binary>>} ->
            Primary = extract_header(<<?ENCRYPTED_HEADER_MARKER, Bin1/binary>>),
            Secondary = extract_header(<<?ENCRYPTED_HEADER_MARKER, Bin2/binary>>),
            {_Fd, Filepath} = get(couch_file_fd),
            select_encryption_header(Filepath, Primary, Secondary);
        {ok, _} ->
            not_encrypted;
        eof ->
            % nothing to read at all: cannot be an encrypted file
            not_encrypted;
        {error, Reason} ->
            {error, Reason}
    end.

%% Choose between the primary and secondary header copies, logging any
%% disagreement or corruption. The primary wins when both are valid.
select_encryption_header(_Filepath, {ok, Header}, {ok, Header}) ->
    % headers match
    {ok, Header};
select_encryption_header(Filepath, {ok, Header1}, {ok, Header2}) ->
    couch_log:warning(
        "~s: Encryption header version differences.~nPrimary header: ~p~nSecondary header: ~p. Using primary header",
        [Filepath, Header1, Header2]
    ),
    {ok, Header1};
select_encryption_header(Filepath, {ok, Header}, {error, Reason}) ->
    couch_log:warning(
        "~s: Secondary encryption header corruption (error: ~p). Using primary header.",
        [Filepath, Reason]
    ),
    {ok, Header};
select_encryption_header(Filepath, {error, Reason}, {ok, Header}) ->
    couch_log:warning(
        "~s: Primary encryption header corruption (error: ~p). Using secondary header.",
        [Filepath, Reason]
    ),
    {ok, Header};
select_encryption_header(Filepath, {error, Reason1}, {error, Reason2}) ->
    couch_log:error("~s: Both encryption headers corrupted (errors: ~p, ~p).", [
        Filepath, Reason1, Reason2
    ]),
    {error, corrupted_encryption_header}.
%% Parse one encryption header copy and verify its trailing digest.
%% Returns {ok, {KeyID, WrappedKey, IV}} or {error, corrupted_encryption_header}.
extract_header(
    <<?ENCRYPTED_HEADER_MARKER, KeyIDLen:16, KeyID:(KeyIDLen)/binary, IV:16/binary,
        WrappedKey:64/binary, _/binary>> = Copy
) ->
    case check_header(Copy) of
        false ->
            {error, corrupted_encryption_header};
        true ->
            {ok, {KeyID, WrappedKey, IV}}
    end;
extract_header(_) ->
    {error, corrupted_encryption_header}.
%% True when the last 32 bytes of Bin are the SHA-256 digest of everything
%% that precedes them.
check_header(Bin) ->
    {Body, StoredDigest} = split_binary(Bin, byte_size(Bin) - 32),
    StoredDigest == crypto:hash(sha256, Body).
%% We can encrypt any section of the file but we must make sure we align
%% with the key stream: the data is zero-padded up to the tracked eof's
%% offset within its 16-byte AES block before encryption, and that padding
%% is stripped again before writing. Unencrypted files are written as-is.
encrypted_write(#file{dek = undefined, fd = Fd}, Data) ->
    file:write(Fd, Data);
encrypted_write(#file{eof = Eof} = File, Data) ->
    Aligned = pad(Eof, Data),
    CipherText = ?encrypt_ctr(File, Eof, Aligned),
    file:write(File#file.fd, unpad(Eof, CipherText)).
%% Multi-location pread. For encrypted files every returned chunk is
%% decrypted using its own position in the key stream.
encrypted_pread(#file{dek = undefined, fd = Fd}, LocNums) ->
    file:pread(Fd, LocNums);
encrypted_pread(#file{} = File, LocNums) ->
    case file:pread(File#file.fd, LocNums) of
        {ok, CipherTexts} ->
            Decrypt = fun({Pos, _Len}, CipherText) ->
                Aligned = pad(Pos, CipherText),
                unpad(Pos, ?decrypt_ctr(File, Pos, Aligned))
            end,
            {ok, lists:zipwith(Decrypt, LocNums, CipherTexts)};
        Other ->
            Other
    end.
%% Single-range pread; decrypts the result when the file is encrypted.
encrypted_pread(#file{dek = undefined, fd = Fd}, Pos, Len) ->
    file:pread(Fd, Pos, Len);
encrypted_pread(#file{} = File, Pos, Len) ->
    case file:pread(File#file.fd, Pos, Len) of
        {ok, CipherText} ->
            Aligned = pad(Pos, CipherText),
            PlainText = ?decrypt_ctr(File, Pos, Aligned),
            {ok, unpad(Pos, PlainText)};
        Other ->
            Other
    end.
%% Fresh random 16-byte (128-bit) initialisation vector.
new_aes_iv() ->
    IVBytes = 16,
    crypto:strong_rand_bytes(IVBytes).
%% Counter block for the 16-byte AES block that contains file offset Pos:
%% the integer IV advanced by the block index, encoded as 128 bits.
aes_ctr(IV, Pos) ->
    BlockIndex = Pos div 16,
    Counter = IV + BlockIndex,
    <<Counter:128>>.
%% Prefix IOData with as many zero bytes as Pos lies past a 16-byte
%% boundary, so that the data lines up with the AES-CTR key stream.
pad(Pos, IOData) ->
    PadBits = (Pos rem 16) * 8,
    [<<0:PadBits>>, IOData].
%% Drop the alignment bytes that pad/2 put in front of the data.
unpad(Pos, Bin) when is_binary(Bin) ->
    PadBytes = Pos rem 16,
    <<_:PadBytes/binary, Payload/binary>> = Bin,
    Payload.
-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").
%% One generated test per fixture path.
deleted_filename_test_() ->
    DbNames = ["dbname", "db.name", "user/dbname"],
    [
        should_create_proper_deleted_filename(Fixture)
     || Fixture <- make_filename_fixtures(DbNames)
    ].
%% Checks that deleted_filename/1 yields
%% <basename>.<8 digits>.<6 digits>.deleted.<anything> and keeps the
%% original extension.
should_create_proper_deleted_filename(Before) ->
    {Before,
        ?_test(begin
            BeforeExtension = filename:extension(Before),
            BeforeBasename = filename:basename(Before, BeforeExtension),
            % "\\." is needed for a literal dot in the regex: in an Erlang
            % string "\." is just ".", which matches any character.
            Re = "^" ++ BeforeBasename ++ "\\.[0-9]{8}\\.[0-9]{6}\\.deleted\\..*$",
            After = deleted_filename(Before),
            ?assertEqual(
                match,
                re:run(filename:basename(After), Re, [{capture, none}])
            ),
            ?assertEqual(BeforeExtension, filename:extension(After))
        end)}.
%% Every path format expanded for every db name, rooted at /srv/data.
%% Results are grouped by db name, in the order of DbNames.
make_filename_fixtures(DbNames) ->
    Formats = [
        "~s.couch",
        ".~s_design/mrview/3133e28517e89a3e11435dd5ac4ad85a.view",
        "shards/00000000-1fffffff/~s.1458336317.couch",
        ".shards/00000000-1fffffff/~s.1458336317_design",
        ".shards/00000000-1fffffff/~s.1458336317_design"
        "/mrview/3133e28517e89a3e11435dd5ac4ad85a.view"
    ],
    [
        filename:join("/srv/data", io_lib:format(Format, [DbName]))
     || DbName <- DbNames, Format <- Formats
    ].
-endif.