Skip to content

Dispatch after commit & request validation #68

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 54 additions & 17 deletions src/CloudTasksQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ public function size($queue = null)
return 0;
}

/**
* Fallback method for Laravel 6x and 7x
*
* @param \Closure|string|object $job
* @param string $payload
* @param string $queue
* @param \DateTimeInterface|\DateInterval|int|null $delay
* @param callable $callback
* @return mixed
*/
protected function enqueueUsing($job, $payload, $queue, $delay, $callback)
{
if (method_exists(parent::class, 'enqueueUsing')) {
return parent::enqueueUsing($job, $payload, $queue, $delay, $callback);
}

return $callback($payload, $queue, $delay);
}

/**
* Push a new job onto the queue.
*
Expand All @@ -52,9 +71,15 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
$this->pushToCloudTasks($queue, $this->createPayload(
$job, $this->getQueue($queue), $data
));
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
null,
function ($payload, $queue) {
return $this->pushRaw($payload, $queue);
}
);
}

/**
Expand All @@ -63,11 +88,11 @@ public function push($job, $data = '', $queue = null)
* @param string $payload
* @param string|null $queue
* @param array $options
* @return void
* @return string
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
$this->pushToCloudTasks($queue, $payload);
return $this->pushToCloudTasks($queue, $payload);
}

/**
Expand All @@ -81,9 +106,15 @@ public function pushRaw($payload, $queue = null, array $options = [])
*/
public function later($delay, $job, $data = '', $queue = null)
{
$this->pushToCloudTasks($queue, $this->createPayload(
$job, $this->getQueue($queue), $data
), $delay);
return $this->enqueueUsing(
$job,
$this->createPayload($job, $this->getQueue($queue), $data),
$queue,
$delay,
function ($payload, $queue, $delay) {
return $this->pushToCloudTasks($queue, $payload, $delay);
}
);
}

/**
Expand All @@ -92,7 +123,7 @@ public function later($delay, $job, $data = '', $queue = null)
* @param string|null $queue
* @param string $payload
* @param \DateTimeInterface|\DateInterval|int $delay
* @return void
* @return string
*/
protected function pushToCloudTasks($queue, $payload, $delay = 0)
{
Expand All @@ -103,12 +134,13 @@ protected function pushToCloudTasks($queue, $payload, $delay = 0)
$httpRequest = $this->createHttpRequest();
$httpRequest->setUrl($this->getHandler());
$httpRequest->setHttpMethod(HttpMethod::POST);
$httpRequest->setBody(
// Laravel 7+ jobs have a uuid, but Laravel 6 doesn't have it.
// Since we are using and expecting the uuid in some places
// we will add it manually here if it's not present yet.
$this->withUuid($payload)
);

// Laravel 7+ jobs have a uuid, but Laravel 6 doesn't have it.
// Since we are using and expecting the uuid in some places
// we will add it manually here if it's not present yet.
[$payload, $uuid] = $this->withUuid($payload);

$httpRequest->setBody($payload);

$task = $this->createTask();
$task->setHttpRequest($httpRequest);
Expand All @@ -128,9 +160,11 @@ protected function pushToCloudTasks($queue, $payload, $delay = 0)
$createdTask = CloudTasksApi::createTask($queueName, $task);

event((new TaskCreated)->queue($queue)->task($task));

return $uuid;
}

private function withUuid(string $payload): string
private function withUuid(string $payload): array
{
/** @var array $decoded */
$decoded = json_decode($payload, true);
Expand All @@ -139,7 +173,10 @@ private function withUuid(string $payload): string
$decoded['uuid'] = (string) Str::uuid();
}

return json_encode($decoded);
return [
json_encode($decoded),
$decoded['uuid'],
];
}

/**
Expand Down
67 changes: 45 additions & 22 deletions src/TaskHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
use Illuminate\Queue\Jobs\Job;
use Illuminate\Queue\WorkerOptions;
use Illuminate\Support\Str;
use Illuminate\Validation\ValidationException;
use Safe\Exceptions\JsonException;
use stdClass;
use UnexpectedValueException;
use function Safe\json_decode;
Expand Down Expand Up @@ -40,9 +42,9 @@ public function __construct(CloudTasksClient $client)
$this->client = $client;
}

public function handle(?array $task = null): void
public function handle(?string $task = null): void
{
$task = $task ?: $this->captureTask();
$task = $this->captureTask($task);

$this->loadQueueConnectionConfiguration($task);

Expand All @@ -53,6 +55,47 @@ public function handle(?array $task = null): void
$this->handleTask($task);
}

/**
* @param string|array|null $task
* @return array
* @throws JsonException
*/
private function captureTask($task): array
{
$task = $task ?: (string) (request()->getContent());

try {
$array = json_decode($task, true);
} catch (JsonException $e) {
$array = [];
}

$validator = validator([
'json' => $task,
'task' => $array,
'name_header' => request()->header('X-CloudTasks-Taskname'),
'retry_count_header' => request()->header('X-CloudTasks-TaskRetryCount'),
], [
'json' => 'required|json',
'task' => 'required|array',
'task.data' => 'required|array',
'name_header' => 'required|string',
'retry_count_header' => 'required|numeric',
]);

try {
$validator->validate();
} catch (ValidationException $e) {
if (config('app.debug')) {
throw $e;
} else {
abort(404);
}
}

return json_decode($task, true);
}

private function loadQueueConnectionConfiguration(array $task): void
{
/**
Expand All @@ -71,26 +114,6 @@ private function setQueue(): void
$this->queue = new CloudTasksQueue($this->config, $this->client);
}

/**
* @throws CloudTasksException
*/
private function captureTask(): array
{
$input = (string) (request()->getContent());

if (!$input) {
throw new CloudTasksException('Could not read incoming task');
}

$task = json_decode($input, true);

if (!is_array($task)) {
throw new CloudTasksException('Could not decode incoming task');
}

return $task;
}

private function handleTask(array $task): void
{
$job = new CloudTasksJob($task, $this->queue);
Expand Down
2 changes: 1 addition & 1 deletion tests/CloudTasksDashboardTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public function when_a_job_is_dispatched_it_will_be_added_to_the_dashboard()
'name' => SimpleJob::class,
]);
$payload = \Safe\json_decode($task->getMetadata()['payload'], true);
$this->assertSame($payload, $job->payload);
$this->assertSame($payload, $job->payloadAsArray);
}

/**
Expand Down
27 changes: 27 additions & 0 deletions tests/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

use Google\Cloud\Tasks\V2\HttpMethod;
use Google\Cloud\Tasks\V2\Task;
use Illuminate\Queue\Events\JobQueued;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Event;
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksApi;
use Stackkit\LaravelGoogleCloudTasksQueue\TaskHandler;
use Tests\Support\FailingJob;
Expand Down Expand Up @@ -154,4 +157,28 @@ public function it_posts_the_task_the_correct_queue()
&& $queueName === 'projects/my-test-project/locations/europe-west6/queues/my-special-queue';
});
}

/**
* @test
*/
public function it_can_dispatch_after_commit()
{
if (version_compare(app()->version(), '8.0.0', '<')) {
$this->markTestSkipped('Not supported by Laravel 7.x and below.');
}

// Arrange
CloudTasksApi::fake();
Event::fake();

// Act & Assert
Event::assertNotDispatched(JobQueued::class);
DB::beginTransaction();
SimpleJob::dispatch()->afterCommit();
Event::assertNotDispatched(JobQueued::class);
DB::commit();
Event::assertDispatched(JobQueued::class, function (JobQueued $event) {
return $event->job instanceof SimpleJob;
});
}
}
Loading