-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathloader_etcd_integration_test.go
327 lines (276 loc) · 9.38 KB
/
loader_etcd_integration_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
//go:build integration
// Copyright The ActForGood Authors.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://github.com/actforgood/xconf/blob/main/LICENSE.
package xconf_test
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"reflect"
"strings"
"testing"
"time"
"github.com/actforgood/xconf"
clientv3 "go.etcd.io/etcd/client/v3"
)
// Note: data from this test file can be generated with ./scripts/etcd_data_provider.sh
func TestEtcdLoader_withJSON_integration(t *testing.T) {
const key = "json-key"
const format = xconf.RemoteValueJSON
t.Run("single key", testEtcdLoaderIntegration(format, key, false))
t.Run("prefix key", testEtcdLoaderIntegration(format, key, true))
}
func TestEtcdLoader_withYAML_integration(t *testing.T) {
const key = "yaml-key"
const format = xconf.RemoteValueYAML
t.Run("single key", testEtcdLoaderIntegration(format, key, false))
t.Run("prefix key", testEtcdLoaderIntegration(format, key, true))
}
func TestEtcdLoader_withPlain_integration(t *testing.T) {
const key = "plain-key"
const format = xconf.RemoteValuePlain
t.Run("single key", testEtcdLoaderIntegration(format, key, false))
t.Run("prefix key", testEtcdLoaderIntegration(format, key, true))
}
func TestEtcdLoader_withWatcher_success(t *testing.T) {
// arrange
ctx, cancelCtx := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelCtx()
// setup an aux client that creates/updates/deletes 3 keys we will play with.
setUpClient, err := clientv3.New(clientv3.Config{
Endpoints: getDefaultEtcdEndpoints(),
DialTimeout: 10 * time.Second,
})
if err != nil {
t.Fatal("prerequisites failed: could not setup etcd client:", err)
}
defer setUpClient.Close()
if _, err := setUpClient.Put(ctx, "ETCD_TEST_INTEGRATION_WATCH_FOO", "foo"); err != nil {
t.Fatal("prerequisites failed: could not create 'foo' key:", err)
}
if _, err := setUpClient.Put(ctx, "ETCD_TEST_INTEGRATION_WATCH_BAR", "bar"); err != nil {
t.Fatal("prerequisites failed: could not create 'bar' key:", err)
}
defer func() { // remove the keys we played with.
_, _ = setUpClient.Delete(ctx, "ETCD_TEST_INTEGRATION_WATCH_FOO")
_, _ = setUpClient.Delete(ctx, "ETCD_TEST_INTEGRATION_WATCH_BAR")
_, _ = setUpClient.Delete(ctx, "ETCD_TEST_INTEGRATION_WATCH_BAZ")
}()
opts := []xconf.EtcdLoaderOption{
xconf.EtcdLoaderWithValueFormat(xconf.RemoteValuePlain),
xconf.EtcdLoaderWithContext(ctx),
xconf.EtcdLoaderWithWatcher(),
xconf.EtcdLoaderWithPrefix(),
}
subject := xconf.NewEtcdLoader("ETCD_TEST_INTEGRATION_WATCH_", opts...)
defer subject.Close() // nicely close resources
// act
config, err := subject.Load()
assertNil(t, err)
assertEqual(
t,
map[string]any{
"ETCD_TEST_INTEGRATION_WATCH_FOO": "foo",
"ETCD_TEST_INTEGRATION_WATCH_BAR": "bar",
},
config,
)
// update foo, delete bar, create baz
if _, err := setUpClient.Put(ctx, "ETCD_TEST_INTEGRATION_WATCH_FOO", "foo - updated"); err != nil {
t.Fatal("prerequisites failed: could not update 'foo' key:", err)
}
if _, err := setUpClient.Delete(ctx, "ETCD_TEST_INTEGRATION_WATCH_BAR"); err != nil {
t.Fatal("prerequisites failed: could not delete 'bar' key:", err)
}
if _, err := setUpClient.Put(ctx, "ETCD_TEST_INTEGRATION_WATCH_BAZ", "baz"); err != nil {
t.Fatal("prerequisites failed: could not create 'baz' key:", err)
}
expectedNewConfig := map[string]any{
"ETCD_TEST_INTEGRATION_WATCH_FOO": "foo - updated",
"ETCD_TEST_INTEGRATION_WATCH_BAZ": "baz", // new key
}
// give watcher some time to act
maxTry := 4
sleep := time.Second
for i := 0; i < maxTry; i++ {
time.Sleep(sleep)
// act again - see new configuration
config, err = subject.Load()
if reflect.DeepEqual(expectedNewConfig, config) || i == maxTry {
assertNil(t, err)
assertEqual(t, expectedNewConfig, config)
break
}
sleep *= 2
}
}
func TestEtcdLoader_withWatcher_error(t *testing.T) {
// arrange
ctx, cancelCtx := context.WithTimeout(context.Background(), time.Minute)
defer cancelCtx()
// setup an aux client that creates/updates the key we will play with.
setUpClient, err := clientv3.New(clientv3.Config{
Endpoints: getDefaultEtcdEndpoints(),
DialTimeout: 10 * time.Second,
})
if err != nil {
t.Fatal("prerequisites failed: could not setup etcd client:", err)
}
defer setUpClient.Close()
if _, err := setUpClient.Put(ctx, "ETCD_TEST_INTEGRATION_WATCH_FOO_JSON", `{"etcd_foo":"bar"}`); err != nil {
t.Fatal("prerequisites failed: could not create 'foo' json key:", err)
}
defer func() {
_, _ = setUpClient.Delete(ctx, "ETCD_TEST_INTEGRATION_WATCH_FOO_JSON") // rm the key we played with.
}()
opts := []xconf.EtcdLoaderOption{
xconf.EtcdLoaderWithValueFormat(xconf.RemoteValueJSON),
xconf.EtcdLoaderWithContext(ctx),
xconf.EtcdLoaderWithWatcher(),
}
subject := xconf.NewEtcdLoader("ETCD_TEST_INTEGRATION_WATCH_FOO_JSON", opts...)
defer subject.Close() // nicely close resources
// act
config, err := subject.Load()
assertNil(t, err)
assertEqual(t, map[string]any{"etcd_foo": "bar"}, config)
// we update foo, with corrupted json
if _, err := setUpClient.Put(ctx, "ETCD_TEST_INTEGRATION_WATCH_FOO_JSON", "{corrupted json"); err != nil {
t.Fatal("prerequisites failed: could not update 'foo' json key:", err)
}
// give watcher some time to act
maxTry := 4
sleep := time.Second
for i := 0; i < maxTry; i++ {
time.Sleep(sleep)
// act again - see error is returned
config, err = subject.Load()
if err != nil || i == maxTry {
// old "version" is returned, but also error.
assertEqual(t, map[string]any{"etcd_foo": "bar"}, config)
var jsonErr *json.SyntaxError
assertTrue(t, errors.As(err, &jsonErr))
break
}
sleep *= 2
}
// we update foo, fixing the json
if _, err := setUpClient.Put(ctx, "ETCD_TEST_INTEGRATION_WATCH_FOO_JSON", `{"etcd_foo":"baz"}`); err != nil {
t.Fatal("prerequisites failed: could not update 'foo' json key:", err)
}
// give watcher some time to act
sleep = time.Second
for i := 0; i < maxTry; i++ {
time.Sleep(sleep)
// act again - see everything went back to normal
config, err = subject.Load()
if err == nil || i == maxTry {
assertNil(t, err)
assertEqual(t, map[string]any{"etcd_foo": "baz"}, config)
break
}
sleep *= 2
}
}
func testEtcdLoaderIntegration(format, key string, withPrefix bool) func(t *testing.T) {
return func(t *testing.T) {
// arrange
ctx, cancelCtx := context.WithTimeout(context.Background(), 15*time.Second)
defer cancelCtx()
opts := []xconf.EtcdLoaderOption{
xconf.EtcdLoaderWithValueFormat(format),
xconf.EtcdLoaderWithContext(ctx),
}
if withPrefix {
opts = append(opts, xconf.EtcdLoaderWithPrefix())
}
subject := xconf.NewEtcdLoader(key, opts...)
// act
config, err := subject.Load()
// assert
assertNil(t, err)
assertEqual(t, getEtcdExpectedConfigMapIntegration(format, withPrefix), config)
}
}
func TestEtcdLoader_withTLS_integration(t *testing.T) {
// arrange
endpoints, tlsConfig, err := getEtcdTLSInfo()
if err != nil {
t.Fatal(err)
}
const key = "plain-key"
const format = xconf.RemoteValuePlain
ctx, cancelCtx := context.WithTimeout(context.Background(), 15*time.Second)
defer cancelCtx()
opts := []xconf.EtcdLoaderOption{
xconf.EtcdLoaderWithValueFormat(format),
xconf.EtcdLoaderWithContext(ctx),
xconf.EtcdLoaderWithPrefix(),
xconf.EtcdLoaderWithEndpoints(endpoints),
xconf.EtcdLoaderWithTLS(tlsConfig),
}
subject := xconf.NewEtcdLoader(key, opts...)
// act
config, err := subject.Load()
// assert
assertNil(t, err)
assertEqual(t, getEtcdExpectedConfigMapIntegration(format, true), config)
}
// getEtcdExpectedConfigMapIntegration returns expected config maps for integration tests.
func getEtcdExpectedConfigMapIntegration(format string, withPrefix bool) map[string]any {
return getConsulExpectedConfigMapIntegration(format, withPrefix) // same as consul...
}
// getDefaultEtcdEndpoints tries to get etcd endpoints from ENV.
// It defaults on localhost address.
func getDefaultEtcdEndpoints() (endpoints []string) {
// try to get from env variables
if eps := os.Getenv("ETCD_ENDPOINTS"); eps != "" {
endpoints = strings.Split(eps, ",")
} else {
endpoints = []string{"127.0.0.1:2379"}
}
return
}
// getEtcdTLSInfo returns etcd endpoint and config.
func getEtcdTLSInfo() ([]string, *tls.Config, error) {
var (
tlsCfg = new(tls.Config)
endpoints []string
caCertFilePath string
)
const defaultCaCertFilePath = "scripts/tls/certs/ca_cert.pem"
if eps := os.Getenv("ETCDS_ENDPOINTS"); eps != "" {
endpoints = strings.Split(eps, ",")
hostname, _, err := net.SplitHostPort(endpoints[0])
if err != nil {
return nil, nil, fmt.Errorf("could not parse address %q: %w", endpoints[0], err)
}
tlsCfg.ServerName = hostname
caCertFilePath = fmt.Sprintf(
"%s%c%s",
os.Getenv("GITHUB_WORKSPACE"), os.PathSeparator, defaultCaCertFilePath,
)
} else {
endpoints = []string{"localhost:2389"}
tlsCfg.ServerName = "localhost"
caCertFilePath = defaultCaCertFilePath
}
certContent, err := os.ReadFile(caCertFilePath)
if err != nil {
return nil, nil, fmt.Errorf("could not read CA cert: %w", err)
}
certPool := x509.NewCertPool()
if ok := certPool.AppendCertsFromPEM(certContent); !ok {
return nil, nil, fmt.Errorf("could not parse PEM CA certificate %q", caCertFilePath)
}
tlsCfg.MinVersion = tls.VersionTLS12
tlsCfg.RootCAs = certPool
return endpoints, tlsCfg, nil
}