消息组件
消息组件帮助应用程序向其他应用程序或通过消息队列发送和接收消息。
该组件很大程度上受到 Matthias Noback 关于命令总线的博客文章系列和 SimpleBus 项目的启发。
另请参阅
本文解释了如何在任何 PHP 应用程序中将 Messenger 功能用作独立组件。阅读Messenger:同步和队列消息处理文章,了解如何在 Symfony 应用程序中使用它。
安装
1
$ composer require symfony/messenger
注意
如果您在 Symfony 应用程序之外安装此组件,则必须在代码中引入 vendor/autoload.php
文件,以启用 Composer 提供的类自动加载机制。阅读本文了解更多详情。
概念
- 发送器 (Sender):
- 负责序列化消息并将其发送到某处。这个某处可以是消息代理或第三方 API 等。
- 接收器 (Receiver):
- 负责检索、反序列化消息并将消息转发给处理器。这可以是消息队列拉取器或 API 端点等。
- 处理器 (Handler):
- 负责使用适用于消息的业务逻辑处理消息。处理器由
HandleMessageMiddleware
中间件调用。 - 中间件 (Middleware):
- 中间件可以在消息通过总线分发时访问消息及其包装器(信封)。字面意思是“中间的软件”,这些与应用程序的核心关注点(业务逻辑)无关。相反,它们是适用于整个应用程序并影响整个消息总线的横切关注点。例如:日志记录、验证消息、启动事务等。它们还负责调用链中的下一个中间件,这意味着它们可以调整信封,通过向其添加印章甚至替换它,以及中断中间件链。中间件在最初分发消息时和稍后从传输接收消息时都会被调用。
- 信封 (Envelope):
- Messenger 特有的概念,它通过将消息包装在其中,从而在消息总线内部提供完全的灵活性,允许通过信封印章在内部添加有用的信息。
- 信封印章 (Envelope Stamps):
- 您需要附加到消息的信息片段:用于传输的序列化器上下文、标识已接收消息的标记或您的中间件或传输层可能使用的任何类型的元数据。
总线 (Bus)
总线用于分发消息。总线的行为在其有序的中间件堆栈中。该组件附带一组您可以使用的中间件。
当在 Symfony 的 FrameworkBundle 中使用消息总线时,将为您配置以下中间件
- SendMessageMiddleware(启用异步处理,如果您提供记录器,则记录消息的处理过程)
- HandleMessageMiddleware(调用注册的处理器)
示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
use App\Message\MyMessage;
use App\MessageHandler\MyMessageHandler;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
$handler = new MyMessageHandler();
$bus = new MessageBus([
new HandleMessageMiddleware(new HandlersLocator([
MyMessage::class => [$handler],
])),
]);
$bus->dispatch(new MyMessage(/* ... */));
注意
每个中间件都需要实现 MiddlewareInterface。
处理器 (Handlers)
一旦分发到总线,消息将由“消息处理器”处理。消息处理器是一个 PHP 可调用对象(即函数或类的实例),它将为您的消息执行所需的处理
1 2 3 4 5 6 7 8 9 10 11
namespace App\MessageHandler;
use App\Message\MyMessage;
class MyMessageHandler
{
public function __invoke(MyMessage $message): void
{
// Message processing...
}
}
向消息添加元数据 (信封)
如果您需要向消息添加元数据或某些配置,请使用 Envelope 类包装它并添加印章。例如,要设置消息通过传输层时使用的序列化组,请使用 SerializerStamp
印章
1 2 3 4 5 6 7 8 9 10
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\SerializerStamp;
$bus->dispatch(
(new Envelope($message))->with(new SerializerStamp([
// groups are applied to the whole message, so make sure
// to define the group for every embedded object
'groups' => ['my_serialization_groups'],
]))
);
以下是 Symfony Messenger 附带的一些重要的信封印章
- DelayStamp,用于延迟异步消息的处理。
- DispatchAfterCurrentBusStamp,使消息在当前总线执行后处理。阅读 Messenger:同步和队列消息处理 了解更多信息。
- HandledStamp,一个将消息标记为由特定处理器处理的印章。允许访问处理器返回值和处理器名称。
- ReceivedStamp,一个内部印章,将消息标记为从传输接收。
- SentStamp,一个将消息标记为由特定发送器发送的印章。允许从 SendersLocator 访问发送器 FQCN 和别名(如果可用)。
- SerializerStamp,用于配置传输使用的序列化组。
- ValidationStamp,用于配置启用验证中间件时使用的验证组。
- ErrorDetailsStamp,当消息由于处理器中的异常而失败时的内部印章。
- ScheduledStamp,一个将消息标记为由调度器生成的印章。这有助于将其与“手动”创建的消息区分开来。您可以在调度器文档中了解更多信息。
注意
ErrorDetailsStamp 印章包含一个 FlattenException,它是导致消息失败的异常的表示形式。您可以使用 getFlattenException() 方法获取此异常。由于 FlattenExceptionNormalizer,此异常已规范化,这有助于 Messenger 上下文中的错误报告。
您在中间件中收到的不是直接处理消息,而是处理信封。因此,您可以检查信封内容及其印章,或添加任何内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
use App\Message\Stamp\AnotherStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
use Symfony\Component\Messenger\Middleware\StackInterface;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
class MyOwnMiddleware implements MiddlewareInterface
{
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
if (null !== $envelope->last(ReceivedStamp::class)) {
// Message just has been received...
// You could for example add another stamp.
$envelope = $envelope->with(new AnotherStamp(/* ... */));
} else {
// Message was just originally dispatched
}
return $stack->next()->handle($envelope, $stack);
}
}
如果消息刚刚被接收(即至少有一个 ReceivedStamp
印章),则以上示例会将消息转发到带有附加印章的下一个中间件。您可以通过实现 StampInterface 来创建您自己的印章。
如果要检查信封上的所有印章,请使用 $envelope->all()
方法,该方法返回按类型 (FQCN) 分组的所有印章。或者,您可以使用 FQCN 作为此方法的第一个参数(例如 $envelope->all(ReceivedStamp::class)
)迭代特定类型的所有印章。
注意
如果使用 Serializer 基本序列化器通过传输,则任何印章都必须使用 Symfony Serializer 组件进行序列化。
传输 (Transports)
为了发送和接收消息,您必须配置传输。传输将负责与您的消息代理或第三方通信。
您自己的发送器 (Sender)
假设您已经有一个 ImportantAction
消息通过消息总线并由处理器处理。现在,您还想将此消息作为电子邮件发送(使用 Mime 和 Mailer 组件)。
使用 SenderInterface,您可以创建自己的消息发送器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
namespace App\MessageSender;
use App\Message\ImportantAction;
use Symfony\Component\Mailer\MailerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Mime\Email;
class ImportantActionToEmailSender implements SenderInterface
{
public function __construct(
private MailerInterface $mailer,
private string $toEmail,
) {
}
public function send(Envelope $envelope): Envelope
{
$message = $envelope->getMessage();
if (!$message instanceof ImportantAction) {
throw new \InvalidArgumentException(sprintf('This transport only supports "%s" messages.', ImportantAction::class));
}
$this->mailer->send(
(new Email())
->to($this->toEmail)
->subject('Important action made')
->html('<h1>Important action</h1><p>Made by '.$message->getUsername().'</p>')
);
return $envelope;
}
}
您自己的接收器 (Receiver)
接收器负责从源获取消息并将其分发到应用程序。
假设您已经在应用程序中使用 NewOrder
消息处理了一些“订单”。现在您想与第三方或遗留应用程序集成,但您无法使用 API,需要使用包含新订单的共享 CSV 文件。
您将读取此 CSV 文件并分发 NewOrder
消息。您需要做的就是编写自己的 CSV 接收器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
namespace App\MessageReceiver;
use App\Message\NewOrder;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Serializer\SerializerInterface;
class NewOrdersFromCsvFileReceiver implements ReceiverInterface
{
private $connection;
public function __construct(
private SerializerInterface $serializer,
private string $filePath,
) {
// Available connection bundled with the Messenger component
// can be found in "Symfony\Component\Messenger\Bridge\*\Transport\Connection".
$this->connection = /* create your connection */;
}
public function get(): iterable
{
// Receive the envelope according to your transport ($yourEnvelope here),
// in most cases, using a connection is the easiest solution.
$yourEnvelope = $this->connection->get();
if (null === $yourEnvelope) {
return [];
}
try {
$envelope = $this->serializer->decode([
'body' => $yourEnvelope['body'],
'headers' => $yourEnvelope['headers'],
]);
} catch (MessageDecodingFailedException $exception) {
$this->connection->reject($yourEnvelope['id']);
throw $exception;
}
return [$envelope->with(new CustomStamp($yourEnvelope['id']))];
}
public function ack(Envelope $envelope): void
{
// Add information about the handled message
}
public function reject(Envelope $envelope): void
{
// In the case of a custom connection
$id = /* get the message id thanks to information or stamps present in the envelope */;
$this->connection->reject($id);
}
}
同一总线上的接收器和发送器
为了允许在同一总线上发送和接收消息并防止无限循环,消息总线会将 ReceivedStamp 印章添加到消息信封,并且 SendMessageMiddleware 中间件将知道不应再次将这些消息路由到传输。