From e960c9a3693131c46d0f7969d03b89efa699ce55 Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Mon, 9 Dec 2024 19:22:01 -0500 Subject: [PATCH] Ibis check backend (#1831) * [wip] add minimal ibis check backend implementation Signed-off-by: cosmicBboy * support scalar, column, and table check output types Signed-off-by: cosmicBboy * support scalar, column, and table check output types Signed-off-by: cosmicBboy * Ibis check backend suggestions (#1855) * Apply suggestions from code review Signed-off-by: Deepyaman Datta * Fix row-order-dependent order by adding table cols Signed-off-by: Deepyaman Datta --------- Signed-off-by: Deepyaman Datta * fix lint Signed-off-by: cosmicBboy * fix unit tests Signed-off-by: cosmicBboy --------- Signed-off-by: cosmicBboy Signed-off-by: Deepyaman Datta Co-authored-by: Deepyaman Datta Signed-off-by: Deepyaman Datta --- pandera/api/ibis/types.py | 9 +- pandera/backends/ibis/checks.py | 160 ++++++++++++++++++ pandera/backends/ibis/components.py | 37 ++++- pandera/backends/ibis/register.py | 2 + pandera/backends/ibis/utils.py | 10 ++ pandera/backends/pandas/checks.py | 2 +- pandera/backends/polars/checks.py | 4 +- pandera/ibis.py | 1 + tests/ibis/test_ibis_check.py | 242 ++++++++++++++++++++++++++++ 9 files changed, 461 insertions(+), 6 deletions(-) create mode 100644 pandera/backends/ibis/checks.py create mode 100644 pandera/backends/ibis/utils.py create mode 100644 tests/ibis/test_ibis_check.py diff --git a/pandera/api/ibis/types.py b/pandera/api/ibis/types.py index 6c953e3af..ea4a4e78c 100644 --- a/pandera/api/ibis/types.py +++ b/pandera/api/ibis/types.py @@ -1,11 +1,16 @@ """Ibis types.""" -from typing import NamedTuple, Union +from typing import NamedTuple, Optional, Union import ibis.expr.datatypes as dt import ibis.expr.types as ir +class IbisData(NamedTuple): + table: ir.Table + key: Optional[str] = None + + class CheckResult(NamedTuple): """Check result for user-defined checks.""" @@ -15,6 +20,8 @@ class 
CheckResult(NamedTuple): failure_cases: ir.Table +IbisCheckObjects = Union[ir.Table, ir.Column] + IbisDtypeInputTypes = Union[ str, type, diff --git a/pandera/backends/ibis/checks.py b/pandera/backends/ibis/checks.py new file mode 100644 index 000000000..f0ca8d8e4 --- /dev/null +++ b/pandera/backends/ibis/checks.py @@ -0,0 +1,160 @@ +"""Check backend for Ibis.""" + +from functools import partial +from typing import Optional + + +import ibis +import ibis.expr.types as ir +from ibis import _, selectors as s +from ibis.expr.types.groupby import GroupedTable +from multimethod import overload + +from pandera.api.base.checks import CheckResult +from pandera.api.checks import Check +from pandera.api.ibis.types import IbisData +from pandera.backends.base import BaseCheckBackend +from pandera.backends.ibis.utils import select_column +from pandera.constants import CHECK_OUTPUT_KEY + +CHECK_OUTPUT_SUFFIX = f"__{CHECK_OUTPUT_KEY}__" + + +class IbisCheckBackend(BaseCheckBackend): + """Check backend for Ibis.""" + + def __init__(self, check: Check): + """Initializes a check backend object.""" + super().__init__(check) + assert check._check_fn is not None, "Check._check_fn must be set." + self.check = check + self.check_fn = partial(check._check_fn, **check._check_kwargs) + + def groupby(self, check_obj) -> GroupedTable: + """Implements groupby behavior for check object.""" + raise NotImplementedError + + def query(self, check_obj: ir.Table): + """Implements querying behavior to produce subset of check object.""" + raise NotImplementedError + + def aggregate(self, check_obj: ir.Table): + """Implements aggregation behavior for check object.""" + raise NotImplementedError + + def preprocess(self, check_obj: ir.Table, key: Optional[str]): + """Preprocesses a check object before applying the check function.""" + # No preprocessing is applied yet: grouping is not implemented for the Ibis + # backend (see `groupby`), so the check object is returned unchanged. 
+ return check_obj + + def apply(self, check_obj: IbisData): + """Apply the check function to a check object.""" + if self.check.element_wise: + selector = ( + select_column(check_obj.key) + if check_obj.key is not None + else s.all() + ) + out = check_obj.table.mutate( + s.across( + selector, self.check_fn, f"{{col}}{CHECK_OUTPUT_SUFFIX}" + ) + ).select(selector | s.endswith(CHECK_OUTPUT_SUFFIX)) + else: + out = self.check_fn(check_obj) + if isinstance(out, dict): + out = check_obj.table.mutate( + **{f"{k}{CHECK_OUTPUT_SUFFIX}": v for k, v in out.items()} + ) + + if isinstance(out, ir.Table): + # for checks that return a boolean dataframe, make sure all columns + # are boolean and reduce to a single boolean column. + acc = ibis.literal(True) + for col in out.columns: + if col.endswith(CHECK_OUTPUT_SUFFIX): + assert out[col].type().is_boolean(), ( + f"column '{col[: -len(CHECK_OUTPUT_SUFFIX)]}' " + "is not boolean. If check function returns a " + "dataframe, it must contain only boolean columns." 
+ ) + acc = acc & out[col] + return out.mutate({CHECK_OUTPUT_KEY: acc}) + elif out.type().is_boolean(): + return out + else: + raise TypeError( # pragma: no cover + f"output type of check_fn not recognized: {type(out)}" + ) + + @overload + def postprocess(self, check_obj, check_output): + """Postprocesses the result of applying the check function.""" + raise TypeError( # pragma: no cover + f"output type of check_fn not recognized: {type(check_output)}" + ) + + @overload # type: ignore [no-redef] + def postprocess( + self, + check_obj: IbisData, + check_output: ir.BooleanScalar, + ) -> CheckResult: + """Postprocesses the result of applying the check function.""" + return CheckResult( + check_output=check_output, + check_passed=check_output, + checked_object=check_obj, + failure_cases=None, + ) + + @overload # type: ignore [no-redef] + def postprocess( + self, + check_obj: IbisData, + check_output: ir.BooleanColumn, + ) -> CheckResult: + """Postprocesses the result of applying the check function.""" + check_output = check_output.name(CHECK_OUTPUT_KEY) + failure_cases = check_obj.table.filter(~check_output) + if check_obj.key is not None: + failure_cases = failure_cases.select(check_obj.key) + return CheckResult( + check_output=check_output, + check_passed=check_output.all(), + checked_object=check_obj, + failure_cases=failure_cases, + ) + + @overload # type: ignore [no-redef] + def postprocess( + self, + check_obj: IbisData, + check_output: ir.Table, + ) -> CheckResult: + """Postprocesses the result of applying the check function.""" + passed = check_output[CHECK_OUTPUT_KEY].all() + failure_cases = check_output.filter(~_[CHECK_OUTPUT_KEY]).drop( + s.endswith(f"__{CHECK_OUTPUT_KEY}__") + | select_column(CHECK_OUTPUT_KEY) + ) + + if check_obj.key is not None: + failure_cases = failure_cases.select(check_obj.key) + return CheckResult( + check_output=check_output.select(CHECK_OUTPUT_KEY), + check_passed=passed, + checked_object=check_obj, + failure_cases=failure_cases, + 
) + + def __call__( + self, + check_obj: ir.Table, + key: Optional[str] = None, + ) -> CheckResult: + check_obj = self.preprocess(check_obj, key) + ibis_data = IbisData(check_obj, key) + check_output = self.apply(ibis_data) + return self.postprocess(ibis_data, check_output) diff --git a/pandera/backends/ibis/components.py b/pandera/backends/ibis/components.py index 4633ca6bd..57987a06e 100644 --- a/pandera/backends/ibis/components.py +++ b/pandera/backends/ibis/components.py @@ -42,10 +42,12 @@ def validate( # run the checks core_checks = [ - (self.check_dtype, (sample, schema)), + self.check_dtype, + self.run_checks, ] - for check, args in core_checks: + args = (sample, schema) + for check in core_checks: results = check(*args) if isinstance(results, CoreCheckResult): results = [results] @@ -114,3 +116,34 @@ def check_dtype( message=msg, failure_cases=failure_cases, ) + + @validate_scope(scope=ValidationScope.DATA) + def run_checks(self, check_obj, schema) -> List[CoreCheckResult]: + check_results: List[CoreCheckResult] = [] + for check_index, check in enumerate(schema.checks): + try: + check_results.append( + self.run_check( + check_obj, + schema, + check, + check_index, + schema.selector, + ) + ) + except Exception as err: # pylint: disable=broad-except + # catch other exceptions that may occur when executing the Check + err_msg = f'"{err.args[0]}"' if len(err.args) > 0 else "" + msg = f"{err.__class__.__name__}({err_msg})" + check_results.append( + CoreCheckResult( + passed=False, + check=check, + check_index=check_index, + reason_code=SchemaErrorReason.CHECK_ERROR, + message=msg, + failure_cases=msg, + original_exc=err, + ) + ) + return check_results diff --git a/pandera/backends/ibis/register.py b/pandera/backends/ibis/register.py index 9e983163c..99a467e4f 100644 --- a/pandera/backends/ibis/register.py +++ b/pandera/backends/ibis/register.py @@ -16,6 +16,8 @@ def register_ibis_backends(): from pandera.api.ibis.container import DataFrameSchema from 
pandera.backends.ibis.components import ColumnBackend from pandera.backends.ibis.container import DataFrameSchemaBackend + from pandera.backends.ibis.checks import IbisCheckBackend DataFrameSchema.register_backend(ir.Table, DataFrameSchemaBackend) Column.register_backend(ir.Table, ColumnBackend) + Check.register_backend(ir.Table, IbisCheckBackend) diff --git a/pandera/backends/ibis/utils.py b/pandera/backends/ibis/utils.py new file mode 100644 index 000000000..26bf50373 --- /dev/null +++ b/pandera/backends/ibis/utils.py @@ -0,0 +1,10 @@ +"""Utility functions for the Ibis backend.""" + +from ibis import selectors as s + + +def select_column(*names): + """Select a column from a table.""" + if hasattr(s, "cols"): + return s.cols(*names) + return s.c(*names) diff --git a/pandera/backends/pandas/checks.py b/pandera/backends/pandas/checks.py index ee415a848..a007679d2 100644 --- a/pandera/backends/pandas/checks.py +++ b/pandera/backends/pandas/checks.py @@ -17,7 +17,7 @@ class PandasCheckBackend(BaseCheckBackend): - """Check backend ofr pandas.""" + """Check backend for pandas.""" def __init__(self, check: Check): """Initializes a check backend object.""" diff --git a/pandera/backends/polars/checks.py b/pandera/backends/polars/checks.py index 3a02d97af..19b37fc7f 100644 --- a/pandera/backends/polars/checks.py +++ b/pandera/backends/polars/checks.py @@ -1,4 +1,4 @@ -"""Check backend for pandas.""" +"""Check backend for polars.""" from functools import partial from typing import Optional @@ -18,7 +18,7 @@ class PolarsCheckBackend(BaseCheckBackend): - """Check backend ofr pandas.""" + """Check backend for polars.""" def __init__(self, check: Check): """Initializes a check backend object.""" diff --git a/pandera/ibis.py b/pandera/ibis.py index d0a1405c3..41b2a97db 100644 --- a/pandera/ibis.py +++ b/pandera/ibis.py @@ -6,3 +6,4 @@ from pandera.api.ibis.components import Column from pandera.api.ibis.container import DataFrameSchema from pandera.api.ibis.model import 
DataFrameModel +from pandera.api.ibis.types import IbisData diff --git a/tests/ibis/test_ibis_check.py b/tests/ibis/test_ibis_check.py new file mode 100644 index 000000000..d3310bc53 --- /dev/null +++ b/tests/ibis/test_ibis_check.py @@ -0,0 +1,242 @@ +"""Unit tests for the Ibis check backend.""" + +from typing import Dict + +import pytest + +import pandas as pd +import ibis +import ibis.expr.types as ir +import pandera.ibis as pa +from pandera.backends.ibis.register import register_ibis_backends +from pandera.constants import CHECK_OUTPUT_KEY + + +@pytest.fixture(autouse=True) +def _register_ibis_backends(): + register_ibis_backends() + + +@pytest.fixture +def column_t(): + return ibis.memtable(pd.DataFrame({"col": [1, 2, 3, 4]})) + + +@pytest.fixture +def t(): + return ibis.memtable( + pd.DataFrame({"col_1": [1, 2, 3, 4], "col_2": [1, 2, 3, 4]}) + ) + + +def _column_check_fn_col_out(data: pa.IbisData) -> ir.BooleanColumn: + return data.table[data.key] > 0 + + +def _column_check_fn_scalar_out(data: pa.IbisData) -> ir.BooleanScalar: + return (data.table[data.key] > 0).all() + + +@pytest.mark.parametrize( + "check_fn, invalid_data, expected_output", + [ + [_column_check_fn_col_out, [-1, 2, 3, -2], [False, True, True, False]], + [_column_check_fn_scalar_out, [-1, 2, 3, -2], False], + ], +) +def test_ibis_column_check( + column_t, + check_fn, + invalid_data, + expected_output, +): + check = pa.Check(check_fn) + check_result = check(column_t, column="col") + assert check_result.check_passed.execute() + + invalid_df = ibis.memtable(pd.DataFrame({"col": invalid_data})) + invalid_check_result = check(invalid_df, column="col") + assert not invalid_check_result.check_passed.execute() + + check_output = invalid_check_result.check_output.execute() + if isinstance(expected_output, list): + assert check_output.tolist() == expected_output + else: + assert check_output == expected_output + + +def _df_check_fn_dict_out(data: pa.IbisData) -> Dict[str, ir.BooleanColumn]: + return {col: 
data.table[col] >= 0 for col in data.table.columns} + + +def _df_check_fn_col_out(data: pa.IbisData) -> ir.BooleanColumn: + return data.table["col_1"] >= data.table["col_2"] + + +def _df_check_fn_scalar_out(data: pa.IbisData) -> ir.BooleanScalar: + acc = data.table[data.table.columns[0]] >= 0 + for col in data.table.columns[1:]: + acc &= data.table[col] >= 0 + return acc.all() + + +@pytest.mark.parametrize( + "check_fn, invalid_data, expected_output", + [ + [ + _df_check_fn_dict_out, + { + "col_1": [-1, 2, -3, 4], + "col_2": [1, 2, 3, -4], + }, + [False, True, False, False], + ], + [ + _df_check_fn_col_out, + { + "col_1": [1, 2, 3, 4], + "col_2": [2, 1, 2, 5], + }, + [False, True, True, False], + ], + [ + _df_check_fn_scalar_out, + { + "col_1": [-1, 2, 3, 4], + "col_2": [2, 1, 2, 5], + }, + [False], + ], + ], +) +def test_ibis_table_check( + t, + check_fn, + invalid_data, + expected_output, +): + check = pa.Check(check_fn) + check_result = check(t) + assert check_result.check_passed.as_scalar().execute() + + invalid_t = ibis.memtable(pd.DataFrame(invalid_data)) + invalid_check_result = check(invalid_t) + assert not invalid_check_result.check_passed.as_scalar().execute() + + if isinstance(invalid_check_result.check_output, ir.Table): + output = ( + invalid_check_result.check_output[CHECK_OUTPUT_KEY] + .to_pandas() + .to_list() + ) + elif isinstance(invalid_check_result.check_output, ir.BooleanColumn): + output = invalid_check_result.check_output.to_pandas().to_list() + elif isinstance(invalid_check_result.check_output, ir.BooleanScalar): + output = invalid_check_result.check_output.as_scalar().execute() + else: + raise ValueError( + f"Invalid check output type: {type(invalid_check_result.check_output)}" + ) + + assert output == expected_output + + +@ibis.udf.scalar.python +def _element_wise_check_fn(x: int) -> bool: + return x >= 0 + + +def test_ibis_element_wise_column_check(column_t): + + check = pa.Check(_element_wise_check_fn, element_wise=True) + check_result = 
check(column_t, column="col") + assert check_result.check_passed.execute() + + invalid_t = ibis.memtable(pd.DataFrame({"col": [-1, 2, 3, -2]})) + invalid_check_result = check(invalid_t, column="col") + assert not invalid_check_result.check_passed.execute() + failure_cases = invalid_check_result.failure_cases.execute() + expected_failure_cases = pd.DataFrame({"col": [-1, -2]}) + assert failure_cases.equals(expected_failure_cases) + + +def test_ibis_element_wise_dataframe_check(t): + + check = pa.Check(_element_wise_check_fn, element_wise=True) + check_result = check(t) + assert check_result.check_passed.execute() + + invalid_t = ibis.memtable( + pd.DataFrame({"col_1": [-1, 2, -3, 4], "col_2": [1, 2, 3, -4]}) + ) + invalid_check_result = check(invalid_t) + assert not invalid_check_result.check_passed.execute() + failure_cases = invalid_check_result.failure_cases.execute() + expected_failure_cases = pd.DataFrame( + {"col_1": [-1, -3, 4], "col_2": [1, 3, -4]} + ) + assert failure_cases.equals(expected_failure_cases) + + +def test_ibis_element_wise_dataframe_different_dtypes(): + # Custom check function + @ibis.udf.scalar.python + def check_gt_2(v: int) -> bool: + return v > 2 + + @ibis.udf.scalar.python + def check_len_ge_2(v: str) -> bool: + return len(v) >= 2 + + t = ibis.memtable( + pd.DataFrame( + {"int_col": [1, 2, 3, 4], "str_col": ["aaa", "bb", "c", "dd"]} + ) + ) + + _check_gt_2 = pa.Check(check_gt_2, element_wise=True) + _check_len_ge_2 = pa.Check(check_len_ge_2, element_wise=True) + + check_result = _check_gt_2(t, column="int_col") + check_result.check_passed.execute() + + assert not check_result.check_passed.execute() + assert check_result.failure_cases.to_pandas().equals( + pd.DataFrame({"int_col": [1, 2]}) + ) + + check_result = _check_len_ge_2(t, column="str_col") + assert not check_result.check_passed.execute() + assert check_result.failure_cases.to_pandas().equals( + pd.DataFrame({"str_col": ["c"]}) + ) + + +def test_ibis_custom_check(): + t = 
ibis.memtable( + pd.DataFrame( + {"column1": [None, "x", "y"], "column2": ["a", None, "c"]} + ) + ) + + def custom_check(data: pa.IbisData) -> ir.BooleanColumn: + both_null = data.table.column1.isnull() & data.table.column2.isnull() + return ~both_null + + check = pa.Check(custom_check) + check_result = check(t) + assert check_result.check_passed.execute() + assert check_result.failure_cases.execute().empty + + invalid_t = ibis.memtable( + pd.DataFrame( + {"column1": [None, "x", "y"], "column2": [None, None, "c"]} + ) + ) + invalid_check_result = check(invalid_t) + assert not invalid_check_result.check_passed.execute() + failure_cases = invalid_check_result.failure_cases.execute() + expected_failure_cases = pd.DataFrame( + {"column1": [None], "column2": [None]} + ) + assert failure_cases.equals(expected_failure_cases)