Skip to content

Commit

Permalink
add fixture for protected taxon + relabel label for sensitive data + …
Browse files Browse the repository at this point in the history
…add protected taxon test
  • Loading branch information
jacquesfize committed Feb 16, 2024
1 parent c5a9a44 commit 6138d47
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 168 deletions.
188 changes: 69 additions & 119 deletions backend/geonature/tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

from apptax.taxonomie.models import (
Taxref,
TaxrefBdcStatutCorTextValues,
TaxrefBdcStatutTaxon,
TaxrefBdcStatutText,
bdc_statut_cor_text_area,
)
from geonature import create_app
from geonature.core.gn_commons.models import BibTablesLocation, TMedias, TModules
Expand All @@ -35,7 +32,6 @@
Synthese,
TReport,
TSources,
corAreaSynthese,
)
from geonature.core.sensitivity.models import SensitivityRule, cor_sensitivity_area
from geonature.utils.env import db
Expand All @@ -53,6 +49,7 @@
"acquisition_frameworks",
"synthese_data",
"synthese_sensitive_data",
"synthese_with_protected_status",
"source",
"reports_data",
"medium",
Expand Down Expand Up @@ -557,53 +554,18 @@ def synthese_sensitive_data(app, users, datasets, source):
data = {}

# Retrieve a cd_nom and point that fit both a sensitivity rule and a protection status
sensitive_protected_cd_nom, sensitive_protected_id_area = db.session.execute(
sa.select(SensitivityRule.cd_nom, cor_sensitivity_area.c.id_area)
.select_from(SensitivityRule)
sensitivity_rule = db.session.execute(
sa.select(SensitivityRule)
.join(cor_sensitivity_area, SensitivityRule.id == cor_sensitivity_area.c.id_sensitivity)
.filter(SensitivityRule.active == True)
.join(LAreas, cor_sensitivity_area.c.id_area == LAreas.id_area)
.where(SensitivityRule.active == True)
.limit(1)
).first()

sensitive_area = db.session.execute(
sa.select(LAreas).where(
LAreas.area_type.has(BibAreasTypes.type_code == "DEP"),
LAreas.area_code == "03",
) # Allier
).scalar_one()

sensitive_protected_taxon = db.session.execute(
select(Taxref).filter_by(cd_nom=139)
).scalar_one() # Triton crété

sensitivity_rule = db.session.scalars(
select(SensitivityRule)
.where(
SensitivityRule.cd_nom == sensitive_protected_taxon.cd_nom,
SensitivityRule.areas.any(LAreas.id_area == sensitive_area.id_area),
SensitivityRule.nomenclature_sensitivity.has(
TNomenclatures.cd_nomenclature == "2"
), # diffusion à la maille 10
)
.limit(1)
).first()

assert sensitivity_rule, "Le référentiel de sensibilité ne convient pas aux tests"
assert (
sensitivity_rule.criterias == []
), "Le référentiel de sensibilité ne convient pas aux tests"

sensitivity_rule = db.session.scalars(
sa.select(SensitivityRule)
.where(
SensitivityRule.cd_nom == sensitive_protected_cd_nom,
SensitivityRule.areas.any(LAreas.id_area == sensitive_protected_id_area),
)
.limit(1)
).first()
sensitive_cd_nom, sensitive_id_area = sensitivity_rule.cd_nom, sensitivity_rule.areas[0].id_area

sensitive_protected_area = db.session.scalars(
select(LAreas).where(LAreas.id_area == sensitive_protected_id_area).limit(1)
select(LAreas).where(LAreas.id_area == sensitive_id_area).limit(1)
).first()

# Get one point inside the area : the centroid (assuming the area is convex)
Expand All @@ -628,9 +590,8 @@ def synthese_sensitive_data(app, users, datasets, source):
)

# Get one criteria for the sensitivity rule if needed
list_criterias_for_sensitivity_rule = sensitivity_rule.criterias
if list_criterias_for_sensitivity_rule:
one_criteria_for_sensitive_rule = list_criterias_for_sensitivity_rule[0]
if sensitivity_rule.criterias:
one_criteria_for_sensitive_rule = sensitivity_rule.criterias[0]
id_type_criteria_for_sensitive_rule = one_criteria_for_sensitive_rule.id_type
if id_type_criteria_for_sensitive_rule == id_type_nomenclature_bio_status:
id_nomenclature_bio_status = one_criteria_for_sensitive_rule.id_nomenclature
Expand All @@ -647,37 +608,45 @@ def synthese_sensitive_data(app, users, datasets, source):
) # Ain
).one()

unsensitive_protected_taxon = db.session.execute(
unsensitive_taxon = db.session.execute(
select(Taxref.cd_nom).filter_by(cd_nom=64357)
).scalar_one()

with db.session.begin_nested():
for name, cd_nom, point, ds, comment_description, id_nomenclature_sensitivity in [
obs_metadata = [
(
"obs_sensitive_protected",
sensitive_protected_cd_nom,
"obs_sensitive",
sensitive_cd_nom,
sensitive_protected_point,
datasets["own_dataset"],
"obs_sensitive_protected",
"obs_sensitive",
id_nomenclature_sensitive,
),
(
"obs_protected_not_sensitive",
unsensitive_protected_taxon,
"obs_not_sensitive",
unsensitive_taxon,
unsensitive_area_centroid,
datasets["own_dataset"],
"obs_protected_not_sensitive",
"obs_not_sensitive",
id_nomenclature_not_sensitive,
),
(
"obs_sensitive_protected_2",
sensitive_protected_cd_nom,
"obs_sensitive_2",
sensitive_cd_nom,
sensitive_protected_point,
datasets["associate_2_dataset_sensitive"],
"obs_sensitive_protected_2",
"obs_sensitive_2",
id_nomenclature_sensitive,
),
]:
]
for (
name,
cd_nom,
point,
ds,
comment_description,
id_nomenclature_sensitivity,
) in obs_metadata:
geom = point
taxon = db.session.execute(select(Taxref).filter_by(cd_nom=cd_nom)).scalar_one()
kwargs = {}
Expand All @@ -691,72 +660,53 @@ def synthese_sensitive_data(app, users, datasets, source):
db.session.add(s)
data[name] = s

# Assert that obs_sensitive_protected is a sensitive observation
return data


synthese_to_assert = db.session.scalars(
sa.select(Synthese)
@pytest.fixture()
def synthese_with_protected_status(users, datasets, source):
"""
Generate synthese observations with protected status
Parameters
----------
users : pytest.Fixture
users fixture
datasets : pytest.Fixture
datasets fixture
source : pytest.Fixture
source fixture
Returns
-------
pytest.Fixture
fixture with synthese observations
"""
synthese_element = {}

# Retrieve protected taxon from the bdc_statut_taxons table
protected_taxon = db.session.scalars(
select(Taxref)
.where(
Synthese.id_synthese.in_(
(
data[key].id_synthese
for key in ["obs_sensitive_protected", "obs_sensitive_protected_2"]
)
Taxref.cd_nom.in_(
select(TaxrefBdcStatutTaxon.cd_nom).limit(100),
)
)
.limit(1)
).first()
.distinct(Taxref.cd_ref, Taxref.cd_nom)
.limit(5)
).all()

assert synthese_to_assert.id_nomenclature_sensitivity != id_nomenclature_not_sensitive, (
f"cd_nom: {synthese_to_assert.cd_nom}, id_nomenclature_bio_status: {synthese_to_assert.id_nomenclature_bio_status}, "
f"id_nomenclature_behaviour: {synthese_to_assert.id_nomenclature_behaviour}, "
f"id_nomenclature_sensitivity: {synthese_to_assert.id_nomenclature_sensitivity}, "
f"geojson: {synthese_to_assert.the_geom_4326_geojson}"
)
# dumb geometry
geom = from_shape(Point(5.486786, 42.832182), 4326)

# Assert that obs_protected_not_sensitive is not a sensitive observation
assert (
db.session.scalar(
select(Synthese.id_nomenclature_sensitivity)
.where(Synthese.id_synthese == data["obs_protected_not_sensitive"].id_synthese)
.limit(1)
)
== id_nomenclature_not_sensitive
)
assert data["obs_sensitive_protected"].nomenclature_sensitivity.cd_nomenclature == "2"
assert data["obs_sensitive_protected_2"].nomenclature_sensitivity.cd_nomenclature == "2"
assert data["obs_protected_not_sensitive"].nomenclature_sensitivity.cd_nomenclature == "0"

for s in data.values():
assert db.session.scalar(
sa.exists(
sa.select(sa.literal(1))
.select_from(TaxrefBdcStatutTaxon)
.join(
TaxrefBdcStatutCorTextValues,
TaxrefBdcStatutCorTextValues.id_value_text
== TaxrefBdcStatutTaxon.id_value_text,
)
.join(
TaxrefBdcStatutText,
TaxrefBdcStatutText.id_text == TaxrefBdcStatutCorTextValues.id_text,
)
.join(
bdc_statut_cor_text_area,
bdc_statut_cor_text_area.c.id_text == TaxrefBdcStatutText.id_text,
)
.join(
LAreas,
LAreas.id_area == bdc_statut_cor_text_area.c.id_area,
)
.join(corAreaSynthese, corAreaSynthese.c.id_area == LAreas.id_area)
.join(Synthese, Synthese.id_synthese == corAreaSynthese.c.id_synthese)
.where(Synthese.id_synthese == s.id_synthese)
.where(TaxrefBdcStatutTaxon.cd_nom == s.cd_nom)
.where(TaxrefBdcStatutText.enable == True)
).select()
)
# Generate a Synthese object for each taxon retrieved previously
with db.session.begin_nested():
for taxon in protected_taxon:
synthese = create_synthese(geom, taxon, users["user"], datasets["own_dataset"], source)
db.session.add(synthese)
synthese_element[taxon.cd_nom] = synthese

return data
return synthese_element


def create_media(media_path=""):
Expand Down
Loading

0 comments on commit 6138d47

Please sign in to comment.