diff --git a/Cargo.lock b/Cargo.lock index 4e3b59f18eb55..2a4c475198b69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1581,6 +1581,18 @@ dependencies = [ "winapi", ] +[[package]] +name = "chrono-tz" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c39203181991a7dd4343b8005bd804e7a9a37afb8ac070e43771e8c820bbde" +dependencies = [ + "chrono", + "chrono-tz-build 0.0.3", + "phf", + "serde", +] + [[package]] name = "chrono-tz" version = "0.7.0" @@ -1588,11 +1600,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbc529705a6e0028189c83f0a5dd9fb214105116f7e3c0eeab7ff0369766b0d1" dependencies = [ "chrono", - "chrono-tz-build", + "chrono-tz-build 0.1.0", "phf", "serde", ] +[[package]] +name = "chrono-tz-build" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f509c3a87b33437b05e2458750a0700e5bdd6956176773e6c7d6dd15a283a0c" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "chrono-tz-build" version = "0.1.0" @@ -1728,6 +1751,43 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "clickhouse-rs" +version = "1.0.0-alpha.1" +source = "git+https://github.com/suharev7/clickhouse-rs?rev=e40016b#e40016bbc7546fb4d32340db074d3c66643cd5ca" +dependencies = [ + "byteorder", + "chrono", + "chrono-tz 0.6.3", + "clickhouse-rs-cityhash-sys", + "combine 4.6.4", + "crossbeam", + "either", + "futures-core", + "futures-sink", + "futures-util", + "hostname", + "lazy_static", + "log", + "lz4", + "native-tls", + "percent-encoding", + "pin-project", + "thiserror", + "tokio", + "tokio-native-tls", + "url", + "uuid 0.8.2", +] + +[[package]] +name = "clickhouse-rs-cityhash-sys" +version = "0.1.2" +source = "git+https://github.com/suharev7/clickhouse-rs?rev=e40016b#e40016bbc7546fb4d32340db074d3c66643cd5ca" +dependencies = [ + "cc", +] + [[package]] name = "clipboard-win" version = "4.4.1" @@ -2008,6 +2068,20 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "crossbeam" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.4" @@ -2284,7 +2358,7 @@ version = "0.1.0" dependencies = [ "bytes 1.2.1", "chrono", - "chrono-tz", + "chrono-tz 0.7.0", "criterion", "lalrpop", "lalrpop-util", @@ -4363,6 +4437,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if", + "serde", ] [[package]] @@ -4435,6 +4510,26 @@ dependencies = [ "cc", ] +[[package]] +name = "lz4" +version = "1.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1" +dependencies = [ + "libc", + "lz4-sys", +] + +[[package]] +name = "lz4-sys" +version = "1.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "macaddr" version = "1.0.1" @@ -5622,6 +5717,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9dd5609d4b2df87167f908a32e1b146ce309c16cf35df76bc11f440b756048e4" dependencies = [ "siphasher", + "uncased", ] [[package]] @@ -8630,6 +8726,15 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c" +[[package]] +name = "uncased" +version = "0.9.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09b01702b0fd0b3fadcf98e098780badda8742d4f4a7676615cad90e8ac73622" +dependencies = [ + "version_check", +] + [[package]] name = "unicase" version = "2.6.0" @@ -8845,8 +8950,10 @@ dependencies = [ "bytes 1.2.1", "bytesize", "chrono", + "chrono-tz 0.6.3", "cidr-utils", "clap 4.0.18", + "clickhouse-rs", "codecs", "colored", "console-subscriber", @@ -8859,6 +8966,7 @@ dependencies = [ "dirs-next", "dnsmsg-parser", "dyn-clone", + "either", "encoding_rs", "enrichment", "enum_dispatch", @@ -9074,7 +9182,7 @@ dependencies = [ "async-stream", "bytes 1.2.1", "chrono", - "chrono-tz", + "chrono-tz 0.7.0", "crossbeam-utils", "derivative", "futures 0.3.25", @@ -9103,7 +9211,7 @@ name = "vector-config" version = "0.1.0" dependencies = [ "chrono", - "chrono-tz", + "chrono-tz 0.7.0", "encoding_rs", "indexmap", "inventory", @@ -9357,7 +9465,7 @@ dependencies = [ "cbc", "cfb-mode", "chrono", - "chrono-tz", + "chrono-tz 0.7.0", "cidr-utils", "criterion", "csv", @@ -9406,7 +9514,7 @@ version = "0.1.0" dependencies = [ "ansi_term", "chrono", - "chrono-tz", + "chrono-tz 0.7.0", "clap 4.0.18", "enrichment", "glob", diff --git a/Cargo.toml b/Cargo.toml index b16772549b435..b68339b1cc906 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -236,13 +236,16 @@ bollard = { version = "0.13.0", default-features = false, features = ["ssl", "ch bytes = { version = "1.2.1", default-features = false, features = ["serde"] } bytesize = { version = "1.1.0", default-features = false } chrono = { version = "0.4.22", default-features = false, features = ["serde"] } +chrono-tz = { version = "0.6", default-features = false, features = ["serde"], optional = true } cidr-utils = { version = "0.5.8", default-features = false } clap = { version = "4.0.18", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] } +clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs", rev="e40016b", features = ["tokio_io", "tls"], optional = true } colored = { version = "2.0.0", default-features = false } csv = { version = "1.1", default-features = false } derivative = { version = "2.2.0", default-features = false } dirs-next = { version = "2.0.0", default-features = false, optional = true } dyn-clone = { version = "1.0.9", default-features = false } +either = { version = "1.6.1", default-features = false, optional = true } encoding_rs = { version = "0.8.31", default-features = false, features = ["serde"] } enum_dispatch = { version = "0.3.8", default-features = false } exitcode = { version = "1.1.2", default-features = false } @@ -662,7 +665,7 @@ sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", sinks-azure_monitor_logs = [] sinks-blackhole = [] sinks-chronicle = [] -sinks-clickhouse = [] +sinks-clickhouse = ["dep:clickhouse-rs", "dep:chrono-tz", "dep:either", "dep:nom"] sinks-console = [] sinks-datadog_archives = ["sinks-aws_s3", "sinks-azure_blob", "sinks-gcp"] sinks-datadog_events = [] diff --git a/src/sinks/clickhouse/config.rs b/src/sinks/clickhouse/config.rs index 68941393755a8..cff7412ecc3ad 100644 --- a/src/sinks/clickhouse/config.rs +++ b/src/sinks/clickhouse/config.rs @@ -1,5 +1,3 @@ -use vector_config::configurable_component; - use crate::{ codecs::Transformer, config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext}, @@ -13,8 +11,10 @@ use crate::{ }, tls::TlsConfig, }; +use std::collections::BTreeMap; +use vector_config::configurable_component; -use super::http_sink::build_http_sink; +use super::{http_sink::build_http_sink, native::build_native_sink}; /// Configuration for the `clickhouse` sink. #[configurable_component(sink("clickhouse"))] @@ -30,10 +30,16 @@ pub struct ClickhouseConfig { /// The database that contains the table that data will be inserted into. pub database: Option, + /// If true`, ClickHouse Native Protocol is used. Defaults to `false`, using `JSONEachRow` over HTTP. + #[serde(default)] + pub use_native_proto: bool, /// Sets `input_format_skip_unknown_fields`, allowing Clickhouse to discard fields not present in the table schema. #[serde(default)] pub skip_unknown_fields: bool, + /// SQL table column definition. For example: {"col1":"String", "col_2":"Nullable(UInt16)", ...} + #[serde(default)] + pub sql_table_col_def: BTreeMap, #[configurable(derived)] #[serde(default = "Compression::gzip_default")] @@ -74,9 +80,12 @@ impl_generate_config_from_default!(ClickhouseConfig); #[async_trait::async_trait] impl SinkConfig for ClickhouseConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - // later we can build different sink(http, native) here - // according to the clickhouseConfig - build_http_sink(self, cx).await + // Determine the sink to build from the config. + if !self.use_native_proto { + build_http_sink(self, cx).await + } else { + build_native_sink(self, cx).await + } } fn input(&self) -> Input { @@ -87,3 +96,13 @@ impl SinkConfig for ClickhouseConfig { &self.acknowledgements } } + +#[cfg(test)] +mod tests { + use super::ClickhouseConfig; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } +} diff --git a/src/sinks/clickhouse/mod.rs b/src/sinks/clickhouse/mod.rs index 2f4e5af1870fc..b998ca78da4fc 100644 --- a/src/sinks/clickhouse/mod.rs +++ b/src/sinks/clickhouse/mod.rs @@ -2,4 +2,6 @@ mod config; mod http_sink; #[cfg(all(test, feature = "clickhouse-integration-tests"))] mod integration_tests; -pub use self::config::ClickhouseConfig; +mod native; + +pub use self::config::*; diff --git a/src/sinks/clickhouse/native/convert.rs b/src/sinks/clickhouse/native/convert.rs new file mode 100644 index 0000000000000..99152f8ba101b --- /dev/null +++ b/src/sinks/clickhouse/native/convert.rs @@ -0,0 +1,270 @@ +use crate::event::LogEvent; +use chrono::TimeZone; +use chrono_tz::Tz; +use clickhouse_rs::types::{Block, DateTimeType, SqlType, Value as CHValue}; +use either::Either; +use snafu::{ResultExt, Snafu}; +use std::{ + collections::HashMap, + net::{Ipv4Addr, Ipv6Addr}, + sync::Arc, +}; +use value::Value; + +pub(super) fn build_block( + schema: Vec<(String, SqlType)>, + events: Vec, +) -> crate::Result { + let mut b = Block::new(); + for ev in &events { + b.push(get_row_from_events(&schema, ev)?) + .map_err(Box::new)?; + } + Ok(b) +} + +fn get_row_from_events( + schema: &Vec<(String, SqlType)>, + event: &LogEvent, +) -> crate::Result> { + let mut row = vec![]; + for (col_name, ty) in schema { + let event_field = event.get(col_name.as_str()); + let column = into_clickhouse_value(event_field, ty).map_err(Box::new)?; + row.push((col_name.clone(), column)) + } + Ok(row) +} + +#[derive(Debug, Snafu)] +enum ConvertError { + #[snafu(display("cannot get data from column {col}"))] + LackOfColumn { col: String }, + #[snafu(display("cannot convert {from} to {to}"))] + TypeMisMatch { from: Value, to: String }, + #[snafu(context(false))] + InvalidUTF8 { source: std::str::Utf8Error }, + #[snafu(display("invalid ip addr: {val}"))] + InvalidIPAddr { + source: std::net::AddrParseError, + val: String, + }, + #[snafu(display("invalid time format, val:{val}, format:{format}"))] + InvalidTimeFormat { + source: chrono::ParseError, + val: String, + format: &'static str, + }, + #[snafu(display("only nullable type can be empty"))] + NoValue, + #[snafu(display("type {to} isn't supported, please file an issue"))] + UnsupportedType { to: SqlType }, +} + +type CResult = std::result::Result; + +fn into_integer(v: Option<&Value>, conv: impl Fn(i64) -> CHValue) -> CResult { + match v { + Some(Value::Integer(i)) => Ok(conv(*i)), + Some(Value::Float(f)) => Ok(conv(f.into_inner() as i64)), + Some(inner) => Err(ConvertError::TypeMisMatch { + from: inner.clone(), + to: stringify!($ty).to_string(), + }), + None => Err(ConvertError::NoValue), + } +} + +fn into_float(v: Option<&Value>, to_f32: bool) -> CResult { + match v { + None => Err(ConvertError::NoValue), + Some(Value::Float(v)) => { + if to_f32 { + return Ok(CHValue::Float32(v.as_f32().into())); + } + Ok(CHValue::Float64(v.into_inner())) + } + Some(inner) => { + let target_type = if to_f32 { "f32" } else { "f64" }; + Err(ConvertError::TypeMisMatch { + from: inner.clone(), + to: target_type.to_string(), + }) + } + } +} + +fn into_ip(v: Option<&Value>, conv: impl Fn(&str) -> CResult) -> CResult { + match v { + None => Err(ConvertError::NoValue), + Some(Value::Bytes(bs)) => { + let w = &bs.to_vec()[..]; + let s = std::str::from_utf8(w)?; + conv(s) + } + Some(inner) => Err(ConvertError::TypeMisMatch { + from: inner.clone(), + to: stringify!($chtype).to_string(), + }), + } +} + +fn into_clickhouse_value(v: Option<&Value>, target_type: &SqlType) -> CResult { + match target_type { + SqlType::UInt8 => into_integer(v, |t| CHValue::UInt8(t as u8)), + SqlType::UInt16 => into_integer(v, |t| CHValue::UInt16(t as u16)), + SqlType::UInt32 => into_integer(v, |t| CHValue::UInt32(t as u32)), + SqlType::UInt64 => into_integer(v, |t| CHValue::UInt64(t as u64)), + SqlType::Int8 => into_integer(v, |t| CHValue::Int8(t as i8)), + SqlType::Int16 => into_integer(v, |t| CHValue::Int16(t as i16)), + SqlType::Int32 => into_integer(v, |t| CHValue::Int32(t as i32)), + SqlType::Int64 => into_integer(v, |t| CHValue::Int64(t as i64)), + SqlType::String => into_string(v), + SqlType::FixedString(len) => into_fixed_string(v, *len), + SqlType::Float32 => into_float(v, true), + SqlType::Float64 => into_float(v, false), + SqlType::Date => into_date(v), + SqlType::DateTime(ty) => match ty { + DateTimeType::DateTime32 => into_datetime(v), + DateTimeType::Chrono => Err(ConvertError::UnsupportedType { + to: SqlType::DateTime(DateTimeType::Chrono), + }), + DateTimeType::DateTime64(p, t) => Err(ConvertError::UnsupportedType { + to: SqlType::DateTime(DateTimeType::DateTime64(*p, *t)), + }), + }, + SqlType::Ipv4 => into_ip(v, |s| { + let w: Ipv4Addr = s + .parse() + .context(InvalidIPAddrSnafu { val: s.to_string() })?; + Ok(CHValue::Ipv4(w.octets())) + }), + SqlType::Ipv6 => into_ip(v, |s| { + let w: Ipv6Addr = s + .parse() + .context(InvalidIPAddrSnafu { val: s.to_string() })?; + Ok(CHValue::Ipv6(w.octets())) + }), + SqlType::Nullable(ty) => into_nullable(v, *ty), + SqlType::Array(ty) => into_array(v, ty), + SqlType::Map(_, ty) => into_map(v, ty), + _ => Err(ConvertError::UnsupportedType { + to: target_type.clone(), + }), + } +} + +fn into_nullable(v: Option<&Value>, target_type: &SqlType) -> CResult { + let rs = into_clickhouse_value(v, target_type); + match rs { + Ok(v) => Ok(CHValue::Nullable(Either::Right(Box::new(v)))), + Err(e) if matches!(e, ConvertError::NoValue) => Ok(CHValue::Nullable(Either::Left( + (*target_type).clone().into(), + ))), + Err(e) => Err(e), + } +} + +fn into_array(v: Option<&Value>, target_type: &SqlType) -> CResult { + match v { + None => Err(ConvertError::NoValue), + Some(Value::Array(arr)) => { + let mut w = Vec::with_capacity(arr.len()); + for ev in arr { + w.push(into_clickhouse_value(Some(ev), target_type)?); + } + Ok(CHValue::Array((*target_type).clone().into(), Arc::new(w))) + } + Some(inner) => Err(ConvertError::TypeMisMatch { + from: inner.clone(), + to: target_type.to_string().into_owned(), + }), + } +} + +// only support Map(String, xxx) +fn into_map(v: Option<&Value>, target_type: &SqlType) -> CResult { + match v { + None => Err(ConvertError::NoValue), + Some(Value::Object(bt)) => { + let mut hm = HashMap::with_capacity(bt.len()); + for (k, v) in bt { + hm.insert( + CHValue::String(Arc::new(k.clone().into_bytes())), + into_clickhouse_value(Some(v), target_type)?, + ); + } + Ok(CHValue::Map( + SqlType::String.into(), + (*target_type).clone().into(), + Arc::new(hm), + )) + } + Some(inner) => Err(ConvertError::TypeMisMatch { + from: inner.clone(), + to: target_type.to_string().into_owned(), + }), + } +} + +fn into_string(v: Option<&Value>) -> CResult { + match v { + None => Err(ConvertError::NoValue), + Some(Value::Bytes(bs)) => Ok(CHValue::String(Arc::new(bs.to_vec()))), + Some(inner) => Err(ConvertError::TypeMisMatch { + from: inner.clone(), + to: "string".to_string(), + }), + } +} + +fn into_fixed_string(v: Option<&Value>, len: usize) -> CResult { + match v { + None => Err(ConvertError::NoValue), + Some(Value::Bytes(bs)) => { + let mut w = bs.to_vec(); + w.truncate(len); + Ok(CHValue::String(Arc::new(w))) + } + Some(inner) => Err(ConvertError::TypeMisMatch { + from: inner.clone(), + to: format!("fixedstring{}", len), + }), + } +} + +const TIME_FORMAT: &str = "%d/%m/%Y %H:%M:%S%.9f%z"; + +fn into_date(v: Option<&Value>) -> CResult { + match v { + None => Err(ConvertError::NoValue), + Some(Value::Timestamp(ts)) => { + let s = ts.format(TIME_FORMAT).to_string(); + let t = Tz::UTC + .datetime_from_str(s.as_str(), TIME_FORMAT) + .context(InvalidTimeFormatSnafu { + val: s, + format: TIME_FORMAT, + })? + .date(); + Ok(t.into()) + } + Some(Value::Integer(ts_nano)) => Ok(Tz::UTC.timestamp_nanos(*ts_nano).date().into()), + Some(inner) => Err(ConvertError::TypeMisMatch { + from: inner.clone(), + to: "date".to_string(), + }), + } +} + +fn into_datetime(v: Option<&Value>) -> CResult { + match v { + None => Err(ConvertError::NoValue), + Some(Value::Timestamp(ts)) => Ok((*ts).into()), + Some(Value::Integer(ts_nano)) => Ok(Tz::UTC.timestamp_nanos(*ts_nano).into()), + Some(inner) => Err(ConvertError::TypeMisMatch { + from: inner.clone(), + to: "datetime".to_string(), + }), + } +} diff --git a/src/sinks/clickhouse/native/mod.rs b/src/sinks/clickhouse/native/mod.rs new file mode 100644 index 0000000000000..57a61bea43825 --- /dev/null +++ b/src/sinks/clickhouse/native/mod.rs @@ -0,0 +1,6 @@ +mod convert; +mod native_service; +mod native_sink; +mod parse; + +pub use native_sink::build_native_sink; diff --git a/src/sinks/clickhouse/native/native_service.rs b/src/sinks/clickhouse/native/native_service.rs new file mode 100644 index 0000000000000..6341942a6a646 --- /dev/null +++ b/src/sinks/clickhouse/native/native_service.rs @@ -0,0 +1,70 @@ +use super::convert::build_block; +use crate::event::{EventStatus, LogEvent}; +use clickhouse_rs::{types::SqlType, Pool}; +use futures::future::BoxFuture; +use std::task::{Context, Poll}; +use vector_common::{internal_event::CountByteSize, Error}; +use vector_core::{stream::DriverResponse, ByteSizeOf}; + +pub(super) struct ClickhouseResponse { + pub(super) event_count: usize, + pub(super) event_byte_size: usize, +} + +impl DriverResponse for ClickhouseResponse { + fn event_status(&self) -> EventStatus { + EventStatus::Delivered + } + + fn events_sent(&self) -> CountByteSize { + CountByteSize(self.event_count, self.event_byte_size) + } +} + +pub(super) struct ClickhouseService { + pool: Pool, + schema: Vec<(String, SqlType)>, + table: String, +} + +impl tower::Service> for ClickhouseService { + type Response = ClickhouseResponse; + type Error = Error; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + // Readiness check of the client is done through the `push_events()` + // call happening inside `call()`. That check blocks until the client is + // ready to perform another request. + // + // See: + Poll::Ready(Ok(())) + } + + fn call(&mut self, events: Vec) -> Self::Future { + let pool = self.pool.clone(); + let event_count = events.len(); + let event_byte_size = events.iter().map(|e| e.size_of()).sum(); + let schema = self.schema.clone(); + let table = self.table.clone(); + Box::pin(async move { + let block = build_block(schema, events)?; + let mut handle = pool.get_handle().await?; + handle.insert(table, block).await?; + Ok(ClickhouseResponse { + event_count, + event_byte_size, + }) + }) + } +} + +impl ClickhouseService { + pub(super) fn new(pool: Pool, schema: Vec<(String, SqlType)>, table: String) -> Self { + Self { + pool, + schema, + table, + } + } +} diff --git a/src/sinks/clickhouse/native/native_sink.rs b/src/sinks/clickhouse/native/native_sink.rs new file mode 100644 index 0000000000000..222af0195ff2a --- /dev/null +++ b/src/sinks/clickhouse/native/native_sink.rs @@ -0,0 +1,88 @@ +use std::collections::BTreeMap; + +use super::{super::ClickhouseConfig, native_service::ClickhouseService, parse::parse_field_type}; +use crate::{ + config::{SinkContext, SinkHealthcheckOptions}, + event::Event, + sinks::{ + util::{SinkBuilderExt, StreamSink}, + Healthcheck, VectorSink, + }, +}; +use async_trait::async_trait; +use clickhouse_rs::{types::SqlType, Pool}; +use futures::{stream::BoxStream, StreamExt}; +use vector_core::stream::BatcherSettings; + +pub async fn build_native_sink( + cfg: &ClickhouseConfig, + cx: SinkContext, +) -> crate::Result<(VectorSink, Healthcheck)> { + let table_schema = gen_table_schema(&cfg.sql_table_col_def)?; + let batch = cfg.batch.into_batcher_settings()?; + let pool = Pool::new(cfg.endpoint.to_string()); + let sink = NativeClickhouseSink::new(pool.clone(), batch, cfg.table.clone(), table_schema); + let health_check = healthcheck(pool, cx.healthcheck); + Ok(( + VectorSink::from_event_streamsink(sink), + Box::pin(health_check), + )) +} + +fn gen_table_schema(table: &BTreeMap) -> crate::Result> { + table + .iter() + .map(|(k, v)| { + parse_field_type(v.as_str()) + .map(|(_, t)| (k.to_owned(), t)) + .map_err(|e| e.to_string().into()) + }) + .collect() +} + +async fn healthcheck(pool: Pool, opts: SinkHealthcheckOptions) -> crate::Result<()> { + if !opts.enabled { + return Ok(()); + } + let mut client = pool.get_handle().await?; + client.ping().await.map_err(|e| e.into()) +} + +struct NativeClickhouseSink { + pool: Pool, + batch: BatcherSettings, + table: String, + table_schema: Vec<(String, SqlType)>, +} + +impl NativeClickhouseSink { + fn new( + pool: Pool, + batch: BatcherSettings, + table: String, + table_schema: Vec<(String, SqlType)>, + ) -> Self { + Self { + pool, + batch, + table, + table_schema, + } + } +} + +#[async_trait] +impl StreamSink for NativeClickhouseSink { + async fn run(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + .map(|e| e.into_log()) + .batched(self.batch.into_byte_size_config()) + .into_driver(ClickhouseService::new( + self.pool, + self.table_schema, + self.table, + )) + .run() + .await + } +} diff --git a/src/sinks/clickhouse/native/parse.rs b/src/sinks/clickhouse/native/parse.rs new file mode 100644 index 0000000000000..ceec8ee0318b8 --- /dev/null +++ b/src/sinks/clickhouse/native/parse.rs @@ -0,0 +1,191 @@ +use chrono_tz::Tz; +use clickhouse_rs::types::{DateTimeType, SqlType}; +use nom::{ + branch::alt, + bytes::complete::{tag, take_until1}, + character::complete::{u32 as p_u32, u64 as p_u64, u8 as p_u8}, + combinator::{all_consuming, map_res}, + sequence::{delimited, pair, preceded}, + IResult, +}; + +pub(super) fn parse_field_type(s: &str) -> IResult<&str, SqlType> { + all_consuming(parse_sql_type)(s) +} + +fn parse_sql_type(s: &str) -> IResult<&str, SqlType> { + alt(( + // types that can be wrapped by Nullable(xxx) + parse_nullable_inner, + // Nullable + parse_nullable, + // types that can NOT be wrapped by Nullable + parse_array, + parse_map, + ))(s) +} + +fn parse_static_type(s: &str) -> IResult<&str, SqlType> { + alt(( + |i| tag("UInt8")(i).map(|(rest, _)| (rest, SqlType::UInt8)), + |i| tag("UInt16")(i).map(|(rest, _)| (rest, SqlType::UInt16)), + |i| tag("UInt32")(i).map(|(rest, _)| (rest, SqlType::UInt32)), + |i| tag("UInt64")(i).map(|(rest, _)| (rest, SqlType::UInt64)), + |i| tag("Int8")(i).map(|(rest, _)| (rest, SqlType::Int8)), + |i| tag("Int16")(i).map(|(rest, _)| (rest, SqlType::Int16)), + |i| tag("Int32")(i).map(|(rest, _)| (rest, SqlType::Int32)), + |i| tag("Int64")(i).map(|(rest, _)| (rest, SqlType::Int64)), + |i| tag("Float32")(i).map(|(rest, _)| (rest, SqlType::Float32)), + |i| tag("Float64")(i).map(|(rest, _)| (rest, SqlType::Float64)), + |i| tag("String")(i).map(|(rest, _)| (rest, SqlType::String)), + |i| tag("UUID")(i).map(|(rest, _)| (rest, SqlType::Uuid)), + |i| tag("Date")(i).map(|(rest, _)| (rest, SqlType::Date)), + |i| tag("IPv4")(i).map(|(rest, _)| (rest, SqlType::Ipv4)), + |i| tag("IPv6")(i).map(|(rest, _)| (rest, SqlType::Ipv6)), + ))(s) +} + +fn parse_array(s: &str) -> IResult<&str, SqlType> { + preceded(tag("Array"), delimited(tag("("), parse_sql_type, tag(")")))(s) + .map(|(rest, v)| (rest, SqlType::Array(v.into()))) +} + +fn parse_map(s: &str) -> IResult<&str, SqlType> { + preceded( + tag("Map"), + delimited( + tag("("), + pair(parse_sql_type, preceded(tag(","), parse_sql_type)), + tag(")"), + ), + )(s) + .map(|(rest, (k, v))| (rest, SqlType::Map(k.into(), v.into()))) +} + +fn parse_nullable(s: &str) -> IResult<&str, SqlType> { + preceded( + tag("Nullable"), + delimited(tag("("), parse_nullable_inner, tag(")")), + )(s) + .map(|(rest, v)| (rest, SqlType::Nullable(v.into()))) +} + +fn parse_nullable_inner(s: &str) -> IResult<&str, SqlType> { + alt(( + parse_datetime64, + parse_datetime, + parse_static_type, + parse_fixed_string, + parse_decimal, + ))(s) +} + +fn parse_datetime(s: &str) -> IResult<&str, SqlType> { + tag("DateTime")(s).map(|(rest, _)| (rest, SqlType::DateTime(DateTimeType::DateTime32))) +} + +fn parse_datetime64(s: &str) -> IResult<&str, SqlType> { + preceded( + tag("DateTime64"), + delimited( + tag("("), + pair( + p_u32, + preceded( + tag(","), + map_res(take_until1(")"), |s: &str| s.parse::()), + ), + ), + tag(")"), + ), + )(s) + .map(|(rest, (precision, tz))| { + ( + rest, + SqlType::DateTime(DateTimeType::DateTime64(precision, tz)), + ) + }) +} + +fn parse_fixed_string(s: &str) -> IResult<&str, SqlType> { + preceded(tag("FixedString"), delimited(tag("("), p_u64, tag(")")))(s) + .map(|(rest, v)| (rest, SqlType::FixedString(v as usize))) +} + +fn parse_decimal(s: &str) -> IResult<&str, SqlType> { + preceded( + tag("Decimal"), + delimited(tag("("), pair(p_u8, preceded(tag(","), p_u8)), tag(")")), + )(s) + .map(|(rest, (p, s))| (rest, SqlType::Decimal(p, s))) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono_tz::Tz; + use clickhouse_rs::types::{DateTimeType, SqlType}; + + #[test] + fn test_parse_datetime64() { + let table = vec![( + "DateTime64(3,Asia/Shanghai)", + SqlType::DateTime(DateTimeType::DateTime64(3, Tz::Asia__Shanghai)), + )]; + for (s, expect) in table { + let (_, actual) = parse_datetime64(s).unwrap(); + assert_eq!(actual, expect); + } + } + + #[test] + fn test_table() { + let table = vec![ + ( + "Nullable(UInt16)", + SqlType::Nullable(SqlType::UInt16.into()), + ), + ( + "Array(Nullable(String))", + SqlType::Array(SqlType::Nullable(SqlType::String.into()).into()), + ), + ( + "Map(Float32,Date)", + SqlType::Map(SqlType::Float32.into(), SqlType::Date.into()), + ), + ( + "Map(Int64,FixedString(6))", + SqlType::Map(SqlType::Int64.into(), SqlType::FixedString(6).into()), + ), + ( + "Map(Float64,Nullable(UUID))", + SqlType::Map( + SqlType::Float64.into(), + SqlType::Nullable(SqlType::Uuid.into()).into(), + ), + ), + ( + "Map(DateTime64(3,Asia/Shanghai),Nullable(Decimal(9,5)))", + SqlType::Map( + SqlType::DateTime(DateTimeType::DateTime64(3, Tz::Asia__Shanghai)).into(), + SqlType::Nullable(SqlType::Decimal(9, 5).into()).into(), + ), + ), + ("IPv4", SqlType::Ipv4), + ("IPv6", SqlType::Ipv6), + ]; + for (s, expect) in table { + let (rest, actual) = parse_field_type(s).unwrap(); + assert!(rest.is_empty()); + assert_eq!(actual, expect); + } + } + + #[test] + fn test_nullable_cannot_wrap() { + let table = vec!["Nullable(Array(UInt8))", "Nullable(Map(String,String))"]; + for s in table { + assert!(parse_field_type(s).is_err()) + } + } +} diff --git a/website/cue/reference/components/sinks/clickhouse.cue b/website/cue/reference/components/sinks/clickhouse.cue index 56cd17e1fd7d8..44383ab77545d 100644 --- a/website/cue/reference/components/sinks/clickhouse.cue +++ b/website/cue/reference/components/sinks/clickhouse.cue @@ -90,9 +90,14 @@ components: sinks: clickhouse: { description: "The endpoint of the [Clickhouse](\(urls.clickhouse)) server." required: true type: string: { - examples: ["http://localhost:8123"] + examples: ["http://localhost:8123", "tcp://localhost:9223"] } } + use_native_proto: { + description: "If true`, ClickHouse Native Protocol is used. Defaults to `false`, using `JSONEachRow` over HTTP." + required: false + type: bool: default: false + } table: { description: "The table that data will be inserted into." required: true @@ -100,6 +105,41 @@ components: sinks: clickhouse: { examples: ["mytable"] } } + sql_table_col_def: { + description: """ + The clickhouse table column definition. + If use_native_proto is true, this field must be configured! + The key represents not only the column name of clickhouse table but also the key of the log. + The value now only supports limited type: + _type: UInt8,16,32,64 Int8,16,32,64 String FixedString(int) Float32,64 Date DateTime IPv4,6 + Array(_type) Nullable(_type) Map(String, _type) + + Note: for now, empty space is not acceptable in type definition, which means + Map(String,UInt8) Nullable(Date) are valid + Map( String, UInt8) Nullable(Date ) is invalid + """ + required: false + type: object: { + examples: [ + { + "name": "String" + "age": "UInt8" + "hobbites": "Map(String,String)" + } + ] + options: { + "*": { + common: false + description: "clickhouse table difinition" + required: false + type: string: { + default: null + examples: ["String", "Map(String,Date)", "Array(Int64)"] + } + } + } + } + } skip_unknown_fields: { common: true description: "Sets `input_format_skip_unknown_fields`, allowing Clickhouse to discard fields not present in the table schema."