Inbound Channel Adapter

The main function of an inbound channel adapter is to execute a SQL SELECT query and turn the result set into a message. The message payload is the whole result set (expressed as a List), and the types of the items in the list depend on the row-mapping strategy. The default strategy is a generic mapper that returns a Map for each row in the query result. Optionally, you can change this by adding a reference to a RowMapper instance (see the Spring JDBC documentation for more detailed information about row mapping).

If you want to convert rows in the SELECT query result to individual messages, you can use a downstream splitter.

The inbound adapter also requires a reference to either a JdbcTemplate instance or a DataSource.

As well as the SELECT statement to generate the messages, the adapter also has an UPDATE statement that marks the records as processed so that they do not show up in the next poll. The update can be parameterized by the list of IDs from the original select. By default, this is done through a naming convention (a column in the input result set called id is translated into a list in the parameter map for the update called id). The following example defines an inbound channel adapter with an update query and a DataSource reference.

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />
The parameters in the update query are specified with a colon (:) prefix to the name of a parameter (which, in the preceding example, is an expression to be applied to each of the rows in the polled result set). This is a standard feature of the named parameter JDBC support in Spring JDBC, combined with a convention (projection onto the polled result list) adopted in Spring Integration. The underlying Spring JDBC features limit the available expressions (for example, most special characters other than a period are disallowed), but since the target is usually a list of objects (possibly a list of one) that are addressable by bean paths this is not unduly restrictive.

To change the parameter generation strategy, you can inject a SqlParameterSourceFactory into the adapter to override the default behavior (the adapter has a sql-parameter-source-factory attribute). Spring Integration provides ExpressionEvaluatingSqlParameterSourceFactory, which creates a SpEL-based parameter source, with the results of the query as the #root object. (If update-per-row is true, the root object is the row). If the same parameter name appears multiple times in the update query, it is evaluated only once, and its result is cached.

You can also use a parameter source for the select query. In this case, since there is no “result” object to evaluate against, a single parameter source is used each time (rather than using a parameter source factory). Starting with version 4.0, you can use Spring to create a SpEL based parameter source, as the following example shows:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
		class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

The value in each parameter expression can be any valid SpEL expression. The #root object for the expression evaluation is the constructor argument defined on the parameterSource bean. It is static for all evaluations (in the preceding example, an empty String).

Starting with version 5.0, you can supply ExpressionEvaluatingSqlParameterSourceFactory with sqlParameterTypes to specify the target SQL type for the particular parameter.

The following example provides SQL types for the parameters being used in the query:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
        class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>
Use the createParameterSourceNoCache factory method. Otherwise, the parameter source caches the result of the evaluation. Also note that, because caching is disabled, if the same parameter name appears in the select query multiple times, it is re-evaluated for each occurrence.

Polling and Transactions

The inbound adapter accepts a regular Spring Integration poller as a child element. Consequently, the frequency of the polling can be controlled (among other uses). An important feature of the poller for JDBC usage is the option to wrap the poll operation in a transaction, as the following example shows:

<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>
If you do not explicitly specify a poller, a default value is used. As is normal with Spring Integration, it can be defined as a top-level bean).

In the preceding example, the database is polled every 1000 milliseconds (or once a second), and the update and select queries are both executed in the same transaction. The transaction manager configuration is not shown. However, as long as it is aware of the data source, the poll is transactional. A common use case is for the downstream channels to be direct channels (the default), so that the endpoints are invoked in the same thread and, hence, the same transaction. That way, if any of them fail, the transaction rolls back and the input data is reverted to its original state.

max-rows Versus max-messages-per-poll

The JDBC inbound channel adapter defines an attribute called max-rows. When you specify the adapter’s poller, you can also define a property called max-messages-per-poll. While these two attributes look similar, their meaning is quite different.

max-messages-per-poll specifies the number of times the query is executed per polling interval, whereas max-rows specifies the number of rows returned for each execution.

Under normal circumstances, you would likely not want to set the poller’s max-messages-per-poll property when you use the JDBC inbound channel adapter. Its default value is 1, which means that the JDBC inbound channel adapter’s receive() method is executed exactly once for each poll interval.

Setting the max-messages-per-poll attribute to a larger value means that the query is executed that many times back to back. For more information regarding the max-messages-per-poll attribute, see Configuring An Inbound Channel Adapter.

In contrast, the max-rows attribute, if greater than 0, specifies the maximum number of rows to be used from the query result set created by the receive() method. If the attribute is set to 0, all rows are included in the resulting message. The attribute defaults to 0.

It is recommended to use result set limiting via vendor-specific query options, for example MySQL LIMIT or SQL Server TOP or Oracle’s ROWNUM. See the particular vendor documentation for more information.