Skip to content

Commit

Permalink
Improve (and shorten) query polling implementation. (#4243)
Browse files Browse the repository at this point in the history
This implementation has the following benefits:

- It collapses the QueryScheduler abstraction into the QueryManager (which
  was always ultimately responsible for managing the lifetime of polling
  timers), thus simplifying the relationship between the QueryManager and
  its ObservableQuery objects.

- It's about 100 bytes smaller than the previous implementation, after
  minification and gzip.

- It uses setTimeout rather than setInterval, so event loop starvation
  never leads to a rapid succession of setInterval catch-up calls.

- It guarantees at most one timeout will be pending for an arbitrary
  number of polling queries, rather than a separate timer for every
  distinct polling interval.

- Fewer independent timers means better batching behavior, usually.

- Though there may be a delay between the desired polling time for a given
  query and the actual polling time, the delay is never greater than the
  minimum polling interval across all queries, which changes dynamically
  as polling queries are started and stopped.
  • Loading branch information
benjamn committed Jan 17, 2019
1 parent 42902f1 commit b4f0c8e
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 316 deletions.
5 changes: 3 additions & 2 deletions packages/apollo-client/src/__mocks__/mockLinks.ts
Expand Up @@ -3,6 +3,7 @@ import {
ApolloLink,
FetchResult,
Observable,
GraphQLRequest,
// Observer,
} from 'apollo-link';

Expand All @@ -25,7 +26,7 @@ export function mockObservableLink(): MockSubscriptionLink {
}

export interface MockedResponse {
request: Operation;
request: GraphQLRequest;
result?: FetchResult;
error?: Error;
delay?: number;
Expand Down Expand Up @@ -145,7 +146,7 @@ export class MockSubscriptionLink extends ApolloLink {
}
}

function requestToKey(request: Operation): string {
function requestToKey(request: GraphQLRequest): string {
const queryString = request.query && print(request.query);

return JSON.stringify({
Expand Down
39 changes: 9 additions & 30 deletions packages/apollo-client/src/core/ObservableQuery.ts
Expand Up @@ -2,11 +2,7 @@ import { isEqual, tryFunctionOrLogError, cloneDeep } from 'apollo-utilities';
import { GraphQLError } from 'graphql';
import { NetworkStatus, isNetworkRequestInFlight } from './networkStatus';
import { Observable, Observer, Subscription } from '../util/Observable';

import { QueryScheduler } from '../scheduler/scheduler';

import { ApolloError } from '../errors/ApolloError';

import { QueryManager } from './QueryManager';
import { ApolloQueryResult, FetchType, OperationVariables } from './types';
import {
Expand Down Expand Up @@ -68,10 +64,8 @@ export class ObservableQuery<
*/
public variables: TVariables;

private isCurrentlyPolling: boolean;
private shouldSubscribe: boolean;
private isTornDown: boolean;
private scheduler: QueryScheduler<any>;
private queryManager: QueryManager<any>;
private observers: Observer<ApolloQueryResult<TData>>[];
private subscriptionHandles: Subscription[];
Expand All @@ -81,11 +75,11 @@ export class ObservableQuery<
private lastError: ApolloError;

constructor({
scheduler,
queryManager,
options,
shouldSubscribe = true,
}: {
scheduler: QueryScheduler<any>;
queryManager: QueryManager<any>;
options: WatchQueryOptions<TVariables>;
shouldSubscribe?: boolean;
}) {
Expand All @@ -94,18 +88,16 @@ export class ObservableQuery<
);

// active state
this.isCurrentlyPolling = false;
this.isTornDown = false;

// query information
this.options = options;
this.variables = options.variables || ({} as TVariables);
this.queryId = scheduler.queryManager.generateQueryId();
this.queryId = queryManager.generateQueryId();
this.shouldSubscribe = shouldSubscribe;

// related classes
this.scheduler = scheduler;
this.queryManager = scheduler.queryManager;
this.queryManager = queryManager;

// interal data stores
this.observers = [];
Expand Down Expand Up @@ -524,11 +516,8 @@ export class ObservableQuery<
}

public stopPolling() {
if (this.isCurrentlyPolling) {
this.scheduler.stopPollingQuery(this.queryId);
this.options.pollInterval = undefined;
this.isCurrentlyPolling = false;
}
this.queryManager.stopPollingQuery(this.queryId);
this.options.pollInterval = undefined;
}

public startPolling(pollInterval: number) {
Expand All @@ -541,13 +530,8 @@ export class ObservableQuery<
);
}

if (this.isCurrentlyPolling) {
this.scheduler.stopPollingQuery(this.queryId);
this.isCurrentlyPolling = false;
}
this.options.pollInterval = pollInterval;
this.isCurrentlyPolling = true;
this.scheduler.startPollingQuery(this.options, this.queryId);
this.queryManager.startPollingQuery(this.options, this.queryId);
}

private onSubscribe(observer: Observer<ApolloQueryResult<TData>>) {
Expand Down Expand Up @@ -598,8 +582,7 @@ export class ObservableQuery<
);
}

this.isCurrentlyPolling = true;
this.scheduler.startPollingQuery<TData>(this.options, this.queryId);
this.queryManager.startPollingQuery(this.options, this.queryId);
}

const observer: Observer<ApolloQueryResult<TData>> = {
Expand Down Expand Up @@ -627,11 +610,7 @@ export class ObservableQuery<

private tearDownQuery() {
this.isTornDown = true;

if (this.isCurrentlyPolling) {
this.scheduler.stopPollingQuery(this.queryId);
this.isCurrentlyPolling = false;
}
this.queryManager.stopPollingQuery(this.queryId);

// stop all active GraphQL subscriptions
this.subscriptionHandles.forEach(sub => sub.unsubscribe());
Expand Down
138 changes: 133 additions & 5 deletions packages/apollo-client/src/core/QueryManager.ts
Expand Up @@ -13,8 +13,6 @@ import {
hasDirectives,
} from 'apollo-utilities';

import { QueryScheduler } from '../scheduler/scheduler';

import { isApolloError, ApolloError } from '../errors/ApolloError';

import { Observer, Subscription, Observable } from '../util/Observable';
Expand Down Expand Up @@ -54,7 +52,6 @@ export interface QueryInfo {
}

export class QueryManager<TStore> {
public scheduler: QueryScheduler<TStore>;
public link: ApolloLink;
public mutationStore: MutationStore = new MutationStore();
public queryStore: QueryStore = new QueryStore();
Expand All @@ -66,6 +63,8 @@ export class QueryManager<TStore> {

private onBroadcast: () => void;

private ssrMode: boolean;

// let's not start at zero to avoid pain with bad checks
private idCounter = 1;

Expand Down Expand Up @@ -104,7 +103,7 @@ export class QueryManager<TStore> {
this.dataStore = store;
this.onBroadcast = onBroadcast;
this.clientAwareness = clientAwareness;
this.scheduler = new QueryScheduler({ queryManager: this, ssrMode });
this.ssrMode = ssrMode;
}

public mutate<T>({
Expand Down Expand Up @@ -662,7 +661,7 @@ export class QueryManager<TStore> {
let transformedOptions = { ...options } as WatchQueryOptions<TVariables>;

return new ObservableQuery<T, TVariables>({
scheduler: this.scheduler,
queryManager: this,
options: transformedOptions,
shouldSubscribe: shouldSubscribe,
});
Expand Down Expand Up @@ -1269,4 +1268,133 @@ export class QueryManager<TStore> {
},
};
}

public checkInFlight(queryId: string) {
const query = this.queryStore.get(queryId);

return (
query &&
query.networkStatus !== NetworkStatus.ready &&
query.networkStatus !== NetworkStatus.error
);
}

// Map from client ID to { interval, options }.
public pollingInfoByQueryId = new Map<string, {
interval: number;
lastPollTimeMs: number;
options: WatchQueryOptions;
}>();

private nextPoll: {
time: number;
timeout: NodeJS.Timeout;
} | null = null;

public startPollingQuery(
options: WatchQueryOptions,
queryId: string,
listener?: QueryListener,
): string {
const { pollInterval } = options;

if (!pollInterval) {
throw new Error(
'Attempted to start a polling query without a polling interval.',
);
}

// Do not poll in SSR mode
if (!this.ssrMode) {
this.pollingInfoByQueryId.set(queryId, {
interval: pollInterval,
// Avoid polling until at least pollInterval milliseconds from now.
// The -10 is a fudge factor to help with tests that rely on simulated
// timeouts via jest.runTimersToTime.
lastPollTimeMs: Date.now() - 10,
options: {
...options,
fetchPolicy: 'network-only',
},
});

if (listener) {
this.addQueryListener(queryId, listener);
}

this.schedulePoll(pollInterval);
}

return queryId;
}

public stopPollingQuery(queryId: string) {
// Since the master polling interval dynamically adjusts to the contents of
// this.pollingInfoByQueryId, stopping a query from polling is as easy as
// removing it from the map.
this.pollingInfoByQueryId.delete(queryId);
}

// Calling this method ensures a poll will happen within the specified time
// limit, canceling any pending polls that would not happen in time.
private schedulePoll(timeLimitMs: number) {
const now = Date.now();

if (this.nextPoll) {
if (timeLimitMs < this.nextPoll.time - now) {
// The next poll will happen too far in the future, so cancel it, and
// fall through to scheduling a new timeout.
clearTimeout(this.nextPoll.timeout);
} else {
// The next poll will happen within timeLimitMs, so all is well.
return;
}
}

this.nextPoll = {
// Estimated time when the timeout will fire.
time: now + timeLimitMs,

timeout: setTimeout(() => {
this.nextPoll = null;
let nextTimeLimitMs = Infinity;

this.pollingInfoByQueryId.forEach((info, queryId) => {
// Pick next timeout according to current minimum interval.
if (info.interval < nextTimeLimitMs) {
nextTimeLimitMs = info.interval;
}

if (!this.checkInFlight(queryId)) {
// If this query was last polled more than interval milliseconds
// ago, poll it now. Note that there may be a small delay between
// the desired polling time and the actual polling time (equal to
// at most the minimum polling interval across all queries), but
// that's the tradeoff to batching polling intervals.
if (Date.now() - info.lastPollTimeMs >= info.interval) {
const updateLastPollTime = () => {
info.lastPollTimeMs = Date.now();
};
this.fetchQuery(queryId, info.options, FetchType.poll).then(
// Set info.lastPollTimeMs after the fetch completes, whether
// or not it succeeded. Promise.prototype.finally would be nice
// here, but we don't have a polyfill for that at the moment,
// and this code has historically silenced errors, which is not
// the behavior of .finally(updateLastPollTime).
updateLastPollTime,
updateLastPollTime
);
}
}
});

// If there were no entries in this.pollingInfoByQueryId, then
// nextTimeLimitMs will still be Infinity, so this.schedulePoll will
// not be called, thus ending the master polling interval.
if (isFinite(nextTimeLimitMs)) {
this.schedulePoll(nextTimeLimitMs);
}
}, timeLimitMs),
};
}
}

0 comments on commit b4f0c8e

Please sign in to comment.