Skip to content

Commit

Permalink
clean/translate to 2.0 style queries in the sensitive data fixture
Browse files Browse the repository at this point in the history
  • Loading branch information
jacquesfize committed Feb 13, 2024
1 parent cfa4645 commit 24998c1
Showing 1 changed file with 28 additions and 57 deletions.
85 changes: 28 additions & 57 deletions backend/geonature/tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ def synthese_data(app, users, datasets, source, sources_modules):
@pytest.fixture()
def synthese_sensitive_data(app, users, datasets, source):
data = {}
deb = time.time()

# Retrieve all the taxa with a protection status, and the corresponding areas
cte_taxa_area_with_status = (
sa.select(TaxrefBdcStatutTaxon.cd_nom, LAreas.id_area)
Expand Down Expand Up @@ -493,17 +493,10 @@ def synthese_sensitive_data(app, users, datasets, source):
# Retrieve a cd_nom and point that fit both a sensitivity rule and a protection status
sensitive_protected_cd_nom, sensitive_protected_id_area = db.session.execute(
sa.select(
cte_taxa_area_with_status.c.cd_nom,
cte_taxa_area_with_status.c.id_area,
)
.join(
cte_taxa_area_with_sensitivity,
sa.and_(
cte_taxa_area_with_status.c.cd_nom == cte_taxa_area_with_sensitivity.c.cd_nom,
cte_taxa_area_with_status.c.id_area == cte_taxa_area_with_sensitivity.c.id_area,
),
cte_taxa_area_with_sensitivity.c.cd_nom,
cte_taxa_area_with_sensitivity.c.id_area,
)
.order_by(cte_taxa_area_with_status.c.cd_nom)
.order_by(cte_taxa_area_with_sensitivity.c.cd_nom)
.limit(1)
).first()

Expand All @@ -514,13 +507,6 @@ def synthese_sensitive_data(app, users, datasets, source):
) # Allier
).scalar_one()

unsensitive_area = db.session.execute(
sa.select(LAreas).where(
LAreas.area_type.has(BibAreasTypes.type_code == "DEP"),
LAreas.area_code == "01",
) # Ain
).scalar_one()

sensitive_protected_taxon = db.session.execute(
select(Taxref).filter_by(cd_nom=139)
).scalar_one() # Triton crété
Expand All @@ -542,24 +528,6 @@ def synthese_sensitive_data(app, users, datasets, source):
sensitivity_rule.criterias == []
), "Le référentiel de sensibilité ne convient pas aux tests"

sensitivity_rule = db.session.scalars(
select(SensitivityRule)
.where(
SensitivityRule.cd_nom == sensitive_protected_taxon.cd_nom,
SensitivityRule.areas.any(LAreas.id_area == unsensitive_area.id_area),
)
.join_from(
cte_taxa_area_with_sensitivity,
cte_taxa_area_with_status,
sa.and_(
cte_taxa_area_with_status.c.cd_nom == cte_taxa_area_with_sensitivity.c.cd_nom,
cte_taxa_area_with_status.c.id_area == cte_taxa_area_with_sensitivity.c.id_area,
),
)
.order_by(cte_taxa_area_with_status.c.cd_nom)
.limit(1)
).first()

sensitivity_rule = db.session.scalars(
sa.select(SensitivityRule)
.where(
Expand All @@ -582,16 +550,16 @@ def synthese_sensitive_data(app, users, datasets, source):

# Add a criteria to the sensitivity rule if needed
id_nomenclature_bio_status = None
id_type_nomenclature_bio_status = (
BibNomenclaturesTypes.query.filter(BibNomenclaturesTypes.mnemonique == "STATUT_BIO")
.one()
.id_type
id_type_nomenclature_bio_status = db.session.scalar(
sa.select(BibNomenclaturesTypes.id_type).where(
BibNomenclaturesTypes.mnemonique == "STATUT_BIO"
)
)
id_nomenclature_behaviour = None
id_type_nomenclature_behaviour = (
BibNomenclaturesTypes.query.filter(BibNomenclaturesTypes.mnemonique == "OCC_COMPORTEMENT")
.one()
.id_type
id_type_nomenclature_behaviour = db.session.scalar(
sa.select(BibNomenclaturesTypes.id_type).where(
BibNomenclaturesTypes.mnemonique == "OCC_COMPORTEMENT"
)
)

# Get one criteria for the sensitivity rule if needed
Expand Down Expand Up @@ -652,7 +620,7 @@ def synthese_sensitive_data(app, users, datasets, source):
),
]:
geom = point
taxon = Taxref.query.filter_by(cd_nom=cd_nom).one()
taxon = db.session.execute(select(Taxref).filter_by(cd_nom=cd_nom)).scalar_one()
kwargs = {}
if id_nomenclature_bio_status:
kwargs["id_nomenclature_bio_status"] = id_nomenclature_bio_status
Expand All @@ -666,13 +634,17 @@ def synthese_sensitive_data(app, users, datasets, source):

# Assert that obs_sensitive_protected is a sensitive observation

synthese_to_assert = Synthese.query.filter(
Synthese.id_synthese.in_(
(
data[key].id_synthese
for key in ["obs_sensitive_protected", "obs_sensitive_protected_2"]
synthese_to_assert = db.session.scalars(
sa.select(Synthese)
.where(
Synthese.id_synthese.in_(
(
data[key].id_synthese
for key in ["obs_sensitive_protected", "obs_sensitive_protected_2"]
)
)
)
.limit(1)
).first()

assert synthese_to_assert.id_nomenclature_sensitivity != id_nomenclature_not_sensitive, (
Expand All @@ -684,20 +656,19 @@ def synthese_sensitive_data(app, users, datasets, source):

# Assert that obs_protected_not_sensitive is not a sensitive observation
assert (
Synthese.query.filter(
Synthese.id_synthese == data["obs_protected_not_sensitive"].id_synthese
db.session.scalar(
select(Synthese.id_nomenclature_sensitivity)
.where(Synthese.id_synthese == data["obs_protected_not_sensitive"].id_synthese)
.limit(1)
)
.limit(1)
.first()
.id_nomenclature_sensitivity
== id_nomenclature_not_sensitive
)
assert data["obs_sensitive_protected"].nomenclature_sensitivity.cd_nomenclature == "2"
assert data["obs_sensitive_protected_2"].nomenclature_sensitivity.cd_nomenclature == "2"
assert data["obs_protected_not_sensitive"].nomenclature_sensitivity.cd_nomenclature == "0"

for s in data.values():
assert db.session.execute(
assert db.session.scalar(
sa.exists(
sa.select(sa.literal(1))
.select_from(TaxrefBdcStatutTaxon)
Expand All @@ -724,7 +695,7 @@ def synthese_sensitive_data(app, users, datasets, source):
.where(TaxrefBdcStatutTaxon.cd_nom == s.cd_nom)
.where(TaxrefBdcStatutText.enable == True)
).select()
).scalar()
)

return data

Expand Down

0 comments on commit 24998c1

Please sign in to comment.