1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.item; |
18 | |
19 | import java.util.List; |
20 | |
21 | import org.springframework.batch.core.StepContribution; |
22 | import org.springframework.batch.core.StepListener; |
23 | import org.springframework.batch.core.listener.MulticasterBatchListener; |
24 | import org.springframework.batch.item.ItemProcessor; |
25 | import org.springframework.batch.item.ItemWriter; |
26 | import org.springframework.beans.factory.InitializingBean; |
27 | import org.springframework.util.Assert; |
28 | |
29 | /** |
30 | * Simple implementation of the {@link ChunkProcessor} interface that handles |
31 | * basic item writing and processing. Any exceptions encountered will be |
32 | * rethrown. |
33 | * |
34 | * @see ChunkOrientedTasklet |
35 | */ |
36 | public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean { |
37 | |
38 | private ItemProcessor<? super I, ? extends O> itemProcessor; |
39 | |
40 | private ItemWriter<? super O> itemWriter; |
41 | |
42 | private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<I, O>(); |
43 | |
44 | /** |
45 | * Default constructor for ease of configuration (both itemWriter and |
46 | * itemProcessor are mandatory). |
47 | */ |
48 | @SuppressWarnings("unused") |
49 | private SimpleChunkProcessor() { |
50 | this(null, null); |
51 | } |
52 | |
53 | public SimpleChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) { |
54 | this.itemProcessor = itemProcessor; |
55 | this.itemWriter = itemWriter; |
56 | } |
57 | |
58 | /** |
59 | * @param itemProcessor the {@link ItemProcessor} to set |
60 | */ |
61 | public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) { |
62 | this.itemProcessor = itemProcessor; |
63 | } |
64 | |
65 | /** |
66 | * @param itemWriter the {@link ItemWriter} to set |
67 | */ |
68 | public void setItemWriter(ItemWriter<? super O> itemWriter) { |
69 | this.itemWriter = itemWriter; |
70 | } |
71 | |
72 | /** |
73 | * Check mandatory properties. |
74 | * |
75 | * @see InitializingBean#afterPropertiesSet() |
76 | */ |
77 | @Override |
78 | public void afterPropertiesSet() throws Exception { |
79 | Assert.notNull(itemWriter, "ItemWriter must be set"); |
80 | Assert.notNull(itemProcessor, "ItemProcessor must be set"); |
81 | } |
82 | |
83 | /** |
84 | * Register some {@link StepListener}s with the handler. Each will get the |
85 | * callbacks in the order specified at the correct stage. |
86 | * |
87 | * @param listeners |
88 | */ |
89 | public void setListeners(List<? extends StepListener> listeners) { |
90 | for (StepListener listener : listeners) { |
91 | registerListener(listener); |
92 | } |
93 | } |
94 | |
95 | /** |
96 | * Register a listener for callbacks at the appropriate stages in a process. |
97 | * |
98 | * @param listener a {@link StepListener} |
99 | */ |
100 | public void registerListener(StepListener listener) { |
101 | this.listener.register(listener); |
102 | } |
103 | |
104 | /** |
105 | * @return the listener |
106 | */ |
107 | protected MulticasterBatchListener<I, O> getListener() { |
108 | return listener; |
109 | } |
110 | |
111 | /** |
112 | * @param item the input item |
113 | * @return the result of the processing |
114 | * @throws Exception |
115 | */ |
116 | protected final O doProcess(I item) throws Exception { |
117 | |
118 | if (itemProcessor == null) { |
119 | @SuppressWarnings("unchecked") |
120 | O result = (O) item; |
121 | return result; |
122 | } |
123 | |
124 | try { |
125 | listener.beforeProcess(item); |
126 | O result = itemProcessor.process(item); |
127 | listener.afterProcess(item, result); |
128 | return result; |
129 | } |
130 | catch (Exception e) { |
131 | listener.onProcessError(item, e); |
132 | throw e; |
133 | } |
134 | |
135 | } |
136 | |
137 | /** |
138 | * Surrounds the actual write call with listener callbacks. |
139 | * |
140 | * @param items |
141 | * @throws Exception |
142 | */ |
143 | protected final void doWrite(List<O> items) throws Exception { |
144 | |
145 | if (itemWriter == null) { |
146 | return; |
147 | } |
148 | |
149 | try { |
150 | listener.beforeWrite(items); |
151 | writeItems(items); |
152 | doAfterWrite(items); |
153 | } |
154 | catch (Exception e) { |
155 | doOnWriteError(e, items); |
156 | throw e; |
157 | } |
158 | |
159 | } |
160 | |
161 | /** |
162 | * Call the listener's after write method. |
163 | * |
164 | * @param items |
165 | */ |
166 | protected final void doAfterWrite(List<O> items) { |
167 | listener.afterWrite(items); |
168 | } |
169 | protected final void doOnWriteError(Exception e, List<O> items) { |
170 | listener.onWriteError(e, items); |
171 | } |
172 | |
173 | protected void writeItems(List<O> items) throws Exception { |
174 | if (itemWriter != null) { |
175 | itemWriter.write(items); |
176 | } |
177 | } |
178 | |
179 | @Override |
180 | public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception { |
181 | |
182 | // Allow temporary state to be stored in the user data field |
183 | initializeUserData(inputs); |
184 | |
185 | // If there is no input we don't have to do anything more |
186 | if (isComplete(inputs)) { |
187 | return; |
188 | } |
189 | |
190 | // Make the transformation, calling remove() on the inputs iterator if |
191 | // any items are filtered. Might throw exception and cause rollback. |
192 | Chunk<O> outputs = transform(contribution, inputs); |
193 | |
194 | // Adjust the filter count based on available data |
195 | contribution.incrementFilterCount(getFilterCount(inputs, outputs)); |
196 | |
197 | // Adjust the outputs if necessary for housekeeping purposes, and then |
198 | // write them out... |
199 | write(contribution, inputs, getAdjustedOutputs(inputs, outputs)); |
200 | |
201 | } |
202 | |
203 | /** |
204 | * Extension point for subclasses to allow them to memorise the contents of |
205 | * the inputs, in case they are needed for accounting purposes later. The |
206 | * default implementation sets up some user data to remember the original |
207 | * size of the inputs. If this method is overridden then some or all of |
208 | * {@link #isComplete(Chunk)}, {@link #getFilterCount(Chunk, Chunk)} and |
209 | * {@link #getAdjustedOutputs(Chunk, Chunk)} might also need to be, to |
210 | * ensure that the user data is handled consistently. |
211 | * |
212 | * @param inputs the inputs for the process |
213 | */ |
214 | protected void initializeUserData(Chunk<I> inputs) { |
215 | inputs.setUserData(inputs.size()); |
216 | } |
217 | |
218 | /** |
219 | * Extension point for subclasses to calculate the filter count. Defaults to |
220 | * the difference between input size and output size. |
221 | * |
222 | * @param inputs the inputs after transformation |
223 | * @param outputs the outputs after transformation |
224 | * |
225 | * @return the difference in sizes |
226 | * |
227 | * @see #initializeUserData(Chunk) |
228 | */ |
229 | protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) { |
230 | return (Integer) inputs.getUserData() - outputs.size(); |
231 | } |
232 | |
233 | /** |
234 | * Extension point for subclasses that want to store additional data in the |
235 | * inputs. Default just checks if inputs are empty. |
236 | * |
237 | * @param inputs the input chunk |
238 | * @return true if it is empty |
239 | * |
240 | * @see #initializeUserData(Chunk) |
241 | */ |
242 | protected boolean isComplete(Chunk<I> inputs) { |
243 | return inputs.isEmpty(); |
244 | } |
245 | |
246 | /** |
247 | * Extension point for subclasses that want to adjust the outputs based on |
248 | * additional saved data in the inputs. Default implementation just returns |
249 | * the outputs unchanged. |
250 | * |
251 | * @param inputs the inputs for the transformation |
252 | * @param outputs the result of the transformation |
253 | * @return the outputs unchanged |
254 | * |
255 | * @see #initializeUserData(Chunk) |
256 | */ |
257 | protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) { |
258 | return outputs; |
259 | } |
260 | |
261 | /** |
262 | * Simple implementation delegates to the {@link #doWrite(List)} method and |
263 | * increments the write count in the contribution. Subclasses can handle |
264 | * more complicated scenarios, e.g.with fault tolerance. If output items are |
265 | * skipped they should be removed from the inputs as well. |
266 | * |
267 | * @param contribution the current step contribution |
268 | * @param inputs the inputs that gave rise to the ouputs |
269 | * @param outputs the outputs to write |
270 | * @throws Exception if there is a problem |
271 | */ |
272 | protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception { |
273 | try { |
274 | doWrite(outputs.getItems()); |
275 | } |
276 | catch (Exception e) { |
277 | /* |
278 | * For a simple chunk processor (no fault tolerance) we are done |
279 | * here, so prevent any more processing of these inputs. |
280 | */ |
281 | inputs.clear(); |
282 | throw e; |
283 | } |
284 | contribution.incrementWriteCount(outputs.size()); |
285 | } |
286 | |
287 | protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception { |
288 | Chunk<O> outputs = new Chunk<O>(); |
289 | for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) { |
290 | final I item = iterator.next(); |
291 | O output; |
292 | try { |
293 | output = doProcess(item); |
294 | } |
295 | catch (Exception e) { |
296 | /* |
297 | * For a simple chunk processor (no fault tolerance) we are done |
298 | * here, so prevent any more processing of these inputs. |
299 | */ |
300 | inputs.clear(); |
301 | throw e; |
302 | } |
303 | if (output != null) { |
304 | outputs.add(output); |
305 | } |
306 | else { |
307 | iterator.remove(); |
308 | } |
309 | } |
310 | return outputs; |
311 | } |
312 | |
313 | } |