EMMA Coverage Report (generated Fri Aug 21 15:59:46 BST 2009)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [SimpleStepFactoryBean.java]

nameclass, %method, %block, %line, %
SimpleStepFactoryBean.java100% (3/3)96%  (47/49)98%  (613/628)97%  (159.5/165)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class SimpleStepFactoryBean100% (1/1)96%  (43/45)98%  (596/611)97%  (156.5/162)
setChunkOperations (RepeatOperations): void 0%   (0/1)0%   (0/4)0%   (0/2)
setSingleton (boolean): void 0%   (0/1)0%   (0/4)0%   (0/2)
registerStepListeners (TaskletStep, RepeatOperations): void 100% (1/1)95%  (69/73)92%  (11/12)
registerItemListeners (SimpleChunkProvider, SimpleChunkProcessor): void 100% (1/1)97%  (91/94)98%  (21.5/22)
SimpleStepFactoryBean (): void 100% (1/1)100% (51/51)100% (15/15)
applyConfiguration (TaskletStep): void 100% (1/1)100% (129/129)100% (31/31)
configureChunkProcessor (): SimpleChunkProcessor 100% (1/1)100% (8/8)100% (1/1)
configureChunkProvider (): SimpleChunkProvider 100% (1/1)100% (8/8)100% (1/1)
getChunkCompletionPolicy (): CompletionPolicy 100% (1/1)100% (41/41)100% (9/9)
getChunkOperations (): RepeatOperations 100% (1/1)100% (3/3)100% (1/1)
getExceptionHandler (): ExceptionHandler 100% (1/1)100% (3/3)100% (1/1)
getItemProcessor (): ItemProcessor 100% (1/1)100% (3/3)100% (1/1)
getItemReader (): ItemReader 100% (1/1)100% (3/3)100% (1/1)
getItemWriter (): ItemWriter 100% (1/1)100% (3/3)100% (1/1)
getListeners (): StepListener [] 100% (1/1)100% (3/3)100% (1/1)
getName (): String 100% (1/1)100% (3/3)100% (1/1)
getObject (): Object 100% (1/1)100% (13/13)100% (4/4)
getObjectType (): Class 100% (1/1)100% (2/2)100% (1/1)
getStepOperations (): RepeatOperations 100% (1/1)100% (3/3)100% (1/1)
getTaskExecutor (): TaskExecutor 100% (1/1)100% (3/3)100% (1/1)
getTransactionAttribute (): TransactionAttribute 100% (1/1)100% (24/24)100% (5/5)
isReaderTransactionalQueue (): boolean 100% (1/1)100% (3/3)100% (1/1)
isSingleton (): boolean 100% (1/1)100% (3/3)100% (1/1)
registerStreams (TaskletStep, ItemReader, ItemProcessor, ItemWriter): void 100% (1/1)100% (43/43)100% (4/4)
registerStreams (TaskletStep, ItemStream []): void 100% (1/1)100% (4/4)100% (2/2)
setAllowStartIfComplete (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setBeanName (String): void 100% (1/1)100% (4/4)100% (2/2)
setChunkCompletionPolicy (CompletionPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setCommitInterval (int): void 100% (1/1)100% (4/4)100% (2/2)
setExceptionHandler (ExceptionHandler): void 100% (1/1)100% (4/4)100% (2/2)
setIsReaderTransactionalQueue (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setIsolation (Isolation): void 100% (1/1)100% (4/4)100% (2/2)
setItemProcessor (ItemProcessor): void 100% (1/1)100% (4/4)100% (2/2)
setItemReader (ItemReader): void 100% (1/1)100% (4/4)100% (2/2)
setItemWriter (ItemWriter): void 100% (1/1)100% (4/4)100% (2/2)
setJobRepository (JobRepository): void 100% (1/1)100% (4/4)100% (2/2)
setListeners (StepListener []): void 100% (1/1)100% (4/4)100% (2/2)
setPropagation (Propagation): void 100% (1/1)100% (4/4)100% (2/2)
setStartLimit (int): void 100% (1/1)100% (4/4)100% (2/2)
setStepOperations (RepeatOperations): void 100% (1/1)100% (4/4)100% (2/2)
setStreams (ItemStream []): void 100% (1/1)100% (4/4)100% (2/2)
setTaskExecutor (TaskExecutor): void 100% (1/1)100% (4/4)100% (2/2)
setThrottleLimit (int): void 100% (1/1)100% (4/4)100% (2/2)
setTransactionManager (PlatformTransactionManager): void 100% (1/1)100% (4/4)100% (2/2)
setTransactionTimeout (int): void 100% (1/1)100% (4/4)100% (2/2)
     
class SimpleStepFactoryBean$1100% (1/1)100% (2/2)100% (8/8)100% (3/3)
SimpleStepFactoryBean$1 (SimpleStepFactoryBean): void 100% (1/1)100% (6/6)100% (2/2)
process (Object): Object 100% (1/1)100% (2/2)100% (1/1)
     
class SimpleStepFactoryBean$2100% (1/1)100% (2/2)100% (9/9)100% (3/3)
SimpleStepFactoryBean$2 (SimpleStepFactoryBean, TransactionAttribute): void 100% (1/1)100% (7/7)100% (2/2)
rollbackOn (Throwable): boolean 100% (1/1)100% (2/2)100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.item;
17 
18import org.apache.commons.logging.Log;
19import org.apache.commons.logging.LogFactory;
20import org.springframework.batch.core.ChunkListener;
21import org.springframework.batch.core.ItemProcessListener;
22import org.springframework.batch.core.ItemReadListener;
23import org.springframework.batch.core.ItemWriteListener;
24import org.springframework.batch.core.SkipListener;
25import org.springframework.batch.core.Step;
26import org.springframework.batch.core.StepExecutionListener;
27import org.springframework.batch.core.StepListener;
28import org.springframework.batch.core.listener.StepListenerFactoryBean;
29import org.springframework.batch.core.repository.JobRepository;
30import org.springframework.batch.core.step.tasklet.TaskletStep;
31import org.springframework.batch.item.ItemProcessor;
32import org.springframework.batch.item.ItemReader;
33import org.springframework.batch.item.ItemStream;
34import org.springframework.batch.item.ItemWriter;
35import org.springframework.batch.repeat.CompletionPolicy;
36import org.springframework.batch.repeat.RepeatOperations;
37import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
38import org.springframework.batch.repeat.exception.ExceptionHandler;
39import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
40import org.springframework.batch.repeat.support.RepeatTemplate;
41import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
42import org.springframework.beans.factory.BeanNameAware;
43import org.springframework.beans.factory.FactoryBean;
44import org.springframework.core.task.TaskExecutor;
45import org.springframework.transaction.PlatformTransactionManager;
46import org.springframework.transaction.annotation.Isolation;
47import org.springframework.transaction.annotation.Propagation;
48import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
49import org.springframework.transaction.interceptor.TransactionAttribute;
50import org.springframework.util.Assert;
51 
52/**
53 * Most common configuration options for simple steps should be found here. Use
54 * this factory bean instead of creating a {@link Step} implementation manually.
55 * 
56 * This factory does not support configuration of fault-tolerant behavior, use
57 * appropriate subclass of this factory bean to configure skip or retry.
58 * 
59 * @see FaultTolerantStepFactoryBean
60 * 
61 * @author Dave Syer
62 * @author Robert Kasanicky
63 * 
64 */
65public class SimpleStepFactoryBean<T, S> implements FactoryBean, BeanNameAware {
66 
67        private static final int DEFAULT_COMMIT_INTERVAL = 1;
68 
69        private String name;
70 
71        private int startLimit = Integer.MAX_VALUE;
72 
73        private boolean allowStartIfComplete;
74 
75        private ItemReader<? extends T> itemReader;
76 
77        private ItemWriter<? super S> itemWriter;
78 
79        private PlatformTransactionManager transactionManager;
80 
81        private Propagation propagation = Propagation.REQUIRED;
82 
83        private Isolation isolation = Isolation.DEFAULT;
84 
85        private int transactionTimeout = DefaultTransactionAttribute.TIMEOUT_DEFAULT;
86 
87        private JobRepository jobRepository;
88 
89        private boolean singleton = true;
90 
91        private ItemStream[] streams = new ItemStream[0];
92 
93        private StepListener[] listeners = new StepListener[0];
94 
95        protected final Log logger = LogFactory.getLog(getClass());
96 
97        private ItemProcessor<? super T, ? extends S> itemProcessor = new ItemProcessor<T, S>() {
98                @SuppressWarnings("unchecked")
99                public S process(T item) throws Exception {
100                        return (S) item;
101                }
102        };
103 
104        private int commitInterval = 0;
105 
106        private TaskExecutor taskExecutor;
107 
108        private RepeatOperations stepOperations;
109 
110        private RepeatOperations chunkOperations;
111 
112        private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
113 
114        private CompletionPolicy chunkCompletionPolicy;
115 
116        private int throttleLimit = TaskExecutorRepeatTemplate.DEFAULT_THROTTLE_LIMIT;
117 
118        private boolean isReaderTransactionalQueue = false;
119 
120        /**
121         * Default constructor for {@link SimpleStepFactoryBean}.
122         */
123        public SimpleStepFactoryBean() {
124                super();
125        }
126 
127        /**
128         * Flag to signal that the reader is transactional (usually a JMS consumer)
129         * so that items are re-presented after a rollback. The default is false and
130         * readers are assumed to be forward-only.
131         * 
132         * @param isReaderTransactionalQueue the value of the flag
133         */
134        public void setIsReaderTransactionalQueue(boolean isReaderTransactionalQueue) {
135                this.isReaderTransactionalQueue = isReaderTransactionalQueue;
136        }
137 
138        /**
139         * Convenience method for subclasses.
140         * @return true if the flag is set (default false)
141         */
142        protected boolean isReaderTransactionalQueue() {
143                return isReaderTransactionalQueue;
144        }
145 
146        /**
147         * Set the bean name property, which will become the name of the
148         * {@link Step} when it is created.
149         * 
150         * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
151         */
152        public void setBeanName(String name) {
153                this.name = name;
154        }
155 
156        /**
157         * Public getter for the name of the step.
158         * @return the name
159         */
160        public String getName() {
161                return name;
162        }
163 
164        /**
165         * The timeout for an individual transaction in the step.
166         * 
167         * @param transactionTimeout the transaction timeout to set, defaults to
168         * infinite
169         */
170        public void setTransactionTimeout(int transactionTimeout) {
171                this.transactionTimeout = transactionTimeout;
172        }
173 
174        /**
175         * @param propagation the propagation to set for business transactions
176         */
177        public void setPropagation(Propagation propagation) {
178                this.propagation = propagation;
179        }
180 
181        /**
182         * @param isolation the isolation to set for business transactions
183         */
184        public void setIsolation(Isolation isolation) {
185                this.isolation = isolation;
186        }
187 
188        /**
189         * Public setter for the start limit for the step.
190         * 
191         * @param startLimit the startLimit to set
192         */
193        public void setStartLimit(int startLimit) {
194                this.startLimit = startLimit;
195        }
196 
197        /**
198         * Public setter for the flag to indicate that the step should be replayed
199         * on a restart, even if successful the first time.
200         * 
201         * @param allowStartIfComplete the shouldAllowStartIfComplete to set
202         */
203        public void setAllowStartIfComplete(boolean allowStartIfComplete) {
204                this.allowStartIfComplete = allowStartIfComplete;
205        }
206 
207        /**
208         * @param itemReader the {@link ItemReader} to set
209         */
210        public void setItemReader(ItemReader<? extends T> itemReader) {
211                this.itemReader = itemReader;
212        }
213 
214        /**
215         * @param itemWriter the {@link ItemWriter} to set
216         */
217        public void setItemWriter(ItemWriter<? super S> itemWriter) {
218                this.itemWriter = itemWriter;
219        }
220 
221        /**
222         * @param itemProcessor the {@link ItemProcessor} to set
223         */
224        public void setItemProcessor(ItemProcessor<? super T, ? extends S> itemProcessor) {
225                this.itemProcessor = itemProcessor;
226        }
227 
228        /**
229         * The streams to inject into the {@link Step}. Any instance of
230         * {@link ItemStream} can be used, and will then receive callbacks at the
231         * appropriate stage in the step.
232         * 
233         * @param streams an array of listeners
234         */
235        public void setStreams(ItemStream[] streams) {
236                this.streams = streams;
237        }
238 
239        /**
240         * The listeners to inject into the {@link Step}. Any instance of
241         * {@link StepListener} can be used, and will then receive callbacks at the
242         * appropriate stage in the step.
243         * 
244         * @param listeners an array of listeners
245         */
246        public void setListeners(StepListener[] listeners) {
247                this.listeners = listeners;
248        }
249 
250        /**
251         * Protected getter for the {@link StepListener}s.
252         * @return the listeners
253         */
254        protected StepListener[] getListeners() {
255                return listeners;
256        }
257 
258        /**
259         * Protected getter for the {@link ItemReader} for subclasses to use.
260         * @return the itemReader
261         */
262        protected ItemReader<? extends T> getItemReader() {
263                return itemReader;
264        }
265 
266        /**
267         * Protected getter for the {@link ItemWriter} for subclasses to use
268         * @return the itemWriter
269         */
270        protected ItemWriter<? super S> getItemWriter() {
271                return itemWriter;
272        }
273 
274        /**
275         * Protected getter for the {@link ItemProcessor} for subclasses to use
276         * @return the itemProcessor
277         */
278        protected ItemProcessor<? super T, ? extends S> getItemProcessor() {
279                return itemProcessor;
280        }
281 
282        /**
283         * Public setter for {@link JobRepository}.
284         * 
285         * @param jobRepository is a mandatory dependence (no default).
286         */
287        public void setJobRepository(JobRepository jobRepository) {
288                this.jobRepository = jobRepository;
289        }
290 
291        /**
292         * Public setter for the {@link PlatformTransactionManager}.
293         * 
294         * @param transactionManager the transaction manager to set
295         */
296        public void setTransactionManager(PlatformTransactionManager transactionManager) {
297                this.transactionManager = transactionManager;
298        }
299 
300        /**
301         * Getter for the {@link TransactionAttribute} for subclasses only.
302         * @return the transactionAttribute
303         */
304        protected TransactionAttribute getTransactionAttribute() {
305 
306                DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
307                attribute.setPropagationBehavior(propagation.value());
308                attribute.setIsolationLevel(isolation.value());
309                attribute.setTimeout(transactionTimeout);
310                return new DefaultTransactionAttribute(attribute) {
311 
312                        /**
313                         * Ignore the default behaviour and rollback on all exceptions that
314                         * bubble up to the tasklet level. The tasklet has to deal with the
315                         * rollback rules internally.
316                         */
317                        @Override
318                        public boolean rollbackOn(Throwable ex) {
319                                return true;
320                        }
321 
322                };
323 
324        }
325 
326        /**
327         * Create a {@link Step} from the configuration provided.
328         * 
329         * @see FactoryBean#getObject()
330         */
331        public final Object getObject() throws Exception {
332                TaskletStep step = new TaskletStep(getName());
333                applyConfiguration(step);
334                step.afterPropertiesSet();
335                return step;
336        }
337 
338        public Class<TaskletStep> getObjectType() {
339                return TaskletStep.class;
340        }
341 
342        /**
343         * Returns true by default, but in most cases a {@link Step} should not be
344         * treated as thread safe. Clients are recommended to create a new step for
345         * each job execution.
346         * 
347         * @see org.springframework.beans.factory.FactoryBean#isSingleton()
348         */
349        public boolean isSingleton() {
350                return this.singleton;
351        }
352 
353        /**
354         * Public setter for the singleton flag.
355         * @param singleton the value to set. Defaults to true.
356         */
357        public void setSingleton(boolean singleton) {
358                this.singleton = singleton;
359        }
360 
361        /**
362         * Set the commit interval. Either set this or the chunkCompletionPolicy but
363         * not both.
364         * 
365         * @param commitInterval 1 by default
366         */
367        public void setCommitInterval(int commitInterval) {
368                this.commitInterval = commitInterval;
369        }
370 
371        /**
372         * Public setter for the {@link CompletionPolicy} applying to the chunk
373         * level. A transaction will be committed when this policy decides to
374         * complete. Defaults to a {@link SimpleCompletionPolicy} with chunk size
375         * equal to the commitInterval property.
376         * 
377         * @param chunkCompletionPolicy the chunkCompletionPolicy to set
378         */
379        public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) {
380                this.chunkCompletionPolicy = chunkCompletionPolicy;
381        }
382 
383        /**
384         * Protected getter for the step operations to make them available in
385         * subclasses.
386         * @return the step operations
387         */
388        protected RepeatOperations getStepOperations() {
389                return stepOperations;
390        }
391 
392        /**
393         * Public setter for the stepOperations.
394         * @param stepOperations the stepOperations to set
395         */
396        public void setStepOperations(RepeatOperations stepOperations) {
397                this.stepOperations = stepOperations;
398        }
399 
400        /**
401         * Public setter for the chunkOperations.
402         * @param chunkOperations the chunkOperations to set
403         */
404        public void setChunkOperations(RepeatOperations chunkOperations) {
405                this.chunkOperations = chunkOperations;
406        }
407 
408        /**
409         * Protected getter for the chunk operations to make them available in
410         * subclasses.
411         * @return the step operations
412         */
413        protected RepeatOperations getChunkOperations() {
414                return chunkOperations;
415        }
416 
417        /**
418         * Public setter for the {@link ExceptionHandler}.
419         * @param exceptionHandler the exceptionHandler to set
420         */
421        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
422                this.exceptionHandler = exceptionHandler;
423        }
424 
425        /**
426         * Protected getter for the {@link ExceptionHandler}.
427         * @return the {@link ExceptionHandler}
428         */
429        protected ExceptionHandler getExceptionHandler() {
430                return exceptionHandler;
431        }
432 
433        /**
434         * Public setter for the {@link TaskExecutor}. If this is set, then it will
435         * be used to execute the chunk processing inside the {@link Step}.
436         * 
437         * @param taskExecutor the taskExecutor to set
438         */
439        public void setTaskExecutor(TaskExecutor taskExecutor) {
440                this.taskExecutor = taskExecutor;
441        }
442 
443        /**
444         * Mkae the {@link TaskExecutor} available to subclasses
445         * @return the taskExecutor to be used to execute chunks
446         */
447        protected TaskExecutor getTaskExecutor() {
448                return taskExecutor;
449        }
450 
451        /**
452         * Public setter for the throttle limit. This limits the number of tasks
453         * queued for concurrent processing to prevent thread pools from being
454         * overwhelmed. Defaults to
455         * {@link TaskExecutorRepeatTemplate#DEFAULT_THROTTLE_LIMIT}.
456         * @param throttleLimit the throttle limit to set.
457         */
458        public void setThrottleLimit(int throttleLimit) {
459                this.throttleLimit = throttleLimit;
460        }
461 
462        /**
463         * @param step
464         * 
465         */
466        protected void applyConfiguration(TaskletStep step) {
467 
468                Assert.notNull(getItemReader(), "ItemReader must be provided");
469                Assert.notNull(getItemWriter(), "ItemWriter must be provided");
470                Assert.notNull(transactionManager, "TransactionManager must be provided");
471 
472                step.setTransactionManager(transactionManager);
473                step.setTransactionAttribute(getTransactionAttribute());
474                step.setJobRepository(jobRepository);
475                step.setStartLimit(startLimit);
476                step.setAllowStartIfComplete(allowStartIfComplete);
477 
478                registerStreams(step, streams);
479 
480                if (chunkOperations == null) {
481                        RepeatTemplate repeatTemplate = new RepeatTemplate();
482                        repeatTemplate.setCompletionPolicy(getChunkCompletionPolicy());
483                        chunkOperations = repeatTemplate;
484                }
485 
486                if (stepOperations == null) {
487 
488                        stepOperations = new RepeatTemplate();
489 
490                        if (taskExecutor != null) {
491                                TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate();
492                                repeatTemplate.setTaskExecutor(taskExecutor);
493                                repeatTemplate.setThrottleLimit(throttleLimit);
494                                stepOperations = repeatTemplate;
495                        }
496 
497                        ((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler);
498 
499                }
500 
501                step.setStepOperations(stepOperations);
502 
503                SimpleChunkProvider<T> chunkProvider = configureChunkProvider();
504 
505                SimpleChunkProcessor<T, S> chunkProcessor = configureChunkProcessor();
506 
507                registerItemListeners(chunkProvider, chunkProcessor);
508                registerStepListeners(step, chunkOperations);
509                registerStreams(step, itemReader, itemProcessor, itemWriter);
510 
511                ChunkOrientedTasklet<T> tasklet = new ChunkOrientedTasklet<T>(chunkProvider, chunkProcessor);
512                tasklet.setBuffering(!isReaderTransactionalQueue());
513 
514                step.setTasklet(tasklet);
515 
516        }
517 
518        /**
519         * Register the streams with the step.
520         * @param step the {@link TaskletStep}
521         * @param streams the streams to register
522         */
523        protected void registerStreams(TaskletStep step, ItemStream[] streams) {
524                step.setStreams(streams);
525        }
526 
527        /**
528         * Extension point for creating appropriate {@link ChunkProvider}. Return
529         * value must subclass {@link SimpleChunkProvider} due to listener
530         * registration.
531         */
532        protected SimpleChunkProvider<T> configureChunkProvider() {
533                return new SimpleChunkProvider<T>(itemReader, chunkOperations);
534        }
535 
536        /**
537         * Extension point for creating appropriate {@link ChunkProcessor}. Return
538         * value must subclass {@link SimpleChunkProcessor} due to listener
539         * registration.
540         */
541        protected SimpleChunkProcessor<T, S> configureChunkProcessor() {
542                return new SimpleChunkProcessor<T, S>(itemProcessor, itemWriter);
543        }
544 
545        /**
546         * @return a {@link CompletionPolicy} consistent with the commit interval
547         * and injected policy (if present).
548         */
549        private CompletionPolicy getChunkCompletionPolicy() {
550                Assert.state(!(chunkCompletionPolicy != null && commitInterval != 0),
551                                "You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
552                Assert.state(commitInterval >= 0, "The commitInterval must be positive or zero (for default value).");
553 
554                if (chunkCompletionPolicy != null) {
555                        return chunkCompletionPolicy;
556                }
557                if (commitInterval == 0) {
558                        logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")");
559                        commitInterval = DEFAULT_COMMIT_INTERVAL;
560                }
561                return new SimpleCompletionPolicy(commitInterval);
562        }
563 
564        private void registerStreams(TaskletStep step, ItemReader<? extends T> itemReader,
565                        ItemProcessor<? super T, ? extends S> itemProcessor, ItemWriter<? super S> itemWriter) {
566                for (Object itemHandler : new Object[] { itemReader, itemWriter, itemProcessor }) {
567                        if (itemHandler instanceof ItemStream) {
568                                registerStreams(step, new ItemStream[] { (ItemStream) itemHandler });
569                        }
570                }
571        }
572 
573        /**
574         * Register listeners with step and chunk.
575         */
576        private void registerStepListeners(TaskletStep step, RepeatOperations chunkOperations) {
577 
578                for (Object itemHandler : new Object[] { getItemReader(), itemWriter, itemProcessor }) {
579                        if (StepListenerFactoryBean.isListener(itemHandler)) {
580                                StepListener listener = StepListenerFactoryBean.getListener(itemHandler);
581                                if (listener instanceof StepExecutionListener) {
582                                        step.registerStepExecutionListener((StepExecutionListener) listener);
583                                }
584                                if (listener instanceof ChunkListener) {
585                                        step.registerChunkListener((ChunkListener) listener);
586                                }
587                        }
588                }
589 
590                step.setStepExecutionListeners(BatchListenerFactoryHelper.getListeners(listeners, StepExecutionListener.class)
591                                .toArray(new StepExecutionListener[] {}));
592                step.setChunkListeners(BatchListenerFactoryHelper.getListeners(listeners, ChunkListener.class).toArray(
593                                new ChunkListener[] {}));
594        }
595 
596        /**
597         * Register explicitly set ({@link #setListeners(StepListener[])}) item
598         * listeners and auto-register reader, processor and writer if applicable
599         */
600        private void registerItemListeners(SimpleChunkProvider<T> chunkProvider, SimpleChunkProcessor<T, S> chunkProcessor) {
601 
602                // explicitly set item listeners
603                chunkProvider.setListeners(BatchListenerFactoryHelper.<ItemReadListener<T>> getListeners(getListeners(),
604                                ItemReadListener.class));
605                chunkProvider.setListeners(BatchListenerFactoryHelper.<SkipListener<T, S>> getListeners(getListeners(),
606                                SkipListener.class));
607 
608                chunkProcessor.setListeners(BatchListenerFactoryHelper.<ItemProcessListener<T, S>> getListeners(getListeners(),
609                                ItemProcessListener.class));
610                chunkProcessor.setListeners(BatchListenerFactoryHelper.<ItemWriteListener<S>> getListeners(getListeners(),
611                                ItemWriteListener.class));
612                chunkProcessor.setListeners(BatchListenerFactoryHelper.<SkipListener<T, S>> getListeners(getListeners(),
613                                SkipListener.class));
614 
615                // auto-register reader, processor and writer
616                for (Object itemHandler : new Object[] { getItemReader(), getItemWriter(), getItemProcessor() }) {
617 
618                        if (StepListenerFactoryBean.isListener(itemHandler)) {
619                                StepListener listener = StepListenerFactoryBean.getListener(itemHandler);
620                                if (listener instanceof SkipListener) {
621                                        chunkProvider.registerListener(listener);
622                                        chunkProcessor.registerListener(listener);
623                                        // already registered with both so avoid double-registering
624                                        continue;
625                                }
626                                if (listener instanceof ItemReadListener) {
627                                        chunkProvider.registerListener(listener);
628                                }
629                                if (listener instanceof ItemProcessListener || listener instanceof ItemWriteListener) {
630                                        chunkProcessor.registerListener(listener);
631                                }
632                        }
633                }
634        }
635 
636}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov