Skip to content

Commit

Permalink
Merge pull request #626 from bcgov/feature/variableSubstitution
Browse files Browse the repository at this point in the history
Add variable substitution for proof configs
  • Loading branch information
loneil authored Sep 11, 2024
2 parents 7c8de01 + c1b967f commit a44ce58
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 4 deletions.
16 changes: 15 additions & 1 deletion oidc-controller/api/routers/oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
from ..routers.socketio import connections_reload, sio

from ..verificationConfigs.crud import VerificationConfigCRUD
from ..verificationConfigs.helpers import VariableSubstitutionError


ChallengePollUri = "/poll"
AuthorizeCallbackUri = "/callback"
Expand Down Expand Up @@ -125,7 +127,19 @@ async def get_authorize(request: Request, db: Database = Depends(get_db)):
ver_config = await VerificationConfigCRUD(db).get(ver_config_id)

# Create presentation_request to show on screen
response = client.create_presentation_request(ver_config.generate_proof_request())
try:
response = client.create_presentation_request(
ver_config.generate_proof_request()
)
except VariableSubstitutionError as e:
return JSONResponse(
status_code=http_status.HTTP_400_BAD_REQUEST,
content={
"detail": f"Variable substitution error: \
'{e.variable_name}' not found in substitution map."
},
)

pres_exch_dict = response.dict()

# Prepeare the presentation request
Expand Down
43 changes: 43 additions & 0 deletions oidc-controller/api/verificationConfigs/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from .variableSubstitutions import variable_substitution_map


class VariableSubstitutionError(Exception):
"""Custom exception for if a variable is used that does not exist."""

def __init__(self, variable_name: str):
self.variable_name = variable_name
super().__init__(f"Variable '{variable_name}' not found in substitution map.")


def replace_proof_variables(proof_req_dict: dict) -> dict:
"""
Recursively replaces variables in the proof request with actual values.
The map is provided by imported variable_substitution_map.
Additional variables can be added to the map in the variableSubstitutions.py file,
or other dynamic functionality.
Args:
proof_req_dict (dict): The proof request dictionary from the resolved config.
Returns:
dict: The updated proof request dictionary with placeholder variables replaced.
"""

for k, v in proof_req_dict.items():
# If the value is a dictionary, recurse
if isinstance(v, dict):
replace_proof_variables(v)
# If the value is a list, iterate trhough list items and recurse
elif isinstance(v, list):
for i in v:
if isinstance(i, dict):
replace_proof_variables(i)
# If the value is a string and matches a key in the map, replace it
elif isinstance(v, str) and v.startswith("$"):
if v in variable_substitution_map:
proof_req_dict[k] = variable_substitution_map[v]()
else:
raise VariableSubstitutionError(v)

# Base case: If the value is not a dict, list, or matching string, do nothing
return proof_req_dict
11 changes: 8 additions & 3 deletions oidc-controller/api/verificationConfigs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .examples import ex_ver_config
from ..core.config import settings
from .helpers import replace_proof_variables


# Slightly modified from ACAPY models.
Expand Down Expand Up @@ -44,6 +45,9 @@ class VerificationConfigBase(BaseModel):
generate_consistent_identifier: Optional[bool] = Field(default=False)
include_v1_attributes: Optional[bool] = Field(default=False)

def get_now(self) -> int:
return int(time.time())

def generate_proof_request(self):
result = {
"name": "proof_requested",
Expand All @@ -61,15 +65,16 @@ def generate_proof_request(self):
"from": int(time.time()),
"to": int(time.time()),
}
# TODO add I indexing
for req_pred in self.proof_request.requested_predicates:
for i, req_pred in enumerate(self.proof_request.requested_predicates):
label = req_pred.label or "req_pred_" + str(i)
result["requested_predicates"][label] = req_pred.dict(exclude_none=True)
if settings.SET_NON_REVOKED:
result["requested_attributes"][label]["non_revoked"] = {
result["requested_predicates"][label]["non_revoked"] = {
"from": int(time.time()),
"to": int(time.time()),
}
# Recursively check for subistitution variables and invoke replacement function
result = replace_proof_variables(result)
return result

model_config = ConfigDict(json_schema_extra={"example": ex_ver_config})
Expand Down
66 changes: 66 additions & 0 deletions oidc-controller/api/verificationConfigs/tests/test_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import pytest
from unittest.mock import patch
from api.verificationConfigs.helpers import (
replace_proof_variables,
VariableSubstitutionError,
)

# Mock variable substitution map
mock_variable_substitution_map = {"$var1": lambda: "value1", "$var2": lambda: "value2"}


@patch(
"api.verificationConfigs.helpers.variable_substitution_map",
mock_variable_substitution_map,
)
def test_replace_proof_variables_empty_dict():
assert replace_proof_variables({}) == {}


@patch(
"api.verificationConfigs.helpers.variable_substitution_map",
mock_variable_substitution_map,
)
def test_replace_proof_variables_no_variables():
input_dict = {"key1": "value1", "key2": "value2"}
assert replace_proof_variables(input_dict) == input_dict


@patch(
"api.verificationConfigs.helpers.variable_substitution_map",
mock_variable_substitution_map,
)
def test_replace_proof_variables_with_variables():
input_dict = {"key1": "$var1", "key2": "$var2"}
expected_dict = {"key1": "value1", "key2": "value2"}
assert replace_proof_variables(input_dict) == expected_dict


@patch(
"api.verificationConfigs.helpers.variable_substitution_map",
mock_variable_substitution_map,
)
def test_replace_proof_variables_variable_not_found():
input_dict = {"key1": "$var3"}
with pytest.raises(VariableSubstitutionError):
replace_proof_variables(input_dict)


@patch(
"api.verificationConfigs.helpers.variable_substitution_map",
mock_variable_substitution_map,
)
def test_replace_proof_variables_nested_dict():
input_dict = {"key1": {"key2": "$var1"}}
expected_dict = {"key1": {"key2": "value1"}}
assert replace_proof_variables(input_dict) == expected_dict


@patch(
"api.verificationConfigs.helpers.variable_substitution_map",
mock_variable_substitution_map,
)
def test_replace_proof_variables_list():
input_dict = {"key1": [{"key2": "$var2"}]}
expected_dict = {"key1": [{"key2": "value2"}]}
assert replace_proof_variables(input_dict) == expected_dict
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import time
import pytest
from datetime import datetime, timedelta
from api.verificationConfigs.variableSubstitutions import VariableSubstitutionMap


def test_get_now():
vsm = VariableSubstitutionMap()
assert abs(vsm.get_now() - int(time.time())) < 2 # Allowing a small time difference


def test_get_today_date():
vsm = VariableSubstitutionMap()
assert vsm.get_today_date() == datetime.today().strftime("%Y%m%d")


def test_get_tomorrow_date():
vsm = VariableSubstitutionMap()
assert vsm.get_tomorrow_date() == (datetime.today() + timedelta(days=1)).strftime(
"%Y%m%d"
)


def test_get_threshold_years_date():
vsm = VariableSubstitutionMap()
years = 5
expected_date = (
datetime.today().replace(year=datetime.today().year - years).strftime("%Y%m%d")
)
assert vsm.get_threshold_years_date(years) == int(expected_date)


def test_contains_static_variable():
vsm = VariableSubstitutionMap()
assert "$now" in vsm
assert "$today_str" in vsm
assert "$tomorrow_str" in vsm


def test_contains_dynamic_variable():
vsm = VariableSubstitutionMap()
assert "$threshold_years_5" in vsm


def test_getitem_static_variable():
vsm = VariableSubstitutionMap()
assert callable(vsm["$now"])
assert callable(vsm["$today_str"])
assert callable(vsm["$tomorrow_str"])


def test_getitem_dynamic_variable():
vsm = VariableSubstitutionMap()
assert callable(vsm["$threshold_years_5"])


def test_getitem_key_error():
vsm = VariableSubstitutionMap()
with pytest.raises(KeyError):
vsm["$non_existent_key"]
78 changes: 78 additions & 0 deletions oidc-controller/api/verificationConfigs/variableSubstitutions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""
This file contains VariableSubstitutionMap class, which provides a mapping of
static variables that can be used in a proof.
Other users of this project can add their own variable substitutions or override
the entire file to suit their own needs.
"""

from datetime import datetime, timedelta
import time
import re


class VariableSubstitutionMap:
def __init__(self):
# Map of static variables that can be used in a proof
# This class defines threshold_years_X as a dynamic one
self.static_map = {
"$now": self.get_now,
"$today_str": self.get_today_date,
"$tomorrow_str": self.get_tomorrow_date,
}

def get_threshold_years_date(self, years: int) -> int:
"""
Calculate the threshold date for a given number of years.
Args:
years (int): The number of years to subtract from the current year.
Returns:
int: The current date minux X years in YYYYMMDD format.
"""
threshold_date = datetime.today().replace(year=datetime.today().year - years)
return int(threshold_date.strftime("%Y%m%d"))

def get_now(self) -> int:
"""
Get the current timestamp.
Returns:
int: The current timestamp in seconds since the epoch.
"""
return int(time.time())

def get_today_date(self) -> str:
"""
Get today's date in YYYYMMDD format.
Returns:
str: Today's date in YYYYMMDD format.
"""
return datetime.today().strftime("%Y%m%d")

def get_tomorrow_date(self) -> str:
"""
Get tomorrow's date in YYYYMMDD format.
Returns:
str: Tomorrow's date in YYYYMMDD format.
"""
return (datetime.today() + timedelta(days=1)).strftime("%Y%m%d")

# For "dynamic" variables, use a regex to match the key and return a lambda function
# So a proof request can use $threshold_years_X to get the years back for X years
def __contains__(self, key: str) -> bool:
return key in self.static_map or re.match(r"\$threshold_years_(\d+)", key)

def __getitem__(self, key: str):
if key in self.static_map:
return self.static_map[key]
match = re.match(r"\$threshold_years_(\d+)", key)
if match:
return lambda: self.get_threshold_years_date(int(match.group(1)))
raise KeyError(f"Key {key} not found in format_args_function_map")


# Create an instance of the custom mapping class
variable_substitution_map = VariableSubstitutionMap()

0 comments on commit a44ce58

Please sign in to comment.