The AOP Message Publishing feature allows you to construct and send a message as a by-product of a method invocation. For example, imagine you have a component and every time the state of this component changes you would like to be notified via a Message. The easiest way to send such notifications would be to send a message to a dedicated channel, but how would you connect the method invocation that changes the state of the object to a message sending process, and how should the notification Message be structured? The AOP Message Publishing feature handles these responsibilities with a configuration-driven approach.
Spring Integration provides two approaches: XML and Annotation-driven.
The annotation-driven approach allows you to annotate any method with the @Publisher
annotation, specifying a channel attribute.
The Message will be constructed from the return value of the method invocation and sent to a channel specified by the channel attribute.
To further manage message structure, you can also use a combination of both @Payload
and @Header
annotations.
Internally this message publishing feature of Spring Integration uses both Spring AOP by defining PublisherAnnotationAdvisor
and Spring 3.0’s Expression Language (SpEL) support, giving you considerable flexibility and control over the structure of the Message it will publish.
The PublisherAnnotationAdvisor
defines and binds the following variables:
Let’s look at a couple of examples:
@Publisher public String defaultPayload(String fname, String lname) { return fname + " " + lname; }
In the above example the Message will be constructed with the following structure:
@Publisher(channel="testChannel") public String defaultPayload(String fname, @Header("last") String lname) { return fname + " " + lname; }
In this example everything is the same as above, except that we are not using a default publishing channel.
Instead we are specifying the publishing channel via the channel attribute of the @Publisher
annotation.
We are also adding a @Header
annotation which results in the Message header named last having the same value as the lname method parameter.
That header will be added to the newly constructed Message.
@Publisher(channel="testChannel") @Payload public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) { return fname + " " + lname; }
The above example is almost identical to the previous one.
The only difference here is that we are using a @Payload
annotation on the method, thus explicitly specifying that the return value of the method should be used as the payload of the Message.
@Publisher(channel="testChannel") @Payload("#return + #args.lname") public String setName(String fname, String lname, @Header("x") int num) { return fname + " " + lname; }
Here we are expanding on the previous configuration by using the Spring Expression Language in the @Payload
annotation to further instruct the framework how the message should be constructed.
In this particular case the message will be a concatenation of the return value of the method invocation and the lname input argument.
The Message header named x will have its value determined by the num input argument.
That header will be added to the newly constructed Message.
@Publisher(channel="testChannel") public String argumentAsPayload(@Payload String fname, @Header String lname) { return fname + " " + lname; }
In the above example you see another usage of the @Payload
annotation.
Here we are annotating a method argument which will become the payload of the newly constructed message.
As with most other annotation-driven features in Spring, you will need to register a post-processor (PublisherAnnotationBeanPostProcessor
).
<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>
You can instead use namespace support for a more concise configuration:
<int:annotation-config default-publisher-channel="defaultChannel"/>
Similar to other Spring annotations (@Component, @Scheduled, etc.), @Publisher
can also be used as a meta-annotation.
That means you can define your own annotations that will be treated in the same way as the @Publisher
itself.
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Publisher(channel="auditChannel") public @interface Audit { }
Here we defined the @Audit
annotation which itself is annotated with @Publisher
.
Also note that you can define a channel
attribute on the meta-annotation thus encapsulating the behavior of where messages will be sent inside of this annotation.
Now you can annotate any method:
@Audit public String test() { return "foo"; }
In the above example every invocation of the test()
method will result in a Message with a payload created from its return value.
Each Message will be sent to the channel named auditChannel.
One of the benefits of this technique is that you can avoid the duplication of the same channel name across multiple annotations.
You also can provide a level of indirection between your own, potentially domain-specific annotations and those provided by the framework.
You can also annotate the class which would mean that the properties of this annotation will be applied on every public method of that class.
@Audit static class BankingOperationsImpl implements BankingOperations { public String debit(String amount) { . . . } public String credit(String amount) { . . . } }
The XML-based approach allows you to configure the same AOP-based Message Publishing functionality with simple namespace-based configuration of a MessagePublishingInterceptor
.
It certainly has some benefits over the annotation-driven approach since it allows you to use AOP pointcut expressions, thus possibly intercepting multiple methods at once or intercepting and publishing methods to which you don’t have the source code.
To configure Message Publishing via XML, you only need to do the following two things:
MessagePublishingInterceptor
via the <publishing-interceptor>
XML element.
MessagePublishingInterceptor
to managed objects.
<aop:config> <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" /> </aop:config> <publishing-interceptor id="interceptor" default-channel="defaultChannel"> <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel"> <header name="foo" value="bar"/> </method> <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel"> <header name="foo" expression="'bar'.toUpperCase()"/> </method> <method pattern="echoDef*" payload="#return"/> </publishing-interceptor>
As you can see the <publishing-interceptor>
configuration looks rather similar to the Annotation-based approach, and it also utilizes the power of the Spring 3.0 Expression Language.
In the above example the execution of the echo
method of a testBean
will render a Message with the following structure:
value
is the value returned by an executed method.
echoChannel
.
The second method is very similar to the first. Here every method that begins with repl will render a Message with the following structure:
'bar'.toUpperCase()
.
echoChannel
.
The second method, mapping the execution of any method that begins with echoDef
of testBean
, will produce a Message with the following structure.
channel
attribute is not provided explicitly, the Message will be sent to the defaultChannel
defined by the publisher.
For simple mapping rules you can rely on the publisher defaults. For example:
<publishing-interceptor id="anotherInterceptor"/>
This will map the return value of every method that matches the pointcut expression to a payload and will be sent to a default-channel. If the defaultChannel_is not specified (as above) the messages will be sent to the global _nullChannel.
Async Publishing
One important thing to understand is that publishing occurs in the same thread as your component’s execution. So by default in is synchronous. This means that the entire message flow would have to wait until the publisher’s flow completes. However, quite often you want the complete opposite and that is to use this Message publishing feature to initiate asynchronous sub-flows. For example, you might host a service (HTTP, WS etc.) which receives a remote request.You may want to send this request internally into a process that might take a while. However you may also want to reply to the user right away. So, instead of sending inbound requests for processing via the output channel (the conventional way), you can simply use output-channel or a replyChannel header to send a simple acknowledgment-like reply back to the caller while using the Message publisher feature to initiate a complex flow.
EXAMPLE: Here is the simple service that receives a complex payload, which needs to be sent further for processing, but it also needs to reply to the caller with a simple acknowledgment.
public String echo(Object complexPayload) { return "ACK"; }
So instead of hooking up the complex flow to the output channel we use the Message publishing feature instead. We configure it to create a new Message using the input argument of the service method (above) and send that to the localProcessChannel. And to make sure this sub-flow is asynchronous all we need to do is send it to any type of asynchronous channel (ExecutorChannel in this example).
<int:service-activator input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/> <bean id="sampleservice" class="test.SampleService"/> <aop:config> <aop:advisor advice-ref="interceptor" pointcut="bean(sampleservice)" /> </aop:config> <int:publishing-interceptor id="interceptor" > <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel"> <int:header name="sample_header" expression="'some sample value'"/> </int:method> </int:publishing-interceptor> <int:channel id="localProcessChannel"> <int:dispatcher task-executor="executor"/> </int:channel> <task:executor id="executor" pool-size="5"/>
Another way of handling this type of scenario is with a wire-tap.
In the above sections we looked at the Message publishing feature of Spring Integration which constructs and publishes messages as by-products of Method invocations.
However in those cases, you are still responsible for invoking the method.
In Spring Integration 2.0 we’ve added another related useful feature: support for scheduled Message producers/publishers via the new "expression" attribute on the inbound-channel-adapter element.
Scheduling could be based on several triggers, any one of which may be configured on the poller sub-element.
Currently we support cron
, fixed-rate
, fixed-delay
as well as any custom trigger implemented by you and referenced by the trigger attribute value.
As mentioned above, support for scheduled producers/publishers is provided via the <inbound-channel-adapter> xml element. Let’s look at couple of examples:
<int:inbound-channel-adapter id="fixedDelayProducer" expression="'fixedDelayTest'" channel="fixedDelayChannel"> <int:poller fixed-delay="1000"/> </int:inbound-channel-adapter>
In the above example an inbound Channel Adapter will be created which will construct a Message with its payload being the result of the expression defined in the expression
attribute.
Such messages will be created and sent every time the delay specified by the fixed-delay
attribute occurs.
<int:inbound-channel-adapter id="fixedRateProducer" expression="'fixedRateTest'" channel="fixedRateChannel"> <int:poller fixed-rate="1000"/> </int:inbound-channel-adapter>
This example is very similar to the previous one, except that we are using the fixed-rate
attribute which will allow us to send messages at a fixed rate (measuring from the start time of each task).
<int:inbound-channel-adapter id="cronProducer" expression="'cronTest'" channel="cronChannel"> <int:poller cron="7 6 5 4 3 ?"/> </int:inbound-channel-adapter>
This example demonstrates how you can apply a Cron trigger with a value specified in the cron
attribute.
<int:inbound-channel-adapter id="headerExpressionsProducer" expression="'headerExpressionsTest'" channel="headerExpressionsChannel" auto-startup="false"> <int:poller fixed-delay="5000"/> <int:header name="foo" expression="6 * 7"/> <int:header name="bar" value="x"/> </int:inbound-channel-adapter>
Here you can see that in a way very similar to the Message publishing feature we are enriching a newly constructed Message with extra Message headers which can take scalar values or the results of evaluating Spring expressions.
If you need to implement your own custom trigger you can use the trigger
attribute to provide a reference to any spring configured bean which implements the org.springframework.scheduling.Trigger
interface.
<int:inbound-channel-adapter id="triggerRefProducer" expression="'triggerRefTest'" channel="triggerRefChannel"> <int:poller trigger="customTrigger"/> </int:inbound-channel-adapter> <beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger"> <beans:constructor-arg value="9999"/> </beans:bean>