Numaflow reducer doesn't output grouped message right after fixed time window #81
Comments
Hi @borkox, the behaviour you're experiencing is due to the way the watermark progresses in the current implementation. The watermark, which determines when a window has expired, stops progressing when the source is idle. If no new messages are coming into the vertex, the watermark does not advance, so the window does not expire. This is not an issue with the SDK itself, but rather a behaviour that needs to be addressed on the Numaflow side. We are aware of this and are actively working on a solution. You can track progress at numaproj/numaflow#633. It's a work in progress, and we will post an update as soon as a fix is available. |
Thanks for the explanation. |
Another question: how can I work around this problem? If I generate a message towards my grouping vertex, does it have to be keyed, given that I have 2 partitions at the moment? How would a random message without keys help me if it goes to the wrong pod? And I assume the other pod would stay silent? |
I also ran a few more tests. I put the same message into the pipeline again and again, and it seems the reducer still keeps old data inside and doesn't fire.
If you check those logs, you can see that after the incoming message 2 more minutes need to pass (from 17:07:43 to 17:09:24), in addition to the initial 2 minutes for the fixed window. To me this seems like random behavior. The first two rows also show another problem: 2 messages trigger the creation of 2 different reducers. The number after "PageErrorsReduceFactory - " is the hashCode of the reducer created by the factory. |
In Numaflow the progression of time is data-driven rather than being based on the clock time. This means that the watermark progression or window closure is not dependent on clock-time but rather on the incoming data flow. Is the data coming in at a very high rate? If so, could you please provide some insights into the transactions per second (TPS) you're currently experiencing? |
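To make the data-driven behaviour concrete, here is a minimal Java sketch. It is not Numaflow code: the class FixedWindowSketch and its methods are hypothetical, and it assumes a simplified watermark equal to the largest event time seen so far, which is not how Numaflow actually computes it. It only shows why a fixed window stays open while no data arrives, however much clock time passes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names come from Numaflow.
class FixedWindowSketch {
    private final long windowMillis;
    // Open windows: window start time -> number of buffered messages.
    private final TreeMap<Long, Integer> open = new TreeMap<>();
    // Simplifying assumption: watermark = largest event time seen so far.
    private long watermark = -1L;

    FixedWindowSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Buffers one message, advances the watermark, and returns the start
    // times of the windows that the new watermark allows to close.
    List<Long> onEvent(long eventTimeMillis) {
        long start = eventTimeMillis - Math.floorMod(eventTimeMillis, windowMillis);
        open.merge(start, 1, Integer::sum);
        watermark = Math.max(watermark, eventTimeMillis);
        return closeExpired();
    }

    // Clock time passing without data: the watermark does not move,
    // so nothing new can close.
    List<Long> onIdleTick(long wallClockMillis) {
        return closeExpired();
    }

    private List<Long> closeExpired() {
        List<Long> closed = new ArrayList<>();
        // A window [start, start + windowMillis) closes once the watermark reaches its end.
        while (!open.isEmpty() && open.firstKey() + windowMillis <= watermark) {
            closed.add(open.pollFirstEntry().getKey());
        }
        return closed;
    }

    public static void main(String[] args) {
        FixedWindowSketch w = new FixedWindowSketch(120_000L); // 2-minute fixed window
        System.out.println(w.onEvent(0L));          // first message, window stays open
        System.out.println(w.onEvent(30_000L));     // same window, still open
        System.out.println(w.onIdleTick(600_000L)); // ten minutes of silence, still open
        System.out.println(w.onEvent(130_000L));    // later event moves the watermark past the window end
    }
}
```

In this sketch only the last call closes the first window, because it is the only one that moves the watermark past the window end.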
@borkox With idle source watermark, you should be able to fix this (Numaflow 1.1 release). The ideal generic solution will be through early triggers, which will be possible via a streaming-reduce concept, which should be available in the Numaflow 1.2 release. |
We are in testing mode so far, so I am testing with 3 to 4 messages for two hours. |
I tried to attach a generator to the reducer as another source, but it seems I cannot control the watermark. The watermark on messages produced by the user-defined generator is always "-1", which is "1969-12-31T23:59:59.999Z". |
@borkox - attaching another source won't help, because when we calculate the watermark we need to check all the sources. Until v1.1 is available, if you do not have continuous streaming data coming into the pipeline, one hacky workaround is to send some dummy data to the pipeline and filter it out before the reduce. The watermark will then keep moving, so the windows can be closed. |
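The filtering half of that workaround could look roughly like the Java sketch below. Everything in it is an assumption made for illustration: the class HeartbeatFilterSketch, the marker payload "__heartbeat__", and the idea of recognising dummy data by its payload. It deliberately uses no Numaflow SDK types, so wiring it into a vertex that sits before the reduce is not shown.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; the marker payload and all names are made up.
class HeartbeatFilterSketch {
    // Assumed payload that the dummy-data producer would send.
    static final byte[] HEARTBEAT = "__heartbeat__".getBytes(StandardCharsets.UTF_8);

    // True if the payload is one of the dummy messages.
    static boolean isHeartbeat(byte[] payload) {
        return Arrays.equals(payload, HEARTBEAT);
    }

    // Returns only the real messages, so the reducer never sees dummy data.
    static List<byte[]> dropHeartbeats(List<byte[]> batch) {
        List<byte[]> kept = new ArrayList<>();
        for (byte[] payload : batch) {
            if (!isHeartbeat(payload)) {
                kept.add(payload);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<byte[]> batch = List.of(
                "page-error-1".getBytes(StandardCharsets.UTF_8),
                HEARTBEAT,
                "page-error-2".getBytes(StandardCharsets.UTF_8));
        System.out.println("kept " + dropHeartbeats(batch).size() + " of " + batch.size());
    }
}
```

The dummy messages still have to reach the pipeline often enough to keep the watermark moving; this sketch only covers dropping them again before the reduce.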
@borkox thank you for being an early adopter of Numaflow. As we work through upcoming releases, we love to learn about our community's use cases and take feedback. Would you be interested in a chat with the Numaflow team? |
I would love to chat, have a discussion or a video call. My email is borkox (at) gmail.com. I am in the Sofia time zone (EET), so any working time from 9:00 to 18:00 is fine for me. |
I tried several examples, summing odd and even numbers and so on. I think reduce didn't work as expected no matter what I did. The only case in which it worked reasonably well was with a transformer that sets the time of the message, which is then magically converted to a watermark. |
Perfect, we will schedule a call soon. Meanwhile, one of us will help you with some of the queries above. |
The watermark is correct, as expected. The watermark is not the same as the event time in the message, but an approximate timestamp that guarantees no data earlier than that timestamp will be seen in the pipeline. Here is a video that explains event time and watermark in Flink. That is not the way we calculate the watermark, but it might help you understand what a watermark is. |
I have a Numaflow v0.10.1 pipeline with a reduce vertex that uses numaflow-java 0.5.5.
I am using keyed messages with size one and 2 partitions.
After several messages have entered the group node, I expect my vertex to output one message with the grouped items once the fixed time window expires. Instead, my method is called after many minutes, or only when new messages arrive. If no messages are coming, the data held in the vertex's memory is not output.
My group handler looks like this (I deleted imports and actual code to keep it anonymous)
The method getOutput(...) is not invoked from the Akka actors if no new messages are coming to the vertex. I think this is a bug. Sometimes getOutput(...) is invoked after 20 minutes, sometimes after 2 hours. Sometimes it is not invoked for a whole day (when no new messages are coming into the vertex).