Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up PrefectDBInterface, use the models consistently #16392

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions docs/v3/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@
"type": "string",
"format": "date-time",
"description": "Only include runs that start or end after this time.",
"default": "0001-01-01T00:00:00",
"default": "0001-01-01T00:00:00+00:00",
"title": "Since"
},
"description": "Only include runs that start or end after this time."
Expand Down Expand Up @@ -20044,8 +20044,15 @@
"Graph": {
"properties": {
"start_time": {
"type": "string",
"format": "date-time",
"anyOf": [
{
"type": "string",
"format": "date-time"
},
{
"type": "null"
}
],
"title": "Start Time"
},
"end_time": {
Expand Down Expand Up @@ -20136,7 +20143,14 @@
"title": "Key"
},
"type": {
"type": "string",
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Type"
},
"is_latest": {
Expand Down
6 changes: 3 additions & 3 deletions src/prefect/cli/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ async def stop():
@database_app.command()
async def reset(yes: bool = typer.Option(False, "--yes", "-y")):
"""Drop and recreate all Prefect database tables"""
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database import provide_database_interface

db = provide_database_interface()
engine = await db.engine()
Expand Down Expand Up @@ -378,8 +378,8 @@ async def upgrade(
),
):
"""Upgrade the Prefect database"""
from prefect.server.database import provide_database_interface
from prefect.server.database.alembic_commands import alembic_upgrade
from prefect.server.database.dependencies import provide_database_interface

db = provide_database_interface()
engine = await db.engine()
Expand Down Expand Up @@ -418,8 +418,8 @@ async def downgrade(
),
):
"""Downgrade the Prefect database"""
from prefect.server.database import provide_database_interface
from prefect.server.database.alembic_commands import alembic_downgrade
from prefect.server.database.dependencies import provide_database_interface

db = provide_database_interface()

Expand Down
9 changes: 9 additions & 0 deletions src/prefect/client/schemas/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,15 @@ def result(
) -> Union[R, Exception]:
...

@overload
def result(
self: "State[R]",
raise_on_failure: bool = ...,
fetch: bool = ...,
retry_result_failure: bool = ...,
) -> Union[R, Exception]:
...

@deprecated.deprecated_parameter(
"fetch",
when=lambda fetch: fetch is not True,
Expand Down
10 changes: 2 additions & 8 deletions src/prefect/events/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from uuid import UUID

import pendulum
from pydantic import Field, PrivateAttr
from pydantic import Field

from prefect._internal.schemas.bases import PrefectBaseModel
from prefect.types import DateTime
Expand Down Expand Up @@ -41,18 +41,12 @@ class AutomationFilter(PrefectBaseModel):
class EventDataFilter(PrefectBaseModel, extra="forbid"): # type: ignore[call-arg]
"""A base class for filtering event data."""

_top_level_filter: Optional["EventFilter"] = PrivateAttr(None)

def get_filters(self) -> List["EventDataFilter"]:
filters: List["EventDataFilter"] = [
filter
for filter in [
getattr(self, name) for name, field in self.model_fields.items()
]
for filter in [getattr(self, name) for name in self.model_fields]
if isinstance(filter, EventDataFilter)
]
for filter in filters:
filter._top_level_filter = self._top_level_filter
return filters

def includes(self, event: Event) -> bool:
Expand Down
2 changes: 2 additions & 0 deletions src/prefect/server/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from . import models, orchestration, schemas, services

__all__ = ["models", "orchestration", "schemas", "services"]
3 changes: 1 addition & 2 deletions src/prefect/server/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

import prefect
import prefect.settings
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/admin", tags=["Admin"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@

import prefect.server.api.dependencies as dependencies
from prefect.server import models
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.schemas import actions, core, filters, sorting
from prefect.server.utilities.server import PrefectRouter

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/automations.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
from prefect.server.api.validation import (
validate_job_variables_for_run_deployment_action,
)
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.events import actions
from prefect.server.events.filters import AutomationFilter, AutomationFilterCreated
from prefect.server.events.models import automations as automations_models
Expand Down
5 changes: 1 addition & 4 deletions src/prefect/server/api/block_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
from fastapi import Depends

from prefect.server import models
from prefect.server.database.dependencies import (
PrefectDBInterface,
provide_database_interface,
)
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/block_capabilities", tags=["Block capabilities"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/block_documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

from prefect.server import models, schemas
from prefect.server.api import dependencies
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/block_documents", tags=["Block documents"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/block_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

from prefect.server import models, schemas
from prefect.server.api import dependencies
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.models.block_schemas import MissingBlockTypeException
from prefect.server.utilities.server import PrefectRouter

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/block_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from prefect.blocks.core import _should_update_block_type
from prefect.server import models, schemas
from prefect.server.api import dependencies
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/block_types", tags=["Block types"])
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/concurrency_limits.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.api.concurrency_limits_v2 import MinimalConcurrencyLimitResponse
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.models import concurrency_limits
from prefect.server.utilities.server import PrefectRouter
from prefect.settings import PREFECT_TASK_RUN_TAG_CONCURRENCY_SLOT_WAIT_SECONDS
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/concurrency_limits_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.api.dependencies import LimitBody
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.schemas import actions
from prefect.server.utilities.schemas import PrefectBaseModel
from prefect.server.utilities.server import PrefectRouter
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/csrf_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

from prefect.logging import get_logger
from prefect.server import models, schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter
from prefect.settings import PREFECT_SERVER_CSRF_PROTECTION_ENABLED

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
validate_job_variables_for_deployment_flow_run,
)
from prefect.server.api.workers import WorkerLookups
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.exceptions import MissingVariableError, ObjectNotFoundError
from prefect.server.models.deployments import mark_deployments_ready
from prefect.server.models.workers import DEFAULT_AGENT_WORK_POOL_NAME
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@

from prefect.logging import get_logger
from prefect.server.api.dependencies import is_ephemeral_request
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.events import messaging, stream
from prefect.server.events.counting import (
Countable,
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/flow_run_notification_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import prefect.server.api.dependencies as dependencies
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/flow_run_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/flow_run_states", tags=["Flow Run States"])
Expand Down
9 changes: 5 additions & 4 deletions src/prefect/server/api/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
Response,
status,
)
from fastapi.encoders import jsonable_encoder
from fastapi.responses import ORJSONResponse, PlainTextResponse, StreamingResponse
from sqlalchemy.exc import IntegrityError

Expand All @@ -29,8 +30,7 @@
from prefect.logging import get_logger
from prefect.server.api.run_history import run_history
from prefect.server.api.validation import validate_job_variables_for_deployment_flow_run
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.exceptions import FlowRunGraphTooLarge
from prefect.server.models.flow_runs import (
DependencyResult,
Expand Down Expand Up @@ -215,6 +215,7 @@ async def average_flow_run_lateness(
base_query = db.FlowRun.estimated_start_time_delta

query = await models.flow_runs._apply_flow_run_filters(
db,
sa.select(sa.func.avg(base_query)),
flow_filter=flows,
flow_run_filter=flow_runs,
Expand Down Expand Up @@ -321,8 +322,8 @@ async def read_flow_run_graph_v1(
@router.get("/{id:uuid}/graph-v2")
async def read_flow_run_graph_v2(
flow_run_id: UUID = Path(..., description="The flow run id", alias="id"),
since: datetime.datetime = Query(
datetime.datetime.min,
since: DateTime = Query(
default=jsonable_encoder(DateTime.min),
description="Only include runs that start or end after this time.",
),
db: PrefectDBInterface = Depends(provide_database_interface),
Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
import prefect.server.api.dependencies as dependencies
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.schemas.responses import FlowPaginationResponse
from prefect.server.utilities.server import PrefectRouter

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import prefect.server.api.dependencies as dependencies
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="/logs", tags=["Logs"])
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/server/api/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from prefect import settings
from prefect.server import models
from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database import provide_database_interface

NextMiddlewareFunction = Callable[[Request], Awaitable[Response]]

Expand Down
3 changes: 1 addition & 2 deletions src/prefect/server/api/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
from fastapi import Depends, status
from fastapi.responses import JSONResponse

from prefect.server.database.dependencies import provide_database_interface
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.server.utilities.server import PrefectRouter

router = PrefectRouter(prefix="", tags=["Root"])
Expand Down
18 changes: 9 additions & 9 deletions src/prefect/server/api/run_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
import prefect.server.models as models
import prefect.server.schemas as schemas
from prefect.logging import get_logger
from prefect.server.database.dependencies import db_injector
from prefect.server.database.interface import PrefectDBInterface
from prefect.server.database import PrefectDBInterface, db_injector
from prefect.types import DateTime

logger = get_logger("server.api")
Expand Down Expand Up @@ -57,7 +56,7 @@ async def run_history(
)

# create a CTE for timestamp intervals
intervals = db.make_timestamp_intervals(
intervals = db.queries.make_timestamp_intervals(
history_start,
history_end,
history_interval,
Expand All @@ -66,6 +65,7 @@ async def run_history(
# apply filters to the flow runs (and related states)
runs = (
await run_filter_function(
db,
sa.select(
run_model.id,
run_model.expected_start_time,
Expand All @@ -92,7 +92,7 @@ async def run_history(
# build a JSON object, ignoring the case where the count of runs is 0
sa.case(
(sa.func.count(runs.c.id) == 0, None),
else_=db.build_json_object(
else_=db.queries.build_json_object(
"state_type",
runs.c.state_type,
"state_name",
Expand Down Expand Up @@ -140,10 +140,10 @@ async def run_history(
counts.c.interval_start,
counts.c.interval_end,
sa.func.coalesce(
db.json_arr_agg(db.cast_to_json(counts.c.state_agg)).filter(
counts.c.state_agg.is_not(None)
),
sa.text("'[]'"),
db.queries.json_arr_agg(
db.queries.cast_to_json(counts.c.state_agg)
).filter(counts.c.state_agg.is_not(None)),
sa.literal("[]", literal_execute=True),
).label("states"),
)
.group_by(counts.c.interval_start, counts.c.interval_end)
Expand All @@ -157,7 +157,7 @@ async def run_history(
records = result.mappings()

# load and parse the record if the database returns JSON as strings
if db.uses_json_strings:
if db.queries.uses_json_strings:
records = [dict(r) for r in records]
for r in records:
r["states"] = json.loads(r["states"])
Expand Down
Loading
Loading