Spring Integration provides Channel Adapters for receiving and publishing JMX Notifications. There is also an Inbound Channel Adapter for polling JMX MBean attribute values, and an Outbound Channel Adapter for invoking JMX MBean operations.
The Notification-listening Channel Adapter requires a JMX ObjectName for the MBean that publishes notifications to which this listener should be registered. A very simple configuration might look like this:
<int-jmx:notification-listening-channel-adapter id="adapter" channel="channel" object-name="example.domain:name=publisher"/>
Tip | |
---|---|
The notification-listening-channel-adapter registers with an MBeanServer at startup, and the default bean name is mbeanServer, which happens to be the same bean name generated when using Spring's <context:mbean-server/> element. If you need to use a different name, be sure to include the mbean-server attribute. |
The adapter can also accept a reference to a NotificationFilter and a handback Object to provide some context that is passed back with each Notification. Both of those attributes are optional. Extending the above example to include those attributes as well as an explicit MBeanServer bean name would produce the following:
<int-jmx:notification-listening-channel-adapter id="adapter" channel="channel" mbean-server="someServer" object-name="example.domain:name=somePublisher" notification-filter="notificationFilter" handback="myHandback"/>
The Notification-listening Channel Adapter is event-driven and registered with the MBeanServer directly. It does not require any poller configuration.
Note | |
---|---|
For this component only, the object-name attribute can contain an ObjectName pattern (e.g. "org.foo:type=Bar,name=*"), and the adapter will receive notifications from all MBeans with ObjectNames that match the pattern. In addition, the object-name attribute can contain a SpEL reference to a <util:list/> of ObjectName patterns:
<int-jmx:notification-listening-channel-adapter id="manyNotificationsAdapter" channel="manyNotificationsChannel" object-name="#{patterns}"/> <util:list id="patterns"> <value>org.foo:type=Foo,name=*</value> <value>org.foo:type=Bar,name=*</value> </util:list>
The names of the located MBean(s) will be logged when DEBUG level logging is enabled. |
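To get a feel for which MBeans an ObjectName pattern would select, the matching can be tried with the JDK's own javax.management.ObjectName class. The following is a minimal sketch using only the JDK; the class and method names (ObjectNamePatternDemo, matchesAny) are hypothetical, and it illustrates standard JMX pattern matching rather than the adapter's internal lookup code.

```java
import java.util.Arrays;
import java.util.List;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper, not part of Spring Integration.
class ObjectNamePatternDemo {

    /** Returns true if the candidate name matches at least one of the patterns. */
    static boolean matchesAny(List<String> patterns, String candidate) {
        try {
            ObjectName name = new ObjectName(candidate);
            for (String pattern : patterns) {
                // ObjectName.apply() tests whether 'name' matches this (possibly pattern) ObjectName
                if (new ObjectName(pattern).apply(name)) {
                    return true;
                }
            }
            return false;
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid ObjectName", e);
        }
    }

    public static void main(String[] args) {
        List<String> patterns = Arrays.asList(
                "org.foo:type=Foo,name=*",
                "org.foo:type=Bar,name=*");
        System.out.println(matchesAny(patterns, "org.foo:type=Bar,name=first")); // true
        System.out.println(matchesAny(patterns, "org.foo:type=Baz,name=first")); // false
    }
}
```

Note that a pattern such as "org.foo:type=Bar,name=*" only matches names that have exactly the keys type and name; MBeans with additional key properties would need a property-list wildcard (a trailing ",*") in the pattern.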
The Notification-publishing Channel Adapter is relatively simple. It only requires a JMX ObjectName in its configuration as shown below.
<context:mbean-export/> <int-jmx:notification-publishing-channel-adapter id="adapter" channel="channel" object-name="example.domain:name=publisher"/>
It also requires that an MBeanExporter be present in the context, which is why the <context:mbean-export/> element is shown above as well.
When Messages are sent to the channel for this adapter, the Notification is created from the Message content. If the payload is a String it will be passed as the message text for the Notification. Any other payload type will be passed as the userData of the Notification.
JMX Notifications also have a type, and it should be a dot-delimited String. There are two ways to provide the type. Precedence will always be given to a Message header value associated with the JmxHeaders.NOTIFICATION_TYPE key. If that header is absent, the adapter falls back to the default-notification-type attribute provided in the configuration.
<context:mbean-export/> <int-jmx:notification-publishing-channel-adapter id="adapter" channel="channel" object-name="example.domain:name=publisher" default-notification-type="some.default.type"/>
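The mapping rules described above (String payload becomes the message text, any other payload becomes userData, and the header type takes precedence over the default type) can be sketched with the plain JDK Notification class. This is an illustration under stated assumptions, not the adapter's actual implementation: the class name, the method name, and the way the header value is passed in as a plain String are all hypothetical.

```java
import javax.management.Notification;

// Hypothetical illustration of the payload/type mapping rules.
class NotificationMappingSketch {

    static Notification toNotification(Object payload, String headerType, String defaultType,
                                       Object source, long sequenceNumber) {
        // A type supplied with the message wins; the configured default is only a fallback
        String type = (headerType != null) ? headerType : defaultType;
        if (payload instanceof String) {
            // String payloads become the Notification's message text
            return new Notification(type, source, sequenceNumber, (String) payload);
        }
        // Any other payload travels as userData
        Notification notification = new Notification(type, source, sequenceNumber);
        notification.setUserData(payload);
        return notification;
    }

    public static void main(String[] args) {
        Notification text = toNotification("disk almost full", null, "some.default.type", "demo", 1L);
        System.out.println(text.getType() + " / " + text.getMessage());

        Notification data = toNotification(Integer.valueOf(42), "custom.type", "some.default.type", "demo", 2L);
        System.out.println(data.getType() + " / " + data.getUserData());
    }
}
```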
The Attribute Polling Channel Adapter is useful when you need to periodically check some value that is available through an MBean as a managed attribute. The poller can be configured in the same way as any other polling adapter in Spring Integration (or you can rely on the default poller). The object-name and attribute-name are required. An MBeanServer reference is also required, but the adapter will automatically look for a bean named mbeanServer by default, just like the Notification-listening Channel Adapter described above.
<int-jmx:attribute-polling-channel-adapter id="adapter" channel="channel" object-name="example.domain:name=someService" attribute-name="InvocationCount"> <int:poller max-messages-per-poll="1" fixed-rate="5000"/> </int-jmx:attribute-polling-channel-adapter>
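Conceptually, each poll amounts to reading one managed attribute from the MBeanServer. The sketch below shows that read with the plain JMX API against the JVM's platform MBeanServer; it is only an illustration, the class and method names are hypothetical, and the java.lang:type=Runtime MBean with its Uptime attribute is used simply because every JVM exposes it.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical illustration of what a single attribute poll does.
class AttributePollSketch {

    static Object readAttribute(MBeanServer server, String objectName, String attributeName) {
        try {
            // The same two pieces of information the adapter requires: object-name and attribute-name
            return server.getAttribute(new ObjectName(objectName), attributeName);
        } catch (JMException e) {
            throw new IllegalStateException("Could not read " + attributeName + " from " + objectName, e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Poll three times at a fixed interval, roughly what a poller with a fixed rate would do
        for (int i = 0; i < 3; i++) {
            System.out.println("Uptime: " + readAttribute(server, "java.lang:type=Runtime", "Uptime"));
            Thread.sleep(100);
        }
    }
}
```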
The Tree Polling Channel Adapter queries the JMX MBean tree and sends a message with a payload that is the graph of objects that matches the query. By default the MBeans are mapped to primitives and simple Objects like Map, List and arrays - permitting simple transformation, for example, to JSON. An MBeanServer reference is also required, but it will automatically check for a bean named mbeanServer by default, just like the Notification-listening Channel Adapter described above. A basic configuration would be:
<int-jmx:tree-polling-channel-adapter id="adapter" channel="channel" query-name="example.domain:type=*"> <int:poller max-messages-per-poll="1" fixed-rate="5000"/> </int-jmx:tree-polling-channel-adapter>
This will include all attributes on the MBeans selected. You can filter the attributes by providing an MBeanObjectConverter that has an appropriate filter configured. The converter can be provided as a reference to a bean definition using the converter attribute, or as an inner <bean/> definition. A DefaultMBeanObjectConverter is provided, which can take an MBeanAttributeFilter as its constructor argument.
Two standard filters are provided: the NamedFieldsMBeanAttributeFilter allows you to specify a list of attributes to include, and the NotNamedFieldsMBeanAttributeFilter allows you to specify a list of attributes to exclude. You can also implement your own filter.
The operation-invoking-channel-adapter enables Message-driven invocation of any managed operation exposed by an MBean. Each invocation requires the operation name to be invoked and the ObjectName of the target MBean. Both of these must be explicitly provided via adapter configuration:
<int-jmx:operation-invoking-channel-adapter id="adapter" object-name="example.domain:name=TestBean" operation-name="ping"/>
Then the adapter only needs to be able to discover the mbeanServer bean. If a different bean name is required, then provide the mbean-server attribute with a reference.
The payload of the Message will be mapped to the parameters of the operation, if any. A Map-typed payload with String keys is treated as name/value pairs, whereas a List or array would be passed as a simple argument list (with no explicit parameter names). If the operation requires a single parameter value, then the payload can represent that single value, and if the operation requires no parameters, then the payload would be ignored.
If you want to expose a channel for a single common operation to be invoked by Messages that need not contain headers, then this configuration works well.
Similar to the operation-invoking-channel-adapter, Spring Integration also provides an operation-invoking-outbound-gateway, which can be used when dealing with non-void operations where a return value is required. The return value will be sent as the message payload to the reply-channel specified by this Gateway.
<int-jmx:operation-invoking-outbound-gateway request-channel="requestChannel" reply-channel="replyChannel" object-name="o.s.i.jmx.config:type=TestBean,name=testBeanGateway" operation-name="testWithReturn"/>
If the reply-channel attribute is not provided, the reply message will be sent to the channel that is identified by the MessageHeaders.REPLY_CHANNEL header. That header is typically auto-created by the entry point into a message flow, such as any Gateway component. However, if the message flow was started by manually creating a Spring Integration Message and sending it directly to a Channel, then you must specify the message header explicitly or use the provided reply-channel attribute.
Spring Integration components themselves may be exposed as MBeans when the IntegrationMBeanExporter is configured. To create an instance of the IntegrationMBeanExporter, define a bean and provide a reference to an MBeanServer and a domain name (if desired). The domain can be left out, in which case the default domain is org.springframework.integration.
<int-jmx:mbean-export id="integrationMBeanExporter" default-domain="my.company.domain" server="mbeanServer"/> <bean id="mbeanServer" class="org.springframework.jmx.support.MBeanServerFactoryBean"> <property name="locateExistingServerIfPossible" value="true"/> </bean>
Once the exporter is defined, start up your application with:
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=6969 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false
Then start JConsole (free with the JDK), and connect to the local process on localhost:6969 to get a look at the management endpoints exposed. (The port and client are just examples to get you started quickly. There are other JMX clients available, and some offer more sophisticated features than JConsole.)
Important | |
---|---|
The MBean exporter is orthogonal to the one provided in Spring core - it registers message channels and message handlers, but not itself. You can expose the exporter itself, and certain other components in Spring Integration, using the standard <context:mbean-export/> element. The exporter also has a useful operation, as discussed in the section called “Orderly Shutdown Managed Operation”. |
All the MessageChannel, MessageHandler and MessageSource instances in the application are wrapped by the MBean exporter to provide management and monitoring features. The generated JMX object names for each component type are listed in the table below:
Table 8.1.
Component Type | ObjectName |
---|---|
MessageChannel | o.s.i:type=MessageChannel,name=<channelName> |
MessageSource | o.s.i:type=MessageSource,name=<channelName>,bean=<source> |
MessageHandler | o.s.i:type=MessageHandler,name=<channelName>,bean=<source> |
The bean attribute in the object names for sources and handlers takes one of the values in the table below:
Table 8.2.
Bean Value | Description |
---|---|
endpoint | The bean name of the enclosing endpoint (e.g. <service-activator>) if there is one |
anonymous | An indication that the enclosing endpoint didn't have a user-specified bean name, so the JMX name is the input channel name |
internal | For well-known Spring Integration default components |
handler | None of the above: fallback to the toString() of the object being monitored (handler or source) |
Custom elements can be appended to the object name by providing a reference to a Properties object in the object-name-static-properties attribute.
Also, since Spring Integration 3.0, you can use a custom ObjectNamingStrategy using the object-naming-strategy attribute. This permits greater control over the naming of the MBeans; for example, it lets you group all Integration MBeans under an 'Integration' type. A simple custom naming strategy implementation might be:
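import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

import org.springframework.jmx.export.naming.KeyNamingStrategy;
import org.springframework.jmx.export.naming.ObjectNamingStrategy;

public class Namer implements ObjectNamingStrategy {

    // Delegate that turns the (rewritten) bean key into an ObjectName
    private final ObjectNamingStrategy realNamer = new KeyNamingStrategy();

    @Override
    public ObjectName getObjectName(Object managedBean, String beanKey) throws MalformedObjectNameException {
        // Move the standard type to componentType and set type to 'Integration'
        String actualBeanKey = beanKey.replace("type=", "type=Integration,componentType=");
        return realNamer.getObjectName(managedBean, actualBeanKey);
    }

}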
The beanKey argument is a String containing the standard object name beginning with the default-domain and including any additional static properties. This example simply moves the standard type part to componentType and sets the type to 'Integration', enabling selection of all Integration MBeans in one query: "my.domain:type=Integration,*". This also groups the beans under one tree entry under the domain in tools like VisualVM.
Note | |
---|---|
The default naming strategy is a MetadataNamingStrategy. The exporter propagates the default-domain to that object to allow it to generate a fallback object name if parsing of the bean key fails. If your custom naming strategy is a MetadataNamingStrategy (or subclass), the exporter will not propagate the default-domain; you will need to configure it on your strategy bean. |
Message channels report metrics according to their concrete type. If you are looking at a DirectChannel, you will see statistics for the send operation. If it is a QueueChannel, you will also see statistics for the receive operation, as well as the count of messages that are currently buffered by this QueueChannel. In both cases there are some metrics that are simple counters (message count and error count), and some that are estimates of averages of interesting quantities. The algorithms used to calculate these estimates are described briefly in the table below:
Table 8.3.
Metric Type | Example | Algorithm |
---|---|---|
Count | Send Count | Simple incrementer. Increase by one when an event occurs. |
Duration | Send Duration (method execution time in milliseconds) | Exponential Moving Average with decay factor 10. Average of the method execution time over roughly the last 10 measurements. |
Rate | Send Rate (number of operations per second) | Inverse of Exponential Moving Average of the interval between events with decay in time (lapsing over 60 seconds) and per measurement (last 10 events). |
Ratio | Send Error Ratio (ratio of errors to total sends) | Estimate the success ratio as the Exponential Moving Average of the series composed of values 1 for success and 0 for failure (decaying as per the rate measurement over time and events). Error ratio is 1 - success ratio. |
A feature of the time-based average estimates is that they decay with time if no new measurements arrive. To help interpret the behaviour over time, the time (in seconds) since the last measurement is also exposed as a metric.
There are two basic exponential models: decay per measurement (appropriate for duration and anything where the number of measurements is part of the metric), and decay per time unit (more suitable for rate measurements where the time in between measurements is part of the metric). Both models depend on the fact that
S(n) = sum(i=0,i=n) w(i) x(n-i)
has a special form when w(i) = r^i, with r=constant:
S(n) = x(n) + r S(n-1)
(so you only have to store S(n-1), not the whole series x(i), to generate a new metric estimate from the last measurement). The algorithms used in the duration metrics use r=exp(-1/M) with M=10. The net effect is that the estimate S(n) is more heavily weighted to recent measurements and is composed roughly of the last M measurements. So M is the "window" or lapse rate of the estimate. In the case of the vanilla moving average, i is a counter over the number of measurements (counting back from the most recent one). In the case of the rate we interpret i as the elapsed time, or a combination of elapsed time and a counter (so the metric estimate contains contributions roughly from the last M measurements and the last T seconds).
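The per-measurement recursion S(n) = x(n) + r S(n-1) with r=exp(-1/M) can be sketched in a few lines of Java. This is a minimal illustration, not the framework's metrics code: the class name is hypothetical, and dividing by a running total of the weights (tracked with the same recursion) to turn the weighted sum into an average is an assumption made here for the example.

```java
// Hypothetical sketch of a decay-per-measurement moving average.
class ExponentialMovingAverageSketch {

    private final double decay;   // r = exp(-1/M)
    private double weightedSum;   // S(n) = x(n) + r * S(n-1)
    private double weightTotal;   // W(n) = 1 + r * W(n-1), used to normalise (assumption)

    ExponentialMovingAverageSketch(int window) {
        this.decay = Math.exp(-1.0 / window);
    }

    void append(double value) {
        // Only the previous sums are needed, never the whole series
        weightedSum = value + decay * weightedSum;
        weightTotal = 1.0 + decay * weightTotal;
    }

    double mean() {
        return weightTotal > 0 ? weightedSum / weightTotal : 0.0;
    }

    public static void main(String[] args) {
        ExponentialMovingAverageSketch average = new ExponentialMovingAverageSketch(10);
        for (int i = 0; i < 100; i++) {
            average.append(0.0);   // a long run of fast calls
        }
        for (int i = 0; i < 10; i++) {
            average.append(10.0);  // followed by ten slow ones
        }
        // The last M=10 measurements dominate, so the estimate sits well above the plain mean
        System.out.println(average.mean());
    }
}
```

With M=10, the ten most recent measurements carry roughly 63% of the total weight (1 - 1/e), which is what "composed roughly of the last M measurements" means in practice.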
The MBean exporter provides a JMX operation to shut down the application in an orderly manner, intended for use before terminating the JVM.
public void stopActiveComponents(boolean force, long howLong)
Its use and operation are described in Section 8.6, “Orderly Shutdown”.
The key benefit of a messaging architecture is loose coupling, where participating components do not maintain any awareness of one another. This fact alone makes your application extremely flexible, allowing you to change components without affecting the rest of the flow, change messaging routes, message consuming styles (polling vs event driven), and so on. However, this unassuming style of architecture can prove difficult when things go wrong. When debugging, you would probably like to get as much information about the message as you can (its origin, the channels it has traversed, etc.).
Message History is one of those patterns that helps by giving you an option to maintain some level of awareness of a message path, either for debugging purposes or to maintain an audit trail. Spring Integration provides a simple way to configure your message flows to maintain the Message History by adding a header to the Message and updating that header every time a message passes through a tracked component.
To enable Message History, all you need to do is define the message-history element in your configuration.
<int:message-history/>
Now every named component (component that has an 'id' defined) will be tracked.
The framework will set the 'history' header in your Message. Its value is very simple - List<Properties>.
<int:gateway id="sampleGateway" service-interface="org.springframework.integration.history.sample.SampleGateway" default-request-channel="bridgeInChannel"/> <int:chain id="sampleChain" input-channel="chainChannel" output-channel="filterChannel"> <int:header-enricher> <int:header name="baz" value="baz"/> </int:header-enricher> </int:chain>
The above configuration will produce a very simple Message History structure:
[{name=sampleGateway, type=gateway, timestamp=1283281668091}, {name=sampleChain, type=chain, timestamp=1283281668094}]
To get access to Message History, all you need to do is access the MessageHistory header. For example:
Iterator<Properties> historyIterator = message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));
You might not want to track all of the components. To limit the history to certain components based on their names, all you need to do is provide the tracked-components attribute and specify a comma-delimited list of component names and/or patterns that match the components you want to track.
<int:message-history tracked-components="*Gateway, sample*, foo"/>
In the above example, Message History will be maintained only for components whose names end with 'Gateway', start with 'sample', or match the name 'foo' exactly.
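The selection rule for the tracked-components patterns can be illustrated with a small stand-alone matcher. This is a sketch only: the class and method names are hypothetical, and translating each '*' wildcard into a regular expression is simply one way to reproduce the behaviour described here, not the framework's own matching code.

```java
import java.util.regex.Pattern;

// Hypothetical illustration of comma-delimited wildcard matching.
class TrackedComponentsSketch {

    static boolean isTracked(String trackedComponents, String componentName) {
        for (String raw : trackedComponents.split(",")) {
            String pattern = raw.trim();
            if (pattern.isEmpty()) {
                continue;
            }
            // Quote the pattern literally, then let every '*' match any run of characters
            String regex = Pattern.quote(pattern).replace("*", "\\E.*\\Q");
            if (componentName.matches(regex)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String tracked = "*Gateway, sample*, foo";
        System.out.println(isTracked(tracked, "orderGateway")); // true: ends with 'Gateway'
        System.out.println(isTracked(tracked, "sampleChain"));  // true: starts with 'sample'
        System.out.println(isTracked(tracked, "foo"));          // true: exact match
        System.out.println(isTracked(tracked, "foobar"));       // false: 'foo' has no wildcard
    }
}
```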
Note | |
---|---|
Remember that by definition the Message History header is immutable (you can't re-write history, although some try). Therefore, when writing Message History values, the components are either creating brand new Messages (when the component is an origin), or they are copying the history from a request Message, modifying it and setting the new list on a reply Message. In either case, the values can be appended even if the Message itself is crossing thread boundaries. That means that the history values can greatly simplify debugging in an asynchronous message flow. |
Enterprise Integration Patterns (EIP) identifies several patterns that have the capability to buffer messages. For example, an Aggregator buffers messages until they can be released and a QueueChannel buffers messages until consumers explicitly receive those messages from that channel. Because of the failures that can occur at any point within your message flow, EIP components that buffer messages also introduce a point where messages could be lost.
To mitigate the risk of losing Messages, EIP defines the Message Store pattern which allows EIP components to store Messages typically in some type of persistent store (e.g. RDBMS).
Spring Integration provides support for the Message Store pattern by a) defining an org.springframework.integration.store.MessageStore strategy interface, b) providing several implementations of this interface, and c) exposing a message-store attribute on all components that have the capability to buffer messages, so that you can inject any instance that implements the MessageStore interface.
Details on how to configure a specific Message Store implementation and/or how to inject a MessageStore implementation into a specific buffering component are described throughout the manual (see the specific component, such as QueueChannel, Aggregator, Resequencer etc.), but here are a couple of samples to give you an idea:
QueueChannel
<int:channel id="myQueueChannel"> <int:queue message-store="refToMessageStore"/> </int:channel>
Aggregator
<int:aggregator … message-store="refToMessageStore"/>
By default, Messages are stored in memory using org.springframework.integration.store.SimpleMessageStore, an implementation of MessageStore. That might be fine for development or simple low-volume environments where the potential loss of non-persistent messages is not a concern. However, the typical production application will need a more robust option, not only to mitigate the risk of message loss but also to avoid potential out-of-memory errors. Therefore, we also provide MessageStore implementations for a variety of data stores. Below is a complete list of supported implementations:
Important | |
---|---|
Be aware of some limitations when using persistent implementations of the MessageStore. The Message data (payload and headers) is serialized and deserialized using different serialization strategies, depending on the implementation of the MessageStore. Special attention must be paid to headers that represent certain types of data. For example, if one of the headers contains an instance of some Spring Bean, upon deserialization you may end up with a different instance of that bean, which directly affects some of the implicit headers created by the framework (e.g., REPLY_CHANNEL or ERROR_CHANNEL). Currently they are not serializable, but even if they were, the deserialized channel would not represent the expected instance.
Beginning with Spring Integration version 3.0, this issue can be resolved with a header enricher, configured to replace these headers with a name after registering the channel with the HeaderChannelRegistry. The same applies when configuring a message flow like this: gateway -> queue-channel (backed by a persistent Message Store) -> service-activator. The gateway creates a temporary reply channel, which will be lost by the time the service-activator's poller reads from the queue. Again, you can use the header enricher to replace the headers with a String representation. For more information, refer to Section 6.2.2, “Header Enricher”. |
Many external systems, services, or resources aren't transactional (Twitter, RSS, file system, etc.) and offer no way to mark data as read. In other cases, an integration solution simply needs to implement the Enterprise Integration Pattern Idempotent Receiver. To achieve this goal and store some previous state of the Endpoint before the next interaction with an external system, or before dealing with the next Message, Spring Integration provides the Metadata Store component, an implementation of the org.springframework.integration.metadata.MetadataStore interface with a general key-value contract.
The Metadata Store is designed to store various types of generic meta-data
(e.g., published date of the last feed entry that has been processed) to help components such as the Feed adapter deal with duplicates.
If a component is not directly provided with a reference to a MetadataStore, the algorithm for locating a metadata store is as follows: first, look for a bean with id metadataStore in the ApplicationContext. If one is found, it will be used; otherwise a new instance of SimpleMetadataStore is created. This is an in-memory implementation that will only persist metadata within the lifecycle of the currently running Application Context, which means that upon restart you may end up with duplicate entries.
If you need to persist metadata between Application Context restarts, two persistent MetadataStores are provided by the framework:
The PropertiesPersistingMetadataStore is backed by a properties file and a PropertiesPersister.
<bean id="metadataStore" class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>
Alternatively, you can provide your own implementation of the MetadataStore interface (e.g. JdbcMetadataStore) and configure it as a bean in the Application Context.
The Metadata Store is useful for implementing the EIP Idempotent Receiver pattern, when there is a need to filter out an incoming Message that has already been processed and either discard it or perform some other logic on discard. The following configuration is an example of how to do this:
<int:filter input-channel="serviceChannel" output-channel="idempotentServiceChannel" discard-channel="discardChannel" expression="@metadataStore.get(headers.businessKey) == null"/> <int:publish-subscribe-channel id="idempotentServiceChannel"/> <int:outbound-channel-adapter channel="idempotentServiceChannel" expression="@metadataStore.put(headers.businessKey, '')"/> <int:service-activator input-channel="idempotentServiceChannel" ref="service"/>
The value of the idempotent entry may be some expiration date, after which that entry should be removed from the Metadata Store by some scheduled reaper.
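The idea behind the idempotent filter and the reaper can be sketched with a plain in-memory map. Everything in this example is hypothetical (the class name, the accept and reap methods, and the use of a timestamp as the stored value); it does not use the MetadataStore interface and is only meant to show the key-value logic. Unlike the XML configuration above, which checks and stores the key in two separate steps, the sketch uses putIfAbsent so that the check and the store happen atomically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory illustration of idempotent filtering with expiry.
class IdempotentFilterSketch {

    // business key -> time (in milliseconds) at which the key was first seen
    private final ConcurrentMap<String, Long> store = new ConcurrentHashMap<>();

    /** Returns true the first time a key is seen, false for duplicates. */
    boolean accept(String businessKey, long now) {
        return store.putIfAbsent(businessKey, now) == null;
    }

    /** Removes entries older than maxAgeMillis and returns how many were removed. */
    int reap(long now, long maxAgeMillis) {
        int removed = 0;
        for (Map.Entry<String, Long> entry : store.entrySet()) {
            if (now - entry.getValue() > maxAgeMillis
                    && store.remove(entry.getKey(), entry.getValue())) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        IdempotentFilterSketch filter = new IdempotentFilterSketch();
        System.out.println(filter.accept("order-1", 1000L)); // true: first delivery
        System.out.println(filter.accept("order-1", 2000L)); // false: duplicate, would be discarded
        System.out.println(filter.reap(5000L, 3500L));       // 1: the entry has expired
        System.out.println(filter.accept("order-1", 6000L)); // true: accepted again after reaping
    }
}
```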
As described in Enterprise Integration Patterns (EIP), the idea behind the Control Bus is that the same messaging system can be used for monitoring and managing the components within the framework as is used for "application-level" messaging. In Spring Integration we build upon the adapters described above so that it's possible to send Messages as a means of invoking exposed operations.
<int:control-bus input-channel="operationChannel"/>
The Control Bus has an input channel that can be accessed for invoking operations on the beans in the application context. It also has all the common properties of a service activating endpoint, e.g. you can specify an output channel if the result of the operation has a return value that you want to send on to a downstream channel.
The Control Bus executes messages on the input channel as Spring Expression Language expressions. It takes a message, compiles the body to an expression, adds some context, and then executes it. The default context supports any method that has been annotated with @ManagedAttribute or @ManagedOperation. It also supports the methods on Spring's Lifecycle interface, and it supports methods that are used to configure several of Spring's TaskExecutor and TaskScheduler implementations. The simplest way to ensure that your own methods are available to the Control Bus is to use the @ManagedAttribute and/or @ManagedOperation annotations. Since those are also used for exposing methods to a JMX MBean registry, it's a convenient by-product (often the same types of operations you want to expose to the Control Bus would be reasonable for exposing via JMX). Resolution of any particular instance within the application context is achieved in the typical SpEL syntax. Simply provide the bean name with the SpEL prefix for beans (@). For example, to execute a method on a Spring Bean, a client could send a message to the operation channel as follows:
Message<String> operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation);
The root of the context for the expression is the Message itself, so you also have access to the 'payload' and 'headers' as variables within your expression. This is consistent with all the other expression support in Spring Integration endpoints.
As described in Section 8.1.7, “MBean Exporter”, the MBean exporter provides a JMX operation stopActiveComponents, which is used to stop the application in an orderly manner. The operation has two parameters, a boolean and a long. The boolean indicates whether attempts will be made to stop (interrupt) active threads; in most cases this will be set to false for orderly shutdown. The long parameter indicates how long (in milliseconds) the operation will wait to allow in-flight messages to complete. The operation works as follows:
The first step calls beforeShutdown() on all beans that implement OrderlyShutdownCapable. This allows such components to prepare for shutdown. Examples of components that implement this interface, and what they do with this call, include: JMS and AMQP message-driven adapters stop their listener containers; TCP server connection factories stop accepting new connections (while keeping existing connections open); TCP inbound endpoints drop (log) any new messages received; HTTP inbound endpoints return 503 - Service Unavailable for any new requests.
The second step stops any active channels, such as JMS- or AMQP-backed channels.
The third step stops all TaskSchedulers, preventing any new scheduled operations (polling etc).
The fourth step stops all TaskExecutors, preventing any new tasks from running.
Note | |
---|---|
If the shutdown is running from a Spring-managed TaskExecutor, shutting down that executor would cause all of the timeout to be consumed by this step (because the thread won't terminate). For this reason, either use a dedicated executor (via the shutdownExecutor property on the MBean exporter), or do not use a Spring-managed executor to invoke this operation. |
The fifth step stops all MessageSources.
The sixth step waits for any remaining time left, as defined by the value of the long parameter passed in to the operation. This is intended to allow any in-flight messages to complete their journeys. It is therefore important to select an appropriate timeout when invoking this operation.
The seventh step calls afterShutdown() on all OrderlyShutdownCapable components. This allows such components to perform final shutdown tasks (closing all open sockets, for example).
Note | |
---|---|
If no time is left when we get to step 6, it probably means some thread is hung; in which case, the operation attempts a forced shutdown on all schedulers and executors before exiting. |
As discussed in the section called “Orderly Shutdown Managed Operation”, this operation can be invoked using JMX. If you wish to programmatically invoke the method, you will need to inject, or otherwise get a reference to, the IntegrationMBeanExporter. If no id attribute is provided on the <int-jmx:mbean-export/> definition, the bean will have a generated name. This name contains a random component to avoid ObjectName collisions if multiple Spring Integration contexts exist in the same JVM (MBeanServer).
For this reason, if you wish to invoke the method programmatically, it is recommended that you provide the exporter with an id attribute so it can easily be accessed in the application context.
Finally, the operation can be invoked using the <control-bus>; see the monitoring Spring Integration sample application for details.