Skip to content

Commit

Permalink
feat: add MergeUnbounded::is_empty and MergeUnbounded::len
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Aug 29, 2024
1 parent 1461ce4 commit cdb330d
Showing 1 changed file with 30 additions and 5 deletions.
35 changes: 30 additions & 5 deletions src/merge_unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,27 @@ impl<S> MergeUnbounded<S> {
}
}
}

/// Returns `true` if there are no streams in the set.
pub fn is_empty(&self) -> bool {
if self.groups.is_empty() {
true
} else {
self.groups.iter().all(|g| g.streams.is_empty())
}
}

/// Returns the number of streams currently in the set.
pub fn len(&self) -> usize {
self.groups.iter().map(|g| g.streams.len()).sum()
}
}

impl<S: Stream + Unpin> Stream for MergeUnbounded<S> {
type Item = S::Item;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
// rem,
groups,
poll_next,
} = &mut *self;
let Self { groups, poll_next } = &mut *self;
if groups.is_empty() {
return Poll::Ready(None);
}
Expand Down Expand Up @@ -192,23 +202,38 @@ mod tests {
let b = stream::repeat(3).take(3);
let mut s = MergeUnbounded::default();
assert_eq!(s.next().await, None);
assert_eq!(s.is_empty(), true);
assert_eq!(s.len(), 0);

s.push(a);
s.push(b);

assert_eq!(s.is_empty(), false);
assert_eq!(s.len(), 2);

let mut counter = 0;
while let Some(n) = s.next().await {
counter += n;
assert_eq!(s.is_empty(), false);
}

assert_eq!(s.is_empty(), true);
assert_eq!(s.len(), 0);

let b = stream::repeat(4).take(4);
s.push(b);

assert_eq!(s.is_empty(), false);
assert_eq!(s.len(), 1);

while let Some(n) = s.next().await {
counter += n;
}

assert_eq!(counter, 4 + 9 + 16);

assert_eq!(s.is_empty(), true);
assert_eq!(s.len(), 0);
})
}

Expand Down

0 comments on commit cdb330d

Please sign in to comment.