1 | /* |
2 | * Copyright 2006-2013 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.configuration.xml; |
18 | |
19 | import org.springframework.batch.core.ChunkListener; |
20 | import org.springframework.batch.core.ItemProcessListener; |
21 | import org.springframework.batch.core.ItemReadListener; |
22 | import org.springframework.batch.core.ItemWriteListener; |
23 | import org.springframework.batch.core.Job; |
24 | import org.springframework.batch.core.SkipListener; |
25 | import org.springframework.batch.core.Step; |
26 | import org.springframework.batch.core.StepExecutionListener; |
27 | import org.springframework.batch.core.StepListener; |
28 | import org.springframework.batch.core.job.flow.Flow; |
29 | import org.springframework.batch.core.launch.JobLauncher; |
30 | import org.springframework.batch.core.partition.PartitionHandler; |
31 | import org.springframework.batch.core.partition.support.Partitioner; |
32 | import org.springframework.batch.core.partition.support.StepExecutionAggregator; |
33 | import org.springframework.batch.core.repository.JobRepository; |
34 | import org.springframework.batch.core.step.builder.AbstractTaskletStepBuilder; |
35 | import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder; |
36 | import org.springframework.batch.core.step.builder.FlowStepBuilder; |
37 | import org.springframework.batch.core.step.builder.JobStepBuilder; |
38 | import org.springframework.batch.core.step.builder.PartitionStepBuilder; |
39 | import org.springframework.batch.core.step.builder.SimpleStepBuilder; |
40 | import org.springframework.batch.core.step.builder.StepBuilder; |
41 | import org.springframework.batch.core.step.builder.StepBuilderHelper; |
42 | import org.springframework.batch.core.step.builder.TaskletStepBuilder; |
43 | import org.springframework.batch.core.step.factory.FaultTolerantStepFactoryBean; |
44 | import org.springframework.batch.core.step.factory.SimpleStepFactoryBean; |
45 | import org.springframework.batch.core.step.item.KeyGenerator; |
46 | import org.springframework.batch.core.step.job.JobParametersExtractor; |
47 | import org.springframework.batch.core.step.skip.SkipPolicy; |
48 | import org.springframework.batch.core.step.tasklet.Tasklet; |
49 | import org.springframework.batch.core.step.tasklet.TaskletStep; |
50 | import org.springframework.batch.item.ItemProcessor; |
51 | import org.springframework.batch.item.ItemReader; |
52 | import org.springframework.batch.item.ItemStream; |
53 | import org.springframework.batch.item.ItemWriter; |
54 | import org.springframework.batch.repeat.CompletionPolicy; |
55 | import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; |
56 | import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate; |
57 | import org.springframework.beans.factory.BeanNameAware; |
58 | import org.springframework.beans.factory.FactoryBean; |
59 | import org.springframework.classify.BinaryExceptionClassifier; |
60 | import org.springframework.core.task.TaskExecutor; |
61 | import org.springframework.retry.RetryListener; |
62 | import org.springframework.retry.RetryPolicy; |
63 | import org.springframework.retry.backoff.BackOffPolicy; |
64 | import org.springframework.retry.policy.MapRetryContextCache; |
65 | import org.springframework.retry.policy.RetryContextCache; |
66 | import org.springframework.transaction.PlatformTransactionManager; |
67 | import org.springframework.transaction.annotation.Isolation; |
68 | import org.springframework.transaction.annotation.Propagation; |
69 | import org.springframework.transaction.interceptor.DefaultTransactionAttribute; |
70 | import org.springframework.util.Assert; |
71 | |
72 | import java.util.Collection; |
73 | import java.util.HashMap; |
74 | import java.util.HashSet; |
75 | import java.util.LinkedHashSet; |
76 | import java.util.Map; |
77 | import java.util.Set; |
78 | |
79 | /** |
80 | * This {@link FactoryBean} is used by the batch namespace parser to create {@link Step} objects. Stores all of the |
81 | * properties that are configurable on the <step/> (and its inner <tasklet/>). Based on which properties are |
82 | * configured, the {@link #getObject()} method will delegate to the appropriate class for generating the {@link Step}. |
83 | * |
84 | * @author Dan Garrette |
85 | * @author Josh Long |
86 | * @see SimpleStepFactoryBean |
87 | * @see FaultTolerantStepFactoryBean |
88 | * @see TaskletStep |
89 | * @since 2.0 |
90 | */ |
91 | @SuppressWarnings("rawtypes") |
92 | class StepParserStepFactoryBean<I, O> implements FactoryBean, BeanNameAware { |
93 | |
94 | // |
95 | // Step Attributes |
96 | // |
97 | private String name; |
98 | |
99 | // |
100 | // Tasklet Attributes |
101 | // |
102 | private Boolean allowStartIfComplete; |
103 | |
104 | private JobRepository jobRepository; |
105 | |
106 | private Integer startLimit; |
107 | |
108 | private Tasklet tasklet; |
109 | |
110 | private PlatformTransactionManager transactionManager; |
111 | |
112 | private Set<StepExecutionListener> stepExecutionListeners = new LinkedHashSet<StepExecutionListener>(); |
113 | |
114 | // |
115 | // Flow Elements |
116 | // |
117 | private Flow flow; |
118 | |
119 | // |
120 | // Job Elements |
121 | // |
122 | private Job job; |
123 | |
124 | private JobLauncher jobLauncher; |
125 | |
126 | private JobParametersExtractor jobParametersExtractor; |
127 | |
128 | // |
129 | // Partition Elements |
130 | // |
131 | private Partitioner partitioner; |
132 | |
133 | private static final int DEFAULT_GRID_SIZE = 6; |
134 | |
135 | private Step step; |
136 | |
137 | private PartitionHandler partitionHandler; |
138 | |
139 | private int gridSize = DEFAULT_GRID_SIZE; |
140 | |
141 | // |
142 | // Tasklet Elements |
143 | // |
144 | private Collection<Class<? extends Throwable>> noRollbackExceptionClasses; |
145 | |
146 | private Integer transactionTimeout; |
147 | |
148 | private Propagation propagation; |
149 | |
150 | private Isolation isolation; |
151 | |
152 | private Set<ChunkListener> chunkListeners = new LinkedHashSet<ChunkListener>(); |
153 | |
154 | // |
155 | // Chunk Attributes |
156 | // |
157 | private int cacheCapacity = 0; |
158 | |
159 | private CompletionPolicy chunkCompletionPolicy; |
160 | |
161 | private Integer commitInterval; |
162 | |
163 | private Boolean readerTransactionalQueue; |
164 | |
165 | private Boolean processorTransactional; |
166 | |
167 | private int retryLimit = 0; |
168 | |
169 | private BackOffPolicy backOffPolicy; |
170 | |
171 | private RetryPolicy retryPolicy; |
172 | |
173 | private RetryContextCache retryContextCache; |
174 | |
175 | private KeyGenerator keyGenerator; |
176 | |
177 | private Integer skipLimit; |
178 | |
179 | private SkipPolicy skipPolicy; |
180 | |
181 | private TaskExecutor taskExecutor; |
182 | |
183 | private Integer throttleLimit; |
184 | |
185 | private ItemReader<? extends I> itemReader; |
186 | |
187 | private ItemProcessor<? super I, ? extends O> itemProcessor; |
188 | |
189 | private ItemWriter<? super O> itemWriter; |
190 | |
191 | // |
192 | // Chunk Elements |
193 | // |
194 | private RetryListener[] retryListeners; |
195 | |
196 | private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>(); |
197 | |
198 | private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>(); |
199 | |
200 | private ItemStream[] streams; |
201 | |
202 | private Set<ItemReadListener<I>> readListeners = new LinkedHashSet<ItemReadListener<I>>(); |
203 | |
204 | private Set<ItemWriteListener<O>> writeListeners = new LinkedHashSet<ItemWriteListener<O>>(); |
205 | |
206 | private Set<ItemProcessListener<I, O>> processListeners = new LinkedHashSet<ItemProcessListener<I, O>>(); |
207 | |
208 | private Set<SkipListener<I, O>> skipListeners = new LinkedHashSet<SkipListener<I, O>>(); |
209 | |
210 | // |
211 | // Additional |
212 | // |
213 | private boolean hasChunkElement = false; |
214 | |
215 | private StepExecutionAggregator stepExecutionAggregator; |
216 | |
217 | private StepListener[] listeners; |
218 | |
219 | /** |
220 | * Create a {@link Step} from the configuration provided. |
221 | * |
222 | * @see FactoryBean#getObject() |
223 | */ |
224 | @Override |
225 | public final Object getObject() throws Exception { |
226 | if (hasChunkElement) { |
227 | Assert.isNull(tasklet, "Step [" + name |
228 | + "] has both a <chunk/> element and a 'ref' attribute referencing a Tasklet."); |
229 | |
230 | validateFaultTolerantSettings(); |
231 | if (isFaultTolerant()) { |
232 | return createFaultTolerantStep(); |
233 | } |
234 | else { |
235 | return createSimpleStep(); |
236 | } |
237 | } |
238 | else if (tasklet != null) { |
239 | return createTaskletStep(); |
240 | } |
241 | else if (flow != null) { |
242 | return createFlowStep(); |
243 | } |
244 | else if (job != null) { |
245 | return createJobStep(); |
246 | } |
247 | else { |
248 | return createPartitionStep(); |
249 | } |
250 | } |
251 | |
252 | public boolean requiresTransactionManager() { |
253 | // Currently all step implementations other than TaskletStep are |
254 | // AbstractStep and do not require a transaction manager |
255 | return hasChunkElement || tasklet != null; |
256 | } |
257 | |
258 | private void enhanceCommonStep(StepBuilderHelper<?> builder) { |
259 | if (allowStartIfComplete != null) { |
260 | builder.allowStartIfComplete(allowStartIfComplete); |
261 | } |
262 | if (startLimit != null) { |
263 | builder.startLimit(startLimit); |
264 | } |
265 | builder.repository(jobRepository); |
266 | builder.transactionManager(transactionManager); |
267 | for (StepExecutionListener listener : stepExecutionListeners) { |
268 | builder.listener(listener); |
269 | } |
270 | } |
271 | |
272 | private Step createPartitionStep() { |
273 | |
274 | PartitionStepBuilder builder; |
275 | if (partitioner != null) { |
276 | builder = new StepBuilder(name).partitioner(step != null ? step.getName() : name, partitioner).step(step); |
277 | } |
278 | else { |
279 | builder = new StepBuilder(name).partitioner(step); |
280 | } |
281 | enhanceCommonStep(builder); |
282 | |
283 | if (partitionHandler != null) { |
284 | builder.partitionHandler(partitionHandler); |
285 | } |
286 | else { |
287 | builder.gridSize(gridSize); |
288 | builder.taskExecutor(taskExecutor); |
289 | } |
290 | |
291 | builder.aggregator(stepExecutionAggregator); |
292 | |
293 | return builder.build(); |
294 | |
295 | } |
296 | |
297 | private Step createFaultTolerantStep() { |
298 | |
299 | FaultTolerantStepBuilder<I, O> builder = new FaultTolerantStepBuilder<I, O>(new StepBuilder(name)); |
300 | |
301 | if (commitInterval != null) { |
302 | builder.chunk(commitInterval); |
303 | } |
304 | builder.chunk(chunkCompletionPolicy); |
305 | enhanceTaskletStepBuilder(builder); |
306 | |
307 | builder.reader(itemReader); |
308 | builder.writer(itemWriter); |
309 | builder.processor(itemProcessor); |
310 | |
311 | if (processorTransactional != null && !processorTransactional) { |
312 | builder.processorNonTransactional(); |
313 | } |
314 | |
315 | if (readerTransactionalQueue!=null && readerTransactionalQueue==true) { |
316 | builder.readerIsTransactionalQueue(); |
317 | } |
318 | |
319 | for (SkipListener<I, O> listener : skipListeners) { |
320 | builder.listener(listener); |
321 | } |
322 | |
323 | registerItemListeners(builder); |
324 | |
325 | if (skipPolicy != null) { |
326 | builder.skipPolicy(skipPolicy); |
327 | } |
328 | else if (skipLimit!=null) { |
329 | builder.skipLimit(skipLimit); |
330 | for (Class<? extends Throwable> type : skippableExceptionClasses.keySet()) { |
331 | if (skippableExceptionClasses.get(type)) { |
332 | builder.skip(type); |
333 | } |
334 | else { |
335 | builder.noSkip(type); |
336 | } |
337 | } |
338 | } |
339 | |
340 | if (retryListeners != null) { |
341 | for (RetryListener listener : retryListeners) { |
342 | builder.listener(listener); |
343 | } |
344 | } |
345 | |
346 | if (retryContextCache == null && cacheCapacity > 0) { |
347 | retryContextCache = new MapRetryContextCache(cacheCapacity); |
348 | } |
349 | builder.retryContextCache(retryContextCache); |
350 | builder.keyGenerator(keyGenerator); |
351 | if (retryPolicy != null) { |
352 | builder.retryPolicy(retryPolicy); |
353 | } |
354 | else { |
355 | builder.retryLimit(retryLimit); |
356 | builder.backOffPolicy(backOffPolicy); |
357 | for (Class<? extends Throwable> type : retryableExceptionClasses.keySet()) { |
358 | if (retryableExceptionClasses.get(type)) { |
359 | builder.retry(type); |
360 | } |
361 | else { |
362 | builder.noRetry(type); |
363 | } |
364 | } |
365 | } |
366 | |
367 | if (noRollbackExceptionClasses != null) { |
368 | for (Class<? extends Throwable> type : noRollbackExceptionClasses) { |
369 | builder.noRollback(type); |
370 | } |
371 | } |
372 | |
373 | return builder.build(); |
374 | |
375 | } |
376 | |
377 | private void registerItemListeners(SimpleStepBuilder<I, O> builder) { |
378 | for (ItemReadListener<I> listener : readListeners) { |
379 | builder.listener(listener); |
380 | } |
381 | for (ItemWriteListener<O> listener : writeListeners) { |
382 | builder.listener(listener); |
383 | } |
384 | for (ItemProcessListener<I, O> listener : processListeners) { |
385 | builder.listener(listener); |
386 | } |
387 | } |
388 | |
389 | @SuppressWarnings("unchecked") |
390 | private Step createSimpleStep() { |
391 | SimpleStepBuilder builder = new SimpleStepBuilder(new StepBuilder(name)); |
392 | if (commitInterval != null) { |
393 | builder.chunk(commitInterval); |
394 | } |
395 | enhanceTaskletStepBuilder(builder); |
396 | registerItemListeners(builder); |
397 | builder.reader(itemReader); |
398 | builder.writer(itemWriter); |
399 | builder.processor(itemProcessor); |
400 | builder.chunk(chunkCompletionPolicy); |
401 | return builder.build(); |
402 | } |
403 | |
404 | private TaskletStep createTaskletStep() { |
405 | TaskletStepBuilder builder = new StepBuilder(name).tasklet(tasklet); |
406 | enhanceTaskletStepBuilder(builder); |
407 | return builder.build(); |
408 | } |
409 | |
410 | @SuppressWarnings("serial") |
411 | private void enhanceTaskletStepBuilder(AbstractTaskletStepBuilder<?> builder) { |
412 | |
413 | enhanceCommonStep(builder); |
414 | for (ChunkListener listener : chunkListeners) { |
415 | builder.listener(listener); |
416 | |
417 | } |
418 | builder.taskExecutor(taskExecutor); |
419 | if (throttleLimit != null) { |
420 | builder.throttleLimit(throttleLimit); |
421 | } |
422 | builder.transactionManager(transactionManager); |
423 | if (transactionTimeout != null || propagation != null || isolation != null |
424 | || noRollbackExceptionClasses != null) { |
425 | DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); |
426 | if (propagation != null) { |
427 | attribute.setPropagationBehavior(propagation.value()); |
428 | } |
429 | if (isolation != null) { |
430 | attribute.setIsolationLevel(isolation.value()); |
431 | } |
432 | if (transactionTimeout != null) { |
433 | attribute.setTimeout(transactionTimeout); |
434 | } |
435 | Collection<Class<? extends Throwable>> exceptions = noRollbackExceptionClasses == null ? new HashSet<Class<? extends Throwable>>() |
436 | : noRollbackExceptionClasses; |
437 | final BinaryExceptionClassifier classifier = new BinaryExceptionClassifier(exceptions, false); |
438 | builder.transactionAttribute(new DefaultTransactionAttribute(attribute) { |
439 | @Override |
440 | public boolean rollbackOn(Throwable ex) { |
441 | return classifier.classify(ex); |
442 | } |
443 | }); |
444 | } |
445 | if (streams != null) { |
446 | for (ItemStream stream : streams) { |
447 | builder.stream(stream); |
448 | } |
449 | } |
450 | |
451 | } |
452 | |
453 | private Step createFlowStep() { |
454 | FlowStepBuilder builder = new StepBuilder(name).flow(flow); |
455 | enhanceCommonStep(builder); |
456 | return builder.build(); |
457 | } |
458 | |
459 | private Step createJobStep() throws Exception { |
460 | |
461 | JobStepBuilder builder = new StepBuilder(name).job(job); |
462 | enhanceCommonStep(builder); |
463 | builder.parametersExtractor(jobParametersExtractor); |
464 | builder.launcher(jobLauncher); |
465 | return builder.build(); |
466 | |
467 | } |
468 | |
469 | private void validateFaultTolerantSettings() { |
470 | validateDependency("skippable-exception-classes", skippableExceptionClasses, "skip-limit", skipLimit, true); |
471 | validateDependency("retryable-exception-classes", retryableExceptionClasses, "retry-limit", retryLimit, true); |
472 | validateDependency("retry-listeners", retryListeners, "retry-limit", retryLimit, false); |
473 | if (isPresent(processorTransactional) && !processorTransactional && isPresent(readerTransactionalQueue) |
474 | && readerTransactionalQueue) { |
475 | throw new IllegalArgumentException( |
476 | "The field 'processor-transactional' cannot be false if 'reader-transactional-queue' is true"); |
477 | } |
478 | } |
479 | |
480 | /** |
481 | * Check if a field is present then a second is also. If the twoWayDependency flag is set then the opposite must |
482 | * also be true: if the second value is present, the first must also be. |
483 | * |
484 | * @param dependentName the name of the first field |
485 | * @param dependentValue the value of the first field |
486 | * @param name the name of the other field (which should be absent if the first is present) |
487 | * @param value the value of the other field |
488 | * @param twoWayDependency true if both depend on each other |
489 | * @throws IllegalArgumentException if either condition is violated |
490 | */ |
491 | private void validateDependency(String dependentName, Object dependentValue, String name, Object value, |
492 | boolean twoWayDependency) { |
493 | if (isPresent(dependentValue) && !isPresent(value)) { |
494 | throw new IllegalArgumentException("The field '" + dependentName + "' is not permitted on the step [" |
495 | + this.name + "] because there is no '" + name + "'."); |
496 | } |
497 | if (twoWayDependency && isPresent(value) && !isPresent(dependentValue)) { |
498 | throw new IllegalArgumentException("The field '" + name + "' is not permitted on the step [" + this.name |
499 | + "] because there is no '" + dependentName + "'."); |
500 | } |
501 | } |
502 | |
503 | /** |
504 | * Is the object non-null (or if an Integer, non-zero)? |
505 | * |
506 | * @param o an object |
507 | * @return true if the object has a value |
508 | */ |
509 | private boolean isPresent(Object o) { |
510 | if (o instanceof Integer) { |
511 | return isPositive((Integer) o); |
512 | } |
513 | if (o instanceof Collection) { |
514 | return !((Collection<?>) o).isEmpty(); |
515 | } |
516 | if (o instanceof Map) { |
517 | return !((Map<?, ?>) o).isEmpty(); |
518 | } |
519 | return o != null; |
520 | } |
521 | |
522 | private boolean isFaultTolerant() { |
523 | return backOffPolicy != null || skipPolicy != null || retryPolicy != null || isPositive(skipLimit) |
524 | || isPositive(retryLimit) || isPositive(cacheCapacity) || isTrue(readerTransactionalQueue); |
525 | } |
526 | |
527 | private boolean isTrue(Boolean b) { |
528 | return b != null && b.booleanValue(); |
529 | } |
530 | |
531 | private boolean isPositive(Integer n) { |
532 | return n != null && n > 0; |
533 | } |
534 | |
535 | @Override |
536 | public Class<TaskletStep> getObjectType() { |
537 | return TaskletStep.class; |
538 | } |
539 | |
540 | @Override |
541 | public boolean isSingleton() { |
542 | return true; |
543 | } |
544 | |
545 | // ========================================================= |
546 | // Step Attributes |
547 | // ========================================================= |
548 | |
549 | /** |
550 | * Set the bean name property, which will become the name of the {@link Step} when it is created. |
551 | * |
552 | * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String) |
553 | */ |
554 | @Override |
555 | public void setBeanName(String name) { |
556 | if (this.name == null) { |
557 | this.name = name; |
558 | } |
559 | } |
560 | |
561 | /** |
562 | * @param name the name to set |
563 | */ |
564 | public void setName(String name) { |
565 | this.name = name; |
566 | } |
567 | |
568 | // ========================================================= |
569 | // Flow Attributes |
570 | // ========================================================= |
571 | |
572 | /** |
573 | * @param flow the flow to set |
574 | */ |
575 | public void setFlow(Flow flow) { |
576 | this.flow = flow; |
577 | } |
578 | |
579 | // ========================================================= |
580 | // Job Attributes |
581 | // ========================================================= |
582 | |
583 | public void setJob(Job job) { |
584 | this.job = job; |
585 | } |
586 | |
587 | public void setJobParametersExtractor(JobParametersExtractor jobParametersExtractor) { |
588 | this.jobParametersExtractor = jobParametersExtractor; |
589 | } |
590 | |
591 | public void setJobLauncher(JobLauncher jobLauncher) { |
592 | this.jobLauncher = jobLauncher; |
593 | } |
594 | |
595 | // ========================================================= |
596 | // Partition Attributes |
597 | // ========================================================= |
598 | |
599 | /** |
600 | * @param partitioner the partitioner to set |
601 | */ |
602 | public void setPartitioner(Partitioner partitioner) { |
603 | this.partitioner = partitioner; |
604 | } |
605 | |
606 | /** |
607 | * @param stepExecutionAggregator the stepExecutionAggregator to set |
608 | */ |
609 | public void setStepExecutionAggregator(StepExecutionAggregator stepExecutionAggregator) { |
610 | this.stepExecutionAggregator = stepExecutionAggregator; |
611 | } |
612 | |
613 | /** |
614 | * @param partitionHandler the partitionHandler to set |
615 | */ |
616 | public void setPartitionHandler(PartitionHandler partitionHandler) { |
617 | this.partitionHandler = partitionHandler; |
618 | } |
619 | |
620 | /** |
621 | * @param gridSize the gridSize to set |
622 | */ |
623 | public void setGridSize(int gridSize) { |
624 | this.gridSize = gridSize; |
625 | } |
626 | |
627 | /** |
628 | * @param step the step to set |
629 | */ |
630 | public void setStep(Step step) { |
631 | this.step = step; |
632 | } |
633 | |
634 | // ========================================================= |
635 | // Tasklet Attributes |
636 | // ========================================================= |
637 | |
638 | /** |
639 | * Public setter for the flag to indicate that the step should be replayed on a restart, even if successful the |
640 | * first time. |
641 | * |
642 | * @param allowStartIfComplete the shouldAllowStartIfComplete to set |
643 | */ |
644 | public void setAllowStartIfComplete(boolean allowStartIfComplete) { |
645 | this.allowStartIfComplete = allowStartIfComplete; |
646 | |
647 | } |
648 | |
649 | /** |
650 | * @return jobRepository |
651 | */ |
652 | public JobRepository getJobRepository() { |
653 | return jobRepository; |
654 | } |
655 | |
656 | /** |
657 | * Public setter for {@link JobRepository}. |
658 | * |
659 | * @param jobRepository |
660 | */ |
661 | public void setJobRepository(JobRepository jobRepository) { |
662 | this.jobRepository = jobRepository; |
663 | } |
664 | |
665 | /** |
666 | * The number of times that the step should be allowed to start |
667 | * |
668 | * @param startLimit |
669 | */ |
670 | public void setStartLimit(int startLimit) { |
671 | this.startLimit = startLimit; |
672 | } |
673 | |
674 | /** |
675 | * A preconfigured {@link Tasklet} to use. |
676 | * |
677 | * @param tasklet |
678 | */ |
679 | public void setTasklet(Tasklet tasklet) { |
680 | this.tasklet = tasklet; |
681 | } |
682 | |
683 | /** |
684 | * @return transactionManager |
685 | */ |
686 | public PlatformTransactionManager getTransactionManager() { |
687 | return transactionManager; |
688 | } |
689 | |
690 | /** |
691 | * @param transactionManager the transaction manager to set |
692 | */ |
693 | public void setTransactionManager(PlatformTransactionManager transactionManager) { |
694 | this.transactionManager = transactionManager; |
695 | } |
696 | |
697 | // ========================================================= |
698 | // Tasklet Elements |
699 | // ========================================================= |
700 | |
701 | /** |
702 | * The listeners to inject into the {@link Step}. Any instance of {@link StepListener} can be used, and will then |
703 | * receive callbacks at the appropriate stage in the step. |
704 | * |
705 | * @param listeners an array of listeners |
706 | */ |
707 | public void setListeners(StepListener[] listeners) { |
708 | this.listeners = listeners; // useful for testing |
709 | for (StepListener listener : listeners) { |
710 | if (listener instanceof SkipListener) { |
711 | @SuppressWarnings("unchecked") |
712 | SkipListener<I, O> skipListener = (SkipListener<I, O>) listener; |
713 | skipListeners.add(skipListener); |
714 | } |
715 | if (listener instanceof StepExecutionListener) { |
716 | StepExecutionListener stepExecutionListener = (StepExecutionListener) listener; |
717 | stepExecutionListeners.add(stepExecutionListener); |
718 | } |
719 | if (listener instanceof ChunkListener) { |
720 | ChunkListener chunkListener = (ChunkListener) listener; |
721 | chunkListeners.add(chunkListener); |
722 | } |
723 | if (listener instanceof ItemReadListener) { |
724 | @SuppressWarnings("unchecked") |
725 | ItemReadListener<I> readListener = (ItemReadListener<I>) listener; |
726 | readListeners.add(readListener); |
727 | } |
728 | if (listener instanceof ItemWriteListener) { |
729 | @SuppressWarnings("unchecked") |
730 | ItemWriteListener<O> writeListener = (ItemWriteListener<O>) listener; |
731 | writeListeners.add(writeListener); |
732 | } |
733 | if (listener instanceof ItemProcessListener) { |
734 | @SuppressWarnings("unchecked") |
735 | ItemProcessListener<I, O> processListener = (ItemProcessListener<I, O>) listener; |
736 | processListeners.add(processListener); |
737 | } |
738 | } |
739 | } |
740 | |
741 | /** |
742 | * Exception classes that may not cause a rollback if encountered in the right place. |
743 | * |
744 | * @param noRollbackExceptionClasses the noRollbackExceptionClasses to set |
745 | */ |
746 | public void setNoRollbackExceptionClasses(Collection<Class<? extends Throwable>> noRollbackExceptionClasses) { |
747 | this.noRollbackExceptionClasses = noRollbackExceptionClasses; |
748 | } |
749 | |
750 | /** |
751 | * @param transactionTimeout the transactionTimeout to set |
752 | */ |
753 | public void setTransactionTimeout(int transactionTimeout) { |
754 | this.transactionTimeout = transactionTimeout; |
755 | } |
756 | |
757 | /** |
758 | * @param isolation the isolation to set |
759 | */ |
760 | public void setIsolation(Isolation isolation) { |
761 | this.isolation = isolation; |
762 | } |
763 | |
764 | /** |
765 | * @param propagation the propagation to set |
766 | */ |
767 | public void setPropagation(Propagation propagation) { |
768 | this.propagation = propagation; |
769 | } |
770 | |
771 | // ========================================================= |
772 | // Parent Attributes - can be provided in parent bean but not namespace |
773 | // ========================================================= |
774 | |
775 | /** |
776 | * A backoff policy to be applied to retry process. |
777 | * |
778 | * @param backOffPolicy the {@link BackOffPolicy} to set |
779 | */ |
780 | public void setBackOffPolicy(BackOffPolicy backOffPolicy) { |
781 | this.backOffPolicy = backOffPolicy; |
782 | } |
783 | |
784 | /** |
785 | * A retry policy to apply when exceptions occur. If this is specified then the retry limit and retryable exceptions |
786 | * will be ignored. |
787 | * |
788 | * @param retryPolicy the {@link RetryPolicy} to set |
789 | */ |
790 | public void setRetryPolicy(RetryPolicy retryPolicy) { |
791 | this.retryPolicy = retryPolicy; |
792 | } |
793 | |
794 | /** |
795 | * @param retryContextCache the {@link RetryContextCache} to set |
796 | */ |
797 | public void setRetryContextCache(RetryContextCache retryContextCache) { |
798 | this.retryContextCache = retryContextCache; |
799 | } |
800 | |
801 | /** |
802 | * A key generator that can be used to compare items with previously recorded items in a retry. Only used if the |
803 | * reader is a transactional queue. |
804 | * |
805 | * @param keyGenerator the {@link KeyGenerator} to set |
806 | */ |
807 | public void setKeyGenerator(KeyGenerator keyGenerator) { |
808 | this.keyGenerator = keyGenerator; |
809 | } |
810 | |
811 | // ========================================================= |
812 | // Chunk Attributes |
813 | // ========================================================= |
814 | |
815 | /** |
816 | * Public setter for the capacity of the cache in the retry policy. If more items than this fail without being |
817 | * skipped or recovered an exception will be thrown. This is to guard against inadvertent infinite loops generated |
818 | * by item identity problems.<br/> |
819 | * <p/> |
820 | * The default value should be high enough and more for most purposes. To breach the limit in a single-threaded step |
821 | * typically you have to have this many failures in a single transaction. Defaults to the value in the |
822 | * {@link MapRetryContextCache}.<br/> |
823 | * |
824 | * @param cacheCapacity the cache capacity to set (greater than 0 else ignored) |
825 | */ |
826 | public void setCacheCapacity(int cacheCapacity) { |
827 | this.cacheCapacity = cacheCapacity; |
828 | } |
829 | |
830 | /** |
831 | * Public setter for the {@link CompletionPolicy} applying to the chunk level. A transaction will be committed when |
832 | * this policy decides to complete. Defaults to a {@link SimpleCompletionPolicy} with chunk size equal to the |
833 | * commitInterval property. |
834 | * |
835 | * @param chunkCompletionPolicy the chunkCompletionPolicy to set |
836 | */ |
837 | public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) { |
838 | this.chunkCompletionPolicy = chunkCompletionPolicy; |
839 | } |
840 | |
841 | /** |
842 | * Set the commit interval. Either set this or the chunkCompletionPolicy but not both. |
843 | * |
844 | * @param commitInterval 1 by default |
845 | */ |
846 | public void setCommitInterval(int commitInterval) { |
847 | this.commitInterval = commitInterval; |
848 | } |
849 | |
850 | /** |
851 | * Flag to signal that the reader is transactional (usually a JMS consumer) so that items are re-presented after a |
852 | * rollback. The default is false and readers are assumed to be forward-only. |
853 | * |
854 | * @param isReaderTransactionalQueue the value of the flag |
855 | */ |
856 | public void setIsReaderTransactionalQueue(boolean isReaderTransactionalQueue) { |
857 | this.readerTransactionalQueue = isReaderTransactionalQueue; |
858 | } |
859 | |
860 | /** |
861 | * Flag to signal that the processor is transactional, in which case it should be called for every item in every |
862 | * transaction. If false then we can cache the processor results between transactions in the case of a rollback. |
863 | * |
864 | * @param processorTransactional the value to set |
865 | */ |
866 | public void setProcessorTransactional(Boolean processorTransactional) { |
867 | this.processorTransactional = processorTransactional; |
868 | } |
869 | |
870 | /** |
871 | * Public setter for the retry limit. Each item can be retried up to this limit. Note this limit includes the |
872 | * initial attempt to process the item, therefore <code>retryLimit == 1</code> by default. |
873 | * |
874 | * @param retryLimit the retry limit to set, must be greater or equal to 1. |
875 | */ |
876 | public void setRetryLimit(int retryLimit) { |
877 | this.retryLimit = retryLimit; |
878 | } |
879 | |
880 | /** |
881 | * Public setter for a limit that determines skip policy. If this value is positive then an exception in chunk |
882 | * processing will cause the item to be skipped and no exception propagated until the limit is reached. If it is |
883 | * zero then all exceptions will be propagated from the chunk and cause the step to abort. |
884 | * |
885 | * @param skipLimit the value to set. Default is 0 (never skip). |
886 | */ |
887 | public void setSkipLimit(int skipLimit) { |
888 | this.skipLimit = skipLimit; |
889 | } |
890 | |
891 | /** |
892 | * Public setter for a skip policy. If this value is set then the skip limit and skippable exceptions are ignored. |
893 | * |
894 | * @param skipPolicy the {@link SkipPolicy} to set |
895 | */ |
896 | public void setSkipPolicy(SkipPolicy skipPolicy) { |
897 | this.skipPolicy = skipPolicy; |
898 | } |
899 | |
900 | /** |
901 | * Public setter for the {@link TaskExecutor}. If this is set, then it will be used to execute the chunk processing |
902 | * inside the {@link Step}. |
903 | * |
904 | * @param taskExecutor the taskExecutor to set |
905 | */ |
906 | public void setTaskExecutor(TaskExecutor taskExecutor) { |
907 | this.taskExecutor = taskExecutor; |
908 | } |
909 | |
910 | /** |
911 | * Public setter for the throttle limit. This limits the number of tasks queued for concurrent processing to prevent |
912 | * thread pools from being overwhelmed. Defaults to {@link TaskExecutorRepeatTemplate#DEFAULT_THROTTLE_LIMIT}. |
913 | * |
914 | * @param throttleLimit the throttle limit to set. |
915 | */ |
916 | public void setThrottleLimit(Integer throttleLimit) { |
917 | this.throttleLimit = throttleLimit; |
918 | } |
919 | |
920 | /** |
921 | * @param itemReader the {@link ItemReader} to set |
922 | */ |
923 | public void setItemReader(ItemReader<? extends I> itemReader) { |
924 | this.itemReader = itemReader; |
925 | } |
926 | |
927 | /** |
928 | * @param itemProcessor the {@link ItemProcessor} to set |
929 | */ |
930 | public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) { |
931 | this.itemProcessor = itemProcessor; |
932 | } |
933 | |
934 | /** |
935 | * @param itemWriter the {@link ItemWriter} to set |
936 | */ |
937 | public void setItemWriter(ItemWriter<? super O> itemWriter) { |
938 | this.itemWriter = itemWriter; |
939 | } |
940 | |
941 | // ========================================================= |
942 | // Chunk Elements |
943 | // ========================================================= |
944 | |
945 | /** |
946 | * Public setter for the {@link RetryListener}s. |
947 | * |
948 | * @param retryListeners the {@link RetryListener}s to set |
949 | */ |
950 | public void setRetryListeners(RetryListener... retryListeners) { |
951 | this.retryListeners = retryListeners; |
952 | } |
953 | |
954 | /** |
955 | * Public setter for exception classes that when raised won't crash the job but will result in transaction rollback |
956 | * and the item which handling caused the exception will be skipped. |
957 | * |
958 | * @param exceptionClasses |
959 | */ |
960 | public void setSkippableExceptionClasses(Map<Class<? extends Throwable>, Boolean> exceptionClasses) { |
961 | this.skippableExceptionClasses = exceptionClasses; |
962 | } |
963 | |
964 | /** |
965 | * Public setter for exception classes that will retry the item when raised. |
966 | * |
967 | * @param retryableExceptionClasses the retryableExceptionClasses to set |
968 | */ |
969 | public void setRetryableExceptionClasses(Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses) { |
970 | this.retryableExceptionClasses = retryableExceptionClasses; |
971 | } |
972 | |
973 | /** |
974 | * The streams to inject into the {@link Step}. Any instance of {@link ItemStream} can be used, and will then |
975 | * receive callbacks at the appropriate stage in the step. |
976 | * |
977 | * @param streams an array of listeners |
978 | */ |
979 | public void setStreams(ItemStream[] streams) { |
980 | this.streams = streams; |
981 | } |
982 | |
983 | // ========================================================= |
984 | // Additional |
985 | // ========================================================= |
986 | |
987 | /** |
988 | * @param hasChunkElement |
989 | */ |
990 | public void setHasChunkElement(boolean hasChunkElement) { |
991 | this.hasChunkElement = hasChunkElement; |
992 | } |
993 | |
994 | } |