24
24
25
25
class CloudTasksQueue extends LaravelQueue implements QueueContract
26
26
{
27
- private Closure |array $ headers = [];
27
+ private static ?Closure $ handlerUrlCallback = null ;
28
+ private static ?Closure $ taskHeadersCallback = null ;
28
29
29
30
public function __construct (public array $ config , public CloudTasksClient $ client , public $ dispatchAfterCommit = false )
30
31
{
31
32
//
32
33
}
34
+
35
+ public static function configureHandlerUrlUsing (Closure $ callback ): void
36
+ {
37
+ static ::$ handlerUrlCallback = $ callback ;
38
+ }
39
+
40
+ public static function forgetHandlerUrlCallback (): void
41
+ {
42
+ self ::$ handlerUrlCallback = null ;
43
+ }
44
+
45
+ public static function setTaskHeadersUsing (Closure $ callback ): void
46
+ {
47
+ static ::$ taskHeadersCallback = $ callback ;
48
+ }
49
+
50
+ public static function forgetTaskHeadersCallback (): void
51
+ {
52
+ self ::$ taskHeadersCallback = null ;
53
+ }
33
54
34
55
/**
35
56
* Get the size of the queue.
@@ -57,8 +78,8 @@ public function push($job, $data = '', $queue = null)
57
78
$ this ->createPayload ($ job , $ queue , $ data ),
58
79
$ queue ,
59
80
null ,
60
- function ($ payload , $ queue ) {
61
- return $ this ->pushRaw ($ payload , $ queue );
81
+ function ($ payload , $ queue ) use ( $ job ) {
82
+ return $ this ->pushRaw ($ payload , $ queue, [ ' job ' => $ job ] );
62
83
}
63
84
);
64
85
}
@@ -73,8 +94,9 @@ function ($payload, $queue) {
73
94
public function pushRaw ($ payload , $ queue = null , array $ options = [])
74
95
{
75
96
$ delay = ! empty ($ options ['delay ' ]) ? $ options ['delay ' ] : 0 ;
97
+ $ job = $ options ['job ' ] ?? null ;
76
98
77
- return $ this ->pushToCloudTasks ($ queue , $ payload , $ delay );
99
+ return $ this ->pushToCloudTasks ($ queue , $ payload , $ delay, $ job );
78
100
}
79
101
80
102
/**
@@ -93,8 +115,8 @@ public function later($delay, $job, $data = '', $queue = null)
93
115
$ this ->createPayload ($ job , $ queue , $ data ),
94
116
$ queue ,
95
117
$ delay ,
96
- function ($ payload , $ queue , $ delay ) {
97
- return $ this ->pushToCloudTasks ($ queue , $ payload , $ delay );
118
+ function ($ payload , $ queue , $ delay ) use ( $ job ) {
119
+ return $ this ->pushToCloudTasks ($ queue , $ payload , $ delay, $ job );
98
120
}
99
121
);
100
122
}
@@ -105,9 +127,10 @@ function ($payload, $queue, $delay) {
105
127
* @param string|null $queue
106
128
* @param string $payload
107
129
* @param \DateTimeInterface|\DateInterval|int $delay
130
+ * @param string|object $job
108
131
* @return string
109
132
*/
110
- protected function pushToCloudTasks ($ queue , $ payload , $ delay = 0 )
133
+ protected function pushToCloudTasks ($ queue , $ payload , $ delay, mixed $ job )
111
134
{
112
135
$ queue = $ queue ?: $ this ->config ['queue ' ];
113
136
@@ -122,7 +145,7 @@ protected function pushToCloudTasks($queue, $payload, $delay = 0)
122
145
connectionName: $ this ->getConnectionName (),
123
146
);
124
147
125
- $ this ->addPayloadToTask ($ payload , $ task );
148
+ $ this ->addPayloadToTask ($ payload , $ task, $ job );
126
149
127
150
// The deadline for requests sent to the app. If the app does not respond by
128
151
// this deadline then the request is cancelled and the attempt is marked as
@@ -173,9 +196,10 @@ private function enrichPayloadWithInternalData(
173
196
return $ payload ;
174
197
}
175
198
176
- public function addPayloadToTask (array $ payload , Task $ task ): Task
199
+ /** @param string|object $job */
200
+ public function addPayloadToTask (array $ payload , Task $ task , mixed $ job ): Task
177
201
{
178
- $ headers = value ( $ this ->headers , $ payload ) ?: [] ;
202
+ $ headers = $ this ->headers ( $ payload );
179
203
180
204
if (! empty ($ this ->config ['app_engine ' ])) {
181
205
$ path = \Safe \parse_url (route ('cloud-tasks.handle-task ' ), PHP_URL_PATH );
@@ -195,7 +219,7 @@ public function addPayloadToTask(array $payload, Task $task): Task
195
219
$ task ->setAppEngineHttpRequest ($ appEngineRequest );
196
220
} else {
197
221
$ httpRequest = new HttpRequest ();
198
- $ httpRequest ->setUrl ($ this ->getHandler ());
222
+ $ httpRequest ->setUrl ($ this ->getHandler ($ job ));
199
223
$ httpRequest ->setBody (json_encode ($ payload ));
200
224
$ httpRequest ->setHttpMethod (HttpMethod::POST );
201
225
$ httpRequest ->setHeaders ($ headers );
@@ -225,12 +249,17 @@ public function release(CloudTasksJob $job, int $delay = 0): void
225
249
$ this ->pushRaw (
226
250
payload: $ job ->getRawBody (),
227
251
queue: $ job ->getQueue (),
228
- options: ['delay ' => $ delay]
252
+ options: ['delay ' => $ delay, ' job ' => $ job ],
229
253
);
230
254
}
231
255
232
- public function getHandler (): string
256
+ /** @param string|object $job */
257
+ public function getHandler (mixed $ job ): string
233
258
{
259
+ if (static ::$ handlerUrlCallback ) {
260
+ return (static ::$ handlerUrlCallback )($ job );
261
+ }
262
+
234
263
if (empty ($ this ->config ['handler ' ])) {
235
264
$ this ->config ['handler ' ] = request ()->getSchemeAndHttpHost ();
236
265
}
@@ -244,8 +273,16 @@ public function getHandler(): string
244
273
return $ handler .'/ ' .config ('cloud-tasks.uri ' );
245
274
}
246
275
247
- public function setTaskHeaders (Closure |array $ headers ): void
276
+ /**
277
+ * @param array<string, mixed> $payload
278
+ * @return array<string, mixed>
279
+ */
280
+ private function headers (mixed $ payload ): array
248
281
{
249
- $ this ->headers = $ headers ;
282
+ if (!static ::$ taskHeadersCallback ) {
283
+ return [];
284
+ }
285
+
286
+ return (static ::$ taskHeadersCallback )($ payload );
250
287
}
251
288
}
0 commit comments