From 3cddc5387a7ef50a8e429f6d1e3628ba4253ce70 Mon Sep 17 00:00:00 2001 From: Tahier Hussain Date: Wed, 4 Dec 2024 15:50:13 +0530 Subject: [PATCH] Implemented throttle delay of 1 sec for workflow socket messages --- .../helpers/socket-messages/SocketMessages.js | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/frontend/src/components/helpers/socket-messages/SocketMessages.js b/frontend/src/components/helpers/socket-messages/SocketMessages.js index ce1ce2a31..8f073c504 100644 --- a/frontend/src/components/helpers/socket-messages/SocketMessages.js +++ b/frontend/src/components/helpers/socket-messages/SocketMessages.js @@ -32,6 +32,8 @@ function SocketMessages() { // Buffer to hold the logs between throttle intervals const psLogs = useRef([]); const wfLogs = useRef([]); + // Buffer to hold the staged messages between throttle intervals + const stagedMsgsBuffer = useRef([]); useEffect(() => { setLogId(sessionDetails?.logEventsId || ""); @@ -53,13 +55,24 @@ function SocketMessages() { }, THROTTLE_DELAY) ).current; + // Throttled function for staged messages + const stagedMsgsThrottledUpdate = useRef( + throttle((bufferedMsgs) => { + bufferedMsgs.forEach((msg) => { + pushStagedMessage(msg); + }); + stagedMsgsBuffer.current = []; + }, 2000) + ).current; + // Clean up throttling functions on unmount useEffect(() => { return () => { psLogsThrottledUpdate.cancel(); wfLogsThrottledUpdate.cancel(); + stagedMsgsThrottledUpdate.cancel(); }; - }, [psLogsThrottledUpdate, wfLogsThrottledUpdate]); + }, [psLogsThrottledUpdate, wfLogsThrottledUpdate, stagedMsgsThrottledUpdate]); const handlePsLogs = useCallback( (msg) => { @@ -76,10 +89,20 @@ function SocketMessages() { }, [wfLogsThrottledUpdate] ); + + const handleStagedMessages = useCallback( + (msg) => { + stagedMsgsBuffer.current = [...stagedMsgsBuffer.current, msg]; + stagedMsgsThrottledUpdate(stagedMsgsBuffer.current); + }, + [stagedMsgsThrottledUpdate] + ); + // Handle incoming socket messages const onMessage = (data) => { try { let msg = data.data; + // Attempt to decode data as JSON if it's in encoded state if (typeof msg === "string" || msg instanceof Uint8Array) { if (typeof msg === "string") { @@ -95,7 +118,7 @@ function SocketMessages() { msg.message = msg?.log; handleWfLogs(msg); } else if (msg?.type === "UPDATE") { - pushStagedMessage(msg); + handleStagedMessages(msg); } else if (msg?.type === "LOG" && msg?.service === "prompt") { handlePsLogs(msg); }