diff --git a/pkg/engine/datasource/graphql_datasource/graphql_ws_handler.go b/pkg/engine/datasource/graphql_datasource/graphql_ws_handler.go index 6f8cdae6e..a84ff7bae 100644 --- a/pkg/engine/datasource/graphql_datasource/graphql_ws_handler.go +++ b/pkg/engine/datasource/graphql_datasource/graphql_ws_handler.go @@ -103,12 +103,18 @@ func (h *gqlWSConnectionHandler) readBlocking(ctx context.Context, dataCh chan [ for { msgType, data, err := h.conn.Read(ctx) if ctx.Err() != nil { - errCh <- ctx.Err() - return + select { + case errCh <- ctx.Err(): + case <-ctx.Done(): + return + } } if err != nil { - errCh <- err - return + select { + case errCh <- err: + case <-ctx.Done(): + return + } } if msgType != websocket.MessageText { continue diff --git a/pkg/subscription/legacy_handler.go b/pkg/subscription/legacy_handler.go index 0a1ae7858..5e25738ca 100644 --- a/pkg/subscription/legacy_handler.go +++ b/pkg/subscription/legacy_handler.go @@ -388,6 +388,9 @@ func (h *Handler) handleKeepAlive(ctx context.Context) { case <-ctx.Done(): return case <-time.After(h.keepAliveInterval): + if !h.client.IsConnected() { + return + } h.sendKeepAlive() } }