From e7c926945af160fca33b273912b28628edfa1472 Mon Sep 17 00:00:00 2001
From: Nicola Sirena
Date: Wed, 21 Jun 2023 11:09:55 +0200
Subject: [PATCH 1/5] Add classes for InstanceRequirements definition

Add InstanceRequirementsDefinition class to hold all the attributes
required to define a ComputeResource.
Add an InstanceRequirementsComputeResource class to model a
ComputeResource defined through InstanceRequirements.

Signed-off-by: Nicola Sirena
---
 cli/src/pcluster/config/cluster_config.py | 78 +++++++++++++++++++++++
 1 file changed, 78 insertions(+)

diff --git a/cli/src/pcluster/config/cluster_config.py b/cli/src/pcluster/config/cluster_config.py
index 3675a67286..85333b4e32 100644
--- a/cli/src/pcluster/config/cluster_config.py
+++ b/cli/src/pcluster/config/cluster_config.py
@@ -2167,6 +2167,84 @@ def max_network_interface_count(self) -> int:
         return least_max_nics
 
 
+class InstanceRequirementsDefinition(Resource):
+    """Collects the attributes required to define a Slurm Compute Resource through Instance Requirements."""
+
+    def __init__(
+        self,
+        min_vcpus,
+        min_memory_mib,
+        max_vcpus=0,
+        max_memory_mib=0,
+        accelerator_count=0,
+        max_price_percentage=50,
+        allowed_instance_types: List[str] = None,
+        excluded_instance_types: List[str] = None,
+        accelerator_types: List[str] = None,
+        accelerator_manufacturers: List[str] = None,
+        bare_metal: List[str] = None,
+        instance_generations: List[str] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.min_vcpus = Resource.init_param(min_vcpus)
+        self.max_vcpus = Resource.init_param(max_vcpus)
+        self.min_memory_mib = Resource.init_param(min_memory_mib)
+        self.max_memory_mib = Resource.init_param(max_memory_mib)
+        self.accelerator_count = Resource.init_param(accelerator_count)
+        self.max_price_percentage = Resource.init_param(max_price_percentage)
+        self.allowed_instance_types = Resource.init_param(allowed_instance_types)
+        self.excluded_instance_types = Resource.init_param(excluded_instance_types)
+        self.bare_metal = bare_metal
+        self.instance_generations = instance_generations
+        self.accelerator_types = accelerator_types
+        self.accelerator_manufacturers = accelerator_manufacturers
+        # Enforce desired default behavior
+        if self.accelerator_count > 0:
+            self.accelerator_types = ["gpu"]
+            self.accelerator_manufacturers = ["nvidia"]
+        else:
+            self.accelerator_types = None
+            self.accelerator_manufacturers = None
+
+
+class InstanceRequirementsComputeResource(_BaseSlurmComputeResource):
+    """Represents a Slurm Compute Resource defined through Instance Requirements."""
+
+    def __init__(
+        self,
+        instance_requirements: InstanceRequirementsDefinition,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.instance_requirements = Resource.init_param(instance_requirements)
+        self.instance_type_list = []
+
+    @property
+    def disable_simultaneous_multithreading_manually(self) -> bool:
+        """Return true if simultaneous multithreading must be disabled with a cookbook script."""
+        return self.disable_simultaneous_multithreading
+
+    @property
+    def max_network_interface_count(self) -> int:
+        """Return max number of NICs for the compute resource.
+
+        Currently customers are not allowed to specify 'NetworkInterfaceCount' as a requirement, so this method is
+        a placeholder for future improvements.
+        An implementation is required since it is an abstract method of the base class.
+        It is meant to be used in the validators.
+        For MVP the validator will list the instance types returned by CreateFleet and pick the minimum common value.
+        """
+        return 1
+
+    @property
+    def instance_types(self) -> List[str]:
+        """Return the list of instance type names in this compute resource."""
+        # TODO (singleton) retrieve once the list of instance-types derived from the requirements with
+        # get-instance-types-from-instance-requirements and fill the list
+        return self.instance_type_list
+
+
 class SlurmComputeResource(_BaseSlurmComputeResource):
     """Represents a Slurm Compute Resource with a Single Instance Type."""
 

From 5da9262114ec437079af723d24bbb6fc0d702f6d Mon Sep 17 00:00:00 2001
From: Nicola Sirena
Date: Wed, 21 Jun 2023 11:20:00 +0200
Subject: [PATCH 2/5] Add schema classes for InstanceRequirements

Add InstanceRequirementsSchema class to define the customer-facing
contract of the feature.
Extend SlurmComputeResourceSchema to support InstanceRequirements
definitions.

Signed-off-by: Nicola Sirena
---
 cli/src/pcluster/schemas/cluster_schema.py | 108 ++++++++++++++++++++-
 1 file changed, 105 insertions(+), 3 deletions(-)

diff --git a/cli/src/pcluster/schemas/cluster_schema.py b/cli/src/pcluster/schemas/cluster_schema.py
index 91f9511e53..5efd57100d 100644
--- a/cli/src/pcluster/schemas/cluster_schema.py
+++ b/cli/src/pcluster/schemas/cluster_schema.py
@@ -60,6 +60,8 @@
     Iam,
     Image,
     Imds,
+    InstanceRequirementsComputeResource,
+    InstanceRequirementsDefinition,
     IntelSoftware,
     LocalStorage,
     LoginNodes,
@@ -1224,6 +1226,98 @@ def make_resource(self, data, **kwargs):
         return FlexibleInstanceType(**data)
 
 
+class InstanceRequirementsSchema(BaseSchema):
+    """Represent the schema of InstanceRequirements for Compute Resources."""
+
+    # Customer-facing parameters
+    min_vcpus = fields.Int(
+        required=True,
+        data_key="MinvCpus",
+        validate=validate.Range(min=0),
+        metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY},
+    )
+
+    max_vcpus = fields.Int(
+        data_key="MaxvCpus",
+        validate=validate.Range(min=0),
+        metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY},
+    )
+
+    min_memory_mib = fields.Int(
+        required=True,
+        data_key="MinMemoryMib",
+        validate=validate.Range(min=0),
+        metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY},
+    )
+
+    max_memory_mib = fields.Int(
+        data_key="MaxMemoryMib",
+        validate=validate.Range(min=0),
+        metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY},
+    )
+
+    accelerator_count = fields.Int(
+        data_key="AcceleratorCount",
+        validate=validate.Range(min=0),
+        metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY},
+    )
+
+    max_price_percentage = fields.Int(
+        data_key="MaxPricePercentageOverLowestPrice",
+        validate=validate.Range(min=0),
+        metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY},
+    )
+
+    allowed_instance_types = fields.List(
+        fields.Str(), data_key="AllowedInstanceTypes", metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY}
+    )
+
+    excluded_instance_types = fields.List(
+        fields.Str(), data_key="ExcludedInstanceTypes", metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY}
+    )
+
+    # Internal (non customer-facing) parameters; used to adapt the default behavior
+    accelerator_types = fields.List(
+        fields.Str(),
+        data_key="AcceleratorTypes",
+    )
+
+    accelerator_manufacturers = fields.List(
+        fields.Str(),
+        data_key="AcceleratorManufacturers",
+    )
+
+    bare_metal = fields.List(
+        fields.Str(),
+        missing=["included"],  # forcing the desired behavior
+        data_key="BareMetal",
+    )
+
+    instance_generations = fields.List(
+        fields.Str(),
+        missing=["current"],  # forcing the desired behavior
+        data_key="InstanceGenerations",
+    )
+
@validates_schema + def only_only_list_of_istances_is_allowed(self, data, **kwargs): + """Validate that 'allowed_instance_types' and 'excluded_instance_types' do not co-exist.""" + if self.fields_coexist( + data, + ["allowed_instance_types", "excluded_instance_types"], + one_required=False, + **kwargs, + ): + raise ValidationError( + "Either AllowedInstanceTypes or ExcludedInstanceTypes can be used in InstanceRequirements definition." + ) + + @post_load + def make_resource(self, data, **kwargs): + """Generate resource.""" + return InstanceRequirementsDefinition(**data) + + class HeadNodeSchema(BaseSchema): """Represent the schema of the HeadNode.""" @@ -1359,6 +1453,9 @@ class SlurmComputeResourceSchema(_ComputeResourceSchema): many=True, metadata={"update_policy": UpdatePolicy.COMPUTE_FLEET_STOP_ON_REMOVE, "update_key": "InstanceType"}, ) + instance_requirements = fields.Nested( + InstanceRequirementsSchema, + ) max_count = fields.Int(validate=validate.Range(min=1), metadata={"update_policy": UpdatePolicy.MAX_COUNT}) min_count = fields.Int(validate=validate.Range(min=0), metadata={"update_policy": UpdatePolicy.COMPUTE_FLEET_STOP}) spot_price = fields.Float( @@ -1392,11 +1489,13 @@ def no_coexist_instance_type_flexibility(self, data, **kwargs): """Validate that 'instance_type' and 'instances' do not co-exist.""" if self.fields_coexist( data, - ["instance_type", "instances"], + ["instance_type", "instances", "instance_requirements"], one_required=True, **kwargs, ): - raise ValidationError("A Compute Resource needs to specify either InstanceType or Instances.") + raise ValidationError( + "A Compute Resource needs to specify either InstanceType, Instances or InstanceRequirements." + ) @validates("instances") def no_duplicate_instance_types(self, flexible_instance_types: List[FlexibleInstanceType]): @@ -1422,7 +1521,10 @@ def make_resource(self, data, **kwargs): """Generate resource.""" if data.get("instances"): return SlurmFlexibleComputeResource(**data) - return SlurmComputeResource(**data) + elif data.get("instance_requirements"): + return InstanceRequirementsComputeResource(**data) + else: + return SlurmComputeResource(**data) class AwsBatchComputeResourceSchema(_ComputeResourceSchema): From 070defe72b9a0f3772c594e0c373e0de50d80e86 Mon Sep 17 00:00:00 2001 From: Nicola Sirena Date: Wed, 21 Jun 2023 11:29:51 +0200 Subject: [PATCH 3/5] Add tests for InstanceRequirements schema Signed-off-by: Nicola Sirena --- .../pcluster/schemas/test_cluster_schema.py | 4 +- .../test_instance_requirements_schema.py | 181 ++++++++++++++++++ 2 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 cli/tests/pcluster/schemas/test_instance_requirements_schema.py diff --git a/cli/tests/pcluster/schemas/test_cluster_schema.py b/cli/tests/pcluster/schemas/test_cluster_schema.py index 00e8945452..0df31c5b30 100644 --- a/cli/tests/pcluster/schemas/test_cluster_schema.py +++ b/cli/tests/pcluster/schemas/test_cluster_schema.py @@ -486,7 +486,7 @@ def test_scheduling_schema(mocker, config_dict, failure_message): } ], }, - "A Compute Resource needs to specify either InstanceType or Instances.", + "A Compute Resource needs to specify either InstanceType, Instances or InstanceRequirements.", ), # Mixing InstanceType and Instances in a Compute Resource should return a validation error ( @@ -502,7 +502,7 @@ def test_scheduling_schema(mocker, config_dict, failure_message): }, ], }, - "A Compute Resource needs to specify either InstanceType or Instances.", + "A Compute Resource needs to specify either 
InstanceType, Instances or InstanceRequirements.", ), # Instances in a Compute Resource should not have duplicate instance types ( diff --git a/cli/tests/pcluster/schemas/test_instance_requirements_schema.py b/cli/tests/pcluster/schemas/test_instance_requirements_schema.py new file mode 100644 index 0000000000..6d2317dad7 --- /dev/null +++ b/cli/tests/pcluster/schemas/test_instance_requirements_schema.py @@ -0,0 +1,181 @@ +# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance +# with the License. A copy of the License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES +# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and +# limitations under the License. +import logging + +import pytest +from marshmallow import ValidationError + +from pcluster.schemas.cluster_schema import InstanceRequirementsSchema, SlurmComputeResourceSchema + + +@pytest.mark.parametrize( + "compute_resource_config, failure_message", + [ + pytest.param( + {"Name": "cr-single", "InstanceType": "c5.2xlarge"}, None, id="Using single instance type only is correct" + ), + pytest.param( + {"Name": "cr-multi", "Instances": [{"InstanceType": "c5.2xlarge"}, {"InstanceType": "c5.4xlarge"}]}, + None, + id="Using multiple instance type only is correct", + ), + pytest.param( + { + "Name": "cr-multi", + "InstanceRequirements": {"MinvCpus": 1, "MaxvCpus": 5, "MinMemoryMib": 1, "MaxMemoryMib": 5}, + }, + None, + id="Using InstanceRequirements only is correct", + ), + pytest.param( + { + "Name": "cr-single-multi", + "InstanceType": "c5.2xlarge", + "Instances": [{"InstanceType": "c5.2xlarge"}, {"InstanceType": "c5.4xlarge"}], + }, + "A Compute Resource needs to specify either InstanceType, Instances or InstanceRequirements.", + id="Using single single and multiple instance types together should fail", + ), + pytest.param( + { + "Name": "cr-single-requirements", + "InstanceType": "c5.2xlarge", + "InstanceRequirements": {"MinvCpus": 1, "MaxvCpus": 5, "MinMemoryMib": 1, "MaxMemoryMib": 5}, + }, + "A Compute Resource needs to specify either InstanceType, Instances or InstanceRequirements.", + id="Using single instance type and InstanceRequirements together should fail", + ), + pytest.param( + { + "Name": "cr-single-requirements", + "Instances": [{"InstanceType": "c5.2xlarge"}, {"InstanceType": "c5.4xlarge"}], + "InstanceRequirements": {"MinvCpus": 1, "MaxvCpus": 5, "MinMemoryMib": 1, "MaxMemoryMib": 5}, + }, + "A Compute Resource needs to specify either InstanceType, Instances or InstanceRequirements.", + id="Using multiple instance types and InstanceRequirements together should fail", + ), + ], +) +def test_compute_resource_definitions_are_mutually_exclusive(compute_resource_config, failure_message): + if failure_message: + with pytest.raises(ValidationError, match=failure_message): + SlurmComputeResourceSchema().load(compute_resource_config) + else: + ir_schema = SlurmComputeResourceSchema() + ir_obj = ir_schema.load(compute_resource_config) + ir_json = ir_schema.dump(ir_obj) + + logging.debug("Rendered resource: ", ir_json) + + +@pytest.mark.parametrize( + "instance_requirement_config, failure_message", + [ + pytest.param( + {"MinvCpus": 1, "MaxvCpus": 5, "MinMemoryMib": 1, "MaxMemoryMib": 5}, + None, + id="Using none 
of them is correct", + ), + pytest.param( + { + "MinvCpus": 1, + "MaxvCpus": 5, + "MinMemoryMib": 1, + "MaxMemoryMib": 5, + "AllowedInstanceTypes": ["in1", "in2"], + }, + None, + id="Using only AllowedInstanceTypes is correct", + ), + pytest.param( + { + "MinvCpus": 1, + "MaxvCpus": 5, + "MinMemoryMib": 1, + "MaxMemoryMib": 5, + "ExcludedInstanceTypes": ["in1", "in2"], + }, + None, + id="Using only ExcludedInstanceTypes is correct", + ), + pytest.param( + { + "MinvCpus": 1, + "MaxvCpus": 5, + "MinMemoryMib": 1, + "MaxMemoryMib": 5, + "AllowedInstanceTypes": ["in1", "in2"], + "ExcludedInstanceTypes": ["in1", "in2"], + }, + "Either AllowedInstanceTypes or ExcludedInstanceTypes can be used " "in InstanceRequirements definition.", + id="Using both AllowedInstanceTypes and ExcludedInstanceTypes should fail", + ), + ], +) +def test_allowed_and_excluded_instance_types_list_are_mutually_exclusive(instance_requirement_config, failure_message): + if failure_message: + with pytest.raises(ValidationError, match=failure_message): + InstanceRequirementsSchema().load(instance_requirement_config) + else: + ir_schema = InstanceRequirementsSchema() + ir_obj = ir_schema.load(instance_requirement_config) + ir_json = ir_schema.dump(ir_obj) + logging.debug("Rendered resource: ", ir_json) + + +@pytest.mark.parametrize( + "instance_requirement_config, should_be_set", + [ + pytest.param( + { + "MinvCpus": 1, + "MinMemoryMib": 1, + }, + False, + id="When AcceleratorCount is not set Manufacturer and Type should not be set", + ), + pytest.param( + {"MinvCpus": 1, "MinMemoryMib": 1, "AcceleratorCount": 0}, + False, + id="When AcceleratorCount is 0 Manufacturer and Type should not be set", + ), + pytest.param( + {"MinvCpus": 1, "MinMemoryMib": 1, "AcceleratorCount": 1}, + True, + id="When AcceleratorCount is > 0 Manufacturer and Type should be set", + ), + ], +) +def test_when_setting_accelerator_count_also_manufacturer_and_type_should_be_set( + instance_requirement_config, should_be_set +): + ir_schema = InstanceRequirementsSchema() + ir_obj = ir_schema.load(instance_requirement_config) + ir_json = ir_schema.dump(ir_obj) + logging.debug("Rendered resource: ", ir_json) + + if should_be_set: + assert ["gpu"] == ir_obj.accelerator_types + assert ["nvidia"] == ir_obj.accelerator_manufacturers + else: + assert ir_obj.accelerator_types is None + assert ir_obj.accelerator_manufacturers is None + + +def test_default_behavior_is_enforced(): + config = {"MinvCpus": 1, "MinMemoryMib": 1, "AcceleratorCount": 1} + ir_schema = InstanceRequirementsSchema() + ir_obj = ir_schema.load(config) + ir_json = ir_schema.dump(ir_obj) + logging.debug("Rendered resource: ", ir_json) + + assert ["included"] == ir_obj.bare_metal + assert ["current"] == ir_obj.instance_generations From fc97ed6ae3cf9d5a6e4470c9b09943079eb07457 Mon Sep 17 00:00:00 2001 From: Nicola Sirena Date: Wed, 21 Jun 2023 11:36:58 +0200 Subject: [PATCH 4/5] Update Changelog Signed-off-by: Nicola Sirena --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 15b69e2834..205e00cfd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ CHANGELOG - Allow configuration of static and dynamic node priorities in Slurm compute resources via the ParallelCluster configuration YAML file. - Add support for Ubuntu 22. - Allow memory-based scheduling when multiple instance types are specified for a Slurm Compute Resource. +- Allow definition of a Slurm Compute Resource through InstanceRequirements. 
 **CHANGES**
 - Assign Slurm dynamic nodes a priority (weight) of 1000 by default. This allows Slurm to prioritize idle static nodes over idle dynamic ones.

From 2a45dac04356e5b38d02e8e1022f35e86316281a Mon Sep 17 00:00:00 2001
From: Nicola Sirena
Date: Wed, 21 Jun 2023 16:37:33 +0200
Subject: [PATCH 5/5] Add logic to list instance types matching a Requirements set

A set of InstanceRequirements may return a long list of instance types
(or none) depending on the attributes, the region and the architecture.

Signed-off-by: Nicola Sirena
---
 cli/src/pcluster/aws/ec2.py               | 18 ++++++++
 cli/src/pcluster/config/cluster_config.py | 55 +++++++++++++++++++++--
 2 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/cli/src/pcluster/aws/ec2.py b/cli/src/pcluster/aws/ec2.py
index c814520bd8..2630772f9f 100644
--- a/cli/src/pcluster/aws/ec2.py
+++ b/cli/src/pcluster/aws/ec2.py
@@ -487,3 +487,21 @@ def run_instances(self, **kwargs):
         except ClientError as e:
             if e.response.get("Error").get("Code") != "DryRunOperation":
                 raise
+
+    @AWSExceptionHandler.handle_client_exception
+    @Cache.cached
+    def get_instance_types_from_instance_requirements(
+        self, instance_requirements: dict, architecture: str = "x86_64"
+    ) -> List[str]:
+        """Get the list of instance types matching a set of instance requirements."""
+        config = {
+            "ArchitectureTypes": [architecture],
+            "VirtualizationTypes": ["hvm"],
+            "InstanceRequirements": instance_requirements,
+        }
+
+        response = self._client.get_instance_types_from_instance_requirements(**config)
+        if "InstanceTypes" in response:
+            return [res["InstanceType"] for res in response["InstanceTypes"]]
+        else:
+            return []
diff --git a/cli/src/pcluster/config/cluster_config.py b/cli/src/pcluster/config/cluster_config.py
index 85333b4e32..d197df9349 100644
--- a/cli/src/pcluster/config/cluster_config.py
+++ b/cli/src/pcluster/config/cluster_config.py
@@ -2207,6 +2207,46 @@ def __init__(
             self.accelerator_types = None
             self.accelerator_manufacturers = None
 
+    def _vcpu_config(self):
+        config = {
+            "Min": self.min_vcpus,
+        }
+        if self.max_vcpus > 0:
+            config["Max"] = self.max_vcpus
+
+        return config
+
+    def _mem_config(self):
+        config = {
+            "Min": self.min_memory_mib,
+        }
+        if self.max_memory_mib > 0:
+            config["Max"] = self.max_memory_mib
+
+        return config
+
+    def config(self):
+        """Compile an InstanceRequirements config to retrieve the list of matching instance types."""
+        config = {
+            "VCpuCount": self._vcpu_config(),
+            "MemoryMiB": self._mem_config(),
+            "InstanceGenerations": self.instance_generations,
+            "BareMetal": self.bare_metal,
+            "MaxPricePercentageOverLowestPrice": self.max_price_percentage,
+        }
+
+        if self.accelerator_count > 0:
+            config["AcceleratorCount"] = self.accelerator_count
+            config["AcceleratorTypes"] = self.accelerator_types
+            config["AcceleratorManufacturers"] = self.accelerator_manufacturers
+
+        if self.allowed_instance_types:
+            config["AllowedInstanceTypes"] = self.allowed_instance_types
+        elif self.excluded_instance_types:
+            config["ExcludedInstanceTypes"] = self.excluded_instance_types
+
+        return config
+
 
 class InstanceRequirementsComputeResource(_BaseSlurmComputeResource):
     """Represents a Slurm Compute Resource defined through Instance Requirements."""
@@ -2218,7 +2258,7 @@ def __init__(
     ):
         super().__init__(**kwargs)
         self.instance_requirements = Resource.init_param(instance_requirements)
-        self.instance_type_list = []
+        self.instance_type_list = None
 
     @property
     def disable_simultaneous_multithreading_manually(self) -> bool:
@@ -2237,11 +2277,20 @@ def max_network_interface_count(self) -> int:
""" return 1 + def get_matching_instance_type_list(self, architecture): + """Return the list of instance types matching the Requirements for a given architecture.""" + # TODO add a mechanism to discover the architecture at ComputeResource level + # it should get the HeadNode architecture that wins over the CR config + # Currently we receive if from the outside (Validator) and delegate the burden to it + if self.instance_type_list is None: + self.instance_type_list = AWSApi.instance().ec2.get_instance_types_from_instance_requirements( + self.instance_requirements.config(), architecture + ) + return self.instance_type_list + @property def instance_types(self) -> List[str]: """Should Return list of instance type names in this compute resource.""" - # TODO (singleton) retrieve once the list of instance-types derived from the requirements with - # get-instance-types-from-instance-requirements and fill the list return self.instance_type_list