File tree Expand file tree Collapse file tree 2 files changed +30
-17
lines changed Expand file tree Collapse file tree 2 files changed +30
-17
lines changed Original file line number Diff line number Diff line change @@ -55,6 +55,10 @@ export function createAdapter(
5555 redisClient : any ,
5656 opts ?: RedisStreamsAdapterOptions & ClusterAdapterOptions
5757) {
58+ const internalRedisClient =
59+ typeof redisClient . createPool === "function"
60+ ? redisClient . createPool ( )
61+ : redisClient ;
5862 const namespaceToAdapters = new Map < string , RedisStreamsAdapter > ( ) ;
5963 const options = Object . assign (
6064 {
@@ -74,7 +78,7 @@ export function createAdapter(
7478 async function poll ( ) {
7579 try {
7680 let response = await XREAD (
77- redisClient ,
81+ internalRedisClient ,
7882 options . streamName ,
7983 offset ,
8084 options . readCount
@@ -106,7 +110,7 @@ export function createAdapter(
106110 }
107111
108112 return function ( nsp ) {
109- const adapter = new RedisStreamsAdapter ( nsp , redisClient , options ) ;
113+ const adapter = new RedisStreamsAdapter ( nsp , internalRedisClient , options ) ;
110114 namespaceToAdapters . set ( nsp . name , adapter ) ;
111115
112116 if ( ! polling ) {
Original file line number Diff line number Diff line change @@ -37,7 +37,10 @@ export function hasBinary(obj: any, toJSON?: boolean): boolean {
3737 * @see https://github.com/redis/node-redis
3838 */
3939function isRedisV4Client ( redisClient : any ) {
40- return typeof redisClient . sSubscribe === "function" ;
40+ return (
41+ typeof redisClient . sSubscribe === "function" ||
42+ typeof redisClient . totalClients === "number"
43+ ) ;
4144}
4245
4346/**
@@ -68,21 +71,27 @@ export function XREAD(
6871) {
6972 if ( isRedisV4Client ( redisClient ) ) {
7073 return import ( "redis" ) . then ( ( redisPackage ) => {
71- return redisClient . xRead (
72- redisPackage . commandOptions ( {
73- isolated : true ,
74- } ) ,
75- [
76- {
77- key : streamName ,
78- id : offset ,
79- } ,
80- ] ,
74+ const streams = [
8175 {
82- COUNT : readCount ,
83- BLOCK : 5000 ,
84- }
85- ) ;
76+ key : streamName ,
77+ id : offset ,
78+ } ,
79+ ] ;
80+ const options = {
81+ COUNT : readCount ,
82+ BLOCK : 5000 ,
83+ } ;
84+ if ( redisPackage . commandOptions ) {
85+ return redisClient . xRead (
86+ redisPackage . commandOptions ( {
87+ isolated : true ,
88+ } ) ,
89+ streams ,
90+ options
91+ ) ;
92+ } else {
93+ return redisClient . xRead ( streams , options ) ;
94+ }
8695 } ) ;
8796 } else {
8897 return redisClient
You can’t perform that action at this time.
0 commit comments