Oracle Streams is a feature that enables the propagation and management of data, transactions and events in a data stream either within a database, or from one database to another. This can be used both for replication and for messaging purposes. The Advanced Queuing (AQ) feature provides the messaging support. This messaging support will integrate with the standard JMS API provided with Java. Since the AQ support runs in the database it is possible to use the same transaction for both messaging and database access. This eliminates the need for expensive 2-phase commit processing that would be necessary when integrating database access with a traditional JMS solution.
Most of the JMS support we discuss in this chapter is provided directly by the Spring Framework. See the Spring Framework Reference Documentation for the details regarding this JMS support.
In addition to this standard support, the Advanced Pack for Oracle Database provides easier configuration of a connection factory using the <orcl> namespace. It also provides support for some payload types not directly supported by the Spring JMS support, such as XMLType and custom Advanced Data Types.
JMS and Oracle Streams AQ can support a variety of payloads. These payloads are stored in the database and need to be converted to a Java representation before our programs can manipulate them. The following table outlines the supported payload types and the classes that convert them to and from a Java representation.
Table 5.1. Supported payload types
Payload Type | Support Notes |
---|---|
SYS.AQ$_JMS_TEXT_MESSAGE, SYS.AQ$_JMS_MAP_MESSAGE, SYS.AQ$_JMS_OBJECT_MESSAGE, SYS.AQ$_JMS_BYTES_MESSAGE | Directly supported by SimpleMessageConverter, which is the default for the JmsTemplate and the DefaultMessageListenerContainer. When configuring a message listener container, the DefaultMessageListenerContainer is the class that supports the Oracle AQ JMS features. |
SYS.XMLType | This payload type requires a custom message listener container named XmlMessageListenerContainer. This listener container also needs a MessageListenerAdapter with an Oracle AQ XML specific message converter specified as XmlMessageConverter. See below for configuration details. |
Custom Advanced Data Type (ADT) (CREATE TYPE xxx AS OBJECT) | This payload type requires a custom message listener container named AdtMessageListenerContainer. This listener container can also use a MessageListenerAdapter with an Oracle AQ ADT specific message converter specified as MappingAdtMessageConverter. This converter works with an implementation of the DatumMapper interface. See below for configuration details. |
When you use the JmsTemplate together with the Oracle AQ JMS support, you can use the aq-jms-connection-factory entry to provide a connection factory to the JmsTemplate.
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:orcl="http://www.springframework.org/schema/data/orcl" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/data/orcl http://www.springframework.org/schema/data/orcl/spring-data-orcl-1.0.xsd"> <orcl:aq-jms-connection-factory id="connectionFactory" data-source="dataSource"/> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="sessionTransacted" value="true"/> <property name="connectionFactory" ref="connectionFactory"/> </bean> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource"/> </bean> <bean id="dataSource" ... /> </beans>
Here we specify the reference to the orcl namespace (xmlns:orcl). We also specify the location for the orcl schema in xsi:schemaLocation. The connection factory is configured using a reference to the data source to be used.
The configuration for a Message-Driven POJO with a MessageListenerContainer is very similar. You use the same type of connection factory configuration, which is passed to the listener container configuration. Here is an example using the JMS namespace support.
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:orcl="http://www.springframework.org/schema/data/orcl" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/data/orcl http://www.springframework.org/schema/data/orcl/spring-data-orcl-1.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd"> <context:annotation-config/> <tx:annotation-driven/> <bean id="messageDelegate" class="spring.test.MessageDelegate"/> <jms:listener-container connection-factory="connectionFactory" transaction-manager="transactionManager"> <jms:listener destination="jmsadmin.jms_text_queue" ref="messageDelegate" method="handleMessage"/> </jms:listener-container> <orcl:aq-jms-connection-factory id="connectionFactory" data-source="dataSource"/> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource"/> </bean> <bean id="dataSource" ... /> </beans>
Here we specify the namespace references and the corresponding schema locations on the beans element. The listener container is configured using a reference to the connection factory. The connection factory is configured using a reference to the data source to be used.
See the next section for how to configure the transaction support and use the same local transaction as the JDBC or ORM data access.
The configurations in the previous section will take advantage of the transaction synchronization provided by Spring, but there will be two transactions: one for the data access and one for the JMS messaging. They will be synchronized, so if the data access transaction commits then the messaging transaction will also commit, while if the data access transaction rolls back then the messaging transaction will also roll back.
There is always a chance that the commit for the messaging transaction could fail after the data access transaction has committed successfully. This is of course a problem that you would have to account for in your code by checking for duplicate delivery of a message.
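One way to account for duplicate delivery is to make the message handler idempotent. The following is a minimal sketch using only the JDK. The class name DuplicateMessageFilter and its in-memory set are hypothetical and are not part of Spring or Oracle AQ. A real application would more likely record processed message IDs in a database table as part of the data access transaction.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers the IDs of messages that were already
// processed so that a redelivered message can be skipped.
class DuplicateMessageFilter {

    // In-memory store for illustration only; it does not survive a restart.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();

    // Returns true the first time an ID is seen and false for every repeat.
    boolean firstDelivery(String messageId) {
        return processedIds.add(messageId);
    }
}

class DuplicateMessageFilterDemo {
    public static void main(String[] args) {
        DuplicateMessageFilter filter = new DuplicateMessageFilter();
        // The third delivery repeats the first message ID.
        String[] deliveries = {"ID:1", "ID:2", "ID:1"};
        for (String id : deliveries) {
            if (filter.firstDelivery(id)) {
                System.out.println("processing " + id);
            } else {
                System.out.println("skipping duplicate " + id);
            }
        }
    }
}
```

In a listener, the ID passed to firstDelivery could be the JMS message ID or a business key carried in the payload; which one is appropriate depends on the application.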
A better solution is to configure both data access and the messaging to share a transaction. Most often this is done using JTA, and that works, but has some impact on performance. For JTA you need to use distributed transactions and XA capable resources designed for two-phase commits. This comes at an extra cost that we try to avoid if possible.
Another option is to have the data access and the messaging share a local data access transaction. This is possible since the Oracle AQ implementation consists of a set of tables and stored procedures running in the database accessed through a standard JDBC connection. If you use the same database for data access and messaging with AQ, then you can configure the connection factory to share the database connection and the local transaction. You configure this connection and transaction sharing by setting the attribute use-local-data-source-transaction to true.
<orcl:aq-jms-connection-factory id="connectionFactory" use-local-data-source-transaction="true" data-source="dataSource"/>
Configuring the connection factory to share a local data source transaction with the data access code has some implications for JMS connection and session caching. You can still configure a MessageListenerContainer to cache the JMS connection since each JMS session will be created as it's needed inside a data source transaction. However, if you cache the JMS session, then the database connection for it is established when the container starts up and it will not be possible to have this cached JMS session participate in the local data source transaction.
In many application server environments the JDBC connection is wrapped in an implementation specific class that delegates to the underlying native JDBC connection. Oracle's AQ connection factory needs the native Oracle connection and will throw an "oracle.jms.AQjmsException: JMS-112: Connection is invalid" exception if the connection is wrapped by a foreign class. To solve this problem you can specify a NativeJdbcExtractor that can be used to unwrap the connection. Spring provides a number of implementations to match the application server environment. Here is an example for specifying a NativeJdbcExtractor.
<orcl:aq-jms-connection-factory id="connectionFactory" use-local-data-source-transaction="true" native-jdbc-extractor="dbcpNativeJdbcExtractor" data-source="dbcpDataSource" /> <bean id="dbcpNativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> <bean id="dbcpDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="${jdbc.driverClassName}" /> <property name="url" value="${jdbc.url}" /> <property name="username" value="${jdbc.username}" /> <property name="password" value="${jdbc.password}" /> </bean>
For some use cases the default plain ConnectionFactory does not work and you need to explicitly use a QueueConnectionFactory or a TopicConnectionFactory. To support this requirement you can specify the type using the connection-factory-type attribute. The default is CONNECTION but you can specify QUEUE_CONNECTION or TOPIC_CONNECTION instead. Here is an example for specifying the connection factory type.
<orcl:aq-jms-connection-factory id="connectionFactory" use-local-data-source-transaction="true" connection-factory-type="QUEUE_CONNECTION" data-source="dbcpDataSource" /> <bean id="dbcpDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="${jdbc.driverClassName}" /> <property name="url" value="${jdbc.url}" /> <property name="username" value="${jdbc.username}" /> <property name="password" value="${jdbc.password}" /> </bean>
When you use a SYS.XMLType as payload, a few additional configuration settings are needed.
When enqueuing messages the JmsTemplate can be configured with a message converter. This message converter should be of type XmlMessageConverter, configured with the specific XmlTypeHandler that you would like to use. The following handlers are available:
Table 5.2. XML handlers
XML Handler | Usage |
---|---|
StringXmlTypeHandler | Handles converting XMLType values to and from a String representation. |
DocumentXmlTypeHandler | Handles converting XMLType values to and from a Document representation. |
StreamXmlTypeHandler | Handles converting XMLType values to and from an InputStream. |
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="messageConverter"> <bean id="messageConverter" class="org.springframework.data.jdbc.jms.support.converter.oracle.XmlMessageConverter"> <constructor-arg> <bean class="org.springframework.data.jdbc.support.oracle.StringXmlTypeHandler"/> </constructor-arg> </bean> </property> </bean>
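The JmsTemplate holds a reference to the configured connection factory. The messageConverter property contains the declaration of an XmlMessageConverter, and its constructor argument is the declaration of the specific XmlTypeHandler to use.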
Once the JmsTemplate is configured, the XML value can be sent using the convertAndSend method. In this example we are passing in a String containing the value.
String xmlval = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + "<product id=\"10\">\n" + " <description>Foo</description>\n" + " <price>2.05</price>\n" + "</product>"; jmsTemplate.convertAndSend("jmsadmin.jms_xml_queue", xmlval);
When you want to dequeue messages using a message listener container, you need to configure an XmlMessageListenerContainer that can dequeue the messages and convert the XMLType payload.
<bean id="messageDelegate" class="org.springframework.data.jdbc.test.xml.MessageDelegate" /> <jms:listener-container connection-factory="connectionFactory" transaction-manager="transactionManager" message-converter="messageConverter" container-class="org.springframework.data.jdbc.jms.listener.oracle.XmlMessageListenerContainer"> <jms:listener destination="jmsadmin.jms_xml_queue" ref="messageDelegate" method="handleMessage"> </jms:listener> </jms:listener-container> <bean id="messageConverter" class="org.springframework.data.jdbc.jms.support.converter.oracle.XmlMessageConverter"> <constructor-arg> <bean class="org.springframework.data.jdbc.support.oracle.DocumentXmlTypeHandler"/> </constructor-arg> </bean>
The listener container holds a reference to the configured connection factory. The container-class attribute configures the class to use for the container; this is the custom class XmlMessageListenerContainer.
Here is an example of the message delegate used in the above message listener container:
public class MessageDelegate { @Autowired private DomainService domainService; public void handleMessage(Document xmlDoc) throws MessageConversionException, JMSException { domainService.processXmlMessage(xmlDoc); } }
As you can see, the method that handles the message takes a Document as its parameter. The conversion from the XMLType to a Document representation is handled by the MessageListenerAdapter since we specified a message converter.
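What the delegate does with the Document is up to the application. As a rough sketch using only the JDK's DOM API, the following shows how values might be read from a document shaped like the product message sent with convertAndSend above. The class ProductXmlReader and its methods are hypothetical; they stand in for whatever processing the domain service actually performs.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper showing one way to read a product Document with the JDK DOM API.
class ProductXmlReader {

    // Parses an XML string into a Document, which is handy for trying this out without a queue.
    static Document parse(String xml) {
        try {
            return DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse XML", e);
        }
    }

    // Reads the id attribute of the root <product> element.
    static String productId(Document doc) {
        return doc.getDocumentElement().getAttribute("id");
    }

    // Returns the text of the first element with the given name, or null if there is none.
    static String childText(Document doc, String name) {
        NodeList nodes = doc.getDocumentElement().getElementsByTagName(name);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent();
    }
}
```

A handleMessage(Document) method could call productId and childText directly on the Document it receives; the parse method is only needed when starting from a String.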
When you use a custom ADT as payload, a few additional configuration settings are needed. When creating the queue and its queue table, you specify the custom type as the "queue_payload_type". This custom type is defined using a regular "CREATE TYPE" statement. In the code example that follows we have defined a PRODUCT_TYPE type:
create or replace TYPE PRODUCT_TYPE AS OBJECT ( id INTEGER, description VARCHAR(50), price DECIMAL(12,2) );
When enqueuing messages the JmsTemplate can be configured with a message converter. This message converter should be of type MappingAdtMessageConverter, configured with the specific DatumMapper that you would like to use. This DatumMapper can be a custom implementation or the provided StructDatumMapper, which maps between bean properties and STRUCT attributes of the same name. The DatumMapper interface has the following methods declared:
public interface DatumMapper { public Datum toDatum(Object object, Connection conn) throws SQLException; public Object fromDatum(Datum datum) throws SQLException; }
The toDatum method will be called with the Object to convert to a STRUCT as the first parameter and the current connection as the second. It's up to the mapping implementation to extract the object properties and to create the STRUCT. For the fromDatum method the STRUCT is passed in, and the implementation is responsible for retrieving the attributes and constructing an instance of the required class.
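The idea of pairing bean properties with STRUCT attributes of the same name can be pictured with a JDK-only sketch. This is not how StructDatumMapper is implemented and it does not create an Oracle STRUCT; it only collects getter values under upper-cased attribute names. The BeanAttributeSketch class is hypothetical, and the Product bean shown here is an assumed shape whose properties mirror the PRODUCT_TYPE attributes.

```java
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Assumed shape of the Product bean; property names mirror the PRODUCT_TYPE attributes.
class Product {
    private Long id;
    private String description;
    private BigDecimal price;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getDescription() { return description; }
    public void setDescription(String description) { this.description = description; }
    public BigDecimal getPrice() { return price; }
    public void setPrice(BigDecimal price) { this.price = price; }
}

// Hypothetical illustration only: pairs getter values with attribute names.
class BeanAttributeSketch {

    // Collects the values of all public getXxx() methods, keyed by the
    // upper-cased property name (for example getPrice() becomes PRICE).
    static Map<String, Object> toAttributes(Object bean) {
        Map<String, Object> attributes = new TreeMap<>();
        for (Method m : bean.getClass().getMethods()) {
            String name = m.getName();
            boolean getter = name.startsWith("get") && name.length() > 3
                    && m.getParameterCount() == 0 && !name.equals("getClass");
            if (!getter) {
                continue;
            }
            try {
                attributes.put(name.substring(3).toUpperCase(Locale.ROOT), m.invoke(bean));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot read property via " + name, e);
            }
        }
        return attributes;
    }
}
```

A custom DatumMapper would do the equivalent work in toDatum, placing the values in the order the database type declares them, and the reverse in fromDatum.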
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> <property name="messageConverter"> <bean id="messageConverter" class="org.springframework.data.jdbc.jms.support.converter.oracle.MappingAdtMessageConverter"> <constructor-arg> <bean class="org.springframework.data.jdbc.jms.support.oracle.StructDatumMapper"> <constructor-arg index="0" value="JMSADMIN.PRODUCT_TYPE"/> <constructor-arg index="1" value="org.springframework.data.jdbc.test.domain.Product"/> </bean> </constructor-arg> </bean> </property> </bean>
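The JmsTemplate holds a reference to the configured connection factory. The messageConverter property contains the declaration of a MappingAdtMessageConverter, and its constructor argument is the declaration of the specific DatumMapper to use, here a StructDatumMapper.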
Once the JmsTemplate is configured, the ADT value can be sent using the convertAndSend method. In this example we are passing in a Product instance, which the message converter maps to the PRODUCT_TYPE payload.
Product product = new Product(); product.setId(22L); product.setDescription("Foo"); product.setPrice(new BigDecimal("42.95")); jmsTemplate.convertAndSend("jmsadmin.jms_product_queue", product);
When you want to dequeue messages using a message listener container, you need to configure an AdtMessageListenerContainer that can dequeue the messages and convert the ADT payload.
<bean id="messageDelegate" class="org.springframework.data.jdbc.test.adt.MessageDelegate" /> <jms:listener-container connection-factory="connectionFactory" transaction-manager="transactionManager" message-converter="messageConverter" container-class="org.springframework.data.jdbc.jms.listener.oracle.AdtMessageListenerContainer"> <jms:listener destination="jmsadmin.jms_product_queue" ref="messageDelegate" method="handleMessage"> </jms:listener> </jms:listener-container> <bean id="messageConverter" class="org.springframework.data.jdbc.jms.support.converter.oracle.MappingAdtMessageConverter"> <constructor-arg> <bean class="org.springframework.data.jdbc.jms.support.oracle.StructDatumMapper"> <constructor-arg index="0" value="JMSADMIN.PRODUCT_TYPE"/> <constructor-arg index="1" value="org.springframework.data.jdbc.test.domain.Product"/> </bean> </constructor-arg> </bean>
The listener container holds a reference to the configured connection factory. The container-class attribute configures the class to use for the container; this is the custom class AdtMessageListenerContainer.
Here is an example of the message delegate used in the above message listener container:
public class MessageDelegate { @Autowired private DomainService domainService; public void handleMessage(Product product) throws MessageConversionException, JMSException { domainService.saveProduct(product); } }
As you can see, the method that handles the message takes a Product as its parameter. The conversion from the STRUCT to a Product is handled by the MessageListenerAdapter since we specified a message converter.