1
1
import { devAssert } from '../jsutils/devAssert' ;
2
2
import { inspect } from '../jsutils/inspect' ;
3
3
import { invariant } from '../jsutils/invariant' ;
4
+ import { isAsyncIterable } from '../jsutils/isAsyncIterable' ;
4
5
import { isIterableObject } from '../jsutils/isIterableObject' ;
5
6
import { isObjectLike } from '../jsutils/isObjectLike' ;
6
7
import { isPromise } from '../jsutils/isPromise' ;
@@ -51,6 +52,7 @@ import {
51
52
collectFields ,
52
53
collectSubfields as _collectSubfields ,
53
54
} from './collectFields' ;
55
+ import { mapAsyncIterator } from './mapAsyncIterator' ;
54
56
import { getArgumentValues , getVariableValues } from './values' ;
55
57
56
58
/**
@@ -235,6 +237,7 @@ function buildResponse(
235
237
* Essential assertions before executing to provide developer feedback for
236
238
* improper use of the GraphQL library.
237
239
*
240
+ * TODO: consider no longer exporting this function
238
241
* @internal
239
242
*/
240
243
export function assertValidExecutionArguments (
@@ -260,6 +263,7 @@ export function assertValidExecutionArguments(
260
263
*
261
264
* Throws a GraphQLError if a valid execution context cannot be created.
262
265
*
266
+ * TODO: consider no longer exporting this function
263
267
* @internal
264
268
*/
265
269
export function buildExecutionContext (
@@ -543,6 +547,7 @@ function executeField(
543
547
}
544
548
545
549
/**
550
+ * TODO: consider no longer exporting this function
546
551
* @internal
547
552
*/
548
553
export function buildResolveInfo (
@@ -1009,3 +1014,225 @@ export const defaultFieldResolver: GraphQLFieldResolver<unknown, unknown> =
1009
1014
return property ;
1010
1015
}
1011
1016
} ;
1017
+
1018
+ /**
1019
+ * Implements the "Subscribe" algorithm described in the GraphQL specification.
1020
+ *
1021
+ * Returns a Promise which resolves to either an AsyncIterator (if successful)
1022
+ * or an ExecutionResult (error). The promise will be rejected if the schema or
1023
+ * other arguments to this function are invalid, or if the resolved event stream
1024
+ * is not an async iterable.
1025
+ *
1026
+ * If the client-provided arguments to this function do not result in a
1027
+ * compliant subscription, a GraphQL Response (ExecutionResult) with
1028
+ * descriptive errors and no data will be returned.
1029
+ *
1030
+ * If the source stream could not be created due to faulty subscription
1031
+ * resolver logic or underlying systems, the promise will resolve to a single
1032
+ * ExecutionResult containing `errors` and no `data`.
1033
+ *
1034
+ * If the operation succeeded, the promise resolves to an AsyncIterator, which
1035
+ * yields a stream of ExecutionResults representing the response stream.
1036
+ *
1037
+ * Accepts either an object with named arguments, or individual arguments.
1038
+ */
1039
+ export function subscribe (
1040
+ args : ExecutionArgs ,
1041
+ ) : PromiseOrValue <
1042
+ AsyncGenerator < ExecutionResult , void , void > | ExecutionResult
1043
+ > {
1044
+ const resultOrStream = createSourceEventStream ( args ) ;
1045
+
1046
+ if ( isPromise ( resultOrStream ) ) {
1047
+ return resultOrStream . then ( ( resolvedResultOrStream ) =>
1048
+ mapSourceToResponse ( resolvedResultOrStream , args ) ,
1049
+ ) ;
1050
+ }
1051
+
1052
+ return mapSourceToResponse ( resultOrStream , args ) ;
1053
+ }
1054
+
1055
+ function mapSourceToResponse (
1056
+ resultOrStream : ExecutionResult | AsyncIterable < unknown > ,
1057
+ args : ExecutionArgs ,
1058
+ ) : PromiseOrValue <
1059
+ AsyncGenerator < ExecutionResult , void , void > | ExecutionResult
1060
+ > {
1061
+ if ( ! isAsyncIterable ( resultOrStream ) ) {
1062
+ return resultOrStream ;
1063
+ }
1064
+
1065
+ // For each payload yielded from a subscription, map it over the normal
1066
+ // GraphQL `execute` function, with `payload` as the rootValue.
1067
+ // This implements the "MapSourceToResponseEvent" algorithm described in
1068
+ // the GraphQL specification. The `execute` function provides the
1069
+ // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
1070
+ // "ExecuteQuery" algorithm, for which `execute` is also used.
1071
+ return mapAsyncIterator ( resultOrStream , ( payload : unknown ) =>
1072
+ execute ( {
1073
+ ...args ,
1074
+ rootValue : payload ,
1075
+ } ) ,
1076
+ ) ;
1077
+ }
1078
+
1079
+ /**
1080
+ * Implements the "CreateSourceEventStream" algorithm described in the
1081
+ * GraphQL specification, resolving the subscription source event stream.
1082
+ *
1083
+ * Returns a Promise which resolves to either an AsyncIterable (if successful)
1084
+ * or an ExecutionResult (error). The promise will be rejected if the schema or
1085
+ * other arguments to this function are invalid, or if the resolved event stream
1086
+ * is not an async iterable.
1087
+ *
1088
+ * If the client-provided arguments to this function do not result in a
1089
+ * compliant subscription, a GraphQL Response (ExecutionResult) with
1090
+ * descriptive errors and no data will be returned.
1091
+ *
1092
+ * If the the source stream could not be created due to faulty subscription
1093
+ * resolver logic or underlying systems, the promise will resolve to a single
1094
+ * ExecutionResult containing `errors` and no `data`.
1095
+ *
1096
+ * If the operation succeeded, the promise resolves to the AsyncIterable for the
1097
+ * event stream returned by the resolver.
1098
+ *
1099
+ * A Source Event Stream represents a sequence of events, each of which triggers
1100
+ * a GraphQL execution for that event.
1101
+ *
1102
+ * This may be useful when hosting the stateful subscription service in a
1103
+ * different process or machine than the stateless GraphQL execution engine,
1104
+ * or otherwise separating these two steps. For more on this, see the
1105
+ * "Supporting Subscriptions at Scale" information in the GraphQL specification.
1106
+ */
1107
+ export function createSourceEventStream (
1108
+ args : ExecutionArgs ,
1109
+ ) : PromiseOrValue < AsyncIterable < unknown > | ExecutionResult > {
1110
+ const {
1111
+ schema,
1112
+ document,
1113
+ rootValue,
1114
+ contextValue,
1115
+ variableValues,
1116
+ operationName,
1117
+ subscribeFieldResolver,
1118
+ } = args ;
1119
+
1120
+ // If arguments are missing or incorrectly typed, this is an internal
1121
+ // developer mistake which should throw an early error.
1122
+ assertValidExecutionArguments ( schema , document , variableValues ) ;
1123
+
1124
+ // If a valid execution context cannot be created due to incorrect arguments,
1125
+ // a "Response" with only errors is returned.
1126
+ const exeContext = buildExecutionContext ( {
1127
+ schema,
1128
+ document,
1129
+ rootValue,
1130
+ contextValue,
1131
+ variableValues,
1132
+ operationName,
1133
+ subscribeFieldResolver,
1134
+ } ) ;
1135
+
1136
+ // Return early errors if execution context failed.
1137
+ if ( ! ( 'schema' in exeContext ) ) {
1138
+ return { errors : exeContext } ;
1139
+ }
1140
+
1141
+ try {
1142
+ const eventStream = executeSubscription ( exeContext ) ;
1143
+ if ( isPromise ( eventStream ) ) {
1144
+ return eventStream . then ( undefined , ( error ) => ( { errors : [ error ] } ) ) ;
1145
+ }
1146
+
1147
+ return eventStream ;
1148
+ } catch ( error ) {
1149
+ return { errors : [ error ] } ;
1150
+ }
1151
+ }
1152
+
1153
+ function executeSubscription (
1154
+ exeContext : ExecutionContext ,
1155
+ ) : PromiseOrValue < AsyncIterable < unknown > > {
1156
+ const { schema, fragments, operation, variableValues, rootValue } =
1157
+ exeContext ;
1158
+
1159
+ const rootType = schema . getSubscriptionType ( ) ;
1160
+ if ( rootType == null ) {
1161
+ throw new GraphQLError (
1162
+ 'Schema is not configured to execute subscription operation.' ,
1163
+ { nodes : operation } ,
1164
+ ) ;
1165
+ }
1166
+
1167
+ const rootFields = collectFields (
1168
+ schema ,
1169
+ fragments ,
1170
+ variableValues ,
1171
+ rootType ,
1172
+ operation . selectionSet ,
1173
+ ) ;
1174
+ const [ responseName , fieldNodes ] = [ ...rootFields . entries ( ) ] [ 0 ] ;
1175
+ const fieldName = fieldNodes [ 0 ] . name . value ;
1176
+ const fieldDef = schema . getField ( rootType , fieldName ) ;
1177
+
1178
+ if ( ! fieldDef ) {
1179
+ throw new GraphQLError (
1180
+ `The subscription field "${ fieldName } " is not defined.` ,
1181
+ { nodes : fieldNodes } ,
1182
+ ) ;
1183
+ }
1184
+
1185
+ const path = addPath ( undefined , responseName , rootType . name ) ;
1186
+ const info = buildResolveInfo (
1187
+ exeContext ,
1188
+ fieldDef ,
1189
+ fieldNodes ,
1190
+ rootType ,
1191
+ path ,
1192
+ ) ;
1193
+
1194
+ try {
1195
+ // Implements the "ResolveFieldEventStream" algorithm from GraphQL specification.
1196
+ // It differs from "ResolveFieldValue" due to providing a different `resolveFn`.
1197
+
1198
+ // Build a JS object of arguments from the field.arguments AST, using the
1199
+ // variables scope to fulfill any variable references.
1200
+ const args = getArgumentValues ( fieldDef , fieldNodes [ 0 ] , variableValues ) ;
1201
+
1202
+ // The resolve function's optional third argument is a context value that
1203
+ // is provided to every resolve function within an execution. It is commonly
1204
+ // used to represent an authenticated user, or request-specific caches.
1205
+ const contextValue = exeContext . contextValue ;
1206
+
1207
+ // Call the `subscribe()` resolver or the default resolver to produce an
1208
+ // AsyncIterable yielding raw payloads.
1209
+ const resolveFn = fieldDef . subscribe ?? exeContext . subscribeFieldResolver ;
1210
+ const result = resolveFn ( rootValue , args , contextValue , info ) ;
1211
+
1212
+ if ( isPromise ( result ) ) {
1213
+ return result . then ( assertEventStream ) . then ( undefined , ( error ) => {
1214
+ throw locatedError ( error , fieldNodes , pathToArray ( path ) ) ;
1215
+ } ) ;
1216
+ }
1217
+
1218
+ return assertEventStream ( result ) ;
1219
+ } catch ( error ) {
1220
+ throw locatedError ( error , fieldNodes , pathToArray ( path ) ) ;
1221
+ }
1222
+ }
1223
+
1224
+ function assertEventStream ( result : unknown ) : AsyncIterable < unknown > {
1225
+ if ( result instanceof Error ) {
1226
+ throw result ;
1227
+ }
1228
+
1229
+ // Assert field returned an event stream, otherwise yield an error.
1230
+ if ( ! isAsyncIterable ( result ) ) {
1231
+ throw new GraphQLError (
1232
+ 'Subscription field must return Async Iterable. ' +
1233
+ `Received: ${ inspect ( result ) } .` ,
1234
+ ) ;
1235
+ }
1236
+
1237
+ return result ;
1238
+ }
0 commit comments