From 2629dcb35eaa7d3f82800aa11b17629a5010acf5 Mon Sep 17 00:00:00 2001 From: Alex Streed Date: Wed, 11 Dec 2024 10:46:43 -0600 Subject: [PATCH 1/3] Add `Block.aload` method and remove `@sync_compatible` from `Block.load` --- .../_internal/compatibility/async_dispatch.py | 45 ++++- src/prefect/blocks/core.py | 159 ++++++++++++++++-- src/prefect/client/orchestration.py | 38 +++++ src/prefect/client/utilities.py | 16 +- .../compatibility/test_async_dispatch.py | 122 +++++++++++++- tests/blocks/test_core.py | 80 ++++++++- 6 files changed, 427 insertions(+), 33 deletions(-) diff --git a/src/prefect/_internal/compatibility/async_dispatch.py b/src/prefect/_internal/compatibility/async_dispatch.py index 12422d696d0e..bc0ada514537 100644 --- a/src/prefect/_internal/compatibility/async_dispatch.py +++ b/src/prefect/_internal/compatibility/async_dispatch.py @@ -1,11 +1,12 @@ import asyncio import inspect from functools import wraps -from typing import Any, Callable, Coroutine, Optional, TypeVar, Union +from typing import TYPE_CHECKING, Any, Callable, Coroutine, Optional, TypeVar, Union from typing_extensions import ParamSpec -from prefect.tasks import Task +if TYPE_CHECKING: + from prefect.tasks import Task R = TypeVar("R") P = ParamSpec("P") @@ -13,20 +14,44 @@ def is_in_async_context() -> bool: """ - Returns True if called from within an async context (coroutine or running event loop) + Returns True if called from within an async context. + + An async context is one of: + - a coroutine + - a running event loop + - a task or flow that is async """ + from prefect.context import get_run_context + from prefect.exceptions import MissingContextError + try: - asyncio.get_running_loop() - return True - except RuntimeError: - return False + run_ctx = get_run_context() + parent_obj = getattr(run_ctx, "task", None) + if not parent_obj: + parent_obj = getattr(run_ctx, "flow", None) + return getattr(parent_obj, "isasync", True) + except MissingContextError: + # not in an execution context, make best effort to + # decide whether to syncify + try: + asyncio.get_running_loop() + return True + except RuntimeError: + return False -def _is_acceptable_callable(obj: Union[Callable, Task]) -> bool: +def _is_acceptable_callable(obj: Union[Callable, "Task", classmethod]) -> bool: if inspect.iscoroutinefunction(obj): return True - if isinstance(obj, Task) and inspect.iscoroutinefunction(obj.fn): + + # Check if a task or flow. Need to avoid importing `Task` or `Flow` here + # due to circular imports. + if (fn := getattr(obj, "fn", None)) and inspect.iscoroutinefunction(fn): return True + + if isinstance(obj, classmethod) and inspect.iscoroutinefunction(obj.__func__): + return True + return False @@ -56,6 +81,8 @@ def wrapper( if should_run_sync: return sync_fn(*args, **kwargs) + if isinstance(async_impl, classmethod): + return async_impl.__func__(*args, **kwargs) return async_impl(*args, **kwargs) return wrapper # type: ignore diff --git a/src/prefect/blocks/core.py b/src/prefect/blocks/core.py index cdd403bb767c..2873a0c45393 100644 --- a/src/prefect/blocks/core.py +++ b/src/prefect/blocks/core.py @@ -41,6 +41,7 @@ from typing_extensions import Literal, ParamSpec, Self, get_args import prefect.exceptions +from prefect._internal.compatibility.async_dispatch import async_dispatch from prefect.client.schemas import ( DEFAULT_BLOCK_SCHEMA_VERSION, BlockDocument, @@ -53,7 +54,7 @@ from prefect.logging.loggers import disable_logger from prefect.plugins import load_prefect_collections from prefect.types import SecretDict -from prefect.utilities.asyncutils import sync_compatible +from prefect.utilities.asyncutils import run_coro_as_sync, sync_compatible from prefect.utilities.collections import listrepr, remove_nested_keys, visit_collection from prefect.utilities.dispatch import lookup_type, register_base_type from prefect.utilities.hashing import hash_objects @@ -64,7 +65,7 @@ if TYPE_CHECKING: from pydantic.main import IncEx - from prefect.client.orchestration import PrefectClient + from prefect.client.orchestration import PrefectClient, SyncPrefectClient R = TypeVar("R") P = ParamSpec("P") @@ -777,12 +778,13 @@ def _define_metadata_on_nested_blocks( ) @classmethod - @inject_client - async def _get_block_document( + async def _aget_block_document( cls, name: str, - client: Optional["PrefectClient"] = None, - ): + client: "PrefectClient", + ) -> tuple[BlockDocument, str]: + if TYPE_CHECKING: + assert isinstance(client, PrefectClient) if cls.__name__ == "Block": block_type_slug, block_document_name = name.split("/", 1) else: @@ -801,6 +803,32 @@ async def _get_block_document( return block_document, block_document_name + @classmethod + def _get_block_document( + cls, + name: str, + client: "SyncPrefectClient", + ) -> tuple[BlockDocument, str]: + if TYPE_CHECKING: + assert isinstance(client, PrefectClient) + if cls.__name__ == "Block": + block_type_slug, block_document_name = name.split("/", 1) + else: + block_type_slug = cls.get_block_type_slug() + block_document_name = name + + try: + block_document = client.read_block_document_by_name( + name=block_document_name, block_type_slug=block_type_slug + ) + except prefect.exceptions.ObjectNotFound as e: + raise ValueError( + f"Unable to find block document named {block_document_name} for block" + f" type {block_type_slug}" + ) from e + + return block_document, block_document_name + @classmethod @sync_compatible @inject_client @@ -829,9 +857,97 @@ async def _get_block_document_by_id( return block_document, block_document.name @classmethod - @sync_compatible @inject_client - async def load( + async def aload( + cls, + name: str, + validate: bool = True, + client: Optional["PrefectClient"] = None, + ) -> "Self": + """ + Retrieves data from the block document with the given name for the block type + that corresponds with the current class and returns an instantiated version of + the current class with the data stored in the block document. + + If a block document for a given block type is saved with a different schema + than the current class calling `aload`, a warning will be raised. + + If the current class schema is a subset of the block document schema, the block + can be loaded as normal using the default `validate = True`. + + If the current class schema is a superset of the block document schema, `aload` + must be called with `validate` set to False to prevent a validation error. In + this case, the block attributes will default to `None` and must be set manually + and saved to a new block document before the block can be used as expected. + + Args: + name: The name or slug of the block document. A block document slug is a + string with the format / + validate: If False, the block document will be loaded without Pydantic + validating the block schema. This is useful if the block schema has + changed client-side since the block document referred to by `name` was saved. + client: The client to use to load the block document. If not provided, the + default client will be injected. + + Raises: + ValueError: If the requested block document is not found. + + Returns: + An instance of the current class hydrated with the data stored in the + block document with the specified name. + + Examples: + Load from a Block subclass with a block document name: + ```python + class Custom(Block): + message: str + + Custom(message="Hello!").save("my-custom-message") + + loaded_block = await Custom.aload("my-custom-message") + ``` + + Load from Block with a block document slug: + ```python + class Custom(Block): + message: str + + Custom(message="Hello!").save("my-custom-message") + + loaded_block = await Block.aload("custom/my-custom-message") + ``` + + Migrate a block document to a new schema: + ```python + # original class + class Custom(Block): + message: str + + Custom(message="Hello!").save("my-custom-message") + + # Updated class with new required field + class Custom(Block): + message: str + number_of_ducks: int + + loaded_block = await Custom.aload("my-custom-message", validate=False) + + # Prints UserWarning about schema mismatch + + loaded_block.number_of_ducks = 42 + + loaded_block.save("my-custom-message", overwrite=True) + ``` + """ + if TYPE_CHECKING: + assert isinstance(client, PrefectClient) + block_document, _ = await cls._aget_block_document(name, client=client) + + return cls._load_from_block_document(block_document, validate=validate) + + @classmethod + @async_dispatch(aload) + def load( cls, name: str, validate: bool = True, @@ -912,9 +1028,18 @@ class Custom(Block): loaded_block.save("my-custom-message", overwrite=True) ``` """ - block_document, block_document_name = await cls._get_block_document( - name, client=client - ) + # Need to use a `PrefectClient` here to ensure `Block.load` and `Block.aload` signatures match + # TODO: replace with only sync client once all internal calls are updated to use `Block.aload` and `@async_dispatch` is removed + if client is None: + # If a client wasn't provided, we get to use a sync client + from prefect.client.orchestration import get_client + + sync_client = get_client(sync_client=True) + block_document, _ = cls._get_block_document(name, client=sync_client) + else: + block_document, _ = run_coro_as_sync( + cls._aget_block_document(name, client=client) + ) return cls._load_from_block_document(block_document, validate=validate) @@ -968,14 +1093,16 @@ async def load_from_ref( """ block_document = None if isinstance(ref, (str, UUID)): - block_document, _ = await cls._get_block_document_by_id(ref) + block_document, _ = await cls._get_block_document_by_id(ref, client=client) elif isinstance(ref, dict): if block_document_id := ref.get("block_document_id"): block_document, _ = await cls._get_block_document_by_id( - block_document_id + block_document_id, client=client ) elif block_document_slug := ref.get("block_document_slug"): - block_document, _ = await cls._get_block_document(block_document_slug) + block_document, _ = await cls._get_block_document( + block_document_slug, client=client + ) if not block_document: raise ValueError(f"Invalid reference format {ref!r}.") @@ -1220,7 +1347,9 @@ async def delete( name: str, client: Optional["PrefectClient"] = None, ): - block_document, block_document_name = await cls._get_block_document(name) + if TYPE_CHECKING: + assert isinstance(client, PrefectClient) + block_document, _ = await cls._aget_block_document(name, client=client) await client.delete_block_document(block_document.id) diff --git a/src/prefect/client/orchestration.py b/src/prefect/client/orchestration.py index 7afaf385b6c8..13ece0c15463 100644 --- a/src/prefect/client/orchestration.py +++ b/src/prefect/client/orchestration.py @@ -4415,3 +4415,41 @@ def update_flow_run_labels( json=labels, ) response.raise_for_status() + + def read_block_document_by_name( + self, + name: str, + block_type_slug: str, + include_secrets: bool = True, + ) -> BlockDocument: + """ + Read the block document with the specified name that corresponds to a + specific block type name. + + Args: + name: The block document name. + block_type_slug: The block type slug. + include_secrets (bool): whether to include secret values + on the Block, corresponding to Pydantic's `SecretStr` and + `SecretBytes` fields. These fields are automatically obfuscated + by Pydantic, but users can additionally choose not to receive + their values from the API. Note that any business logic on the + Block may not work if this is `False`. + + Raises: + httpx.RequestError: if the block document was not found for any reason + + Returns: + A block document or None. + """ + try: + response = self._client.get( + f"/block_types/slug/{block_type_slug}/block_documents/name/{name}", + params=dict(include_secrets=include_secrets), + ) + except httpx.HTTPStatusError as e: + if e.response.status_code == status.HTTP_404_NOT_FOUND: + raise prefect.exceptions.ObjectNotFound(http_exc=e) from e + else: + raise + return BlockDocument.model_validate(response.json()) diff --git a/src/prefect/client/utilities.py b/src/prefect/client/utilities.py index 4622a7d6fe32..75bbd24b5d14 100644 --- a/src/prefect/client/utilities.py +++ b/src/prefect/client/utilities.py @@ -7,7 +7,7 @@ from collections.abc import Awaitable, Coroutine from functools import wraps -from typing import TYPE_CHECKING, Any, Callable, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, Optional, Union, overload from typing_extensions import Concatenate, ParamSpec, TypeGuard, TypeVar @@ -71,9 +71,23 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: return wrapper +@overload def inject_client( fn: Callable[P, Coroutine[Any, Any, R]], ) -> Callable[P, Coroutine[Any, Any, R]]: + ... + + +@overload +def inject_client( + fn: Callable[P, R], +) -> Callable[P, R]: + ... + + +def inject_client( + fn: Callable[P, Union[Coroutine[Any, Any, R], R]], +) -> Callable[P, Union[Coroutine[Any, Any, R], R]]: """ Simple helper to provide a context managed client to an asynchronous function. diff --git a/tests/_internal/compatibility/test_async_dispatch.py b/tests/_internal/compatibility/test_async_dispatch.py index ea7b10c4387b..c2427a728d3e 100644 --- a/tests/_internal/compatibility/test_async_dispatch.py +++ b/tests/_internal/compatibility/test_async_dispatch.py @@ -7,7 +7,9 @@ async_dispatch, is_in_async_context, ) -from prefect.utilities.asyncutils import run_sync_in_worker_thread +from prefect.flows import flow +from prefect.tasks import task +from prefect.utilities.asyncutils import run_coro_as_sync, run_sync_in_worker_thread class TestAsyncDispatchBasicUsage: @@ -58,6 +60,38 @@ def my_function() -> None: await my_function(_sync=False) assert data == ["async"] + async def test_works_with_classmethods(self): + """Verify that async_impl can be a classmethod""" + + class MyClass: + @classmethod + async def my_amethod(cls) -> str: + return "async" + + @classmethod + @async_dispatch(my_amethod) + def my_method(cls) -> str: + return "sync" + + assert await MyClass.my_amethod() == "async" + assert MyClass.my_method(_sync=True) == "sync" + assert await MyClass.my_method() == "async" + + def test_works_with_classmethods_in_sync_context(self): + """Verify that classmethods work in sync context""" + + class MyClass: + @classmethod + async def my_amethod(cls) -> str: + return "async" + + @classmethod + @async_dispatch(my_amethod) + def my_method(cls) -> str: + return "sync" + + assert MyClass.my_method() == "sync" + class TestAsyncDispatchValidation: def test_async_compatible_requires_async_implementation(self): @@ -182,3 +216,89 @@ def check_context() -> None: assert ( is_in_async_context() is False ), "the loop should be closed and not considered an async context" + + +class TestIsInARunContext: + def test_dispatches_to_async_in_async_flow(self): + """ + Verify that async_dispatch dispatches to async in async flow + + The test function is sync, but the flow is async, so we should dispatch to + the async implementation. + """ + + async def my_afunction() -> str: + return "async" + + @async_dispatch(my_afunction) + def my_function() -> str: + return "sync" + + @flow + async def my_flow() -> str: + return await my_function() + + assert run_coro_as_sync(my_flow()) == "async" + + async def test_dispatches_to_sync_in_sync_flow(self): + """ + Verify that async_dispatch dispatches to sync in sync flow + + The test function is async, but the flow is sync, so we should dispatch to + the sync implementation. + """ + + async def my_afunction() -> str: + return "async" + + @async_dispatch(my_afunction) + def my_function() -> str: + return "sync" + + @flow + def my_flow() -> str: + return my_function() + + assert my_flow() == "sync" + + def test_dispatches_to_async_in_an_async_task(self): + """ + Verify that async_dispatch dispatches to async in an async task + + The test function is sync, but the task is async, so we should dispatch to + the async implementation. + """ + + async def my_afunction() -> str: + return "async" + + @async_dispatch(my_afunction) + def my_function() -> str: + return "sync" + + @task + async def my_task() -> str: + return await my_function() + + assert run_coro_as_sync(my_task()) == "async" + + async def test_dispatches_to_sync_in_a_sync_task(self): + """ + Verify that async_dispatch dispatches to sync in a sync task + + The test function is async, but the task is sync, so we should dispatch to + the sync implementation. + """ + + async def my_afunction() -> str: + return "async" + + @async_dispatch(my_afunction) + def my_function() -> str: + return "sync" + + @task + def my_task() -> str: + return my_function() + + assert my_task() == "sync" diff --git a/tests/blocks/test_core.py b/tests/blocks/test_core.py index 80a7e2dd75ab..d70c6d12cc16 100644 --- a/tests/blocks/test_core.py +++ b/tests/blocks/test_core.py @@ -887,8 +887,8 @@ class ParentBlock(Block): async def test_block_load( self, test_block, block_document, in_memory_prefect_client ): - my_block = await test_block.load( - block_document.name, client=in_memory_prefect_client + my_block = test_block.load( + block_document.name, client=in_memory_prefect_client, _sync=True ) assert my_block._block_document_name == block_document.name @@ -897,6 +897,16 @@ async def test_block_load( assert my_block._block_schema_id == block_document.block_schema_id assert my_block.foo == "bar" + my_aloaded_block = await test_block.aload( + block_document.name, client=in_memory_prefect_client + ) + + assert my_aloaded_block._block_document_name == block_document.name + assert my_aloaded_block._block_document_id == block_document.id + assert my_aloaded_block._block_type_id == block_document.block_type_id + assert my_aloaded_block._block_schema_id == block_document.block_schema_id + assert my_aloaded_block.foo == "bar" + @patch("prefect.blocks.core.load_prefect_collections") async def test_block_load_loads_collections( self, @@ -905,7 +915,16 @@ async def test_block_load_loads_collections( block_document: BlockDocument, in_memory_prefect_client, ): - await Block.load( + Block.load( + block_document.block_type.slug + "/" + block_document.name, + client=in_memory_prefect_client, + _sync=True, + ) + mock_load_prefect_collections.assert_called_once() + + mock_load_prefect_collections.reset_mock() + + await Block.aload( block_document.block_type.slug + "/" + block_document.name, client=in_memory_prefect_client, ) @@ -918,9 +937,12 @@ class Custom(Block): my_custom_block = Custom(message="hello") await my_custom_block.save("my-custom-block") - loaded_block = await Block.load("custom/my-custom-block") + loaded_block = Block.load("custom/my-custom-block", _sync=True) assert loaded_block.message == "hello" + aloaded_block = await Block.aload("custom/my-custom-block") + assert aloaded_block.message == "hello" + async def test_load_nested_block(self, session, in_memory_prefect_client): class B(Block): _block_schema_type = "abc" @@ -1018,8 +1040,8 @@ class E(Block): await session.commit() - block_instance = await E.load( - "outer-block-document", client=in_memory_prefect_client + block_instance = E.load( + "outer-block-document", client=in_memory_prefect_client, _sync=True ) assert isinstance(block_instance, E) assert isinstance(block_instance.c, C) @@ -1051,12 +1073,56 @@ class E(Block): "block_type_slug": "d", } + aloaded_block_instance = await E.aload( + "outer-block-document", client=in_memory_prefect_client + ) + assert isinstance(aloaded_block_instance, E) + assert isinstance(aloaded_block_instance.c, C) + assert isinstance(aloaded_block_instance.d, D) + + assert aloaded_block_instance._block_document_name == outer_block_document.name + assert aloaded_block_instance._block_document_id == outer_block_document.id + assert ( + aloaded_block_instance._block_type_id == outer_block_document.block_type_id + ) + assert ( + aloaded_block_instance._block_schema_id + == outer_block_document.block_schema_id + ) + assert aloaded_block_instance.c.model_dump() == { + "y": 2, + "_block_document_id": middle_block_document_1.id, + "_block_document_name": "middle-block-document-1", + "_is_anonymous": False, + "block_type_slug": "c", + } + assert aloaded_block_instance.d.model_dump() == { + "b": { + "x": 1, + "_block_document_id": inner_block_document.id, + "_block_document_name": "inner-block-document", + "_is_anonymous": False, + "block_type_slug": "b", + }, + "z": "ztop", + "_block_document_id": middle_block_document_2.id, + "_block_document_name": "middle-block-document-2", + "_is_anonymous": False, + "block_type_slug": "d", + } + async def test_create_block_from_nonexistent_name(self, test_block): with pytest.raises( ValueError, match="Unable to find block document named blocky for block type x", ): - await test_block.load("blocky") + test_block.load("blocky", _sync=True) + + with pytest.raises( + ValueError, + match="Unable to find block document named blocky for block type x", + ): + await test_block.aload("blocky") async def test_save_block_from_flow(self): class Test(Block): From 02b47f52f877adb1b0cbd2fde44adf450da76712 Mon Sep 17 00:00:00 2001 From: Alex Streed Date: Wed, 11 Dec 2024 10:55:58 -0600 Subject: [PATCH 2/3] little bit of clean up --- src/prefect/blocks/core.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/prefect/blocks/core.py b/src/prefect/blocks/core.py index 2873a0c45393..2a57b3149f8a 100644 --- a/src/prefect/blocks/core.py +++ b/src/prefect/blocks/core.py @@ -783,8 +783,6 @@ async def _aget_block_document( name: str, client: "PrefectClient", ) -> tuple[BlockDocument, str]: - if TYPE_CHECKING: - assert isinstance(client, PrefectClient) if cls.__name__ == "Block": block_type_slug, block_document_name = name.split("/", 1) else: @@ -809,8 +807,6 @@ def _get_block_document( name: str, client: "SyncPrefectClient", ) -> tuple[BlockDocument, str]: - if TYPE_CHECKING: - assert isinstance(client, PrefectClient) if cls.__name__ == "Block": block_type_slug, block_document_name = name.split("/", 1) else: @@ -1034,9 +1030,10 @@ class Custom(Block): # If a client wasn't provided, we get to use a sync client from prefect.client.orchestration import get_client - sync_client = get_client(sync_client=True) - block_document, _ = cls._get_block_document(name, client=sync_client) + with get_client(sync_client=True) as sync_client: + block_document, _ = cls._get_block_document(name, client=sync_client) else: + # If a client was provided, reuse it, even though it's async, to avoid excessive client creation block_document, _ = run_coro_as_sync( cls._aget_block_document(name, client=client) ) From 972c8bcd18d64c4e458f7f82ccce6f9dbacc7ac9 Mon Sep 17 00:00:00 2001 From: Alex Streed Date: Wed, 11 Dec 2024 11:24:37 -0600 Subject: [PATCH 3/3] Use `Block.aload` internally --- src/prefect/cli/_prompts.py | 2 +- src/prefect/cli/deploy.py | 2 +- src/prefect/deployments/steps/pull.py | 2 +- src/prefect/results.py | 4 ++-- src/prefect/testing/utilities.py | 4 ++-- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/prefect/cli/_prompts.py b/src/prefect/cli/_prompts.py index 1d18507eb5a0..9fc542d5ac68 100644 --- a/src/prefect/cli/_prompts.py +++ b/src/prefect/cli/_prompts.py @@ -503,7 +503,7 @@ async def prompt_push_custom_docker_image( docker_registry_creds_name = f"deployment-{slugify(deployment_config['name'])}-{slugify(deployment_config['work_pool']['name'])}-registry-creds" create_new_block = False try: - await credentials_block.load(docker_registry_creds_name) + await credentials_block.aload(docker_registry_creds_name) if not confirm( ( "Would you like to use the existing Docker registry credentials" diff --git a/src/prefect/cli/deploy.py b/src/prefect/cli/deploy.py index 63d9e079e8df..0c5ec0851855 100644 --- a/src/prefect/cli/deploy.py +++ b/src/prefect/cli/deploy.py @@ -1028,7 +1028,7 @@ async def _generate_git_clone_pull_step( ) try: - await Secret.load(token_secret_block_name) + await Secret.aload(token_secret_block_name) if not confirm( ( "We found an existing token saved for this deployment. Would" diff --git a/src/prefect/deployments/steps/pull.py b/src/prefect/deployments/steps/pull.py index 8f2a82f54cb9..69ea5d54b8c8 100644 --- a/src/prefect/deployments/steps/pull.py +++ b/src/prefect/deployments/steps/pull.py @@ -190,7 +190,7 @@ async def pull_with_block(block_document_name: str, block_type_slug: str): full_slug = f"{block_type_slug}/{block_document_name}" try: - block = await Block.load(full_slug) + block = await Block.aload(full_slug) except Exception: deployment_logger.exception("Unable to load block '%s'", full_slug) raise diff --git a/src/prefect/results.py b/src/prefect/results.py index dd17f614953d..96569ea1f5b3 100644 --- a/src/prefect/results.py +++ b/src/prefect/results.py @@ -129,7 +129,7 @@ async def resolve_result_storage( elif isinstance(result_storage, Path): storage_block = LocalFileSystem(basepath=str(result_storage)) elif isinstance(result_storage, str): - storage_block = await Block.load(result_storage, client=client) + storage_block = await Block.aload(result_storage, client=client) storage_block_id = storage_block._block_document_id assert storage_block_id is not None, "Loaded storage blocks must have ids" elif isinstance(result_storage, UUID): @@ -168,7 +168,7 @@ async def get_or_create_default_task_scheduling_storage() -> ResultStorage: default_block = settings.tasks.scheduling.default_storage_block if default_block is not None: - return await Block.load(default_block) + return await Block.aload(default_block) # otherwise, use the local file system basepath = settings.results.local_storage_path diff --git a/src/prefect/testing/utilities.py b/src/prefect/testing/utilities.py index d73cb66f0adf..068ecb3a8a5a 100644 --- a/src/prefect/testing/utilities.py +++ b/src/prefect/testing/utilities.py @@ -249,7 +249,7 @@ async def assert_uses_result_storage( ( storage if isinstance(storage, Block) - else await Block.load(storage, client=client) + else await Block.aload(storage, client=client) ), ) else: @@ -260,7 +260,7 @@ async def assert_uses_result_storage( ( storage if isinstance(storage, Block) - else await Block.load(storage, client=client) + else await Block.aload(storage, client=client) ), )