Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,10 @@ export default class RedisClient<
return new RedisCommandsQueue(
this.#options.RESP ?? DEFAULT_RESP,
this.#options.commandsQueueMaxLength,
(channel, listeners) => {
this.emit('sharded-channel-moved', channel, listeners);
this.emit('server-sunsubscribe', channel, listeners);
},
Comment thread
aartisonigra marked this conversation as resolved.
(channel, listeners) => this.emit('sharded-channel-moved', channel, listeners),
clientId
);
Expand Down
31 changes: 17 additions & 14 deletions packages/client/lib/cluster/cluster-slots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -946,20 +946,21 @@ export default class RedisClusterSlots<
}

async #initiateShardedPubSubClient(master: MasterNode<M, F, S, RESP, TYPE_MAPPING>) {
const client = this.#createClient(master, false)
.on('server-sunsubscribe', async (channel, listeners) => {
try {
await this.rediscover(client);
const redirectTo = await this.getShardedPubSubClient(channel);
await redirectTo.extendPubSubChannelListeners(
PUBSUB_TYPE.SHARDED,
channel,
listeners
);
} catch (err) {
this.#emit('sharded-shannel-moved-error', err, channel, listeners);
}
});
const client = this.#createClient(master, false);

client.on('server-sunsubscribe', async (channel, listeners) => {
try {
await this.rediscover(client);
const redirectTo = await this.getShardedPubSubClient(channel);
await redirectTo.extendPubSubChannelListeners(
PUBSUB_TYPE.SHARDED,
channel,
listeners
);
} catch (err) {
this.#emit('sharded-channel-moved-error', err, channel, listeners);
}
});

master.pubSub = {
client,
Expand All @@ -977,6 +978,8 @@ export default class RedisClusterSlots<
return master.pubSub.connectPromise!;
}



async executeShardedUnsubscribeCommand(
channel: string,
unsubscribe: (client: RedisClientType<M, F, S, RESP, TYPE_MAPPING>) => Promise<void>
Expand Down
5 changes: 4 additions & 1 deletion packages/search/lib/commands/SEARCH.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { strict as assert } from 'node:assert';

import testUtils, { GLOBAL } from '../test-utils';
import SEARCH from './SEARCH';
import { parseArgs } from '@redis/client/lib/commands/generic-transformers';
Expand Down Expand Up @@ -260,7 +261,9 @@ describe('FT.SEARCH', () => {
number: 1
}
}),
['FT.SEARCH', 'index', 'query', 'PARAMS', '6', 'string', 'string', 'buffer', Buffer.from('buffer'), 'number', '1', 'DIALECT', DEFAULT_DIALECT]

['FT.SEARCH', 'index', 'query', 'PARAMS', '3', 'string', 'string', 'buffer', Buffer.from('buffer'), 'number', '1', 'DIALECT', DEFAULT_DIALECT]

);
});

Expand Down
43 changes: 28 additions & 15 deletions packages/search/lib/commands/SEARCH.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,37 @@ import { getMapValue, mapLikeToObject, mapLikeValues, parseDocumentValue, parseS
export type FtSearchParams = Record<string, RedisArgument | number>;

export function parseParamsArgument(parser: CommandParser, params?: FtSearchParams) {
if (params) {
parser.push('PARAMS');

const args: Array<RedisArgument> = [];
for (const key in params) {
if (!Object.hasOwn(params, key)) continue;

const value = params[key];
args.push(
key,
typeof value === 'number' ? value.toString() : value
);
}

parser.pushVariadicWithLength(args);
if (!params) return;

parser.push('PARAMS');

// FT.SEARCH expects: PARAMS <num-args> <k1> <v1> <k2> <v2> ...
// Where <num-args> is the *number of pairs*.
//
// The previous implementation incorrectly used `pushVariadicWithLength(args)`
// which sets the length to the number of *arguments*, not the required
// number of pairs. This causes exact-match queries like `@field:"$x"`
// to bind parameters incorrectly on some Redis Stack/FT versions.
const pairArgs: Array<RedisArgument> = [];
let pairs = 0;

for (const key in params) {
if (!Object.hasOwn(params, key)) continue;

const value = params[key];
pairArgs.push(
key,
typeof value === 'number' ? value.toString() : value
);
pairs++;
}

// <num-args> is the number of pairs, so it must be `pairs`.
parser.push(pairs.toString());
parser.pushVariadic(pairArgs);
}


export interface FtSearchOptions {
VERBATIM?: boolean;
NOSTOPWORDS?: boolean;
Expand Down