Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC of restructuring commands #2702

Draft
wants to merge 6 commits into
base: v5
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/client/lib/RESP/types.ts
@@ -1,3 +1,4 @@
import { CommandParser } from '../commander';
import { BlobError, SimpleError } from '../errors';
import { RedisScriptConfig, SHA1 } from '../lua-script';
import { RESP_TYPES } from './decoder';
Expand Down Expand Up @@ -272,6 +273,7 @@ export type Command = {
*/
IS_FORWARD_COMMAND?: boolean;
// POLICIES?: CommandPolicies;
parseCommand?(this: void, parser: CommandParser, ...args: Array<any>): void;
transformArguments(this: void, ...args: Array<any>): CommandArguments;
TRANSFORM_LEGACY_REPLY?: boolean;
transformReply: TransformReply | Record<RespVersions, TransformReply>;
Expand Down
122 changes: 90 additions & 32 deletions packages/client/lib/client/index.ts
Expand Up @@ -2,12 +2,12 @@ import COMMANDS from '../commands';
import RedisSocket, { RedisSocketOptions } from './socket';
import RedisCommandsQueue, { CommandOptions } from './commands-queue';
import { EventEmitter } from 'node:events';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { BasicCommandParser, CachedCommandParser, CommandParser, attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchError } from '../errors';
import { URL } from 'node:url';
import { TcpSocketConnectOpts } from 'node:net';
import { PUBSUB_TYPE, PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub';
import { Command, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument, ReplyWithTypeMapping, SimpleStringReply } from '../RESP/types';
import { Command, CommandSignature, TypeMapping, CommanderConfig, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, ReplyUnion, RespVersions, RedisArgument, ReplyWithTypeMapping, SimpleStringReply, TransformReply } from '../RESP/types';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';
import { RedisMultiQueuedCommand } from '../multi-command';
import HELLO, { HelloOptions } from '../commands/HELLO';
Expand Down Expand Up @@ -151,52 +151,84 @@ export default class RedisClient<
> extends EventEmitter {
static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);

return async function (this: ProxyClient, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
};
if (command.parseCommand) {
const parser = this._self.#newCommandParser(resp);
command.parseCommand(parser, ...args);

return this.executeCommand(undefined, parser, this._self._commandOptions, transformReply);
} else {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
};
}
}

static #createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);

return async function (this: NamespaceProxyClient, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this._self.sendCommand(redisArgs, this._self._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
if (command.parseCommand) {
const parser = this._self.#newCommandParser(resp);
command.parseCommand(parser, ...args);

return this._self.executeCommand(undefined, parser, this._self._commandOptions, transformReply);
} else {
const redisArgs = command.transformArguments(...args),
reply = await this._self.sendCommand(redisArgs, this._self._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
}
};
}

static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp);
const prefix = functionArgumentsPrefix(name, fn);
const transformReply = getTransformReply(fn, resp);

return async function (this: NamespaceProxyClient, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args),
reply = await this._self.sendCommand(
prefix.concat(fnArgs),
this._self._commandOptions
);
return transformReply ?
transformReply(reply, fnArgs.preserve) :
reply;
if (fn.parseCommand) {
const parser = this._self.#newCommandParser(resp);
fn.parseCommand(parser, ...args);

return this._self.executeCommand(prefix, parser, this._self._commandOptions, transformReply);
} else {
const fnArgs = fn.transformArguments(...args),
reply = await this._self.sendCommand(
prefix.concat(fnArgs),
this._self._commandOptions
);
return transformReply ?
transformReply(reply, fnArgs.preserve) :
reply;
}
};
}

static #createScriptCommand(script: RedisScript, resp: RespVersions) {
const prefix = scriptArgumentsPrefix(script),
transformReply = getTransformReply(script, resp);
const prefix = scriptArgumentsPrefix(script);
const transformReply = getTransformReply(script, resp);

return async function (this: ProxyClient, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
reply = await this.executeScript(script, redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, scriptArgs.preserve) :
reply;
};
if (script.parseCommand) {
const parser = this._self.#newCommandParser(resp);
script.parseCommand(parser, ...args);

return this.executeCommand(prefix, parser, this._self._commandOptions, transformReply);
} else {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
reply = await this.executeScript(script, redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, scriptArgs.preserve) :
reply;
};
}
}

static factory<
Expand Down Expand Up @@ -309,6 +341,10 @@ export default class RedisClient<
this._self.#dirtyWatch = msg;
}

#newCommandParser(resp: RespVersions): CommandParser {
return new BasicCommandParser(resp);
}

constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
super();
this.#options = this.#initiateOptions(options);
Expand Down Expand Up @@ -573,6 +609,28 @@ export default class RedisClient<
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
}

async executeCommand<T = ReplyUnion>(
prefix: Array<string | Buffer> | undefined,
parser: CommandParser,
commandOptions: CommandOptions<TYPE_MAPPING> | undefined,
transformReply: TransformReply | undefined
) {
const redisArgs = prefix ? prefix.concat(parser.redisArgs) : parser.redisArgs;
const fn = () => { return this.sendCommand(redisArgs, commandOptions) };

const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions;
if (parser instanceof CachedCommandParser && parser.cachable && defaultTypeMapping) {
// TODO: caching goes here.
} else {
const reply = await fn();

if (transformReply) {
return transformReply(reply, parser.preserve);
}
return reply;
}
}

sendCommand<T = ReplyUnion>(
args: Array<RedisArgument>,
options?: CommandOptions
Expand Down
95 changes: 66 additions & 29 deletions packages/client/lib/client/pool.ts
Expand Up @@ -4,7 +4,7 @@ import RedisClient, { RedisClientType, RedisClientOptions, RedisClientExtensions
import { EventEmitter } from 'node:events';
import { DoublyLinkedNode, DoublyLinkedList, SinglyLinkedList } from './linked-list';
import { TimeoutError } from '../errors';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { BasicCommandParser, CommandParser, attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { CommandOptions } from './commands-queue';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';

Expand Down Expand Up @@ -60,51 +60,83 @@ export class RedisClientPool<
> extends EventEmitter {
static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);

return async function (this: ProxyPool, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
if (command.parseCommand) {
const parser = this._self.#newCommandParser(resp);
command.parseCommand(parser, ...args);

return this.execute(client => client.executeCommand(undefined, parser, this._commandOptions, transformReply))
} else {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
}
};
}

static #createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);

return async function (this: NamespaceProxyPool, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this._self.sendCommand(redisArgs, this._self._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
if (command.parseCommand) {
const parser = this._self.#newCommandParser(resp);
command.parseCommand(parser, ...args);

return this._self.execute(client => client.executeCommand(undefined, parser, this._self._commandOptions, transformReply))
} else {
const redisArgs = command.transformArguments(...args),
reply = await this._self.sendCommand(redisArgs, this._self._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
}
};
}

static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp);
const prefix = functionArgumentsPrefix(name, fn);
const transformReply = getTransformReply(fn, resp);

return async function (this: NamespaceProxyPool, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args),
reply = await this._self.sendCommand(
prefix.concat(fnArgs),
this._self._commandOptions
);
return transformReply ?
transformReply(reply, fnArgs.preserve) :
reply;
if (fn.parseCommand) {
const parser = this._self.#newCommandParser(resp);
fn.parseCommand(parser, ...args);

return this._self.execute(client => client.executeCommand(prefix, parser, this._self._commandOptions, transformReply))
} else {
const fnArgs = fn.transformArguments(...args),
reply = await this._self.sendCommand(
prefix.concat(fnArgs),
this._self._commandOptions
);
return transformReply ?
transformReply(reply, fnArgs.preserve) :
reply;
}
};
}

static #createScriptCommand(script: RedisScript, resp: RespVersions) {
const prefix = scriptArgumentsPrefix(script),
transformReply = getTransformReply(script, resp);
const prefix = scriptArgumentsPrefix(script);
const transformReply = getTransformReply(script, resp);

return async function (this: ProxyPool, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
reply = await this.executeScript(script, redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, scriptArgs.preserve) :
reply;
if (script.parseCommand) {
const parser = this._self.#newCommandParser(resp);
script.parseCommand(parser, ...args);

return this.execute(client => client.executeCommand(prefix, parser, this._commandOptions, transformReply))
} else {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
reply = await this.executeScript(script, redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, scriptArgs.preserve) :
reply;
}
};
}

Expand Down Expand Up @@ -207,6 +239,11 @@ export class RedisClientPool<
return this._self.#isClosing;
}

// FIXME: would choose parser here (i.e. if caching or not)
#newCommandParser(resp: RespVersions): CommandParser {
return new BasicCommandParser(resp);
}

/**
* You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`},
* {@link RedisClientPool.fromClient `RedisClientPool.fromClient`},
Expand Down