View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.step.item;
18  
19  import java.util.List;
20  
21  import org.springframework.batch.core.StepContribution;
22  import org.springframework.batch.core.StepListener;
23  import org.springframework.batch.core.listener.MulticasterBatchListener;
24  import org.springframework.batch.item.ItemProcessor;
25  import org.springframework.batch.item.ItemWriter;
26  import org.springframework.beans.factory.InitializingBean;
27  import org.springframework.util.Assert;
28  
29  /**
30   * Simple implementation of the {@link ChunkProcessor} interface that handles
31   * basic item writing and processing. Any exceptions encountered will be
32   * rethrown.
33   *
34   * @see ChunkOrientedTasklet
35   */
36  public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {
37  
38  	private ItemProcessor<? super I, ? extends O> itemProcessor;
39  
40  	private ItemWriter<? super O> itemWriter;
41  
42  	private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<I, O>();
43  
44  	/**
45  	 * Default constructor for ease of configuration (both itemWriter and
46  	 * itemProcessor are mandatory).
47  	 */
48  	@SuppressWarnings("unused")
49  	private SimpleChunkProcessor() {
50  		this(null, null);
51  	}
52  
53  	public SimpleChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) {
54  		this.itemProcessor = itemProcessor;
55  		this.itemWriter = itemWriter;
56  	}
57  
58  	/**
59  	 * @param itemProcessor the {@link ItemProcessor} to set
60  	 */
61  	public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) {
62  		this.itemProcessor = itemProcessor;
63  	}
64  
65  	/**
66  	 * @param itemWriter the {@link ItemWriter} to set
67  	 */
68  	public void setItemWriter(ItemWriter<? super O> itemWriter) {
69  		this.itemWriter = itemWriter;
70  	}
71  
72  	/**
73  	 * Check mandatory properties.
74  	 *
75  	 * @see InitializingBean#afterPropertiesSet()
76  	 */
77  	@Override
78  	public void afterPropertiesSet() throws Exception {
79  		Assert.notNull(itemWriter, "ItemWriter must be set");
80  		Assert.notNull(itemProcessor, "ItemProcessor must be set");
81  	}
82  
83  	/**
84  	 * Register some {@link StepListener}s with the handler. Each will get the
85  	 * callbacks in the order specified at the correct stage.
86  	 *
87  	 * @param listeners
88  	 */
89  	public void setListeners(List<? extends StepListener> listeners) {
90  		for (StepListener listener : listeners) {
91  			registerListener(listener);
92  		}
93  	}
94  
95  	/**
96  	 * Register a listener for callbacks at the appropriate stages in a process.
97  	 *
98  	 * @param listener a {@link StepListener}
99  	 */
100 	public void registerListener(StepListener listener) {
101 		this.listener.register(listener);
102 	}
103 
104 	/**
105 	 * @return the listener
106 	 */
107 	protected MulticasterBatchListener<I, O> getListener() {
108 		return listener;
109 	}
110 
111 	/**
112 	 * @param item the input item
113 	 * @return the result of the processing
114 	 * @throws Exception
115 	 */
116 	protected final O doProcess(I item) throws Exception {
117 
118 		if (itemProcessor == null) {
119 			@SuppressWarnings("unchecked")
120 			O result = (O) item;
121 			return result;
122 		}
123 
124 		try {
125 			listener.beforeProcess(item);
126 			O result = itemProcessor.process(item);
127 			listener.afterProcess(item, result);
128 			return result;
129 		}
130 		catch (Exception e) {
131 			listener.onProcessError(item, e);
132 			throw e;
133 		}
134 
135 	}
136 
137 	/**
138 	 * Surrounds the actual write call with listener callbacks.
139 	 *
140 	 * @param items
141 	 * @throws Exception
142 	 */
143 	protected final void doWrite(List<O> items) throws Exception {
144 
145 		if (itemWriter == null) {
146 			return;
147 		}
148 
149 		try {
150 			listener.beforeWrite(items);
151 			writeItems(items);
152 			doAfterWrite(items);
153 		}
154 		catch (Exception e) {
155 			doOnWriteError(e, items);
156 			throw e;
157 		}
158 
159 	}
160 
161 	/**
162 	 * Call the listener's after write method.
163 	 *
164 	 * @param items
165 	 */
166 	protected final void doAfterWrite(List<O> items) {
167 		listener.afterWrite(items);
168 	}
169 	protected final void doOnWriteError(Exception e, List<O> items) {
170 		listener.onWriteError(e, items);
171 	}
172 
173 	protected void writeItems(List<O> items) throws Exception {
174 		if (itemWriter != null) {
175 			itemWriter.write(items);
176 		}
177 	}
178 
179 	@Override
180 	public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
181 
182 		// Allow temporary state to be stored in the user data field
183 		initializeUserData(inputs);
184 
185 		// If there is no input we don't have to do anything more
186 		if (isComplete(inputs)) {
187 			return;
188 		}
189 
190 		// Make the transformation, calling remove() on the inputs iterator if
191 		// any items are filtered. Might throw exception and cause rollback.
192 		Chunk<O> outputs = transform(contribution, inputs);
193 
194 		// Adjust the filter count based on available data
195 		contribution.incrementFilterCount(getFilterCount(inputs, outputs));
196 
197 		// Adjust the outputs if necessary for housekeeping purposes, and then
198 		// write them out...
199 		write(contribution, inputs, getAdjustedOutputs(inputs, outputs));
200 
201 	}
202 
203 	/**
204 	 * Extension point for subclasses to allow them to memorise the contents of
205 	 * the inputs, in case they are needed for accounting purposes later. The
206 	 * default implementation sets up some user data to remember the original
207 	 * size of the inputs. If this method is overridden then some or all of
208 	 * {@link #isComplete(Chunk)}, {@link #getFilterCount(Chunk, Chunk)} and
209 	 * {@link #getAdjustedOutputs(Chunk, Chunk)} might also need to be, to
210 	 * ensure that the user data is handled consistently.
211 	 *
212 	 * @param inputs the inputs for the process
213 	 */
214 	protected void initializeUserData(Chunk<I> inputs) {
215 		inputs.setUserData(inputs.size());
216 	}
217 
218 	/**
219 	 * Extension point for subclasses to calculate the filter count. Defaults to
220 	 * the difference between input size and output size.
221 	 *
222 	 * @param inputs the inputs after transformation
223 	 * @param outputs the outputs after transformation
224 	 *
225 	 * @return the difference in sizes
226 	 *
227 	 * @see #initializeUserData(Chunk)
228 	 */
229 	protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) {
230 		return (Integer) inputs.getUserData() - outputs.size();
231 	}
232 
233 	/**
234 	 * Extension point for subclasses that want to store additional data in the
235 	 * inputs. Default just checks if inputs are empty.
236 	 *
237 	 * @param inputs the input chunk
238 	 * @return true if it is empty
239 	 *
240 	 * @see #initializeUserData(Chunk)
241 	 */
242 	protected boolean isComplete(Chunk<I> inputs) {
243 		return inputs.isEmpty();
244 	}
245 
246 	/**
247 	 * Extension point for subclasses that want to adjust the outputs based on
248 	 * additional saved data in the inputs. Default implementation just returns
249 	 * the outputs unchanged.
250 	 *
251 	 * @param inputs the inputs for the transformation
252 	 * @param outputs the result of the transformation
253 	 * @return the outputs unchanged
254 	 *
255 	 * @see #initializeUserData(Chunk)
256 	 */
257 	protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) {
258 		return outputs;
259 	}
260 
261 	/**
262 	 * Simple implementation delegates to the {@link #doWrite(List)} method and
263 	 * increments the write count in the contribution. Subclasses can handle
264 	 * more complicated scenarios, e.g.with fault tolerance. If output items are
265 	 * skipped they should be removed from the inputs as well.
266 	 *
267 	 * @param contribution the current step contribution
268 	 * @param inputs the inputs that gave rise to the ouputs
269 	 * @param outputs the outputs to write
270 	 * @throws Exception if there is a problem
271 	 */
272 	protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
273 		try {
274 			doWrite(outputs.getItems());
275 		}
276 		catch (Exception e) {
277 			/*
278 			 * For a simple chunk processor (no fault tolerance) we are done
279 			 * here, so prevent any more processing of these inputs.
280 			 */
281 			inputs.clear();
282 			throw e;
283 		}
284 		contribution.incrementWriteCount(outputs.size());
285 	}
286 
287 	protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
288 		Chunk<O> outputs = new Chunk<O>();
289 		for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
290 			final I item = iterator.next();
291 			O output;
292 			try {
293 				output = doProcess(item);
294 			}
295 			catch (Exception e) {
296 				/*
297 				 * For a simple chunk processor (no fault tolerance) we are done
298 				 * here, so prevent any more processing of these inputs.
299 				 */
300 				inputs.clear();
301 				throw e;
302 			}
303 			if (output != null) {
304 				outputs.add(output);
305 			}
306 			else {
307 				iterator.remove();
308 			}
309 		}
310 		return outputs;
311 	}
312 
313 }