10. System Management

10.1 Metrics and Management

10.1.1 Configuring Metrics Capture

[Note]Note

Prior to version 4.2 metrics were only available when JMX was enabled. See Section 10.2, “JMX Support”.

To enable MessageSource, MessageChannel and MessageHandler metrics, add an <int:management/> bean to the application context, or annotate one of your @Configuration classes with @EnableIntegrationManagement. MessageSource s only maintain counts, MessageChannel s and MessageHandler s maintain duration statistics in addition to counts. See Section 10.1.3, “MessageChannel Metric Features” and Section 10.1.4, “MessageHandler Metric Features” below.

This causes the automatic registration of the IntegrationManagementConfigurer bean in the application context. Only one such bean can exist in the context and it must have the bean name integrationManagementConfigurer if registered manually via a <bean/> definition. This bean applies it’s configuration to beans after all beans in the context have been instantiated.

In addition to metrics, you can control debug logging in the main message flow. It has been found that in very high volume applications, even calls to isDebugEnabled() can be quite expensive with some logging subsystems. You can disable all such logging to avoid this overhead; exception logging (debug or otherwise) are not affected by this setting.

A number of options are available:

<int:management
    default-logging-enabled="true" 1
    default-counts-enabled="false" 2
    default-stats-enabled="false" 3
    counts-enabled-patterns="foo, !baz, ba*" 4
    stats-enabled-patterns="fiz, buz" 5
    metrics-factory="myMetricsFactory" /> 6
@Configuration
@EnableIntegration
@EnableIntegrationManagement(
    defaultLoggingEnabled = "true", 1
    defaultCountsEnabled = "false", 2
    defaultStatsEnabled = "false", 3
    countsEnabled = { "foo", "${count.patterns}" }, 4
    statsEnabled = { "qux", "!*" }, 5
    MetricsFactory = "myMetricsFactory") 6
public static class ContextConfiguration {
...
}

1 1

Set to false to disable all logging in the main message flow, regardless of the log system category settings. Set to true to enable debug logging (if also enabled by the logging subsystem). Only applied if you have not explicitly configured the setting in a bean definition. Default true.

2 2

Enable or disable count metrics for components not matching one of the patterns in <4>. Only applied if you have not explicitly configured the setting in a bean definition. Default false.

3 3

Enable or disable statistical metrics for components not matching one of the patterns in <5>. Only applied if you have not explicitly configured the setting in a bean definition. Default false.

4 4

A comma-delimited list of patterns for beans for which counts should be enabled; negate the pattern with !. First match wins (positive or negative). In the unlikely event that you have a bean name starting with !, escape the ! in the pattern: \!foo positively matches a bean named !foo.

5 5

A comma-delimited list of patterns for beans for which statistical metrics should be enabled; negate the pattern with !. First match wins (positive or negative). In the unlikely event that you have a bean name starting with !, escape the ! in the pattern: \!foo positively matches a bean named !foo. Stats implies counts.

6 6

A reference to a MetricsFactory. See Section 10.1.6, “Metrics Factory”.

At runtime, counts and statistics can be obtained by calling IntegrationManagementConfigurer getChannelMetrics, getHandlerMetrics and getSourceMetrics, returning MessageChannelMetrics, MessageHandlerMetrics and MessageSourceMetrics respectively.

See the javadocs for complete information about these classes.

When JMX is enabled (see Section 10.2, “JMX Support”), these metrics are also exposed by the IntegrationMBeanExporter.

[Important]Important

defaultLoggingEnabled, defaultCountsEnabled, and defaultStatsEnabled are only applied if you have not explicitly configured the corresponding setting in a bean definition.

Starting with version 5.0.2, the framework will automatically detect if there is a single MetricsFactory bean in the application context and use it instead of the default metrics factory.

10.1.2 Micrometer Integration

Starting with version 5.0.3, the presence of a Micrometer MeterRegistry in the application context will trigger support for Micrometer metrics in addition to the inbuilt metrics (inbuilt metrics will be removed in a future release).

[Important]Important

Micrometer was first supported in version 5.0.2, but changes were made to the Micrometer Meters in version 5.0.3 to make them more suitable for use in dimensional systems. Further changes were made in 5.0.4; if using Micrometer, a minimum of version 5.0.4 is recommended since some of the changes in 5.0.4 were breaking API changes.

Simply add a MeterRegistry bean of choice to the application context. If the IntegrationManagementConfigurer detects exactly one MeterRegistry bean, it will configure a MicrometerMetricsCaptor bean with name integrationMicrometerMetricsCaptor.

For each MessageHandler and MessageChannel, timers are registered. For each MessageSource, a counter is registered.

This only applies to objects that extend AbstractMessageHandler, AbstractMessageChannel and AbstractMessageSource respectively (which is the case for most framework components).

With Micrometer metrics, the statsEnabled flag takes no effect, since statistics capture is delegated to Micrometer. The countsEnabled flag controls whether the Micrometer Meter s are updated when processing each message.

The Timer Meters for send operations on message channels have the following name/tags:

  • name : spring.integration.send
  • tag : type:channel
  • tag : name:<componentName>
  • tag : result:(success|failure)
  • tag : exception:(none|exception simple class name)
  • description : Send processing time

(A failure result with a none exception means the channel send() operation returned false).

The Counter Meters for receive operations on pollable message channels have the following names/tags:

  • name : spring.integration.receive
  • tag : type:channel
  • tag : name:<componentName>
  • tag : result:(success|failure)
  • tag : exception:(none|exception simple class name)
  • description : Messages received

The Timer Meters for operations on message handlers have the following name/tags:

  • name : spring.integration.send
  • tag : type:handler
  • tag : name:<componentName>
  • tag : result:(success|failure)
  • tag : exception:(none|exception simple class name)
  • description : Send processing time

The Counter meters for message sources have the following names/tags:

  • name : spring.integration.receive
  • tag : type:source
  • tag : name:<componentName>
  • tag : result:success
  • tag : exception:none
  • description : Messages received

In addition, there are three Gauge Meters:

spring.integration.channels - the number of MessageChannels in the application. spring.integration.handlers - the number of MessageHandlers in the application. spring.integration.sources - the number of MessageSources in the application.

10.1.3 MessageChannel Metric Features

These legacy metrics will be removed in a future release; see Section 10.1.2, “Micrometer Integration”.

Message channels report metrics according to their concrete type. If you are looking at a DirectChannel, you will see statistics for the send operation. If it is a QueueChannel, you will also see statistics for the receive operation, as well as the count of messages that are currently buffered by this QueueChannel. In both cases there are some metrics that are simple counters (message count and error count), and some that are estimates of averages of interesting quantities. The algorithms used to calculate these estimates are described briefly in the section below.

Table 10.1. MessageChannel Metrics

Metric TypeExampleAlgorithm

Count

Send Count

Simple incrementer. Increases by one when an event occurs.

Error Count

Send Error Count

Simple incrementer. Increases by one when an send results in an error.

Duration

Send Duration (method execution time in milliseconds)

Exponential Moving Average with decay factor (10 by default). Average of the method execution time over roughly the last 10 (default) measurements.

Rate

Send Rate (number of operations per second)

Inverse of Exponential Moving Average of the interval between events with decay in time (lapsing over 60 seconds by default) and per measurement (last 10 events by default).

Error Rate

Send Error Rate (number of errors per second)

Inverse of Exponential Moving Average of the interval between error events with decay in time (lapsing over 60 seconds by default) and per measurement (last 10 events by default).

Ratio

Send Success Ratio (ratio of successful to total sends)

Estimate the success ratio as the Exponential Moving Average of the series composed of values 1 for success and 0 for failure (decaying as per the rate measurement over time and events by default). Error ratio is 1 - success ratio.


10.1.4 MessageHandler Metric Features

These legacy metrics will be removed in a future release; see Section 10.1.2, “Micrometer Integration”.

The following table shows the statistics maintained for message handlers. Some metrics are simple counters (message count and error count), and one is an estimate of averages of send duration. The algorithms used to calculate these estimates are described briefly in the table below:

Table 10.2. MessageHandlerMetrics

Metric TypeExampleAlgorithm

Count

Handle Count

Simple incrementer. Increases by one when an event occurs.

Error Count

Handler Error Count

Simple incrementer. Increases by one when an invocation results in an error.

Active Count

Handler Active Count

Indicates the number of currently active threads currently invoking the handler (or any downstream synchronous flow).

Duration

Handle Duration (method execution time in milliseconds)

Exponential Moving Average with decay factor (10 by default). Average of the method execution time over roughly the last 10 (default) measurements.


10.1.5 Time-Based Average Estimates

A feature of the time-based average estimates is that they decay with time if no new measurements arrive. To help interpret the behaviour over time, the time (in seconds) since the last measurement is also exposed as a metric.

There are two basic exponential models: decay per measurement (appropriate for duration and anything where the number of measurements is part of the metric), and decay per time unit (more suitable for rate measurements where the time in between measurements is part of the metric). Both models depend on the fact that

S(n) = sum(i=0,i=n) w(i) x(i) has a special form when w(i) = r^i, with r=constant:

S(n) = x(n) + r S(n-1) (so you only have to store S(n-1), not the whole series x(i), to generate a new metric estimate from the last measurement). The algorithms used in the duration metrics use r=exp(-1/M) with M=10. The net effect is that the estimate S(n) is more heavily weighted to recent measurements and is composed roughly of the last M measurements. So M is the "window" or lapse rate of the estimate In the case of the vanilla moving average, i is a counter over the number of measurements. In the case of the rate we interpret i as the elapsed time, or a combination of elapsed time and a counter (so the metric estimate contains contributions roughly from the last M measurements and the last T seconds).

10.1.6 Metrics Factory

A strategy interface MetricsFactory has been introduced allowing you to provide custom channel metrics for your MessageChannel s and MessageHandler s. By default, a DefaultMetricsFactory provides default implementation of MessageChannelMetrics and MessageHandlerMetrics which are described above. To override the default MetricsFactory configure it as described above, by providing a reference to your MetricsFactory bean instance. You can either customize the default implementations as described in the next bullet, or provide completely different implementations by extending AbstractMessageChannelMetrics and/or AbstractMessageHandlerMetrics.

Also see Section 10.1.2, “Micrometer Integration”.

In addition to the default metrics factory described above, the framework provides the AggregatingMetricsFactory. This factory creates AggregatingMessageChannelMetrics and AggregatingMessageHandlerMetrics. In very high volume scenarios, the cost of capturing statistics can be prohibitive (2 calls to the system time and storing the data for each message). The aggregating metrics aggregate the response time over a sample of messages. This can save significant CPU time.

[Caution]Caution

The statistics will be skewed if messages arrive in bursts. These metrics are intended for use with high, constant-volume, message rates.

<bean id="aggregatingMetricsFactory"
            class="org.springframework.integration.support.management.AggregatingMetricsFactory">
    <constructor-arg value="1000" /> <!-- sample size -->
</bean>

The above configuration aggregates the duration over 1000 messages. Counts (send, error) are maintained per-message but the statistics are per 1000 messages.

  • Customizing the Default Channel/Handler Statistics

See Section 10.1.5, “Time-Based Average Estimates” and the Javadocs for the ExponentialMovingAverage* classes for more information about these values.

By default, the DefaultMessageChannelMetrics and DefaultMessageHandlerMetrics use a window of 10 measurements, a rate period of 1 second (rate per second) and a decay lapse period of 1 minute.

If you wish to override these defaults, you can provide a custom MetricsFactory that returns appropriately configured metrics and provide a reference to it to the MBean exporter as described above.

Example:

public static class CustomMetrics implements MetricsFactory {

    @Override
    public AbstractMessageChannelMetrics createChannelMetrics(String name) {
        return new DefaultMessageChannelMetrics(name,
                new ExponentialMovingAverage(20, 1000000.),
                new ExponentialMovingAverageRate(2000, 120000, 30, true),
                new ExponentialMovingAverageRatio(130000, 40, true),
                new ExponentialMovingAverageRate(3000, 140000, 50, true));
    }

    @Override
    public AbstractMessageHandlerMetrics createHandlerMetrics(String name) {
        return new DefaultMessageHandlerMetrics(name, new ExponentialMovingAverage(20, 1000000.));
    }

}
  • Advanced Customization

The customizations described above are wholesale and will apply to all appropriate beans exported by the MBean exporter. This is the extent of customization available using XML configuration.

Individual beans can be provided with different implementations using java @Configuration or programmatically at runtime, after the application context has been refreshed, by invoking the configureMetrics methods on AbstractMessageChannel and AbstractMessageHandler.

  • Performance Improvement

Previously, the time-based metrics (see Section 10.1.5, “Time-Based Average Estimates”) were calculated in real time. The statistics are now calculated when retrieved instead. This resulted in a significant performance improvement, at the expense of a small amount of additional memory for each statistic. As discussed in the bullet above, the statistics can be disabled altogether, while retaining the MBean allowing the invocation of Lifecycle methods.

10.2 JMX Support

Spring Integration provides Channel Adapters for receiving and publishing JMX Notifications. There is also an_Inbound Channel Adapter_ for polling JMX MBean attribute values, and an Outbound Channel Adapter for invoking JMX MBean operations.

10.2.1 Notification Listening Channel Adapter

The Notification-listening Channel Adapter requires a JMX ObjectName for the MBean that publishes notifications to which this listener should be registered. A very simple configuration might look like this:

<int-jmx:notification-listening-channel-adapter id="adapter"
    channel="channel"
    object-name="example.domain:name=publisher"/>
[Tip]Tip

The notification-listening-channel-adapter registers with an MBeanServer at startup, and the default bean name is mbeanServer which happens to be the same bean name generated when using Spring’s <context:mbean-server/> element. If you need to use a different name, be sure to include the_mbean-server_ attribute.

The adapter can also accept a reference to a NotificationFilter and a handback Object to provide some context that is passed back with each Notification. Both of those attributes are optional. Extending the above example to include those attributes as well as an explicit MBeanServer bean name would produce the following:

<int-jmx:notification-listening-channel-adapter id="adapter"
    channel="channel"
    mbean-server="someServer"
    object-name="example.domain:name=somePublisher"
    notification-filter="notificationFilter"
    handback="myHandback"/>

The Notification-listening Channel Adapter is event-driven and registered with the MBeanServer directly. It does not require any poller configuration.

[Note]Note

For this component only, the object-name attribute can contain an ObjectName pattern (e.g. "org.foo:type=Bar,name=*") and the adapter will receive notifications from all MBeans with ObjectNames that match the pattern. In addition, the object-name attribute can contain a SpEL reference to a <util:list/> of ObjectName patterns:

<jmx:notification-listening-channel-adapter id="manyNotificationsAdapter"
    channel="manyNotificationsChannel"
    object-name="#{patterns}"/>

<util:list id="patterns">
    <value>org.foo:type=Foo,name=*</value>
    <value>org.foo:type=Bar,name=*</value>
</util:list>

The names of the located MBean(s) will be logged when DEBUG level logging is enabled.

10.2.2 Notification Publishing Channel Adapter

The Notification-publishing Channel Adapter is relatively simple. It only requires a JMX ObjectName in its configuration as shown below.

<context:mbean-export/>

<int-jmx:notification-publishing-channel-adapter id="adapter"
    channel="channel"
    object-name="example.domain:name=publisher"/>

It does also require that an MBeanExporter be present in the context. That is why the <context:mbean-export/> element is shown above as well.

When Messages are sent to the channel for this adapter, the Notification is created from the Message content. If the payload is a String it will be passed as the message text for the Notification. Any other payload type will be passed as the userData of the Notification.

JMX Notifications also have a type, and it should be a dot-delimited String. There are two ways to provide the type. Precedence will always be given to a Message header value associated with the JmxHeaders.NOTIFICATION_TYPE key. On the other hand, you can rely on a fallback default-notification-type attribute provided in the configuration.

<context:mbean-export/>

<int-jmx:notification-publishing-channel-adapter id="adapter"
    channel="channel"
    object-name="example.domain:name=publisher"
    default-notification-type="some.default.type"/>

10.2.3 Attribute Polling Channel Adapter

The Attribute Polling Channel Adapter is useful when you have a requirement, to periodically check on some value that is available through an MBean as a managed attribute. The poller can be configured in the same way as any other polling adapter in Spring Integration (or it’s possible to rely on the default poller). The object-name and attribute-name are required. An MBeanServer reference is also required, but it will automatically check for a bean named mbeanServer by default, just like the Notification-listening Channel Adapter described above.

<int-jmx:attribute-polling-channel-adapter id="adapter"
    channel="channel"
    object-name="example.domain:name=someService"
    attribute-name="InvocationCount">
        <int:poller max-messages-per-poll="1" fixed-rate="5000"/>
</int-jmx:attribute-polling-channel-adapter>

10.2.4 Tree Polling Channel Adapter

The Tree Polling Channel Adapter queries the JMX MBean tree and sends a message with a payload that is the graph of objects that matches the query. By default the MBeans are mapped to primitives and simple Objects like Map, List and arrays - permitting simple transformation, for example, to JSON. An MBeanServer reference is also required, but it will automatically check for a bean named mbeanServer by default, just like the Notification-listening Channel Adapter described above. A basic configuration would be:

<int-jmx:tree-polling-channel-adapter id="adapter"
    channel="channel"
    query-name="example.domain:type=*">
        <int:poller max-messages-per-poll="1" fixed-rate="5000"/>
</int-jmx:tree-polling-channel-adapter>

This will include all attributes on the MBeans selected. You can filter the attributes by providing an MBeanObjectConverter that has an appropriate filter configured. The converter can be provided as a reference to a bean definition using the converter attribute, or as an inner <bean/> definition. A DefaultMBeanObjectConverter is provided which can take a MBeanAttributeFilter in its constructor argument.

Two standard filters are provided; the NamedFieldsMBeanAttributeFilter allows you to specify a list of attributes to include and the NotNamedFieldsMBeanAttributeFilter allows you to specify a list of attributes to exclude. You can also implement your own filter

10.2.5 Operation Invoking Channel Adapter

The operation-invoking-channel-adapter enables Message-driven invocation of any managed operation exposed by an MBean. Each invocation requires the operation name to be invoked and the ObjectName of the target MBean. Both of these must be explicitly provided via adapter configuration:

<int-jmx:operation-invoking-channel-adapter id="adapter"
    object-name="example.domain:name=TestBean"
    operation-name="ping"/>

Then the adapter only needs to be able to discover the mbeanServer bean. If a different bean name is required, then provide the mbean-server attribute with a reference.

The payload of the Message will be mapped to the parameters of the operation, if any. A Map-typed payload with String keys is treated as name/value pairs, whereas a List or array would be passed as a simple argument list (with no explicit parameter names). If the operation requires a single parameter value, then the payload can represent that single value, and if the operation requires no parameters, then the payload would be ignored.

If you want to expose a channel for a single common operation to be invoked by Messages that need not contain headers, then that option works well.

10.2.6 Operation Invoking Outbound Gateway

Similar to the operation-invoking-channel-adapter Spring Integration also provides a operation-invoking-outbound-gateway, which could be used when dealing with non-void operations and a return value is required. Such return value will be sent as message payload to the reply-channel specified by this Gateway.

<int-jmx:operation-invoking-outbound-gateway request-channel="requestChannel"
   reply-channel="replyChannel"
   object-name="o.s.i.jmx.config:type=TestBean,name=testBeanGateway"
   operation-name="testWithReturn"/>

If the reply-channel attribute is not provided, the reply message will be sent to the channel that is identified by the IntegrationMessageHeaderAccessor.REPLY_CHANNEL header. That header is typically auto-created by the entry point into a message flow, such as any Gateway component. However, if the message flow was started by manually creating a Spring Integration Message and sending it directly to a Channel, then you must specify the message header explicitly or use the provided reply-channel attribute.

10.2.7 MBean Exporter

Spring Integration components themselves may be exposed as MBeans when the IntegrationMBeanExporter is configured. To create an instance of the IntegrationMBeanExporter, define a bean and provide a reference to an MBeanServer and a domain name (if desired). The domain can be left out, in which case the default domain is org.springframework.integration.

<int-jmx:mbean-export id="integrationMBeanExporter"
            default-domain="my.company.domain" server="mbeanServer"/>

<bean id="mbeanServer" class="org.springframework.jmx.support.MBeanServerFactoryBean">
    <property name="locateExistingServerIfPossible" value="true"/>
</bean>
[Important]Important

The MBean exporter is orthogonal to the one provided in Spring core - it registers message channels and message handlers, but not itself. You can expose the exporter itself, and certain other components in Spring Integration, using the standard <context:mbean-export/> tag. The exporter has a some metrics attached to it, for instance a count of the number of active handlers and the number of queued messages.

It also has a useful operation, as discussed in the section called “Orderly Shutdown Managed Operation”.

Starting with Spring Integration 4.0 the @EnableIntegrationMBeanExport annotation has been introduced for convenient configuration of a default (integrationMbeanExporter) bean of type IntegrationMBeanExporter with several useful options at the @Configuration class level. For example:

@Configuration
@EnableIntegration
@EnableIntegrationMBeanExport(server = "mbeanServer", managedComponents = "input")
public class ContextConfiguration {

	@Bean
	public MBeanServerFactoryBean mbeanServer() {
		return new MBeanServerFactoryBean();
	}
}

If there is a need to provide more options, or have several IntegrationMBeanExporter beans e.g. for different MBean Servers, or to avoid conflicts with the standard Spring MBeanExporter (e.g. via @EnableMBeanExport), you can simply configure an IntegrationMBeanExporter as a generic bean.

MBean ObjectNames

All the MessageChannel, MessageHandler and MessageSource instances in the application are wrapped by the MBean exporter to provide management and monitoring features. The generated JMX object names for each component type are listed in the table below:

Table 10.3. MBean ObjectNames

Component TypeObjectName

MessageChannel

o.s.i:type=MessageChannel,name=<channelName>

MessageSource

o.s.i:type=MessageSource,name=<channelName>,bean=<source>

MessageHandler

o.s.i:type=MessageSource,name=<channelName>,bean=<source>

The bean attribute in the object names for sources and handlers takes one of the values in the table below:

Table 10.4. bean ObjectName Part

Bean ValueDescription

endpoint

The bean name of the enclosing endpoint (e.g. <service-activator>) if there is one

anonymous

An indication that the enclosing endpoint didn’t have a user-specified bean name, so the JMX name is the input channel name

internal

For well-known Spring Integration default components

handler/source

None of the above: fallback to the toString() of the object being monitored (handler or source)


Custom elements can be appended to the object name by providing a reference to a Properties object in the object-name-static-properties attribute.

Also, since Spring Integration 3.0, you can use a custom ObjectNamingStrategy using the object-naming-strategy attribute. This permits greater control over the naming of the MBeans. For example, to group all Integration MBeans under an Integration type. A simple custom naming strategy implementation might be:

public class Namer implements ObjectNamingStrategy {

	private final ObjectNamingStrategy realNamer = new KeyNamingStrategy();
	@Override
	public ObjectName getObjectName(Object managedBean, String beanKey) throws MalformedObjectNameException {
		String actualBeanKey = beanKey.replace("type=", "type=Integration,componentType=");
		return realNamer.getObjectName(managedBean, actualBeanKey);
	}

}

The beanKey argument is a String containing the standard object name beginning with the default-domain and including any additional static properties. This example simply moves the standard type part to componentType and sets the type to Integration, enabling selection of all Integration MBeans in one query:"my.domain:type=Integration,*. This also groups the beans under one tree entry under the domain in tools like VisualVM.

[Note]Note

The default naming strategy is a MetadataNamingStrategy. The exporter propagates the default-domain to that object to allow it to generate a fallback object name if parsing of the bean key fails. If your custom naming strategy is a MetadataNamingStrategy (or subclass), the exporter will not propagate the default-domain; you will need to configure it on your strategy bean.

JMX Improvements

Version 4.2 introduced some important improvements, representing a fairly major overhaul to the JMX support in the framework. These resulted in a significant performance improvement of the JMX statistics collection and much more control thereof, but has some implications for user code in a few specific (uncommon) situations. These changes are detailed below, with a caution where necessary.

  • Metrics Capture

Previously, MessageSource, MessageChannel and MessageHandler metrics were captured by wrapping the object in a JDK dynamic proxy to intercept appropriate method calls and capture the statistics. The proxy was added when an integration MBean exporter was declared in the context.

Now, the statistics are captured by the beans themselves; see Section 10.1, “Metrics and Management” for more information.

[Warning]Warning

This change means that you no longer automatically get an MBean or statistics for custom MessageHandler implementations, unless those custom handlers extend AbstractMessageHandler. The simplest way to resolve this is to extend AbstractMessageHandler. If that’s not possible, or desired, another work-around is to implement the MessageHandlerMetrics interface. For convenience, a DefaultMessageHandlerMetrics is provided to capture and report statistics. Invoke the beforeHandle and afterHandle at the appropriate times. Your MessageHandlerMetrics methods can then delegate to this object to obtain each statistic. Similarly, MessageSource implementations must extend AbstractMessageSource or implement MessageSourceMetrics. Message sources only capture a count so there is no provided convenience class; simply maintain the count in an AtomicLong field.

The removal of the proxy has two additional benefits; 1) stack traces in exceptions are reduced (when JMX is enabled) because the proxy is not on the stack; 2) cases where 2 MBeans were exported for the same bean now only export a single MBean with consolidated attributes/operations (see the MBean consolidation bullet below).

  • Resolution

System.nanoTime() is now used to capture times instead of System.currentTimeMillis(). This may provide more accuracy on some JVMs, espcially when durations of less than 1 millisecond are expected

  • Setting Initial Statistics Collection State

Previously, when JMX was enabled, all sources, channels, handlers captured statistics. It is now possible to control whether the statisics are enabled on an individual component. Further, it is possible to capture simple counts on MessageChannel s and MessageHandler s instead of the complete time-based statistics. This can have significant performance implications because you can selectively configure where you need detailed statistics, as well as enable/disable at runtime.

See Section 10.1, “Metrics and Management”.

  • @IntegrationManagedResource

Similar to the @ManagedResource annotation, the @IntegrationManagedResource marks a class as eligible to be exported as an MBean; however, it will only be exported if there is an IntegrationMBeanExporter in the application context.

Certain Spring Integration classes (in the org.springframework.integration) package) that were previously annotated with`@ManagedResource` are now annotated with both @ManagedResource and @IntegrationManagedResource. This is for backwards compatibility (see the next bullet). Such MBeans will be exported by any context MBeanServeror an IntegrationMBeanExporter (but not both - if both exporters are present, the bean is exported by the integration exporter if the bean matches a managed-components pattern).

  • Consolidated MBeans

Certain classes within the framework (mapping routers for example) have additional attributes/operations over and above those provided by metrics and Lifecycle. We will use a Router as an example here.

Previously, beans of these types were exported as two distinct MBeans:

1) the metrics MBean (with an objectName such as: intDomain:type=MessageHandler,name=myRouter,bean=endpoint). This MBean had metrics attributes and metrics/Lifecycle operations.

2) a second MBean (with an objectName such as: ctxDomain:name=org.springframework.integration.config.RouterFactoryBean#0 ,type=MethodInvokingRouter) was exported with the channel mappings attribute and operations.

Now, the attributes and operations are consolidated into a single MBean. The objectName will depend on the exporter. If exported by the integration MBean exporter, the objectName will be, for example: intDomain:type=MessageHandler,name=myRouter,bean=endpoint. If exported by another exporter, the objectName will be, for example: ctxDomain:name=org.springframework.integration.config.RouterFactoryBean#0 ,type=MethodInvokingRouter. There is no difference between these MBeans (aside from the objectName), except that the statistics will not be enabled (the attributes will be 0) by exporters other than the integration exporter; statistics can be enabled at runtime using the JMX operations. When exported by the integration MBean exporter, the initial state can be managed as described above.

[Warning]Warning

If you are currently using the second MBean to change, for example, channel mappings, and you are using the integration MBean exporter, note that the objectName has changed because of the MBean consolidation. There is no change if you are not using the integration MBean exporter.

  • MBean Exporter Bean Name Patterns

Previously, the managed-components patterns were inclusive only. If a bean name matched one of the patterns it would be included. Now, the pattern can be negated by prefixing it with !. i.e. "!foo*, foox" will match all beans that don’t start with foo, except foox. Patterns are evaluated left to right and the first match (positive or negative) wins and no further patterns are applied.

[Warning]Warning

The addition of this syntax to the pattern causes one possible (although perhaps unlikely) problem. If you have a bean "!foo"and you included a pattern "!foo" in your MBean exporter’s managed-components patterns; it will no long match; the pattern will now match all beans not named foo. In this case, you can escape the ! in the pattern with \. The pattern "\!foo" means match a bean named "!foo".

  • IntegrationMBeanExporter changes

The IntegrationMBeanExporter no longer implements SmartLifecycle; this means that start() and stop() operations are no longer available to register/unregister MBeans. The MBeans are now registered during context initialization and unregistered when the context is destroyed.

Orderly Shutdown Managed Operation

The MBean exporter provides a JMX operation to shut down the application in an orderly manner, intended for use before terminating the JVM.

public void stopActiveComponents(long howLong)

Its use and operation are described in Section 10.7, “Orderly Shutdown”.

10.3 Message History

The key benefit of a messaging architecture is loose coupling where participating components do not maintain any awareness about one another. This fact alone makes your application extremely flexible, allowing you to change components without affecting the rest of the flow, change messaging routes,   message consuming styles (polling vs event driven), and so on. However, this unassuming style of architecture could prove to be difficult when things go wrong. When debugging, you would probably like to get as much information about the message as you can (its origin, channels it has traversed, etc.)

Message History is one of those patterns that helps by giving you an option to maintain some level of awareness of a message path either for debugging purposes or to maintain an audit trail. Spring integration provides a simple way to configure your message flows to maintain the Message History by adding a header to the Message and updating that header every time a message passes through a tracked component.

10.3.1 Message History Configuration

To enable Message History all you need is to define the message-history element in your configuration.

<int:message-history/>

Now every named component (component that has an id defined) will be tracked. The framework will set the history header in your Message. Its value is very simple - List<Properties>.

<int:gateway id="sampleGateway" 
    service-interface="org.springframework.integration.history.sample.SampleGateway"
    default-request-channel="bridgeInChannel"/>

<int:chain id="sampleChain" input-channel="chainChannel" output-channel="filterChannel">
  <int:header-enricher>
    <int:header name="baz" value="baz"/>
  </int:header-enricher>
</int:chain>

The above configuration will produce a very simple Message History structure:

[{name=sampleGateway, type=gateway, timestamp=1283281668091},
 {name=sampleChain, type=chain, timestamp=1283281668094}]

To get access to Message History all you need is access the MessageHistory header. For example:

Iterator<Properties> historyIterator =
    message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));

You might not want to track all of the components. To limit the history to certain components based on their names, all you need is provide the tracked-components attribute and specify a comma-delimited list of component names and/or patterns that match the components you want to track.

<int:message-history tracked-components="*Gateway, sample*, foo"/>

In the above example, Message History will only be maintained for all of the components that end with Gateway, start with sample, or match the name foo exactly.

Starting with version 4.0, you can also use the @EnableMessageHistory annotation in a @Configuration class. In addition, the MessageHistoryConfigurer bean is now exposed as a JMX MBean by the IntegrationMBeanExporter (see Section 10.2.7, “MBean Exporter”), allowing the patterns to be changed at runtime. Note, however, that the bean must be stopped (turning off message history) in order to change the patterns. This feature might be useful to temporarily turn on history to analyze a system. The MBean’s object name is "<domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer".

[Important]Important

If multiple beans (declared by @EnableMessageHistory and/or <message-history/>) they all must have identical component name patterns (when trimmed and sorted). Do not use a generic <bean/> definition for the MessageHistoryConfigurer.

[Note]Note

Remember that by definition the Message History header is immutable (you can’t re-write history, although some try). Therefore, when writing Message History values, the components are either creating brand new Messages (when the component is an origin), or they are copying the history from a request Message, modifying it and setting the new list on a reply Message. In either case, the values can be appended even if the Message itself is crossing thread boundaries. That means that the history values can greatly simplify debugging in an asynchronous message flow.

10.4 Message Store

Enterprise Integration Patterns (EIP) identifies several patterns that have the capability to buffer messages. For example, an Aggregator buffers messages until they can be released and a QueueChannel buffers messages until consumers explicitly receive those messages from that channel. Because of the failures that can occur at any point within your message flow, EIP components that buffer messages also introduce a point where messages could be lost.

To mitigate the risk of losing Messages, EIP defines the Message Store pattern which allows EIP components to store Messages typically in some type of persistent store (e.g. RDBMS).

Spring Integration provides support for the Message Store pattern by a) defining a org.springframework.integration.store.MessageStore strategy interface, b) providing several implementations of this interface, and c) exposing a message-store attribute on all components that have the capability to buffer messages so that you can inject any instance that implements the MessageStore interface.

Details on how to configure a specific Message Store implementation and/or how to inject a MessageStore implementation into a specific buffering component are described throughout the manual (see the specific component, such as QueueChannel, Aggregator, Delayer etc.), but here are a couple of samples to give you an idea:

QueueChannel

<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
<int:channel>

Aggregator

<int:aggregator  message-store="refToMessageStore"/>

By default Messages are stored in-memory using org.springframework.integration.store.SimpleMessageStore, an implementation of MessageStore. That might be fine for development or simple low-volume environments where the potential loss of non-persistent messages is not a concern. However, the typical production application will need a more robust option, not only to mitigate the risk of message loss but also to avoid potential out-of-memory errors. Therefore, we also provide MessageStore implementations for a variety of data-stores. Below is a complete list of supported implementations:

[Important]Important

However be aware of some limitations while using persistent implementations of the MessageStore.

The Message data (payload and headers) is serialized and deserialized using different serialization strategies depending on the implementation of the MessageStore. For example, when using JdbcMessageStore, only Serializable data is persisted by default. In this case non-Serializable headers are removed before serialization occurs. Also be aware of the protocol specific headers that are injected by transport adapters (e.g., FTP, HTTP, JMS etc.). For example, <http:inbound-channel-adapter/> maps HTTP-headers into Message Headers and one of them is an ArrayList of non-Serializable org.springframework.http.MediaType instances. However you are able to inject your own implementation of the Serializer and/or Deserializer strategy interfaces into some MessageStore implementations (such as JdbcMessageStore) to change the behaviour of serialization and deserialization.

Special attention must be paid to the headers that represent certain types of data. For example, if one of the headers contains an instance of some Spring Bean, upon deserialization you may end up with a different instance of that bean, which directly affects some of the implicit headers created by the framework (e.g., REPLY_CHANNEL or ERROR_CHANNEL). Currently they are not serializable, but even if they were, the deserialized channel would not represent the expected instance.

Beginning with Spring Integration version 3.0, this issue can be resolved with a header enricher, configured to replace these headers with a name after registering the channel with the HeaderChannelRegistry.

Also when configuring a message-flow like this: gateway → queue-channel (backed by a persistent Message Store) → service-activator That gateway creates a Temporary Reply Channel, and it will be lost by the time the service-activator’s poller reads from the queue. Again, you can use the header enricher to replace the headers with a String representation.

For more information, refer to the Section 7.2.2, “Header Enricher”.

Spring Integration 4.0 introduced two new interfaces ChannelMessageStore - to implement operations specific for QueueChannel s, PriorityCapableChannelMessageStore - to mark MessageStore implementation to be used for PriorityChannel s and to provide priority order for persisted Messages. The real behaviour depends on implementation. The Framework provides these implementations, which can be used as a persistent MessageStore for QueueChannel and PriorityChannel:

[Warning]Caution with SimpleMessageStore

Starting with version 4.1, the SimpleMessageStore no longer copies the message group when calling getMessageGroup(). For large message groups, this was a significant performance problem. 4.0.1 introduced a boolean copyOnGet allowing this to be controlled. When used internally by the aggregator, this was set to false to improve performance. It is now false by default.

Users accessing the group store outside of components such as aggregators, will now get a direct reference to the group being used by the aggregator, instead of a copy. Manipulation of the group outside of the aggregator may cause unpredictable results.

For this reason, users should not perform such manipulation, or set the copyOnGet property to true.

10.4.1 MessageGroupFactory

Starting with version 4.3, some MessageGroupStore implementations can be injected with a custom MessageGroupFactory strategy to create/customize the MessageGroup instances used by the MessageGroupStore. This defaults to a SimpleMessageGroupFactory which produces SimpleMessageGroup s based on the GroupType.HASH_SET (LinkedHashSet) internal collection. Other possible options are SYNCHRONISED_SET and BLOCKING_QUEUE, where the last one can be used to reinstate the previous SimpleMessageGroup behavior. Also the PERSISTENT option is available. See the next section for more information. Starting with _version 5.0.1, the LIST option is also available for use-cases when the order and uniqueness of messages in the group doesn’t matter.

10.4.2 Persistence MessageGroupStore and Lazy-Load

Starting with version 4.3, all persistence MessageGroupStore s retrieve MessageGroup s and their messages from the store with the Lazy-Load manner. In most cases it is useful for the Correlation MessageHandler s (Section 6.4, “Aggregator” and Section 6.5, “Resequencer”), when it would be an overhead to load entire MessageGroup from the store on each correlation operation.

To switch off the lazy-load behavior the AbstractMessageGroupStore.setLazyLoadMessageGroups(false) option can be used from the configuration.

Our performance tests for lazy-load on MongoDB MessageStore (Section 23.3, “MongoDB Message Store”) and <aggregator> (Section 6.4, “Aggregator”) with custom release-strategy like:

<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>

demonstrate this results for 1000 simple messages:

StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager

10.5 Metadata Store

Many external systems, services or resources aren’t transactional (Twitter, RSS, file system etc.) and there is no any ability to mark the data as read. Or there is just need to implement the Enterprise Integration Pattern Idempotent Receiver in some integration solutions. To achieve this goal and store some previous state of the Endpoint before the next interaction with external system, or deal with the next Message, Spring Integration provides the Metadata Store component being an implementation of the org.springframework.integration.metadata.MetadataStore interface with a general key-value contract.

The Metadata Store is designed to store various types of generic meta-data (e.g., published date of the last feed entry that has been processed) to help components such as the Feed adapter deal with duplicates. If a component is not directly provided with a reference to a MetadataStore, the algorithm for locating a metadata store is as follows: First, look for a bean with id metadataStore in the ApplicationContext. If one is found then it will be used, otherwise it will create a new instance of SimpleMetadataStore which is an in-memory implementation that will only persist metadata within the lifecycle of the currently running Application Context. This means that upon restart you may end up with duplicate entries.

If you need to persist metadata between Application Context restarts, these persistent MetadataStores are provided by the framework:

The PropertiesPersistingMetadataStore is backed by a properties file and a PropertiesPersister.

By default, it only persists the state when the application context is closed normally. It implements Flushable so you can persist the state at will, be invoking flush().

<bean id="metadataStore"
    class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>

Alternatively, you can provide your own implementation of the MetadataStore interface (e.g. JdbcMetadataStore) and configure it as a bean in the Application Context.

Starting with version 4.0, SimpleMetadataStore, PropertiesPersistingMetadataStore and RedisMetadataStore implement ConcurrentMetadataStore. These provide for atomic updates and can be used across multiple component or application instances.

10.5.1 Idempotent Receiver and Metadata Store

The Metadata Store is useful for implementing the EIP Idempotent Receiver pattern, when there is need to filter an incoming Message if it has already been processed, and just discard it or perform some other logic on discarding. The following configuration is an example of how to do this:

<int:filter input-channel="serviceChannel"
			output-channel="idempotentServiceChannel"
			discard-channel="discardChannel"
			expression="@metadataStore.get(headers.businessKey) == null"/>

<int:publish-subscribe-channel id="idempotentServiceChannel"/>

<int:outbound-channel-adapter channel="idempotentServiceChannel"
                              expression="@metadataStore.put(headers.businessKey, '')"/>

<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>

The value of the idempotent entry may be some expiration date, after which that entry should be removed from Metadata Store by some scheduled reaper.

Also see Section 8.9.11, “Idempotent Receiver Enterprise Integration Pattern”.

10.5.2 MetadataStoreListener

Some metadata stores (currently only zookeeper) support registering a listener to receive events when items change.

public interface MetadataStoreListener {

	void onAdd(String key, String value);

	void onRemove(String key, String oldValue);

	void onUpdate(String key, String newValue);
}

See the javadocs for more information. The MetadataStoreListenerAdapter can be subclassed if you are only interested in a subset of events.

10.6 Control Bus

As described in (EIP), the idea behind the Control Bus is that the same messaging system can be used for monitoring and managing the components within the framework as is used for "application-level" messaging. In Spring Integration we build upon the adapters described above so that it’s possible to send Messages as a means of invoking exposed operations.

<int:control-bus input-channel="operationChannel"/>

The Control Bus has an input channel that can be accessed for invoking operations on the beans in the application context. It also has all the common properties of a service activating endpoint, e.g. you can specify an output channel if the result of the operation has a return value that you want to send on to a downstream channel.

The Control Bus executes messages on the input channel as Spring Expression Language expressions. It takes a message, compiles the body to an expression, adds some context, and then executes it. The default context supports any method that has been annotated with @ManagedAttribute or @ManagedOperation. It also supports the methods on Spring’s Lifecycle interface, and it supports methods that are used to configure several of Spring’s TaskExecutor and TaskScheduler implementations. The simplest way to ensure that your own methods are available to the Control Bus is to use the @ManagedAttribute and/or @ManagedOperation annotations. Since those are also used for exposing methods to a JMX MBean registry, it’s a convenient by-product (often the same types of operations you want to expose to the Control Bus would be reasonable for exposing via JMX). Resolution of any particular instance within the application context is achieved in the typical SpEL syntax. Simply provide the bean name with the SpEL prefix for beans (@). For example, to execute a method on a Spring Bean a client could send a message to the operation channel as follows:

Message operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation)

The root of the context for the expression is the Message itself, so you also have access to the payload and headers as variables within your expression. This is consistent with all the other expression support in Spring Integration endpoints.

With Java and Annotations the Control Bus can be configured as follows:

@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
    return new ExpressionControlBusFactoryBean();
}

Or, when using Java DSL flow definitions:

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from("controlBus")
              .controlBus()
              .get();
}

Or, if you prefer Lambda style with automatic DirectChannel creation:

@Bean
public IntegrationFlow controlBus() {
    return IntegrationFlowDefinition::controlBus;
}

In this case, the channel is named controlBus.input.

10.7 Orderly Shutdown

As described in Section 10.2.7, “MBean Exporter”, the MBean exporter provides a JMX operation stopActiveComponents, which is used to stop the application in an orderly manner. The operation has a single long parameter. The parameter indicates how long (in milliseconds) the operation will wait to allow in-flight messages to complete. The operation works as follows:

The first step calls beforeShutdown() on all beans that implement OrderlyShutdownCapable. This allows such components to prepare for shutdown. Examples of components that implement this interface, and what they do with this call include: JMS and AMQP message-driven adapters stop their listener containers; TCP server connection factories stop accepting new connections (while keeping existing connections open); TCP inbound endpoints drop (log) any new messages received; http inbound endpoints return 503 - Service Unavailable for any new requests.

The second step stops any active channels, such as JMS- or AMQP-backed channels.

The third step stops all MessageSource s.

The fourth step stops all inbound MessageProducer s (that are not OrderlyShutdownCapable).

The fifth step waits for any remaining time left, as defined by the value of the long parameter passed in to the operation. This is intended to allow any in-flight messages to complete their journeys. It is therefore important to select an appropriate timeout when invoking this operation.

The sixth step calls afterShutdown() on all OrderlyShutdownCapable components. This allows such components to perform final shutdown tasks (closing all open sockets, for example).

As discussed in the section called “Orderly Shutdown Managed Operation” this operation can be invoked using JMX. If you wish to programmatically invoke the method, you will need to inject, or otherwise get a reference to, the IntegrationMBeanExporter. If no id attribute is provided on the <int-jmx:mbean-export/> definition, the bean will have a generated name. This name contains a random component to avoid ObjectName collisions if multiple Spring Integration contexts exist in the same JVM (MBeanServer).

For this reason, if you wish to invoke the method programmatically, it is recommended that you provide the exporter with an id attribute so it can easily be accessed in the application context.

Finally, the operation can be invoked using the <control-bus>; see the monitoring Spring Integration sample application for details.

[Important]Important

The above algorithm was improved in version 4.1. Previously, all task executors and schedulers were stopped. This could cause mid-flow messages in QueueChannel s to remain. Now, the shutdown leaves pollers running in order to allow these messages to be drained and processed.

10.8 Integration Graph

Starting with version 4.3, Spring Integration provides access to an application’s runtime object model which can, optionally, include component metrics. It is exposed as a graph, which may be used to visualize the current state of the integration application. The o.s.i.support.management.graph package contains all the required classes to collect, build and render the runtime state of Spring Integration components as a single tree-like Graph object. The IntegrationGraphServer should be declared as a bean to build, retrieve and refresh the Graph object. The resulting Graph object can be serialized to any format, although JSON is flexible and convenient to parse and represent on the client side. A simple Spring Integration application with only the default components would expose a graph as follows:

{
  "contentDescriptor": {
    "providerVersion": "4.3.0.RELEASE",
    "providerFormatVersion": 1.0,
    "provider": "spring-integration",
    "name": "myApplication"
  },
  "nodes": [
    {
      "nodeId": 1,
      "name": "nullChannel",
      "stats": null,
      "componentType": "channel"
    },
    {
      "nodeId": 2,
      "name": "errorChannel",
      "stats": null,
      "componentType": "publish-subscribe-channel"
    },
    {
      "nodeId": 3,
      "name": "_org.springframework.integration.errorLogger",
      "stats": {
        "duration": {
          "count": 0,
          "min": 0.0,
          "max": 0.0,
          "mean": 0.0,
          "standardDeviation": 0.0,
          "countLong": 0
        },
        "errorCount": 0,
        "standardDeviationDuration": 0.0,
        "countsEnabled": true,
        "statsEnabled": true,
        "loggingEnabled": false,
        "handleCount": 0,
        "meanDuration": 0.0,
        "maxDuration": 0.0,
        "minDuration": 0.0,
        "activeCount": 0
      },
      "componentType": "logging-channel-adapter",
      "output": null,
      "input": "errorChannel"
    }
  ],
  "links": [
    {
      "from": 2,
      "to": 3,
      "type": "input"
    }
  ]
}

As you can see, the graph consists of three top-level elements.

The contentDescriptor graph element is pretty straightforward and contains general information about the application providing the data. The name can be customized on the IntegrationGraphServer bean or via spring.application.name application context environment property. Other properties are provided by the framework and allows you to distinguish a similar model from other sources.

The links graph element represents connections between nodes from the nodes graph element and, therefore, between integration components in the source Spring Integration application. For example from a MessageChannel to an EventDrivenConsumer with some MessageHandler; or from an AbstractReplyProducingMessageHandler to a MessageChannel. For the convenience and to allow to determine a link purpose, the model is supplied with the type attribute. The possible types are:

  • input - identify the direction from MessageChannel to the endpoint; inputChannel or requestChannel property;
  • output - the direction from MessageHandler, MessageProducer or SourcePollingChannelAdapter to the MessageChannel via an outputChannel or replyChannel property;
  • error - from MessageHandler on PollingConsumer or MessageProducer or SourcePollingChannelAdapter to the MessageChannel via an errorChannel property;
  • discard - from DiscardingMessageHandler (e.g. MessageFilter) to the MessageChannel via errorChannel property.
  • route - from AbstractMappingMessageRouter (e.g. HeaderValueRouter) to the MessageChannel. Similar to output but determined at run-time. May be a configured channel mapping, or a dynamically resolved channel. Routers will typically only retain up to 100 dynamic routes for this purpose, but this can be modified using the dynamicChannelLimit property.

The information from this element can be used by a visualizing tool to render connections between nodes from the nodes graph element, where the from and to numbers represent the value from the nodeId property of the linked nodes. For example the link type can be used to determine the proper port on the target node:

              +---(discard)
              |
         +----o----+
         |         |
         |         |
         |         |
(input)--o         o---(output)
         |         |
         |         |
         |         |
         +----o----+
              |
              +---(error)

The nodes graph element is perhaps the most interesting because its elements contain not only the runtime components with their componentType s and name s, but can also optionally contain metrics exposed by the component. Node elements contain various properties which are generally self-explanatory. For example, expression-based components include the expression property containing the primary expression string for the component. To enable the metrics, add an @EnableIntegrationManagement to some @Configuration class or add an <int:management/> element to your XML configuration. You can control exactly which components in the framework collect statistics. See Section 10.1, “Metrics and Management” for complete information. See the stats attribute from the _org.springframework.integration.errorLogger component in the JSON example above. The nullChannel and errorChannel don’t provide statistics information in this case, because the configuration for this example was:

@Configuration
@EnableIntegration
@EnableIntegrationManagement(statsEnabled = "_org.springframework.integration.errorLogger.handler",
      countsEnabled = "!*",
      defaultLoggingEnabled = "false")
public class ManagementConfiguration {

    @Bean
    public IntegrationGraphServer integrationGraphServer() {
        return new IntegrationGraphServer();
    }

}

The nodeId represents a unique incremental identifier to distinguish one component from another. It is also used in the links element to represent a relationship (connection) of this component to others, if any. The input and output attributes are for the inputChannel and outputChannel properties of the AbstractEndpoint, MessageHandler, SourcePollingChannelAdapter or MessageProducerSupport. See the next paragraph for more information.

10.8.1 Graph Runtime Model

Spring Integration components have various levels of complexity. For example, any polled MessageSource also has a SourcePollingChannelAdapter and a MessageChannel to which to send messages from the source data periodically. Other components might be middleware request-reply components, e.g. JmsOutboundGateway, with a consuming AbstractEndpoint to subscribe to (or poll) the requestChannel (input) for messages, and a replyChannel (output) to produce a reply message to send downstream. Meanwhile, any MessageProducerSupport implementation (e.g. ApplicationEventListeningMessageProducer) simply wraps some source protocol listening logic and sends messages to the outputChannel.

Within the graph, Spring Integration components are represented using the IntegrationNode class hierarchy, which you can find in the o.s.i.support.management.graph package. For example the ErrorCapableDiscardingMessageHandlerNode could be used for the AggregatingMessageHandler (because it has a discardChannel option) and can produce errors when consuming from a PollableChannel using a PollingConsumer. Another sample is CompositeMessageHandlerNode - for a MessageHandlerChain when subscribed to a SubscribableChannel, using an EventDrivenConsumer.

[Note]Note

The @MessagingGateway (see Section 8.4, “Messaging Gateways”) provides nodes for each its method, where the name attribute is based on the gateway’s bean name and the short method signature. For example the gateway:

@MessagingGateway(defaultRequestChannel = "four")
public interface Gate {

	void foo(String foo);

	void foo(Integer foo);

	void bar(String bar);

}

produces nodes like:

{
  "nodeId" : 10,
  "name" : "gate.bar(class java.lang.String)",
  "stats" : null,
  "componentType" : "gateway",
  "output" : "four",
  "errors" : null
},
{
  "nodeId" : 11,
  "name" : "gate.foo(class java.lang.String)",
  "stats" : null,
  "componentType" : "gateway",
  "output" : "four",
  "errors" : null
},
{
  "nodeId" : 12,
  "name" : "gate.foo(class java.lang.Integer)",
  "stats" : null,
  "componentType" : "gateway",
  "output" : "four",
  "errors" : null
}

This IntegrationNode hierarchy can be used for parsing the graph model on the client side, as well as for the understanding the general Spring Integration runtime behavior. See also Section 3.8, “Programming Tips and Tricks” for more information.

10.9 Integration Graph Controller

If your application is WEB-based (or built on top of Spring Boot using an embedded web container) and the Spring Integration HTTP or WebFlux module (see Chapter 18, HTTP Support and Chapter 34, WebFlux Support) is present on the classpath, you can use a IntegrationGraphController to expose the IntegrationGraphServer functionality as a REST service. For this purpose, the @EnableIntegrationGraphController @Configuration class annotation and the <int-http:graph-controller/> XML element, are available in the HTTP module. Together with the @EnableWebMvc annotation (or <mvc:annotation-driven/> for xml definitions), this configuration registers an IntegrationGraphController @RestController where its @RequestMapping.path can be configured on the @EnableIntegrationGraphController annotation or <int-http:graph-controller/> element. The default path is /integration.

The IntegrationGraphController @RestController provides these services:

  • @GetMapping(name = "getGraph") - to retrieve the state of the Spring Integration components since the last IntegrationGraphServer refresh. The o.s.i.support.management.graph.Graph is returned as a @ResponseBody of the REST service;
  • @GetMapping(path = "/refresh", name = "refreshGraph") - to refresh the current Graph for the actual runtime state and return it as a REST response. It is not necessary to refresh the graph for metrics, they are provided in real-time when the graph is retrieved. Refresh can be called if the application context has been modified since the graph was last retrieved and the graph is completely rebuilt.

Any Security and Cross Origin restrictions for the IntegrationGraphController can be achieved with the standard configuration options and components provided by Spring Security and Spring MVC projects. A simple example of that follows:

<mvc:annotation-driven />

<mvc:cors>
	<mvc:mapping path="/myIntegration/**"
				 allowed-origins="http://localhost:9090"
				 allowed-methods="GET" />
</mvc:cors>

<security:http>
    <security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" />
</security:http>


<int-http:graph-controller path="/myIntegration" />

The Java & Annotation Configuration variant follows; note that, for convenience, the annotation provides an allowedOrigins attribute; this just provides GET access to the path. For more sophistication, you can configure the CORS mappings using standard Spring MVC mechanisms.

@Configuration
@EnableWebMvc // or @EnableWebFlux
@EnableWebSecurity // or @EnableWebFluxSecurity
@EnableIntegration
@EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090")
public class IntegrationConfiguration extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(HttpSecurity http) throws Exception {
	    http
            .authorizeRequests()
               .antMatchers("/testIntegration/**").hasRole("ADMIN")
            // ...
            .formLogin();
    }

    //...

}