View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.springframework.batch.item.database;
17  
18  import java.sql.PreparedStatement;
19  import java.sql.SQLException;
20  import java.util.ArrayList;
21  import java.util.List;
22  
23  import javax.sql.DataSource;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.springframework.batch.item.ItemWriter;
28  import org.springframework.beans.factory.InitializingBean;
29  import org.springframework.dao.DataAccessException;
30  import org.springframework.dao.EmptyResultDataAccessException;
31  import org.springframework.dao.InvalidDataAccessApiUsageException;
32  import org.springframework.jdbc.core.PreparedStatementCallback;
33  import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;
34  import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
35  import org.springframework.jdbc.core.namedparam.SqlParameterSource;
36  import org.springframework.util.Assert;
37  
38  /**
39   * <p>{@link ItemWriter} that uses the batching features from
40   * {@link NamedParameterJdbcTemplate} to execute a batch of statements for all items
41   * provided.</p>
42   *
43   * The user must provide an SQL query and a special callback in the for of either
44   * {@link ItemPreparedStatementSetter}, or a {@link ItemSqlParameterSourceProvider}.
45   * You can use either named parameters or the traditional '?' placeholders. If you use the
46   * named parameter support then you should provide a {@link ItemSqlParameterSourceProvider},
47   * otherwise you should provide a  {@link ItemPreparedStatementSetter}.
48   * This callback would be responsible for mapping the item to the parameters needed to
49   * execute the SQL statement.<br/>
50   *
51   * It is expected that {@link #write(List)} is called inside a transaction.<br/>
52   *
53   * The writer is thread safe after its properties are set (normal singleton
54   * behavior), so it can be used to write in multiple concurrent transactions.
55   *
56   * @author Dave Syer
57   * @author Thomas Risberg
58   * @since 2.0
59   */
60  public class JdbcBatchItemWriter<T> implements ItemWriter<T>, InitializingBean {
61  
62  	protected static final Log logger = LogFactory.getLog(JdbcBatchItemWriter.class);
63  
64  	private NamedParameterJdbcOperations namedParameterJdbcTemplate;
65  
66  	private ItemPreparedStatementSetter<T> itemPreparedStatementSetter;
67  
68  	private ItemSqlParameterSourceProvider<T> itemSqlParameterSourceProvider;
69  
70  	private String sql;
71  
72  	private boolean assertUpdates = true;
73  
74  	private int parameterCount;
75  
76  	private boolean usingNamedParameters;
77  
78  	/**
79  	 * Public setter for the flag that determines whether an assertion is made
80  	 * that all items cause at least one row to be updated.
81  	 * @param assertUpdates the flag to set. Defaults to true;
82  	 */
83  	public void setAssertUpdates(boolean assertUpdates) {
84  		this.assertUpdates = assertUpdates;
85  	}
86  
87  	/**
88  	 * Public setter for the query string to execute on write. The parameters
89  	 * should correspond to those known to the
90  	 * {@link ItemPreparedStatementSetter}.
91  	 * @param sql the query to set
92  	 */
93  	public void setSql(String sql) {
94  		this.sql = sql;
95  	}
96  
97  	/**
98  	 * Public setter for the {@link ItemPreparedStatementSetter}.
99  	 * @param preparedStatementSetter the {@link ItemPreparedStatementSetter} to
100 	 * set. This is required when using traditional '?' placeholders for the SQL statement.
101 	 */
102 	public void setItemPreparedStatementSetter(ItemPreparedStatementSetter<T> preparedStatementSetter) {
103 		this.itemPreparedStatementSetter = preparedStatementSetter;
104 	}
105 
106 	/**
107 	 * Public setter for the {@link ItemSqlParameterSourceProvider}.
108 	 * @param itemSqlParameterSourceProvider the {@link ItemSqlParameterSourceProvider} to
109 	 * set. This is required when using named parameters for the SQL statement.
110 	 */
111 	public void setItemSqlParameterSourceProvider(ItemSqlParameterSourceProvider<T> itemSqlParameterSourceProvider) {
112 		this.itemSqlParameterSourceProvider = itemSqlParameterSourceProvider;
113 	}
114 
115 	/**
116 	 * Public setter for the data source for injection purposes.
117 	 *
118 	 * @param dataSource
119 	 */
120 	public void setDataSource(DataSource dataSource) {
121 		if (namedParameterJdbcTemplate == null) {
122 			this.namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
123 		}
124 	}
125 
126 	/**
127 	 * Public setter for the {@link NamedParameterJdbcOperations}.
128 	 * @param namedParameterJdbcTemplate the {@link NamedParameterJdbcOperations} to set
129 	 */
130 	public void setJdbcTemplate(NamedParameterJdbcOperations namedParameterJdbcTemplate) {
131 		this.namedParameterJdbcTemplate = namedParameterJdbcTemplate;
132 	}
133 
134 	/**
135 	 * Check mandatory properties - there must be a SimpleJdbcTemplate and an SQL statement plus a
136 	 * parameter source.
137 	 */
138 	@Override
139 	public void afterPropertiesSet() {
140 		Assert.notNull(namedParameterJdbcTemplate, "A DataSource or a NamedParameterJdbcTemplate is required.");
141 		Assert.notNull(sql, "An SQL statement is required.");
142 		List<String> namedParameters = new ArrayList<String>();
143 		parameterCount = JdbcParameterUtils.countParameterPlaceholders(sql, namedParameters);
144 		if (namedParameters.size() > 0) {
145 			if (parameterCount != namedParameters.size()) {
146 				throw new InvalidDataAccessApiUsageException("You can't use both named parameters and classic \"?\" placeholders: " + sql);
147 			}
148 			usingNamedParameters = true;
149 		}
150 		if (usingNamedParameters) {
151 			Assert.notNull(itemSqlParameterSourceProvider, "Using SQL statement with named parameters requires an ItemSqlParameterSourceProvider");
152 		}
153 		else {
154 			Assert.notNull(itemPreparedStatementSetter, "Using SQL statement with '?' placeholders requires an ItemPreparedStatementSetter");
155 		}
156 	}
157 
158 	/* (non-Javadoc)
159 	 * @see org.springframework.batch.item.ItemWriter#write(java.util.List)
160 	 */
161 	@Override
162 	public void write(final List<? extends T> items) throws Exception {
163 
164 		if (!items.isEmpty()) {
165 
166 			if (logger.isDebugEnabled()) {
167 				logger.debug("Executing batch with " + items.size() + " items.");
168 			}
169 
170 			int[] updateCounts = null;
171 
172 			if (usingNamedParameters) {
173 				SqlParameterSource[] batchArgs = new SqlParameterSource[items.size()];
174 				int i = 0;
175 				for (T item : items) {
176 					batchArgs[i++] = itemSqlParameterSourceProvider.createSqlParameterSource(item);
177 				}
178 				updateCounts = namedParameterJdbcTemplate.batchUpdate(sql, batchArgs);
179 			}
180 			else {
181 				updateCounts = (int[]) namedParameterJdbcTemplate.getJdbcOperations().execute(sql, new PreparedStatementCallback() {
182 					@Override
183 					public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
184 						for (T item : items) {
185 							itemPreparedStatementSetter.setValues(item, ps);
186 							ps.addBatch();
187 						}
188 						return ps.executeBatch();
189 					}
190 				});
191 			}
192 
193 			if (assertUpdates) {
194 				for (int i = 0; i < updateCounts.length; i++) {
195 					int value = updateCounts[i];
196 					if (value == 0) {
197 						throw new EmptyResultDataAccessException("Item " + i + " of " + updateCounts.length
198 								+ " did not update any rows: [" + items.get(i) + "]", 1);
199 					}
200 				}
201 			}
202 		}
203 	}
204 }