Skip to content

Commit

Permalink
Merge pull request #112 from fal-ai/send-empty-packet
Browse files Browse the repository at this point in the history
fix: send empty packet to the client every 600 seconds
  • Loading branch information
isidentical authored Mar 4, 2024
2 parents f910fb5 + 8cbcd92 commit adc8c45
Showing 1 changed file with 13 additions and 1 deletion.
14 changes: 13 additions & 1 deletion src/isolate/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os
import threading
import time
import traceback
from collections import defaultdict
from concurrent import futures
Expand Down Expand Up @@ -32,6 +33,7 @@
from isolate.server.health_server import HealthServicer
from isolate.server.interface import from_grpc, to_grpc

EMPTY_MESSAGE_INTERVAL = float(os.getenv("ISOLATE_EMPTY_MESSAGE_INTERVAL", 600))
MAX_GRPC_WAIT_TIMEOUT = float(os.getenv("ISOLATE_MAX_GRPC_WAIT_TIMEOUT", 10.0))

# Whether to inherit all the packages from the current environment or not.
Expand Down Expand Up @@ -299,11 +301,21 @@ def watch_queue_until_completed(
"""Watch the given queue until the is_completed function returns True. Note that even
if the function is completed, this function might not finish until the queue is empty.
"""

timer = time.monotonic()
while not is_completed():
try:
yield queue.get(timeout=_Q_WAIT_DELAY)
except QueueEmpty:
continue
# Send an empty (but 'real') packet to the client, currently a hacky way
# to make sure the stream results are never ignored.
if time.monotonic() - timer > EMPTY_MESSAGE_INTERVAL:
timer = time.monotonic()
yield definitions.PartialRunResult(
is_complete=False,
logs=[],
result=None,
)

# Clear the final messages
while not queue.empty():
Expand Down

0 comments on commit adc8c45

Please sign in to comment.