Skip to content

Commit

Permalink
fix: Handle async delete in stateless cni (#2967)
Browse files Browse the repository at this point in the history
* feat: adding stateless CNI pipeline test

* feat: making change for stateless CNI pipeline

* feat: addressing the comments

* fix: fixing stateles cni yaml

* fix: stateless CNI delete fix

* fix: addressing the comments

* fix: addressing the comments and fix linter issues

* Update cns/fsnotify/fsnotify.go

Co-authored-by: tamilmani1989 <tamanoha@microsoft.com>
Signed-off-by: Behzad Mirkhanzadeh <b.mirkhanzadeh@gmail.com>

* Update cni/network/network.go

Co-authored-by: tamilmani1989 <tamanoha@microsoft.com>
Signed-off-by: Behzad Mirkhanzadeh <b.mirkhanzadeh@gmail.com>

* Update cni/network/network.go

Co-authored-by: tamilmani1989 <tamanoha@microsoft.com>
Signed-off-by: Behzad Mirkhanzadeh <b.mirkhanzadeh@gmail.com>

* fix: addressing the comments

* fix: fix the error code.

* Fix: decoupling hnsclient form CNS watcher

* fix: adding endpointmanager package to resolve platfrom specific call to HNS

* Update cns/endpointmanager/endpointmanager_linux.go

Co-authored-by: Evan Baker <rbtr@users.noreply.github.com>
Signed-off-by: Behzad Mirkhanzadeh <b.mirkhanzadeh@gmail.com>

* Update cns/service/main.go

Co-authored-by: Evan Baker <rbtr@users.noreply.github.com>
Signed-off-by: Behzad Mirkhanzadeh <b.mirkhanzadeh@gmail.com>

* Fix: addressing the comments

* fix: removing stateless CNI pipline changes form the PR

* Update cns/configuration/configuration.go

Co-authored-by: Evan Baker <rbtr@users.noreply.github.com>
Signed-off-by: Behzad Mirkhanzadeh <b.mirkhanzadeh@gmail.com>

* addressing the comment

---------

Signed-off-by: Behzad Mirkhanzadeh <b.mirkhanzadeh@gmail.com>
Co-authored-by: tamilmani1989 <tamanoha@microsoft.com>
Co-authored-by: Evan Baker <rbtr@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 21, 2024
1 parent 4c0eb94 commit a9fccfa
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 18 deletions.
23 changes: 22 additions & 1 deletion cni/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/Azure/azure-container-networking/cni/util"
"github.com/Azure/azure-container-networking/cns"
cnscli "github.com/Azure/azure-container-networking/cns/client"
"github.com/Azure/azure-container-networking/cns/fsnotify"
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/dhcp"
"github.com/Azure/azure-container-networking/iptables"
Expand Down Expand Up @@ -1131,18 +1132,38 @@ func (plugin *NetPlugin) Delete(args *cniSkel.CmdArgs) error {
// network ID is passed in and used only for migration
// otherwise, in stateless, we don't need the network id for deletion
epInfos, err = plugin.nm.GetEndpointState(networkID, args.ContainerID)
// if stateless CNI fail to get the endpoint from CNS for any reason other than Endpoint Not found
if err != nil {
if errors.Is(err, network.ErrConnectionFailure) {
logger.Info("failed to connect to CNS", zap.String("containerID", args.ContainerID), zap.Error(err))
addErr := fsnotify.AddFile(args.ContainerID, args.ContainerID, watcherPath)
logger.Info("add containerid file for Asynch delete", zap.String("containerID", args.ContainerID), zap.Error(addErr))
if addErr != nil {
logger.Error("failed to add file to watcher", zap.String("containerID", args.ContainerID), zap.Error(addErr))
return errors.Wrap(addErr, fmt.Sprintf("failed to add file to watcher with containerID %s", args.ContainerID))
}
return nil
}
if errors.Is(err, network.ErrEndpointStateNotFound) {
logger.Info("Endpoint Not found", zap.String("containerID", args.ContainerID), zap.Error(err))
return nil
}
logger.Error("Get Endpoint State API returned error", zap.String("containerID", args.ContainerID), zap.Error(err))
return plugin.RetriableError(fmt.Errorf("failed to delete endpoint: %w", err))
}
} else {
epInfos = plugin.nm.GetEndpointInfosFromContainerID(args.ContainerID)
}

// for when the endpoint is not created, but the ips are already allocated (only works if single network, single infra)
// stateless cni won't have this issue
// this block is not applied to stateless CNI
if len(epInfos) == 0 {
endpointID := plugin.nm.GetEndpointID(args.ContainerID, args.IfName)
if !nwCfg.MultiTenancy {
logger.Error("Failed to query endpoint",
zap.String("endpoint", endpointID),
zap.Error(err))

logger.Error("Release ip by ContainerID (endpoint not found)",
zap.String("containerID", args.ContainerID))
sendEvent(plugin, fmt.Sprintf("Release ip by ContainerID (endpoint not found):%v", args.ContainerID))
Expand Down
21 changes: 12 additions & 9 deletions cns/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,30 +1028,33 @@ func (c *Client) GetEndpoint(ctx context.Context, endpointID string) (*restserve
// build the request
u := c.routes[cns.EndpointAPI]
uString := u.String() + endpointID
var response restserver.GetEndpointResponse
req, err := http.NewRequestWithContext(ctx, http.MethodGet, uString, http.NoBody)
if err != nil {
return nil, errors.Wrap(err, "failed to build request")
response.Response.ReturnCode = types.UnexpectedError
return &response, errors.Wrap(err, "failed to build request")
}

req.Header.Set(headerContentType, contentTypeJSON)
res, err := c.client.Do(req)
if err != nil {
return nil, errors.Wrap(err, "http request failed")
response.Response.ReturnCode = types.ConnectionError
return &response, &ConnectionFailureErr{cause: err}
}

defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return nil, errors.Errorf("http response %d", res.StatusCode)
response.Response.ReturnCode = types.UnexpectedError
return &response, errors.Errorf("http response %d", res.StatusCode)
}

var response restserver.GetEndpointResponse
err = json.NewDecoder(res.Body).Decode(&response)
if err != nil {
return nil, errors.Wrap(err, "failed to decode GetEndpointResponse")
response.Response.ReturnCode = types.UnexpectedError
return &response, errors.Wrap(err, "failed to decode GetEndpointResponse")
}

if response.Response.ReturnCode != 0 {
return nil, errors.New(response.Response.Message)
return &response, errors.New(response.Response.Message)
}

return &response, nil
Expand All @@ -1076,7 +1079,7 @@ func (c *Client) UpdateEndpoint(ctx context.Context, endpointID string, ipInfo m
req.Header.Set(headerContentType, contentTypeJSON)
res, err := c.client.Do(req)
if err != nil {
return nil, errors.Wrap(err, "http request failed with error from server")
return nil, &ConnectionFailureErr{cause: err}
}

defer res.Body.Close()
Expand Down
6 changes: 6 additions & 0 deletions cns/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"os"
"path/filepath"
"runtime"
"strings"

"github.com/Azure/azure-container-networking/cns"
Expand Down Expand Up @@ -231,3 +232,8 @@ func SetCNSConfigDefaults(config *CNSConfig) {
config.GRPCSettings.Enable = false
config.WatchPods = config.EnableIPAMv2 || config.EnableSwiftV2
}

// isStalessCNIMode verify if the CNI is running stateless mode
func (cnsconfig *CNSConfig) IsStalessCNIWindows() bool {
return !cnsconfig.InitializeFromCNI && cnsconfig.ManageEndpointState && runtime.GOOS == "windows"
}
21 changes: 21 additions & 0 deletions cns/endpointmanager/endpointmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package endpointmanager

import (
"context"

"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/restserver"
)

type EndpointManager struct {
cli releaseIPsClient // nolint
}

type releaseIPsClient interface {
ReleaseIPs(ctx context.Context, ipconfig cns.IPConfigsRequest) error
GetEndpoint(ctx context.Context, endpointID string) (*restserver.GetEndpointResponse, error)
}

func WithPlatformReleaseIPsManager(cli releaseIPsClient) *EndpointManager {
return &EndpointManager{cli: cli}
}
13 changes: 13 additions & 0 deletions cns/endpointmanager/endpointmanager_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package endpointmanager

import (
"context"

"github.com/Azure/azure-container-networking/cns"
"github.com/pkg/errors"
)

// ReleaseIPs implements an Interface in fsnotify for async delete of the HNS endpoint and IP addresses
func (em *EndpointManager) ReleaseIPs(ctx context.Context, ipconfigreq cns.IPConfigsRequest) error {
return errors.Wrap(em.cli.ReleaseIPs(ctx, ipconfigreq), "failed to release IP from CNS")
}
42 changes: 42 additions & 0 deletions cns/endpointmanager/endpointmanager_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package endpointmanager

import (
"context"

"github.com/Azure/azure-container-networking/cns"
"github.com/Azure/azure-container-networking/cns/hnsclient"
"github.com/Azure/azure-container-networking/cns/logger"
"github.com/pkg/errors"
)

// ReleaseIPs implements an Interface in fsnotify for async delete of the HNS endpoint and IP addresses
func (em *EndpointManager) ReleaseIPs(ctx context.Context, ipconfigreq cns.IPConfigsRequest) error {
logger.Printf("deleting HNS Endpoint asynchronously")
// remove HNS endpoint
if err := em.deleteEndpoint(ctx, ipconfigreq.InfraContainerID); err != nil {
logger.Errorf("failed to remove HNS endpoint %s", err.Error())
}
return errors.Wrap(em.cli.ReleaseIPs(ctx, ipconfigreq), "failed to release IP from CNS")
}

// deleteEndpoint API to get the state and then remove assiciated HNS
func (em *EndpointManager) deleteEndpoint(ctx context.Context, containerid string) error {
endpointResponse, err := em.cli.GetEndpoint(ctx, containerid)
if err != nil {
return errors.Wrap(err, "failed to read the endpoint from CNS state")
}
for _, ipInfo := range endpointResponse.EndpointInfo.IfnameToIPMap {
hnsEndpointID := ipInfo.HnsEndpointID
// we need to get the HNSENdpoint via the IP address if the HNSEndpointID is not present in the statefile
if ipInfo.HnsEndpointID == "" {
if hnsEndpointID, err = hnsclient.GetHNSEndpointbyIP(ipInfo.IPv4, ipInfo.IPv6); err != nil {
return errors.Wrap(err, "failed to find HNS endpoint with id")
}
}
logger.Printf("deleting HNS Endpoint with id %v", hnsEndpointID)
if err := hnsclient.DeleteHNSEndpointbyID(hnsEndpointID); err != nil {
return errors.Wrap(err, "failed to delete HNS endpoint with id "+ipInfo.HnsEndpointID)
}
}
return nil
}
6 changes: 3 additions & 3 deletions cns/fsnotify/fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
"golang.org/x/sync/errgroup"
)

type releaseIPsClient interface {
type ReleaseIPsClient interface {
ReleaseIPs(ctx context.Context, ipconfig cns.IPConfigsRequest) error
}

type watcher struct {
cli releaseIPsClient
cli ReleaseIPsClient
path string
log *zap.Logger

Expand All @@ -29,7 +29,7 @@ type watcher struct {
}

// Create the AsyncDelete watcher.
func New(cli releaseIPsClient, path string, logger *zap.Logger) (*watcher, error) { //nolint
func New(cli ReleaseIPsClient, path string, logger *zap.Logger) (*watcher, error) { //nolint
// Add directory where intended deletes are kept
if err := os.Mkdir(path, 0o755); err != nil && !errors.Is(err, fs.ErrExist) { //nolint
logger.Error("error making directory", zap.String("path", path), zap.Error(err))
Expand Down
69 changes: 69 additions & 0 deletions cns/hnsclient/hnsclient_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/Azure/azure-container-networking/network/policy"
"github.com/Microsoft/hcsshim"
"github.com/Microsoft/hcsshim/hcn"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -75,6 +76,9 @@ const (

// signals a APIPA endpoint type
apipaEndpointType = "APIPA"

// default network name used by HNS
defaultNetworkName = "azure"
)

// Named Lock for network and endpoint creation/deletion
Expand Down Expand Up @@ -685,3 +689,68 @@ func DeleteHostNCApipaEndpoint(

return nil
}

// DeleteHNSEndpointbyID deletes the HNS endpoint
func DeleteHNSEndpointbyID(hnsEndpointID string) error {
var (
hcnEndpoint *hcn.HostComputeEndpoint
err error
)

logger.Printf("Deleting hcn endpoint with id %v", hnsEndpointID)
hcnEndpoint, err = hcn.GetEndpointByID(hnsEndpointID)
if err != nil {
// If error is anything other than EndpointNotFoundError, return error.
// else log the error but don't return error because endpoint is already deleted.
var notFoundErr hcn.EndpointNotFoundError
if errors.As(err, &notFoundErr) {
return fmt.Errorf("Failed to get hcn endpoint with id: %s due to err: %w", hnsEndpointID, err)
}

logger.Errorf("Delete called on the Endpoint which doesn't exist. Error:%v", err)
return nil
}

// Remove this endpoint from the namespace
if err = hcn.RemoveNamespaceEndpoint(hcnEndpoint.HostComputeNamespace, hcnEndpoint.Id); err != nil {
logger.Errorf("Failed to remove hcn endpoint %s from namespace %s due to err: %v", hcnEndpoint.Id, hcnEndpoint.HostComputeNamespace, err)
}

if err = hcnEndpoint.Delete(); err != nil {
return fmt.Errorf("Failed to delete endpoint: %s. Error: %w", hnsEndpointID, err)
}

logger.Errorf("[Azure CNS] Successfully deleted endpoint: %+v", hnsEndpointID)

return nil
}

// GetHNSEndpointbyIP returns an HNSEndpoint with the corrsponding HNS Endpoint ID that matches an specific IP Address.
func GetHNSEndpointbyIP(ipv4, ipv6 []net.IPNet) (string, error) {
logger.Printf("Fetching missing HNS endpoint id for endpoints in network with id %s", defaultNetworkName)
hnsResponse, err := hcn.GetNetworkByName(defaultNetworkName)
if err != nil || hnsResponse == nil {
return "", errors.Wrapf(err, "HNS Network or endpoints not found")
}
hcnEndpoints, err := hcn.ListEndpointsOfNetwork(hnsResponse.Id)
if err != nil {
return "", errors.Wrapf(err, "failed to fetch HNS endpoints for the given network")
}
for i := range hcnEndpoints {
for _, ipConfiguration := range hcnEndpoints[i].IpConfigurations {
for _, ip := range ipv4 {
if ipConfiguration.IpAddress == ip.IP.String() {
logger.Printf("Successfully found hcn endpoint id for endpoint %s with ip %s", hcnEndpoints[i].Id, ip.IP.String())
return hcnEndpoints[i].Id, nil
}
}
for _, ip := range ipv6 {
if ipConfiguration.IpAddress == ip.IP.String() {
logger.Printf("Successfully found hcn endpoint id for endpoint %s with ip %s", hcnEndpoints[i].Id, ip.IP.String())
return hcnEndpoints[i].Id, nil
}
}
}
}
return "", errors.Wrapf(err, "No HNSEndpointID matches the IPAddress")
}
2 changes: 1 addition & 1 deletion cns/restserver/ipam.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ func (service *HTTPRestService) GetEndpointHelper(endpointID string) (*EndpointI
} else {
logger.Errorf("[GetEndpointState] Failed to retrieve state, err:%v", err)
}
return nil, errors.Wrap(err, "[GetEndpointState] Failed to retrieve state")
return nil, ErrEndpointStateNotFound
}
if endpointInfo, ok := service.EndpointState[endpointID]; ok {
logger.Warnf("[GetEndpointState] Found existing endpoint state for container %s", endpointID)
Expand Down
11 changes: 10 additions & 1 deletion cns/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/Azure/azure-container-networking/cns/cnireconciler"
"github.com/Azure/azure-container-networking/cns/common"
"github.com/Azure/azure-container-networking/cns/configuration"
"github.com/Azure/azure-container-networking/cns/endpointmanager"
"github.com/Azure/azure-container-networking/cns/fsnotify"
"github.com/Azure/azure-container-networking/cns/grpc"
"github.com/Azure/azure-container-networking/cns/healthserver"
Expand Down Expand Up @@ -950,14 +951,22 @@ func main() {

if cnsconfig.EnableAsyncPodDelete {
// Start fs watcher here
z.Info("AsyncPodDelete is enabled")
logger.Printf("AsyncPodDelete is enabled")
cnsclient, err := cnsclient.New("", cnsReqTimeout) //nolint
if err != nil {
z.Error("failed to create cnsclient", zap.Error(err))
}
go func() {
_ = retry.Do(func() error {
z.Info("starting fsnotify watcher to process missed Pod deletes")
w, err := fsnotify.New(cnsclient, cnsconfig.AsyncPodDeletePath, z)
logger.Printf("starting fsnotify watcher to process missed Pod deletes")
var endpointCleanup fsnotify.ReleaseIPsClient = cnsclient
// using endpointmanager implmentation for stateless CNI sceanrio to remove HNS endpoint alongside the IPs
if cnsconfig.IsStalessCNIWindows() {
endpointCleanup = endpointmanager.WithPlatformReleaseIPsManager(cnsclient)
}
w, err := fsnotify.New(endpointCleanup, cnsconfig.AsyncPodDeletePath, z)
if err != nil {
z.Error("failed to create fsnotify watcher", zap.Error(err))
return errors.Wrap(err, "failed to create fsnotify watcher, will retry")
Expand Down
1 change: 1 addition & 0 deletions cns/types/codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
StatusUnauthorized ResponseCode = 42
UnsupportedAPI ResponseCode = 43
FailedToAllocateBackendConfig ResponseCode = 44
ConnectionError ResponseCode = 45
UnexpectedError ResponseCode = 99
)

Expand Down
7 changes: 5 additions & 2 deletions network/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package network
import "errors"

var (
errSubnetV6NotFound = errors.New("Couldn't find ipv6 subnet in network info")
errV6SnatRuleNotSet = errors.New("ipv6 snat rule not set. Might be VM ipv6 address missing")
errSubnetV6NotFound = errors.New("Couldn't find ipv6 subnet in network info") // nolint
errV6SnatRuleNotSet = errors.New("ipv6 snat rule not set. Might be VM ipv6 address missing") // nolint
ErrEndpointStateNotFound = errors.New("endpoint state could not be found in the statefile")
ErrConnectionFailure = errors.New("couldn't connect to CNS")
ErrGetEndpointStateFailure = errors.New("failure to obtain the endpoint state")
)
9 changes: 8 additions & 1 deletion network/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/Azure/azure-container-networking/cns"
cnsclient "github.com/Azure/azure-container-networking/cns/client"
"github.com/Azure/azure-container-networking/cns/restserver"
"github.com/Azure/azure-container-networking/cns/types"
"github.com/Azure/azure-container-networking/common"
"github.com/Azure/azure-container-networking/log"
"github.com/Azure/azure-container-networking/netio"
Expand Down Expand Up @@ -456,7 +457,13 @@ func validateUpdateEndpointState(endpointID string, ifNameToIPInfoMap map[string
func (nm *networkManager) GetEndpointState(networkID, containerID string) ([]*EndpointInfo, error) {
endpointResponse, err := nm.CnsClient.GetEndpoint(context.TODO(), containerID)
if err != nil {
return nil, errors.Wrapf(err, "Get endpoint API returned with error")
if endpointResponse.Response.ReturnCode == types.NotFound {
return nil, ErrEndpointStateNotFound
}
if endpointResponse.Response.ReturnCode == types.ConnectionError {
return nil, ErrConnectionFailure
}
return nil, ErrGetEndpointStateFailure
}
epInfos := cnsEndpointInfotoCNIEpInfos(endpointResponse.EndpointInfo, containerID)

Expand Down

0 comments on commit a9fccfa

Please sign in to comment.