View Javadoc

1   /*
2    * Copyright 2006-2013 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.springframework.batch.core.configuration.xml;
18  
19  import java.util.Collection;
20  import java.util.HashMap;
21  import java.util.HashSet;
22  import java.util.LinkedHashSet;
23  import java.util.Map;
24  import java.util.Set;
25  
26  import org.springframework.batch.core.ChunkListener;
27  import org.springframework.batch.core.ItemProcessListener;
28  import org.springframework.batch.core.ItemReadListener;
29  import org.springframework.batch.core.ItemWriteListener;
30  import org.springframework.batch.core.Job;
31  import org.springframework.batch.core.SkipListener;
32  import org.springframework.batch.core.Step;
33  import org.springframework.batch.core.StepExecutionListener;
34  import org.springframework.batch.core.StepListener;
35  import org.springframework.batch.core.job.flow.Flow;
36  import org.springframework.batch.core.launch.JobLauncher;
37  import org.springframework.batch.core.partition.PartitionHandler;
38  import org.springframework.batch.core.partition.support.Partitioner;
39  import org.springframework.batch.core.partition.support.StepExecutionAggregator;
40  import org.springframework.batch.core.repository.JobRepository;
41  import org.springframework.batch.core.step.builder.AbstractTaskletStepBuilder;
42  import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
43  import org.springframework.batch.core.step.builder.FlowStepBuilder;
44  import org.springframework.batch.core.step.builder.JobStepBuilder;
45  import org.springframework.batch.core.step.builder.PartitionStepBuilder;
46  import org.springframework.batch.core.step.builder.SimpleStepBuilder;
47  import org.springframework.batch.core.step.builder.StepBuilder;
48  import org.springframework.batch.core.step.builder.StepBuilderHelper;
49  import org.springframework.batch.core.step.builder.TaskletStepBuilder;
50  import org.springframework.batch.core.step.factory.FaultTolerantStepFactoryBean;
51  import org.springframework.batch.core.step.factory.SimpleStepFactoryBean;
52  import org.springframework.batch.core.step.item.KeyGenerator;
53  import org.springframework.batch.core.step.job.JobParametersExtractor;
54  import org.springframework.batch.core.step.skip.SkipPolicy;
55  import org.springframework.batch.core.step.tasklet.Tasklet;
56  import org.springframework.batch.core.step.tasklet.TaskletStep;
57  import org.springframework.batch.item.ItemProcessor;
58  import org.springframework.batch.item.ItemReader;
59  import org.springframework.batch.item.ItemStream;
60  import org.springframework.batch.item.ItemWriter;
61  import org.springframework.batch.repeat.CompletionPolicy;
62  import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
63  import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
64  import org.springframework.beans.factory.BeanNameAware;
65  import org.springframework.beans.factory.FactoryBean;
66  import org.springframework.classify.BinaryExceptionClassifier;
67  import org.springframework.core.task.TaskExecutor;
68  import org.springframework.retry.RetryListener;
69  import org.springframework.retry.RetryPolicy;
70  import org.springframework.retry.backoff.BackOffPolicy;
71  import org.springframework.retry.policy.MapRetryContextCache;
72  import org.springframework.retry.policy.RetryContextCache;
73  import org.springframework.transaction.PlatformTransactionManager;
74  import org.springframework.transaction.annotation.Isolation;
75  import org.springframework.transaction.annotation.Propagation;
76  import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
77  import org.springframework.util.Assert;
78  
79  /**
80   * This {@link FactoryBean} is used by the batch namespace parser to create {@link Step} objects. Stores all of the
81   * properties that are configurable on the <step/> (and its inner <tasklet/>). Based on which properties are
82   * configured, the {@link #getObject()} method will delegate to the appropriate class for generating the {@link Step}.
83   *
84   * @author Dan Garrette
85   * @author Josh Long
86   * @see SimpleStepFactoryBean
87   * @see FaultTolerantStepFactoryBean
88   * @see TaskletStep
89   * @since 2.0
90   */
91  @SuppressWarnings("rawtypes")
92  class StepParserStepFactoryBean<I, O> implements FactoryBean, BeanNameAware {
93  
94  	//
95  	// Step Attributes
96  	//
97  	private String name;
98  
99  	//
100 	// Tasklet Attributes
101 	//
102 	private Boolean allowStartIfComplete;
103 
104 	private JobRepository jobRepository;
105 
106 	private Integer startLimit;
107 
108 	private Tasklet tasklet;
109 
110 	private PlatformTransactionManager transactionManager;
111 
112 	private Set<StepExecutionListener> stepExecutionListeners = new LinkedHashSet<StepExecutionListener>();
113 
114 	//
115 	// Flow Elements
116 	//
117 	private Flow flow;
118 
119 	//
120 	// Job Elements
121 	//
122 	private Job job;
123 
124 	private JobLauncher jobLauncher;
125 
126 	private JobParametersExtractor jobParametersExtractor;
127 
128 	//
129 	// Partition Elements
130 	//
131 	private Partitioner partitioner;
132 
133 	private static final int DEFAULT_GRID_SIZE = 6;
134 
135 	private Step step;
136 
137 	private PartitionHandler partitionHandler;
138 
139 	private int gridSize = DEFAULT_GRID_SIZE;
140 
141 	//
142 	// Tasklet Elements
143 	//
144 	private Collection<Class<? extends Throwable>> noRollbackExceptionClasses;
145 
146 	private Integer transactionTimeout;
147 
148 	private Propagation propagation;
149 
150 	private Isolation isolation;
151 
152 	private Set<ChunkListener> chunkListeners = new LinkedHashSet<ChunkListener>();
153 
154 	//
155 	// Chunk Attributes
156 	//
157 	private int cacheCapacity = 0;
158 
159 	private CompletionPolicy chunkCompletionPolicy;
160 
161 	private Integer commitInterval;
162 
163 	private Boolean readerTransactionalQueue;
164 
165 	private Boolean processorTransactional;
166 
167 	private int retryLimit = 0;
168 
169 	private BackOffPolicy backOffPolicy;
170 
171 	private RetryPolicy retryPolicy;
172 
173 	private RetryContextCache retryContextCache;
174 
175 	private KeyGenerator keyGenerator;
176 
177 	private Integer skipLimit;
178 
179 	private SkipPolicy skipPolicy;
180 
181 	private TaskExecutor taskExecutor;
182 
183 	private Integer throttleLimit;
184 
185 	private ItemReader<? extends I> itemReader;
186 
187 	private ItemProcessor<? super I, ? extends O> itemProcessor;
188 
189 	private ItemWriter<? super O> itemWriter;
190 
191 	//
192 	// Chunk Elements
193 	//
194 	private RetryListener[] retryListeners;
195 
196 	private Map<Class<? extends Throwable>, Boolean> skippableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
197 
198 	private Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses = new HashMap<Class<? extends Throwable>, Boolean>();
199 
200 	private ItemStream[] streams;
201 
202 	private Set<ItemReadListener<I>> readListeners = new LinkedHashSet<ItemReadListener<I>>();
203 
204 	private Set<ItemWriteListener<O>> writeListeners = new LinkedHashSet<ItemWriteListener<O>>();
205 
206 	private Set<ItemProcessListener<I, O>> processListeners = new LinkedHashSet<ItemProcessListener<I, O>>();
207 
208 	private Set<SkipListener<I, O>> skipListeners = new LinkedHashSet<SkipListener<I, O>>();
209 
210 	//
211 	// Additional
212 	//
213 	private boolean hasChunkElement = false;
214 
215 	private StepExecutionAggregator stepExecutionAggregator;
216 
217 	private StepListener[] listeners;
218 
219 	/**
220 	 * Create a {@link Step} from the configuration provided.
221 	 *
222 	 * @see FactoryBean#getObject()
223 	 */
224 	@Override
225 	public final Object getObject() throws Exception {
226 		if (hasChunkElement) {
227 			Assert.isNull(tasklet, "Step [" + name
228 					+ "] has both a <chunk/> element and a 'ref' attribute  referencing a Tasklet.");
229 
230 			validateFaultTolerantSettings();
231 			if (isFaultTolerant()) {
232 				return createFaultTolerantStep();
233 			}
234 			else {
235 				return createSimpleStep();
236 			}
237 		}
238 		else if (tasklet != null) {
239 			return createTaskletStep();
240 		}
241 		else if (flow != null) {
242 			return createFlowStep();
243 		}
244 		else if (job != null) {
245 			return createJobStep();
246 		}
247 		else {
248 			return createPartitionStep();
249 		}
250 	}
251 
252 	public boolean requiresTransactionManager() {
253 		// Currently all step implementations other than TaskletStep are
254 		// AbstractStep and do not require a transaction manager
255 		return hasChunkElement || tasklet != null;
256 	}
257 
258 	private void enhanceCommonStep(StepBuilderHelper<?> builder) {
259 		if (allowStartIfComplete != null) {
260 			builder.allowStartIfComplete(allowStartIfComplete);
261 		}
262 		if (startLimit != null) {
263 			builder.startLimit(startLimit);
264 		}
265 		builder.repository(jobRepository);
266 		builder.transactionManager(transactionManager);
267 		for (StepExecutionListener listener : stepExecutionListeners) {
268 			builder.listener(listener);
269 		}
270 	}
271 
272 	private Step createPartitionStep() {
273 
274 		PartitionStepBuilder builder;
275 		if (partitioner != null) {
276 			builder = new StepBuilder(name).partitioner(step != null ? step.getName() : name, partitioner).step(step);
277 		}
278 		else {
279 			builder = new StepBuilder(name).partitioner(step);
280 		}
281 		enhanceCommonStep(builder);
282 
283 		if (partitionHandler != null) {
284 			builder.partitionHandler(partitionHandler);
285 		}
286 		else {
287 			builder.gridSize(gridSize);
288 			builder.taskExecutor(taskExecutor);
289 		}
290 
291 		builder.aggregator(stepExecutionAggregator);
292 
293 		return builder.build();
294 
295 	}
296 
297 	private Step createFaultTolerantStep() {
298 
299 		FaultTolerantStepBuilder<I, O> builder = new FaultTolerantStepBuilder<I, O>(new StepBuilder(name));
300 
301 		if (commitInterval != null) {
302 			builder.chunk(commitInterval);
303 		}
304 		enhanceTaskletStepBuilder(builder);
305 
306 		builder.reader(itemReader);
307 		builder.writer(itemWriter);
308 		builder.processor(itemProcessor);
309 
310 		if (processorTransactional != null && !processorTransactional) {
311 			builder.processorNonTransactional();
312 		}
313 
314 		for (SkipListener<I, O> listener : skipListeners) {
315 			builder.listener(listener);
316 		}
317 
318 		registerItemListeners(builder);
319 
320 		if (skipPolicy != null) {
321 			builder.skipPolicy(skipPolicy);
322 		}
323 		else if (skipLimit!=null) {
324 			builder.skipLimit(skipLimit);
325 			for (Class<? extends Throwable> type : skippableExceptionClasses.keySet()) {
326 				if (skippableExceptionClasses.get(type)) {
327 					builder.skip(type);
328 				}
329 				else {
330 					builder.noSkip(type);
331 				}
332 			}
333 		}
334 
335 		if (retryListeners != null) {
336 			for (RetryListener listener : retryListeners) {
337 				builder.listener(listener);
338 			}
339 		}
340 
341 		if (retryContextCache == null && cacheCapacity > 0) {
342 			retryContextCache = new MapRetryContextCache(cacheCapacity);
343 		}
344 		builder.retryContextCache(retryContextCache);
345 		builder.keyGenerator(keyGenerator);
346 		if (retryPolicy != null) {
347 			builder.retryPolicy(retryPolicy);
348 		}
349 		else {
350 			builder.retryLimit(retryLimit);
351 			builder.backOffPolicy(backOffPolicy);
352 			for (Class<? extends Throwable> type : retryableExceptionClasses.keySet()) {
353 				if (retryableExceptionClasses.get(type)) {
354 					builder.retry(type);
355 				}
356 				else {
357 					builder.noRetry(type);
358 				}
359 			}
360 		}
361 
362 		if (noRollbackExceptionClasses != null) {
363 			for (Class<? extends Throwable> type : noRollbackExceptionClasses) {
364 				builder.noRollback(type);
365 			}
366 		}
367 
368 		return builder.build();
369 
370 	}
371 
372 	private void registerItemListeners(SimpleStepBuilder<I, O> builder) {
373 		for (ItemReadListener<I> listener : readListeners) {
374 			builder.listener(listener);
375 		}
376 		for (ItemWriteListener<O> listener : writeListeners) {
377 			builder.listener(listener);
378 		}
379 		for (ItemProcessListener<I, O> listener : processListeners) {
380 			builder.listener(listener);
381 		}
382 	}
383 
384 	@SuppressWarnings("unchecked")
385 	private Step createSimpleStep() {
386 		SimpleStepBuilder builder = new SimpleStepBuilder(new StepBuilder(name));
387 		if (commitInterval != null) {
388 			builder.chunk(commitInterval);
389 		}
390 		enhanceTaskletStepBuilder(builder);
391 		registerItemListeners(builder);
392 		builder.reader(itemReader);
393 		builder.writer(itemWriter);
394 		builder.processor(itemProcessor);
395 		builder.chunk(chunkCompletionPolicy);
396 		return builder.build();
397 	}
398 
399 	private TaskletStep createTaskletStep() {
400 		TaskletStepBuilder builder = new StepBuilder(name).tasklet(tasklet);
401 		enhanceTaskletStepBuilder(builder);
402 		return builder.build();
403 	}
404 
405 	@SuppressWarnings("serial")
406 	private void enhanceTaskletStepBuilder(AbstractTaskletStepBuilder<?> builder) {
407 
408 		enhanceCommonStep(builder);
409 		for (ChunkListener listener : chunkListeners) {
410 			builder.listener(listener);
411 
412 		}
413 		builder.taskExecutor(taskExecutor);
414 		if (throttleLimit != null) {
415 			builder.throttleLimit(throttleLimit);
416 		}
417 		builder.transactionManager(transactionManager);
418 		if (transactionTimeout != null || propagation != null || isolation != null
419 				|| noRollbackExceptionClasses != null) {
420 			DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
421 			if (propagation != null) {
422 				attribute.setPropagationBehavior(propagation.value());
423 			}
424 			if (isolation != null) {
425 				attribute.setIsolationLevel(isolation.value());
426 			}
427 			if (transactionTimeout != null) {
428 				attribute.setTimeout(transactionTimeout);
429 			}
430 			Collection<Class<? extends Throwable>> exceptions = noRollbackExceptionClasses == null ? new HashSet<Class<? extends Throwable>>()
431 					: noRollbackExceptionClasses;
432 			final BinaryExceptionClassifier classifier = new BinaryExceptionClassifier(exceptions, false);
433 			builder.transactionAttribute(new DefaultTransactionAttribute(attribute) {
434 				@Override
435 				public boolean rollbackOn(Throwable ex) {
436 					return classifier.classify(ex);
437 				}
438 			});
439 		}
440 		if (streams != null) {
441 			for (ItemStream stream : streams) {
442 				builder.stream(stream);
443 			}
444 		}
445 
446 	}
447 
448 	private Step createFlowStep() {
449 		FlowStepBuilder builder = new StepBuilder(name).flow(flow);
450 		enhanceCommonStep(builder);
451 		return builder.build();
452 	}
453 
454 	private Step createJobStep() throws Exception {
455 
456 		JobStepBuilder builder = new StepBuilder(name).job(job);
457 		enhanceCommonStep(builder);
458 		builder.parametersExtractor(jobParametersExtractor);
459 		builder.launcher(jobLauncher);
460 		return builder.build();
461 
462 	}
463 
464 	private void validateFaultTolerantSettings() {
465 		validateDependency("skippable-exception-classes", skippableExceptionClasses, "skip-limit", skipLimit, true);
466 		validateDependency("retryable-exception-classes", retryableExceptionClasses, "retry-limit", retryLimit, true);
467 		validateDependency("retry-listeners", retryListeners, "retry-limit", retryLimit, false);
468 		if (isPresent(processorTransactional) && !processorTransactional && isPresent(readerTransactionalQueue)
469 				&& readerTransactionalQueue) {
470 			throw new IllegalArgumentException(
471 					"The field 'processor-transactional' cannot be false if 'reader-transactional-queue' is true");
472 		}
473 	}
474 
475 	/**
476 	 * Check if a field is present then a second is also. If the twoWayDependency flag is set then the opposite must
477 	 * also be true: if the second value is present, the first must also be.
478 	 *
479 	 * @param dependentName the name of the first field
480 	 * @param dependentValue the value of the first field
481 	 * @param name the name of the other field (which should be absent if the first is present)
482 	 * @param value the value of the other field
483 	 * @param twoWayDependency true if both depend on each other
484 	 * @throws IllegalArgumentException if either condition is violated
485 	 */
486 	private void validateDependency(String dependentName, Object dependentValue, String name, Object value,
487 			boolean twoWayDependency) {
488 		if (isPresent(dependentValue) && !isPresent(value)) {
489 			throw new IllegalArgumentException("The field '" + dependentName + "' is not permitted on the step ["
490 					+ this.name + "] because there is no '" + name + "'.");
491 		}
492 		if (twoWayDependency && isPresent(value) && !isPresent(dependentValue)) {
493 			throw new IllegalArgumentException("The field '" + name + "' is not permitted on the step [" + this.name
494 					+ "] because there is no '" + dependentName + "'.");
495 		}
496 	}
497 
498 	/**
499 	 * Is the object non-null (or if an Integer, non-zero)?
500 	 *
501 	 * @param o an object
502 	 * @return true if the object has a value
503 	 */
504 	private boolean isPresent(Object o) {
505 		if (o instanceof Integer) {
506 			return isPositive((Integer) o);
507 		}
508 		if (o instanceof Collection) {
509 			return !((Collection<?>) o).isEmpty();
510 		}
511 		if (o instanceof Map) {
512 			return !((Map<?, ?>) o).isEmpty();
513 		}
514 		return o != null;
515 	}
516 
517 	private boolean isFaultTolerant() {
518 		return backOffPolicy != null || skipPolicy != null || retryPolicy != null || isPositive(skipLimit)
519 				|| isPositive(retryLimit) || isPositive(cacheCapacity) || isTrue(readerTransactionalQueue);
520 	}
521 
522 	private boolean isTrue(Boolean b) {
523 		return b != null && b.booleanValue();
524 	}
525 
526 	private boolean isPositive(Integer n) {
527 		return n != null && n > 0;
528 	}
529 
530 	@Override
531 	public Class<TaskletStep> getObjectType() {
532 		return TaskletStep.class;
533 	}
534 
535 	@Override
536 	public boolean isSingleton() {
537 		return true;
538 	}
539 
540 	// =========================================================
541 	// Step Attributes
542 	// =========================================================
543 
544 	/**
545 	 * Set the bean name property, which will become the name of the {@link Step} when it is created.
546 	 *
547 	 * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
548 	 */
549 	@Override
550 	public void setBeanName(String name) {
551 		if (this.name == null) {
552 			this.name = name;
553 		}
554 	}
555 
556 	/**
557 	 * @param name the name to set
558 	 */
559 	public void setName(String name) {
560 		this.name = name;
561 	}
562 
563 	// =========================================================
564 	// Flow Attributes
565 	// =========================================================
566 
567 	/**
568 	 * @param flow the flow to set
569 	 */
570 	public void setFlow(Flow flow) {
571 		this.flow = flow;
572 	}
573 
574 	// =========================================================
575 	// Job Attributes
576 	// =========================================================
577 
578 	public void setJob(Job job) {
579 		this.job = job;
580 	}
581 
582 	public void setJobParametersExtractor(JobParametersExtractor jobParametersExtractor) {
583 		this.jobParametersExtractor = jobParametersExtractor;
584 	}
585 
586 	public void setJobLauncher(JobLauncher jobLauncher) {
587 		this.jobLauncher = jobLauncher;
588 	}
589 
590 	// =========================================================
591 	// Partition Attributes
592 	// =========================================================
593 
594 	/**
595 	 * @param partitioner the partitioner to set
596 	 */
597 	public void setPartitioner(Partitioner partitioner) {
598 		this.partitioner = partitioner;
599 	}
600 
601 	/**
602 	 * @param stepExecutionAggregator the stepExecutionAggregator to set
603 	 */
604 	public void setStepExecutionAggregator(StepExecutionAggregator stepExecutionAggregator) {
605 		this.stepExecutionAggregator = stepExecutionAggregator;
606 	}
607 
608 	/**
609 	 * @param partitionHandler the partitionHandler to set
610 	 */
611 	public void setPartitionHandler(PartitionHandler partitionHandler) {
612 		this.partitionHandler = partitionHandler;
613 	}
614 
615 	/**
616 	 * @param gridSize the gridSize to set
617 	 */
618 	public void setGridSize(int gridSize) {
619 		this.gridSize = gridSize;
620 	}
621 
622 	/**
623 	 * @param step the step to set
624 	 */
625 	public void setStep(Step step) {
626 		this.step = step;
627 	}
628 
629 	// =========================================================
630 	// Tasklet Attributes
631 	// =========================================================
632 
633 	/**
634 	 * Public setter for the flag to indicate that the step should be replayed on a restart, even if successful the
635 	 * first time.
636 	 *
637 	 * @param allowStartIfComplete the shouldAllowStartIfComplete to set
638 	 */
639 	public void setAllowStartIfComplete(boolean allowStartIfComplete) {
640 		this.allowStartIfComplete = allowStartIfComplete;
641 
642 	}
643 
644 	/**
645 	 * @return jobRepository
646 	 */
647 	public JobRepository getJobRepository() {
648 		return jobRepository;
649 	}
650 
651 	/**
652 	 * Public setter for {@link JobRepository}.
653 	 *
654 	 * @param jobRepository
655 	 */
656 	public void setJobRepository(JobRepository jobRepository) {
657 		this.jobRepository = jobRepository;
658 	}
659 
660 	/**
661 	 * The number of times that the step should be allowed to start
662 	 *
663 	 * @param startLimit
664 	 */
665 	public void setStartLimit(int startLimit) {
666 		this.startLimit = startLimit;
667 	}
668 
669 	/**
670 	 * A preconfigured {@link Tasklet} to use.
671 	 *
672 	 * @param tasklet
673 	 */
674 	public void setTasklet(Tasklet tasklet) {
675 		this.tasklet = tasklet;
676 	}
677 
678 	/**
679 	 * @return transactionManager
680 	 */
681 	public PlatformTransactionManager getTransactionManager() {
682 		return transactionManager;
683 	}
684 
685 	/**
686 	 * @param transactionManager the transaction manager to set
687 	 */
688 	public void setTransactionManager(PlatformTransactionManager transactionManager) {
689 		this.transactionManager = transactionManager;
690 	}
691 
692 	// =========================================================
693 	// Tasklet Elements
694 	// =========================================================
695 
696 	/**
697 	 * The listeners to inject into the {@link Step}. Any instance of {@link StepListener} can be used, and will then
698 	 * receive callbacks at the appropriate stage in the step.
699 	 *
700 	 * @param listeners an array of listeners
701 	 */
702 	public void setListeners(StepListener[] listeners) {
703 		this.listeners = listeners; // useful for testing
704 		for (StepListener listener : listeners) {
705 			if (listener instanceof SkipListener) {
706 				@SuppressWarnings("unchecked")
707 				SkipListener<I, O> skipListener = (SkipListener<I, O>) listener;
708 				skipListeners.add(skipListener);
709 			}
710 			if (listener instanceof StepExecutionListener) {
711 				StepExecutionListener stepExecutionListener = (StepExecutionListener) listener;
712 				stepExecutionListeners.add(stepExecutionListener);
713 			}
714 			if (listener instanceof ChunkListener) {
715 				ChunkListener chunkListener = (ChunkListener) listener;
716 				chunkListeners.add(chunkListener);
717 			}
718 			if (listener instanceof ItemReadListener) {
719 				@SuppressWarnings("unchecked")
720 				ItemReadListener<I> readListener = (ItemReadListener<I>) listener;
721 				readListeners.add(readListener);
722 			}
723 			if (listener instanceof ItemWriteListener) {
724 				@SuppressWarnings("unchecked")
725 				ItemWriteListener<O> writeListener = (ItemWriteListener<O>) listener;
726 				writeListeners.add(writeListener);
727 			}
728 			if (listener instanceof ItemProcessListener) {
729 				@SuppressWarnings("unchecked")
730 				ItemProcessListener<I, O> processListener = (ItemProcessListener<I, O>) listener;
731 				processListeners.add(processListener);
732 			}
733 		}
734 	}
735 
736 	/**
737 	 * Exception classes that may not cause a rollback if encountered in the right place.
738 	 *
739 	 * @param noRollbackExceptionClasses the noRollbackExceptionClasses to set
740 	 */
741 	public void setNoRollbackExceptionClasses(Collection<Class<? extends Throwable>> noRollbackExceptionClasses) {
742 		this.noRollbackExceptionClasses = noRollbackExceptionClasses;
743 	}
744 
745 	/**
746 	 * @param transactionTimeout the transactionTimeout to set
747 	 */
748 	public void setTransactionTimeout(int transactionTimeout) {
749 		this.transactionTimeout = transactionTimeout;
750 	}
751 
752 	/**
753 	 * @param isolation the isolation to set
754 	 */
755 	public void setIsolation(Isolation isolation) {
756 		this.isolation = isolation;
757 	}
758 
759 	/**
760 	 * @param propagation the propagation to set
761 	 */
762 	public void setPropagation(Propagation propagation) {
763 		this.propagation = propagation;
764 	}
765 
766 	// =========================================================
767 	// Parent Attributes - can be provided in parent bean but not namespace
768 	// =========================================================
769 
770 	/**
771 	 * A backoff policy to be applied to retry process.
772 	 *
773 	 * @param backOffPolicy the {@link BackOffPolicy} to set
774 	 */
775 	public void setBackOffPolicy(BackOffPolicy backOffPolicy) {
776 		this.backOffPolicy = backOffPolicy;
777 	}
778 
779 	/**
780 	 * A retry policy to apply when exceptions occur. If this is specified then the retry limit and retryable exceptions
781 	 * will be ignored.
782 	 *
783 	 * @param retryPolicy the {@link RetryPolicy} to set
784 	 */
785 	public void setRetryPolicy(RetryPolicy retryPolicy) {
786 		this.retryPolicy = retryPolicy;
787 	}
788 
789 	/**
790 	 * @param retryContextCache the {@link RetryContextCache} to set
791 	 */
792 	public void setRetryContextCache(RetryContextCache retryContextCache) {
793 		this.retryContextCache = retryContextCache;
794 	}
795 
796 	/**
797 	 * A key generator that can be used to compare items with previously recorded items in a retry. Only used if the
798 	 * reader is a transactional queue.
799 	 *
800 	 * @param keyGenerator the {@link KeyGenerator} to set
801 	 */
802 	public void setKeyGenerator(KeyGenerator keyGenerator) {
803 		this.keyGenerator = keyGenerator;
804 	}
805 
806 	// =========================================================
807 	// Chunk Attributes
808 	// =========================================================
809 
810 	/**
811 	 * Public setter for the capacity of the cache in the retry policy. If more items than this fail without being
812 	 * skipped or recovered an exception will be thrown. This is to guard against inadvertent infinite loops generated
813 	 * by item identity problems.<br/>
814 	 * <p/>
815 	 * The default value should be high enough and more for most purposes. To breach the limit in a single-threaded step
816 	 * typically you have to have this many failures in a single transaction. Defaults to the value in the
817 	 * {@link MapRetryContextCache}.<br/>
818 	 *
819 	 * @param cacheCapacity the cache capacity to set (greater than 0 else ignored)
820 	 */
821 	public void setCacheCapacity(int cacheCapacity) {
822 		this.cacheCapacity = cacheCapacity;
823 	}
824 
825 	/**
826 	 * Public setter for the {@link CompletionPolicy} applying to the chunk level. A transaction will be committed when
827 	 * this policy decides to complete. Defaults to a {@link SimpleCompletionPolicy} with chunk size equal to the
828 	 * commitInterval property.
829 	 *
830 	 * @param chunkCompletionPolicy the chunkCompletionPolicy to set
831 	 */
832 	public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) {
833 		this.chunkCompletionPolicy = chunkCompletionPolicy;
834 	}
835 
836 	/**
837 	 * Set the commit interval. Either set this or the chunkCompletionPolicy but not both.
838 	 *
839 	 * @param commitInterval 1 by default
840 	 */
841 	public void setCommitInterval(int commitInterval) {
842 		this.commitInterval = commitInterval;
843 	}
844 
845 	/**
846 	 * Flag to signal that the reader is transactional (usually a JMS consumer) so that items are re-presented after a
847 	 * rollback. The default is false and readers are assumed to be forward-only.
848 	 *
849 	 * @param isReaderTransactionalQueue the value of the flag
850 	 */
851 	public void setIsReaderTransactionalQueue(boolean isReaderTransactionalQueue) {
852 		this.readerTransactionalQueue = isReaderTransactionalQueue;
853 	}
854 
855 	/**
856 	 * Flag to signal that the processor is transactional, in which case it should be called for every item in every
857 	 * transaction. If false then we can cache the processor results between transactions in the case of a rollback.
858 	 *
859 	 * @param processorTransactional the value to set
860 	 */
861 	public void setProcessorTransactional(Boolean processorTransactional) {
862 		this.processorTransactional = processorTransactional;
863 	}
864 
865 	/**
866 	 * Public setter for the retry limit. Each item can be retried up to this limit. Note this limit includes the
867 	 * initial attempt to process the item, therefore <code>retryLimit == 1</code> by default.
868 	 *
869 	 * @param retryLimit the retry limit to set, must be greater or equal to 1.
870 	 */
871 	public void setRetryLimit(int retryLimit) {
872 		this.retryLimit = retryLimit;
873 	}
874 
875 	/**
876 	 * Public setter for a limit that determines skip policy. If this value is positive then an exception in chunk
877 	 * processing will cause the item to be skipped and no exception propagated until the limit is reached. If it is
878 	 * zero then all exceptions will be propagated from the chunk and cause the step to abort.
879 	 *
880 	 * @param skipLimit the value to set. Default is 0 (never skip).
881 	 */
882 	public void setSkipLimit(int skipLimit) {
883 		this.skipLimit = skipLimit;
884 	}
885 
886 	/**
887 	 * Public setter for a skip policy. If this value is set then the skip limit and skippable exceptions are ignored.
888 	 *
889 	 * @param skipPolicy the {@link SkipPolicy} to set
890 	 */
891 	public void setSkipPolicy(SkipPolicy skipPolicy) {
892 		this.skipPolicy = skipPolicy;
893 	}
894 
895 	/**
896 	 * Public setter for the {@link TaskExecutor}. If this is set, then it will be used to execute the chunk processing
897 	 * inside the {@link Step}.
898 	 *
899 	 * @param taskExecutor the taskExecutor to set
900 	 */
901 	public void setTaskExecutor(TaskExecutor taskExecutor) {
902 		this.taskExecutor = taskExecutor;
903 	}
904 
905 	/**
906 	 * Public setter for the throttle limit. This limits the number of tasks queued for concurrent processing to prevent
907 	 * thread pools from being overwhelmed. Defaults to {@link TaskExecutorRepeatTemplate#DEFAULT_THROTTLE_LIMIT}.
908 	 *
909 	 * @param throttleLimit the throttle limit to set.
910 	 */
911 	public void setThrottleLimit(Integer throttleLimit) {
912 		this.throttleLimit = throttleLimit;
913 	}
914 
915 	/**
916 	 * @param itemReader the {@link ItemReader} to set
917 	 */
918 	public void setItemReader(ItemReader<? extends I> itemReader) {
919 		this.itemReader = itemReader;
920 	}
921 
922 	/**
923 	 * @param itemProcessor the {@link ItemProcessor} to set
924 	 */
925 	public void setItemProcessor(ItemProcessor<? super I, ? extends O> itemProcessor) {
926 		this.itemProcessor = itemProcessor;
927 	}
928 
929 	/**
930 	 * @param itemWriter the {@link ItemWriter} to set
931 	 */
932 	public void setItemWriter(ItemWriter<? super O> itemWriter) {
933 		this.itemWriter = itemWriter;
934 	}
935 
936 	// =========================================================
937 	// Chunk Elements
938 	// =========================================================
939 
940 	/**
941 	 * Public setter for the {@link RetryListener}s.
942 	 *
943 	 * @param retryListeners the {@link RetryListener}s to set
944 	 */
945 	public void setRetryListeners(RetryListener... retryListeners) {
946 		this.retryListeners = retryListeners;
947 	}
948 
949 	/**
950 	 * Public setter for exception classes that when raised won't crash the job but will result in transaction rollback
951 	 * and the item which handling caused the exception will be skipped.
952 	 *
953 	 * @param exceptionClasses
954 	 */
955 	public void setSkippableExceptionClasses(Map<Class<? extends Throwable>, Boolean> exceptionClasses) {
956 		this.skippableExceptionClasses = exceptionClasses;
957 	}
958 
959 	/**
960 	 * Public setter for exception classes that will retry the item when raised.
961 	 *
962 	 * @param retryableExceptionClasses the retryableExceptionClasses to set
963 	 */
964 	public void setRetryableExceptionClasses(Map<Class<? extends Throwable>, Boolean> retryableExceptionClasses) {
965 		this.retryableExceptionClasses = retryableExceptionClasses;
966 	}
967 
968 	/**
969 	 * The streams to inject into the {@link Step}. Any instance of {@link ItemStream} can be used, and will then
970 	 * receive callbacks at the appropriate stage in the step.
971 	 *
972 	 * @param streams an array of listeners
973 	 */
974 	public void setStreams(ItemStream[] streams) {
975 		this.streams = streams;
976 	}
977 
978 	// =========================================================
979 	// Additional
980 	// =========================================================
981 
982 	/**
983 	 * @param hasChunkElement
984 	 */
985 	public void setHasChunkElement(boolean hasChunkElement) {
986 		this.hasChunkElement = hasChunkElement;
987 	}
988 
989 }