Skip to content

Cron cleanup repeatedly hits deadlocks on large environments where groups can overlap #28007

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions app/code/Magento/Cron/Model/DeadlockRetrier.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@
*/
class DeadlockRetrier implements DeadlockRetrierInterface
{
/**
* @var \Psr\Log\LoggerInterface
*/
private $logger;

/**
 * Keep a reference to the logger this retrier writes its warnings to.
 *
 * @param \Psr\Log\LoggerInterface $logger
 */
public function __construct(\Psr\Log\LoggerInterface $logger)
{
    $this->logger = $logger;
}

/**
* @inheritdoc
*/
Expand All @@ -30,6 +44,7 @@ public function execute(callable $callback, AdapterInterface $connection)
try {
return $callback();
} catch (DeadlockException $e) {
$this->logger->warning(sprintf("Deadlock detected in cron: %s", $e->getMessage()));
continue;
}
}
Expand Down
36 changes: 26 additions & 10 deletions app/code/Magento/Cron/Model/ResourceModel/Schedule.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,31 +65,47 @@ public function trySetJobStatusAtomic($scheduleId, $newStatus, $currentStatus)
/**
 * Move a schedule from $currentStatus to $newStatus unless another schedule of the same
 * job code already holds $newStatus.
 *
 * The check and the update run inside one transaction; the rows involved are selected
 * with FOR UPDATE first so that overlapping cron processes serialize on them.
 *
 * @param int|string $scheduleId Identifier of the schedule row to switch
 * @param string $newStatus Status to set (and the status that must be unique per job code)
 * @param string $currentStatus Status the schedule row must currently have
 * @return bool True only when the schedule row was actually switched to $newStatus
 * @throws \Exception Re-thrown after the transaction has been rolled back
 */
public function trySetJobUniqueStatusAtomic($scheduleId, $newStatus, $currentStatus)
{
    $connection = $this->getConnection();
    $connection->beginTransaction();

    try {
        // this condition added to avoid cron jobs locking after incorrect termination of running job
        $match = $connection->quoteInto(
            'existing.job_code = current.job_code ' .
            'AND existing.status = ? ' .
            'AND (existing.executed_at > UTC_TIMESTAMP() - INTERVAL 1 DAY OR existing.executed_at IS NULL)',
            $newStatus
        );

        // Select and lock all related schedules - this prevents deadlock in case cron overlaps and two jobs of
        // the same code attempt to lock at the same time, and force them to serialize
        $selectIfUnlocked = $connection->select()
            ->from(
                ['current' => $this->getTable('cron_schedule')],
                []
            )
            ->joinLeft(
                ['existing' => $this->getTable('cron_schedule')],
                $match,
                ['existing.schedule_id']
            )
            ->where('current.schedule_id = ?', $scheduleId)
            ->where('current.status = ?', $currentStatus)
            ->forUpdate(true);

        // Keep the lookup result in its own variable: $scheduleId is still needed for the update below
        $existingScheduleId = $connection->fetchOne($selectIfUnlocked);
        if (!empty($existingScheduleId)) {
            // Existing running schedule found
            $connection->commit();
            return false;
        }

        // Mark our schedule as running, but only if it is still in the expected status
        $updatedRows = $connection->update(
            $this->getTable('cron_schedule'),
            ['status' => new \Zend_Db_Expr($connection->quote($newStatus))],
            [
                'schedule_id = ?' => $scheduleId,
                'status = ?' => $currentStatus,
            ]
        );

        $connection->commit();
    } catch (\Exception $e) {
        // Do not leave the transaction open when the lock or the update fails
        $connection->rollBack();
        throw $e;
    }

    return $updatedRows > 0;
}
}
61 changes: 45 additions & 16 deletions app/code/Magento/Cron/Observer/ProcessCronQueueObserver.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
namespace Magento\Cron\Observer;

use Magento\Cron\Model\ResourceModel\Schedule\Collection as ScheduleCollection;
use Magento\Cron\Model\Schedule;
use Magento\Framework\App\State;
use Magento\Framework\Console\Cli;
Expand Down Expand Up @@ -83,7 +84,7 @@ class ProcessCronQueueObserver implements ObserverInterface
const MAX_RETRIES = 5;

/**
* @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
* @var ScheduleCollection
*/
protected $_pendingSchedules;

Expand Down Expand Up @@ -278,12 +279,12 @@ function ($groupId) use ($currentTime) {
*
* It should be taken by standalone (child) process, not by the parent process.
*
* @param int $groupId
* @param string $groupId
* @param callable $callback
*
* @return void
*/
private function lockGroup($groupId, callable $callback)
private function lockGroup(string $groupId, callable $callback): void
{
if (!$this->lockManager->lock(self::LOCK_PREFIX . $groupId, self::LOCK_TIMEOUT)) {
$this->logger->warning(
Expand Down Expand Up @@ -399,7 +400,7 @@ function () use ($schedule) {
* @param string $jobName
* @return void
*/
private function startProfiling(string $jobName = '')
private function startProfiling(string $jobName = ''): void
{
$this->statProfiler->clear();
$this->statProfiler->start(
Expand All @@ -416,7 +417,7 @@ private function startProfiling(string $jobName = '')
* @param string $jobName
* @return void
*/
private function stopProfiling(string $jobName = '')
private function stopProfiling(string $jobName = ''): void
{
$this->statProfiler->stop(
sprintf(self::CRON_TIMERID, $jobName),
Expand Down Expand Up @@ -445,9 +446,9 @@ private function getProfilingStat(string $jobName): string
* Return job collection from data base with status 'pending'.
*
* @param string $groupId
* @return \Magento\Cron\Model\ResourceModel\Schedule\Collection
* @return ScheduleCollection
*/
private function getPendingSchedules($groupId)
private function getPendingSchedules(string $groupId): ScheduleCollection
{
$jobs = $this->_config->getJobs();
$pendingJobs = $this->_scheduleFactory->create()->getCollection();
Expand All @@ -462,7 +463,7 @@ private function getPendingSchedules($groupId)
* @param string $groupId
* @return $this
*/
private function generateSchedules($groupId)
private function generateSchedules(string $groupId): self
{
/**
* check if schedule generation is needed
Expand Down Expand Up @@ -533,13 +534,13 @@ protected function _generateJobs($jobs, $exists, $groupId)
* @param int $currentTime
* @return void
*/
private function cleanupJobs($groupId, $currentTime)
private function cleanupJobs(string $groupId, int $currentTime): void
{
// check if history cleanup is needed
$lastCleanup = (int)$this->_cache->load(self::CACHE_KEY_LAST_HISTORY_CLEANUP_AT . $groupId);
$historyCleanUp = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_CLEANUP_EVERY);
if ($lastCleanup > $this->dateTime->gmtTimestamp() - $historyCleanUp * self::SECONDS_IN_MINUTE) {
return $this;
return;
}
// save the time the history cleanup was run, with no expiration
$this->_cache->save(
Expand All @@ -550,6 +551,7 @@ private function cleanupJobs($groupId, $currentTime)
);

$this->cleanupDisabledJobs($groupId);
$this->cleanupRunningJobs($groupId);

$historySuccess = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_SUCCESS);
$historyFailure = (int)$this->getCronGroupConfigurationValue($groupId, self::XML_PATH_HISTORY_FAILURE);
Expand Down Expand Up @@ -673,7 +675,7 @@ protected function getScheduleTimeInterval($groupId)
* @param string $groupId
* @return void
*/
private function cleanupDisabledJobs($groupId)
private function cleanupDisabledJobs(string $groupId): void
{
$jobs = $this->_config->getJobs();
$jobsToCleanup = [];
Expand All @@ -696,6 +698,33 @@ private function cleanupDisabledJobs($groupId)
}
}

/**
 * Cleanup jobs that were left in a running state due to an unexpected stop
 *
 * Every schedule of the group's jobs that is still flagged as running and was scheduled
 * more than one day ago is set to the error status with the message "Time out".
 *
 * @param string $groupId
 * @return void
 */
private function cleanupRunningJobs(string $groupId): void
{
    $jobs = $this->_config->getJobs();
    $jobCodes = array_keys($jobs[$groupId] ?? []);
    if (!$jobCodes) {
        // Unknown group or group without jobs: an empty code list cannot form a valid IN (...) condition
        return;
    }

    $scheduleResource = $this->_scheduleFactory->create()->getResource();
    $connection = $scheduleResource->getConnection();

    $connection->update(
        $scheduleResource->getTable('cron_schedule'),
        [
            'status' => Schedule::STATUS_ERROR,
            'messages' => 'Time out'
        ],
        [
            $connection->quoteInto('status = ?', Schedule::STATUS_RUNNING),
            $connection->quoteInto('job_code IN (?)', $jobCodes),
            'scheduled_at < UTC_TIMESTAMP() - INTERVAL 1 DAY'
        ]
    );
}

/**
* Get cron expression of cron job.
*
Expand Down Expand Up @@ -773,13 +802,13 @@ private function isGroupInFilter($groupId): bool
* @param array $jobsRoot
* @param int $currentTime
*/
private function processPendingJobs($groupId, $jobsRoot, $currentTime)
private function processPendingJobs(string $groupId, array $jobsRoot, int $currentTime): void
{
$procesedJobs = [];
$processedJobs = [];
$pendingJobs = $this->getPendingSchedules($groupId);
/** @var Schedule $schedule */
foreach ($pendingJobs as $schedule) {
if (isset($procesedJobs[$schedule->getJobCode()])) {
if (isset($processedJobs[$schedule->getJobCode()])) {
// process only one job per run
continue;
}
Expand All @@ -796,7 +825,7 @@ private function processPendingJobs($groupId, $jobsRoot, $currentTime)
$this->tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);

if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) {
$procesedJobs[$schedule->getJobCode()] = true;
$processedJobs[$schedule->getJobCode()] = true;
}

$this->retrier->execute(
Expand All @@ -821,7 +850,7 @@ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule,
{
// use md5 to limit length
// phpcs:ignore Magento2.Security.InsecureFunction
$lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());
$lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());

try {
for ($retries = self::MAX_RETRIES; $retries > 0; $retries--) {
Expand Down
13 changes: 12 additions & 1 deletion app/code/Magento/Cron/Test/Unit/Model/DeadlockRetrierTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Magento\Framework\DB\Adapter\AdapterInterface;
use Magento\Framework\DB\Adapter\DeadlockException;
use PHPUnit\Framework\MockObject\MockObject;
use Psr\Log\LoggerInterface;

class DeadlockRetrierTest extends \PHPUnit\Framework\TestCase
{
Expand All @@ -27,6 +28,11 @@ class DeadlockRetrierTest extends \PHPUnit\Framework\TestCase
*/
private $adapterMock;

/**
* @var LoggerInterface|MockObject
*/
private $loggerMock;

/**
* @var AbstractModel|MockObject
*/
Expand All @@ -38,8 +44,9 @@ class DeadlockRetrierTest extends \PHPUnit\Framework\TestCase
protected function setUp(): void
{
    // Collaborators are plain mocks; only the retrier itself is a real object
    $this->adapterMock = $this->createMock(AdapterInterface::class);
    $this->loggerMock = $this->createMock(LoggerInterface::class);
    $this->modelMock = $this->createMock(AbstractModel::class);
    // The retrier's constructor requires a logger, so it is built once with the logger mock
    $this->retrier = new DeadlockRetrier($this->loggerMock);
}

/**
Expand Down Expand Up @@ -75,6 +82,8 @@ public function testRetry(): void
$this->modelMock->expects($this->exactly(DeadlockRetrierInterface::MAX_RETRIES))
->method('getId')
->willThrowException(new DeadlockException());
$this->loggerMock->expects($this->exactly(DeadlockRetrierInterface::MAX_RETRIES - 1))
->method('warning');

$this->retrier->execute(
function () {
Expand All @@ -95,6 +104,8 @@ public function testRetrySecond(): void
$this->modelMock->expects($this->at(1))
->method('getId')
->willReturn(2);
$this->loggerMock->expects($this->once())
->method('warning');

$this->retrier->execute(
function () {
Expand Down
Loading