When an MQTT connection dies, it fails with a NPE and never reconnects #1181

Closed
ctron opened this issue Apr 28, 2021 · 11 comments · Fixed by #1228 · May be fixed by vert-x3/vertx-mqtt#197

@ctron
Contributor

ctron commented Apr 28, 2021

Assume there is a successfully established MQTT connection, which then just dies. In the log you can see an NPE (`NullPointerException`), and the client never reconnects.

2021-04-28 12:50:12,161 ERROR [io.sma.rea.mes.mqtt] (vert.x-eventloop-thread-0) SRMSG17103: An error has been caught while sending a MQTT message to the broker: java.lang.NullPointerException
	at io.vertx.mqtt.impl.MqttClientImpl.write(MqttClientImpl.java:785)
	at io.vertx.mqtt.impl.MqttClientImpl.disconnect(MqttClientImpl.java:298)
	at io.vertx.mqtt.impl.MqttClientImpl.disconnect(MqttClientImpl.java:279)
	at io.vertx.mutiny.mqtt.MqttClient.disconnectAndForget(MqttClient.java:564)
	at io.smallrye.reactive.messaging.mqtt.MqttSink.lambda$new$4(MqttSink.java:78)
	at io.smallrye.context.impl.wrappers.SlowContextualRunnable.run(SlowContextualRunnable.java:19)
	at io.smallrye.mutiny.operators.multi.MultiOnCompletionInvoke$MultiOnCompletionInvokeProcessor.onCompletion(MultiOnCompletionInvoke.java:36)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.handleTerminationIfDone(MultiFlatMapOp.java:514)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.ifDoneOrCancelled(MultiFlatMapOp.java:486)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drainLoop(MultiFlatMapOp.java:286)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drain(MultiFlatMapOp.java:266)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onCompletion(MultiFlatMapOp.java:220)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.handleTerminationIfDone(MultiFlatMapOp.java:514)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.ifDoneOrCancelled(MultiFlatMapOp.java:486)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drainLoop(MultiFlatMapOp.java:286)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drain(MultiFlatMapOp.java:266)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onCompletion(MultiFlatMapOp.java:220)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.streams.utils.ConnectableProcessor.onComplete(ConnectableProcessor.java:129)
	at org.eclipse.microprofile.reactive.streams.operators.CompletionSubscriber$1DefaultCompletionSubscriber.onComplete(CompletionSubscriber.java:95)
	at org.eclipse.microprofile.reactive.streams.operators.CompletionSubscriber$1DefaultCompletionSubscriber.onComplete(CompletionSubscriber.java:95)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.handleTerminationIfDone(MultiFlatMapOp.java:514)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.ifDoneOrCancelled(MultiFlatMapOp.java:486)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drainLoop(MultiFlatMapOp.java:286)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drain(MultiFlatMapOp.java:266)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onCompletion(MultiFlatMapOp.java:220)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.streams.utils.ConnectableProcessor.onComplete(ConnectableProcessor.java:129)
	at io.smallrye.mutiny.streams.utils.WrappedProcessor.onComplete(WrappedProcessor.java:54)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onCompletion(MultiOperatorProcessor.java:74)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.handleTerminationIfDone(MultiFlatMapOp.java:514)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.ifDoneOrCancelled(MultiFlatMapOp.java:486)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drainLoop(MultiFlatMapOp.java:286)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drain(MultiFlatMapOp.java:266)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onCompletion(MultiFlatMapOp.java:220)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.streams.utils.ConnectableProcessor.onComplete(ConnectableProcessor.java:129)
	at io.smallrye.mutiny.streams.utils.WrappedProcessor.onComplete(WrappedProcessor.java:54)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onCompletion(MultiOperatorProcessor.java:74)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.operators.multi.multicast.MultiPublishOp$PublishSubscriber.isEmptyOrCompleted(MultiPublishOp.java:315)
	at io.smallrye.mutiny.operators.multi.multicast.MultiPublishOp$PublishSubscriber.drain(MultiPublishOp.java:381)
	at io.smallrye.mutiny.operators.multi.multicast.MultiPublishOp$PublishSubscriber.onCompletion(MultiPublishOp.java:195)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.handleTerminationIfDone(MultiFlatMapOp.java:514)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.ifDoneOrCancelled(MultiFlatMapOp.java:486)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drainLoop(MultiFlatMapOp.java:286)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.drain(MultiFlatMapOp.java:266)
	at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onCompletion(MultiFlatMapOp.java:220)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.streams.utils.ConnectableProcessor.onComplete(ConnectableProcessor.java:129)
	at io.smallrye.mutiny.streams.utils.WrappedProcessor.onComplete(WrappedProcessor.java:54)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onCompletion(MultiOperatorProcessor.java:74)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.subscription.SafeSubscriber.onComplete(SafeSubscriber.java:167)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onCompletion(MultiOperatorProcessor.java:74)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onCompletion(MultiOperatorProcessor.java:74)
	at io.smallrye.mutiny.operators.multi.MultiOnCancellationInvoke$MultiOnCancellationInvokeProcessor.onCompletion(MultiOnCancellationInvoke.java:36)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.operators.uni.UniOnItemTransformToMulti$FlatMapPublisherSubscriber.onComplete(UniOnItemTransformToMulti.java:69)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.operators.uni.UniOnItemTransformToMulti$FlatMapPublisherSubscriber.onComplete(UniOnItemTransformToMulti.java:69)
	at io.smallrye.mutiny.helpers.HalfSerializer.onComplete(HalfSerializer.java:75)
	at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onCompletion(StrictMultiSubscriber.java:95)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onCompletion(MultiOperatorProcessor.java:74)
	at io.smallrye.mutiny.operators.multi.MultiOperatorProcessor.onCompletion(MultiOperatorProcessor.java:74)
	at io.smallrye.mutiny.subscription.MultiSubscriber.onComplete(MultiSubscriber.java:83)
	at io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor$BroadcastSubscription.onComplete(BroadcastProcessor.java:225)
	at io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor.onComplete(BroadcastProcessor.java:164)
	at io.smallrye.reactive.messaging.mqtt.Clients$ClientHolder.lambda$new$0(Clients.java:64)
	at io.vertx.mqtt.impl.MqttClientImpl.handleClosed(MqttClientImpl.java:806)
	at io.vertx.mqtt.impl.MqttClientImpl.lambda$null$1(MqttClientImpl.java:234)
	at io.vertx.core.net.impl.ConnectionBase.handleClosed(ConnectionBase.java:353)
	at io.vertx.core.net.impl.NetSocketImpl.handleClosed(NetSocketImpl.java:346)
	at io.vertx.core.net.impl.VertxHandler.lambda$channelInactive$3(VertxHandler.java:153)
	at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:366)
	at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:229)
	at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:221)
	at io.vertx.core.net.impl.VertxHandler.channelInactive(VertxHandler.java:153)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
	at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.handler.stream.ChunkedWriteHandler.channelInactive(ChunkedWriteHandler.java:138)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389)
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354)
	at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1106)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:832)
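
Read bottom-up, the trace looks as if the close notification completes the stream, and the sink's completion callback then calls `disconnect()` on a connection that is already gone. A minimal sketch of a guard against that, using a hypothetical `GuardedClient` stand-in (none of these names come from Vert.x or SmallRye):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a client whose connection may already be gone.
// It only illustrates the guard; it is not the Vert.x MqttClientImpl.
final class GuardedClient {
    /** Hypothetical minimal connection: just something packets can be written to. */
    interface Connection {
        void write(String packet);
    }

    private final AtomicReference<Connection> connection = new AtomicReference<>();

    void onConnected(Connection c) {
        connection.set(c);
    }

    /** Called when the transport reports that the connection was closed. */
    void onClosed() {
        connection.set(null);
    }

    /** Sends DISCONNECT only if a connection is still present; returns whether it was sent. */
    boolean disconnectIfConnected() {
        Connection c = connection.getAndSet(null);
        if (c == null) {
            return false; // already closed: nothing to write to, so no NPE
        }
        c.write("DISCONNECT");
        return true;
    }
}
```

With a guard like this, a completion callback that fires after the connection died becomes a no-op instead of an exception.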
@cescoffier
Contributor

The Vert.x MQTT client does not support reconnection, unfortunately.

cescoffier added the mqtt label May 2, 2021
@ctron
Contributor Author

ctron commented May 3, 2021

That would mean that SmallRye must provide this part. Would you accept PRs fixing that in SmallRye?

Right now, this makes the feature completely unusable from my point of view. A client that never recovers and never reports this condition doesn't allow any serious usage of the feature.

@cescoffier
Contributor

Yes, sure, although the last time I checked it was quite complicated, and it should probably be added to the MQTT client directly.

@ctron
Contributor Author

ctron commented May 4, 2021

It looks like a PR for this already exists: #1081

Also, it looks to me as if adding functionality like this is considered too opinionated for Vert.x: vert-x3/vertx-mqtt#66

My proposal would be to focus on the PR mentioned above. It looks like we are not the only ones interested in this feature.

@cescoffier
Contributor

Well, yes, there is another PR, and there are good reasons why it's not merged yet. A quick run revealed a hidden thread pool, and the reactive API is just delegating blocking calls to that thread pool. So, not really in line with the overall architecture.

The outcome of vert-x3/vertx-mqtt#66 does not seem to be valid. I've discussed it with @vietj, and it's definitely a feature the client should provide. Unfortunately, at the moment, MQTT is not a priority.

@ctron
Contributor Author

ctron commented May 6, 2021

Implementing such a feature isn't a big (technical) problem. It looks to me like people are asking for it, and so are we. I think I can find the time to do the implementation. But I would really like to focus on the technical aspect then, and not go into lengthy discussions on the "why" and "where". So if you can tell me "where" and "how" you would want to see this fixed, then I can try to get it done as soon as I can.

@cescoffier
Contributor

cescoffier commented May 6, 2021

That would be awesome!

My first guess would be to fix it in the Vert.x MQTT client (in https://github.com/vert-x3/vertx-mqtt/blob/master/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java).

Around https://github.com/vert-x3/vertx-mqtt/blob/master/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java#L260, you get a socket, and you may be able to attach a disconnect handler or just reuse the closeHandler:
https://github.com/vert-x3/vertx-mqtt/blob/master/src/main/java/io/vertx/mqtt/impl/MqttClientImpl.java#L279. Then, if reconnect is enabled, I would just try to reconnect. Options such as the reconnect interval and the number of attempts would be nice, but let's get something working first.
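
As a rough illustration of that idea (not the actual `MqttClientImpl` code), here is a sketch of a reconnect policy that a close handler could trigger. `ReconnectPolicy`, its `Scheduler` hook, and the boolean connect attempt are all hypothetical names introduced for this example. A real client would connect asynchronously and would presumably back the scheduler with a non-blocking timer.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch: retries a connect attempt at a fixed interval, up to a limit.
final class ReconnectPolicy {
    /** Hypothetical scheduling hook, so no thread is blocked while waiting. */
    interface Scheduler {
        void schedule(long delayMillis, Runnable task);
    }

    private final BooleanSupplier connectAttempt; // assumed: returns true once connected
    private final Scheduler scheduler;
    private final long intervalMillis;
    private final int maxAttempts;
    private int attempts;

    ReconnectPolicy(BooleanSupplier connectAttempt, Scheduler scheduler,
                    long intervalMillis, int maxAttempts) {
        this.connectAttempt = connectAttempt;
        this.scheduler = scheduler;
        this.intervalMillis = intervalMillis;
        this.maxAttempts = maxAttempts;
    }

    /** Intended to be called from the client's close handler when reconnect is enabled. */
    void onClosed() {
        attempts = 0;
        scheduleNext();
    }

    private void scheduleNext() {
        if (attempts >= maxAttempts) {
            return; // give up; a real implementation should report this to the caller
        }
        attempts++;
        scheduler.schedule(intervalMillis, () -> {
            if (!connectAttempt.getAsBoolean()) {
                scheduleNext(); // attempt failed: try again after the interval
            }
        });
    }

    /** Number of attempts made since the last close. */
    int attempts() {
        return attempts;
    }
}
```

Keeping the interval and the attempt limit as constructor parameters mirrors the two options mentioned above, and injecting the scheduler keeps the policy testable without real timers.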

@ppatierno
Contributor

> it's definitely a feature the client should provide

@cescoffier it's totally wrong. The MQTT client HAS TO implement the MQTT 3.1.1 specification and not other unrelated stuff. The rest has to be at a higher level, that is, at the SmallRye level or the application level.

@cescoffier
Contributor

Well, it needs to be somewhere. We got many requests for such a feature, and there is even a fork of the connector using a different client that handles the reconnect. Note that if the Vert.x client had offered an easy way to detect connection failures and reconnect (as Paho is doing BTW), we would not need this new abstraction, and everything could have been done in the connector.

Now if you want to keep the Vert.x client a pure implementation, we can:

  • move this code here - but let's be clear, I'm not going to maintain it actively, as I don't know MQTT well enough.
  • create a different project extending the Vert.x MQTT client with a reconnect option

@ctron
Contributor Author

ctron commented May 18, 2021

I think handling reconnects is not specific to SmallRye; others using MQTT need it as well.

And other clients (like HiveMQ, Paho and Fuse) do it the same way. I am not sure why Vert.x must be special here.

So, if it is a huge problem for everyone involved to put it into vertx-mqtt, then I would suggest creating a vertx-mqtt-reconnect-client project (or some other name) with the content of the current PR.

@ppatierno
Contributor

ppatierno commented May 18, 2021

> (as Paho is doing BTW)

Regarding Paho having the reconnect feature: from a quick look at the code, I noticed that it's just about reconnecting and then calling back to allow the higher level to add some logic. It's not trying to track the subscriptions as we are trying to do with the PR on the Vert.x client. Anyway, if I am wrong, please let me know; maybe I missed some part of the Paho code.
If it's just about reconnecting I would agree to have it in the MQTT client.
AFAIK even when you implement an MQTT broker, there is a lower layer just implementing the MQTT spec and then a higher layer tracking subscriptions, messages and so on; this kind of stuff is not part of the spec.
Even the MQTT server part of Vert.x is not a full broker; it's just a layer that provides the 3.1.1 spec from a server perspective, which you can use to build a broker on top of it, or maybe a bridge to a different protocol.
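
To make the layering concrete, here is a small sketch of how a higher layer could remember subscriptions and replay them when a lower layer merely reports "reconnected". `SubscriptionTracker` and its `subscribeCall` hook are hypothetical names for this illustration only. They are not part of Paho, Vert.x, or the PR under discussion.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical higher layer: remembers topics and re-subscribes after a reconnect.
final class SubscriptionTracker {
    private final Set<String> topics = new LinkedHashSet<>();
    private final Consumer<String> subscribeCall; // assumed: sends SUBSCRIBE on the current connection

    SubscriptionTracker(Consumer<String> subscribeCall) {
        this.subscribeCall = subscribeCall;
    }

    /** Subscribes once per topic and remembers the topic for later replay. */
    void subscribe(String topic) {
        if (topics.add(topic)) {
            subscribeCall.accept(topic);
        }
    }

    /** Callback for the lower layer to invoke after it has reconnected. */
    void onReconnected() {
        topics.forEach(subscribeCall);
    }
}
```

In this split, the lower layer only needs a reconnect notification, and everything that goes beyond the spec stays in the layer above it.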

> So, if it is a huge problem for everyone involved to put it into vertx-mqtt, then I would suggest creating a vertx-mqtt-reconnect-client project (or some other name) with the content of the current PR.

I would not use the term "huge problem"; I am just highlighting that it's not part of the MQTT spec. Tbh I prefer the solution of a specific project for it. Anyway, I would be careful about calling it "Session", because that's ambiguous with what a session is in MQTT land.

ctron added a commit to ctron/smallrye-reactive-messaging that referenced this issue May 27, 2021
ctron added a commit to ctron/smallrye-reactive-messaging that referenced this issue May 28, 2021
ctron added a commit to ctron/smallrye-reactive-messaging that referenced this issue May 31, 2021