Sample Applications

The Spring AMQP Samples project includes two sample applications. The first is a simple “Hello World” example that demonstrates both synchronous and asynchronous message reception. It provides an excellent starting point for acquiring an understanding of the essential components. The second sample is based on a stock-trading use case to demonstrate the types of interaction that would be common in real-world applications. In this chapter, we provide a quick walk-through of each sample so that you can focus on the most important components. The samples are both Maven-based, so you should be able to import them directly into any Maven-aware IDE (such as SpringSource Tool Suite).

The “Hello World” Sample

The “Hello World” sample demonstrates both synchronous and asynchronous message reception. You can import the spring-rabbit-helloworld sample into the IDE and then follow the discussion below.

Synchronous Example

Within the src/main/java directory, navigate to the org.springframework.amqp.samples.helloworld package. Open the HelloWorldConfiguration class and notice that it carries the @Configuration annotation at the class level and several @Bean annotations at the method level. This is an example of Spring’s Java-based configuration, which is described in the Spring Framework reference documentation.

The following listing shows how the connection factory is created:

@Bean
public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory =
        new CachingConnectionFactory("localhost");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}

The configuration also contains an instance of RabbitAdmin, which, by default, looks for any beans of type Exchange, Queue, or Binding and then declares them on the broker. The helloWorldQueue bean defined in HelloWorldConfiguration is one such bean, because it is an instance of Queue.

The following listing shows the helloWorldQueue bean definition:

@Bean
public Queue helloWorldQueue() {
    return new Queue(this.helloWorldQueueName);
}

Looking at the rabbitTemplate bean configuration in the same class, you can see that it has the name of helloWorldQueue set as both its queue property (for receiving messages) and its routingKey property (for sending messages).

Now that we have explored the configuration, we can look at the code that actually uses these components. First, open the Producer class from within the same package. It contains a main() method where the Spring ApplicationContext is created.

The following listing shows the main method:

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    amqpTemplate.convertAndSend("Hello World");
    System.out.println("Sent: Hello World");
}

In the preceding example, the AmqpTemplate bean is retrieved and used for sending a Message. Since the client code should rely on interfaces whenever possible, the type is AmqpTemplate rather than RabbitTemplate. Even though the bean created in HelloWorldConfiguration is an instance of RabbitTemplate, relying on the interface means that this code is more portable (you can change the configuration independently of the code). Since the convertAndSend() method is invoked, the template delegates to its MessageConverter instance. In this case, it uses the default SimpleMessageConverter, but a different implementation could be provided to the rabbitTemplate bean, as defined in HelloWorldConfiguration.
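To make the conversion step more concrete, the following is a minimal sketch, in plain Java with no Spring dependency, of what a text-oriented converter does on the way out and on the way in. The names TextConversionSketch and WireMessage are hypothetical and exist only for this illustration. The actual SimpleMessageConverter also handles byte arrays and Serializable objects, so treat this as a simplified model rather than the library's implementation.

```java
import java.nio.charset.StandardCharsets;

/** Toy model of text conversion; illustrative only, not the Spring AMQP implementation. */
class TextConversionSketch {

    /** Hypothetical stand-in for an AMQP message: raw bytes plus a content type. */
    static final class WireMessage {
        final byte[] body;
        final String contentType;

        WireMessage(byte[] body, String contentType) {
            this.body = body;
            this.contentType = contentType;
        }
    }

    /** Outbound: encode the String as UTF-8 and label it as plain text. */
    static WireMessage toMessage(String payload) {
        return new WireMessage(payload.getBytes(StandardCharsets.UTF_8), "text/plain");
    }

    /** Inbound: decode text content back to a String; hand anything else over as raw bytes. */
    static Object fromMessage(WireMessage message) {
        if (message.contentType != null && message.contentType.startsWith("text")) {
            return new String(message.body, StandardCharsets.UTF_8);
        }
        return message.body;
    }
}
```

Calling toMessage("Hello World") followed by fromMessage on the result returns the original String, which mirrors what convertAndSend() and receiveAndConvert() do for the Producer and Consumer in this sample.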

Now open the Consumer class. It shares the same configuration base class, which means it shares the rabbitTemplate bean. That is why we configured that template with both a routingKey (for sending) and a queue (for receiving). As described in the AmqpTemplate section, you could instead pass the 'routingKey' argument to the send method and the 'queue' argument to the receive method. The Consumer code is basically a mirror image of the Producer, calling receiveAndConvert() rather than convertAndSend().

The following listing shows the main method for the Consumer:

public static void main(String[] args) {
    ApplicationContext context =
        new AnnotationConfigApplicationContext(HelloWorldConfiguration.class);
    AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
    System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}

If you run the Producer and then run the Consumer, you should see Received: Hello World in the console output.

Asynchronous Example

The preceding section, Synchronous Example, walked through the synchronous Hello World sample. This section describes a slightly more advanced but significantly more powerful option. With a few modifications, the Hello World sample can provide an example of asynchronous reception, also known as message-driven POJOs. In fact, there is a sub-package that provides exactly that: org.springframework.amqp.samples.helloworld.async.

Again, we start with the sending side. Open the ProducerConfiguration class and notice that it creates a connectionFactory and a rabbitTemplate bean. This time, since the configuration is dedicated to the message sending side, we do not even need any queue definitions, and the RabbitTemplate has only the 'routingKey' property set. Recall that messages are sent to an exchange rather than being sent directly to a queue. The AMQP default exchange is a direct exchange with no name. All queues are bound to that default exchange with their name as the routing key. That is why we only need to provide the routing key here.

The following listing shows the rabbitTemplate definition:

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(this.helloWorldQueueName);
    return template;
}
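
The default-exchange behavior described above (every queue is bound under its own name, so the routing key alone selects the queue) can be modeled in a few lines of plain Java. This is a toy in-memory sketch, not Spring AMQP or broker code; the class DefaultExchangeSketch and its methods are hypothetical names introduced only for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of the AMQP default exchange; not Spring AMQP code. */
class DefaultExchangeSketch {

    // Each declared queue is implicitly bound under its own name.
    private final Map<String, Queue<String>> queuesByName = new HashMap<>();

    void declareQueue(String name) {
        queuesByName.putIfAbsent(name, new ArrayDeque<>());
    }

    /** Routes to the queue whose name equals the routing key; returns false if none exists. */
    boolean publish(String routingKey, String body) {
        Queue<String> queue = queuesByName.get(routingKey);
        if (queue == null) {
            return false; // unroutable: no queue has that name
        }
        queue.add(body);
        return true;
    }

    /** Returns the next message from the named queue, or null if it is empty or unknown. */
    String receive(String queueName) {
        Queue<String> queue = queuesByName.get(queueName);
        return queue == null ? null : queue.poll();
    }
}
```

In this model, publishing with the routing key set to the queue name is all the sender needs, which is why the producer-side RabbitTemplate in the sample sets only the routing key.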

Since this sample demonstrates asynchronous message reception, the producing side is designed to continuously send messages (if it were a message-per-execution model like the synchronous version, it would not be quite so obvious that it is, in fact, a message-driven consumer). The component responsible for continuously sending messages is defined as an inner class within the ProducerConfiguration. It is configured to run every three seconds.

The following listing shows the component:

static class ScheduledProducer {

    @Autowired
    private volatile RabbitTemplate rabbitTemplate;

    private final AtomicInteger counter = new AtomicInteger();

    @Scheduled(fixedRate = 3000)
    public void sendMessage() {
        rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
    }
}

You do not need to understand all of the details, since the real focus should be on the receiving side (which we cover next). However, if you are not yet familiar with Spring task scheduling support, you can learn more in the Spring Framework reference documentation. The short story is that the postProcessor bean in the ProducerConfiguration registers the task with a scheduler.
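
For readers who want a rough mental model of fixed-rate scheduling without any Spring machinery, the following sketch uses the JDK's ScheduledExecutorService to run a numbered "send" at a fixed period. It is only an analogy for what a scheduler does with a fixed-rate task. The class FixedRateProducerSketch is a hypothetical name, and the messages are collected in a list instead of being sent to a broker.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** JDK-only analogy for a fixed-rate producer; no broker or Spring involved. */
class FixedRateProducerSketch {

    /** Produces numbered messages at a fixed rate until `count` exist, then returns them. */
    static List<String> produce(long periodMillis, int count) {
        List<String> sent = new CopyOnWriteArrayList<>();
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.scheduleAtFixedRate(() -> {
                if (done.getCount() > 0) { // ignore ticks after the target is reached
                    sent.add("Hello World " + counter.incrementAndGet());
                    done.countDown();
                }
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            done.await(); // block until `count` messages have been produced
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            scheduler.shutdownNow();
        }
        return sent;
    }
}
```

Calling produce(3000, 3) would take about six seconds and yield three messages numbered 1 to 3, in the same spirit as the sample's three-second interval.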

Now we can turn to the receiving side. To emphasize the message-driven POJO behavior, we start with the component that reacts to the messages. The class is called HelloWorldHandler and is shown in the following listing:

public class HelloWorldHandler {

    public void handleMessage(String text) {
        System.out.println("Received: " + text);
    }

}

That class is a POJO. It does not extend any base class, it does not implement any interfaces, and it does not even contain any imports. It is being “adapted” to the MessageListener interface by the Spring AMQP MessageListenerAdapter. You can then configure that adapter on a SimpleMessageListenerContainer. For this sample, the container is created in the ConsumerConfiguration class. You can see the POJO wrapped in the adapter there.
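
To illustrate the idea of adapting a POJO, the following plain-Java sketch shows one way an adapter could invoke a handler method reflectively, so that the handler itself needs no framework types. This is an assumption-laden simplification: PojoListenerAdapterSketch and RecordingHandler are hypothetical names, and the real MessageListenerAdapter does considerably more (message conversion, reply handling, configurable method names).

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a listener adapter; not the Spring AMQP implementation. */
class PojoListenerAdapterSketch {

    private final Object delegate;
    private final String methodName;

    PojoListenerAdapterSketch(Object delegate, String methodName) {
        this.delegate = delegate;
        this.methodName = methodName;
    }

    /** Invokes the delegate's public handler method with a String payload; returns null for void. */
    Object onMessage(String convertedPayload) {
        try {
            Method handler = delegate.getClass().getMethod(methodName, String.class);
            return handler.invoke(delegate, convertedPayload);
        } catch (NoSuchMethodException | IllegalAccessException e) {
            throw new IllegalStateException("No accessible handler method: " + methodName, e);
        } catch (InvocationTargetException e) {
            throw new IllegalStateException("Handler threw an exception", e.getCause());
        }
    }
}

/** A POJO with no framework types, in the spirit of HelloWorldHandler. */
class RecordingHandler {

    final List<String> received = new ArrayList<>();

    public void handleMessage(String text) {
        received.add(text);
    }
}
```

The handler only declares a public method with a String parameter. Everything that knows about message delivery lives in the adapter, which is the separation this sample is meant to highlight.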

The following listing shows how the listenerContainer is defined:

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames(this.helloWorldQueueName);
    container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
    return container;
}

The SimpleMessageListenerContainer is a Spring lifecycle component and, by default, starts automatically. If you look in the Consumer class, you can see that its main() method consists of nothing more than a one-line bootstrap to create the ApplicationContext. The Producer’s main() method is also a one-line bootstrap, since the component whose method is annotated with @Scheduled also starts automatically. You can start the Producer and Consumer in any order, and you should see messages being sent and received every three seconds.

Stock Trading

The Stock Trading sample demonstrates more advanced messaging scenarios than the Hello World sample. However, the configuration is very similar, if a bit more involved. Since we walked through the Hello World configuration in detail, here, we focus on what makes this sample different. There is a server that pushes market data (stock quotations) to a topic exchange. Then, clients can subscribe to the market data feed by binding a queue with a routing pattern (for example, app.stock.quotes.nasdaq.*). The other main feature of this demo is a request-reply “stock trade” interaction that is initiated by the client and handled by the server. That involves a private replyTo queue that is sent by the client within the order request message itself.

The server’s core configuration is in the RabbitServerConfiguration class within the org.springframework.amqp.rabbit.stocks.config.server package. It extends the AbstractStockAppRabbitConfiguration. That is where the resources common to the server and client are defined, including the market data topic exchange (whose name is 'app.stock.marketdata') and the queue that the server exposes for stock trades (whose name is 'app.stock.request'). In that common configuration file, you also see that a Jackson2JsonMessageConverter is configured on the RabbitTemplate.

The server-specific configuration consists of two things. First, it configures the market data exchange on the RabbitTemplate so that it does not need to provide that exchange name with every call to send a Message. It does this within an abstract callback method defined in the base configuration class. The following listing shows that method:

public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
    rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}

Second, the stock request queue is declared. It does not require any explicit bindings in this case, because it is bound to the default no-name exchange with its own name as the routing key. As mentioned earlier, the AMQP specification defines that behavior. The following listing shows the definition of the stockRequestQueue bean:

@Bean
public Queue stockRequestQueue() {
    return new Queue(STOCK_REQUEST_QUEUE_NAME);
}

Now that you have seen the configuration of the server’s AMQP resources, navigate to the org.springframework.amqp.rabbit.stocks package under the src/test/java directory. There, you can see the actual Server class that provides a main() method. It creates an ApplicationContext based on the server-bootstrap.xml config file, which defines the scheduled task that publishes dummy market data. That configuration relies upon Spring’s task namespace support. The bootstrap config file also imports a few other files. The most interesting one is server-messaging.xml, which is directly under src/main/resources. It defines the messageListenerContainer bean that is responsible for handling the stock trade requests. Finally, have a look at the serverHandler bean that is defined in server-handlers.xml (which is also in src/main/resources). That bean is an instance of the ServerHandler class and is a good example of a message-driven POJO that can also send reply messages. Notice that it is not itself coupled to the framework or any of the AMQP concepts. It accepts a TradeRequest and returns a TradeResponse. The following listing shows the definition of the handleMessage method:

public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}

Now that we have seen the most important configuration and code for the server, we can turn to the client. The best starting point is probably RabbitClientConfiguration, in the org.springframework.amqp.rabbit.stocks.config.client package. Notice that it declares two queues without providing explicit names. The following listing shows the bean definitions for the two queues:

@Bean
public Queue marketDataQueue() {
    return amqpAdmin().declareQueue();
}

@Bean
public Queue traderJoeQueue() {
    return amqpAdmin().declareQueue();
}

Those are private queues, and unique names are generated automatically. The first generated queue is used by the client to bind to the market data exchange that has been exposed by the server. Recall that, in AMQP, consumers interact with queues while producers interact with exchanges. The “binding” of queues to exchanges is what tells the broker to deliver (or route) messages from a given exchange to a queue. Since the market data exchange is a topic exchange, the binding can be expressed with a routing pattern. The RabbitClientConfiguration does so with a Binding object, and that object is generated with the BindingBuilder fluent API. The following listing shows the Binding:

@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;

@Bean
public Binding marketDataBinding() {
    return BindingBuilder.bind(
        marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}

Notice that the actual value has been externalized in a properties file (client.properties under src/main/resources), and that we use Spring’s @Value annotation to inject that value. This is generally a good idea. Otherwise, the value would have been hardcoded in a class and unmodifiable without recompilation. In this case, it is much easier to run multiple versions of the client while making changes to the routing pattern used for binding. We can try that now.

Start by running org.springframework.amqp.rabbit.stocks.Server and then org.springframework.amqp.rabbit.stocks.Client. You should see dummy quotations for NASDAQ stocks, because the current value associated with the 'stocks.quote.pattern' key in client.properties is 'app.stock.quotes.nasdaq.*'. Now, while keeping the existing Server and Client running, change that property value to 'app.stock.quotes.nyse.*' and start a second Client instance. You should see that the first client still receives NASDAQ quotes while the second client receives NYSE quotes. You could instead change the pattern to get all stocks or even an individual ticker.
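
To see why a pattern such as 'app.stock.quotes.nasdaq.*' selects only NASDAQ quotes, it can help to look at how topic patterns are matched. In AMQP topic exchanges, routing keys are dot-separated words, '*' matches exactly one word, and '#' matches zero or more words. The following plain-Java sketch implements that rule. TopicPatternSketch is a hypothetical name, and routing keys of the form 'app.stock.quotes.<exchange>.<ticker>' are assumed here for illustration. A real broker performs this matching internally.

```java
/** Toy matcher for AMQP-style topic patterns; illustrative only. */
class TopicPatternSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length; // pattern exhausted: the key must be exhausted too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation point
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // words remain in the pattern but none in the key
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && matches(pattern, p + 1, key, k + 1);
    }
}
```

Under these assumptions, 'app.stock.quotes.#' would select every quote, and a pattern such as 'app.stock.quotes.*.IBM' would select a single ticker regardless of the exchange.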

The final feature we explore is the request-reply interaction from the client’s perspective. Recall that we have already seen the ServerHandler that accepts TradeRequest objects and returns TradeResponse objects. The corresponding code on the Client side is RabbitStockServiceGateway in the org.springframework.amqp.rabbit.stocks.gateway package. It delegates to the RabbitTemplate in order to send messages. The following listing shows the send method:

public void send(TradeRequest tradeRequest) {
    getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
            try {
                message.getMessageProperties().setCorrelationId(
                    UUID.randomUUID().toString().getBytes("UTF-8"));
            }
            catch (UnsupportedEncodingException e) {
                throw new AmqpException(e);
            }
            return message;
        }
    });
}

Notice that, prior to sending the message, it sets the replyTo address. It provides the queue that was generated by the traderJoeQueue bean definition (shown earlier). The following listing shows the @Bean definition for the StockServiceGateway class itself:

@Bean
public StockServiceGateway stockServiceGateway() {
    RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
    gateway.setRabbitTemplate(rabbitTemplate());
    gateway.setDefaultReplyToQueue(traderJoeQueue());
    return gateway;
}

If you are no longer running the server and client, start them now. Try sending a request with the format of '100 TCKR'. After a brief artificial delay that simulates “processing” of the request, you should see a confirmation message appear on the client.
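
The request-reply flow can also be pictured without a broker. The following plain-Java sketch models the two properties that make it work: the replyTo address, which tells the server where to send its answer, and the correlation ID, which lets the client match a reply to its request. RequestReplySketch, Envelope, the reply queue name, and the "Confirmed:" reply text are all hypothetical and chosen only for illustration. This is not how the sample's gateway or ServerHandler is implemented.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

/** Toy in-memory model of request-reply over named queues; illustrative only. */
class RequestReplySketch {

    /** Minimal envelope: a body plus the replyTo and correlationId properties. */
    static final class Envelope {
        final String body;
        final String replyTo;
        final String correlationId;

        Envelope(String body, String replyTo, String correlationId) {
            this.body = body;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
        }
    }

    private final Map<String, Queue<Envelope>> queues = new HashMap<>();

    Queue<Envelope> queue(String name) {
        return queues.computeIfAbsent(name, n -> new ArrayDeque<>());
    }

    /** Client side: tag the request with a private reply queue and a fresh correlation ID. */
    String sendRequest(String requestQueue, String replyQueue, String body) {
        String correlationId = UUID.randomUUID().toString();
        queue(requestQueue).add(new Envelope(body, replyQueue, correlationId));
        return correlationId;
    }

    /** Server side: handle one pending request and reply to the queue named in the request. */
    boolean serveOne(String requestQueue) {
        Envelope request = queue(requestQueue).poll();
        if (request == null) {
            return false; // nothing to process
        }
        Envelope reply = new Envelope("Confirmed: " + request.body, null, request.correlationId);
        queue(request.replyTo).add(reply);
        return true;
    }
}
```

Note that the server never needs to know the client's queue in advance. It reads the destination from each request, which is what allows every client to use its own private, automatically named reply queue.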

Receiving JSON from Non-Spring Applications

Spring applications, when sending JSON, set the __TypeId__ header to the fully qualified class name to assist the receiving application in converting the JSON back to a Java object.

The spring-rabbit-json sample explores several techniques to convert the JSON from a non-Spring application.