Various changes. See README for details
diff --git a/README b/README
index 8092c70..adb1d7d 100644
--- a/README
+++ b/README
@@ -18,12 +18,32 @@
Comments to : Chandrashekhar.Mullaparthi@gmail.com
-Version : 1.6.2
+Version : 2.0.0
Latest version : git://github.com/cmullaparthi/ibrowse.git
CONTRIBUTIONS & CHANGE HISTORY
==============================
+22-09-2010 - * Added option preserve_chunked_encoding. This allows the caller to get
+ the raw HTTP response when the Transfer-Encoding is Chunked. This feature
+ was requested by Benoit Chesneau who wanted to write an HTTP proxy using
+ ibrowse.
+ * Fixed bug with the {stream_to, {Pid, once}} option. Bug report and a lot
+ of help from Filipe David Manana. Thank you Filipe.
+ * The {error, conn_failed} and {error, send_failed} return values are
+ now of the form {error, {conn_failed, Err}} and
+ {error, {send_failed, Err}}. This is so that the specific socket error
+ can be returned to the caller. I think it looks a bit ugly, but that
+ is the best compromise I could come up with.
+ * Added application configuration parameters default_max_sessions and
+ default_max_pipeline_size. These were previously hard coded to 10.
+ * Versioning of ibrowse now follows the Semantic Versioning principles.
+ See http://semver.org. Thanks to Anthony Molinaro for nudging me in
+ this direction.
+ * The connect_timeout option now only applies to the connection setup
+ phase. In previous versions, the time taken to set up the connection
+ was deducted from the specified timeout value for the request.
+
17-07-2010 - * Merged change made by Filipe David Manana to use the base64
module for encoding/decoding.
@@ -153,7 +173,7 @@
12-01-2007 - Derek Upham sent in a bug fix. The reset_state function was not
behaving correctly when the transfer encoding was not chunked.
-13-11-2006 - Youns Hafri reported a bug where ibrowse was not returning the
+13-11-2006 - Younès Hafri reported a bug where ibrowse was not returning the
temporary filename when the server was closing the connection
after sending the data (as in HTTP/1.0).
Released ibrowse under the BSD license
@@ -172,7 +192,7 @@
22-Nov-2005 - Added ability to generate requests using the Chunked
Transfer-Encoding.
-08-May-2005 - Youns Hafri made a CRUX LINUX port of ibrowse.
+08-May-2005 - Younès Hafri made a CRUX LINUX port of ibrowse.
http://yhafri.club.fr/crux/index.html
Here are some usage examples. Enjoy!
diff --git a/doc/ibrowse.html b/doc/ibrowse.html
index 590ff9a..af65022 100644
--- a/doc/ibrowse.html
+++ b/doc/ibrowse.html
@@ -12,7 +12,7 @@
<ul class="index"><li><a href="#description">Description</a></li><li><a href="#index">Function Index</a></li><li><a href="#functions">Function Details</a></li></ul>The ibrowse application implements an HTTP 1.1 client.
<p>Copyright © 2005-2010 Chandrashekhar Mullaparthi</p>
-<p><b>Version:</b> 1.6.0</p>
+<p><b>Version:</b> 2.0.0</p>
<p><b>Behaviours:</b> <a href="gen_server.html"><tt>gen_server</tt></a>.</p>
<p><b>Authors:</b> Chandrashekhar Mullaparthi (<a href="mailto:chandrashekhar dot mullaparthi at gmail dot com"><tt>chandrashekhar dot mullaparthi at gmail dot com</tt></a>).</p>
@@ -202,7 +202,7 @@
<div class="spec">
<p><tt>send_req(Url::string(), Headers::<a href="#type-headerList">headerList()</a>, Method::<a href="#type-method">method()</a>, Body::<a href="#type-body">body()</a>, Options::<a href="#type-optionList">optionList()</a>) -> <a href="#type-response">response()</a></tt>
<ul class="definitions"><li><tt><a name="type-optionList">optionList()</a> = [<a href="#type-option">option()</a>]</tt></li>
-<li><tt><a name="type-option">option()</a> = {max_sessions, integer()} | {response_format, <a href="#type-response_format">response_format()</a>} | {stream_chunk_size, integer()} | {max_pipeline_size, integer()} | {trace, <a href="#type-boolean">boolean()</a>} | {is_ssl, <a href="#type-boolean">boolean()</a>} | {ssl_options, [SSLOpt]} | {pool_name, atom()} | {proxy_host, string()} | {proxy_port, integer()} | {proxy_user, string()} | {proxy_password, string()} | {use_absolute_uri, <a href="#type-boolean">boolean()</a>} | {basic_auth, {<a href="#type-username">username()</a>, <a href="#type-password">password()</a>}} | {cookie, string()} | {content_length, integer()} | {content_type, string()} | {save_response_to_file, <a href="#type-srtf">srtf()</a>} | {stream_to, <a href="#type-stream_to">stream_to()</a>} | {http_vsn, {MajorVsn, MinorVsn}} | {host_header, string()} | {inactivity_timeout, integer()} | {connect_timeout, integer()} | {socket_options, Sock_opts} | {transfer_encoding, {chunked, ChunkSize}} | {headers_as_is, <a href="#type-boolean">boolean()</a>} | {give_raw_headers, <a href="#type-boolean">boolean()</a>}</tt></li>
+<li><tt><a name="type-option">option()</a> = {max_sessions, integer()} | {response_format, <a href="#type-response_format">response_format()</a>} | {stream_chunk_size, integer()} | {max_pipeline_size, integer()} | {trace, <a href="#type-boolean">boolean()</a>} | {is_ssl, <a href="#type-boolean">boolean()</a>} | {ssl_options, [SSLOpt]} | {pool_name, atom()} | {proxy_host, string()} | {proxy_port, integer()} | {proxy_user, string()} | {proxy_password, string()} | {use_absolute_uri, <a href="#type-boolean">boolean()</a>} | {basic_auth, {<a href="#type-username">username()</a>, <a href="#type-password">password()</a>}} | {cookie, string()} | {content_length, integer()} | {content_type, string()} | {save_response_to_file, <a href="#type-srtf">srtf()</a>} | {stream_to, <a href="#type-stream_to">stream_to()</a>} | {http_vsn, {MajorVsn, MinorVsn}} | {host_header, string()} | {inactivity_timeout, integer()} | {connect_timeout, integer()} | {socket_options, Sock_opts} | {transfer_encoding, {chunked, ChunkSize}} | {headers_as_is, <a href="#type-boolean">boolean()</a>} | {give_raw_headers, <a href="#type-boolean">boolean()</a>} | {preserve_chunked_encoding, <a href="#type-boolean">boolean()</a>}</tt></li>
<li><tt><a name="type-stream_to">stream_to()</a> = <a href="#type-process">process()</a> | {<a href="#type-process">process()</a>, once}</tt></li>
<li><tt><a name="type-process">process()</a> = pid() | atom()</tt></li>
<li><tt><a name="type-username">username()</a> = string()</tt></li>
@@ -284,6 +284,11 @@
caller to get access to the raw status line and raw unparsed
headers. Not quite sure why someone would want this, but one of my
users asked for it, so here it is. </li>
+
+ <li> The <code>preserve_chunked_encoding</code> option enables the caller
+ to receive the raw data stream when the Transfer-Encoding of the server
+ response is Chunked.
+ </li>
</ul>
</p>
@@ -441,6 +446,6 @@
<hr>
<div class="navbar"><a name="#navbar_bottom"></a><table width="100%" border="0" cellspacing="0" cellpadding="2" summary="navigation bar"><tr><td><a href="overview-summary.html" target="overviewFrame">Overview</a></td><td><a href="http://www.erlang.org/"><img src="erlang.png" align="right" border="0" alt="erlang logo"></a></td></tr></table></div>
-<p><i>Generated by EDoc, May 17 2010, 23:21:42.</i></p>
+<p><i>Generated by EDoc, Sep 22 2010, 22:56:44.</i></p>
</body>
</html>
diff --git a/src/ibrowse.erl b/src/ibrowse.erl
index 09d36a3..9f7b3a2 100644
--- a/src/ibrowse.erl
+++ b/src/ibrowse.erl
@@ -7,7 +7,7 @@
%%%-------------------------------------------------------------------
%% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
%% @copyright 2005-2010 Chandrashekhar Mullaparthi
-%% @version 1.6.0
+%% @version 2.0.0
%% @doc The ibrowse application implements an HTTP 1.1 client. This
%% module implements the API of the HTTP client. There is one named
%% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
@@ -236,6 +236,11 @@
%% caller to get access to the raw status line and raw unparsed
%% headers. Not quite sure why someone would want this, but one of my
%% users asked for it, so here it is. </li>
+%%
+%% <li> The <code>preserve_chunked_encoding</code> option enables the caller
+%% to receive the raw data stream when the Transfer-Encoding of the server
+%% response is Chunked.
+%% </li>
%% </ul>
%%
%% @spec send_req(Url::string(), Headers::headerList(), Method::method(), Body::body(), Options::optionList()) -> response()
@@ -266,7 +271,8 @@
%% {socket_options, Sock_opts} |
%% {transfer_encoding, {chunked, ChunkSize}} |
%% {headers_as_is, boolean()} |
-%% {give_raw_headers, boolean()}
+%% {give_raw_headers, boolean()} |
+%% {preserve_chunked_encoding,boolean()}
%%
%% stream_to() = process() | {process(), once}
%% process() = pid() | atom()
@@ -302,24 +308,47 @@
Options_1 = merge_options(Host, Port, Options),
{SSLOptions, IsSSL} =
case (Protocol == https) orelse
- get_value(is_ssl, Options_1, false) of
+ get_value(is_ssl, Options_1, false) of
false -> {[], false};
true -> {get_value(ssl_options, Options_1, []), true}
end,
- case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
- Max_sessions,
- Max_pipeline_size,
- {SSLOptions, IsSSL}) of
- {ok, Conn_Pid} ->
- do_send_req(Conn_Pid, Parsed_url, Headers,
- Method, Body, Options_1, Timeout);
- Err ->
- Err
- end;
+ try_routing_request(Lb_pid, Parsed_url,
+ Max_sessions,
+ Max_pipeline_size,
+ {SSLOptions, IsSSL},
+ Headers, Method, Body, Options_1, Timeout, 0);
Err ->
{error, {url_parsing_failed, Err}}
end.
+try_routing_request(Lb_pid, Parsed_url,
+ Max_sessions,
+ Max_pipeline_size,
+ {SSLOptions, IsSSL},
+ Headers, Method, Body, Options_1, Timeout, Try_count) when Try_count < 3 ->
+ case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
+ Max_sessions,
+ Max_pipeline_size,
+ {SSLOptions, IsSSL}) of
+ {ok, Conn_Pid} ->
+ case do_send_req(Conn_Pid, Parsed_url, Headers,
+ Method, Body, Options_1, Timeout) of
+ {error, sel_conn_closed} ->
+ io:format("Selected connection closed. Trying again...~n", []),
+ try_routing_request(Lb_pid, Parsed_url,
+ Max_sessions,
+ Max_pipeline_size,
+ {SSLOptions, IsSSL},
+ Headers, Method, Body, Options_1, Timeout, Try_count + 1);
+ Res ->
+ Res
+ end;
+ Err ->
+ Err
+ end;
+try_routing_request(_, _, _, _, _, _, _, _, _, _, _) ->
+ {error, retry_later}.
+
merge_options(Host, Port, Options) ->
Config_options = get_config_value({options, Host, Port}, []),
lists:foldl(
@@ -337,11 +366,27 @@
get_max_sessions(Host, Port, Options) ->
get_value(max_sessions, Options,
- get_config_value({max_sessions, Host, Port}, ?DEF_MAX_SESSIONS)).
+ get_config_value({max_sessions, Host, Port},
+ default_max_sessions())).
get_max_pipeline_size(Host, Port, Options) ->
get_value(max_pipeline_size, Options,
- get_config_value({max_pipeline_size, Host, Port}, ?DEF_MAX_PIPELINE_SIZE)).
+ get_config_value({max_pipeline_size, Host, Port},
+ default_max_pipeline_size())).
+
+default_max_sessions() ->
+ safe_get_env(ibrowse, default_max_sessions, ?DEF_MAX_SESSIONS).
+
+default_max_pipeline_size() ->
+ safe_get_env(ibrowse, default_max_pipeline_size, ?DEF_MAX_PIPELINE_SIZE).
+
+safe_get_env(App, Key, Def_val) ->
+ case application:get_env(App, Key) of
+ undefined ->
+ Def_val;
+ {ok, Val} ->
+ Val
+ end.
%% @doc Deprecated. Use set_max_sessions/3 and set_max_pipeline_size/3
%% for achieving the same effect.
@@ -375,6 +420,10 @@
Options, Timeout) of
{'EXIT', {timeout, _}} ->
{error, req_timedout};
+ {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
+ {error, sel_conn_closed};
+ {error, connection_closed} ->
+ {error, sel_conn_closed};
{'EXIT', Reason} ->
{error, {'EXIT', Reason}};
{ok, St_code, Headers, Body} = Ret when is_binary(Body) ->
@@ -684,6 +733,10 @@
handle_call(stop, _From, State) ->
do_trace("IBROWSE shutting down~n", []),
+ ets:foldl(fun(#lb_pid{pid = Pid}, Acc) ->
+ ibrowse_lb:stop(Pid),
+ Acc
+ end, [], ibrowse_lb),
{stop, normal, ok, State};
handle_call({set_config_value, Key, Val}, _From, State) ->
diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl
index 1633e5b..7816359 100644
--- a/src/ibrowse_http_client.erl
+++ b/src/ibrowse_http_client.erl
@@ -47,7 +47,8 @@
status_line, raw_headers,
is_closing, send_timer, content_length,
deleted_crlf = false, transfer_encoding,
- chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size,
+ chunk_size, chunk_size_buffer = <<>>,
+ recvd_chunk_size, interim_reply_sent = false,
lb_ets_tid, cur_pipeline_size = 0, prev_req_id
}).
@@ -57,7 +58,7 @@
req_id,
stream_chunk_size,
save_response_to_file = false,
- tmp_file_name, tmp_file_fd,
+ tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
response_format}).
-import(ibrowse_lib, [
@@ -82,8 +83,13 @@
gen_server:start_link(?MODULE, Args, []).
stop(Conn_pid) ->
- catch gen_server:call(Conn_pid, stop),
- ok.
+ case catch gen_server:call(Conn_pid, stop) of
+ {'EXIT', {timeout, _}} ->
+ exit(Conn_pid, kill),
+ ok;
+ _ ->
+ ok
+ end.
send_req(Conn_Pid, Url, Headers, Method, Body, Options, Timeout) ->
gen_server:call(
@@ -171,6 +177,7 @@
%% {stop, Reason, State} (terminate/2 is called)
%%--------------------------------------------------------------------
handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
+%% io:format("Recvd data: ~p~n", [Data]),
do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]),
handle_sock_data(Data, State);
handle_info({ssl, _Sock, Data}, State) ->
@@ -178,13 +185,14 @@
handle_info({stream_next, Req_id}, #state{socket = Socket,
cur_req = #request{req_id = Req_id}} = State) ->
+ %% io:format("Client process set {active, once}~n", []),
do_setopts(Socket, [{active, once}], State),
{noreply, State};
handle_info({stream_next, _Req_id}, State) ->
{noreply, State};
-handle_info({tcp_closed, _Sock}, State) ->
+handle_info({tcp_closed, _Sock}, State) ->
do_trace("TCP connection closed by peer!~n", []),
handle_sock_closed(State),
{stop, normal, State};
@@ -194,11 +202,11 @@
{stop, normal, State};
handle_info({tcp_error, _Sock}, State) ->
- io:format("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
+ do_trace("Error on connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
handle_sock_closed(State),
{stop, normal, State};
handle_info({ssl_error, _Sock}, State) ->
- io:format("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
+ do_trace("Error on SSL connection to ~1000.p:~1000.p~n", [State#state.host, State#state.port]),
handle_sock_closed(State),
{stop, normal, State};
@@ -233,7 +241,8 @@
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
- do_close(State).
+ do_close(State),
+ ok.
%%--------------------------------------------------------------------
%% Func: code_change/3
@@ -269,6 +278,7 @@
end;
handle_sock_data(Data, #state{status = get_body,
+ socket = Socket,
content_length = CL,
http_status_code = StatCode,
recvd_headers = Headers,
@@ -293,6 +303,21 @@
fail_pipelined_requests(State,
{error, {Reason, {stat_code, StatCode}, Headers}}),
{stop, normal, State};
+ #state{cur_req = #request{caller_controls_socket = Ccs},
+ interim_reply_sent = Irs} = State_1 ->
+ %% io:format("Irs: ~p~n", [Irs]),
+ case Irs of
+ true ->
+ active_once(State_1);
+ false when Ccs == true ->
+ %% io:format("Setting {active,once}~n", []),
+ do_setopts(Socket, [{active, once}], State);
+ false ->
+ active_once(State_1)
+ end,
+ State_2 = State_1#state{interim_reply_sent = false},
+ set_inac_timer(State_2),
+ {noreply, State_2};
State_1 ->
active_once(State_1),
set_inac_timer(State_1),
@@ -338,17 +363,25 @@
{error, Reason} ->
{error, {file_write_error, Reason}}
end;
-accumulate_response(<<>>, State) ->
- State;
-accumulate_response(Data, #state{reply_buffer = RepBuf,
- rep_buf_size = RepBufSize,
- streamed_size = Streamed_size,
- cur_req = CurReq}=State) ->
- #request{stream_to=StreamTo, req_id=ReqId,
- stream_chunk_size = Stream_chunk_size,
- response_format = Response_format,
- caller_controls_socket = Caller_controls_socket} = CurReq,
- RepBuf_1 = list_to_binary([RepBuf, Data]),
+%% accumulate_response(<<>>, #state{cur_req = #request{caller_controls_socket = Ccs},
+%% socket = Socket} = State) ->
+%% case Ccs of
+%% true ->
+%% do_setopts(Socket, [{active, once}], State);
+%% false ->
+%% ok
+%% end,
+%% State;
+accumulate_response(Data, #state{reply_buffer = RepBuf,
+ rep_buf_size = RepBufSize,
+ streamed_size = Streamed_size,
+ cur_req = CurReq}=State) ->
+ #request{stream_to = StreamTo,
+ req_id = ReqId,
+ stream_chunk_size = Stream_chunk_size,
+ response_format = Response_format,
+ caller_controls_socket = Caller_controls_socket} = CurReq,
+ RepBuf_1 = <<RepBuf/binary, Data/binary>>,
New_data_size = RepBufSize - Streamed_size,
case StreamTo of
undefined ->
@@ -356,15 +389,21 @@
_ when Caller_controls_socket == true ->
do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1),
State#state{reply_buffer = <<>>,
+ interim_reply_sent = true,
streamed_size = Streamed_size + size(RepBuf_1)};
_ when New_data_size >= Stream_chunk_size ->
{Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size),
do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
- accumulate_response(
- Rem_data,
- State#state{
- reply_buffer = <<>>,
- streamed_size = Streamed_size + Stream_chunk_size});
+ State_1 = State#state{
+ reply_buffer = <<>>,
+ interim_reply_sent = true,
+ streamed_size = Streamed_size + Stream_chunk_size},
+ case Rem_data of
+ <<>> ->
+ State_1;
+ _ ->
+ accumulate_response(Rem_data, State_1)
+ end;
_ ->
State#state{reply_buffer = RepBuf_1}
end.
@@ -498,9 +537,9 @@
is_ssl = true,
use_proxy = true,
proxy_tunnel_setup = Pts
- }) when Pts /= done -> gen_tcp:close(Sock);
-do_close(#state{socket = Sock, is_ssl = true}) -> ssl:close(Sock);
-do_close(#state{socket = Sock, is_ssl = false}) -> gen_tcp:close(Sock).
+ }) when Pts /= done -> catch gen_tcp:close(Sock);
+do_close(#state{socket = Sock, is_ssl = true}) -> catch ssl:close(Sock);
+do_close(#state{socket = Sock, is_ssl = false}) -> catch gen_tcp:close(Sock).
active_once(#state{cur_req = #request{caller_controls_socket = true}}) ->
ok;
@@ -542,25 +581,17 @@
end,
State_2 = check_ssl_options(Options, State_1),
do_trace("Connecting...~n", []),
- Start_ts = now(),
Conn_timeout = get_value(connect_timeout, Options, Timeout),
case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
{ok, Sock} ->
- do_trace("Connected!~n", []),
- End_ts = now(),
- Timeout_1 = case Timeout of
- infinity ->
- infinity;
- _ ->
- Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000))
- end,
+ do_trace("Connected! Socket: ~1000.p~n", [Sock]),
State_3 = State_2#state{socket = Sock,
connect_timeout = Conn_timeout},
- send_req_1(From, Url, Headers, Method, Body, Options, Timeout_1, State_3);
+ send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State_3);
Err ->
shutting_down(State_2),
do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
- gen_server:reply(From, {error, conn_failed}),
+ gen_server:reply(From, {error, {conn_failed, Err}}),
{stop, normal, State_2}
end;
@@ -580,8 +611,9 @@
use_proxy = true,
is_ssl = true} = State) ->
NewReq = #request{
- method = connect,
- options = Options
+ method = connect,
+ preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
+ options = Options
},
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1),
@@ -611,13 +643,13 @@
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end;
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end;
@@ -666,7 +698,9 @@
save_response_to_file = SaveResponseToFile,
stream_chunk_size = get_stream_chunk_size(Options),
response_format = Resp_format,
- from = From},
+ from = From,
+ preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false)
+ },
State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1),
{Req, Body_1} = make_request(Method,
@@ -705,13 +739,13 @@
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end;
Err ->
shutting_down(State_1),
do_trace("Send failed... Reason: ~p~n", [Err]),
- gen_server:reply(From, {error, send_failed}),
+ gen_server:reply(From, {error, {send_failed, Err}}),
{stop, normal, State_1}
end.
@@ -768,14 +802,14 @@
ibrowse_lib:encode_base64(Username ++ [$: | Password]).
make_request(Method, Headers, AbsPath, RelPath, Body, Options,
- #state{use_proxy = UseProxy}) ->
+ #state{use_proxy = UseProxy, is_ssl = Is_ssl}) ->
HttpVsn = http_vsn_string(get_value(http_vsn, Options, {1,1})),
Headers_1 =
case get_value(content_length, Headers, false) of
false when (Body == []) or
- (Body == <<>>) or
- is_tuple(Body) or
- is_function(Body) ->
+ (Body == <<>>) or
+ is_tuple(Body) or
+ is_function(Body) ->
Headers;
false when is_binary(Body) ->
[{"content-length", integer_to_list(size(Body))} | Headers];
@@ -799,7 +833,12 @@
Headers_3 = cons_headers(Headers_2),
Uri = case get_value(use_absolute_uri, Options, false) or UseProxy of
true ->
- AbsPath;
+ case Is_ssl of
+ true ->
+ RelPath;
+ false ->
+ AbsPath
+ end;
false ->
RelPath
end,
@@ -1017,7 +1056,7 @@
send_queued_requests(lists:reverse(Q), State_1);
Err ->
do_trace("Upgrade to SSL socket failed. Reson: ~p~n", [Err]),
- do_error_reply(State, {error, send_failed}),
+ do_error_reply(State, {error, {send_failed, Err}}),
{error, send_failed}
end.
@@ -1029,12 +1068,12 @@
case send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State) of
{noreply, State_1} ->
send_queued_requests(Q, State_1);
- _ ->
+ Err ->
do_trace("Error sending queued SSL request: ~n"
"URL : ~s~n"
"Method : ~p~n"
"Headers : ~p~n", [Url, Method, Headers]),
- do_error_reply(State, {error, send_failed}),
+ do_error_reply(State, {error, {send_failed, Err}}),
{error, send_failed}
end.
@@ -1046,11 +1085,12 @@
%% This clause determines the chunk size when given data from the beginning of the chunk
parse_11_response(DataRecvd,
#state{transfer_encoding = chunked,
- chunk_size = chunk_start,
+ chunk_size = chunk_start,
chunk_size_buffer = Chunk_sz_buf
} = State) ->
case scan_crlf(Chunk_sz_buf, DataRecvd) of
{yes, ChunkHeader, Data_1} ->
+ State_1 = maybe_accumulate_ce_data(State, <<ChunkHeader/binary, $\r, $\n>>),
ChunkSize = parse_chunk_header(ChunkHeader),
%%
%% Do we have to preserve the chunk encoding when
@@ -1061,10 +1101,10 @@
RemLen = size(Data_1),
do_trace("Determined chunk size: ~p. Already recvd: ~p~n",
[ChunkSize, RemLen]),
- parse_11_response(Data_1, State#state{chunk_size_buffer = <<>>,
- deleted_crlf = true,
- recvd_chunk_size = 0,
- chunk_size = ChunkSize});
+ parse_11_response(Data_1, State_1#state{chunk_size_buffer = <<>>,
+ deleted_crlf = true,
+ recvd_chunk_size = 0,
+ chunk_size = ChunkSize});
{no, Data_1} ->
State#state{chunk_size_buffer = Data_1}
end;
@@ -1074,13 +1114,15 @@
parse_11_response(DataRecvd,
#state{transfer_encoding = chunked,
chunk_size = tbd,
- chunk_size_buffer = Buf}=State) ->
+ chunk_size_buffer = Buf
+ } = State) ->
case scan_crlf(Buf, DataRecvd) of
{yes, _, NextChunk} ->
- State_1 = State#state{chunk_size = chunk_start,
- chunk_size_buffer = <<>>,
- deleted_crlf = true},
- parse_11_response(NextChunk, State_1);
+ State_1 = maybe_accumulate_ce_data(State, <<$\r, $\n>>),
+ State_2 = State_1#state{chunk_size = chunk_start,
+ chunk_size_buffer = <<>>,
+ deleted_crlf = true},
+ parse_11_response(NextChunk, State_2);
{no, Data_1} ->
State#state{chunk_size_buffer = Data_1}
end;
@@ -1090,9 +1132,10 @@
%% received is silently discarded.
parse_11_response(DataRecvd,
#state{transfer_encoding = chunked, chunk_size = 0,
- cur_req = CurReq,
- deleted_crlf = DelCrlf,
- chunk_size_buffer = Trailer, reqs = Reqs}=State) ->
+ cur_req = CurReq,
+ deleted_crlf = DelCrlf,
+ chunk_size_buffer = Trailer,
+ reqs = Reqs} = State) ->
do_trace("Detected end of chunked transfer...~n", []),
DataRecvd_1 = case DelCrlf of
false ->
@@ -1101,12 +1144,14 @@
<<$\r, $\n, DataRecvd/binary>>
end,
case scan_header(Trailer, DataRecvd_1) of
- {yes, _TEHeaders, Rem} ->
+ {yes, TEHeaders, Rem} ->
{_, Reqs_1} = queue:out(Reqs),
- State_1 = handle_response(CurReq, State#state{reqs = Reqs_1}),
- parse_response(Rem, reset_state(State_1));
+ State_1 = maybe_accumulate_ce_data(State, <<TEHeaders/binary, $\r, $\n>>),
+ State_2 = handle_response(CurReq,
+ State_1#state{reqs = Reqs_1}),
+ parse_response(Rem, reset_state(State_2));
{no, Rem} ->
- State#state{chunk_size_buffer = Rem, deleted_crlf = false}
+ accumulate_response(<<>>, State#state{chunk_size_buffer = Rem, deleted_crlf = false})
end;
%% This clause extracts a chunk, given the size.
@@ -1121,8 +1166,15 @@
case DataLen >= NeedBytes of
true ->
{RemChunk, RemData} = split_binary(DataRecvd, NeedBytes),
- do_trace("Recvd another chunk...~n", []),
+ do_trace("Recvd another chunk...~p~n", [RemChunk]),
do_trace("RemData -> ~p~n", [RemData]),
+ case RemData of
+ <<>> ->
+ %% io:format("RemData -> ~p~n", [RemData]);
+ ok;
+ _ ->
+ ok
+ end,
case accumulate_response(RemChunk, State) of
{error, Reason} ->
do_trace("Error accumulating response --> ~p~n", [Reason]),
@@ -1155,6 +1207,11 @@
accumulate_response(DataRecvd, State#state{rep_buf_size = (RepBufSz+DataLen)})
end.
+maybe_accumulate_ce_data(#state{cur_req = #request{preserve_chunked_encoding = false}} = State, _) ->
+ State;
+maybe_accumulate_ce_data(State, Data) ->
+ accumulate_response(Data, State).
+
handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
response_format = Resp_format,
save_response_to_file = SaveResponseToFile,
@@ -1177,11 +1234,12 @@
_ ->
{file, TmpFilename}
end,
+ {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(RespHeaders, Raw_headers, Options),
Reply = case get_value(give_raw_headers, Options, false) of
true ->
- {ok, Status_line, Raw_headers, ResponseBody};
+ {ok, Status_line, Raw_headers_1, ResponseBody};
false ->
- {ok, SCode, RespHeaders, ResponseBody}
+ {ok, SCode, Resp_headers_1, ResponseBody}
end,
State_2 = do_reply(State_1, From, StreamTo, ReqId, Resp_format, Reply),
cancel_timer(ReqTimer, {eat_message, {req_timedout, From}}),
@@ -1192,16 +1250,17 @@
#state{http_status_code = SCode,
status_line = Status_line,
raw_headers = Raw_headers,
- recvd_headers = RespHeaders,
+ recvd_headers = Resp_headers,
reply_buffer = RepBuf,
send_timer = ReqTimer} = State) ->
Body = RepBuf,
%% State_1 = set_cur_request(State),
+ {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options),
Reply = case get_value(give_raw_headers, Options, false) of
true ->
- {ok, Status_line, Raw_headers, Body};
+ {ok, Status_line, Raw_headers_1, Body};
false ->
- {ok, SCode, RespHeaders, Body}
+ {ok, SCode, Resp_headers_1, Body}
end,
State_1 = case get(conn_close) of
"close" ->
@@ -1227,7 +1286,8 @@
deleted_crlf = false,
http_status_code = undefined,
chunk_size = undefined,
- transfer_encoding = undefined}.
+ transfer_encoding = undefined
+ }.
set_cur_request(#state{reqs = Reqs} = State) ->
case queue:to_list(Reqs) of
@@ -1459,15 +1519,29 @@
ok;
send_async_headers(ReqId, StreamTo, Give_raw_headers,
#state{status_line = Status_line, raw_headers = Raw_headers,
- recvd_headers = Headers, http_status_code = StatCode
- }) ->
+ recvd_headers = Headers, http_status_code = StatCode,
+ cur_req = #request{options = Opts}
+ }) ->
+ {Headers_1, Raw_headers_1} = maybe_add_custom_headers(Headers, Raw_headers, Opts),
case Give_raw_headers of
false ->
- catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers};
+ catch StreamTo ! {ibrowse_async_headers, ReqId, StatCode, Headers_1};
true ->
- catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers}
+ catch StreamTo ! {ibrowse_async_headers, ReqId, Status_line, Raw_headers_1}
end.
+maybe_add_custom_headers(Headers, Raw_headers, Opts) ->
+ Custom_headers = get_value(add_custom_headers, Opts, []),
+ Headers_1 = Headers ++ Custom_headers,
+ Raw_headers_1 = case Custom_headers of
+ [_ | _] when is_binary(Raw_headers) ->
+ Custom_headers_bin = list_to_binary(string:join([[X, $:, Y] || {X, Y} <- Custom_headers], "\r\n")),
+ <<Raw_headers/binary, "\r\n", Custom_headers_bin/binary>>;
+ _ ->
+ Raw_headers
+ end,
+ {Headers_1, Raw_headers_1}.
+
format_response_data(Resp_format, Body) ->
case Resp_format of
list when is_list(Body) ->
diff --git a/src/ibrowse_lb.erl b/src/ibrowse_lb.erl
index 6bc600b..0e001d4 100644
--- a/src/ibrowse_lb.erl
+++ b/src/ibrowse_lb.erl
@@ -16,7 +16,8 @@
%% External exports
-export([
start_link/1,
- spawn_connection/5
+ spawn_connection/5,
+ stop/1
]).
%% gen_server callbacks
@@ -85,6 +86,14 @@
is_integer(Max_sessions) ->
gen_server:call(Lb_pid,
{spawn_connection, Url, Max_sessions, Max_pipeline_size, SSL_options}).
+
+stop(Lb_pid) ->
+ case catch gen_server:call(Lb_pid, stop) of
+ {'EXIT', {timeout, _}} ->
+ exit(Lb_pid, kill);
+ ok ->
+ ok
+ end.
%%--------------------------------------------------------------------
%% Function: handle_call/3
%% Description: Handling call messages
@@ -120,6 +129,18 @@
ets:insert(Tid, {{1, Pid}, []}),
{reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1}};
+handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
+ gen_server:reply(_From, ok),
+ {stop, normal, State};
+
+handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
+ ets:foldl(fun({{_, Pid}, _}, Acc) ->
+ ibrowse_http_client:stop(Pid),
+ Acc
+ end, [], Tid),
+ gen_server:reply(_From, ok),
+ {stop, normal, State};
+
handle_call(Request, _From, State) ->
Reply = {unknown_request, Request},
{reply, Reply, State}.
diff --git a/src/ibrowse_test.erl b/src/ibrowse_test.erl
index 00b0244..e7d6e59 100644
--- a/src/ibrowse_test.erl
+++ b/src/ibrowse_test.erl
@@ -17,6 +17,7 @@
ue_test/1,
verify_chunked_streaming/0,
verify_chunked_streaming/1,
+ test_chunked_streaming_once/0,
i_do_async_req_list/4,
test_stream_once/3,
test_stream_once/4
@@ -260,7 +261,20 @@
io:format("Fetching data with streaming as binary...~n", []),
Async_response_bin = do_async_req_list(
Url, get, [{response_format, binary} | Options]),
- compare_responses(Result_without_streaming, Async_response_list, Async_response_bin).
+ io:format("Fetching data with streaming as binary, {active, once}...~n", []),
+ Async_response_bin_once = do_async_req_list(
+ Url, get, [once, {response_format, binary} | Options]),
+ compare_responses(Result_without_streaming, Async_response_list, Async_response_bin),
+ compare_responses(Result_without_streaming, Async_response_list, Async_response_bin_once).
+
+test_chunked_streaming_once() ->
+ test_chunked_streaming_once([]).
+
+test_chunked_streaming_once(Options) ->
+ Url = "http://www.httpwatch.com/httpgallery/chunked/",
+ io:format("URL: ~s~n", [Url]),
+ io:format("Fetching data with streaming as binary, {active, once}...~n", []),
+ do_async_req_list(Url, get, [once, {response_format, binary} | Options]).
compare_responses({ok, St_code, _, Body}, {ok, St_code, _, Body}, {ok, St_code, _, Body}) ->
success;
@@ -313,31 +327,54 @@
Msg ->
io:format("Recvd unknown message: ~p~n", [Msg]),
wait_for_resp(Pid)
- after 10000 ->
+ after 100000 ->
{error, timeout}
end.
i_do_async_req_list(Parent, Url, Method, Options) ->
- Res = ibrowse:send_req(Url, [], Method, [], [{stream_to, self()} | Options]),
+ Options_1 = case lists:member(once, Options) of
+ true ->
+ [{stream_to, {self(), once}} | (Options -- [once])];
+ false ->
+ [{stream_to, self()} | Options]
+ end,
+ Res = ibrowse:send_req(Url, [], Method, [], Options_1),
case Res of
{ibrowse_req_id, Req_id} ->
- Result = wait_for_async_resp(Req_id, undefined, undefined, []),
+ Result = wait_for_async_resp(Req_id, Options, undefined, undefined, []),
Parent ! {async_result, self(), Result};
Err ->
Parent ! {async_result, self(), Err}
end.
-wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, Body) ->
+wait_for_async_resp(Req_id, Options, Acc_Stat_code, Acc_Headers, Body) ->
receive
{ibrowse_async_headers, Req_id, StatCode, Headers} ->
- wait_for_async_resp(Req_id, StatCode, Headers, Body);
+ %% io:format("Recvd headers...~n", []),
+ maybe_stream_next(Req_id, Options),
+ wait_for_async_resp(Req_id, Options, StatCode, Headers, Body);
{ibrowse_async_response_end, Req_id} ->
+ io:format("Recvd end of response.~n", []),
Body_1 = list_to_binary(lists:reverse(Body)),
{ok, Acc_Stat_code, Acc_Headers, Body_1};
{ibrowse_async_response, Req_id, Data} ->
- wait_for_async_resp(Req_id, Acc_Stat_code, Acc_Headers, [Data | Body]);
+ maybe_stream_next(Req_id, Options),
+ %% io:format("Recvd data...~n", []),
+ wait_for_async_resp(Req_id, Options, Acc_Stat_code, Acc_Headers, [Data | Body]);
+ {ibrowse_async_response, Req_id, {error, _} = Err} ->
+ {ok, Acc_Stat_code, Acc_Headers, Err};
Err ->
{ok, Acc_Stat_code, Acc_Headers, Err}
+ after 10000 ->
+ {timeout, Acc_Stat_code, Acc_Headers, Body}
+ end.
+
+maybe_stream_next(Req_id, Options) ->
+ case lists:member(once, Options) of
+ true ->
+ ibrowse:stream_next(Req_id);
+ false ->
+ ok
end.
execute_req(Url, Method, Options) ->
diff --git a/vsn.mk b/vsn.mk
index 0cc9f5a..b3fd96c 100644
--- a/vsn.mk
+++ b/vsn.mk
@@ -1,2 +1,2 @@
-IBROWSE_VSN = 1.6.2
+IBROWSE_VSN = 2.0.0