forked from aler9/rtsp-simple-proxy
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathendpoint-lb.go
159 lines (123 loc) · 3.04 KB
/
endpoint-lb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package main
import (
"encoding/json"
"fmt"
"hash/fnv"
"io/ioutil"
"log"
"net/http"
"net/url"
)
// LoadBalancer is the general load-balancer interface that the specific
// balancing algorithms implement.
type LoadBalancer interface {
	// getMapping takes a string key and returns the resource selected for
	// it, or an error when no selection can be made.
	getMapping(string) (string, error)
	// setResources replaces the list of resources the balancer selects from.
	setResources([]string)
}
// MapToUrl is the public entry point for looking up a mapping: it passes ip
// to the given load balancer's getMapping and returns the balancer's result
// and error unchanged.
func MapToUrl(lb LoadBalancer, ip string) (string, error) {
	mapped, err := lb.getMapping(ip)
	return mapped, err
}
// UseTestingEndpoints hands resources to lb through setResources, replacing
// whatever list the balancer currently selects from. Judging by the name it
// is intended for tests that need a fixed resource list.
func UseTestingEndpoints(lb LoadBalancer, resources []string) {
	lb.setResources(resources)
}
// getUrls retrieves the list of URLs associated with resource from the
// service at k8s-apiclient-service:9899.
//
// It issues GET /svc/resources?resource=<resource> (the value is
// query-escaped) and decodes the response body as a JSON array of strings.
//
// An error is returned when the request fails, the service answers with a
// non-2xx status, the body cannot be read or decoded, or the decoded list is
// empty.
func getUrls(resource string) ([]string, error) {
	host := "k8s-apiclient-service:9899"
	path := "/svc/resources"
	queryParams := "resource=" + url.QueryEscape(resource)
	query := fmt.Sprintf("http://%s%s?%s", host, path, queryParams)
	log.Printf("Query = %s", query)

	resp, err := http.Get(query)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Reject error responses up front; otherwise an error page would be fed
	// to the JSON decoder and surface as a misleading decode failure.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("querying resources for %q: unexpected status %s", resource, resp.Status)
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	var resources []string
	if err := json.Unmarshal(body, &resources); err != nil {
		return nil, err
	}
	// An empty list is treated as an error so callers never get a balancer
	// with nothing to choose from.
	if len(resources) == 0 {
		return nil, fmt.Errorf("No resources for url %s!", resource)
	}
	return resources, nil
}
// RoundRobinLB is a load balancer that hands out its resources one after
// another, wrapping back to the first entry after the last.
type RoundRobinLB struct {
	// resources is the list being rotated through.
	resources []string
	// index is the position in resources returned by the most recent
	// getMapping call; NewRoundRobinLB starts it at -1 on success.
	index int
}
// setResources replaces the list this balancer rotates through with urls.
// The slice is stored as given (not copied) and the rotation index is left
// untouched.
func (rr *RoundRobinLB) setResources(urls []string) {
	rr.resources = urls
}
// NewRoundRobinLB builds a round-robin balancer over the URLs that getUrls
// reports for resource.
//
// On success the rotation index starts at -1, so the first getMapping call
// yields the first URL. If getUrls fails, the error is returned together
// with a non-nil balancer that has no resources and a zero index.
func NewRoundRobinLB(resource string) (*RoundRobinLB, error) {
	lb := &RoundRobinLB{}

	urls, err := getUrls(resource)
	if err != nil {
		return lb, err
	}

	lb.resources, lb.index = urls, -1
	return lb, nil
}
// getMapping advances the rotation by one step and returns the resource at
// the new position. The resource argument is not used; it exists so the
// method matches the LoadBalancer signature. Any error from advanceIndex is
// returned with an empty string.
func (rr *RoundRobinLB) getMapping(resource string) (string, error) {
	if err := rr.advanceIndex(); err != nil {
		return "", err
	}
	return rr.resources[rr.index], nil
}
// advanceIndex moves the rotation index to the next resource, wrapping to 0
// after the last one. It returns an error when there are no resources to
// rotate through.
//
// After a successful call the index is always a valid position in
// rr.resources, even if the list was replaced with a shorter one since the
// previous call.
func (rr *RoundRobinLB) advanceIndex() error {
	n := len(rr.resources)
	if n == 0 {
		return fmt.Errorf("len of endpoints cant be 0")
	}

	// Wrap at the last element. An index outside [-1, n-1) is also wrapped:
	// it can be left stale when setResources installs a shorter list, and
	// incrementing it further would index past the end of the slice.
	if rr.index < -1 || rr.index >= n-1 {
		rr.index = 0
	} else {
		rr.index++
	}
	return nil
}
// HashModLB is a simple hash-modulo load balancer: a key is hashed and the
// hash, taken modulo the number of resources, selects the resource.
type HashModLB struct {
	// resources is the list of candidates a key can be mapped to.
	resources []string
}
// NewHashModLB instantiates a hash-modulo load balancer over the URLs that
// getUrls reports for clusterIP.
//
// If getUrls fails, the error is returned together with a non-nil balancer
// that has no resources.
func NewHashModLB(clusterIP string) (*HashModLB, error) {
	lb := &HashModLB{}

	urls, err := getUrls(clusterIP)
	if err != nil {
		return lb, err
	}

	lb.resources = urls
	return lb, nil
}
// setEndpoints replaces the list of resources this balancer maps keys onto.
// The slice is stored as given (not copied).
func (hm *HashModLB) setEndpoints(endpoints []string) {
	hm.resources = endpoints
}

// setResources does the same as setEndpoints under the method name the
// LoadBalancer interface requires, so that *HashModLB can be used wherever a
// LoadBalancer is expected.
func (hm *HashModLB) setResources(resources []string) {
	hm.setEndpoints(resources)
}
// getMapping hashes clientIP and reduces the hash modulo the number of
// resources to pick one. The same input maps to the same resource for as
// long as the resource list is unchanged.
//
// It returns an error when the balancer has no resources or when hashing
// fails.
func (hm *HashModLB) getMapping(clientIP string) (string, error) {
	// Guard first: a modulo by zero below would panic at run time.
	if len(hm.resources) == 0 {
		return "", fmt.Errorf("len of endpoints cant be 0")
	}

	ipHash, err := hash(clientIP)
	if err != nil {
		return "", err
	}

	index := ipHash % uint32(len(hm.resources))
	return hm.resources[index], nil
}
// hash returns the 32-bit FNV-1a hash of s. An error from writing the bytes
// to the hasher is passed back to the caller with a zero hash.
func hash(s string) (uint32, error) {
	hasher := fnv.New32a()
	if _, err := hasher.Write([]byte(s)); err != nil {
		return 0, err
	}
	return hasher.Sum32(), nil
}