Database

As in most enterprise application styles, a database is the central storage mechanism for batch. However, batch differs from other application styles in the sheer size of the datasets with which the system must work. If a SQL statement returns 1 million rows, the result set probably holds all returned results in memory until all rows have been read. Spring Batch provides two types of solutions for this problem:

Cursor-based ItemReader Implementations

Using a database cursor is generally the default approach of most batch developers, because it is the database’s solution to the problem of 'streaming' relational data. The Java ResultSet class is essentially an object-oriented mechanism for manipulating a cursor. A ResultSet maintains a cursor to the current row of data. Calling next on a ResultSet moves this cursor to the next row. The Spring Batch cursor-based ItemReader implementation opens a cursor on initialization and moves the cursor forward one row for every call to read, returning a mapped object that can be used for processing. The close method must then be called to ensure that all resources are freed. The Spring core JdbcTemplate handles this cleanup by using the callback pattern to map all rows in a ResultSet and close it before returning control to the method caller. In batch, however, closing must wait until the step is complete. The following image shows a generic diagram of how a cursor-based ItemReader works. Note that, while the example uses SQL (because SQL is so widely known), any technology could implement the basic approach.

Figure 1. Cursor Example

This example illustrates the basic pattern. Given a 'FOO' table, which has three columns: ID, NAME, and BAR, select all rows with an ID greater than 1 but less than 7. This puts the beginning of the cursor (row 1) on ID 2. The result of this row should be a completely mapped Foo object. Calling read() again moves the cursor to the next row, which is the Foo with an ID of 3. The results of these reads are written out after each read, allowing the objects to be garbage collected (assuming no instance variables are maintaining references to them).
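The open/read/close pattern described above can be sketched without any database at all. The following is a minimal illustration, not a Spring Batch class: Foo, FooCursorReader, and CursorSketch are hypothetical names, and an in-memory list stands in for the FOO table. The reader advances one row per call to read and returns null when the cursor is exhausted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a mapped row of the FOO table.
final class Foo {
    final int id;
    final String name;
    final String bar;

    Foo(int id, String name, String bar) {
        this.id = id;
        this.name = name;
        this.bar = bar;
    }
}

// Hypothetical, simplified cursor-style reader (not part of Spring Batch).
final class FooCursorReader {
    private final List<Foo> table;
    private Iterator<Foo> cursor;

    FooCursorReader(List<Foo> table) {
        this.table = table;
    }

    // Plays the role of opening a cursor for "ID greater than 1 but less than 7".
    void open() {
        cursor = table.stream().filter(f -> f.id > 1 && f.id < 7).iterator();
    }

    // Moves the cursor forward one row per call; null signals that no rows remain.
    Foo read() {
        return cursor.hasNext() ? cursor.next() : null;
    }

    // Releases the cursor.
    void close() {
        cursor = null;
    }
}

final class CursorSketch {
    static List<Integer> readAllIds() {
        List<Foo> table = new ArrayList<>();
        for (int id = 1; id <= 8; id++) {
            table.add(new Foo(id, "name" + id, "bar" + id));
        }
        FooCursorReader reader = new FooCursorReader(table);
        List<Integer> ids = new ArrayList<>();
        reader.open();
        try {
            Foo foo;
            while ((foo = reader.read()) != null) {
                // "Write out" the item; only its ID is kept, so the Foo itself can be collected.
                ids.add(foo.id);
            }
        } finally {
            reader.close();
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(readAllIds()); // prints [2, 3, 4, 5, 6]
    }
}
```

Only one Foo is referenced at a time inside the loop, which is the property that keeps memory use flat regardless of how many rows the query matches.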

JdbcCursorItemReader

JdbcCursorItemReader is the JDBC implementation of the cursor-based technique. It works directly with a ResultSet and requires an SQL statement to run against a connection obtained from a DataSource. The following database schema is used as an example:

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

Many people prefer to use a domain object for each row, so the following example uses an implementation of the RowMapper interface to map a CustomerCredit object:

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

Because JdbcCursorItemReader shares key interfaces with JdbcTemplate, it is useful to see an example of how to read in this data with JdbcTemplate, in order to contrast it with the ItemReader. For the purposes of this example, assume there are 1,000 rows in the CUSTOMER table. The first example uses JdbcTemplate:

// For simplicity's sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List<CustomerCredit> customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                                          new CustomerCreditRowMapper());

After running the preceding code snippet, the customerCredits list contains 1,000 CustomerCredit objects. In the query method, a connection is obtained from the DataSource, the provided SQL is run against it, and the mapRow method is called for each row in the ResultSet. Contrast this with the approach of the JdbcCursorItemReader, shown in the following example:

JdbcCursorItemReader<CustomerCredit> itemReader = new JdbcCursorItemReader<>();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
// read() returns null once the cursor is exhausted, so only non-null items are counted
while (itemReader.read() != null) {
    counter++;
}
itemReader.close();

After running the preceding code snippet, the counter equals 1,000. If the code above had put the returned customerCredit into a list, the result would have been exactly the same as with the JdbcTemplate example. However, the big advantage of the ItemReader is that it allows items to be 'streamed'. The read method can be called once, the item can be written out by an ItemWriter, and then the next item can be obtained with read. This allows item reading and writing to be done in 'chunks' and committed periodically, which is the essence of high performance batch processing. Furthermore, it is easily configured for injection into a Spring Batch Step.
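The read-then-write-in-chunks idea can be illustrated with a small loop. This is a simplified sketch under stated assumptions, not Spring Batch's actual step implementation: ChunkLoopSketch is a hypothetical name, a Supplier stands in for ItemReader.read(), a Consumer stands in for an ItemWriter, and each call to the writer represents one chunk that would be committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class ChunkLoopSketch {

    // Reads until the reader returns null, handing every full chunk (and the final
    // partial chunk, if any) to the writer. Returns the number of chunks written.
    static <T> int process(Supplier<T> reader, Consumer<List<T>> writer, int chunkSize) {
        int chunksWritten = 0;
        List<T> chunk = new ArrayList<>(chunkSize);
        T item;
        while ((item = reader.get()) != null) {
            chunk.add(item);
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk)); // a real step would commit here
                chunksWritten++;
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(new ArrayList<>(chunk));
            chunksWritten++;
        }
        return chunksWritten;
    }

    // Feeds the integers 1..itemCount through the loop and records each chunk's size.
    static List<Integer> chunkSizes(int itemCount, int chunkSize) {
        int[] next = {1};
        Supplier<Integer> reader = () -> {
            if (next[0] > itemCount) {
                return null; // same end-of-data convention as ItemReader.read()
            }
            return next[0]++;
        };
        List<Integer> sizes = new ArrayList<>();
        process(reader, chunk -> sizes.add(chunk.size()), chunkSize);
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(10, 3)); // prints [3, 3, 3, 1]
    }
}
```

At no point does the loop hold more than chunkSize items, which is the contrast with the JdbcTemplate example, where all rows are mapped into one list before control returns.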

The following example shows how to inject an ItemReader into a Step in Java:

Java Configuration
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
	return new JdbcCursorItemReaderBuilder<CustomerCredit>()
			.dataSource(this.dataSource)
			.name("creditReader")
			.sql("select ID, NAME, CREDIT from CUSTOMER")
			.rowMapper(new CustomerCreditRowMapper())
			.build();
}

The following example shows how to inject an ItemReader into a Step in XML:

XML Configuration
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

Additional Properties

Because there are so many varying options for opening a cursor in Java, there are many properties on the JdbcCursorItemReader that can be set, as described in the following table:

Table 1. JdbcCursorItemReader Properties

ignoreWarnings

Determines whether or not SQLWarnings are logged or cause an exception. The default is true (meaning that warnings are logged).

fetchSize

Gives the JDBC driver a hint as to the number of rows that should be fetched from the database when more rows are needed by the ResultSet object used by the ItemReader. By default, no hint is given.

maxRows

Sets the limit for the maximum number of rows the underlying ResultSet can hold at any one time.

queryTimeout

Sets the number of seconds the driver waits for a Statement object to run. If the limit is exceeded, a DataAccessException is thrown. (Consult your driver vendor documentation for details).

verifyCursorPosition

Because the same ResultSet held by the ItemReader is passed to the RowMapper, it is possible for users to call ResultSet.next() themselves, which could cause issues with the reader’s internal count. Setting this value to true causes an exception to be thrown if the cursor position is not the same after the RowMapper call as it was before.

saveState

Indicates whether or not the reader’s state should be saved in the ExecutionContext provided by ItemStream#update(ExecutionContext). The default is true.

driverSupportsAbsolute

Indicates whether the JDBC driver supports setting the absolute row on a ResultSet. It is recommended that this is set to true for JDBC drivers that support ResultSet.absolute(), as it may improve performance, especially if a step fails while working with a large data set. Defaults to false.

useSharedExtendedConnection

Indicates whether the connection used for the cursor should be used by all other processing, thus sharing the same transaction. If this is set to false, then the cursor is opened with its own connection and does not participate in any transactions started for the rest of the step processing. If you set this flag to true then you must wrap the DataSource in an ExtendedConnectionDataSourceProxy to prevent the connection from being closed and released after each commit. When you set this option to true, the statement used to open the cursor is created with both 'READ_ONLY' and 'HOLD_CURSORS_OVER_COMMIT' options. This allows holding the cursor open over transaction start and commits performed in the step processing. To use this feature, you need a database that supports this and a JDBC driver supporting JDBC 3.0 or later. Defaults to false.
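As a configuration sketch, several of the properties in the table can be set through JdbcCursorItemReaderBuilder. The class name TunedReaderConfiguration, the bean name, and the specific values (500 rows, 60 seconds) are illustrative assumptions rather than recommendations; suitable values depend on the driver and the data set. CustomerCredit and CustomerCreditRowMapper are the types from the earlier examples and are assumed to be in the same package.

```java
import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TunedReaderConfiguration {

	@Bean
	public JdbcCursorItemReader<CustomerCredit> tunedItemReader(DataSource dataSource) {
		return new JdbcCursorItemReaderBuilder<CustomerCredit>()
				.name("tunedCreditReader")
				.dataSource(dataSource)
				.sql("select ID, NAME, CREDIT from CUSTOMER")
				.rowMapper(new CustomerCreditRowMapper())
				// Hint to the driver: fetch 500 rows per round trip (illustrative value).
				.fetchSize(500)
				// Give up if the statement runs longer than 60 seconds (illustrative value).
				.queryTimeout(60)
				// Fail fast if the RowMapper moves the cursor itself.
				.verifyCursorPosition(true)
				// Keep restart state in the ExecutionContext (the documented default).
				.saveState(true)
				.build();
	}
}
```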

StoredProcedureItemReader

Sometimes it is necessary to obtain the cursor data by using a stored procedure. The StoredProcedureItemReader works like the JdbcCursorItemReader, except that, instead of running a query to obtain a cursor, it runs a stored procedure that returns a cursor. The stored procedure can return the cursor in three different ways:

  • As a returned ResultSet (used by SQL Server, Sybase, DB2, Derby, and MySQL).

  • As a ref-cursor returned as an out parameter (used by Oracle and PostgreSQL).

  • As the return value of a stored function call.

The following Java example configuration uses the same 'customer credit' example as earlier examples:

Java Configuration
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());

	return reader;
}

The following XML example configuration uses the same 'customer credit' example as earlier examples:

XML Configuration
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

The preceding example relies on the stored procedure to provide a ResultSet as a returned result (option 1 from earlier).

If the stored procedure returned a ref-cursor (option 2), then we would need to provide the position of the out parameter that is the returned ref-cursor.

The following example shows how to work with the first parameter being a ref-cursor in Java:

Java Configuration
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setRefCursorPosition(1);

	return reader;
}

The following example shows how to work with the first parameter being a ref-cursor in XML:

XML Configuration
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

If the cursor was returned from a stored function (option 3), we would need to set the property "function" to true. It defaults to false.

The following example shows how to set the function property to true in Java:

Java Configuration
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setFunction(true);

	return reader;
}

The following example shows how to set the function property to true in XML:

XML Configuration
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="sp_customer_credit"/>
    <property name="function" value="true"/>
    <property name="rowMapper">
        <bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
    </property>
</bean>

In all of these cases, we need to define a RowMapper as well as a DataSource and the actual procedure name.

If the stored procedure or function takes in parameters, then they must be declared and set by using the parameters property. The following example, for Oracle, declares three parameters. The first one is the out parameter that returns the ref-cursor, and the second and third are in parameters that each take a value of type INTEGER.

The following example shows how to work with parameters in Java:

Java Configuration
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	List<SqlParameter> parameters = new ArrayList<>();
	parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
	parameters.add(new SqlParameter("amount", Types.INTEGER));
	parameters.add(new SqlParameter("custId", Types.INTEGER));

	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("spring.cursor_func");
	reader.setParameters(parameters);
	reader.setRefCursorPosition(1);
	reader.setRowMapper(rowMapper());
	reader.setPreparedStatementSetter(parameterSetter());

	return reader;
}

The following example shows how to work with parameters in XML:

XML Configuration
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="procedureName" value="spring.cursor_func"/>
    <property name="parameters">
        <list>
            <bean class="org.springframework.jdbc.core.SqlOutParameter">
                <constructor-arg index="0" value="newid"/>
                <constructor-arg index="1">
                    <util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="amount"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
            <bean class="org.springframework.jdbc.core.SqlParameter">
                <constructor-arg index="0" value="custid"/>
                <constructor-arg index="1">
                    <util:constant static-field="java.sql.Types.INTEGER"/>
                </constructor-arg>
            </bean>
        </list>
    </property>
    <property name="refCursorPosition" value="1"/>
    <property name="rowMapper" ref="rowMapper"/>
    <property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>

In addition to the parameter declarations, we need to specify a PreparedStatementSetter implementation that sets the parameter values for the call. This works the same as for the JdbcCursorItemReader above. All the additional properties listed in Additional Properties apply to the StoredProcedureItemReader as well.

Paging ItemReader Implementations

An alternative to using a database cursor is running multiple queries where each query fetches a portion of the results. We refer to this portion as a page. Each query must specify the starting row number and the number of rows that we want returned in the page.

JdbcPagingItemReader

One implementation of a paging ItemReader is the JdbcPagingItemReader. The JdbcPagingItemReader needs a PagingQueryProvider responsible for providing the SQL queries used to retrieve the rows making up a page. Since each database has its own strategy for providing paging support, we need to use a different PagingQueryProvider for each supported database type. There is also the SqlPagingQueryProviderFactoryBean, which auto-detects the database that is being used and determines the appropriate PagingQueryProvider implementation. This simplifies the configuration and is the recommended best practice.

The SqlPagingQueryProviderFactoryBean requires that you specify a select clause and a from clause. You can also provide an optional where clause. These clauses and the required sortKey are used to build an SQL statement.

It is important to have a unique key constraint on the sortKey to guarantee that no data is lost between executions.

After the reader has been opened, it passes back one item per call to read in the same basic fashion as any other ItemReader. The paging happens behind the scenes when additional rows are needed.
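The role of the sort key can be illustrated with a small in-memory sketch. It assumes that each page after the first is selected by "sort key greater than the last value already read", which is roughly the idea behind sort-key-based paging; the exact SQL is produced by the PagingQueryProvider for each database and is not reproduced here. KeysetPagingSketch and its methods are hypothetical names, and a sorted list of IDs stands in for the table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class KeysetPagingSketch {

    // Plays the role of one page query:
    // "rows with id > lastId, ordered by id, limited to pageSize".
    static List<Long> fetchPage(List<Long> sortedIds, long lastId, int pageSize) {
        return sortedIds.stream()
                .filter(id -> id > lastId)
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    // Issues page queries until one comes back empty.
    static List<List<Long>> readAllPages(List<Long> sortedIds, int pageSize) {
        List<List<Long>> pages = new ArrayList<>();
        long lastId = Long.MIN_VALUE;
        while (true) {
            List<Long> page = fetchPage(sortedIds, lastId, pageSize);
            if (page.isEmpty()) {
                break;
            }
            pages.add(page);
            lastId = page.get(page.size() - 1); // remember where the next page starts
        }
        return pages;
    }

    // Total number of rows returned across all pages.
    static int countRead(List<Long> sortedIds, int pageSize) {
        return readAllPages(sortedIds, pageSize).stream().mapToInt(List::size).sum();
    }

    public static void main(String[] args) {
        // Unique keys: every row is read exactly once.
        System.out.println(readAllPages(Arrays.asList(1L, 2L, 3L, 5L, 8L, 13L, 21L), 3));
        // prints [[1, 2, 3], [5, 8, 13], [21]]

        // Duplicate keys: two rows with key 2 fall between pages and are never read.
        System.out.println(countRead(Arrays.asList(1L, 2L, 2L, 2L, 3L), 2)); // prints 3
    }
}
```

The second call shows the failure mode that a unique key constraint on the sortKey prevents: when a page boundary lands inside a run of equal keys, the rows sharing that key are skipped.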

The following Java example configuration uses a similar 'customer credit' example as the cursor-based ItemReaders shown previously:

Java Configuration
@Bean
public JdbcPagingItemReader<CustomerCredit> itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
	Map<String, Object> parameterValues = new HashMap<>();
	parameterValues.put("status", "NEW");

	return new JdbcPagingItemReaderBuilder<CustomerCredit>()
			.name("creditReader")
			.dataSource(dataSource)
			.queryProvider(queryProvider)
			.parameterValues(parameterValues)
			.rowMapper(customerCreditMapper())
			.pageSize(1000)
			.build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
	SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

	provider.setSelectClause("select id, name, credit");
	provider.setFromClause("from customer");
	provider.setWhereClause("where status=:status");
	provider.setSortKey("id");

	return provider;
}

The following XML example configuration uses a similar 'customer credit' example as the cursor-based ItemReaders shown previously:

XML Configuration
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="queryProvider">
        <bean class="org.spr...SqlPagingQueryProviderFactoryBean">
            <property name="selectClause" value="select id, name, credit"/>
            <property name="fromClause" value="from customer"/>
            <property name="whereClause" value="where status=:status"/>
            <property name="sortKey" value="id"/>
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="status" value="NEW"/>
        </map>
    </property>
    <property name="pageSize" value="1000"/>
    <property name="rowMapper" ref="customerMapper"/>
</bean>

This configured ItemReader returns CustomerCredit objects using the RowMapper, which must be specified. The 'pageSize' property determines the number of entities read from the database for each query run.

The 'parameterValues' property can be used to specify a Map of parameter values for the query. If you use named parameters in the where clause, the key for each entry should match the name of the named parameter. If you use a traditional '?' placeholder, then the key for each entry should be the number of the placeholder, starting with 1.

JpaPagingItemReader

Another implementation of a paging ItemReader is the JpaPagingItemReader. JPA does not have a concept similar to the Hibernate StatelessSession, so we have to use other features provided by the JPA specification. Since JPA supports paging, this is a natural choice when it comes to using JPA for batch processing. After each page is read, the entities become detached and the persistence context is cleared, to allow the entities to be garbage collected once the page is processed.

The JpaPagingItemReader lets you declare a JPQL statement and pass in an EntityManagerFactory. It then passes back one item per call to read in the same basic fashion as any other ItemReader. The paging happens behind the scenes when additional entities are needed.

The following Java example configuration uses the same 'customer credit' example as the JDBC reader shown previously:

Java Configuration
@Bean
public JpaPagingItemReader<CustomerCredit> itemReader() {
	return new JpaPagingItemReaderBuilder<CustomerCredit>()
			.name("creditReader")
			.entityManagerFactory(entityManagerFactory())
			.queryString("select c from CustomerCredit c")
			.pageSize(1000)
			.build();
}

The following XML example configuration uses the same 'customer credit' example as the JDBC reader shown previously:

XML Configuration
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
    <property name="entityManagerFactory" ref="entityManagerFactory"/>
    <property name="queryString" value="select c from CustomerCredit c"/>
    <property name="pageSize" value="1000"/>
</bean>

This configured ItemReader returns CustomerCredit objects in the exact same manner as described for the JdbcPagingItemReader above, assuming the CustomerCredit object has the correct JPA annotations or ORM mapping file. The 'pageSize' property determines the number of entities read from the database for each query execution.

Database ItemWriters

While both flat files and XML files have a specific ItemWriter instance, there is no exact equivalent in the database world. This is because transactions provide all the needed functionality. ItemWriter implementations are necessary for files because they must act as if they’re transactional, keeping track of written items and flushing or clearing at the appropriate times. Databases have no need for this functionality, since the write is already contained in a transaction. Users can create their own DAOs that implement the ItemWriter interface or use one from a custom ItemWriter that’s written for generic processing concerns. Either way, they should work without any issues. One thing to look out for is the performance and error handling capabilities that are provided by batching the outputs. This is most common when using Hibernate as an ItemWriter, but the same issues can arise when using JDBC batch mode. Batching database output does not have any inherent flaws, assuming we are careful to flush and there are no errors in the data. However, any errors while writing can cause confusion, because there is no way to know which individual item caused an exception or even if any individual item was responsible, as illustrated in the following image:

Figure 2. Error On Flush

If items are buffered before being written, any errors are not thrown until the buffer is flushed just before a commit. For example, assume that 20 items are written per chunk, and the 15th item throws a DataIntegrityViolationException. As far as the Step is concerned, all 20 items are written successfully, since there is no way to know that an error occurs until they are actually written. Once Session#flush() is called, the buffer is emptied and the exception is hit. At this point, there is nothing the Step can do. The transaction must be rolled back. Normally, this exception might cause the item to be skipped (depending upon the skip/retry policies), and then it is not written again. However, in the batched scenario, there is no way to know which item caused the issue. The whole buffer was being written when the failure happened. The only way to solve this issue is to flush after each item, as shown in the following image:

Figure 3. Error On Write

This is a common use case, especially when using Hibernate, and the simple guideline for implementations of ItemWriter is to flush on each call to write(). Doing so allows for items to be skipped reliably, with Spring Batch internally taking care of the granularity of the calls to ItemWriter after an error.
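The difference between the two figures can be sketched in plain Java. This is an illustration under stated assumptions, not Hibernate or Spring Batch code: FlushSketch is a hypothetical name, negative numbers stand in for rows that violate a constraint, and the flush method deliberately reports a failure without saying which item caused it, as a failed database batch would. The sketch also does not model the rollback and re-processing that a real Step performs after an error.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class FlushSketch {

    // Hypothetical stand-in for a database flush: the whole batch is rejected if any
    // value is negative, and the exception does not identify the offending item.
    static void flush(List<Integer> batch) {
        for (int value : batch) {
            if (value < 0) {
                throw new IllegalStateException("integrity violation in batch of " + batch.size());
            }
        }
    }

    // Buffered style: one flush per chunk. The caller learns only that the chunk failed.
    static boolean writeBuffered(List<Integer> chunk) {
        try {
            flush(chunk);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    // Flush-per-item style: returns the 1-based positions of the items that failed,
    // so each failure can be attributed (and skipped) individually.
    static List<Integer> writeFlushingEachItem(List<Integer> chunk) {
        List<Integer> failedPositions = new ArrayList<>();
        for (int i = 0; i < chunk.size(); i++) {
            try {
                flush(Collections.singletonList(chunk.get(i)));
            } catch (IllegalStateException e) {
                failedPositions.add(i + 1);
            }
        }
        return failedPositions;
    }

    // Twenty items per chunk, with the 15th one invalid, as in the example above.
    static List<Integer> sampleChunk() {
        List<Integer> chunk = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            chunk.add(i == 15 ? -1 : i);
        }
        return chunk;
    }

    public static void main(String[] args) {
        System.out.println(writeBuffered(sampleChunk()));         // prints false
        System.out.println(writeFlushingEachItem(sampleChunk())); // prints [15]
    }
}
```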