From f153b858e817cf145fda3eca5a2a117ee9e68e6f Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Thu, 16 Nov 2023 10:00:53 +0100 Subject: [PATCH 1/7] refactor: split worker-related files into a sub-module This allows a better organization of the code Signed-off-by: Flavio Castelli --- src/lib.rs | 5 +- src/workers/mod.rs | 3 + src/{worker_pool.rs => workers/pool.rs} | 142 +----------------------- src/workers/precompiled_policy.rs | 133 ++++++++++++++++++++++ src/{ => workers}/worker.rs | 4 +- 5 files changed, 146 insertions(+), 141 deletions(-) create mode 100644 src/workers/mod.rs rename src/{worker_pool.rs => workers/pool.rs} (74%) create mode 100644 src/workers/precompiled_policy.rs rename src/{ => workers}/worker.rs (99%) diff --git a/src/lib.rs b/src/lib.rs index 0a0fcada..b3a505a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,8 +3,7 @@ mod communication; mod metrics; mod policy_downloader; mod server; -mod worker; -mod worker_pool; +mod workers; pub mod admission_review; pub mod config; @@ -26,7 +25,7 @@ use tracing_subscriber::{fmt, EnvFilter}; use communication::{EvalRequest, WorkerPoolBootRequest}; use config::Config; use policy_downloader::Downloader; -use worker_pool::WorkerPool; +use workers::pool::WorkerPool; lazy_static! { static ref TRACE_SYSTEM_INITIALIZED: RwLock = RwLock::new(false); diff --git a/src/workers/mod.rs b/src/workers/mod.rs new file mode 100644 index 00000000..312639cd --- /dev/null +++ b/src/workers/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod pool; +pub(crate) mod precompiled_policy; +pub(crate) mod worker; diff --git a/src/worker_pool.rs b/src/workers/pool.rs similarity index 74% rename from src/worker_pool.rs rename to src/workers/pool.rs index f29d3f9c..c0bcacf3 100644 --- a/src/worker_pool.rs +++ b/src/workers/pool.rs @@ -1,19 +1,14 @@ use anyhow::{anyhow, Result}; use core::time; -use lazy_static::lazy_static; use policy_evaluator::{ callback_requests::CallbackRequest, - policy_evaluator::{Evaluator, PolicyEvaluator, PolicyExecutionMode}, + policy_evaluator::{Evaluator, PolicyEvaluator}, policy_evaluator_builder::PolicyEvaluatorBuilder, - policy_metadata::Metadata, wasmtime, }; use rayon::prelude::*; -use semver::{BuildMetadata, Prerelease, Version}; use std::{ collections::HashMap, - fs, - path::Path, sync::{ atomic::{AtomicBool, Ordering}, Arc, Barrier, RwLock, @@ -25,90 +20,14 @@ use std::{ use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, info, warn}; +use crate::communication::{EvalRequest, WorkerPoolBootRequest}; +use crate::config; use crate::policy_downloader::FetchedPolicies; -use crate::worker::Worker; -use crate::{ - communication::{EvalRequest, WorkerPoolBootRequest}, - config, +use crate::workers::{ + precompiled_policy::{PrecompiledPolicies, PrecompiledPolicy}, + worker::Worker, }; -lazy_static! { - static ref KUBEWARDEN_VERSION: Version = { - let mut version = Version::parse(env!("CARGO_PKG_VERSION")).expect("Cannot parse CARGO_PKG_VERSION version"); - // Remove the patch, prerelease and build information to avoid rejections - // like this: v1.6.0-rc4 < v1.6.0 - version.patch = 0; - version.pre = Prerelease::EMPTY; - version.build = BuildMetadata::EMPTY; - version - }; -} - -/// Check if policy server version is compatible with minimum kubewarden -/// version required by the policy -fn has_minimum_kubewarden_version(metadata: &Metadata) -> Result<()> { - if let Some(minimum_kubewarden_version) = &metadata.minimum_kubewarden_version { - let sanitized_minimum_kubewarden_version = Version { - major: minimum_kubewarden_version.major, - minor: minimum_kubewarden_version.minor, - // Kubewarden stack version ignore patch, prerelease and build version numbers - patch: 0, - pre: Prerelease::EMPTY, - build: BuildMetadata::EMPTY, - }; - if *KUBEWARDEN_VERSION < sanitized_minimum_kubewarden_version { - return Err(anyhow!( - "Policy required Kubewarden version {} but is running on {}", - sanitized_minimum_kubewarden_version, - KUBEWARDEN_VERSION.to_string(), - )); - } - } - Ok(()) -} - -/// This structure holds a precompiled WebAssembly module -/// representing a policy. -/// -/// Compiling a WebAssembly module is an expensive operation. Each -/// worker thread needs to do that, for each policy defined by the user. -/// -/// Precompiling the policies ahead of time reduces the bootstrap time by a lot. -/// -/// **Warning:** when "rehydrating" the module, you have to use a `wasmtime::Engine` -/// that has been created with the same `wasmtime::Config` used at compilation time. -#[derive(Clone)] -pub(crate) struct PrecompiledPolicy { - /// A precompiled [`wasmtime::Module`] - pub precompiled_module: Vec, - - /// The execution mode of the policy - pub execution_mode: PolicyExecutionMode, -} - -impl PrecompiledPolicy { - /// Load a WebAssembly module from the disk and compiles it - fn new(engine: &wasmtime::Engine, wasm_module_path: &Path) -> Result { - let policy_contents = fs::read(wasm_module_path)?; - let policy_metadata = Metadata::from_contents(&policy_contents)?; - let metadata = policy_metadata.unwrap_or_default(); - let execution_mode = metadata.execution_mode; - has_minimum_kubewarden_version(&metadata)?; - - let precompiled_module = engine.precompile_module(&policy_contents)?; - - Ok(Self { - precompiled_module, - execution_mode, - }) - } -} - -/// A dictionary with: -/// * Key: the URL of the WebAssembly module -/// * value: the PrecompiledPolicy -pub(crate) type PrecompiledPolicies = HashMap; - pub(crate) struct WorkerPool { api_rx: mpsc::Receiver, bootstrap_rx: oneshot::Receiver, @@ -495,52 +414,3 @@ fn verify_policy_settings( Err(anyhow!("{}", errors.join(", "))) } } - -#[cfg(test)] -mod tests { - use super::*; - use rstest::rstest; - - fn generate_metadata(major: u64, minor: u64, patch: u64) -> Metadata { - let minimum_kubewarden_version = Version { - major, - minor, - patch, - pre: Prerelease::EMPTY, - build: BuildMetadata::EMPTY, - }; - Metadata { - minimum_kubewarden_version: Some(minimum_kubewarden_version), - ..Default::default() - } - } - - #[rstest] - #[case(generate_metadata(KUBEWARDEN_VERSION.major -1, KUBEWARDEN_VERSION.minor, KUBEWARDEN_VERSION.patch))] - #[case(generate_metadata(KUBEWARDEN_VERSION.major, KUBEWARDEN_VERSION.minor - 1, KUBEWARDEN_VERSION.patch))] - fn recent_kubewarden_versions_test(#[case] metadata: Metadata) { - assert!(has_minimum_kubewarden_version(&metadata).is_ok()) - } - - #[rstest] - #[case(generate_metadata(KUBEWARDEN_VERSION.major +1, KUBEWARDEN_VERSION.minor, KUBEWARDEN_VERSION.patch))] - #[case(generate_metadata(KUBEWARDEN_VERSION.major, KUBEWARDEN_VERSION.minor + 1, KUBEWARDEN_VERSION.patch))] - fn old_kubewarden_versions_test(#[case] metadata: Metadata) { - assert!(has_minimum_kubewarden_version(&metadata).is_err()) - } - - #[test] - fn no_mininum_kubewarden_version_is_valid_test() { - let metadata = Metadata { - minimum_kubewarden_version: None, - ..Default::default() - }; - assert!(has_minimum_kubewarden_version(&metadata).is_ok()) - } - - #[rstest] - #[case(generate_metadata(KUBEWARDEN_VERSION.major, KUBEWARDEN_VERSION.minor, KUBEWARDEN_VERSION.patch + 1))] - fn ignore_patch_version_test(#[case] metadata: Metadata) { - assert!(has_minimum_kubewarden_version(&metadata).is_ok()) - } -} diff --git a/src/workers/precompiled_policy.rs b/src/workers/precompiled_policy.rs new file mode 100644 index 00000000..57534736 --- /dev/null +++ b/src/workers/precompiled_policy.rs @@ -0,0 +1,133 @@ +use anyhow::{anyhow, Result}; +use lazy_static::lazy_static; +use policy_evaluator::{ + policy_evaluator::PolicyExecutionMode, policy_metadata::Metadata, wasmtime, +}; +use semver::{BuildMetadata, Prerelease, Version}; +use std::{collections::HashMap, fs, path::Path, vec::Vec}; + +lazy_static! { + static ref KUBEWARDEN_VERSION: Version = { + let mut version = Version::parse(env!("CARGO_PKG_VERSION")).expect("Cannot parse CARGO_PKG_VERSION version"); + // Remove the patch, prerelease and build information to avoid rejections + // like this: v1.6.0-rc4 < v1.6.0 + version.patch = 0; + version.pre = Prerelease::EMPTY; + version.build = BuildMetadata::EMPTY; + version + }; +} + +/// This structure holds a precompiled WebAssembly module +/// representing a policy. +/// +/// Compiling a WebAssembly module is an expensive operation. Each +/// worker thread needs to do that, for each policy defined by the user. +/// +/// Precompiling the policies ahead of time reduces the bootstrap time by a lot. +/// +/// **Warning:** when "rehydrating" the module, you have to use a `wasmtime::Engine` +/// that has been created with the same `wasmtime::Config` used at compilation time. +#[derive(Clone)] +pub(crate) struct PrecompiledPolicy { + /// A precompiled [`wasmtime::Module`] + pub precompiled_module: Vec, + + /// The execution mode of the policy + pub execution_mode: PolicyExecutionMode, +} + +impl PrecompiledPolicy { + /// Load a WebAssembly module from the disk and compiles it + pub fn new(engine: &wasmtime::Engine, wasm_module_path: &Path) -> Result { + let policy_contents = fs::read(wasm_module_path)?; + let policy_metadata = Metadata::from_contents(&policy_contents)?; + let metadata = policy_metadata.unwrap_or_default(); + let execution_mode = metadata.execution_mode; + has_minimum_kubewarden_version(&metadata)?; + + let precompiled_module = engine.precompile_module(&policy_contents)?; + + Ok(Self { + precompiled_module, + execution_mode, + }) + } +} + +/// A dictionary with: +/// * Key: the URL of the WebAssembly module +/// * value: the PrecompiledPolicy +pub(crate) type PrecompiledPolicies = HashMap; + +/// Check if policy server version is compatible with minimum kubewarden +/// version required by the policy +fn has_minimum_kubewarden_version(metadata: &Metadata) -> Result<()> { + if let Some(minimum_kubewarden_version) = &metadata.minimum_kubewarden_version { + let sanitized_minimum_kubewarden_version = Version { + major: minimum_kubewarden_version.major, + minor: minimum_kubewarden_version.minor, + // Kubewarden stack version ignore patch, prerelease and build version numbers + patch: 0, + pre: Prerelease::EMPTY, + build: BuildMetadata::EMPTY, + }; + if *KUBEWARDEN_VERSION < sanitized_minimum_kubewarden_version { + return Err(anyhow!( + "Policy required Kubewarden version {} but is running on {}", + sanitized_minimum_kubewarden_version, + KUBEWARDEN_VERSION.to_string(), + )); + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use rstest::rstest; + + fn generate_metadata(major: u64, minor: u64, patch: u64) -> Metadata { + let minimum_kubewarden_version = Version { + major, + minor, + patch, + pre: Prerelease::EMPTY, + build: BuildMetadata::EMPTY, + }; + Metadata { + minimum_kubewarden_version: Some(minimum_kubewarden_version), + ..Default::default() + } + } + + #[rstest] + #[case(generate_metadata(KUBEWARDEN_VERSION.major -1, KUBEWARDEN_VERSION.minor, KUBEWARDEN_VERSION.patch))] + #[case(generate_metadata(KUBEWARDEN_VERSION.major, KUBEWARDEN_VERSION.minor - 1, KUBEWARDEN_VERSION.patch))] + fn recent_kubewarden_versions_test(#[case] metadata: Metadata) { + assert!(has_minimum_kubewarden_version(&metadata).is_ok()) + } + + #[rstest] + #[case(generate_metadata(KUBEWARDEN_VERSION.major +1, KUBEWARDEN_VERSION.minor, KUBEWARDEN_VERSION.patch))] + #[case(generate_metadata(KUBEWARDEN_VERSION.major, KUBEWARDEN_VERSION.minor + 1, KUBEWARDEN_VERSION.patch))] + fn old_kubewarden_versions_test(#[case] metadata: Metadata) { + assert!(has_minimum_kubewarden_version(&metadata).is_err()) + } + + #[test] + fn no_mininum_kubewarden_version_is_valid_test() { + let metadata = Metadata { + minimum_kubewarden_version: None, + ..Default::default() + }; + assert!(has_minimum_kubewarden_version(&metadata).is_ok()) + } + + #[rstest] + #[case(generate_metadata(KUBEWARDEN_VERSION.major, KUBEWARDEN_VERSION.minor, KUBEWARDEN_VERSION.patch + 1))] + fn ignore_patch_version_test(#[case] metadata: Metadata) { + assert!(has_minimum_kubewarden_version(&metadata).is_ok()) + } +} diff --git a/src/worker.rs b/src/workers/worker.rs similarity index 99% rename from src/worker.rs rename to src/workers/worker.rs index 59c7a2c2..04763b5d 100644 --- a/src/worker.rs +++ b/src/workers/worker.rs @@ -13,7 +13,7 @@ use tracing::{error, info, info_span}; use crate::communication::{EvalRequest, RequestOrigin}; use crate::config::{Policy, PolicyMode}; use crate::metrics::{self}; -use crate::worker_pool::PrecompiledPolicies; +use crate::workers::precompiled_policy::PrecompiledPolicies; struct PolicyEvaluatorWithSettings { policy_evaluator: Box, @@ -60,7 +60,7 @@ impl Worker { for (id, policy) in policies.iter() { // It's safe to clone the outer engine. This creates a shallow copy let inner_engine = engine.clone(); - let policy_evaluator = match crate::worker_pool::build_policy_evaluator( + let policy_evaluator = match crate::workers::pool::build_policy_evaluator( id, policy, &inner_engine, From e691a8d2473953fb4c4c2419ac5cd823d74d9796 Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Thu, 16 Nov 2023 10:06:49 +0100 Subject: [PATCH 2/7] precompiled policy: compute digest This allows us to uniquely identify a precompiled wasm module Signed-off-by: Flavio Castelli --- Cargo.lock | 1 + Cargo.toml | 1 + src/workers/precompiled_policy.rs | 9 +++++++++ 3 files changed, 11 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index d763f9e6..8152194a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3399,6 +3399,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "sha2", "tempfile", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index c710f3d0..8c4d5f53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ rayon = "1.8" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.9.27" +sha2 = "0.10" tokio = { version = "^1", features = ["full"] } tracing = "0.1" tracing-futures = "0.2" diff --git a/src/workers/precompiled_policy.rs b/src/workers/precompiled_policy.rs index 57534736..808d4e25 100644 --- a/src/workers/precompiled_policy.rs +++ b/src/workers/precompiled_policy.rs @@ -4,6 +4,7 @@ use policy_evaluator::{ policy_evaluator::PolicyExecutionMode, policy_metadata::Metadata, wasmtime, }; use semver::{BuildMetadata, Prerelease, Version}; +use sha2::{Digest, Sha256}; use std::{collections::HashMap, fs, path::Path, vec::Vec}; lazy_static! { @@ -35,6 +36,9 @@ pub(crate) struct PrecompiledPolicy { /// The execution mode of the policy pub execution_mode: PolicyExecutionMode, + + /// sha256 digest of the precompiled module + pub digest: String, } impl PrecompiledPolicy { @@ -48,9 +52,14 @@ impl PrecompiledPolicy { let precompiled_module = engine.precompile_module(&policy_contents)?; + let mut hasher = Sha256::new(); + hasher.update(&precompiled_module); + let digest = hasher.finalize(); + Ok(Self { precompiled_module, execution_mode, + digest: format!("{digest:x}"), }) } } From ae05f2e1d55214651b27d1cb75a1f47eef7c1883 Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Tue, 21 Nov 2023 22:35:50 +0100 Subject: [PATCH 3/7] test: add more policies to the e2e environment This is going to be useful also to perform the k6 tests. Signed-off-by: Flavio Castelli --- e2e-tests/test_data/policies.yaml | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/e2e-tests/test_data/policies.yaml b/e2e-tests/test_data/policies.yaml index 2485ac32..bf16f2ee 100644 --- a/e2e-tests/test_data/policies.yaml +++ b/e2e-tests/test_data/policies.yaml @@ -77,3 +77,26 @@ raw-mutation-wasi: - "banana" - "carrot" defaultResource: "hay" + +apparmor: + url: ghcr.io/kubewarden/tests/apparmor-psp:v0.1.13 + allowedToMutate: false + settings: + allowed_profiles: + - runtime/default + +psp-user-group: + url: ghcr.io/kubewarden/tests/user-group-psp:v0.4.9 + allowedToMutate: true + settings: + run_as_user: + rule: MustRunAs + ranges: + - min: 1000 + max: 2000 + run_as_group: + rule: RunAsAny + overwrite: false + supplemental_groups: + rule: RunAsAny + overwrite: false From 147011b4c3d5fee6b35f5d1fdd9cb3649c77ac4a Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Wed, 22 Nov 2023 22:44:43 +0100 Subject: [PATCH 4/7] fix: reduce memory consumption of Policy Server Reduce the memory consumption of Policy Server when multiple instances of the same Wasm module are loaded. Thanks to this change, a worker will have only once instance of `PolicyEvaluator` (hence of wasmtime stack), per unique type of module. Literally speaking, if a user has the `apparmor` policy deployed 5 times (different names, settings,...) only one instance of `PolicyEvaluator` will be allocated for it. Note: the optimization works at the worker level. Meaning that `PolicyEvaluator` are NOT sharing these instances between themselves. This commit helps to address issue https://github.com/kubewarden/kubewarden-controller/issues/528 Signed-off-by: Flavio Castelli --- Cargo.lock | 180 ++++++++-- Cargo.toml | 5 +- src/config.rs | 3 +- src/workers/evaluation_environment.rs | 293 +++++++++++++++++ src/workers/mod.rs | 6 + src/workers/policy_evaluation_settings.rs | 15 + src/workers/pool.rs | 88 ++++- src/workers/worker.rs | 383 ++++++++++------------ 8 files changed, 715 insertions(+), 258 deletions(-) create mode 100644 src/workers/evaluation_environment.rs create mode 100644 src/workers/policy_evaluation_settings.rs diff --git a/Cargo.lock b/Cargo.lock index 8152194a..b97f8df1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -186,13 +186,13 @@ dependencies = [ [[package]] name = "async-global-executor" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b4353121d5644cdf2beb5726ab752e79a8db1ebb52031770ec47db31d245526" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" dependencies = [ "async-channel 2.1.1", "async-executor", - "async-io 2.2.1", + "async-io 2.2.2", "async-lock 3.2.0", "blocking", "futures-lite 2.1.0", @@ -221,9 +221,9 @@ dependencies = [ [[package]] name = "async-io" -version = "2.2.1" +version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6d3b15875ba253d1110c740755e246537483f152fa334f91abd7fe84c88b3ff" +checksum = "6afaa937395a620e33dc6a742c593c01aced20aa376ffb0f628121198578ccc7" dependencies = [ "async-lock 3.2.0", "cfg-if", @@ -303,7 +303,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -320,7 +320,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -502,7 +502,7 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "burrego" version = "0.3.4" -source = "git+https://github.com/kubewarden/policy-evaluator?tag=v0.12.2#f43d8b2b9f21b4f9053058fc957649b8bbe7551e" +source = "git+https://github.com/kubewarden/policy-evaluator?rev=da91fae49e29914e458d9ac4f09e0676c38764c0#da91fae49e29914e458d9ac4f09e0676c38764c0" dependencies = [ "base64 0.21.5", "chrono", @@ -1072,7 +1072,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -1129,7 +1129,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -1151,7 +1151,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core 0.20.3", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -1184,7 +1184,7 @@ checksum = "5fe87ce4529967e0ba1dcf8450bab64d97dfd5010a6256187ffe2e43e6f0e049" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -1197,6 +1197,12 @@ dependencies = [ "serde", ] +[[package]] +name = "difflib" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" + [[package]] name = "digest" version = "0.10.7" @@ -1300,6 +1306,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dyn-clone" version = "1.0.16" @@ -1497,6 +1509,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1512,6 +1533,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "fs-set-times" version = "0.20.1" @@ -1607,7 +1634,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -2302,7 +2329,7 @@ dependencies = [ "jsonpath_lib", "k8s-openapi", "kube-core", - "pem 3.0.2", + "pem 3.0.3", "pin-project", "rustls", "rustls-pemfile", @@ -2552,6 +2579,45 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mockall" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "mockall_double" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ca96e5ac35256ae3e13536edd39b172b88f41615e1d7b653c8ad24524113e8" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.40", +] + [[package]] name = "multer" version = "2.1.0" @@ -2576,6 +2642,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27b02d87554356db9e9a873add8782d4ea6e3e58ea071a9adb9a2e8ddb884a8b" +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2646,7 +2718,7 @@ checksum = "cfb77679af88f8b125209d354a202862602672222e7f2313fdd6dc349bad4712" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -3080,9 +3152,9 @@ dependencies = [ [[package]] name = "pem" -version = "3.0.2" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3163d2912b7c3b52d651a055f2c7eec9ba5cd22d26ef75b8dd3a59980b185923" +checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310" dependencies = [ "base64 0.21.5", "serde", @@ -3226,7 +3298,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -3305,7 +3377,7 @@ checksum = "14e6ab3f592e6fb464fc9712d8d6e6912de6473954635fd76a589d832cffcbb0" [[package]] name = "policy-evaluator" version = "0.12.2" -source = "git+https://github.com/kubewarden/policy-evaluator?tag=v0.12.2#f43d8b2b9f21b4f9053058fc957649b8bbe7551e" +source = "git+https://github.com/kubewarden/policy-evaluator?rev=da91fae49e29914e458d9ac4f09e0676c38764c0#da91fae49e29914e458d9ac4f09e0676c38764c0" dependencies = [ "anyhow", "base64 0.21.5", @@ -3386,6 +3458,8 @@ dependencies = [ "itertools 0.12.0", "k8s-openapi", "lazy_static", + "mockall", + "mockall_double", "num_cpus", "opentelemetry", "opentelemetry-otlp", @@ -3462,6 +3536,36 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "predicates" +version = "2.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" +dependencies = [ + "difflib", + "float-cmp", + "itertools 0.10.5", + "normalize-line-endings", + "predicates-core", + "regex", +] + +[[package]] +name = "predicates-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" + +[[package]] +name = "predicates-tree" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "primeorder" version = "0.13.6" @@ -3838,7 +3942,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.39", + "syn 2.0.40", "unicode-ident", ] @@ -4100,7 +4204,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -4172,7 +4276,7 @@ dependencies = [ "darling 0.20.3", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -4439,9 +4543,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.39" +version = "2.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +checksum = "13fa70a4ee923979ffb522cacce59d34421ebdea5625e1073c4326ef9d2dd42e" dependencies = [ "proc-macro2", "quote", @@ -4510,6 +4614,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "thiserror" version = "1.0.50" @@ -4527,7 +4637,7 @@ checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -4621,7 +4731,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -4797,7 +4907,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -5208,7 +5318,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", "wasm-bindgen-shared", ] @@ -5242,7 +5352,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5392,7 +5502,7 @@ dependencies = [ "anyhow", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", "wasmtime-component-util", "wasmtime-wit-bindgen", "wit-parser", @@ -5602,7 +5712,7 @@ checksum = "09b5575a75e711ca6c36bb9ad647c93541cdc8e34218031acba5da3f35919dd3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -5774,7 +5884,7 @@ dependencies = [ "proc-macro2", "quote", "shellexpand", - "syn 2.0.39", + "syn 2.0.40", "witx", ] @@ -5786,7 +5896,7 @@ checksum = "c6f321dbce722989d65c3082dba479fa392c7b7a1a4c3adc2a39545dd5aa452f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", "wiggle-generate", ] @@ -6133,7 +6243,7 @@ checksum = "be912bf68235a88fbefd1b73415cb218405958d1655b2ece9035a19920bdf6ba" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -6153,7 +6263,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8c4d5f53..7a62d7b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,8 @@ opentelemetry = { version = "0.21", default-features = false, features = [ ] } opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] } procfs = "0.16" -policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.12.2" } +#policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.12.2" } +policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", rev = "da91fae49e29914e458d9ac4f09e0676c38764c0" } rayon = "1.8" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } @@ -44,8 +45,10 @@ warp = { version = "0.3.6", default_features = false, features = [ "tls", ] } semver = { version = "1.0.20", features = ["serde"] } +mockall_double = "0.3" [dev-dependencies] +mockall = "0.11" rstest = "0.18" tempfile = "3.8.1" reqwest = { version = "0.11", default_features = false, features = [ diff --git a/src/config.rs b/src/config.rs index 62c98311..eb39b4b5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,6 +2,7 @@ use anyhow::{anyhow, Result}; use clap::ArgMatches; use lazy_static::lazy_static; +use policy_evaluator::policy_evaluator::PolicySettings; use policy_evaluator::policy_fetcher::sources::{read_sources_file, Sources}; use policy_evaluator::policy_fetcher::verify::config::{ read_verification_file, LatestVerificationConfig, VerificationConfigV1, @@ -231,7 +232,7 @@ pub struct Policy { } impl Policy { - pub fn settings_to_json(&self) -> Result>> { + pub fn settings_to_json(&self) -> Result> { match self.settings.as_ref() { None => Ok(None), Some(settings) => { diff --git a/src/workers/evaluation_environment.rs b/src/workers/evaluation_environment.rs new file mode 100644 index 00000000..e3751823 --- /dev/null +++ b/src/workers/evaluation_environment.rs @@ -0,0 +1,293 @@ +use anyhow::{anyhow, Result}; +use policy_evaluator::{ + admission_response::AdmissionResponse, callback_requests::CallbackRequest, + evaluation_context::EvaluationContext, policy_evaluator::PolicyEvaluator, + policy_evaluator_builder::PolicyEvaluatorBuilder, wasmtime, +}; +use std::collections::HashMap; +use tokio::sync::mpsc; + +use crate::communication::EvalRequest; +use crate::config::PolicyMode; +use crate::workers::{ + policy_evaluation_settings::PolicyEvaluationSettings, + precompiled_policy::{PrecompiledPolicies, PrecompiledPolicy}, +}; + +#[cfg(test)] +use mockall::automock; + +/// This structure contains all the policies defined by the user inside of the `policies.yml`. +/// It also provides helper methods to perform the validation of a request and the validation +/// of the settings provided by the user. +/// +/// Each worker has its own dedicated instance of this structure. +/// At the worker level, the ultimate goal is to avoid duplicated instances of `PolicyEvaluator`. +/// That means that, given two or more identical Wasm modules, only one `PolicyEvaluator` +/// should be created. This is required to avoid a waste of memory by the Policy Server +/// process. +/// +/// Note: the `PolicyEvaluator` instances will still be duplicated across each worker. This is +/// something we have to deal with. +#[derive(Default)] +#[cfg_attr(test, allow(dead_code))] +pub(crate) struct EvaluationEnvironment { + /// Unique ID of the worker + worker_id: u64, + + /// The name of the Namespace where Policy Server doesn't operate. All the requests + /// involving this Namespace are going to be accepted. This is usually done to prevent user + /// policies from messing with the components of the Kubewarden stack (which are all + /// deployed inside of the same Namespace). + always_accept_admission_reviews_on_namespace: Option, + + /// A map with the unique ID of a Wasm module as key, and the associated `PolicyEvaluator` + /// instance as value. + /// Currently we the `module_id` is obtained by computing the sha255 digest of the + /// optimized Wasm module. + /// This dictionary allows us to reduce by amount of memory consumed by Policy Server. + module_id_to_evaluator: HashMap, + + /// A map with the ID of the policy as value, and the associated `EvaluationContext` as + /// value. + /// In this case, `policy_id` is the name of the policy as declared inside of the + /// `policies.yml` file. These names are guaranteed to be unique. + policy_id_to_eval_ctx: HashMap, + + /// Map a `policy_id` (the name given by the user inside of `policies.yml`) to the + /// `module_id`. This allows us to deduplicate the Wasm modules defined by the user. + policy_id_to_module_id: HashMap, + + /// Map a `policy_id` to the `PolicyEvaluationSettings` instance. This allows us to obtain + /// the list of settings to be used when evaluating a given policy. + policy_id_to_settings: HashMap, +} + +#[cfg_attr(test, automock)] +#[cfg_attr(test, allow(dead_code))] +impl EvaluationEnvironment { + /// Creates a new `EvaluationEnvironment` + pub(crate) fn new( + worker_id: u64, + always_accept_admission_reviews_on_namespace: Option, + ) -> Self { + Self { + worker_id, + always_accept_admission_reviews_on_namespace, + ..Default::default() + } + } + + /// Returns `true` if the given `namespace` is the special Namespace that is ignored by all + /// the policies + pub(crate) fn should_always_accept_requests_made_inside_of_namespace( + &self, + namespace: &str, + ) -> bool { + self.always_accept_admission_reviews_on_namespace.as_deref() == Some(namespace) + } + + /// Register a new policy. It takes care of creating a new `PolicyEvaluator` (when needed). + /// + /// Params: + /// - `policy_id`: the ID of the policy, as specified inside of the `policies.yml` by the + /// user + /// - `policy`: a data structure that maps all the information defined inside of + /// `policies.yml` for the given policy + /// - `engine`: the `wasmtime::Engine` to be used when creating the `PolicyEvaluator` + /// - `policy_modules`: all the `wasmtime::Module` precompiled for the current + /// OS/architecture + /// - `callback_handler_tx`: the transmission end of a channel that connects the worker + /// with the asynchronous world + /// - `policy_evaluation_limit_seconds`: when set, defines after how many seconds the + /// policy evaluation is interrupted + pub(crate) fn register( + &mut self, + policy_id: &str, + policy: &crate::config::Policy, + engine: &wasmtime::Engine, + policy_modules: &PrecompiledPolicies, + callback_handler_tx: mpsc::Sender, + policy_evaluation_limit_seconds: Option, + ) -> Result<()> { + let precompiled_policy = policy_modules.get(policy.url.as_str()).ok_or_else(|| { + anyhow!( + "could not find preoptimized module for policy: {:?}", + policy.url + ) + })?; + let module_id = precompiled_policy.digest.clone(); + + if !self.module_id_to_evaluator.contains_key(&module_id) { + let evaluator = create_policy_evaluator( + policy_id, + self.worker_id, + policy, + engine, + precompiled_policy, + callback_handler_tx.clone(), + policy_evaluation_limit_seconds, + )?; + self.module_id_to_evaluator + .insert(module_id.clone(), evaluator); + } + self.policy_id_to_module_id + .insert(policy_id.to_owned(), module_id); + + let policy_eval_settings = PolicyEvaluationSettings { + policy_mode: policy.policy_mode.clone(), + allowed_to_mutate: policy.allowed_to_mutate.unwrap_or(false), + settings: policy.settings_to_json()?.unwrap_or_default(), + }; + self.policy_id_to_settings + .insert(policy_id.to_owned(), policy_eval_settings); + + let eval_ctx = EvaluationContext { + policy_id: policy_id.to_owned(), + callback_channel: Some(callback_handler_tx.clone()), + ctx_aware_resources_allow_list: policy.context_aware_resources.clone(), + }; + self.policy_id_to_eval_ctx + .insert(policy_id.to_owned(), eval_ctx); + + Ok(()) + } + + /// Given a policy ID, return how the policy operates + pub fn get_policy_mode(&self, policy_id: &str) -> Result { + self.policy_id_to_settings + .get(policy_id) + .map(|settings| settings.policy_mode.clone()) + .ok_or(anyhow!("cannot find policy with ID {policy_id}")) + } + + /// Given a policy ID, returns true if the policy is allowed to mutate + pub fn get_policy_allowed_to_mutate(&self, policy_id: &str) -> Result { + self.policy_id_to_settings + .get(policy_id) + .map(|settings| settings.allowed_to_mutate) + .ok_or(anyhow!("cannot find policy with ID {policy_id}")) + } + + /// Given a policy ID and a request to be processed, uses the `PolicyEvaluator` to perform + /// a validation operation. + pub fn validate(&mut self, policy_id: &str, req: &EvalRequest) -> Result { + let settings = self.policy_id_to_settings.get(policy_id).ok_or(anyhow!( + "cannot find settings for policy with ID {policy_id}" + ))?; + + let module_id = self.policy_id_to_module_id.get(policy_id).ok_or(anyhow!( + "cannot find module_id for policy with ID {policy_id}" + ))?; + let evaluator = self + .module_id_to_evaluator + .get_mut(module_id) + .ok_or(anyhow!( + "cannot find evaluator for policy with ID {policy_id}" + ))?; + + let eval_ctx = self.policy_id_to_eval_ctx.get(policy_id).ok_or(anyhow!( + "cannot find evaluation context for policy with ID {policy_id}" + ))?; + + Ok(evaluator.validate(req.req.clone(), &settings.settings, eval_ctx)) + } +} + +/// Internal function, takes care of creating the `PolicyEvaluator` instance for the given policy +fn create_policy_evaluator( + policy_id: &str, + worker_id: u64, + policy: &crate::config::Policy, + engine: &wasmtime::Engine, + precompiled_policy: &PrecompiledPolicy, + callback_handler_tx: mpsc::Sender, + policy_evaluation_limit_seconds: Option, +) -> Result { + // See `wasmtime::Module::deserialize` to know why this method is `unsafe`. + // However, in our context, nothing bad will happen because we have + // full control of the precompiled module. This is generated by the + // WorkerPool thread + let module = + unsafe { wasmtime::Module::deserialize(engine, &precompiled_policy.precompiled_module) } + .map_err(|e| { + anyhow!( + "could not rehydrate wasmtime::Module {}: {:?}", + policy.url, + e + ) + })?; + + let mut policy_evaluator_builder = + PolicyEvaluatorBuilder::new(policy_id.to_string(), worker_id) + .engine(engine.clone()) + .policy_module(module) + .context_aware_resources_allowed(policy.context_aware_resources.clone()) + .callback_channel(callback_handler_tx) + .execution_mode(precompiled_policy.execution_mode); + + if let Some(limit) = policy_evaluation_limit_seconds { + policy_evaluator_builder = + policy_evaluator_builder.enable_epoch_interruptions(limit, limit); + } + + policy_evaluator_builder.build() +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + + use super::*; + use crate::config::Policy; + + /// Given to identical wasm modules, only one instance of PolicyEvaluator is going to be + /// created + #[test] + fn avoid_duplicated_instaces_of_policy_evaluator() { + let engine = wasmtime::Engine::default(); + let policy_ids = vec!["policy_1", "policy_2"]; + let module = wasmtime::Module::new(&engine, "(module (func))") + .expect("should be able to build the smallest wasm module ever"); + let (callback_handler_tx, _) = mpsc::channel(10); + + let precompiled_policy = PrecompiledPolicy { + precompiled_module: module.serialize().unwrap(), + execution_mode: policy_evaluator::policy_evaluator::PolicyExecutionMode::Wasi, + digest: "unique-digest".to_string(), + }; + + let mut policies = HashMap::new(); + let mut policy_modules = HashMap::new(); + + for policy_id in &policy_ids { + policies.insert( + policy_id.to_owned(), + Policy { + url: policy_id.to_string(), + policy_mode: PolicyMode::Protect, + allowed_to_mutate: None, + settings: None, + context_aware_resources: BTreeSet::new(), + }, + ); + policy_modules.insert(policy_id.to_string(), precompiled_policy.clone()); + } + + let mut evaluation_environment = EvaluationEnvironment::new(0, None); + for policy_id in policy_ids { + evaluation_environment + .register( + policy_id, + &policies[policy_id], + &engine, + &policy_modules, + callback_handler_tx.clone(), + None, + ) + .unwrap(); + } + + assert_eq!(evaluation_environment.module_id_to_evaluator.len(), 1); + } +} diff --git a/src/workers/mod.rs b/src/workers/mod.rs index 312639cd..ab2bb702 100644 --- a/src/workers/mod.rs +++ b/src/workers/mod.rs @@ -1,3 +1,9 @@ +mod evaluation_environment; +mod policy_evaluation_settings; pub(crate) mod pool; pub(crate) mod precompiled_policy; pub(crate) mod worker; + +// This is required to mock the `EvaluationEnvironment` inside of our tests +#[mockall_double::double] +pub(crate) use evaluation_environment::EvaluationEnvironment; diff --git a/src/workers/policy_evaluation_settings.rs b/src/workers/policy_evaluation_settings.rs new file mode 100644 index 00000000..47ae3a3c --- /dev/null +++ b/src/workers/policy_evaluation_settings.rs @@ -0,0 +1,15 @@ +use policy_evaluator::policy_evaluator::PolicySettings; + +use crate::config::PolicyMode; + +/// Holds the evaluation settings of loaded Policy. These settings are taken straight from the +/// `policies.yml` file provided by the user +#[cfg_attr(test, allow(dead_code))] +pub(crate) struct PolicyEvaluationSettings { + /// Whether the policy is operating in `protect` or `monitor` mode + pub(crate) policy_mode: PolicyMode, + /// Determines if a mutating policy is actually allowed to mutate + pub(crate) allowed_to_mutate: bool, + /// The policy-specific settings provided by the user + pub(crate) settings: PolicySettings, +} diff --git a/src/workers/pool.rs b/src/workers/pool.rs index c0bcacf3..be935681 100644 --- a/src/workers/pool.rs +++ b/src/workers/pool.rs @@ -1,16 +1,15 @@ use anyhow::{anyhow, Result}; use core::time; +use lazy_static::lazy_static; use policy_evaluator::{ - callback_requests::CallbackRequest, - policy_evaluator::{Evaluator, PolicyEvaluator}, - policy_evaluator_builder::PolicyEvaluatorBuilder, - wasmtime, + callback_requests::CallbackRequest, evaluation_context::EvaluationContext, + policy_evaluator::PolicyEvaluator, policy_evaluator_builder::PolicyEvaluatorBuilder, wasmtime, }; use rayon::prelude::*; use std::{ collections::HashMap, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, Barrier, RwLock, }, thread, @@ -28,15 +27,44 @@ use crate::workers::{ worker::Worker, }; +lazy_static! { + /// Used to create unique worker IDs + static ref WORKER_ID: Arc = Arc::new(AtomicU64::new(0)); +} + +/// Coordinates a set of workers. +/// Each worker takes care of performing the evaluation of the requests received by Policy Server +/// API endpoints. +/// +/// The HTTP API communicates with the worker pool via a dedicated chanel. The pool then assigns +/// the evaluation job to one of the workers. Currently this is done on a round-robin fashion. pub(crate) struct WorkerPool { + /// Channel used by the HTTP API to send to the pool the evaluation requests that have to be + /// processed api_rx: mpsc::Receiver, + + /// A oneshot channel used during the bootstrap phase. It's being used by the `main` to send + /// the data used to bootstrap of the workers. bootstrap_rx: oneshot::Receiver, + + /// The channel that connect the synchronous world of the workers with the tokio task that + /// handles all the async requests that might originate during the policy evaluation process. + /// For example, requesting Kubernetes resources, DNS resolution, Sigstore operations,... callback_handler_tx: mpsc::Sender, + + /// When set, this is the Namespace where all the policies do not apply. This is usually set + /// to be the Namespace where the Kubewarden is deployed; ensuring the user policies are not + /// going to interfere with the Kubewarden stack. always_accept_admission_reviews_on_namespace: Option, + + /// When set enables the policy timeout feature which prevents a rogue policy to enter an + /// endless loop/consume all the resources of a worker policy_evaluation_limit_seconds: Option, } impl WorkerPool { + /// Create a new `WorkerPool`, no bootstrap operation is done yet. This happens when invoking + /// the `run` method. pub(crate) fn new( bootstrap_rx: oneshot::Receiver, api_rx: mpsc::Receiver, @@ -53,9 +81,17 @@ impl WorkerPool { } } + /// Bootstrap the pool and then enter an endless loop that processes incoming requests. pub(crate) fn run(mut self) { + // The WorkerPool communicates with each worker over dedicated `mpsc::channel` (one per worker). + // This vector holds all the sender ends of these channels. let mut worker_tx_chans = Vec::>::new(); + + // Each worker has its own `wasmtime::Engine`, we have to keep track of them here because + // we have to increase their epoch tick to implement the policy timeout protection feature let mut worker_engines = Vec::::new(); + + // All the join handles of the spawned worker threads let mut join_handles = Vec::>>::new(); // Phase 1: wait for bootstrap data to be received by the main @@ -96,6 +132,9 @@ impl WorkerPool { } }; + // For each policy defined by the user, ensure the given settings are valid + // We exit with an error if one or more policies do not have valid + // settings. if let Err(error) = verify_policy_settings( &engine, &bootstrap_data.policies, @@ -134,8 +173,10 @@ impl WorkerPool { let (tx, rx) = mpsc::channel::(32); worker_tx_chans.push(tx); - // Each worker has its own wasmtime::Engine, sharing the + // Each worker has its own `wasmtime::Engine`, sharing the // same engine across all the workers leads to bad performance + // TODO: revisit this statement, it seems this issue has been solved by latest wasmtime + // releases let engine = match wasmtime::Engine::new(&wasmtime_config) { Ok(e) => e, Err(e) => { @@ -153,6 +194,8 @@ impl WorkerPool { return; } }; + // Note well: it's fast and cheap to clone a `wasmtime::Engine` as stated by the + // official docs. It's just a reference counter under the hood worker_engines.push(engine.clone()); let modules = precompiled_policies.clone(); @@ -166,7 +209,9 @@ impl WorkerPool { let join = thread::spawn(move || -> Result<()> { info!(spawned = n, total = pool_size, "spawning worker"); + let worker_id = WORKER_ID.fetch_add(1, Ordering::Relaxed); let mut worker = match Worker::new( + worker_id, rx, &policies, &modules, @@ -204,6 +249,7 @@ impl WorkerPool { // meaning a lot of memory would have been consumed without a valid reason // during the whole execution time drop(precompiled_policies); + drop(policies); barrier.wait(); if !boot_canary.load(Ordering::SeqCst) { @@ -260,6 +306,7 @@ impl WorkerPool { pub(crate) fn build_policy_evaluator( policy_id: &str, + worker_id: u64, policy: &config::Policy, engine: &wasmtime::Engine, policy_modules: &PrecompiledPolicies, @@ -287,13 +334,13 @@ pub(crate) fn build_policy_evaluator( ) })?; - let mut policy_evaluator_builder = PolicyEvaluatorBuilder::new(policy_id.to_string()) - .engine(engine.clone()) - .policy_module(module) - .settings(policy.settings_to_json()?) - .context_aware_resources_allowed(policy.context_aware_resources.clone()) - .callback_channel(callback_handler_tx) - .execution_mode(policy_module.execution_mode); + let mut policy_evaluator_builder = + PolicyEvaluatorBuilder::new(policy_id.to_string(), worker_id) + .engine(engine.clone()) + .policy_module(module) + .context_aware_resources_allowed(policy.context_aware_resources.clone()) + .callback_channel(callback_handler_tx) + .execution_mode(policy_module.execution_mode); if let Some(limit) = policy_evaluation_limit_seconds { policy_evaluator_builder = @@ -346,6 +393,7 @@ fn precompile_policies( .collect()) } +/// Ensure the user provided valid settings for all the policies fn verify_policy_settings( engine: &wasmtime::Engine, policies: &HashMap, @@ -376,10 +424,15 @@ fn verify_policy_settings( }); } + // We have to create a worker_id because this is what the `PolicyEvaluator` constructor needs. + // In this case there's no actual `Worker`, we're going to run all the setting validation Wasm + // invocations inside of the thread of `WorkerPool` + let worker_id = WORKER_ID.fetch_add(1, Ordering::Relaxed); let mut errors = vec![]; for (id, policy) in policies.iter() { let mut policy_evaluator = match build_policy_evaluator( id, + worker_id, policy, engine, policy_modules, @@ -392,7 +445,14 @@ fn verify_policy_settings( continue; } }; - let set_val_rep = policy_evaluator.validate_settings(); + let eval_ctx = EvaluationContext { + policy_id: id.to_owned(), + callback_channel: Some(callback_handler_tx.clone()), + ctx_aware_resources_allow_list: policy.context_aware_resources.clone(), + }; + + let set_val_rep = policy_evaluator + .validate_settings(&policy.settings_to_json()?.unwrap_or_default(), &eval_ctx); if !set_val_rep.valid { errors.push(format!( "[{}] settings are not valid: {:?}", diff --git a/src/workers/worker.rs b/src/workers/worker.rs index 04763b5d..0faede9b 100644 --- a/src/workers/worker.rs +++ b/src/workers/worker.rs @@ -1,10 +1,10 @@ -use anyhow::{anyhow, Result}; +use anyhow::Result; use itertools::Itertools; use policy_evaluator::callback_requests::CallbackRequest; use policy_evaluator::wasmtime; use policy_evaluator::{ admission_response::{AdmissionResponse, AdmissionResponseStatus}, - policy_evaluator::{Evaluator, ValidateRequest}, + policy_evaluator::ValidateRequest, }; use std::{collections::HashMap, fmt, time::Instant}; use tokio::sync::mpsc::{Receiver, Sender}; @@ -13,17 +13,10 @@ use tracing::{error, info, info_span}; use crate::communication::{EvalRequest, RequestOrigin}; use crate::config::{Policy, PolicyMode}; use crate::metrics::{self}; -use crate::workers::precompiled_policy::PrecompiledPolicies; - -struct PolicyEvaluatorWithSettings { - policy_evaluator: Box, - policy_mode: PolicyMode, - allowed_to_mutate: bool, - always_accept_admission_reviews_on_namespace: Option, -} +use crate::workers::{precompiled_policy::PrecompiledPolicies, EvaluationEnvironment}; pub(crate) struct Worker { - evaluators: HashMap, + evaluation_environment: EvaluationEnvironment, channel_rx: Receiver, } @@ -40,12 +33,19 @@ impl fmt::Display for PolicyErrors { } impl Worker { + /// Create a new Worker. Takes care of allocating the `PolicyEvaluator` environments + /// required to evaluate the policies. + /// + /// No check is done against the policy settings provided by the user. The `WorkerPool` + /// already verified that all the settings are valid. + #[allow(clippy::too_many_arguments)] #[tracing::instrument( name = "worker_new", fields(host=crate::config::HOSTNAME.as_str()), skip_all, )] pub(crate) fn new( + worker_id: u64, rx: Receiver, policies: &HashMap, precompiled_policies: &PrecompiledPolicies, @@ -55,12 +55,13 @@ impl Worker { policy_evaluation_limit_seconds: Option, ) -> Result { let mut evs_errors = HashMap::new(); - let mut evs = HashMap::new(); + let mut evaluation_environment = + EvaluationEnvironment::new(worker_id, always_accept_admission_reviews_on_namespace); for (id, policy) in policies.iter() { // It's safe to clone the outer engine. This creates a shallow copy let inner_engine = engine.clone(); - let policy_evaluator = match crate::workers::pool::build_policy_evaluator( + if let Err(e) = evaluation_environment.register( id, policy, &inner_engine, @@ -68,25 +69,11 @@ impl Worker { callback_handler_tx.clone(), policy_evaluation_limit_seconds, ) { - Ok(pe) => Box::new(pe), - Err(e) => { - evs_errors.insert( - id.clone(), - format!("[{id}] could not create PolicyEvaluator: {e:?}"), - ); - continue; - } - }; - - let policy_evaluator_with_settings = PolicyEvaluatorWithSettings { - policy_evaluator, - policy_mode: policy.policy_mode.clone(), - allowed_to_mutate: policy.allowed_to_mutate.unwrap_or(false), - always_accept_admission_reviews_on_namespace: - always_accept_admission_reviews_on_namespace.clone(), - }; - - evs.insert(id.to_string(), policy_evaluator_with_settings); + evs_errors.insert( + id.clone(), + format!("[{id}] could not create PolicyEvaluator: {e:?}"), + ); + } } if !evs_errors.is_empty() { @@ -94,7 +81,7 @@ impl Worker { } Ok(Worker { - evaluators: evs, + evaluation_environment, channel_rx: rx, }) } @@ -154,31 +141,40 @@ impl Worker { } } + /// Endless loop that waits for the WorkerPool to give a new request to be processed. + /// The request is then evaluated and a response is returned back to the WorkerPool. pub(crate) fn run(&mut self) { while let Some(req) = self.channel_rx.blocking_recv() { let span = info_span!(parent: &req.parent_span, "policy_eval"); let _enter = span.enter(); - let res = match self.evaluators.get_mut(&req.policy_id) { - Some(pes) => Self::evaluate(req, pes), - None => req - .resp_chan - .send(None) - .map_err(|_| anyhow!("cannot send response back")), - }; - if res.is_err() { - error!("receiver dropped"); + let admission_response = self.evaluate(&req).unwrap_or_else(|e| { + AdmissionResponse::reject_internal_server_error( + req.req.uid().to_owned(), + e.to_string(), + ) + }); + if let Err(e) = req.resp_chan.send(Some(admission_response)) { + error!("cannot send response back: {e:?}"); } } } - fn evaluate(req: EvalRequest, pes: &mut PolicyEvaluatorWithSettings) -> anyhow::Result<()> { + /// Perform the actual evaluation + fn evaluate(&mut self, req: &EvalRequest) -> Result { let start_time = Instant::now(); - let policy_name = pes.policy_evaluator.policy_id(); - let policy_mode = pes.policy_mode.clone(); - let allowed_to_mutate = pes.allowed_to_mutate; - let vanilla_validation_response = pes.policy_evaluator.validate(req.req.clone()); + let policy_name = req.policy_id.clone(); + let policy_mode = self + .evaluation_environment + .get_policy_mode(&req.policy_id)?; + let allowed_to_mutate = self + .evaluation_environment + .get_policy_allowed_to_mutate(&req.policy_id)?; + + let vanilla_validation_response = + self.evaluation_environment.validate(&req.policy_id, req)?; + let policy_evaluation_duration = start_time.elapsed(); let accepted = vanilla_validation_response.allowed; let mutated = vanilla_validation_response.patch.is_some(); @@ -200,17 +196,23 @@ impl Worker { match req.req.clone() { ValidateRequest::AdmissionRequest(adm_req) => { - if let Some(namespace) = &pes.always_accept_admission_reviews_on_namespace { - // If the policy server is configured to - // always accept admission reviews on a - // given namespace, just set the `allowed` - // part of the response to `true` if the - // request matches this namespace. Keep - // the rest of the behaviors unchanged, - // such as checking if the policy is - // allowed to mutate. - - if adm_req.namespace == Some(namespace.to_string()) { + // TODO: we should check immediately if the request is coming from the "always + // accepted" namespace ASAP. Right now we do an evaluation and then we discard the + // result if the namespace is the special one. + // Moreover, I (flavio) don't like the fact we're using a mutable variable for + // `validation_response` + if let Some(ref req_namespace) = adm_req.namespace { + if self + .evaluation_environment + .should_always_accept_requests_made_inside_of_namespace(req_namespace) + { + // given namespace, just set the `allowed` + // part of the response to `true` if the + // request matches this namespace. Keep + // the rest of the behaviors unchanged, + // such as checking if the policy is + // allowed to mutate. + validation_response = AdmissionResponse { allowed: true, status: None, @@ -218,7 +220,6 @@ impl Worker { }; } } - let policy_evaluation = metrics::PolicyEvaluation { policy_name, policy_mode: policy_mode.into(), @@ -247,9 +248,7 @@ impl Worker { metrics::add_policy_evaluation(&policy_evaluation); } }; - - let res = req.resp_chan.send(Some(validation_response)); - res.map_err(|_| anyhow!("cannot send response back")) + Ok(validation_response) } } @@ -257,82 +256,70 @@ impl Worker { mod tests { use crate::admission_review::tests::build_admission_review; use crate::communication::RequestOrigin; - - use policy_evaluator::kubewarden_policy_sdk::settings::SettingsValidationResponse; - use policy_evaluator::ProtocolVersion; use rstest::*; - use tokio::sync::oneshot; + use tokio::sync::{mpsc, oneshot}; use super::*; const POLICY_ID: &str = "policy-id"; - struct MockPolicyEvaluator { - pub policy_id: String, - pub admission_response: AdmissionResponse, - pub settings_validation_response: SettingsValidationResponse, - pub protocol_version: Result, - } - - impl Default for MockPolicyEvaluator { - fn default() -> Self { - Self { - policy_id: "mock_policy".to_string(), - admission_response: AdmissionResponse { - allowed: false, - ..Default::default() - }, - settings_validation_response: SettingsValidationResponse { - valid: true, - message: None, - }, - protocol_version: Ok(ProtocolVersion::V1), - } - } - } - - impl MockPolicyEvaluator { - fn new_allowed() -> MockPolicyEvaluator { - MockPolicyEvaluator { - admission_response: AdmissionResponse { + fn create_evaluation_environment_that_accepts_request( + policy_mode: PolicyMode, + ) -> EvaluationEnvironment { + let mut mock_evaluation_environment = EvaluationEnvironment::default(); + mock_evaluation_environment + .expect_validate() + .returning(|_policy_id, request| { + Ok(AdmissionResponse { + uid: request.req.uid().to_owned(), allowed: true, ..Default::default() - }, - ..Default::default() - } - } - - fn new_rejected(message: Option, code: Option) -> MockPolicyEvaluator { - MockPolicyEvaluator { - admission_response: AdmissionResponse { - allowed: false, - status: Some(AdmissionResponseStatus { message, code }), - ..Default::default() - }, - ..Default::default() - } - } + }) + }); + mock_evaluation_environment + .expect_get_policy_mode() + .returning(move |_policy_id| Ok(policy_mode.clone())); + mock_evaluation_environment + .expect_get_policy_allowed_to_mutate() + .returning(|_policy_id| Ok(false)); + mock_evaluation_environment + .expect_should_always_accept_requests_made_inside_of_namespace() + .returning(|_namespace| false); + mock_evaluation_environment } - impl Evaluator for MockPolicyEvaluator { - fn validate(&mut self, _request: ValidateRequest) -> AdmissionResponse { - self.admission_response.clone() - } - - fn validate_settings(&mut self) -> SettingsValidationResponse { - self.settings_validation_response.clone() - } - - fn protocol_version(&mut self) -> Result { - match &self.protocol_version { - Ok(pv) => Ok(pv.clone()), - Err(e) => Err(anyhow::anyhow!("{}", e)), - } - } + #[derive(Clone)] + struct EvaluationEnvironmentRejectionDetails { + message: String, + code: u16, + } - fn policy_id(&self) -> String { - self.policy_id.clone() - } + fn create_evaluation_environment_that_reject_request( + policy_mode: PolicyMode, + rejection_details: EvaluationEnvironmentRejectionDetails, + allowed_namespace: String, + ) -> EvaluationEnvironment { + let mut mock_evaluation_environment = EvaluationEnvironment::default(); + mock_evaluation_environment + .expect_validate() + .returning(move |_policy_id, request| { + Ok(AdmissionResponse::reject( + request.req.uid().to_owned(), + rejection_details.message.clone(), + rejection_details.code, + )) + }); + mock_evaluation_environment + .expect_get_policy_mode() + .returning(move |_policy_id| Ok(policy_mode.clone())); + mock_evaluation_environment + .expect_get_policy_allowed_to_mutate() + .returning(|_policy_id| Ok(false)); + mock_evaluation_environment + .expect_should_always_accept_requests_made_inside_of_namespace() + .returning(move |namespace| namespace == allowed_namespace); + + mock_evaluation_environment } #[test] @@ -622,7 +609,7 @@ mod tests { #[case] policy_mode: PolicyMode, #[case] request_origin: RequestOrigin, ) { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let req = ValidateRequest::AdmissionRequest( build_admission_review().request.expect("no request"), ); @@ -635,21 +622,13 @@ mod tests { request_origin, }; - let mock_evaluator = MockPolicyEvaluator::new_allowed(); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + let (_, channel_rx) = mpsc::channel::(10); + let mut worker = Worker { + channel_rx, + evaluation_environment: create_evaluation_environment_that_accepts_request(policy_mode), }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); assert!(response.allowed); } @@ -664,7 +643,7 @@ mod tests { #[case] request_origin: RequestOrigin, #[case] accept: bool, ) { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let req = ValidateRequest::AdmissionRequest( build_admission_review().request.expect("no request"), ); @@ -677,23 +656,22 @@ mod tests { request_origin, }; - let message = Some("boom".to_string()); - let code = Some(500); - let mock_evaluator = MockPolicyEvaluator::new_rejected(message.clone(), code); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), + let (_, channel_rx) = mpsc::channel::(10); + let rejection_details = EvaluationEnvironmentRejectionDetails { + message: "boom".to_string(), + code: 500, + }; + let mock_evaluation_environment = create_evaluation_environment_that_reject_request( policy_mode, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + rejection_details.clone(), + "".to_string(), + ); + let mut worker = Worker { + channel_rx, + evaluation_environment: mock_evaluation_environment, }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); if accept { assert!(response.allowed); @@ -701,14 +679,14 @@ mod tests { } else { assert!(!response.allowed); let response_status = response.status.expect("should be set"); - assert_eq!(response_status.message, message); - assert_eq!(response_status.code, code); + assert_eq!(response_status.message, Some(rejection_details.message)); + assert_eq!(response_status.code, Some(rejection_details.code)); } } #[test] fn evaluate_policy_evaluator_accepts_request_raw() { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let request = serde_json::json!(r#"{"foo": "bar"}"#); let req = ValidateRequest::Raw(request.clone()); @@ -721,27 +699,21 @@ mod tests { request_origin: RequestOrigin::Validate, }; - let mock_evaluator = MockPolicyEvaluator::new_allowed(); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode: PolicyMode::Protect, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + let (_, channel_rx) = mpsc::channel::(10); + let mut worker = Worker { + channel_rx, + evaluation_environment: create_evaluation_environment_that_accepts_request( + PolicyMode::Protect, + ), }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); assert!(response.allowed); } #[test] fn evaluate_policy_evaluator_rejects_request_raw() { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let request = serde_json::json!(r#"{"foo": "bar"}"#); let req = ValidateRequest::Raw(request.clone()); @@ -754,28 +726,26 @@ mod tests { request_origin: RequestOrigin::Validate, }; - let message = Some("boom".to_string()); - let code = Some(500); - let mock_evaluator = MockPolicyEvaluator::new_rejected(message.clone(), code); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode: PolicyMode::Protect, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + let (_, channel_rx) = mpsc::channel::(10); + let rejection_details = EvaluationEnvironmentRejectionDetails { + message: "boom".to_string(), + code: 500, + }; + let mock_evaluation_environment = create_evaluation_environment_that_reject_request( + PolicyMode::Protect, + rejection_details.clone(), + "".to_string(), + ); + let mut worker = Worker { + channel_rx, + evaluation_environment: mock_evaluation_environment, }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); - + let response = worker.evaluate(&eval_req).unwrap(); assert!(!response.allowed); let response_status = response.status.expect("should be set"); - assert_eq!(response_status.message, message); - assert_eq!(response_status.code, code); + assert_eq!(response_status.message, Some(rejection_details.message)); + assert_eq!(response_status.code, Some(rejection_details.code)); } #[rstest] @@ -789,7 +759,7 @@ mod tests { // of a namespace that is ignored by kubewarden -> this leads the // request to still be accepted - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let allowed_namespace = "kubewarden_special".to_string(); @@ -805,23 +775,22 @@ mod tests { request_origin, }; - let message = Some("boom".to_string()); - let code = Some(500); - let mock_evaluator = MockPolicyEvaluator::new_rejected(message, code); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode: PolicyMode::Protect, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: Some(allowed_namespace), + let (_, channel_rx) = mpsc::channel::(10); + let rejection_details = EvaluationEnvironmentRejectionDetails { + message: "boom".to_string(), + code: 500, + }; + let mock_evaluation_environment = create_evaluation_environment_that_reject_request( + PolicyMode::Protect, + rejection_details.clone(), + allowed_namespace, + ); + let mut worker = Worker { + channel_rx, + evaluation_environment: mock_evaluation_environment, }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); assert!(response.allowed); assert!(response.status.is_none()); } From 08c64e08614f356a472d0133ff7487737e8b459c Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Thu, 30 Nov 2023 17:31:27 +0100 Subject: [PATCH 5/7] feat: reduce memory usage This commit significantly changes the internal architecture of Policy Server workers. This code takes advantage of the `PolicyEvaluatorPre` structure defined by the latest `policy-evalautor` crate. `PolicyEvaluatorPre` has different implementations, one per type of policy we support (waPC, Rego, WASI). Under the hood it holds a `wasmtime::InstancePre` instance that is used to quickly spawn a WebAssembly environment. Since `PolicyEvaluatorPre` instances are managed by the `EvaluationEnvironment` structure. `EvaluationEnvironment` takes care of deduplicating the WebAssembly modules defined inside of the `policies.yml` file, ensuring only one `PolicyEvaluatorPre` is created per policy. The `EvaluationEnvironment` struct provides `validate` and `validate_settings` methods. These methods create a fresh `PolicyEvaluator` instance by rehydrating its `PolicyEvaluatorPre` instance. Once the WebAssembly evaluation is done, the `PolicyEvaluator` instance is discarded. This is a big change compared to the previous approach, where each WebAssembly instance was a long lived object. This new architecture assures that each evaluation is done inside of a freshly created WebAssembly environment, which guarantees: - Policies leaking memory have a smaller impact on the memory consumption of the Policy Server process - Policy evaluation always starts with a clean slate, this is useful to prevent bugs caused by policies that are not written to be stateless In terms of memory optimizations, the `EvaluationEnvironment` is now an immutable object. That allows us to have one single instance of `EvaluationEnvironment` shared across all the worker threads, all without using mutex or locks. This significantly reduces the amount of memory required by the Policy Server instance, without impacting on the system performances. As an added bonus, a lot of code has been simplified during this transition. More code can be removed by future PRs. Signed-off-by: Flavio Castelli --- Cargo.lock | 245 ++++++++++++------------- Cargo.toml | 5 +- src/workers/evaluation_environment.rs | 250 +++++++++++++++----------- src/workers/pool.rs | 191 ++++---------------- src/workers/worker.rs | 73 ++------ 5 files changed, 307 insertions(+), 457 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b97f8df1..c7f4452a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -502,7 +502,7 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "burrego" version = "0.3.4" -source = "git+https://github.com/kubewarden/policy-evaluator?rev=da91fae49e29914e458d9ac4f09e0676c38764c0#da91fae49e29914e458d9ac4f09e0676c38764c0" +source = "git+https://github.com/flavio/policy-evaluator?branch=on-demand#8752bbda6a1599a7ed877735bed64fb50ff0c643" dependencies = [ "base64 0.21.5", "chrono", @@ -850,18 +850,18 @@ dependencies = [ [[package]] name = "cranelift-bforest" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b5bb9245ec7dcc04d03110e538d31f0969d301c9d673145f4b4d5c3478539a3" +checksum = "8e7e56668d2263f92b691cb9e4a2fcb186ca0384941fe420484322fa559c3329" dependencies = [ "cranelift-entity", ] [[package]] name = "cranelift-codegen" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebb18d10e5ddac43ba4ca8fd4e310938569c3e484cc01b6372b27dc5bb4dfd28" +checksum = "2a9ff61938bf11615f55b80361288c68865318025632ea73c65c0b44fa16283c" dependencies = [ "bumpalo", "cranelift-bforest", @@ -880,33 +880,33 @@ dependencies = [ [[package]] name = "cranelift-codegen-meta" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a3ce6d22982c1b9b6b012654258bab1a13947bb12703518bef06b1a4867c3d6" +checksum = "50656bf19e3d4a153b404ff835b8b59e924cfa3682ebe0d3df408994f37983f6" dependencies = [ "cranelift-codegen-shared", ] [[package]] name = "cranelift-codegen-shared" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47220fd4f9a0ce23541652b6f16f83868d282602c600d14934b2a4c166b4bd80" +checksum = "388041deeb26109f1ea73c1812ea26bfd406c94cbce0bb5230aa44277e43b209" [[package]] name = "cranelift-control" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed5a4c42672aea9b6e820046b52e47a1c05d3394a6cdf4cb3c3c4b702f954bd2" +checksum = "b39b7c512ffac527e5b5df9beae3d67ab85d07dca6d88942c16195439fedd1d3" dependencies = [ "arbitrary", ] [[package]] name = "cranelift-entity" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b4e9a3296fc827f9d35135dc2c0c8dd8d8359eb1ef904bae2d55d5bcb0c9f94" +checksum = "fdb25f573701284fe2bcf88209d405342125df00764b396c923e11eafc94d892" dependencies = [ "serde", "serde_derive", @@ -914,9 +914,9 @@ dependencies = [ [[package]] name = "cranelift-frontend" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ec537d0f0b8e084517f3e7bfa1d89af343d7c7df455573fca9f272d4e01267" +checksum = "e57374fd11d72cf9ffb85ff64506ed831440818318f58d09f45b4185e5e9c376" dependencies = [ "cranelift-codegen", "log", @@ -926,15 +926,15 @@ dependencies = [ [[package]] name = "cranelift-isle" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45bab6d69919d210a50331d35cc6ce111567bc040aebac63a8ae130d0400a075" +checksum = "ae769b235f6ea2f86623a3ff157cc04a4ff131dc9fe782c2ebd35f272043581e" [[package]] name = "cranelift-native" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f32e81605f352cf37af5463f11cd7deec7b6572741931a8d372f7fdd4a744f5d" +checksum = "3dc7bfb8f13a0526fe20db338711d9354729b861c336978380bb10f7f17dd207" dependencies = [ "cranelift-codegen", "libc", @@ -943,9 +943,9 @@ dependencies = [ [[package]] name = "cranelift-wasm" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0edaa4cbec1bc787395c074233df2652dd62f3e29d3ee60329514a0a51e6b045" +checksum = "2c5f41a4af931b756be05af0dd374ce200aae2d52cea16b0beb07e8b52732c35" dependencies = [ "cranelift-codegen", "cranelift-entity", @@ -953,7 +953,7 @@ dependencies = [ "itertools 0.10.5", "log", "smallvec", - "wasmparser 0.115.0", + "wasmparser 0.116.1", "wasmtime-types", ] @@ -1197,12 +1197,6 @@ dependencies = [ "serde", ] -[[package]] -name = "difflib" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" - [[package]] name = "digest" version = "0.10.7" @@ -1509,15 +1503,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "float-cmp" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" -dependencies = [ - "num-traits", -] - [[package]] name = "fnv" version = "1.0.7" @@ -2162,6 +2147,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.0" @@ -2179,9 +2173,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "ittapi" -version = "0.3.5" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25a5c0b993601cad796222ea076565c5d9f337d35592f8622c753724f06d7271" +checksum = "6b996fe614c41395cdaedf3cf408a9534851090959d90d54a535f675550b64b1" dependencies = [ "anyhow", "ittapi-sys", @@ -2190,9 +2184,9 @@ dependencies = [ [[package]] name = "ittapi-sys" -version = "0.3.5" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb7b5e473765060536a660eed127f758cf1a810c73e49063264959c60d1727d9" +checksum = "52f5385394064fa2c886205dba02598013ce83d3e92d33dbdc0c52fe0e7bf4fc" dependencies = [ "cc", ] @@ -2581,9 +2575,9 @@ dependencies = [ [[package]] name = "mockall" -version = "0.11.4" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96" +checksum = "1a978c8292954bcb9347a4e28772c0a0621166a1598fc1be28ac0076a4bb810e" dependencies = [ "cfg-if", "downcast", @@ -2596,14 +2590,14 @@ dependencies = [ [[package]] name = "mockall_derive" -version = "0.11.4" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb" +checksum = "ad2765371d0978ba4ace4ebef047baa62fc068b431e468444b5610dd441c639b" dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.40", ] [[package]] @@ -2642,12 +2636,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27b02d87554356db9e9a873add8782d4ea6e3e58ea071a9adb9a2e8ddb884a8b" -[[package]] -name = "normalize-line-endings" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" - [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -3377,7 +3365,7 @@ checksum = "14e6ab3f592e6fb464fc9712d8d6e6912de6473954635fd76a589d832cffcbb0" [[package]] name = "policy-evaluator" version = "0.12.2" -source = "git+https://github.com/kubewarden/policy-evaluator?rev=da91fae49e29914e458d9ac4f09e0676c38764c0#da91fae49e29914e458d9ac4f09e0676c38764c0" +source = "git+https://github.com/flavio/policy-evaluator?branch=on-demand#8752bbda6a1599a7ed877735bed64fb50ff0c643" dependencies = [ "anyhow", "base64 0.21.5", @@ -3475,6 +3463,7 @@ dependencies = [ "serde_yaml", "sha2", "tempfile", + "thiserror", "tokio", "tracing", "tracing-futures", @@ -3538,16 +3527,13 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "predicates" -version = "2.1.5" +version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" +checksum = "6dfc28575c2e3f19cb3c73b93af36460ae898d426eba6fc15b9bd2a5220758a0" dependencies = [ - "difflib", - "float-cmp", - "itertools 0.10.5", - "normalize-line-endings", + "anstyle", + "itertools 0.11.0", "predicates-core", - "regex", ] [[package]] @@ -5199,8 +5185,7 @@ dependencies = [ [[package]] name = "wapc" version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b27ae434134e725e4d7d2d41e52e4c70f974d312dd96392c443875385d85a6c" +source = "git+https://github.com/flavio/wapc-rs/?branch=expose-wasmtime-provider-pre#af803159547d0d0d710c0ab7b1fd31363a54249e" dependencies = [ "log", "parking_lot", @@ -5256,9 +5241,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasi-cap-std-sync" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fd94e147b273348ec68ae412b8bc17a4d372b9e070535b98e3e2c5a3ffd8e83" +checksum = "a4328de5cf2a0debfc48216fe9c2747badc64957837641f5836cd8b3d48d73f0" dependencies = [ "anyhow", "async-trait", @@ -5279,9 +5264,9 @@ dependencies = [ [[package]] name = "wasi-common" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d5166f7432ee36d06aa9f9bd7990a00330401fdbc75be7887ea952a299b9a19" +checksum = "84f6774ec9e464b7373f683bc57ff87fcca5fd26a7d6bdb7438fb2f56a545aa6" dependencies = [ "anyhow", "bitflags 2.4.1", @@ -5365,9 +5350,9 @@ checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" [[package]] name = "wasm-encoder" -version = "0.35.0" +version = "0.36.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca90ba1b5b0a70d3d49473c5579951f3bddc78d47b59256d2f9d4922b150aca" +checksum = "822b645bf4f2446b949776ffca47e2af60b167209ffb70814ef8779d299cd421" dependencies = [ "leb128", ] @@ -5396,9 +5381,9 @@ dependencies = [ [[package]] name = "wasmparser" -version = "0.115.0" +version = "0.116.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e06c0641a4add879ba71ccb3a1e4278fd546f76f1eafb21d8f7b07733b547cd5" +checksum = "a58e28b80dd8340cb07b8242ae654756161f6fc8d0038123d679b7b99964fa50" dependencies = [ "indexmap 2.1.0", "semver", @@ -5426,9 +5411,9 @@ dependencies = [ [[package]] name = "wasmtime" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca54f6090ce46973f33a79f265924b204f248f91aec09229bce53d19d567c1a6" +checksum = "642e12d108e800215263e3b95972977f473957923103029d7d617db701d67ba4" dependencies = [ "anyhow", "async-trait", @@ -5449,8 +5434,8 @@ dependencies = [ "serde_derive", "serde_json", "target-lexicon", - "wasm-encoder 0.35.0", - "wasmparser 0.115.0", + "wasm-encoder 0.36.2", + "wasmparser 0.116.1", "wasmtime-cache", "wasmtime-component-macro", "wasmtime-component-util", @@ -5466,18 +5451,18 @@ dependencies = [ [[package]] name = "wasmtime-asm-macros" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54984bc0b5689da87a43d7c181d23092b4d5cfcbb7ae3eb6b917dd55865d95e6" +checksum = "beada8bb15df52503de0a4c58de4357bfd2f96d9a44a6e547bad11efdd988b47" dependencies = [ "cfg-if", ] [[package]] name = "wasmtime-cache" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a4df7655bb73b592189033ab046aa47c1da486d70bc9c1ebf45e55ac030bdf4" +checksum = "aba5bf44d044d25892c03fb3534373936ee204141ff92bac8297787ac7f22318" dependencies = [ "anyhow", "base64 0.21.5", @@ -5495,9 +5480,9 @@ dependencies = [ [[package]] name = "wasmtime-component-macro" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64de99fb7c4c383832b85efcaae95f7094a5c505d80146227ce97ab436cbac68" +checksum = "56ccba556991465cca68d5a54769684bcf489fb532059da55105f851642d52c1" dependencies = [ "anyhow", "proc-macro2", @@ -5510,15 +5495,15 @@ dependencies = [ [[package]] name = "wasmtime-component-util" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f9141a8df069e106eee0c3a8173c0809cf1a4b5630628cfb1f25ab114720093" +checksum = "05492a177a6006cb73f034d6e9a6fad6da55b23c4398835cb0012b5fa51ecf67" [[package]] name = "wasmtime-cranelift" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cf3cee8be02f5006d21b773ffd6802f96a0b7d661ff2ad8a01fb93df458b1aa" +checksum = "fe2e7532f1d6adbcc57e69bb6a7c503f0859076d07a9b4b6aabe8021ff8a05fd" dependencies = [ "anyhow", "cfg-if", @@ -5533,7 +5518,7 @@ dependencies = [ "object", "target-lexicon", "thiserror", - "wasmparser 0.115.0", + "wasmparser 0.116.1", "wasmtime-cranelift-shared", "wasmtime-environ", "wasmtime-versioned-export-macros", @@ -5541,9 +5526,9 @@ dependencies = [ [[package]] name = "wasmtime-cranelift-shared" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "420fd2a69bc162957f4c94f21c7fa08ecf60d916f4e87b56332507c555da381d" +checksum = "8c98d5378a856cbf058d36278627dfabf0ed68a888142958c7ae8e6af507dafa" dependencies = [ "anyhow", "cranelift-codegen", @@ -5557,9 +5542,9 @@ dependencies = [ [[package]] name = "wasmtime-environ" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb6a445ce2b2810127caee6c1b79b8da4ae57712b05556a674592c18b7500a14" +checksum = "a6d33a9f421da810a070cd56add9bc51f852bd66afbb8b920489d6242f15b70e" dependencies = [ "anyhow", "cranelift-entity", @@ -5571,8 +5556,8 @@ dependencies = [ "serde_derive", "target-lexicon", "thiserror", - "wasm-encoder 0.35.0", - "wasmparser 0.115.0", + "wasm-encoder 0.36.2", + "wasmparser 0.116.1", "wasmprinter", "wasmtime-component-util", "wasmtime-types", @@ -5580,10 +5565,11 @@ dependencies = [ [[package]] name = "wasmtime-fiber" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345a8b061c9eab459e10b9112df9fc357d5a9e8b5b1004bc5fc674fba9be6d2a" +checksum = "404741f4c6d7f4e043be2e8b466406a2aee289ccdba22bf9eba6399921121b97" dependencies = [ + "anyhow", "cc", "cfg-if", "rustix 0.38.28", @@ -5594,9 +5580,9 @@ dependencies = [ [[package]] name = "wasmtime-jit" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f0f6586c61125fbfc13c3108c3dd565d21f314dd5bac823b9a5b7ab576d21f1" +checksum = "8d0994a86d6dca5f7d9740d7f2bd0568be06d2014a550361dc1c397d289d81ef" dependencies = [ "addr2line", "anyhow", @@ -5621,9 +5607,9 @@ dependencies = [ [[package]] name = "wasmtime-jit-debug" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "109a9e46afe33580b952b14a4207354355f19bcdf0b47485b397b68409eaf553" +checksum = "4e0c4b74e606d1462d648631d5bc328e3d5b14e7f9d3ff93bc6db062fb8c5cd8" dependencies = [ "object", "once_cell", @@ -5633,9 +5619,9 @@ dependencies = [ [[package]] name = "wasmtime-jit-icache-coherence" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f67e6be36375c39cff57ed3b137ab691afbf2d9ba8ee1c01f77888413f218749" +checksum = "3090a69ba1476979e090aa7ed4bc759178bafdb65b22f98b9ba24fc6e7e578d5" dependencies = [ "cfg-if", "libc", @@ -5644,9 +5630,8 @@ dependencies = [ [[package]] name = "wasmtime-provider" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "559e7c5b79fbee0619789b0b51d8dae7a6efe46abfb2f3d90e1e2082ec49b6b0" +version = "1.12.0" +source = "git+https://github.com/flavio/wapc-rs/?branch=expose-wasmtime-provider-pre#af803159547d0d0d710c0ab7b1fd31363a54249e" dependencies = [ "anyhow", "cfg-if", @@ -5663,9 +5648,9 @@ dependencies = [ [[package]] name = "wasmtime-runtime" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d07986b2327b5e7f535ed638fbde25990fc8f85400194fda0d26db71c7b685e" +checksum = "b993ac8380385ed67bf71b51b9553edcf1ab0801b78a805a067de581b9a3e88a" dependencies = [ "anyhow", "cc", @@ -5681,7 +5666,7 @@ dependencies = [ "rand", "rustix 0.38.28", "sptr", - "wasm-encoder 0.35.0", + "wasm-encoder 0.36.2", "wasmtime-asm-macros", "wasmtime-environ", "wasmtime-fiber", @@ -5693,22 +5678,22 @@ dependencies = [ [[package]] name = "wasmtime-types" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e810a0d2e869abd1cb42bd232990f6bd211672b3d202d2ae7e70ffb97ed70ea3" +checksum = "8b5778112fcab2dc3d4371f4203ab8facf0c453dd94312b0a88dd662955e64e0" dependencies = [ "cranelift-entity", "serde", "serde_derive", "thiserror", - "wasmparser 0.115.0", + "wasmparser 0.116.1", ] [[package]] name = "wasmtime-versioned-export-macros" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09b5575a75e711ca6c36bb9ad647c93541cdc8e34218031acba5da3f35919dd3" +checksum = "f50f51f8d79bfd2aa8e9d9a0ae7c2d02b45fe412e62ff1b87c0c81b07c738231" dependencies = [ "proc-macro2", "quote", @@ -5717,9 +5702,9 @@ dependencies = [ [[package]] name = "wasmtime-wasi" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e6730a2853226292cee755a36549dd1a443b324cf99319cb390af1afed6cb8a" +checksum = "eff3f4ad191a5e6d002bb5bffa3e2931a58984da9b30e57b48f353848748cf80" dependencies = [ "anyhow", "async-trait", @@ -5752,16 +5737,16 @@ dependencies = [ [[package]] name = "wasmtime-winch" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1c1b6abbba5a01739bef9f00a87b419414a7dd99b795823d93fb12fc2bf994a" +checksum = "d638e7c72447253485fe131523e7465ca318c0455c826eb4f5f612fb67b7de90" dependencies = [ "anyhow", "cranelift-codegen", "gimli", "object", "target-lexicon", - "wasmparser 0.115.0", + "wasmparser 0.116.1", "wasmtime-cranelift-shared", "wasmtime-environ", "winch-codegen", @@ -5769,9 +5754,9 @@ dependencies = [ [[package]] name = "wasmtime-wit-bindgen" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d214ca7513d76af2872ad5bba4b0dcc0225821931745fdcb4fc30dd34bc3bf7" +checksum = "4b804dfd3d0c0d6d37aa21026fe7772ba1a769c89ee4f5c4f13b82d91d75216f" dependencies = [ "anyhow", "heck", @@ -5781,9 +5766,9 @@ dependencies = [ [[package]] name = "wasmtime-wmemcheck" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dafab2db172a53e23940e0fa3078c202f567ee5f13f4b42f66b694fab43c658" +checksum = "9b6060bc082cc32d9a45587c7640e29e3c7b89ada82677ac25d87850aaccb368" [[package]] name = "wast" @@ -5860,9 +5845,9 @@ checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" [[package]] name = "wiggle" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f6ce56a4019ce3d8592c298029a75abe6887d1c95a078a4c53ec77a0628262d" +checksum = "f91028b241e692fdf30627ac10ba9d5ac378353ea4119b4f904ac95177057a44" dependencies = [ "anyhow", "async-trait", @@ -5875,9 +5860,9 @@ dependencies = [ [[package]] name = "wiggle-generate" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e585a4b1e84195031c77d8484af99cd93f129f45d519e83cb8cc75e9a420cfd3" +checksum = "5e8b3d76531994513671b2ec3b29fd342bf041e2282945bb6c52eebe6aa9e7da" dependencies = [ "anyhow", "heck", @@ -5890,9 +5875,9 @@ dependencies = [ [[package]] name = "wiggle-macro" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6f321dbce722989d65c3082dba479fa392c7b7a1a4c3adc2a39545dd5aa452f" +checksum = "c189fe00c67f61bb330827f2abab1af9b5925c7929535cd13a68d265ec20b02d" dependencies = [ "proc-macro2", "quote", @@ -5933,9 +5918,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "winch-codegen" -version = "0.12.4" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f112bebb367a544d20c254083798087f22ceeb426168a970b955e8436f749dca" +checksum = "0c792487f4dc42733d182a72e75d718b1a563cedcc1599ff0a9ed683c33e8bb7" dependencies = [ "anyhow", "cranelift-codegen", @@ -5943,7 +5928,7 @@ dependencies = [ "regalloc2", "smallvec", "target-lexicon", - "wasmparser 0.115.0", + "wasmparser 0.116.1", "wasmtime-environ", ] @@ -6176,9 +6161,9 @@ dependencies = [ [[package]] name = "wit-parser" -version = "0.12.2" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43771ee863a16ec4ecf9da0fc65c3bbd4a1235c8e3da5f094b562894843dfa76" +checksum = "15df6b7b28ce94b8be39d8df5cb21a08a4f3b9f33b631aedb4aa5776f785ead3" dependencies = [ "anyhow", "id-arena", diff --git a/Cargo.toml b/Cargo.toml index 7a62d7b8..800fd35a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,12 +29,13 @@ opentelemetry = { version = "0.21", default-features = false, features = [ opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] } procfs = "0.16" #policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.12.2" } -policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", rev = "da91fae49e29914e458d9ac4f09e0676c38764c0" } +policy-evaluator = { git = "https://github.com/flavio/policy-evaluator", branch = "on-demand" } rayon = "1.8" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.9.27" sha2 = "0.10" +thiserror = "1.0" tokio = { version = "^1", features = ["full"] } tracing = "0.1" tracing-futures = "0.2" @@ -48,7 +49,7 @@ semver = { version = "1.0.20", features = ["serde"] } mockall_double = "0.3" [dev-dependencies] -mockall = "0.11" +mockall = "0.12" rstest = "0.18" tempfile = "3.8.1" reqwest = { version = "0.11", default_features = false, features = [ diff --git a/src/workers/evaluation_environment.rs b/src/workers/evaluation_environment.rs index e3751823..3ff26bda 100644 --- a/src/workers/evaluation_environment.rs +++ b/src/workers/evaluation_environment.rs @@ -1,11 +1,16 @@ use anyhow::{anyhow, Result}; use policy_evaluator::{ - admission_response::AdmissionResponse, callback_requests::CallbackRequest, - evaluation_context::EvaluationContext, policy_evaluator::PolicyEvaluator, - policy_evaluator_builder::PolicyEvaluatorBuilder, wasmtime, + admission_response::AdmissionResponse, + callback_requests::CallbackRequest, + evaluation_context::EvaluationContext, + kubewarden_policy_sdk::settings::SettingsValidationResponse, + policy_evaluator::{PolicyEvaluator, PolicyEvaluatorPre, PolicyExecutionMode}, + policy_evaluator_builder::PolicyEvaluatorBuilder, + wasmtime, }; use std::collections::HashMap; use tokio::sync::mpsc; +use tracing::debug; use crate::communication::EvalRequest; use crate::config::PolicyMode; @@ -21,32 +26,30 @@ use mockall::automock; /// It also provides helper methods to perform the validation of a request and the validation /// of the settings provided by the user. /// -/// Each worker has its own dedicated instance of this structure. -/// At the worker level, the ultimate goal is to avoid duplicated instances of `PolicyEvaluator`. -/// That means that, given two or more identical Wasm modules, only one `PolicyEvaluator` -/// should be created. This is required to avoid a waste of memory by the Policy Server -/// process. +/// This is an immutable structure that can be safely shared across different threads once wrapped +/// inside of a `Arc`. /// -/// Note: the `PolicyEvaluator` instances will still be duplicated across each worker. This is -/// something we have to deal with. +/// When performing a `validate` or `validate_settings` operation, a new WebAssembly environment is +/// created and used to perform the operation. The environment is then discarded once the +/// evaluation is over. +/// This ensures: +/// - no memory leaks caused by bogus policies affect the Policy Server long running process +/// - no data is shared between evaluations of the same module +/// +/// To reduce the creation time, this code makes use of `PolicyEvaluatorPre` which are created +/// only once, during the bootstrap phase. #[derive(Default)] #[cfg_attr(test, allow(dead_code))] pub(crate) struct EvaluationEnvironment { - /// Unique ID of the worker - worker_id: u64, - /// The name of the Namespace where Policy Server doesn't operate. All the requests /// involving this Namespace are going to be accepted. This is usually done to prevent user /// policies from messing with the components of the Kubewarden stack (which are all /// deployed inside of the same Namespace). always_accept_admission_reviews_on_namespace: Option, - /// A map with the unique ID of a Wasm module as key, and the associated `PolicyEvaluator` - /// instance as value. - /// Currently we the `module_id` is obtained by computing the sha255 digest of the - /// optimized Wasm module. - /// This dictionary allows us to reduce by amount of memory consumed by Policy Server. - module_id_to_evaluator: HashMap, + /// A map with the module digest as key, and the associated `PolicyEvaluatorPre` + /// as value + module_digest_to_policy_evaluator_pre: HashMap, /// A map with the ID of the policy as value, and the associated `EvaluationContext` as /// value. @@ -55,8 +58,8 @@ pub(crate) struct EvaluationEnvironment { policy_id_to_eval_ctx: HashMap, /// Map a `policy_id` (the name given by the user inside of `policies.yml`) to the - /// `module_id`. This allows us to deduplicate the Wasm modules defined by the user. - policy_id_to_module_id: HashMap, + /// module's digest. This allows us to deduplicate the Wasm modules defined by the user. + policy_id_to_module_digest: HashMap, /// Map a `policy_id` to the `PolicyEvaluationSettings` instance. This allows us to obtain /// the list of settings to be used when evaluating a given policy. @@ -68,14 +71,34 @@ pub(crate) struct EvaluationEnvironment { impl EvaluationEnvironment { /// Creates a new `EvaluationEnvironment` pub(crate) fn new( - worker_id: u64, + engine: &wasmtime::Engine, + policies: &HashMap, + precompiled_policies: &PrecompiledPolicies, always_accept_admission_reviews_on_namespace: Option, - ) -> Self { - Self { - worker_id, + policy_evaluation_limit_seconds: Option, + callback_handler_tx: mpsc::Sender, + ) -> Result { + let mut eval_env = Self { always_accept_admission_reviews_on_namespace, ..Default::default() + }; + + for (policy_id, policy) in policies { + let precompiled_policy = precompiled_policies + .get(&policy.url) + .ok_or_else(|| anyhow!("cannot find policy settings of {}", policy_id))?; + + eval_env.register( + engine, + policy_id, + precompiled_policy, + policy, + callback_handler_tx.clone(), + policy_evaluation_limit_seconds, + )?; } + + Ok(eval_env) } /// Returns `true` if the given `namespace` is the special Namespace that is ignored by all @@ -90,49 +113,47 @@ impl EvaluationEnvironment { /// Register a new policy. It takes care of creating a new `PolicyEvaluator` (when needed). /// /// Params: + /// - `engine`: the `wasmtime::Engine` to be used when creating the `PolicyEvaluator` /// - `policy_id`: the ID of the policy, as specified inside of the `policies.yml` by the /// user + /// - `precompiled_policy`: the `PrecompiledPolicy` associated with the Wasm module referenced + /// by the policy /// - `policy`: a data structure that maps all the information defined inside of /// `policies.yml` for the given policy - /// - `engine`: the `wasmtime::Engine` to be used when creating the `PolicyEvaluator` - /// - `policy_modules`: all the `wasmtime::Module` precompiled for the current - /// OS/architecture /// - `callback_handler_tx`: the transmission end of a channel that connects the worker /// with the asynchronous world /// - `policy_evaluation_limit_seconds`: when set, defines after how many seconds the /// policy evaluation is interrupted - pub(crate) fn register( + fn register( &mut self, + engine: &wasmtime::Engine, policy_id: &str, + precompiled_policy: &PrecompiledPolicy, policy: &crate::config::Policy, - engine: &wasmtime::Engine, - policy_modules: &PrecompiledPolicies, callback_handler_tx: mpsc::Sender, policy_evaluation_limit_seconds: Option, ) -> Result<()> { - let precompiled_policy = policy_modules.get(policy.url.as_str()).ok_or_else(|| { - anyhow!( - "could not find preoptimized module for policy: {:?}", - policy.url - ) - })?; - let module_id = precompiled_policy.digest.clone(); - - if !self.module_id_to_evaluator.contains_key(&module_id) { - let evaluator = create_policy_evaluator( - policy_id, - self.worker_id, - policy, + let module_digest = &precompiled_policy.digest; + + if !self + .module_digest_to_policy_evaluator_pre + .contains_key(module_digest) + { + debug!(policy_id = policy.url, "create wasmtime::Module"); + let module = create_wasmtime_module(&policy.url, engine, precompiled_policy)?; + debug!(policy_id = policy.url, "create PolicyEvaluatorPre"); + let pol_eval_pre = create_policy_evaluator_pre( engine, - precompiled_policy, - callback_handler_tx.clone(), + &module, + precompiled_policy.execution_mode, policy_evaluation_limit_seconds, )?; - self.module_id_to_evaluator - .insert(module_id.clone(), evaluator); + + self.module_digest_to_policy_evaluator_pre + .insert(module_digest.to_owned(), pol_eval_pre); } - self.policy_id_to_module_id - .insert(policy_id.to_owned(), module_id); + self.policy_id_to_module_digest + .insert(policy_id.to_owned(), module_digest.to_owned()); let policy_eval_settings = PolicyEvaluationSettings { policy_mode: policy.policy_mode.clone(), @@ -169,69 +190,86 @@ impl EvaluationEnvironment { .ok_or(anyhow!("cannot find policy with ID {policy_id}")) } - /// Given a policy ID and a request to be processed, uses the `PolicyEvaluator` to perform - /// a validation operation. - pub fn validate(&mut self, policy_id: &str, req: &EvalRequest) -> Result { + /// Perform a request validation + pub fn validate(&self, policy_id: &str, req: &EvalRequest) -> Result { let settings = self.policy_id_to_settings.get(policy_id).ok_or(anyhow!( "cannot find settings for policy with ID {policy_id}" ))?; - let module_id = self.policy_id_to_module_id.get(policy_id).ok_or(anyhow!( - "cannot find module_id for policy with ID {policy_id}" + let mut evaluator = self.rehydrate(policy_id)?; + + let eval_ctx = self.policy_id_to_eval_ctx.get(policy_id).ok_or(anyhow!( + "cannot find evaluation context for policy with ID {policy_id}" + ))?; + + Ok(evaluator.validate(req.req.clone(), &settings.settings, eval_ctx)) + } + + /// Validate the settings the user provided for the given policy + pub fn validate_settings(&self, policy_id: &str) -> Result { + let settings = self.policy_id_to_settings.get(policy_id).ok_or(anyhow!( + "cannot find settings for policy with ID {policy_id}" ))?; - let evaluator = self - .module_id_to_evaluator - .get_mut(module_id) + + let mut evaluator = self.rehydrate(policy_id)?; + + Ok(evaluator.validate_settings(&settings.settings)) + } + + /// Internal method, create a `PolicyEvaluator` by using a pre-initialized instance + fn rehydrate(&self, policy_id: &str) -> Result { + let module_digest = self + .policy_id_to_module_digest + .get(policy_id) + .ok_or(anyhow!( + "cannot find module_digest for policy with ID {policy_id}" + ))?; + let policy_evaluator_pre = self + .module_digest_to_policy_evaluator_pre + .get(module_digest) .ok_or(anyhow!( - "cannot find evaluator for policy with ID {policy_id}" + "cannot find PolicyEvaluatorPre for policy with ID {policy_id}" ))?; let eval_ctx = self.policy_id_to_eval_ctx.get(policy_id).ok_or(anyhow!( "cannot find evaluation context for policy with ID {policy_id}" ))?; - Ok(evaluator.validate(req.req.clone(), &settings.settings, eval_ctx)) + policy_evaluator_pre.rehydrate(eval_ctx) } } -/// Internal function, takes care of creating the `PolicyEvaluator` instance for the given policy -fn create_policy_evaluator( - policy_id: &str, - worker_id: u64, - policy: &crate::config::Policy, +fn create_wasmtime_module( + policy_url: &str, engine: &wasmtime::Engine, precompiled_policy: &PrecompiledPolicy, - callback_handler_tx: mpsc::Sender, - policy_evaluation_limit_seconds: Option, -) -> Result { +) -> Result { // See `wasmtime::Module::deserialize` to know why this method is `unsafe`. // However, in our context, nothing bad will happen because we have // full control of the precompiled module. This is generated by the // WorkerPool thread - let module = - unsafe { wasmtime::Module::deserialize(engine, &precompiled_policy.precompiled_module) } - .map_err(|e| { - anyhow!( - "could not rehydrate wasmtime::Module {}: {:?}", - policy.url, - e - ) - })?; - - let mut policy_evaluator_builder = - PolicyEvaluatorBuilder::new(policy_id.to_string(), worker_id) - .engine(engine.clone()) - .policy_module(module) - .context_aware_resources_allowed(policy.context_aware_resources.clone()) - .callback_channel(callback_handler_tx) - .execution_mode(precompiled_policy.execution_mode); + unsafe { wasmtime::Module::deserialize(engine, &precompiled_policy.precompiled_module) } + .map_err(|e| anyhow!("could not rehydrate wasmtime::Module {policy_url}: {e:?}")) +} + +/// Internal function, takes care of creating the `PolicyEvaluator` instance for the given policy +fn create_policy_evaluator_pre( + engine: &wasmtime::Engine, + module: &wasmtime::Module, + mode: PolicyExecutionMode, + policy_evaluation_limit_seconds: Option, +) -> Result { + let mut policy_evaluator_builder = PolicyEvaluatorBuilder::new() + .engine(engine.to_owned()) + .policy_module(module.to_owned()) + .execution_mode(mode); if let Some(limit) = policy_evaluation_limit_seconds { policy_evaluator_builder = policy_evaluator_builder.enable_epoch_interruptions(limit, limit); } - policy_evaluator_builder.build() + policy_evaluator_builder.build_pre() } #[cfg(test)] @@ -257,37 +295,39 @@ mod tests { digest: "unique-digest".to_string(), }; - let mut policies = HashMap::new(); - let mut policy_modules = HashMap::new(); + let mut policies: HashMap = HashMap::new(); + let mut precompiled_policies: PrecompiledPolicies = PrecompiledPolicies::new(); for policy_id in &policy_ids { + let policy_url = format!("file:///tmp/{policy_id}.wasm"); policies.insert( - policy_id.to_owned(), + policy_id.to_string(), Policy { - url: policy_id.to_string(), + url: policy_url.clone(), policy_mode: PolicyMode::Protect, allowed_to_mutate: None, settings: None, context_aware_resources: BTreeSet::new(), }, ); - policy_modules.insert(policy_id.to_string(), precompiled_policy.clone()); + precompiled_policies.insert(policy_url, precompiled_policy.clone()); } - let mut evaluation_environment = EvaluationEnvironment::new(0, None); - for policy_id in policy_ids { + let evaluation_environment = EvaluationEnvironment::new( + &engine, + &policies, + &precompiled_policies, + None, + None, + callback_handler_tx, + ) + .unwrap(); + + assert_eq!( evaluation_environment - .register( - policy_id, - &policies[policy_id], - &engine, - &policy_modules, - callback_handler_tx.clone(), - None, - ) - .unwrap(); - } - - assert_eq!(evaluation_environment.module_id_to_evaluator.len(), 1); + .module_digest_to_policy_evaluator_pre + .len(), + 1 + ); } } diff --git a/src/workers/pool.rs b/src/workers/pool.rs index be935681..5b9c07c7 100644 --- a/src/workers/pool.rs +++ b/src/workers/pool.rs @@ -1,10 +1,7 @@ use anyhow::{anyhow, Result}; use core::time; use lazy_static::lazy_static; -use policy_evaluator::{ - callback_requests::CallbackRequest, evaluation_context::EvaluationContext, - policy_evaluator::PolicyEvaluator, policy_evaluator_builder::PolicyEvaluatorBuilder, wasmtime, -}; +use policy_evaluator::{callback_requests::CallbackRequest, wasmtime}; use rayon::prelude::*; use std::{ collections::HashMap, @@ -22,16 +19,12 @@ use tracing::{debug, error, info, warn}; use crate::communication::{EvalRequest, WorkerPoolBootRequest}; use crate::config; use crate::policy_downloader::FetchedPolicies; +use crate::workers::EvaluationEnvironment; use crate::workers::{ precompiled_policy::{PrecompiledPolicies, PrecompiledPolicy}, worker::Worker, }; -lazy_static! { - /// Used to create unique worker IDs - static ref WORKER_ID: Arc = Arc::new(AtomicU64::new(0)); -} - /// Coordinates a set of workers. /// Each worker takes care of performing the evaluation of the requests received by Policy Server /// API endpoints. @@ -87,10 +80,6 @@ impl WorkerPool { // This vector holds all the sender ends of these channels. let mut worker_tx_chans = Vec::>::new(); - // Each worker has its own `wasmtime::Engine`, we have to keep track of them here because - // we have to increase their epoch tick to implement the policy timeout protection feature - let mut worker_engines = Vec::::new(); - // All the join handles of the spawned worker threads let mut join_handles = Vec::>>::new(); @@ -106,13 +95,12 @@ impl WorkerPool { } }; - // To reduce bootstrap time, we will precompile all the WebAssembly - // modules we are going to use. let mut wasmtime_config = wasmtime::Config::new(); if self.policy_evaluation_limit_seconds.is_some() { wasmtime_config.epoch_interruption(true); } + // We are going to share the same engine across all the workers let engine = match wasmtime::Engine::new(&wasmtime_config) { Ok(e) => e, Err(e) => { @@ -121,25 +109,39 @@ impl WorkerPool { } }; - // Use a reference counter to share access to precompiled policies - // between workers. This reduces memory usage - let precompiled_policies: Arc = + let precompiled_policies = match precompile_policies(&engine, &bootstrap_data.fetched_policies) { - Ok(pp) => Arc::new(pp), + Ok(pp) => pp, Err(e) => { eprintln!("{e}"); std::process::exit(1); } }; + // EvaluationEnvironment instance that is going to be shared across all + // the worker threads + let evaluation_environment = match EvaluationEnvironment::new( + &engine, + &bootstrap_data.policies, + &precompiled_policies, + self.always_accept_admission_reviews_on_namespace, + self.policy_evaluation_limit_seconds, + self.callback_handler_tx.clone(), + ) { + Ok(ee) => Arc::new(ee), + Err(e) => { + eprintln!("{e}"); + std::process::exit(1); + } + }; + // For each policy defined by the user, ensure the given settings are valid // We exit with an error if one or more policies do not have valid // settings. if let Err(error) = verify_policy_settings( &engine, &bootstrap_data.policies, - &precompiled_policies, - self.callback_handler_tx.clone(), + evaluation_environment.clone(), self.policy_evaluation_limit_seconds, ) { error!(?error, "cannot validate policy settings"); @@ -165,73 +167,17 @@ impl WorkerPool { warn!("policy timeout protection is disabled"); } - // Use a reference counter to share access to policies - // between workers. This reduces memory usage - let policies = Arc::new(bootstrap_data.policies); - for n in 1..=pool_size { let (tx, rx) = mpsc::channel::(32); worker_tx_chans.push(tx); - // Each worker has its own `wasmtime::Engine`, sharing the - // same engine across all the workers leads to bad performance - // TODO: revisit this statement, it seems this issue has been solved by latest wasmtime - // releases - let engine = match wasmtime::Engine::new(&wasmtime_config) { - Ok(e) => e, - Err(e) => { - if bootstrap_data - .resp_chan - .send(Err(anyhow!( - "cannot create wasmtime engine for one of the workers: {}", - e - ))) - .is_err() - { - eprint!("cannot create wasmtime engine for one of the workers: {e}"); - std::process::exit(1); - }; - return; - } - }; - // Note well: it's fast and cheap to clone a `wasmtime::Engine` as stated by the - // official docs. It's just a reference counter under the hood - worker_engines.push(engine.clone()); - - let modules = precompiled_policies.clone(); let b = barrier.clone(); - let canary = boot_canary.clone(); - let callback_handler_tx = self.callback_handler_tx.clone(); - let always_accept_admission_reviews_on_namespace = - self.always_accept_admission_reviews_on_namespace.clone(); - let policies = policies.clone(); + let inner_evaluation_environment = evaluation_environment.clone(); let join = thread::spawn(move || -> Result<()> { info!(spawned = n, total = pool_size, "spawning worker"); - let worker_id = WORKER_ID.fetch_add(1, Ordering::Relaxed); - let mut worker = match Worker::new( - worker_id, - rx, - &policies, - &modules, - engine, - callback_handler_tx, - always_accept_admission_reviews_on_namespace, - self.policy_evaluation_limit_seconds, - ) { - Ok(w) => w, - Err(e) => { - error!(error = e.to_string().as_str(), "cannot spawn worker"); - canary.store(false, Ordering::SeqCst); - b.wait(); - return Err(anyhow!("Worker {} couldn't start: {}", n, e)); - } - }; - // Drop the Arc references ASAP, they are no longer needed - // at this point - drop(policies); - drop(modules); + let mut worker = Worker::new(rx, inner_evaluation_environment); b.wait(); debug!(id = n, "worker loop start"); @@ -249,7 +195,6 @@ impl WorkerPool { // meaning a lot of memory would have been consumed without a valid reason // during the whole execution time drop(precompiled_policies); - drop(policies); barrier.wait(); if !boot_canary.load(Ordering::SeqCst) { @@ -279,13 +224,12 @@ impl WorkerPool { // start a dedicated thread that send tick events to all // the workers. This is used by the wasmtime's epoch_interruption // to keep track of the execution time of each wasm module + let engine_timer_thread = engine.clone(); thread::spawn(move || { let one_second = time::Duration::from_secs(1); loop { thread::sleep(one_second); - for engine in &worker_engines { - engine.increment_epoch(); - } + engine_timer_thread.increment_epoch(); } }); } @@ -304,52 +248,6 @@ impl WorkerPool { } } -pub(crate) fn build_policy_evaluator( - policy_id: &str, - worker_id: u64, - policy: &config::Policy, - engine: &wasmtime::Engine, - policy_modules: &PrecompiledPolicies, - callback_handler_tx: mpsc::Sender, - policy_evaluation_limit_seconds: Option, -) -> Result { - let policy_module = policy_modules.get(policy.url.as_str()).ok_or_else(|| { - anyhow!( - "could not find preoptimized module for policy: {:?}", - policy.url - ) - })?; - - // See `wasmtime::Module::deserialize` to know why this method is `unsafe`. - // However, in our context, nothing bad will happen because we have - // full control of the precompiled module. This is generated by the - // WorkerPool thread - let module = - unsafe { wasmtime::Module::deserialize(engine, &policy_module.precompiled_module) } - .map_err(|e| { - anyhow!( - "could not rehydrate wasmtime::Module {}: {:?}", - policy.url, - e - ) - })?; - - let mut policy_evaluator_builder = - PolicyEvaluatorBuilder::new(policy_id.to_string(), worker_id) - .engine(engine.clone()) - .policy_module(module) - .context_aware_resources_allowed(policy.context_aware_resources.clone()) - .callback_channel(callback_handler_tx) - .execution_mode(policy_module.execution_mode); - - if let Some(limit) = policy_evaluation_limit_seconds { - policy_evaluator_builder = - policy_evaluator_builder.enable_epoch_interruptions(limit, limit); - } - - policy_evaluator_builder.build() -} - fn precompile_policies( engine: &wasmtime::Engine, fetched_policies: &FetchedPolicies, @@ -397,8 +295,7 @@ fn precompile_policies( fn verify_policy_settings( engine: &wasmtime::Engine, policies: &HashMap, - policy_modules: &HashMap, - callback_handler_tx: mpsc::Sender, + evaluation_environment: Arc, policy_evaluation_limit_seconds: Option, ) -> Result<()> { let tick_thread_lock = Arc::new(RwLock::new(true)); @@ -424,39 +321,13 @@ fn verify_policy_settings( }); } - // We have to create a worker_id because this is what the `PolicyEvaluator` constructor needs. - // In this case there's no actual `Worker`, we're going to run all the setting validation Wasm - // invocations inside of the thread of `WorkerPool` - let worker_id = WORKER_ID.fetch_add(1, Ordering::Relaxed); let mut errors = vec![]; - for (id, policy) in policies.iter() { - let mut policy_evaluator = match build_policy_evaluator( - id, - worker_id, - policy, - engine, - policy_modules, - callback_handler_tx.clone(), - policy_evaluation_limit_seconds, - ) { - Ok(pe) => pe, - Err(e) => { - errors.push(format!("[{id}] cannot create PolicyEvaluator: {e:?}")); - continue; - } - }; - let eval_ctx = EvaluationContext { - policy_id: id.to_owned(), - callback_channel: Some(callback_handler_tx.clone()), - ctx_aware_resources_allow_list: policy.context_aware_resources.clone(), - }; - - let set_val_rep = policy_evaluator - .validate_settings(&policy.settings_to_json()?.unwrap_or_default(), &eval_ctx); + for (policy_id, _policy) in policies.iter() { + let set_val_rep = evaluation_environment.validate_settings(policy_id)?; if !set_val_rep.valid { errors.push(format!( "[{}] settings are not valid: {:?}", - id, set_val_rep.message + policy_id, set_val_rep.message )); continue; } diff --git a/src/workers/worker.rs b/src/workers/worker.rs index 0faede9b..a776fd60 100644 --- a/src/workers/worker.rs +++ b/src/workers/worker.rs @@ -1,37 +1,22 @@ use anyhow::Result; -use itertools::Itertools; -use policy_evaluator::callback_requests::CallbackRequest; -use policy_evaluator::wasmtime; use policy_evaluator::{ admission_response::{AdmissionResponse, AdmissionResponseStatus}, policy_evaluator::ValidateRequest, }; -use std::{collections::HashMap, fmt, time::Instant}; -use tokio::sync::mpsc::{Receiver, Sender}; +use std::{sync::Arc, time::Instant}; +use tokio::sync::mpsc::Receiver; use tracing::{error, info, info_span}; use crate::communication::{EvalRequest, RequestOrigin}; -use crate::config::{Policy, PolicyMode}; +use crate::config::PolicyMode; use crate::metrics::{self}; -use crate::workers::{precompiled_policy::PrecompiledPolicies, EvaluationEnvironment}; +use crate::workers::EvaluationEnvironment; pub(crate) struct Worker { - evaluation_environment: EvaluationEnvironment, + evaluation_environment: Arc, channel_rx: Receiver, } -pub struct PolicyErrors(HashMap); - -impl fmt::Display for PolicyErrors { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut errors = self - .0 - .iter() - .map(|(policy, error)| format!("[{policy}: {error}]")); - write!(f, "{}", errors.join(", ")) - } -} - impl Worker { /// Create a new Worker. Takes care of allocating the `PolicyEvaluator` environments /// required to evaluate the policies. @@ -45,45 +30,13 @@ impl Worker { skip_all, )] pub(crate) fn new( - worker_id: u64, rx: Receiver, - policies: &HashMap, - precompiled_policies: &PrecompiledPolicies, - engine: wasmtime::Engine, - callback_handler_tx: Sender, - always_accept_admission_reviews_on_namespace: Option, - policy_evaluation_limit_seconds: Option, - ) -> Result { - let mut evs_errors = HashMap::new(); - let mut evaluation_environment = - EvaluationEnvironment::new(worker_id, always_accept_admission_reviews_on_namespace); - - for (id, policy) in policies.iter() { - // It's safe to clone the outer engine. This creates a shallow copy - let inner_engine = engine.clone(); - if let Err(e) = evaluation_environment.register( - id, - policy, - &inner_engine, - precompiled_policies, - callback_handler_tx.clone(), - policy_evaluation_limit_seconds, - ) { - evs_errors.insert( - id.clone(), - format!("[{id}] could not create PolicyEvaluator: {e:?}"), - ); - } - } - - if !evs_errors.is_empty() { - return Err(PolicyErrors(evs_errors)); - } - - Ok(Worker { + evaluation_environment: Arc, + ) -> Self { + Worker { evaluation_environment, channel_rx: rx, - }) + } } // Returns a validation response with policy-server specific @@ -265,7 +218,7 @@ mod tests { fn create_evaluation_environment_that_accepts_request( policy_mode: PolicyMode, - ) -> EvaluationEnvironment { + ) -> Arc { let mut mock_evaluation_environment = EvaluationEnvironment::default(); mock_evaluation_environment .expect_validate() @@ -285,7 +238,7 @@ mod tests { mock_evaluation_environment .expect_should_always_accept_requests_made_inside_of_namespace() .returning(|_namespace| false); - mock_evaluation_environment + Arc::new(mock_evaluation_environment) } #[derive(Clone)] @@ -298,7 +251,7 @@ mod tests { policy_mode: PolicyMode, rejection_details: EvaluationEnvironmentRejectionDetails, allowed_namespace: String, - ) -> EvaluationEnvironment { + ) -> Arc { let mut mock_evaluation_environment = EvaluationEnvironment::default(); mock_evaluation_environment .expect_validate() @@ -319,7 +272,7 @@ mod tests { .expect_should_always_accept_requests_made_inside_of_namespace() .returning(move |namespace| namespace == allowed_namespace); - mock_evaluation_environment + Arc::new(mock_evaluation_environment) } #[test] From 37c3de9b4271fa7e9b7a76a850ed368a64acacec Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Wed, 13 Dec 2023 09:21:17 +0100 Subject: [PATCH 6/7] test: fix broken integration tests Introduce custom error types for `EvaluationEnvironment` and make use of them to properly identify not found policies. This is required to properly return a 404 response inside of the UI. This regression was detected by the integration tests. Signed-off-by: Flavio Castelli --- src/workers/error.rs | 18 +++++ src/workers/evaluation_environment.rs | 94 ++++++++++++++++----------- src/workers/mod.rs | 1 + src/workers/worker.rs | 19 ++++-- 4 files changed, 88 insertions(+), 44 deletions(-) create mode 100644 src/workers/error.rs diff --git a/src/workers/error.rs b/src/workers/error.rs new file mode 100644 index 00000000..a96b481b --- /dev/null +++ b/src/workers/error.rs @@ -0,0 +1,18 @@ +use thiserror::Error; + +pub type Result = std::result::Result; + +#[derive(Debug, Error)] +pub enum EvaluationError { + #[error("unknown policy: {0}")] + PolicyNotFound(String), + + #[error("bootstrap failure: {0}")] + BootstrapFailure(String), + + #[error("WebAssembly failure: {0}")] + WebAssemblyError(String), + + #[error("{0}")] + InternalError(String), +} diff --git a/src/workers/evaluation_environment.rs b/src/workers/evaluation_environment.rs index 3ff26bda..1dad02d5 100644 --- a/src/workers/evaluation_environment.rs +++ b/src/workers/evaluation_environment.rs @@ -1,4 +1,3 @@ -use anyhow::{anyhow, Result}; use policy_evaluator::{ admission_response::AdmissionResponse, callback_requests::CallbackRequest, @@ -14,6 +13,7 @@ use tracing::debug; use crate::communication::EvalRequest; use crate::config::PolicyMode; +use crate::workers::error::{EvaluationError, Result}; use crate::workers::{ policy_evaluation_settings::PolicyEvaluationSettings, precompiled_policy::{PrecompiledPolicies, PrecompiledPolicy}, @@ -84,18 +84,23 @@ impl EvaluationEnvironment { }; for (policy_id, policy) in policies { - let precompiled_policy = precompiled_policies - .get(&policy.url) - .ok_or_else(|| anyhow!("cannot find policy settings of {}", policy_id))?; - - eval_env.register( - engine, - policy_id, - precompiled_policy, - policy, - callback_handler_tx.clone(), - policy_evaluation_limit_seconds, - )?; + let precompiled_policy = precompiled_policies.get(&policy.url).ok_or_else(|| { + EvaluationError::BootstrapFailure(format!( + "cannot find policy settings of {}", + policy_id + )) + })?; + + eval_env + .register( + engine, + policy_id, + precompiled_policy, + policy, + callback_handler_tx.clone(), + policy_evaluation_limit_seconds, + ) + .map_err(|e| EvaluationError::BootstrapFailure(e.to_string()))?; } Ok(eval_env) @@ -158,7 +163,10 @@ impl EvaluationEnvironment { let policy_eval_settings = PolicyEvaluationSettings { policy_mode: policy.policy_mode.clone(), allowed_to_mutate: policy.allowed_to_mutate.unwrap_or(false), - settings: policy.settings_to_json()?.unwrap_or_default(), + settings: policy + .settings_to_json() + .map_err(|e| EvaluationError::InternalError(e.to_string()))? + .unwrap_or_default(), }; self.policy_id_to_settings .insert(policy_id.to_owned(), policy_eval_settings); @@ -179,7 +187,7 @@ impl EvaluationEnvironment { self.policy_id_to_settings .get(policy_id) .map(|settings| settings.policy_mode.clone()) - .ok_or(anyhow!("cannot find policy with ID {policy_id}")) + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string())) } /// Given a policy ID, returns true if the policy is allowed to mutate @@ -187,29 +195,36 @@ impl EvaluationEnvironment { self.policy_id_to_settings .get(policy_id) .map(|settings| settings.allowed_to_mutate) - .ok_or(anyhow!("cannot find policy with ID {policy_id}")) + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string())) } /// Perform a request validation pub fn validate(&self, policy_id: &str, req: &EvalRequest) -> Result { - let settings = self.policy_id_to_settings.get(policy_id).ok_or(anyhow!( - "cannot find settings for policy with ID {policy_id}" - ))?; + let settings = self + .policy_id_to_settings + .get(policy_id) + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string()))?; let mut evaluator = self.rehydrate(policy_id)?; - let eval_ctx = self.policy_id_to_eval_ctx.get(policy_id).ok_or(anyhow!( - "cannot find evaluation context for policy with ID {policy_id}" - ))?; + let eval_ctx = + self.policy_id_to_eval_ctx + .get(policy_id) + .ok_or(EvaluationError::InternalError(format!( + "cannot find evaluation context for policy with ID {policy_id}" + )))?; Ok(evaluator.validate(req.req.clone(), &settings.settings, eval_ctx)) } /// Validate the settings the user provided for the given policy pub fn validate_settings(&self, policy_id: &str) -> Result { - let settings = self.policy_id_to_settings.get(policy_id).ok_or(anyhow!( - "cannot find settings for policy with ID {policy_id}" - ))?; + let settings = + self.policy_id_to_settings + .get(policy_id) + .ok_or(EvaluationError::InternalError(format!( + "cannot find settings for policy with ID {policy_id}" + )))?; let mut evaluator = self.rehydrate(policy_id)?; @@ -221,21 +236,20 @@ impl EvaluationEnvironment { let module_digest = self .policy_id_to_module_digest .get(policy_id) - .ok_or(anyhow!( - "cannot find module_digest for policy with ID {policy_id}" - ))?; + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string()))?; let policy_evaluator_pre = self .module_digest_to_policy_evaluator_pre .get(module_digest) - .ok_or(anyhow!( - "cannot find PolicyEvaluatorPre for policy with ID {policy_id}" - ))?; + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string()))?; - let eval_ctx = self.policy_id_to_eval_ctx.get(policy_id).ok_or(anyhow!( - "cannot find evaluation context for policy with ID {policy_id}" - ))?; + let eval_ctx = self + .policy_id_to_eval_ctx + .get(policy_id) + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string()))?; - policy_evaluator_pre.rehydrate(eval_ctx) + policy_evaluator_pre.rehydrate(eval_ctx).map_err(|e| { + EvaluationError::WebAssemblyError(format!("cannot rehydrate PolicyEvaluatorPre: {e}")) + }) } } @@ -249,7 +263,11 @@ fn create_wasmtime_module( // full control of the precompiled module. This is generated by the // WorkerPool thread unsafe { wasmtime::Module::deserialize(engine, &precompiled_policy.precompiled_module) } - .map_err(|e| anyhow!("could not rehydrate wasmtime::Module {policy_url}: {e:?}")) + .map_err(|e| { + EvaluationError::WebAssemblyError(format!( + "could not rehydrate wasmtime::Module {policy_url}: {e:?}" + )) + }) } /// Internal function, takes care of creating the `PolicyEvaluator` instance for the given policy @@ -269,7 +287,9 @@ fn create_policy_evaluator_pre( policy_evaluator_builder.enable_epoch_interruptions(limit, limit); } - policy_evaluator_builder.build_pre() + policy_evaluator_builder.build_pre().map_err(|e| { + EvaluationError::WebAssemblyError(format!("cannot build PolicyEvaluatorPre {e}")) + }) } #[cfg(test)] diff --git a/src/workers/mod.rs b/src/workers/mod.rs index ab2bb702..72b831c5 100644 --- a/src/workers/mod.rs +++ b/src/workers/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod error; mod evaluation_environment; mod policy_evaluation_settings; pub(crate) mod pool; diff --git a/src/workers/worker.rs b/src/workers/worker.rs index a776fd60..255ec528 100644 --- a/src/workers/worker.rs +++ b/src/workers/worker.rs @@ -1,4 +1,3 @@ -use anyhow::Result; use policy_evaluator::{ admission_response::{AdmissionResponse, AdmissionResponseStatus}, policy_evaluator::ValidateRequest, @@ -10,7 +9,10 @@ use tracing::{error, info, info_span}; use crate::communication::{EvalRequest, RequestOrigin}; use crate::config::PolicyMode; use crate::metrics::{self}; -use crate::workers::EvaluationEnvironment; +use crate::workers::{ + error::{EvaluationError, Result}, + EvaluationEnvironment, +}; pub(crate) struct Worker { evaluation_environment: Arc, @@ -101,13 +103,16 @@ impl Worker { let span = info_span!(parent: &req.parent_span, "policy_eval"); let _enter = span.enter(); - let admission_response = self.evaluate(&req).unwrap_or_else(|e| { - AdmissionResponse::reject_internal_server_error( + let admission_response = match self.evaluate(&req) { + Ok(ar) => Some(ar), + Err(EvaluationError::PolicyNotFound(_)) => None, + Err(e) => Some(AdmissionResponse::reject_internal_server_error( req.req.uid().to_owned(), e.to_string(), - ) - }); - if let Err(e) = req.resp_chan.send(Some(admission_response)) { + )), + }; + + if let Err(e) = req.resp_chan.send(admission_response) { error!("cannot send response back: {e:?}"); } } From ad951ab3c1120fa7e90307dc7deceabae5600b76 Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Wed, 13 Dec 2023 15:09:07 +0100 Subject: [PATCH 7/7] refactor: adapt to new API of policy-evaluator Adapt the code to the new API exposed by latest version of policy-evaluator. On top of that, adding some extra unit tests. Signed-off-by: Flavio Castelli --- Cargo.lock | 6 +- Cargo.toml | 3 +- src/workers/evaluation_environment.rs | 95 +++++++++++++++++------ src/workers/policy_evaluation_settings.rs | 1 + src/workers/pool.rs | 3 +- 5 files changed, 77 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7f4452a..811fb832 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -502,7 +502,7 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "burrego" version = "0.3.4" -source = "git+https://github.com/flavio/policy-evaluator?branch=on-demand#8752bbda6a1599a7ed877735bed64fb50ff0c643" +source = "git+https://github.com/kubewarden/policy-evaluator?tag=v0.13.0#5df5c76c4c8e4295c0e1a1573b01e9c427ee033b" dependencies = [ "base64 0.21.5", "chrono", @@ -3364,8 +3364,8 @@ checksum = "14e6ab3f592e6fb464fc9712d8d6e6912de6473954635fd76a589d832cffcbb0" [[package]] name = "policy-evaluator" -version = "0.12.2" -source = "git+https://github.com/flavio/policy-evaluator?branch=on-demand#8752bbda6a1599a7ed877735bed64fb50ff0c643" +version = "0.13.0" +source = "git+https://github.com/kubewarden/policy-evaluator?tag=v0.13.0#5df5c76c4c8e4295c0e1a1573b01e9c427ee033b" dependencies = [ "anyhow", "base64 0.21.5", diff --git a/Cargo.toml b/Cargo.toml index 800fd35a..4c9b3026 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,8 +28,7 @@ opentelemetry = { version = "0.21", default-features = false, features = [ ] } opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] } procfs = "0.16" -#policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.12.2" } -policy-evaluator = { git = "https://github.com/flavio/policy-evaluator", branch = "on-demand" } +policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.13.0" } rayon = "1.8" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } diff --git a/src/workers/evaluation_environment.rs b/src/workers/evaluation_environment.rs index 1dad02d5..3dc78773 100644 --- a/src/workers/evaluation_environment.rs +++ b/src/workers/evaluation_environment.rs @@ -198,34 +198,28 @@ impl EvaluationEnvironment { .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string())) } - /// Perform a request validation - pub fn validate(&self, policy_id: &str, req: &EvalRequest) -> Result { + /// Given a policy ID, returns the settings provided by the user inside of `policies.yml` + fn get_policy_settings(&self, policy_id: &str) -> Result { let settings = self .policy_id_to_settings .get(policy_id) - .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string()))?; + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string()))? + .clone(); - let mut evaluator = self.rehydrate(policy_id)?; + Ok(settings) + } - let eval_ctx = - self.policy_id_to_eval_ctx - .get(policy_id) - .ok_or(EvaluationError::InternalError(format!( - "cannot find evaluation context for policy with ID {policy_id}" - )))?; + /// Perform a request validation + pub fn validate(&self, policy_id: &str, req: &EvalRequest) -> Result { + let settings = self.get_policy_settings(policy_id)?; + let mut evaluator = self.rehydrate(policy_id)?; - Ok(evaluator.validate(req.req.clone(), &settings.settings, eval_ctx)) + Ok(evaluator.validate(req.req.clone(), &settings.settings)) } /// Validate the settings the user provided for the given policy pub fn validate_settings(&self, policy_id: &str) -> Result { - let settings = - self.policy_id_to_settings - .get(policy_id) - .ok_or(EvaluationError::InternalError(format!( - "cannot find settings for policy with ID {policy_id}" - )))?; - + let settings = self.get_policy_settings(policy_id)?; let mut evaluator = self.rehydrate(policy_id)?; Ok(evaluator.validate_settings(&settings.settings)) @@ -294,15 +288,17 @@ fn create_policy_evaluator_pre( #[cfg(test)] mod tests { + use policy_evaluator::{ + admission_response::AdmissionResponse, policy_evaluator::ValidateRequest, + }; + use rstest::*; use std::collections::BTreeSet; use super::*; + use crate::admission_review::tests::build_admission_review; use crate::config::Policy; - /// Given to identical wasm modules, only one instance of PolicyEvaluator is going to be - /// created - #[test] - fn avoid_duplicated_instaces_of_policy_evaluator() { + fn build_evaluation_environment() -> Result { let engine = wasmtime::Engine::default(); let policy_ids = vec!["policy_1", "policy_2"]; let module = wasmtime::Module::new(&engine, "(module (func))") @@ -333,7 +329,7 @@ mod tests { precompiled_policies.insert(policy_url, precompiled_policy.clone()); } - let evaluation_environment = EvaluationEnvironment::new( + EvaluationEnvironment::new( &engine, &policies, &precompiled_policies, @@ -341,7 +337,58 @@ mod tests { None, callback_handler_tx, ) - .unwrap(); + } + + #[rstest] + #[case("policy_not_defined", true)] + #[case("policy_1", false)] + fn return_policy_not_found_error(#[case] policy_id: &str, #[case] expect_error: bool) { + let eval_env = build_evaluation_environment().unwrap(); + let req = ValidateRequest::AdmissionRequest( + build_admission_review().request.expect("no request"), + ); + + let (tx, _) = tokio::sync::oneshot::channel::>(); + let eval_req = EvalRequest { + policy_id: policy_id.to_string(), + req, + resp_chan: tx, + parent_span: tracing::Span::none(), + request_origin: crate::communication::RequestOrigin::Validate, + }; + + if expect_error { + assert!(matches!( + eval_env.get_policy_mode(policy_id), + Err(EvaluationError::PolicyNotFound(_)) + )); + assert!(matches!( + eval_env.get_policy_allowed_to_mutate(policy_id), + Err(EvaluationError::PolicyNotFound(_)) + )); + assert!(matches!( + eval_env.get_policy_settings(policy_id), + Err(EvaluationError::PolicyNotFound(_)) + )); + assert!(matches!( + eval_env.validate(policy_id, &eval_req), + Err(EvaluationError::PolicyNotFound(_)) + )); + } else { + assert!(eval_env.get_policy_mode(policy_id).is_ok()); + assert!(eval_env.get_policy_allowed_to_mutate(policy_id).is_ok()); + assert!(eval_env.get_policy_settings(policy_id).is_ok()); + // note: we do not test `validate` with a known policy because this would + // cause another error. The test policy we're using is just an empty Wasm + // module + } + } + + /// Given to identical wasm modules, only one instance of PolicyEvaluator is going to be + /// created + #[test] + fn avoid_duplicated_instaces_of_policy_evaluator() { + let evaluation_environment = build_evaluation_environment().unwrap(); assert_eq!( evaluation_environment diff --git a/src/workers/policy_evaluation_settings.rs b/src/workers/policy_evaluation_settings.rs index 47ae3a3c..aba66b2b 100644 --- a/src/workers/policy_evaluation_settings.rs +++ b/src/workers/policy_evaluation_settings.rs @@ -5,6 +5,7 @@ use crate::config::PolicyMode; /// Holds the evaluation settings of loaded Policy. These settings are taken straight from the /// `policies.yml` file provided by the user #[cfg_attr(test, allow(dead_code))] +#[derive(Clone)] pub(crate) struct PolicyEvaluationSettings { /// Whether the policy is operating in `protect` or `monitor` mode pub(crate) policy_mode: PolicyMode, diff --git a/src/workers/pool.rs b/src/workers/pool.rs index 5b9c07c7..f2154a4e 100644 --- a/src/workers/pool.rs +++ b/src/workers/pool.rs @@ -1,12 +1,11 @@ use anyhow::{anyhow, Result}; use core::time; -use lazy_static::lazy_static; use policy_evaluator::{callback_requests::CallbackRequest, wasmtime}; use rayon::prelude::*; use std::{ collections::HashMap, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, Barrier, RwLock, }, thread,