-
Notifications
You must be signed in to change notification settings - Fork 65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: #46 support nack delay and ack timeouts #83
base: master
Are you sure you want to change the base?
Conversation
@blankensteiner let me know what you think |
* feat: apache#45 `Consumer` now supports `NegativeAcknowledge` * add `AcknowledgementTimeout` to `ConsumerOptions` * add `NegativeAcknowledgementRedeliveryDelay` to `ConsumerOptions` * add `NegativeackedMessageState` to manage nacked messages * add `UnackedMessageState` to manage unacked messages * add `MessageTracker` to periodically check unacked and nacked messages, on a fixed polling timeout of 10ms * add `AwaitingAck` to track both unacked and nacked messages * add `InactiveMessageTracker` to reduce overhead when no `AcknowledgementTimeout` or `NegativeAcknowledgementRedeliveryDelay` is configured * add `InactiveNegativeackedMessageState` to reduce overhead when no `NegativeAcknowledgementRedeliveryDelay` is configured * add `InactiveUnackedMessageState` to reduce overhead when no `AcknowledgementTimeout` is configured * update `ConsumerBuilder` to allow setting `AcknowledgementTimeout` * update `ConsumerBuilder` to allow setting `NegativeAcknowledgementRedeliveryDelay` * refactor `ConsumerChannel` to support `NegativeAcknowledge` * refactor `AsyncQueue<T>` to implement missing interface `IAsyncQueue<T>` * refactor `BatchHandler<TMessage>` to implement missing interface `IBatchHandler<TMessage>` * add `AutoFixture` and `AutoFixture.AutoNSubstitute` dependencies to unit test project * add missing `ConsumerBuilderTests` unit tests * add missing `ConsumerChannelFactoryTests` unit tests * add missing `ConsumerChannelTests` unit tests * add missing `ConsumerTests` unit tests * add IntegrationTests for consumer ack timout and nack delays * skipped integration test `SinglePartition_WhenSendMessages_ThenGetMessagesFromSinglePartition` to avoid CI failures * skipped integration test `RoundRobinPartition_WhenSendMessages_ThenGetMessagesFromPartitionsInOrder` to avoid CI failures Closes: apache#46 Closes: apache#45
@blankensteiner so I was a bit worried about the interactions with batching, so I've added two integration tests where I attempt to create a scenario where this functionality is tested: pulsar-dotpulsar/tests/DotPulsar.IntegrationTests/ConsumerTests.cs Lines 225 to 226 in e7df3a5
Looking at the other PR's and the still open issue #7, am I fooling myself that this tests actually batches the messages and that at the moment there is actually no way (yet) to produce messages in a batched way using dotpulsar? That would make testing this particular behaviour tricky. I'd need to rely on deep(er) knowledge of the workings of batching to implement the right handling for this. |
Thanks for your PR. Currently, dotpulsar does not yet support batch sending. The code in your test isn't the correct batched way. Can we handle this scenario after the batch support is added? |
@RobertIndie yes that's okay for me, though I would need a strategy (for this PR) to simulate a batched message (to read). Do you have any ideas? |
Is there a reason this has not been merged? |
Consumer
now supportsNegativeAcknowledge
AcknowledgementTimeout
toConsumerOptions
NegativeAcknowledgementRedeliveryDelay
toConsumerOptions
NegativeackedMessageState
to manage nacked messagesUnackedMessageState
to manage unacked messagesMessageTracker
to periodically check unacked and nacked messages, on a fixed polling timeout of 10msAwaitingAck
to track both unacked and nacked messagesInactiveMessageTracker
to reduce overhead when noAcknowledgementTimeout
orNegativeAcknowledgementRedeliveryDelay
is configuredInactiveNegativeackedMessageState
to reduce overhead when noNegativeAcknowledgementRedeliveryDelay
is configuredInactiveUnackedMessageState
to reduce overhead when noAcknowledgementTimeout
is configuredConsumerBuilder
to allow settingAcknowledgementTimeout
ConsumerBuilder
to allow settingNegativeAcknowledgementRedeliveryDelay
ConsumerChannel
to supportNegativeAcknowledge
AsyncQueue<T>
to implement missing interfaceIAsyncQueue<T>
BatchHandler<TMessage>
to implement missing interfaceIBatchHandler<TMessage>
AutoFixture
andAutoFixture.AutoNSubstitute
dependencies to unit test projectConsumerBuilderTests
unit testsConsumerChannelFactoryTests
unit testsConsumerChannelTests
unit testsConsumerTests
unit testsSinglePartition_WhenSendMessages_ThenGetMessagesFromSinglePartition
to avoid CI failuresRoundRobinPartition_WhenSendMessages_ThenGetMessagesFromPartitionsInOrder
to avoid CI failuresCloses: #46
Closes: #45