Skip to content

Commit

Permalink
Update taskprov advertisement encoding for latest draft
Browse files Browse the repository at this point in the history
  • Loading branch information
cjpatton committed Jan 15, 2025
1 parent b01c4e4 commit 053e895
Show file tree
Hide file tree
Showing 15 changed files with 785 additions and 552 deletions.
5 changes: 4 additions & 1 deletion crates/dapf/src/acceptance/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,10 @@ impl Test {
lifetime: 60,
min_batch_size: reports_per_batch.try_into().unwrap(),
query: DapBatchMode::LeaderSelected {
max_batch_size: NonZeroU32::new(reports_per_batch.try_into().unwrap()),
draft09_max_batch_size: match version {
DapVersion::Draft09 => NonZeroU32::new(reports_per_batch.try_into().unwrap()),
DapVersion::Latest => None,
},
},
vdaf: self.vdaf_config,
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion crates/dapf/src/cli_parsers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl FromStr for CliDapBatchMode {
Ok(Self(DapBatchMode::TimeInterval))
} else if let Some(size) = s.strip_prefix("leader-selected") {
Ok(Self(DapBatchMode::LeaderSelected {
max_batch_size: if let Some(size) = size.strip_prefix("-") {
draft09_max_batch_size: if let Some(size) = size.strip_prefix("-") {
Some(
size.parse()
.map_err(|e| format!("{s} is an invalid max batch size: {e:?}"))?,
Expand Down
6 changes: 4 additions & 2 deletions crates/dapf/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ async fn handle_test_routes(action: TestAction, http_client: HttpClient) -> anyh
let CliDapBatchMode(query) = use_or_request_from_user_or_default(
query,
|| DapBatchMode::LeaderSelected {
max_batch_size: None,
draft09_max_batch_size: None,
},
"query",
)?;
Expand Down Expand Up @@ -885,7 +885,9 @@ async fn handle_test_routes(action: TestAction, http_client: HttpClient) -> anyh
)?,
max_batch_size: match query {
DapBatchMode::TimeInterval => None,
DapBatchMode::LeaderSelected { max_batch_size } => max_batch_size,
DapBatchMode::LeaderSelected {
draft09_max_batch_size,
} => draft09_max_batch_size,
},
time_precision: use_or_request_from_user_or_default(
time_precision,
Expand Down
2 changes: 1 addition & 1 deletion crates/daphne-server/src/roles/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl DapAggregator for crate::App {
num_agg_span_shards: global_config.default_num_agg_span_shards,
})
},
Some(task_config.task_expiration),
Some(task_config.not_after()),
)
.await
.map_err(|e| match &*e {
Expand Down
4 changes: 3 additions & 1 deletion crates/daphne-server/src/roles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ mod test_utils {
err = "command failed: unexpected max batch size"
))
}
(2, max_batch_size) => DapBatchMode::LeaderSelected { max_batch_size },
(2, max_batch_size) => DapBatchMode::LeaderSelected {
draft09_max_batch_size: max_batch_size,
},
_ => {
return Err(fatal_error!(
err = "command failed: unrecognized batch mode"
Expand Down
26 changes: 17 additions & 9 deletions crates/daphne-server/src/router/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -671,17 +671,25 @@ mod test {
helper_url: daphne::messages::taskprov::UrlBytes {
bytes: b"http://helper".into(),
},
query_config: daphne::messages::taskprov::QueryConfig {
time_precision: 1,
max_batch_query_count: 1,
min_batch_size: 1,
batch_mode: daphne::messages::taskprov::BatchMode::TimeInterval,
time_precision: 1,
min_batch_size: 1,
query_config: daphne::messages::taskprov::QueryConfig::TimeInterval,
lifetime: match version {
DapVersion::Draft09 => {
daphne::messages::taskprov::TaskLifetime::Draft09 { expiration: 1 }
}
DapVersion::Latest => daphne::messages::taskprov::TaskLifetime::Latest {
start: 0,
duration: 1,
},
},
task_expiration: 1,
vdaf_config: daphne::messages::taskprov::VdafConfig {
dp_config: daphne::messages::taskprov::DpConfig::None,
var: daphne::messages::taskprov::VdafTypeVar::Prio2 { dimension: 1 },
vdaf_config: daphne::messages::taskprov::VdafConfig::Prio2 { dimension: 1 },
extensions: Vec::new(),
draft09_max_batch_query_count: match version {
DapVersion::Draft09 => Some(1),
DapVersion::Latest => None,
},
draft09_dp_config: Some(daphne::messages::taskprov::DpConfig::None),
};

let req = test::<()>(
Expand Down
63 changes: 0 additions & 63 deletions crates/daphne-server/tests/e2e/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,69 +478,6 @@ async fn leader_upload_taskprov() {
.unwrap();
}

async fn leader_upload_taskprov_wrong_version(version: DapVersion) {
let wrong_version = match version {
DapVersion::Draft09 => DapVersion::Latest,
DapVersion::Latest => DapVersion::Draft09,
};
let method = match version {
DapVersion::Draft09 => &http::Method::PUT,
DapVersion::Latest => &http::Method::POST,
};
let t = TestRunner::default_with_version(version).await;
let client = t.http_client();
let hpke_config_list = t.get_hpke_configs(version, client).await.unwrap();

let (task_config, task_id, taskprov_advertisement) = DapTaskParameters {
version,
min_batch_size: 10,
query: DapBatchMode::TimeInterval,
leader_url: t.task_config.leader_url.clone(),
helper_url: t.task_config.helper_url.clone(),
..Default::default()
}
.to_config_with_taskprov(
b"cool task".to_vec(),
t.now,
daphne::roles::aggregator::TaskprovConfig {
hpke_collector_config: &t.taskprov_collector_hpke_receiver.config,
vdaf_verify_key_init: &t.taskprov_vdaf_verify_key_init,
},
)
.unwrap();

let report = task_config
.vdaf
.produce_report_with_extensions(
&hpke_config_list,
t.now,
&task_id,
DapMeasurement::U32Vec(vec![1; 10]),
vec![Extension::Taskprov],
version,
)
.unwrap();
t.leader_request_expect_abort(
client,
None,
&format!("tasks/{}/reports", task_id.to_base64url()),
method,
DapMediaType::Report,
Some(
&taskprov_advertisement
.serialize_to_header_value(wrong_version)
.unwrap(),
),
report.get_encoded_with_param(&version).unwrap(),
400,
"unrecognizedTask",
)
.await
.unwrap();
}

async_test_versions!(leader_upload_taskprov_wrong_version);

async fn internal_leader_process(version: DapVersion) {
let t = TestRunner::default_with_version(version).await;
let path = t.upload_path();
Expand Down
9 changes: 7 additions & 2 deletions crates/daphne-server/tests/e2e/test_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ impl TestRunner {
Self::with(
version,
&DapBatchMode::LeaderSelected {
max_batch_size: Some(NonZeroU32::new(MAX_BATCH_SIZE).unwrap()),
draft09_max_batch_size: match version {
DapVersion::Draft09 => NonZeroU32::new(MAX_BATCH_SIZE),
DapVersion::Latest => None,
},
},
)
.await
Expand Down Expand Up @@ -178,7 +181,9 @@ impl TestRunner {

let (batch_mode, max_batch_size) = match t.task_config.query {
DapBatchMode::TimeInterval => (1, None),
DapBatchMode::LeaderSelected { max_batch_size } => (2, Some(max_batch_size)),
DapBatchMode::LeaderSelected {
draft09_max_batch_size,
} => (2, draft09_max_batch_size),
};

const MAX_ATTEMPTS: usize = 10;
Expand Down
2 changes: 1 addition & 1 deletion crates/daphne-worker/src/aggregator/roles/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl DapAggregator for App {
num_agg_span_shards: global_config.default_num_agg_span_shards,
})
},
Some(task_config.task_expiration),
Some(task_config.not_after()),
),
)
.await
Expand Down
4 changes: 3 additions & 1 deletion crates/daphne-worker/src/aggregator/roles/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,9 @@ mod test_utils {
err = "command failed: unexpected max batch size"
))
}
(2, max_batch_size) => DapBatchMode::LeaderSelected { max_batch_size },
(2, max_batch_size) => DapBatchMode::LeaderSelected {
draft09_max_batch_size: max_batch_size,
},
_ => {
return Err(fatal_error!(
err = "command failed: unrecognized batch mode"
Expand Down
26 changes: 17 additions & 9 deletions crates/daphne-worker/src/aggregator/router/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -727,17 +727,25 @@ mod test {
helper_url: daphne::messages::taskprov::UrlBytes {
bytes: b"http://helper".into(),
},
query_config: daphne::messages::taskprov::QueryConfig {
time_precision: 1,
max_batch_query_count: 1,
min_batch_size: 1,
batch_mode: daphne::messages::taskprov::BatchMode::TimeInterval,
time_precision: 1,
min_batch_size: 1,
query_config: daphne::messages::taskprov::QueryConfig::TimeInterval,
lifetime: match version {
DapVersion::Draft09 => {
daphne::messages::taskprov::TaskLifetime::Draft09 { expiration: 1 }
}
DapVersion::Latest => daphne::messages::taskprov::TaskLifetime::Latest {
start: 0,
duration: 1,
},
},
task_expiration: 1,
vdaf_config: daphne::messages::taskprov::VdafConfig {
dp_config: daphne::messages::taskprov::DpConfig::None,
var: daphne::messages::taskprov::VdafTypeVar::Prio2 { dimension: 1 },
vdaf_config: daphne::messages::taskprov::VdafConfig::Prio2 { dimension: 1 },
extensions: Vec::new(),
draft09_max_batch_query_count: match version {
DapVersion::Draft09 => Some(1),
DapVersion::Latest => None,
},
draft09_dp_config: Some(daphne::messages::taskprov::DpConfig::None),
};

let req = test::<()>(
Expand Down
37 changes: 24 additions & 13 deletions crates/daphne/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl DapGlobalConfig {
}
}

/// DAP Query configuration.
/// DAP batch configuration.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
#[cfg_attr(any(test, feature = "test-utils"), derive(deepsize::DeepSizeOf))]
Expand All @@ -241,7 +241,8 @@ pub enum DapBatchMode {
/// Aggregators are meant to stop aggregating reports when this limit is reached.
LeaderSelected {
#[serde(default)]
max_batch_size: Option<NonZeroU32>,
#[serde(rename = "max_batch_size")]
draft09_max_batch_size: Option<NonZeroU32>,
},
}

Expand Down Expand Up @@ -553,6 +554,9 @@ impl DapTaskParameters {
now: Time,
taskprov_config: roles::aggregator::TaskprovConfig<'_>,
) -> Result<(DapTaskConfig, TaskId, TaskprovAdvertisement), DapError> {
let not_before = now;
let not_after = now + 86400 * 14; // expires in two weeks

let taskprov_advertisement = messages::taskprov::TaskprovAdvertisement {
task_info,
leader_url: messages::taskprov::UrlBytes {
Expand All @@ -561,16 +565,23 @@ impl DapTaskParameters {
helper_url: messages::taskprov::UrlBytes {
bytes: self.helper_url.to_string().into_bytes(),
},
query_config: messages::taskprov::QueryConfig {
time_precision: self.time_precision,
max_batch_query_count: 1,
min_batch_size: self.min_batch_size.try_into().unwrap(),
batch_mode: (&self.query).try_into()?,
time_precision: self.time_precision,
min_batch_size: self.min_batch_size.try_into().unwrap(),
query_config: (&self.query).try_into()?,
lifetime: messages::taskprov::TaskLifetime::from_validity_range(
self.version,
not_before,
not_after,
),
vdaf_config: (&self.vdaf).try_into()?,
extensions: Vec::new(),
draft09_max_batch_query_count: match self.version {
DapVersion::Draft09 => Some(1),
DapVersion::Latest => None,
},
task_expiration: now + 86400 * 14, // expires in two weeks
vdaf_config: messages::taskprov::VdafConfig {
dp_config: messages::taskprov::DpConfig::None,
var: (&self.vdaf).try_into()?,
draft09_dp_config: match self.version {
DapVersion::Draft09 => Some(messages::taskprov::DpConfig::None),
DapVersion::Latest => None,
},
};

Expand Down Expand Up @@ -736,7 +747,7 @@ impl DapTaskConfig {
) -> Result<bool, DapAbort> {
match self.query {
DapBatchMode::LeaderSelected {
max_batch_size: Some(max_batch_size),
draft09_max_batch_size: Some(max_batch_size),
} => {
if report_count > u64::from(max_batch_size.get()) {
return Err(DapAbort::InvalidBatchSize {
Expand All @@ -749,7 +760,7 @@ impl DapTaskConfig {
}
DapBatchMode::TimeInterval
| DapBatchMode::LeaderSelected {
max_batch_size: None,
draft09_max_batch_size: None,
} => (),
};

Expand Down
Loading

0 comments on commit 053e895

Please sign in to comment.