1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.item; |
18 | |
19 | import java.util.List; |
20 | |
21 | import org.springframework.batch.core.StepContribution; |
22 | import org.springframework.batch.core.StepListener; |
23 | import org.springframework.batch.core.listener.MulticasterBatchListener; |
24 | import org.springframework.batch.item.ItemProcessor; |
25 | import org.springframework.batch.item.ItemWriter; |
26 | import org.springframework.beans.factory.InitializingBean; |
27 | import org.springframework.util.Assert; |
28 | |
29 | /** |
30 | * Simple implementation of the {@link ChunkProcessor} interface that handles |
31 | * basic item writing and processing. Any exceptions encountered will be |
32 | * rethrown. |
33 | * |
34 | * @see ChunkOrientedTasklet |
35 | */ |
36 | public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean { |
37 | |
38 | private ItemProcessor<? super I, ? extends O> itemProcessor; |
39 | |
40 | private ItemWriter<? super O> itemWriter; |
41 | |
42 | private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<I, O>(); |
43 | |
44 | /** |
45 | * Default constructor for ease of configuration (both itemWriter and |
46 | * itemProcessor are mandatory). |
47 | */ |
48 | @SuppressWarnings("unused") |
49 | private SimpleChunkProcessor() { |
50 | this(null, null); |
51 | } |
52 | |
53 | public SimpleChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) { |
54 | this.itemProcessor = itemProcessor; |
55 | this.itemWriter = itemWriter; |
56 | } |
57 | |
58 | /** |
59 | * @param itemProcessor the {@link ItemProcessor} to set |
60 | */ |
61 | public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) { |
62 | this.itemProcessor = itemProcessor; |
63 | } |
64 | |
65 | /** |
66 | * @param itemWriter the {@link ItemWriter} to set |
67 | */ |
68 | public void setItemWriter(ItemWriter<? super O> itemWriter) { |
69 | this.itemWriter = itemWriter; |
70 | } |
71 | |
72 | /** |
73 | * Check mandatory properties. |
74 | * |
75 | * @see InitializingBean#afterPropertiesSet() |
76 | */ |
77 | public void afterPropertiesSet() throws Exception { |
78 | Assert.notNull(itemWriter, "ItemWriter must be set"); |
79 | Assert.notNull(itemProcessor, "ItemProcessor must be set"); |
80 | } |
81 | |
82 | /** |
83 | * Register some {@link StepListener}s with the handler. Each will get the |
84 | * callbacks in the order specified at the correct stage. |
85 | * |
86 | * @param listeners |
87 | */ |
88 | public void setListeners(List<? extends StepListener> listeners) { |
89 | for (StepListener listener : listeners) { |
90 | registerListener(listener); |
91 | } |
92 | } |
93 | |
94 | /** |
95 | * Register a listener for callbacks at the appropriate stages in a process. |
96 | * |
97 | * @param listener a {@link StepListener} |
98 | */ |
99 | public void registerListener(StepListener listener) { |
100 | this.listener.register(listener); |
101 | } |
102 | |
103 | /** |
104 | * @return the listener |
105 | */ |
106 | protected MulticasterBatchListener<I, O> getListener() { |
107 | return listener; |
108 | } |
109 | |
110 | /** |
111 | * @param item the input item |
112 | * @return the result of the processing |
113 | * @throws Exception |
114 | */ |
115 | protected final O doProcess(I item) throws Exception { |
116 | |
117 | if (itemProcessor == null) { |
118 | @SuppressWarnings("unchecked") |
119 | O result = (O) item; |
120 | return result; |
121 | } |
122 | |
123 | try { |
124 | listener.beforeProcess(item); |
125 | O result = itemProcessor.process(item); |
126 | listener.afterProcess(item, result); |
127 | return result; |
128 | } |
129 | catch (Exception e) { |
130 | listener.onProcessError(item, e); |
131 | throw e; |
132 | } |
133 | |
134 | } |
135 | |
136 | /** |
137 | * Surrounds the actual write call with listener callbacks. |
138 | * |
139 | * @param items |
140 | * @throws Exception |
141 | */ |
142 | protected final void doWrite(List<O> items) throws Exception { |
143 | |
144 | if (itemWriter == null) { |
145 | return; |
146 | } |
147 | |
148 | try { |
149 | listener.beforeWrite(items); |
150 | writeItems(items); |
151 | doAfterWrite(items); |
152 | } |
153 | catch (Exception e) { |
154 | doOnWriteError(e, items); |
155 | throw e; |
156 | } |
157 | |
158 | } |
159 | |
160 | /** |
161 | * Call the listener's after write method. |
162 | * |
163 | * @param items |
164 | */ |
165 | protected final void doAfterWrite(List<O> items) { |
166 | listener.afterWrite(items); |
167 | } |
168 | protected final void doOnWriteError(Exception e, List<O> items) { |
169 | listener.onWriteError(e, items); |
170 | } |
171 | |
172 | protected void writeItems(List<O> items) throws Exception { |
173 | if (itemWriter != null) { |
174 | itemWriter.write(items); |
175 | } |
176 | } |
177 | |
178 | public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception { |
179 | |
180 | // Allow temporary state to be stored in the user data field |
181 | initializeUserData(inputs); |
182 | |
183 | // If there is no input we don't have to do anything more |
184 | if (isComplete(inputs)) { |
185 | return; |
186 | } |
187 | |
188 | // Make the transformation, calling remove() on the inputs iterator if |
189 | // any items are filtered. Might throw exception and cause rollback. |
190 | Chunk<O> outputs = transform(contribution, inputs); |
191 | |
192 | // Adjust the filter count based on available data |
193 | contribution.incrementFilterCount(getFilterCount(inputs, outputs)); |
194 | |
195 | // Adjust the outputs if necessary for housekeeping purposes, and then |
196 | // write them out... |
197 | write(contribution, inputs, getAdjustedOutputs(inputs, outputs)); |
198 | |
199 | } |
200 | |
201 | /** |
202 | * Extension point for subclasses to allow them to memorise the contents of |
203 | * the inputs, in case they are needed for accounting purposes later. The |
204 | * default implementation sets up some user data to remember the original |
205 | * size of the inputs. If this method is overridden then some or all of |
206 | * {@link #isComplete(Chunk)}, {@link #getFilterCount(Chunk, Chunk)} and |
207 | * {@link #getAdjustedOutputs(Chunk, Chunk)} might also need to be, to |
208 | * ensure that the user data is handled consistently. |
209 | * |
210 | * @param inputs the inputs for the process |
211 | */ |
212 | protected void initializeUserData(Chunk<I> inputs) { |
213 | inputs.setUserData(inputs.size()); |
214 | } |
215 | |
216 | /** |
217 | * Extension point for subclasses to calculate the filter count. Defaults to |
218 | * the difference between input size and output size. |
219 | * |
220 | * @param inputs the inputs after transformation |
221 | * @param outputs the outputs after transformation |
222 | * |
223 | * @return the difference in sizes |
224 | * |
225 | * @see #initializeUserData(Chunk) |
226 | */ |
227 | protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) { |
228 | return (Integer) inputs.getUserData() - outputs.size(); |
229 | } |
230 | |
231 | /** |
232 | * Extension point for subclasses that want to store additional data in the |
233 | * inputs. Default just checks if inputs are empty. |
234 | * |
235 | * @param inputs the input chunk |
236 | * @return true if it is empty |
237 | * |
238 | * @see #initializeUserData(Chunk) |
239 | */ |
240 | protected boolean isComplete(Chunk<I> inputs) { |
241 | return inputs.isEmpty(); |
242 | } |
243 | |
244 | /** |
245 | * Extension point for subclasses that want to adjust the outputs based on |
246 | * additional saved data in the inputs. Default implementation just returns |
247 | * the outputs unchanged. |
248 | * |
249 | * @param inputs the inputs for the transformation |
250 | * @param outputs the result of the transformation |
251 | * @return the outputs unchanged |
252 | * |
253 | * @see #initializeUserData(Chunk) |
254 | */ |
255 | protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) { |
256 | return outputs; |
257 | } |
258 | |
259 | /** |
260 | * Simple implementation delegates to the {@link #doWrite(List)} method and |
261 | * increments the write count in the contribution. Subclasses can handle |
262 | * more complicated scenarios, e.g.with fault tolerance. If output items are |
263 | * skipped they should be removed from the inputs as well. |
264 | * |
265 | * @param contribution the current step contribution |
266 | * @param inputs the inputs that gave rise to the ouputs |
267 | * @param outputs the outputs to write |
268 | * @throws Exception if there is a problem |
269 | */ |
270 | protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception { |
271 | try { |
272 | doWrite(outputs.getItems()); |
273 | } |
274 | catch (Exception e) { |
275 | /* |
276 | * For a simple chunk processor (no fault tolerance) we are done |
277 | * here, so prevent any more processing of these inputs. |
278 | */ |
279 | inputs.clear(); |
280 | throw e; |
281 | } |
282 | contribution.incrementWriteCount(outputs.size()); |
283 | } |
284 | |
285 | protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception { |
286 | Chunk<O> outputs = new Chunk<O>(); |
287 | for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) { |
288 | final I item = iterator.next(); |
289 | O output; |
290 | try { |
291 | output = doProcess(item); |
292 | } |
293 | catch (Exception e) { |
294 | /* |
295 | * For a simple chunk processor (no fault tolerance) we are done |
296 | * here, so prevent any more processing of these inputs. |
297 | */ |
298 | inputs.clear(); |
299 | throw e; |
300 | } |
301 | if (output != null) { |
302 | outputs.add(output); |
303 | } |
304 | else { |
305 | iterator.remove(); |
306 | } |
307 | } |
308 | return outputs; |
309 | } |
310 | |
311 | } |