This is part 4 of a 6-part series, with new posts Mondays and Thursdays, introducing Microsoft Azure for Spring developers. I couldn't have put this together without input from Microsoft's Asir Vedamuthu Selvasingh, Yitao Dong, Bruno Borges, Brian Benz and Theresa Nguyen. You can find the code for this series on Github. Hit me up on Twitter (@starbuxman) as you're reading the installments with any feedback or questions. You can also learn more about Microsoft Azure in my Spring Tips (@SpringTipsLive) installment, Bootiful Azure.
Azure Service Bus is a cloud messaging-as-a-service and integration technology. It is, just like CosmosDB, as flexible as possible. It supports the AMQP 1.0 protocol, which RabbitMQ can speak as well. AMQP is a flexible wire protocol. In the classic AMQP model (the one AMQP 0-9-1 and RabbitMQ made familiar), the protocol itself includes instructions for administering the broker, beyond just interacting with it. AMQP brokers are ideal for integration because they are language- and platform-agnostic. In such a broker, producers send messages to exchanges, which then route the messages to queues, from which consumers then read the messages. The exchange is responsible for deciding to which queue the message should be sent. It does this in any of a number of ways, but it usually involves looking at a key carried with the message called the routing key.
This indirection between the exchange and the queues makes AMQP a bit more flexible than JMS-based brokers, where producers send messages directly to Destination objects that consumers then read from. This means that producers and consumers are coupled by their choice of Destination. Additionally, JMS is an API for the JVM, not a wire protocol. As such, producers and consumers depend on using compatible versions of the client library. That said, you can also use Azure Service Bus through the JMS API.
Like I said, Azure Service Bus is nothing if not flexible!
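To make the exchange-and-queue indirection described above concrete, here is a minimal in-memory sketch of a "direct" style exchange in plain Java. It is only an illustration under stated assumptions: the class DirectExchangeSketch and its bind and publish methods are hypothetical names invented for this example, and nothing here talks to a real broker or uses any AMQP client library.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical, in-memory illustration of exchange -> queue routing.
class DirectExchangeSketch {

    // routing key -> the queue bound under that key
    private final Map<String, Queue<String>> bindings = new HashMap<>();

    // Binds a queue to this exchange under the given routing key and returns it.
    Queue<String> bind(String routingKey) {
        return bindings.computeIfAbsent(routingKey, k -> new ArrayDeque<>());
    }

    // Routes the message to the queue bound under routingKey.
    // Returns false when no queue is bound for that key.
    boolean publish(String routingKey, String message) {
        Queue<String> queue = bindings.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(message);
        return true;
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        Queue<String> orders = exchange.bind("orders");
        Queue<String> invoices = exchange.bind("invoices");

        // The producer only knows the exchange and a routing key, never the queue.
        exchange.publish("orders", "order #1");
        exchange.publish("invoices", "invoice #1");

        System.out.println(orders.poll());   // prints "order #1"
        System.out.println(invoices.poll()); // prints "invoice #1"
    }
}
```

The producer never holds a reference to a queue, which is the decoupling the comparison with JMS Destination objects is getting at.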
The AMQP model is illustrative because the native model for Azure Service Bus looks a lot like it. In Azure Service Bus you send messages to topics or queues. Messages sent to a topic are then delivered to its subscriptions, from which consumers read. Let's build a simple example that sends and then consumes messages. We won't use AMQP or JMS, just the regular Microsoft Azure Service Bus API.
You'll need to provision a Service Bus namespace, a topic (to which we send messages and from which multiple consumers may listen), and a subscription (a named consumer attached to the topic) to connect to the topic. Here's an example script that does just that.
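#!/usr/bin/env bash
# Usage: pass the name of an existing resource group as the first argument.

destination=messages
topic=${destination}-topic
subscription=${destination}-subscription
namespace=bootiful
rg=$1

# Create the Service Bus namespace.
az servicebus namespace create --resource-group "$rg" \
    --name "${namespace}"

# Create the topic to which we'll send messages.
az servicebus topic create --resource-group "$rg" \
    --namespace-name "${namespace}" \
    --name "${topic}"

# Create the subscription from which we'll read messages.
az servicebus topic subscription create --resource-group "$rg" \
    --namespace-name "${namespace}" --topic-name "${topic}" \
    --name "${subscription}"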
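Before wiring up the real client, it may help to picture what the topic and subscription provisioned above do with a message: each subscription on a topic receives its own copy. The following is a rough in-memory sketch of that fan-out in plain Java. The class TopicFanOutSketch, its methods, and the second subscription name audit-subscription are hypothetical, made up for illustration; the sketch does not use the Azure SDK.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical, in-memory illustration of topic -> subscription fan-out.
class TopicFanOutSketch {

    // subscription name -> messages waiting for that subscription's consumer
    private final Map<String, Queue<String>> subscriptions = new LinkedHashMap<>();

    // Creates (or returns) the named subscription on this topic.
    Queue<String> subscribe(String name) {
        return subscriptions.computeIfAbsent(name, k -> new ArrayDeque<>());
    }

    // Copies the message to every subscription and returns how many received it.
    int send(String message) {
        for (Queue<String> subscription : subscriptions.values()) {
            subscription.add(message);
        }
        return subscriptions.size();
    }

    public static void main(String[] args) {
        TopicFanOutSketch topic = new TopicFanOutSketch();
        Queue<String> messages = topic.subscribe("messages-subscription");
        Queue<String> audit = topic.subscribe("audit-subscription"); // hypothetical second subscription

        topic.send("Hello");

        // Each subscription holds its own copy of the message.
        System.out.println(messages.poll()); // prints "Hello"
        System.out.println(audit.poll());    // prints "Hello"
    }
}
```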
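You'll need a connection string in order to connect your Spring application to the Service Bus namespace. Run this command and note the primaryConnectionString attribute value for later. The command as written assumes a resource group named bootiful, so substitute your own if it differs.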
az servicebus namespace authorization-rule keys list --resource-group bootiful --namespace-name bootiful --name RootManageSharedAccessKey
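The connection string returned by that command is a semicolon-separated list of key=value pairs, typically Endpoint, SharedAccessKeyName and SharedAccessKey. If you want to sanity-check the value before pasting it into your configuration, something like the following plain-Java sketch could do it. The class ConnectionStringSketch and its parse method are hypothetical helpers written for this example (they are not part of the Azure SDK), and the key in the sample string is a made-up placeholder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: splits a Service Bus connection string into its parts.
class ConnectionStringSketch {

    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.trim().isEmpty()) {
                continue; // tolerate stray or trailing semicolons
            }
            // Split on the FIRST '=' only: the base64 key may end in '=' padding.
            int eq = segment.indexOf('=');
            if (eq < 0) {
                throw new IllegalArgumentException("not a key=value segment: " + segment);
            }
            parts.put(segment.substring(0, eq).trim(), segment.substring(eq + 1).trim());
        }
        return parts;
    }

    public static void main(String[] args) {
        // Placeholder value; the key below is fake.
        String sample = "Endpoint=sb://bootiful.servicebus.windows.net/;"
                + "SharedAccessKeyName=RootManageSharedAccessKey;"
                + "SharedAccessKey=ZmFrZS1rZXk=";

        Map<String, String> parts = parse(sample);
        System.out.println(parts.get("Endpoint"));            // prints "sb://bootiful.servicebus.windows.net/"
        System.out.println(parts.get("SharedAccessKeyName")); // prints "RootManageSharedAccessKey"
    }
}
```

Treat the real connection string as a secret: keep it out of source control and out of your logs.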
Add the following dependency to your build: com.microsoft.azure:azure-servicebus-spring-boot-starter.
We'll write two components: a producer and a consumer. In a real application these would naturally live in separate applications and separate processes. Messaging serves to support the integration of disparate applications, after all. We'll look at the consumer first. The consumer needs to register its message handler before anything produces a message, so we'll make these beans ordered: the Spring container will order them one before the other based on the Ordered value we give them.
package com.example.bootifulazure;

import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

@Log4j2
@Component
class ServiceBusConsumer implements Ordered {

    private final ISubscriptionClient iSubscriptionClient;

    ServiceBusConsumer(ISubscriptionClient isc) {
        this.iSubscriptionClient = isc;
    }

    // Register the message handler once the application has fully started.
    @EventListener(ApplicationReadyEvent.class)
    public void consume() throws Exception {
        this.iSubscriptionClient.registerMessageHandler(new IMessageHandler() {

            @Override
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
                // Decode explicitly instead of relying on the platform default charset
                // (assumes the sender encoded the text as UTF-8).
                String body = new String(message.getBody(), StandardCharsets.UTF_8);
                log.info("received message " + body + " with message ID " + message.getMessageId());
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public void notifyException(Throwable exception, ExceptionPhase phase) {
                log.error("eeks!", exception);
            }
        });
    }

    // The consumer should come first.
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}
When a message arrives, we log its body and messageId.
Now, let's look at the producer.
package com.example.bootifulazure;

import com.microsoft.azure.servicebus.ITopicClient;
import com.microsoft.azure.servicebus.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

import java.time.Instant;

@Log4j2
@Component
class ServiceBusProducer implements Ordered {

    private final ITopicClient iTopicClient;

    ServiceBusProducer(ITopicClient iTopicClient) {
        this.iTopicClient = iTopicClient;
    }

    // Send a single message to the topic once the application has fully started.
    @EventListener(ApplicationReadyEvent.class)
    public void produce() throws Exception {
        this.iTopicClient.send(new Message("Hello @ " + Instant.now()));
    }

    // The producer should come last.
    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }
}
Pretty straightforward, right? The meat of the classes is in the consume() and produce() methods. The consumer runs first, then the producer. If you've ever done any work with messaging technologies, you might find the lack of any mention of a destination (the topic or queue) a bit puzzling. That configuration all lives in the properties (such as those in your application.properties file) and is used when auto-configuring the ITopicClient and ISubscriptionClient. If you want to send messages to or consume messages from multiple destinations, simply define the relevant beans yourself and make sure not to specify azure.service-bus.connection-string in your application's properties; otherwise the default Spring Boot auto-configuration will kick in and try to create these beans for you.
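As for the ordering mentioned above, the convention behind the Ordered values is that a lower number means higher precedence: HIGHEST_PRECEDENCE is Integer.MIN_VALUE and LOWEST_PRECEDENCE is Integer.MAX_VALUE. The following plain-Java sketch shows only that sorting convention. It deliberately avoids Spring, so the OrderingSketch class, its Step type, and the two constants are hypothetical stand-ins that mirror the values of Spring's constants rather than the framework's own machinery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of "lower order value runs first".
class OrderingSketch {

    // Stand-ins mirroring the values of Spring's Ordered constants.
    static final int HIGHEST_PRECEDENCE = Integer.MIN_VALUE;
    static final int LOWEST_PRECEDENCE = Integer.MAX_VALUE;

    static class Step {
        final String name;
        final int order;

        Step(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    // Returns the step names sorted by ascending order value.
    static List<String> runOrder(List<Step> steps) {
        List<Step> sorted = new ArrayList<>(steps);
        sorted.sort(Comparator.comparingInt(step -> step.order));
        List<String> names = new ArrayList<>();
        for (Step step : sorted) {
            names.add(step.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Step> steps = Arrays.asList(
                new Step("producer", LOWEST_PRECEDENCE),
                new Step("consumer", HIGHEST_PRECEDENCE));
        System.out.println(runOrder(steps)); // prints "[consumer, producer]"
    }
}
```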