Skip to content

Commit

Permalink
Clean up PrefectDBInterface, use the models consistently
Browse files Browse the repository at this point in the history
  • Loading branch information
mjpieters committed Dec 18, 2024
1 parent 81a122a commit d90d102
Show file tree
Hide file tree
Showing 111 changed files with 2,918 additions and 2,723 deletions.
22 changes: 18 additions & 4 deletions docs/v3/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@
"type": "string",
"format": "date-time",
"description": "Only include runs that start or end after this time.",
"default": "0001-01-01T00:00:00",
"default": "0001-01-01T00:00:00+00:00",
"title": "Since"
},
"description": "Only include runs that start or end after this time."
Expand Down Expand Up @@ -20044,8 +20044,15 @@
"Graph": {
"properties": {
"start_time": {
"type": "string",
"format": "date-time",
"anyOf": [
{
"type": "string",
"format": "date-time"
},
{
"type": "null"
}
],
"title": "Start Time"
},
"end_time": {
Expand Down Expand Up @@ -20136,7 +20143,14 @@
"title": "Key"
},
"type": {
"type": "string",
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Type"
},
"is_latest": {
Expand Down
6 changes: 3 additions & 3 deletions src/prefect/cli/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ async def stop():
@database_app.command()
async def reset(yes: bool = typer.Option(False, "--yes", "-y")):
"""Drop and recreate all Prefect database tables"""
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database import provide_database_interface

db = provide_database_interface()
engine = await db.engine()
Expand Down Expand Up @@ -378,8 +378,8 @@ async def upgrade(
),
):
"""Upgrade the Prefect database"""
from prefect.server.database import provide_database_interface
from prefect.server.database.alembic_commands import alembic_upgrade
from prefect.server.database.dependencies import provide_database_interface

db = provide_database_interface()
engine = await db.engine()
Expand Down Expand Up @@ -418,8 +418,8 @@ async def downgrade(
),
):
"""Downgrade the Prefect database"""
from prefect.server.database import provide_database_interface
from prefect.server.database.alembic_commands import alembic_downgrade
from prefect.server.database.dependencies import provide_database_interface

db = provide_database_interface()

Expand Down
9 changes: 9 additions & 0 deletions src/prefect/client/schemas/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,15 @@ def result(
) -> Union[R, Exception]:
...

@overload
def result(
self: "State[R]",
raise_on_failure: bool = ...,
fetch: bool = ...,
retry_result_failure: bool = ...,
) -> Union[R, Exception]:
...

@deprecated.deprecated_parameter(
"fetch",
when=lambda fetch: fetch is not True,
Expand Down
10 changes: 2 additions & 8 deletions src/prefect/events/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from uuid import UUID

import pendulum
from pydantic import Field, PrivateAttr
from pydantic import Field

from prefect._internal.schemas.bases import PrefectBaseModel
from prefect.types import DateTime
Expand Down Expand Up @@ -41,18 +41,12 @@ class AutomationFilter(PrefectBaseModel):
class EventDataFilter(PrefectBaseModel, extra="forbid"): # type: ignore[call-arg]
"""A base class for filtering event data."""

_top_level_filter: Optional["EventFilter"] = PrivateAttr(None)

def get_filters(self) -> List["EventDataFilter"]:
filters: List["EventDataFilter"] = [
filter
for filter in [
getattr(self, name) for name, field in self.model_fields.items()
]
for filter in [getattr(self, name) for name in self.model_fields]
if isinstance(filter, EventDataFilter)
]
for filter in filters:
filter._top_level_filter = self._top_level_filter
return filters

def includes(self, event: Event) -> bool:
Expand Down
2 changes: 2 additions & 0 deletions src/prefect/server/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from . import models, orchestration, schemas, services

__all__ = ["models", "orchestration", "schemas", "services"]
3 changes: 1 addition & 2 deletions src/prefect/server/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

import prefect
import prefect.settings
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/admin", tags=["Admin"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@

import prefect.server.api.dependencies as dependencies
from prefect.server import models
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.schemas import actions, core, filters, sorting
from prefect.server.utilities.server import PrefectRouter

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/automations.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
from prefect.server.api.validation import (
validate_job_variables_for_run_deployment_action,
)
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.events import actions
from prefect.server.events.filters import AutomationFilter, AutomationFilterCreated
from prefect.server.events.models import automations as automations_models
Expand Down
5 changes: 1 addition & 4 deletions src/prefect/server/api/block_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
from fastapi import Depends

from prefect.server import models
from prefect.server.database.dependencies import (
PrefectDBInterface,
provide_database_interface,
)
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/block_capabilities", tags=["Block capabilities"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/block_documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

from prefect.server import models, schemas
from prefect.server.api import dependencies
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/block_documents", tags=["Block documents"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/block_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

from prefect.server import models, schemas
from prefect.server.api import dependencies
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.models.block_schemas import MissingBlockTypeException
from prefect.server.utilities.server import PrefectRouter

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/block_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from prefect.blocks.core import _should_update_block_type
from prefect.server import models, schemas
from prefect.server.api import dependencies
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/block_types", tags=["Block types"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/concurrency_limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.api.concurrency_limits_v2 import MinimalConcurrencyLimitResponse
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.models import concurrency_limits
from prefect.server.utilities.server import PrefectRouter
from prefect.settings import PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/concurrency_limits_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.api.dependencies import LimitBody
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.schemas import actions
from prefect.server.utilities.schemas import PrefectBaseModel
from prefect.server.utilities.server import PrefectRouter
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/csrf_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

from prefect.logging import get_logger
from prefect.server import models, schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter
from prefect.settings import PREFECT_SERVER_CSRF_PROTECTION_ENABLED

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
validate_job_variables_for_deployment_flow_run,
)
from prefect.server.api.workers import WorkerLookups
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.exceptions import MissingVariableError, ObjectNotFoundError
from prefect.server.models.deployments import mark_deployments_ready
from prefect.server.models.workers import DEFAULT_AGENT_WORK_POOL_NAME
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@

from prefect.logging import get_logger
from prefect.server.api.dependencies import is_ephemeral_request
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.events import messaging, stream
from prefect.server.events.counting import (
Countable,
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/flow_run_notification_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import prefect.server.api.dependencies as dependencies
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/flow_run_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/flow_run_states", tags=["Flow Run States"])
Expand Down
9 changes: 5 additions & 4 deletions src/prefect/server/api/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Response,
status,
)
from fastapi.encoders import jsonable_encoder
from fastapi.responses import ORJSONResponse, PlainTextResponse, StreamingResponse
from sqlalchemy.exc import IntegrityError

Expand All @@ -29,8 +30,7 @@
from prefect.logging import get_logger
from prefect.server.api.run_history import run_history
from prefect.server.api.validation import validate_job_variables_for_deployment_flow_run
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.exceptions import FlowRunGraphTooLarge
from prefect.server.models.flow_runs import (
DependencyResult,
Expand Down Expand Up @@ -215,6 +215,7 @@ async def average_flow_run_lateness(
base_query = db.FlowRun.estimated_start_time_delta

query = await models.flow_runs._apply_flow_run_filters(
db,
sa.select(sa.func.avg(base_query)),
flow_filter=flows,
flow_run_filter=flow_runs,
Expand Down Expand Up @@ -321,8 +322,8 @@ async def read_flow_run_graph_v1(
@router.get("/{id:uuid}/graph-v2")
async def read_flow_run_graph_v2(
flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
since: datetime.datetime = Query(
datetime.datetime.min,
since: DateTime = Query(
default=jsonable_encoder(DateTime.min),
description="Only include runs that start or end after this time.",
),
db: PrefectDBInterface = Depends(provide_database_interface),
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import prefect.server.api.dependencies as dependencies
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.schemas.responses import FlowPaginationResponse
from prefect.server.utilities.server import PrefectRouter

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import prefect.server.api.dependencies as dependencies
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/logs", tags=["Logs"])
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/server/api/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from prefect import settings
from prefect.server import models
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database import provide_database_interface

NextMiddlewareFunction = Callable[[Request], Awaitable[Response]]

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from fastapi import Depends, status
from fastapi.responses import JSONResponse

from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="", tags=["Root"])
Expand Down
18 changes: 9 additions & 9 deletions src/prefect/server/api/run_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.logging import get_logger
from prefect.server.database.dependencies import db_injector
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, db_injector
from prefect.types import DateTime

logger = get_logger("server.api")
Expand Down Expand Up @@ -57,7 +56,7 @@ async def run_history(
)

# create a CTE for timestamp intervals
intervals = db.make_timestamp_intervals(
intervals = db.queries.make_timestamp_intervals(
history_start,
history_end,
history_interval,
Expand All @@ -66,6 +65,7 @@ async def run_history(
# apply filters to the flow runs (and related states)
runs = (
await run_filter_function(
db,
sa.select(
run_model.id,
run_model.expected_start_time,
Expand All @@ -92,7 +92,7 @@ async def run_history(
# build a JSON object, ignoring the case where the count of runs is 0
sa.case(
(sa.func.count(runs.c.id) == 0, None),
else_=db.build_json_object(
else_=db.queries.build_json_object(
"state_type",
runs.c.state_type,
"state_name",
Expand Down Expand Up @@ -140,10 +140,10 @@ async def run_history(
counts.c.interval_start,
counts.c.interval_end,
sa.func.coalesce(
db.json_arr_agg(db.cast_to_json(counts.c.state_agg)).filter(
counts.c.state_agg.is_not(None)
),
sa.text("'[]'"),
db.queries.json_arr_agg(
db.queries.cast_to_json(counts.c.state_agg)
).filter(counts.c.state_agg.is_not(None)),
sa.literal("[]", literal_execute=True),
).label("states"),
)
.group_by(counts.c.interval_start, counts.c.interval_end)
Expand All @@ -157,7 +157,7 @@ async def run_history(
records = result.mappings()

# load and parse the record if the database returns JSON as strings
if db.uses_json_strings:
if db.queries.uses_json_strings:
records = [dict(r) for r in records]
for r in records:
r["states"] = json.loads(r["states"])
Expand Down
Loading

0 comments on commit d90d102

Please sign in to comment.