Skip to content

Commit

Permalink
Clean up PrefectDBInterface, use the models consistently
Browse files Browse the repository at this point in the history
  • Loading branch information
mjpieters committed Dec 16, 2024
1 parent b42d3a1 commit 382d5d9
Show file tree
Hide file tree
Showing 111 changed files with 2,857 additions and 2,682 deletions.
9 changes: 8 additions & 1 deletion docs/v3/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -20136,7 +20136,14 @@
"title": "Key"
},
"type": {
"type": "string",
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Type"
},
"is_latest": {
Expand Down
6 changes: 3 additions & 3 deletions src/prefect/cli/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ async def stop():
@database_app.command()
async def reset(yes: bool = typer.Option(False, "--yes", "-y")):
"""Drop and recreate all Prefect database tables"""
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database import provide_database_interface

db = provide_database_interface()
engine = await db.engine()
Expand Down Expand Up @@ -378,8 +378,8 @@ async def upgrade(
),
):
"""Upgrade the Prefect database"""
from prefect.server.database import provide_database_interface
from prefect.server.database.alembic_commands import alembic_upgrade
from prefect.server.database.dependencies import provide_database_interface

db = provide_database_interface()
engine = await db.engine()
Expand Down Expand Up @@ -418,8 +418,8 @@ async def downgrade(
),
):
"""Downgrade the Prefect database"""
from prefect.server.database import provide_database_interface
from prefect.server.database.alembic_commands import alembic_downgrade
from prefect.server.database.dependencies import provide_database_interface

db = provide_database_interface()

Expand Down
9 changes: 9 additions & 0 deletions src/prefect/client/schemas/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,15 @@ def result(
) -> Union[R, Exception]:
...

@overload
def result(
self: "State[R]",
raise_on_failure: bool = ...,
fetch: bool = ...,
retry_result_failure: bool = ...,
) -> Union[R, Exception]:
...

@deprecated.deprecated_parameter(
"fetch",
when=lambda fetch: fetch is not True,
Expand Down
7 changes: 5 additions & 2 deletions src/prefect/events/filters.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional, Tuple, cast
from typing import TYPE_CHECKING, List, Optional, Tuple, cast
from uuid import UUID

import pendulum
Expand All @@ -10,6 +10,9 @@

from .schemas.events import Event, Resource, ResourceSpecification

if TYPE_CHECKING:
import sqlalchemy as sa


class AutomationFilterCreated(PrefectBaseModel):
"""Filter by `Automation.created`."""
Expand Down Expand Up @@ -41,7 +44,7 @@ class AutomationFilter(PrefectBaseModel):
class EventDataFilter(PrefectBaseModel, extra="forbid"): # type: ignore[call-arg]
"""A base class for filtering event data."""

_top_level_filter: Optional["EventFilter"] = PrivateAttr(None)
_top_level_filter: Optional["sa.Select[tuple[UUID]]"] = PrivateAttr(None)

def get_filters(self) -> List["EventDataFilter"]:
filters: List["EventDataFilter"] = [
Expand Down
2 changes: 2 additions & 0 deletions src/prefect/server/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from . import models, orchestration, schemas, services

__all__ = ["models", "orchestration", "schemas", "services"]
3 changes: 1 addition & 2 deletions src/prefect/server/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

import prefect
import prefect.settings
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/admin", tags=["Admin"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@

import prefect.server.api.dependencies as dependencies
from prefect.server import models
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.schemas import actions, core, filters, sorting
from prefect.server.utilities.server import PrefectRouter

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/automations.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
from prefect.server.api.validation import (
validate_job_variables_for_run_deployment_action,
)
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.events import actions
from prefect.server.events.filters import AutomationFilter, AutomationFilterCreated
from prefect.server.events.models import automations as automations_models
Expand Down
5 changes: 1 addition & 4 deletions src/prefect/server/api/block_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
from fastapi import Depends

from prefect.server import models
from prefect.server.database.dependencies import (
PrefectDBInterface,
provide_database_interface,
)
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/block_capabilities", tags=["Block capabilities"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/block_documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

from prefect.server import models, schemas
from prefect.server.api import dependencies
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/block_documents", tags=["Block documents"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/block_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

from prefect.server import models, schemas
from prefect.server.api import dependencies
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.models.block_schemas import MissingBlockTypeException
from prefect.server.utilities.server import PrefectRouter

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/block_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from prefect.blocks.core import _should_update_block_type
from prefect.server import models, schemas
from prefect.server.api import dependencies
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/block_types", tags=["Block types"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/concurrency_limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.api.concurrency_limits_v2 import MinimalConcurrencyLimitResponse
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.models import concurrency_limits
from prefect.server.utilities.server import PrefectRouter
from prefect.settings import PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/concurrency_limits_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.api.dependencies import LimitBody
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.schemas import actions
from prefect.server.utilities.schemas import PrefectBaseModel
from prefect.server.utilities.server import PrefectRouter
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/csrf_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

from prefect.logging import get_logger
from prefect.server import models, schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter
from prefect.settings import PREFECT_SERVER_CSRF_PROTECTION_ENABLED

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
validate_job_variables_for_deployment_flow_run,
)
from prefect.server.api.workers import WorkerLookups
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.exceptions import MissingVariableError, ObjectNotFoundError
from prefect.server.models.deployments import mark_deployments_ready
from prefect.server.models.workers import DEFAULT_AGENT_WORK_POOL_NAME
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@

from prefect.logging import get_logger
from prefect.server.api.dependencies import is_ephemeral_request
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.events import messaging, stream
from prefect.server.events.counting import (
Countable,
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/flow_run_notification_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import prefect.server.api.dependencies as dependencies
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/flow_run_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/flow_run_states", tags=["Flow Run States"])
Expand Down
4 changes: 2 additions & 2 deletions src/prefect/server/api/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
from prefect.logging import get_logger
from prefect.server.api.run_history import run_history
from prefect.server.api.validation import validate_job_variables_for_deployment_flow_run
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.exceptions import FlowRunGraphTooLarge
from prefect.server.models.flow_runs import (
DependencyResult,
Expand Down Expand Up @@ -215,6 +214,7 @@ async def average_flow_run_lateness(
base_query = db.FlowRun.estimated_start_time_delta

query = await models.flow_runs._apply_flow_run_filters(
db,
sa.select(sa.func.avg(base_query)),
flow_filter=flows,
flow_run_filter=flow_runs,
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import prefect.server.api.dependencies as dependencies
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.schemas.responses import FlowPaginationResponse
from prefect.server.utilities.server import PrefectRouter

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import prefect.server.api.dependencies as dependencies
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/logs", tags=["Logs"])
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/server/api/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from prefect import settings
from prefect.server import models
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database import provide_database_interface

NextMiddlewareFunction = Callable[[Request], Awaitable[Response]]

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from fastapi import Depends, status
from fastapi.responses import JSONResponse

from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="", tags=["Root"])
Expand Down
18 changes: 9 additions & 9 deletions src/prefect/server/api/run_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.logging import get_logger
from prefect.server.database.dependencies import db_injector
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, db_injector
from prefect.types import DateTime

logger = get_logger("server.api")
Expand Down Expand Up @@ -57,7 +56,7 @@ async def run_history(
)

# create a CTE for timestamp intervals
intervals = db.make_timestamp_intervals(
intervals = db.queries.make_timestamp_intervals(
history_start,
history_end,
history_interval,
Expand All @@ -66,6 +65,7 @@ async def run_history(
# apply filters to the flow runs (and related states)
runs = (
await run_filter_function(
db,
sa.select(
run_model.id,
run_model.expected_start_time,
Expand All @@ -92,7 +92,7 @@ async def run_history(
# build a JSON object, ignoring the case where the count of runs is 0
sa.case(
(sa.func.count(runs.c.id) == 0, None),
else_=db.build_json_object(
else_=db.queries.build_json_object(
"state_type",
runs.c.state_type,
"state_name",
Expand Down Expand Up @@ -140,10 +140,10 @@ async def run_history(
counts.c.interval_start,
counts.c.interval_end,
sa.func.coalesce(
db.json_arr_agg(db.cast_to_json(counts.c.state_agg)).filter(
counts.c.state_agg.is_not(None)
),
sa.text("'[]'"),
db.queries.json_arr_agg(
db.queries.cast_to_json(counts.c.state_agg)
).filter(counts.c.state_agg.is_not(None)),
sa.literal("[]", literal_execute=True),
).label("states"),
)
.group_by(counts.c.interval_start, counts.c.interval_end)
Expand All @@ -157,7 +157,7 @@ async def run_history(
records = result.mappings()

# load and parse the record if the database returns JSON as strings
if db.uses_json_strings:
if db.queries.uses_json_strings:
records = [dict(r) for r in records]
for r in records:
r["states"] = json.loads(r["states"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/saved_searches.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
import prefect.server.api.dependencies as dependencies
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/saved_searches", tags=["SavedSearches"])
Expand Down
4 changes: 2 additions & 2 deletions src/prefect/server/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ def create_app(
async def run_migrations():
"""Ensure the database is created and up to date with the current migrations"""
if prefect.settings.PREFECT_API_DATABASE_MIGRATE_ON_START:
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database import provide_database_interface

db = provide_database_interface()
await db.create_db()
Expand All @@ -514,7 +514,7 @@ async def add_block_types():
if not prefect.settings.PREFECT_API_BLOCKS_REGISTER_ON_START:
return

from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database import provide_database_interface
from prefect.server.models.block_registration import run_block_auto_registration

db = provide_database_interface()
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/task_run_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/task_run_states", tags=["Task Run States"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/task_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
import prefect.server.schemas as schemas
from prefect.logging import get_logger
from prefect.server.api.run_history import run_history
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.orchestration import dependencies as orchestration_dependencies
from prefect.server.orchestration.core_policy import CoreTaskPolicy
from prefect.server.orchestration.policies import BaseOrchestrationPolicy
Expand Down
Loading

0 comments on commit 382d5d9

Please sign in to comment.