Skip to content

Commit 40b5b18

Browse files
feat: use dedicated client
1 parent c12c8b6 commit 40b5b18

File tree

2 files changed

+13
-5
lines changed

2 files changed

+13
-5
lines changed

lib/adapter.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ export interface RedisStreamsAdapterOptions {
3333
readCount?: number;
3434
/**
3535
* The number of ms before timing out.
36-
* @default 200
36+
* @default 1_000
3737
* @see https://redis.io/docs/latest/commands/xread/#blocking-for-data
3838
*/
3939
blockTimeInMs?: number;
@@ -67,7 +67,7 @@ export function createAdapter(
6767
streamName: "socket.io",
6868
maxLen: 10_000,
6969
readCount: 100,
70-
blockTimeInMs: 200,
70+
blockTimeInMs: 1_000,
7171
sessionKeyPrefix: "sio:session:",
7272
heartbeatInterval: 5_000,
7373
heartbeatTimeout: 10_000,
@@ -78,10 +78,13 @@ export function createAdapter(
7878
let polling = false;
7979
let shouldClose = false;
8080

81+
// we create a Redis client dedicated to polling the stream so that it does not interfere with XADD operations
82+
const pollClient = redisClient.duplicate();
83+
8184
async function poll() {
8285
try {
8386
let response = await XREAD(
84-
redisClient,
87+
pollClient,
8588
options.streamName,
8689
offset,
8790
options.readCount,
@@ -120,7 +123,11 @@ export function createAdapter(
120123
if (!polling) {
121124
polling = true;
122125
shouldClose = false;
123-
poll();
126+
if (typeof pollClient.connect === "function") {
127+
pollClient.connect().then(poll);
128+
} else {
129+
poll();
130+
}
124131
}
125132

126133
const defaultClose = adapter.close;
@@ -130,6 +137,7 @@ export function createAdapter(
130137

131138
if (namespaceToAdapters.size === 0) {
132139
shouldClose = true;
140+
pollClient.quit();
133141
}
134142

135143
defaultClose.call(adapter);

test/util.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ export function setup({
179179
const httpServer = createServer();
180180
const io = new Server(httpServer, {
181181
adapter: createAdapter(redisClient, {
182-
blockTimeInMs: 20, // reduce the polling interval to speed up the tests
182+
readCount: 1, // return as soon as possible
183183
}),
184184
...serverOptions,
185185
});

0 commit comments

Comments
 (0)