-
Notifications
You must be signed in to change notification settings - Fork 459
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: ensure dials are sorted based on their recent success #2164
Changes from 7 commits
dce8861
66ee265
10b461e
33457c0
6cef35c
0701485
cae34f6
05f191f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,23 +7,21 @@ import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers' | |
import { type ClearableSignal, anySignal } from 'any-signal' | ||
import pDefer from 'p-defer' | ||
import PQueue from 'p-queue' | ||
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' | ||
import { codes } from '../errors.js' | ||
import { getPeerAddress } from '../get-peer.js' | ||
import { | ||
DIAL_TIMEOUT, | ||
MAX_PARALLEL_DIALS_PER_PEER, | ||
MAX_PARALLEL_DIALS, | ||
MAX_PEER_ADDRS_TO_DIAL, | ||
LAST_DIAL_FAILURE_KEY | ||
MAX_PEER_ADDRS_TO_DIAL | ||
} from './constants.js' | ||
import { combineSignals, resolveMultiaddrs } from './utils.js' | ||
import type { AddressSorter, AbortOptions, PendingDial } from '@libp2p/interface' | ||
import type { Connection } from '@libp2p/interface/connection' | ||
import type { ConnectionGater } from '@libp2p/interface/connection-gater' | ||
import type { Metric, Metrics } from '@libp2p/interface/metrics' | ||
import type { PeerId } from '@libp2p/interface/peer-id' | ||
import type { Address, PeerStore } from '@libp2p/interface/peer-store' | ||
import type { Address, Peer, PeerStore } from '@libp2p/interface/peer-store' | ||
import type { TransportManager } from '@libp2p/interface-internal/transport-manager' | ||
|
||
const log = logger('libp2p:connection-manager:dial-queue') | ||
|
@@ -234,19 +232,6 @@ export class DialQueue { | |
.catch(async err => { | ||
log.error('dial failed to %s', pendingDial.multiaddrs.map(ma => ma.toString()).join(', '), err) | ||
|
||
if (peerId != null) { | ||
// record the last failed dial | ||
try { | ||
await this.peerStore.patch(peerId, { | ||
metadata: { | ||
[LAST_DIAL_FAILURE_KEY]: uint8ArrayFromString(Date.now().toString()) | ||
} | ||
}) | ||
} catch (err: any) { | ||
log.error('could not update last dial failure key for %p', peerId, err) | ||
} | ||
} | ||
|
||
// Error is a timeout | ||
if (signal.aborted) { | ||
const error = new CodeError(err.message, codes.ERR_TIMEOUT) | ||
|
@@ -279,8 +264,7 @@ export class DialQueue { | |
return signal | ||
} | ||
|
||
// eslint-disable-next-line complexity | ||
private async calculateMultiaddrs (peerId?: PeerId, addrs: Address[] = [], options: DialOptions = {}): Promise<Address[]> { | ||
async calculateMultiaddrs (peerId?: PeerId, addrs: Address[] = [], options: DialOptions = {}): Promise<Address[]> { | ||
// if a peer id or multiaddr(s) with a peer id, make sure it isn't our peer id and that we are allowed to dial it | ||
if (peerId != null) { | ||
if (this.peerId.equals(peerId)) { | ||
|
@@ -380,6 +364,8 @@ export class DialQueue { | |
// append peer id to multiaddr if it is not already present | ||
if (addressPeerId !== peerId.toString()) { | ||
return { | ||
lastFailure: addr.lastFailure, | ||
lastSuccess: addr.lastSuccess, | ||
multiaddr: addr.multiaddr.encapsulate(peerIdMultiaddr), | ||
isCertified: addr.isCertified | ||
} | ||
|
@@ -406,9 +392,50 @@ export class DialQueue { | |
throw new CodeError('The connection gater denied all addresses in the dial request', codes.ERR_NO_VALID_ADDRESSES) | ||
} | ||
|
||
sortedGatedAddrs.sort((a, b) => this.sortMultiaddrsByDialability(a, b)) | ||
|
||
return sortedGatedAddrs | ||
} | ||
|
||
private sortMultiaddrsByDialability (a: Address, b: Address): number { | ||
const { lastSuccess: lastSuccessA, lastFailure: lastFailureA } = a | ||
const { lastSuccess: lastSuccessB, lastFailure: lastFailureB } = b | ||
|
||
if (lastSuccessA !== undefined && lastSuccessB !== undefined) { | ||
if (lastSuccessA > lastSuccessB) { | ||
return -1 | ||
} else { | ||
return 1 | ||
} | ||
} | ||
|
||
if (lastFailureA !== undefined && lastFailureB !== undefined) { | ||
if (lastFailureA > lastFailureB) { | ||
return 1 | ||
} else { | ||
return -1 | ||
} | ||
} | ||
|
||
if (lastSuccessA !== undefined) { | ||
return -1 | ||
} | ||
|
||
if (lastSuccessB !== undefined) { | ||
return 1 | ||
} | ||
|
||
if (lastFailureA !== undefined) { | ||
return 1 | ||
} | ||
|
||
if (lastFailureB !== undefined) { | ||
return -1 | ||
} | ||
|
||
return 0 | ||
} | ||
|
||
private async performDial (pendingDial: PendingDialInternal, options: DialOptions = {}): Promise<Connection> { | ||
const dialAbortControllers: Array<(AbortController | undefined)> = pendingDial.multiaddrs.map(() => new AbortController()) | ||
|
||
|
@@ -457,6 +484,17 @@ export class DialQueue { | |
pendingDial.status = 'active' | ||
|
||
let conn: Connection | ||
let peer: Peer | undefined | ||
|
||
try { | ||
if (pendingDial.peerId != null) { | ||
peer = await this.peerStore.get(pendingDial.peerId) | ||
} | ||
} catch (err: any) { | ||
if (err.code === codes.ERR_NOT_FOUND) { | ||
log.trace('peer %p not found in peer store, could be a new multiaddr', pendingDial.peerId) | ||
} | ||
} | ||
|
||
try { | ||
conn = await this.transportManager.dial(addr, { | ||
|
@@ -465,11 +503,11 @@ export class DialQueue { | |
}) | ||
|
||
// mark multiaddr dial as successful | ||
await this._updateAddressStatus(conn.remotePeer, addr, true) | ||
await this._updateAddressStatus(conn.remotePeer, addr, true, peer) | ||
} catch (err: any) { | ||
if (pendingDial.peerId != null) { | ||
// mark multiaddr dial as failure | ||
await this._updateAddressStatus(pendingDial.peerId, addr, false) | ||
await this._updateAddressStatus(pendingDial.peerId, addr, false, peer) | ||
} | ||
|
||
// rethrow error | ||
|
@@ -550,15 +588,28 @@ export class DialQueue { | |
/** | ||
* Record the last dial success/failure status of the passed multiaddr | ||
*/ | ||
private async _updateAddressStatus (peerId: PeerId, multiaddr: Multiaddr, success: boolean): Promise<void> { | ||
private async _updateAddressStatus (peerId: PeerId, multiaddr: Multiaddr, success: boolean, peer?: Peer): Promise<void> { | ||
const addr: Address = { | ||
multiaddr | ||
} | ||
|
||
if (peer !== undefined) { | ||
const existingAddr = peer.addresses.find(a => a.multiaddr.equals(multiaddr)) | ||
|
||
if (existingAddr !== undefined) { | ||
addr.lastSuccess = existingAddr.lastSuccess | ||
addr.lastFailure = existingAddr.lastFailure | ||
} | ||
} | ||
|
||
if (success) { | ||
addr.lastSuccess = Date.now() | ||
} else { | ||
addr.lastFailure = Date.now() | ||
if (addr.lastFailure !== undefined) { | ||
addr.lastFailure *= 2 | ||
} else { | ||
addr.lastFailure = Date.now() | ||
Comment on lines
+606
to
+608
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something seems wrong here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
} | ||
} | ||
|
||
await this.peerStore.merge(peerId, { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe this peerStore.get should happen in _updateAddressStatus and leave _updateAddressStatus signature alone?
I think it tracks more nicely to have the get+update in the "same place".
Also thats one less promise that has to be fulfilled before we get to dial.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I take your point but when refactored that way a potential
ERR_NOT_FOUND
would be propagated in lieu of a consequent and perhaps more meaningful error such as aNO_RESERVATION
error.