Spring Integration provides Channel Adapters for receiving and publishing JMX Notifications. There is also an inbound Channel Adapter for polling JMX MBean attribute values, and an outbound Channel Adapter for invoking JMX MBean operations.
The Notification-listening Channel Adapter requires a JMX ObjectName for the MBean that publishes Notifications to which this listener should be registered. A very simple configuration might look like this:
<int-jmx:notification-listening-channel-adapter id="adapter" channel="channel" object-name="example.domain:name=publisher"/>
Tip | |
---|---|
The notification-listening-channel-adapter registers with an MBeanServer at startup, and the default bean name is "mbeanServer" which happens to be the same bean name generated when using Spring's <context:mbean-server/> element. If you need to use a different name be sure to include the "mbean-server" attribute. |
The adapter can also accept a reference to a NotificationFilter and a "handback" Object to provide some context that is passed back with each Notification. Both of those attributes are optional. Extending the above example to include those attributes as well as an explicit MBeanServer bean name would produce the following:
<int-jmx:notification-listening-channel-adapter id="adapter" channel="channel" mbean-server="someServer" object-name="example.domain:name=somePublisher" notification-filter="notificationFilter" handback="myHandback"/>
Since the notification-listening adapter is registered with the MBeanServer directly, it is event-driven and does not require any poller configuration.
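The roles of the NotificationFilter and the "handback" object can be illustrated with plain JMX, outside Spring Integration. The following is only a sketch under stated assumptions: the class name `NotificationListenerSketch` is hypothetical, the publisher is the JDK's own MBeanServer delegate (which emits a Notification whenever an MBean is registered), and the JDK `Timer` MBean is registered merely to trigger one such Notification. It is not the adapter's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.MBeanServer;
import javax.management.MBeanServerDelegate;
import javax.management.MBeanServerFactory;
import javax.management.MBeanServerNotification;
import javax.management.NotificationFilterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import javax.management.timer.Timer;

// Hypothetical illustration class; not part of Spring Integration.
final class NotificationListenerSketch {

    /** Registers a filtered listener with a handback and returns what it received. */
    static List<String> demo() {
        try {
            MBeanServer server = MBeanServerFactory.newMBeanServer();
            List<String> received = new ArrayList<>();

            // The handback object is passed back, unchanged, with every Notification.
            NotificationListener listener = (notification, handback) ->
                    received.add(handback + ":" + notification.getType());

            // Only notification types enabled on the filter reach the listener.
            NotificationFilterSupport filter = new NotificationFilterSupport();
            filter.enableType(MBeanServerNotification.REGISTRATION_NOTIFICATION);

            // The MBeanServer delegate publishes a Notification for each MBean registration.
            server.addNotificationListener(
                    MBeanServerDelegate.DELEGATE_NAME, listener, filter, "myHandback");

            // Registering any MBean (here the JDK Timer MBean) triggers one Notification.
            server.registerMBean(new Timer(), new ObjectName("example.domain:name=publisher"));
            return received;
        } catch (Exception e) {
            throw new IllegalStateException("JMX sketch failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the listener is invoked by the MBeanServer itself, nothing polls for events, which is the same reason the adapter needs no poller.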
The Notification-publishing Channel Adapter is relatively simple. It only requires a JMX ObjectName in its configuration as shown below.
<context:mbean-export/>
<int-jmx:notification-publishing-channel-adapter id="adapter" channel="channel" object-name="example.domain:name=publisher"/>
The adapter also requires that an MBeanExporter be present in the context. That is why the <context:mbean-export/> element is shown above as well.
When Messages are sent to the channel for this adapter, the Notification is created from the Message content. If the payload is a String it will be passed as the "message" text for the Notification. Any other payload type will be passed as the "userData" of the Notification.
JMX Notifications also have a "type", and it should be a dot-delimited String. There are two ways to provide the type. Precedence will always be given to a Message header value associated with the JmxHeaders.NOTIFICATION_TYPE key. On the other hand, you can rely on a fallback "default-notification-type" attribute provided in the configuration.
<context:mbean-export/>
<int-jmx:notification-publishing-channel-adapter id="adapter" channel="channel" object-name="example.domain:name=publisher" default-notification-type="some.default.type"/>
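The mapping rules described above (String payload becomes the "message" text, any other payload becomes "userData", and a type from the Message header wins over the configured default) can be sketched with the standard `javax.management.Notification` class. This is a minimal illustration, not the adapter's actual code: the class name, the method `toNotification`, and the `headerType` and `defaultType` parameters are hypothetical stand-ins for the header value and the "default-notification-type" attribute.

```java
import javax.management.Notification;

// Hypothetical illustration class; not part of Spring Integration.
final class NotificationMappingSketch {

    static Notification toNotification(Object payload, String headerType, String defaultType,
                                       Object source, long sequenceNumber) {
        // A type supplied with the Message takes precedence over the configured default.
        String type = (headerType != null) ? headerType : defaultType;

        if (payload instanceof String) {
            // String payloads become the "message" text of the Notification.
            return new Notification(type, source, sequenceNumber, (String) payload);
        }
        // Any other payload type travels as the "userData" of the Notification.
        Notification notification = new Notification(type, source, sequenceNumber);
        notification.setUserData(payload);
        return notification;
    }

    public static void main(String[] args) {
        Notification n = toNotification("hello", null, "some.default.type", "publisher", 1L);
        System.out.println(n.getType() + " / " + n.getMessage());
    }
}
```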
The attribute polling adapter is useful when you have a requirement to periodically check on some value that is available through an MBean as a managed attribute. The poller can be configured in the same way as any other polling adapter in Spring Integration (or it's possible to rely on the default poller). The "object-name" and "attribute-name" are required. An MBeanServer reference is also required, but it will automatically check for a bean named "mbeanServer" by default just like the notification-listening-channel-adapter described above.
<int-jmx:attribute-polling-channel-adapter id="adapter" channel="channel" object-name="example.domain:name=someService" attribute-name="InvocationCount"> <int:poller max-messages-per-poll="1" fixed-rate="5000"/> </int-jmx:attribute-polling-channel-adapter>
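Conceptually, each poll amounts to a single attribute read against the MBeanServer. The sketch below shows that read with plain JMX. It assumes the JVM's platform MBeanServer and the standard `java.lang:type=Runtime` MBean so that it runs without any application MBeans; the class and method names are hypothetical and the adapter's real implementation may differ.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical illustration class; not part of Spring Integration.
final class AttributePollSketch {

    /** Reads one managed attribute, which is what a single poll conceptually does. */
    static Object pollOnce(MBeanServer server, String objectName, String attributeName) {
        try {
            return server.getAttribute(new ObjectName(objectName), attributeName);
        } catch (Exception e) {
            throw new IllegalStateException(
                    "Could not read " + attributeName + " from " + objectName, e);
        }
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // "Uptime" is a standard attribute of the JVM's Runtime MBean.
        System.out.println(pollOnce(server, "java.lang:type=Runtime", "Uptime"));
    }
}
```

In the adapter, the value returned by such a read would become the payload of the Message sent to the configured channel.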
The operation-invoking-channel-adapter enables Message-driven invocation of any managed operation exposed by an MBean. Each invocation requires the operation name to be invoked and the ObjectName of the target MBean. Both of these must be explicitly provided via adapter configuration:
<int-jmx:operation-invoking-channel-adapter id="adapter" object-name="example.domain:name=TestBean" operation-name="ping"/>
Then the adapter only needs to be able to discover the "mbeanServer" bean. If a different bean name is required, then provide the "mbean-server" attribute with a reference.
The payload of the Message will be mapped to the parameters of the operation, if any. A Map-typed payload with String keys is treated as name/value pairs whereas a List or array would be passed as a simple argument list (with no explicit parameter names). If the operation requires a single parameter value, then the payload can represent that single value, and if the operation requires no parameters, then the payload would be ignored.
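The payload-to-parameter rules in the preceding paragraph can be written down as a small function. This is a sketch of the rules as stated, not the adapter's code: the class `OperationArgumentsSketch`, the method `toArguments`, and the `parameterNames` list (standing in for the operation's declared parameter names) are all hypothetical.

```java
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; not part of Spring Integration.
final class OperationArgumentsSketch {

    static Object[] toArguments(Object payload, List<String> parameterNames) {
        if (parameterNames.isEmpty()) {
            // The operation takes no parameters, so the payload is ignored.
            return new Object[0];
        }
        if (payload instanceof Map) {
            // Name/value pairs: look up each declared parameter by its name.
            Map<?, ?> named = (Map<?, ?>) payload;
            Object[] args = new Object[parameterNames.size()];
            for (int i = 0; i < args.length; i++) {
                args[i] = named.get(parameterNames.get(i));
            }
            return args;
        }
        if (payload instanceof List) {
            // A List is a positional argument list.
            return ((List<?>) payload).toArray();
        }
        if (payload instanceof Object[]) {
            // An array is also a positional argument list.
            return ((Object[]) payload).clone();
        }
        // Anything else is treated as the single parameter value.
        return new Object[] { payload };
    }

    public static void main(String[] args) {
        Object[] result = toArguments(Map.of("b", 2, "a", 1), List.of("a", "b"));
        System.out.println(java.util.Arrays.toString(result));
    }
}
```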
If you want to expose a channel for a single common operation to be invoked by Messages that need not contain headers, then that option works well.
In addition to the operation-invoking-channel-adapter, Spring Integration provides an operation-invoking-outbound-gateway, which can be used for non-void operations when the return value is required. The return value is sent as the Message payload to the 'reply-channel' specified by this Gateway.
<int-jmx:operation-invoking-outbound-gateway request-channel="requestChannel" reply-channel="replyChannel" object-name="org.springframework.integration.jmx.config:type=TestBean,name=testBeanGateway" operation-name="testWithReturn"/>
Another way of providing the 'reply-channel' is by setting the MessageHeaders.REPLY_CHANNEL Message Header.
Spring Integration components themselves may be exposed as MBeans when the IntegrationMBeanExporter is configured. To create an instance of the IntegrationMBeanExporter, define a bean and provide a reference to an MBeanServer and a domain name (if desired). The domain can be left out, in which case the default domain is "org.springframework.integration".
<int-jmx:mbean-exporter default-domain="my.company.domain" server="mbeanServer"/>

<bean id="mbeanServer" class="org.springframework.jmx.support.MBeanServerFactoryBean">
    <property name="locateExistingServerIfPossible" value="true"/>
</bean>
Once the exporter is defined, start up your application with:
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=6969 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false
Then start JConsole (free with the JDK), and connect to the local process on localhost:6969 to get a look at the management endpoints exposed. (The port and client are just examples to get you started quickly; there are other JMX clients available and some offer more sophisticated features than JConsole.)
The MBean exporter is orthogonal to the one provided in Spring core: it registers message channels and message handlers, but not itself. You can expose the exporter itself, and certain other components in Spring Integration, using the standard <context:mbean-export/> tag. The exporter has a couple of useful metrics attached to it, for instance a count of the number of active handlers and the number of queued messages (these would both be important if you wanted to shut down the context without losing any messages).
All the MessageChannel, MessageHandler and MessageSource instances in the application are wrapped by the MBean exporter to provide management and monitoring features. For example, MessageChannel send operations are monitored to produce the statistics described later in this section. The generated JMX object names for each component type are listed in the table below.
Table 8.1.
Component Type | ObjectName |
---|---|
MessageChannel | org.springframework.integration:type=MessageChannel,name=<channelName> |
MessageSource | org.springframework.integration:type=MessageSource,name=<channelName>,bean=<source> |
MessageHandler | org.springframework.integration:type=MessageHandler,name=<channelName>,bean=<source> |
The "bean" attribute in the object names for sources and handlers takes one of the values in the table below.
Table 8.2.
Bean Value | Description |
---|---|
endpoint | The bean name of the enclosing endpoint (e.g. <service-activator>) if there is one |
anonymous | An indication that the enclosing endpoint didn't have a user-specified bean name, so the JMX name is the input channel name |
internal | For well-known Spring Integration default components |
handler | None of the above: fallback to the toString() of the object being monitored (handler or source) |
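The object name layout from the tables above can be explored with the standard `javax.management.ObjectName` class, for example to build a name for a specific channel or a pattern that selects all channels. The sketch below is an assumption-laden illustration: the class and method names are hypothetical, the default domain "org.springframework.integration" is assumed, and channel names containing characters that are special in object names (which would need quoting) are not handled.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical illustration class; not part of Spring Integration.
final class IntegrationObjectNameSketch {

    // Assumes the default domain; a configured default-domain would replace it.
    static final String DOMAIN = "org.springframework.integration";

    static ObjectName channel(String channelName) {
        return parse(DOMAIN + ":type=MessageChannel,name=" + channelName);
    }

    static ObjectName source(String channelName, String bean) {
        return parse(DOMAIN + ":type=MessageSource,name=" + channelName + ",bean=" + bean);
    }

    /** A pattern that matches every exported MessageChannel, whatever its other keys. */
    static ObjectName allChannels() {
        return parse(DOMAIN + ":type=MessageChannel,*");
    }

    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid object name: " + name, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(allChannels().apply(channel("channel")));
    }
}
```

A pattern such as the one returned by `allChannels()` could also be passed to `MBeanServer.queryNames` to list the exported channels of a running application.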
Message channels report metrics according to their concrete type. If you are looking at a DirectChannel you will see statistics for the send operation. If it is a QueueChannel you will also see statistics for the receive operation. In both cases there are some metrics that are simple counters (message count and error count), and some that are estimates of averages of interesting quantities. The algorithms used to calculate these estimates are described briefly in the table below:
Table 8.3.
Metric Type | Example | Algorithm |
---|---|---|
Count | Send Count | Simple incrementer. Increase by one when an event occurs. |
Duration | Send Duration (method execution time in milliseconds) | Exponential Moving Average with decay factor 10. Average of the method execution time over roughly the last 10 measurements. |
Rate | Send Rate (number of operations per second) | Inverse of Exponential Moving Average of the interval between events with decay in time (lapsing over 60 seconds) and per measurement (last 10 events). |
Ratio | Send Error Ratio (ratio of errors to total sends) | Estimate the success ratio as the Exponential Moving Average of the series composed of values 1 for success and 0 for failure (decaying as per the rate measurement over time and events). Error ratio is 1 - success ratio. |
A feature of the time-based average estimates is that they decay with time if no new measurements arrive. To help interpret the behaviour over time, the time (in seconds) since the last measurement is also exposed as a metric.
There are two basic exponential models: decay per measurement (appropriate for duration and anything where the number of measurements is part of the metric), and decay per time unit (more suitable for rate measurements where the time in between measurements is part of the metric). Both models depend on the fact that S(n) = sum(i=0,i=n) w(i) x(n-i), where i counts back from the most recent measurement x(n), has a special form when w(i) = r^i, with r=constant: S(n) = x(n) + r S(n-1) (so you only have to store S(n-1), not the whole series x(i), to generate a new metric estimate from the last measurement). The algorithms used in the duration metrics use r=exp(-1/M) with M=10. The net effect is that the estimate S(n) is more heavily weighted to recent measurements and is composed roughly of the last M measurements. So M is the "window" or lapse rate of the estimate. In the case of the vanilla moving average, i is a counter over the number of measurements. In the case of the rate we interpret i as the elapsed time, or a combination of elapsed time and a counter (so the metric estimate contains contributions roughly from the last M measurements and the last T seconds).
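The per-measurement recursion S(n) = x(n) + r S(n-1) with r=exp(-1/M) can be sketched in a few lines. The class below is a hypothetical illustration, not the framework's implementation. In particular, dividing by a second accumulator that applies the same recursion to a series of ones (to turn the weighted sum into an average) is an assumption made here so the result is on the same scale as the measurements.

```java
// Hypothetical illustration class; not part of Spring Integration.
final class ExponentialMovingAverageSketch {

    private final double decay;   // r = exp(-1/M)
    private double weightedSum;   // S(n) for the measurements x(i)
    private double weightTotal;   // the same recursion applied to a series of ones

    ExponentialMovingAverageSketch(int window) {
        this.decay = Math.exp(-1.0 / window);
    }

    /** Folds a new measurement in; only the previous sums are needed, not the series. */
    void append(double value) {
        weightedSum = value + decay * weightedSum;
        weightTotal = 1.0 + decay * weightTotal;
    }

    /** Weighted average, dominated by roughly the last M measurements. */
    double mean() {
        return weightTotal > 0.0 ? weightedSum / weightTotal : 0.0;
    }

    public static void main(String[] args) {
        ExponentialMovingAverageSketch duration = new ExponentialMovingAverageSketch(10);
        for (double millis : new double[] { 12.0, 15.0, 11.0, 40.0 }) {
            duration.append(millis);
        }
        System.out.println(duration.mean());
    }
}
```

A time-decayed variant for rates would, under the same assumptions, replace the fixed factor r with one computed from the elapsed time since the previous measurement.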
The key benefit of a messaging architecture is loose coupling where participating components do not maintain any awareness about one another. This fact alone makes your application extremely flexible, allowing you to change components without affecting the rest of the flow, change messaging routes, message consuming styles (polling vs event driven), and so on. However, this unassuming style of architecture could prove to be difficult when things go wrong. When debugging, you would probably like to get as much information about the message as you can (its origin, channels it has traversed, etc.).
Message History is one of those patterns that helps by giving you an option to maintain some level of awareness of a message path, either for debugging purposes or to maintain an audit trail. Spring Integration provides a simple way to configure your message flows to maintain the Message History by adding a header to the Message and updating that header every time a message passes through a tracked component.
To enable Message History, all you need to do is define the message-history element in your configuration.
<int:message-history/>
Now every named component (component that has an 'id' defined) will be tracked.
The framework will set the 'history' header in your Message. Its value is very simple: a List<Properties>.
<int:gateway id="sampleGateway"
    service-interface="org.springframework.integration.history.sample.SampleGateway"
    default-request-channel="bridgeInChannel"/>

<int:chain id="sampleChain" input-channel="chainChannel" output-channel="filterChannel">
    <int:header-enricher>
        <int:header name="baz" value="baz"/>
    </int:header-enricher>
</int:chain>
The above configuration will produce a very simple Message History structure:
[{name=sampleGateway, type=gateway, timestamp=1283281668091}, {name=sampleChain, type=chain, timestamp=1283281668094}]
To get access to Message History, all you need to do is access the MessageHistory header. For example:
// The history header holds one Properties entry per tracked component, in traversal order
Iterator<Properties> historyIterator =
    message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();

// First entry: the gateway where the Message originated
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));

// Second entry: the chain the Message passed through next
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));
You might not want to track all of the components. To limit the history to certain components based on their names, all you need to do is provide the tracked-components attribute and specify a comma-delimited list of component names and/or patterns that match the components you want to track.
<int:message-history tracked-components="*Gateway, sample*, foo"/>
In the above example, Message History will only be maintained for all of the components that end with 'Gateway', start with 'sample', or match the name 'foo' exactly.
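The matching behaviour just described can be sketched as follows. This is a hypothetical illustration of the described semantics (comma-delimited entries, '*' as a wildcard, otherwise an exact match), written with `java.util.regex` only. The class and method names are invented for the example and the framework's actual matcher may behave differently in edge cases.

```java
import java.util.regex.Pattern;

// Hypothetical illustration class; not part of Spring Integration.
final class TrackedComponentsSketch {

    /** Returns true if the component name matches any entry of the comma-delimited list. */
    static boolean isTracked(String componentName, String trackedComponents) {
        for (String pattern : trackedComponents.split(",")) {
            if (matches(pattern.trim(), componentName)) {
                return true;
            }
        }
        return false;
    }

    // Treats '*' as "any sequence of characters" and everything else literally.
    private static boolean matches(String pattern, String name) {
        String[] literals = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(literals[i]));
        }
        return Pattern.matches(regex.toString(), name);
    }

    public static void main(String[] args) {
        String tracked = "*Gateway, sample*, foo";
        System.out.println(isTracked("sampleChain", tracked));
        System.out.println(isTracked("bridgeInChannel", tracked));
    }
}
```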
Note | |
---|---|
Remember that by definition the Message History header is immutable (you can't re-write history, although some try). Therefore, when writing Message History values, the components are either creating brand new Messages (when the component is an origin), or they are copying the history from a request Message, modifying it and setting the new list on a reply Message. In either case, the values can be appended even if the Message itself is crossing thread boundaries. That means that the history values can greatly simplify debugging in an asynchronous message flow. |
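The copy-and-append behaviour described in the note can be pictured with plain collections. The sketch below is not the framework's MessageHistory class. It is a hypothetical helper that models the history as a List<Properties>, copies the previous entries, appends a new one, and returns a read-only list so that the earlier history is never modified in place.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration class; not part of Spring Integration.
final class MessageHistorySketch {

    /** Returns a new read-only history: the previous entries plus one for this component. */
    static List<Properties> append(List<Properties> previous, String name, String type) {
        List<Properties> copy = new ArrayList<>(previous);

        Properties entry = new Properties();
        entry.setProperty("name", name);
        entry.setProperty("type", type);
        entry.setProperty("timestamp", Long.toString(System.currentTimeMillis()));
        copy.add(entry);

        // The previous list is left untouched; callers get a fresh immutable view.
        return Collections.unmodifiableList(copy);
    }

    public static void main(String[] args) {
        List<Properties> atGateway = append(Collections.emptyList(), "sampleGateway", "gateway");
        List<Properties> atChain = append(atGateway, "sampleChain", "chain");
        System.out.println(atChain);
    }
}
```

Because each step produces a new list, a reply Message built on another thread can carry the extended history without touching the request Message.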
Enterprise Integration Patterns (EIP) identifies several patterns that have the capability to buffer messages. For example, an Aggregator buffers messages until they can be released and a QueueChannel buffers messages until consumers explicitly receive those messages from that channel. Because of the failures that can occur at any point within your message flow, EIP components that buffer messages also introduce a point where messages could be lost.
To mitigate the risk of losing Messages, EIP defines the Message Store pattern which allows EIP components to store Messages typically in some type of persistent store (e.g. RDBMS).
Spring Integration provides support for the Message Store pattern by a) defining an org.springframework.integration.store.MessageStore strategy interface, b) providing several implementations of this interface, and c) exposing a message-store attribute on all components that have the capability to buffer messages so that you can inject any instance that implements the MessageStore interface.
Details on how to configure a specific Message Store implementation and/or how to inject a MessageStore implementation into a specific buffering component are described throughout the manual (see the specific component, such as QueueChannel, Aggregator, Resequencer etc.), but here are a couple of samples to give you an idea:
QueueChannel
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
</int:channel>
Aggregator
<int:aggregator . . . message-store="refToMessageStore"/>
By default Messages are stored in-memory using org.springframework.integration.store.SimpleMessageStore, an implementation of MessageStore. That might be fine for development or simple low-volume environments where the potential loss of non-persistent messages is not a concern. However, the typical production application will need a more robust option, not only to mitigate the risk of message loss but also to avoid potential out-of-memory errors. Therefore, we also provide MessageStore implementations for a variety of data-stores. Below is a complete list of supported implementations:
Important | |
---|---|
Be aware of some limitations when using persistent implementations of the MessageStore. The Message data (payload and headers) is serialized and deserialized using different serialization strategies depending on the implementation of the MessageStore. Special attention must be paid to the headers that represent certain types of data. For example, if one of the headers contains an instance of some Spring Bean, upon deserialization you may end up with a different instance of that bean, which directly affects some of the implicit headers created by the framework (e.g., REPLY_CHANNEL or ERROR_CHANNEL). Currently these channels are not serializable, but even if they were, the deserialized channel would not represent the expected instance. As a workaround, we suggest removing bean-ref headers before such Messages are persisted. Also avoid configuring a message flow like this: gateway -> queue-channel (backed by a persistent Message Store) -> service-activator. That gateway creates a Temporary Reply Channel in the background, and it will be lost by the time the service-activator's poller reads from the queue, because the Message will have been serialized on the sending side and deserialized by another thread. Nevertheless, we are constantly thinking about potential improvements to the framework, such as a way to provide some robust default serialization strategy for messages in these cases. |
As described in Enterprise Integration Patterns (EIP), the idea behind the Control Bus is that the same messaging system can be used for monitoring and managing the components within the framework as is used for "application-level" messaging. In Spring Integration we build upon the adapters described above so that it's possible to send Messages as a means of invoking exposed operations.
<int:control-bus input-channel="operationChannel"/>
The Control Bus has an input channel that can be accessed for invoking operations on the beans in the application context. It also has all the common properties of a service activating endpoint, e.g. you can specify an output channel if the result of the operation has a return value that you want to send on to a downstream channel.
The Control Bus executes messages on the input channel as Spring Expression Language expressions. It takes a message, compiles the body to an expression, adds some context, and then executes it. The default context supports any method that has been annotated with @ManagedAttribute or @ManagedOperation. It also supports the methods on Spring's Lifecycle interface, and it supports methods that are used to configure several of Spring's TaskExecutor and TaskScheduler implementations. The simplest way to ensure that your own methods are available to the Control Bus is to use the @ManagedAttribute and/or @ManagedOperation annotations. Since those are also used for exposing methods to a JMX MBean registry, it's a convenient by-product (often the same types of operations you want to expose to the Control Bus would be reasonable for exposing via JMX). Resolution of any particular instance within the application context is achieved in the typical SpEL syntax. Simply provide the bean name with the SpEL prefix for beans (@). For example, to execute a method on a Spring Bean, a client could send a message to the operation channel as follows:
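// The payload is a SpEL expression; '@' resolves a bean by name in the application context
Message<String> operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation);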
The root of the context for the expression is the Message itself, so you also have access to the 'payload' and 'headers' as variables within your expression. This is consistent with all the other expression support in Spring Integration endpoints.