Skip to content

Commit

Permalink
feat: inroduce subscription manager
Browse files Browse the repository at this point in the history
  • Loading branch information
weboko committed Jan 14, 2025
1 parent 2a5df25 commit c43cec2
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 520 deletions.
1 change: 0 additions & 1 deletion packages/sdk/src/protocols/filter/constants.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
export const DEFAULT_KEEP_ALIVE = 60_000;
export const DEFAULT_MAX_PINGS = 3;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false;
export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000;
12 changes: 0 additions & 12 deletions packages/sdk/src/protocols/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import type {
IDecoder,
IFilter,
ILightPush,
IProtoMessage,
Libp2p,
PubsubTopic,
SubscribeResult,
Expand All @@ -25,7 +24,6 @@ import {

import { PeerManager } from "../peer_manager.js";

import { MessageCache } from "./message_cache.js";
import { Subscription } from "./subscription.js";
import { buildConfig } from "./utils.js";

Expand All @@ -35,7 +33,6 @@ class Filter implements IFilter {
public readonly protocol: FilterCore;

private readonly config: FilterProtocolOptions;
private readonly messageCache: MessageCache;
private activeSubscriptions = new Map<string, Subscription>();

public constructor(
Expand All @@ -46,7 +43,6 @@ class Filter implements IFilter {
config?: Partial<FilterProtocolOptions>
) {
this.config = buildConfig(config);
this.messageCache = new MessageCache(libp2p);

this.protocol = new FilterCore(
async (pubsubTopic, wakuMessage, peerIdStr) => {
Expand All @@ -58,14 +54,6 @@ class Filter implements IFilter {
return;
}

if (this.messageCache.has(pubsubTopic, wakuMessage as IProtoMessage)) {
log.info(
`Skipping duplicate message for pubsubTopic:${pubsubTopic} peerId:${peerIdStr}`
);
return;
}

this.messageCache.set(pubsubTopic, wakuMessage as IProtoMessage);
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
},

Expand Down
53 changes: 0 additions & 53 deletions packages/sdk/src/protocols/filter/message_cache.ts

This file was deleted.

Loading

0 comments on commit c43cec2

Please sign in to comment.