optimize tagging controller workqueue handling #1091

Merged: 1 commit, merged on Jan 23, 2025
17 changes: 2 additions & 15 deletions pkg/controllers/tagging/metrics.go
@@ -14,23 +14,15 @@ limitations under the License.
package tagging

import (
"sync"

"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"sync"
)

var register sync.Once

var (
workItemDuration = metrics.NewHistogramVec(
Member: Are we ok dropping this metric?

Contributor: We should have an equivalent metric from the workqueue package itself via:

_ "k8s.io/component-base/metrics/prometheus/workqueue" // enable prometheus provider for workqueue metrics

but please make sure we actually scrape that one, @kmala!

Member Author: Workqueue metrics are available by default (https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/component-base/metrics/prometheus/workqueue/metrics.go#L40), and I verified that the CCM's metrics endpoint exposes the tagging controller metrics.

Contributor: Yes, we've had those metrics for a while because of the import above (you have to import it for these to be registered: https://github.com/kubernetes/kubernetes/blob/d06398aac3e378da0c95472cd39713998206e9ff/staging/src/k8s.io/component-base/metrics/prometheus/workqueue/metrics.go#L26).

But do we actually scrape those? I'm pretty sure we've been scraping this other metric that we're removing.

Member Author: Yes, these metrics are available; I verified it by querying the metrics endpoint. What do you mean by "scrape those"?
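For context, the wiring the thread refers to looks roughly like this. This is a minimal sketch, assuming the standard component-base packages; the metric names in the comments come from the workqueue provider and are labeled with the queue's name.

```go
package main

import (
	"net/http"

	"k8s.io/client-go/util/workqueue"
	"k8s.io/component-base/metrics/legacyregistry"
	// The blank import registers the Prometheus provider for workqueue
	// metrics; without it, named workqueues report no metrics at all.
	_ "k8s.io/component-base/metrics/prometheus/workqueue"
)

func main() {
	// A queue created with a name after the provider is registered emits
	// workqueue_depth, workqueue_adds_total, workqueue_queue_duration_seconds,
	// workqueue_work_duration_seconds, workqueue_retries_total, ...
	// labeled with name="Tagging".
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging")
	defer q.ShutDown()

	// The legacy registry is what backs the controller manager's /metrics endpoint.
	http.Handle("/metrics", legacyregistry.Handler())
	_ = http.ListenAndServe(":8080", nil)
}
```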

&metrics.HistogramOpts{
Name: "cloudprovider_aws_tagging_controller_work_item_duration_seconds",
Help: "workitem latency of workitem being in the queue and time it takes to process",
StabilityLevel: metrics.ALPHA,
Buckets: metrics.ExponentialBuckets(0.5, 1.5, 20),
},
[]string{"latency_type"})

workItemError = metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "cloudprovider_aws_tagging_controller_work_item_errors_total",
@@ -43,15 +35,10 @@ var (
// registerMetrics registers tagging-controller metrics.
func registerMetrics() {
register.Do(func() {
legacyregistry.MustRegister(workItemDuration)
legacyregistry.MustRegister(workItemError)
})
}

func recordWorkItemLatencyMetrics(latencyType string, timeTaken float64) {
workItemDuration.With(metrics.Labels{"latency_type": latencyType}).Observe(timeTaken)
}

func recordWorkItemErrorMetrics(errorType string, instanceID string) {
workItemError.With(metrics.Labels{"error_type": errorType, "instance_id": instanceID}).Inc()
}
100 changes: 57 additions & 43 deletions pkg/controllers/tagging/tagging_controller.go
@@ -22,6 +22,7 @@ import (

"golang.org/x/time/rate"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
@@ -42,16 +43,21 @@ func init() {
registerMetrics()
}

// workItem contains the node and an action for that node
// taggingControllerNode contains the node details required for tag/untag of node resources.
type taggingControllerNode struct {
providerID string
name string
}

// workItem contains the node name, provider id and an action for that node.
type workItem struct {
node *v1.Node
action func(node *v1.Node) error
requeuingCount int
enqueueTime time.Time
Comment on lines -49 to -50
Contributor: So these 2 fields were the crux of the issue, right? If a workItem already existed in the queue, its enqueueTime would always be different, even if its requeuingCount happened to match (0).

Member Author: Yes, mostly.

Reviewer: Correct me if I am wrong, the requeuingCount looks like it enforces a maximum number of retries. Are we good to remove it? Is there an option to keep those fields and only compare the node identity fields when enqueuing?

Member Author: We can get the requeue count from the workqueue itself, and I changed the code to use that, so the functionality remains the same as before.
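For reference, the retry accounting the workqueue already provides looks roughly like this; an illustrative sketch (not this PR's exact code) of replacing a per-item requeuingCount field with the queue's own failure counter:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// illustrative limit; the controller uses its own maxRequeuingCount constant
const maxRetries = 3

// handleOne pops one item and applies the standard retry pattern: on error,
// requeue with backoff while the queue's per-item failure count is below the
// limit; otherwise give up. Forget resets the failure history for the item.
func handleOne(q workqueue.RateLimitingInterface, process func(interface{}) error) {
	item, shutdown := q.Get()
	if shutdown {
		return
	}
	defer q.Done(item)

	if err := process(item); err != nil {
		if q.NumRequeues(item) < maxRetries {
			q.AddRateLimited(item) // the failure count for this item increments here
			return
		}
		fmt.Printf("giving up on %v: %v\n", item, err)
	}
	q.Forget(item) // success or retries exhausted; drop the failure history
}
```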

name string
providerID string
action string
}

func (w workItem) String() string {
return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.node.GetName(), w.requeuingCount, w.enqueueTime)
return fmt.Sprintf("[Node: %s, Action: %s]", w.name, w.action)
}

const (
Expand All @@ -62,17 +68,15 @@ const (
// The label for depicting total number of errors a work item encounter and succeed
totalErrorsWorkItemErrorMetric = "total_errors"

// The label for depicting total time when work item gets queued to processed
workItemProcessingTimeWorkItemMetric = "work_item_processing_time"

// The label for depicting total time when work item gets queued to dequeued
workItemDequeuingTimeWorkItemMetric = "work_item_dequeuing_time"

// The label for depicting total number of errors a work item encounter and fail
errorsAfterRetriesExhaustedWorkItemErrorMetric = "errors_after_retries_exhausted"

// The period of time after Node creation to retry tagging due to eventual consistency of the CreateTags API.
newNodeEventualConsistencyGracePeriod = time.Minute * 5

addTag = "ADD"

deleteTag = "DELETE"
)

// Controller is the controller implementation for tagging cluster resources.
Expand Down Expand Up @@ -152,7 +156,7 @@ func NewTaggingController(
tc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*v1.Node)
tc.enqueueNode(node, tc.tagNodesResources)
tc.enqueueNode(node, addTag)
},
UpdateFunc: func(oldObj, newObj interface{}) {
node := newObj.(*v1.Node)
@@ -165,11 +169,11 @@
return
}

tc.enqueueNode(node, tc.tagNodesResources)
tc.enqueueNode(node, addTag)
},
DeleteFunc: func(obj interface{}) {
node := obj.(*v1.Node)
tc.enqueueNode(node, tc.untagNodeResources)
tc.enqueueNode(node, deleteTag)
},
})

@@ -215,21 +219,17 @@ func (tc *Controller) process() bool {
err := func(obj interface{}) error {
defer tc.workqueue.Done(obj)

workItem, ok := obj.(*workItem)
workItem, ok := obj.(workItem)
if !ok {
tc.workqueue.Forget(obj)
err := fmt.Errorf("expected workItem in workqueue but got %s", obj)
utilruntime.HandleError(err)
return nil
}

timeTaken := time.Since(workItem.enqueueTime).Seconds()
recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken)
klog.Infof("Dequeuing latency %f seconds", timeTaken)

instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID()
instanceID, err := awsv1.KubernetesInstanceID(workItem.providerID).MapToAWSInstanceID()
if err != nil {
err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.node.GetName(), err)
err = fmt.Errorf("error in getting instanceID for node %s, error: %v", workItem.name, err)
utilruntime.HandleError(err)
return nil
}
@@ -241,26 +241,31 @@
tc.workqueue.Forget(obj)
return nil
}

err = workItem.action(workItem.node)

if workItem.action == addTag {
err = tc.tagNodesResources(&taggingControllerNode{
name: workItem.name,
providerID: workItem.providerID,
})
} else {
err = tc.untagNodeResources(&taggingControllerNode{
name: workItem.name,
providerID: workItem.providerID,
})
}
if err != nil {
if workItem.requeuingCount < maxRequeuingCount {
numRetries := tc.workqueue.NumRequeues(workItem)
if numRetries < maxRequeuingCount {
// Put the item back on the workqueue to handle any transient errors.
workItem.requeuingCount++
tc.workqueue.AddRateLimited(workItem)

recordWorkItemErrorMetrics(totalErrorsWorkItemErrorMetric, string(instanceID))
return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), workItem.requeuingCount)
return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), numRetries)
}

klog.Errorf("error processing work item %s: %s, requeuing count exceeded", workItem, err.Error())
recordWorkItemErrorMetrics(errorsAfterRetriesExhaustedWorkItemErrorMetric, string(instanceID))
} else {
klog.Infof("Finished processing %s", workItem)
timeTaken = time.Since(workItem.enqueueTime).Seconds()
recordWorkItemLatencyMetrics(workItemProcessingTimeWorkItemMetric, timeTaken)
klog.Infof("Processing latency %f seconds", timeTaken)
}

tc.workqueue.Forget(obj)
@@ -277,11 +282,19 @@

// tagNodesResources tag node resources
// If we want to tag more resources, modify this function appropriately
func (tc *Controller) tagNodesResources(node *v1.Node) error {
func (tc *Controller) tagNodesResources(node *taggingControllerNode) error {
for _, resource := range tc.resources {
switch resource {
case opt.Instance:
err := tc.tagEc2Instance(node)
v1node, err := tc.nodeInformer.Lister().Get(node.name)
Member: Do we want to try to capture all necessary information in taggingControllerNode itself to avoid this lookup?

Member: (in a follow up!)

Member Author: We need to have a lookup if we want to avoid having multiple items in the workqueue, since the labels can be different on different updates.

Contributor: The *Node should point to the same struct though, right? The watcher/lister should only have one copy of the Node in memory.

if err != nil {
// If node not found, just ignore it as it's okay to not add tags when the node object is deleted.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
err = tc.tagEc2Instance(v1node)
if err != nil {
return err
}
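A short aside on the lookup discussed above, as an illustrative sketch with an assumed helper name (not the PR's code): lister reads are served from the informer's shared in-memory cache, so the extra Get costs no API call, and the returned object is shared and must be treated as read-only.

```go
package tagging

import (
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	coreinformers "k8s.io/client-go/informers/core/v1"
)

// lookupNode resolves the full Node from the informer's local cache at
// processing time. A nil Node with a nil error means the node is gone and
// there is nothing left to tag.
func lookupNode(nodes coreinformers.NodeInformer, name string) (*v1.Node, error) {
	cached, err := nodes.Lister().Get(name)
	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil, nil
		}
		return nil, err
	}
	// The cached object is shared with every other consumer of the informer,
	// so DeepCopy before mutating it.
	return cached.DeepCopy(), nil
}
```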
@@ -334,7 +347,7 @@ func (tc *Controller) tagEc2Instance(node *v1.Node) error {

// untagNodeResources untag node resources
// If we want to untag more resources, modify this function appropriately
func (tc *Controller) untagNodeResources(node *v1.Node) error {
func (tc *Controller) untagNodeResources(node *taggingControllerNode) error {
for _, resource := range tc.resources {
switch resource {
case opt.Instance:
@@ -350,13 +363,13 @@

// untagEc2Instances deletes the provided tags to each EC2 instances in
// the cluster.
func (tc *Controller) untagEc2Instance(node *v1.Node) error {
instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
func (tc *Controller) untagEc2Instance(node *taggingControllerNode) error {
instanceID, _ := awsv1.KubernetesInstanceID(node.providerID).MapToAWSInstanceID()

err := tc.cloud.UntagResource(string(instanceID), tc.tags)

if err != nil {
klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err)
klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.name, err)
return err
}

@@ -367,12 +380,13 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error {

// enqueueNode takes in the object and an
// action for the object for a workitem and enqueue to the workqueue
func (tc *Controller) enqueueNode(node *v1.Node, action func(node *v1.Node) error) {
item := &workItem{
node: node,
action: action,
requeuingCount: 0,
enqueueTime: time.Now(),
func (tc *Controller) enqueueNode(node *v1.Node, action string) {
// if the struct's fields are all comparable, the workqueue's Add will ensure that multiple adds of the same object
// result in only one item in the workqueue.
item := workItem{
name: node.GetName(),
providerID: node.Spec.ProviderID,
action: action,
Comment on lines +384 to +389
Contributor: The action func wasn't comparable, so this change allows multiple workitems for the same node, with different actions. Is that desirable? Why change action to a string?

Contributor (@ndbaker1, Jan 17, 2025): I'm thinking the same. Seems we're relying on queuing order to determine the final state, which boils down to a last-write-wins situation in the workqueue.

Member Author:

> so this change allows multiple workitems for the same node, with different actions. Is that desirable?

Before, we were adding a work item for every update if the add hadn't happened yet, and there could be a work item for the delete as well. With this change we would have at most 2 work items in the queue, which should be the same behavior as before, since a single add-tag work item and multiple add-tag work items mean the same behavior in the end.

The action was changed to a string so that it is comparable, which prevents duplicate updates from adding new items to the workqueue.

}

if tc.rateLimitEnabled {
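The deduplication behavior discussed in this thread can be demonstrated with a small standalone sketch (illustrative only): because every field of the new workItem is comparable, the workqueue can use the value as a set key, so adding the same value twice while it is still queued yields a single entry, while a different action yields a second one.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// item mirrors the shape of the new workItem: name, provider ID and action,
// all comparable, so the workqueue can detect duplicate adds.
type item struct {
	name       string
	providerID string
	action     string
}

func main() {
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
	defer q.ShutDown()

	q.Add(item{name: "node0", providerID: "i-0001", action: "ADD"})
	q.Add(item{name: "node0", providerID: "i-0001", action: "ADD"})    // duplicate while queued: dropped
	q.Add(item{name: "node0", providerID: "i-0001", action: "DELETE"}) // different action: kept

	fmt.Println(q.Len()) // prints 2
}
```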
86 changes: 80 additions & 6 deletions pkg/controllers/tagging/tagging_controller_test.go
@@ -22,6 +22,7 @@
"testing"
"time"

"golang.org/x/time/rate"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
@@ -221,27 +222,32 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
nodeMonitorPeriod: 1 * time.Second,
tags: map[string]string{"key2": "value2", "key1": "value1"},
resources: []string{"instance"},
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging"),
rateLimitEnabled: testcase.rateLimited,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Millisecond, 5*time.Millisecond),
// 10 qps, 100 bucket size. This is only for retry speed and it's only the overall factor (not per item)
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
), "Tagging"),
rateLimitEnabled: testcase.rateLimited,
}

if testcase.toBeTagged {
tc.enqueueNode(testcase.currNode, tc.tagNodesResources)
tc.enqueueNode(testcase.currNode, addTag)
} else {
tc.enqueueNode(testcase.currNode, tc.untagNodeResources)
tc.enqueueNode(testcase.currNode, deleteTag)
}

if tc.rateLimitEnabled {
// If rate limit is enabled, sleep for 10 ms to wait for the item to be added to the queue since the base delay is 5 ms.
time.Sleep(10 * time.Millisecond)
}

cnt := 0
for tc.workqueue.Len() > 0 {
tc.process()

cnt++
// sleep briefly because of the exponential backoff when requeueing a failed workitem,
// which can leave the workqueue empty if checked immediately
time.Sleep(1500 * time.Millisecond)
time.Sleep(7 * time.Millisecond)
}

for _, msg := range testcase.expectedMessages {
@@ -256,12 +262,80 @@
if !strings.Contains(logBuf.String(), "requeuing count exceeded") {
t.Errorf("\nExceeded requeue count but did not stop: \n%v\n", logBuf.String())
}
if cnt != maxRequeuingCount+1 {
t.Errorf("the node got requeued %d, more than the max requeuing count of %d", cnt, maxRequeuingCount)
}
}
}
})
}
}
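The test's composite limiter above uses a 1 ms base and 5 ms cap for the per-item backoff so that retries land within a few milliseconds and the polling loop can sleep 7 ms instead of 1.5 s. For comparison, a sketch with production-style values, assumed to roughly match client-go's DefaultControllerRateLimiter (verify against the vendored client-go version):

```go
package main

import (
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Per-item exponential backoff (5 ms base, 1000 s cap) combined with an
	// overall 10 QPS / burst-100 token bucket; the larger of the two delays wins.
	limiter := workqueue.NewTypedMaxOfRateLimiter(
		workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, 1000*time.Second),
		&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)

	q := workqueue.NewNamedRateLimitingQueue(limiter, "Tagging")
	defer q.ShutDown()
}
```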

func TestMultipleEnqueues(t *testing.T) {
awsServices := awsv1.NewFakeAWSServices(TestClusterID)
fakeAws, _ := awsv1.NewAWSCloud(config.CloudConfig{}, awsServices)

testNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "i-0001",
},
}
testNode1 := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "i-0002",
},
}
clientset := fake.NewSimpleClientset(testNode, testNode1)
informer := informers.NewSharedInformerFactory(clientset, time.Second)
nodeInformer := informer.Core().V1().Nodes()

if err := syncNodeStore(nodeInformer, clientset); err != nil {
t.Errorf("unexpected error: %v", err)
}

tc, err := NewTaggingController(nodeInformer, clientset, fakeAws, time.Second, nil, []string{}, 0, 0)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
tc.enqueueNode(testNode, addTag)
if tc.workqueue.Len() != 1 {
t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
}
// adding the same node with similar operation shouldn't add to the workqueue
tc.enqueueNode(testNode, addTag)
if tc.workqueue.Len() != 1 {
t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
}
// adding the same node with different operation should add to the workqueue
tc.enqueueNode(testNode, deleteTag)
if tc.workqueue.Len() != 2 {
t.Errorf("invalid work queue length, expected 2, got %d", tc.workqueue.Len())
}
// adding the different node should add to the workqueue
tc.enqueueNode(testNode1, addTag)
if tc.workqueue.Len() != 3 {
t.Errorf("invalid work queue length, expected 3, got %d", tc.workqueue.Len())
}
// should handle the add tag properly
tc.process()
if tc.workqueue.Len() != 2 {
t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
}
// should handle the delete tag properly
tc.process()
if tc.workqueue.Len() != 1 {
t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
}
}

func syncNodeStore(nodeinformer coreinformers.NodeInformer, f *fake.Clientset) error {
nodes, err := f.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
if err != nil {