From 4817f013a491a054a469a0141449755473b26a82 Mon Sep 17 00:00:00 2001 From: Lenz Weber Date: Wed, 17 Apr 2019 20:53:01 +0200 Subject: [PATCH] feat: use worker-rpc library for inter-process communication (#246) * add worker-rpc for inter-process-communication (use signal-based messaging for minimal code changes) * use rpc calls instead of signaling --- package.json | 3 +- src/CancellationToken.ts | 2 +- src/RpcTypes.ts | 9 ++++ src/WorkResult.ts | 54 --------------------- src/cluster.ts | 83 ++++++++++++++++++-------------- src/index.ts | 27 +++++++++-- src/service.ts | 39 +++++++++------ test/unit/WorkResult.spec.js | 92 ------------------------------------ yarn.lock | 10 ++++ 9 files changed, 116 insertions(+), 203 deletions(-) create mode 100644 src/RpcTypes.ts delete mode 100644 src/WorkResult.ts delete mode 100644 test/unit/WorkResult.spec.js diff --git a/package.json b/package.json index 54307b48..27d2aedd 100644 --- a/package.json +++ b/package.json @@ -87,7 +87,8 @@ "micromatch": "^3.1.10", "minimatch": "^3.0.4", "semver": "^5.6.0", - "tapable": "^1.0.0" + "tapable": "^1.0.0", + "worker-rpc": "^0.1.0" }, "husky": { "hooks": { diff --git a/src/CancellationToken.ts b/src/CancellationToken.ts index 9e84d264..8593c654 100644 --- a/src/CancellationToken.ts +++ b/src/CancellationToken.ts @@ -7,7 +7,7 @@ import * as ts from 'typescript'; // Imported for types alone import { FsHelper } from './FsHelper'; -interface CancellationTokenData { +export interface CancellationTokenData { isCancelled: boolean; cancellationFileName: string; } diff --git a/src/RpcTypes.ts b/src/RpcTypes.ts new file mode 100644 index 00000000..86848a66 --- /dev/null +++ b/src/RpcTypes.ts @@ -0,0 +1,9 @@ +import { CancellationTokenData } from './CancellationToken'; +import { Message } from './Message'; + +export const RUN = 'run'; +export type RunPayload = CancellationTokenData; +export type RunResult = + | Message + // when run was cancelled via CancellationToken, undefined is returned + | undefined; diff --git a/src/WorkResult.ts b/src/WorkResult.ts deleted file mode 100644 index 81794ba4..00000000 --- a/src/WorkResult.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { Message } from './Message'; - -export class WorkResult { - private workResult: { - [key: string]: Message; - } = {}; - - constructor(private workDomain: number[]) {} - - public supports(workName: number) { - return this.workDomain.includes(workName); - } - - public set(workName: number, result: Message) { - if (!this.supports(workName)) { - throw new Error( - 'Cannot set result - work "' + workName + '" is not supported.' - ); - } - - this.workResult[workName] = result; - } - - public has(workName: number) { - return this.supports(workName) && undefined !== this.workResult[workName]; - } - - public get(workName: number) { - if (!this.supports(workName)) { - throw new Error( - 'Cannot get result - work "' + workName + '" is not supported.' - ); - } - - return this.workResult[workName]; - } - - public hasAll() { - return this.workDomain.every(key => this.has(key)); - } - - public clear() { - this.workResult = {}; - } - - public reduce( - reducer: (m1: Message, m2: Message) => Message, - initial: Message - ) { - return this.workDomain.reduce((reduced, workName) => { - return reducer(reduced, this.workResult[workName]); - }, initial); - } -} diff --git a/src/cluster.ts b/src/cluster.ts index bc2026d7..51c8b07e 100644 --- a/src/cluster.ts +++ b/src/cluster.ts @@ -1,10 +1,11 @@ import * as childProcess from 'child_process'; import * as path from 'path'; import * as process from 'process'; +import { RpcProvider } from 'worker-rpc'; -import { WorkResult } from './WorkResult'; import { NormalizedMessage } from './NormalizedMessage'; import { Message } from './Message'; +import { RunPayload, RunResult, RUN } from './RpcTypes'; // fork workers... const division = parseInt(process.env.WORK_DIVISION || '', 10); @@ -20,12 +21,20 @@ for (let num = 0; num < division; num++) { ); } -const pids = workers.map(worker => worker.pid); -const result = new WorkResult(pids); +// communication with parent process +const parentRpc = new RpcProvider(message => { + try { + process.send!(message); + } catch (e) { + // channel closed... + process.exit(); + } +}); +process.on('message', message => parentRpc.dispatch(message)); -process.on('message', (message: Message) => { - // broadcast message to all workers - workers.forEach(worker => { +// communication with worker processes +const workerRpcs = workers.map(worker => { + const rpc = new RpcProvider(message => { try { worker.send(message); } catch (e) { @@ -33,41 +42,43 @@ process.on('message', (message: Message) => { process.exit(); } }); - - // clear previous result set - result.clear(); + worker.on('message', message => rpc.dispatch(message)); + return rpc; }); -// listen to all workers -workers.forEach(worker => { - worker.on('message', (message: Message) => { - // set result from worker - result.set(worker.pid, { - diagnostics: message.diagnostics.map(NormalizedMessage.createFromJSON), - lints: message.lints.map(NormalizedMessage.createFromJSON) - }); +parentRpc.registerRpcHandler(RUN, async message => { + const workerResults = await Promise.all( + workerRpcs.map(workerRpc => + workerRpc.rpc(RUN, message) + ) + ); - // if we have result from all workers, send merged - if (result.hasAll()) { - const merged: Message = result.reduce( - (innerMerged: Message, innerResult: Message) => ({ - diagnostics: innerMerged.diagnostics.concat(innerResult.diagnostics), - lints: innerMerged.lints.concat(innerResult.lints) - }), - { diagnostics: [], lints: [] } - ); + function workerFinished( + workerResult: (Message | undefined)[] + ): workerResult is Message[] { + return workerResult.every(result => typeof result !== 'undefined'); + } - merged.diagnostics = NormalizedMessage.deduplicate(merged.diagnostics); - merged.lints = NormalizedMessage.deduplicate(merged.lints); + if (!workerFinished(workerResults)) { + return undefined; + } - try { - process.send!(merged); - } catch (e) { - // channel closed... - process.exit(); - } - } - }); + const merged: Message = workerResults.reduce( + (innerMerged: Message, innerResult: Message) => ({ + diagnostics: innerMerged.diagnostics.concat( + innerResult.diagnostics.map(NormalizedMessage.createFromJSON) + ), + lints: innerMerged.lints.concat( + innerResult.lints.map(NormalizedMessage.createFromJSON) + ) + }), + { diagnostics: [], lints: [] } + ); + + merged.diagnostics = NormalizedMessage.deduplicate(merged.diagnostics); + merged.lints = NormalizedMessage.deduplicate(merged.lints); + + return merged; }); process.on('SIGINT', () => { diff --git a/src/index.ts b/src/index.ts index 9c734321..3d0774ea 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,7 @@ import * as path from 'path'; import * as process from 'process'; import * as childProcess from 'child_process'; +import { RpcProvider } from 'worker-rpc'; import * as semver from 'semver'; import chalk, { Chalk } from 'chalk'; import * as micromatch from 'micromatch'; @@ -17,6 +18,7 @@ import { FsHelper } from './FsHelper'; import { Message } from './Message'; import { getForkTsCheckerWebpackPluginHooks, legacyHookMap } from './hooks'; +import { RunPayload, RunResult, RUN } from './RpcTypes'; const checkerPluginName = 'fork-ts-checker-webpack-plugin'; @@ -121,6 +123,7 @@ class ForkTsCheckerWebpackPlugin { private tslintVersion: string; private service?: childProcess.ChildProcess; + private serviceRpc?: RpcProvider; private vue: boolean; @@ -395,7 +398,14 @@ class ForkTsCheckerWebpackPlugin { if (this.measureTime) { this.startAt = this.performance.now(); } - this.service!.send(this.cancellationToken); + this.serviceRpc!.rpc( + RUN, + this.cancellationToken.toJSON() + ).then(result => { + if (result) { + this.handleServiceMessage(result); + } + }); } catch (error) { if (!this.silent && this.logger) { this.logger.error( @@ -440,7 +450,14 @@ class ForkTsCheckerWebpackPlugin { } try { - this.service!.send(this.cancellationToken); + this.serviceRpc!.rpc( + RUN, + this.cancellationToken.toJSON() + ).then(result => { + if (result) { + this.handleServiceMessage(result); + } + }); } catch (error) { if (!this.silent && this.logger) { this.logger.error( @@ -578,6 +595,8 @@ class ForkTsCheckerWebpackPlugin { stdio: ['inherit', 'inherit', 'inherit', 'ipc'] } ); + this.serviceRpc = new RpcProvider(message => this.service!.send(message)); + this.service.on('message', message => this.serviceRpc!.dispatch(message)); if ('hooks' in this.compiler) { // webpack 4+ @@ -630,9 +649,6 @@ class ForkTsCheckerWebpackPlugin { } } - this.service.on('message', (message: Message) => - this.handleServiceMessage(message) - ); this.service.on('exit', (code: string | number, signal: string) => this.handleServiceExit(code, signal) ); @@ -649,6 +665,7 @@ class ForkTsCheckerWebpackPlugin { this.service.kill(); this.service = undefined; + this.serviceRpc = undefined; } catch (e) { if (this.logger && !this.silent) { this.logger.error(e); diff --git a/src/service.ts b/src/service.ts index 89b2adfe..90c47306 100644 --- a/src/service.ts +++ b/src/service.ts @@ -10,6 +10,18 @@ import { makeCreateNormalizedMessageFromDiagnostic, makeCreateNormalizedMessageFromRuleFailure } from './NormalizedMessageFactories'; +import { RpcProvider } from 'worker-rpc'; +import { RunPayload, RunResult, RUN } from './RpcTypes'; + +const rpc = new RpcProvider(message => { + try { + process.send!(message); + } catch (e) { + // channel closed... + process.exit(); + } +}); +process.on('message', message => rpc.dispatch(message)); const typescript: typeof ts = require(process.env.TYPESCRIPT_PATH!); @@ -61,28 +73,27 @@ async function run(cancellationToken: CancellationToken) { } } catch (error) { if (error instanceof typescript.OperationCanceledException) { - return; + return undefined; } throw error; } - if (!cancellationToken.isCancellationRequested()) { - try { - process.send!({ - diagnostics, - lints - }); - } catch (e) { - // channel closed... - process.exit(); - } + if (cancellationToken.isCancellationRequested()) { + return undefined; } + + return { + diagnostics, + lints + }; } -process.on('message', message => { - run(CancellationToken.createFromJSON(typescript, message)); -}); +rpc.registerRpcHandler(RUN, message => + typeof message !== 'undefined' + ? run(CancellationToken.createFromJSON(typescript, message!)) + : undefined +); process.on('SIGINT', () => { process.exit(); diff --git a/test/unit/WorkResult.spec.js b/test/unit/WorkResult.spec.js deleted file mode 100644 index ccb08e52..00000000 --- a/test/unit/WorkResult.spec.js +++ /dev/null @@ -1,92 +0,0 @@ -var describe = require('mocha').describe; -var it = require('mocha').it; -var beforeEach = require('mocha').beforeEach; -var expect = require('chai').expect; -var sinon = require('sinon'); -var WorkResult = require('../../lib/WorkResult').WorkResult; - -describe('[UNIT] WorkResult', function() { - var result; - - beforeEach(function() { - result = new WorkResult([1, 2, 3]); - }); - - it('should allow result only from work domain', function() { - expect(result.supports(1)).to.be.true; - expect(result.supports(2)).to.be.true; - expect(result.supports(3)).to.be.true; - expect(result.supports(4)).to.be.false; - expect(result.supports('1')).to.be.false; - expect(result.supports('something else')).to.be.false; - }); - - it('should throw error if we want set or get result out of work domain', function() { - expect(function() { - result.set(1, 'abc'); - }).to.not.throw(); - expect(function() { - result.set(4, 'abc'); - }).to.throw(); - expect(function() { - result.get(1); - }).to.not.throw(); - expect(function() { - result.get(4); - }).to.throw(); - }); - - it('should set and get result', function() { - result.set(1, 'test'); - expect(result.has(1)).to.be.true; - expect(result.has(2)).to.be.false; - expect(result.get(1)).to.be.equal('test'); - expect(result.get(2)).to.be.undefined; - }); - - it('should check if we have all result', function() { - expect(result.hasAll()).to.be.false; - result.set(1, 'abc'); - expect(result.hasAll()).to.be.false; - result.set(3, 'xyz'); - expect(result.hasAll()).to.be.false; - result.set(1, 'efg'); - expect(result.hasAll()).to.be.false; - result.set(2, undefined); - expect(result.hasAll()).to.be.false; - result.set(2, 'foo'); - expect(result.hasAll()).to.be.true; - }); - - it('should clear work result', function() { - expect(function() { - result.clear(); - }).to.not.throw(); - result.set(1, 'test'); - result.clear(); - expect(result.get(1)).to.be.undefined; - result.set(1, 'a'); - result.set(2, 'b'); - result.set(3, 'c'); - result.clear(); - expect(result.hasAll()).to.be.false; - }); - - it('should reduce work result', function() { - result.set(2, 'c'); - var reducer = sinon.spy(function(reduced, current) { - return reduced.concat(current); - }); - - var reduced = result.reduce(reducer, []); - expect(reduced).to.be.a('array'); - expect(reduced).to.be.deep.equal([undefined, 'c', undefined]); - expect(reducer.callCount).to.be.equal(3); - expect(reducer.getCall(0).args[0]).to.be.deep.equal([]); - expect(reducer.getCall(0).args[1]).to.be.undefined; - expect(reducer.getCall(1).args[0]).to.be.deep.equal([undefined]); - expect(reducer.getCall(1).args[1]).to.be.equal('c'); - expect(reducer.getCall(2).args[0]).to.be.deep.equal([undefined, 'c']); - expect(reducer.getCall(2).args[1]).to.be.equal(undefined); - }); -}); diff --git a/yarn.lock b/yarn.lock index 13d5983e..5cfcc34b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2207,6 +2207,10 @@ merge-source-map@^1.1.0: dependencies: source-map "^0.6.1" +microevent.ts@~0.1.0: + version "0.1.0" + resolved "https://registry.yarnpkg.com/microevent.ts/-/microevent.ts-0.1.0.tgz#390748b8a515083e6b63cd5112a3f18c2fe0eba8" + micromatch@^3.1.10, micromatch@^3.1.4, micromatch@^3.1.8: version "3.1.10" resolved "https://registry.yarnpkg.com/micromatch/-/micromatch-3.1.10.tgz#70859bc95c9840952f359a068a3fc49f9ecfac23" @@ -3892,6 +3896,12 @@ worker-farm@^1.5.2: dependencies: errno "~0.1.7" +worker-rpc@^0.1.0: + version "0.1.0" + resolved "https://registry.yarnpkg.com/worker-rpc/-/worker-rpc-0.1.0.tgz#5f1258dca3d617cd18ca86587f8a05ac0eebd834" + dependencies: + microevent.ts "~0.1.0" + wrap-ansi@^3.0.1: version "3.0.1" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-3.0.1.tgz#288a04d87eda5c286e060dfe8f135ce8d007f8ba"