Skip to content

Commit

Permalink
Upgrades tokio runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
André Guedes committed Sep 13, 2021
1 parent b58a2e1 commit 7e313ad
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 27 deletions.
28 changes: 16 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,33 @@ exclude = ["tests/*", "examples/*"]

[features]
default = ["tokio_io"]
tls = ["tokio-tls", "native-tls"]
tls = ["tokio-native-tls", "native-tls"]
async_std = ["async-std"]
tokio_io = ["tokio"]

[dependencies]
byteorder = "^1.3"
chrono = "0.4"
chrono-tz = "0.5"
crossbeam = "0.7.3"
crossbeam = "0.8.0"
thiserror = "1.0.20"
futures-core = "0.3.5"
futures-sink = "0.3.5"
hostname = "^0.3"
lazy_static = "1.4.0"
lz4 = "1.23.2"
pin-project = "0.4.23"
pin-project = "1.0.4"
url="^2"
uuid = "0.8.1"
combine = "4.2.3"

[dependencies.futures-util]
version = "0.3.5"
version = "0.3.12"
features = ["sink"]

[dependencies.tokio]
version = "0.2.22"
version = "1.5"
default-features = false
features = ["io-util", "time", "net", "sync", "rt-threaded"]
features = ["io-util", "time", "net", "sync", "rt-multi-thread"]
optional = true

[dependencies.async-std]
Expand All @@ -61,15 +60,20 @@ features = ["std", "serde"]
version = "0.2"
optional = true

[dependencies.tokio-tls]
version = "0.3.1"
[dependencies.tokio-native-tls]
version = "^0.3"
optional = true

[dependencies.chrono]
version = "0.4"
default-features = false
features = [ "std" ]

[dev-dependencies]
env_logger = "^0.7"
rand = "^0.7"
env_logger = "^0.8"
rand = "^0.8"

[dev-dependencies.tokio]
version = "0.2.0"
version = "1.5"
default-features = false
features = ["macros"]
2 changes: 1 addition & 1 deletion src/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{borrow::Cow, io, result, str::Utf8Error, string::FromUtf8Error};

use thiserror::Error;
#[cfg(feature = "tokio_io")]
use tokio::time::Elapsed;
use tokio::time::error::Elapsed;
use url::ParseError;

/// Clickhouse error codes
Expand Down
37 changes: 30 additions & 7 deletions src/io/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::{
};

#[cfg(feature = "tokio_io")]
use tokio::net::TcpStream;
use tokio::{net::TcpStream, io::{ReadBuf, AsyncRead, AsyncWrite}};
#[cfg(feature = "tls")]
use tokio_tls::TlsStream;

#[cfg(feature = "tokio_io")]
use tokio::prelude::*;
// use tokio::prelude::*;
use pin_project::pin_project;
#[cfg(feature = "async_std")]
use async_std::net::TcpStream;
Expand Down Expand Up @@ -52,11 +52,16 @@ impl Stream {

#[cfg(not(feature = "async_std"))]
pub(crate) fn set_keepalive(&mut self, keepalive: Option<Duration>) -> io::Result<()> {
match *self {
Self::Plain(ref mut stream) => stream.set_keepalive(keepalive),
#[cfg(feature = "tls")]
Self::Secure(ref mut stream) => stream.get_mut().set_keepalive(keepalive),
}.map_err(|err| io::Error::new(err.kind(), format!("set_keepalive error: {}", err)))
// match *self {
// Self::Plain(ref mut stream) => stream.set_keepalive(keepalive),
// #[cfg(feature = "tls")]
// Self::Secure(ref mut stream) => stream.get_mut().set_keepalive(keepalive),
// }.map_err(|err| io::Error::new(err.kind(), format!("set_keepalive error: {}", err)))
if keepalive.is_some() {
// https://github.com/tokio-rs/tokio/issues/3082
log::warn!("`tokio` dropped `set_keepalive` in v0.3 and is currently working on getting it back")
}
Ok(())
}

pub(crate) fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
Expand All @@ -67,6 +72,7 @@ impl Stream {
}.map_err(|err| io::Error::new(err.kind(), format!("set_nodelay error: {}", err)))
}

#[cfg(feature = "async_std")]
pub(crate) fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
match self.project() {
StreamProj::Plain(stream) => stream.poll_read(cx, buf),
Expand All @@ -75,6 +81,23 @@ impl Stream {
}
}

#[cfg(not(feature = "async_std"))]
pub(crate) fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
let mut read_buf = ReadBuf::new(buf);

let result = match self.project() {
StreamProj::Plain(stream) => stream.poll_read(cx, &mut read_buf),
#[cfg(feature = "tls")]
StreamProj::Secure(stream) => stream.poll_read(cx, &mut read_buf),
};

match result {
Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buf.filled().len())),
Poll::Ready(Err(x)) => Poll::Ready(Err(x)),
Poll::Pending => Poll::Pending,
}
}

pub(crate) fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
match self.project() {
StreamProj::Plain(stream) => stream.poll_write(cx, buf),
Expand Down
11 changes: 5 additions & 6 deletions src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub(crate) struct Inner {
impl Inner {
pub(crate) fn release_conn(&self) {
self.ongoing.fetch_sub(1, Ordering::AcqRel);
while let Ok(task) = self.tasks.pop() {
while let Some(task) = self.tasks.pop() {
task.wake()
}
}
Expand Down Expand Up @@ -211,7 +211,7 @@ impl Pool {
}

fn handle_futures(&mut self, cx: &mut Context<'_>) -> Result<()> {
if let Ok(mut new) = self.inner.new.pop() {
if let Some(mut new) = self.inner.new.pop() {
match new.poll_unpin(cx) {
Poll::Ready(Ok(client)) => {
self.inner.idle.push(client).unwrap();
Expand All @@ -232,7 +232,7 @@ impl Pool {
}

fn take_conn(&mut self) -> Option<ClientHandle> {
if let Ok(mut client) = self.inner.idle.pop() {
if let Some(mut client) = self.inner.idle.pop() {
client.pool = PoolBinding::Attached(self.clone());
client.set_inside(false);
self.inner.ongoing.fetch_add(1, Ordering::AcqRel);
Expand All @@ -254,7 +254,7 @@ impl Pool {
}
self.inner.ongoing.fetch_sub(1, Ordering::AcqRel);

while let Ok(task) = self.inner.tasks.pop() {
while let Some(task) = self.inner.tasks.pop() {
task.wake()
}
}
Expand Down Expand Up @@ -388,8 +388,7 @@ mod test {
let barrier = Arc::new(AtomicBool::new(true));
let pool = Pool::new(options);

let mut runtime = Builder::new()
.threaded_scheduler()
let mut runtime = Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/retry_guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub(crate) async fn retry_guard(

#[cfg(not(feature = "async_std"))]
{
tokio::time::delay_for(duration).await;
tokio::time::sleep(duration).await;
}
}
}
Expand Down

0 comments on commit 7e313ad

Please sign in to comment.