3. Spring Integration Overview

3.1 Background

One of the key themes of the Spring Framework is inversion of control. In its broadest sense, this means that the framework handles responsibilities on behalf of the components that are managed within its context. The components themselves are simplified since they are relieved of those responsibilities. For example, dependency injection relieves the components of the responsibility of locating or creating their dependencies. Likewise, aspect-oriented programming relieves business components of generic cross-cutting concerns by modularizing them into reusable aspects. In each case, the end result is a system that is easier to test, understand, maintain, and extend.

Furthermore, the Spring Framework and portfolio provide a comprehensive programming model for building enterprise applications. Developers benefit from the consistency of this model and especially the fact that it is based upon well-established best practices such as programming to interfaces and favoring composition over inheritance. Spring’s simplified abstractions and powerful support libraries boost developer productivity while simultaneously increasing the level of testability and portability.

Spring Integration is motivated by these same goals and principles. It extends the Spring programming model into the messaging domain and builds upon Spring’s existing enterprise integration support to provide an even higher level of abstraction. It supports message-driven architectures where inversion of control applies to runtime concerns, such as when certain business logic should execute and where the response should be sent. It supports routing and transformation of messages so that different transports and different data formats can be integrated without impacting testability. In other words, the messaging and integration concerns are handled by the framework, so business components are further isolated from the infrastructure and developers are relieved of complex integration responsibilities.

As an extension of the Spring programming model, Spring Integration provides a wide variety of configuration options including annotations, XML with namespace support, XML with generic "bean" elements, and of course direct usage of the underlying API. That API is based upon well-defined strategy interfaces and non-invasive, delegating adapters. Spring Integration’s design is inspired by the recognition of a strong affinity between common patterns within Spring and the well-known Enterprise Integration Patterns as described in the book of the same name by Gregor Hohpe and Bobby Woolf (Addison-Wesley, 2004). Developers who have read that book should be immediately comfortable with the Spring Integration concepts and terminology.

3.2 Goals and Principles

Spring Integration is motivated by the following goals:

  • Provide a simple model for implementing complex enterprise integration solutions.
  • Facilitate asynchronous, message-driven behavior within a Spring-based application.
  • Promote intuitive, incremental adoption for existing Spring users.

Spring Integration is guided by the following principles:

  • Components should be loosely coupled for modularity and testability.
  • The framework should enforce separation of concerns between business logic and integration logic.
  • Extension points should be abstract in nature but within well-defined boundaries to promote reuse and portability.

3.3 Main Components

From the vertical perspective, a layered architecture facilitates separation of concerns, and interface-based contracts between layers promote loose coupling. Spring-based applications are typically designed this way, and the Spring Framework and portfolio provide a strong foundation for following this best practice across the full stack of an enterprise application. Message-driven architectures add a horizontal perspective, yet these same goals are still relevant. Just as "layered architecture" is an extremely generic and abstract paradigm, messaging systems typically follow the similarly abstract "pipes-and-filters" model. The "filters" represent any component that is capable of producing and/or consuming messages, and the "pipes" transport the messages between filters so that the components themselves remain loosely coupled. It is important to note that these two high-level paradigms are not mutually exclusive. The underlying messaging infrastructure that supports the "pipes" should still be encapsulated in a layer whose contracts are defined as interfaces. Likewise, the "filters" themselves would typically be managed within a layer that is logically above the application’s service layer, interacting with those services through interfaces much in the same way that a web tier would.

3.3.1 Message

In Spring Integration, a Message is a generic wrapper for any Java object combined with metadata used by the framework while handling that object. It consists of a payload and headers. The payload can be of any type and the headers hold commonly required information such as id, timestamp, correlation id, and return address. Headers are also used for passing values to and from connected transports. For example, when creating a Message from a received File, the file name may be stored in a header to be accessed by downstream components. Likewise, if a Message’s content is ultimately going to be sent by an outbound Mail adapter, the various properties (to, from, cc, subject, etc.) may be configured as Message header values by an upstream component. Developers can also store any arbitrary key-value pairs in the headers.
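To make the payload-plus-headers structure concrete, here is a minimal plain-Java sketch. The SketchMessage and MessageSketch classes and the "file_name" header key are hypothetical names chosen for this illustration; they are not Spring Integration types, and the real Message contract is covered in later chapters.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for a message: an immutable payload plus a read-only header map.
final class SketchMessage<T> {
    private final T payload;
    private final Map<String, Object> headers;

    SketchMessage(T payload, Map<String, Object> extraHeaders) {
        Map<String, Object> all = new HashMap<>(extraHeaders);
        // Commonly required metadata is filled in when the message is created.
        all.put("id", UUID.randomUUID());
        all.put("timestamp", System.currentTimeMillis());
        this.payload = payload;
        this.headers = Collections.unmodifiableMap(all);
    }

    T getPayload() { return payload; }

    Map<String, Object> getHeaders() { return headers; }
}

final class MessageSketch {
    // Wraps file content and records the file name in a header (the header key is an assumption).
    static SketchMessage<String> fromFile(String fileName, String content) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("file_name", fileName);
        return new SketchMessage<>(content, headers);
    }

    public static void main(String[] args) {
        SketchMessage<String> message = fromFile("orders.csv", "id,qty");
        System.out.println(message.getPayload() + " from " + message.getHeaders().get("file_name"));
    }
}
```

A downstream component in this sketch reads the file name from the headers without knowing how the message was produced, which is the decoupling the headers are meant to provide.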

Figure 3.1. Message

3.3.2 Message Channel

A Message Channel represents the "pipe" of a pipes-and-filters architecture. Producers send Messages to a channel, and consumers receive Messages from a channel. The Message Channel therefore decouples the messaging components, and also provides a convenient point for interception and monitoring of Messages.

Figure 3.2. Message Channel

A Message Channel may follow either Point-to-Point or Publish/Subscribe semantics. With a Point-to-Point channel, at most one consumer can receive each Message sent to the channel. Publish/Subscribe channels, on the other hand, will attempt to broadcast each Message to all of its subscribers. Spring Integration supports both of these.

Whereas "Point-to-Point" and "Publish/Subscribe" define the two options for how many consumers will ultimately receive each Message, there is another important consideration: should the channel buffer messages? In Spring Integration, Pollable Channels are capable of buffering Messages within a queue. The advantage of buffering is that it allows for throttling the inbound Messages and thereby prevents overloading a consumer. However, as the name suggests, this also adds some complexity, since a consumer can only receive the Messages from such a channel if a poller is configured. On the other hand, a consumer connected to a Subscribable Channel is simply Message-driven. The variety of channel implementations available in Spring Integration will be discussed in detail in Section 4.1.2, “Message Channel Implementations”.
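The two distinctions above (how many consumers receive a Message, and whether the channel buffers) can be sketched in plain Java. QueueChannelSketch and PubSubChannelSketch are hypothetical classes written for this illustration only; they are simplified models, not the channel implementations that Spring Integration provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical pollable, point-to-point channel: messages are buffered until a consumer polls.
final class QueueChannelSketch<T> {
    private final Queue<T> buffer = new ConcurrentLinkedQueue<>();

    void send(T message) { buffer.add(message); }

    // Returns the next buffered message, or null when the buffer is empty.
    // Each message is handed to at most one caller.
    T receive() { return buffer.poll(); }
}

// Hypothetical subscribable, publish/subscribe channel: every subscriber sees every message.
final class PubSubChannelSketch<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) { subscribers.add(subscriber); }

    void send(T message) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

final class ChannelSketch {
    public static void main(String[] args) {
        QueueChannelSketch<String> queue = new QueueChannelSketch<>();
        queue.send("a");
        System.out.println("polled: " + queue.receive());

        PubSubChannelSketch<String> topic = new PubSubChannelSketch<>();
        topic.subscribe(m -> System.out.println("first got " + m));
        topic.subscribe(m -> System.out.println("second got " + m));
        topic.send("b");
    }
}
```

Note that the queue-backed sketch delivers nothing until someone calls receive(), which is why a poller is needed on the consuming side, whereas the subscribable sketch invokes its handlers as part of send().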

3.3.3 Message Endpoint

One of the primary goals of Spring Integration is to simplify the development of enterprise integration solutions through inversion of control. This means that you should not have to implement consumers and producers directly, and you should not even have to build Messages and invoke send or receive operations on a Message Channel. Instead, you should be able to focus on your specific domain model with an implementation based on plain Objects. Then, by providing declarative configuration, you can "connect" your domain-specific code to the messaging infrastructure provided by Spring Integration. The components responsible for these connections are Message Endpoints. This does not mean that you will necessarily connect your existing application code directly. Any real-world enterprise integration solution will require some amount of code focused upon integration concerns such as routing and transformation. The important thing is to achieve separation of concerns between such integration logic and business logic. In other words, as with the Model-View-Controller paradigm for web applications, the goal should be to provide a thin but dedicated layer that translates inbound requests into service layer invocations, and then translates service layer return values into outbound replies. The next section will provide an overview of the Message Endpoint types that handle these responsibilities, and in upcoming chapters, you will see how Spring Integration’s declarative configuration options provide a non-invasive way to use each of these.

3.4 Message Endpoints

A Message Endpoint represents the "filter" of a pipes-and-filters architecture. As mentioned above, the endpoint’s primary role is to connect application code to the messaging framework and to do so in a non-invasive manner. In other words, the application code should ideally have no awareness of the Message objects or the Message Channels. This is similar to the role of a Controller in the MVC paradigm. Just as a Controller handles HTTP requests, the Message Endpoint handles Messages. Just as Controllers are mapped to URL patterns, Message Endpoints are mapped to Message Channels. The goal is the same in both cases: isolate application code from the infrastructure. These concepts are discussed at length along with all of the patterns that follow in the Enterprise Integration Patterns book. Here, we provide only a high-level description of the main endpoint types supported by Spring Integration and their roles. The chapters that follow will elaborate and provide sample code as well as configuration examples.

3.4.1 Transformer

A Message Transformer is responsible for converting a Message’s content or structure and returning the modified Message. Probably the most common type of transformer is one that converts the payload of the Message from one format to another (e.g. from XML Document to java.lang.String). Similarly, a transformer may be used to add, remove, or modify the Message’s header values.
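Both kinds of transformation (payload conversion and header modification) can be illustrated with a small plain-Java sketch. The Msg class, the method names, and the "contentType" header are assumptions made for this example and do not reflect the framework's own transformer API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class TransformerSketch {
    // Hypothetical message representation: a payload plus a copy of the given headers.
    static final class Msg<T> {
        final T payload;
        final Map<String, Object> headers;

        Msg(T payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = new HashMap<>(headers);
        }
    }

    // Converts the payload and carries the original headers over unchanged.
    static <A, B> Msg<B> transformPayload(Msg<A> in, Function<A, B> converter) {
        return new Msg<>(converter.apply(in.payload), in.headers);
    }

    // Returns a copy of the message with one header added or overwritten.
    static <T> Msg<T> enrichHeader(Msg<T> in, String name, Object value) {
        Map<String, Object> headers = new HashMap<>(in.headers);
        headers.put(name, value);
        return new Msg<>(in.payload, headers);
    }

    public static void main(String[] args) {
        Msg<Integer> in = new Msg<>(42, new HashMap<>());
        Msg<String> converted = transformPayload(in, n -> "n=" + n);
        Msg<String> enriched = enrichHeader(converted, "contentType", "text/plain");
        System.out.println(enriched.payload + " " + enriched.headers);
    }
}
```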

3.4.2 Filter

A Message Filter determines whether a Message should be passed to an output channel at all. This simply requires a boolean test method that may check for a particular payload content type, a property value, the presence of a header, etc. If the Message is accepted, it is sent to the output channel; if not, it is dropped (or, in a stricter implementation, an Exception could be thrown). Message Filters are often used in conjunction with a Publish/Subscribe channel, where multiple consumers may receive the same Message and use the filter to narrow down the set of Messages to be processed based on some criteria.

Note

Be careful not to confuse the generic use of "filter" within the Pipes-and-Filters architectural pattern with this specific endpoint type that selectively narrows down the Messages flowing between two channels. The Pipes-and-Filters concept of "filter" matches more closely with Spring Integration’s Message Endpoint: any component that can be connected to Message Channel(s) in order to send and/or receive Messages.
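As a rough illustration of the Message Filter endpoint (not the generic pipes-and-filters "filter"), the following plain-Java sketch applies a boolean test and either forwards, drops, or rejects a message. FilterSketch and its throwOnRejection flag are hypothetical names for this example, and the output channel is modeled as a simple list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical filter endpoint; the "output channel" is modeled as a plain list.
final class FilterSketch<T> {
    private final Predicate<T> selector;
    private final boolean throwOnRejection;
    private final List<T> output = new ArrayList<>();

    FilterSketch(Predicate<T> selector, boolean throwOnRejection) {
        this.selector = selector;
        this.throwOnRejection = throwOnRejection;
    }

    // Forwards accepted messages; rejected ones are dropped, or raise an exception in strict mode.
    boolean handle(T message) {
        if (selector.test(message)) {
            output.add(message);
            return true;
        }
        if (throwOnRejection) {
            throw new IllegalStateException("Message rejected: " + message);
        }
        return false;
    }

    List<T> getOutput() { return output; }

    public static void main(String[] args) {
        FilterSketch<String> filter = new FilterSketch<>(payload -> payload.startsWith("order:"), false);
        filter.handle("order:42");
        filter.handle("heartbeat");
        System.out.println(filter.getOutput()); // only the order message was forwarded
    }
}
```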

3.4.3 Router

A Message Router is responsible for deciding what channel or channels should receive the Message next (if any). Typically the decision is based upon the Message’s content and/or metadata available in the Message Headers. A Message Router is often used as a dynamic alternative to a statically configured output channel on a Service Activator or other endpoint capable of sending reply Messages. Likewise, a Message Router provides a proactive alternative to the reactive Message Filters used by multiple subscribers as described above.
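The routing decision can be sketched in plain Java as a function from a message to a channel name. RouterSketch, the channel names, and the convention that a null name means "no destination" are assumptions made for this illustration, not the framework's router API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical router: named channels are modeled as lists keyed by channel name.
final class RouterSketch<T> {
    private final Function<T, String> channelResolver;
    private final Map<String, List<T>> channels = new HashMap<>();

    RouterSketch(Function<T, String> channelResolver) {
        this.channelResolver = channelResolver;
    }

    // Sends the message to the channel chosen by the resolver; null means "no destination".
    void route(T message) {
        String channelName = channelResolver.apply(message);
        if (channelName != null) {
            channels.computeIfAbsent(channelName, name -> new ArrayList<>()).add(message);
        }
    }

    // Returns what a given channel has received so far (empty if nothing was routed there).
    List<T> received(String channelName) {
        return channels.getOrDefault(channelName, new ArrayList<>());
    }

    public static void main(String[] args) {
        RouterSketch<String> router = new RouterSketch<>(payload ->
                payload.startsWith("invoice") ? "billing"
                        : payload.startsWith("order") ? "fulfilment" : null);
        router.route("invoice-7");
        router.route("order-3");
        router.route("noise");
        System.out.println(router.received("billing") + " " + router.received("fulfilment"));
    }
}
```

In contrast to the filter-per-subscriber approach, the producer side decides the destination once, so consumers never see messages that are not meant for them.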

Figure 3.3. Router

3.4.4 Splitter

A Splitter is another type of Message Endpoint whose responsibility is to accept a Message from its input channel, split that Message into multiple Messages, and then send each of those to its output channel. This is typically used for dividing a "composite" payload object into a group of Messages containing the sub-divided payloads.
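A minimal plain-Java sketch of splitting a composite payload might look as follows. The Part class and its correlationId, sequenceNumber, and sequenceSize fields are bookkeeping assumed for this example (so that the parts could be re-assembled later); they are not presented here as the framework's own behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SplitterSketch {
    // One piece of the original composite payload, with bookkeeping to allow later re-assembly.
    static final class Part<T> {
        final T payload;
        final String correlationId;
        final int sequenceNumber; // 1-based position within the original payload
        final int sequenceSize;   // total number of parts

        Part(T payload, String correlationId, int sequenceNumber, int sequenceSize) {
            this.payload = payload;
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
        }
    }

    // Divides a composite payload into one part per element, all sharing a correlation id.
    static <T> List<Part<T>> split(String correlationId, List<T> compositePayload) {
        List<Part<T>> parts = new ArrayList<>();
        int size = compositePayload.size();
        for (int i = 0; i < size; i++) {
            parts.add(new Part<>(compositePayload.get(i), correlationId, i + 1, size));
        }
        return parts;
    }

    public static void main(String[] args) {
        for (Part<String> part : split("order-1", Arrays.asList("itemA", "itemB", "itemC"))) {
            System.out.println(part.sequenceNumber + "/" + part.sequenceSize + " " + part.payload);
        }
    }
}
```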

3.4.5 Aggregator

Essentially the mirror image of the Splitter, the Aggregator is a type of Message Endpoint that receives multiple Messages and combines them into a single Message. In fact, Aggregators are often downstream consumers in a pipeline that includes a Splitter. Technically, the Aggregator is more complex than a Splitter, because it is required to maintain state (the Messages to be aggregated), to decide when the complete group of Messages is available, and to time out if necessary. Furthermore, in case of a timeout, the Aggregator needs to know whether to send the partial results or to discard them to a separate channel. Spring Integration provides a CorrelationStrategy, a ReleaseStrategy, and configurable settings for the timeout, whether to send partial results upon timeout, and a discard channel.
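The state handling described above can be sketched in plain Java. AggregatorSketch is a hypothetical class for this illustration: correlation is reduced to a string key, the release rule is a fixed group size, and a timeout is simulated by calling expire() explicitly. The framework's CorrelationStrategy and ReleaseStrategy are more general than this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class AggregatorSketch<T> {
    private final Map<String, List<T>> groups = new HashMap<>(); // state: payloads awaiting release
    private final List<T> discarded = new ArrayList<>();         // stand-in for a discard channel
    private final int releaseSize;

    AggregatorSketch(int releaseSize) {
        this.releaseSize = releaseSize;
    }

    // Correlates by key and releases the whole group once it reaches the expected size.
    Optional<List<T>> add(String correlationKey, T payload) {
        List<T> group = groups.computeIfAbsent(correlationKey, key -> new ArrayList<>());
        group.add(payload);
        if (group.size() >= releaseSize) {
            groups.remove(correlationKey);
            return Optional.of(group);
        }
        return Optional.empty();
    }

    // Simulates a timeout for a group: either release the partial result or discard it.
    Optional<List<T>> expire(String correlationKey, boolean sendPartialResult) {
        List<T> group = groups.remove(correlationKey);
        if (group == null) {
            return Optional.empty();
        }
        if (sendPartialResult) {
            return Optional.of(group);
        }
        discarded.addAll(group);
        return Optional.empty();
    }

    List<T> getDiscarded() { return discarded; }

    public static void main(String[] args) {
        AggregatorSketch<String> aggregator = new AggregatorSketch<>(2);
        System.out.println(aggregator.add("order-1", "itemA")); // group still incomplete
        System.out.println(aggregator.add("order-1", "itemB")); // group released
    }
}
```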

3.4.6 Service Activator

A Service Activator is a generic endpoint for connecting a service instance to the messaging system. The input Message Channel must be configured, and if the service method to be invoked is capable of returning a value, an output Message Channel may also be provided.

Note

The output channel is optional, since each Message may also provide its own Return Address header. This same rule applies for all consumer endpoints.

The Service Activator invokes an operation on some service object to process the request Message, extracting the request Message’s payload and converting if necessary (if the method does not expect a Message-typed parameter). Whenever the service object’s method returns a value, that return value will likewise be converted to a reply Message if necessary (if it’s not already a Message). That reply Message is sent to the output channel. If no output channel has been configured, then the reply will be sent to the channel specified in the Message’s "return address" if available.
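The request and reply handling just described can be approximated in plain Java as follows. All types in this sketch (Msg, Channel, ServiceActivatorSketch) and the "returnAddress" header key are hypothetical and exist only for illustration; the service is modeled as a simple function over the payload.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class ServiceActivatorSketch {
    // Assumed header key under which a request carries its return address.
    static final String RETURN_ADDRESS = "returnAddress";

    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = new HashMap<>(headers);
        }
    }

    // Hypothetical channel that simply records what was sent to it.
    static final class Channel {
        final List<Msg> sent = new ArrayList<>();

        void send(Msg message) { sent.add(message); }
    }

    private final Function<Object, Object> service; // the plain service method
    private final Channel outputChannel;            // optional, may be null

    ServiceActivatorSketch(Function<Object, Object> service, Channel outputChannel) {
        this.service = service;
        this.outputChannel = outputChannel;
    }

    void handle(Msg request) {
        Object result = service.apply(request.payload); // the service sees only the payload
        if (result == null) {
            return; // no return value, so no reply
        }
        // Wrap the return value in a reply message unless it already is one.
        Msg reply = (result instanceof Msg) ? (Msg) result : new Msg(result, request.headers);
        Channel target = outputChannel;
        if (target == null && request.headers.get(RETURN_ADDRESS) instanceof Channel) {
            target = (Channel) request.headers.get(RETURN_ADDRESS); // fall back to the return address
        }
        if (target == null) {
            throw new IllegalStateException("No output channel or return address available");
        }
        target.send(reply);
    }

    public static void main(String[] args) {
        Channel output = new Channel();
        new ServiceActivatorSketch(p -> p.toString().toUpperCase(), output)
                .handle(new Msg("hello", new HashMap<>()));
        System.out.println(output.sent.get(0).payload);
    }
}
```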

A request-reply "Service Activator" endpoint connects a target object’s method to input and output Message Channels.

Figure 3.4. Service Activator

Note

As discussed in Message Channel above, channels can be Pollable or Subscribable; in this diagram, this is depicted by the "clock" symbol and the solid arrow (poll) and the dotted arrow (subscribe).

3.4.7 Channel Adapter

A Channel Adapter is an endpoint that connects a Message Channel to some other system or transport. Channel Adapters may be either inbound or outbound. Typically, the Channel Adapter will do some mapping between the Message and whatever object or resource is received-from or sent-to the other system (File, HTTP Request, JMS Message, etc). Depending on the transport, the Channel Adapter may also populate or extract Message header values. Spring Integration provides a number of Channel Adapters, and they will be described in upcoming chapters.

Figure 3.5. An inbound "Channel Adapter" endpoint connects a source system to a MessageChannel.

Note

Message sources can be Pollable (e.g. POP3) or Message-Driven (e.g. IMAP Idle); in this diagram, this is depicted by the "clock" symbol and the solid arrow (poll) and the dotted arrow (message-driven).

Figure 3.6. An outbound "Channel Adapter" endpoint connects a MessageChannel to a target system.

Note

As discussed in Message Channel above, channels can be Pollable or Subscribable; in this diagram, this is depicted by the "clock" symbol and the solid arrow (poll) and the dotted arrow (subscribe).

3.5 Configuration and @EnableIntegration

Throughout this document you will see references to XML namespace support for declaring elements in a Spring Integration flow. This support is provided by a series of namespace parsers that generate appropriate bean definitions to implement a particular component. For example, many endpoints consist of a MessageHandler bean and a ConsumerEndpointFactoryBean into which the handler and an input channel name are injected.

The first time a Spring Integration namespace element is encountered, the framework automatically declares a number of beans that are used to support the runtime environment (task scheduler, implicit channel creator, etc).

Important

Starting with version 4.0, the @EnableIntegration annotation has been introduced, to allow the registration of Spring Integration infrastructure beans (see JavaDocs). This annotation is required when only Java & Annotation configuration is used, e.g. with Spring Boot and/or Spring Integration Messaging Annotation support and Spring Integration Java DSL with no XML integration configuration.

The @EnableIntegration annotation is also useful when you have a parent context with no Spring Integration components and two or more child contexts that use Spring Integration. It enables these common components to be declared only once, in the parent context.

The @EnableIntegration annotation registers many infrastructure components with the application context:

  • Registers some built-in beans, e.g. errorChannel and its LoggingHandler, taskScheduler for pollers, the jsonPath SpEL function, etc.;
  • Adds several BeanFactoryPostProcessor instances to enhance the BeanFactory for the global and default integration environment;
  • Adds several BeanPostProcessor instances to enhance and/or convert and wrap particular beans for integration purposes;
  • Adds annotation processors to parse Messaging Annotations and register components for them with the application context.

The @IntegrationComponentScan annotation has also been introduced to permit classpath scanning. This annotation plays a role similar to that of the standard Spring Framework @ComponentScan annotation, but it is restricted to Spring Integration specific components and annotations, which aren’t reachable by the standard Spring Framework component scan mechanism. For an example, see Section 8.4.6, “@MessagingGateway Annotation”.

The @EnablePublisher annotation has been introduced to register a PublisherAnnotationBeanPostProcessor bean and configure the default-publisher-channel for those @Publisher annotations which are provided without a channel attribute. If more than one @EnablePublisher annotation is found, they must all have the same value for the default channel. See the section on the annotation-driven approach via the @Publisher annotation for more information.

The @GlobalChannelInterceptor annotation has been introduced to mark ChannelInterceptor beans for global channel interception. This annotation is an analogue of the <int:channel-interceptor> XML element (see the section called “Global Channel Interceptor Configuration”). @GlobalChannelInterceptor annotations can be placed at the class level (with a @Component stereotype annotation), or on @Bean methods within @Configuration classes. In either case, the bean must be a ChannelInterceptor.

The @IntegrationConverter annotation has been introduced to mark Converter, GenericConverter, or ConverterFactory beans as candidate converters for integrationConversionService. This annotation is an analogue of the <int:converter> XML element (see Section 8.1.6, “Payload Type Conversion”). @IntegrationConverter annotations can be placed at the class level (with a @Component stereotype annotation), or on @Bean methods within @Configuration classes.

Also see the Annotation Support section for more information about Messaging Annotations.

3.6 Programming Considerations

It is generally recommended that you use plain old Java objects (POJOs) whenever possible and only expose the framework in your code when absolutely necessary.

If you do expose the framework to your classes, there are some considerations that need to be taken into account, especially during application startup; some of these are listed here.

  • If your component is ApplicationContextAware, you should generally not "use" the ApplicationContext in the setApplicationContext() method; just store a reference and defer such uses until later in the context lifecycle.
  • If your component is an InitializingBean or uses @PostConstruct methods, do not send any messages from these initialization methods; the application context is not yet initialized when these methods are called, and sending such messages will likely fail. If you need to send messages during startup, implement ApplicationListener and wait for the ContextRefreshedEvent. Alternatively, implement SmartLifecycle, put your bean in a late phase, and send the messages from the start() method.

3.7 Considerations When Using Packaged (e.g. Shaded) Jars

Spring Integration bootstraps certain features using Spring Framework’s SpringFactoriesLoader mechanism to load several IntegrationConfigurationInitializer classes. This applies to the -core jar as well as certain others such as -http, -jmx, etc. The information for this process is stored in a META-INF/spring.factories file in each jar.

Some developers prefer to repackage their application and all dependencies into a single jar using well-known tools, such as the Apache Maven Shade Plugin.

By default, the shade plugin will not merge the spring.factories files when producing the shaded jar.

In addition to spring.factories, there are other META-INF files (spring.handlers, spring.schemas) used for XML configuration. These also need to be merged.

Important

Spring Boot’s executable jar mechanism takes a different approach in that it nests the jars, thus retaining each spring.factories file on the class path. So, with a Spring Boot application that uses the default executable jar format, nothing more is needed.

Even if you are not using Spring Boot, you can still use tooling provided by Boot to enhance the shade plugin by adding transformers for the above mentioned files.

The following is an example configuration for the plugin at the time of writing. You may wish to consult the current spring-boot-starter-parent pom to see the current settings that Boot uses.

pom.xml

...
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <configuration>
                <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <dependencies>
                <dependency> <!-- 1 -->
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                </dependency>
            </dependencies>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers> <!-- 2 -->
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.handlers</resource>
                            </transformer>
                            <transformer
                                implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                <resource>META-INF/spring.factories</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.schemas</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
...

Specifically:

  1. Add the spring-boot-maven-plugin as a dependency.
  2. Configure the transformers.

Define a property for ${spring.boot.version}, or specify the version explicitly there.

3.8 Programming Tips and Tricks

With XML configuration and Spring Integration namespace support, the XML parsers hide how the target beans are declared and wired together. With Java and annotation configuration, it is important to understand the Framework API that your application uses directly.

The first-class citizens for EIP implementation are Message, Channel, and Endpoint (see Section 3.3, “Main Components” above). Their implementations (contracts) are:

  • org.springframework.messaging.Message;
  • org.springframework.messaging.MessageChannel;
  • org.springframework.integration.endpoint.AbstractEndpoint.

The first two are simple enough to implement, configure, and use; the last one deserves more review.

The AbstractEndpoint is widely used throughout the Framework for different component implementations; its main implementations are:

  • EventDrivenConsumer, when we subscribe to a SubscribableChannel to listen for messages;
  • PollingConsumer, when we poll for messages from a PollableChannel.

When you use Messaging Annotations and/or the Java DSL, you need not worry about these components, because the Framework produces them automatically via the appropriate annotations and BeanPostProcessor instances. When building components manually, use the ConsumerEndpointFactoryBean, which determines the target AbstractEndpoint implementation based on the provided inputChannel property.
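The idea of choosing the consumer style from the channel type can be sketched in plain Java. Everything in this example (SubscribableSketch, PollableSketch, ConsumerFactorySketch and its Endpoint class) is a hypothetical model written for illustration; it does not reproduce the internals of ConsumerEndpointFactoryBean.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical channel contracts: one pushes messages to handlers, the other must be polled.
interface SubscribableSketch {
    void subscribe(Consumer<String> handler);
}

interface PollableSketch {
    String receive(); // returns null when no message is available
}

final class ConsumerFactorySketch {
    enum Kind { EVENT_DRIVEN, POLLING }

    static final class Endpoint {
        final Kind kind;
        private final Runnable pollTask; // null for event-driven endpoints

        Endpoint(Kind kind, Runnable pollTask) {
            this.kind = kind;
            this.pollTask = pollTask;
        }

        // A scheduler would call this periodically for polling endpoints; no-op otherwise.
        void pollOnce() {
            if (pollTask != null) {
                pollTask.run();
            }
        }
    }

    // Picks the consumer style that matches the type of the input channel.
    static Endpoint create(Object inputChannel, Consumer<String> handler) {
        if (inputChannel instanceof SubscribableSketch) {
            ((SubscribableSketch) inputChannel).subscribe(handler);
            return new Endpoint(Kind.EVENT_DRIVEN, null);
        }
        if (inputChannel instanceof PollableSketch) {
            PollableSketch pollable = (PollableSketch) inputChannel;
            return new Endpoint(Kind.POLLING, () -> {
                String message = pollable.receive();
                if (message != null) {
                    handler.accept(message);
                }
            });
        }
        throw new IllegalArgumentException("Unsupported channel type: " + inputChannel);
    }

    public static void main(String[] args) {
        Deque<String> buffer = new ArrayDeque<>();
        buffer.add("queued");
        PollableSketch pollable = buffer::poll;
        Endpoint endpoint = create(pollable, m -> System.out.println("handled " + m));
        System.out.println(endpoint.kind);
        endpoint.pollOnce();
    }
}
```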

The ConsumerEndpointFactoryBean, in turn, introduces another first-class citizen of the Framework: org.springframework.messaging.MessageHandler. An implementation of this interface handles the message consumed by the endpoint from the channel. All EIP components in Spring Integration are MessageHandler implementations, e.g. AggregatingMessageHandler, MessageTransformingHandler, AbstractMessageSplitter, etc. The outbound adapters for target protocols are MessageHandler implementations too, e.g. FileWritingMessageHandler, HttpRequestExecutingMessageHandler, AbstractMqttMessageHandler, etc. When you develop Spring Integration applications with Java and annotation configuration, look in the relevant Spring Integration module for an appropriate MessageHandler implementation to use in the @ServiceActivator configuration. For example, to send an XMPP message (see the XMPP Support chapter), you could configure something like this:

@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
    // Outbound handler that sends each Message arriving on "input" as an XMPP chat message.
    ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);

    // Map all request headers onto the outgoing XMPP message.
    DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
    xmppHeaderMapper.setRequestHeaderNames("*");
    handler.setHeaderMapper(xmppHeaderMapper);

    return handler;
}

The MessageHandler implementations represent the outbound and processing part of the message flow.

The inbound side of the message flow has its own components, which are divided into polling and listening behaviors. The listening components are fairly simple and typically require only one target class implementation to be ready to produce messages. Listening components can be one-way MessageProducerSupport implementations, e.g. AbstractMqttMessageDrivenChannelAdapter and ImapIdleChannelAdapter, or request-reply MessagingGatewaySupport implementations, e.g. AmqpInboundGateway and AbstractWebServiceInboundGateway.

Polling inbound endpoints are for protocols that don’t provide a listener API or aren’t intended for such behavior; for example, any file-based protocol such as FTP, any database (RDBMS or NoSQL), etc.

These inbound endpoints consist of two components: the poller configuration, which initiates the polling task periodically, and a message source class, which reads data from the target protocol and produces a message for the downstream integration flow. The class for the poller configuration is SourcePollingChannelAdapter. It is one more AbstractEndpoint implementation, intended specifically for polling to initiate an integration flow. Typically, with Messaging Annotations or the Java DSL, you need not worry about this class; the Framework produces a bean for it, based on the @InboundChannelAdapter configuration or the particular Java DSL builder.

The message source components are more important for target application development, and they all implement the MessageSource interface, e.g. MongoDbMessageSource and AbstractTwitterMessageSource. With that in mind, a configuration for reading data from an RDBMS table with JDBC may look like this:

@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
    // Polled with a fixed delay of 5000 ms; messages produced from the query go to fooChannel.
    return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo WHERE status = 0");
}
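To show how the two parts of a polling inbound endpoint fit together, here is a plain-Java sketch under simplifying assumptions. PollingAdapterSketch is a hypothetical class: the message source is modeled as a Supplier that returns null when there is nothing to read, the output channel is a list, and the poller is a ScheduledExecutorService. It is not how SourcePollingChannelAdapter is implemented.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class PollingAdapterSketch<T> {
    private final Supplier<T> messageSource;                       // returns null when nothing is available
    private final List<T> output = new CopyOnWriteArrayList<>();   // stand-in for the output channel

    PollingAdapterSketch(Supplier<T> messageSource) {
        this.messageSource = messageSource;
    }

    // One poll cycle: ask the source for data and forward it downstream if present.
    boolean pollOnce() {
        T payload = messageSource.get();
        if (payload == null) {
            return false;
        }
        output.add(payload);
        return true;
    }

    // Poller part: repeats the poll cycle with a fixed delay between runs.
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long fixedDelayMillis) {
        return scheduler.scheduleWithFixedDelay(this::pollOnce, 0, fixedDelayMillis, TimeUnit.MILLISECONDS);
    }

    List<T> getOutput() { return output; }

    public static void main(String[] args) {
        Iterator<String> rows = Arrays.asList("row-1", "row-2").iterator();
        PollingAdapterSketch<String> adapter =
                new PollingAdapterSketch<>(() -> rows.hasNext() ? rows.next() : null);
        while (adapter.pollOnce()) {
            // keep polling until the source is exhausted
        }
        System.out.println(adapter.getOutput());
    }
}
```

The separation mirrors the text above: the scheduling concern lives in start(), while the protocol-specific reading is confined to the supplier passed in by the application.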

You can find all the required inbound and outbound classes for the target protocols in the particular Spring Integration module, in most cases in the respective package. For example, the spring-integration-websocket adapters are:

  • o.s.i.websocket.inbound.WebSocketInboundChannelAdapter - a MessageProducerSupport implementation that listens for frames on the socket and produces messages to the channel;
  • o.s.i.websocket.outbound.WebSocketOutboundMessageHandler - a one-way AbstractMessageHandler implementation that converts incoming messages to the appropriate frames and sends them over the WebSocket.

If you are already familiar with Spring Integration XML configuration, note that, starting with version 4.3, the XSD element definitions include a description that points to the target classes used to produce the beans for the adapter or gateway. For example:

<xsd:element name="outbound-async-gateway">
    <xsd:annotation>
        <xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
        </xsd:documentation>
    </xsd:annotation>