
Commit

Merge pull request #381 from ryneeverett/batch-syncops
Batch SyncOp's to limit Version size.
djmitche authored Apr 28, 2024
2 parents 505a0b5 + c625264 commit ce4a4b8
Showing 2 changed files with 171 additions and 67 deletions.
5 changes: 5 additions & 0 deletions taskchampion/src/server/test.rs
@@ -58,6 +58,11 @@ impl TestServer {
        let mut inner = self.0.lock().unwrap();
        inner.versions.remove(&parent_version_id);
    }

    pub(crate) fn versions_len(&self) -> usize {
        let inner = self.0.lock().unwrap();
        inner.versions.len()
    }
}

impl Server for TestServer {
233 changes: 166 additions & 67 deletions taskchampion/src/taskdb/sync.rs
@@ -31,84 +31,100 @@ pub(super) fn sync(
    // replicas trying to sync to the same server). If the server insists on the same base
    // version twice, then we have diverged.
    let mut requested_parent_version_id = None;
    'outer: loop {
        trace!("beginning sync outer loop");
        let mut base_version_id = txn.base_version()?;

        let mut local_ops = txn.operations()?;
        let sync_ops = local_ops.drain(..).filter_map(|op| op.into_sync());
        let mut sync_ops_peekable = sync_ops.peekable();

        // batch operations into versions of no more than a million bytes to avoid excessively large http requests.
        let sync_ops_batched = std::iter::from_fn(|| {
            let mut batch_size = 0;
            let mut batch = Vec::new();

            while let Some(op) = sync_ops_peekable.next_if(|op| {
                batch_size += serde_json::to_string(&op).unwrap().len();
                // include if the batch is empty or if the batch size limit is not exceeded.
                batch.is_empty() || batch_size <= 1000000
            }) {
                batch.push(op);
            }

            Some(batch)
        });

        for mut sync_ops_batch in sync_ops_batched {
            // first pull changes and "rebase" on top of them
            loop {
                trace!("beginning sync inner loop");
                if let GetVersionResult::Version {
                    version_id,
                    history_segment,
                    ..
                } = server.get_child_version(base_version_id)?
                {
                    let version_str = str::from_utf8(&history_segment).unwrap();
                    let version: Version = serde_json::from_str(version_str).unwrap();

                    // apply this version and update base_version in storage
                    info!("applying version {:?} from server", version_id);
                    apply_version(txn, &mut sync_ops_batch, version)?;
                    txn.set_base_version(version_id)?;
                    base_version_id = version_id;
                } else {
                    info!("no child versions of {:?}", base_version_id);
                    // at the moment, no more child versions, so we can try adding our own
                    break;
                }
            }

            if sync_ops_batch.is_empty() {
                info!("no changes to push to server");
                // nothing to sync back to the server..
                break 'outer;
            }

            trace!("sending {} operations to the server", sync_ops_batch.len());

            // now make a version of our local changes and push those
            let new_version = Version {
                operations: sync_ops_batch,
            };
            let history_segment = serde_json::to_string(&new_version).unwrap().into();
            info!("sending new version to server");
            let (res, snapshot_urgency) = server.add_version(base_version_id, history_segment)?;
            match res {
                AddVersionResult::Ok(new_version_id) => {
                    info!("version {:?} received by server", new_version_id);
                    txn.set_base_version(new_version_id)?;
                    base_version_id = new_version_id;

                    // make a snapshot if the server indicates it is urgent enough
                    let base_urgency = if avoid_snapshots {
                        SnapshotUrgency::High
                    } else {
                        SnapshotUrgency::Low
                    };
                    if snapshot_urgency >= base_urgency {
                        let snapshot = snapshot::make_snapshot(txn)?;
                        server.add_snapshot(new_version_id, snapshot)?;
                    }
                }
                AddVersionResult::ExpectedParentVersion(parent_version_id) => {
                    info!(
                        "new version rejected; must be based on {:?}",
                        parent_version_id
                    );
                    if let Some(requested) = requested_parent_version_id {
                        if parent_version_id == requested {
                            return Err(Error::OutOfSync);
                        }
                    }
                    requested_parent_version_id = Some(parent_version_id);
                    break;
                }
            }
        }
    }
@@ -383,4 +399,87 @@ mod test {

        Ok(())
    }

    #[test]
    fn test_sync_batched() -> Result<()> {
        let test_server = TestServer::new();

        let mut server: Box<dyn Server> = test_server.server();

        let mut db = newdb();
        sync(&mut server, db.storage.txn()?.as_mut(), false).unwrap();

        // add a task to db
        let uuid1 = Uuid::new_v4();
        db.apply(SyncOp::Create { uuid: uuid1 }).unwrap();
        db.apply(SyncOp::Update {
            uuid: uuid1,
            property: "title".into(),
            value: Some("my first task".into()),
            timestamp: Utc::now(),
        })
        .unwrap();

        sync(&mut server, db.storage.txn()?.as_mut(), true).unwrap();
        assert_eq!(test_server.versions_len(), 1);

        // chars are four bytes each in the Vec, but 'a' is a single byte once collected into a String
        let data = vec!['a'; 400000];

        // add some large operations to db
        for _ in 0..3 {
            db.apply(SyncOp::Update {
                uuid: uuid1,
                property: "description".into(),
                value: Some(data.iter().collect()),
                timestamp: Utc::now(),
            })
            .unwrap();
        }

        // this sync batches the three ~400 KB operations into two versions (two ops fit
        // under the 1 MB limit, the third spills into a second version), for three total.
        sync(&mut server, db.storage.txn()?.as_mut(), true).unwrap();
        assert_eq!(test_server.versions_len(), 3);

        Ok(())
    }

    #[test]
    fn test_sync_batches_at_least_one_op() -> Result<()> {
        let test_server = TestServer::new();

        let mut server: Box<dyn Server> = test_server.server();

        let mut db = newdb();
        sync(&mut server, db.storage.txn()?.as_mut(), false).unwrap();

        // add a task to db
        let uuid1 = Uuid::new_v4();
        db.apply(SyncOp::Create { uuid: uuid1 }).unwrap();
        db.apply(SyncOp::Update {
            uuid: uuid1,
            property: "title".into(),
            value: Some("my first task".into()),
            timestamp: Utc::now(),
        })
        .unwrap();

        sync(&mut server, db.storage.txn()?.as_mut(), true).unwrap();
        assert_eq!(test_server.versions_len(), 1);

        // add a single operation greater than the batch limit; it still goes into a batch of one
        let data = vec!['a'; 1000001];
        db.apply(SyncOp::Update {
            uuid: uuid1,
            property: "description".into(),
            value: Some(data.iter().collect()),
            timestamp: Utc::now(),
        })
        .unwrap();

        sync(&mut server, db.storage.txn()?.as_mut(), true).unwrap();
        assert_eq!(test_server.versions_len(), 2);

        Ok(())
    }
}
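
For reference, a minimal standalone sketch (not from this commit) of the batching pattern used in sync.rs above: Peekable::next_if inside std::iter::from_fn groups serialized items into size-bounded batches while always keeping at least one item per batch. The Op struct, batch_ops helper, and the main demo are illustrative assumptions; serde derive and serde_json are assumed available, as in the crate itself.

use serde::Serialize;

#[derive(Serialize)]
struct Op {
    description: String,
}

// Group ops into batches of at most `limit` serialized bytes, always
// including at least one op per batch.
fn batch_ops(ops: Vec<Op>, limit: usize) -> Vec<Vec<Op>> {
    let mut iter = ops.into_iter().peekable();
    std::iter::from_fn(|| {
        let mut batch_size = 0;
        let mut batch = Vec::new();
        while let Some(op) = iter.next_if(|op| {
            batch_size += serde_json::to_string(op).unwrap().len();
            // take the op if the batch is empty or the size limit is not yet exceeded
            batch.is_empty() || batch_size <= limit
        }) {
            batch.push(op);
        }
        // stop once no ops remain (sync.rs instead yields the empty batch and
        // lets its caller break out of the loop when it sees one)
        if batch.is_empty() {
            None
        } else {
            Some(batch)
        }
    })
    .collect()
}

fn main() {
    let ops: Vec<Op> = (0..3)
        .map(|_| Op {
            description: "a".repeat(400_000),
        })
        .collect();
    // with a 1 MB limit, three ~400 KB ops split into two batches: [2 ops, 1 op]
    let batches = batch_ops(ops, 1_000_000);
    let sizes: Vec<usize> = batches.iter().map(|b| b.len()).collect();
    assert_eq!(sizes, vec![2, 1]);
    println!("batch sizes: {:?}", sizes);
}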
