1 | /* |
2 | * Copyright 2006-2014 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step.builder; |
17 | |
18 | import org.springframework.batch.core.ChunkListener; |
19 | import org.springframework.batch.core.ItemProcessListener; |
20 | import org.springframework.batch.core.ItemReadListener; |
21 | import org.springframework.batch.core.ItemWriteListener; |
22 | import org.springframework.batch.core.StepExecutionListener; |
23 | import org.springframework.batch.core.StepListener; |
24 | import org.springframework.batch.core.annotation.AfterProcess; |
25 | import org.springframework.batch.core.annotation.AfterRead; |
26 | import org.springframework.batch.core.annotation.AfterWrite; |
27 | import org.springframework.batch.core.annotation.BeforeProcess; |
28 | import org.springframework.batch.core.annotation.BeforeRead; |
29 | import org.springframework.batch.core.annotation.BeforeWrite; |
30 | import org.springframework.batch.core.annotation.OnProcessError; |
31 | import org.springframework.batch.core.annotation.OnReadError; |
32 | import org.springframework.batch.core.annotation.OnWriteError; |
33 | import org.springframework.batch.core.listener.StepListenerFactoryBean; |
34 | import org.springframework.batch.core.step.item.ChunkOrientedTasklet; |
35 | import org.springframework.batch.core.step.item.SimpleChunkProcessor; |
36 | import org.springframework.batch.core.step.item.SimpleChunkProvider; |
37 | import org.springframework.batch.core.step.tasklet.Tasklet; |
38 | import org.springframework.batch.core.step.tasklet.TaskletStep; |
39 | import org.springframework.batch.item.ItemProcessor; |
40 | import org.springframework.batch.item.ItemReader; |
41 | import org.springframework.batch.item.ItemStream; |
42 | import org.springframework.batch.item.ItemWriter; |
43 | import org.springframework.batch.repeat.CompletionPolicy; |
44 | import org.springframework.batch.repeat.RepeatOperations; |
45 | import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; |
46 | import org.springframework.batch.repeat.support.RepeatTemplate; |
47 | import org.springframework.batch.support.ReflectionUtils; |
48 | import org.springframework.util.Assert; |
49 | |
50 | import java.lang.reflect.Method; |
51 | import java.util.ArrayList; |
52 | import java.util.HashSet; |
53 | import java.util.LinkedHashSet; |
54 | import java.util.Set; |
55 | |
56 | /** |
57 | * Step builder for simple item processing (chunk oriented) steps. Items are read and cached in chunks, and then |
58 | * processed (transformed) and written (optionally either the processor or the writer can be omitted) all in the same |
59 | * transaction. |
60 | * |
61 | * @see FaultTolerantStepBuilder for a step that handles retry and skip of failed items |
62 | * |
63 | * @author Dave Syer |
64 | * |
65 | * @since 2.2 |
66 | */ |
67 | public class SimpleStepBuilder<I, O> extends AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> { |
68 | |
69 | private static final int DEFAULT_COMMIT_INTERVAL = 1; |
70 | |
71 | private ItemReader<? extends I> reader; |
72 | |
73 | private ItemWriter<? super O> writer; |
74 | |
75 | private ItemProcessor<? super I, ? extends O> processor; |
76 | |
77 | private int chunkSize = 0; |
78 | |
79 | private RepeatOperations chunkOperations; |
80 | |
81 | private CompletionPolicy completionPolicy; |
82 | |
83 | private Set<StepListener> itemListeners = new LinkedHashSet<StepListener>(); |
84 | |
85 | private boolean readerTransactionalQueue = false; |
86 | |
87 | /** |
88 | * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used. |
89 | * |
90 | * @param parent a parent helper containing common step properties |
91 | */ |
92 | public SimpleStepBuilder(StepBuilderHelper<?> parent) { |
93 | super(parent); |
94 | } |
95 | |
96 | /** |
97 | * Create a new builder initialized with any properties in the parent. The parent is copied, so it can be re-used. |
98 | * |
99 | * @param parent a parent helper containing common step properties |
100 | */ |
101 | protected SimpleStepBuilder(SimpleStepBuilder<I, O> parent) { |
102 | super(parent); |
103 | this.chunkSize = parent.chunkSize; |
104 | this.completionPolicy = parent.completionPolicy; |
105 | this.chunkOperations = parent.chunkOperations; |
106 | this.reader = parent.reader; |
107 | this.writer = parent.writer; |
108 | this.processor = parent.processor; |
109 | this.itemListeners = parent.itemListeners; |
110 | this.readerTransactionalQueue = parent.readerTransactionalQueue; |
111 | } |
112 | |
113 | public FaultTolerantStepBuilder<I, O> faultTolerant() { |
114 | FaultTolerantStepBuilder<I, O> builder = new FaultTolerantStepBuilder<I, O>(this); |
115 | return builder; |
116 | } |
117 | |
118 | /** |
119 | * Build a step with the reader, writer, processor as provided. |
120 | * |
121 | * @see org.springframework.batch.core.step.builder.AbstractTaskletStepBuilder#build() |
122 | */ |
123 | @Override |
124 | public TaskletStep build() { |
125 | registerStepListenerAsItemListener(); |
126 | registerAsStreamsAndListeners(reader, processor, writer); |
127 | return super.build(); |
128 | } |
129 | |
130 | private void registerStepListenerAsItemListener() { |
131 | for (StepExecutionListener stepExecutionListener: properties.getStepExecutionListeners()){ |
132 | checkAndAddItemListener(stepExecutionListener); |
133 | } |
134 | for (ChunkListener chunkListener: this.chunkListeners){ |
135 | checkAndAddItemListener(chunkListener); |
136 | } |
137 | } |
138 | |
139 | @SuppressWarnings("unchecked") |
140 | private void checkAndAddItemListener(StepListener stepListener) { |
141 | if (stepListener instanceof ItemReadListener){ |
142 | listener((ItemReadListener<I>)stepListener); |
143 | } |
144 | if (stepListener instanceof ItemProcessListener){ |
145 | listener((ItemProcessListener<I,O>)stepListener); |
146 | } |
147 | if (stepListener instanceof ItemWriteListener){ |
148 | listener((ItemWriteListener<O>)stepListener); |
149 | } |
150 | } |
151 | |
152 | @Override |
153 | protected Tasklet createTasklet() { |
154 | Assert.state(reader != null, "ItemReader must be provided"); |
155 | Assert.state(processor != null || writer != null, "ItemWriter or ItemProcessor must be provided"); |
156 | RepeatOperations repeatOperations = createChunkOperations(); |
157 | SimpleChunkProvider<I> chunkProvider = new SimpleChunkProvider<I>(reader, repeatOperations); |
158 | SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor<I, O>(processor, writer); |
159 | chunkProvider.setListeners(new ArrayList<StepListener>(itemListeners)); |
160 | chunkProcessor.setListeners(new ArrayList<StepListener>(itemListeners)); |
161 | ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet<I>(chunkProvider, chunkProcessor); |
162 | tasklet.setBuffering(!readerTransactionalQueue); |
163 | return tasklet; |
164 | } |
165 | |
166 | /** |
167 | * Sets the chunk size or commit interval for this step. This is the maximum number of items that will be read |
168 | * before processing starts in a single transaction. Not compatible with {@link #completionPolicy} |
169 | * . |
170 | * |
171 | * @param chunkSize the chunk size (a.k.a commit interval) |
172 | * @return this for fluent chaining |
173 | */ |
174 | public SimpleStepBuilder<I, O> chunk(int chunkSize) { |
175 | Assert.state(completionPolicy == null || chunkSize == 0, |
176 | "You must specify either a chunkCompletionPolicy or a commitInterval but not both."); |
177 | this.chunkSize = chunkSize; |
178 | return this; |
179 | } |
180 | |
181 | /** |
182 | * Sets a completion policy for the chunk processing. Items are read until this policy determines that a chunk is |
183 | * complete, giving more control than with just the {@link #chunk(int) chunk size} (or commit interval). |
184 | * |
185 | * @param completionPolicy a completion policy for the chunk |
186 | * @return this for fluent chaining |
187 | */ |
188 | public SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) { |
189 | Assert.state(chunkSize == 0 || completionPolicy == null, |
190 | "You must specify either a chunkCompletionPolicy or a commitInterval but not both."); |
191 | this.completionPolicy = completionPolicy; |
192 | return this; |
193 | } |
194 | |
195 | /** |
196 | * An item reader that provides a stream of items. Will be automatically registered as a {@link #stream(ItemStream)} |
197 | * or listener if it implements the corresponding interface. By default assumed to be non-transactional. |
198 | * |
199 | * @see #readerTransactionalQueue |
200 | * @param reader an item reader |
201 | * @return this for fluent chaining |
202 | */ |
203 | public SimpleStepBuilder<I, O> reader(ItemReader<? extends I> reader) { |
204 | this.reader = reader; |
205 | return this; |
206 | } |
207 | |
208 | /** |
209 | * An item writer that writes a chunk of items. Will be automatically registered as a {@link #stream(ItemStream)} or |
210 | * listener if it implements the corresponding interface. |
211 | * |
212 | * @param writer an item writer |
213 | * @return this for fluent chaining |
214 | */ |
215 | public SimpleStepBuilder<I, O> writer(ItemWriter<? super O> writer) { |
216 | this.writer = writer; |
217 | return this; |
218 | } |
219 | |
220 | /** |
221 | * An item processor that processes or transforms a stream of items. Will be automatically registered as a |
222 | * {@link #stream(ItemStream)} or listener if it implements the corresponding interface. |
223 | * |
224 | * @param processor an item processor |
225 | * @return this for fluent chaining |
226 | */ |
227 | public SimpleStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> processor) { |
228 | this.processor = processor; |
229 | return this; |
230 | } |
231 | |
232 | /** |
233 | * Sets a flag to say that the reader is transactional (usually a queue), which is to say that failed items might be |
234 | * rolled back and re-presented in a subsequent transaction. Default is false, meaning that the items are read |
235 | * outside a transaction and possibly cached. |
236 | * |
237 | * @return this for fluent chaining |
238 | */ |
239 | public SimpleStepBuilder<I, O> readerIsTransactionalQueue() { |
240 | this.readerTransactionalQueue = true; |
241 | return this; |
242 | } |
243 | |
244 | /** |
245 | * Registers objects using the annotation based listener configuration. |
246 | * |
247 | * @param listener the object that has a method configured with listener annotation |
248 | * @return this for fluent chaining |
249 | */ |
250 | @Override |
251 | public SimpleStepBuilder listener(Object listener) { |
252 | super.listener(listener); |
253 | |
254 | Set<Method> itemListenerMethods = new HashSet<Method>(); |
255 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeRead.class)); |
256 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterRead.class)); |
257 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeProcess.class)); |
258 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterProcess.class)); |
259 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), BeforeWrite.class)); |
260 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), AfterWrite.class)); |
261 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnReadError.class)); |
262 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnProcessError.class)); |
263 | itemListenerMethods.addAll(ReflectionUtils.findMethod(listener.getClass(), OnWriteError.class)); |
264 | |
265 | if(itemListenerMethods.size() > 0) { |
266 | StepListenerFactoryBean factory = new StepListenerFactoryBean(); |
267 | factory.setDelegate(listener); |
268 | itemListeners.add((StepListener) factory.getObject()); |
269 | } |
270 | |
271 | @SuppressWarnings("unchecked") |
272 | SimpleStepBuilder result = this; |
273 | return result; |
274 | } |
275 | |
276 | |
277 | /** |
278 | * Register an item reader listener. |
279 | * |
280 | * @param listener the listener to register |
281 | * @return this for fluent chaining |
282 | */ |
283 | public SimpleStepBuilder<I, O> listener(ItemReadListener<? super I> listener) { |
284 | itemListeners.add(listener); |
285 | return this; |
286 | } |
287 | |
288 | /** |
289 | * Register an item writer listener. |
290 | * |
291 | * @param listener the listener to register |
292 | * @return this for fluent chaining |
293 | */ |
294 | public SimpleStepBuilder<I, O> listener(ItemWriteListener<? super O> listener) { |
295 | itemListeners.add(listener); |
296 | return this; |
297 | } |
298 | |
299 | /** |
300 | * Register an item processor listener. |
301 | * |
302 | * @param listener the listener to register |
303 | * @return this for fluent chaining |
304 | */ |
305 | public SimpleStepBuilder<I, O> listener(ItemProcessListener<? super I, ? super O> listener) { |
306 | itemListeners.add(listener); |
307 | return this; |
308 | } |
309 | |
310 | /** |
311 | * Instead of a {@link #chunk(int) chunk size} or {@link #chunk(CompletionPolicy) completion policy} you can provide |
312 | * a complete repeat operations instance that handles the iteration over the item reader. |
313 | * |
314 | * @param repeatTemplate a cmplete repeat template for the chunk |
315 | * @return this for fluent chaining |
316 | */ |
317 | public SimpleStepBuilder<I, O> chunkOperations(RepeatOperations repeatTemplate) { |
318 | this.chunkOperations = repeatTemplate; |
319 | return this; |
320 | } |
321 | |
322 | protected RepeatOperations createChunkOperations() { |
323 | RepeatOperations repeatOperations = chunkOperations; |
324 | if (repeatOperations == null) { |
325 | RepeatTemplate repeatTemplate = new RepeatTemplate(); |
326 | repeatTemplate.setCompletionPolicy(getChunkCompletionPolicy()); |
327 | repeatOperations = repeatTemplate; |
328 | } |
329 | return repeatOperations; |
330 | } |
331 | |
332 | protected ItemReader<? extends I> getReader() { |
333 | return reader; |
334 | } |
335 | |
336 | protected ItemWriter<? super O> getWriter() { |
337 | return writer; |
338 | } |
339 | |
340 | protected ItemProcessor<? super I, ? extends O> getProcessor() { |
341 | return processor; |
342 | } |
343 | |
344 | protected int getChunkSize() { |
345 | return chunkSize; |
346 | } |
347 | |
348 | protected boolean isReaderTransactionalQueue() { |
349 | return readerTransactionalQueue; |
350 | } |
351 | |
352 | protected Set<StepListener> getItemListeners() { |
353 | return itemListeners; |
354 | } |
355 | |
356 | /** |
357 | * @return a {@link CompletionPolicy} consistent with the chunk size and injected policy (if present). |
358 | */ |
359 | private CompletionPolicy getChunkCompletionPolicy() { |
360 | Assert.state(!(completionPolicy != null && chunkSize > 0), |
361 | "You must specify either a chunkCompletionPolicy or a commitInterval but not both."); |
362 | Assert.state(chunkSize >= 0, "The commitInterval must be positive or zero (for default value)."); |
363 | |
364 | if (completionPolicy != null) { |
365 | return completionPolicy; |
366 | } |
367 | if (chunkSize == 0) { |
368 | logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")"); |
369 | chunkSize = DEFAULT_COMMIT_INTERVAL; |
370 | } |
371 | return new SimpleCompletionPolicy(chunkSize); |
372 | } |
373 | |
374 | private void registerAsStreamsAndListeners(ItemReader<? extends I> itemReader, |
375 | ItemProcessor<? super I, ? extends O> itemProcessor, ItemWriter<? super O> itemWriter) { |
376 | for (Object itemHandler : new Object[] { itemReader, itemWriter, itemProcessor }) { |
377 | if (itemHandler instanceof ItemStream) { |
378 | stream((ItemStream) itemHandler); |
379 | } |
380 | if (StepListenerFactoryBean.isListener(itemHandler)) { |
381 | StepListener listener = StepListenerFactoryBean.getListener(itemHandler); |
382 | if (listener instanceof StepExecutionListener) { |
383 | listener((StepExecutionListener) listener); |
384 | } |
385 | if (listener instanceof ChunkListener) { |
386 | listener((ChunkListener) listener); |
387 | } |
388 | if (listener instanceof ItemReadListener<?> || listener instanceof ItemProcessListener<?, ?> |
389 | || listener instanceof ItemWriteListener<?>) { |
390 | itemListeners.add(listener); |
391 | } |
392 | } |
393 | } |
394 | } |
395 | |
396 | } |