Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): allow using a specific node #2192

Merged
merged 17 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 114 additions & 20 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import type { Peer, PeerId, PeerInfo, PeerStore } from "@libp2p/interface";
import { TypedEventEmitter } from "@libp2p/interface";
import {
type Connection,
isPeerId,
type Peer,
type PeerId,
type PeerInfo,
type PeerStore,
type Stream,
TypedEventEmitter
} from "@libp2p/interface";
import { Multiaddr, multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import {
ConnectionManagerOptions,
DiscoveryTrigger,
Expand Down Expand Up @@ -36,7 +45,7 @@
private options: ConnectionManagerOptions;
private libp2p: Libp2p;
private dialAttemptsForPeer: Map<string, number> = new Map();
private dialErrorsForPeer: Map<string, any> = new Map();

Check warning on line 48 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 48 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

private currentActiveParallelDialCount = 0;
private pendingPeerDialQueue: Array<PeerId> = [];
Expand Down Expand Up @@ -219,15 +228,60 @@
this.startNetworkStatusListener();
}

private async dialPeer(peerId: PeerId): Promise<void> {
/**
* Attempts to establish a connection with a peer and set up specified protocols.
* The method handles both PeerId and Multiaddr inputs, manages connection attempts,
* and maintains the connection state.
*
* The dialing process includes:
* 1. Converting input to dialable peer info
* 2. Managing parallel dial attempts
* 3. Attempting to establish protocol-specific connections
* 4. Handling connection failures and retries
* 5. Updating the peer store and connection state
*
* @param {PeerId | MultiaddrInput} peer - The peer to connect to, either as a PeerId or multiaddr
* @param {string[]} [protocolCodecs] - Optional array of protocol-specific codec strings to establish
* (e.g., for LightPush, Filter, Store protocols)
*
* @throws {Error} If the multiaddr is missing a peer ID
* @throws {Error} If the maximum dial attempts are reached and the peer cannot be dialed
* @throws {Error} If there's an error deleting an undialable peer from the peer store
*
* @example
* ```typescript
* // Dial using PeerId
* await connectionManager.dialPeer(peerId);
*
* // Dial using multiaddr with specific protocols
* await connectionManager.dialPeer(multiaddr, [
* "/vac/waku/relay/2.0.0",
* "/vac/waku/lightpush/2.0.0-beta1"
* ]);
* ```
*
* @remarks
* - The method implements exponential backoff through multiple dial attempts
* - Maintains a queue for parallel dial attempts (limited by maxParallelDials)
* - Integrates with the KeepAliveManager for connection maintenance
* - Updates the peer store and connection state after successful/failed attempts
* - If all dial attempts fail, triggers DNS discovery as a fallback
*/
public async dialPeer(peer: PeerId | MultiaddrInput): Promise<Connection> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not needed to be public

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

leaving it public for now, would be interesting to experiment with this

let connection: Connection | undefined;
let peerId: PeerId | undefined;
const peerDialInfo = this.getDialablePeerInfo(peer);
const peerIdStr = isPeerId(peerDialInfo)
? peerDialInfo.toString()
: peerDialInfo.getPeerId()!;

this.currentActiveParallelDialCount += 1;
let dialAttempt = 0;
while (dialAttempt < this.options.maxDialAttemptsForPeer) {
try {
log.info(
`Dialing peer ${peerId.toString()} on attempt ${dialAttempt + 1}`
);
await this.libp2p.dial(peerId);
log.info(`Dialing peer ${peerDialInfo} on attempt ${dialAttempt + 1}`);
connection = await this.libp2p.dial(peerDialInfo);
peerId = connection.remotePeer;

const tags = await this.getTagNamesForPeer(peerId);
// add tag to connection describing discovery mechanism
Expand All @@ -246,21 +300,17 @@
} catch (error) {
if (error instanceof AggregateError) {
// Handle AggregateError
log.error(
`Error dialing peer ${peerId.toString()} - ${error.errors}`
);
log.error(`Error dialing peer ${peerIdStr} - ${error.errors}`);
} else {
// Handle generic error
log.error(
`Error dialing peer ${peerId.toString()} - ${
(error as any).message
}`
`Error dialing peer ${peerIdStr} - ${(error as any).message}`

Check warning on line 307 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 307 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
);
}
this.dialErrorsForPeer.set(peerId.toString(), error);
this.dialErrorsForPeer.set(peerIdStr, error);

dialAttempt++;
this.dialAttemptsForPeer.set(peerId.toString(), dialAttempt);
this.dialAttemptsForPeer.set(peerIdStr, dialAttempt);
}
}

Expand All @@ -271,7 +321,7 @@
// If max dial attempts reached and dialing failed, delete the peer
if (dialAttempt === this.options.maxDialAttemptsForPeer) {
try {
const error = this.dialErrorsForPeer.get(peerId.toString());
const error = this.dialErrorsForPeer.get(peerIdStr);

if (error) {
let errorMessage;
Expand All @@ -288,21 +338,65 @@
}

log.info(
`Deleting undialable peer ${peerId.toString()} from peer store. Reason: ${errorMessage}`
`Deleting undialable peer ${peerIdStr} from peer store. Reason: ${errorMessage}`
);
}

this.dialErrorsForPeer.delete(peerId.toString());
await this.libp2p.peerStore.delete(peerId);
this.dialErrorsForPeer.delete(peerIdStr);
if (peerId) {
await this.libp2p.peerStore.delete(peerId);
}

// if it was last available peer - attempt DNS discovery
await this.attemptDnsDiscovery();
} catch (error) {
throw new Error(
`Error deleting undialable peer ${peerId.toString()} from peer store - ${error}`
`Error deleting undialable peer ${peerIdStr} from peer store - ${error}`
);
}
}

if (!connection) {
throw new Error(`Failed to dial peer ${peerDialInfo}`);
}

return connection;
}

/**
* Dial a peer with specific protocols.
* This method is a raw proxy to the libp2p dialProtocol method.
* @param peer - The peer to connect to, either as a PeerId or multiaddr
* @param protocolCodecs - Optional array of protocol-specific codec strings to establish
* @returns A stream to the peer
*/
public async rawDialPeerWithProtocols(
peer: PeerId | MultiaddrInput,
protocolCodecs: string[]
): Promise<Stream> {
const peerDialInfo = this.getDialablePeerInfo(peer);
return await this.libp2p.dialProtocol(peerDialInfo, protocolCodecs);
}

/**
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
* Internal utility to extract a PeerId or Multiaddr from a peer input.
* This is used internally by the connection manager to handle different peer input formats.
* @internal
*/
private getDialablePeerInfo(
peer: PeerId | MultiaddrInput
): PeerId | Multiaddr {
if (isPeerId(peer)) {
return peer;
} else {
// peer is of MultiaddrInput type
const ma = multiaddr(peer);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: can multiaddr throw?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the class implementation can throw in several cases, mostly parsing of the multiaddr passed to it; why do you ask?

const peerIdStr = ma.getPeerId();
if (!peerIdStr) {
throw new Error("Failed to dial multiaddr: missing peer ID");
}
return ma;
}
}

private async attemptDnsDiscovery(): Promise<void> {
Expand Down
8 changes: 8 additions & 0 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ export type IBaseProtocolSDK = {
readonly numPeersToUse: number;
};

export type StoreProtocolOptions = {
peer: string;
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
};

export type NetworkConfig = StaticSharding | AutoSharding;

//TODO: merge this with ProtocolCreateOptions or establish distinction: https://github.com/waku-org/js-waku/issues/2048
Expand Down Expand Up @@ -106,6 +110,10 @@ export type ProtocolCreateOptions = {
* List of peers to use to bootstrap the node. Ignored if defaultBootstrap is set to true.
*/
bootstrapPeers?: string[];
/**
* Options for the Store protocol.
*/
store?: Partial<StoreProtocolOptions>;
};

export type Callback<T extends IDecodedMessage> = (
Expand Down
43 changes: 33 additions & 10 deletions packages/sdk/src/protocols/store/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import type { Peer } from "@libp2p/interface";
import { ConnectionManager, StoreCore } from "@waku/core";
import {
IDecodedMessage,
IDecoder,
IStore,
Libp2p,
QueryRequestParams,
StoreCursor
StoreCursor,
StoreProtocolOptions
} from "@waku/interfaces";
import { messageHash } from "@waku/message-hash";
import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils";
Expand All @@ -23,7 +25,11 @@ const log = new Logger("waku:store:sdk");
export class Store extends BaseProtocolSDK implements IStore {
public readonly protocol: StoreCore;

public constructor(connectionManager: ConnectionManager, libp2p: Libp2p) {
public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
private options?: Partial<StoreProtocolOptions>
) {
super(
new StoreCore(connectionManager.configuredPubsubTopics, libp2p),
connectionManager,
Expand Down Expand Up @@ -58,12 +64,8 @@ export class Store extends BaseProtocolSDK implements IStore {
...options
};

const peer = (
await this.protocol.getPeers({
numPeers: this.numPeersToUse,
maxBootstrapPeers: 1
})
)[0];
const peer = await this.getPeerToUse();

if (!peer) {
log.error("No peers available to query");
throw new Error("No peers available to query");
Expand Down Expand Up @@ -228,6 +230,26 @@ export class Store extends BaseProtocolSDK implements IStore {
decodersAsMap
};
}

private async getPeerToUse(): Promise<Peer | null> {
const peer = this.connectedPeers.find(
(p) => p.id.toString() === this.options?.peer
);
if (peer) {
return peer;
}

log.warn(
`Passed node to use for Store not found: ${this.options?.peer}. Attempting to use random peers.`
);

return (
await this.protocol.getPeers({
numPeers: this.numPeersToUse,
maxBootstrapPeers: 1
})
)[0];
}
}

/**
Expand All @@ -237,9 +259,10 @@ export class Store extends BaseProtocolSDK implements IStore {
* @returns A function that takes a Libp2p instance and returns a StoreSDK instance.
*/
export function wakuStore(
connectionManager: ConnectionManager
connectionManager: ConnectionManager,
options?: Partial<StoreProtocolOptions>
): (libp2p: Libp2p) => IStore {
return (libp2p: Libp2p) => {
return new Store(connectionManager, libp2p);
return new Store(connectionManager, libp2p, options);
};
}
22 changes: 15 additions & 7 deletions packages/sdk/src/waku/waku.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { Stream } from "@libp2p/interface";
import { isPeerId, PeerId } from "@libp2p/interface";
import { isPeerId, PeerId, type Stream } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, getHealthManager } from "@waku/core";
import { ConnectionManager, getHealthManager, StoreCodec } from "@waku/core";
import type {
IFilter,
IHealthManager,
Expand Down Expand Up @@ -106,7 +105,17 @@ export class WakuNode implements IWaku {
this.health = getHealthManager();

if (protocolsEnabled.store) {
const store = wakuStore(this.connectionManager);
if (options.store?.peer) {
this.connectionManager
.rawDialPeerWithProtocols(options.store.peer, [StoreCodec])
.catch((e) => {
log.error("Failed to dial store peer", e);
});
}

const store = wakuStore(this.connectionManager, {
peer: options.store?.peer
});
this.store = store(libp2p);
}

Expand Down Expand Up @@ -145,7 +154,6 @@ export class WakuNode implements IWaku {
protocols?: Protocols[]
): Promise<Stream> {
const _protocols = protocols ?? [];
const peerId = this.mapToPeerIdOrMultiaddr(peer);

if (typeof protocols === "undefined") {
this.relay && _protocols.push(Protocols.Relay);
Expand Down Expand Up @@ -194,9 +202,9 @@ export class WakuNode implements IWaku {
}
}

const peerId = this.mapToPeerIdOrMultiaddr(peer);
log.info(`Dialing to ${peerId.toString()} with protocols ${_protocols}`);

return this.libp2p.dialProtocol(peerId, codecs);
return await this.connectionManager.rawDialPeerWithProtocols(peer, codecs);
}

public async start(): Promise<void> {
Expand Down
Loading