From c625264793f0adf9f2f4b2d3333ba2b0edcff3b0 Mon Sep 17 00:00:00 2001 From: ryneeverett Date: Wed, 24 Apr 2024 11:26:29 -0400 Subject: [PATCH] Batch SyncOp's to limit Version size. This will effectively limit the size of http requests. Resolve https://github.com/GothenburgBitFactory/taskwarrior/issues/3331. --- taskchampion/src/server/test.rs | 5 + taskchampion/src/taskdb/sync.rs | 233 +++++++++++++++++++++++--------- 2 files changed, 171 insertions(+), 67 deletions(-) diff --git a/taskchampion/src/server/test.rs b/taskchampion/src/server/test.rs index 165fa557b..2b9058792 100644 --- a/taskchampion/src/server/test.rs +++ b/taskchampion/src/server/test.rs @@ -58,6 +58,11 @@ impl TestServer { let mut inner = self.0.lock().unwrap(); inner.versions.remove(&parent_version_id); } + + pub(crate) fn versions_len(&self) -> usize { + let inner = self.0.lock().unwrap(); + inner.versions.len() + } } impl Server for TestServer { diff --git a/taskchampion/src/taskdb/sync.rs b/taskchampion/src/taskdb/sync.rs index 95e2938c4..532df8182 100644 --- a/taskchampion/src/taskdb/sync.rs +++ b/taskchampion/src/taskdb/sync.rs @@ -31,84 +31,100 @@ pub(super) fn sync( // replicas trying to sync to the same server). If the server insists on the same base // version twice, then we have diverged. let mut requested_parent_version_id = None; - loop { + 'outer: loop { trace!("beginning sync outer loop"); let mut base_version_id = txn.base_version()?; - let mut local_ops: Vec = txn - .operations()? - .drain(..) - .filter_map(|op| op.into_sync()) - .collect(); - - // first pull changes and "rebase" on top of them - loop { - trace!("beginning sync inner loop"); - if let GetVersionResult::Version { - version_id, - history_segment, - .. - } = server.get_child_version(base_version_id)? - { - let version_str = str::from_utf8(&history_segment).unwrap(); - let version: Version = serde_json::from_str(version_str).unwrap(); - - // apply this verison and update base_version in storage - info!("applying version {:?} from server", version_id); - apply_version(txn, &mut local_ops, version)?; - txn.set_base_version(version_id)?; - base_version_id = version_id; - } else { - info!("no child versions of {:?}", base_version_id); - // at the moment, no more child versions, so we can try adding our own - break; + let mut local_ops = txn.operations()?; + let sync_ops = local_ops.drain(..).filter_map(|op| op.into_sync()); + let mut sync_ops_peekable = sync_ops.peekable(); + + // batch operations into versions of no more than a million bytes to avoid excessively large http requests. + let sync_ops_batched = std::iter::from_fn(|| { + let mut batch_size = 0; + let mut batch = Vec::new(); + + while let Some(op) = sync_ops_peekable.next_if(|op| { + batch_size += serde_json::to_string(&op).unwrap().len(); + // include if the batch is empty or if the batch size limit is not exceeded. + batch.is_empty() || batch_size <= 1000000 + }) { + batch.push(op); } - } - - if local_ops.is_empty() { - info!("no changes to push to server"); - // nothing to sync back to the server.. - break; - } - trace!("sending {} operations to the server", local_ops.len()); - - // now make a version of our local changes and push those - let new_version = Version { - operations: local_ops, - }; - let history_segment = serde_json::to_string(&new_version).unwrap().into(); - info!("sending new version to server"); - let (res, snapshot_urgency) = server.add_version(base_version_id, history_segment)?; - match res { - AddVersionResult::Ok(new_version_id) => { - info!("version {:?} received by server", new_version_id); - txn.set_base_version(new_version_id)?; - - // make a snapshot if the server indicates it is urgent enough - let base_urgency = if avoid_snapshots { - SnapshotUrgency::High + Some(batch) + }); + + for mut sync_ops_batch in sync_ops_batched { + // first pull changes and "rebase" on top of them + loop { + trace!("beginning sync inner loop"); + if let GetVersionResult::Version { + version_id, + history_segment, + .. + } = server.get_child_version(base_version_id)? + { + let version_str = str::from_utf8(&history_segment).unwrap(); + let version: Version = serde_json::from_str(version_str).unwrap(); + + // apply this version and update base_version in storage + info!("applying version {:?} from server", version_id); + apply_version(txn, &mut sync_ops_batch, version)?; + txn.set_base_version(version_id)?; + base_version_id = version_id; } else { - SnapshotUrgency::Low - }; - if snapshot_urgency >= base_urgency { - let snapshot = snapshot::make_snapshot(txn)?; - server.add_snapshot(new_version_id, snapshot)?; + info!("no child versions of {:?}", base_version_id); + // at the moment, no more child versions, so we can try adding our own + break; } + } - break; + if sync_ops_batch.is_empty() { + info!("no changes to push to server"); + // nothing to sync back to the server.. + break 'outer; } - AddVersionResult::ExpectedParentVersion(parent_version_id) => { - info!( - "new version rejected; must be based on {:?}", - parent_version_id - ); - if let Some(requested) = requested_parent_version_id { - if parent_version_id == requested { - return Err(Error::OutOfSync); + + trace!("sending {} operations to the server", sync_ops_batch.len()); + + // now make a version of our local changes and push those + let new_version = Version { + operations: sync_ops_batch, + }; + let history_segment = serde_json::to_string(&new_version).unwrap().into(); + info!("sending new version to server"); + let (res, snapshot_urgency) = server.add_version(base_version_id, history_segment)?; + match res { + AddVersionResult::Ok(new_version_id) => { + info!("version {:?} received by server", new_version_id); + txn.set_base_version(new_version_id)?; + base_version_id = new_version_id; + + // make a snapshot if the server indicates it is urgent enough + let base_urgency = if avoid_snapshots { + SnapshotUrgency::High + } else { + SnapshotUrgency::Low + }; + if snapshot_urgency >= base_urgency { + let snapshot = snapshot::make_snapshot(txn)?; + server.add_snapshot(new_version_id, snapshot)?; } } - requested_parent_version_id = Some(parent_version_id); + AddVersionResult::ExpectedParentVersion(parent_version_id) => { + info!( + "new version rejected; must be based on {:?}", + parent_version_id + ); + if let Some(requested) = requested_parent_version_id { + if parent_version_id == requested { + return Err(Error::OutOfSync); + } + } + requested_parent_version_id = Some(parent_version_id); + break; + } } } } @@ -383,4 +399,87 @@ mod test { Ok(()) } + + #[test] + fn test_sync_batched() -> Result<()> { + let test_server = TestServer::new(); + + let mut server: Box = test_server.server(); + + let mut db = newdb(); + sync(&mut server, db.storage.txn()?.as_mut(), false).unwrap(); + + // add a task to db + let uuid1 = Uuid::new_v4(); + db.apply(SyncOp::Create { uuid: uuid1 }).unwrap(); + db.apply(SyncOp::Update { + uuid: uuid1, + property: "title".into(), + value: Some("my first task".into()), + timestamp: Utc::now(), + }) + .unwrap(); + + sync(&mut server, db.storage.txn()?.as_mut(), true).unwrap(); + assert_eq!(test_server.versions_len(), 1); + + // chars are four bytes, but they're only one when converted to a String + let data = vec!['a'; 400000]; + + // add some large operations to db + for _ in 0..3 { + db.apply(SyncOp::Update { + uuid: uuid1, + property: "description".into(), + value: Some(data.iter().collect()), + timestamp: Utc::now(), + }) + .unwrap(); + } + + // this sync batches the operations into two versions. + sync(&mut server, db.storage.txn()?.as_mut(), true).unwrap(); + assert_eq!(test_server.versions_len(), 3); + + Ok(()) + } + + #[test] + fn test_sync_batches_at_least_one_op() -> Result<()> { + let test_server = TestServer::new(); + + let mut server: Box = test_server.server(); + + let mut db = newdb(); + sync(&mut server, db.storage.txn()?.as_mut(), false).unwrap(); + + // add a task to db + let uuid1 = Uuid::new_v4(); + db.apply(SyncOp::Create { uuid: uuid1 }).unwrap(); + db.apply(SyncOp::Update { + uuid: uuid1, + property: "title".into(), + value: Some("my first task".into()), + timestamp: Utc::now(), + }) + .unwrap(); + + sync(&mut server, db.storage.txn()?.as_mut(), true).unwrap(); + assert_eq!(test_server.versions_len(), 1); + + // add an operation greater than the batch limit + let data = vec!['a'; 1000001]; + db.apply(SyncOp::Update { + uuid: uuid1, + property: "description".into(), + value: Some(data.iter().collect()), + timestamp: Utc::now(), + }) + .unwrap(); + + sync(&mut server, db.storage.txn()?.as_mut(), true).unwrap(); + assert_eq!(test_server.versions_len(), 2); + + Ok(()) + } }