System Management

Metrics and Management

This section describes how to capture metrics for Spring Integration. In recent versions, we have relied more on Micrometer (see https://micrometer.io), and we plan to use Micrometer even more in future releases.

Configuring Metrics Capture

Prior to version 4.2, metrics were only available when JMX was enabled. See JMX Support.

To enable MessageSource, MessageChannel, and MessageHandler metrics, add an <int:management/> bean to the application context (in XML) or annotate one of your @Configuration classes with @EnableIntegrationManagement (in Java). MessageSource instances maintain only counts, whereas MessageChannel and MessageHandler instances maintain duration statistics in addition to counts. See MessageChannel Metric Features and MessageHandler Metric Features, later in this chapter.

Doing so causes the automatic registration of the IntegrationManagementConfigurer bean in the application context. Only one such bean can exist in the context, and, if registered manually via a <bean/> definition, it must have the bean name set to integrationManagementConfigurer. This bean applies its configuration to beans after all beans in the context have been instantiated.

In addition to metrics, you can control debug logging in the main message flow. In very high volume applications, even calls to isDebugEnabled() can be quite expensive with some logging subsystems. You can disable all such logging to avoid this overhead. Exception logging (debug or otherwise) is not affected by this setting.

The following listing shows the available options for controlling logging and metrics capture:

<int:management
    default-logging-enabled="true" (1)
    default-counts-enabled="false" (2)
    default-stats-enabled="false" (3)
    counts-enabled-patterns="foo, !baz, ba*" (4)
    stats-enabled-patterns="fiz, buz" (5)
    metrics-factory="myMetricsFactory" /> (6)

@Configuration
@EnableIntegration
@EnableIntegrationManagement(
    defaultLoggingEnabled = "true", (1)
    defaultCountsEnabled = "false", (2)
    defaultStatsEnabled = "false", (3)
    countsEnabled = { "foo", "${count.patterns}" }, (4)
    statsEnabled = { "qux", "!*" }, (5)
    metricsFactory = "myMetricsFactory") (6)
public static class ContextConfiguration {
...
}

1 Set to false to disable all logging in the main message flow, regardless of the log system category settings. Set to true to enable debug logging (if also enabled by the logging subsystem). Only applied if you have not explicitly configured the setting in a bean definition. The default is true.
2 Enable or disable count metrics for components that do not match one of the patterns in (4). Only applied if you have not explicitly configured the setting in a bean definition. The default is false.
3 Enable or disable statistical metrics for components that do not match one of the patterns in (5). Only applied if you have not explicitly configured the setting in a bean definition. The default is false.
4 A comma-delimited list of patterns for beans for which counts should be enabled. You can negate the pattern with !. First match (positive or negative) wins. In the unlikely event that you have a bean name starting with !, escape the ! in the pattern. For example, \!something positively matches a bean named !something.
5 A comma-delimited list of patterns for beans for which statistical metrics should be enabled. You can negate the pattern with !. First match (positive or negative) wins. In the unlikely event that you have a bean name starting with !, escape the ! in the pattern. For example, \!something positively matches a bean named !something. The collection of statistics implies the collection of counts.
6 A reference to a MetricsFactory. See Metrics Factory.
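The first-match-wins rule for the pattern lists in (4) and (5) can be sketched in plain Java as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, the wildcard support is reduced to a single trailing *, and it is not the framework's own matching code.

```java
import java.util.List;

// Illustrative only: a simplified first-match-wins matcher, not the framework's implementation.
class PatternMatchSketch {

    /** Returns TRUE or FALSE for the first pattern that matches, or null if no pattern matches. */
    static Boolean firstMatch(List<String> patterns, String beanName) {
        for (String raw : patterns) {
            String pattern = raw.trim();
            boolean negated = pattern.startsWith("!");
            if (negated) {
                pattern = pattern.substring(1);
            } else if (pattern.startsWith("\\!")) {
                // "\!something" targets a bean that is literally named "!something"
                pattern = pattern.substring(1);
            }
            if (simpleMatch(pattern, beanName)) {
                return !negated;
            }
        }
        return null; // no match: the corresponding default-*-enabled setting would apply
    }

    /** Minimal wildcard match (assumption: only a single trailing '*' is supported here). */
    static boolean simpleMatch(String pattern, String name) {
        if (pattern.endsWith("*")) {
            return name.startsWith(pattern.substring(0, pattern.length() - 1));
        }
        return pattern.equals(name);
    }

    public static void main(String[] args) {
        List<String> patterns = List.of("foo", "!baz", "ba*");
        System.out.println(firstMatch(patterns, "baz")); // false: "!baz" matches before "ba*"
        System.out.println(firstMatch(patterns, "bar")); // true: matched by "ba*"
        System.out.println(firstMatch(patterns, "qux")); // null: falls back to the default
    }
}
```

Because the first match wins, the order of the patterns matters: placing ba* before !baz would enable counts for baz as well.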

At runtime, counts and statistics can be obtained by calling getChannelMetrics, getHandlerMetrics and getSourceMetrics (all from the IntegrationManagementConfigurer class), which return MessageChannelMetrics, MessageHandlerMetrics, and MessageSourceMetrics, respectively.

See the Javadoc for complete information about these classes.

When JMX is enabled (see JMX Support), IntegrationMBeanExporter also exposes these metrics.

IMPORTANT: defaultLoggingEnabled, defaultCountsEnabled, and defaultStatsEnabled are applied only if you have not explicitly configured the corresponding setting in a bean definition.

Starting with version 5.0.2, the framework automatically detects whether the application context has a single MetricsFactory bean and, if so, uses it instead of the default metrics factory.

These legacy metrics have been deprecated in favor of Micrometer metrics discussed below. Legacy metrics support will be removed in a future release.

Micrometer Integration

Starting with version 5.0.3, the presence of a Micrometer MeterRegistry in the application context triggers support for Micrometer metrics in addition to the built-in metrics (note that the legacy built-in metrics will be removed in a future release).

Micrometer was first supported in version 5.0.2, but changes were made to the Micrometer Meters in version 5.0.3 to make them more suitable for use in dimensional systems. Further changes were made in 5.0.4. If you use Micrometer, a minimum of version 5.0.4 is recommended, since some of the changes in 5.0.4 were breaking API changes.

To use Micrometer, add one of the MeterRegistry beans to the application context. If the IntegrationManagementConfigurer detects exactly one MeterRegistry bean, it configures a MicrometerMetricsCaptor bean with a name of integrationMicrometerMetricsCaptor.

For each MessageHandler and MessageChannel, timers are registered. For each MessageSource, a counter is registered.

This only applies to objects that extend AbstractMessageHandler, AbstractMessageChannel, and AbstractMessageSource (which is the case for most framework components).

With Micrometer metrics, the statsEnabled flag has no effect, since statistics capture is delegated to Micrometer. The countsEnabled flag controls whether the Micrometer Meter instances are updated when processing each message.

The Timer Meters for send operations on message channels have the following names or tags:

  • name: spring.integration.send

  • tag: type:channel

  • tag: name:<componentName>

  • tag: result:(success|failure)

  • tag: exception:(none|exception simple class name)

  • description: Send processing time

(A failure result with a none exception means the channel’s send() operation returned false.)

The Counter Meters for receive operations on pollable message channels have the following names or tags:

  • name: spring.integration.receive

  • tag: type:channel

  • tag: name:<componentName>

  • tag: result:(success|failure)

  • tag: exception:(none|exception simple class name)

  • description: Messages received

The Timer Meters for operations on message handlers have the following names or tags:

  • name: spring.integration.send

  • tag: type:handler

  • tag: name:<componentName>

  • tag: result:(success|failure)

  • tag: exception:(none|exception simple class name)

  • description: Send processing time

The Counter Meters for message sources have the following names or tags:

  • name: spring.integration.receive

  • tag: type:source

  • tag: name:<componentName>

  • tag: result:success

  • tag: exception:none

  • description: Messages received

In addition, there are three Gauge Meters:

  • spring.integration.channels: The number of MessageChannels in the application.

  • spring.integration.handlers: The number of MessageHandlers in the application.

  • spring.integration.sources: The number of MessageSources in the application.

You can customize the names and tags of the Meters created by integration components by providing a subclass of MicrometerMetricsCaptor. The MicrometerCustomMetricsTests test case shows a simple example of how to do that. You can also further customize the meters by overriding the build() methods on builder subclasses.

MessageChannel Metric Features

These legacy metrics will be removed in a future release. See Micrometer Integration.

Message channels report metrics according to their concrete type. If you are looking at a DirectChannel, you see statistics for the send operation. If it is a QueueChannel, you also see statistics for the receive operation as well as the count of messages that are currently buffered by this QueueChannel. In both cases, some metrics are simple counters (message count and error count), and some are estimates of averages of interesting quantities. The algorithms used to calculate these estimates are described briefly in the following table.

Table 1. MessageChannel Metrics

Metric Type | Example | Algorithm
Count | Send Count | Simple incrementer. Increases by one when an event occurs.
Error Count | Send Error Count | Simple incrementer. Increases by one when a send results in an error.
Duration | Send Duration (method execution time in milliseconds) | Exponential moving average with decay factor (ten by default). Average of the method execution time over roughly the last ten (by default) measurements.
Rate | Send Rate (number of operations per second) | Inverse of exponential moving average of the interval between events with decay in time (lapsing over 60 seconds by default) and per measurement (last ten events by default).
Error Rate | Send Error Rate (number of errors per second) | Inverse of exponential moving average of the interval between error events with decay in time (lapsing over 60 seconds by default) and per measurement (last ten events by default).
Ratio | Send Success Ratio (ratio of successful to total sends) | Estimate the success ratio as the exponential moving average of the series composed of values (1 for success and 0 for failure, decaying as per the rate measurement over time and events by default). The error ratio is: 1 - success ratio.

MessageHandler Metric Features

These legacy metrics will be removed in a future release. See Micrometer Integration.

The following table shows the statistics maintained for message handlers. Some metrics are simple counters (message count and error count), and one is an estimate of averages of send duration. The algorithms used to calculate these estimates are described briefly in the following table:

Table 2. MessageHandler Metrics

Metric Type | Example | Algorithm
Count | Handle Count | Simple incrementer. Increases by one when an event occurs.
Error Count | Handler Error Count | Simple incrementer. Increases by one when an invocation results in an error.
Active Count | Handler Active Count | Indicates the number of threads currently invoking the handler (or any downstream synchronous flow).
Duration | Handle Duration (method execution time in milliseconds) | Exponential moving average with decay factor (ten by default). Average of the method execution time over roughly the last ten (by default) measurements.

Time-Based Average Estimates

A feature of the time-based average estimates is that they decay with time if no new measurements arrive. To help interpret the behavior over time, the time (in seconds) since the last measurement is also exposed as a metric.

There are two basic exponential models: decay per measurement (appropriate for duration and anything where the number of measurements is part of the metric) and decay per time unit (more suitable for rate measurements where the time in between measurements is part of the metric). Both models depend on the fact that the weighted sum S(n) = sum(i=0..n) w(i) x(n-i) has a special form when w(i) = r^i, with r constant: S(n) = x(n) + r S(n-1). You therefore need to store only S(n-1), not the whole series x(i), to generate a new metric estimate from the latest measurement. The algorithms used in the duration metrics use r=exp(-1/M) with M=10. The net effect is that the estimate, S(n), is more heavily weighted to recent measurements and is composed roughly of the last M measurements. So M is the “window” or lapse rate of the estimate. For the vanilla moving average, i is a counter over the number of measurements. For the rate, we interpret i as the elapsed time or a combination of elapsed time and a counter (so the metric estimate contains contributions roughly from the last M measurements and the last T seconds).
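The per-measurement recursion S(n) = x(n) + r S(n-1) with r = exp(-1/M) can be sketched in a few lines of Java. The class below is hypothetical and is not the framework's ExponentialMovingAverage. In particular, normalizing by a running weight total W(n) = 1 + r W(n-1) to turn the weighted sum into a mean is an assumption made for this illustration.

```java
// Sketch of a per-measurement exponential moving average, assuming r = exp(-1/M).
class EmaSketch {

    private final double r;     // decay factor applied once per measurement
    private double weightedSum; // S(n) = x(n) + r * S(n-1)
    private double weightTotal; // W(n) = 1 + r * W(n-1), used to normalize S(n) (assumption)

    EmaSketch(int window) {
        this.r = Math.exp(-1.0 / window);
    }

    /** Folds one measurement into the running sums; no history of x(i) is kept. */
    void append(double x) {
        weightedSum = x + r * weightedSum;
        weightTotal = 1.0 + r * weightTotal;
    }

    /** Weighted mean, dominated by roughly the last M measurements. */
    double mean() {
        return weightTotal > 0 ? weightedSum / weightTotal : 0.0;
    }

    public static void main(String[] args) {
        EmaSketch ema = new EmaSketch(10);
        for (int i = 0; i < 100; i++) {
            ema.append(5.0);
        }
        ema.append(50.0); // a single slow invocation pulls the estimate up, but only partially
        System.out.println(ema.mean());
    }
}
```

Only two doubles are stored regardless of how many measurements arrive, which is the property the recursion is meant to provide.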

Metrics Factory

A strategy interface MetricsFactory has been introduced to let you provide custom channel metrics for your MessageChannel instances and MessageHandler instances. By default, a DefaultMetricsFactory provides a default implementation of MessageChannelMetrics and MessageHandlerMetrics, described earlier. To override the default MetricsFactory, configure it as described earlier, by providing a reference to your MetricsFactory bean instance. You can either customize the default implementations, as described in the next section, or provide completely different implementations by extending AbstractMessageChannelMetrics or AbstractMessageHandlerMetrics.

In addition to the default metrics factory described earlier, the framework provides the AggregatingMetricsFactory. This factory creates AggregatingMessageChannelMetrics and AggregatingMessageHandlerMetrics instances. In very high volume scenarios, the cost of capturing statistics can be prohibitive (the time to make two calls to the system and store the data for each message). The aggregating metrics aggregate the response time over a sample of messages. This can save significant CPU time.

The statistics are likely to be skewed if messages arrive in bursts. These metrics are intended for use with high, constant-volume, message rates.

The following example shows how to define an aggregating metrics factory:

<bean id="aggregatingMetricsFactory"
            class="org.springframework.integration.support.management.AggregatingMetricsFactory">
    <constructor-arg value="1000" /> <!-- sample size -->
</bean>

The preceding configuration aggregates the duration over 1000 messages. Counts (send and error) are maintained per-message, but the statistics are per 1000 messages.
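The idea behind sampling can be illustrated with a small, single-threaded Java sketch: the count is updated for every message, but the clock is read only at the start and end of each sample of N messages. The class and its method names are hypothetical and do not mirror the AggregatingMessageChannelMetrics implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration: counts are per message, timing is per sample of N messages.
// Not thread-safe beyond the counter; intended only to show the idea.
class SampledDurationSketch {

    private final int sampleSize;
    private final AtomicLong count = new AtomicLong();
    private long sampleStartNanos;
    private long completedSamples;
    private double lastMeanMillis;

    SampledDurationSketch(int sampleSize) {
        this.sampleSize = sampleSize;
    }

    /** Call before handling a message; the clock is read only for the first message of a sample. */
    void before() {
        if (count.getAndIncrement() % sampleSize == 0) {
            sampleStartNanos = System.nanoTime();
        }
    }

    /** Call after handling a message; the clock is read only for the last message of a sample. */
    void after() {
        if (count.get() % sampleSize == 0) {
            long elapsedNanos = System.nanoTime() - sampleStartNanos;
            lastMeanMillis = elapsedNanos / 1_000_000.0 / sampleSize;
            completedSamples++;
        }
    }

    long getCount() {
        return count.get();
    }

    long getCompletedSamples() {
        return completedSamples;
    }

    double getLastMeanMillis() {
        return lastMeanMillis;
    }

    public static void main(String[] args) {
        SampledDurationSketch metrics = new SampledDurationSketch(1000);
        for (int i = 0; i < 2500; i++) {
            metrics.before();
            metrics.after();
        }
        // 2500 messages counted, but only two full samples were timed
        System.out.println(metrics.getCount() + " messages, " + metrics.getCompletedSamples() + " samples");
    }
}
```

The mean per sample also includes any idle time between messages, which is why bursty traffic skews the result.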

Customizing the Default Channel and Handler Statistics

See Time-Based Average Estimates and the Javadoc for the ExponentialMovingAverage* classes for more information about these values.

By default, the DefaultMessageChannelMetrics and DefaultMessageHandlerMetrics use a “window” of ten measurements, a rate period of one second (meaning rate per second) and a decay lapse period of one minute.

If you wish to override these defaults, you can provide a custom MetricsFactory that returns appropriately configured metrics and provide a reference to it in the MBean exporter, as described earlier.

The following example shows how to do so:

public static class CustomMetrics implements MetricsFactory {

    @Override
    public AbstractMessageChannelMetrics createChannelMetrics(String name) {
        return new DefaultMessageChannelMetrics(name,
                new ExponentialMovingAverage(20, 1000000.),
                new ExponentialMovingAverageRate(2000, 120000, 30, true),
                new ExponentialMovingAverageRatio(130000, 40, true),
                new ExponentialMovingAverageRate(3000, 140000, 50, true));
    }

    @Override
    public AbstractMessageHandlerMetrics createHandlerMetrics(String name) {
        return new DefaultMessageHandlerMetrics(name, new ExponentialMovingAverage(20, 1000000.));
    }

}

Advanced Customization

The customizations described earlier are wholesale and apply to all appropriate beans exported by the MBean exporter. This is the extent of customization available when you use XML configuration.

Individual beans can be provided with different implementations by using Java @Configuration or programmatically at runtime (after the application context has been refreshed) by invoking the configureMetrics methods on AbstractMessageChannel and AbstractMessageHandler.

Performance Improvement

Previously, the time-based metrics (see Time-Based Average Estimates) were calculated in real time. The statistics are now calculated when retrieved instead. This resulted in a significant performance improvement, at the expense of a small amount of additional memory for each statistic. As discussed earlier, you can disable the statistics altogether while retaining the MBean that allows the invocation of Lifecycle methods.

JMX Support

Spring Integration provides channel adapters for receiving and publishing JMX notifications.

You need to include this dependency in your project:

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jmx</artifactId>
    <version>5.2.0.RELEASE</version>
</dependency>

Gradle
compile "org.springframework.integration:spring-integration-jmx:5.2.0.RELEASE"

An inbound channel adapter allows for polling JMX MBean attribute values, and an outbound channel adapter allows for invoking JMX MBean operations.

Notification-listening Channel Adapter

The notification-listening channel adapter requires a JMX ObjectName for the MBean that publishes notifications to which this listener should be registered. A very simple configuration might resemble the following:

<int-jmx:notification-listening-channel-adapter id="adapter"
    channel="channel"
    object-name="example.domain:name=publisher"/>

The notification-listening-channel-adapter registers with an MBeanServer at startup. The default MBeanServer bean name is mbeanServer, which happens to be the same bean name generated when using Spring’s <context:mbean-server/> element. If you need to use a different name, be sure to include the mbean-server attribute.

The adapter can also accept a reference to a NotificationFilter and a “handback” object to provide some context that is passed back with each notification. Both of those attributes are optional. Extending the preceding example to include those attributes as well as an explicit MBeanServer bean name produces the following example:

<int-jmx:notification-listening-channel-adapter id="adapter"
    channel="channel"
    mbean-server="someServer"
    object-name="example.domain:name=somePublisher"
    notification-filter="notificationFilter"
    handback="myHandback"/>

The notification-listening channel adapter is event-driven and registered with the MBeanServer directly. It does not require any poller configuration.
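For readers less familiar with the underlying JMX mechanics, the following sketch uses only the JDK (no Spring) to show a listener, a NotificationFilter, and a handback object registered directly with an MBeanServer. The Publisher MBean, the notification type, and the class names are made up for the example; the adapter's own internals are not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationFilterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;

// Plain JMX sketch (no Spring): a listener, filter, and handback registered with an MBeanServer.
class NotificationListeningSketch {

    // Standard MBean convention: the management interface is named <Class>MBean.
    public interface PublisherMBean {
        void publish(String text);
    }

    // Hypothetical MBean that emits a notification for every published text.
    public static class Publisher extends NotificationBroadcasterSupport implements PublisherMBean {
        private long sequence;

        @Override
        public void publish(String text) {
            sendNotification(new Notification("example.type", this, ++sequence, text));
        }
    }

    /** Publishes one notification and returns what the listener recorded. */
    static List<String> run() {
        try {
            MBeanServer server = MBeanServerFactory.newMBeanServer();
            ObjectName name = new ObjectName("example.domain:name=publisher");
            Publisher publisher = new Publisher();
            server.registerMBean(publisher, name);

            List<String> received = new ArrayList<>();
            NotificationListener listener = (notification, handback) ->
                    received.add(handback + ":" + notification.getMessage());

            NotificationFilterSupport filter = new NotificationFilterSupport();
            filter.enableType("example."); // accept notification types with this prefix

            // The handback is returned unchanged with every notification delivered to the listener.
            server.addNotificationListener(name, listener, filter, "myHandback");
            publisher.publish("hello");
            return received;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

No polling is involved: the listener is called back when the MBean emits a notification, which is why the adapter needs no poller.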

For this component only, the object-name attribute can contain an object name pattern (for example, "org.something:type=MyType,name=*"). In that case, the adapter receives notifications from all MBeans with object names that match the pattern. In addition, the object-name attribute can contain a SpEL reference to a <util:list> of object name patterns, as the following example shows:

<int-jmx:notification-listening-channel-adapter id="manyNotificationsAdapter"
    channel="manyNotificationsChannel"
    object-name="#{patterns}"/>

<util:list id="patterns">
    <value>org.foo:type=Foo,name=*</value>
    <value>org.foo:type=Bar,name=*</value>
</util:list>

The names of the located MBean(s) are logged when DEBUG level logging is enabled.
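Object name patterns are a standard JMX feature, so you can check which names a pattern would select by using the JDK's ObjectName class alone. The helper below is a hypothetical sketch for experimenting with patterns. With a live server, MBeanServer.queryNames is the standard JMX call for resolving a pattern to registered names.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Sketch: test whether a concrete object name is selected by an object name pattern.
class ObjectNamePatternSketch {

    static boolean matches(String pattern, String candidate) {
        try {
            ObjectName patternName = new ObjectName(pattern);
            // isPattern() is true when the name contains wildcards in the domain or key properties
            return patternName.isPattern() && patternName.apply(new ObjectName(candidate));
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(matches("org.foo:type=Foo,name=*", "org.foo:type=Foo,name=first")); // true
        System.out.println(matches("org.foo:type=Foo,name=*", "org.foo:type=Bar,name=first")); // false
    }
}
```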

Notification-publishing Channel Adapter

The notification-publishing channel adapter is relatively simple. It requires only a JMX object name in its configuration, as the following example shows:

<context:mbean-export/>

<int-jmx:notification-publishing-channel-adapter id="adapter"
    channel="channel"
    object-name="example.domain:name=publisher"/>

It also requires that an MBeanExporter be present in the context. That is why the <context:mbean-export/> element is also shown in the preceding example.

When messages are sent to the channel for this adapter, the notification is created from the message content. If the payload is a String, it is passed as the message text for the notification. Any other payload type is passed as the userData of the notification.

JMX notifications also have a type, and it should be a dot-delimited String. There are two ways to provide the type. Precedence is always given to a message header value associated with the JmxHeaders.NOTIFICATION_TYPE key. Alternatively, you can provide a fallback default-notification-type attribute in the configuration, as the following example shows:

<context:mbean-export/>

<int-jmx:notification-publishing-channel-adapter id="adapter"
    channel="channel"
    object-name="example.domain:name=publisher"
    default-notification-type="some.default.type"/>

Attribute-polling Channel Adapter

The attribute-polling channel adapter is useful when you need to periodically check on some value that is available through an MBean as a managed attribute. You can configure the poller in the same way as any other polling adapter in Spring Integration (or you can rely on the default poller). The object-name and the attribute-name are required. An MBeanServer reference is also required. However, by default, it automatically checks for a bean named mbeanServer, same as the notification-listening channel adapter described earlier. The following example shows how to configure an attribute-polling channel adapter with XML:

<int-jmx:attribute-polling-channel-adapter id="adapter"
    channel="channel"
    object-name="example.domain:name=someService"
    attribute-name="InvocationCount">
        <int:poller max-messages-per-poll="1" fixed-rate="5000"/>
</int-jmx:attribute-polling-channel-adapter>
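Conceptually, each poll reads one managed attribute from the MBeanServer. The following JDK-only sketch shows that read in isolation. The SomeService MBean and the class names are hypothetical, and the code says nothing about how the adapter itself is implemented.

```java
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

// Plain JMX sketch: what a single poll of a managed attribute amounts to.
class AttributePollingSketch {

    public interface SomeServiceMBean {
        int getInvocationCount(); // exposed as the managed attribute "InvocationCount"
    }

    public static class SomeService implements SomeServiceMBean {
        private final AtomicInteger invocations = new AtomicInteger();

        public void invoke() {
            invocations.incrementAndGet();
        }

        @Override
        public int getInvocationCount() {
            return invocations.get();
        }
    }

    /** One poll: reads the attribute value that would become the message payload. */
    static Object poll(MBeanServer server, ObjectName name, String attribute) {
        try {
            return server.getAttribute(name, attribute);
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Registers the hypothetical service, invokes it twice, and polls the counter. */
    static Object demo() {
        try {
            MBeanServer server = MBeanServerFactory.newMBeanServer();
            ObjectName name = new ObjectName("example.domain:name=someService");
            SomeService service = new SomeService();
            server.registerMBean(service, name);
            service.invoke();
            service.invoke();
            return poll(server, name, "InvocationCount");
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2
    }
}
```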

Tree-polling Channel Adapter

The tree-polling channel adapter queries the JMX MBean tree and sends a message with a payload that is the graph of objects that matches the query. By default, the MBeans are mapped to primitives and simple objects, such as Map, List, and arrays. Doing so permits simple transformation to (for example) JSON. An MBeanServer reference is also required. However, by default, it automatically checks for a bean named mbeanServer, same as the notification-listening channel adapter described earlier. The following example shows how to configure a tree-polling channel adapter with XML:

<int-jmx:tree-polling-channel-adapter id="adapter"
    channel="channel"
    query-name="example.domain:type=*">
        <int:poller max-messages-per-poll="1" fixed-rate="5000"/>
</int-jmx:tree-polling-channel-adapter>

The preceding example includes all of the attributes on the selected MBeans. You can filter the attributes by providing an MBeanObjectConverter that has an appropriate filter configured. You can provide the converter as a reference to a bean definition by using the converter attribute, or you can use an inner <bean/> definition. Spring Integration provides a DefaultMBeanObjectConverter that can take an MBeanAttributeFilter in its constructor argument.

Spring Integration provides two standard filters. The NamedFieldsMBeanAttributeFilter lets you specify a list of attributes to include. The NotNamedFieldsMBeanAttributeFilter lets you specify a list of attributes to exclude. You can also implement your own filter.
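To make the effect of an include-style filter concrete, the following JDK-only sketch maps an MBean's attributes to a Map and keeps only the named ones. It is a simplified stand-in for the idea, not the DefaultMBeanObjectConverter or NamedFieldsMBeanAttributeFilter code. The Widget MBean is hypothetical, and treating an empty include set as "include everything" is an assumption of this example.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

// Sketch: map MBean attributes to a simple Map, keeping only the attributes named in 'include'.
class AttributeFilterSketch {

    public interface WidgetMBean {
        String getLabel();
        int getCount();
    }

    public static class Widget implements WidgetMBean {
        @Override
        public String getLabel() {
            return "w1";
        }

        @Override
        public int getCount() {
            return 3;
        }
    }

    static Map<String, Object> snapshot(MBeanServer server, ObjectName name, Set<String> include) {
        try {
            Map<String, Object> result = new TreeMap<>();
            for (MBeanAttributeInfo info : server.getMBeanInfo(name).getAttributes()) {
                // Assumption for this sketch: an empty include set means "no filtering"
                if (include.isEmpty() || include.contains(info.getName())) {
                    result.put(info.getName(), server.getAttribute(name, info.getName()));
                }
            }
            return result;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    static Map<String, Object> demo(Set<String> include) {
        try {
            MBeanServer server = MBeanServerFactory.newMBeanServer();
            ObjectName name = new ObjectName("example.domain:type=Widget");
            server.registerMBean(new Widget(), name);
            return snapshot(server, name, include);
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(Set.of()));        // {Count=3, Label=w1}
        System.out.println(demo(Set.of("Count"))); // {Count=3}
    }
}
```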

Operation-invoking Channel Adapter

The operation-invoking channel adapter enables message-driven invocation of any managed operation exposed by an MBean. Each invocation requires the object name of the target MBean and the name of the operation to invoke. Both must be provided explicitly, either in the adapter configuration or through the JmxHeaders.OBJECT_NAME and JmxHeaders.OPERATION_NAME message headers, respectively. The following example configures both on the adapter:

<int-jmx:operation-invoking-channel-adapter id="adapter"
    object-name="example.domain:name=TestBean"
    operation-name="ping"/>

Then the adapter only needs to be able to discover the mbeanServer bean. If a different bean name is required, then provide the mbean-server attribute with a reference.

The payload of the message is mapped to the parameters of the operation, if any. A Map-typed payload with String keys is treated as name/value pairs, whereas a List or array is passed as a simple argument list (with no explicit parameter names). If the operation requires a single parameter value, the payload can represent that single value. Also, if the operation requires no parameters, the payload would be ignored.

If you want to expose a channel for a single common operation to be invoked by messages that need not contain headers, that last option works well.
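The payload-to-parameter mapping described above can be sketched against the plain JMX API. The example below is an illustration under assumptions: TestBean and its operations are made up, Map payloads (named parameters) are deliberately left out, and the operation signature is looked up from the MBeanInfo by name and parameter count rather than by any logic taken from the framework.

```java
import java.util.Arrays;
import java.util.List;
import javax.management.JMException;
import javax.management.MBeanOperationInfo;
import javax.management.MBeanParameterInfo;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

// Sketch: map a payload to operation arguments and invoke the operation through the MBeanServer.
class OperationInvokingSketch {

    public interface TestBeanMBean {
        String ping();
        int add(int a, int b);
    }

    public static class TestBean implements TestBeanMBean {
        @Override
        public String ping() {
            return "pong";
        }

        @Override
        public int add(int a, int b) {
            return a + b;
        }
    }

    /** List or array payloads become an argument list; any other payload is a single argument. */
    static Object[] toArguments(Object payload, int expectedCount) {
        if (expectedCount == 0) {
            return new Object[0]; // the operation takes no parameters, so the payload is ignored
        }
        if (payload instanceof List) {
            return ((List<?>) payload).toArray();
        }
        if (payload instanceof Object[]) {
            return (Object[]) payload;
        }
        return new Object[] { payload };
    }

    static Object invoke(MBeanServer server, ObjectName name, String operation, Object payload) {
        try {
            for (MBeanOperationInfo op : server.getMBeanInfo(name).getOperations()) {
                MBeanParameterInfo[] declared = op.getSignature();
                Object[] args = toArguments(payload, declared.length);
                if (op.getName().equals(operation) && args.length == declared.length) {
                    String[] signature = Arrays.stream(declared)
                            .map(MBeanParameterInfo::getType)
                            .toArray(String[]::new);
                    return server.invoke(name, operation, args, signature);
                }
            }
            throw new IllegalArgumentException("No matching operation: " + operation);
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    static Object demo(String operation, Object payload) {
        try {
            MBeanServer server = MBeanServerFactory.newMBeanServer();
            ObjectName name = new ObjectName("example.domain:name=TestBean");
            server.registerMBean(new TestBean(), name);
            return invoke(server, name, operation, payload);
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo("ping", "ignored"));    // pong
        System.out.println(demo("add", List.of(2, 3))); // 5
    }
}
```

The value returned by invoke is what a gateway-style component would send on as a reply; a one-way adapter would simply discard it.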

Operation-invoking Outbound Gateway

Similarly to the operation-invoking channel adapter, Spring Integration also provides an operation-invoking outbound gateway, which you can use when dealing with non-void operations when a return value is required. The return value is sent as the message payload to the reply-channel specified by the gateway. The following example shows how to configure an operation-invoking outbound gateway with XML:

<int-jmx:operation-invoking-outbound-gateway request-channel="requestChannel"
   reply-channel="replyChannel"
   object-name="o.s.i.jmx.config:type=TestBean,name=testBeanGateway"
   operation-name="testWithReturn"/>

If you do not provide the reply-channel attribute, the reply message is sent to the channel identified by the IntegrationMessageHeaderAccessor.REPLY_CHANNEL header. That header is typically auto-created by the entry point into a message flow, such as any gateway component. However, if the message flow was started by manually creating a Spring Integration message and sending it directly to a channel, you must specify the message header explicitly or use the reply-channel attribute.

MBean Exporter

Spring Integration components may themselves be exposed as MBeans when the IntegrationMBeanExporter is configured. To create an instance of the IntegrationMBeanExporter, define a bean and provide a reference to an MBeanServer and a domain name (if desired). You can leave out the domain, in which case the default domain is org.springframework.integration. The following example shows how to declare an instance of an IntegrationMBeanExporter and an associated MBeanServer instance:

<int-jmx:mbean-export id="integrationMBeanExporter"
            default-domain="my.company.domain" server="mbeanServer"/>

<bean id="mbeanServer" class="org.springframework.jmx.support.MBeanServerFactoryBean">
    <property name="locateExistingServerIfPossible" value="true"/>
</bean>

The MBean exporter is orthogonal to the one provided in Spring core. It registers message channels and message handlers but does not register itself. You can expose the exporter itself (and certain other components in Spring Integration) by using the standard <context:mbean-export/> tag. The exporter has some metrics attached to it — for instance, a count of the number of active handlers and the number of queued messages.

It also has a useful operation, as discussed in Orderly Shutdown Managed Operation.

Spring Integration 4.0 introduced the @EnableIntegrationMBeanExport annotation to allow for convenient configuration of a default integrationMbeanExporter bean of type IntegrationMBeanExporter with several useful options at the @Configuration class level. The following example shows how to configure this bean:

@Configuration
@EnableIntegration
@EnableIntegrationMBeanExport(server = "mbeanServer", managedComponents = "input")
public class ContextConfiguration {

	@Bean
	public MBeanServerFactoryBean mbeanServer() {
		return new MBeanServerFactoryBean();
	}
}

If you need to provide more options or have several IntegrationMBeanExporter beans (such as for different MBean Servers or to avoid conflicts with the standard Spring MBeanExporter — such as through @EnableMBeanExport), you can configure an IntegrationMBeanExporter as a generic bean.

MBean Object Names

All the MessageChannel, MessageHandler, and MessageSource instances in the application are wrapped by the MBean exporter to provide management and monitoring features. The generated JMX object names for each component type are listed in the following table:

Table 3. MBean Object Names

Component Type | Object Name
MessageChannel | o.s.i:type=MessageChannel,name=<channelName>
MessageSource | o.s.i:type=MessageSource,name=<channelName>,bean=<source>
MessageHandler | o.s.i:type=MessageHandler,name=<channelName>,bean=<source>

The bean attribute in the object names for sources and handlers takes one of the values in the following table:

Table 4. bean ObjectName Part

Bean Value | Description
endpoint | The bean name of the enclosing endpoint (for example <service-activator>), if there is one.
anonymous | An indication that the enclosing endpoint did not have a user-specified bean name, so the JMX name is the input channel name.
internal | For well known Spring Integration default components.
handler/source | None of the above. Fall back to the toString() method of the object being monitored (handler or source).

You can append custom elements to the object name by providing a reference to a Properties object in the object-name-static-properties attribute.

Also, since Spring Integration 3.0, you can use a custom ObjectNamingStrategy by setting the object-naming-strategy attribute. Doing so permits greater control over the naming of the MBeans, such as grouping all integration MBeans under an 'Integration' type. The following example shows one possible custom naming strategy implementation:

public class Namer implements ObjectNamingStrategy {

	private final ObjectNamingStrategy realNamer = new KeyNamingStrategy();
	@Override
	public ObjectName getObjectName(Object managedBean, String beanKey) throws MalformedObjectNameException {
		String actualBeanKey = beanKey.replace("type=", "type=Integration,componentType=");
		return realNamer.getObjectName(managedBean, actualBeanKey);
	}

}

The beanKey argument is a String that contains the standard object name, beginning with the default-domain and including any additional static properties. The preceding example moves the standard type part to componentType and sets the type to 'Integration', enabling selection of all Integration MBeans with a single query (my.domain:type=Integration,*). Doing so also groups the beans under one tree entry under the domain in such tools as VisualVM.
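The effect of that string replacement on an object name can be checked with the JDK's ObjectName class alone. In the following sketch, the sample bean key my.domain:type=MessageChannel,name=input is an assumed input chosen for illustration, and the class is hypothetical.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Sketch: apply the same replacement as the Namer example and inspect the resulting object name.
class NamingStrategySketch {

    static ObjectName rename(String beanKey) {
        try {
            return new ObjectName(beanKey.replace("type=", "type=Integration,componentType="));
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /** True if the name is selected by the single-query pattern for all Integration MBeans. */
    static boolean matchesIntegrationQuery(ObjectName name) {
        try {
            return new ObjectName("my.domain:type=Integration,*").apply(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        ObjectName renamed = rename("my.domain:type=MessageChannel,name=input");
        System.out.println(renamed.getKeyProperty("type"));          // Integration
        System.out.println(renamed.getKeyProperty("componentType")); // MessageChannel
        System.out.println(matchesIntegrationQuery(renamed));        // true
    }
}
```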

The default naming strategy is a MetadataNamingStrategy. The exporter propagates the default-domain to that object to let it generate a fallback object name if parsing of the bean key fails. If your custom naming strategy is a MetadataNamingStrategy (or a subclass of it), the exporter does not propagate the default-domain. You must configure it on your strategy bean.

Starting with version 5.1, any bean names (represented by the name key in the object name) are quoted if they contain any characters that are not allowed in a Java identifier (or period .).
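A rough sketch of such a quoting rule, using the JDK's ObjectName.quote method, might look as follows. The helper is hypothetical, and its character check (Java identifier characters plus the period are left unquoted) is an assumption made for illustration, so the framework's exact rule may differ.

```java
import javax.management.ObjectName;

// Sketch: quote a bean name for use as an ObjectName value only when it contains unusual characters.
class NameQuotingSketch {

    static String quoteIfNecessary(String beanName) {
        for (char c : beanName.toCharArray()) {
            // Assumption: identifier characters and '.' are considered safe without quoting
            if (!Character.isJavaIdentifierPart(c) && c != '.') {
                return ObjectName.quote(beanName);
            }
        }
        return beanName;
    }

    public static void main(String[] args) {
        System.out.println(quoteIfNecessary("input.channel")); // input.channel
        System.out.println(quoteIfNecessary("my-channel"));    // "my-channel"
    }
}
```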

JMX Improvements

Version 4.2 introduced some important improvements, representing a fairly major overhaul to the JMX support in the framework. These resulted in a significant performance improvement of the JMX statistics collection and much more control thereof. However, it has some implications for user code in a few specific (uncommon) situations. These changes are detailed below, with a caution where necessary.

Metrics Capture

Previously, MessageSource, MessageChannel, and MessageHandler metrics were captured by wrapping the object in a JDK dynamic proxy to intercept appropriate method calls and capture the statistics. The proxy was added when an integration MBean exporter was declared in the context.

Now, the statistics are captured by the beans themselves. See Metrics and Management for more information.

This change means that you no longer automatically get an MBean or statistics for custom MessageHandler implementations, unless those custom handlers extend AbstractMessageHandler. The simplest way to resolve this is to extend AbstractMessageHandler. If you cannot do so, another workaround is to implement the MessageHandlerMetrics interface. For convenience, a DefaultMessageHandlerMetrics is provided to capture and report statistics. You should invoke its beforeHandle and afterHandle methods at the appropriate times. Your MessageHandlerMetrics methods can then delegate to this object to obtain each statistic. Similarly, MessageSource implementations must extend AbstractMessageSource or implement MessageSourceMetrics. Message sources capture only a count, so there is no provided convenience class. You should maintain the count in an AtomicLong field.

The removal of the proxy has two additional benefits:

  • Stack traces in exceptions are reduced (when JMX is enabled) because the proxy is not on the stack

  • Cases where two MBeans were exported for the same bean now only export a single MBean with consolidated attributes and operations (see the MBean consolidation bullet, later).

Resolution

System.nanoTime() (rather than System.currentTimeMillis()) is now used to capture times. This may provide more accuracy on some JVMs, especially when you expect durations of less than one millisecond.
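
The following sketch illustrates why a nanosecond clock helps for short durations. It is plain JDK code, not the framework's statistics implementation, and the class and method names are made up for the illustration.

```java
import java.util.concurrent.TimeUnit;

class NanoTimingSketch {

    // Times a task with the nanosecond clock and returns the elapsed nanoseconds.
    static long elapsedNanos(Runnable task) {
        long start = System.nanoTime();
        task.run();
        return System.nanoTime() - start;
    }

    // Converts nanoseconds to fractional milliseconds, keeping sub-millisecond values.
    static double toMillis(long nanos) {
        return nanos / 1_000_000.0;
    }

    public static void main(String[] args) {
        long nanos = elapsedNanos(() -> Math.sqrt(42.0));
        System.out.printf("elapsed: %d ns (%.6f ms)%n", nanos, toMillis(nanos));
        // Truncating to whole milliseconds loses the detail for a task this short.
        System.out.println("whole millis: " + TimeUnit.NANOSECONDS.toMillis(nanos));
    }
}
```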

Setting Initial Statistics Collection State

Previously, when JMX was enabled, all sources, channels, and handlers captured statistics. You can now control whether the statistics are enabled on an individual component. Further, you can capture simple counts on MessageChannel instances and MessageHandler instances instead of capturing the complete time-based statistics. This can have significant performance implications, because you can selectively configure where you need detailed statistics and enable and disable collection at runtime.

@IntegrationManagedResource

Similar to the @ManagedResource annotation, the @IntegrationManagedResource marks a class as being eligible to be exported as an MBean. However, it is exported only if the application context has an IntegrationMBeanExporter.

Certain Spring Integration classes (in the org.springframework.integration package) that were previously annotated with @ManagedResource are now annotated with both @ManagedResource and @IntegrationManagedResource. This is for backwards compatibility (see the next item). Such MBeans are exported by any context MBeanServer or by an IntegrationMBeanExporter (but not both: if both exporters are present, the bean is exported by the integration exporter if the bean matches a managed-components pattern).

Consolidated MBeans

Certain classes within the framework (mapping routers, for example) have additional attributes and operations over and above those provided by metrics and Lifecycle. We use a Router as an example here.

Previously, beans of these types were exported as two distinct MBeans:

  • The metrics MBean (with an object name such as intDomain:type=MessageHandler,name=myRouter,bean=endpoint). This MBean had metrics attributes and metrics/Lifecycle operations.

  • A second MBean (with an object name such as ctxDomain:name=org.springframework.integration.config.RouterFactoryBean#0,type=MethodInvokingRouter) was exported with the channel mappings attribute and operations.

    Now the attributes and operations are consolidated into a single MBean. The object name depends on the exporter. If exported by the integration MBean exporter, the object name is, for example: intDomain:type=MessageHandler,name=myRouter,bean=endpoint. If exported by another exporter, the object name is, for example: ctxDomain:name=org.springframework.integration.config.RouterFactoryBean#0,type=MethodInvokingRouter. There is no difference between these MBeans (aside from the object name), except that the statistics are not enabled (the attributes are 0) by exporters other than the integration exporter. You can enable statistics at runtime by using the JMX operations. When exported by the integration MBean exporter, the initial state can be managed as described earlier.

    If you currently use the second MBean to change, for example, channel mappings and you use the integration MBean exporter, note that the object name has changed because of the MBean consolidation. There is no change if you are not using the integration MBean exporter.

MBean Exporter Bean Name Patterns

Previously, the managed-components patterns were inclusive only. If a bean name matched one of the patterns, it would be included. Now, the pattern can be negated by prefixing it with !. For example, !thing*, things matches all bean names that do not start with thing except things. Patterns are evaluated left to right. The first match (positive or negative) wins, and then no further patterns are applied.

The addition of this syntax to the pattern causes one possible (although perhaps unlikely) problem. If you have a bean named "!thing" and you included a pattern of !thing in your MBean exporter’s managed-components patterns, it no longer matches; the pattern now matches all beans not named thing. In this case, you can escape the ! in the pattern with \. The \!thing pattern matches a bean named !thing.
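
The negation, escape, and first-match-wins rules can be sketched as follows. This is an illustrative matcher written for this section, not the exporter's actual matching code; the class name is hypothetical, only the * wildcard is supported, and what happens to a name that matches no pattern is left to the caller.

```java
import java.util.regex.Pattern;

class NegatablePatternSketch {

    // Returns TRUE on a positive match, FALSE on a negated match, and null if no pattern matches.
    static Boolean firstMatch(String beanName, String... patterns) {
        for (String raw : patterns) {
            String pattern = raw.trim();
            boolean negated = pattern.startsWith("!");
            if (negated) {
                pattern = pattern.substring(1); // drop the negation marker
            }
            else if (pattern.startsWith("\\!")) {
                pattern = pattern.substring(1); // escaped: the name itself starts with '!'
            }
            if (wildcardMatch(pattern, beanName)) {
                return !negated; // the first match wins; later patterns are ignored
            }
        }
        return null;
    }

    // Supports only the '*' wildcard; all other characters are matched literally.
    private static boolean wildcardMatch(String pattern, String value) {
        String[] parts = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return value.matches(regex.toString());
    }
}
```

With this sketch, firstMatch("!thing", "!thing") finds no match, while firstMatch("!thing", "\\!thing") (the Java literal for the pattern \!thing) reports a positive match, which mirrors the escape rule described above.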

IntegrationMBeanExporter Changes

The IntegrationMBeanExporter no longer implements SmartLifecycle. This means that start() and stop() operations are no longer available to register and unregister MBeans. The MBeans are now registered during context initialization and unregistered when the context is destroyed.

Orderly Shutdown Managed Operation

The MBean exporter provides a JMX operation to shut down the application in an orderly manner, intended for use before terminating the JVM. The following listing shows its signature:

public void stopActiveComponents(long howLong)

Its use and operation are described in Orderly Shutdown.

Message History

The key benefit of a messaging architecture is loose coupling such that participating components do not maintain any awareness about one another. This fact alone makes an application extremely flexible, letting you change components without affecting the rest of the flow, change messaging routes, change message consuming styles (polling versus event driven), and so on. However, this unassuming style of architecture could prove to be difficult when things go wrong. When debugging, you probably want as much information (its origin, the channels it has traversed, and other details) about the message as you can get.

Message history is one of those patterns that helps by giving you an option to maintain some level of awareness of a message path, either for debugging purposes or for maintaining an audit trail. Spring Integration provides a simple way to configure your message flows to maintain the message history by adding a header to the message and updating that header every time a message passes through a tracked component.

Message History Configuration

To enable message history, you need only define the message-history element in your configuration, as shown in the following example:

<int:message-history/>

Now every named component (a component that has an 'id' defined) is tracked. The framework sets the 'history' header in your message. Its value is a List<Properties>.

Consider the following configuration example:

<int:gateway id="sampleGateway" 
    service-interface="org.springframework.integration.history.sample.SampleGateway"
    default-request-channel="bridgeInChannel"/>

<int:chain id="sampleChain" input-channel="chainChannel" output-channel="filterChannel">
  <int:header-enricher>
    <int:header name="baz" value="baz"/>
  </int:header-enricher>
</int:chain>

The preceding configuration produces a simple message history structure, with output similar to the following:

[{name=sampleGateway, type=gateway, timestamp=1283281668091},
 {name=sampleChain, type=chain, timestamp=1283281668094}]

To get access to message history, you need only access the MessageHistory header. The following example shows how to do so:

Iterator<Properties> historyIterator =
    message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));

You might not want to track all of the components. To limit the history to certain components based on their names, you can provide the tracked-components attribute and specify a comma-delimited list of component names and patterns that match the components you want to track. The following example shows how to do so:

<int:message-history tracked-components="*Gateway, sample*, aName"/>

In the preceding example, message history is maintained only for the components that end with 'Gateway', start with 'sample', or match the name, 'aName', exactly.

Starting with version 4.0, you can also use the @EnableMessageHistory annotation in a @Configuration class. In addition, the MessageHistoryConfigurer bean is now exposed as a JMX MBean by the IntegrationMBeanExporter (see MBean Exporter), letting you change the patterns at runtime. Note, however, that the bean must be stopped (turning off message history) in order to change the patterns. This feature might be useful to temporarily turn on history to analyze a system. The MBean’s object name is <domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer.

If multiple beans (declared by @EnableMessageHistory and <message-history/>) exist, they must all have identical component name patterns (when trimmed and sorted). Do not use a generic <bean/> definition for the MessageHistoryConfigurer.

By definition, the message history header is immutable (you cannot re-write history). Therefore, when writing message history values, the components either create new messages (when the component is an origin) or they copy the history from a request message, modifying it and setting the new list on a reply message. In either case, the values can be appended even if the message itself is crossing thread boundaries. That means that the history values can greatly simplify debugging in an asynchronous message flow.
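
The copy-then-append idea can be sketched with plain JDK collections. This is only an illustration of the principle, not the MessageHistory implementation; the HistoryAppendSketch class is hypothetical, and the property keys follow the sample output shown earlier.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

class HistoryAppendSketch {

    // Returns a new unmodifiable history with one more entry; the input list is never changed.
    static List<Properties> append(List<Properties> previous, String name, String type) {
        Properties entry = new Properties();
        entry.setProperty("name", name);
        entry.setProperty("type", type);
        entry.setProperty("timestamp", Long.toString(System.currentTimeMillis()));

        List<Properties> updated = new ArrayList<>(previous); // copy the request history
        updated.add(entry);                                   // add this component
        return Collections.unmodifiableList(updated);         // readers cannot rewrite it
    }

    public static void main(String[] args) {
        List<Properties> atGateway = append(Collections.<Properties>emptyList(), "sampleGateway", "gateway");
        List<Properties> atChain = append(atGateway, "sampleChain", "chain");
        System.out.println(atGateway.size() + " entry, then " + atChain.size() + " entries");
    }
}
```

Because each step produces a new list, a history captured on one thread stays valid after the message moves on to another thread.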

Message Store

The Enterprise Integration Patterns (EIP) book identifies several patterns that have the ability to buffer messages. For example, an aggregator buffers messages until they can be released, and a QueueChannel buffers messages until consumers explicitly receive those messages from that channel. Because of the failures that can occur at any point within your message flow, EIP components that buffer messages also introduce a point where messages could be lost.

To mitigate the risk of losing messages, EIP defines the message store pattern, which lets EIP components store messages, typically in some type of persistent store (such as an RDBMS).

Spring Integration provides support for the message store pattern by:

  • Defining an org.springframework.integration.store.MessageStore strategy interface

  • Providing several implementations of this interface

  • Exposing a message-store attribute on all components that have the capability to buffer messages so that you can inject any instance that implements the MessageStore interface.

Details on how to configure a specific message store implementation and how to inject a MessageStore implementation into a specific buffering component are described throughout the manual (see the specific component, such as QueueChannel, Aggregator, Delayer, and others). The following pair of examples show how to add a reference to a message store for a QueueChannel and for an aggregator:

Example 1. QueueChannel

<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
</int:channel>

Example 2. Aggregator

<int:aggregator … message-store="refToMessageStore"/>

By default, messages are stored in-memory by using o.s.i.store.SimpleMessageStore, an implementation of MessageStore. That might be fine for development or simple low-volume environments where the potential loss of non-persistent messages is not a concern. However, the typical production application needs a more robust option, not only to mitigate the risk of message loss but also to avoid potential out-of-memory errors. Therefore, we also provide MessageStore implementations for a variety of data-stores. The following is a complete list of supported implementations:

However, be aware of some limitations while using persistent implementations of the MessageStore.

The Message data (payload and headers) is serialized and deserialized by using different serialization strategies, depending on the implementation of the MessageStore. For example, when using JdbcMessageStore, only Serializable data is persisted by default. In this case, non-Serializable headers are removed before serialization occurs. Also, be aware of the protocol-specific headers that are injected by transport adapters (such as FTP, HTTP, JMS, and others). For example, <http:inbound-channel-adapter/> maps HTTP headers into message headers, and one of them is an ArrayList of non-serializable org.springframework.http.MediaType instances. However, you can inject your own implementation of the Serializer and Deserializer strategy interfaces into some MessageStore implementations (such as JdbcMessageStore) to change the behavior of serialization and deserialization.

Pay special attention to the headers that represent certain types of data. For example, if one of the headers contains an instance of some Spring bean, upon deserialization, you may end up with a different instance of that bean, which directly affects some of the implicit headers created by the framework (such as REPLY_CHANNEL or ERROR_CHANNEL). Currently, they are not serializable, but, even if they were, the deserialized channel would not represent the expected instance.

Beginning with Spring Integration version 3.0, you can resolve this issue with a header enricher configured to replace these headers with a name after registering the channel with the HeaderChannelRegistry.

Also, consider what happens when you configure a message-flow as follows: gateway → queue-channel (backed by a persistent Message Store) → service-activator. That gateway creates a temporary reply channel, which is lost by the time the service-activator’s poller reads from the queue. Again, you can use the header enricher to replace the headers with a String representation.

For more information, see Header Enricher.

Spring Integration 4.0 introduced two new interfaces:

  • ChannelMessageStore: To implement operations specific for QueueChannel instances

  • PriorityCapableChannelMessageStore: To mark MessageStore implementations to be used for PriorityChannel instances and to provide priority order for persisted messages.

The real behavior depends on the implementation. The framework provides the following implementations, which can be used as a persistent MessageStore for QueueChannel and PriorityChannel:

Caution about SimpleMessageStore

Starting with version 4.1, the SimpleMessageStore no longer copies the message group when calling getMessageGroup(). For large message groups, this was a significant performance problem. Version 4.0.1 introduced a boolean copyOnGet property that lets you control this behavior. When used internally by the aggregator, this property was set to false to improve performance. It is now false by default.

Users accessing the group store outside of components such as aggregators now get a direct reference to the group being used by the aggregator instead of a copy. Manipulation of the group outside of the aggregator may cause unpredictable results.

For this reason, you should either not perform such manipulation or set the copyOnGet property to true.
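
The difference between the two settings can be illustrated with a heavily simplified, hypothetical store. CopyOnGetSketch is not SimpleMessageStore; it only mimics the choice between handing out a live reference and handing out a copy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified store used only to illustrate the copyOnGet trade-off.
class CopyOnGetSketch {

    private final Map<String, List<String>> groups = new HashMap<>();

    private final boolean copyOnGet;

    CopyOnGetSketch(boolean copyOnGet) {
        this.copyOnGet = copyOnGet;
    }

    void add(String groupId, String message) {
        this.groups.computeIfAbsent(groupId, id -> new ArrayList<>()).add(message);
    }

    // With copyOnGet=false the caller receives the live list; with true, a snapshot.
    List<String> getGroup(String groupId) {
        List<String> group = this.groups.computeIfAbsent(groupId, id -> new ArrayList<>());
        return this.copyOnGet ? new ArrayList<>(group) : group;
    }

    int size(String groupId) {
        List<String> group = this.groups.get(groupId);
        return group == null ? 0 : group.size();
    }
}
```

Clearing the list returned by a store created with copyOnGet set to false empties the stored group as well, whereas the same call on a copying store leaves the stored group untouched.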

Using MessageGroupFactory

Starting with version 4.3, some MessageGroupStore implementations can be injected with a custom MessageGroupFactory strategy to create and customize the MessageGroup instances used by the MessageGroupStore. This defaults to a SimpleMessageGroupFactory, which produces SimpleMessageGroup instances based on the GroupType.HASH_SET (LinkedHashSet) internal collection. Other possible options are SYNCHRONISED_SET and BLOCKING_QUEUE, where the last one can be used to reinstate the previous SimpleMessageGroup behavior. The PERSISTENT option is also available. See the next section for more information. Starting with version 5.0.1, the LIST option is also available for when the order and uniqueness of messages in the group do not matter.

Persistent MessageGroupStore and Lazy-load

Starting with version 4.3, all persistent MessageGroupStore instances retrieve MessageGroup instances and their messages from the store in a lazy-load manner. In most cases, this is useful for the correlation MessageHandler instances (see Aggregator and Resequencer), where it would add overhead to load the entire MessageGroup from the store on each correlation operation.

You can use the AbstractMessageGroupStore.setLazyLoadMessageGroups(false) option to switch off the lazy-load behavior from the configuration.

Our performance tests for lazy-load on MongoDB MessageStore (MongoDB Message Store) and <aggregator> (Aggregator) use a custom release-strategy similar to the following:

<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>

It produces results similar to the following for 1000 simple messages:

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...

Metadata Store

Many external systems, services, or resources are not transactional (Twitter, RSS, file systems, and so on), and there is no ability to mark the data as read. Also, you may sometimes need to implement the Enterprise Integration Pattern idempotent receiver in some integration solutions. To achieve this goal and to store some previous state of the endpoint before the next interaction with an external system, or to deal with the next message, Spring Integration provides the metadata store component as an implementation of the org.springframework.integration.metadata.MetadataStore interface with a general key-value contract.

The metadata store is designed to store various types of generic metadata (for example, the published date of the last feed entry that has been processed) to help components such as the feed adapter deal with duplicates. If a component is not directly provided with a reference to a MetadataStore, the algorithm for locating a metadata store is as follows: First, look for a bean with a metadataStore ID in the application context. If one is found, use it. Otherwise, create a new instance of SimpleMetadataStore, which is an in-memory implementation that persists metadata only within the lifecycle of the currently running application context. This means that, upon restart, you may end up with duplicate entries.

If you need to persist metadata between application context restarts, the framework provides the following persistent MetadataStores:

The PropertiesPersistingMetadataStore is backed by a properties file and a PropertiesPersister.

By default, it persists the state only when the application context is closed normally. It implements Flushable so that you can persist the state at will by invoking flush(). The following example shows how to configure a PropertiesPersistingMetadataStore with XML:

<bean id="metadataStore"
    class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>

Alternatively, you can provide your own implementation of the MetadataStore interface (for example, JdbcMetadataStore) and configure it as a bean in the application context.

Starting with version 4.0, SimpleMetadataStore, PropertiesPersistingMetadataStore, and RedisMetadataStore implement ConcurrentMetadataStore. These provide for atomic updates and can be used across multiple component or application instances.

Idempotent Receiver and Metadata Store

The metadata store is useful for implementing the EIP idempotent receiver pattern when you need to filter out an incoming message that has already been processed, so that you can discard it or perform some other logic on discarding. The following configuration shows an example of how to do so:

<int:filter input-channel="serviceChannel"
            output-channel="idempotentServiceChannel"
            discard-channel="discardChannel"
            expression="@metadataStore.get(headers.businessKey) == null"/>

<int:publish-subscribe-channel id="idempotentServiceChannel"/>

<int:outbound-channel-adapter channel="idempotentServiceChannel"
                              expression="@metadataStore.put(headers.businessKey, '')"/>

<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>

The value of the idempotent entry may be an expiration date, after which that entry should be removed from metadata store by some scheduled reaper.
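
The configuration above checks for the key with get and records it with a separate put, so two concurrent consumers could, in principle, both pass the check. The following sketch shows an atomic check-and-record step together with an expiration value and a simple reaper. It uses a plain ConcurrentMap as a stand-in for a ConcurrentMetadataStore (which offers a comparable putIfAbsent operation); the class, its methods, and the choice of an expiry timestamp as the value are assumptions made for the illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical idempotency guard; the map stands in for a concurrent metadata store.
class IdempotentReceiverSketch {

    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    private final long timeToLiveMillis;

    IdempotentReceiverSketch(long timeToLiveMillis) {
        this.timeToLiveMillis = timeToLiveMillis;
    }

    // Returns true only for the first caller that presents the key (atomic check and record).
    boolean accept(String businessKey, long nowMillis) {
        String expiresAt = Long.toString(nowMillis + this.timeToLiveMillis);
        return this.store.putIfAbsent(businessKey, expiresAt) == null;
    }

    // Removes entries whose expiration time has passed; intended for a scheduled task.
    void reap(long nowMillis) {
        this.store.entrySet().removeIf(entry -> Long.parseLong(entry.getValue()) <= nowMillis);
    }
}
```

Passing the current time as a parameter keeps the sketch deterministic; a scheduled reaper would typically pass System.currentTimeMillis().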

MetadataStoreListener

Some metadata stores (currently only ZooKeeper) support registering a listener to receive events when items change, as the following example shows:

public interface MetadataStoreListener {

	void onAdd(String key, String value);

	void onRemove(String key, String oldValue);

	void onUpdate(String key, String newValue);
}

See the Javadoc for more information. The MetadataStoreListenerAdapter can be subclassed if you are interested only in a subset of events.
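
The adapter approach can be sketched as follows. The interface is repeated so that the example compiles on its own, NoOpListenerAdapter is a hypothetical stand-in for MetadataStoreListenerAdapter, and RemovalAuditListener is an invented subclass that cares only about removals.

```java
import java.util.ArrayList;
import java.util.List;

// Repeated from the listing above so that this sketch compiles on its own.
interface MetadataStoreListener {

    void onAdd(String key, String value);

    void onRemove(String key, String oldValue);

    void onUpdate(String key, String newValue);
}

// Hypothetical stand-in for MetadataStoreListenerAdapter: every callback is a no-op.
class NoOpListenerAdapter implements MetadataStoreListener {

    @Override
    public void onAdd(String key, String value) {
    }

    @Override
    public void onRemove(String key, String oldValue) {
    }

    @Override
    public void onUpdate(String key, String newValue) {
    }
}

// Interested only in removals, so only onRemove is overridden.
class RemovalAuditListener extends NoOpListenerAdapter {

    private final List<String> removedKeys = new ArrayList<>();

    @Override
    public void onRemove(String key, String oldValue) {
        this.removedKeys.add(key);
    }

    List<String> getRemovedKeys() {
        return this.removedKeys;
    }
}
```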

Control Bus

As described in the Enterprise Integration Patterns (EIP) book, the idea behind the control bus is that the same messaging system can be used for monitoring and managing the components within the framework as is used for “application-level” messaging. In Spring Integration, we build upon the adapters described above so that you can send messages as a means of invoking exposed operations.

The following example shows how to configure a control bus with XML:

<int:control-bus input-channel="operationChannel"/>

The control bus has an input channel that can be accessed for invoking operations on the beans in the application context. It also has all the common properties of a service activating endpoint. For example, you can specify an output channel if the result of the operation has a return value that you want to send on to a downstream channel.

The control bus runs messages on the input channel as Spring Expression Language (SpEL) expressions. It takes a message, compiles the body to an expression, adds some context, and then runs it. The default context supports any method that has been annotated with @ManagedAttribute or @ManagedOperation. It also supports the methods on Spring’s Lifecycle interface (and its Pausable extension since version 5.2), and it supports methods that are used to configure several of Spring’s TaskExecutor and TaskScheduler implementations. The simplest way to ensure that your own methods are available to the control bus is to use the @ManagedAttribute or @ManagedOperation annotations. Since those annotations are also used for exposing methods to a JMX MBean registry, they offer a convenient by-product: often, the same types of operations you want to expose to the control bus are reasonable for exposing through JMX. Resolution of any particular instance within the application context is achieved in the typical SpEL syntax. To do so, provide the bean name with the SpEL prefix for beans (@). For example, to execute a method on a Spring Bean, a client could send a message to the operation channel as follows:

// The payload is the SpEL expression that the control bus evaluates.
Message<String> operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation);

The root of the context for the expression is the Message itself, so you also have access to the payload and headers as variables within your expression. This is consistent with all the other expression support in Spring Integration endpoints.

With Java annotations, you can configure the control bus as follows:

@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
    return new ExpressionControlBusFactoryBean();
}

Similarly, you can configure Java DSL flow definitions as follows:

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from("controlBus")
              .controlBus()
              .get();
}

If you prefer to use lambdas with automatic DirectChannel creation, you can create a control bus as follows:

@Bean
public IntegrationFlow controlBus() {
    return IntegrationFlowDefinition::controlBus;
}

In this case, the channel is named controlBus.input.

Orderly Shutdown

As described in "MBean Exporter", the MBean exporter provides a JMX operation called stopActiveComponents, which is used to stop the application in an orderly manner. The operation has a single Long parameter. The parameter indicates how long (in milliseconds) the operation waits to allow in-flight messages to complete. The operation works as follows:

  1. Call beforeShutdown() on all beans that implement OrderlyShutdownCapable.

    Doing so lets such components prepare for shutdown. Examples of components that implement this interface and what they do with this call include JMS and AMQP message-driven adapters that stop their listener containers, TCP server connection factories that stop accepting new connections (while keeping existing connections open), TCP inbound endpoints that drop (log) any new messages received, and HTTP inbound endpoints that return 503 - Service Unavailable for any new requests.

  2. Stop any active channels, such as JMS- or AMQP-backed channels.

  3. Stop all MessageSource instances.

  4. Stop all inbound MessageProducer instances (that are not OrderlyShutdownCapable).

  5. Wait for any remaining time left, as defined by the value of the Long parameter passed in to the operation.

    Doing so lets any in-flight messages complete their journeys. It is therefore important to select an appropriate timeout when invoking this operation.

  6. Call afterShutdown() on all OrderlyShutdownCapable components.

    Doing so lets such components perform final shutdown tasks (closing all open sockets, for example).
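
The six steps above can be sketched in plain Java as follows. This is not the exporter's implementation: ShutdownAware is a hypothetical stand-in for OrderlyShutdownCapable, and the Runnable lists stand in for the stop actions of channels, message sources, and message producers.

```java
import java.util.List;

class OrderlyShutdownSketch {

    // Hypothetical stand-in for OrderlyShutdownCapable.
    interface ShutdownAware {

        void beforeShutdown();

        void afterShutdown();
    }

    // Runs the six steps in order; howLongMillis bounds the time allowed for in-flight messages.
    static void stopActiveComponents(long howLongMillis, List<ShutdownAware> capable,
            List<Runnable> channelStops, List<Runnable> sourceStops, List<Runnable> producerStops) {

        long deadline = System.nanoTime() + howLongMillis * 1_000_000L;

        capable.forEach(ShutdownAware::beforeShutdown); // step 1: let components prepare
        channelStops.forEach(Runnable::run);            // step 2: stop active channels
        sourceStops.forEach(Runnable::run);             // step 3: stop message sources
        producerStops.forEach(Runnable::run);           // step 4: stop inbound producers

        // Step 5: wait only for whatever is left of the requested time.
        long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
        if (remainingMillis > 0) {
            try {
                Thread.sleep(remainingMillis);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and continue
            }
        }

        capable.forEach(ShutdownAware::afterShutdown);  // step 6: final shutdown tasks
    }
}
```

Computing the deadline before step 1 means the time spent stopping components counts against the timeout, which matches the "remaining time" wording of step 5.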

As discussed in Orderly Shutdown Managed Operation, this operation can be invoked by using JMX. If you wish to programmatically invoke the method, you need to inject or otherwise get a reference to the IntegrationMBeanExporter. If no id attribute is provided on the <int-jmx:mbean-export/> definition, the bean has a generated name. This name contains a random component to avoid ObjectName collisions if multiple Spring Integration contexts exist in the same JVM (MBeanServer).

For this reason, if you wish to invoke the method programmatically, we recommend that you provide the exporter with an id attribute so that you can easily access it in the application context.

Finally, the operation can be invoked by using the <control-bus> element. See the monitoring Spring Integration sample application for details.

The algorithm described earlier was improved in version 4.1. Previously, all task executors and schedulers were stopped. This could cause mid-flow messages in QueueChannel instances to remain. Now the shutdown leaves pollers running, to let these messages be drained and processed.

Integration Graph

Starting with version 4.3, Spring Integration provides access to an application’s runtime object model, which can, optionally, include component metrics. It is exposed as a graph, which may be used to visualize the current state of the integration application. The o.s.i.support.management.graph package contains all the required classes to collect, build, and render the runtime state of Spring Integration components as a single tree-like Graph object. The IntegrationGraphServer should be declared as a bean to build, retrieve, and refresh the Graph object. The resulting Graph object can be serialized to any format, although JSON is flexible and convenient to parse and represent on the client side. A Spring Integration application with only the default components would expose a graph as follows:

{
  "contentDescriptor" : {
    "providerVersion" : "5.2.0.RELEASE",
    "providerFormatVersion" : 1.1,
    "provider" : "spring-integration",
    "name" : "myAppName:1.0"
  },
  "nodes" : [ {
    "nodeId" : 1,
    "componentType" : "null-channel",
    "properties" : { },
    "sendTimers" : {
      "successes" : {
        "count" : 1,
        "mean" : 0.0,
        "max" : 0.0
      },
      "failures" : {
        "count" : 0,
        "mean" : 0.0,
        "max" : 0.0
      }
    },
    "receiveCounters" : {
      "successes" : 0,
      "failures" : 0
    },
    "name" : "nullChannel"
  }, {
    "nodeId" : 2,
    "componentType" : "publish-subscribe-channel",
    "properties" : { },
    "sendTimers" : {
      "successes" : {
        "count" : 1,
        "mean" : 7.807002,
        "max" : 7.807002
      },
      "failures" : {
        "count" : 0,
        "mean" : 0.0,
        "max" : 0.0
      }
    },
    "name" : "errorChannel"
  }, {
    "nodeId" : 3,
    "componentType" : "logging-channel-adapter",
    "properties" : { },
    "output" : null,
    "input" : "errorChannel",
    "sendTimers" : {
      "successes" : {
        "count" : 1,
        "mean" : 6.742722,
        "max" : 6.742722
      },
      "failures" : {
        "count" : 0,
        "mean" : 0.0,
        "max" : 0.0
      }
    },
    "name" : "_org.springframework.integration.errorLogger"
  } ],
  "links" : [ {
    "from" : 2,
    "to" : 3,
    "type" : "input"
  } ]
}

Version 5.2 deprecated the legacy metrics in favor of Micrometer meters, as discussed in Metrics and Management. While not shown above, the legacy metrics (under the stats child node) continue to appear in the graph, but with an extra child node: "deprecated" : "stats are deprecated in favor of sendTimers and receiveCounters". The providerFormatVersion has been changed to 1.1 to reflect this change.

In the preceding example, the graph consists of three top-level elements.

The contentDescriptor graph element contains general information about the application providing the data. The name can be customized on the IntegrationGraphServer bean or in the spring.application.name application context environment property. Other properties are provided by the framework and let you distinguish a similar model from other sources.

The links graph element represents connections between nodes from the nodes graph element and, therefore, between integration components in the source Spring Integration application. For example, a link may run from a MessageChannel to an EventDrivenConsumer with some MessageHandler, or from an AbstractReplyProducingMessageHandler to a MessageChannel. For convenience and to let you determine a link’s purpose, the model includes the type attribute. The possible types are:

  • input: The direction from the MessageChannel to the endpoint through an inputChannel or requestChannel property.

  • output: The direction from the MessageHandler, MessageProducer, or SourcePollingChannelAdapter to the MessageChannel through an outputChannel or replyChannel property.

  • error: From the MessageHandler on a PollingConsumer, a MessageProducer, or a SourcePollingChannelAdapter to the MessageChannel through an errorChannel property.

  • discard: From a DiscardingMessageHandler (such as MessageFilter) to the MessageChannel through a discardChannel property.

  • route: From AbstractMappingMessageRouter (such as HeaderValueRouter) to the MessageChannel. Similar to output but determined at run-time. May be a configured channel mapping or a dynamically resolved channel. Routers typically retain only up to 100 dynamic routes for this purpose, but you can modify this value by setting the dynamicChannelLimit property.

The information from this element can be used by a visualization tool to render connections between nodes from the nodes graph element, where the from and to numbers represent the value from the nodeId property of the linked nodes. For example, the link element can be used to determine the proper port on the target node.
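
As a rough illustration of how a client might consume this element, the following sketch resolves the from and to values of each link against the nodeId values of the nodes. The Link class and the describe method are hypothetical, and the data in main is copied from the JSON example shown earlier; parsing the JSON itself is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GraphLinkSketch {

    // Minimal holder mirroring the "from", "to", and "type" fields of a link.
    static final class Link {

        final int from;

        final int to;

        final String type;

        Link(int from, int to, String type) {
            this.from = from;
            this.to = to;
            this.type = type;
        }
    }

    // Renders each link as "fromName --type--> toName" by resolving nodeId values to names.
    static List<String> describe(Map<Integer, String> nodeNamesById, List<Link> links) {
        List<String> lines = new ArrayList<>();
        for (Link link : links) {
            lines.add(nodeNamesById.get(link.from) + " --" + link.type + "--> "
                    + nodeNamesById.get(link.to));
        }
        return lines;
    }

    public static void main(String[] args) {
        Map<Integer, String> nodes = new LinkedHashMap<>();
        nodes.put(1, "nullChannel");
        nodes.put(2, "errorChannel");
        nodes.put(3, "_org.springframework.integration.errorLogger");

        List<Link> links = new ArrayList<>();
        links.add(new Link(2, 3, "input"));

        describe(nodes, links).forEach(System.out::println);
    }
}
```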

The following “text image” shows the relationships between the types:

              +---(discard)
              |
         +----o----+
         |         |
         |         |
         |         |
(input)--o         o---(output)
         |         |
         |         |
         |         |
         +----o----+
              |
              +---(error)
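
As a client-side illustration of how the from, to, and type values fit together, the following minimal sketch resolves each link to a readable description by looking up node names by nodeId. The LinkResolver and Link classes and the sample node names are hypothetical stand-ins for data parsed from the graph JSON. They are not part of Spring Integration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LinkResolver {

    // Hypothetical client-side stand-in for one entry of the "links" element.
    static final class Link {
        final int from;    // nodeId of the source node
        final int to;      // nodeId of the target node
        final String type; // input, output, error, discard, or route

        Link(int from, int to, String type) {
            this.from = from;
            this.to = to;
            this.type = type;
        }
    }

    // Turns each link into "source -(type)-> target", looking up names by nodeId.
    static List<String> describe(Map<Integer, String> nodeNamesById, List<Link> links) {
        List<String> result = new ArrayList<>();
        for (Link link : links) {
            String from = nodeNamesById.getOrDefault(link.from, "unknown#" + link.from);
            String to = nodeNamesById.getOrDefault(link.to, "unknown#" + link.to);
            result.add(from + " -(" + link.type + ")-> " + to);
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample data (assumed): a filter between two channels, with a discard channel.
        Map<Integer, String> names = new HashMap<>();
        names.put(1, "inputChannel");
        names.put(2, "myFilter");
        names.put(3, "outputChannel");
        names.put(4, "discardChannel");

        List<Link> links = List.of(
                new Link(1, 2, "input"),
                new Link(2, 3, "output"),
                new Link(2, 4, "discard"));

        describe(names, links).forEach(System.out::println);
    }
}
```

A visualization tool could use the same lookup to decide which port of the target node a link attaches to, based on the type value.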

The nodes graph element is perhaps the most interesting, because its elements contain not only the runtime components with their componentType and name values but can also optionally contain metrics exposed by the component. Node elements contain various properties that are generally self-explanatory. For example, expression-based components include the expression property that contains the primary expression string for the component.

To enable the metrics, add @EnableIntegrationManagement to a @Configuration class or add an <int:management/> element to your XML configuration. You can control exactly which components in the framework collect statistics. See Metrics and Management for complete information. See the stats attribute from the o.s.i.errorLogger component in the JSON example shown earlier. In this case, the nullChannel and errorChannel do not provide statistics information, because the configuration for this example was as follows:

@Configuration
@EnableIntegration
@EnableIntegrationManagement(statsEnabled = "_org.springframework.integration.errorLogger.handler",
      countsEnabled = "!*",
      defaultLoggingEnabled = "false")
public class ManagementConfiguration {

    @Bean
    public IntegrationGraphServer integrationGraphServer() {
        return new IntegrationGraphServer();
    }

}

The nodeId represents a unique incremental identifier to let you distinguish one component from another. It is also used in the links element to represent a relationship (connection) of this component to others, if any. The input and output attributes are for the inputChannel and outputChannel properties of the AbstractEndpoint, MessageHandler, SourcePollingChannelAdapter, or MessageProducerSupport. See the next section for more information.

Starting with version 5.1, the IntegrationGraphServer accepts a Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback to populate additional properties on the IntegrationNode for a particular NamedComponent. For example, you can expose the SmartLifecycle autoStartup and running properties in the target graph:

server.setAdditionalPropertiesCallback(namedComponent -> {
    Map<String, Object> properties = null;
    if (namedComponent instanceof SmartLifecycle) {
        SmartLifecycle smartLifecycle = (SmartLifecycle) namedComponent;
        properties = new HashMap<>();
        properties.put("auto-startup", smartLifecycle.isAutoStartup());
        properties.put("running", smartLifecycle.isRunning());
    }
    return properties;
});

Graph Runtime Model

Spring Integration components have various levels of complexity. For example, any polled MessageSource also has a SourcePollingChannelAdapter and a MessageChannel to which to periodically send messages from the source data. Other components might be middleware request-reply components (such as JmsOutboundGateway) with a consuming AbstractEndpoint to subscribe to (or poll) the requestChannel (input) for messages, and a replyChannel (output) to produce a reply message to send downstream. Meanwhile, any MessageProducerSupport implementation (such as ApplicationEventListeningMessageProducer) wraps some source protocol listening logic and sends messages to the outputChannel.

Within the graph, Spring Integration components are represented by using the IntegrationNode class hierarchy, which you can find in the o.s.i.support.management.graph package. For example, the ErrorCapableDiscardingMessageHandlerNode can represent an AggregatingMessageHandler, because it has a discardChannel option and can produce errors when consuming from a PollableChannel by using a PollingConsumer. Another example is the CompositeMessageHandlerNode, which represents a MessageHandlerChain when subscribed to a SubscribableChannel by using an EventDrivenConsumer.

The @MessagingGateway (see Messaging Gateways) provides a node for each of its methods, where the name attribute is based on the gateway’s bean name and the short method signature. Consider the following example of a gateway:

@MessagingGateway(defaultRequestChannel = "four")
public interface Gate {

	void foo(String foo);

	void foo(Integer foo);

	void bar(String bar);

}

The preceding gateway produces nodes similar to the following:

{
  "nodeId" : 10,
  "name" : "gate.bar(class java.lang.String)",
  "stats" : null,
  "componentType" : "gateway",
  "output" : "four",
  "errors" : null
},
{
  "nodeId" : 11,
  "name" : "gate.foo(class java.lang.String)",
  "stats" : null,
  "componentType" : "gateway",
  "output" : "four",
  "errors" : null
},
{
  "nodeId" : 12,
  "name" : "gate.foo(class java.lang.Integer)",
  "stats" : null,
  "componentType" : "gateway",
  "output" : "four",
  "errors" : null
}
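
The naming pattern visible in the preceding nodes (bean name, method name, and parameter types) can be reproduced with plain reflection. The following sketch is an illustration only and does not claim to be the framework's implementation. The GatewayNodeNames class is hypothetical, the Gate interface is a copy of the preceding gateway without the Spring annotation, and the comma separator for methods with several parameters is an assumption.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class GatewayNodeNames {

    // Same shape as the gateway in the preceding example, minus the Spring annotation.
    interface Gate {

        void foo(String foo);

        void foo(Integer foo);

        void bar(String bar);

    }

    // Builds "<beanName>.<method>(<parameter types>)" for one method.
    static String nodeName(String beanName, Method method) {
        String params = Arrays.stream(method.getParameterTypes())
                .map(type -> type.toString()) // for example "class java.lang.String"
                .collect(Collectors.joining(",")); // separator is an assumption
        return beanName + "." + method.getName() + "(" + params + ")";
    }

    // Returns the names for all methods of the interface, sorted for a stable order.
    static List<String> nodeNames(String beanName, Class<?> gatewayInterface) {
        return Arrays.stream(gatewayInterface.getMethods())
                .map(method -> nodeName(beanName, method))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        nodeNames("gate", Gate.class).forEach(System.out::println);
    }
}
```

Because overloaded methods differ only in their parameter types, including those types in the name keeps each node distinguishable, as with the two foo nodes above.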

You can use this IntegrationNode hierarchy for parsing the graph model on the client side as well as to understand the general Spring Integration runtime behavior. See also Programming Tips and Tricks for more information.

Integration Graph Controller

If your application is web-based (or built on top of Spring Boot with an embedded web container) and the Spring Integration HTTP or WebFlux module (see HTTP Support and WebFlux Support, respectively) is present on the classpath, you can use an IntegrationGraphController to expose the IntegrationGraphServer functionality as a REST service. For this purpose, the @EnableIntegrationGraphController annotation (for @Configuration classes) and the <int-http:graph-controller/> XML element are available in the HTTP module. Together with the @EnableWebMvc annotation (or <mvc:annotation-driven/> for XML definitions), this configuration registers an IntegrationGraphController @RestController, whose @RequestMapping.path can be configured on the @EnableIntegrationGraphController annotation or the <int-http:graph-controller/> element. The default path is /integration.

The IntegrationGraphController @RestController provides the following services:

  • @GetMapping(name = "getGraph"): To retrieve the state of the Spring Integration components since the last IntegrationGraphServer refresh. The o.s.i.support.management.graph.Graph is returned as a @ResponseBody of the REST service.

  • @GetMapping(path = "/refresh", name = "refreshGraph"): To refresh the current Graph for the actual runtime state and return it as a REST response. It is not necessary to refresh the graph for metrics. They are provided in real time when the graph is retrieved. Call refresh if the application context has been modified since the graph was last retrieved. In that case, the graph is completely rebuilt.
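
From a client's point of view, the two services above are plain GET requests against the controller path and its /refresh sub-path. The following sketch uses the JDK's java.net.http.HttpClient (Java 11 or later) to call them. The GraphClient class is hypothetical, and the base URL (such as http://localhost:8080) and the absence of authentication are assumptions that depend on your deployment.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GraphClient {

    private final String baseUrl;        // for example "http://localhost:8080" (assumed)
    private final String controllerPath; // "/integration" unless configured otherwise

    GraphClient(String baseUrl, String controllerPath) {
        this.baseUrl = baseUrl;
        this.controllerPath = controllerPath;
    }

    // URI of the service that returns the graph as of the last refresh.
    URI graphUri() {
        return URI.create(baseUrl + controllerPath);
    }

    // URI of the service that rebuilds the graph and then returns it.
    URI refreshUri() {
        return URI.create(baseUrl + controllerPath + "/refresh");
    }

    // Performs a GET request and returns the JSON body as a string.
    String get(URI uri) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri)
                .GET()
                .header("Accept", "application/json")
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Unexpected status " + response.statusCode() + " from " + uri);
        }
        return response.body();
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            // Without a base URL, only show which URIs would be called.
            GraphClient sample = new GraphClient("http://localhost:8080", "/integration");
            System.out.println(sample.graphUri());
            System.out.println(sample.refreshUri());
            return;
        }
        GraphClient client = new GraphClient(args[0], "/integration");
        System.out.println(client.get(client.graphUri()));
    }
}
```

Since metrics are provided when the graph is retrieved, a monitoring client would normally poll graphUri() and call refreshUri() only after the application context has changed.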

You can set security and cross-origin restrictions for the IntegrationGraphController with the standard configuration options and components provided by the Spring Security and Spring MVC projects. The following example achieves those goals:

<mvc:annotation-driven />

<mvc:cors>
    <mvc:mapping path="/myIntegration/**"
                 allowed-origins="http://localhost:9090"
                 allowed-methods="GET" />
</mvc:cors>

<security:http>
    <security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" />
</security:http>


<int-http:graph-controller path="/myIntegration" />

The following example shows how to do the same thing with Java configuration:

@Configuration
@EnableWebMvc // or @EnableWebFlux
@EnableWebSecurity // or @EnableWebFluxSecurity
@EnableIntegration
@EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090")
public class IntegrationConfiguration extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
            .authorizeRequests()
                .antMatchers("/testIntegration/**").hasRole("ADMIN")
            // ...
            .formLogin();
    }

    //...

}

Note that, for convenience, the @EnableIntegrationGraphController annotation provides an allowedOrigins attribute. This provides GET access to the path. For more sophisticated requirements, you can configure the CORS mappings by using standard Spring MVC mechanisms.