Skip to content

Commit

Permalink
support of LowCardinality types
Browse files Browse the repository at this point in the history
  • Loading branch information
suharev7 committed Nov 9, 2023
1 parent 069d8ca commit b4e54a5
Show file tree
Hide file tree
Showing 29 changed files with 1,198 additions and 208 deletions.
20 changes: 8 additions & 12 deletions src/binary/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,27 @@ use log::{trace, warn};
use crate::{
binary::{protocol, ReadEx},
errors::{DriverError, Error, Result, ServerError},
io::transport::TransportInfo,
types::{Block, Packet, ProfileInfo, Progress, ServerInfo, TableColumns},
};

/// The internal clickhouse response parser.
pub(crate) struct Parser<T> {
pub(crate) struct Parser<'i, T> {
reader: T,
tz: Option<Tz>,
compress: bool,
info: &'i TransportInfo,
}

/// The parser can be used to parse clickhouse responses into values. Generally
/// you normally do not use this directly as it's already done for you by
/// the client but in some more complex situations it might be useful to be
/// able to parse the clickhouse responses.
impl<T: Read> Parser<T> {
impl<'i, T: Read> Parser<'i, T> {
/// Creates a new parser that parses the data behind the reader. More
/// than one value can be behind the reader in which case the parser can
/// be invoked multiple times. In other words: the stream does not have
/// to be terminated.
pub(crate) fn new(reader: T, tz: Option<Tz>, compress: bool) -> Parser<T> {
Self {
reader,
tz,
compress,
}
pub(crate) fn new(reader: T, info: &'i TransportInfo) -> Parser<T> {
Self { reader, info }
}

/// Parses a single value out of the stream. If there are multiple
Expand All @@ -54,11 +50,11 @@ impl<T: Read> Parser<T> {
}

fn parse_block(&mut self) -> Result<Packet<()>> {
match self.tz {
match self.info.timezone {
None => Err(Error::Driver(DriverError::UnexpectedPacket)),
Some(tz) => {
self.reader.skip_string()?;
let block = Block::load(&mut self.reader, tz, self.compress)?;
let block = Block::load(&mut self.reader, tz, self.info.compress)?;
Ok(Packet::Block(block))
}
}
Expand Down
8 changes: 7 additions & 1 deletion src/client_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,11 @@ pub fn description() -> String {

#[test]
fn test_description() {
assert_eq!(description(), format!("Rust SQLDriver {CLICK_HOUSE_DBMSVERSION_MAJOR}.{CLICK_HOUSE_DBMSVERSION_MINOR}.{CLICK_HOUSE_REVISION}"))
assert_eq!(
description(),
format!(
"Rust SQLDriver {}.{}.{}",
CLICK_HOUSE_DBMSVERSION_MAJOR, CLICK_HOUSE_DBMSVERSION_MINOR, CLICK_HOUSE_REVISION
)
)
}
3 changes: 3 additions & 0 deletions src/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ pub enum DriverError {

#[error("Invalid utf-8 sequence.")]
Utf8Error(Utf8Error),

#[error("Deserialize error: `{}`", _0)]
Deserialize(Cow<'static, str>),
}

/// This type enumerates cast from sql type errors.
Expand Down
29 changes: 19 additions & 10 deletions src/io/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ use crate::{
use futures_core::Stream;
use futures_util::StreamExt;

/// Connection-level parameters owned by the transport and lent to the
/// response parser by reference.
pub(crate) struct TransportInfo {
/// Server time zone; `None` until a `Hello` packet has been parsed.
pub(crate) timezone: Option<Tz>,
/// Revision copied from the server's `Hello` packet; `0` until then.
pub(crate) revision: u64,
/// Compression flag that the parser hands on to `Block::load`.
pub(crate) compress: bool,
}

/// Line transport
#[pin_project(project = ClickhouseTransportProj)]
pub(crate) struct ClickhouseTransport {
Expand All @@ -43,9 +49,10 @@ pub(crate) struct ClickhouseTransport {
// Queued commands
cmds: VecDeque<Cmd>,
// Server time zone
timezone: Option<Tz>,
revision: u64,
compress: bool,
// timezone: Option<Tz>,
// revision: u64,
// compress: bool,
info: TransportInfo,
// Whether there are unread packets
pub(crate) inconsistent: bool,
status: Arc<TransportStatus>,
Expand Down Expand Up @@ -78,9 +85,11 @@ impl ClickhouseTransport {
buf_is_incomplete: false,
wr: io::Cursor::new(vec![]),
cmds: VecDeque::new(),
timezone: None,
revision: 0,
compress,
info: TransportInfo {
timezone: None,
revision: 0,
compress,
},
inconsistent: false,
status: Arc::new(TransportStatus::new(pool)),
}
Expand Down Expand Up @@ -150,14 +159,14 @@ impl<'p> ClickhouseTransportProj<'p> {
let ret = {
let mut cursor = Cursor::new(&self.rd);
let res = {
let mut parser = Parser::new(&mut cursor, *self.timezone, *self.compress);
parser.parse_packet(*self.revision)
let mut parser = Parser::new(&mut cursor, self.info);
parser.parse_packet(self.info.revision)
};
pos = cursor.position() as usize;

if let Ok(Packet::Hello(_, ref packet)) = res {
*self.timezone = Some(packet.timezone);
*self.revision = packet.revision;
self.info.timezone = Some(packet.timezone);
self.info.revision = packet.revision;
}

match res {
Expand Down
4 changes: 4 additions & 0 deletions src/types/column/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ impl ColumnData for ArrayColumnData {
}
None
}

/// Delegates to the wrapped `inner` column.
fn get_timezone(&self) -> Option<Tz> {
self.inner.get_timezone()
}
}

#[cfg(test)]
Expand Down
8 changes: 8 additions & 0 deletions src/types/column/chrono_datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ impl ColumnData for ChronoDateTimeColumnData {
None
}
}

/// Always `Some`: returns the time zone stored in this column's `tz` field.
fn get_timezone(&self) -> Option<Tz> {
Some(self.tz)
}
}

#[inline(always)]
Expand Down Expand Up @@ -214,4 +218,8 @@ impl ColumnData for ChronoDateTimeAdapter {
) -> Result<()> {
unimplemented!()
}

/// Forwards the request to the wrapped `column`.
fn get_timezone(&self) -> Option<Tz> {
self.column.get_timezone()
}
}
5 changes: 5 additions & 0 deletions src/types/column/chunk.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use chrono_tz::Tz;
use std::{cmp, ops};

use crate::{
Expand Down Expand Up @@ -53,4 +54,8 @@ impl ColumnData for ChunkColumnData {
fn clone_instance(&self) -> BoxColumnData {
unimplemented!()
}

/// Forwards the request to the underlying `data` column.
fn get_timezone(&self) -> Option<Tz> {
self.data.get_timezone()
}
}
23 changes: 20 additions & 3 deletions src/types/column/column_data.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{convert, sync::Arc};
use chrono_tz::Tz;
use std::sync::Arc;

use crate::{
binary::Encoder,
Expand All @@ -10,6 +11,12 @@ pub(crate) type ArcColumnData = Arc<dyn ColumnData + Send + Sync>;

pub(crate) type BoxColumnData = Box<dyn ColumnData + Send + Sync>;

/// Optional accessor interface that a column can hand out through
/// `ColumnData::get_low_cardinality_accessor`.
pub trait LowCardinalityAccessor {
/// Presumably returns the bytes of the string stored at `_index`; the
/// intended contract is not visible here (TODO confirm against implementors).
///
/// # Panics
///
/// The default implementation always panics via `unimplemented!()`, so
/// implementors that can actually serve strings must override it.
fn get_string(&self, _index: usize) -> &[u8] {
unimplemented!()
}
}

pub trait ColumnData {
fn sql_type(&self) -> SqlType;
fn save(&self, encoder: &mut Encoder, start: usize, end: usize);
Expand All @@ -28,17 +35,27 @@ pub trait ColumnData {
Err(Error::FromSql(FromSqlError::UnsupportedOperation))
}

/// Default implementation: writes nothing and reports
/// `FromSqlError::UnsupportedOperation`.
///
/// Columns that support this call override it and write a description of
/// their storage through `_data`.
///
/// # Safety
///
/// The type expected behind `_data` is chosen by the overriding column
/// (the date/time columns cast it to `DateTimeInternals`); callers must pass
/// a valid, exclusive pointer to that type.
// NOTE(review): the meaning of `_level` and `_props` is not visible here — confirm.
unsafe fn get_internals(&self, _data: *mut (), _level: u8, _props: u32) -> Result<()> {
Err(Error::FromSql(FromSqlError::UnsupportedOperation))
}

/// Default implementation: returns `None`.
// NOTE(review): presumably `None` means "no conversion to `_target` is
// available" — confirm against the callers and the overriding columns.
fn cast_to(&self, _this: &ArcColumnData, _target: &SqlType) -> Option<ArcColumnData> {
None
}

/// Returns the time zone associated with this column, if any.
///
/// Required method (no default body): every implementor has to answer
/// explicitly, typically by returning its own zone or delegating to the
/// column it wraps.
fn get_timezone(&self) -> Option<Tz>;

/// Default implementation: returns `None`.
///
/// A column that implements `LowCardinalityAccessor` may override this to
/// return itself.
fn get_low_cardinality_accessor(&self) -> Option<&dyn LowCardinalityAccessor> {
None
}
}

pub(crate) trait ColumnDataExt {
fn append<T: convert::Into<Value>>(&mut self, value: T);
fn append<T: Into<Value>>(&mut self, value: T);
}

impl<C: ColumnData> ColumnDataExt for C {
fn append<T: convert::Into<Value>>(&mut self, value: T) {
fn append<T: Into<Value>>(&mut self, value: T) {
self.push(value.into());
}
}
8 changes: 6 additions & 2 deletions src/types/column/concat.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::iter;
use chrono_tz::Tz;

use crate::{
binary::Encoder,
Expand Down Expand Up @@ -79,11 +79,15 @@ impl ColumnData for ConcatColumnData {
Err(Error::FromSql(FromSqlError::UnsupportedOperation))
}
}

/// Reports the time zone of the first element of `self.data`, or `None`
/// when `self.data` is empty.
fn get_timezone(&self) -> Option<Tz> {
    let head = self.data.first()?;
    head.get_timezone()
}
}

fn build_index<'a, I>(sizes: I) -> Vec<usize>
where
I: iter::Iterator<Item = usize> + 'a,
I: Iterator<Item = usize> + 'a,
{
let mut acc = 0;
let mut index = vec![acc];
Expand Down
69 changes: 61 additions & 8 deletions src/types/column/date.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{convert, fmt, sync::Arc};
use std::{fmt, ptr, sync::Arc};

use chrono::prelude::*;
use chrono_tz::Tz;
Expand All @@ -10,7 +10,7 @@ use crate::{
types::{
column::{
array::ArrayColumnData,
column_data::{BoxColumnData, ColumnData},
column_data::{BoxColumnData, ColumnData, LowCardinalityAccessor},
datetime64::DEFAULT_TZ,
list::List,
nullable::NullableColumnData,
Expand All @@ -27,8 +27,8 @@ where
+ Unmarshal<T>
+ Marshal
+ Copy
+ convert::Into<Value>
+ convert::From<Value>
+ Into<Value>
+ From<Value>
+ fmt::Display
+ Sync
+ Default
Expand All @@ -44,8 +44,8 @@ where
+ Unmarshal<T>
+ Marshal
+ Copy
+ convert::Into<Value>
+ convert::From<Value>
+ Into<Value>
+ From<Value>
+ fmt::Display
+ Sync
+ Default
Expand Down Expand Up @@ -161,14 +161,30 @@ impl ColumnFrom for Vec<Option<NaiveDate>> {
}
}

// Empty impl: `DateColumnData` overrides nothing and therefore inherits the
// trait's default methods.
// NOTE(review): the default `get_string` is `unimplemented!()`, so calling it
// on a date column panics — confirm that this is intended.
impl<T> LowCardinalityAccessor for DateColumnData<T> where
T: StatBuffer
+ Unmarshal<T>
+ Marshal
+ Copy
+ Into<Value>
+ From<Value>
+ fmt::Display
+ Sync
+ Send
+ DateConverter
+ Default
+ 'static
{
}

impl<T> ColumnData for DateColumnData<T>
where
T: StatBuffer
+ Unmarshal<T>
+ Marshal
+ Copy
+ convert::Into<Value>
+ convert::From<Value>
+ Into<Value>
+ From<Value>
+ fmt::Display
+ Sync
+ Send
Expand Down Expand Up @@ -215,6 +231,43 @@ where
*(pointers[2] as *mut usize) = self.len();
Ok(())
}

/// Describes this column's storage in the `DateTimeInternals` behind `data_ptr`.
///
/// Sets `begin`, `len` and `tz`; `precision` is not written and keeps
/// whatever value the caller initialised it with. Always returns `Ok(())`.
///
/// # Safety
///
/// `data_ptr` must be non-null, properly aligned and point to a live
/// `DateTimeInternals` that nothing else accesses during the call; the cast
/// below is unchecked. The stored `begin` pointer comes from
/// `self.data.as_ptr()` — presumably it is only valid while this column is
/// alive and unmodified (TODO confirm).
unsafe fn get_internals(&self, data_ptr: *mut (), _level: u8, _props: u32) -> Result<()> {
unsafe {
// SAFETY: relies entirely on the caller contract documented above.
let data_ref = &mut *(data_ptr as *mut DateTimeInternals);
data_ref.begin = self.data.as_ptr().cast();
data_ref.len = self.data.len();
data_ref.tz = self.tz;
Ok(())
}
}

/// Always `Some`: returns the time zone stored in this column's `tz` field.
fn get_timezone(&self) -> Option<Tz> {
Some(self.tz)
}

/// Exposes this column itself as a `LowCardinalityAccessor`.
fn get_low_cardinality_accessor(&self) -> Option<&dyn LowCardinalityAccessor> {
Some(self)
}
}

/// Raw, type-erased view of a date/time column, filled in by the columns'
/// `get_internals` implementations.
#[derive(Debug)]
pub(crate) struct DateTimeInternals {
/// Start of the source column's data, as returned by its `as_ptr()`.
pub(crate) begin: *const (),
/// Value of the source column's `data.len()`.
pub(crate) len: usize,
/// Time zone of the source column.
pub(crate) tz: Tz,
/// Set to `Some(precision)` by the DateTime64 column; the date column
/// leaves this field untouched.
pub(crate) precision: Option<u32>,
}

impl Default for DateTimeInternals {
fn default() -> Self {
Self {
begin: ptr::null(),
len: 0,
tz: Tz::Zulu,
precision: None,
}
}
}

#[cfg(test)]
Expand Down
18 changes: 18 additions & 0 deletions src/types/column/datetime64.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
types::{
column::{
column_data::{BoxColumnData, ColumnData},
date::DateTimeInternals,
list::List,
},
DateTimeType, SqlType, Value, ValueRef,
Expand Down Expand Up @@ -96,6 +97,23 @@ impl ColumnData for DateTime64ColumnData {
*(pointers[3] as *mut Option<u32>) = Some(*precision);
Ok(())
}

/// Describes this column's storage in the `DateTimeInternals` behind `data_ptr`.
///
/// Sets `begin`, `len`, `tz` and `precision` (the latter two taken from
/// `self.params`). Always returns `Ok(())`.
///
/// # Safety
///
/// `data_ptr` must be non-null, properly aligned and point to a live
/// `DateTimeInternals` that nothing else accesses during the call; the cast
/// below is unchecked. The stored `begin` pointer comes from
/// `self.data.as_ptr()` — presumably it is only valid while this column is
/// alive and unmodified (TODO confirm).
unsafe fn get_internals(&self, data_ptr: *mut (), _level: u8, _props: u32) -> Result<()> {
let (precision, tz) = &self.params;
unsafe {
// SAFETY: relies entirely on the caller contract documented above.
let data_ref = &mut *(data_ptr as *mut DateTimeInternals);
data_ref.begin = self.data.as_ptr().cast();
data_ref.len = self.data.len();
data_ref.tz = *tz;
data_ref.precision = Some(*precision);
Ok(())
}
}

/// Always `Some`: the time zone is the second element of `self.params`.
fn get_timezone(&self) -> Option<Tz> {
    Some(self.params.1)
}
}

pub(crate) fn from_datetime<T: TimeZone>(time: DateTime<T>, precision: u32) -> i64 {
Expand Down
Loading

0 comments on commit b4e54a5

Please sign in to comment.