Skip to content

Commit

Permalink
fix: no forwarding if primary url own
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Jan 10, 2025
1 parent 8cd588e commit 6bf252a
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 114 deletions.
2 changes: 1 addition & 1 deletion src/schema_registry/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ async def subject_post(
except Exception as xx:
raise xx

elif not primary_url:
elif not primary_url or self.config.host in primary_url:
raise no_primary_url_error()
else:
return await forward_client.forward_request_remote(
Expand Down
11 changes: 8 additions & 3 deletions src/schema_registry/routers/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from dependency_injector.wiring import inject, Provide
from fastapi import APIRouter, Depends, Request
from karapace.auth import AuthenticatorAndAuthorizer, Operation, User
from karapace.config import Config
from karapace.container import KarapaceContainer
from karapace.forward_client import ForwardClient
from karapace.typing import Subject
from schema_registry.container import SchemaRegistryContainer
Expand Down Expand Up @@ -46,14 +48,15 @@ async def config_put(
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> CompatibilityResponse:
if authorizer and not authorizer.check_authorization(user, Operation.Write, "Config:"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
return await controller.config_set(compatibility_level_request=compatibility_level_request)
if not primary_url:
if not primary_url or config.host in primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(
request=request, primary_url=primary_url, response_type=CompatibilityResponse
Expand Down Expand Up @@ -86,14 +89,15 @@ async def config_set_subject(
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> CompatibilityResponse:
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
return await controller.config_subject_set(subject=subject, compatibility_level_request=compatibility_level_request)
if not primary_url:
if not primary_url or config.host in primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(
request=request, primary_url=primary_url, response_type=CompatibilityResponse
Expand All @@ -110,14 +114,15 @@ async def config_delete_subject(
forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]),
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> CompatibilityResponse:
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
return await controller.config_subject_delete(subject=subject)
if not primary_url:
if not primary_url or config.host in primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(
request=request, primary_url=primary_url, response_type=CompatibilityResponse
Expand Down
8 changes: 6 additions & 2 deletions src/schema_registry/routers/subjects.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from dependency_injector.wiring import inject, Provide
from fastapi import APIRouter, Depends, Request
from karapace.auth import AuthenticatorAndAuthorizer, Operation, User
from karapace.config import Config
from karapace.container import KarapaceContainer
from karapace.forward_client import ForwardClient
from karapace.typing import Subject
from schema_registry.container import SchemaRegistryContainer
Expand Down Expand Up @@ -76,14 +78,15 @@ async def subjects_subject_delete(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> list[int]:
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
return await controller.subject_delete(subject=subject, permanent=permanent)
if not primary_url:
if not primary_url or config.host in primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(request=request, primary_url=primary_url, response_type=list[int])

Expand Down Expand Up @@ -157,14 +160,15 @@ async def subjects_subject_version_delete(
authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]),
schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.schema_registry]),
controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]),
config: Config = Depends(Provide[KarapaceContainer.config]),
) -> int:
if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"):
raise unauthorized()

i_am_primary, primary_url = await schema_registry.get_master()
if i_am_primary:
return await controller.subject_version_delete(subject=subject, version=version, permanent=permanent)
if not primary_url:
if not primary_url or config.host in primary_url:
raise no_primary_url_error()
return await forward_client.forward_request_remote(request=request, primary_url=primary_url, response_type=int)

Expand Down
13 changes: 10 additions & 3 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ async def fixture_rest_async(
config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
# Use non-default max request size for REST producer.
config.producer_max_request_size = REST_PRODUCER_MAX_REQUEST_BYTES
config.waiting_time_before_acting_as_master_ms = 300
config.waiting_time_before_acting_as_master_ms = 500
rest = KafkaRest(config=config)

assert rest.serializer.registry_client
Expand Down Expand Up @@ -352,7 +352,7 @@ async def fixture_rest_async_novalidation(
# Use non-default max request size for REST producer.
config.producer_max_request_size = REST_PRODUCER_MAX_REQUEST_BYTES
config.name_strategy_validation = False # This should be only difference from rest_async
config.waiting_time_before_acting_as_master_ms = 300
config.waiting_time_before_acting_as_master_ms = 500
rest = KafkaRest(config=config)

assert rest.serializer.registry_client
Expand Down Expand Up @@ -422,7 +422,7 @@ async def fixture_rest_async_registry_auth(
config.registry_port = registry.port
config.registry_user = "admin"
config.registry_password = "admin"
config.waiting_time_before_acting_as_master_ms = 300
config.waiting_time_before_acting_as_master_ms = 500
rest = KafkaRest(config=config)

try:
Expand Down Expand Up @@ -477,8 +477,10 @@ async def fixture_registry_async_pair(

config1 = Config()
config1.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config1.waiting_time_before_acting_as_master_ms = 500
config2 = Config()
config2.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config2.waiting_time_before_acting_as_master_ms = 500

async with start_schema_registry_cluster(
config_templates=[config1, config2],
Expand Down Expand Up @@ -507,6 +509,7 @@ async def fixture_registry_cluster(
return
config = Config()
config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config.waiting_time_before_acting_as_master_ms = 500

user_config = request.param.get("config", {}) if hasattr(request, "param") else {}
config.__dict__.update(user_config)
Expand Down Expand Up @@ -593,6 +596,7 @@ async def fixture_registry_https_endpoint(

config = Config()
config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config.waiting_time_before_acting_as_master_ms = 500
config.server_tls_certfile = server_cert
config.server_tls_keyfile = server_key

Expand Down Expand Up @@ -650,6 +654,7 @@ async def fixture_registry_http_auth_endpoint(

config = Config()
config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config.waiting_time_before_acting_as_master_ms = 500
config.registry_authfile = "tests/integration/config/karapace.auth.json"

async with start_schema_registry_cluster(
Expand Down Expand Up @@ -703,10 +708,12 @@ async def fixture_registry_async_auth_pair(

config1 = Config()
config1.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config1.waiting_time_before_acting_as_master_ms = 500
config1.registry_authfile = "tests/integration/config/karapace.auth.json"

config2 = Config()
config2.bootstrap_uri = kafka_servers.bootstrap_servers[0]
config2.waiting_time_before_acting_as_master_ms = 500
config2.registry_authfile = "tests/integration/config/karapace.auth.json"

async with start_schema_registry_cluster(
Expand Down
103 changes: 0 additions & 103 deletions tests/integration/test_master_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@
from karapace.typing import SchemaReaderStoppper
from tests.integration.utils.kafka_server import KafkaServers
from tests.integration.utils.network import allocate_port
from tests.integration.utils.rest_client import RetryRestClient
from tests.utils import new_random_name

import asyncio
import json
import pytest


Expand Down Expand Up @@ -189,104 +187,3 @@ async def test_no_eligible_master(kafka_servers: KafkaServers) -> None:
assert mc.schema_coordinator.master_url is None
finally:
await mc.close()


async def test_schema_request_forwarding(
registry_async_pair,
registry_async_retry_client: RetryRestClient,
) -> None:
master_url, slave_url = registry_async_pair
max_tries, counter = 5, 0
wait_time = 0.5
subject = new_random_name("subject")
schema = {"type": "string"}
other_schema = {"type": "int"}
# Config updates
for subj_path in [None, subject]:
if subj_path:
path = f"config/{subject}"
else:
path = "config"
for compat in ["FULL", "BACKWARD", "FORWARD", "NONE"]:
resp = await registry_async_retry_client.put(f"{slave_url}/{path}", json={"compatibility": compat})
assert resp.ok
while True:
assert counter < max_tries, "Compat update not propagated"
resp = await registry_async_retry_client.get(f"{master_url}/{path}")
if not resp.ok:
print(f"Invalid http status code: {resp.status_code}")
continue
data = resp.json()
if "compatibilityLevel" not in data:
print(f"Invalid response: {data}")
counter += 1
await asyncio.sleep(wait_time)
continue
if data["compatibilityLevel"] != compat:
print(f"Bad compatibility: {data}")
counter += 1
await asyncio.sleep(wait_time)
continue
break

# New schema updates, last compatibility is None
for s in [schema, other_schema]:
resp = await registry_async_retry_client.post(
f"{slave_url}/subjects/{subject}/versions", json={"schema": json.dumps(s)}
)
assert resp.ok
data = resp.json()
assert "id" in data, data
counter = 0
while True:
assert counter < max_tries, "Subject schema data not propagated yet"
resp = await registry_async_retry_client.get(f"{master_url}/subjects/{subject}/versions")
if not resp.ok:
print(f"Invalid http status code: {resp.status_code}")
counter += 1
continue
data = resp.json()
if not data:
print(f"No versions registered for subject {subject} yet")
counter += 1
continue
assert len(data) == 2, data
assert data[0] == 1, data
print("Subject schema data propagated")
break

# Schema deletions
resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}/versions/1")
assert resp.ok
counter = 0
while True:
assert counter < max_tries, "Subject version deletion not propagated yet"
resp = await registry_async_retry_client.get(
f"{master_url}/subjects/{subject}/versions/1", expected_response_code=404
)
if resp.ok:
print(f"Subject {subject} still has version 1 on master")
counter += 1
continue
assert resp.status_code == 404
print(f"Subject {subject} no longer has version 1")
break

# Subject deletion
resp = await registry_async_retry_client.get(f"{master_url}/subjects/")
assert resp.ok
data = resp.json()
assert subject in data
resp = await registry_async_retry_client.delete(f"{slave_url}/subjects/{subject}")
assert resp.ok
counter = 0
while True:
assert counter < max_tries, "Subject deletion not propagated yet"
resp = await registry_async_retry_client.get(f"{master_url}/subjects/")
if not resp.ok:
print("Could not retrieve subject list on master")
counter += 1
continue
data = resp.json()
assert subject not in data
break
Loading

0 comments on commit 6bf252a

Please sign in to comment.