blob: 04c177702ec68d9cc7324c45b212cdf949eff610 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_stream).
-behaviour(gen_server).
% Field sizes of the legacy on-disk stream pointer format; these are used by
% the "09 UPGRADE CODE" readers (old_read_term/2 and old_stream_data/6).
-define(FILE_POINTER_BYTES, 8).
-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
-define(STREAM_OFFSET_BYTES, 4).
-define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)).
-define(HUGE_CHUNK, 1000000000). % Huge chunk size when reading all in one go
-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 MiB chunks when streaming data
% Public API (including the old_* upgrade helpers).
-export([open/1, open/3, close/1, write/2, foldl/4, foldl/5, foldl_decode/6,
old_foldl/5,old_copy_to_new_stream/4]).
-export([copy_to_new_stream/3,old_read_term/2]).
% gen_server callbacks.
-export([init/1, terminate/2, handle_call/3]).
-export([handle_cast/2,code_change/3,handle_info/2]).
-include("couch_db.hrl").
% State of a couch_stream server, built in init/1 and updated by handle_call/3.
-record(stream,
{fd = 0, % file handle passed to couch_file:append_binary/2
% positions returned by couch_file:append_binary/2, newest first
written_pointers=[],
% chunks passed to write/2 but not yet flushed, newest first
buffer_list = [],
% total size in bytes (iolist_size) of buffer_list
buffer_len = 0,
% a write that would push the buffered size past this many bytes flushes it
max_buffer = 4096,
% number of encoded bytes appended to fd so far
written_len = 0,
% running md5 context over the encoded bytes appended to fd
md5,
% md5 of the content without any transformation applied (e.g. compression)
% needed for the attachment upload integrity check (ticket 558)
identity_md5,
% number of raw (unencoded) bytes passed to write/2
identity_len = 0,
% fun(Data) -> EncodedData, applied to each flushed buffer
encoding_fun,
% fun() -> TrailingEncodedData, called once when the stream is closed
end_encoding_fun
}).
%%% Interface functions %%%
%% Opens a new stream on Fd that stores data unencoded (identity) and
%% uses no extra options.
open(Fd) ->
    DefaultOptions = [],
    open(Fd, identity, DefaultOptions).
%% Starts a linked stream server writing to Fd. Encoding is identity or gzip
%% (see init/1); for gzip, Options may carry {compression_level, Level}.
%% Returns the gen_server:start_link/3 result.
open(Fd, Encoding, Options) ->
    InitArgs = {Fd, Encoding, Options},
    gen_server:start_link(?MODULE, InitArgs, []).
%% Flushes any buffered data, stops the stream server and returns the
%% summary tuple produced by handle_call(close, ...). Waits with no timeout.
close(Pid) ->
    Timeout = infinity,
    gen_server:call(Pid, close, Timeout).
%% Copies the chunks stored at PosList in Fd into a new identity stream on
%% DestFd and returns whatever close/1 returns for that new stream.
copy_to_new_stream(Fd, PosList, DestFd) ->
    {ok, Dest} = open(DestFd),
    WriteChunk = fun(Chunk, _Acc) -> ok = write(Dest, Chunk) end,
    foldl(Fd, PosList, WriteChunk, ok),
    close(Dest).
% 09 UPGRADE CODE
%% Copies Len bytes of a legacy-format stream starting at Pos in Fd into a
%% new identity stream on DestFd; returns the result of close/1.
old_copy_to_new_stream(Fd, Pos, Len, DestFd) ->
    {ok, Dest} = open(DestFd),
    WriteChunk = fun(Chunk, _Acc) -> ok = write(Dest, Chunk) end,
    old_foldl(Fd, Pos, Len, WriteChunk, ok),
    close(Dest).
% 09 UPGRADE CODE
%% Folds Fun(Chunk, Acc) over Len bytes of a legacy-format stream starting at
%% OldPointer, reading at most ?DEFAULT_STREAM_CHUNK bytes per chunk.
%% A null pointer with zero length yields Acc unchanged.
old_foldl(_Fd, null, 0, _Fun, Acc) ->
    Acc;
old_foldl(Fd, OldPointer, Len, Fun, Acc) when is_tuple(OldPointer) ->
    {ok, FoldedAcc, _EndPointer} =
        old_stream_data(Fd, OldPointer, Len, ?DEFAULT_STREAM_CHUNK, Fun, Acc),
    FoldedAcc.
%% Reads each chunk position in PosList from Fd, in order, and folds
%% Fun(Chunk, Acc) over the chunk contents. No checksum is verified.
foldl(Fd, PosList, Fun, Acc) ->
    ReadAndFold = fun(Pos, AccIn) ->
        {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
        Fun(Bin, AccIn)
    end,
    lists:foldl(ReadAndFold, Acc, PosList).
%% Like foldl/4, but also checks that the md5 of the stored bytes equals Md5.
%% An empty Md5 (<<>>) means no checksum is available, so the check is skipped.
foldl(Fd, PosList, Md5, Fun, Acc) ->
    case Md5 of
        <<>> ->
            foldl(Fd, PosList, Fun, Acc);
        _ ->
            foldl(Fd, PosList, Md5, couch_util:md5_init(), Fun, Acc)
    end.
%% Folds Fun(DecodedChunk, Acc) over the chunks at PosList in Fd. Each chunk
%% is decoded according to Enc (identity or gzip). Md5 is checked against
%% the stored (encoded) bytes. The decoder is finalized after the fold.
%% NOTE(review): the finalizer only runs when the fold returns normally; if
%% it raises (e.g. md5 mismatch or an error in Fun) the gzip decoder opened
%% by ungzip_init/0 is not finalized by this function.
foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
    {DecodeChunk, FinishDecoding} =
        case Enc of
            identity -> identity_enc_dec_funs();
            gzip -> ungzip_init()
        end,
    FinalAcc = foldl_decode(DecodeChunk, Fd, PosList, Md5,
                            couch_util:md5_init(), Fun, Acc),
    FinishDecoding(),
    FinalAcc.
%% Checksummed fold: reads each position in PosList, feeds the bytes into
%% the md5 context Md5Acc, and calls Fun(Chunk, Acc). Once the last chunk
%% has been hashed, the final digest must equal Md5 (badmatch otherwise).
%% The check on the last chunk happens before Fun sees that chunk.
foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
    Md5 = couch_util:md5_final(Md5Acc),
    Acc;
foldl(Fd, [Pos | Rest], Md5, Md5Acc, Fun, Acc) ->
    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
    case Rest of
        [] ->
            Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, Bin)),
            Fun(Bin, Acc);
        [_ | _] ->
            NextMd5Acc = couch_util:md5_update(Md5Acc, Bin),
            foldl(Fd, Rest, Md5, NextMd5Acc, Fun, Fun(Bin, Acc))
    end.
%% Checksummed, decoding fold. The md5 is computed over the encoded bytes
%% read from Fd; Fun receives DecFun(EncodedChunk). On the last chunk the
%% digest is verified BEFORE that chunk is decoded; on earlier chunks the
%% chunk is decoded first and then hashed.
foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
    Md5 = couch_util:md5_final(Md5Acc),
    Acc;
foldl_decode(DecFun, Fd, [Pos | Rest], Md5, Md5Acc, Fun, Acc) ->
    {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
    case Rest of
        [] ->
            Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, EncBin)),
            Fun(DecFun(EncBin), Acc);
        [_ | _] ->
            Decoded = DecFun(EncBin),
            NextMd5Acc = couch_util:md5_update(Md5Acc, EncBin),
            foldl_decode(DecFun, Fd, Rest, Md5, NextMd5Acc, Fun,
                         Fun(Decoded, Acc))
    end.
%% Builds the {EncodeFun, EndFun} pair for a gzip stream. Gzip is only used
%% when the compression_level option is between 1 and 9 (default 0). Any
%% other value falls back to the pass-through identity encoder.
gzip_init(Options) ->
    gzip_encoder(couch_util:get_value(compression_level, Options, 0)).

%% Selects the encoder pair for a compression level (helper for gzip_init/1).
gzip_encoder(Level) when Level >= 1, Level =< 9 ->
    Z = zlib:open(),
    % 15 = ?MAX_WBITS (defined in the zlib module); adding 16 asks zlib for
    % a gzip header/trailer, the same formula zlib:gzip/1 uses.
    ok = zlib:deflateInit(Z, Level, deflated, 16 + 15, 8, default),
    Encode = fun(Data) -> zlib:deflate(Z, Data) end,
    Finish = fun() ->
        % Flush everything zlib still holds, then release the handle.
        Tail = zlib:deflate(Z, [], finish),
        ok = zlib:deflateEnd(Z),
        ok = zlib:close(Z),
        Tail
    end,
    {Encode, Finish};
gzip_encoder(_Level) ->
    identity_enc_dec_funs().
%% Builds the {DecodeFun, EndFun} pair for reading a gzip-encoded stream.
%% DecodeFun inflates one chunk and returns the decompressed iolist.
%% EndFun ends the inflate session, always releases the zlib handle, and
%% returns ok.
ungzip_init() ->
    Z = zlib:open(),
    % 16 + 15 (zlib's MAX_WBITS) selects gzip header/trailer decoding,
    % matching the window bits gzip_init/1 uses for encoding.
    ok = zlib:inflateInit(Z, 16 + 15),
    Decode = fun(Data) -> zlib:inflate(Z, Data) end,
    Finish = fun() ->
        % zlib:inflateEnd/1 raises data_error when the gzip stream was not
        % complete. Close the handle regardless so it is not leaked in the
        % (possibly long-lived) calling process; the error still propagates.
        try
            ok = zlib:inflateEnd(Z)
        after
            ok = zlib:close(Z)
        end
    end,
    {Decode, Finish}.
%% Pass-through {EncodeOrDecodeFun, EndFun} pair. The first fun returns its
%% argument unchanged; the end fun returns [] (nothing left to emit).
identity_enc_dec_funs() ->
    PassThrough = fun(Data) -> Data end,
    NoTrailer = fun() -> [] end,
    {PassThrough, NoTrailer}.
%% Appends Bin to the stream server Pid and returns ok. An empty binary is a
%% no-op that does not contact the server.
write(Pid, Bin) ->
    case Bin of
        <<>> -> ok;
        _ -> gen_server:call(Pid, {write, Bin}, infinity)
    end.
%% gen_server init: picks the encoder pair for Encoding (identity or gzip)
%% and starts with the record's default (empty) buffers, fresh md5
%% contexts for both the encoded and the raw data, and the given Fd.
init({Fd, Encoding, Options}) ->
    {EncodingFun, EndEncodingFun} =
        case Encoding of
            gzip -> gzip_init(Options);
            identity -> identity_enc_dec_funs()
        end,
    InitialState = #stream{
        fd = Fd,
        md5 = couch_util:md5_init(),
        identity_md5 = couch_util:md5_init(),
        encoding_fun = EncodingFun,
        end_encoding_fun = EndEncodingFun
    },
    {ok, InitialState}.
%% gen_server terminate: nothing to clean up.
terminate(_Why, _State) -> ok.
%% gen_server call handler.
%%
%% {write, Bin}: Bin is buffered until the buffered size plus Bin would
%% exceed max_buffer. At that point the whole buffer (oldest chunk first,
%% Bin last) goes through encoding_fun. If the encoder produced output, it
%% is appended to Fd as a single chunk. Replies ok.
%%
%% close: encodes whatever is still buffered and appends the encoder's
%% trailing output (end_encoding_fun). It writes the result if non-empty,
%% then stops the server normally. The reply is
%%   {Pointers, WrittenLen, IdentityLen, Md5, IdentityMd5}
%% where:
%%   - Pointers are the couch_file:append_binary/2 positions in write order;
%%   - WrittenLen and Md5 describe the encoded bytes stored in Fd;
%%   - IdentityLen and IdentityMd5 describe the raw bytes given to write/2.
handle_call({write, Bin}, _From, Stream) ->
    BinSize = iolist_size(Bin),
    #stream{
        fd = Fd,
        written_len = WrittenLen,
        written_pointers = Written,
        buffer_len = BufferLen,
        buffer_list = Buffer,
        max_buffer = Max,
        md5 = Md5,
        identity_md5 = IdenMd5,
        identity_len = IdenLen,
        encoding_fun = EncodingFun} = Stream,
    if BinSize + BufferLen > Max ->
        % The buffer is kept newest-first; restore write order, Bin last.
        WriteBin = lists:reverse(Buffer, [Bin]),
        % identity_md5 covers the raw (unencoded) input.
        IdenMd5_2 = couch_util:md5_update(IdenMd5, WriteBin),
        case EncodingFun(WriteBin) of
        [] ->
            % case where the encoder did some internal buffering
            % (zlib does it for example): nothing to write yet
            WrittenLen2 = WrittenLen,
            Md5_2 = Md5,
            Written2 = Written;
        WriteBin2 ->
            % md5 and written_len cover the encoded bytes actually stored.
            {ok, Pos} = couch_file:append_binary(Fd, WriteBin2),
            WrittenLen2 = WrittenLen + iolist_size(WriteBin2),
            Md5_2 = couch_util:md5_update(Md5, WriteBin2),
            Written2 = [Pos|Written]
        end,
        {reply, ok, Stream#stream{
            written_len=WrittenLen2,
            written_pointers=Written2,
            buffer_list=[],
            buffer_len=0,
            md5=Md5_2,
            identity_md5=IdenMd5_2,
            identity_len=IdenLen + BinSize}};
    true ->
        % Still under the threshold: just buffer Bin (newest first).
        {reply, ok, Stream#stream{
            buffer_list=[Bin|Buffer],
            buffer_len=BufferLen + BinSize,
            identity_len=IdenLen + BinSize}}
    end;
handle_call(close, _From, Stream) ->
    #stream{
        fd = Fd,
        written_len = WrittenLen,
        written_pointers = Written,
        buffer_list = Buffer,
        md5 = Md5,
        identity_md5 = IdenMd5,
        identity_len = IdenLen,
        encoding_fun = EncodingFun,
        end_encoding_fun = EndEncodingFun} = Stream,
    WriteBin = lists:reverse(Buffer),
    IdenMd5Final = couch_util:md5_final(couch_util:md5_update(IdenMd5, WriteBin)),
    % Erlang leaves the evaluation order of the operands of ++ unspecified,
    % but the encoder must consume the remaining data before it is finished
    % (for gzip, end_encoding_fun issues the final deflate with 'finish').
    % Bind them one after the other so the required order is guaranteed.
    EncodedRest = EncodingFun(WriteBin),
    EncodedTrailer = EndEncodingFun(),
    WriteBin2 = EncodedRest ++ EncodedTrailer,
    Md5Final = couch_util:md5_final(couch_util:md5_update(Md5, WriteBin2)),
    Result = case WriteBin2 of
    [] ->
        {lists:reverse(Written), WrittenLen, IdenLen, Md5Final, IdenMd5Final};
    _ ->
        {ok, Pos} = couch_file:append_binary(Fd, WriteBin2),
        StreamInfo = lists:reverse(Written, [Pos]),
        StreamLen = WrittenLen + iolist_size(WriteBin2),
        {StreamInfo, StreamLen, IdenLen, Md5Final, IdenMd5Final}
    end,
    {stop, normal, Result, Stream}.
%% gen_server cast handler: casts are not part of the protocol; ignore them.
handle_cast(_Request, State) -> {noreply, State}.
%% gen_server code upgrade: the state layout does not change.
code_change(_PreviousVsn, State, _UpgradeExtra) -> {ok, State}.
%% gen_server info handler: stray messages are dropped silently.
handle_info(_Message, State) -> {noreply, State}.
% 09 UPGRADE CODE
%% Legacy reader: decodes one term stored at stream pointer Sp in the old
%% stream format. The first ?STREAM_OFFSET_BYTES bytes hold the unsigned
%% length of the external-term-format binary that follows.
old_read_term(Fd, Sp) ->
    {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, BodySp} =
        old_read(Fd, Sp, ?STREAM_OFFSET_BYTES),
    {ok, TermBin, _EndSp} = old_read(Fd, BodySp, TermLen),
    Term = binary_to_term(TermBin),
    {ok, Term}.
%% Legacy reader: reads Num bytes of an old-format stream starting at Sp and
%% returns them as one binary together with the stream pointer after them.
old_read(Fd, Sp, Num) ->
    Prepend = fun(Chunk, Chunks) -> [Chunk | Chunks] end,
    {ok, ChunksNewestFirst, NextSp} =
        old_stream_data(Fd, Sp, Num, ?HUGE_CHUNK, Prepend, []),
    Bin = iolist_to_binary(lists:reverse(ChunksNewestFirst)),
    {ok, Bin, NextSp}.
% 09 UPGRADE CODE
%% Legacy reader core. Folds Fun(Chunk, Acc) over Num bytes of an old-format
%% stream, reading at most MaxChunk bytes at a time. The stream pointer
%% {Pos, Offset} is a file position plus the number of bytes left in the
%% current contiguous run. When a run is exhausted (Offset =:= 0), the bytes
%% at Pos hold the pointer to the next run.
%% Returns {ok, Acc, PointerAfterLastByteRead}.
old_stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
    {ok, Acc, Sp};
old_stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
    {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>} =
        couch_file:old_pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
    % A valid stream only moves forward in the file; a backwards or
    % self-referencing link indicates corruption and would loop forever.
    if
        NextPos > Pos -> ok;
        true -> throw({error, stream_corruption})
    end,
    old_stream_data(Fd, {NextPos, NextOffset}, Num, MaxChunk, Fun, Acc);
old_stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
    ChunkSize = lists:min([MaxChunk, Num, Offset]),
    {ok, Chunk} = couch_file:old_pread(Fd, Pos, ChunkSize),
    NextSp = {Pos + ChunkSize, Offset - ChunkSize},
    old_stream_data(Fd, NextSp, Num - ChunkSize, MaxChunk, Fun, Fun(Chunk, Acc)).
% Tests moved to tests/etap/050-stream.t