diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts index 90ec3794b6..43fbf11374 100644 --- a/packages/sdk/src/protocols/filter/constants.ts +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -1,4 +1,3 @@ export const DEFAULT_KEEP_ALIVE = 60_000; export const DEFAULT_MAX_PINGS = 3; export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false; -export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index 07e7ec7876..1e4b6d86bf 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -8,7 +8,6 @@ import type { IDecoder, IFilter, ILightPush, - IProtoMessage, Libp2p, PubsubTopic, SubscribeResult, @@ -25,7 +24,6 @@ import { import { PeerManager } from "../peer_manager.js"; -import { MessageCache } from "./message_cache.js"; import { Subscription } from "./subscription.js"; import { buildConfig } from "./utils.js"; @@ -35,7 +33,6 @@ class Filter implements IFilter { public readonly protocol: FilterCore; private readonly config: FilterProtocolOptions; - private readonly messageCache: MessageCache; private activeSubscriptions = new Map(); public constructor( @@ -46,7 +43,6 @@ class Filter implements IFilter { config?: Partial ) { this.config = buildConfig(config); - this.messageCache = new MessageCache(libp2p); this.protocol = new FilterCore( async (pubsubTopic, wakuMessage, peerIdStr) => { @@ -58,14 +54,6 @@ class Filter implements IFilter { return; } - if (this.messageCache.has(pubsubTopic, wakuMessage as IProtoMessage)) { - log.info( - `Skipping duplicate message for pubsubTopic:${pubsubTopic} peerId:${peerIdStr}` - ); - return; - } - - this.messageCache.set(pubsubTopic, wakuMessage as IProtoMessage); await subscription.processIncomingMessage(wakuMessage, peerIdStr); }, diff --git a/packages/sdk/src/protocols/filter/message_cache.ts b/packages/sdk/src/protocols/filter/message_cache.ts deleted file mode 100644 index 5c28b34f43..0000000000 --- a/packages/sdk/src/protocols/filter/message_cache.ts +++ /dev/null @@ -1,53 +0,0 @@ -import type { IProtoMessage, Libp2p } from "@waku/interfaces"; -import { messageHashStr } from "@waku/message-hash"; - -type Hash = string; -type Timestamp = number; - -export class MessageCache { - private intervalID: number | undefined = undefined; - private readonly messages: Map = new Map(); - - public constructor(libp2p: Libp2p) { - this.onStart = this.onStart.bind(this); - this.onStop = this.onStop.bind(this); - - libp2p.addEventListener("start", this.onStart); - libp2p.addEventListener("stop", this.onStop); - } - - public set(pubsubTopic: string, message: IProtoMessage): void { - const hash = messageHashStr(pubsubTopic, message); - this.messages.set(hash, Date.now()); - } - - public has(pubsubTopic: string, message: IProtoMessage): boolean { - const hash = messageHashStr(pubsubTopic, message); - return this.messages.has(hash); - } - - private onStart(): void { - if (this.intervalID) { - return; - } - - this.intervalID = setInterval(() => { - this.prune(); - }, 60_000) as unknown as number; - } - - private onStop(): void { - if (!this.intervalID) { - return; - } - - clearInterval(this.intervalID); - } - - private prune(): void { - Array.from(this.messages.entries()) - .filter(([_, seenTimestamp]) => Date.now() - seenTimestamp >= 60_000) - .map(([hash, _]) => hash) - .forEach((hash) => this.messages.delete(hash)); - } -} diff --git a/packages/sdk/src/protocols/filter/subscription.ts b/packages/sdk/src/protocols/filter/subscription.ts index 3534f26a7f..c585a05408 100644 --- a/packages/sdk/src/protocols/filter/subscription.ts +++ b/packages/sdk/src/protocols/filter/subscription.ts @@ -1,17 +1,8 @@ -import type { Peer } from "@libp2p/interface"; -import type { PeerId } from "@libp2p/interface"; -import { - ConnectionManager, - createDecoder, - createEncoder, - FilterCore, - LightPushCore -} from "@waku/core"; +import { ConnectionManager, createDecoder, FilterCore } from "@waku/core"; import { type Callback, type ContentTopic, type CoreProtocolResult, - EConnectionStateEvents, FilterProtocolOptions, type IDecodedMessage, type IDecoder, @@ -28,48 +19,41 @@ import { import { WakuMessage } from "@waku/proto"; import { groupByContentTopic, Logger } from "@waku/utils"; -import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; -import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js"; import { PeerManager } from "../peer_manager.js"; -import { DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL } from "./constants.js"; +import { SubscriptionMonitor } from "./subscription_monitor.js"; -const log = new Logger("sdk:filter:subscription_manager"); +const log = new Logger("sdk:filter:subscription"); export class Subscription implements ISubscription { - private reliabilityMonitor: ReceiverReliabilityMonitor; - - private keepAliveTimeout: number; - private enableLightPushFilterCheck: boolean; - private keepAliveInterval: ReturnType | null = null; + private readonly monitor: SubscriptionMonitor; private subscriptionCallbacks: Map< ContentTopic, SubscriptionCallback - >; + > = new Map(); public constructor( private readonly pubsubTopic: PubsubTopic, private readonly protocol: FilterCore, - private readonly connectionManager: ConnectionManager, - private readonly peerManager: PeerManager, - private readonly libp2p: Libp2p, - config: FilterProtocolOptions, - private readonly lightPush?: ILightPush + connectionManager: ConnectionManager, + peerManager: PeerManager, + libp2p: Libp2p, + private readonly config: FilterProtocolOptions, + lightPush?: ILightPush ) { this.pubsubTopic = pubsubTopic; - this.subscriptionCallbacks = new Map(); - - this.reliabilityMonitor = ReliabilityMonitorManager.createReceiverMonitor( - this.pubsubTopic, - this.peerManager, - () => Array.from(this.subscriptionCallbacks.keys()), - this.protocol.subscribe.bind(this.protocol), - this.sendLightPushCheckMessage.bind(this) - ); - this.reliabilityMonitor.setMaxPingFailures(config.pingsBeforePeerRenewed); - this.keepAliveTimeout = config.keepAliveIntervalMs; - this.enableLightPushFilterCheck = config.enableLightPushFilterCheck; + + this.monitor = new SubscriptionMonitor({ + pubsubTopic, + config, + libp2p, + connectionManager, + filter: protocol, + peerManager, + lightPush, + activeSubscriptions: this.subscriptionCallbacks + }); } public async subscribe( @@ -92,10 +76,10 @@ export class Subscription implements ISubscription { } } - if (this.enableLightPushFilterCheck) { + if (this.config.enableLightPushFilterCheck) { decodersArray.push( createDecoder( - this.buildLightPushContentTopic(), + this.monitor.reservedContentTopic, this.pubsubTopic ) as IDecoder ); @@ -104,10 +88,10 @@ export class Subscription implements ISubscription { const decodersGroupedByCT = groupByContentTopic(decodersArray); const contentTopics = Array.from(decodersGroupedByCT.keys()); - const peers = await this.peerManager.getPeers(); - const promises = peers.map(async (peer) => - this.subscribeWithPeerVerification(peer, contentTopics) - ); + const peers = await this.monitor.getPeers(); + const promises = peers.map(async (peer) => { + return this.protocol.subscribe(this.pubsubTopic, peer, contentTopics); + }); const results = await Promise.allSettled(promises); @@ -125,7 +109,7 @@ export class Subscription implements ISubscription { } as unknown as SubscriptionCallback; // don't handle case of internal content topic - if (contentTopic === this.buildLightPushContentTopic()) { + if (contentTopic === this.monitor.reservedContentTopic) { return; } @@ -134,7 +118,7 @@ export class Subscription implements ISubscription { this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); }); - this.startSubscriptionsMaintenance(this.keepAliveTimeout); + this.monitor.start(); return finalResult; } @@ -142,7 +126,7 @@ export class Subscription implements ISubscription { public async unsubscribe( contentTopics: ContentTopic[] ): Promise { - const peers = await this.peerManager.getPeers(); + const peers = await this.monitor.getPeers(); const promises = peers.map(async (peer) => { const response = await this.protocol.unsubscribe( this.pubsubTopic, @@ -161,26 +145,22 @@ export class Subscription implements ISubscription { const finalResult = this.handleResult(results, "unsubscribe"); if (this.subscriptionCallbacks.size === 0) { - this.stopSubscriptionsMaintenance(); + this.monitor.stop(); } return finalResult; } - public async ping(peerId?: PeerId): Promise { - log.info("Sending keep-alive ping"); - const peers = peerId - ? [peerId] - : (await this.peerManager.getPeers()).map((peer) => peer.id); + public async ping(): Promise { + const peers = await this.monitor.getPeers(); + const promises = peers.map((peer) => this.protocol.ping(peer)); - const promises = peers.map((peerId) => this.pingSpecificPeer(peerId)); const results = await Promise.allSettled(promises); - return this.handleResult(results, "ping"); } public async unsubscribeAll(): Promise { - const peers = await this.peerManager.getPeers(); + const peers = await this.monitor.getPeers(); const promises = peers.map(async (peer) => this.protocol.unsubscribeAll(this.pubsubTopic, peer) ); @@ -191,7 +171,7 @@ export class Subscription implements ISubscription { const finalResult = this.handleResult(results, "unsubscribeAll"); - this.stopSubscriptionsMaintenance(); + this.monitor.stop(); return finalResult; } @@ -200,12 +180,12 @@ export class Subscription implements ISubscription { message: WakuMessage, peerIdStr: PeerIdStr ): Promise { - const alreadyReceived = this.reliabilityMonitor.notifyMessageReceived( + const received = this.monitor.notifyMessageReceived( peerIdStr, message as IProtoMessage ); - if (alreadyReceived) { + if (received) { log.info("Message already received, skipping"); return; } @@ -225,20 +205,6 @@ export class Subscription implements ISubscription { await pushMessage(subscriptionCallback, this.pubsubTopic, message); } - private async subscribeWithPeerVerification( - peer: Peer, - contentTopics: string[] - ): Promise { - const result = await this.protocol.subscribe( - this.pubsubTopic, - peer, - contentTopics - ); - - await this.sendLightPushCheckMessage(peer); - return result; - } - private handleResult( results: PromiseSettledResult[], type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" @@ -263,145 +229,6 @@ export class Subscription implements ISubscription { } return result; } - - private async pingSpecificPeer(peerId: PeerId): Promise { - const peers = await this.peerManager.getPeers(); - const peer = peers.find((p) => p.id.equals(peerId)); - if (!peer) { - return { - success: null, - failure: { - peerId, - error: ProtocolError.NO_PEER_AVAILABLE - } - }; - } - - let result; - try { - result = await this.protocol.ping(peer); - } catch (error) { - result = { - success: null, - failure: { - peerId, - error: ProtocolError.GENERIC_FAIL - } - }; - } - - log.info( - `Received result from filter ping peerId:${peerId.toString()}\tsuccess:${result.success?.toString()}\tfailure:${result.failure?.error}` - ); - await this.reliabilityMonitor.handlePingResult(peerId, result); - return result; - } - - private startSubscriptionsMaintenance(timeout: number): void { - log.info("Starting subscriptions maintenance"); - this.startKeepAlivePings(timeout); - this.startConnectionListener(); - } - - private stopSubscriptionsMaintenance(): void { - log.info("Stopping subscriptions maintenance"); - this.stopKeepAlivePings(); - this.stopConnectionListener(); - } - - private startConnectionListener(): void { - this.connectionManager.addEventListener( - EConnectionStateEvents.CONNECTION_STATUS, - this.connectionListener.bind(this) as (v: CustomEvent) => void - ); - } - - private stopConnectionListener(): void { - this.connectionManager.removeEventListener( - EConnectionStateEvents.CONNECTION_STATUS, - this.connectionListener.bind(this) as (v: CustomEvent) => void - ); - } - - private async connectionListener({ - detail: isConnected - }: CustomEvent): Promise { - if (!isConnected) { - this.stopKeepAlivePings(); - return; - } - - try { - // we do nothing here, as the renewal process is managed internally by `this.ping()` - await this.ping(); - } catch (err) { - log.error(`networkStateListener failed to recover: ${err}`); - } - - this.startKeepAlivePings(this.keepAliveTimeout); - } - - private startKeepAlivePings(timeout: number): void { - if (this.keepAliveInterval) { - log.info("Recurring pings already set up."); - return; - } - - this.keepAliveInterval = setInterval(() => { - void this.ping(); - }, timeout); - } - - private stopKeepAlivePings(): void { - if (!this.keepAliveInterval) { - log.info("Already stopped recurring pings."); - return; - } - - log.info("Stopping recurring pings."); - clearInterval(this.keepAliveInterval); - this.keepAliveInterval = null; - } - - private async sendLightPushCheckMessage(peer: Peer): Promise { - if ( - this.lightPush && - this.libp2p && - this.reliabilityMonitor.shouldVerifyPeer(peer.id) - ) { - const encoder = createEncoder({ - contentTopic: this.buildLightPushContentTopic(), - pubsubTopic: this.pubsubTopic, - ephemeral: true - }); - - const message = { payload: new Uint8Array(1) }; - const protoMessage = await encoder.toProtoObj(message); - - // make a delay to be sure message is send when subscription is in place - setTimeout( - (async () => { - const result = await (this.lightPush!.protocol as LightPushCore).send( - encoder, - message, - peer - ); - this.reliabilityMonitor.notifyMessageSent(peer.id, protoMessage); - if (result.failure) { - log.error( - `failed to send lightPush ping message to peer:${peer.id.toString()}\t${result.failure.error}` - ); - return; - } - }) as () => void, - DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL - ); - } - } - - private buildLightPushContentTopic(): string { - return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`; - } } async function pushMessage( diff --git a/packages/sdk/src/protocols/filter/subscription_monitor.ts b/packages/sdk/src/protocols/filter/subscription_monitor.ts new file mode 100644 index 0000000000..966e197f89 --- /dev/null +++ b/packages/sdk/src/protocols/filter/subscription_monitor.ts @@ -0,0 +1,287 @@ +import type { EventHandler, Peer, PeerId } from "@libp2p/interface"; +import { FilterCore } from "@waku/core"; +import type { + FilterProtocolOptions, + IConnectionManager, + ILightPush, + IProtoMessage, + Libp2p +} from "@waku/interfaces"; +import { EConnectionStateEvents } from "@waku/interfaces"; +import { messageHashStr } from "@waku/message-hash"; + +import { PeerManager } from "../peer_manager.js"; + +// TODO(weboko): consider adding as config property or combine with maxAllowedPings +const MAX_SUBSCRIBE_ATTEMPTS = 3; + +type SubscriptionMonitorConstructorOptions = { + pubsubTopic: string; + config: FilterProtocolOptions; + libp2p: Libp2p; + connectionManager: IConnectionManager; + filter: FilterCore; + peerManager: PeerManager; + lightPush?: ILightPush; + activeSubscriptions: Map; +}; + +export class SubscriptionMonitor { + /** + * Cached peers that are in use by subscription. + * Needed to understand if they disconnect later or not. + */ + public peers: Peer[] = []; + + private isStarted: boolean = false; + + private readonly pubsubTopic: string; + private readonly config: FilterProtocolOptions; + + private readonly libp2p: Libp2p; + private readonly filter: FilterCore; + private readonly peerManager: PeerManager; + private readonly connectionManager: IConnectionManager; + private readonly activeSubscriptions: Map; + + private keepAliveIntervalId: number | undefined; + private pingFailedAttempts = new Map(); + + private receivedMessagesFormPeer = new Set(); + private receivedMessages = new Set(); + private verifiedPeers = new Set(); + + public constructor(options: SubscriptionMonitorConstructorOptions) { + this.config = options.config; + this.connectionManager = options.connectionManager; + this.filter = options.filter; + this.peerManager = options.peerManager; + this.libp2p = options.libp2p; + this.activeSubscriptions = options.activeSubscriptions; + this.pubsubTopic = options.pubsubTopic; + + this.onConnectionChange = this.onConnectionChange.bind(this); + this.onPeerConnected = this.onPeerConnected.bind(this); + this.onPeerDisconnected = this.onPeerDisconnected.bind(this); + } + + /** + * @returns content topic used for Filter verification + */ + public get reservedContentTopic(): string { + return `/js-waku-subscription-ping/1/${this.libp2p.peerId.toString()}/utf8`; + } + + /** + * Starts: + * - recurring ping queries; + * - connection event observers; + */ + public start(): void { + if (this.isStarted) { + return; + } + + this.isStarted = true; + + this.startKeepAlive(); + this.startConnectionListener(); + this.startPeerConnectionListener(); + } + + /** + * Stops all recurring queries, event listeners or timers. + */ + public stop(): void { + if (!this.isStarted) { + return; + } + + this.isStarted = false; + + this.stopKeepAlive(); + this.stopConnectionListener(); + this.stopPeerConnectionListener(); + } + + /** + * Method to get peers that are used by particular subscription or, if initially called, peers that can be used by subscription. + * @returns array of peers + */ + public async getPeers(): Promise { + if (!this.isStarted) { + this.peers = await this.peerManager.getPeers(); + } + + return this.peers; + } + + /** + * Notifies monitor if message was received. + * + * @param peerId peer from which message is received + * @param message received message + * + * @returns true if message was received from peer + */ + public notifyMessageReceived( + peerId: string, + message: IProtoMessage + ): boolean { + const hash = this.buildMessageHash(message); + + this.verifiedPeers.add(peerId); + this.receivedMessagesFormPeer.add(`${peerId}-${hash}`); + + if (this.receivedMessages.has(hash)) { + return true; + } + + this.receivedMessages.add(hash); + + return false; + } + + private buildMessageHash(message: IProtoMessage): string { + return messageHashStr(this.pubsubTopic, message); + } + + private startConnectionListener(): void { + this.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + this.onConnectionChange as (v: CustomEvent) => void + ); + } + + private stopConnectionListener(): void { + this.connectionManager.removeEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + this.onConnectionChange as (v: CustomEvent) => void + ); + } + + private async onConnectionChange({ + detail: isConnected + }: CustomEvent): Promise { + if (!isConnected) { + this.stopKeepAlive(); + return; + } + + await Promise.all(this.peers.map((peer) => this.ping(peer, true))); + this.startKeepAlive(); + } + + private startKeepAlive(): void { + if (this.keepAliveIntervalId) { + return; + } + + this.keepAliveIntervalId = setInterval(() => { + void this.peers.map((peer) => this.ping(peer)); + }, this.config.keepAliveIntervalMs) as unknown as number; + } + + private stopKeepAlive(): void { + if (!this.keepAliveIntervalId) { + return; + } + + clearInterval(this.keepAliveIntervalId); + this.keepAliveIntervalId = undefined; + } + + private startPeerConnectionListener(): void { + this.libp2p.addEventListener( + "peer:connect", + this.onPeerConnected as EventHandler> + ); + this.libp2p.addEventListener( + "peer:disconnect", + this.onPeerDisconnected as EventHandler> + ); + } + + private stopPeerConnectionListener(): void { + this.libp2p.removeEventListener( + "peer:connect", + this.onPeerConnected as EventHandler> + ); + this.libp2p.removeEventListener( + "peer:disconnect", + this.onPeerDisconnected as EventHandler> + ); + } + + private async onPeerConnected(_event: CustomEvent): Promise { + // TODO(weboko): use config.numOfUsedPeers here + if (this.peers.length > 0) { + return; + } + + this.peers = await this.peerManager.getPeers(); + await Promise.all(this.peers.map((peer) => this.subscribe(peer))); + } + + private async onPeerDisconnected(event: CustomEvent): Promise { + const hasNotBeenUsed = !this.peers.find((p) => p.id.equals(event.detail)); + if (hasNotBeenUsed) { + return; + } + + this.peers = await this.peerManager.getPeers(); + await Promise.all(this.peers.map((peer) => this.subscribe(peer))); + } + + private async subscribe(_peer: Peer | undefined): Promise { + let peer: Peer | undefined = _peer; + + for (let i = 0; i < MAX_SUBSCRIBE_ATTEMPTS; i++) { + if (!peer) { + return; + } + + const response = await this.filter.subscribe( + this.pubsubTopic, + peer, + Array.from(this.activeSubscriptions.keys()) + ); + + if (response.success) { + return; + } + + peer = await this.peerManager.requestRenew(peer.id); + } + } + + private async ping( + peer: Peer, + renewOnFirstFail: boolean = false + ): Promise { + const peerIdStr = peer.id.toString(); + const response = await this.filter.ping(peer); + + if (response.failure && renewOnFirstFail) { + const newPeer = await this.peerManager.requestRenew(peer.id); + await this.subscribe(newPeer); + return; + } + + if (response.failure) { + const prev = this.pingFailedAttempts.get(peerIdStr) || 0; + this.pingFailedAttempts.set(peerIdStr, prev + 1); + } + + if (response.success) { + this.pingFailedAttempts.set(peerIdStr, 0); + } + + const madeAttempts = this.pingFailedAttempts.get(peerIdStr) || 0; + + if (madeAttempts >= this.config.pingsBeforePeerRenewed) { + const newPeer = await this.peerManager.requestRenew(peer.id); + await this.subscribe(newPeer); + } + } +} diff --git a/packages/sdk/src/reliability_monitor/index.ts b/packages/sdk/src/reliability_monitor/index.ts deleted file mode 100644 index 120d208404..0000000000 --- a/packages/sdk/src/reliability_monitor/index.ts +++ /dev/null @@ -1,56 +0,0 @@ -import type { Peer } from "@libp2p/interface"; -import { - ContentTopic, - CoreProtocolResult, - PubsubTopic -} from "@waku/interfaces"; - -import { PeerManager } from "../protocols/peer_manager.js"; - -import { ReceiverReliabilityMonitor } from "./receiver.js"; - -export class ReliabilityMonitorManager { - private static receiverMonitors: Map< - PubsubTopic, - ReceiverReliabilityMonitor - > = new Map(); - - public static createReceiverMonitor( - pubsubTopic: PubsubTopic, - peerManager: PeerManager, - getContentTopics: () => ContentTopic[], - protocolSubscribe: ( - pubsubTopic: PubsubTopic, - peer: Peer, - contentTopics: ContentTopic[] - ) => Promise, - sendLightPushMessage: (peer: Peer) => Promise - ): ReceiverReliabilityMonitor { - if (ReliabilityMonitorManager.receiverMonitors.has(pubsubTopic)) { - return ReliabilityMonitorManager.receiverMonitors.get(pubsubTopic)!; - } - - const monitor = new ReceiverReliabilityMonitor( - pubsubTopic, - peerManager, - getContentTopics, - protocolSubscribe, - sendLightPushMessage - ); - ReliabilityMonitorManager.receiverMonitors.set(pubsubTopic, monitor); - return monitor; - } - - private constructor() {} - - public static stop(pubsubTopic: PubsubTopic): void { - this.receiverMonitors.delete(pubsubTopic); - } - - public static stopAll(): void { - for (const [pubsubTopic, monitor] of this.receiverMonitors) { - monitor.setMaxPingFailures(undefined); - this.receiverMonitors.delete(pubsubTopic); - } - } -} diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts deleted file mode 100644 index 6e2c6d59dc..0000000000 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ /dev/null @@ -1,185 +0,0 @@ -import type { Peer, PeerId } from "@libp2p/interface"; -import { - ContentTopic, - CoreProtocolResult, - IProtoMessage, - PeerIdStr, - PubsubTopic -} from "@waku/interfaces"; -import { messageHashStr } from "@waku/message-hash"; -import { Logger } from "@waku/utils"; -import { bytesToUtf8 } from "@waku/utils/bytes"; - -import { PeerManager } from "../protocols/peer_manager.js"; - -const log = new Logger("sdk:receiver:reliability_monitor"); - -const DEFAULT_MAX_PINGS = 3; -const MESSAGE_VERIFICATION_DELAY = 5_000; - -export class ReceiverReliabilityMonitor { - private receivedMessagesFormPeer = new Set(); - private receivedMessages = new Set(); - private scheduledVerification = new Map(); - private verifiedPeers = new Set(); - - private peerFailures: Map = new Map(); - private maxPingFailures: number = DEFAULT_MAX_PINGS; - private peerRenewalLocks: Set = new Set(); - - public constructor( - private readonly pubsubTopic: PubsubTopic, - private readonly peerManager: PeerManager, - private getContentTopics: () => ContentTopic[], - private protocolSubscribe: ( - pubsubTopic: PubsubTopic, - peer: Peer, - contentTopics: ContentTopic[] - ) => Promise, - private sendLightPushMessage: (peer: Peer) => Promise - ) {} - - public setMaxPingFailures(value: number | undefined): void { - if (value === undefined) { - return; - } - this.maxPingFailures = value; - } - - public async handlePingResult( - peerId: PeerId, - result?: CoreProtocolResult - ): Promise { - if (result?.success) { - this.peerFailures.delete(peerId.toString()); - return; - } - - const failures = (this.peerFailures.get(peerId.toString()) || 0) + 1; - this.peerFailures.set(peerId.toString(), failures); - - if (failures >= this.maxPingFailures) { - try { - log.info( - `Attempting to renew ${peerId.toString()} due to ping failures.` - ); - await this.renewAndSubscribePeer(peerId); - this.peerFailures.delete(peerId.toString()); - } catch (error) { - log.error(`Failed to renew peer ${peerId.toString()}: ${error}.`); - } - } - } - - public notifyMessageReceived( - peerIdStr: string, - message: IProtoMessage - ): boolean { - const hash = this.buildMessageHash(message); - - this.verifiedPeers.add(peerIdStr); - this.receivedMessagesFormPeer.add(`${peerIdStr}-${hash}`); - - log.info( - `notifyMessage received debug: ephemeral:${message.ephemeral}\t${bytesToUtf8(message.payload)}` - ); - log.info(`notifyMessage received: peer:${peerIdStr}\tmessage:${hash}`); - - if (this.receivedMessages.has(hash)) { - return true; - } - - this.receivedMessages.add(hash); - - return false; - } - - public notifyMessageSent(peerId: PeerId, message: IProtoMessage): void { - const peerIdStr = peerId.toString(); - const hash = this.buildMessageHash(message); - - log.info(`notifyMessage sent debug: ${bytesToUtf8(message.payload)}`); - - if (this.scheduledVerification.has(peerIdStr)) { - log.warn( - `notifyMessage sent: attempting to schedule verification for pending peer:${peerIdStr}\tmessage:${hash}` - ); - return; - } - - const timeout = window.setTimeout( - (async () => { - const receivedAnyMessage = this.verifiedPeers.has(peerIdStr); - const receivedTestMessage = this.receivedMessagesFormPeer.has( - `${peerIdStr}-${hash}` - ); - - if (receivedAnyMessage || receivedTestMessage) { - log.info( - `notifyMessage sent setTimeout: verified that peer pushes filter messages, peer:${peerIdStr}\tmessage:${hash}` - ); - return; - } - - log.warn( - `notifyMessage sent setTimeout: peer didn't return probe message, attempting renewAndSubscribe, peer:${peerIdStr}\tmessage:${hash}` - ); - this.scheduledVerification.delete(peerIdStr); - await this.renewAndSubscribePeer(peerId); - }) as () => void, - MESSAGE_VERIFICATION_DELAY - ); - - this.scheduledVerification.set(peerIdStr, timeout); - } - - public shouldVerifyPeer(peerId: PeerId): boolean { - const peerIdStr = peerId.toString(); - - const isPeerVerified = this.verifiedPeers.has(peerIdStr); - const isVerificationPending = this.scheduledVerification.has(peerIdStr); - - return !(isPeerVerified || isVerificationPending); - } - - private buildMessageHash(message: IProtoMessage): string { - return messageHashStr(this.pubsubTopic, message); - } - - private async renewAndSubscribePeer( - peerId: PeerId - ): Promise { - const peerIdStr = peerId.toString(); - try { - if (this.peerRenewalLocks.has(peerIdStr)) { - log.info(`Peer ${peerIdStr} is already being renewed.`); - return; - } - - this.peerRenewalLocks.add(peerIdStr); - - const newPeer = await this.peerManager.requestRenew(peerId); - if (!newPeer) { - log.warn(`Failed to renew peer ${peerIdStr}: No new peer found.`); - return; - } - - await this.protocolSubscribe( - this.pubsubTopic, - newPeer, - this.getContentTopics() - ); - - await this.sendLightPushMessage(newPeer); - - this.peerFailures.delete(peerIdStr); - - return newPeer; - } catch (error) { - log.error(`Failed to renew peer ${peerIdStr}: ${error}.`); - return; - } finally { - this.peerRenewalLocks.delete(peerIdStr); - } - } -} diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index c2f3a339e2..a4f2d43ffe 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -20,7 +20,6 @@ import { wakuFilter } from "../protocols/filter/index.js"; import { wakuLightPush } from "../protocols/light_push/index.js"; import { PeerManager } from "../protocols/peer_manager.js"; import { wakuStore } from "../protocols/store/index.js"; -import { ReliabilityMonitorManager } from "../reliability_monitor/index.js"; import { waitForRemotePeer } from "./wait_for_remote_peer.js"; @@ -178,7 +177,6 @@ export class WakuNode implements IWaku { } public async stop(): Promise { - ReliabilityMonitorManager.stopAll(); this.peerManager.stop(); this.connectionManager.stop(); await this.libp2p.stop();