Skip to content

Commit

Permalink
Improve performance of taskdb::apply_operations (#533)
Browse files Browse the repository at this point in the history
The existing implementation performed a `txn.get_task` and
`txn.set_task` for each update operation. But typically, a sequence of
operations will all apply to the same task, so most of these calls are
redundant.

This introduces a small cache of tasks to which some update operations
have been applied (or which are known to not exist), avoiding the
redundant calls.
  • Loading branch information
djmitche authored Jan 13, 2025
1 parent c18edf1 commit 0e911d9
Showing 1 changed file with 169 additions and 6 deletions.
175 changes: 169 additions & 6 deletions src/taskdb/apply.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::errors::{Error, Result};
use crate::operation::Operation;
use crate::server::SyncOp;
use crate::storage::StorageTxn;
use crate::storage::{StorageTxn, TaskMap};
use crate::Operations;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use uuid::Uuid;

/// Apply `operations` to the database in the given single transaction.
///
Expand All @@ -11,37 +14,80 @@ use crate::Operations;
///
/// The transaction is not committed.
pub(super) fn apply_operations(txn: &mut dyn StorageTxn, operations: &Operations) -> Result<()> {
// A cache of TaskMaps updated in this sequence of operations, but for which `txn.set_task` has
// not yet been called.
let mut tasks: HashMap<Uuid, Option<TaskMap>> = HashMap::new();

fn get_cache<'t>(
uuid: Uuid,
tasks: &'t mut HashMap<Uuid, Option<TaskMap>>,
txn: &mut dyn StorageTxn,
) -> Result<Option<&'t mut TaskMap>> {
match tasks.entry(uuid) {
Entry::Occupied(occupied_entry) => Ok(occupied_entry.into_mut().as_mut()),
Entry::Vacant(vacant_entry) => {
let task = txn.get_task(uuid)?;
Ok(vacant_entry.insert(task).as_mut())
}
}
}

// Call `txn.set_task` for this task, if necessary, and remove from the cache.
fn flush_cache(
uuid: Uuid,
tasks: &mut HashMap<Uuid, Option<TaskMap>>,
txn: &mut dyn StorageTxn,
) -> Result<()> {
if let Entry::Occupied(occupied_entry) = tasks.entry(uuid) {
let v = occupied_entry.remove();
if let Some(taskmap) = v {
txn.set_task(uuid, taskmap)?;
}
}
Ok(())
}

for operation in operations {
match operation {
Operation::Create { uuid } => {
// The create_task method will do nothing if the task exists.
// The create_task method will do nothing if the task exists. If it was cached
// as not existing, clear that information. If it had cached updates, then there
// is no harm flushing those updates now.
flush_cache(*uuid, &mut tasks, txn)?;
txn.create_task(*uuid)?;
}
Operation::Delete { uuid, .. } => {
// The delete_task method will do nothing if the task does not exist.
txn.delete_task(*uuid)?;
// The task now unconditionally does not exist. If there was a pending
// `txn.set_task`, it can safely be skipped.
tasks.insert(*uuid, None);
}
Operation::Update {
uuid,
property,
value,
..
} => {
// TODO: could cache this value from op to op
let task = txn.get_task(*uuid)?;
let task = get_cache(*uuid, &mut tasks, txn)?;
// If the task does not exist, do nothing.
if let Some(mut task) = task {
if let Some(task) = task {
if let Some(v) = value {
task.insert(property.clone(), v.clone());
} else {
task.remove(property);
}
txn.set_task(*uuid, task)?;
}
}
Operation::UndoPoint => {}
}
}

// Flush any remaining tasks in the cache.
while let Some((uuid, _)) = tasks.iter().next() {
flush_cache(*uuid, &mut tasks, txn)?;
}

Ok(())
}

Expand Down Expand Up @@ -83,6 +129,7 @@ pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &SyncOp) -> Result<()> {

#[cfg(test)]
mod tests {
#![allow(clippy::vec_init_then_push)]
use super::*;
use crate::storage::{taskmap_with, TaskMap};
use crate::taskdb::TaskDb;
Expand Down Expand Up @@ -134,6 +181,46 @@ mod tests {
Ok(())
}

#[test]
fn apply_operations_create_exists_update() -> Result<()> {
let mut db = TaskDb::new_inmemory();
let now = Utc::now();
let uuid = Uuid::new_v4();
{
let mut txn = db.storage.txn()?;
txn.create_task(uuid)?;
txn.set_task(uuid, taskmap_with(vec![("foo".into(), "bar".into())]))?;
txn.commit()?;
}

{
let mut ops = Operations::new();
ops.push(Operation::Create { uuid });
ops.push(Operation::Update {
uuid,
property: String::from("title"),
value: Some("my task".into()),
timestamp: now,
old_value: None,
});
let mut txn = db.storage.txn()?;
apply_operations(txn.as_mut(), &ops)?;
txn.commit()?;
}

assert_eq!(
db.sorted_tasks(),
vec![(
uuid,
vec![
("foo".into(), "bar".into()),
("title".into(), "my task".into())
]
)]
);
Ok(())
}

#[test]
fn apply_operations_create_update() -> Result<()> {
let mut db = TaskDb::new_inmemory();
Expand Down Expand Up @@ -228,6 +315,82 @@ mod tests {
Ok(())
}

#[test]
fn apply_operations_delete_then_update() -> Result<()> {
let mut db = TaskDb::new_inmemory();
let uuid = Uuid::new_v4();
let now = Utc::now();
let mut ops = Operations::new();
ops.push(Operation::Create { uuid });
ops.push(Operation::Update {
uuid,
property: String::from("old"),
value: Some("uhoh".into()),
timestamp: now,
old_value: None,
});
ops.push(Operation::Delete {
uuid,
old_task: taskmap_with(vec![]),
});
ops.push(Operation::Update {
uuid,
property: String::from("new"),
value: Some("uhoh".into()),
timestamp: now,
old_value: None,
});

{
let mut txn = db.storage.txn()?;
apply_operations(txn.as_mut(), &ops)?;
txn.commit()?;
}

assert_eq!(db.sorted_tasks(), vec![]);
Ok(())
}

#[test]
fn apply_operations_several_tasks() -> Result<()> {
let mut db = TaskDb::new_inmemory();
let mut uuids = [Uuid::new_v4(), Uuid::new_v4()];
uuids.sort();
let now = Utc::now();
let mut ops = Operations::new();
ops.push(Operation::Create { uuid: uuids[0] });
ops.push(Operation::Create { uuid: uuids[1] });
ops.push(Operation::Update {
uuid: uuids[0],
property: String::from("p"),
value: Some("1".into()),
timestamp: now,
old_value: None,
});
ops.push(Operation::Update {
uuid: uuids[1],
property: String::from("p"),
value: Some("2".into()),
timestamp: now,
old_value: None,
});

{
let mut txn = db.storage.txn()?;
apply_operations(txn.as_mut(), &ops)?;
txn.commit()?;
}

assert_eq!(
db.sorted_tasks(),
vec![
(uuids[0], vec![("p".into(), "1".into())]),
(uuids[1], vec![("p".into(), "2".into())])
]
);
Ok(())
}

#[test]
fn apply_operations_create_delete() -> Result<()> {
let mut db = TaskDb::new_inmemory();
Expand Down

0 comments on commit 0e911d9

Please sign in to comment.