Skip to content

Commit

Permalink
[fix] [client] fix same producer/consumer use more than one connectio…
Browse files Browse the repository at this point in the history
…n per broker (#1323)

* [fix] [client] fix same producer/consumer use more than one connection per broker

* Fix lint

* Apply suggestions from code review

Co-authored-by: Zike Yang <zike@apache.org>

* Address comment

* Addressd comment

* make lint

---------

Co-authored-by: Zike Yang <zike@apache.org>
  • Loading branch information
shibd and RobertIndie authored Jan 3, 2025
1 parent ffba2a8 commit 4e71a47
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 24 deletions.
8 changes: 5 additions & 3 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ type partitionConsumer struct {
state uAtomic.Int32
options *partitionConsumerOpts

conn atomic.Pointer[internal.Connection]
conn atomic.Pointer[internal.Connection]
cnxKeySuffix int32

topic string
name string
Expand Down Expand Up @@ -351,6 +352,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
parentConsumer: parent,
client: client,
options: options,
cnxKeySuffix: client.cnxPool.GenerateRoundRobinIndex(),
topic: options.topic,
name: options.consumerName,
consumerID: client.rpcClient.NewConsumerID(),
Expand Down Expand Up @@ -1964,7 +1966,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
cmdSubscribe.ForceTopicCreation = proto.Bool(false)
}

res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
res, err := pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
pb.BaseCommand_SUBSCRIBE, cmdSubscribe)

if err != nil {
Expand All @@ -1975,7 +1977,7 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
ConsumerId: proto.Uint64(pc.consumerID),
RequestId: proto.Uint64(requestID),
}
_, _ = pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
_, _ = pc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, pc.cnxKeySuffix, requestID,
pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
}
return err
Expand Down
28 changes: 28 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5038,3 +5038,31 @@ func TestClientVersion(t *testing.T) {
assert.True(t, strings.HasSuffix(publisher.ClientVersion, "-test-client"))

}

func TestSelectConnectionForSameConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
MaxConnectionsPerBroker: 10,
})
assert.NoError(t, err)
defer client.Close()

topicName := newTopicName()

_consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "sub-1",
Type: Shared,
})
assert.NoError(t, err)
defer _consumer.Close()

partitionConsumerImpl := _consumer.(*consumer).consumers[0]
conn := partitionConsumerImpl._getConn()

for i := 0; i < 5; i++ {
assert.NoError(t, partitionConsumerImpl.grabConn(""))
assert.Equal(t, conn.ID(), partitionConsumerImpl._getConn().ID(),
"The consumer uses a different connection when reconnecting")
}
}
33 changes: 17 additions & 16 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ import (
// ConnectionPool is a interface of connection pool.
type ConnectionPool interface {
// GetConnection get a connection from ConnectionPool.
GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)
GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, keySuffix int32) (Connection, error)

// GetConnections get all connections in the pool.
GetConnections() map[string]Connection

// GenerateRoundRobinIndex generates a round-robin index.
GenerateRoundRobinIndex() int32

// Close all the connections in the pool
Close()
}
Expand All @@ -47,8 +50,8 @@ type connectionPool struct {
connectionTimeout time.Duration
tlsOptions *TLSOptions
auth auth.Provider
maxConnectionsPerHost int32
roundRobinCnt int32
maxConnectionsPerHost uint32
roundRobinCnt uint32
keepAliveInterval time.Duration
closeCh chan struct{}

Expand All @@ -73,7 +76,7 @@ func NewConnectionPool(
tlsOptions: tlsOptions,
auth: auth,
connectionTimeout: connectionTimeout,
maxConnectionsPerHost: int32(maxConnectionsPerHost),
maxConnectionsPerHost: uint32(maxConnectionsPerHost),
keepAliveInterval: keepAliveInterval,
log: logger,
metrics: metrics,
Expand All @@ -84,9 +87,12 @@ func NewConnectionPool(
return p
}

func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
p.log.WithField("logicalAddr", logicalAddr).WithField("physicalAddr", physicalAddr).Debug("Getting pooled connection")
key := p.getMapKey(logicalAddr, physicalAddr)
func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL,
keySuffix int32) (Connection, error) {
p.log.WithField("logicalAddr", logicalAddr).
WithField("physicalAddr", physicalAddr).
WithField("keySuffix", keySuffix).Debug("Getting pooled connection")
key := fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", keySuffix)

p.Lock()
conn, ok := p.connections[key]
Expand Down Expand Up @@ -141,6 +147,10 @@ func (p *connectionPool) GetConnections() map[string]Connection {
return conns
}

func (p *connectionPool) GenerateRoundRobinIndex() int32 {
return int32(atomic.AddUint32(&p.roundRobinCnt, 1) % p.maxConnectionsPerHost)
}

func (p *connectionPool) Close() {
p.Lock()
close(p.closeCh)
Expand All @@ -151,15 +161,6 @@ func (p *connectionPool) Close() {
p.Unlock()
}

func (p *connectionPool) getMapKey(logicalAddr *url.URL, physicalAddr *url.URL) string {
cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
if cnt < 0 {
cnt = -cnt
}
idx := cnt % p.maxConnectionsPerHost
return fmt.Sprint(logicalAddr.Host, "-", physicalAddr.Host, "-", idx)
}

func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime time.Duration) {
if maxIdleTime < 0 {
return
Expand Down
12 changes: 12 additions & 0 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.
}, nil
}

func (c *mockedLookupRPCClient) RequestWithCnxKeySuffix(_ *url.URL, _ *url.URL,
_ int32, _ uint64, _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult, error) {
assert.Fail(c.t, "Shouldn't be called")
return nil, nil
}

func (c *mockedLookupRPCClient) RequestOnCnx(_ Connection, _ uint64, _ pb.BaseCommand_Type,
_ proto.Message) (*RPCResult, error) {
assert.Fail(c.t, "Shouldn't be called")
Expand Down Expand Up @@ -492,6 +498,12 @@ func (m mockedPartitionedTopicMetadataRPCClient) Request(_ *url.URL, _ *url.URL,
return nil, nil
}

func (m *mockedPartitionedTopicMetadataRPCClient) RequestWithCnxKeySuffix(_ *url.URL, _ *url.URL,
_ int32, _ uint64, _ pb.BaseCommand_Type, _ proto.Message) (*RPCResult, error) {
assert.Fail(m.t, "Shouldn't be called")
return nil, nil
}

func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(_ Connection, _ pb.BaseCommand_Type,
_ proto.Message) error {
assert.Fail(m.t, "Shouldn't be called")
Expand Down
11 changes: 10 additions & 1 deletion pulsar/internal/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type RPCClient interface {
RequestToHost(serviceNameResolver *ServiceNameResolver, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

RequestWithCnxKeySuffix(logicalAddr *url.URL, physicalAddr *url.URL, cnxKeySuffix int32, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)

Expand Down Expand Up @@ -154,7 +157,13 @@ func (c *rpcClient) RequestToHost(serviceNameResolver *ServiceNameResolver, requ

func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
return c.RequestWithCnxKeySuffix(logicalAddr, physicalAddr, c.pool.GenerateRoundRobinIndex(),
requestID, cmdType, message)
}

func (c *rpcClient) RequestWithCnxKeySuffix(logicalAddr *url.URL, physicalAddr *url.URL, cnxKeySuffix int32,
requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr, cnxKeySuffix)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ type partitionProducer struct {
topic string
log log.Logger

conn uAtomic.Value
conn uAtomic.Value
cnxKeySuffix int32

options *ProducerOptions
producerName string
Expand Down Expand Up @@ -179,6 +180,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
client: client,
topic: topic,
log: logger,
cnxKeySuffix: client.cnxPool.GenerateRoundRobinIndex(),
options: options,
producerID: client.rpcClient.NewProducerID(),
dataChan: make(chan *sendRequest, maxPendingMessages),
Expand Down Expand Up @@ -301,7 +303,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
cmdProducer.Metadata = toKeyValues(p.options.Properties)
}

cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr)
cnx, err := p.client.cnxPool.GetConnection(lr.LogicalAddr, lr.PhysicalAddr, p.cnxKeySuffix)
// registering the producer first in case broker sends commands in the middle
if err != nil {
p.log.Error("Failed to get connection")
Expand Down
31 changes: 31 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2574,3 +2574,34 @@ func TestProducerKeepReconnectingAndThenCallClose(t *testing.T) {
return true
}, 30*time.Second, 1*time.Second)
}

func TestSelectConnectionForSameProducer(t *testing.T) {
topicName := newTopicName()

client, err := NewClient(ClientOptions{
URL: serviceURL,
MaxConnectionsPerBroker: 10,
})
assert.NoError(t, err)
defer client.Close()

reconnectNum := uint(1)
_producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
MaxReconnectToBroker: &reconnectNum,
})
assert.NoError(t, err)
defer _producer.Close()

partitionProducerImp := _producer.(*producer).producers[0].(*partitionProducer)
conn := partitionProducerImp._getConn()

for i := 0; i < 5; i++ {
partitionProducerImp.grabCnx("")
currentConn := partitionProducerImp._getConn()
assert.Equal(t, conn.ID(), currentConn.ID(),
"The producer uses a different connection when reconnecting")
}

client.Close()
}
6 changes: 4 additions & 2 deletions pulsar/transaction_coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type transactionHandler struct {
tc *transactionCoordinatorClient
state uAtomic.Int32
conn uAtomic.Value
cnxKeySuffix int32
partition uint64
closeCh chan any
requestCh chan any
Expand All @@ -67,6 +68,7 @@ func (t *transactionHandler) getState() txnHandlerState {
func (tc *transactionCoordinatorClient) newTransactionHandler(partition uint64) (*transactionHandler, error) {
handler := &transactionHandler{
tc: tc,
cnxKeySuffix: tc.client.cnxPool.GenerateRoundRobinIndex(),
partition: partition,
closeCh: make(chan any),
requestCh: make(chan any),
Expand Down Expand Up @@ -95,8 +97,8 @@ func (t *transactionHandler) grabConn() error {
TcId: proto.Uint64(t.partition),
}

res, err := t.tc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST, &cmdTCConnect)
res, err := t.tc.client.rpcClient.RequestWithCnxKeySuffix(lr.LogicalAddr, lr.PhysicalAddr, t.cnxKeySuffix,
requestID, pb.BaseCommand_TC_CLIENT_CONNECT_REQUEST, &cmdTCConnect)

if err != nil {
t.log.WithError(err).Error("Failed to connect transaction_impl coordinator " +
Expand Down

0 comments on commit 4e71a47

Please sign in to comment.