Skip to content

Commit 2c850a4

Browse files
ENGCOM-8074: Cron cleanup repeatedly hits deadlocks on large environments where groups can overlap #28007
- Merge Pull Request #28007 from driskell/magento2:Fix-cron-schedule-deadlock - Merged commits: 1. aa12503 2. b0b72b0 3. 315bd3a 4. 96029b5 5. 7df183b 6. 68d679b 7. a901619 8. dda5c72 9. 4d4fa68 10. e95d2fe 11. 916bc93 12. edd7753 13. e70027f 14. 92dc764 15. d1b22ff 16. a76482c
2 parents ad29452 + a76482c commit 2c850a4

File tree

7 files changed

+158
-62
lines changed

7 files changed

+158
-62
lines changed

app/code/Magento/Cron/Model/DeadlockRetrier.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,20 @@
1717
*/
1818
class DeadlockRetrier implements DeadlockRetrierInterface
1919
{
20+
/**
21+
* @var \Psr\Log\LoggerInterface
22+
*/
23+
private $logger;
24+
25+
/**
26+
* @param \Psr\Log\LoggerInterface $logger
27+
*/
28+
public function __construct(
29+
\Psr\Log\LoggerInterface $logger
30+
) {
31+
$this->logger = $logger;
32+
}
33+
2034
/**
2135
* @inheritdoc
2236
*/
@@ -30,6 +44,7 @@ public function execute(callable $callback, AdapterInterface $connection)
3044
try {
3145
return $callback();
3246
} catch (DeadlockException $e) {
47+
$this->logger->warning(sprintf("Deadlock detected in cron: %s", $e->getMessage()));
3348
continue;
3449
}
3550
}

app/code/Magento/Cron/Model/ResourceModel/Schedule.php

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,31 +65,47 @@ public function trySetJobStatusAtomic($scheduleId, $newStatus, $currentStatus)
6565
public function trySetJobUniqueStatusAtomic($scheduleId, $newStatus, $currentStatus)
6666
{
6767
$connection = $this->getConnection();
68+
$connection->beginTransaction();
6869

6970
// this condition added to avoid cron jobs locking after incorrect termination of running job
7071
$match = $connection->quoteInto(
7172
'existing.job_code = current.job_code ' .
72-
'AND (existing.executed_at > UTC_TIMESTAMP() - INTERVAL 1 DAY OR existing.executed_at IS NULL) ' .
73-
'AND existing.status = ?',
73+
'AND existing.status = ? ' .
74+
'AND (existing.executed_at > UTC_TIMESTAMP() - INTERVAL 1 DAY OR existing.executed_at IS NULL)',
7475
$newStatus
7576
);
7677

78+
// Select and lock all related schedules - this prevents deadlock in case cron overlaps and two jobs of
79+
// the same code attempt to lock at the same time, and force them to serialize
7780
$selectIfUnlocked = $connection->select()
81+
->from(
82+
['current' => $this->getTable('cron_schedule')],
83+
[]
84+
)
7885
->joinLeft(
7986
['existing' => $this->getTable('cron_schedule')],
8087
$match,
81-
['status' => new \Zend_Db_Expr($connection->quote($newStatus))]
88+
['existing.schedule_id']
8289
)
8390
->where('current.schedule_id = ?', $scheduleId)
8491
->where('current.status = ?', $currentStatus)
85-
->where('existing.schedule_id IS NULL');
86-
87-
$update = $connection->updateFromSelect($selectIfUnlocked, ['current' => $this->getTable('cron_schedule')]);
88-
$result = $connection->query($update)->rowCount();
92+
->forUpdate(true);
8993

90-
if ($result == 1) {
91-
return true;
94+
$scheduleId = $connection->fetchOne($selectIfUnlocked);
95+
if (!empty($scheduleId)) {
96+
// Existing running schedule found
97+
$connection->commit();
98+
return false;
9299
}
93-
return false;
100+
101+
// Mark our schedule as running
102+
$connection->update(
103+
$this->getTable('cron_schedule'),
104+
['status' => new \Zend_Db_Expr($connection->quote($newStatus))],
105+
['schedule_id = ?' => $scheduleId]
106+
);
107+
108+
$connection->commit();
109+
return true;
94110
}
95111
}

app/code/Magento/Cron/Observer/ProcessCronQueueObserver.php

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
*/
1010
namespace Magento\Cron\Observer;
1111

12+
use Magento\Cron\Model\ResourceModel\Schedule\Collection as ScheduleCollection;
1213
use Magento\Cron\Model\Schedule;
1314
use Magento\Framework\App\State;
1415
use Magento\Framework\Console\Cli;
@@ -83,7 +84,7 @@ class ProcessCronQueueObserver implements ObserverInterface
8384
const MAX_RETRIES = 5;
8485

8586
/**
86-
* @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
87+
* @var ScheduleCollection
8788
*/
8889
protected $_pendingSchedules;
8990

@@ -278,12 +279,12 @@ function ($groupId) use ($currentTime) {
278279
*
279280
* It should be taken by standalone (child) process, not by the parent process.
280281
*
281-
* @param int $groupId
282+
* @param string $groupId
282283
* @param callable $callback
283284
*
284285
* @return void
285286
*/
286-
private function lockGroup($groupId, callable $callback)
287+
private function lockGroup(string $groupId, callable $callback): void
287288
{
288289
if (!$this->lockManager->lock(self::LOCK_PREFIX . $groupId, self::LOCK_TIMEOUT)) {
289290
$this->logger->warning(
@@ -399,7 +400,7 @@ function () use ($schedule) {
399400
* @param string $jobName
400401
* @return void
401402
*/
402-
private function startProfiling(string $jobName = '')
403+
private function startProfiling(string $jobName = ''): void
403404
{
404405
$this->statProfiler->clear();
405406
$this->statProfiler->start(
@@ -416,7 +417,7 @@ private function startProfiling(string $jobName = '')
416417
* @param string $jobName
417418
* @return void
418419
*/
419-
private function stopProfiling(string $jobName = '')
420+
private function stopProfiling(string $jobName = ''): void
420421
{
421422
$this->statProfiler->stop(
422423
sprintf(self::CRON_TIMERID, $jobName),
@@ -445,9 +446,9 @@ private function getProfilingStat(string $jobName): string
445446
* Return job collection from data base with status 'pending'.
446447
*
447448
* @param string $groupId
448-
* @return \Magento\Cron\Model\ResourceModel\Schedule\Collection
449+
* @return ScheduleCollection
449450
*/
450-
private function getPendingSchedules($groupId)
451+
private function getPendingSchedules(string $groupId): ScheduleCollection
451452
{
452453
$jobs = $this->_config->getJobs();
453454
$pendingJobs = $this->_scheduleFactory->create()->getCollection();
@@ -462,7 +463,7 @@ private function getPendingSchedules($groupId)
462463
* @param string $groupId
463464
* @return $this
464465
*/
465-
private function generateSchedules($groupId)
466+
private function generateSchedules(string $groupId): self
466467
{
467468
/**
468469
* check if schedule generation is needed
@@ -533,13 +534,13 @@ protected function _generateJobs($jobs, $exists, $groupId)
533534
* @param int $currentTime
534535
* @return void
535536
*/
536-
private function cleanupJobs($groupId, $currentTime)
537+
private function cleanupJobs(string $groupId, int $currentTime): void
537538
{
538539
// check if history cleanup is needed
539540
$lastCleanup = (int)$this->_cache->load(self::CACHE_KEY_LAST_HISTORY_CLEANUP_AT . $groupId);
540541
$historyCleanUp = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_CLEANUP_EVERY);
541542
if ($lastCleanup > $this->dateTime->gmtTimestamp() - $historyCleanUp * self::SECONDS_IN_MINUTE) {
542-
return $this;
543+
return;
543544
}
544545
// save time history cleanup was ran with no expiration
545546
$this->_cache->save(
@@ -550,6 +551,7 @@ private function cleanupJobs($groupId, $currentTime)
550551
);
551552

552553
$this->cleanupDisabledJobs($groupId);
554+
$this->cleanupRunningJobs($groupId);
553555

554556
$historySuccess = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_SUCCESS);
555557
$historyFailure = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_FAILURE);
@@ -673,7 +675,7 @@ protected function getScheduleTimeInterval($groupId)
673675
* @param string $groupId
674676
* @return void
675677
*/
676-
private function cleanupDisabledJobs($groupId)
678+
private function cleanupDisabledJobs(string $groupId): void
677679
{
678680
$jobs = $this->_config->getJobs();
679681
$jobsToCleanup = [];
@@ -696,6 +698,33 @@ private function cleanupDisabledJobs($groupId)
696698
}
697699
}
698700

701+
/**
702+
* Cleanup jobs that were left in a running state due to an unexpected stop
703+
*
704+
* @param string $groupId
705+
* @return void
706+
*/
707+
private function cleanupRunningJobs(string $groupId): void
708+
{
709+
$scheduleResource = $this->_scheduleFactory->create()->getResource();
710+
$connection = $scheduleResource->getConnection();
711+
712+
$jobs = $this->_config->getJobs();
713+
714+
$connection->update(
715+
$scheduleResource->getTable('cron_schedule'),
716+
[
717+
'status' => \Magento\Cron\Model\Schedule::STATUS_ERROR,
718+
'messages' => 'Time out'
719+
],
720+
[
721+
$connection->quoteInto('status = ?', \Magento\Cron\Model\Schedule::STATUS_RUNNING),
722+
$connection->quoteInto('job_code IN (?)', array_keys($jobs[$groupId])),
723+
'scheduled_at < UTC_TIMESTAMP() - INTERVAL 1 DAY'
724+
]
725+
);
726+
}
727+
699728
/**
700729
* Get cron expression of cron job.
701730
*
@@ -773,13 +802,13 @@ private function isGroupInFilter($groupId): bool
773802
* @param array $jobsRoot
774803
* @param int $currentTime
775804
*/
776-
private function processPendingJobs($groupId, $jobsRoot, $currentTime)
805+
private function processPendingJobs(string $groupId, array $jobsRoot, int $currentTime): void
777806
{
778-
$procesedJobs = [];
807+
$processedJobs = [];
779808
$pendingJobs = $this->getPendingSchedules($groupId);
780809
/** @var Schedule $schedule */
781810
foreach ($pendingJobs as $schedule) {
782-
if (isset($procesedJobs[$schedule->getJobCode()])) {
811+
if (isset($processedJobs[$schedule->getJobCode()])) {
783812
// process only on job per run
784813
continue;
785814
}
@@ -796,7 +825,7 @@ private function processPendingJobs($groupId, $jobsRoot, $currentTime)
796825
$this->tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
797826

798827
if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) {
799-
$procesedJobs[$schedule->getJobCode()] = true;
828+
$processedJobs[$schedule->getJobCode()] = true;
800829
}
801830

802831
$this->retrier->execute(
@@ -821,7 +850,7 @@ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule,
821850
{
822851
// use sha1 to limit length
823852
// phpcs:ignore Magento2.Security.InsecureFunction
824-
$lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());
853+
$lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());
825854

826855
try {
827856
for ($retries = self::MAX_RETRIES; $retries > 0; $retries--) {

app/code/Magento/Cron/Test/Unit/Model/DeadlockRetrierTest.php

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
use Magento\Framework\DB\Adapter\AdapterInterface;
1414
use Magento\Framework\DB\Adapter\DeadlockException;
1515
use PHPUnit\Framework\MockObject\MockObject;
16+
use Psr\Log\LoggerInterface;
1617

1718
class DeadlockRetrierTest extends \PHPUnit\Framework\TestCase
1819
{
@@ -27,6 +28,11 @@ class DeadlockRetrierTest extends \PHPUnit\Framework\TestCase
2728
*/
2829
private $adapterMock;
2930

31+
/**
32+
* @var LoggerInterface|MockObject
33+
*/
34+
private $loggerMock;
35+
3036
/**
3137
* @var AbstractModel|MockObject
3238
*/
@@ -38,8 +44,9 @@ class DeadlockRetrierTest extends \PHPUnit\Framework\TestCase
3844
protected function setUp(): void
3945
{
4046
$this->adapterMock = $this->getMockForAbstractClass(AdapterInterface::class);
47+
$this->loggerMock = $this->getMockForAbstractClass(LoggerInterface::class);
4148
$this->modelMock = $this->createMock(AbstractModel::class);
42-
$this->retrier = new DeadlockRetrier();
49+
$this->retrier = new DeadlockRetrier($this->loggerMock);
4350
}
4451

4552
/**
@@ -75,6 +82,8 @@ public function testRetry(): void
7582
$this->modelMock->expects($this->exactly(DeadlockRetrierInterface::MAX_RETRIES))
7683
->method('getId')
7784
->willThrowException(new DeadlockException());
85+
$this->loggerMock->expects($this->exactly(DeadlockRetrierInterface::MAX_RETRIES - 1))
86+
->method('warning');
7887

7988
$this->retrier->execute(
8089
function () {
@@ -95,6 +104,8 @@ public function testRetrySecond(): void
95104
$this->modelMock->expects($this->at(1))
96105
->method('getId')
97106
->willReturn(2);
107+
$this->loggerMock->expects($this->once())
108+
->method('warning');
98109

99110
$this->retrier->execute(
100111
function () {

0 commit comments

Comments
 (0)