| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_file). |
| -behaviour(gen_server). |
| |
| -include("couch_db.hrl"). |
| |
% Size of the fixed blocks the file is divided into. For data at or after
% tail_append_begin, the first byte of each block is a marker byte: 0 for
% ordinary data (see make_blocks/2) and 1 where a header starts (the
% write_header request in handle_call/3 pads to a block boundary first).
-define(SIZE_BLOCK, 4096).

% gen_server state.
-record(file, {
    fd, % raw handle returned by file:open/2
    tail_append_begin = 0, % 09 UPGRADE CODE: data below this offset uses the pre-0.10 layout (no marker bytes)
    eof = 0 % logical end of file; appends report this as their position and header writes pad from it
}).
| |
| -export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]). |
| -export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]). |
| -export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]). |
| -export([append_term_md5/2,append_binary_md5/2]). |
| -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]). |
| -export([delete/2,delete/3,init_delete_dir/1]). |
| |
| %%---------------------------------------------------------------------- |
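%% Args: Options may contain create (create the file if missing; fail with
%% file_exists if it already holds data), optionally combined with overwrite
%% (truncate existing data instead), and sys_db (skip open_os_files tracking).
%% Files are opened in read/write mode.
%% Returns: On success, {ok, Fd}; otherwise {error, Reason} if the file
%% could not be opened, or the atom file_exists (see create above).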
| %%---------------------------------------------------------------------- |
| |
%% Opens Filepath with an empty option list; see open/2.
open(Filepath) ->
    DefaultOptions = [],
    open(Filepath, DefaultOptions).
| |
%% Starts a linked couch_file server for Filepath.
%% When init/1 gives up it sends {Ref, ServerPid, Error} to this process
%% (init_status_error/3) and returns `ignore`; that Error becomes our result.
%% Any other start_link failure is returned unchanged.
open(Filepath, Options) ->
    Ref = make_ref(),
    InitArgs = {Filepath, Options, self(), Ref},
    case gen_server:start_link(couch_file, InitArgs, []) of
        {ok, Fd} ->
            {ok, Fd};
        ignore ->
            receive
                {Ref, ServerPid, Reason} ->
                    % The server has exited after returning `ignore`. If we trap
                    % exits, consume its 'EXIT' message so it does not linger.
                    {trap_exit, Trapping} = process_info(self(), trap_exit),
                    if
                        Trapping -> receive {'EXIT', ServerPid, _} -> ok end;
                        true -> ok
                    end,
                    Reason
            end;
        StartFailure ->
            StartFailure
    end.
| |
| |
| %%---------------------------------------------------------------------- |
| %% Purpose: To append an Erlang term to the end of the file. |
| %% Args: Erlang term to serialize and append to the file. |
%% Returns: {ok, Pos} where Pos is the file offset to the beginning of the
%% serialized term. Use pread_term to read the term back.
| %% or {error, Reason}. |
| %%---------------------------------------------------------------------- |
| |
%% Serializes Term with term_to_binary/1 and appends it without a checksum.
append_term(Fd, Term) ->
    Serialized = term_to_binary(Term),
    append_binary(Fd, Serialized).
| |
%% Serializes Term with term_to_binary/1 and appends it together with an MD5
%% digest that pread_iolist/2 verifies on read.
append_term_md5(Fd, Term) ->
    Serialized = term_to_binary(Term),
    append_binary_md5(Fd, Serialized).
| |
| |
| %%---------------------------------------------------------------------- |
%% Purpose: To append an Erlang binary (or iolist) to the end of the file.
%% Args: The binary or iolist to append; it is stored with a length prefix.
%% Returns: {ok, Pos} where Pos is the file offset to the beginning of the
%% stored data. Use pread_binary or pread_iolist to read it back.
| %% or {error, Reason}. |
| %%---------------------------------------------------------------------- |
| |
%% Appends Bin (binary or iolist) via the server. The chunk is preceded by a
%% 4-byte prefix: a 0 flag bit (no MD5) and the 31-bit payload size.
%% Returns the server's reply ({ok, Pos} on success).
append_binary(Fd, Bin) ->
    Prefix = <<0:1/integer, (iolist_size(Bin)):31/integer>>,
    gen_server:call(Fd, {append_bin, [Prefix, Bin]}, infinity).
| |
%% Like append_binary/2, but sets the flag bit to 1 and stores the 16-byte
%% MD5 of Bin between the prefix and the payload.
append_binary_md5(Fd, Bin) ->
    Prefix = <<1:1/integer, (iolist_size(Bin)):31/integer>>,
    Chunk = [Prefix, couch_util:md5(Bin), Bin],
    gen_server:call(Fd, {append_bin, Chunk}, infinity).
| |
| |
| %%---------------------------------------------------------------------- |
| %% Purpose: Reads a term from a file that was written with append_term |
| %% Args: Pos, the offset into the file where the term is serialized. |
%% Returns: {ok, Term}. A failed read raises badmatch rather than
%% returning {error, Reason}.
| %%---------------------------------------------------------------------- |
| |
| |
%% Reads the chunk at Pos and decodes it with binary_to_term/1.
pread_term(Fd, Pos) ->
    {ok, TermBin} = pread_binary(Fd, Pos),
    Term = binary_to_term(TermBin),
    {ok, Term}.
| |
| |
| %%---------------------------------------------------------------------- |
%% Purpose: Reads a binary from a file that was written with append_binary
%% Args: Pos, the offset into the file where the binary was written.
%% Returns: {ok, Binary}. A failed read raises badmatch rather than
%% returning {error, Reason}.
| %%---------------------------------------------------------------------- |
| |
%% Reads the chunk at Pos and flattens it into a single binary.
pread_binary(Fd, Pos) ->
    {ok, Pieces} = pread_iolist(Fd, Pos),
    Flat = iolist_to_binary(Pieces),
    {ok, Flat}.
| |
| |
%% Fetches the chunk at Pos from the server as an iolist. The server returns
%% the stored MD5 (or <<>> when the chunk has none); a digest is verified
%% here in the calling process, and a mismatch exits with
%% {file_corruption, <<"file corruption">>}. Any other reply is returned as-is.
pread_iolist(Fd, Pos) ->
    case gen_server:call(Fd, {pread_iolist, Pos}, infinity) of
        {ok, Data, <<>>} ->
            {ok, Data};
        {ok, Data, StoredMd5} ->
            ComputedMd5 = couch_util:md5(Data),
            if
                ComputedMd5 =:= StoredMd5 -> {ok, Data};
                true -> exit({file_corruption, <<"file corruption">>})
            end;
        Other ->
            Other
    end.
| |
| %%---------------------------------------------------------------------- |
| %% Purpose: The length of a file, in bytes. |
| %% Returns: {ok, Bytes} |
| %% or {error, Reason}. |
| %%---------------------------------------------------------------------- |
| |
% Length in bytes: returns {ok, Eof}, the logical end of file kept in the
% server state (not re-queried from the operating system).
bytes(Fd) ->
    Request = bytes,
    gen_server:call(Fd, Request, infinity).
| |
| %%---------------------------------------------------------------------- |
%% Purpose: Truncate the file to Pos bytes.
| %% Returns: ok |
| %% or {error, Reason}. |
| %%---------------------------------------------------------------------- |
| |
%% Asks the server to cut the file at Pos; on success the server also moves
%% its logical end of file to Pos.
truncate(Fd, Pos) ->
    Request = {truncate, Pos},
    gen_server:call(Fd, Request, infinity).
| |
| %%---------------------------------------------------------------------- |
%% Purpose: Ensure all bytes written to the file are flushed to disk.
%% Accepts either a couch_file server or a file path (a string).
| %% Returns: ok |
| %% or {error, Reason}. |
| %%---------------------------------------------------------------------- |
| |
%% A server reference asks the server to fsync its own handle. A path (a
%% string) is opened in raw append mode just to fsync it; the temporary
%% handle is closed even if the sync fails.
sync(Fd) when not is_list(Fd) ->
    gen_server:call(Fd, sync, infinity);
sync(Filepath) ->
    {ok, Handle} = file:open(Filepath, [append, raw]),
    try
        ok = file:sync(Handle)
    after
        ok = file:close(Handle)
    end.
| |
| %%---------------------------------------------------------------------- |
| %% Purpose: Close the file. |
| %% Returns: ok |
| %%---------------------------------------------------------------------- |
%% Stops the server Fd and waits until it has terminated. The caller unlinks
%% first so the server's non-normal exit does not propagate back. Always
%% returns ok; a monitor on an already-dead process fires immediately.
close(Fd) ->
    Monitor = erlang:monitor(process, Fd),
    try
        _ = (catch unlink(Fd)),
        _ = (catch exit(Fd, shutdown)),
        receive
            {'DOWN', Monitor, _, _, _} -> ok
        end
    after
        erlang:demonitor(Monitor, [flush])
    end.
| |
| |
%% Deletes Filepath with the actual removal done asynchronously; see delete/3.
delete(RootDir, Filepath) ->
    Async = true,
    delete(RootDir, Filepath, Async).
| |
| |
%% Moves Filepath into RootDir/.delete under a random name, then removes it.
%% When Async is true the removal runs in a separately spawned process and
%% ok is returned at once; otherwise file:delete/1's result is returned.
%% A failed rename is returned unchanged.
delete(RootDir, Filepath, Async) ->
    Trash = filename:join([RootDir, ".delete", ?b2l(couch_uuids:random())]),
    case file:rename(Filepath, Trash) of
        ok when Async ->
            spawn(file, delete, [Trash]),
            ok;
        ok ->
            file:delete(Trash);
        RenameError ->
            RenameError
    end.
| |
| |
%% Makes sure RootDir/.delete exists, then deletes every file found beneath
%% it (recursively). Crashes if a file cannot be deleted; returns ok.
init_delete_dir(RootDir) ->
    TrashDir = filename:join(RootDir, ".delete"),
    % filelib:ensure_dir/1 only creates the parents of its argument, hence the
    % dummy file component "foo".
    filelib:ensure_dir(filename:join(TrashDir, "foo")),
    RemoveOne = fun(Path, _Acc) -> ok = file:delete(Path) end,
    filelib:fold_files(TrashDir, ".*", true, RemoveOne, ok).
| |
| |
% 09 UPGRADE CODE
%% Reads exactly Len raw bytes at Pos through the server. Crashes (badmatch)
%% unless the server reports Pos lies below tail_append_begin (flag false)
%% and exactly Len bytes come back.
old_pread(Fd, Pos, Len) ->
    Reply = gen_server:call(Fd, {pread, Pos, Len}, infinity),
    {ok, <<Raw:Len/binary>>, false} = Reply,
    {ok, Raw}.
| |
% 09 UPGRADE CODE
%% Asks the server to convert a pre-0.10 header identified by Sig (the
%% header prefix) into the current format.
upgrade_old_header(Fd, Sig) ->
    Request = {upgrade_old_header, Sig},
    gen_server:call(Fd, Request, infinity).
| |
| |
%% Asks the server for the most recent valid header and decodes it.
%% Returns {ok, Term}; any other reply (e.g. no_valid_header) is passed on.
read_header(Fd) ->
    Reply = gen_server:call(Fd, find_header, infinity),
    case Reply of
        {ok, HeaderBin} -> {ok, binary_to_term(HeaderBin)};
        NotFound -> NotFound
    end.
| |
%% Serializes Data, prefixes it with its 16-byte MD5 digest and asks the
%% server to write it as a new header.
write_header(Fd, Data) ->
    Payload = term_to_binary(Data),
    Digest = couch_util:md5(Payload),
    gen_server:call(Fd, {write_header, <<Digest/binary, Payload/binary>>}, infinity).
| |
| |
| |
| |
%% Sends Error to the opener tagged with Ref and this server's pid, then
%% returns `ignore` for init/1 to hand back to gen_server.
init_status_error(ReturnPid, Ref, Error) ->
    Notice = {Ref, self(), Error},
    ReturnPid ! Notice,
    ignore.
| |
| % server functions |
| |
%% gen_server callback; Args is {Filepath, Options, ReturnPid, Ref} as built by
%% open/2. Traps exits (so linked exits reach handle_info/2), then either
%% creates the file (create option) or opens an existing one.
%% Returns {ok, #file{}} on success. On any open failure it reports the error
%% to ReturnPid via init_status_error/3 and returns ignore, so open/2 returns
%% the error instead of the linked caller receiving a crash.
init({Filepath, Options, ReturnPid, Ref}) ->
    process_flag(trap_exit, true),
    case lists:member(create, Options) of
        true ->
            init_create(Filepath, Options, ReturnPid, Ref);
        false ->
            init_open_existing(Filepath, Options, ReturnPid, Ref)
    end.

%% create: opens (creating if needed) for read/append, then applies the
%% existing-data rules in init_created/5.
init_create(Filepath, Options, ReturnPid, Ref) ->
    filelib:ensure_dir(Filepath),
    case file:open(Filepath, [read, append, raw, binary]) of
        {ok, Fd} ->
            {ok, Length} = file:position(Fd, eof),
            init_created(Fd, Length, Options, ReturnPid, Ref);
        Error ->
            init_status_error(ReturnPid, Ref, Error)
    end.

%% An empty file is used as-is (empty and non-existent files are not told
%% apart). Existing data is discarded only with the overwrite option;
%% otherwise the handle is closed and the open fails with file_exists.
init_created(Fd, 0, Options, _ReturnPid, _Ref) ->
    maybe_track_open_os_files(Options),
    {ok, #file{fd=Fd}};
init_created(Fd, _Length, Options, ReturnPid, Ref) ->
    case lists:member(overwrite, Options) of
        true ->
            {ok, 0} = file:position(Fd, 0),
            ok = file:truncate(Fd),
            ok = file:sync(Fd),
            maybe_track_open_os_files(Options),
            {ok, #file{fd=Fd}};
        false ->
            ok = file:close(Fd),
            init_status_error(ReturnPid, Ref, file_exists)
    end.

%% No create option: the file must already exist. It is probed read-only
%% first so that a missing file is reported rather than created.
init_open_existing(Filepath, Options, ReturnPid, Ref) ->
    case file:open(Filepath, [read, raw]) of
        {ok, Fd_Read} ->
            OpenResult = file:open(Filepath, [read, append, raw, binary]),
            ok = file:close(Fd_Read),
            case OpenResult of
                {ok, Fd} ->
                    maybe_track_open_os_files(Options),
                    {ok, Length} = file:position(Fd, eof),
                    {ok, #file{fd=Fd, eof=Length}};
                OpenError ->
                    % e.g. the file exists but is not writable: report it like
                    % any other open failure instead of crashing init/1, which
                    % would send an exit signal to the linked opener.
                    init_status_error(ReturnPid, Ref, OpenError)
            end;
        Error ->
            init_status_error(ReturnPid, Ref, Error)
    end.
| |
%% Calls couch_stats_collector:track_process_count({couchdb, open_os_files})
%% unless the sys_db option is present; returns ok in the sys_db case.
maybe_track_open_os_files(FileOptions) ->
    IsSysDb = lists:member(sys_db, FileOptions),
    if
        IsSysDb -> ok;
        true -> couch_stats_collector:track_process_count({couchdb, open_os_files})
    end.
| |
%% gen_server callback. Performs no explicit cleanup; the file handle is not
%% closed here (NOTE(review): relies on the raw handle being released when
%% this owning process exits).
terminate(_Why, _State) ->
    ok.
| |
| |
%% gen_server callback for the synchronous requests issued by this module's
%% client functions. State is a #file{} record; eof is the logical end of
%% file and is advanced by every request that writes.

% {pread_iolist, Pos}: read the chunk at Pos. The top bit of its 4-byte length
% prefix says whether a 16-byte MD5 precedes the payload; the digest (or <<>>)
% is returned so the client can verify it outside the server.
handle_call({pread_iolist, Pos}, _From, File) ->
    {LenIolist, NextPos} = read_raw_iolist_int(File, Pos, 4),
    case iolist_to_binary(LenIolist) of
        <<1:1/integer, Len:31/integer>> -> % an MD5-prefixed term
            {Md5AndIoList, _} = read_raw_iolist_int(File, NextPos, Len+16),
            {Md5, IoList} = extract_md5(Md5AndIoList),
            {reply, {ok, IoList, Md5}, File};
        <<0:1/integer, Len:31/integer>> ->
            {Iolist, _} = read_raw_iolist_int(File, NextPos, Len),
            {reply, {ok, Iolist, <<>>}, File}
    end;
% {pread, Pos, Bytes}: raw positional read; the flag is true when Pos is at or
% after tail_append_begin (i.e. not pre-0.10 data).
handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd, tail_append_begin=TailAppendBegin}=File) ->
    {ok, Bin} = file:pread(Fd, Pos, Bytes),
    {reply, {ok, Bin, Pos >= TailAppendBegin}, File};
handle_call(bytes, _From, #file{eof=Length}=File) ->
    {reply, {ok, Length}, File};
handle_call(sync, _From, #file{fd=Fd}=File) ->
    {reply, file:sync(Fd), File};
handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
    {ok, Pos} = file:position(Fd, Pos),
    case file:truncate(Fd) of
        ok ->
            {reply, ok, File#file{eof=Pos}};
        Error ->
            {reply, Error, File}
    end;
% {append_bin, Bin}: write Bin at eof with block marker bytes inserted; the
% reply is the position at which the data starts.
handle_call({append_bin, Bin}, _From, #file{fd=Fd, eof=Pos}=File) ->
    Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
    case file:write(Fd, Blocks) of
        ok ->
            {reply, {ok, Pos}, File#file{eof=Pos+iolist_size(Blocks)}};
        Error ->
            {reply, Error, File}
    end;
% {write_header, Bin}: zero-pad to the next block boundary, then write marker
% byte 1, a 32-bit length and Bin (split into blocks like ordinary data).
handle_call({write_header, Bin}, _From, #file{fd=Fd, eof=Pos}=File) ->
    BinSize = byte_size(Bin),
    Padding = case Pos rem ?SIZE_BLOCK of
        0 -> <<>>;
        BlockOffset -> <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
    end,
    FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(5, [Bin])],
    case file:write(Fd, FinalBin) of
        ok ->
            {reply, ok, File#file{eof=Pos+iolist_size(FinalBin)}};
        Error ->
            {reply, Error, File}
    end;
% {upgrade_old_header, Prefix}: convert a pre-0.10 header to the current
% format (09 UPGRADE CODE). Data written before this point keeps the old
% layout, which tail_append_begin records. If no old header with Prefix is
% found, a previous upgrade's "upgraded" marker restores tail_append_begin.
handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
    case (catch read_old_header(Fd, Prefix)) of
        {ok, Header} ->
            TailAppendBegin = File#file.eof,
            Bin = term_to_binary(Header),
            Md5 = couch_util:md5(Bin),
            % now we assemble the final header binary and write to disk
            FinalBin = <<Md5/binary, Bin/binary>>,
            % Keep the state returned by the header write: it carries the new
            % eof. Dropping it left eof stale, so every later append reported
            % a position before where its data actually landed.
            {reply, ok, File2} = handle_call({write_header, FinalBin}, ok, File),
            ok = write_old_header(Fd, <<"upgraded">>, TailAppendBegin),
            {reply, ok, File2#file{tail_append_begin=TailAppendBegin}};
        _Error ->
            case (catch read_old_header(Fd, <<"upgraded">>)) of
                {ok, TailAppendBegin} ->
                    {reply, ok, File#file{tail_append_begin = TailAppendBegin}};
                _Error2 ->
                    {reply, ok, File}
            end
    end;
% find_header: scan backwards from the block containing eof for a valid header.
handle_call(find_header, _From, #file{fd=Fd, eof=Pos}=File) ->
    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
| |
% 09 UPGRADE CODE
-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header

% 09 UPGRADE CODE
%% Reads a pre-0.10 header from the start of the raw file handle Fd. The old
%% format stores the header twice in two HEADER_SIZE segments, each beginning
%% with Prefix; select_old_header/3 picks the usable copy. A header that was
%% too large for a segment was stored as {pointer_to_header_data, Ptr}, which
%% is followed here. Returns {ok, Header} or an extract_header/2 error atom,
%% and may crash on malformed input (callers wrap it in catch).
read_old_header(Fd, Prefix) ->
    {ok, Bin} = file:pread(Fd, 0, 2*(?HEADER_SIZE)),
    <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
    case select_old_header(Prefix, Bin1, Bin2) of
        {ok, {pointer_to_header_data, Ptr}} ->
            % Fd is the server's raw file handle, so the pointed-to term must be
            % read directly: pread_term/2 is the gen_server client API and
            % calling it with a raw handle (from inside the server) always failed.
            read_old_term(Fd, Ptr);
        Result ->
            Result
    end.

% 09 UPGRADE CODE
%% Chooses between the primary (Bin1) and secondary (Bin2) header copies:
%% the primary wins when intact, discrepancies are logged, and the primary's
%% error is returned when neither copy is usable.
select_old_header(Prefix, Bin1, Bin2) ->
    case {extract_header(Prefix, Bin1), extract_header(Prefix, Bin2)} of
        {{ok, Header1}, {ok, Header2}} when Header1 == Header2 ->
            % Everything is completely normal!
            {ok, Header1};
        {{ok, Header1}, {ok, Header2}} ->
            % Two different header versions with signatures intact. It's weird
            % but possible (a commit failure right at the 2k boundary). Log it
            % and take the first.
            ?LOG_INFO("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]),
            {ok, Header1};
        {{ok, Header1}, Error} ->
            % error reading second header. It's ok, but log it.
            ?LOG_INFO("Secondary header corruption (error: ~p). Using primary header.", [Error]),
            {ok, Header1};
        {Error, {ok, Header2}} ->
            % corrupt primary header. It's ok since the secondary is still good.
            ?LOG_INFO("Primary header corruption (error: ~p). Using secondary header.", [Error]),
            {ok, Header2};
        {Error, _} ->
            % neither copy is usable; the caller is responsible for the error
            Error
    end.

% 09 UPGRADE CODE
%% Reads a term stored in the pre-0.10 chunk layout: a 4-byte length followed
%% by the term bytes, with no block marker bytes (the layout that
%% read_raw_iolist_int/3 assumes below tail_append_begin).
%% NOTE(review): assumes pointer targets were written by 0.9 code in that layout.
read_old_term(Fd, Pos) ->
    {ok, <<Len:32/integer>>} = file:pread(Fd, Pos, 4),
    {ok, <<TermBin:Len/binary>>} = file:pread(Fd, Pos + 4, Len),
    {ok, binary_to_term(TermBin)}.
| |
% 09 UPGRADE CODE
%% Parses one HEADER_SIZE segment of an old-style header: Prefix, the term
%% bytes (including zero padding), and a trailing 16-byte MD5 of those bytes.
%% Returns {ok, Header}, header_corrupt (digest mismatch) or
%% unknown_header_type (prefix mismatch).
extract_header(Prefix, Bin) ->
    PrefixLen = size(Prefix),
    TermLen = ?HEADER_SIZE - PrefixLen - 16, % 16 = md5 signature
    <<FoundPrefix:PrefixLen/binary, TermBin:TermLen/binary, Sig:16/binary>> = Bin,
    case FoundPrefix =:= Prefix of
        false ->
            unknown_header_type;
        true ->
            case couch_util:md5(TermBin) == Sig of
                true -> {ok, binary_to_term(TermBin)};
                false -> header_corrupt
            end
    end.
| |
| |
% 09 UPGRADE CODE
%% Writes Data as an old-style (pre-0.10) header to the raw file handle Fd:
%% Prefix, term_to_binary(Data), zero padding up to HEADER_SIZE, and an MD5
%% over the term bytes plus padding. The segment is written twice, back to
%% back, with pwrite at offset 0; the file is synced before and after.
%% Data must fit in one segment (the caller in this module passes an integer
%% offset); larger data raises {old_header_too_large, FilledSize}.
%% NOTE(review): the server opens its file in `append` mode; on POSIX systems
%% a positional write to an O_APPEND descriptor may still land at the end of
%% the file. Confirm this header really ends up at offset 0.
write_old_header(Fd, Prefix, Data) ->
    TermBin = term_to_binary(Data),
    % the size of all the bytes written to the header, including the md5 signature (16 bytes)
    FilledSize = byte_size(Prefix) + byte_size(TermBin) + 16,
    case FilledSize > ?HEADER_SIZE of
        true ->
            % The old code spilled oversized data with append_binary/2, but that
            % is the client API: here Fd is the raw handle and we run inside the
            % server, so that call could never succeed. Fail explicitly instead.
            erlang:error({old_header_too_large, FilledSize});
        false ->
            ok
    end,
    ok = file:sync(Fd),
    % pad out the header with zeros, then take the md5 hash
    PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize))>>,
    Sig = couch_util:md5([TermBin, PadZeros]),
    % now we assemble the final header binary and write to disk
    WriteBin = <<Prefix/binary, TermBin/binary, PadZeros/binary, Sig/binary>>,
    ?HEADER_SIZE = byte_size(WriteBin), % sanity check
    ok = file:pwrite(Fd, 0, [WriteBin, WriteBin]),
    ok = file:sync(Fd).
| |
| |
%% gen_server callback: a `close` cast stops the server with reason normal.
handle_cast(close, State) ->
    {stop, normal, State}.
| |
| |
%% gen_server callback: the state needs no conversion between versions.
code_change(_PreviousVsn, State, _UpgradeExtra) ->
    {ok, State}.
| |
%% gen_server callback for trapped exit signals (init/1 sets trap_exit).
%% A linked process exiting with any reason other than normal stops this
%% server with that same reason; normal exits are ignored.
handle_info({'EXIT', _Source, Reason}, State) when Reason =/= normal ->
    {stop, Reason, State};
handle_info({'EXIT', _Source, normal}, State) ->
    {noreply, State}.
| |
| |
%% Walks backwards from Block to block 0 and returns {ok, HeaderBin} for the
%% first block that load_header/2 accepts, or no_valid_header if none does.
find_header(_Fd, -1) ->
    no_valid_header;
find_header(Fd, Block) ->
    Attempt = try load_header(Fd, Block) catch _:_ -> not_a_header end,
    case Attempt of
        {ok, HeaderBin} -> {ok, HeaderBin};
        _ -> find_header(Fd, Block - 1)
    end.
| |
%% Tries to read a header at the start of Block, in the layout produced by the
%% write_header request: marker byte 1, a 32-bit length, then that many bytes
%% laid out by make_blocks(5, ...) and beginning with a 16-byte MD5 of the rest.
%% Returns {ok, HeaderBin}; any mismatch raises badmatch (find_header/2 relies
%% on that to skip the block).
load_header(Fd, Block) ->
    BlockStart = Block * ?SIZE_BLOCK,
    {ok, <<1>>} = file:pread(Fd, BlockStart, 1),
    {ok, <<HeaderLen:32/integer>>} = file:pread(Fd, BlockStart + 1, 4),
    % The header payload starts 5 bytes into the block (after the marker and
    % the length), so block boundaries must be computed from offset 5. Using
    % offset 1 mis-placed the marker bytes for headers longer than 4091 bytes,
    % corrupting them so they could never pass the MD5 check.
    TotalBytes = calculate_total_read_len(5, HeaderLen),
    {ok, <<RawBin:TotalBytes/binary>>} = file:pread(Fd, BlockStart + 5, TotalBytes),
    <<Md5Sig:16/binary, HeaderBin/binary>> =
        iolist_to_binary(remove_block_prefixes(5, RawBin)),
    Md5Sig = couch_util:md5(HeaderBin),
    {ok, HeaderBin}.
| |
-spec read_raw_iolist_int(#file{}, Pos::non_neg_integer(), Len::non_neg_integer()) ->
    {Data::iolist(), CurPos::non_neg_integer()}.
%% Reads Len payload bytes starting at Pos. At or after tail_append_begin the
%% on-disk bytes contain block marker bytes, which are stripped; below it
%% (pre-0.10 data) the first Len bytes are returned unchanged. The second
%% element is the position just past the bytes consumed.
read_raw_iolist_int(#file{fd=Fd, tail_append_begin=TAB}, Pos, Len) ->
    Offset = Pos rem ?SIZE_BLOCK,
    OnDisk = calculate_total_read_len(Offset, Len),
    {ok, <<Raw:OnDisk/binary>>} = file:pread(Fd, Pos, OnDisk),
    case Pos < TAB of
        true ->
            % 09 UPGRADE CODE
            <<Payload:Len/binary, _/binary>> = Raw,
            {[Payload], Pos + Len};
        false ->
            {remove_block_prefixes(Offset, Raw), Pos + OnDisk}
    end.
| |
-spec extract_md5(iolist()) -> {binary(), iolist()}.
%% Splits the leading 16-byte MD5 digest off FullIoList and returns it as a
%% binary together with the remaining iolist.
extract_md5(FullIoList) ->
    {DigestParts, Payload} = split_iolist(FullIoList, 16, []),
    Digest = iolist_to_binary(DigestParts),
    {Digest, Payload}.
| |
%% Number of on-disk bytes to read to obtain FinalLen payload bytes when
%% starting BlockOffset bytes into a block: the payload plus one marker byte
%% per block boundary crossed (plus the current block's marker when starting
%% exactly on a boundary).
calculate_total_read_len(0, FinalLen) ->
    1 + calculate_total_read_len(1, FinalLen);
calculate_total_read_len(BlockOffset, FinalLen) ->
    Room = ?SIZE_BLOCK - BlockOffset,
    if
        Room >= FinalLen ->
            FinalLen;
        true ->
            Overflow = FinalLen - Room,
            PayloadPerBlock = ?SIZE_BLOCK - 1,
            % ceiling division: one marker for every (partial) block spanned
            FinalLen + (Overflow + PayloadPerBlock - 1) div PayloadPerBlock
    end.
| |
%% Removes the block marker bytes from Bin, which begins BlockOffset bytes
%% into a block; returns the payload as a list of binaries.
remove_block_prefixes(_BlockOffset, <<>>) ->
    [];
remove_block_prefixes(0, <<_Marker, Rest/binary>>) ->
    remove_block_prefixes(1, Rest);
remove_block_prefixes(BlockOffset, Bin) ->
    Room = ?SIZE_BLOCK - BlockOffset,
    if
        byte_size(Bin) > Room ->
            <<Chunk:Room/binary, Rest/binary>> = Bin,
            [Chunk | remove_block_prefixes(0, Rest)];
        true ->
            [Bin]
    end.
| |
%% Lays IoList out for writing BlockOffset bytes into a block: the payload is
%% split at block boundaries and a <<0>> marker byte starts every new block.
make_blocks(_BlockOffset, []) ->
    [];
make_blocks(0, IoList) ->
    [<<0>> | make_blocks(1, IoList)];
make_blocks(BlockOffset, IoList) ->
    Room = ?SIZE_BLOCK - BlockOffset,
    case split_iolist(IoList, Room, []) of
        {ThisBlock, Remaining} ->
            [ThisBlock | make_blocks(0, Remaining)];
        _FitsInBlock ->
            IoList
    end.
| |
%% @doc Splits IoList after its first SplitAt bytes and returns {Head, Tail}.
%% If IoList holds fewer than SplitAt bytes, returns the number of bytes that
%% were missing instead. Acc holds already-taken pieces in reverse order.
-spec split_iolist(IoList::iolist(), SplitAt::non_neg_integer(), Acc::list()) ->
    {iolist(), iolist()} | non_neg_integer().
split_iolist(Rest, 0, Taken) ->
    {lists:reverse(Taken), Rest};
split_iolist([], Missing, _Taken) ->
    Missing;
split_iolist([Head | Tail], Want, Taken) when is_binary(Head) ->
    case byte_size(Head) of
        HeadSize when Want > HeadSize ->
            split_iolist(Tail, Want - HeadSize, [Head | Taken]);
        _ ->
            <<Front:Want/binary, Back/binary>> = Head,
            split_iolist([Back | Tail], 0, [Front | Taken])
    end;
split_iolist([Nested | Tail], Want, Taken) when is_list(Nested) ->
    case split_iolist(Nested, Want, Taken) of
        {Front, Back} ->
            {Front, [Back | Tail]};
        StillMissing ->
            % the whole nested list was consumed; keep it intact and go on
            split_iolist(Tail, StillMissing, [Nested | Taken])
    end;
split_iolist([Byte | Tail], Want, Taken) when is_integer(Byte) ->
    split_iolist(Tail, Want - 1, [Byte | Taken]).