diff --git a/Cargo.toml b/Cargo.toml index 9bf84b70..ec1a8979 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,34 +15,33 @@ exclude = ["tests/*", "examples/*"] [features] default = ["tokio_io"] -tls = ["tokio-tls", "native-tls"] +tls = ["tokio-native-tls", "native-tls"] async_std = ["async-std"] tokio_io = ["tokio"] [dependencies] byteorder = "^1.3" -chrono = "0.4" chrono-tz = "0.5" -crossbeam = "0.7.3" +crossbeam = "0.8.0" thiserror = "1.0.20" futures-core = "0.3.5" futures-sink = "0.3.5" hostname = "^0.3" lazy_static = "1.4.0" lz4 = "1.23.2" -pin-project = "0.4.23" +pin-project = "1.0.4" url="^2" uuid = "0.8.1" combine = "4.2.3" [dependencies.futures-util] -version = "0.3.5" +version = "0.3.12" features = ["sink"] [dependencies.tokio] -version = "0.2.22" +version = "1.5" default-features = false -features = ["io-util", "time", "net", "sync", "rt-threaded"] +features = ["io-util", "time", "net", "sync", "rt-multi-thread"] optional = true [dependencies.async-std] @@ -61,15 +60,20 @@ features = ["std", "serde"] version = "0.2" optional = true -[dependencies.tokio-tls] -version = "0.3.1" +[dependencies.tokio-native-tls] +version = "^0.3" optional = true +[dependencies.chrono] +version = "0.4" +default-features = false +features = [ "std" ] + [dev-dependencies] -env_logger = "^0.7" -rand = "^0.7" +env_logger = "^0.8" +rand = "^0.8" [dev-dependencies.tokio] -version = "0.2.0" +version = "1.5" default-features = false features = ["macros"] diff --git a/src/errors/mod.rs b/src/errors/mod.rs index 966bc69b..9f189871 100644 --- a/src/errors/mod.rs +++ b/src/errors/mod.rs @@ -2,7 +2,7 @@ use std::{borrow::Cow, io, result, str::Utf8Error, string::FromUtf8Error}; use thiserror::Error; #[cfg(feature = "tokio_io")] -use tokio::time::Elapsed; +use tokio::time::error::Elapsed; use url::ParseError; /// Clickhouse error codes diff --git a/src/io/stream.rs b/src/io/stream.rs index f7e133e1..05b8a0f3 100644 --- a/src/io/stream.rs +++ b/src/io/stream.rs @@ -6,12 +6,12 @@ use std::{ }; #[cfg(feature = "tokio_io")] -use tokio::net::TcpStream; +use tokio::{net::TcpStream, io::{ReadBuf, AsyncRead, AsyncWrite}}; #[cfg(feature = "tls")] use tokio_tls::TlsStream; #[cfg(feature = "tokio_io")] -use tokio::prelude::*; +// use tokio::prelude::*; use pin_project::pin_project; #[cfg(feature = "async_std")] use async_std::net::TcpStream; @@ -52,11 +52,16 @@ impl Stream { #[cfg(not(feature = "async_std"))] pub(crate) fn set_keepalive(&mut self, keepalive: Option) -> io::Result<()> { - match *self { - Self::Plain(ref mut stream) => stream.set_keepalive(keepalive), - #[cfg(feature = "tls")] - Self::Secure(ref mut stream) => stream.get_mut().set_keepalive(keepalive), - }.map_err(|err| io::Error::new(err.kind(), format!("set_keepalive error: {}", err))) + // match *self { + // Self::Plain(ref mut stream) => stream.set_keepalive(keepalive), + // #[cfg(feature = "tls")] + // Self::Secure(ref mut stream) => stream.get_mut().set_keepalive(keepalive), + // }.map_err(|err| io::Error::new(err.kind(), format!("set_keepalive error: {}", err))) + if keepalive.is_some() { + // https://github.com/tokio-rs/tokio/issues/3082 + log::warn!("`tokio` dropped `set_keepalive` in v0.3 and is currently working on getting it back") + } + Ok(()) } pub(crate) fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> { @@ -67,6 +72,7 @@ impl Stream { }.map_err(|err| io::Error::new(err.kind(), format!("set_nodelay error: {}", err))) } + #[cfg(feature = "async_std")] pub(crate) fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { match self.project() { StreamProj::Plain(stream) => stream.poll_read(cx, buf), @@ -75,6 +81,23 @@ impl Stream { } } + #[cfg(not(feature = "async_std"))] + pub(crate) fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + let mut read_buf = ReadBuf::new(buf); + + let result = match self.project() { + StreamProj::Plain(stream) => stream.poll_read(cx, &mut read_buf), + #[cfg(feature = "tls")] + StreamProj::Secure(stream) => stream.poll_read(cx, &mut read_buf), + }; + + match result { + Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buf.filled().len())), + Poll::Ready(Err(x)) => Poll::Ready(Err(x)), + Poll::Pending => Poll::Pending, + } + } + pub(crate) fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { match self.project() { StreamProj::Plain(stream) => stream.poll_write(cx, buf), diff --git a/src/pool/mod.rs b/src/pool/mod.rs index 2d64e968..9d9ace0c 100644 --- a/src/pool/mod.rs +++ b/src/pool/mod.rs @@ -32,7 +32,7 @@ pub(crate) struct Inner { impl Inner { pub(crate) fn release_conn(&self) { self.ongoing.fetch_sub(1, Ordering::AcqRel); - while let Ok(task) = self.tasks.pop() { + while let Some(task) = self.tasks.pop() { task.wake() } } @@ -211,7 +211,7 @@ impl Pool { } fn handle_futures(&mut self, cx: &mut Context<'_>) -> Result<()> { - if let Ok(mut new) = self.inner.new.pop() { + if let Some(mut new) = self.inner.new.pop() { match new.poll_unpin(cx) { Poll::Ready(Ok(client)) => { self.inner.idle.push(client).unwrap(); @@ -232,7 +232,7 @@ impl Pool { } fn take_conn(&mut self) -> Option { - if let Ok(mut client) = self.inner.idle.pop() { + if let Some(mut client) = self.inner.idle.pop() { client.pool = PoolBinding::Attached(self.clone()); client.set_inside(false); self.inner.ongoing.fetch_add(1, Ordering::AcqRel); @@ -254,7 +254,7 @@ impl Pool { } self.inner.ongoing.fetch_sub(1, Ordering::AcqRel); - while let Ok(task) = self.inner.tasks.pop() { + while let Some(task) = self.inner.tasks.pop() { task.wake() } } @@ -388,8 +388,7 @@ mod test { let barrier = Arc::new(AtomicBool::new(true)); let pool = Pool::new(options); - let mut runtime = Builder::new() - .threaded_scheduler() + let mut runtime = Builder::new_multi_thread() .enable_all() .build() .unwrap(); diff --git a/src/retry_guard.rs b/src/retry_guard.rs index 2ac49d92..6d689626 100644 --- a/src/retry_guard.rs +++ b/src/retry_guard.rs @@ -44,7 +44,7 @@ pub(crate) async fn retry_guard( #[cfg(not(feature = "async_std"))] { - tokio::time::delay_for(duration).await; + tokio::time::sleep(duration).await; } } }