1 | /* |
2 | * Copyright 2006-2007 the original author or authors. |
3 | * |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | * you may not use this file except in compliance with the License. |
6 | * You may obtain a copy of the License at |
7 | * |
8 | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | * |
10 | * Unless required by applicable law or agreed to in writing, software |
11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | * See the License for the specific language governing permissions and |
14 | * limitations under the License. |
15 | */ |
16 | |
17 | package org.springframework.batch.core.configuration.xml; |
18 | |
19 | import java.util.ArrayList; |
20 | import java.util.Collection; |
21 | import java.util.HashSet; |
22 | import java.util.List; |
23 | import java.util.Map; |
24 | |
25 | import org.apache.commons.logging.Log; |
26 | import org.apache.commons.logging.LogFactory; |
27 | import org.springframework.aop.framework.Advised; |
28 | import org.springframework.batch.classify.BinaryExceptionClassifier; |
29 | import org.springframework.batch.core.ChunkListener; |
30 | import org.springframework.batch.core.Job; |
31 | import org.springframework.batch.core.Step; |
32 | import org.springframework.batch.core.StepExecutionListener; |
33 | import org.springframework.batch.core.StepListener; |
34 | import org.springframework.batch.core.job.flow.Flow; |
35 | import org.springframework.batch.core.job.flow.FlowStep; |
36 | import org.springframework.batch.core.launch.JobLauncher; |
37 | import org.springframework.batch.core.launch.support.SimpleJobLauncher; |
38 | import org.springframework.batch.core.partition.PartitionHandler; |
39 | import org.springframework.batch.core.partition.support.PartitionStep; |
40 | import org.springframework.batch.core.partition.support.Partitioner; |
41 | import org.springframework.batch.core.partition.support.SimpleStepExecutionSplitter; |
42 | import org.springframework.batch.core.partition.support.StepExecutionAggregator; |
43 | import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler; |
44 | import org.springframework.batch.core.repository.JobRepository; |
45 | import org.springframework.batch.core.step.AbstractStep; |
46 | import org.springframework.batch.core.step.item.FaultTolerantStepFactoryBean; |
47 | import org.springframework.batch.core.step.item.KeyGenerator; |
48 | import org.springframework.batch.core.step.item.SimpleStepFactoryBean; |
49 | import org.springframework.batch.core.step.job.JobParametersExtractor; |
50 | import org.springframework.batch.core.step.job.JobStep; |
51 | import org.springframework.batch.core.step.skip.SkipPolicy; |
52 | import org.springframework.batch.core.step.tasklet.Tasklet; |
53 | import org.springframework.batch.core.step.tasklet.TaskletStep; |
54 | import org.springframework.batch.item.ItemProcessor; |
55 | import org.springframework.batch.item.ItemReader; |
56 | import org.springframework.batch.item.ItemStream; |
57 | import org.springframework.batch.item.ItemWriter; |
58 | import org.springframework.batch.repeat.CompletionPolicy; |
59 | import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; |
60 | import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate; |
61 | import org.springframework.batch.retry.RetryListener; |
62 | import org.springframework.batch.retry.RetryPolicy; |
63 | import org.springframework.batch.retry.backoff.BackOffPolicy; |
64 | import org.springframework.batch.retry.policy.MapRetryContextCache; |
65 | import org.springframework.batch.retry.policy.RetryContextCache; |
66 | import org.springframework.beans.factory.BeanNameAware; |
67 | import org.springframework.beans.factory.FactoryBean; |
68 | import org.springframework.core.task.SyncTaskExecutor; |
69 | import org.springframework.core.task.TaskExecutor; |
70 | import org.springframework.transaction.PlatformTransactionManager; |
71 | import org.springframework.transaction.annotation.Isolation; |
72 | import org.springframework.transaction.annotation.Propagation; |
73 | import org.springframework.transaction.interceptor.DefaultTransactionAttribute; |
74 | import org.springframework.util.Assert; |
75 | |
76 | /** |
77 | * This {@link FactoryBean} is used by the batch namespace parser to create |
78 | * {@link Step} objects. Stores all of the properties that are configurable on |
 * the {@code <step/>} (and its inner {@code <tasklet/>}). Based on which properties
80 | * are configured, the {@link #getObject()} method will delegate to the |
81 | * appropriate class for generating the {@link Step}. |
82 | * |
83 | * @author Dan Garrette |
84 | * @author Josh Long |
85 | * @see SimpleStepFactoryBean |
86 | * @see FaultTolerantStepFactoryBean |
87 | * @see TaskletStep |
88 | * @since 2.0 |
89 | */ |
90 | class StepParserStepFactoryBean<I, O> implements FactoryBean, BeanNameAware { |
91 | |
92 | private static final Log logger = LogFactory.getLog(StepParserStepFactoryBean.class); |
93 | |
94 | // |
95 | // Step Attributes |
96 | // |
97 | private String name; |
98 | |
99 | // |
100 | // Tasklet Attributes |
101 | // |
102 | private Boolean allowStartIfComplete; |
103 | |
104 | private JobRepository jobRepository; |
105 | |
106 | private Integer startLimit; |
107 | |
108 | private Tasklet tasklet; |
109 | |
110 | private PlatformTransactionManager transactionManager; |
111 | |
112 | // |
113 | // Flow Elements |
114 | // |
115 | private Flow flow; |
116 | |
117 | // |
118 | // Job Elements |
119 | // |
120 | private Job job; |
121 | |
122 | private JobLauncher jobLauncher; |
123 | |
124 | private JobParametersExtractor jobParametersExtractor; |
125 | |
126 | // |
127 | // Partition Elements |
128 | // |
129 | private Partitioner partitioner; |
130 | |
131 | private static final int DEFAULT_GRID_SIZE = 6; |
132 | |
133 | private Step step; |
134 | |
135 | private PartitionHandler partitionHandler; |
136 | |
137 | private int gridSize = DEFAULT_GRID_SIZE; |
138 | |
139 | // |
140 | // Tasklet Elements |
141 | // |
142 | private StepListener[] listeners; |
143 | |
144 | private Collection<Class<? extends Throwable>> noRollbackExceptionClasses; |
145 | |
146 | private Integer transactionTimeout; |
147 | |
148 | private Propagation propagation; |
149 | |
150 | private Isolation isolation; |
151 | |
152 | // |
153 | // Chunk Attributes |
154 | // |
155 | private Integer cacheCapacity; |
156 | |
157 | private CompletionPolicy chunkCompletionPolicy; |
158 | |
159 | private Integer commitInterval; |
160 | |
161 | private Boolean readerTransactionalQueue; |
162 | |
163 | private Boolean processorTransactional; |
164 | |
165 | private Integer retryLimit; |
166 | |
167 | private BackOffPolicy backOffPolicy; |
168 | |
169 | private RetryPolicy retryPolicy; |
170 | |
171 | private RetryContextCache retryContextCache; |
172 | |
173 | private KeyGenerator keyGenerator; |
174 | |
175 | private Integer skipLimit; |
176 | |
177 | private SkipPolicy skipPolicy; |
178 | |
179 | private TaskExecutor taskExecutor; |
180 | |
181 | private Integer throttleLimit; |
182 | |
183 | private ItemReader<? extends I> itemReader; |
184 | |
185 | private ItemProcessor<? super I, ? extends O> itemProcessor; |
186 | |
187 | private ItemWriter<? super O> itemWriter; |
188 | |
189 | // |
190 | // Chunk Elements |
191 | // |
192 | private RetryListener[] retryListeners; |
193 | |
194 | private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses; |
195 | |
196 | private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses; |
197 | |
198 | private ItemStream[] streams; |
199 | |
200 | // |
201 | // Additional |
202 | // |
203 | private boolean hasChunkElement = false; |
204 | |
205 | private StepExecutionAggregator stepExecutionAggregator; |
206 | |
207 | /** |
208 | * Create a {@link Step} from the configuration provided. |
209 | * |
210 | * @see FactoryBean#getObject() |
211 | */ |
212 | public final Object getObject() throws Exception { |
213 | if (hasChunkElement) { |
214 | Assert.isNull(tasklet, "Step [" + name |
215 | + "] has both a <chunk/> element and a 'ref' attribute referencing a Tasklet."); |
216 | |
217 | validateFaultTolerantSettings(); |
218 | if (isFaultTolerant()) { |
219 | FaultTolerantStepFactoryBean<I, O> fb = new FaultTolerantStepFactoryBean<I, O>(); |
220 | configureSimple(fb); |
221 | configureFaultTolerant(fb); |
222 | return fb.getObject(); |
223 | } |
224 | else { |
225 | SimpleStepFactoryBean<I, O> fb = new SimpleStepFactoryBean<I, O>(); |
226 | configureSimple(fb); |
227 | return fb.getObject(); |
228 | } |
229 | } |
230 | else if (tasklet != null) { |
231 | TaskletStep ts = new TaskletStep(); |
232 | configureTaskletStep(ts); |
233 | return ts; |
234 | } |
235 | else if (flow != null) { |
236 | FlowStep ts = new FlowStep(); |
237 | configureFlowStep(ts); |
238 | return ts; |
239 | } |
240 | else if (job != null) { |
241 | JobStep ts = new JobStep(); |
242 | configureJobStep(ts); |
243 | return ts; |
244 | } |
245 | else { |
246 | PartitionStep ts = new PartitionStep(); |
247 | configurePartitionStep(ts); |
248 | return ts; |
249 | } |
250 | } |
251 | |
252 | public boolean requiresTransactionManager() { |
253 | // Currently all step implementations other than TaskletStep are |
254 | // AbstractStep and do not require a transaction manager |
255 | return hasChunkElement || tasklet != null; |
256 | } |
257 | |
258 | private void configureAbstractStep(AbstractStep ts) { |
259 | if (name != null) { |
260 | ts.setName(name); |
261 | } |
262 | if (allowStartIfComplete != null) { |
263 | ts.setAllowStartIfComplete(allowStartIfComplete); |
264 | } |
265 | if (jobRepository != null) { |
266 | ts.setJobRepository(jobRepository); |
267 | } |
268 | if (startLimit != null) { |
269 | ts.setStartLimit(startLimit); |
270 | } |
271 | if (listeners != null) { |
272 | List<StepExecutionListener> newListeners = new ArrayList<StepExecutionListener>(); |
273 | for (StepListener listener : listeners) { |
274 | if (listener instanceof StepExecutionListener) { |
275 | newListeners.add((StepExecutionListener) listener); |
276 | } |
277 | } |
278 | ts.setStepExecutionListeners(newListeners.toArray(new StepExecutionListener[0])); |
279 | } |
280 | } |
281 | |
282 | private void configurePartitionStep(PartitionStep ts) { |
283 | Assert.state(partitioner != null, "A Partitioner must be provided for a partition step"); |
284 | configureAbstractStep(ts); |
285 | |
286 | if (partitionHandler != null) { |
287 | ts.setPartitionHandler(partitionHandler); |
288 | } |
289 | else { |
290 | TaskExecutorPartitionHandler partitionHandler = new TaskExecutorPartitionHandler(); |
291 | partitionHandler.setStep(step); |
292 | if (taskExecutor == null) { |
293 | taskExecutor = new SyncTaskExecutor(); |
294 | } |
295 | partitionHandler.setGridSize(gridSize); |
296 | partitionHandler.setTaskExecutor(taskExecutor); |
297 | ts.setPartitionHandler(partitionHandler); |
298 | } |
299 | |
300 | boolean allowStartIfComplete = this.allowStartIfComplete != null ? this.allowStartIfComplete : false; |
301 | String name = this.name; |
302 | if (step != null) { |
303 | try { |
304 | allowStartIfComplete = step.isAllowStartIfComplete(); |
305 | name = step.getName(); |
306 | } |
307 | catch (Exception e) { |
308 | logger.info("Ignored exception from step asking for name and allowStartIfComplete flag. " |
309 | + "Using default from enclosing PartitionStep (" + name + "," + allowStartIfComplete + ")."); |
310 | } |
311 | } |
312 | SimpleStepExecutionSplitter splitter = new SimpleStepExecutionSplitter(jobRepository, allowStartIfComplete, |
313 | name, partitioner); |
314 | ts.setStepExecutionSplitter(splitter); |
315 | if (stepExecutionAggregator != null) { |
316 | ts.setStepExecutionAggregator(stepExecutionAggregator); |
317 | } |
318 | } |
319 | |
320 | private Object extractTarget(Object target, Class<?> type) { |
321 | if (target instanceof Advised) { |
322 | Object source; |
323 | try { |
324 | source = ((Advised) target).getTargetSource().getTarget(); |
325 | } |
326 | catch (Exception e) { |
327 | throw new IllegalStateException("Could not extract target from proxy", e); |
328 | } |
329 | if (source instanceof Advised) { |
330 | source = extractTarget(source, type); |
331 | } |
332 | if (type.isAssignableFrom(source.getClass())) { |
333 | target = source; |
334 | } |
335 | } |
336 | return target; |
337 | } |
338 | |
339 | private void configureSimple(SimpleStepFactoryBean<I, O> fb) { |
340 | if (name != null) { |
341 | fb.setBeanName(name); |
342 | } |
343 | if (allowStartIfComplete != null) { |
344 | fb.setAllowStartIfComplete(allowStartIfComplete); |
345 | } |
346 | if (jobRepository != null) { |
347 | fb.setJobRepository(jobRepository); |
348 | } |
349 | if (startLimit != null) { |
350 | fb.setStartLimit(startLimit); |
351 | } |
352 | if (transactionManager != null) { |
353 | fb.setTransactionManager(transactionManager); |
354 | } |
355 | if (listeners != null) { |
356 | fb.setListeners(listeners); |
357 | } |
358 | if (transactionTimeout != null) { |
359 | fb.setTransactionTimeout(transactionTimeout); |
360 | } |
361 | if (propagation != null) { |
362 | fb.setPropagation(propagation); |
363 | } |
364 | if (isolation != null) { |
365 | fb.setIsolation(isolation); |
366 | } |
367 | |
368 | if (chunkCompletionPolicy != null) { |
369 | fb.setChunkCompletionPolicy(chunkCompletionPolicy); |
370 | } |
371 | if (commitInterval != null) { |
372 | fb.setCommitInterval(commitInterval); |
373 | } |
374 | if (taskExecutor != null) { |
375 | fb.setTaskExecutor(taskExecutor); |
376 | } |
377 | if (throttleLimit != null) { |
378 | fb.setThrottleLimit(throttleLimit); |
379 | } |
380 | if (itemReader != null) { |
381 | fb.setItemReader(itemReader); |
382 | } |
383 | if (itemProcessor != null) { |
384 | fb.setItemProcessor(itemProcessor); |
385 | } |
386 | if (itemWriter != null) { |
387 | fb.setItemWriter(itemWriter); |
388 | } |
389 | |
390 | if (streams != null) { |
391 | fb.setStreams(streams); |
392 | } |
393 | } |
394 | |
395 | private void configureFaultTolerant(FaultTolerantStepFactoryBean<I, O> fb) { |
396 | if (cacheCapacity != null) { |
397 | fb.setCacheCapacity(cacheCapacity); |
398 | } |
399 | if (readerTransactionalQueue != null) { |
400 | fb.setIsReaderTransactionalQueue(readerTransactionalQueue); |
401 | } |
402 | if (processorTransactional != null) { |
403 | fb.setProcessorTransactional(processorTransactional); |
404 | } |
405 | if (retryLimit != null) { |
406 | fb.setRetryLimit(retryLimit); |
407 | } |
408 | if (skipLimit != null) { |
409 | fb.setSkipLimit(skipLimit); |
410 | } |
411 | if (skipPolicy != null) { |
412 | fb.setSkipPolicy(skipPolicy); |
413 | } |
414 | if (backOffPolicy != null) { |
415 | fb.setBackOffPolicy(backOffPolicy); |
416 | } |
417 | if (retryPolicy != null) { |
418 | fb.setRetryPolicy(retryPolicy); |
419 | } |
420 | if (retryContextCache != null) { |
421 | fb.setRetryContextCache(retryContextCache); |
422 | } |
423 | if (keyGenerator != null) { |
424 | fb.setKeyGenerator(keyGenerator); |
425 | } |
426 | |
427 | if (retryListeners != null) { |
428 | fb.setRetryListeners(retryListeners); |
429 | } |
430 | if (skippableExceptionClasses != null) { |
431 | fb.setSkippableExceptionClasses(skippableExceptionClasses); |
432 | } |
433 | if (retryableExceptionClasses != null) { |
434 | fb.setRetryableExceptionClasses(retryableExceptionClasses); |
435 | } |
436 | if (noRollbackExceptionClasses != null) { |
437 | fb.setNoRollbackExceptionClasses(noRollbackExceptionClasses); |
438 | } |
439 | } |
440 | |
441 | @SuppressWarnings("serial") |
442 | private void configureTaskletStep(TaskletStep ts) { |
443 | configureAbstractStep(ts); |
444 | if (listeners != null) { |
445 | List<ChunkListener> newListeners = new ArrayList<ChunkListener>(); |
446 | for (StepListener listener : listeners) { |
447 | if (listener instanceof ChunkListener) { |
448 | newListeners.add((ChunkListener) listener); |
449 | } |
450 | } |
451 | ts.setChunkListeners(newListeners.toArray(new ChunkListener[0])); |
452 | } |
453 | if (tasklet != null) { |
454 | ts.setTasklet(tasklet); |
455 | } |
456 | if (taskExecutor != null) { |
457 | TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate(); |
458 | repeatTemplate.setTaskExecutor(taskExecutor); |
459 | if (throttleLimit != null) { |
460 | repeatTemplate.setThrottleLimit(throttleLimit); |
461 | } |
462 | ts.setStepOperations(repeatTemplate); |
463 | } |
464 | if (transactionManager != null) { |
465 | ts.setTransactionManager(transactionManager); |
466 | } |
467 | if (transactionTimeout != null || propagation != null || isolation != null |
468 | || noRollbackExceptionClasses != null) { |
469 | DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); |
470 | if (propagation != null) { |
471 | attribute.setPropagationBehavior(propagation.value()); |
472 | } |
473 | if (isolation != null) { |
474 | attribute.setIsolationLevel(isolation.value()); |
475 | } |
476 | if (transactionTimeout != null) { |
477 | attribute.setTimeout(transactionTimeout); |
478 | } |
479 | Collection<Class<? extends Throwable>> exceptions = noRollbackExceptionClasses == null ? new HashSet<Class<? extends Throwable>>() |
480 | : noRollbackExceptionClasses; |
481 | final BinaryExceptionClassifier classifier = new BinaryExceptionClassifier(exceptions, false); |
482 | ts.setTransactionAttribute(new DefaultTransactionAttribute(attribute) { |
483 | @Override |
484 | public boolean rollbackOn(Throwable ex) { |
485 | return classifier.classify(ex); |
486 | } |
487 | }); |
488 | } |
489 | } |
490 | |
491 | @SuppressWarnings("serial") |
492 | private void configureFlowStep(FlowStep ts) { |
493 | configureAbstractStep(ts); |
494 | if (flow != null) { |
495 | ts.setFlow(flow); |
496 | } |
497 | } |
498 | |
499 | @SuppressWarnings("serial") |
500 | private void configureJobStep(JobStep ts) throws Exception { |
501 | configureAbstractStep(ts); |
502 | if (job != null) { |
503 | ts.setJob(job); |
504 | } |
505 | if (jobParametersExtractor != null) { |
506 | ts.setJobParametersExtractor(jobParametersExtractor); |
507 | } |
508 | if (jobLauncher == null) { |
509 | SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); |
510 | jobLauncher.setJobRepository(jobRepository); |
511 | jobLauncher.afterPropertiesSet(); |
512 | this.jobLauncher = jobLauncher; |
513 | } |
514 | ts.setJobLauncher(jobLauncher); |
515 | } |
516 | |
517 | private void validateFaultTolerantSettings() { |
518 | validateDependency("skippable-exception-classes", skippableExceptionClasses, "skip-limit", skipLimit, true); |
519 | validateDependency("retryable-exception-classes", retryableExceptionClasses, "retry-limit", retryLimit, true); |
520 | validateDependency("retry-listeners", retryListeners, "retry-limit", retryLimit, false); |
521 | if (isPresent(processorTransactional) && !processorTransactional && isPresent(readerTransactionalQueue) |
522 | && readerTransactionalQueue) { |
523 | throw new IllegalArgumentException( |
524 | "The field 'processor-transactional' cannot be false if 'reader-transactional-queue' is true"); |
525 | } |
526 | } |
527 | |
528 | /** |
529 | * Check if a field is present then a second is also. If the |
530 | * twoWayDependency flag is set then the opposite must also be true: if the |
531 | * second value is present, the first must also be. |
532 | * |
533 | * @param dependentName the name of the first field |
534 | * @param dependentValue the value of the first field |
535 | * @param name the name of the other field (which should be absent if the |
536 | * first is present) |
537 | * @param value the value of the other field |
538 | * @param twoWayDependency true if both depend on each other |
539 | * @throws IllegalArgumentException if either condition is violated |
540 | */ |
541 | private void validateDependency(String dependentName, Object dependentValue, String name, Object value, |
542 | boolean twoWayDependency) { |
543 | if (isPresent(dependentValue) && !isPresent(value)) { |
544 | throw new IllegalArgumentException("The field '" + dependentName + "' is not permitted on the step [" |
545 | + this.name + "] because there is no '" + name + "'."); |
546 | } |
547 | if (twoWayDependency && isPresent(value) && !isPresent(dependentValue)) { |
548 | throw new IllegalArgumentException("The field '" + name + "' is not permitted on the step [" + this.name |
549 | + "] because there is no '" + dependentName + "'."); |
550 | } |
551 | } |
552 | |
553 | /** |
554 | * Is the object non-null (or if an Integer, non-zero)? |
555 | * |
556 | * @param o an object |
557 | * @return true if the object has a value |
558 | */ |
559 | private boolean isPresent(Object o) { |
560 | if (o instanceof Integer) { |
561 | return isPositive((Integer) o); |
562 | } |
563 | return o != null; |
564 | } |
565 | |
566 | private boolean isFaultTolerant() { |
567 | return backOffPolicy != null || skipPolicy != null || retryPolicy != null || isPositive(skipLimit) |
568 | || isPositive(retryLimit) || isPositive(cacheCapacity) || isTrue(readerTransactionalQueue); |
569 | } |
570 | |
571 | private boolean isTrue(Boolean b) { |
572 | return b != null && b.booleanValue(); |
573 | } |
574 | |
575 | private boolean isPositive(Integer n) { |
576 | return n != null && n > 0; |
577 | } |
578 | |
579 | public Class<TaskletStep> getObjectType() { |
580 | return TaskletStep.class; |
581 | } |
582 | |
583 | public boolean isSingleton() { |
584 | return true; |
585 | } |
586 | |
587 | // ========================================================= |
588 | // Step Attributes |
589 | // ========================================================= |
590 | |
591 | /** |
592 | * Set the bean name property, which will become the name of the |
593 | * {@link Step} when it is created. |
594 | * |
595 | * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String) |
596 | */ |
597 | public void setBeanName(String name) { |
598 | if (this.name == null) { |
599 | this.name = name; |
600 | } |
601 | } |
602 | |
603 | /** |
604 | * @param name the name to set |
605 | */ |
606 | public void setName(String name) { |
607 | this.name = name; |
608 | } |
609 | |
610 | // ========================================================= |
611 | // Flow Attributes |
612 | // ========================================================= |
613 | |
614 | /** |
615 | * @param flow the flow to set |
616 | */ |
617 | public void setFlow(Flow flow) { |
618 | this.flow = flow; |
619 | } |
620 | |
621 | // ========================================================= |
622 | // Job Attributes |
623 | // ========================================================= |
624 | |
625 | public void setJob(Job job) { |
626 | this.job = job; |
627 | } |
628 | |
629 | public void setJobParametersExtractor(JobParametersExtractor jobParametersExtractor) { |
630 | this.jobParametersExtractor = jobParametersExtractor; |
631 | } |
632 | |
633 | public void setJobLauncher(JobLauncher jobLauncher) { |
634 | this.jobLauncher = jobLauncher; |
635 | } |
636 | |
637 | // ========================================================= |
638 | // Partition Attributes |
639 | // ========================================================= |
640 | |
641 | /** |
642 | * @param partitioner the partitioner to set |
643 | */ |
644 | public void setPartitioner(Partitioner partitioner) { |
645 | this.partitioner = partitioner; |
646 | } |
647 | |
648 | /** |
649 | * @param stepExecutionAggregator the stepExecutionAggregator to set |
650 | */ |
651 | public void setStepExecutionAggregator(StepExecutionAggregator stepExecutionAggregator) { |
652 | this.stepExecutionAggregator = stepExecutionAggregator; |
653 | } |
654 | |
655 | /** |
656 | * @param partitionHandler the partitionHandler to set |
657 | */ |
658 | public void setPartitionHandler(PartitionHandler partitionHandler) { |
659 | this.partitionHandler = partitionHandler; |
660 | } |
661 | |
662 | /** |
663 | * @param gridSize the gridSize to set |
664 | */ |
665 | public void setGridSize(int gridSize) { |
666 | this.gridSize = gridSize; |
667 | } |
668 | |
669 | /** |
670 | * @param step the step to set |
671 | */ |
672 | public void setStep(Step step) { |
673 | this.step = step; |
674 | } |
675 | |
676 | // ========================================================= |
677 | // Tasklet Attributes |
678 | // ========================================================= |
679 | |
680 | /** |
681 | * Public setter for the flag to indicate that the step should be replayed |
682 | * on a restart, even if successful the first time. |
683 | * |
684 | * @param allowStartIfComplete the shouldAllowStartIfComplete to set |
685 | */ |
686 | public void setAllowStartIfComplete(boolean allowStartIfComplete) { |
687 | this.allowStartIfComplete = allowStartIfComplete; |
688 | |
689 | } |
690 | |
691 | /** |
692 | * @return jobRepository |
693 | */ |
694 | public JobRepository getJobRepository() { |
695 | return jobRepository; |
696 | } |
697 | |
698 | /** |
699 | * Public setter for {@link JobRepository}. |
700 | * |
701 | * @param jobRepository |
702 | */ |
703 | public void setJobRepository(JobRepository jobRepository) { |
704 | this.jobRepository = jobRepository; |
705 | } |
706 | |
707 | /** |
708 | * The number of times that the step should be allowed to start |
709 | * |
710 | * @param startLimit |
711 | */ |
712 | public void setStartLimit(int startLimit) { |
713 | this.startLimit = startLimit; |
714 | } |
715 | |
716 | /** |
717 | * A preconfigured {@link Tasklet} to use. |
718 | * |
719 | * @param tasklet |
720 | */ |
721 | public void setTasklet(Tasklet tasklet) { |
722 | this.tasklet = tasklet; |
723 | } |
724 | |
725 | /** |
726 | * @return transactionManager |
727 | */ |
728 | public PlatformTransactionManager getTransactionManager() { |
729 | return transactionManager; |
730 | } |
731 | |
732 | /** |
733 | * @param transactionManager the transaction manager to set |
734 | */ |
735 | public void setTransactionManager(PlatformTransactionManager transactionManager) { |
736 | this.transactionManager = transactionManager; |
737 | } |
738 | |
739 | // ========================================================= |
740 | // Tasklet Elements |
741 | // ========================================================= |
742 | |
743 | /** |
744 | * The listeners to inject into the {@link Step}. Any instance of |
745 | * {@link StepListener} can be used, and will then receive callbacks at the |
746 | * appropriate stage in the step. |
747 | * |
748 | * @param listeners an array of listeners |
749 | */ |
750 | public void setListeners(StepListener[] listeners) { |
751 | this.listeners = listeners; |
752 | } |
753 | |
754 | /** |
755 | * Exception classes that may not cause a rollback if encountered in the |
756 | * right place. |
757 | * |
758 | * @param noRollbackExceptionClasses the noRollbackExceptionClasses to set |
759 | */ |
760 | public void setNoRollbackExceptionClasses(Collection<Class<? extends Throwable>> noRollbackExceptionClasses) { |
761 | this.noRollbackExceptionClasses = noRollbackExceptionClasses; |
762 | } |
763 | |
764 | /** |
765 | * @param transactionTimeout the transactionTimeout to set |
766 | */ |
767 | public void setTransactionTimeout(int transactionTimeout) { |
768 | this.transactionTimeout = transactionTimeout; |
769 | } |
770 | |
771 | /** |
772 | * @param isolation the isolation to set |
773 | */ |
774 | public void setIsolation(Isolation isolation) { |
775 | this.isolation = isolation; |
776 | } |
777 | |
778 | /** |
779 | * @param propagation the propagation to set |
780 | */ |
781 | public void setPropagation(Propagation propagation) { |
782 | this.propagation = propagation; |
783 | } |
784 | |
785 | // ========================================================= |
786 | // Parent Attributes - can be provided in parent bean but not namespace |
787 | // ========================================================= |
788 | |
789 | /** |
790 | * A backoff policy to be applied to retry process. |
791 | * |
792 | * @param backOffPolicy the {@link BackOffPolicy} to set |
793 | */ |
794 | public void setBackOffPolicy(BackOffPolicy backOffPolicy) { |
795 | this.backOffPolicy = backOffPolicy; |
796 | } |
797 | |
798 | /** |
799 | * A retry policy to apply when exceptions occur. If this is specified then |
800 | * the retry limit and retryable exceptions will be ignored. |
801 | * |
802 | * @param retryPolicy the {@link RetryPolicy} to set |
803 | */ |
804 | public void setRetryPolicy(RetryPolicy retryPolicy) { |
805 | this.retryPolicy = retryPolicy; |
806 | } |
807 | |
808 | /** |
809 | * @param retryContextCache the {@link RetryContextCache} to set |
810 | */ |
811 | public void setRetryContextCache(RetryContextCache retryContextCache) { |
812 | this.retryContextCache = retryContextCache; |
813 | } |
814 | |
815 | /** |
816 | * A key generator that can be used to compare items with previously |
817 | * recorded items in a retry. Only used if the reader is a transactional |
818 | * queue. |
819 | * |
820 | * @param keyGenerator the {@link KeyGenerator} to set |
821 | */ |
822 | public void setKeyGenerator(KeyGenerator keyGenerator) { |
823 | this.keyGenerator = keyGenerator; |
824 | } |
825 | |
826 | // ========================================================= |
827 | // Chunk Attributes |
828 | // ========================================================= |
829 | |
830 | /** |
831 | * Public setter for the capacity of the cache in the retry policy. If more |
832 | * items than this fail without being skipped or recovered an exception will |
833 | * be thrown. This is to guard against inadvertent infinite loops generated |
834 | * by item identity problems.<br/> |
835 | * <p/> |
836 | * The default value should be high enough and more for most purposes. To |
837 | * breach the limit in a single-threaded step typically you have to have |
838 | * this many failures in a single transaction. Defaults to the value in the |
839 | * {@link MapRetryContextCache}.<br/> |
840 | * |
841 | * @param cacheCapacity the cache capacity to set (greater than 0 else |
842 | * ignored) |
843 | */ |
844 | public void setCacheCapacity(int cacheCapacity) { |
845 | this.cacheCapacity = cacheCapacity; |
846 | } |
847 | |
848 | /** |
849 | * Public setter for the {@link CompletionPolicy} applying to the chunk |
850 | * level. A transaction will be committed when this policy decides to |
851 | * complete. Defaults to a {@link SimpleCompletionPolicy} with chunk size |
852 | * equal to the commitInterval property. |
853 | * |
854 | * @param chunkCompletionPolicy the chunkCompletionPolicy to set |
855 | */ |
856 | public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) { |
857 | this.chunkCompletionPolicy = chunkCompletionPolicy; |
858 | } |
859 | |
860 | /** |
861 | * Set the commit interval. Either set this or the chunkCompletionPolicy but |
862 | * not both. |
863 | * |
864 | * @param commitInterval 1 by default |
865 | */ |
866 | public void setCommitInterval(int commitInterval) { |
867 | this.commitInterval = commitInterval; |
868 | } |
869 | |
870 | /** |
871 | * Flag to signal that the reader is transactional (usually a JMS consumer) |
872 | * so that items are re-presented after a rollback. The default is false and |
873 | * readers are assumed to be forward-only. |
874 | * |
875 | * @param isReaderTransactionalQueue the value of the flag |
876 | */ |
877 | public void setIsReaderTransactionalQueue(boolean isReaderTransactionalQueue) { |
878 | this.readerTransactionalQueue = isReaderTransactionalQueue; |
879 | } |
880 | |
881 | /** |
882 | * Flag to signal that the processor is transactional, in which case it |
883 | * should be called for every item in every transaction. If false then we |
884 | * can cache the processor results between transactions in the case of a |
885 | * rollback. |
886 | * |
887 | * @param processorTransactional the value to set |
888 | */ |
889 | public void setProcessorTransactional(Boolean processorTransactional) { |
890 | this.processorTransactional = processorTransactional; |
891 | } |
892 | |
893 | /** |
894 | * Public setter for the retry limit. Each item can be retried up to this |
895 | * limit. Note this limit includes the initial attempt to process the item, |
896 | * therefore <code>retryLimit == 1</code> by default. |
897 | * |
898 | * @param retryLimit the retry limit to set, must be greater or equal to 1. |
899 | */ |
900 | public void setRetryLimit(int retryLimit) { |
901 | this.retryLimit = retryLimit; |
902 | } |
903 | |
904 | /** |
905 | * Public setter for a limit that determines skip policy. If this value is |
906 | * positive then an exception in chunk processing will cause the item to be |
907 | * skipped and no exception propagated until the limit is reached. If it is |
908 | * zero then all exceptions will be propagated from the chunk and cause the |
909 | * step to abort. |
910 | * |
911 | * @param skipLimit the value to set. Default is 0 (never skip). |
912 | */ |
913 | public void setSkipLimit(int skipLimit) { |
914 | this.skipLimit = skipLimit; |
915 | } |
916 | |
917 | /** |
918 | * Public setter for a skip policy. If this value is set then the skip limit |
919 | * and skippable exceptions are ignored. |
920 | * |
921 | * @param skipPolicy the {@link SkipPolicy} to set |
922 | */ |
923 | public void setSkipPolicy(SkipPolicy skipPolicy) { |
924 | this.skipPolicy = skipPolicy; |
925 | } |
926 | |
927 | /** |
928 | * Public setter for the {@link TaskExecutor}. If this is set, then it will |
929 | * be used to execute the chunk processing inside the {@link Step}. |
930 | * |
931 | * @param taskExecutor the taskExecutor to set |
932 | */ |
933 | public void setTaskExecutor(TaskExecutor taskExecutor) { |
934 | this.taskExecutor = taskExecutor; |
935 | } |
936 | |
937 | /** |
938 | * Public setter for the throttle limit. This limits the number of tasks |
939 | * queued for concurrent processing to prevent thread pools from being |
940 | * overwhelmed. Defaults to |
941 | * {@link TaskExecutorRepeatTemplate#DEFAULT_THROTTLE_LIMIT}. |
942 | * |
943 | * @param throttleLimit the throttle limit to set. |
944 | */ |
945 | public void setThrottleLimit(Integer throttleLimit) { |
946 | this.throttleLimit = throttleLimit; |
947 | } |
948 | |
949 | /** |
950 | * @param itemReader the {@link ItemReader} to set |
951 | */ |
952 | public void setItemReader(ItemReader<? extends I> itemReader) { |
953 | this.itemReader = itemReader; |
954 | } |
955 | |
956 | /** |
957 | * @param itemProcessor the {@link ItemProcessor} to set |
958 | */ |
959 | public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) { |
960 | this.itemProcessor = itemProcessor; |
961 | } |
962 | |
963 | /** |
964 | * @param itemWriter the {@link ItemWriter} to set |
965 | */ |
966 | public void setItemWriter(ItemWriter<? super O> itemWriter) { |
967 | this.itemWriter = itemWriter; |
968 | } |
969 | |
970 | // ========================================================= |
971 | // Chunk Elements |
972 | // ========================================================= |
973 | |
974 | /** |
975 | * Public setter for the {@link RetryListener}s. |
976 | * |
977 | * @param retryListeners the {@link RetryListener}s to set |
978 | */ |
979 | public void setRetryListeners(RetryListener... retryListeners) { |
980 | this.retryListeners = retryListeners; |
981 | } |
982 | |
983 | /** |
984 | * Public setter for exception classes that when raised won't crash the job |
985 | * but will result in transaction rollback and the item which handling |
986 | * caused the exception will be skipped. |
987 | * |
988 | * @param exceptionClasses |
989 | */ |
990 | public void setSkippableExceptionClasses(Map<Class<? extends Throwable>, Boolean> exceptionClasses) { |
991 | this.skippableExceptionClasses = exceptionClasses; |
992 | } |
993 | |
994 | /** |
995 | * Public setter for exception classes that will retry the item when raised. |
996 | * |
997 | * @param retryableExceptionClasses the retryableExceptionClasses to set |
998 | */ |
999 | public void setRetryableExceptionClasses(Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses) { |
1000 | this.retryableExceptionClasses = retryableExceptionClasses; |
1001 | } |
1002 | |
1003 | /** |
1004 | * The streams to inject into the {@link Step}. Any instance of |
1005 | * {@link ItemStream} can be used, and will then receive callbacks at the |
1006 | * appropriate stage in the step. |
1007 | * |
1008 | * @param streams an array of listeners |
1009 | */ |
1010 | public void setStreams(ItemStream[] streams) { |
1011 | this.streams = streams; |
1012 | } |
1013 | |
1014 | // ========================================================= |
1015 | // Additional |
1016 | // ========================================================= |
1017 | |
1018 | /** |
1019 | * @param hasChunkElement |
1020 | */ |
1021 | public void setHasChunkElement(boolean hasChunkElement) { |
1022 | this.hasChunkElement = hasChunkElement; |
1023 | } |
1024 | |
1025 | } |