Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds endpoints to debug what has been persisted #291

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 80 additions & 3 deletions burr/tracking/server/run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib
import os
from importlib.resources import files
from typing import Sequence
from typing import Optional, Sequence

from burr.integrations.base import require_plugin

Expand All @@ -13,7 +13,6 @@

from burr.tracking.server import backend as backend_module
from burr.tracking.server import schema
from burr.tracking.server.schema import ApplicationLogs

# dynamic importing due to the dashes (which make reading the examples on github easier)
email_assistant = importlib.import_module("burr.examples.email-assistant.server")
Expand Down Expand Up @@ -65,7 +64,9 @@ async def get_apps(request: Request, project_id: str) -> Sequence[schema.Applica


@app.get("/api/v0/{project_id}/{app_id}/apps")
async def get_application_logs(request: Request, project_id: str, app_id: str) -> ApplicationLogs:
async def get_application_logs(
request: Request, project_id: str, app_id: str
) -> schema.ApplicationLogs:
"""Lists steps for a given App.
TODO: add streaming capabilities for bi-directional communication
TODO: add pagination for quicker loading
Expand All @@ -78,6 +79,82 @@ async def get_application_logs(request: Request, project_id: str, app_id: str) -
return await backend.get_application_logs(request, project_id=project_id, app_id=app_id)


@app.post("/api/v0/application_ids", response_model=Optional[schema.ApplicationIDs])
def list_app_ids(
persister_type: str,
persister_kwargs: dict,
partition_key: str = None,
persister_method: str = "constructor",
) -> Optional[schema.ApplicationIDs]:
"""Lists all the app_ids for a given partition key using the provided persister.

This is meant for debugging use cases.

:param partition_key: the partition key to list app_ids for
:param persister_kwargs: the kwargs to pass to the persister class constructor
:param persister_type: the type of persister to use. This is a fully qualified name, e.g.burr.integrations.persisters.b_redis.RedisPersister
:param persister_method: the class method to use to create the persister. Defaults to "constructor".
:return: a list of app_ids
"""
persister = create_persister(persister_kwargs, persister_method, persister_type)
app_ids = persister.list_app_ids(partition_key)
if app_ids is None:
return None
return schema.ApplicationIDs(partition_key=partition_key, app_ids=app_ids)


def create_persister(persister_kwargs, persister_method, persister_type):
"""Given the persister type, imports the module and creates the class
e.g. burr.integrations.persisters.b_redis.RedisPersister
"""
module_name, class_name = persister_type.rsplit(".", 1)
module = importlib.import_module(module_name)
persister_class = getattr(module, class_name)
if persister_method == "constructor":
persister = persister_class(**persister_kwargs)
else:
persister = getattr(persister_class, persister_method)(**persister_kwargs)
return persister


@app.post(
"/api/v0/application_id/{application_id}", response_model=Optional[schema.PersistedStateData]
)
def load_app_step(
persister_type: str,
persister_kwargs: dict,
application_id: str,
partition_key: str = None,
sequence_id: int = None,
persister_method: str = "constructor",
) -> Optional[schema.PersistedStateData]:
"""Loads a persisted value for a given partition_key, application_id [, and sequence_id].

This is meant for debugging use cases.

:param persister_type: the type of persister to use. This is a fully qualified name, e.g.burr.integrations.persisters.b_redis.RedisPersister
:param persister_kwargs: the kwargs to pass to the persister class constructor
:param application_id: the application_id to load
:param partition_key: the partition key to list app_ids for
:param sequence_id: the sequence_id to load. Defaults to None. Gets last one.
:return: a list of app_ids
"""
persister = create_persister(persister_kwargs, persister_method, persister_type)
app_step = persister.load(partition_key, application_id, sequence_id)
if app_step is None:
return None
else:
return schema.PersistedStateData(
partition_key=app_step["partition_key"],
app_id=app_step["app_id"],
sequence_id=app_step["sequence_id"],
position=app_step["position"],
state=app_step["state"].serialize(),
created_at=app_step["created_at"],
status=app_step["status"],
)


@app.get("/api/v0/ready")
async def ready() -> bool:
return True
Expand Down
15 changes: 15 additions & 0 deletions burr/tracking/server/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,18 @@ class ApplicationLogs(pydantic.BaseModel):
steps: List[Step]
parent_pointer: Optional[PointerModel] = None
spawning_parent_pointer: Optional[PointerModel] = None


class ApplicationIDs(pydantic.BaseModel):
partition_key: str
app_ids: List[str]


class PersistedStateData(pydantic.BaseModel):
partition_key: Optional[str]
app_id: str
sequence_id: int
position: str
state: dict
created_at: str
status: str
Loading