30. Message Publishing

The AOP Message Publishing feature allows you to construct and send a message as a by-product of a method invocation. For example, imagine you have a component and every time the state of this component changes you would like to be notified via a Message. The easiest way to send such notifications would be to send a message to a dedicated channel, but how would you connect the method invocation that changes the state of the object to a message sending process, and how should the notification Message be structured? The AOP Message Publishing feature handles these responsibilities with a configuration-driven approach.

30.1 Message Publishing Configuration

Spring Integration provides two approaches: XML and Annotation-driven.

30.1.1 Annotation-driven approach via @Publisher annotation

The annotation-driven approach allows you to annotate any method with the @Publisher annotation, specifying a 'channel' attribute. The Message will be constructed from the return value of the method invocation and sent to a channel specified by the 'channel' attribute. To further manage message structure, you can also use a combination of both @Payload and @Header annotations.

Internally, the message publishing feature of Spring Integration uses both Spring AOP (by defining a PublisherAnnotationAdvisor) and Spring 3.0's Expression Language (SpEL) support, giving you considerable flexibility and control over the structure of the Message it will publish.

The PublisherAnnotationAdvisor defines and binds the following variables:

  • #return - will bind to a return value allowing you to reference it or its attributes (e.g., #return.foo where 'foo' is an attribute of the object bound to #return)

  • #exception - will bind to an exception if one is thrown by the method invocation.

  • #args - will bind to method arguments, so individual arguments could be extracted by name (e.g., #args.fname, where 'fname' is the name of a method parameter, as in the @Publisher examples that follow)
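To make these bindings more concrete, here is a minimal sketch of an interceptor that captures the same three pieces of information around a method call. It is not the framework's implementation: it uses a plain JDK dynamic proxy rather than Spring AOP and SpEL, and the names PublishingProxySketch, NameService and Captured are hypothetical, introduced only for this illustration. Because plain reflection does not expose parameter names unless the code is compiled with the -parameters flag, the sketch keeps the arguments by position rather than by name.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class PublishingProxySketch {

    /** Hypothetical service interface, used only for this illustration. */
    interface NameService {
        String fullName(String fname, String lname);
    }

    /** What the interceptor saw for one call: stand-ins for #return, #exception and #args. */
    static final class Captured {
        final Object returnValue;
        final Throwable exception;
        final Object[] args;

        Captured(Object returnValue, Throwable exception, Object[] args) {
            this.returnValue = returnValue;
            this.exception = exception;
            this.args = args;
        }
    }

    /** Wraps the target so that every call also appends a Captured record to the sink. */
    static NameService intercept(NameService target, List<Captured> sink) {
        InvocationHandler handler = (proxy, method, args) -> {
            Object result = null;
            Throwable failure = null;
            try {
                result = method.invoke(target, args);
                return result;
            } catch (InvocationTargetException e) {
                failure = e.getCause(); // the exception thrown by the target method
                throw failure;
            } finally {
                // Runs on success and on failure, as a by-product of the invocation.
                sink.add(new Captured(result, failure, args));
            }
        };
        return (NameService) Proxy.newProxyInstance(
                NameService.class.getClassLoader(),
                new Class<?>[] {NameService.class},
                handler);
    }

    static String demo() {
        List<Captured> sink = new ArrayList<>();
        NameService service = intercept((fname, lname) -> fname + " " + lname, sink);
        service.fullName("John", "Doe");
        Captured c = sink.get(0);
        return "return=" + c.returnValue + ", exception=" + c.exception + ", args[1]=" + c.args[1];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the sketch the caller still receives the method's normal return value; the captured record is what a publisher would then turn into a Message.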

Let's look at a couple of examples:

@Publisher
public String defaultPayload(String fname, String lname) {
  return fname + " " + lname;
}

In the above example the Message will be constructed with the following structure:

  • Message payload - will be the return value of the method (keeping its type). This is the default.

  • A newly constructed message will be sent to a default publisher channel configured with an annotation post processor (see the end of this section).

@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
  return fname + " " + lname;
}

In this example everything is the same as above, except that we are not using a default publishing channel. Instead we are specifying the publishing channel via the 'channel' attribute of the @Publisher annotation. We are also adding a @Header annotation which results in the Message header named 'last' having the same value as the 'lname' method parameter. That header will be added to the newly constructed Message.

@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
  return fname + " " + lname;
}

The above example is almost identical to the previous one. The only difference here is that we are using a @Payload annotation on the method, thus explicitly specifying that the return value of the method should be used as the payload of the Message.

@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
  return fname + " " + lname;
}

Here we are expanding on the previous configuration by using the Spring Expression Language in the @Payload annotation to further instruct the framework how the message should be constructed. In this particular case the payload of the message will be a concatenation of the return value of the method invocation and the 'lname' input argument. The Message header named 'x' will have its value determined by the 'num' input argument. That header will be added to the newly constructed Message.

@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
  return fname + " " + lname;
}

In the above example you see another usage of the @Payload annotation. Here we are annotating a method argument which will become the payload of the newly constructed message.

As with most other annotation-driven features in Spring, you will need to register a post-processor (PublisherAnnotationBeanPostProcessor).

<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

You can instead use namespace support for a more concise configuration:

<int:annotation-config default-publisher-channel="defaultChannel"/>

Similar to other Spring annotations (@Component, @Scheduled, etc.), @Publisher can also be used as a meta-annotation. That means you can define your own annotations that will be treated in the same way as the @Publisher itself.

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
}

Here we defined the @Audit annotation which itself is annotated with @Publisher. Also note that you can define a channel attribute on the meta-annotation thus encapsulating the behavior of where messages will be sent inside of this annotation. Now you can annotate any method:

@Audit
public String test() {
    return "foo";
}

In the above example every invocation of the test() method will result in a Message with a payload created from its return value. Each Message will be sent to the channel named auditChannel. One of the benefits of this technique is that you can avoid the duplication of the same channel name across multiple annotations. You also can provide a level of indirection between your own, potentially domain-specific annotations and those provided by the framework.

You can also annotate the class, in which case the properties of the annotation are applied to every public method of that class.

@Audit
static class BankingOperationsImpl implements BankingOperations {

  public String debit(String amount) {
     . . .
  }

  public String credit(String amount) {
     . . .
  }
}
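The lookup order described above (an annotation on the method, then a meta-annotation, then the class) can be pictured with a small reflection sketch. This is only an illustration under stated assumptions: the nested Publisher annotation is a local stand-in and not the framework's @Publisher, the class MetaAnnotationSketch and its helper methods are hypothetical, and the sketch resolves only one level of meta-annotation.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;

final class MetaAnnotationSketch {

    /** Local stand-in for a publisher annotation (not the framework's own). */
    @Target({ElementType.METHOD, ElementType.TYPE, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @interface Publisher {
        String channel() default "";
    }

    /** Domain-specific annotation that carries the channel name once. */
    @Target({ElementType.METHOD, ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Publisher(channel = "auditChannel")
    @interface Audit {
    }

    static class Service {
        @Audit
        public String test() {
            return "foo";
        }

        public String untouched() {
            return "bar";
        }
    }

    @Audit
    static class BankingOperationsImpl {
        public String debit(String amount) {
            return amount;
        }
    }

    /** Finds Publisher directly on the element, or one level up on its annotations. */
    static Publisher find(AnnotatedElement element) {
        Publisher direct = element.getAnnotation(Publisher.class);
        if (direct != null) {
            return direct;
        }
        for (Annotation candidate : element.getAnnotations()) {
            Publisher meta = candidate.annotationType().getAnnotation(Publisher.class);
            if (meta != null) {
                return meta;
            }
        }
        return null;
    }

    /** Returns the channel for a public method, falling back to the class; null if none applies. */
    static String channelFor(Class<?> type, String methodName, Class<?>... parameterTypes) {
        try {
            Method method = type.getMethod(methodName, parameterTypes);
            Publisher publisher = find(method);
            if (publisher == null) {
                publisher = find(type);
            }
            return publisher == null ? null : publisher.channel();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(channelFor(Service.class, "test"));
        System.out.println(channelFor(Service.class, "untouched"));
        System.out.println(channelFor(BankingOperationsImpl.class, "debit", String.class));
    }
}
```

The channel name appears exactly once, on the Audit declaration, which is the duplication-avoiding benefit mentioned earlier.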

30.1.2 XML-based approach via the <publishing-interceptor> element

The XML-based approach allows you to configure the same AOP-based Message Publishing functionality with simple namespace-based configuration of a MessagePublishingInterceptor. It has some benefits over the annotation-driven approach: because it allows you to use AOP pointcut expressions, you can intercept multiple methods at once, and you can intercept and publish from methods for which you do not have the source code.

To configure Message Publishing via XML, you only need to do the following two things:

  • Provide configuration for MessagePublishingInterceptor via the <publishing-interceptor> XML element.

  • Provide AOP configuration to apply the MessagePublishingInterceptor to managed objects.

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
  <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="foo" value="bar"/>
  </method>
  <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="foo" expression="'bar'.toUpperCase()"/>
  </method>
  <method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>

As you can see the <publishing-interceptor> configuration looks rather similar to the Annotation-based approach, and it also utilizes the power of the Spring 3.0 Expression Language.

In the above example the execution of the echo method of a testBean will render a Message with the following structure:

  • The Message payload will be of type String with the content "Echoing: [value]" where value is the value returned by an executed method.

  • The Message will have a header with the name "foo" and value "bar".

  • The Message will be sent to echoChannel.

The second method is very similar to the first. Here every method that begins with 'repl' will render a Message with the following structure:

  • The Message payload will be the same as in the above sample.

  • The Message will have a header named "foo" whose value is the result of the SpEL expression 'bar'.toUpperCase().

  • The Message will be sent to echoChannel.

The third method mapping, which applies to the execution of any method of testBean whose name begins with echoDef, will produce a Message with the following structure:

  • The Message payload will be the value returned by an executed method.

  • Since the channel attribute is not provided explicitly, the Message will be sent to the defaultChannel defined by the publisher.
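The way the three method mappings above select a channel can be sketched in a few lines. This is an illustration, not the framework's code: the class MethodPatternSketch is hypothetical, and it assumes that '*' is the only wildcard, that a pattern without '*' must equal the whole method name, and that the first matching rule in declaration order wins.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MethodPatternSketch {

    /** Matches a method name against a pattern in which '*' stands for any run of characters. */
    static boolean matches(String pattern, String methodName) {
        String[] literals = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < literals.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(java.util.regex.Pattern.quote(literals[i]));
        }
        return methodName.matches(regex.toString());
    }

    /** Rules mirroring the configuration above; an empty channel means "use the default". */
    static String channelFor(String methodName) {
        Map<String, String> rules = new LinkedHashMap<>();
        rules.put("echo", "echoChannel");
        rules.put("repl*", "echoChannel");
        rules.put("echoDef*", "");
        for (Map.Entry<String, String> rule : rules.entrySet()) {
            if (matches(rule.getKey(), methodName)) {
                return rule.getValue().isEmpty() ? "defaultChannel" : rule.getValue();
            }
        }
        // No rule matched; the sketch makes no claim about what the framework does here.
        return null;
    }

    public static void main(String[] args) {
        System.out.println(channelFor("echo"));
        System.out.println(channelFor("replace"));
        System.out.println(channelFor("echoDefault"));
    }
}
```

Under these assumptions a method named echoDefault does not match the exact pattern "echo", so it falls through to the "echoDef*" rule and ends up on the default channel.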

For simple mapping rules you can rely on the publisher defaults. For example:

<publishing-interceptor id="anotherInterceptor"/>
  

This will map the return value of every method that matches the pointcut expression to a payload, and the resulting Message will be sent to the default channel. If the default-channel attribute is not specified (as above), the messages will be sent to the global nullChannel.

Async Publishing

One important thing to understand is that publishing occurs in the same thread as your component's execution, so by default it is synchronous. This means that the entire message flow has to wait until the publisher's flow completes. However, quite often you want the complete opposite: to use the Message publishing feature to initiate asynchronous sub-flows. For example, you might host a service (HTTP, WS, etc.) that receives a remote request. You may want to send this request internally into a process that might take a while, yet still reply to the user right away. So, instead of sending inbound requests for processing via the output channel (the conventional way), you can simply use 'output-channel' or a 'replyChannel' header to send a simple acknowledgment-like reply back to the caller while using the Message publisher feature to initiate a complex flow.

EXAMPLE: Here is a simple service that receives a complex payload, which needs to be sent on for further processing, while the service also needs to reply to the caller with a simple acknowledgment.

public String echo(Object complexPayload) {
  return "ACK";
}

So instead of hooking up the complex flow to the output channel, we use the Message publishing feature. We configure it to create a new Message using the input argument of the service method (above) and send that to the 'localProcessChannel'. To make sure this sub-flow is asynchronous, all we need to do is send it to any type of asynchronous channel (an ExecutorChannel in this example).

<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleservice" class="test.SampleService"/>

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(sampleservice)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
  <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
    <int:header name="sample_header" expression="'some sample value'"/>
  </int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
  <int:dispatcher task-executor="executor"/>
</int:channel>

<task:executor id="executor" pool-size="5"/>

Another way of handling this type of scenario is with a wire-tap.
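The difference between synchronous publishing and handing the published Message to an executor-backed channel can be illustrated without any framework classes. The following is a sketch under stated assumptions: Channel, direct, executorBacked and AsyncPublishSketch are hypothetical names, the publication is written inline in echo (in the framework it is applied by the interceptor), and a fixed pool of five threads stands in for the task executor configured above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class AsyncPublishSketch {

    /** Hypothetical stand-in for a message channel. */
    interface Channel {
        void send(Object payload);
    }

    /** Delivers on the sender's thread, so the sender waits for the handler. */
    static Channel direct(Consumer<Object> handler) {
        return handler::accept;
    }

    /** Hands delivery to an executor, so the sender returns immediately. */
    static Channel executorBacked(Executor executor, Consumer<Object> handler) {
        return payload -> executor.execute(() -> handler.accept(payload));
    }

    /** The service: publishes its argument as a by-product and replies with an acknowledgment. */
    static String echo(Object complexPayload, Channel publishTo) {
        publishTo.send(complexPayload);
        return "ACK";
    }

    /** Returns true if the sub-flow handler ran on the thread that called echo. */
    static boolean handledOnCallerThread(boolean async) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            AtomicReference<Thread> handlerThread = new AtomicReference<>();
            CountDownLatch done = new CountDownLatch(1);
            Consumer<Object> handler = payload -> {
                handlerThread.set(Thread.currentThread());
                done.countDown();
            };
            Channel channel = async ? executorBacked(pool, handler) : direct(handler);
            String reply = echo("complex payload", channel);
            if (!"ACK".equals(reply) || !done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("unexpected reply or handler did not run");
            }
            return handlerThread.get() == Thread.currentThread();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("direct channel, handled on caller thread:   " + handledOnCallerThread(false));
        System.out.println("executor channel, handled on caller thread: " + handledOnCallerThread(true));
    }
}
```

With the direct variant the caller's thread does the sub-flow work before "ACK" is returned; with the executor-backed variant the work moves to a pool thread, which is the effect the dispatcher configuration above is after.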

30.1.3 Producing and publishing messages based on a scheduled trigger

In the above sections we looked at the Message publishing feature of Spring Integration which constructs and publishes messages as by-products of Method invocations. However in those cases, you are still responsible for invoking the method. In Spring Integration 2.0 we've added another related useful feature: support for scheduled Message producers/publishers via the new "expression" attribute on the 'inbound-channel-adapter' element. Scheduling could be based on several triggers, any one of which may be configured on the 'poller' sub-element. Currently we support cron, fixed-rate, fixed-delay as well as any custom trigger implemented by you and referenced by the 'trigger' attribute value.

As mentioned above, support for scheduled producers/publishers is provided via the <inbound-channel-adapter> XML element. Let's look at a couple of examples:

<int:inbound-channel-adapter id="fixedDelayProducer"
       expression="'fixedDelayTest'"
       channel="fixedDelayChannel">
    <int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>

In the above example an inbound Channel Adapter will be created which will construct a Message with its payload being the result of the expression defined in the expression attribute. Such a Message will be created and sent each time the delay specified by the fixed-delay attribute elapses.

<int:inbound-channel-adapter id="fixedRateProducer"
       expression="'fixedRateTest'"
       channel="fixedRateChannel">
    <int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>

This example is very similar to the previous one, except that we are using the fixed-rate attribute which will allow us to send messages at a fixed rate (measuring from the start time of each task).
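The practical difference between the two settings shows up when producing a message takes a noticeable amount of time. The following sketch computes the start times of successive runs for both modes. It is a simplified model, not the scheduler's code: the class PollerTimingSketch and its methods are hypothetical, the first run is assumed to start at time 0, every run is assumed to take the same time, and runs are assumed never to overlap.

```java
import java.util.Arrays;

final class PollerTimingSketch {

    /** Fixed delay: the next run starts delayMs after the previous run has finished. */
    static long[] fixedDelayStarts(long delayMs, long taskMs, int runs) {
        long[] starts = new long[runs];
        long next = 0;
        for (int i = 0; i < runs; i++) {
            starts[i] = next;
            next = starts[i] + taskMs + delayMs;
        }
        return starts;
    }

    /** Fixed rate: runs are due every periodMs from the first start; a late run starts as soon as it can. */
    static long[] fixedRateStarts(long periodMs, long taskMs, int runs) {
        long[] starts = new long[runs];
        long previousEnd = 0;
        for (int i = 0; i < runs; i++) {
            long due = i * periodMs;
            starts[i] = Math.max(due, previousEnd);
            previousEnd = starts[i] + taskMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // 1000 ms setting, each run taking 300 ms
        System.out.println(Arrays.toString(fixedDelayStarts(1000, 300, 3))); // [0, 1300, 2600]
        System.out.println(Arrays.toString(fixedRateStarts(1000, 300, 3)));  // [0, 1000, 2000]
    }
}
```

With a fixed delay the interval between starts grows with the time each run takes, whereas with a fixed rate the starts stay on a regular grid as long as each run finishes within the period.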

<int:inbound-channel-adapter id="cronProducer"
       expression="'cronTest'"
       channel="cronChannel">
    <int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>

This example demonstrates how you can apply a Cron trigger with a value specified in the cron attribute. The expression shown fires at 05:06:07 on March 4th of every year.
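Spring cron expressions have six fields: second, minute, hour, day of month, month and day of week. To show how the expression in the example is read, here is a deliberately narrow sketch that computes the next firing time for expressions of exactly this shape, that is, five plain numbers followed by '?'. It is not a cron parser and not the framework's trigger; the class YearlyCronSketch and its method are hypothetical, and ranges, lists, steps and dates such as 29 February are not handled.

```java
import java.time.LocalDateTime;

final class YearlyCronSketch {

    /** Next firing time strictly after the given instant, for "sec min hour day month ?" only. */
    static LocalDateTime nextFireTime(String cron, LocalDateTime after) {
        String[] fields = cron.trim().split("\\s+");
        if (fields.length != 6 || !"?".equals(fields[5])) {
            throw new IllegalArgumentException("expected five numeric fields followed by '?'");
        }
        int second = Integer.parseInt(fields[0]);
        int minute = Integer.parseInt(fields[1]);
        int hour = Integer.parseInt(fields[2]);
        int dayOfMonth = Integer.parseInt(fields[3]);
        int month = Integer.parseInt(fields[4]);

        // Try the occurrence in the current year first, then fall back to next year.
        LocalDateTime candidate =
                LocalDateTime.of(after.getYear(), month, dayOfMonth, hour, minute, second);
        return candidate.isAfter(after) ? candidate : candidate.plusYears(1);
    }

    public static void main(String[] args) {
        System.out.println(nextFireTime("7 6 5 4 3 ?", LocalDateTime.of(2024, 1, 1, 0, 0)));
    }
}
```

A poller configured this way therefore produces at most one message per year, which makes the value useful as a syntax demonstration rather than as a realistic schedule.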

<int:inbound-channel-adapter id="headerExpressionsProducer"
       expression="'headerExpressionsTest'"
       channel="headerExpressionsChannel"
       auto-startup="false">
    <int:poller fixed-delay="5000"/>
    <int:header name="foo" expression="6 * 7"/>
    <int:header name="bar" value="x"/>
</int:inbound-channel-adapter>

Here you can see that in a way very similar to the Message publishing feature we are enriching a newly constructed Message with extra Message headers which can take scalar values or the results of evaluating Spring expressions.

If you need to implement your own custom trigger, you can use the trigger attribute to provide a reference to any Spring-configured bean which implements the org.springframework.scheduling.Trigger interface.

<int:inbound-channel-adapter id="triggerRefProducer"
       expression="'triggerRefTest'" channel="triggerRefChannel">
    <int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>

<beans:bean id="customTrigger" class="org.springframework.scheduling.support.PeriodicTrigger">
    <beans:constructor-arg value="9999"/>
</beans:bean>