Support - Consumer.NegativeAcknowledge #45
Comments
From what I can tell, both the Java and C++ clients use a client-side tracker, NegativeAcksTracker, to keep track of messages that were negatively acknowledged through Consumer.negativeAcknowledge. The tracker is responsible for calling the RedeliverUnacknowledgedMessages consumer command with the message ids whose timeout has elapsed. I see there is some interaction with another mechanism that tracks unacked messages, but that mechanism only exists when the client actually sets an AckTimeout (requested in #46).
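To make the tracker idea above concrete, here is a minimal, language-neutral sketch in Python. It is not the Java, C++, or DotPulsar implementation. The class name `NegativeAckTracker`, the `redeliver` callback (a hypothetical stand-in for sending RedeliverUnacknowledgedMessages), and the injectable `clock` are all assumptions made for illustration.

```python
import time
from typing import Callable, Dict, Hashable, List


class NegativeAckTracker:
    """Remembers nacked message ids and requests redelivery once their delay has elapsed."""

    def __init__(
        self,
        redelivery_delay: float,
        redeliver: Callable[[List[Hashable]], None],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._delay = redelivery_delay
        # Hypothetical callback standing in for the RedeliverUnacknowledgedMessages command.
        self._redeliver = redeliver
        self._clock = clock
        # message id -> point in time at which redelivery becomes due
        self._due_at: Dict[Hashable, float] = {}

    def add(self, message_id: Hashable) -> None:
        # Keep the earliest deadline if the same message is nacked twice.
        self._due_at.setdefault(message_id, self._clock() + self._delay)

    def poll(self) -> int:
        """Request redelivery for every message whose delay has elapsed; return how many."""
        now = self._clock()
        due = [mid for mid, deadline in self._due_at.items() if deadline <= now]
        for mid in due:
            del self._due_at[mid]
        if due:
            self._redeliver(due)
        return len(due)
```

A real client would call `poll` from a timer or background task and would need to synchronize access to the dictionary. Both are left out here to keep the sketch short.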
This sounds related to the "reconsumeLater" functionality. Not sure how they differ.
* feat: apache#45 `Consumer` now supports `NegativeAcknowledge`
* add `AcknowledgementTimeout` to `ConsumerOptions`
* add `NegativeAcknowledgementRedeliveryDelay` to `ConsumerOptions`
* add `NegativeackedMessageState` to manage nacked messages
* add `UnackedMessageState` to manage unacked messages
* add `MessageTracker` to periodically check unacked and nacked messages, on a fixed polling timeout of 10ms
* add `AwaitingAck` to track both unacked and nacked messages
* add `InactiveMessageTracker` to reduce overhead when no `AcknowledgementTimeout` or `NegativeAcknowledgementRedeliveryDelay` is configured
* add `InactiveNegativeackedMessageState` to reduce overhead when no `NegativeAcknowledgementRedeliveryDelay` is configured
* add `InactiveUnackedMessageState` to reduce overhead when no `AcknowledgementTimeout` is configured
* update `ConsumerBuilder` to allow setting `AcknowledgementTimeout`
* update `ConsumerBuilder` to allow setting `NegativeAcknowledgementRedeliveryDelay`
* refactor `ConsumerChannel` to support `NegativeAcknowledge`
* refactor `AsyncQueue<T>` to implement missing interface `IAsyncQueue<T>`
* refactor `BatchHandler<TMessage>` to implement missing interface `IBatchHandler<TMessage>`
* add `AutoFixture` and `AutoFixture.AutoNSubstitute` dependencies to unit test project
* add missing `ConsumerBuilderTests` unit tests
* add missing `ConsumerChannelFactoryTests` unit tests
* add missing `ConsumerChannelTests` unit tests
* add missing `ConsumerTests` unit tests
* add IntegrationTests for consumer ack timeout and nack delays
* skipped integration test `SinglePartition_WhenSendMessages_ThenGetMessagesFromSinglePartition` to avoid CI failures
* skipped integration test `RoundRobinPartition_WhenSendMessages_ThenGetMessagesFromPartitionsInOrder` to avoid CI failures

Closes: apache#46
Closes: apache#45
@blankensteiner Is this something we are considering for the future? I would love to use this functionality. It's already in the F# version, so I'm wondering whether we can expect it in the near future. It would be nice for automated retries or for reconsuming a message after an exception or other failure.
Hi @alexcherka |
Hey @blankensteiner, just checking if there is an update on this? It's been more than a year lol...
Hi @alexcherka |
I am currently working with Apache Pulsar and using DotPulsar for my messaging needs, and I have a requirement for NegativeAcknowledge. Will this be available in the next release?
NegativeAcknowledge provides a nice way to shape error handling when a message cannot be processed right away and you want to retry it later, after a given delay:
Prefer negative acknowledgements over acknowledgement timeout. Negative acknowledgement controls the re-delivery of individual messages with more precision, and avoids invalid redeliveries when the message processing time exceeds the acknowledgement timeout.
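As a rough illustration of the error-handling pattern described above, the sketch below acknowledges a message when the handler succeeds and negatively acknowledges it when the handler raises. `RecordingConsumer` and `process_one` are hypothetical names used only for this example. The method names `acknowledge` and `negative_acknowledge` are modelled on the Apache Pulsar Python client, and nothing here reflects the DotPulsar API.

```python
from typing import Any, Callable, List


class RecordingConsumer:
    """Hypothetical in-memory consumer that only records acks and nacks."""

    def __init__(self) -> None:
        self.acked: List[Any] = []
        self.nacked: List[Any] = []

    def acknowledge(self, message: Any) -> None:
        self.acked.append(message)

    def negative_acknowledge(self, message: Any) -> None:
        self.nacked.append(message)


def process_one(consumer: Any, message: Any, handler: Callable[[Any], None]) -> bool:
    """Run the handler, then ack on success or nack on failure."""
    try:
        handler(message)
    except Exception:
        # Processing failed: ask for this one message to be redelivered later.
        consumer.negative_acknowledge(message)
        return False
    else:
        # Ack only after the handler has finished, so a slow handler is never
        # redelivered just because it took longer than an ack timeout would allow.
        consumer.acknowledge(message)
        return True
```

Because redelivery is requested explicitly and per message, a long-running but successful handler does not trigger a redelivery, which is the case the quoted recommendation refers to.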