1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | package org.springframework.batch.core.step.factory; |
17 | |
18 | import org.apache.commons.logging.Log; |
19 | import org.apache.commons.logging.LogFactory; |
20 | import org.springframework.batch.core.ChunkListener; |
21 | import org.springframework.batch.core.ItemProcessListener; |
22 | import org.springframework.batch.core.ItemReadListener; |
23 | import org.springframework.batch.core.ItemWriteListener; |
24 | import org.springframework.batch.core.Step; |
25 | import org.springframework.batch.core.StepExecutionListener; |
26 | import org.springframework.batch.core.StepListener; |
27 | import org.springframework.batch.core.repository.JobRepository; |
28 | import org.springframework.batch.core.step.builder.SimpleStepBuilder; |
29 | import org.springframework.batch.core.step.builder.StepBuilder; |
30 | import org.springframework.batch.core.step.tasklet.TaskletStep; |
31 | import org.springframework.batch.item.ItemProcessor; |
32 | import org.springframework.batch.item.ItemReader; |
33 | import org.springframework.batch.item.ItemStream; |
34 | import org.springframework.batch.item.ItemWriter; |
35 | import org.springframework.batch.repeat.CompletionPolicy; |
36 | import org.springframework.batch.repeat.RepeatOperations; |
37 | import org.springframework.batch.repeat.exception.DefaultExceptionHandler; |
38 | import org.springframework.batch.repeat.exception.ExceptionHandler; |
39 | import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; |
40 | import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate; |
41 | import org.springframework.beans.factory.BeanNameAware; |
42 | import org.springframework.beans.factory.FactoryBean; |
43 | import org.springframework.core.task.TaskExecutor; |
44 | import org.springframework.transaction.PlatformTransactionManager; |
45 | import org.springframework.transaction.annotation.Isolation; |
46 | import org.springframework.transaction.annotation.Propagation; |
47 | import org.springframework.transaction.interceptor.DefaultTransactionAttribute; |
48 | import org.springframework.transaction.interceptor.TransactionAttribute; |
49 | |
50 | /** |
51 | * Most common configuration options for simple steps should be found here. Use this factory bean instead of creating a |
52 | * {@link Step} implementation manually. |
53 | * |
54 | * This factory does not support configuration of fault-tolerant behavior, use appropriate subclass of this factory bean |
55 | * to configure skip or retry. |
56 | * |
57 | * @see FaultTolerantStepFactoryBean |
58 | * |
59 | * @author Dave Syer |
60 | * @author Robert Kasanicky |
61 | * |
62 | */ |
63 | @SuppressWarnings("rawtypes") |
64 | public class SimpleStepFactoryBean<T, S> implements FactoryBean, BeanNameAware { |
65 | |
66 | private String name; |
67 | |
68 | private int startLimit = Integer.MAX_VALUE; |
69 | |
70 | private boolean allowStartIfComplete; |
71 | |
72 | private ItemReader<? extends T> itemReader; |
73 | |
74 | private ItemProcessor<? super T, ? extends S> itemProcessor; |
75 | |
76 | private ItemWriter<? super S> itemWriter; |
77 | |
78 | private PlatformTransactionManager transactionManager; |
79 | |
80 | private Propagation propagation = Propagation.REQUIRED; |
81 | |
82 | private Isolation isolation = Isolation.DEFAULT; |
83 | |
84 | private int transactionTimeout = DefaultTransactionAttribute.TIMEOUT_DEFAULT; |
85 | |
86 | private JobRepository jobRepository; |
87 | |
88 | private boolean singleton = true; |
89 | |
90 | private ItemStream[] streams = new ItemStream[0]; |
91 | |
92 | private StepListener[] listeners = new StepListener[0]; |
93 | |
94 | protected final Log logger = LogFactory.getLog(getClass()); |
95 | |
96 | private int commitInterval = 0; |
97 | |
98 | private TaskExecutor taskExecutor; |
99 | |
100 | private RepeatOperations stepOperations; |
101 | |
102 | private RepeatOperations chunkOperations; |
103 | |
104 | private ExceptionHandler exceptionHandler = new DefaultExceptionHandler(); |
105 | |
106 | private CompletionPolicy chunkCompletionPolicy; |
107 | |
108 | private int throttleLimit = TaskExecutorRepeatTemplate.DEFAULT_THROTTLE_LIMIT; |
109 | |
110 | private boolean isReaderTransactionalQueue = false; |
111 | |
112 | /** |
113 | * Default constructor for {@link SimpleStepFactoryBean}. |
114 | */ |
115 | public SimpleStepFactoryBean() { |
116 | super(); |
117 | } |
118 | |
119 | /** |
120 | * Flag to signal that the reader is transactional (usually a JMS consumer) so that items are re-presented after a |
121 | * rollback. The default is false and readers are assumed to be forward-only. |
122 | * |
123 | * @param isReaderTransactionalQueue the value of the flag |
124 | */ |
125 | public void setIsReaderTransactionalQueue(boolean isReaderTransactionalQueue) { |
126 | this.isReaderTransactionalQueue = isReaderTransactionalQueue; |
127 | } |
128 | |
129 | /** |
130 | * Convenience method for subclasses. |
131 | * @return true if the flag is set (default false) |
132 | */ |
133 | protected boolean isReaderTransactionalQueue() { |
134 | return isReaderTransactionalQueue; |
135 | } |
136 | |
137 | /** |
138 | * Set the bean name property, which will become the name of the {@link Step} when it is created. |
139 | * |
140 | * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String) |
141 | */ |
142 | @Override |
143 | public void setBeanName(String name) { |
144 | this.name = name; |
145 | } |
146 | |
147 | /** |
148 | * Public getter for the name of the step. |
149 | * @return the name |
150 | */ |
151 | public String getName() { |
152 | return name; |
153 | } |
154 | |
155 | /** |
156 | * The timeout for an individual transaction in the step. |
157 | * |
158 | * @param transactionTimeout the transaction timeout to set, defaults to infinite |
159 | */ |
160 | public void setTransactionTimeout(int transactionTimeout) { |
161 | this.transactionTimeout = transactionTimeout; |
162 | } |
163 | |
164 | /** |
165 | * @param propagation the propagation to set for business transactions |
166 | */ |
167 | public void setPropagation(Propagation propagation) { |
168 | this.propagation = propagation; |
169 | } |
170 | |
171 | /** |
172 | * @param isolation the isolation to set for business transactions |
173 | */ |
174 | public void setIsolation(Isolation isolation) { |
175 | this.isolation = isolation; |
176 | } |
177 | |
178 | /** |
179 | * Public setter for the start limit for the step. |
180 | * |
181 | * @param startLimit the startLimit to set |
182 | */ |
183 | public void setStartLimit(int startLimit) { |
184 | this.startLimit = startLimit; |
185 | } |
186 | |
187 | /** |
188 | * Public setter for the flag to indicate that the step should be replayed on a restart, even if successful the |
189 | * first time. |
190 | * |
191 | * @param allowStartIfComplete the shouldAllowStartIfComplete to set |
192 | */ |
193 | public void setAllowStartIfComplete(boolean allowStartIfComplete) { |
194 | this.allowStartIfComplete = allowStartIfComplete; |
195 | } |
196 | |
197 | /** |
198 | * @param itemReader the {@link ItemReader} to set |
199 | */ |
200 | public void setItemReader(ItemReader<? extends T> itemReader) { |
201 | this.itemReader = itemReader; |
202 | } |
203 | |
204 | /** |
205 | * @param itemWriter the {@link ItemWriter} to set |
206 | */ |
207 | public void setItemWriter(ItemWriter<? super S> itemWriter) { |
208 | this.itemWriter = itemWriter; |
209 | } |
210 | |
211 | /** |
212 | * @param itemProcessor the {@link ItemProcessor} to set |
213 | */ |
214 | public void setItemProcessor(ItemProcessor<? super T, ? extends S> itemProcessor) { |
215 | this.itemProcessor = itemProcessor; |
216 | } |
217 | |
218 | /** |
219 | * The streams to inject into the {@link Step}. Any instance of {@link ItemStream} can be used, and will then |
220 | * receive callbacks at the appropriate stage in the step. |
221 | * |
222 | * @param streams an array of listeners |
223 | */ |
224 | public void setStreams(ItemStream[] streams) { |
225 | this.streams = streams; |
226 | } |
227 | |
228 | /** |
229 | * The listeners to inject into the {@link Step}. Any instance of {@link StepListener} can be used, and will then |
230 | * receive callbacks at the appropriate stage in the step. |
231 | * |
232 | * @param listeners an array of listeners |
233 | */ |
234 | public void setListeners(StepListener[] listeners) { |
235 | this.listeners = listeners; |
236 | } |
237 | |
238 | /** |
239 | * Protected getter for the {@link StepListener}s. |
240 | * @return the listeners |
241 | */ |
242 | protected StepListener[] getListeners() { |
243 | return listeners; |
244 | } |
245 | |
246 | /** |
247 | * Protected getter for the {@link ItemReader} for subclasses to use. |
248 | * @return the itemReader |
249 | */ |
250 | protected ItemReader<? extends T> getItemReader() { |
251 | return itemReader; |
252 | } |
253 | |
254 | /** |
255 | * Protected getter for the {@link ItemWriter} for subclasses to use |
256 | * @return the itemWriter |
257 | */ |
258 | protected ItemWriter<? super S> getItemWriter() { |
259 | return itemWriter; |
260 | } |
261 | |
262 | /** |
263 | * Protected getter for the {@link ItemProcessor} for subclasses to use |
264 | * @return the itemProcessor |
265 | */ |
266 | protected ItemProcessor<? super T, ? extends S> getItemProcessor() { |
267 | return itemProcessor; |
268 | } |
269 | |
270 | /** |
271 | * Public setter for {@link JobRepository}. |
272 | * |
273 | * @param jobRepository is a mandatory dependence (no default). |
274 | */ |
275 | public void setJobRepository(JobRepository jobRepository) { |
276 | this.jobRepository = jobRepository; |
277 | } |
278 | |
279 | /** |
280 | * Public setter for the {@link PlatformTransactionManager}. |
281 | * |
282 | * @param transactionManager the transaction manager to set |
283 | */ |
284 | public void setTransactionManager(PlatformTransactionManager transactionManager) { |
285 | this.transactionManager = transactionManager; |
286 | } |
287 | |
288 | /** |
289 | * Getter for the {@link TransactionAttribute} for subclasses only. |
290 | * @return the transactionAttribute |
291 | */ |
292 | @SuppressWarnings("serial") |
293 | protected TransactionAttribute getTransactionAttribute() { |
294 | |
295 | DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); |
296 | attribute.setPropagationBehavior(propagation.value()); |
297 | attribute.setIsolationLevel(isolation.value()); |
298 | attribute.setTimeout(transactionTimeout); |
299 | return new DefaultTransactionAttribute(attribute) { |
300 | |
301 | /** |
302 | * Ignore the default behaviour and rollback on all exceptions that bubble up to the tasklet level. The |
303 | * tasklet has to deal with the rollback rules internally. |
304 | */ |
305 | @Override |
306 | public boolean rollbackOn(Throwable ex) { |
307 | return true; |
308 | } |
309 | |
310 | }; |
311 | |
312 | } |
313 | |
314 | /** |
315 | * Create a {@link Step} from the configuration provided. |
316 | * |
317 | * @see FactoryBean#getObject() |
318 | */ |
319 | @Override |
320 | public final Object getObject() throws Exception { |
321 | SimpleStepBuilder<T, S> builder = createBuilder(getName()); |
322 | applyConfiguration(builder); |
323 | TaskletStep step = builder.build(); |
324 | return step; |
325 | } |
326 | |
327 | protected SimpleStepBuilder<T, S> createBuilder(String name) { |
328 | return new SimpleStepBuilder<T, S>(new StepBuilder(name)); |
329 | } |
330 | |
331 | @Override |
332 | public Class<TaskletStep> getObjectType() { |
333 | return TaskletStep.class; |
334 | } |
335 | |
336 | /** |
337 | * Returns true by default, but in most cases a {@link Step} should not be treated as thread safe. Clients are |
338 | * recommended to create a new step for each job execution. |
339 | * |
340 | * @see org.springframework.beans.factory.FactoryBean#isSingleton() |
341 | */ |
342 | @Override |
343 | public boolean isSingleton() { |
344 | return this.singleton; |
345 | } |
346 | |
347 | /** |
348 | * Public setter for the singleton flag. |
349 | * @param singleton the value to set. Defaults to true. |
350 | */ |
351 | public void setSingleton(boolean singleton) { |
352 | this.singleton = singleton; |
353 | } |
354 | |
355 | /** |
356 | * Set the commit interval. Either set this or the chunkCompletionPolicy but not both. |
357 | * |
358 | * @param commitInterval 1 by default |
359 | */ |
360 | public void setCommitInterval(int commitInterval) { |
361 | this.commitInterval = commitInterval; |
362 | } |
363 | |
364 | /** |
365 | * Public setter for the {@link CompletionPolicy} applying to the chunk level. A transaction will be committed when |
366 | * this policy decides to complete. Defaults to a {@link SimpleCompletionPolicy} with chunk size equal to the |
367 | * commitInterval property. |
368 | * |
369 | * @param chunkCompletionPolicy the chunkCompletionPolicy to set |
370 | */ |
371 | public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) { |
372 | this.chunkCompletionPolicy = chunkCompletionPolicy; |
373 | } |
374 | |
375 | /** |
376 | * Protected getter for the step operations to make them available in subclasses. |
377 | * @return the step operations |
378 | */ |
379 | protected RepeatOperations getStepOperations() { |
380 | return stepOperations; |
381 | } |
382 | |
383 | /** |
384 | * Public setter for the stepOperations. |
385 | * @param stepOperations the stepOperations to set |
386 | */ |
387 | public void setStepOperations(RepeatOperations stepOperations) { |
388 | this.stepOperations = stepOperations; |
389 | } |
390 | |
391 | /** |
392 | * Public setter for the chunkOperations. |
393 | * @param chunkOperations the chunkOperations to set |
394 | */ |
395 | public void setChunkOperations(RepeatOperations chunkOperations) { |
396 | this.chunkOperations = chunkOperations; |
397 | } |
398 | |
399 | /** |
400 | * Protected getter for the chunk operations to make them available in subclasses. |
401 | * @return the step operations |
402 | */ |
403 | protected RepeatOperations getChunkOperations() { |
404 | return chunkOperations; |
405 | } |
406 | |
407 | /** |
408 | * Public setter for the {@link ExceptionHandler}. |
409 | * @param exceptionHandler the exceptionHandler to set |
410 | */ |
411 | public void setExceptionHandler(ExceptionHandler exceptionHandler) { |
412 | this.exceptionHandler = exceptionHandler; |
413 | } |
414 | |
415 | /** |
416 | * Protected getter for the {@link ExceptionHandler}. |
417 | * @return the {@link ExceptionHandler} |
418 | */ |
419 | protected ExceptionHandler getExceptionHandler() { |
420 | return exceptionHandler; |
421 | } |
422 | |
423 | /** |
424 | * Public setter for the {@link TaskExecutor}. If this is set, then it will be used to execute the chunk processing |
425 | * inside the {@link Step}. |
426 | * |
427 | * @param taskExecutor the taskExecutor to set |
428 | */ |
429 | public void setTaskExecutor(TaskExecutor taskExecutor) { |
430 | this.taskExecutor = taskExecutor; |
431 | } |
432 | |
433 | /** |
434 | * Mkae the {@link TaskExecutor} available to subclasses |
435 | * @return the taskExecutor to be used to execute chunks |
436 | */ |
437 | protected TaskExecutor getTaskExecutor() { |
438 | return taskExecutor; |
439 | } |
440 | |
441 | /** |
442 | * Public setter for the throttle limit. This limits the number of tasks queued for concurrent processing to prevent |
443 | * thread pools from being overwhelmed. Defaults to {@link TaskExecutorRepeatTemplate#DEFAULT_THROTTLE_LIMIT}. |
444 | * @param throttleLimit the throttle limit to set. |
445 | */ |
446 | public void setThrottleLimit(int throttleLimit) { |
447 | this.throttleLimit = throttleLimit; |
448 | } |
449 | |
450 | protected void applyConfiguration(SimpleStepBuilder<T, S> builder) { |
451 | |
452 | builder.reader(itemReader); |
453 | builder.processor(itemProcessor); |
454 | builder.writer(itemWriter); |
455 | for (StepExecutionListener listener : BatchListenerFactoryHelper.<StepExecutionListener> getListeners( |
456 | listeners, StepExecutionListener.class)) { |
457 | builder.listener(listener); |
458 | } |
459 | for (ChunkListener listener : BatchListenerFactoryHelper.<ChunkListener> getListeners(listeners, |
460 | ChunkListener.class)) { |
461 | builder.listener(listener); |
462 | } |
463 | for (ItemReadListener<T> listener : BatchListenerFactoryHelper.<ItemReadListener<T>> getListeners(listeners, |
464 | ItemReadListener.class)) { |
465 | builder.listener(listener); |
466 | } |
467 | for (ItemWriteListener<S> listener : BatchListenerFactoryHelper.<ItemWriteListener<S>> getListeners(listeners, |
468 | ItemWriteListener.class)) { |
469 | builder.listener(listener); |
470 | } |
471 | for (ItemProcessListener<T, S> listener : BatchListenerFactoryHelper.<ItemProcessListener<T, S>> getListeners( |
472 | listeners, ItemProcessListener.class)) { |
473 | builder.listener(listener); |
474 | } |
475 | builder.transactionManager(transactionManager); |
476 | builder.transactionAttribute(getTransactionAttribute()); |
477 | builder.repository(jobRepository); |
478 | builder.startLimit(startLimit); |
479 | builder.allowStartIfComplete(allowStartIfComplete); |
480 | builder.chunk(commitInterval); |
481 | builder.chunk(chunkCompletionPolicy); |
482 | builder.chunkOperations(chunkOperations); |
483 | builder.stepOperations(stepOperations); |
484 | builder.taskExecutor(taskExecutor); |
485 | builder.throttleLimit(throttleLimit); |
486 | builder.exceptionHandler(exceptionHandler); |
487 | if (isReaderTransactionalQueue) { |
488 | builder.readerIsTransactionalQueue(); |
489 | } |
490 | for (ItemStream stream : streams) { |
491 | builder.stream(stream); |
492 | } |
493 | |
494 | } |
495 | } |