EMMA Coverage Report (generated Thu Jan 24 13:37:04 CST 2013)
[all classes][org.springframework.batch.core.step.item]

COVERAGE SUMMARY FOR SOURCE FILE [SimpleStepFactoryBean.java]

nameclass, %method, %block, %line, %
SimpleStepFactoryBean.java100% (2/2)96%  (47/49)97%  (636/653)97%  (157.4/163)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class SimpleStepFactoryBean100% (1/1)96%  (45/47)97%  (627/644)97%  (156.4/162)
setChunkOperations (RepeatOperations): void 0%   (0/1)0%   (0/4)0%   (0/2)
setSingleton (boolean): void 0%   (0/1)0%   (0/4)0%   (0/2)
registerStepListeners (TaskletStep, RepeatOperations): void 100% (1/1)95%  (79/83)92%  (11/12)
registerItemListeners (SimpleChunkProvider, SimpleChunkProcessor): void 100% (1/1)97%  (97/100)98%  (20.5/21)
applyConfiguration (TaskletStep): void 100% (1/1)99%  (142/144)100% (30.9/31)
SimpleStepFactoryBean (): void 100% (1/1)100% (45/45)100% (14/14)
configureChunkProcessor (): SimpleChunkProcessor 100% (1/1)100% (8/8)100% (1/1)
configureChunkProvider (): SimpleChunkProvider 100% (1/1)100% (8/8)100% (1/1)
getChunkCompletionPolicy (): CompletionPolicy 100% (1/1)100% (41/41)100% (8/8)
getChunkOperations (): RepeatOperations 100% (1/1)100% (3/3)100% (1/1)
getCommitInterval (): int 100% (1/1)100% (3/3)100% (1/1)
getExceptionHandler (): ExceptionHandler 100% (1/1)100% (3/3)100% (1/1)
getItemProcessor (): ItemProcessor 100% (1/1)100% (3/3)100% (1/1)
getItemReader (): ItemReader 100% (1/1)100% (3/3)100% (1/1)
getItemWriter (): ItemWriter 100% (1/1)100% (3/3)100% (1/1)
getListeners (): StepListener [] 100% (1/1)100% (3/3)100% (1/1)
getName (): String 100% (1/1)100% (3/3)100% (1/1)
getObject (): Object 100% (1/1)100% (13/13)100% (4/4)
getObjectType (): Class 100% (1/1)100% (2/2)100% (1/1)
getStepOperations (): RepeatOperations 100% (1/1)100% (3/3)100% (1/1)
getTaskExecutor (): TaskExecutor 100% (1/1)100% (3/3)100% (1/1)
getTransactionAttribute (): TransactionAttribute 100% (1/1)100% (24/24)100% (5/5)
isReaderTransactionalQueue (): boolean 100% (1/1)100% (3/3)100% (1/1)
isSingleton (): boolean 100% (1/1)100% (3/3)100% (1/1)
registerChunkListeners (TaskletStep, StepListener): void 100% (1/1)100% (5/5)100% (2/2)
registerStreams (TaskletStep, ItemReader, ItemProcessor, ItemWriter): void 100% (1/1)100% (43/43)100% (4/4)
registerStreams (TaskletStep, ItemStream []): void 100% (1/1)100% (4/4)100% (2/2)
setAllowStartIfComplete (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setBeanName (String): void 100% (1/1)100% (4/4)100% (2/2)
setChunkCompletionPolicy (CompletionPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setCommitInterval (int): void 100% (1/1)100% (4/4)100% (2/2)
setExceptionHandler (ExceptionHandler): void 100% (1/1)100% (4/4)100% (2/2)
setIsReaderTransactionalQueue (boolean): void 100% (1/1)100% (4/4)100% (2/2)
setIsolation (Isolation): void 100% (1/1)100% (4/4)100% (2/2)
setItemProcessor (ItemProcessor): void 100% (1/1)100% (4/4)100% (2/2)
setItemReader (ItemReader): void 100% (1/1)100% (4/4)100% (2/2)
setItemWriter (ItemWriter): void 100% (1/1)100% (4/4)100% (2/2)
setJobRepository (JobRepository): void 100% (1/1)100% (4/4)100% (2/2)
setListeners (StepListener []): void 100% (1/1)100% (4/4)100% (2/2)
setPropagation (Propagation): void 100% (1/1)100% (4/4)100% (2/2)
setStartLimit (int): void 100% (1/1)100% (4/4)100% (2/2)
setStepOperations (RepeatOperations): void 100% (1/1)100% (4/4)100% (2/2)
setStreams (ItemStream []): void 100% (1/1)100% (4/4)100% (2/2)
setTaskExecutor (TaskExecutor): void 100% (1/1)100% (4/4)100% (2/2)
setThrottleLimit (int): void 100% (1/1)100% (4/4)100% (2/2)
setTransactionManager (PlatformTransactionManager): void 100% (1/1)100% (4/4)100% (2/2)
setTransactionTimeout (int): void 100% (1/1)100% (4/4)100% (2/2)
     
class SimpleStepFactoryBean$1100% (1/1)100% (2/2)100% (9/9)100% (2/2)
SimpleStepFactoryBean$1 (SimpleStepFactoryBean, TransactionAttribute): void 100% (1/1)100% (7/7)100% (1/1)
rollbackOn (Throwable): boolean 100% (1/1)100% (2/2)100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.item;
17 
18import java.util.Arrays;
19import java.util.List;
20 
21import org.apache.commons.logging.Log;
22import org.apache.commons.logging.LogFactory;
23import org.springframework.batch.core.ChunkListener;
24import org.springframework.batch.core.ItemProcessListener;
25import org.springframework.batch.core.ItemReadListener;
26import org.springframework.batch.core.ItemWriteListener;
27import org.springframework.batch.core.SkipListener;
28import org.springframework.batch.core.Step;
29import org.springframework.batch.core.StepExecutionListener;
30import org.springframework.batch.core.StepListener;
31import org.springframework.batch.core.listener.StepListenerFactoryBean;
32import org.springframework.batch.core.repository.JobRepository;
33import org.springframework.batch.core.step.tasklet.TaskletStep;
34import org.springframework.batch.item.ItemProcessor;
35import org.springframework.batch.item.ItemReader;
36import org.springframework.batch.item.ItemStream;
37import org.springframework.batch.item.ItemWriter;
38import org.springframework.batch.repeat.CompletionPolicy;
39import org.springframework.batch.repeat.RepeatOperations;
40import org.springframework.batch.repeat.exception.DefaultExceptionHandler;
41import org.springframework.batch.repeat.exception.ExceptionHandler;
42import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
43import org.springframework.batch.repeat.support.RepeatTemplate;
44import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;
45import org.springframework.beans.factory.BeanNameAware;
46import org.springframework.beans.factory.FactoryBean;
47import org.springframework.core.task.TaskExecutor;
48import org.springframework.transaction.PlatformTransactionManager;
49import org.springframework.transaction.annotation.Isolation;
50import org.springframework.transaction.annotation.Propagation;
51import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
52import org.springframework.transaction.interceptor.TransactionAttribute;
53import org.springframework.util.Assert;
54 
55/**
56 * Most common configuration options for simple steps should be found here. Use
57 * this factory bean instead of creating a {@link Step} implementation manually.
58 * 
59 * This factory does not support configuration of fault-tolerant behavior, use
60 * appropriate subclass of this factory bean to configure skip or retry.
61 * 
62 * @see FaultTolerantStepFactoryBean
63 * 
64 * @author Dave Syer
65 * @author Robert Kasanicky
66 * 
67 */
68public class SimpleStepFactoryBean<T, S> implements FactoryBean, BeanNameAware {
69 
70        private static final int DEFAULT_COMMIT_INTERVAL = 1;
71 
72        private String name;
73 
74        private int startLimit = Integer.MAX_VALUE;
75 
76        private boolean allowStartIfComplete;
77 
78        private ItemReader<? extends T> itemReader;
79 
80        private ItemProcessor<? super T, ? extends S> itemProcessor;
81 
82        private ItemWriter<? super S> itemWriter;
83 
84        private PlatformTransactionManager transactionManager;
85 
86        private Propagation propagation = Propagation.REQUIRED;
87 
88        private Isolation isolation = Isolation.DEFAULT;
89 
90        private int transactionTimeout = DefaultTransactionAttribute.TIMEOUT_DEFAULT;
91 
92        private JobRepository jobRepository;
93 
94        private boolean singleton = true;
95 
96        private ItemStream[] streams = new ItemStream[0];
97 
98        private StepListener[] listeners = new StepListener[0];
99 
100        protected final Log logger = LogFactory.getLog(getClass());
101 
102        private int commitInterval = 0;
103 
104        private TaskExecutor taskExecutor;
105 
106        private RepeatOperations stepOperations;
107 
108        private RepeatOperations chunkOperations;
109 
110        private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();
111 
112        private CompletionPolicy chunkCompletionPolicy;
113 
114        private int throttleLimit = TaskExecutorRepeatTemplate.DEFAULT_THROTTLE_LIMIT;
115 
116        private boolean isReaderTransactionalQueue = false;
117 
118        /**
119         * Default constructor for {@link SimpleStepFactoryBean}.
120         */
121        public SimpleStepFactoryBean() {
122                super();
123        }
124 
125        /**
126         * Flag to signal that the reader is transactional (usually a JMS consumer)
127         * so that items are re-presented after a rollback. The default is false and
128         * readers are assumed to be forward-only.
129         * 
130         * @param isReaderTransactionalQueue the value of the flag
131         */
132        public void setIsReaderTransactionalQueue(boolean isReaderTransactionalQueue) {
133                this.isReaderTransactionalQueue = isReaderTransactionalQueue;
134        }
135 
136        /**
137         * Convenience method for subclasses.
138         * @return true if the flag is set (default false)
139         */
140        protected boolean isReaderTransactionalQueue() {
141                return isReaderTransactionalQueue;
142        }
143 
144        /**
145         * Set the bean name property, which will become the name of the
146         * {@link Step} when it is created.
147         * 
148         * @see org.springframework.beans.factory.BeanNameAware#setBeanName(java.lang.String)
149         */
150        public void setBeanName(String name) {
151                this.name = name;
152        }
153 
154        /**
155         * Public getter for the name of the step.
156         * @return the name
157         */
158        public String getName() {
159                return name;
160        }
161 
162        /**
163         * The timeout for an individual transaction in the step.
164         * 
165         * @param transactionTimeout the transaction timeout to set, defaults to
166         * infinite
167         */
168        public void setTransactionTimeout(int transactionTimeout) {
169                this.transactionTimeout = transactionTimeout;
170        }
171 
172        /**
173         * @param propagation the propagation to set for business transactions
174         */
175        public void setPropagation(Propagation propagation) {
176                this.propagation = propagation;
177        }
178 
179        /**
180         * @param isolation the isolation to set for business transactions
181         */
182        public void setIsolation(Isolation isolation) {
183                this.isolation = isolation;
184        }
185 
186        /**
187         * Public setter for the start limit for the step.
188         * 
189         * @param startLimit the startLimit to set
190         */
191        public void setStartLimit(int startLimit) {
192                this.startLimit = startLimit;
193        }
194 
195        /**
196         * Public setter for the flag to indicate that the step should be replayed
197         * on a restart, even if successful the first time.
198         * 
199         * @param allowStartIfComplete the shouldAllowStartIfComplete to set
200         */
201        public void setAllowStartIfComplete(boolean allowStartIfComplete) {
202                this.allowStartIfComplete = allowStartIfComplete;
203        }
204 
205        /**
206         * @param itemReader the {@link ItemReader} to set
207         */
208        public void setItemReader(ItemReader<? extends T> itemReader) {
209                this.itemReader = itemReader;
210        }
211 
212        /**
213         * @param itemWriter the {@link ItemWriter} to set
214         */
215        public void setItemWriter(ItemWriter<? super S> itemWriter) {
216                this.itemWriter = itemWriter;
217        }
218 
219        /**
220         * @param itemProcessor the {@link ItemProcessor} to set
221         */
222        public void setItemProcessor(ItemProcessor<? super T, ? extends S> itemProcessor) {
223                this.itemProcessor = itemProcessor;
224        }
225 
226        /**
227         * The streams to inject into the {@link Step}. Any instance of
228         * {@link ItemStream} can be used, and will then receive callbacks at the
229         * appropriate stage in the step.
230         * 
231         * @param streams an array of listeners
232         */
233        public void setStreams(ItemStream[] streams) {
234                this.streams = streams;
235        }
236 
237        /**
238         * The listeners to inject into the {@link Step}. Any instance of
239         * {@link StepListener} can be used, and will then receive callbacks at the
240         * appropriate stage in the step.
241         * 
242         * @param listeners an array of listeners
243         */
244        public void setListeners(StepListener[] listeners) {
245                this.listeners = listeners;
246        }
247 
248        /**
249         * Protected getter for the {@link StepListener}s.
250         * @return the listeners
251         */
252        protected StepListener[] getListeners() {
253                return listeners;
254        }
255 
256        /**
257         * Protected getter for the {@link ItemReader} for subclasses to use.
258         * @return the itemReader
259         */
260        protected ItemReader<? extends T> getItemReader() {
261                return itemReader;
262        }
263 
264        /**
265         * Protected getter for the {@link ItemWriter} for subclasses to use
266         * @return the itemWriter
267         */
268        protected ItemWriter<? super S> getItemWriter() {
269                return itemWriter;
270        }
271 
272        /**
273         * Protected getter for the {@link ItemProcessor} for subclasses to use
274         * @return the itemProcessor
275         */
276        protected ItemProcessor<? super T, ? extends S> getItemProcessor() {
277                return itemProcessor;
278        }
279 
280        /**
281         * Public setter for {@link JobRepository}.
282         * 
283         * @param jobRepository is a mandatory dependence (no default).
284         */
285        public void setJobRepository(JobRepository jobRepository) {
286                this.jobRepository = jobRepository;
287        }
288 
289        /**
290         * Public setter for the {@link PlatformTransactionManager}.
291         * 
292         * @param transactionManager the transaction manager to set
293         */
294        public void setTransactionManager(PlatformTransactionManager transactionManager) {
295                this.transactionManager = transactionManager;
296        }
297 
298        /**
299         * Getter for the {@link TransactionAttribute} for subclasses only.
300         * @return the transactionAttribute
301         */
302        @SuppressWarnings("serial")
303        protected TransactionAttribute getTransactionAttribute() {
304 
305                DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
306                attribute.setPropagationBehavior(propagation.value());
307                attribute.setIsolationLevel(isolation.value());
308                attribute.setTimeout(transactionTimeout);
309                return new DefaultTransactionAttribute(attribute) {
310 
311                        /**
312                         * Ignore the default behaviour and rollback on all exceptions that
313                         * bubble up to the tasklet level. The tasklet has to deal with the
314                         * rollback rules internally.
315                         */
316                        @Override
317                        public boolean rollbackOn(Throwable ex) {
318                                return true;
319                        }
320 
321                };
322 
323        }
324 
325        /**
326         * Create a {@link Step} from the configuration provided.
327         * 
328         * @see FactoryBean#getObject()
329         */
330        public final Object getObject() throws Exception {
331                TaskletStep step = new TaskletStep(getName());
332                applyConfiguration(step);
333                step.afterPropertiesSet();
334                return step;
335        }
336 
337        public Class<TaskletStep> getObjectType() {
338                return TaskletStep.class;
339        }
340 
341        /**
342         * Returns true by default, but in most cases a {@link Step} should not be
343         * treated as thread safe. Clients are recommended to create a new step for
344         * each job execution.
345         * 
346         * @see org.springframework.beans.factory.FactoryBean#isSingleton()
347         */
348        public boolean isSingleton() {
349                return this.singleton;
350        }
351 
352        /**
353         * Public setter for the singleton flag.
354         * @param singleton the value to set. Defaults to true.
355         */
356        public void setSingleton(boolean singleton) {
357                this.singleton = singleton;
358        }
359 
360        /**
361         * Set the commit interval. Either set this or the chunkCompletionPolicy but
362         * not both.
363         * 
364         * @param commitInterval 1 by default
365         */
366        public void setCommitInterval(int commitInterval) {
367                this.commitInterval = commitInterval;
368        }
369        
370        /**
371         * Accessor for commit interval if needed in sub classes.
372         * 
373         * @return the commit interval
374         */
375        protected int getCommitInterval() {
376                return commitInterval;
377        }
378 
379        /**
380         * Public setter for the {@link CompletionPolicy} applying to the chunk
381         * level. A transaction will be committed when this policy decides to
382         * complete. Defaults to a {@link SimpleCompletionPolicy} with chunk size
383         * equal to the commitInterval property.
384         * 
385         * @param chunkCompletionPolicy the chunkCompletionPolicy to set
386         */
387        public void setChunkCompletionPolicy(CompletionPolicy chunkCompletionPolicy) {
388                this.chunkCompletionPolicy = chunkCompletionPolicy;
389        }
390 
391        /**
392         * Protected getter for the step operations to make them available in
393         * subclasses.
394         * @return the step operations
395         */
396        protected RepeatOperations getStepOperations() {
397                return stepOperations;
398        }
399 
400        /**
401         * Public setter for the stepOperations.
402         * @param stepOperations the stepOperations to set
403         */
404        public void setStepOperations(RepeatOperations stepOperations) {
405                this.stepOperations = stepOperations;
406        }
407 
408        /**
409         * Public setter for the chunkOperations.
410         * @param chunkOperations the chunkOperations to set
411         */
412        public void setChunkOperations(RepeatOperations chunkOperations) {
413                this.chunkOperations = chunkOperations;
414        }
415 
416        /**
417         * Protected getter for the chunk operations to make them available in
418         * subclasses.
419         * @return the step operations
420         */
421        protected RepeatOperations getChunkOperations() {
422                return chunkOperations;
423        }
424 
425        /**
426         * Public setter for the {@link ExceptionHandler}.
427         * @param exceptionHandler the exceptionHandler to set
428         */
429        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
430                this.exceptionHandler = exceptionHandler;
431        }
432 
433        /**
434         * Protected getter for the {@link ExceptionHandler}.
435         * @return the {@link ExceptionHandler}
436         */
437        protected ExceptionHandler getExceptionHandler() {
438                return exceptionHandler;
439        }
440 
441        /**
442         * Public setter for the {@link TaskExecutor}. If this is set, then it will
443         * be used to execute the chunk processing inside the {@link Step}.
444         * 
445         * @param taskExecutor the taskExecutor to set
446         */
447        public void setTaskExecutor(TaskExecutor taskExecutor) {
448                this.taskExecutor = taskExecutor;
449        }
450 
451        /**
452         * Mkae the {@link TaskExecutor} available to subclasses
453         * @return the taskExecutor to be used to execute chunks
454         */
455        protected TaskExecutor getTaskExecutor() {
456                return taskExecutor;
457        }
458 
459        /**
460         * Public setter for the throttle limit. This limits the number of tasks
461         * queued for concurrent processing to prevent thread pools from being
462         * overwhelmed. Defaults to
463         * {@link TaskExecutorRepeatTemplate#DEFAULT_THROTTLE_LIMIT}.
464         * @param throttleLimit the throttle limit to set.
465         */
466        public void setThrottleLimit(int throttleLimit) {
467                this.throttleLimit = throttleLimit;
468        }
469 
470        /**
471         * @param step
472         * 
473         */
474        protected void applyConfiguration(TaskletStep step) {
475 
476                Assert.state(getItemReader()!=null, "ItemReader must be provided");
477                Assert.state(getItemWriter()!=null || getItemProcessor()!=null, "ItemWriter or ItemProcessor must be provided");
478                Assert.state(transactionManager!=null, "TransactionManager must be provided");
479 
480                step.setTransactionManager(transactionManager);
481                step.setTransactionAttribute(getTransactionAttribute());
482                step.setJobRepository(jobRepository);
483                step.setStartLimit(startLimit);
484                step.setAllowStartIfComplete(allowStartIfComplete);
485 
486                registerStreams(step, streams);
487 
488                if (chunkOperations == null) {
489                        RepeatTemplate repeatTemplate = new RepeatTemplate();
490                        repeatTemplate.setCompletionPolicy(getChunkCompletionPolicy());
491                        chunkOperations = repeatTemplate;
492                }
493 
494                if (stepOperations == null) {
495 
496                        stepOperations = new RepeatTemplate();
497 
498                        if (taskExecutor != null) {
499                                TaskExecutorRepeatTemplate repeatTemplate = new TaskExecutorRepeatTemplate();
500                                repeatTemplate.setTaskExecutor(taskExecutor);
501                                repeatTemplate.setThrottleLimit(throttleLimit);
502                                stepOperations = repeatTemplate;
503                        }
504 
505                        ((RepeatTemplate) stepOperations).setExceptionHandler(exceptionHandler);
506 
507                }
508 
509                step.setStepOperations(stepOperations);
510 
511                SimpleChunkProvider<T> chunkProvider = configureChunkProvider();
512 
513                SimpleChunkProcessor<T, S> chunkProcessor = configureChunkProcessor();
514 
515                registerItemListeners(chunkProvider, chunkProcessor);
516                registerStepListeners(step, chunkOperations);
517                registerStreams(step, itemReader, itemProcessor, itemWriter);
518 
519                ChunkOrientedTasklet<T> tasklet = new ChunkOrientedTasklet<T>(chunkProvider, chunkProcessor);
520                tasklet.setBuffering(!isReaderTransactionalQueue());
521 
522                step.setTasklet(tasklet);
523 
524        }
525 
526        /**
527         * Register the streams with the step.
528         * @param step the {@link TaskletStep}
529         * @param streams the streams to register
530         */
531        protected void registerStreams(TaskletStep step, ItemStream[] streams) {
532                step.setStreams(streams);
533        }
534 
535        /**
536         * Extension point for creating appropriate {@link ChunkProvider}. Return
537         * value must subclass {@link SimpleChunkProvider} due to listener
538         * registration.
539         */
540        protected SimpleChunkProvider<T> configureChunkProvider() {
541                return new SimpleChunkProvider<T>(itemReader, chunkOperations);
542        }
543 
544        /**
545         * Extension point for creating appropriate {@link ChunkProcessor}. Return
546         * value must subclass {@link SimpleChunkProcessor} due to listener
547         * registration.
548         */
549        protected SimpleChunkProcessor<T, S> configureChunkProcessor() {
550                return new SimpleChunkProcessor<T, S>(itemProcessor, itemWriter);
551        }
552 
553        /**
554         * @return a {@link CompletionPolicy} consistent with the commit interval
555         * and injected policy (if present).
556         */
557        private CompletionPolicy getChunkCompletionPolicy() {
558                Assert.state(!(chunkCompletionPolicy != null && commitInterval != 0),
559                                "You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
560                Assert.state(commitInterval >= 0, "The commitInterval must be positive or zero (for default value).");
561 
562                if (chunkCompletionPolicy != null) {
563                        return chunkCompletionPolicy;
564                }
565                if (commitInterval == 0) {
566                        logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")");
567                        commitInterval = DEFAULT_COMMIT_INTERVAL;
568                }
569                return new SimpleCompletionPolicy(commitInterval);
570        }
571 
572        private void registerStreams(TaskletStep step, ItemReader<? extends T> itemReader,
573                        ItemProcessor<? super T, ? extends S> itemProcessor, ItemWriter<? super S> itemWriter) {
574                for (Object itemHandler : new Object[] { itemReader, itemWriter, itemProcessor }) {
575                        if (itemHandler instanceof ItemStream) {
576                                registerStreams(step, new ItemStream[] { (ItemStream) itemHandler });
577                        }
578                }
579        }
580 
581        /**
582         * Register listeners with step and chunk.
583         */
584        private void registerStepListeners(TaskletStep step, RepeatOperations chunkOperations) {
585 
586                for (Object itemHandler : new Object[] { getItemReader(), itemWriter, itemProcessor }) {
587                        if (StepListenerFactoryBean.isListener(itemHandler)) {
588                                StepListener listener = StepListenerFactoryBean.getListener(itemHandler);
589                                if (listener instanceof StepExecutionListener) {
590                                        step.registerStepExecutionListener((StepExecutionListener) listener);
591                                }
592                                if (listener instanceof ChunkListener) {
593                                        registerChunkListeners(step, listener);
594                                }
595                        }
596                }
597 
598                step.setStepExecutionListeners(BatchListenerFactoryHelper.getListeners(listeners, StepExecutionListener.class)
599                                .toArray(new StepExecutionListener[] {}));
600                
601                List<ChunkListener> chunkListeners = BatchListenerFactoryHelper.getListeners(listeners, ChunkListener.class);
602                for(ChunkListener chunkListener: chunkListeners){
603                        registerChunkListeners(step,chunkListener);
604                }
605        }
606 
607        protected void registerChunkListeners(TaskletStep step, StepListener listener) {
608                step.registerChunkListener((ChunkListener) listener);
609        }
610 
611        /**
612         * Register explicitly set ({@link #setListeners(StepListener[])}) item
613         * listeners and auto-register reader, processor and writer if applicable
614         */
615        private void registerItemListeners(SimpleChunkProvider<T> chunkProvider, SimpleChunkProcessor<T, S> chunkProcessor) {
616 
617                StepListener[] listeners = getListeners();
618 
619                // explicitly set item listeners
620                chunkProvider.setListeners(BatchListenerFactoryHelper.<ItemReadListener<T>> getListeners(listeners,
621                                ItemReadListener.class));
622                chunkProvider.setListeners(BatchListenerFactoryHelper.<SkipListener<T, S>> getListeners(listeners,
623                                SkipListener.class));
624 
625                chunkProcessor.setListeners(BatchListenerFactoryHelper.<ItemProcessListener<T, S>> getListeners(listeners,
626                                ItemProcessListener.class));
627                chunkProcessor.setListeners(BatchListenerFactoryHelper.<ItemWriteListener<S>> getListeners(listeners,
628                                ItemWriteListener.class));
629                chunkProcessor.setListeners(BatchListenerFactoryHelper.<SkipListener<T, S>> getListeners(listeners,
630                                SkipListener.class));
631 
632                List<StepListener> listofListeners = Arrays.asList(listeners);
633                // auto-register reader, processor and writer
634                for (Object itemHandler : new Object[] { getItemReader(), getItemWriter(), getItemProcessor() }) {
635 
636                        if (listofListeners.contains(itemHandler)) {
637                                continue;
638                        }
639 
640                        if (StepListenerFactoryBean.isListener(itemHandler)) {
641                                StepListener listener = StepListenerFactoryBean.getListener(itemHandler);
642                                if (listener instanceof SkipListener<?,?>) {
643                                        chunkProvider.registerListener(listener);
644                                        chunkProcessor.registerListener(listener);
645                                        // already registered with both so avoid double-registering
646                                        continue;
647                                }
648                                if (listener instanceof ItemReadListener<?>) {
649                                        chunkProvider.registerListener(listener);
650                                }
651                                if (listener instanceof ItemProcessListener<?,?> || listener instanceof ItemWriteListener<?>) {
652                                        chunkProcessor.registerListener(listener);
653                                }
654                        }
655                }
656        }
657 
658}

[all classes][org.springframework.batch.core.step.item]
EMMA 2.0.5312 (C) Vladimir Roubtsov