-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuffer.go
94 lines (89 loc) · 2.43 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// -----------------------------------------------------------------------------
// worker for running tasks enqueued in background (buffer.go)
//
// Copyright (C) 2024 Frank Mueller / Oldenburg / Germany / World
// -----------------------------------------------------------------------------
package worker // import "tideland.dev/go/worker"
import (
"context"
"sync"
"time"
)
type input func(at actionTask) error
type output func() <-chan actionTask
// ratedBuffer is a buffer that limits the number of tasks that can be retrieved from
// in a given time frame.
func setupRatedBuffer(
ctx context.Context,
rate int,
burst int,
timeout time.Duration,
) (input, output) {
mu := sync.Mutex{}
if rate < 0 {
rate = 1
}
beat := time.Duration(int(time.Second) / rate)
if burst <= 0 {
burst = rate
}
buffer := make([]actionTask, 0, burst)
// Create input function. It accepts a task and tries to add it to the buffer as long
// as the burst limit is not reached. Otherwise it blocks until the buffer is ready.
in := func(at actionTask) error {
wait := time.Nanosecond
countdown := timeout
done := false
for {
select {
case <-ctx.Done():
return ShuttingDownError{}
case <-time.After(wait):
mu.Lock()
if len(buffer) < burst {
buffer = append(buffer, at)
done = true
}
mu.Unlock()
if done {
return nil
}
// Increase wait time and decrease countdown.
wait *= 2
countdown -= wait
case <-time.After(countdown):
return TimeoutError{}
}
}
}
// Create output function. It returns a channel that is filled with tasks from the
// buffer. If the buffer is empty it blocks. The delivery is limited to the given rate
// based limit duration. If the context is done the channel is closed. out can be used
// as iterator in a for task := range out() { ... } loop.
out := func() <-chan actionTask {
outc := make(chan actionTask, 1)
go func() {
ticker := time.NewTicker(beat)
defer ticker.Stop()
for range ticker.C {
mu.Lock()
if len(buffer) > 0 {
outc <- buffer[0]
buffer = buffer[1:]
}
mu.Unlock()
// Check if the context is done.
done, ok := <-ctx.Done()
if ok && done == struct{}{} {
close(outc)
return
}
}
}()
return outc
}
return in, out
}
// -----------------------------------------------------------------------------
// end of file
// -----------------------------------------------------------------------------