-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrebouncer.go
98 lines (82 loc) · 2.31 KB
/
rebouncer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package rebouncer
import (
"sync"
)
// DefaultBufferSize is the intended default capacity of the incomingEvents
// channel. NewRebouncer takes the capacity as an explicit bufferSize argument,
// so this constant is the value to pass when no other size is preferred.
const DefaultBufferSize = 1 << 10
// Rebouncer is the interface implemented by rebounceMachine. NICE is the
// caller-chosen event type that flows through it.
//
// Only Subscribe and Interrupt are exported. Because the interface also lists
// unexported methods, it can only be implemented inside this package.
type Rebouncer[NICE any] interface {
	// the channel a consumer can subscribe to
	Subscribe() <-chan NICE
	// flushes the Queue
	emit()
	// gets the Queue, with safety and locking
	readQueue() []NICE
	// sets the Queue, handling safety and locking
	writeQueue([]NICE)
	// NOTE(review): presumably feeds events produced by the Ingester into the
	// machine; the implementation is not in this part of the file, so confirm.
	ingest(Ingester[NICE])
	// decides whether to flush the Queue
	quantize(Quantizer[NICE])
	// removes unwanted NiceEvents from the Queue
	reduce(Reducer[NICE], NICE)
	// call this to initiate the "Draining" state
	Interrupt()
}
// NewRebouncer is the best way to create a new Rebouncer.
//
// It builds the machine, sets its life-cycle state to Running, and starts two
// goroutines: one passes every event received on incomingEvents to
// reduceFunc, the other reacts to signals received on the lifeCycle channel.
// It then sends the first Quantizing signal and calls ingest with ingestFunc
// before returning the machine.
//
// bufferSize sets both the capacity of the buffered incomingEvents channel
// and the initial capacity of the queue. Zero is allowed and yields an
// unbuffered channel. A negative value would make make() panic, so it is
// replaced by DefaultBufferSize.
//
// Neither goroutine is stopped here: each runs until the channel it ranges
// over is closed, and nothing in this function closes them.
func NewRebouncer[NICE any](
	ingestFunc Ingester[NICE],
	reduceFunc Reducer[NICE],
	quantizeFunc Quantizer[NICE],
	bufferSize int, // for sizing the buffered channel that accepts incoming events
) Rebouncer[NICE] {

	// A negative size panics in make(); fall back to the package default.
	if bufferSize < 0 {
		bufferSize = DefaultBufferSize
	}

	// channels and queue
	m := rebounceMachine[NICE]{
		incomingEvents: make(chan NICE, bufferSize),
		outgoingEvents: make(chan NICE),
		lifeCycle:      make(chan lifeCycleState, 1),
		queue:          make([]NICE, 0, bufferSize),
	}
	m.SetLifeCycleState(Running)

	// reduce loop: every incoming event is handed to the reducer
	go func() {
		for niceEvent := range m.incomingEvents {
			m.reduce(reduceFunc, niceEvent)
		}
	}()

	// life-cycle loop: dispatches on each signal sent to m.lifeCycle
	go func() {
		for lifeEvent := range m.lifeCycle {
			switch lifeEvent {
			case Quantizing:
				m.quantize(quantizeFunc)
			case Emiting:
				m.emit()
			case Draining:
				// incomingEvents is intentionally left open; the close is disabled.
				//close(m.incomingEvents)
				m.SetLifeCycleState(Draining)
			case Drained:
				// Closing tells subscribers that no more events will arrive.
				close(m.outgoingEvents)
			}
		}
	}()

	m.lifeCycle <- Quantizing // start quantizer

	m.ingest(ingestFunc)

	return &m
}
// rebounceMachine implements [Rebouncer].
// It holds a sync.RWMutex, so it must be used through a pointer and never
// copied; NewRebouncer returns it as &m.
type rebounceMachine[NICE any] struct {
	// Life-cycle signals. NewRebouncer creates it with a buffer of 1 and
	// starts the goroutine that consumes it; Interrupt sends Draining on it.
	lifeCycle chan lifeCycleState
	// Buffered input channel; each event received is passed to reduce by a
	// goroutine started in NewRebouncer.
	incomingEvents chan NICE
	// Unbuffered output channel returned by Subscribe; closed when the
	// life-cycle loop receives Drained.
	outgoingEvents chan NICE
	// Pending events; created empty with capacity bufferSize in NewRebouncer.
	queue Queue[NICE]
	// Guards lifeState in SetLifeCycleState and GetLifeCycleState.
	// NOTE(review): presumably also guards queue via readQueue/writeQueue,
	// which are not in this part of the file; confirm.
	mu sync.RWMutex
	// Current life-cycle state, written by SetLifeCycleState.
	lifeState lifeCycleState
}
// SetLifeCycleState stores s as the machine's current life-cycle state.
// The write happens while holding the exclusive side of m.mu.
func (m *rebounceMachine[NICE]) SetLifeCycleState(s lifeCycleState) {
	m.mu.Lock()
	m.lifeState = s
	m.mu.Unlock()
}
// GetLifeCycleState returns the machine's current life-cycle state.
//
// The method only reads lifeState, so it takes the shared (read) side of the
// RWMutex. Concurrent readers no longer serialise behind one another, while
// writers in SetLifeCycleState are still excluded.
func (m *rebounceMachine[NICE]) GetLifeCycleState() lifeCycleState {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.lifeState
}
// Interrupt requests the "Draining" state by sending Draining on the
// life-cycle channel. The send blocks until the channel can accept the value.
func (m *rebounceMachine[NICE]) Interrupt() {
	signals := m.lifeCycle
	signals <- Draining
}
// Subscribe returns the machine's outgoing-events channel as a receive-only
// channel. Every call returns the same underlying channel.
func (m *rebounceMachine[NICE]) Subscribe() <-chan NICE {
	var out <-chan NICE = m.outgoingEvents
	return out
}