% blob: 83b0611eb5c75270b0ab199445ca1d947cffc5ea [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
% couch_stream: a gen_server that buffers written chunks, optionally encodes
% them (identity or gzip), hands the encoded data to a pluggable stream engine
% given as a {Module, State} tuple, and tracks MD5 digests and byte counts of
% both the raw and the encoded data. The module also provides fold helpers
% for reading a stream back through its engine.
-module(couch_stream).
-behaviour(gen_server).
-vsn(1).
% Public API: opening/closing/writing a stream process and folding over the
% contents of a stream through its engine.
-export([
open/1,
open/2,
close/1,
copy/2,
write/2,
to_disk_term/1,
foldl/3,
foldl/4,
foldl_decode/5,
range_foldl/5
]).
% gen_server callbacks.
-export([
init/1,
terminate/2,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3
]).
-include_lib("couch/include/couch_db.hrl").
% Buffer threshold in bytes used when the `buffer_size` option is not given
% (see init/1); a write that pushes the buffered size above it is flushed.
-define(DEFAULT_BUFFER_SIZE, 4096).
% State of a stream process.
-record(stream, {
% {Module, State} tuple of the stream engine that receives encoded data
engine,
% monitor reference on the process that opened the stream; when that
% process goes down the stream process stops (see handle_info/2)
opener_monitor,
% NOTE(review): not referenced anywhere in the visible code
written_pointers=[],
% chunks waiting to be flushed, most recent first
buffer_list = [],
% total byte size of the chunks in buffer_list
buffer_len = 0,
% flush threshold in bytes (option `buffer_size`)
max_buffer,
% number of encoded bytes handed to the engine so far
written_len = 0,
% running md5 context over the encoded data handed to the engine
md5,
% md5 of the content without any transformation applied (e.g. compression)
% needed for the attachment upload integrity check (ticket 558)
identity_md5,
% number of raw (unencoded) bytes accepted by write so far
identity_len = 0,
% fun(Data) -> EncodedIoList, applied to data when it is flushed
encoding_fun,
% fun() -> TrailingIoList, called once on close to finish the encoding
end_encoding_fun
}).
% Starts a stream process for the given {Module, State} engine using an
% empty option list. Returns the result of open/2.
open({_Mod, _ModState} = Engine) ->
    open(Engine, []).
% Starts a linked stream process for the given {Module, State} engine.
% The caller's pid and its `io_priority` process-dictionary value are passed
% to init/1 together with Options (init/1 reads `encoding`, `buffer_size`
% and, for gzip, `compression_level`).
% Returns the result of gen_server:start_link/3.
open({_Mod, _ModState} = Engine, Options) ->
    Opener = self(),
    IoPriority = erlang:get(io_priority),
    InitArg = {Engine, Opener, IoPriority, Options},
    gen_server:start_link(?MODULE, InitArg, []).
% Asks the stream process Pid to flush, finalize and stop, waiting without
% timeout. Returns the reply of the `close` call, which handle_call/3 builds
% as {Engine, EncodedLen, IdentityLen, EncodedMd5, IdentityMd5}.
close(Pid) ->
    Request = close,
    gen_server:call(Pid, Request, infinity).
% Copies every chunk of the stream Src (an engine tuple accepted by foldl/3)
% into the stream process Dst. Each write must answer `ok`, otherwise the
% caller crashes with badmatch. Returns `ok`.
copy(Src, Dst) ->
    WriteChunk = fun(Chunk, _Prev) ->
        ok = write(Dst, Chunk)
    end,
    foldl(Src, WriteChunk, ok).
% Sends Bin to the stream process Pid and waits (without timeout) for the
% reply. An empty binary is a no-op that returns `ok` without contacting
% the process.
write(Pid, Bin) ->
    case Bin of
        <<>> ->
            ok;
        _ ->
            gen_server:call(Pid, {write, Bin}, infinity)
    end.
% Delegates to the engine module's to_disk_term/1 with the engine state and
% returns whatever that callback returns.
to_disk_term({Mod, ModState}) ->
    Mod:to_disk_term(ModState).
% Folds Fun over the stream by delegating to the engine module's foldl/3
% with the engine state, Fun and the initial accumulator Acc.
foldl({Mod, ModState}, Fun, Acc) ->
    Mod:foldl(ModState, Fun, Acc).
% Folds over the stream like foldl/3 while verifying its MD5 digest.
% When the expected digest is the empty binary no verification is done.
% Otherwise the digest of all folded chunks must equal Md5; a mismatch
% crashes the caller with badmatch after the fold has completed.
foldl(Engine, <<>>, Fun, Acc) ->
    foldl(Engine, Fun, Acc);
foldl(Engine, Md5, UserFun, UserAcc) ->
    HashCtx0 = crypto:hash_init(md5),
    {HashCtx, _, Result} =
        foldl(Engine, fun foldl_md5/2, {HashCtx0, UserFun, UserAcc}),
    % Md5 is already bound, so this match asserts the digest.
    Md5 = crypto:hash_final(HashCtx),
    Result.
% Folds over the stream, decoding each stored chunk before handing it to
% UserFun. Enc must be `gzip` or `identity`. Md5 is checked against the
% stored (still encoded) data as in foldl/4. The decoder's end function is
% called once the fold has finished. Returns the final user accumulator.
foldl_decode(Engine, Md5, Enc, UserFun, UserAcc1) ->
    Decoder =
        case Enc of
            gzip -> ungzip_init();
            identity -> identity_enc_dec_funs()
        end,
    {Decode, Finish} = Decoder,
    Start = {Decode, UserFun, UserAcc1},
    {_, _, UserAcc2} = foldl(Engine, Md5, fun foldl_decode/2, Start),
    Finish(),
    UserAcc2.
% Folds UserFun over part of the stream. The engine is first positioned
% at offset From via do_seek/2, then the fold runs with a byte budget of
% To - From that is tracked by foldl_length/2. foldl_length/2 ends the fold
% early by throwing {finished, Acc}, which is caught here and returned.
% Only defined when To >= From.
range_foldl(Engine, From, To, UserFun, UserAcc) when To >= From ->
    Positioned = do_seek(Engine, From),
    Budget = To - From,
    try
        {_, _, Final} =
            foldl(Positioned, fun foldl_length/2, {Budget, UserFun, UserAcc}),
        Final
    catch
        throw:{finished, Early} ->
            Early
    end.
% Fold step used by foldl/4: feeds the chunk into the running MD5 context
% and then into the user's fold function, returning the updated
% {Md5Context, UserFun, UserAcc} accumulator.
foldl_md5(Bin, {Md5Acc, UserFun, UserAcc}) ->
    {crypto:hash_update(Md5Acc, Bin), UserFun, UserFun(Bin, UserAcc)}.
% Fold step used by foldl_decode/5: decodes one stored chunk and passes the
% result to the user's fold function. When the decoder yields the empty
% binary the user function is not called and the accumulator is unchanged.
foldl_decode(EncBin, {DecFun, UserFun, UserAcc}) ->
    NextAcc =
        case DecFun(EncBin) of
            <<>> -> UserAcc;
            Decoded -> UserFun(Decoded, UserAcc)
        end,
    {DecFun, UserFun, NextAcc}.
% Fold step used by range_foldl/5. Length is the number of bytes that may
% still be passed to UserFun.
%
% - If the chunk fits in the remaining budget it is passed on whole and
%   the budget is reduced by its size.
% - Otherwise only the first Length bytes of the chunk are passed on and
%   the fold is ended by throwing {finished, UserAcc}.
foldl_length(Bin, {Length, UserFun, UserAcc}) ->
    case byte_size(Bin) of
        BinSize when BinSize =< Length ->
            {Length - BinSize, UserFun, UserFun(Bin, UserAcc)};
        _ ->
            % Truncate to the remaining budget, not to the chunk size,
            % so that no bytes beyond the requested range are emitted.
            <<Trunc:Length/binary, _/binary>> = Bin,
            throw({finished, UserFun(Trunc, UserAcc)})
    end.
% Builds the {EncodeFun, EndFun} pair for gzip encoding.
% When the `compression_level` option (default 0) is within 1..9 a zlib
% deflate stream producing gzip framing is opened: EncodeFun deflates a
% chunk, EndFun flushes the stream, releases the zlib handle and returns
% the trailing data. For any other level the identity pair is returned.
gzip_init(Options) ->
    Level = couch_util:get_value(compression_level, Options, 0),
    if
        Level >= 1 andalso Level =< 9 ->
            Z = zlib:open(),
            % 15 = ?MAX_WBITS (defined in the zlib module)
            % the 16 + ?MAX_WBITS formula was obtained by inspecting zlib:gzip/1
            ok = zlib:deflateInit(Z, Level, deflated, 16 + 15, 8, default),
            Deflate = fun(Data) ->
                zlib:deflate(Z, Data)
            end,
            Finish = fun() ->
                Tail = zlib:deflate(Z, [], finish),
                ok = zlib:deflateEnd(Z),
                ok = zlib:close(Z),
                Tail
            end,
            {Deflate, Finish};
        true ->
            identity_enc_dec_funs()
    end.
% Builds the {DecodeFun, EndFun} pair for gzip decoding: opens a zlib
% inflate stream expecting gzip framing (16 + 15 window bits). DecodeFun
% inflates one chunk; EndFun ends the inflate stream and closes the handle.
ungzip_init() ->
    Z = zlib:open(),
    zlib:inflateInit(Z, 16 + 15),
    Inflate = fun(Data) ->
        zlib:inflate(Z, Data)
    end,
    Finish = fun() ->
        ok = zlib:inflateEnd(Z),
        ok = zlib:close(Z)
    end,
    {Inflate, Finish}.
% Returns the no-op {EncodeOrDecodeFun, EndFun} pair: the first fun returns
% its argument unchanged, the second returns an empty list (no trailer).
identity_enc_dec_funs() ->
    PassThrough = fun(Data) -> Data end,
    NoTrailer = fun() -> [] end,
    {PassThrough, NoTrailer}.
% gen_server init callback.
% Copies the opener's io priority into this process's dictionary, selects
% the encoder from the `encoding` option (`identity` by default, or `gzip`),
% monitors the opener and builds the initial #stream{} state with fresh MD5
% contexts and the configured buffer threshold.
init({Engine, OpenerPid, OpenerPriority, Options}) ->
    erlang:put(io_priority, OpenerPriority),
    Encoding = couch_util:get_value(encoding, Options, identity),
    {EncodingFun, EndEncodingFun} =
        case Encoding of
            identity -> identity_enc_dec_funs();
            gzip -> gzip_init(Options)
        end,
    OpenerRef = erlang:monitor(process, OpenerPid),
    MaxBuffer = couch_util:get_value(buffer_size, Options, ?DEFAULT_BUFFER_SIZE),
    State = #stream{
        engine = Engine,
        opener_monitor = OpenerRef,
        md5 = crypto:hash_init(md5),
        identity_md5 = crypto:hash_init(md5),
        encoding_fun = EncodingFun,
        end_encoding_fun = EndEncodingFun,
        max_buffer = MaxBuffer
    },
    {ok, State}.
% gen_server terminate callback; performs no cleanup and returns `ok`.
terminate(_Why, _State) ->
    ok.
% gen_server call handler.
%
% {write, Bin}: accepts a chunk. While the buffered size plus the new chunk
% stays at or below max_buffer the chunk is only buffered. Otherwise the
% whole buffer plus the chunk is hashed (identity md5), encoded, written to
% the engine and the process hibernates after replying. Always replies `ok`.
%
% close: flushes what is left in the buffer, appends the encoder's trailing
% data, finalizes the engine and replies with
% {Engine, EncodedLen, IdentityLen, EncodedMd5, IdentityMd5} before stopping
% with reason `normal`.
handle_call({write, Bin}, _From, Stream) ->
BinSize = iolist_size(Bin),
#stream{
engine = Engine,
written_len = WrittenLen,
buffer_len = BufferLen,
buffer_list = Buffer,
max_buffer = Max,
md5 = Md5,
identity_md5 = IdenMd5,
identity_len = IdenLen,
encoding_fun = EncodingFun} = Stream,
if BinSize + BufferLen > Max ->
% Buffer is kept newest-first; restore write order and append the new chunk.
WriteBin = lists:reverse(Buffer, [Bin]),
% The identity md5 only advances when data is flushed, never while buffering.
IdenMd5_2 = crypto:hash_update(IdenMd5, WriteBin),
case EncodingFun(WriteBin) of
[] ->
% case where the encoder did some internal buffering
% (zlib does it for example)
NewEngine = Engine,
WrittenLen2 = WrittenLen,
Md5_2 = Md5;
WriteBin2 ->
% Encoded length and md5 are tracked over what is actually written.
NewEngine = do_write(Engine, WriteBin2),
WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
Md5_2 = crypto:hash_update(Md5, WriteBin2)
end,
{reply, ok, Stream#stream{
engine = NewEngine,
written_len=WrittenLen2,
buffer_list=[],
buffer_len=0,
md5=Md5_2,
identity_md5=IdenMd5_2,
identity_len=IdenLen + BinSize}, hibernate};
true ->
% Below the threshold: just buffer the chunk and count its raw size.
{reply, ok, Stream#stream{
buffer_list=[Bin|Buffer],
buffer_len=BufferLen + BinSize,
identity_len=IdenLen + BinSize}}
end;
handle_call(close, _From, Stream) ->
#stream{
engine = Engine,
opener_monitor = MonRef,
written_len = WrittenLen,
buffer_list = Buffer,
md5 = Md5,
identity_md5 = IdenMd5,
identity_len = IdenLen,
encoding_fun = EncodingFun,
end_encoding_fun = EndEncodingFun} = Stream,
% Remaining buffered chunks in write order.
WriteBin = lists:reverse(Buffer),
IdenMd5Final = crypto:hash_final(crypto:hash_update(IdenMd5, WriteBin)),
% Encode the remainder and append whatever the encoder emits on finish;
% both funs must return lists for ++ to work.
WriteBin2 = EncodingFun(WriteBin) ++ EndEncodingFun(),
Md5Final = crypto:hash_final(crypto:hash_update(Md5, WriteBin2)),
Result = case WriteBin2 of
[] ->
% Nothing left to write: finalize the engine as it is.
{do_finalize(Engine), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
_ ->
NewEngine = do_write(Engine, WriteBin2),
StreamLen = WrittenLen + iolist_size(WriteBin2),
{do_finalize(NewEngine), StreamLen, IdenLen, Md5Final, IdenMd5Final}
end,
erlang:demonitor(MonRef),
{stop, normal, Result, Stream}.
% gen_server cast callback; every cast is ignored and the state is kept.
handle_cast(_Request, State) ->
    {noreply, State}.
% gen_server code_change callback; the state is carried over unchanged.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
% gen_server info callback.
% A 'DOWN' message whose reference is the opener monitor stored in the
% state stops the stream process with reason `normal`; every other message
% is ignored.
handle_info(Info, State) ->
    case {Info, State} of
        {{'DOWN', Ref, _, _, _}, #stream{opener_monitor = Ref}} ->
            {stop, normal, State};
        _ ->
            {noreply, State}
    end.
% Calls the engine module's seek/2 with the engine state and Offset, and
% returns the engine tuple carrying the new state. Crashes with badmatch
% unless the callback returns {ok, NewState}.
do_seek({Mod, ModState}, Offset) ->
    {ok, Sought} = Mod:seek(ModState, Offset),
    {Mod, Sought}.
% Calls the engine module's write/2 with the engine state and Data, and
% returns the engine tuple carrying the new state. Crashes with badmatch
% unless the callback returns {ok, NewState}.
do_write({Mod, ModState}, Data) ->
    {ok, Written} = Mod:write(ModState, Data),
    {Mod, Written}.
% Calls the engine module's finalize/1 with the engine state and returns
% the engine tuple carrying the new state. Crashes with badmatch unless the
% callback returns {ok, NewState}.
do_finalize({Mod, ModState}) ->
    {ok, Finalized} = Mod:finalize(ModState),
    {Mod, Finalized}.