Skip to content

Commit

Permalink
V3: implementation of new version (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
zikwall authored Jun 10, 2022
1 parent 42846ba commit 6b5fbc6
Show file tree
Hide file tree
Showing 38 changed files with 767 additions and 567 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
- name: Build
env:
GOPROXY: "https://proxy.golang.org"
run: go build .
run: go build ./example/cmd/simple/main.go

- name: Test
env:
Expand Down
56 changes: 28 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[![build](https://github.com/zikwall/clickhouse-buffer/workflows/build_and_tests/badge.svg)](https://github.com/zikwall/clickhouse-buffer/v2/actions)
[![build](https://github.com/zikwall/clickhouse-buffer/workflows/golangci_lint/badge.svg)](https://github.com/zikwall/clickhouse-buffer/v2/actions)
[![build](https://github.com/zikwall/clickhouse-buffer/workflows/build_and_tests/badge.svg)](https://github.com/zikwall/clickhouse-buffer/v3/actions)
[![build](https://github.com/zikwall/clickhouse-buffer/workflows/golangci_lint/badge.svg)](https://github.com/zikwall/clickhouse-buffer/v3/actions)

<div align="center">
<h1>Clickhouse Buffer</h1>
Expand All @@ -9,7 +9,7 @@
## Install

- for go-clickhouse v1 `$ go get -u github.com/zikwall/clickhouse-buffer`
- for go-clickhouse v2 `$ go get -u github.com/zikwall/clickhouse-buffer/v2`
- for go-clickhouse v2 `$ go get -u github.com/zikwall/clickhouse-buffer/v3`

### Why and why

Expand All @@ -33,6 +33,7 @@ This is due to the fact that Clickhouse is designed so that it better processes

- [x] **in-memory** - use native channels and slices
- [x] **redis** - use redis server as queue and buffer
- [x] **in-memory-sync** - if you get direct access to buffer, it will help to avoid data race
- [x] **retries** - resending "broken" or for some reason not sent packets

### Usage
Expand All @@ -41,23 +42,23 @@ This is due to the fact that Clickhouse is designed so that it better processes
import (
"database/sql"

cxnative "github.com/zikwall/clickhouse-buffer/v2/src/database/native"
cxsql "github.com/zikwall/clickhouse-buffer/v2/src/database/sql"
"github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative"
"github.com/zikwall/clickhouse-buffer/v3/src/db/cxsql"
)

// if you already have a connection to Clickhouse you can just use wrappers
// with native interface
ch := cxnative.NewClickhouseWithConn(conn: driver.Conn)
ch := cxnative.NewClickhouseWithConn(driver.Conn)
// or use database/sql interface
ch := cxsql.NewClickhouseWithConn(conn: *sql.DB)
ch := cxsql.NewClickhouseWithConn(*sql.DB)
```

```go
// if you don't want to create connections yourself,
// package can do it for you, just call the connection option you need:

// with native interface
ch, conn, err := cxnative.NewClickhouse(ctx,&clickhouse.Options{
ch, conn, err := cxnative.NewClickhouse(ctx, &clickhouse.Options{
Addr: ctx.StringSlice("clickhouse-host"),
Auth: clickhouse.Auth{
Database: ctx.String("clickhouse-database"),
Expand Down Expand Up @@ -96,38 +97,37 @@ ch, conn, err := cxsql.NewClickhouse(ctx, &clickhouse.Options{

```go
import (
cx "github.com/zikwall/clickhouse-buffer/v2"
cxbuffer "github.com/zikwall/clickhouse-buffer/v2/src/buffer"
cxmemory "github.com/zikwall/clickhouse-buffer/v2/src/buffer/memory"
cxredis "github.com/zikwall/clickhouse-buffer/v2/src/buffer/redis"
clickhousebuffer "github.com/zikwall/clickhouse-buffer/v3"
"github.com/zikwall/clickhouse-buffer/v3/src/buffer/cxmem"
"github.com/zikwall/clickhouse-buffer/v3/src/db/cxnative"
)
// create root client
client := cx.NewClientWithOptions(ctx, ch,
cx.DefaultOptions().SetFlushInterval(1000).SetBatchSize(5000),
client := clickhousebuffer.NewClientWithOptions(ctx, ch,
clickhousebuffer.DefaultOptions().SetFlushInterval(1000).SetBatchSize(5000),
)
// create buffer engine
buffer := cxmemory.NewBuffer(
buffer := cxmem.NewBuffer(
client.Options().BatchSize(),
)
// or use redis
buffer := cxredis.NewBuffer(
contetx, *redis.Client, "bucket", client.Options().BatchSize(),
)
// create new writer api: table name with columns
writeAPI := client.Writer(cxbuffer.View{
Name: "clickhouse_database.clickhouse_table",
Columns: []string{"id", "uuid", "insert_ts"},
}, buffer)
writeAPI := client.Writer(
cx.NewView("clickhouse_database.clickhouse_table", []string{"id", "uuid", "insert_ts"}),
buffer,
)

// define your custom data structure
type MyCustomDataView struct {
id int
uuid string
insertTS time.Time
}
// and implement cxbuffer.Inline interface
func (t *MyCustomDataView) Row() cxbuffer.RowSlice {
return cxbuffer.RowSlice{t.id, t.uuid, t.insertTS.Format(time.RFC822)}
// and implement cxbuffer.Vectorable interface
func (t *MyCustomDataView) Row() cx.Vector {
return cx.Vector{t.id, t.uuid, t.insertTS.Format(time.RFC822)}
}
// async write your data
writeAPI.WriteRow(&MyCustomDataView{
Expand All @@ -150,12 +150,12 @@ Using the blocking writer interface

```go
// create new writer api: table name with columns
writerBlocking := client.WriterBlocking(cxbuffer.View{
writerBlocking := client.WriterBlocking(cx.View{
Name: "clickhouse_database.clickhouse_table",
Columns: []string{"id", "uuid", "insert_ts"},
})
// non-asynchronous writing of data directly to Clickhouse
err := writerBlocking.WriteRow(ctx, []MyCustomDataView{
err := writerBlocking.WriteRow(ctx, []&MyCustomDataView{
{
id: 1, uuid: "1", insertTS: time.Now(),
},
Expand Down Expand Up @@ -183,15 +183,15 @@ You can implement queue engine by defining the `Queueable` interface:

```go
type Queueable interface {
Queue(packet *retryPacket)
Retries() <-chan *retryPacket
Queue(packet *Packet)
Retries() <-chan *Packet
}
```

and set it as an engine:

```go
cx.DefaultOptions().SetDebugMode(true).SetRetryIsEnabled(true).SetQueueEngine(CustomQueueable)
clickhousebuffer.DefaultOptions().SetDebugMode(true).SetRetryIsEnabled(true).SetQueueEngine(CustomQueueable)
```

#### Logs:
Expand All @@ -207,7 +207,7 @@ type Logger interface {

```go
// example with default options
cx.DefaultOptions().SetDebugMode(true).SetLogger(SomeLogger)
clickhousebuffer.DefaultOptions().SetDebugMode(true).SetLogger(SomeLogger)
```

#### Tests:
Expand Down
119 changes: 111 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,130 @@ package clickhousebuffer

import (
"context"
"sync"

"github.com/zikwall/clickhouse-buffer/v2/src/buffer"
"github.com/zikwall/clickhouse-buffer/v2/src/database"
"github.com/zikwall/clickhouse-buffer/v3/src/cx"
"github.com/zikwall/clickhouse-buffer/v3/src/retry"
)

type Client interface {
// Options returns the options associated with client
Options() *Options
// HandleStream method for processing data x and sending it to Clickhouse
HandleStream(database.View, *buffer.Batch) error
// WriteBatch method of sending data to Clickhouse is used implicitly in a non - blocking record,
// and explicitly in a blocking record
WriteBatch(context.Context, database.View, *buffer.Batch) error
WriteBatch(context.Context, cx.View, *cx.Batch) error
// Writer returns the asynchronous, non-blocking, Writer client.
// Ensures using a single Writer instance for each table pair.
Writer(database.View, buffer.Buffer) Writer
Writer(cx.View, cx.Buffer) Writer
// WriterBlocking returns the synchronous, blocking, WriterBlocking client.
// Ensures using a single WriterBlocking instance for each table pair.
WriterBlocking(database.View) WriterBlocking
WriterBlocking(cx.View) WriterBlocking
// RetryClient Get retry client
RetryClient() Retryable
RetryClient() retry.Retryable
// Close ensures all ongoing asynchronous write clients finish.
Close()
}

type clientImpl struct {
context context.Context
clickhouse cx.Clickhouse
options *Options
writeAPIs map[string]Writer
syncWriteAPIs map[string]WriterBlocking
mu sync.RWMutex
retry retry.Retryable
logger cx.Logger
}

func NewClient(ctx context.Context, clickhouse cx.Clickhouse) Client {
return NewClientWithOptions(ctx, clickhouse, DefaultOptions())
}

func NewClientWithOptions(ctx context.Context, clickhouse cx.Clickhouse, options *Options) Client {
if options.logger == nil {
options.logger = cx.NewDefaultLogger()
}
client := &clientImpl{
context: ctx,
clickhouse: clickhouse,
options: options,
writeAPIs: map[string]Writer{},
syncWriteAPIs: map[string]WriterBlocking{},
logger: options.logger,
}
if options.isRetryEnabled {
if options.queue == nil {
options.queue = retry.NewImMemoryQueueEngine()
}
client.retry = retry.NewRetry(
ctx, options.queue, retry.NewDefaultWriter(clickhouse), options.logger, options.isDebug,
)
}
return client
}

func (c *clientImpl) Options() *Options {
return c.options
}

func (c *clientImpl) Writer(view cx.View, buf cx.Buffer) Writer {
key := view.Name
c.mu.Lock()
if _, ok := c.writeAPIs[key]; !ok {
c.writeAPIs[key] = NewWriter(c.context, c, view, buf)
}
writer := c.writeAPIs[key]
c.mu.Unlock()
return writer
}

func (c *clientImpl) WriterBlocking(view cx.View) WriterBlocking {
key := view.Name
c.mu.Lock()
if _, ok := c.syncWriteAPIs[key]; !ok {
c.syncWriteAPIs[key] = NewWriterBlocking(c, view)
}
writer := c.syncWriteAPIs[key]
c.mu.Unlock()
return writer
}

func (c *clientImpl) Close() {
if c.options.isDebug {
c.logger.Log("close clickhouse buffer client")
c.logger.Log("close async writers")
}
// closing and destroying all asynchronous writers
c.mu.Lock()
for key, w := range c.writeAPIs {
w.Close()
delete(c.writeAPIs, key)
}
c.mu.Unlock()
// closing and destroying all synchronous writers
if c.options.isDebug {
c.logger.Log("close sync writers")
}
c.mu.Lock()
for key := range c.syncWriteAPIs {
delete(c.syncWriteAPIs, key)
}
c.mu.Unlock()
}

func (c *clientImpl) WriteBatch(ctx context.Context, view cx.View, batch *cx.Batch) error {
_, err := c.clickhouse.Insert(ctx, view, batch.Rows())
if err != nil {
// if there is an acceptable error and if the functionality of resending data is activated,
// try to repeat the operation
if c.options.isRetryEnabled && cx.IsResendAvailable(err) {
c.retry.Retry(retry.NewPacket(view, batch))
}
return err
}
return nil
}

func (c *clientImpl) RetryClient() retry.Retryable {
return c.retry
}
Loading

0 comments on commit 6b5fbc6

Please sign in to comment.