forked from infobloxopen/kubenodes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathkubenodes.go
227 lines (201 loc) · 5.94 KB
/
kubenodes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package kubenodes
import (
"context"
"fmt"
"net"
"reflect"
"strings"
"sync"
"time"
"github.com/miekg/dns"
core "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/pkg/dnsutil"
"github.com/coredns/coredns/plugin/pkg/fall"
"github.com/coredns/coredns/plugin/pkg/upstream"
"github.com/coredns/coredns/request"
)
// KubeNodes is a plugin that creates records for a Kubernetes cluster's nodes.
type KubeNodes struct {
Next plugin.Handler
Zones []string
Upstream upstreamer
Fall fall.F
ttl uint32
ipType, dnsType core.NodeAddressType
// Kubernetes API interface
client kubernetes.Interface
controller cache.Controller
indexer cache.Indexer
// concurrency control to stop controller
stopLock sync.Mutex
shutdown bool
stopCh chan struct{}
}
type upstreamer interface {
Lookup(ctx context.Context, state request.Request, name string, typ uint16) (*dns.Msg, error)
}
// New returns a initialized KubeNodes.
func New(zones []string) *KubeNodes {
k := new(KubeNodes)
k.Zones = zones
k.Upstream = upstream.New()
k.ttl = defaultTTL
k.stopCh = make(chan struct{})
k.ipType = core.NodeInternalIP
k.dnsType = core.NodeInternalDNS
return k
}
const (
// defaultTTL to apply to all answers.
defaultTTL = 5
)
// Name implements the Handler interface.
func (k KubeNodes) Name() string { return "kubenodes" }
// ServeDNS implements the plugin.Handler interface.
func (k KubeNodes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
state := request.Request{W: w, Req: r}
qname := state.Name()
zone := plugin.Zones(k.Zones).Matches(qname)
if zone == "" || !supportedQtype(state.QType()) {
return plugin.NextOrFailure(k.Name(), k.Next, ctx, w, r)
}
zone = state.QName()[len(qname)-len(zone):] // maintain case of original query
state.Zone = zone
if len(zone) == len(qname) {
writeResponse(w, r, nil, nil, []dns.RR{k.soa()}, dns.RcodeSuccess)
return dns.RcodeSuccess, nil
}
// handle reverse lookups
if state.QType() == dns.TypePTR {
if addr := dnsutil.ExtractAddressFromReverse(qname); addr != "" {
objs, err := k.indexer.ByIndex("reverse", addr)
if err != nil {
return dns.RcodeServerFailure, err
}
if len(objs) == 0 {
if k.Fall.Through(state.Name()) {
return plugin.NextOrFailure(k.Name(), k.Next, ctx, w, r)
}
writeResponse(w, r, nil, nil, []dns.RR{k.soa()}, dns.RcodeNameError)
return dns.RcodeNameError, nil
}
var records []dns.RR
for _, obj := range objs {
node, ok := obj.(*core.Node)
if !ok {
return dns.RcodeServerFailure, fmt.Errorf("unexpected %q from *Node index", reflect.TypeOf(obj))
}
records = append(records, &dns.PTR{
Hdr: dns.RR_Header{Name: qname, Rrtype: dns.TypePTR, Class: dns.ClassINET, Ttl: k.ttl},
Ptr: dnsutil.Join(node.Name, k.Zones[0]),
})
}
writeResponse(w, r, records, nil, nil, dns.RcodeSuccess)
return dns.RcodeSuccess, nil
}
}
nodeName := state.Name()[0 : len(qname)-len(zone)-1]
if zone == "." {
nodeName = state.Name()[0 : len(qname)-len(zone)]
}
// get the node by key name from the indexer
item, exists, err := k.indexer.GetByKey(nodeName)
if err != nil {
return dns.RcodeServerFailure, err
}
if !exists {
if k.Fall.Through(state.Name()) {
return plugin.NextOrFailure(k.Name(), k.Next, ctx, w, r)
}
writeResponse(w, r, nil, nil, []dns.RR{k.soa()}, dns.RcodeNameError)
return dns.RcodeNameError, nil
}
node, ok := item.(*core.Node)
if !ok {
return dns.RcodeServerFailure, fmt.Errorf("unexpected %q from *Node index", reflect.TypeOf(item))
}
// extract IPs from the node
var ips []string
for _, addr := range node.Status.Addresses {
switch addr.Type {
case k.ipType:
ips = append(ips, addr.Address)
case k.dnsType:
// DNS type node addresses cant be represented as CNAMEs, since a CNAME record must be the only record with
// it's name. So look up the IP address and append them to ips
m, err := k.Upstream.Lookup(ctx, state, addr.Address, state.QType())
if err != nil {
return dns.RcodeServerFailure, err
}
for _, a := range m.Answer {
switch a.Header().Rrtype {
case dns.TypeA:
ips = append(ips, a.(*dns.A).A.String())
case dns.TypeAAAA:
ips = append(ips, a.(*dns.AAAA).AAAA.String())
}
}
}
}
// build response records
var records []dns.RR
if state.QType() == dns.TypeA {
for _, ip := range ips {
if strings.Contains(ip, ":") {
continue
}
if netIP := net.ParseIP(ip); netIP != nil {
records = append(records, &dns.A{A: netIP,
Hdr: dns.RR_Header{Name: qname, Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: k.ttl}})
}
}
}
if state.QType() == dns.TypeAAAA {
for _, ip := range ips {
if !strings.Contains(ip, ":") {
continue
}
if netIP := net.ParseIP(ip); netIP != nil {
records = append(records, &dns.AAAA{AAAA: netIP,
Hdr: dns.RR_Header{Name: qname, Rrtype: dns.TypeAAAA, Class: dns.ClassINET, Ttl: k.ttl}})
}
}
}
writeResponse(w, r, records, nil, nil, dns.RcodeSuccess)
return dns.RcodeSuccess, nil
}
func writeResponse(w dns.ResponseWriter, r *dns.Msg, answer, extra, ns []dns.RR, rcode int) {
m := new(dns.Msg)
m.SetReply(r)
m.Rcode = rcode
m.Authoritative = true
m.Answer = answer
m.Extra = extra
m.Ns = ns
w.WriteMsg(m)
}
func (k KubeNodes) soa() *dns.SOA {
return &dns.SOA{
Hdr: dns.RR_Header{Name: k.Zones[0], Rrtype: dns.TypeSOA, Class: dns.ClassINET, Ttl: k.ttl},
Ns: dnsutil.Join("ns.dns", k.Zones[0]),
Mbox: dnsutil.Join("hostmaster.dns", k.Zones[0]),
Serial: uint32(time.Now().Unix()),
Refresh: 7200,
Retry: 1800,
Expire: 86400,
Minttl: k.ttl,
}
}
func supportedQtype(qtype uint16) bool {
switch qtype {
case dns.TypeA, dns.TypeAAAA, dns.TypePTR:
return true
default:
return false
}
}
// Ready implements the ready.Readiness interface.
func (k *KubeNodes) Ready() bool { return k.controller.HasSynced() }