5. Oracle's Streams AQ (Advanced Queueing)

Oracle Streams is a feature that enables the propagation and management of data, transactions and events in a data stream either within a database, or from one database to another. This can be used both for replication and for messaging purposes. The Advanced Queuing (AQ) feature provides the messaging support. This messaging support will integrate with the standard JMS API provided with Java. Since the AQ support runs in the database it is possible to use the same transaction for both messaging and database access. This eliminates the need for expensive 2-phase commit processing that would be necessary when integrating database access with a traditional JMS solution.

Most of the JMS support we discuss in this chapter is provided directly by the Spring Framework. See the Spring Framework Reference Documentation for the details regarding this JMS support.

In addition to this standard support, The Advance Pack for Oracle Database provides easier configuration of a connection factory using the <orcl> namespace. It also provides support for some payload types not directly supported by the Spring JMS support like the XMLType and custom Advanced Data Types.

5.1 Supported payload types

JMS and Oracle Streams AQ can support a variety of payloads. These payloads are stored in the database and need to be converted to a Java representation in order for our programs to manipulate them. The following table outlines what payloads are supported and the corresponding classes that will work with these payloads and that will be able to convert them to and from a Java representation.

Table 5.1. supported payload types

Payload TypeSupport Notes
SYS.AQ$_JMS_TEXT_MESSAGE, SYS.AQ$_JMS_MAP_MESSAGE, SYS.AQ$_JMS_OBJECT_MESSAGE, SYS.AQ$_JMS_BYTES_MESSAGEDirectly supported by SimpleMessageConverter which is the default for the JmsTemplate and the DefaultMessageListenerContainer. When configuring a message listener container the DefaultMessageListenerContainer is the class that supports the Oracle AQ JMS features.
SYS.XMLTypeThis payload type requires a custom message listener container named XmlMessageListenerContainer. This listener container also needs a MessageListenerAdapter with an Oracle AQ XML specific message converter specified as XmlMessageConverter. See below for configuration details.
custom Advanced Data Type (ADT) (CREATE TYPE xxx AS OBJECT)This payload type requires a custom message listener container named AdtMessageListenerContainer. This listener container also can use a MessageListenerAdapter with a Oracle AQ ADT specific message converter specified as MappingAdtMessageConverter. This converter works with an implementation of the DatumMapper interface. See below for configuration details.


5.2 Configuration of the Connection Factory using the "orcl" namespace

When you use the JmsTemplate together with the Oracle AQ JMS support you can use the aq-jms-connection-factory entry to provide a connection factory to the JmsTemplate.

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:orcl="http://www.springframework.org/schema/data/orcl" 1
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/data/orcl
       http://www.springframework.org/schema/data/orcl/spring-data-orcl-1.0.xsd"> 2

 
    <orcl:aq-jms-connection-factory id="connectionFactory" 
            data-source="dataSource"/> 3

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="sessionTransacted" value="true"/>
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <bean id="transactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <bean id="dataSource" ... />

</beans>

1

Here we specify the reference to the orcl schema.

2

We also specify the location for the orcl schema.

3

The connection factory is configured using a reference to the data source to be used.

The configuration for a Message-Driven POJO with a MessageListenerContainer is very similar. You use the same type of connection factory configuration. This is passed in to the listener container configuration. Here is an example using the JMS namespace support.

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:orcl="http://www.springframework.org/schema/data/orcl" 1
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/tx
       http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/data/orcl 2
       http://www.springframework.org/schema/data/orcl/spring-data-orcl-1.0.xsd
       http://www.springframework.org/schema/jms
       http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <context:annotation-config/>
    
    <tx:annotation-driven/>
    
    <bean id="messageDelegate" class="spring.test.MessageDelegate"/>
    
    <jms:listener-container connection-factory="connectionFactory" 
            transaction-manager="transactionManager">
        <jms:listener destination="jmsadmin.jms_text_queue" 
                ref="messageDelegate" method="handleMessage"/>
    </jms:listener-container> 3
    
    <orcl:aq-jms-connection-factory id="connectionFactory" 
            data-source="dataSource"/> 4
    
    <bean id="transactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <bean id="dataSource" ... />

</beans>

1

Here we specify the reference to the orcl and jms schemas.

2

We also specify the location for the orcl and jms schemas.

4

The listener container is configured using a reference to the connection factory.

4

The connection factory is configured using a reference to the data source to be used.

See the next section for how to configure the transaction support and use a the same local transaction as the JDBC or ORM data access.

5.3 Configuring the Connection Factory to use the same local transaction as your data access code.

The configurations in the previous section will take advantage of the transaction synchronization provided by Spring, but there will be two transactions. One transaction for the data access and one for the JMS messaging. They will be synchronized, so if the data access transaction commits then the messaging transaction will also commit while if the data access transaction roll back then the messaging transaction will also roll back.

There is always a chance that the commit for the messaging transaction could fail after the data access transaction has committed successfully. This is of course a problem that you would have to account for in your code by checking for duplicate delivery of a message.

A better solution is to configure both data access and the messaging to share a transaction. Most often this is done using JTA, and that works, but has some impact on performance. For JTA you need to use distributed transactions and XA capable resources designed for two-phase commits. This comes at an extra cost that we try to avoid if possible.

Another option is to have the data access and the messaging share a local data access transaction. This is possible since the Oracle AQ implementation consists of a set of tables and stored procedures running in the database accessed through a standard JDBC connection. If you use the same database for data access and messaging with AQ, then you can configure the connection factory to share the database connection and the local transaction. You configure this connection and transaction sharing by setting the attribute use-local-data-source-transaction to true.

    <orcl:aq-jms-connection-factory id="connectionFactory"
        use-local-data-source-transaction="true" 1
        data-source="dataSource"/>

1

Setting the attribute use-local-data-source-transaction.

Configuring the connection factory to share a local data source transaction with the data access code has some implications for JMS connection and session caching. You can still configure a MessageListenerContainer to cache the JMS connection since each JMS session will be created as it's needed inside a data source transaction. However, if you cache the JMS session, then the database connection for it is established when the container starts up and it will not be possible to have this cached JMS session participite in the local data source transaction.

In many application server environments the JDBC connection is wrapped in an implementation specific class that delegates to the underlying native JDBC connection. Oracle's AQ connection factory needs the native Oracle connection and will throw an "oracle.jms.AQjmsException: JMS-112: Connection is invalid" exception if the connection is wrapped by a foreign class. To solve this problem you can specify a NativeJdbcExtractor that can be used to unwrap the connection. Spring provides a number of implementations to match the application server environment. Here is an example for specifying a NativeJdbcExtractor.

    <orcl:aq-jms-connection-factory id="connectionFactory"
        use-local-data-source-transaction="true"
        native-jdbc-extractor="dbcpNativeJdbcExtractor" 1
        data-source="dataSource" />

    <bean id="dbcpNativeJdbcExtractor" 
        class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>

    <bean id="dbcpDataSource" class="org.apache.commons.dbcp.BasicDataSource" 
            destroy-method="close">
        <property name="driverClassName" value="${jdbc.driverClassName}" />
        <property name="url" value="${jdbc.url}" />
        <property name="username" value="${jdbc.username}" />
        <property name="password" value="${jdbc.password}" />
    </bean>

1

Here we specify the reference to the native JDBC extractor.

For some use cases the default plain ConnectionFactory does not work and you need to explicitly use a QueueConnectionFactory or a TopicConnectionFactory. To support this requirement it is possible to specify this using the connection-factory-type attribute. The default is CONNECTION but you can specify QUEUE_CONNECTION or TOPIC_CONNECTION instead. Here is an example for specifying the connection factory type.

    <orcl:aq-jms-connection-factory id="connectionFactory"
        use-local-data-source-transaction="true"
        connection-factory-type="QUEUE_CONNECTION" 1
        data-source="dataSource" />

    <bean id="dbcpDataSource" class="org.apache.commons.dbcp.BasicDataSource"
            destroy-method="close">
        <property name="driverClassName" value="${jdbc.driverClassName}" />
        <property name="url" value="${jdbc.url}" />
        <property name="username" value="${jdbc.username}" />
        <property name="password" value="${jdbc.password}" />
    </bean>

1

Here we specify the type of connection factory to be used.

5.4 Configuration when using a SYS.XMLType payload

When you use a SYS.XMLType as payload there a few additional configuration settings are needed.

5.4.1 Enqueuing XML messages

When enqueuing messages the JmsTemplate can be configured with a message converter. This message converter should be of a type XmlMessageConverter configured with a specific XmlTypeHandler that you would like to use. The following handlers are available:

Table 5.2. xml handlers

XML HandlerUsage
StringXmlTypeHandlerHandles converting XMLTypes values to and from String representation.
DocumentXmlTypeHandlerHandles converting XMLTypes values to and from Document representation.
StreamXmlTypeHandlerHandles converting XMLTypes values to and from an InputStream.


<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/> 1
    <property name="messageConverter">
        <bean id="messageConverter"2
           class="org.springframework.data.jdbc.jms.support.converter.oracle.XmlMessageConverter">
            <constructor-arg>
                <bean class="org.springframework.data.jdbc.support.oracle.StringXmlTypeHandler"/> 3
            </constructor-arg>
        </bean>
    </property>
</bean>

1

A reference to the configured connection factory.

2

Declaration of an XmlMessageConverter to convert from XMLType to desired representation.

3

Declaration of the specific XmlTypeHandler that should be used. In this case a StringXmlTypeHandler.

Once the JmsTemplate is configured the XML value can be sent using the convertAndSend method. In this example we are passing in a String containing the value.

        String xmlval = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
                "<product id=\"10\">\n" +
                " <description>Foo</description>\n" +
                " <price>2.05</price>\n" +
                "</product>";

        jmsTemplate.convertAndSend("jmsadmin.jms_xml_queue", xmlval);

5.4.2 Dequeuing XML messages

When you want to dequeue messages using a message listener container you need to configure an XmlMessageListenerContainer that can dequeue the messages and convert the XMLType payload.

<bean id="messageDelegate" class="org.springframework.data.jdbc.test.xml.MessageDelegate" />

<jms:listener-container connection-factory="connectionFactory" 1
    transaction-manager="transactionManager"
    message-converter="messageConverter" 
    container-class="org.springframework.data.jdbc.jms.listener.oracle.XmlMessageListenerContainer"> 2
    <jms:listener destination="jmsadmin.jms_xml_queue" 
        ref="messageDelegate" method="handleMessage">
    </jms:listener>
</jms:listener-container>

<bean id="messageConverter" 3
   class="org.springframework.data.jdbc.jms.support.converter.oracle.XmlMessageConverter">
    <constructor-arg>
        <bean class="org.springframework.data.jdbc.support.oracle.DocumentXmlTypeHandler"/> 4
    </constructor-arg>
</bean>

1

A reference to the configured connection factory.

2

Configuring the class to use for the container - this is a custom class XmlMessageListenerContainer that dequeues the Oracle XMLType messages.

3

The XmlMessageConverter is defined here.

4

The DocumentXmlTypeHandler is used to retrieve XML value as a Document.

Here is an example of the message delegate used in the above message listener container:

public class MessageDelegate {

    @Autowired
    private DomainService domainService;

    public void handleMessage(Document xmlDoc) 
            throws MessageConversionException, JMSException {
        domainService.processXmlMessage(xmlDoc);
    }

}

As you can see the method that handles the message takes a Document as its parameter. The conversion from the XMLType to a Document representation is handled by the MessageListenerAdapter since we specified a message converter.

5.5 Configuration when using a custom ADT payload

When you use a custom ADT as payload there are certain configuration settings that are needed. When creating the queue and its queue table you specify the custom type as the "queue_payload_type". This custom type is defined using a regular "CREATE TYPE" statement. In the code example that follow we have defined a PRODUCT type:

create or replace TYPE PRODUCT_TYPE AS OBJECT
(
  id INTEGER,
  description VARCHAR(50),
  price DECIMAL(12,2)
);

5.5.1 Enqueuing ADT messages

When enqueuing messages the JmsTemplate can be configured with a message converter. This message converter should be of a type MappingAdtMessageConverter configured with a specific DatumMapper that you would like to use. This DatumMapper can be a custom implementation or the provided StructDatumMapper that will map between bean properties and STRUCT attributes of the same name.

The DatumMapper interface has the following methods declared:

public interface DatumMapper {

    public Datum toDatum(Object object, Connection conn) throws SQLException;

    public Object fromDatum(Datum datum) throws SQLException;

}

The toDatum method will be called with the Object to convert to a STRUCT as the first parameter and the current connection as the second. It's up to the mapping implementation to extract the object properties and to create the STRUCT. For the fromDatum method the STRUCT is passed in and the implementation is responsible for retrieving the attributes and constructing and instance of the required class.

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="connectionFactory"/> 1
  <property name="messageConverter">
    <bean id="messageConverter"
       class="org.springframework.data.jdbc.jms.support.converter.oracle.MappingAdtMessageConverter">2
       <constructor-arg>
         <bean class="org.springframework.data.jdbc.jms.support.oracle.StructDatumMapper"> 3
           <constructor-arg index="0" value="JMSADMIN.PRODUCT_TYPE"/>
           <constructor-arg index="1" value="org.springframework.data.jdbc.test.domain.Product"/>
         </bean>
       </constructor-arg>
    </bean>
  </property>
</bean>

1

A reference to the configured connection factory.

2

Declaration of an MappingAdtMessageConverter to convert from custom type to corresponding JavaBean.

3

Declaration of the specific DatumMapper that should be used. In this case the provided StructDatumMapper.

Once the JmsTemplate is configured the XML value can be sent using the convertAndSend method. In this example we are passing in a String containing the value.

        Product product = new Product();
        product.setId(22L);
        product.setDescription("Foo");
        product.setPrice(new BigDecimal("42.95"));

        jms.convertAndSend("jmsadmin.jms_product_queue", product);

5.5.2 Dequeuing ADT messages

When you want to dequeue messages using a message listener container you need to configure an AdtMessageListenerContainer that can dequeue the messages and convert the ADT payload.

<bean id="messageDelegate" class="org.springframework.data.jdbc.test.adt.MessageDelegate" />

<jms:listener-container connection-factory="connectionFactory" 1
    transaction-manager="transactionManager"
    message-converter="messageConverter" 
    container-class="org.springframework.data.jdbc.jms.listener.oracle.AdtMessageListenerContainer"> 2
  <jms:listener destination="jmsadmin.jms_product_queue" 
      ref="messageDelegate" method="handleMessage">
  </jms:listener>
</jms:listener-container>

<bean id="messageConverter" 
   class="org.springframework.data.jdbc.jms.support.converter.oracle.MappingAdtMessageConverter">3
  <constructor-arg>
    <bean class="org.springframework.data.jdbc.jms.support.oracle.StructDatumMapper"> 4
      <constructor-arg index="0" value="JMSADMIN.PRODUCT_TYPE"/>
      <constructor-arg index="1" value="org.springframework.data.jdbc.test.domain.Product"/>
    </bean>
  </constructor-arg>
</bean>

1

A reference to the configured connection factory.

1

Configuring the class to use for the container - this is a custom class AdtMessageListenerContainer that dequeues the ADT messages.

3

The MappingAdtMessageConverter is defined here.

4

The StructDatumMapper is used to map the attributes of the STRUCT retrieved for the ADT to properties of the bean class specified as the second constructor argument.

Here is an example of the message delegate used in the above message listener container:

public class MessageDelegate {

    @Autowired
    private DomainService domainService;

    public void handleMessage(Product product) 
            throws MessageConversionException, JMSException {
        domainService.saveProduct(product);
    }

}

As you can see the method that handles the message takes a Product as its parameter. The conversion from the STRUCT to a Product is handled by the MessageListenerAdapter since we specified a message converter.