Skip to content

Commit

Permalink
Improve redis buffer (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
zikwall authored Jun 20, 2022
1 parent c1646f9 commit c6f02cd
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 110 deletions.
116 changes: 14 additions & 102 deletions bench/insert_redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@ import (
"github.com/zikwall/clickhouse-buffer/v3/src/cx"
)

// x10
// x100
// goos: linux
// goarch: amd64
// pkg: github.com/zikwall/clickhouse-buffer/v3/bench
// cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
// BenchmarkInsertRedisObjects/10000-12 10 2165062600 ns/op 8949147 B/op 215127 allocs/op
// BenchmarkInsertRedisObjects/1000-12 10 205537440 ns/op 960664 B/op 23209 allocs/op
// BenchmarkInsertRedisObjects/100-12 10 20371570 ns/op 96292 B/op 2323 allocs/op
// BenchmarkInsertRedisObjects/10-12 10 2216160 ns/op 1868 B/op 32 allocs/op
// BenchmarkInsertRedisObjects/1-12 10 180490 ns/op 188 B/op 3 allocs/op
// BenchmarkInsertRedisObjects/1000-12 100 22404356 ns/op 96095 B/op 2322 allocs/op
// BenchmarkInsertRedisObjects/100-12 100 2243544 ns/op 9673 B/op 233 allocs/op
// BenchmarkInsertRedisObjects/10-12 100 271749 ns/op 1033 B/op 25 allocs/op
// PASS
// ok
// nolint:funlen,dupl // it's not important here
Expand All @@ -41,29 +39,8 @@ func BenchmarkInsertRedisObjects(b *testing.B) {
var writeAPI clickhousebuffer.Writer
b.ResetTimer()

b.Run("10000", func(b *testing.B) {
client.Options().SetBatchSize(10000)
rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{
Addr: redisHost,
Password: redisPass,
DB: 11,
}), "bucket", client.Options().BatchSize())
if err != nil {
log.Panicln(err)
}
writeAPI = client.Writer(
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
rxbuffer,
)
b.ResetTimer()

for i := 0; i < b.N; i++ {
insertObjects(writeAPI, 10000, b)
}
})

b.Run("1000", func(b *testing.B) {
client.Options().SetBatchSize(1000)
client.Options().SetBatchSize(1001)
rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{
Addr: redisHost,
Password: redisPass,
Expand All @@ -83,7 +60,7 @@ func BenchmarkInsertRedisObjects(b *testing.B) {
})

b.Run("100", func(b *testing.B) {
client.Options().SetBatchSize(100)
client.Options().SetBatchSize(101)
rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{
Addr: redisHost,
Password: redisPass,
Expand All @@ -103,7 +80,7 @@ func BenchmarkInsertRedisObjects(b *testing.B) {
})

b.Run("10", func(b *testing.B) {
client.Options().SetBatchSize(10)
client.Options().SetBatchSize(11)
rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{
Addr: redisHost,
Password: redisPass,
Expand All @@ -122,41 +99,19 @@ func BenchmarkInsertRedisObjects(b *testing.B) {
}
})

b.Run("1", func(b *testing.B) {
client.Options().SetBatchSize(1)
rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{
Addr: redisHost,
Password: redisPass,
}), "bucket", client.Options().BatchSize())
if err != nil {
log.Panicln(err)
}
writeAPI = client.Writer(
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
rxbuffer,
)
b.ResetTimer()

for i := 0; i < b.N; i++ {
insertObjects(writeAPI, 1, b)
}
})

b.StopTimer()
writeAPI.Close()
client.Close()
}

// x10
// x100
// goos: linux
// goarch: amd64
// pkg: github.com/zikwall/clickhouse-buffer/v3/bench
// cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
// BenchmarkInsertRedisVectors/10000-12 10 2035777840 ns/op 9124870 B/op 223549 allocs/op
// BenchmarkInsertRedisVectors/1000-12 10 208882330 ns/op 926325 B/op 22709 allocs/op
// BenchmarkInsertRedisVectors/100-12 10 19944150 ns/op 92810 B/op 2273 allocs/op
// BenchmarkInsertRedisVectors/10-12 10 1891890 ns/op 1588 B/op 28 allocs/op
// BenchmarkInsertRedisVectors/1-12 10 182180 ns/op 160 B/op 2 allocs/op
// BenchmarkInsertRedisVectors/1000-12 100 22145258 ns/op 92766 B/op 2274 allocs/op
// BenchmarkInsertRedisVectors/100-12 100 2320692 ns/op 9339 B/op 229 allocs/op
// BenchmarkInsertRedisVectors/10-12 100 202146 ns/op 157 B/op 2 allocs/op
// PASS
// ok
// nolint:funlen,dupl // it's not important here
Expand All @@ -174,31 +129,9 @@ func BenchmarkInsertRedisVectors(b *testing.B) {
var writeAPI clickhousebuffer.Writer
b.ResetTimer()

b.Run("10000", func(b *testing.B) {
b.StopTimer()
client.Options().SetBatchSize(10000)
rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{
Addr: redisHost,
Password: redisPass,
DB: 11,
}), "bucket", client.Options().BatchSize())
if err != nil {
log.Panicln(err)
}
writeAPI = client.Writer(
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
rxbuffer,
)
b.StartTimer()

for i := 0; i < b.N; i++ {
insertVectors(writeAPI, 10000, b)
}
})

b.Run("1000", func(b *testing.B) {
b.StopTimer()
client.Options().SetBatchSize(1000)
client.Options().SetBatchSize(1001)
rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{
Addr: redisHost,
Password: redisPass,
Expand All @@ -219,7 +152,7 @@ func BenchmarkInsertRedisVectors(b *testing.B) {

b.Run("100", func(b *testing.B) {
b.StopTimer()
client.Options().SetBatchSize(100)
client.Options().SetBatchSize(101)
rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{
Addr: redisHost,
Password: redisPass,
Expand All @@ -240,7 +173,7 @@ func BenchmarkInsertRedisVectors(b *testing.B) {

b.Run("10", func(b *testing.B) {
b.StopTimer()
client.Options().SetBatchSize(10)
client.Options().SetBatchSize(11)
rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{
Addr: redisHost,
Password: redisPass,
Expand All @@ -259,27 +192,6 @@ func BenchmarkInsertRedisVectors(b *testing.B) {
}
})

b.Run("1", func(b *testing.B) {
b.StopTimer()
client.Options().SetBatchSize(1)
rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{
Addr: redisHost,
Password: redisPass,
}), "bucket", client.Options().BatchSize())
if err != nil {
log.Panicln(err)
}
writeAPI = client.Writer(
cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()),
rxbuffer,
)
b.StartTimer()

for i := 0; i < b.N; i++ {
insertVectors(writeAPI, 1, b)
}
})

b.StopTimer()
writeAPI.Close()
client.Close()
Expand Down
22 changes: 14 additions & 8 deletions src/buffer/cxredis/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,29 @@ package cxredis

import (
"log"
"sync/atomic"

"github.com/zikwall/clickhouse-buffer/v3/src/cx"
)

func (r *redisBuffer) Write(row cx.Vector) {
buf, err := row.Encode()
if err == nil {
err = r.client.RPush(r.context, r.bucket, buf).Err()
if err != nil && !r.isContextClosedErr(err) {
var err error
var buf []byte
if buf, err = row.Encode(); err != nil {
log.Printf("redis buffer value encode err: %v\n", err.Error())
return
}
if err = r.client.RPush(r.context, r.bucket, buf).Err(); err != nil {
if !r.isContextClosedErr(err) {
log.Printf("redis buffer write err: %v\n", err.Error())
}
} else {
log.Printf("redis buffer value encode err: %v\n", err.Error())
return
}
atomic.AddInt64(&r.size, 1)
}

func (r *redisBuffer) Read() []cx.Vector {
values := r.client.LRange(r.context, r.bucket, 0, r.bufferSize).Val()
values := r.client.LRange(r.context, r.bucket, 0, atomic.LoadInt64(&r.size)).Val()
slices := make([]cx.Vector, 0, len(values))
for _, value := range values {
if v, err := cx.VectorDecoded(value).Decode(); err == nil {
Expand All @@ -32,9 +37,10 @@ func (r *redisBuffer) Read() []cx.Vector {
}

func (r *redisBuffer) Len() int {
return int(r.client.LLen(r.context, r.bucket).Val())
return int(r.size)
}

func (r *redisBuffer) Flush() {
r.client.LTrim(r.context, r.bucket, r.bufferSize, -1).Val()
atomic.CompareAndSwapInt64(&r.size, r.size, 0)
}
2 changes: 2 additions & 0 deletions src/buffer/cxredis/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type redisBuffer struct {
context context.Context
bucket string
bufferSize int64
size int64
}

func NewBuffer(ctx context.Context, rdb *redis.Client, bucket string, bufferSize uint) (cx.Buffer, error) {
Expand All @@ -28,6 +29,7 @@ func NewBuffer(ctx context.Context, rdb *redis.Client, bucket string, bufferSize
context: ctx,
bucket: key(bucket),
bufferSize: int64(bufferSize),
size: rdb.LLen(ctx, bucket).Val(),
}, nil
}

Expand Down

0 comments on commit c6f02cd

Please sign in to comment.