Skip to content

Commit

Permalink
Add request timeout (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
zuiderkwast committed Jan 29, 2024
1 parent 48eefeb commit c56d0df
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 62 deletions.
28 changes: 16 additions & 12 deletions src/gun.erl
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@
-type req_opts() :: #{
flow => pos_integer(),
reply_to => pid(),
tunnel => stream_ref()
tunnel => stream_ref(),
timeout => timeout()
}.
-export_type([req_opts/0]).

Expand Down Expand Up @@ -634,8 +635,9 @@ headers(ServerPid, Method, Path, Headers0, ReqOpts) ->
StreamRef = make_stream_ref(Tunnel),
InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
Timeout = maps:get(timeout, ReqOpts, infinity),
gen_statem:cast(ServerPid, {headers, ReplyTo, StreamRef,
Method, Path, normalize_headers(Headers0), InitialFlow}),
Method, Path, normalize_headers(Headers0), InitialFlow, Timeout}),
StreamRef.

-spec request(pid(), iodata(), iodata(), req_headers(), iodata()) -> stream_ref().
Expand All @@ -648,8 +650,9 @@ request(ServerPid, Method, Path, Headers, Body, ReqOpts) ->
StreamRef = make_stream_ref(Tunnel),
InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
Timeout = maps:get(timeout, ReqOpts, infinity),
gen_statem:cast(ServerPid, {request, ReplyTo, StreamRef,
Method, Path, normalize_headers(Headers), Body, InitialFlow}),
Method, Path, normalize_headers(Headers), Body, InitialFlow, Timeout}),
StreamRef.

get_tunnel(#{tunnel := Tunnel}) when is_reference(Tunnel) ->
Expand Down Expand Up @@ -700,8 +703,9 @@ connect(ServerPid, Destination, Headers, ReqOpts) ->
StreamRef = make_stream_ref(Tunnel),
InitialFlow = maps:get(flow, ReqOpts, infinity),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
Timeout = maps:get(timeout, ReqOpts, infinity),
gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef,
Destination, Headers, InitialFlow}),
Destination, Headers, InitialFlow, Timeout}),
StreamRef.

%% Awaiting gun messages.
Expand Down Expand Up @@ -1295,34 +1299,34 @@ connected_ws_only(Type, Event, State) ->
%%
%% @todo It might be better, internally, to pass around a URIMap
%% containing the target URI, instead of separate Host/Port/PathWithQs.
connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow},
connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow, Timeout},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{Commands, CookieStore, EvHandlerState} = Protocol:headers(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Method, Host, Port, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout),
commands(Commands, State#state{cookie_store=CookieStore,
event_handler_state=EvHandlerState});
connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow},
connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow, Timeout},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{Commands, CookieStore, EvHandlerState} = Protocol:request(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Method, Host, Port, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
InitialFlow, CookieStore0, EvHandler, EvHandlerState0, Timeout),
commands(Commands, State#state{cookie_store=CookieStore,
event_handler_state=EvHandlerState});
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow},
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow, Timeout},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{Commands, EvHandlerState} = Protocol:connect(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Destination, #{host => Host, port => Port},
Headers, InitialFlow, EvHandler, EvHandlerState0),
Headers, InitialFlow, EvHandler, EvHandlerState0, Timeout),
commands(Commands, State#state{event_handler_state=EvHandlerState});
%% Public Websocket interface.
connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{opts=Opts}) ->
Expand Down Expand Up @@ -1387,11 +1391,11 @@ closing(state_timeout, closing_timeout, State=#state{status=Status}) ->
end,
disconnect(State, Reason);
%% When reconnect is disabled, fail HTTP/Websocket operations immediately.
closing(cast, {headers, ReplyTo, StreamRef, _Method, _Path, _Headers, _InitialFlow},
closing(cast, {headers, ReplyTo, StreamRef, _Method, _Path, _Headers, _InitialFlow, _Timeout},
State=#state{opts=#{retry := 0}}) ->
ReplyTo ! {gun_error, self(), StreamRef, closing},
{keep_state, State};
closing(cast, {request, ReplyTo, StreamRef, _Method, _Path, _Headers, _Body, _InitialFlow},
closing(cast, {request, ReplyTo, StreamRef, _Method, _Path, _Headers, _Body, _InitialFlow, _Timeout},
State=#state{opts=#{retry := 0}}) ->
ReplyTo ! {gun_error, self(), StreamRef, closing},
{keep_state, State};
Expand Down
76 changes: 55 additions & 21 deletions src/gun_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@
-export([closing/4]).
-export([close/4]).
-export([keepalive/3]).
-export([headers/12]).
-export([request/13]).
-export([headers/13]).
-export([request/14]).
-export([data/7]).
-export([connect/9]).
-export([connect/10]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
-export([down/1]).
-export([ws_upgrade/11]).
Expand Down Expand Up @@ -62,7 +63,8 @@
path :: iodata(),

is_alive :: boolean(),
handler_state :: undefined | gun_content_handler:state()
handler_state :: undefined | gun_content_handler:state(),
timer_ref :: undefined | reference()
}).

-record(http_state, {
Expand Down Expand Up @@ -545,7 +547,8 @@ close_streams(_, [], _) ->
ok;
close_streams(State, [#stream{is_alive=false}|Tail], Reason) ->
close_streams(State, Tail, Reason);
close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo, timer_ref=TimerRef}|Tail], Reason) ->
cancel_stream_timer(TimerRef),
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason},
close_streams(State, Tail, Reason).

Expand All @@ -561,43 +564,43 @@ keepalive(#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandle
keepalive(_State, _, EvHandlerState) ->
{[], EvHandlerState}.

headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerState, _)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{[], CookieStore, EvHandlerState};
headers(State=#http_state{opts=Opts, out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0, Timeout) ->
{SendResult, Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, undefined,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
Command = case SendResult of
ok ->
InitialFlow = initial_flow(InitialFlow0, Opts),
{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
ReplyTo, Method, Authority, Path, InitialFlow)};
ReplyTo, Method, Authority, Path, InitialFlow, Timeout)};
Error={error, _} ->
Error
end,
{Command, CookieStore, EvHandlerState}.

request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState, _)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{[], CookieStore, EvHandlerState};
request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo,
Method, Host, Port, Path, Headers, Body,
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0, Timeout) ->
{SendResult, Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
Command = case SendResult of
ok ->
InitialFlow = initial_flow(InitialFlow0, Opts),
{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
ReplyTo, Method, Authority, Path, InitialFlow)};
ReplyTo, Method, Authority, Path, InitialFlow, Timeout)};
Error={error, _} ->
Error
end,
Expand Down Expand Up @@ -760,19 +763,19 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
{[], EvHandlerState0}
end.

connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState, _)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
{[], EvHandlerState};
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState, _)
when Streams =/= [] ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
{[], EvHandlerState};
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0,
EvHandler, EvHandlerState0) ->
EvHandler, EvHandlerState0, Timeout) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
_ -> Host0
Expand Down Expand Up @@ -817,7 +820,7 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
{{state, new_stream(State, {connect, StreamRef, Destination},
ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow)},
ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow, Timeout)},
EvHandlerState};
Error={error, _} ->
{Error, EvHandlerState1}
Expand All @@ -840,6 +843,17 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
{[], EvHandlerState0}
end.

timeout(State0=#http_state{streams=Streams}, {?MODULE, stream_timeout, StreamRef}, TRef) ->
case lists:keyfind(StreamRef, #stream.ref, Streams) of
#stream{reply_to=ReplyTo, timer_ref=TRef} ->
error_stream_timeout(State, StreamRef, ReplyTo),
State = cancel_stream(State0, StreamRef),
{state, State};
_ ->
%% Ignore non-existing streams and streams where TRef doesn't match.
[]
end.

stream_info(#http_state{streams=Streams}, StreamRef) ->
case lists:keyfind(StreamRef, #stream.ref, Streams) of
#stream{reply_to=ReplyTo, is_alive=IsAlive} ->
Expand All @@ -862,6 +876,11 @@ down(#http_state{streams=Streams}) ->
_ -> Ref
end || #stream{ref=Ref} <- Streams].

error_stream_timeout(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {timeout,
"The stream has timed out."}},
ok.

error_stream_closed(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream has already been closed."}},
Expand Down Expand Up @@ -934,27 +953,41 @@ stream_ref(#websocket{ref=StreamRef}) -> StreamRef;
stream_ref(StreamRef) -> StreamRef.

new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo,
Method, Authority, Path, InitialFlow) ->
Method, Authority, Path, InitialFlow, Timeout) ->
TimerRef = start_stream_timer(StreamRef, Timeout),
State#http_state{streams=Streams
++ [#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
method=iolist_to_binary(Method), authority=Authority,
path=iolist_to_binary(Path), is_alive=true}]}.
path=iolist_to_binary(Path), is_alive=true,
timer_ref=TimerRef}]}.

is_stream(#http_state{streams=Streams}, StreamRef) ->
lists:keymember(StreamRef, #stream.ref, Streams).

cancel_stream(State=#http_state{streams=Streams}, StreamRef) ->
Streams2 = [case Ref of
StreamRef ->
Tuple#stream{is_alive=false};
cancel_stream_timer(TimerRef),
Tuple#stream{is_alive=false, timer_ref=undefined};
_ ->
Tuple
end || Tuple = #stream{ref=Ref} <- Streams],
end || Tuple = #stream{ref=Ref, timer_ref=TimerRef} <- Streams],
State#http_state{streams=Streams2}.

end_stream(State=#http_state{streams=[_|Tail]}) ->
end_stream(State=#http_state{streams=[#stream{timer_ref=TimerRef}|Tail]}) ->
cancel_stream_timer(TimerRef),
State#http_state{in=head, streams=Tail}.

start_stream_timer(_StreamRef, infinity) ->
undefined;
start_stream_timer(StreamRef, Timeout) ->
erlang:start_timer(Timeout, self(), {?MODULE, stream_timeout, StreamRef}).

cancel_stream_timer(undefined) ->
ok;
cancel_stream_timer(TimerRef) ->
erlang:cancel_timer(TimerRef).

%% Websocket upgrade.

ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerState)
Expand Down Expand Up @@ -999,7 +1032,8 @@ ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
{state, new_stream(State#http_state{connection=Conn, out=Out},
#websocket{ref=StreamRef, reply_to=ReplyTo, key=Key,
extensions=GunExtensions, opts=WsOpts},
ReplyTo, <<"GET">>, Authority, Path, InitialFlow)};
ReplyTo, <<"GET">>, Authority, Path, InitialFlow,
infinity)};
Error={error, _} ->
Error
end,
Expand Down
Loading

0 comments on commit c56d0df

Please sign in to comment.