Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ensure dials are sorted based on their recent success #2164

28 changes: 3 additions & 25 deletions packages/libp2p/src/connection-manager/auto-dial.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { logger } from '@libp2p/logger'
import { PeerMap, PeerSet } from '@libp2p/peer-collections'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { PeerJobQueue } from '../utils/peer-job-queue.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PEER_RETRY_THRESHOLD, AUTO_DIAL_PRIORITY, LAST_DIAL_FAILURE_KEY, MIN_CONNECTIONS } from './constants.js'
import { AUTO_DIAL_CONCURRENCY, AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE, AUTO_DIAL_INTERVAL, AUTO_DIAL_MAX_QUEUE_LENGTH, AUTO_DIAL_PRIORITY, MIN_CONNECTIONS } from './constants.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { TypedEventTarget } from '@libp2p/interface/events'
import type { PeerStore } from '@libp2p/interface/peer-store'
Expand All @@ -17,7 +16,6 @@ interface AutoDialInit {
autoDialConcurrency?: number
autoDialPriority?: number
autoDialInterval?: number
autoDialPeerRetryThreshold?: number
autoDialDiscoveredPeersDebounce?: number
}

Expand All @@ -33,7 +31,6 @@ const defaultOptions = {
autoDialConcurrency: AUTO_DIAL_CONCURRENCY,
autoDialPriority: AUTO_DIAL_PRIORITY,
autoDialInterval: AUTO_DIAL_INTERVAL,
autoDialPeerRetryThreshold: AUTO_DIAL_PEER_RETRY_THRESHOLD,
autoDialDiscoveredPeersDebounce: AUTO_DIAL_DISCOVERED_PEERS_DEBOUNCE
}

Expand All @@ -45,7 +42,6 @@ export class AutoDial implements Startable {
private readonly autoDialPriority: number
private readonly autoDialIntervalMs: number
private readonly autoDialMaxQueueLength: number
private readonly autoDialPeerRetryThresholdMs: number
private readonly autoDialDiscoveredPeersDebounce: number
private autoDialInterval?: ReturnType<typeof setInterval>
private started: boolean
Expand All @@ -63,7 +59,6 @@ export class AutoDial implements Startable {
this.autoDialPriority = init.autoDialPriority ?? defaultOptions.autoDialPriority
this.autoDialIntervalMs = init.autoDialInterval ?? defaultOptions.autoDialInterval
this.autoDialMaxQueueLength = init.maxQueueLength ?? defaultOptions.maxQueueLength
this.autoDialPeerRetryThresholdMs = init.autoDialPeerRetryThreshold ?? defaultOptions.autoDialPeerRetryThreshold
this.autoDialDiscoveredPeersDebounce = init.autoDialDiscoveredPeersDebounce ?? defaultOptions.autoDialDiscoveredPeersDebounce
this.started = false
this.running = false
Expand Down Expand Up @@ -232,26 +227,9 @@ export class AutoDial implements Startable {
return 0
})

const peersThatHaveNotFailed = sortedPeers.filter(peer => {
const lastDialFailure = peer.metadata.get(LAST_DIAL_FAILURE_KEY)
log('selected %d/%d peers to dial', sortedPeers.length, peers.length)

if (lastDialFailure == null) {
return true
}

const lastDialFailureTimestamp = parseInt(uint8ArrayToString(lastDialFailure))

if (isNaN(lastDialFailureTimestamp)) {
return true
}

// only dial if the time since the last failure is above the retry threshold
return Date.now() - lastDialFailureTimestamp > this.autoDialPeerRetryThresholdMs
})

log('selected %d/%d peers to dial', peersThatHaveNotFailed.length, peers.length)

for (const peer of peersThatHaveNotFailed) {
for (const peer of sortedPeers) {
this.queue.add(async () => {
const numConnections = this.connectionManager.getConnectionsMap().size

Expand Down
5 changes: 0 additions & 5 deletions packages/libp2p/src/connection-manager/constants.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,3 @@ export const MAX_CONNECTIONS = 100
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDials
*/
export const MAX_PARALLEL_DIALS = 50

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialPeerRetryThreshold
*/
export const AUTO_DIAL_PEER_RETRY_THRESHOLD = 1000 * 60 * 7
10 changes: 0 additions & 10 deletions packages/libp2p/src/connection-manager/constants.defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,3 @@ export const INBOUND_CONNECTION_THRESHOLD = 5
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxIncomingPendingConnections
*/
export const MAX_INCOMING_PENDING_CONNECTIONS = 10

/**
* Store as part of the peer store metadata for a given peer, the value for this
* key is a timestamp of the last time a dial attempted failed with the relevant
* peer stored as a string.
*
* Used to insure we do not endlessly try to auto dial peers we have recently
* failed to dial.
*/
export const LAST_DIAL_FAILURE_KEY = 'last-dial-failure'
5 changes: 0 additions & 5 deletions packages/libp2p/src/connection-manager/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,3 @@ export const MAX_CONNECTIONS = 300
* @see https://libp2p.github.io/js-libp2p/interfaces/index._internal_.ConnectionManagerConfig.html#maxParallelDials
*/
export const MAX_PARALLEL_DIALS = 100

/**
* @see https://libp2p.github.io/js-libp2p/interfaces/libp2p.index.unknown.ConnectionManagerInit.html#autoDialPeerRetryThreshold
*/
export const AUTO_DIAL_PEER_RETRY_THRESHOLD = 1000 * 60
97 changes: 74 additions & 23 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,21 @@ import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
import { type ClearableSignal, anySignal } from 'any-signal'
import pDefer from 'p-defer'
import PQueue from 'p-queue'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import {
DIAL_TIMEOUT,
MAX_PARALLEL_DIALS_PER_PEER,
MAX_PARALLEL_DIALS,
MAX_PEER_ADDRS_TO_DIAL,
LAST_DIAL_FAILURE_KEY
MAX_PEER_ADDRS_TO_DIAL
} from './constants.js'
import { combineSignals, resolveMultiaddrs } from './utils.js'
import type { AddressSorter, AbortOptions, PendingDial } from '@libp2p/interface'
import type { Connection } from '@libp2p/interface/connection'
import type { ConnectionGater } from '@libp2p/interface/connection-gater'
import type { Metric, Metrics } from '@libp2p/interface/metrics'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Address, PeerStore } from '@libp2p/interface/peer-store'
import type { Address, Peer, PeerStore } from '@libp2p/interface/peer-store'
import type { TransportManager } from '@libp2p/interface-internal/transport-manager'

const log = logger('libp2p:connection-manager:dial-queue')
Expand Down Expand Up @@ -231,19 +229,6 @@ export class DialQueue {
.catch(async err => {
log.error('dial failed to %s', pendingDial.multiaddrs.map(ma => ma.toString()).join(', '), err)

if (peerId != null) {
// record the last failed dial
try {
await this.peerStore.patch(peerId, {
metadata: {
[LAST_DIAL_FAILURE_KEY]: uint8ArrayFromString(Date.now().toString())
}
})
} catch (err: any) {
log.error('could not update last dial failure key for %p', peerId, err)
}
}

// Error is a timeout
if (signal.aborted) {
const error = new CodeError(err.message, codes.ERR_TIMEOUT)
Expand Down Expand Up @@ -276,8 +261,7 @@ export class DialQueue {
return signal
}

// eslint-disable-next-line complexity
private async calculateMultiaddrs (peerId?: PeerId, addrs: Address[] = [], options: DialOptions = {}): Promise<Address[]> {
async calculateMultiaddrs (peerId?: PeerId, addrs: Address[] = [], options: DialOptions = {}): Promise<Address[]> {
// if a peer id or multiaddr(s) with a peer id, make sure it isn't our peer id and that we are allowed to dial it
if (peerId != null) {
if (this.peerId.equals(peerId)) {
Expand Down Expand Up @@ -377,6 +361,8 @@ export class DialQueue {
// append peer id to multiaddr if it is not already present
if (addressPeerId !== peerId.toString()) {
return {
lastFailure: addr.lastFailure,
lastSuccess: addr.lastSuccess,
multiaddr: addr.multiaddr.encapsulate(peerIdMultiaddr),
isCertified: addr.isCertified
}
Expand All @@ -403,9 +389,50 @@ export class DialQueue {
throw new CodeError('The connection gater denied all addresses in the dial request', codes.ERR_NO_VALID_ADDRESSES)
}

sortedGatedAddrs.sort((a, b) => this.sortMultiaddrsByDialability(a, b))

return sortedGatedAddrs
}

private sortMultiaddrsByDialability (a: Address, b: Address): number {
const { lastSuccess: lastSuccessA, lastFailure: lastFailureA } = a
const { lastSuccess: lastSuccessB, lastFailure: lastFailureB } = b

if (lastSuccessA !== undefined && lastSuccessB !== undefined) {
if (lastSuccessA > lastSuccessB) {
return -1
} else {
return 1
}
}

if (lastFailureA !== undefined && lastFailureB !== undefined) {
if (lastFailureA > lastFailureB) {
return 1
} else {
return -1
}
}

if (lastSuccessA !== undefined) {
return -1
}

if (lastSuccessB !== undefined) {
return 1
}

if (lastFailureA !== undefined) {
return 1
}

if (lastFailureB !== undefined) {
return -1
}

return 0
}

private async performDial (pendingDial: PendingDialInternal, options: DialOptions = {}): Promise<Connection> {
const dialAbortControllers: Array<(AbortController | undefined)> = pendingDial.multiaddrs.map(() => new AbortController())

Expand Down Expand Up @@ -454,6 +481,17 @@ export class DialQueue {
pendingDial.status = 'active'

let conn: Connection
let peer: Peer | undefined

try {
if (pendingDial.peerId != null) {
peer = await this.peerStore.get(pendingDial.peerId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this peerStore.get should happen in _updateAddressStatus and leave _updateAddressStatus signature alone?

I think it tracks more nicely to have the get+update in the "same place".
Also thats one less promise that has to be fulfilled before we get to dial.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take your point but when refactored that way a potential ERR_NOT_FOUND would be propagated in lieu of a consequent and perhaps more meaningful error such as a NO_RESERVATION error.

}
} catch (err: any) {
if (err.code === codes.ERR_NOT_FOUND) {
log.trace('peer %p not found in peer store, could be a new multiaddr', pendingDial.peerId)
}
}

try {
conn = await this.transportManager.dial(addr, {
Expand All @@ -462,11 +500,11 @@ export class DialQueue {
})

// mark multiaddr dial as successful
await this._updateAddressStatus(conn.remotePeer, addr, true)
await this._updateAddressStatus(conn.remotePeer, addr, true, peer)
} catch (err: any) {
if (pendingDial.peerId != null) {
// mark multiaddr dial as failure
await this._updateAddressStatus(pendingDial.peerId, addr, false)
await this._updateAddressStatus(pendingDial.peerId, addr, false, peer)
}

// rethrow error
Expand Down Expand Up @@ -547,15 +585,28 @@ export class DialQueue {
/**
* Record the last dial success/failure status of the passed multiaddr
*/
private async _updateAddressStatus (peerId: PeerId, multiaddr: Multiaddr, success: boolean): Promise<void> {
private async _updateAddressStatus (peerId: PeerId, multiaddr: Multiaddr, success: boolean, peer?: Peer): Promise<void> {
const addr: Address = {
multiaddr
}

if (peer !== undefined) {
const existingAddr = peer.addresses.find(a => a.multiaddr.equals(multiaddr))

if (existingAddr !== undefined) {
addr.lastSuccess = existingAddr.lastSuccess
addr.lastFailure = existingAddr.lastFailure
}
}

if (success) {
addr.lastSuccess = Date.now()
} else {
addr.lastFailure = Date.now()
if (addr.lastFailure !== undefined) {
addr.lastFailure *= 2
} else {
addr.lastFailure = Date.now()
Comment on lines +606 to +608
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something seems wrong here.
Is lastFailure a timeout value or a date?
If its a date, then multiplying it by 2 is wrong.
If its a timeout value, setting as the current date is wrong.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lastFailure value is a unix timestamp but it's being utilized as a value for ordering the list of multiaddrs, therefore exponentially increasing it so that it will be penalized even further to the bottom of the list seems a valid approach i.e. it could be any unique incremental numerical value as long as it is sequentially increasing to create an order within the list of addrs.

}
}

await this.peerStore.merge(peerId, {
Expand Down
6 changes: 0 additions & 6 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ export interface ConnectionManagerInit {
*/
autoDialMaxQueueLength?: number

/**
* When we've failed to dial a peer, do not autodial them again within this
* number of ms. (default: 1 minute, 7 minutes in browsers)
*/
autoDialPeerRetryThreshold?: number

/**
* Newly discovered peers may be auto-dialed to increase the number of open
* connections, but they can be discovered in quick succession so add a small
Expand Down
67 changes: 0 additions & 67 deletions packages/libp2p/test/connection-manager/auto-dial.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import delay from 'delay'
import pWaitFor from 'p-wait-for'
import Sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { AutoDial } from '../../src/connection-manager/auto-dial.js'
import { LAST_DIAL_FAILURE_KEY } from '../../src/connection-manager/constants.js'
import { matchPeerId } from '../fixtures/match-peer-id.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { Connection } from '@libp2p/interface/connection'
Expand Down Expand Up @@ -226,69 +224,4 @@ describe('auto-dial', () => {
// should only have queried peer store once
expect(peerStoreAllSpy.callCount).to.equal(1)
})

it('should not re-dial peers we have recently failed to dial', async () => {
const peerWithAddress: Peer = {
id: await createEd25519PeerId(),
protocols: [],
addresses: [{
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4001'),
isCertified: true
}],
metadata: new Map(),
tags: new Map()
}
const undialablePeer: Peer = {
id: await createEd25519PeerId(),
protocols: [],
addresses: [{
multiaddr: multiaddr('/ip4/127.0.0.1/tcp/4002'),
isCertified: true
}],
// we failed to dial them recently
metadata: new Map([[LAST_DIAL_FAILURE_KEY, uint8ArrayFromString(`${Date.now() - 10}`)]]),
tags: new Map()
}

await peerStore.save(peerWithAddress.id, peerWithAddress)
await peerStore.save(undialablePeer.id, undialablePeer)

const connectionManager = stubInterface<ConnectionManager>({
getConnectionsMap: Sinon.stub().returns(new PeerMap()),
getDialQueue: Sinon.stub().returns([])
})

autoDialler = new AutoDial({
peerStore,
connectionManager,
events
}, {
minConnections: 10,
autoDialPeerRetryThreshold: 2000
})
autoDialler.start()

void autoDialler.autoDial()

await pWaitFor(() => {
return connectionManager.openConnection.callCount === 1
})

expect(connectionManager.openConnection.callCount).to.equal(1)
expect(connectionManager.openConnection.calledWith(matchPeerId(peerWithAddress.id))).to.be.true()
expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.false()

// pass the retry threshold
await delay(2000)

// autodial again
void autoDialler.autoDial()

await pWaitFor(() => {
return connectionManager.openConnection.callCount === 3
})

// should have retried the unreachable peer
expect(connectionManager.openConnection.calledWith(matchPeerId(undialablePeer.id))).to.be.true()
})
})
Loading