-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
17 changed files
with
1,363 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,111 @@ | ||
package cache_lib_go | ||
package cache_lib | ||
|
||
import "log" | ||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"log" | ||
"time" | ||
|
||
func Cache() error { | ||
log.Print("Does cache, placeholder for real code") | ||
"github.com/go-redis/redis/v8" | ||
) | ||
|
||
return nil | ||
type Cache[Data any] interface { | ||
RememberBlocking(ctx context.Context, fn LongFunc[Data], key string, ttl time.Duration) (*Data, error) | ||
} | ||
|
||
type cache[Data any] struct { | ||
client *redis.Client | ||
} | ||
|
||
type LongFunc[Data any] func(ctx context.Context) (*Data, error) | ||
|
||
func NewCache[Data any](client *redis.Client) Cache[Data] { | ||
return &cache[Data]{ | ||
client: client, | ||
} | ||
} | ||
|
||
func (c *cache[Data]) getCachedData(ctx context.Context, key string) *Data { | ||
cachedData, _ := c.client.Get(ctx, key).Result() | ||
|
||
if cachedData == "" { | ||
return nil | ||
} | ||
|
||
var marshaledData Data | ||
err := json.Unmarshal([]byte(cachedData), &marshaledData) | ||
if err != nil { | ||
return nil | ||
} | ||
|
||
return &marshaledData | ||
} | ||
|
||
func (c *cache[Data]) RememberBlocking(ctx context.Context, fn LongFunc[Data], key string, ttl time.Duration) (*Data, error) { | ||
cachedData := c.getCachedData(ctx, key) | ||
if cachedData != nil { | ||
return cachedData, nil | ||
} | ||
success, err := c.client.SetNX(ctx, key, "", ttl).Result() | ||
if err != nil { | ||
log.Println(err) | ||
|
||
return nil, err | ||
} | ||
if !success { | ||
return c.rememberWait(ctx, key) | ||
} | ||
data, err := fn(ctx) | ||
if err != nil { | ||
c.client.Publish(ctx, key, "cache miss") | ||
|
||
return nil, err | ||
} | ||
bytedata, err := json.Marshal(*data) | ||
if err != nil { | ||
return nil, err | ||
} | ||
_, err = c.client.Set(ctx, key, string(bytedata), ttl).Result() | ||
if err != nil { | ||
log.Println(err) | ||
|
||
return nil, err | ||
} | ||
_, err = c.client.Publish(ctx, key, string(bytedata)).Result() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return data, nil | ||
} | ||
|
||
func (c *cache[Data]) rememberWait(ctx context.Context, key string) (*Data, error) { | ||
subscription := NewCacheSubscription(c.client, key) | ||
subscription.Subscribe(ctx) | ||
defer func() { | ||
err := subscription.Unsubscribe(ctx) | ||
if err != nil { | ||
log.Println(err) | ||
} | ||
}() | ||
|
||
channel, err := subscription.GetChannel(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
for msg := range channel { | ||
if msg.Payload != "" { | ||
var u Data | ||
// Unmarshal the data into the user | ||
if err := json.Unmarshal([]byte(msg.Payload), &u); err != nil { | ||
return nil, err | ||
} | ||
|
||
return &u, nil | ||
} | ||
} | ||
|
||
return nil, errors.New("error reading from pub/sub") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
package cache_lib | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
|
||
"github.com/go-redis/redis/v8" | ||
) | ||
|
||
type CacheSubscription interface { | ||
Unsubscribe(ctx context.Context) error | ||
Subscribe(ctx context.Context) CacheSubscription | ||
GetChannel(ctx context.Context) (<-chan *redis.Message, error) | ||
} | ||
|
||
type cacheSubscription struct { | ||
channel string | ||
client *redis.Client | ||
Subscription *redis.PubSub | ||
} | ||
|
||
func NewCacheSubscription(client *redis.Client, channel string) CacheSubscription { | ||
return &cacheSubscription{ | ||
client: client, | ||
channel: channel, | ||
Subscription: nil, | ||
} | ||
} | ||
|
||
func (cs *cacheSubscription) Unsubscribe(ctx context.Context) error { | ||
if cs.Subscription == nil { | ||
return nil | ||
} | ||
err := cs.Subscription.Unsubscribe(ctx, cs.channel) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
cs.Subscription = nil | ||
|
||
return nil | ||
} | ||
|
||
func (cs *cacheSubscription) Subscribe(ctx context.Context) CacheSubscription { | ||
cs.Subscription = cs.client.Subscribe(ctx, cs.channel) | ||
if cs.Subscription == nil { | ||
panic("no subscription created") | ||
} | ||
|
||
return cs | ||
} | ||
|
||
func (cs *cacheSubscription) GetChannel(ctx context.Context) (<-chan *redis.Message, error) { | ||
if cs.Subscription == nil { | ||
return nil, errors.New("No subscription") | ||
} | ||
|
||
return cs.Subscription.Channel(), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,158 @@ | ||
package cache_lib_go_test | ||
package cache_lib_test | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"github.com/go-redis/redis/v8" | ||
"github.com/go-redis/redismock/v8" | ||
"github.com/stretchr/testify/assert" | ||
|
||
cache_lib_go "github.com/honestbank/cache-lib-go" | ||
cache_lib "github.com/honestbank/cache-lib-go" | ||
) | ||
|
||
func TestCache(t *testing.T) { | ||
type Response struct { | ||
Result bool `json:"result"` | ||
} | ||
|
||
func TestNewCache(t *testing.T) { | ||
a := assert.New(t) | ||
err := cache_lib_go.Cache() | ||
|
||
a.NoError(err) | ||
db, _ := redismock.NewClientMock() | ||
cache := cache_lib.NewCache[Response](db) | ||
|
||
a.NotNil(cache) | ||
} | ||
|
||
func TestCache_RememberBlocking(t *testing.T) { | ||
db, mock := redismock.NewClientMock() | ||
cache := cache_lib.NewCache[Response](db) | ||
t.Run("single cache", func(t *testing.T) { | ||
a := assert.New(t) | ||
|
||
response := Response{Result: true} | ||
responseString, _ := json.Marshal(response) | ||
|
||
mock.ExpectGet("data").SetVal("") | ||
mock.ExpectSetNX("data", "", 1*time.Second).SetVal(true) | ||
mock.ExpectSet("data", string(responseString), 1*time.Second).SetVal(string(responseString)) | ||
mock.ExpectPublish("data", string(responseString)).SetVal(1) | ||
|
||
result, err := cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) { | ||
time.Sleep(2 * time.Second) | ||
|
||
return &response, nil | ||
}, "data", 1*time.Second) | ||
|
||
a.NoError(err) | ||
a.Equal(response, *result) | ||
}) | ||
t.Run("single cache - invalid cached", func(t *testing.T) { | ||
a := assert.New(t) | ||
|
||
response := Response{Result: true} | ||
responseString, _ := json.Marshal(response) | ||
|
||
mock.ExpectGet("data").SetVal("not valid json") | ||
mock.ExpectSetNX("data", "", 1*time.Second).SetVal(true) | ||
mock.ExpectSet("data", string(responseString), 1*time.Second).SetVal(string(responseString)) | ||
mock.ExpectPublish("data", string(responseString)).SetVal(1) | ||
|
||
result, err := cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) { | ||
time.Sleep(2 * time.Second) | ||
|
||
return &response, nil | ||
}, "data", 1*time.Second) | ||
|
||
a.NoError(err) | ||
a.Equal(response, *result) | ||
}) | ||
t.Run("single cache - setNX error", func(t *testing.T) { | ||
a := assert.New(t) | ||
|
||
response := Response{Result: true} | ||
|
||
mock.ExpectSetNX("data", "", 1*time.Second).SetErr(errors.New("Unable to set")) | ||
|
||
result, err := cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) { | ||
time.Sleep(2 * time.Second) | ||
|
||
return &response, nil | ||
}, "data", 1*time.Second) | ||
|
||
a.Error(err) | ||
a.Nil(result) | ||
}) | ||
|
||
t.Run("single cache - request error", func(t *testing.T) { | ||
a := assert.New(t) | ||
|
||
mock.ExpectSetNX("data", "", 1*time.Second).SetVal(true) | ||
|
||
result, err := cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) { | ||
time.Sleep(2 * time.Second) | ||
|
||
return nil, errors.New("request Failed") | ||
}, "data", 1*time.Second) | ||
|
||
a.Error(err) | ||
a.Nil(result) | ||
}) | ||
|
||
t.Run("single cache", func(t *testing.T) { | ||
a := assert.New(t) | ||
|
||
response := Response{Result: true} | ||
responseString, _ := json.Marshal(response) | ||
|
||
mock.ExpectGet("data").SetVal("{}") | ||
mock.ExpectSetNX("data", "", 1*time.Second).SetVal(true) | ||
mock.ExpectSetNX("data", string(responseString), 1*time.Second).SetVal(true) | ||
|
||
mock.ExpectPublish("data", string(responseString)).SetVal(1) | ||
|
||
result, err := cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) { | ||
time.Sleep(2 * time.Second) | ||
|
||
return &response, nil | ||
}, "data", 1*time.Second) | ||
|
||
a.NoError(err) | ||
a.Equal(Response{}, *result) | ||
}) | ||
} | ||
|
||
func TestNewCacheSubscription(t *testing.T) { | ||
redisClient := redis.NewClient(&redis.Options{ | ||
Addr: ":6379", // We connect to host redis, thats what the hostname of the redis service is set to in the docker-compose | ||
DB: 0, | ||
}) | ||
|
||
cache := cache_lib.NewCache[Response](redisClient) | ||
|
||
t.Run("Multiple calls", func(t *testing.T) { | ||
a := assert.New(t) | ||
|
||
response := Response{Result: true} | ||
|
||
go func() { | ||
_, _ = cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) { | ||
time.Sleep(2 * time.Second) | ||
|
||
return &response, nil | ||
}, "data", 1*time.Second) | ||
}() | ||
|
||
result, err := cache.RememberBlocking(context.Background(), func(ctx context.Context) (*Response, error) { | ||
time.Sleep(2 * time.Second) | ||
|
||
return &response, nil | ||
}, "data", 1*time.Second) | ||
|
||
a.NoError(err) | ||
a.Equal(response, *result) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
module github.com/pubsub_example | ||
|
||
go 1.18 | ||
|
||
replace github.com/honestbank/cache-lib-go => ./../../ | ||
|
||
require ( | ||
github.com/go-redis/redis/v8 v8.11.5 | ||
github.com/go-resty/resty/v2 v2.7.0 | ||
github.com/gofiber/fiber/v2 v2.41.0 | ||
github.com/honestbank/cache-lib-go v0.0.0-00010101000000-000000000000 | ||
) | ||
|
||
require ( | ||
github.com/andybalholm/brotli v1.0.4 // indirect | ||
github.com/cespare/xxhash/v2 v2.1.2 // indirect | ||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect | ||
github.com/klauspost/compress v1.15.9 // indirect | ||
github.com/mattn/go-colorable v0.1.13 // indirect | ||
github.com/mattn/go-isatty v0.0.17 // indirect | ||
github.com/mattn/go-runewidth v0.0.14 // indirect | ||
github.com/rivo/uniseg v0.2.0 // indirect | ||
github.com/valyala/bytebufferpool v1.0.0 // indirect | ||
github.com/valyala/fasthttp v1.43.0 // indirect | ||
github.com/valyala/tcplisten v1.0.0 // indirect | ||
golang.org/x/net v0.4.0 // indirect | ||
golang.org/x/sys v0.3.0 // indirect | ||
) |
Oops, something went wrong.