Hello.
I have embedded Moquette 0.17 in a Spring Boot project and overridden the AbstractInterceptHandler class to handle the onPublish callback, but I get the following error when I publish a simple JSON message to the broker.
@Override
public void onPublish(InterceptPublishMessage msg) {
    super.onPublish(msg);
    final String decodedPayload = msg.getPayload().toString(UTF_8); // error is thrown here
    System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
}
error:
Exception in thread "pool-2-thread-2" io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.handler.codec.mqtt.MqttPublishMessage.content(MqttPublishMessage.java:49)
at io.netty.handler.codec.mqtt.MqttPublishMessage.payload(MqttPublishMessage.java:42)
at io.moquette.interception.messages.InterceptPublishMessage.getPayload(InterceptPublishMessage.java:40)
at com.test.broker.intercept.BrokerEventListener.onPublish(BrokerEventListener.java:201)
at com.test.broker.intercept.BrokerEventListener$$FastClassBySpringCGLIB$$b1d48b25.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.test.broker.intercept.BrokerEventListener$$EnhancerBySpringCGLIB$$18337bb1.onPublish(<generated>)
at io.moquette.interception.BrokerInterceptor.lambda$notifyTopicPublished$3(BrokerInterceptor.java:134)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Moquette (by way of Netty) uses reference-counted buffers to avoid duplicating data. The last one to receive the buffer releases it. In this case you pass the message to the superclass first, so the superclass releases the buffer and you can no longer use it afterwards.
To avoid the early release, add a "claim" to the buffer by increasing its reference count before passing it on, and release that claim once you are done:
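@Override
public void onPublish(InterceptPublishMessage msg) {
    // Add our own claim so the buffer survives the release done by the superclass.
    msg.getPayload().retain();
    try {
        super.onPublish(msg);
        final String decodedPayload = msg.getPayload().toString(UTF_8);
        System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
    } finally {
        // Release our claim even if the code above throws.
        msg.getPayload().release();
    }
}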
Or, alternatively, move the call to super to be the last in your method.
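To see why the order of the calls matters, here is a minimal plain-Java model of the reference counting described above. It is only a sketch: CountedBuffer, baseOnPublish and the three handler methods are hypothetical stand-ins, not Netty or Moquette classes, and the model assumes nothing beyond what is stated above, namely that the base handler releases the buffer it receives.

```java
import java.nio.charset.StandardCharsets;

public class RefCountSketch {

    /** Hypothetical stand-in for a reference-counted buffer (not Netty's ByteBuf). */
    static final class CountedBuffer {
        private final byte[] data;
        private int refCnt = 1; // the creator holds the first claim

        CountedBuffer(String text) {
            this.data = text.getBytes(StandardCharsets.UTF_8);
        }

        CountedBuffer retain() { ensureAccessible(); refCnt++; return this; }

        void release() { ensureAccessible(); refCnt--; }

        String readText() {
            ensureAccessible();
            return new String(data, StandardCharsets.UTF_8);
        }

        int refCnt() { return refCnt; }

        private void ensureAccessible() {
            if (refCnt <= 0) {
                throw new IllegalStateException("refCnt: " + refCnt);
            }
        }
    }

    /** Models the base handler, which is assumed to release the buffer it receives. */
    static void baseOnPublish(CountedBuffer payload) {
        payload.release();
    }

    /** Broken order: the base handler releases first, so the read fails. */
    static String readAfterSuper(CountedBuffer payload) {
        baseOnPublish(payload);
        return payload.readText();
    }

    /** Fix 1: add a claim before passing the buffer on, release it when done. */
    static String retainThenRead(CountedBuffer payload) {
        payload.retain();
        try {
            baseOnPublish(payload);
            return payload.readText();
        } finally {
            payload.release();
        }
    }

    /** Fix 2: read the payload first and call the base handler last. */
    static String readThenSuper(CountedBuffer payload) {
        String text = payload.readText();
        baseOnPublish(payload);
        return text;
    }

    public static void main(String[] args) {
        System.out.println(retainThenRead(new CountedBuffer("{\"a\":1}")));
        System.out.println(readThenSuper(new CountedBuffer("{\"a\":1}")));
        try {
            readAfterSuper(new CountedBuffer("{\"a\":1}"));
        } catch (IllegalStateException e) {
            System.out.println("failed as expected: " + e.getMessage());
        }
    }
}
```

In both working variants the count ends at zero exactly once, after the last read. In the broken variant the count is already zero when the payload is read, which is the situation behind the IllegalReferenceCountException in the stack trace.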