From 9ed2f92205fc40fc31d9bf79bc751f8659fc6691 Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Wed, 22 Nov 2023 22:44:43 +0100 Subject: [PATCH] fix: reduce memory consumption of Policy Server Reduce the memory consumption of Policy Server when multiple instances of the same Wasm module are loaded. Thanks to this change, a worker will have only once instance of `PolicyEvaluator` (hence of wasmtime stack), per unique type of module. Literally speaking, if a user has the `apparmor` policy deployed 5 times (different names, settings,...) only one instance of `PolicyEvaluator` will be allocated for it. Note: the optimization works at the worker level. Meaning that `PolicyEvaluator` are NOT sharing these instances between themselves. This commit helps to address issue https://github.com/kubewarden/kubewarden-controller/issues/528 Signed-off-by: Flavio Castelli --- Cargo.lock | 244 +++++++++++--- Cargo.toml | 5 +- src/settings.rs | 3 +- src/workers/evaluation_environment.rs | 293 +++++++++++++++++ src/workers/mod.rs | 6 + src/workers/policy_evaluation_settings.rs | 15 + src/workers/pool.rs | 88 ++++- src/workers/worker.rs | 383 ++++++++++------------ 8 files changed, 763 insertions(+), 274 deletions(-) create mode 100644 src/workers/evaluation_environment.rs create mode 100644 src/workers/policy_evaluation_settings.rs diff --git a/Cargo.lock b/Cargo.lock index 34bb1309..54d8eac5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -186,16 +186,16 @@ dependencies = [ [[package]] name = "async-global-executor" -version = "2.3.1" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +checksum = "9b4353121d5644cdf2beb5726ab752e79a8db1ebb52031770ec47db31d245526" dependencies = [ - "async-channel 1.9.0", + "async-channel 2.1.0", "async-executor", - "async-io", - "async-lock 2.8.0", + "async-io 2.2.0", + "async-lock 3.1.1", "blocking", - "futures-lite 1.13.0", + "futures-lite 2.0.1", "once_cell", ] @@ -212,13 +212,33 @@ dependencies = [ "futures-lite 1.13.0", "log", "parking", - "polling", + "polling 2.8.0", "rustix 0.37.27", "slab", "socket2 0.4.10", "waker-fn", ] +[[package]] +name = "async-io" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41ed9d5715c2d329bf1b4da8d60455b99b187f27ba726df2883799af9af60997" +dependencies = [ + "async-lock 3.1.1", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite 2.0.1", + "parking", + "polling 3.3.0", + "rustix 0.38.25", + "slab", + "tracing", + "waker-fn", + "windows-sys 0.48.0", +] + [[package]] name = "async-lock" version = "2.8.0" @@ -247,7 +267,7 @@ checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" dependencies = [ "async-channel 1.9.0", "async-global-executor", - "async-io", + "async-io 1.13.0", "async-lock 2.8.0", "crossbeam-utils", "futures-channel", @@ -483,7 +503,7 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "burrego" version = "0.3.4" -source = "git+https://github.com/kubewarden/policy-evaluator?tag=v0.12.2#f43d8b2b9f21b4f9053058fc957649b8bbe7551e" +source = "git+https://github.com/kubewarden/policy-evaluator?rev=da91fae49e29914e458d9ac4f09e0676c38764c0#da91fae49e29914e458d9ac4f09e0676c38764c0" dependencies = [ "base64 0.21.5", "chrono", @@ -1178,6 +1198,12 @@ dependencies = [ "serde", ] +[[package]] +name = "difflib" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" + [[package]] name = "digest" version = "0.10.7" @@ -1281,6 +1307,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dyn-clone" version = "1.0.16" @@ -1478,6 +1510,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1486,13 +1527,19 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "form_urlencoded" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "fs-set-times" version = "0.20.0" @@ -2018,6 +2065,16 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "idna" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "if_chain" version = "1.0.2" @@ -2524,6 +2581,45 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mockall" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "mockall_double" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae71c7bb287375187c775cf82e2dcf1bef3388aaf58f0789a77f9c7ab28466f6" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "multer" version = "2.1.0" @@ -2548,6 +2644,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27b02d87554356db9e9a873add8782d4ea6e3e58ea071a9adb9a2e8ddb884a8b" +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -3071,9 +3173,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.3.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "phf" @@ -3277,7 +3379,7 @@ checksum = "14e6ab3f592e6fb464fc9712d8d6e6912de6473954635fd76a589d832cffcbb0" [[package]] name = "policy-evaluator" version = "0.12.2" -source = "git+https://github.com/kubewarden/policy-evaluator?tag=v0.12.2#f43d8b2b9f21b4f9053058fc957649b8bbe7551e" +source = "git+https://github.com/kubewarden/policy-evaluator?rev=da91fae49e29914e458d9ac4f09e0676c38764c0#da91fae49e29914e458d9ac4f09e0676c38764c0" dependencies = [ "anyhow", "base64 0.21.5", @@ -3358,6 +3460,8 @@ dependencies = [ "itertools 0.12.0", "k8s-openapi", "lazy_static", + "mockall", + "mockall_double", "num_cpus", "opentelemetry", "opentelemetry-otlp", @@ -3396,6 +3500,20 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "polling" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e53b6af1f60f36f8c2ac2aad5459d75a5a9b4be1e8cdd40264f315d78193e531" +dependencies = [ + "cfg-if", + "concurrent-queue", + "pin-project-lite", + "rustix 0.38.25", + "tracing", + "windows-sys 0.48.0", +] + [[package]] name = "poly1305" version = "0.8.0" @@ -3419,6 +3537,36 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "predicates" +version = "2.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" +dependencies = [ + "difflib", + "float-cmp", + "itertools 0.10.5", + "normalize-line-endings", + "predicates-core", + "regex", +] + +[[package]] +name = "predicates-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" + +[[package]] +name = "predicates-tree" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "primeorder" version = "0.13.6" @@ -3692,7 +3840,6 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls", - "rustls-native-certs", "rustls-pemfile", "serde", "serde_json", @@ -3752,9 +3899,9 @@ dependencies = [ [[package]] name = "rsa" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86ef35bf3e7fe15a53c4ab08a998e42271eab13eb0db224126bc7bc4c4bad96d" +checksum = "6a3211b01eea83d80687da9eef70e39d65144a3894866a5153a2723e425a157f" dependencies = [ "const-oid", "digest", @@ -4023,9 +4170,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.192" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001" +checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" dependencies = [ "serde_derive", ] @@ -4051,9 +4198,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.192" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1" +checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", @@ -4467,6 +4614,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "thiserror" version = "1.0.50" @@ -4930,12 +5083,12 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ "form_urlencoded", - "idna", + "idna 0.5.0", "percent-encoding", "serde", ] @@ -4954,9 +5107,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.6.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c58fe91d841bc04822c9801002db4ea904b9e4b8e6bbad25127b46eff8dc516b" +checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" [[package]] name = "validator" @@ -4964,7 +5117,7 @@ version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b92f40481c04ff1f4f61f304d61793c7b56ff76ac1469f1beb199b1445b253bd" dependencies = [ - "idna", + "idna 0.4.0", "lazy_static", "regex", "serde", @@ -5221,9 +5374,9 @@ dependencies = [ [[package]] name = "wasm-encoder" -version = "0.37.0" +version = "0.38.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d135e8940b69dbee0f5b0a0be9c1cd6fa8b71d774904c13a3fcfc5dc265e43d" +checksum = "7b09bc5df933a3dabbdb72ae4b6b71be8ae07f58774d5aa41bd20adcd41a235a" dependencies = [ "leb128", ] @@ -5251,17 +5404,6 @@ dependencies = [ "semver", ] -[[package]] -name = "wasmparser" -version = "0.117.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b206de0c992af9f0b51ef2fb9455623e0a19eb68f172cd8ba9cd0e46637f5ab" -dependencies = [ - "hashbrown 0.14.2", - "indexmap 2.1.0", - "semver", -] - [[package]] name = "wasmparser" version = "0.118.0" @@ -5274,12 +5416,12 @@ dependencies = [ [[package]] name = "wasmprinter" -version = "0.2.73" +version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a4fdb34710b461c868c3f79a10a48b404f23b46fd471ab02bcaa60fd96c5c4b" +checksum = "61a7a046e6636d25c06a5df00bdc34e02f9e6e0e8a356d738299b961a6126114" dependencies = [ "anyhow", - "wasmparser 0.117.0", + "wasmparser 0.118.0", ] [[package]] @@ -5654,23 +5796,23 @@ dependencies = [ [[package]] name = "wast" -version = "68.0.0" +version = "69.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bf3081ac6bcb3a5b72a401693b3566feb529dc2b7e7b62ea544c8a30d0f4d05" +checksum = "efa51b5ad1391943d1bfad537e50f28fe938199ee76b115be6bae83802cd5185" dependencies = [ "leb128", "memchr", "unicode-width", - "wasm-encoder 0.37.0", + "wasm-encoder 0.38.0", ] [[package]] name = "wat" -version = "1.0.80" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fabe07d22a837b3bd5662ba9e980d73de115c040923659a1801934c7ccebe49" +checksum = "74a4c2488d058326466e086a43f5d4ea448241a8d0975e3eb0642c0828be1eb3" dependencies = [ - "wast 68.0.0", + "wast 69.0.0", ] [[package]] @@ -5712,9 +5854,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.2" +version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" +checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" [[package]] name = "wiggle" diff --git a/Cargo.toml b/Cargo.toml index 424a14f2..caa0ad3b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,8 @@ opentelemetry = { version = "0.21", default-features = false, features = [ ] } opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] } procfs = "0.16" -policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.12.2" } +#policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.12.2" } +policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", rev = "da91fae49e29914e458d9ac4f09e0676c38764c0" } rayon = "1.8" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } @@ -44,7 +45,9 @@ warp = { version = "0.3.6", default_features = false, features = [ "tls", ] } semver = { version = "1.0.20", features = ["serde"] } +mockall_double = "0.3" [dev-dependencies] +mockall = "0.11" rstest = "0.18" tempfile = "3.8.1" diff --git a/src/settings.rs b/src/settings.rs index a9ba2a6c..b955423d 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -1,5 +1,6 @@ use anyhow::{anyhow, Result}; +use policy_evaluator::policy_evaluator::PolicySettings; use policy_evaluator::policy_metadata::ContextAwareResource; use serde::Deserialize; use serde_yaml::Value; @@ -39,7 +40,7 @@ pub struct Policy { } impl Policy { - pub fn settings_to_json(&self) -> Result>> { + pub fn settings_to_json(&self) -> Result> { match self.settings.as_ref() { None => Ok(None), Some(settings) => { diff --git a/src/workers/evaluation_environment.rs b/src/workers/evaluation_environment.rs new file mode 100644 index 00000000..dbfd4c08 --- /dev/null +++ b/src/workers/evaluation_environment.rs @@ -0,0 +1,293 @@ +use anyhow::{anyhow, Result}; +use policy_evaluator::{ + admission_response::AdmissionResponse, callback_requests::CallbackRequest, + evaluation_context::EvaluationContext, policy_evaluator::PolicyEvaluator, + policy_evaluator_builder::PolicyEvaluatorBuilder, wasmtime, +}; +use std::collections::HashMap; +use tokio::sync::mpsc; + +use crate::communication::EvalRequest; +use crate::settings::PolicyMode; +use crate::workers::{ + policy_evaluation_settings::PolicyEvaluationSettings, + precompiled_policy::{PrecompiledPolicies, PrecompiledPolicy}, +}; + +#[cfg(test)] +use mockall::automock; + +/// This structure contains all the policies defined by the user inside of the `policies.yml`. +/// It also provides helper methods to perform the validation of a request and the validation +/// of the settings provided by the user. +/// +/// Each worker has its own dedicated instance of this structure. +/// At the worker level, the ultimate goal is to avoid duplicated instances of `PolicyEvaluator`. +/// That means that, given two or more identical Wasm modules, only one `PolicyEvaluator` +/// should be created. This is required to avoid a waste of memory by the Policy Server +/// process. +/// +/// Note: the `PolicyEvaluator` instances will still be duplicated across each worker. This is +/// something we have to deal with. +#[derive(Default)] +#[cfg_attr(test, allow(dead_code))] +pub(crate) struct EvaluationEnvironment { + /// Unique ID of the worker + worker_id: u64, + + /// The name of the Namespace where Policy Server doesn't operate. All the requests + /// involving this Namespace are going to be accepted. This is usually done to prevent user + /// policies from messing with the components of the Kubewarden stack (which are all + /// deployed inside of the same Namespace). + always_accept_admission_reviews_on_namespace: Option, + + /// A map with the unique ID of a Wasm module as key, and the associated `PolicyEvaluator` + /// instance as value. + /// Currently we the `module_id` is obtained by computing the sha255 digest of the + /// optimized Wasm module. + /// This dictionary allows us to reduce by amount of memory consumed by Policy Server. + module_id_to_evaluator: HashMap, + + /// A map with the ID of the policy as value, and the associated `EvaluationContext` as + /// value. + /// In this case, `policy_id` is the name of the policy as declared inside of the + /// `policies.yml` file. These names are guaranteed to be unique. + policy_id_to_eval_ctx: HashMap, + + /// Map a `policy_id` (the name given by the user inside of `policies.yml`) to the + /// `module_id`. This allows us to deduplicate the Wasm modules defined by the user. + policy_id_to_module_id: HashMap, + + /// Map a `policy_id` to the `PolicyEvaluationSettings` instance. This allows us to obtain + /// the list of settings to be used when evaluating a given policy. + policy_id_to_settings: HashMap, +} + +#[cfg_attr(test, automock)] +#[cfg_attr(test, allow(dead_code))] +impl EvaluationEnvironment { + /// Creates a new `EvaluationEnvironment` + pub(crate) fn new( + worker_id: u64, + always_accept_admission_reviews_on_namespace: Option, + ) -> Self { + Self { + worker_id, + always_accept_admission_reviews_on_namespace, + ..Default::default() + } + } + + /// Returns `true` if the given `namespace` is the special Namespace that is ignored by all + /// the policies + pub(crate) fn should_always_accept_requests_made_inside_of_namespace( + &self, + namespace: &str, + ) -> bool { + self.always_accept_admission_reviews_on_namespace.as_deref() == Some(namespace) + } + + /// Register a new policy. It takes care of creating a new `PolicyEvaluator` (when needed). + /// + /// Params: + /// - `policy_id`: the ID of the policy, as specified inside of the `policies.yml` by the + /// user + /// - `policy`: a data structure that maps all the information defined inside of + /// `policies.yml` for the given policy + /// - `engine`: the `wasmtime::Engine` to be used when creating the `PolicyEvaluator` + /// - `policy_modules`: all the `wasmtime::Module` precompiled for the current + /// OS/architecture + /// - `callback_handler_tx`: the transmission end of a channel that connects the worker + /// with the asynchronous world + /// - `policy_evaluation_limit_seconds`: when set, defines after how many seconds the + /// policy evaluation is interrupted + pub(crate) fn register( + &mut self, + policy_id: &str, + policy: &crate::settings::Policy, + engine: &wasmtime::Engine, + policy_modules: &PrecompiledPolicies, + callback_handler_tx: mpsc::Sender, + policy_evaluation_limit_seconds: Option, + ) -> Result<()> { + let precompiled_policy = policy_modules.get(policy.url.as_str()).ok_or_else(|| { + anyhow!( + "could not find preoptimized module for policy: {:?}", + policy.url + ) + })?; + let module_id = precompiled_policy.digest.clone(); + + if !self.module_id_to_evaluator.contains_key(&module_id) { + let evaluator = create_policy_evaluator( + policy_id, + self.worker_id, + policy, + engine, + precompiled_policy, + callback_handler_tx.clone(), + policy_evaluation_limit_seconds, + )?; + self.module_id_to_evaluator + .insert(module_id.clone(), evaluator); + } + self.policy_id_to_module_id + .insert(policy_id.to_owned(), module_id); + + let policy_eval_settings = PolicyEvaluationSettings { + policy_mode: policy.policy_mode.clone(), + allowed_to_mutate: policy.allowed_to_mutate.unwrap_or(false), + settings: policy.settings_to_json()?.unwrap_or_default(), + }; + self.policy_id_to_settings + .insert(policy_id.to_owned(), policy_eval_settings); + + let eval_ctx = EvaluationContext { + policy_id: policy_id.to_owned(), + callback_channel: Some(callback_handler_tx.clone()), + ctx_aware_resources_allow_list: policy.context_aware_resources.clone(), + }; + self.policy_id_to_eval_ctx + .insert(policy_id.to_owned(), eval_ctx); + + Ok(()) + } + + /// Given a policy ID, return how the policy operates + pub fn get_policy_mode(&self, policy_id: &str) -> Result { + self.policy_id_to_settings + .get(policy_id) + .map(|settings| settings.policy_mode.clone()) + .ok_or(anyhow!("cannot find policy with ID {policy_id}")) + } + + /// Given a policy ID, returns true if the policy is allowed to mutate + pub fn get_policy_allowed_to_mutate(&self, policy_id: &str) -> Result { + self.policy_id_to_settings + .get(policy_id) + .map(|settings| settings.allowed_to_mutate) + .ok_or(anyhow!("cannot find policy with ID {policy_id}")) + } + + /// Given a policy ID and a request to be processed, uses the `PolicyEvaluator` to perform + /// a validation operation. + pub fn validate(&mut self, policy_id: &str, req: &EvalRequest) -> Result { + let settings = self.policy_id_to_settings.get(policy_id).ok_or(anyhow!( + "cannot find settings for policy with ID {policy_id}" + ))?; + + let module_id = self.policy_id_to_module_id.get(policy_id).ok_or(anyhow!( + "cannot find module_id for policy with ID {policy_id}" + ))?; + let evaluator = self + .module_id_to_evaluator + .get_mut(module_id) + .ok_or(anyhow!( + "cannot find evaluator for policy with ID {policy_id}" + ))?; + + let eval_ctx = self.policy_id_to_eval_ctx.get(policy_id).ok_or(anyhow!( + "cannot find evaluation context for policy with ID {policy_id}" + ))?; + + Ok(evaluator.validate(req.req.clone(), &settings.settings, eval_ctx)) + } +} + +/// Internal function, takes care of creating the `PolicyEvaluator` instance for the given policy +fn create_policy_evaluator( + policy_id: &str, + worker_id: u64, + policy: &crate::settings::Policy, + engine: &wasmtime::Engine, + precompiled_policy: &PrecompiledPolicy, + callback_handler_tx: mpsc::Sender, + policy_evaluation_limit_seconds: Option, +) -> Result { + // See `wasmtime::Module::deserialize` to know why this method is `unsafe`. + // However, in our context, nothing bad will happen because we have + // full control of the precompiled module. This is generated by the + // WorkerPool thread + let module = + unsafe { wasmtime::Module::deserialize(engine, &precompiled_policy.precompiled_module) } + .map_err(|e| { + anyhow!( + "could not rehydrate wasmtime::Module {}: {:?}", + policy.url, + e + ) + })?; + + let mut policy_evaluator_builder = + PolicyEvaluatorBuilder::new(policy_id.to_string(), worker_id) + .engine(engine.clone()) + .policy_module(module) + .context_aware_resources_allowed(policy.context_aware_resources.clone()) + .callback_channel(callback_handler_tx) + .execution_mode(precompiled_policy.execution_mode); + + if let Some(limit) = policy_evaluation_limit_seconds { + policy_evaluator_builder = + policy_evaluator_builder.enable_epoch_interruptions(limit, limit); + } + + policy_evaluator_builder.build() +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + + use super::*; + use crate::settings::Policy; + + /// Given to identical wasm modules, only one instance of PolicyEvaluator is going to be + /// created + #[test] + fn avoid_duplicated_instaces_of_policy_evaluator() { + let engine = wasmtime::Engine::default(); + let policy_ids = vec!["policy_1", "policy_2"]; + let module = wasmtime::Module::new(&engine, "(module (func))") + .expect("should be able to build the smallest wasm module ever"); + let (callback_handler_tx, _) = mpsc::channel(10); + + let precompiled_policy = PrecompiledPolicy { + precompiled_module: module.serialize().unwrap(), + execution_mode: policy_evaluator::policy_evaluator::PolicyExecutionMode::Wasi, + digest: "unique-digest".to_string(), + }; + + let mut policies = HashMap::new(); + let mut policy_modules = HashMap::new(); + + for policy_id in &policy_ids { + policies.insert( + policy_id.to_owned(), + Policy { + url: policy_id.to_string(), + policy_mode: PolicyMode::Protect, + allowed_to_mutate: None, + settings: None, + context_aware_resources: BTreeSet::new(), + }, + ); + policy_modules.insert(policy_id.to_string(), precompiled_policy.clone()); + } + + let mut evaluation_environment = EvaluationEnvironment::new(0, None); + for policy_id in policy_ids { + evaluation_environment + .register( + policy_id, + &policies[policy_id], + &engine, + &policy_modules, + callback_handler_tx.clone(), + None, + ) + .unwrap(); + } + + assert_eq!(evaluation_environment.module_id_to_evaluator.len(), 1); + } +} diff --git a/src/workers/mod.rs b/src/workers/mod.rs index 312639cd..ab2bb702 100644 --- a/src/workers/mod.rs +++ b/src/workers/mod.rs @@ -1,3 +1,9 @@ +mod evaluation_environment; +mod policy_evaluation_settings; pub(crate) mod pool; pub(crate) mod precompiled_policy; pub(crate) mod worker; + +// This is required to mock the `EvaluationEnvironment` inside of our tests +#[mockall_double::double] +pub(crate) use evaluation_environment::EvaluationEnvironment; diff --git a/src/workers/policy_evaluation_settings.rs b/src/workers/policy_evaluation_settings.rs new file mode 100644 index 00000000..0607e33e --- /dev/null +++ b/src/workers/policy_evaluation_settings.rs @@ -0,0 +1,15 @@ +use policy_evaluator::policy_evaluator::PolicySettings; + +use crate::settings::PolicyMode; + +/// Holds the evaluation settings of loaded Policy. These settings are taken straight from the +/// `policies.yml` file provided by the user +#[cfg_attr(test, allow(dead_code))] +pub(crate) struct PolicyEvaluationSettings { + /// Whether the policy is operating in `protect` or `monitor` mode + pub(crate) policy_mode: PolicyMode, + /// Determines if a mutating policy is actually allowed to mutate + pub(crate) allowed_to_mutate: bool, + /// The policy-specific settings provided by the user + pub(crate) settings: PolicySettings, +} diff --git a/src/workers/pool.rs b/src/workers/pool.rs index 5547fbda..1eb43d87 100644 --- a/src/workers/pool.rs +++ b/src/workers/pool.rs @@ -1,16 +1,15 @@ use anyhow::{anyhow, Result}; use core::time; +use lazy_static::lazy_static; use policy_evaluator::{ - callback_requests::CallbackRequest, - policy_evaluator::{Evaluator, PolicyEvaluator}, - policy_evaluator_builder::PolicyEvaluatorBuilder, - wasmtime, + callback_requests::CallbackRequest, evaluation_context::EvaluationContext, + policy_evaluator::PolicyEvaluator, policy_evaluator_builder::PolicyEvaluatorBuilder, wasmtime, }; use rayon::prelude::*; use std::{ collections::HashMap, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, Barrier, RwLock, }, thread, @@ -27,15 +26,44 @@ use crate::workers::{ worker::Worker, }; +lazy_static! { + /// Used to create unique worker IDs + static ref WORKER_ID: Arc = Arc::new(AtomicU64::new(0)); +} + +/// Coordinates a set of workers. +/// Each worker takes care of performing the evaluation of the requests received by Policy Server +/// API endpoints. +/// +/// The HTTP API communicates with the worker pool via a dedicated chanel. The pool then assigns +/// the evaluation job to one of the workers. Currently this is done on a round-robin fashion. pub(crate) struct WorkerPool { + /// Channel used by the HTTP API to send to the pool the evaluation requests that have to be + /// processed api_rx: mpsc::Receiver, + + /// A oneshot channel used during the bootstrap phase. It's being used by the `main` to send + /// the data used to bootstrap of the workers. bootstrap_rx: oneshot::Receiver, + + /// The channel that connect the synchronous world of the workers with the tokio task that + /// handles all the async requests that might originate during the policy evaluation process. + /// For example, requesting Kubernetes resources, DNS resolution, Sigstore operations,... callback_handler_tx: mpsc::Sender, + + /// When set, this is the Namespace where all the policies do not apply. This is usually set + /// to be the Namespace where the Kubewarden is deployed; ensuring the user policies are not + /// going to interfere with the Kubewarden stack. always_accept_admission_reviews_on_namespace: Option, + + /// When set enables the policy timeout feature which prevents a rogue policy to enter an + /// endless loop/consume all the resources of a worker policy_evaluation_limit_seconds: Option, } impl WorkerPool { + /// Create a new `WorkerPool`, no bootstrap operation is done yet. This happens when invoking + /// the `run` method. pub(crate) fn new( bootstrap_rx: oneshot::Receiver, api_rx: mpsc::Receiver, @@ -52,9 +80,17 @@ impl WorkerPool { } } + /// Bootstrap the pool and then enter an endless loop that processes incoming requests. pub(crate) fn run(mut self) { + // The WorkerPool communicates with each worker over dedicated `mpsc::channel` (one per worker). + // This vector holds all the sender ends of these channels. let mut worker_tx_chans = Vec::>::new(); + + // Each worker has its own `wasmtime::Engine`, we have to keep track of them here because + // we have to increase their epoch tick to implement the policy timeout protection feature let mut worker_engines = Vec::::new(); + + // All the join handles of the spawned worker threads let mut join_handles = Vec::>>::new(); // Phase 1: wait for bootstrap data to be received by the main @@ -95,6 +131,9 @@ impl WorkerPool { } }; + // For each policy defined by the user, ensure the given settings are valid + // We exit with an error if one or more policies do not have valid + // settings. if let Err(error) = verify_policy_settings( &engine, &bootstrap_data.policies, @@ -133,8 +172,10 @@ impl WorkerPool { let (tx, rx) = mpsc::channel::(32); worker_tx_chans.push(tx); - // Each worker has its own wasmtime::Engine, sharing the + // Each worker has its own `wasmtime::Engine`, sharing the // same engine across all the workers leads to bad performance + // TODO: revisit this statement, it seems this issue has been solved by latest wasmtime + // releases let engine = match wasmtime::Engine::new(&wasmtime_config) { Ok(e) => e, Err(e) => { @@ -152,6 +193,8 @@ impl WorkerPool { return; } }; + // Note well: it's fast and cheap to clone a `wasmtime::Engine` as stated by the + // official docs. It's just a reference counter under the hood worker_engines.push(engine.clone()); let modules = precompiled_policies.clone(); @@ -165,7 +208,9 @@ impl WorkerPool { let join = thread::spawn(move || -> Result<()> { info!(spawned = n, total = pool_size, "spawning worker"); + let worker_id = WORKER_ID.fetch_add(1, Ordering::Relaxed); let mut worker = match Worker::new( + worker_id, rx, &policies, &modules, @@ -203,6 +248,7 @@ impl WorkerPool { // meaning a lot of memory would have been consumed without a valid reason // during the whole execution time drop(precompiled_policies); + drop(policies); barrier.wait(); if !boot_canary.load(Ordering::SeqCst) { @@ -259,6 +305,7 @@ impl WorkerPool { pub(crate) fn build_policy_evaluator( policy_id: &str, + worker_id: u64, policy: &crate::settings::Policy, engine: &wasmtime::Engine, policy_modules: &PrecompiledPolicies, @@ -286,13 +333,13 @@ pub(crate) fn build_policy_evaluator( ) })?; - let mut policy_evaluator_builder = PolicyEvaluatorBuilder::new(policy_id.to_string()) - .engine(engine.clone()) - .policy_module(module) - .settings(policy.settings_to_json()?) - .context_aware_resources_allowed(policy.context_aware_resources.clone()) - .callback_channel(callback_handler_tx) - .execution_mode(policy_module.execution_mode); + let mut policy_evaluator_builder = + PolicyEvaluatorBuilder::new(policy_id.to_string(), worker_id) + .engine(engine.clone()) + .policy_module(module) + .context_aware_resources_allowed(policy.context_aware_resources.clone()) + .callback_channel(callback_handler_tx) + .execution_mode(policy_module.execution_mode); if let Some(limit) = policy_evaluation_limit_seconds { policy_evaluator_builder = @@ -345,6 +392,7 @@ fn precompile_policies( .collect()) } +/// Ensure the user provided valid settings for all the policies fn verify_policy_settings( engine: &wasmtime::Engine, policies: &HashMap, @@ -375,10 +423,15 @@ fn verify_policy_settings( }); } + // We have to create a worker_id because this is what the `PolicyEvaluator` constructor needs. + // In this case there's no actual `Worker`, we're going to run all the setting validation Wasm + // invocations inside of the thread of `WorkerPool` + let worker_id = WORKER_ID.fetch_add(1, Ordering::Relaxed); let mut errors = vec![]; for (id, policy) in policies.iter() { let mut policy_evaluator = match build_policy_evaluator( id, + worker_id, policy, engine, policy_modules, @@ -391,7 +444,14 @@ fn verify_policy_settings( continue; } }; - let set_val_rep = policy_evaluator.validate_settings(); + let eval_ctx = EvaluationContext { + policy_id: id.to_owned(), + callback_channel: Some(callback_handler_tx.clone()), + ctx_aware_resources_allow_list: policy.context_aware_resources.clone(), + }; + + let set_val_rep = policy_evaluator + .validate_settings(&policy.settings_to_json()?.unwrap_or_default(), &eval_ctx); if !set_val_rep.valid { errors.push(format!( "[{}] settings are not valid: {:?}", diff --git a/src/workers/worker.rs b/src/workers/worker.rs index 730773fe..c528ce35 100644 --- a/src/workers/worker.rs +++ b/src/workers/worker.rs @@ -1,10 +1,10 @@ -use anyhow::{anyhow, Result}; +use anyhow::Result; use itertools::Itertools; use policy_evaluator::callback_requests::CallbackRequest; use policy_evaluator::wasmtime; use policy_evaluator::{ admission_response::{AdmissionResponse, AdmissionResponseStatus}, - policy_evaluator::{Evaluator, ValidateRequest}, + policy_evaluator::ValidateRequest, }; use std::{collections::HashMap, fmt, time::Instant}; use tokio::sync::mpsc::{Receiver, Sender}; @@ -13,17 +13,10 @@ use tracing::{error, info, info_span}; use crate::communication::{EvalRequest, RequestOrigin}; use crate::metrics::{self}; use crate::settings::{Policy, PolicyMode}; -use crate::workers::precompiled_policy::PrecompiledPolicies; - -struct PolicyEvaluatorWithSettings { - policy_evaluator: Box, - policy_mode: PolicyMode, - allowed_to_mutate: bool, - always_accept_admission_reviews_on_namespace: Option, -} +use crate::workers::{precompiled_policy::PrecompiledPolicies, EvaluationEnvironment}; pub(crate) struct Worker { - evaluators: HashMap, + evaluation_environment: EvaluationEnvironment, channel_rx: Receiver, } @@ -40,12 +33,19 @@ impl fmt::Display for PolicyErrors { } impl Worker { + /// Create a new Worker. Takes care of allocating the `PolicyEvaluator` environments + /// required to evaluate the policies. + /// + /// No check is done against the policy settings provided by the user. The `WorkerPool` + /// already verified that all the settings are valid. + #[allow(clippy::too_many_arguments)] #[tracing::instrument( name = "worker_new", fields(host=crate::cli::HOSTNAME.as_str()), skip_all, )] pub(crate) fn new( + worker_id: u64, rx: Receiver, policies: &HashMap, precompiled_policies: &PrecompiledPolicies, @@ -55,12 +55,13 @@ impl Worker { policy_evaluation_limit_seconds: Option, ) -> Result { let mut evs_errors = HashMap::new(); - let mut evs = HashMap::new(); + let mut evaluation_environment = + EvaluationEnvironment::new(worker_id, always_accept_admission_reviews_on_namespace); for (id, policy) in policies.iter() { // It's safe to clone the outer engine. This creates a shallow copy let inner_engine = engine.clone(); - let policy_evaluator = match crate::workers::pool::build_policy_evaluator( + if let Err(e) = evaluation_environment.register( id, policy, &inner_engine, @@ -68,25 +69,11 @@ impl Worker { callback_handler_tx.clone(), policy_evaluation_limit_seconds, ) { - Ok(pe) => Box::new(pe), - Err(e) => { - evs_errors.insert( - id.clone(), - format!("[{id}] could not create PolicyEvaluator: {e:?}"), - ); - continue; - } - }; - - let policy_evaluator_with_settings = PolicyEvaluatorWithSettings { - policy_evaluator, - policy_mode: policy.policy_mode.clone(), - allowed_to_mutate: policy.allowed_to_mutate.unwrap_or(false), - always_accept_admission_reviews_on_namespace: - always_accept_admission_reviews_on_namespace.clone(), - }; - - evs.insert(id.to_string(), policy_evaluator_with_settings); + evs_errors.insert( + id.clone(), + format!("[{id}] could not create PolicyEvaluator: {e:?}"), + ); + } } if !evs_errors.is_empty() { @@ -94,7 +81,7 @@ impl Worker { } Ok(Worker { - evaluators: evs, + evaluation_environment, channel_rx: rx, }) } @@ -154,31 +141,40 @@ impl Worker { } } + /// Endless loop that waits for the WorkerPool to give a new request to be processed. + /// The request is then evaluated and a response is returned back to the WorkerPool. pub(crate) fn run(&mut self) { while let Some(req) = self.channel_rx.blocking_recv() { let span = info_span!(parent: &req.parent_span, "policy_eval"); let _enter = span.enter(); - let res = match self.evaluators.get_mut(&req.policy_id) { - Some(pes) => Self::evaluate(req, pes), - None => req - .resp_chan - .send(None) - .map_err(|_| anyhow!("cannot send response back")), - }; - if res.is_err() { - error!("receiver dropped"); + let admission_response = self.evaluate(&req).unwrap_or_else(|e| { + AdmissionResponse::reject_internal_server_error( + req.req.uid().to_owned(), + e.to_string(), + ) + }); + if let Err(e) = req.resp_chan.send(Some(admission_response)) { + error!("cannot send response back: {e:?}"); } } } - fn evaluate(req: EvalRequest, pes: &mut PolicyEvaluatorWithSettings) -> anyhow::Result<()> { + /// Perform the actual evaluation + fn evaluate(&mut self, req: &EvalRequest) -> Result { let start_time = Instant::now(); - let policy_name = pes.policy_evaluator.policy_id(); - let policy_mode = pes.policy_mode.clone(); - let allowed_to_mutate = pes.allowed_to_mutate; - let vanilla_validation_response = pes.policy_evaluator.validate(req.req.clone()); + let policy_name = req.policy_id.clone(); + let policy_mode = self + .evaluation_environment + .get_policy_mode(&req.policy_id)?; + let allowed_to_mutate = self + .evaluation_environment + .get_policy_allowed_to_mutate(&req.policy_id)?; + + let vanilla_validation_response = + self.evaluation_environment.validate(&req.policy_id, req)?; + let policy_evaluation_duration = start_time.elapsed(); let accepted = vanilla_validation_response.allowed; let mutated = vanilla_validation_response.patch.is_some(); @@ -200,17 +196,23 @@ impl Worker { match req.req.clone() { ValidateRequest::AdmissionRequest(adm_req) => { - if let Some(namespace) = &pes.always_accept_admission_reviews_on_namespace { - // If the policy server is configured to - // always accept admission reviews on a - // given namespace, just set the `allowed` - // part of the response to `true` if the - // request matches this namespace. Keep - // the rest of the behaviors unchanged, - // such as checking if the policy is - // allowed to mutate. - - if adm_req.namespace == Some(namespace.to_string()) { + // TODO: we should check immediately if the request is coming from the "always + // accepted" namespace ASAP. Right now we do an evaluation and then we discard the + // result if the namespace is the special one. + // Moreover, I (flavio) don't like the fact we're using a mutable variable for + // `validation_response` + if let Some(ref req_namespace) = adm_req.namespace { + if self + .evaluation_environment + .should_always_accept_requests_made_inside_of_namespace(req_namespace) + { + // given namespace, just set the `allowed` + // part of the response to `true` if the + // request matches this namespace. Keep + // the rest of the behaviors unchanged, + // such as checking if the policy is + // allowed to mutate. + validation_response = AdmissionResponse { allowed: true, status: None, @@ -218,7 +220,6 @@ impl Worker { }; } } - let policy_evaluation = metrics::PolicyEvaluation { policy_name, policy_mode: policy_mode.into(), @@ -246,9 +247,7 @@ impl Worker { metrics::add_policy_evaluation(&policy_evaluation); } }; - - let res = req.resp_chan.send(Some(validation_response)); - res.map_err(|_| anyhow!("cannot send response back")) + Ok(validation_response) } } @@ -256,82 +255,70 @@ impl Worker { mod tests { use crate::admission_review::tests::build_admission_review; use crate::communication::RequestOrigin; - - use policy_evaluator::kubewarden_policy_sdk::settings::SettingsValidationResponse; - use policy_evaluator::ProtocolVersion; use rstest::*; - use tokio::sync::oneshot; + use tokio::sync::{mpsc, oneshot}; use super::*; const POLICY_ID: &str = "policy-id"; - struct MockPolicyEvaluator { - pub policy_id: String, - pub admission_response: AdmissionResponse, - pub settings_validation_response: SettingsValidationResponse, - pub protocol_version: Result, - } - - impl Default for MockPolicyEvaluator { - fn default() -> Self { - Self { - policy_id: "mock_policy".to_string(), - admission_response: AdmissionResponse { - allowed: false, - ..Default::default() - }, - settings_validation_response: SettingsValidationResponse { - valid: true, - message: None, - }, - protocol_version: Ok(ProtocolVersion::V1), - } - } - } - - impl MockPolicyEvaluator { - fn new_allowed() -> MockPolicyEvaluator { - MockPolicyEvaluator { - admission_response: AdmissionResponse { + fn create_evaluation_environment_that_accepts_request( + policy_mode: PolicyMode, + ) -> EvaluationEnvironment { + let mut mock_evaluation_environment = EvaluationEnvironment::default(); + mock_evaluation_environment + .expect_validate() + .returning(|_policy_id, request| { + Ok(AdmissionResponse { + uid: request.req.uid().to_owned(), allowed: true, ..Default::default() - }, - ..Default::default() - } - } - - fn new_rejected(message: Option, code: Option) -> MockPolicyEvaluator { - MockPolicyEvaluator { - admission_response: AdmissionResponse { - allowed: false, - status: Some(AdmissionResponseStatus { message, code }), - ..Default::default() - }, - ..Default::default() - } - } + }) + }); + mock_evaluation_environment + .expect_get_policy_mode() + .returning(move |_policy_id| Ok(policy_mode.clone())); + mock_evaluation_environment + .expect_get_policy_allowed_to_mutate() + .returning(|_policy_id| Ok(false)); + mock_evaluation_environment + .expect_should_always_accept_requests_made_inside_of_namespace() + .returning(|_namespace| false); + mock_evaluation_environment } - impl Evaluator for MockPolicyEvaluator { - fn validate(&mut self, _request: ValidateRequest) -> AdmissionResponse { - self.admission_response.clone() - } - - fn validate_settings(&mut self) -> SettingsValidationResponse { - self.settings_validation_response.clone() - } - - fn protocol_version(&mut self) -> Result { - match &self.protocol_version { - Ok(pv) => Ok(pv.clone()), - Err(e) => Err(anyhow::anyhow!("{}", e)), - } - } + #[derive(Clone)] + struct EvaluationEnvironmentRejectionDetails { + message: String, + code: u16, + } - fn policy_id(&self) -> String { - self.policy_id.clone() - } + fn create_evaluation_environment_that_reject_request( + policy_mode: PolicyMode, + rejection_details: EvaluationEnvironmentRejectionDetails, + allowed_namespace: String, + ) -> EvaluationEnvironment { + let mut mock_evaluation_environment = EvaluationEnvironment::default(); + mock_evaluation_environment + .expect_validate() + .returning(move |_policy_id, request| { + Ok(AdmissionResponse::reject( + request.req.uid().to_owned(), + rejection_details.message.clone(), + rejection_details.code, + )) + }); + mock_evaluation_environment + .expect_get_policy_mode() + .returning(move |_policy_id| Ok(policy_mode.clone())); + mock_evaluation_environment + .expect_get_policy_allowed_to_mutate() + .returning(|_policy_id| Ok(false)); + mock_evaluation_environment + .expect_should_always_accept_requests_made_inside_of_namespace() + .returning(move |namespace| namespace == allowed_namespace); + + mock_evaluation_environment } #[test] @@ -621,7 +608,7 @@ mod tests { #[case] policy_mode: PolicyMode, #[case] request_origin: RequestOrigin, ) { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let req = ValidateRequest::AdmissionRequest( build_admission_review().request.expect("no request"), ); @@ -634,21 +621,13 @@ mod tests { request_origin, }; - let mock_evaluator = MockPolicyEvaluator::new_allowed(); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + let (_, channel_rx) = mpsc::channel::(10); + let mut worker = Worker { + channel_rx, + evaluation_environment: create_evaluation_environment_that_accepts_request(policy_mode), }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); assert!(response.allowed); } @@ -663,7 +642,7 @@ mod tests { #[case] request_origin: RequestOrigin, #[case] accept: bool, ) { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let req = ValidateRequest::AdmissionRequest( build_admission_review().request.expect("no request"), ); @@ -676,23 +655,22 @@ mod tests { request_origin, }; - let message = Some("boom".to_string()); - let code = Some(500); - let mock_evaluator = MockPolicyEvaluator::new_rejected(message.clone(), code); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), + let (_, channel_rx) = mpsc::channel::(10); + let rejection_details = EvaluationEnvironmentRejectionDetails { + message: "boom".to_string(), + code: 500, + }; + let mock_evaluation_environment = create_evaluation_environment_that_reject_request( policy_mode, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + rejection_details.clone(), + "".to_string(), + ); + let mut worker = Worker { + channel_rx, + evaluation_environment: mock_evaluation_environment, }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); if accept { assert!(response.allowed); @@ -700,14 +678,14 @@ mod tests { } else { assert!(!response.allowed); let response_status = response.status.expect("should be set"); - assert_eq!(response_status.message, message); - assert_eq!(response_status.code, code); + assert_eq!(response_status.message, Some(rejection_details.message)); + assert_eq!(response_status.code, Some(rejection_details.code)); } } #[test] fn evaluate_policy_evaluator_accepts_request_raw() { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let request = serde_json::json!(r#"{"foo": "bar"}"#); let req = ValidateRequest::Raw(request.clone()); @@ -720,27 +698,21 @@ mod tests { request_origin: RequestOrigin::Validate, }; - let mock_evaluator = MockPolicyEvaluator::new_allowed(); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode: PolicyMode::Protect, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + let (_, channel_rx) = mpsc::channel::(10); + let mut worker = Worker { + channel_rx, + evaluation_environment: create_evaluation_environment_that_accepts_request( + PolicyMode::Protect, + ), }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); assert!(response.allowed); } #[test] fn evaluate_policy_evaluator_rejects_request_raw() { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let request = serde_json::json!(r#"{"foo": "bar"}"#); let req = ValidateRequest::Raw(request.clone()); @@ -753,28 +725,26 @@ mod tests { request_origin: RequestOrigin::Validate, }; - let message = Some("boom".to_string()); - let code = Some(500); - let mock_evaluator = MockPolicyEvaluator::new_rejected(message.clone(), code); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode: PolicyMode::Protect, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + let (_, channel_rx) = mpsc::channel::(10); + let rejection_details = EvaluationEnvironmentRejectionDetails { + message: "boom".to_string(), + code: 500, + }; + let mock_evaluation_environment = create_evaluation_environment_that_reject_request( + PolicyMode::Protect, + rejection_details.clone(), + "".to_string(), + ); + let mut worker = Worker { + channel_rx, + evaluation_environment: mock_evaluation_environment, }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); - + let response = worker.evaluate(&eval_req).unwrap(); assert!(!response.allowed); let response_status = response.status.expect("should be set"); - assert_eq!(response_status.message, message); - assert_eq!(response_status.code, code); + assert_eq!(response_status.message, Some(rejection_details.message)); + assert_eq!(response_status.code, Some(rejection_details.code)); } #[rstest] @@ -788,7 +758,7 @@ mod tests { // of a namespace that is ignored by kubewarden -> this leads the // request to still be accepted - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let allowed_namespace = "kubewarden_special".to_string(); @@ -804,23 +774,22 @@ mod tests { request_origin, }; - let message = Some("boom".to_string()); - let code = Some(500); - let mock_evaluator = MockPolicyEvaluator::new_rejected(message, code); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode: PolicyMode::Protect, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: Some(allowed_namespace), + let (_, channel_rx) = mpsc::channel::(10); + let rejection_details = EvaluationEnvironmentRejectionDetails { + message: "boom".to_string(), + code: 500, + }; + let mock_evaluation_environment = create_evaluation_environment_that_reject_request( + PolicyMode::Protect, + rejection_details.clone(), + allowed_namespace, + ); + let mut worker = Worker { + channel_rx, + evaluation_environment: mock_evaluation_environment, }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); assert!(response.allowed); assert!(response.status.is_none()); }