9
9
*/
10
10
namespace Magento \Cron \Observer ;
11
11
12
+ use Magento \Cron \Model \ResourceModel \Schedule \Collection as ScheduleCollection ;
12
13
use Magento \Cron \Model \Schedule ;
13
14
use Magento \Framework \App \State ;
14
15
use Magento \Framework \Console \Cli ;
@@ -83,7 +84,7 @@ class ProcessCronQueueObserver implements ObserverInterface
83
84
const MAX_RETRIES = 5 ;
84
85
85
86
/**
86
- * @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
87
+ * @var ScheduleCollection
87
88
*/
88
89
protected $ _pendingSchedules ;
89
90
@@ -278,12 +279,12 @@ function ($groupId) use ($currentTime) {
278
279
*
279
280
* It should be taken by standalone (child) process, not by the parent process.
280
281
*
281
- * @param int $groupId
282
+ * @param string $groupId
282
283
* @param callable $callback
283
284
*
284
285
* @return void
285
286
*/
286
- private function lockGroup ($ groupId , callable $ callback )
287
+ private function lockGroup (string $ groupId , callable $ callback ): void
287
288
{
288
289
if (!$ this ->lockManager ->lock (self ::LOCK_PREFIX . $ groupId , self ::LOCK_TIMEOUT )) {
289
290
$ this ->logger ->warning (
@@ -399,7 +400,7 @@ function () use ($schedule) {
399
400
* @param string $jobName
400
401
* @return void
401
402
*/
402
- private function startProfiling (string $ jobName = '' )
403
+ private function startProfiling (string $ jobName = '' ): void
403
404
{
404
405
$ this ->statProfiler ->clear ();
405
406
$ this ->statProfiler ->start (
@@ -416,7 +417,7 @@ private function startProfiling(string $jobName = '')
416
417
* @param string $jobName
417
418
* @return void
418
419
*/
419
- private function stopProfiling (string $ jobName = '' )
420
+ private function stopProfiling (string $ jobName = '' ): void
420
421
{
421
422
$ this ->statProfiler ->stop (
422
423
sprintf (self ::CRON_TIMERID , $ jobName ),
@@ -445,9 +446,9 @@ private function getProfilingStat(string $jobName): string
445
446
* Return job collection from data base with status 'pending'.
446
447
*
447
448
* @param string $groupId
448
- * @return \Magento\Cron\Model\ResourceModel\Schedule\Collection
449
+ * @return ScheduleCollection
449
450
*/
450
- private function getPendingSchedules ($ groupId )
451
+ private function getPendingSchedules (string $ groupId ): ScheduleCollection
451
452
{
452
453
$ jobs = $ this ->_config ->getJobs ();
453
454
$ pendingJobs = $ this ->_scheduleFactory ->create ()->getCollection ();
@@ -462,7 +463,7 @@ private function getPendingSchedules($groupId)
462
463
* @param string $groupId
463
464
* @return $this
464
465
*/
465
- private function generateSchedules ($ groupId )
466
+ private function generateSchedules (string $ groupId ): self
466
467
{
467
468
/**
468
469
* check if schedule generation is needed
@@ -533,13 +534,13 @@ protected function _generateJobs($jobs, $exists, $groupId)
533
534
* @param int $currentTime
534
535
* @return void
535
536
*/
536
- private function cleanupJobs ($ groupId , $ currentTime )
537
+ private function cleanupJobs (string $ groupId , int $ currentTime ): void
537
538
{
538
539
// check if history cleanup is needed
539
540
$ lastCleanup = (int )$ this ->_cache ->load (self ::CACHE_KEY_LAST_HISTORY_CLEANUP_AT . $ groupId );
540
541
$ historyCleanUp = (int )$ this ->getCronGroupConfigurationValue ($ groupId , self ::XML_PATH_HISTORY_CLEANUP_EVERY );
541
542
if ($ lastCleanup > $ this ->dateTime ->gmtTimestamp () - $ historyCleanUp * self ::SECONDS_IN_MINUTE ) {
542
- return $ this ;
543
+ return ;
543
544
}
544
545
// save time history cleanup was ran with no expiration
545
546
$ this ->_cache ->save (
@@ -550,6 +551,7 @@ private function cleanupJobs($groupId, $currentTime)
550
551
);
551
552
552
553
$ this ->cleanupDisabledJobs ($ groupId );
554
+ $ this ->cleanupRunningJobs ($ groupId );
553
555
554
556
$ historySuccess = (int )$ this ->getCronGroupConfigurationValue ($ groupId , self ::XML_PATH_HISTORY_SUCCESS );
555
557
$ historyFailure = (int )$ this ->getCronGroupConfigurationValue ($ groupId , self ::XML_PATH_HISTORY_FAILURE );
@@ -673,7 +675,7 @@ protected function getScheduleTimeInterval($groupId)
673
675
* @param string $groupId
674
676
* @return void
675
677
*/
676
- private function cleanupDisabledJobs ($ groupId )
678
+ private function cleanupDisabledJobs (string $ groupId ): void
677
679
{
678
680
$ jobs = $ this ->_config ->getJobs ();
679
681
$ jobsToCleanup = [];
@@ -696,6 +698,33 @@ private function cleanupDisabledJobs($groupId)
696
698
}
697
699
}
698
700
701
+ /**
702
+ * Cleanup jobs that were left in a running state due to an unexpected stop
703
+ *
704
+ * @param string $groupId
705
+ * @return void
706
+ */
707
+ private function cleanupRunningJobs (string $ groupId ): void
708
+ {
709
+ $ scheduleResource = $ this ->_scheduleFactory ->create ()->getResource ();
710
+ $ connection = $ scheduleResource ->getConnection ();
711
+
712
+ $ jobs = $ this ->_config ->getJobs ();
713
+
714
+ $ connection ->update (
715
+ $ scheduleResource ->getTable ('cron_schedule ' ),
716
+ [
717
+ 'status ' => \Magento \Cron \Model \Schedule::STATUS_ERROR ,
718
+ 'messages ' => 'Time out '
719
+ ],
720
+ [
721
+ $ connection ->quoteInto ('status = ? ' , \Magento \Cron \Model \Schedule::STATUS_RUNNING ),
722
+ $ connection ->quoteInto ('job_code IN (?) ' , array_keys ($ jobs [$ groupId ])),
723
+ 'scheduled_at < UTC_TIMESTAMP() - INTERVAL 1 DAY '
724
+ ]
725
+ );
726
+ }
727
+
699
728
/**
700
729
* Get cron expression of cron job.
701
730
*
@@ -773,13 +802,13 @@ private function isGroupInFilter($groupId): bool
773
802
* @param array $jobsRoot
774
803
* @param int $currentTime
775
804
*/
776
- private function processPendingJobs ($ groupId , $ jobsRoot , $ currentTime )
805
+ private function processPendingJobs (string $ groupId , array $ jobsRoot , int $ currentTime ): void
777
806
{
778
- $ procesedJobs = [];
807
+ $ processedJobs = [];
779
808
$ pendingJobs = $ this ->getPendingSchedules ($ groupId );
780
809
/** @var Schedule $schedule */
781
810
foreach ($ pendingJobs as $ schedule ) {
782
- if (isset ($ procesedJobs [$ schedule ->getJobCode ()])) {
811
+ if (isset ($ processedJobs [$ schedule ->getJobCode ()])) {
783
812
// process only on job per run
784
813
continue ;
785
814
}
@@ -796,7 +825,7 @@ private function processPendingJobs($groupId, $jobsRoot, $currentTime)
796
825
$ this ->tryRunJob ($ scheduledTime , $ currentTime , $ jobConfig , $ schedule , $ groupId );
797
826
798
827
if ($ schedule ->getStatus () === Schedule::STATUS_SUCCESS ) {
799
- $ procesedJobs [$ schedule ->getJobCode ()] = true ;
828
+ $ processedJobs [$ schedule ->getJobCode ()] = true ;
800
829
}
801
830
802
831
$ this ->retrier ->execute (
@@ -821,7 +850,7 @@ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule,
821
850
{
822
851
// use sha1 to limit length
823
852
// phpcs:ignore Magento2.Security.InsecureFunction
824
- $ lockName = self ::LOCK_PREFIX . md5 ($ groupId . '_ ' . $ schedule ->getJobCode ());
853
+ $ lockName = self ::LOCK_PREFIX . md5 ($ groupId . '_ ' . $ schedule ->getJobCode ());
825
854
826
855
try {
827
856
for ($ retries = self ::MAX_RETRIES ; $ retries > 0 ; $ retries --) {
0 commit comments