Skip to content

Commit

Permalink
Add resolution for Fn::Sub and Fn::FindInMap
Browse files Browse the repository at this point in the history
  • Loading branch information
kddejong committed Nov 2, 2023
1 parent 8d774e1 commit 8ba19e0
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 47 deletions.
12 changes: 6 additions & 6 deletions src/cfnlint/context/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class Context:
mappings: Dict[str, "Map"] = field(init=True, default_factory=dict)

# other Refs from Fn::Sub
ref_values: Dict[str, List[Any]] = field(init=True, default_factory=dict)
ref_values: Dict[str, Any] = field(init=True, default_factory=dict)

# Resolved value
resolved_value: bool = field(init=True, default=False)
Expand All @@ -86,9 +86,9 @@ def __post_init__(self) -> None:
if self.path is None:
self.path = deque([])
for pseudo_parameter in PSEUDOPARAMS:
self.ref_values[pseudo_parameter] = [
_get_pseudo_value(pseudo_parameter, self.region)
]
self.ref_values[pseudo_parameter] = _get_pseudo_value(
pseudo_parameter, self.region
)

def evolve(self, **kwargs) -> "Context":
"""
Expand Down Expand Up @@ -233,7 +233,7 @@ class _MappingSecondaryKey:
This class holds a mapping value
"""

keys: Dict[str, List[Any] | str] = field(init=False)
keys: Dict[str, List[Any] | str] = field(init=False, default_factory=dict)
instance: InitVar[Any]

def __post_init__(self, instance) -> None:
Expand All @@ -255,7 +255,7 @@ class Map:
This class holds a mapping
"""

keys: Dict[str, _MappingSecondaryKey] = field(init=False)
keys: Dict[str, _MappingSecondaryKey] = field(init=False, default_factory=dict)
resource: InitVar[Any]

def __post_init__(self, mapping) -> None:
Expand Down
32 changes: 2 additions & 30 deletions src/cfnlint/jsonschema/_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@

# Code is taken from jsonschema package and adapted CloudFormation use
# https://github.com/python-jsonschema/jsonschema
from typing import Any, Dict, Iterator, Sequence, Tuple
from typing import Any, Sequence, Tuple

import regex as re

from cfnlint.decode.node import dict_node
from cfnlint.helpers import REGEX_SUB_PARAMETERS, ToPy
from cfnlint.helpers import ToPy
from cfnlint.jsonschema._utils import ensure_list

_all_types = ["array", "boolean", "integer", "number", "object", "string"]
Expand Down Expand Up @@ -82,21 +79,6 @@ def _filter_schemas(self, schema, validator: Any) -> Tuple[Any, Any]:

return standard_schema, group_schema

def _cleanse_object(
self, validator: Any, instance: Dict[str, Any]
) -> Iterator[Tuple[str, Any]]:
if validator.context.transforms.has_language_extensions_transform():
for k, v in instance.items():
for p in re.findall(REGEX_SUB_PARAMETERS, k):
if p in validator.context.ref_values:
k = re.sub(
REGEX_SUB_PARAMETERS, validator.context.ref_values[p], k
)
else:
yield (k, v)
else:
yield from instance.items()

def filter(self, validator: Any, instance: Any, schema: Any):
# dependencies, required, minProperties, maxProperties
# need to have filtered properties to validate
Expand All @@ -110,16 +92,6 @@ def filter(self, validator: Any, instance: Any, schema: Any):
yield (scenario.get("Object"), group_schema)

if validator.is_type(instance, "object"):
if hasattr(instance, "start_mark"):
start_mark = instance.start_mark
end_mark = instance.end_mark
instance = dict_node(
dict(self._cleanse_object(validator, instance)),
start_mark=start_mark,
end_mark=end_mark,
)
else:
instance = dict(self._cleanse_object(validator, instance))
if len(instance) == 1:
for k, v in instance.items():
if k in validator.context.functions:
Expand Down
76 changes: 72 additions & 4 deletions src/cfnlint/jsonschema/_resolvers_cfn.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
from __future__ import annotations

import json
from typing import Any, Dict, Iterator

import regex as re

from cfnlint.helpers import AVAILABILITY_ZONES
from cfnlint.jsonschema import Validator

Expand All @@ -20,7 +28,7 @@ def ref(validator: Validator, instance: Any) -> Iterator[Any]:
if instance in validator.context.ref_values:
# Ref: AWS::NoValue returns None making this fail
if validator.context.ref_values[instance] is not None:
yield from iter(validator.context.ref_values[instance])
yield validator.context.ref_values[instance]
return
if instance in validator.context.parameters:
if validator.context.parameters[instance].allowed_values:
Expand Down Expand Up @@ -49,7 +57,7 @@ def find_in_map(validator: Validator, instance: Any) -> Iterator[Any]:
if not validator.is_type(second_level_key, "string"):
continue
try:
yield validator.context.mappings[map_name].find_in_map(
yield from validator.context.mappings[map_name].find_in_map(
top_level_key,
second_level_key,
)
Expand Down Expand Up @@ -150,9 +158,69 @@ def split(validator: Validator, instance: Any) -> Iterator[Any]:
yield source_string.split(delimiter)


def _sub_parameter_expansion(
validator: Validator, parameters: Dict[str, Any]
) -> Iterator[Dict[str, Any]]:
parameters = parameters.copy()
if len(parameters) == 0:
return

Check warning on line 166 in src/cfnlint/jsonschema/_resolvers_cfn.py

View check run for this annotation

Codecov / codecov/patch

src/cfnlint/jsonschema/_resolvers_cfn.py#L166

Added line #L166 was not covered by tests

if len(parameters) == 1:
for key, value in parameters.items():
for resolved_value in validator.resolve_value(value):
yield {key: resolved_value}
return

key = list(parameters.keys())[0]
value = parameters.pop(key)
for resolved_value in validator.resolve_value(value):
for values in _sub_parameter_expansion(validator, parameters):
yield dict({key: resolved_value}, **values)

Check warning on line 178 in src/cfnlint/jsonschema/_resolvers_cfn.py

View check run for this annotation

Codecov / codecov/patch

src/cfnlint/jsonschema/_resolvers_cfn.py#L178

Added line #L178 was not covered by tests


def _sub_string(validator: Validator, string: str) -> Iterator[str]:
sub_regex = re.compile(r"(\${([^!].*?)})")

def _replace(matchobj):
if matchobj.group(2) in validator.context.ref_values:
return validator.context.ref_values[matchobj.group(2)]
raise ValueError(f"No matches for {matchobj.group(2)}")

try:
yield re.sub(sub_regex, _replace, string)
except ValueError:
return


def sub(validator: Validator, instance: Any) -> Iterator[Any]:
return
yield
if not (
validator.is_type(instance, "array") or validator.is_type(instance, "string")
):
return

Check warning on line 199 in src/cfnlint/jsonschema/_resolvers_cfn.py

View check run for this annotation

Codecov / codecov/patch

src/cfnlint/jsonschema/_resolvers_cfn.py#L199

Added line #L199 was not covered by tests

if validator.is_type(instance, "array"):
if len(instance) != 2:
return

Check warning on line 203 in src/cfnlint/jsonschema/_resolvers_cfn.py

View check run for this annotation

Codecov / codecov/patch

src/cfnlint/jsonschema/_resolvers_cfn.py#L203

Added line #L203 was not covered by tests

string = instance[0]
parameters = instance[1]

if not validator.is_type(parameters, "object"):
return

Check warning on line 209 in src/cfnlint/jsonschema/_resolvers_cfn.py

View check run for this annotation

Codecov / codecov/patch

src/cfnlint/jsonschema/_resolvers_cfn.py#L209

Added line #L209 was not covered by tests

for resolved_parameters in _sub_parameter_expansion(validator, parameters):
resolved_validator = validator.evolve(
context=validator.context.evolve(
ref_values=resolved_parameters,
)
)
for resolved_string in _sub_string(resolved_validator, string):
yield resolved_string

return

# its a string
yield from _sub_string(validator, instance)


def to_json_string(validator: Validator, instance: Any) -> Iterator[Any]:
Expand Down
22 changes: 18 additions & 4 deletions src/cfnlint/jsonschema/_validators_cfn.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,10 @@ def find_in_map(
self._max_length = 4
self.items.append(_FindInMapDefault())

for err in self.iter_errors(validator, s, instance, schema):
yield err
errs = self.iter_errors(validator, s, instance, schema)
if errs:
yield from iter(errs)
return

for value in validator.resolve_value(instance):
for err in validator.descend(value, s):
Expand Down Expand Up @@ -781,14 +783,26 @@ def sub(
if errs > 0:
return

validator = validator.evolve(
validator_string = validator.evolve(
context=validator.context.evolve(
ref_values=dict.fromkeys(keys, Parameter({"Type": "String"})),
)
)
value = value[0]
else:
validator_string = validator

errors = list(self._validate_string(validator_string, value))
if errors:
yield from iter(errors)
return

yield from self._validate_string(validator, value)
for value in validator.resolve_value(instance):
for err in validator.descend(value, s):
err.path.appendleft(self.fn.name)
err.message = err.message.replace(f"{value!r}", f"{instance!r}")
err.message = f"{err.message} when {self.fn.name!r} is resolved"
yield err

Check warning on line 805 in src/cfnlint/jsonschema/_validators_cfn.py

View check run for this annotation

Codecov / codecov/patch

src/cfnlint/jsonschema/_validators_cfn.py#L802-L805

Added lines #L802 - L805 were not covered by tests


class FnCidr(FnArray):
Expand Down
6 changes: 3 additions & 3 deletions test/unit/rules/resources/iam/test_identity_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ def test_object_statements(self):
],
"Resource": [
{
"Fn::Sub": "arn:${AWS::Partition}:iam::123456789012/role/object-role"
"Fn::Sub": "arn:${AWS::Partition}:iam::123456789012:role/object-role"
},
{
"NotValid": [
"arn:${AWS::Partition}:iam::123456789012/role/object-role"
"arn:${AWS::Partition}:iam::123456789012:role/object-role"
]
},
],
Expand All @@ -110,7 +110,7 @@ def test_object_statements(self):
self.assertListEqual(list(errs[0].path), ["Statement", 0, "Effect"])
self.assertEqual(
errs[1].message,
"{'NotValid': ['arn:${AWS::Partition}:iam::123456789012/role/object-role']} is not of type 'string'",
"{'NotValid': ['arn:${AWS::Partition}:iam::123456789012:role/object-role']} is not of type 'string'",
)
self.assertListEqual(list(errs[1].path), ["Statement", 0, "Resource", 1])

Expand Down

0 comments on commit 8ba19e0

Please sign in to comment.