blob: 913977fb4a999334eaf9d759375c4011eb802dfa [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_stream).
-behaviour(gen_server).
-vsn(1).
% public API
-export([open/1, open/2, close/1]).
-export([foldl/4, foldl/5, foldl_decode/6, range_foldl/6]).
-export([copy_to_new_stream/3, write/2]).
% gen_server callbacks
-export([init/1, terminate/2, code_change/3]).
-export([handle_cast/2, handle_call/3, handle_info/2]).
-include_lib("couch/include/couch_db.hrl").
-define(DEFAULT_BUFFER_SIZE, 4096).
-record(stream,
{fd = 0,
opener_monitor,
written_pointers=[],
buffer_list = [],
buffer_len = 0,
max_buffer,
written_len = 0,
md5,
% md5 of the content without any transformation applied (e.g. compression)
% needed for the attachment upload integrity check (ticket 558)
identity_md5,
identity_len = 0,
encoding_fun,
end_encoding_fun
}).
%%% Interface functions %%%
open(Fd) ->
open(Fd, []).
open(Fd, Options) ->
gen_server:start_link(couch_stream, {Fd, self(), erlang:get(io_priority), Options}, []).
close(Pid) ->
gen_server:call(Pid, close, infinity).
copy_to_new_stream(Fd, PosList, DestFd) ->
{ok, Dest} = open(DestFd),
foldl(Fd, PosList,
fun(Bin, _) ->
ok = write(Dest, Bin)
end, ok),
close(Dest).
foldl(_Fd, [], _Fun, Acc) ->
Acc;
foldl(Fd, [Pos|Rest], Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
foldl(Fd, PosList, <<>>, Fun, Acc) ->
foldl(Fd, PosList, Fun, Acc);
foldl(Fd, PosList, Md5, Fun, Acc) ->
foldl(Fd, PosList, Md5, couch_crypto:hash_init(md5), Fun, Acc).
foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
{DecDataFun, DecEndFun} = case Enc of
gzip ->
ungzip_init();
identity ->
identity_enc_dec_funs()
end,
Result = foldl_decode(
DecDataFun, Fd, PosList, Md5, couch_crypto:hash_init(md5), Fun, Acc
),
DecEndFun(),
Result.
foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
Md5 = couch_crypto:hash_final(md5, Md5Acc),
Acc;
foldl(Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) -> % 0110 UPGRADE CODE
foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc);
foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
Md5 = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5Acc, Bin)),
Fun(Bin, Acc);
foldl(Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
foldl(Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
foldl(Fd, Rest, Md5, couch_crypto:hash_update(md5, Md5Acc, Bin), Fun, Fun(Bin, Acc)).
range_foldl(Fd, PosList, From, To, Fun, Acc) ->
range_foldl(Fd, PosList, From, To, 0, Fun, Acc).
range_foldl(_Fd, _PosList, _From, To, Off, _Fun, Acc) when Off >= To ->
Acc;
range_foldl(Fd, [Pos|Rest], From, To, Off, Fun, Acc) when is_integer(Pos) -> % old-style attachment
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
range_foldl(Fd, [{Pos, iolist_size(Bin)}] ++ Rest, From, To, Off, Fun, Acc);
range_foldl(Fd, [{_Pos, Size}|Rest], From, To, Off, Fun, Acc) when From > Off + Size ->
range_foldl(Fd, Rest, From, To, Off + Size, Fun, Acc);
range_foldl(Fd, [{Pos, Size}|Rest], From, To, Off, Fun, Acc) ->
{ok, Bin} = couch_file:pread_iolist(Fd, Pos),
Bin1 = if
From =< Off andalso To >= Off + Size -> Bin; %% the whole block is covered
true ->
PrefixLen = clip(From - Off, 0, Size),
PostfixLen = clip(Off + Size - To, 0, Size),
MatchLen = Size - PrefixLen - PostfixLen,
<<_Prefix:PrefixLen/binary,Match:MatchLen/binary,_Postfix:PostfixLen/binary>> = iolist_to_binary(Bin),
Match
end,
range_foldl(Fd, Rest, From, To, Off + Size, Fun, Fun(Bin1, Acc)).
clip(Value, Lo, Hi) ->
if
Value < Lo -> Lo;
Value > Hi -> Hi;
true -> Value
end.
foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
Md5 = couch_crypto:hash_final(md5, Md5Acc),
Acc;
foldl_decode(DecFun, Fd, [{Pos, _Size}], Md5, Md5Acc, Fun, Acc) ->
foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc);
foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
{ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
Md5 = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5Acc, EncBin)),
Bin = DecFun(EncBin),
Fun(Bin, Acc);
foldl_decode(DecFun, Fd, [{Pos, _Size}|Rest], Md5, Md5Acc, Fun, Acc) ->
foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc);
foldl_decode(DecFun, Fd, [Pos|Rest], Md5, Md5Acc, Fun, Acc) ->
{ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
Bin = DecFun(EncBin),
Md5Acc2 = couch_crypto:hash_update(md5, Md5Acc, EncBin),
foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Fun(Bin, Acc)).
gzip_init(Options) ->
case couch_util:get_value(compression_level, Options, 0) of
Lvl when Lvl >= 1 andalso Lvl =< 9 ->
Z = zlib:open(),
% 15 = ?MAX_WBITS (defined in the zlib module)
% the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
ok = zlib:deflateInit(Z, Lvl, deflated, 16 + 15, 8, default),
{
fun(Data) ->
zlib:deflate(Z, Data)
end,
fun() ->
Last = zlib:deflate(Z, [], finish),
ok = zlib:deflateEnd(Z),
ok = zlib:close(Z),
Last
end
};
_ ->
identity_enc_dec_funs()
end.
ungzip_init() ->
Z = zlib:open(),
zlib:inflateInit(Z, 16 + 15),
{
fun(Data) ->
zlib:inflate(Z, Data)
end,
fun() ->
ok = zlib:inflateEnd(Z),
ok = zlib:close(Z)
end
}.
identity_enc_dec_funs() ->
{
fun(Data) -> Data end,
fun() -> [] end
}.
write(_Pid, <<>>) ->
ok;
write(Pid, Bin) ->
gen_server:call(Pid, {write, Bin}, infinity).
init({Fd, OpenerPid, OpenerPriority, Options}) ->
erlang:put(io_priority, OpenerPriority),
{EncodingFun, EndEncodingFun} =
case couch_util:get_value(encoding, Options, identity) of
identity ->
identity_enc_dec_funs();
gzip ->
gzip_init(Options)
end,
{ok, #stream{
fd=Fd,
opener_monitor=erlang:monitor(process, OpenerPid),
md5=couch_crypto:hash_init(md5),
identity_md5=couch_crypto:hash_init(md5),
encoding_fun=EncodingFun,
end_encoding_fun=EndEncodingFun,
max_buffer=couch_util:get_value(
buffer_size, Options, ?DEFAULT_BUFFER_SIZE)
}
}.
terminate(_Reason, _Stream) ->
ok.
handle_call({write, Bin}, _From, Stream) ->
BinSize = iolist_size(Bin),
#stream{
fd = Fd,
written_len = WrittenLen,
written_pointers = Written,
buffer_len = BufferLen,
buffer_list = Buffer,
max_buffer = Max,
md5 = Md5,
identity_md5 = IdenMd5,
identity_len = IdenLen,
encoding_fun = EncodingFun} = Stream,
if BinSize + BufferLen > Max ->
WriteBin = lists:reverse(Buffer, [Bin]),
IdenMd5_2 = couch_crypto:hash_update(md5, IdenMd5, WriteBin),
case EncodingFun(WriteBin) of
[] ->
% case where the encoder did some internal buffering
% (zlib does it for example)
WrittenLen2 = WrittenLen,
Md5_2 = Md5,
Written2 = Written;
WriteBin2 ->
{ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
Md5_2 = couch_crypto:hash_update(md5, Md5, WriteBin2),
Written2 = [{Pos, iolist_size(WriteBin2)}|Written]
end,
{reply, ok, Stream#stream{
written_len=WrittenLen2,
written_pointers=Written2,
buffer_list=[],
buffer_len=0,
md5=Md5_2,
identity_md5=IdenMd5_2,
identity_len=IdenLen + BinSize}};
true ->
{reply, ok, Stream#stream{
buffer_list=[Bin|Buffer],
buffer_len=BufferLen + BinSize,
identity_len=IdenLen + BinSize}}
end;
handle_call(close, _From, Stream) ->
#stream{
fd = Fd,
opener_monitor = MonRef,
written_len = WrittenLen,
written_pointers = Written,
buffer_list = Buffer,
md5 = Md5,
identity_md5 = IdenMd5,
identity_len = IdenLen,
encoding_fun = EncodingFun,
end_encoding_fun = EndEncodingFun} = Stream,
WriteBin = lists:reverse(Buffer),
IdenMd5Final = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, IdenMd5, WriteBin)),
WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(),
Md5Final = couch_crypto:hash_final(md5, couch_crypto:hash_update(md5, Md5, WriteBin2)),
Result = case WriteBin2 of
[] ->
{lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
_ ->
{ok, Pos, _} = couch_file:append_binary(Fd, WriteBin2),
StreamInfo = lists:reverse(Written, [{Pos, iolist_size(WriteBin2)}]),
StreamLen = WrittenLen + iolist_size(WriteBin2),
{StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final}
end,
erlang:demonitor(MonRef),
{stop, normal, Result, Stream}.
handle_cast(_Msg, State) ->
{noreply,State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
handle_info({'DOWN', Ref, _, _, _}, #stream{opener_monitor=Ref} = State) ->
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.