How to work with queues

OpenLoyalty uses queue system to process tasks in an asynchronous way. We have chosen RabbitMQ as a queue provider. In case you need to create a process which uses queue you have to create a consumer and a producer. The producer is responsible for either creating a request to run the process or running this process immediately when queue processing is disabled. The consumer is a class which processes elements from queue.

Configuration

In Symfony config files a simple default configuration is defined. Administrators can enable the queue by setting queue_enabled to true.

queue_enabled: false
queue_host: queue
queue_port: 5672
queue_user: guest
queue_password: guest

Example how to use queue along with CQRS

Below you may find a description on how to a producer for recreating segments. This is a real use case from the application.

1. You don’t have to create a consumer. The general Consumer class already exists at src/OpenLoyalty/Component/Core/Infrastructure/Queue/RabbitMq/Consumer.php This Consumer uses CommandBus to dispatch Commands taken from queue.

2. Next, create a producer for sync and async processing. Classes OpenLoyaltyComponentSegmentInfrastructureQueueProducer should implement a specific interface SegmentProducerInterface). The only required dependency is on the queue connection that implements QueueProducerConnectionInterface

Now, you have to configure connection and Producer class in the container.

oloy.segment.rabbitmq.connection:
    class: OpenLoyalty\Infrastructure\Core\Queue\RabbitMq\Producer
    arguments:
        $connection: '@old_sound_rabbit_mq.segment_producer'
    calls:
        - [setLogger, ['@logger']]

OpenLoyalty\Component\Segment\Infrastructure\Queue\Producer:
    arguments:
        $queueProducerConnection: '@oloy.segment.rabbitmq.connection'
        $queueEnabled: '%queue_enabled%'
  1. Third step is to create configuration of the connection between application and the queue system (RabbitMq in our case).
old_sound_rabbit_mq:
    producers:
        segment:
            connection:       default
            exchange_options: {name: 'segment', type: direct}
            enable_logger: true
    consumers:
        segment-dead-letters:
            connection:       default
            exchange_options: { name: 'segment-dead-letters', type: fanout }
            queue_options:    { name: 'segment-dead-letters' }
            callback:         OpenLoyalty\Infrastructure\Core\Queue\RabbitMq\DeadLetter
    multiple_consumers:
        segment:
            connection:       default
            exchange_options: {name: 'segment', type: direct}
            enable_logger: true
            queues:
                recreate-segment:
                    name: recreate-segment
                    arguments:
                        x-dead-letter-exchange:    ['S', 'segment-dead-letters']
                        x-dead-letter-routing-key: ['S', 'segment-dead-letters']
                    callback: OpenLoyalty\Infrastructure\Core\Queue\RabbitMq\Consumer
                    routing_keys:
                        - recreate-segment

The name of this service MUST be of old_sound_rabbit_mq.{producer_attribute_value}_producer format.

4. Finally, the consumer should be registered as a supervisord worker. Add your configuration to the directories named: docker/dev/php/conf/supervisord/conf.d/ and docker/prod/php/conf/supervisord/conf.d/

Example of such a file is docker/dev/php/conf/supervisord/conf.d/segment-consumer.conf.

Run the consumer manually

In order to run the consumer manually in the console, type:

bin/console rabbitmq:multiple-consumer segment

Example how to use queue without CQRS

The above example strongly uses CQRS pattern and dispatches commands to the command bus. However, sometimes you need a custom consumer for unusual cases.

Below you may find a description on how to create both consumer and a producer for sending emails. This is also a real use case from the application.

1. First, create a consumer. This class must implement OldSoundRabbitMqBundleRabbitMqConsumerInterface. The consumer must be registered in container as simple service with injected dependencies.

In this case

OpenLoyalty\Infrastructure\Messaging\Queue\RabbitMq\EmailConsumer:
    arguments:
        $logger: '@logger'
        $mailer: '@oloy.mailer'

2. Next, create a producer. This step is the same like in the above example with CQRS. So let’s see how the configuration looks like for sending emails.

oloy.email.rabbitmq.connection:
    class: OpenLoyalty\Infrastructure\Core\Queue\RabbitMq\Producer
    arguments:
        $connection: '@old_sound_rabbit_mq.email_producer'
    calls:
        - [setLogger, ['@logger']]

OpenLoyalty\Infrastructure\Messaging\Queue\RabbitMq\EmailProducer:
    arguments:
        $mailer: '@oloy.mailer'
        $queueProducerConnection: '@oloy.email.rabbitmq.connection'
        $queueEnabled: '%queue_enabled%'

Nothing different than in the previous example.

3. Third step is to create configuration of the connection between application and the queue system (RabbitMq in our case). In this case, the only difference is the “callback” for multiple consumers. We don’t use general Consumer from Core to dispatch commands to the command bus but rather our own implementation from the step 1.

old_sound_rabbit_mq:
    producers:
        email:
            connection:       default
            exchange_options: {name: 'email', type: direct}
            enable_logger: true
    consumers:
        email-dead-letters:
            connection:       default
            exchange_options: { name: 'email-dead-letters', type: fanout }
            queue_options:    { name: 'email-dead-letters' }
            callback:         OpenLoyalty\Infrastructure\Core\Queue\RabbitMq\DeadLetter
    multiple_consumers:
        email:
            connection:       default
            exchange_options: {name: 'email', type: direct}
            enable_logger: true
            queues:
                email-queue:
                    name: email-queue
                    arguments:
                        x-dead-letter-exchange:    ['S', 'email-dead-letters']
                        x-dead-letter-routing-key: ['S', 'email-dead-letters']
                    callback: OpenLoyalty\Infrastructure\Messaging\Queue\RabbitMq\EmailConsumer
                    routing_keys:
                        - email-queue

The name of this service MUST be of old_sound_rabbit_mq.{producer_attribute_value}_producer format.

4. Finally, the consumer should be registered as a supervisord worker. Add your configuration to the directories named: docker/dev/php/conf/supervisord/conf.d/ and docker/prod/php/conf/supervisord/conf.d/

Example of such a file is docker/dev/php/conf/supervisord/conf.d/segment-consumer.conf.

Run the consumer manually

In order to run the consumer manually in the console, type:

bin/console rabbitmq:multiple-consumer email