1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.step.item; |
18 | |
19 | import java.util.List; |
20 | |
21 | import org.springframework.batch.core.StepContribution; |
22 | import org.springframework.batch.core.StepListener; |
23 | import org.springframework.batch.core.listener.MulticasterBatchListener; |
24 | import org.springframework.batch.item.ItemProcessor; |
25 | import org.springframework.batch.item.ItemWriter; |
26 | import org.springframework.beans.factory.InitializingBean; |
27 | import org.springframework.util.Assert; |
28 | |
29 | /** |
30 | * Simple implementation of the {@link ChunkProcessor} interface that handles |
31 | * basic item writing and processing. Any exceptions encountered will be |
32 | * rethrown. |
33 | * |
34 | * @see ChunkOrientedTasklet |
35 | */ |
36 | public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean { |
37 | |
38 | private ItemProcessor<? super I, ? extends O> itemProcessor; |
39 | |
40 | private ItemWriter<? super O> itemWriter; |
41 | |
42 | private final MulticasterBatchListener<I, O> listener = new MulticasterBatchListener<I, O>(); |
43 | |
44 | /** |
45 | * Default constructor for ease of configuration (both itemWriter and |
46 | * itemProcessor are mandatory). |
47 | */ |
48 | @SuppressWarnings("unused") |
49 | private SimpleChunkProcessor() { |
50 | this(null, null); |
51 | } |
52 | |
53 | public SimpleChunkProcessor(ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) { |
54 | this.itemProcessor = itemProcessor; |
55 | this.itemWriter = itemWriter; |
56 | } |
57 | |
58 | /** |
59 | * @param itemProcessor the {@link ItemProcessor} to set |
60 | */ |
61 | public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) { |
62 | this.itemProcessor = itemProcessor; |
63 | } |
64 | |
65 | /** |
66 | * @param itemWriter the {@link ItemWriter} to set |
67 | */ |
68 | public void setItemWriter(ItemWriter<? super O> itemWriter) { |
69 | this.itemWriter = itemWriter; |
70 | } |
71 | |
72 | /** |
73 | * Check mandatory properties. |
74 | * |
75 | * @see InitializingBean#afterPropertiesSet() |
76 | */ |
77 | public void afterPropertiesSet() throws Exception { |
78 | Assert.notNull(itemWriter, "ItemWriter must be set"); |
79 | Assert.notNull(itemProcessor, "ItemProcessor must be set"); |
80 | } |
81 | |
82 | /** |
83 | * Register some {@link StepListener}s with the handler. Each will get the |
84 | * callbacks in the order specified at the correct stage. |
85 | * |
86 | * @param listeners |
87 | */ |
88 | public void setListeners(List<? extends StepListener> listeners) { |
89 | for (StepListener listener : listeners) { |
90 | registerListener(listener); |
91 | } |
92 | } |
93 | |
94 | /** |
95 | * Register a listener for callbacks at the appropriate stages in a process. |
96 | * |
97 | * @param listener a {@link StepListener} |
98 | */ |
99 | public void registerListener(StepListener listener) { |
100 | this.listener.register(listener); |
101 | } |
102 | |
103 | /** |
104 | * @return the listener |
105 | */ |
106 | protected MulticasterBatchListener<I, O> getListener() { |
107 | return listener; |
108 | } |
109 | |
110 | /** |
111 | * @param item the input item |
112 | * @return the result of the processing |
113 | * @throws Exception |
114 | */ |
115 | protected final O doProcess(I item) throws Exception { |
116 | try { |
117 | listener.beforeProcess(item); |
118 | O result = itemProcessor.process(item); |
119 | listener.afterProcess(item, result); |
120 | return result; |
121 | } |
122 | catch (Exception e) { |
123 | listener.onProcessError(item, e); |
124 | throw e; |
125 | } |
126 | } |
127 | |
128 | /** |
129 | * Surrounds the actual write call with listener callbacks. |
130 | * |
131 | * @param items |
132 | * @throws Exception |
133 | */ |
134 | protected final void doWrite(List<O> items) throws Exception { |
135 | try { |
136 | listener.beforeWrite(items); |
137 | writeItems(items); |
138 | doAfterWrite(items); |
139 | } |
140 | catch (Exception e) { |
141 | listener.onWriteError(e, items); |
142 | throw e; |
143 | } |
144 | } |
145 | |
146 | /** |
147 | * Call the listener's after write method. |
148 | * |
149 | * @param items |
150 | */ |
151 | protected final void doAfterWrite(List<O> items) { |
152 | listener.afterWrite(items); |
153 | } |
154 | |
155 | protected void writeItems(List<O> items) throws Exception { |
156 | itemWriter.write(items); |
157 | } |
158 | |
159 | public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception { |
160 | |
161 | // Allow temporary state to be stored in the user data field |
162 | initializeUserData(inputs); |
163 | |
164 | // If there is no input we don't have to do anything more |
165 | if (isComplete(inputs)) { |
166 | return; |
167 | } |
168 | |
169 | // Make the transformation, calling remove() on the inputs iterator if |
170 | // any items are filtered. Might throw exception and cause rollback. |
171 | Chunk<O> outputs = transform(contribution, inputs); |
172 | |
173 | // Adjust the filter count based on available data |
174 | contribution.incrementFilterCount(getFilterCount(inputs, outputs)); |
175 | |
176 | // Adjust the outputs if necessary for housekeeping purposes, and then |
177 | // write them out... |
178 | write(contribution, inputs, getAdjustedOutputs(inputs, outputs)); |
179 | |
180 | } |
181 | |
182 | /** |
183 | * Extension point for subclasses to allow them to memorise the contents of |
184 | * the inputs, in case they are needed for accounting purposes later. The |
185 | * default implementation sets up some user data to remember the original |
186 | * size of the inputs. If this method is overridden then some or all of |
187 | * {@link #isComplete(Chunk)}, {@link #getFilterCount(Chunk, Chunk)} and |
188 | * {@link #getAdjustedOutputs(Chunk, Chunk)} might also need to be, to |
189 | * ensure that the user data is handled consistently. |
190 | * |
191 | * @param inputs the inputs for the process |
192 | */ |
193 | protected void initializeUserData(Chunk<I> inputs) { |
194 | inputs.setUserData(inputs.size()); |
195 | } |
196 | |
197 | /** |
198 | * Extension point for subclasses to calculate the filter count. Defaults to |
199 | * the difference between input size and output size. |
200 | * |
201 | * @param inputs the inputs after transformation |
202 | * @param outputs the outputs after transformation |
203 | * |
204 | * @return the difference in sizes |
205 | * |
206 | * @see #initializeUserData(Chunk) |
207 | */ |
208 | protected int getFilterCount(Chunk<I> inputs, Chunk<O> outputs) { |
209 | return (Integer) inputs.getUserData() - outputs.size(); |
210 | } |
211 | |
212 | /** |
213 | * Extension point for subclasses that want to store additional data in the |
214 | * inputs. Default just checks if inputs are empty. |
215 | * |
216 | * @param inputs the input chunk |
217 | * @return true if it is empty |
218 | * |
219 | * @see #initializeUserData(Chunk) |
220 | */ |
221 | protected boolean isComplete(Chunk<I> inputs) { |
222 | return inputs.isEmpty(); |
223 | } |
224 | |
225 | /** |
226 | * Extension point for subclasses that want to adjust the outputs based on |
227 | * additional saved data in the inputs. Default implementation just returns |
228 | * the outputs unchanged. |
229 | * |
230 | * @param inputs the inputs for the transformation |
231 | * @param outputs the result of the transformation |
232 | * @return the outputs unchanged |
233 | * |
234 | * @see #initializeUserData(Chunk) |
235 | */ |
236 | protected Chunk<O> getAdjustedOutputs(Chunk<I> inputs, Chunk<O> outputs) { |
237 | return outputs; |
238 | } |
239 | |
240 | /** |
241 | * Simple implementation delegates to the {@link #doWrite(List)} method and |
242 | * increments the write count in the contribution. Subclasses can handle |
243 | * more complicated scenarios, e.g.with fault tolerance. If output items are |
244 | * skipped they should be removed from the inputs as well. |
245 | * |
246 | * @param contribution the current step contribution |
247 | * @param inputs the inputs that gave rise to the ouputs |
248 | * @param outputs the outputs to write |
249 | * @throws Exception if there is a problem |
250 | */ |
251 | protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception { |
252 | doWrite(outputs.getItems()); |
253 | contribution.incrementWriteCount(outputs.size()); |
254 | } |
255 | |
256 | protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception { |
257 | Chunk<O> outputs = new Chunk<O>(); |
258 | for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) { |
259 | final I item = iterator.next(); |
260 | O output = doProcess(item); |
261 | if (output != null) { |
262 | outputs.add(output); |
263 | } |
264 | else { |
265 | iterator.remove(); |
266 | } |
267 | } |
268 | return outputs; |
269 | } |
270 | |
271 | } |