Skip to content

Commit

Permalink
POC of restructuring commands
Browse files Browse the repository at this point in the history
  • Loading branch information
sjpotter committed Feb 15, 2024
1 parent cc85112 commit f576fe2
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 17 deletions.
2 changes: 2 additions & 0 deletions packages/client/lib/RESP/types.ts
@@ -1,3 +1,4 @@
import { CommandParser } from '../commander';
import { BlobError, SimpleError } from '../errors';
import { RedisScriptConfig, SHA1 } from '../lua-script';
import { RESP_TYPES } from './decoder';
Expand Down Expand Up @@ -272,6 +273,7 @@ export type Command = {
*/
IS_FORWARD_COMMAND?: boolean;
// POLICIES?: CommandPolicies;
parseCommand?(this: void, parser: CommandParser, ...args: Array<any>): void;
transformArguments(this: void, ...args: Array<any>): CommandArguments;
TRANSFORM_LEGACY_REPLY?: boolean;
transformReply: TransformReply | Record<RespVersions, TransformReply>;
Expand Down
45 changes: 38 additions & 7 deletions packages/client/lib/client/index.ts
Expand Up @@ -2,7 +2,7 @@ import COMMANDS from '../commands';
import RedisSocket, { RedisSocketOptions } from './socket';
import RedisCommandsQueue, { CommandOptions } from './commands-queue';
import { EventEmitter } from 'node:events';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { BasicCommandParser, CachedCommandParser, CommandParser, attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchError } from '../errors';
import { URL } from 'node:url';
import { TcpSocketConnectOpts } from 'node:net';
Expand Down Expand Up @@ -151,12 +151,19 @@ export default class RedisClient<
static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return async function (this: ProxyClient, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
};
if (command.parseCommand) {
const parser = this._self.#newCommandParser();
command.parseCommand!(parser, ...args);

return this.executeCommand(undefined, parser, this._self._commandOptions);
} else {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
};
}
}

static #createModuleCommand(command: Command, resp: RespVersions) {
Expand Down Expand Up @@ -308,6 +315,11 @@ export default class RedisClient<
this._self.#dirtyWatch = msg;
}

// FIXME: would choose parser here (i.e. if caching or not)
#newCommandParser(): CommandParser {
return new BasicCommandParser();
}

constructor(options?: RedisClientOptions<M, F, S, RESP, TYPE_MAPPING>) {
super();
this.#options = this.#initiateOptions(options);
Expand Down Expand Up @@ -572,6 +584,25 @@ export default class RedisClient<
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
}

async executeCommand<T = ReplyUnion>(
prefix: Array<string | Buffer> | undefined,
parser: CommandParser,
commandOptions: CommandOptions<TYPE_MAPPING> | undefined,
) {
if (parser instanceof BasicCommandParser) {
let redisArgs = parser.redisArgs;
if (prefix) {
redisArgs = prefix.concat(redisArgs);
}

const reply = await this.sendCommand(redisArgs, commandOptions)

return parser.transformReply(reply);
} else if (parser instanceof CachedCommandParser) {
// TODO
}
}

sendCommand<T = ReplyUnion>(
args: Array<RedisArgument>,
options?: CommandOptions
Expand Down
24 changes: 18 additions & 6 deletions packages/client/lib/client/pool.ts
Expand Up @@ -4,7 +4,7 @@ import RedisClient, { RedisClientType, RedisClientOptions, RedisClientExtensions
import { EventEmitter } from 'node:events';
import { DoublyLinkedNode, DoublyLinkedList, SinglyLinkedList } from './linked-list';
import { TimeoutError } from '../errors';
import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { BasicCommandParser, CommandParser, attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander';
import { CommandOptions } from './commands-queue';
import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command';

Expand Down Expand Up @@ -61,11 +61,18 @@ export class RedisClientPool<
static #createCommand(command: Command, resp: RespVersions) {
const transformReply = getTransformReply(command, resp);
return async function (this: ProxyPool, ...args: Array<unknown>) {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
if (command.parseCommand) {
const parser = this._self.#newCommandParser();
command.parseCommand(parser, ...args);

return this._self.execute(client => client.executeCommand(undefined, parser, this._commandOptions))
} else {
const redisArgs = command.transformArguments(...args),
reply = await this.sendCommand(redisArgs, this._commandOptions);
return transformReply ?
transformReply(reply, redisArgs.preserve) :
reply;
}
};
}

Expand Down Expand Up @@ -207,6 +214,11 @@ export class RedisClientPool<
return this._self.#isClosing;
}

// FIXME: would choose parser here (i.e. if caching or not)
#newCommandParser(): CommandParser {
return new BasicCommandParser();
}

/**
* You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`},
* {@link RedisClientPool.fromClient `RedisClientPool.fromClient`},
Expand Down
77 changes: 76 additions & 1 deletion packages/client/lib/commander.ts
@@ -1,4 +1,79 @@
import { Command, CommanderConfig, RedisCommands, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, RespVersions } from './RESP/types';
import { RedisArgument } from '..';
import { Command, CommanderConfig, RedisCommands, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, RespVersions, TransformReply } from './RESP/types';

export interface CommandParser {
push: (arg: RedisArgument) => unknown;
pushKey: (key: RedisArgument) => unknown;
setTransformReply: (transformReply: TransformReply) => unknown;
isParsed: () => boolean;
setCachable: () => unknown;
}

abstract class AbstractCommandParser implements CommandParser {
parsed: boolean = false;
redisArgs: Array<RedisArgument> = [];
transformReply: TransformReply = (reply) => { return reply }

push(arg: RedisArgument) {
this.parsed = true;
this.redisArgs.push(arg);

};
pushKey(key: RedisArgument) {
this.redisArgs.push(key);
};

setTransformReply(transformReply: TransformReply) {
this.transformReply = transformReply;
}

isParsed() {
return this.parsed;
}

setCachable() {};
}

export class BasicCommandParser extends AbstractCommandParser {};

export class CachedCommandParser extends AbstractCommandParser {
keys: Array<RedisArgument> = [];
#cachable = false;
get cachable() {
return this.#cachable;
}

override pushKey(key: RedisArgument) {
this.keys.push(key)
super.pushKey(key);
}

override setCachable() {
this.#cachable = true;
}
}

export class ClusterCommandParser extends BasicCommandParser {
firstKey?: RedisArgument;

override pushKey(key: RedisArgument): void {
if (!this.firstKey) {
this.firstKey = key;
}
super.pushKey(key);
}
}

export class ClusterCachedCommandParser extends CachedCommandParser {
firstKey?: RedisArgument;

override pushKey(key: RedisArgument): void {
if (!this.firstKey) {
this.firstKey = key;
}
super.pushKey(key);
}
}

interface AttachConfigOptions<
M extends RedisModules,
Expand Down
9 changes: 7 additions & 2 deletions packages/client/lib/commands/GET.ts
@@ -1,10 +1,15 @@
import { RedisArgument, BlobStringReply, NullReply, Command } from '../RESP/types';
import { CommandParser } from '../commander';

export default {
FIRST_KEY_INDEX: 1,
IS_READ_ONLY: true,
transformArguments(key: RedisArgument) {
return ['GET', key];
parseCommand(parser: CommandParser, key: RedisArgument) {
parser.setCachable();
parser.setTransformReply(undefined as unknown as () => BlobStringReply | NullReply)
parser.push('GET');
parser.pushKey(key);
},
transformArguments: () => { return [] },
transformReply: undefined as unknown as () => BlobStringReply | NullReply
} as const satisfies Command;
2 changes: 1 addition & 1 deletion packages/test-utils/lib/index.ts
Expand Up @@ -229,7 +229,7 @@ export default class TestUtils {
it(title, async function () {
if (!dockerPromise) return this.skip();

const pool = createClientPool({
const pool = u({
...options.clientOptions,
socket: {
...options.clientOptions?.socket,
Expand Down

0 comments on commit f576fe2

Please sign in to comment.