Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ibis check backend #1831

Merged
merged 6 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pandera/api/ibis/types.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
"""Ibis types."""

from typing import NamedTuple, Union
from typing import NamedTuple, Optional, Union

import ibis.expr.datatypes as dt
import ibis.expr.types as ir


class IbisData(NamedTuple):
table: ir.Table
key: Optional[str] = None
deepyaman marked this conversation as resolved.
Show resolved Hide resolved


class CheckResult(NamedTuple):
"""Check result for user-defined checks."""

Expand All @@ -15,6 +20,9 @@ class CheckResult(NamedTuple):
failure_cases: ir.Table


IbisCheckObjects = Union[ir.Table, ir.Column]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: pandera/engines/ibis_engine.py defines IbisObject = Union[ir.Column, ir.Table]; should that just import this instead, or could that set of types be different in some situation?



IbisDtypeInputTypes = Union[
str,
type,
Expand Down
159 changes: 159 additions & 0 deletions pandera/backends/ibis/checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""Check backend for Ibis."""

from functools import partial
from typing import Optional


import ibis
import ibis.expr.types as ir
from ibis.expr.types.groupby import GroupedTable
from ibis.expr.datatypes import core as idt
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from ibis.expr.datatypes import core as idt
import ibis.expr.datatypes as dt

would be the "standard" Ibis import for this.

from multimethod import overload

from pandera.api.base.checks import CheckResult
from pandera.api.checks import Check
from pandera.api.ibis.types import IbisData
from pandera.backends.base import BaseCheckBackend

from pandera.constants import CHECK_OUTPUT_KEY


class IbisCheckBackend(BaseCheckBackend):
"""Check backend for Ibis."""

def __init__(self, check: Check):
"""Initializes a check backend object."""
super().__init__(check)
assert check._check_fn is not None, "Check._check_fn must be set."
self.check = check
self.check_fn = partial(check._check_fn, **check._check_kwargs)

def groupby(self, check_obj) -> GroupedTable:
"""Implements groupby behavior for check object."""
raise NotImplementedError

def query(self, check_obj: ir.Table):
"""Implements querying behavior to produce subset of check object."""
raise NotImplementedError

def aggregate(self, check_obj: ir.Table):
"""Implements aggregation behavior for check object."""
raise NotImplementedError

def preprocess(self, check_obj: ir.Table, key: Optional[str]):
"""Preprocesses a check object before applying the check function."""
# This handles the case of Series validation, which has no other context except
# for the index to groupby on. Right now grouping by the index is not allowed.
return check_obj

def apply(self, check_obj: IbisData):
"""Apply the check function to a check object."""
if self.check.element_wise:
columns = (
[check_obj.key] if check_obj.key else check_obj.table.columns
)
_fn = self.check_fn
out = check_obj.table.mutate(
**{col: _fn(check_obj.table[col]) for col in columns}
)
out = out.select(columns)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will need to import ibis.selectors as s above, but then you can:

Suggested change
columns = (
[check_obj.key] if check_obj.key else check_obj.table.columns
)
_fn = self.check_fn
out = check_obj.table.mutate(
**{col: _fn(check_obj.table[col]) for col in columns}
)
out = out.select(columns)
selector = s.cols(check_obj.key) if check_obj.key is not None else s.all()
out = check_obj.table.mutate(s.across(selector, self.check_fn)).select(
selector
)

More like the Polars implementation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks! was scouring the ibis API for this but missed the across selector.

else:
out = self.check_fn(check_obj)

if isinstance(out, (ir.BooleanScalar, ir.BooleanColumn)):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if isinstance(out, (ir.BooleanScalar, ir.BooleanColumn)):
if out.type().is_boolean():

return out
elif isinstance(out, ir.Table):
# for checks that return a boolean dataframe, make sure all columns
# are boolean and reduce to a single boolean column.
for _col, _dtype in out.schema().items():
assert isinstance(_dtype, idt.Boolean), (
f"column {_col} is not boolean. If check function "
"returns a dataframe, it must contain only boolean columns."
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary? If so, what's the ideal behavior (for my understanding)?

I.e. would you like to (automatically?) cast to boolean if the check doesn't return a boolean, or should we explicitly make sure each of the columns is a boolean?

Regardless, we can ignore the idt import altogether by checking dtype.is_boolean().

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would you like to (automatically?) cast to boolean if the check doesn't return a boolean

No, this shouldn't be automatic... if a check function doesn't return a boolean it's a user/developer error.

The expectation of a check function is it should produce a scalar boolean, column boolean, or table boolean, since it's meant to produce a set of truth values about data.

bool_out = out.mutate(**{CHECK_OUTPUT_KEY: out.columns[0]})
for col in out.columns[1:]:
bool_out = bool_out.mutate(
**{CHECK_OUTPUT_KEY: bool_out[CHECK_OUTPUT_KEY] & out[col]}
)
bool_out = bool_out.select(CHECK_OUTPUT_KEY)
return bool_out
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
bool_out = out.mutate(**{CHECK_OUTPUT_KEY: out.columns[0]})
for col in out.columns[1:]:
bool_out = bool_out.mutate(
**{CHECK_OUTPUT_KEY: bool_out[CHECK_OUTPUT_KEY] & out[col]}
)
bool_out = bool_out.select(CHECK_OUTPUT_KEY)
return bool_out
return out.select(
reduce(lambda x, y: x & out[y], out.columns, ibis.literal(True)).name(
CHECK_OUTPUT_KEY
)
)

seems simpler (and more like the logic in the Polars backend)

else:
raise TypeError( # pragma: no cover
f"output type of check_fn not recognized: {type(out)}"
)

@overload
def postprocess(self, check_obj, check_output):
"""Postprocesses the result of applying the check function."""
raise TypeError( # pragma: no cover
f"output type of check_fn not recognized: {type(check_output)}"
)

@overload # type: ignore [no-redef]
def postprocess(
self,
check_obj: IbisData,
check_output: ir.BooleanScalar,
) -> CheckResult:
"""Postprocesses the result of applying the check function."""
return CheckResult(
check_output=check_output,
check_passed=check_output,
checked_object=check_obj,
failure_cases=None,
)

@overload # type: ignore [no-redef]
def postprocess(
self,
check_obj: IbisData,
check_output: ir.BooleanColumn,
) -> CheckResult:
"""Postprocesses the result of applying the check function."""
check_output = check_output.name(CHECK_OUTPUT_KEY)
failure_cases = check_obj.table.filter(~check_output)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to comment for postprocess(..., ir.Table) below, I think check_output will need to have the original input as well as check results for this to work reliably.

if check_obj.key is not None:
failure_cases = failure_cases.select(check_obj.key)
return CheckResult(
check_output=check_output,
check_passed=check_output.all(),
checked_object=check_obj,
failure_cases=failure_cases,
)

@overload # type: ignore [no-redef]
def postprocess(
self,
check_obj: IbisData,
check_output: ir.Table,
) -> CheckResult:
"""Postprocesses the result of applying the check function."""
passed = check_output[CHECK_OUTPUT_KEY].all()

_left = check_obj.table.mutate(_id=ibis.row_number())
_right = check_output.mutate(_id=ibis.row_number())
_t = _left.join(
check_output.mutate(_id=ibis.row_number()),
_left._id == _right._id,
how="inner",
).drop("_id")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think row order can be guaranteed like this.

I think the way to do this would be to create a set of result columns in apply() instead of overwriting the data columns.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you suggest a code change for this? I was racking my brain on how to basically get the failure cases from the original data based on the check output boolean values.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah; let me make a PR to your branch, probably easier since it'll span a couple functions. The output of apply() is not used raw (without postprocessing) anywhere, right?

Copy link
Collaborator

@deepyaman deepyaman Nov 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cosmicBboy #1855 does this, with one caveat: instead of returning a table (in the case of a check function that takes a table and returns a table), I changed the API to return a dictionary of column names to expressions, because returning a table creates something that cannot be aligned with the original data.

This makes sense to me, but I wanted to see what you think.

(I also made most of the other changes suggested here in that PR.)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, the reason things work is because the tests are using the pandas backend. Now that I think about it, we should test with DuckDB or something else (could even be Polars, I suppose), because Ibis will drop the pandas backend. This will also provide a bit better guardrails around doing stuff that only works in pandas.


failure_cases = _t.filter(~_t[CHECK_OUTPUT_KEY]).drop(CHECK_OUTPUT_KEY)
if check_obj.key is not None:
failure_cases = failure_cases.select(check_obj.key)
return CheckResult(
check_output=check_output,
check_passed=passed,
checked_object=check_obj,
failure_cases=failure_cases,
)

def __call__(
self,
check_obj: ir.Table,
key: Optional[str] = None,
) -> CheckResult:
check_obj = self.preprocess(check_obj, key)
ibis_data = IbisData(check_obj, key)
check_output = self.apply(ibis_data)
return self.postprocess(ibis_data, check_output)
37 changes: 35 additions & 2 deletions pandera/backends/ibis/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ def validate(

# run the checks
core_checks = [
(self.check_dtype, (sample, schema)),
self.check_dtype,
self.run_checks,
]

for check, args in core_checks:
args = (sample, schema)
for check in core_checks:
results = check(*args)
if isinstance(results, CoreCheckResult):
results = [results]
Expand Down Expand Up @@ -114,3 +116,34 @@ def check_dtype(
message=msg,
failure_cases=failure_cases,
)

@validate_scope(scope=ValidationScope.DATA)
def run_checks(self, check_obj, schema) -> List[CoreCheckResult]:
check_results: List[CoreCheckResult] = []
for check_index, check in enumerate(schema.checks):
try:
check_results.append(
self.run_check(
check_obj,
schema,
check,
check_index,
schema.selector,
)
)
except Exception as err: # pylint: disable=broad-except
# catch other exceptions that may occur when executing the Check
err_msg = f'"{err.args[0]}"' if len(err.args) > 0 else ""
msg = f"{err.__class__.__name__}({err_msg})"
check_results.append(
CoreCheckResult(
passed=False,
check=check,
check_index=check_index,
reason_code=SchemaErrorReason.CHECK_ERROR,
message=msg,
failure_cases=msg,
original_exc=err,
)
)
return check_results
2 changes: 2 additions & 0 deletions pandera/backends/ibis/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def register_ibis_backends():
from pandera.api.ibis.container import DataFrameSchema
from pandera.backends.ibis.components import ColumnBackend
from pandera.backends.ibis.container import DataFrameSchemaBackend
from pandera.backends.ibis.checks import IbisCheckBackend

DataFrameSchema.register_backend(ir.Table, DataFrameSchemaBackend)
Column.register_backend(ir.Table, ColumnBackend)
Check.register_backend(ir.Table, IbisCheckBackend)
4 changes: 2 additions & 2 deletions pandera/backends/polars/checks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Check backend for pandas."""
"""Check backend for polars."""

from functools import partial
from typing import Optional
Expand All @@ -19,7 +19,7 @@


class PolarsCheckBackend(BaseCheckBackend):
"""Check backend ofr pandas."""
"""Check backend for polars."""

def __init__(self, check: Check):
"""Initializes a check backend object."""
Expand Down
1 change: 1 addition & 0 deletions pandera/ibis.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
from pandera.api.ibis.components import Column
from pandera.api.ibis.container import DataFrameSchema
from pandera.api.ibis.model import DataFrameModel
from pandera.api.ibis.types import IbisData
Loading
Loading