From c6f02cd306e42dfeee0c1947dec02b789fc32552 Mon Sep 17 00:00:00 2001 From: Andrey Kapitonov Date: Mon, 20 Jun 2022 16:20:06 +0300 Subject: [PATCH] Improve redis buffer (#31) --- bench/insert_redis_test.go | 116 ++++--------------------------- src/buffer/cxredis/buffer.go | 22 +++--- src/buffer/cxredis/connection.go | 2 + 3 files changed, 30 insertions(+), 110 deletions(-) diff --git a/bench/insert_redis_test.go b/bench/insert_redis_test.go index d47fc18..a317e51 100644 --- a/bench/insert_redis_test.go +++ b/bench/insert_redis_test.go @@ -14,16 +14,14 @@ import ( "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) -// x10 +// x100 // goos: linux // goarch: amd64 // pkg: github.com/zikwall/clickhouse-buffer/v3/bench // cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz -// BenchmarkInsertRedisObjects/10000-12 10 2165062600 ns/op 8949147 B/op 215127 allocs/op -// BenchmarkInsertRedisObjects/1000-12 10 205537440 ns/op 960664 B/op 23209 allocs/op -// BenchmarkInsertRedisObjects/100-12 10 20371570 ns/op 96292 B/op 2323 allocs/op -// BenchmarkInsertRedisObjects/10-12 10 2216160 ns/op 1868 B/op 32 allocs/op -// BenchmarkInsertRedisObjects/1-12 10 180490 ns/op 188 B/op 3 allocs/op +// BenchmarkInsertRedisObjects/1000-12 100 22404356 ns/op 96095 B/op 2322 allocs/op +// BenchmarkInsertRedisObjects/100-12 100 2243544 ns/op 9673 B/op 233 allocs/op +// BenchmarkInsertRedisObjects/10-12 100 271749 ns/op 1033 B/op 25 allocs/op // PASS // ok // nolint:funlen,dupl // it's not important here @@ -41,29 +39,8 @@ func BenchmarkInsertRedisObjects(b *testing.B) { var writeAPI clickhousebuffer.Writer b.ResetTimer() - b.Run("10000", func(b *testing.B) { - client.Options().SetBatchSize(10000) - rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ - Addr: redisHost, - Password: redisPass, - DB: 11, - }), "bucket", client.Options().BatchSize()) - if err != nil { - log.Panicln(err) - } - writeAPI = client.Writer( - cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), - rxbuffer, - ) - b.ResetTimer() - - for i := 0; i < b.N; i++ { - insertObjects(writeAPI, 10000, b) - } - }) - b.Run("1000", func(b *testing.B) { - client.Options().SetBatchSize(1000) + client.Options().SetBatchSize(1001) rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ Addr: redisHost, Password: redisPass, @@ -83,7 +60,7 @@ func BenchmarkInsertRedisObjects(b *testing.B) { }) b.Run("100", func(b *testing.B) { - client.Options().SetBatchSize(100) + client.Options().SetBatchSize(101) rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ Addr: redisHost, Password: redisPass, @@ -103,7 +80,7 @@ func BenchmarkInsertRedisObjects(b *testing.B) { }) b.Run("10", func(b *testing.B) { - client.Options().SetBatchSize(10) + client.Options().SetBatchSize(11) rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ Addr: redisHost, Password: redisPass, @@ -122,41 +99,19 @@ func BenchmarkInsertRedisObjects(b *testing.B) { } }) - b.Run("1", func(b *testing.B) { - client.Options().SetBatchSize(1) - rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ - Addr: redisHost, - Password: redisPass, - }), "bucket", client.Options().BatchSize()) - if err != nil { - log.Panicln(err) - } - writeAPI = client.Writer( - cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), - rxbuffer, - ) - b.ResetTimer() - - for i := 0; i < b.N; i++ { - insertObjects(writeAPI, 1, b) - } - }) - b.StopTimer() writeAPI.Close() client.Close() } -// x10 +// x100 // goos: linux // goarch: amd64 // pkg: github.com/zikwall/clickhouse-buffer/v3/bench // cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz -// BenchmarkInsertRedisVectors/10000-12 10 2035777840 ns/op 9124870 B/op 223549 allocs/op -// BenchmarkInsertRedisVectors/1000-12 10 208882330 ns/op 926325 B/op 22709 allocs/op -// BenchmarkInsertRedisVectors/100-12 10 19944150 ns/op 92810 B/op 2273 allocs/op -// BenchmarkInsertRedisVectors/10-12 10 1891890 ns/op 1588 B/op 28 allocs/op -// BenchmarkInsertRedisVectors/1-12 10 182180 ns/op 160 B/op 2 allocs/op +// BenchmarkInsertRedisVectors/1000-12 100 22145258 ns/op 92766 B/op 2274 allocs/op +// BenchmarkInsertRedisVectors/100-12 100 2320692 ns/op 9339 B/op 229 allocs/op +// BenchmarkInsertRedisVectors/10-12 100 202146 ns/op 157 B/op 2 allocs/op // PASS // ok // nolint:funlen,dupl // it's not important here @@ -174,31 +129,9 @@ func BenchmarkInsertRedisVectors(b *testing.B) { var writeAPI clickhousebuffer.Writer b.ResetTimer() - b.Run("10000", func(b *testing.B) { - b.StopTimer() - client.Options().SetBatchSize(10000) - rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ - Addr: redisHost, - Password: redisPass, - DB: 11, - }), "bucket", client.Options().BatchSize()) - if err != nil { - log.Panicln(err) - } - writeAPI = client.Writer( - cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), - rxbuffer, - ) - b.StartTimer() - - for i := 0; i < b.N; i++ { - insertVectors(writeAPI, 10000, b) - } - }) - b.Run("1000", func(b *testing.B) { b.StopTimer() - client.Options().SetBatchSize(1000) + client.Options().SetBatchSize(1001) rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ Addr: redisHost, Password: redisPass, @@ -219,7 +152,7 @@ func BenchmarkInsertRedisVectors(b *testing.B) { b.Run("100", func(b *testing.B) { b.StopTimer() - client.Options().SetBatchSize(100) + client.Options().SetBatchSize(101) rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ Addr: redisHost, Password: redisPass, @@ -240,7 +173,7 @@ func BenchmarkInsertRedisVectors(b *testing.B) { b.Run("10", func(b *testing.B) { b.StopTimer() - client.Options().SetBatchSize(10) + client.Options().SetBatchSize(11) rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ Addr: redisHost, Password: redisPass, @@ -259,27 +192,6 @@ func BenchmarkInsertRedisVectors(b *testing.B) { } }) - b.Run("1", func(b *testing.B) { - b.StopTimer() - client.Options().SetBatchSize(1) - rxbuffer, err := cxredis.NewBuffer(ctx, redis.NewClient(&redis.Options{ - Addr: redisHost, - Password: redisPass, - }), "bucket", client.Options().BatchSize()) - if err != nil { - log.Panicln(err) - } - writeAPI = client.Writer( - cx.NewView(tables.ExampleTableName(), tables.ExampleTableColumns()), - rxbuffer, - ) - b.StartTimer() - - for i := 0; i < b.N; i++ { - insertVectors(writeAPI, 1, b) - } - }) - b.StopTimer() writeAPI.Close() client.Close() diff --git a/src/buffer/cxredis/buffer.go b/src/buffer/cxredis/buffer.go index 31667f5..43d5eef 100644 --- a/src/buffer/cxredis/buffer.go +++ b/src/buffer/cxredis/buffer.go @@ -2,24 +2,29 @@ package cxredis import ( "log" + "sync/atomic" "github.com/zikwall/clickhouse-buffer/v3/src/cx" ) func (r *redisBuffer) Write(row cx.Vector) { - buf, err := row.Encode() - if err == nil { - err = r.client.RPush(r.context, r.bucket, buf).Err() - if err != nil && !r.isContextClosedErr(err) { + var err error + var buf []byte + if buf, err = row.Encode(); err != nil { + log.Printf("redis buffer value encode err: %v\n", err.Error()) + return + } + if err = r.client.RPush(r.context, r.bucket, buf).Err(); err != nil { + if !r.isContextClosedErr(err) { log.Printf("redis buffer write err: %v\n", err.Error()) } - } else { - log.Printf("redis buffer value encode err: %v\n", err.Error()) + return } + atomic.AddInt64(&r.size, 1) } func (r *redisBuffer) Read() []cx.Vector { - values := r.client.LRange(r.context, r.bucket, 0, r.bufferSize).Val() + values := r.client.LRange(r.context, r.bucket, 0, atomic.LoadInt64(&r.size)).Val() slices := make([]cx.Vector, 0, len(values)) for _, value := range values { if v, err := cx.VectorDecoded(value).Decode(); err == nil { @@ -32,9 +37,10 @@ func (r *redisBuffer) Read() []cx.Vector { } func (r *redisBuffer) Len() int { - return int(r.client.LLen(r.context, r.bucket).Val()) + return int(r.size) } func (r *redisBuffer) Flush() { r.client.LTrim(r.context, r.bucket, r.bufferSize, -1).Val() + atomic.CompareAndSwapInt64(&r.size, r.size, 0) } diff --git a/src/buffer/cxredis/connection.go b/src/buffer/cxredis/connection.go index ebd5c0e..025b8b2 100644 --- a/src/buffer/cxredis/connection.go +++ b/src/buffer/cxredis/connection.go @@ -20,6 +20,7 @@ type redisBuffer struct { context context.Context bucket string bufferSize int64 + size int64 } func NewBuffer(ctx context.Context, rdb *redis.Client, bucket string, bufferSize uint) (cx.Buffer, error) { @@ -28,6 +29,7 @@ func NewBuffer(ctx context.Context, rdb *redis.Client, bucket string, bufferSize context: ctx, bucket: key(bucket), bufferSize: int64(bufferSize), + size: rdb.LLen(ctx, bucket).Val(), }, nil }