跳到内容

Scheduler

编辑此页

调度器组件管理 PHP 应用程序中的任务调度,例如每晚凌晨 3 点运行任务,每两周运行一次(节假日除外),或任何您可能需要的自定义计划。

此组件对于调度诸如维护(数据库清理、缓存清除等)、后台处理(队列处理、数据同步等)、定期数据更新、计划通知(电子邮件、警报)等任务非常有用。

本文档重点介绍如何在全栈 Symfony 应用程序的上下文中 使用 Scheduler 组件。

安装

在使用 Symfony Flex 的应用程序中,运行以下命令来安装 scheduler 组件

1
$ composer require symfony/scheduler

提示

MakerBundle v1.58.0 开始,您可以运行 php bin/console make:schedule 来生成一个基本计划,您可以自定义它来创建您自己的 Scheduler。

Symfony Scheduler 基础

使用此组件的主要好处是自动化由您的应用程序管理,这为您提供了 cron 作业无法实现的灵活性(例如,基于某些条件的动态计划)。

Scheduler 组件的核心允许您创建一个任务(称为消息),该任务由服务执行并按某个计划重复执行。它与 Symfony Messenger 组件有一些相似之处(例如消息、处理器、总线、传输等),但主要区别在于 Messenger 无法处理定期重复的任务。

考虑以下应用程序示例,该应用程序按计划向客户发送一些报告。首先,创建一个 Scheduler 消息,表示创建报告的任务

1
2
3
4
5
6
7
8
9
10
11
12
// src/Scheduler/Message/SendDailySalesReports.php
namespace App\Scheduler\Message;

class SendDailySalesReports
{
    public function __construct(private int $id) {}

    public function getId(): int
    {
        return $this->id;
    }
}

接下来,创建处理该类消息的处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// src/Scheduler/Handler/SendDailySalesReportsHandler.php
namespace App\Scheduler\Handler;

use App\Scheduler\Message\SendDailySalesReports;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class SendDailySalesReportsHandler
{
    public function __invoke(SendDailySalesReports $message)
    {
        // ... do some work to send the report to the customers
    }
}

目标不是立即发送这些消息(如 Messenger 组件中那样),而是根据预定义的频率创建它们。这可以通过 SchedulerTransport 实现,SchedulerTransport 是 Scheduler 消息的特殊传输方式。

传输方式根据分配的频率自主生成各种消息。下图说明了 Messenger 和 Scheduler 组件中消息处理的区别

在 Messenger 中

Symfony Messenger basic cycle

在 Scheduler 中

Symfony Scheduler basic cycle

另一个重要的区别是 Scheduler 组件中的消息是周期性的。它们通过 RecurringMessage 类表示。

将周期性消息附加到计划

消息频率的配置存储在实现 ScheduleProviderInterface 的类中。此提供程序使用 getSchedule() 方法返回包含不同周期性消息的计划。

AsSchedule 属性允许您在特定计划上注册,默认情况下,它引用名为 default 的计划

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// src/Scheduler/SaleTaskProvider.php
namespace App\Scheduler;

use Symfony\Component\Scheduler\Attribute\AsSchedule;
use Symfony\Component\Scheduler\Schedule;
use Symfony\Component\Scheduler\ScheduleProviderInterface;

#[AsSchedule]
class SaleTaskProvider implements ScheduleProviderInterface
{
    public function getSchedule(): Schedule
    {
        // ...
    }
}

提示

默认情况下,计划名称为 default,传输名称遵循以下语法:scheduler_nameofyourschedule(例如 scheduler_default)。

提示

记忆化 您的计划是一种良好的做法,可以防止在 getSchedule() 方法被另一个服务检查时进行不必要的重建。

调度周期性消息

RecurringMessage 是与触发器关联的消息,触发器配置消息的频率。Symfony 提供了不同类型的触发器

CronExpressionTrigger
一种触发器,它使用与 cron 命令行实用程序 相同的语法。
CallbackTrigger
一种触发器,它使用回调来确定下一次运行日期。
ExcludeTimeTrigger
一种触发器,它从给定的触发器中排除某些时间。
JitterTrigger
一种触发器,它为给定的触发器添加随机抖动。抖动是添加到原始触发日期/时间的一些时间。这允许分散计划任务的负载,而不是在完全相同的时间运行所有任务。
PeriodicalTrigger
一种触发器,它使用 DateInterval 来确定下一次运行日期。

JitterTriggerExcludeTimeTrigger 是装饰器,它们修改它们包装的触发器的行为。您可以通过调用 inner()decorators() 方法来获取装饰器以及被装饰的触发器

1
2
3
4
$trigger = new ExcludeTimeTrigger(new JitterTrigger(CronExpressionTrigger::fromSpec('#midnight', new MyMessage()));

$trigger->inner(); // CronExpressionTrigger
$trigger->decorators(); // [ExcludeTimeTrigger, JitterTrigger]

它们中的大多数可以通过 RecurringMessage 类创建,如下例所示。

Cron 表达式触发器

在使用 cron 触发器之前,您必须安装以下依赖项

1
$ composer require dragonmantank/cron-expression

然后,使用与 cron 命令行实用程序 相同的语法定义触发器日期/时间

1
2
3
4
RecurringMessage::cron('* * * * *', new Message());

// optionally you can define the timezone used by the cron expression
RecurringMessage::cron('* * * * *', new Message(), new \DateTimeZone('Africa/Malabo'));

提示

如果您需要帮助构建/理解 cron 表达式,请查看 crontab.guru 网站

您还可以使用一些特殊值来表示常见的 cron 表达式

  • @yearly, @annually - 每年运行一次,1 月 1 日午夜 - 0 0 1 1 *
  • @monthly - 每月运行一次,每月第一天午夜 - 0 0 1 * *
  • @weekly - 每周运行一次,周日午夜 - 0 0 * * 0
  • @daily, @midnight - 每天运行一次,午夜 - 0 0 * * *
  • @hourly - 每小时运行一次,第一分钟 - 0 * * * *

例如

1
RecurringMessage::cron('@daily', new Message());

提示

您还可以使用 AsCronTask 属性 定义 cron 任务。

哈希 Cron 表达式

如果您在同一时间调度了许多触发器(例如,在午夜,0 0 * * *),这将创建一个非常长的在该确切时间运行的计划列表。如果任务有内存泄漏,这可能会导致问题。

您可以在表达式中添加哈希符号 (#) 以生成随机值。虽然这些值是随机的,但它们是可预测且一致的,因为它们是基于消息生成的。字符串表示为 my task 且定义的频率为 # # * * * 的消息将具有 56 20 * * * 的幂等频率(每天晚上 8:56)。

您还可以使用哈希范围 (#(x-y)) 来定义该随机部分可能的取值列表。例如,# #(0-7) * * * 表示每天,在午夜和凌晨 7 点之间的某个时间。使用不带范围的 # 会为该字段创建一个任何有效值的范围。# # # # ##(0-59) #(0-23) #(1-28) #(1-12) #(0-6) 的简写。

您还可以使用一些特殊值来表示常见的哈希 cron 表达式

别名 转换为
#hourly # * * * * (每小时的某个分钟)
#daily # # * * * (每天的某个时间)
#weekly # # * * # (每周的某个时间)
#weekly@midnight # #(0-2) * * # (每周有一天在 #midnight
#monthly # # # * * (每月一次在某天的某个时间)
#monthly@midnight # #(0-2) # * * (每月一次在某天的 #midnight
#annually # # # # * (每年一次在某天的某个时间)
#annually@midnight # #(0-2) # # * (每年一次在某天的 #midnight
#yearly # # # # * #annually 的别名
#yearly@midnight # #(0-2) # # * #annually@midnight 的别名
#midnight # #(0-2) * * * (每天午夜到凌晨 2:59 之间的某个时间)

例如

1
RecurringMessage::cron('#midnight', new Message());

注意

月份范围为 1-28,这是为了考虑到最少有 28 天的二月。

周期性触发器

这些触发器允许使用不同的数据类型(stringintegerDateInterval)配置频率。它们还支持 PHP datetime 函数定义的 相对格式

1
2
3
4
5
6
7
RecurringMessage::every('10 seconds', new Message());
RecurringMessage::every('3 weeks', new Message());
RecurringMessage::every('first Monday of next month', new Message());

$from = new \DateTimeImmutable('13:47', new \DateTimeZone('Europe/Paris'));
$until = '2023-06-12';
RecurringMessage::every('first Monday of next month', new Message(), $from, $until);

提示

您还可以使用 AsPeriodicTask 属性 定义周期性任务。

自定义触发器

自定义触发器允许动态配置任何频率。它们被创建为实现 TriggerInterface 的服务。

例如,如果您想每天发送客户报告,节假日除外

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// src/Scheduler/Trigger/NewUserWelcomeEmailHandler.php
namespace App\Scheduler\Trigger;

class ExcludeHolidaysTrigger implements TriggerInterface
{
    public function __construct(private TriggerInterface $inner)
    {
    }

    // use this method to give a nice displayable name to
    // identify your trigger (it eases debugging)
    public function __toString(): string
    {
        return $this->inner.' (except holidays)';
    }

    public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable
    {
        if (!$nextRun = $this->inner->getNextRunDate($run)) {
            return null;
        }

        // loop until you get the next run date that is not a holiday
        while ($this->isHoliday($nextRun)) {
            $nextRun = $this->inner->getNextRunDate($nextRun);
        }

        return $nextRun;
    }

    private function isHoliday(\DateTimeImmutable $timestamp): bool
    {
        // add some logic to determine if the given $timestamp is a holiday
        // return true if holiday, false otherwise
    }
}

然后,定义您的周期性消息

1
2
3
4
5
6
RecurringMessage::trigger(
    new ExcludeHolidaysTrigger(
        CronExpressionTrigger::fromSpec('@daily'),
    ),
    new SendDailySalesReports('...'),
);

最后,周期性消息必须附加到计划中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// src/Scheduler/SaleTaskProvider.php
namespace App\Scheduler;

#[AsSchedule('uptoyou')]
class SaleTaskProvider implements ScheduleProviderInterface
{
    public function getSchedule(): Schedule
    {
        return $this->schedule ??= (new Schedule())
            ->with(
                RecurringMessage::trigger(
                    new ExcludeHolidaysTrigger(
                        CronExpressionTrigger::fromSpec('@daily'),
                    ),
                    new SendDailySalesReports()
                ),
                RecurringMessage::cron('3 8 * * 1', new CleanUpOldSalesReport())
            );
    }
}

因此,此 RecurringMessage 将包含触发器(定义消息的生成频率)和消息本身(要由特定处理器处理的消息)。

但有趣的是,它还为您提供了动态生成消息的能力。

为生成的消息提供动态愿景

当消息依赖于数据库或第三方服务中存储的数据时,这被证明特别有用。

继续之前的报告生成示例:它们依赖于客户请求。根据具体需求,可能需要在定义的频率下生成任意数量的报告。对于这些动态场景,它使您能够动态定义我们的消息,而不是静态定义。这可以通过定义 CallbackMessageProvider 来实现。

从本质上讲,这意味着您可以动态地在运行时通过回调定义您的消息,每次调度器传输检查要生成的消息时,都会执行该回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// src/Scheduler/SaleTaskProvider.php
namespace App\Scheduler;

#[AsSchedule('uptoyou')]
class SaleTaskProvider implements ScheduleProviderInterface
{
    public function getSchedule(): Schedule
    {
        return $this->schedule ??= (new Schedule())
            ->with(
                RecurringMessage::trigger(
                    new ExcludeHolidaysTrigger(
                        CronExpressionTrigger::fromSpec('@daily'),
                    ),
                    // instead of being static as in the previous example
                    new CallbackMessageProvider([$this, 'generateReports'], 'foo')
                ),
                RecurringMessage::cron('3 8 * * 1', new CleanUpOldSalesReport())
            );
    }

    public function generateReports(MessageContext $context)
    {
        // ...
        yield new SendDailySalesReports();
        yield new ReportSomethingReportSomethingElse();
    }
}

探索制作周期性消息的替代方案

还有另一种构建 RecurringMessage 的方法,可以通过向服务或命令添加以下属性之一来完成:AsPeriodicTaskAsCronTask

对于这两个属性,您都可以通过 schedule 选项定义要使用的计划。默认情况下,将使用名为 default 的计划。此外,默认情况下,将调用您服务的 __invoke 方法,但也可以通过 method 选项指定要调用的方法,如果需要,可以通过 arguments 选项定义参数。

AsCronTask 示例

这是使用此属性定义 cron 触发器的最基本方法

1
2
3
4
5
6
7
8
9
10
11
12
13
// src/Scheduler/Task/SendDailySalesReports.php
namespace App\Scheduler\Task;

use Symfony\Component\Scheduler\Attribute\AsCronTask;

#[AsCronTask('0 0 * * *')]
class SendDailySalesReports
{
    public function __invoke()
    {
        // ...
    }
}

该属性接受更多参数来自定义触发器

1
2
3
4
5
6
7
8
9
10
11
12
13
// adds randomly up to 6 seconds to the trigger time to avoid load spikes
#[AsCronTask('0 0 * * *', jitter: 6)]

// defines the method name to call instead as well as the arguments to pass to it
#[AsCronTask('0 0 * * *', method: 'sendEmail', arguments: ['email' => '[email protected]'])]

// defines the timezone to use
#[AsCronTask('0 0 * * *', timezone: 'Africa/Malabo')]

// when applying this attribute to a Symfony console command, you can pass
// arguments and options to the command using the 'arguments' option:
#[AsCronTask('0 0 * * *', arguments: 'some_argument --some-option --another-option=some_value')]
class MyCommand extends Command

AsPeriodicTask 示例

这是使用此属性定义周期性触发器的最基本方法

1
2
3
4
5
6
7
8
9
10
11
12
13
// src/Scheduler/Task/SendDailySalesReports.php
namespace App\Scheduler\Task;

use Symfony\Component\Scheduler\Attribute\AsPeriodicTask;

#[AsPeriodicTask(frequency: '1 day', from: '2022-01-01', until: '2023-06-12')]
class SendDailySalesReports
{
    public function __invoke()
    {
        // ...
    }
}

注意

fromuntil 选项是可选的。如果未定义,任务将无限期执行。

#[AsPeriodicTask] 属性接受许多参数来自定义触发器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// the frequency can be defined as an integer representing the number of seconds
#[AsPeriodicTask(frequency: 86400)]

// adds randomly up to 6 seconds to the trigger time to avoid load spikes
#[AsPeriodicTask(frequency: '1 day', jitter: 6)]

// defines the method name to call instead as well as the arguments to pass to it
#[AsPeriodicTask(frequency: '1 day', method: 'sendEmail', arguments: ['email' => '[email protected]'])]
class SendDailySalesReports
{
    public function sendEmail(string $email): void
    {
        // ...
    }
}

// when applying this attribute to a Symfony console command, you can pass
// arguments and options to the command using the 'arguments' option:
#[AsPeriodicTask(frequency: '1 day', arguments: 'some_argument --some-option --another-option=some_value')]
class MyCommand extends Command

管理计划消息

实时修改计划消息

虽然提前规划计划是有益的,但计划在一段时间内保持静态的情况很少见。在一段时间后,一些 RecurringMessages 可能会过时,而另一些可能需要集成到计划中。

作为一种通用做法,为了减轻繁重的工作负载,计划中的周期性消息存储在内存中,以避免每次调度器传输生成消息时都重新计算。但是,这种方法也可能存在缺点。

继续上面相同的报告生成示例,公司可能会在特定时期进行促销活动(并且需要在给定的时间范围内重复沟通),或者在某些情况下需要停止删除旧报告。

这就是为什么 Scheduler Incorporates 机制可以动态修改计划并实时考虑所有更改。

在计划中添加、移除和修改条目的策略

计划为您提供了 add()remove()clear() 所有关联的周期性消息的能力,从而导致周期性消息的内存堆栈重置和重新计算。

例如,出于各种原因,如果不需要生成报告,则可以使用回调来有条件地跳过生成部分或全部报告。

但是,如果目的是完全删除周期性消息及其重复,则 Schedule 提供了 remove()removeById() 方法。这在您的情况下可能特别有用,尤其是在您需要停止生成周期性消息(涉及删除旧报告)时。

在您的处理器中,您可以检查条件,如果肯定,则访问 Schedule 并调用此方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// src/Scheduler/SaleTaskProvider.php
namespace App\Scheduler;

#[AsSchedule('uptoyou')]
class SaleTaskProvider implements ScheduleProviderInterface
{
    public function getSchedule(): Schedule
    {
        $this->removeOldReports = RecurringMessage::cron(‘3 8 * * 1’, new CleanUpOldSalesReport());

        return $this->schedule ??= (new Schedule())
            ->with(
                // ...
                $this->removeOldReports;
            );
    }

    // ...

    public function removeCleanUpMessage()
    {
        $this->getSchedule()->getSchedule()->remove($this->removeOldReports);
    }
}

// src/Scheduler/Handler/CleanUpOldSalesReportHandler.php
namespace App\Scheduler\Handler;

#[AsMessageHandler]
class CleanUpOldSalesReportHandler
{
    public function __invoke(CleanUpOldSalesReport $cleanUpOldSalesReport): void
    {
        // do some work here...

        if ($isFinished) {
            $this->mySchedule->removeCleanUpMessage();
        }
    }
}

然而,此系统可能并非适用于所有场景。此外,理想情况下,处理器应设计为处理其预期的消息类型,而无需决定添加或删除新的周期性消息。

例如,如果由于外部事件,需要添加旨在删除报告的周期性消息,则在处理器中实现这一点可能具有挑战性。这是因为一旦没有更多该类型的消息,将不再调用或执行处理器。

但是,Scheduler 还具有事件系统,该系统通过嫁接到 Symfony Messenger 事件中集成到 Symfony 全栈应用程序中。这些事件通过侦听器分派,提供了一种方便的响应方式。

通过事件管理计划消息

战略性事件处理

目标是在保持解耦的同时,为决定何时采取行动提供灵活性。引入了三种主要事件类型

  • PRE_RUN_EVENT(预运行事件)
  • POST_RUN_EVENT(后运行事件)
  • FAILURE_EVENT(失败事件)

访问计划是一项关键功能,允许轻松添加或删除消息类型。此外,还可以访问当前处理的消息及其消息上下文。

考虑到我们的场景,您可以监听 PRE_RUN_EVENT 并检查是否满足特定条件。例如,您可能会决定再次添加用于清理旧报告的周期性消息,使用相同或不同的配置,或者添加任何其他周期性消息。

如果您选择处理定期消息的删除,则可以在此事件的监听器中执行此操作。重要的是,它揭示了一个特定的功能 shouldCancel(),允许您阻止已删除的定期消息的消息被传输并由其处理程序处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// src/Scheduler/SaleTaskProvider.php
namespace App\Scheduler;

#[AsSchedule('uptoyou')]
class SaleTaskProvider implements ScheduleProviderInterface
{
    public function __construct(private EventDispatcherInterface $dispatcher)
    {
    }

    public function getSchedule(): Schedule
    {
        $this->removeOldReports = RecurringMessage::cron('3 8 * * 1', new CleanUpOldSalesReport());

        return $this->schedule ??= (new Schedule($this->dispatcher))
            ->with(
                // ...
            )
            ->before(function(PreRunEvent $event) {
                $message = $event->getMessage();
                $messageContext = $event->getMessageContext();

                // can access the schedule
                $schedule = $event->getSchedule()->getSchedule();

                // can target directly the RecurringMessage being processed
                $schedule->removeById($messageContext->id);

                // allow to call the ShouldCancel() and avoid the message to be handled
                $event->shouldCancel(true);
            })
            ->after(function(PostRunEvent $event) {
                // Do what you want
            })
            ->onFailure(function(FailureEvent $event) {
                // Do what you want
            });
    }
}

Scheduler 事件

PreRunEvent

事件类: PreRunEvent

PreRunEvent 允许修改 Schedule 或在消息被消费之前取消消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Scheduler\Event\PreRunEvent;

public function onMessage(PreRunEvent $event): void
{
    $schedule = $event->getSchedule();
    $context = $event->getMessageContext();
    $message = $event->getMessage();

    // do something with the schedule, context or message

    // and/or cancel message
    $event->shouldCancel(true);
}

执行此命令以找出为此事件注册了哪些监听器及其优先级

1
$ php bin/console debug:event-dispatcher "Symfony\Component\Scheduler\Event\PreRunEvent"

PostRunEvent

事件类: PostRunEvent

PostRunEvent 允许在消息被消费后修改 Schedule

1
2
3
4
5
6
7
8
9
10
11
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Scheduler\Event\PostRunEvent;

public function onMessage(PostRunEvent $event): void
{
    $schedule = $event->getSchedule();
    $context = $event->getMessageContext();
    $message = $event->getMessage();

    // do something with the schedule, context or message
}

执行此命令以找出为此事件注册了哪些监听器及其优先级

1
$ php bin/console debug:event-dispatcher "Symfony\Component\Scheduler\Event\PostRunEvent"

FailureEvent

事件类: FailureEvent

FailureEvent 允许在消息消费抛出异常时修改 Schedule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Scheduler\Event\FailureEvent;

public function onMessage(FailureEvent $event): void
{
    $schedule = $event->getSchedule();
    $context = $event->getMessageContext();
    $message = $event->getMessage();

    $error = $event->getError();

    // do something with the schedule, context, message or error (logging, ...)

    // and/or ignore failure event
    $event->shouldIgnore(true);
}

执行此命令以找出为此事件注册了哪些监听器及其优先级

1
$ php bin/console debug:event-dispatcher "Symfony\Component\Scheduler\Event\FailureEvent"

消费消息

Scheduler 组件提供了两种消费消息的方式,具体取决于您的需求:使用 messenger:consume 命令或以编程方式创建 worker。当在完整的 Symfony 应用程序环境中使用 Scheduler 组件时,第一种解决方案是推荐的,而当将 Scheduler 组件作为独立组件使用时,第二种解决方案更合适。

运行 Worker

在定义定期消息并将其附加到计划后,您需要一种机制来根据其定义的频率生成和消费消息。为此,Scheduler 组件使用了 Messenger 组件中的 messenger:consume 命令

1
2
3
4
$ php bin/console messenger:consume scheduler_nameofyourschedule

# use -vv if you need details about what's happening
$ php bin/console messenger:consume scheduler_nameofyourschedule -vv
Symfony Scheduler - generate and consume

提示

根据您的部署场景,您可能更喜欢使用 cron、Supervisor 或 systemd 等工具来自动化 Messenger worker 进程的执行。这确保了 worker 持续运行。有关更多详细信息,请参阅 Messenger 组件文档的 部署到生产环境 部分。

以编程方式创建 Consumer

前一种解决方案的替代方法是创建并调用一个 worker 来消费消息。该组件带有一个名为 Scheduler 的即用型 worker,您可以在代码中使用它

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
use Symfony\Component\Scheduler\Scheduler;

$schedule = (new Schedule())
    ->with(
        RecurringMessage::trigger(
            new ExcludeHolidaysTrigger(
                CronExpressionTrigger::fromSpec('@daily'),
            ),
            new SendDailySalesReports()
        ),
    );

$scheduler = new Scheduler(handlers: [
    SendDailySalesReports::class => new SendDailySalesReportsHandler(),
    // add more handlers if you have more message types
], schedules: [
    $schedule,
    // the scheduler can take as many schedules as you need
]);

// finally, run the scheduler once it's ready
$scheduler->run();

注意

当将 Scheduler 组件作为独立组件使用时,可以使用 Scheduler。如果您在框架环境中使用它,则强烈建议使用 messenger:consume 命令,如上一节所述。

调试计划

debug:scheduler 命令提供了计划列表以及它们的定期消息。您可以将列表缩小到特定的计划

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ php bin/console debug:scheduler

  Scheduler
  =========

  default
  -------

    ------------------- ------------------------- ----------------------
    Trigger             Provider                  Next Run
    ------------------- ------------------------- ----------------------
    every 2 days        App\Messenger\Foo(0:17..)  Sun, 03 Dec 2023 ...
    15 4 */3 * *        App\Messenger\Foo(0:17..)  Mon, 18 Dec 2023 ...
   -------------------- -------------------------- ---------------------

# you can also specify a date to use for the next run date:
$ php bin/console debug:scheduler --date=2025-10-18

# you can also specify a date to use for the next run date for a schedule:
$ php bin/console debug:scheduler name_of_schedule --date=2025-10-18

# use the --all option to also display the terminated recurring messages
$ php bin/console debug:scheduler --all

使用 Symfony Scheduler 进行高效管理

当 worker 重新启动或关闭一段时间后,Scheduler 传输将无法生成消息(因为它们是由 scheduler 传输即时创建的)。这意味着在 worker 不活动期间计划发送的任何消息都不会发送,并且 Scheduler 将丢失对上次处理消息的跟踪。重新启动后,它将重新计算从那时起要生成的消息。

为了说明这一点,考虑一个设置为每 3 天发送一次的定期消息。如果 worker 在第 2 天重新启动,则该消息将在重新启动后 3 天发送,即在第 5 天。

虽然这种行为不一定会构成问题,但有可能它可能与您所寻求的不符。

这就是为什么 scheduler 允许通过 stateful 选项(和 Cache 组件)记住消息的上次执行日期。这允许系统保留计划的状态,确保当 worker 重新启动时,它会从离开的点继续。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/Scheduler/SaleTaskProvider.php
namespace App\Scheduler;

#[AsSchedule('uptoyou')]
class SaleTaskProvider implements ScheduleProviderInterface
{
    public function getSchedule(): Schedule
    {
        $this->removeOldReports = RecurringMessage::cron('3 8 * * 1', new CleanUpOldSalesReport());

        return $this->schedule ??= (new Schedule())
            ->with(
                // ...
            )
            ->stateful($this->cache)
    }
}

使用 stateful 选项,所有错过的消息都将被处理。如果您只需要处理一次消息,可以使用 processOnlyLastMissedRun 选项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// src/Scheduler/SaleTaskProvider.php
namespace App\Scheduler;

#[AsSchedule('uptoyou')]
class SaleTaskProvider implements ScheduleProviderInterface
{
    public function getSchedule(): Schedule
    {
        $this->removeOldReports = RecurringMessage::cron('3 8 * * 1', new CleanUpOldSalesReport());

        return $this->schedule ??= (new Schedule())
            ->with(
                // ...
            )
            ->stateful($this->cache)
            ->processOnlyLastMissedRun(true)
    }
}

7.2

processOnlyLastMissedRun 选项在 Symfony 7.2 中引入。

为了更有效地扩展您的计划,您可以使用多个 worker。在这种情况下,一个好的做法是添加 lock 以防止同一任务多次执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/Scheduler/SaleTaskProvider.php
namespace App\Scheduler;

#[AsSchedule('uptoyou')]
class SaleTaskProvider implements ScheduleProviderInterface
{
    public function getSchedule(): Schedule
    {
        $this->removeOldReports = RecurringMessage::cron('3 8 * * 1', new CleanUpOldSalesReport());

        return $this->schedule ??= (new Schedule())
            ->with(
                // ...
            )
            ->lock($this->lockFactory->createLock('my-lock'));
    }
}

提示

消息的处理时间很重要。如果花费时间过长,所有后续的消息处理都可能会延迟。因此,一个好的做法是预测到这一点,并计划大于消息处理频率的频率。

此外,为了更好地扩展您的计划,您可以选择将您的消息包装在 RedispatchMessage 中。这允许您指定一个传输,您的消息将在该传输上重新调度,然后再进一步重新调度到其相应的处理程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// src/Scheduler/SaleTaskProvider.php
namespace App\Scheduler;

#[AsSchedule('uptoyou')]
class SaleTaskProvider implements ScheduleProviderInterface
{
    public function getSchedule(): Schedule
    {
        return $this->schedule ??= (new Schedule())
            ->with(
                RecurringMessage::every('5 seconds', new RedispatchMessage(new Message(), 'async'))
            );
    }
}

当使用 RedispatchMessage 时,Symfony 将附加一个 ScheduledStamp 到消息,帮助您在需要时识别这些消息。

这项工作,包括代码示例,根据 Creative Commons BY-SA 3.0 许可获得许可。
目录
    版本