Monday 13 April 2020

Azure Service Bus Queue

Overview of Azure Service Bus - Whizlabs Blog

Lately my work is being revolved around Queues as we have a requirement where everyday we need to schedule a task at a particular time of the day. There is a way in spring boot to do this via @Schedule annotation along with cron expression but we wanted something to work on multi-node environment to ensure high availability. So we decided to use azure's service bus queue.
When using queues, components of a distributed application do not communicate directly with each other; instead they exchange messages via a queue, which acts as an intermediary (broker). A message producer (sender) hands off a message to the queue and then continues its processing. Asynchronously, a message consumer (receiver) pulls the message from the queue and processes it. The producer does not have to wait for a reply from the consumer in order to continue to process and send further messages. Queues offer First In, First Out (FIFO) message delivery to one or more competing consumers. That is, messages are typically received and processed by the receivers in the order in which they were added to the queue, and each message is received and processed by only one message consumer.
Service Bus queues are a general-purpose technology that can be used for a wide variety of scenarios:
  • Communication between web and worker roles in a multi-tier Azure application.
  • Communication between on-premises apps and Azure-hosted apps in a hybrid solution.
  • Communication between components of a distributed application running on-premises in different organizations or departments of an organization.

QueueConcepts

Using queues enables you to scale your applications more easily, and enable more resiliency to your architecture.
Service Bus queues support a maximum message size of 256 KB. The header, which includes the standard and custom application properties, can have a maximum size of 64 KB. There is no limit on the number of messages held in a queue but there is a cap on the total size of the messages held by a queue. This queue size is defined at creation time, with an upper limit of 5 GB.


Send Messages To a Queue:

To send messages to a Service Bus Queue, your application instantiates a QueueClient object and sends messages asynchronously. The following code shows how to send a message:

@Component
public class QueueSendService {
   QueueClient queueClient=new QueueClient(new ConnectionStringBuilder(
"ConnectionString","yourQueueName"),ReceiveMode.PEEKLOCK);
    private static final Logger logger = LoggerFactory.getLogger(QueueSendService.class);
    public QueueSendService() throws ServiceBusException, InterruptedException {
    }
    public void addToQueue() {
        logger.info("Sending Data to queue....");
        final String msg="This message will be enqueued after 60 seconds....";
        final Message message=new Message(msg.getBytes(StandardCharsets.UTF_8));
        try {
            queueClient.scheduleMessage(message,
Clock.systemUTC().instant().plusSeconds(60));
        }
        catch (Exception ignored){
        }
        logger.info("Data Sent...");
    }
}


Receive Messages From a Queue:

The primary way to receive messages from a queue is to use a ServiceBusContract object. Received messages can work in two different modes: ReceiveAndDelete and PeekLock.
When using the ReceiveAndDelete mode, receive is a single-shot operation - that is, when Service Bus receives a read request for a message in a queue, it marks the message as being consumed and returns it to the application. ReceiveAndDelete mode (which is the default mode) is the simplest model and works best for scenarios in which an application can tolerate not processing a message in the event of a failure. To understand this, consider a scenario in which the consumer issues the receive request and then crashes before processing it. Because Service Bus has marked the message as being consumed, then when the application restarts and begins consuming messages again, it has missed the message that was consumed prior to the crash.
In PeekLock mode, receive becomes a two stage operation, which makes it possible to support applications that cannot tolerate missing messages. When Service Bus receives a request, it finds the next message to be consumed, locks it to prevent other consumers receiving it, and then returns it to the application. After the application finishes processing the message (or stores it reliably for future processing), it completes the second stage of the receive process by calling complete() on the received message. When Service Bus sees the complete() call, it marks the message as being consumed and remove it from the queue.
The following example demonstrates how messages can be received and processed using PeekLock mode:


public class ListenerService {
    private final Logger logger = LoggerFactory.getLogger(ListenerService.class);
    ListenerService() throws Exception {
        IMessageReceiver receiver = ClientFactory.
createMessageReceiverFromConnectionStringBuilder(
                new ConnectionStringBuilder(yourConnectionString, yourQueueName),
 ReceiveMode.PEEKLOCK);
        this.receiveMessagesAsync(receiver);
    }
    void receiveMessagesAsync(IMessageReceiver receiver) {
        CompletableFuture currentTask = new CompletableFuture();
        try {
            CompletableFuture.runAsync(() -> {
                while (!currentTask.isCancelled()) {
                    try {
                        IMessage message = receiver.receive(Duration.ofSeconds(60));
                        if (message != null) {
                            logger.info("Recieved a Message from queue");
                            receiver.completeAsync(message.getLockToken());
                        }
                    } catch (Exception e) {
                        currentTask.completeExceptionally(e);
                    }
                }
                currentTask.complete(null);
            });
        } catch (Exception e) {
            currentTask.completeExceptionally(e);
        }
    }
}

The only maven dependency that you need to add to your pom.xml is:
        <dependency>
            <groupId>com.microsoft.azure</groupId>
            <artifactId>azure-servicebus</artifactId>
            <version>1.2.8</version>
        </dependency>

A key benefit of using queues is to achieve "temporal decoupling" of application components. In other words, the producers (senders) and consumers (receivers) do not have to be sending and receiving messages at the same time, because messages are stored durably in the queue. Furthermore, the producer does not have to wait for a reply from the consumer in order to continue to process and send messages.
A related benefit is "load leveling," which enables producers and consumers to send and receive messages at different rates. In many applications, the system load varies over time; however, the processing time required for each unit of work is typically constant. As the load increases, more worker processes can be added to read from the queue. Each message is processed by only one of the worker processes. Furthermore, this pull-based load balancing allows for optimum use of the worker computers even if the worker computers differ with regard to processing power, as they pull messages at their own maximum rate. This is often termed the "competing consumer" pattern.
Using queues to intermediate between message producers and consumers provides an inherent loose coupling between the components. Because producers and consumers are not aware of each other, a consumer can be upgraded without having any effect on the producer.

No comments:

Post a Comment