Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cmd parser with commands #2733

Draft
wants to merge 9 commits into
base: v5
Choose a base branch
from
Draft
  •  
  •  
  •  
34 changes: 30 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/client/lib/RESP/encoder.ts
Expand Up @@ -2,7 +2,7 @@ import { RedisArgument } from './types';

const CRLF = '\r\n';

export default function encodeCommand(args: Array<RedisArgument>): Array<RedisArgument> {
export default function encodeCommand(args: ReadonlyArray<RedisArgument>): ReadonlyArray<RedisArgument> {
const toWrite: Array<RedisArgument> = [];

let strings = '*' + args.length + CRLF;
Expand Down
2 changes: 2 additions & 0 deletions packages/client/lib/RESP/types.ts
@@ -1,3 +1,4 @@
import { CommandParser } from '../client/parser';
import { BlobError, SimpleError } from '../errors';
import { RedisScriptConfig, SHA1 } from '../lua-script';
import { RESP_TYPES } from './decoder';
Expand Down Expand Up @@ -272,6 +273,7 @@ export type Command = {
*/
IS_FORWARD_COMMAND?: boolean;
// POLICIES?: CommandPolicies;
parseCommand?(this: void, parser: CommandParser, ...args: Array<any>): void;
transformArguments(this: void, ...args: Array<any>): CommandArguments;
TRANSFORM_LEGACY_REPLY?: boolean;
transformReply: TransformReply | Record<RespVersions, TransformReply>;
Expand Down
8 changes: 4 additions & 4 deletions packages/client/lib/client/commands-queue.ts
@@ -1,7 +1,7 @@
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
import encodeCommand from '../RESP/encoder';
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP/types';
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { AbortError, ErrorReply } from '../errors';
import { MonitorCallback } from '.';
Expand All @@ -17,7 +17,7 @@ export interface CommandOptions<T = TypeMapping> {
}

export interface CommandToWrite extends CommandWaitingForReply {
args: CommandArguments;
args: ReadonlyArray<RedisArgument>;
chainId: symbol | undefined;
abort: {
signal: AbortSignal;
Expand Down Expand Up @@ -117,7 +117,7 @@ export default class RedisCommandsQueue {
}

addCommand<T>(
args: CommandArguments,
args: ReadonlyArray<RedisArgument>,
options?: CommandOptions
): Promise<T> {
if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) {
Expand Down Expand Up @@ -346,7 +346,7 @@ export default class RedisCommandsQueue {
*commandsToWrite() {
let toSend = this.#toWrite.shift();
while (toSend) {
let encoded: CommandArguments;
let encoded: ReadonlyArray<RedisArgument>
try {
encoded = encodeCommand(toSend.args);
} catch (err) {
Expand Down
114 changes: 82 additions & 32 deletions packages/client/lib/client/index.ts
Expand Up @@ -7,14 +7,15 @@ import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchErr
import { URL } from 'node:url';
import { TcpSocketConnectOpts } from 'node:net';
import { PUBSUB_TYPE, PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
import { Command, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument, ReplyWithTypeMapping, SimpleStringReply } from '../RESP/types';
import { Command, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument, ReplyWithTypeMapping, SimpleStringReply, TransformReply } from '../RESP/types';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
import { RedisMultiQueuedCommand } from '../multi-command';
import HELLO, { HelloOptions } from '../commands/HELLO';
import { ScanOptions, ScanCommonOptions } from '../commands/SCAN';
import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode';
import { RedisPoolOptions, RedisClientPool } from './pool';
import { RedisVariadicArgument, pushVariadicArguments } from '../commands/generic-transformers';
import { BasicCommandParser, CommandParser } from './parser';

export interface RedisClientOptions<
M extends RedisModules = RedisModules,
Expand Down Expand Up @@ -151,52 +152,87 @@ export default class RedisClient<
> extends EventEmitter {
static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);

return async function (this: ProxyClient, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
};
if (command.parseCommand) {
const parser = new BasicCommandParser(resp);

command.parseCommand(parser, ...args);

return this.executeCommand(parser, this._commandOptions, transformReply);
} else {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
};
}
}

static #createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);

return async function (this: NamespaceProxyClient, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this._self.sendCommand(redisArgs, this._self._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
if (command.parseCommand) {
const parser = new BasicCommandParser(resp);
command.parseCommand(parser, ...args);

return this._self.executeCommand(parser, this._self._commandOptions, transformReply);
} else {
const redisArgs = command.transformArguments(...args),
reply = await this._self.sendCommand(redisArgs, this._self._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
}
};
}

static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp);
const prefix = functionArgumentsPrefix(name, fn);
const transformReply = getTransformReply(fn, resp);

return async function (this: NamespaceProxyClient, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args),
reply = await this._self.sendCommand(
prefix.concat(fnArgs),
this._self._commandOptions
);
return transformReply ?
transformReply(reply, fnArgs.preserve) :
reply;
if (fn.parseCommand) {
const parser = new BasicCommandParser(resp);
parser.pushVariadic(prefix);
fn.parseCommand(parser, ...args);

return this._self.executeCommand(parser, this._self._commandOptions, transformReply);
} else {
const fnArgs = fn.transformArguments(...args),
reply = await this._self.sendCommand(
prefix.concat(fnArgs),
this._self._commandOptions
);
return transformReply ?
transformReply(reply, fnArgs.preserve) :
reply;
}
};
}

static #createScriptCommand(script: RedisScript, resp: RespVersions) {
const prefix = scriptArgumentsPrefix(script),
transformReply = getTransformReply(script, resp);
const prefix = scriptArgumentsPrefix(script);
const transformReply = getTransformReply(script, resp);

return async function (this: ProxyClient, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
reply = await this.executeScript(script, redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, scriptArgs.preserve) :
reply;
};
if (script.parseCommand) {
const parser = new BasicCommandParser(resp);
parser.pushVariadic(prefix);
script.parseCommand(parser, ...args);

return this.executeCommand(parser, this._commandOptions, transformReply);
} else {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
reply = await this.executeScript(script, redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, scriptArgs.preserve) :
reply;
};
}
}

static factory<
Expand Down Expand Up @@ -573,8 +609,22 @@ export default class RedisClient<
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
}

async executeCommand<T = ReplyUnion>(
parser: CommandParser,
commandOptions: CommandOptions<TYPE_MAPPING> | undefined,
transformReply: TransformReply | undefined
) {
const reply = await this.sendCommand(parser.redisArgs, commandOptions);

if (transformReply) {
return transformReply(reply, parser.preserve);
}

return reply;
}

sendCommand<T = ReplyUnion>(
args: Array<RedisArgument>,
args: ReadonlyArray<RedisArgument>,
options?: CommandOptions
): Promise<T> {
if (!this._self.#socket.isOpen) {
Expand Down