Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closing consumer can leak a consumer when the connection is lost #1326

Open
Gilthoniel opened this issue Jan 23, 2025 · 0 comments
Open

Closing consumer can leak a consumer when the connection is lost #1326

Gilthoniel opened this issue Jan 23, 2025 · 0 comments

Comments

@Gilthoniel
Copy link
Contributor

Expected behavior

When calling the Close function on a consumer, it should be removed from the subscription in the broker eventually.

Actual behavior

When calling the Close function on a consumer and when the connection is interrupted during the RPC, the consumer ends up in a closed state as the point of view of the client, but is still active in the broker (and active so blocking in failover mode for instance).

Steps to reproduce

Hard to reproduce as it happens when closing a consumer and the connection is lost at the same time. It is however clear in the code:

_, err := pc.client.rpcClient.RequestOnCnx(pc._getConn(), requestID, pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
if err != nil {
	pc.log.WithError(err).Warn("Failed to close consumer")
} else {
	pc.log.Info("Closed consumer")
}

Here it only logs that closing has failed but it then continues and stops everything else resulting in a consumer in the broker like this:

"activeConsumerName" : "1b81691e",
"consumers" : [ {
        "appId" : "***",
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "bytesOutCounter" : 0,
        "msgOutCounter" : 0,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "1b81691e",
        "availablePermits" : 0,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "drainingHashesCount" : 0,
        "drainingHashesClearedTotal" : 0,
        "drainingHashesUnackedMessages" : 0,
        "address" : "/10.128.15.78:35170",
        "connectedSince" : "2025-01-23T13:28:58.27512622Z",
        "clientVersion" : "Pulsar Go v0.14.0",
        "lastAckedTimestamp" : 0,
        "lastConsumedTimestamp" : 0,
        "lastConsumedFlowTimestamp" : 0,
        "metadata" : { },
        "lastAckedTime" : "1970-01-01T00:00:00Z",
        "lastConsumedTime" : "1970-01-01T00:00:00Z"
      }

System configuration

Pulsar version: v4.0.1
Pulsar client: v0.14.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant