EMMA Coverage Report (generated Thu Jan 24 13:37:04 CST 2013)
[all classes][org.springframework.batch.core.step.tasklet]

COVERAGE SUMMARY FOR SOURCE FILE [TaskletStep.java]

name              class, %      method, %      block, %        line, %
TaskletStep.java  100% (5/5)    100% (36/36)   97% (570/588)   96% (140.4/146)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class TaskletStep$ChunkTransactionCallback100% (1/1)100% (5/5)95%  (326/343)94%  (82.5/88)
afterCompletion (int): void 100% (1/1)93%  (52/56)97%  (16.5/17)
doInTransaction (TransactionStatus): Object 100% (1/1)94%  (215/228)91%  (48/53)
TaskletStep$ChunkTransactionCallback (TaskletStep, ChunkContext, Semaphore): ... 100% (1/1)100% (26/26)100% (8/8)
copy (StepExecution, StepExecution): void 100% (1/1)100% (24/24)100% (6/6)
rollback (StepExecution): void 100% (1/1)100% (9/9)100% (4/4)
     
class TaskletStep100% (1/1)100% (26/26)99%  (176/177)100% (45.9/46)
afterPropertiesSet (): void 100% (1/1)91%  (10/11)97%  (2.9/3)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
TaskletStep (): void 100% (1/1)100% (4/4)100% (2/2)
TaskletStep (String): void 100% (1/1)100% (30/30)100% (7/7)
access$000 (TaskletStep): StepInterruptionPolicy 100% (1/1)100% (3/3)100% (1/1)
access$100 (TaskletStep): PlatformTransactionManager 100% (1/1)100% (3/3)100% (1/1)
access$200 (TaskletStep): TransactionAttribute 100% (1/1)100% (3/3)100% (1/1)
access$300 (TaskletStep): CompositeChunkListener 100% (1/1)100% (3/3)100% (1/1)
access$400 (): Log 100% (1/1)100% (2/2)100% (1/1)
access$500 (TaskletStep): Tasklet 100% (1/1)100% (3/3)100% (1/1)
access$600 (TaskletStep): CompositeItemStream 100% (1/1)100% (3/3)100% (1/1)
access$700 (TaskletStep): JobRepository 100% (1/1)100% (3/3)100% (1/1)
access$800 (TaskletStep): JobRepository 100% (1/1)100% (3/3)100% (1/1)
close (ExecutionContext): void 100% (1/1)100% (4/4)100% (2/2)
createSemaphore (): Semaphore 100% (1/1)100% (5/5)100% (1/1)
doExecute (StepExecution): void 100% (1/1)100% (23/23)100% (5/5)
open (ExecutionContext): void 100% (1/1)100% (5/5)100% (2/2)
registerChunkListener (ChunkListener): void 100% (1/1)100% (5/5)100% (2/2)
registerStream (ItemStream): void 100% (1/1)100% (5/5)100% (2/2)
setChunkListeners (ChunkListener []): void 100% (1/1)100% (14/14)100% (3/3)
setInterruptionPolicy (StepInterruptionPolicy): void 100% (1/1)100% (4/4)100% (2/2)
setStepOperations (RepeatOperations): void 100% (1/1)100% (4/4)100% (2/2)
setStreams (ItemStream []): void 100% (1/1)100% (14/14)100% (3/3)
setTasklet (Tasklet): void 100% (1/1)100% (11/11)100% (4/4)
setTransactionAttribute (TransactionAttribute): void 100% (1/1)100% (4/4)100% (2/2)
setTransactionManager (PlatformTransactionManager): void 100% (1/1)100% (4/4)100% (2/2)
     
class TaskletStep$1100% (1/1)100% (2/2)100% (8/8)100% (2/2)
TaskletStep$1 (TaskletStep): void 100% (1/1)100% (6/6)100% (1/1)
rollbackOn (Throwable): boolean 100% (1/1)100% (2/2)100% (1/1)
     
class TaskletStep$2100% (1/1)100% (2/2)100% (56/56)100% (10/10)
TaskletStep$2 (TaskletStep, StepExecution, Semaphore): void 100% (1/1)100% (10/10)100% (1/1)
doInChunkContext (RepeatContext, ChunkContext): RepeatStatus 100% (1/1)100% (46/46)100% (9/9)
     
class TaskletStep$UncheckedTransactionException100% (1/1)100% (1/1)100% (4/4)100% (2/2)
TaskletStep$UncheckedTransactionException (Exception): void 100% (1/1)100% (4/4)100% (2/2)

1/*
2 * Copyright 2006-2007 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.springframework.batch.core.step.tasklet;
17 
18import java.util.concurrent.Semaphore;
19 
20import org.apache.commons.logging.Log;
21import org.apache.commons.logging.LogFactory;
22import org.springframework.batch.core.BatchStatus;
23import org.springframework.batch.core.ChunkListener;
24import org.springframework.batch.core.JobInterruptedException;
25import org.springframework.batch.core.StepContribution;
26import org.springframework.batch.core.StepExecution;
27import org.springframework.batch.core.StepExecutionListener;
28import org.springframework.batch.core.listener.CompositeChunkListener;
29import org.springframework.batch.core.repository.JobRepository;
30import org.springframework.batch.core.scope.context.ChunkContext;
31import org.springframework.batch.core.scope.context.StepContextRepeatCallback;
32import org.springframework.batch.core.step.AbstractStep;
33import org.springframework.batch.core.step.FatalStepExecutionException;
34import org.springframework.batch.core.step.StepInterruptionPolicy;
35import org.springframework.batch.core.step.ThreadStepInterruptionPolicy;
36import org.springframework.batch.item.ExecutionContext;
37import org.springframework.batch.item.ItemReader;
38import org.springframework.batch.item.ItemStream;
39import org.springframework.batch.item.ItemWriter;
40import org.springframework.batch.item.support.CompositeItemStream;
41import org.springframework.batch.repeat.RepeatContext;
42import org.springframework.batch.repeat.RepeatOperations;
43import org.springframework.batch.repeat.RepeatStatus;
44import org.springframework.batch.repeat.support.RepeatTemplate;
45import org.springframework.transaction.PlatformTransactionManager;
46import org.springframework.transaction.TransactionStatus;
47import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
48import org.springframework.transaction.interceptor.TransactionAttribute;
49import org.springframework.transaction.support.TransactionCallback;
50import org.springframework.transaction.support.TransactionSynchronization;
51import org.springframework.transaction.support.TransactionSynchronizationAdapter;
52import org.springframework.transaction.support.TransactionSynchronizationManager;
53import org.springframework.transaction.support.TransactionTemplate;
54import org.springframework.util.Assert;
55 
56/**
57 * Simple implementation of executing the step as a call to a {@link Tasklet},
58 * possibly repeated, and each call surrounded by a transaction. The structure
59 * is therefore that of a loop with transaction boundary inside the loop. The
60 * loop is controlled by the step operations (
61 * {@link #setStepOperations(RepeatOperations)}).<br/>
62 * <br/>
63 * 
64 * Clients can use interceptors in the step operations to intercept or listen to
65 * the iteration on a step-wide basis, for instance to get a callback when the
66 * step is complete. Those that want callbacks at the level of individual
67 * tasks can specify interceptors for the chunk operations.
68 * 
69 * @author Dave Syer
70 * @author Lucas Ward
71 * @author Ben Hale
72 * @author Robert Kasanicky
73 */
74public class TaskletStep extends AbstractStep {
75 
76        private static final Log logger = LogFactory.getLog(TaskletStep.class);
77 
78        private RepeatOperations stepOperations = new RepeatTemplate();
79 
80        private CompositeChunkListener chunkListener = new CompositeChunkListener();
81 
82        // default to checking current thread for interruption.
83        private StepInterruptionPolicy interruptionPolicy = new ThreadStepInterruptionPolicy();
84 
85        private CompositeItemStream stream = new CompositeItemStream();
86 
87        private PlatformTransactionManager transactionManager;
88 
89        private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute() {
90 
91                @Override
92                public boolean rollbackOn(Throwable ex) {
93                        return true;
94                }
95 
96        };
97 
98        private Tasklet tasklet;
99 
100        /**
101         * Default constructor; delegates to {@link #TaskletStep(String)} with a null name.
102         */
103        public TaskletStep() {
104                this(null);
105        }
106 
107        /**
108         * @param name the name passed through to the {@link AbstractStep} constructor
109         */
110        public TaskletStep(String name) {
111                super(name);
112        }
113 
114        /*
115         * (non-Javadoc)
116         * Delegates to the superclass, then asserts that a transaction manager was set.
117         * @see
118         * org.springframework.batch.core.step.AbstractStep#afterPropertiesSet()
119         */
120        @Override
121        public void afterPropertiesSet() throws Exception {
122                super.afterPropertiesSet();
123                Assert.state(transactionManager != null, "A transaction manager must be provided");
124        }
125 
126        /**
127         * Public setter for the {@link PlatformTransactionManager}.
128         * Required: {@link #afterPropertiesSet()} asserts that it is not null.
129         * @param transactionManager the transaction manager to set
130         */
131        public void setTransactionManager(PlatformTransactionManager transactionManager) {
132                this.transactionManager = transactionManager;
133        }
134 
135        /**
136         * Public setter for the {@link TransactionAttribute}.
137         * The default attribute of this step rolls back on every exception.
138         * @param transactionAttribute the {@link TransactionAttribute} to set
139         */
140        public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
141                this.transactionAttribute = transactionAttribute;
142        }
143 
144        /**
145         * Public setter for the {@link Tasklet}. A tasklet that also implements
146         * {@link StepExecutionListener} is registered as a step execution listener.
147         * @param tasklet the {@link Tasklet} to set
148         */
149        public void setTasklet(Tasklet tasklet) {
150                this.tasklet = tasklet;
151                if (tasklet instanceof StepExecutionListener) {
152                        registerStepExecutionListener((StepExecutionListener) tasklet);
153                }
154        }
155 
156        /**
157         * Register a chunk listener for callbacks at the appropriate stages in a
158         * step execution.
159         * 
160         * @param listener a {@link ChunkListener}
161         */
162        public void registerChunkListener(ChunkListener listener) {
163                this.chunkListener.register(listener);
164        }
165 
166        /**
167         * Register each of the given listeners via {@link #registerChunkListener(ChunkListener)}.
168         * 
169         * @param listeners a non-null array of {@link ChunkListener} objects
170         */
171        public void setChunkListeners(ChunkListener[] listeners) {
172                for (int i = 0; i < listeners.length; i++) {
173                        registerChunkListener(listeners[i]);
174                }
175        }
176 
177        /**
178         * Register each of the streams for callbacks at the appropriate time in the
179         * step. The {@link ItemReader} and {@link ItemWriter} are automatically
180         * registered, but it doesn't hurt to also register them here. Injected
181         * dependencies of the reader and writer are not automatically registered,
182         * so if you implement {@link ItemWriter} using delegation to another object
183         * which itself is a {@link ItemStream}, you need to register the delegate
184         * here.
185         * 
186         * @param streams an array of {@link ItemStream} objects.
187         */
188        public void setStreams(ItemStream[] streams) {
189                for (int i = 0; i < streams.length; i++) {
190                        registerStream(streams[i]);
191                }
192        }
193 
194        /**
195         * Register a single {@link ItemStream} for callbacks to the stream
196         * interface.
197         * 
198         * @param stream the {@link ItemStream} to add to this step's composite stream
199         */
200        public void registerStream(ItemStream stream) {
201                this.stream.register(stream);
202        }
203 
204        /**
205         * The {@link RepeatOperations} to use for the outer loop of the batch
206         * processing. Should be set up by the caller through a factory. Defaults to
207         * a plain {@link RepeatTemplate}.
208         * 
209         * @param stepOperations a {@link RepeatOperations} instance.
210         */
211        public void setStepOperations(RepeatOperations stepOperations) {
212                this.stepOperations = stepOperations;
213        }
214 
215        /**
216         * Setter for the {@link StepInterruptionPolicy}. The policy is used to
217         * check whether an external request has been made to interrupt the job
218         * execution.
219         * Defaults to a {@link ThreadStepInterruptionPolicy}.
220         * @param interruptionPolicy a {@link StepInterruptionPolicy}
221         */
222        public void setInterruptionPolicy(StepInterruptionPolicy interruptionPolicy) {
223                this.interruptionPolicy = interruptionPolicy;
224        }
225 
226        /**
227         * Process the step and update its context so that progress can be monitored
228         * by the caller. The step is broken down into chunks, each one executing in
229         * a transaction. The step and its execution and execution context are all
230         * given an up to date {@link BatchStatus}, and the {@link JobRepository} is
231         * used to store the result. Various reporting information is also added to
232         * the current context governing the step execution, which would normally be
233         * available to the caller through the step's {@link ExecutionContext}.<br/>
234         * @param stepExecution the execution whose context is updated and which is handed to the repeat callback
235         * @throws JobInterruptedException if the step or a chunk is interrupted
236         * @throws RuntimeException if there is an exception during a chunk
237         * execution
238         * 
239         */
240        @Override
241        protected void doExecute(StepExecution stepExecution) throws Exception {
242 
243                stream.update(stepExecution.getExecutionContext());
244                getJobRepository().updateExecutionContext(stepExecution);
245 
246                // Shared semaphore per step execution, so other step executions can run
247                // in parallel without needing the lock
248                final Semaphore semaphore = createSemaphore();
249 
250                stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
251 
252                        @Override
253                        public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
254                                        throws Exception {
255 
256                                StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
257 
258                                // Before starting a new transaction, check for
259                                // interruption.
260                                interruptionPolicy.checkInterrupted(stepExecution);
261 
262                                RepeatStatus result;
263                                try {
264                                        result = (RepeatStatus) new TransactionTemplate(transactionManager, transactionAttribute)
265                                                        .execute(new ChunkTransactionCallback(chunkContext, semaphore));
266                                }
267                                catch (UncheckedTransactionException e) {
268                                        // Unwrap the checked exception that the callback wrapped in order to get it out of the template
269                                        throw (Exception) e.getCause();
270                                }
271 
272                                chunkListener.afterChunk(); // only reached when the transaction template returned normally
273 
274                                // Check for interruption after transaction as well, so that
275                                // the interrupted exception is correctly propagated up to
276                                // caller
277                                interruptionPolicy.checkInterrupted(stepExecution);
278 
279                                return result;
280                        }
281 
282                });
283 
284        }
285 
286        /**
287         * Extension point mainly for test purposes so that the behaviour of the
288         * lock can be manipulated to simulate various pathologies.
289         * 
290         * @return a new single-permit semaphore for locking access to the JobRepository
291         */
292        protected Semaphore createSemaphore() {
293                return new Semaphore(1);
294        }
295 
296        protected void close(ExecutionContext ctx) throws Exception {
297                stream.close(); // closes the composite of registered streams; ctx is not used here
298        }
299 
300        protected void open(ExecutionContext ctx) throws Exception {
301                stream.open(ctx); // opens the composite of registered streams with the given context
302        }
303 
304        /**
305         * A callback for the transactional work inside a chunk. Also detects
306         * failures in the transaction commit and rollback, only panicking if the
307         * transaction status is unknown (i.e. if a commit failure leads to a clean
308         * rollback then we assume the state is consistent).
309         * 
310         * @author Dave Syer
311         * 
312         */
313        private class ChunkTransactionCallback extends TransactionSynchronizationAdapter implements TransactionCallback {
314 
315                private final StepExecution stepExecution;
316 
317                private final ChunkContext chunkContext;
318 
319                private boolean rolledBack = false;
320 
321                private boolean stepExecutionUpdated = false;
322 
323                private StepExecution oldVersion;
324 
325                private boolean locked = false;
326 
327                private final Semaphore semaphore;
328 
329                public ChunkTransactionCallback(ChunkContext chunkContext, Semaphore semaphore) {
330                        this.chunkContext = chunkContext;
331                        this.stepExecution = chunkContext.getStepContext().getStepExecution();
332                        this.semaphore = semaphore;
333                }
334 
335                @Override
336                public void afterCompletion(int status) {
337                        try {
338                                if (status != TransactionSynchronization.STATUS_COMMITTED) {
339                                        if (stepExecutionUpdated) {
340                                                // The transaction did not commit although the step execution
341                                                // was already modified, so restore the snapshot taken earlier.
342                                                logger.info("Commit failed while step execution data was already updated. "
343                                                                + "Reverting to old version.");
344                                                copy(oldVersion, stepExecution);
345                                                if (status == TransactionSynchronization.STATUS_ROLLED_BACK) {
346                                                        rollback(stepExecution); // counted at most once, guarded by rolledBack
347                                                }
348                                        }
349                                }
350                                if (status == TransactionSynchronization.STATUS_UNKNOWN) {
351                                        logger.error("Rolling back with transaction in unknown state");
352                                        rollback(stepExecution);
353                                        stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
354                                        stepExecution.setTerminateOnly();
355                                }
356                        }
357                        finally {
358                                // Only release the lock if we acquired it, and release as late
359                                // as possible
360                                if (locked) {
361                                        semaphore.release();
362                                }
363                                locked = false;
364                        }
365                }
366 
367                public Object doInTransaction(TransactionStatus status) {
368 
369                        TransactionSynchronizationManager.registerSynchronization(this); // so afterCompletion(int) is called back on this object
370 
371                        RepeatStatus result = RepeatStatus.CONTINUABLE;
372 
373                        StepContribution contribution = stepExecution.createStepContribution();
374 
375                        chunkListener.beforeChunk();
376 
377                        // In case we need to push it back to its old value
378                        // after a commit fails...
379                        oldVersion = new StepExecution(stepExecution.getStepName(), stepExecution.getJobExecution());
380                        copy(stepExecution, oldVersion);
381 
382                        try {
383 
384                                try {
385                                        try {
386                                                result = tasklet.execute(contribution, chunkContext);
387                                                if (result == null) {
388                                                        result = RepeatStatus.FINISHED;
389                                                }
390                                        }
391                                        catch (Exception e) {
392                                                if (transactionAttribute.rollbackOn(e)) {
393                                                        throw e;
394                                                } // otherwise the exception is dropped and result stays CONTINUABLE
395                                        }
396 
397                                }
398                                finally {
399 
400                                        // If the step operations are asynchronous then we need
401                                        // to synchronize changes to the step execution (at a
402                                        // minimum). Take the lock *before* changing the step
403                                        // execution.
404                                        try {
405                                                semaphore.acquire();
406                                                locked = true;
407                                        }
408                                        catch (InterruptedException e) {
409                                                logger.error("Thread interrupted while locking for repository update");
410                                                stepExecution.setStatus(BatchStatus.STOPPED);
411                                                stepExecution.setTerminateOnly();
412                                                Thread.currentThread().interrupt();
413                                        }
414 
415                                        // Apply the contribution to the step
416                                        // even if unsuccessful
417                                        logger.debug("Applying contribution: " + contribution);
418                                        stepExecution.apply(contribution);
419 
420                                }
421 
422                                stepExecutionUpdated = true;
423 
424                                stream.update(stepExecution.getExecutionContext());
425 
426                                try {
427                                        // Save the step execution ahead of the commit. stepExecutionUpdated is
428                                        // already true, so afterCompletion can revert if the commit fails.
429                                        getJobRepository().updateExecutionContext(stepExecution);
430                                        stepExecution.incrementCommitCount();
431                                        logger.debug("Saving step execution before commit: " + stepExecution);
432                                        getJobRepository().update(stepExecution);
433                                }
434                                catch (Exception e) {
435                                        // If we get to here there was a problem saving the step
436                                        // execution and we have to fail.
437                                        String msg = "JobRepository failure forcing exit with unknown status";
438                                        logger.error(msg, e);
439                                        stepExecution.upgradeStatus(BatchStatus.UNKNOWN);
440                                        stepExecution.setTerminateOnly();
441                                        throw new FatalStepExecutionException(msg, e);
442                                }
443 
444                        }
445                        catch (Error e) {
446                                logger.debug("Rollback for Error: " + e.getClass().getName() + ": " + e.getMessage());
447                                rollback(stepExecution);
448                                throw e;
449                        }
450                        catch (RuntimeException e) {
451                                logger.debug("Rollback for RuntimeException: " + e.getClass().getName() + ": " + e.getMessage());
452                                rollback(stepExecution);
453                                throw e;
454                        }
455                        catch (Exception e) {
456                                logger.debug("Rollback for Exception: " + e.getClass().getName() + ": " + e.getMessage());
457                                rollback(stepExecution);
458                                // Wrap the checked exception; doExecute unwraps and rethrows the cause
459                                throw new UncheckedTransactionException(e);
460                        }
461 
462                        return result;
463 
464                }
465 
466                private void rollback(StepExecution stepExecution) {
467                        if (!rolledBack) { // count the rollback only once per callback instance
468                                stepExecution.incrementRollbackCount();
469                                rolledBack = true;
470                        }
471                }
472 
473                private void copy(final StepExecution source, final StepExecution target) { // copies version, write/filter/commit counts and execution context
474                        target.setVersion(source.getVersion());
475                        target.setWriteCount(source.getWriteCount());
476                        target.setFilterCount(source.getFilterCount());
477                        target.setCommitCount(source.getCommitCount());
478                        target.setExecutionContext(new ExecutionContext(source.getExecutionContext())); // target gets a new context instance built from the source's
479                }
480 
481        }
482 
483        /**
484         * Convenience wrapper for a checked exception so that it can cause a
485         * rollback and be extracted afterwards.
486         * 
487         * @author Dave Syer
488         * 
489         */
490        private static class UncheckedTransactionException extends RuntimeException {
491 
492                public UncheckedTransactionException(Exception e) {
493                        super(e);
494                }
495 
496        }
497 
498}

[all classes][org.springframework.batch.core.step.tasklet]
EMMA 2.0.5312 (C) Vladimir Roubtsov