Skip to content

Add an option to delay job execution when adding to the queue #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion docs/running-queues.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ This will cause command to check for the new jobs every 10 seconds if the queue

### With CRON

Using queues with CRON is more challenging, but definitely doable. You can use command like this:
Using queues with CRON is more challenging but definitely doable. You can use command like this:

php spark queue:work emails -max-jobs 20 --stop-when-empty

Expand Down Expand Up @@ -63,6 +63,22 @@ But we can also run the worker like this:

This way, worker will consume jobs with the `low` priority and then with `high`. The order set in the config file is override.

### Delaying jobs

Normally, when we add jobs to a queue, they are run in the order in which we added them to the queue (FIFO - first in, first out).
Of course, there are also priorities, which we described in the previous section. But what about the scenario where we want to run a job, but not earlier than in 5 minutes?

This is where job delay comes into play. We measure the delay in seconds.

```php
// This job will be run not sooner than in 5 minutes
service('queue')->setDelay(5 * MINUTE)->push('emails', 'email', ['message' => 'Email sent no sooner than 5 minutes from now']);
```

Note that there is no guarantee that the job will run exactly in 5 minutes. If many new jobs are added to the queue (without a delay), it may take a long time before the delayed job is actually executed.

We can also combine delayed jobs with priorities.

### Running many instances of the same queue

As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases as long as you keep the `skipLocked` to `true` in the config file. Only for SQLite3 driver, this setting is not relevant as it provides atomicity without the need for explicit concurrency control.
Expand Down
2 changes: 1 addition & 1 deletion phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
stopOnFailure="false"
stopOnIncomplete="false"
stopOnSkipped="false"
cacheDirectory=".phpunit.cache"
cacheDirectory="build/.phpunit.cache"
beStrictAboutCoverageMetadata="true">

<coverage includeUncoveredFiles="true">
Expand Down
5 changes: 5 additions & 0 deletions src/Exceptions/QueueException.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ public static function forIncorrectQueuePriority(string $priority, string $queue
{
return new self(lang('Queue.incorrectQueuePriority', [$priority, $queue]));
}

public static function forIncorrectDelayValue(): static
{
return new self(lang('Queue.incorrectDelayValue'));
}
}
15 changes: 15 additions & 0 deletions src/Handlers/BaseHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ abstract class BaseHandler
{
protected QueueConfig $config;
protected ?string $priority = null;
protected ?int $delay = null;

abstract public function name(): string;

Expand Down Expand Up @@ -62,6 +63,20 @@ public function setPriority(string $priority): static
return $this;
}

/**
* Set delay for job queue (in seconds).
*/
public function setDelay(int $delay): static
{
if ($delay < 0) {
throw QueueException::forIncorrectDelayValue();
}

$this->delay = $delay;

return $this;
}

/**
* Retry failed job.
*
Expand Down
4 changes: 2 additions & 2 deletions src/Handlers/DatabaseHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public function push(string $queue, string $job, array $data): bool
'priority' => $this->priority,
'status' => Status::PENDING->value,
'attempts' => 0,
'available_at' => Time::now(),
'available_at' => Time::now()->addSeconds($this->delay ?? 0),
]);

$this->priority = null;
$this->priority = $this->delay = null;

return $this->jobModel->insert($queueJob, false);
}
Expand Down
8 changes: 5 additions & 3 deletions src/Handlers/PredisHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,21 @@ public function push(string $queue, string $job, array $data): bool

helper('text');

$availableAt = Time::now()->addSeconds($this->delay ?? 0);

$queueJob = new QueueJob([
'id' => random_string('numeric', 16),
'queue' => $queue,
'payload' => new Payload($job, $data),
'priority' => $this->priority,
'status' => Status::PENDING->value,
'attempts' => 0,
'available_at' => Time::now(),
'available_at' => $availableAt,
]);

$result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => Time::now()->timestamp]);
$result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => $availableAt->timestamp]);

$this->priority = null;
$this->priority = $this->delay = null;

return $result > 0;
}
Expand Down
8 changes: 5 additions & 3 deletions src/Handlers/RedisHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,21 @@ public function push(string $queue, string $job, array $data): bool

helper('text');

$availableAt = Time::now()->addSeconds($this->delay ?? 0);

$queueJob = new QueueJob([
'id' => random_string('numeric', 16),
'queue' => $queue,
'payload' => new Payload($job, $data),
'priority' => $this->priority,
'status' => Status::PENDING->value,
'attempts' => 0,
'available_at' => Time::now(),
'available_at' => $availableAt,
]);

$result = (int) $this->redis->zAdd("queues:{$queue}:{$this->priority}", Time::now()->timestamp, json_encode($queueJob));
$result = (int) $this->redis->zAdd("queues:{$queue}:{$this->priority}", $availableAt->timestamp, json_encode($queueJob));

$this->priority = null;
$this->priority = $this->delay = null;

return $result > 0;
}
Expand Down
1 change: 1 addition & 0 deletions src/Language/en/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@
'incorrectPriorityFormat' => 'The priority name should consists only lowercase letters.',
'tooLongPriorityName' => 'The priority name is too long. It should be no longer than 64 letters.',
'incorrectQueuePriority' => 'This queue has incorrectly defined priority: "{0}" for the queue: "{1}".',
'incorrectDelayValue' => 'The number of seconds of delay must be a positive integer.',
];
35 changes: 35 additions & 0 deletions tests/DatabaseHandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ public function testPushWithPriority(): void
]);
}

/**
* @throws ReflectionException
*/
public function testPushAndPopWithPriority(): void
{
Time::setTestNow('2023-12-29 14:15:16');
Expand Down Expand Up @@ -148,6 +151,38 @@ public function testPushAndPopWithPriority(): void
$this->assertSame($payload, $result->payload);
}

/**
* @throws Exception
*/
public function testPushWithDelay(): void
{
Time::setTestNow('2023-12-29 14:15:16');

$handler = new DatabaseHandler($this->config);
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key' => 'value']);

$this->assertTrue($result);

$availableAt = 1703859376;

$this->seeInDatabase('queue_jobs', [
'queue' => 'queue-delay',
'payload' => json_encode(['job' => 'success', 'data' => ['key' => 'value']]),
'available_at' => $availableAt,
]);

$this->assertEqualsWithDelta(MINUTE, $availableAt - Time::now()->getTimestamp(), 1);
}

public function testPushWithDelayException(): void
{
$this->expectException(QueueException::class);
$this->expectExceptionMessage('The number of seconds of delay must be a positive integer.');

$handler = new DatabaseHandler($this->config);
$handler->setDelay(-60);
}

/**
* @throws ReflectionException
*/
Expand Down
21 changes: 21 additions & 0 deletions tests/PredisHandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,27 @@ public function testPushWithPriority(): void
$this->assertSame(['key' => 'value'], $queueJob->payload['data']);
}

/**
* @throws ReflectionException
*/
public function testPushWithDelay(): void
{
Time::setTestNow('2023-12-29 14:15:16');

$handler = new PredisHandler($this->config);
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key' => 'value']);

$this->assertTrue($result);

$predis = self::getPrivateProperty($handler, 'predis');
$this->assertSame(1, $predis->zcard('queues:queue-delay:default'));

$task = $predis->zrangebyscore('queues:queue-delay:default', '-inf', Time::now()->addSeconds(MINUTE)->timestamp, ['limit' => [0, 1]]);
$queueJob = new QueueJob(json_decode((string) $task[0], true));
$this->assertSame('success', $queueJob->payload['job']);
$this->assertSame(['key' => 'value'], $queueJob->payload['data']);
}

public function testPushException(): void
{
$this->expectException(QueueException::class);
Expand Down
103 changes: 103 additions & 0 deletions tests/PushAndPopWithDelayTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
<?php

declare(strict_types=1);

/**
* This file is part of CodeIgniter Queue.
*
* (c) CodeIgniter Foundation <[email protected]>
*
* For the full copyright and license information, please view
* the LICENSE file that was distributed with this source code.
*/

namespace Tests;

use CodeIgniter\I18n\Time;
use CodeIgniter\Queue\Entities\QueueJob;
use CodeIgniter\Test\ReflectionHelper;
use PHPUnit\Framework\Attributes\DataProvider;
use Tests\Support\Config\Queue as QueueConfig;
use Tests\Support\Database\Seeds\TestDatabaseQueueSeeder;
use Tests\Support\TestCase;

/**
* @internal
*/
final class PushAndPopWithDelayTest extends TestCase
{
use ReflectionHelper;

protected $seed = TestDatabaseQueueSeeder::class;
private QueueConfig $config;

protected function setUp(): void
{
parent::setUp();

$this->config = config(QueueConfig::class);
}

public static function handlerProvider(): iterable
{
return [
[
'database', // name
'CodeIgniter\Queue\Handlers\DatabaseHandler', // class
],
[
'redis',
'CodeIgniter\Queue\Handlers\RedisHandler',
],
[
'predis',
'CodeIgniter\Queue\Handlers\PredisHandler',
],
];
}

#[DataProvider('handlerProvider')]
public function testPushAndPopWithDelay(string $name, string $class): void
{
Time::setTestNow('2023-12-29 14:15:16');

$handler = new $class($this->config);
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key1' => 'value1']);

$this->assertTrue($result);

$result = $handler->push('queue-delay', 'success', ['key2' => 'value2']);

$this->assertTrue($result);

if ($name === 'database') {
$this->seeInDatabase('queue_jobs', [
'queue' => 'queue-delay',
'payload' => json_encode(['job' => 'success', 'data' => ['key1' => 'value1']]),
'available_at' => 1703859376,
]);

$this->seeInDatabase('queue_jobs', [
'queue' => 'queue-delay',
'payload' => json_encode(['job' => 'success', 'data' => ['key2' => 'value2']]),
'available_at' => 1703859316,
]);
}

$result = $handler->pop('queue-delay', ['default']);
$this->assertInstanceOf(QueueJob::class, $result);
$payload = ['job' => 'success', 'data' => ['key2' => 'value2']];
$this->assertSame($payload, $result->payload);

$result = $handler->pop('queue-delay', ['default']);
$this->assertNull($result);

// add 1 minute
Time::setTestNow('2023-12-29 14:16:16');

$result = $handler->pop('queue-delay', ['default']);
$this->assertInstanceOf(QueueJob::class, $result);
$payload = ['job' => 'success', 'data' => ['key1' => 'value1']];
$this->assertSame($payload, $result->payload);
}
}
22 changes: 22 additions & 0 deletions tests/RedisHandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use CodeIgniter\Queue\Handlers\RedisHandler;
use CodeIgniter\Test\ReflectionHelper;
use Exception;
use ReflectionException;
use Tests\Support\Config\Queue as QueueConfig;
use Tests\Support\Database\Seeds\TestRedisQueueSeeder;
use Tests\Support\TestCase;
Expand Down Expand Up @@ -95,6 +96,27 @@ public function testPushWithPriority(): void
$this->assertSame(['key' => 'value'], $queueJob->payload['data']);
}

/**
* @throws ReflectionException
*/
public function testPushWithDelay(): void
{
Time::setTestNow('2023-12-29 14:15:16');

$handler = new RedisHandler($this->config);
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key' => 'value']);

$this->assertTrue($result);

$redis = self::getPrivateProperty($handler, 'redis');
$this->assertSame(1, $redis->zCard('queues:queue-delay:default'));

$task = $redis->zRangeByScore('queues:queue-delay:default', '-inf', Time::now()->addSeconds(MINUTE)->timestamp, ['limit' => [0, 1]]);
$queueJob = new QueueJob(json_decode((string) $task[0], true));
$this->assertSame('success', $queueJob->payload['job']);
$this->assertSame(['key' => 'value'], $queueJob->payload['data']);
}

public function testPushException(): void
{
$this->expectException(QueueException::class);
Expand Down
Loading