From 053e895f573ed4a3b4d0b3394a83b41d4e05dc25 Mon Sep 17 00:00:00 2001 From: Christopher Patton Date: Mon, 13 Jan 2025 15:18:02 -0800 Subject: [PATCH] Update taskprov advertisement encoding for latest draft --- crates/dapf/src/acceptance/mod.rs | 5 +- crates/dapf/src/cli_parsers.rs | 2 +- crates/dapf/src/main.rs | 6 +- crates/daphne-server/src/roles/aggregator.rs | 2 +- crates/daphne-server/src/roles/mod.rs | 4 +- crates/daphne-server/src/router/extractor.rs | 26 +- crates/daphne-server/tests/e2e/e2e.rs | 63 -- crates/daphne-server/tests/e2e/test_runner.rs | 9 +- .../src/aggregator/roles/aggregator.rs | 2 +- .../daphne-worker/src/aggregator/roles/mod.rs | 4 +- .../src/aggregator/router/extractor.rs | 26 +- crates/daphne/src/lib.rs | 37 +- crates/daphne/src/messages/taskprov.rs | 902 +++++++++++------- crates/daphne/src/roles/mod.rs | 10 +- crates/daphne/src/taskprov.rs | 239 +++-- 15 files changed, 785 insertions(+), 552 deletions(-) diff --git a/crates/dapf/src/acceptance/mod.rs b/crates/dapf/src/acceptance/mod.rs index dc82028a..83848fbe 100644 --- a/crates/dapf/src/acceptance/mod.rs +++ b/crates/dapf/src/acceptance/mod.rs @@ -381,7 +381,10 @@ impl Test { lifetime: 60, min_batch_size: reports_per_batch.try_into().unwrap(), query: DapBatchMode::LeaderSelected { - max_batch_size: NonZeroU32::new(reports_per_batch.try_into().unwrap()), + draft09_max_batch_size: match version { + DapVersion::Draft09 => NonZeroU32::new(reports_per_batch.try_into().unwrap()), + DapVersion::Latest => None, + }, }, vdaf: self.vdaf_config, ..Default::default() diff --git a/crates/dapf/src/cli_parsers.rs b/crates/dapf/src/cli_parsers.rs index 4b3cbf2c..d5f53891 100644 --- a/crates/dapf/src/cli_parsers.rs +++ b/crates/dapf/src/cli_parsers.rs @@ -155,7 +155,7 @@ impl FromStr for CliDapBatchMode { Ok(Self(DapBatchMode::TimeInterval)) } else if let Some(size) = s.strip_prefix("leader-selected") { Ok(Self(DapBatchMode::LeaderSelected { - max_batch_size: if let Some(size) = size.strip_prefix("-") { + draft09_max_batch_size: if let Some(size) = size.strip_prefix("-") { Some( size.parse() .map_err(|e| format!("{s} is an invalid max batch size: {e:?}"))?, diff --git a/crates/dapf/src/main.rs b/crates/dapf/src/main.rs index 218a73d4..685285ae 100644 --- a/crates/dapf/src/main.rs +++ b/crates/dapf/src/main.rs @@ -845,7 +845,7 @@ async fn handle_test_routes(action: TestAction, http_client: HttpClient) -> anyh let CliDapBatchMode(query) = use_or_request_from_user_or_default( query, || DapBatchMode::LeaderSelected { - max_batch_size: None, + draft09_max_batch_size: None, }, "query", )?; @@ -885,7 +885,9 @@ async fn handle_test_routes(action: TestAction, http_client: HttpClient) -> anyh )?, max_batch_size: match query { DapBatchMode::TimeInterval => None, - DapBatchMode::LeaderSelected { max_batch_size } => max_batch_size, + DapBatchMode::LeaderSelected { + draft09_max_batch_size, + } => draft09_max_batch_size, }, time_precision: use_or_request_from_user_or_default( time_precision, diff --git a/crates/daphne-server/src/roles/aggregator.rs b/crates/daphne-server/src/roles/aggregator.rs index 26b1a2d5..7018bb82 100644 --- a/crates/daphne-server/src/roles/aggregator.rs +++ b/crates/daphne-server/src/roles/aggregator.rs @@ -196,7 +196,7 @@ impl DapAggregator for crate::App { num_agg_span_shards: global_config.default_num_agg_span_shards, }) }, - Some(task_config.task_expiration), + Some(task_config.not_after()), ) .await .map_err(|e| match &*e { diff --git a/crates/daphne-server/src/roles/mod.rs b/crates/daphne-server/src/roles/mod.rs index ec96287a..0e27339f 100644 --- a/crates/daphne-server/src/roles/mod.rs +++ b/crates/daphne-server/src/roles/mod.rs @@ -287,7 +287,9 @@ mod test_utils { err = "command failed: unexpected max batch size" )) } - (2, max_batch_size) => DapBatchMode::LeaderSelected { max_batch_size }, + (2, max_batch_size) => DapBatchMode::LeaderSelected { + draft09_max_batch_size: max_batch_size, + }, _ => { return Err(fatal_error!( err = "command failed: unrecognized batch mode" diff --git a/crates/daphne-server/src/router/extractor.rs b/crates/daphne-server/src/router/extractor.rs index 2faacd62..ef088c12 100644 --- a/crates/daphne-server/src/router/extractor.rs +++ b/crates/daphne-server/src/router/extractor.rs @@ -671,17 +671,25 @@ mod test { helper_url: daphne::messages::taskprov::UrlBytes { bytes: b"http://helper".into(), }, - query_config: daphne::messages::taskprov::QueryConfig { - time_precision: 1, - max_batch_query_count: 1, - min_batch_size: 1, - batch_mode: daphne::messages::taskprov::BatchMode::TimeInterval, + time_precision: 1, + min_batch_size: 1, + query_config: daphne::messages::taskprov::QueryConfig::TimeInterval, + lifetime: match version { + DapVersion::Draft09 => { + daphne::messages::taskprov::TaskLifetime::Draft09 { expiration: 1 } + } + DapVersion::Latest => daphne::messages::taskprov::TaskLifetime::Latest { + start: 0, + duration: 1, + }, }, - task_expiration: 1, - vdaf_config: daphne::messages::taskprov::VdafConfig { - dp_config: daphne::messages::taskprov::DpConfig::None, - var: daphne::messages::taskprov::VdafTypeVar::Prio2 { dimension: 1 }, + vdaf_config: daphne::messages::taskprov::VdafConfig::Prio2 { dimension: 1 }, + extensions: Vec::new(), + draft09_max_batch_query_count: match version { + DapVersion::Draft09 => Some(1), + DapVersion::Latest => None, }, + draft09_dp_config: Some(daphne::messages::taskprov::DpConfig::None), }; let req = test::<()>( diff --git a/crates/daphne-server/tests/e2e/e2e.rs b/crates/daphne-server/tests/e2e/e2e.rs index 5d7057d1..d0cd8ca9 100644 --- a/crates/daphne-server/tests/e2e/e2e.rs +++ b/crates/daphne-server/tests/e2e/e2e.rs @@ -478,69 +478,6 @@ async fn leader_upload_taskprov() { .unwrap(); } -async fn leader_upload_taskprov_wrong_version(version: DapVersion) { - let wrong_version = match version { - DapVersion::Draft09 => DapVersion::Latest, - DapVersion::Latest => DapVersion::Draft09, - }; - let method = match version { - DapVersion::Draft09 => &http::Method::PUT, - DapVersion::Latest => &http::Method::POST, - }; - let t = TestRunner::default_with_version(version).await; - let client = t.http_client(); - let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap(); - - let (task_config, task_id, taskprov_advertisement) = DapTaskParameters { - version, - min_batch_size: 10, - query: DapBatchMode::TimeInterval, - leader_url: t.task_config.leader_url.clone(), - helper_url: t.task_config.helper_url.clone(), - ..Default::default() - } - .to_config_with_taskprov( - b"cool task".to_vec(), - t.now, - daphne::roles::aggregator::TaskprovConfig { - hpke_collector_config: &t.taskprov_collector_hpke_receiver.config, - vdaf_verify_key_init: &t.taskprov_vdaf_verify_key_init, - }, - ) - .unwrap(); - - let report = task_config - .vdaf - .produce_report_with_extensions( - &hpke_config_list, - t.now, - &task_id, - DapMeasurement::U32Vec(vec![1; 10]), - vec![Extension::Taskprov], - version, - ) - .unwrap(); - t.leader_request_expect_abort( - client, - None, - &format!("tasks/{}/reports", task_id.to_base64url()), - method, - DapMediaType::Report, - Some( - &taskprov_advertisement - .serialize_to_header_value(wrong_version) - .unwrap(), - ), - report.get_encoded_with_param(&version).unwrap(), - 400, - "unrecognizedTask", - ) - .await - .unwrap(); -} - -async_test_versions!(leader_upload_taskprov_wrong_version); - async fn internal_leader_process(version: DapVersion) { let t = TestRunner::default_with_version(version).await; let path = t.upload_path(); diff --git a/crates/daphne-server/tests/e2e/test_runner.rs b/crates/daphne-server/tests/e2e/test_runner.rs index 2217e2c2..1556f856 100644 --- a/crates/daphne-server/tests/e2e/test_runner.rs +++ b/crates/daphne-server/tests/e2e/test_runner.rs @@ -68,7 +68,10 @@ impl TestRunner { Self::with( version, &DapBatchMode::LeaderSelected { - max_batch_size: Some(NonZeroU32::new(MAX_BATCH_SIZE).unwrap()), + draft09_max_batch_size: match version { + DapVersion::Draft09 => NonZeroU32::new(MAX_BATCH_SIZE), + DapVersion::Latest => None, + }, }, ) .await @@ -178,7 +181,9 @@ impl TestRunner { let (batch_mode, max_batch_size) = match t.task_config.query { DapBatchMode::TimeInterval => (1, None), - DapBatchMode::LeaderSelected { max_batch_size } => (2, Some(max_batch_size)), + DapBatchMode::LeaderSelected { + draft09_max_batch_size, + } => (2, draft09_max_batch_size), }; const MAX_ATTEMPTS: usize = 10; diff --git a/crates/daphne-worker/src/aggregator/roles/aggregator.rs b/crates/daphne-worker/src/aggregator/roles/aggregator.rs index cbe3ae3b..1d9124e4 100644 --- a/crates/daphne-worker/src/aggregator/roles/aggregator.rs +++ b/crates/daphne-worker/src/aggregator/roles/aggregator.rs @@ -195,7 +195,7 @@ impl DapAggregator for App { num_agg_span_shards: global_config.default_num_agg_span_shards, }) }, - Some(task_config.task_expiration), + Some(task_config.not_after()), ), ) .await diff --git a/crates/daphne-worker/src/aggregator/roles/mod.rs b/crates/daphne-worker/src/aggregator/roles/mod.rs index 574d840d..5fd7d3d1 100644 --- a/crates/daphne-worker/src/aggregator/roles/mod.rs +++ b/crates/daphne-worker/src/aggregator/roles/mod.rs @@ -269,7 +269,9 @@ mod test_utils { err = "command failed: unexpected max batch size" )) } - (2, max_batch_size) => DapBatchMode::LeaderSelected { max_batch_size }, + (2, max_batch_size) => DapBatchMode::LeaderSelected { + draft09_max_batch_size: max_batch_size, + }, _ => { return Err(fatal_error!( err = "command failed: unrecognized batch mode" diff --git a/crates/daphne-worker/src/aggregator/router/extractor.rs b/crates/daphne-worker/src/aggregator/router/extractor.rs index 3572604c..dbf0cacc 100644 --- a/crates/daphne-worker/src/aggregator/router/extractor.rs +++ b/crates/daphne-worker/src/aggregator/router/extractor.rs @@ -727,17 +727,25 @@ mod test { helper_url: daphne::messages::taskprov::UrlBytes { bytes: b"http://helper".into(), }, - query_config: daphne::messages::taskprov::QueryConfig { - time_precision: 1, - max_batch_query_count: 1, - min_batch_size: 1, - batch_mode: daphne::messages::taskprov::BatchMode::TimeInterval, + time_precision: 1, + min_batch_size: 1, + query_config: daphne::messages::taskprov::QueryConfig::TimeInterval, + lifetime: match version { + DapVersion::Draft09 => { + daphne::messages::taskprov::TaskLifetime::Draft09 { expiration: 1 } + } + DapVersion::Latest => daphne::messages::taskprov::TaskLifetime::Latest { + start: 0, + duration: 1, + }, }, - task_expiration: 1, - vdaf_config: daphne::messages::taskprov::VdafConfig { - dp_config: daphne::messages::taskprov::DpConfig::None, - var: daphne::messages::taskprov::VdafTypeVar::Prio2 { dimension: 1 }, + vdaf_config: daphne::messages::taskprov::VdafConfig::Prio2 { dimension: 1 }, + extensions: Vec::new(), + draft09_max_batch_query_count: match version { + DapVersion::Draft09 => Some(1), + DapVersion::Latest => None, }, + draft09_dp_config: Some(daphne::messages::taskprov::DpConfig::None), }; let req = test::<()>( diff --git a/crates/daphne/src/lib.rs b/crates/daphne/src/lib.rs index d7499078..7c91e311 100644 --- a/crates/daphne/src/lib.rs +++ b/crates/daphne/src/lib.rs @@ -227,7 +227,7 @@ impl DapGlobalConfig { } } -/// DAP Query configuration. +/// DAP batch configuration. #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] #[cfg_attr(any(test, feature = "test-utils"), derive(deepsize::DeepSizeOf))] @@ -241,7 +241,8 @@ pub enum DapBatchMode { /// Aggregators are meant to stop aggregating reports when this limit is reached. LeaderSelected { #[serde(default)] - max_batch_size: Option, + #[serde(rename = "max_batch_size")] + draft09_max_batch_size: Option, }, } @@ -553,6 +554,9 @@ impl DapTaskParameters { now: Time, taskprov_config: roles::aggregator::TaskprovConfig<'_>, ) -> Result<(DapTaskConfig, TaskId, TaskprovAdvertisement), DapError> { + let not_before = now; + let not_after = now + 86400 * 14; // expires in two weeks + let taskprov_advertisement = messages::taskprov::TaskprovAdvertisement { task_info, leader_url: messages::taskprov::UrlBytes { @@ -561,16 +565,23 @@ impl DapTaskParameters { helper_url: messages::taskprov::UrlBytes { bytes: self.helper_url.to_string().into_bytes(), }, - query_config: messages::taskprov::QueryConfig { - time_precision: self.time_precision, - max_batch_query_count: 1, - min_batch_size: self.min_batch_size.try_into().unwrap(), - batch_mode: (&self.query).try_into()?, + time_precision: self.time_precision, + min_batch_size: self.min_batch_size.try_into().unwrap(), + query_config: (&self.query).try_into()?, + lifetime: messages::taskprov::TaskLifetime::from_validity_range( + self.version, + not_before, + not_after, + ), + vdaf_config: (&self.vdaf).try_into()?, + extensions: Vec::new(), + draft09_max_batch_query_count: match self.version { + DapVersion::Draft09 => Some(1), + DapVersion::Latest => None, }, - task_expiration: now + 86400 * 14, // expires in two weeks - vdaf_config: messages::taskprov::VdafConfig { - dp_config: messages::taskprov::DpConfig::None, - var: (&self.vdaf).try_into()?, + draft09_dp_config: match self.version { + DapVersion::Draft09 => Some(messages::taskprov::DpConfig::None), + DapVersion::Latest => None, }, }; @@ -736,7 +747,7 @@ impl DapTaskConfig { ) -> Result { match self.query { DapBatchMode::LeaderSelected { - max_batch_size: Some(max_batch_size), + draft09_max_batch_size: Some(max_batch_size), } => { if report_count > u64::from(max_batch_size.get()) { return Err(DapAbort::InvalidBatchSize { @@ -749,7 +760,7 @@ impl DapTaskConfig { } DapBatchMode::TimeInterval | DapBatchMode::LeaderSelected { - max_batch_size: None, + draft09_max_batch_size: None, } => (), }; diff --git a/crates/daphne/src/messages/taskprov.rs b/crates/daphne/src/messages/taskprov.rs index 15ab8bb1..166c0fc7 100644 --- a/crates/daphne/src/messages/taskprov.rs +++ b/crates/daphne/src/messages/taskprov.rs @@ -10,8 +10,8 @@ use crate::messages::{ use crate::pine::PineParam; use crate::{DapError, DapVersion}; use prio::codec::{ - decode_u8_items, encode_u8_items, CodecError, Decode, Encode, ParameterizedDecode, - ParameterizedEncode, + decode_u16_items, decode_u8_items, encode_u16_items, encode_u8_items, CodecError, Decode, + Encode, ParameterizedDecode, ParameterizedEncode, }; use serde::{Deserialize, Serialize}; use std::io::{Cursor, Read}; @@ -34,7 +34,7 @@ const DP_MECHANISM_NONE: u8 = 0x01; /// A VDAF type along with its type-specific data. #[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq)] -pub enum VdafTypeVar { +pub enum VdafConfig { Prio2 { dimension: u32, }, @@ -128,82 +128,168 @@ impl Decode for PineParam { } } -impl ParameterizedEncode for VdafTypeVar { +impl ParameterizedEncode for VdafConfig { fn encode_with_param( &self, - _version: &DapVersion, + version: &DapVersion, bytes: &mut Vec, ) -> Result<(), CodecError> { - match self { - Self::Prio2 { dimension } => { - VDAF_TYPE_PRIO2.encode(bytes)?; - dimension.encode(bytes)?; - } - Self::Prio3SumVecField64MultiproofHmacSha256Aes128 { - length, - bits, - chunk_length, - num_proofs, - } => { - VDAF_TYPE_PRIO3_SUM_VEC_FIELD64_MULTIPROOF_HMAC_SHA256_AES128.encode(bytes)?; - length.encode(bytes)?; - bits.encode(bytes)?; - chunk_length.encode(bytes)?; - num_proofs.encode(bytes)?; - } - Self::Pine32HmacSha256Aes128 { param } => { - VDAF_TYPE_PINE_FIELD32_HMAC_SHA256_AES128.encode(bytes)?; - param.encode(bytes)?; - } - Self::Pine64HmacSha256Aes128 { param } => { - VDAF_TYPE_PINE_FIELD64_HMAC_SHA256_AES128.encode(bytes)?; - param.encode(bytes)?; - } - Self::NotImplemented { typ, param } => { - typ.encode(bytes)?; - bytes.extend_from_slice(param); - } + match version { + DapVersion::Draft09 => match self { + Self::Prio2 { dimension } => { + VDAF_TYPE_PRIO2.encode(bytes)?; + dimension.encode(bytes)?; + } + Self::Prio3SumVecField64MultiproofHmacSha256Aes128 { + length, + bits, + chunk_length, + num_proofs, + } => { + VDAF_TYPE_PRIO3_SUM_VEC_FIELD64_MULTIPROOF_HMAC_SHA256_AES128.encode(bytes)?; + length.encode(bytes)?; + bits.encode(bytes)?; + chunk_length.encode(bytes)?; + num_proofs.encode(bytes)?; + } + Self::Pine32HmacSha256Aes128 { param } => { + VDAF_TYPE_PINE_FIELD32_HMAC_SHA256_AES128.encode(bytes)?; + param.encode(bytes)?; + } + Self::Pine64HmacSha256Aes128 { param } => { + VDAF_TYPE_PINE_FIELD64_HMAC_SHA256_AES128.encode(bytes)?; + param.encode(bytes)?; + } + Self::NotImplemented { typ, param } => { + typ.encode(bytes)?; + bytes.extend_from_slice(param); + } + }, + DapVersion::Latest => match self { + Self::Prio2 { dimension } => { + VDAF_TYPE_PRIO2.encode(bytes)?; + encode_u16_prefixed(*version, bytes, |_version, bytes| { + dimension.encode(bytes)?; + Ok(()) + })?; + } + Self::Prio3SumVecField64MultiproofHmacSha256Aes128 { + length, + bits, + chunk_length, + num_proofs, + } => { + VDAF_TYPE_PRIO3_SUM_VEC_FIELD64_MULTIPROOF_HMAC_SHA256_AES128.encode(bytes)?; + encode_u16_prefixed(*version, bytes, |_version, bytes| { + length.encode(bytes)?; + bits.encode(bytes)?; + chunk_length.encode(bytes)?; + num_proofs.encode(bytes)?; + Ok(()) + })?; + } + Self::Pine32HmacSha256Aes128 { param } => { + VDAF_TYPE_PINE_FIELD32_HMAC_SHA256_AES128.encode(bytes)?; + encode_u16_prefixed(*version, bytes, |_version, bytes| { + param.encode(bytes)?; + Ok(()) + })?; + } + Self::Pine64HmacSha256Aes128 { param } => { + VDAF_TYPE_PINE_FIELD64_HMAC_SHA256_AES128.encode(bytes)?; + encode_u16_prefixed(*version, bytes, |_version, bytes| { + param.encode(bytes)?; + Ok(()) + })?; + } + Self::NotImplemented { typ, param } => { + typ.encode(bytes)?; + encode_u16_prefixed(*version, bytes, |_version, bytes| { + bytes.extend_from_slice(param); + Ok(()) + })?; + } + }, }; Ok(()) } } -impl ParameterizedDecode<(DapVersion, Option)> for VdafTypeVar { +impl ParameterizedDecode<(DapVersion, Option)> for VdafConfig { fn decode_with_param( - (_version, bytes_left): &(DapVersion, Option), + (version, bytes_left): &(DapVersion, Option), bytes: &mut Cursor<&[u8]>, ) -> Result { - let vdaf_type = u32::decode(bytes)?; - match (bytes_left, vdaf_type) { - (.., VDAF_TYPE_PRIO2) => Ok(Self::Prio2 { - dimension: u32::decode(bytes)?, - }), - (.., VDAF_TYPE_PRIO3_SUM_VEC_FIELD64_MULTIPROOF_HMAC_SHA256_AES128) => { - Ok(Self::Prio3SumVecField64MultiproofHmacSha256Aes128 { - length: u32::decode(bytes)?, - bits: u8::decode(bytes)?, - chunk_length: u32::decode(bytes)?, - num_proofs: u8::decode(bytes)?, - }) - } - (.., VDAF_TYPE_PINE_FIELD32_HMAC_SHA256_AES128) => Ok(Self::Pine32HmacSha256Aes128 { - param: PineParam::decode(bytes)?, - }), - (.., VDAF_TYPE_PINE_FIELD64_HMAC_SHA256_AES128) => Ok(Self::Pine64HmacSha256Aes128 { - param: PineParam::decode(bytes)?, - }), - (Some(bytes_left), ..) => { - let mut param = vec![0; bytes_left - 4]; - bytes.read_exact(&mut param)?; - Ok(Self::NotImplemented { - typ: vdaf_type, - param, - }) - } - (None, ..) => Err(CodecError::Other( - "cannot decode VdafConfig variant without knowing the length of the remainder" - .into(), - )), + match version { + DapVersion::Draft09 => match (bytes_left, u32::decode(bytes)?) { + (.., VDAF_TYPE_PRIO2) => Ok(Self::Prio2 { + dimension: u32::decode(bytes)?, + }), + (.., VDAF_TYPE_PRIO3_SUM_VEC_FIELD64_MULTIPROOF_HMAC_SHA256_AES128) => { + Ok(Self::Prio3SumVecField64MultiproofHmacSha256Aes128 { + length: u32::decode(bytes)?, + bits: u8::decode(bytes)?, + chunk_length: u32::decode(bytes)?, + num_proofs: u8::decode(bytes)?, + }) + } + (.., VDAF_TYPE_PINE_FIELD32_HMAC_SHA256_AES128) => { + Ok(Self::Pine32HmacSha256Aes128 { + param: PineParam::decode(bytes)?, + }) + } + (.., VDAF_TYPE_PINE_FIELD64_HMAC_SHA256_AES128) => { + Ok(Self::Pine64HmacSha256Aes128 { + param: PineParam::decode(bytes)?, + }) + } + (Some(bytes_left), typ) => { + let mut param = vec![0; bytes_left - 4]; + bytes.read_exact(&mut param)?; + Ok(Self::NotImplemented { typ, param }) + } + (None, ..) => Err(CodecError::Other( + "cannot decode VdafConfig variant without knowing the length of the remainder" + .into(), + )), + }, + DapVersion::Latest => match u32::decode(bytes)? { + VDAF_TYPE_PRIO2 => { + decode_u16_prefixed(*version, bytes, |_version, bytes, _bytes_left| { + Ok(Self::Prio2 { + dimension: u32::decode(bytes)?, + }) + }) + } + VDAF_TYPE_PRIO3_SUM_VEC_FIELD64_MULTIPROOF_HMAC_SHA256_AES128 => { + decode_u16_prefixed(*version, bytes, |_version, bytes, _bytes_left| { + Ok(Self::Prio3SumVecField64MultiproofHmacSha256Aes128 { + length: u32::decode(bytes)?, + bits: u8::decode(bytes)?, + chunk_length: u32::decode(bytes)?, + num_proofs: u8::decode(bytes)?, + }) + }) + } + VDAF_TYPE_PINE_FIELD32_HMAC_SHA256_AES128 => { + decode_u16_prefixed(*version, bytes, |_version, bytes, _bytes_left| { + Ok(Self::Pine32HmacSha256Aes128 { + param: PineParam::decode(bytes)?, + }) + }) + } + VDAF_TYPE_PINE_FIELD64_HMAC_SHA256_AES128 => { + decode_u16_prefixed(*version, bytes, |_version, bytes, _bytes_left| { + Ok(Self::Pine64HmacSha256Aes128 { + param: PineParam::decode(bytes)?, + }) + }) + } + typ => Ok(Self::NotImplemented { + typ, + param: decode_u16_bytes(bytes)?, + }), + }, } } } @@ -253,47 +339,6 @@ impl ParameterizedDecode<(DapVersion, Option)> for DpConfig { } } -/// A VDAF configuration, made up from a differential privacy configuration, -/// a VDAF type, and type-specific configuration. -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] -pub struct VdafConfig { - pub dp_config: DpConfig, - pub var: VdafTypeVar, -} - -impl ParameterizedEncode for VdafConfig { - fn encode_with_param( - &self, - version: &DapVersion, - bytes: &mut Vec, - ) -> Result<(), CodecError> { - encode_u16_prefixed(*version, bytes, |_version, inner| { - self.dp_config.encode(inner) - })?; - self.var.encode_with_param(version, bytes)?; - Ok(()) - } -} - -impl ParameterizedDecode<(DapVersion, Option)> for VdafConfig { - fn decode_with_param( - (version, bytes_left): &(DapVersion, Option), - bytes: &mut Cursor<&[u8]>, - ) -> Result { - let prefix_start = bytes.position(); - let dp_config = decode_u16_prefixed(*version, bytes, |version, inner, bytes_left| { - DpConfig::decode_with_param(&(version, bytes_left), inner) - })?; - let prefix_len = usize::try_from(bytes.position() - prefix_start) - .map_err(|e| CodecError::Other(e.into()))?; - let bytes_left = bytes_left.map(|l| l - prefix_len); - Ok(Self { - dp_config, - var: VdafTypeVar::decode_with_param(&(*version, bytes_left), bytes)?, - }) - } -} - /// A URL encode / decode helper struct, essentially a box for /// a `Vec`. #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] @@ -317,36 +362,15 @@ impl Decode for UrlBytes { /// A `QueryConfig` type and its associated task configuration data. #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] -pub enum BatchMode { +pub enum QueryConfig { TimeInterval, - LeaderSelected { max_batch_size: Option }, - NotImplemented { mode: u8, param: Vec }, -} - -/// A query configuration. -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] -pub struct QueryConfig { - pub time_precision: Duration, - pub max_batch_query_count: u16, - pub min_batch_size: u32, - pub batch_mode: BatchMode, -} - -impl QueryConfig { - fn encode_batch_mode(&self, bytes: &mut Vec) -> Result<(), CodecError> { - match &self.batch_mode { - BatchMode::TimeInterval => { - BATCH_MODE_TIME_INTERVAL.encode(bytes)?; - } - BatchMode::LeaderSelected { .. } => { - BATCH_MODE_LEADER_SELECTED.encode(bytes)?; - } - BatchMode::NotImplemented { mode, .. } => { - mode.encode(bytes)?; - } - }; - Ok(()) - } + LeaderSelected { + draft09_max_batch_size: Option, + }, + NotImplemented { + mode: u8, + param: Vec, + }, } impl ParameterizedEncode for QueryConfig { @@ -355,27 +379,49 @@ impl ParameterizedEncode for QueryConfig { version: &DapVersion, bytes: &mut Vec, ) -> Result<(), CodecError> { - self.time_precision.encode(bytes)?; - if *version == DapVersion::Draft09 { - self.max_batch_query_count.encode(bytes)?; + match version { + DapVersion::Draft09 => match self { + Self::TimeInterval => { + BATCH_MODE_TIME_INTERVAL.encode(bytes)?; + } + Self::LeaderSelected { + draft09_max_batch_size, + } => { + BATCH_MODE_LEADER_SELECTED.encode(bytes)?; + match draft09_max_batch_size { + Some(ref max_batch_size) => max_batch_size.get().encode(bytes)?, + None => 0_u32.encode(bytes)?, + } + } + Self::NotImplemented { mode, ref param } => { + mode.encode(bytes)?; + bytes.extend_from_slice(param); + } + }, + DapVersion::Latest => match self { + Self::TimeInterval => { + BATCH_MODE_TIME_INTERVAL.encode(bytes)?; + encode_u16_prefixed(*version, bytes, |_, _| Ok(()))?; + } + Self::LeaderSelected { + draft09_max_batch_size: None, + } => { + BATCH_MODE_LEADER_SELECTED.encode(bytes)?; + encode_u16_prefixed(*version, bytes, |_, _| Ok(()))?; + } + Self::LeaderSelected { + draft09_max_batch_size: Some(_), + } => { + return Err(CodecError::Other( + "expected max batch size to not be set".into(), + )) + } + Self::NotImplemented { mode, ref param } => { + mode.encode(bytes)?; + encode_u16_bytes(bytes, param)?; + } + }, } - self.min_batch_size.encode(bytes)?; - self.encode_batch_mode(bytes)?; - match &self.batch_mode { - BatchMode::TimeInterval => (), - BatchMode::LeaderSelected { max_batch_size } => { - match version { - DapVersion::Draft09 => match max_batch_size { - Some(x) => x.get().encode(bytes)?, - None => 0u32.encode(bytes)?, - }, - DapVersion::Latest => (), - }; - } - BatchMode::NotImplemented { mode: _, param } => { - bytes.extend_from_slice(param); - } - }; Ok(()) } } @@ -385,46 +431,97 @@ impl ParameterizedDecode<(DapVersion, Option)> for QueryConfig { (version, bytes_left): &(DapVersion, Option), bytes: &mut Cursor<&[u8]>, ) -> Result { - let time_precision = Duration::decode(bytes)?; - let max_batch_query_count = match version { - DapVersion::Draft09 => u16::decode(bytes)?, - DapVersion::Latest => 1, - }; - let fixed_size = match version { - DapVersion::Draft09 => 15, - DapVersion::Latest => 13, - }; - let min_batch_size = u32::decode(bytes)?; - let batch_mode_number = u8::decode(bytes)?; - let batch_mode = - match (bytes_left, batch_mode_number) { - (.., BATCH_MODE_TIME_INTERVAL) => BatchMode::TimeInterval, - (.., BATCH_MODE_LEADER_SELECTED) => BatchMode::LeaderSelected { - max_batch_size: match version { - DapVersion::Draft09 => NonZeroU32::new(u32::decode(bytes)?), - DapVersion::Latest => None, - }, - }, - (Some(bytes_left), ..) => { - let mut param = vec![0; bytes_left - fixed_size]; + match (version, bytes_left) { + (DapVersion::Draft09, Some(bytes_left)) => match u8::decode(bytes)? { + BATCH_MODE_TIME_INTERVAL => Ok(Self::TimeInterval), + BATCH_MODE_LEADER_SELECTED => Ok(Self::LeaderSelected { + draft09_max_batch_size: NonZeroU32::new(u32::decode(bytes)?), + }), + mode => { + let mut param = vec![0; *bytes_left - 1]; bytes.read_exact(&mut param)?; - - BatchMode::NotImplemented { - mode: batch_mode_number, - param, - } + Ok(Self::NotImplemented { mode, param }) } - (None, ..) => return Err(CodecError::Other( - "cannot decode QueryConfig variant without knowing the length of the remainder" - .into(), - )), - }; + }, + (DapVersion::Draft09, None) => Err(CodecError::Other( + "draft 09: can't decode query config without knowing the number of bytes remaining" + .into(), + )), + (DapVersion::Latest, _) => match u8::decode(bytes)? { + BATCH_MODE_TIME_INTERVAL => { + decode_u16_prefixed(*version, bytes, |_, _, _| Ok(()))?; + Ok(Self::TimeInterval) + } + BATCH_MODE_LEADER_SELECTED => { + decode_u16_prefixed(*version, bytes, |_, _, _| Ok(()))?; + Ok(Self::LeaderSelected { + draft09_max_batch_size: None, + }) + } + mode => Ok(Self::NotImplemented { + mode, + param: decode_u16_bytes(bytes)?, + }), + }, + } + } +} - Ok(Self { - time_precision, - max_batch_query_count, - min_batch_size, - batch_mode, +/// Task lifetime parameters. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub enum TaskLifetime { + Latest { + /// Task start time. + start: Time, + /// Task duration. + duration: Duration, + }, + /// draft09 compatibility: Previously the DAP task parameters (and thus Taskprov) only + /// expressed an end time and not a start time. + Draft09 { expiration: Time }, +} + +impl TaskLifetime { + pub(crate) fn from_validity_range( + version: DapVersion, + not_before: Time, + not_after: Time, + ) -> Self { + match version { + DapVersion::Draft09 => Self::Draft09 { + expiration: not_after, + }, + DapVersion::Latest => Self::Latest { + start: not_before, + duration: not_after - not_before, + }, + } + } +} + +/// Taskprov extensions. +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub enum TaskprovExtension { + NotImplemented { typ: u16, payload: Vec }, +} + +impl Encode for TaskprovExtension { + fn encode(&self, bytes: &mut Vec) -> Result<(), CodecError> { + match self { + Self::NotImplemented { typ, payload } => { + typ.encode(bytes)?; + encode_u16_bytes(bytes, payload)?; + } + }; + Ok(()) + } +} + +impl Decode for TaskprovExtension { + fn decode(bytes: &mut Cursor<&[u8]>) -> Result { + Ok(Self::NotImplemented { + typ: u16::decode(bytes)?, + payload: decode_u16_bytes(bytes)?, }) } } @@ -435,9 +532,14 @@ pub struct TaskprovAdvertisement { pub task_info: Vec, pub leader_url: UrlBytes, pub helper_url: UrlBytes, + pub time_precision: Duration, + pub min_batch_size: u32, pub query_config: QueryConfig, - pub task_expiration: Time, + pub lifetime: TaskLifetime, pub vdaf_config: VdafConfig, + pub extensions: Vec, + pub draft09_max_batch_query_count: Option, + pub draft09_dp_config: Option, } impl TaskprovAdvertisement { @@ -486,16 +588,60 @@ impl ParameterizedEncode for TaskprovAdvertisement { version: &DapVersion, bytes: &mut Vec, ) -> Result<(), CodecError> { - encode_u8_items(bytes, &(), &self.task_info)?; - self.leader_url.encode(bytes)?; - self.helper_url.encode(bytes)?; - encode_u16_prefixed(*version, bytes, |version, inner| { - self.query_config.encode_with_param(&version, inner) - })?; - self.task_expiration.encode(bytes)?; - encode_u16_prefixed(*version, bytes, |version, inner| { - self.vdaf_config.encode_with_param(&version, inner) - })?; + match version { + DapVersion::Draft09 => { + encode_u8_items(bytes, &(), &self.task_info)?; + self.leader_url.encode(bytes)?; + self.helper_url.encode(bytes)?; + encode_u16_prefixed(*version, bytes, |version, bytes| { + self.time_precision.encode(bytes)?; + let Some(max_batch_query_count) = self.draft09_max_batch_query_count else { + return Err(CodecError::Other( + "max batch query count should be set".into(), + )); + }; + max_batch_query_count.encode(bytes)?; + self.min_batch_size.encode(bytes)?; + self.query_config.encode_with_param(&version, bytes)?; + Ok(()) + })?; + let TaskLifetime::Draft09 { expiration } = self.lifetime else { + return Err(CodecError::Other("task expiration should be set".into())); + }; + expiration.encode(bytes)?; + encode_u16_prefixed(*version, bytes, |version, bytes| { + encode_u16_prefixed(version, bytes, |_version, bytes| { + let Some(ref dp_config) = self.draft09_dp_config else { + return Err(CodecError::Other("dp config should be set".into())); + }; + dp_config.encode(bytes)?; + Ok(()) + })?; + self.vdaf_config.encode_with_param(&version, bytes) + })?; + if !self.extensions.is_empty() { + return Err(CodecError::Other("extensions field should be empty".into())); + } + } + + DapVersion::Latest => { + encode_u8_items(bytes, &(), &self.task_info)?; + self.leader_url.encode(bytes)?; + self.helper_url.encode(bytes)?; + self.time_precision.encode(bytes)?; + self.min_batch_size.encode(bytes)?; + self.query_config.encode_with_param(version, bytes)?; + let TaskLifetime::Latest { start, duration } = self.lifetime else { + return Err(CodecError::Other( + "task start time and duration should be set".into(), + )); + }; + start.encode(bytes)?; + duration.encode(bytes)?; + self.vdaf_config.encode_with_param(version, bytes)?; + encode_u16_items(bytes, &(), &self.extensions)?; + } + } Ok(()) } } @@ -505,35 +651,96 @@ impl ParameterizedDecode for TaskprovAdvertisement { version: &DapVersion, bytes: &mut Cursor<&[u8]>, ) -> Result { - let task_info = decode_u8_items(&(), bytes)?; - let leader_url = UrlBytes::decode(bytes)?; - let helper_url = UrlBytes::decode(bytes)?; - let query_config = decode_u16_prefixed(*version, bytes, |version, inner, len| { - // We need to know the length of the `QueryConfig` in order to decode variants we don't - // recognize. Likewise for `VdafConfig` below. - // - // Ideally the message can be decoded without knowing the length of the remainder. This - // is not possible because of taskprov's choice to prefix the `QueryConfig` with its - // length, rather than prefix the variant part (everything after the "select"). We - // could modify taskprov so that the length prefix immediately precedes the bits that - // we don't know how to parse. This would be consistent with other protocols that use - // TLS syntax. We could also consider dropping TLS syntax in the DAP spec in favor of a - // format that is better at being self-describing. - QueryConfig::decode_with_param(&(version, len), inner) - })?; - let task_expiration = Time::decode(bytes)?; - let vdaf_config = decode_u16_prefixed(*version, bytes, |version, inner, len| { - VdafConfig::decode_with_param(&(version, len), inner) - })?; + match version { + DapVersion::Draft09 => { + let task_info = decode_u8_items(&(), bytes)?; + let leader_url = UrlBytes::decode(bytes)?; + let helper_url = UrlBytes::decode(bytes)?; + let (time_precision, draft09_max_batch_query_count, min_batch_size, query_config) = + decode_u16_prefixed(*version, bytes, |version, bytes, mut bytes_left| { + let time_precision = Time::decode(bytes)?; + let max_batch_query_count = u16::decode(bytes)?; + let min_batch_size = u32::decode(bytes)?; + // We need to know the length of the `QueryConfig` in order to decode variants we don't + // recognize. Likewise for `VdafConfig` below. + // + // Ideally the message can be decoded without knowing the length of the remainder. This + // is not possible because of taskprov's choice to prefix the `QueryConfig` with its + // length, rather than prefix the variant part (everything after the "select"). We + // could modify taskprov so that the length prefix immediately precedes the bits that + // we don't know how to parse. This would be consistent with other protocols that use + // TLS syntax. We could also consider dropping TLS syntax in the DAP spec in favor of a + // format that is better at being self-describing. + bytes_left = bytes_left.map(|l| l - 14); + let query_config = + QueryConfig::decode_with_param(&(version, bytes_left), bytes)?; + Ok(( + time_precision, + Some(max_batch_query_count), + min_batch_size, + query_config, + )) + })?; + let lifetime = TaskLifetime::Draft09 { + expiration: Time::decode(bytes)?, + }; + let (draft09_dp_config, vdaf_config) = + decode_u16_prefixed(*version, bytes, |version, bytes, mut bytes_left| { + let dp_config_bytes = decode_u16_bytes(bytes)?; + let dp_config = DpConfig::get_decoded_with_param( + &(version, Some(dp_config_bytes.len())), + &dp_config_bytes, + )?; + bytes_left = bytes_left.map(|len| len - dp_config_bytes.len()); + let vdaf_config = + VdafConfig::decode_with_param(&(version, bytes_left), bytes)?; + Ok((Some(dp_config), vdaf_config)) + })?; + + Ok(TaskprovAdvertisement { + task_info, + leader_url, + helper_url, + time_precision, + min_batch_size, + query_config, + lifetime, + vdaf_config, + extensions: Vec::new(), + draft09_dp_config, + draft09_max_batch_query_count, + }) + } - Ok(TaskprovAdvertisement { - task_info, - leader_url, - helper_url, - query_config, - task_expiration, - vdaf_config, - }) + DapVersion::Latest => { + let task_info = decode_u8_items(&(), bytes)?; + let leader_url = UrlBytes::decode(bytes)?; + let helper_url = UrlBytes::decode(bytes)?; + let time_precision = Duration::decode(bytes)?; + let min_batch_size = u32::decode(bytes)?; + let query_config = QueryConfig::decode_with_param(&(*version, None), bytes)?; + let lifetime = TaskLifetime::Latest { + start: Time::decode(bytes)?, + duration: Duration::decode(bytes)?, + }; + let vdaf_config = VdafConfig::decode_with_param(&(*version, None), bytes)?; + let extensions = decode_u16_items(&(), bytes)?; + + Ok(TaskprovAdvertisement { + task_info, + leader_url, + helper_url, + time_precision, + min_batch_size, + query_config, + lifetime, + vdaf_config, + extensions, + draft09_dp_config: None, + draft09_max_batch_query_count: None, + }) + } + } } } @@ -552,24 +759,36 @@ mod tests { helper_url: UrlBytes { bytes: b"https://someservice.cloudflareresearch.com".to_vec(), }, - query_config: QueryConfig { - time_precision: 12_341_234, - max_batch_query_count: match version { - DapVersion::Draft09 => 1337, - DapVersion::Latest => 1, + time_precision: 12_341_234, + min_batch_size: 55, + query_config: QueryConfig::LeaderSelected { + draft09_max_batch_size: match version { + DapVersion::Draft09 => Some(NonZeroU32::new(57).unwrap()), + DapVersion::Latest => None, }, - min_batch_size: 55, - batch_mode: BatchMode::LeaderSelected { - max_batch_size: match version { - DapVersion::Draft09 => Some(NonZeroU32::new(57).unwrap()), - DapVersion::Latest => None, + }, + lifetime: TaskLifetime::from_validity_range(version, 23_232_232_232, 23_232_232_232), + vdaf_config: VdafConfig::Prio2 { dimension: 99_999 }, + extensions: match version { + DapVersion::Latest => vec![ + TaskprovExtension::NotImplemented { + typ: 1337, + payload: b"collecter.com".to_vec(), }, - }, + TaskprovExtension::NotImplemented { + typ: 42, + payload: b"hey, don't forget the differential privacy!!".to_vec(), + }, + ], + DapVersion::Draft09 => Vec::new(), }, - task_expiration: 23_232_232_232, - vdaf_config: VdafConfig { - dp_config: DpConfig::None, - var: VdafTypeVar::Prio2 { dimension: 99_999 }, + draft09_max_batch_query_count: match version { + DapVersion::Draft09 => Some(1337), + DapVersion::Latest => None, + }, + draft09_dp_config: match version { + DapVersion::Draft09 => Some(DpConfig::None), + DapVersion::Latest => None, }, }; println!("want {:?}", want.get_encoded_with_param(&version).unwrap()); @@ -591,8 +810,12 @@ mod tests { 101, 46, 99, 111, 109, 47, 118, 48, 50, 0, 42, 104, 116, 116, 112, 115, 58, 47, 47, 115, 111, 109, 101, 115, 101, 114, 118, 105, 99, 101, 46, 99, 108, 111, 117, 100, 102, 108, 97, 114, 101, 114, 101, 115, 101, 97, 114, 99, 104, 46, 99, 111, 109, 0, - 13, 0, 0, 0, 0, 0, 188, 79, 242, 0, 0, 0, 55, 2, 0, 0, 0, 5, 104, 191, 187, 40, 0, - 11, 0, 1, 1, 255, 255, 0, 0, 0, 1, 134, 159, + 0, 0, 0, 0, 188, 79, 242, 0, 0, 0, 55, 2, 0, 0, 0, 0, 0, 5, 104, 191, 187, 40, 0, + 0, 0, 0, 0, 0, 0, 0, 255, 255, 0, 0, 0, 4, 0, 1, 134, 159, 0, 65, 5, 57, 0, 13, 99, + 111, 108, 108, 101, 99, 116, 101, 114, 46, 99, 111, 109, 0, 42, 0, 44, 104, 101, + 121, 44, 32, 100, 111, 110, 39, 116, 32, 102, 111, 114, 103, 101, 116, 32, 116, + 104, 101, 32, 100, 105, 102, 102, 101, 114, 101, 110, 116, 105, 97, 108, 32, 112, + 114, 105, 118, 97, 99, 121, 33, 33, ] .as_slice(), }; @@ -604,15 +827,7 @@ mod tests { test_versions! { read_task_config } fn roundtrip_query_config(version: DapVersion) { - let query_config = QueryConfig { - time_precision: 12_345_678, - max_batch_query_count: match version { - DapVersion::Draft09 => 1337, - DapVersion::Latest => 1, - }, - min_batch_size: 12_345_678, - batch_mode: BatchMode::TimeInterval, - }; + let query_config = QueryConfig::TimeInterval; let encoded = query_config.get_encoded_with_param(&version).unwrap(); assert_eq!( @@ -620,18 +835,10 @@ mod tests { query_config ); - let query_config = QueryConfig { - time_precision: 12_345_678, - max_batch_query_count: match version { - DapVersion::Draft09 => 1337, - DapVersion::Latest => 1, - }, - min_batch_size: 12_345_678, - batch_mode: BatchMode::LeaderSelected { - max_batch_size: match version { - DapVersion::Draft09 => Some(NonZeroU32::new(12_345_678).unwrap()), - DapVersion::Latest => None, - }, + let query_config = QueryConfig::LeaderSelected { + draft09_max_batch_size: match version { + DapVersion::Draft09 => Some(NonZeroU32::new(12_345_678).unwrap()), + DapVersion::Latest => None, }, }; let encoded = query_config.get_encoded_with_param(&version).unwrap(); @@ -645,14 +852,9 @@ mod tests { test_versions! { roundtrip_query_config } fn roundtrip_query_config_not_implemented(version: DapVersion) { - let query_config = QueryConfig { - time_precision: 12_345_678, - max_batch_query_count: 1, - min_batch_size: 12_345_678, - batch_mode: BatchMode::NotImplemented { - mode: 0, - param: b"query config param".to_vec(), - }, + let query_config = QueryConfig::NotImplemented { + mode: 0, + param: b"query config param".to_vec(), }; let encoded = query_config.get_encoded_with_param(&version).unwrap(); @@ -693,10 +895,7 @@ mod tests { test_versions! { roundtrip_dp_config_not_implemented } fn roundtrip_vdaf_config_prio2(version: DapVersion) { - let vdaf_config = VdafConfig { - dp_config: DpConfig::None, - var: VdafTypeVar::Prio2 { dimension: 1337 }, - }; + let vdaf_config = VdafConfig::Prio2 { dimension: 1337 }; assert_eq!( VdafConfig::get_decoded_with_param( &(version, None), @@ -712,14 +911,11 @@ mod tests { fn roundtrip_vdaf_config_prio3_sum_vec_field64_multiproof_hmac_sha256_aes128( version: DapVersion, ) { - let vdaf_config = VdafConfig { - dp_config: DpConfig::None, - var: VdafTypeVar::Prio3SumVecField64MultiproofHmacSha256Aes128 { - bits: 23, - length: 1337, - chunk_length: 42, - num_proofs: 99, - }, + let vdaf_config = VdafConfig::Prio3SumVecField64MultiproofHmacSha256Aes128 { + bits: 23, + length: 1337, + chunk_length: 42, + num_proofs: 99, }; let encoded = vdaf_config.get_encoded_with_param(&version).unwrap(); @@ -732,20 +928,17 @@ mod tests { test_versions! { roundtrip_vdaf_config_prio3_sum_vec_field64_multiproof_hmac_sha256_aes128 } fn roundtrip_vdaf_config_pine32_hmac_sha256_aes128(version: DapVersion) { - let vdaf_config = VdafConfig { - dp_config: DpConfig::None, - var: VdafTypeVar::Pine32HmacSha256Aes128 { - param: PineParam { - norm_bound: 1337, - dimension: 1_000_000, - frac_bits: 15, - chunk_len: 999, - chunk_len_sq_norm_equal: 1400, - num_proofs: 15, - num_proofs_sq_norm_equal: 1, - num_wr_tests: 50, - num_wr_successes: 17, - }, + let vdaf_config = VdafConfig::Pine32HmacSha256Aes128 { + param: PineParam { + norm_bound: 1337, + dimension: 1_000_000, + frac_bits: 15, + chunk_len: 999, + chunk_len_sq_norm_equal: 1400, + num_proofs: 15, + num_proofs_sq_norm_equal: 1, + num_wr_tests: 50, + num_wr_successes: 17, }, }; let encoded = vdaf_config.get_encoded_with_param(&version).unwrap(); @@ -759,20 +952,17 @@ mod tests { test_versions! { roundtrip_vdaf_config_pine32_hmac_sha256_aes128 } fn roundtrip_vdaf_config_pine64_hmac_sha256_aes128(version: DapVersion) { - let vdaf_config = VdafConfig { - dp_config: DpConfig::None, - var: VdafTypeVar::Pine64HmacSha256Aes128 { - param: PineParam { - norm_bound: 1337, - dimension: 1_000_000, - frac_bits: 15, - chunk_len: 999, - chunk_len_sq_norm_equal: 1400, - num_proofs: 15, - num_proofs_sq_norm_equal: 17, - num_wr_tests: 50, - num_wr_successes: 17, - }, + let vdaf_config = VdafConfig::Pine64HmacSha256Aes128 { + param: PineParam { + norm_bound: 1337, + dimension: 1_000_000, + frac_bits: 15, + chunk_len: 999, + chunk_len_sq_norm_equal: 1400, + num_proofs: 15, + num_proofs_sq_norm_equal: 17, + num_wr_tests: 50, + num_wr_successes: 17, }, }; let encoded = vdaf_config.get_encoded_with_param(&version).unwrap(); @@ -794,24 +984,25 @@ mod tests { helper_url: UrlBytes { bytes: b"https://someservice.cloudflareresearch.com".to_vec(), }, - query_config: QueryConfig { - time_precision: 12_341_234, - max_batch_query_count: match version { - DapVersion::Draft09 => 1337, - DapVersion::Latest => 1, - }, - min_batch_size: 55, - batch_mode: BatchMode::LeaderSelected { - max_batch_size: match version { - DapVersion::Draft09 => Some(NonZeroU32::new(57).unwrap()), - DapVersion::Latest => None, - }, + time_precision: 12_341_234, + min_batch_size: 55, + query_config: QueryConfig::LeaderSelected { + draft09_max_batch_size: match version { + DapVersion::Draft09 => Some(NonZeroU32::new(57).unwrap()), + DapVersion::Latest => None, }, }, - task_expiration: 23_232_232_232, - vdaf_config: VdafConfig { - dp_config: DpConfig::None, - var: VdafTypeVar::Prio2 { dimension: 99_999 }, + lifetime: TaskLifetime::from_validity_range(version, 23_232_232_232, 23_232_232_232), + vdaf_config: VdafConfig::Prio2 { dimension: 99_999 }, + extensions: Vec::new(), + draft09_max_batch_query_count: match version { + DapVersion::Draft09 => Some(1337), + DapVersion::Latest => None, + }, + + draft09_dp_config: match version { + DapVersion::Draft09 => Some(DpConfig::None), + DapVersion::Latest => None, }, }; @@ -831,12 +1022,9 @@ mod tests { test_versions! { roundtrip_taskprov_advertisement } fn roundtrip_vdaf_config_not_implemented(version: DapVersion) { - let vdaf_config = VdafConfig { - dp_config: DpConfig::None, - var: VdafTypeVar::NotImplemented { - typ: 1337, - param: b"vdaf type param".to_vec(), - }, + let vdaf_config = VdafConfig::NotImplemented { + typ: 1337, + param: b"vdaf type param".to_vec(), }; let encoded = vdaf_config.get_encoded_with_param(&version).unwrap(); @@ -857,24 +1045,24 @@ mod tests { helper_url: UrlBytes { bytes: b"https://someservice.cloudflareresearch.com".to_vec(), }, - query_config: QueryConfig { - time_precision: 12_341_234, - max_batch_query_count: match version { - DapVersion::Draft09 => 1337, - DapVersion::Latest => 1, - }, - min_batch_size: 55, - batch_mode: BatchMode::LeaderSelected { - max_batch_size: match version { - DapVersion::Draft09 => Some(NonZeroU32::new(57).unwrap()), - DapVersion::Latest => None, - }, + time_precision: 12_341_234, + min_batch_size: 55, + query_config: QueryConfig::LeaderSelected { + draft09_max_batch_size: match version { + DapVersion::Draft09 => Some(NonZeroU32::new(57).unwrap()), + DapVersion::Latest => None, }, }, - task_expiration: 23_232_232_232, - vdaf_config: VdafConfig { - dp_config: DpConfig::None, - var: VdafTypeVar::Prio2 { dimension: 99_999 }, + lifetime: TaskLifetime::from_validity_range(version, 23_232_232_232, 23_232_232_232), + vdaf_config: VdafConfig::Prio2 { dimension: 99_999 }, + extensions: Vec::new(), + draft09_max_batch_query_count: match version { + DapVersion::Draft09 => Some(1337), + DapVersion::Latest => None, + }, + draft09_dp_config: match version { + DapVersion::Draft09 => Some(DpConfig::None), + DapVersion::Latest => None, }, }; diff --git a/crates/daphne/src/roles/mod.rs b/crates/daphne/src/roles/mod.rs index 56c3df98..ecc27711 100644 --- a/crates/daphne/src/roles/mod.rs +++ b/crates/daphne/src/roles/mod.rs @@ -243,7 +243,10 @@ mod test { not_after: now + Self::TASK_TIME_PRECISION, min_batch_size: 1, query: DapBatchMode::LeaderSelected { - max_batch_size: Some(NonZeroU32::new(2).unwrap()), + draft09_max_batch_size: match version { + DapVersion::Draft09 => Some(NonZeroU32::new(2).unwrap()), + DapVersion::Latest => None, + }, }, vdaf: vdaf_config, vdaf_verify_key: vdaf_config.gen_verify_key(), @@ -1451,7 +1454,10 @@ mod test { version, min_batch_size: 1, query: DapBatchMode::LeaderSelected { - max_batch_size: Some(NonZeroU32::new(2).unwrap()), + draft09_max_batch_size: match version { + DapVersion::Draft09 => Some(NonZeroU32::new(2).unwrap()), + DapVersion::Latest => None, + }, }, vdaf: vdaf_config, ..Default::default() diff --git a/crates/daphne/src/taskprov.rs b/crates/daphne/src/taskprov.rs index cb5a0081..247df18a 100644 --- a/crates/daphne/src/taskprov.rs +++ b/crates/daphne/src/taskprov.rs @@ -11,11 +11,7 @@ use crate::{ error::DapAbort, fatal_error, hpke::HpkeConfig, - messages::{ - self, - taskprov::{BatchMode, TaskprovAdvertisement, VdafTypeVar}, - Duration, TaskId, Time, - }, + messages::{self, taskprov::TaskprovAdvertisement, Duration, TaskId, Time}, roles::aggregator::TaskprovConfig, vdaf::VdafVerifyKey, DapBatchMode, DapError, DapTaskConfig, DapTaskConfigMethod, DapVersion, VdafConfig, @@ -117,16 +113,23 @@ fn url_from_bytes(task_id: &TaskId, url_bytes: &[u8]) -> Result { } impl DapBatchMode { - fn try_from_taskprov_advertisement(task_id: &TaskId, var: BatchMode) -> Result { + fn try_from_taskprov_advertisement( + task_id: &TaskId, + var: messages::taskprov::QueryConfig, + ) -> Result { match var { - BatchMode::LeaderSelected { max_batch_size } => { - Ok(DapBatchMode::LeaderSelected { max_batch_size }) - } - BatchMode::TimeInterval => Ok(DapBatchMode::TimeInterval), - BatchMode::NotImplemented { mode, .. } => Err(DapAbort::InvalidTask { - detail: format!("unimplemented batch mode ({mode})"), - task_id: *task_id, + messages::taskprov::QueryConfig::LeaderSelected { + draft09_max_batch_size, + } => Ok(DapBatchMode::LeaderSelected { + draft09_max_batch_size, }), + messages::taskprov::QueryConfig::TimeInterval => Ok(DapBatchMode::TimeInterval), + messages::taskprov::QueryConfig::NotImplemented { mode, .. } => { + Err(DapAbort::InvalidTask { + detail: format!("unimplemented batch mode ({mode})"), + task_id: *task_id, + }) + } } } } @@ -166,12 +169,12 @@ impl VdafConfig { fn try_from_taskprov_advertisement( task_id: &TaskId, version: DapVersion, - var: VdafTypeVar, + vdaf_config: messages::taskprov::VdafConfig, ) -> Result { const PRIO3_MAX_PROOFS: u8 = 3; - match (version, var) { - (_, VdafTypeVar::Prio2 { dimension }) => Ok(VdafConfig::Prio2 { + match (version, vdaf_config) { + (_, messages::taskprov::VdafConfig::Prio2 { dimension }) => Ok(VdafConfig::Prio2 { dimension: dimension.try_into().map_err(|_| DapAbort::InvalidTask { detail: "dimension is larger than the system's word size".to_string(), task_id: *task_id, @@ -179,7 +182,7 @@ impl VdafConfig { }), ( DapVersion::Draft09, - VdafTypeVar::Prio3SumVecField64MultiproofHmacSha256Aes128 { + messages::taskprov::VdafConfig::Prio3SumVecField64MultiproofHmacSha256Aes128 { bits, length, chunk_length, @@ -212,7 +215,10 @@ impl VdafConfig { }, )) } - (DapVersion::Draft09, VdafTypeVar::Pine32HmacSha256Aes128 { param }) => { + ( + DapVersion::Draft09, + messages::taskprov::VdafConfig::Pine32HmacSha256Aes128 { param }, + ) => { if let Err(e) = pine32_hmac_sha256_aes128(¶m) { Err(DapAbort::InvalidTask { detail: format!("invalid parameters for Pine32: {e}"), @@ -229,7 +235,10 @@ impl VdafConfig { })) } } - (DapVersion::Draft09, VdafTypeVar::Pine64HmacSha256Aes128 { param }) => { + ( + DapVersion::Draft09, + messages::taskprov::VdafConfig::Pine64HmacSha256Aes128 { param }, + ) => { if let Err(e) = pine64_hmac_sha256_aes128(¶m) { Err(DapAbort::InvalidTask { detail: format!("invalid parameters for Pine64: {e}"), @@ -246,10 +255,12 @@ impl VdafConfig { })) } } - (_, VdafTypeVar::NotImplemented { typ, .. }) => Err(DapAbort::InvalidTask { - detail: format!("unimplemented VDAF type ({typ})"), - task_id: *task_id, - }), + (_, messages::taskprov::VdafConfig::NotImplemented { typ, .. }) => { + Err(DapAbort::InvalidTask { + detail: format!("unimplemented VDAF type ({typ})"), + task_id: *task_id, + }) + } (_, _) => Err(DapAbort::InvalidTask { detail: format!("VDAF not supported in {version}"), task_id: *task_id, @@ -283,11 +294,21 @@ pub struct DapTaskConfigNeedsOptIn { pub(crate) collector_hpke_config: HpkeConfig, pub(crate) method: DapTaskConfigMethod, - /// The time at which the task expires. - pub task_expiration: Time, + /// Lifetime of the task. + /// + /// draft09: Only the expiration date is conveyed by the taskprov advertisement. + pub lifetime: messages::taskprov::TaskLifetime, } impl DapTaskConfigNeedsOptIn { + /// Return the time after which the task is no longer valid. + pub fn not_after(&self) -> Time { + match self.lifetime { + messages::taskprov::TaskLifetime::Draft09 { expiration } => expiration, + messages::taskprov::TaskLifetime::Latest { start, duration } => start + duration, + } + } + pub(crate) fn try_from_taskprov_advertisement( version: DapVersion, task_id: &TaskId, @@ -295,11 +316,14 @@ impl DapTaskConfigNeedsOptIn { taskprov_config: TaskprovConfig<'_>, ) -> Result { // Only one query per batch is currently supported. - if taskprov_advertisement.query_config.max_batch_query_count != 1 { + if !matches!( + taskprov_advertisement.draft09_max_batch_query_count, + None | Some(1) + ) { return Err(DapAbort::InvalidTask { detail: format!( - "unsupported max batch query count {}", - taskprov_advertisement.query_config.max_batch_query_count + "unsupported max batch query count {:?}", + taskprov_advertisement.draft09_max_batch_query_count ), task_id: *task_id, }); @@ -308,7 +332,7 @@ impl DapTaskConfigNeedsOptIn { let vdaf = VdafConfig::try_from_taskprov_advertisement( task_id, version, - taskprov_advertisement.vdaf_config.var, + taskprov_advertisement.vdaf_config, )?; let vdaf_verify_key = compute_vdaf_verify_key(version, taskprov_config.vdaf_verify_key_init, task_id); @@ -316,12 +340,12 @@ impl DapTaskConfigNeedsOptIn { version, leader_url: url_from_bytes(task_id, &taskprov_advertisement.leader_url.bytes)?, helper_url: url_from_bytes(task_id, &taskprov_advertisement.helper_url.bytes)?, - time_precision: taskprov_advertisement.query_config.time_precision, - task_expiration: taskprov_advertisement.task_expiration, - min_batch_size: taskprov_advertisement.query_config.min_batch_size.into(), + time_precision: taskprov_advertisement.time_precision, + lifetime: taskprov_advertisement.lifetime, + min_batch_size: taskprov_advertisement.min_batch_size.into(), query: DapBatchMode::try_from_taskprov_advertisement( task_id, - taskprov_advertisement.query_config.batch_mode, + taskprov_advertisement.query_config, )?, vdaf, vdaf_verify_key, @@ -334,6 +358,17 @@ impl DapTaskConfigNeedsOptIn { /// Complete configuration of a task via taskprov using the supplied parameters. pub fn into_opted_in(self, param: &OptInParam) -> DapTaskConfig { + let (not_before, not_after) = match self.lifetime { + messages::taskprov::TaskLifetime::Latest { start, duration } => { + (start, start.saturating_add(duration)) + } + // draft09 compatibility: Previously the task start time was not conveyed by the + // taskprov advertisement, so we need to get this value from the opt-in parameters. + messages::taskprov::TaskLifetime::Draft09 { expiration } => { + (param.not_before, expiration) + } + }; + DapTaskConfig { version: self.version, leader_url: self.leader_url, @@ -342,8 +377,8 @@ impl DapTaskConfigNeedsOptIn { min_batch_size: self.min_batch_size, query: self.query, vdaf: self.vdaf, - not_before: param.not_before, - not_after: self.task_expiration, + not_before, + not_after, vdaf_verify_key: self.vdaf_verify_key, collector_hpke_config: self.collector_hpke_config, method: self.method, @@ -352,22 +387,22 @@ impl DapTaskConfigNeedsOptIn { } } -impl TryFrom<&DapBatchMode> for messages::taskprov::BatchMode { +impl TryFrom<&DapBatchMode> for messages::taskprov::QueryConfig { type Error = DapError; fn try_from(query_config: &DapBatchMode) -> Result { Ok(match query_config { - DapBatchMode::TimeInterval => messages::taskprov::BatchMode::TimeInterval, - DapBatchMode::LeaderSelected { max_batch_size } => { - messages::taskprov::BatchMode::LeaderSelected { - max_batch_size: *max_batch_size, - } - } + DapBatchMode::TimeInterval => messages::taskprov::QueryConfig::TimeInterval, + DapBatchMode::LeaderSelected { + draft09_max_batch_size, + } => messages::taskprov::QueryConfig::LeaderSelected { + draft09_max_batch_size: *draft09_max_batch_size, + }, }) } } -impl TryFrom<&VdafConfig> for messages::taskprov::VdafTypeVar { +impl TryFrom<&VdafConfig> for messages::taskprov::VdafConfig { type Error = DapError; fn try_from(vdaf_config: &VdafConfig) -> Result { @@ -428,18 +463,26 @@ impl TryFrom<&DapTaskConfig> for messages::taskprov::TaskprovAdvertisement { helper_url: messages::taskprov::UrlBytes { bytes: task_config.helper_url.to_string().into_bytes(), }, - query_config: messages::taskprov::QueryConfig { - time_precision: task_config.time_precision, - min_batch_size: task_config.min_batch_size.try_into().map_err(|_| { - fatal_error!(err = "task min batch size is too large for taskprov") - })?, - max_batch_query_count: 1, - batch_mode: (&task_config.query).try_into()?, + time_precision: task_config.time_precision, + min_batch_size: task_config + .min_batch_size + .try_into() + .map_err(|_| fatal_error!(err = "task min batch size is too large for taskprov"))?, + query_config: (&task_config.query).try_into()?, + lifetime: messages::taskprov::TaskLifetime::from_validity_range( + task_config.version, + task_config.not_before, + task_config.not_after, + ), + vdaf_config: (&task_config.vdaf).try_into()?, + extensions: Vec::new(), + draft09_max_batch_query_count: match task_config.version { + DapVersion::Draft09 => Some(1), + DapVersion::Latest => None, }, - task_expiration: task_config.not_after, - vdaf_config: messages::taskprov::VdafConfig { - dp_config: messages::taskprov::DpConfig::None, - var: (&task_config.vdaf).try_into()?, + draft09_dp_config: match task_config.version { + DapVersion::Draft09 => Some(messages::taskprov::DpConfig::None), + DapVersion::Latest => None, }, }) } @@ -472,18 +515,24 @@ mod test { helper_url: messages::taskprov::UrlBytes { bytes: b"http://helper.org:8788/".to_vec(), }, - query_config: messages::taskprov::QueryConfig { - time_precision: 3600, - max_batch_query_count: 1, - min_batch_size: 1, - batch_mode: messages::taskprov::BatchMode::LeaderSelected { - max_batch_size: Some(NonZeroU32::new(2).unwrap()), + time_precision: 3600, + min_batch_size: 1, + query_config: messages::taskprov::QueryConfig::LeaderSelected { + draft09_max_batch_size: match version { + DapVersion::Draft09 => Some(NonZeroU32::new(2).unwrap()), + DapVersion::Latest => None, }, }, - task_expiration: 1337, - vdaf_config: messages::taskprov::VdafConfig { - dp_config: messages::taskprov::DpConfig::None, - var: messages::taskprov::VdafTypeVar::Prio2 { dimension: 10 }, + lifetime: messages::taskprov::TaskLifetime::from_validity_range(version, 1337, 1337), + vdaf_config: messages::taskprov::VdafConfig::Prio2 { dimension: 10 }, + extensions: Vec::new(), + draft09_max_batch_query_count: match version { + DapVersion::Draft09 => Some(1), + DapVersion::Latest => None, + }, + draft09_dp_config: match version { + DapVersion::Draft09 => Some(messages::taskprov::DpConfig::None), + DapVersion::Latest => None, }, }; @@ -551,22 +600,28 @@ mod test { helper_url: messages::taskprov::UrlBytes { bytes: b"http://helper.org:8788/".to_vec(), }, - query_config: messages::taskprov::QueryConfig { - time_precision: 3600, - max_batch_query_count: 1, - min_batch_size: 1, - batch_mode: messages::taskprov::BatchMode::LeaderSelected { - max_batch_size: Some(NonZeroU32::new(2).unwrap()), + time_precision: 3600, + min_batch_size: 1, + query_config: messages::taskprov::QueryConfig::LeaderSelected { + draft09_max_batch_size: match version { + DapVersion::Draft09 => Some(NonZeroU32::new(2).unwrap()), + DapVersion::Latest => None, }, }, - task_expiration: 0, - vdaf_config: messages::taskprov::VdafConfig { - dp_config: messages::taskprov::DpConfig::None, - // unrecognized VDAF - var: messages::taskprov::VdafTypeVar::NotImplemented { - typ: 1337, - param: b"vdaf type param".to_vec(), - }, + lifetime: messages::taskprov::TaskLifetime::from_validity_range(version, 0, 0), + // unrecognized VDAF + vdaf_config: messages::taskprov::VdafConfig::NotImplemented { + typ: 1337, + param: b"vdaf type param".to_vec(), + }, + extensions: Vec::new(), + draft09_max_batch_query_count: match version { + DapVersion::Draft09 => Some(1), + DapVersion::Latest => None, + }, + draft09_dp_config: match version { + DapVersion::Draft09 => Some(messages::taskprov::DpConfig::None), + DapVersion::Latest => None, }, }; let task_id = { @@ -618,21 +673,27 @@ mod test { helper_url: messages::taskprov::UrlBytes { bytes: b"http://helper.org:8788/".to_vec(), }, - query_config: messages::taskprov::QueryConfig { - time_precision: 3600, - max_batch_query_count: 1, - min_batch_size: 1, - batch_mode: messages::taskprov::BatchMode::LeaderSelected { - max_batch_size: Some(NonZeroU32::new(2).unwrap()), + time_precision: 3600, + min_batch_size: 1, + query_config: messages::taskprov::QueryConfig::LeaderSelected { + draft09_max_batch_size: match version { + DapVersion::Draft09 => Some(NonZeroU32::new(2).unwrap()), + DapVersion::Latest => None, }, }, - task_expiration: 0, - vdaf_config: messages::taskprov::VdafConfig { - dp_config: messages::taskprov::DpConfig::NotImplemented { + lifetime: messages::taskprov::TaskLifetime::from_validity_range(version, 0, 0), + vdaf_config: messages::taskprov::VdafConfig::Prio2 { dimension: 1337 }, + extensions: Vec::new(), + draft09_max_batch_query_count: match version { + DapVersion::Draft09 => Some(1), + DapVersion::Latest => None, + }, + draft09_dp_config: match version { + DapVersion::Draft09 => Some(messages::taskprov::DpConfig::NotImplemented { typ: 99, param: b"Just, do it!".to_vec(), - }, - var: messages::taskprov::VdafTypeVar::Prio2 { dimension: 1337 }, + }), + DapVersion::Latest => None, }, }; let task_id = {