Skip to content

Commit

Permalink
POC of restructuring commands
Browse files Browse the repository at this point in the history
  • Loading branch information
sjpotter committed Feb 15, 2024
1 parent cc85112 commit 20daef4
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 37 deletions.
2 changes: 2 additions & 0 deletions packages/client/lib/RESP/types.ts
@@ -1,3 +1,4 @@
import { CommandParser } from '../commander';
import { BlobError, SimpleError } from '../errors';
import { RedisScriptConfig, SHA1 } from '../lua-script';
import { RESP_TYPES } from './decoder';
Expand Down Expand Up @@ -272,6 +273,7 @@ export type Command = {
*/
IS_FORWARD_COMMAND?: boolean;
// POLICIES?: CommandPolicies;
parseCommand?(this: void, parser: CommandParser, ...args: Array<any>): void;
transformArguments(this: void, ...args: Array<any>): CommandArguments;
TRANSFORM_LEGACY_REPLY?: boolean;
transformReply: TransformReply | Record<RespVersions, TransformReply>;
Expand Down
106 changes: 79 additions & 27 deletions packages/client/lib/client/index.ts
Expand Up @@ -2,7 +2,7 @@ import COMMANDS from '../commands';
import RedisSocket, { RedisSocketOptions } from './socket';
import RedisCommandsQueue, { CommandOptions } from './commands-queue';
import { EventEmitter } from 'node:events';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { BasicCommandParser, CachedCommandParser, CommandParser, attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchError } from '../errors';
import { URL } from 'node:url';
import { TcpSocketConnectOpts } from 'node:net';
Expand Down Expand Up @@ -150,52 +150,82 @@ export default class RedisClient<
> extends EventEmitter {
static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);

return async function (this: ProxyClient, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
};
if (command.parseCommand) {
const parser = this._self.#newCommandParser();
command.parseCommand(parser, ...args);

return this.executeCommand(undefined, parser, this._self._commandOptions);
} else {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
};
}
}

static #createModuleCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);

return async function (this: NamespaceProxyClient, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this._self.sendCommand(redisArgs, this._self._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
if (command.parseCommand) {
const parser = this._self.#newCommandParser();
command.parseCommand(parser, ...args);

return this._self.executeCommand(undefined, parser, this._self._commandOptions);
} else {
const redisArgs = command.transformArguments(...args),
reply = await this._self.sendCommand(redisArgs, this._self._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
}
};
}

static #createFunctionCommand(name: string, fn: RedisFunction, resp: RespVersions) {
const prefix = functionArgumentsPrefix(name, fn),
transformReply = getTransformReply(fn, resp);
return async function (this: NamespaceProxyClient, ...args: Array<unknown>) {
const fnArgs = fn.transformArguments(...args),
reply = await this._self.sendCommand(
prefix.concat(fnArgs),
this._self._commandOptions
);
return transformReply ?
transformReply(reply, fnArgs.preserve) :
reply;
if (fn.parseCommand) {
const parser = this._self.#newCommandParser();
fn.parseCommand(parser, ...args);

return this._self.executeCommand(prefix, parser, this._self._commandOptions);
} else {
const fnArgs = fn.transformArguments(...args),
reply = await this._self.sendCommand(
prefix.concat(fnArgs),
this._self._commandOptions
);
return transformReply ?
transformReply(reply, fnArgs.preserve) :
reply;
}
};
}

static #createScriptCommand(script: RedisScript, resp: RespVersions) {
const prefix = scriptArgumentsPrefix(script),
transformReply = getTransformReply(script, resp);
return async function (this: ProxyClient, ...args: Array<unknown>) {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
reply = await this.executeScript(script, redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, scriptArgs.preserve) :
reply;
};
if (script.parseCommand) {
const parser = this._self.#newCommandParser();
script.parseCommand(parser, ...args);

return this.executeCommand(prefix, parser, this._self._commandOptions);
} else {
const scriptArgs = script.transformArguments(...args),
redisArgs = prefix.concat(scriptArgs),
reply = await this.executeScript(script, redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, scriptArgs.preserve) :
reply;
};
}
}

static factory<
Expand Down Expand Up @@ -308,6 +338,11 @@ export default class RedisClient<
this._self.#dirtyWatch = msg;
}

// FIXME: would choose parser here (i.e. if caching or not)
#newCommandParser(): CommandParser {
return new BasicCommandParser();
}

constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
super();
this.#options = this.#initiateOptions(options);
Expand Down Expand Up @@ -572,6 +607,23 @@ export default class RedisClient<
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
}

async executeCommand<T = ReplyUnion>(
prefix: Array<string | Buffer> | undefined,
parser: CommandParser,
commandOptions: CommandOptions<TYPE_MAPPING> | undefined,
) {
const redisArgs = prefix ? prefix.concat(parser.redisArgs) : parser.redisArgs;
const fn = () => { return this.sendCommand(redisArgs, commandOptions) };

const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions;
if (parser instanceof CachedCommandParser && parser.cachable && defaultTypeMapping) {
// TODO: caching goes here.
} else {
const reply = await fn();
return parser.transformReply(reply);
}
}

sendCommand<T = ReplyUnion>(
args: Array<RedisArgument>,
options?: CommandOptions
Expand Down
24 changes: 18 additions & 6 deletions packages/client/lib/client/pool.ts
Expand Up @@ -4,7 +4,7 @@ import RedisClient, { RedisClientType, RedisClientOptions, RedisClientExtensions
import { EventEmitter } from 'node:events';
import { DoublyLinkedNode, DoublyLinkedList, SinglyLinkedList } from './linked-list';
import { TimeoutError } from '../errors';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { BasicCommandParser, CommandParser, attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { CommandOptions } from './commands-queue';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';

Expand Down Expand Up @@ -61,11 +61,18 @@ export class RedisClientPool<
static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return async function (this: ProxyPool, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
if (command.parseCommand) {
const parser = this._self.#newCommandParser();
command.parseCommand(parser, ...args);

return this._self.execute(client => client.executeCommand(undefined, parser, this._commandOptions))
} else {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
}
};
}

Expand Down Expand Up @@ -207,6 +214,11 @@ export class RedisClientPool<
return this._self.#isClosing;
}

// FIXME: would choose parser here (i.e. if caching or not)
#newCommandParser(): CommandParser {
return new BasicCommandParser();
}

/**
* You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`},
* {@link RedisClientPool.fromClient `RedisClientPool.fromClient`},
Expand Down
82 changes: 81 additions & 1 deletion packages/client/lib/commander.ts
@@ -1,4 +1,84 @@
import { Command, CommanderConfig, RedisCommands, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, RespVersions } from './RESP/types';
import { RedisArgument } from '..';
import { Command, CommanderConfig, RedisCommands, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, RespVersions, TransformReply } from './RESP/types';

export interface CommandParser {
redisArgs: Array<RedisArgument>;
transformReply: TransformReply

push: (arg: RedisArgument) => unknown;
pushKey: (key: RedisArgument) => unknown;
setTransformReply: (transformReply: TransformReply) => unknown;
setCachable: () => unknown;
}

abstract class AbstractCommandParser implements CommandParser {
#redisArgs: Array<RedisArgument> = [];
#transformReply: TransformReply = (reply) => { return reply }

get redisArgs() {
return this.#redisArgs;
}

get transformReply() {
return this.#transformReply;
}

push(arg: RedisArgument) {
this.#redisArgs.push(arg);

};

pushKey(key: RedisArgument) {
this.#redisArgs.push(key);
};

setTransformReply(transformReply: TransformReply) {
this.#transformReply = transformReply;
}

setCachable() {};
}

export class BasicCommandParser extends AbstractCommandParser {};

export class CachedCommandParser extends AbstractCommandParser {
keys: Array<RedisArgument> = [];
#cachable = false;
get cachable() {
return this.#cachable;
}

override pushKey(key: RedisArgument) {
this.keys.push(key)
super.pushKey(key);
}

override setCachable() {
this.#cachable = true;
}
}

export class ClusterCommandParser extends BasicCommandParser {
firstKey?: RedisArgument;

override pushKey(key: RedisArgument): void {
if (!this.firstKey) {
this.firstKey = key;
}
super.pushKey(key);
}
}

export class ClusterCachedCommandParser extends CachedCommandParser {
firstKey?: RedisArgument;

override pushKey(key: RedisArgument): void {
if (!this.firstKey) {
this.firstKey = key;
}
super.pushKey(key);
}
}

interface AttachConfigOptions<
M extends RedisModules,
Expand Down
9 changes: 7 additions & 2 deletions packages/client/lib/commands/GET.ts
@@ -1,10 +1,15 @@
import { RedisArgument, BlobStringReply, NullReply, Command } from '../RESP/types';
import { CommandParser } from '../commander';

export default {
FIRST_KEY_INDEX: 1,
IS_READ_ONLY: true,
transformArguments(key: RedisArgument) {
return ['GET', key];
parseCommand(parser: CommandParser, key: RedisArgument) {
parser.setCachable();
parser.setTransformReply(undefined as unknown as () => BlobStringReply | NullReply)
parser.push('GET');
parser.pushKey(key);
},
transformArguments: () => { return [] },
transformReply: undefined as unknown as () => BlobStringReply | NullReply
} as const satisfies Command;
2 changes: 1 addition & 1 deletion packages/test-utils/lib/index.ts
Expand Up @@ -229,7 +229,7 @@ export default class TestUtils {
it(title, async function () {
if (!dockerPromise) return this.skip();

const pool = createClientPool({
const pool = u({
...options.clientOptions,
socket: {
...options.clientOptions?.socket,
Expand Down

0 comments on commit 20daef4

Please sign in to comment.