The Spring AMQP Samples project includes two sample applications. The first is a simple "Hello World" example that demonstrates both synchronous and asynchronous message reception. It provides an excellent starting point for understanding the essential components. The second sample is based on a stock-trading use case and demonstrates the types of interaction that are common in real-world applications. In this chapter, we provide a quick walk-through of each sample so that you can focus on the most important components. Both samples are Maven-based, so you should be able to import them directly into any Maven-aware IDE (such as SpringSource Tool Suite).
The Hello World sample demonstrates both synchronous and asynchronous message reception. You can import the 'spring-rabbit-helloworld' sample into the IDE and then follow the discussion below.
Within the 'src/main/java' directory, navigate to the 'org.springframework.amqp.helloworld' package. Open the HelloWorldConfiguration class and notice that it contains the @Configuration annotation at the class level and some @Bean annotations at the method level. This is an example of Spring's Java-based configuration, which is described in the Spring Framework reference documentation.
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }
The configuration also contains an instance of RabbitAdmin, which by default looks for any beans of type Exchange, Queue, or Binding and then declares them on the broker. In fact, the "helloWorldQueue" bean that is generated in HelloWorldConfiguration is an example simply because it is an instance of Queue.
    @Bean
    public Queue helloWorldQueue() {
        return new Queue(this.helloWorldQueueName);
    }
Looking back at the "rabbitTemplate" bean configuration, you will see that it has the helloWorldQueue's name set as its "queue" property (for receiving Messages) and for its "routingKey" property (for sending Messages).
Now that we've explored the configuration, let's look at the code that actually uses these components. First, open the Producer class from within the same package. It contains a main() method where the Spring ApplicationContext is created.
    public static void main(String[] args) {
        // Bootstrap the context from the configuration class discussed above.
        ApplicationContext context = new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
        AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
        amqpTemplate.convertAndSend("Hello World");
        System.out.println("Sent: Hello World");
    }
As you can see in the example above, the AmqpTemplate bean is retrieved and used for sending a Message. Since the client code should rely on interfaces whenever possible, the type is AmqpTemplate rather than RabbitTemplate. Even though the bean created in HelloWorldConfiguration is an instance of RabbitTemplate, relying on the interface means that this code is more portable (the configuration can be changed independently of the code). Since the convertAndSend() method is invoked, the template will be delegating to its MessageConverter instance. In this case, it's using the default SimpleMessageConverter, but a different implementation could be provided to the "rabbitTemplate" bean as defined in HelloWorldConfiguration.
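To make the conversion step concrete, here is a minimal sketch of what a converter does on each side of the wire. It is a simplified stand-in written against the JDK only, not Spring AMQP's SimpleMessageConverter; the TextConverterSketch and WireMessage names and the content-type labels are assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;

/** Simplified stand-in for a message converter; not Spring AMQP's SimpleMessageConverter. */
final class TextConverterSketch {

    /** Hypothetical wire format: raw bytes plus a content-type label. */
    static final class WireMessage {
        final byte[] body;
        final String contentType;

        WireMessage(byte[] body, String contentType) {
            this.body = body;
            this.contentType = contentType;
        }
    }

    private TextConverterSketch() {
    }

    /** Outbound: turn a payload into bytes, as a convertAndSend-style call would before sending. */
    static WireMessage toMessage(Object payload) {
        if (payload instanceof String) {
            byte[] bytes = ((String) payload).getBytes(StandardCharsets.UTF_8);
            return new WireMessage(bytes, "text/plain");
        }
        if (payload instanceof byte[]) {
            return new WireMessage((byte[]) payload, "application/octet-stream");
        }
        String type = payload == null ? "null" : payload.getClass().getName();
        throw new IllegalArgumentException("Unsupported payload type: " + type);
    }

    /** Inbound: turn received bytes back into an object, as a receiveAndConvert-style call would. */
    static Object fromMessage(WireMessage message) {
        if ("text/plain".equals(message.contentType)) {
            return new String(message.body, StandardCharsets.UTF_8);
        }
        return message.body;
    }

    public static void main(String[] args) {
        WireMessage wire = toMessage("Hello World");
        System.out.println("Received: " + fromMessage(wire));
    }
}
```

Because the calling code only ever sees the payload object, swapping in a different converter changes the bytes on the wire without touching the Producer or Consumer.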
Now open the Consumer class. It shares the same configuration base class, which means it also shares the "rabbitTemplate" bean. That's why we configured that template with both a "routingKey" (for sending) and a "queue" (for receiving). As you saw in Section 3.3, “AmqpTemplate”, you could instead pass the 'routingKey' argument to the send method and the 'queue' argument to the receive method. The Consumer code is basically a mirror image of the Producer, calling receiveAndConvert() rather than convertAndSend().
    public static void main(String[] args) {
        // Same configuration class as the Producer, so the same "rabbitTemplate" bean is used.
        ApplicationContext context = new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
        AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
        System.out.println("Received: " + amqpTemplate.receiveAndConvert());
    }
If you run the Producer, and then run the Consumer, you should see the message "Received: Hello World" in the console output.
Now that we've walked through the synchronous Hello World sample, it's time to move on to a slightly more advanced but significantly more powerful option. With a few modifications, the Hello World sample can provide an example of asynchronous reception, a.k.a. Message-driven POJOs. In fact, there is a sub-package that provides exactly that: org.springframework.amqp.helloworld.async.
Once again, we will start with the sending side. Open the ProducerConfiguration class and notice that it creates a "connectionFactory" and "rabbitTemplate" bean. This time, since the configuration is dedicated to the message sending side, we don't even need any Queue definitions, and the RabbitTemplate only has the 'routingKey' property set. Recall that messages are sent to an Exchange rather than being sent directly to a Queue. The AMQP default Exchange is a direct Exchange with no name. All Queues are bound to that default Exchange with their name as the routing key. That is why we only need to provide the routing key here.
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        // No exchange is set, so messages go to the default exchange with this routing key.
        template.setRoutingKey(this.helloWorldQueueName);
        return template;
    }
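The default-exchange rule described above can be modelled in a few lines. The following is a toy in-memory sketch, not a broker or a Spring AMQP API: the DefaultExchangeSketch class and the "hello.world.queue" name are hypothetical, and the only behavior it illustrates is that a routing key equal to a queue's name reaches that queue.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of the default exchange; not a broker or a client API. */
final class DefaultExchangeSketch {

    private final Map<String, Queue<String>> queues = new HashMap<>();

    /** Declaring a queue implicitly binds it under its own name. */
    void declareQueue(String name) {
        queues.computeIfAbsent(name, key -> new ArrayDeque<>());
    }

    /** Publishes with a routing key; returns false when no queue has that name. */
    boolean publish(String routingKey, String body) {
        Queue<String> target = queues.get(routingKey);
        if (target == null) {
            return false;
        }
        target.add(body);
        return true;
    }

    /** Returns the oldest message in the named queue, or null if there is none. */
    String receive(String queueName) {
        Queue<String> source = queues.get(queueName);
        return source == null ? null : source.poll();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("hello.world.queue");
        broker.publish("hello.world.queue", "Hello World 1");
        System.out.println("Received: " + broker.receive("hello.world.queue"));
    }
}
```

In this model the sender never names a queue directly; it only supplies a routing key, which is why the producer-side template needs nothing more than that key.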
Since this sample will be demonstrating asynchronous message reception, the producing side is designed to continuously send messages (if it were a message-per-execution model like the synchronous version, it would not be quite so obvious that it is in fact a message-driven consumer). The component responsible for sending messages continuously is defined as an inner class within the ProducerConfiguration. It is configured to execute every 3 seconds.
    static class ScheduledProducer {

        @Autowired
        private volatile RabbitTemplate rabbitTemplate;

        private final AtomicInteger counter = new AtomicInteger();

        @Scheduled(fixedRate = 3000)
        public void sendMessage() {
            rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
        }
    }
You don't need to understand all of the details, since the real focus should be on the receiving side (which we will cover momentarily). However, if you are not yet familiar with Spring 3.0 task scheduling support, you can learn more in the Spring Framework reference documentation. The short story is that the "postProcessor" bean in the ProducerConfiguration registers the task with a scheduler.
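For readers who want a feel for fixed-rate execution without the Spring scheduling support, the following sketch uses the JDK's ScheduledExecutorService instead of @Scheduled. It is an illustration under that substitution, not how the sample is wired; the FixedRateProducerSketch class and its produce method are hypothetical, and messages are collected in a list rather than sent to a broker.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** JDK-only illustration of a fixed-rate producer; the sample itself uses @Scheduled. */
final class FixedRateProducerSketch {

    private FixedRateProducerSketch() {
    }

    /** Emits {@code count} numbered messages, one per period, and returns them in order. */
    static List<String> produce(int count, long periodMillis) {
        List<String> sent = new CopyOnWriteArrayList<>();
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            // A single scheduler thread runs this task, so the check and the add cannot interleave.
            if (done.getCount() > 0) {
                sent.add("Hello World " + counter.incrementAndGet());
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return sent;
    }

    public static void main(String[] args) {
        for (String message : produce(3, 100)) {
            System.out.println("Sent: " + message);
        }
    }
}
```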
Now, let's turn to the receiving side. To emphasize the Message-driven POJO behavior, we will start with the component that reacts to the messages. The class is called HelloWorldHandler.
    public class HelloWorldHandler {

        public void handleMessage(String text) {
            System.out.println("Received: " + text);
        }
    }
Clearly, that is a POJO. It does not extend any base class, it doesn't implement any interfaces, and it doesn't even contain any imports. It is being "adapted" to the MessageListener interface by the Spring AMQP MessageListenerAdapter. That adapter can then be configured on a SimpleMessageListenerContainer. For this sample, the container is created in the ConsumerConfiguration class. You can see the POJO wrapped in the adapter there.
    @Bean
    public SimpleMessageListenerContainer listenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueName(this.helloWorldQueueName);
        container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
        return container;
    }
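The idea of adapting a POJO to a listener can be sketched with plain reflection. The code below is a hypothetical illustration of the adapter pattern, not the implementation of Spring AMQP's MessageListenerAdapter: PojoListenerAdapterSketch and RecordingHandlerSketch are invented names, and the adapter assumes the body is UTF-8 text and that the delegate exposes a public handleMessage(String) method.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical adapter that forwards raw message bodies to a POJO's handleMessage(String). */
final class PojoListenerAdapterSketch {

    private final Object delegate;
    private final Method handlerMethod;

    PojoListenerAdapterSketch(Object delegate) {
        this.delegate = delegate;
        this.handlerMethod = findHandler(delegate);
    }

    /** Looks the handler up by name so the POJO needs no framework interface. */
    private static Method findHandler(Object delegate) {
        try {
            Method method = delegate.getClass().getMethod("handleMessage", String.class);
            method.setAccessible(true);
            return method;
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Delegate needs a public handleMessage(String)", e);
        }
    }

    /** Converts the body to text and invokes the POJO reflectively. */
    void onMessage(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        try {
            handlerMethod.invoke(delegate, text);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Handler invocation failed", e);
        }
    }

    public static void main(String[] args) {
        RecordingHandlerSketch handler = new RecordingHandlerSketch();
        PojoListenerAdapterSketch adapter = new PojoListenerAdapterSketch(handler);
        adapter.onMessage("Hello World 1".getBytes(StandardCharsets.UTF_8));
        System.out.println("Received: " + handler.received);
    }
}

/** Plain handler with no framework types; it records what it receives so the result can be inspected. */
class RecordingHandlerSketch {

    final List<String> received = new ArrayList<>();

    public void handleMessage(String text) {
        received.add(text);
    }
}
```

The handler stays free of messaging types, so it can be unit tested by calling handleMessage directly.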
The SimpleMessageListenerContainer is a Spring lifecycle component and will start automatically by default. If you look in the Consumer class, you will see that its main() method consists of nothing more than a one-line bootstrap to create the ApplicationContext. The Producer's main() method is also a one-line bootstrap, since the component whose method is annotated with @Scheduled will also start executing automatically. You can start the Producer and Consumer in any order, and you should see messages being sent and received every 3 seconds.
The Stock Trading sample demonstrates more advanced messaging scenarios than the Hello World sample. However, the configuration is very similar, just a bit more involved. Since we've walked through the Hello World configuration in detail, here we'll focus on what makes this sample different. There is a server that pushes market data (stock quotes) to a Topic Exchange. Then, clients can subscribe to the market data feed by binding a Queue with a routing pattern (e.g. "app.stock.quotes.nasdaq.*"). The other main feature of this demo is a request-reply "stock trade" interaction that is initiated by the client and handled by the server. That involves a private "replyTo" Queue whose address the client sends within the order request Message itself.
The Server's core configuration is in the RabbitServerConfiguration class within the org.springframework.amqp.rabbit.stocks.config.server package. It extends the AbstractStockAppRabbitConfiguration. That is where the resources common to the Server and Client(s) are defined, including the market data Topic Exchange (whose name is 'app.stock.marketdata') and the Queue that the Server exposes for stock trades (whose name is 'app.stock.request'). In that common configuration file, you will also see that a JsonMessageConverter is configured on the RabbitTemplate.
The Server-specific configuration consists of two things. First, it configures the market data exchange on the RabbitTemplate so that the exchange name does not need to be provided with every call to send a Message. It does this within an abstract callback method defined in the base configuration class.
    public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
        rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
    }
Secondly, the stock request queue is declared. It does not require any explicit bindings in this case, because it will be bound to the default no-name exchange with its own name as the routing key. As mentioned earlier, the AMQP specification defines that behavior.
    @Bean
    public Queue stockRequestQueue() {
        return new Queue(STOCK_REQUEST_QUEUE_NAME);
    }
Now that you've seen the configuration of the Server's AMQP resources, navigate to the 'org.springframework.amqp.rabbit.stocks' package under the 'src/test/java' directory. There you will see the actual Server class that provides a main() method. It creates an ApplicationContext based on the 'server-bootstrap.xml' config file. In there you will see the scheduled task that publishes dummy market data. That configuration relies upon Spring 3.0's "task" namespace support. The bootstrap config file also imports a few other files. The most interesting one is 'server-messaging.xml', which is directly under 'src/main/resources'. In there you will see the "messageListenerContainer" bean that is responsible for handling the stock trade requests. Finally, have a look at the "serverHandler" bean that is defined in 'server-handlers.xml' (also in 'src/main/resources'). That bean is an instance of the ServerHandler class and is a good example of a Message-driven POJO that is also capable of sending reply Messages. Notice that it is not itself coupled to the framework or any of the AMQP concepts. It simply accepts a TradeRequest and returns a TradeResponse.
public TradeResponse handleMessage(TradeRequest tradeRequest) { ... }
Now that we've seen the most important configuration and code for the Server, let's turn to the Client. The best starting point is probably RabbitClientConfiguration within the 'org.springframework.amqp.rabbit.stocks.config.client' package. Notice that it declares two queues without providing explicit names.
    @Bean
    public Queue marketDataQueue() {
        return amqpAdmin().declareQueue();
    }

    @Bean
    public Queue traderJoeQueue() {
        return amqpAdmin().declareQueue();
    }
Those are private queues, and unique names will be generated automatically. The first generated queue is used by the Client to bind to the market data exchange that has been exposed by the Server. Recall that in AMQP, consumers interact with Queues while producers interact with Exchanges. The "binding" of Queues to Exchanges is what instructs the broker to deliver, or route, messages from a given Exchange to a Queue. Since the market data exchange is a Topic Exchange, the binding can be expressed with a routing pattern. The RabbitClientConfiguration declares that with a Binding object, and that object is generated with the BindingBuilder's fluent API.
    @Value("${stocks.quote.pattern}")
    private String marketDataRoutingKey;

    @Bean
    public Binding marketDataBinding() {
        return BindingBuilder.bind(marketDataQueue())
                .to(marketDataExchange())
                .with(marketDataRoutingKey);
    }
Notice that the actual value has been externalized in a properties file ("client.properties" under src/main/resources), and that we are using Spring's @Value annotation to inject that value. This is generally a good idea, since otherwise the value would have been hardcoded in a class and unmodifiable without recompilation. In this case, it makes it much easier to run multiple versions of the Client while making changes to the routing pattern used for binding. Let's try that now.
Start by running org.springframework.amqp.rabbit.stocks.Server and then org.springframework.amqp.rabbit.stocks.Client. You should see dummy quotes for NASDAQ stocks because the current value associated with the 'stocks.quote.pattern' key in client.properties is 'app.stock.quotes.nasdaq.*'. Now, while keeping the existing Server and Client running, change that property value to 'app.stock.quotes.nyse.*' and start a second Client instance. You should see that the first client is still receiving NASDAQ quotes while the second client receives NYSE quotes. You could instead change the pattern to get all stocks or even an individual ticker.
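To see why different patterns select different quotes, it helps to model how a topic pattern is matched against a routing key. In AMQP topic matching, '*' stands for exactly one dot-separated word and '#' stands for zero or more words. The sketch below is a simplified model written for illustration, not the broker's implementation; the TopicPatternSketch class is hypothetical, and the example routing keys assume the quotes are published as "app.stock.quotes.<exchange>.<ticker>", which is inferred from the patterns above rather than stated by the sample.

```java
/** Simplified model of topic-exchange pattern matching; not the broker's implementation. */
final class TopicPatternSketch {

    private TopicPatternSketch() {
    }

    /** Returns true when the dot-separated routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(words(pattern), 0, words(routingKey), 0);
    }

    private static String[] words(String dotted) {
        // A limit of -1 keeps trailing empty words; an empty string means zero words.
        return dotted.isEmpty() ? new String[0] : dotted.split("\\.", -1);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false;
        }
        // '*' stands for exactly one word; anything else must match literally.
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && matches(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        String nasdaqQuote = "app.stock.quotes.nasdaq.TCKR"; // hypothetical routing key
        System.out.println(matches("app.stock.quotes.nasdaq.*", nasdaqQuote)); // true
        System.out.println(matches("app.stock.quotes.nyse.*", nasdaqQuote));   // false
        System.out.println(matches("app.stock.quotes.#", nasdaqQuote));        // true
    }
}
```

Under the assumed key layout, "app.stock.quotes.#" would select every quote, while a pattern with no wildcards at all would select a single ticker.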
The final feature we'll explore is the request-reply interaction from the Client's perspective. Recall that we have already seen the ServerHandler that is accepting TradeRequest objects and returning TradeResponse objects. The corresponding code on the Client side is RabbitStockServiceGateway in the 'org.springframework.amqp.rabbit.stocks.gateway' package. It delegates to the RabbitTemplate in order to send Messages.
    public void send(TradeRequest tradeRequest) {
        getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
                try {
                    message.getMessageProperties().setCorrelationId(
                            UUID.randomUUID().toString().getBytes("UTF-8"));
                }
                catch (UnsupportedEncodingException e) {
                    throw new AmqpException(e);
                }
                return message;
            }
        });
    }
Notice that prior to sending the message, it sets the "replyTo" address. It's providing the queue that was generated by the "traderJoeQueue" bean definition shown above. Here's the @Bean definition for the StockServiceGateway class itself.
    @Bean
    public StockServiceGateway stockServiceGateway() {
        RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
        gateway.setRabbitTemplate(rabbitTemplate());
        gateway.setDefaultReplyToQueue(traderJoeQueue());
        return gateway;
    }
If you are no longer running the Server and Client, start them now. Try sending a request with the format of '100 TCKR'. After a brief artificial delay that simulates "processing" of the request, you should see a confirmation message appear on the Client.
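The request-reply flow can be summarized with a small in-memory model: the client tags each request with a reply address and a correlation id, and the server copies the id onto the reply it routes back. The following is a sketch under those assumptions, not the sample's code. RequestReplySketch, Envelope, the "trader.joe.reply" queue name, and the "Confirmed:" reply text are all hypothetical; only the 'app.stock.request' queue name and the '100 TCKR' request format come from the text above.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

/** Toy in-memory model of request-reply messaging; not the sample's gateway or handler. */
final class RequestReplySketch {

    /** A message body plus the two properties that the request-reply exchange relies on. */
    static final class Envelope {
        final String body;
        final String replyTo;
        final String correlationId;

        Envelope(String body, String replyTo, String correlationId) {
            this.body = body;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
        }
    }

    private final Map<String, Queue<Envelope>> queues = new HashMap<>();

    private Queue<Envelope> queue(String name) {
        return queues.computeIfAbsent(name, key -> new ArrayDeque<>());
    }

    /** Client side: tag the request with a reply address and a fresh correlation id. */
    String sendRequest(String requestQueue, String replyQueue, String body) {
        String correlationId = UUID.randomUUID().toString();
        queue(requestQueue).add(new Envelope(body, replyQueue, correlationId));
        return correlationId;
    }

    /** Server side: handle one pending request and route the reply to its replyTo queue. */
    boolean serveOne(String requestQueue) {
        Envelope request = queue(requestQueue).poll();
        if (request == null) {
            return false;
        }
        String reply = "Confirmed: " + request.body;
        queue(request.replyTo).add(new Envelope(reply, null, request.correlationId));
        return true;
    }

    /** Client side: take the next reply, or null if none has arrived yet. */
    Envelope receiveReply(String replyQueue) {
        return queue(replyQueue).poll();
    }

    public static void main(String[] args) {
        RequestReplySketch broker = new RequestReplySketch();
        String id = broker.sendRequest("app.stock.request", "trader.joe.reply", "100 TCKR");
        broker.serveOne("app.stock.request");
        Envelope reply = broker.receiveReply("trader.joe.reply");
        System.out.println(reply.body + " (correlated: " + id.equals(reply.correlationId) + ")");
    }
}
```

The correlation id is what lets a client with several requests in flight pair each confirmation with the request that caused it.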