Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[develop] Introduce InstanceRequirements schema and resource #5419

Draft
wants to merge 5 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ CHANGELOG
- Allow configuration of static and dynamic node priorities in Slurm compute resources via the ParallelCluster configuration YAML file.
- Add support for Ubuntu 22.
- Allow memory-based scheduling when multiple instance types are specified for a Slurm Compute Resource.
- Allow definition of a Slurm Compute Resource through InstanceRequirements.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: InstanceRequirements


**CHANGES**
- Assign Slurm dynamic nodes a priority (weight) of 1000 by default. This allows Slurm to prioritize idle static nodes over idle dynamic ones.
Expand Down
18 changes: 18 additions & 0 deletions cli/src/pcluster/aws/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,21 @@ def run_instances(self, **kwargs):
except ClientError as e:
if e.response.get("Error").get("Code") != "DryRunOperation":
raise

@AWSExceptionHandler.handle_client_exception
@Cache.cached
def get_instance_types_from_instance_requirements(
self, instance_requirements: str, architecture: str = "x86_64"
) -> List[str]:
"""Get list of instance types matching a set of instance_requirements."""
config = {
"ArchitectureTypes": [architecture],
"VirtualizationTypes": ["hvm"],
"InstanceRequirements": instance_requirements,
}

response = self._client.get_instance_types_from_instance_requirements(config)
if "InstanceTypes" in response:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could write:

instance_types = response.get("InstanceTypes", [])
return [res.get("InstanceType") for res in instance_types]

or:

return [instance_types.get("InstanceType") for instance_types in response.get("InstanceTypes", [])]

return [res["InstanceType"] for res in response["InstanceTypes"]]
else:
return []
127 changes: 127 additions & 0 deletions cli/src/pcluster/config/cluster_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2167,6 +2167,133 @@ def max_network_interface_count(self) -> int:
return least_max_nics


class InstanceRequirementsDefinition(Resource):
"""Collects the attributes required to define a Slurm Compute Resource through Instance Requirements."""

def __init__(
self,
min_vcpus,
min_memory_mib,
max_vcpus=0,
max_memory_mib=0,
accelerator_count=0,
max_price_percentage=50,
allowed_instance_types: List[str] = None,
excluded_instance_types: List[str] = None,
accelerator_types: List[str] = None,
accelerator_manufacturers: List[str] = None,
bare_metal: List[str] = None,
instance_generations: List[str] = None,
**kwargs,
):
super().__init__(**kwargs)
self.min_vcpus = Resource.init_param(min_vcpus)
self.max_vcpus = Resource.init_param(max_vcpus)
self.min_memory_mib = Resource.init_param(min_memory_mib)
self.max_memory_mib = Resource.init_param(max_memory_mib)
self.accelerator_count = Resource.init_param(accelerator_count)
self.max_price_percentage = Resource.init_param(max_price_percentage)
self.allowed_instance_types = Resource.init_param(allowed_instance_types)
self.excluded_instance_types = Resource.init_param(excluded_instance_types)
self.bare_metal = bare_metal
self.instance_generations = instance_generations
self.accelerator_types = accelerator_types
self.accelerator_manufacturers = accelerator_manufacturers
# Enforce desired default behavior
if self.accelerator_count > 0:
self.accelerator_types = ["gpu"]
self.accelerator_manufacturers = ["nvidia"]
else:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you're forcing a specific behaviour you can avoid the set in the previous lines, that is useless:

        self.accelerator_types = accelerator_types
        self.accelerator_manufacturers = accelerator_manufacturers

self.accelerator_types = None
self.accelerator_manufacturers = None

def _vcpu_config(self):
config = {
"Min": self.min_vcpus,
}
if self.max_vcpus > 0:
config["Max"] = self.max_vcpus

return config

def _mem_config(self):
config = {
"Min": self.min_memory_mib,
}
if self.max_memory_mib > 0:
config["Max"] = self.max_memory_mib

return config

def config(self):
"""Compiles an InstanceRequirement config to retrieve the list of matching instance-types."""
config = {
"VCpuCount": self._vcpu_config(),
"MemoryMiB": self._mem_config(),
"InstanceGenerations": self.instance_generations,
"BareMetal": self.bare_metal,
"MaxPricePercentageOverLowestPrice": self.bare_metal,
}

if self.accelerator_count > 0:
config["AcceleratorCount"] = self.accelerator_count
config["AcceleratorTypes"] = self.accelerator_types
config["AcceleratorManufacturers"] = self.max_price_percentage

if self.allowed_instance_types:
config["AllowedInstanceTypes"] = self.allowed_instance_types
elif self.excluded_instance_types:
config["AllowedInstanceTypes"] = self.allowed_instance_types

return config


class InstanceRequirementsComputeResource(_BaseSlurmComputeResource):
"""Represents a Slurm Compute Resource defined through Instance Requirements."""

def __init__(
self,
instance_requirements: InstanceRequirementsDefinition,
**kwargs,
):
super().__init__(**kwargs)
self.instance_requirements = Resource.init_param(instance_requirements)
self.instance_type_list = None

@property
def disable_simultaneous_multithreading_manually(self) -> bool:
"""Return true if simultaneous multithreading must be disabled with a cookbook script."""
return self.disable_simultaneous_multithreading

@property
def max_network_interface_count(self) -> int:
"""Return max number of NICs for the compute resource.

Currently customers are not allowed to specify 'NetworkInterfaceCount' as requirement so this method is
a placeholder for future improvements.
An implementation is required since it is an abstract method of the base class.
It is meant to be used in the validators.
For MVP the validator will list the instance types returned by CreateFlet and pick the minimum common value.
"""
return 1

def get_matching_instance_type_list(self, architecture):
"""Return the list of instance types matching the Requirements for a given architecture."""
# TODO add a mechanism to discover the architecture at ComputeResource level
# it should get the HeadNode architecture that wins over the CR config
# Currently we receive if from the outside (Validator) and delegate the burden to it
if self.instance_type_list is None:
self.instance_type_list = AWSApi.instance().ec2.get_instance_types_from_instance_requirements(
self.instance_requirements.config(), architecture
)
return self.instance_type_list

@property
def instance_types(self) -> List[str]:
"""Should Return list of instance type names in this compute resource."""
return self.instance_type_list


class SlurmComputeResource(_BaseSlurmComputeResource):
"""Represents a Slurm Compute Resource with a Single Instance Type."""

Expand Down
108 changes: 105 additions & 3 deletions cli/src/pcluster/schemas/cluster_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
Iam,
Image,
Imds,
InstanceRequirementsComputeResource,
InstanceRequirementsDefinition,
IntelSoftware,
LocalStorage,
LoginNodes,
Expand Down Expand Up @@ -1224,6 +1226,98 @@ def make_resource(self, data, **kwargs):
return FlexibleInstanceType(**data)


class InstanceRequirementsSchema(BaseSchema):
"""Represent the schema of InstanceRequirements for Compute Resources."""

# Customer Facing parameters
min_vcpus = fields.Int(
required=True,
data_key="MinvCpus",
validate=validate.Range(min=0),
metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY},
)

max_vcpus = fields.Int(
data_key="MaxvCpus",
validate=validate.Range(min=0), #
metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY},
)

min_memory_mib = fields.Int(
required=True,
data_key="MinMemoryMib",
validate=validate.Range(min=0),
metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY},
)

max_memory_mib = fields.Int(
data_key="MaxMemoryMib",
validate=validate.Range(min=0),
metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY},
)

accelerator_count = fields.Int(
data_key="AcceleratorCount",
validate=validate.Range(min=0),
metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY},
)

max_price_percentage = fields.Int(
data_key="MaxPricePercentageOverLowestPrice",
validate=validate.Range(min=0),
metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY},
)

allowed_instance_types = fields.List(
fields.Str(), data_key="AllowedInstanceTypes", metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY}
)

excluded_instance_types = fields.List(
fields.Str(), data_key="ExcludedInstanceTypes", metadata={"update_policy": UpdatePolicy.QUEUE_UPDATE_STRATEGY}
)

# Internal (non customer facing) parameters; used to adapt the default behavior
accelerator_types = fields.List(
fields.Str(),
data_key="AcceleratorTypes",
)

accelerator_manufacturers = fields.List(
fields.Str(),
data_key="AcceleratorManufacturers",
)

bare_metal = fields.List(
fields.Str(),
missing=["included"], # forcing the desired behavior
data_key="BareMetal",
)

instance_generations = fields.List(
fields.Str(),
missing=["current"], # forcing the desired behavior
data_key="InstanceGenerations",
)

@validates_schema
def only_only_list_of_istances_is_allowed(self, data, **kwargs):
"""Validate that 'allowed_instance_types' and 'excluded_instance_types' do not co-exist."""
if self.fields_coexist(
data,
["allowed_instance_types", "excluded_instance_types"],
one_required=False,
**kwargs,
):
raise ValidationError(
"Either AllowedInstanceTypes or ExcludedInstanceTypes can be used in InstanceRequirements definition."
)

@post_load
def make_resource(self, data, **kwargs):
"""Generate resource."""
return InstanceRequirementsDefinition(**data)


class HeadNodeSchema(BaseSchema):
"""Represent the schema of the HeadNode."""

Expand Down Expand Up @@ -1359,6 +1453,9 @@ class SlurmComputeResourceSchema(_ComputeResourceSchema):
many=True,
metadata={"update_policy": UpdatePolicy.COMPUTE_FLEET_STOP_ON_REMOVE, "update_key": "InstanceType"},
)
instance_requirements = fields.Nested(
InstanceRequirementsSchema,
)
max_count = fields.Int(validate=validate.Range(min=1), metadata={"update_policy": UpdatePolicy.MAX_COUNT})
min_count = fields.Int(validate=validate.Range(min=0), metadata={"update_policy": UpdatePolicy.COMPUTE_FLEET_STOP})
spot_price = fields.Float(
Expand Down Expand Up @@ -1392,11 +1489,13 @@ def no_coexist_instance_type_flexibility(self, data, **kwargs):
"""Validate that 'instance_type' and 'instances' do not co-exist."""
if self.fields_coexist(
data,
["instance_type", "instances"],
["instance_type", "instances", "instance_requirements"],
one_required=True,
**kwargs,
):
raise ValidationError("A Compute Resource needs to specify either InstanceType or Instances.")
raise ValidationError(
"A Compute Resource needs to specify either InstanceType, Instances or InstanceRequirements."
)

@validates("instances")
def no_duplicate_instance_types(self, flexible_instance_types: List[FlexibleInstanceType]):
Expand All @@ -1422,7 +1521,10 @@ def make_resource(self, data, **kwargs):
"""Generate resource."""
if data.get("instances"):
return SlurmFlexibleComputeResource(**data)
return SlurmComputeResource(**data)
elif data.get("instance_requirements"):
return InstanceRequirementsComputeResource(**data)
else:
return SlurmComputeResource(**data)


class AwsBatchComputeResourceSchema(_ComputeResourceSchema):
Expand Down
4 changes: 2 additions & 2 deletions cli/tests/pcluster/schemas/test_cluster_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ def test_scheduling_schema(mocker, config_dict, failure_message):
}
],
},
"A Compute Resource needs to specify either InstanceType or Instances.",
"A Compute Resource needs to specify either InstanceType, Instances or InstanceRequirements.",
),
# Mixing InstanceType and Instances in a Compute Resource should return a validation error
(
Expand All @@ -502,7 +502,7 @@ def test_scheduling_schema(mocker, config_dict, failure_message):
},
],
},
"A Compute Resource needs to specify either InstanceType or Instances.",
"A Compute Resource needs to specify either InstanceType, Instances or InstanceRequirements.",
),
# Instances in a Compute Resource should not have duplicate instance types
(
Expand Down
Loading