-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.go
96 lines (81 loc) · 2.38 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// -----------------------------------------------------------------------------
// worker for running tasks enqueued in background
//
// Copyright (C) 2024 Frank Mueller / Oldenburg / Germany / World
// -----------------------------------------------------------------------------
package worker // import "tideland.dev/go/worker"
import (
"context"
"time"
)
// Worker enqueues tasks and processes them in the background in the order
// they were enqueued. Stopping the worker with the corresponding command
// ensures that all tasks enqueued before it are processed first.
type Worker struct {
	// rate and burst are handed to the rated buffer setup in New. A zero
	// value is replaced by the package default there.
	rate, burst int

	// timeout is handed to the rated buffer setup too and additionally limits
	// how long New waits for the processing goroutine to signal its start.
	// A zero value is replaced by the package default in New.
	timeout time.Duration

	// in is called by enqueue to pass a task into the rated buffer.
	in input

	// out is called by the processor to obtain the queued tasks it ranges over.
	out output
}
// New creates a Worker and starts its processing goroutine. The given options
// are applied in order; the first option returning an error aborts the
// creation and that error is returned. Rate, burst, and timeout still being
// zero after all options are applied are replaced by the package defaults.
//
// The context is passed on to the setup of the rated buffer. New waits up to
// the configured timeout for the goroutine to signal its start and returns a
// NotStartedError if that signal does not arrive in time.
func New(ctx context.Context, options ...Option) (*Worker, error) {
	w := &Worker{}
	for _, apply := range options {
		if err := apply(w); err != nil {
			return nil, err
		}
	}

	// A zero value marks a field as not configured, so use the default for it.
	if w.rate == 0 {
		w.rate = defaultRate
	}
	if w.burst == 0 {
		w.burst = defaultBurst
	}
	if w.timeout == 0 {
		w.timeout = defaultTimeout
	}

	// The rated buffer provides both the input and the output side of the queue.
	w.in, w.out = setupRatedBuffer(ctx, w.rate, w.burst, w.timeout)

	// The processor closes the ready channel as its first step, so receiving
	// from it tells that the goroutine is running.
	ready := make(chan struct{})
	go w.processor(ready)

	select {
	case <-time.After(w.timeout):
		return nil, NotStartedError{}
	case <-ready:
		return w, nil
	}
}
// enqueue hands the task over to the worker's input function and returns
// whatever error that function reports.
func (w *Worker) enqueue(task actionTask) error {
	err := w.in(task)
	return err
}
// processor is the body of the worker goroutine. It first closes the started
// channel to signal that it is running and then handles the queued entries
// delivered by w.out() one after another. It returns when a shutdown action
// is received or when there are no more entries to range over.
func (w *Worker) processor(started chan struct{}) {
	close(started)

	for next := range w.out() {
		action, task := next()

		if action == actionProcess {
			// A failing task must not stop the worker, so its error is
			// intentionally dropped and the next task is handled.
			// TODO: log the error.
			_ = task()
			continue
		}
		if action == actionShutdown {
			// Shutdown the worker.
			// TODO: log the shutdown.
			return
		}
		// Any other action is skipped.
	}
}
// -----------------------------------------------------------------------------
// end of file
// -----------------------------------------------------------------------------