From b4da28cf19817a615175df9197d40d3bb60637c3 Mon Sep 17 00:00:00 2001 From: Flavio Castelli Date: Wed, 22 Nov 2023 22:44:43 +0100 Subject: [PATCH] fix: reduce memory consumption of Policy Server Reduce the memory consumption of Policy Server when multiple instances of the same Wasm module are loaded. Thanks to this change, a worker will have only once instance of `PolicyEvaluator` (hence of wasmtime stack), per unique type of module. Literally speaking, if a user has the `apparmor` policy deployed 5 times (different names, settings,...) only one instance of `PolicyEvaluator` will be allocated for it. Note: the optimization works at the worker level. Meaning that `PolicyEvaluator` are NOT sharing these instances between themselves. This commit helps to address issue https://github.com/kubewarden/kubewarden-controller/issues/528 Signed-off-by: Flavio Castelli --- Cargo.lock | 322 ++++++++++++------ Cargo.toml | 5 +- src/settings.rs | 3 +- src/workers/evaluation_environment.rs | 293 +++++++++++++++++ src/workers/mod.rs | 6 + src/workers/policy_evaluation_settings.rs | 15 + src/workers/pool.rs | 88 ++++- src/workers/worker.rs | 383 ++++++++++------------ 8 files changed, 784 insertions(+), 331 deletions(-) create mode 100644 src/workers/evaluation_environment.rs create mode 100644 src/workers/policy_evaluation_settings.rs diff --git a/Cargo.lock b/Cargo.lock index 8b957d6c..27dd82be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,15 +172,15 @@ dependencies = [ [[package]] name = "async-executor" -version = "1.7.1" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de517d5a758a65a16d18d8f605e7a6beed477444cca270116af40fd3cd59d27" +checksum = "4b0c4a4f319e45986f347ee47fef8bf5e81c9abc3f6f58dc2391439f30df65f0" dependencies = [ - "async-lock 3.1.0", + "async-lock 2.8.0", "async-task", "concurrent-queue", "fastrand 2.0.1", - "futures-lite 2.0.1", + "futures-lite 1.13.0", "slab", ] @@ -397,12 +397,6 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" -[[package]] -name = "base64" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5" - [[package]] name = "base64" version = "0.21.5" @@ -489,14 +483,14 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "burrego" version = "0.3.4" -source = "git+https://github.com/kubewarden/policy-evaluator?tag=v0.12.1#12f678594e30fc8f69747f272dd9130f9bfe127a" +source = "git+https://github.com/flavio/policy-evaluator?branch=reduce-memory-usage#49f35edcc1df1bcd4c841bc5956f6be5912be8d4" dependencies = [ "base64 0.21.5", "chrono", "chrono-tz", "gtmpl", "gtmpl_value", - "itertools 0.11.0", + "itertools 0.12.0", "json-patch", "lazy_static", "regex", @@ -609,7 +603,7 @@ checksum = "6ffc30dee200c20b4dcb80572226f42658e1d9c4b668656d7cc59c33d50e396e" dependencies = [ "cap-primitives", "cap-std", - "rustix 0.38.21", + "rustix 0.38.24", "smallvec", ] @@ -625,7 +619,7 @@ dependencies = [ "io-lifetimes 2.0.2", "ipnet", "maybe-owned", - "rustix 0.38.21", + "rustix 0.38.24", "windows-sys 0.48.0", "winx", ] @@ -649,7 +643,7 @@ dependencies = [ "cap-primitives", "io-extras", "io-lifetimes 2.0.2", - "rustix 0.38.21", + "rustix 0.38.24", ] [[package]] @@ -660,7 +654,7 @@ checksum = "f8f52b3c8f4abfe3252fd0a071f3004aaa3b18936ec97bdbd8763ce03aff6247" dependencies = [ "cap-primitives", "once_cell", - "rustix 0.38.21", + "rustix 0.38.24", "winx", ] @@ -675,10 +669,11 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.84" +version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f8e7c90afad890484a21653d08b6e209ae34770fb5ee298f9c699fcc1e5c856" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" dependencies = [ + "jobserver", "libc", ] @@ -1183,6 +1178,12 @@ dependencies = [ "serde", ] +[[package]] +name = "difflib" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" + [[package]] name = "digest" version = "0.10.7" @@ -1286,6 +1287,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dyn-clone" version = "1.0.16" @@ -1294,9 +1301,9 @@ checksum = "545b22097d44f8a9581187cdf93de7a71e4722bf51200cfaba810865b49a495d" [[package]] name = "ecdsa" -version = "0.16.8" +version = "0.16.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4b1e0c257a9e9f25f90ff76d7a68360ed497ee519c8e428d1825ef0000799d4" +checksum = "ee27f32b5c5292967d2d4a9d7f1e0b0aed2c15daded5a60300e4abb9d8020bca" dependencies = [ "der", "digest", @@ -1318,15 +1325,16 @@ dependencies = [ [[package]] name = "ed25519-dalek" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7277392b266383ef8396db7fdeb1e77b6c52fed775f5df15bb24f35b72156980" +checksum = "1f628eaec48bfd21b865dc2950cfa014450c01d2fa2b69a86c2fd5844ec523c0" dependencies = [ "curve25519-dalek", "ed25519", "rand_core", "serde", "sha2", + "subtle", "zeroize", ] @@ -1338,9 +1346,9 @@ checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" [[package]] name = "elliptic-curve" -version = "0.13.6" +version = "0.13.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d97ca172ae9dc9f9b779a6e3a65d308f2af74e5b8c921299075bdb4a0370e914" +checksum = "e9775b22bc152ad86a0cf23f0f348b884b26add12bf741e7ffc4d4ab2ab4d205" dependencies = [ "base16ct", "crypto-bigint", @@ -1383,9 +1391,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.6" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c18ee0ed65a5f1f81cac6b1d213b69c35fa47d4252ad41f1486dbd8226fe36e" +checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8" dependencies = [ "libc", "windows-sys 0.48.0", @@ -1446,7 +1454,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b0377f1edc77dbd1118507bc7a66e4ab64d2b90c66f90726dc801e73a8c68f9" dependencies = [ "cfg-if", - "rustix 0.38.21", + "rustix 0.38.24", "windows-sys 0.48.0", ] @@ -1462,9 +1470,9 @@ dependencies = [ [[package]] name = "fiat-crypto" -version = "0.2.3" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f69037fe1b785e84986b4f2cbcf647381876a00671d25ceef715d7812dd7e1dd" +checksum = "27573eac26f4dd11e2b1916c3fe1baa56407c83c71a773a8ba17ec0bca03b6b7" [[package]] name = "flagset" @@ -1482,6 +1490,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1497,6 +1514,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "fs-set-times" version = "0.20.0" @@ -1504,7 +1527,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd738b84894214045e8414eaded76359b4a5773f0a0a56b16575110739cdcf39" dependencies = [ "io-lifetimes 2.0.2", - "rustix 0.38.21", + "rustix 0.38.24", "windows-sys 0.48.0", ] @@ -1752,9 +1775,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.21" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" +checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" dependencies = [ "bytes", "fnv", @@ -1762,7 +1785,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 1.9.3", + "indexmap 2.1.0", "slab", "tokio", "tokio-util", @@ -1865,9 +1888,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f95b9abcae896730d42b78e09c155ed4ddf82c07b4de772c64aee5b2d8b7c150" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" dependencies = [ "bytes", "fnv", @@ -2107,15 +2130,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.12.0" @@ -2173,6 +2187,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" +[[package]] +name = "jobserver" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c37f63953c4c63420ed5fd3d6d398c719489b9f872b9fa683262f8edd363c7d" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.65" @@ -2245,9 +2268,9 @@ dependencies = [ [[package]] name = "kube" -version = "0.86.0" +version = "0.87.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8647c2211a9b480d910b155d573602c52cd5f646acecb06a03d594865dc4784" +checksum = "e34392aea935145070dcd5b39a6dea689ac6534d7d117461316c3d157b1d0fc3" dependencies = [ "k8s-openapi", "kube-client", @@ -2256,11 +2279,11 @@ dependencies = [ [[package]] name = "kube-client" -version = "0.86.0" +version = "0.87.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af8952521f3e8ce11920229e5f2965fef70525aecd9efc7b65e39bf9e2c6f66d" +checksum = "7266548b9269d9fa19022620d706697e64f312fb2ba31b93e6986453fcc82c92" dependencies = [ - "base64 0.20.0", + "base64 0.21.5", "bytes", "chrono", "either", @@ -2292,9 +2315,9 @@ dependencies = [ [[package]] name = "kube-core" -version = "0.86.0" +version = "0.87.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7608a0cd05dfa36167d2da982bb70f17feb5450f73ec601f6d428bbcf991c5b9" +checksum = "b8321c315b96b59f59ef6b33f604b84b905ab8f9ff114a4f909d934c520227b1" dependencies = [ "chrono", "form_urlencoded", @@ -2476,7 +2499,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b2cffa4ad52c6f791f4f8b15f0c05f9824b2ced1160e88cc393d64fff9a8ac64" dependencies = [ - "rustix 0.38.21", + "rustix 0.38.24", ] [[package]] @@ -2524,6 +2547,45 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mockall" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "mockall_double" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae71c7bb287375187c775cf82e2dcf1bef3388aaf58f0789a77f9c7ab28466f6" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "multer" version = "2.1.0" @@ -2548,6 +2610,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27b02d87554356db9e9a873add8782d4ea6e3e58ea071a9adb9a2e8ddb884a8b" +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2868,9 +2936,9 @@ dependencies = [ [[package]] name = "opentelemetry_sdk" -version = "0.21.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5b3ce3f5705e2ae493be467a0b23be4bc563c193cdb7713e55372c89a906b34" +checksum = "968ba3f2ca03e90e5187f5e4f46c791ef7f2c163ae87789c8ce5f5ca3b7b7de5" dependencies = [ "async-trait", "crossbeam-channel", @@ -3277,7 +3345,7 @@ checksum = "14e6ab3f592e6fb464fc9712d8d6e6912de6473954635fd76a589d832cffcbb0" [[package]] name = "policy-evaluator" version = "0.12.1" -source = "git+https://github.com/kubewarden/policy-evaluator?tag=v0.12.1#12f678594e30fc8f69747f272dd9130f9bfe127a" +source = "git+https://github.com/flavio/policy-evaluator?branch=reduce-memory-usage#49f35edcc1df1bcd4c841bc5956f6be5912be8d4" dependencies = [ "anyhow", "base64 0.21.5", @@ -3286,7 +3354,7 @@ dependencies = [ "chrono", "dns-lookup", "email_address", - "itertools 0.11.0", + "itertools 0.12.0", "json-patch", "k8s-openapi", "kube", @@ -3310,7 +3378,7 @@ dependencies = [ "wapc", "wasi-cap-std-sync", "wasi-common", - "wasmparser 0.115.0", + "wasmparser 0.116.1", "wasmtime", "wasmtime-provider", "wasmtime-wasi", @@ -3358,6 +3426,8 @@ dependencies = [ "itertools 0.12.0", "k8s-openapi", "lazy_static", + "mockall", + "mockall_double", "num_cpus", "opentelemetry", "opentelemetry-otlp", @@ -3419,11 +3489,41 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "predicates" +version = "2.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59230a63c37f3e18569bdb90e4a89cbf5bf8b06fea0b84e65ea10cc4df47addd" +dependencies = [ + "difflib", + "float-cmp", + "itertools 0.10.5", + "normalize-line-endings", + "predicates-core", + "regex", +] + +[[package]] +name = "predicates-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" + +[[package]] +name = "predicates-tree" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "primeorder" -version = "0.13.3" +version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7dbe9ed3b56368bd99483eb32fe9c17fdd3730aebadc906918ce78d54c7eeb4" +checksum = "353e1ca18966c16d9deb1c69278edbc5f194139612772bd9537af60ac231e1e6" dependencies = [ "elliptic-curve", ] @@ -3473,7 +3573,7 @@ dependencies = [ "hex", "lazy_static", "procfs-core", - "rustix 0.38.21", + "rustix 0.38.24", ] [[package]] @@ -3835,9 +3935,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.21" +version = "0.38.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b426b0506e5d50a7d8dafcf2e81471400deb602392c7dd110815afb4eaf02a3" +checksum = "9ad981d6c340a49cdc40a1028d9c6084ec7e9fa33fcb839cab656a267071e234" dependencies = [ "bitflags 2.4.1", "errno", @@ -3850,9 +3950,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.8" +version = "0.21.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "446e14c5cda4f3f30fe71863c34ec70f5ac79d6087097ad0bb433e1be5edf04c" +checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9" dependencies = [ "log", "ring 0.17.5", @@ -4205,9 +4305,9 @@ dependencies = [ [[package]] name = "signature" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e1788eed21689f9cf370582dfc467ef36ed9c707f073528ddafa8d83e3b8500" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ "digest", "rand_core", @@ -4442,7 +4542,7 @@ dependencies = [ "cap-std", "fd-lock", "io-lifetimes 2.0.2", - "rustix 0.38.21", + "rustix 0.38.24", "windows-sys 0.48.0", "winx", ] @@ -4462,10 +4562,16 @@ dependencies = [ "cfg-if", "fastrand 2.0.1", "redox_syscall", - "rustix 0.38.21", + "rustix 0.38.24", "windows-sys 0.48.0", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "thiserror" version = "1.0.50" @@ -4776,17 +4882,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tracing-log" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" -dependencies = [ - "log", - "once_cell", - "tracing-core", -] - [[package]] name = "tracing-log" version = "0.2.0" @@ -4811,7 +4906,7 @@ dependencies = [ "smallvec", "tracing", "tracing-core", - "tracing-log 0.2.0", + "tracing-log", "tracing-subscriber", "web-time", ] @@ -4828,9 +4923,9 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.17" +version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ "matchers", "nu-ansi-term", @@ -4843,7 +4938,7 @@ dependencies = [ "thread_local", "tracing", "tracing-core", - "tracing-log 0.1.4", + "tracing-log", "tracing-serde", ] @@ -5127,7 +5222,7 @@ dependencies = [ "io-extras", "io-lifetimes 2.0.2", "once_cell", - "rustix 0.38.21", + "rustix 0.38.24", "system-interface", "tracing", "wasi-common", @@ -5146,7 +5241,7 @@ dependencies = [ "cap-std", "io-extras", "log", - "rustix 0.38.21", + "rustix 0.38.24", "thiserror", "tracing", "wasmtime", @@ -5231,9 +5326,9 @@ dependencies = [ [[package]] name = "wasm-encoder" -version = "0.36.2" +version = "0.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "822b645bf4f2446b949776ffca47e2af60b167209ffb70814ef8779d299cd421" +checksum = "7d135e8940b69dbee0f5b0a0be9c1cd6fa8b71d774904c13a3fcfc5dc265e43d" dependencies = [ "leb128", ] @@ -5271,14 +5366,25 @@ dependencies = [ "semver", ] +[[package]] +name = "wasmparser" +version = "0.117.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b206de0c992af9f0b51ef2fb9455623e0a19eb68f172cd8ba9cd0e46637f5ab" +dependencies = [ + "hashbrown 0.14.2", + "indexmap 2.1.0", + "semver", +] + [[package]] name = "wasmprinter" -version = "0.2.72" +version = "0.2.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aff4df0cdf1906ec040e97d78c3fc8fd26d3f8d70adaac81f07f80957b63b54" +checksum = "7a4fdb34710b461c868c3f79a10a48b404f23b46fd471ab02bcaa60fd96c5c4b" dependencies = [ "anyhow", - "wasmparser 0.116.1", + "wasmparser 0.117.0", ] [[package]] @@ -5341,7 +5447,7 @@ dependencies = [ "bincode", "directories-next", "log", - "rustix 0.38.21", + "rustix 0.38.24", "serde", "serde_derive", "sha2", @@ -5443,7 +5549,7 @@ checksum = "345a8b061c9eab459e10b9112df9fc357d5a9e8b5b1004bc5fc674fba9be6d2a" dependencies = [ "cc", "cfg-if", - "rustix 0.38.21", + "rustix 0.38.24", "wasmtime-asm-macros", "wasmtime-versioned-export-macros", "windows-sys 0.48.0", @@ -5465,7 +5571,7 @@ dependencies = [ "log", "object", "rustc-demangle", - "rustix 0.38.21", + "rustix 0.38.24", "serde", "serde_derive", "target-lexicon", @@ -5484,7 +5590,7 @@ checksum = "109a9e46afe33580b952b14a4207354355f19bcdf0b47485b397b68409eaf553" dependencies = [ "object", "once_cell", - "rustix 0.38.21", + "rustix 0.38.24", "wasmtime-versioned-export-macros", ] @@ -5536,7 +5642,7 @@ dependencies = [ "memoffset", "paste", "rand", - "rustix 0.38.21", + "rustix 0.38.24", "sptr", "wasm-encoder 0.35.0", "wasmtime-asm-macros", @@ -5594,7 +5700,7 @@ dependencies = [ "libc", "log", "once_cell", - "rustix 0.38.21", + "rustix 0.38.24", "system-interface", "thiserror", "tokio", @@ -5653,23 +5759,23 @@ dependencies = [ [[package]] name = "wast" -version = "67.0.1" +version = "68.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a974d82fac092b5227c1663e16514e7a85f32014e22e6fdcb08b71aec9d3fb1e" +checksum = "7bf3081ac6bcb3a5b72a401693b3566feb529dc2b7e7b62ea544c8a30d0f4d05" dependencies = [ "leb128", "memchr", "unicode-width", - "wasm-encoder 0.36.2", + "wasm-encoder 0.37.0", ] [[package]] name = "wat" -version = "1.0.79" +version = "1.0.80" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adb220934f92f8551144c0003d1bc57a060674c99139f45ed623fbbf6d9262e7" +checksum = "6fabe07d22a837b3bd5662ba9e980d73de115c040923659a1801934c7ccebe49" dependencies = [ - "wast 67.0.1", + "wast 68.0.0", ] [[package]] @@ -6019,18 +6125,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.25" +version = "0.7.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cd369a67c0edfef15010f980c3cbe45d7f651deac2cd67ce097cd801de16557" +checksum = "e97e415490559a91254a2979b4829267a57d2fcd741a98eee8b722fb57289aa0" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.25" +version = "0.7.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f140bda219a26ccc0cdb03dba58af72590c53b22642577d88a927bc5c87d6b" +checksum = "dd7e48ccf166952882ca8bd778a43502c64f33bf94c12ebe2a7f08e5a0f6689f" dependencies = [ "proc-macro2", "quote", @@ -6039,9 +6145,9 @@ dependencies = [ [[package]] name = "zeroize" -version = "1.6.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" dependencies = [ "zeroize_derive", ] diff --git a/Cargo.toml b/Cargo.toml index effe2dd1..a2a9b973 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,8 @@ opentelemetry = { version = "0.21", default-features = false, features = [ ] } opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] } procfs = "0.16" -policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.12.1" } +#policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.12.1" } +policy-evaluator = { git = "https://github.com/flavio/policy-evaluator", branch = "reduce-memory-usage" } rayon = "1.8" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } @@ -44,7 +45,9 @@ warp = { version = "0.3.6", default_features = false, features = [ "tls", ] } semver = { version = "1.0.20", features = ["serde"] } +mockall_double = "0.3" [dev-dependencies] +mockall = "0.11" rstest = "0.18" tempfile = "3.8.1" diff --git a/src/settings.rs b/src/settings.rs index d52d86ef..9eff80d5 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -1,5 +1,6 @@ use anyhow::{anyhow, Result}; +use policy_evaluator::policy_evaluator::PolicySettings; use policy_evaluator::policy_metadata::ContextAwareResource; use serde::Deserialize; use serde_yaml::Value; @@ -39,7 +40,7 @@ pub struct Policy { } impl Policy { - pub fn settings_to_json(&self) -> Result>> { + pub fn settings_to_json(&self) -> Result> { match self.settings.as_ref() { None => Ok(None), Some(settings) => { diff --git a/src/workers/evaluation_environment.rs b/src/workers/evaluation_environment.rs new file mode 100644 index 00000000..82affb08 --- /dev/null +++ b/src/workers/evaluation_environment.rs @@ -0,0 +1,293 @@ +use anyhow::{anyhow, Result}; +use policy_evaluator::{ + admission_response::AdmissionResponse, callback_requests::CallbackRequest, + evaluation_context::EvaluationContext, policy_evaluator::PolicyEvaluator, + policy_evaluator_builder::PolicyEvaluatorBuilder, wasmtime, +}; +use std::collections::HashMap; +use tokio::sync::mpsc; + +use crate::communication::EvalRequest; +use crate::settings::PolicyMode; +use crate::workers::{ + policy_evaluation_settings::PolicyEvaluationSettings, + precompiled_policy::{PrecompiledPolicies, PrecompiledPolicy}, +}; + +#[cfg(test)] +use mockall::automock; + +/// This structure contains all the policies defined by the user inside of the `policies.yml`. +/// It also provides helper methods to perform the validation of a request and the validation +/// of the settings provided by the user. +/// +/// Each worker has its own dedicated instance of this structure. +/// At the worker level, the ultimate goal is to avoid duplicated instances of `PolicyEvaluator`. +/// That means that, given two or more identical Wasm modules, only one `PolicyEvaluator` +/// should be created. This is required to avoid a waste of memory by the Policy Server +/// process. +/// +/// Note: the `PolicyEvaluator` instances will still be duplicated across each worker. This is +/// something we have to deal with. +#[derive(Default)] +#[cfg_attr(test, allow(dead_code))] +pub(crate) struct EvaluationEnvironment { + /// Unique ID of the worker + worker_id: u64, + + /// The name of the Namespace where Policy Server doesn't operate. All the requests + /// involving this Namespace are going to be accepted. This is usually done to prevent user + /// policies from messing with the components of the Kubewarden stack (which are all + /// deployed inside of the same Namespace). + always_accept_admission_reviews_on_namespace: Option, + + /// A map with the unique ID of a Wasm module as key, and the associated `PolicyEvaluator` + /// instance as value. + /// Currently we the `module_id` is obtained by computing the sha255 digest of the + /// optimized Wasm module. + /// This dictionary allows us to reduce by amount of memory consumed by Policy Server. + module_id_to_evaluator: HashMap, + + /// A map with the ID of the policy as value, and the associated `EvaluationContext` as + /// value. + /// In this case, `policy_id` is the name of the policy as declared inside of the + /// `policies.yml` file. These names are guaranteed to be unique. + policy_id_to_eval_ctx: HashMap, + + /// Map a `policy_id` (the name given by the user inside of `policies.yml`) to the + /// `module_id`. This allows us to deduplicate the Wasm modules defined by the user. + policy_id_to_module_id: HashMap, + + /// Map a `policy_id` to the `PolicyEvaluationSettings` instance. This allows us to obtain + /// the list of settings to be used when evaluating a given policy. + policy_id_to_settings: HashMap, +} + +#[cfg_attr(test, automock)] +#[cfg_attr(test, allow(dead_code))] +impl EvaluationEnvironment { + /// Creates a new `EvaluationEnvironment` + pub(crate) fn new( + worker_id: u64, + always_accept_admission_reviews_on_namespace: Option, + ) -> Self { + Self { + worker_id, + always_accept_admission_reviews_on_namespace, + ..Default::default() + } + } + + /// Returns `true` if the given `namespace` is the special Namespace that is ignored by all + /// the policies + pub(crate) fn should_always_accept_requests_made_inside_of_namespace( + &self, + namespace: &str, + ) -> bool { + self.always_accept_admission_reviews_on_namespace.as_deref() == Some(namespace) + } + + /// Register a new policy. It takes care of creating a new `PolicyEvaluator` (when needed). + /// + /// Params: + /// - `policy_id`: the ID of the policy, as specified inside of the `policies.yml` by the + /// user + /// - `policy`: a data structure that maps all the information defined inside of + /// `policies.yml` for the given policy + /// - `engine`: the `wasmtime::Engine` to be used when creating the `PolicyEvaluator` + /// - `policy_modules`: all the `wasmtime::Module` precompiled for the current + /// OS/architecture + /// - `callback_handler_tx`: the transmission end of a channel that connects the worker + /// with the asynchronous world + /// - `policy_evaluation_limit_seconds`: when set, defines after how many seconds the + /// policy evaluation is interrupted + pub(crate) fn register( + &mut self, + policy_id: &str, + policy: &crate::settings::Policy, + engine: &wasmtime::Engine, + policy_modules: &PrecompiledPolicies, + callback_handler_tx: mpsc::Sender, + policy_evaluation_limit_seconds: Option, + ) -> Result<()> { + let precompiled_policy = policy_modules.get(policy.url.as_str()).ok_or_else(|| { + anyhow!( + "could not find preoptimized module for policy: {:?}", + policy.url + ) + })?; + let module_id = precompiled_policy.digest.clone(); + + if !self.module_id_to_evaluator.contains_key(&module_id) { + let evaluator = create_policy_evaluator( + policy_id, + self.worker_id, + policy, + engine, + precompiled_policy, + callback_handler_tx.clone(), + policy_evaluation_limit_seconds, + )?; + self.module_id_to_evaluator + .insert(module_id.clone(), evaluator); + } + self.policy_id_to_module_id + .insert(policy_id.to_owned(), module_id); + + let policy_eval_settings = PolicyEvaluationSettings { + policy_mode: policy.policy_mode.clone(), + allowed_to_mutate: policy.allowed_to_mutate.unwrap_or(false), + settings: policy.settings_to_json()?.unwrap_or_default(), + }; + self.policy_id_to_settings + .insert(policy_id.to_owned(), policy_eval_settings); + + let eval_ctx = EvaluationContext { + policy_id: policy_id.to_owned(), + callback_channel: Some(callback_handler_tx.clone()), + ctx_aware_resources_allow_list: policy.context_aware_resources.clone(), + }; + self.policy_id_to_eval_ctx + .insert(policy_id.to_owned(), eval_ctx); + + Ok(()) + } + + /// Given a policy ID, return how the policy operates + pub fn get_policy_mode(&self, policy_id: &str) -> Result { + self.policy_id_to_settings + .get(policy_id) + .map(|settings| settings.policy_mode.clone()) + .ok_or(anyhow!("cannot find policy with ID {policy_id}")) + } + + /// Given a policy ID, returns true if the policy is allowed to mutate + pub fn get_policy_allowed_to_mutate(&self, policy_id: &str) -> Result { + self.policy_id_to_settings + .get(policy_id) + .map(|settings| settings.allowed_to_mutate) + .ok_or(anyhow!("cannot find policy with ID {policy_id}")) + } + + /// Given a policy ID and a request to be processed, uses the `PolicyEvaluator` to perform + /// a validation operation. + pub fn validate(&mut self, policy_id: &str, req: &EvalRequest) -> Result { + let settings = self.policy_id_to_settings.get(policy_id).ok_or(anyhow!( + "cannot find settings for policy with ID {policy_id}" + ))?; + + let module_id = self.policy_id_to_module_id.get(policy_id).ok_or(anyhow!( + "cannot find module_id for policy with ID {policy_id}" + ))?; + let evaluator = self + .module_id_to_evaluator + .get_mut(module_id) + .ok_or(anyhow!( + "cannot find evaluator for policy with ID {policy_id}" + ))?; + + let eval_ctx = self.policy_id_to_eval_ctx.get(policy_id).ok_or(anyhow!( + "cannot find evaluation context for policy with ID {policy_id}" + ))?; + + Ok(evaluator.validate(req.req.clone(), &settings.settings, eval_ctx)) + } +} + +/// Internal function, takes care of creating the `PolicyEvaluator` instance for the given policy +fn create_policy_evaluator( + policy_id: &str, + worker_id: u64, + policy: &crate::settings::Policy, + engine: &wasmtime::Engine, + precompiled_policy: &PrecompiledPolicy, + callback_handler_tx: mpsc::Sender, + policy_evaluation_limit_seconds: Option, +) -> Result { + // See `wasmtime::Module::deserialize` to know why this method is `unsafe`. + // However, in our context, nothing bad will happen because we have + // full control of the precompiled module. This is generated by the + // WorkerPool thread + let module = + unsafe { wasmtime::Module::deserialize(engine, &precompiled_policy.precompiled_module) } + .map_err(|e| { + anyhow!( + "could not rehydrate wasmtime::Module {}: {:?}", + policy.url, + e + ) + })?; + + let mut policy_evaluator_builder = + PolicyEvaluatorBuilder::new(policy_id.to_string(), worker_id) + .engine(engine.clone()) + .policy_module(module) + .context_aware_resources_allowed(policy.context_aware_resources.clone()) + .callback_channel(callback_handler_tx) + .execution_mode(precompiled_policy.execution_mode); + + if let Some(limit) = policy_evaluation_limit_seconds { + policy_evaluator_builder = + policy_evaluator_builder.enable_epoch_interruptions(limit, limit); + } + + policy_evaluator_builder.build() +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use super::*; + use crate::settings::Policy; + + /// Given to identical wasm modules, only one instance of PolicyEvaluator is going to be + /// created + #[test] + fn avoid_duplicated_instaces_of_policy_evaluator() { + let engine = wasmtime::Engine::default(); + let policy_ids = vec!["policy_1", "policy_2"]; + let module = wasmtime::Module::new(&engine, "(module (func))") + .expect("should be able to build the smallest wasm module ever"); + let (callback_handler_tx, _) = mpsc::channel(10); + + let precompiled_policy = PrecompiledPolicy { + precompiled_module: module.serialize().unwrap(), + execution_mode: policy_evaluator::policy_evaluator::PolicyExecutionMode::Wasi, + digest: "unique-digest".to_string(), + }; + + let mut policies = HashMap::new(); + let mut policy_modules = HashMap::new(); + + for policy_id in &policy_ids { + policies.insert( + policy_id.to_owned(), + Policy { + url: policy_id.to_string(), + policy_mode: PolicyMode::Protect, + allowed_to_mutate: None, + settings: None, + context_aware_resources: HashSet::new(), + }, + ); + policy_modules.insert(policy_id.to_string(), precompiled_policy.clone()); + } + + let mut evaluation_environment = EvaluationEnvironment::new(0, None); + for policy_id in policy_ids { + evaluation_environment + .register( + policy_id, + &policies[policy_id], + &engine, + &policy_modules, + callback_handler_tx.clone(), + None, + ) + .unwrap(); + } + + assert_eq!(evaluation_environment.module_id_to_evaluator.len(), 1); + } +} diff --git a/src/workers/mod.rs b/src/workers/mod.rs index 312639cd..ab2bb702 100644 --- a/src/workers/mod.rs +++ b/src/workers/mod.rs @@ -1,3 +1,9 @@ +mod evaluation_environment; +mod policy_evaluation_settings; pub(crate) mod pool; pub(crate) mod precompiled_policy; pub(crate) mod worker; + +// This is required to mock the `EvaluationEnvironment` inside of our tests +#[mockall_double::double] +pub(crate) use evaluation_environment::EvaluationEnvironment; diff --git a/src/workers/policy_evaluation_settings.rs b/src/workers/policy_evaluation_settings.rs new file mode 100644 index 00000000..0607e33e --- /dev/null +++ b/src/workers/policy_evaluation_settings.rs @@ -0,0 +1,15 @@ +use policy_evaluator::policy_evaluator::PolicySettings; + +use crate::settings::PolicyMode; + +/// Holds the evaluation settings of loaded Policy. These settings are taken straight from the +/// `policies.yml` file provided by the user +#[cfg_attr(test, allow(dead_code))] +pub(crate) struct PolicyEvaluationSettings { + /// Whether the policy is operating in `protect` or `monitor` mode + pub(crate) policy_mode: PolicyMode, + /// Determines if a mutating policy is actually allowed to mutate + pub(crate) allowed_to_mutate: bool, + /// The policy-specific settings provided by the user + pub(crate) settings: PolicySettings, +} diff --git a/src/workers/pool.rs b/src/workers/pool.rs index 5547fbda..1eb43d87 100644 --- a/src/workers/pool.rs +++ b/src/workers/pool.rs @@ -1,16 +1,15 @@ use anyhow::{anyhow, Result}; use core::time; +use lazy_static::lazy_static; use policy_evaluator::{ - callback_requests::CallbackRequest, - policy_evaluator::{Evaluator, PolicyEvaluator}, - policy_evaluator_builder::PolicyEvaluatorBuilder, - wasmtime, + callback_requests::CallbackRequest, evaluation_context::EvaluationContext, + policy_evaluator::PolicyEvaluator, policy_evaluator_builder::PolicyEvaluatorBuilder, wasmtime, }; use rayon::prelude::*; use std::{ collections::HashMap, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, Barrier, RwLock, }, thread, @@ -27,15 +26,44 @@ use crate::workers::{ worker::Worker, }; +lazy_static! { + /// Used to create unique worker IDs + static ref WORKER_ID: Arc = Arc::new(AtomicU64::new(0)); +} + +/// Coordinates a set of workers. +/// Each worker takes care of performing the evaluation of the requests received by Policy Server +/// API endpoints. +/// +/// The HTTP API communicates with the worker pool via a dedicated chanel. The pool then assigns +/// the evaluation job to one of the workers. Currently this is done on a round-robin fashion. pub(crate) struct WorkerPool { + /// Channel used by the HTTP API to send to the pool the evaluation requests that have to be + /// processed api_rx: mpsc::Receiver, + + /// A oneshot channel used during the bootstrap phase. It's being used by the `main` to send + /// the data used to bootstrap of the workers. bootstrap_rx: oneshot::Receiver, + + /// The channel that connect the synchronous world of the workers with the tokio task that + /// handles all the async requests that might originate during the policy evaluation process. + /// For example, requesting Kubernetes resources, DNS resolution, Sigstore operations,... callback_handler_tx: mpsc::Sender, + + /// When set, this is the Namespace where all the policies do not apply. This is usually set + /// to be the Namespace where the Kubewarden is deployed; ensuring the user policies are not + /// going to interfere with the Kubewarden stack. always_accept_admission_reviews_on_namespace: Option, + + /// When set enables the policy timeout feature which prevents a rogue policy to enter an + /// endless loop/consume all the resources of a worker policy_evaluation_limit_seconds: Option, } impl WorkerPool { + /// Create a new `WorkerPool`, no bootstrap operation is done yet. This happens when invoking + /// the `run` method. pub(crate) fn new( bootstrap_rx: oneshot::Receiver, api_rx: mpsc::Receiver, @@ -52,9 +80,17 @@ impl WorkerPool { } } + /// Bootstrap the pool and then enter an endless loop that processes incoming requests. pub(crate) fn run(mut self) { + // The WorkerPool communicates with each worker over dedicated `mpsc::channel` (one per worker). + // This vector holds all the sender ends of these channels. let mut worker_tx_chans = Vec::>::new(); + + // Each worker has its own `wasmtime::Engine`, we have to keep track of them here because + // we have to increase their epoch tick to implement the policy timeout protection feature let mut worker_engines = Vec::::new(); + + // All the join handles of the spawned worker threads let mut join_handles = Vec::>>::new(); // Phase 1: wait for bootstrap data to be received by the main @@ -95,6 +131,9 @@ impl WorkerPool { } }; + // For each policy defined by the user, ensure the given settings are valid + // We exit with an error if one or more policies do not have valid + // settings. if let Err(error) = verify_policy_settings( &engine, &bootstrap_data.policies, @@ -133,8 +172,10 @@ impl WorkerPool { let (tx, rx) = mpsc::channel::(32); worker_tx_chans.push(tx); - // Each worker has its own wasmtime::Engine, sharing the + // Each worker has its own `wasmtime::Engine`, sharing the // same engine across all the workers leads to bad performance + // TODO: revisit this statement, it seems this issue has been solved by latest wasmtime + // releases let engine = match wasmtime::Engine::new(&wasmtime_config) { Ok(e) => e, Err(e) => { @@ -152,6 +193,8 @@ impl WorkerPool { return; } }; + // Note well: it's fast and cheap to clone a `wasmtime::Engine` as stated by the + // official docs. It's just a reference counter under the hood worker_engines.push(engine.clone()); let modules = precompiled_policies.clone(); @@ -165,7 +208,9 @@ impl WorkerPool { let join = thread::spawn(move || -> Result<()> { info!(spawned = n, total = pool_size, "spawning worker"); + let worker_id = WORKER_ID.fetch_add(1, Ordering::Relaxed); let mut worker = match Worker::new( + worker_id, rx, &policies, &modules, @@ -203,6 +248,7 @@ impl WorkerPool { // meaning a lot of memory would have been consumed without a valid reason // during the whole execution time drop(precompiled_policies); + drop(policies); barrier.wait(); if !boot_canary.load(Ordering::SeqCst) { @@ -259,6 +305,7 @@ impl WorkerPool { pub(crate) fn build_policy_evaluator( policy_id: &str, + worker_id: u64, policy: &crate::settings::Policy, engine: &wasmtime::Engine, policy_modules: &PrecompiledPolicies, @@ -286,13 +333,13 @@ pub(crate) fn build_policy_evaluator( ) })?; - let mut policy_evaluator_builder = PolicyEvaluatorBuilder::new(policy_id.to_string()) - .engine(engine.clone()) - .policy_module(module) - .settings(policy.settings_to_json()?) - .context_aware_resources_allowed(policy.context_aware_resources.clone()) - .callback_channel(callback_handler_tx) - .execution_mode(policy_module.execution_mode); + let mut policy_evaluator_builder = + PolicyEvaluatorBuilder::new(policy_id.to_string(), worker_id) + .engine(engine.clone()) + .policy_module(module) + .context_aware_resources_allowed(policy.context_aware_resources.clone()) + .callback_channel(callback_handler_tx) + .execution_mode(policy_module.execution_mode); if let Some(limit) = policy_evaluation_limit_seconds { policy_evaluator_builder = @@ -345,6 +392,7 @@ fn precompile_policies( .collect()) } +/// Ensure the user provided valid settings for all the policies fn verify_policy_settings( engine: &wasmtime::Engine, policies: &HashMap, @@ -375,10 +423,15 @@ fn verify_policy_settings( }); } + // We have to create a worker_id because this is what the `PolicyEvaluator` constructor needs. + // In this case there's no actual `Worker`, we're going to run all the setting validation Wasm + // invocations inside of the thread of `WorkerPool` + let worker_id = WORKER_ID.fetch_add(1, Ordering::Relaxed); let mut errors = vec![]; for (id, policy) in policies.iter() { let mut policy_evaluator = match build_policy_evaluator( id, + worker_id, policy, engine, policy_modules, @@ -391,7 +444,14 @@ fn verify_policy_settings( continue; } }; - let set_val_rep = policy_evaluator.validate_settings(); + let eval_ctx = EvaluationContext { + policy_id: id.to_owned(), + callback_channel: Some(callback_handler_tx.clone()), + ctx_aware_resources_allow_list: policy.context_aware_resources.clone(), + }; + + let set_val_rep = policy_evaluator + .validate_settings(&policy.settings_to_json()?.unwrap_or_default(), &eval_ctx); if !set_val_rep.valid { errors.push(format!( "[{}] settings are not valid: {:?}", diff --git a/src/workers/worker.rs b/src/workers/worker.rs index 730773fe..c528ce35 100644 --- a/src/workers/worker.rs +++ b/src/workers/worker.rs @@ -1,10 +1,10 @@ -use anyhow::{anyhow, Result}; +use anyhow::Result; use itertools::Itertools; use policy_evaluator::callback_requests::CallbackRequest; use policy_evaluator::wasmtime; use policy_evaluator::{ admission_response::{AdmissionResponse, AdmissionResponseStatus}, - policy_evaluator::{Evaluator, ValidateRequest}, + policy_evaluator::ValidateRequest, }; use std::{collections::HashMap, fmt, time::Instant}; use tokio::sync::mpsc::{Receiver, Sender}; @@ -13,17 +13,10 @@ use tracing::{error, info, info_span}; use crate::communication::{EvalRequest, RequestOrigin}; use crate::metrics::{self}; use crate::settings::{Policy, PolicyMode}; -use crate::workers::precompiled_policy::PrecompiledPolicies; - -struct PolicyEvaluatorWithSettings { - policy_evaluator: Box, - policy_mode: PolicyMode, - allowed_to_mutate: bool, - always_accept_admission_reviews_on_namespace: Option, -} +use crate::workers::{precompiled_policy::PrecompiledPolicies, EvaluationEnvironment}; pub(crate) struct Worker { - evaluators: HashMap, + evaluation_environment: EvaluationEnvironment, channel_rx: Receiver, } @@ -40,12 +33,19 @@ impl fmt::Display for PolicyErrors { } impl Worker { + /// Create a new Worker. Takes care of allocating the `PolicyEvaluator` environments + /// required to evaluate the policies. + /// + /// No check is done against the policy settings provided by the user. The `WorkerPool` + /// already verified that all the settings are valid. + #[allow(clippy::too_many_arguments)] #[tracing::instrument( name = "worker_new", fields(host=crate::cli::HOSTNAME.as_str()), skip_all, )] pub(crate) fn new( + worker_id: u64, rx: Receiver, policies: &HashMap, precompiled_policies: &PrecompiledPolicies, @@ -55,12 +55,13 @@ impl Worker { policy_evaluation_limit_seconds: Option, ) -> Result { let mut evs_errors = HashMap::new(); - let mut evs = HashMap::new(); + let mut evaluation_environment = + EvaluationEnvironment::new(worker_id, always_accept_admission_reviews_on_namespace); for (id, policy) in policies.iter() { // It's safe to clone the outer engine. This creates a shallow copy let inner_engine = engine.clone(); - let policy_evaluator = match crate::workers::pool::build_policy_evaluator( + if let Err(e) = evaluation_environment.register( id, policy, &inner_engine, @@ -68,25 +69,11 @@ impl Worker { callback_handler_tx.clone(), policy_evaluation_limit_seconds, ) { - Ok(pe) => Box::new(pe), - Err(e) => { - evs_errors.insert( - id.clone(), - format!("[{id}] could not create PolicyEvaluator: {e:?}"), - ); - continue; - } - }; - - let policy_evaluator_with_settings = PolicyEvaluatorWithSettings { - policy_evaluator, - policy_mode: policy.policy_mode.clone(), - allowed_to_mutate: policy.allowed_to_mutate.unwrap_or(false), - always_accept_admission_reviews_on_namespace: - always_accept_admission_reviews_on_namespace.clone(), - }; - - evs.insert(id.to_string(), policy_evaluator_with_settings); + evs_errors.insert( + id.clone(), + format!("[{id}] could not create PolicyEvaluator: {e:?}"), + ); + } } if !evs_errors.is_empty() { @@ -94,7 +81,7 @@ impl Worker { } Ok(Worker { - evaluators: evs, + evaluation_environment, channel_rx: rx, }) } @@ -154,31 +141,40 @@ impl Worker { } } + /// Endless loop that waits for the WorkerPool to give a new request to be processed. + /// The request is then evaluated and a response is returned back to the WorkerPool. pub(crate) fn run(&mut self) { while let Some(req) = self.channel_rx.blocking_recv() { let span = info_span!(parent: &req.parent_span, "policy_eval"); let _enter = span.enter(); - let res = match self.evaluators.get_mut(&req.policy_id) { - Some(pes) => Self::evaluate(req, pes), - None => req - .resp_chan - .send(None) - .map_err(|_| anyhow!("cannot send response back")), - }; - if res.is_err() { - error!("receiver dropped"); + let admission_response = self.evaluate(&req).unwrap_or_else(|e| { + AdmissionResponse::reject_internal_server_error( + req.req.uid().to_owned(), + e.to_string(), + ) + }); + if let Err(e) = req.resp_chan.send(Some(admission_response)) { + error!("cannot send response back: {e:?}"); } } } - fn evaluate(req: EvalRequest, pes: &mut PolicyEvaluatorWithSettings) -> anyhow::Result<()> { + /// Perform the actual evaluation + fn evaluate(&mut self, req: &EvalRequest) -> Result { let start_time = Instant::now(); - let policy_name = pes.policy_evaluator.policy_id(); - let policy_mode = pes.policy_mode.clone(); - let allowed_to_mutate = pes.allowed_to_mutate; - let vanilla_validation_response = pes.policy_evaluator.validate(req.req.clone()); + let policy_name = req.policy_id.clone(); + let policy_mode = self + .evaluation_environment + .get_policy_mode(&req.policy_id)?; + let allowed_to_mutate = self + .evaluation_environment + .get_policy_allowed_to_mutate(&req.policy_id)?; + + let vanilla_validation_response = + self.evaluation_environment.validate(&req.policy_id, req)?; + let policy_evaluation_duration = start_time.elapsed(); let accepted = vanilla_validation_response.allowed; let mutated = vanilla_validation_response.patch.is_some(); @@ -200,17 +196,23 @@ impl Worker { match req.req.clone() { ValidateRequest::AdmissionRequest(adm_req) => { - if let Some(namespace) = &pes.always_accept_admission_reviews_on_namespace { - // If the policy server is configured to - // always accept admission reviews on a - // given namespace, just set the `allowed` - // part of the response to `true` if the - // request matches this namespace. Keep - // the rest of the behaviors unchanged, - // such as checking if the policy is - // allowed to mutate. - - if adm_req.namespace == Some(namespace.to_string()) { + // TODO: we should check immediately if the request is coming from the "always + // accepted" namespace ASAP. Right now we do an evaluation and then we discard the + // result if the namespace is the special one. + // Moreover, I (flavio) don't like the fact we're using a mutable variable for + // `validation_response` + if let Some(ref req_namespace) = adm_req.namespace { + if self + .evaluation_environment + .should_always_accept_requests_made_inside_of_namespace(req_namespace) + { + // given namespace, just set the `allowed` + // part of the response to `true` if the + // request matches this namespace. Keep + // the rest of the behaviors unchanged, + // such as checking if the policy is + // allowed to mutate. + validation_response = AdmissionResponse { allowed: true, status: None, @@ -218,7 +220,6 @@ impl Worker { }; } } - let policy_evaluation = metrics::PolicyEvaluation { policy_name, policy_mode: policy_mode.into(), @@ -246,9 +247,7 @@ impl Worker { metrics::add_policy_evaluation(&policy_evaluation); } }; - - let res = req.resp_chan.send(Some(validation_response)); - res.map_err(|_| anyhow!("cannot send response back")) + Ok(validation_response) } } @@ -256,82 +255,70 @@ impl Worker { mod tests { use crate::admission_review::tests::build_admission_review; use crate::communication::RequestOrigin; - - use policy_evaluator::kubewarden_policy_sdk::settings::SettingsValidationResponse; - use policy_evaluator::ProtocolVersion; use rstest::*; - use tokio::sync::oneshot; + use tokio::sync::{mpsc, oneshot}; use super::*; const POLICY_ID: &str = "policy-id"; - struct MockPolicyEvaluator { - pub policy_id: String, - pub admission_response: AdmissionResponse, - pub settings_validation_response: SettingsValidationResponse, - pub protocol_version: Result, - } - - impl Default for MockPolicyEvaluator { - fn default() -> Self { - Self { - policy_id: "mock_policy".to_string(), - admission_response: AdmissionResponse { - allowed: false, - ..Default::default() - }, - settings_validation_response: SettingsValidationResponse { - valid: true, - message: None, - }, - protocol_version: Ok(ProtocolVersion::V1), - } - } - } - - impl MockPolicyEvaluator { - fn new_allowed() -> MockPolicyEvaluator { - MockPolicyEvaluator { - admission_response: AdmissionResponse { + fn create_evaluation_environment_that_accepts_request( + policy_mode: PolicyMode, + ) -> EvaluationEnvironment { + let mut mock_evaluation_environment = EvaluationEnvironment::default(); + mock_evaluation_environment + .expect_validate() + .returning(|_policy_id, request| { + Ok(AdmissionResponse { + uid: request.req.uid().to_owned(), allowed: true, ..Default::default() - }, - ..Default::default() - } - } - - fn new_rejected(message: Option, code: Option) -> MockPolicyEvaluator { - MockPolicyEvaluator { - admission_response: AdmissionResponse { - allowed: false, - status: Some(AdmissionResponseStatus { message, code }), - ..Default::default() - }, - ..Default::default() - } - } + }) + }); + mock_evaluation_environment + .expect_get_policy_mode() + .returning(move |_policy_id| Ok(policy_mode.clone())); + mock_evaluation_environment + .expect_get_policy_allowed_to_mutate() + .returning(|_policy_id| Ok(false)); + mock_evaluation_environment + .expect_should_always_accept_requests_made_inside_of_namespace() + .returning(|_namespace| false); + mock_evaluation_environment } - impl Evaluator for MockPolicyEvaluator { - fn validate(&mut self, _request: ValidateRequest) -> AdmissionResponse { - self.admission_response.clone() - } - - fn validate_settings(&mut self) -> SettingsValidationResponse { - self.settings_validation_response.clone() - } - - fn protocol_version(&mut self) -> Result { - match &self.protocol_version { - Ok(pv) => Ok(pv.clone()), - Err(e) => Err(anyhow::anyhow!("{}", e)), - } - } + #[derive(Clone)] + struct EvaluationEnvironmentRejectionDetails { + message: String, + code: u16, + } - fn policy_id(&self) -> String { - self.policy_id.clone() - } + fn create_evaluation_environment_that_reject_request( + policy_mode: PolicyMode, + rejection_details: EvaluationEnvironmentRejectionDetails, + allowed_namespace: String, + ) -> EvaluationEnvironment { + let mut mock_evaluation_environment = EvaluationEnvironment::default(); + mock_evaluation_environment + .expect_validate() + .returning(move |_policy_id, request| { + Ok(AdmissionResponse::reject( + request.req.uid().to_owned(), + rejection_details.message.clone(), + rejection_details.code, + )) + }); + mock_evaluation_environment + .expect_get_policy_mode() + .returning(move |_policy_id| Ok(policy_mode.clone())); + mock_evaluation_environment + .expect_get_policy_allowed_to_mutate() + .returning(|_policy_id| Ok(false)); + mock_evaluation_environment + .expect_should_always_accept_requests_made_inside_of_namespace() + .returning(move |namespace| namespace == allowed_namespace); + + mock_evaluation_environment } #[test] @@ -621,7 +608,7 @@ mod tests { #[case] policy_mode: PolicyMode, #[case] request_origin: RequestOrigin, ) { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let req = ValidateRequest::AdmissionRequest( build_admission_review().request.expect("no request"), ); @@ -634,21 +621,13 @@ mod tests { request_origin, }; - let mock_evaluator = MockPolicyEvaluator::new_allowed(); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + let (_, channel_rx) = mpsc::channel::(10); + let mut worker = Worker { + channel_rx, + evaluation_environment: create_evaluation_environment_that_accepts_request(policy_mode), }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); assert!(response.allowed); } @@ -663,7 +642,7 @@ mod tests { #[case] request_origin: RequestOrigin, #[case] accept: bool, ) { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let req = ValidateRequest::AdmissionRequest( build_admission_review().request.expect("no request"), ); @@ -676,23 +655,22 @@ mod tests { request_origin, }; - let message = Some("boom".to_string()); - let code = Some(500); - let mock_evaluator = MockPolicyEvaluator::new_rejected(message.clone(), code); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), + let (_, channel_rx) = mpsc::channel::(10); + let rejection_details = EvaluationEnvironmentRejectionDetails { + message: "boom".to_string(), + code: 500, + }; + let mock_evaluation_environment = create_evaluation_environment_that_reject_request( policy_mode, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + rejection_details.clone(), + "".to_string(), + ); + let mut worker = Worker { + channel_rx, + evaluation_environment: mock_evaluation_environment, }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); if accept { assert!(response.allowed); @@ -700,14 +678,14 @@ mod tests { } else { assert!(!response.allowed); let response_status = response.status.expect("should be set"); - assert_eq!(response_status.message, message); - assert_eq!(response_status.code, code); + assert_eq!(response_status.message, Some(rejection_details.message)); + assert_eq!(response_status.code, Some(rejection_details.code)); } } #[test] fn evaluate_policy_evaluator_accepts_request_raw() { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let request = serde_json::json!(r#"{"foo": "bar"}"#); let req = ValidateRequest::Raw(request.clone()); @@ -720,27 +698,21 @@ mod tests { request_origin: RequestOrigin::Validate, }; - let mock_evaluator = MockPolicyEvaluator::new_allowed(); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode: PolicyMode::Protect, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + let (_, channel_rx) = mpsc::channel::(10); + let mut worker = Worker { + channel_rx, + evaluation_environment: create_evaluation_environment_that_accepts_request( + PolicyMode::Protect, + ), }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); assert!(response.allowed); } #[test] fn evaluate_policy_evaluator_rejects_request_raw() { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let request = serde_json::json!(r#"{"foo": "bar"}"#); let req = ValidateRequest::Raw(request.clone()); @@ -753,28 +725,26 @@ mod tests { request_origin: RequestOrigin::Validate, }; - let message = Some("boom".to_string()); - let code = Some(500); - let mock_evaluator = MockPolicyEvaluator::new_rejected(message.clone(), code); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode: PolicyMode::Protect, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + let (_, channel_rx) = mpsc::channel::(10); + let rejection_details = EvaluationEnvironmentRejectionDetails { + message: "boom".to_string(), + code: 500, + }; + let mock_evaluation_environment = create_evaluation_environment_that_reject_request( + PolicyMode::Protect, + rejection_details.clone(), + "".to_string(), + ); + let mut worker = Worker { + channel_rx, + evaluation_environment: mock_evaluation_environment, }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); - + let response = worker.evaluate(&eval_req).unwrap(); assert!(!response.allowed); let response_status = response.status.expect("should be set"); - assert_eq!(response_status.message, message); - assert_eq!(response_status.code, code); + assert_eq!(response_status.message, Some(rejection_details.message)); + assert_eq!(response_status.code, Some(rejection_details.code)); } #[rstest] @@ -788,7 +758,7 @@ mod tests { // of a namespace that is ignored by kubewarden -> this leads the // request to still be accepted - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let allowed_namespace = "kubewarden_special".to_string(); @@ -804,23 +774,22 @@ mod tests { request_origin, }; - let message = Some("boom".to_string()); - let code = Some(500); - let mock_evaluator = MockPolicyEvaluator::new_rejected(message, code); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode: PolicyMode::Protect, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: Some(allowed_namespace), + let (_, channel_rx) = mpsc::channel::(10); + let rejection_details = EvaluationEnvironmentRejectionDetails { + message: "boom".to_string(), + code: 500, + }; + let mock_evaluation_environment = create_evaluation_environment_that_reject_request( + PolicyMode::Protect, + rejection_details.clone(), + allowed_namespace, + ); + let mut worker = Worker { + channel_rx, + evaluation_environment: mock_evaluation_environment, }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); assert!(response.allowed); assert!(response.status.is_none()); }