View Javadoc

1   package org.springframework.batch.sample.common;
2   
3   import javax.sql.DataSource;
4   
5   import org.springframework.batch.item.ItemProcessor;
6   import org.springframework.beans.factory.InitializingBean;
7   import org.springframework.dao.OptimisticLockingFailureException;
8   import org.springframework.jdbc.core.JdbcOperations;
9   import org.springframework.jdbc.core.JdbcTemplate;
10  import org.springframework.util.Assert;
11  
12  /**
13   * Marks the input row as 'processed'. (This change will rollback if there is
14   * problem later)
15   *
16   * @param <T> item type
17   *
18   * @see StagingItemReader
19   * @see StagingItemWriter
20   * @see ProcessIndicatorItemWrapper
21   *
22   * @author Robert Kasanicky
23   */
24  public class StagingItemProcessor<T> implements ItemProcessor<ProcessIndicatorItemWrapper<T>, T>, InitializingBean {
25  
26  	private JdbcOperations jdbcTemplate;
27  
28  	public void setJdbcTemplate(JdbcOperations jdbcTemplate) {
29  		this.jdbcTemplate = jdbcTemplate;
30  	}
31  
32  	public void setDataSource(DataSource dataSource) {
33  		this.jdbcTemplate = new JdbcTemplate(dataSource);
34  	}
35  
36  	public void afterPropertiesSet() throws Exception {
37  		Assert.notNull(jdbcTemplate, "Either jdbcTemplate or dataSource must be set");
38  	}
39  
40  	/**
41  	 * Use the technical identifier to mark the input row as processed and
42  	 * return unwrapped item.
43  	 */
44  	public T process(ProcessIndicatorItemWrapper<T> wrapper) throws Exception {
45  
46  		int count = jdbcTemplate.update("UPDATE BATCH_STAGING SET PROCESSED=? WHERE ID=? AND PROCESSED=?",
47  				StagingItemWriter.DONE, wrapper.getId(), StagingItemWriter.NEW);
48  		if (count != 1) {
49  			throw new OptimisticLockingFailureException("The staging record with ID=" + wrapper.getId()
50  					+ " was updated concurrently when trying to mark as complete (updated " + count + " records.");
51  		}
52  		return wrapper.getItem();
53  	}
54  
55  }