diff --git a/src/taskdb/apply.rs b/src/taskdb/apply.rs index c256a1c74..78f4aa2ba 100644 --- a/src/taskdb/apply.rs +++ b/src/taskdb/apply.rs @@ -1,8 +1,11 @@ use crate::errors::{Error, Result}; use crate::operation::Operation; use crate::server::SyncOp; -use crate::storage::StorageTxn; +use crate::storage::{StorageTxn, TaskMap}; use crate::Operations; +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use uuid::Uuid; /// Apply `operations` to the database in the given single transaction. /// @@ -11,15 +14,54 @@ use crate::Operations; /// /// The transaction is not committed. pub(super) fn apply_operations(txn: &mut dyn StorageTxn, operations: &Operations) -> Result<()> { + // A cache of TaskMaps updated in this sequence of operations, but for which `txn.set_task` has + // not yet been called. + let mut tasks: HashMap> = HashMap::new(); + + fn get_cache<'t>( + uuid: Uuid, + tasks: &'t mut HashMap>, + txn: &mut dyn StorageTxn, + ) -> Result> { + match tasks.entry(uuid) { + Entry::Occupied(occupied_entry) => Ok(occupied_entry.into_mut().as_mut()), + Entry::Vacant(vacant_entry) => { + let task = txn.get_task(uuid)?; + Ok(vacant_entry.insert(task).as_mut()) + } + } + } + + // Call `txn.set_task` for this task, if necessary, and remove from the cache. + fn flush_cache( + uuid: Uuid, + tasks: &mut HashMap>, + txn: &mut dyn StorageTxn, + ) -> Result<()> { + if let Entry::Occupied(occupied_entry) = tasks.entry(uuid) { + let v = occupied_entry.remove(); + if let Some(taskmap) = v { + txn.set_task(uuid, taskmap)?; + } + } + Ok(()) + } + for operation in operations { match operation { Operation::Create { uuid } => { - // The create_task method will do nothing if the task exists. + // The create_task method will do nothing if the task exists. If it was cached + // as not existing, clear that information. If it had cached updates, then there + // is no harm flushing those updates now. + flush_cache(*uuid, &mut tasks, txn)?; txn.create_task(*uuid)?; } Operation::Delete { uuid, .. } => { // The delete_task method will do nothing if the task does not exist. txn.delete_task(*uuid)?; + // The task now unconditionally does not exist. If there was a pending + // `txn.set_task`, it can safely be skipped. + tasks.insert(*uuid, None); } Operation::Update { uuid, @@ -27,21 +69,25 @@ pub(super) fn apply_operations(txn: &mut dyn StorageTxn, operations: &Operations value, .. } => { - // TODO: could cache this value from op to op - let task = txn.get_task(*uuid)?; + let task = get_cache(*uuid, &mut tasks, txn)?; // If the task does not exist, do nothing. - if let Some(mut task) = task { + if let Some(task) = task { if let Some(v) = value { task.insert(property.clone(), v.clone()); } else { task.remove(property); } - txn.set_task(*uuid, task)?; } } Operation::UndoPoint => {} } } + + // Flush any remaining tasks in the cache. + while let Some((uuid, _)) = tasks.iter().next() { + flush_cache(*uuid, &mut tasks, txn)?; + } + Ok(()) } @@ -83,6 +129,7 @@ pub(super) fn apply_op(txn: &mut dyn StorageTxn, op: &SyncOp) -> Result<()> { #[cfg(test)] mod tests { + #![allow(clippy::vec_init_then_push)] use super::*; use crate::storage::{taskmap_with, TaskMap}; use crate::taskdb::TaskDb; @@ -134,6 +181,46 @@ mod tests { Ok(()) } + #[test] + fn apply_operations_create_exists_update() -> Result<()> { + let mut db = TaskDb::new_inmemory(); + let now = Utc::now(); + let uuid = Uuid::new_v4(); + { + let mut txn = db.storage.txn()?; + txn.create_task(uuid)?; + txn.set_task(uuid, taskmap_with(vec![("foo".into(), "bar".into())]))?; + txn.commit()?; + } + + { + let mut ops = Operations::new(); + ops.push(Operation::Create { uuid }); + ops.push(Operation::Update { + uuid, + property: String::from("title"), + value: Some("my task".into()), + timestamp: now, + old_value: None, + }); + let mut txn = db.storage.txn()?; + apply_operations(txn.as_mut(), &ops)?; + txn.commit()?; + } + + assert_eq!( + db.sorted_tasks(), + vec![( + uuid, + vec![ + ("foo".into(), "bar".into()), + ("title".into(), "my task".into()) + ] + )] + ); + Ok(()) + } + #[test] fn apply_operations_create_update() -> Result<()> { let mut db = TaskDb::new_inmemory(); @@ -228,6 +315,82 @@ mod tests { Ok(()) } + #[test] + fn apply_operations_delete_then_update() -> Result<()> { + let mut db = TaskDb::new_inmemory(); + let uuid = Uuid::new_v4(); + let now = Utc::now(); + let mut ops = Operations::new(); + ops.push(Operation::Create { uuid }); + ops.push(Operation::Update { + uuid, + property: String::from("old"), + value: Some("uhoh".into()), + timestamp: now, + old_value: None, + }); + ops.push(Operation::Delete { + uuid, + old_task: taskmap_with(vec![]), + }); + ops.push(Operation::Update { + uuid, + property: String::from("new"), + value: Some("uhoh".into()), + timestamp: now, + old_value: None, + }); + + { + let mut txn = db.storage.txn()?; + apply_operations(txn.as_mut(), &ops)?; + txn.commit()?; + } + + assert_eq!(db.sorted_tasks(), vec![]); + Ok(()) + } + + #[test] + fn apply_operations_several_tasks() -> Result<()> { + let mut db = TaskDb::new_inmemory(); + let mut uuids = [Uuid::new_v4(), Uuid::new_v4()]; + uuids.sort(); + let now = Utc::now(); + let mut ops = Operations::new(); + ops.push(Operation::Create { uuid: uuids[0] }); + ops.push(Operation::Create { uuid: uuids[1] }); + ops.push(Operation::Update { + uuid: uuids[0], + property: String::from("p"), + value: Some("1".into()), + timestamp: now, + old_value: None, + }); + ops.push(Operation::Update { + uuid: uuids[1], + property: String::from("p"), + value: Some("2".into()), + timestamp: now, + old_value: None, + }); + + { + let mut txn = db.storage.txn()?; + apply_operations(txn.as_mut(), &ops)?; + txn.commit()?; + } + + assert_eq!( + db.sorted_tasks(), + vec![ + (uuids[0], vec![("p".into(), "1".into())]), + (uuids[1], vec![("p".into(), "2".into())]) + ] + ); + Ok(()) + } + #[test] fn apply_operations_create_delete() -> Result<()> { let mut db = TaskDb::new_inmemory();