diff --git a/Cargo.lock b/Cargo.lock index d763f9e6..811fb832 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -186,13 +186,13 @@ dependencies = [ [[package]] name = "async-global-executor" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b4353121d5644cdf2beb5726ab752e79a8db1ebb52031770ec47db31d245526" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" dependencies = [ "async-channel 2.1.1", "async-executor", - "async-io 2.2.1", + "async-io 2.2.2", "async-lock 3.2.0", "blocking", "futures-lite 2.1.0", @@ -221,9 +221,9 @@ dependencies = [ [[package]] name = "async-io" -version = "2.2.1" +version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6d3b15875ba253d1110c740755e246537483f152fa334f91abd7fe84c88b3ff" +checksum = "6afaa937395a620e33dc6a742c593c01aced20aa376ffb0f628121198578ccc7" dependencies = [ "async-lock 3.2.0", "cfg-if", @@ -303,7 +303,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -320,7 +320,7 @@ checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -502,7 +502,7 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "burrego" version = "0.3.4" -source = "git+https://github.com/kubewarden/policy-evaluator?tag=v0.12.2#f43d8b2b9f21b4f9053058fc957649b8bbe7551e" +source = "git+https://github.com/kubewarden/policy-evaluator?tag=v0.13.0#5df5c76c4c8e4295c0e1a1573b01e9c427ee033b" dependencies = [ "base64 0.21.5", "chrono", @@ -850,18 +850,18 @@ dependencies = [ [[package]] name = "cranelift-bforest" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b5bb9245ec7dcc04d03110e538d31f0969d301c9d673145f4b4d5c3478539a3" +checksum = "8e7e56668d2263f92b691cb9e4a2fcb186ca0384941fe420484322fa559c3329" dependencies = [ "cranelift-entity", ] [[package]] name = "cranelift-codegen" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebb18d10e5ddac43ba4ca8fd4e310938569c3e484cc01b6372b27dc5bb4dfd28" +checksum = "2a9ff61938bf11615f55b80361288c68865318025632ea73c65c0b44fa16283c" dependencies = [ "bumpalo", "cranelift-bforest", @@ -880,33 +880,33 @@ dependencies = [ [[package]] name = "cranelift-codegen-meta" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a3ce6d22982c1b9b6b012654258bab1a13947bb12703518bef06b1a4867c3d6" +checksum = "50656bf19e3d4a153b404ff835b8b59e924cfa3682ebe0d3df408994f37983f6" dependencies = [ "cranelift-codegen-shared", ] [[package]] name = "cranelift-codegen-shared" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47220fd4f9a0ce23541652b6f16f83868d282602c600d14934b2a4c166b4bd80" +checksum = "388041deeb26109f1ea73c1812ea26bfd406c94cbce0bb5230aa44277e43b209" [[package]] name = "cranelift-control" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed5a4c42672aea9b6e820046b52e47a1c05d3394a6cdf4cb3c3c4b702f954bd2" +checksum = "b39b7c512ffac527e5b5df9beae3d67ab85d07dca6d88942c16195439fedd1d3" dependencies = [ "arbitrary", ] [[package]] name = "cranelift-entity" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b4e9a3296fc827f9d35135dc2c0c8dd8d8359eb1ef904bae2d55d5bcb0c9f94" +checksum = "fdb25f573701284fe2bcf88209d405342125df00764b396c923e11eafc94d892" dependencies = [ "serde", "serde_derive", @@ -914,9 +914,9 @@ dependencies = [ [[package]] name = "cranelift-frontend" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ec537d0f0b8e084517f3e7bfa1d89af343d7c7df455573fca9f272d4e01267" +checksum = "e57374fd11d72cf9ffb85ff64506ed831440818318f58d09f45b4185e5e9c376" dependencies = [ "cranelift-codegen", "log", @@ -926,15 +926,15 @@ dependencies = [ [[package]] name = "cranelift-isle" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45bab6d69919d210a50331d35cc6ce111567bc040aebac63a8ae130d0400a075" +checksum = "ae769b235f6ea2f86623a3ff157cc04a4ff131dc9fe782c2ebd35f272043581e" [[package]] name = "cranelift-native" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f32e81605f352cf37af5463f11cd7deec7b6572741931a8d372f7fdd4a744f5d" +checksum = "3dc7bfb8f13a0526fe20db338711d9354729b861c336978380bb10f7f17dd207" dependencies = [ "cranelift-codegen", "libc", @@ -943,9 +943,9 @@ dependencies = [ [[package]] name = "cranelift-wasm" -version = "0.101.4" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0edaa4cbec1bc787395c074233df2652dd62f3e29d3ee60329514a0a51e6b045" +checksum = "2c5f41a4af931b756be05af0dd374ce200aae2d52cea16b0beb07e8b52732c35" dependencies = [ "cranelift-codegen", "cranelift-entity", @@ -953,7 +953,7 @@ dependencies = [ "itertools 0.10.5", "log", "smallvec", - "wasmparser 0.115.0", + "wasmparser 0.116.1", "wasmtime-types", ] @@ -1072,7 +1072,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -1129,7 +1129,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -1151,7 +1151,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ "darling_core 0.20.3", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -1184,7 +1184,7 @@ checksum = "5fe87ce4529967e0ba1dcf8450bab64d97dfd5010a6256187ffe2e43e6f0e049" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -1300,6 +1300,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "dyn-clone" version = "1.0.16" @@ -1512,6 +1518,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa" + [[package]] name = "fs-set-times" version = "0.20.1" @@ -1607,7 +1619,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -2135,6 +2147,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.0" @@ -2152,9 +2173,9 @@ checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" [[package]] name = "ittapi" -version = "0.3.5" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25a5c0b993601cad796222ea076565c5d9f337d35592f8622c753724f06d7271" +checksum = "6b996fe614c41395cdaedf3cf408a9534851090959d90d54a535f675550b64b1" dependencies = [ "anyhow", "ittapi-sys", @@ -2163,9 +2184,9 @@ dependencies = [ [[package]] name = "ittapi-sys" -version = "0.3.5" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb7b5e473765060536a660eed127f758cf1a810c73e49063264959c60d1727d9" +checksum = "52f5385394064fa2c886205dba02598013ce83d3e92d33dbdc0c52fe0e7bf4fc" dependencies = [ "cc", ] @@ -2302,7 +2323,7 @@ dependencies = [ "jsonpath_lib", "k8s-openapi", "kube-core", - "pem 3.0.2", + "pem 3.0.3", "pin-project", "rustls", "rustls-pemfile", @@ -2552,6 +2573,45 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mockall" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a978c8292954bcb9347a4e28772c0a0621166a1598fc1be28ac0076a4bb810e" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad2765371d0978ba4ace4ebef047baa62fc068b431e468444b5610dd441c639b" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.40", +] + +[[package]] +name = "mockall_double" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1ca96e5ac35256ae3e13536edd39b172b88f41615e1d7b653c8ad24524113e8" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.40", +] + [[package]] name = "multer" version = "2.1.0" @@ -2646,7 +2706,7 @@ checksum = "cfb77679af88f8b125209d354a202862602672222e7f2313fdd6dc349bad4712" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -3080,9 +3140,9 @@ dependencies = [ [[package]] name = "pem" -version = "3.0.2" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3163d2912b7c3b52d651a055f2c7eec9ba5cd22d26ef75b8dd3a59980b185923" +checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310" dependencies = [ "base64 0.21.5", "serde", @@ -3226,7 +3286,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -3304,8 +3364,8 @@ checksum = "14e6ab3f592e6fb464fc9712d8d6e6912de6473954635fd76a589d832cffcbb0" [[package]] name = "policy-evaluator" -version = "0.12.2" -source = "git+https://github.com/kubewarden/policy-evaluator?tag=v0.12.2#f43d8b2b9f21b4f9053058fc957649b8bbe7551e" +version = "0.13.0" +source = "git+https://github.com/kubewarden/policy-evaluator?tag=v0.13.0#5df5c76c4c8e4295c0e1a1573b01e9c427ee033b" dependencies = [ "anyhow", "base64 0.21.5", @@ -3386,6 +3446,8 @@ dependencies = [ "itertools 0.12.0", "k8s-openapi", "lazy_static", + "mockall", + "mockall_double", "num_cpus", "opentelemetry", "opentelemetry-otlp", @@ -3399,7 +3461,9 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "sha2", "tempfile", + "thiserror", "tokio", "tracing", "tracing-futures", @@ -3461,6 +3525,33 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "predicates" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dfc28575c2e3f19cb3c73b93af36460ae898d426eba6fc15b9bd2a5220758a0" +dependencies = [ + "anstyle", + "itertools 0.11.0", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" + +[[package]] +name = "predicates-tree" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "primeorder" version = "0.13.6" @@ -3837,7 +3928,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.39", + "syn 2.0.40", "unicode-ident", ] @@ -4099,7 +4190,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -4171,7 +4262,7 @@ dependencies = [ "darling 0.20.3", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -4438,9 +4529,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.39" +version = "2.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" +checksum = "13fa70a4ee923979ffb522cacce59d34421ebdea5625e1073c4326ef9d2dd42e" dependencies = [ "proc-macro2", "quote", @@ -4509,6 +4600,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "thiserror" version = "1.0.50" @@ -4526,7 +4623,7 @@ checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -4620,7 +4717,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -4796,7 +4893,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -5088,8 +5185,7 @@ dependencies = [ [[package]] name = "wapc" version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b27ae434134e725e4d7d2d41e52e4c70f974d312dd96392c443875385d85a6c" +source = "git+https://github.com/flavio/wapc-rs/?branch=expose-wasmtime-provider-pre#af803159547d0d0d710c0ab7b1fd31363a54249e" dependencies = [ "log", "parking_lot", @@ -5145,9 +5241,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasi-cap-std-sync" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fd94e147b273348ec68ae412b8bc17a4d372b9e070535b98e3e2c5a3ffd8e83" +checksum = "a4328de5cf2a0debfc48216fe9c2747badc64957837641f5836cd8b3d48d73f0" dependencies = [ "anyhow", "async-trait", @@ -5168,9 +5264,9 @@ dependencies = [ [[package]] name = "wasi-common" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d5166f7432ee36d06aa9f9bd7990a00330401fdbc75be7887ea952a299b9a19" +checksum = "84f6774ec9e464b7373f683bc57ff87fcca5fd26a7d6bdb7438fb2f56a545aa6" dependencies = [ "anyhow", "bitflags 2.4.1", @@ -5207,7 +5303,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", "wasm-bindgen-shared", ] @@ -5241,7 +5337,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5254,9 +5350,9 @@ checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" [[package]] name = "wasm-encoder" -version = "0.35.0" +version = "0.36.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca90ba1b5b0a70d3d49473c5579951f3bddc78d47b59256d2f9d4922b150aca" +checksum = "822b645bf4f2446b949776ffca47e2af60b167209ffb70814ef8779d299cd421" dependencies = [ "leb128", ] @@ -5285,9 +5381,9 @@ dependencies = [ [[package]] name = "wasmparser" -version = "0.115.0" +version = "0.116.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e06c0641a4add879ba71ccb3a1e4278fd546f76f1eafb21d8f7b07733b547cd5" +checksum = "a58e28b80dd8340cb07b8242ae654756161f6fc8d0038123d679b7b99964fa50" dependencies = [ "indexmap 2.1.0", "semver", @@ -5315,9 +5411,9 @@ dependencies = [ [[package]] name = "wasmtime" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca54f6090ce46973f33a79f265924b204f248f91aec09229bce53d19d567c1a6" +checksum = "642e12d108e800215263e3b95972977f473957923103029d7d617db701d67ba4" dependencies = [ "anyhow", "async-trait", @@ -5338,8 +5434,8 @@ dependencies = [ "serde_derive", "serde_json", "target-lexicon", - "wasm-encoder 0.35.0", - "wasmparser 0.115.0", + "wasm-encoder 0.36.2", + "wasmparser 0.116.1", "wasmtime-cache", "wasmtime-component-macro", "wasmtime-component-util", @@ -5355,18 +5451,18 @@ dependencies = [ [[package]] name = "wasmtime-asm-macros" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54984bc0b5689da87a43d7c181d23092b4d5cfcbb7ae3eb6b917dd55865d95e6" +checksum = "beada8bb15df52503de0a4c58de4357bfd2f96d9a44a6e547bad11efdd988b47" dependencies = [ "cfg-if", ] [[package]] name = "wasmtime-cache" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a4df7655bb73b592189033ab046aa47c1da486d70bc9c1ebf45e55ac030bdf4" +checksum = "aba5bf44d044d25892c03fb3534373936ee204141ff92bac8297787ac7f22318" dependencies = [ "anyhow", "base64 0.21.5", @@ -5384,14 +5480,14 @@ dependencies = [ [[package]] name = "wasmtime-component-macro" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64de99fb7c4c383832b85efcaae95f7094a5c505d80146227ce97ab436cbac68" +checksum = "56ccba556991465cca68d5a54769684bcf489fb532059da55105f851642d52c1" dependencies = [ "anyhow", "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", "wasmtime-component-util", "wasmtime-wit-bindgen", "wit-parser", @@ -5399,15 +5495,15 @@ dependencies = [ [[package]] name = "wasmtime-component-util" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f9141a8df069e106eee0c3a8173c0809cf1a4b5630628cfb1f25ab114720093" +checksum = "05492a177a6006cb73f034d6e9a6fad6da55b23c4398835cb0012b5fa51ecf67" [[package]] name = "wasmtime-cranelift" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cf3cee8be02f5006d21b773ffd6802f96a0b7d661ff2ad8a01fb93df458b1aa" +checksum = "fe2e7532f1d6adbcc57e69bb6a7c503f0859076d07a9b4b6aabe8021ff8a05fd" dependencies = [ "anyhow", "cfg-if", @@ -5422,7 +5518,7 @@ dependencies = [ "object", "target-lexicon", "thiserror", - "wasmparser 0.115.0", + "wasmparser 0.116.1", "wasmtime-cranelift-shared", "wasmtime-environ", "wasmtime-versioned-export-macros", @@ -5430,9 +5526,9 @@ dependencies = [ [[package]] name = "wasmtime-cranelift-shared" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "420fd2a69bc162957f4c94f21c7fa08ecf60d916f4e87b56332507c555da381d" +checksum = "8c98d5378a856cbf058d36278627dfabf0ed68a888142958c7ae8e6af507dafa" dependencies = [ "anyhow", "cranelift-codegen", @@ -5446,9 +5542,9 @@ dependencies = [ [[package]] name = "wasmtime-environ" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb6a445ce2b2810127caee6c1b79b8da4ae57712b05556a674592c18b7500a14" +checksum = "a6d33a9f421da810a070cd56add9bc51f852bd66afbb8b920489d6242f15b70e" dependencies = [ "anyhow", "cranelift-entity", @@ -5460,8 +5556,8 @@ dependencies = [ "serde_derive", "target-lexicon", "thiserror", - "wasm-encoder 0.35.0", - "wasmparser 0.115.0", + "wasm-encoder 0.36.2", + "wasmparser 0.116.1", "wasmprinter", "wasmtime-component-util", "wasmtime-types", @@ -5469,10 +5565,11 @@ dependencies = [ [[package]] name = "wasmtime-fiber" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345a8b061c9eab459e10b9112df9fc357d5a9e8b5b1004bc5fc674fba9be6d2a" +checksum = "404741f4c6d7f4e043be2e8b466406a2aee289ccdba22bf9eba6399921121b97" dependencies = [ + "anyhow", "cc", "cfg-if", "rustix 0.38.28", @@ -5483,9 +5580,9 @@ dependencies = [ [[package]] name = "wasmtime-jit" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f0f6586c61125fbfc13c3108c3dd565d21f314dd5bac823b9a5b7ab576d21f1" +checksum = "8d0994a86d6dca5f7d9740d7f2bd0568be06d2014a550361dc1c397d289d81ef" dependencies = [ "addr2line", "anyhow", @@ -5510,9 +5607,9 @@ dependencies = [ [[package]] name = "wasmtime-jit-debug" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "109a9e46afe33580b952b14a4207354355f19bcdf0b47485b397b68409eaf553" +checksum = "4e0c4b74e606d1462d648631d5bc328e3d5b14e7f9d3ff93bc6db062fb8c5cd8" dependencies = [ "object", "once_cell", @@ -5522,9 +5619,9 @@ dependencies = [ [[package]] name = "wasmtime-jit-icache-coherence" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f67e6be36375c39cff57ed3b137ab691afbf2d9ba8ee1c01f77888413f218749" +checksum = "3090a69ba1476979e090aa7ed4bc759178bafdb65b22f98b9ba24fc6e7e578d5" dependencies = [ "cfg-if", "libc", @@ -5533,9 +5630,8 @@ dependencies = [ [[package]] name = "wasmtime-provider" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "559e7c5b79fbee0619789b0b51d8dae7a6efe46abfb2f3d90e1e2082ec49b6b0" +version = "1.12.0" +source = "git+https://github.com/flavio/wapc-rs/?branch=expose-wasmtime-provider-pre#af803159547d0d0d710c0ab7b1fd31363a54249e" dependencies = [ "anyhow", "cfg-if", @@ -5552,9 +5648,9 @@ dependencies = [ [[package]] name = "wasmtime-runtime" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d07986b2327b5e7f535ed638fbde25990fc8f85400194fda0d26db71c7b685e" +checksum = "b993ac8380385ed67bf71b51b9553edcf1ab0801b78a805a067de581b9a3e88a" dependencies = [ "anyhow", "cc", @@ -5570,7 +5666,7 @@ dependencies = [ "rand", "rustix 0.38.28", "sptr", - "wasm-encoder 0.35.0", + "wasm-encoder 0.36.2", "wasmtime-asm-macros", "wasmtime-environ", "wasmtime-fiber", @@ -5582,33 +5678,33 @@ dependencies = [ [[package]] name = "wasmtime-types" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e810a0d2e869abd1cb42bd232990f6bd211672b3d202d2ae7e70ffb97ed70ea3" +checksum = "8b5778112fcab2dc3d4371f4203ab8facf0c453dd94312b0a88dd662955e64e0" dependencies = [ "cranelift-entity", "serde", "serde_derive", "thiserror", - "wasmparser 0.115.0", + "wasmparser 0.116.1", ] [[package]] name = "wasmtime-versioned-export-macros" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09b5575a75e711ca6c36bb9ad647c93541cdc8e34218031acba5da3f35919dd3" +checksum = "f50f51f8d79bfd2aa8e9d9a0ae7c2d02b45fe412e62ff1b87c0c81b07c738231" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] name = "wasmtime-wasi" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e6730a2853226292cee755a36549dd1a443b324cf99319cb390af1afed6cb8a" +checksum = "eff3f4ad191a5e6d002bb5bffa3e2931a58984da9b30e57b48f353848748cf80" dependencies = [ "anyhow", "async-trait", @@ -5641,16 +5737,16 @@ dependencies = [ [[package]] name = "wasmtime-winch" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1c1b6abbba5a01739bef9f00a87b419414a7dd99b795823d93fb12fc2bf994a" +checksum = "d638e7c72447253485fe131523e7465ca318c0455c826eb4f5f612fb67b7de90" dependencies = [ "anyhow", "cranelift-codegen", "gimli", "object", "target-lexicon", - "wasmparser 0.115.0", + "wasmparser 0.116.1", "wasmtime-cranelift-shared", "wasmtime-environ", "winch-codegen", @@ -5658,9 +5754,9 @@ dependencies = [ [[package]] name = "wasmtime-wit-bindgen" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d214ca7513d76af2872ad5bba4b0dcc0225821931745fdcb4fc30dd34bc3bf7" +checksum = "4b804dfd3d0c0d6d37aa21026fe7772ba1a769c89ee4f5c4f13b82d91d75216f" dependencies = [ "anyhow", "heck", @@ -5670,9 +5766,9 @@ dependencies = [ [[package]] name = "wasmtime-wmemcheck" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9dafab2db172a53e23940e0fa3078c202f567ee5f13f4b42f66b694fab43c658" +checksum = "9b6060bc082cc32d9a45587c7640e29e3c7b89ada82677ac25d87850aaccb368" [[package]] name = "wast" @@ -5749,9 +5845,9 @@ checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" [[package]] name = "wiggle" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f6ce56a4019ce3d8592c298029a75abe6887d1c95a078a4c53ec77a0628262d" +checksum = "f91028b241e692fdf30627ac10ba9d5ac378353ea4119b4f904ac95177057a44" dependencies = [ "anyhow", "async-trait", @@ -5764,28 +5860,28 @@ dependencies = [ [[package]] name = "wiggle-generate" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e585a4b1e84195031c77d8484af99cd93f129f45d519e83cb8cc75e9a420cfd3" +checksum = "5e8b3d76531994513671b2ec3b29fd342bf041e2282945bb6c52eebe6aa9e7da" dependencies = [ "anyhow", "heck", "proc-macro2", "quote", "shellexpand", - "syn 2.0.39", + "syn 2.0.40", "witx", ] [[package]] name = "wiggle-macro" -version = "14.0.4" +version = "15.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6f321dbce722989d65c3082dba479fa392c7b7a1a4c3adc2a39545dd5aa452f" +checksum = "c189fe00c67f61bb330827f2abab1af9b5925c7929535cd13a68d265ec20b02d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", "wiggle-generate", ] @@ -5822,9 +5918,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "winch-codegen" -version = "0.12.4" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f112bebb367a544d20c254083798087f22ceeb426168a970b955e8436f749dca" +checksum = "0c792487f4dc42733d182a72e75d718b1a563cedcc1599ff0a9ed683c33e8bb7" dependencies = [ "anyhow", "cranelift-codegen", @@ -5832,7 +5928,7 @@ dependencies = [ "regalloc2", "smallvec", "target-lexicon", - "wasmparser 0.115.0", + "wasmparser 0.116.1", "wasmtime-environ", ] @@ -6065,9 +6161,9 @@ dependencies = [ [[package]] name = "wit-parser" -version = "0.12.2" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43771ee863a16ec4ecf9da0fc65c3bbd4a1235c8e3da5f094b562894843dfa76" +checksum = "15df6b7b28ce94b8be39d8df5cb21a08a4f3b9f33b631aedb4aa5776f785ead3" dependencies = [ "anyhow", "id-arena", @@ -6132,7 +6228,7 @@ checksum = "be912bf68235a88fbefd1b73415cb218405958d1655b2ece9035a19920bdf6ba" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] @@ -6152,7 +6248,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.39", + "syn 2.0.40", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c710f3d0..4c9b3026 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,11 +28,13 @@ opentelemetry = { version = "0.21", default-features = false, features = [ ] } opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] } procfs = "0.16" -policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.12.2" } +policy-evaluator = { git = "https://github.com/kubewarden/policy-evaluator", tag = "v0.13.0" } rayon = "1.8" serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.9.27" +sha2 = "0.10" +thiserror = "1.0" tokio = { version = "^1", features = ["full"] } tracing = "0.1" tracing-futures = "0.2" @@ -43,8 +45,10 @@ warp = { version = "0.3.6", default_features = false, features = [ "tls", ] } semver = { version = "1.0.20", features = ["serde"] } +mockall_double = "0.3" [dev-dependencies] +mockall = "0.12" rstest = "0.18" tempfile = "3.8.1" reqwest = { version = "0.11", default_features = false, features = [ diff --git a/e2e-tests/test_data/policies.yaml b/e2e-tests/test_data/policies.yaml index 2485ac32..bf16f2ee 100644 --- a/e2e-tests/test_data/policies.yaml +++ b/e2e-tests/test_data/policies.yaml @@ -77,3 +77,26 @@ raw-mutation-wasi: - "banana" - "carrot" defaultResource: "hay" + +apparmor: + url: ghcr.io/kubewarden/tests/apparmor-psp:v0.1.13 + allowedToMutate: false + settings: + allowed_profiles: + - runtime/default + +psp-user-group: + url: ghcr.io/kubewarden/tests/user-group-psp:v0.4.9 + allowedToMutate: true + settings: + run_as_user: + rule: MustRunAs + ranges: + - min: 1000 + max: 2000 + run_as_group: + rule: RunAsAny + overwrite: false + supplemental_groups: + rule: RunAsAny + overwrite: false diff --git a/src/config.rs b/src/config.rs index 62c98311..eb39b4b5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,6 +2,7 @@ use anyhow::{anyhow, Result}; use clap::ArgMatches; use lazy_static::lazy_static; +use policy_evaluator::policy_evaluator::PolicySettings; use policy_evaluator::policy_fetcher::sources::{read_sources_file, Sources}; use policy_evaluator::policy_fetcher::verify::config::{ read_verification_file, LatestVerificationConfig, VerificationConfigV1, @@ -231,7 +232,7 @@ pub struct Policy { } impl Policy { - pub fn settings_to_json(&self) -> Result>> { + pub fn settings_to_json(&self) -> Result> { match self.settings.as_ref() { None => Ok(None), Some(settings) => { diff --git a/src/lib.rs b/src/lib.rs index 0a0fcada..b3a505a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,8 +3,7 @@ mod communication; mod metrics; mod policy_downloader; mod server; -mod worker; -mod worker_pool; +mod workers; pub mod admission_review; pub mod config; @@ -26,7 +25,7 @@ use tracing_subscriber::{fmt, EnvFilter}; use communication::{EvalRequest, WorkerPoolBootRequest}; use config::Config; use policy_downloader::Downloader; -use worker_pool::WorkerPool; +use workers::pool::WorkerPool; lazy_static! { static ref TRACE_SYSTEM_INITIALIZED: RwLock = RwLock::new(false); diff --git a/src/worker_pool.rs b/src/worker_pool.rs deleted file mode 100644 index f29d3f9c..00000000 --- a/src/worker_pool.rs +++ /dev/null @@ -1,546 +0,0 @@ -use anyhow::{anyhow, Result}; -use core::time; -use lazy_static::lazy_static; -use policy_evaluator::{ - callback_requests::CallbackRequest, - policy_evaluator::{Evaluator, PolicyEvaluator, PolicyExecutionMode}, - policy_evaluator_builder::PolicyEvaluatorBuilder, - policy_metadata::Metadata, - wasmtime, -}; -use rayon::prelude::*; -use semver::{BuildMetadata, Prerelease, Version}; -use std::{ - collections::HashMap, - fs, - path::Path, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Barrier, RwLock, - }, - thread, - thread::JoinHandle, - vec::Vec, -}; -use tokio::sync::{mpsc, oneshot}; -use tracing::{debug, error, info, warn}; - -use crate::policy_downloader::FetchedPolicies; -use crate::worker::Worker; -use crate::{ - communication::{EvalRequest, WorkerPoolBootRequest}, - config, -}; - -lazy_static! { - static ref KUBEWARDEN_VERSION: Version = { - let mut version = Version::parse(env!("CARGO_PKG_VERSION")).expect("Cannot parse CARGO_PKG_VERSION version"); - // Remove the patch, prerelease and build information to avoid rejections - // like this: v1.6.0-rc4 < v1.6.0 - version.patch = 0; - version.pre = Prerelease::EMPTY; - version.build = BuildMetadata::EMPTY; - version - }; -} - -/// Check if policy server version is compatible with minimum kubewarden -/// version required by the policy -fn has_minimum_kubewarden_version(metadata: &Metadata) -> Result<()> { - if let Some(minimum_kubewarden_version) = &metadata.minimum_kubewarden_version { - let sanitized_minimum_kubewarden_version = Version { - major: minimum_kubewarden_version.major, - minor: minimum_kubewarden_version.minor, - // Kubewarden stack version ignore patch, prerelease and build version numbers - patch: 0, - pre: Prerelease::EMPTY, - build: BuildMetadata::EMPTY, - }; - if *KUBEWARDEN_VERSION < sanitized_minimum_kubewarden_version { - return Err(anyhow!( - "Policy required Kubewarden version {} but is running on {}", - sanitized_minimum_kubewarden_version, - KUBEWARDEN_VERSION.to_string(), - )); - } - } - Ok(()) -} - -/// This structure holds a precompiled WebAssembly module -/// representing a policy. -/// -/// Compiling a WebAssembly module is an expensive operation. Each -/// worker thread needs to do that, for each policy defined by the user. -/// -/// Precompiling the policies ahead of time reduces the bootstrap time by a lot. -/// -/// **Warning:** when "rehydrating" the module, you have to use a `wasmtime::Engine` -/// that has been created with the same `wasmtime::Config` used at compilation time. -#[derive(Clone)] -pub(crate) struct PrecompiledPolicy { - /// A precompiled [`wasmtime::Module`] - pub precompiled_module: Vec, - - /// The execution mode of the policy - pub execution_mode: PolicyExecutionMode, -} - -impl PrecompiledPolicy { - /// Load a WebAssembly module from the disk and compiles it - fn new(engine: &wasmtime::Engine, wasm_module_path: &Path) -> Result { - let policy_contents = fs::read(wasm_module_path)?; - let policy_metadata = Metadata::from_contents(&policy_contents)?; - let metadata = policy_metadata.unwrap_or_default(); - let execution_mode = metadata.execution_mode; - has_minimum_kubewarden_version(&metadata)?; - - let precompiled_module = engine.precompile_module(&policy_contents)?; - - Ok(Self { - precompiled_module, - execution_mode, - }) - } -} - -/// A dictionary with: -/// * Key: the URL of the WebAssembly module -/// * value: the PrecompiledPolicy -pub(crate) type PrecompiledPolicies = HashMap; - -pub(crate) struct WorkerPool { - api_rx: mpsc::Receiver, - bootstrap_rx: oneshot::Receiver, - callback_handler_tx: mpsc::Sender, - always_accept_admission_reviews_on_namespace: Option, - policy_evaluation_limit_seconds: Option, -} - -impl WorkerPool { - pub(crate) fn new( - bootstrap_rx: oneshot::Receiver, - api_rx: mpsc::Receiver, - callback_handler_tx: mpsc::Sender, - always_accept_admission_reviews_on_namespace: Option, - policy_evaluation_limit_seconds: Option, - ) -> WorkerPool { - WorkerPool { - api_rx, - bootstrap_rx, - callback_handler_tx, - always_accept_admission_reviews_on_namespace, - policy_evaluation_limit_seconds, - } - } - - pub(crate) fn run(mut self) { - let mut worker_tx_chans = Vec::>::new(); - let mut worker_engines = Vec::::new(); - let mut join_handles = Vec::>>::new(); - - // Phase 1: wait for bootstrap data to be received by the main - // code running in the async block. Once the data is received - // populate the worker pool - - let bootstrap_data = match self.bootstrap_rx.blocking_recv() { - Ok(data) => data, - Err(e) => { - eprintln!("workers pool bootstrap: error receiving bootstrap data: {e:?}"); - std::process::exit(1); - } - }; - - // To reduce bootstrap time, we will precompile all the WebAssembly - // modules we are going to use. - let mut wasmtime_config = wasmtime::Config::new(); - if self.policy_evaluation_limit_seconds.is_some() { - wasmtime_config.epoch_interruption(true); - } - - let engine = match wasmtime::Engine::new(&wasmtime_config) { - Ok(e) => e, - Err(e) => { - eprintln!("workers pool bootstrap: cannot instantiate `wasmtime::Engine`: {e:?}"); - std::process::exit(1); - } - }; - - // Use a reference counter to share access to precompiled policies - // between workers. This reduces memory usage - let precompiled_policies: Arc = - match precompile_policies(&engine, &bootstrap_data.fetched_policies) { - Ok(pp) => Arc::new(pp), - Err(e) => { - eprintln!("{e}"); - std::process::exit(1); - } - }; - - if let Err(error) = verify_policy_settings( - &engine, - &bootstrap_data.policies, - &precompiled_policies, - self.callback_handler_tx.clone(), - self.policy_evaluation_limit_seconds, - ) { - error!(?error, "cannot validate policy settings"); - match bootstrap_data.resp_chan.send(Err(error)) { - Ok(_) => return, - Err(_) => { - eprint!("worker bootstrap: cannot send back failure through channel"); - std::process::exit(1); - } - }; - } - - let pool_size: usize = bootstrap_data.pool_size; - let barrier = Arc::new(Barrier::new(pool_size + 1)); - let boot_canary = Arc::new(AtomicBool::new(true)); - - if let Some(limit) = self.policy_evaluation_limit_seconds { - info!( - execution_limit_seconds = limit, - "policy timeout protection is enabled" - ); - } else { - warn!("policy timeout protection is disabled"); - } - - // Use a reference counter to share access to policies - // between workers. This reduces memory usage - let policies = Arc::new(bootstrap_data.policies); - - for n in 1..=pool_size { - let (tx, rx) = mpsc::channel::(32); - worker_tx_chans.push(tx); - - // Each worker has its own wasmtime::Engine, sharing the - // same engine across all the workers leads to bad performance - let engine = match wasmtime::Engine::new(&wasmtime_config) { - Ok(e) => e, - Err(e) => { - if bootstrap_data - .resp_chan - .send(Err(anyhow!( - "cannot create wasmtime engine for one of the workers: {}", - e - ))) - .is_err() - { - eprint!("cannot create wasmtime engine for one of the workers: {e}"); - std::process::exit(1); - }; - return; - } - }; - worker_engines.push(engine.clone()); - - let modules = precompiled_policies.clone(); - let b = barrier.clone(); - let canary = boot_canary.clone(); - let callback_handler_tx = self.callback_handler_tx.clone(); - let always_accept_admission_reviews_on_namespace = - self.always_accept_admission_reviews_on_namespace.clone(); - let policies = policies.clone(); - - let join = thread::spawn(move || -> Result<()> { - info!(spawned = n, total = pool_size, "spawning worker"); - - let mut worker = match Worker::new( - rx, - &policies, - &modules, - engine, - callback_handler_tx, - always_accept_admission_reviews_on_namespace, - self.policy_evaluation_limit_seconds, - ) { - Ok(w) => w, - Err(e) => { - error!(error = e.to_string().as_str(), "cannot spawn worker"); - canary.store(false, Ordering::SeqCst); - b.wait(); - return Err(anyhow!("Worker {} couldn't start: {}", n, e)); - } - }; - // Drop the Arc references ASAP, they are no longer needed - // at this point - drop(policies); - drop(modules); - b.wait(); - - debug!(id = n, "worker loop start"); - worker.run(); - debug!(id = n, "worker loop exit"); - - Ok(()) - }); - join_handles.push(join); - } - - // Deallocate all the memory used by the precompiled policies since - // they are no longer needed. Without this explicit cleanup - // the reference would be dropped right before Policy Server exits, - // meaning a lot of memory would have been consumed without a valid reason - // during the whole execution time - drop(precompiled_policies); - barrier.wait(); - - if !boot_canary.load(Ordering::SeqCst) { - match bootstrap_data - .resp_chan - .send(Err(anyhow!("could not init one of the workers"))) - { - Ok(_) => return, - Err(_) => { - eprint!("worker bootstrap: cannot send back failure through channel"); - std::process::exit(1); - } - }; - } - - // bootstrap went smoothly - if bootstrap_data.resp_chan.send(Ok(())).is_err() { - eprint!("worker bootstrap: cannot send back success message through channel"); - std::process::exit(1); - } - - // Phase 2: the worker pool has been successfully bootstraped. - // We can start waiting for admission review requests to be evaluated - let mut next_worker_id = 0; - - if self.policy_evaluation_limit_seconds.is_some() { - // start a dedicated thread that send tick events to all - // the workers. This is used by the wasmtime's epoch_interruption - // to keep track of the execution time of each wasm module - thread::spawn(move || { - let one_second = time::Duration::from_secs(1); - loop { - thread::sleep(one_second); - for engine in &worker_engines { - engine.increment_epoch(); - } - } - }); - } - - while let Some(req) = self.api_rx.blocking_recv() { - let _ = worker_tx_chans[next_worker_id].blocking_send(req); - next_worker_id += 1; - if next_worker_id >= pool_size { - next_worker_id = 0; - } - } - - for handle in join_handles { - handle.join().unwrap().unwrap(); - } - } -} - -pub(crate) fn build_policy_evaluator( - policy_id: &str, - policy: &config::Policy, - engine: &wasmtime::Engine, - policy_modules: &PrecompiledPolicies, - callback_handler_tx: mpsc::Sender, - policy_evaluation_limit_seconds: Option, -) -> Result { - let policy_module = policy_modules.get(policy.url.as_str()).ok_or_else(|| { - anyhow!( - "could not find preoptimized module for policy: {:?}", - policy.url - ) - })?; - - // See `wasmtime::Module::deserialize` to know why this method is `unsafe`. - // However, in our context, nothing bad will happen because we have - // full control of the precompiled module. This is generated by the - // WorkerPool thread - let module = - unsafe { wasmtime::Module::deserialize(engine, &policy_module.precompiled_module) } - .map_err(|e| { - anyhow!( - "could not rehydrate wasmtime::Module {}: {:?}", - policy.url, - e - ) - })?; - - let mut policy_evaluator_builder = PolicyEvaluatorBuilder::new(policy_id.to_string()) - .engine(engine.clone()) - .policy_module(module) - .settings(policy.settings_to_json()?) - .context_aware_resources_allowed(policy.context_aware_resources.clone()) - .callback_channel(callback_handler_tx) - .execution_mode(policy_module.execution_mode); - - if let Some(limit) = policy_evaluation_limit_seconds { - policy_evaluator_builder = - policy_evaluator_builder.enable_epoch_interruptions(limit, limit); - } - - policy_evaluator_builder.build() -} - -fn precompile_policies( - engine: &wasmtime::Engine, - fetched_policies: &FetchedPolicies, -) -> Result { - debug!( - wasm_modules_count = fetched_policies.len(), - "instantiating wasmtime::Module objects" - ); - - let precompiled_policies: HashMap> = fetched_policies - .par_iter() - .map(|(policy_url, wasm_module_path)| { - let precompiled_policy = PrecompiledPolicy::new(engine, wasm_module_path); - debug!(?policy_url, "module compiled"); - (policy_url.clone(), precompiled_policy) - }) - .collect(); - - let errors: Vec = precompiled_policies - .iter() - .filter_map(|(url, result)| match result { - Ok(_) => None, - Err(e) => Some(format!( - "[{url}] policy cannot be compiled to WebAssembly module: {e:?}" - )), - }) - .collect(); - if !errors.is_empty() { - return Err(anyhow!( - "workers pool bootstrap: cannot instantiate `wasmtime::Module` objects: {:?}", - errors.join(", ") - )); - } - - Ok(precompiled_policies - .iter() - .filter_map(|(url, result)| match result { - Ok(p) => Some((url.clone(), p.clone())), - Err(_) => None, - }) - .collect()) -} - -fn verify_policy_settings( - engine: &wasmtime::Engine, - policies: &HashMap, - policy_modules: &HashMap, - callback_handler_tx: mpsc::Sender, - policy_evaluation_limit_seconds: Option, -) -> Result<()> { - let tick_thread_lock = Arc::new(RwLock::new(true)); - - if policy_evaluation_limit_seconds.is_some() { - // start a dedicated thread that send tick events to the - // wasmtime engine. - // This is used by the wasmtime's epoch_interruption - // to keep track of the execution time of each wasm module - - let loop_engine = engine.clone(); - let keep_going_lock = tick_thread_lock.clone(); - - thread::spawn(move || { - let one_second = time::Duration::from_secs(1); - loop { - thread::sleep(one_second); - loop_engine.increment_epoch(); - if !(*keep_going_lock.read().unwrap()) { - break; - } - } - }); - } - - let mut errors = vec![]; - for (id, policy) in policies.iter() { - let mut policy_evaluator = match build_policy_evaluator( - id, - policy, - engine, - policy_modules, - callback_handler_tx.clone(), - policy_evaluation_limit_seconds, - ) { - Ok(pe) => pe, - Err(e) => { - errors.push(format!("[{id}] cannot create PolicyEvaluator: {e:?}")); - continue; - } - }; - let set_val_rep = policy_evaluator.validate_settings(); - if !set_val_rep.valid { - errors.push(format!( - "[{}] settings are not valid: {:?}", - id, set_val_rep.message - )); - continue; - } - } - - if policy_evaluation_limit_seconds.is_some() { - // Tell the ticker thread loop to stop - let mut w = tick_thread_lock.write().unwrap(); - *w = false; - } - - if errors.is_empty() { - Ok(()) - } else { - Err(anyhow!("{}", errors.join(", "))) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use rstest::rstest; - - fn generate_metadata(major: u64, minor: u64, patch: u64) -> Metadata { - let minimum_kubewarden_version = Version { - major, - minor, - patch, - pre: Prerelease::EMPTY, - build: BuildMetadata::EMPTY, - }; - Metadata { - minimum_kubewarden_version: Some(minimum_kubewarden_version), - ..Default::default() - } - } - - #[rstest] - #[case(generate_metadata(KUBEWARDEN_VERSION.major -1, KUBEWARDEN_VERSION.minor, KUBEWARDEN_VERSION.patch))] - #[case(generate_metadata(KUBEWARDEN_VERSION.major, KUBEWARDEN_VERSION.minor - 1, KUBEWARDEN_VERSION.patch))] - fn recent_kubewarden_versions_test(#[case] metadata: Metadata) { - assert!(has_minimum_kubewarden_version(&metadata).is_ok()) - } - - #[rstest] - #[case(generate_metadata(KUBEWARDEN_VERSION.major +1, KUBEWARDEN_VERSION.minor, KUBEWARDEN_VERSION.patch))] - #[case(generate_metadata(KUBEWARDEN_VERSION.major, KUBEWARDEN_VERSION.minor + 1, KUBEWARDEN_VERSION.patch))] - fn old_kubewarden_versions_test(#[case] metadata: Metadata) { - assert!(has_minimum_kubewarden_version(&metadata).is_err()) - } - - #[test] - fn no_mininum_kubewarden_version_is_valid_test() { - let metadata = Metadata { - minimum_kubewarden_version: None, - ..Default::default() - }; - assert!(has_minimum_kubewarden_version(&metadata).is_ok()) - } - - #[rstest] - #[case(generate_metadata(KUBEWARDEN_VERSION.major, KUBEWARDEN_VERSION.minor, KUBEWARDEN_VERSION.patch + 1))] - fn ignore_patch_version_test(#[case] metadata: Metadata) { - assert!(has_minimum_kubewarden_version(&metadata).is_ok()) - } -} diff --git a/src/workers/error.rs b/src/workers/error.rs new file mode 100644 index 00000000..a96b481b --- /dev/null +++ b/src/workers/error.rs @@ -0,0 +1,18 @@ +use thiserror::Error; + +pub type Result = std::result::Result; + +#[derive(Debug, Error)] +pub enum EvaluationError { + #[error("unknown policy: {0}")] + PolicyNotFound(String), + + #[error("bootstrap failure: {0}")] + BootstrapFailure(String), + + #[error("WebAssembly failure: {0}")] + WebAssemblyError(String), + + #[error("{0}")] + InternalError(String), +} diff --git a/src/workers/evaluation_environment.rs b/src/workers/evaluation_environment.rs new file mode 100644 index 00000000..3dc78773 --- /dev/null +++ b/src/workers/evaluation_environment.rs @@ -0,0 +1,400 @@ +use policy_evaluator::{ + admission_response::AdmissionResponse, + callback_requests::CallbackRequest, + evaluation_context::EvaluationContext, + kubewarden_policy_sdk::settings::SettingsValidationResponse, + policy_evaluator::{PolicyEvaluator, PolicyEvaluatorPre, PolicyExecutionMode}, + policy_evaluator_builder::PolicyEvaluatorBuilder, + wasmtime, +}; +use std::collections::HashMap; +use tokio::sync::mpsc; +use tracing::debug; + +use crate::communication::EvalRequest; +use crate::config::PolicyMode; +use crate::workers::error::{EvaluationError, Result}; +use crate::workers::{ + policy_evaluation_settings::PolicyEvaluationSettings, + precompiled_policy::{PrecompiledPolicies, PrecompiledPolicy}, +}; + +#[cfg(test)] +use mockall::automock; + +/// This structure contains all the policies defined by the user inside of the `policies.yml`. +/// It also provides helper methods to perform the validation of a request and the validation +/// of the settings provided by the user. +/// +/// This is an immutable structure that can be safely shared across different threads once wrapped +/// inside of a `Arc`. +/// +/// When performing a `validate` or `validate_settings` operation, a new WebAssembly environment is +/// created and used to perform the operation. The environment is then discarded once the +/// evaluation is over. +/// This ensures: +/// - no memory leaks caused by bogus policies affect the Policy Server long running process +/// - no data is shared between evaluations of the same module +/// +/// To reduce the creation time, this code makes use of `PolicyEvaluatorPre` which are created +/// only once, during the bootstrap phase. +#[derive(Default)] +#[cfg_attr(test, allow(dead_code))] +pub(crate) struct EvaluationEnvironment { + /// The name of the Namespace where Policy Server doesn't operate. All the requests + /// involving this Namespace are going to be accepted. This is usually done to prevent user + /// policies from messing with the components of the Kubewarden stack (which are all + /// deployed inside of the same Namespace). + always_accept_admission_reviews_on_namespace: Option, + + /// A map with the module digest as key, and the associated `PolicyEvaluatorPre` + /// as value + module_digest_to_policy_evaluator_pre: HashMap, + + /// A map with the ID of the policy as value, and the associated `EvaluationContext` as + /// value. + /// In this case, `policy_id` is the name of the policy as declared inside of the + /// `policies.yml` file. These names are guaranteed to be unique. + policy_id_to_eval_ctx: HashMap, + + /// Map a `policy_id` (the name given by the user inside of `policies.yml`) to the + /// module's digest. This allows us to deduplicate the Wasm modules defined by the user. + policy_id_to_module_digest: HashMap, + + /// Map a `policy_id` to the `PolicyEvaluationSettings` instance. This allows us to obtain + /// the list of settings to be used when evaluating a given policy. + policy_id_to_settings: HashMap, +} + +#[cfg_attr(test, automock)] +#[cfg_attr(test, allow(dead_code))] +impl EvaluationEnvironment { + /// Creates a new `EvaluationEnvironment` + pub(crate) fn new( + engine: &wasmtime::Engine, + policies: &HashMap, + precompiled_policies: &PrecompiledPolicies, + always_accept_admission_reviews_on_namespace: Option, + policy_evaluation_limit_seconds: Option, + callback_handler_tx: mpsc::Sender, + ) -> Result { + let mut eval_env = Self { + always_accept_admission_reviews_on_namespace, + ..Default::default() + }; + + for (policy_id, policy) in policies { + let precompiled_policy = precompiled_policies.get(&policy.url).ok_or_else(|| { + EvaluationError::BootstrapFailure(format!( + "cannot find policy settings of {}", + policy_id + )) + })?; + + eval_env + .register( + engine, + policy_id, + precompiled_policy, + policy, + callback_handler_tx.clone(), + policy_evaluation_limit_seconds, + ) + .map_err(|e| EvaluationError::BootstrapFailure(e.to_string()))?; + } + + Ok(eval_env) + } + + /// Returns `true` if the given `namespace` is the special Namespace that is ignored by all + /// the policies + pub(crate) fn should_always_accept_requests_made_inside_of_namespace( + &self, + namespace: &str, + ) -> bool { + self.always_accept_admission_reviews_on_namespace.as_deref() == Some(namespace) + } + + /// Register a new policy. It takes care of creating a new `PolicyEvaluator` (when needed). + /// + /// Params: + /// - `engine`: the `wasmtime::Engine` to be used when creating the `PolicyEvaluator` + /// - `policy_id`: the ID of the policy, as specified inside of the `policies.yml` by the + /// user + /// - `precompiled_policy`: the `PrecompiledPolicy` associated with the Wasm module referenced + /// by the policy + /// - `policy`: a data structure that maps all the information defined inside of + /// `policies.yml` for the given policy + /// - `callback_handler_tx`: the transmission end of a channel that connects the worker + /// with the asynchronous world + /// - `policy_evaluation_limit_seconds`: when set, defines after how many seconds the + /// policy evaluation is interrupted + fn register( + &mut self, + engine: &wasmtime::Engine, + policy_id: &str, + precompiled_policy: &PrecompiledPolicy, + policy: &crate::config::Policy, + callback_handler_tx: mpsc::Sender, + policy_evaluation_limit_seconds: Option, + ) -> Result<()> { + let module_digest = &precompiled_policy.digest; + + if !self + .module_digest_to_policy_evaluator_pre + .contains_key(module_digest) + { + debug!(policy_id = policy.url, "create wasmtime::Module"); + let module = create_wasmtime_module(&policy.url, engine, precompiled_policy)?; + debug!(policy_id = policy.url, "create PolicyEvaluatorPre"); + let pol_eval_pre = create_policy_evaluator_pre( + engine, + &module, + precompiled_policy.execution_mode, + policy_evaluation_limit_seconds, + )?; + + self.module_digest_to_policy_evaluator_pre + .insert(module_digest.to_owned(), pol_eval_pre); + } + self.policy_id_to_module_digest + .insert(policy_id.to_owned(), module_digest.to_owned()); + + let policy_eval_settings = PolicyEvaluationSettings { + policy_mode: policy.policy_mode.clone(), + allowed_to_mutate: policy.allowed_to_mutate.unwrap_or(false), + settings: policy + .settings_to_json() + .map_err(|e| EvaluationError::InternalError(e.to_string()))? + .unwrap_or_default(), + }; + self.policy_id_to_settings + .insert(policy_id.to_owned(), policy_eval_settings); + + let eval_ctx = EvaluationContext { + policy_id: policy_id.to_owned(), + callback_channel: Some(callback_handler_tx.clone()), + ctx_aware_resources_allow_list: policy.context_aware_resources.clone(), + }; + self.policy_id_to_eval_ctx + .insert(policy_id.to_owned(), eval_ctx); + + Ok(()) + } + + /// Given a policy ID, return how the policy operates + pub fn get_policy_mode(&self, policy_id: &str) -> Result { + self.policy_id_to_settings + .get(policy_id) + .map(|settings| settings.policy_mode.clone()) + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string())) + } + + /// Given a policy ID, returns true if the policy is allowed to mutate + pub fn get_policy_allowed_to_mutate(&self, policy_id: &str) -> Result { + self.policy_id_to_settings + .get(policy_id) + .map(|settings| settings.allowed_to_mutate) + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string())) + } + + /// Given a policy ID, returns the settings provided by the user inside of `policies.yml` + fn get_policy_settings(&self, policy_id: &str) -> Result { + let settings = self + .policy_id_to_settings + .get(policy_id) + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string()))? + .clone(); + + Ok(settings) + } + + /// Perform a request validation + pub fn validate(&self, policy_id: &str, req: &EvalRequest) -> Result { + let settings = self.get_policy_settings(policy_id)?; + let mut evaluator = self.rehydrate(policy_id)?; + + Ok(evaluator.validate(req.req.clone(), &settings.settings)) + } + + /// Validate the settings the user provided for the given policy + pub fn validate_settings(&self, policy_id: &str) -> Result { + let settings = self.get_policy_settings(policy_id)?; + let mut evaluator = self.rehydrate(policy_id)?; + + Ok(evaluator.validate_settings(&settings.settings)) + } + + /// Internal method, create a `PolicyEvaluator` by using a pre-initialized instance + fn rehydrate(&self, policy_id: &str) -> Result { + let module_digest = self + .policy_id_to_module_digest + .get(policy_id) + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string()))?; + let policy_evaluator_pre = self + .module_digest_to_policy_evaluator_pre + .get(module_digest) + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string()))?; + + let eval_ctx = self + .policy_id_to_eval_ctx + .get(policy_id) + .ok_or(EvaluationError::PolicyNotFound(policy_id.to_string()))?; + + policy_evaluator_pre.rehydrate(eval_ctx).map_err(|e| { + EvaluationError::WebAssemblyError(format!("cannot rehydrate PolicyEvaluatorPre: {e}")) + }) + } +} + +fn create_wasmtime_module( + policy_url: &str, + engine: &wasmtime::Engine, + precompiled_policy: &PrecompiledPolicy, +) -> Result { + // See `wasmtime::Module::deserialize` to know why this method is `unsafe`. + // However, in our context, nothing bad will happen because we have + // full control of the precompiled module. This is generated by the + // WorkerPool thread + unsafe { wasmtime::Module::deserialize(engine, &precompiled_policy.precompiled_module) } + .map_err(|e| { + EvaluationError::WebAssemblyError(format!( + "could not rehydrate wasmtime::Module {policy_url}: {e:?}" + )) + }) +} + +/// Internal function, takes care of creating the `PolicyEvaluator` instance for the given policy +fn create_policy_evaluator_pre( + engine: &wasmtime::Engine, + module: &wasmtime::Module, + mode: PolicyExecutionMode, + policy_evaluation_limit_seconds: Option, +) -> Result { + let mut policy_evaluator_builder = PolicyEvaluatorBuilder::new() + .engine(engine.to_owned()) + .policy_module(module.to_owned()) + .execution_mode(mode); + + if let Some(limit) = policy_evaluation_limit_seconds { + policy_evaluator_builder = + policy_evaluator_builder.enable_epoch_interruptions(limit, limit); + } + + policy_evaluator_builder.build_pre().map_err(|e| { + EvaluationError::WebAssemblyError(format!("cannot build PolicyEvaluatorPre {e}")) + }) +} + +#[cfg(test)] +mod tests { + use policy_evaluator::{ + admission_response::AdmissionResponse, policy_evaluator::ValidateRequest, + }; + use rstest::*; + use std::collections::BTreeSet; + + use super::*; + use crate::admission_review::tests::build_admission_review; + use crate::config::Policy; + + fn build_evaluation_environment() -> Result { + let engine = wasmtime::Engine::default(); + let policy_ids = vec!["policy_1", "policy_2"]; + let module = wasmtime::Module::new(&engine, "(module (func))") + .expect("should be able to build the smallest wasm module ever"); + let (callback_handler_tx, _) = mpsc::channel(10); + + let precompiled_policy = PrecompiledPolicy { + precompiled_module: module.serialize().unwrap(), + execution_mode: policy_evaluator::policy_evaluator::PolicyExecutionMode::Wasi, + digest: "unique-digest".to_string(), + }; + + let mut policies: HashMap = HashMap::new(); + let mut precompiled_policies: PrecompiledPolicies = PrecompiledPolicies::new(); + + for policy_id in &policy_ids { + let policy_url = format!("file:///tmp/{policy_id}.wasm"); + policies.insert( + policy_id.to_string(), + Policy { + url: policy_url.clone(), + policy_mode: PolicyMode::Protect, + allowed_to_mutate: None, + settings: None, + context_aware_resources: BTreeSet::new(), + }, + ); + precompiled_policies.insert(policy_url, precompiled_policy.clone()); + } + + EvaluationEnvironment::new( + &engine, + &policies, + &precompiled_policies, + None, + None, + callback_handler_tx, + ) + } + + #[rstest] + #[case("policy_not_defined", true)] + #[case("policy_1", false)] + fn return_policy_not_found_error(#[case] policy_id: &str, #[case] expect_error: bool) { + let eval_env = build_evaluation_environment().unwrap(); + let req = ValidateRequest::AdmissionRequest( + build_admission_review().request.expect("no request"), + ); + + let (tx, _) = tokio::sync::oneshot::channel::>(); + let eval_req = EvalRequest { + policy_id: policy_id.to_string(), + req, + resp_chan: tx, + parent_span: tracing::Span::none(), + request_origin: crate::communication::RequestOrigin::Validate, + }; + + if expect_error { + assert!(matches!( + eval_env.get_policy_mode(policy_id), + Err(EvaluationError::PolicyNotFound(_)) + )); + assert!(matches!( + eval_env.get_policy_allowed_to_mutate(policy_id), + Err(EvaluationError::PolicyNotFound(_)) + )); + assert!(matches!( + eval_env.get_policy_settings(policy_id), + Err(EvaluationError::PolicyNotFound(_)) + )); + assert!(matches!( + eval_env.validate(policy_id, &eval_req), + Err(EvaluationError::PolicyNotFound(_)) + )); + } else { + assert!(eval_env.get_policy_mode(policy_id).is_ok()); + assert!(eval_env.get_policy_allowed_to_mutate(policy_id).is_ok()); + assert!(eval_env.get_policy_settings(policy_id).is_ok()); + // note: we do not test `validate` with a known policy because this would + // cause another error. The test policy we're using is just an empty Wasm + // module + } + } + + /// Given to identical wasm modules, only one instance of PolicyEvaluator is going to be + /// created + #[test] + fn avoid_duplicated_instaces_of_policy_evaluator() { + let evaluation_environment = build_evaluation_environment().unwrap(); + + assert_eq!( + evaluation_environment + .module_digest_to_policy_evaluator_pre + .len(), + 1 + ); + } +} diff --git a/src/workers/mod.rs b/src/workers/mod.rs new file mode 100644 index 00000000..72b831c5 --- /dev/null +++ b/src/workers/mod.rs @@ -0,0 +1,10 @@ +pub(crate) mod error; +mod evaluation_environment; +mod policy_evaluation_settings; +pub(crate) mod pool; +pub(crate) mod precompiled_policy; +pub(crate) mod worker; + +// This is required to mock the `EvaluationEnvironment` inside of our tests +#[mockall_double::double] +pub(crate) use evaluation_environment::EvaluationEnvironment; diff --git a/src/workers/policy_evaluation_settings.rs b/src/workers/policy_evaluation_settings.rs new file mode 100644 index 00000000..aba66b2b --- /dev/null +++ b/src/workers/policy_evaluation_settings.rs @@ -0,0 +1,16 @@ +use policy_evaluator::policy_evaluator::PolicySettings; + +use crate::config::PolicyMode; + +/// Holds the evaluation settings of loaded Policy. These settings are taken straight from the +/// `policies.yml` file provided by the user +#[cfg_attr(test, allow(dead_code))] +#[derive(Clone)] +pub(crate) struct PolicyEvaluationSettings { + /// Whether the policy is operating in `protect` or `monitor` mode + pub(crate) policy_mode: PolicyMode, + /// Determines if a mutating policy is actually allowed to mutate + pub(crate) allowed_to_mutate: bool, + /// The policy-specific settings provided by the user + pub(crate) settings: PolicySettings, +} diff --git a/src/workers/pool.rs b/src/workers/pool.rs new file mode 100644 index 00000000..f2154a4e --- /dev/null +++ b/src/workers/pool.rs @@ -0,0 +1,346 @@ +use anyhow::{anyhow, Result}; +use core::time; +use policy_evaluator::{callback_requests::CallbackRequest, wasmtime}; +use rayon::prelude::*; +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Barrier, RwLock, + }, + thread, + thread::JoinHandle, + vec::Vec, +}; +use tokio::sync::{mpsc, oneshot}; +use tracing::{debug, error, info, warn}; + +use crate::communication::{EvalRequest, WorkerPoolBootRequest}; +use crate::config; +use crate::policy_downloader::FetchedPolicies; +use crate::workers::EvaluationEnvironment; +use crate::workers::{ + precompiled_policy::{PrecompiledPolicies, PrecompiledPolicy}, + worker::Worker, +}; + +/// Coordinates a set of workers. +/// Each worker takes care of performing the evaluation of the requests received by Policy Server +/// API endpoints. +/// +/// The HTTP API communicates with the worker pool via a dedicated chanel. The pool then assigns +/// the evaluation job to one of the workers. Currently this is done on a round-robin fashion. +pub(crate) struct WorkerPool { + /// Channel used by the HTTP API to send to the pool the evaluation requests that have to be + /// processed + api_rx: mpsc::Receiver, + + /// A oneshot channel used during the bootstrap phase. It's being used by the `main` to send + /// the data used to bootstrap of the workers. + bootstrap_rx: oneshot::Receiver, + + /// The channel that connect the synchronous world of the workers with the tokio task that + /// handles all the async requests that might originate during the policy evaluation process. + /// For example, requesting Kubernetes resources, DNS resolution, Sigstore operations,... + callback_handler_tx: mpsc::Sender, + + /// When set, this is the Namespace where all the policies do not apply. This is usually set + /// to be the Namespace where the Kubewarden is deployed; ensuring the user policies are not + /// going to interfere with the Kubewarden stack. + always_accept_admission_reviews_on_namespace: Option, + + /// When set enables the policy timeout feature which prevents a rogue policy to enter an + /// endless loop/consume all the resources of a worker + policy_evaluation_limit_seconds: Option, +} + +impl WorkerPool { + /// Create a new `WorkerPool`, no bootstrap operation is done yet. This happens when invoking + /// the `run` method. + pub(crate) fn new( + bootstrap_rx: oneshot::Receiver, + api_rx: mpsc::Receiver, + callback_handler_tx: mpsc::Sender, + always_accept_admission_reviews_on_namespace: Option, + policy_evaluation_limit_seconds: Option, + ) -> WorkerPool { + WorkerPool { + api_rx, + bootstrap_rx, + callback_handler_tx, + always_accept_admission_reviews_on_namespace, + policy_evaluation_limit_seconds, + } + } + + /// Bootstrap the pool and then enter an endless loop that processes incoming requests. + pub(crate) fn run(mut self) { + // The WorkerPool communicates with each worker over dedicated `mpsc::channel` (one per worker). + // This vector holds all the sender ends of these channels. + let mut worker_tx_chans = Vec::>::new(); + + // All the join handles of the spawned worker threads + let mut join_handles = Vec::>>::new(); + + // Phase 1: wait for bootstrap data to be received by the main + // code running in the async block. Once the data is received + // populate the worker pool + + let bootstrap_data = match self.bootstrap_rx.blocking_recv() { + Ok(data) => data, + Err(e) => { + eprintln!("workers pool bootstrap: error receiving bootstrap data: {e:?}"); + std::process::exit(1); + } + }; + + let mut wasmtime_config = wasmtime::Config::new(); + if self.policy_evaluation_limit_seconds.is_some() { + wasmtime_config.epoch_interruption(true); + } + + // We are going to share the same engine across all the workers + let engine = match wasmtime::Engine::new(&wasmtime_config) { + Ok(e) => e, + Err(e) => { + eprintln!("workers pool bootstrap: cannot instantiate `wasmtime::Engine`: {e:?}"); + std::process::exit(1); + } + }; + + let precompiled_policies = + match precompile_policies(&engine, &bootstrap_data.fetched_policies) { + Ok(pp) => pp, + Err(e) => { + eprintln!("{e}"); + std::process::exit(1); + } + }; + + // EvaluationEnvironment instance that is going to be shared across all + // the worker threads + let evaluation_environment = match EvaluationEnvironment::new( + &engine, + &bootstrap_data.policies, + &precompiled_policies, + self.always_accept_admission_reviews_on_namespace, + self.policy_evaluation_limit_seconds, + self.callback_handler_tx.clone(), + ) { + Ok(ee) => Arc::new(ee), + Err(e) => { + eprintln!("{e}"); + std::process::exit(1); + } + }; + + // For each policy defined by the user, ensure the given settings are valid + // We exit with an error if one or more policies do not have valid + // settings. + if let Err(error) = verify_policy_settings( + &engine, + &bootstrap_data.policies, + evaluation_environment.clone(), + self.policy_evaluation_limit_seconds, + ) { + error!(?error, "cannot validate policy settings"); + match bootstrap_data.resp_chan.send(Err(error)) { + Ok(_) => return, + Err(_) => { + eprint!("worker bootstrap: cannot send back failure through channel"); + std::process::exit(1); + } + }; + } + + let pool_size: usize = bootstrap_data.pool_size; + let barrier = Arc::new(Barrier::new(pool_size + 1)); + let boot_canary = Arc::new(AtomicBool::new(true)); + + if let Some(limit) = self.policy_evaluation_limit_seconds { + info!( + execution_limit_seconds = limit, + "policy timeout protection is enabled" + ); + } else { + warn!("policy timeout protection is disabled"); + } + + for n in 1..=pool_size { + let (tx, rx) = mpsc::channel::(32); + worker_tx_chans.push(tx); + + let b = barrier.clone(); + let inner_evaluation_environment = evaluation_environment.clone(); + + let join = thread::spawn(move || -> Result<()> { + info!(spawned = n, total = pool_size, "spawning worker"); + + let mut worker = Worker::new(rx, inner_evaluation_environment); + b.wait(); + + debug!(id = n, "worker loop start"); + worker.run(); + debug!(id = n, "worker loop exit"); + + Ok(()) + }); + join_handles.push(join); + } + + // Deallocate all the memory used by the precompiled policies since + // they are no longer needed. Without this explicit cleanup + // the reference would be dropped right before Policy Server exits, + // meaning a lot of memory would have been consumed without a valid reason + // during the whole execution time + drop(precompiled_policies); + barrier.wait(); + + if !boot_canary.load(Ordering::SeqCst) { + match bootstrap_data + .resp_chan + .send(Err(anyhow!("could not init one of the workers"))) + { + Ok(_) => return, + Err(_) => { + eprint!("worker bootstrap: cannot send back failure through channel"); + std::process::exit(1); + } + }; + } + + // bootstrap went smoothly + if bootstrap_data.resp_chan.send(Ok(())).is_err() { + eprint!("worker bootstrap: cannot send back success message through channel"); + std::process::exit(1); + } + + // Phase 2: the worker pool has been successfully bootstraped. + // We can start waiting for admission review requests to be evaluated + let mut next_worker_id = 0; + + if self.policy_evaluation_limit_seconds.is_some() { + // start a dedicated thread that send tick events to all + // the workers. This is used by the wasmtime's epoch_interruption + // to keep track of the execution time of each wasm module + let engine_timer_thread = engine.clone(); + thread::spawn(move || { + let one_second = time::Duration::from_secs(1); + loop { + thread::sleep(one_second); + engine_timer_thread.increment_epoch(); + } + }); + } + + while let Some(req) = self.api_rx.blocking_recv() { + let _ = worker_tx_chans[next_worker_id].blocking_send(req); + next_worker_id += 1; + if next_worker_id >= pool_size { + next_worker_id = 0; + } + } + + for handle in join_handles { + handle.join().unwrap().unwrap(); + } + } +} + +fn precompile_policies( + engine: &wasmtime::Engine, + fetched_policies: &FetchedPolicies, +) -> Result { + debug!( + wasm_modules_count = fetched_policies.len(), + "instantiating wasmtime::Module objects" + ); + + let precompiled_policies: HashMap> = fetched_policies + .par_iter() + .map(|(policy_url, wasm_module_path)| { + let precompiled_policy = PrecompiledPolicy::new(engine, wasm_module_path); + debug!(?policy_url, "module compiled"); + (policy_url.clone(), precompiled_policy) + }) + .collect(); + + let errors: Vec = precompiled_policies + .iter() + .filter_map(|(url, result)| match result { + Ok(_) => None, + Err(e) => Some(format!( + "[{url}] policy cannot be compiled to WebAssembly module: {e:?}" + )), + }) + .collect(); + if !errors.is_empty() { + return Err(anyhow!( + "workers pool bootstrap: cannot instantiate `wasmtime::Module` objects: {:?}", + errors.join(", ") + )); + } + + Ok(precompiled_policies + .iter() + .filter_map(|(url, result)| match result { + Ok(p) => Some((url.clone(), p.clone())), + Err(_) => None, + }) + .collect()) +} + +/// Ensure the user provided valid settings for all the policies +fn verify_policy_settings( + engine: &wasmtime::Engine, + policies: &HashMap, + evaluation_environment: Arc, + policy_evaluation_limit_seconds: Option, +) -> Result<()> { + let tick_thread_lock = Arc::new(RwLock::new(true)); + + if policy_evaluation_limit_seconds.is_some() { + // start a dedicated thread that send tick events to the + // wasmtime engine. + // This is used by the wasmtime's epoch_interruption + // to keep track of the execution time of each wasm module + + let loop_engine = engine.clone(); + let keep_going_lock = tick_thread_lock.clone(); + + thread::spawn(move || { + let one_second = time::Duration::from_secs(1); + loop { + thread::sleep(one_second); + loop_engine.increment_epoch(); + if !(*keep_going_lock.read().unwrap()) { + break; + } + } + }); + } + + let mut errors = vec![]; + for (policy_id, _policy) in policies.iter() { + let set_val_rep = evaluation_environment.validate_settings(policy_id)?; + if !set_val_rep.valid { + errors.push(format!( + "[{}] settings are not valid: {:?}", + policy_id, set_val_rep.message + )); + continue; + } + } + + if policy_evaluation_limit_seconds.is_some() { + // Tell the ticker thread loop to stop + let mut w = tick_thread_lock.write().unwrap(); + *w = false; + } + + if errors.is_empty() { + Ok(()) + } else { + Err(anyhow!("{}", errors.join(", "))) + } +} diff --git a/src/workers/precompiled_policy.rs b/src/workers/precompiled_policy.rs new file mode 100644 index 00000000..808d4e25 --- /dev/null +++ b/src/workers/precompiled_policy.rs @@ -0,0 +1,142 @@ +use anyhow::{anyhow, Result}; +use lazy_static::lazy_static; +use policy_evaluator::{ + policy_evaluator::PolicyExecutionMode, policy_metadata::Metadata, wasmtime, +}; +use semver::{BuildMetadata, Prerelease, Version}; +use sha2::{Digest, Sha256}; +use std::{collections::HashMap, fs, path::Path, vec::Vec}; + +lazy_static! { + static ref KUBEWARDEN_VERSION: Version = { + let mut version = Version::parse(env!("CARGO_PKG_VERSION")).expect("Cannot parse CARGO_PKG_VERSION version"); + // Remove the patch, prerelease and build information to avoid rejections + // like this: v1.6.0-rc4 < v1.6.0 + version.patch = 0; + version.pre = Prerelease::EMPTY; + version.build = BuildMetadata::EMPTY; + version + }; +} + +/// This structure holds a precompiled WebAssembly module +/// representing a policy. +/// +/// Compiling a WebAssembly module is an expensive operation. Each +/// worker thread needs to do that, for each policy defined by the user. +/// +/// Precompiling the policies ahead of time reduces the bootstrap time by a lot. +/// +/// **Warning:** when "rehydrating" the module, you have to use a `wasmtime::Engine` +/// that has been created with the same `wasmtime::Config` used at compilation time. +#[derive(Clone)] +pub(crate) struct PrecompiledPolicy { + /// A precompiled [`wasmtime::Module`] + pub precompiled_module: Vec, + + /// The execution mode of the policy + pub execution_mode: PolicyExecutionMode, + + /// sha256 digest of the precompiled module + pub digest: String, +} + +impl PrecompiledPolicy { + /// Load a WebAssembly module from the disk and compiles it + pub fn new(engine: &wasmtime::Engine, wasm_module_path: &Path) -> Result { + let policy_contents = fs::read(wasm_module_path)?; + let policy_metadata = Metadata::from_contents(&policy_contents)?; + let metadata = policy_metadata.unwrap_or_default(); + let execution_mode = metadata.execution_mode; + has_minimum_kubewarden_version(&metadata)?; + + let precompiled_module = engine.precompile_module(&policy_contents)?; + + let mut hasher = Sha256::new(); + hasher.update(&precompiled_module); + let digest = hasher.finalize(); + + Ok(Self { + precompiled_module, + execution_mode, + digest: format!("{digest:x}"), + }) + } +} + +/// A dictionary with: +/// * Key: the URL of the WebAssembly module +/// * value: the PrecompiledPolicy +pub(crate) type PrecompiledPolicies = HashMap; + +/// Check if policy server version is compatible with minimum kubewarden +/// version required by the policy +fn has_minimum_kubewarden_version(metadata: &Metadata) -> Result<()> { + if let Some(minimum_kubewarden_version) = &metadata.minimum_kubewarden_version { + let sanitized_minimum_kubewarden_version = Version { + major: minimum_kubewarden_version.major, + minor: minimum_kubewarden_version.minor, + // Kubewarden stack version ignore patch, prerelease and build version numbers + patch: 0, + pre: Prerelease::EMPTY, + build: BuildMetadata::EMPTY, + }; + if *KUBEWARDEN_VERSION < sanitized_minimum_kubewarden_version { + return Err(anyhow!( + "Policy required Kubewarden version {} but is running on {}", + sanitized_minimum_kubewarden_version, + KUBEWARDEN_VERSION.to_string(), + )); + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use rstest::rstest; + + fn generate_metadata(major: u64, minor: u64, patch: u64) -> Metadata { + let minimum_kubewarden_version = Version { + major, + minor, + patch, + pre: Prerelease::EMPTY, + build: BuildMetadata::EMPTY, + }; + Metadata { + minimum_kubewarden_version: Some(minimum_kubewarden_version), + ..Default::default() + } + } + + #[rstest] + #[case(generate_metadata(KUBEWARDEN_VERSION.major -1, KUBEWARDEN_VERSION.minor, KUBEWARDEN_VERSION.patch))] + #[case(generate_metadata(KUBEWARDEN_VERSION.major, KUBEWARDEN_VERSION.minor - 1, KUBEWARDEN_VERSION.patch))] + fn recent_kubewarden_versions_test(#[case] metadata: Metadata) { + assert!(has_minimum_kubewarden_version(&metadata).is_ok()) + } + + #[rstest] + #[case(generate_metadata(KUBEWARDEN_VERSION.major +1, KUBEWARDEN_VERSION.minor, KUBEWARDEN_VERSION.patch))] + #[case(generate_metadata(KUBEWARDEN_VERSION.major, KUBEWARDEN_VERSION.minor + 1, KUBEWARDEN_VERSION.patch))] + fn old_kubewarden_versions_test(#[case] metadata: Metadata) { + assert!(has_minimum_kubewarden_version(&metadata).is_err()) + } + + #[test] + fn no_mininum_kubewarden_version_is_valid_test() { + let metadata = Metadata { + minimum_kubewarden_version: None, + ..Default::default() + }; + assert!(has_minimum_kubewarden_version(&metadata).is_ok()) + } + + #[rstest] + #[case(generate_metadata(KUBEWARDEN_VERSION.major, KUBEWARDEN_VERSION.minor, KUBEWARDEN_VERSION.patch + 1))] + fn ignore_patch_version_test(#[case] metadata: Metadata) { + assert!(has_minimum_kubewarden_version(&metadata).is_ok()) + } +} diff --git a/src/worker.rs b/src/workers/worker.rs similarity index 66% rename from src/worker.rs rename to src/workers/worker.rs index 59c7a2c2..255ec528 100644 --- a/src/worker.rs +++ b/src/workers/worker.rs @@ -1,45 +1,31 @@ -use anyhow::{anyhow, Result}; -use itertools::Itertools; -use policy_evaluator::callback_requests::CallbackRequest; -use policy_evaluator::wasmtime; use policy_evaluator::{ admission_response::{AdmissionResponse, AdmissionResponseStatus}, - policy_evaluator::{Evaluator, ValidateRequest}, + policy_evaluator::ValidateRequest, }; -use std::{collections::HashMap, fmt, time::Instant}; -use tokio::sync::mpsc::{Receiver, Sender}; +use std::{sync::Arc, time::Instant}; +use tokio::sync::mpsc::Receiver; use tracing::{error, info, info_span}; use crate::communication::{EvalRequest, RequestOrigin}; -use crate::config::{Policy, PolicyMode}; +use crate::config::PolicyMode; use crate::metrics::{self}; -use crate::worker_pool::PrecompiledPolicies; - -struct PolicyEvaluatorWithSettings { - policy_evaluator: Box, - policy_mode: PolicyMode, - allowed_to_mutate: bool, - always_accept_admission_reviews_on_namespace: Option, -} +use crate::workers::{ + error::{EvaluationError, Result}, + EvaluationEnvironment, +}; pub(crate) struct Worker { - evaluators: HashMap, + evaluation_environment: Arc, channel_rx: Receiver, } -pub struct PolicyErrors(HashMap); - -impl fmt::Display for PolicyErrors { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let mut errors = self - .0 - .iter() - .map(|(policy, error)| format!("[{policy}: {error}]")); - write!(f, "{}", errors.join(", ")) - } -} - impl Worker { + /// Create a new Worker. Takes care of allocating the `PolicyEvaluator` environments + /// required to evaluate the policies. + /// + /// No check is done against the policy settings provided by the user. The `WorkerPool` + /// already verified that all the settings are valid. + #[allow(clippy::too_many_arguments)] #[tracing::instrument( name = "worker_new", fields(host=crate::config::HOSTNAME.as_str()), @@ -47,56 +33,12 @@ impl Worker { )] pub(crate) fn new( rx: Receiver, - policies: &HashMap, - precompiled_policies: &PrecompiledPolicies, - engine: wasmtime::Engine, - callback_handler_tx: Sender, - always_accept_admission_reviews_on_namespace: Option, - policy_evaluation_limit_seconds: Option, - ) -> Result { - let mut evs_errors = HashMap::new(); - let mut evs = HashMap::new(); - - for (id, policy) in policies.iter() { - // It's safe to clone the outer engine. This creates a shallow copy - let inner_engine = engine.clone(); - let policy_evaluator = match crate::worker_pool::build_policy_evaluator( - id, - policy, - &inner_engine, - precompiled_policies, - callback_handler_tx.clone(), - policy_evaluation_limit_seconds, - ) { - Ok(pe) => Box::new(pe), - Err(e) => { - evs_errors.insert( - id.clone(), - format!("[{id}] could not create PolicyEvaluator: {e:?}"), - ); - continue; - } - }; - - let policy_evaluator_with_settings = PolicyEvaluatorWithSettings { - policy_evaluator, - policy_mode: policy.policy_mode.clone(), - allowed_to_mutate: policy.allowed_to_mutate.unwrap_or(false), - always_accept_admission_reviews_on_namespace: - always_accept_admission_reviews_on_namespace.clone(), - }; - - evs.insert(id.to_string(), policy_evaluator_with_settings); - } - - if !evs_errors.is_empty() { - return Err(PolicyErrors(evs_errors)); - } - - Ok(Worker { - evaluators: evs, + evaluation_environment: Arc, + ) -> Self { + Worker { + evaluation_environment, channel_rx: rx, - }) + } } // Returns a validation response with policy-server specific @@ -154,31 +96,43 @@ impl Worker { } } + /// Endless loop that waits for the WorkerPool to give a new request to be processed. + /// The request is then evaluated and a response is returned back to the WorkerPool. pub(crate) fn run(&mut self) { while let Some(req) = self.channel_rx.blocking_recv() { let span = info_span!(parent: &req.parent_span, "policy_eval"); let _enter = span.enter(); - let res = match self.evaluators.get_mut(&req.policy_id) { - Some(pes) => Self::evaluate(req, pes), - None => req - .resp_chan - .send(None) - .map_err(|_| anyhow!("cannot send response back")), + let admission_response = match self.evaluate(&req) { + Ok(ar) => Some(ar), + Err(EvaluationError::PolicyNotFound(_)) => None, + Err(e) => Some(AdmissionResponse::reject_internal_server_error( + req.req.uid().to_owned(), + e.to_string(), + )), }; - if res.is_err() { - error!("receiver dropped"); + + if let Err(e) = req.resp_chan.send(admission_response) { + error!("cannot send response back: {e:?}"); } } } - fn evaluate(req: EvalRequest, pes: &mut PolicyEvaluatorWithSettings) -> anyhow::Result<()> { + /// Perform the actual evaluation + fn evaluate(&mut self, req: &EvalRequest) -> Result { let start_time = Instant::now(); - let policy_name = pes.policy_evaluator.policy_id(); - let policy_mode = pes.policy_mode.clone(); - let allowed_to_mutate = pes.allowed_to_mutate; - let vanilla_validation_response = pes.policy_evaluator.validate(req.req.clone()); + let policy_name = req.policy_id.clone(); + let policy_mode = self + .evaluation_environment + .get_policy_mode(&req.policy_id)?; + let allowed_to_mutate = self + .evaluation_environment + .get_policy_allowed_to_mutate(&req.policy_id)?; + + let vanilla_validation_response = + self.evaluation_environment.validate(&req.policy_id, req)?; + let policy_evaluation_duration = start_time.elapsed(); let accepted = vanilla_validation_response.allowed; let mutated = vanilla_validation_response.patch.is_some(); @@ -200,17 +154,23 @@ impl Worker { match req.req.clone() { ValidateRequest::AdmissionRequest(adm_req) => { - if let Some(namespace) = &pes.always_accept_admission_reviews_on_namespace { - // If the policy server is configured to - // always accept admission reviews on a - // given namespace, just set the `allowed` - // part of the response to `true` if the - // request matches this namespace. Keep - // the rest of the behaviors unchanged, - // such as checking if the policy is - // allowed to mutate. - - if adm_req.namespace == Some(namespace.to_string()) { + // TODO: we should check immediately if the request is coming from the "always + // accepted" namespace ASAP. Right now we do an evaluation and then we discard the + // result if the namespace is the special one. + // Moreover, I (flavio) don't like the fact we're using a mutable variable for + // `validation_response` + if let Some(ref req_namespace) = adm_req.namespace { + if self + .evaluation_environment + .should_always_accept_requests_made_inside_of_namespace(req_namespace) + { + // given namespace, just set the `allowed` + // part of the response to `true` if the + // request matches this namespace. Keep + // the rest of the behaviors unchanged, + // such as checking if the policy is + // allowed to mutate. + validation_response = AdmissionResponse { allowed: true, status: None, @@ -218,7 +178,6 @@ impl Worker { }; } } - let policy_evaluation = metrics::PolicyEvaluation { policy_name, policy_mode: policy_mode.into(), @@ -247,9 +206,7 @@ impl Worker { metrics::add_policy_evaluation(&policy_evaluation); } }; - - let res = req.resp_chan.send(Some(validation_response)); - res.map_err(|_| anyhow!("cannot send response back")) + Ok(validation_response) } } @@ -257,82 +214,70 @@ impl Worker { mod tests { use crate::admission_review::tests::build_admission_review; use crate::communication::RequestOrigin; - - use policy_evaluator::kubewarden_policy_sdk::settings::SettingsValidationResponse; - use policy_evaluator::ProtocolVersion; use rstest::*; - use tokio::sync::oneshot; + use tokio::sync::{mpsc, oneshot}; use super::*; const POLICY_ID: &str = "policy-id"; - struct MockPolicyEvaluator { - pub policy_id: String, - pub admission_response: AdmissionResponse, - pub settings_validation_response: SettingsValidationResponse, - pub protocol_version: Result, - } - - impl Default for MockPolicyEvaluator { - fn default() -> Self { - Self { - policy_id: "mock_policy".to_string(), - admission_response: AdmissionResponse { - allowed: false, - ..Default::default() - }, - settings_validation_response: SettingsValidationResponse { - valid: true, - message: None, - }, - protocol_version: Ok(ProtocolVersion::V1), - } - } - } - - impl MockPolicyEvaluator { - fn new_allowed() -> MockPolicyEvaluator { - MockPolicyEvaluator { - admission_response: AdmissionResponse { + fn create_evaluation_environment_that_accepts_request( + policy_mode: PolicyMode, + ) -> Arc { + let mut mock_evaluation_environment = EvaluationEnvironment::default(); + mock_evaluation_environment + .expect_validate() + .returning(|_policy_id, request| { + Ok(AdmissionResponse { + uid: request.req.uid().to_owned(), allowed: true, ..Default::default() - }, - ..Default::default() - } - } - - fn new_rejected(message: Option, code: Option) -> MockPolicyEvaluator { - MockPolicyEvaluator { - admission_response: AdmissionResponse { - allowed: false, - status: Some(AdmissionResponseStatus { message, code }), - ..Default::default() - }, - ..Default::default() - } - } + }) + }); + mock_evaluation_environment + .expect_get_policy_mode() + .returning(move |_policy_id| Ok(policy_mode.clone())); + mock_evaluation_environment + .expect_get_policy_allowed_to_mutate() + .returning(|_policy_id| Ok(false)); + mock_evaluation_environment + .expect_should_always_accept_requests_made_inside_of_namespace() + .returning(|_namespace| false); + Arc::new(mock_evaluation_environment) } - impl Evaluator for MockPolicyEvaluator { - fn validate(&mut self, _request: ValidateRequest) -> AdmissionResponse { - self.admission_response.clone() - } - - fn validate_settings(&mut self) -> SettingsValidationResponse { - self.settings_validation_response.clone() - } - - fn protocol_version(&mut self) -> Result { - match &self.protocol_version { - Ok(pv) => Ok(pv.clone()), - Err(e) => Err(anyhow::anyhow!("{}", e)), - } - } + #[derive(Clone)] + struct EvaluationEnvironmentRejectionDetails { + message: String, + code: u16, + } - fn policy_id(&self) -> String { - self.policy_id.clone() - } + fn create_evaluation_environment_that_reject_request( + policy_mode: PolicyMode, + rejection_details: EvaluationEnvironmentRejectionDetails, + allowed_namespace: String, + ) -> Arc { + let mut mock_evaluation_environment = EvaluationEnvironment::default(); + mock_evaluation_environment + .expect_validate() + .returning(move |_policy_id, request| { + Ok(AdmissionResponse::reject( + request.req.uid().to_owned(), + rejection_details.message.clone(), + rejection_details.code, + )) + }); + mock_evaluation_environment + .expect_get_policy_mode() + .returning(move |_policy_id| Ok(policy_mode.clone())); + mock_evaluation_environment + .expect_get_policy_allowed_to_mutate() + .returning(|_policy_id| Ok(false)); + mock_evaluation_environment + .expect_should_always_accept_requests_made_inside_of_namespace() + .returning(move |namespace| namespace == allowed_namespace); + + Arc::new(mock_evaluation_environment) } #[test] @@ -622,7 +567,7 @@ mod tests { #[case] policy_mode: PolicyMode, #[case] request_origin: RequestOrigin, ) { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let req = ValidateRequest::AdmissionRequest( build_admission_review().request.expect("no request"), ); @@ -635,21 +580,13 @@ mod tests { request_origin, }; - let mock_evaluator = MockPolicyEvaluator::new_allowed(); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + let (_, channel_rx) = mpsc::channel::(10); + let mut worker = Worker { + channel_rx, + evaluation_environment: create_evaluation_environment_that_accepts_request(policy_mode), }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); assert!(response.allowed); } @@ -664,7 +601,7 @@ mod tests { #[case] request_origin: RequestOrigin, #[case] accept: bool, ) { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let req = ValidateRequest::AdmissionRequest( build_admission_review().request.expect("no request"), ); @@ -677,23 +614,22 @@ mod tests { request_origin, }; - let message = Some("boom".to_string()); - let code = Some(500); - let mock_evaluator = MockPolicyEvaluator::new_rejected(message.clone(), code); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), + let (_, channel_rx) = mpsc::channel::(10); + let rejection_details = EvaluationEnvironmentRejectionDetails { + message: "boom".to_string(), + code: 500, + }; + let mock_evaluation_environment = create_evaluation_environment_that_reject_request( policy_mode, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + rejection_details.clone(), + "".to_string(), + ); + let mut worker = Worker { + channel_rx, + evaluation_environment: mock_evaluation_environment, }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); if accept { assert!(response.allowed); @@ -701,14 +637,14 @@ mod tests { } else { assert!(!response.allowed); let response_status = response.status.expect("should be set"); - assert_eq!(response_status.message, message); - assert_eq!(response_status.code, code); + assert_eq!(response_status.message, Some(rejection_details.message)); + assert_eq!(response_status.code, Some(rejection_details.code)); } } #[test] fn evaluate_policy_evaluator_accepts_request_raw() { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let request = serde_json::json!(r#"{"foo": "bar"}"#); let req = ValidateRequest::Raw(request.clone()); @@ -721,27 +657,21 @@ mod tests { request_origin: RequestOrigin::Validate, }; - let mock_evaluator = MockPolicyEvaluator::new_allowed(); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode: PolicyMode::Protect, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + let (_, channel_rx) = mpsc::channel::(10); + let mut worker = Worker { + channel_rx, + evaluation_environment: create_evaluation_environment_that_accepts_request( + PolicyMode::Protect, + ), }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); assert!(response.allowed); } #[test] fn evaluate_policy_evaluator_rejects_request_raw() { - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let request = serde_json::json!(r#"{"foo": "bar"}"#); let req = ValidateRequest::Raw(request.clone()); @@ -754,28 +684,26 @@ mod tests { request_origin: RequestOrigin::Validate, }; - let message = Some("boom".to_string()); - let code = Some(500); - let mock_evaluator = MockPolicyEvaluator::new_rejected(message.clone(), code); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode: PolicyMode::Protect, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: None, + let (_, channel_rx) = mpsc::channel::(10); + let rejection_details = EvaluationEnvironmentRejectionDetails { + message: "boom".to_string(), + code: 500, + }; + let mock_evaluation_environment = create_evaluation_environment_that_reject_request( + PolicyMode::Protect, + rejection_details.clone(), + "".to_string(), + ); + let mut worker = Worker { + channel_rx, + evaluation_environment: mock_evaluation_environment, }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); - + let response = worker.evaluate(&eval_req).unwrap(); assert!(!response.allowed); let response_status = response.status.expect("should be set"); - assert_eq!(response_status.message, message); - assert_eq!(response_status.code, code); + assert_eq!(response_status.message, Some(rejection_details.message)); + assert_eq!(response_status.code, Some(rejection_details.code)); } #[rstest] @@ -789,7 +717,7 @@ mod tests { // of a namespace that is ignored by kubewarden -> this leads the // request to still be accepted - let (tx, mut rx) = oneshot::channel::>(); + let (tx, _) = oneshot::channel::>(); let allowed_namespace = "kubewarden_special".to_string(); @@ -805,23 +733,22 @@ mod tests { request_origin, }; - let message = Some("boom".to_string()); - let code = Some(500); - let mock_evaluator = MockPolicyEvaluator::new_rejected(message, code); - let mut pes = PolicyEvaluatorWithSettings { - policy_evaluator: Box::new(mock_evaluator), - policy_mode: PolicyMode::Protect, - allowed_to_mutate: false, - always_accept_admission_reviews_on_namespace: Some(allowed_namespace), + let (_, channel_rx) = mpsc::channel::(10); + let rejection_details = EvaluationEnvironmentRejectionDetails { + message: "boom".to_string(), + code: 500, + }; + let mock_evaluation_environment = create_evaluation_environment_that_reject_request( + PolicyMode::Protect, + rejection_details.clone(), + allowed_namespace, + ); + let mut worker = Worker { + channel_rx, + evaluation_environment: mock_evaluation_environment, }; - let result = Worker::evaluate(eval_req, &mut pes); - assert!(result.is_ok()); - - let response = rx - .try_recv() - .expect("Got an error") - .expect("expected a AdmissionResponse object"); + let response = worker.evaluate(&eval_req).unwrap(); assert!(response.allowed); assert!(response.status.is_none()); }