diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 0b1d884..3d3e312 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -34,7 +34,7 @@ jobs: - name: Build env: GOPROXY: "https://proxy.golang.org" - run: go build . + run: go build ./example/cmd/simple/main.go - name: Test env: diff --git a/README.md b/README.md index cac191e..ecb21d1 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -[![build](https://github.com/zikwall/clickhouse-buffer/workflows/build_and_tests/badge.svg)](https://github.com/zikwall/clickhouse-buffer/v2/actions) -[![build](https://github.com/zikwall/clickhouse-buffer/workflows/golangci_lint/badge.svg)](https://github.com/zikwall/clickhouse-buffer/v2/actions) +[![build](https://github.com/zikwall/clickhouse-buffer/workflows/build_and_tests/badge.svg)](https://github.com/zikwall/clickhouse-buffer/v3/actions) +[![build](https://github.com/zikwall/clickhouse-buffer/workflows/golangci_lint/badge.svg)](https://github.com/zikwall/clickhouse-buffer/v3/actions)

Clickhouse Buffer

@@ -9,7 +9,7 @@ ## Install - for go-clickhouse v1 `$ go get -u github.com/zikwall/clickhouse-buffer` -- for go-clickhouse v2 `$ go get -u github.com/zikwall/clickhouse-buffer/v2` +- for go-clickhouse v2 `$ go get -u github.com/zikwall/clickhouse-buffer/v3` ### Why and why @@ -33,6 +33,7 @@ This is due to the fact that Clickhouse is designed so that it better processes - [x] **in-memory** - use native channels and slices - [x] **redis** - use redis server as queue and buffer +- [x] **in-memory-sync** - if you get direct access to buffer, it will help to avoid data race - [x] **retries** - resending "broken" or for some reason not sent packets ### Usage @@ -41,15 +42,15 @@ This is due to the fact that Clickhouse is designed so that it better processes import ( "database/sql" - cxnative "github.com/zikwall/clickhouse-buffer/v2/src/database/native" - cxsql "github.com/zikwall/clickhouse-buffer/v2/src/database/sql" + "github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative" + "github.com/zikwall/clickhouse-buffer/v3/src/db/cxsql" ) // if you already have a connection to Clickhouse you can just use wrappers // with native interface -ch := cxnative.NewClickhouseWithConn(conn: driver.Conn) +ch := cxnative.NewClickhouseWithConn(driver.Conn) // or use database/sql interface -ch := cxsql.NewClickhouseWithConn(conn: *sql.DB) +ch := cxsql.NewClickhouseWithConn(*sql.DB) ``` ```go @@ -57,7 +58,7 @@ ch := cxsql.NewClickhouseWithConn(conn: *sql.DB) // package can do it for you, just call the connection option you need: // with native interface -ch, conn, err := cxnative.NewClickhouse(ctx,&clickhouse.Options{ +ch, conn, err := cxnative.NewClickhouse(ctx, &clickhouse.Options{ Addr: ctx.StringSlice("clickhouse-host"), Auth: clickhouse.Auth{ Database: ctx.String("clickhouse-database"), @@ -96,17 +97,16 @@ ch, conn, err := cxsql.NewClickhouse(ctx, &clickhouse.Options{ ```go import ( - cx "github.com/zikwall/clickhouse-buffer/v2" - cxbuffer "github.com/zikwall/clickhouse-buffer/v2/src/buffer" - cxmemory "github.com/zikwall/clickhouse-buffer/v2/src/buffer/memory" - cxredis "github.com/zikwall/clickhouse-buffer/v2/src/buffer/redis" + clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3" + "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxmem" + "github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative" ) // create root client -client := cx.NewClientWithOptions(ctx, ch, - cx.DefaultOptions().SetFlushInterval(1000).SetBatchSize(5000), +client := clickhousebuffer.NewClientWithOptions(ctx, ch, + clickhousebuffer.DefaultOptions().SetFlushInterval(1000).SetBatchSize(5000), ) // create buffer engine -buffer := cxmemory.NewBuffer( +buffer := cxmem.NewBuffer( client.Options().BatchSize(), ) // or use redis @@ -114,10 +114,10 @@ buffer := cxredis.NewBuffer( contetx, *redis.Client, "bucket", client.Options().BatchSize(), ) // create new writer api: table name with columns -writeAPI := client.Writer(cxbuffer.View{ - Name: "clickhouse_database.clickhouse_table", - Columns: []string{"id", "uuid", "insert_ts"}, -}, buffer) +writeAPI := client.Writer( + cx.NewView("clickhouse_database.clickhouse_table", []string{"id", "uuid", "insert_ts"}), + buffer, +) // define your custom data structure type MyCustomDataView struct { @@ -125,9 +125,9 @@ type MyCustomDataView struct { uuid string insertTS time.Time } -// and implement cxbuffer.Inline interface -func (t *MyCustomDataView) Row() cxbuffer.RowSlice { - return cxbuffer.RowSlice{t.id, t.uuid, t.insertTS.Format(time.RFC822)} +// and implement cxbuffer.Vectorable interface +func (t *MyCustomDataView) Row() cx.Vector { + return cx.Vector{t.id, t.uuid, t.insertTS.Format(time.RFC822)} } // async write your data writeAPI.WriteRow(&MyCustomDataView{ @@ -150,12 +150,12 @@ Using the blocking writer interface ```go // create new writer api: table name with columns -writerBlocking := client.WriterBlocking(cxbuffer.View{ +writerBlocking := client.WriterBlocking(cx.View{ Name: "clickhouse_database.clickhouse_table", Columns: []string{"id", "uuid", "insert_ts"}, }) // non-asynchronous writing of data directly to Clickhouse -err := writerBlocking.WriteRow(ctx, []MyCustomDataView{ +err := writerBlocking.WriteRow(ctx, []&MyCustomDataView{ { id: 1, uuid: "1", insertTS: time.Now(), }, @@ -183,15 +183,15 @@ You can implement queue engine by defining the `Queueable` interface: ```go type Queueable interface { - Queue(packet *retryPacket) - Retries() <-chan *retryPacket + Queue(packet *Packet) + Retries() <-chan *Packet } ``` and set it as an engine: ```go -cx.DefaultOptions().SetDebugMode(true).SetRetryIsEnabled(true).SetQueueEngine(CustomQueueable) +clickhousebuffer.DefaultOptions().SetDebugMode(true).SetRetryIsEnabled(true).SetQueueEngine(CustomQueueable) ``` #### Logs: @@ -207,7 +207,7 @@ type Logger interface { ```go // example with default options -cx.DefaultOptions().SetDebugMode(true).SetLogger(SomeLogger) +clickhousebuffer.DefaultOptions().SetDebugMode(true).SetLogger(SomeLogger) ``` #### Tests: diff --git a/client.go b/client.go index 92235de..570f077 100644 --- a/client.go +++ b/client.go @@ -2,27 +2,130 @@ package clickhousebuffer import ( "context" + "sync" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" - "github.com/zikwall/clickhouse-buffer/v2/src/database" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" + "github.com/zikwall/clickhouse-buffer/v3/src/retry" ) type Client interface { // Options returns the options associated with client Options() *Options - // HandleStream method for processing data x and sending it to Clickhouse - HandleStream(database.View, *buffer.Batch) error // WriteBatch method of sending data to Clickhouse is used implicitly in a non - blocking record, // and explicitly in a blocking record - WriteBatch(context.Context, database.View, *buffer.Batch) error + WriteBatch(context.Context, cx.View, *cx.Batch) error // Writer returns the asynchronous, non-blocking, Writer client. // Ensures using a single Writer instance for each table pair. - Writer(database.View, buffer.Buffer) Writer + Writer(cx.View, cx.Buffer) Writer // WriterBlocking returns the synchronous, blocking, WriterBlocking client. // Ensures using a single WriterBlocking instance for each table pair. - WriterBlocking(database.View) WriterBlocking + WriterBlocking(cx.View) WriterBlocking // RetryClient Get retry client - RetryClient() Retryable + RetryClient() retry.Retryable // Close ensures all ongoing asynchronous write clients finish. Close() } + +type clientImpl struct { + context context.Context + clickhouse cx.Clickhouse + options *Options + writeAPIs map[string]Writer + syncWriteAPIs map[string]WriterBlocking + mu sync.RWMutex + retry retry.Retryable + logger cx.Logger +} + +func NewClient(ctx context.Context, clickhouse cx.Clickhouse) Client { + return NewClientWithOptions(ctx, clickhouse, DefaultOptions()) +} + +func NewClientWithOptions(ctx context.Context, clickhouse cx.Clickhouse, options *Options) Client { + if options.logger == nil { + options.logger = cx.NewDefaultLogger() + } + client := &clientImpl{ + context: ctx, + clickhouse: clickhouse, + options: options, + writeAPIs: map[string]Writer{}, + syncWriteAPIs: map[string]WriterBlocking{}, + logger: options.logger, + } + if options.isRetryEnabled { + if options.queue == nil { + options.queue = retry.NewImMemoryQueueEngine() + } + client.retry = retry.NewRetry( + ctx, options.queue, retry.NewDefaultWriter(clickhouse), options.logger, options.isDebug, + ) + } + return client +} + +func (c *clientImpl) Options() *Options { + return c.options +} + +func (c *clientImpl) Writer(view cx.View, buf cx.Buffer) Writer { + key := view.Name + c.mu.Lock() + if _, ok := c.writeAPIs[key]; !ok { + c.writeAPIs[key] = NewWriter(c.context, c, view, buf) + } + writer := c.writeAPIs[key] + c.mu.Unlock() + return writer +} + +func (c *clientImpl) WriterBlocking(view cx.View) WriterBlocking { + key := view.Name + c.mu.Lock() + if _, ok := c.syncWriteAPIs[key]; !ok { + c.syncWriteAPIs[key] = NewWriterBlocking(c, view) + } + writer := c.syncWriteAPIs[key] + c.mu.Unlock() + return writer +} + +func (c *clientImpl) Close() { + if c.options.isDebug { + c.logger.Log("close clickhouse buffer client") + c.logger.Log("close async writers") + } + // closing and destroying all asynchronous writers + c.mu.Lock() + for key, w := range c.writeAPIs { + w.Close() + delete(c.writeAPIs, key) + } + c.mu.Unlock() + // closing and destroying all synchronous writers + if c.options.isDebug { + c.logger.Log("close sync writers") + } + c.mu.Lock() + for key := range c.syncWriteAPIs { + delete(c.syncWriteAPIs, key) + } + c.mu.Unlock() +} + +func (c *clientImpl) WriteBatch(ctx context.Context, view cx.View, batch *cx.Batch) error { + _, err := c.clickhouse.Insert(ctx, view, batch.Rows()) + if err != nil { + // if there is an acceptable error and if the functionality of resending data is activated, + // try to repeat the operation + if c.options.isRetryEnabled && cx.IsResendAvailable(err) { + c.retry.Retry(retry.NewPacket(view, batch)) + } + return err + } + return nil +} + +func (c *clientImpl) RetryClient() retry.Retryable { + return c.retry +} diff --git a/client_impl.go b/client_impl.go deleted file mode 100644 index 9fdc70a..0000000 --- a/client_impl.go +++ /dev/null @@ -1,121 +0,0 @@ -package clickhousebuffer - -import ( - "context" - "sync" - - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" - "github.com/zikwall/clickhouse-buffer/v2/src/database" -) - -type clientImpl struct { - context context.Context - clickhouse database.Clickhouse - options *Options - writeAPIs map[string]Writer - syncWriteAPIs map[string]WriterBlocking - mu sync.RWMutex - retry Retryable - logger Logger -} - -func NewClient(ctx context.Context, clickhouse database.Clickhouse) Client { - return NewClientWithOptions(ctx, clickhouse, DefaultOptions()) -} - -func NewClientWithOptions(ctx context.Context, clickhouse database.Clickhouse, options *Options) Client { - if options.logger == nil { - options.logger = newDefaultLogger() - } - client := &clientImpl{ - context: ctx, - clickhouse: clickhouse, - options: options, - writeAPIs: map[string]Writer{}, - syncWriteAPIs: map[string]WriterBlocking{}, - logger: options.logger, - } - if options.isRetryEnabled { - if options.queue == nil { - options.queue = newImMemoryQueueEngine() - } - client.retry = NewRetry( - ctx, options.queue, NewDefaultWriter(clickhouse), options.logger, options.isDebug, - ) - } - return client -} - -func (cs *clientImpl) Options() *Options { - return cs.options -} - -func (cs *clientImpl) Writer(view database.View, buf buffer.Buffer) Writer { - key := view.Name - cs.mu.Lock() - if _, ok := cs.writeAPIs[key]; !ok { - cs.writeAPIs[key] = NewWriter(cs, view, buf, cs.options) - } - writer := cs.writeAPIs[key] - cs.mu.Unlock() - return writer -} - -func (cs *clientImpl) WriterBlocking(view database.View) WriterBlocking { - key := view.Name - cs.mu.Lock() - if _, ok := cs.syncWriteAPIs[key]; !ok { - cs.syncWriteAPIs[key] = NewWriterBlocking(cs, view) - } - writer := cs.syncWriteAPIs[key] - cs.mu.Unlock() - return writer -} - -func (cs *clientImpl) Close() { - if cs.options.isDebug { - cs.logger.Log("close clickhouse buffer client") - cs.logger.Log("close async writers") - } - // Closing and destroying all asynchronous writers - cs.mu.Lock() - for key, w := range cs.writeAPIs { - w.Close() - delete(cs.writeAPIs, key) - } - cs.mu.Unlock() - // Closing and destroying all synchronous writers - if cs.options.isDebug { - cs.logger.Log("close sync writers") - } - cs.mu.Lock() - for key := range cs.syncWriteAPIs { - delete(cs.syncWriteAPIs, key) - } - cs.mu.Unlock() -} - -func (cs *clientImpl) HandleStream(view database.View, btc *buffer.Batch) error { - err := cs.WriteBatch(cs.context, view, btc) - if err != nil { - // If there is an acceptable error and if the functionality of resending data is activated, - // try to repeat the operation - if cs.options.isRetryEnabled && database.IsResendAvailable(err) { - cs.retry.Retry(&retryPacket{ - view: view, - btc: btc, - }) - } - return err - } - return nil -} - -func (cs *clientImpl) WriteBatch(ctx context.Context, view database.View, batch *buffer.Batch) error { - _, err := cs.clickhouse.Insert(ctx, view, batch.Rows()) - return err -} - -func (cs *clientImpl) RetryClient() Retryable { - return cs.retry -} diff --git a/example/cmd/redis/main.go b/example/cmd/redis/main.go index 0d8ee2a..428ce9b 100644 --- a/example/cmd/redis/main.go +++ b/example/cmd/redis/main.go @@ -11,11 +11,11 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/go-redis/redis/v8" - cx "github.com/zikwall/clickhouse-buffer/v2" - "github.com/zikwall/clickhouse-buffer/v2/example/pkg/tables" - cxredis "github.com/zikwall/clickhouse-buffer/v2/src/buffer/redis" - cxbase "github.com/zikwall/clickhouse-buffer/v2/src/database" - cxnative "github.com/zikwall/clickhouse-buffer/v2/src/database/native" + clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3" + "github.com/zikwall/clickhouse-buffer/v3/example/pkg/tables" + "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxredis" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" + "github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative" ) func main() { @@ -51,8 +51,8 @@ func main() { if err := tables.CreateTableNative(ctx, conn); err != nil { log.Panicln(err) } - client := cx.NewClientWithOptions(ctx, ch, - cx.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, + clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), ) rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ Addr: redisHost, @@ -61,11 +61,7 @@ func main() { if err != nil { log.Panicln(err) } - writeAPI := client.Writer(cxbase.View{ - Name: tables.ExampleTableName(), - Columns: tables.ExampleTableColumns(), - }, rxbuffer) - + writeAPI := client.Writer(cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer) wg := sync.WaitGroup{} wg.Add(1) go func() { diff --git a/example/cmd/redis_sql/main.go b/example/cmd/redis_sql/main.go index 43ec584..0ab509a 100644 --- a/example/cmd/redis_sql/main.go +++ b/example/cmd/redis_sql/main.go @@ -11,11 +11,11 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/go-redis/redis/v8" - cx "github.com/zikwall/clickhouse-buffer/v2" - "github.com/zikwall/clickhouse-buffer/v2/example/pkg/tables" - cxredis "github.com/zikwall/clickhouse-buffer/v2/src/buffer/redis" - cxbase "github.com/zikwall/clickhouse-buffer/v2/src/database" - cxsql "github.com/zikwall/clickhouse-buffer/v2/src/database/sql" + clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3" + "github.com/zikwall/clickhouse-buffer/v3/example/pkg/tables" + "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxredis" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" + "github.com/zikwall/clickhouse-buffer/v3/src/db/cxsql" ) func main() { @@ -51,8 +51,8 @@ func main() { if err := tables.CreateTableSQL(ctx, conn); err != nil { log.Panicln(err) } - client := cx.NewClientWithOptions(ctx, ch, - cx.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, + clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), ) rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ Addr: redisHost, @@ -61,10 +61,7 @@ func main() { if err != nil { log.Panicln(err) } - writeAPI := client.Writer(cxbase.View{ - Name: tables.ExampleTableName(), - Columns: tables.ExampleTableColumns(), - }, rxbuffer) + writeAPI := client.Writer(cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), rxbuffer) wg := sync.WaitGroup{} wg.Add(1) diff --git a/example/cmd/simple/main.go b/example/cmd/simple/main.go index eda1ce2..d9a087b 100644 --- a/example/cmd/simple/main.go +++ b/example/cmd/simple/main.go @@ -10,11 +10,11 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" - cx "github.com/zikwall/clickhouse-buffer/v2" - "github.com/zikwall/clickhouse-buffer/v2/example/pkg/tables" - cxmemory "github.com/zikwall/clickhouse-buffer/v2/src/buffer/memory" - cxbase "github.com/zikwall/clickhouse-buffer/v2/src/database" - cxnative "github.com/zikwall/clickhouse-buffer/v2/src/database/native" + clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3" + "github.com/zikwall/clickhouse-buffer/v3/example/pkg/tables" + "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxmem" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" + "github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative" ) func main() { @@ -48,14 +48,14 @@ func main() { if err := tables.CreateTableNative(ctx, conn); err != nil { log.Panicln(err) } - client := cx.NewClientWithOptions(ctx, ch, - cx.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, + clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), ) - writeAPI := client.Writer(cxbase.View{ - Name: tables.ExampleTableName(), - Columns: tables.ExampleTableColumns(), - }, cxmemory.NewBuffer(client.Options().BatchSize())) + writeAPI := client.Writer( + cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), + cxmem.NewBuffer(client.Options().BatchSize()), + ) wg := sync.WaitGroup{} wg.Add(1) diff --git a/example/cmd/simple_2/main.go b/example/cmd/simple_2/main.go new file mode 100644 index 0000000..0a8ee66 --- /dev/null +++ b/example/cmd/simple_2/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "fmt" + "log" + "os" + "time" + + "github.com/ClickHouse/clickhouse-go/v2" + + clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3" + "github.com/zikwall/clickhouse-buffer/v3/example/pkg/tables" + "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxmem" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" + "github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative" +) + +func main() { + hostname := os.Getenv("CLICKHOUSE_HOST") + username := os.Getenv("CLICKHOUSE_USER") + database := os.Getenv("CLICKHOUSE_DB") + password := os.Getenv("CLICKHOUSE_PASS") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch, conn, err := cxnative.NewClickhouse(ctx, &clickhouse.Options{ + Addr: []string{hostname}, + Auth: clickhouse.Auth{ + Database: database, + Username: username, + Password: password, + }, + Settings: clickhouse.Settings{ + "max_execution_time": 60, + }, + DialTimeout: 5 * time.Second, + Compression: &clickhouse.Compression{ + Method: clickhouse.CompressionLZ4, + }, + Debug: true, + }) + if err != nil { + log.Panicln(err) + } + if err := tables.CreateTableNative(ctx, conn); err != nil { + log.Panicln(err) + } + client := clickhousebuffer.NewClientWithOptions(ctx, ch, + clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), + ) + + writeAPI := client.Writer( + cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), + cxmem.NewBuffer(client.Options().BatchSize()), + ) + + int32s := []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + for _, val := range int32s { + writeAPI.WriteRow(&tables.ExampleTable{ + ID: val, UUID: fmt.Sprintf("uuidf %d", val), InsertTS: time.Now(), + }) + } + + <-time.After(time.Second * 2) + client.Close() +} diff --git a/example/cmd/simple_sql/main.go b/example/cmd/simple_sql/main.go index 232fddb..c6b80e4 100644 --- a/example/cmd/simple_sql/main.go +++ b/example/cmd/simple_sql/main.go @@ -10,11 +10,11 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" - cx "github.com/zikwall/clickhouse-buffer/v2" - "github.com/zikwall/clickhouse-buffer/v2/example/pkg/tables" - cxmemory "github.com/zikwall/clickhouse-buffer/v2/src/buffer/memory" - cxbase "github.com/zikwall/clickhouse-buffer/v2/src/database" - cxsql "github.com/zikwall/clickhouse-buffer/v2/src/database/sql" + clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3" + "github.com/zikwall/clickhouse-buffer/v3/example/pkg/tables" + "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxmem" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" + "github.com/zikwall/clickhouse-buffer/v3/src/db/cxsql" ) func main() { @@ -48,14 +48,14 @@ func main() { if err := tables.CreateTableSQL(ctx, conn); err != nil { log.Panicln(err) } - client := cx.NewClientWithOptions(ctx, ch, - cx.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), + client := clickhousebuffer.NewClientWithOptions(ctx, ch, + clickhousebuffer.DefaultOptions().SetDebugMode(true).SetFlushInterval(1000).SetBatchSize(5), ) - writeAPI := client.Writer(cxbase.View{ - Name: tables.ExampleTableName(), - Columns: tables.ExampleTableColumns(), - }, cxmemory.NewBuffer(client.Options().BatchSize())) + writeAPI := client.Writer( + cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), + cxmem.NewBuffer(client.Options().BatchSize()), + ) wg := sync.WaitGroup{} wg.Add(1) diff --git a/example/pkg/tables/example.go b/example/pkg/tables/example.go index 0f0a4c9..e6cdc20 100644 --- a/example/pkg/tables/example.go +++ b/example/pkg/tables/example.go @@ -7,8 +7,7 @@ import ( "time" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" - - cxbuffer "github.com/zikwall/clickhouse-buffer/v2/src/buffer" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) type ExampleTable struct { @@ -17,8 +16,8 @@ type ExampleTable struct { InsertTS time.Time } -func (t *ExampleTable) Row() cxbuffer.RowSlice { - return cxbuffer.RowSlice{t.ID, t.UUID, t.InsertTS.Format(time.RFC822)} +func (t *ExampleTable) Row() cx.Vector { + return cx.Vector{t.ID, t.UUID, t.InsertTS.Format(time.RFC822)} } func ExampleTableName() string { diff --git a/go.mod b/go.mod index 43d254b..bcba7bc 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,9 @@ -module github.com/zikwall/clickhouse-buffer/v2 +module github.com/zikwall/clickhouse-buffer/v3 go 1.18 require ( - github.com/ClickHouse/clickhouse-go/v2 v2.0.14 + github.com/ClickHouse/clickhouse-go/v2 v2.0.15 github.com/Rican7/retry v0.3.1 github.com/go-redis/redis/v8 v8.11.5 ) diff --git a/go.sum b/go.sum index dce6c65..cf48e2b 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,6 @@ github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= -github.com/ClickHouse/clickhouse-go/v2 v2.0.14 h1:7HW+MXPaQfVyCzPGEn/LciMc8K6cG58FZMUc7DXQmro= -github.com/ClickHouse/clickhouse-go/v2 v2.0.14/go.mod h1:iq2DUGgpA4BBki2CVwrF8x43zqBjdgHtbexkFkh5a6M= +github.com/ClickHouse/clickhouse-go/v2 v2.0.15 h1:lLAZliqrZEygkxosLaW1qHyeTb4Ho7fVCZ0WKCpLocU= +github.com/ClickHouse/clickhouse-go/v2 v2.0.15/go.mod h1:Z21o82zD8FFqefOQDg93c0XITlxGbTsWQuRm588Azkk= github.com/Rican7/retry v0.3.1 h1:scY4IbO8swckzoA/11HgBwaZRJEyY9vaNJshcdhp1Mc= github.com/Rican7/retry v0.3.1/go.mod h1:CxSDrhAyXmTMeEuRAnArMu1FHu48vtfjLREWqVl7Vw0= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= @@ -18,6 +18,7 @@ github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= @@ -39,7 +40,6 @@ github.com/mkevac/debugcharts v0.0.0-20191222103121-ae1c48aa8615/go.mod h1:Ad7oe github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= -github.com/paulmach/orb v0.5.0/go.mod h1:FWRlTgl88VI1RBx/MkrwWDRhQ96ctqMCh8boXhmqB/A= github.com/paulmach/orb v0.7.1 h1:Zha++Z5OX/l168sqHK3k4z18LDvr+YAO/VjK0ReQ9rU= github.com/paulmach/orb v0.7.1/go.mod h1:FWRlTgl88VI1RBx/MkrwWDRhQ96ctqMCh8boXhmqB/A= github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= @@ -56,12 +56,14 @@ github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= +github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM= go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk= go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o= @@ -82,6 +84,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191220220014-0732a990476f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -104,5 +107,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/retry_writer.go b/retry_writer.go deleted file mode 100644 index 8d0f2c0..0000000 --- a/retry_writer.go +++ /dev/null @@ -1,28 +0,0 @@ -package clickhousebuffer - -import ( - "context" - - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" - "github.com/zikwall/clickhouse-buffer/v2/src/database" -) - -type Writeable interface { - Write(ctx context.Context, view database.View, batch *buffer.Batch) (uint64, error) -} - -type defaultWriter struct { - conn database.Clickhouse -} - -func NewDefaultWriter(conn database.Clickhouse) Writeable { - w := &defaultWriter{ - conn: conn, - } - return w -} - -func (w *defaultWriter) Write(ctx context.Context, view database.View, batch *buffer.Batch) (uint64, error) { - affected, err := w.conn.Insert(ctx, view, batch.Rows()) - return affected, err -} diff --git a/src/buffer/buffer.go b/src/buffer/buffer.go deleted file mode 100644 index 80364be..0000000 --- a/src/buffer/buffer.go +++ /dev/null @@ -1,10 +0,0 @@ -package buffer - -// Buffer it is the interface for creating a data buffer (temporary storage). -// It is enough to implement this interface so that you can use your own temporary storage -type Buffer interface { - Write(RowSlice) - Read() []RowSlice - Len() int - Flush() -} diff --git a/src/buffer/buffer_row.go b/src/buffer/buffer_row.go deleted file mode 100644 index 58d5c48..0000000 --- a/src/buffer/buffer_row.go +++ /dev/null @@ -1,36 +0,0 @@ -package buffer - -import ( - "bytes" - "encoding/gob" -) - -// Inline interface is an assistant in the correct formation of the order of fields in the data -// before sending it to Clickhouse -type Inline interface { - Row() RowSlice -} - -type RowSlice []interface{} -type RowDecoded string - -// Encode turns the RowSlice type into an array of bytes. -// This method is used for data serialization and storage in remote buffers, such as redis.Buffer -func (rw RowSlice) Encode() ([]byte, error) { - var buf bytes.Buffer - err := gob.NewEncoder(&buf).Encode(rw) - if err != nil { - return nil, err - } - return buf.Bytes(), nil -} - -// Decode This method is required to reverse deserialize an array of bytes in a RowSlice type -func (rd RowDecoded) Decode() (RowSlice, error) { - var v RowSlice - err := gob.NewDecoder(bytes.NewReader([]byte(rd))).Decode(&v) - if err != nil { - return nil, err - } - return v, nil -} diff --git a/src/buffer/cxmem/buffer.go b/src/buffer/cxmem/buffer.go new file mode 100644 index 0000000..c669be6 --- /dev/null +++ b/src/buffer/cxmem/buffer.go @@ -0,0 +1,35 @@ +package cxmem + +import ( + "github.com/zikwall/clickhouse-buffer/v3/src/cx" +) + +type memory struct { + buffer []cx.Vector + size uint +} + +func NewBuffer(bufferSize uint) cx.Buffer { + return &memory{ + buffer: make([]cx.Vector, 0, bufferSize+1), + size: bufferSize + 1, + } +} + +func (i *memory) Write(row cx.Vector) { + i.buffer = append(i.buffer, row) +} + +func (i *memory) Read() []cx.Vector { + snapshot := make([]cx.Vector, len(i.buffer)) + copy(snapshot, i.buffer) + return snapshot +} + +func (i *memory) Len() int { + return len(i.buffer) +} + +func (i *memory) Flush() { + i.buffer = i.buffer[:0] +} diff --git a/src/buffer/redis/buffer.go b/src/buffer/cxredis/buffer.go similarity index 71% rename from src/buffer/redis/buffer.go rename to src/buffer/cxredis/buffer.go index e528d10..31667f5 100644 --- a/src/buffer/redis/buffer.go +++ b/src/buffer/cxredis/buffer.go @@ -1,12 +1,12 @@ -package redis +package cxredis import ( "log" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) -func (r *redisBuffer) Write(row buffer.RowSlice) { +func (r *redisBuffer) Write(row cx.Vector) { buf, err := row.Encode() if err == nil { err = r.client.RPush(r.context, r.bucket, buf).Err() @@ -18,11 +18,11 @@ func (r *redisBuffer) Write(row buffer.RowSlice) { } } -func (r *redisBuffer) Read() []buffer.RowSlice { +func (r *redisBuffer) Read() []cx.Vector { values := r.client.LRange(r.context, r.bucket, 0, r.bufferSize).Val() - slices := make([]buffer.RowSlice, 0, len(values)) + slices := make([]cx.Vector, 0, len(values)) for _, value := range values { - if v, err := buffer.RowDecoded(value).Decode(); err == nil { + if v, err := cx.VectorDecoded(value).Decode(); err == nil { slices = append(slices, v) } else { log.Printf("redis buffer read err: %v\n", err.Error()) diff --git a/src/buffer/redis/connection.go b/src/buffer/cxredis/connection.go similarity index 84% rename from src/buffer/redis/connection.go rename to src/buffer/cxredis/connection.go index 78ce9bb..ebd5c0e 100644 --- a/src/buffer/redis/connection.go +++ b/src/buffer/cxredis/connection.go @@ -1,4 +1,4 @@ -package redis +package cxredis import ( "context" @@ -6,7 +6,7 @@ import ( "github.com/go-redis/redis/v8" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) const prefix = "ch_buffer" @@ -22,7 +22,7 @@ type redisBuffer struct { bufferSize int64 } -func NewBuffer(ctx context.Context, rdb *redis.Client, bucket string, bufferSize uint) (buffer.Buffer, error) { +func NewBuffer(ctx context.Context, rdb *redis.Client, bucket string, bufferSize uint) (cx.Buffer, error) { return &redisBuffer{ client: rdb, context: ctx, diff --git a/src/buffer/cxsyncmem/buffer.go b/src/buffer/cxsyncmem/buffer.go new file mode 100644 index 0000000..1a16cde --- /dev/null +++ b/src/buffer/cxsyncmem/buffer.go @@ -0,0 +1,48 @@ +package cxsyncmem + +import ( + "sync" + + "github.com/zikwall/clickhouse-buffer/v3/src/cx" +) + +// special for tests with locks +type memory struct { + buffer []cx.Vector + size uint + mu *sync.RWMutex +} + +func NewBuffer(bufferSize uint) cx.Buffer { + return &memory{ + buffer: make([]cx.Vector, 0, bufferSize+1), + size: bufferSize + 1, + mu: &sync.RWMutex{}, + } +} + +func (i *memory) Write(row cx.Vector) { + i.mu.Lock() + i.buffer = append(i.buffer, row) + i.mu.Unlock() +} + +func (i *memory) Read() []cx.Vector { + i.mu.RLock() + snapshot := make([]cx.Vector, len(i.buffer)) + copy(snapshot, i.buffer) + i.mu.RUnlock() + return snapshot +} + +func (i *memory) Len() int { + i.mu.RLock() + defer i.mu.RUnlock() + return len(i.buffer) +} + +func (i *memory) Flush() { + i.mu.Lock() + i.buffer = i.buffer[:0] + i.mu.Unlock() +} diff --git a/src/buffer/memory/buffer.go b/src/buffer/memory/buffer.go deleted file mode 100644 index 2a76711..0000000 --- a/src/buffer/memory/buffer.go +++ /dev/null @@ -1,33 +0,0 @@ -package memory - -import ( - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" -) - -type memory struct { - buffer []buffer.RowSlice - size uint -} - -func NewBuffer(bufferSize uint) buffer.Buffer { - return &memory{ - buffer: make([]buffer.RowSlice, 0, bufferSize+1), - size: bufferSize + 1, - } -} - -func (i *memory) Write(row buffer.RowSlice) { - i.buffer = append(i.buffer, row) -} - -func (i *memory) Read() []buffer.RowSlice { - return i.buffer -} - -func (i *memory) Len() int { - return len(i.buffer) -} - -func (i *memory) Flush() { - i.buffer = make([]buffer.RowSlice, 0, i.size) -} diff --git a/src/cx/buffer.go b/src/cx/buffer.go new file mode 100644 index 0000000..26f408f --- /dev/null +++ b/src/cx/buffer.go @@ -0,0 +1,45 @@ +package cx + +import ( + "bytes" + "encoding/gob" +) + +// Buffer it is the interface for creating a data buffer (temporary storage). +// It is enough to implement this interface so that you can use your own temporary storage +type Buffer interface { + Write(Vector) + Read() []Vector + Len() int + Flush() +} + +// Vectorable interface is an assistant in the correct formation of the order of fields in the data +// before sending it to Clickhouse +type Vectorable interface { + Row() Vector +} + +type Vector []interface{} +type VectorDecoded string + +// Encode turns the Vector type into an array of bytes. +// This method is used for data serialization and storage in remote buffers, such as redis.Buffer +func (v Vector) Encode() ([]byte, error) { + var buf bytes.Buffer + err := gob.NewEncoder(&buf).Encode(v) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// Decode This method is required to reverse deserialize an array of bytes in a Vector type +func (d VectorDecoded) Decode() (Vector, error) { + var v Vector + err := gob.NewDecoder(bytes.NewReader([]byte(d))).Decode(&v) + if err != nil { + return nil, err + } + return v, nil +} diff --git a/src/buffer/buffer_batch.go b/src/cx/buffer_batch.go similarity index 59% rename from src/buffer/buffer_batch.go rename to src/cx/buffer_batch.go index 7044fc2..8c8729b 100644 --- a/src/buffer/buffer_batch.go +++ b/src/cx/buffer_batch.go @@ -1,17 +1,17 @@ -package buffer +package cx // Batch holds information for sending rows batch type Batch struct { - rows []RowSlice + rows []Vector } // NewBatch creates new batch -func NewBatch(rows []RowSlice) *Batch { +func NewBatch(rows []Vector) *Batch { return &Batch{ rows: rows, } } -func (b *Batch) Rows() []RowSlice { +func (b *Batch) Rows() []Vector { return b.rows } diff --git a/src/cx/db.go b/src/cx/db.go new file mode 100644 index 0000000..92625a7 --- /dev/null +++ b/src/cx/db.go @@ -0,0 +1,19 @@ +package cx + +import ( + "context" +) + +type View struct { + Name string + Columns []string +} + +func NewView(name string, columns []string) View { + return View{Name: name, Columns: columns} +} + +type Clickhouse interface { + Insert(context.Context, View, []Vector) (uint64, error) + Close() error +} diff --git a/log.go b/src/cx/log.go similarity index 87% rename from log.go rename to src/cx/log.go index 66219aa..0f802e8 100644 --- a/log.go +++ b/src/cx/log.go @@ -1,4 +1,4 @@ -package clickhousebuffer +package cx import ( "fmt" @@ -12,7 +12,7 @@ type Logger interface { type defaultLogger struct{} -func newDefaultLogger() Logger { +func NewDefaultLogger() Logger { d := &defaultLogger{} return d } diff --git a/src/database/support.go b/src/cx/support.go similarity index 87% rename from src/database/support.go rename to src/cx/support.go index 2084885..5cfb8d5 100644 --- a/src/database/support.go +++ b/src/cx/support.go @@ -1,11 +1,9 @@ -package database +package cx import ( - "context" "time" "github.com/ClickHouse/clickhouse-go/v2" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" ) const ( @@ -70,13 +68,3 @@ func IsResendAvailable(err error) bool { } return true } - -type View struct { - Name string - Columns []string -} - -type Clickhouse interface { - Insert(context.Context, View, []buffer.RowSlice) (uint64, error) - Close() error -} diff --git a/src/database/native/impl.go b/src/db/cxnative/impl.go similarity index 72% rename from src/database/native/impl.go rename to src/db/cxnative/impl.go index f7d345a..92969da 100644 --- a/src/database/native/impl.go +++ b/src/db/cxnative/impl.go @@ -1,4 +1,4 @@ -package native +package cxnative import ( "context" @@ -10,8 +10,7 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" - "github.com/zikwall/clickhouse-buffer/v2/src/database" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) type clickhouseNative struct { @@ -25,7 +24,7 @@ func nativeInsertQuery(table string, cols []string) string { return prepared } -func (c *clickhouseNative) Insert(ctx context.Context, view database.View, rows []buffer.RowSlice) (uint64, error) { +func (c *clickhouseNative) Insert(ctx context.Context, view cx.View, rows []cx.Vector) (uint64, error) { var err error timeoutContext, cancel := context.WithTimeout(ctx, c.insertTimeout) defer cancel() @@ -51,15 +50,15 @@ func (c *clickhouseNative) Close() error { return c.conn.Close() } -func NewClickhouse(ctx context.Context, options *clickhouse.Options) (database.Clickhouse, driver.Conn, error) { +func NewClickhouse(ctx context.Context, options *clickhouse.Options) (cx.Clickhouse, driver.Conn, error) { if options.MaxIdleConns == 0 { - options.MaxIdleConns = database.GetDefaultMaxIdleConns() + options.MaxIdleConns = cx.GetDefaultMaxIdleConns() } if options.MaxOpenConns == 0 { - options.MaxOpenConns = database.GetDefaultMaxOpenConns() + options.MaxOpenConns = cx.GetDefaultMaxOpenConns() } if options.ConnMaxLifetime == 0 { - options.ConnMaxLifetime = database.GetDefaultConnMaxLifetime() + options.ConnMaxLifetime = cx.GetDefaultConnMaxLifetime() } conn, err := clickhouse.Open(options) if err != nil { @@ -78,13 +77,13 @@ func NewClickhouse(ctx context.Context, options *clickhouse.Options) (database.C } return &clickhouseNative{ conn: conn, - insertTimeout: database.GetDefaultInsertDurationTimeout(), + insertTimeout: cx.GetDefaultInsertDurationTimeout(), }, conn, nil } -func NewClickhouseWithConn(conn driver.Conn) database.Clickhouse { +func NewClickhouseWithConn(conn driver.Conn) cx.Clickhouse { return &clickhouseNative{ conn: conn, - insertTimeout: database.GetDefaultInsertDurationTimeout(), + insertTimeout: cx.GetDefaultInsertDurationTimeout(), } } diff --git a/src/database/sql/impl.go b/src/db/cxsql/impl.go similarity index 81% rename from src/database/sql/impl.go rename to src/db/cxsql/impl.go index 61cb931..5c0343f 100644 --- a/src/database/sql/impl.go +++ b/src/db/cxsql/impl.go @@ -1,4 +1,4 @@ -package sql +package cxsql import ( "context" @@ -10,8 +10,7 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" - "github.com/zikwall/clickhouse-buffer/v2/src/database" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) type clickhouseSQL struct { @@ -33,7 +32,7 @@ func insertQuery(table string, cols []string) string { // There is no support for user interfaces as well as simple execution of an already prepared request // The entire batch bid is implemented through so-called "transactions", // although Clickhouse does not support them - it is only a client solution for preparing requests -func (c *clickhouseSQL) Insert(ctx context.Context, view database.View, rows []buffer.RowSlice) (uint64, error) { +func (c *clickhouseSQL) Insert(ctx context.Context, view cx.View, rows []cx.Vector) (uint64, error) { tx, err := c.conn.Begin() if err != nil { return 0, err @@ -82,18 +81,18 @@ func NewClickhouse( options *clickhouse.Options, runtimeOpts *RuntimeOptions, ) ( - database.Clickhouse, + cx.Clickhouse, *sql.DB, error, ) { if runtimeOpts.MaxIdleConns == 0 { - runtimeOpts.MaxIdleConns = database.GetDefaultMaxIdleConns() + runtimeOpts.MaxIdleConns = cx.GetDefaultMaxIdleConns() } if runtimeOpts.MaxOpenConns == 0 { - runtimeOpts.MaxOpenConns = database.GetDefaultMaxOpenConns() + runtimeOpts.MaxOpenConns = cx.GetDefaultMaxOpenConns() } if runtimeOpts.ConnMaxLifetime == 0 { - runtimeOpts.ConnMaxLifetime = database.GetDefaultConnMaxLifetime() + runtimeOpts.ConnMaxLifetime = cx.GetDefaultConnMaxLifetime() } conn := clickhouse.OpenDB(options) conn.SetMaxIdleConns(runtimeOpts.MaxIdleConns) @@ -112,13 +111,13 @@ func NewClickhouse( } return &clickhouseSQL{ conn: conn, - insertTimeout: database.GetDefaultInsertDurationTimeout(), + insertTimeout: cx.GetDefaultInsertDurationTimeout(), }, conn, nil } -func NewClickhouseWithConn(conn *sql.DB) database.Clickhouse { +func NewClickhouseWithConn(conn *sql.DB) cx.Clickhouse { return &clickhouseSQL{ conn: conn, - insertTimeout: database.GetDefaultInsertDurationTimeout(), + insertTimeout: cx.GetDefaultInsertDurationTimeout(), } } diff --git a/retry.go b/src/retry/retry.go similarity index 81% rename from retry.go rename to src/retry/retry.go index 7703a0e..4d0e8d9 100644 --- a/retry.go +++ b/src/retry/retry.go @@ -1,4 +1,4 @@ -package clickhousebuffer +package retry import ( "context" @@ -8,8 +8,7 @@ import ( "github.com/Rican7/retry/backoff" "github.com/Rican7/retry/strategy" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" - "github.com/zikwall/clickhouse-buffer/v2/src/database" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) const ( @@ -40,13 +39,13 @@ const ( ) type Retryable interface { - Retry(packet *retryPacket) + Retry(packet *Packet) Metrics() (uint64, uint64, uint64) } type Queueable interface { - Queue(packet *retryPacket) - Retries() <-chan *retryPacket + Queue(packet *Packet) + Retries() <-chan *Packet } type Closable interface { @@ -54,14 +53,20 @@ type Closable interface { CloseMessage() string } -type retryPacket struct { - view database.View - btc *buffer.Batch +type Packet struct { + view cx.View + batch *cx.Batch tryCount uint8 } +func NewPacket(view cx.View, batch *cx.Batch) *Packet { + return &Packet{ + view: view, batch: batch, + } +} + type retryImpl struct { - logger Logger + logger cx.Logger writer Writeable engine Queueable isDebug bool @@ -72,7 +77,7 @@ type retryImpl struct { progress Countable } -func NewRetry(ctx context.Context, engine Queueable, writer Writeable, logger Logger, isDebug bool) Retryable { +func NewRetry(ctx context.Context, engine Queueable, writer Writeable, logger cx.Logger, isDebug bool) Retryable { r := &retryImpl{ engine: engine, writer: writer, @@ -92,7 +97,7 @@ func (r *retryImpl) Metrics() (successfully, failed, progress uint64) { return r.successfully.Val(), r.failed.Val(), r.progress.Val() } -func (r *retryImpl) Retry(packet *retryPacket) { +func (r *retryImpl) Retry(packet *Packet) { if value := r.progress.Inc(); value >= defaultRetryChanSize { r.logger.Log(queueIsFull) return @@ -126,7 +131,7 @@ func (r *retryImpl) backoffRetry(ctx context.Context) { } } -func (r *retryImpl) action(ctx context.Context, view database.View, btc *buffer.Batch) retry.Action { +func (r *retryImpl) action(ctx context.Context, view cx.View, btc *cx.Batch) retry.Action { return func(attempt uint) error { affected, err := r.writer.Write(ctx, view, btc) if err != nil { @@ -145,11 +150,11 @@ func (r *retryImpl) action(ctx context.Context, view database.View, btc *buffer. // if error is not in list of not allowed, // and the number of repetition cycles has not been exhausted, // try to re-send it to the processing queue -func (r *retryImpl) resend(packet *retryPacket, err error) bool { - if (packet.tryCount < defaultCycloCount) && database.IsResendAvailable(err) { - r.Retry(&retryPacket{ +func (r *retryImpl) resend(packet *Packet, err error) bool { + if (packet.tryCount < defaultCycloCount) && cx.IsResendAvailable(err) { + r.Retry(&Packet{ view: packet.view, - btc: packet.btc, + batch: packet.batch, tryCount: packet.tryCount + 1, }) if r.isDebug { @@ -160,12 +165,12 @@ func (r *retryImpl) resend(packet *retryPacket, err error) bool { return false } -func (r *retryImpl) handlePacket(ctx context.Context, packet *retryPacket) { +func (r *retryImpl) handlePacket(ctx context.Context, packet *Packet) { r.progress.Dec() if r.isDebug { r.logger.Log(handleRetryMsg) } - if err := retry.Retry(r.action(ctx, packet.view, packet.btc), r.limit, r.backoff); err != nil { + if err := retry.Retry(r.action(ctx, packet.view, packet.batch), r.limit, r.backoff); err != nil { r.logger.Logf("%s: %v", limitOfRetries, err) if !r.resend(packet, err) { // otherwise, increase failed counter and report in logs that the package is always lost diff --git a/retry_memory.go b/src/retry/retry_memory.go similarity index 55% rename from retry_memory.go rename to src/retry/retry_memory.go index 8101bd9..b171f8c 100644 --- a/retry_memory.go +++ b/src/retry/retry_memory.go @@ -1,21 +1,21 @@ -package clickhousebuffer +package retry type imMemoryQueueEngine struct { - retries chan *retryPacket + retries chan *Packet } -func newImMemoryQueueEngine() Queueable { +func NewImMemoryQueueEngine() Queueable { r := &imMemoryQueueEngine{ - retries: make(chan *retryPacket, defaultRetryChanSize), + retries: make(chan *Packet, defaultRetryChanSize), } return r } -func (r *imMemoryQueueEngine) Queue(packet *retryPacket) { +func (r *imMemoryQueueEngine) Queue(packet *Packet) { r.retries <- packet } -func (r *imMemoryQueueEngine) Retries() <-chan *retryPacket { +func (r *imMemoryQueueEngine) Retries() <-chan *Packet { return r.retries } diff --git a/retry_stat.go b/src/retry/retry_stat.go similarity index 94% rename from retry_stat.go rename to src/retry/retry_stat.go index 2db3c2c..6a49c6f 100644 --- a/retry_stat.go +++ b/src/retry/retry_stat.go @@ -1,4 +1,4 @@ -package clickhousebuffer +package retry import "sync/atomic" diff --git a/src/retry/retry_writer.go b/src/retry/retry_writer.go new file mode 100644 index 0000000..43d452e --- /dev/null +++ b/src/retry/retry_writer.go @@ -0,0 +1,27 @@ +package retry + +import ( + "context" + + "github.com/zikwall/clickhouse-buffer/v3/src/cx" +) + +type Writeable interface { + Write(ctx context.Context, view cx.View, batch *cx.Batch) (uint64, error) +} + +type defaultWriter struct { + conn cx.Clickhouse +} + +func NewDefaultWriter(conn cx.Clickhouse) Writeable { + w := &defaultWriter{ + conn: conn, + } + return w +} + +func (w *defaultWriter) Write(ctx context.Context, view cx.View, batch *cx.Batch) (uint64, error) { + affected, err := w.conn.Insert(ctx, view, batch.Rows()) + return affected, err +} diff --git a/src/buffer/buffer_row_test.go b/tests/buffer_row_test.go similarity index 79% rename from src/buffer/buffer_row_test.go rename to tests/buffer_row_test.go index 620b672..d4c0e1f 100644 --- a/src/buffer/buffer_row_test.go +++ b/tests/buffer_row_test.go @@ -1,9 +1,11 @@ -package buffer +package tests import ( "reflect" "testing" "time" + + "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) type RowTestMock struct { @@ -12,8 +14,8 @@ type RowTestMock struct { insertTS time.Time } -func (vm RowTestMock) Row() RowSlice { - return RowSlice{vm.id, vm.uuid, vm.insertTS.Format(time.RFC822)} +func (vm RowTestMock) Row() cx.Vector { + return cx.Vector{vm.id, vm.uuid, vm.insertTS.Format(time.RFC822)} } func TestRow(t *testing.T) { @@ -27,7 +29,7 @@ func TestRow(t *testing.T) { if err != nil { t.Fatal(err) } - value, err := RowDecoded(encoded).Decode() + value, err := cx.VectorDecoded(encoded).Decode() if err != nil { t.Fatal(err) } diff --git a/client_impl_test.go b/tests/client_impl_test.go similarity index 69% rename from client_impl_test.go rename to tests/client_impl_test.go index 6c586c5..f6a3078 100644 --- a/client_impl_test.go +++ b/tests/client_impl_test.go @@ -1,17 +1,19 @@ -package clickhousebuffer +package tests import ( "context" "fmt" + "sync" + "sync/atomic" "testing" "time" "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer/memory" - "github.com/zikwall/clickhouse-buffer/v2/src/database" + clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3" + "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxsyncmem" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) var ( @@ -31,7 +33,7 @@ var ( type ClickhouseImplMock struct{} -func (c *ClickhouseImplMock) Insert(_ context.Context, _ database.View, _ []buffer.RowSlice) (uint64, error) { +func (c *ClickhouseImplMock) Insert(_ context.Context, _ cx.View, _ []cx.Vector) (uint64, error) { return 0, nil } @@ -45,7 +47,7 @@ func (c *ClickhouseImplMock) Conn() driver.Conn { type ClickhouseImplErrMock struct{} -func (ce *ClickhouseImplErrMock) Insert(_ context.Context, _ database.View, _ []buffer.RowSlice) (uint64, error) { +func (ce *ClickhouseImplErrMock) Insert(_ context.Context, _ cx.View, _ []cx.Vector) (uint64, error) { return 0, errClickhouseUnknownException } @@ -59,7 +61,7 @@ func (ce *ClickhouseImplErrMock) Conn() driver.Conn { type ClickhouseImplErrMockFailed struct{} -func (ce *ClickhouseImplErrMockFailed) Insert(_ context.Context, _ database.View, _ []buffer.RowSlice) (uint64, error) { +func (ce *ClickhouseImplErrMockFailed) Insert(_ context.Context, _ cx.View, _ []cx.Vector) (uint64, error) { return 0, errClickhouseUnknownTableException } @@ -72,11 +74,11 @@ func (ce *ClickhouseImplErrMockFailed) Conn() driver.Conn { } type ClickhouseImplRetryMock struct { - hasErr bool + hasErr int32 } -func (cr *ClickhouseImplRetryMock) Insert(_ context.Context, _ database.View, _ []buffer.RowSlice) (uint64, error) { - if !cr.hasErr { +func (cr *ClickhouseImplRetryMock) Insert(_ context.Context, _ cx.View, _ []cx.Vector) (uint64, error) { + if val := atomic.LoadInt32(&cr.hasErr); val == 0 { return 0, errClickhouseUnknownException } return 1, nil @@ -96,29 +98,26 @@ type RowMock struct { insertTS time.Time } -func (vm RowMock) Row() buffer.RowSlice { - return buffer.RowSlice{vm.id, vm.uuid, vm.insertTS} +func (vm RowMock) Row() cx.Vector { + return cx.Vector{vm.id, vm.uuid, vm.insertTS} } // nolint:funlen,gocyclo // it's not important here -func TestClientImplHandleStream(t *testing.T) { - tableView := database.View{ - Name: "test_db.test_table", - Columns: []string{"id", "uuid", "insert_ts"}, - } +func TestClient(t *testing.T) { + tableView := cx.NewView("test_db.test_table", []string{"id", "uuid", "insert_ts"}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() t.Run("it should be correct send and flush data", func(t *testing.T) { - client := NewClientWithOptions(ctx, &ClickhouseImplMock{}, - DefaultOptions(). + client := clickhousebuffer.NewClientWithOptions(ctx, &ClickhouseImplMock{}, + clickhousebuffer.DefaultOptions(). SetFlushInterval(200). SetBatchSize(3). SetDebugMode(true). SetRetryIsEnabled(true), ) defer client.Close() - memoryBuffer := memory.NewBuffer( + memoryBuffer := cxsyncmem.NewBuffer( client.Options().BatchSize(), ) writeAPI := client.Writer(tableView, memoryBuffer) @@ -145,24 +144,27 @@ func TestClientImplHandleStream(t *testing.T) { // nolint:dupl // it's not important here t.Run("it should be successfully received three errors about writing", func(t *testing.T) { - client := NewClientWithOptions(ctx, &ClickhouseImplErrMock{}, - DefaultOptions(). + client := clickhousebuffer.NewClientWithOptions(ctx, &ClickhouseImplErrMock{}, + clickhousebuffer.DefaultOptions(). SetFlushInterval(10). SetBatchSize(1). SetDebugMode(true). SetRetryIsEnabled(true), ) defer client.Close() - memoryBuffer := memory.NewBuffer( + memoryBuffer := cxsyncmem.NewBuffer( client.Options().BatchSize(), ) writeAPI := client.Writer(tableView, memoryBuffer) var errors []error + mu := &sync.RWMutex{} errorsCh := writeAPI.Errors() // Create go proc for reading and storing errors go func() { for err := range errorsCh { + mu.Lock() errors = append(errors, err) + mu.Unlock() } }() writeAPI.WriteRow(RowMock{ @@ -175,6 +177,8 @@ func TestClientImplHandleStream(t *testing.T) { id: 3, uuid: "3", insertTS: time.Now().Add(time.Second * 2), }) simulateWait(time.Millisecond * 150) + mu.RLock() + defer mu.RUnlock() if len(errors) != 3 { t.Fatalf("failed, expected to get three errors, received %d", len(errors)) } @@ -191,32 +195,37 @@ func TestClientImplHandleStream(t *testing.T) { t.Run("it should be successfully handle retry", func(t *testing.T) { mock := &ClickhouseImplRetryMock{} - client := NewClientWithOptions(ctx, mock, - DefaultOptions(). + client := clickhousebuffer.NewClientWithOptions(ctx, mock, + clickhousebuffer.DefaultOptions(). SetFlushInterval(10). SetBatchSize(1). SetDebugMode(true). SetRetryIsEnabled(true), ) defer client.Close() - memoryBuffer := memory.NewBuffer( + memoryBuffer := cxsyncmem.NewBuffer( client.Options().BatchSize(), ) writeAPI := client.Writer(tableView, memoryBuffer) var errors []error + mu := &sync.RWMutex{} errorsCh := writeAPI.Errors() // Create go proc for reading and storing errors go func() { for err := range errorsCh { + mu.Lock() errors = append(errors, err) + mu.Unlock() } }() writeAPI.WriteRow(RowMock{ id: 1, uuid: "1", insertTS: time.Now(), }) - simulateWait(time.Nanosecond * 6000) - mock.hasErr = true - simulateWait(time.Millisecond * 1000) + simulateWait(time.Millisecond * 10) + atomic.StoreInt32(&mock.hasErr, 1) + simulateWait(time.Millisecond * 2000) + mu.RLock() + defer mu.RUnlock() if len(errors) != 1 { t.Fatalf("failed, expected to get one error, received %d", len(errors)) } @@ -231,26 +240,60 @@ func TestClientImplHandleStream(t *testing.T) { simulateWait(time.Millisecond * 350) }) + t.Run("it should be successfully handle retry without error channel", func(t *testing.T) { + mock := &ClickhouseImplRetryMock{} + client := clickhousebuffer.NewClientWithOptions(ctx, mock, + clickhousebuffer.DefaultOptions(). + SetFlushInterval(10). + SetBatchSize(1). + SetDebugMode(true). + SetRetryIsEnabled(true), + ) + defer client.Close() + memoryBuffer := cxsyncmem.NewBuffer( + client.Options().BatchSize(), + ) + writeAPI := client.Writer(tableView, memoryBuffer) + writeAPI.WriteRow(RowMock{ + id: 1, uuid: "1", insertTS: time.Now(), + }) + simulateWait(time.Millisecond * 10) + atomic.StoreInt32(&mock.hasErr, 1) + simulateWait(time.Millisecond * 2000) + if memoryBuffer.Len() != 0 { + t.Fatal("failed, the buffer was expected to be cleared") + } + ok, nook, progress := client.RetryClient().Metrics() + fmt.Println("#3:", ok, nook, progress) + if ok != 1 || nook != 0 || progress != 0 { + t.Fatalf("failed, expect one successful and zero fail retries, expect %d and failed %d", ok, nook) + } + simulateWait(time.Millisecond * 350) + }) + // nolint:dupl // it's not important here t.Run("it should be successfully broken retry", func(t *testing.T) { - client := NewClientWithOptions(ctx, &ClickhouseImplErrMockFailed{}, - DefaultOptions(). + client := clickhousebuffer.NewClientWithOptions(ctx, &ClickhouseImplErrMockFailed{}, + clickhousebuffer.DefaultOptions(). SetFlushInterval(10). SetBatchSize(1). SetDebugMode(true). SetRetryIsEnabled(true), ) defer client.Close() - memoryBuffer := memory.NewBuffer( + memoryBuffer := cxsyncmem.NewBuffer( client.Options().BatchSize(), ) writeAPI := client.Writer(tableView, memoryBuffer) var errors []error + mu := &sync.RWMutex{} errorsCh := writeAPI.Errors() // Create go proc for reading and storing errors go func() { for err := range errorsCh { + mu.Lock() errors = append(errors, err) + mu.Unlock() } }() writeAPI.WriteRow(RowMock{ @@ -263,6 +306,8 @@ func TestClientImplHandleStream(t *testing.T) { id: 3, uuid: "3", insertTS: time.Now().Add(time.Second * 2), }) simulateWait(time.Millisecond * 150) + mu.RLock() + defer mu.RUnlock() if len(errors) != 3 { t.Fatalf("failed, expected to get three errors, received %d", len(errors)) } @@ -279,16 +324,13 @@ func TestClientImplHandleStream(t *testing.T) { } func TestClientImplWriteBatch(t *testing.T) { - tableView := database.View{ - Name: "test_db.test_table", - Columns: []string{"id", "uuid", "insert_ts"}, - } + tableView := cx.NewView("test_db.test_table", []string{"id", "uuid", "insert_ts"}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() t.Run("it should be correct send data", func(t *testing.T) { - client := NewClientWithOptions(ctx, &ClickhouseImplMock{}, - DefaultOptions(). + client := clickhousebuffer.NewClientWithOptions(ctx, &ClickhouseImplMock{}, + clickhousebuffer.DefaultOptions(). SetFlushInterval(10). SetBatchSize(1). SetDebugMode(true). @@ -296,7 +338,7 @@ func TestClientImplWriteBatch(t *testing.T) { ) defer client.Close() writerBlocking := client.WriterBlocking(tableView) - err := writerBlocking.WriteRow(ctx, []buffer.Inline{ + err := writerBlocking.WriteRow(ctx, []cx.Vectorable{ RowMock{ id: 1, uuid: "1", insertTS: time.Now(), }, @@ -313,8 +355,8 @@ func TestClientImplWriteBatch(t *testing.T) { }) t.Run("it should be successfully received error about writing", func(t *testing.T) { - client := NewClientWithOptions(ctx, &ClickhouseImplErrMock{}, - DefaultOptions(). + client := clickhousebuffer.NewClientWithOptions(ctx, &ClickhouseImplErrMock{}, + clickhousebuffer.DefaultOptions(). SetFlushInterval(10). SetBatchSize(1). SetDebugMode(true). @@ -322,7 +364,7 @@ func TestClientImplWriteBatch(t *testing.T) { ) defer client.Close() writerBlocking := client.WriterBlocking(tableView) - err := writerBlocking.WriteRow(ctx, []buffer.Inline{ + err := writerBlocking.WriteRow(ctx, []cx.Vectorable{ RowMock{ id: 1, uuid: "1", insertTS: time.Now(), }, diff --git a/integration_memory_test.go b/tests/integration_memory_test.go similarity index 83% rename from integration_memory_test.go rename to tests/integration_memory_test.go index 17fe5cf..b9778c5 100644 --- a/integration_memory_test.go +++ b/tests/integration_memory_test.go @@ -1,16 +1,16 @@ //go:build integration // +build integration -package clickhousebuffer +package tests import ( "context" "log" "testing" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer/memory" - "github.com/zikwall/clickhouse-buffer/v2/src/database" + clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3" + "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxsyncmem" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) // nolint:dupl // it's OK @@ -73,7 +73,7 @@ func TestSQLMemory(t *testing.T) { } } -func useClientAndMemoryBuffer(ctx context.Context, clickhouse database.Clickhouse) (Client, buffer.Buffer) { +func useClientAndMemoryBuffer(ctx context.Context, clickhouse cx.Clickhouse) (clickhousebuffer.Client, cx.Buffer) { client := useCommonClient(ctx, clickhouse) - return client, memory.NewBuffer(client.Options().BatchSize()) + return client, cxsyncmem.NewBuffer(client.Options().BatchSize()) } diff --git a/integration_test.go b/tests/integration_test.go similarity index 86% rename from integration_test.go rename to tests/integration_test.go index d0c7ae4..b350cb0 100644 --- a/integration_test.go +++ b/tests/integration_test.go @@ -1,7 +1,7 @@ //go:build integration // +build integration -package clickhousebuffer +package tests import ( "context" @@ -10,6 +10,7 @@ import ( "fmt" "log" "os" + "sync" "testing" "time" @@ -17,11 +18,11 @@ import ( "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/go-redis/redis/v8" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" - redis2 "github.com/zikwall/clickhouse-buffer/v2/src/buffer/redis" - "github.com/zikwall/clickhouse-buffer/v2/src/database" - "github.com/zikwall/clickhouse-buffer/v2/src/database/native" - sqlCh "github.com/zikwall/clickhouse-buffer/v2/src/database/sql" + clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3" + "github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxredis" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" + "github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative" + "github.com/zikwall/clickhouse-buffer/v3/src/db/cxsql" ) const integrationTableName = "default.test_integration_xxx_xxx" @@ -32,8 +33,8 @@ type integrationRow struct { insertTS time.Time } -func (i integrationRow) Row() buffer.RowSlice { - return buffer.RowSlice{i.id, i.uuid, i.insertTS.Format(time.RFC822)} +func (i integrationRow) Row() cx.Vector { + return cx.Vector{i.id, i.uuid, i.insertTS.Format(time.RFC822)} } // This test is a complete simulation of the work of the buffer bundle (Redis) and the Clickhouse data warehouse @@ -66,10 +67,13 @@ func TestNative(t *testing.T) { // STEP 5: Write own data to redis writeAPI := useWriteAPI(client, redisBuffer) var errorsSlice []error + mu := &sync.RWMutex{} errorsCh := writeAPI.Errors() go func() { for err := range errorsCh { + mu.Lock() errorsSlice = append(errorsSlice, err) + mu.Unlock() } }() writeDataToBuffer(writeAPI) @@ -89,6 +93,8 @@ func TestNative(t *testing.T) { } // we expect an exception from Clickhouse: code: 60, message: Table default.test_integration_xxx_xxx doesn't exist <-time.After(600 * time.Millisecond) + mu.RLock() + defer mu.RUnlock() if len(errorsSlice) != 1 { t.Fatalf("failed, the clickhouse was expected receive one error, received: %d", len(errorsSlice)) } @@ -124,10 +130,13 @@ func TestSQL(t *testing.T) { // STEP 5: Write own data to redis writeAPI := useWriteAPI(client, redisBuffer) var errorsSlice []error + mu := &sync.RWMutex{} errorsCh := writeAPI.Errors() go func() { for err := range errorsCh { + mu.Lock() errorsSlice = append(errorsSlice, err) + mu.Unlock() } }() writeDataToBuffer(writeAPI) @@ -147,6 +156,8 @@ func TestSQL(t *testing.T) { } // we expect an exception from Clickhouse: code: 60, message: Table default.test_integration_xxx_xxx doesn't exist <-time.After(600 * time.Millisecond) + mu.RLock() + defer mu.RUnlock() if len(errorsSlice) != 1 { t.Fatalf("failed, the clickhouse was expected receive one error, received: %d", len(errorsSlice)) } @@ -323,54 +334,51 @@ func useOptions() *clickhouse.Options { } } -func useClickhousePool(ctx context.Context) (driver.Conn, database.Clickhouse, error) { - nativeClickhouse, conn, err := native.NewClickhouse(ctx, useOptions()) +func useClickhousePool(ctx context.Context) (driver.Conn, cx.Clickhouse, error) { + nativeClickhouse, conn, err := cxnative.NewClickhouse(ctx, useOptions()) if err != nil { return nil, nil, err } return conn, nativeClickhouse, nil } -func useClickhouseSQLPool(ctx context.Context) (*sql.DB, database.Clickhouse, error) { - sqlClickhouse, conn, err := sqlCh.NewClickhouse(ctx, useOptions(), &sqlCh.RuntimeOptions{}) +func useClickhouseSQLPool(ctx context.Context) (*sql.DB, cx.Clickhouse, error) { + sqlClickhouse, conn, err := cxsql.NewClickhouse(ctx, useOptions(), &cxsql.RuntimeOptions{}) if err != nil { return nil, nil, err } return conn, sqlClickhouse, nil } -func useCommonClient(ctx context.Context, ch database.Clickhouse) Client { - return NewClientWithOptions(ctx, ch, - DefaultOptions().SetFlushInterval(500).SetBatchSize(6), +func useCommonClient(ctx context.Context, ch cx.Clickhouse) clickhousebuffer.Client { + return clickhousebuffer.NewClientWithOptions(ctx, ch, + clickhousebuffer.DefaultOptions().SetFlushInterval(500).SetBatchSize(6), ) } func useClientAndRedisBuffer( ctx context.Context, - ch database.Clickhouse, + ch cx.Clickhouse, db *redis.Client, ) ( - Client, - buffer.Buffer, + clickhousebuffer.Client, + cx.Buffer, error, ) { client := useCommonClient(ctx, ch) - buf, err := redis2.NewBuffer(ctx, db, "bucket", client.Options().BatchSize()) + buf, err := cxredis.NewBuffer(ctx, db, "bucket", client.Options().BatchSize()) if err != nil { return nil, nil, fmt.Errorf("could't create redis buffer: %s", err) } return client, buf, nil } -func useWriteAPI(client Client, buf buffer.Buffer) Writer { - writeAPI := client.Writer(database.View{ - Name: integrationTableName, - Columns: []string{"id", "uuid", "insert_ts"}, - }, buf) +func useWriteAPI(client clickhousebuffer.Client, buf cx.Buffer) clickhousebuffer.Writer { + writeAPI := client.Writer(cx.NewView(integrationTableName, []string{"id", "uuid", "insert_ts"}), buf) return writeAPI } -func writeDataToBuffer(writeAPI Writer) { +func writeDataToBuffer(writeAPI clickhousebuffer.Writer) { writeAPI.WriteRow(integrationRow{ id: 1, uuid: "1", insertTS: time.Now(), }) @@ -390,7 +398,7 @@ func writeDataToBuffer(writeAPI Writer) { <-time.After(50 * time.Millisecond) } -func checksBuffer(buf buffer.Buffer) error { +func checksBuffer(buf cx.Buffer) error { // try read from redis buffer before flushing data in buffer rows := buf.Read() if len(rows) != 5 { diff --git a/write.go b/write.go index bce5a4f..2a33d63 100644 --- a/write.go +++ b/write.go @@ -1,11 +1,12 @@ package clickhousebuffer import ( - "log" + "context" + "sync" + "sync/atomic" "time" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" - "github.com/zikwall/clickhouse-buffer/v2/src/database" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) // Writer is client interface with non-blocking methods for writing rows asynchronously in batches into an Clickhouse server. @@ -13,146 +14,185 @@ import ( // When using multiple goroutines for writing, use a single WriteAPI instance in all goroutines. type Writer interface { // WriteRow writes asynchronously line protocol record into bucket. - WriteRow(vector buffer.Inline) - // Flush forces all pending writes from the buffer to be sent - Flush() + WriteRow(vector cx.Vectorable) // Errors returns a channel for reading errors which occurs during async writes. Errors() <-chan error // Close writer Close() } -type WriterImpl struct { - view database.View - streamer Client - writeBuffer buffer.Buffer - writeCh chan *buffer.Batch +type writer struct { + context context.Context + view cx.View + client Client + bufferEngine cx.Buffer + writeOptions *Options errCh chan error - bufferCh chan buffer.RowSlice - bufferFlush chan struct{} + clickhouseCh chan *cx.Batch + bufferCh chan cx.Vector doneCh chan struct{} writeStop chan struct{} bufferStop chan struct{} - writeOptions *Options + mu *sync.RWMutex + isOpenErr int32 } // NewWriter returns new non-blocking write client for writing rows to Clickhouse table -func NewWriter(client Client, view database.View, buf buffer.Buffer, writeOptions *Options) Writer { - w := &WriterImpl{ +func NewWriter(ctx context.Context, client Client, view cx.View, engine cx.Buffer) Writer { + w := &writer{ + mu: &sync.RWMutex{}, + context: ctx, view: view, - streamer: client, - writeBuffer: buf, - writeOptions: writeOptions, - writeCh: make(chan *buffer.Batch), - bufferCh: make(chan buffer.RowSlice), - bufferFlush: make(chan struct{}), - doneCh: make(chan struct{}), - bufferStop: make(chan struct{}), - writeStop: make(chan struct{}), + client: client, + bufferEngine: engine, + writeOptions: client.Options(), + // write buffers + clickhouseCh: make(chan *cx.Batch), + bufferCh: make(chan cx.Vector, 100), + // signals + doneCh: make(chan struct{}), + bufferStop: make(chan struct{}), + writeStop: make(chan struct{}), } - go w.listenBufferWrite() - go w.listenStreamWrite() + go w.runBufferBridge() + go w.runClickhouseBridge() return w } // WriteRow writes asynchronously line protocol record into bucket. // WriteRow adds record into the buffer which is sent on the background when it reaches the batch size. -func (w *WriterImpl) WriteRow(rower buffer.Inline) { +func (w *writer) WriteRow(rower cx.Vectorable) { + // maybe use atomic for check is closed + // atomic.LoadInt32(&w.isClosed) == 1 w.bufferCh <- rower.Row() } // Errors returns a channel for reading errors which occurs during async writes. // Must be called before performing any writes for errors to be collected. // The chan is unbuffered and must be drained or the writer will block. -func (w *WriterImpl) Errors() <-chan error { +func (w *writer) Errors() <-chan error { + w.mu.Lock() + defer w.mu.Unlock() if w.errCh == nil { + atomic.StoreInt32(&w.isOpenErr, 1) w.errCh = make(chan error) } return w.errCh } -// Flush forces all pending writes from the buffer to be sent -func (w *WriterImpl) Flush() { - w.bufferFlush <- struct{}{} - w.awaitFlushing() -} - -func (w *WriterImpl) awaitFlushing() { - // waiting buffer is flushed - <-time.After(time.Millisecond) +func (w *writer) hasErrReader() bool { + return atomic.LoadInt32(&w.isOpenErr) > 0 } // Close finishes outstanding write operations, // stop background routines and closes all channels -func (w *WriterImpl) Close() { - if w.writeCh != nil { - // Flush outstanding metrics - w.Flush() - +func (w *writer) Close() { + if w.clickhouseCh != nil { // stop and wait for write buffer close(w.bufferStop) <-w.doneCh - close(w.bufferFlush) - close(w.bufferCh) - // stop and wait for write clickhouse close(w.writeStop) <-w.doneCh - close(w.writeCh) - w.writeCh = nil - - // close errors if open - if w.errCh != nil { - close(w.errCh) - w.errCh = nil - } + // stop ticker for flush to batch + // close(w.tickerStop) + // <-w.doneCh } if w.writeOptions.isDebug { - log.Printf("close writer %s", w.view.Name) + w.writeOptions.logger.Logf("close writer %s", w.view.Name) } } -func (w *WriterImpl) flushBuffer() { - if w.writeBuffer.Len() > 0 { - w.writeCh <- buffer.NewBatch(w.writeBuffer.Read()) - w.writeBuffer.Flush() +func (w *writer) flush() { + if w.writeOptions.isDebug { + w.writeOptions.logger.Logf("flush buffer: %s", w.view.Name) } + w.clickhouseCh <- cx.NewBatch(w.bufferEngine.Read()) + w.bufferEngine.Flush() } -func (w *WriterImpl) listenBufferWrite() { +// func (w *writer) runTicker() { +// ticker := time.NewTicker(time.Duration(w.writeOptions.FlushInterval()) * time.Millisecond) +// w.writeOptions.logger.Logf("run ticker: %s", w.view.Name) +// defer func() { +// ticker.Stop() +// w.doneCh <- struct{}{} +// w.writeOptions.logger.Logf("stop ticker: %s", w.view.Name) +// }() +// for { +// select { +// case <-ticker.C: +// if w.bufferEngine.Len() > 0 { +// w.flush() +// } +// case <-w.tickerStop: +// return +// } +// } +//} + +// writing to a temporary buffer to collect more data +func (w *writer) runBufferBridge() { ticker := time.NewTicker(time.Duration(w.writeOptions.FlushInterval()) * time.Millisecond) + defer func() { + ticker.Stop() + // flush last data + if w.bufferEngine.Len() > 0 { + w.flush() + } + // close buffer channel + close(w.bufferCh) + w.bufferCh = nil + // send signal, buffer listener is done + w.doneCh <- struct{}{} + w.writeOptions.logger.Logf("stop buffer bridge: %s", w.view.Name) + }() + w.writeOptions.logger.Logf("run buffer bridge: %s", w.view.Name) for { select { case vector := <-w.bufferCh: - w.writeBuffer.Write(vector) - if w.writeBuffer.Len() == int(w.writeOptions.BatchSize()) { - w.flushBuffer() + w.bufferEngine.Write(vector) + if w.bufferEngine.Len() == int(w.writeOptions.BatchSize()) { + w.flush() } case <-w.bufferStop: - w.flushBuffer() - w.doneCh <- struct{}{} - ticker.Stop() return case <-ticker.C: - w.flushBuffer() - case <-w.bufferFlush: - w.flushBuffer() + if w.bufferEngine.Len() > 0 { + w.flush() + } } } } -func (w *WriterImpl) listenStreamWrite() { +// asynchronously write to Clickhouse database in large batches +func (w *writer) runClickhouseBridge() { + w.writeOptions.logger.Logf("run clickhouse bridge: %s", w.view.Name) + defer func() { + // close clickhouse channel + close(w.clickhouseCh) + w.clickhouseCh = nil + // close errors channel if it created + w.mu.Lock() + if w.errCh != nil { + close(w.errCh) + w.errCh = nil + } + w.mu.Unlock() + // send signal, clickhouse listener is done + w.doneCh <- struct{}{} + w.writeOptions.logger.Logf("stop clickhouse bridge: %s", w.view.Name) + }() for { select { - case btc := <-w.writeCh: - err := w.streamer.HandleStream(w.view, btc) - if err != nil && w.errCh != nil { + case batch := <-w.clickhouseCh: + err := w.client.WriteBatch(w.context, w.view, batch) + if err != nil && w.hasErrReader() { w.errCh <- err } case <-w.writeStop: - w.doneCh <- struct{}{} return } } diff --git a/write_blocking.go b/write_blocking.go index b0e56ec..0c46eac 100644 --- a/write_blocking.go +++ b/write_blocking.go @@ -3,33 +3,32 @@ package clickhousebuffer import ( "context" - "github.com/zikwall/clickhouse-buffer/v2/src/buffer" - "github.com/zikwall/clickhouse-buffer/v2/src/database" + "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) type WriterBlocking interface { // WriteRow writes row(s) into bucket. // WriteRow writes without implicit batching. Batch is created from given number of records // Non-blocking alternative is available in the Writer interface - WriteRow(ctx context.Context, row ...buffer.Inline) error + WriteRow(ctx context.Context, row ...cx.Vectorable) error } -type WriterBlockingImpl struct { - view database.View - streamer Client +type writerBlocking struct { + view cx.View + client Client } -func NewWriterBlocking(streamer Client, view database.View) WriterBlocking { - w := &WriterBlockingImpl{ - view: view, - streamer: streamer, +func NewWriterBlocking(client Client, view cx.View) WriterBlocking { + w := &writerBlocking{ + view: view, + client: client, } return w } -func (w *WriterBlockingImpl) WriteRow(ctx context.Context, row ...buffer.Inline) error { +func (w *writerBlocking) WriteRow(ctx context.Context, row ...cx.Vectorable) error { if len(row) > 0 { - rows := make([]buffer.RowSlice, 0, len(row)) + rows := make([]cx.Vector, 0, len(row)) for _, r := range row { rows = append(rows, r.Row()) } @@ -38,8 +37,8 @@ func (w *WriterBlockingImpl) WriteRow(ctx context.Context, row ...buffer.Inline) return nil } -func (w *WriterBlockingImpl) write(ctx context.Context, rows []buffer.RowSlice) error { - err := w.streamer.WriteBatch(ctx, w.view, buffer.NewBatch(rows)) +func (w *writerBlocking) write(ctx context.Context, rows []cx.Vector) error { + err := w.client.WriteBatch(ctx, w.view, cx.NewBatch(rows)) if err != nil { return err } diff --git a/write_options.go b/write_options.go index dc7d0f4..e7d9473 100644 --- a/write_options.go +++ b/write_options.go @@ -1,5 +1,10 @@ package clickhousebuffer +import ( + "github.com/zikwall/clickhouse-buffer/v3/src/cx" + "github.com/zikwall/clickhouse-buffer/v3/src/retry" +) + // Options holds write configuration properties type Options struct { // Maximum number of rows sent to server in single request. Default 5000 @@ -11,9 +16,9 @@ type Options struct { // Retry is enabled isRetryEnabled bool // Logger with - logger Logger + logger cx.Logger // Queueable with - queue Queueable + queue retry.Queueable } // BatchSize returns size of batch @@ -48,12 +53,12 @@ func (o *Options) SetRetryIsEnabled(enabled bool) *Options { return o } -func (o *Options) SetLogger(logger Logger) *Options { +func (o *Options) SetLogger(logger cx.Logger) *Options { o.logger = logger return o } -func (o *Options) SetQueueEngine(queue Queueable) *Options { +func (o *Options) SetQueueEngine(queue retry.Queueable) *Options { o.queue = queue return o }