17. Message Publishing

The AOP Message Publishing feature allows you to construct and send a message as a by-product of method invocation. For example, imagine you have a component and every time the state of this component changes you would like to be notified via a Message. The easiest way to send such notifications would be to send a message to a dedicated channel, but how would you connect the method invocation that changes the state of the object to a message sending process, and how should the notification Message be structured? The AOP Message Publishing feature handles these responsibilities with a configuration-driven approach.

17.1 Message Publishing Configuration

Spring Integration provides two approaches: XML and Annotation-driven.

17.1.1 Annotation-driven approach via @Publisher annotation

The annotation-driven approach allows you to annotate any method with the @Publisher annotation, specifying 'channel' attribute. The Message will be constructed from the return value of method invocation and sent to a channel specified by 'channel' attribute. To further manage message structure you can also use a combination of both @Payload and @Header annotations.

Internally message publishing feature of Spring Integration uses both Spring AOP by defining PublisherAnnotationAdvisor and Spring 3.0 Expression Language (SpEL) support, giving you considerable flexibility and control over the structure of the Message it will build.

PublisherAnnotationAdvisor defines and binds the following variables:

  • #return - will bind to a return value allowing you to reference it or its attributes (e.g., #return.foo where 'foo' is an attribute of the object bound to #return)

  • #exception - will bind to an exception if one is thrown by the method invocation.

  • #args - will bind to method arguments, so individual arguments could be extracted by name (e.g., #args.fname as in the above method)

Let's look at couple of examples:

@Publisher
public String defaultPayload(String fname, String lname) {
  return fname + " " + lname;
}

In the above example the Message will be constructed with the following structure:

  • Message payload - will be the return type and value of the method. This is the default.

  • A newly constructed message will be sent to a default publisher channel configured with annotation post processor (see the end of this section).

@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
  return fname + " " + lname;
}

In this example everything is the same as above, however we are not using default publishing channel. Instead we are specifying the publishing channel via 'channel' attribute of @Publisher annotation. We are also adding @Header annotation which results in the Message header with the name 'last' and the value of 'lname' input parameter to be added to the newly constructed Message.

@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
  return fname + " " + lname;
}

The above example is almost identical to the previous one. The only difference here is that we are using @Payload annotation on the method, thus explicitly specifying that the return value of the method should be used as a payload of the Message.

@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
  return fname + " " + lname;
}

Here we are expending on the previous configuration by using Spring Expression language in the @Payload annotation further instructing the framework on how the message should be constructed. In this particular case the message will be a concatenation of the return value of the method invocation and 'lname' input argument. Message header 'x' with value of 'num' input argument will be added to the newly constructed Message.

@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
  return fname + " " + lname;
}

In the above example you see another usage of @Payload annotation. Here we are annotating method argument which will become a payload of newly constructed message.

As with most other annotation-driven features in Spring, you will need to register a post-processor (PublisherAnnotationBeanPostProcessor).

<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

You can also use namespace support for added convenience:

<si:annotation-config default-publisher-channel="defaultChannel"/>

Similar to other Spring annotations (e.g., @Controller), @Publisher is a meta-annotation, which means you can define your own annotations that will be treated as @Publisher

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
}

Here we defined @Audit annotation which itself is a @Publisher. Also note that you can define channel attribute on the meta-annotation thus encapsulating the behavior of where messages will be sent inside of this annotation. Now you can annotate any method:

@Audit
public String test() {
    return "foo";
}

In the above example every invocation of test() method will result in Message with payload which is the return value of the method invocation to be sent to auditChannel You can also annotate the class which would mean that the properties of this annotation will be applied on every public method of this class

@Audit
static class BankingOperationsImpl implements BankingOperations {

  public String debit(String amount) {
     . . .
  }

  public String credit(String amount) {
     . . .
  }
}

17.1.2 XML-based approach via <publishing-interceptor> element

The XML-based approach allows you to configure the same AOP-based Message Publishing functionality with simple namespace-based configuration of a MessagePublishingInterceptor. It certainly has some benefits over the annotation-driven approach since it allows you to use AOP pointcut expressions, thus possibly intercepting multiple methods at once or intercepting and publishing methods to which you don't have the source code.

To configure Message Publishing via XML, you only need to do the following two things:

  • Provide configuration for MessagePublishingInterceptor via the <publishing-interceptor> XML element.

  • Provide AOP configuration to apply the MessagePublishingInterceptor to managed objects.

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
  <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="foo" value="bar"/>
  </method>
  <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="foo" expression="'bar'.toUpperCase()"/>
  </method>
  <method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>

As you can see the <publishing-interceptor> configuration look rather similar to Annotation-based approach and it also utilizes the power of the Spring 3.0 Expression Language.

In the above example the execution of the echo method of a testBean will render a Message with the following structure:

  • The Message payload will be of type String and value of "Echoing: [value]" where value is the value returned by an executed method.

  • The Message will have header with the key "foo" value "bar".

  • The Message will be sent to echoChannel.

The second method is very similar to the first. Here every method that begins with 'repl' will render a Message with the following structure:

  • The Message payload will be the same as in the above sample

  • The Message will have header with the key "foo" and value that is the result of the SpEL expression 'bar'.toUpperCase() .

  • The Message will be sent to echoChannel.

The second method, mapping the execution of any method that begins with echoDef of testBean, will produce a Message with the following structure.

  • The Message payload will be the value returned by an executed method.

  • Since the channel attribute is not provided explicitly, the Message will be sent to the defaultChannel defined by the publisher.

For simple mapping rules you can rely on the publisher defaults. For example:

<publishing-interceptor id="anotherInterceptor"/>
  

This will map the return value of every method that matches the pointcut expression to a payload and will be sent to a default-channel. If the defaultChannelis not specified (as above) the messages will be sent to the global nullChannel.

Async Publishing

One important thing to understand is that publishing occurs in the same thread as your component's execution. So by default in is synchronous. This means that the entire message flow would have to wait until he publisher flow completes.  However, quite often you want the complete opposite and that is to use Message publishing feature to initiate asynchronous sub-flows. For example, you might host a service (HTTP, WS etc.) which receives a remote request.You may want to send this request internally into a process that might take a while. However you may also want to reply to the user right away. So, instead of sending inbound request for processing via the output channel (the conventional way), you can simply use ''outout-channel or $replyChannel'' header to send simple acknowledgment-like reply back to the caller while using Message publisher feature to initiate a complex flow.

EXAMPLE: Here is the simple service that receives a complex payload, which needs to be sent further for processing, but it also need to reply to the caller with a simple acknowledgment.

public String echo(Object complexPayload){
     return "ACK"; 
}

So instead of hooking up the complex flow to the output channel we use Message publishing feature instead configuring it to create a new Message using the input argument of the service method (above) and sending it to the 'localProcessChannel'. And to make sure this sub-flow is asynchronous all we need to do is make sure that we send it to any type of async channel (ExecutorChannel in this example).

<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleservice" class="test.SampleService"/>

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(sampleservice)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
  <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
    <int:header name="sample_header" expression="'some sample value'"/>
  </int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
  <int:dispatcher task-executor="executor"/>
</int:channel>
<task:executor id="executor" pool-size="5"/>

Another way of handling thi type of scenario is through wire-tap

17.1.3 Producing and publishing messages based on a scheduled trigger

In the above sections we looked at the Message publishing feature of Spring Integration which constructs and publishes messages as by-products of Method invocations. However in that case, you are still responsible for invoking the method. In Spring Integration 2.0 we've added another related useful feature: support for scheduled Message producers/publishers via the new "expression" attribute on the 'inbound-channel-adapter' element. Scheduling could be based on several triggers, any one of which may be configured on the 'poller' sub-element. Currently we support cron, fixed-rate, fixed-delay as well as any custom trigger implemented by you.

As mentioned above, support for scheduled producers/publishers is provided via the <inbound-channel-adapter> xml element. Let's look at couple of examples:

<inbound-channel-adapter id="fixedDelayProducer"
       expression="'fixedDelayTest'"
       channel="fixedDelayChannel">
    <poller fixed-delay="1000"/>
</inbound-channel-adapter>

In the above example an inbound Channel Adapter will be created which will construct a Message with its payload being the result of the expression  defined in the expression attribute. Such message will be created and sent every time after the delay specified by the fixed-delay attribute.

<inbound-channel-adapter id="fixedRateProducer"
       expression="'fixedRateTest'"
       channel="fixedRateChannel">
    <poller fixed-rate="1000"/>
</inbound-channel-adapter>

This example is very similar to the previous one, except that we are using the fixed-rate attribute which will allow us to send messages at a fixed rate (measuring from the start time of each task).

<inbound-channel-adapter id="cronProducer"
       expression="'cronTest'"
       channel="cronChannel">
    <poller cron="7 6 5 4 3 ?"/>
</inbound-channel-adapter>

This example demonstrates how you can apply a Cron trigger with a value specified in the cron attribute.

<inbound-channel-adapter id="headerExpressionsProducer"
       expression="'headerExpressionsTest'"
       channel="headerExpressionsChannel"
       auto-startup="false">
    <poller fixed-delay="5000"/>
    <header name="foo" expression="6 * 7"/>
    <header name="bar" value="x"/>
</inbound-channel-adapter>

Here you can see that in a way very similar to the Message publishing feature we are enriching a newly constructed Message with extra Message headers which could take scalar values as well as the results of evaluating Spring expressions.

If you need to implement your own custom trigger you can use the trigger attribute to provide a reference to any spring configured bean which implements the org.springframework.scheduling.Trigger interface.

<inbound-channel-adapter id="triggerRefProducer"
       expression="'triggerRefTest'" channel="triggerRefChannel">
    <poller trigger="customTrigger"/>
</inbound-channel-adapter>

<beans:bean id="customTrigger" class="org.springframework.scheduling.support.PeriodicTrigger">
    <beans:constructor-arg value="9999"/>
</beans:bean>