Note | |
---|---|
Prior to version 4.2 metrics were only available when JMX was enabled. See Section 9.2, “JMX Support”. |
To enable MessageSource
, MessageChannel
and MessageHandler
metrics, add an <int:management/>
bean to the
application context, or annotate one of your @Configuration
classes with @EnableIntegrationManagement
.
MessageSource
s only maintain counts, MessageChannel
s and MessageHandler
s maintain duration statistics in
addition to counts.
See Section 9.1.2, “MessageChannel Metric Features” and Section 9.1.3, “MessageHandler Metric Features” below.
This causes the automatic registration of the IntegrationManagementConfigurer
bean in the application context.
Only one such bean can exist in the context and it must have the bean name integrationManagementConfigurer
if registered manually via a <bean/>
definition.
This bean applies it’s configuration to beans after all beans in the context have been instantiated.
In addition to metrics, you can control debug logging in the main message flow.
It has been found that in very high volume applications, even calls to isDebugEnabled()
can be quite expensive with
some logging subsystems.
You can disable all such logging to avoid this overhead; exception logging (debug or otherwise) are not affected
by this setting.
A number of options are available:
<int:management default-logging-enabled="true" default-counts-enabled="false" default-stats-enabled="false" counts-enabled-patterns="foo, !baz, ba*" stats-enabled-patterns="fiz, buz" metrics-factory="myMetricsFactory" />
@Configuration @EnableIntegration @EnableIntegrationManagement( defaultLoggingEnabled = "true", defaultCountsEnabled = "false", defaultStatsEnabled = "false", countsEnabled = { "foo", "${count.patterns}" }, statsEnabled = { "qux", "!*" }, MetricsFactory = "myMetricsFactory") public static class ContextConfiguration { ... }
Set to | |
Enable or disable count metrics for components not matching one of the patterns in <4>.
Only applied if you have not explicitly configured the setting in a bean definition.
Default | |
Enable or disable statistical metrics for components not matching one of the patterns in <5>. Only applied if you have not explicitly configured the setting in a bean definition. Default false. | |
A comma-delimited list of patterns for beans for which counts should be enabled; negate the pattern with | |
A comma-delimited list of patterns for beans for which statistical metrics should be enabled; negate the pattern
with | |
A reference to a |
At runtime, counts and statistics can be obtained by calling IntegrationManagementConfigurer
getChannelMetrics
,
getHandlerMetrics
and getSourceMetrics
, returning MessageChannelMetrics
, MessageHandlerMetrics
and
MessageSourceMetrics
respectively.
See the javadocs for complete information about these classes.
When JMX is enabled (see Section 9.2, “JMX Support”), these metrics are also exposed by the IntegrationMBeanExporter
.
Important | |
---|---|
|
Message channels report metrics according to their concrete type.
If you are looking at a DirectChannel
, you will see statistics for the send operation.
If it is a QueueChannel
, you will also see statistics for the receive operation, as well as the count of messages that are currently buffered by this QueueChannel
.
In both cases there are some metrics that are simple counters (message count and error count), and some that are estimates of averages of interesting quantities.
The algorithms used to calculate these estimates are described briefly in the section below.
Table 9.1. MessageChannel Metrics
Metric Type | Example | Algorithm |
---|---|---|
Count | Send Count | Simple incrementer. Increases by one when an event occurs. |
Error Count | Send Error Count | Simple incrementer. Increases by one when an send results in an error. |
Duration | Send Duration (method execution time in milliseconds) | Exponential Moving Average with decay factor (10 by default). Average of the method execution time over roughly the last 10 (default) measurements. |
Rate | Send Rate (number of operations per second) | Inverse of Exponential Moving Average of the interval between events with decay in time (lapsing over 60 seconds by default) and per measurement (last 10 events by default). |
Error Rate | Send Error Rate (number of errors per second) | Inverse of Exponential Moving Average of the interval between error events with decay in time (lapsing over 60 seconds by default) and per measurement (last 10 events by default). |
Ratio | Send Success Ratio (ratio of successful to total sends) | Estimate the success ratio as the Exponential Moving Average of the series composed of values 1 for success and 0 for failure (decaying as per the rate measurement over time and events by default). Error ratio is 1 - success ratio. |
The following table shows the statistics maintained for message handlers. Some metrics are simple counters (message count and error count), and one is an estimate of averages of send duration. The algorithms used to calculate these estimates are described briefly in the table below:
Table 9.2. MessageHandlerMetrics
Metric Type | Example | Algorithm |
---|---|---|
Count | Handle Count | Simple incrementer. Increases by one when an event occurs. |
Error Count | Handler Error Count | Simple incrementer. Increases by one when an invocation results in an error. |
Active Count | Handler Active Count | Indicates the number of currently active threads currently invoking the handler (or any downstream synchronous flow). |
Duration | Handle Duration (method execution time in milliseconds) | Exponential Moving Average with decay factor (10 by default). Average of the method execution time over roughly the last 10 (default) measurements. |
A feature of the time-based average estimates is that they decay with time if no new measurements arrive. To help interpret the behaviour over time, the time (in seconds) since the last measurement is also exposed as a metric.
There are two basic exponential models: decay per measurement (appropriate for duration and anything where the number of measurements is part of the metric), and decay per time unit (more suitable for rate measurements where the time in between measurements is part of the metric). Both models depend on the fact that
S(n) = sum(i=0,i=n) w(i) x(i)
has a special form when w(i) = r^i
, with r=constant
:
S(n) = x(n) + r S(n-1)
(so you only have to store S(n-1)
, not the whole series x(i)
, to generate a new metric estimate from the last measurement).
The algorithms used in the duration metrics use r=exp(-1/M)
with M=10
.
The net effect is that the estimate S(n)
is more heavily weighted to recent measurements and is composed roughly of the last M
measurements.
So M
is the "window" or lapse rate of the estimate In the case of the vanilla moving average, i
is a counter over the number of measurements.
In the case of the rate we interpret i
as the elapsed time, or a combination of elapsed time and a counter (so the metric estimate contains contributions roughly from the last M
measurements and the last T
seconds).
A new strategy interface MetricsFactory
has been introduced allowing you to provide custom channel metrics for your
MessageChannel
s and MessageHandler
s.
By default, a DefaultMetricsFactory
provides default implementation of MessageChannelMetrics
and
MessageHandlerMetrics
which are described in the next bullet.
To override the default MetricsFactory
configure it as described above, by providing a reference to your
MetricsFactory
bean instance.
You can either customize the default implementations as described in the next bullet, or provide completely different
implementations by extending AbstractMessageChannelMetrics
and/or AbstractMessageHandlerMetrics
.
In addition to the default metrics factory described above, the framework provides the AggregatingMetricsFactory
.
This factory creates AggregatingMessageChannelMetrics
and AggregatingMessageHandlerMetrics
.
In very high volume scenarios, the cost of capturing statistics can be prohibitive (2 calls to the system time and
storing the data for each message).
The aggregating metrics aggregate the response time over a sample of messages.
This can save significant CPU time.
Caution | |
---|---|
The statistics will be skewed if messages arrive in bursts. These metrics are intended for use with high, constant-volume, message rates. |
<bean id="aggregatingMetricsFactory" class="org.springframework.integration.support.management.AggregatingMetricsFactory"> <constructor-arg value="1000" /> <!-- sample size --> </bean>
The above configuration aggregates the duration over 1000 messages. Counts (send, error) are maintained per-message but the statistics are per 1000 messages.
See Section 9.1.4, “Time-Based Average Estimates” and the Javadocs for the ExponentialMovingAverage*
classes for more information about these
values.
By default, the DefaultMessageChannelMetrics
and DefaultMessageHandlerMetrics
use a window
of 10 measurements,
a rate period of 1 second (rate per second) and a decay lapse period of 1 minute.
If you wish to override these defaults, you can provide a custom MetricsFactory
that returns appropriately configured
metrics and provide a reference to it to the MBean exporter as described above.
Example:
public static class CustomMetrics implements MetricsFactory { @Override public AbstractMessageChannelMetrics createChannelMetrics(String name) { return new DefaultMessageChannelMetrics(name, new ExponentialMovingAverage(20, 1000000.), new ExponentialMovingAverageRate(2000, 120000, 30, true), new ExponentialMovingAverageRatio(130000, 40, true), new ExponentialMovingAverageRate(3000, 140000, 50, true)); } @Override public AbstractMessageHandlerMetrics createHandlerMetrics(String name) { return new DefaultMessageHandlerMetrics(name, new ExponentialMovingAverage(20, 1000000.)); } }
The customizations described above are wholesale and will apply to all appropriate beans exported by the MBean exporter. This is the extent of customization available using XML configuration.
Individual beans can be provided with different implementations using java @Configuration
or programmatically at
runtime, after the application context has been refreshed, by invoking the configureMetrics
methods on
AbstractMessageChannel
and AbstractMessageHandler
.
Previously, the time-based metrics (see Section 9.1.4, “Time-Based Average Estimates”) were calculated in real time.
The statistics are now calculated when retrieved instead.
This resulted in a significant performance improvement, at the expense of a small amount of additional memory for each statistic.
As discussed in the bullet above, the statistics can be disabled altogether, while retaining the MBean allowing the invocation of Lifecycle
methods.
Spring Integration provides Channel Adapters for receiving and publishing JMX Notifications. There is also an_Inbound Channel Adapter_ for polling JMX MBean attribute values, and an Outbound Channel Adapter for invoking JMX MBean operations.
The Notification-listening Channel Adapter requires a JMX ObjectName for the MBean that publishes notifications to which this listener should be registered. A very simple configuration might look like this:
<int-jmx:notification-listening-channel-adapter id="adapter" channel="channel" object-name="example.domain:name=publisher"/>
Tip | |
---|---|
The notification-listening-channel-adapter registers with an |
The adapter can also accept a reference to a NotificationFilter
and a handback Object to provide some context that is passed back with each Notification.
Both of those attributes are optional.
Extending the above example to include those attributes as well as an explicit MBeanServer
bean name would produce the following:
<int-jmx:notification-listening-channel-adapter id="adapter" channel="channel" mbean-server="someServer" object-name="example.domain:name=somePublisher" notification-filter="notificationFilter" handback="myHandback"/>
The Notification-listening Channel Adapter is event-driven and registered with the MBeanServer
directly.
It does not require any poller configuration.
Note | |
---|---|
For this component only, the object-name attribute can contain an ObjectName pattern (e.g. "org.foo:type=Bar,name=*") and the adapter will receive notifications from all MBeans with ObjectNames that match the pattern. In addition, the object-name attribute can contain a SpEL reference to a <util:list/> of ObjectName patterns: <jmx:notification-listening-channel-adapter id="manyNotificationsAdapter" channel="manyNotificationsChannel" object-name="#{patterns}"/> <util:list id="patterns"> <value>org.foo:type=Foo,name=*</value> <value>org.foo:type=Bar,name=*</value> </util:list> The names of the located MBean(s) will be logged when DEBUG level logging is enabled. |
The Notification-publishing Channel Adapter is relatively simple. It only requires a JMX ObjectName in its configuration as shown below.
<context:mbean-export/> <int-jmx:notification-publishing-channel-adapter id="adapter" channel="channel" object-name="example.domain:name=publisher"/>
It does also require that an MBeanExporter
be present in the context.
That is why the <context:mbean-export/> element is shown above as well.
When Messages are sent to the channel for this adapter, the Notification is created from the Message content. If the payload is a String it will be passed as the message text for the Notification. Any other payload type will be passed as the userData of the Notification.
JMX Notifications also have a type, and it should be a dot-delimited String.
There are two ways to provide the type.
Precedence will always be given to a Message header value associated with the JmxHeaders.NOTIFICATION_TYPE
key.
On the other hand, you can rely on a fallback default-notification-type attribute provided in the configuration.
<context:mbean-export/> <int-jmx:notification-publishing-channel-adapter id="adapter" channel="channel" object-name="example.domain:name=publisher" default-notification-type="some.default.type"/>
The Attribute Polling Channel Adapter is useful when you have a requirement, to periodically check on some value that is available through an MBean as a managed attribute. The poller can be configured in the same way as any other polling adapter in Spring Integration (or it’s possible to rely on the default poller). The object-name and attribute-name are required. An MBeanServer reference is also required, but it will automatically check for a bean named mbeanServer by default, just like the Notification-listening Channel Adapter described above.
<int-jmx:attribute-polling-channel-adapter id="adapter" channel="channel" object-name="example.domain:name=someService" attribute-name="InvocationCount"> <int:poller max-messages-per-poll="1" fixed-rate="5000"/> </int-jmx:attribute-polling-channel-adapter>
The Tree Polling Channel Adapter queries the JMX MBean tree and sends a message with a payload that is the graph of objects that matches the query. By default the MBeans are mapped to primitives and simple Objects like Map, List and arrays - permitting simple transformation, for example, to JSON. An MBeanServer reference is also required, but it will automatically check for a bean named mbeanServer by default, just like the Notification-listening Channel Adapter described above. A basic configuration would be:
<int-jmx:tree-polling-channel-adapter id="adapter" channel="channel" query-name="example.domain:type=*"> <int:poller max-messages-per-poll="1" fixed-rate="5000"/> </int-jmx:tree-polling-channel-adapter>
This will include all attributes on the MBeans selected.
You can filter the attributes by providing an MBeanObjectConverter
that has an appropriate filter configured.
The converter can be provided as a reference to a bean definition using the converter
attribute, or as an inner <bean/> definition.
A DefaultMBeanObjectConverter
is provided which can take a MBeanAttributeFilter
in its constructor argument.
Two standard filters are provided; the NamedFieldsMBeanAttributeFilter
allows you to specify a list of attributes to include and the NotNamedFieldsMBeanAttributeFilter
allows you to specify a list of attributes to exclude.
You can also implement your own filter
The operation-invoking-channel-adapter enables Message-driven invocation of any managed operation exposed by an MBean. Each invocation requires the operation name to be invoked and the ObjectName of the target MBean. Both of these must be explicitly provided via adapter configuration:
<int-jmx:operation-invoking-channel-adapter id="adapter" object-name="example.domain:name=TestBean" operation-name="ping"/>
Then the adapter only needs to be able to discover the mbeanServer bean. If a different bean name is required, then provide the mbean-server attribute with a reference.
The payload of the Message will be mapped to the parameters of the operation, if any. A Map-typed payload with String keys is treated as name/value pairs, whereas a List or array would be passed as a simple argument list (with no explicit parameter names). If the operation requires a single parameter value, then the payload can represent that single value, and if the operation requires no parameters, then the payload would be ignored.
If you want to expose a channel for a single common operation to be invoked by Messages that need not contain headers, then that option works well.
Similar to the operation-invoking-channel-adapter Spring Integration also provides a operation-invoking-outbound-gateway, which could be used when dealing with non-void operations and a return value is required. Such return value will be sent as message payload to the reply-channel specified by this Gateway.
<int-jmx:operation-invoking-outbound-gateway request-channel="requestChannel" reply-channel="replyChannel" object-name="o.s.i.jmx.config:type=TestBean,name=testBeanGateway" operation-name="testWithReturn"/>
If the reply-channel attribute is not provided, the reply message will be sent to the channel that is identified by the IntegrationMessageHeaderAccessor.REPLY_CHANNEL
header.
That header is typically auto-created by the entry point into a message flow, such as any Gateway component.
However, if the message flow was started by manually creating a Spring Integration Message and sending it directly to a Channel, then you must specify the message header explicitly or use the provided reply-channel attribute.
Spring Integration components themselves may be exposed as MBeans when the IntegrationMBeanExporter
is configured.
To create an instance of the IntegrationMBeanExporter
, define a bean and provide a reference to an MBeanServer
and a domain name (if desired).
The domain can be left out, in which case the default domain is org.springframework.integration.
<int-jmx:mbean-export id="integrationMBeanExporter" default-domain="my.company.domain" server="mbeanServer"/> <bean id="mbeanServer" class="org.springframework.jmx.support.MBeanServerFactoryBean"> <property name="locateExistingServerIfPossible" value="true"/> </bean>
Important | |
---|---|
The MBean exporter is orthogonal to the one provided in Spring core - it registers message channels and message handlers, but not itself.
You can expose the exporter itself, and certain other components in Spring Integration, using the standard It also has a useful operation, as discussed in the section called “Orderly Shutdown Managed Operation”. |
Starting with Spring Integration 4.0 the @EnableIntegrationMBeanExport
annotation has been introduced for convenient configuration of a default (integrationMbeanExporter
) bean of type IntegrationMBeanExporter
with several useful options at the @Configuration
class level.
For example:
@Configuration @EnableIntegration @EnableIntegrationMBeanExport(server = "mbeanServer", managedComponents = "input") public class ContextConfiguration { @Bean public MBeanServerFactoryBean mbeanServer() { return new MBeanServerFactoryBean(); } }
If there is a need to provide more options, or have several IntegrationMBeanExporter
beans e.g.
for different MBean Servers, or to avoid conflicts with the standard Spring MBeanExporter
(e.g.
via @EnableMBeanExport
), you can simply configure an IntegrationMBeanExporter
as a generic bean.
All the MessageChannel
, MessageHandler
and MessageSource
instances in the application are wrapped by the MBean exporter to provide management and monitoring features.
The generated JMX object names for each component type are listed in the table below:
Table 9.3. MBean ObjectNames
Component Type | ObjectName |
---|---|
MessageChannel | o.s.i:type=MessageChannel,name=<channelName> |
MessageSource | o.s.i:type=MessageSource,name=<channelName>,bean=<source> |
MessageHandler | o.s.i:type=MessageSource,name=<channelName>,bean=<source> |
The bean attribute in the object names for sources and handlers takes one of the values in the table below:
Table 9.4. bean ObjectName Part
Bean Value | Description |
---|---|
endpoint | The bean name of the enclosing endpoint (e.g. <service-activator>) if there is one |
anonymous | An indication that the enclosing endpoint didn’t have a user-specified bean name, so the JMX name is the input channel name |
internal | For well-known Spring Integration default components |
handler/source | None of the above: fallback to the |
Custom elements can be appended to the object name by providing a reference to a Properties
object in the object-name-static-properties
attribute.
Also, since Spring Integration 3.0, you can use a custom ObjectNamingStrategy using the object-naming-strategy
attribute.
This permits greater control over the naming of the MBeans.
For example, to group all Integration MBeans under an Integration type.
A simple custom naming strategy implementation might be:
public class Namer implements ObjectNamingStrategy { private final ObjectNamingStrategy realNamer = new KeyNamingStrategy(); @Override public ObjectName getObjectName(Object managedBean, String beanKey) throws MalformedObjectNameException { String actualBeanKey = beanKey.replace("type=", "type=Integration,componentType="); return realNamer.getObjectName(managedBean, actualBeanKey); } }
The beanKey
argument is a String containing the standard object name beginning with the default-domain
and including any additional static properties.
This example simply moves the standard type
part to componentType
and sets the type
to Integration, enabling selection of all Integration MBeans in one query:"my.domain:type=Integration,*
.
This also groups the beans under one tree entry under the domain in tools like VisualVM.
Note | |
---|---|
The default naming strategy is a MetadataNamingStrategy.
The exporter propagates the |
Version 4.2 introduced some important improvements, representing a fairly major overhaul to the JMX support in the framework. These resulted in a significant performance improvement of the JMX statistics collection and much more control thereof, but has some implications for user code in a few specific (uncommon) situations. These changes are detailed below, with a caution where necessary.
Previously, MessageSource
, MessageChannel
and MessageHandler
metrics were captured by wrapping the object in a JDK dynamic proxy to intercept appropriate method calls and capture the statistics.
The proxy was added when an integration MBean exporter was declared in the context.
Now, the statistics are captured by the beans themselves; see Section 9.1, “Metrics and Management” for more information.
Warning | |
---|---|
This change means that you no longer automatically get an MBean or statistics for custom |
The removal of the proxy has two additional benefits; 1) stack traces in exceptions are reduced (when JMX is enabled) because the proxy is not on the stack; 2) cases where 2 MBeans were exported for the same bean now only export a single MBean with consolidated attributes/operations (see the MBean consolidation bullet below).
System.nanoTime()
is now used to capture times instead of System.currentTimeMillis()
.
This may provide more accuracy on some JVMs, espcially when durations of less than 1 millisecond are expected
Previously, when JMX was enabled, all sources, channels, handlers captured statistics.
It is now possible to control whether the statisics are enabled on an individual component.
Further, it is possible to capture simple counts on MessageChannel
s and MessageHandler
s instead of the complete time-based statistics.
This can have significant performance implications because you can selectively configure where you need detailed statistics, as well as enable/disable at runtime.
See Section 9.1, “Metrics and Management”.
Similar to the @ManagedResource
annotation, the @IntegrationManagedResource
marks a class as eligible to be exported as an MBean; however, it will only be exported if there is an IntegrationMBeanExporter
in the application context.
Certain Spring Integration classes (in the org.springframework.integration
) package) that were previously annotated with`@ManagedResource` are now annotated with both @ManagedResource
and @IntegrationManagedResource
.
This is for backwards compatibility (see the next bullet).
Such MBeans will be exported by any context MBeanServer
or an IntegrationMBeanExporter
(but not both - if both exporters are present, the bean is exported by the integration exporter if the bean matches a managed-components
pattern).
Certain classes within the framework (mapping routers for example) have additional attributes/operations over and above those provided by metrics and Lifecycle
.
We will use a Router
as an example here.
Previously, beans of these types were exported as two distinct MBeans:
1) the metrics MBean (with an objectName such as: intDomain:type=MessageHandler,name=myRouter,bean=endpoint
).
This MBean had metrics attributes and metrics/Lifecycle operations.
2) a second MBean (with an objectName such as:
ctxDomain:name=org.springframework.integration.config.RouterFactoryBean#0
,type=MethodInvokingRouter
)
was exported with the channel mappings attribute and operations.
Now, the attributes and operations are consolidated into a single MBean.
The objectName will depend on the exporter.
If exported by the integration MBean exporter, the objectName will be, for example: intDomain:type=MessageHandler,name=myRouter,bean=endpoint
.
If exported by another exporter, the objectName will be, for example: ctxDomain:name=org.springframework.integration.config.RouterFactoryBean#0
,type=MethodInvokingRouter
.
There is no difference between these MBeans (aside from the objectName), except that the statistics will not be enabled (the attributes will be 0) by exporters other than the integration exporter; statistics can be enabled at runtime using the JMX operations.
When exported by the integration MBean exporter, the initial state can be managed as described above.
Warning | |
---|---|
If you are currently using the second MBean to change, for example, channel mappings, and you are using the integration MBean exporter, note that the objectName has changed because of the MBean consolidation. There is no change if you are not using the integration MBean exporter. |
Previously, the managed-components
patterns were inclusive only.
If a bean name matched one of the patterns it would be included.
Now, the pattern can be negated by prefixing it with !
.
i.e.
"!foo*, foox"
will match all beans that don’t start with foo
, except foox
.
Patterns are evaluated left to right and the first match (positive or negative) wins and no further patterns are applied.
Warning | |
---|---|
The addition of this syntax to the pattern causes one possible (although perhaps unlikely) problem.
If you have a bean |
The IntegrationMBeanExporter
no longer implements SmartLifecycle
; this means that start()
and stop()
operations
are no longer available to register/unregister MBeans.
The MBeans are now registered during context initialization and unregistered when the context is destroyed.
The MBean exporter provides a JMX operation to shut down the application in an orderly manner, intended for use before terminating the JVM.
public void stopActiveComponents(long howLong)
Its use and operation are described in Section 9.7, “Orderly Shutdown”.
The key benefit of a messaging architecture is loose coupling where participating components do not maintain any awareness about one another. This fact alone makes your application extremely flexible, allowing you to change components without affecting the rest of the flow, change messaging routes, message consuming styles (polling vs event driven), and so on. However, this unassuming style of architecture could prove to be difficult when things go wrong. When debugging, you would probably like to get as much information about the message as you can (its origin, channels it has traversed, etc.)
Message History is one of those patterns that helps by giving you an option to maintain some level of awareness of a message path either for debugging purposes or to maintain an audit trail. Spring integration provides a simple way to configure your message flows to maintain the Message History by adding a header to the Message and updating that header every time a message passes through a tracked component.
To enable Message History all you need is to define the message-history
element in your configuration.
<int:message-history/>
Now every named component (component that has an id defined) will be tracked.
The framework will set the history header in your Message.
Its value is very simple - List<Properties>
.
<int:gateway id="sampleGateway" service-interface="org.springframework.integration.history.sample.SampleGateway" default-request-channel="bridgeInChannel"/> <int:chain id="sampleChain" input-channel="chainChannel" output-channel="filterChannel"> <int:header-enricher> <int:header name="baz" value="baz"/> </int:header-enricher> </int:chain>
The above configuration will produce a very simple Message History structure:
[{name=sampleGateway, type=gateway, timestamp=1283281668091}, {name=sampleChain, type=chain, timestamp=1283281668094}]
To get access to Message History all you need is access the MessageHistory header. For example:
Iterator<Properties> historyIterator = message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator(); assertTrue(historyIterator.hasNext()); Properties gatewayHistory = historyIterator.next(); assertEquals("sampleGateway", gatewayHistory.get("name")); assertTrue(historyIterator.hasNext()); Properties chainHistory = historyIterator.next(); assertEquals("sampleChain", chainHistory.get("name"));
You might not want to track all of the components.
To limit the history to certain components based on their names, all you need is provide the tracked-components
attribute and specify a comma-delimited list of component names and/or patterns that match the components you want to track.
<int:message-history tracked-components="*Gateway, sample*, foo"/>
In the above example, Message History will only be maintained for all of the components that end with Gateway, start with sample, or match the name foo exactly.
Starting with version 4.0, you can also use the @EnableMessageHistory
annotation in a @Configuration
class.
In addition, the MessageHistoryConfigurer
bean is now exposed as a JMX MBean by the IntegrationMBeanExporter
(see Section 9.2.7, “MBean Exporter”), allowing the patterns to be changed at runtime.
Note, however, that the bean must be stopped (turning off message history) in order to change the patterns.
This feature might be useful to temporarily turn on history to analyze a system.
The MBean’s object name is "<domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer"
.
Important | |
---|---|
If multiple beans (declared by |
Note | |
---|---|
Remember that by definition the Message History header is immutable (you can’t re-write history, although some try). Therefore, when writing Message History values, the components are either creating brand new Messages (when the component is an origin), or they are copying the history from a request Message, modifying it and setting the new list on a reply Message. In either case, the values can be appended even if the Message itself is crossing thread boundaries. That means that the history values can greatly simplify debugging in an asynchronous message flow. |
Enterprise Integration Patterns (EIP) identifies several patterns that have the capability to buffer messages. For example, an Aggregator buffers messages until they can be released and a QueueChannel buffers messages until consumers explicitly receive those messages from that channel. Because of the failures that can occur at any point within your message flow, EIP components that buffer messages also introduce a point where messages could be lost.
To mitigate the risk of losing Messages, EIP defines the Message Store pattern which allows EIP components to store Messages typically in some type of persistent store (e.g. RDBMS).
Spring Integration provides support for the Message Store pattern by a) defining a org.springframework.integration.store.MessageStore
strategy interface, b) providing several implementations of this interface, and c) exposing a message-store
attribute on all components that have the capability to buffer messages so that you can inject any instance that implements the MessageStore
interface.
Details on how to configure a specific Message Store implementation and/or how to inject a MessageStore
implementation into a specific buffering component are described throughout the manual (see the specific component, such as QueueChannel, Aggregator, Delayer etc.), but here are a couple of samples to give you an idea:
QueueChannel
<int:channel id="myQueueChannel"> <int:queue message-store="refToMessageStore"/> <int:channel>
Aggregator
<int:aggregator … message-store="refToMessageStore"/>
By default Messages are stored in-memory using org.springframework.integration.store.SimpleMessageStore
, an implementation of MessageStore
.
That might be fine for development or simple low-volume environments where the potential loss of non-persistent messages is not a concern.
However, the typical production application will need a more robust option, not only to mitigate the risk of message loss but also to avoid potential out-of-memory errors.
Therefore, we also provide MessageStore implementations for a variety of data-stores.
Below is a complete list of supported implementations:
Important | |
---|---|
However be aware of some limitations while using persistent implementations of the The Message data (payload and headers) is serialized and deserialized using different serialization strategies depending on the implementation of the Special attention must be paid to the headers that represent certain types of data. For example, if one of the headers contains an instance of some Spring Bean, upon deserialization you may end up with a different instance of that bean, which directly affects some of the implicit headers created by the framework (e.g., REPLY_CHANNEL or ERROR_CHANNEL). Currently they are not serializable, but even if they were, the deserialized channel would not represent the expected instance. Beginning with Spring Integration version 3.0, this issue can be resolved with a header enricher, configured to replace these headers with a name after registering the channel with the Also when configuring a message-flow like this: gateway → queue-channel (backed by a persistent Message Store) → service-activator That gateway creates a Temporary Reply Channel, and it will be lost by the time the service-activator’s poller reads from the queue. Again, you can use the header enricher to replace the headers with a String representation. For more information, refer to the Section 7.2.2, “Header Enricher”. |
Spring Integration 4.0 introduced two new interfaces ChannelMessageStore
- to implement operations specific for QueueChannel
s, PriorityCapableChannelMessageStore
- to mark MessageStore
implementation to be used for PriorityChannel
s and to provide priority order for persisted Messages.
The real behaviour depends on implementation.
The Framework provides these implementations, which can be used as a persistent MessageStore
for QueueChannel
and PriorityChannel
:
Starting with version 4.3, some MessageGroupStore
implementations can be injected with a custom
MessageGroupFactory
strategy to create/customize the MessageGroup
instances used by the MessageGroupStore
.
This defaults to a SimpleMessageGroupFactory
which produces SimpleMessageGroup
s based on the GroupType.HASH_SET
(LinkedHashSet
) internal collection.
Other possible options are SYNCHRONISED_SET
and BLOCKING_QUEUE
, where the last one can be used to reinstate the
previous SimpleMessageGroup
behavior.
Also the PERSISTENT
option is available. See the next section for more information.
Starting with version 4.3, all persistence MessageGroupStore
s retrieve MessageGroup
s and their messages
from the store with the Lazy-Load manner.
In most cases it is useful for the Correlation MessageHandler
s (Section 6.4, “Aggregator” and Section 6.5, “Resequencer”),
when it would be an overhead to load entire MessageGroup
from the store on each correlation operation.
To switch off the lazy-load behavior the AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
option
can be used from the configuration.
Our performance tests for lazy-load on MongoDB MessageStore
(Section 22.3, “MongoDB Message Store”) and
<aggregator>
(Section 6.4, “Aggregator”)
with custom release-strategy
like:
<int:aggregator input-channel="inputChannel" output-channel="outputChannel" message-store="mongoStore" release-strategy-expression="size() == 1000"/>
demonstrate this results for 1000 simple messages:
StopWatch 'Lazy-Load Performance': running time (millis) = 38918 ----------------------------------------- ms % Task name ----------------------------------------- 02652 007% Lazy-Load 36266 093% Eager
Many external systems, services or resources aren’t transactional (Twitter, RSS, file system etc.) and there is no any ability to mark the data as read.
Or there is just need to implement the Enterprise Integration Pattern Idempotent Receiver in some integration solutions.
To achieve this goal and store some previous state of the Endpoint before the next interaction with external system, or deal with the next Message, Spring Integration provides the Metadata Store component being an implementation of the org.springframework.integration.metadata.MetadataStore
interface with a general key-value contract.
The Metadata Store is designed to store various types of generic meta-data (e.g., published date of the last feed entry that has been processed) to help components such as the Feed adapter deal with duplicates.
If a component is not directly provided with a reference to a MetadataStore
, the algorithm for locating a metadata store is as follows: First, look for a bean with id metadataStore
in the ApplicationContext.
If one is found then it will be used, otherwise it will create a new instance of SimpleMetadataStore
which is an in-memory implementation that will only persist metadata within the lifecycle of the currently running Application Context.
This means that upon restart you may end up with duplicate entries.
If you need to persist metadata between Application Context restarts, these persistent MetadataStores
are provided by
the framework:
The PropertiesPersistingMetadataStore
is backed by a properties file and a PropertiesPersister.
By default, it only persists the state when the application context is closed normally. It implements Flushable
so you
can persist the state at will, be invoking flush()
.
<bean id="metadataStore" class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>
Alternatively, you can provide your own implementation of the MetadataStore
interface (e.g.
JdbcMetadataStore) and configure it as a bean in the Application Context.
Starting with version 4.0, SimpleMetadataStore
, PropertiesPersistingMetadataStore
and RedisMetadataStore
implement ConcurrentMetadataStore
.
These provide for atomic updates and can be used across multiple component or application instances.
The Metadata Store is useful for implementing the EIP Idempotent Receiver pattern, when there is need to filter an incoming Message if it has already been processed, and just discard it or perform some other logic on discarding. The following configuration is an example of how to do this:
<int:filter input-channel="serviceChannel" output-channel="idempotentServiceChannel" discard-channel="discardChannel" expression="@metadataStore.get(headers.businessKey) == null"/> <int:publish-subscribe-channel id="idempotentServiceChannel"/> <int:outbound-channel-adapter channel="idempotentServiceChannel" expression="@metadataStore.put(headers.businessKey, '')"/> <int:service-activator input-channel="idempotentServiceChannel" ref="service"/>
The value
of the idempotent entry may be some expiration date, after which that entry should be removed from Metadata Store by some scheduled reaper.
Also see Section 8.9.11, “Idempotent Receiver Enterprise Integration Pattern”.
Some metadata stores (currently only zookeeper) support registering a listener to receive events when items change.
public interface MetadataStoreListener { void onAdd(String key, String value); void onRemove(String key, String oldValue); void onUpdate(String key, String newValue); }
See the javadocs for more information.
The MetadataStoreListenerAdapter
can be subclassed if you are only interested in a subset of events.
As described in (EIP), the idea behind the Control Bus is that the same messaging system can be used for monitoring and managing the components within the framework as is used for "application-level" messaging. In Spring Integration we build upon the adapters described above so that it’s possible to send Messages as a means of invoking exposed operations.
<int:control-bus input-channel="operationChannel"/>
The Control Bus has an input channel that can be accessed for invoking operations on the beans in the application context. It also has all the common properties of a service activating endpoint, e.g. you can specify an output channel if the result of the operation has a return value that you want to send on to a downstream channel.
The Control Bus executes messages on the input channel as Spring Expression Language expressions.
It takes a message, compiles the body to an expression, adds some context, and then executes it.
The default context supports any method that has been annotated with @ManagedAttribute
or @ManagedOperation
.
It also supports the methods on Spring’s Lifecycle
interface, and it supports methods that are used to configure several of Spring’s TaskExecutor
and TaskScheduler
implementations.
The simplest way to ensure that your own methods are available to the Control Bus is to use the @ManagedAttribute
and/or @ManagedOperation
annotations.
Since those are also used for exposing methods to a JMX MBean registry, it’s a convenient by-product (often the same types of operations you want to expose to the Control Bus would be reasonable for exposing via JMX).
Resolution of any particular instance within the application context is achieved in the typical SpEL syntax.
Simply provide the bean name with the SpEL prefix for beans (@
).
For example, to execute a method on a Spring Bean a client could send a message to the operation channel as follows:
Message operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation)
The root of the context for the expression is the Message
itself, so you also have access to the payload
and headers
as variables within your expression.
This is consistent with all the other expression support in Spring Integration endpoints.
With Java and Annotations the Control Bus can be configured as follows:
@Bean @ServiceActivator(inputChannel = "operationChannel") public ExpressionControlBusFactoryBean controlBus() { return new ExpressionControlBusFactoryBean(); }
Or, when using Java DSL flow definitions:
@Bean public IntegrationFlow controlBusFlow() { return IntegrationFlows.from("controlBus") .controlBus() .get(); }
Or, if you prefer Lambda style with automatic DirectChannel
creation:
@Bean public IntegrationFlow controlBus() { return IntegrationFlowDefinition::controlBus; }
In this case, the channel is named controlBus.input
.
As described in Section 9.2.7, “MBean Exporter”, the MBean exporter provides a JMX operation stopActiveComponents, which is used to stop the application in an orderly manner. The operation has a single long parameter. The parameter indicates how long (in milliseconds) the operation will wait to allow in-flight messages to complete. The operation works as follows:
The first step calls beforeShutdown()
on all beans that implement OrderlyShutdownCapable
.
This allows such components to prepare for shutdown.
Examples of components that implement this interface, and what they do with this call include: JMS and AMQP message-driven adapters stop their listener containers; TCP server connection factories stop accepting new connections (while keeping existing connections open); TCP inbound endpoints drop (log) any new messages received; http inbound endpoints return 503 - Service Unavailable for any new requests.
The second step stops any active channels, such as JMS- or AMQP-backed channels.
The third step stops all MessageSource
s.
The fourth step stops all inbound MessageProducer
s (that are not OrderlyShutdownCapable
).
The fifth step waits for any remaining time left, as defined by the value of the long parameter passed in to the operation. This is intended to allow any in-flight messages to complete their journeys. It is therefore important to select an appropriate timeout when invoking this operation.
The sixth step calls afterShutdown()
on all OrderlyShutdownCapable components.
This allows such components to perform final shutdown tasks (closing all open sockets, for example).
As discussed in the section called “Orderly Shutdown Managed Operation” this operation can be invoked using JMX.
If you wish to programmatically invoke the method, you will need to inject, or otherwise get a reference to, the IntegrationMBeanExporter
.
If no id
attribute is provided on the <int-jmx:mbean-export/>
definition, the bean will have a generated name.
This name contains a random component to avoid ObjectName
collisions if multiple Spring Integration contexts exist in the same JVM (MBeanServer).
For this reason, if you wish to invoke the method programmatically, it is recommended that you provide the exporter with an id
attribute so it can easily be accessed in the application context.
Finally, the operation can be invoked using the <control-bus>
; see the monitoring Spring Integration sample application for details.
Important | |
---|---|
The above algorithm was improved in version 4.1.
Previously, all task executors and schedulers were stopped.
This could cause mid-flow messages in |
Starting with version 4.3, Spring Integration provides access to an application’s runtime object model which can, optionally, include component metrics.
It is exposed as a graph, which may be used to visualize the current state of the integration application.
The o.s.i.support.management.graph
package contains all the required classes to collect, build and render the runtime state of Spring Integration components as a single tree-like Graph
object.
The IntegrationGraphServer
should be declared as a bean to build, retrieve and refresh the Graph
object.
The resulting Graph
object can be serialized to any format, although JSON is flexible and convenient to parse and represent on the client side.
A simple Spring Integration application with only the default components would expose a graph as follows:
{ "contentDescriptor": { "providerVersion": "4.3.0.RELEASE", "providerFormatVersion": 1.0, "provider": "spring-integration", "name": "myApplication" }, "nodes": [ { "nodeId": 1, "name": "nullChannel", "stats": null, "componentType": "channel" }, { "nodeId": 2, "name": "errorChannel", "stats": null, "componentType": "publish-subscribe-channel" }, { "nodeId": 3, "name": "_org.springframework.integration.errorLogger", "stats": { "duration": { "count": 0, "min": 0.0, "max": 0.0, "mean": 0.0, "standardDeviation": 0.0, "countLong": 0 }, "errorCount": 0, "standardDeviationDuration": 0.0, "countsEnabled": true, "statsEnabled": true, "loggingEnabled": false, "handleCount": 0, "meanDuration": 0.0, "maxDuration": 0.0, "minDuration": 0.0, "activeCount": 0 }, "componentType": "logging-channel-adapter", "output": null, "input": "errorChannel" } ], "links": [ { "from": 2, "to": 3, "type": "input" } ] }
As you can see, the graph consists of three top-level elements.
The contentDescriptor
graph element is pretty straightforward and contains general information about the application providing the data.
The name
can be customized on the IntegrationGraphServer
bean or via spring.application.name
application context environment property.
Other properties are provided by the framework and allows you to distinguish a similar model from other sources.
The links
graph element represents connections between nodes from the nodes
graph element and, therefore, between integration components in the source Spring Integration application.
For example from a MessageChannel
to an EventDrivenConsumer
with some MessageHandler
;
or from an AbstractReplyProducingMessageHandler
to a MessageChannel
.
For the convenience and to allow to determine a link purpose, the model is supplied with the type
attribute.
The possible types are:
MessageChannel
to the endpoint; inputChannel
or requestChannel
property;
MessageHandler
, MessageProducer
or SourcePollingChannelAdapter
to the MessageChannel
via an outputChannel
or replyChannel
property;
MessageHandler
on PollingConsumer
or MessageProducer
or SourcePollingChannelAdapter
to the MessageChannel
via an errorChannel
property;
DiscardingMessageHandler
(e.g. MessageFilter
) to the MessageChannel
via errorChannel
property.
AbstractMappingMessageRouter
(e.g. HeaderValueRouter
) to the MessageChannel
.
Similar to output but determined at run-time.
May be a configured channel mapping, or a dynamically resolved channel.
Routers will typically only retain up to 100 dynamic routes for this purpose, but this can be modified using the dynamicChannelLimit
property.
The information from this element can be used by a visualizing tool to render connections between nodes from the nodes
graph element, where the from
and to
numbers represent the value from the nodeId
property of the linked nodes.
For example the link type
can be used to determine the proper port on the target node:
+---(discard) | +----o----+ | | | | | | (input)--o o---(output) | | | | | | +----o----+ | +---(error)
The nodes
graph element is perhaps the most interesting because its elements contain not only the runtime components with their componentType
s and name
s, but can also optionally contain metrics exposed by the component.
Node elements contain various properties which are generally self-explanatory.
For example, expression-based components include the expression
property containing the primary expression string for the component.
To enable the metrics, add an @EnableIntegrationManagement
to some @Configuration
class or add an <int:management/>
element to your XML configuration.
You can control exactly which components in the framework collect statistics.
See Section 9.1, “Metrics and Management” for complete information.
See the stats
attribute from the _org.springframework.integration.errorLogger
component in the JSON example above.
The nullChannel
and errorChannel
don’t provide statistics information in this case, because the configuration for this example was:
@Configuration @EnableIntegration @EnableIntegrationManagement(statsEnabled = "_org.springframework.integration.errorLogger.handler", countsEnabled = "!*", defaultLoggingEnabled = "false") public class ManagementConfiguration { @Bean public IntegrationGraphServer integrationGraphServer() { return new IntegrationGraphServer(); } }
The nodeId
represents a unique incremental identifier to distinguish one component from another.
It is also used in the links
element to represent a relationship (connection) of this component to others, if any.
The input
and output
attributes are for the inputChannel
and outputChannel
properties of the AbstractEndpoint
, MessageHandler
, SourcePollingChannelAdapter
or MessageProducerSupport
.
See the next paragraph for more information.
Spring Integration components have various levels of complexity.
For example, any polled MessageSource
also has a SourcePollingChannelAdapter
and a MessageChannel
to which to send messages from the source data periodically.
Other components might be middleware request-reply components, e.g. JmsOutboundGateway
, with a consuming AbstractEndpoint
to subscribe to (or poll) the requestChannel
(input
) for messages, and a replyChannel
(output
) to produce a reply message to send downstream.
Meanwhile, any MessageProducerSupport
implementation (e.g. ApplicationEventListeningMessageProducer
) simply wraps some source protocol listening logic and sends messages to the outputChannel
.
Within the graph, Spring Integration components are represented using the IntegrationNode
class hierarchy, which you can find in the o.s.i.support.management.graph
package.
For example the ErrorCapableDiscardingMessageHandlerNode
could be used for the AggregatingMessageHandler
(because it has a discardChannel
option) and can produce errors when consuming from a PollableChannel
using a PollingConsumer
.
Another sample is CompositeMessageHandlerNode
- for a MessageHandlerChain
when subscribed to a SubscribableChannel
, using an EventDrivenConsumer
.
Note | |
---|---|
The |
@MessagingGateway(defaultRequestChannel = "four") public interface Gate { void foo(String foo); void foo(Integer foo); void bar(String bar); }
produces nodes like:
{ "nodeId" : 10, "name" : "gate.bar(class java.lang.String)", "stats" : null, "componentType" : "gateway", "output" : "four", "errors" : null }, { "nodeId" : 11, "name" : "gate.foo(class java.lang.String)", "stats" : null, "componentType" : "gateway", "output" : "four", "errors" : null }, { "nodeId" : 12, "name" : "gate.foo(class java.lang.Integer)", "stats" : null, "componentType" : "gateway", "output" : "four", "errors" : null }
This IntegrationNode
hierarchy can be used for parsing the graph model on the client side, as well as for the understanding the general Spring Integration runtime behavior.
See also Section 3.7, “Programming Tips and Tricks” for more information.
If your application is WEB-based (or built on top of Spring Boot using an embedded web container) and the Spring Integration HTTP module (see Chapter 17, HTTP Support) is present on the classpath, you can use a IntegrationGraphController
to expose the IntegrationGraphServer
functionality as a REST service.
For this purpose, the @EnableIntegrationGraphController
@Configuration
class annotation and the <int-http:graph-controller/>
XML element, are available in the HTTP module.
Together with the @EnableWebMvc
annotation (or <mvc:annotation-driven/>
for xml definitions), this configuration registers an IntegrationGraphController
@RestController
where its @RequestMapping.path
can be configured on the @EnableIntegrationGraphController
annotation or <int-http:graph-controller/>
element.
The default path is /integration
.
The IntegrationGraphController
@RestController
provides these services:
@GetMapping(name = "getGraph")
- to retrieve the state of the Spring Integration components since the last IntegrationGraphServer
refresh.
The o.s.i.support.management.graph.Graph
is returned as a @ResponseBody
of the REST service;
@GetMapping(path = "/refresh", name = "refreshGraph")
- to refresh the current Graph
for the actual runtime state and return it as a REST response.
It is not necessary to refresh the graph for metrics, they are provided in real-time when the graph is retrieved.
Refresh can be called if the application context has been modified since the graph was last retrieved and the graph is completely rebuilt.
Any Security and Cross Origin restrictions for the IntegrationGraphController
can be achieved with the standard configuration options and components provided by Spring Security and Spring MVC projects.
A simple example of that follows:
<mvc:annotation-driven /> <mvc:cors> <mvc:mapping path="/myIntegration/**" allowed-origins="http://localhost:9090" allowed-methods="GET" /> </mvc:cors> <security:http> <security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" /> </security:http> <int-http:graph-controller path="/myIntegration" />
The Java & Annotation Configuration variant follows; note that, for convenience, the annotation provides an allowedOrigins
attribute; this just provides GET
access to the path
.
For more sophistication, you can configure the CORS mappings using standard Spring MVC mechanisms.
@Configuration @EnableWebMvc @EnableWebSecurity @EnableIntegration @EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090") public class IntegrationConfiguration extends WebSecurityConfigurerAdapter { @Override protected void configure(HttpSecurity http) throws Exception { http .authorizeRequests() .antMatchers("/testIntegration/**").hasRole("ADMIN") // ... .formLogin(); } //... }