Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add QueryResultOwned type for lifetime-independent streaming #216

Open
wants to merge 2 commits into
base: async-await
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 72 additions & 27 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ pub use crate::{
pool::Pool,
types::{block::Block, Options, Simple},
};
use crate::types::query_result_owned::QueryResultOwned;
use crate::types::query_result_owned::stream_blocks::BlockStreamOwned;

mod binary;
mod client_info;
Expand Down Expand Up @@ -301,7 +303,7 @@ impl Client {
},
timeout,
)
.await
.await
}
}

Expand Down Expand Up @@ -366,13 +368,13 @@ impl ClientHandle {
},
timeout,
)
.await
.await
}

/// Executes Clickhouse `query` on Conn.
pub fn query<Q>(&mut self, sql: Q) -> QueryResult
where
Query: From<Q>,
where
Query: From<Q>,
{
let query = Query::from(sql);
QueryResult {
Expand All @@ -381,19 +383,31 @@ impl ClientHandle {
}
}

/// Executes Clickhouse `query` on Conn.
pub fn query_owned<Q>(self, sql: Q) -> QueryResultOwned
where
Query: From<Q>,
{
let query = Query::from(sql);
QueryResultOwned {
client: self,
query,
}
}

/// Convenience method to prepare and execute a single SQL statement.
pub async fn execute<Q>(&mut self, sql: Q) -> Result<()>
where
Query: From<Q>,
where
Query: From<Q>,
{
let transport = self.execute_(sql).await?;
self.inner = Some(transport);
Ok(())
}

async fn execute_<Q>(&mut self, sql: Q) -> Result<ClickhouseTransport>
where
Query: From<Q>,
where
Query: From<Q>,
{
let timeout = try_opt!(self.context.options.get())
.execute_timeout
Expand Down Expand Up @@ -429,18 +443,18 @@ impl ClientHandle {
Ok(h.unwrap())
}
})
.await
.await
},
timeout,
)
.await
.await
}

/// Convenience method to insert block of data.
pub async fn insert<Q, B>(&mut self, table: Q, block: B) -> Result<()>
where
Query: From<Q>,
B: AsRef<Block>,
where
Query: From<Q>,
B: AsRef<Block>,
{
let query = Self::make_query(table, block.as_ref())?;
let transport = self.insert_(query.clone(), block.as_ref()).await?;
Expand Down Expand Up @@ -474,11 +488,11 @@ impl ClientHandle {
Self::insert_tail_(transport, context, query, chunks).await
}
})
.await
.await
},
timeout,
)
.await
.await
}

async fn insert_tail_(
Expand Down Expand Up @@ -520,8 +534,8 @@ impl ClientHandle {
}

fn make_query<Q>(table: Q, block: &Block) -> Result<Query>
where
Query: From<Q>,
where
Query: From<Q>,
{
let mut names: Vec<_> = Vec::with_capacity(block.as_ref().column_count());
for column in block.as_ref().columns() {
Expand All @@ -532,10 +546,10 @@ impl ClientHandle {
}

pub(crate) async fn wrap_future<T, R, F>(&mut self, f: F) -> Result<T>
where
F: FnOnce(&mut Self) -> R + Send,
R: Future<Output = Result<T>>,
T: 'static,
where
F: FnOnce(&mut Self) -> R + Send,
R: Future<Output=Result<T>>,
T: 'static,
{
let ping_before_query = try_opt!(self.context.options.get()).ping_before_query;

Expand All @@ -546,8 +560,8 @@ impl ClientHandle {
}

pub(crate) fn wrap_stream<'a, F>(&'a mut self, f: F) -> BoxStream<'a, Result<Block>>
where
F: (FnOnce(&'a mut Self) -> Result<BlockStream<'a>>) + Send + 'static,
where
F: (FnOnce(&'a mut Self) -> Result<BlockStream<'a>>) + Send + 'static,
{
let ping_before_query = match self.context.options.get() {
Ok(val) => val.ping_before_query,
Expand Down Expand Up @@ -575,6 +589,37 @@ impl ClientHandle {
}
}

pub(crate) fn wrap_stream_owned<F>(mut self, f: F) -> BoxStream<'static, Result<Block>>
where
F: (FnOnce(Self) -> Result<BlockStreamOwned>) + Send + 'static,
{
let ping_before_query = match self.context.options.get() {
Ok(val) => val.ping_before_query,
Err(err) => return Box::pin(stream::once(future::err(err))),
};

if ping_before_query {
let fut: BoxFuture<'static, BoxStream<'static, Result<Block>>> = Box::pin(async move {
let inner: BoxStream<'static, Result<Block>> = if let Err(err) = self.check_connection().await {
Box::pin(stream::once(future::err(err)))
} else {
match f(self) {
Ok(s) => Box::pin(s),
Err(err) => Box::pin(stream::once(future::err(err))),
}
};
inner
});

Box::pin(fut.flatten_stream())
} else {
match f(self) {
Ok(s) => Box::pin(s),
Err(err) => Box::pin(stream::once(future::err(err))),
}
}
}

/// Check connection and try to reconnect if necessary.
pub async fn check_connection(&mut self) -> Result<()> {
self.pool.detach();
Expand Down Expand Up @@ -626,8 +671,8 @@ fn column_name_to_string(name: &str) -> Result<String> {

#[cfg(feature = "async_std")]
async fn with_timeout<F, T>(future: F, duration: Duration) -> F::Output
where
F: Future<Output = Result<T>>,
where
F: Future<Output=Result<T>>,
{
use async_std::io;
use futures_util::future::TryFutureExt;
Expand All @@ -639,8 +684,8 @@ where

#[cfg(not(feature = "async_std"))]
async fn with_timeout<F, T>(future: F, timeout: Duration) -> F::Output
where
F: Future<Output = Result<T>>,
where
F: Future<Output=Result<T>>,
{
tokio::time::timeout(timeout, future).await?
}
Expand Down
2 changes: 1 addition & 1 deletion src/types/column/datetime64.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ pub(crate) fn to_native_datetime_opt(value: i64, precision: u32) -> Option<Naive
let sec = nano / 1_000_000_000;
let nsec = nano - sec * 1_000_000_000;

NaiveDateTime::from_timestamp_opt(sec, nsec as u32)
DateTime::<Utc>::from_timestamp(sec, nsec as u32).map(|d| d.naive_utc())
}

#[cfg(test)]
Expand Down
54 changes: 27 additions & 27 deletions src/types/column/iter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,9 @@ impl FusedIterator for StringIterator<'_> {}
impl<'a> DecimalIterator<'a> {
#[inline(always)]
unsafe fn next_unchecked_<T>(&mut self) -> Decimal
where
T: Copy + Sized,
i64: From<T>,
where
T: Copy + Sized,
i64: From<T>,
{
let current_value = *(self.ptr as *const T);
self.ptr = (self.ptr as *const T).offset(1) as *const u8;
Expand Down Expand Up @@ -479,7 +479,7 @@ impl<'a> NativeDateTimeIterator<'a> {
match &self.inner {
DateTimeInnerIterator::DateTime32(ptr) => {
let current_value = *ptr.add(index_);
NaiveDateTime::from_timestamp_opt(i64::from(current_value), 0).unwrap()
DateTime::from_timestamp(i64::from(current_value), 0).unwrap().naive_utc()
}
DateTimeInnerIterator::DateTime64(ptr, precision) => {
let current_value = *ptr.add(index_);
Expand Down Expand Up @@ -664,8 +664,8 @@ impl<'a> Iterator for DateTimeIterator<'a> {
}

impl<'a, I> ExactSizeIterator for NullableIterator<'a, I>
where
I: Iterator,
where
I: Iterator,
{
#[inline(always)]
fn len(&self) -> usize {
Expand All @@ -675,8 +675,8 @@ where
}

impl<'a, I> Iterator for NullableIterator<'a, I>
where
I: Iterator,
where
I: Iterator,
{
type Item = Option<I::Item>;

Expand Down Expand Up @@ -758,8 +758,8 @@ impl<'a, I: Iterator> Iterator for ArrayIterator<'a, I> {
impl<'a, I: Iterator> FusedIterator for ArrayIterator<'a, I> {}

impl<'a, K: Iterator, V: Iterator> ExactSizeIterator for MapIterator<'a, K, V>
where
K::Item: Eq + Hash,
where
K::Item: Eq + Hash,
{
#[inline(always)]
fn len(&self) -> usize {
Expand All @@ -768,8 +768,8 @@ where
}

impl<'a, K: Iterator, V: Iterator> Iterator for MapIterator<'a, K, V>
where
K::Item: Eq + Hash,
where
K::Item: Eq + Hash,
{
type Item = HashMap<K::Item, V::Item>;

Expand Down Expand Up @@ -935,7 +935,7 @@ impl<'a> Iterable<'a, Simple> for &[u8] {
return Err(Error::FromSql(FromSqlError::InvalidType {
src: column.sql_type().to_string(),
dst: SqlType::String.to_string(),
}))
}));
}
};

Expand Down Expand Up @@ -1196,8 +1196,8 @@ fn date_iter(column: &Column<Simple>, props: u32) -> Result<DateTimeInternals> {
}

impl<'a, T> Iterable<'a, Simple> for Option<T>
where
T: Iterable<'a, Simple>,
where
T: Iterable<'a, Simple>,
{
type Iter = NullableIterator<'a, T::Iter>;

Expand Down Expand Up @@ -1238,8 +1238,8 @@ where
}

impl<'a, T> Iterable<'a, Simple> for Vec<T>
where
T: Iterable<'a, Simple>,
where
T: Iterable<'a, Simple>,
{
type Iter = ArrayIterator<'a, T::Iter>;

Expand Down Expand Up @@ -1279,10 +1279,10 @@ where
}

impl<'a, K, V> Iterable<'a, Simple> for HashMap<K, V>
where
K: Iterable<'a, Simple>,
<<K as Iterable<'a, Simple>>::Iter as Iterator>::Item: Eq + Hash,
V: Iterable<'a, Simple>,
where
K: Iterable<'a, Simple>,
<<K as Iterable<'a, Simple>>::Iter as Iterator>::Item: Eq + Hash,
V: Iterable<'a, Simple>,
{
type Iter = MapIterator<'a, K::Iter, V::Iter>;

Expand Down Expand Up @@ -1328,8 +1328,8 @@ where
}

pub struct ComplexIterator<'a, T>
where
T: Iterable<'a, Simple>,
where
T: Iterable<'a, Simple>,
{
column_type: SqlType,

Expand All @@ -1342,8 +1342,8 @@ where
}

impl<'a, T> Iterator for ComplexIterator<'a, T>
where
T: Iterable<'a, Simple>,
where
T: Iterable<'a, Simple>,
{
type Item = <<T as Iterable<'a, Simple>>::Iter as Iterator>::Item;

Expand Down Expand Up @@ -1392,8 +1392,8 @@ where
}

impl<'a, T> Iterable<'a, Complex> for T
where
T: Iterable<'a, Simple> + 'a,
where
T: Iterable<'a, Simple> + 'a,
{
type Iter = ComplexIterator<'a, T>;

Expand Down
12 changes: 6 additions & 6 deletions src/types/from_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ impl<'a> FromSql<'a> for String {
}

impl<'a, K, V> FromSql<'a> for HashMap<K, V>
where
K: FromSql<'a> + Eq + PartialEq + Hash,
V: FromSql<'a>,
where
K: FromSql<'a> + Eq + PartialEq + Hash,
V: FromSql<'a>,
{
fn from_sql(value: ValueRef<'a>) -> FromSqlResult<Self> {
if let ValueRef::Map(_k, _v, hm) = value {
Expand Down Expand Up @@ -274,8 +274,8 @@ from_sql_vec_impl! {
}

impl<'a, T> FromSql<'a> for Option<T>
where
T: FromSql<'a>,
where
T: FromSql<'a>,
{
fn from_sql(value: ValueRef<'a>) -> FromSqlResult<Self> {
match value {
Expand All @@ -301,7 +301,7 @@ impl<'a> FromSql<'a> for NaiveDate {
fn from_sql(value: ValueRef<'a>) -> FromSqlResult<Self> {
match value {
ValueRef::Date(v) => NaiveDate::from_ymd_opt(1970, 1, 1)
.map(|unix_epoch| unix_epoch + Duration::days(v.into()))
.map(|unix_epoch| unix_epoch + Duration::try_days(v.into()).expect("TimeDelta::days out of bounds"))
.ok_or(Error::FromSql(FromSqlError::OutOfRange)),
_ => {
let from = SqlType::from(value).to_string();
Expand Down
Loading