diff --git a/etc/api/angular_devkit/core/node/_golden-api.d.ts b/etc/api/angular_devkit/core/node/_golden-api.d.ts index 285d1d188bbe..6059512644b1 100644 --- a/etc/api/angular_devkit/core/node/_golden-api.d.ts +++ b/etc/api/angular_devkit/core/node/_golden-api.d.ts @@ -39,6 +39,12 @@ export declare class NodeJsSyncHost implements virtualFs.Host { write(path: Path, content: virtualFs.FileBuffer): Observable; } +export declare class NodeModuleJobRegistry implements core_experimental.jobs.Registry { + constructor(_resolveLocal?: boolean, _resolveGlobal?: boolean); + protected _resolve(name: string): string | null; + get(name: core_experimental.jobs.JobName): Observable | null>; +} + export interface ProcessOutput { write(buffer: string | Buffer): boolean; } diff --git a/packages/angular_devkit/core/BUILD b/packages/angular_devkit/core/BUILD index 39faa08980ec..6d64bb664eaa 100644 --- a/packages/angular_devkit/core/BUILD +++ b/packages/angular_devkit/core/BUILD @@ -109,6 +109,7 @@ ts_library( deps = [ ":core", ":node", + "//tests/angular_devkit/core/node/jobs:jobs_test_lib", "@rxjs", "@rxjs//operators", "@npm//@types/node", diff --git a/packages/angular_devkit/core/node/_golden-api.ts b/packages/angular_devkit/core/node/_golden-api.ts index 1008bfa1c3c9..ca9054ed2649 100644 --- a/packages/angular_devkit/core/node/_golden-api.ts +++ b/packages/angular_devkit/core/node/_golden-api.ts @@ -5,6 +5,13 @@ * Use of this source code is governed by an MIT-style license that can be * found in the LICENSE file at https://angular.io/license */ + +// Start experimental namespace +// Start jobs namespace +export * from './experimental/job-registry'; +// End jobs namespace +// End experimental namespace + export * from './fs'; export * from './cli-logger'; export * from './host'; diff --git a/packages/angular_devkit/core/node/experimental/index.ts b/packages/angular_devkit/core/node/experimental/index.ts new file mode 100644 index 000000000000..c32982a08d12 --- /dev/null +++ b/packages/angular_devkit/core/node/experimental/index.ts @@ -0,0 +1,12 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +import * as jobs from './job-registry'; + +export { + jobs, +}; diff --git a/packages/angular_devkit/core/node/experimental/job-registry.ts b/packages/angular_devkit/core/node/experimental/job-registry.ts new file mode 100644 index 000000000000..266773dc527b --- /dev/null +++ b/packages/angular_devkit/core/node/experimental/job-registry.ts @@ -0,0 +1,81 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +import { Observable, of } from 'rxjs'; +import { JsonValue, experimental as core_experimental, schema } from '../../src'; +import { ModuleNotFoundException, resolve } from '../resolve'; + +export class NodeModuleJobRegistry implements core_experimental.jobs.Registry { + public constructor(private _resolveLocal = true, private _resolveGlobal = false) { + } + + protected _resolve(name: string): string | null { + try { + return resolve(name, { + checkLocal: this._resolveLocal, + checkGlobal: this._resolveGlobal, + basedir: __dirname, + }); + } catch (e) { + if (e instanceof ModuleNotFoundException) { + return null; + } + throw e; + } + } + + /** + * Get a job description for a named job. + * + * @param name The name of the job. + * @returns A description, or null if the job is not registered. + */ + get( + name: core_experimental.jobs.JobName, + ): Observable | null> { + const [moduleName, exportName] = name.split(/#/, 2); + + const resolvedPath = this._resolve(moduleName); + if (!resolvedPath) { + return of(null); + } + + const pkg = require(resolvedPath); + const handler = pkg[exportName || 'default']; + if (!handler) { + return of(null); + } + + // TODO: this should be unknown + // tslint:disable-next-line:no-any + function _getValue(...fields: any[]) { + return fields.find(x => schema.isJsonSchema(x)) || true; + } + + const argument = _getValue(pkg.argument, handler.argument); + const input = _getValue(pkg.input, handler.input); + const output = _getValue(pkg.output, handler.output); + const channels = _getValue(pkg.channels, handler.channels); + + return of(Object.assign(handler.bind(undefined), { + jobDescription: { + argument, + input, + output, + channels, + }, + })); + } +} diff --git a/packages/angular_devkit/core/node/experimental/job-registry_spec.ts b/packages/angular_devkit/core/node/experimental/job-registry_spec.ts new file mode 100644 index 000000000000..b951f830c685 --- /dev/null +++ b/packages/angular_devkit/core/node/experimental/job-registry_spec.ts @@ -0,0 +1,26 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +import * as path from 'path'; +import { experimental as core_experimental } from '../../src'; +import { NodeModuleJobRegistry } from './job-registry'; + +const root = path.join( + path.dirname(require.resolve(__filename)), + '../../../../../tests/angular_devkit/core/node/jobs', +); + + +describe('NodeModuleJobScheduler', () => { + it('works', async () => { + const registry = new NodeModuleJobRegistry(); + const scheduler = new core_experimental.jobs.SimpleScheduler(registry); + + const job = scheduler.schedule(path.join(root, 'add'), [1, 2, 3]); + expect(await job.output.toPromise()).toBe(6); + }); +}); diff --git a/packages/angular_devkit/core/node/index.ts b/packages/angular_devkit/core/node/index.ts index 22d6d35a531d..8b5f1d676693 100644 --- a/packages/angular_devkit/core/node/index.ts +++ b/packages/angular_devkit/core/node/index.ts @@ -5,11 +5,13 @@ * Use of this source code is governed by an MIT-style license that can be * found in the LICENSE file at https://angular.io/license */ +import * as experimental from './experimental/job-registry'; import * as fs from './fs'; export * from './cli-logger'; export * from './host'; export { ModuleNotFoundException, ResolveOptions, resolve } from './resolve'; export { + experimental, fs, }; diff --git a/packages/angular_devkit/core/src/experimental.ts b/packages/angular_devkit/core/src/experimental.ts index 5e3dcf689ed7..a2f8e102b86a 100644 --- a/packages/angular_devkit/core/src/experimental.ts +++ b/packages/angular_devkit/core/src/experimental.ts @@ -5,6 +5,10 @@ * Use of this source code is governed by an MIT-style license that can be * found in the LICENSE file at https://angular.io/license */ +import * as jobs from './experimental/jobs/index'; import * as workspace from './workspace/index'; -export { workspace }; +export { + jobs, + workspace, +}; diff --git a/packages/angular_devkit/core/src/experimental/jobs/README.md b/packages/angular_devkit/core/src/experimental/jobs/README.md new file mode 100644 index 000000000000..3989c30d0d38 --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/README.md @@ -0,0 +1,495 @@ +# Description + +Jobs is the Angular DevKit subsystem for scheduling and running generic functions with clearly +typed inputs and outputs. A `Job` instance is a function associated with metadata. You can +schedule a job, synchronize it with other jobs, and use it to schedule other jobs. + +The whole API is serializable, allowing you to use a Node Stream or message channel to +communicate between the job and the job scheduler. + +Jobs are lazy, cold, and guaranteed to execute exactly once when scheduled. Subscribing to a job +returns messages from the point where the job is at. + +## Argument, Input, Output and Channels +A job receives a single argument when scheduled and can also listen to an input channel. It can +emit multiple outputs, and can also provide multiple output channels that emit asynchronous JSON +messages, which can be typed. + +The I/O model is like that of an executable, where the argument corresponds to arguments on the +command line, the input channel to STDIN, the output channel to STDOUT, and the channels +would be additional output streams. + +In addition, a `Job` has a logging channel that can be used to log messages to the user. The +code that schedules the job must listen for or forward these messages. You can think of those +messages as STDERR. + +## LifeCycle +A `Job` goes through multiple LifeCycle messages before its completion; +1. `JobState.Queued`. The job was queued and is waiting. This is the default state from the + scheduler. +1. `JobState.Ready`. The job's dependencies (see + ["Synchronizing and Dependencies"](#Dependencies)) are done running, the argument is + validated, and the job is ready to execute. +1. `JobState.Started`. The argument has been validated, the job has been called and is running. + This is handled by the job itself (or `createJobHandler()`). +1. `JobState.Ended`. The job has ended and is done running. This is handled by the job itself (or + `createJobHandler()`). +1. `JobState.Errored`. A unrecoverable error happened. + +Each state (except `Queued`) corresponds to a `JobOutboundMessage` on the `outboundBus` observable +that triggers the state change. The `Scheduler` emits the `Ready` and `Errored` messages; the job +implementation should not emit them, and if it does they are filtered out. You can listen for +these messages or use the corresponding state member. + +The job implementation should emit the `Start` and `End` messages when it is starting the job logic +itself. Only the first `Start` and `End` messages will be forwarded. Any more will be filtered out. + +The `Queued` state is set as the job is scheduled, so there is no need to listen for the message. + +## `Job` Object +The `Job` object that is returned when you schedule a job provides access to the job's status and +utilities for tracking and modifying the job. + +1. `id`. A unique symbol that can be used as a Map key. +1. `description`. The description of the job from the scheduler. See `JobDescription` object. +1. `argument`. The argument value that was used to start the job. +1. `input`. An `Observer` that can be used to send validated inputs to the job itself. +1. `output`. An `Observable` that filters out messages to get only the returned output + of a job. +1. `promise`. A promise that waits for the last output of a job. Returns the last value outputted + (or no value if there's no last value). +1. `state`. The current state of the job (see `LifeCycle`). +1. `channels`. A map of side channels the user can listen to as `Observable`. +1. `ping()`. A function that can be used to ping the job, receiving a `Promise` for when the ping + is answered. +1. `stop()`. Sends a `stop` input to the job, which suggests to stop the job. The job itself can + choose to ignore this message. +1. `inboundBus`. The raw input `Observer`. This can be used to send messages to + the `context.inboundBus` observable in the job. These are `JobInboundMessage` messages. See + ["Communicating With Jobs"](#Communicating). +1. `outboundBus`. The raw output `Observable`. This can be used to listen to messages + from the job. See ["Communicating With Jobs"](#Communicating). + +## `JobHandlerContext` Object +The `JobHandlerContext<>` is passed to the job handler code in addition to its argument. The +context contains the following members: + +1. `description`. The description of the job. Its name and schemas. +1. `scheduler`. A `Scheduler<>` instance that can be used to create additional jobs. +1. `dependencies`. A generic list of other job instances that were run as dependencies when + scheduling this job. Their `id` is not guaranteed to match the `id` of the `Job<>` instance + itself (those `Job<>`s might just be proxies). The state of those `Job<>` is guaranteed to be + `JobState.Ended`, as `JobState.Errored` would have prevented this handler from running. +1. `inboundBus`. The raw input observable, complement of the `inboundBus` observer from the `Job<>`. + +# Examples + +An example of a job that adds all input together and return the output value. We use a +simple synchronous job registry and a simple job scheduler. + +```typescript +import { jobs } from '@angular-devkit/core'; + +const add = jobs.createJobHandle( + input => input.reduce((total, curr) => total + curr, 0), +); + +// Register the job in a SimpleJobRegistry. Different registries have different API. +const registry = new jobs.SimpleJobRegistry(); +const scheduler = new jobs.SimpleScheduler(registry); +registry.register(add, { + name: 'add', + input: { type: 'array', items: { type: 'number' } }, + output: { type: 'number' }, +}); + +scheduler.schedule('add', [1, 2, 3, 4]).promise + .then(output => console.log('1 + 2 + 3 + 4 is ' + output)); +``` + +# Creating Jobs + +A job is at its core a function with a description object attached to it. The description object +stores the JSON schemas used to validate the types of the argument passed in, the input and +output values. By default, a job accepts and can output any JSON object. + +```typescript +import { Observable } from 'rxjs'; +import { jobs } from '@angular-devkit/core'; + +const argument = { + type: 'array', items: { type: 'number' }, +}; +const output = { + type: 'number', +}; + +export function add(argument: number[]): Observable> { + return new Observable(o => { + o.next({ kind: jobs.JobOutboundMessageKind.Start }); + o.next({ + kind: jobs.JobOutboundMessageKind.Output, + output: argument.reduce((total, curr) => total + curr, 0), + }); + o.next({ kind: jobs.JobOutboundMessageKind.End }); + o.complete(); + }); +} + +// Add a property to `add` to make it officially a JobHandler. The Job system does not recognize +// any function as a JobHandler. +add.jobDescription = { + argument: argument, + output: output, +}; + +// Call the job with an array as argument, and log its output. +declare const scheduler: jobs.Scheduler; +scheduler.schedule('add', [1, 2, 3, 4]) + .output.subscribe(x => console.log(x)); // Will output 10. +``` + +This is a lot of boilerplate, so we made some helpers to improve readability and manage argument, +input and output automatically: + +```typescript +// Add is a JobHandler function, like the above. +export const add = jobs.createJobHandler( + argument => argument.reduce((total, curr) => total + curr, 0), +); + +// Schedule like above. +``` + +You can also return a Promise or an Observable, as jobs are asynchronous. This helper will set +start and end messages appropriately, and will pass in a logger. It will also manage channels +automatically (see below). + +A more complex job can be declared like this: + +```typescript +import { Observable } from 'rxjs'; +import { jobs } from '@angular-devkit/core'; + +// Show progress with each count in a separate output channel. Output "more" in a channel. +export const count = jobs.createJobHandler( + // Receive a context that contains additional methods to create channels. + (argument: number, { createChannel }) => new Observable(o => { + const side = createChannel('side', { type: 'string', const: 'more' }); + const progress = createChannel('progress', { type: 'number' }); + let i = 0; + function doCount() { + o.next(i++); + progress.next(i / argument); + side.next('more'); + + if (i < argument) { + setTimeout(doCount, 100); + } else { + o.complete(); + } + } + setTimeout(doCount, 100); + }), + { + argument: { type: 'number' }, + output: { type: 'number' }, + }, +); + +// Get a hold of a scheduler that refers to the job above. +declare const scheduler: jobs.Scheduler; + +const job = scheduler.schedule('count', 0); +job.getChannel('side').subscribe(x => console.log(x)); +// You can type a channel too. Messages will be filtered out. +job.getChannel('progress', { type: 'number' }).subscribe(x => console.log(x)); +``` + +## Communicating With Jobs +Jobs can be started and updated in a separate process or thread, and as such communication with a +job should avoid using global objects (which might not be shared). The jobs API and schedulers +provide 2 communication streams (one for input and the other for output), named `inboundBus` and +`outboundBus`. + +### Raw Input Stream +The `schedule()` function returns a `Job<>` interface that contains a `inboundBus` member of type +`Observer`. All messages sent _to_ the job goes through this stream. The `kind` +member of the `JobInboundMessage` interface dictates what kind of message it is sending: + +1. `JobInboundMessageKind.Ping`. A simple message that should be answered with + `JobOutboundMessageKind.Pong` when the job is responsive. The `id` field of the message should + be used when returning `Pong`. +1. `JobInboundMessageKind.Stop`. The job should be stopped. This is used when + cancelling/unsubscribing from the `output` (or by calling `stop()`). Any inputs or outputs + after this message will be ignored. +1. `JobInboundMessageKind.Input` is used when sending inputs to a job. These correspond to the + `next` methods of an `Observer` and are reported to the job through its `context.input` + Observable. There is no way to communicate an error to the job. + +Using the `createJobHandler()` helper, all those messages are automatically handled by the +boilerplate code. If you need direct access to raw inputs, you should subscribe to the +`context.inboundBus` Observable. + +### Raw Output Stream +The `Job<>` interface also contains a `outboundBus` member (of type +`Observable>` where `O` is the typed output of the job) which is the output +complement of `inboundBus`. All messages sent _from_ the job goes through this stream. The `kind` +member of the `JobOutboundMessage` interface dictates what kind of message it is sending: + +1. `JobOutboundMessageKind.Create`. The `Job<>` was created, its dependencies are done, and the + library is validating Argument and calling the internal job code. +1. `JobOutboundMessageKind.Start`. The job code itself should send that message when started. + `createJobHandler()` will do it automatically. +1. `JobOutboundMessageKind.End`. The job has ended. This is done by the job itself and should always + be sent when completed. The scheduler will listen to this message to set the state and unblock + dependent jobs. `createJobHandler()` automatically send this message. +1. `JobOutboundMessageKind.Pong`. The job should answer a `JobInboundMessageKind.Ping` message with + this. Automatically done by `createJobHandler()`. +1. `JobOutboundMessageKind.Log`. A logging message (side effect that should be shown to the user). +1. `JobOutboundMessageKind.Output`. An `Output` has been generated by the job. +1. `JobOutboundMessageKind.ChannelMessage`, `JobOutboundMessageKind.ChannelError` and + `JobOutboundMessageKind.ChannelComplete` are used for output channels. These correspond to the + `next`, `error` and `complete` methods of an `Observer` and are available to the callee through + the `job.channels` map of Observable. + +Those messages can be accessed directly through the `job.outboundBus` member. The job itself should +return an `Observable>`. The `createJobHandler()` helper handles most of use +cases of this and makes it easier for jobs to handle this. + +## Job Dispatchers +Dispatchers are a helper that redirect to different jobs given conditions. To create a job +dispatcher, use the `createDispatcher()` function: + +```typescript +import { jobs } from '@angular-devkit/core'; + +// A dispatcher that installs node modules given a user's preference. +const dispatcher = jobs.createDispatcher({ + name: 'node-install', + argument: { properties: { moduleName: { type: 'string' } } }, + output: { type: 'boolean' }, +}); + +const npmInstall = jobs.createJobHandler(/* ... */, { name: 'npm-install' }); +const yarnInstall = jobs.createJobHandler(/* ... */, { name: 'yarn-install' }); +const pnpmInstall = jobs.createJobHandler(/* ... */, { name: 'pnpm-install' }); + +declare const registry: jobs.SimpleJobRegistry; +registry.register(dispatcher); +registry.register(npmInstall); +registry.register(yarnInstall); +registry.register(pnpmInstall); + +// Default to npm. +dispatcher.setDefaultDelegate(npmInstall.name); +// If the user is asking for yarn over npm, uses it. +dispatcher.addConditionalDelegate(() => userWantsYarn, yarnInstall.name); +``` + +## Execution Strategy +Jobs are always run in parallel and will always start, but many helper functions are provided +when creating a job to help you control the execution strategy; + +1. `serialize()`. Multiple runs of this job will be queued with each others. +1. `memoize(replayMessages = false)` will create a job, or reuse the same job when inputs are +matching. If the inputs don't match, a new job will be started and its outputs will be stored. + +These strategies can be used when creating the job: + +```typescript +// Same input and output as above. + +export const add = jobs.strategy.memoize()( + jobs.createJobHandler( + argument => argument.reduce((total, curr) => total + curr, 0), + ), +); +``` + +Strategies can be reused to synchronize between jobs. For example, given jobs `jobA` and `jobB`, +you can reuse the strategy to serialize both jobs together; + +```typescript +const strategy = jobs.strategy.serialize(); +const jobA = strategy(jobs.createJobHandler(...)); +const jobB = strategy(jobs.createJobHandler(...)); +``` + +Even further, we can have package A and package B run in serialization, and B and C also be +serialized. Running A and C will run in parallel, while running B will wait for both A and C +to finish. + +```typescript +const strategy1 = jobs.strategy.serialize(); +const strategy2 = jobs.strategy.serialize(); +const jobA = strategy1(jobs.createJobHandler(...)); +const jobB = strategy1(strategy2(jobs.createJobHandler(...))); +const jobC = strategy2(jobs.createJobHandler(...)); +``` + +# Scheduling Jobs +Jobs can be scheduled using a `Scheduler` interface, which contains a `schedule()` method: + +```typescript +interface Scheduler { + /** + * Schedule a job to be run, using its name. + * @param name The name of job to be run. + * @param argument The argument to send to the job when starting it. + * @param options Scheduling options. + * @returns The Job being run. + */ + schedule( + name: JobName, + argument: I, + options?: ScheduleJobOptions, + ): Job; +} +``` + +The scheduler also has a `getDescription()` method to get a `JobDescription` object for a certain +name; that description contains schemas for the argument, input, output, and other channels: + +```typescript +interface Scheduler { + /** + * Get a job description for a named job. + * + * @param name The name of the job. + * @returns A description, or null if the job cannot be scheduled. + */ + getDescription(name: JobName): JobDescription | null; + + /** + * Returns true if the job name has been registered. + * @param name The name of the job. + * @returns True if the job exists, false otherwise. + */ + has(name: JobName): boolean; +} +``` + +Finally, the scheduler interface has a `pause()` method to stop scheduling. This will queue all +jobs and wait for the unpause function to be called before unblocking all the jobs scheduled. +This does not affect already running jobs. + +```typescript +interface Scheduler { + /** + * Pause the scheduler, temporary queueing _new_ jobs. Returns a resume function that should be + * used to resume execution. If multiple `pause()` were called, all their resume functions must + * be called before the Scheduler actually starts new jobs. Additional calls to the same resume + * function will have no effect. + * + * Jobs already running are NOT paused. This is pausing the scheduler only. + * + * @returns A function that can be run to resume the scheduler. If multiple `pause()` calls + * were made, all their return function must be called (in any order) before the + * scheduler can resume. + */ + pause(): () => void; +} +``` + +## Synchronizing and Dependencies +When scheduling jobs, it is often necessary to run jobs after certain other jobs are finished. +This is done through the `dependencies` options in the `schedule()` method. + +These jobs will also be passed to the job being scheduled, through its context. This can be +useful if, for example, the output of those jobs are of a known type, or have known side channels. + +An example of this would be a compiler that needs to know the output directory of other compilers +before it, in a tool chain. + +### Dependencies +When scheduling jobs, the user can add a `dependencies` field to the scheduling options. The +scheduler will wait for those dependencies to finish before running the job, and pass those jobs +in the context of the job. + +### Accessing Dependencies +Jobs are called with a `JobHandlerContext` as a second argument, which contains a +`dependencies: Job[]` member which contains all dependencies that were used when +scheduling the job. Those aren't fully typed as they are determined by the user, and not the job +itself. They also can contain jobs that are not finished, and the job should use the `state` +member of the job itself before trying to access its content. + +### Scheduler Sub Jobs +The `JobHandlerContext` also contains a `scheduler` member which can be used to schedule jobs +using the same scheduler that was used for the job. This allows jobs to call other jobs +and wait for them to end. + +## Available Schedulers +The Core Angular DevKit library provides 2 implementations for the `Scheduler` interface: + +## SimpleJobRegistry +Available in the jobs namespace. A registry that accept job registration, and can also schedule +jobs. + +```typescript +import { jobs } from '@angular-devkit/core'; + +const add = jobs.createJobHandler( + argument => argument.reduce((total, curr) => total + curr, 0), +); + +// Register the job in a SimpleJobRegistry. Different registries have different API. +const registry = new jobs.SimpleJobRegistry(); +const scheduler = new SimpleJobScheduler(registry); + +registry.register(add, { + name: 'add', + argument: { type: 'array', items: { type: 'number' } }, + output: { type: 'number' }, +}); + +scheduler.schedule('add', [1, 2, 3, 4]); +``` + +## NodeModuleJobRegistry +Available through `@angular-devkit/core/node`. + +A scheduler that loads jobs using their node package names. These jobs need to use the +`createJobHandler()` helper and report their argument/input/output schemas that way. + +```typescript +declare const registry: NodeModuleJobRegistry; +const scheduler = new SimpleJobScheduler(registry); + +scheduler.schedule('some-node-package#someExport', 'input'); +``` + +# Gotchas + +1. Deadlocking Dependencies + It is impossible to add dependencies to an already running job, but it is entirely possible to + get locked between jobs. Be aware of your own dependencies. + +1. Using `job.promise` + `job.promise` waits for the job to ends. Don't rely on it unless you know the job is not + watching and running for a long time. If you aren't sure, use + `job.output.pipe(first()).toPromise()` instead which will return the first next output, + regardless of whether the job watches and rerun or not. + + +# FAQ + +1. Laziness + A job is lazy until executed, but its messages will be replayed when resubscribed. + +1. Serialize Strategy vs Dependencies + Strategies are functions that transform the execution of a job, and can be used when + declaring the job, or registering it. Dependencies, on the other hand, are listed when + scheduling a job to order jobs during scheduling. + + A job has no control over the way it's scheduled, and its dependencies. It can, however, + declare that it shouldn't run at the same time as itself. Alternatively, a user could + schedule a job twice and imply that the second run should wait for the first to finish. In + practice, this would be equivalent to having the job be serialized, but the important detail + is in _whom_ is defining the rules; using the `serialize()` strategy, the job implementation + is, while when using dependencies, the user is. + + The user does not need to know how to job needs to synchronize with itself, and the job does + not need to know how it synchronizes with other jobs that it doesn't know about. That's part + of the strength of this system as every job can be developed in a vacuum, only caring about + its contracts (argument, input and output) and its own synchronization. diff --git a/packages/angular_devkit/core/src/experimental/jobs/api.ts b/packages/angular_devkit/core/src/experimental/jobs/api.ts new file mode 100644 index 000000000000..92f045e5369e --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/api.ts @@ -0,0 +1,478 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +import { Observable, Observer } from 'rxjs'; +import { JsonObject, JsonValue, schema } from '../../json/index'; +import { LogEntry, LoggerApi } from '../../logger/index'; +import { DeepReadonly } from '../../utils/index'; + +/** + * A job name is just a string (needs to be serializable). + */ +export type JobName = string; + + +/** + * The job handler function, which is a method that's executed for the job. + */ +export interface JobHandler< + ArgT extends JsonValue, + InputT extends JsonValue, + OutputT extends JsonValue, +> { + ( + argument: ArgT, + context: JobHandlerContext, + ): Observable>; + + jobDescription: Partial; +} + + +/** + * The context in which the job is run. + */ +export interface JobHandlerContext< + MinimumArgumentValueT extends JsonValue = JsonValue, + MinimumInputValueT extends JsonValue = JsonValue, + MinimumOutputValueT extends JsonValue = JsonValue, +> { + readonly description: JobDescription; + readonly scheduler: Scheduler; + + // In this context, JsonValue is comparable to `any`. + readonly dependencies: Job[]; + + readonly inboundBus: Observable>; +} + + +/** + * Metadata associated with a job. + */ +export interface JobDescription extends JsonObject { + readonly name: JobName; + + readonly argument: DeepReadonly; + readonly input: DeepReadonly; + readonly output: DeepReadonly; +} + +/** + * Messages that can be sent TO a job. The job needs to listen to those. + */ +export enum JobInboundMessageKind { + Ping = 'ip', + Stop = 'is', + + // Channel specific messages. + Input = 'in', + // Input channel does not allow completion / error. Erroring this will just close the Subject + // but not notify the job. +} + +/** Base interface for the all job inbound messages. */ +export interface JobInboundMessageBase extends JsonObject { + /** + * The kind of message this is. + */ + readonly kind: JobInboundMessageKind; +} + +/** + * A ping to the job. The job should reply with a pong as soon as possible. + */ +export interface JobInboundMessagePing extends JobInboundMessageBase { + readonly kind: JobInboundMessageKind.Ping; + + /** + * An ID that should be returned in the corresponding Pong. + */ + readonly id: number; +} + +/** + * Stop the job. This is handled by the job itself and jobs might not handle it. It will also + * unsubscribe from the Observable<>. + * This is equivalent to SIGTERM. + */ +export interface JobInboundMessageStop extends JobInboundMessageBase { + readonly kind: JobInboundMessageKind.Stop; +} + +/** + * A Job wants to send a message to a channel. This can be marshaled, and the Job object + * has helpers to transform this into an observable. The context also can create RxJS subjects that + * marshall messages through a channel. + */ +export interface JobInboundMessageInput extends JobInboundMessageBase { + readonly kind: JobInboundMessageKind.Input; + + /** + * The input being sent to the job. + */ + readonly value: InputT; +} + +export type JobInboundMessage = + JobInboundMessagePing + | JobInboundMessageStop + | JobInboundMessageInput + ; + +/** + * Kind of messages that can be outputted from a job. + */ +export enum JobOutboundMessageKind { + // Lifecycle specific messages. + OnReady = 'c', + Start = 's', + End = 'e', + Pong = 'p', + + // Feedback messages. + Log = 'l', + Output = 'o', + + // Channel specific messages. + ChannelCreate = 'cn', + ChannelMessage = 'cm', + ChannelError = 'ce', + ChannelComplete = 'cc', +} + +/** Base interface for the all job messages. */ +export interface JobOutboundMessageBase { + /** + * The job description. + */ + readonly description: JobDescription; + + /** + * The kind of message this is. + */ + readonly kind: JobOutboundMessageKind; +} + +/** + * The job has been created and will validate its input. + */ +export interface JobOutboundMessageOnReady extends JobOutboundMessageBase { + readonly kind: JobOutboundMessageKind.OnReady; +} + +/** + * The job started. This is done by the job itself. + */ +export interface JobOutboundMessageStart extends JobOutboundMessageBase { + readonly kind: JobOutboundMessageKind.Start; +} + +/** + * A logging message, supporting the logging.LogEntry. + */ +export interface JobOutboundMessageLog extends JobOutboundMessageBase { + readonly kind: JobOutboundMessageKind.Log; + readonly entry: LogEntry; +} + +/** + * An output value is available. + */ +export interface JobOutboundMessageOutput< + OutputT extends JsonValue, +> extends JobOutboundMessageBase { + readonly kind: JobOutboundMessageKind.Output; + + /** + * The message being outputted from the job. + */ + readonly value: OutputT; +} + + +/** + * Base interface for all job message related to channels. + */ +export interface JobOutboundMessageChannelBase extends JobOutboundMessageBase { + /** + * The name of the channel. + */ + readonly name: string; +} + +/** + * A job wants to send a message to a channel. This can be marshaled, and the Job object + * has helpers to transform this into an observable. The context also can create RxJS subjects that + * marshall messages through a channel. + */ +export interface JobOutboundMessageChannelMessage extends JobOutboundMessageChannelBase { + readonly kind: JobOutboundMessageKind.ChannelMessage; + + /** + * The message being sent to the channel. + */ + readonly message: JsonValue; +} + +/** + * A job wants to send an error to one of its channel. This is the equivalent of throwing through + * an Observable. The side channel will not receive any more messages after this, and will not + * complete. + */ +export interface JobOutboundMessageChannelError extends JobOutboundMessageChannelBase { + readonly kind: JobOutboundMessageKind.ChannelError; + + /** + * The error message being sent to the channel. + */ + readonly error: JsonValue; +} + +/** + * A job wants to create a new channel. + */ +export interface JobOutboundMessageChannelCreate extends JobOutboundMessageChannelBase { + readonly kind: JobOutboundMessageKind.ChannelCreate; +} + +/** + * A job wants to close the channel, as completed. This is done automatically when the job ends, + * or can be done from the job to close it. A closed channel might be reopened, but the user + * need to recall getChannel(). + */ +export interface JobOutboundMessageChannelComplete extends JobOutboundMessageChannelBase { + readonly kind: JobOutboundMessageKind.ChannelComplete; +} + +/** + * OnEnd of the job run. + */ +export interface JobOutboundMessageEnd extends JobOutboundMessageBase { + readonly kind: JobOutboundMessageKind.End; +} + +/** + * A pong response from a ping input. The id is the same as the one passed in. + */ +export interface JobOutboundMessagePong extends JobOutboundMessageBase { + readonly kind: JobOutboundMessageKind.Pong; + + /** + * The ID that was passed in the `Ping` messages. + */ + readonly id: number; +} + +/** + * Generic message type. + */ +export type JobOutboundMessage = + JobOutboundMessageOnReady + | JobOutboundMessageStart + | JobOutboundMessageLog + | JobOutboundMessageOutput + | JobOutboundMessageChannelCreate + | JobOutboundMessageChannelMessage + | JobOutboundMessageChannelError + | JobOutboundMessageChannelComplete + | JobOutboundMessageEnd + | JobOutboundMessagePong + ; + + +/** + * The state of a job. These are changed as the job reports a new state through its messages. + */ +export enum JobState { + /** + * The job was queued and is waiting to start. + */ + Queued = 'queued', + /** + * The job description was found, its dependencies (see "Synchronizing and Dependencies") + * are done running, and the job's argument is validated and the job's code will be executed. + */ + Ready = 'ready', + /** + * The job has been started. The job implementation is expected to send this as soon as its + * work is starting. + */ + Started = 'started', + /** + * The job has ended and is done running. + */ + Ended = 'ended', + /** + * An error occured and the job stopped because of internal state. + */ + Errored = 'errored', +} + + +/** + * A Job instance, returned from scheduling a job. A Job instance is _not_ serializable. + */ +export interface Job< + ArgumentT extends JsonValue = JsonValue, + InputT extends JsonValue = JsonValue, + OutputT extends JsonValue = JsonValue, +> { + /** + * Description of the job. Resolving the job's description can be done asynchronously, so this + * is an observable that will resolve when it's ready. + */ + readonly description: Observable; + + /** + * Argument sent when scheduling the job. This is a copy of the argument. + */ + readonly argument: ArgumentT; + + /** + * The input to the job. This goes through the input channel as messages. + */ + readonly input: Observer; + + /** + * Outputs of this job. + */ + readonly output: Observable; + + /** + * The current state of the job. + */ + readonly state: JobState; + + /** + * Get a channel that validates against the schema. Messages will be filtered by the schema. + * @param name The name of the channel. + * @param schema A schema to use to validate messages. + */ + getChannel(name: string, schema?: schema.JsonSchema): Observable; + + /** + * Pings the job and wait for the resulting Pong before completing. + */ + ping(): Observable; + + /** + * Stops the job from running. This is different than unsubscribing from the output as in it + * sends the JobInboundMessageKind.Stop raw input to the job. + */ + stop(): void; + + /** + * The JobInboundMessage messages TO the job. + */ + readonly inboundBus: Observer>; + + /** + * The JobOutboundMessage FROM the job. + */ + readonly outboundBus: Observable>; +} + +/** + * Options for scheduling jobs. + */ +export interface ScheduleJobOptions { + /** + * Where should logging be passed in. By default logging will be dropped. + */ + logger?: LoggerApi; + + /** + * Jobs that need to finish before scheduling this job. These dependencies will be passed + * to the job itself in its context. + */ + dependencies?: Job | Job[]; +} + +export interface Registry< + MinimumArgumentValueT extends JsonValue = JsonValue, + MinimumInputValueT extends JsonValue = JsonValue, + MinimumOutputValueT extends JsonValue = JsonValue, +> { + /** + * Get a job handler. + * @param name The name of the job to get a handler from. + */ + get< + A extends MinimumArgumentValueT, + I extends MinimumInputValueT, + O extends MinimumOutputValueT, + >(name: JobName): Observable | null>; +} + +/** + * An interface that can schedule jobs. + */ +export interface Scheduler< + MinimumArgumentValueT extends JsonValue = JsonValue, + MinimumInputValueT extends JsonValue = JsonValue, + MinimumOutputValueT extends JsonValue = JsonValue, +> { + /** + * Get a job description for a named job. + * + * @param name The name of the job. + * @returns A description, or null if no description is available for this job. + */ + getDescription(name: JobName): Observable; + + /** + * Returns true if the job name has been registered. + * @param name The name of the job. + * @returns True if the job exists, false otherwise. + */ + has(name: JobName): Observable; + + /** + * Pause the scheduler, temporary queueing _new_ jobs. Returns a resume function that should be + * used to resume execution. If multiple `pause()` were called, all their resume functions must + * be called before the Scheduler actually starts new jobs. Additional calls to the same resume + * function will have no effect. + * + * Jobs already running are NOT paused. This is pausing the scheduler only. + * + * @returns A function that can be run to resume the scheduler. If multiple `pause()` calls + * were made, all their return function must be called (in any order) before the + * scheduler can resume. + */ + pause(): () => void; + + /** + * Schedule a job to be run, using its name. + * @param name The name of job to be run. + * @param argument The argument to send to the job when starting it. + * @param options Scheduling options. + * @returns The job being run. + */ + schedule< + A extends MinimumArgumentValueT, + I extends MinimumInputValueT, + O extends MinimumOutputValueT, + >( + name: JobName, + argument: A, + options?: ScheduleJobOptions, + ): Job; +} + + +export function isJobHandler< + A extends JsonValue, + I extends JsonValue, + O extends JsonValue, +// TODO: this should be unknown +// tslint:disable-next-line:no-any +>(value: any): value is JobHandler { + return typeof value == 'function' + && typeof value.jobDescription == 'object' + && value.jobDescription !== null; +} diff --git a/packages/angular_devkit/core/src/experimental/jobs/architecture.md b/packages/angular_devkit/core/src/experimental/jobs/architecture.md new file mode 100644 index 000000000000..216339f167dd --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/architecture.md @@ -0,0 +1,257 @@ + +# Overview +Jobs is a high-order API that adds inputs, runtime type checking, sequencing, and other +functionality on top of RxJS' `Observable`s. + +# Background +An `Observable` (at a higher level) is a function that receives a `Subscriber`, and outputs +multiple values, and finishes once it calls the `Subscriber.prototype.complete()` method (in +JavaScript): + +```javascript +const output1To10EverySecond = function (subscriber) { + let t = 0; + const i = setInterval(() => { + t++; + subscriber.next(t); + if (t === 10) { + subscriber.complete(t); + } + }, 1000); + return () => clearInterval(i); +}; + +const stream$ = new Observable(output1To10EverySecond); +// Start the function, and output 1 to 100, once per line. +stream$.subscribe(x => console.log(x)); +``` + +This, of course, can be typed in TypeScript, but those types are not enforced at runtime. + +# Glossary +- `job handler`. The function that implements the job's logic. +- `raw input`. The input observable sending messages to the job. These messages are of type + `JobInboundMessage`. +- `raw output`. The output observer returned from the `job handler`. Messages on this observable + are of type `JobOutboundMessage`. + +# Description + +A `JobHandler`, similar to observables, is a function that receives an argument and a context, and +returns an `Observable` of messages, which can include outputs that are typed at runtime (using a +Json Schema): + +```javascript +const output1ToXEverySecond = function (x, context) { + return new Observable(subscriber => { + let t = 0; + + // Notify our users that the actual work is started. + subscriber.next({ kind: JobOutboundMessageKind.Start }); + const i = setInterval(() => { + t++; + subscriber.next({ kind: JobOutboundMessageKind.Output, value: t }); + if (t === x) { + subscriber.next({ kind: JobOutboundMessageKind.End }); + subscriber.complete(); + } + }, 1000); + + return () => { + clearInterval(i); + }; + }) +}; + +// For now, jobs can not be called without a registry and scheduler. +const registry = new SimpleJobRegistry(); +registry.register('output-from-1-to-x', output1ToXEverySecond, { + argument: { type: 'number' }, + output: { type: 'number' }, +}); +const scheduler = new SimpleScheduler(registry); + +// Need to keep the same name that the registry would understand. +// Count from 1 to 10. +const job = scheduler.schedule('output-from-1-to-x', 10); + +// A Job<> instance has more members, but we only want the output values here. +job.output.subscribe(x => console.log(x)); +``` + +This seems like a lot of boilerplate in comparison, but there are a few advantages; + +1. lifecycle. Jobs can tell when they start doing work and when work is done. +1. everything is typed, even at runtime. +1. the context also contains an input Observable that receives typed input messages, including + input values, and stop requests. +1. jobs can also schedule other jobs and wait for them, even if they don't know if a job is + implemented in the system. + +## Diagram +A simpler way to think about jobs in contrast to observables is that job are closer to a Unix +process. It has an argument (command line flags), receive inputs (STDIN and interupt signals), +and output values (STDOUT) as well as diagnostic (STDERR). They can be plugged one into another +(piping), and can be transformed, synchronized and scheduled (fork, exec, cron). + +```plain +- given A the type of the argument +- given I the type of the input +- given O the type of the output + + ,______________________ + JobInboundMessage --> | handler(argument: A) | --> JobOutboundMessage + `⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻⎻ - JobOutboundMessageKind.Log + - JobOutboundMessageKind.Output + - ... +``` + +`JobInboundMessage` includes: + +1. `JobInboundMessageKind.Ping`. A simple message that should be answered with + `JobOutboundMessageKind.Pong` when the job is responsive. The `id` field of the message should + be used when returning `Pong`. +1. `JobInboundMessageKind.Stop`. The job should be stopped. This is used when + cancelling/unsubscribing from the `output` (or by calling `stop()`). Any inputs or outputs + after this message will be ignored. +1. `JobInboundMessageKind.Input` is used when sending inputs to a job. These correspond to the + `next` methods of an `Observer` and are reported to the job through its `context.input` + Observable. There is no way to communicate an error to the job. + +`JobOutboundMessage` includes: + +1. `JobOutboundMessageKind.Ready`. The `Job<>` was created, its dependencies are done, and the + library is validating Argument and calling the internal job code. +1. `JobOutboundMessageKind.Start`. The job code itself should send that message when started. + `createJobHandler()` will do it automatically. +1. `JobOutboundMessageKind.End`. The job has ended. This is done by the job itself and should + always be sent when completed. The scheduler will listen to this message to set the state and + unblock dependent jobs. `createJobHandler()` automatically send this message. +1. `JobOutboundMessageKind.Pong`. The job should answer a `JobInboundMessageKind.Ping` message with + this. Automatically done by `createJobHandler()`. +1. `JobOutboundMessageKind.Log`. A logging message (side effect that should be shown to the user). +1. `JobOutboundMessageKind.Output`. An `Output` has been generated by the job. +1. `JobOutboundMessageKind.ChannelMessage`, `JobOutboundMessageKind.ChannelError` and + `JobOutboundMessageKind.ChannelComplete` are used for output channels. These correspond to + the `next`, `error` and `complete` methods of an `Observer` and are available to the callee + through the `job.channels` map of Observable. + +Utilities should have some filtering and dispatching to separate observables, as a convenience for +the user. An example of this would be the `Job.prototype.output` observable which only contains +the value contained by messages of type `JobOutboundMessageKind.Output`. + +# Higher Order Jobs +Because jobs are expected to be pure functions, they can be composed or transformed to create +more complex behaviour, similar to how RxJS operators can transform observables. + +```javascript +// Runs a job on the hour, every hour, regardless of how long the job takes. +// This creates a job function that can be registered by itself. +function scheduleJobOnTheHour(jobFunction) { + return function(argument, context) { + return new Observable(observer => { + let timeout = 0; + + function _timeoutToNextHour() { + // Just wait until the next hour. + const t = new Date(); + const secondsToNextHour = 3600 - t.getSeconds() - t.getMinutes() * 60; + timeout = setTimeout(_scheduleJobAndWaitAnHour, secondsToNextHour); + } + + function _scheduleJobAndWaitAnHour() { + jobFunction(argument, context).subscribe( + message => observer.next(message), + error => observer.error(error), + // Do not forward completion, but use it to schedule the next job run. + () => { + _timeoutToNextHour(); + }, + ); + } + + // Kick off by waiting for next hour. + _timeoutToNextHour(); + + return () => clearTimeout(timeout); + }); + }; +} +``` + +Another way to compose jobs is to schedule jobs based on their name, from other jobs. + +```javascript +// Runs a job on the hour, every hour, regardless of how long the job takes. +// This creates a high order job by getting a job name and an argument, and scheduling the job +// every hour. +function scheduleJobOnTheHour(job, context) { + const { name, argument } = job; // Destructure our input. + + return new Observable(observer => { + let timeout = 0; + + function _timeoutToNextHour() { + // Just wait until the next hour. + const t = new Date(); + const secondsToNextHour = 3600 - t.getSeconds() - t.getMinutes() * 60; + timeout = setTimeout(_scheduleJobAndWaitAnHour, secondsToNextHour); + } + + function _scheduleJobAndWaitAnHour() { + const subJob = context.scheduler.schedule(name, argument); + // We do not forward the input to the sub-job but that would be a valid example as well. + subJob.outboundBus.subscribe( + message => observer.next(message), + error => observer.error(error), + // Do not forward completion, but use it to schedule the next job run. + () => { + _timeoutToNextHour(); + }, + ); + } + + // Kick off by waiting for next hour. + _timeoutToNextHour(); + + return () => clearTimeout(timeout); + }); +} + +const registry = new SimpleJobRegistry(); +registry.register('schedule-job-on-the-hour', scheduleJobOnTheHour, { + argument: { + properties: { + name: { type: 'string' }, + argument: { type: true }, + }, + }, +}); + +// Implementation left to the reader. +registry.register('copy-files-from-a-to-b', require('some-package/copy-job')); + +const scheduler = new SimpleScheduler(registry); + +// A rudimentary backup system. +const job = scheduler.schedule('schedule-job-on-the-hour', { + name: 'copy-files-from-a-to-b', + argument: { + from: '/some-directory/to/backup', + to: '/volumes/usb-key', + }, +}); +job.output.subscribe(x => console.log(x)); +``` + +# Limitations +Jobs input, output and argument must be serializable to JSONs. This is a big limitation in usage, +but comes with the benefit that jobs can be serialized and called across memory boundaries. An +example would be an operator that takes a module path and run the job from that path in a separate +process. Or even a separate server, using HTTP calls. + +Another limitation is that the boilerplate is complex. Manually managing start/end life cycle, and +other messages such as logging, etc. is tedious and requires a lot of code. A good way to keep +this limitation under control is to provide helpers to create `JobHandler`s which manage those +messages for the developer. A simple handler could be to get a `Promise` and return the output of +that promise automatically. diff --git a/packages/angular_devkit/core/src/experimental/jobs/create-job-handler.ts b/packages/angular_devkit/core/src/experimental/jobs/create-job-handler.ts new file mode 100644 index 000000000000..688644230896 --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/create-job-handler.ts @@ -0,0 +1,221 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + * + */ +import { Observable, Observer, Subject, Subscription, from, isObservable } from 'rxjs'; +import { switchMap, tap } from 'rxjs/operators'; +import { BaseException } from '../../exception/index'; +import { JsonValue } from '../../json/index'; +import { Logger, LoggerApi } from '../../logger/index'; +import { isPromise } from '../../utils/index'; +import { + JobDescription, + JobHandler, + JobHandlerContext, + JobInboundMessageKind, + JobOutboundMessage, + JobOutboundMessageKind, +} from './api'; + + +export class ChannelAlreadyExistException extends BaseException { + constructor(name: string) { + super(`Channel ${JSON.stringify(name)} already exist.`); + } +} + +/** + * Interface for the JobHandler context that is used when using `createJobHandler()`. It extends + * the basic `JobHandlerContext` with additional functionality. + */ +export interface SimpleJobHandlerContext< + A extends JsonValue, + I extends JsonValue, + O extends JsonValue, +> extends JobHandlerContext { + logger: LoggerApi; + createChannel: (name: string) => Observer; + input: Observable; +} + + +/** + * A simple version of the JobHandler. This simplifies a lot of the interaction with the job + * scheduler and registry. For example, instead of returning a JobOutboundMessage observable, you + * can directly return an output. + */ +export type SimpleJobHandlerFn = ( + input: A, + context: SimpleJobHandlerContext, +) => O | Promise | Observable; + + +/** + * Make a simple job handler that sets start and end from a function that's synchronous. + * + * @param fn The function to create a handler for. + * @param options An optional set of properties to set on the handler. Some fields might be + * required by registry or schedulers. + */ +export function createJobHandler( + fn: SimpleJobHandlerFn, + options: Partial = {}, +): JobHandler { + const handler = (argument: A, context: JobHandlerContext) => { + const description = context.description; + const inboundBus = context.inboundBus; + const inputChannel = new Subject(); + let subscription: Subscription; + + return new Observable>(subject => { + // Handle input. + inboundBus.subscribe(message => { + switch (message.kind) { + case JobInboundMessageKind.Ping: + subject.next({ kind: JobOutboundMessageKind.Pong, description, id: message.id }); + break; + + case JobInboundMessageKind.Stop: + // There's no way to cancel a promise or a synchronous function, but we do cancel + // observables where possible. + if (subscription) { + subscription.unsubscribe(); + } + subject.next({ kind: JobOutboundMessageKind.End, description }); + subject.complete(); + // Close all channels. + channels.forEach(x => x.complete()); + break; + + case JobInboundMessageKind.Input: + inputChannel.next(message.value); + break; + } + }); + + // Configure a logger to pass in as additional context. + const logger = new Logger('job'); + logger.subscribe(entry => { + subject.next({ + kind: JobOutboundMessageKind.Log, + description, + entry, + }); + }); + + // Execute the function with the additional context. + subject.next({ kind: JobOutboundMessageKind.Start, description }); + + const channels = new Map>(); + + const newContext = { + ...context, + input: inputChannel.asObservable(), + logger, + createChannel(name: string) { + if (channels.has(name)) { + throw new ChannelAlreadyExistException(name); + } + const channelSubject = new Subject(); + channelSubject.subscribe( + message => { + subject.next({ + kind: JobOutboundMessageKind.ChannelMessage, description, name, message, + }); + }, + error => { + subject.next({ kind: JobOutboundMessageKind.ChannelError, description, name, error }); + // This can be reopened. + channels.delete(name); + }, + () => { + subject.next({ kind: JobOutboundMessageKind.ChannelComplete, description, name }); + // This can be reopened. + channels.delete(name); + }, + ); + + channels.set(name, channelSubject); + + return channelSubject; + }, + }; + + const result = fn(argument, newContext); + // If the result is a promise, simply wait for it to complete before reporting the result. + if (isPromise(result)) { + result.then(result => { + subject.next({ kind: JobOutboundMessageKind.Output, description, value: result }); + subject.next({ kind: JobOutboundMessageKind.End, description }); + subject.complete(); + }, err => subject.error(err)); + } else if (isObservable(result)) { + subscription = (result as Observable).subscribe( + (value: O) => subject.next({ kind: JobOutboundMessageKind.Output, description, value }), + error => subject.error(error), + () => { + subject.next({ kind: JobOutboundMessageKind.End, description }); + subject.complete(); + }, + ); + + return subscription; + } else { + // If it's a scalar value, report it synchronously. + subject.next({ kind: JobOutboundMessageKind.Output, description, value: result as O }); + subject.next({ kind: JobOutboundMessageKind.End, description }); + subject.complete(); + } + }); + }; + + return Object.assign(handler, { jobDescription: options }); +} + + +/** + * Lazily create a job using a function. + * @param loader A factory function that returns a promise/observable of a JobHandler. + * @param options Same options as createJob. + */ +export function createJobFactory( + loader: () => Promise>, + options: Partial, +): JobHandler { + const handler = (argument: A, context: JobHandlerContext) => { + return from(loader()) + .pipe(switchMap(fn => fn(argument, context))); + }; + + return Object.assign(handler, { jobDescription: options }); +} + + +/** + * Creates a job that logs out input/output messages of another Job. The messages are still + * propagated to the other job. + */ +export function createLoggerJob( + job: JobHandler, + logger: LoggerApi, +): JobHandler { + const handler = (argument: A, context: JobHandlerContext) => { + context.inboundBus.pipe( + tap(message => logger.info(`Input: ${JSON.stringify(message)}`)), + ).subscribe(); + + return job(argument, context).pipe( + tap( + message => logger.info(`Message: ${JSON.stringify(message)}`), + error => logger.warn(`Error: ${JSON.stringify(error)}`), + () => logger.info(`Completed`), + ), + ); + }; + + return Object.assign(handler, job); +} diff --git a/packages/angular_devkit/core/src/experimental/jobs/dispatcher.ts b/packages/angular_devkit/core/src/experimental/jobs/dispatcher.ts new file mode 100644 index 000000000000..fc3841042d82 --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/dispatcher.ts @@ -0,0 +1,89 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + * + */ +import { JsonValue } from '../../json/index'; +import { Readwrite } from '../../utils/index'; +import { + Job, + JobDescription, + JobHandler, + JobHandlerContext, + JobName, + isJobHandler, +} from './api'; +import { JobDoesNotExistException } from './exception'; + +/** + * A JobDispatcher can be used to dispatch between multiple jobs. + */ +export interface JobDispatcher< + A extends JsonValue, + I extends JsonValue, + O extends JsonValue, +> extends JobHandler { + /** + * Set the default job if all conditionals failed. + * @param name The default name if all conditions are false. + */ + setDefaultJob(name: JobName | null | JobHandler): void; + + /** + * Add a conditional job that will be selected if the input fits a predicate. + * @param predicate + * @param name + */ + addConditionalJob(predicate: (args: A) => boolean, name: string): void; +} + + +/** + * OnReady a dispatcher that can dispatch to a sub job, depending on conditions. + * @param options + */ +export function createDispatcher< + A extends JsonValue, + I extends JsonValue, + O extends JsonValue, +>( + options: Partial> = {}, +): JobDispatcher { + let defaultDelegate: JobName | null = null; + const conditionalDelegateList: [(args: A) => boolean, JobName][] = []; + + const job: JobHandler = Object.assign((argument: A, context: JobHandlerContext) => { + const maybeDelegate = conditionalDelegateList.find(([predicate]) => predicate(argument)); + let delegate: Job | null = null; + + if (maybeDelegate) { + delegate = context.scheduler.schedule(maybeDelegate[1], argument); + } else if (defaultDelegate) { + delegate = context.scheduler.schedule(defaultDelegate, argument); + } else { + throw new JobDoesNotExistException(''); + } + + context.inboundBus.subscribe(delegate.inboundBus); + + return delegate.outboundBus; + }, { + jobDescription: options, + }); + + return Object.assign(job, { + setDefaultJob(name: JobName | null | JobHandler) { + if (isJobHandler(name)) { + name = name.jobDescription.name === undefined ? null : name.jobDescription.name; + } + + defaultDelegate = name; + }, + addConditionalJob(predicate: (args: A) => boolean, name: JobName) { + conditionalDelegateList.push([predicate, name]); + }, + }); +} diff --git a/packages/angular_devkit/core/src/experimental/jobs/dispatcher_spec.ts b/packages/angular_devkit/core/src/experimental/jobs/dispatcher_spec.ts new file mode 100644 index 000000000000..d7cabe13a3da --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/dispatcher_spec.ts @@ -0,0 +1,38 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +import { createJobHandler } from './create-job-handler'; +import { createDispatcher } from './dispatcher'; +import { SimpleJobRegistry } from './simple-registry'; +import { SimpleScheduler } from './simple-scheduler'; + +describe('createDispatcher', () => { + it('works', async () => { + const registry = new SimpleJobRegistry(); + const scheduler = new SimpleScheduler(registry); + + const dispatcher = createDispatcher({ + name: 'add', + argument: { items: { type: 'number' } }, + output: { type: 'number' }, + }); + const add0 = createJobHandler((input: number[]) => input.reduce((a, c) => a + c, 0), { + name: 'add0', + }); + const add100 = createJobHandler((input: number[]) => input.reduce((a, c) => a + c, 100), { + name: 'add100', + }); + + registry.register(dispatcher); + registry.register(add0); + registry.register(add100); + + dispatcher.setDefaultJob(add0); + const sum = scheduler.schedule('add', [1, 2, 3, 4]); + expect(await sum.output.toPromise()).toBe(10); + }); +}); diff --git a/packages/angular_devkit/core/src/experimental/jobs/exception.ts b/packages/angular_devkit/core/src/experimental/jobs/exception.ts new file mode 100644 index 000000000000..0fcb761a978f --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/exception.ts @@ -0,0 +1,21 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +import { BaseException } from '../../exception/index'; +import { JobName } from './api'; + +export class JobNameAlreadyRegisteredException extends BaseException { + constructor(name: JobName) { + super(`Job named ${JSON.stringify(name)} already exists.`); + } +} + +export class JobDoesNotExistException extends BaseException { + constructor(name: JobName) { + super(`Job name ${JSON.stringify(name)} does not exist.`); + } +} diff --git a/packages/angular_devkit/core/src/experimental/jobs/index.ts b/packages/angular_devkit/core/src/experimental/jobs/index.ts new file mode 100644 index 000000000000..f4cb091fc57a --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/index.ts @@ -0,0 +1,14 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +export * from './api'; +export * from './create-job-handler'; +export * from './exception'; +export * from './dispatcher'; +export * from './simple-registry'; +export * from './simple-scheduler'; +export * from './strategy'; diff --git a/packages/angular_devkit/core/src/experimental/jobs/simple-registry.ts b/packages/angular_devkit/core/src/experimental/jobs/simple-registry.ts new file mode 100644 index 000000000000..b9a1b475427a --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/simple-registry.ts @@ -0,0 +1,144 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +import { Observable, of } from 'rxjs'; +import { JsonValue, schema } from '../../json'; +import { JobDescription, JobHandler, JobName, Registry, isJobHandler } from './api'; +import { JobNameAlreadyRegisteredException } from './exception'; + + +/** + * SimpleJobRegistry job registration options. + */ +export interface RegisterJobOptions extends Partial {} + +/** + * A simple job registry that keep a map of JobName => JobHandler internally. + */ +export class SimpleJobRegistry< + MinimumArgumentValueT extends JsonValue = JsonValue, + MinimumInputValueT extends JsonValue = JsonValue, + MinimumOutputValueT extends JsonValue = JsonValue, +> implements Registry { + private _jobNames = new Map< + JobName, + JobHandler + >(); + + get< + A extends MinimumArgumentValueT = MinimumArgumentValueT, + I extends MinimumInputValueT = MinimumInputValueT, + O extends MinimumOutputValueT = MinimumOutputValueT, + >(name: JobName): Observable | null> { + return of(this._jobNames.get(name) as (JobHandler | null) || null); + } + + /** + * Register a job handler. The name must be unique. + * + * @param name The name of the job. + * @param handler The function that will be called for the job. + * @param options An optional list of options to override the handler. {@see RegisterJobOptions} + */ + register< + A extends MinimumArgumentValueT, + I extends MinimumInputValueT, + O extends MinimumOutputValueT, + >( + name: JobName, + handler: JobHandler, + options?: RegisterJobOptions, + ): void; + + /** + * Register a job handler. The name must be unique. + * + * @param handler The function that will be called for the job. + * @param options An optional list of options to override the handler. {@see RegisterJobOptions} + */ + register( + handler: JobHandler, + // This version MUST contain a name. + options?: RegisterJobOptions & { name: string }, + ): void; + + register( + nameOrHandler: JobName | JobHandler, + handlerOrOptions: JobHandler | RegisterJobOptions = {}, + options: RegisterJobOptions = {}, + ): void { + // Switch on the arguments. + if (typeof nameOrHandler == 'string') { + if (!isJobHandler(handlerOrOptions)) { + // This is an error. + throw new TypeError('Expected a JobHandler as second argument.'); + } + + this._register(nameOrHandler, handlerOrOptions, options); + } else if (isJobHandler(nameOrHandler)) { + if (typeof handlerOrOptions !== 'object') { + // This is an error. + throw new TypeError('Expected an object options as second argument.'); + } + + const name = options.name || nameOrHandler.jobDescription.name || handlerOrOptions.name; + if (name === undefined) { + throw new TypeError('Expected name to be a string.'); + } + + this._register(name, nameOrHandler, options); + } else { + throw new TypeError('Unrecognized arguments.'); + } + } + + protected _register< + ArgumentT extends JsonValue, + InputT extends JsonValue, + OutputT extends JsonValue, + >( + name: JobName, + handler: JobHandler, + options: RegisterJobOptions, + ): void { + if (this._jobNames.has(name)) { + // We shouldn't allow conflicts. + throw new JobNameAlreadyRegisteredException(name); + } + + // Merge all fields with the ones in the handler (to make sure we respect the handler). + const argument = schema.mergeSchemas( + handler.jobDescription.argument, + options.argument, + ); + const input = schema.mergeSchemas( + handler.jobDescription.input, + options.input, + ); + const output = schema.mergeSchemas( + handler.jobDescription.output, + options.output, + ); + + // Create the job description. + const jobDescription: JobDescription = { + name, + argument, + output, + input, + }; + + this._jobNames.set(name, Object.assign(handler.bind(undefined), { jobDescription })); + } + + /** + * Returns the job names of all jobs. + */ + getJobNames(): JobName[] { + return [...this._jobNames.keys()]; + } +} diff --git a/packages/angular_devkit/core/src/experimental/jobs/simple-registry_spec.ts b/packages/angular_devkit/core/src/experimental/jobs/simple-registry_spec.ts new file mode 100644 index 000000000000..770a14682357 --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/simple-registry_spec.ts @@ -0,0 +1,34 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +// tslint:disable:no-big-function no-non-null-assertion +import { EMPTY, Observable, of, timer } from 'rxjs'; +import { map, take, toArray } from 'rxjs/operators'; +import { JobHandlerContext, JobOutboundMessage, JobOutboundMessageKind, JobState } from './api'; +import { createJobHandler } from './create-job-handler'; +import { SimpleJobRegistry } from './simple-registry'; +import { SimpleScheduler } from './simple-scheduler'; + +describe('SimpleJobRegistry', () => { + let registry: SimpleJobRegistry; + + beforeEach(() => { + registry = new SimpleJobRegistry(); + }); + + it('works for a simple case', async () => { + registry.register( + 'add', createJobHandler((arg: number[]) => arg.reduce((a, c) => a + c, 0)), { + argument: { items: { type: 'number' } }, + output: { type: 'number' }, + }, + ); + + expect(await registry.get('add').toPromise()).not.toBeNull(); + expect(await registry.get('add2').toPromise()).toBeNull(); + }); +}); diff --git a/packages/angular_devkit/core/src/experimental/jobs/simple-scheduler.ts b/packages/angular_devkit/core/src/experimental/jobs/simple-scheduler.ts new file mode 100644 index 000000000000..05b216171509 --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/simple-scheduler.ts @@ -0,0 +1,545 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +import { + EMPTY, + MonoTypeOperatorFunction, + Observable, + Observer, + Subject, + Subscription, + concat, + from, + merge, + of, +} from 'rxjs'; +import { + concatMap, + filter, + first, + ignoreElements, + map, + shareReplay, + switchMap, + tap, +} from 'rxjs/operators'; +import { JsonValue, schema } from '../../json'; +import { NullLogger } from '../../logger'; +import { + Job, + JobDescription, + JobHandler, + JobInboundMessage, + JobInboundMessageKind, + JobName, + JobOutboundMessage, + JobOutboundMessageKind, + JobOutboundMessageOutput, + JobState, + Registry, + ScheduleJobOptions, + Scheduler, +} from './api'; +import { JobDoesNotExistException } from './exception'; + + +export class JobInboundMessageSchemaValidationError extends schema.SchemaValidationException { + constructor(errors?: schema.SchemaValidatorError[]) { + super(errors, 'Job Inbound Message failed to validate. Errors: '); + } +} +export class JobOutputSchemaValidationError extends schema.SchemaValidationException { + constructor(errors?: schema.SchemaValidatorError[]) { + super(errors, 'Job Output failed to validate. Errors: '); + } +} + + +interface JobHandlerWithExtra extends JobHandler { + jobDescription: JobDescription; + + argumentV: Observable; + outputV: Observable; + inputV: Observable; +} + + +function _jobShare(): MonoTypeOperatorFunction { + // This is the same code as a `shareReplay()` operator, but uses a dumber Subject rather than a + // ReplaySubject. + return (source: Observable): Observable => { + let refCount = 0; + let subject: Subject; + let hasError = false; + let isComplete = false; + let subscription: Subscription; + + return new Observable(subscriber => { + let innerSub: Subscription; + refCount++; + if (!subject) { + subject = new Subject(); + + innerSub = subject.subscribe(subscriber); + subscription = source.subscribe({ + next(value) { subject.next(value); }, + error(err) { + hasError = true; + subject.error(err); + }, + complete() { + isComplete = true; + subject.complete(); + }, + }); + } else { + innerSub = subject.subscribe(subscriber); + } + + return () => { + refCount--; + innerSub.unsubscribe(); + if (subscription && refCount === 0 && (isComplete || hasError)) { + subscription.unsubscribe(); + } + }; + }); + }; +} + + +/** + * Simple scheduler. Should be the base of all registries and schedulers. + */ +export class SimpleScheduler< + MinimumArgumentT extends JsonValue = JsonValue, + MinimumInputT extends JsonValue = JsonValue, + MinimumOutputT extends JsonValue = JsonValue, +> implements Scheduler { + private _internalJobDescriptionMap = new Map(); + private _queue: (() => void)[] = []; + private _pauseCounter = 0; + + constructor( + protected _jobRegistry: Registry, + protected _schemaRegistry: schema.SchemaRegistry = new schema.CoreSchemaRegistry(), + ) {} + + private _getInternalDescription(name: JobName): Observable { + const maybeHandler = this._internalJobDescriptionMap.get(name); + if (maybeHandler !== undefined) { + return of(maybeHandler); + } + + const handler = this._jobRegistry.get(name); + + return handler.pipe( + switchMap(handler => { + if (handler === null) { + return of(null); + } + + const description: JobDescription = { + name, + argument: handler.jobDescription.argument || true, + input: handler.jobDescription.input || true, + output: handler.jobDescription.output || true, + channels: handler.jobDescription.channels || {}, + }; + + const handlerWithExtra = Object.assign(handler.bind(undefined), { + jobDescription: description, + argumentV: this._schemaRegistry.compile(description.argument).pipe(shareReplay(1)), + inputV: this._schemaRegistry.compile(description.input).pipe(shareReplay(1)), + outputV: this._schemaRegistry.compile(description.output).pipe(shareReplay(1)), + }); + this._internalJobDescriptionMap.set(name, handlerWithExtra); + + return of(handlerWithExtra); + }), + ); + } + + /** + * Get a job description for a named job. + * + * @param name The name of the job. + * @returns A description, or null if the job is not registered. + */ + getDescription(name: JobName) { + return concat( + this._getInternalDescription(name).pipe(map(x => x && x.jobDescription)), + of(null), + ).pipe( + first(), + ); + } + + /** + * Returns true if the job name has been registered. + * @param name The name of the job. + * @returns True if the job exists, false otherwise. + */ + has(name: JobName) { + return this.getDescription(name).pipe( + map(x => x !== null), + ); + } + + /** + * Pause the scheduler, temporary queueing _new_ jobs. Returns a resume function that should be + * used to resume execution. If multiple `pause()` were called, all their resume functions must + * be called before the Scheduler actually starts new jobs. Additional calls to the same resume + * function will have no effect. + * + * Jobs already running are NOT paused. This is pausing the scheduler only. + */ + pause() { + let called = false; + this._pauseCounter++; + + return () => { + if (!called) { + called = true; + if (--this._pauseCounter == 0) { + // Resume the queue. + const q = this._queue; + this._queue = []; + q.forEach(fn => fn()); + } + } + }; + } + + /** + * Schedule a job to be run, using its name. + * @param name The name of job to be run. + * @param argument The argument to send to the job when starting it. + * @param options Scheduling options. + * @returns The Job being run. + */ + schedule( + name: JobName, + argument: A, + options?: ScheduleJobOptions, + ): Job { + if (this._pauseCounter > 0) { + const waitable = new Subject(); + this._queue.push(() => waitable.complete()); + + return this._scheduleJob(name, argument, options || {}, waitable); + } + + return this._scheduleJob(name, argument, options || {}, EMPTY); + } + + /** + * Filter messages. + * @private + */ + private _filterJobOutboundMessages( + message: JobOutboundMessage, + state: JobState, + ) { + switch (message.kind) { + case JobOutboundMessageKind.OnReady: + return state == JobState.Queued; + case JobOutboundMessageKind.Start: + return state == JobState.Ready; + + case JobOutboundMessageKind.End: + return state == JobState.Started || state == JobState.Ready; + } + + return true; + } + + /** + * Return a new state. This is just to simplify the reading of the _createJob method. + * @private + */ + private _updateState( + message: JobOutboundMessage, + state: JobState, + ): JobState { + switch (message.kind) { + case JobOutboundMessageKind.OnReady: + return JobState.Ready; + case JobOutboundMessageKind.Start: + return JobState.Started; + case JobOutboundMessageKind.End: + return JobState.Ended; + } + + return state; + } + + /** + * Create the job. + * @private + */ + private _createJob( + name: JobName, + argument: A, + handler: Observable, + inboundBus: Observer>, + outboundBus: Observable>, + options: ScheduleJobOptions, + ): Job { + const schemaRegistry = this._schemaRegistry; + + const channelsSubject = new Map>(); + const channels = new Map>(); + + let state = JobState.Queued; + let pingId = 0; + + const logger = options.logger ? options.logger.createChild('job') : new NullLogger(); + + // Create the input channel by having a filter. + const input = new Subject(); + input.pipe( + switchMap(message => handler.pipe( + switchMap(handler => { + if (handler === null) { + throw new JobDoesNotExistException(name); + } else { + return handler.inputV.pipe( + switchMap(validate => validate(message)), + ); + } + }), + )), + filter(result => result.success), + map(result => result.data as I), + ).subscribe( + value => inboundBus.next({ kind: JobInboundMessageKind.Input, value }), + ); + + outboundBus = concat( + outboundBus, + // Add an End message at completion. This will be filtered out if the job actually send an + // End. + handler.pipe(switchMap(handler => { + if (handler) { + return of>({ + kind: JobOutboundMessageKind.End, description: handler.jobDescription, + }); + } else { + return EMPTY as Observable>; + } + })), + ).pipe( + filter(message => this._filterJobOutboundMessages(message, state)), + // Update internal logic and Job<> members. + tap(message => { + // Update the state. + state = this._updateState(message, state); + + switch (message.kind) { + case JobOutboundMessageKind.Log: + logger.next(message.entry); + break; + + case JobOutboundMessageKind.ChannelCreate: { + const maybeSubject = channelsSubject.get(message.name); + // If it doesn't exist or it's closed on the other end. + if (!maybeSubject) { + const s = new Subject(); + channelsSubject.set(message.name, s); + channels.set(message.name, s.asObservable()); + } + break; + } + + case JobOutboundMessageKind.ChannelMessage: { + const maybeSubject = channelsSubject.get(message.name); + if (maybeSubject) { + maybeSubject.next(message.message); + } + break; + } + + case JobOutboundMessageKind.ChannelComplete: { + const maybeSubject = channelsSubject.get(message.name); + if (maybeSubject) { + maybeSubject.complete(); + channelsSubject.delete(message.name); + } + break; + } + + case JobOutboundMessageKind.ChannelError: { + const maybeSubject = channelsSubject.get(message.name); + if (maybeSubject) { + maybeSubject.error(message.error); + channelsSubject.delete(message.name); + } + break; + } + } + }, () => { + state = JobState.Errored; + }), + + // Do output validation (might include default values so this might have side + // effects). We keep all messages in order. + concatMap(message => { + if (message.kind !== JobOutboundMessageKind.Output) { + return of(message); + } + + return handler.pipe( + switchMap(handler => { + if (handler === null) { + throw new JobDoesNotExistException(name); + } else { + return handler.outputV.pipe( + switchMap(validate => validate(message.value)), + switchMap(output => { + if (!output.success) { + throw new JobOutputSchemaValidationError(output.errors); + } + + return of({ + ...message, + output: output.data as O, + } as JobOutboundMessageOutput); + }), + ); + } + }), + ) as Observable>; + }), + _jobShare(), + ); + + const output = outboundBus.pipe( + filter(x => x.kind == JobOutboundMessageKind.Output), + map((x: JobOutboundMessageOutput) => x.value), + shareReplay(1), + ); + + // Return the Job. + return { + get state() { return state; }, + argument, + description: handler.pipe( + switchMap(handler => { + if (handler === null) { + throw new JobDoesNotExistException(name); + } else { + return of(handler.jobDescription); + } + }), + ), + output, + getChannel( + name: JobName, + schema: schema.JsonSchema = true, + ): Observable { + let maybeObservable = channels.get(name); + if (!maybeObservable) { + const s = new Subject(); + channelsSubject.set(name, s); + channels.set(name, s.asObservable()); + + maybeObservable = s.asObservable(); + } + + return maybeObservable.pipe( + // Keep the order of messages. + concatMap( + message => { + return schemaRegistry.compile(schema).pipe( + switchMap(validate => validate(message)), + filter(x => x.success), + map(x => x.data as T), + ); + }, + ), + ); + }, + ping() { + const id = pingId++; + inboundBus.next({ kind: JobInboundMessageKind.Ping, id }); + + return outboundBus.pipe( + filter(x => x.kind === JobOutboundMessageKind.Pong && x.id == id), + first(), + ignoreElements(), + ); + }, + stop() { + inboundBus.next({ kind: JobInboundMessageKind.Stop }); + }, + input, + inboundBus, + outboundBus, + }; + } + + protected _scheduleJob< + A extends MinimumArgumentT, + I extends MinimumInputT, + O extends MinimumOutputT, + >( + name: JobName, + argument: A, + options: ScheduleJobOptions, + waitable: Observable, + ): Job { + // Get handler first, since this can error out if there's no handler for the job name. + const handler = this._getInternalDescription(name); + + const optionsDeps = (options && options.dependencies) || []; + const dependencies = Array.isArray(optionsDeps) ? optionsDeps : [optionsDeps]; + + const inboundBus = new Subject>(); + const outboundBus = concat( + // Wait for dependencies, make sure to not report messages from dependencies. Subscribe to + // all dependencies at the same time so they run concurrently. + merge(...dependencies.map(x => x.outboundBus)).pipe(ignoreElements()), + + // Wait for pause() to clear (if necessary). + waitable, + + from(handler).pipe( + switchMap(handler => new Observable((subscriber: Observer>) => { + if (!handler) { + throw new JobDoesNotExistException(name); + } + + // Validate the argument. + return handler.argumentV.pipe( + switchMap(validate => validate(argument)), + switchMap(output => { + if (!output.success) { + throw new JobInboundMessageSchemaValidationError(output.errors); + } + + const argument: A = output.data as A; + const description = handler.jobDescription; + subscriber.next({ kind: JobOutboundMessageKind.OnReady, description }); + + const context = { + description, + dependencies: [...dependencies], + inboundBus: inboundBus.asObservable(), + scheduler: this as Scheduler, + }; + + return handler(argument, context); + }), + ).subscribe(subscriber); + })), + ), + ); + + return this._createJob(name, argument, handler, inboundBus, outboundBus, options); + } +} diff --git a/packages/angular_devkit/core/src/experimental/jobs/simple-scheduler_spec.ts b/packages/angular_devkit/core/src/experimental/jobs/simple-scheduler_spec.ts new file mode 100644 index 000000000000..02a4755722ad --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/simple-scheduler_spec.ts @@ -0,0 +1,594 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +// tslint:disable:no-big-function no-non-null-assertion +import { EMPTY, Observable, of, timer } from 'rxjs'; +import { map, take, toArray } from 'rxjs/operators'; +import { JobHandlerContext, JobOutboundMessage, JobOutboundMessageKind, JobState } from './api'; +import { createJobHandler } from './create-job-handler'; +import { SimpleJobRegistry } from './simple-registry'; +import { + JobInboundMessageSchemaValidationError, + JobOutputSchemaValidationError, + SimpleScheduler, +} from './simple-scheduler'; + +describe('SimpleScheduler', () => { + let registry: SimpleJobRegistry; + let scheduler: SimpleScheduler; + + beforeEach(() => { + registry = new SimpleJobRegistry(); + scheduler = new SimpleScheduler(registry); + }); + + it('works for a simple case', async () => { + registry.register( + 'add', createJobHandler((arg: number[]) => arg.reduce((a, c) => a + c, 0)), { + argument: { items: { type: 'number' } }, + output: { type: 'number' }, + }, + ); + + const sum = await (scheduler.schedule('add', [1, 2, 3, 4])).output.toPromise(); + expect(sum).toBe(10); + }); + + it('calls jobs in parallel', async () => { + let started = 0; + let finished = 0; + + registry.register( + 'add', + createJobHandler((argument: number[]) => { + started++; + + return new Promise( + resolve => setTimeout(() => { + finished++; + resolve(argument.reduce((a, c) => a + c, 0)); + }, 10), + ); + }), + { + argument: { items: { type: 'number' } }, + output: { type: 'number' }, + }, + ); + + const job1 = scheduler.schedule('add', [1, 2, 3, 4]); + const job2 = scheduler.schedule('add', [1, 2, 3, 4, 5]); + expect(started).toBe(0); + + const p1 = job1.output.toPromise(); + expect(started).toBe(1); + + const p2 = job2.output.toPromise(); + expect(started).toBe(2); + expect(finished).toBe(0); + + const [sum, sum2] = await Promise.all([p1, p2]); + expect(started).toBe(2); + expect(finished).toBe(2); + + expect(sum).toBe(10); + expect(sum2).toBe(15); + }); + + it('validates arguments', async () => { + registry.register( + 'add', createJobHandler((arg: number[]) => arg.reduce((a, c) => a + c, 0)), { + argument: { items: { type: 'number' } }, + output: { type: 'number' }, + }, + ); + + await (scheduler.schedule('add', [1, 2, 3, 4])).output.toPromise(); + try { + await (scheduler.schedule('add', ['1', 2, 3, 4])).output.toPromise(); + expect(true).toBe(false); + } catch (e) { + // TODO: enable this when https://github.com/bazelbuild/rules_typescript/commit/37807e2c4 + // is released, otherwise this breaks because bazel downgrade to ES5 which does not support + // extending Error. + // expect(e instanceof JobInboundMessageSchemaValidationError).toBe(true); + expect(e.message).toMatch(/"\[0\]".*number/); + } + }); + + it('validates outputs', async () => { + registry.register( + 'add', createJobHandler(() => 'hello world'), { + output: { type: 'number' }, + }, + ); + + try { + await (scheduler.schedule('add', [1, 2, 3, 4])).output.toPromise(); + expect(true).toBe(false); + } catch (e) { + // TODO: enable this when https://github.com/bazelbuild/rules_typescript/commit/37807e2c4 + // is released, otherwise this breaks because bazel downgrade to ES5 which does not support + // extending Error. + // expect(e instanceof JobOutputSchemaValidationError).toBe(true); + expect(e.message).toMatch(/"".*number/); + } + }); + + it('works with dependencies', async () => { + const done: number[] = []; + + registry.register( + 'job', + createJobHandler((argument) => { + return new Promise(resolve => setImmediate(() => { + done.push(argument); + resolve(argument); + })); + }), + { argument: true, output: true }, + ); + + // Run jobs. + const job1 = scheduler.schedule('job', 1); + const job2 = scheduler.schedule('job', 2); + const job3 = scheduler.schedule('job', 3); + + // Run a job to wait for 1. + const job4 = scheduler.schedule('job', 4, { dependencies: job1 }); + + // Run a job to wait for 2. + const job5 = scheduler.schedule('job', 5, { dependencies: job2 }); + + // Run a job to wait for 3. + const job6 = scheduler.schedule('job', 6, { dependencies: job3 }); + + // Run a job to wait for 4, 5 and 6. + const job7 = scheduler.schedule('job', 7, { dependencies: [job4, job5, job6] }); + + expect(done.length).toBe(0); + + await job1.output.toPromise(); + expect(done).toContain(1); + expect(done).not.toContain(4); + expect(done).not.toContain(7); + + await job5.output.toPromise(); + expect(done).toContain(1); + expect(done).toContain(2); + expect(done).not.toContain(4); + expect(done).toContain(5); + expect(done).not.toContain(7); + + await job7.output.toPromise(); + expect(done.length).toBe(7); + // Might be out of order. + expect(done).toEqual(jasmine.arrayContaining([1, 2, 3, 4, 5, 6, 7])); + // Verify at least partial order. + expect(done[done.length - 1]).toBe(7); + expect(done.indexOf(4)).toBeGreaterThan(done.indexOf(1)); + expect(done.indexOf(5)).toBeGreaterThan(done.indexOf(2)); + expect(done.indexOf(6)).toBeGreaterThan(done.indexOf(3)); + }); + + it('does not start dependencies until the last one is subscribed to', async () => { + // This test creates the following graph of dependencies: + // 1 <-.-- 2 <-.-- 4 <-.---------.-- 6 + // +-- 3 <-+-- 5 <-' + // Which can result only in the execution orders: [1, 2, 3, 4, 5, 6] + // Only subscribe to the last one. + + const started: number[] = []; + const done: number[] = []; + + registry.register( + 'job', + createJobHandler((argument: number) => { + started.push(argument); + + return new Promise(resolve => setImmediate(() => { + done.push(argument); + resolve(argument); + })); + }), + { argument: true, output: true }, + ); + + // Run jobs. + const job1 = scheduler.schedule('job', 1); + const job2 = scheduler.schedule('job', 2, { dependencies: job1 }); + const job3 = scheduler.schedule('job', 3, { dependencies: job2 }); + const job4 = scheduler.schedule('job', 4, { dependencies: [job2, job3] }); + const job5 = scheduler.schedule('job', 5, { dependencies: [job1, job2, job4] }); + const job6 = scheduler.schedule('job', 6, { dependencies: [job4, job5] }); + + // Just subscribe to the last job in the lot. + job6.outboundBus.subscribe(); + // Expect the first one to start. + expect(started).toEqual([1]); + // Wait for the first one to finish. + await job1.output.toPromise(); + // Expect the second one to have started, and the first one to be done. + expect(started).toEqual([1, 2]); + expect(done).toEqual([1]); + + // Rinse and repeat. + await job2.output.toPromise(); + expect(started).toEqual([1, 2, 3]); + expect(done).toEqual([1, 2]); + + await job3.output.toPromise(); + expect(started).toEqual([1, 2, 3, 4]); + expect(done).toEqual([1, 2, 3]); + + await job4.output.toPromise(); + expect(started).toEqual([1, 2, 3, 4, 5]); + expect(done).toEqual([1, 2, 3, 4]); + + // Just skip job 5. + await job6.output.toPromise(); + expect(done).toEqual(started); + }); + + it('can be paused', async () => { + let resume: (() => void) | null = null; + + registry.register( + 'job', + createJobHandler((argument, context) => { + return Promise.resolve() + .then(() => { + expect(resume).toBeNull(); + resume = context.scheduler.pause(); + }) + .then(() => argument); + }), + ); + + // Run the job once. Wait for it to finish. We should have a `resume()` and the scheduler will + // be paused. + const p0 = (scheduler.schedule('job', 0)).output.toPromise(); + expect(await p0).toBe(0); + + // This will wait. + const p1 = (scheduler.schedule('job', 1)).output.toPromise(); + await Promise.resolve(); + + expect(resume).not.toBeNull(); + resume !(); + resume = null; + + // Running p1. + expect(await p1).toBe(1); + expect(resume).not.toBeNull(); + + const p2 = (scheduler.schedule('job', 2)).output.toPromise(); + + await Promise.resolve(); + resume !(); + resume = null; + expect(await p2).toBe(2); + expect(resume).not.toBeNull(); + + resume !(); + // Should not error since all jobs have run. + await Promise.resolve(); + }); + + it('can be paused (multiple)', async () => { + const done: number[] = []; + + registry.register( + 'jobA', + createJobHandler((argument: number) => { + done.push(argument); + + return Promise.resolve() + .then(() => argument); + }), + ); + + // Pause manually. + const resume = scheduler.pause(); + const p10 = (scheduler.schedule('jobA', 10)).output.toPromise(); + const p11 = (scheduler.schedule('jobA', 11)).output.toPromise(); + const p12 = (scheduler.schedule('jobA', 12)).output.toPromise(); + await Promise.resolve(); + + expect(done).toEqual([]); + resume(); + await Promise.resolve(); + expect(done).not.toEqual([]); + expect(await p10).toBe(10); + expect(await p11).toBe(11); + expect(await p12).toBe(12); + expect(done).toEqual([10, 11, 12]); + }); + + it('can be cancelled by unsubscribing from the raw output', async () => { + const done: number[] = []; + const resolves: (() => void)[] = []; + let keepGoing = true; + + registry.register( + 'job', + createJobHandler((argument: number) => { + return new Observable(observer => { + function fn() { + if (keepGoing) { + const p = new Promise(r => resolves.push(r)); + + observer.next(argument); + done.push(argument); + argument++; + + // tslint:disable-next-line:no-floating-promises + p.then(fn); + } else { + done.push(-1); + observer.complete(); + } + } + + setImmediate(fn); + + return () => { + keepGoing = false; + }; + }); + }), + ); + + const job = scheduler.schedule('job', 0); + await new Promise(r => setTimeout(r, 10)); + expect(job.state).toBe(JobState.Queued); + const subscription = job.output.subscribe(); + + await new Promise(r => setTimeout(r, 10)); + expect(job.state).toBe(JobState.Started); + expect(done).toEqual([0]); + expect(resolves.length).toBe(1); + resolves[0](); + + await new Promise(r => setTimeout(r, 10)); + expect(done).toEqual([0, 1]); + expect(resolves.length).toBe(2); + resolves[1](); + + await new Promise(r => setTimeout(r, 10)); + expect(done).toEqual([0, 1, 2]); + expect(resolves.length).toBe(3); + subscription.unsubscribe(); + resolves[2](); + + job.stop(); + await job.output.toPromise(); + expect(keepGoing).toBe(false); + expect(done).toEqual([0, 1, 2, -1]); + expect(job.state).toBe(JobState.Ended); + }); + + it('sequences raw outputs properly for all use cases', async () => { + registry.register('job-sync', createJobHandler(arg => arg + 1)); + registry.register('job-promise', createJobHandler(arg => { + return Promise.resolve(arg + 1); + })); + registry.register('job-obs-sync', createJobHandler(arg => of(arg + 1))); + registry.register('job-obs-async', createJobHandler(arg => { + return timer(1).pipe( + take(3), + take(1), + map(() => arg + 1), + ); + })); + + const job1 = scheduler.schedule('job-sync', 100); + const job1OutboundBus = await job1.outboundBus.pipe( + // Descriptions are going to differ, so get rid of those. + map(x => ({ ...x, description: null })), + toArray(), + ).toPromise(); + + const job2 = scheduler.schedule('job-promise', 100); + const job2OutboundBus = await job2.outboundBus.pipe( + // Descriptions are going to differ, so get rid of those. + map(x => ({ ...x, description: null })), + toArray(), + ).toPromise(); + + const job3 = scheduler.schedule('job-obs-sync', 100); + const job3OutboundBus = await job3.outboundBus.pipe( + // Descriptions are going to differ, so get rid of those. + map(x => ({ ...x, description: null })), + toArray(), + ).toPromise(); + + const job4 = scheduler.schedule('job-obs-async', 100); + const job4OutboundBus = await job4.outboundBus.pipe( + // Descriptions are going to differ, so get rid of those. + map(x => ({ ...x, description: null })), + toArray(), + ).toPromise(); + + // The should all report the same stuff. + expect(job1OutboundBus).toEqual(job4OutboundBus); + expect(job2OutboundBus).toEqual(job4OutboundBus); + expect(job3OutboundBus).toEqual(job4OutboundBus); + }); + + describe('channels', () => { + it('works', async () => { + registry.register( + 'job', + createJobHandler((argument, context) => { + const channel = context.createChannel('any'); + channel.next('hello world'); + channel.complete(); + + return 0; + }), + ); + + const job = scheduler.schedule('job', 0); + let sideValue = ''; + const c = job.getChannel('any') as Observable; + c.subscribe(x => sideValue = x); + + expect(await job.output.toPromise()).toBe(0); + expect(sideValue).toBe('hello world'); + }); + + it('validates', async () => { + registry.register( + 'job', + createJobHandler((argument, context) => { + const channel = context.createChannel('any'); + channel.next('hello world'); + channel.complete(); + + return 0; + }), { + argument: true, + output: true, + }, + ); + + const job = scheduler.schedule('job', 0); + let sideValue = ''; + const c = job.getChannel('any', { type: 'number' }) as Observable; + expect(c).toBeDefined(null); + + if (c) { + c.subscribe(x => sideValue = x); + } + + expect(await job.output.toPromise()).toBe(0); + expect(sideValue).not.toBe('hello world'); + }); + }); + + describe('lifecycle messages', () => { + it('sequences double start once', async () => { + const fn = (_: never, { description }: JobHandlerContext) => { + return new Observable>(observer => { + observer.next({ kind: JobOutboundMessageKind.Start, description }); + observer.next({ kind: JobOutboundMessageKind.Start, description }); + observer.next({ kind: JobOutboundMessageKind.End, description }); + observer.complete(); + }); + }; + + registry.register('job', Object.assign(fn, { jobDescription: {} })); + const allOutput = await scheduler.schedule('job', 0).outboundBus.pipe( + toArray(), + ).toPromise(); + + expect(allOutput.map(x => ({ ...x, description: null }))).toEqual([ + { kind: JobOutboundMessageKind.OnReady, description: null }, + { kind: JobOutboundMessageKind.Start, description: null }, + { kind: JobOutboundMessageKind.End, description: null }, + ]); + }); + + it('add an End if there is not one', async () => { + const fn = () => EMPTY; + + registry.register('job', Object.assign(fn, { jobDescription: {} })); + const allOutput = await scheduler.schedule('job', 0).outboundBus.pipe( + toArray(), + ).toPromise(); + + expect(allOutput.map(x => ({ ...x, description: null }))).toEqual([ + { kind: JobOutboundMessageKind.OnReady, description: null }, + { kind: JobOutboundMessageKind.End, description: null }, + ]); + }); + + it('only one End', async () => { + const fn = (_: never, { description }: JobHandlerContext) => { + return new Observable>(observer => { + observer.next({ kind: JobOutboundMessageKind.End, description }); + observer.next({ kind: JobOutboundMessageKind.End, description }); + observer.complete(); + }); + }; + + registry.register('job', Object.assign(fn, { jobDescription: {} })); + const allOutput = await scheduler.schedule('job', 0).outboundBus.pipe( + toArray(), + ).toPromise(); + + expect(allOutput.map(x => ({ ...x, description: null }))).toEqual([ + { kind: JobOutboundMessageKind.OnReady, description: null }, + { kind: JobOutboundMessageKind.End, description: null }, + ]); + }); + }); + + describe('input', () => { + it('works', async () => { + registry.register( + 'job', + createJobHandler((argument, context) => { + return new Observable(subscriber => { + context.input.subscribe(x => { + if (x === null) { + subscriber.complete(); + } else { + subscriber.next(parseInt('' + x) + argument); + } + }); + }); + }), + ); + + const job = scheduler.schedule('job', 100); + const outputs: number[] = []; + + job.output.subscribe(x => outputs.push(x as number)); + + job.input.next(1); + job.input.next('2'); + job.input.next(3); + job.input.next(null); + + expect(await job.output.toPromise()).toBe(103); + expect(outputs).toEqual([101, 102, 103]); + }); + + it('validates', async () => { + const handler = createJobHandler((argument, context) => { + return new Observable(subscriber => { + context.input.subscribe(x => { + if (x === null) { + subscriber.complete(); + } else { + subscriber.next(parseInt('' + x) + argument); + } + }); + }); + }, { + input: { anyOf: [{ type: 'number' }, { type: 'null' }] }, + }); + + registry.register('job', handler); + + const job = scheduler.schedule('job', 100); + const outputs: number[] = []; + + job.output.subscribe(x => outputs.push(x as number)); + + job.input.next(1); + job.input.next('2'); + job.input.next(3); + job.input.next(null); + + expect(await job.output.toPromise()).toBe(103); + expect(outputs).toEqual([101, 103]); + }); + }); +}); diff --git a/packages/angular_devkit/core/src/experimental/jobs/strategy.ts b/packages/angular_devkit/core/src/experimental/jobs/strategy.ts new file mode 100644 index 000000000000..146ba6b6decb --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/strategy.ts @@ -0,0 +1,85 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +import { Observable, concat, of } from 'rxjs'; +import { ignoreElements, share, shareReplay } from 'rxjs/operators'; +import { JsonValue } from '../../json'; +import { JobDescription, JobHandler, JobHandlerContext, JobOutboundMessage } from './api'; + +const stableStringify = require('fast-json-stable-stringify'); + +export namespace strategy { + + export type JobStrategy = ( + handler: JobHandler, + options?: Partial>, + ) => JobHandler; + + /** + * Creates a JobStrategy that serializes every call. This strategy can be mixed between jobs. + */ + export function serialize< + A extends JsonValue = JsonValue, + I extends JsonValue = JsonValue, + O extends JsonValue = JsonValue, + >(): JobStrategy { + let latest: Observable> = of(); + + return (handler, options) => { + const newHandler = (argument: A, context: JobHandlerContext) => { + const previous = latest; + latest = concat( + previous.pipe(ignoreElements()), + new Observable>(o => handler(argument, context).subscribe(o)), + ).pipe( + shareReplay(0), + ); + + return latest; + }; + + return Object.assign(newHandler, { + jobDescription: Object.assign({}, handler.jobDescription, options), + }); + }; + } + + + /** + * Creates a JobStrategy that will reuse a running job if the argument matches. + * @param replayMessages Replay ALL messages if a job is reused, otherwise just hook up where it + * is. + */ + export function memoize< + A extends JsonValue = JsonValue, + I extends JsonValue = JsonValue, + O extends JsonValue = JsonValue, + >(replayMessages = false): JobStrategy { + const runs = new Map>>(); + + return (handler, options) => { + const newHandler = (argument: A, context: JobHandlerContext) => { + const argumentJson = stableStringify(argument); + const maybeJob = runs.get(argumentJson); + + if (maybeJob) { + return maybeJob; + } + + const run = handler(argument, context).pipe( + replayMessages ? shareReplay() : share(), + ); + runs.set(argumentJson, run); + + return run; + }; + + return Object.assign(newHandler, handler, options || {}); + }; + } + +} diff --git a/packages/angular_devkit/core/src/experimental/jobs/strategy_spec.ts b/packages/angular_devkit/core/src/experimental/jobs/strategy_spec.ts new file mode 100644 index 000000000000..3fbef7e689bb --- /dev/null +++ b/packages/angular_devkit/core/src/experimental/jobs/strategy_spec.ts @@ -0,0 +1,215 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +import { JobState } from './api'; +import { createJobHandler } from './create-job-handler'; +import { SimpleJobRegistry } from './simple-registry'; +import { SimpleScheduler } from './simple-scheduler'; +import { strategy } from './strategy'; + +describe('strategy.serialize()', () => { + let registry: SimpleJobRegistry; + let scheduler: SimpleScheduler; + + beforeEach(() => { + registry = new SimpleJobRegistry(); + scheduler = new SimpleScheduler(registry); + }); + + it('works', async () => { + let started = 0; + let finished = 0; + + registry.register(strategy.serialize()(createJobHandler((input: number[]) => { + started++; + + return new Promise( + resolve => setTimeout(() => { + finished++; + resolve(input.reduce((a, c) => a + c, 0)); + }, 10), + ); + })), { + argument: { items: { type: 'number' } }, + output: { type: 'number' }, + name: 'add', + }); + + const job1 = await scheduler.schedule('add', [1, 2, 3, 4]); + const job2 = await scheduler.schedule('add', [1, 2, 3, 4, 5]); + expect(started).toBe(0); + expect(finished).toBe(0); + + job1.output.subscribe(); + expect(started).toBe(1); + + job2.output.subscribe(); + expect(started).toBe(1); // Job2 starts when Job1 ends. + + expect(finished).toBe(0); + + await Promise.all([ + job1.output.toPromise().then(s => { + expect(finished).toBe(1); + expect(s).toBe(10); + }), + job2.output.toPromise().then(s => { + expect(finished).toBe(2); + expect(s).toBe(15); + }), + ]); + + expect(started).toBe(2); + expect(finished).toBe(2); + }); + + it('works across jobs', async () => { + let started = 0; + let finished = 0; + + const strategy1 = strategy.serialize(); + + registry.register(strategy1(createJobHandler((input: number[]) => { + started++; + + return new Promise( + resolve => setTimeout(() => { + finished++; + resolve(input.reduce((a, c) => a + c, 0)); + }, 10), + ); + })), { + argument: { items: { type: 'number' } }, + output: { type: 'number' }, + name: 'add', + }); + registry.register(strategy1(createJobHandler((input: number[]) => { + started++; + + return new Promise( + resolve => setTimeout(() => { + finished++; + resolve(input.reduce((a, c) => a + c, 100)); + }, 10), + ); + })), { + argument: { items: { type: 'number' } }, + output: { type: 'number' }, + name: 'add100', + }); + + const job1 = await scheduler.schedule('add', [1, 2, 3, 4]); + const job2 = await scheduler.schedule('add100', [1, 2, 3, 4, 5]); + expect(started).toBe(0); + expect(finished).toBe(0); + + job1.output.subscribe(); + expect(started).toBe(1); + + job2.output.subscribe(); + expect(started).toBe(1); // Job2 starts when Job1 ends. + + expect(finished).toBe(0); + + await Promise.all([ + job1.output.toPromise().then(s => { + expect(finished).toBe(1); + expect(s).toBe(10); + }), + job2.output.toPromise().then(s => { + expect(finished).toBe(2); + expect(s).toBe(115); + }), + ]); + + expect(started).toBe(2); + expect(finished).toBe(2); + }); +}); + +describe('strategy.memoize()', () => { + let registry: SimpleJobRegistry; + let scheduler: SimpleScheduler; + + beforeEach(() => { + registry = new SimpleJobRegistry(); + scheduler = new SimpleScheduler(registry); + }); + + it('works', async () => { + let started = 0; + let finished = 0; + + registry.register(strategy.memoize()(createJobHandler((input: number[]) => { + started++; + + return new Promise( + resolve => setTimeout(() => { + finished++; + resolve(input.reduce((a, c) => a + c, 0)); + }, 10), + ); + })), { + argument: { items: { type: 'number' } }, + output: { type: 'number' }, + name: 'add', + }); + + const job1 = await scheduler.schedule('add', [1, 2, 3, 4]); + const job2 = await scheduler.schedule('add', [1, 2, 3, 4]); + const job3 = await scheduler.schedule('add', [1, 2, 3, 4, 5]); + const job4 = await scheduler.schedule('add', [1, 2, 3, 4, 5]); + expect(started).toBe(0); + expect(finished).toBe(0); + + job1.output.subscribe(); + expect(started).toBe(1); + expect(finished).toBe(0); + + job2.output.subscribe(); + expect(started).toBe(1); // job2 is reusing job1. + expect(finished).toBe(0); + + job3.output.subscribe(); + expect(started).toBe(2); + expect(finished).toBe(0); + + job4.output.subscribe(); + expect(started).toBe(2); // job4 is reusing job3. + expect(finished).toBe(0); + + await Promise.all([ + job1.output.toPromise().then(s => { + // This is hard since job3 and job1 might finish out of order. + expect(finished).toBeGreaterThanOrEqual(1); + expect(s).toBe(10); + }), + job2.output.toPromise().then(s => { + // This is hard since job3 and job1 might finish out of order. + expect(finished).toBeGreaterThanOrEqual(1); + expect(job1.state).toBe(JobState.Ended); + expect(job2.state).toBe(JobState.Ended); + expect(s).toBe(10); + }), + job3.output.toPromise().then(s => { + // This is hard since job3 and job1 might finish out of order. + expect(finished).toBeGreaterThanOrEqual(1); + expect(s).toBe(15); + }), + job4.output.toPromise().then(s => { + expect(job3.state).toBe(JobState.Ended); + expect(job4.state).toBe(JobState.Ended); + // This is hard since job3 and job1 might finish out of order. + expect(finished).toBeGreaterThanOrEqual(1); + expect(s).toBe(15); + }), + ]); + + expect(started).toBe(2); + expect(finished).toBe(2); + }); +}); diff --git a/packages/angular_devkit/core/src/json/schema/schema.ts b/packages/angular_devkit/core/src/json/schema/schema.ts index f9b1e58dc98a..d7c5a653ebdc 100644 --- a/packages/angular_devkit/core/src/json/schema/schema.ts +++ b/packages/angular_devkit/core/src/json/schema/schema.ts @@ -5,7 +5,8 @@ * Use of this source code is governed by an MIT-style license that can be * found in the LICENSE file at https://angular.io/license */ -import { JsonObject } from '../interface'; +import { clean } from '../../utils'; +import { JsonObject, isJsonObject } from '../interface'; /** * A specialized interface for JsonSchema (to come). JsonSchemas are also JsonObject. @@ -13,3 +14,38 @@ import { JsonObject } from '../interface'; * @public */ export type JsonSchema = JsonObject | boolean; + + +// TODO: this should be unknown +// tslint:disable-next-line:no-any +export function isJsonSchema(value: any): value is JsonSchema { + return isJsonObject(value) || value === false || value === true; +} + +/** + * Return a schema that is the merge of all subschemas, ie. it should validate all the schemas + * that were passed in. It is possible to make an invalid schema this way, e.g. by using + * `mergeSchemas({ type: 'number' }, { type: 'string' })`, which will never validate. + * @param schemas All schemas to be merged. + */ +export function mergeSchemas(...schemas: (JsonSchema | undefined)[]): JsonSchema { + return clean(schemas).reduce((prev, curr) => { + if (prev === false || curr === false) { + return false; + } else if (prev === true) { + return curr; + } else if (curr === true) { + return prev; + } else if (Array.isArray(prev.allOf)) { + if (Array.isArray(curr.allOf)) { + return { ...prev, allOf: [...prev.allOf, ...curr.allOf] }; + } else { + return { ...prev, allOf: [...prev.allOf, curr] }; + } + } else if (Array.isArray(curr.allOf)) { + return { ...prev, allOf: [prev, ...curr.allOf] }; + } else { + return { ...prev, allOf: [prev, curr] }; + } + }, true as JsonSchema); +} diff --git a/tests/angular_devkit/core/node/jobs/BUILD b/tests/angular_devkit/core/node/jobs/BUILD new file mode 100644 index 000000000000..8a79ee1a212f --- /dev/null +++ b/tests/angular_devkit/core/node/jobs/BUILD @@ -0,0 +1,22 @@ +# Copyright Google Inc. All Rights Reserved. +# +# Use of this source code is governed by an MIT-style license that can be +# found in the LICENSE file at https://angular.io/license +package(default_visibility = ["//visibility:public"]) + +load("@build_bazel_rules_typescript//:defs.bzl", "ts_library") + +licenses(["notice"]) # MIT License + +ts_library( + name = "jobs_test_lib", + srcs = glob( + include = [ + "**/*.ts", + ], + ), + deps = [ + "//packages/angular_devkit/core", + "@npm//@types/node", + ], +) diff --git a/tests/angular_devkit/core/node/jobs/add.ts b/tests/angular_devkit/core/node/jobs/add.ts new file mode 100644 index 000000000000..c271d8cfd031 --- /dev/null +++ b/tests/angular_devkit/core/node/jobs/add.ts @@ -0,0 +1,18 @@ +/** + * @license + * Copyright Google Inc. All Rights Reserved. + * + * Use of this source code is governed by an MIT-style license that can be + * found in the LICENSE file at https://angular.io/license + */ +// tslint:disable:no-global-tslint-disable +// tslint:disable:no-implicit-dependencies +import { experimental } from '@angular-devkit/core'; + +// Export the job using a createJob. We use our own spec file here to do the job. +export default experimental.jobs.createJobHandler(input => { + return input.reduce((a, c) => a + c, 0); +}, { + input: { items: { type: 'number' } }, + output: { type: 'number' }, +});