-
Notifications
You must be signed in to change notification settings - Fork 65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add server timeout functionality plus docker test #104
base: master
Are you sure you want to change the base?
add server timeout functionality plus docker test #104
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR!
Feel free to reach out here or on Slack if you have any questions :-)
/// the disconnected state and attempt to reconnect | ||
/// The default is 60 seconds. | ||
/// </summary> | ||
IPulsarClientBuilder ServerResponseTimeout(TimeSpan interval); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this also a configurable setting for other clients? If not, we could just hardcode it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't know about other clients but it seems it is configurable on the broker so the client setting should agree with that so I think it needs to be configurable here. Shame the value isn't returned from the server as part of the connection response
private readonly Timer _timer; | ||
private readonly CommandPing _ping; | ||
private readonly CommandPong _pong; | ||
private long _lastCommand; | ||
private readonly TaskCompletionSource<object> _serverNotRespondingTcs; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The non-generic TaskCompletionSource is a better fit since the object is never used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems it has been removed
StephenCleary/AsyncEx#176
Task.Factory.StartNew(() => SendPing()); | ||
_timer.Change(_keepAliveInterval, TimeSpan.Zero); | ||
DotPulsarMeter.ServerTimedout(); | ||
_serverNotRespondingTcs.SetResult(new object()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might as well just return here instead of wrapping the following code in an else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -152,6 +152,7 @@ private IPulsarClient CreateClient() | |||
=> PulsarClient | |||
.Builder() | |||
.Authentication(AuthenticationFactory.Token(ct => ValueTask.FromResult(_fixture.CreateToken(Timeout.InfiniteTimeSpan)))) | |||
.KeepAliveInterval(TimeSpan.FromSeconds(5)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this added?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to make the test run faster, the check to see if the server responded happens every 5 seconds instead of 30
@@ -18,3 +18,6 @@ namespace DotPulsar.Tests; | |||
|
|||
[CollectionDefinition("Integration")] | |||
public class IntegrationCollection : ICollectionFixture<IntegrationFixture> { } | |||
|
|||
[CollectionDefinition("KeepAlive")] | |||
public class KeepAliveCollection : ICollectionFixture<KeepAliveFixture> { } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is a new (standalone) cluster needed?
Seems we could solve this with just one cluster/integration fixture.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to use the existing cluster but once you start poking around with the network subsequent tests failed so I opted for a separate cluster
@@ -1,4 +1,5 @@ | |||
{ | |||
"$schema": "https://xunit.net/schema/current/xunit.runner.schema.json", | |||
"diagnosticMessages": true | |||
"diagnosticMessages": true, | |||
"parallelizeTestCollections": false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be removed again if you go ahead with just one cluster for all integration tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for same reason as above I can't
src/DotPulsar/Internal/Connection.cs
Outdated
@@ -294,6 +298,11 @@ private async Task Send(BaseCommand command, CancellationToken cancellationToken | |||
} | |||
|
|||
public async Task ProcessIncommingFrames(CancellationToken cancellationToken) | |||
{ | |||
await Task.WhenAny(ProcessIncommingFramesImpl(cancellationToken), _pingPongHandler.ServerNotResponding); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs a ConfigureAwait(false)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -26,6 +26,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution | |||
EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Processing", "samples\Processing\Processing.csproj", "{CC1494FA-4EB5-4DB9-8BE9-0A6E8D0D963E}" | |||
EndProject | |||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotPulsar.Consumer", "tests\DotPulsar.Consumer\DotPulsar.Consumer.csproj", "{36E6E6EF-A471-4AE4-B696-1C9DAAFA2770}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's get the new tests into DotPulsar.Tests instead of creating new test projects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need an executable that I can run in docker to connect to the server
As mentioned in the keep alive section of the pulsar binary configuration here if the server stops responding to our pings when we have to assume the connection is no longer valid. We can't just rely on the transport to tell us if the connection is alive or not. I've implemented it based on any response from the server rather than specifically a pong, its open to debate if this is correct, I could go either way.
I've also added a test to demonstrate the bug which is fixed by my code in the
PingPongHandler