EMMA Coverage Report (generated Fri Aug 21 15:59:46 BST 2009)
[all classes][org.springframework.batch.core.step.tasklet]

COVERAGE SUMMARY FOR SOURCE FILE [TaskletStep.java]

name | class, % | method, % | block, % | line, %
TaskletStep.java | 100% (3/3) | 100% (31/31) | 98% (448/459) | 97% (108/111)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class TaskletStep$2 | 100% (1/1) | 100% (2/2) | 96% (258/269) | 95% (57/60)
doInChunkContext (RepeatContext, ChunkContext): RepeatStatus | | 100% (1/1) | 96% (251/262) | 95% (55/58)
TaskletStep$2 (TaskletStep, StepExecution): void | | 100% (1/1) | 100% (7/7) | 100% (2/2)

class TaskletStep | 100% (1/1) | 100% (27/27) | 100% (182/182) | 100% (52/52)
<static initializer> | | 100% (1/1) | 100% (4/4) | 100% (2/2)
TaskletStep (): void | | 100% (1/1) | 100% (4/4) | 100% (2/2)
TaskletStep (String): void | | 100% (1/1) | 100% (36/36) | 100% (8/8)
access$0 (TaskletStep): StepInterruptionPolicy | | 100% (1/1) | 100% (3/3) | 100% (1/1)
access$1 (TaskletStep): PlatformTransactionManager | | 100% (1/1) | 100% (3/3) | 100% (1/1)
access$2 (TaskletStep): TransactionAttribute | | 100% (1/1) | 100% (3/3) | 100% (1/1)
access$3 (TaskletStep): CompositeChunkListener | | 100% (1/1) | 100% (3/3) | 100% (1/1)
access$4 (TaskletStep): Semaphore | | 100% (1/1) | 100% (3/3) | 100% (1/1)
access$5 (): Log | | 100% (1/1) | 100% (2/2) | 100% (1/1)
access$6 (TaskletStep): Tasklet | | 100% (1/1) | 100% (3/3) | 100% (1/1)
access$7 (TaskletStep): CompositeItemStream | | 100% (1/1) | 100% (3/3) | 100% (1/1)
access$8 (TaskletStep): JobRepository | | 100% (1/1) | 100% (3/3) | 100% (1/1)
access$9 (TaskletStep, StepExecution, TransactionStatus): void | | 100% (1/1) | 100% (5/5) | 100% (1/1)
afterPropertiesSet (): void | | 100% (1/1) | 100% (7/7) | 100% (3/3)
close (ExecutionContext): void | | 100% (1/1) | 100% (4/4) | 100% (2/2)
doExecute (StepExecution): void | | 100% (1/1) | 100% (19/19) | 100% (4/4)
open (ExecutionContext): void | | 100% (1/1) | 100% (5/5) | 100% (2/2)
registerChunkListener (ChunkListener): void | | 100% (1/1) | 100% (5/5) | 100% (2/2)
registerStream (ItemStream): void | | 100% (1/1) | 100% (5/5) | 100% (2/2)
rollback (StepExecution, TransactionStatus): void | | 100% (1/1) | 100% (7/7) | 100% (3/3)
setChunkListeners (ChunkListener []): void | | 100% (1/1) | 100% (14/14) | 100% (3/3)
setInterruptionPolicy (StepInterruptionPolicy): void | | 100% (1/1) | 100% (4/4) | 100% (2/2)
setStepOperations (RepeatOperations): void | | 100% (1/1) | 100% (4/4) | 100% (2/2)
setStreams (ItemStream []): void | | 100% (1/1) | 100% (14/14) | 100% (3/3)
setTasklet (Tasklet): void | | 100% (1/1) | 100% (11/11) | 100% (4/4)
setTransactionAttribute (TransactionAttribute): void | | 100% (1/1) | 100% (4/4) | 100% (2/2)
setTransactionManager (PlatformTransactionManager): void | | 100% (1/1) | 100% (4/4) | 100% (2/2)

class TaskletStep$1 | 100% (1/1) | 100% (2/2) | 100% (8/8) | 100% (3/3)
TaskletStep$1 (TaskletStep): void | | 100% (1/1) | 100% (6/6) | 100% (2/2)
rollbackOn (Throwable): boolean | | 100% (1/1) | 100% (2/2) | 100% (1/1)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.tasklet;
17 
18import java.util.concurrent.Semaphore;
19 
20import org.apache.commons.logging.Log;
21import org.apache.commons.logging.LogFactory;
22import org.springframework.batch.core.BatchStatus;
23import org.springframework.batch.core.ChunkListener;
24import org.springframework.batch.core.JobInterruptedException;
25import org.springframework.batch.core.StepContribution;
26import org.springframework.batch.core.StepExecution;
27import org.springframework.batch.core.StepExecutionListener;
28import org.springframework.batch.core.listener.CompositeChunkListener;
29import org.springframework.batch.core.repository.JobRepository;
30import org.springframework.batch.core.scope.context.ChunkContext;
31import org.springframework.batch.core.scope.context.StepContextRepeatCallback;
32import org.springframework.batch.core.step.AbstractStep;
33import org.springframework.batch.core.step.StepInterruptionPolicy;
34import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
35import org.springframework.batch.item.ExecutionContext;
36import org.springframework.batch.item.ItemReader;
37import org.springframework.batch.item.ItemStream;
38import org.springframework.batch.item.ItemWriter;
39import org.springframework.batch.item.support.CompositeItemStream;
40import org.springframework.batch.repeat.RepeatContext;
41import org.springframework.batch.repeat.RepeatOperations;
42import org.springframework.batch.repeat.RepeatStatus;
43import org.springframework.batch.repeat.support.RepeatTemplate;
44import org.springframework.transaction.PlatformTransactionManager;
45import org.springframework.transaction.TransactionStatus;
46import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
47import org.springframework.transaction.interceptor.TransactionAttribute;
48import org.springframework.util.Assert;
49 
50/**
51 * Simple implementation of executing the step as a call to a {@link Tasklet},
52 * possibly repeated, and each call surrounded by a transaction. The structure
53 * is therefore that of a loop with transaction boundary inside the loop. The
54 * loop is controlled by the step operations (
55 * {@link #setStepOperations(RepeatOperations)}).<br/>
56 * <br/>
57 * 
58 * Clients can use interceptors in the step operations to intercept or listen to
59 * the iteration on a step-wide basis, for instance to get a callback when the
60 * step is complete. Those that want callbacks at the level of an individual
61 * task can specify interceptors for the chunk operations.
62 * 
63 * @author Dave Syer
64 * @author Lucas Ward
65 * @author Ben Hale
66 * @author Robert Kasanicky
67 */
68public class TaskletStep extends AbstractStep {
69 
70        private static final Log logger = LogFactory.getLog(TaskletStep.class);
71 
72        private RepeatOperations stepOperations = new RepeatTemplate();
73 
74        private CompositeChunkListener chunkListener = new CompositeChunkListener();
75 
76        // Default to checking the current thread for interruption; see setInterruptionPolicy.
77        private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
78 
79        private CompositeItemStream stream = new CompositeItemStream();
80 
81        private PlatformTransactionManager transactionManager;
82 
83        private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() {
84                // Default attribute: request a rollback for every Throwable.
85                @Override
86                public boolean rollbackOn(Throwable ex) {
87                        return true;
88                }
89 
90        };
91 
92        private Tasklet tasklet;
93        // One permit: taken in doExecute around the context update and commit.
94        private Semaphore semaphore = new Semaphore(1);
95 
96        /**
97         * Default constructor; delegates to {@link #TaskletStep(String)} with a null name.
98         */
99        public TaskletStep() {
100                this(null);
101        }
102 
103        /**
104         * @param name the name passed on to the {@link AbstractStep} constructor
105         */
106        public TaskletStep(String name) {
107                super(name);
108        }
109 
110        /**
111         * Calls the superclass implementation, then asserts that a transaction
112         * manager has been set.
113         * 
114         * @see org.springframework.batch.core.step.AbstractStep#afterPropertiesSet()
115         */
116        @Override
117        public void afterPropertiesSet() throws Exception {
118                super.afterPropertiesSet();
119                Assert.notNull(transactionManager, "TransactionManager is mandatory");
120        }
121 
122        /**
123         * Public setter for the (mandatory) {@link PlatformTransactionManager}.
124         * 
125         * @param transactionManager the transaction manager to set
126         */
127        public void setTransactionManager(PlatformTransactionManager transactionManager) {
128                this.transactionManager = transactionManager;
129        }
130 
131        /**
132         * Public setter for the {@link TransactionAttribute} of the chunk transactions.
133         * 
134         * @param transactionAttribute the {@link TransactionAttribute} to set
135         */
136        public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
137                this.transactionAttribute = transactionAttribute;
138        }
139 
140        /**
141         * Public setter for the {@link Tasklet}. A tasklet that is also a
142         * {@link StepExecutionListener} is registered as such.
143         * @param tasklet the {@link Tasklet} to set
144         */
145        public void setTasklet(Tasklet tasklet) {
146                this.tasklet = tasklet;
147                if (tasklet instanceof StepExecutionListener) {
148                        registerStepExecutionListener((StepExecutionListener) tasklet);
149                }
150        }
151 
152        /**
153         * Register a chunk listener with the internal {@link CompositeChunkListener}
154         * for callbacks at the appropriate stages in a step execution.
155         * 
156         * @param listener a {@link ChunkListener}
157         */
158        public void registerChunkListener(ChunkListener listener) {
159                this.chunkListener.register(listener);
160        }
161 
162        /**
163         * Register each of the objects as listeners.
164         * 
165         * @param listeners an array of listener objects of known types.
166         */
167        public void setChunkListeners(ChunkListener[] listeners) {
168                for (int i = 0; i < listeners.length; i++) {
169                        registerChunkListener(listeners[i]);
170                }
171        }
172 
173        /**
174         * Register each of the streams for callbacks at the appropriate time in the
175         * step. The {@link ItemReader} and {@link ItemWriter} are automatically
176         * registered, but it doesn't hurt to also register them here. Injected
177         * dependencies of the reader and writer are not automatically registered,
178         * so if you implement {@link ItemWriter} using delegation to another object
179         * which itself is a {@link ItemStream}, you need to register the delegate
180         * here.
181         * 
182         * @param streams an array of {@link ItemStream} objects.
183         */
184        public void setStreams(ItemStream[] streams) {
185                for (int i = 0; i < streams.length; i++) {
186                        registerStream(streams[i]);
187                }
188        }
189 
190        /**
191         * Register a single {@link ItemStream} for callbacks to the stream
192         * interface.
193         * 
194         * @param stream the {@link ItemStream} to add to the composite stream
195         */
196        public void registerStream(ItemStream stream) {
197                this.stream.register(stream);
198        }
199 
200        /**
201         * The {@link RepeatOperations} driving the outer loop of the step; each
202         * iteration is one tasklet call. Should be set up by the caller through a
203         * factory. Defaults to a plain {@link RepeatTemplate}.
204         * 
205         * @param stepOperations a {@link RepeatOperations} instance.
206         */
207        public void setStepOperations(RepeatOperations stepOperations) {
208                this.stepOperations = stepOperations;
209        }
210 
211        /**
212         * Setter for the {@link StepInterruptionPolicy}. The policy is checked
213         * before each chunk transaction, and again after a successful one, for an
214         * external request to interrupt the job execution.
215         * 
216         * @param interruptionPolicy a {@link StepInterruptionPolicy}
217         */
218        public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
219                this.interruptionPolicy = interruptionPolicy;
220        }
221 
222        /**
223         * Process the step and update its context so that progress can be monitored
224         * by the caller. The step is broken down into chunks, each one executing in
225         * a transaction. The step and its execution and execution context are all
226         * given an up to date {@link BatchStatus}, and the {@link JobRepository} is
227         * used to store the result. Various reporting information are also added to
228         * the current context governing the step execution, which would normally be
229         * available to the caller through the step's {@link ExecutionContext}.<br/>
230         * 
231         * @throws JobInterruptedException if the step or a chunk is interrupted
232         * @throws RuntimeException if there is an exception during a chunk
233         * execution
234         * 
235         */
236        @Override
237        protected void doExecute(StepExecution stepExecution) throws Exception {
238 
239                stream.update(stepExecution.getExecutionContext());
240                getJobRepository().updateExecutionContext(stepExecution);
241 
242                stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
243 
244                        @Override
245                        public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
246                                        throws Exception {
247 
248                                StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
249 
250                                StepContribution contribution = stepExecution.createStepContribution();
251 
252                                // Before starting a new transaction, check for
253                                // interruption.
254                                interruptionPolicy.checkInterrupted(stepExecution);
255 
256                                RepeatStatus result = RepeatStatus.CONTINUABLE;
257 
258                                TransactionStatus transaction = transactionManager.getTransaction(transactionAttribute);
259 
260                                chunkListener.beforeChunk();
261 
262                                boolean locked = false;
263 
264                                try {
265 
266                                        try {
267                                                try {
268                                                        result = tasklet.execute(contribution, chunkContext);
269                                                        if(result == null) {
270                                                                result = RepeatStatus.FINISHED;
271                                                        }
272                                                }
273                                                catch (Exception e) {
274                                                        if (transactionAttribute.rollbackOn(e)) {
275                                                                throw e;
276                                                        }
277                                                }
278                                                chunkListener.afterChunk();
279                                        }
280                                        finally {
281                                                // Apply the contribution to the step
282                                                // even if unsuccessful
283                                                logger.debug("Applying contribution: " + contribution);
284                                                stepExecution.apply(contribution);
285 
286                                        }
287 
288                                        // If the step operations are asynchronous then we need
289                                        // to synchronize changes to the step execution (at a
290                                        // minimum).
291                                        try {
292                                                semaphore.acquire();
293                                                locked = true;
294                                        }
295                                        catch (InterruptedException e) {
296                                                stepExecution.setStatus(BatchStatus.STOPPED);
297                                                Thread.currentThread().interrupt();
298                                        }
299 
300                                        stream.update(stepExecution.getExecutionContext());
301 
302                                        try {
303                                                getJobRepository().updateExecutionContext(stepExecution);
304                                                transactionManager.commit(transaction);
305                                                stepExecution.incrementCommitCount();
306                                                logger.debug("Saving step execution after commit: " + stepExecution);
307                                                getJobRepository().update(stepExecution);
308                                        }
309                                        catch (Exception e) {
310                                                throw new FatalException("Fatal failure detected", e);
311                                        }
312 
313                                }
314                                catch (FatalException e) {
315                                        try {
316                                                logger.debug("Rollback for FatalException: " + e.getClass().getName() + ": " + e.getMessage());
317                                                rollback(stepExecution, transaction);
318                                        }
319                                        catch (Exception rollbackException) {
320                                                /*
321                                                 * Propagate the original fatal failure; only log the
322                                                 * failed rollback. The failure can be caused by
323                                                 * attempting a rollback when the commit has already
324                                                 * succeeded (which is normal so only logged at debug
325                                                 * level)
326                                                 */
327                                                logger.debug("Rollback caused by fatal failure failed", rollbackException);
328                                        }
329                                        throw e;
330                                }
331                                catch (Error e) {
332                                        try {
333                                                logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
334                                                rollback(stepExecution, transaction);
335                                        }
336                                        catch (Exception rollbackException) {
337                                                logger.error("Fatal rollback failure, original exception that caused the rollback is", e);
338                                                throw new FatalException("Failed while processing rollback", rollbackException);
339                                        }
340                                        throw e;
341 
342                                }
343                                catch (Exception e) {
344                                        try {
345                                                logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
346                                                rollback(stepExecution, transaction);
347                                        }
348                                        catch (Exception rollbackException) {
349                                                logger.error("Fatal rollback failure, original exception that caused the rollback is", e);
350                                                throw new FatalException("Failed while processing rollback", rollbackException);
351                                        }
352                                        throw e;
353                                }
354                                finally {
355                                        // only release the lock if we acquired it
356                                        if (locked) {
357                                                semaphore.release();
358                                        }
359                                        locked = false;
360                                }
361 
362                                // Check for interruption after transaction as well, so that
363                                // the interrupted exception is correctly propagated up to
364                                // caller
365                                interruptionPolicy.checkInterrupted(stepExecution);
366 
367                                return result;
368                        }
369 
370                });
371 
372        }
373        /** Closes the composite stream; the supplied context is not used. */
374        protected void close(ExecutionContext ctx) throws Exception {
375                stream.close();
376        }
377        /** Opens the composite stream with the given execution context. */
378        protected void open(ExecutionContext ctx) throws Exception {
379                stream.open(ctx);
380        }
381        /** Rolls the transaction back, then increments the step's rollback count. */
382        private void rollback(StepExecution stepExecution, TransactionStatus transaction) {
383                transactionManager.rollback(transaction);
384                stepExecution.incrementRollbackCount();
385        }
386}

[all classes][org.springframework.batch.core.step.tasklet]
EMMA 2.0.5312 (C) Vladimir Roubtsov