Messenger:同步 & 队列消息处理
Messenger 提供了一个消息总线,它可以发送消息,然后在你的应用程序中立即处理它们,或者通过传输器(例如队列)发送它们以便稍后处理。要更深入地了解它,请阅读 Messenger 组件文档。
创建消息 & 处理器
Messenger 围绕你将创建的两个不同类展开:(1)一个保存数据的消息类,以及(2)一个处理器类,当该消息被调度时将被调用。处理器类将读取消息类并执行一项或多项任务。
消息类没有特定的要求,除了它可以被序列化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// src/Message/SmsNotification.php
namespace App\Message;
class SmsNotification
{
public function __construct(
private string $content,
) {
}
public function getContent(): string
{
return $this->content;
}
}
消息处理器是一个 PHP 可调用对象,推荐的创建方式是创建一个类,该类具有 AsMessageHandler 属性,并且具有一个使用消息类(或消息接口)类型提示的 __invoke()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
class SmsNotificationHandler
{
public function __invoke(SmsNotification $message)
{
// ... do some work - like sending an SMS message!
}
}
提示
你也可以在单个类方法上使用 #[AsMessageHandler]
属性。你可以在单个类中的任意多个方法上使用该属性,允许你将多个相关消息类型的处理分组。
感谢 自动配置 和 SmsNotification
类型提示,Symfony 知道当 SmsNotification
消息被调度时应该调用此处理器。大多数情况下,这就是你所需要做的全部。但是你也可以 手动配置消息处理器。要查看所有配置的处理器,请运行
1
$ php bin/console debug:messenger
调度消息
你准备好了!要调度消息(并调用处理器),请注入 messenger.default_bus
服务(通过 MessageBusInterface
),就像在控制器中一样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// src/Controller/DefaultController.php
namespace App\Controller;
use App\Message\SmsNotification;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;
class DefaultController extends AbstractController
{
public function index(MessageBusInterface $bus): Response
{
// will cause the SmsNotificationHandler to be called
$bus->dispatch(new SmsNotification('Look! I created a message!'));
// ...
}
}
传输器:异步/队列消息
默认情况下,消息在被调度后立即处理。如果你想异步处理消息,你可以配置一个传输器。传输器能够发送消息(例如,到一个队列系统),然后通过 worker 接收它们。Messenger 支持 多个传输器。
注意
如果你想使用不支持的传输器,请查看 Enqueue 的传输器,它支持 Kafka 和 Google Pub/Sub 等服务。
传输器使用 "DSN" 注册。感谢 Messenger 的 Flex recipe,你的 .env
文件已经有一些示例。
1 2 3
# MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# MESSENGER_TRANSPORT_DSN=doctrine://default
# MESSENGER_TRANSPORT_DSN=redis://127.0.0.1:6379/messages
取消注释你想要的任何传输器(或在 .env.local
中设置它)。有关更多详细信息,请参阅 Messenger:同步 & 队列消息处理。
接下来,在 config/packages/messenger.yaml
中,让我们定义一个名为 async
的传输器,它使用此配置
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
transports:
async: "%env(MESSENGER_TRANSPORT_DSN)%"
# or expanded to configure more options
#async:
# dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
# options: []
将消息路由到传输器
现在你已经配置了一个传输器,你可以配置消息发送到传输器,而不是立即处理消息
1 2 3 4 5 6 7 8 9 10
// src/Message/SmsNotification.php
namespace App\Message;
use Symfony\Component\Messenger\Attribute\AsMessage;
#[AsMessage('async')]
class SmsNotification
{
// ...
}
7.2
#[AsMessage]
属性在 Symfony 7.2 中引入。
因此,App\Message\SmsNotification
将被发送到 async
传输器,并且它的处理器将不会立即被调用。任何在 routing
下不匹配的消息仍然会立即处理,即同步处理。
注意
如果你同时使用 YAML/XML/PHP 配置文件和 PHP 属性配置路由,则配置始终优先于类属性。此行为允许你按环境覆盖路由。
注意
当在单独的 YAML/XML/PHP 文件中配置路由时,你可以使用部分 PHP 命名空间,如 'App\Message\*'
来匹配匹配命名空间内的所有消息。唯一的要求是 '*'
通配符必须放在命名空间的末尾。
你可以使用 '*'
作为消息类。这将充当任何在 routing
下不匹配的消息的默认路由规则。这对于确保默认情况下没有消息被同步处理很有用。
唯一的缺点是 '*'
也将应用于使用 Symfony Mailer 发送的电子邮件(当 Messenger 可用时,它使用 SendEmailMessage
)。如果你的电子邮件不可序列化(例如,如果它们包含作为 PHP 资源/流的文件附件),这可能会导致问题。
你还可以通过它们的父类或接口来路由类。或者将消息发送到多个传输器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// src/Message/SmsNotification.php
namespace App\Message;
use Symfony\Component\Messenger\Attribute\AsMessage;
#[AsMessage(['async', 'audit'])]
class SmsNotification
{
// ...
}
// if you prefer, you can also apply multiple attributes to the message class
#[AsMessage('async')]
#[AsMessage('audit')]
class SmsNotification
{
// ...
}
注意
如果你为子类和父类都配置了路由,则两个规则都将被使用。例如,如果你有一个从 Notification
扩展的 SmsNotification
对象,则 Notification
和 SmsNotification
的路由都将被使用。
提示
你可以在运行时通过在消息的信封上使用 TransportNamesStamp 来定义和覆盖消息正在使用的传输器。此标记将传输器名称数组作为其唯一参数。有关标记的更多信息,请参阅 信封 & 标记。
消息中的 Doctrine 实体
如果你需要在消息中传递 Doctrine 实体,最好传递实体的主键(或处理器实际需要的任何相关信息,例如 email
等),而不是对象(否则你可能会看到与实体管理器相关的错误)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// src/Message/NewUserWelcomeEmail.php
namespace App\Message;
class NewUserWelcomeEmail
{
public function __construct(
private int $userId,
) {
}
public function getUserId(): int
{
return $this->userId;
}
}
然后,在你的处理器中,你可以查询一个新的对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
// src/MessageHandler/NewUserWelcomeEmailHandler.php
namespace App\MessageHandler;
use App\Message\NewUserWelcomeEmail;
use App\Repository\UserRepository;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
class NewUserWelcomeEmailHandler
{
public function __construct(
private UserRepository $userRepository,
) {
}
public function __invoke(NewUserWelcomeEmail $welcomeEmail): void
{
$user = $this->userRepository->find($welcomeEmail->getUserId());
// ... send an email!
}
}
这保证了实体包含最新的数据。
同步处理消息
如果消息不 匹配任何路由规则,它将不会被发送到任何传输器,并将立即处理。在某些情况下(例如当 将处理器绑定到不同的传输器 时),显式地处理它更容易或更灵活:通过创建一个 sync
传输器并将消息“发送”到那里以便立即处理
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
transports:
# ... other transports
sync: 'sync://'
routing:
App\Message\SmsNotification: sync
创建你自己的传输器
如果需要从不支持的东西发送或接收消息,你也可以创建自己的传输器。请参阅 如何创建你自己的 Messenger 传输器。
消费消息(运行 Worker)
一旦你的消息被路由,在大多数情况下,你需要“消费”它们。你可以使用 messenger:consume
命令来完成此操作
1 2 3 4
$ php bin/console messenger:consume async
# use -vv to see details about what's happening
$ php bin/console messenger:consume async -vv
第一个参数是接收器的名称(或者如果你路由到自定义服务,则是服务 ID)。默认情况下,该命令将永远运行:在你的传输器上查找新消息并处理它们。此命令称为你的“worker”。
如果你想从所有可用的接收器消费消息,你可以使用带有 --all
选项的命令
1
$ php bin/console messenger:consume --all
7.1
--all
选项在 Symfony 7.1 中引入。
--keepalive
选项可用于防止消息在长时间运行的处理过程中被过早地重新传递。它将消息标记为“进行中”,并防止在 worker 完成处理之前重新传递消息。
注意
此选项仅适用于受支持的传输器,即 Beanstalkd 和 AmazonSQS 传输器。
7.2
--keepalive
选项在 Symfony 7.2 中引入。
提示
在开发环境中,如果你正在使用 Symfony CLI 工具,你可以配置 worker 与 Web 服务器一起自动运行。你可以在 Symfony CLI Workers 文档中找到更多信息。
提示
要正确停止 worker,请抛出一个 StopWorkerException 的实例。
部署到生产环境
在生产环境中,有一些重要的事情需要考虑
- 使用进程管理器(如 Supervisor 或 systemd)来保持你的 worker 运行
- 你将需要一个或多个“worker”始终运行。为此,请使用进程控制系统,如 Supervisor 或 systemd。
- 不要让 Worker 永远运行
- 某些服务(如 Doctrine 的
EntityManager
)会随着时间的推移消耗更多内存。因此,不要让你的 worker 永远运行,而是使用像messenger:consume --limit=10
这样的标志来告诉你的 worker 只处理 10 条消息后退出(然后进程管理器将创建一个新进程)。还有其他选项,如--memory-limit=128M
和--time-limit=3600
。 - 停止遇到错误的 Worker
- 如果 worker 依赖项(如你的数据库服务器)宕机,或者达到超时,你可以尝试添加 重新连接逻辑,或者如果 worker 收到太多错误,可以使用
messenger:consume
命令的--failure-limit
选项退出 worker。 - 在部署时重启 Worker
- 每次部署时,你都需要重启所有 worker 进程,以便它们看到新部署的代码。为此,请在部署时运行
messenger:stop-workers
。这将向每个 worker 发出信号,表明它应该完成当前正在处理的消息,并应优雅地关闭。然后,进程管理器将创建新的 worker 进程。该命令在内部使用 app 缓存 - 因此请确保将其配置为使用你喜欢的适配器。 - 在部署之间使用相同的缓存
- 如果你的部署策略涉及创建新的目标目录,你应该为 cache.prefix_seed 配置选项设置一个值,以便在部署之间使用相同的缓存命名空间。否则,
cache.app
池将使用kernel.project_dir
参数的值作为命名空间的基础,这将导致每次进行新部署时都使用不同的命名空间。
优先级传输器
有时某些类型的消息应该具有更高的优先级,并在其他消息之前处理。为了实现这一点,你可以创建多个传输器并将不同的消息路由到它们。例如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
# queue_name is specific to the doctrine transport
queue_name: high
# for AMQP send to a separate exchange then queue
#exchange:
# name: high
#queues:
# messages_high: ~
# for redis try "group"
async_priority_low:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
queue_name: low
routing:
'App\Message\SmsNotification': async_priority_low
'App\Message\NewUserWelcomeEmail': async_priority_high
然后,你可以为每个传输器运行单独的 worker,或者指示一个 worker 以优先级顺序处理消息
1
$ php bin/console messenger:consume async_priority_high async_priority_low
worker 将始终首先查找在 async_priority_high
上等待的消息。如果没有,然后它将从 async_priority_low
消费消息。
限制消费特定队列
某些传输器(特别是 AMQP)具有交换机和队列的概念。Symfony 传输器始终绑定到一个交换机。默认情况下,worker 从附加到指定传输器的交换机的所有队列中消费。但是,在某些用例中,你可能希望 worker 仅从特定队列中消费。
你可以限制 worker 仅处理来自特定队列的消息
1 2 3 4
$ php bin/console messenger:consume my_transport --queues=fasttrack
# you can pass the --queues option more than once to process multiple queues
$ php bin/console messenger:consume my_transport --queues=fasttrack1 --queues=fasttrack2
注意
为了允许使用 queues
选项,接收器必须实现 QueueReceiverInterface。
检查每个传输器的队列消息数量
运行 messenger:stats
命令以了解某些或所有传输器的“队列”中有多少消息
1 2 3 4 5 6 7 8 9
# displays the number of queued messages in all transports
$ php bin/console messenger:stats
# shows stats only for some transports
$ php bin/console messenger:stats my_transport_name other_transport_name
# you can also output the stats in JSON format
$ php bin/console messenger:stats --format=json
$ php bin/console messenger:stats my_transport_name other_transport_name --format=json
7.2
format
选项在 Symfony 7.2 中引入。
注意
为了使此命令工作,配置的传输器的接收器必须实现 MessageCountAwareInterface。
Supervisor 配置
Supervisor 是一个很棒的工具,可以保证你的 worker 进程始终运行(即使它由于故障、达到消息限制或感谢 messenger:stop-workers
而关闭)。例如,你可以通过以下方式在 Ubuntu 上安装它
1
$ sudo apt-get install supervisor
Supervisor 配置文件通常位于 /etc/supervisor/conf.d
目录中。例如,你可以在那里创建一个新的 messenger-worker.conf
文件,以确保始终运行 2 个 messenger:consume
实例
1 2 3 4 5 6 7 8 9 10
;/etc/supervisor/conf.d/messenger-worker.conf
[program:messenger-consume]
command=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
user=ubuntu
numprocs=2
startsecs=0
autostart=true
autorestart=true
startretries=10
process_name=%(program_name)s_%(process_num)02d
更改 async
参数以使用你的传输器(或多个传输器)的名称,并将 user
更改为你的服务器上的 Unix 用户。
警告
在部署期间,某些服务可能不可用(例如,数据库),导致消费者启动失败。在这种情况下,Supervisor 将尝试 startretries
次数来重启命令。请务必更改此设置,以避免命令进入 FATAL 状态,该状态将永远不会再次重启。
每次重启,Supervisor 都会将延迟增加 1 秒。例如,如果该值是 10
,它将等待 1 秒、2 秒、3 秒,依此类推。这为服务提供了总共 55 秒的时间再次变为可用状态。增加 startretries
设置以覆盖最大预期的停机时间。
如果您使用 Redis Transport,请注意每个 worker 都需要一个唯一的消费者名称,以避免同一消息被多个 worker 处理。实现此目的的一种方法是在 Supervisor 配置文件中设置一个环境变量,然后您可以在 messenger.yaml
中引用它(请参阅下面的 Redis 部分)。
1
environment=MESSENGER_CONSUMER_NAME=%(program_name)s_%(process_num)02d
接下来,告诉 Supervisor 读取您的配置并启动您的 workers。
1 2 3 4 5 6 7 8 9
$ sudo supervisorctl reread
$ sudo supervisorctl update
$ sudo supervisorctl start messenger-consume:*
# If you deploy an update of your code, don't forget to restart your workers
# to run the new code
$ sudo supervisorctl restart messenger-consume:*
有关更多详细信息,请参阅 Supervisor 文档。
优雅关机
如果您在项目中安装了 PCNTL PHP 扩展,workers 将处理 SIGTERM
或 SIGINT
POSIX 信号,以在终止之前完成当前消息的处理。
但是,您可能更喜欢使用不同的 POSIX 信号进行优雅关机。您可以通过设置 framework.messenger.stop_worker_on_signals
配置选项来覆盖默认信号。
在某些情况下,SIGTERM
信号由 Supervisor 本身发送(例如,停止将 Supervisor 作为入口点的 Docker 容器)。在这些情况下,您需要在程序配置中添加一个 stopwaitsecs
键(值为所需的优雅期限,以秒为单位),以便执行优雅关机。
1 2
[program:x]
stopwaitsecs=20
Systemd 配置
虽然 Supervisor 是一个很棒的工具,但它的缺点是您需要系统访问权限才能运行它。Systemd 已成为大多数 Linux 发行版上的标准,并且有一个很好的替代方案,称为用户服务。
Systemd 用户服务配置文件通常位于 ~/.config/systemd/user
目录中。例如,您可以创建一个新的 messenger-worker.service
文件。或者,如果您想同时运行更多实例,可以创建一个 messenger-worker@.service
文件。
1 2 3 4 5 6 7 8 9 10 11 12
[Unit]
Description=Symfony messenger-consume %i
[Service]
ExecStart=php /path/to/your/app/bin/console messenger:consume async --time-limit=3600
# for Redis, set a custom consumer name for each instance
Environment="MESSENGER_CONSUMER_NAME=symfony-%n-%i"
Restart=always
RestartSec=30
[Install]
WantedBy=default.target
现在,告诉 systemd 启用并启动一个 worker。
1 2 3 4 5 6
$ systemctl --user enable messenger-worker@1.service
$ systemctl --user start messenger-worker@1.service
# to enable and start 20 workers
$ systemctl --user enable messenger-worker@{1..20}.service
$ systemctl --user start messenger-worker@{1..20}.service
如果您更改了服务配置文件,则需要重新加载守护程序。
1
$ systemctl --user daemon-reload
要重启所有消费者,请执行以下操作:
1
$ systemctl --user restart messenger-consume@*.service
systemd 用户实例仅在特定用户的首次登录后启动。消费者通常需要在系统启动时启动。在用户上启用持久性以激活该行为。
1
$ loginctl enable-linger <your-username>
日志由 journald 管理,可以使用 journalctl 命令进行处理。
1 2 3 4 5 6 7 8
# follow logs of consumer nr 11
$ journalctl -f --user-unit messenger-consume@11.service
# follow logs of all consumers
$ journalctl -f --user-unit messenger-consume@*
# follow all logs from your user services
$ journalctl -f _UID=$UID
有关更多详细信息,请参阅 systemd 文档。
注意
您要么需要 journalctl
命令的提升权限,要么将您的用户添加到 systemd-journal 组。
1
$ sudo usermod -a -G systemd-journal <your-username>
无状态 Worker
PHP 被设计为无状态的,不同的请求之间没有共享资源。在 HTTP 上下文中,PHP 在发送响应后会清理所有内容,因此您可以决定不处理可能泄漏内存的服务。
另一方面,worker 通常会在长时间运行的 CLI 进程中按顺序处理消息,这些进程在处理单条消息后不会结束。请注意服务状态以防止信息和/或内存泄漏,因为 Symfony 会在所有消息中注入服务的相同实例,从而保留服务的内部状态。
但是,某些 Symfony 服务(例如 Monolog fingers crossed handler)在设计上会泄漏。Symfony 提供了服务重置功能来解决此问题。当在两条消息之间自动重置容器时,Symfony 会查找任何实现 ResetInterface 的服务(包括您自己的服务),并调用它们的 reset()
方法,以便它们可以清理其内部状态。
如果服务不是无状态的,并且您希望在每条消息之后重置其属性,则该服务必须实现 ResetInterface,您可以在 reset()
方法中重置属性。
如果您不想重置容器,请在运行 messenger:consume
命令时添加 --no-reset
选项。
速率限制传输器
有时您可能需要限制消息 worker 的速率。您可以在传输上配置速率限制器(需要 RateLimiter 组件),方法是设置其 rate_limiter
选项。
1 2 3 4 5 6
# config/packages/messenger.yaml
framework:
messenger:
transports:
async:
rate_limiter: your_rate_limiter_name
警告
当在传输上配置速率限制器时,当达到限制时,它将阻止整个 worker。您应该确保为速率受限的传输配置专用的 worker,以避免其他传输被阻止。
重试 & 失败
如果在从传输消费消息时抛出异常,它将自动重新发送到传输以再次尝试。默认情况下,消息将在被丢弃或 发送到失败传输 之前重试 3 次。每次重试也会延迟,以防故障是由于临时问题引起的。所有这些都可以在每个传输中配置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
# default configuration
retry_strategy:
max_retries: 3
# milliseconds delay
delay: 1000
# causes the delay to be higher before each retry
# e.g. 1 second delay, 2 seconds, 4 seconds
multiplier: 2
max_delay: 0
# applies randomness to the delay that can prevent the thundering herd effect
# the value (between 0 and 1.0) is the percentage of 'delay' that will be added/subtracted
jitter: 0.1
# override all of this with a service that
# implements Symfony\Component\Messenger\Retry\RetryStrategyInterface
# service: null
7.1
jitter
选项在 Symfony 7.1 中引入。
提示
当消息被重试时,Symfony 会触发 WorkerMessageRetriedEvent,以便您可以运行自己的逻辑。
注意
感谢 SerializedMessageStamp,消息的序列化形式被保存,这可以防止稍后重试消息时再次序列化它。
避免重试
有时,处理消息可能会以您知道是永久性的方式失败,并且不应重试。如果您抛出 UnrecoverableMessageHandlingException,则该消息将不会被重试。
注意
不会重试的消息仍会显示在配置的失败传输中。如果您想避免这种情况,请考虑自行处理错误并让处理程序成功结束。
强制重试
有时,处理消息必须以您知道是临时性的方式失败,并且必须重试。如果您抛出 RecoverableMessageHandlingException,则消息将始终无限期地重试,并且 max_retries
设置将被忽略。
您可以通过在 RecoverableMessageHandlingException
的构造函数中设置 retryDelay
参数来定义自定义重试延迟(例如,使用 HTTP 响应中的 Retry-After
标头中的值)。
7.2
retryDelay
参数和 getRetryDelay()
方法在 Symfony 7.2 中引入。
保存 & 重试失败的消息
如果消息失败,它将重试多次 (max_retries
),然后将被丢弃。为避免这种情况发生,您可以配置 failure_transport
。
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
# after retrying, messages will be sent to the "failed" transport
failure_transport: failed
transports:
# ... other transports
failed: 'doctrine://default?queue_name=failed'
在此示例中,如果处理消息失败 3 次(默认 max_retries
),则会将其发送到 failed
传输。虽然您可以像使用普通传输一样使用 messenger:consume failed
来消费此传输,但您通常希望手动查看失败传输中的消息,并选择重试它们。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
# see all messages in the failure transport with a default limit of 50
$ php bin/console messenger:failed:show
# see the 10 first messages
$ php bin/console messenger:failed:show --max=10
# see only MyClass messages
$ php bin/console messenger:failed:show --class-filter='MyClass'
# see the number of messages by message class
$ php bin/console messenger:failed:show --stats
# see details about a specific failure
$ php bin/console messenger:failed:show 20 -vv
# for each message, this command asks whether to retry, skip, or delete
$ php bin/console messenger:failed:retry -vv
# retry specific messages
$ php bin/console messenger:failed:retry 20 30 --force
# remove a message without retrying it
$ php bin/console messenger:failed:remove 20
# remove messages without retrying them and show each message before removing it
$ php bin/console messenger:failed:remove 20 30 --show-messages
# remove all messages in the failure transport
$ php bin/console messenger:failed:remove --all
如果消息再次失败,由于正常的 重试规则,它将重新发送回失败传输。一旦达到最大重试次数,该消息将被永久丢弃。
7.2
在 messenger:failed:retry
命令中跳过消息的选项在 Symfony 7.2 中引入。
多个失败的传输器
有时,配置单个全局 failed transport
是不够的,因为某些消息比其他消息更重要。在这些情况下,您可以仅为特定传输覆盖失败传输。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# config/packages/messenger.yaml
framework:
messenger:
# after retrying, messages will be sent to the "failed" transport
# by default if no "failed_transport" is configured inside a transport
failure_transport: failed_default
transports:
async_priority_high:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
failure_transport: failed_high_priority
# since no failed transport is configured, the one used will be
# the global "failure_transport" set
async_priority_low:
dsn: 'doctrine://default?queue_name=async_priority_low'
failed_default: 'doctrine://default?queue_name=failed_default'
failed_high_priority: 'doctrine://default?queue_name=failed_high_priority'
如果未全局或在传输级别定义 failure_transport
,则消息将在重试次数后被丢弃。
失败命令有一个可选选项 --transport
,用于指定在传输级别配置的 failure_transport
。
1 2 3 4 5 6 7 8
# see all messages in "failure_transport" transport
$ php bin/console messenger:failed:show --transport=failure_transport
# retry specific messages from "failure_transport"
$ php bin/console messenger:failed:retry 20 30 --transport=failure_transport --force
# remove a message without retrying it from "failure_transport"
$ php bin/console messenger:failed:remove 20 --transport=failure_transport
传输器配置
Messenger 支持多种不同的传输类型,每种类型都有自己的选项。选项可以通过 DSN 字符串或配置传递到传输。
1 2
# .env
MESSENGER_TRANSPORT_DSN=amqp://127.0.0.1/%2f/messages?auto_setup=false
1 2 3 4 5 6 7 8
# config/packages/messenger.yaml
framework:
messenger:
transports:
my_transport:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
auto_setup: false
在 options
下定义的选项优先于 DSN 中定义的选项。
AMQP 传输器
AMQP 传输使用 AMQP PHP 扩展将消息发送到 RabbitMQ 等队列。通过运行以下命令安装它:
1
$ composer require symfony/amqp-messenger
AMQP 传输 DSN 可能如下所示:
1 2 3 4 5
# .env
MESSENGER_TRANSPORT_DSN=amqp://guest:guest@localhost:5672/%2f/messages
# or use the AMQPS protocol
MESSENGER_TRANSPORT_DSN=amqps://guest:guest@localhost/%2f/messages
如果您想使用 TLS/SSL 加密的 AMQP,您还必须提供 CA 证书。在 amqp.cacert
PHP.ini 设置(例如 amqp.cacert = /etc/ssl/certs
)或 DSN 的 cacert
参数(例如 amqps://127.0.0.1?cacert=/etc/ssl/certs/
)中定义证书路径。
TLS/SSL 加密的 AMQP 使用的默认端口是 5671,但您可以在 DSN 的 port
参数中覆盖它(例如 amqps://127.0.0.1?cacert=/etc/ssl/certs/&port=12345
)。
注意
默认情况下,传输将自动创建所需的任何交换机、队列和绑定键。这可以禁用,但某些功能可能无法正常工作(例如延迟队列)。要不自动创建任何队列,您可以配置一个 queues: []
的传输。
注意
您可以限制 AMQP 传输的消费者仅处理来自交换机的某些队列的消息。请参阅 Messenger:同步和排队消息处理。
传输有许多其他选项,包括配置交换机、队列绑定键等的方法。请参阅 Connection 上的文档。
传输有许多选项:
auto_setup
(默认值:true
)- 是否应在发送/获取期间自动创建交换机和队列。
cacert
- PEM 格式的 CA 证书文件的路径。
cert
- PEM 格式的客户端证书的路径。
channel_max
- 指定服务器允许的最高通道号。0 表示标准扩展限制。
confirm_timeout
- 确认超时时间(以秒为单位);如果未指定,传输将不等待消息确认。注意:0 秒或更长时间。可以是小数。
connect_timeout
- 连接超时时间。注意:0 秒或更长时间。可以是小数。
frame_max
- 服务器为连接提出的最大帧大小,包括帧头和结束字节。0 表示标准扩展限制(取决于 librabbimq 默认帧大小限制)。
heartbeat
- 服务器想要的连接心跳延迟(以秒为单位)。0 表示服务器不需要心跳。注意,librabbitmq 的心跳支持有限,这意味着仅在阻塞调用期间检查心跳。
host
- AMQP 服务的主机名。
key
- PEM 格式的客户端密钥的路径。
login
- 用于连接 AMQP 服务的用户名。
password
- 用于连接 AMQP 服务的密码。
persistent
(默认值:'false'
)- 连接是否持久。
port
- AMQP 服务的端口。
read_timeout
- 收入活动的超时时间。注意:0 秒或更长时间。可以是小数。
retry
- (无描述可用)
sasl_method
- (无描述可用)
connection_name
- 用于自定义连接名称(至少需要 PHP AMQP 扩展的 1.10 版本)。
verify
- 启用或禁用对等验证。如果启用对等验证,则服务器证书中的公用名称必须与服务器名称匹配。默认情况下启用对等验证。
vhost
- 要与 AMQP 服务一起使用的虚拟主机。
write_timeout
- 结果活动的超时时间。注意:0 秒或更长时间。可以是小数。
delay[queue_name_pattern]
(默认值:delay_%exchange_name%_%routing_key%_%delay%
)- 用于创建队列的模式。
delay[exchange_name]
(默认值:delays
)- 用于延迟/重试消息的交换机的名称。
queues[name][arguments]
- 额外的参数。
queues[name][binding_arguments]
- 绑定队列时要使用的参数。
queues[name][binding_keys]
- 要绑定到此队列的绑定键(如果有)。
queues[name][flags]
(默认值:AMQP_DURABLE
)- 队列标志。
exchange[arguments]
- 交换机的额外参数(例如
alternate-exchange
)。 exchange[default_publish_routing_key]
- 发布时要使用的路由键,如果在消息中未指定。
exchange[flags]
(默认值:AMQP_DURABLE
)- 交换机标志。
exchange[name]
- 交换机的名称。
exchange[type]
(默认值:fanout
)- 交换机的类型。
您还可以通过将 AmqpStamp 添加到您的 Envelope 来配置消息的 AMQP 特定设置。
1 2 3 4 5 6 7
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp;
// ...
$attributes = [];
$bus->dispatch(new SmsNotification(), [
new AmqpStamp('custom-routing-key', AMQP_NOPARAM, $attributes),
]);
警告
消费者不会显示在管理面板中,因为此传输不依赖于阻塞的 \AmqpQueue::consume()
。拥有阻塞接收器会使 messenger:consume
命令的 --time-limit/--memory-limit
选项以及 messenger:stop-workers
命令效率低下,因为它们都依赖于接收器立即返回的事实,无论它是否找到消息。消费 worker 负责迭代,直到它收到要处理的消息和/或直到达到其中一个停止条件。因此,如果 worker 卡在阻塞调用中,则无法达到 worker 的停止逻辑。
Doctrine 传输器
Doctrine 传输可用于将消息存储在数据库表中。通过运行以下命令安装它:
1
$ composer require symfony/doctrine-messenger
Doctrine 传输 DSN 可能如下所示:
1 2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default
格式为 doctrine://<connection_name>
,如果您有多个连接并想使用除“default”以外的连接。传输将自动创建一个名为 messenger_messages
的表。
如果您想更改默认表名,请使用 table_name
选项在 DSN 中传递自定义表名。
1 2
# .env
MESSENGER_TRANSPORT_DSN=doctrine://default?table_name=your_custom_table_name
或者,要自己创建表,请将 auto_setup
选项设置为 false
,并 生成迁移。
传输有许多选项:
table_name
(默认值:messenger_messages
)- 表的名称。
queue_name
(默认值:default
)- 队列的名称(表中的一列,用于为一个表使用多个传输)。
redeliver_timeout
(默认值:3600
)-
重试队列中处于“处理”状态的消息之前的超时时间(如果 worker 由于某种原因停止,则会发生这种情况,最终您应该重试该消息) - 以秒为单位。
注意
将
redeliver_timeout
设置为大于您最慢消息持续时间的值。否则,某些消息将在第一个消息仍在处理时第二次启动。 auto_setup
- 是否应在发送/获取期间自动创建表。
当使用 PostgreSQL 时,您可以访问以下选项来利用 LISTEN/NOTIFY 功能。这允许比 Doctrine 传输的默认轮询行为更有效的方法,因为 PostgreSQL 会在新消息插入表时直接通知 worker。
use_notify
(默认值:true
)- 是否使用 LISTEN/NOTIFY。
check_delayed_interval
(默认值:60000
)- 检查延迟消息的间隔,以毫秒为单位。设置为 0 以禁用检查。
get_notify_timeout
(默认值:0
)- 调用
PDO::pgsqlGetNotify
时等待响应的时间长度,以毫秒为单位。
Beanstalkd 传输器
Beanstalkd 传输将消息直接发送到 Beanstalkd 工作队列。通过运行以下命令安装它:
1
$ composer require symfony/beanstalkd-messenger
Beanstalkd 传输 DSN 可能如下所示:
1 2 3 4 5
# .env
MESSENGER_TRANSPORT_DSN=beanstalkd://127.0.0.1:11300?tube_name=foo&timeout=4&ttr=120
# If no port, it will default to 11300
MESSENGER_TRANSPORT_DSN=beanstalkd://127.0.0.1
传输有许多选项:
tube_name
(默认值:default
)- 队列的名称。
timeout
(默认值:0
)- 消息保留超时时间 - 以秒为单位。0 将导致服务器立即返回响应或抛出 TransportException。
ttr
(默认值:90
)- 消息在放回就绪队列之前运行的时间 - 以秒为单位。
7.2
在 Symfony 7.2 中添加了使用 --keepalive
选项的 Keepalive 支持。
Redis 传输器
Redis 传输使用 streams 来排队消息。此传输需要 Redis PHP 扩展(>=4.3)和正在运行的 Redis 服务器(^5.0)。通过运行以下命令安装它:
1
$ composer require symfony/redis-messenger
Redis 传输 DSN 可能如下所示:
1 2 3 4 5 6 7 8 9 10 11 12
# .env
MESSENGER_TRANSPORT_DSN=redis://127.0.0.1:6379/messages
# Full DSN Example
MESSENGER_TRANSPORT_DSN=redis://password@localhost:6379/messages/symfony/consumer?auto_setup=true&serializer=1&stream_max_entries=0&dbindex=0
# Redis Cluster Example
MESSENGER_TRANSPORT_DSN=redis://host-01:6379,redis://host-02:6379,redis://host-03:6379,redis://host-04:6379
# Unix Socket Example
MESSENGER_TRANSPORT_DSN=redis:///var/run/redis.sock
# TLS Example
MESSENGER_TRANSPORT_DSN=rediss://127.0.0.1:6379/messages
# Multiple Redis Sentinel Hosts Example
MESSENGER_TRANSPORT_DSN=redis:?host[redis1:26379]&host[redis2:26379]&host[redis3:26379]&sentinel_master=db
许多选项可以通过 DSN 或通过 messenger.yaml
中传输下的 options
键进行配置。
stream
(默认值:messages
)- Redis 流名称。
group
(默认值:symfony
)- Redis 消费者组名称。
consumer
(默认值:consumer
)- 在 Redis 中使用的消费者名称。
auto_setup
(默认值:true
)- 是否自动创建 Redis 组。
auth
- Redis 密码。
delete_after_ack
(默认值:true
)- 如果为
true
,则在处理消息后自动删除消息。 delete_after_reject
(默认值:true
)- 如果为
true
,则在拒绝消息后自动删除消息。 lazy
(默认值:false
)- 仅在真正需要连接时才连接。
serializer
(默认值:Redis::SERIALIZER_PHP
)- 如何在 Redis 中序列化最终负载(
Redis::OPT_SERIALIZER
选项)。 stream_max_entries
(默认值:0
)- 流将被修剪到的最大条目数。将其设置为足够大的数字,以避免丢失待处理的消息。
redeliver_timeout
(默认值:3600
)- 重试由废弃的消费者拥有的待处理消息之前的超时时间(以秒为单位)(如果 worker 由于某种原因死亡,则会发生这种情况,最终您应该重试该消息)。
claim_interval
(默认值:60000
)- 应检查待处理/废弃消息以声明的间隔 - 以毫秒为单位。
persistent_id
(默认值:null
)- 字符串,如果为空,则连接是非持久的。
retry_interval
(默认值:0
)- 整数,以毫秒为单位的值。
read_timeout
(默认值:0
)- 浮点数,以秒为单位的值,默认值表示无限。
timeout
(默认值:0
)- 连接超时时间。浮点数,以秒为单位的值,默认值表示无限。
sentinel_master
(默认值:null
)- 字符串,如果为空或空,则禁用 Sentinel 支持。
redis_sentinel
(默认值:null
)-
sentinel_master
选项的别名。7.1
redis_sentinel
选项在 Symfony 7.1 中引入。 ssl
(默认值:null
)-
SSL 上下文选项的映射,用于 TLS 通道。例如,这对于更改测试中 TLS 通道的要求很有用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# config/packages/test/messenger.yaml framework: messenger: transports: redis: dsn: "rediss://127.0.0.1" options: ssl: allow_self_signed: true capture_peer_cert: true capture_peer_cert_chain: true disable_compression: true SNI_enabled: true verify_peer: true verify_peer_name: true
警告
不应有多个 messenger:consume
命令以相同的 stream
、group
和 consumer
组合运行,否则消息最终可能会被多次处理。如果您运行多个队列 worker,consumer
可以设置为环境变量,例如 %env(MESSENGER_CONSUMER_NAME)%
,由 Supervisor(如下例)或用于管理 worker 进程的任何其他服务设置。在容器环境中,HOSTNAME
可以用作消费者名称,因为每个容器/主机只有一个 worker。如果使用 Kubernetes 来编排容器,请考虑使用 StatefulSet
以获得稳定的名称。
提示
将 delete_after_ack
设置为 true
(如果您使用单个组)或定义 stream_max_entries
(如果您可以估计在您的情况下可以接受的最大条目数)以避免内存泄漏。否则,所有消息将永远保留在 Redis 中。
内存传输器
in-memory
传输实际上不传递消息。相反,它在请求期间将它们保存在内存中,这对于测试很有用。例如,如果您有 async_priority_normal
传输,您可以在 test
环境中覆盖它以使用此传输。
1 2 3 4 5
# config/packages/test/messenger.yaml
framework:
messenger:
transports:
async_priority_normal: 'in-memory://'
然后,在测试时,消息不会传递到真正的传输。更好的是,在测试中,您可以检查是否在请求期间准确发送了一条消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// tests/Controller/DefaultControllerTest.php
namespace App\Tests\Controller;
use Symfony\Bundle\FrameworkBundle\Test\WebTestCase;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
class DefaultControllerTest extends WebTestCase
{
public function testSomething(): void
{
$client = static::createClient();
// ...
$this->assertSame(200, $client->getResponse()->getStatusCode());
/** @var InMemoryTransport $transport */
$transport = $this->getContainer()->get('messenger.transport.async_priority_normal');
$this->assertCount(1, $transport->getSent());
}
}
传输有许多选项:
serialize
(布尔值, 默认值:false
)- 是否序列化消息。这对于测试额外的层非常有用,尤其是在您使用自己的消息序列化器时。
注意
所有 in-memory
传输将在扩展 KernelTestCase 或 WebTestCase 的测试类中的每次测试后自动重置。
Amazon SQS
Amazon SQS 传输非常适合托管在 AWS 上的应用程序。通过运行以下命令安装它:
1
$ composer require symfony/amazon-sqs-messenger
SQS 传输 DSN 可能如下所示:
1 2 3
# .env
MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/123456789012/messages?access_key=AKIAIOSFODNN7EXAMPLE&secret_key=j17M97ffSVoKI0briFoo9a
MESSENGER_TRANSPORT_DSN=sqs://127.0.0.1:9494/messages?sslmode=disable
注意
传输将自动创建所需的队列。可以通过将 auto_setup
选项设置为 false
来禁用此功能。
提示
在发送或接收消息之前,Symfony 需要通过调用 AWS 中的 GetQueueUrl
API 将队列名称转换为 AWS 队列 URL。可以通过提供作为队列 URL 的 DSN 来避免此额外的 API 调用。
传输有许多选项:
access_key
- AWS 访问密钥(必须进行 URL 编码)。
account
(默认值: 凭证的所有者)- AWS 账户的标识符。
auto_setup
(默认值:true
)- 是否应在发送/获取期间自动创建队列。
buffer_size
(默认值:9
)- 要预取的消息数。
debug
(默认值:false
)- 如果为
true
,它会记录所有 HTTP 请求和响应(这会影响性能)。 endpoint
(默认值:https://sqs.eu-west-1.amazonaws.com
)- SQS 服务的绝对 URL。
poll_timeout
(默认值:0.1
)- 等待新消息持续时间(以秒为单位)。
queue_name
(默认值:messages
)- 队列的名称。
region
(默认值:eu-west-1
)- AWS 区域的名称。
secret_key
- AWS 密钥(必须进行 URL 编码)。
session_token
- AWS 会话令牌。
visibility_timeout
(默认值: 队列的配置)- 消息将不可见的秒数(可见性超时)。
wait_time
(默认值:20
)- 长轮询持续时间(以秒为单位)。
注意
wait_time
参数定义了 Amazon SQS 在发送响应之前应等待队列中消息可用的最长时间。它通过消除空响应的数量来帮助降低使用 Amazon SQS 的成本。
poll_timeout
参数定义了接收器在返回 null 之前应等待的持续时间。它避免了阻止调用其他接收器。
注意
如果队列名称以 .fifo
结尾,AWS 将创建一个 FIFO 队列。使用 stamp AmazonSqsFifoStamp 来定义 Message group ID
和 Message deduplication ID
。
另一种可能性是启用 AddFifoStampMiddleware。如果您的消息实现了 MessageDeduplicationAwareInterface,则中间件将自动添加 AmazonSqsFifoStamp 并设置 Message deduplication ID
。此外,如果您的消息实现了 MessageGroupAwareInterface,则中间件将自动设置 stamp 的 Message group ID
。
您可以在 专用部分中了解有关中间件的更多信息。
FIFO 队列不支持为每条消息设置延迟,重试策略设置中需要 delay: 0
的值。
7.2
在 Symfony 7.2 中添加了使用 `--keepalive` 选项的 Keepalive 支持。
序列化消息
当消息发送到传输(并从传输接收)时,它们会使用 PHP 的原生 serialize()
和 unserialize()
函数进行序列化。您可以全局更改此设置(或为每个传输更改此设置)为实现 SerializerInterface 的服务。
1 2 3 4 5 6 7 8 9 10 11 12 13
# config/packages/messenger.yaml
framework:
messenger:
serializer:
default_serializer: messenger.transport.symfony_serializer
symfony_serializer:
format: json
context: { }
transports:
async_priority_normal:
dsn: # ...
serializer: messenger.transport.symfony_serializer
messenger.transport.symfony_serializer
是一个内置服务,它使用 Serializer 组件,并且可以通过几种方式进行配置。如果您确实选择使用 Symfony serializer,您可以通过 SerializerStamp 在具体情况下控制上下文(请参阅 信封 & 标记)。
提示
当向/从另一个应用程序发送/接收消息时,您可能需要更多地控制序列化过程。使用自定义序列化器可以提供这种控制。有关详细信息,请参阅 SymfonyCasts 的消息序列化器教程。
运行命令和外部进程
触发命令
可以通过分发 RunCommandMessage 来触发任何命令。Symfony 将负责处理此消息并执行传递给消息参数的命令。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use Symfony\Component\Console\Messenger\RunCommandMessage;
use Symfony\Component\Messenger\MessageBusInterface;
class CleanUpService
{
public function __construct(private readonly MessageBusInterface $bus)
{
}
public function cleanUp(): void
{
// Long task with some caching...
// Once finished, dispatch some clean up commands
$this->bus->dispatch(new RunCommandMessage('app:my-cache:clean-up --dir=var/temp'));
$this->bus->dispatch(new RunCommandMessage('cache:clear'));
}
}
您可以配置在命令执行期间出现错误时的行为。为此,您可以在创建 RunCommandMessage 实例时使用 throwOnFailure
和 catchExceptions
参数。
处理完成后,处理程序将返回一个 RunCommandContext,其中包含许多有用的信息,例如退出代码或进程的输出。您可以参考 处理程序结果 上的专用页面以获取更多信息。
触发外部进程
Messenger 提供了一个方便的助手,通过分发消息来运行外部进程。这利用了 Process 组件 的优势。通过分发 RunProcessMessage,Messenger 将负责使用您传递的参数创建一个新进程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Process\Messenger\RunProcessMessage;
class CleanUpService
{
public function __construct(private readonly MessageBusInterface $bus)
{
}
public function cleanUp(): void
{
$this->bus->dispatch(new RunProcessMessage(['rm', '-rf', 'var/log/temp/*'], cwd: '/my/custom/working-dir'));
// ...
}
}
处理完成后,处理程序将返回一个 RunProcessContext,其中包含许多有用的信息,例如退出代码或进程的输出。您可以参考 处理程序结果 上的专用页面以获取更多信息。
Ping Web 服务
有时,您可能需要定期 ping Web 服务以获取其状态,例如,它是否启动或关闭。可以通过分发 PingWebhookMessage 来实现这一点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
use Symfony\Component\HttpClient\Messenger\PingWebhookMessage;
use Symfony\Component\Messenger\MessageBusInterface;
class LivenessService
{
public function __construct(private readonly MessageBusInterface $bus)
{
}
public function ping(): void
{
// An HttpExceptionInterface is thrown on 3xx/4xx/5xx
$this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status'));
// Ping, but does not throw on 3xx/4xx/5xx
$this->bus->dispatch(new PingWebhookMessage('GET', 'https://example.com/status', throw: false));
// Any valid HttpClientInterface option can be used
$this->bus->dispatch(new PingWebhookMessage('POST', 'https://example.com/status', [
'headers' => [
'Authorization' => 'Bearer ...'
],
'json' => [
'data' => 'some-data',
],
]));
}
}
处理程序将返回一个 ResponseInterface,允许您收集和处理 HTTP 请求返回的信息。
从处理器获取结果
当消息被处理时,HandleMessageMiddleware 为处理消息的每个对象添加一个 HandledStamp。您可以使用它来获取处理程序返回的值。
1 2 3 4 5 6 7 8 9 10 11
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\HandledStamp;
$envelope = $messageBus->dispatch(new SomeMessage());
// get the value that was returned by the last message handler
$handledStamp = $envelope->last(HandledStamp::class);
$handledStamp->getResult();
// or get info about all of handlers
$handledStamps = $envelope->all(HandledStamp::class);
在使用命令和查询总线时获取结果
Messenger 组件可以用于 CQRS 架构中,其中命令和查询总线是应用程序的核心部分。阅读 Martin Fowler 关于 CQRS 的 文章 以了解更多信息,并参阅 如何配置多个总线。
由于查询通常是同步的并且期望被处理一次,因此从处理程序获取结果是一个常见的需求。
存在一个 HandleTrait,用于在同步处理时获取处理程序的结果。它还确保只注册一个处理程序。HandleTrait
可以用于任何具有 $messageBus
属性的类中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
// src/Action/ListItems.php
namespace App\Action;
use App\Message\ListItemsQuery;
use App\MessageHandler\ListItemsQueryResult;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;
class ListItems
{
use HandleTrait;
public function __construct(
private MessageBusInterface $messageBus,
) {
}
public function __invoke(): void
{
$result = $this->query(new ListItemsQuery(/* ... */));
// Do something with the result
// ...
}
// Creating such a method is optional, but allows type-hinting the result
private function query(ListItemsQuery $query): ListItemsQueryResult
{
return $this->handle($query);
}
}
因此,您可以使用该 trait 来创建命令和查询总线类。例如,您可以创建一个特殊的 QueryBus
类,并在任何需要查询总线行为的地方注入它,而不是 MessageBusInterface
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
// src/MessageBus/QueryBus.php
namespace App\MessageBus;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;
class QueryBus
{
use HandleTrait;
public function __construct(
private MessageBusInterface $messageBus,
) {
}
/**
* @param object|Envelope $query
*
* @return mixed The handler returned value
*/
public function query($query): mixed
{
return $this->handle($query);
}
}
自定义处理器
手动配置处理器
Symfony 通常会 自动查找并注册您的处理程序。但是,您也可以在使用 #AsMessageHandler
属性或使用 messenger.message_handler
标记处理程序服务时手动配置处理程序 - 并传递一些额外的配置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler(fromTransport: 'async', priority: 10)]
class SmsNotificationHandler
{
public function __invoke(SmsNotification $message): void
{
// ...
}
}
可以使用标签配置的可能选项是:
bus
- 总线的名称,处理程序可以从该总线接收消息,默认为所有总线。
from_transport
- 传输的名称,处理程序可以从该传输接收消息,默认为所有传输。
handles
- 可以由处理程序处理的消息类型 (FQCN),只有在无法通过类型提示猜测时才需要。
method
- 将处理消息的方法的名称。
priority
- 当多个处理程序可以处理同一消息时,处理程序的优先级。
处理多个消息
单个处理程序类可以处理多个消息。为此,请将 #AsMessageHandler
属性添加到所有处理方法中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\OtherSmsNotification;
use App\Message\SmsNotification;
class SmsNotificationHandler
{
#[AsMessageHandler]
public function handleSmsNotification(SmsNotification $message): void
{
// ...
}
#[AsMessageHandler]
public function handleOtherSmsNotification(OtherSmsNotification $message): void
{
// ...
}
}
事务性消息:在处理完成后处理新消息
消息处理程序可以在处理其他消息时 dispatch
新消息,可以分发到同一个总线或不同的总线(如果应用程序有 多个总线)。在此过程中发生的任何错误或异常都可能产生意想不到的后果,例如:
- 如果使用
DoctrineTransactionMiddleware
并且分发的消息抛出异常,则原始处理程序中的任何数据库事务都将被回滚。 - 如果消息被分发到不同的总线,即使当前处理程序稍后的某些代码抛出异常,分发的消息仍将被处理。
RegisterUser
进程示例
考虑一个同时具有命令总线和事件总线的应用程序。该应用程序将名为 RegisterUser
的命令分发到命令总线。该命令由 RegisterUserHandler
处理,后者创建一个 User
对象,将该对象存储到数据库,并将 UserRegistered
消息分发到事件总线。
UserRegistered
消息有许多处理程序,其中一个处理程序可能会向新用户发送欢迎电子邮件。我们正在使用 DoctrineTransactionMiddleware
将所有数据库查询包装在一个数据库事务中。
问题 1: 如果在发送欢迎电子邮件时抛出异常,则不会创建用户,因为 DoctrineTransactionMiddleware
将回滚 Doctrine 事务,在该事务中已创建用户。
问题 2: 如果在将用户保存到数据库时抛出异常,欢迎电子邮件仍会发送,因为它被异步处理。
DispatchAfterCurrentBusMiddleware 中间件
对于许多应用程序,期望的行为是仅在处理程序完全完成后才处理由处理程序分发的消息。这可以通过使用 DispatchAfterCurrentBusMiddleware
并向 消息信封 添加 DispatchAfterCurrentBusStamp
标记来完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
// src/Messenger/CommandHandler/RegisterUserHandler.php
namespace App\Messenger\CommandHandler;
use App\Entity\User;
use App\Messenger\Command\RegisterUser;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
class RegisterUserHandler
{
public function __construct(
private MessageBusInterface $eventBus,
private EntityManagerInterface $em,
) {
}
public function __invoke(RegisterUser $command): void
{
$user = new User($command->getUuid(), $command->getName(), $command->getEmail());
$this->em->persist($user);
// The DispatchAfterCurrentBusStamp marks the event message to be handled
// only if this handler does not throw an exception.
$event = new UserRegistered($command->getUuid());
$this->eventBus->dispatch(
(new Envelope($event))
->with(new DispatchAfterCurrentBusStamp())
);
// ...
}
}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
// src/Messenger/EventSubscriber/WhenUserRegisteredThenSendWelcomeEmail.php
namespace App\Messenger\EventSubscriber;
use App\Entity\User;
use App\Messenger\Event\UserRegistered;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Mime\RawMessage;
class WhenUserRegisteredThenSendWelcomeEmail
{
public function __construct(
private MailerInterface $mailer,
EntityManagerInterface $em,
) {
}
public function __invoke(UserRegistered $event): void
{
$user = $this->em->getRepository(User::class)->find($event->getUuid());
$this->mailer->send(new RawMessage('Welcome '.$user->getFirstName()));
}
}
这意味着 UserRegistered
消息将不会被处理,直到 RegisterUserHandler
完成并且新的 User
被持久化到数据库之后。如果 RegisterUserHandler
遇到异常,则 UserRegistered
事件将永远不会被处理。如果在发送欢迎电子邮件时抛出异常,则 Doctrine 事务将不会被回滚。
注意
如果 WhenUserRegisteredThenSendWelcomeEmail
抛出异常,则该异常将被包装到 DelayedMessageHandlingException
中。使用 DelayedMessageHandlingException::getWrappedExceptions
将为您提供在使用 DispatchAfterCurrentBusStamp
处理消息时抛出的所有异常。
dispatch_after_current_bus
中间件默认启用。如果您手动配置中间件,请确保在中间件链中的 doctrine_transaction
之前注册 dispatch_after_current_bus
。此外,必须为所有正在使用的总线加载 dispatch_after_current_bus
中间件。
将处理器绑定到不同的传输器
每个消息可以有多个处理程序,并且当消息被消费时,所有处理程序都会被调用。但是您也可以配置处理程序仅在从特定传输接收到消息时才被调用。这允许您拥有单个消息,其中每个处理程序都由正在消费不同传输的不同“worker”调用。
假设您有一个带有两个处理程序的 UploadedImage
消息:
ThumbnailUploadedImageHandler
:您希望它由名为image_transport
的传输处理。NotifyAboutNewUploadedImageHandler
:您希望它由名为async_priority_normal
的传输处理。
为此,请将 from_transport
选项添加到每个处理程序。例如:
1 2 3 4 5 6 7 8 9 10 11 12 13
// src/MessageHandler/ThumbnailUploadedImageHandler.php
namespace App\MessageHandler;
use App\Message\UploadedImage;
#[AsMessageHandler(fromTransport: 'image_transport')]
class ThumbnailUploadedImageHandler
{
public function __invoke(UploadedImage $uploadedImage): void
{
// do some thumbnailing
}
}
类似地:
1 2 3 4 5 6 7 8
// src/MessageHandler/NotifyAboutNewUploadedImageHandler.php
// ...
#[AsMessageHandler(fromTransport: 'async_priority_normal')]
class NotifyAboutNewUploadedImageHandler
{
// ...
}
然后,确保将您的消息“路由”到两个传输:
1 2 3 4 5 6 7 8 9 10
# config/packages/messenger.yaml
framework:
messenger:
transports:
async_priority_normal: # ...
image_transport: # ...
routing:
# ...
'App\Message\UploadedImage': [image_transport, async_priority_normal]
就这样!您现在可以消费每个传输:
1 2 3 4
# will only call ThumbnailUploadedImageHandler when handling the message
$ php bin/console messenger:consume image_transport -vv
$ php bin/console messenger:consume async_priority_normal -vv
警告
如果处理程序没有 from_transport
配置,它将在从其接收消息的每个传输上执行。
按批次处理消息
您可以声明“特殊”处理程序,这些处理程序将批量处理消息。通过这样做,处理程序将等待一定数量的消息处于挂起状态,然后再处理它们。批处理处理程序的声明是通过实现 BatchHandlerInterface 来完成的。还提供了 BatchHandlerTrait,以便简化这些特殊处理程序的声明。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
class MyBatchHandler implements BatchHandlerInterface
{
use BatchHandlerTrait;
public function __invoke(MyMessage $message, ?Acknowledger $ack = null): mixed
{
return $this->handle($message, $ack);
}
private function process(array $jobs): void
{
foreach ($jobs as [$message, $ack]) {
try {
// Compute $result from $message...
// Acknowledge the processing of the message
$ack->ack($result);
} catch (\Throwable $e) {
$ack->nack($e);
}
}
}
// Optionally, you can override some of the trait methods, such as the
// `getBatchSize()` method, to specify your own batch size...
private function getBatchSize(): int
{
return 100;
}
}
注意
当 __invoke()
的 $ack
参数为 null
时,消息预计将同步处理。否则,__invoke()
预计会返回挂起消息的数量。BatchHandlerTrait 为您处理此问题。
注意
默认情况下,挂起的批处理在 worker 空闲以及停止时都会被刷新。
扩展 Messenger
信封 & 标记
消息可以是任何 PHP 对象。有时,您可能需要配置有关消息的一些额外信息 - 例如它在 AMQP 中的处理方式,或者在消息应该被处理之前添加延迟。您可以通过向消息添加“标记”来做到这一点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
public function index(MessageBusInterface $bus): void
{
// wait 5 seconds before processing
$bus->dispatch(new SmsNotification('...'), [
new DelayStamp(5000),
]);
// or explicitly create an Envelope
$bus->dispatch(new Envelope(new SmsNotification('...'), [
new DelayStamp(5000),
]));
// ...
}
在内部,每个消息都包装在 Envelope
中,其中包含消息和标记。您可以手动创建它,或者允许消息总线来完成它。有各种不同的标记用于不同的目的,它们在内部用于跟踪有关消息的信息 - 例如处理它的消息总线,或者它是否在失败后正在重试。
中间件
将消息分发到消息总线时会发生什么取决于其中间件的集合及其顺序。默认情况下,为每个总线配置的中间件如下所示:
add_bus_name_stamp_middleware
- 添加一个标记以记录此消息被分发到哪个总线;dispatch_after_current_bus
- 请参阅 Messenger:同步和排队的消息处理;failed_message_processing_middleware
- 处理正在通过 失败传输 重试的消息,以使其正常运行,就像它们是从原始传输接收的一样;- 您自己的 中间件 集合;
send_message
- 如果为传输配置了路由,则将消息发送到该传输并停止中间件链;handle_message
- 调用给定消息的消息处理程序。
注意
这些中间件名称实际上是快捷方式名称。真正的服务 ID 以 messenger.middleware.
为前缀(例如,messenger.middleware.handle_message
)。
中间件在消息分发时执行,但也在通过 worker 接收消息时再次执行(对于发送到传输以进行异步处理的消息)。如果您创建自己的中间件,请记住这一点。
您可以将自己的中间件添加到此列表,或者完全禁用默认中间件,而仅包含您自己的中间件。
如果中间件服务是抽象的,您可以配置其构造函数的参数,并且每个总线将创建一个不同的实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# config/packages/messenger.yaml
framework:
messenger:
buses:
messenger.bus.default:
# disable the default middleware
default_middleware: false
middleware:
# use and configure parts of the default middleware you want
- 'add_bus_name_stamp_middleware': ['messenger.bus.default']
# add your own services that implement Symfony\Component\Messenger\Middleware\MiddlewareInterface
- 'App\Middleware\MyMiddleware'
- 'App\Middleware\AnotherMiddleware'
提示
如果您安装了 MakerBundle,则可以使用 make:messenger-middleware
命令来引导创建您自己的 messenger 中间件。
Doctrine 的中间件
如果您在应用程序中使用 Doctrine,则存在许多可选的中间件,您可能想要使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
# config/packages/messenger.yaml
framework:
messenger:
buses:
command_bus:
middleware:
# each time a message is handled, the Doctrine connection
# is "pinged" and reconnected if it's closed. Useful
# if your workers run for a long time and the database
# connection is sometimes lost
- doctrine_ping_connection
# After handling, the Doctrine connection is closed,
# which can free up database connections in a worker,
# instead of keeping them open forever
- doctrine_close_connection
# logs an error when a Doctrine transaction was opened but not closed
- doctrine_open_transaction_logger
# wraps all handlers in a single Doctrine transaction
# handlers do not need to call flush() and an error
# in any handler will cause a rollback
- doctrine_transaction
# or pass a different entity manager to any
#- doctrine_transaction: ['custom']
其他中间件
如果您需要在消费者中生成绝对 URL(例如,渲染带有链接的模板),请添加 router_context
中间件。此中间件存储原始请求上下文(即主机、HTTP 端口等),这是构建绝对 URL 所需的。
如果您需要在处理消息对象之前使用 Validator 组件 验证消息对象,请添加 validation
中间件。如果验证失败,将抛出 ValidationFailedException
。ValidationStamp 可用于配置验证组。
1 2 3 4 5 6 7 8
# config/packages/messenger.yaml
framework:
messenger:
buses:
command_bus:
middleware:
- router_context
- validation
Messenger 事件
除了中间件之外,Messenger 还分发几个事件。您可以 创建一个事件监听器 来挂钩到过程的各个部分。对于每个事件,事件类就是事件名称:
额外的处理器参数
可以使用 HandlerArgumentsStamp 让 messenger 将其他数据传递给消息处理程序。在中间件中将此标记添加到信封,并用您希望在处理程序中可用的任何其他数据填充它。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
// src/Messenger/AdditionalArgumentMiddleware.php
namespace App\Messenger;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
final class AdditionalArgumentMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
$envelope = $envelope->with(new HandlerArgumentsStamp([
$this->resolveAdditionalArgument($envelope->getMessage()),
]));
return $stack->next()->handle($envelope, $stack);
}
private function resolveAdditionalArgument(object $message): mixed
{
// ...
}
}
然后您的处理程序将如下所示:
1 2 3 4 5 6 7 8 9 10 11 12
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\SmsNotification;
final class SmsNotificationHandler
{
public function __invoke(SmsNotification $message, mixed $additionalArgument)
{
// ...
}
}
自定义数据格式的消息序列化器
如果您从其他应用程序接收消息,则它们可能不完全是您需要的格式。并非所有应用程序都会返回带有 body
和 headers
字段的 JSON 消息。在这些情况下,您需要创建一个新的消息序列化器,实现 SerializerInterface。假设您要创建一个消息解码器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
namespace App\Messenger\Serializer;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class MessageWithTokenDecoder implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
try {
// parse the data you received with your custom fields
$data = $encodedEnvelope['data'];
$data['token'] = $encodedEnvelope['token'];
// other operations like getting information from stamps
} catch (\Throwable $throwable) {
// wrap any exception that may occur in the envelope to send it to the failure transport
return new Envelope($throwable);
}
return new Envelope($data);
}
public function encode(Envelope $envelope): array
{
// this decoder does not encode messages, but you can implement it by returning
// an array with serialized stamps if you need to send messages in a custom format
throw new \LogicException('This serializer is only used for decoding messages.');
}
}
下一步是告诉 Symfony 在您的一个或多个传输中使用此序列化器:
1 2 3 4 5 6 7
# config/packages/messenger.yaml
framework:
messenger:
transports:
my_transport:
dsn: '%env(MY_TRANSPORT_DSN)%'
serializer: 'App\Messenger\Serializer\MessageWithTokenDecoder'
多个总线、命令和事件总线
Messenger 默认情况下为您提供单个消息总线服务。但是,您可以配置任意多个,创建“命令”、“查询”或“事件”总线,并控制它们的中间件。
构建应用程序时,常见的架构是将命令与查询分开。命令是执行某些操作的操作,而查询则获取数据。这称为 CQRS(命令查询职责分离)。请参阅 Martin Fowler 关于 CQRS 的 文章 以了解更多信息。可以通过定义多个总线将此架构与 Messenger 组件一起使用。
命令总线与查询总线略有不同。例如,命令总线通常不提供任何结果,而查询总线很少是异步的。您可以使用中间件配置这些总线及其规则。
通过引入事件总线来将操作与反应分开可能也是一个好主意。事件总线可以有零个或多个订阅者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
framework:
messenger:
# The bus that is going to be injected when injecting MessageBusInterface
default_bus: command.bus
buses:
command.bus:
middleware:
- validation
- doctrine_transaction
query.bus:
middleware:
- validation
event.bus:
default_middleware:
enabled: true
# set "allow_no_handlers" to true (default is false) to allow having
# no handler configured for this bus without throwing an exception
allow_no_handlers: false
# set "allow_no_senders" to false (default is true) to throw an exception
# if no sender is configured for this bus
allow_no_senders: true
middleware:
- validation
这将创建三个新服务:
command.bus
:可通过 MessageBusInterface 类型提示自动装配(因为这是default_bus
);query.bus
:可通过MessageBusInterface $queryBus
自动装配;event.bus
:可通过MessageBusInterface $eventBus
自动装配。
限制每个总线的处理器
默认情况下,每个处理程序都可以在所有总线上处理消息。为了防止在没有错误的情况下将消息分发到错误的总线,您可以使用 messenger.message_handler
标签将每个处理程序限制为特定的总线。
1 2 3 4
# config/services.yaml
services:
App\MessageHandler\SomeCommandHandler:
tags: [{ name: messenger.message_handler, bus: command.bus }]
这样,App\MessageHandler\SomeCommandHandler
处理程序将仅为 command.bus
总线所知。
您还可以使用 _instanceof 服务配置 自动将此标签添加到多个类。使用此功能,您可以根据实现的接口确定消息总线。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# config/services.yaml
services:
# ...
_instanceof:
# all services implementing the CommandHandlerInterface
# will be registered on the command.bus bus
App\MessageHandler\CommandHandlerInterface:
tags:
- { name: messenger.message_handler, bus: command.bus }
# while those implementing QueryHandlerInterface will be
# registered on the query.bus bus
App\MessageHandler\QueryHandlerInterface:
tags:
- { name: messenger.message_handler, bus: query.bus }
调试总线
debug:messenger
命令列出每个总线的可用消息和处理程序。您还可以通过提供总线的名称作为参数,将列表限制为特定的总线。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
$ php bin/console debug:messenger
Messenger
=========
command.bus
-----------
The following messages can be dispatched:
---------------------------------------------------------------------------------------
App\Message\DummyCommand
handled by App\MessageHandler\DummyCommandHandler
App\Message\MultipleBusesMessage
handled by App\MessageHandler\MultipleBusesMessageHandler
---------------------------------------------------------------------------------------
query.bus
---------
The following messages can be dispatched:
---------------------------------------------------------------------------------------
App\Message\DummyQuery
handled by App\MessageHandler\DummyQueryHandler
App\Message\MultipleBusesMessage
handled by App\MessageHandler\MultipleBusesMessageHandler
---------------------------------------------------------------------------------------
提示
该命令还将显示消息和处理程序类的 PHPDoc 描述。
重新调度消息
如果您想重新分发消息(使用相同的传输和信封),请创建一个新的 RedispatchMessage 并通过您的总线分发它。重用前面显示的相同 SmsNotification
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
// src/MessageHandler/SmsNotificationHandler.php
namespace App\MessageHandler;
use App\Message\SmsNotification;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Message\RedispatchMessage;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsMessageHandler]
class SmsNotificationHandler
{
public function __construct(private MessageBusInterface $bus)
{
}
public function __invoke(SmsNotification $message): void
{
// do something with the message
// then redispatch it based on your own logic
if ($needsRedispatch) {
$this->bus->dispatch(new RedispatchMessage($message));
}
}
}
内置的 RedispatchMessageHandler 将处理此消息,以通过最初分发它的同一总线重新分发它。您还可以使用 RedispatchMessage
构造函数的第二个参数来提供在重新分发消息时要使用的传输。