Skip to content

Commit

Permalink
Allow for all getatts
Browse files Browse the repository at this point in the history
  • Loading branch information
kddejong committed Jan 10, 2024
1 parent a2e5fe7 commit cacbc08
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 55 deletions.
9 changes: 5 additions & 4 deletions src/cfnlint/rules/functions/Join.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from cfnlint.helpers import VALID_PARAMETER_TYPES_LIST
from cfnlint.rules import CloudFormationLintRule, RuleMatch
from cfnlint.template import GetAtts, Template


class Join(CloudFormationLintRule):
Expand Down Expand Up @@ -101,19 +102,19 @@ def _is_ref_a_list(self, parameter, template_parameters):
return True
return False

def _is_getatt_a_list(self, parameter, get_atts) -> Union[bool, None]:
def _is_getatt_a_list(self, parameter, get_atts: GetAtts) -> Union[bool, None]:
"""Is a GetAtt a List"""
try:
getatt = get_atts.match("us-east-1", parameter)
if getatt.get("type") == "array" or not getatt:
if getatt.type == "array" or getatt.type is None:
return True
return False
except: # pylint: disable=bare-except
# this means we can't match the get_att. This is
# covered by another rule
return None

def _match_string_objs(self, join_string_objs, cfn, path):
def _match_string_objs(self, join_string_objs, cfn: Template, path):
"""Check join list"""

matches = []
Expand Down Expand Up @@ -202,7 +203,7 @@ def _match_string_objs(self, join_string_objs, cfn, path):

return matches

def match(self, cfn):
def match(self, cfn: Template):
matches = []

join_objs = cfn.search_deep_keys("Fn::Join")
Expand Down
5 changes: 3 additions & 2 deletions src/cfnlint/rules/functions/Sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
from cfnlint.helpers import PSEUDOPARAMS, VALID_PARAMETER_TYPES_LIST
from cfnlint.rules import CloudFormationLintRule, RuleMatch
from cfnlint.template import Template


class Sub(CloudFormationLintRule):
Expand Down Expand Up @@ -107,7 +108,7 @@ def _test_parameters(self, parameters, cfn, tree):

return matches

def _test_parameter(self, parameter, cfn, parameters, tree):
def _test_parameter(self, parameter, cfn: Template, parameters, tree):
"""Test a parameter"""

matches = []
Expand All @@ -134,7 +135,7 @@ def _test_parameter(self, parameter, cfn, parameters, tree):
if not found:
try:
d = get_atts.match(cfn.regions[0], parameter)
if d.get("type") == "array":
if d.type == "array":
message = "Fn::Sub cannot use list {0} at {1}"
matches.append(
RuleMatch(
Expand Down
3 changes: 1 addition & 2 deletions src/cfnlint/rules/outputs/Value.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ def match(self, cfn: Template):
attribute = res_schema.get_atts().get(obj[1])
# Bad schema or bad attribute. Skip if either is true
if attribute:
attribute_type = attribute.get("type")
if attribute_type == "array":
if attribute.type == "array":
if getatt[-4] != "Fn::Join" and getatt[-3] != 1:
message = "Output {0} value {1} is of type list"
matches.append(
Expand Down
1 change: 1 addition & 0 deletions src/cfnlint/schema/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from cfnlint.schema.exceptions import ResourceNotFoundError
from cfnlint.schema.getatts import GetAtt, GetAttType
from cfnlint.schema.manager import PROVIDER_SCHEMA_MANAGER
from cfnlint.schema.schema import Schema
33 changes: 33 additions & 0 deletions src/cfnlint/schema/_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
_all_property_types = [
"AWS::Amplify::Branch",
"AWS::Amplify::Domain",
"AWS::AppSync::DomainName",
"AWS::Backup::BackupSelection",
"AWS::Backup::BackupVault",
"AWS::CodeArtifact::Domain",
"AWS::CodeArtifact::Repository",
"AWS::EC2::CapacityReservation",
"AWS::EC2::Subnet",
"AWS::EC2::VPC",
"AWS::EFS::MountTarget",
"AWS::EKS::Nodegroup",
"AWS::ImageBuilder::Component",
"AWS::ImageBuilder::ContainerRecipe",
"AWS::ImageBuilder::DistributionConfiguration",
"AWS::ImageBuilder::ImagePipeline",
"AWS::ImageBuilder::ImageRecipe",
"AWS::ImageBuilder::InfrastructureConfiguration",
"AWS::RDS::DBParameterGroup",
"AWS::RoboMaker::RobotApplication",
"AWS::RoboMaker::SimulationApplication",
"AWS::Route53Resolver::ResolverRule",
"AWS::Route53Resolver::ResolverRuleAssociation",
"AWS::SNS::Topic",
"AWS::SQS::Queue",
"AWS::SageMaker::DataQualityJobDefinition",
"AWS::SageMaker::ModelBiasJobDefinition",
"AWS::SageMaker::ModelExplainabilityJobDefinition",
"AWS::SageMaker::ModelQualityJobDefinition",
"AWS::SageMaker::MonitoringSchedule",
"AWS::StepFunctions::Activity",
]
86 changes: 86 additions & 0 deletions src/cfnlint/schema/getatts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import enum
from typing import Any, Dict, Optional

from cfnlint.schema._constants import _all_property_types
from cfnlint.schema._pointer import resolve_pointer


class GetAttType(enum.Enum):
ReadOnly = 1
All = 2


class GetAtt:
def __init__(self, schema: Dict[str, Any], getatt_type: GetAttType) -> None:
self._getatt_type: GetAttType = getatt_type
self._type: str = schema.get("type")
self._item_type: Optional[str]
if self._type == "array":
self._item_type: Optional[str] = schema.get("items", {}).get("type")

@property
def type(self) -> str:
return self._type

@property
def item_type(self) -> Optional[str]:
return self._item_type

@property
def getatt_type(self) -> GetAttType:
return self._getatt_type


class GetAtts:
"""Class for helping with GetAtt logic"""

_attrs: Dict[str, GetAtt] = {}
_schema = {}

def __init__(self, schema: Dict[str, Any]) -> None:
self._attrs = {}
self._schema = schema
type_name = schema.get("typeName", "")
if type_name in _all_property_types:
for name, value in schema.get("properties", {}).items():
self._process_schema(name, value, GetAttType.All)
return
for ro_attr in schema.get("readOnlyProperties", []):
try:
self._process_schema_by_pointer(ro_attr, GetAttType.ReadOnly)
except KeyError:
pass

def _process_schema_by_pointer(self, ptr: str, getatt_type):
name = ".".join(ptr.split("/")[2:])
schema = resolve_pointer(self._schema, ptr)
self._process_schema(name, schema, getatt_type)

def _process_schema(
self, name: str, schema: Dict[str, Any], getatt_type: GetAttType
):
if "$ref" in schema:
self._process_schema_by_pointer(schema.get("$ref"), getatt_type)
elif schema.get("type") == "object":
for prop, value in schema.get("properties", {}).items():
self._process_schema(f"{name}.{prop}", value, getatt_type)
elif schema.get("type") == "array":
# GetAtt doesn't support going into an array of objects or another array
# so we only look at an array of strings, etc.
if schema.get("items", {}).get("type") in [
"string",
"integer",
"number",
"boolean",
]:
self._attrs[name] = GetAtt(schema, getatt_type)
else:
self._attrs[name] = GetAtt(schema, getatt_type)

@property
def attrs(self) -> Dict[str, GetAtt]:
return self._attrs
3 changes: 2 additions & 1 deletion src/cfnlint/schema/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
url_has_newer_version,
)
from cfnlint.schema.exceptions import ResourceNotFoundError
from cfnlint.schema.getatts import GetAtt
from cfnlint.schema.patch import SchemaPatch
from cfnlint.schema.schema import Schema

Expand Down Expand Up @@ -345,7 +346,7 @@ def _patch(self, patch: SchemaPatch, region: str) -> None:
continue
schema.patch(patches=patches)

def get_type_getatts(self, resource_type: str, region: str) -> Dict[str, Dict]:
def get_type_getatts(self, resource_type: str, region: str) -> Dict[str, GetAtt]:
"""Get the GetAtts for a type in a region
Args:
Expand Down
42 changes: 6 additions & 36 deletions src/cfnlint/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
SPDX-License-Identifier: MIT-0
"""
from copy import deepcopy
from typing import Dict, List, Tuple
from typing import Dict, List

import jsonpatch

from cfnlint.schema._pointer import resolve_pointer
from cfnlint.schema.getatts import GetAtt, GetAtts

# Can't use a dataclass because its hard to parse in json
# with optional fields without addtional help
Expand All @@ -20,6 +20,7 @@ def __init__(self, schema) -> None:
self.schema = deepcopy(schema)
self._json_schema = self._cleanse_schema(schema=schema)
self.type_name = schema["typeName"]
self._getatts = GetAtts(self.schema)

def _cleanse_schema(self, schema) -> Dict:
for ro_prop in schema.get("readOnlyProperties", []):
Expand All @@ -43,30 +44,6 @@ def json_schema(self) -> Dict:
"""
return self._json_schema

def _flatten_getatts(self, ptr: str) -> Tuple[Dict[str, Dict], bool]:
getatts = {}
name = ".".join(ptr.split("/")[2:])
obj = resolve_pointer(self.schema, ptr)
is_object = False
if "$ref" in obj:
subs, sub_is_object = self._flatten_getatts(obj.get("$ref"))
for sub_name, sub in subs.items():
cp = deepcopy(obj)
del cp["$ref"]
cp.update(sub)
if sub_is_object:
getatts[f"{name}.{sub_name}"] = cp
else:
getatts[name] = cp
elif obj.get("type") == "object":
is_object = True
for prop, value in obj.get("properties", {}).items():
getatts[prop] = value
else:
getatts[name] = obj

return getatts, is_object

def patch(self, patches: List[Dict]) -> None:
"""Patches the schema file
Expand All @@ -77,20 +54,13 @@ def patch(self, patches: List[Dict]) -> None:
"""
jsonpatch.JsonPatch(patches).apply(self._json_schema, in_place=True)

def get_atts(self) -> Dict[str, dict]:
def get_atts(self) -> Dict[str, GetAtt]:
"""Get the valid GetAtts for this schema. Schemas are defined in property
readOnlyProperties and we need to build definitions of those properties
Args:
Returns:
Dict[str, dict]: Dict of keys with valid strings and the json schema
Dict[str, GetAtt]: Dict of keys with valid strings and the json schema
object for the property
"""
attrs = {}
for ro_attr in self.schema.get("readOnlyProperties", []):
try:
attrs.update(self._flatten_getatts(ro_attr)[0].items())
except KeyError:
pass

return attrs
return self._getatts.attrs
30 changes: 20 additions & 10 deletions src/cfnlint/template/getatts.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
from typing import Dict, Iterable, List, Optional, Tuple, Union

from cfnlint.helpers import RegexDict
from cfnlint.schema.manager import PROVIDER_SCHEMA_MANAGER, ResourceNotFoundError
from cfnlint.schema import (
PROVIDER_SCHEMA_MANAGER,
GetAtt,
GetAttType,
ResourceNotFoundError,
)


class GetAtts:
# [region][resource_name][attribute][JsonSchema Dict]
_getatts: Dict[str, Dict[str, RegexDict]]
# Dict[str, RegexDict[str, RegexDict[str, GetAtt]]]
_getatts: Dict[str, Dict[str, Dict[str, GetAtt]]]

_astrik_string_types = ("AWS::CloudFormation::Stack",)
_astrik_unknown_types = (
Expand All @@ -19,24 +25,28 @@ def __init__(self, regions: List[str]) -> None:
self._regions = regions
self._getatts = {}
for region in self._regions:
self._getatts[region] = {}
self._getatts[region] = RegexDict()

def add(self, resource_name: str, resource_type: str) -> None:
for region in self._regions:
if resource_name not in self._getatts[region]:
if resource_type.endswith("::MODULE"):
self._getatts[region][f"{resource_name}.*"] = RegexDict()
self._getatts[region][f"{resource_name}.*"][".*"] = {}
self._getatts[region][f"{resource_name}.*"][".*"] = GetAtt(
schema={}, getatt_type=GetAttType.ReadOnly
)
continue

self._getatts[region][resource_name] = RegexDict()

if resource_type.startswith(self._astrik_string_types):
self._getatts[region][resource_name]["Outputs..*"] = {
"type": "string"
}
self._getatts[region][resource_name]["Outputs\\..*"] = GetAtt(
schema={"type": "string"}, getatt_type=GetAttType.ReadOnly
)
elif resource_type.startswith(self._astrik_unknown_types):
self._getatts[region][resource_name][".*"] = {}
self._getatts[region][resource_name][".*"] = GetAtt(
schema={}, getatt_type=GetAttType.ReadOnly
)
else:
try:
for (
Expand Down Expand Up @@ -109,7 +119,7 @@ def json_schema(self, region: str) -> Dict:
schema["oneOf"].append(schema_strings)
return schema

def match(self, region: str, getatt: Union[str, List[str]]) -> Dict:
def match(self, region: str, getatt: Union[str, List[str]]) -> GetAtt:
if isinstance(getatt, str):
getatt = getatt.split(".", 1)

Expand All @@ -128,7 +138,7 @@ def match(self, region: str, getatt: Union[str, List[str]]) -> Dict:
else:
raise TypeError("Invalid GetAtt structure")

def items(self, region: Optional[str] = None) -> Iterable[Tuple[str, Dict]]:
def items(self, region: Optional[str] = None) -> Iterable[Tuple[str, GetAtt]]:
if region is None:
region = self._regions[0]
for k, v in self._getatts.get(region, {}).items():
Expand Down

0 comments on commit cacbc08

Please sign in to comment.