From 07d48d5246273c65e0f7c29c7fae72c249fc95d1 Mon Sep 17 00:00:00 2001 From: Davidson Souza Date: Wed, 29 Jan 2025 16:33:42 -0300 Subject: [PATCH] Add cancellation pattern to peer connection workflow (#327) (#360) Co-authored-by: vinny-pereira <142326456+vinny-pereira@users.noreply.github.com> --- crates/floresta-wire/src/p2p_wire/node.rs | 15 +++++++++++++-- crates/floresta-wire/src/p2p_wire/peer.rs | 10 ++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/crates/floresta-wire/src/p2p_wire/node.rs b/crates/floresta-wire/src/p2p_wire/node.rs index 543cd3e6..f66bb53b 100644 --- a/crates/floresta-wire/src/p2p_wire/node.rs +++ b/crates/floresta-wire/src/p2p_wire/node.rs @@ -917,9 +917,13 @@ where stream.set_nodelay(true)?; let (reader, writer) = tokio::io::split(stream); + let (cancellation_sender, cancellation_receiver) = tokio::sync::oneshot::channel(); let (actor_receiver, actor) = create_tcp_stream_actor(reader); tokio::spawn(async move { - let _ = actor.run().await; + tokio::select! { + _ = cancellation_receiver => {} + _ = actor.run() => {} + } }); // Use create_peer function instead of manually creating the peer @@ -934,6 +938,7 @@ where actor_receiver, writer, user_agent, + cancellation_sender, ) .await; @@ -968,9 +973,14 @@ where let proxy = TcpStream::connect(proxy).await?; let stream = Socks5StreamBuilder::connect(proxy, addr, address.get_port()).await?; let (reader, writer) = tokio::io::split(stream); + + let (cancellation_sender, cancellation_receiver) = tokio::sync::oneshot::channel(); let (actor_receiver, actor) = create_tcp_stream_actor(reader); tokio::spawn(async move { - let _ = actor.run().await; + tokio::select! { + _ = cancellation_receiver => {} + _ = actor.run() => {} + } }); Peer::::create_peer( @@ -984,6 +994,7 @@ where actor_receiver, writer, user_agent, + cancellation_sender, ) .await; Ok(()) diff --git a/crates/floresta-wire/src/p2p_wire/peer.rs b/crates/floresta-wire/src/p2p_wire/peer.rs index ea274e39..2c4b18b2 100644 --- a/crates/floresta-wire/src/p2p_wire/peer.rs +++ b/crates/floresta-wire/src/p2p_wire/peer.rs @@ -170,6 +170,7 @@ pub struct Peer { actor_receiver: UnboundedReceiver, // Add the receiver for messages from TcpStreamActor writer: T, our_user_agent: String, + cancellation_sender: tokio::sync::oneshot::Sender<()>, } #[derive(Debug, Error)] @@ -243,6 +244,13 @@ impl Peer { ); } + if let Err(cancellation_err) = self.cancellation_sender.send(()) { + debug!( + "Failed to propagate cancellation signal for Peer {}: {cancellation_err:?}", + self.id + ); + } + if let Err(err) = err { debug!("Peer {} connection loop closed: {err:?}", self.id); } @@ -602,6 +610,7 @@ impl Peer { actor_receiver: UnboundedReceiver, writer: WriteHalf, our_user_agent: String, + cancellation_sender: tokio::sync::oneshot::Sender<()>, ) { let peer = Peer { address_id, @@ -626,6 +635,7 @@ impl Peer { actor_receiver, // Add the receiver for messages from TcpStreamActor writer, our_user_agent, + cancellation_sender, }; spawn(peer.read_loop());