Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Cli\SignalLoop;
use Yiisoft\Queue\Cli\SimpleLoop;
use Yiisoft\Queue\Message\ClassResolver\ArrayMessageClassResolver;
use Yiisoft\Queue\Message\ClassResolver\MessageClassResolverInterface;
use Yiisoft\Queue\Message\Serializer\JsonMessageEncoder;
use Yiisoft\Queue\Message\Serializer\MessageEncoderInterface;
use Yiisoft\Queue\Message\Serializer\MessageSerializer;
Expand Down Expand Up @@ -49,4 +51,10 @@
],
MessageEncoderInterface::class => JsonMessageEncoder::class,
MessageSerializerInterface::class => MessageSerializer::class,
MessageClassResolverInterface::class => [
'class' => ArrayMessageClassResolver::class,
'__construct()' => [
'map' => $params['yiisoft/queue']['messages'],
],
],
];
20 changes: 20 additions & 0 deletions config/params.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use Yiisoft\Queue\Debug\QueueCollector;
use Yiisoft\Queue\Debug\QueueProviderInterfaceProxy;
use Yiisoft\Queue\Debug\QueueWorkerInterfaceProxy;
use Yiisoft\Queue\Message\MessageHandlerInterface;
use Yiisoft\Queue\Message\Serializer\MessageSerializer;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\Worker\WorkerInterface;

Expand All @@ -20,6 +22,24 @@
],
],
'yiisoft/queue' => [
/**
* Map of message type to message class. Used by {@see MessageSerializer} to reconstruct the original typed
* message object on unserialize. Example:
* [
* 'send-email' => SendEmailMessage::class,
* 'generate-report' => GenerateReportMessage::class,
* ]
*/
'messages' => [],
/**
* Map of message type to handler. The worker uses this to find the handler for a received message.
* A handler may be a class name implementing {@see MessageHandlerInterface}, a callable, or any definition
* supported by yiisoft/injector. Example:
* [
* 'send-email' => SendEmailHandler::class,
* 'generate-report' => [GenerateReportHandler::class, 'handle'],
* ]
*/
'handlers' => [],
'middlewares-push' => [],
'middlewares-consume' => [],
Expand Down
17 changes: 17 additions & 0 deletions docs/guide/en/configuration-with-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@ Advanced applications eventually need the following tweaks:

- **Queue names** — configure queue/back-end per logical queue name via [`yiisoft/queue.queues` config](queue-names.md) when you need to parallelize message handling or send some of them to a different application.
- **Named handlers or callable definitions** — map a short message type to a callable in [`yiisoft/queue.handlers` config](message-handler-advanced.md) when another application is the message producer and you cannot use FQCN as message type.
- **Message class map** — map message types to specific message classes so that
`MessageSerializerInterface::unserialize()` reconstructs the original typed object instead of falling back
to `GenericMessage`. Configure via `yiisoft/queue.messages`:

```php
return [
'yiisoft/queue' => [
'messages' => [
'send-email' => SendEmailMessage::class,
'download-file' => DownloadFileMessage::class,
],
],
];
```

When a type is not present in the map, `GenericMessage` is used as a fallback.

- **Middleware pipelines** — adjust push/consume/failure behavior: collect metrics, modify messages, and so on. See [Middleware pipelines](middleware-pipelines.md) for details.

If you don't have a broker yet (for development, testing, or as a stepping stone before introducing
Expand Down
35 changes: 35 additions & 0 deletions src/Message/ClassResolver/ArrayMessageClassResolver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message\ClassResolver;

use Yiisoft\Queue\Message\MessageInterface;

/**
* Resolves message classes from a predefined map of type-to-class associations.
*/
final class ArrayMessageClassResolver implements MessageClassResolverInterface

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What other implementations do you see?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Search by attribute.

{
/**
* @param array $map Map of message type to message class, where keys are message types and values are
* fully-qualified class names implementing {@see MessageInterface}. For example:
*
* ```php
* [
* 'order.created' => OrderCreatedMessage::class,
* 'send_email' => SendEmailMessage::class,
* ]
* ```
*
* @psalm-param array<string, class-string<MessageInterface>> $map
*/
public function __construct(
private readonly array $map = [],
) {}

public function resolve(string $type): ?string
{
return $this->map[$type] ?? null;
}
}
24 changes: 24 additions & 0 deletions src/Message/ClassResolver/MessageClassResolverInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message\ClassResolver;

use Yiisoft\Queue\Message\MessageInterface;

/**
* Resolves a message class by message type.
*/
interface MessageClassResolverInterface
{
/**
* Returns the message class for the given type, or `null` if the type is not registered.
*
* @param string $type Message type.
*
* @return string|null Message class, or `null` if the type is not registered.
*
* @psalm-return class-string<MessageInterface>|null
*/
public function resolve(string $type): ?string;
}
47 changes: 22 additions & 25 deletions src/Message/Serializer/MessageSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,48 @@

namespace Yiisoft\Queue\Message\Serializer;

use Yiisoft\Queue\Message\Envelope;
use Yiisoft\Queue\Message\ClassResolver\ArrayMessageClassResolver;
use Yiisoft\Queue\Message\ClassResolver\MessageClassResolverInterface;
use Yiisoft\Queue\Message\GenericMessage;
use Yiisoft\Queue\Message\MessageInterface;

use function is_array;
use function is_string;

/**
* Serializes and unserializes queue messages, preserving the original message class in metadata.
* Serializes and unserializes queue messages, resolving the message class via a {@see MessageClassResolverInterface}.
*
* When serializing, assembles an array with `type`, `data`, and `meta` keys and passes it as a single array to
* {@see MessageEncoderInterface}, which encodes it to a string. When unserializing, decodes the string back to an
* array and reconstructs the original message class from the `meta` key, falling back to {@see GenericMessage}
* if the class is missing or invalid.
* array and resolves the message class from the type via the resolver, falling back to {@see GenericMessage}
* if the type is not registered.
*/
final class MessageSerializer implements MessageSerializerInterface
{
private const META_MESSAGE_CLASS = 'message-class';

private readonly MessageClassResolverInterface $resolver;

/**
* @param MessageEncoderInterface $encoder Encoder used to encode and decode message data.
* @param MessageClassResolverInterface|array $classResolver Resolver for message classes, or a map of type to
* class.
*
* @psalm-param MessageClassResolverInterface|array<string, class-string<MessageInterface>> $classResolver
*/
public function __construct(
private readonly MessageEncoderInterface $encoder,
) {}
MessageClassResolverInterface|array $classResolver = [],
) {
$this->resolver = is_array($classResolver)
? new ArrayMessageClassResolver($classResolver)
: $classResolver;
}

public function serialize(MessageInterface $message): string
{
$metadata = $message->getMetadata();

if (!isset($metadata[self::META_MESSAGE_CLASS])) {
$metadata[self::META_MESSAGE_CLASS] = $message instanceof Envelope
? $message->getMessage()::class
: $message::class;
}

return $this->encoder->encode([
'type' => $message->getType(),
'data' => $message->getData(),
'meta' => $metadata,
'meta' => $message->getMetadata(),
]);
}

Expand All @@ -62,15 +67,7 @@ public function unserialize(string $value): MessageInterface
throw new MessageSerializerException('Metadata must be an array. Got ' . get_debug_type($metadata) . '.');
}

$class = $metadata[self::META_MESSAGE_CLASS] ?? GenericMessage::class;

// Don't check subclasses when it's a default class: that's faster
if ($class !== GenericMessage::class
&& (!is_string($class) || !is_subclass_of($class, MessageInterface::class))
) {
$class = GenericMessage::class;
}
/** @var class-string<MessageInterface> $class */
$class = $this->resolver->resolve($type) ?? GenericMessage::class;

return $class::fromData($type, $data['data'] ?? null)->withMetadata($metadata);
}
Expand Down
46 changes: 46 additions & 0 deletions tests/Unit/Message/ClassResolver/ArrayMessageClassResolverTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Tests\Unit\Message\ClassResolver;

use PHPUnit\Framework\TestCase;
use Yiisoft\Queue\Message\ClassResolver\ArrayMessageClassResolver;
use Yiisoft\Queue\Message\GenericMessage;
use Yiisoft\Queue\Tests\Unit\Support\TestMessage;

final class ArrayMessageClassResolverTest extends TestCase
{
public function testResolveRegisteredType(): void
{
$resolver = new ArrayMessageClassResolver(['test' => TestMessage::class]);

$this->assertSame(TestMessage::class, $resolver->resolve('test'));
}

public function testResolveUnregisteredTypeReturnsNull(): void
{
$resolver = new ArrayMessageClassResolver(['test' => TestMessage::class]);

$this->assertNull($resolver->resolve('unknown'));
}

public function testResolveWithEmptyMapReturnsNull(): void
{
$resolver = new ArrayMessageClassResolver();

$this->assertNull($resolver->resolve('test'));
}

public function testResolveMultipleTypes(): void
{
$resolver = new ArrayMessageClassResolver([
'generic' => GenericMessage::class,
'test' => TestMessage::class,
]);

$this->assertSame(GenericMessage::class, $resolver->resolve('generic'));
$this->assertSame(TestMessage::class, $resolver->resolve('test'));
$this->assertNull($resolver->resolve('not-registered'));
}
}
36 changes: 8 additions & 28 deletions tests/Unit/Message/Serializer/MessageSerializerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,30 +61,15 @@ public function testUnsupportedMetadata(mixed $metadata): void
$this->createSerializer()->unserialize($value);
}

public function testDefaultMessageClassFallbackWrongClass(): void
{
$payload = [
'type' => 'handler',
'data' => 'test',
'meta' => [
'message-class' => 'NonExistentClass',
],
];

$message = $this->createSerializer()->unserialize(json_encode($payload, JSON_THROW_ON_ERROR));

$this->assertInstanceOf(GenericMessage::class, $message);
}

public function testDefaultMessageClassFallbackClassNotSet(): void
public function testFallbackToGenericMessageForUnknownType(): void
{
$payload = [
'type' => 'handler',
'data' => 'test',
'meta' => [],
];

$message = $this->createSerializer()->unserialize(json_encode($payload, JSON_THROW_ON_ERROR));
$message = $this->createSerializer()->unserialize(json_encode($payload));

$this->assertInstanceOf(GenericMessage::class, $message);
}
Expand Down Expand Up @@ -116,7 +101,7 @@ public function testSerialize(): void
$json = $this->createSerializer()->serialize($message);

$this->assertEquals(
'{"type":"handler","data":"test","meta":{"message-class":"Yiisoft\\\\Queue\\\\Message\\\\GenericMessage"}}',
'{"type":"handler","data":"test","meta":[]}',
$json,
);
}
Expand All @@ -129,11 +114,7 @@ public function testSerializeEnvelopeStack(): void
$json = $serializer->serialize($message);

$this->assertEquals(
sprintf(
'{"type":"handler","data":"test","meta":{"%s":"test-id","message-class":"%s"}}',
IdEnvelope::META_ID,
str_replace('\\', '\\\\', GenericMessage::class),
),
sprintf('{"type":"handler","data":"test","meta":{"%s":"test-id"}}', IdEnvelope::META_ID),
$json,
);

Expand All @@ -142,14 +123,13 @@ public function testSerializeEnvelopeStack(): void
$this->assertInstanceOf(GenericMessage::class, $restored);
$this->assertEquals([
IdEnvelope::META_ID => 'test-id',
'message-class' => GenericMessage::class,
], $restored->getMetadata());
}

public function testRestoreOriginalMessageClass(): void
{
$message = new TestMessage();
$serializer = $this->createSerializer();
$serializer = $this->createSerializer(['test' => TestMessage::class]);

$restored = $serializer->unserialize($serializer->serialize($message));

Expand All @@ -159,15 +139,15 @@ public function testRestoreOriginalMessageClass(): void
public function testRestoreOriginalMessageClassWithEnvelope(): void
{
$message = new IdEnvelope(new TestMessage(), 1);
$serializer = $this->createSerializer();
$serializer = $this->createSerializer(['test' => TestMessage::class]);

$restored = $serializer->unserialize($serializer->serialize($message));

$this->assertInstanceOf(TestMessage::class, $restored);
}

private function createSerializer(): MessageSerializer
private function createSerializer(array $classResolver = []): MessageSerializer
{
return new MessageSerializer(new JsonMessageEncoder());
return new MessageSerializer(new JsonMessageEncoder(), $classResolver);
}
}
Loading