View Javadoc

1   /*
2    * Copyright 2006-2007 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.sample.common;
18  
19  import java.io.Serializable;
20  import java.sql.PreparedStatement;
21  import java.sql.SQLException;
22  import java.util.List;
23  import java.util.ListIterator;
24  
25  import org.springframework.batch.core.ExitStatus;
26  import org.springframework.batch.core.StepExecution;
27  import org.springframework.batch.core.StepExecutionListener;
28  import org.springframework.batch.item.ItemWriter;
29  import org.springframework.batch.support.SerializationUtils;
30  import org.springframework.jdbc.core.BatchPreparedStatementSetter;
31  import org.springframework.jdbc.core.support.JdbcDaoSupport;
32  import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
33  import org.springframework.util.Assert;
34  import org.springframework.util.ClassUtils;
35  
36  /**
37   * Database {@link ItemWriter} implementing the process indicator pattern.
38   */
39  public class StagingItemWriter<T> extends JdbcDaoSupport implements StepExecutionListener, ItemWriter<T> {
40  
41  	public static final String NEW = "N";
42  
43  	public static final String DONE = "Y";
44  
45  	public static final Object WORKING = "W";
46  
47  	private DataFieldMaxValueIncrementer incrementer;
48  
49  	private StepExecution stepExecution;
50  
51  	/**
52  	 * Check mandatory properties.
53  	 * 
54  	 * @see org.springframework.dao.support.DaoSupport#initDao()
55  	 */
56  	protected void initDao() throws Exception {
57  		super.initDao();
58  		Assert.notNull(incrementer, "DataFieldMaxValueIncrementer is required - set the incrementer property in the "
59  				+ ClassUtils.getShortName(StagingItemWriter.class));
60  	}
61  
62  	/**
63  	 * Setter for the key generator for the staging table.
64  	 * 
65  	 * @param incrementer the {@link DataFieldMaxValueIncrementer} to set
66  	 */
67  	public void setIncrementer(DataFieldMaxValueIncrementer incrementer) {
68  		this.incrementer = incrementer;
69  	}
70  
71  	/**
72  	 * Serialize the item to the staging table, and add a NEW processed flag.
73  	 * 
74  	 * @see ItemWriter#write(java.util.List)
75  	 */
76  	public void write(final List<? extends T> items) {
77  
78  		final ListIterator<? extends T> itemIterator = items.listIterator();
79  		getJdbcTemplate().batchUpdate("INSERT into BATCH_STAGING (ID, JOB_ID, VALUE, PROCESSED) values (?,?,?,?)",
80  				new BatchPreparedStatementSetter() {
81  
82  					public int getBatchSize() {
83  						return items.size();
84  					}
85  
86  					public void setValues(PreparedStatement ps, int i) throws SQLException {
87  
88  						long id = incrementer.nextLongValue();
89  						long jobId = stepExecution.getJobExecution().getJobId();
90  
91  						Assert.state(itemIterator.nextIndex() == i,
92  								"Item ordering must be preserved in batch sql update");
93  
94  						byte[] blob = SerializationUtils.serialize((Serializable) itemIterator.next());
95  
96  						ps.setLong(1, id);
97  						ps.setLong(2, jobId);
98  						ps.setBytes(3, blob);
99  						ps.setString(4, NEW);
100 					}
101 				});
102 
103 	}
104 
105 	/*
106 	 * (non-Javadoc)
107 	 * 
108 	 * @see
109 	 * org.springframework.batch.core.domain.StepListener#afterStep(StepExecution
110 	 * )
111 	 */
112 	public ExitStatus afterStep(StepExecution stepExecution) {
113 		return null;
114 	}
115 
116 	/*
117 	 * (non-Javadoc)
118 	 * 
119 	 * @seeorg.springframework.batch.core.domain.StepListener#beforeStep(org.
120 	 * springframework.batch.core.domain.StepExecution)
121 	 */
122 	public void beforeStep(StepExecution stepExecution) {
123 		this.stepExecution = stepExecution;
124 	}
125 
126 }