-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathbase.go
135 lines (102 loc) · 2.29 KB
/
base.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package ograph
import (
"context"
"sync"
"github.com/symphony09/ograph/ogcore"
)
// BaseNode is a minimal named node that can be embedded in other node types.
// Its Run method delegates to Action when one is set.
type BaseNode struct {
	// name is read and written through Name and SetName.
	name string

	// Action is the callback invoked by Run; it may be left nil.
	Action ogcore.Action
}
// Run invokes the node's Action with ctx and state and returns its result.
// A node without an Action does nothing and reports no error.
func (node BaseNode) Run(ctx context.Context, state ogcore.State) error {
	if node.Action == nil {
		return nil
	}

	return node.Action(ctx, state)
}
// Name returns the name currently stored on the node.
func (node *BaseNode) Name() string {
	return node.name
}
// SetName replaces the name stored on the node with name.
func (node *BaseNode) SetName(name string) {
	node.name = name
}
// FuncNode is a node whose behaviour is supplied as a plain function.
// It embeds BaseNode for naming; its own Run method uses RunFunc rather
// than the embedded Action.
type FuncNode struct {
	BaseNode

	// RunFunc is the function invoked by Run; it may be left nil.
	RunFunc func(ctx context.Context, state ogcore.State) error
}
// Run calls RunFunc with ctx and state and returns whatever it returns.
// When RunFunc is nil, Run is a no-op that returns nil.
func (node *FuncNode) Run(ctx context.Context, state ogcore.State) error {
	fn := node.RunFunc
	if fn == nil {
		return nil
	}

	return fn(ctx, state)
}
// NewFuncNode returns a new FuncNode whose RunFunc is runFunc.
// All other fields are left at their zero values.
func NewFuncNode(runFunc func(ctx context.Context, state ogcore.State) error) *FuncNode {
	node := new(FuncNode)
	node.RunFunc = runFunc
	return node
}
// BaseCluster is a node that holds a group of member nodes.
// It embeds BaseNode for naming.
type BaseCluster struct {
	BaseNode

	// Group holds the member nodes in the order they were added by Join.
	Group []ogcore.Node

	// NodeMap indexes, by name, the members that implement ogcore.Nameable.
	// Join allocates it on first use.
	NodeMap map[string]ogcore.Node
}
// Join appends nodes to the cluster's Group and records every node that
// implements ogcore.Nameable in NodeMap under its name. NodeMap is created
// if it does not exist yet. A later node with the same name replaces the
// earlier NodeMap entry; Group keeps both.
func (cluster *BaseCluster) Join(nodes []ogcore.Node) {
	cluster.Group = append(cluster.Group, nodes...)

	if cluster.NodeMap == nil {
		cluster.NodeMap = make(map[string]ogcore.Node)
	}

	for i := range nodes {
		member := nodes[i]

		named, isNamed := member.(ogcore.Nameable)
		if !isNamed {
			// Nodes without a name are only reachable through Group.
			continue
		}

		cluster.NodeMap[named.Name()] = member
	}
}
// Run executes the members of Group one after another, in slice order,
// passing each the same ctx and state. It stops at the first member that
// returns a non-nil error and returns that error; otherwise it returns nil.
func (cluster BaseCluster) Run(ctx context.Context, state ogcore.State) error {
	members := cluster.Group

	for i := range members {
		err := members[i].Run(ctx, state)
		if err != nil {
			return err
		}
	}

	return nil
}
// BaseWrapper is a node that holds another node and forwards Run to it.
// It embeds BaseNode for naming and an ogcore.Node for the wrapped node.
type BaseWrapper struct {
	BaseNode

	// Node is the wrapped node; it is set by Wrap.
	ogcore.Node
}
// Wrap stores node as the wrapped node, replacing any node set previously.
func (wrapper *BaseWrapper) Wrap(node ogcore.Node) {
	wrapper.Node = node
}
// Run forwards ctx and state to the wrapped node's Run method and returns
// its result.
//
// If no node has been wrapped yet (Node is nil), Run does nothing and
// returns nil instead of panicking on a nil interface call. This matches
// BaseNode.Run and FuncNode.Run, which are no-ops when their callback is
// unset.
func (wrapper BaseWrapper) Run(ctx context.Context, state ogcore.State) error {
	if wrapper.Node == nil {
		return nil
	}

	return wrapper.Node.Run(ctx, state)
}
// BaseState is a key/value store whose accessors are guarded by an embedded
// sync.RWMutex. The zero value is ready to use: the backing map is allocated
// on the first write. A BaseState must not be copied after first use because
// it contains a mutex.
type BaseState struct {
	// store holds the entries; it may be nil until the first Set or Update.
	store map[any]any

	sync.RWMutex
}

// Get returns the value stored under key and true, or nil and false when
// the key is absent. It takes the read lock for the duration of the lookup.
func (state *BaseState) Get(key any) (any, bool) {
	state.RLock()
	defer state.RUnlock()

	// Reading from a nil map is safe and reports the key as missing.
	val, ok := state.store[key]
	return val, ok
}

// Set stores val under key, replacing any previous value. It takes the
// write lock for the duration of the assignment.
func (state *BaseState) Set(key any, val any) {
	state.Lock()
	defer state.Unlock()

	// Writing to a nil map panics, so allocate it for zero-value states.
	if state.store == nil {
		state.store = make(map[any]any)
	}

	state.store[key] = val
}

// Update replaces the value stored under key with updateFunc(old), where old
// is the current value or nil when the key is absent. The read-modify-write
// happens under the write lock, so updateFunc must not call methods of the
// same state or it will deadlock.
func (state *BaseState) Update(key any, updateFunc func(val any) any) {
	state.Lock()
	defer state.Unlock()

	// Writing to a nil map panics, so allocate it for zero-value states.
	if state.store == nil {
		state.store = make(map[any]any)
	}

	state.store[key] = updateFunc(state.store[key])
}

// NewState returns an empty BaseState with its backing map already allocated.
func NewState() *BaseState {
	state := new(BaseState)
	state.store = make(map[any]any)
	return state
}