Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add push message handler registration and make all pubsub use it. #2735

Draft
wants to merge 4 commits into
base: v5
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/client/lib/RESP/encoder.ts
Expand Up @@ -2,7 +2,7 @@ import { RedisArgument } from './types';

const CRLF = '\r\n';

export default function encodeCommand(args: Array<RedisArgument>): Array<RedisArgument> {
export default function encodeCommand(args: ReadonlyArray<RedisArgument>): Array<RedisArgument> {
const toWrite: Array<RedisArgument> = [];

let strings = '*' + args.length + CRLF;
Expand Down
94 changes: 72 additions & 22 deletions packages/client/lib/client/commands-queue.ts
@@ -1,8 +1,8 @@
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
import encodeCommand from '../RESP/encoder';
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP/types';
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types';
import { COMMANDS, ChannelListeners, PUBSUB_TYPE, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { AbortError, ErrorReply } from '../errors';
import { MonitorCallback } from '.';

Expand All @@ -17,7 +17,7 @@ export interface CommandOptions<T = TypeMapping> {
}

export interface CommandToWrite extends CommandWaitingForReply {
args: CommandArguments;
args: ReadonlyArray<RedisArgument>;
chainId: symbol | undefined;
abort: {
signal: AbortSignal;
Expand Down Expand Up @@ -51,6 +51,7 @@ export default class RedisCommandsQueue {
#chainInExecution: symbol | undefined;
readonly decoder;
readonly #pubSub = new PubSub();
readonly #pushHandlers: Map<string, Map<Symbol, (pushMsg: ReadonlyArray<any>) => unknown>> = new Map();

get isPubSubActive() {
return this.#pubSub.isActive;
Expand All @@ -64,6 +65,17 @@ export default class RedisCommandsQueue {
this.#respVersion = respVersion;
this.#maxLength = maxLength;
this.#onShardedChannelMoved = onShardedChannelMoved;

this.#addPushHandler(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString(), this.#pubSub.handleMessageReplyChannel.bind(this.#pubSub));
this.#addPushHandler(COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.toString(), this.#handleStatusReply.bind(this));
this.#addPushHandler(COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.toString(), this.#handleStatusReply.bind(this));
this.#addPushHandler(COMMANDS[PUBSUB_TYPE.PATTERNS].message.toString(), this.#pubSub.handleMessageReplyPattern.bind(this.#pubSub));
this.#addPushHandler(COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.toString(), this.#handleStatusReply.bind(this));
this.#addPushHandler(COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.toString(), this.#handleStatusReply.bind(this));
this.#addPushHandler(COMMANDS[PUBSUB_TYPE.SHARDED].message.toString(), this.#pubSub.handleMessageReplySharded.bind(this.#pubSub));
this.#addPushHandler(COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.toString(), this.#handleStatusReply.bind(this));
this.#addPushHandler(COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.toString(), this.#handleShardedUnsubscribe.bind(this));

this.decoder = this.#initiateDecoder();
}

Expand All @@ -75,28 +87,68 @@ export default class RedisCommandsQueue {
this.#waitingForReply.shift()!.reject(err);
}

#onPush(push: Array<any>) {
// TODO: type
if (this.#pubSub.handleMessageReply(push)) return true;

const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push);
if (isShardedUnsubscribe && !this.#waitingForReply.length) {
#handleStatusReply(push: ReadonlyArray<any>) {
const head = this.#waitingForReply.head!.value;
if (
(Number.isNaN(head.channelsCounter!) && push[2] === 0) ||
--head.channelsCounter! === 0
) {
this.#waitingForReply.shift()!.resolve();
}
}

#handleShardedUnsubscribe(push: ReadonlyArray<any>) {
if (!this.#waitingForReply.length) {
const channel = push[1].toString();
this.#onShardedChannelMoved(
channel,
this.#pubSub.removeShardedListeners(channel)
);
return true;
} else if (isShardedUnsubscribe || PubSub.isStatusReply(push)) {
const head = this.#waitingForReply.head!.value;
if (
(Number.isNaN(head.channelsCounter!) && push[2] === 0) ||
--head.channelsCounter! === 0
) {
this.#waitingForReply.shift()!.resolve();
} else {
this.#handleStatusReply(push);
}
}

#addPushHandler(messageType: string, handler: (pushMsg: ReadonlyArray<any>) => unknown) {
let handlerMap = this.#pushHandlers.get(messageType);
if (handlerMap === undefined) {
handlerMap = new Map();
this.#pushHandlers.set(messageType, handlerMap);
}

const symbol = Symbol(messageType);
handlerMap.set(symbol, handler);

return symbol;
}

addPushHandler(messageType: string, handler: (pushMsg: ReadonlyArray<any>) => unknown) {
if (this.#respVersion !== 3) throw new Error("cannot add push handlers to resp2 clients")

return this.#addPushHandler(messageType, handler);
}

removePushHandler(symbol: Symbol) {
const handlers = this.#pushHandlers.get(symbol.description!);
if (handlers) {
handlers.delete(symbol);
if (handlers.size === 0) {
this.#pushHandlers.delete(symbol.description!);
}
}
}

#onPush(push: Array<any>) {
const handlers = this.#pushHandlers.get(push[0].toString());
if (handlers) {
for (const handler of handlers.values()) {
handler(push);
}

return true;
}

return false;
}

#getTypeMapping() {
Expand All @@ -108,16 +160,14 @@ export default class RedisCommandsQueue {
onReply: reply => this.#onReply(reply),
onErrorReply: err => this.#onErrorReply(err),
onPush: push => {
if (!this.#onPush(push)) {

}
return this.#onPush(push);
},
getTypeMapping: () => this.#getTypeMapping()
});
}

addCommand<T>(
args: CommandArguments,
args: ReadonlyArray<RedisArgument>,
options?: CommandOptions
): Promise<T> {
if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) {
Expand Down Expand Up @@ -346,7 +396,7 @@ export default class RedisCommandsQueue {
*commandsToWrite() {
let toSend = this.#toWrite.shift();
while (toSend) {
let encoded: CommandArguments;
let encoded: ReadonlyArray<RedisArgument>
try {
encoded = encodeCommand(toSend.args);
} catch (err) {
Expand Down
36 changes: 36 additions & 0 deletions packages/client/lib/client/index.spec.ts
Expand Up @@ -9,6 +9,7 @@ import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec'
import { RESP_TYPES } from '../RESP/decoder';
import { BlobStringReply, NumberReply } from '../RESP/types';
import { SortedSetMember } from '../commands/generic-transformers';
import { COMMANDS, PUBSUB_TYPE } from './pub-sub';

export const SQUARE_SCRIPT = defineScript({
SCRIPT:
Expand Down Expand Up @@ -769,4 +770,39 @@ describe('Client', () => {
}
}, GLOBAL.SERVERS.OPEN);
});

describe('Push Handlers', () => {
testUtils.testWithClient('RESP3: add/remove invalidate handler, and validate its called', async client => {
const key = 'x'

let nodeResolve;

const promise = new Promise((res) => {
nodeResolve = res;
});

const symbol = client.addPushHandler("invalidate", (push: ReadonlyArray<any>) => {
assert.equal(push[0].toString(), "invalidate");
assert.equal(push[1].length, 1);
assert.equal(push[1].length, 1);
assert.equal(push[1][0].toString(), key);
// this test removing the handler,
// as flushAll in cleanup of test will issue a full invalidate,
// which would fail if this handler is called on it
client.removePushHandler(symbol);
nodeResolve();
})

await client.sendCommand(['CLIENT', 'TRACKING', 'ON']);
await client.get(key);
await client.set(key, '1');

await promise;
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3
}
});
});
});
8 changes: 8 additions & 0 deletions packages/client/lib/client/index.ts
Expand Up @@ -573,6 +573,14 @@ export default class RedisClient<
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
}

addPushHandler(messageType: string, handler: (pushMsg: ReadonlyArray<any>) => unknown) {
return this._self.#queue.addPushHandler(messageType, handler);
}

removePushHandler(symbol: Symbol) {
this._self.#queue.removePushHandler(symbol);
}

sendCommand<T = ReplyUnion>(
args: Array<RedisArgument>,
options?: CommandOptions
Expand Down
43 changes: 26 additions & 17 deletions packages/client/lib/client/pub-sub.ts
Expand Up @@ -11,7 +11,7 @@ export type PUBSUB_TYPE = typeof PUBSUB_TYPE;

export type PubSubType = PUBSUB_TYPE[keyof PUBSUB_TYPE];

const COMMANDS = {
export const COMMANDS = {
[PUBSUB_TYPE.CHANNELS]: {
subscribe: Buffer.from('subscribe'),
unsubscribe: Buffer.from('unsubscribe'),
Expand Down Expand Up @@ -344,28 +344,37 @@ export class PubSub {
return commands;
}

handleMessageReplyChannel(push: ReadonlyArray<Buffer>) {
this.#emitPubSubMessage(
PUBSUB_TYPE.CHANNELS,
push[2],
push[1]
);
}

handleMessageReplyPattern(push: ReadonlyArray<Buffer>) {
this.#emitPubSubMessage(
PUBSUB_TYPE.PATTERNS,
push[3],
push[2],
push[1]
);
}

handleMessageReplySharded(push: ReadonlyArray<Buffer>) {
this.#emitPubSubMessage(
PUBSUB_TYPE.SHARDED,
push[2],
push[1]
);
}

handleMessageReply(reply: Array<Buffer>): boolean {
if (COMMANDS[PUBSUB_TYPE.CHANNELS].message.equals(reply[0])) {
this.#emitPubSubMessage(
PUBSUB_TYPE.CHANNELS,
reply[2],
reply[1]
);
return true;
} else if (COMMANDS[PUBSUB_TYPE.PATTERNS].message.equals(reply[0])) {
this.#emitPubSubMessage(
PUBSUB_TYPE.PATTERNS,
reply[3],
reply[2],
reply[1]
);
return true;
} else if (COMMANDS[PUBSUB_TYPE.SHARDED].message.equals(reply[0])) {
this.#emitPubSubMessage(
PUBSUB_TYPE.SHARDED,
reply[2],
reply[1]
);
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/client/lib/client/socket.ts
Expand Up @@ -271,7 +271,7 @@ export default class RedisSocket extends EventEmitter {
});
}

write(iterable: Iterable<Array<RedisArgument>>) {
write(iterable: Iterable<ReadonlyArray<RedisArgument>>) {
if (!this.#socket) return;

this.#socket.cork();
Expand Down