Skip to content

Commit

Permalink
Fix accumulated values for backdated queries
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaReiser committed Dec 31, 2024
1 parent 88a1d77 commit 37b9d6e
Show file tree
Hide file tree
Showing 14 changed files with 206 additions and 41 deletions.
4 changes: 2 additions & 2 deletions src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use accumulated_map::AccumulatedMap;

use crate::{
cycle::CycleRecoveryStrategy,
ingredient::{fmt_index, Ingredient, Jar},
ingredient::{fmt_index, Ingredient, Jar, MaybeChangedAfter},
plumbing::JarAux,
zalsa::IngredientIndex,
zalsa_local::QueryOrigin,
Expand Down Expand Up @@ -106,7 +106,7 @@ impl<A: Accumulator> Ingredient for IngredientImpl<A> {
_db: &dyn Database,
_input: Option<Id>,
_revision: Revision,
) -> bool {
) -> MaybeChangedAfter {
panic!("nothing should ever depend on an accumulator directly")
}

Expand Down
39 changes: 32 additions & 7 deletions src/accumulator/accumulated_map.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::ops::{BitOr, BitOrAssign};

use crossbeam::atomic::AtomicCell;
use rustc_hash::FxHashMap;

use crate::IngredientIndex;
Expand All @@ -10,7 +13,7 @@ pub struct AccumulatedMap {

/// [`InputAccumulatedValues::Empty`] if any input read during the query's execution
/// has any direct or indirect accumulated values.
inputs: InputAccumulatedValues,
inputs: AtomicCell<InputAccumulatedValues>,
}

impl AccumulatedMap {
Expand All @@ -22,18 +25,22 @@ impl AccumulatedMap {
}

/// Adds the accumulated state of an input to this accumulated map.
pub(crate) fn add_input(&mut self, input: InputAccumulatedValues) {
pub(crate) fn add_input(&self, input: InputAccumulatedValues) {
if input.is_any() {
self.inputs = InputAccumulatedValues::Any;
self.inputs.store(InputAccumulatedValues::Any);
}
}

pub(crate) fn set_inputs(&self, input: InputAccumulatedValues) {
self.inputs.store(input);
}

/// Returns whether an input of the associated query has any accumulated values.
///
/// Note: Use [`InputAccumulatedValues::from_map`] to check if the associated query itself
/// or any of its inputs has accumulated values.
pub(crate) fn inputs(&self) -> InputAccumulatedValues {
self.inputs
self.inputs.load()
}

pub fn extend_with_accumulated<A: Accumulator>(
Expand All @@ -60,7 +67,7 @@ impl Clone for AccumulatedMap {
.iter()
.map(|(&key, value)| (key, value.cloned()))
.collect(),
inputs: self.inputs,
inputs: AtomicCell::new(self.inputs.load()),
}
}
}
Expand All @@ -70,7 +77,7 @@ impl Clone for AccumulatedMap {
/// Knowning whether any input has accumulated values makes aggregating the accumulated values
/// cheaper because we can skip over entire subtrees.
#[derive(Copy, Clone, Debug, Default)]
pub(crate) enum InputAccumulatedValues {
pub enum InputAccumulatedValues {
/// The query nor any of its inputs have any accumulated values.
#[default]
Empty,
Expand All @@ -82,7 +89,7 @@ pub(crate) enum InputAccumulatedValues {
impl InputAccumulatedValues {
pub(crate) fn from_map(accumulated: &AccumulatedMap) -> Self {
if accumulated.map.is_empty() {
accumulated.inputs
accumulated.inputs.load()
} else {
Self::Any
}
Expand All @@ -96,3 +103,21 @@ impl InputAccumulatedValues {
matches!(self, Self::Empty)
}
}

impl BitOr for InputAccumulatedValues {
type Output = Self;

fn bitor(self, rhs: Self) -> Self::Output {
if rhs.is_any() {
InputAccumulatedValues::Any
} else {
self
}
}
}

impl BitOrAssign for InputAccumulatedValues {
fn bitor_assign(&mut self, rhs: Self) {
*self = *self | rhs;
}
}
4 changes: 2 additions & 2 deletions src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{any::Any, fmt, sync::Arc};
use crate::{
accumulator::accumulated_map::AccumulatedMap,
cycle::CycleRecoveryStrategy,
ingredient::fmt_index,
ingredient::{fmt_index, MaybeChangedAfter},
key::DatabaseKeyIndex,
plumbing::JarAux,
salsa_struct::SalsaStructInDb,
Expand Down Expand Up @@ -194,7 +194,7 @@ where
db: &dyn Database,
input: Option<Id>,
revision: Revision,
) -> bool {
) -> MaybeChangedAfter {
let key = input.unwrap();
let db = db.as_view::<C::DbView>();
self.maybe_changed_after(db, key, revision)
Expand Down
52 changes: 41 additions & 11 deletions src/function/maybe_changed_after.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{
accumulator::accumulated_map::InputAccumulatedValues,
ingredient::MaybeChangedAfter,
key::DatabaseKeyIndex,
zalsa::{Zalsa, ZalsaDatabase},
zalsa_local::{ActiveQueryGuard, EdgeKind, QueryOrigin},
Expand All @@ -16,7 +18,7 @@ where
db: &'db C::DbView,
id: Id,
revision: Revision,
) -> bool {
) -> MaybeChangedAfter {
let (zalsa, zalsa_local) = db.zalsas();
zalsa_local.unwind_if_revision_cancelled(db.as_dyn_database());

Expand All @@ -29,7 +31,11 @@ where
let memo_guard = self.get_memo_from_table_for(zalsa, id);
if let Some(memo) = &memo_guard {
if self.shallow_verify_memo(db, zalsa, database_key_index, memo) {
return memo.revisions.changed_at > revision;
return if memo.revisions.changed_at > revision {
MaybeChangedAfter::Yes
} else {
MaybeChangedAfter::No(memo.revisions.accumulated.inputs())
};
}
drop(memo_guard); // release the arc-swap guard before cold path
if let Some(mcs) = self.maybe_changed_after_cold(db, id, revision) {
Expand All @@ -39,7 +45,7 @@ where
}
} else {
// No memo? Assume has changed.
return true;
return MaybeChangedAfter::Yes;
}
}
}
Expand All @@ -49,7 +55,7 @@ where
db: &'db C::DbView,
key_index: Id,
revision: Revision,
) -> Option<bool> {
) -> Option<MaybeChangedAfter> {
let (zalsa, zalsa_local) = db.zalsas();
let database_key_index = self.database_key_index(key_index);

Expand All @@ -63,7 +69,7 @@ where

// Load the current memo, if any.
let Some(old_memo) = self.get_memo_from_table_for(zalsa, key_index) else {
return Some(true);
return Some(MaybeChangedAfter::Yes);
};

tracing::debug!(
Expand All @@ -74,7 +80,11 @@ where

// Check if the inputs are still valid and we can just compare `changed_at`.
if self.deep_verify_memo(db, &old_memo, &active_query) {
return Some(old_memo.revisions.changed_at > revision);
return Some(if old_memo.revisions.changed_at > revision {
MaybeChangedAfter::Yes
} else {
MaybeChangedAfter::No(old_memo.revisions.accumulated.inputs())
});
}

// If inputs have changed, but we have an old value, we can re-execute.
Expand All @@ -84,11 +94,18 @@ where
if old_memo.value.is_some() {
let memo = self.execute(db, active_query, Some(old_memo));
let changed_at = memo.revisions.changed_at;
return Some(changed_at > revision);

return Some(if changed_at > revision {
MaybeChangedAfter::Yes
} else {
MaybeChangedAfter::No(
memo.revisions.accumulated.inputs() | InputAccumulatedValues::from_map(&memo.revisions.accumulated)
)
});
}

// Otherwise, nothing for it: have to consider the value to have changed.
Some(true)
Some(MaybeChangedAfter::Yes)
}

/// True if the memo's value and `changed_at` time is still valid in this revision.
Expand Down Expand Up @@ -117,7 +134,12 @@ where
if memo.check_durability(zalsa) {
// No input of the suitable durability has changed since last verified.
let db = db.as_dyn_database();
memo.mark_as_verified(db, revision_now, database_key_index);
memo.mark_as_verified(
db,
revision_now,
database_key_index,
memo.revisions.accumulated.inputs(),
);
memo.mark_outputs_as_verified(db, database_key_index);
return true;
}
Expand Down Expand Up @@ -151,6 +173,8 @@ where
return true;
}

let mut inputs = InputAccumulatedValues::default();

match &old_memo.revisions.origin {
QueryOrigin::Assigned(_) => {
// If the value was assigneed by another query,
Expand Down Expand Up @@ -185,10 +209,15 @@ where
for &(edge_kind, dependency_index) in edges.input_outputs.iter() {
match edge_kind {
EdgeKind::Input => {
if dependency_index
match dependency_index
.maybe_changed_after(db.as_dyn_database(), last_verified_at)
{
return false;
MaybeChangedAfter::Yes => {
return false;
}
MaybeChangedAfter::No(input_accumulated) => {
inputs |= input_accumulated;
}
}
}
EdgeKind::Output => {
Expand Down Expand Up @@ -220,6 +249,7 @@ where
db.as_dyn_database(),
zalsa.current_revision(),
database_key_index,
inputs,
);
true
}
Expand Down
3 changes: 3 additions & 0 deletions src/function/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

use crossbeam::atomic::AtomicCell;

use crate::accumulator::accumulated_map::InputAccumulatedValues;
use crate::zalsa_local::QueryOrigin;
use crate::{
key::DatabaseKeyIndex, zalsa::Zalsa, zalsa_local::QueryRevisions, Event, EventKind, Id,
Expand Down Expand Up @@ -143,6 +144,7 @@ impl<V> Memo<V> {
db: &dyn crate::Database,
revision_now: Revision,
database_key_index: DatabaseKeyIndex,
accumulated: InputAccumulatedValues,
) {
db.salsa_event(&|| Event {
thread_id: std::thread::current().id(),
Expand All @@ -152,6 +154,7 @@ impl<V> Memo<V> {
});

self.verified_at.store(revision_now);
self.revisions.accumulated.set_inputs(accumulated);
}

pub(super) fn mark_outputs_as_verified(
Expand Down
2 changes: 2 additions & 0 deletions src/function/specify.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crossbeam::atomic::AtomicCell;

use crate::{
accumulator::accumulated_map::InputAccumulatedValues,
tracked_struct::TrackedStructInDb,
zalsa::ZalsaDatabase,
zalsa_local::{QueryOrigin, QueryRevisions},
Expand Down Expand Up @@ -127,6 +128,7 @@ where
db.as_dyn_database(),
zalsa.current_revision(),
database_key_index,
InputAccumulatedValues::Empty,
);
}
}
24 changes: 22 additions & 2 deletions src/ingredient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
};

use crate::{
accumulator::accumulated_map::AccumulatedMap,
accumulator::accumulated_map::{AccumulatedMap, InputAccumulatedValues},
cycle::CycleRecoveryStrategy,
zalsa::{IngredientIndex, MemoIngredientIndex},
zalsa_local::QueryOrigin,
Expand Down Expand Up @@ -61,7 +61,7 @@ pub trait Ingredient: Any + std::fmt::Debug + Send + Sync {
db: &'db dyn Database,
input: Option<Id>,
revision: Revision,
) -> bool;
) -> MaybeChangedAfter;

/// What were the inputs (if any) that were used to create the value at `key_index`.
fn origin(&self, db: &dyn Database, key_index: Id) -> Option<QueryOrigin>;
Expand Down Expand Up @@ -172,3 +172,23 @@ pub(crate) fn fmt_index(
write!(fmt, "{debug_name}()")
}
}

#[derive(Copy, Clone, Debug)]
pub enum MaybeChangedAfter {
/// The query result hasn't changed.
///
/// The inner value tracks whether the memo or any of its dependencies have an accumulated value.
No(InputAccumulatedValues),

/// The query's result has changed since the last revision or the query isn't cached yet.
Yes,
}

impl From<bool> for MaybeChangedAfter {
fn from(value: bool) -> Self {
match value {
true => MaybeChangedAfter::Yes,
false => MaybeChangedAfter::No(InputAccumulatedValues::Empty),
}
}
}
6 changes: 3 additions & 3 deletions src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
accumulator::accumulated_map::InputAccumulatedValues,
cycle::CycleRecoveryStrategy,
id::{AsId, FromId},
ingredient::{fmt_index, Ingredient},
ingredient::{fmt_index, Ingredient, MaybeChangedAfter},
key::{DatabaseKeyIndex, DependencyIndex},
plumbing::{Jar, JarAux, Stamp},
table::{memo::MemoTable, sync::SyncTable, Slot, Table},
Expand Down Expand Up @@ -222,10 +222,10 @@ impl<C: Configuration> Ingredient for IngredientImpl<C> {
_db: &dyn Database,
_input: Option<Id>,
_revision: Revision,
) -> bool {
) -> MaybeChangedAfter {
// Input ingredients are just a counter, they store no data, they are immortal.
// Their *fields* are stored in function ingredients elsewhere.
false
MaybeChangedAfter::No(InputAccumulatedValues::Empty)
}

fn cycle_recovery_strategy(&self) -> CycleRecoveryStrategy {
Expand Down
7 changes: 4 additions & 3 deletions src/input/input_field.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::cycle::CycleRecoveryStrategy;
use crate::ingredient::{fmt_index, Ingredient};
use crate::ingredient::{fmt_index, Ingredient, MaybeChangedAfter};
use crate::input::Configuration;
use crate::zalsa::IngredientIndex;
use crate::zalsa_local::QueryOrigin;
Expand Down Expand Up @@ -54,11 +54,12 @@ where
db: &dyn Database,
input: Option<Id>,
revision: Revision,
) -> bool {
) -> MaybeChangedAfter {
let zalsa = db.zalsa();
let input = input.unwrap();
let value = <IngredientImpl<C>>::data(zalsa, input);
value.stamps[self.field_index].changed_at > revision

MaybeChangedAfter::from(value.stamps[self.field_index].changed_at > revision)
}

fn origin(&self, _db: &dyn Database, _key_index: Id) -> Option<QueryOrigin> {
Expand Down
Loading

0 comments on commit 37b9d6e

Please sign in to comment.