Skip to content

Commit

Permalink
Implement API rate limiting
Browse files Browse the repository at this point in the history
Add enforcement of API rate limits if the user has an API quota for the
service set in the `GafaelfawrIngress`. If the request is rejected by
rate limiting, Gafaelfawr will return a 429 error with the HTTP
`Retry-After` header set to the time at which the rate limit resets.

Rate limiting is implemented using the `limits` Python package, which
unfortunately introduces a second Redis client with a separate pool
(`coredis`). Currently, rate limiting uses a fixed window without any
fancy rolling expiration or burst protection.
  • Loading branch information
rra committed Jan 11, 2025
1 parent 0fe0ac0 commit 9fe4790
Show file tree
Hide file tree
Showing 16 changed files with 644 additions and 297 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.9.0
rev: v0.9.1
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
3 changes: 3 additions & 0 deletions changelog.d/20250110_173247_rra_DM_48390.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### New features

- API rate limits are now enforced if configured. If a request exceeds the rate limit, Gafaelfawr will return a 429 response with a `Retry-After` header. Rate limit data is recorded in the new ephemeral Redis pool.
1 change: 1 addition & 0 deletions docs/documenteer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ bonsai = "https://bonsai.readthedocs.io/en/latest"
cryptography = "https://cryptography.io/en/latest"
jwt = "https://pyjwt.readthedocs.io/en/latest"
kopf = "https://kopf.readthedocs.io/en/stable"
limits = "https://limits.readthedocs.io/en/stable"
python = "https://docs.python.org/3"
redis = "https://redis-py.readthedocs.io/en/stable"
safir = "https://safir.lsst.io"
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies = [
"google-api-core",
"google-cloud-firestore",
"httpx",
"limits[async-redis]",
"kopf",
"kubernetes-asyncio",
"jinja2",
Expand Down
197 changes: 138 additions & 59 deletions requirements/dev.txt

Large diffs are not rendered by default.

403 changes: 271 additions & 132 deletions requirements/main.txt

Large diffs are not rendered by default.

37 changes: 19 additions & 18 deletions requirements/tox.txt
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ pywin32==308 ; sys_platform == 'win32' \
--hash=sha256:fd380990e792eaf6827fcb7e187b2b4b1cede0585e3d0c9e84201ec27b9905e4
# via
# -c requirements/dev.txt
# -c requirements/main.txt
# docker
requests==2.32.3 \
--hash=sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760 \
Expand Down Expand Up @@ -220,24 +221,24 @@ urllib3==2.3.0 \
# -c requirements/main.txt
# docker
# requests
uv==0.5.16 \
--hash=sha256:2cf976b5d29f67bd8e5aa0c9b9c481c7df1580839ac107cbf58811c894a7e6a6 \
--hash=sha256:2e6f4fbd1cef0227e15374ab56a59ffcbd10c66f263b4e22ac43ca37061539d1 \
--hash=sha256:3419888b178b82511faebd8d1ca9cb9f5920a7142406898d76878adaffe8dfb1 \
--hash=sha256:598a0fa168ffe2786e11bfe0103fe22c06e90fe193ced172b843daf9abb42212 \
--hash=sha256:8fb0b26b9885d57609724fd4efb65785e6896b9bd4ecb865480962999d4247d4 \
--hash=sha256:9522aad257223c9470bb93b78f08dc473acf73a4a1403b999b2f1871a06343a6 \
--hash=sha256:9e73ae821678e7395635be0fec810f242e42f47b5f9cb82a7fc3d279bc37edb5 \
--hash=sha256:a7508b271a461603a38095eaca27b1766bd71f0a02232868a8faae0d8e8a1ae3 \
--hash=sha256:ba5f364809a46e590368a3f85638b71118952f30c79125f87144d9f00be0c9cb \
--hash=sha256:c609bb564f4d5822acd25d8881d76123c2e9aa26b634a6444b56142daff851b8 \
--hash=sha256:c6d3b45d66adedebe5c56602970b2c0fdf6d1977f53464691d428f1a7daa911b \
--hash=sha256:c7ee83877f4e324df4a61e7c79a476d26725e415894a755ce45de87709fc9581 \
--hash=sha256:c8310f40b8834812ddfb195fd62aafaed44b2993c71de8fa75ca0960c739d04e \
--hash=sha256:d540bbc1831d4187a29a9e8f7219e2093c491a45bcbcf2a6196f81c6d2162691 \
--hash=sha256:d54777d6c87a12213a59d2109289e4e35374cc79f88e29617ff764189b8b9cad \
--hash=sha256:e49b2c242cbc201665670dcc3ffa509fa6e131ebcf0423c59df91f2f21eca9d7 \
--hash=sha256:e764721c425ca6cde184ae69517748e47c4ea7f0d9c7dafa78721feeabc58e01
uv==0.5.17 \
--hash=sha256:12789bf19457e3c5fc20767960203ab60222124afe2cbdfde92a657318651a64 \
--hash=sha256:19912bfb0db7fa3b7e769fecf44ce57512713ce0bf427a6ba207761fe1852611 \
--hash=sha256:21ddce1813ea1254398d6db0f650ac37236763d440430d4129e5d46acd0d5f48 \
--hash=sha256:2b8bda94be777896818b219260fe275342fcd2325b973983f91928235ff9b9e4 \
--hash=sha256:2dfbfa06856ae2c6658634be48a57e29293b1c7c78a79489762c5d000c489f3d \
--hash=sha256:364d96d9775a042a4dcea60c2b7ce666bb7df514e822abb3c90af27d6faad73b \
--hash=sha256:62e4003e98010dc93f1142c32687f3238c722ad0ce16ef3c7951616ac1f67d75 \
--hash=sha256:6a6c99c152d6ef7b9850fe72431b1f6aaaf68f32f0b6e020475075d7fe8423e1 \
--hash=sha256:78a9f69484beaf3bf590dbabc059403110ec512627ed6468b3554eff342726bb \
--hash=sha256:868cf64eb665b37daa5b183290cce7844566792ad2ee1257c881a47872f91139 \
--hash=sha256:8bc068efa148fd201a08cf24ee0296501123d61f8653c99e62457c79e42be187 \
--hash=sha256:924966babb8fcf6f87c3698a1d5811e4d45104a595137cbc3feb37fa81799bcc \
--hash=sha256:b0493f81a10995b96bf5b7e3f105a767fed27a3ddfc63d2a25caa82e44fba32e \
--hash=sha256:cb385f458cf55c90a24d52dc6e51d79cbbfcc9f454931ff4e33fa5a6434d9d7f \
--hash=sha256:cd8c70b209b71bbd4464ebfd8e87756a2f9adc03862cebee16594f2b18ec261c \
--hash=sha256:f04074aca4042e6ac2388d599cbc2adc64f66b4101ce07f2a78a8215bea31ef8 \
--hash=sha256:f1f124e8ff6a9f2a0c2aa0e01281795fe7f0dcc41cf1d1b4100e1f5cd3621a51
# via tox-uv
virtualenv==20.28.1 \
--hash=sha256:412773c85d4dab0409b83ec36f7a6499e72eaf08c80e81e9576bca61831c71cb \
Expand Down
17 changes: 17 additions & 0 deletions src/gafaelfawr/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,23 @@ def add_user_group(self) -> bool:
"""Whether to add a synthetic private user group."""
return bool(self.github or (self.ldap and self.ldap.add_user_group))

@property
def redis_rate_limit_url(self) -> str:
"""Redis DSN to use for rate limiting.
The :py:mod:`limits` package requires the Redis DSN in a
specific format with the password already included.
"""
host = self.redis_ephemeral_url.host
port = self.redis_ephemeral_url.port
netloc = f"{host}:{port}" if port else host
path = self.redis_ephemeral_url.path
if self.redis_password:
password = self.redis_password.get_secret_value()
return f"async+redis://:{password}@{netloc}{path}"
else:
return f"async+redis://{netloc}{path}"

def configure_logging(self) -> None:
"""Configure logging based on the Gafaelfawr configuration."""
configure_logging(name="gafaelfawr", log_level=self.log_level)
Expand Down
5 changes: 5 additions & 0 deletions src/gafaelfawr/dependencies/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Annotated, Any

from fastapi import Depends, HTTPException, Request
from limits.aio.strategies import RateLimiter
from safir.dependencies.db_session import db_session_dependency
from safir.dependencies.logger import logger_dependency
from safir.metrics import EventManager
Expand Down Expand Up @@ -56,6 +57,9 @@ class RequestContext:
session: async_scoped_session
"""The database session."""

rate_limiter: RateLimiter
"""API rate limiter."""

factory: Factory
"""The component factory."""

Expand Down Expand Up @@ -124,6 +128,7 @@ async def __call__(
logger=logger,
events=self._events,
session=session,
rate_limiter=self._process_context.rate_limiter,
factory=Factory(self._process_context, session, logger),
)

Expand Down
7 changes: 7 additions & 0 deletions src/gafaelfawr/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from google.cloud import firestore
from httpx import AsyncClient
from kubernetes_asyncio.client import ApiClient
from limits.aio.storage import RedisStorage
from limits.aio.strategies import FixedWindowRateLimiter, RateLimiter
from redis.asyncio import BlockingConnectionPool, Redis
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialBackoff
Expand Down Expand Up @@ -98,6 +100,9 @@ class ProcessContext:
persistent_redis: Redis
"""Connection pool to use to talk to persistent Redis."""

rate_limiter: RateLimiter
"""API rate limiter."""

uid_cache: IdCache
"""Shared UID cache."""

Expand Down Expand Up @@ -168,6 +173,7 @@ async def from_config(cls, config: Config) -> Self:
timeout=REDIS_POOL_TIMEOUT,
)
ephemeral_redis_client = Redis.from_pool(ephemeral_redis_pool)
rate_limiter_storage = RedisStorage(config.redis_rate_limit_url)
persistent_redis_pool = BlockingConnectionPool.from_url(
str(config.redis_persistent_url),
password=redis_password,
Expand All @@ -187,6 +193,7 @@ async def from_config(cls, config: Config) -> Self:
ldap_pool=ldap_pool,
ephemeral_redis=ephemeral_redis_client,
persistent_redis=persistent_redis_client,
rate_limiter=FixedWindowRateLimiter(rate_limiter_storage),
uid_cache=IdCache(),
gid_cache=IdCache(),
ldap_group_cache=LDAPCache(list[Group]),
Expand Down
103 changes: 78 additions & 25 deletions src/gafaelfawr/handlers/ingress.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta
from datetime import UTC, datetime, timedelta
from email.utils import format_datetime
from typing import Annotated

import sentry_sdk
from fastapi import APIRouter, Depends, Header, HTTPException, Query, Response
from limits import RateLimitItemPerMinute
from safir.datetime import current_datetime
from safir.models import ErrorModel
from safir.pydantic import SecondsTimedelta
Expand All @@ -39,6 +41,7 @@
)
from ..models.auth import AuthType, Satisfy
from ..models.token import TokenData
from ..models.userinfo import UserInfo
from ..util import is_bot_user

router = APIRouter(route_class=SlackRouteErrorHandler)
Expand Down Expand Up @@ -349,9 +352,33 @@ async def get_auth(
InsufficientScopeError("Access not allowed for this user"),
)

# Get user information and check rate limits.
info_service = context.factory.create_user_info_service()
try:
user_info = await info_service.get_user_info_from_token(token_data)
except ExternalUserInfoError as e:
# Catch these exceptions rather than raising an uncaught exception or
# reporting the exception to Slack. This route is called on every user
# request and may be called multiple times per second, so if we
# reported every exception during an LDAP outage to Slack, we would
# get rate-limited or destroy the Slack channel. Instead, log the
# exception and return 403 and rely on failures during login (which
# are reported to Slack) and external testing to detect these
# problems.
msg = "Unable to get user information"
context.logger.exception(msg, user=token_data.username, error=str(e))
raise HTTPException(
headers={"Cache-Control": "no-cache, no-store"},
status_code=500,
detail=[{"msg": msg, "type": "user_info_failed"}],
) from e
await check_rate_limit(context, auth_config, user_info)

# Log and return the results.
context.logger.info("Token authorized")
headers = await build_success_headers(context, auth_config, token_data)
headers = await build_success_headers(
context, auth_config, token_data, user_info
)
for key, value in headers:
response.headers.append(key, value)

Expand Down Expand Up @@ -465,8 +492,52 @@ def check_lifetime(
)


async def check_rate_limit(
context: RequestContext, auth_config: AuthConfig, user_info: UserInfo
) -> None:
"""Check whether this request is allowed by rate limits.
Parameters
----------
context
Context of the incoming request.
auth_config
Configuration parameters for the authorization.
user_info
Information about the user, including their quotas.
Raises
------
fastapi.HTTPException
Raised if the requet was rejected by rate limiting. This error will
use a 429 response code.
"""
if not user_info.quota or not auth_config.service:
return
quota = user_info.quota.api.get(auth_config.service)
if not quota:
return
key = ("api", user_info.username)
limit = RateLimitItemPerMinute(quota, 15)
if not await context.rate_limiter.hit(limit, *key):
stats = await context.rate_limiter.get_window_stats(limit, *key)
retry_after = datetime.fromtimestamp(stats.reset_time, tz=UTC)
msg = f"Rate limit ({quota}/15m) exceeded"
raise HTTPException(
detail=[{"msg": msg, "type": "rate_limited"}],
status_code=429,
headers={
"Cache-Control": "no-cache, no-store",
"Retry-After": format_datetime(retry_after, usegmt=True),
},
)


async def build_success_headers(
context: RequestContext, auth_config: AuthConfig, token_data: TokenData
context: RequestContext,
auth_config: AuthConfig,
token_data: TokenData,
user_info: UserInfo,
) -> list[tuple[str, str]]:
"""Construct the headers for successful authorization.
Expand All @@ -491,11 +562,13 @@ async def build_success_headers(
Parameters
----------
context
The context of the incoming request.
Context of the incoming request.
auth_config
Configuration parameters for the authorization.
token_data
The data from the authentication token.
Data from the authentication token.
user_info
User information for the authenticated user.
Returns
-------
Expand All @@ -508,26 +581,6 @@ async def build_success_headers(
Raised if user information could not be retrieved from external
systems.
"""
info_service = context.factory.create_user_info_service()
try:
user_info = await info_service.get_user_info_from_token(token_data)
except ExternalUserInfoError as e:
# Catch these exceptions rather than raising an uncaught exception or
# reporting the exception to Slack. This route is called on every user
# request and may be called multiple times per second, so if we
# reported every exception during an LDAP outage to Slack, we would
# get rate-limited or destroy the Slack channel. Instead, log the
# exception and return 403 and rely on failures during login (which
# are reported to Slack) and external testing to detect these
# problems.
msg = "Unable to get user information"
context.logger.exception(msg, user=token_data.username, error=str(e))
raise HTTPException(
headers={"Cache-Control": "no-cache, no-store"},
status_code=500,
detail=[{"msg": msg, "type": "user_info_failed"}],
) from e

headers = [("X-Auth-Request-User", token_data.username)]
if user_info.email:
headers.append(("X-Auth-Request-Email", user_info.email))
Expand Down
3 changes: 3 additions & 0 deletions tests/data/config/github-quota.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ quota:
default:
api:
datalinker: 1000
test: 1
notebook:
cpu: 8
memory: 4.0
groups:
foo:
api:
test: 1
notebook:
cpu: 0.0
memory: 4.0
Expand Down
40 changes: 39 additions & 1 deletion tests/handlers/ingress_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from __future__ import annotations

import base64
from datetime import timedelta
from datetime import UTC, datetime, timedelta
from email.utils import parsedate_to_datetime
from unittest.mock import ANY

import pytest
Expand Down Expand Up @@ -1150,3 +1151,40 @@ async def test_only_service(client: AsyncClient, factory: Factory) -> None:
assert isinstance(authenticate, AuthErrorChallenge)
assert authenticate.auth_type == AuthType.Bearer
assert authenticate.error == AuthError.insufficient_scope


@pytest.mark.asyncio
async def test_rate_limit(client: AsyncClient, factory: Factory) -> None:
await reconfigure("github-quota", factory)
token_data = await create_session_token(
factory, group_names=["foo"], scopes={"read:all"}
)

# Two requests should be allowed, one from the default quota and a second
# from the additional quota from the foo group.
r = await client.get(
"/ingress/auth",
params={"scope": "read:all", "service": "test"},
headers={"Authorization": f"Bearer {token_data.token}"},
)
assert r.status_code == 200
r = await client.get(
"/ingress/auth",
params={"scope": "read:all", "service": "test"},
headers={"Authorization": f"Bearer {token_data.token}"},
)
assert r.status_code == 200

# The third request should be rejected due to rate limiting, with a
# Retry-After header set to approximately fifteen minutes from now.
expected = (
datetime.now(tz=UTC) + timedelta(minutes=15) - timedelta(seconds=1)
)
r = await client.get(
"/ingress/auth",
params={"scope": "read:all", "service": "test"},
headers={"Authorization": f"Bearer {token_data.token}"},
)
assert r.status_code == 429
retry_after = parsedate_to_datetime(r.headers["Retry-After"])
assert expected <= retry_after <= expected + timedelta(seconds=2)
Loading

0 comments on commit 9fe4790

Please sign in to comment.