Merge pull request #98 from lsst-sqre/tickets/DM-34473
DM-34473: Add arq FastAPI dependency
jonathansick authored Jun 13, 2022
2 parents cd047df + 437db77 commit 6fee2a3
Showing 12 changed files with 1,104 additions and 1 deletion.
7 changes: 7 additions & 0 deletions CHANGELOG.rst
@@ -5,6 +5,13 @@ Change log
.. Headline template:
   X.Y.Z (YYYY-MM-DD)

3.2.0 (2022-05-13)
==================

- New support for `arq <https://arq-docs.helpmanual.io>`__, the Redis-based asyncio distributed queue package.
  The ``safir.arq`` module provides an arq client and metadata/result data classes with a mock implementation for testing.
  The FastAPI dependency, ``safir.dependencies.arq.arq_dependency``, provides a convenient way to use the arq client from HTTP handlers.

3.1.0 (2022-06-01)
==================

3 changes: 2 additions & 1 deletion Makefile
@@ -1,5 +1,6 @@
.PHONY: init
init:
	pip install --upgrade pip tox tox-docker pre-commit
	pip install --upgrade -e ".[db,dev,kubernetes]"
	pip install --upgrade -e ".[arq,db,dev,kubernetes]"
	pre-commit install
	rm -rf .tox
2 changes: 2 additions & 0 deletions README.rst
@@ -23,6 +23,7 @@ Features
- Middleware for attaching request context to the logger to include a request UUID, method, and route in all log messages.
- Process ``X-Forwarded-*`` headers to determine the source IP and related information of the request.
- Gather and structure standard metadata about your application.
- Operate a distributed Redis job queue with arq_ using convenient clients, testing mocks, and a FastAPI dependency.

Developing Safir
================
@@ -40,3 +41,4 @@ For details, see https://safir.lsst.io/dev/development.html.
.. _Roundtable: https://roundtable.lsst.io
.. _FastAPI: https://fastapi.tiangolo.com/
.. _fastapi_safir_app: https://github.com/lsst/templates/tree/master/project_templates/fastapi_safir_app
.. _arq: https://arq-docs.helpmanual.io
6 changes: 6 additions & 0 deletions docs/api.rst
@@ -5,8 +5,14 @@ API reference
.. automodapi:: safir
   :include-all-objects:

.. automodapi:: safir.arq
   :include-all-objects:

.. automodapi:: safir.database

.. automodapi:: safir.dependencies.arq
   :include-all-objects:

.. automodapi:: safir.dependencies.db_session
   :include-all-objects:

288 changes: 288 additions & 0 deletions docs/arq.rst
@@ -0,0 +1,288 @@
.. currentmodule:: safir.arq

###############################################
Using the arq Redis queue client and dependency
###############################################

Distributed queues allow your application to decouple slow-running processing tasks from your user-facing endpoint handlers.
arq_ is a simple distributed queue library with an asyncio API that uses Redis to store both queue metadata and results.
To simplify integrating arq_ into your FastAPI application and test suites, Safir provides both an arq client (`~safir.arq.ArqQueue`) with a drop-in mock for testing and an endpoint handler dependency (`safir.dependencies.arq`) that provides an arq_ client.

For information on using arq in general, see the `arq documentation <https://arq-docs.helpmanual.io>`_.
For real-world examples of how this dependency, and arq-based distributed queues in general, are used in FastAPI apps, see our `Times Square <https://github.com/lsst-sqre/times-square>`__ and `Noteburst <https://github.com/lsst-sqre/noteburst>`__ applications.

Quick start
===========

.. _arq-dependency-setup:

Dependency setup and configuration
-----------------------------------

In your application's FastAPI setup module, typically :file:`main.py`, you need to initialize the `safir.dependencies.arq.arq_dependency` instance during the startup event:

.. code-block:: python

   from fastapi import Depends, FastAPI
   from safir.dependencies.arq import arq_dependency

   from .config import config  # the application's configuration (see the Config example below)

   app = FastAPI()


   @app.on_event("startup")
   async def startup() -> None:
       await arq_dependency.initialize(
           mode=config.arq_mode, redis_settings=config.arq_redis_settings
       )

The ``mode`` parameter for `safir.dependencies.arq.ArqDependency.initialize` takes `ArqMode` enum values of either ``"production"`` or ``"test"``.
The ``"production"`` mode configures a real arq_ queue backed by Redis, whereas ``"test"`` configures a mock version of the arq_ queue.

When running in the regular ``"production"`` mode, you need to provide an `arq.connections.RedisSettings` instance.
If your app uses a configuration system like ``pydantic.BaseSettings``, this example ``Config`` class shows how to create a `~arq.connections.RedisSettings` object from a regular Redis URI:

.. code-block:: python

   from urllib.parse import urlparse

   from arq.connections import RedisSettings
   from pydantic import BaseSettings, Field, RedisDsn

   from safir.arq import ArqMode


   class Config(BaseSettings):
       arq_queue_url: RedisDsn = Field(
           "redis://localhost:6379/1", env="APP_ARQ_QUEUE_URL"
       )

       arq_mode: ArqMode = Field(ArqMode.production, env="APP_ARQ_MODE")

       @property
       def arq_redis_settings(self) -> RedisSettings:
           """Create a Redis settings instance for arq."""
           url_parts = urlparse(self.arq_queue_url)
           redis_settings = RedisSettings(
               host=url_parts.hostname or "localhost",
               port=url_parts.port or 6379,
               database=int(url_parts.path.lstrip("/")) if url_parts.path else 0,
           )
           return redis_settings

Worker setup
-------------

Workers that run queued tasks are separate application deployments, though they can (but don't necessarily need to) operate from the same codebase as the FastAPI-based front-end application.
A convenient pattern is to co-locate the worker inside a ``worker`` sub-package:

.. code-block:: text

   .
   ├── src
   │   └── yourapp
   │       ├── __init__.py
   │       ├── config.py
   │       ├── main.py
   │       └── worker
   │           ├── __init__.py
   │           ├── functions
   │           │   ├── __init__.py
   │           │   ├── function_a.py
   │           │   └── function_b.py
   │           ├── main.py

The :file:`src/yourapp/worker/main.py` module looks like:

.. code-block:: python

   from __future__ import annotations

   import uuid
   from typing import Any, Dict

   import httpx
   import structlog
   from safir.logging import configure_logging

   from ..config import config
   from .functions import function_a, function_b


   async def startup(ctx: Dict[Any, Any]) -> None:
       """Runs during worker start-up to set up the worker context."""
       configure_logging(
           profile=config.profile,
           log_level=config.log_level,
           name="yourapp",
       )
       logger = structlog.get_logger("yourapp")
       # The instance key uniquely identifies this worker in logs
       instance_key = uuid.uuid4().hex
       logger = logger.bind(worker_instance=instance_key)

       http_client = httpx.AsyncClient()
       ctx["http_client"] = http_client
       ctx["logger"] = logger
       logger.info("Worker start up complete")


   async def shutdown(ctx: Dict[Any, Any]) -> None:
       """Runs during worker shutdown to clean up resources."""
       if "logger" in ctx.keys():
           logger = ctx["logger"]
       else:
           logger = structlog.get_logger("yourapp")
       logger.info("Running worker shutdown.")

       try:
           await ctx["http_client"].aclose()
       except Exception as e:
           logger.warning("Issue closing the http_client: %s", str(e))

       logger.info("Worker shutdown complete.")


   class WorkerSettings:
       """Configuration for the arq worker.

       See `arq.worker.Worker` for details on these attributes.
       """

       functions = [function_a, function_b]

       redis_settings = config.arq_redis_settings

       on_startup = startup

       on_shutdown = shutdown

The ``WorkerSettings`` class is where you configure the queue and declare worker functions.
See `arq.worker.Worker` for details.

The ``on_startup`` and ``on_shutdown`` handlers are ideal places to set up (and tear down) worker state, including network and database clients.
The context variable, ``ctx``, passed to these functions is also passed to the worker functions.
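
A worker function receives this ``ctx`` dictionary as its first argument and can use the clients that ``startup`` stored in it.
The following sketch of :file:`src/yourapp/worker/functions/function_a.py` is illustrative only; the function's ``url`` parameter and its URL-fetching behaviour are assumptions for this example, not taken from Safir or arq.

.. code-block:: python

   from __future__ import annotations

   from typing import Any, Dict


   async def function_a(ctx: Dict[Any, Any], url: str) -> str:
       """Fetch a URL with the worker's shared HTTP client and return its body."""
       logger = ctx["logger"].bind(task="function_a")
       logger.info("Fetching URL", url=url)

       # Reuse the httpx.AsyncClient created in the worker's startup handler.
       response = await ctx["http_client"].get(url)
       response.raise_for_status()
       return response.text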

To run a worker, run your application's Docker image with the ``arq`` command followed by the fully-qualified name of the ``WorkerSettings`` class (for the layout above, that is ``arq yourapp.worker.main.WorkerSettings``).

Using the arq dependency in endpoint handlers
---------------------------------------------

The `safir.dependencies.arq.arq_dependency` dependency provides your FastAPI endpoint handlers with an `ArqQueue` client that you can use to add jobs (`ArqQueue.enqueue`) to the queue, and get metadata (`ArqQueue.get_job_metadata`) and results (`ArqQueue.get_job_result`) from the queue:

.. code-block:: python

   from typing import Any, Dict

   from arq.jobs import JobStatus
   from fastapi import Depends, HTTPException

   from safir.arq import ArqQueue, JobNotFound, JobResultUnavailable
   from safir.dependencies.arq import arq_dependency

   # "app" is the FastAPI application instance created in main.py (see above).


   @app.post("/jobs")
   async def post_job(
       arq_queue: ArqQueue = Depends(arq_dependency),
       a: str = "hello",
       b: int = 42,
   ) -> Dict[str, Any]:
       """Create a job."""
       job = await arq_queue.enqueue("test_task", a, a_number=b)
       return {"job_id": job.id}


   @app.get("/jobs/{job_id}")
   async def get_job(
       job_id: str,
       arq_queue: ArqQueue = Depends(arq_dependency),
   ) -> Dict[str, Any]:
       """Get metadata about a job."""
       try:
           job = await arq_queue.get_job_metadata(job_id)
       except JobNotFound:
           raise HTTPException(status_code=404)
       response = {
           "id": job.id,
           "status": job.status,
           "name": job.name,
           "args": job.args,
           "kwargs": job.kwargs,
       }
       if job.status == JobStatus.complete:
           try:
               job_result = await arq_queue.get_job_result(job_id)
           except (JobNotFound, JobResultUnavailable):
               raise HTTPException(status_code=404)
           response["result"] = job_result.result
       return response

For information on the metadata available from jobs, see `JobMetadata` and `JobResult`.

Testing applications with an arq queue
======================================

Unit testing an application with a running distributed queue is difficult since three components (the front-end application, the queue worker, and a Redis database) must coordinate.
A better unit testing approach is to test the front-end application separately from the worker functions.
To help you do this, the arq dependency allows you to run a mocked version of an arq queue.
With the mocked client, your front-end application can run the three basic client methods as normal: `ArqQueue.enqueue`, `ArqQueue.get_job_metadata`, and `ArqQueue.get_job_result`.
This mocked client is a subclass of `ArqQueue` called `MockArqQueue`.
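
You can also drive a `MockArqQueue` directly in a unit test, without going through the FastAPI dependency.
The sketch below is illustrative; in particular, it assumes `MockArqQueue` can be constructed with its default arguments.

.. code-block:: python

   import pytest

   from safir.arq import MockArqQueue


   @pytest.mark.asyncio
   async def test_job_lifecycle() -> None:
       # Assumption: MockArqQueue can be created with its defaults.
       arq_queue = MockArqQueue()

       # Enqueue a job just as application code would.
       job = await arq_queue.enqueue("test_task", "hello", a_number=42)

       # The test drives the job lifecycle by hand.
       await arq_queue.set_in_progress(job.id)
       await arq_queue.set_complete(job.id, result="finished")

       job_result = await arq_queue.get_job_result(job.id)
       assert job_result.result == "finished"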

Configuring the test mode
-------------------------

You get a `MockArqQueue` from the `safir.dependencies.arq.arq_dependency` instance by passing an `ArqMode.test` value to the ``mode`` argument of `safir.dependencies.arq.ArqDependency.initialize` in your application's startup (see :ref:`arq-dependency-setup`).
As the example above shows, you can expose this setting as an environment variable and then set the arq mode in your tox settings.

Interacting with the queue state
--------------------------------

Your tests can add jobs and get job metadata or results using the normal code paths.
Since queue jobs never run, your test code needs to manually change the status of jobs and set job results.
You can do this by calling the `safir.dependencies.arq.arq_dependency` instance directly from your test to get the `MockArqQueue`, and then using its `MockArqQueue.set_in_progress` and `MockArqQueue.set_complete` methods.

This example, adapted from Noteburst, shows how this works:

.. code-block:: python

   import pytest
   from httpx import AsyncClient

   from safir.arq import MockArqQueue
   from safir.dependencies.arq import arq_dependency


   @pytest.mark.asyncio
   async def test_post_nbexec(
       client: AsyncClient, sample_ipynb: str, sample_ipynb_executed: str
   ) -> None:
       arq_queue = await arq_dependency()
       assert isinstance(arq_queue, MockArqQueue)

       response = await client.post(
           "/noteburst/v1/notebooks/",
           json={
               "ipynb": sample_ipynb,
               "kernel_name": "LSST",
           },
       )
       assert response.status_code == 202
       data = response.json()
       assert data["status"] == "queued"
       job_url = response.headers["Location"]
       job_id = data["job_id"]

       # Toggle the job to in-progress; the status should update
       await arq_queue.set_in_progress(job_id)
       response = await client.get(job_url)
       assert response.status_code == 200
       data = response.json()
       assert data["status"] == "in_progress"

       # Toggle the job to complete
       await arq_queue.set_complete(job_id, result=sample_ipynb_executed)
       response = await client.get(job_url)
       assert response.status_code == 200
       data = response.json()
       assert data["status"] == "complete"
       assert data["success"] is True
       assert data["ipynb"] == sample_ipynb_executed

3 changes: 3 additions & 0 deletions docs/conf.py
@@ -7,6 +7,7 @@

rst_epilog = """
.. _arq: https://arq-docs.helpmanual.io
.. _FastAPI: https://fastapi.tiangolo.com/
.. _mypy: http://www.mypy-lang.org
.. _pre-commit: https://pre-commit.com
@@ -72,6 +73,7 @@
"python": ("https://docs.python.org/3/", None),
"sqlalchemy": ("https://docs.sqlalchemy.org/en/latest/", None),
"structlog": ("https://www.structlog.org/en/stable/", None),
"arq": ("https://arq-docs.helpmanual.io", None),
}

intersphinx_timeout = 10.0 # seconds
@@ -94,6 +96,7 @@
("py:class", "starlette.middleware.base.BaseHTTPMiddleware"),
("py:class", "starlette.requests.Request"),
("py:class", "starlette.responses.Response"),
("py:obj", "JobMetadata.id"),
]

# Linkcheck builder ==========================================================
1 change: 1 addition & 0 deletions docs/index.rst
@@ -32,6 +32,7 @@ Guides
.. toctree::
   :maxdepth: 2

   arq
   database
   http-client
   gafaelfawr
3 changes: 3 additions & 0 deletions setup.cfg
@@ -66,6 +66,9 @@ dev =
    sphinx-prompt
kubernetes =
    kubernetes_asyncio
arq =
    # 0.23 or later includes support for aioredis 2, but is not released yet.
    arq==0.23a1

[flake8]
max-line-length = 79