Skip to content

Commit

Permalink
Add cancellation pattern to peer connection workflow (#327) (#360)
Browse files Browse the repository at this point in the history
Co-authored-by: vinny-pereira <142326456+vinny-pereira@users.noreply.github.com>
  • Loading branch information
Davidson-Souza and vinny-pereira authored Jan 29, 2025
1 parent d23924b commit 07d48d5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
15 changes: 13 additions & 2 deletions crates/floresta-wire/src/p2p_wire/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,9 +917,13 @@ where
stream.set_nodelay(true)?;
let (reader, writer) = tokio::io::split(stream);

let (cancellation_sender, cancellation_receiver) = tokio::sync::oneshot::channel();
let (actor_receiver, actor) = create_tcp_stream_actor(reader);
tokio::spawn(async move {
let _ = actor.run().await;
tokio::select! {
_ = cancellation_receiver => {}
_ = actor.run() => {}
}
});

// Use create_peer function instead of manually creating the peer
Expand All @@ -934,6 +938,7 @@ where
actor_receiver,
writer,
user_agent,
cancellation_sender,
)
.await;

Expand Down Expand Up @@ -968,9 +973,14 @@ where
let proxy = TcpStream::connect(proxy).await?;
let stream = Socks5StreamBuilder::connect(proxy, addr, address.get_port()).await?;
let (reader, writer) = tokio::io::split(stream);

let (cancellation_sender, cancellation_receiver) = tokio::sync::oneshot::channel();
let (actor_receiver, actor) = create_tcp_stream_actor(reader);
tokio::spawn(async move {
let _ = actor.run().await;
tokio::select! {
_ = cancellation_receiver => {}
_ = actor.run() => {}
}
});

Peer::<WriteHalf>::create_peer(
Expand All @@ -984,6 +994,7 @@ where
actor_receiver,
writer,
user_agent,
cancellation_sender,
)
.await;
Ok(())
Expand Down
10 changes: 10 additions & 0 deletions crates/floresta-wire/src/p2p_wire/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ pub struct Peer<T: AsyncWrite + Unpin> {
actor_receiver: UnboundedReceiver<ReaderMessage>, // Add the receiver for messages from TcpStreamActor
writer: T,
our_user_agent: String,
cancellation_sender: tokio::sync::oneshot::Sender<()>,
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -243,6 +244,13 @@ impl<T: AsyncWrite + Unpin> Peer<T> {
);
}

if let Err(cancellation_err) = self.cancellation_sender.send(()) {
debug!(
"Failed to propagate cancellation signal for Peer {}: {cancellation_err:?}",
self.id
);
}

if let Err(err) = err {
debug!("Peer {} connection loop closed: {err:?}", self.id);
}
Expand Down Expand Up @@ -602,6 +610,7 @@ impl<T: AsyncWrite + Unpin> Peer<T> {
actor_receiver: UnboundedReceiver<ReaderMessage>,
writer: WriteHalf<TcpStream>,
our_user_agent: String,
cancellation_sender: tokio::sync::oneshot::Sender<()>,
) {
let peer = Peer {
address_id,
Expand All @@ -626,6 +635,7 @@ impl<T: AsyncWrite + Unpin> Peer<T> {
actor_receiver, // Add the receiver for messages from TcpStreamActor
writer,
our_user_agent,
cancellation_sender,
};

spawn(peer.read_loop());
Expand Down

0 comments on commit 07d48d5

Please sign in to comment.