In this section, we will review a sample application that is included in the Spring Integration distribution. This sample is inspired by one of the samples featured in Gregor Hohpe's Ramblings.
The domain is that of a Cafe, and the basic flow is depicted in the following diagram:
The DrinkOrder
object may contain multiple Drinks
. Once the order
is placed, a Splitter will break the composite order message into a single message per
drink. Each of these is then processed by a Router that determines whether the drink is hot
or cold (checking the Drink
object's 'isIced' property). Finally the
Barista
prepares each drink, but hot and cold drink preparation are handled by two
distinct methods: 'prepareHotDrink' and 'prepareColdDrink'.
Here is the XML configuration:
<beans:beans xmlns="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd"> <message-bus/> <annotation-driven/> <context:component-scan base-package="org.springframework.integration.samples.cafe"/> <channel id="orders"/> <channel id="drinks"/> <channel id="coldDrinks"/> <channel id="hotDrinks"/> <service-activator input-channel="coldDrinks" ref="barista" method="prepareColdDrink"/> <service-activator input-channel="hotDrinks" ref="barista" method="prepareHotDrink"/> <beans:bean id="cafe" class="org.springframework.integration.samples.cafe.Cafe"> <beans:property name="orderChannel" ref="orders"/> </beans:bean> </beans:beans>
Notice that the Message Bus is defined. It will automatically detect and register all channels and endpoints.
The 'annotation-driven' element will enable the detection of the splitter and router - both of which carry
the @MessageEndpoint
annotation. That annotation extends Spring's
"stereotype" annotations (by relying on the @Component meta-annotation), and so all classes carrying the
endpoint annotation are capable of being detected by the component-scanner.
@MessageEndpoint(input="orders", output="drinks") public class OrderSplitter { @Splitter public List<Drink> split(DrinkOrder order) { return order.getDrinks(); } }
@MessageEndpoint(input="drinks") public class DrinkRouter { @Router public String resolveDrinkChannel(Drink drink) { return (drink.isIced()) ? "coldDrinks" : "hotDrinks"; } }
Now turning back to the XML, you see that there are two <service-activator> elements. Each of these
is delegating to the same Barista
instance but different methods. The 'barista' could
have been defined in the XML, but instead the @Component
annotation is applied:
@Component public class Barista { private long hotDrinkDelay = 5000; private long coldDrinkDelay = 1000; private AtomicInteger hotDrinkCounter = new AtomicInteger(); private AtomicInteger coldDrinkCounter = new AtomicInteger(); public void setHotDrinkDelay(long hotDrinkDelay) { this.hotDrinkDelay = hotDrinkDelay; } public void setColdDrinkDelay(long coldDrinkDelay) { this.coldDrinkDelay = coldDrinkDelay; } public void prepareHotDrink(Drink drink) { try { Thread.sleep(this.hotDrinkDelay); System.out.println(Thread.currentThread().getName() + " prepared hot drink #" + hotDrinkCounter.incrementAndGet() + ": " + drink); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public void prepareColdDrink(Drink drink) { try { Thread.sleep(this.coldDrinkDelay); System.out.println(Thread.currentThread().getName() + " prepared cold drink #" + coldDrinkCounter.incrementAndGet() + ": " + drink); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
As you can see from the code excerpt above, the barista methods have different delays (the hot
drinks take 5 times as long to prepare). This simulates work being completed at different rates.
When the CafeDemo
'main' method runs, it will loop 100
times sending a single hot drink and a single cold drink each time.
public static void main(String[] args) { AbstractApplicationContext context = null; if(args.length > 0) { context = new FileSystemXmlApplicationContext(args); } else { context = new ClassPathXmlApplicationContext("cafeDemo.xml", CafeDemo.class); } context.start(); Cafe cafe = (Cafe) context.getBean("cafe"); DrinkOrder order = new DrinkOrder(); Drink hotDoubleLatte = new Drink(DrinkType.LATTE, 2, false); Drink icedTripleMocha = new Drink(DrinkType.MOCHA, 3, true); order.addDrink(hotDoubleLatte); order.addDrink(icedTripleMocha); for (int i = 0; i < 100; i++) { cafe.placeOrder(order); } }
To run this demo, go to the "samples" directory within the root of the Spring Integration distribution. On
Unix/Mac you can run 'cafeDemo.sh', and on Windows you can run 'cafeDemo.bat'. Each of these will by default
create a Spring ApplicationContext
from the 'cafeDemo.xml' file that is
in the "spring-integration-samples" JAR and hence on the classpath (it is the same as the XML above). However, a
copy of that file is also available within the "samples" directory, so that you can provide the file name as a
command line argument to either 'cafeDemo.sh' or 'cafeDemo.bat'. This will allow you to experiment with the
configuration and immediately run the demo with your changes. It is probably a good idea to first copy the
original file so that you can make as many changes as you want and still refer back to the original to compare.
When you run cafeDemo, you will see that all 100 cold drinks are prepared in roughly the same amount of time as only 20 of the hot drinks. This is to be expected based on their respective delays of 1000 and 5000 milliseconds. However, by configuring a poller with a concurrent task executor, you can dramatically change the results. For example, you could use a thread pool executor with 5 workers for the hot drink barista:
<service-activator input-channel="coldDrinks" ref="barista" method="prepareColdDrink"/> <service-activator input-channel="hotDrinks" ref="barista" method="prepareHotDrink"> <poller period="1000" task-executor="pool"/> </service-activator> <pool-executor id="pool" core-size="5"/>
Also, notice that the worker thread name is displayed with each invocation. You should see that most of the hot drinks are prepared by the task-executor threads, but that occasionally it throttles the input by forcing the message-bus (the caller) to invoke the operation. In addition to experimenting with the 'concurrency' settings, you can also add the 'transactional' sub-element as described in Section 4.2.2, “Configuring Message Endpoints”. If you want to explore the sample in more detail, the source JAR is available in the "src" directory: 'org.springframework.integration.samples-sources-1.0.0.M6.jar'.