Skip to content

Commit

Permalink
Make model communication non-blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
BSchilperoort committed Jan 25, 2024
1 parent 8bae092 commit 4a8a761
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 52 deletions.
77 changes: 59 additions & 18 deletions PyStemmusScope/bmi/docker_process.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
"""The Docker STEMMUS_SCOPE model process wrapper."""
import os
import socket as pysocket
import warnings
from time import sleep
from typing import Any
from PyStemmusScope.bmi.docker_utils import check_tags
from PyStemmusScope.bmi.docker_utils import find_image
from PyStemmusScope.bmi.docker_utils import make_docker_vols_binds
from PyStemmusScope.bmi.utils import MATLAB_ERROR
from PyStemmusScope.bmi.utils import PROCESS_FINALIZED
from PyStemmusScope.bmi.utils import PROCESS_READY
from PyStemmusScope.bmi.utils import MatlabError
from PyStemmusScope.config_io import read_config


Expand All @@ -14,26 +20,64 @@
docker = None


def _model_is_ready(socket: Any, client: Any, container_id: Any) -> None:
return _wait_for_model(PROCESS_READY, socket, client, container_id)


def _model_is_finalized(socket: Any, client: Any, container_id: Any) -> None:
return _wait_for_model(PROCESS_FINALIZED, socket, client, container_id)


def _wait_for_model(phrase: bytes, socket: Any, client: Any, container_id: Any) -> None:
"""Wait for the model to be ready to receive (more) commands, or is finalized."""
output = b""
error_msg = b"Error in "

while phrase not in output:
if error_msg in output:
try:
data = socket.read(1)
except TimeoutError as err:
client.stop(container_id)
logs = client.logs(container_id).decode("utf-8")
raise ValueError(
f"Error in container '{container_id['Id']}'. Please inspect logs."
"\nDOCKER LOGS:\n" + logs
msg = (
f"Container connection timed out '{container_id['Id']}'."
f"\nPlease inspect logs:\n{logs}"
)
raise TimeoutError(msg) from err

data = socket.read(1)
if data is None:
msg = "Could not read data from socket. Docker container might be dead."
raise ConnectionError(msg)
else:
output += bytes(data)

if MATLAB_ERROR in output:
client.stop(container_id)
logs = client.logs(container_id).decode("utf-8")
msg = (
f"Error in container '{container_id['Id']}'.\n"
f"Please inspect logs:\n{logs}"
)
raise MatlabError(msg)


def _attach_socket(client, container_id) -> Any:
"""Attach a socket to a container and add a timeout to it."""
connection_timeout = 30 # seconds

socket = client.attach_socket(container_id, {"stdin": 1, "stdout": 1, "stream": 1})
if isinstance(socket, pysocket.SocketIO):
socket._sock.settimeout(connection_timeout) # type: ignore
else:
warnings.warn(
message=(
"Unknown socket type found. This might cause issues with the Docker"
" connection. \nPlease report this to the developers in an issue "
"on: https://github.com/EcoExtreML/STEMMUS_SCOPE_Processing/issues"
),
stacklevel=1,
)
return socket


class StemmusScopeDocker:
"""Communicate with a STEMMUS_SCOPE Docker container."""
Expand Down Expand Up @@ -68,11 +112,9 @@ def __init__(self, cfg_file: str):

self.running = False

def wait_for_model(self) -> None:
def _wait_for_model(self) -> None:
"""Wait for the model to be ready to receive (more) commands."""
_wait_for_model(
self._process_ready_phrase, self.socket, self.client, self.container_id
)
_model_is_ready(self.socket, self.client, self.container_id)

def is_alive(self) -> bool:
"""Return if the process is alive."""
Expand All @@ -84,23 +126,22 @@ def initialize(self) -> None:
self.client.stop(self.container_id)

self.client.start(self.container_id)
self.socket = self.client.attach_socket(
self.container_id, {"stdin": 1, "stdout": 1, "stream": 1}
)
self.wait_for_model()
self.socket = _attach_socket(self.client, self.container_id)

self._wait_for_model()
os.write(
self.socket.fileno(),
bytes(f'initialize "{self.cfg_file}"\n', encoding="utf-8"),
)
self.wait_for_model()
self._wait_for_model()

self.running = True

def update(self) -> None:
"""Update the model and wait for it to be ready."""
if self.is_alive():
os.write(self.socket.fileno(), b"update\n")
self.wait_for_model()
self._wait_for_model()
else:
msg = "Docker container is not alive. Please restart the model."
raise ConnectionError(msg)
Expand All @@ -109,14 +150,14 @@ def finalize(self) -> None:
"""Finalize the model."""
if self.is_alive():
os.write(self.socket.fileno(), b"finalize\n")
_wait_for_model(
self._process_finalized_phrase,
_model_is_finalized(
self.socket,
self.client,
self.container_id,
)
sleep(0.5) # Ensure the container can stop cleanly.
self.client.stop(self.container_id)
self.running = False
self.client.remove_container(self.container_id, v=True)
else:
pass
99 changes: 79 additions & 20 deletions PyStemmusScope/bmi/local_process.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
"""The local STEMMUS_SCOPE model process wrapper."""
import os
import platform
import subprocess
from pathlib import Path
from time import sleep
from typing import Union
from PyStemmusScope.bmi.utils import MATLAB_ERROR
from PyStemmusScope.bmi.utils import PROCESS_READY
from PyStemmusScope.bmi.utils import MatlabError
from PyStemmusScope.config_io import read_config


def is_alive(process: Union[subprocess.Popen, None]) -> subprocess.Popen:
def alive_process(process: Union[subprocess.Popen, None]) -> subprocess.Popen:
"""Return process if the process is alive, raise an exception if it is not."""
if process is None:
msg = "Model process does not seem to be open."
Expand All @@ -17,12 +22,39 @@ def is_alive(process: Union[subprocess.Popen, None]) -> subprocess.Popen:
return process


def wait_for_model(process: subprocess.Popen, phrase=b"Select BMI mode:") -> None:
def read_stdout(process: subprocess.Popen) -> bytes:
"""Read from stdout. If the stream ends unexpectedly, an error is raised."""
assert process.stdout is not None # required for type narrowing.
read = process.stdout.read(1)
if read is None:
sleep(5)
read = process.stdout.read(1)
if read is not None:
return bytes(read)
msg = "Connection error: could not find expected output or "
raise ConnectionError(msg)
return bytes(read)


def _model_is_ready(process: subprocess.Popen) -> None:
return _wait_for_model(PROCESS_READY, process)


def _wait_for_model(phrase: bytes, process: subprocess.Popen) -> None:
"""Wait for model to be ready for interaction."""
output = b""
while is_alive(process) and phrase not in output:
assert process.stdout is not None # required for type narrowing.
output += bytes(process.stdout.read(1))

while alive_process(process) and phrase not in output:
output += read_stdout(process)
if MATLAB_ERROR in output:
try:
process.terminate()
finally:
msg = (
"Error encountered in Matlab.\n"
"Please inspect logs in the output directory"
)
raise MatlabError(msg)


def find_exe(config: dict) -> str:
Expand Down Expand Up @@ -51,46 +83,73 @@ def __init__(self, cfg_file: str) -> None:
exe_file = find_exe(config)
args = [exe_file, cfg_file, "bmi"]

os.environ["MATLAB_LOG_DIR"] = str(config["InputPath"])

self.matlab_process = subprocess.Popen(
lib_path = os.getenv("LD_LIBRARY_PATH")
if lib_path is None:
msg = (
"Environment variable LD_LIBRARY_PATH not found. "
"Refer the Matlab Compiler Runtime documentation"
)
raise ValueError(msg)

# Ensure output directory exists so log file can be written:
Path(config["OutputPath"]).mkdir(parents=True, exist_ok=True)
env = {
"LD_LIBRARY_PATH": lib_path,
"MATLAB_LOG_DIR": str(config["OutputPath"]),
}

self.process = subprocess.Popen(
args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
bufsize=0,
env=env,
)

wait_for_model(self.matlab_process)
if platform.system() == "Linux":
assert self.process.stdout is not None # required for type narrowing.
# Make the connection non-blocking to allow for a timeout on read.
os.set_blocking(self.process.stdout.fileno(), False)
else:
msg = "Unexpected system. The executable is only compiled for Linux."
raise ValueError(msg)
_model_is_ready(self.process)

def is_alive(self) -> bool:
"""Return if the process is alive."""
try:
is_alive(self.matlab_process)
alive_process(self.process)
return True
except ConnectionError:
return False

def initialize(self) -> None:
"""Initialize the model and wait for it to be ready."""
self.matlab_process = is_alive(self.matlab_process)
self.process = alive_process(self.process)

self.matlab_process.stdin.write( # type: ignore
self.process.stdin.write( # type: ignore
bytes(f'initialize "{self.cfg_file}"\n', encoding="utf-8")
)
wait_for_model(self.matlab_process)
_model_is_ready(self.process)

def update(self) -> None:
"""Update the model and wait for it to be ready."""
if self.matlab_process is None:
if self.process is None:
msg = "Run initialize before trying to update the model."
raise AttributeError(msg)

self.matlab_process = is_alive(self.matlab_process)
self.matlab_process.stdin.write(b"update\n") # type: ignore
wait_for_model(self.matlab_process)
self.process = alive_process(self.process)
self.process.stdin.write(b"update\n") # type: ignore
_model_is_ready(self.process)

def finalize(self) -> None:
"""Finalize the model."""
self.matlab_process = is_alive(self.matlab_process)
self.matlab_process.stdin.write(b"finalize\n") # type: ignore
wait_for_model(self.matlab_process, phrase=b"Finished clean up.")
self.process = alive_process(self.process)
self.process.stdin.write(b"finalize\n") # type: ignore
sleep(10)
if self.process.poll() != 0:
try:
self.process.terminate()
finally:
msg = f"Model terminated with return code {self.process.poll()}"
raise ValueError(msg)
12 changes: 12 additions & 0 deletions PyStemmusScope/bmi/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,24 @@
import numpy as np


# Phrases defined in the Matlab code to check for:
PROCESS_READY = b"Select BMI mode:"
PROCESS_FINALIZED = b"Finished clean up."
MATLAB_ERROR = b"Error in "


INAPPLICABLE_GRID_METHOD_MSG = (
"This grid method is not implmented for the STEMMUS_SCOPE BMI because the model is"
"\non a rectilinear grid."
)


class MatlabError(Exception):
"""Matlab code encountered an error."""

pass


class InapplicableBmiMethods:
"""Holds methods that are not applicable for STEMMUS_SCOPE's rectilinear grid."""

Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ dev = [
"mypy",
"pytest",
"pytest-cov",
"types-requests", # type stubs
]
docs = [
"mkdocs",
Expand Down Expand Up @@ -143,6 +144,7 @@ extend-select = [
ignore = [
"E501", # Line length: fails on many docstrings (needs fixing).
"PLR2004", # magic value used in comparsion (i.e. `if ndays == 28: month_is_feb`).
"B009", # getattr is useful to not mess with typing.
]
line-length = 88
exclude = ["docs", "build"]
Expand Down
Loading

0 comments on commit 4a8a761

Please sign in to comment.